Event-Lib-UDPPump
view release on metacpan or search on metacpan
Revision history for Perl extension Event::Lib::UDPPump.
0.01 Thu Nov 2 16:40:31 2006
- original version; created by h2xs 1.23 with options
-A -O -n Event::Lib::UDPPump eventlib-test/blocked_read.h
blocked_read.c
blocked_read.h
Changes
check.c
lib/Event/Lib/UDPPump.pm
Makefile.PL
MANIFEST
ppport.h
README
t/Event-Lib-UDPPump.t
t/perlcritic.t
t/pod.t
Makefile.PL view on Meta::CPAN
$args{LIBS} = '-levent -lm -lpthread' if ! exists $args{LIBS};
WriteMakefile(
NAME => 'Event::Lib::UDPPump',
VERSION_FROM => 'lib/Event/Lib/UDPPump.pm',
PREREQ_PM => { 'Event::Lib' => '1.00'},
ABSTRACT_FROM => 'lib/Event/Lib/UDPPump.pm',
AUTHOR => 'Clayton O\'Neill <coneill@oneill.net>',
LIBS => [ $args{LIBS} ],
INC => $args{INC} ,
H => [ qw/blocked_read.h/ ],
XS => { 'UDPPump.xs' => 'UDPPump.c' },
clean => { FILES => "a.out config.c" },
OBJECT => 'blocked_read.o UDPPump.o',
);
package MY;
# need to override here so that DEFINE=... on commandline works
# along with the DEFINEs internally diddled out by Makefile.PL
sub constants {
my $self = shift;
$self->{DEFINE} .= " -DHAVE_CONFIG_H";
#include "EXTERN.h"
#include "perl.h"
#include "perlio.h"
#include "XSUB.h"
#include "ppport.h"
#include "blocked_read.h"
struct udppump {
blocked_read_t *br;
SV *io;
CV *func;
int buckets;
SV** args;
};
#define to_perlio(sv) IoIFP(sv_2io(sv))
struct udppump *IN_CALLBACK = NULL;
#define ENTER_callback(ev) IN_CALLBACK = ev
RETVAL
void
add(struct udppump* pump)
CODE:
PerlIO* io = to_perlio(pump->io);
int fd = io ? PerlIO_fileno(io) : -1;
if (fd == -1) {
croak("Event::Lib::UDPPump::add - bad file descriptor");
}
pump->br = register_blocked_read(fd, pump_cb, (void *)pump);
void
fh(struct udppump* pump)
CODE:
ST(0) = pump->io;
XSRETURN(1);
blocked_read.c view on Meta::CPAN
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/uio.h>
#include <pthread.h>
#include "blocked_read.h"
static void msgqueue_pop(int fd, short flags, void *arg) {
blocked_read_t *br = arg;
char buf[64];
void *data;
read(fd, buf, sizeof(buf));
pthread_mutex_lock(&br->lock);
br->callback(br->data, br->cbarg);
br->data = NULL;
pthread_mutex_unlock(&br->lock);
pthread_cond_signal(&br->processed);
}
static int msgqueue_push(blocked_read_t *br, void *msg) {
const char buf[1] = { 0 };
int r = 0;
br->data = msg;
write(br->push_fd, buf, 1);
while (br->data) {
pthread_cond_wait(&br->processed, &br->lock);
}
return(r);
}
static void *read_thread(void *argument) {
socklen_t fromlen;
blocked_read_t *br = (blocked_read_t *)argument;
while (1) {
fromlen = sizeof(br->msg.from);
br->msg.len = recvfrom(br->fd, br->msg.buffer, MAXMSGSIZE, 0,
(struct sockaddr *)&br->msg.from,
&fromlen
);
if (br->msg.len == -1) {
br->msg.error = errno;
} else {
br->msg.error = 0;
}
msgqueue_push(br, (void *)&br->msg);
}
return NULL;
}
/* returns handle on success and NULL on failure */
blocked_read_t* register_blocked_read(int fd,
void (*callback)(void*, void*),
void *cbarg
)
{
blocked_read_t *br;
int rc;
int fds[2];
br = calloc(sizeof(blocked_read_t), 1);
br->fd = fd;
if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds) != 0) {
free(br);
return NULL;
}
br->push_fd = fds[0];
br->pop_fd = fds[1];
br->data = NULL;
blocked_read.h view on Meta::CPAN
struct event queue_ev;
msg_t msg;
pthread_mutex_t lock;
pthread_cond_t processed;
void (*callback)(void*, void*);
void *data;
void *cbarg;
} blocked_read_t;
blocked_read_t* register_blocked_read(int fd,
void (*callback)(void*, void*),
void *cbarg
);
#endif /* _BLOCKED_READ_ */
( run in 0.739 second using v1.01-cache-2.11-cpan-49f99fa48dc )