check for bogus (negative) namelen value
[beanstalkd.git] / file.c
blobabb566ce3354b88d558d03aa6e34d77e5cb86b65
1 #include <stdint.h>
2 #include <inttypes.h>
3 #include <stddef.h>
4 #include <stdlib.h>
5 #include <stdio.h>
6 #include <unistd.h>
7 #include <stdarg.h>
8 #include <sys/types.h>
9 #include <sys/stat.h>
10 #include <fcntl.h>
11 #include <errno.h>
12 #include <string.h>
13 #include "dat.h"
15 static void warnpos(File*, int, char*, ...);
16 static int readrec(File*, job, int*);
17 static int readrec5(File*, job, int*);
18 static int readfull(File*, void*, int, int*, char*);
20 FAlloc *falloc = &rawfalloc;
// Version number found at the start of files written in the older
// "version 5" log format; fileread hands such files to readrec5.
enum { Walver5 = 5 };
27 typedef struct Jobrec5 Jobrec5;
29 struct Jobrec5 {
30 uint64 id;
31 uint32 pri;
32 uint64 delay; // usec
33 uint64 ttr; // usec
34 int32 body_size;
35 uint64 created_at; // usec
36 uint64 deadline_at; // usec
37 uint32 reserve_ct;
38 uint32 timeout_ct;
39 uint32 release_ct;
40 uint32 bury_ct;
41 uint32 kick_ct;
42 byte state;
44 char pad[1];
47 enum
49 Jobrec5size = offsetof(Jobrec5, pad)
52 void
53 fileincref(File *f)
55 if (!f) return;
56 f->refs++;
60 void
61 filedecref(File *f)
63 if (!f) return;
64 f->refs--;
65 if (f->refs < 1) {
66 walgc(f->w);
71 void
72 fileaddjob(File *f, job j)
74 job h;
76 h = &f->jlist;
77 if (!h->fprev) h->fprev = h;
78 j->file = f;
79 j->fprev = h->fprev;
80 j->fnext = h;
81 h->fprev->fnext = j;
82 h->fprev = j;
83 fileincref(f);
87 void
88 filermjob(File *f, job j)
90 if (!f) return;
91 if (f != j->file) return;
92 j->fnext->fprev = j->fprev;
93 j->fprev->fnext = j->fnext;
94 j->fnext = 0;
95 j->fprev = 0;
96 j->file = NULL;
97 f->w->alive -= j->walused;
98 j->walused = 0;
99 filedecref(f);
103 // Fileread reads jobs from f->path into list.
104 // It returns 0 on success, or 1 if any errors occurred.
106 fileread(File *f, job list)
108 int err = 0, v;
110 if (!readfull(f, &v, sizeof(v), &err, "version")) {
111 return err;
113 switch (v) {
114 case Walver:
115 fileincref(f);
116 while (readrec(f, list, &err));
117 filedecref(f);
118 return err;
119 case Walver5:
120 fileincref(f);
121 while (readrec5(f, list, &err));
122 filedecref(f);
123 return err;
126 warnx("%s: unknown version: %d", f->path, v);
127 return 1;
131 // Readrec reads a record from f->fd into linked list l.
132 // If an error occurs, it sets *err to 1.
133 // Readrec returns the number of records read, either 1 or 0.
134 static int
135 readrec(File *f, job l, int *err)
137 int r, sz = 0;
138 int namelen;
139 Jobrec jr;
140 job j;
141 tube t;
142 char tubename[MAX_TUBE_NAME_LEN];
144 r = read(f->fd, &namelen, sizeof(int));
145 if (r == -1) {
146 twarn("read");
147 warnpos(f, 0, "error");
148 *err = 1;
149 return 0;
151 if (r != sizeof(int)) {
152 return 0;
154 sz += r;
155 if (namelen >= MAX_TUBE_NAME_LEN) {
156 warnpos(f, -r, "namelen %d exceeds maximum of %d", namelen, MAX_TUBE_NAME_LEN - 1);
157 *err = 1;
158 return 0;
161 if (namelen < 0) {
162 warnpos(f, -r, "namelen %d is negative", namelen);
163 *err = 1;
164 return 0;
167 if (namelen) {
168 r = readfull(f, tubename, namelen, err, "tube name");
169 if (!r) {
170 return 0;
172 sz += r;
174 tubename[namelen] = '\0';
176 r = readfull(f, &jr, sizeof(Jobrec), err, "job struct");
177 if (!r) {
178 return 0;
180 sz += r;
182 // are we reading trailing zeroes?
183 if (!jr.id) return 0;
185 j = job_find(jr.id);
186 if (!(j || namelen)) {
187 // We read a short record without having seen a
188 // full record for this job, so the full record
189 // was in an eariler file that has been deleted.
190 // Therefore the job itself has either been
191 // deleted or migrated; either way, this record
192 // should be ignored.
193 return 1;
196 switch (jr.state) {
197 case Reserved:
198 jr.state = Ready;
199 case Ready:
200 case Buried:
201 case Delayed:
202 if (!j) {
203 if (jr.body_size > job_data_size_limit) {
204 warnpos(f, -r, "job %"PRIu64" is too big (%zd > %zd)", j->r.id, job_data_size_limit);
205 goto Error;
207 t = tube_find_or_make(tubename);
208 j = make_job_with_id(jr.pri, jr.delay, jr.ttr, jr.body_size,
209 t, jr.id);
210 j->next = j->prev = j;
211 j->r.created_at = jr.created_at;
213 j->r = jr;
214 job_insert(l, j);
216 // full record; read the job body
217 if (namelen) {
218 if (jr.body_size != j->r.body_size) {
219 warnpos(f, -r, "job %"PRIu64" size changed", j->r.id);
220 warnpos(f, -r, "was %zu, now %zu", j->r.body_size, jr.body_size);
221 goto Error;
223 r = readfull(f, j->body, j->r.body_size, err, "job body");
224 if (!r) {
225 goto Error;
227 sz += r;
229 // since this is a full record, we can move
230 // the file pointer and decref the old
231 // file, if any
232 filermjob(j->file, j);
233 fileaddjob(f, j);
235 j->walused += sz;
236 f->w->alive += sz;
238 return 1;
239 case Invalid:
240 if (j) {
241 job_remove(j);
242 filermjob(j->file, j);
243 job_free(j);
245 return 1;
248 Error:
249 *err = 1;
250 if (j) {
251 job_remove(j);
252 filermjob(j->file, j);
253 job_free(j);
255 return 0;
259 // Readrec5 is like readrec, but it reads a record in "version 5"
260 // of the log format.
261 static int
262 readrec5(File *f, job l, int *err)
264 int r, sz = 0;
265 size_t namelen;
266 Jobrec5 jr;
267 job j;
268 tube t;
269 char tubename[MAX_TUBE_NAME_LEN];
271 r = read(f->fd, &namelen, sizeof(namelen));
272 if (r == -1) {
273 twarn("read");
274 warnpos(f, 0, "error");
275 *err = 1;
276 return 0;
278 if (r != sizeof(namelen)) {
279 return 0;
281 sz += r;
282 if (namelen >= MAX_TUBE_NAME_LEN) {
283 warnpos(f, -r, "namelen %d exceeds maximum of %d", namelen, MAX_TUBE_NAME_LEN - 1);
284 *err = 1;
285 return 0;
288 if (namelen < 0) {
289 warnpos(f, -r, "namelen %d is negative", namelen);
290 *err = 1;
291 return 0;
294 if (namelen) {
295 r = readfull(f, tubename, namelen, err, "v5 tube name");
296 if (!r) {
297 return 0;
299 sz += r;
301 tubename[namelen] = '\0';
303 r = readfull(f, &jr, Jobrec5size, err, "v5 job struct");
304 if (!r) {
305 return 0;
307 sz += r;
309 // are we reading trailing zeroes?
310 if (!jr.id) return 0;
312 j = job_find(jr.id);
313 if (!(j || namelen)) {
314 // We read a short record without having seen a
315 // full record for this job, so the full record
316 // was in an eariler file that has been deleted.
317 // Therefore the job itself has either been
318 // deleted or migrated; either way, this record
319 // should be ignored.
320 return 1;
323 switch (jr.state) {
324 case Reserved:
325 jr.state = Ready;
326 case Ready:
327 case Buried:
328 case Delayed:
329 if (!j) {
330 if (jr.body_size > job_data_size_limit) {
331 warnpos(f, -r, "job %"PRIu64" is too big (%zd > %zd)", j->r.id, job_data_size_limit);
332 goto Error;
334 t = tube_find_or_make(tubename);
335 j = make_job_with_id(jr.pri, jr.delay, jr.ttr, jr.body_size,
336 t, jr.id);
337 j->next = j->prev = j;
338 j->r.created_at = jr.created_at;
340 j->r.id = jr.id;
341 j->r.pri = jr.pri;
342 j->r.delay = jr.delay * 1000; // us => ns
343 j->r.ttr = jr.ttr * 1000; // us => ns
344 j->r.body_size = jr.body_size;
345 j->r.created_at = jr.created_at * 1000; // us => ns
346 j->r.deadline_at = jr.deadline_at * 1000; // us => ns
347 j->r.reserve_ct = jr.reserve_ct;
348 j->r.timeout_ct = jr.timeout_ct;
349 j->r.release_ct = jr.release_ct;
350 j->r.bury_ct = jr.bury_ct;
351 j->r.kick_ct = jr.kick_ct;
352 j->r.state = jr.state;
353 job_insert(l, j);
355 // full record; read the job body
356 if (namelen) {
357 if (jr.body_size != j->r.body_size) {
358 warnpos(f, -r, "job %"PRIu64" size changed", j->r.id);
359 warnpos(f, -r, "was %zu, now %zu", j->r.body_size, jr.body_size);
360 goto Error;
362 r = readfull(f, j->body, j->r.body_size, err, "v5 job body");
363 if (!r) {
364 goto Error;
366 sz += r;
368 // since this is a full record, we can move
369 // the file pointer and decref the old
370 // file, if any
371 filermjob(j->file, j);
372 fileaddjob(f, j);
374 j->walused += sz;
375 f->w->alive += sz;
377 return 1;
378 case Invalid:
379 if (j) {
380 job_remove(j);
381 filermjob(j->file, j);
382 job_free(j);
384 return 1;
387 Error:
388 *err = 1;
389 if (j) {
390 job_remove(j);
391 filermjob(j->file, j);
392 job_free(j);
394 return 0;
398 static int
399 readfull(File *f, void *c, int n, int *err, char *desc)
401 int r;
403 r = read(f->fd, c, n);
404 if (r == -1) {
405 twarn("read");
406 warnpos(f, 0, "error reading %s", desc);
407 *err = 1;
408 return 0;
410 if (r != n) {
411 warnpos(f, -r, "unexpected EOF reading %d bytes (got %d): %s", n, r, desc);
412 *err = 1;
413 return 0;
415 return r;
419 static void
420 warnpos(File *f, int adj, char *fmt, ...)
422 int off;
423 va_list ap;
425 off = lseek(f->fd, 0, SEEK_CUR);
426 fprintf(stderr, "%s:%u: ", f->path, off+adj);
427 va_start(ap, fmt);
428 vfprintf(stderr, fmt, ap);
429 va_end(ap);
430 fputc('\n', stderr);
434 // Opens f for writing, writes a header, and initializes
435 // f->free and f->resv.
436 // Sets f->iswopen if successful.
437 void
438 filewopen(File *f)
440 int fd, r;
441 int n;
442 int ver = Walver;
444 fd = open(f->path, O_WRONLY|O_CREAT, 0400);
445 if (fd < 0) {
446 twarn("open %s", f->path);
447 return;
450 r = falloc(fd, f->w->filesize);
451 if (r) {
452 close(fd);
453 errno = r;
454 twarn("falloc %s", f->path);
455 r = unlink(f->path);
456 if (r) {
457 twarn("unlink %s", f->path);
459 return;
462 n = write(fd, &ver, sizeof(int));
463 if (n < sizeof(int)) {
464 twarn("write %s", f->path);
465 close(fd);
466 return;
469 f->fd = fd;
470 f->iswopen = 1;
471 fileincref(f);
472 f->free = f->w->filesize - n;
473 f->resv = 0;
477 static int
478 filewrite(File *f, job j, void *buf, int len)
480 int r;
482 r = write(f->fd, buf, len);
483 if (r != len) {
484 twarn("write");
485 return 0;
488 f->w->resv -= r;
489 f->resv -= r;
490 j->walresv -= r;
491 j->walused += r;
492 f->w->alive += r;
493 return 1;
498 filewrjobshort(File *f, job j)
500 int r, nl;
502 nl = 0; // name len 0 indicates short record
503 r = filewrite(f, j, &nl, sizeof nl) &&
504 filewrite(f, j, &j->r, sizeof j->r);
505 if (!r) return 0;
507 if (j->r.state == Invalid) {
508 filermjob(j->file, j);
511 return r;
516 filewrjobfull(File *f, job j)
518 int nl;
520 fileaddjob(f, j);
521 nl = strlen(j->tube->name);
522 return
523 filewrite(f, j, &nl, sizeof nl) &&
524 filewrite(f, j, j->tube->name, nl) &&
525 filewrite(f, j, &j->r, sizeof j->r) &&
526 filewrite(f, j, j->body, j->r.body_size);
530 void
531 filewclose(File *f)
533 if (!f) return;
534 if (!f->iswopen) return;
535 if (f->free) {
536 (void)ftruncate(f->fd, f->w->filesize - f->free);
538 close(f->fd);
539 f->iswopen = 0;
540 filedecref(f);
545 fileinit(File *f, Wal *w, int n)
547 f->w = w;
548 f->seq = n;
549 f->path = fmtalloc("%s/binlog.%d", w->dir, n);
550 return !!f->path;
554 // Adds f to the linked list in w,
555 // updating w->tail and w->head as necessary.
556 Wal*
557 fileadd(File *f, Wal *w)
559 if (w->tail) {
560 w->tail->next = f;
562 w->tail = f;
563 if (!w->head) {
564 w->head = f;
566 w->nfile++;
567 return w;