AnyEvent-STOMP-Client

 view release on metacpan or  search on metacpan

lib/AnyEvent/STOMP/Client.pm  view on Meta::CPAN

        carp "You've already begun transaction '$id'";
    }
    else {
        $self->send_frame('BEGIN', {transaction => $id, %$additional_headers,});
        $self->{transactions}{$id} = 1;
    }
}

# Commit a transaction: send a COMMIT frame for $id and remove the id from
# the local set of open transactions.
#
# Parameters:
#   $id                 - transaction identifier (required; croaks if undef)
#   $additional_headers - optional hashref of extra frame headers. They are
#                         merged after the 'transaction' header, so a
#                         'transaction' key in here takes precedence.
#
# The COMMIT frame is sent even when $id is not currently tracked as open;
# in that case a warning is issued first.
sub commit_transaction {
    my $self = shift;
    my $id = shift;
    my $additional_headers = shift || {};

    croak "I really need a transaction identifier here!" unless (defined $id);

    # An untracked id means the transaction was never begun through this
    # object, or it has already been committed or aborted.
    unless (defined $self->{transactions}{$id}) {
        carp "Transaction '$id' is not open (never begun, or already committed or aborted)";
    }

    $self->send_frame('COMMIT', {transaction => $id, %$additional_headers,});
    delete $self->{transactions}{$id};
}

# Abort a transaction: send an ABORT frame for $id and remove the id from
# the local set of open transactions.
#
# Parameters:
#   $id                 - transaction identifier (required; croaks if undef)
#   $additional_headers - optional hashref of extra frame headers. They are
#                         merged after the 'transaction' header, so a
#                         'transaction' key in here takes precedence.
#
# The ABORT frame is sent even when $id is not currently tracked as open;
# in that case a warning is issued first.
sub abort_transaction {
    my $self = shift;
    my $id = shift;
    my $additional_headers = shift || {};

    croak "I really need a transaction identifier here!" unless (defined $id);

    # An untracked id means the transaction was never begun through this
    # object, or it has already been committed or aborted. The previous
    # wording always claimed it had been committed, which was misleading
    # in the abort path.
    unless (defined $self->{transactions}{$id}) {
        carp "Transaction '$id' is not open (never begun, or already committed or aborted)";
    }

    $self->send_frame('ABORT', {transaction => $id, %$additional_headers,});
    delete $self->{transactions}{$id};
}

sub read_frame {
    my $self = shift;
    $self->{handle}->unshift_read(
        line => sub {
            my ($handle, $command, $eol) = @_;

            $self->reset_server_heartbeat_timer;

            if ($command =~ /^(CONNECTED|MESSAGE|RECEIPT|ERROR)$/) {
                $command = $1;
            }
            else {
                return;
            }

            $self->{handle}->unshift_read(
                regex => qr<\r?\n\r?\n>,
                cb => sub {
                    my ($handle, $header_string) = @_;
                    my $header_hashref = header_string2hash($header_string);
                    my $args;

                    # The headers of the CONNECTED frame are not en-/decoded
                    # for backwards compatibility with STOMP 1.0
                    unless ($command eq 'CONNECTED') {
                        $header_hashref = decode_header($header_hashref);
                    }

                    if ($command =~ m/MESSAGE|ERROR/) {
                        if (defined $header_hashref->{'content-length'}) {
                            $args->{chunk} = $header_hashref->{'content-length'};
                        }
                        else {
                            $args->{regex} = qr<[^\000]*\000>;
                        }

                        $self->{handle}->unshift_read(
                            %$args,
                            cb => sub {
                                my ($handle, $body) = @_;
                                $self->event('READ_FRAME', $command, $header_hashref, $body);
                                if ($command eq 'ERROR') {
                                    $body =~ s/^\s+|\s+$|\0//g; # trim and remove null char
                                    $self->event(
                                        'ERROR',
                                        $self->{host},
                                        $self->{port},
                                        $body || $header_hashref->{message} || 'unknown'
                                    );
                                }
                                else {
                                    $self->event('MESSAGE', $header_hashref, $body);

                                    # If frame end was determined by matching
                                    # for a NULL character, then this character
                                    # is not part of the frame body and thus
                                    # removed
                                    if (exists $args->{regex}) {
                                        $body =~ s/\0$//g;
                                    }


                                    if (defined $header_hashref->{subscription}) {
                                        $self->event(
                                            "MESSAGE-$header_hashref->{subscription}",
                                            $header_hashref,
                                            $body
                                        );
                                    }
                                }
                            }
                        );
                    }
                    else {
                        if ($command eq 'CONNECTED') {
                            $self->{connected} = 1;
                            $self->{session} = $header_hashref->{session};
                            $self->{version} = $header_hashref->{version};
                            $self->{server} = $header_hashref->{server};

                            $self->set_heartbeat_intervals($header_hashref->{'heart-beat'});
                        }



( run in 0.418 second using v1.01-cache-2.11-cpan-0bb4e1dffa6 )