Nginx-Perl
view release on metacpan or search on metacpan
src/event/ngx_event_pipe.c view on Meta::CPAN
break;
}
p->read = 1;
if (n == 0) {
p->upstream_eof = 1;
break;
}
}
delay = p->limit_rate ? (ngx_msec_t) n * 1000 / p->limit_rate : 0;
p->read_length += n;
cl = chain;
p->free_raw_bufs = NULL;
while (cl && n > 0) {
ngx_event_pipe_remove_shadow_links(cl->buf);
size = cl->buf->end - cl->buf->last;
if (n >= size) {
cl->buf->last = cl->buf->end;
/* STUB */ cl->buf->num = p->num++;
if (p->input_filter(p, cl->buf) == NGX_ERROR) {
return NGX_ABORT;
}
n -= size;
ln = cl;
cl = cl->next;
ngx_free_chain(p->pool, ln);
} else {
cl->buf->last += n;
n = 0;
}
}
if (cl) {
for (ln = cl; ln->next; ln = ln->next) { /* void */ }
ln->next = p->free_raw_bufs;
p->free_raw_bufs = cl;
}
if (delay > 0) {
p->upstream->read->delayed = 1;
ngx_add_timer(p->upstream->read, delay);
break;
}
}
#if (NGX_DEBUG)
for (cl = p->busy; cl; cl = cl->next) {
ngx_log_debug8(NGX_LOG_DEBUG_EVENT, p->log, 0,
"pipe buf busy s:%d t:%d f:%d "
"%p, pos %p, size: %z "
"file: %O, size: %O",
(cl->buf->shadow ? 1 : 0),
cl->buf->temporary, cl->buf->in_file,
cl->buf->start, cl->buf->pos,
cl->buf->last - cl->buf->pos,
cl->buf->file_pos,
cl->buf->file_last - cl->buf->file_pos);
}
for (cl = p->out; cl; cl = cl->next) {
ngx_log_debug8(NGX_LOG_DEBUG_EVENT, p->log, 0,
"pipe buf out s:%d t:%d f:%d "
"%p, pos %p, size: %z "
"file: %O, size: %O",
(cl->buf->shadow ? 1 : 0),
cl->buf->temporary, cl->buf->in_file,
cl->buf->start, cl->buf->pos,
cl->buf->last - cl->buf->pos,
cl->buf->file_pos,
cl->buf->file_last - cl->buf->file_pos);
}
for (cl = p->in; cl; cl = cl->next) {
ngx_log_debug8(NGX_LOG_DEBUG_EVENT, p->log, 0,
"pipe buf in s:%d t:%d f:%d "
"%p, pos %p, size: %z "
"file: %O, size: %O",
(cl->buf->shadow ? 1 : 0),
cl->buf->temporary, cl->buf->in_file,
cl->buf->start, cl->buf->pos,
cl->buf->last - cl->buf->pos,
cl->buf->file_pos,
cl->buf->file_last - cl->buf->file_pos);
}
for (cl = p->free_raw_bufs; cl; cl = cl->next) {
ngx_log_debug8(NGX_LOG_DEBUG_EVENT, p->log, 0,
"pipe buf free s:%d t:%d f:%d "
"%p, pos %p, size: %z "
"file: %O, size: %O",
(cl->buf->shadow ? 1 : 0),
cl->buf->temporary, cl->buf->in_file,
cl->buf->start, cl->buf->pos,
cl->buf->last - cl->buf->pos,
cl->buf->file_pos,
cl->buf->file_last - cl->buf->file_pos);
}
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
"pipe length: %O", p->length);
#endif
if (p->free_raw_bufs && p->length != -1) {
cl = p->free_raw_bufs;
if (cl->buf->last - cl->buf->pos >= p->length) {
p->free_raw_bufs = cl->next;
src/event/ngx_event_pipe.c view on Meta::CPAN
if ((p->upstream_eof || p->upstream_error) && p->free_raw_bufs) {
/* STUB */ p->free_raw_bufs->buf->num = p->num++;
if (p->input_filter(p, p->free_raw_bufs->buf) == NGX_ERROR) {
return NGX_ABORT;
}
p->free_raw_bufs = p->free_raw_bufs->next;
if (p->free_bufs && p->buf_to_file == NULL) {
for (cl = p->free_raw_bufs; cl; cl = cl->next) {
if (cl->buf->shadow == NULL) {
ngx_pfree(p->pool, cl->buf->start);
}
}
}
}
if (p->cacheable && (p->in || p->buf_to_file)) {
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
"pipe write chain");
if (ngx_event_pipe_write_chain_to_temp_file(p) == NGX_ABORT) {
return NGX_ABORT;
}
}
return NGX_OK;
}
static ngx_int_t
ngx_event_pipe_write_to_downstream(ngx_event_pipe_t *p)
{
u_char *prev;
size_t bsize;
ngx_int_t rc;
ngx_uint_t flush, flushed, prev_last_shadow;
ngx_chain_t *out, **ll, *cl;
ngx_connection_t *downstream;
downstream = p->downstream;
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
"pipe write downstream: %d", downstream->write->ready);
flushed = 0;
for ( ;; ) {
if (p->downstream_error) {
return ngx_event_pipe_drain_chains(p);
}
if (p->upstream_eof || p->upstream_error || p->upstream_done) {
/* pass the p->out and p->in chains to the output filter */
for (cl = p->busy; cl; cl = cl->next) {
cl->buf->recycled = 0;
}
if (p->out) {
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
"pipe write downstream flush out");
for (cl = p->out; cl; cl = cl->next) {
cl->buf->recycled = 0;
}
rc = p->output_filter(p->output_ctx, p->out);
if (rc == NGX_ERROR) {
p->downstream_error = 1;
return ngx_event_pipe_drain_chains(p);
}
p->out = NULL;
}
if (p->in) {
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
"pipe write downstream flush in");
for (cl = p->in; cl; cl = cl->next) {
cl->buf->recycled = 0;
}
rc = p->output_filter(p->output_ctx, p->in);
if (rc == NGX_ERROR) {
p->downstream_error = 1;
return ngx_event_pipe_drain_chains(p);
}
p->in = NULL;
}
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
"pipe write downstream done");
/* TODO: free unused bufs */
p->downstream_done = 1;
break;
}
if (downstream->data != p->output_ctx
|| !downstream->write->ready
|| downstream->write->delayed)
{
break;
}
/* bsize is the size of the busy recycled bufs */
prev = NULL;
bsize = 0;
for (cl = p->busy; cl; cl = cl->next) {
if (cl->buf->recycled) {
if (prev == cl->buf->start) {
continue;
}
bsize += cl->buf->end - cl->buf->start;
prev = cl->buf->start;
}
}
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
"pipe write busy: %uz", bsize);
out = NULL;
if (bsize >= (size_t) p->busy_size) {
flush = 1;
goto flush;
}
flush = 0;
ll = NULL;
prev_last_shadow = 1;
for ( ;; ) {
if (p->out) {
cl = p->out;
if (cl->buf->recycled) {
ngx_log_error(NGX_LOG_ALERT, p->log, 0,
"recycled buffer in pipe out chain");
}
p->out = p->out->next;
} else if (!p->cacheable && p->in) {
cl = p->in;
ngx_log_debug3(NGX_LOG_DEBUG_EVENT, p->log, 0,
"pipe write buf ls:%d %p %z",
cl->buf->last_shadow,
cl->buf->pos,
cl->buf->last - cl->buf->pos);
if (cl->buf->recycled && prev_last_shadow) {
if (bsize + cl->buf->end - cl->buf->start > p->busy_size) {
flush = 1;
break;
}
bsize += cl->buf->end - cl->buf->start;
}
prev_last_shadow = cl->buf->last_shadow;
p->in = p->in->next;
} else {
break;
}
cl->next = NULL;
if (out) {
*ll = cl;
} else {
out = cl;
}
ll = &cl->next;
}
flush:
ngx_log_debug2(NGX_LOG_DEBUG_EVENT, p->log, 0,
"pipe write: out:%p, f:%d", out, flush);
if (out == NULL) {
if (!flush) {
break;
}
/* a workaround for AIO */
if (flushed++ > 10) {
return NGX_BUSY;
}
}
rc = p->output_filter(p->output_ctx, out);
ngx_chain_update_chains(p->pool, &p->free, &p->busy, &out, p->tag);
if (rc == NGX_ERROR) {
p->downstream_error = 1;
return ngx_event_pipe_drain_chains(p);
}
for (cl = p->free; cl; cl = cl->next) {
if (cl->buf->temp_file) {
if (p->cacheable || !p->cyclic_temp_file) {
continue;
}
/* reset p->temp_offset if all bufs had been sent */
if (cl->buf->file_last == p->temp_file->offset) {
p->temp_file->offset = 0;
}
}
/* TODO: free buf if p->free_bufs && upstream done */
/* add the free shadow raw buf to p->free_raw_bufs */
if (cl->buf->last_shadow) {
if (ngx_event_pipe_add_free_buf(p, cl->buf->shadow) != NGX_OK) {
return NGX_ABORT;
}
cl->buf->last_shadow = 0;
}
cl->buf->shadow = NULL;
}
}
return NGX_OK;
}
static ngx_int_t
ngx_event_pipe_write_chain_to_temp_file(ngx_event_pipe_t *p)
{
ssize_t size, bsize, n;
ngx_buf_t *b;
ngx_uint_t prev_last_shadow;
ngx_chain_t *cl, *tl, *next, *out, **ll, **last_out, **last_free, fl;
if (p->buf_to_file) {
fl.buf = p->buf_to_file;
fl.next = p->in;
out = &fl;
} else {
out = p->in;
}
if (!p->cacheable) {
size = 0;
src/event/ngx_event_pipe.c view on Meta::CPAN
buf->shadow = NULL;
}
ngx_int_t
ngx_event_pipe_add_free_buf(ngx_event_pipe_t *p, ngx_buf_t *b)
{
ngx_chain_t *cl;
cl = ngx_alloc_chain_link(p->pool);
if (cl == NULL) {
return NGX_ERROR;
}
if (p->buf_to_file && b->start == p->buf_to_file->start) {
b->pos = p->buf_to_file->last;
b->last = p->buf_to_file->last;
} else {
b->pos = b->start;
b->last = b->start;
}
b->shadow = NULL;
cl->buf = b;
if (p->free_raw_bufs == NULL) {
p->free_raw_bufs = cl;
cl->next = NULL;
return NGX_OK;
}
if (p->free_raw_bufs->buf->pos == p->free_raw_bufs->buf->last) {
/* add the free buf to the list start */
cl->next = p->free_raw_bufs;
p->free_raw_bufs = cl;
return NGX_OK;
}
/* the first free buf is partially filled, thus add the free buf after it */
cl->next = p->free_raw_bufs->next;
p->free_raw_bufs->next = cl;
return NGX_OK;
}
static ngx_int_t
ngx_event_pipe_drain_chains(ngx_event_pipe_t *p)
{
ngx_chain_t *cl, *tl;
for ( ;; ) {
if (p->busy) {
cl = p->busy;
p->busy = NULL;
} else if (p->out) {
cl = p->out;
p->out = NULL;
} else if (p->in) {
cl = p->in;
p->in = NULL;
} else {
return NGX_OK;
}
while (cl) {
if (cl->buf->last_shadow) {
if (ngx_event_pipe_add_free_buf(p, cl->buf->shadow) != NGX_OK) {
return NGX_ABORT;
}
cl->buf->last_shadow = 0;
}
cl->buf->shadow = NULL;
tl = cl->next;
cl->next = p->free;
p->free = cl;
cl = tl;
}
}
}
( run in 0.575 second using v1.01-cache-2.11-cpan-39bf76dae61 )