update redame
[beanstalkd.git] / walg.c
blobc5be6a1a2f1be0a8e98f919644ef91158b8bcd44
1 #include <stdint.h>
2 #include <stdlib.h>
3 #include <stdio.h>
4 #include <fcntl.h>
5 #include <unistd.h>
6 #include <string.h>
7 #include <errno.h>
8 #include <dirent.h>
9 #include <sys/uio.h>
10 #include <sys/stat.h>
11 #include "dat.h"
14 // Reads w->dir for files matching binlog.NNN,
15 // sets w->next to the next unused number, and
16 // returns the minimum number.
17 // If no files are found, sets w->next to 1 and
18 // returns a large number.
19 static int
20 walscandir(Wal *w)
22 static char base[] = "binlog.";
23 static const int len = sizeof(base) - 1;
24 DIR *d;
25 struct dirent *e;
26 int min = 1<<30;
27 int max = 0;
28 int n;
29 char *p;
31 d = opendir(w->dir);
32 if (!d) return min;
34 while ((e = readdir(d))) {
35 if (strncmp(e->d_name, base, len) == 0) {
36 n = strtol(e->d_name+len, &p, 10);
37 if (p && *p == '\0') {
38 if (n > max) max = n;
39 if (n < min) min = n;
44 closedir(d);
45 w->next = max + 1;
46 return min;
50 void
51 walgc(Wal *w)
53 File *f;
55 while (w->head && !w->head->refs) {
56 f = w->head;
57 w->head = f->next;
58 if (w->tail == f) {
59 w->tail = f->next; // also, f->next == NULL
62 unlink(f->path);
63 free(f->path);
64 free(f);
69 // returns 1 on success, 0 on error.
70 static int
71 usenext(Wal *w)
73 File *f;
75 f = w->cur;
76 if (!f->next) {
77 twarnx("there is no next wal file");
78 return 0;
81 w->cur = f->next;
82 filewclose(f);
83 return 1;
87 static void
88 walsync(Wal *w)
90 int64 now;
92 now = nanoseconds();
93 if (w->wantsync && now >= w->lastsync+w->syncrate) {
94 w->lastsync = now;
95 if (fsync(w->cur->fd) == -1) {
96 twarn("fsync");
102 // Walwrite writes j to the log w (if w is enabled).
103 // On failure, walwrite disables w and returns 0; on success, it returns 1.
104 // Unlke walresv*, walwrite should never fail because of a full disk.
105 // If w is disabled, then walwrite takes no action and returns 1.
107 walwrite(Wal *w, job j)
109 int r = 0;
111 if (!w->use) return 1;
112 if (w->cur->resv > 0 || usenext(w)) {
113 if (j->file) {
114 r = filewrjobshort(w->cur, j);
115 } else {
116 r = filewrjobfull(w->cur, j);
119 if (!r) {
120 filewclose(w->cur);
121 w->use = 0;
122 return 0;
125 walsync(w);
126 return r;
130 static int
131 makenextfile(Wal *w)
133 File *f;
135 f = new(File);
136 if (!f) {
137 twarnx("OOM");
138 return 0;
141 if (!fileinit(f, w, w->next)) {
142 free(f);
143 twarnx("OOM");
144 return 0;
147 filewopen(f);
148 if (!f->iswopen) {
149 free(f->path);
150 free(f);
151 return 0;
154 w->next++;
155 fileadd(f, w);
156 return 1;
160 static void
161 moveresv(File *to, File *from, int n)
163 from->resv -= n;
164 from->free += n;
165 to->resv += n;
166 to->free -= n;
170 static int
171 needfree(Wal *w, int n)
173 if (w->tail->free >= n) return n;
174 if (makenextfile(w)) return n;
175 return 0;
179 // Ensures:
180 // 1. b->resv is congruent to n (mod z).
181 // 2. x->resv is congruent to 0 (mod z) for each future file x.
182 // Assumes (and preserves) that b->resv >= n.
183 // Reserved space is conserved (neither created nor destroyed);
184 // we just move it around to preserve the invariant.
185 // We might have to allocate a new file.
186 // Returns 1 on success, otherwise 0. If there was a failure,
187 // w->tail is not updated.
188 static int
189 balancerest(Wal *w, File *b, int n)
191 int rest, c, r;
192 static const int z = sizeof(int) + sizeof(Jobrec);
194 if (!b) return 1;
196 rest = b->resv - n;
197 r = rest % z;
198 if (r == 0) return balancerest(w, b->next, 0);
200 c = z - r;
201 if (w->tail->resv >= c && b->free >= c) {
202 moveresv(b, w->tail, c);
203 return balancerest(w, b->next, 0);
206 if (needfree(w, r) != r) {
207 twarnx("needfree");
208 return 0;
210 moveresv(w->tail, b, r);
211 return balancerest(w, b->next, 0);
215 // Ensures:
216 // 1. w->cur->resv >= n.
217 // 2. w->cur->resv is congruent to n (mod z).
218 // 3. x->resv is congruent to 0 (mod z) for each future file x.
219 // (where z is the size of a delete record in the wal).
220 // Reserved space is conserved (neither created nor destroyed);
221 // we just move it around to preserve the invariant.
222 // We might have to allocate a new file.
223 // Returns 1 on success, otherwise 0. If there was a failure,
224 // w->tail is not updated.
225 static int
226 balance(Wal *w, int n)
228 int r;
230 // Invariant 1
231 // (this loop will run at most once)
232 while (w->cur->resv < n) {
233 int m = w->cur->resv;
235 r = needfree(w, m);
236 if (r != m) {
237 twarnx("needfree");
238 return 0;
241 moveresv(w->tail, w->cur, m);
242 usenext(w);
245 // Invariants 2 and 3
246 return balancerest(w, w->cur, n);
250 // Returns the number of bytes successfully reserved: either 0 or n.
251 static int
252 reserve(Wal *w, int n)
254 int r;
256 // return value must be nonzero but is otherwise ignored
257 if (!w->use) return 1;
259 if (w->cur->free >= n) {
260 w->cur->free -= n;
261 w->cur->resv += n;
262 return n;
265 r = needfree(w, n);
266 if (r != n) {
267 twarnx("needfree");
268 return 0;
271 w->tail->free -= n;
272 w->tail->resv += n;
273 if (!balance(w, n)) {
274 // error; undo the reservation
275 w->tail->resv -= n;
276 w->tail->free += n;
277 return 0;
280 return n;
284 // Returns the number of bytes reserved or 0 on error.
286 walresvput(Wal *w, job j)
288 int z = 0;
290 // reserve space for the initial job record
291 z += sizeof(int);
292 z += strlen(j->tube->name);
293 z += sizeof(Jobrec);
294 z += j->r.body_size;
296 // plus space for a delete to come later
297 z += sizeof(int);
298 z += sizeof(Jobrec);
300 return reserve(w, z);
304 // Returns the number of bytes reserved or 0 on error.
306 walresvupdate(Wal *w, job j)
308 int z = 0;
310 z +=sizeof(int);
311 z +=sizeof(Jobrec);
312 return reserve(w, z);
316 // Returns the number of locks acquired: either 0 or 1.
318 waldirlock(Wal *w)
320 int r;
321 int fd;
322 struct flock lk;
323 char path[PATH_MAX];
325 r = snprintf(path, PATH_MAX, "%s/lock", w->dir);
326 if (r > PATH_MAX) {
327 twarnx("path too long: %s/lock", w->dir);
328 return 0;
331 fd = open(path, O_WRONLY|O_CREAT, 0600);
332 if (fd == -1) {
333 twarn("open");
334 return 0;
337 lk.l_type = F_WRLCK;
338 lk.l_whence = SEEK_SET;
339 lk.l_start = 0;
340 lk.l_len = 0;
341 r = fcntl(fd, F_SETLK, &lk);
342 if (r) {
343 twarn("fcntl");
344 return 0;
347 // intentionally leak fd, since we never want to close it
348 // and we'll never need it again
349 return 1;
353 void
354 walread(Wal *w, job list, int min, int max)
356 File *f;
357 int i, fd;
358 int err = 0;
360 for (i = min; i < w->next; i++) {
361 f = new(File);
362 if (!f) {
363 twarnx("OOM");
364 exit(1);
367 if (!fileinit(f, w, i)) {
368 free(f);
369 twarnx("OOM");
370 exit(1);
373 fd = open(f->path, O_RDONLY);
374 if (fd < 0) {
375 twarn("%s", f->path);
376 free(f->path);
377 free(f);
378 continue;
381 f->fd = fd;
382 fileadd(f, w);
383 err |= fileread(f, list);
384 close(fd);
387 if (err) {
388 warnx("Errors reading one or more WAL files.");
389 warnx("Continuing. You may be missing data.");
394 void
395 walinit(Wal *w, job list)
397 int min;
399 min = walscandir(w);
400 walread(w, list, min, w->next);
402 // first writable file
403 if (!makenextfile(w)) {
404 twarnx("makenextfile");
405 exit(1);
408 w->cur = w->tail;