AnyEvent-MultiDownload

 view release on metacpan or search on metacpan

lib/AnyEvent/MultiDownload.pm  view on Meta::CPAN

    my ($self, $retry) = @_;
    my $url = $self->shuffle_url;

    my $first_task;
    my $ev; $ev = http_get $url,
        headers     => $self->headers, 
        timeout     => $self->timeout,
        recurse     => $self->recurse,
        on_header   => sub {
            my ($hdr) = @_;
            if ( $hdr->{Status} == 200 ) {
                my $len = $hdr->{'content-length'};
                return $self->cv->send(("Cannot find a content-length header.", $hdr)) 
                    if !defined($len) or $len <= 0;

                # 准备开始下载的信息
                my $ranges = $self->split_range($len);

                # 除了第一个块, 其它块现在开始下载
                # 事件开始, 但这个回调会在最后才调用.
                $first_task = shift @{ $self->tasks };
                $first_task->{block} = $first_task->{block} || 0;
                $first_task->{ofs}   = $first_task->{ofs}   || 0;
                return 1 if $len <= $self->block_size;

                for ( 1 .. $self->max_per_host ) {
                    my $block_task = shift @{ $self->tasks };
                    last unless defined $block_task;
                    $self->cv->begin;
                    $self->fetch_block($block_task) ;

                }
            }
            1
        },
        on_body   => sub {
            my ($partial_body, $hdr) = @_;
            if ( $self->on_body($first_task)->($partial_body, $hdr) ) {
                # 如果是第一个块的话, 下载到指定的大小就需要断开
                if ( ( $hdr->{'content-length'} <= $self->block_size and  $first_task->{size} == $hdr->{'content-length'} )
                        or 
                        $first_task->{size} >= $self->block_size
                    ) {

                    $self->cv->send(("The 0 block the compared failure", $hdr)) 
                        if !$self->on_block_finish->( $hdr, $first_task, $self->digest ? $first_task->{ctx}->hexdigest : '');
                    $self->cv->end;
                    return 0
                }
            }
            return 1;
        },
        sub {
            my (undef, $hdr) = @_;
            undef $ev;
            my $status = $hdr->{Status};
            # on_body 正常的下载
            return if ( $hdr->{OrigStatus} and $hdr->{OrigStatus} == 200 ) or $hdr->{Status} == 200 or $hdr->{Status} == 416;

            if ( ($status == 500 or $status == 503 or $status =~ /^59/) and $retry < $self->max_retries ) {
                my $w; $w = AE::timer( $self->retry_interval, 0, sub {
                    $first_task->{pos}  = $first_task->{ofs}; # 重下本块时要 seek 回零
                    $first_task->{size} = 0;
                    $first_task->{ctx}  = undef;
                     $self->first_request(++$retry);
                    undef $w;
                });
                AE::log debug => "地址 $url 的块 0 下载出错, 重试";
                return;
            }

            return $self->cv->send(( 
                sprintf("Status: %s, Reason: %s.", $status ? $status : '500', $hdr->{Reason} ? $hdr->{Reason} : ' '), 
                $hdr)
            );
        }
}


sub shuffle_url {
    my ($self) = @_;

    # Choose one candidate URL at random from the keys of url_status.
    # Returns an empty list (undef in scalar context) when there are no URLs.
    my @candidates = shuffle( keys %{ $self->url_status } );
    return @candidates ? $candidates[0] : ();
}

sub on_body {
    my ($self, $task) = @_;

    # Build the on_body callback used for one download task.
    # The callback writes each chunk into the output handle at $task->{pos},
    # feeds the optional digest, and advances $task->{pos} / $task->{size}.
    # It returns true to keep receiving data, false to abort the request.
    return sub {
        my ($partial_body, $hdr) = @_;
        my $status = $hdr->{Status};

        # Only 200 (whole resource) and 206 (partial content) carry block data.
        return 0 unless $status == 206 || $status == 200;

        # A 200 for a block that does not start at offset 0 means the server
        # ignored the Range header and is sending the file from its first byte.
        # Writing that at $task->{pos} would corrupt the output. Abort instead
        # so the request's completion callback runs its retry/error handling.
        return 0 if $status == 200 && $task->{ofs};

        my $len = length $partial_body;

        # The first block is requested without a Range header, so its stream can
        # run past the block boundary. Keep only what still fits in block_size.
        my $room = $self->block_size - $task->{size};
        if ( $len > $room ) {
            $partial_body = substr $partial_body, 0, $room;
            $len = $room;
        }

        $self->fh->start_range( $task->{pos} );
        $self->fh->add_chunk($partial_body);

        if ( $self->digest ) {
            # The digest context is created lazily on the first chunk.
            $task->{ctx} ||= AnyEvent::Digest->new( $self->digest );
            $task->{ctx}->add_async($partial_body);
        }

        $task->{pos}  += $len;
        $task->{size} += $len;
        return 1;
    };
}

sub fetch_block {
    my ($self, $task, $retry) = @_;
    $retry ||= 0;
    my $url = $self->shuffle_url;

    # Request one block with a Range header. on_body streams the data into the
    # output file; the completion callback verifies the block, then either
    # starts the next queued task or retries/fails this one.
    my $ev; $ev = http_get $url,
        timeout    => $self->timeout,
        recurse    => $self->recurse,
        persistent => 1,
        keepalive  => 1,
        headers    => {
            %{ $self->headers },
            Range => $task->{range},
        },
        on_body    => $self->on_body($task),
        sub {
            # The body argument is unused: on_body already consumed the data.
            my (undef, $hdr) = @_;
            my $status = $hdr->{Status};
            undef $ev;

            # Bytes this block must contain (the last block may be shorter
            # than block_size).
            my $expected_size = $task->{tail} - $task->{ofs} + 1;

            # Successful transfer: check the size and the block checksum, then
            # move on to the next task. This branch always returns, so the
            # retry logic below is never reached for 200/206.
            if ( $status == 200 || $status == 206 ) {
                # Wrong length or checksum mismatch: fail the whole download.
                # on_block_finish receives the response headers, as it does
                # for block 0.
                return $self->cv->send(("The $task->{block} block the compared failure", $hdr))
                    if ( $task->{size} != $expected_size
                        or !$self->on_block_finish->($hdr, $task, $self->digest ? $task->{ctx}->hexdigest : '') );

                my $next_task = shift @{ $self->tasks };
                AE::log debug => "地址 $url 的块 $task->{block} 下载完成 $$";

                # Reuse this slot for the next queued block, or release it.
                if ($next_task) {
                    $self->fetch_block($next_task);
                }
                else {
                    $self->cv->end;
                }
                return;
            }

            my $error = sprintf(
                "Block %s the size is wrong, expect the size: %s actual size: %s, The %s try again,  Status: %s, Reason: %s.",
                $task->{block},
                $expected_size,
                $task->{size},
                $retry,
                $status ? $status : '500',
                $hdr->{Reason} ? $hdr->{Reason} : ' ',
            );
            AE::log warn => $error;

            # Retry only statuses that may succeed on another attempt:
            # 500/502/503, the 59x errors, or an empty status. Any other status
            # fails immediately. Stop after max_retries retries, the same limit
            # first_request applies to block 0.
            return $self->cv->send(($error, $hdr))
                if $status !~ /^(?:59.|500|502|503|)$/
                or $retry >= $self->max_retries;

            $self->retry($task, $retry);
        };
}

sub retry {
    my ($self, $task, $retry) = @_;

    # Schedule another download attempt for $task after retry_interval seconds.
    # The timer callback clears $timer afterwards, which drops the watcher the
    # closure is holding.
    my $timer;
    $timer = AE::timer( $self->retry_interval, 0, sub {
        # Rewind the block so the next attempt rewrites it from its first byte,
        # with a fresh byte count and digest context.
        @{$task}{qw(pos size ctx)} = ( $task->{ofs}, 0, undef );
        $self->fetch_block( $task, ++$retry );
        undef $timer;
    });
}

sub split_range {
    my ($self, $length) = @_;

    # Bytes per ranged request.
    my $block_size = $self->block_size;

    # With a zero or negative block size the loop below would never reach the
    # end of the file.
    die 'split_range: block_size must be a positive number, got '
        . ( defined $block_size ? $block_size : 'undef' )
        unless defined $block_size && $block_size > 0;

    my $tasks = $self->tasks;
    my $block = 0;

    # Walk the file in block_size steps. Each step becomes one task whose
    # inclusive byte range [ofs, tail] is clipped to the end of the file.
    for ( my $ofs = 0; $ofs < $length; $ofs += $block_size ) {
        my $tail = $ofs + $block_size - 1;
        $tail = $length - 1 if $tail > $length - 1;

        $tasks->[$block] = {
            block => $block,    # block number
            ofs   => $ofs,      # first byte of the block
            pos   => $ofs,      # next byte to write
            tail  => $tail,     # last byte of the block (inclusive)
            range => 'bytes=' . $ofs . '-' . $tail,
            size  => 0,         # bytes downloaded so far
        };
        $block++;
    }

    return $tasks;
}

# A Perl module file must end by returning a true value to require/use.
1;

__END__

=pod
 
=encoding utf8

=head1 NAME



( run in 1.564 second using v1.01-cache-2.11-cpan-63c85eba8c4 )