AnyEvent-BitTorrent
view release on metacpan or search on metacpan
lib/AnyEvent/BitTorrent.pm view on Meta::CPAN
[
map {
{priority => 1,
fh => undef,
mode => 'c',
length => $_->{length},
timeout => undef,
path =>
File::Spec->rel2abs(
File::Spec->catfile($s->basedir, $s->name, @{$_->{path}})
)
}
} @{$s->metadata->{info}{files}}
]
: [
{priority => 1,
fh => undef,
mode => 'c',
length => $s->metadata->{info}{length},
timeout => undef,
path =>
File::Spec->rel2abs(File::Spec->catfile($s->basedir, $s->name))
}
];
}
has size => (is => 'ro',
lazy => 1,
builder => '_build_size',
isa => Int,
init_arg => undef
);
sub _build_size {
my $s = shift;
my $ret = 0;
$ret += $_->{length} for @{$s->files};
AE::log debug => '_build_size() => ' . $ret;
$ret;
}
sub _open {
my ($s, $i, $m) = @_;
AE::log
trace => 'Opening file #%d (%s) for %s',
$i, $s->files->[$i]->{path}, $m;
return 1 if $s->files->[$i]->{mode} eq $m;
if (defined $s->files->[$i]->{fh}) {
AE::log trace => 'Closing %s', $s->files->[$i]->{fh};
flock $s->files->[$i]->{fh}, LOCK_UN;
close $s->files->[$i]->{fh};
$s->files->[$i]->{fh} = ();
}
if ($m eq 'r') {
AE::log trace => 'Opening %s to read', $s->files->[$i]->{path};
sysopen($s->files->[$i]->{fh}, $s->files->[$i]->{path}, O_RDONLY)
|| return;
flock($s->files->[$i]->{fh}, LOCK_SH) || return;
weaken $s unless isweak $s;
my $x = $i;
$s->files->[$x]->{timeout}
= AE::timer(500, 0, sub { $s // return; $s->_open($x, 'c') });
}
elsif ($m eq 'w') {
AE::log trace => 'Opening %s to write', $s->files->[$i]->{path};
my @split = File::Spec->splitdir($s->files->[$i]->{path});
pop @split; # File name itself
my $dir = File::Spec->catdir(@split);
File::Path::mkpath($dir) if !-d $dir;
sysopen($s->files->[$i]->{fh},
$s->files->[$i]->{path},
O_WRONLY | O_CREAT)
|| return;
flock $s->files->[$i]->{fh}, LOCK_EX;
truncate $s->files->[$i]->{fh}, $s->files->[$i]->{length}
if -s $s->files->[$i]->{fh}
!= $s->files->[$i]->{length}; # XXX - pre-allocate files
weaken $s unless isweak $s;
my $x = $i;
$s->files->[$x]->{timeout}
= AE::timer(60, 0, sub { $s // return; $s->_open($x, 'c') });
}
elsif ($m eq 'c') { $s->files->[$i]->{timeout} = () }
else {return}
return $s->files->[$i]->{mode} = $m;
}
has piece_cache => (is => 'ro', isa => HashRef, default => sub { {} });
sub _cache_path {
my $s = shift;
File::Spec->catfile($s->basedir,
(scalar @{$s->files} == 1 ? () : $s->name),
'~ABPartFile_-'
. uc(substr(unpack('H*', $s->infohash), 0, 10))
. '.dat'
);
}
sub _write_cache {
my ($s, $f, $o, $d) = @_;
my $path = $s->_cache_path;
AE::log
debug =>
'Attempting to store %d bytes to cache file (%s) [$f=%s, $o=%s]',
length($d), $path, $f, $o;
my @split = File::Spec->splitdir($path);
pop @split; # File name itself
my $dir = File::Spec->catdir(@split);
File::Path::mkpath($dir) if !-d $dir;
sysopen(my ($fh), $path, O_WRONLY | O_CREAT)
|| return;
flock $fh, LOCK_EX;
my $pos = sysseek $fh, 0, SEEK_CUR;
my $w = syswrite $fh, $d;
flock $fh, LOCK_UN;
close $fh;
$s->piece_cache->{$f}{$o} = $pos;
AE::log debug => 'Wrote %d bytes to cache file', $w;
return $w;
}
sub _read_cache {
my ($s, $f, $o, $l) = @_;
$s->piece_cache->{$f} // return;
$s->piece_cache->{$f}{$o} // return;
my $path = $s->_cache_path;
AE::log
debug =>
'Attempting to read %d bytes from cache file (%s) [$f=%s, $o=%s]',
$l, $path, $f, $o;
sysopen(my ($fh), $path, O_RDONLY)
|| return;
flock $fh, LOCK_SH;
sysseek $fh, $s->piece_cache->{$f}{$o}, SEEK_SET;
my $w = sysread $fh, my ($d), $l;
flock $fh, LOCK_UN;
close $fh;
return $d;
}
sub _read {
my ($s, $index, $offset, $length) = @_;
AE::log
debug =>
'Attempting to read %d bytes from piece %d starting at %d bytes',
$length, $index, $offset;
my $data = '';
my $file_index = 0;
my $total_offset = ($index * $s->piece_length) + $offset;
SEARCH:
while ($total_offset > $s->files->[$file_index]->{length}) {
$total_offset -= $s->files->[$file_index]->{length};
$file_index++;
AE::log
trace =>
'Searching for location... $total_offset = %d, $file_index = %d',
$total_offset, $file_index;
last SEARCH # XXX - return?
if not defined $s->files->[$file_index]->{length};
}
READ: while ((defined $length) && ($length > 0)) {
my $this_read
= (
($total_offset + $length) >= $s->files->[$file_index]->{length})
?
($s->files->[$file_index]->{length} - $total_offset)
: $length;
AE::log
trace =>
'Attempting to read %d bytes from file #%d (%s), starting at %d',
$this_read,
$file_index, $s->files->[$file_index]->{path}, $total_offset;
if ( (!-f $s->files->[$file_index]->{path})
|| (!$s->_open($file_index, 'r')))
{ $data .= $s->_read_cache($file_index, $total_offset, $this_read)
// ("\0" x $this_read);
AE::log note => 'Failed to open file. Using null chars instead.';
}
else {
sysseek $s->files->[$file_index]->{fh}, $total_offset, SEEK_SET;
sysread $s->files->[$file_index]->{fh}, my ($_data), $this_read;
$data .= $_data if $_data;
AE::log
trace =>
'Read %d bytes of data from file (%d bytes collected so far)',
length $_data, length $data;
weaken $s unless isweak $s;
my $x = $file_index;
$s->files->[$x]->{timeout}
= AE::timer(500, 0, sub { $s // return; $s->_open($x, 'c') });
}
$file_index++;
$length -= $this_read;
AE::log
trace => 'Still need to read %d bytes',
$length;
last READ if not defined $s->files->[$file_index];
$total_offset = 0;
}
AE::log trace => 'Returning %d bytes of data', length $data;
return $data;
}
sub _write {
my ($s, $index, $offset, $data) = @_;
AE::log
debug =>
'Attempting to write %d bytes from piece %d starting at %d bytes',
length($data), $index, $offset;
my $file_index = 0;
my $total_offset = int(($index * $s->piece_length) + ($offset || 0));
AE::log
debug => '...calculated offset == %d',
$total_offset;
SEARCH:
while ($total_offset > $s->files->[$file_index]->{length}) {
$total_offset -= $s->files->[$file_index]->{length};
$file_index++;
AE::log
trace =>
'Searching for location... $total_offset = %d, $file_index = %d',
$total_offset, $file_index;
last SEARCH # XXX - return?
if not defined $s->files->[$file_index]->{length};
}
WRITE: while ((defined $data) && (length $data > 0)) {
my $this_write
= (($total_offset + length $data)
>= $s->files->[$file_index]->{length})
?
($s->files->[$file_index]->{length} - $total_offset)
: length $data;
AE::log
trace =>
'Attempting to write %d bytes from file #%d (%s), starting at %d',
$this_write,
$file_index, $s->files->[$file_index]->{path}, $total_offset;
if ($s->files->[$file_index]->{priority} == 0) {
$s->_write_cache($file_index, $total_offset, substr $data, 0,
$this_write, '');
AE::log trace => 'Wrote data to cache...';
}
else {
$s->_open($file_index, 'w');
sysseek $s->files->[$file_index]->{fh}, $total_offset, SEEK_SET;
my $w = syswrite $s->files->[$file_index]->{fh}, substr $data, 0,
$this_write, '';
AE::log
trace => 'Wrote %d bytes of data to file (%d bytes left)',
$w, length $data;
weaken $s unless isweak $s;
my $x = $file_index;
$s->files->[$x]->{timeout}
= AE::timer(120, 0, sub { $s // return; $s->_open($x, 'c') });
}
$file_index++;
last WRITE if not defined $s->files->[$file_index];
$total_offset = 0;
}
return length $data;
}
sub hashcheck (;@) {
my $s = shift;
my @indexes = @_ ? @_ : (0 .. $s->piece_count);
AE::log trace => sub {
require Data::Dump;
'Hashcheck of : ' . Data::Dump::dump(\@indexes);
};
$s->bitfield; # Makes sure it's built
my $total_size = $s->size;
for my $index (@indexes) {
next if $index < 0 || $index > $s->piece_count;
my $piece = $s->_read($index,
0,
$index == $s->piece_count
?
$total_size % $s->piece_length
: $s->piece_length
);
my $expected = substr($s->pieces, $index * 20, 20);
my $reality = sha1($piece);
my $ok = defined($piece)
&& ($expected eq $reality);
vec($s->{bitfield}, $index, 1) = $ok;
AE::log trace => sub {
"Validate piece #%06d %s, Expected: %s\n"
. " Reality: %s",
$index, ($ok ? 'PASS' : 'FAIL'), unpack('H*', $expected),
unpack('H*', $reality);
};
$ok ?
$s->_trigger_hash_pass($index)
: $s->_trigger_hash_fail($index);
}
}
has peers => (is => 'ro',
lazy => 1,
isa => HashRef,
clearer => '_clear_peers',
builder => '_build_peers'
);
sub _build_peers { {} }
sub _add_peer {
my ($s, $h) = @_;
$s->{peers}{+$h} = {
handle => $h,
peerid => '',
bitfield => (pack 'b*', "\0" x $s->piece_count),
remote_choked => 1,
remote_interested => 0,
remote_requests => [],
local_choked => 1,
local_interested => 0,
local_requests => [],
timeout => AE::timer(20, 0, sub { $s->_del_peer($h) }),
keepalive => AE::timer(
30, 120,
sub {
$s->_send_encrypted($h, build_keepalive());
}
),
# BEP06
local_allowed => [],
remote_allowed => [],
local_suggest => [],
remote_suggest => [],
# No encryption :(
encryption => '?'
};
}
sub _del_peer {
my ($s, $h) = @_;
$s->peers->{$h} // return;
for my $req (@{$s->peers->{$h}{local_requests}}) {
my ($i, $o, $l) = @$req;
$s->working_pieces->{$i}{$o}[3] = ();
}
delete $s->peers->{$h};
$h->destroy;
}
my $shuffle;
has trackers => (is => 'ro',
lazy => 1,
builder => '_build_trackers',
isa => ArrayRef [HashRef],
init_arg => undef
);
sub _build_trackers {
my $s = shift;
$shuffle //= sub {
my $deck = shift; # $deck is a reference to an array
return unless @$deck; # must not be empty!
my $i = @$deck;
while (--$i) {
my $j = int rand($i + 1);
@$deck[$i, $j] = @$deck[$j, $i];
}
};
my $trackers = [
map {
{urls => $_,
complete => 0,
incomplete => 0,
peers => '',
peers6 => '',
announcer => undef,
ticker => AE::timer(
1,
15 * 60,
sub {
return if $s->state eq 'stopped';
$s->announce('started');
}
),
failures => 0
}
} defined $s->metadata->{announce} ? [$s->metadata->{announce}]
: (),
defined $s->metadata->{'announce-list'}
? @{$s->metadata->{'announce-list'}}
: ()
];
AE::log trace => sub {
require Data::Dump;
'$trackers before shuffle => ' . Data::Dump::dump($trackers);
};
$shuffle->($trackers);
$shuffle->($_->{urls}) for @$trackers;
AE::log trace => sub {
require Data::Dump;
'$trackers after shuffle => ' . Data::Dump::dump($trackers);
};
$trackers;
}
sub announce {
my ($s, $e) = @_;
return if $a++ > 10; # Retry attempts
for my $tier (@{$s->trackers}) {
$tier->{announcer} //= $s->_announce_tier($e, $tier);
}
}
sub _announce_tier {
my ($s, $e, $tier) = @_;
my @urls = grep {m[^https?://]} @{$tier->{urls}};
return if $tier->{failures} > 5;
return if $#{$tier->{urls}} < 0; # Empty tier?
return if $tier->{urls}[0] !~ m[^https?://.+];
local $AnyEvent::HTTP::USERAGENT
= 'AnyEvent::BitTorrent/' . $AnyEvent::BitTorrent::VERSION;
my $_url = $tier->{urls}[0] . '?info_hash=' . sub {
local $_ = shift;
s/([^A-Za-z0-9])/sprintf("%%%2.2X", ord($1))/ge;
$_;
}
->($s->infohash)
. ('&peer_id=' . $s->peerid)
. ('&uploaded=' . $s->uploaded)
. ('&downloaded=' . $s->downloaded)
. ('&left=' . $s->_left)
. ('&port=' . $s->port)
. '&compact=1'
. ($e ? '&event=' . $e : '');
AE::log debug => 'Announce URL: ' . $_url;
http_get $_url, sub {
my ($body, $hdr) = @_;
AE::log trace => sub {
require Data::Dump;
'Announce response: ' . Data::Dump::dump($body, $hdr);
};
$tier->{announcer} = ();
if ($hdr->{Status} =~ /^2/) {
my $reply = bdecode($body);
if (defined $reply->{'failure reason'}) { # XXX - Callback?
push @{$tier->{urls}}, shift @{$tier->{urls}};
$s->_announce_tier($e, $tier);
$tier->{'failure reason'} = $reply->{'failure reason'};
$tier->{failures}++;
}
else { # XXX - Callback?
$tier->{failures} = $tier->{'failure reason'} = 0;
$tier->{peers}
= compact_ipv4(
uncompact_ipv4($tier->{peers} . $reply->{peers}))
if $reply->{peers};
$tier->{peers6}
= compact_ipv6(
uncompact_ipv6($tier->{peers6} . $reply->{peers6}))
if $reply->{peers6};
$tier->{complete} = $reply->{complete};
$tier->{incomplete} = $reply->{incomplete};
$tier->{ticker} = AE::timer(
$reply->{interval} // (15 * 60),
$reply->{interval} // (15 * 60),
sub {
return if $s->state eq 'stopped';
$s->_announce_tier($e, $tier);
}
);
}
}
else { # XXX - Callback?
$tier->{'failure reason'}
= "HTTP Error: $hdr->{Status} $hdr->{Reason}\n";
$tier->{failures}++;
push @{$tier->{urls}}, shift @{$tier->{urls}};
$s->_announce_tier($e, $tier);
}
}
}
has _choke_timer => (
is => 'bare',
isa => Ref,
init_arg => undef,
required => 1,
default => sub {
my $s = shift;
AE::timer(
15, 45,
sub {
return if $s->state ne 'active';
AE::log trace => 'Choke timer...';
my @interested
= grep { $_->{remote_interested} && $_->{remote_choked} }
values %{$s->peers};
# XXX - Limit the number of upload slots
for my $p (@interested) {
$p->{remote_choked} = 0;
$s->_send_encrypted($p->{handle}, build_unchoke());
AE::log trace => 'Choked %s', $p->{peerid};
}
# XXX - Send choke to random peer
}
);
}
);
has _fill_requests_timer => (
is => 'bare',
isa => Ref,
init_arg => undef,
required => 1,
default => sub {
my $s = shift;
AE::timer(
15, 10,
sub { # XXX - Limit by time/bandwidth
return if $s->state ne 'active';
AE::log trace => 'Request fill timer...';
my @waiting
= grep { defined && scalar @{$_->{remote_requests}} }
values %{$s->peers};
return if !@waiting;
my $total_sent = 0;
while (@waiting && $total_sent < 2**20) {
my $p = splice(@waiting, rand @waiting, 1, ());
AE::log trace => 'Chosen peer: %s...', $p->{peerid};
while ($total_sent < 2**20 && @{$p->{remote_requests}}) {
my $req = shift @{$p->{remote_requests}};
AE::log
trace =>
'Filling request i:%d, o:%d, l:%d for %s',
@$req;
# XXX - If piece is bad locally
# if remote supports fast ext
# send reject
# else
# simply return
# else...
$s->_send_encrypted(
$p->{handle},
build_piece(
$req->[0], $req->[1],
$s->_read($req->[0], $req->[1], $req->[2])
)
);
$total_sent += $req->[2];
}
}
$s->_set_uploaded($s->uploaded + $total_sent);
}
);
}
);
has _peer_timer => (is => 'ro',
lazy => 1,
isa => Ref,
init_arg => undef,
clearer => '_clear_peer_timer',
builder => '_build_peer_timer'
);
sub _build_peer_timer {
my $s = shift;
AE::timer(
1, 15,
sub {
return if !$s->_left;
AE::log trace => 'Attempting to connect to new peer...';
# XXX - Initiate connections when we are in Super seed mode?
my @cache = map {
$_->{peers} ? uncompact_ipv4($_->{peers}) : (),
$_->{peers6} ?
uncompact_ipv6($_->{peers6})
: ()
} @{$s->trackers};
return if !@cache;
for my $i (1 .. @cache) {
last if $i > 10; # XXX - Max half open
last
if scalar(keys %{$s->peers}) > 100; # XXX - Max peers
my $addr = splice @cache, rand $#cache, 1;
$s->_new_peer($addr);
}
}
);
}
sub _new_peer {
my ($s, $addr) = @_;
AE::log trace => 'Connecting to %s:%d', @$addr;
my $handle;
$handle = AnyEvent::Handle->new(
connect => $addr,
on_prepare => sub {60},
on_error => sub {
my ($hdl, $fatal, $msg) = @_;
# XXX - callback
AE::log
error => 'Socket error: %s (Removing peer)',
$msg;
$s->_del_peer($hdl);
},
on_connect_error => sub {
my ($hdl, $fatal, $msg) = @_;
$s->_del_peer($hdl);
# XXX - callback
AE::log
error => sprintf "%sfatal error (%s)\n",
$fatal ? '' : 'non-',
$msg // 'Connection timed out';
return if !$fatal;
},
on_connect => sub {
my ($h, $host, $port, $retry) = @_;
AE::log
trace => 'Connection established with %s:%d',
$host, $port;
$s->_add_peer($handle);
$s->_send_handshake($handle);
},
on_eof => sub {
my $h = shift;
AE::log trace => 'EOF from peer';
$s->_del_peer($h);
},
on_read => sub {
$s->_on_read(@_);
}
);
return $handle;
}
sub _on_read_incoming {
my ($s, $h) = @_;
$h->rbuf // return;
# XXX - Handle things if the stream is encrypted
my $packet = parse_packet(\$h->rbuf);
return if !$packet;
AE::log trace => sub {
require Data::Dump;
'Incoming packet: ' . Data::Dump::dump($packet);
};
if (defined $packet->{error}) {
return $s->_del_peer($h);
}
elsif ($packet->{type} == $HANDSHAKE) {
ref $packet->{payload} // return;
return if ref $packet->{payload} ne 'ARRAY';
$s->peers->{$h}{reserved} = $packet->{payload}[0];
return $s->_del_peer($h)
if $packet->{payload}[1] ne $s->infohash;
$s->peers->{$h}{peerid} = $packet->{payload}[2];
$s->_send_handshake($h);
$s->_send_bitfield($h);
$s->peers->{$h}{timeout}
= AE::timer(60, 0, sub { $s->_del_peer($h) });
$s->peers->{$h}{bitfield} = pack 'b*', (0 x $s->piece_count);
$h->on_read(sub { $s->_on_read(@_) });
}
else { # This should never happen
}
1;
}
sub _on_read {
my ($s, $h) = @_;
while (my $packet = parse_packet(\$h->rbuf)) {
last if !$packet;
AE::log debug => sub {
require Data::Dump;
'Incoming packet: ' . Data::Dump::dump($packet->{error});
};
if (defined $packet->{error}) {
$s->_del_peer($h);
return;
}
elsif ($packet->{type} eq $KEEPALIVE) {
# Do nothing!
}
elsif ($packet->{type} == $HANDSHAKE) {
ref $packet->{payload} // return;
$s->peers->{$h}{reserved} = $packet->{payload}[0];
return $s->_del_peer($h)
if $packet->{payload}[1] ne $s->infohash;
$s->peers->{$h}{peerid} = $packet->{payload}[2];
$s->_send_bitfield($h);
$s->peers->{$h}{timeout}
= AE::timer(60, 0, sub { $s->_del_peer($h) });
$s->peers->{$h}{bitfield} = pack 'b*', (0 x $s->piece_count);
}
elsif ($packet->{type} == $INTERESTED) {
$s->peers->{$h}{remote_interested} = 1;
}
elsif ($packet->{type} == $NOT_INTERESTED) {
$s->peers->{$h}{remote_interested} = 0;
# XXX - Clear any requests in queue
# XXX - Send choke just to be sure
}
elsif ($packet->{type} == $CHOKE) {
$s->peers->{$h}{local_choked} = 1;
if (!(vec($s->peers->{$h}{reserved}, 7, 1) & 0x04)) {
for my $req (@{$s->peers->{$h}{local_requests}}) {
$s->working_pieces->{$req->[0]}{$req->[1]}[3] = ()
unless
defined $s->working_pieces->{$req->[0]}{$req->[1]}[4];
}
}
$s->_consider_peer($s->peers->{$h});
}
elsif ($packet->{type} == $UNCHOKE) {
$s->peers->{$h}{local_choked} = 0;
$s->peers->{$h}{timeout}
= AE::timer(120, 0, sub { $s->_del_peer($h) });
$s->_request_pieces($s->peers->{$h});
}
elsif ($packet->{type} == $HAVE) {
vec($s->peers->{$h}{bitfield}, $packet->{payload}, 1) = 1;
$s->_consider_peer($s->peers->{$h});
$s->peers->{$h}{timeout}
= AE::timer(60, 0, sub { $s->_del_peer($h) });
}
elsif ($packet->{type} == $BITFIELD) {
$s->peers->{$h}{bitfield} = $packet->{payload};
$s->_consider_peer($s->peers->{$h});
}
elsif ($packet->{type} == $REQUEST) {
$s->peers->{$h}{timeout}
= AE::timer(120, 0, sub { $s->_del_peer($h) });
# XXX - Make sure (index + offset + length) < $s->size
# if not, send reject if remote supports fast ext
# either way, ignore the request
push @{$s->peers->{$h}{remote_requests}}, $packet->{payload};
}
elsif ($packet->{type} == $PIECE) {
$s->peers->{$h}{timeout}
= AE::timer(120, 0, sub { $s->_del_peer($h) });
my ($index, $offset, $data) = @{$packet->{payload}};
# Make sure $index is a working piece
$s->working_pieces->{$index} // return;
# Make sure we req from this peer
return
if !grep {
$_->[0] == $index
&& $_->[1] == $offset
&& $_->[2] == length $data
} @{$s->peers->{$h}{local_requests}};
$s->peers->{$h}{local_requests} = [
grep {
($_->[0] != $index)
|| ($_->[1] != $offset)
|| ($_->[2] != length($data))
} @{$s->peers->{$h}{local_requests}}
];
$s->working_pieces->{$index}{$offset}[4] = $data;
$s->working_pieces->{$index}{$offset}[5] = ();
$s->_set_downloaded($s->downloaded + length $data);
if (0 == scalar grep { !defined $_->[4] }
values %{$s->working_pieces->{$index}})
{ my $piece = join '',
map { $s->working_pieces->{$index}{$_}[4] }
sort { $a <=> $b }
keys %{$s->working_pieces->{$index}};
if ((substr($s->pieces, $index * 20, 20) eq sha1($piece))) {
for my $attempt (1 .. 5) { # XXX = 5 == failure callback
last unless $s->_write($index, 0, $piece);
}
vec($s->{bitfield}, $index, 1) = 1;
$s->_broadcast(
build_have($index),
sub {
!!!index substr(unpack('b*', $_->{bitfield}),
0, $s->piece_count + 1),
0, 0;
}
);
$s->announce('complete')
if !scalar grep {$_} split '',
substr unpack('b*', ~$s->bitfield), 0,
$s->piece_count + 1;
$s->_consider_peer($_)
for grep { $_->{local_interested} }
values %{$s->peers};
$s->_trigger_hash_pass($index);
}
else {
$s->_trigger_hash_fail($index);
# XXX - Not sure what to do... I'd
# ban the peers involved and
# try the same piece again.
}
delete $s->working_pieces->{$index};
}
$s->_request_pieces($s->peers->{$h});
}
elsif ($packet->{type} == $CANCEL) {
my ($index, $offset, $length) = @{$packet->{payload}};
return # XXX - error callback if this block is not in the queue
if !grep {
$_->[0] == $index
&& $_->[1] == $offset
&& $_->[2] == $length
} @{$s->peers->{$h}{remote_requests}};
$s->peers->{$h}{remote_requests} = [
grep {
($_->[0] != $index)
|| ($_->[1] != $offset)
|| ($_->[2] != $length)
} @{$s->peers->{$h}{remote_requests}}
];
}
elsif ($packet->{type} == $SUGGEST) {
push @{$s->peers->{$h}{local_suggest}}, $packet->{payload};
}
elsif ($packet->{type} == $HAVE_ALL) {
$s->peers->{$h}{bitfield} = pack 'b*', (1 x $s->piece_count);
$s->_consider_peer($s->peers->{$h});
$s->peers->{$h}{timeout}
= AE::timer(120, 0, sub { $s->_del_peer($h) });
}
elsif ($packet->{type} == $HAVE_NONE) {
$s->peers->{$h}{bitfield} = pack 'b*', (0 x $s->piece_count);
$s->peers->{$h}{timeout}
= AE::timer(30, 0, sub { $s->_del_peer($h) });
}
elsif ($packet->{type} == $REJECT) {
my ($index, $offset, $length) = @{$packet->{payload}};
return # XXX - error callback if this block is not in the queue
if !grep {
$_->[0] == $index
&& $_->[1] == $offset
&& $_->[2] == $length
} @{$s->peers->{$h}{local_requests}};
$s->working_pieces->{$index}{$offset}->[3] = ();
$s->peers->{$h}{local_requests} = [
grep {
($_->[0] != $index)
|| ($_->[1] != $offset)
|| ($_->[2] != $length)
} @{$s->peers->{$h}{local_requests}}
];
$s->peers->{$h}{timeout}
= AE::timer(30, 0, sub { $s->_del_peer($h) });
}
elsif ($packet->{type} == $ALLOWED_FAST) {
push @{$s->peers->{$h}{local_allowed}}, $packet->{payload};
}
else {
# use Data::Dump qw[pp];
# die 'Unhandled packet: ' . pp $packet;
}
last
if 5 > length($h->rbuf // ''); # Min size for protocol
}
}
sub _send_bitfield {
my ($s, $h) = @_;
if (vec($s->peers->{$h}{reserved}, 7, 1) & 0x04) {
if ($s->seed) {
return $s->_send_encrypted($h, build_haveall());
}
elsif ($s->bitfield() !~ m[[^\0]]) {
return $s->_send_encrypted($h, build_havenone());
}
}
# XXX - If it's cheaper to send HAVE packets than a full BITFIELD, do it
$s->_send_encrypted($h, build_bitfield($s->bitfield));
}
sub _broadcast {
my ($s, $data, $qualifier) = @_;
$qualifier //= sub {1};
$s->_send_encrypted($_->{handle}, $data)
for grep { $qualifier->() } values %{$s->peers};
}
sub _consider_peer { # Figure out whether or not we find a peer interesting
my ($s, $p) = @_;
return if $s->state ne 'active';
return if $s->complete;
my $relevence = $p->{bitfield} & $s->wanted;
my $interesting
= (
index(substr(unpack('b*', $relevence), 0, $s->piece_count + 1), 1, 0)
!= -1) ? 1 : 0;
if ($interesting) {
if (!$p->{local_interested}) {
$p->{local_interested} = 1;
$s->_send_encrypted($p->{handle}, build_interested());
}
}
else {
if ($p->{local_interested}) {
$p->{local_interested} = 0;
$s->_send_encrypted($p->{handle}, build_not_interested());
}
}
}
has working_pieces => (is => 'ro',
lazy => 1,
isa => HashRef,
lib/AnyEvent/BitTorrent.pm view on Meta::CPAN
sub _request_pieces {
my ($s, $p) = @_;
return if $s->state ne 'active';
weaken $p unless isweak $p;
$p // return;
$p->{handle} // return;
my @indexes;
if (scalar keys %{$s->working_pieces} < 10) { # XXX - Max working pieces
for my $findex (0 .. $#{$s->files}) {
for my $index ($s->_file_to_range($findex)) {
next
if !(vec($p->{bitfield}, $index, 1)
&& !vec($s->bitfield, $index, 1));
push @indexes,
map {$index} 1 .. $s->{files}[$findex]{priority};
}
}
}
else {
@indexes = keys %{$s->working_pieces};
}
return if !@indexes;
my $index = $indexes[rand @indexes]; # XXX - Weighted random/Rarest first
my $piece_size
= $index == $s->piece_count ?
$s->size % $s->piece_length
: $s->piece_length;
my $block_count = $piece_size / $block_size;
my @offsets = map { $_ * $block_size }
0 .. $block_count - ((int($block_count) == $block_count) ? 1 : 0);
$s->working_pieces->{$index} //= {map { $_ => {} } @offsets};
my @unrequested = sort { $a <=> $b }
grep { # XXX - If there are no unrequested blocks, pick a new index
(!ref $s->working_pieces->{$index}{$_})
|| ( (!defined $s->working_pieces->{$index}{$_}[4])
&& (!defined $s->working_pieces->{$index}{$_}[3]))
} @offsets;
my @unfilled_local_requests
= grep { !defined $_->[4] } @{$p->{local_requests}};
for (scalar @unfilled_local_requests .. 12) {
my $offset = shift @unrequested;
$offset // return; # XXX - Start working on another piece
my $_block_size
= ($offset == $offsets[-1]) ?
($piece_size % $block_size) || $block_size
: $block_size;
next if !$_block_size;
# XXX - Limit to x req per peer (future: based on bandwidth)
AE::log
trace => 'Requesting %d, %d, %d',
$index, $offset, $_block_size;
$s->_send_encrypted($p->{handle},
build_request($index, $offset, $_block_size))
; # XXX - len for last piece
$s->working_pieces->{$index}{$offset} = [
$index, $offset,
$_block_size,
$p, undef,
AE::timer(
60, 0,
sub {
$p // return;
$p->{handle} // return;
$s->_send_encrypted($p->{handle},
build_cancel($index, $offset, $_block_size));
$s->working_pieces->{$index}{$offset}[3] = ();
$p->{local_requests} = [
grep {
$_->[0] != $index
|| $_->[1] != $offset
|| $_->[2] != $_block_size
} @{$p->{local_requests}}
];
$p->{timeout} = AE::timer(45, 0,
sub { $s->_del_peer($p->{handle}) });
#$s->_request_pieces( $p) # XXX - Ask a different peer
}
)
];
weaken($s->working_pieces->{$index}{$offset}[3])
unless isweak($s->working_pieces->{$index}{$offset}[3]);
push @{$p->{local_requests}}, [$index, $offset, $_block_size];
}
}
# Cheap callback system
has on_hash_pass => (
is => 'rw',
isa => CodeRef,
default => sub {
sub { !!1 }
},
clearer => '_no_hash_pass'
);
sub _trigger_hash_pass { shift->on_hash_pass()->(@_) }
has on_hash_fail => (
is => 'rw',
isa => CodeRef,
default => sub {
sub { !!1 }
},
clearer => '_no_hash_fail'
);
sub _trigger_hash_fail { shift->on_hash_fail()->(@_) }
#
has state => (is => 'ro',
isa => Enum [qw[active stopped paused]],
writer => '_set_state',
default => sub {'active'}
);
sub stop {
my $s = shift;
AE::log debug => 'Stopping...';
return if $s->state eq 'stopped';
AE::log trace => 'Announcing "stopped" event to trackers...';
$s->announce('stopped');
AE::log trace => 'Disconnecting peers...';
$s->_clear_peers;
AE::log trace => 'Stopping new peers ticker...';
$s->_clear_peer_timer;
AE::log trace => 'Closing files...';
$s->_open($_, 'c') for 0 .. $#{$s->files};
AE::log trace => 'Setting internal status...';
$s->_set_state('stopped');
}
sub start {
my $s = shift;
AE::log debug => 'Starting...';
$s->announce('started') unless $s->state eq 'active';
$s->peers;
AE::log trace => 'Starting new peers ticker...';
$s->_peer_timer;
AE::log trace => 'Setting internal status...';
$s->_set_state('active');
}
sub pause {
my $s = shift;
AE::log debug => 'Pausing...';
$s->peers;
AE::log trace => 'Starting new peers ticker...';
$s->_peer_timer;
AE::log trace => 'Setting internal status...';
$s->_set_state('paused');
}
#
sub BUILD {
my ($s, $a) = @_;
AE::log debug => 'BUILD()';
$s->start && AE::log debug => 'Calling ->start()'
if $s->state eq 'active';
$s->paused && AE::log debug => 'Calling ->paused() '
if $s->state eq 'paused';
}
# Testing stuff goes here
sub _send_encrypted {
my ($s, $h, $packet) = @_;
return if !$h; # XXX - $s->_del_peer($p->{handle})
AE::log trace => sub {
require Data::Dump;
'Outgoing packet: ' . Data::Dump::dump($packet);
};
return $h->push_write($packet);
}
sub _send_handshake {
my ($s, $h) = @_;
AE::log trace => 'Outgoing handshake';
$h->push_write(build_handshake($s->reserved, $s->infohash, $s->peerid));
}
1337;
=pod
=head1 NAME
AnyEvent::BitTorrent - Yet Another BitTorrent Client Module
=head1 Synopsis
use AnyEvent::BitTorrent;
my $client = AnyEvent::BitTorrent->new( path => 'some.torrent' );
AE::cv->recv;
=head1 Description
This is a painfully simple BitTorrent client written on a whim that implements
the absolute basics. For a full list of what's currently supported, what you
will likely find in a future version, and what you'll never get from this, see
the section entitled "L<This Module is Lame!|/"This Module is Lame!">"
=head1 Methods
The API, much like the module itself, is simple.
Anything you find by skimming the source is likely not ready for public use
and will be subject to change before C<v1.0.0>. Here's the public interface as
of this version:
=head2 C<new( ... )>
( run in 1.326 second using v1.01-cache-2.11-cpan-754626df90b )