AnyEvent-RabbitMQ

 view release on metacpan or  search on metacpan

lib/AnyEvent/RabbitMQ.pm  view on Meta::CPAN

    if ($method_frame && $method_frame->isa('Net::AMQP::Protocol::Connection::Close')) {
        delete $self->{_heartbeat_timer};
        $self->_push_write(Net::AMQP::Protocol::Connection::CloseOk->new());
        $self->_server_closed($close_cb, $frame);
        return;
    }

    return 1;
}

sub _server_closed {
    my $self = shift;
    my ($close_cb, $why,) = @_;

    $self->{_state} = _ST_CLOSING;
    for my $channel (values %{ $self->{_channels} }) {
        $channel->_closed(ref($why) ? $why : $channel->_close_frame($why));
    }
    $self->{_channels} = {};
    $self->{_handle}->push_shutdown;
    $self->{_state} = _ST_CLOSED;

    $close_cb->($why);
    return;
}

sub _start {
    my $self = shift;
    my %args = @_;

    if ($self->{verbose}) {
        warn 'post header', "\n";
    }

    $self->{_handle}->push_write(Net::AMQP::Protocol->header);

    $self->_push_read_and_valid(
        'Connection::Start',
        sub {
            my $frame = shift;

            my @mechanisms = split /\s/, $frame->method_frame->mechanisms;
            return $args{on_failure}->('AMQPLAIN is not found in mechanisms')
                if none {$_ eq 'AMQPLAIN'} @mechanisms;

            my @locales = split /\s/, $frame->method_frame->locales;
            return $args{on_failure}->('en_US is not found in locales')
                if none {$_ eq 'en_US'} @locales;

            $self->{_server_properties} = $frame->method_frame->server_properties;

            $self->_push_write(
                Net::AMQP::Protocol::Connection::StartOk->new(
                    client_properties => {
                        platform     => 'Perl',
                        product      => __PACKAGE__,
                        information  => 'http://d.hatena.ne.jp/cooldaemon/',
                        version      => Net::AMQP::Value::String->new(__PACKAGE__->VERSION),
                        capabilities => {
                            consumer_cancel_notify     => Net::AMQP::Value::true,
                            exchange_exchange_bindings => Net::AMQP::Value::true,
                        },
                        %{ $args{client_properties} || {} },
                    },
                    mechanism => 'AMQPLAIN',
                    response => {
                        LOGIN    => $args{user},
                        PASSWORD => $args{pass},
                    },
                    locale => 'en_US',
                ),
            );

            $self->_tune(%args,);
        },
        $args{on_failure},
    );

    return $self;
}

sub _tune {
    my $self = shift;
    my %args = @_;

    weaken(my $weak_self = $self);
    $self->_push_read_and_valid(
        'Connection::Tune',
        sub {
            my $self = $weak_self or return;
            my $frame = shift;

            my %tune;
            foreach (qw( channel_max frame_max heartbeat )) {
                my $client = $args{tune}{$_} || 0;
                my $server = $frame->method_frame->$_ || 0;

                # negotiate with the server such that we cannot request a larger
                # value set by the server, unless the server said unlimited
                $tune{$_} = ($server == 0 or $client == 0)
                    ? ($server > $client ? $server : $client)   # max
                    : ($client > $server ? $server : $client);  # min
            }

            if ($self->{_frame_max} = $tune{frame_max}) {
                # calculate how big the body can actually be
                $self->{_body_max} = $self->{_frame_max} - Net::AMQP::_HEADER_LEN - Net::AMQP::_FOOTER_LEN;
            }

            $self->{_channel_max} = $tune{channel_max} || $DEFAULT_CHANNEL_MAX;

            $self->_push_write(
                Net::AMQP::Protocol::Connection::TuneOk->new(%tune,)
            );

            if ($tune{heartbeat} > 0) {
                $self->_start_heartbeat($tune{heartbeat}, %args,);
            }

            $self->_open(%args,);
        },

lib/AnyEvent/RabbitMQ.pm  view on Meta::CPAN

  my $cv = AnyEvent->condvar;

  my $ar = AnyEvent::RabbitMQ->new->load_xml_spec()->connect(
      host       => 'localhost',
      port       => 5672,
      user       => 'guest',
      pass       => 'guest',
      vhost      => '/',
      timeout    => 1,
      tls        => 0, # Or 1 if you'd like SSL
      tls_ctx    => $anyevent_tls # or a hash of AnyEvent::TLS options.
      tune       => { heartbeat => 30, channel_max => $whatever, frame_max = $whatever },
      nodelay    => 1, # Reduces latency by disabling Nagle's algorithm
      on_success => sub {
          my $ar = shift;
          $ar->open_channel(
              on_success => sub {
                  my $channel = shift;
                  $channel->declare_exchange(
                      exchange   => 'test_exchange',
                      on_success => sub {
                          $cv->send('Declared exchange');
                      },
                      on_failure => $cv,
                  );
              },
              on_failure => $cv,
              on_close   => sub {
                  my $method_frame = shift->method_frame;
                  die $method_frame->reply_code, $method_frame->reply_text;
              },
          );
      },
      on_failure => $cv,
      on_read_failure => sub { die @_ },
      on_return  => sub {
          my $frame = shift;
          die "Unable to deliver ", Dumper($frame);
      },
      on_close   => sub {
          my $why = shift;
          if (ref($why)) {
              my $method_frame = $why->method_frame;
              die $method_frame->reply_code, ": ", $method_frame->reply_text;
          }
          else {
              die $why;
          }
      },
  );

  print $cv->recv, "\n";

=head1 DESCRIPTION

AnyEvent::RabbitMQ is an AMQP(Advanced Message Queuing Protocol) client library, that is intended to allow you to interact with AMQP-compliant message brokers/servers such as RabbitMQ in an asynchronous fashion.

You can use AnyEvent::RabbitMQ to -

  * Declare and delete exchanges
  * Declare, delete, bind and unbind queues
  * Set QoS and confirm mode
  * Publish, consume, get, ack, recover and reject messages
  * Select, commit and rollback transactions

Most of these actions can be done through L<AnyEvent::RabbitMQ::Channel>.
Please see the documentation there for more details.

AnyEvent::RabbitMQ is known to work with RabbitMQ versions 2.5.1 and versions 0-8 and 0-9-1 of the AMQP specification.

This client is the non-blocking version, for a blocking version with a similar API, see L<Net::RabbitFoot>.

=head1 AUTHOR

Masahito Ikuta E<lt>cooldaemon@gmail.comE<gt>

=head1 MAINTAINER

Currently maintained by C<< <bobtfish@bobtfish.net> >> due to the original
author being missing in action.

=head1 COPYRIGHT

Copyright (c) 2010, the above named author(s).

=head1 SEE ALSO

=head1 LICENSE

This library is free software; you can redistribute it and/or modify
it under the same terms as Perl itself.

=cut



( run in 1.446 second using v1.01-cache-2.11-cpan-97f6503c9c8 )