Event-Lib-UDPPump

 view release on metacpan or  search on metacpan

Changes  view on Meta::CPAN

Revision history for Perl extension Event::Lib::UDPPump.

0.01  Thu Nov  2 16:40:31 2006
	- original version; created by h2xs 1.23 with options
		-A -O -n Event::Lib::UDPPump eventlib-test/blocked_read.h

MANIFEST  view on Meta::CPAN

blocked_read.c
blocked_read.h
Changes
check.c
lib/Event/Lib/UDPPump.pm
Makefile.PL
MANIFEST
ppport.h
README
t/Event-Lib-UDPPump.t
t/perlcritic.t
t/pod.t

Makefile.PL  view on Meta::CPAN

# Fall back to the default link line (libevent, libm, pthreads) unless the
# caller already provided a LIBS entry.
$args{LIBS} = '-levent -lm -lpthread' unless exists $args{LIBS};

# Generate the Makefile for the XS module plus its C helper object.
WriteMakefile(
    NAME          => 'Event::Lib::UDPPump',
    AUTHOR        => 'Clayton O\'Neill <coneill@oneill.net>',
    VERSION_FROM  => 'lib/Event/Lib/UDPPump.pm',
    ABSTRACT_FROM => 'lib/Event/Lib/UDPPump.pm',
    PREREQ_PM     => {
        'Event::Lib' => '1.00',
    },

    # Compiler / linker inputs.
    LIBS          => [ $args{LIBS} ],
    INC           => $args{INC},
    H             => [ 'blocked_read.h' ],
    XS            => { 'UDPPump.xs' => 'UDPPump.c' },
    OBJECT        => 'blocked_read.o UDPPump.o',

    # Extra files removed by "make clean".
    clean         => { FILES => "a.out config.c" },
);


package MY;

# need to override here so that DEFINE=... on commandline works
# along with the DEFINEs internally diddled out by Makefile.PL
sub constants {
    my $self = shift;
    $self->{DEFINE} .= " -DHAVE_CONFIG_H";

UDPPump.xs  view on Meta::CPAN

#include "EXTERN.h"
#include "perl.h"
#include "perlio.h"
#include "XSUB.h"

#include "ppport.h"

#include "blocked_read.h"

/*
 * Per-pump state shared between the XS glue and the blocked-read helper.
 * A pointer to this struct is what the XS methods (add, fh) receive.
 */
struct udppump {
  blocked_read_t *br;   /* handle returned by register_blocked_read(); set in add() */
  SV *io;               /* Perl filehandle SV; resolved to a PerlIO* via to_perlio() in add(), returned by fh() */
  CV *func;             /* NOTE(review): presumably the Perl callback to invoke per datagram - confirm against pump_cb */
  int buckets;          /* NOTE(review): presumably the number of slots in args - confirm */
  SV** args;            /* NOTE(review): presumably extra arguments passed to func - confirm */
};

/* Expands to IoIFP(sv_2io(sv)); the result is used as a PerlIO* in add(). */
#define to_perlio(sv)   IoIFP(sv_2io(sv))

/*
 * Global pointer to a pump, initially NULL and assigned via ENTER_callback().
 * NOTE(review): presumably tracks the pump whose callback is currently
 * executing - confirm against the callback code.
 */
struct udppump *IN_CALLBACK = NULL;
/* Stores ev in IN_CALLBACK. The expansion is not parenthesized, so use it
 * only as a standalone statement. */
#define ENTER_callback(ev)      IN_CALLBACK = ev

UDPPump.xs  view on Meta::CPAN

  RETVAL

void
add(struct udppump* pump)
CODE:
  /*
   * Start pumping datagrams for this object: resolve the Perl filehandle
   * to a file descriptor and hand it to register_blocked_read(), with
   * pump_cb as the callback and the pump itself as its argument.
   *
   * Croaks if the filehandle has no usable descriptor, or if
   * register_blocked_read() fails (it returns NULL on failure).
   */
  PerlIO* io = to_perlio(pump->io);
  int fd = io ? PerlIO_fileno(io) : -1;
  if (fd == -1) {
    croak("Event::Lib::UDPPump::add - bad file descriptor");
  }
  pump->br = register_blocked_read(fd, pump_cb, (void *)pump);
  if (pump->br == NULL) {
    /* Without this check a failed registration would go unnoticed and
     * the pump would silently never deliver anything. */
    croak("Event::Lib::UDPPump::add - unable to register blocked read");
  }

void 
fh(struct udppump* pump)
CODE:
  /* Return the filehandle SV stored in the pump. The SV itself (not a
   * copy) is placed in the first return slot and exactly one value is
   * returned to the caller. */
  ST(0) = pump->io;
  XSRETURN(1);

blocked_read.c  view on Meta::CPAN

#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/uio.h>
#include <pthread.h>
#include "blocked_read.h"

/*
 * Consumer side of the hand-off between the reader thread and the event
 * loop.  The (fd, flags, arg) signature matches an event callback;
 * NOTE(review): presumably registered on the pop end of the socket pair -
 * confirm in register_blocked_read().
 *
 * fd    - descriptor to drain the wake-up byte(s) from
 * flags - unused
 * arg   - the blocked_read_t this notification belongs to
 *
 * Delivers the pending message (br->data) to br->callback together with
 * br->cbarg, clears br->data, and signals br->processed so that
 * msgqueue_push() can return.
 */
static void msgqueue_pop(int fd, short flags, void *arg) {
   blocked_read_t *br = arg;
   char buf[64];
   ssize_t drained;

   (void)flags;

   /* Drain the notification byte(s).  A failed or short read is tolerated:
    * whether there is work to do is decided by br->data below, not by the
    * number of bytes read. */
   drained = read(fd, buf, sizeof(buf));
   (void)drained;

   pthread_mutex_lock(&br->lock);
   /* Only invoke the callback when a message is actually pending, so a
    * spurious wake-up never hands a NULL message to the callback. */
   if (br->data != NULL) {
     br->callback(br->data, br->cbarg);
     br->data = NULL;
   }
   pthread_mutex_unlock(&br->lock);
   pthread_cond_signal(&br->processed);
}

/*
 * Producer side of the hand-off: publish msg in br->data, wake the
 * consumer by writing one byte to br->push_fd, then block until the
 * consumer has cleared br->data (see msgqueue_pop()).
 *
 * br  - the blocked-read handle
 * msg - message to hand over; a NULL msg returns without waiting
 *
 * Returns 0 once the message has been processed, or -1 if the wake-up
 * byte could not be written (the message is withdrawn in that case).
 */
static int msgqueue_push(blocked_read_t *br, void *msg) {
   const char wake = 0;
   ssize_t written;
   int r = 0;

   /* pthread_cond_wait() requires the mutex to be locked by the calling
    * thread; holding it from before the write also guarantees the consumer
    * cannot signal before we are actually waiting. */
   pthread_mutex_lock(&br->lock);

   br->data = msg;

   do {
     written = write(br->push_fd, &wake, 1);
   } while (written == -1 && errno == EINTR);

   if (written != 1) {
     /* Nobody was notified, so waiting would block forever. */
     br->data = NULL;
     r = -1;
   } else {
     while (br->data) {
       pthread_cond_wait(&br->processed, &br->lock);
     }
   }

   pthread_mutex_unlock(&br->lock);

   return(r);
}

static void *read_thread(void *argument) {
  socklen_t fromlen;
  blocked_read_t *br = (blocked_read_t *)argument;
  
  while (1) {
    fromlen = sizeof(br->msg.from);
    br->msg.len = recvfrom(br->fd, br->msg.buffer, MAXMSGSIZE, 0,
                           (struct sockaddr *)&br->msg.from, 
                           &fromlen
                           );
    if (br->msg.len == -1) {
      br->msg.error = errno;
    } else {
      br->msg.error = 0;
    }

    msgqueue_push(br, (void *)&br->msg);
  }

  return NULL;
}

/* returns handle on success and NULL on failure */
blocked_read_t* register_blocked_read(int fd, 
                                      void (*callback)(void*, void*),
                                      void *cbarg
                                      ) 
{
  blocked_read_t *br;
  int rc;
  int fds[2];

  br = calloc(sizeof(blocked_read_t), 1);
  br->fd = fd;

  if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds) != 0) {
    free(br);
    return NULL;
  }

  br->push_fd = fds[0];
  br->pop_fd = fds[1];
  br->data = NULL;

blocked_read.h  view on Meta::CPAN

  
  struct event queue_ev;

  msg_t msg;
  
  pthread_mutex_t lock;
  pthread_cond_t processed;
  void (*callback)(void*, void*);
  void *data;
  void *cbarg;
} blocked_read_t;

/*
 * Register a blocking reader for the given descriptor.
 *
 * fd       - descriptor that datagrams are received from
 * callback - invoked once per receive attempt as callback(msg, cbarg),
 *            where msg points at the handle's message record
 * cbarg    - opaque pointer passed through to callback unchanged
 *
 * Returns a handle on success and NULL on failure.
 */
blocked_read_t* register_blocked_read(int fd, 
                                      void (*callback)(void*, void*),
                                      void *cbarg
                                      );


#endif /* _BLOCKED_READ_ */



( run in 0.739 second using v1.01-cache-2.11-cpan-49f99fa48dc )