Argon
view release on metacpan or search on metacpan
lib/Argon/Client.pm view on Meta::CPAN
if ($msg->denied && $info->{retry}) {
my $copy = $info->{orig}->copy;
my $intvl = $info->{intvl}->();
log_debug 'Retrying message in %0.2fs: %s', $intvl, $info->{orig}->explain;
$self->add_msg($copy->id, {
orig => $copy,
cb => $info->{cb},
intvl => $info->{intvl},
retry => 1,
timer => AnyEvent->timer(after => $intvl, cb => K('send', $self, $copy)),
});
return;
}
if ($info->{cb}) {
$info->{cb}->($msg);
}
else {
$self->notify->($msg);
lib/Argon/Client.pm view on Meta::CPAN
Argon network.
=head2 port
The port number for the L<Argon::Manager>.
=head2 retry
By default, when the network is at capacity, new tasks may be rejected, causing
L<Argon::Message/result> to croak. If C<retry> is set, the C<Argon::Client>
will instead retry the task on a logarithmic backoff timer until the task is
accepted by the manager.
=head2 opened
A code ref that is triggered when the connection is initially opened.
=head2 ready
A code ref that is triggered when the connection has been opened and the
client is ready to begin sending tasks.
lib/Argon/Test.pm view on Meta::CPAN
sub ar_test {
my $name = shift;
my $code = pop;
my $timeout = shift || $DEFAULT_TIMEOUT;
subtest $name => sub {
my $cv = AnyEvent->condvar;
my $guard = AnyEvent::Util::guard { $cv->send };
my $timer = AnyEvent->timer(
after => $timeout,
cb => sub { $cv->croak("Failsafe timeout triggered after $timeout seconds") },
);
$code->($cv);
undef $timer;
};
}
sub channel_pair {
my ($cb1, $cb2) = @_;
$cb1 ||= {};
$cb2 ||= {};
my ($fh1, $fh2) = AnyEvent::Util::portable_socketpair;
AnyEvent::Util::fh_nonblocking($fh1, 1);
lib/Argon/Worker.pm view on Meta::CPAN
required => 1,
);
has mgr_port => (
is => 'ro',
isa => 'Int',
required => 1,
);
has timer => (
is => 'rw',
isa => 'Any',
);
has tries => (
is => 'rw',
isa => 'Int',
default => 0,
);
lib/Argon/Worker.pm view on Meta::CPAN
sub start {
my $self = shift;
$self->connect;
}
sub connect {
my $self = shift;
$self->timer(undef);
$self->tries($self->tries + 1);
log_trace 'Connecting to manager (attempt %d)', $self->tries;
$self->mgr(Argon::Client->new(
key => $self->key,
token => $self->token,
host => $self->mgr_host,
port => $self->mgr_port,
ready => K('register', $self),
closed => K('_disconnected', $self),
notify => K('_queue', $self),
));
$self->intvl->(1); # reset
}
sub _disconnected {
my $self = shift;
log_debug 'Manager disconnected' unless $self->timer;
$self->reconnect;
}
sub reconnect {
my $self = shift;
my $intvl = $self->intvl->();
$self->timer(AnyEvent->timer(after => $intvl, cb => K('connect', $self)));
log_debug 'Reconection attempt in %0.4fs', $intvl;
}
sub register {
my $self = shift;
log_note 'Connected to manager';
log_trace 'Registering with manager';
my $msg = Argon::Message->new(
cmd => $HIRE,
( run in 0.985 second using v1.01-cache-2.11-cpan-49f99fa48dc )