Graphite-Simple
view release on metacpan or search on metacpan
inline void disconnect_ (GraphiteXS_Object* graphite) {
if (graphite->is_connected) {
close(graphite->sock_fd);
graphite->is_connected = false;
graphite->sock_fd = 0;
}
}
MODULE = Graphite::Simple PACKAGE = Graphite::Simple
PROTOTYPES: DISABLE
BOOT:
{
sv_use_sock_stream_key = move(newSVpv("use_sock_stream", strlen("use_sock_stream")));
sv_store_invalid_metrics_key = move(newSVpv("store_invalid_metrics", strlen("store_invalid_metrics")));
sv_use_global_storage_key = move(newSVpv("use_common_storage", strlen("use_global_storage")));
sv_sender_key = move(newSVpv("sender_name", strlen("sender_name")));
sv_block_re_key = move(newSVpv("block_metrics_re", strlen("block_metrics_re")));
sv_sock_path_key = move(newSVpv("path", strlen("path")));
sv_host_key = move(newSVpv("host", strlen("host")));
sv_port_key = move(newSVpv("port", strlen("port")));
sv_enabled_key = move(newSVpv("enabled", strlen("enabled")));
sv_prefix_key = move(newSVpv("project", strlen("project")));
MAX_CHUNK_SIZE = 1460;
}
GraphiteXS_Object* new (char* class_name, HV* opts)
CODE:
GraphiteXS_Object *self = (GraphiteXS_Object *) safemalloc(sizeof(GraphiteXS_Object));
self->invalid_key_counter = 0;
self->bulk_hv = nullptr;
self->avg_hv = nullptr;
self->invalid_hv = nullptr;
self->block_re = NULL;
self->is_enabled = false;
self->is_connected = false;
self->is_sock_stream = false;
self->use_global_storage = false;
self->store_invalid_metrics = false;
self->global_prefix = nullptr;
self->sock_path = nullptr;
self->sender_name = nullptr;
self->hostname = nullptr;
self->port = 0;
self->sock_fd = 0;
memset(&self->sock_addr_unix, 0, sizeof(sockaddr_un));
memset(&self->sock_addr_inet, 0, sizeof(sockaddr_in));
apply_constructor_options_(self, opts);
RETVAL = self;
OUTPUT:
RETVAL
IV is_connected (GraphiteXS_Object *self)
PPCODE:
mXPUSHi(self->is_connected ? 1 : 0);
XSRETURN(1);
IV connect (GraphiteXS_Object *self)
PPCODE:
connect_(self);
mXPUSHi(self->is_connected ? 1 : 0);
XSRETURN(1);
IV reconnect (GraphiteXS_Object *self)
PPCODE:
disconnect_(self);
connect_(self);
mXPUSHi(self->is_connected ? 1 : 0);
XSRETURN(1);
void disconnect (GraphiteXS_Object *self)
PPCODE:
disconnect_(self);
XSRETURN_EMPTY;
HV* get_bulk_metrics (GraphiteXS_Object *self)
CODE:
//ST(0) = sv_mortalcopy((SV *) self->bulk_hv); // mXPUSHs((SV *) self->bulk_hv );
//ST(0) = sv_2mortal(newRV_noinc( (SV *) self->bulk_hv ));
//XSRETURN(1);
RETVAL = self->bulk_hv;
OUTPUT:
RETVAL
HV* get_invalid_metrics (GraphiteXS_Object *self)
CODE:
RETVAL = self->invalid_hv;
OUTPUT:
RETVAL
SV* send_bulk (GraphiteXS_Object* self)
PPCODE:
IV is_success = 1;
if (self->is_connected) {
SSize_t keys_cnt = hv_iterinit(self->bulk_hv);
if (keys_cnt) {
// writes results into self->bulk_hv
calculate_result_metrics_(self);
STRLEN len, key_len = 0;
const char *prefix = SvPVX(self->global_prefix);
HE *entry = NULL;
static string_view suffix;
static string data ("");
static bool is_first_time = true;
static int send_flags = MSG_NOSIGNAL; // we don't set MSG_DONTWAIT flag because O_NONBLOCK is already set via fcntl
if (is_first_time) {
data.reserve(MAX_CHUNK_SIZE + 100);
is_first_time = false;
}
suffix = move( string_view(move( " " + to_string(time(NULL)).append("\n") )) );
while (entry = hv_iternext(self->bulk_hv)) {
data.append(prefix);
data.append(move(HePV(entry, key_len)));
data.append(" ");
data.append(move(SvPV_nolen(HeVAL(entry))));
data.append(suffix);
if (( len = data.length() ) >= MAX_CHUNK_SIZE) {
//warn("data:\n%s", data.c_str());
if (send(self->sock_fd, data.c_str(), move(len), send_flags) == -1) {
if (self->sock_path) {
disconnect_(self);
data.clear();
croak("Error: can't send. %s\n", strerror(errno));
}
is_success = 0;
}
data.clear();
}
}
if ((len = data.length()) > 0) { // if we have only one key, then "len" iz zero here
//warn("data:\n%s", data.c_str());
if (send(self->sock_fd, data.c_str(), move(len), send_flags) == -1) {
if (self->sock_path) {
disconnect_(self);
data.clear();
croak("Error: can't send. %s\n", strerror(errno));
}
is_success = 0;
}
data.clear();
}
}
}
else
warn("Client is not connected to server");
clear_bulk_(self);
mXPUSHi(move(is_success));
XSRETURN(1);
SV* send_bulk_delegate (GraphiteXS_Object *self)
PPCODE:
const char *sender = SvPVX(self->sender_name);
if (!sender)
croak("No sender was specified");
calculate_result_metrics_(self);
// see "man perlcall" for details
IV status = 0;
dSP; // Declares a local copy of perl's stack pointer for the XSUB, available via the "SP" macro. See "SP".
I32 ax;
I32 count;
ENTER; // Opening bracket on a callback. See "LEAVE" and perlcall.
SAVETMPS; // pening bracket for temporaries on a callback. See "FREETMPS" and perlcall.
PUSHMARK(SP); // Opening bracket for arguments on a callback. See "PUTBACK" and perlcall.
EXTEND(SP, 1); // We are going to pass only one argument
PUSHs(sv_2mortal(newRV( (SV *) self->bulk_hv ))); // Push into outgoing stack an argument
PUTBACK; // Closing bracket for XSUB arguments. This is usually handled by "xsubpp". See "PUSHMARK" and perlcall for other uses.
count = call_pv(sender, G_SCALAR);
SPAGAIN; // Refetch the stack pointer. Used after a callback. See perlcall.
SP -= count;
ax = (SP - PL_stack_base) + 1;
if (count != 1)
croak("Unexpected size of returned stack from sender\n");
uint32_t type = SvTYPE(ST(0));
if (SVt_IV == type || SVt_NV == type)
status = SvIV(ST(0)); // round the value
else
warn("The sender must return a number type of status. Using 0 as sender status now.");
PUTBACK; // Closing bracket for XSUB arguments. This is usually handled by "xsubpp". See "PUSHMARK" and perlcall for other uses.
FREETMPS; // Closing bracket for temporaries on a callback. See "SAVETMPS" and perlcall.
LEAVE; // Closing bracket on a callback. See "ENTER" and perlcall.
// warn("status: %d\n", status);
clear_bulk_(self);
mXPUSHi(status);
XSRETURN(1);
HV* get_metrics (GraphiteXS_Object *self)
CODE:
HV* res_hv = (HV *) sv_2mortal((SV*) newHV());
calculate_result_metrics_(self, res_hv);
RETVAL = move(res_hv);
OUTPUT:
RETVAL
HV* get_average_counters (GraphiteXS_Object *self)
CODE:
RETVAL = self->avg_hv;
OUTPUT:
RETVAL
void clear_bulk (GraphiteXS_Object *self)
PPCODE:
clear_bulk_(self);
XSRETURN_EMPTY;
void incr_bulk (GraphiteXS_Object *self, SV* key, NV value = 1)
PPCODE:
bool is_ok = false;
if (key != &PL_sv_undef) {
uint32_t key_type = SvTYPE(key);
if (key_type == SVt_PVLV || key_type == SVt_PVMG) {
// SVt_PVLV can be returned from substr
// SVt_PVMG can be returned from RegExp
STRLEN key_len;
char *ch = SvPVx(move(key), key_len);
key = newSVpv(move(ch), move(key_len));
}
if (SvTYPE(key) == SVt_PV) {
is_ok = true;
increment_metric_( self, move(key), move(value) );
}
}
if (!is_ok)
croak("key must be a string");
XSRETURN_EMPTY;
void append_bulk (GraphiteXS_Object *self, HV* hv, SV* prefix = &PL_sv_undef)
PPCODE:
if (SvTIED_mg((SV *) hv, PERL_MAGIC_tied))
croak("Tied hashes are not supported");
bool use_prefix = false;
if (prefix && prefix != &PL_sv_undef) {
uint32_t type = SvTYPE(prefix);
use_prefix = type == SVt_PV || type == SVt_PVLV;
if (!use_prefix && type != SVt_NULL)
croak("prefix must be a string or undefined");
}
HE* entry = NULL;
static string sprefix ('\0', 128); // preallocate 128 byte
if (use_prefix) {
sprefix.assign( SvPVX(prefix) );
if (sprefix.back() != '.')
sprefix.append(".");
}
hv_iterinit(hv);
while (entry = hv_iternext(hv)) {
SV* key = hv_iterkeysv(entry);
SV* value = hv_iterval(hv, move(entry));
uint32_t value_type = SvTYPE(value);
if (
SvROK(value) ||
(
// value_type < SVt_IV || value_type > SVt_PVNV
SVt_IV != value_type &&
SVt_NV != value_type &&
SVt_PV != value_type &&
SVt_PVNV != value_type &&
SVt_PVIV != value_type
)
) {
warn("Value type of key '%s' is not a number", SvPVX(key));
continue;
}
if (use_prefix)
sv_setpv(key, ( sprefix + SvPVX(key) ).c_str());
increment_metric_( self, move(key), move(SvNVx(value)) );
}
XSRETURN_EMPTY;
SV* is_valid_key (GraphiteXS_Object *self, SV* key)
PPCODE:
bool is_valid = is_valid_key_(self, move(key));
mXPUSHi( move(is_valid ? 1 : 0) );
XSRETURN(1);
IV get_invalid_key_counter (GraphiteXS_Object *self)
PPCODE:
mXPUSHi( self->invalid_key_counter );
XSRETURN(1);
void check_and_bump_invalid_metric (GraphiteXS_Object *self, SV* key)
PPCODE:
if ( SVt_NULL != SvTYPE(key) ) {
uint32_t counter = move(self->invalid_key_counter);
self->invalid_key_counter = 0;
increment_hash_value_by( self->bulk_hv, move(key), move(counter) );
}
XSRETURN_EMPTY;
IV is_metric_blocked (GraphiteXS_Object *self, SV* key)
PPCODE:
bool is_blocked = false;
if ( is_valid_key_(self, key) )
is_blocked = is_metric_blocked_(self, move(key));
mXPUSHi( move(is_blocked ? 1 : 0) );
XSRETURN(1);
void set_blocked_metrics_re (GraphiteXS_Object *self, SV* block_re = &PL_sv_undef)
PPCODE:
set_blocked_metrics_re_(self, block_re);
XSRETURN_EMPTY;
void DESTROY (...)
PPCODE:
GraphiteXS_Object *self = (GraphiteXS_Object *) SvUV(SvRV(ST(0)));
if (PL_dirty) // global destruction
return;
if (self->sender_name && SvREFCNT(self->sender_name)) // sv_clear
SvREFCNT_dec_NN(self->sender_name);
if (self->global_prefix && SvREFCNT(self->global_prefix)) // sv_clear
SvREFCNT_dec_NN(self->global_prefix);
if (self->hostname && SvREFCNT(self->hostname)) // sv_clear
SvREFCNT_dec_NN(self->hostname);
if (self->sock_path && SvREFCNT(self->sock_path)) // sv_clear
SvREFCNT_dec_NN(self->sock_path);
if (self->block_re && SvREFCNT(self->block_re))
SvREFCNT_dec_NN(self->block_re);
if (!self->use_global_storage) {
if (self->bulk_hv) // hv_undef
SvREFCNT_dec_NN(self->bulk_hv);
if (self->avg_hv) // hv_undef
SvREFCNT_dec_NN(self->avg_hv);
if (self->invalid_hv) // hv_undef
SvREFCNT_dec_NN(self->invalid_hv);
}
disconnect_(self);
//dump_all();
safefree(move(self));
XSRETURN_EMPTY;
( run in 1.843 second using v1.01-cache-2.11-cpan-71847e10f99 )