AnyEvent-STOMP-Client

 view release on metacpan or  search on metacpan

README.pod  view on Meta::CPAN

      }
  );

  AnyEvent->condvar->recv;


=head1 DESCRIPTION

AnyEvent::STOMP::Client provides a STOMP (Simple Text Oriented Messaging
Protocol) client. Thanks to AnyEvent, AnyEvent::STOMP::Client is completely
non-blocking, by making extensive use of the AnyEvent::Handle and timers (and,
under the hood, AnyEvent::Socket). Building on Object::Event,
AnyEvent::STOMP::Client implements various events (e.g. the MESSAGE event, when
a STOMP MESSAGE frame is received) and offers callbacks for these (e.g.
on_message($callback)).

=head1 METHODS

=head2 $client = new $host, $port, $connect_headers, $tls_context

Create an instance of C<AnyEvent::STOMP::Client>.

examples/connection_handling.pl  view on Meta::CPAN

# Example for Connection Handling using AnyEvent::STOMP::Client
#
################################################################################

use AnyEvent;
use AnyEvent::STOMP::Client;

my $stomp_client = new AnyEvent::STOMP::Client();

my $backoff = 0;
my $backoff_timer;

sub backoff {
    $backoff_timer = AnyEvent->timer(
        after => $backoff,
        cb => sub { $stomp_client->connect(); }
    );
}

$stomp_client->on_connected(sub { $backoff = 0; });
$stomp_client->on_connection_lost(sub { &backoff });
$stomp_client->on_connect_error(sub { $backoff += 10; &backoff; });

$stomp_client->connect();

examples/sample-consumer.pl  view on Meta::CPAN

        print "MESSAGE\n";
        foreach (sort keys %$header) {
            print "$_:$header->{$_}\n";
        }
        print "\n$body\n";

        $self->ack($header->{'ack'}) if (defined $header->{'ack'});
    }
);

$w = AnyEvent->timer(
    after => 10,
    cb => sub {
        $stomp_client->unsubscribe('/queue/test-destination');
    }
);

$stomp_client->on_unsubscribed(
    sub {
        my ($self, $destination) = @_;
        print "Unsubscribed from '$destination'!\n";

lib/AnyEvent/STOMP/Client.pm  view on Meta::CPAN

sub disconnect {
    my $self = shift;
    my $ungraceful = shift;

    unless ($self->is_connected) {
        if (defined $self->{handle}) {
            $self->{handle}->destroy;
            delete $self->{handle};
        }
        $self->event('DISCONNECTED', $self->{host}, $self->{port}, $ungraceful);
        delete $self->{heartbeat}{timer};
        return;
    }

    if (defined $ungraceful and $ungraceful) {
        $self->send_frame('DISCONNECT');
        $self->{connected} = 0;
        if (defined $self->{handle}) {
            $self->{handle}->push_shutdown;
            $self->{handle}->destroy;
            delete $self->{handle};
        }
        $self->event('DISCONNECTED', $self->{host}, $self->{port}, $ungraceful);
        delete $self->{heartbeat}{timer};
    }
    else {
        my $receipt_id = $self->get_uuid;

        $self->send_frame('DISCONNECT', {receipt => $receipt_id,});

        $self->before_receipt(
            sub {
                my ($self, $header) = @_;

                if ($header->{'receipt-id'} eq $receipt_id) {
                    $self->{connected} = 0;
                    $self->stop_event;
                    $self->unreg_me;
                    if (defined $self->{handle}) {
                        $self->{handle}->push_shutdown;
                        $self->{handle}->destroy;
                        delete $self->{handle};
                    }
                    $self->event('DISCONNECTED', $self->{host}, $self->{port}, $ungraceful);
                    delete $self->{heartbeat}{timer};
                }
            }
        );
    }
}

sub destroy {
    my $self = shift;
    $self->disconnect(1) if $self->is_connected;
    $self->remove_all_callbacks;

lib/AnyEvent/STOMP/Client.pm  view on Meta::CPAN

    }

    if ($sx == 0 or $cy == 0) {
        $self->{heartbeat}{interval}{server} = 0;
    }
    else {
        $self->{heartbeat}{interval}{server} = max($sx, $cy);
    }
}

sub reset_client_heartbeat_timer {
    my $self = shift;
    my $interval = $self->{heartbeat}{interval}{client};

    unless (defined $interval and $interval > 0) {
        return;
    }

    $self->{heartbeat}{timer}{client} = AnyEvent->timer(
        after => ($interval/1000),
        cb => sub {
            $self->send_heartbeat;
        }
    );
}

sub reset_server_heartbeat_timer {
    my $self = shift;
    my $interval = $self->{heartbeat}{interval}{server};

    unless (defined $interval and $interval > 0) {
        return;
    }

    $self->{heartbeat}{timer}{server} = AnyEvent->timer(
        after => (($interval+$self->get_connection_timeout_margin)/1000),
        cb => sub {
            if ($self->{connected}) {
                $self->{connected} = 0;
                if (defined $self->{handle}) {
                    $self->{handle}->push_shutdown;
                    $self->{handle}->destroy;
                    undef $self->{handle};
                }

lib/AnyEvent/STOMP/Client.pm  view on Meta::CPAN

        $body = '' unless defined $body;
        $frame = $command.$EOL.$header.$EOL.$EOL.$body.$NULL;
    }
    else {
        $frame = $command.$EOL.$header.$EOL.$EOL.$NULL;
    }

    $self->event('SEND_FRAME', $frame);
    $self->event($command, $frame) if ($command =~ m/SEND|ACK|NACK|/);
    $self->{handle}->push_write($frame);
    $self->reset_client_heartbeat_timer;
}

sub send {
    my ($self, $destination, $headers, $body) = @_;

    if (defined $destination) {
        $headers->{destination} = $destination;
    }
    else {
        croak "Would you mind supplying me with a destination?";

lib/AnyEvent/STOMP/Client.pm  view on Meta::CPAN

    $header->{transaction} = $transaction if (defined $transaction);

    $self->send_frame('NACK', $header);
}

sub send_heartbeat {
    my $self = shift;

    if ($self->is_connected) {
        $self->{handle}->push_write($EOL);
        $self->reset_client_heartbeat_timer;
    }
}

sub begin_transaction {
    my $self = shift;
    my $id = shift;
    my $additional_headers = shift || {};

    croak "I really need a transaction identifier here!" unless (defined $id);

lib/AnyEvent/STOMP/Client.pm  view on Meta::CPAN

    $self->send_frame('ABORT', {transaction => $id, %$additional_headers,});
    delete $self->{transactions}{$id};
}

sub read_frame {
    my $self = shift;
    $self->{handle}->unshift_read(
        line => sub {
            my ($handle, $command, $eol) = @_;

            $self->reset_server_heartbeat_timer;

            if ($command =~ /^(CONNECTED|MESSAGE|RECEIPT|ERROR)$/) {
                $command = $1;
            }
            else {
                return;
            }

            $self->{handle}->unshift_read(
                regex => qr<\r?\n\r?\n>,

lib/AnyEvent/STOMP/Client.pm  view on Meta::CPAN

      }
  );

  AnyEvent->condvar->recv;


=head1 DESCRIPTION

AnyEvent::STOMP::Client provides a STOMP (Simple Text Oriented Messaging
Protocol) client. Thanks to AnyEvent, AnyEvent::STOMP::Client is completely
non-blocking, by making extensive use of the AnyEvent::Handle and timers (and,
under the hood, AnyEvent::Socket). Building on Object::Event,
AnyEvent::STOMP::Client implements various events (e.g. the MESSAGE event, when
a STOMP MESSAGE frame is received) and offers callbacks for these (e.g.
on_message($callback)).

=head1 METHODS

=head2 $client = new $host, $port, $connect_headers, $tls_context

Create an instance of C<AnyEvent::STOMP::Client>.

lib/AnyEvent/STOMP/Client/All.pm  view on Meta::CPAN


    if (defined $self->{backoff}{$id}{current}) {
        $self->increase_backoff($id);
    }
    else {
        $self->{backoff}{$id}{current} = $self->{config}{backoff}{start_value};
    }

    $log->debug("$id backoff: ".$self->{backoff}{$id}{current});

    $self->{reconnect_timers}{$id} = AnyEvent->timer (
        after => $self->get_backoff($id),
        cb => sub {
            $log->debug("$id trying to connect.");
            $self->{stomp_clients}{$id}->connect;
        },
    );
}

sub increase_backoff {
    my ($self, $id) = @_;

    if ($self->{backoff}{$id}{current} < $self->{config}{backoff}{maximum}) {
        $self->{backoff}{$id}{current} *= $self->{config}{backoff}{multiplier};
    }
}

sub reset_backoff {
    my ($self, $id) = @_;

    delete $self->{reconnect_timer}{$id};
    delete $self->{backoff}{$id}{current};
}

sub get_backoff {
    my ($self, $id) = @_;

    return $self->{backoff}{$id}{current};
}

1;

lib/AnyEvent/STOMP/Client/Any.pm  view on Meta::CPAN

        );

        $self->{stomp_clients}{$id}->on_connected(
            sub {
                my (undef, $header) = @_;

                $log->debug("$id STOMP connection established.");

                $self->{current_stomp_client} = $self->{stomp_clients}{$id};
                $self->reset_backoff;
                delete $self->{connect_timeout_timer};

                $self->event('ANY_CONNECTED', $header, $id);
            }
        );

        $self->{stomp_clients}{$id}->on_transport_connected(
            sub {
                $log->debug("$id TCP/TLS connection established.");
            }
        );

lib/AnyEvent/STOMP/Client/Any.pm  view on Meta::CPAN

            }
        );

        $self->{stomp_clients}{$id}->on_connection_lost(
            sub {
                my (undef, undef, undef, $reason) = @_;

                delete $self->{current_stomp_client};

                $log->debug("$id Connection lost ($reason).");
                delete $self->{connect_timeout_timer};
                $self->set_client_unavailable($id);
                $self->event('ANY_CONNECTION_LOST', $id);
                $self->backoff;
            }
        );

        $self->{stomp_clients}{$id}->on_connect_error(
            sub {
                my (undef, undef, undef, $reason) = @_;
                $log->debug("$id Could not establish connection ($reason).");
                delete $self->{connect_timeout_timer};
                $self->set_client_unavailable($id);
                $self->backoff;
            }
        );

        $self->{stomp_clients}{$id}->on_receipt(
            sub {
                my (undef, $header) = @_;
                $self->event('ANY_RECEIPT', $header, $id);
            }

lib/AnyEvent/STOMP/Client/Any.pm  view on Meta::CPAN

    $log->debug("STOMP clients set up.");
}

sub connect {
    my $self = shift;
    my $id = $self->get_random_client_id;

    $log->debug("$id Establishing TCP/TLS connection.");
    $self->{stomp_clients}{$id}->connect;

    $self->{connect_timeout_timer} = AnyEvent->timer(
        after => 10,
        cb => sub {
            $log->debug("$id Timeout establishing STOMP connection.");
            $self->{stomp_clients}{$id}->disconnect;
            $self->set_client_unavailable($id);
            $self->backoff;
        }
    );
}

lib/AnyEvent/STOMP/Client/Any.pm  view on Meta::CPAN

sub backoff {
    my $self = shift;

    if ($self->is_client_available) {
        $self->connect;
    }
    else {
        $self->increase_backoff;
        $self->reset_clients_state;

        $self->{reconnect_timer} = AnyEvent->timer(
            after => $self->get_backoff,
            cb => sub {
                $self->backoff;
            },
        );
    }
}

sub increase_backoff {
    my $self = shift;

lib/AnyEvent/STOMP/Client/Any.pm  view on Meta::CPAN

        my $val = $self->{config}{backoff}{start_value};
        $self->{backoff} = rand($val)+$val/2;
    }

    $log->debug("Backing off ".$self->{backoff});
}

sub reset_backoff {
    my $self = shift;

    delete $self->{reconnect_timer};
    delete $self->{backoff};
    $self->reset_clients_state;
}

sub get_backoff {
    return shift->{backoff};
}

sub get_random_client_id {
    my $self = shift;



( run in 1.093 second using v1.01-cache-2.11-cpan-49f99fa48dc )