Amazon-MWS

 view release on metacpan or  search on metacpan

lib/Amazon/MWS/Uploader.pm  view on Meta::CPAN

package Amazon::MWS::Uploader;

use utf8;
use strict;
use warnings;

use DBI;
use Amazon::MWS::XML::Feed;
use Amazon::MWS::XML::Order;
use Amazon::MWS::Client;
use Amazon::MWS::XML::Response::FeedSubmissionResult;
use Amazon::MWS::XML::Response::OrderReport;
use Data::Dumper;
use File::Spec;
use DateTime;
use SQL::Abstract;
use Try::Tiny;
use Path::Tiny;
use Scalar::Util qw/blessed/;
use XML::Compile::Schema;

use Moo;
use MooX::Types::MooseLike::Base qw(:all);
use namespace::clean;

our $VERSION = '0.18';

use constant {
    AMW_ORDER_WILDCARD_ERROR => 999999,
    DEBUG => $ENV{AMZ_UPLOADER_DEBUG},
};

=head1 NAME

Amazon::MWS::Uploader -- high level agent to upload products to AMWS

=head1 DESCRIPTION

This module provide an high level interface to the upload process. It
has to keep track of the state to resume the uploading, which could
get stuck on the Amazon's side processing, so database credentials
have to be provided (or the database handle itself).

The table structure needed is defined and commented in sql/amazon.sql

=head1 SYNOPSIS

  my $agent = Amazon::MWS::Uploader->new(
                                         db_dsn => 'DBI:mysql:database=XXX',
                                         db_username => 'xxx',
                                         db_password => 'xxx',
                                         db_options => \%options
                                         # or dbh => $dbh,
  
                                         schema_dir => '/path/to/xml_schema',
                                         feed_dir => '/path/to/directory/for/xml',
  
                                         merchant_id => 'xxx',
                                         access_key_id => 'xxx',
                                         secret_key => 'xxx',
  
                                         marketplace_id => 'xxx',
                                         endpoint => 'xxx',
  
                                         products => \@products,
                                        );
  
  # say once a day, retrieve the full batch and send it up
  $agent->upload; 
  
  # every 10 minutes or so, continue the work started with ->upload, if any
  $agent->resume;


=head1 UPGRADE NOTES

lib/Amazon/MWS/Uploader.pm  view on Meta::CPAN

    my $feeder = Amazon::MWS::XML::Feed->new(
                                             products => \@products,
                                             xml_writer => $self->xml_writer,
                                             merchant_id => $self->merchant_id,
                                            );
    my @feeds;
    foreach my $feed_name (qw/product
                              inventory
                              price
                              image
                              variants
                             /) {
        my $method = $feed_name . "_feed";
        if (my $content = $feeder->$method) {
            push @feeds, {
                          name => $feed_name,
                          content => $content,
                         };
        }
    }
    if (my $job_id = $self->prepare_feeds(upload => \@feeds)) {
        $self->_mark_products_as_pending($job_id, @products);
        return $job_id;
    }
    return;
}

sub _mark_products_as_pending {
    my ($self, $job_id, @products) = @_;
    die "Bad usage" unless $job_id;
    # these skus were cleared up when asking for the products to upload
    foreach my $p (@products) {
        my %identifier = (
                          sku => $p->sku,
                          shop_id => $self->_unique_shop_id,
                         );
        my %data = (
                    amws_job_id => $job_id,
                    status => 'pending',
                    warnings => '', # clear out
                    timestamp_string => $p->timestamp_string,
                   );
        my $check = $self
          ->_exe_query($self->sqla->select(amazon_mws_products => [qw/sku/],  { %identifier }));
        my $existing = $check->fetchrow_hashref;
        $check->finish;
        if ($existing) {
            $self->_exe_query($self->sqla->update(amazon_mws_products => \%data, \%identifier));
        }
        else {
            $self->_exe_query($self->sqla->insert(amazon_mws_products => { %identifier, %data }));
        }
    }
}


sub prepare_feeds {
    my ($self, $task, $feeds) = @_;
    die "Missing task ($task) and feeds ($feeds)" unless $task && $feeds;
    return unless @$feeds; # nothing to do
    my $job_id = $task . "-" . DateTime->now->strftime('%F-%H-%M-%S');
    my $job_started_epoch = time();

    $self->_exe_query($self->sqla
                      ->insert(amazon_mws_jobs => {
                                                   amws_job_id => $job_id,
                                                   shop_id => $self->_unique_shop_id,
                                                   task => $task,
                                                   job_started_epoch => $job_started_epoch,
                                                  }));

    # to complete the process, we need to fill out these five
    # feeds. every feed has the same procedure, as per
    # http://docs.developer.amazonservices.com/en_US/feeds/Feeds_Overview.html
    # so we put a flag on the feed when it is done. The processing
    # of the feed itself is tracked in the amazon_mws_feeds

    # TODO: we could pass to the object some flags to filter out results.
    foreach my $feed (@$feeds) {
        # write out the feed if we got something to do, and add a row
        # to the feeds.

        # when there is no content, no need to create a job for it.
        if (my $content = $feed->{content}) {
            my $name = $feed->{name} or die "Missing feed_name";
            my $file = $self->_feed_file_for_method($job_id, $name);
            open (my $fh, '>', $file) or die "Couldn't open $file $!";
            print $fh $content;
            close $fh;
            # and prepare a row for it

            my $insertion = {
                             feed_name => $name,
                             feed_file => $file,
                             amws_job_id => $job_id,
                             shop_id => $self->_unique_shop_id,
                            };
            $self->_exe_query($self->sqla
                              ->insert(amazon_mws_feeds => $insertion));
        }
    }
    return $job_id;
}


sub get_pending_jobs {
    my ($self, @args) = @_;
    my %additional;
    my @named_jobs;
    foreach my $arg (@args) {
        if (!ref($arg)) {
            push @named_jobs, $arg;
        }
        elsif (ref($arg) eq 'HASH') {
            # add the filters
            foreach my $key (keys %$arg) {
                if ($additional{$key}) {
                    die "Attempt to overwrite $key in the additional parameters!\n";
                }
                else {
                    $additional{$key} = $arg->{$key};

lib/Amazon/MWS/Uploader.pm  view on Meta::CPAN

=cut

sub process_feeds {
    my ($self, $row) = @_;
    # print Dumper($row);
    # upload the feeds one by one and stop if something is blocking
    my $job_id = $row->{amws_job_id};
    print "Processing job $job_id\n";

    # query the feeds table for this job
    my ($stmt, @bind) = $self->sqla->select(amazon_mws_feeds => '*',
                                            {
                                             amws_job_id => $job_id,
                                             aborted => 0,
                                             success => 0,
                                             shop_id => $self->_unique_shop_id,
                                            },
                                            ['amws_feed_pk']);

    my $sth = $self->_exe_query($stmt, @bind);
    my $unfinished;
    while (my $feed = $sth->fetchrow_hashref) {
        last unless $self->upload_feed($feed);
    }
    $sth->finish;

    ($stmt, @bind) = $self->sqla->select(amazon_mws_feeds => '*',
                                         {
                                          shop_id => $self->_unique_shop_id,
                                          amws_job_id => $job_id,
                                         });

    $sth = $self->_exe_query($stmt, @bind);

    my ($total, $success, $aborted) = (0, 0, 0);

    # query again and check if we have aborted jobs;
    while (my $feed = $sth->fetchrow_hashref) {
        $total++;
        $success++ if $feed->{success};
        $aborted++ if $feed->{aborted};
    }

    # a job was aborted
    my $update;
    if ($aborted) {
        $update = {
                   aborted => 1,
                   status => 'Feed error',
                  };
        $self->_print_or_warn_error("Job $job_id aborted!\n");
    }
    elsif ($success == $total) {
        $update = { success => 1 };
        print "Job successful!\n";
        # if we're here, all the products are fine, so mark them as
        # such if it's an upload job
        if ($row->{task} eq 'upload') {
            $self->_exe_query($self->sqla->update('amazon_mws_products',
                                                  { status => 'ok',
                                                    listed_date => DateTime->now,
                                                    listed => 1,
                                                  },
                                                  {
                                                   amws_job_id => $job_id,
                                                   shop_id => $self->_unique_shop_id,
                                                  }));
        }
    }
    else {
        print "Job still to be processed\n";
    }
    if ($update) {
        $self->_exe_query($self->sqla->update(amazon_mws_jobs => $update,
                                              {
                                               amws_job_id => $job_id,
                                               shop_id => $self->_unique_shop_id,
                                              }));
    }
}

=head2 upload_feed($type, $feed_id);

Routine to upload the feed. Return true if it's complete, false
otherwise.

=cut

sub upload_feed {
    my ($self, $record) = @_;
    my $job_id = $record->{amws_job_id};
    my $type   = $record->{feed_name};
    my $feed_id = $record->{feed_id};
    print "Checking $type feed for $job_id\n";
    # http://docs.developer.amazonservices.com/en_US/feeds/Feeds_FeedType.html


    my %names = (
                 product => '_POST_PRODUCT_DATA_',
                 inventory => '_POST_INVENTORY_AVAILABILITY_DATA_',
                 price => '_POST_PRODUCT_PRICING_DATA_',
                 image => '_POST_PRODUCT_IMAGE_DATA_',
                 variants => '_POST_PRODUCT_RELATIONSHIP_DATA_',
                 order_ack => '_POST_ORDER_ACKNOWLEDGEMENT_DATA_',
                 shipping_confirmation => '_POST_ORDER_FULFILLMENT_DATA_',
                );

    die "Unrecognized type $type" unless $names{$type};

    # no feed id, it's a new batch
    if (!$feed_id) {
        print "No feed id found, doing a request for $job_id $type\n";
        my $feed_content = $self->_slurp_file($record->{feed_file});
        my $res;
        try {
            $res = $self->client
              ->SubmitFeed(content_type => 'text/xml; charset=utf-8',
                           FeedType => $names{$type},
                           FeedContent => $feed_content,
                           MarketplaceIdList => [$self->marketplace_id],
                          );

lib/Amazon/MWS/Uploader.pm  view on Meta::CPAN

                $found = $feed;
                last;
            }
        }

        # check the result
        if ($found && $found->{FeedProcessingStatus} eq '_DONE_') {
            return 1;
        }
        elsif ($found) {
            print "Feed $type $feed_id still $found->{FeedProcessingStatus}\n";
            return;
        }
        else {
            # there is a remote possibility that in it in another
            # page, but it should be very unlikely, as we block the
            # process when the first one is not complete
            print "$feed_id not found in submission list\n";
            return;
        }
    }
    else {
        warn "No FeedSubmissionInfo found for $type $feed_id:" . Dumper($res);
        return;
    }
}

=head2 submission_result($feed_id)

Return a L<Amazon::MWS::XML::Response::FeedSubmissionResult> object
for the given feed ID.

=cut

sub submission_result {
    my ($self, $feed_id) = @_;
    my $xml;
    try {
        $xml = $self->client
          ->GetFeedSubmissionResult(FeedSubmissionId => $feed_id);
    } catch {
        my $exception = $_;
        if (ref($exception) && $exception->can('xml')) {
            warn "submission result error: " . $exception->xml;
        }
        else {
            warn "submission result error: " . Dumper($exception);
        }
    };
    die unless $xml;
    return Amazon::MWS::XML::Response::FeedSubmissionResult
      ->new(
            xml => $xml,
            xml_reader => $self->xml_reader,
           );
}

=head2 get_orders($from_date)

This is a self-contained method and doesn't require a product list.
The from_date must be a L<DateTime> object. If not provided, it will
the last week.

Returns a list of Amazon::MWS::XML::Order objects.

Beware that it's possible you get some items with 0 quantity, i.e.
single items cancelled. The application code needs to be prepared to
deal with such phantom items. You can check each order looping over
C<$order->items> checking for C<$item->quantity>.

=cut

sub get_orders {
    my ($self, $from_date) = @_;
    unless ($from_date) {
        $from_date = DateTime->now;
        $from_date->subtract(days => $self->order_days_range);
    }
    my @order_structs;
    my $res;
    try {
        $res = $self->client->ListOrders(
                                         MarketplaceId => [$self->marketplace_id],
                                         CreatedAfter  => $from_date,
                                        );
        push @order_structs, @{ $res->{Orders}->{Order} };
    }
    catch {
        die Dumper($_);
    };
    while (my $next_token = $res->{NextToken}) {
        # print "Found next token!\n";
        try {
            $res = $self->client->ListOrdersByNextToken(NextToken => $next_token);
            push @order_structs, @{ $res->{Orders}->{Order} };
        }
        catch {
            die Dumper($_);
        };
    }
    my @orders;
    foreach my $order (@order_structs) {
        my $amws_id = $order->{AmazonOrderId};
        die "Missing amazon AmazonOrderId?!" unless $amws_id;

        my $get_orderline = sub {
        # begin of the closure
        my $orderline;
        my @items;
        try {
            $orderline = $self->client->ListOrderItems(AmazonOrderId => $amws_id);
            push @items, @{ $orderline->{OrderItems}->{OrderItem} };
        }
        catch {
            my $err = $_;
            if (blessed($err) && $err->isa('Amazon::MWS::Exception::Throttled')) {
                die "Request is throttled. Consider to adjust order_days_range as documented at https://metacpan.org/pod/Amazon::MWS::Uploader#ACCESSORS";
            }
            else {
                die Dumper($err);
            }
        };
        while (my $next = $orderline->{NextToken}) {
            try {
                $orderline =
                  $self->client->ListOrderItemsByNextToken(NextToken  => $next);
                push @items, @{ $orderline->{OrderItems}->{OrderItem} };
            }
            catch {
                die Dumper($_);
            };
        }
        return \@items;
        # end of the closure
        };



( run in 0.899 second using v1.01-cache-2.11-cpan-39bf76dae61 )