Net-Async-AMQP
view release on metacpan or search on metacpan
lib/Net/Async/AMQP.pm view on Meta::CPAN
user => 'guest',
pass => 'guest',
);
=head1 METHODS
=cut
=head2 configure
Set up variables. Takes the following optional named parameters:
=over 4
=item * heartbeat_interval - (optional) interval between heartbeat messages,
default is set by the L</HEARTBEAT_INTERVAL> constant
=item * max_channels - how many channels to allow on this connection,
default is defined by the L</MAX_CHANNELS> constant
=back
Any other parameters are passed on to the parent class C<configure> method, whose return value is returned.
=cut
sub configure {
    my $self   = shift;
    my %params = @_;

    # Claim the settings owned by this class. A key that was not supplied
    # is skipped entirely, so a value stored by an earlier call survives.
    foreach my $setting ('heartbeat_interval', 'max_channels') {
        next unless exists $params{$setting};
        $self->{$setting} = delete $params{$setting};
    }

    # Whatever is left belongs to the parent class; its result is handed
    # straight back to our caller.
    return $self->SUPER::configure(%params);
}
=head2 bus
Event bus. Used for sharing global events such as connection closure.
=cut
sub bus {
    my ($self) = @_;

    # Lazily create the shared event bus on first access. Any false value
    # already stored under 'bus' is replaced by a fresh bus object.
    $self->{bus} ||= Mixin::Event::Dispatch::Bus->new;

    return $self->{bus};
}
=head2 connect
Takes the following parameters:
=over 4
=item * port - the AMQP port, defaults to 5672, can be a service name if preferred
=item * host - host to connect to, defaults to localhost
=item * local_host - our local IP to connect from
=item * user - which user to connect as, defaults to guest
=item * pass - the password for this user, defaults to guest
=item * ssl - true if you want to connect over SSL
=item * SSL_* - SSL-specific parameters, see L<IO::Async::SSL> and L<IO::Socket::SSL> for details
=back
Returns $self.
=cut
sub connect {
my $self = shift;
my %args = @_;
die 'no loop' unless my $loop = $self->loop;
my $f = $self->loop->new_future;
# Apply defaults
$self->{$_} = $args{$_} //= $CONNECTION_DEFAULTS{$_} for keys %CONNECTION_DEFAULTS;
# Remember our event callbacks so we can unsubscribe
my $connected;
my $close;
# Clean up once we succeed/fail
$f->on_ready(sub {
$self->bus->unsubscribe_from_event(close => $close) if $close;
$self->bus->unsubscribe_from_event(connected => $connected) if $connected;
undef $close;
undef $connected;
undef $self;
undef $f;
});
# One-shot event on connection
$self->bus->subscribe_to_event(connected => $connected = sub {
$f->done($self) unless $f->is_ready;
});
# Also pick up connection termination
$self->bus->subscribe_to_event(close => $close = sub {
$f->fail(connect => 'Remote closed connection') unless $f->is_ready;
});
# Support SSL connection
require IO::Async::SSL if $args{ssl};
my $method = $args{ssl} ? 'SSL_connect' : 'connect';
$loop->$method(
host => $self->{host},
# local_host can be used to send from a different source address,
# sometimes useful for routing purposes or loadtesting
(exists $args{local_host} ? (local_host => $args{local_host}) : ()),
service => $self->{port},
socktype => 'stream',
on_stream => $self->curry::on_stream(\%args),
on_resolve_error => $f->curry::fail('resolve'),
on_connect_error => $f->curry::fail('connect'),
($args{ssl}
? (on_ssl_error => $f->curry::fail('ssl'))
: ()
),
( run in 1.773 second using v1.01-cache-2.11-cpan-39bf76dae61 )