Net-AMQP-RabbitMQ

 view release on metacpan or  search on metacpan

RabbitMQ.xs  view on Meta::CPAN


    /* Pull in arguments if we have any */
    if(args)
    {
      hash_to_amqp_table(args, &arguments, 1);
    }

    reply = amqp_exchange_unbind(
      conn,
      channel,
      amqp_cstring_bytes(destination),
      amqp_cstring_bytes(source),
      amqp_cstring_bytes(routing_key),
      arguments
    );
    die_on_amqp_error(aTHX_ amqp_get_rpc_reply(conn), conn, "Unbinding Exchange");

void net_amqp_rabbitmq_queue_delete(conn, channel, queuename, options = NULL)
  Net::AMQP::RabbitMQ conn
  int channel
  char *queuename
  HV *options
  PREINIT:
    /* Both flags default to on; an options hash may override them. */
    int if_unused = 1;
    int if_empty = 1;
    amqp_queue_delete_ok_t *reply = (amqp_queue_delete_ok_t*)NULL;
  PPCODE:
    /*
     * Delete the queue `queuename` on `channel` and push the message_count
     * field of the delete-ok reply onto the Perl stack.
     *
     * This section must be PPCODE: a void XSUB using CODE: returns an empty
     * list, so the value pushed at the end would be thrown away.
     *
     * options (optional hash) may carry if_unused / if_empty; int_from_hv
     * presumably reads the key named after the variable -- confirm against
     * the macro definition.
     */
    assert_amqp_connected(conn);

    if(options) {
      int_from_hv(options, if_unused);
      int_from_hv(options, if_empty);
    }
    reply = amqp_queue_delete(
            conn,
            channel,
            amqp_cstring_bytes(queuename),
            if_unused,
            if_empty
        );
    if (reply == NULL) {
        /* Presumably croaks with the broker/library error -- TODO confirm. */
        die_on_amqp_error(aTHX_ amqp_get_rpc_reply(conn), conn, "Deleting queue");
        /* Fallback: never dereference a NULL reply if the call above returns. */
        Perl_croak(aTHX_ "Deleting queue: no reply received");
    }
    XPUSHs(sv_2mortal(newSVuv(reply->message_count)));

void
net_amqp_rabbitmq_queue_declare(conn, channel, queuename, options = NULL, args = NULL)
  Net::AMQP::RabbitMQ conn
  int channel
  char *queuename
  HV *options
  HV *args
  PREINIT:
    /* Declaration flags; only auto_delete is on unless options says otherwise. */
    int passive = 0;
    int durable = 0;
    int exclusive = 0;
    int auto_delete = 1;
    amqp_table_t arg_table = amqp_empty_table;
    amqp_bytes_t queue_bytes = amqp_empty_bytes;
    amqp_queue_declare_ok_t *declared = NULL;
  PPCODE:
    /*
     * Declare a queue on `channel`.
     *
     * Pushes the queue name from the declare-ok reply; in list context it
     * additionally pushes message_count and consumer_count.
     */
    assert_amqp_connected(conn);

    /* A NULL or empty name leaves queue_bytes as amqp_empty_bytes. */
    if(queuename != NULL && queuename[0] != '\0') {
      queue_bytes = amqp_cstring_bytes(queuename);
    }

    if(options != NULL) {
      int_from_hv(options, passive);
      int_from_hv(options, durable);
      int_from_hv(options, exclusive);
      int_from_hv(options, auto_delete);
    }

    if(args != NULL) {
      hash_to_amqp_table(args, &arg_table, 1);
    }

    declared = amqp_queue_declare(
      conn,
      channel,
      queue_bytes,
      passive,
      durable,
      exclusive,
      auto_delete,
      arg_table
    );
    /* Presumably croaks when the RPC reply reports a failure -- TODO confirm. */
    die_on_amqp_error(aTHX_ amqp_get_rpc_reply(conn), conn, "Declaring queue");

    XPUSHs(sv_2mortal(newSVpvn(declared->queue.bytes, declared->queue.len)));
    if(G_LIST == GIMME_V) {
      XPUSHs(sv_2mortal(newSVuv(declared->message_count)));
      XPUSHs(sv_2mortal(newSVuv(declared->consumer_count)));
    }

void
net_amqp_rabbitmq_queue_bind(conn, channel, queuename, exchange, bindingkey, args = NULL)
  Net::AMQP::RabbitMQ conn
  int channel
  char *queuename
  char *exchange
  char *bindingkey
  HV *args
  PREINIT:
    amqp_table_t bind_args = amqp_empty_table;
  CODE:
    /*
     * Bind `queuename` to `exchange` using `bindingkey`, with an optional
     * table of extra arguments. Returns nothing to Perl.
     */
    assert_amqp_connected(conn);

    /* Both names are mandatory and must be non-empty strings. */
    if(!(queuename != NULL
         && exchange != NULL
         && queuename[0] != '\0'
         && exchange[0] != '\0'))
    {
      Perl_croak(aTHX_ "queuename and exchange must both be specified");
    }

    /* NOTE(review): the third argument is 0 here but 1 in queue_declare --
       confirm the difference is intentional. */
    if(args != NULL) {
      hash_to_amqp_table(args, &bind_args, 0);
    }

    amqp_queue_bind(
      conn,
      channel,
      amqp_cstring_bytes(queuename),
      amqp_cstring_bytes(exchange),
      amqp_cstring_bytes(bindingkey),
      bind_args
    );

    /* NOTE(review): buffers are released before the RPC reply is inspected;
       order kept as-is -- confirm die_on_amqp_error does not need them. */
    maybe_release_buffers(conn);
    die_on_amqp_error(aTHX_ amqp_get_rpc_reply(conn), conn, "Binding queue");

void
net_amqp_rabbitmq_queue_unbind(conn, channel, queuename, exchange, bindingkey, args = NULL)
  Net::AMQP::RabbitMQ conn



( run in 1.138 second using v1.01-cache-2.11-cpan-5511b514fd6 )