cmogstored 1.8.1 - use default system stack size
[cmogstored.git] / http_put.c
blob11d6342a03a2ddced2a3f196d88d05dc7016332c
1 /*
2 * Copyright (C) 2012-2020 all contributors <cmogstored-public@yhbt.net>
3 * License: GPL-3.0+ <https://www.gnu.org/licenses/gpl-3.0.txt>
4 */
5 #include "cmogstored.h"
6 #include "http.h"
7 #include "digest.h"
/*
 * Per-thread PRNG state for tmppath_for().  random_r(3) requires
 * caller-supplied state, so each thread lazily seeds its own copy
 * via rnd_init_per_thread() — no locking needed.
 */
static __thread struct {
	bool ready;			/* true once initstate_r() has run */
	struct random_data data;	/* random_r(3) state */
	char state[128];		/* entropy pool for initstate_r(3) */
} rnd;
/*
 * Close and forget the file a PUT was forwarding to (no-op when
 * there is no forward file).  Unlinks any leftover tempfile first,
 * then releases the fd and clears http->forward so later calls
 * are harmless.
 */
static void file_close_null(struct mog_fd *mfd)
{
	struct mog_http *http = &mfd->as.http;

	if (http->forward == NULL)
		return;

	mog_http_unlink_ftmp(http);
	TRACE(CMOGSTORED_HTTP_BYTES_XFER(mfd->fd, http->forward->as.file.foff));
	mog_file_close(http->forward);
	http->forward = NULL;
}
/*
 * Write all buf_len bytes of buf to the file behind file_mfd at
 * file->foff, retrying short writes and EINTR.  Also feeds the MD5
 * digest when the client supplied Content-MD5.  Returns true on
 * success; on failure, logs the error via syslog and returns false
 * with errno set (a persistent zero-byte pwrite() is mapped to
 * ENOSPC since POSIX gives no better indication).
 */
bool mog_http_write_full(struct mog_fd *file_mfd, char *buf, size_t buf_len)
{
	struct mog_file *file = &file_mfd->as.file;
	ssize_t w;
	const char *errpath;

	/* digest must see every byte, even if the write later fails */
	if (file->digest.ctx)
		gc_hash_write(file->digest.ctx, buf_len, buf);
	if (buf_len == 0)
		return true;

	errno = 0;
	for (;;) {
		w = pwrite(file_mfd->fd, buf, buf_len, file->foff);

		if (w > 0) {
			file->foff += w;
			if (w == buf_len)
				return true;

			/* short write: advance and keep going */
			buf_len -= w;
			buf += w;
			continue;
		}
		if (w < 0 && errno == EINTR)
			continue;
		/* w == 0 with no errno: treat as disk full */
		if (w == 0 && errno == 0)
			errno = ENOSPC;

		break;
	}

	errpath = file->tmppath ? file->tmppath : file->path;

	/* syslog may clobber errno; callers still need it for write_err */
	PRESERVE_ERRNO(do {
		if (w == 0)
			syslog(LOG_ERR,
			       "pwrite() to %s wrote zero bytes of "
			       "%llu at offset: %lld: assuming %m",
			       errpath, (unsigned long long)buf_len,
			       (long long)file->foff);
		else
			syslog(LOG_ERR,
			       "pwrite() to %s failed at offset: %lld: %m",
			       errpath, (long long)file->foff);
	} while (0));

	return false;
}
/* stop() wraps stop0() using sizeof() on string literals to skip strlen() */
#define stop(mfd,status) stop0((mfd),(status),sizeof(status)-1);

/*
 * Abort the PUT: emit the given HTTP status line (if any) and tell
 * the caller to close the connection.  status == NULL means the
 * socket is presumed dead, so no response is attempted.
 */
MOG_NOINLINE static enum mog_next
stop0(struct mog_fd *mfd, const char *status, size_t status_len)
{
	if (status) {
		struct iovec iov;
		/* strip const for iov_base without a cast warning */
		union { const char *in; char *out; } deconst;

		deconst.in = status;
		iov.iov_base = deconst.out;
		iov.iov_len = status_len;

		mog_http_resp0(mfd, &iov, false);
	}
	file_close_null(mfd);
	return MOG_NEXT_CLOSE;
}
/*
 * Map errno from a failed body write to an HTTP error response:
 * ERANGE/ENOSPC/EFBIG => "507 Insufficient Storage"; anything else
 * uses default_msg ("500 Internal Server Error" when NULL).
 * Always closes the connection via stop0().
 */
MOG_NOINLINE static enum mog_next
write_err(struct mog_fd *mfd, const char *default_msg)
{
	switch (errno) {
	case ERANGE:
	case ENOSPC:
	case EFBIG:
		return stop(mfd, "507 Insufficient Storage");
	}

	if (default_msg == NULL)
		default_msg = "500 Internal Server Error";

	return stop0(mfd, default_msg, strlen(default_msg));
}
/*
 * Compare the MD5 computed over the uploaded body against the
 * Content-MD5 the client sent.  Trivially true when the client
 * never requested MD5 verification.
 */
static bool md5_ok(struct mog_http *http)
{
	gc_hash_handle ctx = http->forward->as.file.digest.ctx;
	const char *result;

	/* PUT requests don't _require_ Content-MD5 header/trailer */
	if (ctx == NULL)
		return true;

	result = gc_hash_read(ctx);

	/* 16 == length of a raw (binary, not hex) MD5 digest */
	return (memcmp(http->expect_md5, result, 16) == 0);
}
/*
 * Finalize a successful upload: apply the configured permission
 * bits, then (for full PUTs that wrote to a tempfile) atomically
 * rename the tempfile into place.  Returns false on any failure;
 * the caller responds with 500.
 */
static bool set_perms_commit(struct mog_http *http)
{
	struct mog_file *file = &http->forward->as.file;

	if (fchmod(http->forward->fd, http->svc->put_perms) != 0) {
		syslog(LOG_ERR, "fchmod() failed: %m");
		return false;
	}

	/* partial (Content-Range) PUTs write in place: no tmppath to rename */
	if (file->tmppath == NULL)
		return true;
	assert(file->path && "file->path NULL when file->tmppath set");
	if (mog_rename(http->svc, file->tmppath, file->path) == 0) {
		mog_free_and_null(&file->tmppath);
		return true;
	}

	syslog(LOG_ERR, "renameat(%s => %s) failed: %m",
	       file->tmppath, file->path);
	return false;
}
/*
 * Send the final response for a completed PUT body:
 * 201 on success, 400 on MD5 mismatch, 500 on commit failure.
 * The forward file is always closed before responding.
 */
static void put_commit_resp(struct mog_fd *mfd)
{
	struct mog_http *http = &mfd->as.http;

	if (md5_ok(http)) { /* true if there's no MD5, too */
		if (set_perms_commit(http)) {
			file_close_null(mfd);
			mog_http_resp(mfd, "201 Created", true);
			/* new file may change device utilization stats */
			mog_notify(MOG_NOTIFY_DEVICE_REFRESH);
		} else {
			file_close_null(mfd);
			mog_http_resp(mfd, "500 Internal Server Error", false);
		}
	} else {
		file_close_null(mfd);
		mog_http_resp(mfd, "400 Bad Request", true);
	}
}
/*
 * Commit the PUT and decide the connection's next state:
 * wait for socket writability if the response is buffered,
 * close on error or non-persistent connections, otherwise
 * reset parser state and keep serving requests.
 */
static enum mog_next http_put_commit(struct mog_fd *mfd)
{
	struct mog_http *http = &mfd->as.http;

	put_commit_resp(mfd);

	/* response did not fit in the socket buffer: flush later */
	if (http->wbuf && http->wbuf != MOG_WR_ERROR)
		return MOG_NEXT_WAIT_WR;
	if (!http->_p.persistent || http->wbuf == MOG_WR_ERROR)
		return MOG_NEXT_CLOSE;
	mog_http_reset(mfd);
	return MOG_NEXT_ACTIVE;
}
/*
 * Stash any bytes after the last fully-parsed line into http->rbuf
 * so parsing can resume on the next event-loop iteration, and
 * rebase the parser offsets (buf_off, tmp_tip) to the stashed copy.
 * With nothing complete to carry over, the rbuf is released instead.
 */
static void stash_advance_rbuf(struct mog_http *http, char *buf, size_t buf_len)
{
	struct mog_rbuf *rbuf = http->rbuf;
	size_t end = http->_p.line_end + 1; /* index just past the LF */

	if (http->_p.line_end == 0 || buf_len <= end) {
		http->_p.buf_off = 0;
		mog_rbuf_reattach_and_null(&http->rbuf);
		return;
	}

	assert(buf[http->_p.line_end] == '\n' && "line_end is not LF");
	assert(buf_len <= MOG_RBUF_MAX_SIZE && "bad rbuf size");
	assert(end <= http->_p.buf_off && "invalid line end");
	if (rbuf == NULL)
		http->rbuf = rbuf = mog_rbuf_new(MOG_RBUF_BASE_SIZE);

	/* memmove: buf may alias rbuf->rptr when resuming from a stash */
	memmove(rbuf->rptr, buf + end, buf_len - end);
	rbuf->rsize = buf_len - end;
	http->_p.buf_off -= end;
	if (http->_p.tmp_tip >= end)
		http->_p.tmp_tip -= end;
	http->_p.line_end = 0;
}
/*
 * Parse and write whatever part of a chunked request body arrived
 * in the same read() as the HTTP headers.  Leaves the chunk parser
 * in a state chunked_put_in_progress() can resume from; trailer or
 * completed-body leftovers are stashed into http->rbuf.
 */
static void
chunked_body_after_header(struct mog_fd *mfd, char *buf, size_t buf_len)
{
	struct mog_http *http = &mfd->as.http;
	size_t tmpoff = http->_p.buf_off;

	/* mog_chunk_init clobbers buf_off; restore it to the body start */
	mog_chunk_init(http);
	http->_p.buf_off = tmpoff;

	switch (mog_chunk_parse(http, buf, buf_len)) {
	case MOG_PARSER_ERROR:
		(void)write_err(mfd, "400 Bad Request");
		return;
	case MOG_PARSER_CONTINUE:
		assert(http->_p.chunk_state != MOG_CHUNK_STATE_DONE);
		/* fall through */
	case MOG_PARSER_DONE:
		switch (http->_p.chunk_state) {
		case MOG_CHUNK_STATE_SIZE:
			assert(http->_p.buf_off == buf_len
			       && "HTTP chunk parser didn't finish size");
			return;
		case MOG_CHUNK_STATE_DATA:
			assert(http->_p.buf_off == buf_len
			       && "HTTP chunk parser didn't finish data");
			return;
		case MOG_CHUNK_STATE_TRAILER:
			assert(http->_p.buf_off > 0 &&
			       "http->_p.buf_off unset while in trailer");
			stash_advance_rbuf(http, buf, buf_len);
			/* rbuf already stashed here; don't stash again later */
			http->_p.skip_rbuf_defer = 1;
			return;
		case MOG_CHUNK_STATE_DONE:
			put_commit_resp(mfd);
			assert(http->_p.buf_off > 0 &&
			       "http->_p.buf_off unset after chunk body done");
			stash_advance_rbuf(http, buf, buf_len);
			http->_p.skip_rbuf_defer = 1;
		}
	}
}
/*
 * Write the portion of an identity (non-chunked) request body that
 * arrived in the same read() as the HTTP headers.  Never writes
 * past Content-Length: trailing bytes may be a pipelined request.
 */
static void
identity_body_after_header(struct mog_fd *mfd, char *buf, size_t buf_len)
{
	struct mog_http *http = &mfd->as.http;
	size_t body_len = buf_len - http->_p.buf_off;
	char *body_ptr = buf + http->_p.buf_off;

	/* clamp to Content-Length */
	if (http->_p.content_len < body_len)
		body_len = http->_p.content_len;
	if (body_len == 0)
		return;
	http->_p.buf_off += body_len;
	if (!mog_http_write_full(http->forward, body_ptr, body_len))
		(void)write_err(mfd, NULL);
}
264 static bool lengths_ok(struct mog_http *http)
266 if (http->_p.content_len < 0)
267 return false; /* ERANGE */
269 if (http->_p.has_content_range) {
270 if (http->_p.chunked)
271 return false;
273 if (http->_p.range_end < 0 || http->_p.range_beg < 0)
274 return false; /* ERANGE */
276 assert(http->_p.range_end >= 0 && http->_p.range_beg >= 0 &&
277 "bad range, http_parser.rl broken");
279 /* can't end after we start */
280 if (http->_p.range_end < http->_p.range_beg)
281 return false;
284 * Content-Length should match Content-Range boundaries
285 * WARNING: Eric Wong sucks at arithmetic, check this:
287 if (http->_p.content_len >= 0) {
288 off_t expect = http->_p.range_end -
289 http->_p.range_beg + 1;
291 if (http->_p.content_len != expect)
292 return false;
295 return true;
/*
 * Lazily seed this thread's PRNG for tmppath_for().  The TLS
 * address provides a cheap per-thread seed: uniqueness matters
 * here, not unpredictability.
 */
MOG_NOINLINE static void rnd_init_per_thread(void)
{
	unsigned seed = (unsigned)((size_t)&rnd >> 1);

	CHECK(int, 0,
	      initstate_r(seed, rnd.state, sizeof(rnd.state), &rnd.data));
	rnd.ready = true;
}
307 static char *tmppath_for(struct mog_http *http, const char *path)
309 int32_t result;
310 int rc;
311 char *s;
313 if (!rnd.ready)
314 rnd_init_per_thread();
316 assert(http && "validation later"); /* TODO */
317 CHECK(int, 0, random_r(&rnd.data, &result));
319 rc = asprintf(&s, "%s.%08x.%d.tmp",
320 path, (unsigned)result, (int)getpid());
322 return rc >= 0 ? s : 0;
/*
 * Open the destination file for a PUT request and attach it to
 * http->forward.  Full PUTs go through a random tempfile (retried
 * once on EEXIST) to allow an atomic rename on commit; partial
 * (Content-Range) PUTs must write the real path in place.
 * Returns NULL on failure with errno set for the caller's
 * error-mapping switch.
 */
static struct mog_file * open_put(struct mog_http *http, char *path)
{
	struct mog_file *file;

	/*
	 * we can't do an atomic rename(2) on successful PUT
	 * if we have a partial upload
	 */
	if (http->_p.has_content_range) {
		http->forward = mog_file_open_put(http->svc, path, O_CREAT);
		if (http->forward == NULL)
			return NULL;

		file = &http->forward->as.file;
		assert(file->tmppath == NULL && file->path == NULL &&
		       "file->*path should both be NULL after open");
	} else {
		char *tmp = tmppath_for(http, path);
		int fl = O_EXCL | O_TRUNC | O_CREAT;

		if (!tmp)
			return NULL;

		http->forward = mog_file_open_put(http->svc, tmp, fl);

		/* retry once on EEXIST, don't inf loop if RNG is broken */
		if (http->forward == NULL && errno == EEXIST) {
			free(tmp);
			tmp = tmppath_for(http, path);
			if (!tmp)
				return NULL;
			http->forward = mog_file_open_put(http->svc, tmp, fl);
		}
		if (http->forward == NULL) {
			/* free() may clobber errno the caller still needs */
			PRESERVE_ERRNO( free(tmp) );
			return NULL;
		}
		file = &http->forward->as.file;
		file->tmppath = tmp; /* ownership transferred to file */
	}

	file->path = xstrdup(path);
	assert(file->foff == 0 && "file->foff should be zero");
	/* partial PUT starts writing at the requested range offset */
	if (http->_p.has_content_range)
		file->foff = http->_p.range_beg;
	if (http->_p.has_md5)
		mog_digest_init(&file->digest, GC_MD5);

	return file;
}
/*
 * Entry point for a parsed PUT request: validate the path and
 * length headers, open the destination file, and write any body
 * bytes that arrived together with the headers.  Errors reaching
 * the "err:" label are mapped from errno to an HTTP status.
 */
void mog_http_put(struct mog_fd *mfd, char *buf, size_t buf_len)
{
	struct mog_http *http = &mfd->as.http;
	char *path;
	struct mog_file *file;

	/* PUT is only allowed on the read-write (mgmt-capable) port */
	if (mfd->fd_type == MOG_FD_TYPE_HTTPGET) {
		mog_http_resp(mfd, "405 Method Not Allowed", false);
		return;
	}

	path = mog_http_path(http, buf);
	if (path == NULL)
		goto err; /* bad path */
	assert(http->forward == NULL && "already have http->forward");
	assert(path[0] == '/' && "bad path");

	TRACE(CMOGSTORED_HTTP_REQ_START(mfd->fd, "PUT", path));

	if (!lengths_ok(http)) {
		write_err(mfd, "400 Bad Request");
		return;
	}

	file = open_put(http, path);
	if (file == NULL)
		goto err;

	if (buf_len == http->_p.buf_off) {
		/* we got the HTTP header in one read() */
		if (http->_p.chunked) {
			mog_rbuf_reattach_and_null(&http->rbuf);
			mog_chunk_init(http);
			http->_p.buf_off = buf_len;
		}
		return;
	}

	/*
	 * otherwise we got part of the request body with the header,
	 * write partially read body
	 */
	assert(buf_len > http->_p.buf_off && http->_p.buf_off > 0
	       && "http->_p.buf_off is wrong");

	if (http->_p.chunked)
		chunked_body_after_header(mfd, buf, buf_len);
	else
		identity_body_after_header(mfd, buf, buf_len);

	return;
err:
	switch (errno) {
	case EINVAL:
		mog_http_resp(mfd, "400 Bad Request", false);
		return;
	case ENOENT:
		mog_http_resp(mfd, "404 Not Found", false);
		return;
	case EACCES:
		mog_http_resp(mfd, "403 Forbidden", false);
		return;
	}
	syslog(LOG_ERR, "problem starting PUT for path=%s (%m)", path);
	(void)write_err(mfd, NULL);
}
442 static unsigned last_data_recv(int fd)
444 #ifdef TCP_INFO
445 struct tcp_info info;
446 socklen_t len = (socklen_t)sizeof(struct tcp_info);
447 int rc = getsockopt(fd, IPPROTO_TCP, TCP_INFO, &info, &len);
449 if (rc == 0)
450 return (unsigned)info.tcpi_last_data_recv;
451 #endif /* TCP_INFO */
452 return (unsigned)-1;
/*
 * Log diagnostics for a failed/EOF'd PUT body read: peer address,
 * destination path, bytes received so far, and (where TCP_INFO
 * exists) how long the socket sat idle before dying.
 */
MOG_NOINLINE static void read_err_dbg(struct mog_fd *mfd, ssize_t r)
{
	int save_errno = errno; /* mog_nameinfo and friends may clobber it */
	struct mog_ni ni;
	const char *path = "(unknown)";
	long long bytes = -1;
	const char *errfmt;
	unsigned last = last_data_recv(mfd->fd);

	mog_nameinfo(&mfd->as.http.mpa, &ni);

	if (mfd->as.http.forward) {
		path = mfd->as.http.forward->as.file.path;
		bytes = (long long)mfd->as.http.forward->as.file.foff;
	}

/* r == 0 is EOF from the client, anything else carries errno (%m) */
#define PFX "PUT %s failed from %s%s after %lld bytes: "
	errfmt = (r == 0) ? PFX"premature EOF" : PFX"%m";
#undef PFX
	errno = save_errno; /* restore so %m expands to the read() error */
	syslog(LOG_ERR, errfmt, path, ni.ni_host, ni.ni_serv, bytes);

	if (last != (unsigned)-1)
		syslog(LOG_ERR, "last_data_recv=%ums from %s%s for PUT %s",
		       last, ni.ni_host, ni.ni_serv, path);
}
/*
 * Resume reading an identity (non-chunked) PUT body from the
 * socket and stream it to the destination file until
 * Content-Length is satisfied, EAGAIN, or error.  Yields back to
 * the event loop when other clients contend for the device I/O
 * queue.
 */
static enum mog_next identity_put_in_progress(struct mog_fd *mfd)
{
	struct mog_http *http = &mfd->as.http;
	ssize_t r;
	size_t buf_len;
	char *buf;
	off_t need;

	assert(http->wbuf == NULL && "can't receive file with http->wbuf");
	assert(http->forward && http->forward != MOG_IOSTAT && "bad forward");

	/* bytes still expected; foff includes range_beg for partial PUTs */
	need = http->_p.content_len - http->forward->as.file.foff;
	if (http->_p.has_content_range)
		need += http->_p.range_beg;
	if (need == 0)
		return http_put_commit(mfd);

	buf = mog_fsbuf_get(&buf_len);
again:
	assert(need > 0 && "over-wrote on PUT request");
	/* never read past Content-Length: pipelined requests may follow */
	if (need < buf_len)
		buf_len = need;
retry:
	r = read(mfd->fd, buf, buf_len);
	if (r > 0) {
		if (!mog_http_write_full(http->forward, buf, r))
			return write_err(mfd, NULL);
		need -= r;
		if (need == 0)
			return http_put_commit(mfd);

		/* be fair to other clients sharing this device */
		if (mog_ioq_contended())
			return MOG_NEXT_WAIT_RD;
		goto again;
	}
	if (r != 0) {
		switch (errno) {
		case_EAGAIN: return MOG_NEXT_WAIT_RD;
		case EINTR: goto retry;
		}
	}

	/* assume all read() errors mean socket is unwritable, too */
	read_err_dbg(mfd, r);
	return stop(mfd, NULL);
}
/*
 * Resume a chunked PUT body.  Chunk *data* is streamed through the
 * large per-thread fsbuf directly to the file; chunk *size* lines
 * and trailers are accumulated in http->rbuf (clients may trickle
 * them a byte at a time) and run through mog_chunk_parse().
 * Returns the next event-loop action for this connection.
 */
static enum mog_next chunked_put_in_progress(struct mog_fd *mfd)
{
	struct mog_rbuf *rbuf;
	struct mog_http *http = &mfd->as.http;
	ssize_t r;
	size_t buf_len;
	size_t prev_len;
	char *buf;
	bool in_trailer = false;

again:
	assert(http->wbuf == NULL && "can't receive file with http->wbuf");
	assert(http->forward && http->forward != MOG_IOSTAT && "bad forward");

	switch (http->_p.chunk_state) {
	case MOG_CHUNK_STATE_DATA:
		assert(http->rbuf == NULL && "unexpected http->rbuf");
		if (http->_p.content_len == 0) { /* final chunk */
			http->_p.chunk_state = MOG_CHUNK_STATE_TRAILER;
			http->_p.buf_off = 0;
			goto chunk_state_trailer;
		}
		assert(http->_p.content_len > 0 && "bad chunk length");

		/* read the chunk into memory */
		buf = mog_fsbuf_get(&buf_len);
		/* don't read past the current chunk's remaining bytes */
		if (buf_len > http->_p.content_len)
			buf_len = http->_p.content_len;
		do {
			r = read(mfd->fd, buf, buf_len);
		} while (r < 0 && errno == EINTR);
		if (r <= 0)
			goto read_err;
		if (!mog_http_write_full(http->forward, buf, r))
			return write_err(mfd, NULL);
		http->_p.content_len -= r;

		/* chunk is complete */
		if (http->_p.content_len == 0)
			mog_chunk_init(http);

		/* be fair to other clients sharing this device */
		if (mog_ioq_contended())
			return MOG_NEXT_WAIT_RD;
		goto again;
	case MOG_CHUNK_STATE_TRAILER:
chunk_state_trailer:
		in_trailer = true;
		/* fall-through */
	case MOG_CHUNK_STATE_SIZE:
		rbuf = http->rbuf;
		if (rbuf) {
			/* append to previously stashed partial line */
			prev_len = rbuf->rsize;
			buf_len = rbuf->rcapa - prev_len;
			buf = rbuf->rptr + prev_len;
			/*
			 * buf_len == 0 may happen here if client sends
			 * us very bogus data... just 400 it below
			 */
		} else {
			prev_len = 0;
			rbuf = mog_rbuf_get(MOG_RBUF_BASE_SIZE);
			buf_len = rbuf->rcapa;
			buf = rbuf->rptr;
		}
		do {
			r = read(mfd->fd, buf, buf_len);
		} while (r < 0 && errno == EINTR);
		if (r <= 0)
			goto read_err;

		/* parse the stash plus what we just read, as one buffer */
		buf = rbuf->rptr;
		buf_len = r + prev_len;

		switch (mog_chunk_parse(http, buf, buf_len)) {
		case MOG_PARSER_ERROR:
			return write_err(mfd, "400 Bad Request");
		case MOG_PARSER_CONTINUE:
			assert(http->_p.chunk_state != MOG_CHUNK_STATE_DONE);
			/* fall-through */
		case MOG_PARSER_DONE:
			switch (http->_p.chunk_state) {
			case MOG_CHUNK_STATE_SIZE:
				if (in_trailer)
					assert(0 && "bad chunk state: size");

				/* client is trickling chunk size :< */
				mog_rbuf_reattach_and_null(&http->rbuf);
				http->_p.buf_off = 0;
				goto again;
			case MOG_CHUNK_STATE_DATA:
				if (in_trailer)
					assert(0 && "bad chunk state: data");

				/* client is trickling final chunk/trailer */
				mog_rbuf_reattach_and_null(&http->rbuf);
				goto again;
			case MOG_CHUNK_STATE_TRAILER:
				stash_advance_rbuf(http, buf, buf_len);
				goto again;
			case MOG_CHUNK_STATE_DONE:
				stash_advance_rbuf(http, buf, buf_len);

				/* pipelined HTTP request after trailers! */
				if (http->rbuf)
					assert(http->rbuf->rsize > 0
					       && http->_p.buf_off == 0
					       && "bad rbuf");

				return http_put_commit(mfd);
			}
		}
		assert(0 && "compiler bug?");
	case MOG_CHUNK_STATE_DONE:
		assert(0 && "invalid state");
	}
read_err:
	if (r < 0) {
		switch (errno) {
		case_EAGAIN: return MOG_NEXT_WAIT_RD;
		}
	}
	read_err_dbg(mfd, r);
	return stop(mfd, NULL);
}
652 enum mog_next mog_http_put_in_progress(struct mog_fd *mfd)
654 if (mfd->as.http._p.chunked)
655 return chunked_put_in_progress(mfd);
657 return identity_put_in_progress(mfd);