AnyEvent-MultiDownload
view release on metacpan or search on metacpan
lib/AnyEvent/MultiDownload.pm view on Meta::CPAN
my ($self, $retry) = @_;
my $url = $self->shuffle_url;
my $first_task;
my $ev; $ev = http_get $url,
headers => $self->headers,
timeout => $self->timeout,
recurse => $self->recurse,
on_header => sub {
my ($hdr) = @_;
if ( $hdr->{Status} == 200 ) {
my $len = $hdr->{'content-length'};
return $self->cv->send(("Cannot find a content-length header.", $hdr))
if !defined($len) or $len <= 0;
# åå¤å¼å§ä¸è½½çä¿¡æ¯
my $ranges = $self->split_range($len);
# é¤äºç¬¬ä¸ä¸ªå, å
¶å®åç°å¨å¼å§ä¸è½½
# äºä»¶å¼å§, ä½è¿ä¸ªåè°ä¼å¨æåæè°ç¨.
$first_task = shift @{ $self->tasks };
$first_task->{block} = $first_task->{block} || 0;
$first_task->{ofs} = $first_task->{ofs} || 0;
return 1 if $len <= $self->block_size;
for ( 1 .. $self->max_per_host ) {
my $block_task = shift @{ $self->tasks };
last unless defined $block_task;
$self->cv->begin;
$self->fetch_block($block_task) ;
}
}
1
},
on_body => sub {
my ($partial_body, $hdr) = @_;
if ( $self->on_body($first_task)->($partial_body, $hdr) ) {
# 妿æ¯ç¬¬ä¸ä¸ªåçè¯, ä¸è½½å°æå®ç大å°å°±éè¦æå¼
if ( ( $hdr->{'content-length'} <= $self->block_size and $first_task->{size} == $hdr->{'content-length'} )
or
$first_task->{size} >= $self->block_size
) {
$self->cv->send(("The 0 block the compared failure", $hdr))
if !$self->on_block_finish->( $hdr, $first_task, $self->digest ? $first_task->{ctx}->hexdigest : '');
$self->cv->end;
return 0
}
}
return 1;
},
sub {
my (undef, $hdr) = @_;
undef $ev;
my $status = $hdr->{Status};
# on_body æ£å¸¸çä¸è½½
return if ( $hdr->{OrigStatus} and $hdr->{OrigStatus} == 200 ) or $hdr->{Status} == 200 or $hdr->{Status} == 416;
if ( ($status == 500 or $status == 503 or $status =~ /^59/) and $retry < $self->max_retries ) {
my $w; $w = AE::timer( $self->retry_interval, 0, sub {
$first_task->{pos} = $first_task->{ofs}; # é䏿¬åæ¶è¦ seek åé¶
$first_task->{size} = 0;
$first_task->{ctx} = undef;
$self->first_request(++$retry);
undef $w;
});
AE::log debug => "å°å $url çå 0 ä¸è½½åºé, éè¯";
return;
}
return $self->cv->send((
sprintf("Status: %s, Reason: %s.", $status ? $status : '500', $hdr->{Reason} ? $hdr->{Reason} : ' '),
$hdr)
);
}
}
# Pick one download URL at random.
#
# The candidates are the keys of the hash reference returned by
# $self->url_status; they are shuffled and the first one is returned.
sub shuffle_url {
    my ($self) = @_;

    my @candidates = keys %{ $self->url_status };

    return ( shuffle @candidates )[0];
}
# Build the on_body callback used while downloading one task.
#
# Parameters:
#   $task - hash ref for the block being downloaded. Its "pos" field (the
#           offset handed to $self->fh->start_range) and "size" field (bytes
#           accepted so far) are advanced by every accepted chunk; "ctx"
#           holds the digest object when $self->digest is set.
#
# Returns a code ref taking ($partial_body, $hdr). The code ref returns 0
# when the response status is neither 200 nor 206, and 1 after the chunk has
# been handed to $self->fh.
sub on_body {
    my ($self, $task) = @_;

    return sub {
        my ($partial_body, $hdr) = @_;

        my $status = $hdr->{Status};
        return 0 if $status != 206 && $status != 200;

        my $len = length $partial_body;

        # Mainly for the first block, whose response may run past the end of
        # the block: keep only the bytes that still fit into block_size.
        my $room = $self->block_size - $task->{size};
        if ( $len > $room ) {
            $partial_body = substr $partial_body, 0, $room;
            $len          = $room;
        }

        $self->fh->start_range( $task->{pos} );
        $self->fh->add_chunk($partial_body);

        if ( $self->digest ) {
            # The digest context is created lazily on the first chunk.
            $task->{ctx} ||= AnyEvent::Digest->new( $self->digest );
            $task->{ctx}->add_async($partial_body);
        }

        $task->{pos}  += $len;
        $task->{size} += $len;

        return 1;
    }
}
# Download one block of the file with an HTTP Range request.
#
# Parameters:
#   $task  - hash ref describing the block. This sub reads its "block",
#            "ofs", "tail", "range", "size" and "ctx" fields; the body
#            callback built by $self->on_body($task) updates it.
#   $retry - number of retries already made for this block (defaults to 0).
#
# Outcome, decided in the completion callback:
#   * Status 200/206, size matches and on_block_finish returns true:
#     the next queued task is fetched, or $self->cv->end is called when the
#     queue is empty.
#   * Status 200/206 but the size differs or on_block_finish returns false:
#     $self->cv->send is called with an error message and the headers.
#   * Any other status: the block is retried through $self->retry while the
#     status looks transient and fewer than max_retries retries were made;
#     otherwise $self->cv->send is called with the error and the headers.
sub fetch_block {
    my ($self, $task, $retry) = @_;
    $retry ||= 0;

    my $url = $self->shuffle_url;

    my $ev;
    $ev = http_get(
        $url,
        timeout    => $self->timeout,
        recurse    => $self->recurse,
        persistent => 1,
        keepalive  => 1,
        headers    => {
            %{ $self->headers },
            Range => $task->{range},
        },
        on_body => $self->on_body($task),
        sub {
            my ($hdl, $hdr) = @_;
            my $status = $hdr->{Status};
            undef $ev;

            # Number of bytes this particular block must contain. The last
            # block of a file is usually shorter than block_size.
            my $expected_size = $task->{tail} - $task->{ofs} + 1;

            # Successful download:
            # 1. compare the size, then let on_block_finish compare the checksum
            # 2. start downloading the next task
            # 3. leave here, otherwise the retry logic below would run
            if ( $status == 200 || $status == 206 ) {    # both are possible for the first block
                # Size or checksum mismatch: fail right away.
                return $self->cv->send(("The $task->{block} block the compared failure", $hdr))
                    if ( $task->{size} != $expected_size
                        or !$self->on_block_finish->($hdl, $task, $self->digest ? $task->{ctx}->hexdigest : '') );

                my $block_task = shift @{ $self->tasks };

                # Size and checksum match, so this block is complete.
                # NOTE(review): the log text below appears mis-encoded in this
                # view of the file; it is reproduced unchanged.
                AE::log debug => "å°å $url çå $task->{block} ä¸è½½å®æ $$";

                # Continue with the next queued block, or mark this request
                # as finished when nothing is left.
                if ($block_task) {
                    $self->fetch_block($block_task);
                }
                else {
                    $self->cv->end;
                }
                return;
            }

            # Failure: decide whether to retry.
            my $error = sprintf(
                "Block %s the size is wrong, expect the size: %s actual size: %s, The %s try again, Status: %s, Reason: %s.",
                $task->{block},
                $expected_size,
                $task->{size},
                $retry,
                $status ? $status : '500',
                $hdr->{Reason} ? $hdr->{Reason} : ' ',
            );
            AE::log warn => $error;

            # Retry only for statuses where another attempt may succeed, and
            # only until max_retries retries have been made; otherwise give
            # up immediately.
            return $self->cv->send(($error, $hdr))
                if $status !~ /^(59.|503|500|502|200|206|)$/ or $retry >= $self->max_retries;

            $self->retry($task, $retry);
        }
    );
}
# Schedule another download attempt for $task.
#
# A one-shot AE::timer is armed with $self->retry_interval. When it fires,
# the task's progress fields are reset, the retry counter is incremented and
# $self->fetch_block is called again with the new counter.
#
# Parameters:
#   $task  - hash ref of the block to download again
#   $retry - number of retries made so far
sub retry {
    my ($self, $task, $retry) = @_;

    my $timer;
    my $restart = sub {
        # When re-downloading this block, seek back to its start and forget
        # everything received so far, including the digest context.
        @{$task}{qw(pos size ctx)} = ( $task->{ofs}, 0, undef );

        $retry++;
        $self->fetch_block( $task, $retry );

        # Release the timer once it has done its job.
        undef $timer;
    };

    $timer = AE::timer( $self->retry_interval, 0, $restart );
}
# Split a download of $length bytes into consecutive blocks of at most
# $self->block_size bytes and store one task per block in $self->tasks
# (indexed by block number, starting at 0).
#
# Each task is a hash ref with these fields:
#   block - block number
#   ofs   - offset of the first byte of the block
#   pos   - current position inside the file, initially equal to ofs
#   tail  - offset of the last byte of the block (inclusive)
#   range - value for the HTTP Range header, "bytes=ofs-tail"
#   size  - number of bytes downloaded so far, initially 0
#
# Parameters:
#   $length - total number of bytes to download
#
# Returns an array ref holding the same task hash refs, in block order.
# Dies when block_size is not a positive number, because the blocks could
# never cover the file in that case.
sub split_range {
    my ($self, $length) = @_;

    my $block_size = $self->block_size;
    die "block_size must be a positive number"
        unless defined $block_size && $block_size > 0;

    my @ranges;
    my $block = 0;

    for ( my $ofs = 0; $ofs < $length; $ofs += $block_size ) {
        # The last block stops at the end of the file.
        my $tail = $ofs + $block_size - 1;
        $tail = $length - 1 if $tail > $length - 1;

        my $task = {
            block => $block,
            ofs   => $ofs,
            pos   => $ofs,
            tail  => $tail,
            range => 'bytes=' . $ofs . '-' . $tail,
            size  => 0,
        };

        $self->tasks->[$block] = $task;
        push @ranges, $task;
        $block++;
    }

    return \@ranges;
}
1;
__END__
=pod
=encoding utf8
=head1 NAME
( run in 1.564 second using v1.01-cache-2.11-cpan-63c85eba8c4 )