AnyEvent-SSH2

 view release on metacpan or  search on metacpan

lib/AnyEvent/SSH2.pm  view on Meta::CPAN

# $Id: SSH2.pm,v 1.47 2009/01/26 01:50:38 turnstep Exp $
package AnyEvent::SSH2;
use strict;
use AE;
use AnyEvent::Handle;
use Net::SSH::Perl::Kex;
use Net::SSH::Perl::ChannelMgr;
use Net::SSH::Perl::Packet;
use Net::SSH::Perl::Buffer;
use Net::SSH::Perl::Constants qw( :protocol :msg2 :compat :hosts :channels :proposal :kex
                                  CHAN_INPUT_CLOSED CHAN_INPUT_WAIT_DRAIN );
use Net::SSH::Perl::Cipher;
use Net::SSH::Perl::AuthMgr;
use Net::SSH::Perl::Comp;
use Net::SSH::Perl::Util qw(:hosts);
use Scalar::Util qw(blessed weaken);
use Carp qw( croak );

use base qw( Net::SSH::Perl );
our $VERSION = '0.04';

use Errno qw( EAGAIN EWOULDBLOCK );
use vars qw( $VERSION $CONFIG $HOSTNAME @PROPOSAL );
use vars qw( @PROPOSAL );
# Default algorithm proposal, assembled from the KEX_DEFAULT_* constants
# (presumably brought in by the Net::SSH::Perl::Constants import above --
# TODO confirm which export tag supplies them).
# NOTE(review): the encrypt/mac/comp/lang entries each appear twice; this
# looks like the client-to-server / server-to-client pairing of the SSH2
# KEXINIT message -- confirm against RFC 4253 before reordering anything.
@PROPOSAL = ( 
        KEX_DEFAULT_KEX,        # key exchange
        KEX_DEFAULT_PK_ALG,     # public key algorithm
        KEX_DEFAULT_ENCRYPT,    # cipher (first of the pair)
        KEX_DEFAULT_ENCRYPT,    # cipher (second of the pair)
        KEX_DEFAULT_MAC,        # MAC (first of the pair)
        KEX_DEFAULT_MAC,        # MAC (second of the pair)
        KEX_DEFAULT_COMP,       # compression (first of the pair)
        KEX_DEFAULT_COMP,       # compression (second of the pair)
        KEX_DEFAULT_LANG,       # language (first of the pair)
        KEX_DEFAULT_LANG,       # language (second of the pair)
        );

# Package-level configuration hash; it starts out empty here.
$CONFIG = {};

BEGIN {
    use Net::SSH::Perl::Packet;
    no warnings qw(redefine);
    *Net::SSH::Perl::Packet::send_ssh2 = sub  {
        my $pack = shift;
        my $buffer = shift || $pack->{data};
        my $ssh = $pack->{ssh};

        my $kex = $ssh->kex;
        my($ciph, $mac, $comp);
        if ($kex) {
            $ciph = $kex->send_cipher;
            $mac  = $kex->send_mac;
            $comp = $kex->send_comp;
        }
        my $block_size = 8;

        if ($comp && $comp->enabled) {
            my $compressed = $comp->compress($buffer->bytes);
            $buffer->empty;
            $buffer->append($compressed);
        }

        my $len = $buffer->length + 4 + 1;
        my $padlen = $block_size - ($len % $block_size);
        $padlen += $block_size if $padlen < 4;
        my $junk = $ciph ? (join '', map chr rand 255, 1..$padlen) : ("\0" x $padlen);
        $buffer->append($junk);

        my $packet_len = $buffer->length + 1;
        $buffer->bytes(0, 0, pack("N", $packet_len) . pack("c", $padlen));

lib/AnyEvent/SSH2.pm  view on Meta::CPAN

        });
    }
    if (my $r = $h->{stderr}) {
        $channel->register_handler("_extended_buffer",
            $r->{code}, @{ $r->{extra} });
    }
    else {
        $channel->register_handler("_extended_buffer", sub {
            $stderr .= $_[1]->bytes;
        });
    }

    $ssh->debug("Entering interactive session.");
    $channel->{cb} = sub {
        $cb->($ssh, $stdout, $stderr);
    }
    
}

# Ask the client loop to stop at its next check.
#
# Sets the "_cl_quit_pending" flag on the connection object; this is the
# same key that _quit_pending() reads and restore_client_loop() clears.
# (The flag used to be written under a mistyped key, so the request was
# never observed.)
#
# Returns the value assigned (1).
sub break_client_loop { $_[0]->{_cl_quit_pending} = 1 }
# Clear the "_cl_quit_pending" flag on the connection object so that
# _quit_pending() reports false again. Returns the value assigned (0).
sub restore_client_loop {
    my ($ssh) = @_;
    return $ssh->{_cl_quit_pending} = 0;
}
# Report the current value of the "_cl_quit_pending" flag stored on the
# connection object (undef when the flag has never been set).
sub _quit_pending {
    my ($ssh) = @_;
    return $ssh->{_cl_quit_pending};
}

# Kick off processing for the next queued command.
#
# Returns immediately (empty list / undef) when no 'cmd' subscribers are
# queued under $ssh->{events}{cmd}. Otherwise it emits the 'cmd' event,
# clears the "_cl_quit_pending" flag, and hands the channel manager and its
# packet handlers to event_loop(), returning whatever event_loop() returns.
sub client_loop {
    my $ssh = shift;

    # The subscriber list may not exist at all. Look it up without
    # dereferencing first: "@{ undef }" in rvalue context is fatal under
    # "use strict", and probing $ssh->{events}{cmd} directly would also
    # autovivify $ssh->{events}.
    my $pending = $ssh->{events} ? $ssh->{events}{cmd} : undef;
    return unless $pending && scalar @{$pending} > 0;

    $ssh->emit('cmd');
    $ssh->{_cl_quit_pending} = 0;

    # Fetch the channel manager that tracks all channels.
    my $cmgr = $ssh->channel_mgr;

    # Process the events of every channel using the manager's handlers.
    my $h = $cmgr->handlers;
    return $ssh->event_loop($cmgr, $h);
}

# Run one iteration of the AnyEvent-driven client loop.
#
# Arguments:
#   $ssh  - the connection object (invocant)
#   $cmgr - the channel manager whose {channels} list is walked
#   $h    - hash ref mapping packet type => handler code ref
#   $cb   - not used here; passed through unchanged to the next iteration
#
# Each iteration:
#   1. dispatches every packet read_poll() can already deliver,
#   2. flushes queued output packets and reaps fully closed channels
#      (calling the channel's {cb}, if any, before removing it),
#   3. returns to client_loop() unless more than one channel is still
#      defined,
#   4. otherwise installs I/O watchers for the remaining channels plus a
#      read request on the main socket; whichever fires first triggers the
#      condvar, which drops all watchers and calls event_loop() again.
#
# Whenever the quit flag is set, control goes straight back to client_loop().
sub event_loop {
    my ($ssh, $cmgr, $h, $cb) = @_;
    return $ssh->client_loop if $ssh->_quit_pending;

    # Dispatch everything that is already buffered.
    while (my $packet = Net::SSH::Perl::Packet->read_poll($ssh)) {
        if (my $code = $h->{ $packet->type }) {
            $code->($cmgr, $packet);
        }
        else {
            $ssh->debug("Warning: ignore packet type " . $packet->type);
        }
    }

    return $ssh->client_loop if $ssh->_quit_pending;

    $cmgr->process_output_packets;

    # Once processing is finished, close the connections. Per the original
    # author's note this is done here because the main channel needs the
    # same treatment.
    for my $c (@{ $cmgr->{channels} }) {
        next unless defined $c;
        if ($c->{wfd} &&
            $c->{extended}->length == 0 &&
            $c->{output}->length == 0 &&
            $c->{ostate} == CHAN_OUTPUT_WAIT_DRAIN ) {
                $c->obuf_empty;
        }
        # Per the original author's note: obuf_empty above moves ostate to
        # CHAN_OUTPUT_CLOSED, and the check below then sends the close to
        # the remote side.
        if ($c->delete_if_full_closed) {
            $c->{cb}->() if defined $c->{cb};
            $cmgr->remove($c->{id});
        }
    }

    my $oc = grep { defined } @{ $cmgr->{channels} };
    return $ssh->client_loop unless $oc > 1;

    # The first watcher to fire sends on this condvar; its callback tears
    # down every watcher of this iteration and starts the next one.
    my $cv = AE::cv sub {
        shift->recv;
        delete $ssh->{watcher};
        $ssh->event_loop($cmgr, $h, $cb);
    };

    # Channel output / client input handling.
    #
    # In every watcher callback below, the watcher removes its own slot
    # BEFORE calling $cv->send: send runs the condvar callback (and thus the
    # next event_loop iteration) synchronously, so deleting afterwards could
    # remove a watcher that the new iteration has just installed.
    for my $c (@{ $cmgr->{channels} }) {
        next unless defined $c;
        my $id = $c->{id};
        if ($c->{rfd} && $c->{istate} == CHAN_INPUT_OPEN &&
            $c->{remote_window} > 0 &&
            $c->{input}->length < $c->{remote_window}) {
            # Watchers are stored as watcher -> channel id -> rfd.
            $ssh->{watcher}{$id}{rfd} = AE::io $c->{rfd}, 0, sub {
                my $buf;
                sysread $c->{rfd}, $buf, 8192;
                ($buf) = $buf =~ /(.*)/s;
                $c->send_data($buf);
                delete $ssh->{watcher}{$id}{rfd};
                $cv->send('rfd');
            };
        }

        # Deliver pending channel output. The parentheses matter: without
        # them "&&" binds tighter than "||" and the wfd check would not
        # apply to the CHAN_OUTPUT_WAIT_DRAIN case.
        if (defined $c->{wfd} &&
            ($c->{ostate} == CHAN_OUTPUT_OPEN ||
             $c->{ostate} == CHAN_OUTPUT_WAIT_DRAIN)) {
            if ($c->{output} and $c->{output}->length > 0) {
                $ssh->{watcher}{$id}{wfd} = AE::io $c->{wfd}, 1, sub {
                    if (my $r = $c->{handlers}{"_output_buffer"}) {
                        $r->{code}->( $c, $c->{output}, @{ $r->{extra} } );
                    }
                    $c->{local_consumed} += $c->{output}->length;
                    $c->{output}->empty;
                    delete $ssh->{watcher}{$id}{wfd};
                    $cv->send('wfd');
                };
            }
        }

        if ($c->{efd} && $c->{extended}->length > 0) {
            # Keep the watcher on $ssh like the others. Storing it through
            # "my $c->{...}" would declare a new lexical $c and destroy the
            # watcher as soon as this block ends, so it would never fire.
            $ssh->{watcher}{$id}{efd} = AE::io $c->{efd}, 1, sub {
                if (my $r = $c->{handlers}{"_extended_buffer"}) {
                    $r->{code}->( $c, $c->{extended}, @{ $r->{extra} } );
                }
                $c->{local_consumed} += $c->{extended}->length;
                $c->{extended}->empty;
                delete $ssh->{watcher}{$id}{efd};
                $cv->send('efd');
            };
        }

        # Same bookkeeping as the original (non-AnyEvent) procedure.
        $c->check_window;
        if ($c->delete_if_full_closed) {
            $c->{cb}->() if defined $c->{cb};
            $cmgr->remove($c->{id});
        }
    }

    # Handle of the main connection: wait for the next 4 bytes from the
    # server, append them to the incoming buffer and wake the loop.
    my $handle = $ssh->{session}{sock};
    $handle->push_read( chunk => 4 => sub {
        my ($handle, $buf) = @_;
        if (!length($buf)) {
            croak "Connection failed: $!\n";
        }
        ($buf) = $buf =~ /(.*)/s;  ## Untaint data. Anything allowed.
        $ssh->incoming_data->append($buf);
        $cv->send('main');
    });
}

# Return the connection's channel manager, creating it on first use.
#
# The manager is cached in $ssh->{channel_mgr}; when that slot is undefined
# a new Net::SSH::Perl::ChannelMgr is constructed with $ssh as its argument
# and stored there before being returned.
sub channel_mgr {
    my ($ssh) = @_;
    $ssh->{channel_mgr} = Net::SSH::Perl::ChannelMgr->new($ssh)
        if !defined $ssh->{channel_mgr};
    return $ssh->{channel_mgr};
}
sub _read_version {
    my $ssh = shift;
    my $line = shift;;
    my $len = length $line;



( run in 0.521 second using v1.01-cache-2.11-cpan-13bb782fe5a )