AnyEvent-SSH2

 view release on metacpan or  search on metacpan

lib/AnyEvent/SSH2.pm  view on Meta::CPAN

    my $cmgr = $ssh->channel_mgr;
    my $channel = $ssh->_session_channel;
    $channel->open;


    $channel->register_handler(SSH2_MSG_CHANNEL_OPEN_CONFIRMATION, sub {
        my($channel, $packet) = @_;

        ## Experimental pty support:
        if ($ssh->{config}->get('use_pty')) {
		    $ssh->debug("Requesting pty.");

		    my $packet = $channel->request_start('pty-req', 0);

		    my($term) = $ENV{TERM} =~ /(\w+)/;
		    $packet->put_str($term);
		    my $foundsize = 0;
		    if (eval "require Term::ReadKey") {
		    	my @sz = Term::ReadKey::GetTerminalSize($ssh->sock);
		    	if (defined $sz[0]) {
		    		$foundsize = 1;
		    		$packet->put_int32($sz[1]); # height
		    		$packet->put_int32($sz[0]); # width
		    		$packet->put_int32($sz[2]); # xpix
		    		$packet->put_int32($sz[3]); # ypix
		    	}
		    }
		    if (!$foundsize) {
		    	$packet->put_int32(0) for 1..4;
		    }

            # Array used to build Pseudo-tty terminal modes; fat commas separate opcodes from values for clarity.
            my $terminal_mode_string;
            if(!defined($ssh->{config}->get('terminal_mode_string'))) {
                my @terminal_modes = (
                   5 => 0,0,0,4,      # VEOF => 0x04 (^d)
                   0                  # string must end with a 0 opcode
                );
                for my $char (@terminal_modes) {
                    $terminal_mode_string .= chr($char);
                }
            }
            else {
                $terminal_mode_string = $ssh->{config}->get('terminal_mode_string');
            }
            $packet->put_str($terminal_mode_string);
            $packet->send;
        }

        my $r_packet = $channel->request_start("exec", 0);
        $r_packet->put_str($cmd);
        $r_packet->send;

    });

    my($exit);
    $channel->register_handler(SSH2_MSG_CHANNEL_REQUEST,
        _make_input_channel_req(\$exit));

    my $h = $ssh->{client_handlers};
    my($stdout, $stderr);
    if (my $r = $h->{stdout}) {
        $channel->register_handler("_output_buffer",
            $r->{code}, @{ $r->{extra} });
    }
    else {
        $channel->register_handler("_output_buffer", sub {
            $stdout .= $_[1]->bytes;
        });
    }
    if (my $r = $h->{stderr}) {
        $channel->register_handler("_extended_buffer",
            $r->{code}, @{ $r->{extra} });
    }
    else {
        $channel->register_handler("_extended_buffer", sub {
            $stderr .= $_[1]->bytes;
        });
    }

    $ssh->debug("Entering interactive session.");
    $channel->{cb} = sub {
        $cb->($ssh, $stdout, $stderr);
    }
    
}

sub break_client_loop { $_[0]->{ek_client_loopcl_quit_pending} = 1 }
sub restore_client_loop { $_[0]->{_cl_quit_pending} = 0 }
sub _quit_pending { $_[0]->{_cl_quit_pending} }

sub client_loop {
    my $ssh = shift;
    return unless scalar @{$ssh->{events}{cmd}} > 0;
    $ssh->emit('cmd');
    $ssh->{_cl_quit_pending} = 0;

    # 取所有频道
    my $cmgr = $ssh->channel_mgr;
    
    # 处理每个频道的事件
    my $h = $cmgr->handlers;
    $ssh->event_loop($cmgr, $h);
}

sub event_loop {
    my ($ssh, $cmgr, $h, $cb) = @_;
    return $ssh->client_loop if $ssh->_quit_pending;
    while (my $packet = Net::SSH::Perl::Packet->read_poll($ssh)) {
        if (my $code = $h->{ $packet->type }) {
            $code->($cmgr, $packet);
        }
        else {
            $ssh->debug("Warning: ignore packet type " . $packet->type);
        }
    }

    return $ssh->client_loop if $ssh->_quit_pending;

    $cmgr->process_output_packets;

    # 如果处理完了. 关掉所有的连接
    # 之所以在这进行这个操作是因为主 channel 也需要操作
    for my $c (@{ $cmgr->{channels} }) {
        next unless defined $c;
        if ($c->{wfd} &&
            $c->{extended}->length == 0 &&
            $c->{output}->length == 0 &&
            $c->{ostate} == CHAN_OUTPUT_WAIT_DRAIN ) { 
                $c->obuf_empty;
        }
        # 上面 obuf_empty 会给 ostate 变成 CHAN_OUTPUT_CLOSED
        # 下面这个就会发关闭给远程
        if ($c->delete_if_full_closed) {
            defined $c->{cb} ? $c->{cb}->() : '';
            $cmgr->remove($c->{id});
        }
    }
        
    my $oc = grep { defined } @{ $cmgr->{channels} };
    return $ssh->client_loop unless $oc > 1;

    my $cv = AE::cv sub {

lib/AnyEvent/SSH2.pm  view on Meta::CPAN

sub channel_mgr {
    my $ssh = shift;
    unless (defined $ssh->{channel_mgr}) {
        $ssh->{channel_mgr} = Net::SSH::Perl::ChannelMgr->new($ssh);
    }
    $ssh->{channel_mgr};
}
sub _read_version {
    my $ssh = shift;
    my $line = shift;;
    my $len = length $line;
    unless(defined($len)) {
        next if $! == EAGAIN || $! == EWOULDBLOCK;
        croak "Read from socket failed: $!";
    }
    croak "Connection closed by remote host" if $len == 0;
    croak "Version line too long: $line"
     if substr($line, 0, 4) eq "SSH-" and length($line) > 255;
    croak "Pre-version line too long: $line" if length($line) > 4*1024;
    if (substr($line, 0, 4) ne "SSH-") {
        $ssh->debug("Remote version string: $line");
    }
    return $line;
}
sub sock { $_[0]->{session}{sock} }

1;
__END__

=pod
 
=encoding utf8

=head1 NAME

AnyEvent::SSH2 - 基于 AnyEvent 的 SSH2 的非阻塞事件驱动的实现

=head1 SYNOPSIS

对多台主机, 并行的远程执行一些命令.

    use AE;
    use AnyEvent::SSH2;

    my $ssh1 = AnyEvent::SSH2->new(
        'ip',
        user => 'root',
        pass => 'pass',
    );   
    
    my $ssh2 = AnyEvent::SSH2->new(
        'ip'
        user => 'root',
        pass => 'pass',
    );   
    
    my $cv = AE::cv;

    $cv->begin;
    $ssh1->send('sleep 5;hostname' => sub {
        my ($ssh,  $stdout, $stderr) = @_;
        print "$stdout";
        $cv->end;
    })->connect;  
    
    $cv->begin;
    $ssh2->send('sleep 1;hostname' => sub {
        my ($ssh,  $stdout, $stderr) = @_;
        print "$stdout";
        $cv->end;
    })->connect;  

    $cv->recv;

对同一个主机, 并行的执行多条命令...注意顺序并不固定, 任何一个命令先执行完都会先回调.

    use AnyEvent::SSH2;
    my $ssh = AnyEvent::SSH2->new(
        'ip'
        user => 'root',
        pass => 'pass',
    );   
    
    
    my $cv = AE::cv;
    $cv->begin;
    $ssh->send('sleep 5; echo 5' => sub {
        my ($ssh,  $stdout, $stderr) = @_;
        print "$stdout";
        $cv->end;
    });
    
    $cv->begin;
    $ssh->send('sleep 1; echo 1 ; uptime' => sub {
        my ($ssh,  $stdout, $stderr) = @_;
        print "$stdout";
        $cv->end;
    });
    
    $ssh->connect;  
    
    $cv->recv;

或者你可能想有一定层次, 根据前一条命令的条件来执行指定的命令.

    my $cv = AE::cv;
    $ssh->send('sleep 5; echo 5' => sub {
        my ($ssh,  $stdout, $stderr) = @_;
        print "$stdout";
        $ssh->send('sleep 1; echo 1 ; uptime' => sub {
            my ($ssh,  $stdout, $stderr) = @_;
            print "$stdout";
            $cv->send;
        });
    });
    
    $ssh->connect;  
    
    $cv->recv;

=head1 DESCRIPTION

这个模块是基于 Net::SSH::Perl 实现的在 AnyEvent 上的事件驱动的支持. 并不是使用的 Fork 的实现 (non-fork), 这是基于 socket 的原生事件驱动实现. 
可以同时异步的连接多个主机进行操作.  并且也可以支持同时对一个主机同时执行多条命令与根据前面结果然后在执行指定命令.

=head1 属性

默认对象 new 的时候需要提供连接的主机地址. 本对象的属性继承所有的 L<Net::SSH::Perl> 的属性. 并实现了下列这些

=head2 user 

提供用于远程连接的用户名. 如果不提供会默认使用当前用户.

=head2 pass 

提供用于远程连接的密码. 也支持 key 方式认证. 需要指定如下属性

    identity_files => ['/root/.ssh/id_rsa'],
    options => [
        'PubkeyAuthentication yes',
        'PasswordAuthentication no', # 可能你想关掉密码认证
    ],

=head1 方法

本对象所支持的方法如下

=head2 send

这个需要提供你要给远程执行的命令做为第一个参数, 第二个参数是命令执行完的回调函数. 
回调函数的第二个和第三个参数会别会是命令执行的标准输出和标准错误.

本方法可以重复设置, 都会一次性发给远程主机执行. 所以执行完会根据执行结果的速度, 会立即返回并调用回调.

=head2 connect

当上面的命令定义完了, 可以调用 connect 方法来运行整个事件.

=head1 SEE ALSO

L<AnyEvent>, L<Net::SSH::Perl>

=head1 AUTHOR

扶凯 fukai <iakuf@163.com>

=cut



( run in 1.513 second using v1.01-cache-2.11-cpan-39bf76dae61 )