Amazon-MWS
view release on metacpan or search on metacpan
lib/Amazon/MWS/Uploader.pm view on Meta::CPAN
# feeds. every feed has the same procedure, as per
# http://docs.developer.amazonservices.com/en_US/feeds/Feeds_Overview.html
# so we put a flag on the feed when it is done. The processing
# of the feed itself is tracked in the amazon_mws_feeds
# TODO: we could pass to the object some flags to filter out results.
foreach my $feed (@$feeds) {
# write out the feed if we got something to do, and add a row
# to the feeds.
# when there is no content, no need to create a job for it.
if (my $content = $feed->{content}) {
my $name = $feed->{name} or die "Missing feed_name";
my $file = $self->_feed_file_for_method($job_id, $name);
open (my $fh, '>', $file) or die "Couldn't open $file $!";
print $fh $content;
close $fh;
# and prepare a row for it
my $insertion = {
feed_name => $name,
feed_file => $file,
amws_job_id => $job_id,
shop_id => $self->_unique_shop_id,
};
$self->_exe_query($self->sqla
->insert(amazon_mws_feeds => $insertion));
}
}
return $job_id;
}
sub get_pending_jobs {
my ($self, @args) = @_;
my %additional;
my @named_jobs;
foreach my $arg (@args) {
if (!ref($arg)) {
push @named_jobs, $arg;
}
elsif (ref($arg) eq 'HASH') {
# add the filters
foreach my $key (keys %$arg) {
if ($additional{$key}) {
die "Attempt to overwrite $key in the additional parameters!\n";
}
else {
$additional{$key} = $arg->{$key};
}
}
}
else {
die "Argument must be either a scalar with a job name and/or "
. "an hashref with additional filters!";
}
}
if (@named_jobs) {
$additional{amws_job_id} = { -in => \@named_jobs };
}
my ($stmt, @bind) = $self->sqla->select(amazon_mws_jobs => '*',
{
%additional,
aborted => 0,
success => 0,
shop_id => $self->_unique_shop_id,
},
{ -asc => 'job_started_epoch'});
my $pending = $self->_exe_query($stmt, @bind);
my %jobs;
while (my $row = $pending->fetchrow_hashref) {
$jobs{$row->{task}} ||= [];
push @{$jobs{$row->{task}}}, $row;
}
my @out;
foreach my $task (qw/product_deletion upload shipping_confirmation order_ack/) {
if (my $list = delete $jobs{$task}) {
if ($task eq 'order_ack') {
for (1..2) {
push @out, pop @$list if @$list;
}
}
elsif ($task eq 'shipping_confirmation') {
while (@$list) {
push @out, pop @$list;
}
}
else {
push @out, @$list if @$list;
}
}
}
return @out;
}
sub resume {
my ($self, @args) = @_;
foreach my $row ($self->get_pending_jobs(@args)) {
print "Working on $row->{amws_job_id}\n";
# check if the job dir exists
if (-d $self->_feed_job_dir($row->{amws_job_id})) {
if (my $seconds_elapsed = $self->job_timed_out($row)) {
$self->_print_or_warn_error("Timeout reached for $row->{amws_job_id}, aborting: "
. Dumper($row));
$self->cancel_job($row->{task}, $row->{amws_job_id},
"Job timed out after $seconds_elapsed seconds");
next;
}
$self->process_feeds($row);
}
else {
warn "No directory " . $self->_feed_job_dir($row->{amws_job_id}) .
" found, removing job id $row->{amws_job_id}\n";
$self->cancel_job($row->{task}, $row->{amws_job_id},
"Job canceled due to missing feed directory");
}
}
}
=head2 cancel_job($task, $job_id, $reason)
Abort the job setting the aborted flag in C<amazon_mws_jobs> table.
=cut
sub cancel_job {
my ($self, $task, $job_id, $reason) = @_;
$self->_exe_query($self->sqla->update('amazon_mws_jobs',
{
aborted => 1,
status => $reason,
},
{
amws_job_id => $job_id,
shop_id => $self->_unique_shop_id,
}));
# and revert the products' status
my $status;
if ($task eq 'product_deletion') {
# let's pretend we were deleting good products
$status = 'ok';
}
elsif ($task eq 'upload') {
$status = 'redo';
}
if ($status) {
print "Updating product to $status for products with job id $job_id\n";
$self->_exe_query($self->sqla->update('amazon_mws_products',
{ status => $status },
{
amws_job_id => $job_id,
shop_id => $self->_unique_shop_id,
}));
}
}
=head2 process_feeds(\%job_row)
Given the hashref with the db row of the job, check at which point it
is and resume.
=cut
sub process_feeds {
my ($self, $row) = @_;
# print Dumper($row);
# upload the feeds one by one and stop if something is blocking
my $job_id = $row->{amws_job_id};
print "Processing job $job_id\n";
# query the feeds table for this job
my ($stmt, @bind) = $self->sqla->select(amazon_mws_feeds => '*',
{
amws_job_id => $job_id,
aborted => 0,
success => 0,
shop_id => $self->_unique_shop_id,
},
['amws_feed_pk']);
my $sth = $self->_exe_query($stmt, @bind);
my $unfinished;
while (my $feed = $sth->fetchrow_hashref) {
last unless $self->upload_feed($feed);
}
$sth->finish;
($stmt, @bind) = $self->sqla->select(amazon_mws_feeds => '*',
{
shop_id => $self->_unique_shop_id,
amws_job_id => $job_id,
});
$sth = $self->_exe_query($stmt, @bind);
my ($total, $success, $aborted) = (0, 0, 0);
# query again and check if we have aborted jobs;
while (my $feed = $sth->fetchrow_hashref) {
$total++;
$success++ if $feed->{success};
$aborted++ if $feed->{aborted};
}
# a job was aborted
my $update;
if ($aborted) {
$update = {
aborted => 1,
status => 'Feed error',
};
$self->_print_or_warn_error("Job $job_id aborted!\n");
}
elsif ($success == $total) {
$update = { success => 1 };
print "Job successful!\n";
# if we're here, all the products are fine, so mark them as
# such if it's an upload job
if ($row->{task} eq 'upload') {
$self->_exe_query($self->sqla->update('amazon_mws_products',
{ status => 'ok',
listed_date => DateTime->now,
listed => 1,
},
{
amws_job_id => $job_id,
shop_id => $self->_unique_shop_id,
}));
}
}
else {
print "Job still to be processed\n";
}
if ($update) {
$self->_exe_query($self->sqla->update(amazon_mws_jobs => $update,
{
amws_job_id => $job_id,
shop_id => $self->_unique_shop_id,
}));
}
}
=head2 upload_feed($type, $feed_id);
Routine to upload the feed. Return true if it's complete, false
otherwise.
=cut
sub upload_feed {
my ($self, $record) = @_;
my $job_id = $record->{amws_job_id};
my $type = $record->{feed_name};
my $feed_id = $record->{feed_id};
lib/Amazon/MWS/Uploader.pm view on Meta::CPAN
elsif ($type eq 'shipping_confirmation') {
$self->_exe_query($self->sqla->update(amazon_mws_orders => { shipping_confirmation_ok => 1 },
{ shipping_confirmation_job_id => $job_id,
shop_id => $self->_unique_shop_id }));
}
if (my $warn = $result->warnings) {
if (my $warns = $result->skus_warnings) {
foreach my $w (@$warns) {
$self->_error_logger(warning => $w->{code},
"$w->{sku}: $w->{error}");
# and register it in the db
if ($w->{sku} && $w->{error}) {
$self->_exe_query($self->sqla->update('amazon_mws_products',
{ warnings => "$job_id $w->{code} $w->{error}" },
{
sku => $w->{sku},
shop_id => $self->_unique_shop_id,
}));
}
}
}
else {
warn "$warn\n";
}
}
return 1;
}
else {
foreach my $err ($result->report_errors) {
$self->_error_logger($err->{type},
$err->{code},
$err->{message});
}
$self->_exe_query($self->sqla
->update('amazon_mws_feeds',
{
aborted => 1,
errors => $result->errors,
},
{
feed_id => $feed_id,
shop_id => $self->_unique_shop_id,
}));
$self->register_errors($job_id, $result);
if ($type eq 'order_ack') {
$self->register_order_ack_errors($job_id, $result);
}
elsif ($type eq 'shipping_confirmation') {
$self->register_ship_order_errors($job_id, $result);
}
# and we stop this job, has errors
return 0;
}
}
return $record->{success};
}
sub _exe_query {
my ($self, $stmt, @bind) = @_;
my $sth = $self->dbh->prepare($stmt);
print $stmt, Dumper(\@bind) if DEBUG;
eval {
$sth->execute(@bind);
};
if ($@) {
die "Failed to execute $stmt with params" . Dumper(\@bind);
}
return $sth;
}
sub _check_processing_complete {
my ($self, $feed_id, $type) = @_;
my $res;
try {
$res = $self->client->GetFeedSubmissionList;
} catch {
my $exception = $_;
if (ref($exception) && $exception->can('xml')) {
warn "checking processing complete error for $type $feed_id: " . $exception->xml;
}
else {
warn "checking processing complete for $type $feed_id: " . Dumper($exception);
}
};
die unless $res;
print "Checking if the processing is complete for $type $feed_id\n"; # . Dumper($res);
my $found;
if (my $list = $res->{FeedSubmissionInfo}) {
foreach my $feed (@$list) {
if ($feed->{FeedSubmissionId} eq $feed_id) {
$found = $feed;
last;
}
}
# check the result
if ($found && $found->{FeedProcessingStatus} eq '_DONE_') {
return 1;
}
elsif ($found) {
print "Feed $type $feed_id still $found->{FeedProcessingStatus}\n";
return;
}
else {
# there is a remote possibility that in it in another
# page, but it should be very unlikely, as we block the
# process when the first one is not complete
print "$feed_id not found in submission list\n";
return;
}
}
else {
warn "No FeedSubmissionInfo found for $type $feed_id:" . Dumper($res);
return;
}
}
=head2 submission_result($feed_id)
Return a L<Amazon::MWS::XML::Response::FeedSubmissionResult> object
for the given feed ID.
=cut
sub submission_result {
my ($self, $feed_id) = @_;
( run in 1.156 second using v1.01-cache-2.11-cpan-2398b32b56e )