Rename this for consistency.
[beanstalkd.git] / binlog.c
blob13b095986dda5dcaa587e4de61b03d8520e33ac1
1 /* binlog.c - binary log implementation */
3 /* Copyright (C) 2008 Graham Barr
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation, either version 3 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
19 #include <stdlib.h>
20 #include <stdio.h>
21 #include <fcntl.h>
22 #include <unistd.h>
23 #include <string.h>
24 #include <errno.h>
25 #include <dirent.h>
26 #include <sys/resource.h>
27 #include <sys/param.h>
28 #include <sys/uio.h>
29 #include <sys/stat.h>
30 #include <stdarg.h>
32 #include "tube.h"
33 #include "job.h"
34 #include "binlog.h"
35 #include "util.h"
37 /* max size we will create a log file */
38 size_t binlog_size_limit = 10 << 20;
40 char *binlog_dir = NULL;
41 static int binlog_index = 0;
42 static int binlog_fd = -1;
43 static int binlog_version = 1;
44 static size_t bytes_written;
46 static binlog first_binlog = NULL, last_binlog = NULL;
48 static int
49 binlog_scan_dir()
51 DIR *dirp;
52 struct dirent *dp;
53 long min = 0;
54 long max = 0;
55 long val;
56 char *endptr;
57 size_t name_len;
59 dirp = opendir(binlog_dir);
60 if (!dirp) return 0;
62 while ((dp = readdir(dirp)) != NULL) {
63 name_len = strlen(dp->d_name);
64 if (name_len > 7 && !strncmp("binlog.", dp->d_name, 7)) {
65 val = strtol(dp->d_name + 7, &endptr, 10);
66 if (endptr && *endptr == 0) {
67 if (max == 0 || val > max) max = val;
68 if (min == 0 || val < min) min = val;
73 closedir(dirp);
74 binlog_index = (int) max;
75 return (int) min;
78 static void
79 binlog_remove_first()
81 binlog b = first_binlog;
83 if (!b) return;
85 first_binlog = b->next;
86 if (!first_binlog) last_binlog = NULL;
88 unlink(b->path);
89 free(b);
92 static binlog
93 binlog_iref(binlog b)
95 if (b) b->refs++;
96 return b;
99 static void
100 binlog_dref(binlog b)
102 if (!b) return;
103 if (b->refs < 1) return twarnx("refs is zero for binlog: %s", b->path);
105 --b->refs;
106 if (b->refs < 1) {
107 while (first_binlog && first_binlog->refs == 0) binlog_remove_first();
111 static void
112 binlog_replay(int fd, job binlog_jobs)
114 struct job js;
115 tube t;
116 job j;
117 char tubename[MAX_TUBE_NAME_LEN];
118 size_t namelen;
119 int version;
121 if (read(fd, &version, sizeof(version)) < sizeof(version)) {
122 return twarn("read()");
124 if (version != binlog_version) {
125 return twarnx("binlog version mismatch %d %d", version, binlog_version);
128 while (read(fd, &namelen, sizeof(size_t)) == sizeof(size_t)) {
129 if (namelen > 0 && read(fd, tubename, namelen) != namelen) {
130 return twarnx("oops %x %d", namelen, (int)lseek(fd, SEEK_CUR, 0));
133 tubename[namelen] = '\0';
134 if (read(fd, &js, sizeof(struct job)) != sizeof(struct job)) {
135 return twarn("read()");
138 j = job_find(js.id);
139 switch (js.state) {
140 case JOB_STATE_INVALID:
141 if (j) {
142 job_remove(j);
143 binlog_dref(j->binlog);
144 job_free(j);
145 j = NULL;
147 break;
148 case JOB_STATE_READY:
149 case JOB_STATE_DELAYED:
150 if (!j) {
151 t = tube_find_or_make(tubename);
152 j = make_job_with_id(js.pri, js.delay, js.ttr, js.body_size,
153 t, js.id);
154 j->next = j->prev = j;
155 j->creation = js.creation;
156 job_insert(binlog_jobs, j);
157 if (read(fd, j->body, js.body_size) < js.body_size) {
158 twarn("read()");
159 return;
162 break;
164 if (j) {
165 j->state = js.state;
166 j->deadline = js.deadline;
167 j->pri = js.pri;
168 j->delay = js.delay;
169 j->ttr = js.ttr;
170 j->timeout_ct = js.timeout_ct;
171 j->release_ct = js.release_ct;
172 j->bury_ct = js.bury_ct;
173 j->kick_ct = js.kick_ct;
175 /* this is a complete record, so we can move the binlog ref */
176 if (namelen && js.body_size) {
177 binlog_dref(j->binlog);
178 j->binlog = binlog_iref(last_binlog);
184 void
185 binlog_close()
187 if (binlog_fd < 0) return;
188 close(binlog_fd);
189 binlog_dref(last_binlog);
190 binlog_fd = -1;
193 static binlog
194 add_binlog(char *path)
196 binlog b;
198 b = (binlog)malloc(sizeof(struct binlog) + strlen(path) + 1);
199 if (!b) return twarnx("OOM"), NULL;
200 strcpy(b->path, path);
201 b->refs = 0;
202 b->next = NULL;
203 if (last_binlog) last_binlog->next = b;
204 last_binlog = b;
205 if (!first_binlog) first_binlog = b;
207 return b;
210 static int
211 binlog_open()
213 char path[PATH_MAX];
214 binlog b;
215 int fd, r;
217 if (!binlog_dir) return -1;
218 r = snprintf(path, PATH_MAX, "%s/binlog.%d", binlog_dir, ++binlog_index);
219 if (r > PATH_MAX) return twarnx("path too long: %s", binlog_dir), -1;
221 if (!binlog_iref(add_binlog(path))) return -1;
222 fd = open(path, O_WRONLY | O_CREAT, 0400);
224 if (fd < 0) {
225 twarn("Cannot open binlog %s", path);
226 return -1;
230 bytes_written = write(fd, &binlog_version, sizeof(int));
232 if (bytes_written < sizeof(int)) {
233 twarn("Cannot write to binlog");
234 close(fd);
235 binlog_dref(last_binlog);
236 return -1;
239 return fd;
242 static void
243 binlog_open_next()
245 if (binlog_fd < 0) return;
246 close(binlog_fd);
247 binlog_dref(last_binlog);
248 binlog_fd = binlog_open();
251 void
252 binlog_write_job(job j)
254 size_t tube_namelen, to_write;
255 struct iovec vec[4], *vptr;
256 int vcnt = 3;
258 if (binlog_fd < 0) return;
259 tube_namelen = 0;
261 vec[0].iov_base = (char *) &tube_namelen;
262 vec[0].iov_len = sizeof(size_t);
263 to_write = sizeof(size_t);
265 vec[1].iov_base = j->tube->name;
266 vec[1].iov_len = 0;
268 /* we could save some bytes in the binlog file by only saving some parts of
269 * the job struct */
270 vec[2].iov_base = (char *) j;
271 vec[2].iov_len = sizeof(struct job);
272 to_write += sizeof(struct job);
274 if (j->state == JOB_STATE_READY || j->state == JOB_STATE_DELAYED) {
275 if (!j->binlog) {
276 tube_namelen = strlen(j->tube->name);
277 vec[1].iov_len = tube_namelen;
278 to_write += tube_namelen;
279 vcnt = 4;
280 vec[3].iov_base = j->body;
281 vec[3].iov_len = j->body_size;
282 to_write += j->body_size;
284 } else if (j->state == JOB_STATE_INVALID) {
285 if (j->binlog) binlog_dref(j->binlog);
286 j->binlog = NULL;
287 } else {
288 return twarnx("unserializable job state: %d", j->state);
291 if ((bytes_written + to_write) > binlog_size_limit) binlog_open_next();
292 if (binlog_fd < 0) return;
294 if (j->state && !j->binlog) j->binlog = binlog_iref(last_binlog);
296 while (to_write > 0) {
297 size_t written = writev(binlog_fd, vec, vcnt);
299 if (written < 0) {
300 twarn("Cannot write to binlog");
301 binlog_close();
302 return;
305 bytes_written += written;
306 to_write -= written;
307 if (to_write > 0 && written > 0) {
308 for (vptr = vec; written >= vptr->iov_len; vptr++) {
309 written -= vptr->iov_len;
310 vptr->iov_len = 0;
312 vptr->iov_base = (char *) vptr->iov_base + written;
313 vptr->iov_len -= written;
319 void
320 binlog_read(job binlog_jobs)
322 int binlog_index_min;
323 struct stat sbuf;
324 int fd, idx, r;
325 char path[PATH_MAX];
326 binlog b;
328 if (!binlog_dir) return;
330 if (stat(binlog_dir, &sbuf) < 0) {
331 if (mkdir(binlog_dir, 0700) < 0) return twarn("%s", binlog_dir);
332 } else if (!(sbuf.st_mode & S_IFDIR)) {
333 twarnx("%s", binlog_dir);
334 return;
337 binlog_index_min = binlog_scan_dir();
339 if (binlog_index_min) {
340 for (idx = binlog_index_min; idx <= binlog_index; idx++) {
341 r = snprintf(path, PATH_MAX, "%s/binlog.%d", binlog_dir, idx);
342 if (r > PATH_MAX) return twarnx("path too long: %s", binlog_dir);
344 fd = open(path, O_RDONLY);
346 if (fd < 0) {
347 twarn("%s", path);
348 } else {
349 b = binlog_iref(add_binlog(path));
350 binlog_replay(fd, binlog_jobs);
351 close(fd);
352 binlog_dref(b);
359 void
360 binlog_init()
362 binlog_fd = binlog_open();