Net-AMQP-RabbitMQ-PP

 view release on metacpan or  search on metacpan

lib/Net/AMQP/RabbitMQ/PP.pm  view on Meta::CPAN

package Net::AMQP::RabbitMQ::PP;

use strict;
use warnings;

our $VERSION = '0.11';

use Carp;
use Cwd;
use English qw(-no_match_vars);
use File::ShareDir;
use IO::Select;
use IO::Socket::INET;
use Socket qw( IPPROTO_TCP );
use List::MoreUtils;
use Net::AMQP;
use Sys::Hostname;
use Try::Tiny;
use Time::HiRes;

use constant HAS_TLS => eval { require IO::Socket::SSL; 1 };

sub new {
	my ( $class, %parameters ) = @_;

	if( ! %Net::AMQP::Protocol::spec ) {
		Net::AMQP::Protocol->load_xml_spec(
			File::ShareDir::dist_file(
				'Net-AMQP-RabbitMQ-PP',
				'amqp0-9-1.extended.xml'
			)
		);
	}

	my $self = bless {}, ref $class || $class;

	return $self;
}

sub connect {
	my ( $self, %args ) = @_;

	try {
		local $SIG{ALRM} = sub {
			Carp::croak 'Timed out';
		};

		if( $args{timeout} ) {
			Time::HiRes::alarm( $args{timeout} );
		}

		my $connection_class = "IO::Socket::INET";
		my %connection_args;

		if ( $args{secure} ) {
			die "IO::Socket::SSL is required for secure connections"
				if ! HAS_TLS;
			$connection_class = "IO::Socket::SSL";
			my @ssl_args = grep { /^SSL_/ } sort keys %args;
			@connection_args{ @ssl_args } = @args{ @ssl_args };
		}

		$self->_set_handle(
			$connection_class->new(
				PeerAddr => $args{host} || 'localhost',
				PeerPort => $args{port} || ( $args{secure} ? 5671 : 5672 ),
				( ! $args{secure} ? ( Proto => 'tcp' ) : () ),
				( $args{socket_timeout} ? ( Timeout => $args{socket_timeout} ) : () ),
				%connection_args,
			) or Carp::croak "Could not connect: $EVAL_ERROR"
		);

		$self->_select( IO::Select->new( $self->_get_handle ) );

		if( $args{timeout} ) {
			Time::HiRes::alarm( 0 );
		}
	}
	catch {
		Carp::croak $_;
	};

	$self->_get_handle->autoflush( 1 );

	my $password = $args{password} || 'guest';
	my $username = $args{username} || 'guest';
	my $virtualhost = $args{virtual_host} || '/';
	my $frame_max = $args{frame_max} || 0;
	my $heartbeat = $args{heartbeat} || 0;


	# Backlog of messages.
	$self->_backlog( [] );

	$self->_startup(
		username => $username,
		password => $password,
		virtual_host => $virtualhost,
		frame_max => $frame_max,
		heartbeat => $heartbeat,
	);

	return $self;
}

sub set_keepalive {
	my ( $self, %args ) = @_;
	my $handle = $self->_get_handle;
	my $idle = $args{idle};
	my $count = $args{count};
	my $interval = $args{interval};

	if( eval { require Socket::Linux } ) {
		# Turn on keep alive probes.
		defined $handle->sockopt( SO_KEEPALIVE, 1 )
			or Carp::croak "Could not turn on tcp keep alive: $OS_ERROR";

		# Time between last meaningful packet and first keep alive

lib/Net/AMQP/RabbitMQ/PP.pm  view on Meta::CPAN

		),
	);
}

1;

__END__

=head1 NAME

Net::AMQP::RabbitMQ::PP - Pure perl AMQP client for RabbitMQ

=head1 SYNOPSIS

    use Net::AMQP::RabbitMQ::PP;

    my $connection = Net::AMQP::RabbitMQ::PP->new();
    $connection->connect;
    $connection->basic_publish(
        payload => "Foo",
        routing_key => "foo.bar",
    );
    $connection->disconnect

=head1 DESCRIPTION

Like L<Net::RabbitMQ> but pure perl rather than a wrapper around librabbitmq.

=head1 VERSION

0.11

=head1 SUBROUTINES/METHODS

A list of methods with their default arguments (undef = no default)

=head2 new

Loads the AMQP protocol definition, primarily. Will not be an active
connection until ->connect is called.

	my $mq = Net::AMQP::RabbitMQ::PP->new;

=head2 connect

Connect to the server. Default arguments are shown below:

	$mq->connect(
		host           => "localhost",
		port           => 5672,
		timeout        => undef,
		username       => 'guest',
		password       => 'guest',
		virtual_host   => '/',
		heartbeat      => undef,
		socket_timeout => 5,
		frame_max      => 131072,
	);

connect can also take a secure flag for SSL connections, this will only work if
L<IO::Socket::SSL> is available. You can also pass SSL specific arguments through
in the connect method and these will be passed through

	$mq->connect(
		...
		secure => 1,
		SSL_blah_blah => 1,
	);

=head2 disconnect

Disconnects from the server

	$mq->disconnect;

=head2 set_keepalive

Set a keep alive poller. Note: requires L<Socket::Linux>

	$mq->set_keepalive(
		idle     => $secs, # time between last meaningful packet and first keep alive
		count    => $n,    # number of failures to allow,
		interval => $secs, # time between keep alives
	);

=head2 receive

Receive the nextframe

	my $rv = $mq->receive;

Content or $rv will look something like:

	{
		payload              => $str,
		content_header_frame => Net::AMQP::Frame::Header,
		delivery_frame       => Net::AMQP::Frame::Method,
	}

=head2 channel_open

Open the given channel:

	$mq->channel_open( channel => undef );

=head2 exchange_declare

Instantiate an exchange with a previously opened channel:

	$mq->exchange_declare(
		channel            => undef,
		exchange           => undef,
		exchange_type      => undef,
		passive            => undef,
		durable            => undef,
		auto_delete        => undef,
		internal           => undef,
		alternate_exchange => undef,
	);

=head2 exchange_delete



( run in 0.537 second using v1.01-cache-2.11-cpan-5735350b133 )