fix bsd sock build dep
[beanstalkd.git] / dat.h
blobe026a8b86612a86db5ad933b689038ddeba94764
/*
 * Short aliases for the basic integer types.
 * The fixed-width ones are built on <stdint.h>, which must therefore be
 * included before this point.
 */
typedef unsigned char uchar;
typedef uchar         byte;
typedef unsigned int  uint;
typedef int32_t       int32;
typedef uint32_t      uint32;
typedef int64_t       int64;
typedef uint64_t      uint64;
/*
 * Poison the <stdint.h> spellings: after this point each of these names
 * expands to a do_not_use_* identifier, so using one by accident shows up
 * as an error naming that identifier.
 */
#define int8_t   do_not_use_int8_t
#define uint8_t  do_not_use_uint8_t
#define int32_t  do_not_use_int32_t
#define uint32_t do_not_use_uint32_t
#define int64_t  do_not_use_int64_t
#define uint64_t do_not_use_uint64_t
/*
 * Forward declarations.
 * Note that `ms`, `job` and `tube` are POINTER types (they alias
 * `struct ms *`, `struct job *` and `struct tube *`), while the
 * capitalized names alias the struct types themselves.
 */
typedef struct ms     *ms;
typedef struct job    *job;
typedef struct tube   *tube;
typedef struct Conn   Conn;
typedef struct Heap   Heap;
typedef struct Jobrec Jobrec;
typedef struct File   File;
typedef struct Socket Socket;
typedef struct Server Server;
typedef struct Wal    Wal;
27 typedef void(*evh)(int, short, void *);
28 typedef void(*ms_event_fn)(ms a, void *item, size_t i);
29 typedef void(*Handle)(void*, int rw); // rw can also be 'h' for hangup
30 typedef int(*Less)(void*, void*);
31 typedef void(*Record)(void*, int);
32 typedef int(FAlloc)(int, int);
/*
 * NUM_PRIMES is 48 when the compiler defines a non-zero _LP64, and 19
 * otherwise.
 * NOTE(review): presumably the number of entries in the primes[] table
 * declared further down -- confirm against its definition.
 */
#if _LP64
#define NUM_PRIMES 48
#else
#define NUM_PRIMES 19
#endif
/* Size of the name[] array in struct tube.
 * NOTE(review): presumably 200 name chars (as in the "a{200}" example below)
 * plus a terminating NUL -- confirm. */
#define MAX_TUBE_NAME_LEN 201

/* A command can be at most LINE_BUF_SIZE chars, including "\r\n". This value
 * MUST be enough to hold the longest possible command or reply line, which is
 * currently "USING a{200}\r\n" (6 + 200 + 2 = 208 chars, which leaves no
 * spare byte for a terminating NUL). */
#define LINE_BUF_SIZE 208
/* CONN_TYPE_* are bit masks; each one is a distinct single bit, so they can
 * be OR-ed together and tested with `&` (see conn_waiting). */
#define CONN_TYPE_PRODUCER 1
#define CONN_TYPE_WORKER   2
#define CONN_TYPE_WAITING  4
/* Smaller of a and b (b when they compare equal).
 * The selected argument is evaluated twice, so do not pass expressions
 * with side effects. */
#define min(a,b) ((a)<(b)?(a):(b))
/* NOTE(review): presumably the priority cutoff for counting a job as
 * "urgent" (cf. stats.urgent_ct) -- confirm at the point of use. */
#define URGENT_THRESHOLD 1024

/* 65535 (2^16 - 1).
 * NOTE(review): presumably the initial value of job_data_size_limit -- confirm. */
#define JOB_DATA_SIZE_LIMIT_DEFAULT ((1 << 16) - 1)
/* Globals defined elsewhere in the program; this header only declares them. */
extern const char version[];
extern int verbose;
extern struct Server srv;
61 // Replaced by tests to simulate failures.
62 extern FAlloc *falloc;
64 struct stats {
65 uint urgent_ct;
66 uint waiting_ct;
67 uint buried_ct;
68 uint reserved_ct;
69 uint pause_ct;
70 uint64 total_delete_ct;
71 uint64 total_jobs_ct;
75 struct Heap {
76 int cap;
77 int len;
78 void **data;
79 Less less;
80 Record rec;
82 int heapinsert(Heap *h, void *x);
83 void* heapremove(Heap *h, int k);
86 struct Socket {
87 int fd;
88 Handle f;
89 void *x;
90 int added;
93 void sockinit(Handle tick, void *x, int64 ns);
94 int sockwant(Socket *s, int rw);
95 void sockmain(void); // does not return
97 struct ms {
98 size_t used, cap, last;
99 void **items;
100 ms_event_fn oninsert, onremove;
/* Version number tied to the layout of struct Jobrec: it must be
 * incremented whenever that struct is modified. */
enum
{
    Walver = 7
};
/* Values stored in Jobrec.state. They number consecutively from
 * Invalid = 0, so the order of the enumerators is significant. */
enum // Jobrec.state
{
    Invalid,
    Ready,
    Reserved,
    Buried,
    Delayed,
    Copy
};
118 // if you modify this struct, you must increment Walver above
119 struct Jobrec {
120 uint64 id;
121 uint32 pri;
122 int64 delay;
123 int64 ttr;
124 int32 body_size;
125 int64 created_at;
126 int64 deadline_at;
127 uint32 reserve_ct;
128 uint32 timeout_ct;
129 uint32 release_ct;
130 uint32 bury_ct;
131 uint32 kick_ct;
132 byte state;
135 struct job {
136 Jobrec r; // persistent fields; these get written to the wal
138 /* bookeeping fields; these are in-memory only */
139 char pad[6];
140 tube tube;
141 job prev, next; /* linked list of jobs */
142 job ht_next; /* Next job in a hash table list */
143 size_t heap_index; /* where is this job in its current heap */
144 File *file;
145 job fnext;
146 job fprev;
147 void *reserver;
148 int walresv;
149 int walused;
151 char body[]; // written separately to the wal
154 struct tube {
155 uint refs;
156 char name[MAX_TUBE_NAME_LEN];
157 Heap ready;
158 Heap delay;
159 struct ms waiting; /* set of conns */
160 struct stats stat;
161 uint using_ct;
162 uint watching_ct;
163 int64 pause;
164 int64 deadline_at;
165 struct job buried;
/*
 * twarn / twarnx forward to warn / warnx with "file:line in function: "
 * prepended to the message. `fmt` must be a string literal, because it is
 * pasted onto the prefix by literal concatenation. The named variadic
 * parameter and `##args` are GNU C extensions.
 */
#define twarn(fmt, args...) warn("%s:%d in %s: " fmt, \
                                 __FILE__, __LINE__, __func__, ##args)
#define twarnx(fmt, args...) warnx("%s:%d in %s: " fmt, \
                                   __FILE__, __LINE__, __func__, ##args)

/* printf-style warning functions; the format attribute lets the compiler
 * check the arguments against fmt. */
void warn(const char *fmt, ...) __attribute__((format(printf, 1, 2)));
void warnx(const char *fmt, ...) __attribute__((format(printf, 1, 2)));
176 char* fmtalloc(char *fmt, ...) __attribute__((format(printf, 1, 2)));
177 void* zalloc(int n);
178 #define new(T) zalloc(sizeof(T))
179 void optparse(Server*, char**);
181 extern const char *progname;
183 int64 nanoseconds(void);
184 int rawfalloc(int fd, int len);
187 void ms_init(ms a, ms_event_fn oninsert, ms_event_fn onremove);
188 void ms_clear(ms a);
189 int ms_append(ms a, void *item);
190 int ms_remove(ms a, void *item);
191 int ms_contains(ms a, void *item);
192 void *ms_take(ms a);
195 #define make_job(pri,delay,ttr,body_size,tube) make_job_with_id(pri,delay,ttr,body_size,tube,0)
197 job allocate_job(int body_size);
198 job make_job_with_id(uint pri, int64 delay, int64 ttr,
199 int body_size, tube tube, uint64 id);
200 void job_free(job j);
202 /* Lookup a job by job ID */
203 job job_find(uint64 job_id);
205 /* the void* parameters are really job pointers */
206 void job_setheappos(void*, int);
207 int job_pri_less(void*, void*);
208 int job_delay_less(void*, void*);
210 job job_copy(job j);
212 const char * job_state(job j);
214 int job_list_any_p(job head);
215 job job_remove(job j);
216 void job_insert(job head, job j);
218 uint64 total_jobs(void);
220 /* for unit tests */
221 size_t get_all_jobs_used(void);
224 extern struct ms tubes;
226 tube make_tube(const char *name);
227 void tube_dref(tube t);
228 void tube_iref(tube t);
229 tube tube_find(const char *name);
230 tube tube_find_or_make(const char *name);
231 #define TUBE_ASSIGN(a,b) (tube_dref(a), (a) = (b), tube_iref(a))
234 Conn *make_conn(int fd, char start_state, tube use, tube watch);
236 int count_cur_conns(void);
237 uint count_tot_conns(void);
238 int count_cur_producers(void);
239 int count_cur_workers(void);
/* Table defined elsewhere; its length is not part of this declaration. */
extern size_t primes[];

/* Defined elsewhere.
 * NOTE(review): presumably the largest accepted job body, in bytes -- confirm. */
extern size_t job_data_size_limit;
247 void prot_init(void);
248 void prottick(Server *s);
249 void protrmdirty(Conn*);
251 Conn *remove_waiting_conn(Conn *c);
253 void enqueue_reserved_jobs(Conn *c);
255 void enter_drain_mode(int sig);
256 void h_accept(const int fd, const short which, Server* srv);
257 void prot_remove_tube(tube t);
258 int prot_replay(Server *s, job list);
/* Create the server socket for host_addr / port.
 * NOTE(review): presumably returns a file descriptor, or a negative value
 * on failure -- confirm in the implementation. */
int make_server_socket(char *host_addr, char *port);
264 struct Conn {
265 Server *srv;
266 Socket sock;
267 char state;
268 char type;
269 Conn *next;
270 tube use;
271 int64 tickat; // time at which to do more work
272 int tickpos; // position in srv->conns
273 job soonest_job; // memoization of the soonest job
274 int rw; // currently want: 'r', 'w', or 'h'
275 int pending_timeout;
277 char cmd[LINE_BUF_SIZE]; // this string is NOT NUL-terminated
278 int cmd_len;
279 int cmd_read;
281 char *reply;
282 int reply_len;
283 int reply_sent;
284 char reply_buf[LINE_BUF_SIZE]; // this string IS NUL-terminated
286 // How many bytes of in_job->body have been read so far. If in_job is NULL
287 // while in_job_read is nonzero, we are in bit bucket mode and
288 // in_job_read's meaning is inverted -- then it counts the bytes that
289 // remain to be thrown away.
290 int in_job_read;
291 job in_job; // a job to be read from the client
293 job out_job;
294 int out_job_sent;
296 struct ms watch;
297 struct job reserved_jobs; // linked list header
299 int connless(Conn *a, Conn *b);
300 void connrec(Conn *c, int i);
301 void connwant(Conn *c, int rw);
302 void connsched(Conn *c);
303 void connclose(Conn *c);
304 void connsetproducer(Conn *c);
305 void connsetworker(Conn *c);
306 job connsoonestjob(Conn *c);
307 int conndeadlinesoon(Conn *c);
308 int conn_ready(Conn *c);
309 #define conn_waiting(c) ((c)->type & CONN_TYPE_WAITING)
/* 10 << 20 = 10485760 (10 MiB).
 * NOTE(review): presumably the default for Wal.filesize -- confirm. */
enum
{
    Filesizedef = (10 << 20)
};
319 struct Wal {
320 int filesize;
321 int use;
322 char *dir;
323 File *head;
324 File *cur;
325 File *tail;
326 int nfile;
327 int next;
328 int resv; // bytes reserved
329 int alive; // bytes in use
330 int64 nmig; // migrations
331 int64 nrec; // records written ever
332 int wantsync;
333 int64 syncrate;
334 int64 lastsync;
335 int nocomp; // disable binlog compaction?
337 int waldirlock(Wal*);
338 void walinit(Wal*, job list);
339 int walwrite(Wal*, job);
340 void walmaint(Wal*);
341 int walresvput(Wal*, job);
342 int walresvupdate(Wal*, job);
343 void walgc(Wal*);
346 struct File {
347 File *next;
348 uint refs;
349 int seq;
350 int iswopen; // is open for writing
351 int fd;
352 int free;
353 int resv;
354 char *path;
355 Wal *w;
357 struct job jlist; // jobs written in this file
359 int fileinit(File*, Wal*, int);
360 Wal* fileadd(File*, Wal*);
361 void fileincref(File*);
362 void filedecref(File*);
363 void fileaddjob(File*, job);
364 void filermjob(File*, job);
365 int fileread(File*, job list);
366 void filewopen(File*);
367 void filewclose(File*);
368 int filewrjobshort(File*, job);
369 int filewrjobfull(File*, job);
/* Port number, as a string.
 * NOTE(review): presumably the default for Server.port -- confirm. */
#define Portdef "11300"
374 struct Server {
375 char *port;
376 char *addr;
377 char *user;
379 Wal wal;
380 Socket sock;
381 Heap conns;
383 void srvserve(Server *srv);
384 void srvaccept(Server *s, int ev);
385 void srvtick(Server *s, int ev);