AnyEvent-SSH2

 view release on metacpan or  search on metacpan

lib/AnyEvent/SSH2.pm  view on Meta::CPAN

        if ($comp && $comp->enabled) {
            my $compressed = $comp->compress($buffer->bytes);
            $buffer->empty;
            $buffer->append($compressed);
        }

        my $len = $buffer->length + 4 + 1;
        my $padlen = $block_size - ($len % $block_size);
        $padlen += $block_size if $padlen < 4;
        my $junk = $ciph ? (join '', map chr rand 255, 1..$padlen) : ("\0" x $padlen);
        $buffer->append($junk);

        my $packet_len = $buffer->length + 1;
        $buffer->bytes(0, 0, pack("N", $packet_len) . pack("c", $padlen));

        my($macbuf);
        if ($mac && $mac->enabled) {
            $macbuf = $mac->hmac(pack("N", $ssh->{session}{seqnr_out}) . $buffer->bytes);
        }
        my $output = Net::SSH::Perl::Buffer->new( MP => 'SSH2' );
        $output->append( $ciph && $ciph->enabled ? $ciph->encrypt($buffer->bytes) : $buffer->bytes );
        $output->append($macbuf) if $mac && $mac->enabled;

        $ssh->{session}{seqnr_out}++;

        my $handle = $ssh->sock;
        my $stat = $handle->push_write($output->bytes);
    };
    *Net::SSH::Perl::Packet::read_expect = sub {
        my $class = shift;
        my($ssh, $type, $cb) = @_;
        my $pack = $class->read($ssh, sub{
            my ($ssh, $pack) = @_;
            if ($pack->type != $type) {
                $ssh->fatal_disconnect(sprintf
                  "Protocol error: expected packet type %d, got %d",
                    $type, $pack->type);
            }
            $cb->($ssh, $pack);
        });
    };

    *Net::SSH::Perl::Packet::read = sub {
        my $class = shift;
        my $ssh = shift;
        my $cb  = shift;
        my $sock = $ssh->sock;
        if (my $packet = $class->read_poll($ssh)) {
            $cb->($ssh, $packet);
        }
        else {
            $sock->push_read(chunk => 4 => sub {
                my ($hdl, $buf) = @_;
                if (length($buf) == 0) {
                    croak "Connection closed by remote host." if !$buf;
                }
                if (!defined $buf) {
                    next if $! == EAGAIN || $! == EWOULDBLOCK;
                    croak "Read from socket failed: $!";
                }
                # Untaint data read from sshd. This is binary data,
                # so there's nothing to taint-check against/for.
                ($buf) = $buf =~ /(.*)/s;
                $ssh->incoming_data->append($buf);
                $class->read($ssh, $cb);
            })
        }
    };
    use Net::SSH::Perl::Kex;
    *Net::SSH::Perl::Kex::exchange_kexinit = sub {
        my $kex = shift;
        my $ssh = $kex->{ssh};
        my $received_packet = shift;
        my $cb = shift;
        my $packet;
    
        $packet = $ssh->packet_start(SSH2_MSG_KEXINIT);
        $packet->put_chars($kex->client_kexinit->bytes);
        $packet->send;
    
        if ( defined $received_packet ) {
    	    $ssh->debug("Received key-exchange init (KEXINIT), sent response.");
    	    $packet = $received_packet;
        }
        else {
    	    $ssh->debug("Sent key-exchange init (KEXINIT), wait response.");
    	    Net::SSH::Perl::Packet->read_expect($ssh, SSH2_MSG_KEXINIT, sub{
                my ($ssh, $packet) = @_;
                $kex->{server_kexinit} = $packet->data;
    
                $packet->get_char for 1..16;
                my @s_props = map $packet->get_str, 1..10;
                $packet->get_int8;
                $packet->get_int32;
                $cb->($ssh, \@s_props);
            });
        }
    };
    *Net::SSH::Perl::Kex::exchange = sub {
        my $kex = shift;
        my $ssh = $kex->{ssh};
        my $packet = shift;
        my $cb     = shift;
    
        my @proposal = @PROPOSAL;
        if (!$ssh->config->get('ciphers')) {
            if (my $c = $ssh->config->get('cipher')) {
                $ssh->config->set('ciphers', $c);
            }
        }
        if (my $cs = $ssh->config->get('ciphers')) {
            # SSH2 cipher names are different; for compatibility, we'll map
            # valid SSH1 ciphers to the SSH2 equivalent names
            if($ssh->protocol eq PROTOCOL_SSH2) {
                my %ssh2_cipher = reverse %Net::SSH::Perl::Cipher::CIPHERS_SSH2;
                $cs = join ',', map $ssh2_cipher{$_} || $_, split(/,/, $cs);
            }
            $proposal[ PROPOSAL_CIPH_ALGS_CTOS ] =
            $proposal[ PROPOSAL_CIPH_ALGS_STOC ] = $cs;
        }
        if ($ssh->config->get('compression')) {
            $proposal[ PROPOSAL_COMP_ALGS_CTOS ] =

lib/AnyEvent/SSH2.pm  view on Meta::CPAN

            $c->{remote_window} > 0 &&
            $c->{input}->length < $c->{remote_window}) {
            $ssh->{watcher}{$id}{rfd} = AE::io $c->{rfd}, 0, sub {
                # 顺序记录 - 频道 - rfd
                my $buf;
                sysread $c->{rfd}, $buf, 8192;
                ($buf) = $buf =~ /(.*)/s;
                $c->send_data($buf);
                $cv->send('rfd');
                delete $ssh->{watcher}{$id}{rfd}
            };
        } 

        # 给内容输出
        if (defined $c->{wfd} &&
            $c->{ostate} == CHAN_OUTPUT_OPEN ||
            $c->{ostate} == CHAN_OUTPUT_WAIT_DRAIN) {
            if ($c->{output} and $c->{output}->length > 0) {
                $ssh->{watcher}{$id}{wfd} = AE::io $c->{wfd}, 1, sub {
                   if (my $r = $c->{handlers}{"_output_buffer"}) {
                       $r->{code}->( $c, $c->{output}, @{ $r->{extra} } );
                   }
                   $c->{local_consumed} += $c->{output}->length;
                   $c->{output}->empty;
                   $cv->send('wfd');
                    delete $ssh->{watcher}{$id}{wfd}
                }
            }
        }
        
        if ($c->{efd} && $c->{extended}->length > 0) {
            my $c->{watcher}{$id}{efd} = AE::io $c->{efd}, 1, sub {
                if (my $r = $c->{handlers}{"_extended_buffer"}) {
                    $r->{code}->( $c, $c->{extended}, @{ $r->{extra} } );
                }
                $c->{local_consumed} += $c->{extended}->length;
                $c->{extended}->empty;
                $cv->send('efd');
                delete $ssh->{watcher}{$id}{efd}
            };
        }

        
        # 原进程
        $c->check_window;
        if ($c->delete_if_full_closed) {
            defined $c->{cb} ? $c->{cb}->() : '';
            $cmgr->remove($c->{id});
        }
    }


    # 这是主连接的句柄
    my $handle = $ssh->{session}{sock};
    $handle->push_read( chunk => 4 => sub {
        my ($handle, $buf) = @_;
        if (!length($buf)) {
            croak "Connection failed: $!\n";
        }
        $ssh->break_client_loop if length($buf) == 0;
        ($buf) = $buf =~ /(.*)/s;  ## Untaint data. Anything allowed.
        $ssh->incoming_data->append($buf);
        $cv->send('main');
    });
}

sub channel_mgr {
    my $ssh = shift;
    unless (defined $ssh->{channel_mgr}) {
        $ssh->{channel_mgr} = Net::SSH::Perl::ChannelMgr->new($ssh);
    }
    $ssh->{channel_mgr};
}
sub _read_version {
    my $ssh = shift;
    my $line = shift;;
    my $len = length $line;
    unless(defined($len)) {
        next if $! == EAGAIN || $! == EWOULDBLOCK;
        croak "Read from socket failed: $!";
    }
    croak "Connection closed by remote host" if $len == 0;
    croak "Version line too long: $line"
     if substr($line, 0, 4) eq "SSH-" and length($line) > 255;
    croak "Pre-version line too long: $line" if length($line) > 4*1024;
    if (substr($line, 0, 4) ne "SSH-") {
        $ssh->debug("Remote version string: $line");
    }
    return $line;
}
sub sock { $_[0]->{session}{sock} }

1;
__END__

=pod
 
=encoding utf8

=head1 NAME

AnyEvent::SSH2 - 基于 AnyEvent 的 SSH2 的非阻塞事件驱动的实现

=head1 SYNOPSIS

对多台主机, 并行的远程执行一些命令.

    use AE;
    use AnyEvent::SSH2;

    my $ssh1 = AnyEvent::SSH2->new(
        'ip',
        user => 'root',
        pass => 'pass',
    );   
    
    my $ssh2 = AnyEvent::SSH2->new(
        'ip'
        user => 'root',
        pass => 'pass',
    );   



( run in 1.612 second using v1.01-cache-2.11-cpan-39bf76dae61 )