AnyEvent-Beanstalk-Worker

 view release on metacpan or  search on metacpan

lib/AnyEvent/Beanstalk/Worker.pm  view on Meta::CPAN

package AnyEvent::Beanstalk::Worker;

use 5.016001;
use strict;
use warnings;
use feature 'current_sub';
use AnyEvent;
use AnyEvent::Log;
use AnyEvent::Beanstalk;

our $VERSION = '0.05';

sub new {
    my $class = shift;
    my $self  = {};
    bless $self => $class;

    my %args = @_;

    $self->{_cb}     = {};
    $self->{_event}  = {};
    $self->{_jobs}   = {};
    $self->{_events} = [];    ## event queue
    $self->{_handled_jobs} = 0;  ## simple job counter

    $self->{_running}        = 0;
    $self->{_stop_tries}     = 0;
    $self->{_max_stop_tries} = $args{max_stop_tries} // 3;
    $self->{_max_jobs}       = $args{max_jobs} || 0;
    $self->{_concurrency}    = $args{concurrency} || 1;
    $self->{_log_level}      = $args{log_level} // 4;

    $self->{_reserve_timeout}        = $args{reserve_timeout} || 1;
    $self->{_reserve_base}           = $self->{_reserve_timeout};
    $self->{_reserve_timeout_factor} = 1.1;
    $self->{_reserve_timeout_max}    = 4;
    $self->{_release_delay}          = $args{release_delay} || 3;

    $self->{_initial_state}          = $args{initial_state};

    $self->{_log_ctx} = AnyEvent::Log::ctx;
    $self->{_log_ctx}->title(__PACKAGE__);
    $self->{_log_ctx}->level($self->{_log_level});

    $self->{_log}          = {};
    $self->{_log}->{trace} = $self->{_log_ctx}->logger("trace");
    $self->{_log}->{debug} = $self->{_log_ctx}->logger("debug");
    $self->{_log}->{info}  = $self->{_log_ctx}->logger("info");
    $self->{_log}->{note}  = $self->{_log_ctx}->logger("note");

    $self->{_signal} = {};
    $self->{_signal}->{TERM} = AnyEvent->signal(
        signal => "TERM",
        cb =>
          sub { $self->{_log_ctx}->log( warn => "TERM received" ); $self->stop }
    );
    $self->{_signal}->{INT} = AnyEvent->signal(
        signal => "INT",
        cb =>
          sub { $self->{_log_ctx}->log( warn => "INT received" ); $self->stop }
    );
    $self->{_signal}->{USR2} = AnyEvent->signal(
        signal => "USR2",
        cb     => sub {
            $self->{_log_level} =
              ( $self->{_log_level} >= 9 ? 2 : $self->{_log_level} + 1 );
            $self->{_log_ctx}->level($self->{_log_level});
        }
    );

    $args{beanstalk_host} ||= 'localhost';
    $args{beanstalk_port} ||= 11300;

    unless ($args{beanstalk_watch}) {
        die "beanstalk_watch argument required\n";
    }

    $self->beanstalk(
        server  => $args{beanstalk_host} . ':' . $args{beanstalk_port},
        decoder => $args{beanstalk_decoder}
    );

    $self->beanstalk->watch( $args{beanstalk_watch} )->recv;

    $self->on(
        start => sub {
            my $self = shift;
            my $reason = shift || '(unknown)';

            $self->{_log}->{trace}->("in start: $reason");

            unless ( $self->{_running} ) {
                $self->{_log}->{trace}->("worker is not running");
                return;
            }

            unless ( $self->job_count < $self->concurrency ) {
                $self->{_log}->{trace}->( "worker running "
                      . $self->job_count
                      . " jobs; will not accept more jobs until others finish"
                );
                return;
            }

            if ( $self->max_jobs and $self->handled_jobs >= $self->max_jobs ) {
                $self->{_log}->{info}->("Handled " . $self->handled_jobs . "; will not accept more jobs");
                return;

lib/AnyEvent/Beanstalk/Worker.pm  view on Meta::CPAN

            ## we've been waiting for a slot to free up
            $self->emit( start => ('finish sub') );
        }

        $self->{_log}->{info}
          ->( "finished with $job_id ($action); outstanding jobs: "
              . $self->job_count );

#        $cb->($job_id);

        ## we're done
        if ( $self->max_jobs
             and $self->handled_jobs >= $self->max_jobs
             and ! $self->job_count ) {
            $self->{_log}->{info}->("Handled " . $self->handled_jobs . "; quitting");
            return $self->stop;
        }
    };

    eval {
        $self->beanstalk->$action( $job_id, ( $args ? $args : () ), $internal );
    };

    $self->{_log_ctx}->log(
        error => "first argument to finish() must be a beanstalk command: $@" )
      if $@;
}

sub stop {
    my $self = shift;
    $self->{_stop_tries}++;

    if ( $self->{_stop_tries} >= $self->{_max_stop_tries} ) {
        $self->{_log_ctx}->log(
            warn => "stop requested; impatiently quitting outstanding jobs" );
        exit;
    }

    if ( $self->job_count ) {
        $self->{_log_ctx}
          ->log( warn => "stop requested; waiting for outstanding jobs" );
        return;
    }

    $self->{_log_ctx}->log( fatal => "exiting" );
    exit;
}

sub on {
    my ( $self, $event, $cb ) = @_;

    $self->{_cb}->{$event} = $cb;

    $self->{_event}->{$event} = sub {
        my $evt = shift;
        AnyEvent->condvar(
            cb => sub {
                if ( ref( $self->{_cb}->{$evt} ) eq 'CODE' ) {
                    $self->{_log}->{trace}->("event: $evt");
                    my @data = $_[0]->recv;
                    $self->{_log}->{debug}->(
                        "shift event ($evt): " . shift @{ $self->{_events} } );
                    $self->{_log}->{debug}->(
                        "EVENTS (s): " . join( ' ' => @{ $self->{_events} } ) );
                    $self->{_cb}->{$evt}->(@data);
                }

                $self->{_event}->{$evt} = AnyEvent->condvar( cb => __SUB__ );
            }
        );
      }
      ->($event);
}

sub emit {
    my $self  = shift;
    my $event = shift;
    $self->{_log}->{debug}->("push event ($event)");
    push @{ $self->{_events} }, $event;
    $self->{_log}->{debug}
      ->( "EVENTS (p): " . join( ' ' => @{ $self->{_events} } ) );
    $self->{_event}->{$event}->send( $self, @_ );
}

sub beanstalk {
    my $self = shift;
    $self->{_beanstalk} = AnyEvent::Beanstalk->new(@_) if @_;
    return $self->{_beanstalk};
}

sub job_count { scalar keys %{ $_[0]->{_jobs} } }

sub handled_jobs { $_[0]->{_handled_jobs} }

sub max_jobs { $_[0]->{_max_jobs} }

sub concurrency {
    my $self = shift;

    if (@_) {
        $self->{_concurrency} = shift;
    }
    return $self->{_concurrency};
}

1;
__END__

=head1 NAME

AnyEvent::Beanstalk::Worker - Event-driven FSA for beanstalk queues

=head1 SYNOPSIS

  use AnyEvent::Beanstalk::Worker;
  use Data::Dumper;
  use JSON;

  my $w = AnyEvent::Beanstalk::Worker->new(
      concurrency       => 10,
      initial_state     => 'reserved',
      beanstalk_watch   => 'jobs',
      beanstalk_decoder => sub {
          eval { decode_json(shift) };
      }
  );

  $w->on(reserved => sub {
      my $self = shift;
      my ($qjob, $qresp) = @_;

      say "Got a job: " . Dumper($qjob->decode);

      shift->emit( my_next_state => $qjob );
  });

  $w->on(my_next_state => sub {
      my $self = shift;
      my $job  = shift;



( run in 2.671 seconds using v1.01-cache-2.11-cpan-39bf76dae61 )