14 static int reserve(Wal
*w
, int n
);
17 // Reads w->dir for files matching binlog.NNN,
18 // sets w->next to the next unused number, and
19 // returns the minimum number.
20 // If no files are found, sets w->next to 1 and
21 // returns a large number.
25 static char base
[] = "binlog.";
26 static const int len
= sizeof(base
) - 1;
37 while ((e
= readdir(d
))) {
38 if (strncmp(e
->d_name
, base
, len
) == 0) {
39 n
= strtol(e
->d_name
+len
, &p
, 10);
40 if (p
&& *p
== '\0') {
58 while (w
->head
&& !w
->head
->refs
) {
62 w
->tail
= f
->next
; // also, f->next == NULL
73 // returns 1 on success, 0 on error.
81 twarnx("there is no next wal file");
96 d
= w
->alive
+ w
->resv
;
97 n
= (int64
)w
->nfile
* (int64
)w
->filesize
- d
;
103 // Returns the number of bytes reserved or 0 on error.
105 walresvmigrate(Wal
*w
, Job
*j
)
109 // reserve only space for the migrated full job record
110 // space for the delete is already reserved
112 z
+= strlen(j
->tube
->name
);
116 return reserve(w
, z
);
125 if (w
->head
== w
->cur
|| w
->head
->next
== w
->cur
) {
126 // no point in moving a job
130 j
= w
->head
->jlist
.fnext
;
131 if (!j
|| j
== &w
->head
->jlist
) {
132 // head holds no jlist; can't happen
133 twarnx("head holds no jlist");
137 if (!walresvmigrate(w
, j
)) {
138 // it will not fit, so we'll try again later
142 filermjob(w
->head
, j
);
153 for (r
=ratio(w
); r
>=2; r
--) {
165 if (w
->wantsync
&& now
>= w
->lastsync
+w
->syncrate
) {
167 if (fsync(w
->cur
->fd
) == -1) {
174 // Walwrite writes j to the log w (if w is enabled).
175 // On failure, walwrite disables w and returns 0; on success, it returns 1.
176 // Unlike walresv*, walwrite should never fail because of a full disk.
177 // If w is disabled, then walwrite takes no action and returns 1.
179 walwrite(Wal
*w
, Job
*j
)
183 if (!w
->use
) return 1;
184 if (w
->cur
->resv
> 0 || usenext(w
)) {
186 r
= filewrjobshort(w
->cur
, j
);
188 r
= filewrjobfull(w
->cur
, j
);
221 if (!fileinit(f
, w
, w
->next
)) {
241 moveresv(File
*to
, File
*from
, int n
)
251 needfree(Wal
*w
, int n
)
253 if (w
->tail
->free
>= n
) return n
;
254 if (makenextfile(w
)) return n
;
260 // 1. b->resv is congruent to n (mod z).
261 // 2. x->resv is congruent to 0 (mod z) for each future file x.
262 // Assumes (and preserves) that b->resv >= n.
263 // Reserved space is conserved (neither created nor destroyed);
264 // we just move it around to preserve the invariant.
265 // We might have to allocate a new file.
266 // Returns 1 on success, otherwise 0. If there was a failure,
267 // w->tail is not updated.
269 balancerest(Wal
*w
, File
*b
, int n
)
272 static const int z
= sizeof(int) + sizeof(Jobrec
);
278 if (r
== 0) return balancerest(w
, b
->next
, 0);
281 if (w
->tail
->resv
>= c
&& b
->free
>= c
) {
282 moveresv(b
, w
->tail
, c
);
283 return balancerest(w
, b
->next
, 0);
286 if (needfree(w
, r
) != r
) {
290 moveresv(w
->tail
, b
, r
);
291 return balancerest(w
, b
->next
, 0);
296 // 1. w->cur->resv >= n.
297 // 2. w->cur->resv is congruent to n (mod z).
298 // 3. x->resv is congruent to 0 (mod z) for each future file x.
299 // (where z is the size of a delete record in the wal).
300 // Reserved space is conserved (neither created nor destroyed);
301 // we just move it around to preserve the invariant.
302 // We might have to allocate a new file.
303 // Returns 1 on success, otherwise 0. If there was a failure,
304 // w->tail is not updated.
306 balance(Wal
*w
, int n
)
309 // (this loop will run at most once)
310 while (w
->cur
->resv
< n
) {
311 int m
= w
->cur
->resv
;
313 int r
= needfree(w
, m
);
319 moveresv(w
->tail
, w
->cur
, m
);
323 // Invariants 2 and 3
324 return balancerest(w
, w
->cur
, n
);
328 // Returns the number of bytes successfully reserved: either 0 or n.
330 reserve(Wal
*w
, int n
)
334 // return value must be nonzero but is otherwise ignored
335 if (!w
->use
) return 1;
337 if (w
->cur
->free
>= n
) {
353 if (!balance(w
, n
)) {
354 // error; undo the reservation
365 // Returns the number of bytes reserved or 0 on error.
367 walresvput(Wal
*w
, Job
*j
)
371 // reserve space for the initial job record
373 z
+= strlen(j
->tube
->name
);
377 // plus space for a delete to come later
381 return reserve(w
, z
);
385 // Returns the number of bytes reserved or 0 on error.
387 walresvupdate(Wal
*w
)
392 return reserve(w
, z
);
396 // Returns the number of locks acquired: either 0 or 1.
406 path_length
= strlen(w
->dir
) + strlen("/lock") + 1;
407 if ((path
= malloc(path_length
)) == NULL
) {
411 snprintf(path
, path_length
, "%s/lock", w
->dir
);
413 fd
= open(path
, O_WRONLY
|O_CREAT
, 0600);
421 lk
.l_whence
= SEEK_SET
;
424 r
= fcntl(fd
, F_SETLK
, &lk
);
430 // intentionally leak fd, since we never want to close it
431 // and we'll never need it again
437 walread(Wal
*w
, Job
*list
, int min
)
442 for (i
= min
; i
< w
->next
; i
++) {
449 if (!fileinit(f
, w
, i
)) {
455 int fd
= open(f
->path
, O_RDONLY
);
457 twarn("open %s", f
->path
);
465 err
|= fileread(f
, list
);
471 warnx("Errors reading one or more WAL files.");
472 warnx("Continuing. You may be missing data.");
478 walinit(Wal
*w
, Job
*list
)
483 walread(w
, list
, min
);
485 // first writable file
486 if (!makenextfile(w
)) {
487 twarnx("makenextfile");