AnyEvent-RabbitMQ

 view release on metacpan or  search on metacpan

lib/AnyEvent/RabbitMQ.pm  view on Meta::CPAN


    return $self;
}

# _start_heartbeat($interval, on_close => CODE, on_read_failure => CODE)
#
# Installs a repeating AnyEvent timer, stored in $self->{_heartbeat_timer},
# that fires every $interval. On each tick a Heartbeat frame is written,
# unless $self->{_heartbeat_recv} has not changed for two consecutive ticks;
# in that case the timer is removed, on_read_failure is invoked with
# "Heartbeat lost" and _server_closed is called with on_close.
# Returns $self.
sub _start_heartbeat {
    my ($self, $interval, %args) = @_;

    my ($on_close, $on_read_failure) = @args{qw(on_close on_read_failure)};
    my $seen_stamp = 0;    # last _heartbeat_recv value observed by a tick
    my $missed     = 0;    # consecutive ticks without a change

    # The timer lives inside $self, so the callback must only hold a weak
    # reference back to the connection.
    my $weak_self = $self;
    weaken($weak_self);

    my $tick = sub {
        my $conn = $weak_self;
        return unless $conn;

        my $stamp = $conn->{_heartbeat_recv};
        if ($stamp != $seen_stamp) {
            $seen_stamp = $stamp;
            $missed     = 0;
        }
        else {
            $missed++;
            if ($missed > 1) {
                delete $conn->{_heartbeat_timer};
                $on_read_failure->("Heartbeat lost");
                $conn->_server_closed($on_close, "Heartbeat lost");
                return;
            }
        }

        $conn->_push_write(Net::AMQP::Frame::Heartbeat->new());
    };

    $self->{_heartbeat_recv}  = time;
    $self->{_heartbeat_timer} = AnyEvent->timer(
        after    => $interval,
        interval => $interval,
        cb       => $tick,
    );

    return $self;
}

# _open(vhost => ..., user => ..., on_success => CODE, on_failure => CODE)
#
# Sends Connection::Open for the given virtual host and waits for
# Connection::OpenOk. On success the connection state becomes _ST_OPEN, the
# login user is recorded and on_success is called with the connection;
# otherwise on_failure is handed to the read/validate machinery.
# Returns $self.
sub _open {
    my ($self, %args) = @_;

    my $on_open_ok = sub {
        $self->{_state}      = _ST_OPEN;
        $self->{_login_user} = $args{user};
        $args{on_success}->($self);
    };

    my %open_params = (
        virtual_host => $args{vhost},
        insist       => 1,
    );

    $self->_push_write_and_read(
        'Connection::Open',
        \%open_params,
        'Connection::OpenOk',
        $on_open_ok,
        $args{on_failure},
    );

    return $self;
}

# close(on_success => CODE, on_failure => CODE)
#
# Closes the connection. If it is already closed, on_success is called at
# once; if an open or close is still in progress, on_failure is called with
# a message saying so. Otherwise the state moves to _ST_CLOSING, every open
# channel is asked to close, and once all of them have answered (success or
# failure) _finish_close is invoked with the callbacks.
# Does nothing during global destruction. Returns $self otherwise.
sub close {
    return if in_global_destruction;
    my $self = shift;
    my %args = $self->_set_cbs(@_);

    my $state = $self->{_state};
    if ($state == _ST_CLOSED) {
        $args{on_success}->(@_);
        return $self;
    }
    unless ($state == _ST_OPEN) {
        my $pending = $state == _ST_OPENING ? "open" : "close";
        $args{on_failure}->($pending . " already in progress");
        return $self;
    }
    $self->{_state} = _ST_CLOSING;

    # Fires once every begin() below has been matched by an end().
    my $all_closed = AE::cv {
        delete $self->{_closing};
        $self->_finish_close(%args);
    };

    # Outer begin/end pair keeps the condvar from firing before the loop
    # has registered every channel.
    $all_closed->begin();

    # Snapshot the ids first; each channel is looked up only when reached.
    my @channel_ids = keys %{$self->{_channels}};
    for my $channel_id (@channel_ids) {
        my $channel = $self->{_channels}->{$channel_id};
        next unless $channel->is_open;

        $all_closed->begin();
        $channel->close(
            on_success => sub { $all_closed->end() },
            on_failure => sub { $all_closed->end() },
        );
    }

    $all_closed->end();

    return $self;
}

sub _finish_close {
    my $self = shift;
    my %args = @_;

    if (my @ch = map { $_->id } grep { defined() && $_->is_open } values %{$self->{_channels}}) {
        $args{on_failure}->("BUG: closing with channel(s) open: @ch");
        return;
    }

    $self->{_state} = _ST_CLOSED;

    $self->_push_write_and_read(
        'Connection::Close', {}, 'Connection::CloseOk',
        sub {
            # circular ref ok
            $self->{_handle}->push_shutdown;
            $args{on_success}->(@_);
        },
        sub {
            # circular ref ok
            $self->{_handle}->push_shutdown;
            $args{on_failure}->(@_);
        },

lib/AnyEvent/RabbitMQ.pm  view on Meta::CPAN

                last;
            }
        }
        if (!$id) {
            $args{on_failure}->('Ran out of channel ids');
            return $self;
        }
        $self->{_last_chan_id} = $id;
    }

    my $channel = AnyEvent::RabbitMQ::Channel->new(
        id         => $id,
        connection => $self,
        on_close   => $args{on_close},
    );

    $self->{_channels}->{$id} = $channel;

    $channel->open(
        on_success => sub {
            $args{on_success}->($channel);
        },
        on_failure => sub {
            $self->_delete_channel($channel);
            $args{on_failure}->(@_);
        },
    );

    return $self;
}

# _push_write_and_read($method, \%args, $exp, $cb, $failure_cb, $id)
#
# Builds a Net::AMQP::Protocol::<$method> object from %args, wraps it in a
# method frame and writes it on channel $id, then registers a reader that
# expects $exp. Returns whatever _push_read_and_valid returns.
sub _push_write_and_read {
    my ($self, $method, $args, $exp, $cb, $failure_cb, $id) = @_;

    my $method_class = 'Net::AMQP::Protocol::' . $method;
    my $frame        = Net::AMQP::Frame::Method->new(
        method_frame => $method_class->new(%$args),
    );
    $self->_push_write($frame, $id);

    return $self->_push_read_and_valid($exp, $cb, $failure_cb, $id);
}

# _push_read_and_valid($exp, $cb, $failure_cb, $id)
#
# Registers a one-shot reader on the connection queue (no $id) or on the
# queue of channel $id. $exp is a method name or an array ref of names,
# relative to Net::AMQP::Protocol::. When a frame arrives, $cb is called
# with it if its method matches one of the expected names; otherwise
# $failure_cb is called with a description of the problem.
# An unknown channel id is reported through $failure_cb, and nothing is
# queued when no queue is available.
sub _push_read_and_valid {
    my ($self, $exp, $cb, $failure_cb, $id) = @_;

    my $expected = ref($exp) eq 'ARRAY' ? $exp : [$exp];

    my $queue;
    if ($id) {
        my $channel = $self->{_channels}->{$id};
        if (defined $channel) {
            $queue = $channel->queue;
        }
        else {
            $failure_cb->('Unknown channel id: ' . $id);
        }
    }
    else {
        $queue = $self->{_queue};
    }

    return unless $queue; # Can go away in global destruction..

    my $on_frame = sub {
        my ($frame) = @_;

        unless ($frame->isa('Net::AMQP::Frame::Method')) {
            return $failure_cb->('Received data is not method frame');
        }

        my $method_frame = $frame->method_frame;
        for my $name (@$expected) {
            next unless $method_frame->isa('Net::AMQP::Protocol::' . $name);
            return $cb->($frame);
        }

        # Nothing matched: report a channel close specially, anything else
        # as an unexpected method.
        my $reason;
        if ($method_frame->isa('Net::AMQP::Protocol::Channel::Close')) {
            $reason = 'Channel closed';
        }
        else {
            $reason = 'Expected ' . join(',', @$expected) . ' but got ' . ref($method_frame);
        }
        $failure_cb->($reason);
    };

    return $queue->get($on_frame);
}

# _push_write($output, $id)
#
# Writes one frame to the connection handle. A protocol object (anything
# derived from Net::AMQP::Protocol::Base) is wrapped into a frame first.
# The frame is stamped with channel $id (0 when no id is given), dumped to
# STDERR via warn when $self->{verbose} is set, and then serialized and
# pushed to the handle if one is still present. Returns nothing.
sub _push_write {
    my ($self, $output, $id) = @_;

    $output = $output->frame_wrap
        if $output->isa('Net::AMQP::Protocol::Base');
    $output->channel($id || 0);

    warn '[C] --> [S] ', Dumper($output) if $self->{verbose};

    # Careful - the handle could have gone (global destruction)
    my $handle = $self->{_handle};
    $handle->push_write($output->to_raw_frame()) if $handle;

    return;
}

# _set_cbs(%args)
#
# Returns %args with default callbacks filled in: a no-op on_success and an
# on_failure that dies with its arguments (except during global
# destruction). Callbacks supplied by the caller are left alone.
sub _set_cbs {
    my ($self, %args) = @_;

    $args{on_success} = sub {}
        unless $args{on_success};
    $args{on_failure} = sub { die @_ unless in_global_destruction }
        unless $args{on_failure};

    return %args;
}

# _check_open($failure_cb)
#
# Returns 1 when the connection reports itself open. Otherwise calls
# $failure_cb with 'Connection has already been closed' and returns 0.
sub _check_open {
    my ($self, $failure_cb) = @_;

    unless ($self->is_open) {
        $failure_cb->('Connection has already been closed');
        return 0;
    }

    return 1;
}

# drain_writes([$timeout])
#
# Blocks until the condvar stored in $self->{drain_condvar} is signalled.
# With a true $timeout, a one-shot timer croaks the condvar after that many
# seconds, so the blocking recv dies with "Timed out after $timeout".
# $self->{drain_timer} is always removed before returning or re-throwing.
#
# NOTE(review): the code that signals drain_condvar is not visible here;
# presumably the handle's drain callback does it - confirm against connect.
sub drain_writes {
    # Unpack from @_: assigning from a single shift left $timeout undefined,
    # which silently disabled the timeout.
    my ($self, $timeout) = @_;

    my $drained = AnyEvent->condvar;
    $self->{drain_condvar} = $drained;

    if ($timeout) {
        # The callback must be passed under the "cb" key. It closes over the
        # condvar rather than $self so that the timer kept in $self does not
        # form a reference cycle.
        $self->{drain_timer} = AnyEvent->timer(
            after => $timeout,
            cb    => sub {
                $drained->croak("Timed out after $timeout");
            },
        );
    }

    # recv dies on timeout; drop the timer either way before propagating.
    my $ok    = eval { $drained->recv; 1 };
    my $error = $@;
    delete $self->{drain_timer};
    die $error unless $ok;

    return;
}

# Destructor: closes the connection when the object goes away, except
# during global destruction. Returns nothing.
sub DESTROY {
    my ($self) = @_;

    return if in_global_destruction;

    $self->close();
    return;
}

1;
__END__

=head1 NAME

AnyEvent::RabbitMQ - An asynchronous and multi channel Perl AMQP client.

=head1 SYNOPSIS

  use AnyEvent::RabbitMQ;

  my $cv = AnyEvent->condvar;

  my $ar = AnyEvent::RabbitMQ->new->load_xml_spec()->connect(
      host       => 'localhost',
      port       => 5672,
      user       => 'guest',
      pass       => 'guest',
      vhost      => '/',
      timeout    => 1,
      tls        => 0, # Or 1 if you'd like SSL
      tls_ctx    => $anyevent_tls, # or a hash of AnyEvent::TLS options.
      tune       => { heartbeat => 30, channel_max => $whatever, frame_max => $whatever },
      nodelay    => 1, # Reduces latency by disabling Nagle's algorithm
      on_success => sub {
          my $ar = shift;
          $ar->open_channel(
              on_success => sub {
                  my $channel = shift;
                  $channel->declare_exchange(
                      exchange   => 'test_exchange',
                      on_success => sub {
                          $cv->send('Declared exchange');
                      },
                      on_failure => $cv,
                  );
              },
              on_failure => $cv,
              on_close   => sub {
                  my $method_frame = shift->method_frame;
                  die $method_frame->reply_code, $method_frame->reply_text;
              },
          );
      },
      on_failure => $cv,
      on_read_failure => sub { die @_ },
      on_return  => sub {
          my $frame = shift;
          die "Unable to deliver ", Dumper($frame);
      },
      on_close   => sub {
          my $why = shift;
          if (ref($why)) {
              my $method_frame = $why->method_frame;
              die $method_frame->reply_code, ": ", $method_frame->reply_text;
          }
          else {



( run in 1.414 second using v1.01-cache-2.11-cpan-5b529ec07f3 )