New command: kick-job.
[beanstalkd.git] / file.c
blobcd81f443bb05c7cd2f2771ce6c4a6bd8bcab953c
1 #include <stdint.h>
2 #include <inttypes.h>
3 #include <stddef.h>
4 #include <stdlib.h>
5 #include <stdio.h>
6 #include <unistd.h>
7 #include <stdarg.h>
8 #include <sys/types.h>
9 #include <sys/stat.h>
10 #include <fcntl.h>
11 #include <errno.h>
12 #include <string.h>
13 #include "dat.h"
15 static void warnpos(File*, int, char*, ...);
16 static int readrec(File*, job, int*);
17 static int readrec5(File*, job, int*);
18 static int readfull(File*, void*, int, int*, char*);
20 FAlloc *falloc = &rawfalloc;
// Version number of the older log format, which fileread still
// accepts and decodes with readrec5.
enum {
    Walver5 = 5
};
27 typedef struct Jobrec5 Jobrec5;
29 struct Jobrec5 {
30 uint64 id;
31 uint32 pri;
32 uint64 delay; // usec
33 uint64 ttr; // usec
34 int32 body_size;
35 uint64 created_at; // usec
36 uint64 deadline_at; // usec
37 uint32 reserve_ct;
38 uint32 timeout_ct;
39 uint32 release_ct;
40 uint32 bury_ct;
41 uint32 kick_ct;
42 byte state;
44 char pad[1];
47 enum
49 Jobrec5size = offsetof(Jobrec5, pad)
52 void
53 fileincref(File *f)
55 if (!f) return;
56 f->refs++;
60 void
61 filedecref(File *f)
63 if (!f) return;
64 f->refs--;
65 if (f->refs < 1) {
66 walgc(f->w);
71 void
72 fileaddjob(File *f, job j)
74 job h;
76 h = &f->jlist;
77 if (!h->fprev) h->fprev = h;
78 j->file = f;
79 j->fprev = h->fprev;
80 j->fnext = h;
81 h->fprev->fnext = j;
82 h->fprev = j;
83 fileincref(f);
87 void
88 filermjob(File *f, job j)
90 if (!f) return;
91 if (f != j->file) return;
92 j->fnext->fprev = j->fprev;
93 j->fprev->fnext = j->fnext;
94 j->fnext = 0;
95 j->fprev = 0;
96 j->file = NULL;
97 f->w->alive -= j->walused;
98 j->walused = 0;
99 filedecref(f);
103 // Fileread reads jobs from f->path into list.
104 // It returns 0 on success, or 1 if any errors occurred.
106 fileread(File *f, job list)
108 int err = 0, v;
110 if (!readfull(f, &v, sizeof(v), &err, "version")) {
111 return err;
113 switch (v) {
114 case Walver:
115 fileincref(f);
116 while (readrec(f, list, &err));
117 filedecref(f);
118 return err;
119 case Walver5:
120 fileincref(f);
121 while (readrec5(f, list, &err));
122 filedecref(f);
123 return err;
126 warnx("%s: unknown version: %d", f->path, v);
127 return 1;
131 // Readrec reads a record from f->fd into linked list l.
132 // If an error occurs, it sets *err to 1.
133 // Readrec returns the number of records read, either 1 or 0.
134 static int
135 readrec(File *f, job l, int *err)
137 int r, sz = 0;
138 int namelen;
139 Jobrec jr;
140 job j;
141 tube t;
142 char tubename[MAX_TUBE_NAME_LEN];
144 r = read(f->fd, &namelen, sizeof(int));
145 if (r == -1) {
146 twarn("read");
147 warnpos(f, 0, "error");
148 *err = 1;
149 return 0;
151 if (r != sizeof(int)) {
152 return 0;
154 sz += r;
155 if (namelen >= MAX_TUBE_NAME_LEN) {
156 warnpos(f, -r, "namelen %d exceeds maximum of %d", namelen, MAX_TUBE_NAME_LEN - 1);
157 *err = 1;
158 return 0;
161 if (namelen < 0) {
162 warnpos(f, -r, "namelen %d is negative", namelen);
163 *err = 1;
164 return 0;
167 if (namelen) {
168 r = readfull(f, tubename, namelen, err, "tube name");
169 if (!r) {
170 return 0;
172 sz += r;
174 tubename[namelen] = '\0';
176 r = readfull(f, &jr, sizeof(Jobrec), err, "job struct");
177 if (!r) {
178 return 0;
180 sz += r;
182 // are we reading trailing zeroes?
183 if (!jr.id) return 0;
185 j = job_find(jr.id);
186 if (!(j || namelen)) {
187 // We read a short record without having seen a
188 // full record for this job, so the full record
189 // was in an eariler file that has been deleted.
190 // Therefore the job itself has either been
191 // deleted or migrated; either way, this record
192 // should be ignored.
193 return 1;
196 switch (jr.state) {
197 case Reserved:
198 jr.state = Ready;
199 case Ready:
200 case Buried:
201 case Delayed:
202 if (!j) {
203 if (jr.body_size > job_data_size_limit) {
204 warnpos(f, -r, "job %"PRIu64" is too big (%zd > %zd)", jr.id, job_data_size_limit);
205 goto Error;
207 t = tube_find_or_make(tubename);
208 j = make_job_with_id(jr.pri, jr.delay, jr.ttr, jr.body_size,
209 t, jr.id);
210 j->next = j->prev = j;
211 j->r.created_at = jr.created_at;
213 j->r = jr;
214 job_insert(l, j);
216 // full record; read the job body
217 if (namelen) {
218 if (jr.body_size != j->r.body_size) {
219 warnpos(f, -r, "job %"PRIu64" size changed", j->r.id);
220 warnpos(f, -r, "was %zu, now %zu", j->r.body_size, jr.body_size);
221 goto Error;
223 r = readfull(f, j->body, j->r.body_size, err, "job body");
224 if (!r) {
225 goto Error;
227 sz += r;
229 // since this is a full record, we can move
230 // the file pointer and decref the old
231 // file, if any
232 filermjob(j->file, j);
233 fileaddjob(f, j);
235 j->walused += sz;
236 f->w->alive += sz;
238 return 1;
239 case Invalid:
240 if (j) {
241 job_remove(j);
242 filermjob(j->file, j);
243 job_free(j);
245 return 1;
248 Error:
249 *err = 1;
250 if (j) {
251 job_remove(j);
252 filermjob(j->file, j);
253 job_free(j);
255 return 0;
259 // Readrec5 is like readrec, but it reads a record in "version 5"
260 // of the log format.
261 static int
262 readrec5(File *f, job l, int *err)
264 int r, sz = 0;
265 size_t namelen;
266 Jobrec5 jr;
267 job j;
268 tube t;
269 char tubename[MAX_TUBE_NAME_LEN];
271 r = read(f->fd, &namelen, sizeof(namelen));
272 if (r == -1) {
273 twarn("read");
274 warnpos(f, 0, "error");
275 *err = 1;
276 return 0;
278 if (r != sizeof(namelen)) {
279 return 0;
281 sz += r;
282 if (namelen >= MAX_TUBE_NAME_LEN) {
283 warnpos(f, -r, "namelen %d exceeds maximum of %d", namelen, MAX_TUBE_NAME_LEN - 1);
284 *err = 1;
285 return 0;
288 if (namelen) {
289 r = readfull(f, tubename, namelen, err, "v5 tube name");
290 if (!r) {
291 return 0;
293 sz += r;
295 tubename[namelen] = '\0';
297 r = readfull(f, &jr, Jobrec5size, err, "v5 job struct");
298 if (!r) {
299 return 0;
301 sz += r;
303 // are we reading trailing zeroes?
304 if (!jr.id) return 0;
306 j = job_find(jr.id);
307 if (!(j || namelen)) {
308 // We read a short record without having seen a
309 // full record for this job, so the full record
310 // was in an eariler file that has been deleted.
311 // Therefore the job itself has either been
312 // deleted or migrated; either way, this record
313 // should be ignored.
314 return 1;
317 switch (jr.state) {
318 case Reserved:
319 jr.state = Ready;
320 case Ready:
321 case Buried:
322 case Delayed:
323 if (!j) {
324 if (jr.body_size > job_data_size_limit) {
325 warnpos(f, -r, "job %"PRIu64" is too big (%zd > %zd)", jr.id, job_data_size_limit);
326 goto Error;
328 t = tube_find_or_make(tubename);
329 j = make_job_with_id(jr.pri, jr.delay, jr.ttr, jr.body_size,
330 t, jr.id);
331 j->next = j->prev = j;
332 j->r.created_at = jr.created_at;
334 j->r.id = jr.id;
335 j->r.pri = jr.pri;
336 j->r.delay = jr.delay * 1000; // us => ns
337 j->r.ttr = jr.ttr * 1000; // us => ns
338 j->r.body_size = jr.body_size;
339 j->r.created_at = jr.created_at * 1000; // us => ns
340 j->r.deadline_at = jr.deadline_at * 1000; // us => ns
341 j->r.reserve_ct = jr.reserve_ct;
342 j->r.timeout_ct = jr.timeout_ct;
343 j->r.release_ct = jr.release_ct;
344 j->r.bury_ct = jr.bury_ct;
345 j->r.kick_ct = jr.kick_ct;
346 j->r.state = jr.state;
347 job_insert(l, j);
349 // full record; read the job body
350 if (namelen) {
351 if (jr.body_size != j->r.body_size) {
352 warnpos(f, -r, "job %"PRIu64" size changed", j->r.id);
353 warnpos(f, -r, "was %zu, now %zu", j->r.body_size, jr.body_size);
354 goto Error;
356 r = readfull(f, j->body, j->r.body_size, err, "v5 job body");
357 if (!r) {
358 goto Error;
360 sz += r;
362 // since this is a full record, we can move
363 // the file pointer and decref the old
364 // file, if any
365 filermjob(j->file, j);
366 fileaddjob(f, j);
368 j->walused += sz;
369 f->w->alive += sz;
371 return 1;
372 case Invalid:
373 if (j) {
374 job_remove(j);
375 filermjob(j->file, j);
376 job_free(j);
378 return 1;
381 Error:
382 *err = 1;
383 if (j) {
384 job_remove(j);
385 filermjob(j->file, j);
386 job_free(j);
388 return 0;
392 static int
393 readfull(File *f, void *c, int n, int *err, char *desc)
395 int r;
397 r = read(f->fd, c, n);
398 if (r == -1) {
399 twarn("read");
400 warnpos(f, 0, "error reading %s", desc);
401 *err = 1;
402 return 0;
404 if (r != n) {
405 warnpos(f, -r, "unexpected EOF reading %d bytes (got %d): %s", n, r, desc);
406 *err = 1;
407 return 0;
409 return r;
413 static void
414 warnpos(File *f, int adj, char *fmt, ...)
416 int off;
417 va_list ap;
419 off = lseek(f->fd, 0, SEEK_CUR);
420 fprintf(stderr, "%s:%u: ", f->path, off+adj);
421 va_start(ap, fmt);
422 vfprintf(stderr, fmt, ap);
423 va_end(ap);
424 fputc('\n', stderr);
428 // Opens f for writing, writes a header, and initializes
429 // f->free and f->resv.
430 // Sets f->iswopen if successful.
431 void
432 filewopen(File *f)
434 int fd, r;
435 int n;
436 int ver = Walver;
438 fd = open(f->path, O_WRONLY|O_CREAT, 0400);
439 if (fd < 0) {
440 twarn("open %s", f->path);
441 return;
444 r = falloc(fd, f->w->filesize);
445 if (r) {
446 close(fd);
447 errno = r;
448 twarn("falloc %s", f->path);
449 r = unlink(f->path);
450 if (r) {
451 twarn("unlink %s", f->path);
453 return;
456 n = write(fd, &ver, sizeof(int));
457 if (n < sizeof(int)) {
458 twarn("write %s", f->path);
459 close(fd);
460 return;
463 f->fd = fd;
464 f->iswopen = 1;
465 fileincref(f);
466 f->free = f->w->filesize - n;
467 f->resv = 0;
471 static int
472 filewrite(File *f, job j, void *buf, int len)
474 int r;
476 r = write(f->fd, buf, len);
477 if (r != len) {
478 twarn("write");
479 return 0;
482 f->w->resv -= r;
483 f->resv -= r;
484 j->walresv -= r;
485 j->walused += r;
486 f->w->alive += r;
487 return 1;
492 filewrjobshort(File *f, job j)
494 int r, nl;
496 nl = 0; // name len 0 indicates short record
497 r = filewrite(f, j, &nl, sizeof nl) &&
498 filewrite(f, j, &j->r, sizeof j->r);
499 if (!r) return 0;
501 if (j->r.state == Invalid) {
502 filermjob(j->file, j);
505 return r;
510 filewrjobfull(File *f, job j)
512 int nl;
514 fileaddjob(f, j);
515 nl = strlen(j->tube->name);
516 return
517 filewrite(f, j, &nl, sizeof nl) &&
518 filewrite(f, j, j->tube->name, nl) &&
519 filewrite(f, j, &j->r, sizeof j->r) &&
520 filewrite(f, j, j->body, j->r.body_size);
524 void
525 filewclose(File *f)
527 if (!f) return;
528 if (!f->iswopen) return;
529 if (f->free) {
530 (void)ftruncate(f->fd, f->w->filesize - f->free);
532 close(f->fd);
533 f->iswopen = 0;
534 filedecref(f);
539 fileinit(File *f, Wal *w, int n)
541 f->w = w;
542 f->seq = n;
543 f->path = fmtalloc("%s/binlog.%d", w->dir, n);
544 return !!f->path;
548 // Adds f to the linked list in w,
549 // updating w->tail and w->head as necessary.
550 Wal*
551 fileadd(File *f, Wal *w)
553 if (w->tail) {
554 w->tail->next = f;
556 w->tail = f;
557 if (!w->head) {
558 w->head = f;
560 w->nfile++;
561 return w;