3 * Copyright (C) Igor Sysoev
4 * Copyright (C) Nginx, Inc.
8 #include <ngx_config.h>
10 #include <ngx_event.h>
11 #include <ngx_event_pipe.h>
14 static ngx_int_t
ngx_event_pipe_read_upstream(ngx_event_pipe_t
*p
);
15 static ngx_int_t
ngx_event_pipe_write_to_downstream(ngx_event_pipe_t
*p
);
17 static ngx_int_t
ngx_event_pipe_write_chain_to_temp_file(ngx_event_pipe_t
*p
);
18 static ngx_inline
void ngx_event_pipe_remove_shadow_links(ngx_buf_t
*buf
);
19 static ngx_int_t
ngx_event_pipe_drain_chains(ngx_event_pipe_t
*p
);
23 ngx_event_pipe(ngx_event_pipe_t
*p
, ngx_int_t do_write
)
27 ngx_event_t
*rev
, *wev
;
31 p
->log
->action
= "sending to client";
33 rc
= ngx_event_pipe_write_to_downstream(p
);
35 if (rc
== NGX_ABORT
) {
45 p
->upstream_blocked
= 0;
47 p
->log
->action
= "reading upstream";
49 if (ngx_event_pipe_read_upstream(p
) == NGX_ABORT
) {
53 if (!p
->read
&& !p
->upstream_blocked
) {
60 if (p
->upstream
->fd
!= -1) {
61 rev
= p
->upstream
->read
;
63 flags
= (rev
->eof
|| rev
->error
) ? NGX_CLOSE_EVENT
: 0;
65 if (ngx_handle_read_event(rev
, flags
) != NGX_OK
) {
69 if (rev
->active
&& !rev
->ready
) {
70 ngx_add_timer(rev
, p
->read_timeout
);
72 } else if (rev
->timer_set
) {
77 if (p
->downstream
->fd
!= -1 && p
->downstream
->data
== p
->output_ctx
) {
78 wev
= p
->downstream
->write
;
79 if (ngx_handle_write_event(wev
, p
->send_lowat
) != NGX_OK
) {
84 if (wev
->active
&& !wev
->ready
) {
85 ngx_add_timer(wev
, p
->send_timeout
);
87 } else if (wev
->timer_set
) {
98 ngx_event_pipe_read_upstream(ngx_event_pipe_t
*p
)
103 ngx_chain_t
*chain
, *cl
, *ln
;
105 if (p
->upstream_eof
|| p
->upstream_error
|| p
->upstream_done
) {
109 ngx_log_debug1(NGX_LOG_DEBUG_EVENT
, p
->log
, 0,
110 "pipe read upstream: %d", p
->upstream
->read
->ready
);
114 if (p
->upstream_eof
|| p
->upstream_error
|| p
->upstream_done
) {
118 if (p
->preread_bufs
== NULL
&& !p
->upstream
->read
->ready
) {
122 if (p
->preread_bufs
) {
124 /* use the pre-read bufs if they exist */
126 chain
= p
->preread_bufs
;
127 p
->preread_bufs
= NULL
;
130 ngx_log_debug1(NGX_LOG_DEBUG_EVENT
, p
->log
, 0,
131 "pipe preread: %z", n
);
139 #if (NGX_HAVE_KQUEUE)
142 * kqueue notifies about the end of file or a pending error.
143 * This test allows not to allocate a buf on these conditions
144 * and not to call c->recv_chain().
147 if (p
->upstream
->read
->available
== 0
148 && p
->upstream
->read
->pending_eof
)
150 p
->upstream
->read
->ready
= 0;
151 p
->upstream
->read
->eof
= 1;
155 if (p
->upstream
->read
->kq_errno
) {
156 p
->upstream
->read
->error
= 1;
157 p
->upstream_error
= 1;
160 ngx_log_error(NGX_LOG_ERR
, p
->log
,
161 p
->upstream
->read
->kq_errno
,
162 "kevent() reported that upstream "
163 "closed connection");
170 if (p
->free_raw_bufs
) {
172 /* use the free bufs if they exist */
174 chain
= p
->free_raw_bufs
;
176 p
->free_raw_bufs
= p
->free_raw_bufs
->next
;
179 p
->free_raw_bufs
= NULL
;
182 } else if (p
->allocated
< p
->bufs
.num
) {
184 /* allocate a new buf if it's still allowed */
186 b
= ngx_create_temp_buf(p
->pool
, p
->bufs
.size
);
193 chain
= ngx_alloc_chain_link(p
->pool
);
201 } else if (!p
->cacheable
202 && p
->downstream
->data
== p
->output_ctx
203 && p
->downstream
->write
->ready
204 && !p
->downstream
->write
->delayed
)
207 * if the bufs are not needed to be saved in a cache and
208 * a downstream is ready then write the bufs to a downstream
211 p
->upstream_blocked
= 1;
213 ngx_log_debug0(NGX_LOG_DEBUG_EVENT
, p
->log
, 0,
214 "pipe downstream ready");
218 } else if (p
->cacheable
219 || p
->temp_file
->offset
< p
->max_temp_file_size
)
223 * if it is allowed, then save some bufs from r->in
224 * to a temporary file, and add them to a r->out chain
227 rc
= ngx_event_pipe_write_chain_to_temp_file(p
);
229 ngx_log_debug1(NGX_LOG_DEBUG_EVENT
, p
->log
, 0,
230 "pipe temp offset: %O", p
->temp_file
->offset
);
232 if (rc
== NGX_BUSY
) {
236 if (rc
== NGX_AGAIN
) {
237 if (ngx_event_flags
& NGX_USE_LEVEL_EVENT
238 && p
->upstream
->read
->active
239 && p
->upstream
->read
->ready
)
241 if (ngx_del_event(p
->upstream
->read
, NGX_READ_EVENT
, 0)
253 chain
= p
->free_raw_bufs
;
255 p
->free_raw_bufs
= p
->free_raw_bufs
->next
;
258 p
->free_raw_bufs
= NULL
;
263 /* there are no bufs to read in */
265 ngx_log_debug0(NGX_LOG_DEBUG_EVENT
, p
->log
, 0,
266 "no pipe bufs to read in");
271 n
= p
->upstream
->recv_chain(p
->upstream
, chain
);
273 ngx_log_debug1(NGX_LOG_DEBUG_EVENT
, p
->log
, 0,
274 "pipe recv chain: %z", n
);
276 if (p
->free_raw_bufs
) {
277 chain
->next
= p
->free_raw_bufs
;
279 p
->free_raw_bufs
= chain
;
281 if (n
== NGX_ERROR
) {
282 p
->upstream_error
= 1;
286 if (n
== NGX_AGAIN
) {
288 ngx_event_pipe_remove_shadow_links(chain
->buf
);
304 p
->free_raw_bufs
= NULL
;
306 while (cl
&& n
> 0) {
308 ngx_event_pipe_remove_shadow_links(cl
->buf
);
310 size
= cl
->buf
->end
- cl
->buf
->last
;
313 cl
->buf
->last
= cl
->buf
->end
;
315 /* STUB */ cl
->buf
->num
= p
->num
++;
317 if (p
->input_filter(p
, cl
->buf
) == NGX_ERROR
) {
324 ngx_free_chain(p
->pool
, ln
);
333 for (ln
= cl
; ln
->next
; ln
= ln
->next
) { /* void */ }
335 ln
->next
= p
->free_raw_bufs
;
336 p
->free_raw_bufs
= cl
;
342 for (cl
= p
->busy
; cl
; cl
= cl
->next
) {
343 ngx_log_debug8(NGX_LOG_DEBUG_EVENT
, p
->log
, 0,
344 "pipe buf busy s:%d t:%d f:%d "
345 "%p, pos %p, size: %z "
346 "file: %O, size: %z",
347 (cl
->buf
->shadow
? 1 : 0),
348 cl
->buf
->temporary
, cl
->buf
->in_file
,
349 cl
->buf
->start
, cl
->buf
->pos
,
350 cl
->buf
->last
- cl
->buf
->pos
,
352 cl
->buf
->file_last
- cl
->buf
->file_pos
);
355 for (cl
= p
->out
; cl
; cl
= cl
->next
) {
356 ngx_log_debug8(NGX_LOG_DEBUG_EVENT
, p
->log
, 0,
357 "pipe buf out s:%d t:%d f:%d "
358 "%p, pos %p, size: %z "
359 "file: %O, size: %z",
360 (cl
->buf
->shadow
? 1 : 0),
361 cl
->buf
->temporary
, cl
->buf
->in_file
,
362 cl
->buf
->start
, cl
->buf
->pos
,
363 cl
->buf
->last
- cl
->buf
->pos
,
365 cl
->buf
->file_last
- cl
->buf
->file_pos
);
368 for (cl
= p
->in
; cl
; cl
= cl
->next
) {
369 ngx_log_debug8(NGX_LOG_DEBUG_EVENT
, p
->log
, 0,
370 "pipe buf in s:%d t:%d f:%d "
371 "%p, pos %p, size: %z "
372 "file: %O, size: %z",
373 (cl
->buf
->shadow
? 1 : 0),
374 cl
->buf
->temporary
, cl
->buf
->in_file
,
375 cl
->buf
->start
, cl
->buf
->pos
,
376 cl
->buf
->last
- cl
->buf
->pos
,
378 cl
->buf
->file_last
- cl
->buf
->file_pos
);
381 for (cl
= p
->free_raw_bufs
; cl
; cl
= cl
->next
) {
382 ngx_log_debug8(NGX_LOG_DEBUG_EVENT
, p
->log
, 0,
383 "pipe buf free s:%d t:%d f:%d "
384 "%p, pos %p, size: %z "
385 "file: %O, size: %z",
386 (cl
->buf
->shadow
? 1 : 0),
387 cl
->buf
->temporary
, cl
->buf
->in_file
,
388 cl
->buf
->start
, cl
->buf
->pos
,
389 cl
->buf
->last
- cl
->buf
->pos
,
391 cl
->buf
->file_last
- cl
->buf
->file_pos
);
394 ngx_log_debug1(NGX_LOG_DEBUG_EVENT
, p
->log
, 0,
395 "pipe length: %O", p
->length
);
399 if (p
->free_raw_bufs
&& p
->length
!= -1) {
400 cl
= p
->free_raw_bufs
;
402 if (cl
->buf
->last
- cl
->buf
->pos
>= p
->length
) {
404 p
->free_raw_bufs
= cl
->next
;
406 /* STUB */ cl
->buf
->num
= p
->num
++;
408 if (p
->input_filter(p
, cl
->buf
) == NGX_ERROR
) {
412 ngx_free_chain(p
->pool
, cl
);
416 if (p
->length
== 0) {
417 p
->upstream_done
= 1;
421 if ((p
->upstream_eof
|| p
->upstream_error
) && p
->free_raw_bufs
) {
423 /* STUB */ p
->free_raw_bufs
->buf
->num
= p
->num
++;
425 if (p
->input_filter(p
, p
->free_raw_bufs
->buf
) == NGX_ERROR
) {
429 p
->free_raw_bufs
= p
->free_raw_bufs
->next
;
431 if (p
->free_bufs
&& p
->buf_to_file
== NULL
) {
432 for (cl
= p
->free_raw_bufs
; cl
; cl
= cl
->next
) {
433 if (cl
->buf
->shadow
== NULL
) {
434 ngx_pfree(p
->pool
, cl
->buf
->start
);
440 if (p
->cacheable
&& p
->in
) {
441 if (ngx_event_pipe_write_chain_to_temp_file(p
) == NGX_ABORT
) {
451 ngx_event_pipe_write_to_downstream(ngx_event_pipe_t
*p
)
456 ngx_uint_t flush
, flushed
, prev_last_shadow
;
457 ngx_chain_t
*out
, **ll
, *cl
, file
;
458 ngx_connection_t
*downstream
;
460 downstream
= p
->downstream
;
462 ngx_log_debug1(NGX_LOG_DEBUG_EVENT
, p
->log
, 0,
463 "pipe write downstream: %d", downstream
->write
->ready
);
468 if (p
->downstream_error
) {
469 return ngx_event_pipe_drain_chains(p
);
472 if (p
->upstream_eof
|| p
->upstream_error
|| p
->upstream_done
) {
474 /* pass the p->out and p->in chains to the output filter */
476 for (cl
= p
->busy
; cl
; cl
= cl
->next
) {
477 cl
->buf
->recycled
= 0;
481 ngx_log_debug0(NGX_LOG_DEBUG_EVENT
, p
->log
, 0,
482 "pipe write downstream flush out");
484 for (cl
= p
->out
; cl
; cl
= cl
->next
) {
485 cl
->buf
->recycled
= 0;
488 rc
= p
->output_filter(p
->output_ctx
, p
->out
);
490 if (rc
== NGX_ERROR
) {
491 p
->downstream_error
= 1;
492 return ngx_event_pipe_drain_chains(p
);
499 ngx_log_debug0(NGX_LOG_DEBUG_EVENT
, p
->log
, 0,
500 "pipe write downstream flush in");
502 for (cl
= p
->in
; cl
; cl
= cl
->next
) {
503 cl
->buf
->recycled
= 0;
506 rc
= p
->output_filter(p
->output_ctx
, p
->in
);
508 if (rc
== NGX_ERROR
) {
509 p
->downstream_error
= 1;
510 return ngx_event_pipe_drain_chains(p
);
516 if (p
->cacheable
&& p
->buf_to_file
) {
518 file
.buf
= p
->buf_to_file
;
521 if (ngx_write_chain_to_temp_file(p
->temp_file
, &file
)
528 ngx_log_debug0(NGX_LOG_DEBUG_EVENT
, p
->log
, 0,
529 "pipe write downstream done");
531 /* TODO: free unused bufs */
533 p
->downstream_done
= 1;
537 if (downstream
->data
!= p
->output_ctx
538 || !downstream
->write
->ready
539 || downstream
->write
->delayed
)
544 /* bsize is the size of the busy recycled bufs */
549 for (cl
= p
->busy
; cl
; cl
= cl
->next
) {
551 if (cl
->buf
->recycled
) {
552 if (prev
== cl
->buf
->start
) {
556 bsize
+= cl
->buf
->end
- cl
->buf
->start
;
557 prev
= cl
->buf
->start
;
561 ngx_log_debug1(NGX_LOG_DEBUG_EVENT
, p
->log
, 0,
562 "pipe write busy: %uz", bsize
);
566 if (bsize
>= (size_t) p
->busy_size
) {
573 prev_last_shadow
= 1;
579 if (cl
->buf
->recycled
) {
580 ngx_log_error(NGX_LOG_ALERT
, p
->log
, 0,
581 "recycled buffer in pipe out chain");
584 p
->out
= p
->out
->next
;
586 } else if (!p
->cacheable
&& p
->in
) {
589 ngx_log_debug3(NGX_LOG_DEBUG_EVENT
, p
->log
, 0,
590 "pipe write buf ls:%d %p %z",
591 cl
->buf
->last_shadow
,
593 cl
->buf
->last
- cl
->buf
->pos
);
595 if (cl
->buf
->recycled
&& prev_last_shadow
) {
596 if (bsize
+ cl
->buf
->end
- cl
->buf
->start
> p
->busy_size
) {
601 bsize
+= cl
->buf
->end
- cl
->buf
->start
;
604 prev_last_shadow
= cl
->buf
->last_shadow
;
624 ngx_log_debug2(NGX_LOG_DEBUG_EVENT
, p
->log
, 0,
625 "pipe write: out:%p, f:%d", out
, flush
);
633 /* a workaround for AIO */
634 if (flushed
++ > 10) {
639 rc
= p
->output_filter(p
->output_ctx
, out
);
641 ngx_chain_update_chains(p
->pool
, &p
->free
, &p
->busy
, &out
, p
->tag
);
643 if (rc
== NGX_ERROR
) {
644 p
->downstream_error
= 1;
645 return ngx_event_pipe_drain_chains(p
);
648 for (cl
= p
->free
; cl
; cl
= cl
->next
) {
650 if (cl
->buf
->temp_file
) {
651 if (p
->cacheable
|| !p
->cyclic_temp_file
) {
655 /* reset p->temp_offset if all bufs had been sent */
657 if (cl
->buf
->file_last
== p
->temp_file
->offset
) {
658 p
->temp_file
->offset
= 0;
662 /* TODO: free buf if p->free_bufs && upstream done */
664 /* add the free shadow raw buf to p->free_raw_bufs */
666 if (cl
->buf
->last_shadow
) {
667 if (ngx_event_pipe_add_free_buf(p
, cl
->buf
->shadow
) != NGX_OK
) {
671 cl
->buf
->last_shadow
= 0;
674 cl
->buf
->shadow
= NULL
;
683 ngx_event_pipe_write_chain_to_temp_file(ngx_event_pipe_t
*p
)
685 ssize_t size
, bsize
, n
;
687 ngx_uint_t prev_last_shadow
;
688 ngx_chain_t
*cl
, *tl
, *next
, *out
, **ll
, **last_out
, **last_free
, fl
;
690 if (p
->buf_to_file
) {
691 fl
.buf
= p
->buf_to_file
;
704 prev_last_shadow
= 1;
706 ngx_log_debug1(NGX_LOG_DEBUG_EVENT
, p
->log
, 0,
707 "pipe offset: %O", p
->temp_file
->offset
);
710 bsize
= cl
->buf
->last
- cl
->buf
->pos
;
712 ngx_log_debug4(NGX_LOG_DEBUG_EVENT
, p
->log
, 0,
713 "pipe buf ls:%d %p, pos %p, size: %z",
714 cl
->buf
->last_shadow
, cl
->buf
->start
,
715 cl
->buf
->pos
, bsize
);
718 && ((size
+ bsize
> p
->temp_file_write_size
)
719 || (p
->temp_file
->offset
+ size
+ bsize
720 > p
->max_temp_file_size
)))
725 prev_last_shadow
= cl
->buf
->last_shadow
;
733 ngx_log_debug1(NGX_LOG_DEBUG_EVENT
, p
->log
, 0, "size: %z", size
);
753 n
= ngx_write_chain_to_temp_file(p
->temp_file
, out
);
755 if (n
== NGX_ERROR
) {
759 if (p
->buf_to_file
) {
760 p
->temp_file
->offset
= p
->buf_to_file
->last
- p
->buf_to_file
->pos
;
761 n
-= p
->buf_to_file
->last
- p
->buf_to_file
->pos
;
762 p
->buf_to_file
= NULL
;
767 /* update previous buffer or add new buffer */
770 for (cl
= p
->out
; cl
->next
; cl
= cl
->next
) { /* void */ }
774 if (b
->file_last
== p
->temp_file
->offset
) {
775 p
->temp_file
->offset
+= n
;
776 b
->file_last
= p
->temp_file
->offset
;
780 last_out
= &cl
->next
;
786 cl
= ngx_chain_get_free_buf(p
->pool
, &p
->free
);
793 ngx_memzero(b
, sizeof(ngx_buf_t
));
797 b
->file
= &p
->temp_file
->file
;
798 b
->file_pos
= p
->temp_file
->offset
;
799 p
->temp_file
->offset
+= n
;
800 b
->file_last
= p
->temp_file
->offset
;
810 for (last_free
= &p
->free_raw_bufs
;
812 last_free
= &(*last_free
)->next
)
817 for (cl
= out
; cl
; cl
= next
) {
825 if (b
->last_shadow
) {
827 tl
= ngx_alloc_chain_link(p
->pool
);
836 last_free
= &tl
->next
;
838 b
->shadow
->pos
= b
->shadow
->start
;
839 b
->shadow
->last
= b
->shadow
->start
;
841 ngx_event_pipe_remove_shadow_links(b
->shadow
);
849 /* the copy input filter */
852 ngx_event_pipe_copy_input_filter(ngx_event_pipe_t
*p
, ngx_buf_t
*buf
)
857 if (buf
->pos
== buf
->last
) {
865 ngx_free_chain(p
->pool
, cl
);
868 b
= ngx_alloc_buf(p
->pool
);
874 ngx_memcpy(b
, buf
, sizeof(ngx_buf_t
));
881 cl
= ngx_alloc_chain_link(p
->pool
);
889 ngx_log_debug1(NGX_LOG_DEBUG_EVENT
, p
->log
, 0, "input buf #%d", b
->num
);
896 p
->last_in
= &cl
->next
;
898 if (p
->length
== -1) {
902 p
->length
-= b
->last
- b
->pos
;
908 static ngx_inline
void
909 ngx_event_pipe_remove_shadow_links(ngx_buf_t
*buf
)
919 while (!b
->last_shadow
) {
940 ngx_event_pipe_add_free_buf(ngx_event_pipe_t
*p
, ngx_buf_t
*b
)
944 cl
= ngx_alloc_chain_link(p
->pool
);
949 if (p
->buf_to_file
&& b
->start
== p
->buf_to_file
->start
) {
950 b
->pos
= p
->buf_to_file
->last
;
951 b
->last
= p
->buf_to_file
->last
;
962 if (p
->free_raw_bufs
== NULL
) {
963 p
->free_raw_bufs
= cl
;
969 if (p
->free_raw_bufs
->buf
->pos
== p
->free_raw_bufs
->buf
->last
) {
971 /* add the free buf to the list start */
973 cl
->next
= p
->free_raw_bufs
;
974 p
->free_raw_bufs
= cl
;
979 /* the first free buf is partially filled, thus add the free buf after it */
981 cl
->next
= p
->free_raw_bufs
->next
;
982 p
->free_raw_bufs
->next
= cl
;
989 ngx_event_pipe_drain_chains(ngx_event_pipe_t
*p
)
991 ngx_chain_t
*cl
, *tl
;
1011 if (cl
->buf
->last_shadow
) {
1012 if (ngx_event_pipe_add_free_buf(p
, cl
->buf
->shadow
) != NGX_OK
) {
1016 cl
->buf
->last_shadow
= 0;
1019 cl
->buf
->shadow
= NULL
;