15 static int readrec(File
*, Job
*, int*);
16 static int readrec5(File
*, Job
*, int*);
17 static int readfull(File
*, void*, int, int*, char*);
18 static void warnpos(File
*, int, char*, ...)
19 __attribute__((format(printf
, 3, 4)));
21 FAlloc
*falloc
= &rawfalloc
;
28 typedef struct Jobrec5 Jobrec5
;
36 uint64 created_at
; // usec
37 uint64 deadline_at
; // usec
50 Jobrec5size
= offsetof(Jobrec5
, pad
)
53 // rawfalloc allocates disk space of len bytes.
54 // It expects fd's offset to be 0; may also reset fd's offset to 0.
55 // Returns 0 on success, and a positive errno otherwise.
57 rawfalloc(int fd
, int len
)
59 // We do not use ftruncate() because it might extend the file
60 // with a sequence of null bytes or a hole.
61 // posix_fallocate() is not portable enough, might fail for NFS.
62 static char buf
[4096] = {0};
65 for (i
= 0; i
< len
; i
+= w
) {
66 w
= write(fd
, buf
, sizeof buf
);
70 lseek(fd
, 0, 0); // do not care if this fails
94 fileaddjob(File
*f
, Job
*j
)
99 if (!h
->fprev
) h
->fprev
= h
;
110 filermjob(File
*f
, Job
*j
)
113 if (f
!= j
->file
) return;
114 j
->fnext
->fprev
= j
->fprev
;
115 j
->fprev
->fnext
= j
->fnext
;
119 f
->w
->alive
-= j
->walused
;
125 // Fileread reads jobs from f->path into list.
126 // It returns 0 on success, or 1 if any errors occurred.
128 fileread(File
*f
, Job
*list
)
132 if (!readfull(f
, &v
, sizeof(v
), &err
, "version")) {
138 while (readrec(f
, list
, &err
));
143 while (readrec5(f
, list
, &err
));
148 warnx("%s: unknown version: %d", f
->path
, v
);
153 // Readrec reads a record from f->fd into linked list l.
154 // If an error occurs, it sets *err to 1.
155 // Readrec returns the number of records read, either 1 or 0.
157 readrec(File
*f
, Job
*l
, int *err
)
164 char tubename
[MAX_TUBE_NAME_LEN
];
166 r
= read(f
->fd
, &namelen
, sizeof(int));
169 warnpos(f
, 0, "error");
173 if (r
!= sizeof(int)) {
177 if (namelen
>= MAX_TUBE_NAME_LEN
) {
178 warnpos(f
, -r
, "namelen %d exceeds maximum of %d", namelen
, MAX_TUBE_NAME_LEN
- 1);
184 warnpos(f
, -r
, "namelen %d is negative", namelen
);
190 r
= readfull(f
, tubename
, namelen
, err
, "tube name");
196 tubename
[namelen
] = '\0';
198 r
= readfull(f
, &jr
, sizeof(Jobrec
), err
, "job struct");
204 // are we reading trailing zeroes?
205 if (!jr
.id
) return 0;
208 if (!(j
|| namelen
)) {
209 // We read a short record without having seen a
210 // full record for this job, so the full record
211 // was in an earlier file that has been deleted.
212 // Therefore the job itself has either been
213 // deleted or migrated; either way, this record
214 // should be ignored.
226 if ((size_t)jr
.body_size
> job_data_size_limit
) {
227 warnpos(f
, -r
, "job %"PRIu64
" is too big (%"PRId32
" > %zu)",
230 job_data_size_limit
);
233 t
= tube_find_or_make(tubename
);
234 j
= make_job_with_id(jr
.pri
, jr
.delay
, jr
.ttr
, jr
.body_size
,
237 j
->r
.created_at
= jr
.created_at
;
240 job_list_insert(l
, j
);
242 // full record; read the job body
244 if (jr
.body_size
!= j
->r
.body_size
) {
245 warnpos(f
, -r
, "job %"PRIu64
" size changed", j
->r
.id
);
246 warnpos(f
, -r
, "was %d, now %d", j
->r
.body_size
, jr
.body_size
);
249 r
= readfull(f
, j
->body
, j
->r
.body_size
, err
, "job body");
255 // since this is a full record, we can move
256 // the file pointer and decref the old
258 filermjob(j
->file
, j
);
268 filermjob(j
->file
, j
);
278 filermjob(j
->file
, j
);
285 // Readrec5 is like readrec, but it reads a record in "version 5"
286 // of the log format.
288 readrec5(File
*f
, Job
*l
, int *err
)
295 char tubename
[MAX_TUBE_NAME_LEN
];
297 r
= read(f
->fd
, &namelen
, sizeof(namelen
));
300 warnpos(f
, 0, "error");
304 if (r
!= sizeof(namelen
)) {
308 if (namelen
>= MAX_TUBE_NAME_LEN
) {
309 warnpos(f
, -r
, "namelen %zu exceeds maximum of %d", namelen
, MAX_TUBE_NAME_LEN
- 1);
315 r
= readfull(f
, tubename
, namelen
, err
, "v5 tube name");
321 tubename
[namelen
] = '\0';
323 r
= readfull(f
, &jr
, Jobrec5size
, err
, "v5 job struct");
329 // are we reading trailing zeroes?
330 if (!jr
.id
) return 0;
333 if (!(j
|| namelen
)) {
334 // We read a short record without having seen a
335 // full record for this job, so the full record
336 // was in an earlier file that has been deleted.
337 // Therefore the job itself has either been
338 // deleted or migrated; either way, this record
339 // should be ignored.
351 if ((size_t)jr
.body_size
> job_data_size_limit
) {
352 warnpos(f
, -r
, "job %"PRIu64
" is too big (%"PRId32
" > %zu)",
355 job_data_size_limit
);
358 t
= tube_find_or_make(tubename
);
359 j
= make_job_with_id(jr
.pri
, jr
.delay
, jr
.ttr
, jr
.body_size
,
365 j
->r
.delay
= jr
.delay
* 1000; // us => ns
366 j
->r
.ttr
= jr
.ttr
* 1000; // us => ns
367 j
->r
.body_size
= jr
.body_size
;
368 j
->r
.created_at
= jr
.created_at
* 1000; // us => ns
369 j
->r
.deadline_at
= jr
.deadline_at
* 1000; // us => ns
370 j
->r
.reserve_ct
= jr
.reserve_ct
;
371 j
->r
.timeout_ct
= jr
.timeout_ct
;
372 j
->r
.release_ct
= jr
.release_ct
;
373 j
->r
.bury_ct
= jr
.bury_ct
;
374 j
->r
.kick_ct
= jr
.kick_ct
;
375 j
->r
.state
= jr
.state
;
376 job_list_insert(l
, j
);
378 // full record; read the job body
380 if (jr
.body_size
!= j
->r
.body_size
) {
381 warnpos(f
, -r
, "job %"PRIu64
" size changed", j
->r
.id
);
382 warnpos(f
, -r
, "was %"PRId32
", now %"PRId32
, j
->r
.body_size
, jr
.body_size
);
385 r
= readfull(f
, j
->body
, j
->r
.body_size
, err
, "v5 job body");
391 // since this is a full record, we can move
392 // the file pointer and decref the old
394 filermjob(j
->file
, j
);
404 filermjob(j
->file
, j
);
414 filermjob(j
->file
, j
);
422 readfull(File
*f
, void *c
, int n
, int *err
, char *desc
)
426 r
= read(f
->fd
, c
, n
);
429 warnpos(f
, 0, "error reading %s", desc
);
434 warnpos(f
, -r
, "unexpected EOF reading %d bytes (got %d): %s", n
, r
, desc
);
442 warnpos(File
*f
, int adj
, char *fmt
, ...)
447 off
= lseek(f
->fd
, 0, SEEK_CUR
);
448 fprintf(stderr
, "%s:%d: ", f
->path
, off
+adj
);
450 vfprintf(stderr
, fmt
, ap
);
456 // Opens f for writing, writes a header, and initializes
457 // f->free and f->resv.
458 // Sets f->iswopen if successful.
466 fd
= open(f
->path
, O_WRONLY
|O_CREAT
, 0400);
468 twarn("open %s", f
->path
);
472 r
= falloc(fd
, f
->w
->filesize
);
477 twarn("falloc %s", f
->path
);
480 twarn("unlink %s", f
->path
);
485 n
= write(fd
, &ver
, sizeof(int));
486 if (n
< 0 || (size_t)n
< sizeof(int)) {
487 twarn("write %s", f
->path
);
496 f
->free
= f
->w
->filesize
- n
;
502 filewrite(File
*f
, Job
*j
, void *buf
, int len
)
506 r
= write(f
->fd
, buf
, len
);
522 filewrjobshort(File
*f
, Job
*j
)
526 nl
= 0; // name len 0 indicates short record
527 r
= filewrite(f
, j
, &nl
, sizeof nl
) &&
528 filewrite(f
, j
, &j
->r
, sizeof j
->r
);
531 if (j
->r
.state
== Invalid
) {
532 filermjob(j
->file
, j
);
540 filewrjobfull(File
*f
, Job
*j
)
545 nl
= strlen(j
->tube
->name
);
547 filewrite(f
, j
, &nl
, sizeof nl
) &&
548 filewrite(f
, j
, j
->tube
->name
, nl
) &&
549 filewrite(f
, j
, &j
->r
, sizeof j
->r
) &&
550 filewrite(f
, j
, j
->body
, j
->r
.body_size
);
558 if (!f
->iswopen
) return;
561 if (ftruncate(f
->fd
, f
->w
->filesize
- f
->free
) != 0) {
565 if (close(f
->fd
) == -1)
573 fileinit(File
*f
, Wal
*w
, int n
)
577 f
->path
= fmtalloc("%s/binlog.%d", w
->dir
, n
);
582 // Adds f to the linked list in w,
583 // updating w->tail and w->head as necessary.
585 fileadd(File
*f
, Wal
*w
)