document the new binlog stats
[beanstalkd.git] / file.c
blob1ca91f90565eadcdba8a81ea3fe31b920c506af5
1 #include <stdint.h>
2 #include <stdlib.h>
3 #include <stdio.h>
4 #include <unistd.h>
5 #include <stdarg.h>
6 #include <sys/types.h>
7 #include <sys/stat.h>
8 #include <fcntl.h>
9 #include <errno.h>
10 #include <string.h>
11 #include "dat.h"
13 static void warnpos(File*, char*, ...);
14 static int readrec(File*, job, int*);
15 static int readfull(File*, void*, int, int*, char*);
18 void
19 fileincref(File *f)
21 if (!f) return;
22 f->refs++;
26 void
27 filedecref(File *f)
29 if (!f) return;
30 f->refs--;
31 if (f->refs < 1) {
32 walgc(f->w);
37 void
38 fileaddjob(File *f, job j)
40 job h;
42 h = &f->jlist;
43 if (!h->fprev) h->fprev = h;
44 j->file = f;
45 j->fprev = h->fprev;
46 j->fnext = h;
47 h->fprev->fnext = j;
48 h->fprev = j;
49 fileincref(f);
53 void
54 filermjob(File *f, job j)
56 if (!f) return;
57 if (f != j->file) return;
58 j->fnext->fprev = j->fprev;
59 j->fprev->fnext = j->fnext;
60 j->fnext = 0;
61 j->fprev = 0;
62 j->file = NULL;
63 f->w->alive -= j->walused;
64 j->walused = 0;
65 filedecref(f);
69 // Fileread reads jobs from f->path into list.
70 // It returns 0 on success, or 1 if any errors occurred.
71 int
72 fileread(File *f, job list)
74 int err = 0, v;
76 if (!readfull(f, &v, sizeof(v), &err, "version")) {
77 return err;
79 if (v != Walver) {
80 warnx("%s: wrong version: want %d, got %d", f->path, Walver, v);
81 return 1;
84 fileincref(f);
85 while (readrec(f, list, &err));
86 filedecref(f);
87 return err;
91 // Readrec reads a record from f->fd into linked list l.
92 // If an error occurs, it sets *err to 1.
93 // Readrec returns the number of records read, either 1 or 0.
94 static int
95 readrec(File *f, job l, int *err)
97 int r, sz = 0;
98 int namelen;
99 Jobrec jr;
100 job j;
101 tube t;
102 char tubename[MAX_TUBE_NAME_LEN];
104 r = read(f->fd, &namelen, sizeof(int));
105 if (r == -1) {
106 twarn("read");
107 warnpos(f, "error");
108 *err = 1;
109 return 0;
111 if (r != sizeof(int)) {
112 return 0;
114 sz += r;
115 if (namelen >= MAX_TUBE_NAME_LEN) {
116 warnpos(f, "namelen %d exceeds maximum of %d", namelen, MAX_TUBE_NAME_LEN - 1);
117 *err = 1;
118 return 0;
121 if (namelen) {
122 r = readfull(f, tubename, namelen, err, "tube name");
123 if (!r) {
124 return 0;
126 sz += r;
128 tubename[namelen] = '\0';
130 r = readfull(f, &jr, sizeof(Jobrec), err, "job struct");
131 if (!r) {
132 return 0;
134 sz += r;
136 // are we reading trailing zeroes?
137 if (!jr.id) return 0;
139 j = job_find(jr.id);
140 if (!(j || namelen)) {
141 // We read a short record without having seen a
142 // full record for this job, so the full record
143 // was in an eariler file that has been deleted.
144 // Therefore the job itself has either been
145 // deleted or migrated; either way, this record
146 // should be ignored.
147 return 1;
150 switch (jr.state) {
151 case Reserved:
152 jr.state = Ready;
153 case Ready:
154 case Buried:
155 case Delayed:
156 if (!j) {
157 t = tube_find_or_make(tubename);
158 j = make_job_with_id(jr.pri, jr.delay, jr.ttr, jr.body_size,
159 t, jr.id);
160 j->next = j->prev = j;
161 j->r.created_at = jr.created_at;
163 j->r = jr;
164 job_insert(l, j);
166 // full record; read the job body
167 if (namelen) {
168 if (jr.body_size != j->r.body_size) {
169 warnpos(f, "job %llu size changed", j->r.id);
170 warnpos(f, "was %zu, now %zu", j->r.body_size, jr.body_size);
171 goto Error;
173 r = readfull(f, j->body, j->r.body_size, err, "job body");
174 if (!r) {
175 goto Error;
177 sz += r;
179 // since this is a full record, we can move
180 // the file pointer and decref the old
181 // file, if any
182 filermjob(j->file, j);
183 fileaddjob(f, j);
185 j->walused += sz;
186 f->w->alive += sz;
188 return 1;
189 case Invalid:
190 if (j) {
191 job_remove(j);
192 filermjob(j->file, j);
193 job_free(j);
195 return 1;
198 Error:
199 *err = 1;
200 if (j) {
201 job_remove(j);
202 filermjob(j->file, j);
203 job_free(j);
205 return 0;
209 static int
210 readfull(File *f, void *c, int n, int *err, char *desc)
212 int r;
214 r = read(f->fd, c, n);
215 if (r == -1) {
216 twarn("read");
217 warnpos(f, "error reading %s", desc);
218 *err = 1;
219 return 0;
221 if (r != n) {
222 warnpos(f, "unexpected EOF reading %s", desc);
223 *err = 1;
224 return 0;
226 return r;
230 static void
231 warnpos(File *f, char *fmt, ...)
233 int off;
234 va_list ap;
236 off = lseek(f->fd, 0, SEEK_CUR);
237 fprintf(stderr, "%s:%u: ", f->path, off);
238 va_start(ap, fmt);
239 vfprintf(stderr, fmt, ap);
240 va_end(ap);
241 fputc('\n', stderr);
245 // Opens f for writing, writes a header, and initializes
246 // f->free and f->resv.
247 // Sets f->iswopen if successful.
248 void
249 filewopen(File *f)
251 int fd, r;
252 int n;
253 int ver = Walver;
255 fd = open(f->path, O_WRONLY|O_CREAT, 0400);
256 if (fd < 0) {
257 twarn("open %s", f->path);
258 return;
261 r = falloc(fd, f->w->filesz);
262 if (r) {
263 close(fd);
264 errno = r;
265 twarn("Cannot allocate space for file %s", f->path);
266 return;
269 n = write(fd, &ver, sizeof(int));
270 if (n < sizeof(int)) {
271 twarn("write %s", f->path);
272 close(fd);
273 return;
276 f->fd = fd;
277 f->iswopen = 1;
278 fileincref(f);
279 f->free = f->w->filesz - n;
280 f->resv = 0;
284 static int
285 filewrite(File *f, job j, void *buf, int len)
287 int r;
289 r = write(f->fd, buf, len);
290 if (r != len) {
291 twarn("write");
292 return 0;
295 f->w->resv -= r;
296 f->resv -= r;
297 j->walresv -= r;
298 j->walused += r;
299 f->w->alive += r;
300 return 1;
305 filewrjobshort(File *f, job j)
307 int r, nl;
309 nl = 0; // name len 0 indicates short record
310 r = filewrite(f, j, &nl, sizeof nl) &&
311 filewrite(f, j, &j->r, sizeof j->r);
312 if (!r) return 0;
314 if (j->r.state == Invalid) {
315 filermjob(j->file, j);
318 return r;
323 filewrjobfull(File *f, job j)
325 int nl;
327 fileaddjob(f, j);
328 nl = strlen(j->tube->name);
329 return
330 filewrite(f, j, &nl, sizeof nl) &&
331 filewrite(f, j, j->tube->name, nl) &&
332 filewrite(f, j, &j->r, sizeof j->r) &&
333 filewrite(f, j, j->body, j->r.body_size);
337 void
338 filewclose(File *f)
340 int r;
342 if (!f) return;
343 if (!f->iswopen) return;
344 if (f->free) {
345 // Some compilers give a warning if the return value of ftruncate is
346 // ignored. So we pretend to use it.
347 r = ftruncate(f->fd, f->w->filesz - f->free);
348 if (r == -1); // do nothing
350 close(f->fd);
351 f->iswopen = 0;
352 filedecref(f);
357 fileinit(File *f, Wal *w, int n)
359 f->w = w;
360 f->seq = n;
361 f->path = fmtalloc("%s/binlog.%d", w->dir, n);
362 return !!f->path;
366 // Adds f to the linked list in w,
367 // updating w->tail and w->head as necessary.
368 Wal*
369 fileadd(File *f, Wal *w)
371 if (w->tail) {
372 w->tail->next = f;
374 w->tail = f;
375 if (!w->head) {
376 w->head = f;
378 w->nfile++;
379 return w;