// binlog: better warning message when open fails
// [beanstalkd.git] / file.c
// blob a41e17a3ae225029be8d7bc1fdab40c3a01169c4
1 #include <stdint.h>
2 #include <inttypes.h>
3 #include <stddef.h>
4 #include <stdlib.h>
5 #include <stdio.h>
6 #include <unistd.h>
7 #include <stdarg.h>
8 #include <sys/types.h>
9 #include <sys/stat.h>
10 #include <fcntl.h>
11 #include <errno.h>
12 #include <string.h>
13 #include "dat.h"
15 static void warnpos(File*, int, char*, ...);
16 static int readrec(File*, job, int*);
17 static int readrec5(File*, job, int*);
18 static int readfull(File*, void*, int, int*, char*);
// Walver5 is the version number found at the start of a log file
// written in the "version 5" format; fileread hands such files
// to readrec5.
enum {
    Walver5 = 5
};
25 typedef struct Jobrec5 Jobrec5;
27 struct Jobrec5 {
28 uint64 id;
29 uint32 pri;
30 uint64 delay; // usec
31 uint64 ttr; // usec
32 int32 body_size;
33 uint64 created_at; // usec
34 uint64 deadline_at; // usec
35 uint32 reserve_ct;
36 uint32 timeout_ct;
37 uint32 release_ct;
38 uint32 bury_ct;
39 uint32 kick_ct;
40 byte state;
42 char pad[1];
45 enum
47 Jobrec5size = offsetof(Jobrec5, pad)
50 void
51 fileincref(File *f)
53 if (!f) return;
54 f->refs++;
58 void
59 filedecref(File *f)
61 if (!f) return;
62 f->refs--;
63 if (f->refs < 1) {
64 walgc(f->w);
69 void
70 fileaddjob(File *f, job j)
72 job h;
74 h = &f->jlist;
75 if (!h->fprev) h->fprev = h;
76 j->file = f;
77 j->fprev = h->fprev;
78 j->fnext = h;
79 h->fprev->fnext = j;
80 h->fprev = j;
81 fileincref(f);
85 void
86 filermjob(File *f, job j)
88 if (!f) return;
89 if (f != j->file) return;
90 j->fnext->fprev = j->fprev;
91 j->fprev->fnext = j->fnext;
92 j->fnext = 0;
93 j->fprev = 0;
94 j->file = NULL;
95 f->w->alive -= j->walused;
96 j->walused = 0;
97 filedecref(f);
101 // Fileread reads jobs from f->path into list.
102 // It returns 0 on success, or 1 if any errors occurred.
104 fileread(File *f, job list)
106 int err = 0, v;
108 if (!readfull(f, &v, sizeof(v), &err, "version")) {
109 return err;
111 switch (v) {
112 case Walver:
113 fileincref(f);
114 while (readrec(f, list, &err));
115 filedecref(f);
116 return err;
117 case Walver5:
118 fileincref(f);
119 while (readrec5(f, list, &err));
120 filedecref(f);
121 return err;
124 warnx("%s: unknown version: %d", f->path, v);
125 return 1;
129 // Readrec reads a record from f->fd into linked list l.
130 // If an error occurs, it sets *err to 1.
131 // Readrec returns the number of records read, either 1 or 0.
132 static int
133 readrec(File *f, job l, int *err)
135 int r, sz = 0;
136 int namelen;
137 Jobrec jr;
138 job j;
139 tube t;
140 char tubename[MAX_TUBE_NAME_LEN];
142 r = read(f->fd, &namelen, sizeof(int));
143 if (r == -1) {
144 twarn("read");
145 warnpos(f, 0, "error");
146 *err = 1;
147 return 0;
149 if (r != sizeof(int)) {
150 return 0;
152 sz += r;
153 if (namelen >= MAX_TUBE_NAME_LEN) {
154 warnpos(f, -r, "namelen %d exceeds maximum of %d", namelen, MAX_TUBE_NAME_LEN - 1);
155 *err = 1;
156 return 0;
159 if (namelen) {
160 r = readfull(f, tubename, namelen, err, "tube name");
161 if (!r) {
162 return 0;
164 sz += r;
166 tubename[namelen] = '\0';
168 r = readfull(f, &jr, sizeof(Jobrec), err, "job struct");
169 if (!r) {
170 return 0;
172 sz += r;
174 // are we reading trailing zeroes?
175 if (!jr.id) return 0;
177 j = job_find(jr.id);
178 if (!(j || namelen)) {
179 // We read a short record without having seen a
180 // full record for this job, so the full record
181 // was in an eariler file that has been deleted.
182 // Therefore the job itself has either been
183 // deleted or migrated; either way, this record
184 // should be ignored.
185 return 1;
188 switch (jr.state) {
189 case Reserved:
190 jr.state = Ready;
191 case Ready:
192 case Buried:
193 case Delayed:
194 if (!j) {
195 t = tube_find_or_make(tubename);
196 j = make_job_with_id(jr.pri, jr.delay, jr.ttr, jr.body_size,
197 t, jr.id);
198 j->next = j->prev = j;
199 j->r.created_at = jr.created_at;
201 j->r = jr;
202 job_insert(l, j);
204 // full record; read the job body
205 if (namelen) {
206 if (jr.body_size != j->r.body_size) {
207 warnpos(f, -r, "job %"PRIu64" size changed", j->r.id);
208 warnpos(f, -r, "was %zu, now %zu", j->r.body_size, jr.body_size);
209 goto Error;
211 r = readfull(f, j->body, j->r.body_size, err, "job body");
212 if (!r) {
213 goto Error;
215 sz += r;
217 // since this is a full record, we can move
218 // the file pointer and decref the old
219 // file, if any
220 filermjob(j->file, j);
221 fileaddjob(f, j);
223 j->walused += sz;
224 f->w->alive += sz;
226 return 1;
227 case Invalid:
228 if (j) {
229 job_remove(j);
230 filermjob(j->file, j);
231 job_free(j);
233 return 1;
236 Error:
237 *err = 1;
238 if (j) {
239 job_remove(j);
240 filermjob(j->file, j);
241 job_free(j);
243 return 0;
247 // Readrec5 is like readrec, but it reads a record in "version 5"
248 // of the log format.
249 static int
250 readrec5(File *f, job l, int *err)
252 int r, sz = 0;
253 size_t namelen;
254 Jobrec5 jr;
255 job j;
256 tube t;
257 char tubename[MAX_TUBE_NAME_LEN];
259 r = read(f->fd, &namelen, sizeof(namelen));
260 if (r == -1) {
261 twarn("read");
262 warnpos(f, 0, "error");
263 *err = 1;
264 return 0;
266 if (r != sizeof(namelen)) {
267 return 0;
269 sz += r;
270 if (namelen >= MAX_TUBE_NAME_LEN) {
271 warnpos(f, -r, "namelen %d exceeds maximum of %d", namelen, MAX_TUBE_NAME_LEN - 1);
272 *err = 1;
273 return 0;
276 if (namelen) {
277 r = readfull(f, tubename, namelen, err, "v5 tube name");
278 if (!r) {
279 return 0;
281 sz += r;
283 tubename[namelen] = '\0';
285 r = readfull(f, &jr, Jobrec5size, err, "v5 job struct");
286 if (!r) {
287 return 0;
289 sz += r;
291 // are we reading trailing zeroes?
292 if (!jr.id) return 0;
294 j = job_find(jr.id);
295 if (!(j || namelen)) {
296 // We read a short record without having seen a
297 // full record for this job, so the full record
298 // was in an eariler file that has been deleted.
299 // Therefore the job itself has either been
300 // deleted or migrated; either way, this record
301 // should be ignored.
302 return 1;
305 switch (jr.state) {
306 case Reserved:
307 jr.state = Ready;
308 case Ready:
309 case Buried:
310 case Delayed:
311 if (!j) {
312 t = tube_find_or_make(tubename);
313 j = make_job_with_id(jr.pri, jr.delay, jr.ttr, jr.body_size,
314 t, jr.id);
315 j->next = j->prev = j;
316 j->r.created_at = jr.created_at;
318 j->r.id = jr.id;
319 j->r.pri = jr.pri;
320 j->r.delay = jr.delay * 1000; // us => ns
321 j->r.ttr = jr.ttr * 1000; // us => ns
322 j->r.body_size = jr.body_size;
323 j->r.created_at = jr.created_at * 1000; // us => ns
324 j->r.deadline_at = jr.deadline_at * 1000; // us => ns
325 j->r.reserve_ct = jr.reserve_ct;
326 j->r.timeout_ct = jr.timeout_ct;
327 j->r.release_ct = jr.release_ct;
328 j->r.bury_ct = jr.bury_ct;
329 j->r.kick_ct = jr.kick_ct;
330 j->r.state = jr.state;
331 job_insert(l, j);
333 // full record; read the job body
334 if (namelen) {
335 if (jr.body_size != j->r.body_size) {
336 warnpos(f, -r, "job %"PRIu64" size changed", j->r.id);
337 warnpos(f, -r, "was %zu, now %zu", j->r.body_size, jr.body_size);
338 goto Error;
340 r = readfull(f, j->body, j->r.body_size, err, "v5 job body");
341 if (!r) {
342 goto Error;
344 sz += r;
346 // since this is a full record, we can move
347 // the file pointer and decref the old
348 // file, if any
349 filermjob(j->file, j);
350 fileaddjob(f, j);
352 j->walused += sz;
353 f->w->alive += sz;
355 return 1;
356 case Invalid:
357 if (j) {
358 job_remove(j);
359 filermjob(j->file, j);
360 job_free(j);
362 return 1;
365 Error:
366 *err = 1;
367 if (j) {
368 job_remove(j);
369 filermjob(j->file, j);
370 job_free(j);
372 return 0;
376 static int
377 readfull(File *f, void *c, int n, int *err, char *desc)
379 int r;
381 r = read(f->fd, c, n);
382 if (r == -1) {
383 twarn("read");
384 warnpos(f, 0, "error reading %s", desc);
385 *err = 1;
386 return 0;
388 if (r != n) {
389 warnpos(f, -r, "unexpected EOF reading %d bytes (got %d): %s", n, r, desc);
390 *err = 1;
391 return 0;
393 return r;
397 static void
398 warnpos(File *f, int adj, char *fmt, ...)
400 int off;
401 va_list ap;
403 off = lseek(f->fd, 0, SEEK_CUR);
404 fprintf(stderr, "%s:%u: ", f->path, off+adj);
405 va_start(ap, fmt);
406 vfprintf(stderr, fmt, ap);
407 va_end(ap);
408 fputc('\n', stderr);
412 // Opens f for writing, writes a header, and initializes
413 // f->free and f->resv.
414 // Sets f->iswopen if successful.
415 void
416 filewopen(File *f)
418 int fd, r;
419 int n;
420 int ver = Walver;
422 fd = open(f->path, O_WRONLY|O_CREAT, 0400);
423 if (fd < 0) {
424 twarn("open %s", f->path);
425 return;
428 r = falloc(fd, f->w->filesz);
429 if (r) {
430 close(fd);
431 errno = r;
432 twarn("Cannot allocate space for file %s", f->path);
433 return;
436 n = write(fd, &ver, sizeof(int));
437 if (n < sizeof(int)) {
438 twarn("write %s", f->path);
439 close(fd);
440 return;
443 f->fd = fd;
444 f->iswopen = 1;
445 fileincref(f);
446 f->free = f->w->filesz - n;
447 f->resv = 0;
451 static int
452 filewrite(File *f, job j, void *buf, int len)
454 int r;
456 r = write(f->fd, buf, len);
457 if (r != len) {
458 twarn("write");
459 return 0;
462 f->w->resv -= r;
463 f->resv -= r;
464 j->walresv -= r;
465 j->walused += r;
466 f->w->alive += r;
467 return 1;
472 filewrjobshort(File *f, job j)
474 int r, nl;
476 nl = 0; // name len 0 indicates short record
477 r = filewrite(f, j, &nl, sizeof nl) &&
478 filewrite(f, j, &j->r, sizeof j->r);
479 if (!r) return 0;
481 if (j->r.state == Invalid) {
482 filermjob(j->file, j);
485 return r;
490 filewrjobfull(File *f, job j)
492 int nl;
494 fileaddjob(f, j);
495 nl = strlen(j->tube->name);
496 return
497 filewrite(f, j, &nl, sizeof nl) &&
498 filewrite(f, j, j->tube->name, nl) &&
499 filewrite(f, j, &j->r, sizeof j->r) &&
500 filewrite(f, j, j->body, j->r.body_size);
504 void
505 filewclose(File *f)
507 if (!f) return;
508 if (!f->iswopen) return;
509 if (f->free) {
510 (void)ftruncate(f->fd, f->w->filesz - f->free);
512 close(f->fd);
513 f->iswopen = 0;
514 filedecref(f);
519 fileinit(File *f, Wal *w, int n)
521 f->w = w;
522 f->seq = n;
523 f->path = fmtalloc("%s/binlog.%d", w->dir, n);
524 return !!f->path;
528 // Adds f to the linked list in w,
529 // updating w->tail and w->head as necessary.
530 Wal*
531 fileadd(File *f, Wal *w)
533 if (w->tail) {
534 w->tail->next = f;
536 w->tail = f;
537 if (!w->head) {
538 w->head = f;
540 w->nfile++;
541 return w;