AMQP
view release on metacpan or search on metacpan
lib/AMQP.pm view on Meta::CPAN
package AMQP;
our $VERSION = '0.01';
use Mojo::Base -base;
sub server {
my ($self,$url) = @_;
$url ||= ''; # incase we don't pass a url
$url =~ /amqp:\/\/
(?<username>[^:]+):
(?<password>[^@]+)@
(?<hostname>[^:\/]+):
(?<port>\d+)\/
(?<vhost>[^\/]*)
/x;
$self->host($+{'hostname'} || 'localhost');
lib/AMQP/Publisher.pm view on Meta::CPAN
has 'heartbeat' => 30;
has 'exchange' => 'log';
has 'type' => 'topic';
has 'key' => '#';
has 'rabbit';
has 'connection';
has 'channel';
has 'status';
has 'on_connect';
sub attach {
my $self = shift;
$self->status(AnyEvent->condvar);
$self->rabbit(AnyEvent::RabbitMQ->new);
$self->rabbit->load_xml_spec();
$self->rabbit->connect(
host => $self->host,
port => $self->port,
user => $self->user,
pass => $self->password,
vhost => $self->vhost,
timeout => $self->timeout,
tune => { heartbeat => $self->heartbeat },
on_success => sub {
say "Connected to amqp://" . $self->host . ":" . $self->port . $self->vhost if $self->debug;
$self->connection(shift);
$self->connection->open_channel(
on_failure => $self->status,
on_close => sub {
say "Channel closed" if $self->debug;
$self->status->send;
},
on_success => sub {
say "Opened channel" if $self->debug;
$self->channel(shift);
$self->on_connect->($self);
},
);
},
on_failure => $self->status,
on_read_failure => sub {
say "Failed to read" if $self->debug;
$self->status->send;
},
on_return => sub {
say "Failed to send" if $self->debug;
$self->status->send;
},
on_close => sub {
say "Connection closed" if $self->debug;
$self->status->send;
}
);
$self->status->recv;
}
sub send {
my ($self,$message) = @_;
$self->channel->send($message);
}
1;
__END__
=pod
lib/AMQP/Publisher.pm view on Meta::CPAN
AMQP::Publisher -- Publishes messages to an exchange.
=head1 SYNOPSIS
use AMQP::Publisher;
my $publisher = AMQP::Publisher->new;
$publisher->server('amqp://foo:bar@localhost:5672/testing');
$publisher->exchange('test');
$publisher->type('topic');
$publisher->queue('testing');
$publisher->on_connect( sub {
my ($self) = @_;
$self->channel->send('hello world');
});
$publisher->attach;
=head1 DESCRIPTION
The AMQP::Publisher publishes messages to an AMQP exchange
=head1 METHODS
lib/AMQP/Subscriber.pm view on Meta::CPAN
has 'type' => 'topic';
has 'key' => '#';
has 'queue' => 'test';
has 'rabbit';
has 'connection';
has 'channel';
has 'status';
has 'tag' => $ENV{LOGNAME} . "@" . hostname;
has 'on_message';
sub attach {
my $self = shift;
$self->useragent(Mojo::UserAgent->new);
$self->status(AnyEvent->condvar);
$self->rabbit(AnyEvent::RabbitMQ->new);
$self->rabbit->load_xml_spec();
$self->rabbit->connect(
host => $self->host,
port => $self->port,
username => $self->username,
pass => $self->password,
vhost => $self->vhost,
timeout => $self->timeout,
tune => { heartbeat => $self->heartbeat },
on_success => sub {
say "Connected to amqp://" . $self->host . ":" . $self->port . $self->vhost if $self->debug;
$self->connection(shift);
$self->connection->open_channel(
on_failure => $self->status,
on_close => sub {
say "Channel closed" if $self->debug;
$self->status->send;
},
on_success => sub {
say "Opened channel" if $self->debug;
$self->channel(shift);
$self->channel->declare_exchange(
exchange => $self->exchange,
type => $self->type,
auto_delete => 1,
on_failure => $self->status,
on_success => sub {
say "Declared exchange " . $self->exchange if $self->debug;
$self->channel->declare_queue(
queue => $self->queue,
auto_delete => 1,
on_failure => $self->status,
on_success => sub {
say "Declared queue " . $self->queue if $self->debug;
$self->channel->bind_queue(
queue => $self->queue,
exchange => $self->exchange,
routing_key => $self->key,
on_failure => $self->status,
on_success => sub {
say "Bound " . $self->queue . " to " . $self->exchange . " " . $self->key if $self->debug;
$self->channel->consume(
consumer_tag => $self->tag,
on_success => sub {
say 'Consuming from ' . $self->queue if $self->debug;
},
on_consume => sub {
my $msg = shift;
$self->on_message->($self,$msg);
},
on_cancel => sub {
say "Consumption canceled" if $self->debug;
$self->status->send;
},
on_failure => $self->status,
);
}
);
}
);
}
);
},
);
},
on_failure => $self->status,
on_read_failure => sub {
say "Failed to read" if $self->debug;
$self->status->send;
},
on_return => sub {
say "Failed to send" if $self->debug;
$self->status->send;
},
on_close => sub {
say "Connection closed" if $self->debug;
$self->status->send;
}
);
$self->status->recv;
}
1;
lib/AMQP/Subscriber.pm view on Meta::CPAN
AMQP::Subscriber -- Listens for messages on a queue and does stuff with them.
=head1 SYNOPSIS
use AMQP::Subscriber;
my $subscriber = AMQP::Subscriber->new;
$subscriber->server('amqp://foo:bar@localhost:5672/testing');
$subscriber->exchange('test');
$subscriber->type('topic');
$subscriber->queue('testing');
$subscriber->callback( sub {
my ($self,$message) = @_;
say $message;
});
$subscriber->attach;
=head1 DESCRIPTION
The AMQP::Subscriber wraps
=head1 METHODS
( run in 0.262 second using v1.01-cache-2.11-cpan-4d50c553e7e )