AnyEvent-BitTorrent
view release on metacpan or search on metacpan
lib/AnyEvent/BitTorrent.pm view on Meta::CPAN
# XXX - callback
AE::log error => 'Socket error: ' . $msg;
$s->_del_peer($hdl);
},
on_eof => sub {
my $h = shift;
AE::log info => 'Socket EOF';
$s->_del_peer($h);
},
on_read => sub {
AE::log debug => 'Read Socket';
$s->_on_read_incoming(@_);
}
);
$s->_add_peer($handle);
}, sub {
my ($fh, $thishost, $thisport) = @_;
$s->_set_port($thisport);
AE::log info => "bound to $thishost, port $thisport";
};
}
lib/AnyEvent/BitTorrent.pm view on Meta::CPAN
# Read-only attribute; its value comes from _build_reserved and is checked
# against the $RESERVED type constraint.
has reserved => (
    is      => 'ro',
    isa     => $RESERVED,
    builder => '_build_reserved',
);
# Builder for the 'reserved' attribute.
#
# Returns an 8-byte string that is all NUL except byte 7, which is set to
# 0x04 (labelled "Fast Ext" by the original author). The byte-5
# "Ext Protocol" flag remains commented out, exactly as before.
sub _build_reserved {
    my $reserved = "\0" x 8;

    #vec($reserved, 5, 8) = 0x10;    # Ext Protocol
    vec($reserved, 7, 8) = 0x04;     # Fast Ext

    # Log a hex rendering: the value itself is binary (mostly NULs) and is
    # unreadable when concatenated into a log line as-is.
    AE::log debug => '_build_reserved() => ' . unpack 'H*', $reserved;
    return $reserved;
}
# Read-only attribute that cannot be passed to the constructor
# (init_arg => undef); the value is produced by _build_peerid and checked
# against the $PEERID type constraint.
has peerid => (
    is       => 'ro',
    isa      => $PEERID,
    builder  => '_build_peerid',
    required => 1,
    init_arg => undef,
);
sub _build_peerid {
lib/AnyEvent/BitTorrent.pm view on Meta::CPAN
# Returns a bit string with one bit per piece index. A bit is set when the
# piece is covered by at least one file whose 'priority' is true and the
# corresponding bit in $s->bitfield is not set.
#
# _file_to_range($findex) is presumably the list of piece indexes that
# file $findex touches -- confirm against its definition.
sub wanted {
    my $s      = shift;
    my $wanted = '';
    for my $findex (0 .. $#{$s->files}) {
        my $prio = !!$s->files->[$findex]{priority};
        for my $index ($s->_file_to_range($findex)) {
            my $needed = $prio && !vec($s->bitfield, $index, 1);

            # A piece may be shared by neighbouring files. OR the result
            # into the bit instead of overwriting it, so that a later
            # file without priority cannot clear a piece an earlier,
            # prioritised file still needs. Always assigning (even 0)
            # keeps the string extended to cover every visited index.
            vec($wanted, $index, 1)
                = ($needed || vec($wanted, $index, 1)) ? 1 : 0;
        }
    }

    # Code-ref form: the bit string is only rendered when debug logging
    # is actually enabled.
    AE::log debug => sub { '->wanted() => ' . unpack 'b*', $wanted };
    $wanted;
}
# True when none of the first (piece_count + 1) bits of ->wanted is set,
# i.e. no '1' appears in that window of the unpacked bit string.
sub complete {
    my ($self) = @_;
    my $bits   = unpack 'b*', $self->wanted;
    my $window = substr $bits, 0, $self->piece_count + 1;
    return index($window, 1) == -1;
}
sub seed {
my $s = shift;
lib/AnyEvent/BitTorrent.pm view on Meta::CPAN
init_arg => undef
);
# Builder for the 'metadata' attribute.
#
# Reads the whole file at $s->path and returns the structure produced by
# bdecode(). Dies with a message naming the path if the file cannot be
# opened or read; previously those failures were ignored and undefined
# data was handed to bdecode().
sub _build_metadata {
    my $s = shift;

    #return if ref $s ne __PACKAGE__;    # Applying roles makes deep rec
    my $path = $s->path;
    open my $fh, '<', $path
        or die "Cannot open torrent file '$path': $!";

    # The file content is bencoded binary data; make sure no newline
    # translation is applied on platforms that distinguish text mode.
    binmode $fh;
    my $read = sysread $fh, my $raw, -s $fh;
    defined $read
        or die "Cannot read torrent file '$path': $!";
    close $fh;
    my $metadata = bdecode $raw;

    # Code-ref form: the dump is only generated when debug logging is on.
    AE::log debug => sub {
        require Data::Dump;
        '_build_metadata() => ' . Data::Dump::dump($metadata);
    };
    $metadata;
}
# Returns the 'name' entry of the metadata's 'info' hash.
sub name {
    my ($s) = @_;
    return $s->metadata->{info}{name};
}
# Returns the 'pieces' entry of the metadata's 'info' hash.
sub pieces {
    my ($s) = @_;
    return $s->metadata->{info}{pieces};
}
# Returns the 'piece length' entry of the metadata's 'info' hash.
sub piece_length {
    my ($s) = @_;
    return $s->metadata->{info}{'piece length'};
}
sub piece_count {
lib/AnyEvent/BitTorrent.pm view on Meta::CPAN
lazy => 1,
builder => '_build_size',
isa => Int,
init_arg => undef
);
# Builder for the 'size' attribute: the sum of the 'length' entries of
# every element in $s->files.
sub _build_size {
    my ($s) = @_;
    my $total = 0;
    for my $file (@{$s->files}) {
        $total += $file->{length};
    }
    AE::log debug => '_build_size() => ' . $total;
    return $total;
}
sub _open {
my ($s, $i, $m) = @_;
AE::log
trace => 'Opening file #%d (%s) for %s',
$i, $s->files->[$i]->{path}, $m;
return 1 if $s->files->[$i]->{mode} eq $m;
if (defined $s->files->[$i]->{fh}) {
lib/AnyEvent/BitTorrent.pm view on Meta::CPAN
'~ABPartFile_-'
. uc(substr(unpack('H*', $s->infohash), 0, 10))
. '.dat'
);
}
# Appends $d to the cache file and records where it was stored.
#
#   $f, $o - keys under which the file offset is remembered in
#            $s->piece_cache->{$f}{$o} (they are only used as hash keys
#            and log arguments here)
#   $d     - the data to store
#
# Returns the number of bytes written on success. Returns nothing when the
# cache file cannot be opened, positioned or fully written; in that case
# no piece_cache entry is created.
sub _write_cache {
    my ($s, $f, $o, $d) = @_;
    my $path = $s->_cache_path;
    AE::log
        debug =>
        'Attempting to store %d bytes to cache file (%s) [$f=%s, $o=%s]',
        length($d), $path, $f, $o;

    # Make sure the directory that holds the cache file exists.
    my @split = File::Spec->splitdir($path);
    pop @split;    # File name itself
    my $dir = File::Spec->catdir(@split);
    File::Path::mkpath($dir) if !-d $dir;
    sysopen(my ($fh), $path, O_WRONLY | O_CREAT)
        || return;
    flock $fh, LOCK_EX;    # best effort, as before

    # Position at the END of the file. A freshly opened handle sits at
    # offset 0, so asking for the current position made every block
    # overwrite the previous one and gave every entry offset 0.
    require Fcntl;
    my $pos = sysseek $fh, 0, Fcntl::SEEK_END();
    my $w   = defined $pos ? syswrite($fh, $d) : undef;
    my $err = $!;          # capture before flock/close can change it
    flock $fh, LOCK_UN;
    close $fh;

    # Only index data that is really (and completely) in the file.
    if (!defined $w || $w != length $d) {
        AE::log
            error => 'Failed to store %d bytes in cache file (%s): %s',
            length($d), $path, $err;
        return;
    }
    $s->piece_cache->{$f}{$o} = $pos;
    AE::log debug => 'Wrote %d bytes to cache file', $w;
    return $w;
}
# Reads up to $l bytes from the cache file, starting at the offset stored
# in $s->piece_cache->{$f}{$o}.
#
# Returns the data read. Returns nothing when there is no cache entry for
# ($f, $o), when the cache file cannot be opened, or when seeking or
# reading fails.
sub _read_cache {
    my ($s, $f, $o, $l) = @_;

    # Two-step lookup so a missing $f entry is not autovivified.
    $s->piece_cache->{$f} // return;
    $s->piece_cache->{$f}{$o} // return;
    my $path = $s->_cache_path;
    AE::log
        debug =>
        'Attempting to read %d bytes from cache file (%s) [$f=%s, $o=%s]',
        $l, $path, $f, $o;
    sysopen(my ($fh), $path, O_RDONLY)
        || return;
    flock $fh, LOCK_SH;    # best effort, as before

    # sysseek returns a true value ("0 but true" for offset 0) on success
    # and undef on failure; only read when positioning worked.
    my $d;
    my $got
        = sysseek($fh, $s->piece_cache->{$f}{$o}, SEEK_SET)
        ? sysread($fh, $d, $l)
        : undef;
    flock $fh, LOCK_UN;
    close $fh;
    return if !defined $got;    # seek or read error
    return $d;
}
sub _read {
my ($s, $index, $offset, $length) = @_;
AE::log
debug =>
'Attempting to read %d bytes from piece %d starting at %d bytes',
$length, $index, $offset;
my $data = '';
my $file_index = 0;
my $total_offset = ($index * $s->piece_length) + $offset;
SEARCH:
while ($total_offset > $s->files->[$file_index]->{length}) {
$total_offset -= $s->files->[$file_index]->{length};
$file_index++;
AE::log
lib/AnyEvent/BitTorrent.pm view on Meta::CPAN
last READ if not defined $s->files->[$file_index];
$total_offset = 0;
}
AE::log trace => 'Returning %d bytes of data', length $data;
return $data;
}
sub _write {
my ($s, $index, $offset, $data) = @_;
AE::log
debug =>
'Attempting to write %d bytes from piece %d starting at %d bytes',
length($data), $index, $offset;
my $file_index = 0;
my $total_offset = int(($index * $s->piece_length) + ($offset || 0));
AE::log
debug => '...calculated offset == %d',
$total_offset;
SEARCH:
while ($total_offset > $s->files->[$file_index]->{length}) {
$total_offset -= $s->files->[$file_index]->{length};
$file_index++;
AE::log
trace =>
'Searching for location... $total_offset = %d, $file_index = %d',
$total_offset, $file_index;
last SEARCH # XXX - return?
lib/AnyEvent/BitTorrent.pm view on Meta::CPAN
$_;
}
->($s->infohash)
. ('&peer_id=' . $s->peerid)
. ('&uploaded=' . $s->uploaded)
. ('&downloaded=' . $s->downloaded)
. ('&left=' . $s->_left)
. ('&port=' . $s->port)
. '&compact=1'
. ($e ? '&event=' . $e : '');
AE::log debug => 'Announce URL: ' . $_url;
http_get $_url, sub {
my ($body, $hdr) = @_;
AE::log trace => sub {
require Data::Dump;
'Announce response: ' . Data::Dump::dump($body, $hdr);
};
$tier->{announcer} = ();
if ($hdr->{Status} =~ /^2/) {
my $reply = bdecode($body);
if (defined $reply->{'failure reason'}) { # XXX - Callback?
lib/AnyEvent/BitTorrent.pm view on Meta::CPAN
}
else { # This should never happen
}
1;
}
sub _on_read {
my ($s, $h) = @_;
while (my $packet = parse_packet(\$h->rbuf)) {
last if !$packet;
AE::log debug => sub {
require Data::Dump;
'Incoming packet: ' . Data::Dump::dump($packet->{error});
};
if (defined $packet->{error}) {
$s->_del_peer($h);
return;
}
elsif ($packet->{type} eq $KEEPALIVE) {
# Do nothing!
lib/AnyEvent/BitTorrent.pm view on Meta::CPAN
# Invokes the code reference returned by ->on_hash_fail with the remaining
# arguments and returns whatever that callback returns.
sub _trigger_hash_fail {
    my $s        = shift;
    my $callback = $s->on_hash_fail();
    return $callback->(@_);
}
#
# Read-only attribute restricted to 'active', 'stopped' or 'paused';
# defaults to 'active' and is changed internally through _set_state.
has state => (
    is      => 'ro',
    isa     => Enum [qw[active stopped paused]],
    default => sub { 'active' },
    writer  => '_set_state',
);
# Moves the object to the 'stopped' state. Does nothing beyond logging
# when the state is already 'stopped'. Otherwise, in order: announces
# 'stopped', clears the peers, clears the peer timer, calls _open with
# mode 'c' for every file index, and finally sets the state.
sub stop {
    my ($s) = @_;
    AE::log debug => 'Stopping...';
    if ($s->state eq 'stopped') {
        return;
    }

    AE::log trace => 'Announcing "stopped" event to trackers...';
    $s->announce('stopped');

    AE::log trace => 'Disconnecting peers...';
    $s->_clear_peers;

    AE::log trace => 'Stopping new peers ticker...';
    $s->_clear_peer_timer;

    AE::log trace => 'Closing files...';
    my $last_index = $#{$s->files};
    for my $file_index (0 .. $last_index) {
        $s->_open($file_index, 'c');
    }

    AE::log trace => 'Setting internal status...';
    return $s->_set_state('stopped');
}
# Moves the object to the 'active' state. Announces 'started' only when
# the current state is not already 'active', then calls ->peers and
# ->_peer_timer (presumably to instantiate them -- confirm against their
# definitions) before setting the state.
sub start {
    my ($s) = @_;
    AE::log debug => 'Starting...';
    if ($s->state ne 'active') {
        $s->announce('started');
    }
    $s->peers;

    AE::log trace => 'Starting new peers ticker...';
    $s->_peer_timer;

    AE::log trace => 'Setting internal status...';
    return $s->_set_state('active');
}
# Moves the object to the 'paused' state after calling ->peers and
# ->_peer_timer (presumably to instantiate them -- confirm against their
# definitions). No announce is sent here.
sub pause {
    my ($s) = @_;
    AE::log debug => 'Pausing...';
    $s->peers;

    AE::log trace => 'Starting new peers ticker...';
    $s->_peer_timer;

    AE::log trace => 'Setting internal status...';
    return $s->_set_state('paused');
}
#
# Post-construction hook: puts the new object into the behaviour that
# matches its initial 'state' attribute.
#
#   'active' -> ->start
#   'paused' -> ->pause
#   anything else (e.g. 'stopped') -> nothing
#
# The second argument (constructor arguments) is accepted but unused.
sub BUILD {
    my ($s, $args) = @_;    # not "$a": that name belongs to sort()
    AE::log debug => 'BUILD()';
    my $state = $s->state;
    if ($state eq 'active') {
        AE::log debug => 'Calling ->start()';
        $s->start;
    }
    elsif ($state eq 'paused') {

        # The method is named 'pause'. The previous call to ->paused has
        # no matching definition in the visible code and would die for
        # objects constructed with state => 'paused'.
        AE::log debug => 'Calling ->pause()';
        $s->pause;
    }
}
# Testing stuff goes here
sub _send_encrypted {
my ($s, $h, $packet) = @_;
return if !$h; # XXX - $s->_del_peer($p->{handle})
AE::log trace => sub {
require Data::Dump;
'Outgoing packet: ' . Data::Dump::dump($packet);
( run in 0.519 second using v1.01-cache-2.11-cpan-3cd7ad12f66 )