db_updater: Put parentheses back
[merlin.git] / binlog.c
blob252ee27b08a0b831912cee49ea40d323b602aa51
1 /**
2 * binary logging functions
3 */
5 #include <sys/types.h>
6 #include <sys/stat.h>
7 #include <unistd.h>
8 #include <stdlib.h>
9 #include <string.h>
10 #include <errno.h>
11 #include <fcntl.h>
12 #include <stdio.h> /* for debugging only */
13 #include "binlog.h"
15 struct binlog_entry {
16 unsigned int size;
17 void *data;
19 typedef struct binlog_entry binlog_entry;
20 #define entry_size(entry) (entry->size + sizeof(struct binlog_entry))
22 struct binlog {
23 struct binlog_entry **cache;
24 unsigned int write_index, read_index, file_entries;
25 unsigned int alloc, max_mem_usage;
26 unsigned int mem_size, max_mem_size;
27 unsigned int mem_avail;
28 off_t max_file_size, file_size, file_read_pos, file_write_pos;
29 int is_valid;
30 char *path;
31 int fd;
34 #if 0
35 static char *base_path;
37 static int binlog_set_base_path(const char *path)
39 unsigned int len;
40 struct stat st;
41 int result = 0;
43 len = strlen(path);
45 if (stat(path, &st) < 0)
46 return BINLOG_ESTAT;
48 if (!S_ISDIR(st.st_mode))
49 return BINLOG_ENOTDIR;
51 if (st.st_mode & S_IROTH || st.st_mode & S_IWOTH)
52 result = BINLOG_UNSAFE;
54 if (base_path)
55 free(base_path);
57 base_path = malloc(len + 2);
58 memcpy(base_path, path, len);
59 if (path[len] != '/') {
60 base_path[len++] = '/';
62 base_path[len++] = '\0';
64 return result;
66 #endif
68 /*** private helpers ***/
69 static int safe_write(binlog *bl, void *buf, int len)
71 int result;
72 off_t pos;
74 pos = lseek(bl->fd, 0, SEEK_CUR);
75 if (pos != bl->file_size)
76 lseek(bl->fd, 0, SEEK_END);
77 result = write(bl->fd, buf, len);
79 if (result == len) {
80 bl->file_write_pos = lseek(bl->fd, 0, SEEK_CUR);
81 return 0;
83 if (result < 0)
84 return result;
86 /* partial write. this will mess up the sync */
87 if (pos != lseek(bl->fd, pos, SEEK_SET)) {
88 binlog_invalidate(bl);
89 return BINLOG_EINVALID;
92 return BINLOG_EINCOMPLETE;
95 /*** public api ***/
96 int binlog_is_valid(binlog *bl)
98 return bl->is_valid;
101 void binlog_invalidate(binlog *bl)
103 binlog_close(bl);
104 bl->is_valid = 0;
105 unlink(bl->path);
108 const char *binlog_path(binlog *bl)
110 return bl->path;
113 binlog *binlog_create(const char *path, unsigned int msize, unsigned int fsize, int flags)
115 binlog *bl;
117 /* can't have a max filesize without a path */
118 if (fsize && !path)
119 return NULL;
121 bl = calloc(1, sizeof(binlog));
122 if (!bl)
123 return NULL;
125 if (fsize && path) {
126 bl->path = strdup(path);
127 if (!bl->path) {
128 free(bl);
129 return NULL;
133 bl->fd = -1;
134 bl->max_mem_size = msize;
135 bl->max_file_size = fsize;
136 bl->is_valid = 1;
138 if (bl->path && (flags & BINLOG_UNLINK))
139 unlink(bl->path);
141 return bl;
144 void binlog_wipe(binlog *bl, int flags)
146 unsigned int max_mem_size, max_file_size;
147 char *path;
149 if (!bl)
150 return;
152 max_mem_size = bl->max_mem_size;
153 max_file_size = bl->max_file_size;
154 path = bl->path;
156 if (!(flags & BINLOG_UNLINK)) {
157 binlog_flush(bl);
160 binlog_close(bl);
162 if (!(flags & BINLOG_UNLINK) || bl->file_read_pos == bl->file_write_pos) {
163 unlink(bl->path);
166 if (bl->cache) {
167 unsigned int i;
169 for (i = 0; i < bl->write_index; i++) {
170 struct binlog_entry *entry = bl->cache[i];
172 if (!entry)
173 continue;
175 if (entry->data)
176 free(entry->data);
177 free(entry);
179 free(bl->cache);
182 memset(bl, 0, sizeof(*bl));
183 bl->max_mem_size = max_mem_size;
184 bl->max_file_size = max_file_size;
185 bl->path = path;
186 bl->is_valid = 1;
187 bl->fd = -1;
190 void binlog_destroy(binlog *bl, int flags)
192 if (!bl)
193 return;
195 binlog_wipe(bl, flags);
197 if (bl->path) {
198 free(bl->path);
199 bl->path = NULL;
202 free(bl);
205 static int binlog_file_read(binlog *bl, void **buf, unsigned int *len)
207 int result;
210 * if we're done reading the file fully, close and
211 * unlink it so we go back to using memory-based
212 * binlog when we're added to next
214 if (bl->file_read_pos >= bl->file_size) {
215 binlog_close(bl);
216 bl->file_read_pos = bl->file_write_pos = bl->file_size = 0;
217 bl->file_entries = 0;
218 unlink(bl->path);
219 return BINLOG_EMPTY;
222 lseek(bl->fd, bl->file_read_pos, SEEK_SET);
223 result = read(bl->fd, len, sizeof(*len));
224 if(result < 0)
225 return -1;
226 *buf = malloc(*len);
227 result = read(bl->fd, *buf, *len);
228 if(result < 0)
229 return -1;
230 bl->file_read_pos = lseek(bl->fd, 0, SEEK_CUR);
231 bl->file_entries--;
233 return 0;
236 static int binlog_mem_read(binlog *bl, void **buf, unsigned int *len)
238 if (!bl->cache || bl->read_index >= bl->write_index) {
239 bl->read_index = bl->write_index = 0;
240 return BINLOG_EMPTY;
243 if (!bl->cache[bl->read_index]) {
244 /* this might cause leaks! */
245 bl->read_index = bl->write_index = 0;
246 return BINLOG_EINVALID;
249 *buf = bl->cache[bl->read_index]->data;
250 *len = bl->cache[bl->read_index]->size;
251 bl->mem_avail -= *len;
253 /* free the entry and mark it as empty */
254 free(bl->cache[bl->read_index]);
255 bl->cache[bl->read_index] = NULL;
256 bl->read_index++;
259 * reset the read and write index in case we've read
260 * all entries. This lets us re-use the entry slot
261 * later and not just steadily increase the array
262 * size
264 if (bl->read_index >= bl->write_index) {
265 bl->read_index = bl->write_index = 0;
266 bl->mem_avail = 0;
269 return 0;
272 int binlog_read(binlog *bl, void **buf, unsigned int *len)
274 if (!bl || !buf || !len)
275 return BINLOG_EADDRESS;
277 /* don't let users read from an invalidated binlog */
278 if (!binlog_is_valid(bl)) {
279 return BINLOG_EINVALID;
283 * reading from memory must come first in order to
284 * maintain sequential ordering. Otherwise we'd
285 * have to flush memory-based entries before
286 * starting to write to file
288 if (!binlog_mem_read(bl, buf, len))
289 return 0;
291 return binlog_file_read(bl, buf, len);
295 * This is easy. We just reset file_read_pos to point to the start
296 * of the old entry and increment the file_entries counter.
298 static int binlog_file_unread(binlog *bl, unsigned int len)
300 bl->file_read_pos -= len;
301 bl->file_entries++;
302 return 0;
306 * This is also fairly straightforward. We basically just add the
307 * pointer to the previous entry in the memory list
309 static int binlog_mem_unread(binlog *bl, void *buf, unsigned int len)
311 binlog_entry *entry;
313 /* we can't restore items to an invalid binlog */
314 if (!bl || !bl->cache || !binlog_is_valid(bl))
315 return BINLOG_EDROPPED;
318 * In case the entire filebased backlog gets unread(),
319 * we can stash one and only one entry on memory stack,
320 * so return BINLOG_EDROPPED in case bl->read_index is
321 * already 0 and we've pushed bl->write_index to 1.
322 * This is because we have no way of finding out which
323 * slot the entry should have had, so further undo's
324 * will not be in sequential ordering.
326 if (bl->read_index == 0 && bl->write_index == 1)
327 return BINLOG_EDROPPED;
329 entry = malloc(sizeof(*entry));
330 if (!entry)
331 return BINLOG_EDROPPED;
333 bl->mem_avail += len;
334 entry->size = len;
335 entry->data = buf;
336 if (!bl->read_index) {
338 * first entry to be pushed back to memory after
339 * filebased binlog was emptied. Sequential messages
340 * can be added later, but we can't handle further
341 * undo's
343 bl->cache[bl->read_index] = entry;
344 bl->write_index = 1;
346 else {
347 bl->cache[--bl->read_index] = entry;
349 return 0;
352 int binlog_unread(binlog *bl, void *buf, unsigned int len)
354 if (!bl || !buf || !len) {
355 return BINLOG_EADDRESS;
359 * if the binlog is empty, adding the entry normally has the
360 * same effect as fiddling around with pointer manipulation
362 if (!binlog_num_entries(bl))
363 return binlog_add(bl, buf, len);
366 * if we've started reading from the file, the entry belongs
367 * there. If we haven't, it belongs on the memory stack
369 if (bl->file_read_pos >= (off_t)len)
370 binlog_file_unread(bl, len);
372 return binlog_mem_unread(bl, buf, len);
375 unsigned int binlog_num_entries(binlog *bl)
377 unsigned int entries = 0;
379 if (!bl)
380 return 0;
382 if (bl->file_size && bl->file_read_pos < bl->file_size)
383 entries = bl->file_entries;
384 if (bl->cache && bl->read_index < bl->write_index)
385 entries += bl->write_index - bl->read_index;
387 return entries;
390 static int binlog_open(binlog *bl)
392 int flags = O_RDWR | O_APPEND | O_CREAT;
394 if (bl->fd != -1)
395 return bl->fd;
397 if (!bl->path)
398 return BINLOG_ENOPATH;
400 if (!binlog_is_valid(bl)) {
401 bl->file_read_pos = bl->file_write_pos = bl->file_size = 0;
402 flags = O_RDWR | O_CREAT | O_TRUNC;
405 bl->fd = open(bl->path, flags, 0600);
406 if (bl->fd < 0)
407 return -1;
409 return 0;
412 static int binlog_grow(binlog *bl)
414 bl->alloc = ((bl->alloc + 16) * 3) / 2;
415 bl->cache = realloc(bl->cache, sizeof(binlog_entry *) * bl->alloc);
416 if (!bl->cache)
417 return -1;
419 return 0;
422 static int binlog_mem_add(binlog *bl, void *buf, unsigned int len)
424 binlog_entry *entry;
426 if (bl->write_index >= bl->alloc && binlog_grow(bl) < 0)
427 return BINLOG_EDROPPED;
429 entry = malloc(sizeof(*entry));
430 if (!entry)
431 return BINLOG_EDROPPED;
433 entry->data = malloc(len);
434 if (!entry->data) {
435 free(entry);
436 return BINLOG_EDROPPED;
439 entry->size = len;
440 memcpy(entry->data, buf, len);
441 bl->cache[bl->write_index++] = entry;
442 bl->mem_size += entry_size(entry);
443 bl->mem_avail += len;
445 return 0;
448 static int binlog_file_add(binlog *bl, void *buf, unsigned int len)
450 int ret;
452 /* bail out early if there's no room */
453 if (bl->file_size + (off_t)len > bl->max_file_size)
454 return BINLOG_ENOSPC;
456 ret = binlog_open(bl);
457 if (ret < 0)
458 return ret;
460 ret = safe_write(bl, &len, sizeof(len));
461 if (ret)
462 return ret;
463 ret = safe_write(bl, buf, len);
464 bl->file_size += len + sizeof(len);
465 bl->file_entries++;
467 return ret;
470 int binlog_add(binlog *bl, void *buf, unsigned int len)
472 if (!bl || !buf) {
473 return BINLOG_EADDRESS;
476 /* don't try to write to an invalid binlog */
477 if (!binlog_is_valid(bl)) {
478 return BINLOG_EINVALID;
482 * if we've started adding to the file, we must continue
483 * doing so in order to preserve the parsing order when
484 * reading the events
486 if (bl->fd == -1 && bl->mem_size + len < bl->max_mem_size) {
487 return binlog_mem_add(bl, buf, len);
490 return binlog_file_add(bl, buf, len);
493 int binlog_close(binlog *bl)
495 int ret = 0;
497 if (!bl)
498 return BINLOG_EADDRESS;
500 if (bl->fd != -1) {
501 ret = close(bl->fd);
502 bl->fd = -1;
505 return ret;
508 int binlog_flush(binlog *bl)
510 if (!bl)
511 return BINLOG_EADDRESS;
513 if (bl->cache) {
514 while (bl->read_index < bl->write_index) {
515 binlog_entry *entry = bl->cache[bl->read_index++];
516 binlog_file_add(bl, entry->data, entry->size);
517 free(entry->data);
518 free(entry);
520 free(bl->cache);
521 bl->cache = NULL;
523 bl->mem_size = bl->write_index = bl->read_index = bl->alloc = 0;
525 return 0;
528 unsigned int binlog_msize(binlog *bl)
530 return bl ? bl->mem_size : 0;
533 unsigned int binlog_fsize(binlog *bl)
535 return bl ? bl->file_size : 0;
538 unsigned int binlog_size(binlog *bl)
540 return binlog_fsize(bl) + binlog_msize(bl);
543 unsigned int binlog_available(binlog *bl)
545 if (!bl)
546 return 0;
548 return (bl->file_size - bl->file_read_pos) + bl->mem_avail;