Net-Async-AMQP

 view release on metacpan or  search on metacpan

lib/Net/Async/AMQP.pm  view on Meta::CPAN

	user => 'guest',
	pass => 'guest',
);

=head1 METHODS

=cut

=head2 configure

Set up variables. Takes the following optional named parameters:

=over 4

=item * heartbeat_interval - (optional) interval between heartbeat messages,
default is set by the L</HEARTBEAT_INTERVAL> constant

=item * max_channels - how many channels to allow on this connection,
default is defined by the L</MAX_CHANNELS> constant

=back

Returns the new instance.

=cut

sub configure {
	my ($self, %args) = @_;
	for (qw(heartbeat_interval max_channels)) {
		$self->{$_} = delete $args{$_} if exists $args{$_}
	}
	$self->SUPER::configure(%args)
}

=head2 bus

Event bus. Used for sharing global events such as connection closure.

=cut

sub bus { $_[0]->{bus} ||= Mixin::Event::Dispatch::Bus->new }

=head2 connect

Takes the following parameters:

=over 4

=item * port - the AMQP port, defaults to 5672, can be a service name if preferred

=item * host - host to connect to, defaults to localhost

=item * local_host - our local IP to connect from

=item * user - which user to connect as, defaults to guest

=item * pass - the password for this user, defaults to guest

=item * ssl - true if you want to connect over SSL

=item * SSL_* - SSL-specific parameters, see L<IO::Async::SSL> and L<IO::Socket::SSL> for details

=back

Returns $self.

=cut

sub connect {
	my $self = shift;
	my %args = @_;

	die 'no loop' unless my $loop = $self->loop;

	my $f = $self->loop->new_future;

	# Apply defaults
	$self->{$_} = $args{$_} //= $CONNECTION_DEFAULTS{$_} for keys %CONNECTION_DEFAULTS;

	# Remember our event callbacks so we can unsubscribe
	my $connected;
	my $close;

	# Clean up once we succeed/fail
	$f->on_ready(sub {
		$self->bus->unsubscribe_from_event(close => $close) if $close;
		$self->bus->unsubscribe_from_event(connected => $connected) if $connected;
		undef $close;
		undef $connected;
		undef $self;
		undef $f;
	});

	# One-shot event on connection
	$self->bus->subscribe_to_event(connected => $connected = sub {
		$f->done($self) unless $f->is_ready;
	});
	# Also pick up connection termination
	$self->bus->subscribe_to_event(close => $close = sub {
		$f->fail(connect => 'Remote closed connection') unless $f->is_ready;
	});

	# Support SSL connection
	require IO::Async::SSL if $args{ssl};
	my $method = $args{ssl} ? 'SSL_connect' : 'connect';
	$loop->$method(
		host     => $self->{host},
		# local_host can be used to send from a different source address,
		# sometimes useful for routing purposes or loadtesting
		(exists $args{local_host} ? (local_host => $args{local_host}) : ()),
		service  => $self->{port},
		socktype => 'stream',

		on_stream => $self->curry::on_stream(\%args),

		on_resolve_error => $f->curry::fail('resolve'),
		on_connect_error => $f->curry::fail('connect'),
		($args{ssl}
		? (on_ssl_error => $f->curry::fail('ssl'))
		: ()
		),



( run in 1.773 second using v1.01-cache-2.11-cpan-39bf76dae61 )