Dezi-Bot
view release on metacpan or search on metacpan
lib/Dezi/Bot/Queue/DBI.pm view on Meta::CPAN
$password,
{ RaiseError => 1,
AutoCommit => 1,
}
);
$self->{conn}->mode('fixup'); # ping only on failure
$self->{ih} = DBIx::InsertHash->new(
table => $self->{table_name},
quote => $self->{quote},
quote_char => $self->{quote_char},
);
return $self;
}
=head2 conn
Returns the internal DBIx::Connector object.
=cut
# Accessor for the object's "conn" slot (the connection object that the
# other methods in this class call ->run on).
sub conn {
    my $self = shift;
    return $self->{conn};
}
=head2 put( I<item>, I<args> )
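Add I<item> to the queue. I<args> is an optional list of column/value
pairs that are merged into the inserted row and override the default
values for columns of the same name.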
=cut
# Insert one item into the queue table.
#
# Arguments:
#   $item  - required; stringified for the MD5 digest, stored as-is in "uri".
#   @extra - optional column => value pairs; they are listed last in the
#            row so they override the defaults built here.
#
# Croaks with "item required" when $item is undefined.
# Returns whatever the connection's run() call returns.
sub put {
    my ( $self, $item, @extra ) = @_;
    croak "item required" unless defined $item;

    my %cols   = @extra;
    my %record = (
        lock_time  => 0,                     # 0 means "not locked"
        uri_md5    => md5_hex("$item"),
        uri        => $item,
        queue_time => Time::HiRes::time(),
        queue_name => $self->name,
        %cols,
    );

    return $self->{conn}->run(
        sub {
            my $handle = $_;    # run() exposes the database handle as $_
            $self->{ih}->insert( \%record, $self->{table_name}, $handle );
        }
    );
}
=head2 get([ I<limit>, I<update_cols> ])
Returns the next item from the queue, marking it as unavailable.
Default is to return 1 item, but set I<limit> to return multiple.
I<update_cols> is an optional hashref of column/value pairs to update
when each item is locked.
=cut
# Fetch and lock the next unlocked item(s) belonging to this queue.
#
# Arguments:
#   $limit       - maximum number of rows to select; false/omitted means 1.
#   $update_cols - optional hashref of column => value pairs written to each
#                  row as it is locked (applied after lock_time, so a
#                  lock_time key here overrides the timestamp).
#
# Returns a single URI object (or undef if nothing was fetched) when
# $limit is 1, otherwise an array reference of URI objects.
#
# NOTE(review): the select and the per-row update run as separate
# statements inside run(), not inside an explicit transaction.
sub get {
    my ( $self, $limit, $update_cols ) = @_;
    $limit       ||= 1;
    $update_cols ||= {};

    my $table = $self->{table_name};
    my $sql
        = qq/select * from $table where queue_name=? and lock_time=0 order by priority DESC, queue_time ASC limit ?/;

    my @locked;
    $self->{conn}->run(
        sub {
            my $handle = $_;    # run() exposes the database handle as $_
            my $query  = $handle->prepare($sql);
            $query->execute( $self->name, $limit );

            while ( my $rec = $query->fetchrow_hashref() ) {
                push @locked, URI->new( $rec->{uri} );

                # a non-zero lock_time marks the row as taken
                $rec->{lock_time} = Time::HiRes::time();

                # caller-supplied columns are mixed in last and win
                @{$rec}{ keys %$update_cols } = values %$update_cols;

                # write the whole row back, matched on its id
                $self->{ih}->update( $rec, [ $rec->{id} ],
                    'id=?', $self->{table_name}, $handle );
            }
        }
    );

    return $locked[0] if $limit == 1;
    return \@locked;
}
=head2 remove( I<item> )
Remove I<item> from the queue completely.
=cut
# Delete an item from this queue, whether or not it is locked.
#
# Arguments:
#   $item - required; stringified and MD5-hashed to match the uri_md5
#           column, the same way put() computes it.
#
# The delete is restricted to rows whose queue_name matches this queue,
# consistent with put() and get(); previously a matching URI was removed
# from every queue stored in the same table.
#
# Croaks with "item required" when $item is undefined.
# Returns the value of DBI's do() (the affected row count; DBI reports
# zero rows as the true-but-zero string "0E0").
sub remove {
    my $self = shift;
    my $item = shift;
    if ( !defined $item ) {
        croak "item required";
    }
    my $count = 0;
    my $md5   = md5_hex("$item");
    my $t     = $self->{table_name};
    $self->{conn}->run(
        sub {
            my $dbh = $_;    # just for clarity
            $count = $dbh->do(
                qq/delete from $t where queue_name=? and uri_md5=?/,
                undef, $self->name, $md5 );
        }
    );
    return $count;
}
=head2 clean
Remove all locked items from the queue.
=cut
# Delete every locked row (lock_time != 0) belonging to this queue.
#
# The original statement filtered on a column named "locked", but every
# other statement in this class reads and writes "lock_time"; the filter
# now uses lock_time. The delete is also restricted to this queue's name,
# matching how put() and get() address rows.
#
# Returns the value of DBI's do() (the affected row count; DBI reports
# zero rows as the true-but-zero string "0E0").
sub clean {
    my $self  = shift;
    my $count = 0;
    my $t     = $self->{table_name};
    $self->{conn}->run(
        sub {
            my $dbh = $_;    # just for clarity
            $count = $dbh->do(
                qq/delete from $t where queue_name=? and lock_time!=0/,
                undef, $self->name );
        }
    );
    return $count;
}
=head2 peek([ I<limit> ])
Returns the next item value, but leaves it in the queue as available.
Default is to return 1 value, but set I<limit> to return multiple
values as an array reference.
=cut
# Look at the next unlocked item(s) of this queue without locking them.
#
# Arguments:
#   $limit - maximum number of rows to select; false/omitted means 1.
#
# Rows are selected with the same filter and ordering that get() uses
# (queue_name, lock_time=0, priority DESC, queue_time ASC). Previously
# the queue_name filter was missing, so peek() could report a row from a
# different queue in the same table that get() would never return.
#
# Returns the raw "uri" column value (not a URI object) when $limit is 1,
# or undef if there is none; otherwise an array reference of values.
sub peek {
    my $self  = shift;
    my $limit = shift || 1;
    my @items;
    my $t = $self->{table_name};
    $self->{conn}->run(
        sub {
            my $dbh = $_;    # just for clarity
            my $sth
                = $dbh->prepare(
                qq/select * from $t where queue_name=? and lock_time=0 order by priority DESC, queue_time ASC limit ?/
                );
            $sth->execute( $self->name, $limit );
            while ( my $row = $sth->fetchrow_hashref() ) {
                push @items, $row->{uri};
            }
        }
    );
    return ( $limit == 1 ) ? $items[0] : \@items;
}
=head2 size
Returns the number of unlocked (available) items currently in the queue.
Locked items are not counted.
=cut
# Count the unlocked rows (lock_time=0) that belong to this queue.
#
# The count is restricted to this queue's name so that it agrees with
# what get() can actually return; previously unlocked rows of every
# queue stored in the same table were counted.
#
# Returns the count(*) value from the first column of the result row.
sub size {
    my $self = shift;
    my $size = 0;
    my $t    = $self->{table_name};
    $self->{conn}->run(
        sub {
            my $dbh = $_;    # just for clarity
            my $sth = $dbh->prepare(
                qq/select count(*) from $t where queue_name=? and lock_time=0/
            );
            $sth->execute( $self->name );
            $size = $sth->fetch->[0];
        }
    );
    return $size;
}
=head2 schema
Callable as a function or class method. Returns string suitable
for initializing a B<dezi_queue> SQL table.
Example:
( run in 0.778 second using v1.01-cache-2.11-cpan-8f98c5d2c55 )