Fix formatting.
[beanstalkd.git] / binlog.c
blob2eed77d9ef89ca32e0a6e1c8b64ef7502816d2de
1 /* binlog.c - binary log implementation */
3 /* Copyright (C) 2008 Graham Barr
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation, either version 3 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
19 #include <stdlib.h>
20 #include <stdio.h>
21 #include <fcntl.h>
22 #include <unistd.h>
23 #include <string.h>
24 #include <errno.h>
25 #include <dirent.h>
26 #include <sys/resource.h>
27 #include <sys/param.h>
28 #include <sys/uio.h>
29 #include <sys/stat.h>
30 #include <stdarg.h>
32 #include "tube.h"
33 #include "job.h"
34 #include "binlog.h"
35 #include "util.h"
36 #include "version.h"
38 /* max size we will create a log file */
39 size_t binlog_size_limit = 10 << 20;
41 char *binlog_dir = NULL;
42 static int binlog_index = 0;
43 static int binlog_fd = -1;
44 static int binlog_version = 1;
45 static size_t bytes_written;
47 struct binlog binlog_head;
49 static int
50 binlog_scan_dir()
52 DIR *dirp;
53 struct dirent *dp;
54 long min = 0;
55 long max = 0;
56 long val;
57 char *endptr;
58 size_t name_len;
60 dirp = opendir(binlog_dir);
61 if (!dirp) return 0;
63 while ((dp = readdir(dirp)) != NULL) {
64 name_len = strlen(dp->d_name);
65 if (name_len > 7 && !strncmp("binlog.", dp->d_name, 7)) {
66 val = strtol(dp->d_name + 7, &endptr, 10);
67 if (endptr && *endptr == 0) {
68 if (max == 0 || val > max) max = val;
69 if (min == 0 || val < min) min = val;
74 closedir(dirp);
75 binlog_index = (int) max;
76 return (int) min;
79 static void
80 binlog_remove(binlog b)
82 if (!b) return;
83 if (b->next == b) return; /* not in a doubly-linked list */
85 b->next->prev = b->prev;
86 b->prev->next = b->next;
88 unlink(b->path);
89 free(b);
92 static binlog
93 binlog_iref(binlog b)
95 if (b) b->refs++;
96 return b;
99 static void
100 binlog_dref(binlog b)
102 binlog bn;
104 if (!b) return;
105 if (b->refs < 1) return twarnx("refs is zero for binlog: %s", b->path);
107 --b->refs;
108 if (b->refs < 1) {
109 for (b = binlog_head.next; b != binlog_head.prev && b->refs == 0; b = bn) {
110 bn = b->next;
111 binlog_remove(b);
116 static void
117 binlog_replay(int fd, job binlog_jobs)
119 struct job js;
120 tube t;
121 job j;
122 char tubename[MAX_TUBE_NAME_LEN];
123 size_t namelen;
124 int version;
126 if (read(fd, &version, sizeof(version)) < sizeof(version)) {
127 return twarnx("oops");
129 if (version != binlog_version) {
130 return twarnx("binlog version mismatch %d %d", version, binlog_version);
133 while (read(fd, &namelen, sizeof(size_t)) == sizeof(size_t)) {
134 if (namelen > 0 && read(fd, tubename, namelen) != namelen) {
135 return twarnx("oops %x %d", namelen, (int)lseek(fd, SEEK_CUR, 0));
138 tubename[namelen] = '\0';
139 if (read(fd, &js, sizeof(struct job)) != sizeof(struct job)) {
140 return twarnx("oops");
143 j = job_find(js.id);
144 switch (js.state) {
145 case JOB_STATE_INVALID:
146 if (j) {
147 job_remove(j);
148 binlog_dref(j->binlog);
149 job_free(j);
150 j = NULL;
152 break;
153 case JOB_STATE_READY:
154 case JOB_STATE_DELAYED:
155 if (!j) {
156 t = tube_find_or_make(tubename);
157 j = make_job_with_id(js.pri, js.delay, js.ttr, js.body_size,
158 t, js.id);
159 j->next = j->prev = j;
160 j->binlog = binlog_iref(binlog_head.prev);
161 j->creation = js.creation;
162 job_insert(binlog_jobs, j);
163 if (read(fd, j->body, js.body_size) < js.body_size) {
164 twarnx("oops");
165 return;
168 break;
170 if (j) {
171 j->state = js.state;
172 j->deadline = js.deadline;
173 j->pri = js.pri;
174 j->delay = js.delay;
175 j->ttr = js.ttr;
176 j->timeout_ct = js.timeout_ct;
177 j->release_ct = js.release_ct;
178 j->bury_ct = js.bury_ct;
179 j->kick_ct = js.kick_ct;
184 void
185 binlog_close()
187 if (binlog_fd < 0) return;
188 close(binlog_fd);
189 binlog_fd = -1;
192 static binlog
193 add_binlog(char *path)
195 binlog b;
197 b = (binlog)malloc(sizeof(struct binlog) + strlen(path) + 1);
198 if (!b) return twarnx("OOM"), NULL;
199 strcpy(b->path, path);
200 b->refs = 0;
201 b->prev = binlog_head.prev;
202 b->next = &binlog_head;
203 b->prev->next = b;
204 binlog_head.prev = b;
205 return b;
208 static int
209 binlog_open()
211 char path[PATH_MAX];
212 binlog b;
213 int fd;
215 sprintf(path, "%s/binlog.%d", binlog_dir, ++binlog_index);
217 if (!add_binlog(path)) return -1;
218 fd = open(path, O_WRONLY | O_CREAT, 0400);
220 if (fd < 0) {
221 twarnx("Cannot open binlog %s", path);
222 return -1;
226 bytes_written = write(fd, &binlog_version, sizeof(int));
228 if (bytes_written < sizeof(int)) {
229 twarnx("Cannot write to binlog");
230 close(fd);
231 return -1;
234 return fd;
237 static void
238 binlog_open_next()
240 if (binlog_fd < 0) return;
241 close(binlog_fd);
242 binlog_fd = binlog_open();
245 void
246 binlog_write_job(job j)
248 size_t tube_namelen, to_write;
249 struct iovec vec[4], *vptr;
250 int vcnt = 3;
252 if (binlog_fd < 0) return;
253 tube_namelen = 0;
255 vec[0].iov_base = (char *) &tube_namelen;
256 vec[0].iov_len = sizeof(size_t);
257 to_write = sizeof(size_t);
259 vec[1].iov_base = j->tube->name;
260 vec[1].iov_len = 0;
262 /* we could save some bytes in the binlog file by only saving some parts of
263 * the job struct */
264 vec[2].iov_base = (char *) j;
265 vec[2].iov_len = sizeof(struct job);
266 to_write += sizeof(struct job);
268 if (j->state == JOB_STATE_READY || j->state == JOB_STATE_DELAYED) {
269 if (!j->binlog) {
270 tube_namelen = strlen(j->tube->name);
271 vec[1].iov_len = tube_namelen;
272 to_write += tube_namelen;
273 j->binlog = binlog_iref(binlog_head.prev);
274 vcnt = 4;
275 vec[3].iov_base = j->body;
276 vec[3].iov_len = j->body_size;
277 to_write += j->body_size;
279 } else if (j->state == JOB_STATE_INVALID) {
280 if (j->binlog) binlog_dref(j->binlog);
281 j->binlog = NULL;
284 if ((bytes_written + to_write) > binlog_size_limit) binlog_open_next();
285 if (binlog_fd < 0) return;
287 while (to_write > 0) {
288 size_t written = writev(binlog_fd, vec, vcnt);
290 if (written < 0) {
291 twarnx("Cannot write to binlog");
292 binlog_close();
293 return;
296 bytes_written += written;
297 to_write -= written;
298 if (to_write > 0 && written > 0) {
299 for (vptr = vec; written >= vptr->iov_len; vptr++) {
300 written -= vptr->iov_len;
301 vptr->iov_len = 0;
303 vptr->iov_base = (char *) vptr->iov_base + written;
304 vptr->iov_len -= written;
310 void
311 binlog_read(job binlog_jobs)
313 int binlog_index_min;
314 struct stat sbuf;
315 int fd, idx;
316 char path[PATH_MAX];
318 binlog_head.prev = binlog_head.next = &binlog_head;
320 if (!binlog_dir) return;
322 if (stat(binlog_dir, &sbuf) < 0) {
323 if (mkdir(binlog_dir, 0700) < 0) return twarnx("%s", binlog_dir);
324 } else if (!(sbuf.st_mode & S_IFDIR)) {
325 twarnx("%s", binlog_dir);
326 return;
329 binlog_index_min = binlog_scan_dir();
331 if (binlog_index_min) {
332 for (idx = binlog_index_min; idx <= binlog_index; idx++) {
333 sprintf(path, "%s/binlog.%d", binlog_dir, idx);
334 fd = open(path, O_RDONLY);
335 add_binlog(path);
337 if (fd < 0) {
338 twarnx("oops");
339 } else {
340 binlog_replay(fd, binlog_jobs);
341 close(fd);
348 void
349 binlog_init()
351 binlog_fd = binlog_open();