AnyEvent-RabbitMQ
view release on metacpan or search on metacpan
lib/AnyEvent/RabbitMQ/Channel.pm view on Meta::CPAN
return $self->{id};
}
# Report whether the channel is currently open.
#
# Returns a true value when the channel's _state equals _ST_OPEN (the state
# open() records once Channel::OpenOk has been received), false otherwise.
sub is_open {
    my ($channel) = @_;
    my $state = $channel->{_state};
    return $state == _ST_OPEN;
}
# Accessor for the channel's _is_active flag.
#
# Returns whatever is stored in that slot; open() sets it to 1 when the
# channel has been opened successfully.
sub is_active {
    my ($channel) = @_;
    return $channel->{_is_active};
}
# Accessor for the channel's _is_confirm flag.
#
# Returns whatever is stored in that slot, unchanged.
sub is_confirm {
    my ($channel) = @_;
    return $channel->{_is_confirm};
}
# Accessor for the object held in the channel's _queue slot.
#
# Returns the stored value itself (not a copy).
sub queue {
    my ($channel) = @_;
    return $channel->{_queue};
}
# Open this channel by sending Channel::Open and waiting for Channel::OpenOk.
#
# Arguments (key/value pairs):
#   on_success - invoked with the channel object once Channel::OpenOk has
#                been received
#   on_failure - invoked with the string 'Channel has already been opened'
#                when the channel is not in the closed state, or with the
#                channel object when the open request fails
#
# The callbacks are run through the connection's _set_cbs() first, the same
# normalization close() applies, so that a caller who leaves one out does not
# hit an "undefined value as a CODE reference" error at the moment the
# callback is invoked.
#
# Always returns the channel object.
sub open {
    my $self = shift;

    # Fall back to the raw arguments when there is no connection, so the
    # "already opened" report below still reaches a supplied on_failure.
    my $connection = $self->{connection};
    my %args = $connection ? $connection->_set_cbs(@_) : @_;

    if ($self->{_state} != _ST_CLOSED) {
        $args{on_failure}->('Channel has already been opened');
        return $self;
    }

    $self->{_state} = _ST_OPENING;

    $connection->_push_write_and_read(
        'Channel::Open', {}, 'Channel::OpenOk',
        sub {
            $self->{_state}     = _ST_OPEN;
            $self->{_is_active} = 1;
            $args{on_success}->($self);
        },
        sub {
            # Back to closed so that a later open() may be attempted.
            $self->{_state} = _ST_CLOSED;
            # NOTE(review): the arguments handed to this handler are not
            # forwarded; on_failure receives the channel object instead.
            # Kept as-is for compatibility -- confirm this is intended.
            $args{on_failure}->($self);
        },
        $self->{id},
    );

    return $self;
}
# Close this channel, waiting first for any open that is still in flight.
#
# Arguments are callback key/value pairs, passed through the connection's
# _set_cbs(); on_success and on_failure are the ones used here.
#
# Returns nothing when the channel has no connection. Otherwise returns the
# channel object right away; the actual work happens in a timer callback.
sub close {
    my $self = shift;

    my $connection = $self->{connection};
    return unless $connection;

    my %args = $connection->_set_cbs(@_);

    # If an open is in progress, wait for it; the 1s polling interval is
    # arbitrary.
    weaken(my $weak_self = $self);
    my $poll_timer;
    $poll_timer = AE::timer(0, 1, sub {
        # The channel object has gone away: stop polling.
        my $chan = $weak_self;
        unless ($chan) {
            undef $poll_timer;
            return;
        }

        # Still opening: keep the timer and look again on the next tick.
        return if $chan->{_state} == _ST_OPENING;

        # No more checks are needed, so cancel the timer.
        undef $poll_timer;

        # Closing an already-closed channel is fine.
        if ($chan->{_state} == _ST_CLOSED) {
            $args{on_success}->($chan);
            return;
        }

        $connection->_push_write(
            $chan->_close_frame,
            $chan->{id},
        );

        # The spec says that once a party has sent Channel::Close it MUST
        # discard all frames for that channel, so the channel is dead
        # immediately.
        $chan->_closed();

        my $on_close_ok = sub {
            $args{on_success}->($chan);
            $chan->_orphan();
        };
        my $on_close_error = sub {
            $args{on_failure}->(@_);
            $chan->_orphan();
        };

        $connection->_push_read_and_valid(
            'Channel::CloseOk',
            $on_close_ok,
            $on_close_error,
            $chan->{id},
        );
    });

    return $self;
}
sub _closed {
my $self = shift;
my ($frame,) = @_;
$frame ||= $self->_close_frame();
return if $self->{_state} == _ST_CLOSED;
$self->{_state} = _ST_CLOSED;
# Perform callbacks for all outstanding commands
$self->{_queue}->_flush($frame);
$self->{_content_queue}->_flush($frame);
# Fake nacks of all outstanding publishes
$_->($frame) for grep { defined } map { $_->[1] } values %{ $self->{_publish_cbs} };
# Report cancelation of all outstanding consumes
my @tags = keys %{ $self->{_consumer_cbs} };
$self->_canceled($_, $frame) for @tags;
# Report close to on_close callback
{ local $@;
( run in 3.664 seconds using v1.01-cache-2.11-cpan-98e64b0badf )