AMQP

 view release on metacpan or  search on metacpan

lib/AMQP/Publisher.pm  view on Meta::CPAN

package AMQP::Publisher;
our $VERSION = '0.01';

use Mojo::Base 'AMQP';
use AnyEvent::RabbitMQ;
use Sys::Hostname;

# --- Behaviour -------------------------------------------------------------
# When true, progress and failure messages are printed with say().
has debug => 1;

# --- Broker connection settings (handed to AnyEvent::RabbitMQ->connect) -----
has host      => 'localhost';
has port      => 5672;
has user      => 'guest';
has password  => 'guest';
has vhost     => '/';
has timeout   => 1;
has heartbeat => 30;    # sent to the broker as tune => { heartbeat => ... }

# --- Publishing target ------------------------------------------------------
has exchange => 'log';
has type     => 'topic';
has key      => '#';

# --- Runtime state, populated by attach() -----------------------------------
#   rabbit     - the AnyEvent::RabbitMQ instance
#   connection - object passed to the connect on_success callback
#   channel    - object passed to the open_channel on_success callback
#   status     - AnyEvent condvar that attach() blocks on
#   on_connect - caller-supplied callback, invoked with the publisher once
#                the channel is open
has [qw(rabbit connection channel status on_connect)];

# Connect to the broker, open a channel, and block until the session ends.
#
# Flow:
#   1. A fresh condvar is stored in ->status; every failure/close callback
#      signals it, which is what finally unblocks the recv() at the bottom.
#   2. Once the connection succeeds a channel is opened; when that succeeds
#      the channel is stored in ->channel and the optional ->on_connect
#      callback is invoked with the publisher as its only argument.
#
# Returns whatever was passed to the condvar's send() (for the on_failure
# handlers, which are the condvar itself, that is the callback's arguments;
# the other handlers send nothing).
sub attach {
	my $self = shift;
	$self->status(AnyEvent->condvar);
	$self->rabbit(AnyEvent::RabbitMQ->new);
	$self->rabbit->load_xml_spec();
	$self->rabbit->connect(
		host => $self->host,
		port => $self->port,
		user => $self->user,
		pass => $self->password,
		vhost => $self->vhost,
		timeout => $self->timeout,
		tune => { heartbeat => $self->heartbeat },
		on_success => sub {
			say "Connected to amqp://" . $self->host . ":" . $self->port . $self->vhost if $self->debug;
			$self->connection(shift);
			$self->connection->open_channel(
				# A condvar is callable; invoking it is the same as ->send(@args).
				on_failure => $self->status,
				on_close => sub {
					say "Channel closed" if $self->debug;
					$self->status->send;
				},
				# on_return is documented by AnyEvent::RabbitMQ as an
				# open_channel argument, not a connect argument; it used to
				# be passed to connect(), where it could never fire.
				on_return => sub {
					say "Failed to send" if $self->debug;
					$self->status->send;
				},
				on_success => sub {
					say "Opened channel" if $self->debug;
					$self->channel(shift);
					# on_connect has no default, so tolerate it being unset
					# instead of dying inside the event-loop callback.
					my $on_connect = $self->on_connect;
					$on_connect->($self) if $on_connect;
				},
			);
		},
		on_failure => $self->status,
		on_read_failure =>  sub {
			say "Failed to read" if $self->debug;
			$self->status->send;
		},
		on_close => sub {
			say "Connection closed" if $self->debug;
			$self->status->send;
		}
	);
	# Runs the event loop until one of the callbacks above signals ->status.
	return $self->status->recv;
}

# Publish $message on the open channel.
#
# The message is published to the ->exchange attribute using ->key as the
# routing key. AnyEvent::RabbitMQ::Channel has no send() method (its
# publishing method is publish()), so the previous delegation to
# ->channel->send could not work.
#
# Dies if no channel has been opened yet; call this from the on_connect
# callback (or later), after attach() has stored the channel.
#
# Returns whatever the channel's publish() returns.
sub send {
	my ($self, $message) = @_;

	my $channel = $self->channel
		or die "Cannot send: channel is not open (call attach first)";

	return $channel->publish(
		exchange    => $self->exchange,
		routing_key => $self->key,
		body        => $message,
	);
}

1;



( run in 1.101 second using v1.01-cache-2.11-cpan-cdf2f3d4e48 )