Amazon-MWS
view release on metacpan - search on metacpan
view release on metacpan or search on metacpan
lib/Amazon/MWS/Uploader.pm view on Meta::CPAN
# add the filters
foreach my $key (keys %$arg) {
if ($additional{$key}) {
die "Attempt to overwrite $key in the additional parameters!\n";
}
else {
$additional{$key} = $arg->{$key};
}
}
}
else {
die "Argument must be either a scalar with a job name and/or "
. "an hashref with additional filters!";
}
}
if (@named_jobs) {
$additional{amws_job_id} = { -in => \@named_jobs };
}
my ($stmt, @bind) = $self->sqla->select(amazon_mws_jobs => '*',
{
%additional,
aborted => 0,
success => 0,
shop_id => $self->_unique_shop_id,
},
{ -asc => 'job_started_epoch'});
my $pending = $self->_exe_query($stmt, @bind);
my %jobs;
while (my $row = $pending->fetchrow_hashref) {
$jobs{$row->{task}} ||= [];
push @{$jobs{$row->{task}}}, $row;
}
my @out;
foreach my $task (qw/product_deletion upload shipping_confirmation order_ack/) {
if (my $list = delete $jobs{$task}) {
if ($task eq 'order_ack') {
for (1..2) {
push @out, pop @$list if @$list;
}
}
elsif ($task eq 'shipping_confirmation') {
while (@$list) {
push @out, pop @$list;
}
}
else {
push @out, @$list if @$list;
}
}
}
return @out;
}
sub resume {
my ($self, @args) = @_;
foreach my $row ($self->get_pending_jobs(@args)) {
print "Working on $row->{amws_job_id}\n";
# check if the job dir exists
if (-d $self->_feed_job_dir($row->{amws_job_id})) {
if (my $seconds_elapsed = $self->job_timed_out($row)) {
$self->_print_or_warn_error("Timeout reached for $row->{amws_job_id}, aborting: "
. Dumper($row));
$self->cancel_job($row->{task}, $row->{amws_job_id},
"Job timed out after $seconds_elapsed seconds");
next;
}
$self->process_feeds($row);
}
else {
warn "No directory " . $self->_feed_job_dir($row->{amws_job_id}) .
" found, removing job id $row->{amws_job_id}\n";
$self->cancel_job($row->{task}, $row->{amws_job_id},
"Job canceled due to missing feed directory");
}
}
}
=head2 cancel_job($task, $job_id, $reason)
Abort the job setting the aborted flag in C<amazon_mws_jobs> table.
=cut
sub cancel_job {
my ($self, $task, $job_id, $reason) = @_;
$self->_exe_query($self->sqla->update('amazon_mws_jobs',
{
aborted => 1,
status => $reason,
},
{
amws_job_id => $job_id,
shop_id => $self->_unique_shop_id,
}));
# and revert the products' status
my $status;
if ($task eq 'product_deletion') {
# let's pretend we were deleting good products
$status = 'ok';
}
elsif ($task eq 'upload') {
$status = 'redo';
}
if ($status) {
print "Updating product to $status for products with job id $job_id\n";
$self->_exe_query($self->sqla->update('amazon_mws_products',
{ status => $status },
{
amws_job_id => $job_id,
shop_id => $self->_unique_shop_id,
}));
}
}
=head2 process_feeds(\%job_row)
Given the hashref with the db row of the job, check at which point it
is and resume.
view all matches for this distributionview release on metacpan - search on metacpan
( run in 3.436 seconds using v1.00-cache-2.02-grep-82fe00e-cpan-48ebf85a1963 )