13 static int reserve(Wal
*w
, int n
);
16 // Reads w->dir for files matching binlog.NNN,
17 // sets w->next to the next unused number, and
18 // returns the minimum number.
19 // If no files are found, sets w->next to 1 and
20 // returns a large number.
24 static char base
[] = "binlog.";
25 static const int len
= sizeof(base
) - 1;
36 while ((e
= readdir(d
))) {
37 if (strncmp(e
->d_name
, base
, len
) == 0) {
38 n
= strtol(e
->d_name
+len
, &p
, 10);
39 if (p
&& *p
== '\0') {
57 while (w
->head
&& !w
->head
->refs
) {
61 w
->tail
= f
->next
; // also, f->next == NULL
72 // returns 1 on success, 0 on error.
80 twarnx("there is no next wal file");
95 d
= w
->alive
+ w
->resv
;
96 n
= w
->nfile
*w
->filesz
- d
;
102 // Returns the number of bytes reserved or 0 on error.
104 walresvmigrate(Wal
*w
, job j
)
108 // reserve only space for the migrated full job record
109 // space for the delete is already reserved
111 z
+= strlen(j
->tube
->name
);
115 return reserve(w
, z
);
124 if (w
->head
== w
->cur
|| w
->head
->next
== w
->cur
) {
125 // no point in moving a job
129 j
= w
->head
->jlist
.fnext
;
130 if (!j
|| j
== &w
->head
->jlist
) {
131 // head holds no jlist; can't happen
132 twarnx("head holds no jlist");
136 if (!walresvmigrate(w
, j
)) {
137 // it will not fit, so we'll try again later
141 filermjob(w
->head
, j
);
152 for (r
=ratio(w
); r
>=2; r
--) {
164 if (w
->wantsync
&& now
>= w
->lastsync
+w
->syncrate
) {
166 if (fsync(w
->cur
->fd
) == -1) {
173 // Walwrite writes j to the log w (if w is enabled).
174 // On failure, walwrite disables w and returns 0; on success, it returns 1.
175 // Unlke walresv*, walwrite should never fail because of a full disk.
176 // If w is disabled, then walwrite takes no action and returns 1.
178 walwrite(Wal
*w
, job j
)
182 if (!w
->use
) return 1;
183 if (w
->cur
->resv
> 0 || usenext(w
)) {
185 r
= filewrjobshort(w
->cur
, j
);
187 r
= filewrjobfull(w
->cur
, j
);
222 if (!fileinit(f
, w
, w
->next
)) {
242 moveresv(File
*to
, File
*from
, int n
)
252 needfree(Wal
*w
, int n
)
254 if (w
->tail
->free
>= n
) return n
;
255 if (makenextfile(w
)) return n
;
261 // 1. b->resv is congruent to n (mod z).
262 // 2. x->resv is congruent to 0 (mod z) for each future file x.
263 // Assumes (and preserves) that b->resv >= n.
264 // Reserved space is conserved (neither created nor destroyed);
265 // we just move it around to preserve the invariant.
266 // We might have to allocate a new file.
267 // Returns 1 on success, otherwise 0. If there was a failure,
268 // w->tail is not updated.
270 balancerest(Wal
*w
, File
*b
, int n
)
273 static const int z
= sizeof(int) + sizeof(Jobrec
);
279 if (r
== 0) return balancerest(w
, b
->next
, 0);
282 if (w
->tail
->resv
>= c
&& b
->free
>= c
) {
283 moveresv(b
, w
->tail
, c
);
284 return balancerest(w
, b
->next
, 0);
287 if (needfree(w
, r
) != r
) {
291 moveresv(w
->tail
, b
, r
);
292 return balancerest(w
, b
->next
, 0);
297 // 1. w->cur->resv >= n.
298 // 2. w->cur->resv is congruent to n (mod z).
299 // 3. x->resv is congruent to 0 (mod z) for each future file x.
300 // (where z is the size of a delete record in the wal).
301 // Reserved space is conserved (neither created nor destroyed);
302 // we just move it around to preserve the invariant.
303 // We might have to allocate a new file.
304 // Returns 1 on success, otherwise 0. If there was a failure,
305 // w->tail is not updated.
307 balance(Wal
*w
, int n
)
312 // (this loop will run at most once)
313 while (w
->cur
->resv
< n
) {
314 int m
= w
->cur
->resv
;
322 moveresv(w
->tail
, w
->cur
, m
);
326 // Invariants 2 and 3
327 return balancerest(w
, w
->cur
, n
);
331 // Returns the number of bytes successfully reserved: either 0 or n.
333 reserve(Wal
*w
, int n
)
337 // return value must be nonzero but is otherwise ignored
338 if (!w
->use
) return 1;
340 if (w
->cur
->free
>= n
) {
356 if (!balance(w
, n
)) {
357 // error; undo the reservation
368 // Returns the number of bytes reserved or 0 on error.
370 walresvput(Wal
*w
, job j
)
374 // reserve space for the initial job record
376 z
+= strlen(j
->tube
->name
);
380 // plus space for a delete to come later
384 return reserve(w
, z
);
388 // Returns the number of bytes reserved or 0 on error.
390 walresvupdate(Wal
*w
, job j
)
396 return reserve(w
, z
);
400 // Returns the number of locks acquired: either 0 or 1.
409 r
= snprintf(path
, PATH_MAX
, "%s/lock", w
->dir
);
411 twarnx("path too long: %s/lock", w
->dir
);
415 fd
= open(path
, O_WRONLY
|O_CREAT
, 0600);
422 lk
.l_whence
= SEEK_SET
;
425 r
= fcntl(fd
, F_SETLK
, &lk
);
431 // intentionally leak fd, since we never want to close it
432 // and we'll never need it again
438 walread(Wal
*w
, job list
, int min
, int max
)
444 for (i
= min
; i
< w
->next
; i
++) {
451 if (!fileinit(f
, w
, i
)) {
457 fd
= open(f
->path
, O_RDONLY
);
459 twarn("%s", f
->path
);
467 err
|= fileread(f
, list
);
472 warnx("Errors reading one or more WAL files.");
473 warnx("Continuing. You may be missing data.");
479 walinit(Wal
*w
, job list
)
484 walread(w
, list
, min
, w
->next
);
486 // first writable file
487 if (!makenextfile(w
)) {
488 twarnx("makenextfile");