13 static void warnpos(File
*, char*, ...);
14 static int readrec(File
*, job
, int*);
15 static int readfull(File
*, void*, int, int*, char*);
38 fileaddjob(File
*f
, job j
)
43 if (!h
->fprev
) h
->fprev
= h
;
54 filermjob(File
*f
, job j
)
57 if (f
!= j
->file
) return;
58 j
->fnext
->fprev
= j
->fprev
;
59 j
->fprev
->fnext
= j
->fnext
;
63 f
->w
->alive
-= j
->walused
;
69 // Fileread reads jobs from f->path into list.
70 // It returns 0 on success, or 1 if any errors occurred.
72 fileread(File
*f
, job list
)
76 if (!readfull(f
, &v
, sizeof(v
), &err
, "version")) {
80 warnx("%s: wrong version: want %d, got %d", f
->path
, Walver
, v
);
85 while (readrec(f
, list
, &err
));
91 // Readrec reads a record from f->fd into linked list l.
92 // If an error occurs, it sets *err to 1.
93 // Readrec returns the number of records read, either 1 or 0.
95 readrec(File
*f
, job l
, int *err
)
102 char tubename
[MAX_TUBE_NAME_LEN
];
104 r
= read(f
->fd
, &namelen
, sizeof(int));
111 if (r
!= sizeof(int)) {
115 if (namelen
>= MAX_TUBE_NAME_LEN
) {
116 warnpos(f
, "namelen %d exceeds maximum of %d", namelen
, MAX_TUBE_NAME_LEN
- 1);
122 r
= readfull(f
, tubename
, namelen
, err
, "tube name");
128 tubename
[namelen
] = '\0';
130 r
= readfull(f
, &jr
, sizeof(Jobrec
), err
, "job struct");
136 // are we reading trailing zeroes?
137 if (!jr
.id
) return 0;
140 if (!(j
|| namelen
)) {
141 // We read a short record without having seen a
142 // full record for this job, so the full record
143 // was in an earlier file that has been deleted.
144 // Therefore the job itself has either been
145 // deleted or migrated; either way, this record
146 // should be ignored.
157 t
= tube_find_or_make(tubename
);
158 j
= make_job_with_id(jr
.pri
, jr
.delay
, jr
.ttr
, jr
.body_size
,
160 j
->next
= j
->prev
= j
;
161 j
->r
.created_at
= jr
.created_at
;
166 // full record; read the job body
168 if (jr
.body_size
!= j
->r
.body_size
) {
169 warnpos(f
, "job %llu size changed", j
->r
.id
);
170 warnpos(f
, "was %zu, now %zu", j
->r
.body_size
, jr
.body_size
);
173 r
= readfull(f
, j
->body
, j
->r
.body_size
, err
, "job body");
179 // since this is a full record, we can move
180 // the file pointer and decref the old
182 filermjob(j
->file
, j
);
192 filermjob(j
->file
, j
);
202 filermjob(j
->file
, j
);
210 readfull(File
*f
, void *c
, int n
, int *err
, char *desc
)
214 r
= read(f
->fd
, c
, n
);
217 warnpos(f
, "error reading %s", desc
);
222 warnpos(f
, "unexpected EOF reading %s", desc
);
231 warnpos(File
*f
, char *fmt
, ...)
236 off
= lseek(f
->fd
, 0, SEEK_CUR
);
237 fprintf(stderr
, "%s:%u: ", f
->path
, off
);
239 vfprintf(stderr
, fmt
, ap
);
245 // Opens f for writing, writes a header, and initializes
246 // f->free and f->resv.
247 // Sets f->iswopen if successful.
255 fd
= open(f
->path
, O_WRONLY
|O_CREAT
, 0400);
257 twarn("open %s", f
->path
);
261 r
= falloc(fd
, f
->w
->filesz
);
265 twarn("Cannot allocate space for file %s", f
->path
);
269 n
= write(fd
, &ver
, sizeof(int));
270 if (n
< sizeof(int)) {
271 twarn("write %s", f
->path
);
279 f
->free
= f
->w
->filesz
- n
;
285 filewrite(File
*f
, job j
, void *buf
, int len
)
289 r
= write(f
->fd
, buf
, len
);
305 filewrjobshort(File
*f
, job j
)
309 nl
= 0; // name len 0 indicates short record
310 r
= filewrite(f
, j
, &nl
, sizeof nl
) &&
311 filewrite(f
, j
, &j
->r
, sizeof j
->r
);
314 if (j
->r
.state
== Invalid
) {
315 filermjob(j
->file
, j
);
323 filewrjobfull(File
*f
, job j
)
328 nl
= strlen(j
->tube
->name
);
330 filewrite(f
, j
, &nl
, sizeof nl
) &&
331 filewrite(f
, j
, j
->tube
->name
, nl
) &&
332 filewrite(f
, j
, &j
->r
, sizeof j
->r
) &&
333 filewrite(f
, j
, j
->body
, j
->r
.body_size
);
343 if (!f
->iswopen
) return;
345 // Some compilers give a warning if the return value of ftruncate is
346 // ignored. So we pretend to use it.
347 r
= ftruncate(f
->fd
, f
->w
->filesz
- f
->free
);
348 if (r
== -1); // do nothing
357 fileinit(File
*f
, Wal
*w
, int n
)
361 f
->path
= fmtalloc("%s/binlog.%d", w
->dir
, n
);
366 // Adds f to the linked list in w,
367 // updating w->tail and w->head as necessary.
369 fileadd(File
*f
, Wal
*w
)