AnyEvent-Beanstalk-Worker
view release on metacpan or search on metacpan
eg/ssh-add.pl view on Meta::CPAN
die "This goes to STDERR";
_SCRIPT_
## Queue a single job that asks a worker to run @scripts against
## 'localhost'. The payload goes under `encode` (serialized by the
## client); block until beanstalkd acknowledges the put, report the
## new job id on STDERR, and exit.
my %job_args = (
    priority => 100,
    ttr      => 10,
    delay    => 1,
    encode   => {
        target  => 'localhost',
        scripts => \@scripts,
    },
);
my $job = $bs->put( \%job_args )->recv;
say STDERR "job added to queue: " . Dumper( $job->id );
exit 0;
## run: decode the job, run its scripts against the target, and once
## every piece of work tracked on the condvar has finished, print the
## collected results and delete the job from the queue.
$w->on(
    run => sub {
        my ( $self, $qjob ) = @_;
        my $job = $qjob->decode;

        ## Fired when the condvar is sent: dump the three result hashes
        ## and delete the job.
        my $on_done = sub {
            my ( $results, $res_out, $res_err ) = $_[0]->recv;
            say "success or fail: " . Dumper($results);
            say "script stdout: " . Dumper($res_out);
            say "script stderr: " . Dumper($res_err);
            $self->finish( delete => $qjob->id );
        };
        my $cv = AnyEvent->condvar( cb => $on_done );

        ## Result collectors filled in by run_scripts (their key layout is
        ## defined there, not here).
        my ( %results, %stdout, %stderr );
        my @collectors = ( \%results, \%stdout, \%stderr );

        ## Hold the condvar open while run_scripts registers its work
        ## (presumably via its own begin/end pairs); when the counter drops
        ## to zero the begin-callback sends the collector refs.
        $cv->begin( sub { $_[0]->send(@collectors) } );
        run_scripts( $cv, $job->{target}, $job->{scripts}, @collectors );
        $cv->end;
    }
);
## Begin reserving jobs, tell the operator how to stop, then block in
## the EV event loop; exit cleanly if the loop ever returns.
$w->start();
say {*STDERR} "ctrl-c/SIGINT to stop";
EV::run();
exit 0;
sub run_scripts {
my $cv_done = shift;
my $target = shift;
my $scripts = shift;
eg/web-state.pl view on Meta::CPAN
( concurrency => 1, ## crank this up to 10000 for some real action
max_stop_tries => 1,
initial_state => 'fetch',
beanstalk_watch => "urls" );
## Select the "urls" tube for subsequent put() calls (used by add_url);
## block until beanstalkd confirms.
$w->beanstalk->use("urls")->recv;

## fetch: the initial state. Each job's data is a URL; fetch it
## asynchronously and hand the job plus the UA callback args (ua, tx)
## to the "receive" state.
$w->on(fetch => sub {
    my ($self, $job) = @_;
    say STDERR "fetching " . $job->data;
    ## Use the worker passed to the callback rather than the file-level $w,
    ## matching the other handlers and keeping this stored closure from
    ## capturing $w.
    $self->{ua}->get($job->data, sub { $self->emit(receive => $job, @_) });
});
## receive: inspect the fetched transaction. Errors and non-HTML
## responses are logged and the job deleted; HTML pages have every
## <a href> in the body emitted as an add_url event before the job is
## deleted.
$w->on(receive => sub {
    my ($self, $job, undef, $tx) = @_;

    ## Newer Mojolicious returns a hashref ({message, code}) from
    ## $tx->error; older versions return the message string. Normalize
    ## so the warning shows the text, not "HASH(0x...)".
    if ( my $err = $tx->error ) {
        my $msg = ref($err) eq 'HASH' ? $err->{message} : $err;
        warn "Moved or some error: " . ($msg // 'unknown error');
        return $self->finish(delete => $job->id);
    }

    ## A response may carry no Content-Type header at all.
    my $content_type = $tx->res->headers->content_type // '';
    unless ( $content_type =~ /html/i ) {
        warn "Not HTML; skipping\n";
        return $self->finish(delete => $job->id);
    }

    say STDERR "parsing " . $job->data;

    ## Link extraction is best-effort: a page without <body> is skipped
    ## quietly, and any other failure is reported instead of swallowed.
    my $ok = eval {
        my $body = $tx->res->dom->at("html body");
        if ($body) {
            $body->find('a[href]')
                 ->each(sub { $self->emit(add_url => shift->{href}) });
        }
        1;
    };
    warn "Link extraction failed for " . $job->data . ": $@" unless $ok;

    return $self->finish(delete => $job->id);
});
## add_url: queue an absolute http(s) URL discovered on a page as a new
## job on the currently used tube.
$w->on(add_url => sub {
    my ($self, $url) = @_;

    ## Require a real http:// or https:// scheme (case-insensitive). A bare
    ## /^http/ also accepted relative links such as "http-guide.html".
    return unless defined $url && $url =~ m{\Ahttps?://}i;

    $self->beanstalk
      ->put({ priority => 100,
              ttr      => 15,
              delay    => 1,
              data     => $url },
            sub { say STDERR "URL $url added" });
});
## Begin reserving jobs, then hand control to the EV event loop.
$w->start();
EV::run();
beanstalk_decoder => sub {
eval { decode_json(shift) };
}
);
## fetch: decode the job's JSON payload once and fetch its "url"
## asynchronously, passing the UA callback args (ua, tx) to the "show"
## state. Jobs whose payload did not decode to a hash with a url are
## reported and deleted.
$w->on(
    fetch => sub {
        my ( $self, $job ) = @_;

        ## The decoder wraps decode_json in eval, so invalid JSON yields
        ## undef; dereferencing that directly would die inside the event
        ## loop.
        my $payload = $job->decode;
        my $url = ref($payload) eq 'HASH' ? $payload->{url} : undef;
        unless ( defined $url ) {
            warn "Job " . $job->id . " has no decodable url; deleting\n";
            return $self->finish( delete => $job->id );
        }

        say STDERR "trying to fetch " . $url . "...";
        $self->{ua}->get(
            $url,
            sub {
                $self->emit( show => $job, @_ );
            }
        );
    }
);
$w->on(
## Non-2xx (or missing) status: report it and delete the job.
unless ( $tx->res->code and $tx->res->code =~ /^2/ ) {
    ## `warn => "..."` would quote `warn` as a plain string via the fat
    ## comma and print nothing; call warn as a function instead.
    warn "Moved or some error: " . ( $tx->res->code // 'no response' );
    return $self->finish(delete => $job->id);
}

## A response may carry no Content-Type header at all.
my $content_type = $tx->res->headers->content_type // '';
if ( $content_type =~ /html/i ) {
    ## Locate <title> with a CSS selector; this does not depend on
    ## Mojo::DOM child-navigation methods (html->head->title), which not
    ## every Mojolicious version provides.
    my $title_el = eval { $tx->res->dom->at('html head title') };
    if ($@)            { warn $@ }
    elsif (!$title_el) { warn "No <title> found\n" }
    else               { say STDERR "found title: " . $title_el->text }
}
else {
    say STDERR "found body: " . $tx->res->body;
}
return $self->finish(delete => $job->id);
}
);
$w->start;
EV::run;
lib/AnyEvent/Beanstalk/Worker.pm view on Meta::CPAN
beanstalk_watch => 'jobs',
beanstalk_decoder => sub {
eval { decode_json(shift) };
}
);
## reserved: log the decoded job, then forward it to the next state.
$w->on(reserved => sub {
    my ($self, $qjob) = @_;
    say "Got a job: " . Dumper($qjob->decode);

    ## Emit on the worker itself. Once the worker is unpacked, @_ holds
    ## only the job (and response), so a `shift->emit(...)` here would
    ## call emit on the job object.
    $self->emit( my_next_state => $qjob );
});
$w->on(my_next_state => sub {
my $self = shift;
my $job = shift;
## do something with job
...
lib/AnyEvent/Beanstalk/Worker.pm view on Meta::CPAN
max_stop_tries => 1,
initial_state => 'fetch',
beanstalk_watch => "urls" );
## do this before we call start()
## (select the "urls" tube so add_url's put() lands there)
$w->beanstalk->use("urls")->recv;

## fetch: the initial state. Each job's data is a URL; fetch it
## asynchronously and hand the job plus the UA callback args (ua, tx)
## to the "receive" state.
$w->on(fetch => sub {
    my ($self, $job) = @_;
    say STDERR "fetching " . $job->data;
    ## Use the worker passed to the callback rather than the outer $w,
    ## matching the other handlers and keeping this stored closure from
    ## capturing $w.
    $self->{ua}->get($job->data, sub { $self->emit(receive => $job, @_) });
});
## receive: inspect the fetched transaction. Errors and non-HTML
## responses are logged and the job deleted; HTML pages have every
## <a href> in the body emitted as an add_url event before the job is
## deleted.
$w->on(receive => sub {
    my ($self, $job, undef, $tx) = @_;

    ## Newer Mojolicious returns a hashref ({message, code}) from
    ## $tx->error; older versions return the message string. Normalize
    ## so the warning shows the text, not "HASH(0x...)".
    if ( my $err = $tx->error ) {
        my $msg = ref($err) eq 'HASH' ? $err->{message} : $err;
        warn "Moved or some error: " . ($msg // 'unknown error');
        return $self->finish(delete => $job->id);
    }

    ## A response may carry no Content-Type header at all.
    my $content_type = $tx->res->headers->content_type // '';
    unless ( $content_type =~ /html/i ) {
        warn "Not HTML; skipping\n";
        return $self->finish(delete => $job->id);
    }

    say STDERR "parsing " . $job->data;

    ## Link extraction is best-effort: a page without <body> is skipped
    ## quietly, and any other failure is reported instead of swallowed.
    my $ok = eval {
        my $body = $tx->res->dom->at("html body");
        if ($body) {
            $body->find('a[href]')
                 ->each(sub { $self->emit(add_url => shift->{href}) });
        }
        1;
    };
    warn "Link extraction failed for " . $job->data . ": $@" unless $ok;

    return $self->finish(delete => $job->id);
});
## add_url: queue an absolute http(s) URL discovered on a page as a new
## job on the currently used tube.
$w->on(add_url => sub {
    my ($self, $url) = @_;

    ## Require a real http:// or https:// scheme (case-insensitive). A bare
    ## /^http/ also accepted relative links such as "http-guide.html".
    return unless defined $url && $url =~ m{\Ahttps?://}i;

    $self->beanstalk
      ->put({ priority => 100,
              ttr      => 15,
              delay    => 1,
              data     => $url },
            sub { say STDERR "URL $url added" });
});
## Begin reserving jobs, then block on a condvar that is never sent,
## which keeps the AnyEvent loop running.
$w->start();
AnyEvent->condvar()->recv();
We've just written a simple (and impolite: a well-behaved crawler would
honor F<robots.txt>) web crawler.
See F<eg/web-state.pl> and F<eg/web-state-add.pl> for the complete, runnable version of this example.
( run in 0.760 second using v1.01-cache-2.11-cpan-d7a12ab2c7f )