[core] chunkqueue perf: code reuse
[lighttpd.git] / src / chunk.c
blob d24199909c2f04c713fe624280119549611f3b34
1 #include "first.h"
3 /**
4 * the network chunk-API
7 */
9 #include "chunk.h"
10 #include "fdevent.h"
11 #include "log.h"
13 #include <sys/types.h>
14 #include <sys/stat.h>
15 #include "sys-mmap.h"
17 #include <stdlib.h>
18 #include <fcntl.h>
19 #include <unistd.h>
21 #include <errno.h>
22 #include <string.h>
24 /* default 1MB, upper limit 128MB */
25 #define DEFAULT_TEMPFILE_SIZE (1 * 1024 * 1024)
26 #define MAX_TEMPFILE_SIZE (128 * 1024 * 1024)
28 static size_t chunk_buf_sz = 4096;
29 static chunk *chunks, *chunks_oversized;
30 static chunk *chunk_buffers;
31 static array *chunkqueue_default_tempdirs = NULL;
32 static off_t chunkqueue_default_tempfile_size = DEFAULT_TEMPFILE_SIZE;
34 void chunkqueue_set_chunk_size (size_t sz)
36 chunk_buf_sz = sz > 0 ? ((sz + 1023) & ~1023uL) : 4096;
39 void chunkqueue_set_tempdirs_default_reset (void)
41 chunkqueue_default_tempdirs = NULL;
42 chunkqueue_default_tempfile_size = DEFAULT_TEMPFILE_SIZE;
45 chunkqueue *chunkqueue_init(void) {
46 chunkqueue *cq;
48 cq = calloc(1, sizeof(*cq));
49 force_assert(NULL != cq);
51 cq->first = NULL;
52 cq->last = NULL;
54 cq->tempdirs = chunkqueue_default_tempdirs;
55 cq->upload_temp_file_size = chunkqueue_default_tempfile_size;
57 return cq;
60 static chunk *chunk_init(size_t sz) {
61 chunk *c;
63 c = calloc(1, sizeof(*c));
64 force_assert(NULL != c);
66 c->type = MEM_CHUNK;
67 c->mem = buffer_init();
68 c->file.start = c->file.length = c->file.mmap.offset = 0;
69 c->file.fd = -1;
70 c->file.mmap.start = MAP_FAILED;
71 c->file.mmap.length = 0;
72 c->file.is_temp = 0;
73 c->offset = 0;
74 c->next = NULL;
76 buffer_string_prepare_copy(c->mem, sz-1);
78 return c;
81 static void chunk_reset_file_chunk(chunk *c) {
82 if (c->file.is_temp && !buffer_string_is_empty(c->mem)) {
83 unlink(c->mem->ptr);
85 if (c->file.fd != -1) {
86 close(c->file.fd);
87 c->file.fd = -1;
89 if (MAP_FAILED != c->file.mmap.start) {
90 munmap(c->file.mmap.start, c->file.mmap.length);
91 c->file.mmap.start = MAP_FAILED;
93 c->file.start = c->file.length = c->file.mmap.offset = 0;
94 c->file.mmap.length = 0;
95 c->file.is_temp = 0;
96 c->type = MEM_CHUNK;
99 static void chunk_reset(chunk *c) {
100 if (c->type == FILE_CHUNK) chunk_reset_file_chunk(c);
102 buffer_clear(c->mem);
103 c->offset = 0;
106 static void chunk_free(chunk *c) {
107 if (c->type == FILE_CHUNK) chunk_reset_file_chunk(c);
108 buffer_free(c->mem);
109 free(c);
112 buffer * chunk_buffer_acquire(void) {
113 chunk *c;
114 buffer *b;
115 if (chunks) {
116 c = chunks;
117 chunks = c->next;
119 else {
120 c = chunk_init(chunk_buf_sz);
122 c->next = chunk_buffers;
123 chunk_buffers = c;
124 b = c->mem;
125 c->mem = NULL;
126 return b;
129 void chunk_buffer_release(buffer *b) {
130 if (NULL == b) return;
131 if (b->size >= chunk_buf_sz && chunk_buffers) {
132 chunk *c = chunk_buffers;
133 chunk_buffers = c->next;
134 c->mem = b;
135 c->next = chunks;
136 chunks = c;
137 buffer_clear(b);
139 else {
140 buffer_free(b);
144 static chunk * chunk_acquire(size_t sz) {
145 if (sz <= chunk_buf_sz) {
146 if (chunks) {
147 chunk *c = chunks;
148 chunks = c->next;
149 return c;
151 sz = chunk_buf_sz;
153 else {
154 sz = (sz + 8191) & ~8191uL;
155 /* future: might have buckets of certain sizes, up to socket buf sizes*/
156 if (chunks_oversized && chunks_oversized->mem->size >= sz) {
157 chunk *c = chunks_oversized;
158 chunks_oversized = c->next;
159 return c;
163 return chunk_init(sz);
166 static void chunk_release(chunk *c) {
167 const size_t sz = c->mem->size;
168 if (sz == chunk_buf_sz) {
169 chunk_reset(c);
170 c->next = chunks;
171 chunks = c;
173 else if (sz > chunk_buf_sz) {
174 chunk_reset(c);
175 chunk **co = &chunks_oversized;
176 while (*co && sz < (*co)->mem->size) co = &(*co)->next;
177 c->next = *co;
178 *co = c;
180 else {
181 chunk_free(c);
185 void chunkqueue_chunk_pool_clear(void)
187 for (chunk *next, *c = chunks; c; c = next) {
188 next = c->next;
189 chunk_free(c);
191 chunks = NULL;
192 for (chunk *next, *c = chunks_oversized; c; c = next) {
193 next = c->next;
194 chunk_free(c);
196 chunks_oversized = NULL;
199 void chunkqueue_chunk_pool_free(void)
201 chunkqueue_chunk_pool_clear();
202 for (chunk *next, *c = chunk_buffers; c; c = next) {
203 next = c->next;
204 c->mem = buffer_init(); /*(chunk_reset() expects c->mem != NULL)*/
205 chunk_free(c);
207 chunk_buffers = NULL;
210 static off_t chunk_remaining_length(const chunk *c) {
211 /* MEM_CHUNK or FILE_CHUNK */
212 return (c->type == MEM_CHUNK
213 ? (off_t)buffer_string_length(c->mem)
214 : c->file.length)
215 - c->offset;
218 static void chunkqueue_release_chunks(chunkqueue *cq) {
219 cq->last = NULL;
220 for (chunk *c; (c = cq->first); ) {
221 cq->first = c->next;
222 chunk_release(c);
226 void chunkqueue_free(chunkqueue *cq) {
227 if (NULL == cq) return;
228 chunkqueue_release_chunks(cq);
229 free(cq);
232 static void chunkqueue_prepend_chunk(chunkqueue *cq, chunk *c) {
233 if (NULL == (c->next = cq->first)) cq->last = c;
234 cq->first = c;
237 static void chunkqueue_append_chunk(chunkqueue *cq, chunk *c) {
238 c->next = NULL;
239 *(cq->last ? &cq->last->next : &cq->first) = c;
240 cq->last = c;
243 static chunk * chunkqueue_prepend_mem_chunk(chunkqueue *cq, size_t sz) {
244 chunk *c = chunk_acquire(sz);
245 chunkqueue_prepend_chunk(cq, c);
246 return c;
249 static chunk * chunkqueue_append_mem_chunk(chunkqueue *cq, size_t sz) {
250 chunk *c = chunk_acquire(sz);
251 chunkqueue_append_chunk(cq, c);
252 return c;
255 static chunk * chunkqueue_append_file_chunk(chunkqueue *cq, buffer *fn, off_t offset, off_t len) {
256 chunk *c = chunk_acquire(buffer_string_length(fn)+1);
257 chunkqueue_append_chunk(cq, c);
258 c->type = FILE_CHUNK;
259 c->file.start = offset;
260 c->file.length = len;
261 cq->bytes_in += len;
262 buffer_copy_buffer(c->mem, fn);
263 return c;
266 void chunkqueue_reset(chunkqueue *cq) {
267 chunkqueue_release_chunks(cq);
268 cq->bytes_in = 0;
269 cq->bytes_out = 0;
270 cq->tempdir_idx = 0;
273 void chunkqueue_append_file_fd(chunkqueue *cq, buffer *fn, int fd, off_t offset, off_t len) {
274 if (len > 0) {
275 (chunkqueue_append_file_chunk(cq, fn, offset, len))->file.fd = fd;
277 else {
278 close(fd);
282 void chunkqueue_append_file(chunkqueue *cq, buffer *fn, off_t offset, off_t len) {
283 if (len > 0) {
284 chunkqueue_append_file_chunk(cq, fn, offset, len);
289 static int chunkqueue_append_mem_extend_chunk(chunkqueue *cq, const char *mem, size_t len) {
290 chunk *c = cq->last;
291 if (0 == len) return 1;
292 if (c != NULL && c->type == MEM_CHUNK
293 && buffer_string_space(c->mem) >= len) {
294 buffer_append_string_len(c->mem, mem, len);
295 cq->bytes_in += len;
296 return 1;
298 return 0;
302 void chunkqueue_append_buffer(chunkqueue *cq, buffer *mem) {
303 chunk *c;
304 size_t len = buffer_string_length(mem);
305 if (len < 256 && chunkqueue_append_mem_extend_chunk(cq, mem->ptr, len)) return;
307 c = chunkqueue_append_mem_chunk(cq, chunk_buf_sz);
308 cq->bytes_in += len;
309 buffer_move(c->mem, mem);
313 void chunkqueue_append_mem(chunkqueue *cq, const char * mem, size_t len) {
314 chunk *c;
315 if (len < chunk_buf_sz && chunkqueue_append_mem_extend_chunk(cq, mem, len))
316 return;
318 c = chunkqueue_append_mem_chunk(cq, len+1);
319 cq->bytes_in += len;
320 buffer_copy_string_len(c->mem, mem, len);
324 void chunkqueue_append_mem_min(chunkqueue *cq, const char * mem, size_t len) {
325 chunk *c;
326 if (len < chunk_buf_sz && chunkqueue_append_mem_extend_chunk(cq, mem, len))
327 return;
329 c = chunk_init(len+1);
330 chunkqueue_append_chunk(cq, c);
331 cq->bytes_in += len;
332 buffer_copy_string_len(c->mem, mem, len);
336 void chunkqueue_append_chunkqueue(chunkqueue *cq, chunkqueue *src) {
337 if (src == NULL || NULL == src->first) return;
339 if (NULL == cq->first) {
340 cq->first = src->first;
341 } else {
342 cq->last->next = src->first;
344 cq->last = src->last;
345 cq->bytes_in += (src->bytes_in - src->bytes_out);
347 src->first = NULL;
348 src->last = NULL;
349 src->bytes_out = src->bytes_in;
353 buffer * chunkqueue_prepend_buffer_open_sz(chunkqueue *cq, size_t sz) {
354 chunk * const c = chunkqueue_prepend_mem_chunk(cq, sz);
355 return c->mem;
359 buffer * chunkqueue_prepend_buffer_open(chunkqueue *cq) {
360 return chunkqueue_prepend_buffer_open_sz(cq, chunk_buf_sz);
364 void chunkqueue_prepend_buffer_commit(chunkqueue *cq) {
365 cq->bytes_in += buffer_string_length(cq->first->mem);
369 buffer * chunkqueue_append_buffer_open_sz(chunkqueue *cq, size_t sz) {
370 chunk * const c = chunkqueue_append_mem_chunk(cq, sz);
371 return c->mem;
375 buffer * chunkqueue_append_buffer_open(chunkqueue *cq) {
376 return chunkqueue_append_buffer_open_sz(cq, chunk_buf_sz);
380 void chunkqueue_append_buffer_commit(chunkqueue *cq) {
381 cq->bytes_in += buffer_string_length(cq->last->mem);
385 static void chunkqueue_remove_empty_chunks(chunkqueue *cq);
388 char * chunkqueue_get_memory(chunkqueue *cq, size_t *len) {
389 size_t sz = *len ? *len : (chunk_buf_sz >> 1);
390 buffer *b;
391 chunk *c = cq->last;
392 if (NULL != c && MEM_CHUNK == c->type) {
393 /* return pointer into existing buffer if large enough */
394 size_t avail = buffer_string_space(c->mem);
395 if (avail >= sz) {
396 *len = avail;
397 b = c->mem;
398 return b->ptr + buffer_string_length(b);
402 /* allocate new chunk */
403 b = chunkqueue_append_buffer_open_sz(cq, sz);
404 *len = buffer_string_space(b);
405 return b->ptr;
408 void chunkqueue_use_memory(chunkqueue *cq, size_t len) {
409 buffer *b;
411 force_assert(NULL != cq);
412 force_assert(NULL != cq->last && MEM_CHUNK == cq->last->type);
413 b = cq->last->mem;
415 if (len > 0) {
416 buffer_commit(b, len);
417 cq->bytes_in += len;
418 } else if (buffer_string_is_empty(b)) {
419 /* scan chunkqueue to remove empty last chunk
420 * (generally not expecting a deep queue) */
421 chunkqueue_remove_empty_chunks(cq);
425 void chunkqueue_set_tempdirs_default (array *tempdirs, off_t upload_temp_file_size) {
426 chunkqueue_default_tempdirs = tempdirs;
427 chunkqueue_default_tempfile_size
428 = (0 == upload_temp_file_size) ? DEFAULT_TEMPFILE_SIZE
429 : (upload_temp_file_size > MAX_TEMPFILE_SIZE) ? MAX_TEMPFILE_SIZE
430 : upload_temp_file_size;
433 void chunkqueue_set_tempdirs(chunkqueue *cq, array *tempdirs, off_t upload_temp_file_size) {
434 force_assert(NULL != cq);
435 cq->tempdirs = tempdirs;
436 cq->upload_temp_file_size
437 = (0 == upload_temp_file_size) ? DEFAULT_TEMPFILE_SIZE
438 : (upload_temp_file_size > MAX_TEMPFILE_SIZE) ? MAX_TEMPFILE_SIZE
439 : upload_temp_file_size;
440 cq->tempdir_idx = 0;
443 void chunkqueue_steal(chunkqueue *dest, chunkqueue *src, off_t len) {
444 while (len > 0) {
445 chunk *c = src->first;
446 off_t clen = 0, use;
448 if (NULL == c) break;
450 clen = chunk_remaining_length(c);
451 if (0 == clen) {
452 /* drop empty chunk */
453 src->first = c->next;
454 if (c == src->last) src->last = NULL;
455 chunk_release(c);
456 continue;
459 use = len >= clen ? clen : len;
460 len -= use;
462 if (use == clen) {
463 /* move complete chunk */
464 src->first = c->next;
465 if (c == src->last) src->last = NULL;
467 chunkqueue_append_chunk(dest, c);
468 dest->bytes_in += use;
469 } else {
470 /* partial chunk with length "use" */
472 switch (c->type) {
473 case MEM_CHUNK:
474 chunkqueue_append_mem(dest, c->mem->ptr + c->offset, use);
475 break;
476 case FILE_CHUNK:
477 /* tempfile flag is in "last" chunk after the split */
478 chunkqueue_append_file(dest, c->mem, c->file.start + c->offset, use);
479 break;
482 c->offset += use;
483 force_assert(0 == len);
486 src->bytes_out += use;
490 static chunk *chunkqueue_get_append_tempfile(server *srv, chunkqueue *cq) {
491 chunk *c;
492 buffer *template = buffer_init_string("/var/tmp/lighttpd-upload-XXXXXX");
493 int fd = -1;
495 if (cq->tempdirs && cq->tempdirs->used) {
496 /* we have several tempdirs, only if all of them fail we jump out */
498 for (errno = EIO; cq->tempdir_idx < cq->tempdirs->used; ++cq->tempdir_idx) {
499 data_string *ds = (data_string *)cq->tempdirs->data[cq->tempdir_idx];
501 buffer_copy_buffer(template, ds->value);
502 buffer_append_path_len(template, CONST_STR_LEN("lighttpd-upload-XXXXXX"));
503 if (-1 != (fd = fdevent_mkstemp_append(template->ptr))) break;
505 } else {
506 fd = fdevent_mkstemp_append(template->ptr);
509 if (fd < 0) {
510 /* (report only the last error to mkstemp()
511 * if multiple temp dirs attempted) */
512 log_error_write(srv, __FILE__, __LINE__, "sbs",
513 "opening temp-file failed:",
514 template, strerror(errno));
515 buffer_free(template);
516 return NULL;
519 c = chunkqueue_append_file_chunk(cq, template, 0, 0);
520 c->file.fd = fd;
521 c->file.is_temp = 1;
523 buffer_free(template);
525 return c;
528 int chunkqueue_append_mem_to_tempfile(server *srv, chunkqueue *dest, const char *mem, size_t len) {
529 chunk *dst_c;
530 ssize_t written;
532 do {
534 * if the last chunk is
535 * - smaller than dest->upload_temp_file_size
536 * - not read yet (offset == 0)
537 * -> append to it (so it might actually become larger than dest->upload_temp_file_size)
538 * otherwise
539 * -> create a new chunk
541 * */
543 dst_c = dest->last;
544 if (NULL != dst_c
545 && FILE_CHUNK == dst_c->type
546 && dst_c->file.is_temp
547 && dst_c->file.fd >= 0
548 && 0 == dst_c->offset) {
549 /* ok, take the last chunk for our job */
551 if (dst_c->file.length >= (off_t)dest->upload_temp_file_size) {
552 /* the chunk is too large now, close it */
553 int rc = close(dst_c->file.fd);
554 dst_c->file.fd = -1;
555 if (0 != rc) {
556 log_error_write(srv, __FILE__, __LINE__, "sbss",
557 "close() temp-file", dst_c->mem, "failed:",
558 strerror(errno));
559 return -1;
561 dst_c = NULL;
563 } else {
564 dst_c = NULL;
567 if (NULL == dst_c && NULL == (dst_c = chunkqueue_get_append_tempfile(srv, dest))) {
568 return -1;
570 #ifdef __COVERITY__
571 if (dst_c->file.fd < 0) return -1;
572 #endif
574 /* (dst_c->file.fd >= 0) */
575 /* coverity[negative_returns : FALSE] */
576 written = write(dst_c->file.fd, mem, len);
578 if ((size_t) written == len) {
579 dst_c->file.length += len;
580 dest->bytes_in += len;
582 return 0;
583 } else if (written >= 0) {
584 /*(assume EINTR if partial write and retry write();
585 * retry write() might fail with ENOSPC if no more space on volume)*/
586 dest->bytes_in += written;
587 mem += written;
588 len -= (size_t)written;
589 dst_c->file.length += (size_t)written;
590 /* continue; retry */
591 } else if (errno == EINTR) {
592 /* continue; retry */
593 } else {
594 int retry = (errno == ENOSPC && dest->tempdirs && ++dest->tempdir_idx < dest->tempdirs->used);
595 if (!retry) {
596 log_error_write(srv, __FILE__, __LINE__, "sbs",
597 "write() temp-file", dst_c->mem, "failed:",
598 strerror(errno));
601 if (0 == chunk_remaining_length(dst_c)) {
602 /*(remove empty chunk and unlink tempfile)*/
603 chunkqueue_remove_empty_chunks(dest);
604 } else {/*(close tempfile; avoid later attempts to append)*/
605 int rc = close(dst_c->file.fd);
606 dst_c->file.fd = -1;
607 if (0 != rc) {
608 log_error_write(srv, __FILE__, __LINE__, "sbss",
609 "close() temp-file", dst_c->mem, "failed:",
610 strerror(errno));
611 return -1;
614 if (!retry) break; /* return -1; */
616 /* continue; retry */
619 } while (dst_c);
621 return -1;
624 int chunkqueue_steal_with_tempfiles(server *srv, chunkqueue *dest, chunkqueue *src, off_t len) {
625 while (len > 0) {
626 chunk *c = src->first;
627 off_t clen = 0, use;
629 if (NULL == c) break;
631 clen = chunk_remaining_length(c);
632 if (0 == clen) {
633 /* drop empty chunk */
634 src->first = c->next;
635 if (c == src->last) src->last = NULL;
636 chunk_release(c);
637 continue;
640 use = (len >= clen) ? clen : len;
641 len -= use;
643 switch (c->type) {
644 case FILE_CHUNK:
645 if (use == clen) {
646 /* move complete chunk */
647 src->first = c->next;
648 if (c == src->last) src->last = NULL;
649 chunkqueue_append_chunk(dest, c);
650 dest->bytes_in += use;
651 } else {
652 /* partial chunk with length "use" */
653 /* tempfile flag is in "last" chunk after the split */
654 chunkqueue_append_file(dest, c->mem, c->file.start + c->offset, use);
656 c->offset += use;
657 force_assert(0 == len);
659 break;
661 case MEM_CHUNK:
662 /* store "use" bytes from memory chunk in tempfile */
663 if (0 != chunkqueue_append_mem_to_tempfile(srv, dest, c->mem->ptr + c->offset, use)) {
664 return -1;
667 if (use == clen) {
668 /* finished chunk */
669 src->first = c->next;
670 if (c == src->last) src->last = NULL;
671 chunk_release(c);
672 } else {
673 /* partial chunk */
674 c->offset += use;
675 force_assert(0 == len);
677 break;
680 src->bytes_out += use;
683 return 0;
686 off_t chunkqueue_length(chunkqueue *cq) {
687 off_t len = 0;
688 chunk *c;
690 for (c = cq->first; c; c = c->next) {
691 len += chunk_remaining_length(c);
694 return len;
697 void chunkqueue_mark_written(chunkqueue *cq, off_t len) {
698 cq->bytes_out += len;
700 for (chunk *c; (c = cq->first); ) {
701 off_t c_len = chunk_remaining_length(c);
702 if (len >= c_len) { /* chunk got finished */
703 len -= c_len;
704 cq->first = c->next;
705 chunk_release(c);
706 if (0 == len) break;
708 else { /* partial chunk */
709 c->offset += len;
710 return; /* chunk not finished */
714 if (NULL == cq->first)
715 cq->last = NULL;
716 else
717 chunkqueue_remove_finished_chunks(cq);
720 void chunkqueue_remove_finished_chunks(chunkqueue *cq) {
721 for (chunk *c; (c = cq->first) && 0 == chunk_remaining_length(c); ){
722 if (NULL == (cq->first = c->next)) cq->last = NULL;
723 chunk_release(c);
727 static void chunkqueue_remove_empty_chunks(chunkqueue *cq) {
728 chunk *c;
729 chunkqueue_remove_finished_chunks(cq);
731 for (c = cq->first; c && c->next; c = c->next) {
732 if (0 == chunk_remaining_length(c->next)) {
733 chunk *empty = c->next;
734 c->next = empty->next;
735 if (empty == cq->last) cq->last = c;
736 chunk_release(empty);
741 int chunkqueue_open_file_chunk(server *srv, chunkqueue *cq) {
742 chunk* const c = cq->first;
743 off_t offset, toSend;
744 struct stat st;
746 force_assert(NULL != c);
747 force_assert(FILE_CHUNK == c->type);
748 force_assert(c->offset >= 0 && c->offset <= c->file.length);
750 offset = c->file.start + c->offset;
751 toSend = c->file.length - c->offset;
753 if (-1 == c->file.fd) {
754 /* (permit symlinks; should already have been checked. However, TOC-TOU remains) */
755 if (-1 == (c->file.fd = fdevent_open_cloexec(c->mem->ptr, 1, O_RDONLY, 0))) {
756 log_error_write(srv, __FILE__, __LINE__, "ssb", "open failed:", strerror(errno), c->mem);
757 return -1;
761 /*(skip file size checks if file is temp file created by lighttpd)*/
762 if (c->file.is_temp) return 0;
764 if (-1 == fstat(c->file.fd, &st)) {
765 log_error_write(srv, __FILE__, __LINE__, "ss", "fstat failed:", strerror(errno));
766 return -1;
769 if (offset > st.st_size || toSend > st.st_size || offset > st.st_size - toSend) {
770 log_error_write(srv, __FILE__, __LINE__, "sb", "file shrunk:", c->mem);
771 return -1;
774 return 0;