Graphite-Simple

 view release on metacpan or  search on metacpan

Simple.xs  view on Meta::CPAN

// Closes the client socket when the object is flagged as connected and
// resets the connection state (is_connected = false, sock_fd = 0).
// Does nothing when the object is not flagged as connected.
// The result of close() is not inspected.
inline void disconnect_ (GraphiteXS_Object* graphite) {
    if (!graphite->is_connected)
        return;

    close(graphite->sock_fd);
    graphite->sock_fd = 0;
    graphite->is_connected = false;
}


MODULE = Graphite::Simple		PACKAGE = Graphite::Simple

PROTOTYPES: DISABLE

BOOT:
{
    // Builds the option-name SVs once at module load time and sets the chunk
    // size that send_bulk() uses as its flush threshold.
    //
    // A length of 0 makes newSVpv() compute the length with strlen(), so each
    // key is spelled exactly once. Previously the length of the
    // "use_common_storage" key was taken from a different literal
    // ("use_global_storage") and was correct only because both happen to be
    // the same length.
    sv_use_sock_stream_key       = newSVpv("use_sock_stream", 0);
    sv_store_invalid_metrics_key = newSVpv("store_invalid_metrics", 0);
    sv_use_global_storage_key    = newSVpv("use_common_storage", 0);
    sv_sender_key                = newSVpv("sender_name", 0);
    sv_block_re_key              = newSVpv("block_metrics_re", 0);
    sv_sock_path_key             = newSVpv("path", 0);
    sv_host_key                  = newSVpv("host", 0);
    sv_port_key                  = newSVpv("port", 0);
    sv_enabled_key               = newSVpv("enabled", 0);
    sv_prefix_key                = newSVpv("project", 0);
    MAX_CHUNK_SIZE = 1460;
}

GraphiteXS_Object* new (char* class_name, HV* opts)
CODE:
    // Allocates a GraphiteXS_Object, puts every field into its default state
    // and then passes the object together with the caller's options hash to
    // apply_constructor_options_(). class_name is not used in this body.

    GraphiteXS_Object *obj = (GraphiteXS_Object *) safemalloc(sizeof(GraphiteXS_Object));

    // metric storages and counters
    obj->bulk_hv = nullptr;
    obj->avg_hv  = nullptr;
    obj->invalid_hv = nullptr;
    obj->invalid_key_counter = 0;

    // behaviour switches
    obj->is_enabled = false;
    obj->is_sock_stream = false;
    obj->use_global_storage = false;
    obj->store_invalid_metrics = false;

    // optional settings
    obj->block_re = NULL;
    obj->global_prefix = nullptr;
    obj->sender_name = nullptr;

    // connection state and endpoint description
    obj->is_connected = false;
    obj->sock_fd = 0;
    obj->sock_path = nullptr;
    obj->hostname = nullptr;
    obj->port = 0;
    memset(&obj->sock_addr_unix, 0, sizeof(sockaddr_un));
    memset(&obj->sock_addr_inet, 0, sizeof(sockaddr_in));

    apply_constructor_options_(obj, opts);

    RETVAL = obj;
OUTPUT:
    RETVAL


IV is_connected (GraphiteXS_Object *self)
PPCODE:
    // Pushes 1 when the object is flagged as connected, otherwise 0.
    const IV connected = self->is_connected ? 1 : 0;
    mXPUSHi(connected);
    XSRETURN(1);


IV connect (GraphiteXS_Object *self)
PPCODE:
    // Runs connect_() and pushes the resulting connection flag (1 or 0).
    connect_(self);
    const IV connected = self->is_connected ? 1 : 0;
    mXPUSHi(connected);
    XSRETURN(1);


IV reconnect (GraphiteXS_Object *self)
PPCODE:
    // Drops the current connection (if any), runs connect_() again and pushes
    // the resulting connection flag (1 or 0).
    disconnect_(self);
    connect_(self);
    const IV connected = self->is_connected ? 1 : 0;
    mXPUSHi(connected);
    XSRETURN(1);


void disconnect (GraphiteXS_Object *self)
PPCODE:
    // Delegates to disconnect_(), which closes the socket when the object is
    // flagged as connected. Returns an empty list.
    disconnect_(self);
    XSRETURN_EMPTY;


HV* get_bulk_metrics (GraphiteXS_Object *self)
CODE:
    // Exposes the object's own bulk storage hash (self->bulk_hv) as the
    // return value; no copy is made here.
    HV *bulk = self->bulk_hv;
    RETVAL = bulk;
OUTPUT:
    RETVAL


HV* get_invalid_metrics (GraphiteXS_Object *self)
CODE:
    // Exposes self->invalid_hv as the return value; no copy is made here.
    // NOTE(review): new() initialises invalid_hv to nullptr — confirm it is
    // always populated before this accessor can be called.
    HV *invalid = self->invalid_hv;
    RETVAL = invalid;
OUTPUT:
    RETVAL


SV* send_bulk (GraphiteXS_Object* self)
PPCODE:
    // Sends every entry of self->bulk_hv over the connected socket as lines of
    // the form "<prefix><key> <value> <unix time>\n". Lines are accumulated in
    // a buffer that is flushed with send() once it reaches MAX_CHUNK_SIZE
    // bytes, and once more at the end for the remainder.
    //
    // Pushes 1 when no send() failed and 0 otherwise. When self->sock_path is
    // set, a failed send() disconnects and croaks instead of returning 0.
    // When the client is not connected only a warning is emitted.
    // clear_bulk_() is called on every non-croaking path.
    //
    // NOTE(review): the not-connected path still pushes 1 — confirm that is
    // the intended status.
    // NOTE(review): only a send() result of -1 is handled; short writes are
    // not retried.

    IV is_success = 1;

    if (self->is_connected) {

        SSize_t keys_cnt = hv_iterinit(self->bulk_hv);

        if (keys_cnt) {

            // writes results into self->bulk_hv
            calculate_result_metrics_(self);

            STRLEN len, key_len = 0;
            // global_prefix starts out as nullptr in new(); treat an unset
            // prefix as an empty one instead of dereferencing a null SV.
            const char *prefix = self->global_prefix ? SvPVX(self->global_prefix) : "";
            HE *entry = NULL;

            // These are static so the buffers are reused between calls and so
            // that no local object with a destructor is alive when croak()
            // jumps out of this function.
            // "suffix" owns its characters: a string_view bound to the
            // temporary built below would dangle as soon as that statement ends.
            static string suffix;
            static string data ("");
            static bool is_first_time = true;
            static const int send_flags = MSG_NOSIGNAL; // we don't set MSG_DONTWAIT flag because O_NONBLOCK is already set via fcntl

            if (is_first_time) {
                data.reserve(MAX_CHUNK_SIZE + 100);
                is_first_time = false;
            }

            // " <unix time>\n", shared by every line of this call
            suffix.assign(" ").append(to_string(time(NULL))).append("\n");

            while ((entry = hv_iternext(self->bulk_hv))) {

                data.append(prefix);
                data.append(HePV(entry, key_len));
                data.append(" ");
                data.append(SvPV_nolen(HeVAL(entry)));
                data.append(suffix);

                if (( len = data.length() ) >= MAX_CHUNK_SIZE) {
                    //warn("data:\n%s", data.c_str());
                    if (send(self->sock_fd, data.c_str(), len, send_flags) == -1) {
                        // Remember errno now: disconnect_() calls close(),
                        // which may overwrite it before the message is built.
                        const int send_errno = errno;
                        if (self->sock_path) {
                            disconnect_(self);
                            data.clear();
                            croak("Error: can't send. %s\n", strerror(send_errno));
                        }
                        is_success = 0;
                    }
                    data.clear();
                }
            }

            // Flush whatever is left below the chunk threshold; the buffer is
            // empty here when the last line triggered a flush inside the loop.
            if ((len = data.length()) > 0) {
                //warn("data:\n%s", data.c_str());
                if (send(self->sock_fd, data.c_str(), len, send_flags) == -1) {
                    const int send_errno = errno; // see above
                    if (self->sock_path) {
                        disconnect_(self);
                        data.clear();
                        croak("Error: can't send. %s\n", strerror(send_errno));
                    }
                    is_success = 0;
                }
                data.clear();
            }
        }
    }
    else
        warn("Client is not connected to server");

    clear_bulk_(self);
    mXPUSHi(is_success);
    XSRETURN(1);


SV* send_bulk_delegate (GraphiteXS_Object *self)
PPCODE:
    // Hands the accumulated metrics to a Perl-level sender: the sub named by
    // self->sender_name is called in scalar context with one argument, a
    // reference to self->bulk_hv. Its return value becomes the status pushed
    // by this method. The bulk storage is cleared afterwards via clear_bulk_().
    //
    // Croaks when no sender is configured. When the sender's return value is
    // not an SV of type SVt_IV or SVt_NV, a warning is emitted and 0 is used.

    // Check the SV pointer itself before looking inside it: sender_name is
    // nullptr until it has been configured.
    const char *sender = self->sender_name ? SvPVX(self->sender_name) : NULL;

    if (!sender)
        croak("No sender was specified");

    calculate_result_metrics_(self);

    // see "man perlcall" for details

    IV status = 0;

    {
        // The callback uses its own copy of the stack pointer, confined to
        // this block, so the XSUB's own SP/ax stay usable for the return value.
        dSP; // Declares a local copy of perl's stack pointer, available via the "SP" macro.
        I32 count;

        ENTER;         // Opening bracket on a callback.  See "LEAVE" and perlcall.
        SAVETMPS;      // Opening bracket for temporaries on a callback.  See "FREETMPS" and perlcall.
        PUSHMARK(SP);  // Opening bracket for arguments on a callback.  See "PUTBACK" and perlcall.
        EXTEND(SP, 1); // We are going to pass only one argument
        PUSHs(sv_2mortal(newRV( (SV *) self->bulk_hv ))); // Push into outgoing stack an argument
        PUTBACK;       // Closing bracket for XSUB arguments.  See "PUSHMARK" and perlcall.

        count = call_pv(sender, G_SCALAR);

        SPAGAIN;       // Refetch the stack pointer.  Used after a callback.  See perlcall.

        if (count != 1)
            croak("Unexpected size of returned stack from sender\n");

        SV *returned = POPs; // the single value returned by the sender
        uint32_t type = SvTYPE(returned);

        if (SVt_IV == type || SVt_NV == type)
            status = SvIV(returned); // round the value
        else
            warn("The sender must return a number type of status. Using 0 as sender status now.");

        PUTBACK;  // Publish the local stack pointer again.  See perlcall.
        FREETMPS; // Closing bracket for temporaries on a callback.  See "SAVETMPS" and perlcall.
        LEAVE;    // Closing bracket on a callback.  See "ENTER" and perlcall.
    }

    // warn("status: %d\n", status);
    clear_bulk_(self);

    // Re-derive SP from the XSUB's own ax: the callback may have reallocated
    // the stack, and the status must land in ST(0) so that exactly one value
    // is returned in list context as well as in scalar context.
    XSprePUSH;
    mXPUSHi(status);
    XSRETURN(1);


HV* get_metrics (GraphiteXS_Object *self)
CODE:
    // Creates a fresh mortal hash, lets calculate_result_metrics_() fill it
    // for this object, and returns that hash.
    HV* metrics = newHV();
    sv_2mortal((SV *) metrics);
    calculate_result_metrics_(self, metrics);
    RETVAL = metrics;
OUTPUT:
    RETVAL


HV* get_average_counters (GraphiteXS_Object *self)
CODE:
    // Exposes self->avg_hv as the return value; no copy is made here.
    HV *averages = self->avg_hv;
    RETVAL = averages;
OUTPUT:
    RETVAL


void clear_bulk (GraphiteXS_Object *self)
PPCODE:
    // Delegates to clear_bulk_() (defined elsewhere in this file) and returns
    // an empty list.
    clear_bulk_(self);
    XSRETURN_EMPTY;


void incr_bulk (GraphiteXS_Object *self, SV* key, NV value = 1)
PPCODE:
    // Adds "value" (default 1) to the metric named by "key" through
    // increment_metric_(). Croaks with "key must be a string" unless the key
    // is an SV of type SVt_PV, or of type SVt_PVLV / SVt_PVMG, in which case
    // its string value is copied into a plain string SV first.
    bool is_ok = false;
    if (key != &PL_sv_undef) {
        uint32_t key_type = SvTYPE(key);
        if (key_type == SVt_PVLV || key_type == SVt_PVMG) {
            // SVt_PVLV can be returned from substr
            // SVt_PVMG can be returned from RegExp
            STRLEN key_len;
            char *ch = SvPVx(key, key_len);
            // The copy is made mortal so it is released automatically,
            // including when the call below croaks; without this every call
            // with such a key leaked one SV.
            key = sv_2mortal(newSVpv(ch, key_len));
        }
        if (SvTYPE(key) == SVt_PV) {
            is_ok = true;
            increment_metric_( self, move(key), move(value) );
        }
    }

    if (!is_ok)
        croak("key must be a string");

    XSRETURN_EMPTY;


void append_bulk (GraphiteXS_Object *self, HV* hv, SV* prefix = &PL_sv_undef)
PPCODE:
    // Feeds every key/value pair of "hv" into increment_metric_(). When
    // "prefix" is a string it is prepended to every key, with a "." inserted
    // between prefix and key unless the prefix already ends with one; an empty
    // prefix leaves the keys unchanged.
    //
    // Croaks for tied hashes and for a prefix that is neither a string nor
    // undefined. Entries whose value is a reference or is not of a scalar
    // number/string type are skipped with a warning.

    if (SvTIED_mg((SV *) hv, PERL_MAGIC_tied))
        croak("Tied hashes are not supported");

    bool use_prefix = false;

    if (prefix && prefix != &PL_sv_undef) {

        uint32_t type = SvTYPE(prefix);

        use_prefix = type == SVt_PV || type == SVt_PVLV;

        if (!use_prefix && type != SVt_NULL)
            croak("prefix must be a string or undefined");
    }

    HE* entry = NULL;

    // Reused between calls. The buffer is reserved explicitly: constructing
    // the string as ('\0', 128) asks for zero copies of character 128 and
    // preallocates nothing.
    static string sprefix;
    static bool is_first_time = true;

    if (is_first_time) {
        sprefix.reserve(128);
        is_first_time = false;
    }

    if (use_prefix) {
        // SvPV_nolen() also handles SVt_PVLV values (e.g. from substr), whose
        // string buffer must not be read directly.
        sprefix.assign( SvPV_nolen(prefix) );
        // back() on an empty string is undefined, so check for emptiness first
        if (!sprefix.empty() && sprefix.back() != '.')
            sprefix.append(".");
    }

    hv_iterinit(hv);

    while ((entry = hv_iternext(hv))) {

        SV* key = hv_iterkeysv(entry);
        SV* value = hv_iterval(hv, entry);
        uint32_t value_type = SvTYPE(value);

        if (
                SvROK(value) ||
                (
                    // value_type < SVt_IV || value_type > SVt_PVNV
                    SVt_IV != value_type &&
                    SVt_NV != value_type &&
                    SVt_PV != value_type &&
                    SVt_PVNV != value_type &&
                    SVt_PVIV != value_type
                )
           ) {
            warn("Value type of key '%s' is not a number", SvPVX(key));
            continue;
        }

        if (use_prefix)
            sv_setpv(key, ( sprefix + SvPVX(key) ).c_str());

        increment_metric_( self, move(key), move(SvNVx(value)) );
    }

    XSRETURN_EMPTY;


SV* is_valid_key (GraphiteXS_Object *self, SV* key)
PPCODE:
    // Pushes 1 when is_valid_key_() accepts the key for this object, else 0.
    const IV verdict = is_valid_key_(self, key) ? 1 : 0;
    mXPUSHi(verdict);
    XSRETURN(1);


IV get_invalid_key_counter (GraphiteXS_Object *self)
PPCODE:
    // Pushes the current value of self->invalid_key_counter.
    const IV invalid_keys = self->invalid_key_counter;
    mXPUSHi(invalid_keys);
    XSRETURN(1);


void check_and_bump_invalid_metric (GraphiteXS_Object *self, SV* key)
PPCODE:
    // Moves the pending invalid-key count into the bulk storage: the value of
    // self->invalid_key_counter is added to "key" in self->bulk_hv and the
    // counter is reset to 0. Nothing happens when "key" is of type SVt_NULL.
    if ( SVt_NULL == SvTYPE(key) )
        XSRETURN_EMPTY;

    uint32_t pending = self->invalid_key_counter;
    self->invalid_key_counter = 0;
    increment_hash_value_by( self->bulk_hv, move(key), move(pending) );
    XSRETURN_EMPTY;


IV is_metric_blocked (GraphiteXS_Object *self, SV* key)
PPCODE:
    // Pushes 1 when the key is valid and is_metric_blocked_() reports it as
    // blocked, otherwise 0. The block check is skipped for invalid keys.
    const bool blocked = is_valid_key_(self, key) && is_metric_blocked_(self, move(key));
    mXPUSHi( blocked ? 1 : 0 );
    XSRETURN(1);


void set_blocked_metrics_re (GraphiteXS_Object *self, SV* block_re = &PL_sv_undef)
PPCODE:
    // Passes block_re (undef when the argument is omitted) on to
    // set_blocked_metrics_re_() and returns an empty list.
    set_blocked_metrics_re_(self, block_re);
    XSRETURN_EMPTY;


void DESTROY (...)
PPCODE:
    // Releases everything the object holds: drops one reference on each SV
    // member that is set, drops the three storage hashes unless the object
    // uses the global storage, closes the socket and frees the struct itself.
    // During global destruction (PL_dirty) nothing is released.

    // Only a reference can carry the object pointer; anything else (for
    // example the method being called on a class name) is ignored instead of
    // being dereferenced.
    if (items < 1 || !SvROK(ST(0)))
        XSRETURN_EMPTY;

    GraphiteXS_Object *self = (GraphiteXS_Object *) SvUV(SvRV(ST(0)));

    if (!self || PL_dirty) // nothing to free, or global destruction
        XSRETURN_EMPTY;

    if (self->sender_name && SvREFCNT(self->sender_name)) // sv_clear
        SvREFCNT_dec_NN(self->sender_name);
    if (self->global_prefix && SvREFCNT(self->global_prefix)) // sv_clear
        SvREFCNT_dec_NN(self->global_prefix);
    if (self->hostname && SvREFCNT(self->hostname)) // sv_clear
        SvREFCNT_dec_NN(self->hostname);
    if (self->sock_path && SvREFCNT(self->sock_path)) // sv_clear
        SvREFCNT_dec_NN(self->sock_path);
    if (self->block_re && SvREFCNT(self->block_re))
        SvREFCNT_dec_NN(self->block_re);

    // With the global storage enabled the hashes are left untouched here.
    if (!self->use_global_storage) {
        if (self->bulk_hv) // hv_undef
            SvREFCNT_dec_NN(self->bulk_hv);
        if (self->avg_hv) // hv_undef
            SvREFCNT_dec_NN(self->avg_hv);
        if (self->invalid_hv) // hv_undef
            SvREFCNT_dec_NN(self->invalid_hv);
    }

    disconnect_(self);
    //dump_all();
    safefree(self);
    XSRETURN_EMPTY;



( run in 1.843 second using v1.01-cache-2.11-cpan-71847e10f99 )