DBGp-Client
view release on metacpan or search on metacpan
lib/DBGp/Client/AsyncConnection.pm view on Meta::CPAN
package DBGp::Client::AsyncConnection;
use strict;
use warnings;
use DBGp::Client::AsyncStream;
use DBGp::Client::Parser;
use Scalar::Util;
# Constructor.
#
# Arguments (key/value pairs):
#   socket - handed unchanged to DBGp::Client::AsyncStream->new
#
# Returns a new connection object whose line stream feeds every received
# line into _receive_line.
sub new {
    my $class = shift;
    my %args  = @_;

    my $stream = DBGp::Client::AsyncStream->new(socket => $args{socket});

    my %state = (
        stream          => $stream,
        sequence        => 0,       # last transaction id handed out
        init            => undef,   # parsed result of the first received line
        commands        => {},      # transaction id => response callback
        on_stream       => undef,   # optional handler for stream messages
        on_notification => undef,   # optional handler for notifications
    );
    my $self = bless \%state, $class;

    # The line callback only holds a weakened copy of the object, so the
    # closure itself does not keep the connection alive (presumably the
    # stream stores the callback, which would otherwise form a cycle).
    my $weak_self = $self;
    Scalar::Util::weaken($weak_self);
    $stream->on_line(sub { $weak_self->_receive_line(@_) });

    return $self;
}
# Accessor: returns whatever is stored in the "init" slot (undef until the
# first line has been received and parsed).
sub init {
    my ($self) = @_;
    return $self->{init};
}
# Send a command line and remember who wants the answer.
#
# Arguments:
#   $callback - code ref stored under the new transaction id
#   $command  - command name, written first on the line
#   @args     - extra arguments appended after the "-i <id>" pair
#
# Returns whatever the stream's put_line returns.
sub send_command {
    my $self = shift;
    my ($on_response, $name, @extra) = @_;

    # Allocate the next transaction id.
    ++$self->{sequence};
    my $id = $self->{sequence};

    # Register the callback before the line is written out.
    $self->{commands}{$id} = $on_response;

    return $self->{stream}->put_line($name, '-i', $id, @extra);
}
# Forward a chunk of received data to the underlying stream and return
# whatever the stream's add_data returns.
sub add_data {
    my $self = shift;

    # $_[0] is passed on directly (not copied) so the stream sees the same
    # aliased argument the caller supplied.
    return $self->{stream}->add_data($_[0]);
}
# Notify every pending command that the connection is gone.
#
# Each callback still registered in {commands} is removed and invoked once
# with a DBGp::Client::Response::InternalError object (code 999, message
# "Broken connection") carrying the transaction id it was waiting on.
#
# Callbacks are invoked in ascending transaction-id order.  Ids are handed
# out by send_command from an incrementing counter, so this is the order in
# which the commands were sent; iterating the hash directly would deliver
# the errors in an arbitrary order.
sub closed {
    my ($self) = @_;

    my @pending_ids = sort { $a <=> $b } keys %{ $self->{commands} };

    for my $transaction_id (@pending_ids) {
        my $callback = delete $self->{commands}{$transaction_id};

        # An earlier callback may already have removed this entry.
        next unless $callback;

        my $error = bless {
            transaction_id => $transaction_id,
            code           => 999,
            apperr         => 1,
            message        => "Broken connection",
        }, 'DBGp::Client::Response::InternalError';

        # Best effort: an exception thrown by one callback must not stop
        # the remaining callbacks from being notified.
        eval {
            $callback->($error);
        };
    }
}
# Handle one line delivered by the stream (see the on_line callback set up
# in new).
#
# Every line is run through DBGp::Client::Parser::parse:
#   * while no init value has been recorded, the parsed result is stored in
#     {init} and nothing else happens;
#   * a regular (non out-of-band) response is passed to the callback that
#     was registered under its transaction id, which is removed first;
#     dies if no such callback exists;
#   * an out-of-band message goes to the on_stream or on_notification
#     handler when the matching one is set, and is dropped otherwise.
sub _receive_line {
    my ($self, $line) = @_;

    # First line seen: remember its parsed form and stop.
    return $self->{init} = DBGp::Client::Parser::parse($line)
        unless $self->{init};

    my $message = DBGp::Client::Parser::parse($line);

    unless ($message->is_oob) {
        my $handler = delete $self->{commands}{$message->transaction_id};
        die 'Mismatched transaction IDs: ', $message->transaction_id
            unless $handler;
        return $handler->($message);
    }

    # Out-of-band dispatch.  Deliberately left as the final statement so
    # the sub's value matches the nested-if form it replaces.
    if ($message->is_stream && $self->{on_stream}) {
        $self->{on_stream}->($message);
    }
    elsif ($message->is_notification && $self->{on_notification}) {
        $self->{on_notification}->($message);
    }
}
# Setter: install (or, with undef / no argument, clear) the handler that
# _receive_line invokes for out-of-band stream messages.  Returns the value
# that was stored.
sub on_stream {
    my ($self, $handler) = @_;
    return $self->{on_stream} = $handler;
}
( run in 3.004 seconds using v1.01-cache-2.11-cpan-39bf76dae61 )