document the new binlog stats
[beanstalkd.git] / dat.h
blob6c2b2bf3552aba9a5c46a4d1008086608a5a2607
/* Short aliases for the basic unsigned and fixed-width integer types. These
 * typedefs have to come before the #defines below, which redirect the
 * standard int32_t/uint32_t/int64_t/uint64_t names. */
1 typedef unsigned char uchar;
2 typedef uchar byte;
3 typedef unsigned int uint;
4 typedef int32_t int32;
5 typedef uint32_t uint32;
6 typedef int64_t int64;
7 typedef uint64_t uint64;
/* Redirect the standard fixed-width type names to do_not_use_* identifiers,
 * so code after this point is steered towards the short aliases (byte, int32,
 * uint32, int64, uint64) instead.
 * NOTE(review): presumably the do_not_use_* names are never declared, making
 * any use of the standard names a compile error -- confirm. */
9 #define int8_t do_not_use_int8_t
10 #define uint8_t do_not_use_uint8_t
11 #define int32_t do_not_use_int32_t
12 #define uint32_t do_not_use_uint32_t
13 #define int64_t do_not_use_int64_t
14 #define uint64_t do_not_use_uint64_t
/* Forward declarations. Note the two flavours: ms, job, tube and conn name
 * POINTER types (struct X *), whereas Heap, Jobrec, File, Socket, Srv and Wal
 * name the struct types themselves. */
16 typedef struct ms *ms;
17 typedef struct job *job;
18 typedef struct tube *tube;
19 typedef struct conn *conn;
20 typedef struct Heap Heap;
21 typedef struct Jobrec Jobrec;
22 typedef struct File File;
23 typedef struct Socket Socket;
24 typedef struct Srv Srv;
25 typedef struct Wal Wal;
/* Callback signatures used by the structures declared in this header. */
27 typedef void(*evh)(int, short, void *);
28 typedef void(*ms_event_fn)(ms a, void *item, size_t i); // type of ms.oninsert / ms.onremove
29 typedef void(*Handle)(void*, int); // type of Socket.f and of sockinit's `tick` argument
30 typedef int(*Less)(void*, void*); // type of Heap.less
31 typedef void(*Record)(void*, int); // type of Heap.rec
/* NUM_PRIMES is 48 on LP64 targets (64-bit long and pointers) and 19
 * otherwise.
 * NOTE(review): presumably the number of entries in primes[] -- confirm
 * against the definition of that table.
 *
 * Test with defined() rather than by value: some platforms define _LP64 with
 * an empty replacement list, which makes a bare "#if _LP64" ill-formed, and
 * a bare test of an undefined macro trips -Wundef. */
#if defined(_LP64)
#define NUM_PRIMES 48
#else
#define NUM_PRIMES 19
#endif
/* Size of the name[] buffer in struct tube.
 * NOTE(review): presumably 200 name characters plus a terminating NUL, to
 * match the a{200} in the comment below -- confirm. */
39 #define MAX_TUBE_NAME_LEN 201
/* LINE_BUF_SIZE sizes both cmd[] and reply_buf[] in struct conn.
 * 6 ("USING ") + 200 + 2 ("\r\n") = 208. */
41 /* A command can be at most LINE_BUF_SIZE chars, including "\r\n". This value
42 * MUST be enough to hold the longest possible command or reply line, which is
43 * currently "USING a{200}\r\n". */
44 #define LINE_BUF_SIZE 208
/* Flag bits for a connection's `type` field; conn_waiting() tests
 * CONN_TYPE_WAITING against c->type. Each value is a distinct power of two,
 * so the flags can be combined. */
46 /* CONN_TYPE_* are bit masks */
47 #define CONN_TYPE_PRODUCER 1
48 #define CONN_TYPE_WORKER 2
49 #define CONN_TYPE_WAITING 4
/* Yields the smaller of a and b (b when they compare equal). This is a
 * function-like macro: the selected argument is evaluated twice, so do not
 * pass expressions with side effects. */
51 #define min(a,b) ((a)<(b)?(a):(b))
/* twarn / twarnx: call warn() / warnx() with the message prefixed by
 * "<file>:<line> in <function>: ". `fmt` must be a string literal because it
 * is pasted onto the prefix by compile-time string concatenation.
 *
 * The variadic part uses the C99 __VA_ARGS__ spelling (instead of the
 * GNU-only named form "args...") so that all macros here agree with the
 * non-DEBUG dbgprintf. ", ##__VA_ARGS__" still relies on the GNU/clang
 * extension that drops the comma when no extra arguments are passed. */
#define twarn(fmt, ...) warn("%s:%d in %s: " fmt, \
                             __FILE__, __LINE__, __func__, ##__VA_ARGS__)
#define twarnx(fmt, ...) warnx("%s:%d in %s: " fmt, \
                               __FILE__, __LINE__, __func__, ##__VA_ARGS__)

/* dbgprintf: printf-style output to stderr when DEBUG is defined. Otherwise
 * it expands to a no-op expression and its arguments are never evaluated. */
#ifdef DEBUG
#define dbgprintf(fmt, ...) ((void) fprintf(stderr, fmt, ##__VA_ARGS__))
#else
#define dbgprintf(fmt, ...) ((void) 0)
#endif
/* NOTE(review): presumably the priority cutoff behind stats.urgent_ct --
 * confirm against the code that updates that counter. */
64 #define URGENT_THRESHOLD 1024
/* (1 << 16) - 1 == 65535.
 * NOTE(review): presumably the initial value of job_data_size_limit --
 * confirm. */
65 #define JOB_DATA_SIZE_LIMIT_DEFAULT ((1 << 16) - 1)
/* Version string; only declared here. */
67 extern const char version[];
/* A group of counters. One instance is embedded in every struct tube as the
 * field `stat`. The exact meaning of each counter is determined by the code
 * that updates it, which is not part of this header. */
69 struct stats {
70 uint urgent_ct;
71 uint waiting_ct;
72 uint buried_ct;
73 uint reserved_ct;
74 uint pause_ct;
75 uint64 total_jobs_ct;
/* A heap of opaque pointers. Used for tube.ready, tube.delay and Srv.conns.
 * NOTE(review): cap/len are presumably the allocated and used number of
 * slots in data[] -- confirm against the implementation. */
79 struct Heap {
80 int cap;
81 int len;
82 void **data;
83 Less less; // ordering callback, int(void*, void*)
84 Record rec; // void(void*, int) callback; NOTE(review): presumably told each element's index (cf. job_setheappos) -- confirm
/* Heap operations. The meaning of heapinsert's int result and of heapremove's
 * index argument k is defined by the implementation, not visible here. */
86 int heapinsert(Heap *h, void *x);
87 void* heapremove(Heap *h, int k);
/* A file descriptor paired with a Handle callback `f` and an opaque pointer
 * `x`. Embedded in struct conn and struct Srv as `sock`.
 * NOTE(review): presumably f is invoked as f(x, event) and `added` records
 * whether the fd is already registered with the poller -- confirm. */
90 struct Socket {
91 int fd;
92 Handle f;
93 void *x;
94 int added;
/* Socket event loop interface. */
97 void sockinit(Handle tick, void *x, int64 ns);
98 int sockwant(Socket *s, int rw);
99 void sockmain(void); // does not return
/* A collection of opaque pointers with optional insert/remove hooks,
 * manipulated through the ms_* functions declared further down. Used for
 * tube.waiting, conn.watch and the global `tubes`.
 * NOTE(review): used/cap are presumably the occupied and allocated slot
 * counts of items[]; the role of `last` is not visible here -- confirm. */
101 struct ms {
102 size_t used, cap, last;
103 void **items;
104 ms_event_fn oninsert, onremove;
/* Version number tied to the layout of struct Jobrec: it must be incremented
 * whenever that struct is modified (see the comment on struct Jobrec). */
107 enum
109 Walver = 7
/* Values for the `state` field of struct Jobrec. Enumerators are implicitly
 * numbered from Invalid = 0 through Copy = 5. */
112 enum // Jobrec.state
114 Invalid,
115 Ready,
116 Reserved,
117 Buried,
118 Delayed,
119 Copy
/* The persistent part of a job: struct job embeds it as `r` and documents
 * these fields as the ones written to the wal.
 * NOTE(review): units of delay/ttr/created_at/deadline_at are presumably
 * nanoseconds (cf. nanoseconds()) -- confirm. */
122 // if you modify this struct, you must increment Walver above
123 struct Jobrec {
124 uint64 id;
125 uint32 pri;
126 int64 delay;
127 int64 ttr;
128 int32 body_size;
129 int64 created_at;
130 int64 deadline_at;
131 uint32 reserve_ct;
132 uint32 timeout_ct;
133 uint32 release_ct;
134 uint32 bury_ct;
135 uint32 kick_ct;
136 byte state; // one of the Jobrec.state enumerators (Invalid ... Copy)
/* An in-memory job: the persistent record `r`, followed by bookkeeping
 * fields, followed by the job body as a flexible array member. Note that the
 * typedef `job` is a POINTER to this struct. */
139 struct job {
140 Jobrec r; // persistent fields; these get written to the wal
142 /* bookkeeping fields; these are in-memory only */
143 char pad[6]; // NOTE(review): purpose not visible here; presumably padding -- confirm
144 tube tube;
145 job prev, next; /* linked list of jobs */
146 job ht_next; /* Next job in a hash table list */
147 size_t heap_index; /* where is this job in its current heap */
148 File *file;
149 job fnext;
150 job fprev;
151 void *reserver;
152 int walresv;
153 int walused;
155 char body[]; // written separately to the wal
/* A named tube: two heaps of jobs (ready and delay), a set of waiting
 * connections, counters, and an embedded struct job `buried`.
 * NOTE(review): `buried` is presumably a list header, like
 * conn.reserved_jobs and File.jlist -- confirm.
 * `refs` is presumably the reference count adjusted by tube_iref/tube_dref
 * -- confirm. */
158 struct tube {
159 uint refs;
160 char name[MAX_TUBE_NAME_LEN];
161 Heap ready;
162 Heap delay;
163 struct ms waiting; /* set of conns */
164 struct stats stat;
165 uint using_ct;
166 uint watching_ct;
167 int64 pause;
168 int64 deadline_at;
169 struct job buried;
/* Per-client connection state: list links, the owning server, the socket,
 * command/reply line buffers, the job currently being received (in_job) or
 * sent (out_job), the tube in use, the set of watched tubes, and the list of
 * jobs this connection has reserved. The typedef `conn` is a POINTER to this
 * struct. */
172 struct conn {
173 conn prev, next; /* linked list of connections */
174 Srv *srv;
175 Socket sock;
176 char state;
177 char type; // CONN_TYPE_* bits; tested by conn_waiting()
178 int rw; // currently want: 'r' or 'w'
179 int pending_timeout;
180 int64 tickat; // time at which to do more work
181 int tickpos; // position in srv->conns
183 /* we cannot share this buffer with the reply line because we might read in
184 * command line data for a subsequent command, and we need to store it
185 * here. */
186 char cmd[LINE_BUF_SIZE]; /* this string is NOT NUL-terminated */
187 int cmd_len;
188 int cmd_read;
189 const char *reply;
190 int reply_len;
191 int reply_sent;
192 char reply_buf[LINE_BUF_SIZE]; /* this string IS NUL-terminated */
194 /* A job to be read from the client. */
195 job in_job;
197 /* Memoization of the soonest job */
198 job soonest_job;
200 /* How many bytes of in_job->body have been read so far. If in_job is NULL
201 * while in_job_read is nonzero, we are in bit bucket mode and
202 * in_job_read's meaning is inverted -- then it counts the bytes that
203 * remain to be thrown away. */
204 int in_job_read;
206 job out_job;
207 int out_job_sent;
208 tube use;
209 struct ms watch;
210 struct job reserved_jobs; /* doubly-linked list header */
/* Miscellaneous helpers. Only the signatures are visible here; behaviour is
 * defined by the implementations. */
214 void v(void);
/* printf-style diagnostics; the twarn/twarnx macros expand to calls to
 * these. */
216 void warn(const char *fmt, ...);
217 void warnx(const char *fmt, ...);
/* NOTE(review): presumably returns a newly allocated formatted string that
 * the caller must free -- confirm. */
218 char* fmtalloc(char *fmt, ...);
/* NOTE(review): presumably allocates n zero-filled bytes -- confirm. */
219 void* zalloc(int n);
/* new(T): zalloc() sized for one object of type T. The result is a void*,
 * and sizeof(T) is converted to zalloc's int parameter. */
220 #define new(T) zalloc(sizeof(T))
222 extern const char *progname;
224 int64 nanoseconds(void);
225 int falloc(int fd, int len);
/* Operations on struct ms. The first argument `a` has type ms, i.e. a
 * pointer to the struct. ms_init takes the two hook callbacks stored in
 * oninsert/onremove. The meaning of the int results is defined by the
 * implementation, not visible here. */
228 void ms_init(ms a, ms_event_fn oninsert, ms_event_fn onremove);
229 void ms_clear(ms a);
230 int ms_append(ms a, void *item);
231 int ms_remove(ms a, void *item);
232 int ms_contains(ms a, void *item);
233 void *ms_take(ms a);
/* make_job: shorthand for make_job_with_id() with an id argument of 0.
 * NOTE(review): presumably id 0 requests a freshly assigned id -- confirm. */
236 #define make_job(pri,delay,ttr,body_size,tube) make_job_with_id(pri,delay,ttr,body_size,tube,0)
238 job allocate_job(int body_size);
239 job make_job_with_id(uint pri, int64 delay, int64 ttr,
240 int body_size, tube tube, uint64 id);
241 void job_free(job j);
243 /* Lookup a job by job ID */
244 job job_find(uint64 job_id);
/* job_setheappos matches the Record callback type; job_pri_less and
 * job_delay_less match the Less callback type used by Heap. */
246 /* the void* parameters are really job pointers */
247 void job_setheappos(void*, int);
248 int job_pri_less(void*, void*);
249 int job_delay_less(void*, void*);
251 job job_copy(job j);
253 const char * job_state(job j);
/* Operations on job lists; `head` is a list header job (cf. the embedded
 * struct job headers in conn and File). */
255 int job_list_any_p(job head);
256 job job_remove(job j);
257 void job_insert(job head, job j);
259 uint64 total_jobs(void);
261 /* for unit tests */
262 size_t get_all_jobs_used(void);
/* Global struct ms named `tubes`; NOTE(review): presumably the set of all
 * existing tubes -- confirm. */
265 extern struct ms tubes;
267 tube make_tube(const char *name);
/* NOTE(review): presumably decrement / increment tube.refs -- confirm. */
268 void tube_dref(tube t);
269 void tube_iref(tube t);
270 tube tube_find(const char *name);
271 tube tube_find_or_make(const char *name);
/* TUBE_ASSIGN(a, b): expands to tube_dref(a), then (a) = (b), then
 * tube_iref(a). `a` is evaluated three times and `b` once, so `a` must be a
 * side-effect-free lvalue.
 * NOTE(review): the old tube is released BEFORE the new one is retained; if
 * a and b can be the same tube, confirm that tube_dref cannot destroy it in
 * between. */
272 #define TUBE_ASSIGN(a,b) (tube_dref(a), (a) = (b), tube_iref(a))
/* Connection API. Only signatures are visible here. */
275 conn make_conn(int fd, char start_state, tube use, tube watch);
/* NOTE(review): connless/connrec presumably serve as the Less/Record
 * callbacks of the Srv.conns heap (conn.tickpos is documented as the
 * position in srv->conns) -- confirm. */
277 int connless(conn a, conn b);
278 void connrec(conn c, int i);
279 void connwant(conn c, const int mask, conn list);
280 void connsched(conn c);
282 void conn_close(conn c);
284 conn conn_remove(conn c);
285 void conn_insert(conn head, conn c);
287 int count_cur_conns(void);
288 uint count_tot_conns(void);
289 int count_cur_producers(void);
290 int count_cur_workers(void);
292 void conn_set_producer(conn c);
293 void conn_set_worker(conn c);
295 job soonest_job(conn c);
296 int has_reserved_this_job(conn c, job j);
297 int conn_has_close_deadline(conn c);
298 int conn_ready(conn c);
/* Nonzero when the CONN_TYPE_WAITING bit is set in c->type. The result is
 * the masked bit value (0 or 4), not a normalized 0/1. */
300 #define conn_waiting(c) ((c)->type & CONN_TYPE_WAITING)
/* NOTE(review): presumably a table of NUM_PRIMES primes -- confirm. */
303 extern size_t primes[];
/* NOTE(review): presumably the current maximum job body size, defaulting to
 * JOB_DATA_SIZE_LIMIT_DEFAULT -- confirm. */
306 extern size_t job_data_size_limit;
/* Protocol layer entry points. Only signatures are visible here. */
308 void prot_init(void);
309 void prottick(Srv *s);
311 conn remove_waiting_conn(conn c);
313 void enqueue_reserved_jobs(conn c);
315 void enter_drain_mode(int sig); // takes a signal number; NOTE(review): presumably installed as a signal handler -- confirm
316 void h_accept(const int fd, const short which, Srv* srv);
317 void prot_remove_tube(tube t);
318 void prot_replay(Srv *s, job list);
/* NOTE(review): presumably creates the listening socket for host_addr:port
 * and returns its file descriptor, or a negative value on failure --
 * confirm against the implementation. */
321 int make_server_socket(char *host_addr, char *port);
/* 10 << 20 == 10485760 (10 MiB).
 * NOTE(review): presumably the default for Wal.filesz -- confirm. */
324 enum
326 Filesizedef = (10 << 20)
/* State of the write-ahead log (called the "binlog" in the nocomp comment
 * below). Embedded in struct Srv as `wal`. head/cur/tail point at File
 * objects, which are chained through File.next.
 * NOTE(review): `use` is presumably a flag for whether the wal is enabled,
 * and filesz the size of each file -- confirm. */
329 struct Wal {
330 int use;
331 char *dir;
332 File *head;
333 File *cur;
334 File *tail;
335 int nfile;
336 int next;
337 int filesz;
338 int resv; // bytes reserved
339 int alive; // bytes in use
340 int64 nmig; // migrations
341 int64 nrec; // records written ever
342 int wantsync;
343 int64 syncrate;
344 int64 lastsync;
345 int nocomp; // disable binlog compaction?
/* Wal operations. Only signatures are visible here; the meaning of the int
 * results is defined by the implementation. */
347 int waldirlock(Wal*);
348 void walinit(Wal*, job list);
349 int walwrite(Wal*, job);
350 void walmaint(Wal*);
351 int walresvput(Wal*, job);
352 int walresvupdate(Wal*, job);
353 void walgc(Wal*);
/* One file belonging to a Wal (`w`). Files are chained through `next`, and
 * each keeps a list of the jobs written in it (`jlist`).
 * NOTE(review): `refs` is presumably the count adjusted by
 * fileincref/filedecref, and free/resv are presumably byte counts like
 * Wal.resv -- confirm. */
356 struct File {
357 File *next;
358 uint refs;
359 int seq;
360 int iswopen; // is open for writing
361 int fd;
362 int free;
363 int resv;
364 char *path;
365 Wal *w;
367 struct job jlist; // jobs written in this file
/* File operations. Only signatures are visible here; the meaning of the int
 * results is defined by the implementation. */
369 int fileinit(File*, Wal*, int);
370 Wal* fileadd(File*, Wal*);
371 void fileincref(File*);
372 void filedecref(File*);
373 void fileaddjob(File*, job);
374 void filermjob(File*, job);
375 int fileread(File*, job list);
376 void filewopen(File*);
377 void filewclose(File*);
378 int filewrjobshort(File*, job);
379 int filewrjobfull(File*, job);
/* Top-level server state: a Socket, a heap of connections (conn.tickpos is
 * documented as a position in it), and the write-ahead log.
 * NOTE(review): `sock` is presumably the listening socket -- confirm. */
382 struct Srv {
383 Socket sock;
384 Heap conns;
385 Wal wal;
/* Server entry points. srvaccept and srvtick take (Srv*, int), so they fit
 * the Handle callback type once the first parameter is treated as void*. */
387 void srv(Srv *srv);
388 void srvaccept(Srv *s, int ev);
389 void srvschedconn(Srv *srv, conn c);
390 void srvtick(Srv *s, int ev);