AnyEvent-BitTorrent

 view release on metacpan or  search on metacpan

lib/AnyEvent/BitTorrent.pm  view on Meta::CPAN

        $s->files->[$i]->{fh} = ();
    }
    if ($m eq 'r') {
        AE::log trace => 'Opening %s to read', $s->files->[$i]->{path};
        sysopen($s->files->[$i]->{fh}, $s->files->[$i]->{path}, O_RDONLY)
            || return;
        flock($s->files->[$i]->{fh}, LOCK_SH) || return;
        weaken $s unless isweak $s;
        my $x = $i;
        $s->files->[$x]->{timeout}
            = AE::timer(500, 0, sub { $s // return; $s->_open($x, 'c') });
    }
    elsif ($m eq 'w') {
        AE::log trace => 'Opening %s to write', $s->files->[$i]->{path};
        my @split = File::Spec->splitdir($s->files->[$i]->{path});
        pop @split;    # File name itself
        my $dir = File::Spec->catdir(@split);
        File::Path::mkpath($dir) if !-d $dir;
        sysopen($s->files->[$i]->{fh},
                $s->files->[$i]->{path},
                O_WRONLY | O_CREAT)
            || return;
        flock $s->files->[$i]->{fh}, LOCK_EX;
        truncate $s->files->[$i]->{fh}, $s->files->[$i]->{length}
            if -s $s->files->[$i]->{fh}
            != $s->files->[$i]->{length};    # XXX - pre-allocate files
        weaken $s unless isweak $s;
        my $x = $i;
        $s->files->[$x]->{timeout}
            = AE::timer(60, 0, sub { $s // return; $s->_open($x, 'c') });
    }
    elsif ($m eq 'c') { $s->files->[$i]->{timeout} = () }
    else              {return}
    return $s->files->[$i]->{mode} = $m;
}
# Read-only attribute holding a hash reference. When no value is supplied,
# each object gets its own freshly created empty hash.
has piece_cache => (
    is      => 'ro',
    isa     => HashRef,
    default => sub { return {}; },
);

sub _cache_path {
    my $s = shift;
    File::Spec->catfile($s->basedir,

lib/AnyEvent/BitTorrent.pm  view on Meta::CPAN

            sysseek $s->files->[$file_index]->{fh}, $total_offset, SEEK_SET;
            sysread $s->files->[$file_index]->{fh}, my ($_data), $this_read;
            $data .= $_data if $_data;
            AE::log
                trace =>
                'Read %d bytes of data from file (%d bytes collected so far)',
                length $_data, length $data;
            weaken $s unless isweak $s;
            my $x = $file_index;
            $s->files->[$x]->{timeout}
                = AE::timer(500, 0, sub { $s // return; $s->_open($x, 'c') });
        }
        $file_index++;
        $length -= $this_read;
        AE::log
            trace => 'Still need to read %d bytes',
            $length;
        last READ if not defined $s->files->[$file_index];
        $total_offset = 0;
    }
    AE::log trace => 'Returning %d bytes of data', length $data;

lib/AnyEvent/BitTorrent.pm  view on Meta::CPAN

            $s->_open($file_index, 'w');
            sysseek $s->files->[$file_index]->{fh}, $total_offset, SEEK_SET;
            my $w = syswrite $s->files->[$file_index]->{fh}, substr $data, 0,
                $this_write, '';
            AE::log
                trace => 'Wrote %d bytes of data to file (%d bytes left)',
                $w, length $data;
            weaken $s unless isweak $s;
            my $x = $file_index;
            $s->files->[$x]->{timeout}
                = AE::timer(120, 0, sub { $s // return; $s->_open($x, 'c') });
        }
        $file_index++;
        last WRITE if not defined $s->files->[$file_index];
        $total_offset = 0;
    }
    return length $data;
}

sub hashcheck (;@) {
    my $s = shift;

lib/AnyEvent/BitTorrent.pm  view on Meta::CPAN

    $s->{peers}{+$h} = {
        handle            => $h,
        peerid            => '',
        bitfield          => (pack 'b*', "\0" x $s->piece_count),
        remote_choked     => 1,
        remote_interested => 0,
        remote_requests   => [],
        local_choked      => 1,
        local_interested  => 0,
        local_requests    => [],
        timeout           => AE::timer(20, 0, sub { $s->_del_peer($h) }),
        keepalive         => AE::timer(
            30, 120,
            sub {
                $s->_send_encrypted($h, build_keepalive());
            }
        ),

        # BEP06
        local_allowed  => [],
        remote_allowed => [],
        local_suggest  => [],

lib/AnyEvent/BitTorrent.pm  view on Meta::CPAN

        }
    };
    my $trackers = [
        map {
            {urls       => $_,
             complete   => 0,
             incomplete => 0,
             peers      => '',
             peers6     => '',
             announcer  => undef,
             ticker     => AE::timer(
                 1,
                 15 * 60,
                 sub {
                     return if $s->state eq 'stopped';
                     $s->announce('started');
                 }
             ),
             failures => 0
            }
            } defined $s->metadata->{announce} ? [$s->metadata->{announce}]

lib/AnyEvent/BitTorrent.pm  view on Meta::CPAN

                $tier->{peers}
                    = compact_ipv4(
                             uncompact_ipv4($tier->{peers} . $reply->{peers}))
                    if $reply->{peers};
                $tier->{peers6}
                    = compact_ipv6(
                           uncompact_ipv6($tier->{peers6} . $reply->{peers6}))
                    if $reply->{peers6};
                $tier->{complete}   = $reply->{complete};
                $tier->{incomplete} = $reply->{incomplete};
                $tier->{ticker} = AE::timer(
                    $reply->{interval} // (15 * 60),
                    $reply->{interval} // (15 * 60),
                    sub {
                        return if $s->state eq 'stopped';
                        $s->_announce_tier($e, $tier);
                    }
                );
            }
        }
        else {    # XXX - Callback?
            $tier->{'failure reason'}
                = "HTTP Error: $hdr->{Status} $hdr->{Reason}\n";
            $tier->{failures}++;
            push @{$tier->{urls}}, shift @{$tier->{urls}};
            $s->_announce_tier($e, $tier);
        }
        }
}
# Recurring timer handle that unchokes remote peers. The timer first fires 15
# seconds after it is created and then every 45 seconds. On each tick, while
# the client's state is 'active', every peer whose remote_interested and
# remote_choked flags are both set has remote_choked cleared and is sent an
# unchoke message. The attribute has no accessor ('bare') and cannot be
# supplied through the constructor (init_arg => undef).
has _choke_timer => (
    is       => 'bare',
    isa      => Ref,
    init_arg => undef,
    required => 1,
    default  => sub {
        my $s = shift;
        AE::timer(
            15, 45,
            sub {
                # Only hand out unchokes while the client is running
                return if $s->state ne 'active';
                AE::log trace => 'Choke timer...';
                my @interested
                    = grep { $_->{remote_interested} && $_->{remote_choked} }
                    values %{$s->peers};

                # XXX - Limit the number of upload slots
                for my $p (@interested) {
                    $p->{remote_choked} = 0;
                    $s->_send_encrypted($p->{handle}, build_unchoke());

                    # The peer was just unchoked, so the log must say so
                    # (this message previously read 'Choked %s').
                    AE::log trace => 'Unchoked %s', $p->{peerid};
                }

                # XXX - Send choke to random peer
            }
        );
    }
);
has _fill_requests_timer => (
    is       => 'bare',
    isa      => Ref,
    init_arg => undef,
    required => 1,
    default  => sub {
        my $s = shift;
        AE::timer(
            15, 10,
            sub {    # XXX - Limit by time/bandwidth
                return if $s->state ne 'active';
                AE::log trace => 'Request fill timer...';
                my @waiting
                    = grep { defined && scalar @{$_->{remote_requests}} }
                    values %{$s->peers};
                return if !@waiting;
                my $total_sent = 0;
                while (@waiting && $total_sent < 2**20) {
                    my $p = splice(@waiting, rand @waiting, 1, ());
                    AE::log trace => 'Chosen peer: %s...', $p->{peerid};
                    while ($total_sent < 2**20 && @{$p->{remote_requests}}) {
                        my $req = shift @{$p->{remote_requests}};

lib/AnyEvent/BitTorrent.pm  view on Meta::CPAN

                                )
                        );
                        $total_sent += $req->[2];
                    }
                }
                $s->_set_uploaded($s->uploaded + $total_sent);
            }
        );
    }
);
# Lazily built, read-only handle. It is created by _build_peer_timer on first
# access and can be discarded again with _clear_peer_timer. It cannot be
# supplied through the constructor (init_arg => undef).
has _peer_timer => (
    is       => 'ro',
    isa      => Ref,
    init_arg => undef,
    lazy     => 1,
    builder  => '_build_peer_timer',
    clearer  => '_clear_peer_timer',
);

sub _build_peer_timer {
    my $s = shift;
    AE::timer(
        1, 15,
        sub {
            return if !$s->_left;
            AE::log trace => 'Attempting to connect to new peer...';

            # XXX - Initiate connections when we are in Super seed mode?
            my @cache = map {
                $_->{peers} ? uncompact_ipv4($_->{peers}) : (),
                    $_->{peers6} ?
                    uncompact_ipv6($_->{peers6})

lib/AnyEvent/BitTorrent.pm  view on Meta::CPAN

    elsif ($packet->{type} == $HANDSHAKE) {
        ref $packet->{payload} // return;
        return if ref $packet->{payload} ne 'ARRAY';
        $s->peers->{$h}{reserved} = $packet->{payload}[0];
        return $s->_del_peer($h)
            if $packet->{payload}[1] ne $s->infohash;
        $s->peers->{$h}{peerid} = $packet->{payload}[2];
        $s->_send_handshake($h);
        $s->_send_bitfield($h);
        $s->peers->{$h}{timeout}
            = AE::timer(60, 0, sub { $s->_del_peer($h) });
        $s->peers->{$h}{bitfield} = pack 'b*', (0 x $s->piece_count);
        $h->on_read(sub { $s->_on_read(@_) });
    }
    else {    # This should never happen
    }
    1;
}

sub _on_read {
    my ($s, $h) = @_;

lib/AnyEvent/BitTorrent.pm  view on Meta::CPAN

            # Do nothing!
        }
        elsif ($packet->{type} == $HANDSHAKE) {
            ref $packet->{payload} // return;
            $s->peers->{$h}{reserved} = $packet->{payload}[0];
            return $s->_del_peer($h)
                if $packet->{payload}[1] ne $s->infohash;
            $s->peers->{$h}{peerid} = $packet->{payload}[2];
            $s->_send_bitfield($h);
            $s->peers->{$h}{timeout}
                = AE::timer(60, 0, sub { $s->_del_peer($h) });
            $s->peers->{$h}{bitfield} = pack 'b*', (0 x $s->piece_count);
        }
        elsif ($packet->{type} == $INTERESTED) {
            $s->peers->{$h}{remote_interested} = 1;
        }
        elsif ($packet->{type} == $NOT_INTERESTED) {
            $s->peers->{$h}{remote_interested} = 0;

            # XXX - Clear any requests in queue
            # XXX - Send choke just to be sure

lib/AnyEvent/BitTorrent.pm  view on Meta::CPAN

                    $s->working_pieces->{$req->[0]}{$req->[1]}[3] = ()
                        unless
                        defined $s->working_pieces->{$req->[0]}{$req->[1]}[4];
                }
            }
            $s->_consider_peer($s->peers->{$h});
        }
        elsif ($packet->{type} == $UNCHOKE) {
            $s->peers->{$h}{local_choked} = 0;
            $s->peers->{$h}{timeout}
                = AE::timer(120, 0, sub { $s->_del_peer($h) });
            $s->_request_pieces($s->peers->{$h});
        }
        elsif ($packet->{type} == $HAVE) {
            vec($s->peers->{$h}{bitfield}, $packet->{payload}, 1) = 1;
            $s->_consider_peer($s->peers->{$h});
            $s->peers->{$h}{timeout}
                = AE::timer(60, 0, sub { $s->_del_peer($h) });
        }
        elsif ($packet->{type} == $BITFIELD) {
            $s->peers->{$h}{bitfield} = $packet->{payload};
            $s->_consider_peer($s->peers->{$h});
        }
        elsif ($packet->{type} == $REQUEST) {
            $s->peers->{$h}{timeout}
                = AE::timer(120, 0, sub { $s->_del_peer($h) });

            # XXX - Make sure (index + offset + length) < $s->size
            #       if not, send reject if remote supports fast ext
            #       either way, ignore the request
            push @{$s->peers->{$h}{remote_requests}}, $packet->{payload};
        }
        elsif ($packet->{type} == $PIECE) {
            $s->peers->{$h}{timeout}
                = AE::timer(120, 0, sub { $s->_del_peer($h) });
            my ($index, $offset, $data) = @{$packet->{payload}};

            # Make sure $index is a working piece
            $s->working_pieces->{$index} // return;

            # Make sure we req from this peer
            return
                if !grep {
                       $_->[0] == $index
                    && $_->[1] == $offset

lib/AnyEvent/BitTorrent.pm  view on Meta::CPAN

                } @{$s->peers->{$h}{remote_requests}}
            ];
        }
        elsif ($packet->{type} == $SUGGEST) {
            push @{$s->peers->{$h}{local_suggest}}, $packet->{payload};
        }
        elsif ($packet->{type} == $HAVE_ALL) {
            $s->peers->{$h}{bitfield} = pack 'b*', (1 x $s->piece_count);
            $s->_consider_peer($s->peers->{$h});
            $s->peers->{$h}{timeout}
                = AE::timer(120, 0, sub { $s->_del_peer($h) });
        }
        elsif ($packet->{type} == $HAVE_NONE) {
            $s->peers->{$h}{bitfield} = pack 'b*', (0 x $s->piece_count);
            $s->peers->{$h}{timeout}
                = AE::timer(30, 0, sub { $s->_del_peer($h) });
        }
        elsif ($packet->{type} == $REJECT) {
            my ($index, $offset, $length) = @{$packet->{payload}};
            return    # XXX - error callback if this block is not in the queue
                if !grep {
                       $_->[0] == $index
                    && $_->[1] == $offset
                    && $_->[2] == $length
                } @{$s->peers->{$h}{local_requests}};
            $s->working_pieces->{$index}{$offset}->[3] = ();
            $s->peers->{$h}{local_requests} = [
                grep {
                           ($_->[0] != $index)
                        || ($_->[1] != $offset)
                        || ($_->[2] != $length)
                } @{$s->peers->{$h}{local_requests}}
            ];
            $s->peers->{$h}{timeout}
                = AE::timer(30, 0, sub { $s->_del_peer($h) });
        }
        elsif ($packet->{type} == $ALLOWED_FAST) {
            push @{$s->peers->{$h}{local_allowed}}, $packet->{payload};
        }
        else {
            # use Data::Dump qw[pp];
            # die 'Unhandled packet: ' . pp $packet;
        }
        last
            if 5 > length($h->rbuf // '');    # Min size for protocol

lib/AnyEvent/BitTorrent.pm  view on Meta::CPAN

        AE::log
            trace => 'Requesting %d, %d, %d',
            $index, $offset, $_block_size;
        $s->_send_encrypted($p->{handle},
                            build_request($index, $offset, $_block_size))
            ;    # XXX - len for last piece
        $s->working_pieces->{$index}{$offset} = [
            $index, $offset,
            $_block_size,
            $p,     undef,
            AE::timer(
                60, 0,
                sub {
                    $p // return;
                    $p->{handle} // return;
                    $s->_send_encrypted($p->{handle},
                                 build_cancel($index, $offset, $_block_size));
                    $s->working_pieces->{$index}{$offset}[3] = ();
                    $p->{local_requests} = [
                        grep {
                                   $_->[0] != $index
                                || $_->[1] != $offset
                                || $_->[2] != $_block_size
                        } @{$p->{local_requests}}
                    ];
                    $p->{timeout} = AE::timer(45, 0,
                                         sub { $s->_del_peer($p->{handle}) });

                    #$s->_request_pieces( $p) #  XXX - Ask a different peer
                }
            )
        ];
        weaken($s->working_pieces->{$index}{$offset}[3])
            unless isweak($s->working_pieces->{$index}{$offset}[3]);
        push @{$p->{local_requests}}, [$index, $offset, $_block_size];
    }

lib/AnyEvent/BitTorrent.pm  view on Meta::CPAN


# Shuts the client down. In order: announces the 'stopped' event, clears the
# peer list, clears the peer timer, calls _open in 'c' mode for every file and
# finally sets the state to 'stopped'. When the state is already 'stopped'
# only the initial debug line is logged and nothing else happens.
#
# Returns whatever _set_state('stopped') returns, or nothing when the client
# was already stopped.
sub stop {
    my ($self) = @_;
    AE::log debug => 'Stopping...';
    if ($self->state eq 'stopped') {
        return;
    }

    AE::log trace => 'Announcing "stopped" event to trackers...';
    $self->announce('stopped');

    AE::log trace => 'Disconnecting peers...';
    $self->_clear_peers;

    AE::log trace => 'Stopping new peers ticker...';
    $self->_clear_peer_timer;

    AE::log trace => 'Closing files...';
    for my $file_index (0 .. $#{$self->files}) {
        $self->_open($file_index, 'c');
    }

    AE::log trace => 'Setting internal status...';
    return $self->_set_state('stopped');
}

# Puts the client into the 'active' state. A 'started' announce is sent unless
# the state is already 'active'; the peers attribute and the peer timer are
# then touched (the timer is declared lazy, so this builds it if needed).
#
# Returns whatever _set_state('active') returns.
sub start {
    my ($self) = @_;
    AE::log debug => 'Starting...';
    if ($self->state ne 'active') {
        $self->announce('started');
    }

    # Accessed for its side effect only; the value is not used here
    $self->peers;

    AE::log trace => 'Starting new peers ticker...';
    $self->_peer_timer;

    AE::log trace => 'Setting internal status...';
    return $self->_set_state('active');
}

# Puts the client into the 'paused' state. The peers attribute and the peer
# timer are touched first (the timer is declared lazy, so this builds it if
# needed); no tracker announce is sent.
#
# Returns whatever _set_state('paused') returns.
sub pause {
    my ($self) = @_;
    AE::log debug => 'Pausing...';

    # Accessed for its side effect only; the value is not used here
    $self->peers;

    AE::log trace => 'Starting new peers ticker...';
    $self->_peer_timer;

    AE::log trace => 'Setting internal status...';
    return $self->_set_state('paused');
}
#
sub BUILD {
    my ($s, $a) = @_;
    AE::log debug => 'BUILD()';
    $s->start && AE::log debug => 'Calling ->start()'
        if $s->state eq 'active';
    $s->paused && AE::log debug => 'Calling ->paused() '

t/000_tests/004_global.t  view on Meta::CPAN

use AnyEvent::BitTorrent;
use Net::BitTorrent::Protocol qw[:all];
use Test::More;
use File::Temp;
$|++;
my $torrent = q[t/900_data/kubuntu-active-13.04-desktop-i386.iso.torrent];
my $basedir = File::Temp::tempdir('AB_XXXX', TMPDIR => 1);
chdir '../..' if !-f $torrent;
my $cv = AE::cv;
my $client;
my $to = AE::timer(90, 0, sub { diag 'Timeout'; ok 'Timeout'; $cv->send });
#
# Build the client under test. Its on_hash_pass callback records a passing
# test naming the callback's last argument, stops the client and signals $cv.
$client = AnyEvent::BitTorrent->new(
    basedir      => $basedir,
    path         => $torrent,
    on_hash_pass => sub {
        my $piece = pop @_;
        pass 'Got piece number ' . $piece;
        $client->stop;
        $cv->send;
    }
);

t/000_tests/005_local.t  view on Meta::CPAN

use lib '../../lib';
use AnyEvent::BitTorrent;
#use Net::BitTorrent::Protocol qw[:all];
use Test::More;
use File::Temp;
$|++;
my $torrent = q[t/900_data/kubuntu-active-13.04-desktop-i386.iso.torrent];
chdir '../..' if !-f $torrent;
require t::800_utils::Tracker::HTTP;
my $cv = AE::cv;
my $to = AE::timer(90, 0, sub { diag 'Timeout'; ok 'Timeout'; $cv->send });

#
my $tracker =
    t::800_utils::Tracker::HTTP->new(host     => '127.0.0.1',
                                     interval => 15
    );
note 'HTTP tracker @ http://'
    . $tracker->host . ':'
    . $tracker->port
    . '/announce.pl';

t/000_tests/005_local.t  view on Meta::CPAN

# For both the client and the peer: run a hash check, then append a tracker
# entry pointing at the local HTTP tracker. Each entry carries its own timer
# that first fires after 1 second and then repeats every 5 to 20 seconds,
# announcing unless that object's state is 'stopped'.
for my $node ($client, $peer) {
    $node->hashcheck();

    # add local tracker
    push @{$node->trackers}, {
        urls => [
            'http://' . $tracker->host . ':' . $tracker->port . '/announce.pl'
        ],
        complete   => 0,
        incomplete => 0,
        peers      => '',
        ticker     => AE::timer(
            1,
            rand(15) + 5,
            sub {
                if ($node->state eq 'stopped') {
                    return;
                }
                $node->announce();
                note 'Announced from ' . $node->peerid;
            }
        ),
        failures => 0
    };
}



( run in 1.152 second using v1.01-cache-2.11-cpan-49f99fa48dc )