AnyEvent-Beanstalk-Worker

 view release on metacpan or  search on metacpan

eg/ssh-add.pl  view on Meta::CPAN


die "This goes to STDERR";
_SCRIPT_

## Queue one job and wait for the reply. The scheduling options and the
## payload (under "encode") are built up separately before being handed
## to put(); @scripts is populated earlier in this file.
my $job = do {
    my %payload = (
        target  => 'localhost',
        scripts => \@scripts,
    );

    my %request = (
        priority => 100,
        ttr      => 10,
        delay    => 1,
        encode   => \%payload,
    );

    $bs->put( \%request )->recv;
};

## report the id of the job we just queued
say STDERR "job added to queue: " . Dumper( $job->id );

exit;

eg/ssh.pl  view on Meta::CPAN


## "run" handler: decode the job, run its scripts against the target and,
## once everything has reported back, dump the outcome and delete the job.
$w->on(
    run => sub {
        my ( $self, $qjob, $qresp ) = @_;
        my $payload = $qjob->decode;

        ## accumulators handed (by reference) to run_scripts()
        my ( %results, %stdout, %stderr );

        ## fires when the condvar below is sent
        my $report = sub {
            my ( $results, $res_out, $res_err ) = $_[0]->recv;

            say "success or fail: " . Dumper($results);
            say "script stdout: " . Dumper($res_out);
            say "script stderr: " . Dumper($res_err);

            $self->finish( delete => $qjob->id );
        };

        my $cv = AnyEvent->condvar( cb => $report );

        ## begin/end bracket the whole run; the callback given to begin()
        ## is what sends the three accumulators to $report
        $cv->begin( sub { $_[0]->send( \%results, \%stdout, \%stderr ) } );
        run_scripts(
            $cv, $payload->{target}, $payload->{scripts},
            \%results, \%stdout, \%stderr
        );
        $cv->end;
    }
);

## start the worker (presumably this is what begins reserving jobs --
## confirm against the module documentation)
$w->start;

say STDERR "ctrl-c/SIGINT to stop";

## hand control to the EV event loop
EV::run;

exit;

sub run_scripts {
    my $cv_done = shift;
    my $target  = shift;
    my $scripts = shift;

eg/web-state.pl  view on Meta::CPAN

  ( concurrency     => 1,  ## crank this up to 10000 for some real action
    max_stop_tries  => 1,
    initial_state   => 'fetch',
    beanstalk_watch => "urls" );

## do this before we call start(); "urls" is the same tube name given
## to beanstalk_watch above
$w->beanstalk->use("urls")->recv;

## "fetch" state: request the URL stored as the job's data
$w->on(fetch => sub {
    my $self = shift;
    my ($job, $resp) = @_;

    say STDERR "fetching " . $job->data;

    ## when the request completes, move to the "receive" state, passing
    ## the job followed by whatever the user agent gives its callback
    my $on_response = sub { $self->emit(receive => $job, @_) };

    $w->{ua}->get($job->data, $on_response);
});

## "receive" state: inspect the finished transaction, emit an add_url
## event for every link found in an HTML body, then delete the job.
## Arguments: the worker, the job, an ignored slot, and the transaction.
$w->on(receive => sub {
    my ($self, $job, undef, $tx) = @_;

    if ( $tx->error ) {
        warn "Moved or some error: " . $tx->error;
        return $self->finish(delete => $job->id);
    }

    ## a response may carry no Content-Type at all; treat that as "not
    ## HTML" instead of matching against undef
    my $content_type = $tx->res->headers->content_type // '';
    unless ($content_type =~ /html/i) {
        warn "Not HTML; skipping\n";
        return $self->finish(delete => $job->id);
    }

    say STDERR "parsing " . $job->data;

    ## link extraction is best effort (e.g. there may be no <body>), but
    ## say so when it fails rather than swallowing the error; the job is
    ## deleted either way
    eval {
        $tx->res->dom->at("html body")->find('a[href]')
          ->each(sub { $self->emit(add_url => shift->{href}) });
        1;
    } or warn "Could not extract links from " . $job->data . ": $@";

    return $self->finish(delete => $job->id);
});

## "add_url" event: queue a newly discovered link as a job of its own
$w->on(add_url => sub {
    my $self = shift;
    my $url  = shift;

    ## ignore anything that does not start with "http"
    $url =~ /^http/ or return;

    my %job = (
        priority => 100,
        ttr      => 15,
        delay    => 1,
        data     => $url,
    );

    $self->beanstalk->put(\%job, sub { say STDERR "URL $url added" });
});

## start the worker, then hand control to the EV event loop
$w->start;

EV::run;

eg/web.pl  view on Meta::CPAN

    beanstalk_decoder => sub {
        eval { decode_json(shift) };
    }
);

## "fetch" state: request the URL named in the decoded job
$w->on(
    fetch => sub {
        my ( $self, $job ) = @_;

        say STDERR "trying to fetch " . $job->decode->{url} . "...";

        ## on completion, pass the job plus the user agent's callback
        ## arguments on to the "show" state
        my $on_response = sub { $self->emit( show => $job, @_ ) };

        $self->{ua}->get( $job->decode->{url}, $on_response );
    }
);

$w->on(

eg/web.pl  view on Meta::CPAN

        ## anything without a 2xx status code (or with no status at all)
        ## is reported and the job is dropped
        unless ( $tx->res->code and $tx->res->code =~ /^2/ ) {
            warn "Moved or some error";
            return $self->finish(delete => $job->id);
        }

        ## a response may carry no Content-Type header; treat that as
        ## non-HTML instead of matching against undef
        if (($tx->res->headers->content_type // '') =~ /html/i) {
            my $title = '';
            eval { $title = $tx->res->dom->html->head->title->text };

            ## a page without a <title> (or unparsable markup) lands here
            if ($@) { warn $@ }
            else { say STDERR "found title: " . $title }
        }

        else {
            say STDERR "found body: " . $tx->res->body;
        }

        return $self->finish(delete => $job->id);
    }
);

## start the worker, then hand control to the EV event loop
$w->start;

EV::run;

lib/AnyEvent/Beanstalk/Worker.pm  view on Meta::CPAN

      beanstalk_watch   => 'jobs',
      beanstalk_decoder => sub {
          eval { decode_json(shift) };
      }
  );

  ## "reserved" handler: receives the worker followed by the reserved job
  ## and the response; dumps the decoded job and moves to the next state
  $w->on(reserved => sub {
      my $self = shift;
      my ($qjob, $qresp) = @_;

      say "Got a job: " . Dumper($qjob->decode);

      ## emit on the worker itself: $self has already been shifted off
      ## @_, so another shift here would yield the job instead
      $self->emit( my_next_state => $qjob );
  });

  $w->on(my_next_state => sub {
      my $self = shift;
      my $job  = shift;

      ## do something with job
      ...

lib/AnyEvent/Beanstalk/Worker.pm  view on Meta::CPAN

        max_stop_tries  => 1,
        initial_state   => 'fetch',
        beanstalk_watch => "urls" );

    ## do this before we call start()
    $w->beanstalk->use("urls")->recv;

    ## "fetch" state: request the URL stored as the job's data
    $w->on(fetch => sub {
        my $self = shift;
        my ($job, $resp) = @_;

        say STDERR "fetching " . $job->data;

        ## when the request completes, move to the "receive" state,
        ## passing the job followed by the user agent's callback arguments
        my $on_response = sub { $self->emit(receive => $job, @_) };

        $w->{ua}->get($job->data, $on_response);
    });

    ## "receive" state: inspect the finished transaction, emit an add_url
    ## event for every link found in an HTML body, then delete the job
    $w->on(receive => sub {
        my ($self, $job, undef, $tx) = @_;

        if ( $tx->error ) {
            warn "Moved or some error: " . $tx->error;
            return $self->finish(delete => $job->id);
        }

        ## a response may carry no Content-Type at all; treat that as
        ## "not HTML" instead of matching against undef
        my $content_type = $tx->res->headers->content_type // '';
        unless ($content_type =~ /html/i) {
            warn "Not HTML; skipping\n";
            return $self->finish(delete => $job->id);
        }

        say STDERR "parsing " . $job->data;

        ## link extraction is best effort (e.g. there may be no <body>),
        ## but say so when it fails; the job is deleted either way
        eval {
            $tx->res->dom->at("html body")->find('a[href]')
              ->each(sub { $self->emit(add_url => shift->{href}) });
            1;
        } or warn "Could not extract links from " . $job->data . ": $@";

        return $self->finish(delete => $job->id);
    });

    ## "add_url" event: queue a newly discovered link as a job of its own
    $w->on(add_url => sub {
        my $self = shift;
        my $url  = shift;

        ## ignore anything that does not start with "http"
        $url =~ /^http/ or return;

        my %job = (
            priority => 100,
            ttr      => 15,
            delay    => 1,
            data     => $url,
        );

        $self->beanstalk->put(\%job, sub { say STDERR "URL $url added" });
    });

    $w->start;

    ## wait on a condvar that nothing ever sends, i.e. keep the event
    ## loop running until the process is stopped
    AnyEvent->condvar->recv;

We've just written a simple (and impolite--should read F<robots.txt>)
web crawler.

See F<eg/web-state.pl> and F<eg/web-state-add.pl> for this example.



( run in 0.760 second using v1.01-cache-2.11-cpan-d7a12ab2c7f )