AnyEvent-Beanstalk-Worker

 view release on metacpan or  search on metacpan

lib/AnyEvent/Beanstalk/Worker.pm  view on Meta::CPAN

        AnyEvent->condvar(
            cb => sub {
                if ( ref( $self->{_cb}->{$evt} ) eq 'CODE' ) {
                    $self->{_log}->{trace}->("event: $evt");
                    my @data = $_[0]->recv;
                    $self->{_log}->{debug}->(
                        "shift event ($evt): " . shift @{ $self->{_events} } );
                    $self->{_log}->{debug}->(
                        "EVENTS (s): " . join( ' ' => @{ $self->{_events} } ) );
                    $self->{_cb}->{$evt}->(@data);
                }

                $self->{_event}->{$evt} = AnyEvent->condvar( cb => __SUB__ );
            }
        );
      }
      ->($event);
}

# Queue an event and fire the condition variable stored for it.
#
# Arguments:
#   $event - name of the event to emit
#   @_     - optional payload; the condvar is sent the worker object
#            followed by this payload
#
# Returns whatever the condvar's send() returns.
#
# Dies (via Carp::croak) when no condvar is stored under
# $self->{_event}{$event}. The check runs before the event is pushed onto
# the pending-event list, so a failed emit leaves that list untouched
# instead of stranding an entry that nothing will ever shift off.
sub emit {
    my $self  = shift;
    my $event = shift;

    my $cv = $self->{_event}->{$event};
    unless ( defined $cv ) {
        require Carp;
        Carp::croak(
            "cannot emit event '$event': no listener is registered for it");
    }

    $self->{_log}->{debug}->("push event ($event)");
    push @{ $self->{_events} }, $event;
    $self->{_log}->{debug}
      ->( "EVENTS (p): " . join( ' ' => @{ $self->{_events} } ) );

    return $cv->send( $self, @_ );
}

# Accessor for the beanstalk client held by this worker.
#
# Called with arguments, a fresh AnyEvent::Beanstalk object is built from
# them and stored in $self->{_beanstalk}. Called without arguments, the
# stored value is left alone. Either way the stored value is returned.
sub beanstalk {
    my ( $self, @client_args ) = @_;

    if (@client_args) {
        $self->{_beanstalk} = AnyEvent::Beanstalk->new(@client_args);
    }

    return $self->{_beanstalk};
}

# Number of entries currently held in the worker's _jobs hash. The result
# is forced to a count regardless of the caller's context.
sub job_count {
    my $self = shift;
    return scalar keys %{ $self->{_jobs} };
}

# Read-only accessor: returns the value stored in $self->{_handled_jobs}.
sub handled_jobs {
    my ($self) = @_;
    return $self->{_handled_jobs};
}

# Read-only accessor: returns the value stored in $self->{_max_jobs}.
sub max_jobs {
    my ($self) = @_;
    return $self->{_max_jobs};
}

# Combined getter/setter for the value stored in $self->{_concurrency}.
#
# With an argument, the first argument is stored (even when it is undef or
# 0, since only the presence of an argument is tested). The stored value is
# returned in both cases.
sub concurrency {
    my ( $self, @args ) = @_;

    $self->{_concurrency} = $args[0] if @args;

    return $self->{_concurrency};
}

1;
__END__

=head1 NAME

AnyEvent::Beanstalk::Worker - Event-driven FSA for beanstalk queues

=head1 SYNOPSIS

  use AnyEvent::Beanstalk::Worker;
  use Data::Dumper;
  use JSON;

  my $w = AnyEvent::Beanstalk::Worker->new(
      concurrency       => 10,
      initial_state     => 'reserved',
      beanstalk_watch   => 'jobs',
      beanstalk_decoder => sub {
          eval { decode_json(shift) };
      }
  );

  $w->on(reserved => sub {
      my $self = shift;
      my ($qjob, $qresp) = @_;

      say "Got a job: " . Dumper($qjob->decode);

      $self->emit( my_next_state => $qjob );
  });

  $w->on(my_next_state => sub {
      my $self = shift;
      my $job  = shift;

      ## do something with job
      ...

      ## maybe not ready yet?
      unless ($job_is_ready) {
          return $self->finish(release => $job->id, { delay => 60 });
      }

      ## all done!
      $self->finish(delete => $job->id);
  });

  $w->start;
  AnyEvent->condvar->recv;

=head1 DESCRIPTION

B<AnyEvent::Beanstalk::Worker> implements a simple, abstract
finite-state automaton for beanstalk queues. It can handle a
configurable number of concurrent jobs, and implements graceful worker
shutdown.

You are encouraged to subclass B<AnyEvent::Beanstalk::Worker> and
implement your own B<init> function, for example, so your object has
access to anything you need in subsequent states.

The L</SUPPLEMENTAL> section below contains additional information
about the various technologies this module uses.

=head1 METHODS

B<AnyEvent::Beanstalk::Worker> implements these methods:

=head2 new

Create a new object. The B<new> method accepts the following arguments:



( run in 1.258 second using v1.01-cache-2.11-cpan-75ffa21a3d4 )