/*
 * Copyright (C) 2012-2020 all contributors <cmogstored-public@yhbt.net>
 * License: GPL-3.0+ <https://www.gnu.org/licenses/gpl-3.0.txt>
 */
#include "cmogstored.h"
9 static __thread
struct {
11 struct random_data data
;
15 static void file_close_null(struct mog_fd
*mfd
)
17 struct mog_http
*http
= &mfd
->as
.http
;
19 if (http
->forward
== NULL
)
21 mog_http_unlink_ftmp(http
);
22 TRACE(CMOGSTORED_HTTP_BYTES_XFER(mfd
->fd
, http
->forward
->as
.file
.foff
));
23 mog_file_close(http
->forward
);
27 bool mog_http_write_full(struct mog_fd
*file_mfd
, char *buf
, size_t buf_len
)
29 struct mog_file
*file
= &file_mfd
->as
.file
;
34 gc_hash_write(file
->digest
.ctx
, buf_len
, buf
);
40 w
= pwrite(file_mfd
->fd
, buf
, buf_len
, file
->foff
);
51 if (w
< 0 && errno
== EINTR
)
53 if (w
== 0 && errno
== 0)
59 errpath
= file
->tmppath
? file
->tmppath
: file
->path
;
64 "pwrite() to %s wrote zero bytes of "
65 "%llu at offset: %lld: assuming %m",
66 errpath
, (unsigned long long)buf_len
,
67 (long long)file
->foff
);
70 "pwrite() to %s failed at offset: %lld: %m",
71 errpath
, (long long)file
->foff
);
77 #define stop(mfd,status) stop0((mfd),(status),sizeof(status)-1);
79 MOG_NOINLINE
static enum mog_next
80 stop0(struct mog_fd
*mfd
, const char *status
, size_t status_len
)
84 union { const char *in
; char *out
; } deconst
;
87 iov
.iov_base
= deconst
.out
;
88 iov
.iov_len
= status_len
;
90 mog_http_resp0(mfd
, &iov
, false);
93 return MOG_NEXT_CLOSE
;
96 MOG_NOINLINE
static enum mog_next
97 write_err(struct mog_fd
*mfd
, const char *default_msg
)
103 return stop(mfd
, "507 Insufficient Storage");
106 if (default_msg
== NULL
)
107 default_msg
= "500 Internal Server Error";
109 return stop0(mfd
, default_msg
, strlen(default_msg
));
112 static bool md5_ok(struct mog_http
*http
)
114 gc_hash_handle ctx
= http
->forward
->as
.file
.digest
.ctx
;
117 /* PUT requests don't _require_ Content-MD5 header/trailer */
121 result
= gc_hash_read(ctx
);
123 return (memcmp(http
->expect_md5
, result
, 16) == 0);
126 static bool set_perms_commit(struct mog_http
*http
)
128 struct mog_file
*file
= &http
->forward
->as
.file
;
130 if (fchmod(http
->forward
->fd
, http
->svc
->put_perms
) != 0) {
131 syslog(LOG_ERR
, "fchmod() failed: %m");
135 if (file
->tmppath
== NULL
)
137 assert(file
->path
&& "file->path NULL when file->tmppath set");
138 if (mog_rename(http
->svc
, file
->tmppath
, file
->path
) == 0) {
139 mog_free_and_null(&file
->tmppath
);
143 syslog(LOG_ERR
, "renameat(%s => %s) failed: %m",
144 file
->tmppath
, file
->path
);
148 static void put_commit_resp(struct mog_fd
*mfd
)
150 struct mog_http
*http
= &mfd
->as
.http
;
152 if (md5_ok(http
)) { /* true if there's no MD5, too */
153 if (set_perms_commit(http
)) {
154 file_close_null(mfd
);
155 mog_http_resp(mfd
, "201 Created", true);
156 mog_notify(MOG_NOTIFY_DEVICE_REFRESH
);
158 file_close_null(mfd
);
159 mog_http_resp(mfd
, "500 Internal Server Error", false);
162 file_close_null(mfd
);
163 mog_http_resp(mfd
, "400 Bad Request", true);
167 static enum mog_next
http_put_commit(struct mog_fd
*mfd
)
169 struct mog_http
*http
= &mfd
->as
.http
;
171 put_commit_resp(mfd
);
173 if (http
->wbuf
&& http
->wbuf
!= MOG_WR_ERROR
)
174 return MOG_NEXT_WAIT_WR
;
175 if (!http
->_p
.persistent
|| http
->wbuf
== MOG_WR_ERROR
)
176 return MOG_NEXT_CLOSE
;
178 return MOG_NEXT_ACTIVE
;
181 static void stash_advance_rbuf(struct mog_http
*http
, char *buf
, size_t buf_len
)
183 struct mog_rbuf
*rbuf
= http
->rbuf
;
184 size_t end
= http
->_p
.line_end
+ 1;
186 if (http
->_p
.line_end
== 0 || buf_len
<= end
) {
187 http
->_p
.buf_off
= 0;
188 mog_rbuf_reattach_and_null(&http
->rbuf
);
192 assert(buf
[http
->_p
.line_end
] == '\n' && "line_end is not LF");
193 assert(buf_len
<= MOG_RBUF_MAX_SIZE
&& "bad rbuf size");
194 assert(end
<= http
->_p
.buf_off
&& "invalid line end");
196 http
->rbuf
= rbuf
= mog_rbuf_new(MOG_RBUF_BASE_SIZE
);
198 memmove(rbuf
->rptr
, buf
+ end
, buf_len
- end
);
199 rbuf
->rsize
= buf_len
- end
;
200 http
->_p
.buf_off
-= end
;
201 if (http
->_p
.tmp_tip
>= end
)
202 http
->_p
.tmp_tip
-= end
;
203 http
->_p
.line_end
= 0;
207 chunked_body_after_header(struct mog_fd
*mfd
, char *buf
, size_t buf_len
)
209 struct mog_http
*http
= &mfd
->as
.http
;
210 size_t tmpoff
= http
->_p
.buf_off
;
212 mog_chunk_init(http
);
213 http
->_p
.buf_off
= tmpoff
;
215 switch (mog_chunk_parse(http
, buf
, buf_len
)) {
216 case MOG_PARSER_ERROR
:
217 (void)write_err(mfd
, "400 Bad Request");
219 case MOG_PARSER_CONTINUE
:
220 assert(http
->_p
.chunk_state
!= MOG_CHUNK_STATE_DONE
);
222 case MOG_PARSER_DONE
:
223 switch (http
->_p
.chunk_state
) {
224 case MOG_CHUNK_STATE_SIZE
:
225 assert(http
->_p
.buf_off
== buf_len
226 && "HTTP chunk parser didn't finish size");
228 case MOG_CHUNK_STATE_DATA
:
229 assert(http
->_p
.buf_off
== buf_len
230 && "HTTP chunk parser didn't finish data");
232 case MOG_CHUNK_STATE_TRAILER
:
233 assert(http
->_p
.buf_off
> 0 &&
234 "http->_p.buf_off unset while in trailer");
235 stash_advance_rbuf(http
, buf
, buf_len
);
236 http
->_p
.skip_rbuf_defer
= 1;
238 case MOG_CHUNK_STATE_DONE
:
239 put_commit_resp(mfd
);
240 assert(http
->_p
.buf_off
> 0 &&
241 "http->_p.buf_off unset after chunk body done");
242 stash_advance_rbuf(http
, buf
, buf_len
);
243 http
->_p
.skip_rbuf_defer
= 1;
249 identity_body_after_header(struct mog_fd
*mfd
, char *buf
, size_t buf_len
)
251 struct mog_http
*http
= &mfd
->as
.http
;
252 size_t body_len
= buf_len
- http
->_p
.buf_off
;
253 char *body_ptr
= buf
+ http
->_p
.buf_off
;
255 if (http
->_p
.content_len
< body_len
)
256 body_len
= http
->_p
.content_len
;
259 http
->_p
.buf_off
+= body_len
;
260 if (!mog_http_write_full(http
->forward
, body_ptr
, body_len
))
261 (void)write_err(mfd
, NULL
);
264 static bool lengths_ok(struct mog_http
*http
)
266 if (http
->_p
.content_len
< 0)
267 return false; /* ERANGE */
269 if (http
->_p
.has_content_range
) {
270 if (http
->_p
.chunked
)
273 if (http
->_p
.range_end
< 0 || http
->_p
.range_beg
< 0)
274 return false; /* ERANGE */
276 assert(http
->_p
.range_end
>= 0 && http
->_p
.range_beg
>= 0 &&
277 "bad range, http_parser.rl broken");
279 /* can't end after we start */
280 if (http
->_p
.range_end
< http
->_p
.range_beg
)
284 * Content-Length should match Content-Range boundaries
285 * WARNING: Eric Wong sucks at arithmetic, check this:
287 if (http
->_p
.content_len
>= 0) {
288 off_t expect
= http
->_p
.range_end
-
289 http
->_p
.range_beg
+ 1;
291 if (http
->_p
.content_len
!= expect
)
298 MOG_NOINLINE
static void rnd_init_per_thread(void)
300 unsigned seed
= (unsigned)((size_t)&rnd
>> 1);
303 initstate_r(seed
, rnd
.state
, sizeof(rnd
.state
), &rnd
.data
));
307 static char *tmppath_for(struct mog_http
*http
, const char *path
)
314 rnd_init_per_thread();
316 assert(http
&& "validation later"); /* TODO */
317 CHECK(int, 0, random_r(&rnd
.data
, &result
));
319 rc
= asprintf(&s
, "%s.%08x.%d.tmp",
320 path
, (unsigned)result
, (int)getpid());
322 return rc
>= 0 ? s
: 0;
325 static struct mog_file
* open_put(struct mog_http
*http
, char *path
)
327 struct mog_file
*file
;
330 * we can't do an atomic rename(2) on successful PUT
331 * if we have a partial upload
333 if (http
->_p
.has_content_range
) {
334 http
->forward
= mog_file_open_put(http
->svc
, path
, O_CREAT
);
335 if (http
->forward
== NULL
)
338 file
= &http
->forward
->as
.file
;
339 assert(file
->tmppath
== NULL
&& file
->path
== NULL
&&
340 "file->*path should both be NULL after open");
342 char *tmp
= tmppath_for(http
, path
);
343 int fl
= O_EXCL
| O_TRUNC
| O_CREAT
;
348 http
->forward
= mog_file_open_put(http
->svc
, tmp
, fl
);
350 /* retry once on EEXIST, don't inf loop if RNG is broken */
351 if (http
->forward
== NULL
&& errno
== EEXIST
) {
353 tmp
= tmppath_for(http
, path
);
356 http
->forward
= mog_file_open_put(http
->svc
, tmp
, fl
);
358 if (http
->forward
== NULL
) {
359 PRESERVE_ERRNO( free(tmp
) );
362 file
= &http
->forward
->as
.file
;
366 file
->path
= xstrdup(path
);
367 assert(file
->foff
== 0 && "file->foff should be zero");
368 if (http
->_p
.has_content_range
)
369 file
->foff
= http
->_p
.range_beg
;
370 if (http
->_p
.has_md5
)
371 mog_digest_init(&file
->digest
, GC_MD5
);
376 void mog_http_put(struct mog_fd
*mfd
, char *buf
, size_t buf_len
)
378 struct mog_http
*http
= &mfd
->as
.http
;
380 struct mog_file
*file
;
382 if (mfd
->fd_type
== MOG_FD_TYPE_HTTPGET
) {
383 mog_http_resp(mfd
, "405 Method Not Allowed", false);
387 path
= mog_http_path(http
, buf
);
389 goto err
; /* bad path */
390 assert(http
->forward
== NULL
&& "already have http->forward");
391 assert(path
[0] == '/' && "bad path");
393 TRACE(CMOGSTORED_HTTP_REQ_START(mfd
->fd
, "PUT", path
));
395 if (!lengths_ok(http
)) {
396 write_err(mfd
, "400 Bad Request");
400 file
= open_put(http
, path
);
404 if (buf_len
== http
->_p
.buf_off
) {
405 /* we got the HTTP header in one read() */
406 if (http
->_p
.chunked
) {
407 mog_rbuf_reattach_and_null(&http
->rbuf
);
408 mog_chunk_init(http
);
409 http
->_p
.buf_off
= buf_len
;
414 * otherwise we got part of the request body with the header,
415 * write partially read body
417 assert(buf_len
> http
->_p
.buf_off
&& http
->_p
.buf_off
> 0
418 && "http->_p.buf_off is wrong");
420 if (http
->_p
.chunked
)
421 chunked_body_after_header(mfd
, buf
, buf_len
);
423 identity_body_after_header(mfd
, buf
, buf_len
);
429 mog_http_resp(mfd
, "400 Bad Request", false);
432 mog_http_resp(mfd
, "404 Not Found", false);
435 mog_http_resp(mfd
, "403 Forbidden", false);
438 syslog(LOG_ERR
, "problem starting PUT for path=%s (%m)", path
);
439 (void)write_err(mfd
, NULL
);
442 static unsigned last_data_recv(int fd
)
445 struct tcp_info info
;
446 socklen_t len
= (socklen_t
)sizeof(struct tcp_info
);
447 int rc
= getsockopt(fd
, IPPROTO_TCP
, TCP_INFO
, &info
, &len
);
450 return (unsigned)info
.tcpi_last_data_recv
;
451 #endif /* TCP_INFO */
455 MOG_NOINLINE
static void read_err_dbg(struct mog_fd
*mfd
, ssize_t r
)
457 int save_errno
= errno
;
459 const char *path
= "(unknown)";
460 long long bytes
= -1;
462 unsigned last
= last_data_recv(mfd
->fd
);
464 mog_nameinfo(&mfd
->as
.http
.mpa
, &ni
);
466 if (mfd
->as
.http
.forward
) {
467 path
= mfd
->as
.http
.forward
->as
.file
.path
;
468 bytes
= (long long)mfd
->as
.http
.forward
->as
.file
.foff
;
471 #define PFX "PUT %s failed from %s%s after %lld bytes: "
472 errfmt
= (r
== 0) ? PFX
"premature EOF" : PFX
"%m";
475 syslog(LOG_ERR
, errfmt
, path
, ni
.ni_host
, ni
.ni_serv
, bytes
);
477 if (last
!= (unsigned)-1)
478 syslog(LOG_ERR
, "last_data_recv=%ums from %s%s for PUT %s",
479 last
, ni
.ni_host
, ni
.ni_serv
, path
);
482 static enum mog_next
identity_put_in_progress(struct mog_fd
*mfd
)
484 struct mog_http
*http
= &mfd
->as
.http
;
490 assert(http
->wbuf
== NULL
&& "can't receive file with http->wbuf");
491 assert(http
->forward
&& http
->forward
!= MOG_IOSTAT
&& "bad forward");
493 need
= http
->_p
.content_len
- http
->forward
->as
.file
.foff
;
494 if (http
->_p
.has_content_range
)
495 need
+= http
->_p
.range_beg
;
497 return http_put_commit(mfd
);
499 buf
= mog_fsbuf_get(&buf_len
);
501 assert(need
> 0 && "over-wrote on PUT request");
505 r
= read(mfd
->fd
, buf
, buf_len
);
507 if (!mog_http_write_full(http
->forward
, buf
, r
))
508 return write_err(mfd
, NULL
);
511 return http_put_commit(mfd
);
513 if (mog_ioq_contended())
514 return MOG_NEXT_WAIT_RD
;
519 case_EAGAIN
: return MOG_NEXT_WAIT_RD
;
520 case EINTR
: goto retry
;
524 /* assume all read() errors mean socket is unwritable, too */
525 read_err_dbg(mfd
, r
);
526 return stop(mfd
, NULL
);
529 static enum mog_next
chunked_put_in_progress(struct mog_fd
*mfd
)
531 struct mog_rbuf
*rbuf
;
532 struct mog_http
*http
= &mfd
->as
.http
;
537 bool in_trailer
= false;
540 assert(http
->wbuf
== NULL
&& "can't receive file with http->wbuf");
541 assert(http
->forward
&& http
->forward
!= MOG_IOSTAT
&& "bad forward");
543 switch (http
->_p
.chunk_state
) {
544 case MOG_CHUNK_STATE_DATA
:
545 assert(http
->rbuf
== NULL
&& "unexpected http->rbuf");
546 if (http
->_p
.content_len
== 0) { /* final chunk */
547 http
->_p
.chunk_state
= MOG_CHUNK_STATE_TRAILER
;
548 http
->_p
.buf_off
= 0;
549 goto chunk_state_trailer
;
551 assert(http
->_p
.content_len
> 0 && "bad chunk length");
552 /* read the chunk into memory */
553 buf
= mog_fsbuf_get(&buf_len
);
554 if (buf_len
> http
->_p
.content_len
)
555 buf_len
= http
->_p
.content_len
;
557 r
= read(mfd
->fd
, buf
, buf_len
);
558 } while (r
< 0 && errno
== EINTR
);
562 if (!mog_http_write_full(http
->forward
, buf
, r
))
563 return write_err(mfd
, NULL
);
565 http
->_p
.content_len
-= r
;
567 /* chunk is complete */
568 if (http
->_p
.content_len
== 0)
569 mog_chunk_init(http
);
571 if (mog_ioq_contended())
572 return MOG_NEXT_WAIT_RD
;
574 case MOG_CHUNK_STATE_TRAILER
:
578 case MOG_CHUNK_STATE_SIZE
:
581 prev_len
= rbuf
->rsize
;
582 buf_len
= rbuf
->rcapa
- prev_len
;
583 buf
= rbuf
->rptr
+ prev_len
;
585 * buf_len == 0 may happen here if client sends
586 * us very bogus data... just 400 it below
590 rbuf
= mog_rbuf_get(MOG_RBUF_BASE_SIZE
);
591 buf_len
= rbuf
->rcapa
;
595 r
= read(mfd
->fd
, buf
, buf_len
);
596 } while (r
< 0 && errno
== EINTR
);
601 buf_len
= r
+ prev_len
;
603 switch (mog_chunk_parse(http
, buf
, buf_len
)) {
604 case MOG_PARSER_ERROR
:
605 return write_err(mfd
, "400 Bad Request");
606 case MOG_PARSER_CONTINUE
:
607 assert(http
->_p
.chunk_state
!= MOG_CHUNK_STATE_DONE
);
608 case MOG_PARSER_DONE
:
609 switch (http
->_p
.chunk_state
) {
610 case MOG_CHUNK_STATE_SIZE
:
612 assert(0 && "bad chunk state: size");
613 /* client is trickling chunk size :< */
614 mog_rbuf_reattach_and_null(&http
->rbuf
);
615 http
->_p
.buf_off
= 0;
617 case MOG_CHUNK_STATE_DATA
:
619 assert(0 && "bad chunk state: data");
620 /* client is trickling final chunk/trailer */
621 mog_rbuf_reattach_and_null(&http
->rbuf
);
623 case MOG_CHUNK_STATE_TRAILER
:
624 stash_advance_rbuf(http
, buf
, buf_len
);
626 case MOG_CHUNK_STATE_DONE
:
627 stash_advance_rbuf(http
, buf
, buf_len
);
629 /* pipelined HTTP request after trailers! */
631 assert(http
->rbuf
->rsize
> 0
632 && http
->_p
.buf_off
== 0
634 return http_put_commit(mfd
);
637 assert(0 && "compiler bug?");
638 case MOG_CHUNK_STATE_DONE
:
639 assert(0 && "invalid state");
645 case_EAGAIN
: return MOG_NEXT_WAIT_RD
;
648 read_err_dbg(mfd
, r
);
649 return stop(mfd
, NULL
);
652 enum mog_next
mog_http_put_in_progress(struct mog_fd
*mfd
)
654 if (mfd
->as
.http
._p
.chunked
)
655 return chunked_put_in_progress(mfd
);
657 return identity_put_in_progress(mfd
);