AnyEvent-STOMP-Client
view release on metacpan or search on metacpan
lib/AnyEvent/STOMP/Client.pm view on Meta::CPAN
carp "You've already begun transaction '$id'";
}
else {
$self->send_frame('BEGIN', {transaction => $id, %$additional_headers,});
$self->{transactions}{$id} = 1;
}
}
sub commit_transaction {
my $self = shift;
my $id = shift;
my $additional_headers = shift || {};
croak "I really need a transaction identifier here!" unless (defined $id);
unless (defined $self->{transactions}{$id}) {
carp "You've already commited transaction '$id'";
}
$self->send_frame('COMMIT', {transaction => $id, %$additional_headers,});
delete $self->{transactions}{$id};
}
sub abort_transaction {
my $self = shift;
my $id = shift;
my $additional_headers = shift || {};
croak "I really need a transaction identifier here!" unless (defined $id);
unless (defined $self->{transactions}{$id}) {
carp "You've already commited transaction '$id'";
}
$self->send_frame('ABORT', {transaction => $id, %$additional_headers,});
delete $self->{transactions}{$id};
}
sub read_frame {
my $self = shift;
$self->{handle}->unshift_read(
line => sub {
my ($handle, $command, $eol) = @_;
$self->reset_server_heartbeat_timer;
if ($command =~ /^(CONNECTED|MESSAGE|RECEIPT|ERROR)$/) {
$command = $1;
}
else {
return;
}
$self->{handle}->unshift_read(
regex => qr<\r?\n\r?\n>,
cb => sub {
my ($handle, $header_string) = @_;
my $header_hashref = header_string2hash($header_string);
my $args;
# The headers of the CONNECTED frame are not en-/decoded
# for backwards compatibility with STOMP 1.0
unless ($command eq 'CONNECTED') {
$header_hashref = decode_header($header_hashref);
}
if ($command =~ m/MESSAGE|ERROR/) {
if (defined $header_hashref->{'content-length'}) {
$args->{chunk} = $header_hashref->{'content-length'};
}
else {
$args->{regex} = qr<[^\000]*\000>;
}
$self->{handle}->unshift_read(
%$args,
cb => sub {
my ($handle, $body) = @_;
$self->event('READ_FRAME', $command, $header_hashref, $body);
if ($command eq 'ERROR') {
$body =~ s/^\s+|\s+$|\0//g; # trim and remove null char
$self->event(
'ERROR',
$self->{host},
$self->{port},
$body || $header_hashref->{message} || 'unknown'
);
}
else {
$self->event('MESSAGE', $header_hashref, $body);
# If frame end was determined by matching
# for a NULL character, then this character
# is not part of the frame body and thus
# removed
if (exists $args->{regex}) {
$body =~ s/\0$//g;
}
if (defined $header_hashref->{subscription}) {
$self->event(
"MESSAGE-$header_hashref->{subscription}",
$header_hashref,
$body
);
}
}
}
);
}
else {
if ($command eq 'CONNECTED') {
$self->{connected} = 1;
$self->{session} = $header_hashref->{session};
$self->{version} = $header_hashref->{version};
$self->{server} = $header_hashref->{server};
$self->set_heartbeat_intervals($header_hashref->{'heart-beat'});
}
( run in 0.418 second using v1.01-cache-2.11-cpan-0bb4e1dffa6 )