Dezi-Bot

 view release on metacpan or  search on metacpan

lib/Dezi/Bot/Queue/DBI.pm  view on Meta::CPAN

        $password,
        {   RaiseError => 1,
            AutoCommit => 1,
        }
    );
    $self->{conn}->mode('fixup');    # ping only on failure
    $self->{ih} = DBIx::InsertHash->new(
        table      => $self->{table_name},
        quote      => $self->{quote},
        quote_char => $self->{quote_char},
    );
    return $self;
}

=head2 conn

Returns the internal DBIx::Connector object.

=cut

# Accessor: hands back whatever is stored under the 'conn' key of this
# instance (the DBIx::Connector object).
sub conn {
    my ($self) = @_;
    return $self->{conn};
}

=head2 put( I<item>, I<args> )

Add I<item> to the queue.

=cut

# Queue a single item.
#
# $item is required; its stringified form is hashed with md5_hex() to
# fill the uri_md5 column. Any further arguments are read as
# column => value pairs and override the default column values built here.
#
# Croaks with "item required" when $item is undefined.
# Returns whatever the connector's run() returns for the insert.
sub put {
    my ( $self, $item, @extra ) = @_;

    croak "item required" unless defined $item;

    my %overrides = @extra;

    # Defaults are listed first so caller-supplied pairs take precedence.
    my %record = (
        lock_time  => 0,
        uri_md5    => md5_hex("$item"),
        uri        => $item,
        queue_time => Time::HiRes::time(),
        queue_name => $self->name,
        %overrides,
    );

    my $insert = sub {
        my $dbh = $_;    # run() exposes the database handle as $_
        $self->{ih}->insert( \%record, $self->{table_name}, $dbh );
    };
    return $self->{conn}->run($insert);
}

=head2 get([ I<limit>, I<update_cols> ])

Returns the next item from the queue as a L<URI> object, marking it as
unavailable. Default is to return 1 item; if I<limit> is greater than 1,
an array reference of items is returned instead.

I<update_cols> is an optional hashref of column/value pairs to update
when each item is locked.

=cut

# Fetch and lock the next available item(s) for this queue.
#
# $limit       - maximum number of rows to claim; a false value means 1.
# $update_cols - optional hashref of column => value pairs merged into
#                each fetched row (overriding fetched values) before the
#                row is written back.
#
# Rows are selected where queue_name matches $self->name and lock_time
# is 0, ordered by priority (descending) and then queue_time (ascending).
# Each selected row gets lock_time set to the current Time::HiRes::time()
# and is then updated by id.
#
# Returns a single URI object (undef when no row matched) if $limit is 1,
# otherwise an array reference of URI objects.
sub get {
    my ( $self, $limit, $update_cols ) = @_;
    $limit       ||= 1;
    $update_cols ||= {};

    my $table = $self->{table_name};
    my $sql
        = qq/select * from $table where queue_name=? and lock_time=0 order by priority DESC, queue_time ASC limit ?/;
    my @claimed;

    my $claim = sub {
        my $dbh = $_;    # run() exposes the database handle as $_
        my $sth = $dbh->prepare($sql);
        $sth->execute( $self->name, $limit );

        while ( my $row = $sth->fetchrow_hashref() ) {
            push @claimed, URI->new( $row->{uri} );

            # mark the row as locked
            $row->{lock_time} = Time::HiRes::time();

            # caller-supplied columns win over fetched values
            for my $col ( keys %$update_cols ) {
                $row->{$col} = $update_cols->{$col};
            }

            # write the whole row back, addressed by its id
            $self->{ih}->update( $row, [ $row->{id} ],
                'id=?', $self->{table_name}, $dbh );
        }
    };
    $self->{conn}->run($claim);

    return $limit == 1 ? $claimed[0] : \@claimed;
}

=head2 remove( I<item> )

Remove I<item> from the queue completely.

=cut

# Delete the row(s) whose uri_md5 equals the MD5 hex digest of the
# stringified $item.
#
# Croaks with "item required" when $item is undefined.
# Returns the value of $dbh->do() for the delete statement.
sub remove {
    my ( $self, $item ) = @_;

    croak "item required" unless defined $item;

    my $table   = $self->{table_name};
    my $digest  = md5_hex("$item");
    my $deleted = 0;

    my $delete = sub {
        my $dbh = $_;    # run() exposes the database handle as $_
        $deleted
            = $dbh->do( qq/delete from $table where uri_md5=?/, undef,
            $digest );
    };
    $self->{conn}->run($delete);

    return $deleted;
}

=head2 clean

Remove all locked items from the queue.

=cut

# Delete every locked row (lock_time != 0) from the queue table.
#
# The lock marker used throughout this class is the lock_time column:
# put() inserts rows with lock_time => 0 and get() stamps it with the
# current time when an item is claimed. The filter therefore tests
# lock_time; no other method here reads or writes a "locked" column.
#
# Note: the delete is not restricted by queue_name.
#
# Returns the value of $dbh->do() for the delete statement.
sub clean {
    my $self  = shift;
    my $count = 0;
    my $t     = $self->{table_name};
    $self->{conn}->run(
        sub {
            my $dbh = $_;    # run() exposes the database handle as $_
            $count = $dbh->do(qq/delete from $t where lock_time!=0/);
        }
    );
    return $count;
}

=head2 peek([ I<limit> ])

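Returns the next item value, but leaves it in the queue as available.
Default is to return 1 value; if I<limit> is greater than 1, an array
reference of values is returned instead.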

=cut

# Preview the next available item(s) of this queue without locking them.
#
# $limit - maximum number of rows to look at; a false value means 1.
#
# Uses the same selection as get(): rows whose queue_name matches
# $self->name and whose lock_time is 0, ordered by priority (descending)
# and then queue_time (ascending). Restricting by queue_name keeps peek()
# from reporting an item that belongs to a different queue sharing the
# same table, which get() would never hand out.
#
# Returns the uri column value of the first row (undef when no row
# matched) if $limit is 1, otherwise an array reference of uri values.
sub peek {
    my $self  = shift;
    my $limit = shift || 1;
    my @items;
    my $t = $self->{table_name};
    $self->{conn}->run(
        sub {
            my $dbh = $_;    # run() exposes the database handle as $_

            # The connector may execute this block again after a failed
            # attempt; start from an empty list so rows collected by an
            # earlier attempt are not reported twice.
            @items = ();

            my $sth
                = $dbh->prepare(
                qq/select * from $t where queue_name=? and lock_time=0 order by priority DESC, queue_time ASC limit ?/
                );
            $sth->execute( $self->name, $limit );
            while ( my $row = $sth->fetchrow_hashref() ) {
                push @items, $row->{uri};
            }
        }
    );
    return ( $limit == 1 ) ? $items[0] : \@items;
}

=head2 size

Returns the number of items currently in the queue.

=cut

# Count the items of this queue that are still available.
#
# Counts rows whose queue_name matches $self->name and whose lock_time
# is 0, i.e. the same set of rows get() selects from. Without the
# queue_name restriction the count would include rows of other queues
# stored in the same table, so size() could stay non-zero while get()
# has nothing left to return.
#
# Returns the count(*) value fetched from the database.
sub size {
    my $self = shift;
    my $size = 0;
    my $t    = $self->{table_name};
    $self->{conn}->run(
        sub {
            my $dbh = $_;    # run() exposes the database handle as $_
            my $sth = $dbh->prepare(
                qq/select count(*) from $t where queue_name=? and lock_time=0/
            );
            $sth->execute( $self->name );
            $size = $sth->fetch->[0];
        }
    );
    return $size;
}

=head2 schema

Callable as a function or class method. Returns string suitable
for initializing a B<dezi_queue> SQL table.

Example:



( run in 0.778 second using v1.01-cache-2.11-cpan-8f98c5d2c55 )