15 static void warnpos(File
*, int, char*, ...);
16 static int readrec(File
*, job
, int*);
17 static int readrec5(File
*, job
, int*);
18 static int readfull(File
*, void*, int, int*, char*);
20 FAlloc
*falloc
= &rawfalloc
;
27 typedef struct Jobrec5 Jobrec5
;
35 uint64 created_at
; // usec
36 uint64 deadline_at
; // usec
49 Jobrec5size
= offsetof(Jobrec5
, pad
)
72 fileaddjob(File
*f
, job j
)
77 if (!h
->fprev
) h
->fprev
= h
;
88 filermjob(File
*f
, job j
)
91 if (f
!= j
->file
) return;
92 j
->fnext
->fprev
= j
->fprev
;
93 j
->fprev
->fnext
= j
->fnext
;
97 f
->w
->alive
-= j
->walused
;
103 // Fileread reads jobs from f->path into list.
104 // It returns 0 on success, or 1 if any errors occurred.
106 fileread(File
*f
, job list
)
110 if (!readfull(f
, &v
, sizeof(v
), &err
, "version")) {
116 while (readrec(f
, list
, &err
));
121 while (readrec5(f
, list
, &err
));
126 warnx("%s: unknown version: %d", f
->path
, v
);
131 // Readrec reads a record from f->fd into linked list l.
132 // If an error occurs, it sets *err to 1.
133 // Readrec returns the number of records read, either 1 or 0.
135 readrec(File
*f
, job l
, int *err
)
142 char tubename
[MAX_TUBE_NAME_LEN
];
144 r
= read(f
->fd
, &namelen
, sizeof(int));
147 warnpos(f
, 0, "error");
151 if (r
!= sizeof(int)) {
155 if (namelen
>= MAX_TUBE_NAME_LEN
) {
156 warnpos(f
, -r
, "namelen %d exceeds maximum of %d", namelen
, MAX_TUBE_NAME_LEN
- 1);
162 warnpos(f
, -r
, "namelen %d is negative", namelen
);
168 r
= readfull(f
, tubename
, namelen
, err
, "tube name");
174 tubename
[namelen
] = '\0';
176 r
= readfull(f
, &jr
, sizeof(Jobrec
), err
, "job struct");
182 // are we reading trailing zeroes?
183 if (!jr
.id
) return 0;
186 if (!(j
|| namelen
)) {
187 // We read a short record without having seen a
188 // full record for this job, so the full record
189 // was in an eariler file that has been deleted.
190 // Therefore the job itself has either been
191 // deleted or migrated; either way, this record
192 // should be ignored.
203 if (jr
.body_size
> job_data_size_limit
) {
204 warnpos(f
, -r
, "job %"PRIu64
" is too big (%zd > %zd)", j
->r
.id
, job_data_size_limit
);
207 t
= tube_find_or_make(tubename
);
208 j
= make_job_with_id(jr
.pri
, jr
.delay
, jr
.ttr
, jr
.body_size
,
210 j
->next
= j
->prev
= j
;
211 j
->r
.created_at
= jr
.created_at
;
216 // full record; read the job body
218 if (jr
.body_size
!= j
->r
.body_size
) {
219 warnpos(f
, -r
, "job %"PRIu64
" size changed", j
->r
.id
);
220 warnpos(f
, -r
, "was %zu, now %zu", j
->r
.body_size
, jr
.body_size
);
223 r
= readfull(f
, j
->body
, j
->r
.body_size
, err
, "job body");
229 // since this is a full record, we can move
230 // the file pointer and decref the old
232 filermjob(j
->file
, j
);
242 filermjob(j
->file
, j
);
252 filermjob(j
->file
, j
);
259 // Readrec5 is like readrec, but it reads a record in "version 5"
260 // of the log format.
262 readrec5(File
*f
, job l
, int *err
)
269 char tubename
[MAX_TUBE_NAME_LEN
];
271 r
= read(f
->fd
, &namelen
, sizeof(namelen
));
274 warnpos(f
, 0, "error");
278 if (r
!= sizeof(namelen
)) {
282 if (namelen
>= MAX_TUBE_NAME_LEN
) {
283 warnpos(f
, -r
, "namelen %d exceeds maximum of %d", namelen
, MAX_TUBE_NAME_LEN
- 1);
289 warnpos(f
, -r
, "namelen %d is negative", namelen
);
295 r
= readfull(f
, tubename
, namelen
, err
, "v5 tube name");
301 tubename
[namelen
] = '\0';
303 r
= readfull(f
, &jr
, Jobrec5size
, err
, "v5 job struct");
309 // are we reading trailing zeroes?
310 if (!jr
.id
) return 0;
313 if (!(j
|| namelen
)) {
314 // We read a short record without having seen a
315 // full record for this job, so the full record
316 // was in an eariler file that has been deleted.
317 // Therefore the job itself has either been
318 // deleted or migrated; either way, this record
319 // should be ignored.
330 if (jr
.body_size
> job_data_size_limit
) {
331 warnpos(f
, -r
, "job %"PRIu64
" is too big (%zd > %zd)", j
->r
.id
, job_data_size_limit
);
334 t
= tube_find_or_make(tubename
);
335 j
= make_job_with_id(jr
.pri
, jr
.delay
, jr
.ttr
, jr
.body_size
,
337 j
->next
= j
->prev
= j
;
338 j
->r
.created_at
= jr
.created_at
;
342 j
->r
.delay
= jr
.delay
* 1000; // us => ns
343 j
->r
.ttr
= jr
.ttr
* 1000; // us => ns
344 j
->r
.body_size
= jr
.body_size
;
345 j
->r
.created_at
= jr
.created_at
* 1000; // us => ns
346 j
->r
.deadline_at
= jr
.deadline_at
* 1000; // us => ns
347 j
->r
.reserve_ct
= jr
.reserve_ct
;
348 j
->r
.timeout_ct
= jr
.timeout_ct
;
349 j
->r
.release_ct
= jr
.release_ct
;
350 j
->r
.bury_ct
= jr
.bury_ct
;
351 j
->r
.kick_ct
= jr
.kick_ct
;
352 j
->r
.state
= jr
.state
;
355 // full record; read the job body
357 if (jr
.body_size
!= j
->r
.body_size
) {
358 warnpos(f
, -r
, "job %"PRIu64
" size changed", j
->r
.id
);
359 warnpos(f
, -r
, "was %zu, now %zu", j
->r
.body_size
, jr
.body_size
);
362 r
= readfull(f
, j
->body
, j
->r
.body_size
, err
, "v5 job body");
368 // since this is a full record, we can move
369 // the file pointer and decref the old
371 filermjob(j
->file
, j
);
381 filermjob(j
->file
, j
);
391 filermjob(j
->file
, j
);
399 readfull(File
*f
, void *c
, int n
, int *err
, char *desc
)
403 r
= read(f
->fd
, c
, n
);
406 warnpos(f
, 0, "error reading %s", desc
);
411 warnpos(f
, -r
, "unexpected EOF reading %d bytes (got %d): %s", n
, r
, desc
);
420 warnpos(File
*f
, int adj
, char *fmt
, ...)
425 off
= lseek(f
->fd
, 0, SEEK_CUR
);
426 fprintf(stderr
, "%s:%u: ", f
->path
, off
+adj
);
428 vfprintf(stderr
, fmt
, ap
);
434 // Opens f for writing, writes a header, and initializes
435 // f->free and f->resv.
436 // Sets f->iswopen if successful.
444 fd
= open(f
->path
, O_WRONLY
|O_CREAT
, 0400);
446 twarn("open %s", f
->path
);
450 r
= falloc(fd
, f
->w
->filesize
);
454 twarn("falloc %s", f
->path
);
457 twarn("unlink %s", f
->path
);
462 n
= write(fd
, &ver
, sizeof(int));
463 if (n
< sizeof(int)) {
464 twarn("write %s", f
->path
);
472 f
->free
= f
->w
->filesize
- n
;
478 filewrite(File
*f
, job j
, void *buf
, int len
)
482 r
= write(f
->fd
, buf
, len
);
498 filewrjobshort(File
*f
, job j
)
502 nl
= 0; // name len 0 indicates short record
503 r
= filewrite(f
, j
, &nl
, sizeof nl
) &&
504 filewrite(f
, j
, &j
->r
, sizeof j
->r
);
507 if (j
->r
.state
== Invalid
) {
508 filermjob(j
->file
, j
);
516 filewrjobfull(File
*f
, job j
)
521 nl
= strlen(j
->tube
->name
);
523 filewrite(f
, j
, &nl
, sizeof nl
) &&
524 filewrite(f
, j
, j
->tube
->name
, nl
) &&
525 filewrite(f
, j
, &j
->r
, sizeof j
->r
) &&
526 filewrite(f
, j
, j
->body
, j
->r
.body_size
);
534 if (!f
->iswopen
) return;
536 (void)ftruncate(f
->fd
, f
->w
->filesize
- f
->free
);
545 fileinit(File
*f
, Wal
*w
, int n
)
549 f
->path
= fmtalloc("%s/binlog.%d", w
->dir
, n
);
554 // Adds f to the linked list in w,
555 // updating w->tail and w->head as necessary.
557 fileadd(File
*f
, Wal
*w
)