Argon

 view release on metacpan or  search on metacpan

lib/Argon/Client.pm  view on Meta::CPAN

    if ($msg->denied && $info->{retry}) {
      my $copy  = $info->{orig}->copy;
      my $intvl = $info->{intvl}->();
      log_debug 'Retrying message in %0.2fs: %s', $intvl, $info->{orig}->explain;

      $self->add_msg($copy->id, {
        orig  => $copy,
        cb    => $info->{cb},
        intvl => $info->{intvl},
        retry => 1,
        timer => AnyEvent->timer(after => $intvl, cb => K('send', $self, $copy)),
      });

      return;
    }

    if ($info->{cb}) {
      $info->{cb}->($msg);
    }
    else {
      $self->notify->($msg);

lib/Argon/Client.pm  view on Meta::CPAN

Argon network.

=head2 port

The port number for the L<Argon::Manager>.

=head2 retry

By default, when the network is at capacity, new tasks may be rejected, causing
L<Argon::Message/result> to croak. If C<retry> is set, the C<Argon::Client>
will instead retry the task on a logarithmic backoff timer until the task is
accepted by the manager.

=head2 opened

A code ref that is triggered when the connection is initially opened.

=head2 ready

A code ref that is triggered when the connection has been opened and the
client is ready to begin sending tasks.

lib/Argon/Test.pm  view on Meta::CPAN


# ar_test NAME, [TIMEOUT,] CODE
#
# Runs CODE inside a subtest called NAME. CODE receives a fresh AnyEvent
# condvar as its only argument. A failsafe timer croaks on that condvar if
# TIMEOUT seconds elapse first; a false or missing TIMEOUT falls back to
# $DEFAULT_TIMEOUT.
sub ar_test {
  my ($name, @rest) = @_;
  my $code    = pop @rest;                          # the test body is always the last argument
  my $timeout = shift(@rest) || $DEFAULT_TIMEOUT;   # optional middle argument

  subtest $name => sub {
    my $cv = AnyEvent->condvar;

    # When the guard goes out of scope it sends on the condvar.
    my $guard = AnyEvent::Util::guard { $cv->send };

    my $on_timeout = sub {
      $cv->croak("Failsafe timeout triggered after $timeout seconds");
    };

    my $timer = AnyEvent->timer(after => $timeout, cb => $on_timeout);

    $code->($cv);

    # Cancel the failsafe once the test body has returned.
    $timer = undef;
  };
}

sub channel_pair {
  my ($cb1, $cb2) = @_;
  $cb1 ||= {};
  $cb2 ||= {};

  my ($fh1, $fh2) = AnyEvent::Util::portable_socketpair;
  AnyEvent::Util::fh_nonblocking($fh1, 1);

lib/Argon/Worker.pm  view on Meta::CPAN

  required => 1,
);


# Required, read-only integer; handed to Argon::Client->new as `port` when the
# worker connects to the manager.
has mgr_port => (isa => 'Int', is => 'ro', required => 1);

# Read-write slot for the pending reconnect timer: set by reconnect() and
# cleared (set to undef) at the start of connect().
has timer => (isa => 'Any', is => 'rw');

# Read-write counter of connection attempts; starts at 0 and is incremented by
# connect() before each attempt.
has tries => (isa => 'Int', is => 'rw', default => 0);

lib/Argon/Worker.pm  view on Meta::CPAN



# Starts the worker by making the first connection attempt to the manager.
# Returns whatever connect() returns.
sub start {
  my ($self) = @_;
  return $self->connect;
}


# Makes one connection attempt to the manager: clears any pending reconnect
# timer, bumps the attempt counter, builds a new Argon::Client stored in
# `mgr`, and finally resets the backoff interval generator.
sub connect {
  my ($self) = @_;

  # Drop the reconnect timer (if any) that scheduled this call.
  $self->timer(undef);

  my $attempt = $self->tries + 1;
  $self->tries($attempt);

  log_trace('Connecting to manager (attempt %d)', $attempt);

  # Kept as an ordered list so the constructor sees the same argument order.
  my @client_args = (
    key    => $self->key,
    token  => $self->token,
    host   => $self->mgr_host,
    port   => $self->mgr_port,
    ready  => K('register', $self),       # client ready  -> register
    closed => K('_disconnected', $self),  # client closed -> schedule reconnect
    notify => K('_queue', $self),
  );

  $self->mgr(Argon::Client->new(@client_args));

  # NOTE(review): the interval is reset on every attempt, not only after a
  # successful connection -- confirm this does not defeat the backoff used by
  # reconnect().
  $self->intvl->(1); # reset
}

# Callback registered as the client's `closed` handler in connect(). Logs the
# disconnect only when no reconnect timer is currently set, then schedules a
# reconnection attempt.
sub _disconnected {
  my ($self) = @_;

  if (!$self->timer) {
    log_debug('Manager disconnected');
  }

  return $self->reconnect;
}

# Schedules the next connection attempt. Pulls the next delay (in seconds)
# from the `intvl` generator, stores an AnyEvent timer in `timer` that will
# call connect() after that delay, and logs the delay.
sub reconnect {
  my $self = shift;
  my $intvl = $self->intvl->();

  # The timer object must be kept alive in the attribute; AnyEvent cancels a
  # timer as soon as its watcher object is destroyed.
  $self->timer(AnyEvent->timer(after => $intvl, cb => K('connect', $self)));

  # Spelling corrected from 'Reconection' so the message can be found in logs.
  log_debug 'Reconnection attempt in %0.4fs', $intvl;
}

sub register {
  my $self = shift;
  log_note 'Connected to manager';
  log_trace 'Registering with manager';

  my $msg = Argon::Message->new(
    cmd  => $HIRE,



( run in 0.985 second using v1.01-cache-2.11-cpan-49f99fa48dc )