4 * the network chunk-API
13 #include <sys/types.h>
24 /* default 1MB, upper limit 128MB */
25 #define DEFAULT_TEMPFILE_SIZE (1 * 1024 * 1024)
26 #define MAX_TEMPFILE_SIZE (128 * 1024 * 1024)
28 static size_t chunk_buf_sz
= 4096;
30 static chunk
*chunk_buffers
;
31 static array
*chunkqueue_default_tempdirs
= NULL
;
32 static unsigned int chunkqueue_default_tempfile_size
= DEFAULT_TEMPFILE_SIZE
;
34 void chunkqueue_set_chunk_size (size_t sz
)
36 chunk_buf_sz
= sz
> 0 ? ((sz
+ 1023) & ~1023uL) : 4096;
39 void chunkqueue_set_tempdirs_default_reset (void)
41 chunkqueue_default_tempdirs
= NULL
;
42 chunkqueue_default_tempfile_size
= DEFAULT_TEMPFILE_SIZE
;
45 chunkqueue
*chunkqueue_init(void) {
48 cq
= calloc(1, sizeof(*cq
));
49 force_assert(NULL
!= cq
);
54 cq
->tempdirs
= chunkqueue_default_tempdirs
;
55 cq
->upload_temp_file_size
= chunkqueue_default_tempfile_size
;
60 static chunk
*chunk_init(size_t sz
) {
63 c
= calloc(1, sizeof(*c
));
64 force_assert(NULL
!= c
);
67 c
->mem
= buffer_init();
68 c
->file
.start
= c
->file
.length
= c
->file
.mmap
.offset
= 0;
70 c
->file
.mmap
.start
= MAP_FAILED
;
71 c
->file
.mmap
.length
= 0;
76 buffer_string_prepare_copy(c
->mem
, sz
-1);
81 static void chunk_reset_file_chunk(chunk
*c
) {
82 if (c
->file
.is_temp
&& !buffer_string_is_empty(c
->mem
)) {
85 if (c
->file
.fd
!= -1) {
89 if (MAP_FAILED
!= c
->file
.mmap
.start
) {
90 munmap(c
->file
.mmap
.start
, c
->file
.mmap
.length
);
91 c
->file
.mmap
.start
= MAP_FAILED
;
93 c
->file
.start
= c
->file
.length
= c
->file
.mmap
.offset
= 0;
94 c
->file
.mmap
.length
= 0;
99 static void chunk_reset(chunk
*c
) {
100 if (c
->type
== FILE_CHUNK
) chunk_reset_file_chunk(c
);
102 buffer_clear(c
->mem
);
106 static void chunk_free(chunk
*c
) {
107 if (c
->type
== FILE_CHUNK
) chunk_reset_file_chunk(c
);
112 buffer
* chunk_buffer_acquire(void) {
120 c
= chunk_init(chunk_buf_sz
);
122 c
->next
= chunk_buffers
;
129 void chunk_buffer_release(buffer
*b
) {
130 if (NULL
== b
) return;
131 if (b
->size
>= chunk_buf_sz
&& chunk_buffers
) {
132 chunk
*c
= chunk_buffers
;
133 chunk_buffers
= c
->next
;
144 static chunk
* chunk_acquire(void) {
151 return chunk_init(chunk_buf_sz
);
155 static void chunk_release(chunk
*c
) {
156 if (c
->mem
->size
>= chunk_buf_sz
) {
166 void chunkqueue_chunk_pool_clear(void)
168 for (chunk
*next
, *c
= chunks
; c
; c
= next
) {
175 void chunkqueue_chunk_pool_free(void)
177 chunkqueue_chunk_pool_clear();
178 for (chunk
*next
, *c
= chunk_buffers
; c
; c
= next
) {
180 c
->mem
= buffer_init(); /*(chunk_reset() expects c->mem != NULL)*/
183 chunk_buffers
= NULL
;
186 static off_t
chunk_remaining_length(const chunk
*c
) {
190 len
= buffer_string_length(c
->mem
);
193 len
= c
->file
.length
;
196 force_assert(c
->type
== MEM_CHUNK
|| c
->type
== FILE_CHUNK
);
199 force_assert(c
->offset
<= len
);
200 return len
- c
->offset
;
203 void chunkqueue_free(chunkqueue
*cq
) {
206 if (NULL
== cq
) return;
208 for (c
= cq
->first
; c
; ) {
217 static void chunkqueue_prepend_chunk(chunkqueue
*cq
, chunk
*c
) {
221 if (NULL
== cq
->last
) {
226 static void chunkqueue_append_chunk(chunkqueue
*cq
, chunk
*c
) {
233 if (NULL
== cq
->first
) {
238 static chunk
* chunkqueue_prepend_mem_chunk(chunkqueue
*cq
) {
239 chunk
*c
= chunk_acquire();
240 chunkqueue_prepend_chunk(cq
, c
);
244 static chunk
* chunkqueue_append_mem_chunk(chunkqueue
*cq
) {
245 chunk
*c
= chunk_acquire();
246 chunkqueue_append_chunk(cq
, c
);
250 static chunk
* chunkqueue_append_file_chunk(chunkqueue
*cq
, buffer
*fn
, off_t offset
, off_t len
) {
251 chunk
*c
= chunk_acquire();
252 chunkqueue_append_chunk(cq
, c
);
253 c
->type
= FILE_CHUNK
;
254 c
->file
.start
= offset
;
255 c
->file
.length
= len
;
257 buffer_copy_buffer(c
->mem
, fn
);
261 void chunkqueue_reset(chunkqueue
*cq
) {
262 chunk
*cur
= cq
->first
;
264 cq
->first
= cq
->last
= NULL
;
266 while (NULL
!= cur
) {
267 chunk
*next
= cur
->next
;
277 void chunkqueue_append_file_fd(chunkqueue
*cq
, buffer
*fn
, int fd
, off_t offset
, off_t len
) {
279 (chunkqueue_append_file_chunk(cq
, fn
, offset
, len
))->file
.fd
= fd
;
286 void chunkqueue_append_file(chunkqueue
*cq
, buffer
*fn
, off_t offset
, off_t len
) {
288 chunkqueue_append_file_chunk(cq
, fn
, offset
, len
);
293 static int chunkqueue_append_mem_extend_chunk(chunkqueue
*cq
, const char *mem
, size_t len
) {
295 if (0 == len
) return 1;
296 if (c
!= NULL
&& c
->type
== MEM_CHUNK
297 && buffer_string_space(c
->mem
) >= len
) {
298 buffer_append_string_len(c
->mem
, mem
, len
);
306 void chunkqueue_append_buffer(chunkqueue
*cq
, buffer
*mem
) {
308 size_t len
= buffer_string_length(mem
);
309 if (len
< 256 && chunkqueue_append_mem_extend_chunk(cq
, mem
->ptr
, len
)) return;
311 c
= chunkqueue_append_mem_chunk(cq
);
313 buffer_move(c
->mem
, mem
);
317 void chunkqueue_append_mem(chunkqueue
*cq
, const char * mem
, size_t len
) {
319 if (len
< chunk_buf_sz
&& chunkqueue_append_mem_extend_chunk(cq
, mem
, len
))
322 c
= chunkqueue_append_mem_chunk(cq
);
324 buffer_copy_string_len(c
->mem
, mem
, len
);
328 void chunkqueue_append_mem_min(chunkqueue
*cq
, const char * mem
, size_t len
) {
330 if (len
< chunk_buf_sz
&& chunkqueue_append_mem_extend_chunk(cq
, mem
, len
))
333 c
= chunk_init(len
+1);
334 chunkqueue_append_chunk(cq
, c
);
336 buffer_copy_string_len(c
->mem
, mem
, len
);
340 void chunkqueue_append_chunkqueue(chunkqueue
*cq
, chunkqueue
*src
) {
341 if (src
== NULL
|| NULL
== src
->first
) return;
343 if (NULL
== cq
->first
) {
344 cq
->first
= src
->first
;
346 cq
->last
->next
= src
->first
;
348 cq
->last
= src
->last
;
349 cq
->bytes_in
+= (src
->bytes_in
- src
->bytes_out
);
353 src
->bytes_out
= src
->bytes_in
;
358 static void chunkqueue_buffer_open_resize(chunk
*c
, size_t sz
) {
359 chunk
* const n
= chunk_init((sz
+ 4095) & ~4095uL);
360 buffer
* const b
= c
->mem
;
367 buffer
* chunkqueue_prepend_buffer_open_sz(chunkqueue
*cq
, size_t sz
) {
368 chunk
* const c
= chunkqueue_prepend_mem_chunk(cq
);
369 if (buffer_string_space(c
->mem
) < sz
) {
370 chunkqueue_buffer_open_resize(c
, sz
);
376 buffer
* chunkqueue_prepend_buffer_open(chunkqueue
*cq
) {
377 chunk
*c
= chunkqueue_prepend_mem_chunk(cq
);
382 void chunkqueue_prepend_buffer_commit(chunkqueue
*cq
) {
383 cq
->bytes_in
+= buffer_string_length(cq
->first
->mem
);
387 buffer
* chunkqueue_append_buffer_open_sz(chunkqueue
*cq
, size_t sz
) {
388 chunk
* const c
= chunkqueue_append_mem_chunk(cq
);
389 if (buffer_string_space(c
->mem
) < sz
) {
390 chunkqueue_buffer_open_resize(c
, sz
);
396 buffer
* chunkqueue_append_buffer_open(chunkqueue
*cq
) {
397 chunk
*c
= chunkqueue_append_mem_chunk(cq
);
402 void chunkqueue_append_buffer_commit(chunkqueue
*cq
) {
403 cq
->bytes_in
+= buffer_string_length(cq
->last
->mem
);
407 static void chunkqueue_remove_empty_chunks(chunkqueue
*cq
);
410 char * chunkqueue_get_memory(chunkqueue
*cq
, size_t *len
) {
411 size_t sz
= *len
? *len
: (chunk_buf_sz
>> 1);
414 if (NULL
!= c
&& MEM_CHUNK
== c
->type
) {
415 /* return pointer into existing buffer if large enough */
416 size_t avail
= buffer_string_space(c
->mem
);
420 return b
->ptr
+ buffer_string_length(b
);
424 /* allocate new chunk */
425 b
= chunkqueue_append_buffer_open_sz(cq
, sz
);
426 *len
= buffer_string_space(b
);
430 void chunkqueue_use_memory(chunkqueue
*cq
, size_t len
) {
433 force_assert(NULL
!= cq
);
434 force_assert(NULL
!= cq
->last
&& MEM_CHUNK
== cq
->last
->type
);
438 buffer_commit(b
, len
);
440 } else if (buffer_string_is_empty(b
)) {
441 /* scan chunkqueue to remove empty last chunk
442 * (generally not expecting a deep queue) */
443 chunkqueue_remove_empty_chunks(cq
);
447 void chunkqueue_set_tempdirs_default (array
*tempdirs
, unsigned int upload_temp_file_size
) {
448 chunkqueue_default_tempdirs
= tempdirs
;
449 chunkqueue_default_tempfile_size
450 = (0 == upload_temp_file_size
) ? DEFAULT_TEMPFILE_SIZE
451 : (upload_temp_file_size
> MAX_TEMPFILE_SIZE
) ? MAX_TEMPFILE_SIZE
452 : upload_temp_file_size
;
456 void chunkqueue_set_tempdirs(chunkqueue
*cq
, array
*tempdirs
, unsigned int upload_temp_file_size
) {
457 force_assert(NULL
!= cq
);
458 cq
->tempdirs
= tempdirs
;
459 cq
->upload_temp_file_size
460 = (0 == upload_temp_file_size
) ? DEFAULT_TEMPFILE_SIZE
461 : (upload_temp_file_size
> MAX_TEMPFILE_SIZE
) ? MAX_TEMPFILE_SIZE
462 : upload_temp_file_size
;
467 void chunkqueue_steal(chunkqueue
*dest
, chunkqueue
*src
, off_t len
) {
469 chunk
*c
= src
->first
;
472 if (NULL
== c
) break;
474 clen
= chunk_remaining_length(c
);
476 /* drop empty chunk */
477 src
->first
= c
->next
;
478 if (c
== src
->last
) src
->last
= NULL
;
483 use
= len
>= clen
? clen
: len
;
487 /* move complete chunk */
488 src
->first
= c
->next
;
489 if (c
== src
->last
) src
->last
= NULL
;
491 chunkqueue_append_chunk(dest
, c
);
492 dest
->bytes_in
+= use
;
494 /* partial chunk with length "use" */
498 chunkqueue_append_mem(dest
, c
->mem
->ptr
+ c
->offset
, use
);
501 /* tempfile flag is in "last" chunk after the split */
502 chunkqueue_append_file(dest
, c
->mem
, c
->file
.start
+ c
->offset
, use
);
507 force_assert(0 == len
);
510 src
->bytes_out
+= use
;
514 static chunk
*chunkqueue_get_append_tempfile(server
*srv
, chunkqueue
*cq
) {
516 buffer
*template = buffer_init_string("/var/tmp/lighttpd-upload-XXXXXX");
519 if (cq
->tempdirs
&& cq
->tempdirs
->used
) {
520 /* we have several tempdirs, only if all of them fail we jump out */
522 for (errno
= EIO
; cq
->tempdir_idx
< cq
->tempdirs
->used
; ++cq
->tempdir_idx
) {
523 data_string
*ds
= (data_string
*)cq
->tempdirs
->data
[cq
->tempdir_idx
];
525 buffer_copy_buffer(template, ds
->value
);
526 buffer_append_path_len(template, CONST_STR_LEN("lighttpd-upload-XXXXXX"));
529 /* POSIX-2008 requires mkstemp create file with 0600 perms */
532 /* coverity[secure_temp : FALSE] */
533 if (-1 != (fd
= mkstemp(template->ptr
))) break;
537 /* POSIX-2008 requires mkstemp create file with 0600 perms */
540 /* coverity[secure_temp : FALSE] */
541 fd
= mkstemp(template->ptr
);
545 /* (report only the last error to mkstemp()
546 * if multiple temp dirs attempted) */
547 log_error_write(srv
, __FILE__
, __LINE__
, "sbs",
548 "opening temp-file failed:",
549 template, strerror(errno
));
550 buffer_free(template);
554 if (0 != fcntl(fd
, F_SETFL
, fcntl(fd
, F_GETFL
, 0) | O_APPEND
)) {
555 /* (should not happen; fd is regular file) */
556 log_error_write(srv
, __FILE__
, __LINE__
, "sbs",
557 "fcntl():", template, strerror(errno
));
559 buffer_free(template);
562 fdevent_setfd_cloexec(fd
);
564 c
= chunkqueue_append_file_chunk(cq
, template, 0, 0);
568 buffer_free(template);
573 int chunkqueue_append_mem_to_tempfile(server
*srv
, chunkqueue
*dest
, const char *mem
, size_t len
) {
579 * if the last chunk is
580 * - smaller than dest->upload_temp_file_size
581 * - not read yet (offset == 0)
582 * -> append to it (so it might actually become larger than dest->upload_temp_file_size)
584 * -> create a new chunk
590 && FILE_CHUNK
== dst_c
->type
591 && dst_c
->file
.is_temp
592 && dst_c
->file
.fd
>= 0
593 && 0 == dst_c
->offset
) {
594 /* ok, take the last chunk for our job */
596 if (dst_c
->file
.length
>= (off_t
)dest
->upload_temp_file_size
) {
597 /* the chunk is too large now, close it */
598 int rc
= close(dst_c
->file
.fd
);
601 log_error_write(srv
, __FILE__
, __LINE__
, "sbss",
602 "close() temp-file", dst_c
->mem
, "failed:",
612 if (NULL
== dst_c
&& NULL
== (dst_c
= chunkqueue_get_append_tempfile(srv
, dest
))) {
616 if (dst_c
->file
.fd
< 0) return -1;
619 /* (dst_c->file.fd >= 0) */
620 /* coverity[negative_returns : FALSE] */
621 written
= write(dst_c
->file
.fd
, mem
, len
);
623 if ((size_t) written
== len
) {
624 dst_c
->file
.length
+= len
;
625 dest
->bytes_in
+= len
;
628 } else if (written
>= 0) {
629 /*(assume EINTR if partial write and retry write();
630 * retry write() might fail with ENOSPC if no more space on volume)*/
631 dest
->bytes_in
+= written
;
633 len
-= (size_t)written
;
634 dst_c
->file
.length
+= (size_t)written
;
635 /* continue; retry */
636 } else if (errno
== EINTR
) {
637 /* continue; retry */
639 int retry
= (errno
== ENOSPC
&& dest
->tempdirs
&& ++dest
->tempdir_idx
< dest
->tempdirs
->used
);
641 log_error_write(srv
, __FILE__
, __LINE__
, "sbs",
642 "write() temp-file", dst_c
->mem
, "failed:",
646 if (0 == chunk_remaining_length(dst_c
)) {
647 /*(remove empty chunk and unlink tempfile)*/
648 chunkqueue_remove_empty_chunks(dest
);
649 } else {/*(close tempfile; avoid later attempts to append)*/
650 int rc
= close(dst_c
->file
.fd
);
653 log_error_write(srv
, __FILE__
, __LINE__
, "sbss",
654 "close() temp-file", dst_c
->mem
, "failed:",
659 if (!retry
) break; /* return -1; */
661 /* continue; retry */
669 int chunkqueue_steal_with_tempfiles(server
*srv
, chunkqueue
*dest
, chunkqueue
*src
, off_t len
) {
671 chunk
*c
= src
->first
;
674 if (NULL
== c
) break;
676 clen
= chunk_remaining_length(c
);
678 /* drop empty chunk */
679 src
->first
= c
->next
;
680 if (c
== src
->last
) src
->last
= NULL
;
685 use
= (len
>= clen
) ? clen
: len
;
691 /* move complete chunk */
692 src
->first
= c
->next
;
693 if (c
== src
->last
) src
->last
= NULL
;
694 chunkqueue_append_chunk(dest
, c
);
695 dest
->bytes_in
+= use
;
697 /* partial chunk with length "use" */
698 /* tempfile flag is in "last" chunk after the split */
699 chunkqueue_append_file(dest
, c
->mem
, c
->file
.start
+ c
->offset
, use
);
702 force_assert(0 == len
);
707 /* store "use" bytes from memory chunk in tempfile */
708 if (0 != chunkqueue_append_mem_to_tempfile(srv
, dest
, c
->mem
->ptr
+ c
->offset
, use
)) {
714 src
->first
= c
->next
;
715 if (c
== src
->last
) src
->last
= NULL
;
720 force_assert(0 == len
);
725 src
->bytes_out
+= use
;
731 off_t
chunkqueue_length(chunkqueue
*cq
) {
735 for (c
= cq
->first
; c
; c
= c
->next
) {
736 len
+= chunk_remaining_length(c
);
742 void chunkqueue_mark_written(chunkqueue
*cq
, off_t len
) {
745 force_assert(len
>= 0);
747 for (c
= cq
->first
; NULL
!= c
; c
= cq
->first
) {
748 off_t c_len
= chunk_remaining_length(c
);
750 if (0 == written
&& 0 != c_len
) break; /* no more finished chunks */
752 if (written
>= c_len
) { /* chunk got finished */
757 if (c
== cq
->last
) cq
->last
= NULL
;
759 } else { /* partial chunk */
760 c
->offset
+= written
;
762 break; /* chunk not finished */
766 force_assert(0 == written
);
767 cq
->bytes_out
+= len
;
770 void chunkqueue_remove_finished_chunks(chunkqueue
*cq
) {
773 for (c
= cq
->first
; c
; c
= cq
->first
) {
774 if (0 != chunk_remaining_length(c
)) break; /* not finished yet */
777 if (c
== cq
->last
) cq
->last
= NULL
;
782 static void chunkqueue_remove_empty_chunks(chunkqueue
*cq
) {
784 chunkqueue_remove_finished_chunks(cq
);
785 if (chunkqueue_is_empty(cq
)) return;
787 for (c
= cq
->first
; c
&& c
->next
; c
= c
->next
) {
788 if (0 == chunk_remaining_length(c
->next
)) {
789 chunk
*empty
= c
->next
;
790 c
->next
= empty
->next
;
791 if (empty
== cq
->last
) cq
->last
= c
;
792 chunk_release(empty
);
797 int chunkqueue_open_file_chunk(server
*srv
, chunkqueue
*cq
) {
798 chunk
* const c
= cq
->first
;
799 off_t offset
, toSend
;
802 force_assert(NULL
!= c
);
803 force_assert(FILE_CHUNK
== c
->type
);
804 force_assert(c
->offset
>= 0 && c
->offset
<= c
->file
.length
);
806 offset
= c
->file
.start
+ c
->offset
;
807 toSend
= c
->file
.length
- c
->offset
;
809 if (-1 == c
->file
.fd
) {
810 if (-1 == (c
->file
.fd
= fdevent_open_cloexec(c
->mem
->ptr
, O_RDONLY
, 0))) {
811 log_error_write(srv
, __FILE__
, __LINE__
, "ssb", "open failed:", strerror(errno
), c
->mem
);
816 /*(skip file size checks if file is temp file created by lighttpd)*/
817 if (c
->file
.is_temp
) return 0;
819 if (-1 == fstat(c
->file
.fd
, &st
)) {
820 log_error_write(srv
, __FILE__
, __LINE__
, "ss", "fstat failed:", strerror(errno
));
824 if (offset
> st
.st_size
|| toSend
> st
.st_size
|| offset
> st
.st_size
- toSend
) {
825 log_error_write(srv
, __FILE__
, __LINE__
, "sb", "file shrunk:", c
->mem
);