AMQP
view release on metacpan or search on metacpan
received the program in object code or executable form alone.)
Source code for a work means the preferred form of the work for making
modifications to it. For an executable file, complete source code means
all the source code for all modules it contains; but, as a special
exception, it need not include source code for modules which are standard
libraries that accompany the operating system on which the executable
file runs, or for standard header files or definitions files that
accompany that operating system.
4. You may not copy, modify, sublicense, distribute or transfer the
Program except as expressly provided under this General Public License.
Any attempt otherwise to copy, modify, sublicense, distribute or transfer
the Program is void, and will automatically terminate your rights to use
the Program under this License. However, parties who have received
copies, or rights to use copies, from you under this General Public
License will not have their licenses terminated so long as such parties
remain in full compliance.
5. By copying, distributing or modifying the Program (or any work based
on the Program) you indicate your acceptance of this license to do so,
and all its terms and conditions.
6. Each time you redistribute the Program (or any work based on the
Program), the recipient automatically receives a license from the original
licensor to copy, distribute or modify the Program subject to these
terms and conditions. You may not impose any further restrictions on the
recipients' exercise of the rights granted herein.
7. The Free Software Foundation may publish revised and/or new versions
of the General Public License from time to time. Such new versions will
be similar in spirit to the present version, but may differ in detail to
address new problems or concerns.
Each version is given a distinguishing version number. If the Program
specifies a version number of the license which applies to it and "any
may not charge a fee for this Package itself. However, you may distribute this
Package in aggregate with other (possibly commercial) programs as part of a
larger (possibly commercial) software distribution provided that you do not
advertise this Package as a product of your own.
6. The scripts and library files supplied as input to or produced as output
from the programs of this Package do not automatically fall under the copyright
of this Package, but belong to whomever generated them, and may be sold
commercially, and may be aggregated with this Package.
7. C or perl subroutines supplied by you and linked into this Package shall not
be considered part of this Package.
8. The name of the Copyright Holder may not be used to endorse or promote
products derived from this software without specific prior written permission.
9. THIS PACKAGE IS PROVIDED "AS IS" AND WITHOUT ANY EXPRESS OR IMPLIED
WARRANTIES, INCLUDING, WITHOUT LIMITATION, THE IMPLIED WARRANTIES OF
MERCHANTIBILITY AND FITNESS FOR A PARTICULAR PURPOSE.
The End
Makefile.PL
README
README.md
cpanfile
cpanfile.snapshot
dist.ini
lib/AMQP.pm
lib/AMQP/Publisher.pm
lib/AMQP/Subscriber.pm
t/publisher.t
t/subscriber.t
lib/AMQP.pm view on Meta::CPAN
package AMQP;
our $VERSION = '0.01';
use Mojo::Base -base;
sub server {
my ($self,$url) = @_;
$url ||= ''; # incase we don't pass a url
$url =~ /amqp:\/\/
(?<username>[^:]+):
(?<password>[^@]+)@
(?<hostname>[^:\/]+):
(?<port>\d+)\/
(?<vhost>[^\/]*)
/x;
$self->host($+{'hostname'} || 'localhost');
lib/AMQP/Publisher.pm view on Meta::CPAN
has 'heartbeat' => 30;
has 'exchange' => 'log';
has 'type' => 'topic';
has 'key' => '#';
has 'rabbit';
has 'connection';
has 'channel';
has 'status';
has 'on_connect';
sub attach {
my $self = shift;
$self->status(AnyEvent->condvar);
$self->rabbit(AnyEvent::RabbitMQ->new);
$self->rabbit->load_xml_spec();
$self->rabbit->connect(
host => $self->host,
port => $self->port,
user => $self->user,
pass => $self->password,
vhost => $self->vhost,
timeout => $self->timeout,
tune => { heartbeat => $self->heartbeat },
on_success => sub {
say "Connected to amqp://" . $self->host . ":" . $self->port . $self->vhost if $self->debug;
$self->connection(shift);
$self->connection->open_channel(
on_failure => $self->status,
on_close => sub {
say "Channel closed" if $self->debug;
$self->status->send;
},
on_success => sub {
say "Opened channel" if $self->debug;
$self->channel(shift);
$self->on_connect->($self);
},
);
},
on_failure => $self->status,
on_read_failure => sub {
say "Failed to read" if $self->debug;
$self->status->send;
},
on_return => sub {
say "Failed to send" if $self->debug;
$self->status->send;
},
on_close => sub {
say "Connection closed" if $self->debug;
$self->status->send;
}
);
$self->status->recv;
}
sub send {
my ($self,$message) = @_;
$self->channel->send($message);
}
1;
__END__
=pod
lib/AMQP/Publisher.pm view on Meta::CPAN
AMQP::Publisher -- Publishes messages to an exchange.
=head1 SYNOPSIS
use AMQP::Publisher;
my $publisher = AMQP::Publisher->new;
$publisher->server('amqp://foo:bar@localhost:5672/testing');
$publisher->exchange('test');
$publisher->type('topic');
$publisher->queue('testing');
$publisher->on_connect( sub {
my ($self) = @_;
$self->channel->send('hello world');
});
$publisher->attach;
=head1 DESCRIPTION
The AMQP::Publisher publishes messages to an AMQP exchange
=head1 METHODS
lib/AMQP/Subscriber.pm view on Meta::CPAN
has 'type' => 'topic';
has 'key' => '#';
has 'queue' => 'test';
has 'rabbit';
has 'connection';
has 'channel';
has 'status';
has 'tag' => $ENV{LOGNAME} . "@" . hostname;
has 'on_message';
sub attach {
my $self = shift;
$self->useragent(Mojo::UserAgent->new);
$self->status(AnyEvent->condvar);
$self->rabbit(AnyEvent::RabbitMQ->new);
$self->rabbit->load_xml_spec();
$self->rabbit->connect(
host => $self->host,
port => $self->port,
username => $self->username,
pass => $self->password,
vhost => $self->vhost,
timeout => $self->timeout,
tune => { heartbeat => $self->heartbeat },
on_success => sub {
say "Connected to amqp://" . $self->host . ":" . $self->port . $self->vhost if $self->debug;
$self->connection(shift);
$self->connection->open_channel(
on_failure => $self->status,
on_close => sub {
say "Channel closed" if $self->debug;
$self->status->send;
},
on_success => sub {
say "Opened channel" if $self->debug;
$self->channel(shift);
$self->channel->declare_exchange(
exchange => $self->exchange,
type => $self->type,
auto_delete => 1,
on_failure => $self->status,
on_success => sub {
say "Declared exchange " . $self->exchange if $self->debug;
$self->channel->declare_queue(
queue => $self->queue,
auto_delete => 1,
on_failure => $self->status,
on_success => sub {
say "Declared queue " . $self->queue if $self->debug;
$self->channel->bind_queue(
queue => $self->queue,
exchange => $self->exchange,
routing_key => $self->key,
on_failure => $self->status,
on_success => sub {
say "Bound " . $self->queue . " to " . $self->exchange . " " . $self->key if $self->debug;
$self->channel->consume(
consumer_tag => $self->tag,
on_success => sub {
say 'Consuming from ' . $self->queue if $self->debug;
},
on_consume => sub {
my $msg = shift;
$self->on_message->($self,$msg);
},
on_cancel => sub {
say "Consumption canceled" if $self->debug;
$self->status->send;
},
on_failure => $self->status,
);
}
);
}
);
}
);
},
);
},
on_failure => $self->status,
on_read_failure => sub {
say "Failed to read" if $self->debug;
$self->status->send;
},
on_return => sub {
say "Failed to send" if $self->debug;
$self->status->send;
},
on_close => sub {
say "Connection closed" if $self->debug;
$self->status->send;
}
);
$self->status->recv;
}
1;
lib/AMQP/Subscriber.pm view on Meta::CPAN
=pod
=head1 NAME
AMQP::Subscriber -- Listens for messages on a queue and does stuff with them.
=head1 SYNOPSIS
use AMQP::Subscriber;
my $subscriber = AMQP::Subscriber->new;
$subscriber->server('amqp://foo:bar@localhost:5672/testing');
$subscriber->exchange('test');
$subscriber->type('topic');
$subscriber->queue('testing');
$subscriber->callback( sub {
my ($self,$message) = @_;
say $message;
});
$subscriber->attach;
=head1 DESCRIPTION
The AMQP::Subscriber wraps
=head1 METHODS
B<new( \%params )> (constructor)
( run in 1.354 second using v1.01-cache-2.11-cpan-88abd93f124 )