AnyEvent-Beanstalk-Worker

lib/AnyEvent/Beanstalk/Worker.pm

      ->($event);
}

sub emit {
    my $self  = shift;
    my $event = shift;
    $self->{_log}->{debug}->("push event ($event)");
    push @{ $self->{_events} }, $event;
    $self->{_log}->{debug}
      ->( "EVENTS (p): " . join( ' ' => @{ $self->{_events} } ) );
    $self->{_event}->{$event}->send( $self, @_ );
}

sub beanstalk {
    my $self = shift;
    $self->{_beanstalk} = AnyEvent::Beanstalk->new(@_) if @_;
    return $self->{_beanstalk};
}

sub job_count { scalar keys %{ $_[0]->{_jobs} } }

sub handled_jobs { $_[0]->{_handled_jobs} }

sub max_jobs { $_[0]->{_max_jobs} }

sub concurrency {
    my $self = shift;

    if (@_) {
        $self->{_concurrency} = shift;
    }
    return $self->{_concurrency};
}

1;
__END__

=head1 NAME

AnyEvent::Beanstalk::Worker - Event-driven FSA for beanstalk queues

=head1 SYNOPSIS

  use AnyEvent::Beanstalk::Worker;
  use Data::Dumper;
  use JSON;
  use feature 'say';

  my $w = AnyEvent::Beanstalk::Worker->new(
      concurrency       => 10,
      initial_state     => 'reserved',
      beanstalk_watch   => 'jobs',
      beanstalk_decoder => sub {
          eval { decode_json(shift) };
      }
  );

  $w->on(reserved => sub {
      my $self = shift;
      my ($qjob, $qresp) = @_;

      say "Got a job: " . Dumper($qjob->decode);

      $self->emit( my_next_state => $qjob );
  });

  $w->on(my_next_state => sub {
      my $self = shift;
      my $job  = shift;

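      ## do something with job, then decide whether it's ready
      ## (job_is_ready() is a hypothetical helper you supply)
      my $job_is_ready = job_is_ready($job);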

      ## maybe not ready yet?
      unless ($job_is_ready) {
          return $self->finish(release => $job->id, { delay => 60 });
      }

      ## all done!
      $self->finish(delete => $job->id);
  });

  $w->start;
  AnyEvent->condvar->recv;
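
The C<beanstalk_decoder> above wraps C<decode_json> in an C<eval>, so a malformed job payload decodes to C<undef> rather than raising an exception. The sketch below checks that behavior in isolation. It uses the core B<JSON::PP> module instead of B<JSON> so that it needs only core modules; both modules export C<decode_json>. The sample payloads are made up for illustration.

    use strict;
    use warnings;
    use JSON::PP qw(decode_json);

    # Same shape as the synopsis decoder: decode_json dies on invalid
    # input, and eval turns that failure into an undef return value.
    my $decoder = sub {
        eval { decode_json(shift) };
    };

    my $good = $decoder->('{"url":"http://example.com/"}');
    my $bad  = $decoder->('not json');

    print "url: $good->{url}\n";                        # url: http://example.com/
    print defined $bad ? "decoded\n" : "undecodable\n"; # undecodable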

=head1 DESCRIPTION

B<AnyEvent::Beanstalk::Worker> implements a simple, abstract
finite-state automaton for beanstalk queues. It can handle a
configurable number of concurrent jobs, and implements graceful worker
shutdown.

You are encouraged to subclass B<AnyEvent::Beanstalk::Worker> and
implement your own B<init> function, for example, so your object has
access to anything you need in subsequent states.

The L</SUPPLEMENTAL> section below contains additional information
about the various technologies this module uses.

=head1 METHODS

B<AnyEvent::Beanstalk::Worker> implements these methods:

=head2 new

Create a new object. The B<new> method accepts the following arguments:

=over 4

=item initial_state

Specify an initial state to move to after a job has been reserved. The
handler for this state should expect to receive an
B<AnyEvent::Beanstalk::Job> object and the beanstalk queue response (a
string such as "RESERVED"). Default is undefined--you should supply an
initial state if you want your worker to do anything more than
accepting and deleting jobs from the queue.

=item concurrency

How many concurrent jobs this worker will handle. Set this to a higher
number to process more jobs simultaneously. Defaults to 1.

done, it deletes the job from the queue and asks for another job.

=head2 Introduction to AnyEvent

B<AnyEvent> is an elegantly designed, generic interface to a variety
of event loops.
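
As a minimal sketch of that interface, the snippet below uses two core B<AnyEvent> pieces. The first is C<< AnyEvent->timer >> watchers. The second is a condition variable (C<< AnyEvent->condvar >>), whose C<recv> runs whichever event loop B<AnyEvent> detected until C<send> is called. C<AnyEvent::detect> reports which backend was chosen. The timer delays and the C<@fired> list are illustrative choices, not anything this module requires.

    use strict;
    use warnings;
    use AnyEvent;

    my @fired;
    my $cv = AnyEvent->condvar;

    # Watchers stay active only while their guard objects are alive,
    # so keep them in variables.
    my $late = AnyEvent->timer(after => 0.2, cb => sub {
        push @fired, 'late';
        $cv->send(scalar @fired);    # wakes up recv() below
    });
    my $early = AnyEvent->timer(after => 0.1, cb => sub {
        push @fired, 'early';
    });

    # recv() runs the underlying event loop until send() is called.
    my $count = $cv->recv;

    # Name of the backend AnyEvent picked (EV, its pure-Perl loop, ...).
    my $model = AnyEvent::detect();

    print "$count timers fired: @fired (backend: $model)\n";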

=head2 Introduction to state machines

The idea behind state machines is that you have a "machine" (or a
program modeling a machine) with a set of I<states> and a set of
events that, when triggered, change the state of the machine. For
example, we could model a web crawler as a state machine. Our states
will be I<get url>, I<fetch>, I<parse>, and I<add url>, and our events
will be I<got url>, I<fetched>, I<parsed>, and I<added>.

                +---------+
                | get url |
                +-/-----^-+
      (got url)  /       \
                /         \ (added)
         +-----v-+     +---\-----+
         | fetch |     | add url |
         +-----\-+     +-^-------+
      (fetched) \       /
                 \     / (parsed)
                +-v---/-+
                | parse |
                +-------+

In the I<get url> state, we take a URL from a list of URLs (perhaps
seeded with a single URL), then emit the I<got url> event. This moves
our machine to the I<fetch> state, where we make an HTTP C<GET>
request on that URL and then emit the I<fetched> event, which moves
our machine to the I<parse> state where we parse the incoming web
page. Emitting I<parsed> moves us to I<add url>, where we add any URLs
we found to the queue; the I<added> event then returns us to I<get
url> to start over.
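
The crawler's states and events can also be written down as plain data. The following core-Perl sketch is a hypothetical, table-driven version of the diagram above. Each state's handler (stubbed out here) returns the event it would emit, and C<%next_state> maps that event to the following state. This is not how B<AnyEvent::Beanstalk::Worker> is implemented; it only illustrates the state and event vocabulary.

    use strict;
    use warnings;

    # Event => state the machine moves to (taken from the diagram).
    my %next_state = (
        'got url' => 'fetch',
        'fetched' => 'parse',
        'parsed'  => 'add url',
        'added'   => 'get url',
    );

    # State => handler. Real handlers would fetch, parse, etc.; these
    # stubs just return the event the state emits when it finishes.
    my %handler = (
        'get url' => sub { 'got url' },
        'fetch'   => sub { 'fetched' },
        'parse'   => sub { 'parsed' },
        'add url' => sub { 'added' },
    );

    my $state = 'get url';
    my @trace = ($state);

    # Run one full lap around the machine.
    for (1 .. 4) {
        my $event = $handler{$state}->();
        $state    = $next_state{$event};
        push @trace, $state;
    }

    print join(' -> ', @trace), "\n";
    # get url -> fetch -> parse -> add url -> get url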

If we use our B<WebWorker> class above, the result might look like
this:

    #!/usr/bin/env perl
    use strict;
    use warnings;
    use feature 'say';

    use WebWorker;

    my $w = WebWorker->new
      ( concurrency     => 1,
        max_stop_tries  => 1,
        initial_state   => 'fetch',
        beanstalk_watch => "urls" );

    ## do this before we call start()
    $w->beanstalk->use("urls")->recv;

    $w->on(fetch => sub {
        my ($self, $job, $resp) = @_;

        say STDERR "fetching " . $job->data;
        $w->{ua}->get($job->data, sub { $self->emit(receive => $job, @_) });
    });

    $w->on(receive => sub {
        my ($self, $job, undef, $tx) = @_;

        if ( my $err = $tx->error ) {
            warn "Moved or some error: $err->{message}\n";
            return $self->finish(delete => $job->id);
        }

        unless (($tx->res->headers->content_type // '') =~ /html/i) {
            warn "Not HTML; skipping\n";
            return $self->finish(delete => $job->id);
        }

        say STDERR "parsing " . $job->data;
        eval {
            $tx->res->dom->at("html body")->find('a[href]')
              ->each(sub { $self->emit(add_url => shift->{href}) });
        };

        return $self->finish(delete => $job->id);
    });

    $w->on(add_url => sub {
        my ($self, $url) = @_;

        return unless $url =~ /^http/;

        $self->beanstalk
          ->put({ priority => 100,
                  ttr      => 15,
                  delay    => 1,
                  data     => $url },
                sub { say STDERR "URL $url added" });
    });

    $w->start;

    AnyEvent->condvar->recv;

We've just written a simple web crawler (an impolite one, since it
ignores F<robots.txt>).

See F<eg/web-state.pl> and F<eg/web-state-add.pl> for this example.

=head2 Introduction to event loops

I couldn't find any gentle introductions to event loops; I was going
to write one myself but realized it would probably turn into a book,
and I'm not qualified to write that book. With that disclaimer, here
is a brief, "close enough" introduction to event loops that may give
you an approximate mental model, good enough to begin event-driven
programming.

An event loop can be as simple as this:

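    my @events   = ();    # pending events, oldest first
    my %watchers = ();    # event type => list of handler subs

    while (1) {
        my $event = shift @events;    # FIFO: take the oldest event
        next unless $event;           # a real loop would sleep until one arrives
        handle($event);
    }

    sub handle {
        my $event = shift;

        # call every handler registered for this type of event
        $_->($event) for @{ $watchers{ $event->{type} } || [] };
    }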

The C<@events> list (or queue, since events are read as a FIFO) might
be populated asynchronously from system events, such as receiving
signals, network data, disk I/O, timers, or other sources. The
C<handle()> subroutine checks the C<%watchers> hash to see if there
are any watchers or handlers for this event and calls those
subroutines as needed. Some of these subroutines may add more events
to the event queue. Then the loop starts again.
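
The loop above never terminates. A runnable variant is sketched below. It makes the simplifying assumption that the loop may exit once the queue is empty; a real loop would instead block, waiting on the operating system. The sketch shows a handler adding new events to the queue. The C<on> and C<emit> subroutines here are hypothetical helpers that only borrow this module's method names, not its implementation.

    use strict;
    use warnings;

    my @events;      # FIFO queue of pending events (hash refs)
    my %watchers;    # event type => [ handler subs ]
    my @log;         # what the handlers did, for inspection

    # Hypothetical helpers: register a handler, enqueue an event.
    sub on   { my ($type, $cb) = @_; push @{ $watchers{$type} }, $cb }
    sub emit { push @events, +{@_} }

    # Each 'tick' schedules the next one until n reaches 3,
    # then emits a single 'done' event.
    on(tick => sub {
        my $ev = shift;
        push @log, "tick $ev->{n}";
        if ($ev->{n} < 3) { emit(type => 'tick', n => $ev->{n} + 1) }
        else              { emit(type => 'done') }
    });
    on(done => sub { push @log, 'done' });

    emit(type => 'tick', n => 1);

    # Dispatch events, oldest first, until the queue runs dry.
    while (my $ev = shift @events) {
        $_->($ev) for @{ $watchers{ $ev->{type} } || [] };
    }

    print join(', ', @log), "\n";    # tick 1, tick 2, tick 3, done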

Most of the time you never see the event loop--you just start it. For
example, most of the time when I'm programming with B<EV>, this is all
I ever see of it:

    EV::run;

B<EV> receives all kinds of events from the system, but you can tell
it about more events. Then you register event I<handlers> to fire off
when a particular kind of event is received.
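
To make that concrete, here is a small sketch of registering a handler with B<EV> directly. C<EV::timer> creates a repeating timer watcher, and the callback calls C<EV::break> so that C<EV::run> returns after three ticks. The 0.01-second interval and the three-tick limit are arbitrary choices for illustration.

    use strict;
    use warnings;
    use EV;

    my @ticks;

    # EV::timer AFTER, REPEAT, CALLBACK: first fires after 0.01s, then
    # every 0.01s. The watcher stops if $timer goes out of scope.
    my $timer = EV::timer 0.01, 0.01, sub {
        push @ticks, scalar @ticks;
        EV::break if @ticks == 3;    # make EV::run return
    };

    EV::run;    # blocks, dispatching events, until EV::break is called

    print "ticks: @ticks\n";    # ticks: 0 1 2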

=head1 SEE ALSO

B<beanstalkd>, by Keith Rarick: L<http://kr.github.io/beanstalkd/>

B<AnyEvent::Beanstalk>, by Graham Barr: L<AnyEvent::Beanstalk>