Increase LINE_BUF_SIZE to handle longest possible command
[beanstalkd.git] / dat.h
blob6d36d799db43e4f1cf05be1f7f4a1f6d9ffb137e
// Requirements:
// #include <stdint.h>
// #include <stdlib.h>
/* Short project-wide aliases for the basic integer types.
 * The fixed-width ones are built on <stdint.h>, which must already be
 * included by whoever includes this header. */
typedef unsigned char uchar;
typedef uchar         byte;
typedef unsigned int  uint;
typedef int32_t       int32;
typedef uint32_t      uint32;
typedef int64_t       int64;
typedef uint64_t      uint64;
/* Poison the <stdint.h> spellings: after this point each of them expands to
 * a do_not_use_* identifier, so code must use the short aliases instead. */
#define int8_t   do_not_use_int8_t
#define uint8_t  do_not_use_uint8_t
#define int32_t  do_not_use_int32_t
#define uint32_t do_not_use_uint32_t
#define int64_t  do_not_use_int64_t
#define uint64_t do_not_use_uint64_t
/* Handle types: each of these names is a POINTER to the struct with the
 * same tag, so `job j` is already a `struct job *`. */
typedef struct ms   *ms;
typedef struct job  *job;
typedef struct tube *tube;

/* Plain struct aliases (no hidden pointer). */
typedef struct Conn   Conn;
typedef struct Heap   Heap;
typedef struct Jobrec Jobrec;
typedef struct File   File;
typedef struct Socket Socket;
typedef struct Server Server;
typedef struct Wal    Wal;
31 typedef void(*ms_event_fn)(ms a, void *item, size_t i);
32 typedef void(*Handle)(void*, int rw);
33 typedef int(*Less)(void*, void*);
34 typedef void(*Record)(void*, int);
35 typedef int(FAlloc)(int, int);
/* NUM_PRIMES depends on the data model: 48 when _LP64 is set, 19 otherwise.
 * NOTE(review): presumably the length of primes[] (declared extern in this
 * header) -- confirm against its definition. */
#if _LP64
#define NUM_PRIMES 48
#else
#define NUM_PRIMES 19
#endif
/* Size of the tube-name buffer (see `name` in struct tube). The command
 * budget below allows 200 name characters, which leaves one spare byte here
 * -- presumably for the terminating NUL. */
#define MAX_TUBE_NAME_LEN 201

/* A command can be at most LINE_BUF_SIZE chars, including "\r\n". This value
 * MUST be enough to hold the longest possible command
 * ("pause-tube a{200} 4294967295\r\n": 11 + 200 + 1 + 10 + 2 = 224 bytes)
 * or reply line ("USING a{200}\r\n"). */
#define LINE_BUF_SIZE 224
/* CONN_TYPE_* are bit masks; they are tested against Conn.type
 * (see conn_waiting). Each value occupies a distinct bit. */
#define CONN_TYPE_PRODUCER 1
#define CONN_TYPE_WORKER   2
#define CONN_TYPE_WAITING  4
/* Smaller of a and b. Function-like macro: the chosen argument is evaluated
 * twice, so do not pass expressions with side effects. */
#define min(a,b) ((a)<(b)?(a):(b))
/* NOTE(review): presumably the priority cut-off behind stats.urgent_ct --
 * confirm against the users of this constant. */
#define URGENT_THRESHOLD 1024

/* 65535. NOTE(review): presumably the initial value of job_data_size_limit
 * -- confirm where that variable is defined. */
#define JOB_DATA_SIZE_LIMIT_DEFAULT ((1 << 16) - 1)
60 extern const char version[];
61 extern int verbose;
62 extern struct Server srv;
64 // Replaced by tests to simulate failures.
65 extern FAlloc *falloc;
67 struct stats {
68 uint urgent_ct;
69 uint waiting_ct;
70 uint buried_ct;
71 uint reserved_ct;
72 uint pause_ct;
73 uint64 total_delete_ct;
74 uint64 total_jobs_ct;
78 struct Heap {
79 int cap;
80 int len;
81 void **data;
82 Less less;
83 Record rec;
85 int heapinsert(Heap *h, void *x);
86 void* heapremove(Heap *h, int k);
89 struct Socket {
90 int fd;
91 Handle f;
92 void *x;
93 int added;
95 int sockinit(void);
96 int sockwant(Socket*, int);
97 int socknext(Socket**, int64);
99 struct ms {
100 size_t used, cap, last;
101 void **items;
102 ms_event_fn oninsert, onremove;
/* Version of the write-ahead-log record layout; must be incremented
 * whenever struct Jobrec changes. */
enum
{
    Walver = 7
};
/* Values stored in Jobrec.state. The numeric values are the implicit
 * 0..5, and Jobrec is written to the wal, so do not reorder them. */
enum // Jobrec.state
{
    Invalid,
    Ready,
    Reserved,
    Buried,
    Delayed,
    Copy
};
120 // if you modify this struct, you must increment Walver above
121 struct Jobrec {
122 uint64 id;
123 uint32 pri;
124 int64 delay;
125 int64 ttr;
126 int32 body_size;
127 int64 created_at;
128 int64 deadline_at;
129 uint32 reserve_ct;
130 uint32 timeout_ct;
131 uint32 release_ct;
132 uint32 bury_ct;
133 uint32 kick_ct;
134 byte state;
137 struct job {
138 Jobrec r; // persistent fields; these get written to the wal
140 /* bookeeping fields; these are in-memory only */
141 char pad[6];
142 tube tube;
143 job prev, next; /* linked list of jobs */
144 job ht_next; /* Next job in a hash table list */
145 size_t heap_index; /* where is this job in its current heap */
146 File *file;
147 job fnext;
148 job fprev;
149 void *reserver;
150 int walresv;
151 int walused;
153 char body[]; // written separately to the wal
156 struct tube {
157 uint refs;
158 char name[MAX_TUBE_NAME_LEN];
159 Heap ready;
160 Heap delay;
161 struct ms waiting; /* set of conns */
162 struct stats stat;
163 uint using_ct;
164 uint watching_ct;
165 int64 pause;
166 int64 deadline_at;
167 struct job buried;
/* Wrappers around warn() / warnx() that prefix the message with
 * "file:line in function: ". fmt must be a string literal because it is
 * concatenated with the prefix. Uses the GNU named-variadic-macro
 * extension (`args...` / `##args`). */
#define twarn(fmt, args...) warn("%s:%d in %s: " fmt, \
                                 __FILE__, __LINE__, __func__, ##args)
#define twarnx(fmt, args...) warnx("%s:%d in %s: " fmt, \
                                   __FILE__, __LINE__, __func__, ##args)
176 void warn(const char *fmt, ...) __attribute__((format(printf, 1, 2)));
177 void warnx(const char *fmt, ...) __attribute__((format(printf, 1, 2)));
178 char* fmtalloc(char *fmt, ...) __attribute__((format(printf, 1, 2)));
179 void* zalloc(int n);
180 #define new(T) zalloc(sizeof(T))
181 void optparse(Server*, char**);
183 extern const char *progname;
185 int64 nanoseconds(void);
186 int rawfalloc(int fd, int len);
189 void ms_init(ms a, ms_event_fn oninsert, ms_event_fn onremove);
190 void ms_clear(ms a);
191 int ms_append(ms a, void *item);
192 int ms_remove(ms a, void *item);
193 int ms_contains(ms a, void *item);
194 void *ms_take(ms a);
197 #define make_job(pri,delay,ttr,body_size,tube) make_job_with_id(pri,delay,ttr,body_size,tube,0)
199 job allocate_job(int body_size);
200 job make_job_with_id(uint pri, int64 delay, int64 ttr,
201 int body_size, tube tube, uint64 id);
202 void job_free(job j);
204 /* Lookup a job by job ID */
205 job job_find(uint64 job_id);
207 /* the void* parameters are really job pointers */
208 void job_setheappos(void*, int);
209 int job_pri_less(void*, void*);
210 int job_delay_less(void*, void*);
212 job job_copy(job j);
214 const char * job_state(job j);
216 int job_list_any_p(job head);
217 job job_remove(job j);
218 void job_insert(job head, job j);
220 uint64 total_jobs(void);
222 /* for unit tests */
223 size_t get_all_jobs_used(void);
226 extern struct ms tubes;
228 tube make_tube(const char *name);
229 void tube_dref(tube t);
230 void tube_iref(tube t);
231 tube tube_find(const char *name);
232 tube tube_find_or_make(const char *name);
233 #define TUBE_ASSIGN(a,b) (tube_dref(a), (a) = (b), tube_iref(a))
236 Conn *make_conn(int fd, char start_state, tube use, tube watch);
238 int count_cur_conns(void);
239 uint count_tot_conns(void);
240 int count_cur_producers(void);
241 int count_cur_workers(void);
244 extern size_t primes[];
247 extern size_t job_data_size_limit;
249 void prot_init(void);
250 int64 prottick(Server *s);
252 Conn *remove_waiting_conn(Conn *c);
254 void enqueue_reserved_jobs(Conn *c);
256 void enter_drain_mode(int sig);
257 void h_accept(const int fd, const short which, Server* srv);
258 void prot_remove_tube(tube t);
259 int prot_replay(Server *s, job list);
262 int make_server_socket(char *host_addr, char *port);
265 struct Conn {
266 Server *srv;
267 Socket sock;
268 char state;
269 char type;
270 Conn *next;
271 tube use;
272 int64 tickat; // time at which to do more work
273 int tickpos; // position in srv->conns
274 job soonest_job; // memoization of the soonest job
275 int rw; // currently want: 'r', 'w', or 'h'
276 int pending_timeout;
277 char halfclosed;
279 char cmd[LINE_BUF_SIZE]; // this string is NOT NUL-terminated
280 int cmd_len;
281 int cmd_read;
283 char *reply;
284 int reply_len;
285 int reply_sent;
286 char reply_buf[LINE_BUF_SIZE]; // this string IS NUL-terminated
288 // How many bytes of in_job->body have been read so far. If in_job is NULL
289 // while in_job_read is nonzero, we are in bit bucket mode and
290 // in_job_read's meaning is inverted -- then it counts the bytes that
291 // remain to be thrown away.
292 int in_job_read;
293 job in_job; // a job to be read from the client
295 job out_job;
296 int out_job_sent;
298 struct ms watch;
299 struct job reserved_jobs; // linked list header
301 int connless(Conn *a, Conn *b);
302 void connrec(Conn *c, int i);
303 void connwant(Conn *c, int rw);
304 void connsched(Conn *c);
305 void connclose(Conn *c);
306 void connsetproducer(Conn *c);
307 void connsetworker(Conn *c);
308 job connsoonestjob(Conn *c);
309 int conndeadlinesoon(Conn *c);
310 int conn_ready(Conn *c);
311 #define conn_waiting(c) ((c)->type & CONN_TYPE_WAITING)
/* 10 MiB (10 << 20 bytes).
 * NOTE(review): presumably the default for Wal.filesize -- confirm. */
enum
{
    Filesizedef = (10 << 20)
};
321 struct Wal {
322 int filesize;
323 int use;
324 char *dir;
325 File *head;
326 File *cur;
327 File *tail;
328 int nfile;
329 int next;
330 int64 resv; // bytes reserved
331 int64 alive; // bytes in use
332 int64 nmig; // migrations
333 int64 nrec; // records written ever
334 int wantsync;
335 int64 syncrate;
336 int64 lastsync;
337 int nocomp; // disable binlog compaction?
339 int waldirlock(Wal*);
340 void walinit(Wal*, job list);
341 int walwrite(Wal*, job);
342 void walmaint(Wal*);
343 int walresvput(Wal*, job);
344 int walresvupdate(Wal*, job);
345 void walgc(Wal*);
348 struct File {
349 File *next;
350 uint refs;
351 int seq;
352 int iswopen; // is open for writing
353 int fd;
354 int free;
355 int resv;
356 char *path;
357 Wal *w;
359 struct job jlist; // jobs written in this file
361 int fileinit(File*, Wal*, int);
362 Wal* fileadd(File*, Wal*);
363 void fileincref(File*);
364 void filedecref(File*);
365 void fileaddjob(File*, job);
366 void filermjob(File*, job);
367 int fileread(File*, job list);
368 void filewopen(File*);
369 void filewclose(File*);
370 int filewrjobshort(File*, job);
371 int filewrjobfull(File*, job);
/* Port number as a string literal.
 * NOTE(review): presumably the default for Server.port -- confirm. */
#define Portdef "11300"
376 struct Server {
377 char *port;
378 char *addr;
379 char *user;
381 Wal wal;
382 Socket sock;
383 Heap conns;
385 void srvserve(Server *srv);
386 void srvaccept(Server *s, int ev);