AnyEvent-RabbitMQ

 view release on metacpan or  search on metacpan

lib/AnyEvent/RabbitMQ/Channel.pm  view on Meta::CPAN

sub get {
    my $self = shift;
    my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);

    my $no_ack = delete $args{no_ack} // 1;

    return $self if !$self->_check_open($failure_cb);

    $self->{connection}->_push_write_and_read(
        'Basic::Get',
        {
            no_ack => $no_ack,
            %args, # queue
            ticket => 0,
        },
        [qw(Basic::GetOk Basic::GetEmpty)],
        sub {
            my $frame = shift;
            return $cb->({empty => $frame})
                if $frame->method_frame->isa('Net::AMQP::Protocol::Basic::GetEmpty');
            $self->_push_read_header_and_body('ok', $frame, $cb, $failure_cb);
        },
        $failure_cb,
        $self->{id},
    );

    return $self;
}

sub ack {
    my $self = shift;
    my %args = @_;

    return $self if !$self->_check_open(sub {});

    $self->{connection}->_push_write(
        Net::AMQP::Protocol::Basic::Ack->new(
            delivery_tag => 0,
            multiple     => (
                defined $args{delivery_tag} && $args{delivery_tag} != 0 ? 0 : 1
            ),
            %args,
        ),
        $self->{id},
    );

    return $self;
}

sub qos {
    my $self = shift;
    my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);

    return $self if !$self->_check_open($failure_cb);

    $self->{connection}->_push_write_and_read(
        'Basic::Qos',
        {
            prefetch_count => 1,
            prefetch_size  => 0,
            global         => 0,
            %args,
        },
        'Basic::QosOk',
        $cb,
        $failure_cb,
        $self->{id},
    );

    return $self;
}

sub confirm {
    my $self = shift;
    my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);

    return $self if !$self->_check_open($failure_cb);
    return $self if !$self->_check_version(0, 9, $failure_cb);

    weaken(my $wself = $self);

    $self->{connection}->_push_write_and_read(
        'Confirm::Select',
        {
            %args,
            nowait       => 0, # FIXME
        },
        'Confirm::SelectOk',
        sub {
            my $me = $wself or return;
            $me->{_is_confirm} = 1;
            $cb->();
        },
        $failure_cb,
        $self->{id},
    );

    return $self;
}

sub recover {
    my $self = shift;
    my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);

    return $self if !$self->_check_open(sub {});

    $self->{connection}->_push_write(
        Net::AMQP::Protocol::Basic::Recover->new(
            requeue => 1,
            %args,
        ),
        $self->{id},
    );

     if (!$args{nowait} && $self->_check_version(0, 9)) {
        $self->{connection}->_push_read_and_valid(
            'Basic::RecoverOk',
            $cb,
            $failure_cb,
            $self->{id},
        );

lib/AnyEvent/RabbitMQ/Channel.pm  view on Meta::CPAN


        return $failure_cb->('Received data is not header frame')
            if !$frame->isa('Net::AMQP::Frame::Header');

        my $header_frame = $frame->header_frame;
        return $failure_cb->(
              'Header is not Protocol::Basic::ContentHeader'
            . 'Header was ' . ref $header_frame
        ) if !$header_frame->isa('Net::AMQP::Protocol::Basic::ContentHeader');

        $response->{header} = $header_frame;

        $body_size = $frame->body_size;
        if ( $body_size ) {
            my $contq = $wcontq or return;
            $contq->get($body_frame);
        } else {
            $response->{body} = undef;
            $cb->($response);
        }
    });

    return $self;
}

sub _delete_cbs {
    my $self = shift;
    my %args = @_;

    my $cb         = delete $args{on_success} || sub {};
    my $failure_cb = delete $args{on_failure} || sub {die @_};

    return $cb, $failure_cb, %args;
}

sub _check_open {
    my $self = shift;
    my ($failure_cb) = @_;

    return 1 if $self->is_open();

    $failure_cb->('Channel has already been closed');
    return 0;
}

sub _check_version {
    my $self = shift;
    my ($major, $minor, $failure_cb) = @_;

    my $amaj = $Net::AMQP::Protocol::VERSION_MAJOR;
    my $amin = $Net::AMQP::Protocol::VERSION_MINOR;

    return 1 if $amaj >= $major || $amaj == $major && $amin >= $minor;

    $failure_cb->("Not supported in AMQP $amaj-$amin") if $failure_cb;
    return 0;
}

sub DESTROY {
    my $self = shift;
    $self->close() if !in_global_destruction && $self->is_open();
    return;
}

1;
__END__

=head1 NAME

AnyEvent::RabbitMQ::Channel - Abstraction of an AMQP channel.

=head1 SYNOPSIS

    my $ch = $rf->open_channel();
    $ch->declare_exchange(exchange => 'test_exchange');

=head1 DESCRIPTION

A RabbitMQ channel.

A channel is a light-weight virtual connection within a TCP connection to a
RabbitMQ broker.

=head1 ARGUMENTS FOR C<open_channel>

=over

=item on_close

Callback invoked when the channel closes.  Callback will be passed the
incoming message that caused the close, if any.

=item on_return

Callback invoked when a mandatory or immediate message publish fails.
Callback will be passed the incoming message, with accessors
C<method_frame>, C<header_frame>, and C<body_frame>.

=back

=head1 METHODS

=head2 declare_exchange (%args)

Declare an exchange (to publish messages to) on the server.

Arguments:

=over

=item on_success

=item on_failure

=item type

Default 'direct'

=item passive

Default 0



( run in 1.784 second using v1.01-cache-2.11-cpan-ceb78f64989 )