Amazon-MWS
view release on metacpan or search on metacpan
lib/Amazon/MWS/Uploader.pm view on Meta::CPAN
package Amazon::MWS::Uploader;
use utf8;
use strict;
use warnings;
use DBI;
use Amazon::MWS::XML::Feed;
use Amazon::MWS::XML::Order;
use Amazon::MWS::Client;
use Amazon::MWS::XML::Response::FeedSubmissionResult;
use Amazon::MWS::XML::Response::OrderReport;
use Data::Dumper;
use File::Spec;
use DateTime;
use SQL::Abstract;
use Try::Tiny;
use Path::Tiny;
use Scalar::Util qw/blessed/;
use XML::Compile::Schema;
use Moo;
use MooX::Types::MooseLike::Base qw(:all);
use namespace::clean;
our $VERSION = '0.18';
# Constants for this package; their values are fixed when the module is
# compiled.
use constant {
# NOTE(review): numeric code 999999; its use is outside this excerpt.
# Judging by the name it is presumably a catch-all ("wildcard") error
# code for orders -- confirm against the code that references it.
AMW_ORDER_WILDCARD_ERROR => 999999,
# Snapshot of the AMZ_UPLOADER_DEBUG environment variable taken when the
# module is loaded; later changes to %ENV are not reflected.
DEBUG => $ENV{AMZ_UPLOADER_DEBUG},
};
=head1 NAME
Amazon::MWS::Uploader -- high level agent to upload products to AMWS
=head1 DESCRIPTION
This module provides a high-level interface to the upload process. It
has to keep track of the state in order to resume an upload, which can
get stuck while being processed on Amazon's side, so database
credentials (or the database handle itself) have to be provided.
The table structure needed is defined and commented in sql/amazon.sql
=head1 SYNOPSIS
my $agent = Amazon::MWS::Uploader->new(
db_dsn => 'DBI:mysql:database=XXX',
db_username => 'xxx',
db_password => 'xxx',
db_options => \%options,
# or dbh => $dbh,
schema_dir => '/path/to/xml_schema',
feed_dir => '/path/to/directory/for/xml',
merchant_id => 'xxx',
access_key_id => 'xxx',
secret_key => 'xxx',
marketplace_id => 'xxx',
endpoint => 'xxx',
products => \@products,
);
# say once a day, retrieve the full batch and send it up
$agent->upload;
# every 10 minutes or so, continue the work started with ->upload, if any
$agent->resume;
=head1 UPGRADE NOTES
lib/Amazon/MWS/Uploader.pm view on Meta::CPAN
my $feeder = Amazon::MWS::XML::Feed->new(
products => \@products,
xml_writer => $self->xml_writer,
merchant_id => $self->merchant_id,
);
my @feeds;
foreach my $feed_name (qw/product
inventory
price
image
variants
/) {
my $method = $feed_name . "_feed";
if (my $content = $feeder->$method) {
push @feeds, {
name => $feed_name,
content => $content,
};
}
}
if (my $job_id = $self->prepare_feeds(upload => \@feeds)) {
$self->_mark_products_as_pending($job_id, @products);
return $job_id;
}
return;
}
# Flag every given product as 'pending' for the job $job_id in the
# amazon_mws_products table.
#
# Arguments:
#   $job_id   - id of the job the products belong to (required, dies
#               with "Bad usage" when false)
#   @products - objects providing ->sku and ->timestamp_string
#
# A product row is looked up by sku + shop id; an existing row is
# updated, a missing one is inserted. Any previous warnings text is
# blanked out either way.
sub _mark_products_as_pending {
    my ($self, $job_id, @products) = @_;
    die "Bad usage" unless $job_id;

    # these skus were cleared up when asking for the products to upload
    for my $product (@products) {
        my $where = {
            sku     => $product->sku,
            shop_id => $self->_unique_shop_id,
        };
        my $fields = {
            amws_job_id      => $job_id,
            status           => 'pending',
            warnings         => '',    # clear out
            timestamp_string => $product->timestamp_string,
        };

        # is this sku already known for this shop?
        my $lookup = $self->_exe_query(
            $self->sqla->select(amazon_mws_products => ['sku'], { %$where })
        );
        my $known = $lookup->fetchrow_hashref;
        $lookup->finish;

        # build either the UPDATE or the INSERT, then run it
        my @statement = $known
          ? $self->sqla->update(amazon_mws_products => $fields, $where)
          : $self->sqla->insert(amazon_mws_products => { %$where, %$fields });
        $self->_exe_query(@statement);
    }
}
# Register a new job and write its feeds to disk.
#
# Arguments:
#   $task  - task name; used as prefix of the job id and stored in the
#            "task" column of amazon_mws_jobs
#   $feeds - arrayref of hashrefs with the keys "name" and "content"
#
# Returns the new job id, or nothing when $feeds is empty.
#
# Dies when $task or $feeds is missing, when a feed with content has no
# name, or when a feed file cannot be opened, written or closed.
sub prepare_feeds {
    my ($self, $task, $feeds) = @_;
    die "Missing task ($task) and feeds ($feeds)" unless $task && $feeds;
    return unless @$feeds; # nothing to do

    # NOTE(review): the job id has a one-second resolution, so two jobs
    # for the same task created within the same second would get the
    # same id.
    my $job_id = $task . "-" . DateTime->now->strftime('%F-%H-%M-%S');
    my $job_started_epoch = time();

    $self->_exe_query($self->sqla
                      ->insert(amazon_mws_jobs => {
                                                   amws_job_id => $job_id,
                                                   shop_id => $self->_unique_shop_id,
                                                   task => $task,
                                                   job_started_epoch => $job_started_epoch,
                                                  }));

    # to complete the process, we need to fill out these five
    # feeds. every feed has the same procedure, as per
    # http://docs.developer.amazonservices.com/en_US/feeds/Feeds_Overview.html
    # so we put a flag on the feed when it is done. The processing
    # of the feed itself is tracked in the amazon_mws_feeds

    # TODO: we could pass to the object some flags to filter out results.
    foreach my $feed (@$feeds) {
        # write out the feed if we got something to do, and add a row
        # to the feeds.

        # when there is no content, no need to create a job for it.
        if (my $content = $feed->{content}) {
            my $name = $feed->{name} or die "Missing feed_name";
            my $file = $self->_feed_file_for_method($job_id, $name);

            # Buffered write errors (disk full, quota, ...) show up on
            # print or, more often, only on close. Check both, so that a
            # truncated file is never registered as a feed to submit.
            open (my $fh, '>', $file) or die "Couldn't open $file $!";
            print {$fh} $content or die "Couldn't write to $file $!";
            close $fh or die "Couldn't close $file $!";

            # and prepare a row for it
            my $insertion = {
                             feed_name => $name,
                             feed_file => $file,
                             amws_job_id => $job_id,
                             shop_id => $self->_unique_shop_id,
                            };
            $self->_exe_query($self->sqla
                              ->insert(amazon_mws_feeds => $insertion));
        }
    }
    return $job_id;
}
sub get_pending_jobs {
my ($self, @args) = @_;
my %additional;
my @named_jobs;
foreach my $arg (@args) {
if (!ref($arg)) {
push @named_jobs, $arg;
}
elsif (ref($arg) eq 'HASH') {
# add the filters
foreach my $key (keys %$arg) {
if ($additional{$key}) {
die "Attempt to overwrite $key in the additional parameters!\n";
}
else {
$additional{$key} = $arg->{$key};
lib/Amazon/MWS/Uploader.pm view on Meta::CPAN
=cut
# Drive the feeds of one job forward and record the outcome.
#
# $row is a hashref describing the job; the keys read here are
# "amws_job_id" and "task".
#
# 1. The feeds of the job which are neither aborted nor successful are
#    handed to upload_feed one by one, ordered by amws_feed_pk, stopping
#    at the first one for which upload_feed returns false.
# 2. All the feeds of the job are read again and counted.
# 3. If any feed is aborted, the job is flagged as aborted with status
#    'Feed error'. If every feed is successful, the job is flagged as
#    successful and, for an 'upload' task, its products are marked as
#    'ok' and listed. Otherwise nothing is updated.
sub process_feeds {
    my ($self, $row) = @_;
    # upload the feeds one by one and stop if something is blocking
    my $job_id = $row->{amws_job_id};
    print "Processing job $job_id\n";

    # query the feeds table for this job
    my ($stmt, @bind) = $self->sqla->select(amazon_mws_feeds => '*',
                                            {
                                             amws_job_id => $job_id,
                                             aborted => 0,
                                             success => 0,
                                             shop_id => $self->_unique_shop_id,
                                            },
                                            ['amws_feed_pk']);

    my $sth = $self->_exe_query($stmt, @bind);
    while (my $feed = $sth->fetchrow_hashref) {
        last unless $self->upload_feed($feed);
    }
    # we may have left the loop early, so release the handle explicitly
    $sth->finish;

    ($stmt, @bind) = $self->sqla->select(amazon_mws_feeds => '*',
                                         {
                                          shop_id => $self->_unique_shop_id,
                                          amws_job_id => $job_id,
                                         });
    $sth = $self->_exe_query($stmt, @bind);

    my ($total, $success, $aborted) = (0, 0, 0);

    # query again and check if we have aborted jobs;
    while (my $feed = $sth->fetchrow_hashref) {
        $total++;
        $success++ if $feed->{success};
        $aborted++ if $feed->{aborted};
    }

    # a job was aborted
    my $update;
    if ($aborted) {
        $update = {
                   aborted => 1,
                   status => 'Feed error',
                  };
        $self->_print_or_warn_error("Job $job_id aborted!\n");
    }
    elsif ($success == $total) {
        $update = { success => 1 };
        print "Job successful!\n";
        # if we're here, all the products are fine, so mark them as
        # such if it's an upload job. The task is compared through a
        # default so that a row without a task does not raise an
        # "uninitialized value" warning.
        if (($row->{task} || '') eq 'upload') {
            $self->_exe_query($self->sqla->update('amazon_mws_products',
                                                  { status => 'ok',
                                                    listed_date => DateTime->now,
                                                    listed => 1,
                                                  },
                                                  {
                                                   amws_job_id => $job_id,
                                                   shop_id => $self->_unique_shop_id,
                                                  }));
        }
    }
    else {
        print "Job still to be processed\n";
    }

    # Deliberately the last statement and without an explicit return, so
    # that whatever callers get back from this method stays as it was.
    if ($update) {
        $self->_exe_query($self->sqla->update(amazon_mws_jobs => $update,
                                              {
                                               amws_job_id => $job_id,
                                               shop_id => $self->_unique_shop_id,
                                              }));
    }
}
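=head2 upload_feed($record)
Routine to upload the feed described by C<$record>, a row of the
C<amazon_mws_feeds> table as a hashref (the keys C<amws_job_id>,
C<feed_name>, C<feed_id> and C<feed_file> are used). Return true if
it's complete, false otherwise.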
=cut
sub upload_feed {
my ($self, $record) = @_;
my $job_id = $record->{amws_job_id};
my $type = $record->{feed_name};
my $feed_id = $record->{feed_id};
print "Checking $type feed for $job_id\n";
# http://docs.developer.amazonservices.com/en_US/feeds/Feeds_FeedType.html
my %names = (
product => '_POST_PRODUCT_DATA_',
inventory => '_POST_INVENTORY_AVAILABILITY_DATA_',
price => '_POST_PRODUCT_PRICING_DATA_',
image => '_POST_PRODUCT_IMAGE_DATA_',
variants => '_POST_PRODUCT_RELATIONSHIP_DATA_',
order_ack => '_POST_ORDER_ACKNOWLEDGEMENT_DATA_',
shipping_confirmation => '_POST_ORDER_FULFILLMENT_DATA_',
);
die "Unrecognized type $type" unless $names{$type};
# no feed id, it's a new batch
if (!$feed_id) {
print "No feed id found, doing a request for $job_id $type\n";
my $feed_content = $self->_slurp_file($record->{feed_file});
my $res;
try {
$res = $self->client
->SubmitFeed(content_type => 'text/xml; charset=utf-8',
FeedType => $names{$type},
FeedContent => $feed_content,
MarketplaceIdList => [$self->marketplace_id],
);
lib/Amazon/MWS/Uploader.pm view on Meta::CPAN
$found = $feed;
last;
}
}
# check the result
if ($found && $found->{FeedProcessingStatus} eq '_DONE_') {
return 1;
}
elsif ($found) {
print "Feed $type $feed_id still $found->{FeedProcessingStatus}\n";
return;
}
else {
# there is a remote possibility that it is on another
# page, but that should be very unlikely, as we block the
# process when the first one is not complete
print "$feed_id not found in submission list\n";
return;
}
}
else {
warn "No FeedSubmissionInfo found for $type $feed_id:" . Dumper($res);
return;
}
}
=head2 submission_result($feed_id)
Return a L<Amazon::MWS::XML::Response::FeedSubmissionResult> object
for the given feed ID.
=cut
# Fetch the processing report of the feed $feed_id via
# GetFeedSubmissionResult and wrap it in a FeedSubmissionResult object
# built with this object's xml_reader.
#
# A failing request is reported with a warning; the method then dies,
# naming the feed, because there is no XML to build the object from.
sub submission_result {
    my ($self, $feed_id) = @_;
    my $xml;
    try {
        $xml = $self->client
          ->GetFeedSubmissionResult(FeedSubmissionId => $feed_id);
    } catch {
        my $exception = $_;
        # Only objects can be asked for ->can: calling it on a plain
        # hash or array reference would die right here and hide the
        # original error.
        if (blessed($exception) && $exception->can('xml')) {
            warn "submission result error: " . $exception->xml;
        }
        else {
            warn "submission result error: " . Dumper($exception);
        }
    };
    unless ($xml) {
        my $label = defined $feed_id ? $feed_id : '(undefined)';
        die "Unable to retrieve the submission result for feed $label";
    }
    return Amazon::MWS::XML::Response::FeedSubmissionResult
      ->new(
            xml => $xml,
            xml_reader => $self->xml_reader,
           );
}
=head2 get_orders($from_date)
This is a self-contained method and doesn't require a product list.
The from_date must be a L<DateTime> object. If not provided, it will
be the last week (more precisely: the current time minus
C<order_days_range> days).
Returns a list of Amazon::MWS::XML::Order objects.
Beware that it's possible you get some items with 0 quantity, i.e.
single items cancelled. The application code needs to be prepared to
deal with such phantom items. You can check each order looping over
C<< $order->items >> checking for C<< $item->quantity >>.
=cut
sub get_orders {
my ($self, $from_date) = @_;
unless ($from_date) {
$from_date = DateTime->now;
$from_date->subtract(days => $self->order_days_range);
}
my @order_structs;
my $res;
try {
$res = $self->client->ListOrders(
MarketplaceId => [$self->marketplace_id],
CreatedAfter => $from_date,
);
push @order_structs, @{ $res->{Orders}->{Order} };
}
catch {
die Dumper($_);
};
while (my $next_token = $res->{NextToken}) {
# print "Found next token!\n";
try {
$res = $self->client->ListOrdersByNextToken(NextToken => $next_token);
push @order_structs, @{ $res->{Orders}->{Order} };
}
catch {
die Dumper($_);
};
}
my @orders;
foreach my $order (@order_structs) {
my $amws_id = $order->{AmazonOrderId};
die "Missing amazon AmazonOrderId?!" unless $amws_id;
my $get_orderline = sub {
# begin of the closure
my $orderline;
my @items;
try {
$orderline = $self->client->ListOrderItems(AmazonOrderId => $amws_id);
push @items, @{ $orderline->{OrderItems}->{OrderItem} };
}
catch {
my $err = $_;
if (blessed($err) && $err->isa('Amazon::MWS::Exception::Throttled')) {
die "Request is throttled. Consider to adjust order_days_range as documented at https://metacpan.org/pod/Amazon::MWS::Uploader#ACCESSORS";
}
else {
die Dumper($err);
}
};
while (my $next = $orderline->{NextToken}) {
try {
$orderline =
$self->client->ListOrderItemsByNextToken(NextToken => $next);
push @items, @{ $orderline->{OrderItems}->{OrderItem} };
}
catch {
die Dumper($_);
};
}
return \@items;
# end of the closure
};
( run in 0.899 second using v1.01-cache-2.11-cpan-39bf76dae61 )