4 * the network chunk-API
13 #include <sys/types.h>
25 chunkqueue
*chunkqueue_init(void) {
28 cq
= calloc(1, sizeof(*cq
));
29 force_assert(NULL
!= cq
);
39 static chunk
*chunk_init(void) {
42 c
= calloc(1, sizeof(*c
));
43 force_assert(NULL
!= c
);
46 c
->mem
= buffer_init();
47 c
->file
.name
= buffer_init();
48 c
->file
.start
= c
->file
.length
= c
->file
.mmap
.offset
= 0;
50 c
->file
.mmap
.start
= MAP_FAILED
;
51 c
->file
.mmap
.length
= 0;
59 static void chunk_reset(chunk
*c
) {
60 if (NULL
== c
) return;
66 if (c
->file
.is_temp
&& !buffer_string_is_empty(c
->file
.name
)) {
67 unlink(c
->file
.name
->ptr
);
70 buffer_reset(c
->file
.name
);
72 if (c
->file
.fd
!= -1) {
76 if (MAP_FAILED
!= c
->file
.mmap
.start
) {
77 munmap(c
->file
.mmap
.start
, c
->file
.mmap
.length
);
78 c
->file
.mmap
.start
= MAP_FAILED
;
80 c
->file
.start
= c
->file
.length
= c
->file
.mmap
.offset
= 0;
81 c
->file
.mmap
.length
= 0;
87 static void chunk_free(chunk
*c
) {
88 if (NULL
== c
) return;
93 buffer_free(c
->file
.name
);
98 static off_t
chunk_remaining_length(const chunk
*c
) {
102 len
= buffer_string_length(c
->mem
);
105 len
= c
->file
.length
;
108 force_assert(c
->type
== MEM_CHUNK
|| c
->type
== FILE_CHUNK
);
111 force_assert(c
->offset
<= len
);
112 return len
- c
->offset
;
115 void chunkqueue_free(chunkqueue
*cq
) {
118 if (NULL
== cq
) return;
120 for (c
= cq
->first
; c
; ) {
126 for (c
= cq
->unused
; c
; ) {
135 static void chunkqueue_push_unused_chunk(chunkqueue
*cq
, chunk
*c
) {
136 force_assert(NULL
!= cq
&& NULL
!= c
);
138 /* keep at max 4 chunks in the 'unused'-cache */
139 if (cq
->unused_chunks
> 4) {
143 c
->next
= cq
->unused
;
149 static chunk
*chunkqueue_get_unused_chunk(chunkqueue
*cq
) {
152 force_assert(NULL
!= cq
);
154 /* check if we have a unused chunk */
155 if (0 == cq
->unused
) {
158 /* take the first element from the list (a stack) */
160 cq
->unused
= c
->next
;
168 static void chunkqueue_prepend_chunk(chunkqueue
*cq
, chunk
*c
) {
172 if (NULL
== cq
->last
) {
175 cq
->bytes_in
+= chunk_remaining_length(c
);
178 static void chunkqueue_append_chunk(chunkqueue
*cq
, chunk
*c
) {
185 if (NULL
== cq
->first
) {
188 cq
->bytes_in
+= chunk_remaining_length(c
);
191 void chunkqueue_reset(chunkqueue
*cq
) {
192 chunk
*cur
= cq
->first
;
194 cq
->first
= cq
->last
= NULL
;
196 while (NULL
!= cur
) {
197 chunk
*next
= cur
->next
;
198 chunkqueue_push_unused_chunk(cq
, cur
);
207 void chunkqueue_append_file_fd(chunkqueue
*cq
, buffer
*fn
, int fd
, off_t offset
, off_t len
) {
215 c
= chunkqueue_get_unused_chunk(cq
);
217 c
->type
= FILE_CHUNK
;
219 buffer_copy_buffer(c
->file
.name
, fn
);
220 c
->file
.start
= offset
;
221 c
->file
.length
= len
;
225 chunkqueue_append_chunk(cq
, c
);
228 void chunkqueue_append_file(chunkqueue
*cq
, buffer
*fn
, off_t offset
, off_t len
) {
231 if (0 == len
) return;
233 c
= chunkqueue_get_unused_chunk(cq
);
235 c
->type
= FILE_CHUNK
;
237 buffer_copy_buffer(c
->file
.name
, fn
);
238 c
->file
.start
= offset
;
239 c
->file
.length
= len
;
242 chunkqueue_append_chunk(cq
, c
);
245 void chunkqueue_append_buffer(chunkqueue
*cq
, buffer
*mem
) {
248 if (buffer_string_is_empty(mem
)) return;
250 c
= chunkqueue_get_unused_chunk(cq
);
252 force_assert(NULL
!= c
->mem
);
253 buffer_move(c
->mem
, mem
);
255 chunkqueue_append_chunk(cq
, c
);
258 void chunkqueue_prepend_buffer(chunkqueue
*cq
, buffer
*mem
) {
261 if (buffer_string_is_empty(mem
)) return;
263 c
= chunkqueue_get_unused_chunk(cq
);
265 force_assert(NULL
!= c
->mem
);
266 buffer_move(c
->mem
, mem
);
268 chunkqueue_prepend_chunk(cq
, c
);
272 void chunkqueue_append_mem(chunkqueue
*cq
, const char * mem
, size_t len
) {
275 if (0 == len
) return;
277 c
= chunkqueue_get_unused_chunk(cq
);
279 buffer_copy_string_len(c
->mem
, mem
, len
);
281 chunkqueue_append_chunk(cq
, c
);
285 void chunkqueue_append_chunkqueue(chunkqueue
*cq
, chunkqueue
*src
) {
286 if (src
== NULL
|| NULL
== src
->first
) return;
288 if (NULL
== cq
->first
) {
289 cq
->first
= src
->first
;
291 cq
->last
->next
= src
->first
;
293 cq
->last
= src
->last
;
294 cq
->bytes_in
+= (src
->bytes_in
- src
->bytes_out
);
298 src
->bytes_out
= src
->bytes_in
;
302 void chunkqueue_get_memory(chunkqueue
*cq
, char **mem
, size_t *len
, size_t min_size
, size_t alloc_size
) {
303 static const size_t REALLOC_MAX_SIZE
= 256;
309 force_assert(NULL
!= cq
);
310 if (NULL
== mem
) mem
= &dummy_mem
;
311 if (NULL
== len
) len
= &dummy_len
;
313 /* default values: */
314 if (0 == min_size
) min_size
= 1024;
315 if (0 == alloc_size
) alloc_size
= 4096;
316 if (alloc_size
< min_size
) alloc_size
= min_size
;
318 if (NULL
!= cq
->last
&& MEM_CHUNK
== cq
->last
->type
) {
322 have
= buffer_string_space(b
);
324 /* unused buffer: allocate space */
325 if (buffer_string_is_empty(b
)) {
326 buffer_string_prepare_copy(b
, alloc_size
);
327 have
= buffer_string_space(b
);
329 /* if buffer is really small just make it bigger */
330 else if (have
< min_size
&& b
->size
<= REALLOC_MAX_SIZE
) {
331 size_t cur_len
= buffer_string_length(b
);
332 size_t new_size
= cur_len
+ min_size
, append
;
333 if (new_size
< alloc_size
) new_size
= alloc_size
;
335 append
= new_size
- cur_len
;
336 if (append
>= min_size
) {
337 buffer_string_prepare_append(b
, append
);
338 have
= buffer_string_space(b
);
342 /* return pointer into existing buffer if large enough */
343 if (have
>= min_size
) {
344 *mem
= b
->ptr
+ buffer_string_length(b
);
350 /* allocate new chunk */
351 c
= chunkqueue_get_unused_chunk(cq
);
353 chunkqueue_append_chunk(cq
, c
);
356 buffer_string_prepare_append(b
, alloc_size
);
358 *mem
= b
->ptr
+ buffer_string_length(b
);
359 *len
= buffer_string_space(b
);
362 void chunkqueue_use_memory(chunkqueue
*cq
, size_t len
) {
365 force_assert(NULL
!= cq
);
366 force_assert(NULL
!= cq
->last
&& MEM_CHUNK
== cq
->last
->type
);
370 buffer_commit(b
, len
);
372 } else if (buffer_string_is_empty(b
)) {
373 /* unused buffer: can't remove chunk easily from
374 * end of list, so just reset the buffer
380 /* default 1MB, upper limit 128MB */
381 #define DEFAULT_TEMPFILE_SIZE (1 * 1024 * 1024)
382 #define MAX_TEMPFILE_SIZE (128 * 1024 * 1024)
384 void chunkqueue_set_tempdirs(chunkqueue
*cq
, array
*tempdirs
, unsigned int upload_temp_file_size
) {
385 force_assert(NULL
!= cq
);
386 cq
->tempdirs
= tempdirs
;
387 cq
->upload_temp_file_size
388 = (0 == upload_temp_file_size
) ? DEFAULT_TEMPFILE_SIZE
389 : (upload_temp_file_size
> MAX_TEMPFILE_SIZE
) ? MAX_TEMPFILE_SIZE
390 : upload_temp_file_size
;
394 void chunkqueue_steal(chunkqueue
*dest
, chunkqueue
*src
, off_t len
) {
396 chunk
*c
= src
->first
;
399 if (NULL
== c
) break;
401 clen
= chunk_remaining_length(c
);
403 /* drop empty chunk */
404 src
->first
= c
->next
;
405 if (c
== src
->last
) src
->last
= NULL
;
406 chunkqueue_push_unused_chunk(src
, c
);
410 use
= len
>= clen
? clen
: len
;
414 /* move complete chunk */
415 src
->first
= c
->next
;
416 if (c
== src
->last
) src
->last
= NULL
;
418 chunkqueue_append_chunk(dest
, c
);
420 /* partial chunk with length "use" */
424 chunkqueue_append_mem(dest
, c
->mem
->ptr
+ c
->offset
, use
);
427 /* tempfile flag is in "last" chunk after the split */
428 chunkqueue_append_file(dest
, c
->file
.name
, c
->file
.start
+ c
->offset
, use
);
433 force_assert(0 == len
);
436 src
->bytes_out
+= use
;
440 static chunk
*chunkqueue_get_append_tempfile(chunkqueue
*cq
) {
442 buffer
*template = buffer_init_string("/var/tmp/lighttpd-upload-XXXXXX");
445 if (cq
->tempdirs
&& cq
->tempdirs
->used
) {
446 /* we have several tempdirs, only if all of them fail we jump out */
448 for (errno
= EIO
; cq
->tempdir_idx
< cq
->tempdirs
->used
; ++cq
->tempdir_idx
) {
449 data_string
*ds
= (data_string
*)cq
->tempdirs
->data
[cq
->tempdir_idx
];
451 buffer_copy_buffer(template, ds
->value
);
452 buffer_append_slash(template);
453 buffer_append_string_len(template, CONST_STR_LEN("lighttpd-upload-XXXXXX"));
455 if (-1 != (fd
= mkstemp(template->ptr
))) break;
458 fd
= mkstemp(template->ptr
);
462 buffer_free(template);
466 c
= chunkqueue_get_unused_chunk(cq
);
467 c
->type
= FILE_CHUNK
;
470 buffer_copy_buffer(c
->file
.name
, template);
473 chunkqueue_append_chunk(cq
, c
);
475 buffer_free(template);
480 static void chunkqueue_remove_empty_chunks(chunkqueue
*cq
);
482 int chunkqueue_append_mem_to_tempfile(server
*srv
, chunkqueue
*dest
, const char *mem
, size_t len
) {
488 * if the last chunk is
489 * - smaller than dest->upload_temp_file_size
490 * - not read yet (offset == 0)
491 * -> append to it (so it might actually become larger than dest->upload_temp_file_size)
493 * -> create a new chunk
499 && FILE_CHUNK
== dst_c
->type
500 && dst_c
->file
.is_temp
501 && dst_c
->file
.fd
>= 0
502 && 0 == dst_c
->offset
) {
503 /* ok, take the last chunk for our job */
505 if (dst_c
->file
.length
>= (off_t
)dest
->upload_temp_file_size
) {
506 /* the chunk is too large now, close it */
507 int rc
= close(dst_c
->file
.fd
);
510 log_error_write(srv
, __FILE__
, __LINE__
, "sbss",
511 "close() temp-file", dst_c
->file
.name
, "failed:",
521 if (NULL
== dst_c
&& NULL
== (dst_c
= chunkqueue_get_append_tempfile(dest
))) {
522 /* we don't have file to write to,
523 * EACCES might be one reason.
525 * Instead of sending 500 we send 413 and say the request is too large
528 log_error_write(srv
, __FILE__
, __LINE__
, "ss",
529 "opening temp-file failed:", strerror(errno
));
534 written
= write(dst_c
->file
.fd
, mem
, len
);
536 if ((size_t) written
== len
) {
537 dst_c
->file
.length
+= len
;
538 dest
->bytes_in
+= len
;
541 } else if (written
>= 0) {
542 /*(assume EINTR if partial write and retry write();
543 * retry write() might fail with ENOSPC if no more space on volume)*/
544 dest
->bytes_in
+= written
;
546 len
-= (size_t)written
;
547 dst_c
->file
.length
+= (size_t)written
;
548 /* continue; retry */
549 } else if (errno
== EINTR
) {
550 /* continue; retry */
552 int retry
= (errno
== ENOSPC
&& dest
->tempdirs
&& ++dest
->tempdir_idx
< dest
->tempdirs
->used
);
554 log_error_write(srv
, __FILE__
, __LINE__
, "sbs",
555 "write() temp-file", dst_c
->file
.name
, "failed:",
559 if (0 == chunk_remaining_length(dst_c
)) {
560 /*(remove empty chunk and unlink tempfile)*/
561 chunkqueue_remove_empty_chunks(dest
);
562 } else {/*(close tempfile; avoid later attempts to append)*/
563 int rc
= close(dst_c
->file
.fd
);
566 log_error_write(srv
, __FILE__
, __LINE__
, "sbss",
567 "close() temp-file", dst_c
->file
.name
, "failed:",
572 if (!retry
) return -1;
574 /* continue; retry */
579 return -1; /*(not reached)*/
582 int chunkqueue_steal_with_tempfiles(server
*srv
, chunkqueue
*dest
, chunkqueue
*src
, off_t len
) {
584 chunk
*c
= src
->first
;
587 if (NULL
== c
) break;
589 clen
= chunk_remaining_length(c
);
591 /* drop empty chunk */
592 src
->first
= c
->next
;
593 if (c
== src
->last
) src
->last
= NULL
;
594 chunkqueue_push_unused_chunk(src
, c
);
598 use
= (len
>= clen
) ? clen
: len
;
604 /* move complete chunk */
605 src
->first
= c
->next
;
606 if (c
== src
->last
) src
->last
= NULL
;
607 chunkqueue_append_chunk(dest
, c
);
609 /* partial chunk with length "use" */
610 /* tempfile flag is in "last" chunk after the split */
611 chunkqueue_append_file(dest
, c
->file
.name
, c
->file
.start
+ c
->offset
, use
);
614 force_assert(0 == len
);
619 /* store "use" bytes from memory chunk in tempfile */
620 if (0 != chunkqueue_append_mem_to_tempfile(srv
, dest
, c
->mem
->ptr
+ c
->offset
, use
)) {
626 src
->first
= c
->next
;
627 if (c
== src
->last
) src
->last
= NULL
;
628 chunkqueue_push_unused_chunk(src
, c
);
632 force_assert(0 == len
);
637 src
->bytes_out
+= use
;
643 off_t
chunkqueue_length(chunkqueue
*cq
) {
647 for (c
= cq
->first
; c
; c
= c
->next
) {
648 len
+= chunk_remaining_length(c
);
654 int chunkqueue_is_empty(chunkqueue
*cq
) {
655 return NULL
== cq
->first
;
658 void chunkqueue_mark_written(chunkqueue
*cq
, off_t len
) {
661 force_assert(len
>= 0);
663 for (c
= cq
->first
; NULL
!= c
; c
= cq
->first
) {
664 off_t c_len
= chunk_remaining_length(c
);
666 if (0 == written
&& 0 != c_len
) break; /* no more finished chunks */
668 if (written
>= c_len
) { /* chunk got finished */
673 if (c
== cq
->last
) cq
->last
= NULL
;
675 chunkqueue_push_unused_chunk(cq
, c
);
676 } else { /* partial chunk */
677 c
->offset
+= written
;
679 break; /* chunk not finished */
683 force_assert(0 == written
);
684 cq
->bytes_out
+= len
;
687 void chunkqueue_remove_finished_chunks(chunkqueue
*cq
) {
690 for (c
= cq
->first
; c
; c
= cq
->first
) {
691 if (0 != chunk_remaining_length(c
)) break; /* not finished yet */
694 if (c
== cq
->last
) cq
->last
= NULL
;
696 chunkqueue_push_unused_chunk(cq
, c
);
700 static void chunkqueue_remove_empty_chunks(chunkqueue
*cq
) {
702 chunkqueue_remove_finished_chunks(cq
);
703 if (chunkqueue_is_empty(cq
)) return;
705 for (c
= cq
->first
; c
->next
; c
= c
->next
) {
706 if (0 == chunk_remaining_length(c
->next
)) {
707 chunk
*empty
= c
->next
;
708 c
->next
= empty
->next
;
709 if (empty
== cq
->last
) cq
->last
= c
;
711 chunkqueue_push_unused_chunk(cq
, empty
);