14 static int reserve(Wal
*w
, int n
);
17 // Reads w->dir for files matching binlog.NNN,
18 // sets w->next to the next unused number, and
19 // returns the minimum number.
20 // If no files are found, sets w->next to 1 and
21 // returns a large number.
25 static char base
[] = "binlog.";
26 static const int len
= sizeof(base
) - 1;
37 while ((e
= readdir(d
))) {
38 if (strncmp(e
->d_name
, base
, len
) == 0) {
39 n
= strtol(e
->d_name
+len
, &p
, 10);
40 if (p
&& *p
== '\0') {
58 while (w
->head
&& !w
->head
->refs
) {
62 w
->tail
= f
->next
; // also, f->next == NULL
73 // returns 1 on success, 0 on error.
81 twarnx("there is no next wal file");
96 d
= w
->alive
+ w
->resv
;
97 n
= w
->nfile
*w
->filesize
- d
;
103 // Returns the number of bytes reserved or 0 on error.
105 walresvmigrate(Wal
*w
, job j
)
109 // reserve only space for the migrated full job record
110 // space for the delete is already reserved
112 z
+= strlen(j
->tube
->name
);
116 return reserve(w
, z
);
125 if (w
->head
== w
->cur
|| w
->head
->next
== w
->cur
) {
126 // no point in moving a job
130 j
= w
->head
->jlist
.fnext
;
131 if (!j
|| j
== &w
->head
->jlist
) {
132 // head holds no jlist; can't happen
133 twarnx("head holds no jlist");
137 if (!walresvmigrate(w
, j
)) {
138 // it will not fit, so we'll try again later
142 filermjob(w
->head
, j
);
153 for (r
=ratio(w
); r
>=2; r
--) {
165 if (w
->wantsync
&& now
>= w
->lastsync
+w
->syncrate
) {
167 if (fsync(w
->cur
->fd
) == -1) {
174 // Walwrite writes j to the log w (if w is enabled).
175 // On failure, walwrite disables w and returns 0; on success, it returns 1.
176 // Unlke walresv*, walwrite should never fail because of a full disk.
177 // If w is disabled, then walwrite takes no action and returns 1.
179 walwrite(Wal
*w
, job j
)
183 if (!w
->use
) return 1;
184 if (w
->cur
->resv
> 0 || usenext(w
)) {
186 r
= filewrjobshort(w
->cur
, j
);
188 r
= filewrjobfull(w
->cur
, j
);
223 if (!fileinit(f
, w
, w
->next
)) {
243 moveresv(File
*to
, File
*from
, int n
)
253 needfree(Wal
*w
, int n
)
255 if (w
->tail
->free
>= n
) return n
;
256 if (makenextfile(w
)) return n
;
262 // 1. b->resv is congruent to n (mod z).
263 // 2. x->resv is congruent to 0 (mod z) for each future file x.
264 // Assumes (and preserves) that b->resv >= n.
265 // Reserved space is conserved (neither created nor destroyed);
266 // we just move it around to preserve the invariant.
267 // We might have to allocate a new file.
268 // Returns 1 on success, otherwise 0. If there was a failure,
269 // w->tail is not updated.
271 balancerest(Wal
*w
, File
*b
, int n
)
274 static const int z
= sizeof(int) + sizeof(Jobrec
);
280 if (r
== 0) return balancerest(w
, b
->next
, 0);
283 if (w
->tail
->resv
>= c
&& b
->free
>= c
) {
284 moveresv(b
, w
->tail
, c
);
285 return balancerest(w
, b
->next
, 0);
288 if (needfree(w
, r
) != r
) {
292 moveresv(w
->tail
, b
, r
);
293 return balancerest(w
, b
->next
, 0);
298 // 1. w->cur->resv >= n.
299 // 2. w->cur->resv is congruent to n (mod z).
300 // 3. x->resv is congruent to 0 (mod z) for each future file x.
301 // (where z is the size of a delete record in the wal).
302 // Reserved space is conserved (neither created nor destroyed);
303 // we just move it around to preserve the invariant.
304 // We might have to allocate a new file.
305 // Returns 1 on success, otherwise 0. If there was a failure,
306 // w->tail is not updated.
308 balance(Wal
*w
, int n
)
313 // (this loop will run at most once)
314 while (w
->cur
->resv
< n
) {
315 int m
= w
->cur
->resv
;
323 moveresv(w
->tail
, w
->cur
, m
);
327 // Invariants 2 and 3
328 return balancerest(w
, w
->cur
, n
);
332 // Returns the number of bytes successfully reserved: either 0 or n.
334 reserve(Wal
*w
, int n
)
338 // return value must be nonzero but is otherwise ignored
339 if (!w
->use
) return 1;
341 if (w
->cur
->free
>= n
) {
357 if (!balance(w
, n
)) {
358 // error; undo the reservation
369 // Returns the number of bytes reserved or 0 on error.
371 walresvput(Wal
*w
, job j
)
375 // reserve space for the initial job record
377 z
+= strlen(j
->tube
->name
);
381 // plus space for a delete to come later
385 return reserve(w
, z
);
389 // Returns the number of bytes reserved or 0 on error.
391 walresvupdate(Wal
*w
, job j
)
397 return reserve(w
, z
);
401 // Returns the number of locks acquired: either 0 or 1.
410 r
= snprintf(path
, PATH_MAX
, "%s/lock", w
->dir
);
412 twarnx("path too long: %s/lock", w
->dir
);
416 fd
= open(path
, O_WRONLY
|O_CREAT
, 0600);
423 lk
.l_whence
= SEEK_SET
;
426 r
= fcntl(fd
, F_SETLK
, &lk
);
432 // intentionally leak fd, since we never want to close it
433 // and we'll never need it again
439 walread(Wal
*w
, job list
, int min
, int max
)
445 for (i
= min
; i
< w
->next
; i
++) {
452 if (!fileinit(f
, w
, i
)) {
458 fd
= open(f
->path
, O_RDONLY
);
460 twarn("open %s", f
->path
);
468 err
|= fileread(f
, list
);
473 warnx("Errors reading one or more WAL files.");
474 warnx("Continuing. You may be missing data.");
480 walinit(Wal
*w
, job list
)
485 walread(w
, list
, min
, w
->next
);
487 // first writable file
488 if (!makenextfile(w
)) {
489 twarnx("makenextfile");