add changelog for 1.13
[beanstalkd.git] / walg.c
blob32f2144c4ba310b084c6c4d8c6c81e7b4a7d62f6
1 #include "dat.h"
2 #include <stdint.h>
3 #include <stdlib.h>
4 #include <stdio.h>
5 #include <fcntl.h>
6 #include <unistd.h>
7 #include <string.h>
8 #include <errno.h>
9 #include <dirent.h>
10 #include <sys/uio.h>
11 #include <sys/stat.h>
12 #include <limits.h>
14 static int reserve(Wal *w, int n);
17 // Reads w->dir for files matching binlog.NNN,
18 // sets w->next to the next unused number, and
19 // returns the minimum number.
20 // If no files are found, sets w->next to 1 and
21 // returns a large number.
22 static int
23 walscandir(Wal *w)
25 static char base[] = "binlog.";
26 static const int len = sizeof(base) - 1;
27 DIR *d;
28 struct dirent *e;
29 int min = 1<<30;
30 int max = 0;
31 int n;
32 char *p;
34 d = opendir(w->dir);
35 if (!d) return min;
37 while ((e = readdir(d))) {
38 if (strncmp(e->d_name, base, len) == 0) {
39 n = strtol(e->d_name+len, &p, 10);
40 if (p && *p == '\0') {
41 if (n > max) max = n;
42 if (n < min) min = n;
47 closedir(d);
48 w->next = max + 1;
49 return min;
53 void
54 walgc(Wal *w)
56 File *f;
58 while (w->head && !w->head->refs) {
59 f = w->head;
60 w->head = f->next;
61 if (w->tail == f) {
62 w->tail = f->next; // also, f->next == NULL
65 w->nfile--;
66 unlink(f->path);
67 free(f->path);
68 free(f);
73 // returns 1 on success, 0 on error.
74 static int
75 usenext(Wal *w)
77 File *f;
79 f = w->cur;
80 if (!f->next) {
81 twarnx("there is no next wal file");
82 return 0;
85 w->cur = f->next;
86 filewclose(f);
87 return 1;
91 static int
92 ratio(Wal *w)
94 int64 n, d;
96 d = w->alive + w->resv;
97 n = (int64)w->nfile * (int64)w->filesize - d;
98 if (!d) return 0;
99 return n / d;
103 // Returns the number of bytes reserved or 0 on error.
104 static int
105 walresvmigrate(Wal *w, Job *j)
107 int z = 0;
109 // reserve only space for the migrated full job record
110 // space for the delete is already reserved
111 z += sizeof(int);
112 z += strlen(j->tube->name);
113 z += sizeof(Jobrec);
114 z += j->r.body_size;
116 return reserve(w, z);
120 static void
121 moveone(Wal *w)
123 Job *j;
125 if (w->head == w->cur || w->head->next == w->cur) {
126 // no point in moving a job
127 return;
130 j = w->head->jlist.fnext;
131 if (!j || j == &w->head->jlist) {
132 // head holds no jlist; can't happen
133 twarnx("head holds no jlist");
134 return;
137 if (!walresvmigrate(w, j)) {
138 // it will not fit, so we'll try again later
139 return;
142 filermjob(w->head, j);
143 w->nmig++;
144 walwrite(w, j);
148 static void
149 walcompact(Wal *w)
151 int r;
153 for (r=ratio(w); r>=2; r--) {
154 moveone(w);
159 static void
160 walsync(Wal *w)
162 int64 now;
164 now = nanoseconds();
165 if (w->wantsync && now >= w->lastsync+w->syncrate) {
166 w->lastsync = now;
167 if (fsync(w->cur->fd) == -1) {
168 twarn("fsync");
174 // Walwrite writes j to the log w (if w is enabled).
175 // On failure, walwrite disables w and returns 0; on success, it returns 1.
176 // Unlke walresv*, walwrite should never fail because of a full disk.
177 // If w is disabled, then walwrite takes no action and returns 1.
179 walwrite(Wal *w, Job *j)
181 int r = 0;
183 if (!w->use) return 1;
184 if (w->cur->resv > 0 || usenext(w)) {
185 if (j->file) {
186 r = filewrjobshort(w->cur, j);
187 } else {
188 r = filewrjobfull(w->cur, j);
191 if (!r) {
192 filewclose(w->cur);
193 w->use = 0;
195 w->nrec++;
196 return r;
200 void
201 walmaint(Wal *w)
203 if (w->use) {
204 walcompact(w);
205 walsync(w);
210 static int
211 makenextfile(Wal *w)
213 File *f;
215 f = new(File);
216 if (!f) {
217 twarnx("OOM");
218 return 0;
221 if (!fileinit(f, w, w->next)) {
222 free(f);
223 twarnx("OOM");
224 return 0;
227 filewopen(f);
228 if (!f->iswopen) {
229 free(f->path);
230 free(f);
231 return 0;
234 w->next++;
235 fileadd(f, w);
236 return 1;
240 static void
241 moveresv(File *to, File *from, int n)
243 from->resv -= n;
244 from->free += n;
245 to->resv += n;
246 to->free -= n;
250 static int
251 needfree(Wal *w, int n)
253 if (w->tail->free >= n) return n;
254 if (makenextfile(w)) return n;
255 return 0;
259 // Ensures:
260 // 1. b->resv is congruent to n (mod z).
261 // 2. x->resv is congruent to 0 (mod z) for each future file x.
262 // Assumes (and preserves) that b->resv >= n.
263 // Reserved space is conserved (neither created nor destroyed);
264 // we just move it around to preserve the invariant.
265 // We might have to allocate a new file.
266 // Returns 1 on success, otherwise 0. If there was a failure,
267 // w->tail is not updated.
268 static int
269 balancerest(Wal *w, File *b, int n)
271 int rest, c, r;
272 static const int z = sizeof(int) + sizeof(Jobrec);
274 if (!b) return 1;
276 rest = b->resv - n;
277 r = rest % z;
278 if (r == 0) return balancerest(w, b->next, 0);
280 c = z - r;
281 if (w->tail->resv >= c && b->free >= c) {
282 moveresv(b, w->tail, c);
283 return balancerest(w, b->next, 0);
286 if (needfree(w, r) != r) {
287 twarnx("needfree");
288 return 0;
290 moveresv(w->tail, b, r);
291 return balancerest(w, b->next, 0);
295 // Ensures:
296 // 1. w->cur->resv >= n.
297 // 2. w->cur->resv is congruent to n (mod z).
298 // 3. x->resv is congruent to 0 (mod z) for each future file x.
299 // (where z is the size of a delete record in the wal).
300 // Reserved space is conserved (neither created nor destroyed);
301 // we just move it around to preserve the invariant.
302 // We might have to allocate a new file.
303 // Returns 1 on success, otherwise 0. If there was a failure,
304 // w->tail is not updated.
305 static int
306 balance(Wal *w, int n)
308 // Invariant 1
309 // (this loop will run at most once)
310 while (w->cur->resv < n) {
311 int m = w->cur->resv;
313 int r = needfree(w, m);
314 if (r != m) {
315 twarnx("needfree");
316 return 0;
319 moveresv(w->tail, w->cur, m);
320 usenext(w);
323 // Invariants 2 and 3
324 return balancerest(w, w->cur, n);
328 // Returns the number of bytes successfully reserved: either 0 or n.
329 static int
330 reserve(Wal *w, int n)
332 int r;
334 // return value must be nonzero but is otherwise ignored
335 if (!w->use) return 1;
337 if (w->cur->free >= n) {
338 w->cur->free -= n;
339 w->cur->resv += n;
340 w->resv += n;
341 return n;
344 r = needfree(w, n);
345 if (r != n) {
346 twarnx("needfree");
347 return 0;
350 w->tail->free -= n;
351 w->tail->resv += n;
352 w->resv += n;
353 if (!balance(w, n)) {
354 // error; undo the reservation
355 w->resv -= n;
356 w->tail->resv -= n;
357 w->tail->free += n;
358 return 0;
361 return n;
365 // Returns the number of bytes reserved or 0 on error.
367 walresvput(Wal *w, Job *j)
369 int z = 0;
371 // reserve space for the initial job record
372 z += sizeof(int);
373 z += strlen(j->tube->name);
374 z += sizeof(Jobrec);
375 z += j->r.body_size;
377 // plus space for a delete to come later
378 z += sizeof(int);
379 z += sizeof(Jobrec);
381 return reserve(w, z);
385 // Returns the number of bytes reserved or 0 on error.
387 walresvupdate(Wal *w)
389 int z = 0;
390 z +=sizeof(int);
391 z +=sizeof(Jobrec);
392 return reserve(w, z);
396 // Returns the number of locks acquired: either 0 or 1.
398 waldirlock(Wal *w)
400 int r;
401 int fd;
402 struct flock lk;
403 char *path;
404 size_t path_length;
406 path_length = strlen(w->dir) + strlen("/lock") + 1;
407 if ((path = malloc(path_length)) == NULL) {
408 twarn("malloc");
409 return 0;
411 snprintf(path, path_length, "%s/lock", w->dir);
413 fd = open(path, O_WRONLY|O_CREAT, 0600);
414 free(path);
415 if (fd == -1) {
416 twarn("open");
417 return 0;
420 lk.l_type = F_WRLCK;
421 lk.l_whence = SEEK_SET;
422 lk.l_start = 0;
423 lk.l_len = 0;
424 r = fcntl(fd, F_SETLK, &lk);
425 if (r) {
426 twarn("fcntl");
427 return 0;
430 // intentionally leak fd, since we never want to close it
431 // and we'll never need it again
432 return 1;
436 void
437 walread(Wal *w, Job *list, int min)
439 int i;
440 int err = 0;
442 for (i = min; i < w->next; i++) {
443 File *f = new(File);
444 if (!f) {
445 twarnx("OOM");
446 exit(1);
449 if (!fileinit(f, w, i)) {
450 free(f);
451 twarnx("OOM");
452 exit(1);
455 int fd = open(f->path, O_RDONLY);
456 if (fd < 0) {
457 twarn("open %s", f->path);
458 free(f->path);
459 free(f);
460 continue;
463 f->fd = fd;
464 fileadd(f, w);
465 err |= fileread(f, list);
466 if (close(fd) == -1)
467 twarn("close");
470 if (err) {
471 warnx("Errors reading one or more WAL files.");
472 warnx("Continuing. You may be missing data.");
477 void
478 walinit(Wal *w, Job *list)
480 int min;
482 min = walscandir(w);
483 walread(w, list, min);
485 // first writable file
486 if (!makenextfile(w)) {
487 twarnx("makenextfile");
488 exit(1);
491 w->cur = w->tail;