AnyEvent-YACurl

 view release on metacpan or  search on metacpan

YACurl.xs  view on Meta::CPAN


/* Module-wide state reached through the dMY_CXT / MY_CXT macros. */
typedef struct {
    SV *watchset_fn; /* Perl code called by mcurl_socket_callback() with (userp, socket, what) */
    SV *timerset_fn; /* Perl code called by mcurl_timer_callback() with (userp, timeout_ms) */
    HV *curlopt;     /* NOTE(review): presumably a lookup table for curl option names -- confirm against its users */
} my_cxt_t;

/* State for one client object wrapping a libcurl multi handle. */
typedef struct {
    CURLM *multi;      /* multi handle driven through curl_multi_socket_action() */
    SV *weak_self_ref; /* released in DESTROY; NOTE(review): presumably the weakened self
                        * reference that the callbacks receive as userp -- confirm */

    int needs_invoke_timeout; /* non-zero: do_post_work() must run a timeout action */
    int needs_read_info;      /* non-zero: do_post_work() must keep polling curl_multi_info_read() */
    int last_running;         /* running-transfer count most recently stored by update_running() */
} AnyEvent__YACurl;

/* State for a single transfer; released by AnyEvent::YACurl::Response::DESTROY. */
typedef struct {
    SV *self_rv;        /* handed to the completion callback on success; reference dropped in finish_request() */
    CURL *easy;         /* easy handle, cleaned up in DESTROY */
    curl_mime *mimepost; /* freed with curl_mime_free() in DESTROY when set */

    AV *held_references;     /* released as soon as the request finishes, or in DESTROY */
    FILE *redirected_stderr; /* closed in DESTROY when set */
    int slists_count;        /* number of entries in slists */
    struct curl_slist **slists; /* each entry is freed with curl_slist_free_all() in DESTROY */
    char errbuf[CURL_ERROR_SIZE]; /* when non-empty, used as the error text instead of curl_easy_strerror() */

    SV *callback; /* completion callback; invoked and then released by finish_request() */
} AnyEvent__YACurl__Response;

START_MY_CXT

struct curl_slist *slist_from_av(pTHX_ struct curl_slist *list, AV *input);

void maybe_warn_eval(pTHX)
{
    SV *error = ERRSV;
    if (SvTRUE(error)) {
        warn("Error in callback: %s", SvPV_nolen(error));
    }
}

/* Socket-notification callback (the signature matches libcurl's CURLMOPT_SOCKETFUNCTION;
 * the registration itself is not visible in this part of the file).
 *
 * Forwards (userp, s, what) to the Perl helper stored in MY_CXT.watchset_fn and always
 * returns 0. The `easy` and `socketp` arguments are not used.
 *
 * NOTE(review): unlike the write/read/debug/trailer callbacks, call_sv() is invoked without
 * G_EVAL here, so a die() inside the helper is not caught in this function -- confirm that
 * this is intended. */
int mcurl_socket_callback(CURL* easy,
                          curl_socket_t s,
                          int what,
                          void* userp,
                          void* socketp)
{
    dTHX;
    dMY_CXT;
    dSP;

    ENTER;
    SAVETMPS;

    PUSHMARK(SP);
    EXTEND(SP, 3);
    PUSHs((SV*)userp); /* XXX This is a weakened reference, will it ever be undef? */
    PUSHs(sv_2mortal(newSViv(s)));
    PUSHs(sv_2mortal(newSViv(what)));
    PUTBACK;

    /* Return values are discarded; nothing is read back from the Perl stack. */
    call_sv(MY_CXT.watchset_fn, G_DISCARD | G_VOID);

    FREETMPS;
    LEAVE;

    return 0;
}

/* Timer callback (the signature matches libcurl's CURLMOPT_TIMERFUNCTION; the registration
 * itself is not visible in this part of the file).
 *
 * A timeout of 0 is handled locally by flagging the client so that do_post_work() runs the
 * timeout action. Any other value is forwarded, together with userp, to the Perl helper
 * stored in MY_CXT.timerset_fn. Always returns 0. The `multi` argument is not used.
 *
 * NOTE(review): call_sv() is invoked without G_EVAL here, so a die() inside the helper is
 * not caught in this function -- confirm that this is intended. */
int mcurl_timer_callback(CURLM* multi,
                         long timeout_ms,
                         void *userp)
{
    dTHX;

    if (timeout_ms == 0) {
        /* We short-circuit timeout_ms==0, as we're very likely to call do_post_work shortly
         * after reaching this code path. A timer of 0sec in AnyEvent would almost always turn
         * into a 1ms wait, which is unnecessary and slow. Same goes for AE::postpone. */
        /* userp is dereferenced as a reference whose target holds the client pointer as an IV. */
        IV tmp = SvIV((SV*)SvRV((SV*)userp));
        AnyEvent__YACurl *client = INT2PTR(AnyEvent__YACurl*, tmp);

        client->needs_invoke_timeout = 1;
        return 0;
    }

    dMY_CXT;
    dSP;

    ENTER;
    SAVETMPS;

    PUSHMARK(SP);
    EXTEND(SP, 2);
    PUSHs((SV*)userp); /* XXX This is a weakened reference, will it ever be undef? */
    PUSHs(sv_2mortal(newSViv(timeout_ms)));
    PUTBACK;

    /* Return values are discarded; nothing is read back from the Perl stack. */
    call_sv(MY_CXT.timerset_fn, G_DISCARD | G_VOID);

    FREETMPS;
    LEAVE;

    return 0;
}

/* Data sink shared by WRITEFUNCTION and HEADERFUNCTION.
 *
 * Passes the received bytes, as one string argument, to the Perl code ref in `userdata`.
 * An exception raised by that code is reported through warn() and otherwise ignored; the
 * whole chunk (size * nmemb) is always reported back as consumed. */
size_t mcurl_write_callback(char *ptr,
                           size_t size,
                           size_t nmemb,
                           void *userdata)
{
    dTHX;
    dSP;
    const size_t chunk_len = size * nmemb;
    SV *perl_cb = (SV*)userdata;

    ENTER;
    SAVETMPS;

    PUSHMARK(SP);
    XPUSHs(sv_2mortal(newSVpvn(ptr, chunk_len)));
    PUTBACK;

    call_sv(perl_cb, G_DISCARD | G_VOID | G_EVAL);

    SPAGAIN;
    maybe_warn_eval(aTHX);
    PUTBACK;

    FREETMPS;
    LEAVE;

    return chunk_len;
}

/* Upload data source: asks the Perl code ref in `userdata` for at most size * nitems bytes.
 *
 * The code ref receives the byte limit as its only argument and returns the next chunk.
 * Returns the number of bytes copied into `buffer`, or CURL_READFUNC_ABORT when the code
 * ref returned undef, raised an exception, or returned more bytes than were asked for. */
size_t mcurl_read_callback(char *buffer,
                           size_t size,
                           size_t nitems,
                           void *userdata)
{
    dTHX;
    dSP;
    const size_t capacity = size * nitems;
    /* Abort unless the callback hands back an acceptable chunk. */
    size_t result = CURL_READFUNC_ABORT;

    ENTER;
    SAVETMPS;

    PUSHMARK(SP);
    XPUSHs(sv_2mortal(newSViv(capacity)));
    PUTBACK;

    call_sv((SV*)userdata, G_SCALAR | G_EVAL);

    SPAGAIN;
    maybe_warn_eval(aTHX);
    SV *chunk = POPs;
    /* An undefined value covers both an explicit undef and a callback that croaked. */
    if (SvOK(chunk)) {
        STRLEN chunk_len;
        char *chunk_bytes = SvPV(chunk, chunk_len);

        if (chunk_len <= capacity) {
            Copy(chunk_bytes, buffer, chunk_len, char);
            result = chunk_len;
        } else {
            warn("Read callback returned more data than allowed; aborting stream");
        }
    }
    PUTBACK;

    FREETMPS;
    LEAVE;

    return result;
}

/* Debug callback: forwards (type, data) to the Perl code ref in `userdata`.
 *
 * `data` is passed as a string of exactly `size` bytes. An exception raised by the Perl
 * code is reported through warn(). Always returns 0; `handle` is not used. */
int mcurl_debug_callback(CURL *handle,
                         curl_infotype type,
                         char *data,
                         size_t size,
                         void *userdata)
{
    dTHX;
    dSP;

    ENTER;
    SAVETMPS;

    /* Build the arguments inside the SAVETMPS scope so FREETMPS releases them. */
    SV *type_sv = sv_2mortal(newSViv(type));
    SV *payload_sv = sv_2mortal(newSVpvn(data, size));

    PUSHMARK(SP);
    EXTEND(SP, 2);
    PUSHs(type_sv);
    PUSHs(payload_sv);
    PUTBACK;

    call_sv((SV*)userdata, G_DISCARD | G_VOID | G_EVAL);

    SPAGAIN;
    maybe_warn_eval(aTHX);
    PUTBACK;

    FREETMPS;
    LEAVE;

    return 0;
}

int mcurl_trailer_callback(struct curl_slist **output, void *userdata)
{
    dTHX;
    dSP;

    ENTER;
    SAVETMPS;

    PUSHMARK(SP);

    call_sv((SV*)userdata, G_EVAL | G_SCALAR);

    SPAGAIN;
    int return_result;
    SV *returned = POPs;
    if (SvTRUE(ERRSV)) {
        /* This codepath: something did a die() */
        maybe_warn_eval(aTHX);
        return_result = CURL_TRAILERFUNC_ABORT;
    } else if (!SvTRUE(returned)) {
        /* This codepath: return undef; */
        return_result = CURL_TRAILERFUNC_ABORT;
    } else if (!SvROK(returned) || SvTYPE(SvRV(returned)) != SVt_PVAV) {
        /* This codepath: return "something"; */
        warn("Cannot convert %s to ARRAY reference", SvPV_nolen(returned));
        return_result = CURL_TRAILERFUNC_ABORT;
    } else {
        /* This codepath: return [..trailers..]; */
        *output = slist_from_av(aTHX_ *output, (AV*)SvRV(returned));
        return_result = CURL_TRAILERFUNC_OK;
    }
    PUTBACK;

    FREETMPS;
    LEAVE;

    return return_result;
}

/* Delivers the outcome of a completed transfer to its Perl completion callback and then
 * releases the per-request resources that are no longer needed.
 *
 * The response struct is looked up through the easy handle's CURLINFO_PRIVATE pointer.
 * The callback receives (response, undef) when `code` is CURLE_OK and (undef, message)
 * otherwise; the message is the error buffer when it is non-empty and
 * curl_easy_strerror(code) if not. An exception raised by the callback is reported through
 * warn(). Always returns 0; `client` is not used. */
int finish_request(pTHX_ AnyEvent__YACurl* client, CURL* easy, CURLcode code)
{
    AnyEvent__YACurl__Response *response;
    curl_easy_getinfo(easy, CURLINFO_PRIVATE, (void*)&response);

    dSP;
    ENTER;
    SAVETMPS;

    SV *result_arg = &PL_sv_undef;
    SV *error_arg = &PL_sv_undef;
    if (code == CURLE_OK) {
        result_arg = response->self_rv;
    } else {
        const char *message = (response->errbuf[0] != '\0')
            ? response->errbuf
            : curl_easy_strerror(code);
        error_arg = sv_2mortal(newSVpv(message, 0));
    }

    PUSHMARK(SP);
    EXTEND(SP, 2);
    PUSHs(result_arg);
    PUSHs(error_arg);
    PUTBACK;

    call_sv(response->callback, G_DISCARD | G_VOID | G_EVAL);

    SPAGAIN;
    maybe_warn_eval(aTHX);
    PUTBACK;

    FREETMPS;
    LEAVE;

    /* Drop these fields right away rather than waiting for DESTROY, so that potential
     * reference cycles are broken. */
    SvREFCNT_dec(response->held_references);
    response->held_references = NULL;
    SvREFCNT_dec(response->callback);
    response->callback = NULL;

    /* The request is over: give up our own reference last and let Perl free the response
     * whenever it is ready. The field is cleared before the decrement. */
    SV *own_ref = response->self_rv;
    response->self_rv = NULL;
    SvREFCNT_dec(own_ref);

    return 0;
}

/* Records a new running-transfer count. When the count differs from the stored one, the
 * client is flagged so that do_post_work() polls for completed transfers.
 * Always returns 0. */
int update_running(pTHX_ AnyEvent__YACurl* client, int new_running)
{
    if (client->last_running != new_running) {
        client->last_running = new_running;
        client->needs_read_info = 1;
    }

    return 0;
}

/* Runs the deferred work for a client until nothing is pending.
 *
 * Each pass performs a timeout action on the multi handle, records the resulting
 * running-transfer count, and reads at most one message from the multi handle. A transfer
 * that has finished is removed from the multi handle and handed to finish_request().
 * The loop continues while a timeout action is requested or a message was just read.
 * Always returns 0. */
int do_post_work(pTHX_ AnyEvent__YACurl* client)
{
    while (client->needs_invoke_timeout || client->needs_read_info) {
        {
            /* Seed with the last known count: libcurl is not documented to store a value
             * when the call fails, and an indeterminate int must not reach
             * update_running(). With the seed, a failed call leaves the count unchanged. */
            int running = client->last_running;

            client->needs_invoke_timeout = 0;
            curl_multi_socket_action(client->multi, CURL_SOCKET_TIMEOUT, 0, &running);
            update_running(aTHX_ client, running);
        }

        {
            int msgq;
            struct CURLMsg *m = curl_multi_info_read(client->multi, &msgq);

            if (m && (m->msg == CURLMSG_DONE)) {
                /* Copy everything needed out of the message first: the data behind `m`
                 * does not survive curl_multi_remove_handle(). */
                CURL *e = m->easy_handle;
                CURLcode result = m->data.result;

                curl_multi_remove_handle(client->multi, e);
                finish_request(aTHX_ client, e, result);
            }

            /* Only the pointer value is tested here; `m` is not dereferenced. */
            client->needs_read_info = m ? 1 : 0;
        }
    }

    return 0;
}

YACurl.xs  view on Meta::CPAN


        CURLMcode error = curl_multi_add_handle(client->multi, easy);
        if (error != CURLM_OK) {
            croak("Failed to perform CURL request: %s", curl_multi_strerror(error));
        }

        /* At this point we succeeded, so we want to be sure we retain the structs until we're done */
        SvREFCNT_inc(response_ctx->self_rv);

        update_running(aTHX_ client, client->last_running + 1);
        client->needs_invoke_timeout = 1;

        do_post_work(aTHX_ client);

PROTOTYPES: ENABLE

void
_ae_set_helpers(watchset, timerset)
        SV* watchset
        SV* timerset
    CODE:
        /* Stores copies of the two Perl helpers that the socket and timer callbacks
         * invoke. Each helper may be installed only once; a second attempt croaks. */
        dMY_CXT;

        if (MY_CXT.watchset_fn) {
            croak("watchset already set");
        }
        MY_CXT.watchset_fn = newSVsv(watchset);

        if (MY_CXT.timerset_fn) {
            croak("timerset already set");
        }
        MY_CXT.timerset_fn = newSVsv(timerset);

void
_ae_timer_fired(self)
        SV* self
    CODE:
        /* Timer expiry entry point: requests a timeout action and runs the pending work
         * for this client straight away. */
        AnyEvent__YACurl *multi_client = sv_to_client(aTHX_ self);

        multi_client->needs_invoke_timeout = 1;
        do_post_work(aTHX_ multi_client);

void
_ae_event(self, sock, is_write)
        SV* self
        int sock
        int is_write
    CODE:
        /* Socket readiness entry point: reports activity on `sock` to the multi handle
         * (writable when is_write is true, readable otherwise), records the resulting
         * running-transfer count and then runs the pending work for this client. */
        AnyEvent__YACurl *client = sv_to_client(aTHX_ self);

        /* Seed with the last known count: libcurl is not documented to store a value when
         * the call fails, and an indeterminate int must not reach update_running(). */
        int running = client->last_running;
        curl_multi_socket_action(client->multi, sock, (is_write ? CURL_CSELECT_OUT : CURL_CSELECT_IN), &running);
        update_running(aTHX_ client, running);

        do_post_work(aTHX_ client);

HV*
_get_known_constants()
    CODE:
        /* Returns a newly created hash filled in by fill_hv_with_constants(). */
        RETVAL = newHV();
        /* NOTE(review): presumably compensates for the stock HV* output typemap, which
         * takes its own reference to the hash without releasing ours (see perlxs,
         * "Returning SVs, AVs and HVs through RETVAL") -- confirm. */
        sv_2mortal((SV*)RETVAL);
        fill_hv_with_constants(aTHX_ RETVAL);
    OUTPUT:
        RETVAL

void
DESTROY(self)
        SV* self
    CODE:
        /* Tears down a client: warns when transfers are still counted as running, cleans
         * up the multi handle, drops the stored self reference and frees the struct. */
        AnyEvent__YACurl *client = sv_to_client(aTHX_ self);
        const int still_active = client->last_running;

        if (still_active != 0) {
            warn("Destroying with %d requests active", still_active);
        }

        if (client->multi) {
            curl_multi_cleanup(client->multi);
        }
        if (client->weak_self_ref) {
            SvREFCNT_dec(client->weak_self_ref);
        }

        Safefree(client);



MODULE = AnyEvent::YACurl       PACKAGE = AnyEvent::YACurl::Response

SV*
getinfo(self, option)
        SV* self
        SV* option
    CODE:
        /* Returns one piece of curl_easy_getinfo() data for this response.
         *
         * `option` is resolved by option_from_sv_or_croak(). The info type class encoded
         * in the option selects how the value is fetched and converted: string, long,
         * curl_off_t or double. Croaks for CURLINFO_PRIVATE, for any other type class,
         * and when curl_easy_getinfo() reports an error. */
        dMY_CXT;
        AnyEvent__YACurl__Response *response = sv_to_response(aTHX_ self);

        int opt_from_str;
        CURLINFO opt = option_from_sv_or_croak(aTHX_ aMY_CXT_ option, 0, &opt_from_str);

        if (opt == CURLINFO_PRIVATE) {
            /* These would be meaningless to access, so don't bother */
            croak("Refusing access to private CURL data");
        }

        switch (opt & CURLINFO_TYPEMASK) {
        case CURLINFO_STRING: {
            char *str_value;
            CURLcode rc = curl_easy_getinfo(response->easy, opt, &str_value);
            if (rc != CURLE_OK) {
                croak("%s", curl_easy_strerror(rc));
            }
            RETVAL = newSVpv(str_value, 0);
            break;
        }

        case CURLINFO_LONG: {
            long long_value;
            CURLcode rc = curl_easy_getinfo(response->easy, opt, &long_value);
            if (rc != CURLE_OK) {
                croak("%s", curl_easy_strerror(rc));
            }
            RETVAL = newSViv(long_value);
            break;
        }

        case CURLINFO_OFF_T: {
            curl_off_t off_value;
            CURLcode rc = curl_easy_getinfo(response->easy, opt, &off_value);
            if (rc != CURLE_OK) {
                croak("%s", curl_easy_strerror(rc));
            }
            RETVAL = newSViv(off_value);
            break;
        }

        case CURLINFO_DOUBLE: {
            double dbl_value;
            CURLcode rc = curl_easy_getinfo(response->easy, opt, &dbl_value);
            if (rc != CURLE_OK) {
                croak("%s", curl_easy_strerror(rc));
            }
            RETVAL = newSVnv(dbl_value);
            break;
        }

        default:
            /* Unsupported type class; include the option name when one was given. */
            if (opt_from_str) {
                croak("Don't know what to do with curl's %d (%s)", opt, SvPV_nolen(option));
            } else {
                croak("Don't know what to do with curl's %d", opt);
            }
        }

    OUTPUT:
        RETVAL

void
DESTROY(self)
        SV* self
    CODE:
        /* Releases everything still owned by a response struct, then the struct itself.
         * Every field is released only when it is set. */
        AnyEvent__YACurl__Response *response = sv_to_response(aTHX_ self);

        /* The easy handle goes first. NOTE(review): presumably because the mime data and
         * the slists below must outlive the handle that uses them -- confirm against the
         * libcurl documentation before reordering. */
        if (response->easy) {
            curl_easy_cleanup(response->easy);
        }
        if (response->mimepost) {
            curl_mime_free(response->mimepost);
        }
        if (response->held_references) {
            SvREFCNT_dec(response->held_references);
        }
        if (response->redirected_stderr) {
            fclose(response->redirected_stderr);
        }
        if (response->slists) {
            /* Free each of the slists_count lists, then the array that holds them. */
            int i;
            for (i = 0; i < response->slists_count; i++) {
                curl_slist_free_all(response->slists[i]);
            }
            Safefree(response->slists);
        }
        if (response->callback) {
            SvREFCNT_dec(response->callback);
        }

        Safefree(response);



( run in 0.811 second using v1.01-cache-2.11-cpan-13bb782fe5a )