Centrifugo-Client
view release on metacpan or search on metacpan
lib/Centrifugo/Client.pm view on Meta::CPAN
croak("Missing timestamp in Centrifugo::Client->connect(...)") if ! $PARAMS{timestamp};
croak("Missing token in Centrifugo::Client->connect(...)") if ! $PARAMS{token};
# Fix parameters sent to Centrifugo
$PARAMS{timestamp}="$PARAMS{timestamp}" if $PARAMS{timestamp}; # This MUST be a string
# Save the Centrifugo connection parameters
$this->{_cnx_uid} = delete $PARAMS{uid} || _generate_random_id();
$this->{_cnx_params} = \%PARAMS;
# Connects to Websocket
$this->_reset_reconnect_sequence();
$this->_connect();
return $this;
}
# This function (re)connects to the websocket
sub _connect {
my ($this) = @_;
$this->{WEBSOCKET}->connect( $this->{WS_URL} )->cb(sub {
$this->{WSHANDLE} = eval { shift->recv };
if ($@) {
$this->_on_error($@);
$this->_reconnect();
return;
}
# The websocket connection is OK
$this->_on_ws_connect();
});
}
# This function is called when client is connected to the WebSocket
sub _on_ws_connect {
my ($this) = @_;
$this->_debug( "Centrifugo::Client : WebSocket connected to $this->{WS_URL}" );
# define the callbacks
$this->{WSHANDLE}->on(each_message => sub { $this->_on_ws_message($_[1]) });
$this->{WSHANDLE}->on(finish => sub { $this->_on_close(($_[0])->close_reason()) });
$this->{WSHANDLE}->on(parse_error => sub {
my($cnx, $error) = @_;
$this->_debug( "Error in Centrifugo::Client : $error" );
$this->{ON}->{'error'}->($error) if $this->{ON}->{'error'};
});
# Then, connects to Centrifugo
$this->_send_message( {
method => 'connect',
UID => $this->{_cnx_uid},
params => $this->{_cnx_params}
} );
}
# This function is called when client is connected to Centrifugo
sub _on_connect {
my ($this, $body) = @_;
$this->_debug( "Centrifugo::Client : Connected to Centrifugo : ".encode_json $body );
# on Connect, the client_id must be read (if available)
if ($body && ref($body) eq 'HASH' && $body->{client}) {
$this->{CLIENT_ID} = $body->{client};
$this->_debug( "Centrifugo::Client : CLIENT_ID=".$this->{CLIENT_ID} );
}
$this->_init_keep_alive_timer() if $this->{MAX_ALIVE};
$this->_reset_reconnect_sequence();
$this->_resubscribe() if $this->{RESUBSCRIBE};
}
# This function is called when client receives a message
sub _on_message {
my ($this, $body) = @_;
my $uid = $body->{uid};
my $channel = $body->{channel};
$this->_debug( "Centrifugo::Client : Message from $channel : ".encode_json $body->{data} );
$this->{_channels}->{ $channel }->{last} = $uid; # Keeps track of last IDs of messages
}
# This function is called when client is connected to Centrifugo
sub _on_subscribe {
my ($this, $body) = @_;
my $channel = $body->{channel};
$this->_debug( "Centrifugo::Client : Subscribed to $channel : ".encode_json $body );
if ($body->{recovered} == JSON::true) {
# Re-emits the lost messages
my $messages = $body->{messages};
foreach my $message (reverse @$messages) {
$this->_on_message($message);
my $sub = $this->{ON}->{message};
$sub->($message) if $sub;
}
}
# Keeps track of channels
$channel=~s/&.*/&/; # Client channel boundary
$this->{_channels}->{ $channel } = $body;
$this->{_subscribed_channels}->{ $channel } = 1; # TEST if it worked
delete $this->{_pending_subscriptions}->{ $channel };
}
# This function is called when client is connected to Centrifugo
sub _on_unsubscribe {
my ($this, $body) = @_;
my $channel = $body->{channel};
$this->_debug( "Centrifugo::Client : Unsubscribed from $body->{channel} : ".encode_json $body );
# Keeps track of channels
$channel=~s/&.*/&/; # Client channel boundary
delete $this->{_channels}->{ $channel };
delete $this->{_subscribed_channels}->{ $channel };
delete $this->{_pending_subscriptions}->{ $channel };
}
# This function automatically reconnects to channels
sub _resubscribe {
my ($this) = @_;
foreach my $channel (keys %{$this->{_channels}}) {
$this->_debug( "Centrifugo::Client : Resubscribe to $channel" );
$channel=~s/&.*/&/; # Client channel boundary
my $params = {
channel => $channel
};
if ($this->{RECOVER} && $this->{_channels}->{$channel}->{last}) {
$params->{recover}=JSON::true;
$params->{last}=$this->{_channels}->{$channel}->{last};
}
$this->subscribe( %$params );
lib/Centrifugo/Client.pm view on Meta::CPAN
undef $this->{CLIENT_ID};
delete $this->{_subscribed_channels};
delete $this->{_pending_subscriptions};
$this->_reconnect();
}
# This function is called if an errors occurs with the server
sub _on_error {
my ($this, @infos) = @_;
warn "Error in Centrifugo::Client : @infos";
$this->{ON}->{'error'}->(@infos) if $this->{ON}->{'error'};
}
# This function is called once for each message received from Centrifugo
sub _on_ws_message {
my ($this, $message) = @_;
$this->_debug_ws("Send > WebSocket : $message->{body}");
$this->{_last_alive_message} = time();
my $fullbody = decode_json($message->{body}); # The body of websocket message
# Handle a body containing {response} : converts into a singleton
if (ref($fullbody) eq 'HASH') {
$fullbody = [ $fullbody ];
}
# Handle the body which is now an array of response
foreach my $info (@$fullbody) {
my $uid = $info->{uid};
my $method = $info->{method};
my $body = $info->{body}; # The body of Centrifugo message
$this->_on_connect( $body ) if $method eq 'connect';
$this->_on_subscribe( $body ) if $method eq 'subscribe';
$this->_on_unsubscribe( $body ) if $method eq 'unsubscribe';
$this->_on_message( $body ) if $method eq 'message';
# Call the callback of the method
my $sub = $this->{ON}->{$method};
if ($sub) { # TODO : CHECK THIS !!!
# Add UID into body if available
if ($uid) {
$body->{uid}=$uid;
}
$sub->( $body );
}
}
}
# Inits the Fibonacci sequence for reconnection retries
sub _reset_reconnect_sequence {
my ($this) = @_;
$this->{_last_retry} = 0;
$this->{_next_retry} = $this->{RETRY};
}
# Reconnects to the server after a loss of connection
# When client disconnected from server it will automatically try to reconnect using
# fibonacci sequence to get interval between reconnect attempts which value grows exponentially. (why not ?)
sub _reconnect {
my ($this) = @_;
my $retry_after = $this->{_next_retry} > $this->{MAX_RETRY} ? $this->{MAX_RETRY} : $this->{_next_retry};
$retry_after = int($retry_after) if $retry_after > 3;
$this->_debug( "Centrifugo::Client : will reconnect after $retry_after s." );
$this->{reconnect_handler} = AnyEvent->timer(
after => $retry_after,
cb => sub {
$this->{_next_retry} += $this->{_last_retry};
$this->{_last_retry} = $retry_after;
$this->_connect();
}
);
}
# Creates the timer to send periodic ping
sub _init_keep_alive_timer {
my ($this) = @_;
$this->{_alive_handler} = AnyEvent->timer(
after => $this->{REFRESH},
interval => $this->{REFRESH},
cb => sub {
my $late = time() - $this->{_last_alive_message};
if ($late > $this->{MAX_ALIVE}) {
$this->_debug( "Sending ping (${late}s without message)" );
$this->ping();
}
}
);
}
=head1 FUNCTION publish - allows clients directly publish messages into channel (use with caution. Client->Server communication is NOT the aim of Centrifugo)
$client->publish( channel=>$channel, data=>$data, [uid => $uid] );
$data must be a HASHREF to a structure (which will be encoded to JSON), for example :
$client->public ( channel => "public",
data => {
nick => "Anonymous",
text => "My message",
} );
or even :
$client->public ( channel => "public", data => { } ); # Sends an empty message to the "public" channel
This function returns the UID used to send the command to the server. (a random string if none is provided)
=cut
sub publish {
my ($this, %PARAMS) = @_;
croak("Missing channel in Centrifugo::Client->publish(...)") unless $PARAMS{channel};
croak("Missing data in Centrifugo::Client->publish(...)") unless $PARAMS{data};
my $uid = $PARAMS{'uid'} || _generate_random_id();
delete $PARAMS{'uid'};
$PARAMS{channel}=~s/&.*/'&' . $this->client_id()/e; # Client channel boundary
$this->_send_message({
UID => $uid,
method => 'publish',
params => \%PARAMS
});
return $uid;
}
=head1 FUNCTION disconnect
$client->disconnect();
=cut
sub disconnect {
my ($this) = @_;
$this->{WSHANDLE}->close() if $this->{WSHANDLE};
my $sub = $this->{ON}->{'disconnect'};
$sub->() if $sub;
}
=head1 FUNCTION subscribe - allows to subscribe on channel after client successfully connected.
( run in 0.844 second using v1.01-cache-2.11-cpan-39bf76dae61 )