Amazon-MWS

 view release on metacpan or  search on metacpan

lib/Amazon/MWS/Uploader.pm  view on Meta::CPAN

    # feeds. every feed has the same procedure, as per
    # http://docs.developer.amazonservices.com/en_US/feeds/Feeds_Overview.html
    # so we put a flag on the feed when it is done. The processing
    # of the feed itself is tracked in the amazon_mws_feeds table.

    # TODO: we could pass to the object some flags to filter out results.
    foreach my $feed (@$feeds) {
        # write out the feed if we got something to do, and add a row
        # to the feeds.

        # when there is no content, no need to create a job for it.
        if (my $content = $feed->{content}) {
            my $name = $feed->{name} or die "Missing feed_name";
            my $file = $self->_feed_file_for_method($job_id, $name);
            open (my $fh, '>', $file) or die "Couldn't open $file $!";
            print $fh $content;
            close $fh;
            # and prepare a row for it

            my $insertion = {
                             feed_name => $name,
                             feed_file => $file,
                             amws_job_id => $job_id,
                             shop_id => $self->_unique_shop_id,
                            };
            $self->_exe_query($self->sqla
                              ->insert(amazon_mws_feeds => $insertion));
        }
    }
    return $job_id;
}


# get_pending_jobs(@args)
#
# Return the list of rows (hashrefs) from amazon_mws_jobs which belong
# to the current shop and are neither aborted nor successful.
#
# Each argument is either:
#   - a plain scalar: a job id; when at least one is given, the query is
#     restricted to those amws_job_id values;
#   - a hashref: additional column filters merged into the WHERE clause.
#     The same key may not appear in two hashrefs.
# Any other kind of reference is fatal.
#
# The rows are requested ordered by job_started_epoch (ascending), then
# grouped by task and returned in this task order: product_deletion,
# upload, shipping_confirmation, order_ack. Rows with any other task are
# not returned.
sub get_pending_jobs {
    my ($self, @args) = @_;
    my %additional;
    my @named_jobs;
    foreach my $arg (@args) {
        if (!ref($arg)) {
            push @named_jobs, $arg;
        }
        elsif (ref($arg) eq 'HASH') {
            # add the filters
            foreach my $key (keys %$arg) {
                # test for the key itself, not for its value: a filter
                # like { some_flag => 0 } is false but still must not be
                # silently replaced by a later hashref.
                if (exists $additional{$key}) {
                    die "Attempt to overwrite $key in the additional parameters!\n";
                }
                $additional{$key} = $arg->{$key};
            }
        }
        else {
            die "Argument must be either a scalar with a job name and/or "
              . "an hashref with additional filters!";
        }
    }
    if (@named_jobs) {
        $additional{amws_job_id} = { -in => \@named_jobs };
    }
    # the fixed conditions come last, so they win over the additional
    # filters when the same column is given.
    my ($stmt, @bind) = $self->sqla->select(amazon_mws_jobs => '*',
                                            {
                                             %additional,
                                             aborted => 0,
                                             success => 0,
                                             shop_id => $self->_unique_shop_id,
                                            },
                                            { -asc => 'job_started_epoch'});
    my $pending = $self->_exe_query($stmt, @bind);

    # group the rows by task, keeping the order in which they were fetched
    my %jobs;
    while (my $row = $pending->fetchrow_hashref) {
        push @{ $jobs{ $row->{task} } }, $row;
    }

    my @out;
    foreach my $task (qw/product_deletion upload shipping_confirmation order_ack/) {
        my $list = delete $jobs{$task} or next;
        if ($task eq 'order_ack') {
            # at most two jobs, taken from the end of the fetched list
            for (1..2) {
                push @out, pop @$list if @$list;
            }
        }
        elsif ($task eq 'shipping_confirmation') {
            # all of them, in reverse fetch order
            push @out, reverse @$list;
        }
        else {
            push @out, @$list;
        }
    }
    return @out;
}

# resume(@args)
#
# Walk the pending jobs (the arguments are handed over untouched to
# get_pending_jobs) and, for each of them:
#   - cancel it when its feed directory does not exist;
#   - cancel it when job_timed_out reports a true value (used as the
#     number of elapsed seconds in the cancellation reason);
#   - otherwise carry on with process_feeds.
sub resume {
    my ($self, @args) = @_;
  JOB:
    foreach my $job ($self->get_pending_jobs(@args)) {
        my $job_id = $job->{amws_job_id};
        print "Working on $job_id\n";

        # without the directory holding the feeds there is nothing to resume
        unless (-d $self->_feed_job_dir($job_id)) {
            warn "No directory " . $self->_feed_job_dir($job_id) .
              " found, removing job id $job_id\n";
            $self->cancel_job($job->{task}, $job_id,
                              "Job canceled due to missing feed directory");
            next JOB;
        }

        if (my $seconds_elapsed = $self->job_timed_out($job)) {
            my $details = Dumper($job);
            $self->_print_or_warn_error("Timeout reached for $job_id, aborting: "
                                        . $details);
            $self->cancel_job($job->{task}, $job_id,
                              "Job timed out after $seconds_elapsed seconds");
            next JOB;
        }

        $self->process_feeds($job);
    }
}

=head2 cancel_job($task, $job_id, $reason)

Flag the job C<$job_id> of the current shop as aborted in the
C<amazon_mws_jobs> table, storing C<$reason> in its C<status> column.

The rows of C<amazon_mws_products> attached to the same job and shop
get their status reverted as well: to C<ok> when C<$task> is
C<product_deletion>, to C<redo> when it is C<upload>. For any other
task the products are left alone.

=cut

sub cancel_job {
    my ($self, $task, $job_id, $reason) = @_;

    # first mark the job itself
    my ($job_stmt, @job_bind) =
      $self->sqla->update('amazon_mws_jobs',
                          { aborted => 1, status => $reason },
                          { amws_job_id => $job_id,
                            shop_id => $self->_unique_shop_id });
    $self->_exe_query($job_stmt, @job_bind);

    # then work out which status the products should go back to. On an
    # aborted deletion we pretend the products were fine; on an aborted
    # upload they have to be processed again.
    my $status = $task eq 'product_deletion' ? 'ok'
               : $task eq 'upload'           ? 'redo'
               :                               undef;

    if ($status) {
        print "Updating product to $status for products with job id $job_id\n";
        my ($product_stmt, @product_bind) =
          $self->sqla->update('amazon_mws_products',
                              { status => $status },
                              { amws_job_id => $job_id,
                                shop_id => $self->_unique_shop_id });
        $self->_exe_query($product_stmt, @product_bind);
    }
}



=head2 process_feeds(\%job_row)

Given the hashref with the db row of the job, resume it from the point
where it was left.

The feeds of the job which are neither aborted nor successful are
passed one by one to C<upload_feed>, stopping at the first one for
which it returns false. Then all the feeds of the job are inspected:

=over 4

=item if any of them is aborted, the job is flagged as aborted with
status C<Feed error>;

=item if all of them are successful, the job is flagged as successful
and, when the task is C<upload>, its products are marked as C<ok> and
listed;

=item otherwise the job is left untouched, to be processed later.

=back

=cut

sub process_feeds {
    my ($self, $row) = @_;
    my $job_id = $row->{amws_job_id};
    print "Processing job $job_id\n";

    # first pass: push forward the feeds still to be done, in primary
    # key order, and stop as soon as one of them is blocking.
    my %unfinished = (
                      amws_job_id => $job_id,
                      aborted => 0,
                      success => 0,
                      shop_id => $self->_unique_shop_id,
                     );
    my ($stmt, @bind) = $self->sqla->select(amazon_mws_feeds => '*',
                                            \%unfinished,
                                            ['amws_feed_pk']);
    my $sth = $self->_exe_query($stmt, @bind);
    while (my $feed = $sth->fetchrow_hashref) {
        $self->upload_feed($feed) or last;
    }
    $sth->finish;

    # second pass: read back every feed of the job and tally the outcome
    my %every_feed = (
                      shop_id => $self->_unique_shop_id,
                      amws_job_id => $job_id,
                     );
    ($stmt, @bind) = $self->sqla->select(amazon_mws_feeds => '*', \%every_feed);
    $sth = $self->_exe_query($stmt, @bind);

    my %count = (total => 0, success => 0, aborted => 0);
    while (my $feed = $sth->fetchrow_hashref) {
        $count{total}++;
        foreach my $flag (qw/success aborted/) {
            $count{$flag}++ if $feed->{$flag};
        }
    }

    # decide what to write on the job row, if anything
    my $update;
    if ($count{aborted}) {
        $update = {
                   aborted => 1,
                   status => 'Feed error',
                  };
        $self->_print_or_warn_error("Job $job_id aborted!\n");
    }
    elsif ($count{success} == $count{total}) {
        $update = { success => 1 };
        print "Job successful!\n";
        # every feed went through, so for an upload job the products
        # are fine and can be marked as listed.
        if ($row->{task} eq 'upload') {
            my %listed = (
                          status => 'ok',
                          listed_date => DateTime->now,
                          listed => 1,
                         );
            $self->_exe_query($self->sqla->update('amazon_mws_products',
                                                  \%listed,
                                                  {
                                                   amws_job_id => $job_id,
                                                   shop_id => $self->_unique_shop_id,
                                                  }));
        }
    }
    else {
        print "Job still to be processed\n";
    }
    if ($update) {
        $self->_exe_query($self->sqla->update(amazon_mws_jobs => $update,
                                              {
                                               amws_job_id => $job_id,
                                               shop_id => $self->_unique_shop_id,
                                              }));
    }
}

=head2 upload_feed(\%feed_record)

Routine to upload the feed, given the hashref with the db row of the
feed. Return true if it's complete, false otherwise.

=cut

sub upload_feed {
    my ($self, $record) = @_;
    my $job_id = $record->{amws_job_id};
    my $type   = $record->{feed_name};
    my $feed_id = $record->{feed_id};

lib/Amazon/MWS/Uploader.pm  view on Meta::CPAN

            elsif ($type eq 'shipping_confirmation') {
                $self->_exe_query($self->sqla->update(amazon_mws_orders => { shipping_confirmation_ok => 1 },
                                                      { shipping_confirmation_job_id => $job_id,
                                                        shop_id => $self->_unique_shop_id }));
            }
            if (my $warn = $result->warnings) {
                if (my $warns = $result->skus_warnings) {
                    foreach my $w (@$warns) {
                        $self->_error_logger(warning => $w->{code},
                                             "$w->{sku}: $w->{error}");
                        # and register it in the db
                        if ($w->{sku} && $w->{error}) {
                            $self->_exe_query($self->sqla->update('amazon_mws_products',
                                                                  { warnings => "$job_id $w->{code} $w->{error}" },
                                                                  {
                                                                   sku => $w->{sku},
                                                                   shop_id => $self->_unique_shop_id,
                                                                  }));
                        }
                    }
                }
                else {
                    warn "$warn\n";
                }
            }
            return 1;
        }
        else {
            foreach my $err ($result->report_errors) {
                $self->_error_logger($err->{type},
                                     $err->{code},
                                     $err->{message});
            }
            $self->_exe_query($self->sqla
                              ->update('amazon_mws_feeds',
                                       {
                                        aborted => 1,
                                        errors => $result->errors,
                                       },
                                       {
                                        feed_id => $feed_id,
                                        shop_id => $self->_unique_shop_id,
                                       }));
            $self->register_errors($job_id, $result);
            
            if ($type eq 'order_ack') {
                $self->register_order_ack_errors($job_id, $result);
            }
            elsif ($type eq 'shipping_confirmation') {
                $self->register_ship_order_errors($job_id, $result);
            }
            
            # and we stop this job, has errors
            return 0;
        }
    }
    return $record->{success};
}

# _exe_query($stmt, @bind)
#
# Prepare $stmt on the object's database handle, execute it with the
# given bind values and return the statement handle.
#
# Dies if the statement cannot be prepared or executed. Both a thrown
# exception and a false return value count as a failure, so errors are
# noticed even when the handle does not raise them. The message carries
# the statement, the dumped bind values and the underlying error.
sub _exe_query {
    my ($self, $stmt, @bind) = @_;
    print $stmt, Dumper(\@bind) if DEBUG;
    my $sth;
    my $ok = eval {
        $sth = $self->dbh->prepare($stmt)
          or die "prepare failed: "
            . ($self->dbh->errstr || 'unknown error') . "\n";
        $sth->execute(@bind)
          or die "execute failed: "
            . ($sth->errstr || 'unknown error') . "\n";
        1;
    };
    unless ($ok) {
        # keep the original cause: without it the failure is impossible
        # to diagnose from the logs.
        my $error = $@ || "unknown error\n";
        die "Failed to execute $stmt with params" . Dumper(\@bind)
          . "Error: $error";
    }
    return $sth;
}

# _check_processing_complete($feed_id, $type)
#
# Ask the client for the feed submission list and look up $feed_id in
# its FeedSubmissionInfo entries. $type is used only in the messages.
#
# Returns 1 when the feed is found with FeedProcessingStatus _DONE_.
# Returns nothing (undef / empty list) when the feed is found in
# another status, when it is not in the list, or when the response
# carries no FeedSubmissionInfo at all.
#
# Dies when the list cannot be retrieved: the exception raised by the
# client is reported with a warning, then a fatal error naming the feed
# is thrown.
sub _check_processing_complete {
    my ($self, $feed_id, $type) = @_;
    my $res;
    try {
        $res = $self->client->GetFeedSubmissionList;
    } catch {
        my $exception = $_;
        if (ref($exception) && $exception->can('xml')) {
            warn "checking processing complete error for $type $feed_id: " . $exception->xml;
        }
        else {
            warn "checking processing complete for $type $feed_id: " . Dumper($exception);
        }
    };
    # a bare die here would only say "Died", with no hint about the
    # feed involved.
    die "Could not retrieve the feed submission list while checking $type $feed_id"
      unless $res;
    print "Checking if the processing is complete for $type $feed_id\n"; # . Dumper($res);

    my $list = $res->{FeedSubmissionInfo};
    unless ($list) {
        warn "No FeedSubmissionInfo found for $type $feed_id:" . Dumper($res);
        return;
    }

    # only the first matching entry matters
    my ($found) = grep { $_->{FeedSubmissionId} eq $feed_id } @$list;

    unless ($found) {
        # there is a remote possibility that it is in another page,
        # but it should be very unlikely, as we block the process
        # when the first one is not complete
        print "$feed_id not found in submission list\n";
        return;
    }

    return 1 if $found->{FeedProcessingStatus} eq '_DONE_';

    print "Feed $type $feed_id still $found->{FeedProcessingStatus}\n";
    return;
}

=head2 submission_result($feed_id)

Return a L<Amazon::MWS::XML::Response::FeedSubmissionResult> object
for the given feed ID.

=cut

sub submission_result {
    my ($self, $feed_id) = @_;



( run in 1.156 second using v1.01-cache-2.11-cpan-2398b32b56e )