Net--RabbitMQ
view release on metacpan or search on metacpan
RabbitMQ.xs view on Meta::CPAN
void
net_rabbitmq_exchange_declare(conn, channel, exchange, options = NULL, args = NULL)
Net::RabbitMQ conn
int channel
char *exchange
HV *options
HV *args
PREINIT:
amqp_rpc_reply_t *amqp_rpc_reply;
char *exchange_type = "direct";
int passive = 0;
int durable = 0;
int auto_delete = 1;
amqp_table_t arguments = AMQP_EMPTY_TABLE;
CODE:
if(options) {
str_from_hv(options, exchange_type);
int_from_hv(options, passive);
int_from_hv(options, durable);
int_from_hv(options, auto_delete);
}
amqp_exchange_declare(conn, channel, amqp_cstring_bytes(exchange), amqp_cstring_bytes(exchange_type),
passive, durable, auto_delete, arguments);
amqp_rpc_reply = amqp_get_rpc_reply();
die_on_amqp_error(aTHX_ *amqp_rpc_reply, "Declaring exchange");
void
net_rabbitmq_exchange_delete(conn, channel, exchange, options = NULL)
Net::RabbitMQ conn
int channel
char *exchange
HV *options
PREINIT:
amqp_rpc_reply_t *amqp_rpc_reply;
int if_unused = 1;
int nowait = 0;
CODE:
if(options) {
int_from_hv(options, if_unused);
int_from_hv(options, nowait);
}
amqp_exchange_delete(conn, channel, amqp_cstring_bytes(exchange), if_unused, nowait);
amqp_rpc_reply = amqp_get_rpc_reply();
die_on_amqp_error(aTHX_ *amqp_rpc_reply, "Deleting exchange");
void
net_rabbitmq_queue_declare(conn, channel, queuename, options = NULL, args = NULL)
Net::RabbitMQ conn
int channel
char *queuename
HV *options
HV *args
PREINIT:
amqp_rpc_reply_t *amqp_rpc_reply;
int passive = 0;
int durable = 0;
int exclusive = 0;
int auto_delete = 1;
amqp_table_t arguments = AMQP_EMPTY_TABLE;
amqp_bytes_t queuename_b = AMQP_EMPTY_BYTES;
PPCODE:
if(queuename && strcmp(queuename, "")) queuename_b = amqp_cstring_bytes(queuename);
if(options) {
int_from_hv(options, passive);
int_from_hv(options, durable);
int_from_hv(options, exclusive);
int_from_hv(options, auto_delete);
}
if(args)
hash_to_amqp_table(conn, args, &arguments);
amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, channel, queuename_b, passive,
durable, exclusive, auto_delete,
arguments);
amqp_rpc_reply = amqp_get_rpc_reply();
die_on_amqp_error(aTHX_ *amqp_rpc_reply, "Declaring queue");
XPUSHs(sv_2mortal(newSVpvn(r->queue.bytes, r->queue.len)));
if(GIMME_V == G_ARRAY) {
XPUSHs(sv_2mortal(newSVuv(r->message_count)));
XPUSHs(sv_2mortal(newSVuv(r->consumer_count)));
}
void
net_rabbitmq_queue_bind(conn, channel, queuename, exchange, bindingkey, args = NULL)
Net::RabbitMQ conn
int channel
char *queuename
char *exchange
char *bindingkey
HV *args
PREINIT:
amqp_rpc_reply_t *amqp_rpc_reply;
amqp_table_t arguments = AMQP_EMPTY_TABLE;
CODE:
if(queuename == NULL || exchange == NULL)
Perl_croak(aTHX_ "queuename and exchange must both be specified");
if(bindingkey == NULL && args == NULL)
Perl_croak(aTHX_ "bindingkey or args must be specified");
if(args)
hash_to_amqp_table(conn, args, &arguments);
amqp_queue_bind(conn, channel, amqp_cstring_bytes(queuename),
amqp_cstring_bytes(exchange),
amqp_cstring_bytes(bindingkey),
arguments);
amqp_rpc_reply = amqp_get_rpc_reply();
die_on_amqp_error(aTHX_ *amqp_rpc_reply, "Binding queue");
void
net_rabbitmq_queue_unbind(conn, channel, queuename, exchange, bindingkey, args = NULL)
Net::RabbitMQ conn
int channel
char *queuename
char *exchange
char *bindingkey
HV *args
PREINIT:
amqp_rpc_reply_t *amqp_rpc_reply;
amqp_table_t arguments = AMQP_EMPTY_TABLE;
CODE:
if(queuename == NULL || exchange == NULL)
Perl_croak(aTHX_ "queuename and exchange must both be specified");
if(bindingkey == NULL && args == NULL)
( run in 0.550 second using v1.01-cache-2.11-cpan-5511b514fd6 )