PocketIO

 view release on metacpan or  search on metacpan

lib/PocketIO/Pool/Redis.pm  view on Meta::CPAN

package PocketIO::Pool::Redis;

use strict;
use warnings;

use base 'PocketIO::Pool';

use AnyEvent::Redis;
use JSON;
use Scalar::Util qw(blessed);

use PocketIO::Connection;

use constant DEBUG => $ENV{POCKETIO_POOL_DEBUG};

sub new {
    my $self = shift->SUPER::new(@_);

    $self->{channel} ||= 'pocketio';

    $self->{redis} ||= {};

    $self->{pub} = $self->_create_client(%{$self->{redis}});
    $self->{sub} = $self->_create_client(%{$self->{redis}});

    $self->{sub}->subscribe(
        $self->{channel} => sub {
            my ($message, $channel) = @_;

            $message = decode_json($message);

            my $invoker_id = $message->{invoker};

            foreach my $conn ($self->_connections) {
                next unless $conn->is_connected;
                next if defined $invoker_id && $conn->id eq $invoker_id;

                $conn->write($message->{message});
            }
        }
    );

    return $self;
}

sub add_connection {
    my $self = shift;
    my $cb   = pop @_;

    my $conn = $self->_build_connection(@_);

    $self->{connections}->{$conn->id} = $conn;

    DEBUG && warn "Added connection '" . $conn->id . "'\n";

    $cb->($conn);
}

sub remove_connection {
    my $self = shift;
    my ($conn, $cb) = @_;

    my $id = blessed $conn ? $conn->id : $conn;

    delete $self->{connections}->{$id};

    DEBUG && warn "Removed connection '" . $id . "'\n";

    $cb->() if $cb;
}

sub send {
    my $self = shift;

    my $message = encode_json({message => "$_[0]"});

    $self->{pub}->publish($self->{channel}, $message);

    return $self;
}

sub broadcast {
    my $self    = shift;
    my $invoker = shift;

    my $message = encode_json({message => "$_[0]", invoker => $invoker->id});

    $self->{pub}->publish($self->{channel}, $message);

    return $self;
}

sub _create_client {
    my $self = shift;

    return AnyEvent::Redis->new(
        host     => '127.0.0.1',
        port     => 6379,
        encoding => 'utf8',
        on_error => sub {
            warn @_;
        },
        @_
    );
}

1;
__END__

=head1 NAME

PocketIO::Pool::Redis - Redis class

=head1 SYNOPSIS

    my $pocketio = PocketIO->new(pool => PocketIO::Pool::Redis->new);

=head1 DESCRIPTION

Uses Redis' pub/sub infrastructure 

=head1 METHODS

=head2 C<new>

Create new instance.

=head2 C<add_connection>

Add new connection.

=head2 C<remove_connection>

Remove connection.

=head2 C<broadcast>

Send broadcast message.

=head2 C<send>

Send message to all client.

=cut



( run in 0.483 second using v1.01-cache-2.11-cpan-7add2cbd662 )