Coro-Mysql
view release on metacpan or search on metacpan
#define IN_DESTRUCT PL_dirty
typedef U16 uint16;
/* cached function gv's */
static CV *readable, *writable;
static int use_ev;
#if MARIADB_VERSION_ID >= 100300
typedef unsigned char uchar; /* bug? */
#include <ma_pvio.h>
#define PVIO 1
#define VIOPTR MARIADB_PVIO *
#define VIOM(vio) (vio)->methods
#define vioblocking blocking
#define vioclose close
#define VIODATA(vio) (vio)->data
/* ma_pvio_get_socket would be it, but it's only declared, not defined */
#define VIOSD(vio) mysql_get_socket ((vio)->mysql)
#define VIO_READ_BUFFER_SIZE PVIO_READ_AHEAD_CACHE_SIZE
#define my_to_vio(sock) (sock)->net.pvio
#define OURDATAPTR ((ourdata *)vio->methods)
typedef uchar *xgptr;
typedef const uchar *cxgptr;
typedef size_t xsize_t;
typedef ssize_t xssize_t;
typedef my_bool xmy_bool;
#else
#include "violite.h"
#define PVIO 0
#define VIOPTR Vio *
#define VIOM(vio) vio
#define VIODATA(vio) (vio)->desc
#define VIOSD(vio) (vio)->sd
#define my_to_vio(sock) (sock)->net.vio
typedef int xmy_bool;
#endif
#define CoMy_MAGIC 0x436f4d79
typedef struct {
#if PVIO
/* must be first member */
struct st_ma_pvio_methods methods;
#else
#if DESC_IS_PTR
char desc[30];
const char *old_desc;
#endif
#endif
int magic;
SV *corohandle_sv, *corohandle;
int bufofs, bufcnt;
#if HAVE_EV
ev_io rw, ww;
#endif
char buf[VIO_READ_BUFFER_SIZE];
#if PVIO
struct st_ma_pvio_methods *oldmethods;
#else
xssize_t (*old_read)(VIOPTR, uchar *, size_t);
xssize_t (*old_write)(VIOPTR, const uchar *, size_t);
xmy_bool (*old_close)(VIOPTR);
#endif
} ourdata;
#ifndef OURDATAPTR
#if DESC_IS_PTR
# define OURDATAPTR (*(ourdata **)&((vio)->desc))
#else
# define DESC_OFFSET 22
# define OURDATAPTR (*((ourdata **)((vio)->desc + DESC_OFFSET)))
#endif
#endif
static xssize_t
our_read (VIOPTR vio, xgptr p, xsize_t len)
{
ourdata *our = OURDATAPTR;
if (!our->bufcnt)
{
int rd;
my_bool dummy;
VIOM (vio)->vioblocking (vio, 0, &dummy);
for (;;)
{
rd = recv (VIOSD (vio), our->buf, sizeof (our->buf), 0);
if (rd >= 0 || errno != EAGAIN)
break;
#if HAVE_EV
if (use_ev)
{
our->rw.data = (void *)sv_2mortal (SvREFCNT_inc (CORO_CURRENT));
ev_io_start (EV_DEFAULT_UC, &(our->rw));
CORO_SCHEDULE;
ev_io_stop (EV_DEFAULT_UC, &(our->rw)); /* avoids races */
}
else
#endif
{
dSP;
PUSHMARK (SP);
XPUSHs (our->corohandle);
PUTBACK;
call_sv ((SV *)readable, G_VOID | G_DISCARD);
}
}
if (rd <= 0)
return rd;
our->bufcnt = rd;
our->bufofs = 0;
}
if (our->bufcnt < len)
len = our->bufcnt;
memcpy (p, our->buf + our->bufofs, len);
our->bufofs += len;
our->bufcnt -= len;
return len;
}
static xssize_t
our_write (VIOPTR vio, cxgptr p, xsize_t len)
{
char *ptr = (char *)p;
my_bool dummy;
VIOM (vio)->vioblocking (vio, 0, &dummy);
while (len > 0)
{
int wr = send (VIOSD (vio), ptr, len, 0);
if (wr > 0)
{
ptr += wr;
len -= wr;
}
else if (errno == EAGAIN)
{
ourdata *our = OURDATAPTR;
#if HAVE_EV
if (use_ev)
{
our->ww.data = (void *)sv_2mortal (SvREFCNT_inc (CORO_CURRENT));
ev_io_start (EV_DEFAULT_UC, &(our->ww));
CORO_SCHEDULE;
ev_io_stop (EV_DEFAULT_UC, &(our->ww)); /* avoids races */
}
else
#endif
{
dSP;
PUSHMARK (SP);
XPUSHs (our->corohandle);
PUTBACK;
call_sv ((SV *)writable, G_VOID | G_DISCARD);
}
}
else if (ptr == (char *)p)
return -1;
else
break;
}
return ptr - (char *)p;
}
static xmy_bool
our_close (VIOPTR vio)
{
ourdata *our = OURDATAPTR;
if (VIOM (vio)->read != our_read)
croak ("vio.read has unexpected content during unpatch - wtf?");
if (VIOM (vio)->write != our_write)
croak ("vio.write has unexpected content during unpatch - wtf?");
if (VIOM (vio)->vioclose != our_close)
croak ("vio.vioclose has unexpected content during unpatch - wtf?");
#if HAVE_EV
if (use_ev)
{
ev_io_stop (EV_DEFAULT_UC, &(our->rw));
ev_io_stop (EV_DEFAULT_UC, &(our->ww));
}
#endif
SvREFCNT_dec (our->corohandle);
SvREFCNT_dec (our->corohandle_sv);
#if DESC_IS_PTR
vio->desc = our->old_desc;
#endif
#if PVIO
vio->methods = our->oldmethods;
#else
VIOM (vio)->vioclose = our->old_close;
VIOM (vio)->write = our->old_write;
VIOM (vio)->read = our->old_read;
#endif
Safefree (our);
VIOM (vio)->vioclose (vio);
}
#if HAVE_EV
static void
iocb (EV_P_ ev_io *w, int revents)
{
ev_io_stop (EV_A, w);
CORO_READY ((SV *)w->data);
}
#endif
MODULE = Coro::Mysql PACKAGE = Coro::Mysql
BOOT:
{
readable = get_cv ("Coro::Mysql::readable", 0);
writable = get_cv ("Coro::Mysql::writable", 0);
}
PROTOTYPES: ENABLE
void
_use_ev ()
PPCODE:
{
static int onceonly;
if (!onceonly)
{
onceonly = 1;
#if HAVE_EV
I_EV_API ("Coro::Mysql");
I_CORO_API ("Coro::Mysql");
use_ev = 1;
#endif
}
XPUSHs (use_ev ? &PL_sv_yes : &PL_sv_no);
}
void
_patch (IV sock, int fd, unsigned long client_version, SV *corohandle_sv, SV *corohandle)
CODE:
{
MYSQL *my = (MYSQL *)sock;
VIOPTR vio = my_to_vio (my);
ourdata *our;
/* matching versions are required but not sufficient */
if (client_version != mysql_get_client_version ())
croak ("DBD::mysql linked against different libmysqlclient library than Coro::Mysql (%lu vs. %lu).",
client_version, mysql_get_client_version ());
if (fd != my->net.fd)
croak ("DBD::mysql fd and libmysql disagree - library mismatch, unsupported transport or API changes?");
if (fd != VIOSD (vio))
croak ("DBD::mysql fd and vio-sd disagree - library mismatch, unsupported transport or API changes?");
#if MYSQL_VERSION_ID < 100010 && !defined(MARIADB_BASE_VERSION)
if (VIOM (vio)->vioclose != vio_close)
croak ("vio.vioclose has unexpected content - library mismatch, unsupported transport or API changes?");
if (VIOM (vio)->write != vio_write)
croak ("vio.write has unexpected content - library mismatch, unsupported transport or API changes?");
if (VIOM (vio)->read != vio_read
&& VIOM (vio)->read != vio_read_buff)
croak ("vio.read has unexpected content - library mismatch, unsupported transport or API changes?");
#endif
#if PVIO
if (vio->type != PVIO_TYPE_UNIXSOCKET && vio->type != PVIO_TYPE_SOCKET)
croak ("connection type mismatch: Coro::Mysql only supports 'unixsocket' and 'socket' types at this time");
#endif
Newz (0, our, 1, ourdata);
our->magic = CoMy_MAGIC;
our->corohandle_sv = newSVsv (corohandle_sv);
our->corohandle = newSVsv (corohandle);
#if HAVE_EV
if (use_ev)
{
ev_io_init (&(our->rw), iocb, VIOSD (vio), EV_READ);
ev_io_init (&(our->ww), iocb, VIOSD (vio), EV_WRITE);
}
#endif
#if PVIO
/* with pvio, we replace methods by our own struct,
* both becauase the original might be read-only,
* and because we have no private data member, so the
* methods pointer includes our data as well
*/
our->methods = *vio->methods;
our->oldmethods = vio->methods;
vio->methods = &our->methods;
#else
OURDATAPTR = our;
#if DESC_IS_PTR
our->old_desc = vio->desc;
strncpy (our->desc, vio->desc, sizeof (our->desc));
our->desc [sizeof (our->desc) - 1] = 0;
#else
vio->desc [DESC_OFFSET - 1] = 0;
#endif
our->old_close = VIOM (vio)->vioclose;
our->old_write = VIOM (vio)->write;
our->old_read = VIOM (vio)->read;
#endif
/* with pvio, this patches our own struct */
VIOM (vio)->vioclose = our_close;
VIOM (vio)->write = our_write;
VIOM (vio)->read = our_read;
}
int
_is_patched (IV sock)
CODE:
{
MYSQL *my = (MYSQL *)sock;
VIOPTR vio = my_to_vio (my);
RETVAL = VIOM (vio)->write == our_write;
}
OUTPUT: RETVAL
int
have_ev ()
CODE:
RETVAL = 0;
#if HAVE_EV
RETVAL = 1;
#endif
OUTPUT: RETVAL
( run in 0.505 second using v1.01-cache-2.11-cpan-cdf2f3d4e48 )