4 * the network chunk-API
13 #include <sys/types.h>
24 /* default 1MB, upper limit 128MB */
25 #define DEFAULT_TEMPFILE_SIZE (1 * 1024 * 1024)
26 #define MAX_TEMPFILE_SIZE (128 * 1024 * 1024)
28 static size_t chunk_buf_sz
= 4096;
29 static chunk
*chunks
, *chunks_oversized
;
30 static chunk
*chunk_buffers
;
31 static array
*chunkqueue_default_tempdirs
= NULL
;
32 static off_t chunkqueue_default_tempfile_size
= DEFAULT_TEMPFILE_SIZE
;
34 void chunkqueue_set_chunk_size (size_t sz
)
36 chunk_buf_sz
= sz
> 0 ? ((sz
+ 1023) & ~1023uL) : 4096;
39 void chunkqueue_set_tempdirs_default_reset (void)
41 chunkqueue_default_tempdirs
= NULL
;
42 chunkqueue_default_tempfile_size
= DEFAULT_TEMPFILE_SIZE
;
45 chunkqueue
*chunkqueue_init(void) {
48 cq
= calloc(1, sizeof(*cq
));
49 force_assert(NULL
!= cq
);
54 cq
->tempdirs
= chunkqueue_default_tempdirs
;
55 cq
->upload_temp_file_size
= chunkqueue_default_tempfile_size
;
60 static chunk
*chunk_init(size_t sz
) {
63 c
= calloc(1, sizeof(*c
));
64 force_assert(NULL
!= c
);
67 c
->mem
= buffer_init();
68 c
->file
.start
= c
->file
.length
= c
->file
.mmap
.offset
= 0;
70 c
->file
.mmap
.start
= MAP_FAILED
;
71 c
->file
.mmap
.length
= 0;
76 buffer_string_prepare_copy(c
->mem
, sz
-1);
81 static void chunk_reset_file_chunk(chunk
*c
) {
82 if (c
->file
.is_temp
&& !buffer_string_is_empty(c
->mem
)) {
85 if (c
->file
.fd
!= -1) {
89 if (MAP_FAILED
!= c
->file
.mmap
.start
) {
90 munmap(c
->file
.mmap
.start
, c
->file
.mmap
.length
);
91 c
->file
.mmap
.start
= MAP_FAILED
;
93 c
->file
.start
= c
->file
.length
= c
->file
.mmap
.offset
= 0;
94 c
->file
.mmap
.length
= 0;
99 static void chunk_reset(chunk
*c
) {
100 if (c
->type
== FILE_CHUNK
) chunk_reset_file_chunk(c
);
102 buffer_clear(c
->mem
);
106 static void chunk_free(chunk
*c
) {
107 if (c
->type
== FILE_CHUNK
) chunk_reset_file_chunk(c
);
112 buffer
* chunk_buffer_acquire(void) {
120 c
= chunk_init(chunk_buf_sz
);
122 c
->next
= chunk_buffers
;
129 void chunk_buffer_release(buffer
*b
) {
130 if (NULL
== b
) return;
131 if (b
->size
>= chunk_buf_sz
&& chunk_buffers
) {
132 chunk
*c
= chunk_buffers
;
133 chunk_buffers
= c
->next
;
144 static chunk
* chunk_acquire(size_t sz
) {
145 if (sz
<= chunk_buf_sz
) {
154 sz
= (sz
+ 8191) & ~8191uL;
155 /* future: might have buckets of certain sizes, up to socket buf sizes*/
156 if (chunks_oversized
&& chunks_oversized
->mem
->size
>= sz
) {
157 chunk
*c
= chunks_oversized
;
158 chunks_oversized
= c
->next
;
163 return chunk_init(sz
);
166 static void chunk_release(chunk
*c
) {
167 const size_t sz
= c
->mem
->size
;
168 if (sz
== chunk_buf_sz
) {
173 else if (sz
> chunk_buf_sz
) {
175 chunk
**co
= &chunks_oversized
;
176 while (*co
&& sz
< (*co
)->mem
->size
) co
= &(*co
)->next
;
185 void chunkqueue_chunk_pool_clear(void)
187 for (chunk
*next
, *c
= chunks
; c
; c
= next
) {
192 for (chunk
*next
, *c
= chunks_oversized
; c
; c
= next
) {
196 chunks_oversized
= NULL
;
199 void chunkqueue_chunk_pool_free(void)
201 chunkqueue_chunk_pool_clear();
202 for (chunk
*next
, *c
= chunk_buffers
; c
; c
= next
) {
204 c
->mem
= buffer_init(); /*(chunk_reset() expects c->mem != NULL)*/
207 chunk_buffers
= NULL
;
210 static off_t
chunk_remaining_length(const chunk
*c
) {
211 /* MEM_CHUNK or FILE_CHUNK */
212 return (c
->type
== MEM_CHUNK
213 ? (off_t
)buffer_string_length(c
->mem
)
218 static void chunkqueue_release_chunks(chunkqueue
*cq
) {
220 for (chunk
*c
; (c
= cq
->first
); ) {
226 void chunkqueue_free(chunkqueue
*cq
) {
227 if (NULL
== cq
) return;
228 chunkqueue_release_chunks(cq
);
232 static void chunkqueue_prepend_chunk(chunkqueue
*cq
, chunk
*c
) {
233 if (NULL
== (c
->next
= cq
->first
)) cq
->last
= c
;
237 static void chunkqueue_append_chunk(chunkqueue
*cq
, chunk
*c
) {
239 *(cq
->last
? &cq
->last
->next
: &cq
->first
) = c
;
243 static chunk
* chunkqueue_prepend_mem_chunk(chunkqueue
*cq
, size_t sz
) {
244 chunk
*c
= chunk_acquire(sz
);
245 chunkqueue_prepend_chunk(cq
, c
);
249 static chunk
* chunkqueue_append_mem_chunk(chunkqueue
*cq
, size_t sz
) {
250 chunk
*c
= chunk_acquire(sz
);
251 chunkqueue_append_chunk(cq
, c
);
255 static chunk
* chunkqueue_append_file_chunk(chunkqueue
*cq
, buffer
*fn
, off_t offset
, off_t len
) {
256 chunk
*c
= chunk_acquire(buffer_string_length(fn
)+1);
257 chunkqueue_append_chunk(cq
, c
);
258 c
->type
= FILE_CHUNK
;
259 c
->file
.start
= offset
;
260 c
->file
.length
= len
;
262 buffer_copy_buffer(c
->mem
, fn
);
266 void chunkqueue_reset(chunkqueue
*cq
) {
267 chunkqueue_release_chunks(cq
);
273 void chunkqueue_append_file_fd(chunkqueue
*cq
, buffer
*fn
, int fd
, off_t offset
, off_t len
) {
275 (chunkqueue_append_file_chunk(cq
, fn
, offset
, len
))->file
.fd
= fd
;
282 void chunkqueue_append_file(chunkqueue
*cq
, buffer
*fn
, off_t offset
, off_t len
) {
284 chunkqueue_append_file_chunk(cq
, fn
, offset
, len
);
289 static int chunkqueue_append_mem_extend_chunk(chunkqueue
*cq
, const char *mem
, size_t len
) {
291 if (0 == len
) return 1;
292 if (c
!= NULL
&& c
->type
== MEM_CHUNK
293 && buffer_string_space(c
->mem
) >= len
) {
294 buffer_append_string_len(c
->mem
, mem
, len
);
302 void chunkqueue_append_buffer(chunkqueue
*cq
, buffer
*mem
) {
304 size_t len
= buffer_string_length(mem
);
305 if (len
< 256 && chunkqueue_append_mem_extend_chunk(cq
, mem
->ptr
, len
)) return;
307 c
= chunkqueue_append_mem_chunk(cq
, chunk_buf_sz
);
309 buffer_move(c
->mem
, mem
);
313 void chunkqueue_append_mem(chunkqueue
*cq
, const char * mem
, size_t len
) {
315 if (len
< chunk_buf_sz
&& chunkqueue_append_mem_extend_chunk(cq
, mem
, len
))
318 c
= chunkqueue_append_mem_chunk(cq
, len
+1);
320 buffer_copy_string_len(c
->mem
, mem
, len
);
324 void chunkqueue_append_mem_min(chunkqueue
*cq
, const char * mem
, size_t len
) {
326 if (len
< chunk_buf_sz
&& chunkqueue_append_mem_extend_chunk(cq
, mem
, len
))
329 c
= chunk_init(len
+1);
330 chunkqueue_append_chunk(cq
, c
);
332 buffer_copy_string_len(c
->mem
, mem
, len
);
336 void chunkqueue_append_chunkqueue(chunkqueue
*cq
, chunkqueue
*src
) {
337 if (src
== NULL
|| NULL
== src
->first
) return;
339 if (NULL
== cq
->first
) {
340 cq
->first
= src
->first
;
342 cq
->last
->next
= src
->first
;
344 cq
->last
= src
->last
;
345 cq
->bytes_in
+= (src
->bytes_in
- src
->bytes_out
);
349 src
->bytes_out
= src
->bytes_in
;
353 buffer
* chunkqueue_prepend_buffer_open_sz(chunkqueue
*cq
, size_t sz
) {
354 chunk
* const c
= chunkqueue_prepend_mem_chunk(cq
, sz
);
359 buffer
* chunkqueue_prepend_buffer_open(chunkqueue
*cq
) {
360 return chunkqueue_prepend_buffer_open_sz(cq
, chunk_buf_sz
);
364 void chunkqueue_prepend_buffer_commit(chunkqueue
*cq
) {
365 cq
->bytes_in
+= buffer_string_length(cq
->first
->mem
);
369 buffer
* chunkqueue_append_buffer_open_sz(chunkqueue
*cq
, size_t sz
) {
370 chunk
* const c
= chunkqueue_append_mem_chunk(cq
, sz
);
375 buffer
* chunkqueue_append_buffer_open(chunkqueue
*cq
) {
376 return chunkqueue_append_buffer_open_sz(cq
, chunk_buf_sz
);
380 void chunkqueue_append_buffer_commit(chunkqueue
*cq
) {
381 cq
->bytes_in
+= buffer_string_length(cq
->last
->mem
);
385 static void chunkqueue_remove_empty_chunks(chunkqueue
*cq
);
388 char * chunkqueue_get_memory(chunkqueue
*cq
, size_t *len
) {
389 size_t sz
= *len
? *len
: (chunk_buf_sz
>> 1);
392 if (NULL
!= c
&& MEM_CHUNK
== c
->type
) {
393 /* return pointer into existing buffer if large enough */
394 size_t avail
= buffer_string_space(c
->mem
);
398 return b
->ptr
+ buffer_string_length(b
);
402 /* allocate new chunk */
403 b
= chunkqueue_append_buffer_open_sz(cq
, sz
);
404 *len
= buffer_string_space(b
);
408 void chunkqueue_use_memory(chunkqueue
*cq
, size_t len
) {
411 force_assert(NULL
!= cq
);
412 force_assert(NULL
!= cq
->last
&& MEM_CHUNK
== cq
->last
->type
);
416 buffer_commit(b
, len
);
418 } else if (buffer_string_is_empty(b
)) {
419 /* scan chunkqueue to remove empty last chunk
420 * (generally not expecting a deep queue) */
421 chunkqueue_remove_empty_chunks(cq
);
425 void chunkqueue_set_tempdirs_default (array
*tempdirs
, off_t upload_temp_file_size
) {
426 chunkqueue_default_tempdirs
= tempdirs
;
427 chunkqueue_default_tempfile_size
428 = (0 == upload_temp_file_size
) ? DEFAULT_TEMPFILE_SIZE
429 : (upload_temp_file_size
> MAX_TEMPFILE_SIZE
) ? MAX_TEMPFILE_SIZE
430 : upload_temp_file_size
;
433 void chunkqueue_set_tempdirs(chunkqueue
*cq
, array
*tempdirs
, off_t upload_temp_file_size
) {
434 force_assert(NULL
!= cq
);
435 cq
->tempdirs
= tempdirs
;
436 cq
->upload_temp_file_size
437 = (0 == upload_temp_file_size
) ? DEFAULT_TEMPFILE_SIZE
438 : (upload_temp_file_size
> MAX_TEMPFILE_SIZE
) ? MAX_TEMPFILE_SIZE
439 : upload_temp_file_size
;
443 void chunkqueue_steal(chunkqueue
*dest
, chunkqueue
*src
, off_t len
) {
445 chunk
*c
= src
->first
;
448 if (NULL
== c
) break;
450 clen
= chunk_remaining_length(c
);
452 /* drop empty chunk */
453 src
->first
= c
->next
;
454 if (c
== src
->last
) src
->last
= NULL
;
459 use
= len
>= clen
? clen
: len
;
463 /* move complete chunk */
464 src
->first
= c
->next
;
465 if (c
== src
->last
) src
->last
= NULL
;
467 chunkqueue_append_chunk(dest
, c
);
468 dest
->bytes_in
+= use
;
470 /* partial chunk with length "use" */
474 chunkqueue_append_mem(dest
, c
->mem
->ptr
+ c
->offset
, use
);
477 /* tempfile flag is in "last" chunk after the split */
478 chunkqueue_append_file(dest
, c
->mem
, c
->file
.start
+ c
->offset
, use
);
483 force_assert(0 == len
);
486 src
->bytes_out
+= use
;
490 static chunk
*chunkqueue_get_append_tempfile(server
*srv
, chunkqueue
*cq
) {
492 buffer
*template = buffer_init_string("/var/tmp/lighttpd-upload-XXXXXX");
495 if (cq
->tempdirs
&& cq
->tempdirs
->used
) {
496 /* we have several tempdirs, only if all of them fail we jump out */
498 for (errno
= EIO
; cq
->tempdir_idx
< cq
->tempdirs
->used
; ++cq
->tempdir_idx
) {
499 data_string
*ds
= (data_string
*)cq
->tempdirs
->data
[cq
->tempdir_idx
];
501 buffer_copy_buffer(template, ds
->value
);
502 buffer_append_path_len(template, CONST_STR_LEN("lighttpd-upload-XXXXXX"));
503 if (-1 != (fd
= fdevent_mkstemp_append(template->ptr
))) break;
506 fd
= fdevent_mkstemp_append(template->ptr
);
510 /* (report only the last error to mkstemp()
511 * if multiple temp dirs attempted) */
512 log_error_write(srv
, __FILE__
, __LINE__
, "sbs",
513 "opening temp-file failed:",
514 template, strerror(errno
));
515 buffer_free(template);
519 c
= chunkqueue_append_file_chunk(cq
, template, 0, 0);
523 buffer_free(template);
528 int chunkqueue_append_mem_to_tempfile(server
*srv
, chunkqueue
*dest
, const char *mem
, size_t len
) {
534 * if the last chunk is
535 * - smaller than dest->upload_temp_file_size
536 * - not read yet (offset == 0)
537 * -> append to it (so it might actually become larger than dest->upload_temp_file_size)
539 * -> create a new chunk
545 && FILE_CHUNK
== dst_c
->type
546 && dst_c
->file
.is_temp
547 && dst_c
->file
.fd
>= 0
548 && 0 == dst_c
->offset
) {
549 /* ok, take the last chunk for our job */
551 if (dst_c
->file
.length
>= (off_t
)dest
->upload_temp_file_size
) {
552 /* the chunk is too large now, close it */
553 int rc
= close(dst_c
->file
.fd
);
556 log_error_write(srv
, __FILE__
, __LINE__
, "sbss",
557 "close() temp-file", dst_c
->mem
, "failed:",
567 if (NULL
== dst_c
&& NULL
== (dst_c
= chunkqueue_get_append_tempfile(srv
, dest
))) {
571 if (dst_c
->file
.fd
< 0) return -1;
574 /* (dst_c->file.fd >= 0) */
575 /* coverity[negative_returns : FALSE] */
576 written
= write(dst_c
->file
.fd
, mem
, len
);
578 if ((size_t) written
== len
) {
579 dst_c
->file
.length
+= len
;
580 dest
->bytes_in
+= len
;
583 } else if (written
>= 0) {
584 /*(assume EINTR if partial write and retry write();
585 * retry write() might fail with ENOSPC if no more space on volume)*/
586 dest
->bytes_in
+= written
;
588 len
-= (size_t)written
;
589 dst_c
->file
.length
+= (size_t)written
;
590 /* continue; retry */
591 } else if (errno
== EINTR
) {
592 /* continue; retry */
594 int retry
= (errno
== ENOSPC
&& dest
->tempdirs
&& ++dest
->tempdir_idx
< dest
->tempdirs
->used
);
596 log_error_write(srv
, __FILE__
, __LINE__
, "sbs",
597 "write() temp-file", dst_c
->mem
, "failed:",
601 if (0 == chunk_remaining_length(dst_c
)) {
602 /*(remove empty chunk and unlink tempfile)*/
603 chunkqueue_remove_empty_chunks(dest
);
604 } else {/*(close tempfile; avoid later attempts to append)*/
605 int rc
= close(dst_c
->file
.fd
);
608 log_error_write(srv
, __FILE__
, __LINE__
, "sbss",
609 "close() temp-file", dst_c
->mem
, "failed:",
614 if (!retry
) break; /* return -1; */
616 /* continue; retry */
624 int chunkqueue_steal_with_tempfiles(server
*srv
, chunkqueue
*dest
, chunkqueue
*src
, off_t len
) {
626 chunk
*c
= src
->first
;
629 if (NULL
== c
) break;
631 clen
= chunk_remaining_length(c
);
633 /* drop empty chunk */
634 src
->first
= c
->next
;
635 if (c
== src
->last
) src
->last
= NULL
;
640 use
= (len
>= clen
) ? clen
: len
;
646 /* move complete chunk */
647 src
->first
= c
->next
;
648 if (c
== src
->last
) src
->last
= NULL
;
649 chunkqueue_append_chunk(dest
, c
);
650 dest
->bytes_in
+= use
;
652 /* partial chunk with length "use" */
653 /* tempfile flag is in "last" chunk after the split */
654 chunkqueue_append_file(dest
, c
->mem
, c
->file
.start
+ c
->offset
, use
);
657 force_assert(0 == len
);
662 /* store "use" bytes from memory chunk in tempfile */
663 if (0 != chunkqueue_append_mem_to_tempfile(srv
, dest
, c
->mem
->ptr
+ c
->offset
, use
)) {
669 src
->first
= c
->next
;
670 if (c
== src
->last
) src
->last
= NULL
;
675 force_assert(0 == len
);
680 src
->bytes_out
+= use
;
686 off_t
chunkqueue_length(chunkqueue
*cq
) {
690 for (c
= cq
->first
; c
; c
= c
->next
) {
691 len
+= chunk_remaining_length(c
);
697 void chunkqueue_mark_written(chunkqueue
*cq
, off_t len
) {
698 cq
->bytes_out
+= len
;
700 for (chunk
*c
; (c
= cq
->first
); ) {
701 off_t c_len
= chunk_remaining_length(c
);
702 if (len
>= c_len
) { /* chunk got finished */
708 else { /* partial chunk */
710 return; /* chunk not finished */
714 if (NULL
== cq
->first
)
717 chunkqueue_remove_finished_chunks(cq
);
720 void chunkqueue_remove_finished_chunks(chunkqueue
*cq
) {
721 for (chunk
*c
; (c
= cq
->first
) && 0 == chunk_remaining_length(c
); ){
722 if (NULL
== (cq
->first
= c
->next
)) cq
->last
= NULL
;
727 static void chunkqueue_remove_empty_chunks(chunkqueue
*cq
) {
729 chunkqueue_remove_finished_chunks(cq
);
731 for (c
= cq
->first
; c
&& c
->next
; c
= c
->next
) {
732 if (0 == chunk_remaining_length(c
->next
)) {
733 chunk
*empty
= c
->next
;
734 c
->next
= empty
->next
;
735 if (empty
== cq
->last
) cq
->last
= c
;
736 chunk_release(empty
);
741 int chunkqueue_open_file_chunk(server
*srv
, chunkqueue
*cq
) {
742 chunk
* const c
= cq
->first
;
743 off_t offset
, toSend
;
746 force_assert(NULL
!= c
);
747 force_assert(FILE_CHUNK
== c
->type
);
748 force_assert(c
->offset
>= 0 && c
->offset
<= c
->file
.length
);
750 offset
= c
->file
.start
+ c
->offset
;
751 toSend
= c
->file
.length
- c
->offset
;
753 if (-1 == c
->file
.fd
) {
754 /* (permit symlinks; should already have been checked. However, TOC-TOU remains) */
755 if (-1 == (c
->file
.fd
= fdevent_open_cloexec(c
->mem
->ptr
, 1, O_RDONLY
, 0))) {
756 log_error_write(srv
, __FILE__
, __LINE__
, "ssb", "open failed:", strerror(errno
), c
->mem
);
761 /*(skip file size checks if file is temp file created by lighttpd)*/
762 if (c
->file
.is_temp
) return 0;
764 if (-1 == fstat(c
->file
.fd
, &st
)) {
765 log_error_write(srv
, __FILE__
, __LINE__
, "ss", "fstat failed:", strerror(errno
));
769 if (offset
> st
.st_size
|| toSend
> st
.st_size
|| offset
> st
.st_size
- toSend
) {
770 log_error_write(srv
, __FILE__
, __LINE__
, "sb", "file shrunk:", c
->mem
);