Centrifugo-Client

 view release on metacpan or  search on metacpan

lib/Centrifugo/Client.pm  view on Meta::CPAN

	$this->{MAX_RETRY} = delete $params{max_retry} || 30;
	$this->{RESUBSCRIBE} = ! defined $params{resubscribe} || $params{resubscribe}!~/^(0|false|no)$/i; delete $params{resubscribe};
	$this->{RECOVER} = $params{recover} && $params{recover}!~/^(0|false|no)$/i; delete $params{recover};
	croak "Centrifugo::Client : Unknown parameter : ".join',',keys %params if %params;
	return $this;
}

=head1 FUNCTION connect - send authorization parameters to Centrifugo so that your connection can start subscribing to channels.

$client->connect(
		user => $USER_ID,
		timestamp => $TIMESTAMP,
		token => $TOKEN,
		[info => $info,]
		[uid => $uid,]
		);

This function returns $self to allow chaining of multiple function calls.
		
It is possible to provide a UID for this command, but if you don't, a random one will be generated for you and cannot be retrieved afterward.

=cut

# Stores the Centrifugo connection parameters on the client, then opens the
# WebSocket.
#
# Named parameters :
#   user, timestamp, token : mandatory, croaks when one of them is undefined
#                            or empty
#   uid                    : optional ID of the connect command ; a random one
#                            is generated when it is not provided
#   every other pair (e.g. info) is kept untouched in the connect parameters
#
# Returns $this, so that calls can be chained.
sub connect {
	my ($this,%PARAMS) = @_;
	# "Missing" means undefined or empty : a plain truth test would also
	# reject valid values such as the user ID "0".
	foreach my $name (qw(user timestamp token)) {
		my $value = $PARAMS{$name};
		croak("Missing $name in Centrifugo::Client->connect(...)")
			if !defined($value) || !length($value);
	}
	# Fix parameters sent to Centrifugo
	$PARAMS{timestamp} = "$PARAMS{timestamp}"; # This MUST be a string
	# Save the Centrifugo connection parameters
	# (uid is removed from %PARAMS : it is stored apart from the parameters)
	my $uid = delete $PARAMS{uid};
	$uid = _generate_random_id() if !defined($uid) || !length($uid);
	$this->{_cnx_uid} = $uid;
	$this->{_cnx_params} = \%PARAMS;
	
	# Connects to Websocket
	$this->_reset_reconnect_sequence();
	$this->_connect();
	return $this;
}

# (Re)opens the WebSocket towards $this->{WS_URL}.
# The attempt is asynchronous : when it completes, the resulting handle is kept
# in $this->{WSHANDLE}. A failure is reported through _on_error() and followed
# by _reconnect() ; a success hands over to _on_ws_connect().
sub _connect {
	my $self = shift;
	my $attempt = $self->{WEBSOCKET}->connect( $self->{WS_URL} );
	$attempt->cb(sub {
		my ($outcome) = @_;
		# The eval traps an error raised while fetching the handle
		$self->{WSHANDLE} = eval { $outcome->recv };
		# The websocket connection is OK
		return $self->_on_ws_connect() unless $@;
		$self->_on_error($@);
		$self->_reconnect();
		return;
	});
}

# Called once the WebSocket is open : plugs the WebSocket events into the
# client, then sends the Centrifugo 'connect' command built from the
# parameters saved in _cnx_uid and _cnx_params.
sub _on_ws_connect {
	my $self = shift;
	$self->_debug( "Centrifugo::Client : WebSocket connected to $self->{WS_URL}" );
	
	# Event handlers, registered on the WebSocket handle in this order
	my @handlers = (
		# Every incoming message is forwarded to the client
		each_message => sub { $self->_on_ws_message($_[1]) },
		# End of the connection : forwards the close reason given by the handle
		finish => sub {
			my ($cnx) = @_;
			$self->_on_close( $cnx->close_reason() );
		},
		# Protocol error : logged, then given to the 'error' listener (if any)
		parse_error => sub {
			my (undef, $error) = @_;
			$self->_debug( "Error in Centrifugo::Client : $error" );
			my $listener = $self->{ON}->{'error'};
			$listener->($error) if $listener;
		},
	);
	while (my ($event, $code) = splice @handlers, 0, 2) {
		$self->{WSHANDLE}->on($event => $code);
	}
	
	# Then, connects to Centrifugo
	my %command = (
		method => 'connect',
		UID    => $self->{_cnx_uid},
		params => $self->{_cnx_params},
	);
	$self->_send_message( \%command );
}

# This function is called when client is connected to Centrifugo.
# $body is the body received for the 'connect' command : when it is a hash
# holding a 'client' entry, that value is kept as CLIENT_ID.
# Afterwards the keep-alive timer is started (only if MAX_ALIVE is set), the
# reconnect sequence is reset and the channels are resubscribed (only if
# RESUBSCRIBE is set).
sub _on_connect {
	my ($this, $body) = @_;
	# The JSON dump only feeds the debug trace : a body that cannot be encoded
	# must not abort the steps below, so the encoding error is trapped.
	my $dump = do { local $@; eval { encode_json($body) } };
	$dump = '(not JSON-encodable)' unless defined $dump;
	$this->_debug( "Centrifugo::Client : Connected to Centrifugo : ".$dump );
	# on Connect, the client_id must be read (if available)
	if ($body && ref($body) eq 'HASH' && $body->{client}) {
		$this->{CLIENT_ID} = $body->{client};
		$this->_debug( "Centrifugo::Client : CLIENT_ID=".$this->{CLIENT_ID} );
	}
	$this->_init_keep_alive_timer() if $this->{MAX_ALIVE};
	$this->_reset_reconnect_sequence();
	$this->_resubscribe() if $this->{RESUBSCRIBE};
}

# This function is called when client receives a message.
# $body is the message : its 'uid' is recorded as the last message ID of its
# 'channel', in $this->{_channels}.
# NOTE(review): the channel name is used as-is for the key, whereas
# _on_subscribe() shortens it with s/&.*/&/ before storing it in _channels -
# confirm whether 'last' should be recorded under the shortened name as well.
sub _on_message {
	my ($this, $body) = @_;
	my $uid = $body->{uid};
	my $channel = $body->{channel};
	# The JSON dump only feeds the debug trace : data that cannot be encoded
	# must not prevent the tracking below, so the encoding error is trapped.
	my $dump = do { local $@; eval { encode_json($body->{data}) } };
	$dump = '(not JSON-encodable)' unless defined $dump;
	$this->_debug( "Centrifugo::Client : Message from $channel : ".$dump );
	$this->{_channels}->{ $channel }->{last} = $uid; # Keeps track of last IDs of messages
}

# This function is called with the body received for a subscription.
# When the body is flagged as 'recovered', its 'messages' are re-emitted, in
# reverse order of the list, to _on_message() and to the 'message' listener.
# The channel is then registered in _channels and _subscribed_channels and
# removed from _pending_subscriptions, under a name where everything following
# the first '&' has been dropped.
sub _on_subscribe {
	my ($this, $body) = @_;
	my $channel = $body->{channel};
	$this->_debug( "Centrifugo::Client : Subscribed to $channel : ".encode_json($body) );
	# The flag is tested for truth : comparing it with '== JSON::true' raises an
	# "uninitialized value" warning whenever the body has no 'recovered' entry.
	if ($body->{recovered}) {
		# Re-emits the lost messages
		# (a missing or malformed list is handled as an empty one)
		my $messages = $body->{messages};
		$messages = [] unless ref($messages) eq 'ARRAY';
		foreach my $message (reverse @$messages) {
			$this->_on_message($message);
			my $sub = $this->{ON}->{message};
			$sub->($message) if $sub;
		}
	}
	# Keeps track of channels
	$channel=~s/&.*/&/; # Client channel boundary
	$this->{_channels}->{ $channel } = $body;
	$this->{_subscribed_channels}->{ $channel } = 1; # TEST if it worked
	delete $this->{_pending_subscriptions}->{ $channel };
}



( run in 0.896 second using v1.01-cache-2.11-cpan-5837b0d9d2c )