shrink the hashmap when it's too sparse
[beanstalkd.git] / file.c
blob e55a9c0f1f7e8a5b51b7e10decc4b3a1427a8c14
1 #include <stdint.h>
2 #include <inttypes.h>
3 #include <stddef.h>
4 #include <stdlib.h>
5 #include <stdio.h>
6 #include <unistd.h>
7 #include <stdarg.h>
8 #include <sys/types.h>
9 #include <sys/stat.h>
10 #include <fcntl.h>
11 #include <errno.h>
12 #include <string.h>
13 #include "dat.h"
15 static int readrec(File*, job, int*);
16 static int readrec5(File*, job, int*);
17 static int readfull(File*, void*, int, int*, char*);
18 static void warnpos(File*, int, char*, ...)
19 __attribute__((format(printf, 3, 4)));
21 FAlloc *falloc = &rawfalloc;
// Version number found at the start of files written in the older
// "version 5" log format; fileread uses readrec5 for such files.
enum { Walver5 = 5 };
28 typedef struct Jobrec5 Jobrec5;
30 struct Jobrec5 {
31 uint64 id;
32 uint32 pri;
33 uint64 delay; // usec
34 uint64 ttr; // usec
35 int32 body_size;
36 uint64 created_at; // usec
37 uint64 deadline_at; // usec
38 uint32 reserve_ct;
39 uint32 timeout_ct;
40 uint32 release_ct;
41 uint32 bury_ct;
42 uint32 kick_ct;
43 byte state;
45 char pad[1];
48 enum
50 Jobrec5size = offsetof(Jobrec5, pad)
53 void
54 fileincref(File *f)
56 if (!f) return;
57 f->refs++;
61 void
62 filedecref(File *f)
64 if (!f) return;
65 f->refs--;
66 if (f->refs < 1) {
67 walgc(f->w);
72 void
73 fileaddjob(File *f, job j)
75 job h;
77 h = &f->jlist;
78 if (!h->fprev) h->fprev = h;
79 j->file = f;
80 j->fprev = h->fprev;
81 j->fnext = h;
82 h->fprev->fnext = j;
83 h->fprev = j;
84 fileincref(f);
88 void
89 filermjob(File *f, job j)
91 if (!f) return;
92 if (f != j->file) return;
93 j->fnext->fprev = j->fprev;
94 j->fprev->fnext = j->fnext;
95 j->fnext = 0;
96 j->fprev = 0;
97 j->file = NULL;
98 f->w->alive -= j->walused;
99 j->walused = 0;
100 filedecref(f);
104 // Fileread reads jobs from f->path into list.
105 // It returns 0 on success, or 1 if any errors occurred.
107 fileread(File *f, job list)
109 int err = 0, v;
111 if (!readfull(f, &v, sizeof(v), &err, "version")) {
112 return err;
114 switch (v) {
115 case Walver:
116 fileincref(f);
117 while (readrec(f, list, &err));
118 filedecref(f);
119 return err;
120 case Walver5:
121 fileincref(f);
122 while (readrec5(f, list, &err));
123 filedecref(f);
124 return err;
127 warnx("%s: unknown version: %d", f->path, v);
128 return 1;
132 // Readrec reads a record from f->fd into linked list l.
133 // If an error occurs, it sets *err to 1.
134 // Readrec returns the number of records read, either 1 or 0.
135 static int
136 readrec(File *f, job l, int *err)
138 int r, sz = 0;
139 int namelen;
140 Jobrec jr;
141 job j;
142 tube t;
143 char tubename[MAX_TUBE_NAME_LEN];
145 r = read(f->fd, &namelen, sizeof(int));
146 if (r == -1) {
147 twarn("read");
148 warnpos(f, 0, "error");
149 *err = 1;
150 return 0;
152 if (r != sizeof(int)) {
153 return 0;
155 sz += r;
156 if (namelen >= MAX_TUBE_NAME_LEN) {
157 warnpos(f, -r, "namelen %d exceeds maximum of %d", namelen, MAX_TUBE_NAME_LEN - 1);
158 *err = 1;
159 return 0;
162 if (namelen < 0) {
163 warnpos(f, -r, "namelen %d is negative", namelen);
164 *err = 1;
165 return 0;
168 if (namelen) {
169 r = readfull(f, tubename, namelen, err, "tube name");
170 if (!r) {
171 return 0;
173 sz += r;
175 tubename[namelen] = '\0';
177 r = readfull(f, &jr, sizeof(Jobrec), err, "job struct");
178 if (!r) {
179 return 0;
181 sz += r;
183 // are we reading trailing zeroes?
184 if (!jr.id) return 0;
186 j = job_find(jr.id);
187 if (!(j || namelen)) {
188 // We read a short record without having seen a
189 // full record for this job, so the full record
190 // was in an earlier file that has been deleted.
191 // Therefore the job itself has either been
192 // deleted or migrated; either way, this record
193 // should be ignored.
194 return 1;
197 switch (jr.state) {
198 case Reserved:
199 jr.state = Ready;
200 case Ready:
201 case Buried:
202 case Delayed:
203 if (!j) {
204 if (jr.body_size > job_data_size_limit) {
205 warnpos(f, -r, "job %"PRIu64" is too big (%"PRId32" > %zu)",
206 jr.id,
207 jr.body_size,
208 job_data_size_limit);
209 goto Error;
211 t = tube_find_or_make(tubename);
212 j = make_job_with_id(jr.pri, jr.delay, jr.ttr, jr.body_size,
213 t, jr.id);
214 j->next = j->prev = j;
215 j->r.created_at = jr.created_at;
217 j->r = jr;
218 job_insert(l, j);
220 // full record; read the job body
221 if (namelen) {
222 if (jr.body_size != j->r.body_size) {
223 warnpos(f, -r, "job %"PRIu64" size changed", j->r.id);
224 warnpos(f, -r, "was %d, now %d", j->r.body_size, jr.body_size);
225 goto Error;
227 r = readfull(f, j->body, j->r.body_size, err, "job body");
228 if (!r) {
229 goto Error;
231 sz += r;
233 // since this is a full record, we can move
234 // the file pointer and decref the old
235 // file, if any
236 filermjob(j->file, j);
237 fileaddjob(f, j);
239 j->walused += sz;
240 f->w->alive += sz;
242 return 1;
243 case Invalid:
244 if (j) {
245 job_remove(j);
246 filermjob(j->file, j);
247 job_free(j);
249 return 1;
252 Error:
253 *err = 1;
254 if (j) {
255 job_remove(j);
256 filermjob(j->file, j);
257 job_free(j);
259 return 0;
263 // Readrec5 is like readrec, but it reads a record in "version 5"
264 // of the log format.
265 static int
266 readrec5(File *f, job l, int *err)
268 int r, sz = 0;
269 size_t namelen;
270 Jobrec5 jr;
271 job j;
272 tube t;
273 char tubename[MAX_TUBE_NAME_LEN];
275 r = read(f->fd, &namelen, sizeof(namelen));
276 if (r == -1) {
277 twarn("read");
278 warnpos(f, 0, "error");
279 *err = 1;
280 return 0;
282 if (r != sizeof(namelen)) {
283 return 0;
285 sz += r;
286 if (namelen >= MAX_TUBE_NAME_LEN) {
287 warnpos(f, -r, "namelen %zu exceeds maximum of %d", namelen, MAX_TUBE_NAME_LEN - 1);
288 *err = 1;
289 return 0;
292 if (namelen) {
293 r = readfull(f, tubename, namelen, err, "v5 tube name");
294 if (!r) {
295 return 0;
297 sz += r;
299 tubename[namelen] = '\0';
301 r = readfull(f, &jr, Jobrec5size, err, "v5 job struct");
302 if (!r) {
303 return 0;
305 sz += r;
307 // are we reading trailing zeroes?
308 if (!jr.id) return 0;
310 j = job_find(jr.id);
311 if (!(j || namelen)) {
312 // We read a short record without having seen a
313 // full record for this job, so the full record
314 // was in an eariler file that has been deleted.
315 // Therefore the job itself has either been
316 // deleted or migrated; either way, this record
317 // should be ignored.
318 return 1;
321 switch (jr.state) {
322 case Reserved:
323 jr.state = Ready;
324 case Ready:
325 case Buried:
326 case Delayed:
327 if (!j) {
328 if (jr.body_size > job_data_size_limit) {
329 warnpos(f, -r, "job %"PRIu64" is too big (%"PRId32" > %zu)",
330 jr.id,
331 jr.body_size,
332 job_data_size_limit);
333 goto Error;
335 t = tube_find_or_make(tubename);
336 j = make_job_with_id(jr.pri, jr.delay, jr.ttr, jr.body_size,
337 t, jr.id);
338 j->next = j->prev = j;
339 j->r.created_at = jr.created_at;
341 j->r.id = jr.id;
342 j->r.pri = jr.pri;
343 j->r.delay = jr.delay * 1000; // us => ns
344 j->r.ttr = jr.ttr * 1000; // us => ns
345 j->r.body_size = jr.body_size;
346 j->r.created_at = jr.created_at * 1000; // us => ns
347 j->r.deadline_at = jr.deadline_at * 1000; // us => ns
348 j->r.reserve_ct = jr.reserve_ct;
349 j->r.timeout_ct = jr.timeout_ct;
350 j->r.release_ct = jr.release_ct;
351 j->r.bury_ct = jr.bury_ct;
352 j->r.kick_ct = jr.kick_ct;
353 j->r.state = jr.state;
354 job_insert(l, j);
356 // full record; read the job body
357 if (namelen) {
358 if (jr.body_size != j->r.body_size) {
359 warnpos(f, -r, "job %"PRIu64" size changed", j->r.id);
360 warnpos(f, -r, "was %"PRId32", now %"PRId32, j->r.body_size, jr.body_size);
361 goto Error;
363 r = readfull(f, j->body, j->r.body_size, err, "v5 job body");
364 if (!r) {
365 goto Error;
367 sz += r;
369 // since this is a full record, we can move
370 // the file pointer and decref the old
371 // file, if any
372 filermjob(j->file, j);
373 fileaddjob(f, j);
375 j->walused += sz;
376 f->w->alive += sz;
378 return 1;
379 case Invalid:
380 if (j) {
381 job_remove(j);
382 filermjob(j->file, j);
383 job_free(j);
385 return 1;
388 Error:
389 *err = 1;
390 if (j) {
391 job_remove(j);
392 filermjob(j->file, j);
393 job_free(j);
395 return 0;
399 static int
400 readfull(File *f, void *c, int n, int *err, char *desc)
402 int r;
404 r = read(f->fd, c, n);
405 if (r == -1) {
406 twarn("read");
407 warnpos(f, 0, "error reading %s", desc);
408 *err = 1;
409 return 0;
411 if (r != n) {
412 warnpos(f, -r, "unexpected EOF reading %d bytes (got %d): %s", n, r, desc);
413 *err = 1;
414 return 0;
416 return r;
419 static void
420 warnpos(File *f, int adj, char *fmt, ...)
422 int off;
423 va_list ap;
425 off = lseek(f->fd, 0, SEEK_CUR);
426 fprintf(stderr, "%s:%u: ", f->path, off+adj);
427 va_start(ap, fmt);
428 vfprintf(stderr, fmt, ap);
429 va_end(ap);
430 fputc('\n', stderr);
434 // Opens f for writing, writes a header, and initializes
435 // f->free and f->resv.
436 // Sets f->iswopen if successful.
437 void
438 filewopen(File *f)
440 int fd, r;
441 int n;
442 int ver = Walver;
444 fd = open(f->path, O_WRONLY|O_CREAT, 0400);
445 if (fd < 0) {
446 twarn("open %s", f->path);
447 return;
450 r = falloc(fd, f->w->filesize);
451 if (r) {
452 close(fd);
453 errno = r;
454 twarn("falloc %s", f->path);
455 r = unlink(f->path);
456 if (r) {
457 twarn("unlink %s", f->path);
459 return;
462 n = write(fd, &ver, sizeof(int));
463 if (n < sizeof(int)) {
464 twarn("write %s", f->path);
465 close(fd);
466 return;
469 f->fd = fd;
470 f->iswopen = 1;
471 fileincref(f);
472 f->free = f->w->filesize - n;
473 f->resv = 0;
477 static int
478 filewrite(File *f, job j, void *buf, int len)
480 int r;
482 r = write(f->fd, buf, len);
483 if (r != len) {
484 twarn("write");
485 return 0;
488 f->w->resv -= r;
489 f->resv -= r;
490 j->walresv -= r;
491 j->walused += r;
492 f->w->alive += r;
493 return 1;
498 filewrjobshort(File *f, job j)
500 int r, nl;
502 nl = 0; // name len 0 indicates short record
503 r = filewrite(f, j, &nl, sizeof nl) &&
504 filewrite(f, j, &j->r, sizeof j->r);
505 if (!r) return 0;
507 if (j->r.state == Invalid) {
508 filermjob(j->file, j);
511 return r;
516 filewrjobfull(File *f, job j)
518 int nl;
520 fileaddjob(f, j);
521 nl = strlen(j->tube->name);
522 return
523 filewrite(f, j, &nl, sizeof nl) &&
524 filewrite(f, j, j->tube->name, nl) &&
525 filewrite(f, j, &j->r, sizeof j->r) &&
526 filewrite(f, j, j->body, j->r.body_size);
530 void
531 filewclose(File *f)
533 if (!f) return;
534 if (!f->iswopen) return;
535 if (f->free) {
536 (void)ftruncate(f->fd, f->w->filesize - f->free);
538 close(f->fd);
539 f->iswopen = 0;
540 filedecref(f);
545 fileinit(File *f, Wal *w, int n)
547 f->w = w;
548 f->seq = n;
549 f->path = fmtalloc("%s/binlog.%d", w->dir, n);
550 return !!f->path;
554 // Adds f to the linked list in w,
555 // updating w->tail and w->head as necessary.
556 Wal*
557 fileadd(File *f, Wal *w)
559 if (w->tail) {
560 w->tail->next = f;
562 w->tail = f;
563 if (!w->head) {
564 w->head = f;
566 w->nfile++;
567 return w;