Centrifugo-Client

 view release on metacpan or  search on metacpan

README  view on Meta::CPAN

	})-> on('message', sub{
		my ($infoRef)=@_;
		print "Received message : ".encode_json $infoRef->{data};
	});
	
	# Subscriptions to channels are delayed until the connection is established
	$cclient->subscribe( channel=>'my-channel&' );
	$cclient->subscribe( channel=>'public-channel' );
	$cclient->subscribe( channel=>'$private' );

	# Now start the event loop to keep the program alive
	AnyEvent->condvar->recv;
	
Note: To connect to a Centrifugo instance, your program must first obtain a TOKEN. In production, the token must only be generated on the backend side, and you should never reveal the secret to the client!

    use Centrifugo::Client qw!generate_token!;
	my $SECRET = "secret";
	my $USER = "someUser";
	my $TIMESTAMP = time();
    my $TOKEN = generate_token( $SECRET, $USER, $TIMESTAMP );

lib/Centrifugo/Client.pm  view on Meta::CPAN

	}) -> connect(
		user => $USER_ID,
		timestamp => $TIMESTAMP,
		token => $TOKEN
	);
	
 $cclient->subscribe( channel => 'my-channel&' );
 $cclient->subscribe( channel => 'public-channel' );
 $cclient->subscribe( channel => '$private' );

 # Now start the event loop to keep the program alive
 AnyEvent->condvar->recv;
	
=head1 DESCRIPTION

This library allows a program to communicate with Centrifugo through a WebSocket.

=cut

use strict;
use warnings;

lib/Centrifugo/Client.pm  view on Meta::CPAN

=head1 FUNCTION new

	my $client = Centrifugo::Client->new( $URL );

or

	my $client = Centrifugo::Client->new( $URL,
	   debug => 'true',          # If true, some information is written to STDERR
	   debug_ws => 'true',       # If true, all WebSocket messages are written to STDERR
	   authEndpoint => "...",    # The full URL used to ask for a key to subscribe to private channels
	   max_alive_period => 30,   # interval (in s) since last communication with server that triggers a PING (default 0)
	   refresh_period => 5,      # Check frequency for max_alive_period (default 10s)
	   retry => 0.5 ,            # interval (in ms) between reconnect attempts which value grows exponentially (default 1.0)
	   max_retry => 30,          # upper interval value limit when reconnecting. (default 30)
	   resubscribe => 'true',    # automatic resubscribing on subscriptions (default: 'true')
	   recover => 'true',        # Recovers the lost messages after a reconnection (default: 'false')
	   ws_params => {            # These parameters are passed to AnyEvent::WebSocket::Client->new(...)
			 ssl_no_verify => 'true',
			 timeout => 600
		  },
	   );

lib/Centrifugo/Client.pm  view on Meta::CPAN


sub new {
	my ($class, $ws_url, %params) = @_;

	# Spellings (case-insensitive) that switch a boolean option off.
	my $off = qr/^(0|false|no)$/i;

	# Pull the boolean-like options out of %params in one go; missing keys
	# come back as undef.
	my ($debug, $debug_ws, $recover, $resubscribe) =
		delete @params{qw(debug debug_ws recover resubscribe)};

	my $this = bless {}, $class;
	$this->{WS_URL}   = $ws_url;
	$this->{DEBUG}    = $debug    && $debug    !~ $off;
	$this->{DEBUG_WS} = $debug_ws && $debug_ws !~ $off;
	$this->{AUTH_URL} = delete($params{authEndpoint}) || "/centrifuge/auth/";

	# ws_params is handed over verbatim to the WebSocket client constructor.
	$this->{WEBSOCKET} = AnyEvent::WebSocket::Client->new( %{ $params{ws_params} } );
	delete $params{ws_params};

	# Options that fall back to a default when absent or false:
	# [ object slot, option name, default ]
	for my $setting (
		[ MAX_ALIVE => max_alive_period => 0  ],
		[ REFRESH   => refresh_period   => 10 ],
		[ RETRY     => retry            => 1  ],
		[ MAX_RETRY => max_retry        => 30 ],
	) {
		my ($slot, $option, $default) = @$setting;
		$this->{$slot} = delete($params{$option}) || $default;
	}

	# Resubscription is on unless explicitly switched off; recovery is off
	# unless explicitly switched on.
	$this->{RESUBSCRIBE} = !defined($resubscribe) || $resubscribe !~ $off;
	$this->{RECOVER}     = $recover && $recover !~ $off;

	# Every recognised option has been removed by now: anything left is a typo
	# or an unsupported parameter.
	if (%params) {
		croak "Centrifugo::Client : Unknown parameter : ".join(',', keys %params);
	}
	return $this;
}

=head1 FUNCTION connect - sends authorization parameters to Centrifugo so that your connection can start subscribing to channels.

lib/Centrifugo/Client.pm  view on Meta::CPAN


# This function is called when client is connected to Centrifugo.
#
# Parameters:
#   $this : the Centrifugo::Client object
#   $body : the connect response body; may be missing or not a hash, in which
#           case no client id is recorded
#
# Effects, in order: logs the body through _debug, stores $body->{client} in
# $this->{CLIENT_ID} when present, starts the keep-alive timer when MAX_ALIVE
# is set, resets the reconnect sequence and resubscribes when RESUBSCRIBE is set.
sub _on_connect {
	my ($this, $body) = @_;
	# Building the debug text must never abort the connection sequence:
	# encode_json dies on values it cannot serialise (for instance an
	# undefined body with JSON implementations that reject non-references),
	# so encode defensively and fall back to a placeholder.
	my $dump = do { local $@; eval { encode_json($body) } };
	$dump = '(not JSON-encodable)' unless defined $dump;
	$this->_debug( "Centrifugo::Client : Connected to Centrifugo : ".$dump );
	# on Connect, the client_id must be read (if available)
	if ($body && ref($body) eq 'HASH' && $body->{client}) {
		$this->{CLIENT_ID} = $body->{client};
		$this->_debug( "Centrifugo::Client : CLIENT_ID=".$this->{CLIENT_ID} );
	}
	$this->_init_keep_alive_timer() if $this->{MAX_ALIVE};
	$this->_reset_reconnect_sequence();
	$this->_resubscribe() if $this->{RESUBSCRIBE};
}

# This function is called when client receives a message
sub _on_message {
	my ($this, $body) = @_;
	my $uid = $body->{uid};
	my $channel = $body->{channel};
	$this->_debug( "Centrifugo::Client : Message from $channel : ".encode_json $body->{data} );

lib/Centrifugo/Client.pm  view on Meta::CPAN

		$this->subscribe( %$params );
	}
}

# Handler invoked when the connection with the server is lost.
# Notifies the optional 'ws_closed' listener, clears the per-connection state
# and then starts the reconnection procedure.
sub _on_close {
	my ($this, $message) = @_;
	$message ||= "(none)";
	$this->_debug( "Centrifugo::Client : Connection closed, reason=$message" );

	# Let the user-registered listener (if any) know why the socket closed
	my $listener = $this->{ON}->{'ws_closed'};
	$listener->($message) if $listener;

	# Per-connection state : the slots are kept but emptied ...
	for my $slot (qw(_alive_handler WSHANDLE CLIENT_ID)) {
		$this->{$slot} = undef;
	}
	# ... whereas the subscription bookkeeping is removed altogether
	delete @{$this}{qw(_subscribed_channels _pending_subscriptions)};

	$this->_reconnect();
}

# Handler invoked when an error occurs with the server.
# The error details are always reported with warn(), then forwarded to the
# 'error' listener when one has been registered.
sub _on_error {
	my ($this, @infos) = @_;
	warn "Error in Centrifugo::Client : @infos";
	my $listener = $this->{ON}->{'error'};
	$listener && $listener->(@infos);
}

# This function is called once for each message received from Centrifugo
sub _on_ws_message {
	my ($this, $message) = @_;
	$this->_debug_ws("Send > WebSocket : $message->{body}");
	$this->{_last_alive_message} = time();
	my $fullbody = decode_json($message->{body}); # The body of websocket message
	# Handle a body containing {response} : converts into a singleton
	if (ref($fullbody) eq 'HASH') {
		$fullbody = [ $fullbody ];
	}
	# Handle the body which is now an array of response
	foreach my $info (@$fullbody) {
		my $uid = $info->{uid};
		my $method = $info->{method};
		my $body = $info->{body}; # The body of Centrifugo message

lib/Centrifugo/Client.pm  view on Meta::CPAN

		after => $retry_after,
		cb => sub {
			$this->{_next_retry} += $this->{_last_retry};
			$this->{_last_retry} = $retry_after;
			$this->_connect();
		}
	);
}

# Installs the recurring timer that keeps the connection alive.
# Every REFRESH period the time elapsed since the last message received is
# compared with MAX_ALIVE ; when it is exceeded, a ping is sent.
# The timer is stored in $this->{_alive_handler}.
sub _init_keep_alive_timer {
	my ($this) = @_;
	my $period = $this->{REFRESH};

	# Periodic check : only ping when the server has been silent for too long
	my $check = sub {
		my $silence = time() - $this->{_last_alive_message};
		return unless $silence > $this->{MAX_ALIVE};
		$this->_debug( "Sending ping (${silence}s without message)" );
		$this->ping();
	};

	$this->{_alive_handler} = AnyEvent->timer(
		after    => $period,
		interval => $period,
		cb       => $check,
	);
}

=head1 FUNCTION publish - allows clients to publish messages directly into a channel (use with caution: Client->Server communication is NOT the aim of Centrifugo)



( run in 1.513 second using v1.01-cache-2.11-cpan-39bf76dae61 )