AnyEvent-RabbitMQ

 view release on metacpan or  search on metacpan

Changes  view on Meta::CPAN

          to RabbitMQ, as long as they all use the same spec file.

1.05    Tue Jul 22 16:55:55 2011
        - Fixed a compiling error.

1.04    Tue Jul 19 17:04:24 2011
        - Bug fix for consuming large messages.

1.03    Thu Apr  7 02:55:12 2011
        - Separate AnyEvent::RabbitMQ from Net::RabbitFoot.
        - Avoid (additional) issues when in global destruction.
        - Do not set reply_to to an empty string in the header frame.
        - Implement basic.reject (requires RabbitMQ >= 2.0.0).
        - Store server properties in the object for easy server product
          and sever version access.
        - Shutdown the AnyEvent handle using push_shutdown.
        - Be more careful in DESTROY blocks.

1.02    Wed Jun 30 11:35:32 2010
        - Fix errors in global destruction due to destruction order being
          random.
        - Fix bug if you call ->close on a Net::RabbitFoot instance which
          is not already connected. Previously this would never return.

1.01    Sun Mar 18 07:21:58 2010
        - fix bugs.
        - support channel.flow.

1.00    Fri Mar  5 11:30:00 2010
        - fix module name.

META.json  view on Meta::CPAN

   "version" : "1.22",
   "x_contributors" : [
      "Tom Doran <bobtfish@cpan.org>",
      "Nicolas R <atoomic@cpan.org>",
      "Dave Lambley <dlambley@cpan.org>",
      "Ruslan Zakirov <ruz@bestpractical.com>",
      "Masahito Ikuta <cooldaemon@gmail.com>",
      "Rod Taylor <rod.taylor@gmail.com>",
      "Carl H\u00f6rberg <carl@cloudamqp.com>",
      "Julio Polo <julio@hawaii.edu>",
      "A.J. Ragusa <aragusa@globalnoc.iu.edu>",
      "Jos\u00e9 Mic\u00f3",
      "Scott O'Neil <scott@cpanel.net>"
   ],
   "x_serialization_backend" : "Cpanel::JSON::XS version 3.0239"
}

META.yml  view on Meta::CPAN

version: '1.22'
x_contributors:
  - 'Tom Doran <bobtfish@cpan.org>'
  - 'Nicolas R <atoomic@cpan.org>'
  - 'Dave Lambley <dlambley@cpan.org>'
  - 'Ruslan Zakirov <ruz@bestpractical.com>'
  - 'Masahito Ikuta <cooldaemon@gmail.com>'
  - 'Rod Taylor <rod.taylor@gmail.com>'
  - 'Carl Hörberg <carl@cloudamqp.com>'
  - 'Julio Polo <julio@hawaii.edu>'
  - 'A.J. Ragusa <aragusa@globalnoc.iu.edu>'
  - 'José Micó'
  - "Scott O'Neil <scott@cpanel.net>"
x_serialization_backend: 'YAML::Tiny version 1.70'

dist.ini  view on Meta::CPAN


[Metadata]
x_contributors = Tom Doran <bobtfish@cpan.org>
x_contributors = Nicolas R <atoomic@cpan.org>
x_contributors = Dave Lambley <dlambley@cpan.org>
x_contributors = Ruslan Zakirov <ruz@bestpractical.com>
x_contributors = Masahito Ikuta <cooldaemon@gmail.com>
x_contributors = Rod Taylor <rod.taylor@gmail.com>
x_contributors = Carl Hörberg <carl@cloudamqp.com>
x_contributors = Julio Polo <julio@hawaii.edu>
x_contributors = A.J. Ragusa <aragusa@globalnoc.iu.edu>
x_contributors = José Micó
x_contributors = Scott O'Neil <scott@cpanel.net>

[InstallGuide]
[CPANFile]
[MetaJSON]

[OurPkgVersion]
underscore_eval_version = 1
no_critic = 1

lib/AnyEvent/RabbitMQ.pm  view on Meta::CPAN

            $self->{_login_user} = $args{user};
            $args{on_success}->($self);
        },
        $args{on_failure},
    );

    return $self;
}

sub close {
    return if in_global_destruction;
    my $self = shift;
    my %args = $self->_set_cbs(@_);

    if ($self->{_state} == _ST_CLOSED) {
        $args{on_success}->(@_);
        return $self;
    }
    if ($self->{_state} != _ST_OPEN) {
        $args{on_failure}->(($self->{_state} == _ST_OPENING ? "open" : "close") . " already in progress");
        return $self;

lib/AnyEvent/RabbitMQ.pm  view on Meta::CPAN


    my $queue;
    if (!$id) {
        $queue = $self->{_queue};
    } elsif (defined $self->{_channels}->{$id}) {
        $queue = $self->{_channels}->{$id}->queue;
    } else {
        $failure_cb->('Unknown channel id: ' . $id);
    }

    return unless $queue; # Can go away in global destruction..
    $queue->get(sub {
        my $frame = shift;

        return $failure_cb->('Received data is not method frame')
            if !$frame->isa('Net::AMQP::Frame::Method');

        my $method_frame = $frame->method_frame;
        for my $exp_elem (@$exp) {
            return $cb->($frame)
                if $method_frame->isa('Net::AMQP::Protocol::' . $exp_elem);

lib/AnyEvent/RabbitMQ.pm  view on Meta::CPAN

    if ($output->isa('Net::AMQP::Protocol::Base')) {
        $output = $output->frame_wrap;
    }
    $output->channel($id || 0);

    if ($self->{verbose}) {
        warn '[C] --> [S] ', Dumper($output);
    }

    $self->{_handle}->push_write($output->to_raw_frame())
        if $self->{_handle}; # Careful - could have gone (global destruction)
    return;
}

sub _set_cbs {
    my $self = shift;
    my %args = @_;

    $args{on_success} ||= sub {};
    $args{on_failure} ||= sub { die @_ unless in_global_destruction };

    return %args;
}

sub _check_open {
    my $self = shift;
    my ($failure_cb) = @_;

    return 1 if $self->is_open;

lib/AnyEvent/RabbitMQ.pm  view on Meta::CPAN

        $self->{drain_timer} = AnyEvent->timer( after => $timeout, sub {
            $self->{drain_condvar}->croak("Timed out after $timeout");
        });
    }
    $self->{drain_condvar}->recv;
    delete $self->{drain_timer};
}

sub DESTROY {
    my $self = shift;
    $self->close() unless in_global_destruction;
    return;
}

1;
__END__

=head1 NAME

AnyEvent::RabbitMQ - An asynchronous and multi channel Perl AMQP client.

lib/AnyEvent/RabbitMQ/Channel.pm  view on Meta::CPAN

    my $self = shift;
    my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);

    return $self if !$self->_check_open($failure_cb);

    $self->{connection}->_push_write_and_read(
        'Basic::Qos',
        {
            prefetch_count => 1,
            prefetch_size  => 0,
            global         => 0,
            %args,
        },
        'Basic::QosOk',
        $cb,
        $failure_cb,
        $self->{id},
    );

    return $self;
}

lib/AnyEvent/RabbitMQ/Channel.pm  view on Meta::CPAN

    my $amin = $Net::AMQP::Protocol::VERSION_MINOR;

    return 1 if $amaj >= $major || $amaj == $major && $amin >= $minor;

    $failure_cb->("Not supported in AMQP $amaj-$amin") if $failure_cb;
    return 0;
}

sub DESTROY {
    my $self = shift;
    $self->close() if !in_global_destruction && $self->is_open();
    return;
}

1;
__END__

=head1 NAME

AnyEvent::RabbitMQ::Channel - Abstraction of an AMQP channel.

share/fixed_amqp0-8.xml  view on Meta::CPAN

      a message will only be sent in advance if both prefetch windows
      (and those at the channel and connection level) allow it.
      The prefetch-count is ignored if the no-ack option is set.
    </doc>
    <doc name = "rule" test = "amq_basic_18">
      The server MAY send less data in advance than allowed by the
      client's specified prefetch windows but it MUST NOT send more.
    </doc>
  </field>

  <field name = "global" type = "bit">
    apply to entire connection
    <doc>
      By default the QoS settings apply to the current channel only.  If
      this field is set, they are applied to the entire connection.
    </doc>
  </field>
</method>

<method name = "qos-ok" synchronous = "1" index = "11">
  confirm the requested qos

share/fixed_amqp0-8.xml  view on Meta::CPAN

      message will only be sent in advance if both prefetch windows
      (and those at the channel and connection level) allow it.
      The prefetch-count is ignored if the no-ack option is set.
    </doc>
    <doc name = "rule">
      The server MAY send less data in advance than allowed by the
      client's specified prefetch windows but it MUST NOT send more.
    </doc>
  </field>

  <field name = "global" type = "bit">
    apply to entire connection
    <doc>
      By default the QoS settings apply to the current channel only.  If
      this field is set, they are applied to the entire connection.
    </doc>
  </field>
</method>

<method name = "qos-ok" synchronous = "1" index = "11">
  confirm the requested qos

share/fixed_amqp0-8.xml  view on Meta::CPAN

      possible.
    </doc>
    <doc name = "rule">
      The server MAY ignore the prefetch values and consume rates,
      depending on the type of stream and the ability of the server
      to queue and/or reply it.  The server MAY drop low-priority
      messages in favour of high-priority messages.
    </doc>
  </field>

  <field name = "global" type = "bit">
    apply to entire connection
    <doc>
      By default the QoS settings apply to the current channel only.  If
      this field is set, they are applied to the entire connection.
    </doc>
  </field>
</method>

<method name = "qos-ok" synchronous = "1" index = "11">
  confirm the requested qos

share/fixed_amqp0-9-1.xml  view on Meta::CPAN

            specified prefetch windows but it MUST NOT send more.
          </doc>
          <doc type = "scenario">
            Define a QoS prefetch-size limit and a prefetch-count limit greater than
            one.  Send multiple messages that exceed the prefetch size.  Verify that
            no more than one message arrives at once.
          </doc>
        </rule>
      </field>

      <field name = "global" domain = "bit" label = "apply to entire connection">
        <doc>
          By default the QoS settings apply to the current channel only. If this field is
          set, they are applied to the entire connection.
        </doc>
      </field>
    </method>

    <method name = "qos-ok" synchronous = "1" index = "11" label = "confirm the requested qos">
      <doc>
        This method tells the client that the requested QoS levels could be handled by the



( run in 1.083 second using v1.01-cache-2.11-cpan-49f99fa48dc )