Data-Consumer
lib/Data/Consumer.pm view on Meta::CPAN
=head1 SYNOPSIS
    use Data::Consumer;
    my $consumer = Data::Consumer->new(
        type        => $consumer_name,
        unprocessed => $unprocessed,
        working     => $working,
        processed   => $processed,
        failed      => $failed,
        max_passes  => $num_or_undef,
        max_process => $num_or_undef,
        max_elapsed => $seconds_or_undef,
    );
    $consumer->consume( sub {
        my $id = shift;
        print "processed $id\n";
    } );
=head1 DESCRIPTION
A common requirement is to process a feed of items of some
sort robustly. Such a feed might be records inserted
into a table, or files dropped in a delivery directory.
Writing a script that handles all the edge cases, such as getting "stuck"
on a failed item, and that manages locking so that multiple copies of the
script can run in parallel, is tricky and certainly repetitive.
The aim of L<Data::Consumer> is to provide a framework that makes writing
such consumer scripts as easy as writing a callback that processes
each item. The framework handles the rest.
The basic idea is that one need only use an existing L<Data::Consumer>
subclass or, for a feed type not already supported, define a new one
that implements a few reasonably well-defined primitive methods for the
required tasks. The L<Data::Consumer> methods then build on those
primitives to provide a consistent, DWIM interface to the end consumer.
Currently L<Data::Consumer> is distributed with two subclasses (well,
three actually, but L<Data::Consumer::MySQL> is deprecated in favour
of L<Data::Consumer::MySQL2>): L<Data::Consumer::MySQL2> for handling
records in a MySQL database (using the MySQL C<GET_LOCK()> function), and
L<Data::Consumer::Dir> for handling a drop directory scenario (such as
an FTP upload area or a mail directory).
Once a resource type has been defined as a L<Data::Consumer> subclass,
the usage pattern is to construct the subclass with the appropriate
arguments and then call C<consume()> with a callback.
=head2 The Consumer Pattern
In the consumer pattern, code consumes an 'atomic' resource
piece by piece. The consuming code should not have to worry
about how it obtained each piece; that task belongs to the framework.
The consumer subclasses assume that the resource can be modeled as a
queue, that is, that there is some ordering principle by which the items
can be processed in a predictable sequence. The full consume pattern is
very close to the following pseudocode. The steps marked with
asterisks are where user callbacks may be invoked:
    DO
        RESET TO THE BEGINNING OF THE QUEUE
        WHILE QUEUE NOT EMPTY AND CAN *PROCEED*
            ACQUIRE NEXT ITEM TO PROCESS FROM QUEUE
            MARK AS 'WORKING'
            *PROCESS* ITEM
            IF PROCESSING FAILED
                MARK AS 'FAILED'
            OTHERWISE
                MARK AS 'PROCESSED'
        SWEEP UP ABANDONED 'WORKING' ITEMS AND MARK THEM AS 'FAILED'
    UNTIL WE CANNOT *PROCEED* OR NOTHING WAS PROCESSED
    RELEASE ANY LOCKS STILL HELD
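To make the control flow concrete, here is a rough sketch of the same loop in plain Perl, working over an in-memory hash that maps item ids to states. It is an illustration under stated assumptions, not the L<Data::Consumer> implementation: the *PROCEED* check is modelled by a hypothetical C<$max_process> counter, and a callback that dies counts as a failed item. Each step is commented with the matching line of pseudocode.

```perl
use strict;
use warnings;

# Hypothetical in-memory queue: item id => state.
my %state = map { $_ => 'unprocessed' } 1 .. 5;
my @log;
my $max_process     = 10;    # hypothetical stand-in for the *PROCEED* limit
my $processed_count = 0;
my $can_proceed     = sub { $processed_count < $max_process };

# *PROCESS* callback: item 3 fails by dying.
my $process = sub {
    my $id = shift;
    die "bad item $id\n" if $id == 3;
    push @log, "processed $id";
};

my $passes = 0;
while (1) {                                          # DO
    $passes++;
    my $did_work = 0;
    # RESET TO THE BEGINNING OF THE QUEUE: scan ids in order each pass.
    for my $id (sort { $a <=> $b } keys %state) {
        last unless $can_proceed->();                # CAN *PROCEED*?
        next unless $state{$id} eq 'unprocessed';    # ACQUIRE NEXT ITEM
        $state{$id} = 'working';                     # MARK AS 'WORKING'
        my $ok = eval { $process->($id); 1 };        # *PROCESS* ITEM
        $state{$id} = $ok ? 'processed' : 'failed';  # MARK THE RESULT
        $processed_count++;
        $did_work++;
    }
    # SWEEP UP ABANDONED 'WORKING' ITEMS AND MARK THEM AS 'FAILED'
    for my $s (values %state) {
        $s = 'failed' if $s eq 'working';
    }
    # UNTIL WE CANNOT *PROCEED* OR NOTHING WAS PROCESSED
    last if !$did_work || !$can_proceed->();
}
# RELEASE ANY LOCKS STILL HELD: nothing to release for an in-memory hash.
```

The second pass finds nothing left to process, which is what ends the outer loop.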
This implies that each item has up to four states: C<unprocessed>,
C<working>, C<processed> and C<failed>. In a database these might be
values in a field; in a drop directory scenario they would be different
directories. In either case they would normally be supplied as
arguments to the constructor of the L<Data::Consumer> subclass being created.
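For the drop directory case, one plausible layout is four sibling directories, one per state, with an item changing state when its file is moved. The sketch below assumes that layout and uses only core modules (C<File::Temp>, C<File::Spec>); the C<mark_as> helper and the file name are hypothetical, and L<Data::Consumer::Dir> is not necessarily implemented this way.

```perl
use strict;
use warnings;
use File::Temp qw(tempdir);
use File::Spec;

# One directory per state, under a throwaway root.
my $root = tempdir(CLEANUP => 1);
my %dir;
for my $state (qw(unprocessed working processed failed)) {
    $dir{$state} = File::Spec->catdir($root, $state);
    mkdir $dir{$state} or die "mkdir $dir{$state}: $!";
}

# Drop a (hypothetical) item into the 'unprocessed' directory.
my $item = 'order-42.txt';
open my $fh, '>', File::Spec->catfile($dir{unprocessed}, $item)
    or die "open: $!";
print {$fh} "payload\n";
close $fh;

# Hypothetical helper: move an item between state directories.
# rename() is atomic within one filesystem, so the file is never
# visible in two state directories at once.
sub mark_as {
    my ($id, $from, $to) = @_;
    rename(File::Spec->catfile($dir{$from}, $id),
           File::Spec->catfile($dir{$to},   $id))
        or die "rename $id from $from to $to: $!";
}

mark_as($item, 'unprocessed', 'working');
mark_as($item, 'working',     'processed');
```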
=head2 Subclassing Data::Consumer
L<Data::Consumer> can be used with any resource type that can be modeled
as a queue, supports some form of advisory locking mechanism, and
provides a way to distinguish between at least the C<unprocessed> and
C<processed> states.
The routines that must be defined for a new consumer type are C<new()>,
C<reset()>, C<acquire()>, C<release()>, C<_mark_as()> and
C<_do_callback()>.
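The skeleton below sketches what such a subclass might look like for a trivial in-memory queue. Because it has to run on its own, it subclasses a hypothetical stand-in base class (C<My::ConsumerBase>) rather than L<Data::Consumer>, and the package names, argument lists and return values of the six routines are assumptions for illustration rather than the documented contract.

```perl
use strict;
use warnings;

# Hypothetical stand-in for the base class. The real Data::Consumer
# supplies consume(), leave(), ignore(), fail() and more; only the
# pieces this sketch needs are mocked here.
package My::ConsumerBase;
sub new     { my $class = shift; return bless({@_}, $class) }
sub last_id { return $_[0]{last_id} }
sub is_ignored {
    my ($self, $id) = @_;
    return $self->{ignore}{$id} ? 1 : 0;
}

# Hypothetical consumer over an in-memory list of ids.
package My::Consumer::Array;
use parent -norequire, 'My::ConsumerBase';

sub new {
    my ($class, %args) = @_;
    my $self = $class->SUPER::new();    # always call the parent first
    $self->{order} = [ @{ $args{ids} } ];
    $self->{items} = { map { $_ => 'unprocessed' } @{ $args{ids} } };
    $self->reset;
    return $self;
}

# Rewind so the next acquire() starts from the head of the queue.
sub reset { my $self = shift; $self->{pos} = 0; return $self }

# Return (and "lock") the next unprocessed, non-ignored id, or undef.
sub acquire {
    my $self = shift;
    while ($self->{pos} < @{ $self->{order} }) {
        my $id = $self->{order}[ $self->{pos}++ ];
        next if $self->is_ignored($id);
        next if $self->{items}{$id} ne 'unprocessed';
        $self->{locked}{$id} = 1;
        return $self->{last_id} = $id;
    }
    return;
}

# Drop every lock this object still holds.
sub release { my $self = shift; $self->{locked} = {}; return $self }

# Record that $id is now in $state.
sub _mark_as {
    my ($self, $state, $id) = @_;
    $self->{items}{$id} = $state;
    return $self;
}

# Invoke the user callback with the current id first, as in the
# SYNOPSIS; passing the consumer as a second argument is an assumption.
sub _do_callback {
    my ($self, $callback) = @_;
    return $callback->($self->last_id, $self);
}

package main;

my $c = My::Consumer::Array->new(ids => [qw(a b c)]);
$c->{ignore}{b} = 1;    # what ignore('b') would record
my @seen;
while (defined(my $id = $c->acquire)) {
    $c->_mark_as(working => $id);
    $c->_do_callback(sub { push @seen, shift });
    $c->_mark_as(processed => $id);
}
$c->release;
```

Keeping the lock bookkeeping inside C<acquire()> and C<release()> means the driving loop never needs to know how the resource is locked.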
=over 4
=item new
A subclass will almost certainly need to override the default
constructor. All L<Data::Consumer> objects are blessed hashes, and
you should always call the parent class's constructor first:
    my $self= $class->SUPER::new();
=item reset
This routine resets the object's internal state so that the next call to C<acquire()>
returns the first available item in the queue.
=item acquire
This routine finds the next item in the queue and locks it in some way. It should
call C<is_ignored()> on each candidate item to verify that the item has not been
marked to be ignored.
=item release
This routine releases any locks held by the object.
=item _mark_as
This routine is called to "mark" an item as a particular state. It
}
}
return 1;
}
=head2 $consumer->leave()
Sometimes it's useful to defer processing. When called from within a
consume/process callback, this method causes the item to be marked
as 'unprocessed' after the callback returns (as long as the callback
does not die).
Typically this is invoked as
    return $consumer->leave;
from within a consume/process callback.
Returns $consumer. Will die if no 'unprocessed' state is defined.
=cut
sub leave {
my $self= shift;
confess("Can't leave as 'unprocessed' is undefined!") if not defined $self->{unprocessed};
$self->{defer_leave}++;
return $self;
}
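As a sketch of what deferral means in practice, the mock below reuses the body of C<leave()> and pairs it with a hypothetical C<process_one()> helper that settles each item's state only after the callback has returned. The helper and the C<Mock::Consumer> package are assumptions; the real C<consume()> does more bookkeeping.

```perl
use strict;
use warnings;

# Hypothetical mock: leave() copies the code above; process_one() is a
# simplified stand-in for how a consume loop might honour the flag.
package Mock::Consumer;
use Carp qw(confess);

sub new { my ($class, %args) = @_; return bless({ %args, state => {} }, $class) }

sub leave {
    my $self = shift;
    confess("Can't leave as 'unprocessed' is undefined!")
        if not defined $self->{unprocessed};
    $self->{defer_leave}++;
    return $self;
}

# Run one item, then decide its final state once the callback returns.
sub process_one {
    my ($self, $id, $cb) = @_;
    $self->{defer_leave} = 0;
    $self->{state}{$id} = 'working';
    my $ok = eval { $cb->($id, $self); 1 };
    $self->{state}{$id} =
        !$ok                 ? 'failed'         # the callback died
      : $self->{defer_leave} ? 'unprocessed'    # leave() was called
      :                        'processed';
    return $self->{state}{$id};
}

package main;

my $c = Mock::Consumer->new(unprocessed => 'unprocessed');
for my $n (1 .. 4) {
    $c->process_one($n, sub {
        my ($id, $consumer) = @_;
        return $consumer->leave if $id % 2;    # defer odd ids
        return;
    });
}
```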
=head2 $consumer->ignore(@list)
This can be used to make C<acquire()> ignore each item in @list.
If @list is empty, it is assumed that the method is being called from
within consume/process; it then marks the currently acquired item
as ignored and calls C<< $consumer->leave() >>.
Returns $consumer. Will die if called with an empty @list when no
'unprocessed' state is defined.
=cut
sub ignore {
my $self= shift;
if (@_) {
for my $id (@_) {
$self->{ignore}{$id}++;
}
} else {
my $id= $self->last_id;
$self->{ignore}{$id}++;
$self->leave;
}
return $self;
}
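The mock below exercises both forms of C<ignore()>. Its C<ignore()> and C<is_ignored()> bodies follow the code in this section, while C<new()>, C<last_id()>, C<leave()> and the C<acquire()> loop are simplified, hypothetical stand-ins used only so the example runs without L<Data::Consumer> installed.

```perl
use strict;
use warnings;

# Hypothetical mock: ignore() and is_ignored() follow the code in this
# section; new(), last_id(), leave() and acquire() are simplified stand-ins.
package Mock::Consumer;

sub new     { my ($class, @ids) = @_; return bless({ queue => [@ids], ignore => {} }, $class) }
sub last_id { return $_[0]{last_id} }
sub leave   { my $self = shift; $self->{defer_leave}++; return $self }

sub ignore {
    my $self = shift;
    if (@_) {
        $self->{ignore}{$_}++ for @_;
    } else {
        $self->{ignore}{ $self->last_id }++;
        $self->leave;
    }
    return $self;
}

sub is_ignored {
    my $self = shift;
    my $id = @_ ? shift @_ : $self->last_id;
    return if !defined $id;
    return $self->{ignore}{$id} ? 1 : 0;
}

# Simplified acquire(): the next queued id that is not ignored.
sub acquire {
    my $self = shift;
    while (defined(my $id = shift @{ $self->{queue} })) {
        next if $self->is_ignored($id);
        return $self->{last_id} = $id;
    }
    return;
}

package main;

my $c = Mock::Consumer->new(qw(a b c d));
$c->ignore(qw(b d));    # explicit list: skip these ids
my @got;
while (defined(my $id = $c->acquire)) {
    push @got, $id;
}
$c->ignore;             # no arguments: ignore last_id ('c') and leave() it
```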
=head2 $consumer->fail($message)
Same as doing C<die($message)> from within a consume/process callback, except
that no exception is thrown (so no C<$SIG{__DIE__}> handlers are invoked) and
the error is deferred until the callback actually returns.
Typically used as
    return $consumer->fail;
from within a C<consume()> callback.
Returns the $consumer object.
=cut
sub fail {
my $self= shift;
$self->{fail}= shift;
return $self;
}
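The difference from C<die()> can be seen by counting C<$SIG{__DIE__}> invocations; such handlers run even for exceptions that C<eval> catches, so only a real C<die()> increments the counter. In this hypothetical mock, C<fail()> is the code above and the C<process_one()> helper treats both a thrown exception and a recorded C<fail> as a failed item. Detecting the deferred failure with C<exists> is an assumption, chosen because calling C<fail()> without a message stores C<undef>.

```perl
use strict;
use warnings;

# Hypothetical mock: fail() copies the code above. How the loop notices a
# deferred failure (exists $self->{fail}) is an assumption; it copes with
# fail() being called without a message, which stores undef.
package Mock::Consumer;

sub new  { my $class = shift; return bless({ state => {} }, $class) }
sub fail { my $self = shift; $self->{fail} = shift; return $self }

sub process_one {
    my ($self, $id, $cb) = @_;
    delete $self->{fail};
    my $ok = eval { $cb->($id, $self); 1 };
    my $error = !$ok                 ? $@
              : exists $self->{fail} ? ($self->{fail} // 'failed')
              :                        undef;
    $self->{state}{$id} = defined $error ? 'failed' : 'processed';
    return $error;
}

package main;

my $die_hooks = 0;
local $SIG{__DIE__} = sub { $die_hooks++ };    # count real exceptions

my $c = Mock::Consumer->new;
$c->process_one(1, sub { die "broken\n" });                    # throws
$c->process_one(2, sub { return $_[1]->fail("deferred") });    # no throw
my $no_msg = $c->process_one(3, sub { return $_[1]->fail });   # no message
$c->process_one(4, sub { return });                            # success
```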
=head2 $consumer->halt()
Causes C<consume()> to stop processing and return once
the current callback returns. Typically invoked like
    return $consumer->halt;
or
    return $consumer->fail->halt;
Returns the $consumer object.
=cut
sub halt {
my $self= shift;
$self->{halt}++;
return $self;
}
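Because C<fail()> and C<halt()> both return the consumer object, they can be chained. The mock below illustrates this with a simplified, hypothetical C<consume()> loop that checks the halt flag only after each callback returns; it is a sketch, not the module's actual loop.

```perl
use strict;
use warnings;

# Hypothetical mock: fail() and halt() copy the code in this section;
# consume() is a simplified stand-in that honours the halt flag.
package Mock::Consumer;

sub new  { my ($class, @ids) = @_; return bless({ queue => [@ids], state => {} }, $class) }
sub fail { my $self = shift; $self->{fail} = shift; return $self }
sub halt { my $self = shift; $self->{halt}++; return $self }

sub consume {
    my ($self, $cb) = @_;
    for my $id (@{ $self->{queue} }) {
        delete $self->{fail};
        $cb->($id, $self);
        $self->{state}{$id} = exists $self->{fail} ? 'failed' : 'processed';
        last if $self->{halt};    # checked only after the callback returns
    }
    return $self;
}

package main;

my $c = Mock::Consumer->new(1 .. 5);
$c->consume(sub {
    my ($id, $consumer) = @_;
    # fail() returns the consumer, so halt() can be chained onto it.
    return $consumer->fail("stopping at $id")->halt if $id == 3;
    return;
});
```

Item 3 is still marked as failed before the loop stops, and items 4 and 5 are never touched.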
=head2 $object->is_ignored($id)
Returns true if the item has been marked to be ignored. If $id is omitted,
it defaults to C<last_id()>.
=cut
sub is_ignored {
my $self= shift;
my $id= @_ ? shift @_ : $self->last_id;
return if !defined $id;
return $self->{ignore}{$id} ? 1 : 0
}
=head2 $object->reset()
Reset the state of the object.