delete unused param max in walread
[beanstalkd.git] / walg.c
blobb6ef6d3e2b048df05ad35529474cd4d689f67dc6
1 #include <stdint.h>
2 #include <stdlib.h>
3 #include <stdio.h>
4 #include <fcntl.h>
5 #include <unistd.h>
6 #include <string.h>
7 #include <errno.h>
8 #include <dirent.h>
9 #include <sys/uio.h>
10 #include <sys/stat.h>
11 #include <limits.h>
12 #include "dat.h"
14 static int reserve(Wal *w, int n);
17 // Reads w->dir for files matching binlog.NNN,
18 // sets w->next to the next unused number, and
19 // returns the minimum number.
20 // If no files are found, sets w->next to 1 and
21 // returns a large number.
22 static int
23 walscandir(Wal *w)
25 static char base[] = "binlog.";
26 static const int len = sizeof(base) - 1;
27 DIR *d;
28 struct dirent *e;
29 int min = 1<<30;
30 int max = 0;
31 int n;
32 char *p;
34 d = opendir(w->dir);
35 if (!d) return min;
37 while ((e = readdir(d))) {
38 if (strncmp(e->d_name, base, len) == 0) {
39 n = strtol(e->d_name+len, &p, 10);
40 if (p && *p == '\0') {
41 if (n > max) max = n;
42 if (n < min) min = n;
47 closedir(d);
48 w->next = max + 1;
49 return min;
53 void
54 walgc(Wal *w)
56 File *f;
58 while (w->head && !w->head->refs) {
59 f = w->head;
60 w->head = f->next;
61 if (w->tail == f) {
62 w->tail = f->next; // also, f->next == NULL
65 w->nfile--;
66 unlink(f->path);
67 free(f->path);
68 free(f);
73 // returns 1 on success, 0 on error.
74 static int
75 usenext(Wal *w)
77 File *f;
79 f = w->cur;
80 if (!f->next) {
81 twarnx("there is no next wal file");
82 return 0;
85 w->cur = f->next;
86 filewclose(f);
87 return 1;
91 static int
92 ratio(Wal *w)
94 int64 n, d;
96 d = w->alive + w->resv;
97 n = (int64)w->nfile * (int64)w->filesize - d;
98 if (!d) return 0;
99 return n / d;
103 // Returns the number of bytes reserved or 0 on error.
104 static int
105 walresvmigrate(Wal *w, job j)
107 int z = 0;
109 // reserve only space for the migrated full job record
110 // space for the delete is already reserved
111 z += sizeof(int);
112 z += strlen(j->tube->name);
113 z += sizeof(Jobrec);
114 z += j->r.body_size;
116 return reserve(w, z);
120 static void
121 moveone(Wal *w)
123 job j;
125 if (w->head == w->cur || w->head->next == w->cur) {
126 // no point in moving a job
127 return;
130 j = w->head->jlist.fnext;
131 if (!j || j == &w->head->jlist) {
132 // head holds no jlist; can't happen
133 twarnx("head holds no jlist");
134 return;
137 if (!walresvmigrate(w, j)) {
138 // it will not fit, so we'll try again later
139 return;
142 filermjob(w->head, j);
143 w->nmig++;
144 walwrite(w, j);
148 static void
149 walcompact(Wal *w)
151 int r;
153 for (r=ratio(w); r>=2; r--) {
154 moveone(w);
159 static void
160 walsync(Wal *w)
162 int64 now;
164 now = nanoseconds();
165 if (w->wantsync && now >= w->lastsync+w->syncrate) {
166 w->lastsync = now;
167 if (fsync(w->cur->fd) == -1) {
168 twarn("fsync");
174 // Walwrite writes j to the log w (if w is enabled).
175 // On failure, walwrite disables w and returns 0; on success, it returns 1.
176 // Unlke walresv*, walwrite should never fail because of a full disk.
177 // If w is disabled, then walwrite takes no action and returns 1.
179 walwrite(Wal *w, job j)
181 int r = 0;
183 if (!w->use) return 1;
184 if (w->cur->resv > 0 || usenext(w)) {
185 if (j->file) {
186 r = filewrjobshort(w->cur, j);
187 } else {
188 r = filewrjobfull(w->cur, j);
191 if (!r) {
192 filewclose(w->cur);
193 w->use = 0;
195 w->nrec++;
196 return r;
200 void
201 walmaint(Wal *w)
203 if (w->use) {
204 if (!w->nocomp) {
205 walcompact(w);
207 walsync(w);
212 static int
213 makenextfile(Wal *w)
215 File *f;
217 f = new(File);
218 if (!f) {
219 twarnx("OOM");
220 return 0;
223 if (!fileinit(f, w, w->next)) {
224 free(f);
225 twarnx("OOM");
226 return 0;
229 filewopen(f);
230 if (!f->iswopen) {
231 free(f->path);
232 free(f);
233 return 0;
236 w->next++;
237 fileadd(f, w);
238 return 1;
242 static void
243 moveresv(File *to, File *from, int n)
245 from->resv -= n;
246 from->free += n;
247 to->resv += n;
248 to->free -= n;
252 static int
253 needfree(Wal *w, int n)
255 if (w->tail->free >= n) return n;
256 if (makenextfile(w)) return n;
257 return 0;
261 // Ensures:
262 // 1. b->resv is congruent to n (mod z).
263 // 2. x->resv is congruent to 0 (mod z) for each future file x.
264 // Assumes (and preserves) that b->resv >= n.
265 // Reserved space is conserved (neither created nor destroyed);
266 // we just move it around to preserve the invariant.
267 // We might have to allocate a new file.
268 // Returns 1 on success, otherwise 0. If there was a failure,
269 // w->tail is not updated.
270 static int
271 balancerest(Wal *w, File *b, int n)
273 int rest, c, r;
274 static const int z = sizeof(int) + sizeof(Jobrec);
276 if (!b) return 1;
278 rest = b->resv - n;
279 r = rest % z;
280 if (r == 0) return balancerest(w, b->next, 0);
282 c = z - r;
283 if (w->tail->resv >= c && b->free >= c) {
284 moveresv(b, w->tail, c);
285 return balancerest(w, b->next, 0);
288 if (needfree(w, r) != r) {
289 twarnx("needfree");
290 return 0;
292 moveresv(w->tail, b, r);
293 return balancerest(w, b->next, 0);
297 // Ensures:
298 // 1. w->cur->resv >= n.
299 // 2. w->cur->resv is congruent to n (mod z).
300 // 3. x->resv is congruent to 0 (mod z) for each future file x.
301 // (where z is the size of a delete record in the wal).
302 // Reserved space is conserved (neither created nor destroyed);
303 // we just move it around to preserve the invariant.
304 // We might have to allocate a new file.
305 // Returns 1 on success, otherwise 0. If there was a failure,
306 // w->tail is not updated.
307 static int
308 balance(Wal *w, int n)
310 int r;
312 // Invariant 1
313 // (this loop will run at most once)
314 while (w->cur->resv < n) {
315 int m = w->cur->resv;
317 r = needfree(w, m);
318 if (r != m) {
319 twarnx("needfree");
320 return 0;
323 moveresv(w->tail, w->cur, m);
324 usenext(w);
327 // Invariants 2 and 3
328 return balancerest(w, w->cur, n);
332 // Returns the number of bytes successfully reserved: either 0 or n.
333 static int
334 reserve(Wal *w, int n)
336 int r;
338 // return value must be nonzero but is otherwise ignored
339 if (!w->use) return 1;
341 if (w->cur->free >= n) {
342 w->cur->free -= n;
343 w->cur->resv += n;
344 w->resv += n;
345 return n;
348 r = needfree(w, n);
349 if (r != n) {
350 twarnx("needfree");
351 return 0;
354 w->tail->free -= n;
355 w->tail->resv += n;
356 w->resv += n;
357 if (!balance(w, n)) {
358 // error; undo the reservation
359 w->resv -= n;
360 w->tail->resv -= n;
361 w->tail->free += n;
362 return 0;
365 return n;
369 // Returns the number of bytes reserved or 0 on error.
371 walresvput(Wal *w, job j)
373 int z = 0;
375 // reserve space for the initial job record
376 z += sizeof(int);
377 z += strlen(j->tube->name);
378 z += sizeof(Jobrec);
379 z += j->r.body_size;
381 // plus space for a delete to come later
382 z += sizeof(int);
383 z += sizeof(Jobrec);
385 return reserve(w, z);
389 // Returns the number of bytes reserved or 0 on error.
391 walresvupdate(Wal *w, job j)
393 int z = 0;
395 z +=sizeof(int);
396 z +=sizeof(Jobrec);
397 return reserve(w, z);
401 // Returns the number of locks acquired: either 0 or 1.
403 waldirlock(Wal *w)
405 int r;
406 int fd;
407 struct flock lk;
408 char *path;
409 size_t path_length;
411 path_length = strlen(w->dir) + strlen("/lock") + 1;
412 if ((path = malloc(path_length)) == NULL) {
413 twarn("malloc");
414 return 0;
416 r = snprintf(path, path_length, "%s/lock", w->dir);
418 fd = open(path, O_WRONLY|O_CREAT, 0600);
419 free(path);
420 if (fd == -1) {
421 twarn("open");
422 return 0;
425 lk.l_type = F_WRLCK;
426 lk.l_whence = SEEK_SET;
427 lk.l_start = 0;
428 lk.l_len = 0;
429 r = fcntl(fd, F_SETLK, &lk);
430 if (r) {
431 twarn("fcntl");
432 return 0;
435 // intentionally leak fd, since we never want to close it
436 // and we'll never need it again
437 return 1;
441 void
442 walread(Wal *w, job list, int min)
444 File *f;
445 int i, fd;
446 int err = 0;
448 for (i = min; i < w->next; i++) {
449 f = new(File);
450 if (!f) {
451 twarnx("OOM");
452 exit(1);
455 if (!fileinit(f, w, i)) {
456 free(f);
457 twarnx("OOM");
458 exit(1);
461 fd = open(f->path, O_RDONLY);
462 if (fd < 0) {
463 twarn("open %s", f->path);
464 free(f->path);
465 free(f);
466 continue;
469 f->fd = fd;
470 fileadd(f, w);
471 err |= fileread(f, list);
472 close(fd);
475 if (err) {
476 warnx("Errors reading one or more WAL files.");
477 warnx("Continuing. You may be missing data.");
482 void
483 walinit(Wal *w, job list)
485 int min;
487 min = walscandir(w);
488 walread(w, list, min);
490 // first writable file
491 if (!makenextfile(w)) {
492 twarnx("makenextfile");
493 exit(1);
496 w->cur = w->tail;