add changelog for 1.13
[beanstalkd.git] / file.c
blob9d15135308fa8227ce5033494277e561abdf284a
1 #include "dat.h"
2 #include <stdint.h>
3 #include <inttypes.h>
4 #include <stddef.h>
5 #include <stdlib.h>
6 #include <stdio.h>
7 #include <unistd.h>
8 #include <stdarg.h>
9 #include <sys/types.h>
10 #include <sys/stat.h>
11 #include <fcntl.h>
12 #include <errno.h>
13 #include <string.h>
15 static int readrec(File*, Job *, int*);
16 static int readrec5(File*, Job *, int*);
17 static int readfull(File*, void*, int, int*, char*);
18 static void warnpos(File*, int, char*, ...)
19 __attribute__((format(printf, 3, 4)));
21 FAlloc *falloc = &rawfalloc;
// Walver5 is the header value that makes fileread decode a file
// with readrec5 (the "version 5" log format).
enum {
    Walver5 = 5,
};
28 typedef struct Jobrec5 Jobrec5;
30 struct Jobrec5 {
31 uint64 id;
32 uint32 pri;
33 uint64 delay; // usec
34 uint64 ttr; // usec
35 int32 body_size;
36 uint64 created_at; // usec
37 uint64 deadline_at; // usec
38 uint32 reserve_ct;
39 uint32 timeout_ct;
40 uint32 release_ct;
41 uint32 bury_ct;
42 uint32 kick_ct;
43 byte state;
45 char pad[1];
48 enum
50 Jobrec5size = offsetof(Jobrec5, pad)
// rawfalloc allocates disk space of len bytes by writing zeroes to fd.
// It expects fd's offset to be 0; may also reset fd's offset to 0.
// Exactly len bytes are written: the final chunk is shortened when
// len is not a multiple of the internal buffer size.
// A non-positive len writes nothing.
// Returns 0 on success, and a positive errno otherwise.
int
rawfalloc(int fd, int len)
{
    // We do not use ftruncate() because it might extend the file
    // with a sequence of null bytes or a hole.
    // posix_fallocate() is not portable enough, might fail for NFS.
    static char buf[4096] = {0};
    int done = 0;

    while (done < len) {
        size_t want = sizeof buf;
        ssize_t w;

        // Clamp the last chunk so the file ends up exactly len bytes long.
        if ((size_t)(len - done) < want) {
            want = (size_t)(len - done);
        }

        w = write(fd, buf, want);
        if (w == -1) {
            return errno;
        }
        if (w == 0) {
            // No progress and no error code; bail out instead of
            // spinning forever.
            return EIO;
        }
        done += (int)w;
    }

    lseek(fd, 0, 0); // do not care if this fails
    return 0;
}
74 void
75 fileincref(File *f)
77 if (!f) return;
78 f->refs++;
82 void
83 filedecref(File *f)
85 if (!f) return;
86 f->refs--;
87 if (f->refs < 1) {
88 walgc(f->w);
93 void
94 fileaddjob(File *f, Job *j)
96 Job *h;
98 h = &f->jlist;
99 if (!h->fprev) h->fprev = h;
100 j->file = f;
101 j->fprev = h->fprev;
102 j->fnext = h;
103 h->fprev->fnext = j;
104 h->fprev = j;
105 fileincref(f);
109 void
110 filermjob(File *f, Job *j)
112 if (!f) return;
113 if (f != j->file) return;
114 j->fnext->fprev = j->fprev;
115 j->fprev->fnext = j->fnext;
116 j->fnext = 0;
117 j->fprev = 0;
118 j->file = NULL;
119 f->w->alive -= j->walused;
120 j->walused = 0;
121 filedecref(f);
125 // Fileread reads jobs from f->path into list.
126 // It returns 0 on success, or 1 if any errors occurred.
128 fileread(File *f, Job *list)
130 int err = 0, v;
132 if (!readfull(f, &v, sizeof(v), &err, "version")) {
133 return err;
135 switch (v) {
136 case Walver:
137 fileincref(f);
138 while (readrec(f, list, &err));
139 filedecref(f);
140 return err;
141 case Walver5:
142 fileincref(f);
143 while (readrec5(f, list, &err));
144 filedecref(f);
145 return err;
148 warnx("%s: unknown version: %d", f->path, v);
149 return 1;
153 // Readrec reads a record from f->fd into linked list l.
154 // If an error occurs, it sets *err to 1.
155 // Readrec returns the number of records read, either 1 or 0.
156 static int
157 readrec(File *f, Job *l, int *err)
159 int r, sz = 0;
160 int namelen;
161 Jobrec jr;
162 Job *j;
163 Tube *t;
164 char tubename[MAX_TUBE_NAME_LEN];
166 r = read(f->fd, &namelen, sizeof(int));
167 if (r == -1) {
168 twarn("read");
169 warnpos(f, 0, "error");
170 *err = 1;
171 return 0;
173 if (r != sizeof(int)) {
174 return 0;
176 sz += r;
177 if (namelen >= MAX_TUBE_NAME_LEN) {
178 warnpos(f, -r, "namelen %d exceeds maximum of %d", namelen, MAX_TUBE_NAME_LEN - 1);
179 *err = 1;
180 return 0;
183 if (namelen < 0) {
184 warnpos(f, -r, "namelen %d is negative", namelen);
185 *err = 1;
186 return 0;
189 if (namelen) {
190 r = readfull(f, tubename, namelen, err, "tube name");
191 if (!r) {
192 return 0;
194 sz += r;
196 tubename[namelen] = '\0';
198 r = readfull(f, &jr, sizeof(Jobrec), err, "job struct");
199 if (!r) {
200 return 0;
202 sz += r;
204 // are we reading trailing zeroes?
205 if (!jr.id) return 0;
207 j = job_find(jr.id);
208 if (!(j || namelen)) {
209 // We read a short record without having seen a
210 // full record for this job, so the full record
211 // was in an earlier file that has been deleted.
212 // Therefore the job itself has either been
213 // deleted or migrated; either way, this record
214 // should be ignored.
215 return 1;
218 switch (jr.state) {
219 case Reserved:
220 jr.state = Ready;
221 /* Falls through */
222 case Ready:
223 case Buried:
224 case Delayed:
225 if (!j) {
226 if ((size_t)jr.body_size > job_data_size_limit) {
227 warnpos(f, -r, "job %"PRIu64" is too big (%"PRId32" > %zu)",
228 jr.id,
229 jr.body_size,
230 job_data_size_limit);
231 goto Error;
233 t = tube_find_or_make(tubename);
234 j = make_job_with_id(jr.pri, jr.delay, jr.ttr, jr.body_size,
235 t, jr.id);
236 job_list_reset(j);
237 j->r.created_at = jr.created_at;
239 j->r = jr;
240 job_list_insert(l, j);
242 // full record; read the job body
243 if (namelen) {
244 if (jr.body_size != j->r.body_size) {
245 warnpos(f, -r, "job %"PRIu64" size changed", j->r.id);
246 warnpos(f, -r, "was %d, now %d", j->r.body_size, jr.body_size);
247 goto Error;
249 r = readfull(f, j->body, j->r.body_size, err, "job body");
250 if (!r) {
251 goto Error;
253 sz += r;
255 // since this is a full record, we can move
256 // the file pointer and decref the old
257 // file, if any
258 filermjob(j->file, j);
259 fileaddjob(f, j);
261 j->walused += sz;
262 f->w->alive += sz;
264 return 1;
265 case Invalid:
266 if (j) {
267 job_list_remove(j);
268 filermjob(j->file, j);
269 job_free(j);
271 return 1;
274 Error:
275 *err = 1;
276 if (j) {
277 job_list_remove(j);
278 filermjob(j->file, j);
279 job_free(j);
281 return 0;
285 // Readrec5 is like readrec, but it reads a record in "version 5"
286 // of the log format.
287 static int
288 readrec5(File *f, Job *l, int *err)
290 int r, sz = 0;
291 size_t namelen;
292 Jobrec5 jr;
293 Job *j;
294 Tube *t;
295 char tubename[MAX_TUBE_NAME_LEN];
297 r = read(f->fd, &namelen, sizeof(namelen));
298 if (r == -1) {
299 twarn("read");
300 warnpos(f, 0, "error");
301 *err = 1;
302 return 0;
304 if (r != sizeof(namelen)) {
305 return 0;
307 sz += r;
308 if (namelen >= MAX_TUBE_NAME_LEN) {
309 warnpos(f, -r, "namelen %zu exceeds maximum of %d", namelen, MAX_TUBE_NAME_LEN - 1);
310 *err = 1;
311 return 0;
314 if (namelen) {
315 r = readfull(f, tubename, namelen, err, "v5 tube name");
316 if (!r) {
317 return 0;
319 sz += r;
321 tubename[namelen] = '\0';
323 r = readfull(f, &jr, Jobrec5size, err, "v5 job struct");
324 if (!r) {
325 return 0;
327 sz += r;
329 // are we reading trailing zeroes?
330 if (!jr.id) return 0;
332 j = job_find(jr.id);
333 if (!(j || namelen)) {
334 // We read a short record without having seen a
335 // full record for this job, so the full record
336 // was in an eariler file that has been deleted.
337 // Therefore the job itself has either been
338 // deleted or migrated; either way, this record
339 // should be ignored.
340 return 1;
343 switch (jr.state) {
344 case Reserved:
345 jr.state = Ready;
346 /* Falls through */
347 case Ready:
348 case Buried:
349 case Delayed:
350 if (!j) {
351 if ((size_t)jr.body_size > job_data_size_limit) {
352 warnpos(f, -r, "job %"PRIu64" is too big (%"PRId32" > %zu)",
353 jr.id,
354 jr.body_size,
355 job_data_size_limit);
356 goto Error;
358 t = tube_find_or_make(tubename);
359 j = make_job_with_id(jr.pri, jr.delay, jr.ttr, jr.body_size,
360 t, jr.id);
361 job_list_reset(j);
363 j->r.id = jr.id;
364 j->r.pri = jr.pri;
365 j->r.delay = jr.delay * 1000; // us => ns
366 j->r.ttr = jr.ttr * 1000; // us => ns
367 j->r.body_size = jr.body_size;
368 j->r.created_at = jr.created_at * 1000; // us => ns
369 j->r.deadline_at = jr.deadline_at * 1000; // us => ns
370 j->r.reserve_ct = jr.reserve_ct;
371 j->r.timeout_ct = jr.timeout_ct;
372 j->r.release_ct = jr.release_ct;
373 j->r.bury_ct = jr.bury_ct;
374 j->r.kick_ct = jr.kick_ct;
375 j->r.state = jr.state;
376 job_list_insert(l, j);
378 // full record; read the job body
379 if (namelen) {
380 if (jr.body_size != j->r.body_size) {
381 warnpos(f, -r, "job %"PRIu64" size changed", j->r.id);
382 warnpos(f, -r, "was %"PRId32", now %"PRId32, j->r.body_size, jr.body_size);
383 goto Error;
385 r = readfull(f, j->body, j->r.body_size, err, "v5 job body");
386 if (!r) {
387 goto Error;
389 sz += r;
391 // since this is a full record, we can move
392 // the file pointer and decref the old
393 // file, if any
394 filermjob(j->file, j);
395 fileaddjob(f, j);
397 j->walused += sz;
398 f->w->alive += sz;
400 return 1;
401 case Invalid:
402 if (j) {
403 job_list_remove(j);
404 filermjob(j->file, j);
405 job_free(j);
407 return 1;
410 Error:
411 *err = 1;
412 if (j) {
413 job_list_remove(j);
414 filermjob(j->file, j);
415 job_free(j);
417 return 0;
421 static int
422 readfull(File *f, void *c, int n, int *err, char *desc)
424 int r;
426 r = read(f->fd, c, n);
427 if (r == -1) {
428 twarn("read");
429 warnpos(f, 0, "error reading %s", desc);
430 *err = 1;
431 return 0;
433 if (r != n) {
434 warnpos(f, -r, "unexpected EOF reading %d bytes (got %d): %s", n, r, desc);
435 *err = 1;
436 return 0;
438 return r;
441 static void
442 warnpos(File *f, int adj, char *fmt, ...)
444 int off;
445 va_list ap;
447 off = lseek(f->fd, 0, SEEK_CUR);
448 fprintf(stderr, "%s:%d: ", f->path, off+adj);
449 va_start(ap, fmt);
450 vfprintf(stderr, fmt, ap);
451 va_end(ap);
452 fputc('\n', stderr);
456 // Opens f for writing, writes a header, and initializes
457 // f->free and f->resv.
458 // Sets f->iswopen if successful.
459 void
460 filewopen(File *f)
462 int fd, r;
463 int n;
464 int ver = Walver;
466 fd = open(f->path, O_WRONLY|O_CREAT, 0400);
467 if (fd < 0) {
468 twarn("open %s", f->path);
469 return;
472 r = falloc(fd, f->w->filesize);
473 if (r) {
474 if (close(fd) == -1)
475 twarn("close");
476 errno = r;
477 twarn("falloc %s", f->path);
478 r = unlink(f->path);
479 if (r) {
480 twarn("unlink %s", f->path);
482 return;
485 n = write(fd, &ver, sizeof(int));
486 if (n < 0 || (size_t)n < sizeof(int)) {
487 twarn("write %s", f->path);
488 if (close(fd) == -1)
489 twarn("close");
490 return;
493 f->fd = fd;
494 f->iswopen = 1;
495 fileincref(f);
496 f->free = f->w->filesize - n;
497 f->resv = 0;
501 static int
502 filewrite(File *f, Job *j, void *buf, int len)
504 int r;
506 r = write(f->fd, buf, len);
507 if (r != len) {
508 twarn("write");
509 return 0;
512 f->w->resv -= r;
513 f->resv -= r;
514 j->walresv -= r;
515 j->walused += r;
516 f->w->alive += r;
517 return 1;
522 filewrjobshort(File *f, Job *j)
524 int r, nl;
526 nl = 0; // name len 0 indicates short record
527 r = filewrite(f, j, &nl, sizeof nl) &&
528 filewrite(f, j, &j->r, sizeof j->r);
529 if (!r) return 0;
531 if (j->r.state == Invalid) {
532 filermjob(j->file, j);
535 return r;
540 filewrjobfull(File *f, Job *j)
542 int nl;
544 fileaddjob(f, j);
545 nl = strlen(j->tube->name);
546 return
547 filewrite(f, j, &nl, sizeof nl) &&
548 filewrite(f, j, j->tube->name, nl) &&
549 filewrite(f, j, &j->r, sizeof j->r) &&
550 filewrite(f, j, j->body, j->r.body_size);
554 void
555 filewclose(File *f)
557 if (!f) return;
558 if (!f->iswopen) return;
559 if (f->free) {
560 errno = 0;
561 if (ftruncate(f->fd, f->w->filesize - f->free) != 0) {
562 twarn("ftruncate");
565 if (close(f->fd) == -1)
566 twarn("close");
567 f->iswopen = 0;
568 filedecref(f);
573 fileinit(File *f, Wal *w, int n)
575 f->w = w;
576 f->seq = n;
577 f->path = fmtalloc("%s/binlog.%d", w->dir, n);
578 return !!f->path;
582 // Adds f to the linked list in w,
583 // updating w->tail and w->head as necessary.
584 Wal*
585 fileadd(File *f, Wal *w)
587 if (w->tail) {
588 w->tail->next = f;
590 w->tail = f;
591 if (!w->head) {
592 w->head = f;
594 w->nfile++;
595 return w;