Net-AMQP-RabbitMQ-PP
view release on metacpan or search on metacpan
lib/Net/AMQP/RabbitMQ/PP.pm view on Meta::CPAN
package Net::AMQP::RabbitMQ::PP;
use strict;
use warnings;
our $VERSION = '0.11';
use Carp;
use Cwd;
use English qw(-no_match_vars);
use File::ShareDir;
use IO::Select;
use IO::Socket::INET;
use Socket qw( IPPROTO_TCP );
use List::MoreUtils;
use Net::AMQP;
use Sys::Hostname;
use Try::Tiny;
use Time::HiRes;
use constant HAS_TLS => eval { require IO::Socket::SSL; 1 };
# Constructor. Makes sure the AMQP protocol specification has been loaded into
# Net::AMQP::Protocol (it is loaded once, from the XML file shipped in this
# distribution's share directory) and returns a new, empty, blessed object.
# May be called on a class name or on an existing instance; any extra
# key/value parameters are accepted but not used here.
# No connection is made by this method.
sub new {
    my ( $class, %parameters ) = @_;

    # Only load the spec when nothing has populated it yet.
    unless ( %Net::AMQP::Protocol::spec ) {
        my $spec_path = File::ShareDir::dist_file(
            'Net-AMQP-RabbitMQ-PP',
            'amqp0-9-1.extended.xml',
        );
        Net::AMQP::Protocol->load_xml_spec( $spec_path );
    }

    # Allow both Class->new and $object->new.
    my $package = ref( $class ) || $class;
    return bless {}, $package;
}
# Open the socket to the server and start the AMQP session.
#
# Named arguments (all optional; defaults use ||, so any false value counts
# as "not given"):
#   host           - peer address, default 'localhost'
#   port           - peer port, default 5671 when secure, otherwise 5672
#   secure         - true to connect with IO::Socket::SSL; dies if that
#                    module is not available
#   SSL_*          - with secure set, every argument whose name starts with
#                    SSL_ is handed to the socket constructor unchanged
#   socket_timeout - passed to the socket constructor as Timeout
#   timeout        - seconds allowed for establishing the socket, enforced
#                    with Time::HiRes::alarm
#   username       - default 'guest'
#   password       - default 'guest'
#   virtual_host   - default '/'
#   frame_max      - default 0
#   heartbeat      - default 0
#
# Returns $self. Croaks when the socket cannot be created or when the
# timeout expires.
sub connect {
    my ( $self, %args ) = @_;

    {
        # The handler lives in this enclosing block, not inside the try
        # block, so it is still installed while the catch block cancels a
        # pending alarm.
        local $SIG{ALRM} = sub {
            Carp::croak 'Timed out';
        };

        try {
            if( $args{timeout} ) {
                Time::HiRes::alarm( $args{timeout} );
            }

            my $connection_class = "IO::Socket::INET";
            my %connection_args;

            if ( $args{secure} ) {
                die "IO::Socket::SSL is required for secure connections"
                    if ! HAS_TLS;

                $connection_class = "IO::Socket::SSL";

                # Forward every SSL_* option as given.
                my @ssl_args = grep { /^SSL_/ } sort keys %args;
                @connection_args{ @ssl_args } = @args{ @ssl_args };
            }

            $self->_set_handle(
                $connection_class->new(
                    PeerAddr => $args{host} || 'localhost',
                    PeerPort => $args{port} || ( $args{secure} ? 5671 : 5672 ),
                    ( ! $args{secure} ? ( Proto => 'tcp' ) : () ),
                    ( $args{socket_timeout} ? ( Timeout => $args{socket_timeout} ) : () ),
                    %connection_args,
                ) or Carp::croak "Could not connect: $EVAL_ERROR"
            );

            $self->_select( IO::Select->new( $self->_get_handle ) );

            if( $args{timeout} ) {
                Time::HiRes::alarm( 0 );
            }
        }
        catch {
            my $error = $_;

            # The try block died before it could disarm the timer. Cancel it
            # here, otherwise a SIGALRM would be delivered later, after the
            # local handler has been removed.
            if( $args{timeout} ) {
                Time::HiRes::alarm( 0 );
            }

            Carp::croak $error;
        };
    }

    $self->_get_handle->autoflush( 1 );

    my $password    = $args{password}     || 'guest';
    my $username    = $args{username}     || 'guest';
    my $virtualhost = $args{virtual_host} || '/';
    my $frame_max   = $args{frame_max}    || 0;
    my $heartbeat   = $args{heartbeat}    || 0;

    # Backlog of messages.
    $self->_backlog( [] );

    $self->_startup(
        username     => $username,
        password     => $password,
        virtual_host => $virtualhost,
        frame_max    => $frame_max,
        heartbeat    => $heartbeat,
    );

    return $self;
}
sub set_keepalive {
my ( $self, %args ) = @_;
my $handle = $self->_get_handle;
my $idle = $args{idle};
my $count = $args{count};
my $interval = $args{interval};
if( eval { require Socket::Linux } ) {
# Turn on keep alive probes.
defined $handle->sockopt( SO_KEEPALIVE, 1 )
or Carp::croak "Could not turn on tcp keep alive: $OS_ERROR";
# Time between last meaningful packet and first keep alive
lib/Net/AMQP/RabbitMQ/PP.pm view on Meta::CPAN
),
);
}
1;
__END__
=head1 NAME
Net::AMQP::RabbitMQ::PP - Pure perl AMQP client for RabbitMQ
=head1 SYNOPSIS
use Net::AMQP::RabbitMQ::PP;
my $connection = Net::AMQP::RabbitMQ::PP->new();
$connection->connect;
$connection->basic_publish(
payload => "Foo",
routing_key => "foo.bar",
);
$connection->disconnect;
=head1 DESCRIPTION
Like L<Net::RabbitMQ> but pure perl rather than a wrapper around librabbitmq.
=head1 VERSION
0.11
=head1 SUBROUTINES/METHODS
A list of methods with their default arguments (undef = no default)
=head2 new
Loads the AMQP protocol definition, primarily. Will not be an active
connection until ->connect is called.
my $mq = Net::AMQP::RabbitMQ::PP->new;
=head2 connect
Connect to the server. Default arguments are shown below:
$mq->connect(
host => "localhost",
port => 5672,
timeout => undef,
username => 'guest',
password => 'guest',
virtual_host => '/',
heartbeat => undef,
socket_timeout => 5,
frame_max => 131072,
);
connect can also take a secure flag for SSL connections. This will only work if
L<IO::Socket::SSL> is available. You can also pass SSL specific arguments to
the connect method and these will be passed through to L<IO::Socket::SSL>:
$mq->connect(
...
secure => 1,
SSL_blah_blah => 1,
);
=head2 disconnect
Disconnects from the server
$mq->disconnect;
=head2 set_keepalive
Set a keep alive poller. Note: requires L<Socket::Linux>
$mq->set_keepalive(
idle => $secs, # time between last meaningful packet and first keep alive
count => $n, # number of failures to allow,
interval => $secs, # time between keep alives
);
=head2 receive
Receive the next frame
my $rv = $mq->receive;
Content of $rv will look something like:
{
payload => $str,
content_header_frame => Net::AMQP::Frame::Header,
delivery_frame => Net::AMQP::Frame::Method,
}
=head2 channel_open
Open the given channel:
$mq->channel_open( channel => undef );
=head2 exchange_declare
Instantiate an exchange with a previously opened channel:
$mq->exchange_declare(
channel => undef,
exchange => undef,
exchange_type => undef,
passive => undef,
durable => undef,
auto_delete => undef,
internal => undef,
alternate_exchange => undef,
);
=head2 exchange_delete
( run in 0.537 second using v1.01-cache-2.11-cpan-5735350b133 )