AnyEvent-RabbitMQ
view release on metacpan or search on metacpan
lib/AnyEvent/RabbitMQ/Channel.pm view on Meta::CPAN
sub get {
my $self = shift;
my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
my $no_ack = delete $args{no_ack} // 1;
return $self if !$self->_check_open($failure_cb);
$self->{connection}->_push_write_and_read(
'Basic::Get',
{
no_ack => $no_ack,
%args, # queue
ticket => 0,
},
[qw(Basic::GetOk Basic::GetEmpty)],
sub {
my $frame = shift;
return $cb->({empty => $frame})
if $frame->method_frame->isa('Net::AMQP::Protocol::Basic::GetEmpty');
$self->_push_read_header_and_body('ok', $frame, $cb, $failure_cb);
},
$failure_cb,
$self->{id},
);
return $self;
}
sub ack {
my $self = shift;
my %args = @_;
return $self if !$self->_check_open(sub {});
$self->{connection}->_push_write(
Net::AMQP::Protocol::Basic::Ack->new(
delivery_tag => 0,
multiple => (
defined $args{delivery_tag} && $args{delivery_tag} != 0 ? 0 : 1
),
%args,
),
$self->{id},
);
return $self;
}
sub qos {
my $self = shift;
my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
return $self if !$self->_check_open($failure_cb);
$self->{connection}->_push_write_and_read(
'Basic::Qos',
{
prefetch_count => 1,
prefetch_size => 0,
global => 0,
%args,
},
'Basic::QosOk',
$cb,
$failure_cb,
$self->{id},
);
return $self;
}
sub confirm {
my $self = shift;
my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
return $self if !$self->_check_open($failure_cb);
return $self if !$self->_check_version(0, 9, $failure_cb);
weaken(my $wself = $self);
$self->{connection}->_push_write_and_read(
'Confirm::Select',
{
%args,
nowait => 0, # FIXME
},
'Confirm::SelectOk',
sub {
my $me = $wself or return;
$me->{_is_confirm} = 1;
$cb->();
},
$failure_cb,
$self->{id},
);
return $self;
}
sub recover {
my $self = shift;
my ($cb, $failure_cb, %args) = $self->_delete_cbs(@_);
return $self if !$self->_check_open(sub {});
$self->{connection}->_push_write(
Net::AMQP::Protocol::Basic::Recover->new(
requeue => 1,
%args,
),
$self->{id},
);
if (!$args{nowait} && $self->_check_version(0, 9)) {
$self->{connection}->_push_read_and_valid(
'Basic::RecoverOk',
$cb,
$failure_cb,
$self->{id},
);
lib/AnyEvent/RabbitMQ/Channel.pm view on Meta::CPAN
return $failure_cb->('Received data is not header frame')
if !$frame->isa('Net::AMQP::Frame::Header');
my $header_frame = $frame->header_frame;
return $failure_cb->(
'Header is not Protocol::Basic::ContentHeader'
. 'Header was ' . ref $header_frame
) if !$header_frame->isa('Net::AMQP::Protocol::Basic::ContentHeader');
$response->{header} = $header_frame;
$body_size = $frame->body_size;
if ( $body_size ) {
my $contq = $wcontq or return;
$contq->get($body_frame);
} else {
$response->{body} = undef;
$cb->($response);
}
});
return $self;
}
sub _delete_cbs {
my $self = shift;
my %args = @_;
my $cb = delete $args{on_success} || sub {};
my $failure_cb = delete $args{on_failure} || sub {die @_};
return $cb, $failure_cb, %args;
}
sub _check_open {
my $self = shift;
my ($failure_cb) = @_;
return 1 if $self->is_open();
$failure_cb->('Channel has already been closed');
return 0;
}
sub _check_version {
my $self = shift;
my ($major, $minor, $failure_cb) = @_;
my $amaj = $Net::AMQP::Protocol::VERSION_MAJOR;
my $amin = $Net::AMQP::Protocol::VERSION_MINOR;
return 1 if $amaj >= $major || $amaj == $major && $amin >= $minor;
$failure_cb->("Not supported in AMQP $amaj-$amin") if $failure_cb;
return 0;
}
sub DESTROY {
my $self = shift;
$self->close() if !in_global_destruction && $self->is_open();
return;
}
1;
__END__
=head1 NAME
AnyEvent::RabbitMQ::Channel - Abstraction of an AMQP channel.
=head1 SYNOPSIS
my $ch = $rf->open_channel();
$ch->declare_exchange(exchange => 'test_exchange');
=head1 DESCRIPTION
A RabbitMQ channel.
A channel is a light-weight virtual connection within a TCP connection to a
RabbitMQ broker.
=head1 ARGUMENTS FOR C<open_channel>
=over
=item on_close
Callback invoked when the channel closes. Callback will be passed the
incoming message that caused the close, if any.
=item on_return
Callback invoked when a mandatory or immediate message publish fails.
Callback will be passed the incoming message, with accessors
C<method_frame>, C<header_frame>, and C<body_frame>.
=back
=head1 METHODS
=head2 declare_exchange (%args)
Declare an exchange (to publish messages to) on the server.
Arguments:
=over
=item on_success
=item on_failure
=item type
Default 'direct'
=item passive
Default 0
( run in 1.784 second using v1.01-cache-2.11-cpan-ceb78f64989 )