view release on metacpan or search on metacpan
lib/AnyEvent/BitTorrent.pm view on Meta::CPAN
$s->files->[$i]->{fh} = ();
}
if ($m eq 'r') {
AE::log trace => 'Opening %s to read', $s->files->[$i]->{path};
sysopen($s->files->[$i]->{fh}, $s->files->[$i]->{path}, O_RDONLY)
|| return;
flock($s->files->[$i]->{fh}, LOCK_SH) || return;
weaken $s unless isweak $s;
my $x = $i;
$s->files->[$x]->{timeout}
= AE::timer(500, 0, sub { $s // return; $s->_open($x, 'c') });
}
elsif ($m eq 'w') {
AE::log trace => 'Opening %s to write', $s->files->[$i]->{path};
my @split = File::Spec->splitdir($s->files->[$i]->{path});
pop @split; # File name itself
my $dir = File::Spec->catdir(@split);
File::Path::mkpath($dir) if !-d $dir;
sysopen($s->files->[$i]->{fh},
$s->files->[$i]->{path},
O_WRONLY | O_CREAT)
|| return;
flock $s->files->[$i]->{fh}, LOCK_EX;
truncate $s->files->[$i]->{fh}, $s->files->[$i]->{length}
if -s $s->files->[$i]->{fh}
!= $s->files->[$i]->{length}; # XXX - pre-allocate files
weaken $s unless isweak $s;
my $x = $i;
$s->files->[$x]->{timeout}
= AE::timer(60, 0, sub { $s // return; $s->_open($x, 'c') });
}
elsif ($m eq 'c') { $s->files->[$i]->{timeout} = () }
else {return}
return $s->files->[$i]->{mode} = $m;
}
has piece_cache => (is => 'ro', isa => HashRef, default => sub { {} });
sub _cache_path {
my $s = shift;
File::Spec->catfile($s->basedir,
lib/AnyEvent/BitTorrent.pm view on Meta::CPAN
sysseek $s->files->[$file_index]->{fh}, $total_offset, SEEK_SET;
sysread $s->files->[$file_index]->{fh}, my ($_data), $this_read;
$data .= $_data if $_data;
AE::log
trace =>
'Read %d bytes of data from file (%d bytes collected so far)',
length $_data, length $data;
weaken $s unless isweak $s;
my $x = $file_index;
$s->files->[$x]->{timeout}
= AE::timer(500, 0, sub { $s // return; $s->_open($x, 'c') });
}
$file_index++;
$length -= $this_read;
AE::log
trace => 'Still need to read %d bytes',
$length;
last READ if not defined $s->files->[$file_index];
$total_offset = 0;
}
AE::log trace => 'Returning %d bytes of data', length $data;
lib/AnyEvent/BitTorrent.pm view on Meta::CPAN
$s->_open($file_index, 'w');
sysseek $s->files->[$file_index]->{fh}, $total_offset, SEEK_SET;
my $w = syswrite $s->files->[$file_index]->{fh}, substr $data, 0,
$this_write, '';
AE::log
trace => 'Wrote %d bytes of data to file (%d bytes left)',
$w, length $data;
weaken $s unless isweak $s;
my $x = $file_index;
$s->files->[$x]->{timeout}
= AE::timer(120, 0, sub { $s // return; $s->_open($x, 'c') });
}
$file_index++;
last WRITE if not defined $s->files->[$file_index];
$total_offset = 0;
}
return length $data;
}
sub hashcheck (;@) {
my $s = shift;
lib/AnyEvent/BitTorrent.pm view on Meta::CPAN
$s->{peers}{+$h} = {
handle => $h,
peerid => '',
bitfield => (pack 'b*', "\0" x $s->piece_count),
remote_choked => 1,
remote_interested => 0,
remote_requests => [],
local_choked => 1,
local_interested => 0,
local_requests => [],
timeout => AE::timer(20, 0, sub { $s->_del_peer($h) }),
keepalive => AE::timer(
30, 120,
sub {
$s->_send_encrypted($h, build_keepalive());
}
),
# BEP06
local_allowed => [],
remote_allowed => [],
local_suggest => [],
lib/AnyEvent/BitTorrent.pm view on Meta::CPAN
}
};
my $trackers = [
map {
{urls => $_,
complete => 0,
incomplete => 0,
peers => '',
peers6 => '',
announcer => undef,
ticker => AE::timer(
1,
15 * 60,
sub {
return if $s->state eq 'stopped';
$s->announce('started');
}
),
failures => 0
}
} defined $s->metadata->{announce} ? [$s->metadata->{announce}]
lib/AnyEvent/BitTorrent.pm view on Meta::CPAN
$tier->{peers}
= compact_ipv4(
uncompact_ipv4($tier->{peers} . $reply->{peers}))
if $reply->{peers};
$tier->{peers6}
= compact_ipv6(
uncompact_ipv6($tier->{peers6} . $reply->{peers6}))
if $reply->{peers6};
$tier->{complete} = $reply->{complete};
$tier->{incomplete} = $reply->{incomplete};
$tier->{ticker} = AE::timer(
$reply->{interval} // (15 * 60),
$reply->{interval} // (15 * 60),
sub {
return if $s->state eq 'stopped';
$s->_announce_tier($e, $tier);
}
);
}
}
else { # XXX - Callback?
$tier->{'failure reason'}
= "HTTP Error: $hdr->{Status} $hdr->{Reason}\n";
$tier->{failures}++;
push @{$tier->{urls}}, shift @{$tier->{urls}};
$s->_announce_tier($e, $tier);
}
}
}
has _choke_timer => (
is => 'bare',
isa => Ref,
init_arg => undef,
required => 1,
default => sub {
my $s = shift;
AE::timer(
15, 45,
sub {
return if $s->state ne 'active';
AE::log trace => 'Choke timer...';
my @interested
= grep { $_->{remote_interested} && $_->{remote_choked} }
values %{$s->peers};
# XXX - Limit the number of upload slots
for my $p (@interested) {
$p->{remote_choked} = 0;
$s->_send_encrypted($p->{handle}, build_unchoke());
AE::log trace => 'Choked %s', $p->{peerid};
}
# XXX - Send choke to random peer
}
);
}
);
has _fill_requests_timer => (
is => 'bare',
isa => Ref,
init_arg => undef,
required => 1,
default => sub {
my $s = shift;
AE::timer(
15, 10,
sub { # XXX - Limit by time/bandwidth
return if $s->state ne 'active';
AE::log trace => 'Request fill timer...';
my @waiting
= grep { defined && scalar @{$_->{remote_requests}} }
values %{$s->peers};
return if !@waiting;
my $total_sent = 0;
while (@waiting && $total_sent < 2**20) {
my $p = splice(@waiting, rand @waiting, 1, ());
AE::log trace => 'Chosen peer: %s...', $p->{peerid};
while ($total_sent < 2**20 && @{$p->{remote_requests}}) {
my $req = shift @{$p->{remote_requests}};
lib/AnyEvent/BitTorrent.pm view on Meta::CPAN
)
);
$total_sent += $req->[2];
}
}
$s->_set_uploaded($s->uploaded + $total_sent);
}
);
}
);
has _peer_timer => (is => 'ro',
lazy => 1,
isa => Ref,
init_arg => undef,
clearer => '_clear_peer_timer',
builder => '_build_peer_timer'
);
sub _build_peer_timer {
my $s = shift;
AE::timer(
1, 15,
sub {
return if !$s->_left;
AE::log trace => 'Attempting to connect to new peer...';
# XXX - Initiate connections when we are in Super seed mode?
my @cache = map {
$_->{peers} ? uncompact_ipv4($_->{peers}) : (),
$_->{peers6} ?
uncompact_ipv6($_->{peers6})
lib/AnyEvent/BitTorrent.pm view on Meta::CPAN
elsif ($packet->{type} == $HANDSHAKE) {
ref $packet->{payload} // return;
return if ref $packet->{payload} ne 'ARRAY';
$s->peers->{$h}{reserved} = $packet->{payload}[0];
return $s->_del_peer($h)
if $packet->{payload}[1] ne $s->infohash;
$s->peers->{$h}{peerid} = $packet->{payload}[2];
$s->_send_handshake($h);
$s->_send_bitfield($h);
$s->peers->{$h}{timeout}
= AE::timer(60, 0, sub { $s->_del_peer($h) });
$s->peers->{$h}{bitfield} = pack 'b*', (0 x $s->piece_count);
$h->on_read(sub { $s->_on_read(@_) });
}
else { # This should never happen
}
1;
}
sub _on_read {
my ($s, $h) = @_;
lib/AnyEvent/BitTorrent.pm view on Meta::CPAN
# Do nothing!
}
elsif ($packet->{type} == $HANDSHAKE) {
ref $packet->{payload} // return;
$s->peers->{$h}{reserved} = $packet->{payload}[0];
return $s->_del_peer($h)
if $packet->{payload}[1] ne $s->infohash;
$s->peers->{$h}{peerid} = $packet->{payload}[2];
$s->_send_bitfield($h);
$s->peers->{$h}{timeout}
= AE::timer(60, 0, sub { $s->_del_peer($h) });
$s->peers->{$h}{bitfield} = pack 'b*', (0 x $s->piece_count);
}
elsif ($packet->{type} == $INTERESTED) {
$s->peers->{$h}{remote_interested} = 1;
}
elsif ($packet->{type} == $NOT_INTERESTED) {
$s->peers->{$h}{remote_interested} = 0;
# XXX - Clear any requests in queue
# XXX - Send choke just to be sure
lib/AnyEvent/BitTorrent.pm view on Meta::CPAN
$s->working_pieces->{$req->[0]}{$req->[1]}[3] = ()
unless
defined $s->working_pieces->{$req->[0]}{$req->[1]}[4];
}
}
$s->_consider_peer($s->peers->{$h});
}
elsif ($packet->{type} == $UNCHOKE) {
$s->peers->{$h}{local_choked} = 0;
$s->peers->{$h}{timeout}
= AE::timer(120, 0, sub { $s->_del_peer($h) });
$s->_request_pieces($s->peers->{$h});
}
elsif ($packet->{type} == $HAVE) {
vec($s->peers->{$h}{bitfield}, $packet->{payload}, 1) = 1;
$s->_consider_peer($s->peers->{$h});
$s->peers->{$h}{timeout}
= AE::timer(60, 0, sub { $s->_del_peer($h) });
}
elsif ($packet->{type} == $BITFIELD) {
$s->peers->{$h}{bitfield} = $packet->{payload};
$s->_consider_peer($s->peers->{$h});
}
elsif ($packet->{type} == $REQUEST) {
$s->peers->{$h}{timeout}
= AE::timer(120, 0, sub { $s->_del_peer($h) });
# XXX - Make sure (index + offset + length) < $s->size
# if not, send reject if remote supports fast ext
# either way, ignore the request
push @{$s->peers->{$h}{remote_requests}}, $packet->{payload};
}
elsif ($packet->{type} == $PIECE) {
$s->peers->{$h}{timeout}
= AE::timer(120, 0, sub { $s->_del_peer($h) });
my ($index, $offset, $data) = @{$packet->{payload}};
# Make sure $index is a working piece
$s->working_pieces->{$index} // return;
# Make sure we req from this peer
return
if !grep {
$_->[0] == $index
&& $_->[1] == $offset
lib/AnyEvent/BitTorrent.pm view on Meta::CPAN
} @{$s->peers->{$h}{remote_requests}}
];
}
elsif ($packet->{type} == $SUGGEST) {
push @{$s->peers->{$h}{local_suggest}}, $packet->{payload};
}
elsif ($packet->{type} == $HAVE_ALL) {
$s->peers->{$h}{bitfield} = pack 'b*', (1 x $s->piece_count);
$s->_consider_peer($s->peers->{$h});
$s->peers->{$h}{timeout}
= AE::timer(120, 0, sub { $s->_del_peer($h) });
}
elsif ($packet->{type} == $HAVE_NONE) {
$s->peers->{$h}{bitfield} = pack 'b*', (0 x $s->piece_count);
$s->peers->{$h}{timeout}
= AE::timer(30, 0, sub { $s->_del_peer($h) });
}
elsif ($packet->{type} == $REJECT) {
my ($index, $offset, $length) = @{$packet->{payload}};
return # XXX - error callback if this block is not in the queue
if !grep {
$_->[0] == $index
&& $_->[1] == $offset
&& $_->[2] == $length
} @{$s->peers->{$h}{local_requests}};
$s->working_pieces->{$index}{$offset}->[3] = ();
$s->peers->{$h}{local_requests} = [
grep {
($_->[0] != $index)
|| ($_->[1] != $offset)
|| ($_->[2] != $length)
} @{$s->peers->{$h}{local_requests}}
];
$s->peers->{$h}{timeout}
= AE::timer(30, 0, sub { $s->_del_peer($h) });
}
elsif ($packet->{type} == $ALLOWED_FAST) {
push @{$s->peers->{$h}{local_allowed}}, $packet->{payload};
}
else {
# use Data::Dump qw[pp];
# die 'Unhandled packet: ' . pp $packet;
}
last
if 5 > length($h->rbuf // ''); # Min size for protocol
lib/AnyEvent/BitTorrent.pm view on Meta::CPAN
AE::log
trace => 'Requesting %d, %d, %d',
$index, $offset, $_block_size;
$s->_send_encrypted($p->{handle},
build_request($index, $offset, $_block_size))
; # XXX - len for last piece
$s->working_pieces->{$index}{$offset} = [
$index, $offset,
$_block_size,
$p, undef,
AE::timer(
60, 0,
sub {
$p // return;
$p->{handle} // return;
$s->_send_encrypted($p->{handle},
build_cancel($index, $offset, $_block_size));
$s->working_pieces->{$index}{$offset}[3] = ();
$p->{local_requests} = [
grep {
$_->[0] != $index
|| $_->[1] != $offset
|| $_->[2] != $_block_size
} @{$p->{local_requests}}
];
$p->{timeout} = AE::timer(45, 0,
sub { $s->_del_peer($p->{handle}) });
#$s->_request_pieces( $p) # XXX - Ask a different peer
}
)
];
weaken($s->working_pieces->{$index}{$offset}[3])
unless isweak($s->working_pieces->{$index}{$offset}[3]);
push @{$p->{local_requests}}, [$index, $offset, $_block_size];
}
lib/AnyEvent/BitTorrent.pm view on Meta::CPAN
sub stop {
my $s = shift;
AE::log debug => 'Stopping...';
return if $s->state eq 'stopped';
AE::log trace => 'Announcing "stopped" event to trackers...';
$s->announce('stopped');
AE::log trace => 'Disconnecting peers...';
$s->_clear_peers;
AE::log trace => 'Stopping new peers ticker...';
$s->_clear_peer_timer;
AE::log trace => 'Closing files...';
$s->_open($_, 'c') for 0 .. $#{$s->files};
AE::log trace => 'Setting internal status...';
$s->_set_state('stopped');
}
sub start {
my $s = shift;
AE::log debug => 'Starting...';
$s->announce('started') unless $s->state eq 'active';
$s->peers;
AE::log trace => 'Starting new peers ticker...';
$s->_peer_timer;
AE::log trace => 'Setting internal status...';
$s->_set_state('active');
}
sub pause {
my $s = shift;
AE::log debug => 'Pausing...';
$s->peers;
AE::log trace => 'Starting new peers ticker...';
$s->_peer_timer;
AE::log trace => 'Setting internal status...';
$s->_set_state('paused');
}
#
sub BUILD {
my ($s, $a) = @_;
AE::log debug => 'BUILD()';
$s->start && AE::log debug => 'Calling ->start()'
if $s->state eq 'active';
$s->paused && AE::log debug => 'Calling ->paused() '
t/000_tests/004_global.t view on Meta::CPAN
use AnyEvent::BitTorrent;
use Net::BitTorrent::Protocol qw[:all];
use Test::More;
use File::Temp;
$|++;
my $torrent = q[t/900_data/kubuntu-active-13.04-desktop-i386.iso.torrent];
my $basedir = File::Temp::tempdir('AB_XXXX', TMPDIR => 1);
chdir '../..' if !-f $torrent;
my $cv = AE::cv;
my $client;
my $to = AE::timer(90, 0, sub { diag 'Timeout'; ok 'Timeout'; $cv->send });
#
$client = AnyEvent::BitTorrent->new(
basedir => $basedir,
path => $torrent,
on_hash_pass => sub {
pass 'Got piece number ' . pop;
$client->stop;
$cv->send;
}
);
t/000_tests/005_local.t view on Meta::CPAN
use lib '../../lib';
use AnyEvent::BitTorrent;
#use Net::BitTorrent::Protocol qw[:all];
use Test::More;
use File::Temp;
$|++;
my $torrent = q[t/900_data/kubuntu-active-13.04-desktop-i386.iso.torrent];
chdir '../..' if !-f $torrent;
require t::800_utils::Tracker::HTTP;
my $cv = AE::cv;
my $to = AE::timer(90, 0, sub { diag 'Timeout'; ok 'Timeout'; $cv->send });
#
my $tracker =
t::800_utils::Tracker::HTTP->new(host => '127.0.0.1',
interval => 15
);
note 'HTTP tracker @ http://'
. $tracker->host . ':'
. $tracker->port
. '/announce.pl';
t/000_tests/005_local.t view on Meta::CPAN
for my $p ($client, $peer) {
$p->hashcheck();
# add local tracker
push @{$p->trackers}, {
urls => [
'http://' . $tracker->host . ':' . $tracker->port . '/announce.pl'
],
complete => 0,
incomplete => 0,
peers => '',
ticker => AE::timer(
1,
rand(15) + 5,
sub {
return if $p->state eq 'stopped';
$p->announce();
note 'Announced from ' . $p->peerid
}
),
failures => 0
};}