Async-Redis
view release on metacpan or search on metacpan
examples/pagi-chat/lib/ChatApp/State.pm view on Meta::CPAN
#
use strict;
use warnings;
use Future;
use Future::AsyncAwait;
use Future::Selector;
use Exporter 'import';
use JSON::MaybeXS;
use Time::HiRes qw(time);
use Scalar::Util qw(weaken);
use Future::IO;
our @EXPORT_OK = qw(
init_redis get_redis get_pubsub
get_session create_session update_session remove_session
get_session_by_name set_session_connected set_session_disconnected
is_session_connected
get_room add_room remove_room get_all_rooms
add_user_to_room remove_user_from_room get_room_users
add_message get_room_messages
lib/Async/Redis.pm view on Meta::CPAN
use strict;
use warnings;
use 5.018;
our $VERSION = '0.002000';
use Future;
use Future::AsyncAwait;
use Future::IO 0.23;
use Future::Selector 0.05;
use Scalar::Util qw(blessed weaken);
use Socket qw(pack_sockaddr_in pack_sockaddr_un inet_aton AF_INET AF_UNIX SOCK_STREAM);
use IO::Handle ();
use IO::Socket::INET;
use Time::HiRes ();
# Error classes
use Async::Redis::Error::Connection;
use Async::Redis::Error::Timeout;
use Async::Redis::Error::Disconnected;
use Async::Redis::Error::Redis;
lib/Async/Redis.pm view on Meta::CPAN
# Fires _resume_after_reconnect on the subscription on success, or
# _fail_fatal on unrecoverable reconnect failure.
sub _reconnect_async {
my ($self, $sub) = @_;
# Dedup against any reconnect already in progress (from either this
# path or _ensure_connected). The slot is the shared signal.
return if $self->{_reconnect_future}
&& !$self->{_reconnect_future}->is_ready;
weaken(my $weak_self = $self);
weaken(my $weak_sub = $sub);
my $f = (async sub {
# Reconnect the socket. _reconnect handles retry/backoff and
# dies with Disconnected if reconnect_max_attempts is exhausted.
await $weak_self->_reconnect;
# Delegate the replay, on_reconnect, and driver-restart work to
# the subscription's unified resume path. _resume_after_reconnect
# handles clearing _paused, setting in_pubsub, replaying all
# tracked channels/patterns, firing on_reconnect, and starting
lib/Async/Redis/Subscription.pm view on Meta::CPAN
package Async::Redis::Subscription;
use strict;
use warnings;
use 5.018;
use Carp ();
use Future;
use Future::AsyncAwait;
use Future::IO;
use Scalar::Util qw(blessed refaddr weaken);
# Threshold for periodic event-loop yield inside the callback driver
# loop. Prevents stack growth when many messages are pre-queued and
# await on an already-ready Future returns synchronously.
use constant MAX_SYNC_DEPTH => 32;
sub new {
my ($class, %args) = @_;
lib/Async/Redis/Subscription.pm view on Meta::CPAN
my $redis = $self->{redis};
my $depth = ($redis && $redis->{message_queue_depth})
? $redis->{message_queue_depth}
: 0; # 0 = unbounded (default)
if ($depth && scalar(@{$self->{_pending_messages}}) >= $depth) {
# Queue full. Return a Future that queues the message once a slot
# opens (signalled by next() calling _slot_waiter->done).
$self->{_slot_waiter} //= Future->new;
my $slot = $self->{_slot_waiter};
weaken(my $weak = $self);
return $slot->then(sub {
return Future->done if !$weak || $weak->{_closed};
push @{$weak->{_pending_messages}}, $msg;
if (my $w = delete $weak->{_message_waiter}) {
$w->done unless $w->is_ready;
}
Future->done;
});
}
( run in 2.687 seconds using v1.01-cache-2.11-cpan-39bf76dae61 )