Mac-FSEvents

 view release on metacpan or  search on metacpan

FSEvents.xs  view on Meta::CPAN

        self->flags
    );

    FSEventStreamScheduleWithRunLoop(
        stream,
        mainLoop,
        kCFRunLoopDefaultMode
    );

    FSEventStreamStart(stream);

    pthread_mutex_lock(&self->mutex);

    self->stream = stream;

    pthread_cond_signal(&wd->cond);
    pthread_mutex_unlock(&self->mutex);

    CFRunLoopRun();
    return NULL;
}

/* Tell whether the calling process is the one recorded in
 * self->original_pid.  Returns 1 when the pids match, 0 otherwise. */
int _check_process(FSEvents *self)
{
    if ( getpid() != self->original_pid ) {
        return 0;
    }

    return 1;
}

/*
 * Ask the watcher thread to shut down and wait until it has exited.
 *
 * Does nothing when:
 *   - self is NULL,
 *   - the calling process is not the one recorded in original_pid
 *     (the owning process is expected to clean up), or
 *   - self->stream is not set (already stopped).
 *
 * Otherwise one byte is written to the write end of the request pipe
 * and the thread identified by self->tid is joined.
 */
static void
stop_impl(FSEvents *self)
{
    ssize_t written;

    if ( !self ) {
        return;
    }

    /* If we don't own the data, let the parent
     * clean it up */
    if ( !_check_process(self) ) {
        return;
    }

    if ( !self->stream ) {
        /* We've already stopped */
        return;
    }

    /* Signal the thread with a dummy byte.  The byte's value is whatever
     * happens to be the first byte of the reqpipe array (unchanged from
     * the previous behaviour).  A write interrupted by a signal would
     * leave the thread unsignalled and make the join below block
     * forever, so retry while the failure is EINTR. */
    do {
        written = write(self->reqpipe[1], (const void *)&self->reqpipe, 1);
    } while ( written < 0 && errno == EINTR );

    /* wait for it to stop */
    pthread_join( self->tid, NULL );
}

#include <string.h>

#include "const-c.inc"

MODULE = Mac::FSEvents      PACKAGE = Mac::FSEvents

PROTOTYPES: DISABLE

void
_new (char *klass, HV *args)
PPCODE:
{
    /*
     * Constructor backend: allocates an FSEvents struct inside the string
     * buffer of a fresh SV, initialises it with _init(), applies the
     * optional "latency", "since", "path" and "flags" entries of args,
     * and pushes a reference to that SV blessed into klass.
     *
     * "path", when present, must be a reference to an array of path
     * strings; anything else croaks.  The path list is validated and
     * built BEFORE the object is allocated so that a croak cannot leak
     * the SV or whatever _init() sets up.
     */
    SV **svp;
    SV *pv;
    FSEvents *self;
    CFMutableArrayRef watch_paths = NULL;

    if ((svp = hv_fetch(args, "path", 4, FALSE))) {
        AV *ppaths;
        SSize_t numPaths;
        SSize_t i;

        if ( !SvROK(*svp) || SvTYPE(SvRV(*svp)) != SVt_PVAV ) {
            croak("Mac::FSEvents: 'path' must be an array reference");
        }

        ppaths   = (AV *)SvRV(*svp);
        numPaths = av_len( ppaths ) + 1;

        /* kCFTypeArrayCallBacks makes the array retain each string, so
         * the single CFRelease() of the array done at destruction time
         * also frees the strings. */
        watch_paths = CFArrayCreateMutable(
            NULL,
            (CFIndex)numPaths,
            &kCFTypeArrayCallBacks
        );
        if ( !watch_paths ) {
            croak("Mac::FSEvents: unable to allocate path list");
        }

        for ( i = 0; i < numPaths; i++ ) {
            SV **elem = av_fetch( ppaths, i, 0 );
            CFStringRef path = NULL;

            if ( elem ) {
                path = CFStringCreateWithCString(
                    NULL,
                    SvPV_nolen( *elem ),
                    kCFStringEncodingUTF8
                );
            }

            /* Missing slot, or bytes that are not valid UTF-8 */
            if ( !path ) {
                CFRelease( watch_paths );
                croak("Mac::FSEvents: path element %ld is missing or not valid UTF-8", (long)i);
            }

            CFArrayAppendValue( watch_paths, path );
            CFRelease( path );   /* the array holds its own reference now */
        }
    }

    pv   = NEWSV(0, sizeof(FSEvents));
    self = (FSEvents *)SvPVX(pv);

    SvPOK_only(pv);

    _init(self);

    if ((svp = hv_fetch(args, "latency", 7, FALSE))) {
        self->latency = (CFAbsoluteTime)SvNV(*svp);
    }

    if ((svp = hv_fetch(args, "since", 5, FALSE))) {
        self->since = (FSEventStreamEventId)SvIV(*svp);
    }

    if ( watch_paths ) {
        self->pathsToWatch = watch_paths;
    }

    if ((svp = hv_fetch(args, "flags", 5, FALSE))) {
        self->flags = (FSEventStreamCreateFlags)SvIV(*svp);
    }

    XPUSHs( sv_2mortal( sv_bless(
        newRV_noinc(pv),
        gv_stashpv(klass, 1)
    ) ) );
}

void
_DESTROY(FSEvents *self)
CODE:
{
    /*
     * Destructor backend: stops the watcher (stop_impl() itself declines
     * to act when this process is not the owner), then releases the path
     * array, frees the queue and destroys the mutex.
     *
     * The body is wrapped in a single `if` instead of returning early:
     * a bare `return` inside CODE: would skip the epilogue xsubpp emits
     * after this block.
     */
    if ( self ) {
        stop_impl(self);

        /* we don't check if we own anything, because we have to clean up
         * memory anyway */

        /* CFRelease() must not be handed NULL, so this guard stays */
        if ( self->pathsToWatch ) {
            CFRelease( self->pathsToWatch );
            self->pathsToWatch = NULL;
        }

        /* free(NULL) is a no-op, no guard needed */
        free( self->queue );
        self->queue = NULL;

        pthread_mutex_destroy(&self->mutex);
    }
}

void
watch(FSEvents *self)
PPCODE:
{
    /*
     * Start watching.
     *
     * Creates the result and request pipes, records the current pid as
     * the owner, starts _watch_thread and blocks until that thread has
     * published self->stream.  On success, returns a reference to a
     * newly generated glob opened on a dup() of the read end of the
     * result pipe.
     *
     * If a result pipe is already open, prints a message to stderr and
     * returns undef.  If a pipe, the dup() or the thread cannot be
     * created, closes whatever was opened and croaks with the system
     * error text.
     */
    int err = 0;
    FILE *fh;
    struct watch_data wd;
    GV *glob;
    PerlIO *fp;
    const char *error_message = NULL;
    int respipe_read_copy = -1;

    /* we don't check process ownership here, because we'll be populating
     * new data structures anyway */

    if (self->respipe[0] > 0) {
        fprintf( stderr, "Error: already watching, please call stop() first\n" );
        XSRETURN_UNDEF;
    }

    if ( pipe( self->respipe ) ) {
        error_message = "unable to initialize result pipe: %s";
        err = errno;
        goto handle_errors;
    }
    respipe_read_copy = dup(self->respipe[0]);
    if(respipe_read_copy < 0) {
        error_message = "Unable to dup file descriptor: %s";
        err = errno;
        goto handle_errors;
    }

    if ( pipe( self->reqpipe ) ) {
        error_message = "unable to initialize request pipe: %s";
        err = errno;
        goto handle_errors;
    }

    self->original_pid = getpid();

    wd.fs_events = self;

    pthread_cond_init(&wd.cond, NULL);

    /* pthread_create() returns the error number directly rather than
     * setting errno */
    err = pthread_create( &self->tid, NULL, _watch_thread, (void *)&wd );
    if (err != 0) {
        pthread_cond_destroy(&wd.cond);
        error_message = "can't create thread: %s";
        goto handle_errors;
    }

    /* wd lives on this stack frame, so wait here until the thread has
     * set self->stream and signalled the condition before moving on */
    pthread_mutex_lock(&self->mutex);
    while(! self->stream) {
        pthread_cond_wait(&wd.cond, &self->mutex);
    }
    pthread_mutex_unlock(&self->mutex);

    pthread_cond_destroy(&wd.cond);

    /* NOTE(review): fdopen() can return NULL; that case is not handled
     * here because the thread is already running at this point */
    fh = fdopen( respipe_read_copy, "r" );

    glob = (GV *) newGVgen("Mac::FSEvents");
    fp   = PerlIO_importFILE(fh, 0);
    do_open(glob, "+<&", 3, FALSE, 0, 0, fp);

    XPUSHs( sv_2mortal( newRV_noinc((SV *) glob) ) );
    /* Leave through the XS macro so the interpreter's stack pointer is
     * updated for the one pushed value; a bare `return` would skip that */
    XSRETURN(1);
handle_errors:
    if(self->respipe[0] >= 0) {
        close(self->respipe[0]);
        close(self->respipe[1]);
        self->respipe[0] = -1;
        self->respipe[1] = -1;
    }
    if(self->reqpipe[0] >= 0) {
        close(self->reqpipe[0]);
        close(self->reqpipe[1]);
        self->reqpipe[0] = -1;
        self->reqpipe[1] = -1;
    }
    if(respipe_read_copy >= 0) {
        close(respipe_read_copy);
    }
    /* Every message carries a %s, so pass the error text, not the int */
    croak(error_message, strerror(err));
}

void
stop(FSEvents *self)
CODE:
{
    /* Perl-visible stop(): delegates entirely to stop_impl(), which
     * returns early when self is NULL, when the calling process is not
     * the one recorded in original_pid, or when no stream is set;
     * otherwise it writes one byte to the request pipe and joins the
     * thread identified by self->tid. */
    stop_impl(self);
}

void
read_events(FSEvents *self)
PPCODE:
{
    HV *event;
    char buf;
    struct event *e;

    if ( !_check_process(self) ) {
        /* If we don't own the data, we die with an error message */
        croak( "Called Mac::FSEvents::read_events from process other than the originator" );
    }

    if ( self->respipe[0] > 0 ) {
        ssize_t bytes;
        int read_attempts = 0;

        pthread_mutex_lock(&self->mutex);

        /* If the queue is not empty, that means there's at least one byte in
         * our pipe.  We need to clear it so that select() returns an accurate
         * result. */
        if(self->queue->head) {
            bytes = read(self->respipe[0], &buf, 1);
            if(bytes <= 0) {
                return;
            }
        }
        /* Otherwise, we need to wait for the helper thread to populate the
         * queue.  Once it does this, read() will return, so we'll grab the
         * mutex again and check its success.
         */
        while(! self->queue->head) {
            pthread_mutex_unlock(&self->mutex);
            bytes = read(self->respipe[0], &buf, 1);
            if(bytes <= 0) {
                return;
            }
            pthread_mutex_lock(&self->mutex);
        }

        // read queue into hash
        for (e = self->queue->head; e != NULL; e = e->next) {
            event = newHV();

            hv_store( event, "id",    2, newSVuv(e->id), 0 );
            hv_store( event, "path",  4, newSVpv(e->path, 0), 0 );

            // Translate flags into friendly hash keys
            if ( e->flags > 0 ) {
                hv_store( event, "flags", 5, newSVuv(e->flags), 0 );

                if ( e->flags & kFSEventStreamEventFlagMustScanSubDirs ) {
                    hv_store( event, "must_scan_subdirs", 17, newSVuv(1), 0 );

                    if ( e->flags & kFSEventStreamEventFlagUserDropped ) {
                        hv_store( event, "user_dropped", 12, newSVuv(1), 0 );
                    }
                    else if ( e->flags & kFSEventStreamEventFlagKernelDropped ) {
                        hv_store( event, "kernel_dropped", 14, newSVuv(1), 0 );
                    }
                }



( run in 0.514 second using v1.01-cache-2.11-cpan-5511b514fd6 )