improve the tests
[beanstalkd.git] / file.c
blobafdb1e304ca3a97d191ededd4d0b08ea332335f5
1 #include <stdint.h>
2 #include <inttypes.h>
3 #include <stddef.h>
4 #include <stdlib.h>
5 #include <stdio.h>
6 #include <unistd.h>
7 #include <stdarg.h>
8 #include <sys/types.h>
9 #include <sys/stat.h>
10 #include <fcntl.h>
11 #include <errno.h>
12 #include <string.h>
13 #include "dat.h"
15 static void warnpos(File*, int, char*, ...);
16 static int readrec(File*, job, int*);
17 static int readrec5(File*, job, int*);
18 static int readfull(File*, void*, int, int*, char*);
20 FAlloc *falloc = &rawfalloc;
// Walver5 is the version number found in the header of log files
// whose records are decoded by readrec5.
enum { Walver5 = 5 };
27 typedef struct Jobrec5 Jobrec5;
29 struct Jobrec5 {
30 uint64 id;
31 uint32 pri;
32 uint64 delay; // usec
33 uint64 ttr; // usec
34 int32 body_size;
35 uint64 created_at; // usec
36 uint64 deadline_at; // usec
37 uint32 reserve_ct;
38 uint32 timeout_ct;
39 uint32 release_ct;
40 uint32 bury_ct;
41 uint32 kick_ct;
42 byte state;
44 char pad[1];
47 enum
49 Jobrec5size = offsetof(Jobrec5, pad)
52 void
53 fileincref(File *f)
55 if (!f) return;
56 f->refs++;
60 void
61 filedecref(File *f)
63 if (!f) return;
64 f->refs--;
65 if (f->refs < 1) {
66 walgc(f->w);
71 void
72 fileaddjob(File *f, job j)
74 job h;
76 h = &f->jlist;
77 if (!h->fprev) h->fprev = h;
78 j->file = f;
79 j->fprev = h->fprev;
80 j->fnext = h;
81 h->fprev->fnext = j;
82 h->fprev = j;
83 fileincref(f);
87 void
88 filermjob(File *f, job j)
90 if (!f) return;
91 if (f != j->file) return;
92 j->fnext->fprev = j->fprev;
93 j->fprev->fnext = j->fnext;
94 j->fnext = 0;
95 j->fprev = 0;
96 j->file = NULL;
97 f->w->alive -= j->walused;
98 j->walused = 0;
99 filedecref(f);
103 // Fileread reads jobs from f->path into list.
104 // It returns 0 on success, or 1 if any errors occurred.
106 fileread(File *f, job list)
108 int err = 0, v;
110 if (!readfull(f, &v, sizeof(v), &err, "version")) {
111 return err;
113 switch (v) {
114 case Walver:
115 fileincref(f);
116 while (readrec(f, list, &err));
117 filedecref(f);
118 return err;
119 case Walver5:
120 fileincref(f);
121 while (readrec5(f, list, &err));
122 filedecref(f);
123 return err;
126 warnx("%s: unknown version: %d", f->path, v);
127 return 1;
131 // Readrec reads a record from f->fd into linked list l.
132 // If an error occurs, it sets *err to 1.
133 // Readrec returns the number of records read, either 1 or 0.
134 static int
135 readrec(File *f, job l, int *err)
137 int r, sz = 0;
138 int namelen;
139 Jobrec jr;
140 job j;
141 tube t;
142 char tubename[MAX_TUBE_NAME_LEN];
144 r = read(f->fd, &namelen, sizeof(int));
145 if (r == -1) {
146 twarn("read");
147 warnpos(f, 0, "error");
148 *err = 1;
149 return 0;
151 if (r != sizeof(int)) {
152 return 0;
154 sz += r;
155 if (namelen >= MAX_TUBE_NAME_LEN) {
156 warnpos(f, -r, "namelen %d exceeds maximum of %d", namelen, MAX_TUBE_NAME_LEN - 1);
157 *err = 1;
158 return 0;
161 if (namelen) {
162 r = readfull(f, tubename, namelen, err, "tube name");
163 if (!r) {
164 return 0;
166 sz += r;
168 tubename[namelen] = '\0';
170 r = readfull(f, &jr, sizeof(Jobrec), err, "job struct");
171 if (!r) {
172 return 0;
174 sz += r;
176 // are we reading trailing zeroes?
177 if (!jr.id) return 0;
179 j = job_find(jr.id);
180 if (!(j || namelen)) {
181 // We read a short record without having seen a
182 // full record for this job, so the full record
183 // was in an eariler file that has been deleted.
184 // Therefore the job itself has either been
185 // deleted or migrated; either way, this record
186 // should be ignored.
187 return 1;
190 switch (jr.state) {
191 case Reserved:
192 jr.state = Ready;
193 case Ready:
194 case Buried:
195 case Delayed:
196 if (!j) {
197 t = tube_find_or_make(tubename);
198 j = make_job_with_id(jr.pri, jr.delay, jr.ttr, jr.body_size,
199 t, jr.id);
200 j->next = j->prev = j;
201 j->r.created_at = jr.created_at;
203 j->r = jr;
204 job_insert(l, j);
206 // full record; read the job body
207 if (namelen) {
208 if (jr.body_size != j->r.body_size) {
209 warnpos(f, -r, "job %"PRIu64" size changed", j->r.id);
210 warnpos(f, -r, "was %zu, now %zu", j->r.body_size, jr.body_size);
211 goto Error;
213 r = readfull(f, j->body, j->r.body_size, err, "job body");
214 if (!r) {
215 goto Error;
217 sz += r;
219 // since this is a full record, we can move
220 // the file pointer and decref the old
221 // file, if any
222 filermjob(j->file, j);
223 fileaddjob(f, j);
225 j->walused += sz;
226 f->w->alive += sz;
228 return 1;
229 case Invalid:
230 if (j) {
231 job_remove(j);
232 filermjob(j->file, j);
233 job_free(j);
235 return 1;
238 Error:
239 *err = 1;
240 if (j) {
241 job_remove(j);
242 filermjob(j->file, j);
243 job_free(j);
245 return 0;
249 // Readrec5 is like readrec, but it reads a record in "version 5"
250 // of the log format.
251 static int
252 readrec5(File *f, job l, int *err)
254 int r, sz = 0;
255 size_t namelen;
256 Jobrec5 jr;
257 job j;
258 tube t;
259 char tubename[MAX_TUBE_NAME_LEN];
261 r = read(f->fd, &namelen, sizeof(namelen));
262 if (r == -1) {
263 twarn("read");
264 warnpos(f, 0, "error");
265 *err = 1;
266 return 0;
268 if (r != sizeof(namelen)) {
269 return 0;
271 sz += r;
272 if (namelen >= MAX_TUBE_NAME_LEN) {
273 warnpos(f, -r, "namelen %d exceeds maximum of %d", namelen, MAX_TUBE_NAME_LEN - 1);
274 *err = 1;
275 return 0;
278 if (namelen) {
279 r = readfull(f, tubename, namelen, err, "v5 tube name");
280 if (!r) {
281 return 0;
283 sz += r;
285 tubename[namelen] = '\0';
287 r = readfull(f, &jr, Jobrec5size, err, "v5 job struct");
288 if (!r) {
289 return 0;
291 sz += r;
293 // are we reading trailing zeroes?
294 if (!jr.id) return 0;
296 j = job_find(jr.id);
297 if (!(j || namelen)) {
298 // We read a short record without having seen a
299 // full record for this job, so the full record
300 // was in an eariler file that has been deleted.
301 // Therefore the job itself has either been
302 // deleted or migrated; either way, this record
303 // should be ignored.
304 return 1;
307 switch (jr.state) {
308 case Reserved:
309 jr.state = Ready;
310 case Ready:
311 case Buried:
312 case Delayed:
313 if (!j) {
314 t = tube_find_or_make(tubename);
315 j = make_job_with_id(jr.pri, jr.delay, jr.ttr, jr.body_size,
316 t, jr.id);
317 j->next = j->prev = j;
318 j->r.created_at = jr.created_at;
320 j->r.id = jr.id;
321 j->r.pri = jr.pri;
322 j->r.delay = jr.delay * 1000; // us => ns
323 j->r.ttr = jr.ttr * 1000; // us => ns
324 j->r.body_size = jr.body_size;
325 j->r.created_at = jr.created_at * 1000; // us => ns
326 j->r.deadline_at = jr.deadline_at * 1000; // us => ns
327 j->r.reserve_ct = jr.reserve_ct;
328 j->r.timeout_ct = jr.timeout_ct;
329 j->r.release_ct = jr.release_ct;
330 j->r.bury_ct = jr.bury_ct;
331 j->r.kick_ct = jr.kick_ct;
332 j->r.state = jr.state;
333 job_insert(l, j);
335 // full record; read the job body
336 if (namelen) {
337 if (jr.body_size != j->r.body_size) {
338 warnpos(f, -r, "job %"PRIu64" size changed", j->r.id);
339 warnpos(f, -r, "was %zu, now %zu", j->r.body_size, jr.body_size);
340 goto Error;
342 r = readfull(f, j->body, j->r.body_size, err, "v5 job body");
343 if (!r) {
344 goto Error;
346 sz += r;
348 // since this is a full record, we can move
349 // the file pointer and decref the old
350 // file, if any
351 filermjob(j->file, j);
352 fileaddjob(f, j);
354 j->walused += sz;
355 f->w->alive += sz;
357 return 1;
358 case Invalid:
359 if (j) {
360 job_remove(j);
361 filermjob(j->file, j);
362 job_free(j);
364 return 1;
367 Error:
368 *err = 1;
369 if (j) {
370 job_remove(j);
371 filermjob(j->file, j);
372 job_free(j);
374 return 0;
378 static int
379 readfull(File *f, void *c, int n, int *err, char *desc)
381 int r;
383 r = read(f->fd, c, n);
384 if (r == -1) {
385 twarn("read");
386 warnpos(f, 0, "error reading %s", desc);
387 *err = 1;
388 return 0;
390 if (r != n) {
391 warnpos(f, -r, "unexpected EOF reading %d bytes (got %d): %s", n, r, desc);
392 *err = 1;
393 return 0;
395 return r;
399 static void
400 warnpos(File *f, int adj, char *fmt, ...)
402 int off;
403 va_list ap;
405 off = lseek(f->fd, 0, SEEK_CUR);
406 fprintf(stderr, "%s:%u: ", f->path, off+adj);
407 va_start(ap, fmt);
408 vfprintf(stderr, fmt, ap);
409 va_end(ap);
410 fputc('\n', stderr);
414 // Opens f for writing, writes a header, and initializes
415 // f->free and f->resv.
416 // Sets f->iswopen if successful.
417 void
418 filewopen(File *f)
420 int fd, r;
421 int n;
422 int ver = Walver;
424 fd = open(f->path, O_WRONLY|O_CREAT, 0400);
425 if (fd < 0) {
426 twarn("open %s", f->path);
427 return;
430 r = falloc(fd, f->w->filesz);
431 if (r) {
432 close(fd);
433 errno = r;
434 twarn("falloc %s", f->path);
435 r = unlink(f->path);
436 if (r) {
437 twarn("unlink %s", f->path);
439 return;
442 n = write(fd, &ver, sizeof(int));
443 if (n < sizeof(int)) {
444 twarn("write %s", f->path);
445 close(fd);
446 return;
449 f->fd = fd;
450 f->iswopen = 1;
451 fileincref(f);
452 f->free = f->w->filesz - n;
453 f->resv = 0;
457 static int
458 filewrite(File *f, job j, void *buf, int len)
460 int r;
462 r = write(f->fd, buf, len);
463 if (r != len) {
464 twarn("write");
465 return 0;
468 f->w->resv -= r;
469 f->resv -= r;
470 j->walresv -= r;
471 j->walused += r;
472 f->w->alive += r;
473 return 1;
478 filewrjobshort(File *f, job j)
480 int r, nl;
482 nl = 0; // name len 0 indicates short record
483 r = filewrite(f, j, &nl, sizeof nl) &&
484 filewrite(f, j, &j->r, sizeof j->r);
485 if (!r) return 0;
487 if (j->r.state == Invalid) {
488 filermjob(j->file, j);
491 return r;
496 filewrjobfull(File *f, job j)
498 int nl;
500 fileaddjob(f, j);
501 nl = strlen(j->tube->name);
502 return
503 filewrite(f, j, &nl, sizeof nl) &&
504 filewrite(f, j, j->tube->name, nl) &&
505 filewrite(f, j, &j->r, sizeof j->r) &&
506 filewrite(f, j, j->body, j->r.body_size);
510 void
511 filewclose(File *f)
513 if (!f) return;
514 if (!f->iswopen) return;
515 if (f->free) {
516 (void)ftruncate(f->fd, f->w->filesz - f->free);
518 close(f->fd);
519 f->iswopen = 0;
520 filedecref(f);
525 fileinit(File *f, Wal *w, int n)
527 f->w = w;
528 f->seq = n;
529 f->path = fmtalloc("%s/binlog.%d", w->dir, n);
530 return !!f->path;
534 // Adds f to the linked list in w,
535 // updating w->tail and w->head as necessary.
536 Wal*
537 fileadd(File *f, Wal *w)
539 if (w->tail) {
540 w->tail->next = f;
542 w->tail = f;
543 if (!w->head) {
544 w->head = f;
546 w->nfile++;
547 return w;