1 /* binlog.c - binary log implementation */
3 /* Copyright (C) 2008 Graham Barr
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation, either version 3 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
26 #include <sys/resource.h>
27 #include <sys/param.h>
38 /* maximum size, in bytes, a binlog file may reach: binlog_write_job calls
 * binlog_open_next() when bytes_written plus the pending record would exceed
 * it (default 10 << 20, i.e. 10 MiB) */
39 size_t binlog_size_limit
= 10 << 20;
/* directory holding the "<dir>/binlog.<index>" files and the "<dir>/lock"
 * file; while it is NULL, binlog_read and the code that opens a new binlog
 * return early without touching the filesystem */
41 char *binlog_dir
= NULL
;
/* numeric suffix of the newest binlog file: set to the largest suffix found
 * while scanning binlog_dir, and pre-incremented to name each new file */
42 static int binlog_index
= 0;
/* descriptor assigned from binlog_open(); the write, close and open-next
 * paths return immediately while it is negative */
43 static int binlog_fd
= -1;
/* format version: written as the first int of every newly opened binlog and
 * compared with the version read back from existing files */
44 static int binlog_version
= 1;
/* byte count for the current binlog: assigned the result of writing the
 * version int, then increased by each writev() result; has no explicit
 * initializer, so it starts at zero (static storage) */
45 static size_t bytes_written
;
/* head and tail of the list of binlog entries linked through ->next:
 * add_binlog appends after last_binlog, and entries are unlinked from
 * first_binlog (last_binlog is reset to NULL when the list empties) */
48 static binlog first_binlog
= NULL
, last_binlog
= NULL
;
61 dirp
= opendir(binlog_dir
);
64 while ((dp
= readdir(dirp
)) != NULL
) {
65 name_len
= strlen(dp
->d_name
);
66 if (name_len
> 7 && !strncmp("binlog.", dp
->d_name
, 7)) {
67 val
= strtol(dp
->d_name
+ 7, &endptr
, 10);
68 if (endptr
&& *endptr
== 0) {
69 if (max
== 0 || val
> max
) max
= val
;
70 if (min
== 0 || val
< min
) min
= val
;
76 binlog_index
= (int) max
;
83 binlog b
= first_binlog
;
87 first_binlog
= b
->next
;
88 if (!first_binlog
) last_binlog
= NULL
;
102 binlog_dref(binlog b
)
105 if (b
->refs
< 1) return twarnx("refs is zero for binlog: %s", b
->path
);
109 while (first_binlog
&& first_binlog
->refs
== 0) binlog_remove_first();
114 binlog_warn(const char *msg
, int fd
, const char* path
)
116 warnx("WARNING, %s at %s:%u.\n%s", msg
, path
, lseek(fd
, 0, SEEK_CUR
),
117 " Continuing with next file. You may be missing data.");
121 binlog_read_one(int fd
, job binlog_jobs
, const char *path
)
126 char tubename
[MAX_TUBE_NAME_LEN
];
131 r
= read(fd
, &version
, sizeof(version
));
132 if (r
== -1) return twarn("read()");
133 if (r
< sizeof(version
)) {
134 return binlog_warn("version record is too short", fd
, path
);
137 if (version
!= binlog_version
) {
138 return warnx("%s: binlog version mismatch %d %d", path
, version
,
142 while (read(fd
, &namelen
, sizeof(size_t)) == sizeof(size_t)) {
144 r
= read(fd
, tubename
, namelen
);
145 if (r
== -1) return twarn("read()");
147 lseek(fd
, SEEK_CUR
, 0);
148 return binlog_warn("tube name is too short", fd
, path
);
152 tubename
[namelen
] = '\0';
153 r
= read(fd
, &js
, sizeof(struct job
));
154 if (r
== -1) return twarn("read()");
155 if (r
< sizeof(struct job
)) {
156 return binlog_warn("job record is too short", fd
, path
);
161 case JOB_STATE_INVALID
:
164 binlog_dref(j
->binlog
);
169 case JOB_STATE_READY
:
170 case JOB_STATE_DELAYED
:
172 t
= tube_find_or_make(tubename
);
173 j
= make_job_with_id(js
.pri
, js
.delay
, js
.ttr
, js
.body_size
,
175 j
->next
= j
->prev
= j
;
176 j
->creation
= js
.creation
;
177 job_insert(binlog_jobs
, j
);
178 r
= read(fd
, j
->body
, js
.body_size
);
179 if (r
== -1) return twarn("read()");
180 if (r
< js
.body_size
) {
181 warnx("dropping incomplete job %llu", j
->id
);
183 binlog_dref(j
->binlog
);
185 return binlog_warn("job body is too short", fd
, path
);
192 j
->deadline
= js
.deadline
;
196 j
->timeout_ct
= js
.timeout_ct
;
197 j
->release_ct
= js
.release_ct
;
198 j
->bury_ct
= js
.bury_ct
;
199 j
->kick_ct
= js
.kick_ct
;
201 /* this is a complete record, so we can move the binlog ref */
202 if (namelen
&& js
.body_size
) {
203 binlog_dref(j
->binlog
);
204 j
->binlog
= binlog_iref(last_binlog
);
213 if (binlog_fd
< 0) return;
215 binlog_dref(last_binlog
);
220 add_binlog(char *path
)
224 b
= (binlog
)malloc(sizeof(struct binlog
) + strlen(path
) + 1);
225 if (!b
) return twarnx("OOM"), NULL
;
226 strcpy(b
->path
, path
);
229 if (last_binlog
) last_binlog
->next
= b
;
231 if (!first_binlog
) first_binlog
= b
;
243 if (!binlog_dir
) return -1;
244 r
= snprintf(path
, PATH_MAX
, "%s/binlog.%d", binlog_dir
, ++binlog_index
);
245 if (r
> PATH_MAX
) return twarnx("path too long: %s", binlog_dir
), -1;
247 if (!binlog_iref(add_binlog(path
))) return -1;
248 fd
= open(path
, O_WRONLY
| O_CREAT
, 0400);
251 twarn("Cannot open binlog %s", path
);
256 bytes_written
= write(fd
, &binlog_version
, sizeof(int));
258 if (bytes_written
< sizeof(int)) {
259 twarn("Cannot write to binlog");
261 binlog_dref(last_binlog
);
271 if (binlog_fd
< 0) return;
273 binlog_dref(last_binlog
);
274 binlog_fd
= binlog_open();
278 binlog_write_job(job j
)
280 size_t tube_namelen
, to_write
;
281 struct iovec vec
[4], *vptr
;
284 if (binlog_fd
< 0) return;
287 vec
[0].iov_base
= (char *) &tube_namelen
;
288 vec
[0].iov_len
= sizeof(size_t);
289 to_write
= sizeof(size_t);
291 vec
[1].iov_base
= j
->tube
->name
;
294 /* we could save some bytes in the binlog file by only saving some parts of
296 vec
[2].iov_base
= (char *) j
;
297 vec
[2].iov_len
= sizeof(struct job
);
298 to_write
+= sizeof(struct job
);
300 if (j
->state
== JOB_STATE_READY
|| j
->state
== JOB_STATE_DELAYED
) {
302 tube_namelen
= strlen(j
->tube
->name
);
303 vec
[1].iov_len
= tube_namelen
;
304 to_write
+= tube_namelen
;
306 vec
[3].iov_base
= j
->body
;
307 vec
[3].iov_len
= j
->body_size
;
308 to_write
+= j
->body_size
;
310 } else if (j
->state
== JOB_STATE_INVALID
) {
311 if (j
->binlog
) binlog_dref(j
->binlog
);
314 return twarnx("unserializable job state: %d", j
->state
);
317 if ((bytes_written
+ to_write
) > binlog_size_limit
) binlog_open_next();
318 if (binlog_fd
< 0) return;
320 if (j
->state
&& !j
->binlog
) j
->binlog
= binlog_iref(last_binlog
);
322 while (to_write
> 0) {
323 size_t written
= writev(binlog_fd
, vec
, vcnt
);
326 twarn("Cannot write to binlog");
331 bytes_written
+= written
;
333 if (to_write
> 0 && written
> 0) {
334 for (vptr
= vec
; written
>= vptr
->iov_len
; vptr
++) {
335 written
-= vptr
->iov_len
;
338 vptr
->iov_base
= (char *) vptr
->iov_base
+ written
;
339 vptr
->iov_len
-= written
;
346 binlog_read(job binlog_jobs
)
348 int binlog_index_min
;
354 if (!binlog_dir
) return;
356 if (stat(binlog_dir
, &sbuf
) < 0) {
357 if (mkdir(binlog_dir
, 0700) < 0) return twarn("%s", binlog_dir
);
358 } else if (!(sbuf
.st_mode
& S_IFDIR
)) {
359 twarnx("%s", binlog_dir
);
363 binlog_index_min
= binlog_scan_dir();
365 if (binlog_index_min
) {
366 for (idx
= binlog_index_min
; idx
<= binlog_index
; idx
++) {
367 r
= snprintf(path
, PATH_MAX
, "%s/binlog.%d", binlog_dir
, idx
);
368 if (r
> PATH_MAX
) return twarnx("path too long: %s", binlog_dir
);
370 fd
= open(path
, O_RDONLY
);
375 b
= binlog_iref(add_binlog(path
));
376 binlog_read_one(fd
, binlog_jobs
, path
);
392 r
= snprintf(path
, PATH_MAX
, "%s/lock", binlog_dir
);
393 if (r
> PATH_MAX
) return twarnx("path too long: %s", binlog_dir
), 0;
395 lock_fd
= open(path
, O_WRONLY
|O_CREAT
, 0600);
396 if (lock_fd
== -1) return twarn("open"), 0;
398 lock
.l_type
= F_WRLCK
;
399 lock
.l_whence
= SEEK_SET
;
402 r
= fcntl(lock_fd
, F_SETLK
, &lock
);
403 if (r
) return twarn("fcntl"), 0;
411 binlog_fd
= binlog_open();
415 binlog_oldest_index()
417 if (!first_binlog
) return "0";
419 return strrchr(first_binlog
->path
, '.') + 1;
423 binlog_current_index()
425 if (!last_binlog
) return "0";
427 return strrchr(last_binlog
->path
, '.') + 1;