AMQP

 view release on metacpan or  search on metacpan

lib/AMQP.pm  view on Meta::CPAN


package AMQP;
our $VERSION = '0.01';

use Mojo::Base -base;

sub server {
	my ($self,$url) = @_;
	$url ||= '';			# incase we don't pass a url
	$url =~ /amqp:\/\/
		(?<username>[^:]+):
		(?<password>[^@]+)@
		(?<hostname>[^:\/]+):
		(?<port>\d+)\/
		(?<vhost>[^\/]*)
	/x;
	$self->host($+{'hostname'} || 'localhost');

lib/AMQP/Publisher.pm  view on Meta::CPAN

# Attributes with defaults. heartbeat is handed to the broker connection as
# the tune => { heartbeat => ... } value in attach().
has heartbeat => 30;
has exchange  => 'log';
has type      => 'topic';
has key       => '#';

# Attributes without defaults:
#   rabbit     - AnyEvent::RabbitMQ instance created by attach()
#   connection - connection object passed to connect's on_success callback
#   channel    - channel object passed to open_channel's on_success callback
#   status     - AnyEvent condition variable that attach() blocks on
#   on_connect - code ref invoked with the publisher once the channel is open
has [qw(rabbit connection channel status on_connect)];

# Connect to the broker, open a channel and invoke the on_connect callback
# with this publisher. The call blocks on the status condition variable and
# returns once it is signalled (connection/channel failure or close, read
# failure, or a returned message).
sub attach {
	my $self = shift;

	# Handler factory: each generated callback prints $note when debug is
	# enabled and then releases the blocking recv at the end of this sub.
	my $release_after = sub {
		my $note = shift;
		return sub {
			say $note if $self->debug;
			$self->status->send;
		};
	};

	# Runs when the channel is open: remember it and hand over to the user.
	my $channel_opened = sub {
		say "Opened channel" if $self->debug;
		$self->channel(shift);
		$self->on_connect->($self);
	};

	# Runs when the connection is established: remember it, open a channel.
	my $connected = sub {
		say "Connected to amqp://" . $self->host . ":" . $self->port . $self->vhost if $self->debug;
		$self->connection(shift);
		$self->connection->open_channel(
			on_success => $channel_opened,
			on_close   => $release_after->("Channel closed"),
			# The condition variable itself is used as the failure callback.
			on_failure => $self->status,
		);
	};

	$self->status(AnyEvent->condvar);
	$self->rabbit(AnyEvent::RabbitMQ->new);
	$self->rabbit->load_xml_spec();
	$self->rabbit->connect(
		host    => $self->host,
		port    => $self->port,
		user    => $self->user,
		pass    => $self->password,
		vhost   => $self->vhost,
		timeout => $self->timeout,
		tune    => { heartbeat => $self->heartbeat },
		on_success      => $connected,
		on_failure      => $self->status,
		on_read_failure => $release_after->("Failed to read"),
		on_return       => $release_after->("Failed to send"),
		on_close        => $release_after->("Connection closed"),
	);

	# Block here until one of the handlers above signals the condvar.
	return $self->status->recv;
}

# Publish $message on the open channel.
#
# The message is published to the exchange named by the 'exchange' attribute
# using the 'key' attribute as routing key. The channel object has a
# 'publish' method, not 'send', so the original delegation could never
# succeed.
#
# Params:  $message - message body to publish
# Returns: whatever the channel's publish call returns
# Dies:    when no channel has been opened yet (attach has not reached the
#          point where on_connect is invoked)
sub send {
	my ($self,$message) = @_;
	my $channel = $self->channel
		or die "No open channel; call send once the channel is open (e.g. from on_connect)\n";
	return $channel->publish(
		exchange    => $self->exchange,
		routing_key => $self->key,
		body        => $message,
	);
}

1;

__END__

=pod

lib/AMQP/Publisher.pm  view on Meta::CPAN

AMQP::Publisher -- Publishes messages to an exchange.

=head1 SYNOPSIS
  
 use AMQP::Publisher;
 my $publisher = AMQP::Publisher->new;
 $publisher->server('amqp://foo:bar@localhost:5672/testing');
 $publisher->exchange('test');
 $publisher->type('topic');
 $publisher->queue('testing');
 $publisher->on_connect( sub {
 	my ($self) = @_;
 	$self->send('hello world');
 });
 $publisher->attach;

=head1 DESCRIPTION

AMQP::Publisher connects to an AMQP broker, opens a channel and then calls
the C<on_connect> callback so that messages can be published to an exchange.

=head1 METHODS

lib/AMQP/Subscriber.pm  view on Meta::CPAN

# Exchange type, binding key and queue name used by attach().
has 'type' => 'topic';
has 'key' => '#';
has 'queue' => 'test';
# Set by attach(): AnyEvent::RabbitMQ instance, connection and channel objects.
has 'rabbit';
has 'connection';
has 'channel';
# AnyEvent condition variable that attach() blocks on.
has 'status';
# Consumer tag, "<login>@<hostname>". LOGNAME is not set in every environment,
# so fall back to USER and then to a fixed placeholder instead of
# concatenating undef (which warns and yields a tag of just "@<hostname>").
has 'tag' => ($ENV{LOGNAME} // $ENV{USER} // 'unknown') . "@" . hostname;
# Code ref called as $cb->($subscriber, $msg) for every consumed message.
has 'on_message';

# Connect to the broker and start consuming.
#
# The steps are chained through success callbacks: connect, open a channel,
# declare the exchange, declare the queue, bind the queue to the exchange
# with the configured key, then consume. Every consumed message is passed to
# the on_message callback as ($self, $msg).
#
# The call blocks on the status condition variable and returns once it is
# signalled: any step failing, the connection or channel closing, a read
# failure, a returned message, or the consumer being cancelled.
sub attach {
	my $self = shift;
	$self->useragent(Mojo::UserAgent->new);
	$self->status(AnyEvent->condvar);
	$self->rabbit(AnyEvent::RabbitMQ->new);
	$self->rabbit->load_xml_spec();
	$self->rabbit->connect(
		host => $self->host,
		port => $self->port,
		# AnyEvent::RabbitMQ reads the login from 'user' (paired with 'pass');
		# a 'username' key is ignored, so the configured login was never sent.
		user => $self->username,
		pass => $self->password,
		vhost => $self->vhost,
		timeout => $self->timeout,
		tune => { heartbeat => $self->heartbeat },
		on_success => sub {
			say "Connected to amqp://" . $self->host . ":" . $self->port . $self->vhost if $self->debug;
			$self->connection(shift);
			$self->connection->open_channel(
				# The condition variable itself is used as the failure callback.
				on_failure => $self->status,
				on_close => sub {
					say "Channel closed" if $self->debug;
					$self->status->send;
				},
				on_success => sub {
					say "Opened channel" if $self->debug;
					$self->channel(shift);
					$self->channel->declare_exchange(
						exchange => $self->exchange,
						type => $self->type,
						auto_delete => 1,
						on_failure => $self->status,
						on_success => sub {
							say "Declared exchange " . $self->exchange if $self->debug;
							$self->channel->declare_queue(
								queue => $self->queue,
								auto_delete => 1,
								on_failure => $self->status,
								on_success => sub {
									say "Declared queue " . $self->queue if $self->debug;
									$self->channel->bind_queue(
										queue => $self->queue,
										exchange => $self->exchange,
										routing_key => $self->key,
										on_failure => $self->status,
										on_success => sub {
											say "Bound " . $self->queue . " to " . $self->exchange . " " . $self->key if $self->debug;
											$self->channel->consume(
												# Name the queue explicitly instead of relying on
												# the broker defaulting to the last declared queue.
												queue => $self->queue,
												consumer_tag => $self->tag,
												on_success => sub {
													say 'Consuming from ' . $self->queue if $self->debug;
												},
												on_consume => sub {
													# Hand the delivery to the user callback unchanged.
													my $msg = shift;
													$self->on_message->($self,$msg);
												},
												on_cancel => sub {
													say "Consumption canceled" if $self->debug;
													$self->status->send;
												},
												on_failure => $self->status,
											);
										}
									);
								}
							);
						}
					);
				},
			);
		},
		on_failure => $self->status,
		on_read_failure =>  sub {
			say "Failed to read" if $self->debug;
			$self->status->send;
		},
		on_return => sub {
			say "Failed to send" if $self->debug;
			$self->status->send;
		},
		on_close => sub {
			say "Connection closed" if $self->debug;
			$self->status->send;
		}
	);
	# Block here until one of the handlers above signals the condvar.
	$self->status->recv;
}
		

1;

lib/AMQP/Subscriber.pm  view on Meta::CPAN

AMQP::Subscriber -- Listens for messages on a queue and does stuff with them.

=head1 SYNOPSIS
  
 use AMQP::Subscriber;
 my $subscriber = AMQP::Subscriber->new;
 $subscriber->server('amqp://foo:bar@localhost:5672/testing');
 $subscriber->exchange('test');
 $subscriber->type('topic');
 $subscriber->queue('testing');
 $subscriber->on_message( sub {
 	my ($self,$message) = @_;
	say $message;
 });
 $subscriber->attach;

=head1 DESCRIPTION

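The AMQP::Subscriber wraps AnyEvent::RabbitMQ: it connects to the broker,
declares the exchange and the queue, binds them with the configured key and
passes every consumed message to the C<on_message> callback.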

=head1 METHODS



( run in 0.262 second using v1.01-cache-2.11-cpan-4d50c553e7e )