Amazon-MWS

 view release on metacpan or  search on metacpan

lib/Amazon/MWS/Uploader.pm  view on Meta::CPAN

            # add the filters
            foreach my $key (keys %$arg) {
                if ($additional{$key}) {
                    die "Attempt to overwrite $key in the additional parameters!\n";
                }
                else {
                    $additional{$key} = $arg->{$key};
                }
            }
        }
        else {
            die "Argument must be either a scalar with a job name and/or "
              . "an hashref with additional filters!";
        }
    }
    if (@named_jobs) {
        $additional{amws_job_id} = { -in => \@named_jobs };
    }
    my ($stmt, @bind) = $self->sqla->select(amazon_mws_jobs => '*',
                                            {
                                             %additional,
                                             aborted => 0,
                                             success => 0,
                                             shop_id => $self->_unique_shop_id,
                                            },
                                            { -asc => 'job_started_epoch'});
    my $pending = $self->_exe_query($stmt, @bind);
    my %jobs;
    while (my $row = $pending->fetchrow_hashref) {
        $jobs{$row->{task}} ||= [];
        push @{$jobs{$row->{task}}}, $row;
    }
    my @out;
    foreach my $task (qw/product_deletion upload shipping_confirmation order_ack/) {
        if (my $list = delete $jobs{$task}) {
            if ($task eq 'order_ack') {
                for (1..2) {
                    push @out, pop @$list if @$list;
                }
            }
            elsif ($task eq 'shipping_confirmation') {
                while (@$list) {
                    push @out, pop @$list;
                }
            }
            else {
                push @out, @$list if @$list;
            }
        }
    }
    return @out;
}

sub resume {
    my ($self, @args) = @_;

    # Walk the pending jobs (the arguments are handed over untouched to
    # get_pending_jobs) and, for each of them, either cancel it or hand
    # it over to process_feeds.
  JOB:
    for my $job ($self->get_pending_jobs(@args)) {
        my $job_id = $job->{amws_job_id};
        print "Working on $job_id\n";

        # A job without its feed directory cannot be resumed: cancel it
        # and move on to the next one.
        unless (-d $self->_feed_job_dir($job_id)) {
            warn "No directory " . $self->_feed_job_dir($job_id) .
              " found, removing job id $job_id\n";
            $self->cancel_job($job->{task}, $job_id,
                              "Job canceled due to missing feed directory");
            next JOB;
        }

        # A true value from job_timed_out is reported as the number of
        # seconds elapsed and causes the job to be aborted.
        my $seconds_elapsed = $self->job_timed_out($job);
        if ($seconds_elapsed) {
            $self->_print_or_warn_error("Timeout reached for $job_id, aborting: "
                                        . Dumper($job));
            $self->cancel_job($job->{task}, $job_id,
                              "Job timed out after $seconds_elapsed seconds");
            next JOB;
        }

        # Directory present and no timeout: carry on with the feeds.
        $self->process_feeds($job);
    }
}

=head2 cancel_job($task, $job_id, $reason)

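Abort the job, setting the aborted flag in the C<amazon_mws_jobs> table
and storing C<$reason> as the job status. The status of the products
attached to the job is reverted as well: to C<ok> for a
C<product_deletion> task and to C<redo> for an C<upload> task. For any
other task the products are left untouched.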

=cut

sub cancel_job {
    my ($self, $task, $job_id, $reason) = @_;

    # Flag the job of this shop as aborted, keeping the reason in the
    # status column.
    my @job_update = $self->sqla->update(
        'amazon_mws_jobs',
        { aborted => 1, status => $reason },
        { amws_job_id => $job_id, shop_id => $self->_unique_shop_id },
    );
    $self->_exe_query(@job_update);

    # Pick the status the products of the job go back to. For a
    # deletion we pretend we were deleting good products; tasks not
    # listed here leave the products alone.
    my $status = $task eq 'product_deletion' ? 'ok'
               : $task eq 'upload'           ? 'redo'
               :                               undef;

    if ($status) {
        print "Updating product to $status for products with job id $job_id\n";
        my @product_update = $self->sqla->update(
            'amazon_mws_products',
            { status => $status },
            { amws_job_id => $job_id, shop_id => $self->_unique_shop_id },
        );
        $self->_exe_query(@product_update);
    }
}



=head2 process_feeds(\%job_row)

Given the hashref with the db row of the job, check at which point it
is and resume.

 view all matches for this distribution
 view release on metacpan -  search on metacpan

( run in 3.436 seconds using v1.00-cache-2.02-grep-82fe00e-cpan-48ebf85a1963 )