document the new binlog stats
[beanstalkd.git] / walg.c
blobd8fccddc57911b616ac659d415f18d8e738b3c15
#include <stdint.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <limits.h>
#include <fcntl.h>
#include <unistd.h>
#include <dirent.h>
#include <sys/uio.h>
#include <sys/stat.h>
#include "dat.h"
13 static int reserve(Wal *w, int n);
16 // Reads w->dir for files matching binlog.NNN,
17 // sets w->next to the next unused number, and
18 // returns the minimum number.
19 // If no files are found, sets w->next to 1 and
20 // returns a large number.
21 static int
22 walscandir(Wal *w)
24 static char base[] = "binlog.";
25 static const int len = sizeof(base) - 1;
26 DIR *d;
27 struct dirent *e;
28 int min = 1<<30;
29 int max = 0;
30 int n;
31 char *p;
33 d = opendir(w->dir);
34 if (!d) return min;
36 while ((e = readdir(d))) {
37 if (strncmp(e->d_name, base, len) == 0) {
38 n = strtol(e->d_name+len, &p, 10);
39 if (p && *p == '\0') {
40 if (n > max) max = n;
41 if (n < min) min = n;
46 closedir(d);
47 w->next = max + 1;
48 return min;
52 void
53 walgc(Wal *w)
55 File *f;
57 while (w->head && !w->head->refs) {
58 f = w->head;
59 w->head = f->next;
60 if (w->tail == f) {
61 w->tail = f->next; // also, f->next == NULL
64 w->nfile--;
65 unlink(f->path);
66 free(f->path);
67 free(f);
72 // returns 1 on success, 0 on error.
73 static int
74 usenext(Wal *w)
76 File *f;
78 f = w->cur;
79 if (!f->next) {
80 twarnx("there is no next wal file");
81 return 0;
84 w->cur = f->next;
85 filewclose(f);
86 return 1;
90 static int
91 ratio(Wal *w)
93 int n, d;
95 d = w->alive + w->resv;
96 n = w->nfile*w->filesz - d;
97 if (!d) return 0;
98 return n / d;
102 // Returns the number of bytes reserved or 0 on error.
103 static int
104 walresvmigrate(Wal *w, job j)
106 int z = 0;
108 // reserve only space for the migrated full job record
109 // space for the delete is already reserved
110 z += sizeof(int);
111 z += strlen(j->tube->name);
112 z += sizeof(Jobrec);
113 z += j->r.body_size;
115 return reserve(w, z);
119 static void
120 moveone(Wal *w)
122 job j;
124 if (w->head == w->cur || w->head->next == w->cur) {
125 // no point in moving a job
126 return;
129 j = w->head->jlist.fnext;
130 if (!j || j == &w->head->jlist) {
131 // head holds no jlist; can't happen
132 twarnx("head holds no jlist");
133 return;
136 if (!walresvmigrate(w, j)) {
137 // it will not fit, so we'll try again later
138 return;
141 filermjob(w->head, j);
142 w->nmig++;
143 walwrite(w, j);
147 static void
148 walcompact(Wal *w)
150 int r;
152 for (r=ratio(w); r>=2; r--) {
153 moveone(w);
158 static void
159 walsync(Wal *w)
161 int64 now;
163 now = nanoseconds();
164 if (w->wantsync && now >= w->lastsync+w->syncrate) {
165 w->lastsync = now;
166 if (fsync(w->cur->fd) == -1) {
167 twarn("fsync");
173 // Walwrite writes j to the log w (if w is enabled).
174 // On failure, walwrite disables w and returns 0; on success, it returns 1.
175 // Unlke walresv*, walwrite should never fail because of a full disk.
176 // If w is disabled, then walwrite takes no action and returns 1.
178 walwrite(Wal *w, job j)
180 int r = 0;
182 if (!w->use) return 1;
183 if (w->cur->resv > 0 || usenext(w)) {
184 if (j->file) {
185 r = filewrjobshort(w->cur, j);
186 } else {
187 r = filewrjobfull(w->cur, j);
190 if (!r) {
191 filewclose(w->cur);
192 w->use = 0;
194 w->nrec++;
195 return r;
199 void
200 walmaint(Wal *w)
202 if (w->use) {
203 if (!w->nocomp) {
204 walcompact(w);
206 walsync(w);
211 static int
212 makenextfile(Wal *w)
214 File *f;
216 f = new(File);
217 if (!f) {
218 twarnx("OOM");
219 return 0;
222 if (!fileinit(f, w, w->next)) {
223 free(f);
224 twarnx("OOM");
225 return 0;
228 filewopen(f);
229 if (!f->iswopen) {
230 free(f->path);
231 free(f);
232 return 0;
235 w->next++;
236 fileadd(f, w);
237 return 1;
241 static void
242 moveresv(File *to, File *from, int n)
244 from->resv -= n;
245 from->free += n;
246 to->resv += n;
247 to->free -= n;
251 static int
252 needfree(Wal *w, int n)
254 if (w->tail->free >= n) return n;
255 if (makenextfile(w)) return n;
256 return 0;
260 // Ensures:
261 // 1. b->resv is congruent to n (mod z).
262 // 2. x->resv is congruent to 0 (mod z) for each future file x.
263 // Assumes (and preserves) that b->resv >= n.
264 // Reserved space is conserved (neither created nor destroyed);
265 // we just move it around to preserve the invariant.
266 // We might have to allocate a new file.
267 // Returns 1 on success, otherwise 0. If there was a failure,
268 // w->tail is not updated.
269 static int
270 balancerest(Wal *w, File *b, int n)
272 int rest, c, r;
273 static const int z = sizeof(int) + sizeof(Jobrec);
275 if (!b) return 1;
277 rest = b->resv - n;
278 r = rest % z;
279 if (r == 0) return balancerest(w, b->next, 0);
281 c = z - r;
282 if (w->tail->resv >= c && b->free >= c) {
283 moveresv(b, w->tail, c);
284 return balancerest(w, b->next, 0);
287 if (needfree(w, r) != r) {
288 twarnx("needfree");
289 return 0;
291 moveresv(w->tail, b, r);
292 return balancerest(w, b->next, 0);
296 // Ensures:
297 // 1. w->cur->resv >= n.
298 // 2. w->cur->resv is congruent to n (mod z).
299 // 3. x->resv is congruent to 0 (mod z) for each future file x.
300 // (where z is the size of a delete record in the wal).
301 // Reserved space is conserved (neither created nor destroyed);
302 // we just move it around to preserve the invariant.
303 // We might have to allocate a new file.
304 // Returns 1 on success, otherwise 0. If there was a failure,
305 // w->tail is not updated.
306 static int
307 balance(Wal *w, int n)
309 int r;
311 // Invariant 1
312 // (this loop will run at most once)
313 while (w->cur->resv < n) {
314 int m = w->cur->resv;
316 r = needfree(w, m);
317 if (r != m) {
318 twarnx("needfree");
319 return 0;
322 moveresv(w->tail, w->cur, m);
323 usenext(w);
326 // Invariants 2 and 3
327 return balancerest(w, w->cur, n);
331 // Returns the number of bytes successfully reserved: either 0 or n.
332 static int
333 reserve(Wal *w, int n)
335 int r;
337 // return value must be nonzero but is otherwise ignored
338 if (!w->use) return 1;
340 if (w->cur->free >= n) {
341 w->cur->free -= n;
342 w->cur->resv += n;
343 w->resv += n;
344 return n;
347 r = needfree(w, n);
348 if (r != n) {
349 twarnx("needfree");
350 return 0;
353 w->tail->free -= n;
354 w->tail->resv += n;
355 w->resv += n;
356 if (!balance(w, n)) {
357 // error; undo the reservation
358 w->resv -= n;
359 w->tail->resv -= n;
360 w->tail->free += n;
361 return 0;
364 return n;
368 // Returns the number of bytes reserved or 0 on error.
370 walresvput(Wal *w, job j)
372 int z = 0;
374 // reserve space for the initial job record
375 z += sizeof(int);
376 z += strlen(j->tube->name);
377 z += sizeof(Jobrec);
378 z += j->r.body_size;
380 // plus space for a delete to come later
381 z += sizeof(int);
382 z += sizeof(Jobrec);
384 return reserve(w, z);
388 // Returns the number of bytes reserved or 0 on error.
390 walresvupdate(Wal *w, job j)
392 int z = 0;
394 z +=sizeof(int);
395 z +=sizeof(Jobrec);
396 return reserve(w, z);
400 // Returns the number of locks acquired: either 0 or 1.
402 waldirlock(Wal *w)
404 int r;
405 int fd;
406 struct flock lk;
407 char path[PATH_MAX];
409 r = snprintf(path, PATH_MAX, "%s/lock", w->dir);
410 if (r > PATH_MAX) {
411 twarnx("path too long: %s/lock", w->dir);
412 return 0;
415 fd = open(path, O_WRONLY|O_CREAT, 0600);
416 if (fd == -1) {
417 twarn("open");
418 return 0;
421 lk.l_type = F_WRLCK;
422 lk.l_whence = SEEK_SET;
423 lk.l_start = 0;
424 lk.l_len = 0;
425 r = fcntl(fd, F_SETLK, &lk);
426 if (r) {
427 twarn("fcntl");
428 return 0;
431 // intentionally leak fd, since we never want to close it
432 // and we'll never need it again
433 return 1;
437 void
438 walread(Wal *w, job list, int min, int max)
440 File *f;
441 int i, fd;
442 int err = 0;
444 for (i = min; i < w->next; i++) {
445 f = new(File);
446 if (!f) {
447 twarnx("OOM");
448 exit(1);
451 if (!fileinit(f, w, i)) {
452 free(f);
453 twarnx("OOM");
454 exit(1);
457 fd = open(f->path, O_RDONLY);
458 if (fd < 0) {
459 twarn("%s", f->path);
460 free(f->path);
461 free(f);
462 continue;
465 f->fd = fd;
466 fileadd(f, w);
467 err |= fileread(f, list);
468 close(fd);
471 if (err) {
472 warnx("Errors reading one or more WAL files.");
473 warnx("Continuing. You may be missing data.");
478 void
479 walinit(Wal *w, job list)
481 int min;
483 min = walscandir(w);
484 walread(w, list, min, w->next);
486 // first writable file
487 if (!makenextfile(w)) {
488 twarnx("makenextfile");
489 exit(1);
492 w->cur = w->tail;