Refer to the correct binlog.
[beanstalkd.git] / binlog.c
blobd8351726f4b7eefec59ba7b7c63f40a008d7bcc8
1 /* binlog.c - binary log implementation */
3 /* Copyright (C) 2008 Graham Barr
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation, either version 3 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
19 #include <stdlib.h>
20 #include <stdio.h>
21 #include <fcntl.h>
22 #include <unistd.h>
23 #include <string.h>
24 #include <errno.h>
25 #include <dirent.h>
26 #include <sys/resource.h>
27 #include <sys/param.h>
28 #include <sys/uio.h>
29 #include <sys/stat.h>
30 #include <stdarg.h>
32 #include "tube.h"
33 #include "job.h"
34 #include "binlog.h"
35 #include "util.h"
36 #include "version.h"
38 /* max size we will create a log file */
39 size_t binlog_size_limit = 10 << 20;
41 char *binlog_dir = NULL;
42 static int binlog_index = 0;
43 static int binlog_fd = -1;
44 static int binlog_version = 1;
45 static size_t bytes_written;
47 static binlog first_binlog = NULL, last_binlog = NULL;
49 static int
50 binlog_scan_dir()
52 DIR *dirp;
53 struct dirent *dp;
54 long min = 0;
55 long max = 0;
56 long val;
57 char *endptr;
58 size_t name_len;
60 dirp = opendir(binlog_dir);
61 if (!dirp) return 0;
63 while ((dp = readdir(dirp)) != NULL) {
64 name_len = strlen(dp->d_name);
65 if (name_len > 7 && !strncmp("binlog.", dp->d_name, 7)) {
66 val = strtol(dp->d_name + 7, &endptr, 10);
67 if (endptr && *endptr == 0) {
68 if (max == 0 || val > max) max = val;
69 if (min == 0 || val < min) min = val;
74 closedir(dirp);
75 binlog_index = (int) max;
76 return (int) min;
79 static void
80 binlog_remove_first()
82 binlog b = first_binlog;
84 if (!b) return;
86 first_binlog = b->next;
87 if (!first_binlog) last_binlog = NULL;
89 unlink(b->path);
90 free(b);
93 static binlog
94 binlog_iref(binlog b)
96 if (b) b->refs++;
97 return b;
100 static void
101 binlog_dref(binlog b)
103 if (!b) return;
104 if (b->refs < 1) return twarnx("refs is zero for binlog: %s", b->path);
106 --b->refs;
107 if (b->refs < 1) {
108 while (first_binlog && first_binlog->refs == 0) binlog_remove_first();
112 static void
113 binlog_replay(int fd, job binlog_jobs)
115 struct job js;
116 tube t;
117 job j;
118 char tubename[MAX_TUBE_NAME_LEN];
119 size_t namelen;
120 int version;
122 if (read(fd, &version, sizeof(version)) < sizeof(version)) {
123 return twarn("read()");
125 if (version != binlog_version) {
126 return twarnx("binlog version mismatch %d %d", version, binlog_version);
129 while (read(fd, &namelen, sizeof(size_t)) == sizeof(size_t)) {
130 if (namelen > 0 && read(fd, tubename, namelen) != namelen) {
131 return twarnx("oops %x %d", namelen, (int)lseek(fd, SEEK_CUR, 0));
134 tubename[namelen] = '\0';
135 if (read(fd, &js, sizeof(struct job)) != sizeof(struct job)) {
136 return twarn("read()");
139 j = job_find(js.id);
140 switch (js.state) {
141 case JOB_STATE_INVALID:
142 if (j) {
143 job_remove(j);
144 binlog_dref(j->binlog);
145 job_free(j);
146 j = NULL;
148 break;
149 case JOB_STATE_READY:
150 case JOB_STATE_DELAYED:
151 if (!j) {
152 t = tube_find_or_make(tubename);
153 j = make_job_with_id(js.pri, js.delay, js.ttr, js.body_size,
154 t, js.id);
155 j->next = j->prev = j;
156 j->creation = js.creation;
157 job_insert(binlog_jobs, j);
158 if (read(fd, j->body, js.body_size) < js.body_size) {
159 twarn("read()");
160 return;
163 break;
165 if (j) {
166 j->state = js.state;
167 j->deadline = js.deadline;
168 j->pri = js.pri;
169 j->delay = js.delay;
170 j->ttr = js.ttr;
171 j->timeout_ct = js.timeout_ct;
172 j->release_ct = js.release_ct;
173 j->bury_ct = js.bury_ct;
174 j->kick_ct = js.kick_ct;
176 /* this is a complete record, so we can move the binlog ref */
177 if (namelen && js.body_size) {
178 binlog_dref(j->binlog);
179 j->binlog = binlog_iref(last_binlog);
185 void
186 binlog_close()
188 if (binlog_fd < 0) return;
189 close(binlog_fd);
190 binlog_dref(last_binlog);
191 binlog_fd = -1;
194 static binlog
195 add_binlog(char *path)
197 binlog b;
199 b = (binlog)malloc(sizeof(struct binlog) + strlen(path) + 1);
200 if (!b) return twarnx("OOM"), NULL;
201 strcpy(b->path, path);
202 b->refs = 0;
203 b->next = NULL;
204 if (last_binlog) last_binlog->next = b;
205 last_binlog = b;
206 if (!first_binlog) first_binlog = b;
208 return b;
211 static int
212 binlog_open()
214 char path[PATH_MAX];
215 binlog b;
216 int fd, r;
218 if (!binlog_dir) return -1;
219 r = snprintf(path, PATH_MAX, "%s/binlog.%d", binlog_dir, ++binlog_index);
220 if (r > PATH_MAX) return twarnx("path too long: %s", binlog_dir), -1;
222 if (!binlog_iref(add_binlog(path))) return -1;
223 fd = open(path, O_WRONLY | O_CREAT, 0400);
225 if (fd < 0) {
226 twarn("Cannot open binlog %s", path);
227 return -1;
231 bytes_written = write(fd, &binlog_version, sizeof(int));
233 if (bytes_written < sizeof(int)) {
234 twarn("Cannot write to binlog");
235 close(fd);
236 binlog_dref(last_binlog);
237 return -1;
240 return fd;
243 static void
244 binlog_open_next()
246 if (binlog_fd < 0) return;
247 close(binlog_fd);
248 binlog_dref(last_binlog);
249 binlog_fd = binlog_open();
252 void
253 binlog_write_job(job j)
255 size_t tube_namelen, to_write;
256 struct iovec vec[4], *vptr;
257 int vcnt = 3;
259 if (binlog_fd < 0) return;
260 tube_namelen = 0;
262 vec[0].iov_base = (char *) &tube_namelen;
263 vec[0].iov_len = sizeof(size_t);
264 to_write = sizeof(size_t);
266 vec[1].iov_base = j->tube->name;
267 vec[1].iov_len = 0;
269 /* we could save some bytes in the binlog file by only saving some parts of
270 * the job struct */
271 vec[2].iov_base = (char *) j;
272 vec[2].iov_len = sizeof(struct job);
273 to_write += sizeof(struct job);
275 if (j->state == JOB_STATE_READY || j->state == JOB_STATE_DELAYED) {
276 if (!j->binlog) {
277 tube_namelen = strlen(j->tube->name);
278 vec[1].iov_len = tube_namelen;
279 to_write += tube_namelen;
280 vcnt = 4;
281 vec[3].iov_base = j->body;
282 vec[3].iov_len = j->body_size;
283 to_write += j->body_size;
285 } else if (j->state == JOB_STATE_INVALID) {
286 if (j->binlog) binlog_dref(j->binlog);
287 j->binlog = NULL;
288 } else {
289 return twarnx("unserializable job state: %d", j->state);
292 if ((bytes_written + to_write) > binlog_size_limit) binlog_open_next();
293 if (binlog_fd < 0) return;
295 if (j->state && !j->binlog) j->binlog = binlog_iref(last_binlog);
297 while (to_write > 0) {
298 size_t written = writev(binlog_fd, vec, vcnt);
300 if (written < 0) {
301 twarn("Cannot write to binlog");
302 binlog_close();
303 return;
306 bytes_written += written;
307 to_write -= written;
308 if (to_write > 0 && written > 0) {
309 for (vptr = vec; written >= vptr->iov_len; vptr++) {
310 written -= vptr->iov_len;
311 vptr->iov_len = 0;
313 vptr->iov_base = (char *) vptr->iov_base + written;
314 vptr->iov_len -= written;
320 void
321 binlog_read(job binlog_jobs)
323 int binlog_index_min;
324 struct stat sbuf;
325 int fd, idx, r;
326 char path[PATH_MAX];
327 binlog b;
329 if (!binlog_dir) return;
331 if (stat(binlog_dir, &sbuf) < 0) {
332 if (mkdir(binlog_dir, 0700) < 0) return twarn("%s", binlog_dir);
333 } else if (!(sbuf.st_mode & S_IFDIR)) {
334 twarnx("%s", binlog_dir);
335 return;
338 binlog_index_min = binlog_scan_dir();
340 if (binlog_index_min) {
341 for (idx = binlog_index_min; idx <= binlog_index; idx++) {
342 r = snprintf(path, PATH_MAX, "%s/binlog.%d", binlog_dir, idx);
343 if (r > PATH_MAX) return twarnx("path too long: %s", binlog_dir);
345 fd = open(path, O_RDONLY);
347 if (fd < 0) {
348 twarn("%s", path);
349 } else {
350 b = binlog_iref(add_binlog(path));
351 binlog_replay(fd, binlog_jobs);
352 close(fd);
353 binlog_dref(b);
360 void
361 binlog_init()
363 binlog_fd = binlog_open();