1 /* binlog.c - binary log implementation */
3 /* Copyright (C) 2008 Graham Barr
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation, either version 3 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
26 #include <sys/resource.h>
27 #include <sys/param.h>
/* Maximum size, in bytes, of each binlog file we create (10 MiB). */
size_t binlog_size_limit = 10 * 1024 * 1024;

/* Directory that holds the binlog files; NULL when no directory is set. */
char *binlog_dir = NULL;

/* Numeric suffix of the most recent "binlog.N" file name. */
static int binlog_index = 0;

/* Descriptor of the binlog currently written to; negative when unused. */
static int binlog_fd = -1;

/* Descriptor of the binlog opened ahead of time; negative when none. */
static int next_binlog_fd = -1;

/* Format version stored at the start of every binlog file. */
static int binlog_version = 2;

/* Bytes not yet reserved in the current and in the next binlog. */
static size_t binlog_space = 0;
static size_t next_binlog_space = 0;

/* Bytes already reserved in the current and in the next binlog. */
static size_t binlog_reserved = 0;
static size_t next_binlog_reserved = 0;

/* Running count of bytes written to the binlog. */
static size_t bytes_written = 0;
51 static binlog first_binlog
= NULL
, last_binlog
= NULL
, next_binlog
= NULL
;
64 dirp
= opendir(binlog_dir
);
67 while ((dp
= readdir(dirp
)) != NULL
) {
68 name_len
= strlen(dp
->d_name
);
69 if (name_len
> 7 && !strncmp("binlog.", dp
->d_name
, 7)) {
70 val
= strtol(dp
->d_name
+ 7, &endptr
, 10);
71 if (endptr
&& *endptr
== 0) {
72 if (max
== 0 || val
> max
) max
= val
;
73 if (min
== 0 || val
< min
) min
= val
;
79 binlog_index
= (int) max
;
86 binlog b
= first_binlog
;
90 first_binlog
= b
->next
;
91 if (!first_binlog
) last_binlog
= NULL
;
105 binlog_dref(binlog b
)
108 if (b
->refs
< 1) return twarnx("refs is zero for binlog: %s", b
->path
);
112 while (first_binlog
&& first_binlog
->refs
== 0) binlog_remove_first();
118 binlog_warn(int fd, const char* path, const char *msg)
120 warnx("WARNING, %s at %s:%u.\n%s", msg, path, lseek(fd, 0, SEEK_CUR),
121 " Continuing. You may be missing data.");
/*
 * Warn about a problem found while reading a binlog file.
 *
 *   fd    descriptor being read; its current offset is included in the
 *         message
 *   path  name of the file being read
 *   fmt   printf-style description of the problem (a string literal),
 *         followed by any arguments it needs
 *
 * lseek() returns an off_t, which is frequently wider than unsigned int,
 * so the offset is converted to long long and printed with %lld. If lseek()
 * fails, the reported offset is -1.
 */
#define binlog_warn(fd, path, fmt, args...) \
    warnx("WARNING, " fmt " at %s:%lld. %s: ", \
          ##args, (path), (long long) lseek((fd), 0, SEEK_CUR), \
          "Continuing. You may be missing data.")
131 binlog_read_one(int fd
, job binlog_jobs
, const char *path
)
136 char tubename
[MAX_TUBE_NAME_LEN
];
141 r
= read(fd
, &version
, sizeof(version
));
142 if (r
== -1) return twarn("read()");
143 if (r
< sizeof(version
)) {
144 return binlog_warn(fd
, path
, "EOF while reading version record");
147 if (version
!= binlog_version
) {
148 return warnx("%s: binlog version mismatch %d %d", path
, version
,
152 while (read(fd
, &namelen
, sizeof(size_t)) == sizeof(size_t)) {
154 r
= read(fd
, tubename
, namelen
);
155 if (r
== -1) return twarn("read()");
157 lseek(fd
, SEEK_CUR
, 0);
158 return binlog_warn(fd
, path
, "EOF while reading tube name");
162 tubename
[namelen
] = '\0';
163 r
= read(fd
, &js
, sizeof(struct job
));
164 if (r
== -1) return twarn("read()");
165 if (r
< sizeof(struct job
)) {
166 return binlog_warn(fd
, path
, "EOF while reading job record");
173 case JOB_STATE_INVALID
:
176 binlog_dref(j
->binlog
);
181 case JOB_STATE_READY
:
182 case JOB_STATE_DELAYED
:
184 t
= tube_find_or_make(tubename
);
185 j
= make_job_with_id(js
.pri
, js
.delay
, js
.ttr
, js
.body_size
,
187 j
->next
= j
->prev
= j
;
188 j
->creation
= js
.creation
;
189 job_insert(binlog_jobs
, j
);
192 if (js
.body_size
> j
->body_size
) {
193 warnx("job size increased from %zu to %zu", j
->body_size
,
196 binlog_dref(j
->binlog
);
198 return binlog_warn(fd
, path
, "EOF while reading job body");
200 r
= read(fd
, j
->body
, js
.body_size
);
201 if (r
== -1) return twarn("read()");
202 if (r
< js
.body_size
) {
203 warnx("dropping incomplete job %llu", j
->id
);
205 binlog_dref(j
->binlog
);
207 return binlog_warn(fd
, path
, "EOF while reading job body");
214 j
->deadline
= js
.deadline
;
218 j
->timeout_ct
= js
.timeout_ct
;
219 j
->release_ct
= js
.release_ct
;
220 j
->bury_ct
= js
.bury_ct
;
221 j
->kick_ct
= js
.kick_ct
;
223 /* this is a complete record, so we can move the binlog ref */
224 if (namelen
&& js
.body_size
) {
225 binlog_dref(j
->binlog
);
226 j
->binlog
= binlog_iref(last_binlog
);
235 if (binlog_fd
< 0) return;
237 binlog_dref(last_binlog
);
242 make_binlog(char *path
)
246 b
= (binlog
) malloc(sizeof(struct binlog
) + strlen(path
) + 1);
247 if (!b
) return twarnx("OOM"), NULL
;
248 strcpy(b
->path
, path
);
260 if (!binlog_dir
) return NULL
;
262 r
= snprintf(path
, PATH_MAX
, "%s/binlog.%d", binlog_dir
, ++binlog_index
);
263 if (r
> PATH_MAX
) return twarnx("path too long: %s", binlog_dir
), NULL
;
265 return make_binlog(path
);
271 if (last_binlog
) last_binlog
->next
= b
;
273 if (!first_binlog
) first_binlog
= b
;
279 binlog_open(binlog log
)
283 if (!binlog_iref(log
)) return -1;
285 fd
= open(log
->path
, O_WRONLY
| O_CREAT
, 0400);
287 if (fd
< 0) return twarn("Cannot open binlog %s", log
->path
), -1;
289 #ifdef HAVE_POSIX_FALLOCATE
290 r
= posix_fallocate(fd
, 0, binlog_size_limit
);
292 #warning no known method to preallocate files on this platform
299 return twarn("Cannot allocate space for binlog %s", log
->path
), -1;
302 bytes_written
= write(fd
, &binlog_version
, sizeof(int));
304 if (bytes_written
< sizeof(int)) {
305 twarn("Cannot write to binlog");
314 /* returns 1 on success, 0 on error. */
318 if (binlog_fd
< 0) return 0;
319 if (next_binlog_fd
< 0) return 0;
320 if (binlog_reserved
> next_binlog_space
) return twarnx("overextended"), 0;
324 binlog_fd
= next_binlog_fd
;
325 add_binlog(next_binlog
);
330 binlog_space
= next_binlog_space
- binlog_reserved
;
331 binlog_reserved
= next_binlog_reserved
+ binlog_reserved
;
333 next_binlog_reserved
= next_binlog_space
= 0;
344 /* Returns the number of jobs successfully written (either 0 or 1). */
345 /* If we are not using the binlog at all (binlog_fd < 0), then we pretend to
346 have made a successful write and return 1. */
348 binlog_write_job(job j
)
351 size_t tube_namelen
, to_write
= 0;
352 struct iovec vec
[4], *vptr
;
355 if (binlog_fd
< 0) return 1;
358 vec
[0].iov_base
= (char *) &tube_namelen
;
359 to_write
+= vec
[0].iov_len
= sizeof(size_t);
361 vec
[1].iov_base
= j
->tube
->name
;
364 /* we could save some bytes in the binlog file by only saving some parts of
366 vec
[2].iov_base
= (char *) j
;
367 to_write
+= vec
[2].iov_len
= sizeof(struct job
);
369 printf("writing job %lld state %d\n", j
->id
, j
->state
);
371 if (j
->state
== JOB_STATE_READY
|| j
->state
== JOB_STATE_DELAYED
) {
373 tube_namelen
= strlen(j
->tube
->name
);
374 to_write
+= vec
[1].iov_len
= tube_namelen
;
376 vec
[3].iov_base
= j
->body
;
377 to_write
+= vec
[3].iov_len
= j
->body_size
;
379 } else if (j
->state
== JOB_STATE_INVALID
) {
380 if (j
->binlog
) binlog_dref(j
->binlog
);
383 return twarnx("unserializable job state: %d", j
->state
), 0;
386 if (to_write
> binlog_reserved
) {
387 r
= binlog_use_next();
388 if (!r
) return twarnx("failed to use next binlog"), 0;
391 if (j
->state
&& !j
->binlog
) j
->binlog
= binlog_iref(last_binlog
);
393 while (to_write
> 0) {
394 written
= writev(binlog_fd
, vec
, vcnt
);
397 if (errno
== EAGAIN
) continue;
398 if (errno
== EINTR
) continue;
405 bytes_written
+= written
;
407 if (to_write
> 0 && written
> 0) {
408 for (vptr
= vec
; written
>= vptr
->iov_len
; vptr
++) {
409 written
-= vptr
->iov_len
;
412 vptr
->iov_base
= (char *) vptr
->iov_base
+ written
;
413 vptr
->iov_len
-= written
;
420 /* Returns the number of bytes successfully reserved: either 0 or n. */
422 binlog_reserve_space(size_t n
)
424 /* This value must be nonzero but is otherwise ignored. */
425 if (binlog_fd
< 0) return 1;
427 if (n
<= binlog_space
) {
429 binlog_reserved
+= n
;
433 if (n
<= next_binlog_space
) {
434 next_binlog_space
-= n
;
435 next_binlog_reserved
+= n
;
439 /* The next binlog is already allocated and it is full. */
440 if (next_binlog_fd
>= 0) return 0;
442 /* open a new binlog with more space to reserve */
443 next_binlog
= make_next_binlog();
444 if (!next_binlog
) return twarnx("error making next binlog"), 0;
445 next_binlog_fd
= binlog_open(next_binlog
);
447 /* open failed, so we can't reserve any space */
448 if (next_binlog_fd
< 0) return 0;
450 next_binlog_space
= binlog_size_limit
- bytes_written
- n
;
451 next_binlog_reserved
= n
;
456 /* Returns the number of bytes reserved. */
458 binlog_reserve_space_put(job j
)
462 /* reserve space for the initial job record */
464 z
+= strlen(j
->tube
->name
);
465 z
+= sizeof(struct job
);
468 /* plus space for a delete to come later */
470 z
+= sizeof(struct job
);
472 return binlog_reserve_space(z
);
476 binlog_reserve_space_update(job j
)
481 z
+= sizeof(struct job
);
482 return binlog_reserve_space(z
);
486 binlog_read(job binlog_jobs
)
488 int binlog_index_min
;
494 if (!binlog_dir
) return;
496 if (stat(binlog_dir
, &sbuf
) < 0) {
497 if (mkdir(binlog_dir
, 0700) < 0) return twarn("%s", binlog_dir
);
498 } else if (!(sbuf
.st_mode
& S_IFDIR
)) {
499 twarnx("%s", binlog_dir
);
503 binlog_index_min
= binlog_scan_dir();
505 if (binlog_index_min
) {
506 for (idx
= binlog_index_min
; idx
<= binlog_index
; idx
++) {
507 r
= snprintf(path
, PATH_MAX
, "%s/binlog.%d", binlog_dir
, idx
);
508 if (r
> PATH_MAX
) return twarnx("path too long: %s", binlog_dir
);
510 fd
= open(path
, O_RDONLY
);
515 b
= binlog_iref(add_binlog(make_binlog(path
)));
516 binlog_read_one(fd
, binlog_jobs
, path
);
532 r
= snprintf(path
, PATH_MAX
, "%s/lock", binlog_dir
);
533 if (r
> PATH_MAX
) return twarnx("path too long: %s", binlog_dir
), 0;
535 lock_fd
= open(path
, O_WRONLY
|O_CREAT
, 0600);
536 if (lock_fd
== -1) return twarn("open"), 0;
538 lock
.l_type
= F_WRLCK
;
539 lock
.l_whence
= SEEK_SET
;
542 r
= fcntl(lock_fd
, F_SETLK
, &lock
);
543 if (r
) return twarn("fcntl"), 0;
553 if (!binlog_dir
) return;
555 log
= make_next_binlog();
556 if (!log
) return twarnx("error making first binlog");
557 binlog_fd
= binlog_open(log
);
558 if (binlog_fd
>= 0) add_binlog(log
);
562 binlog_oldest_index()
564 if (!first_binlog
) return "0";
566 return strrchr(first_binlog
->path
, '.') + 1;
570 binlog_current_index()
572 if (!last_binlog
) return "0";
574 return strrchr(last_binlog
->path
, '.') + 1;