AnyEvent-RabbitMQ

 view release on metacpan or  search on metacpan

lib/AnyEvent/RabbitMQ/Channel.pm  view on Meta::CPAN

    return $self->{id};
}

sub is_open {
    my $self = shift;
    return $self->{_state} == _ST_OPEN;
}

sub is_active {
    my $self = shift;
    return $self->{_is_active};
}

sub is_confirm {
    my $self = shift;
    return $self->{_is_confirm};
}

sub queue {
    my $self = shift;
    return $self->{_queue};
}

sub open {
    my $self = shift;
    my %args = @_;

    if ($self->{_state} != _ST_CLOSED) {
        $args{on_failure}->('Channel has already been opened');
        return $self;
    }

    $self->{_state} = _ST_OPENING;

    $self->{connection}->_push_write_and_read(
        'Channel::Open', {}, 'Channel::OpenOk',
        sub {
            $self->{_state} = _ST_OPEN;
            $self->{_is_active} = 1;
            $args{on_success}->($self);
        },
        sub {
	    $self->{_state} = _ST_CLOSED;
            $args{on_failure}->($self);
        },
        $self->{id},
    );

    return $self;
}

sub close {
    my $self = shift;
    my $connection = $self->{connection}
        or return;
    my %args = $connection->_set_cbs(@_);

    # If open in in progess, wait for it; 1s arbitrary timing.

    weaken(my $wself = $self);
    my $t; $t = AE::timer 0, 1, sub {
	(my $self = $wself) or undef $t, return;
	return if $self->{_state} == _ST_OPENING;

	# No more tests are required
	undef $t;

        # Double close is OK
	if ($self->{_state} == _ST_CLOSED) {
	    $args{on_success}->($self);
            return;
        }

        $connection->_push_write(
            $self->_close_frame,
            $self->{id},
        );

        # The spec says that after a party sends Channel::Close, it MUST
        # discard all frames for that channel.  So this channel is dead
        # immediately.
        $self->_closed();

        $connection->_push_read_and_valid(
            'Channel::CloseOk',
            sub {
                $args{on_success}->($self);
                $self->_orphan();
            },
            sub {
                $args{on_failure}->(@_);
                $self->_orphan();
            },
            $self->{id},
        );
    };

    return $self;
}

sub _closed {
    my $self = shift;
    my ($frame,) = @_;
    $frame ||= $self->_close_frame();

    return if $self->{_state} == _ST_CLOSED;
    $self->{_state} = _ST_CLOSED;

    # Perform callbacks for all outstanding commands
    $self->{_queue}->_flush($frame);
    $self->{_content_queue}->_flush($frame);

    # Fake nacks of all outstanding publishes
    $_->($frame) for grep { defined } map { $_->[1] } values %{ $self->{_publish_cbs} };

    # Report cancelation of all outstanding consumes
    my @tags = keys %{ $self->{_consumer_cbs} };
    $self->_canceled($_, $frame) for @tags;

    # Report close to on_close callback
    { local $@;



( run in 3.664 seconds using v1.01-cache-2.11-cpan-98e64b0badf )