update to newest ct; fixes #60
[beanstalkd.git] / dat.h
blobfa84c00762ed3c7415a955104a77fa2f4aacbe92
/* Short project-local aliases for the basic integer types. */
typedef unsigned char uchar;
typedef uchar         byte;
typedef unsigned int  uint;
typedef int32_t       int32;
typedef uint32_t      uint32;
typedef int64_t       int64;
typedef uint64_t      uint64;

/* Poison the <stdint.h> spellings aliased above: after this point each one
 * expands to a do_not_use_* identifier, so code is steered toward the short
 * aliases.  <stdint.h> must already have been included before this header. */
#define int32_t  do_not_use_int32_t
#define uint32_t do_not_use_uint32_t
#define int64_t  do_not_use_int64_t
#define uint64_t do_not_use_uint64_t
/* Handle types: each of these names is a POINTER to the struct with the same
 * tag, so a variable of type `job` is already a `struct job *`. */
typedef struct ms   *ms;
typedef struct job  *job;
typedef struct tube *tube;
typedef struct conn *conn;

/* These names alias the struct types themselves (not pointers). */
typedef struct Heap   Heap;
typedef struct Socket Socket;
typedef struct Srv    Srv;

/* Callback signatures. */
typedef void(*evh)(int, short, void *);
typedef void(*ms_event_fn)(ms a, void *item, size_t i); /* struct ms oninsert/onremove */
typedef void(*Handle)(void*, int);   /* type of Socket.f and of sockinit()'s tick */
typedef int(*Compare)(void*, void*); /* type of Heap.cmp */
typedef void(*Record)(void*, int);   /* type of Heap.rec */
/* NUM_PRIMES depends on the data model: 48 when _LP64 is set, else 19.
 * NOTE(review): presumably the number of entries in the `primes` table
 * declared later in this header -- confirm against its definition. */
#if _LP64
#define NUM_PRIMES 48
#else
#define NUM_PRIMES 19
#endif
/* Size of the tube.name buffer.  The longest line quoted below carries a
 * 200-character name, so this presumably means 200 characters plus a NUL. */
#define MAX_TUBE_NAME_LEN 201

/* A command can be at most LINE_BUF_SIZE chars, including "\r\n". This value
 * MUST be enough to hold the longest possible command or reply line, which is
 * currently "USING a{200}\r\n". */
#define LINE_BUF_SIZE 208
/* Job state codes. */
#define JOB_STATE_INVALID  0
#define JOB_STATE_READY    1
#define JOB_STATE_RESERVED 2
#define JOB_STATE_BURIED   3
#define JOB_STATE_DELAYED  4
#define JOB_STATE_COPY     5

/* CONN_TYPE_* are bit masks */
#define CONN_TYPE_PRODUCER 1
#define CONN_TYPE_WORKER   2
#define CONN_TYPE_WAITING  4
/* Smaller of a and b.  The selected argument is evaluated twice, so do not
 * pass expressions with side effects (e.g. min(i++, n)). */
#define min(a,b) ((a)<(b)?(a):(b))
/* twarn/twarnx forward to warn()/warnx() with "file:line in function: "
 * prepended to the caller's format string.  fmt must be a string literal,
 * because it is pasted onto the prefix by literal concatenation. */
#define twarn(fmt, args...) warn("%s:%d in %s: " fmt, \
                                 __FILE__, __LINE__, __func__, ##args)
#define twarnx(fmt, args...) warnx("%s:%d in %s: " fmt, \
                                   __FILE__, __LINE__, __func__, ##args)

/* dbgprintf prints to stderr when DEBUG is defined; otherwise it expands to
 * a no-op and its arguments are NOT evaluated. */
#ifdef DEBUG
#define dbgprintf(fmt, args...) ((void) fprintf(stderr, fmt, ##args))
#else
#define dbgprintf(fmt, ...) ((void) 0)
#endif
#define URGENT_THRESHOLD 1024

/* 65535.  Going by the name, the default for job_data_size_limit. */
#define JOB_DATA_SIZE_LIMIT_DEFAULT ((1 << 16) - 1)

/* Defined elsewhere. */
extern const char version[];
71 struct stats {
72 uint urgent_ct;
73 uint waiting_ct;
74 uint buried_ct;
75 uint reserved_ct;
76 uint pause_ct;
77 uint64 total_jobs_ct;
80 struct Heap {
81 int cap;
82 int len;
83 void **data;
84 Compare cmp;
85 Record rec;
88 struct Socket {
89 int fd;
90 Handle f;
91 void *x;
92 int added;
95 void sockinit(Handle tick, void *x, int64 ns);
96 int sockwant(Socket *s, int rw);
97 void sockmain(); // does not return
99 struct Srv {
100 Socket sock;
101 Heap conns;
104 struct ms {
105 size_t used, cap, last;
106 void **items;
107 ms_event_fn oninsert, onremove;
110 /* If you modify this struct, you MUST increment binlog format version in
111 * binlog.c. */
112 struct job {
114 /* persistent fields; these get written to the binlog */
115 uint64 id;
116 uint32 pri;
117 int64 delay;
118 int64 ttr;
119 int32 body_size;
120 int64 created_at;
121 int64 deadline_at;
122 uint32 reserve_ct;
123 uint32 timeout_ct;
124 uint32 release_ct;
125 uint32 bury_ct;
126 uint32 kick_ct;
127 uint8_t state;
129 /* bookeeping fields; these are in-memory only */
130 char pad[6];
131 tube tube;
132 job prev, next; /* linked list of jobs */
133 job ht_next; /* Next job in a hash table list */
134 size_t heap_index; /* where is this job in its current heap */
135 void *binlog;
136 void *reserver;
137 size_t reserved_binlog_space;
139 /* variable-size job data; written separately to the binlog */
140 char body[];
143 struct tube {
144 uint refs;
145 char name[MAX_TUBE_NAME_LEN];
146 Heap ready;
147 Heap delay;
148 struct job buried;
149 struct ms waiting; /* set of conns */
150 struct stats stat;
151 uint using_ct;
152 uint watching_ct;
153 int64 pause;
154 int64 deadline_at;
157 struct conn {
158 conn prev, next; /* linked list of connections */
159 Srv *srv;
160 Socket sock;
161 char state;
162 char type;
163 int rw; // currently want: 'r' or 'w'
164 int pending_timeout;
165 int64 tickat; // time at which to do more work
166 int tickpos; // position in srv->conns
168 /* we cannot share this buffer with the reply line because we might read in
169 * command line data for a subsequent command, and we need to store it
170 * here. */
171 char cmd[LINE_BUF_SIZE]; /* this string is NOT NUL-terminated */
172 int cmd_len;
173 int cmd_read;
174 const char *reply;
175 int reply_len;
176 int reply_sent;
177 char reply_buf[LINE_BUF_SIZE]; /* this string IS NUL-terminated */
179 /* A job to be read from the client. */
180 job in_job;
182 /* Memoization of the soonest job */
183 job soonest_job;
185 /* How many bytes of in_job->body have been read so far. If in_job is NULL
186 * while in_job_read is nonzero, we are in bit bucket mode and
187 * in_job_read's meaning is inverted -- then it counts the bytes that
188 * remain to be thrown away. */
189 int in_job_read;
191 job out_job;
192 int out_job_sent;
193 struct job reserved_jobs; /* doubly-linked list header */
194 tube use;
195 struct ms watch;
199 void srv(Srv *srv);
200 void srvaccept(Srv *s, int ev);
201 void srvschedconn(Srv *srv, conn c);
202 void srvtick(Srv *s, int ev);
205 void v();
207 void warn(const char *fmt, ...);
208 void warnx(const char *fmt, ...);
210 extern const char *progname;
212 int64 nanoseconds();
214 void ms_init(ms a, ms_event_fn oninsert, ms_event_fn onremove);
215 void ms_clear(ms a);
216 int ms_append(ms a, void *item);
217 int ms_remove(ms a, void *item);
218 int ms_contains(ms a, void *item);
219 void *ms_take(ms a);
222 int heapinsert(Heap *h, void *x); /* return 1 on success, else 0 */
223 void* heapremove(Heap *h, int k);
226 #define make_job(pri,delay,ttr,body_size,tube) make_job_with_id(pri,delay,ttr,body_size,tube,0)
228 job allocate_job(int body_size);
229 job make_job_with_id(uint pri, int64 delay, int64 ttr,
230 int body_size, tube tube, uint64 id);
231 void job_free(job j);
233 /* Lookup a job by job ID */
234 job job_find(uint64 job_id);
236 /* the void* parameters are really job pointers */
237 void job_setheappos(void*, int);
238 int job_pri_cmp(void*, void*);
239 int job_delay_cmp(void*, void*);
241 job job_copy(job j);
243 const char * job_state(job j);
245 int job_list_any_p(job head);
246 job job_remove(job j);
247 void job_insert(job head, job j);
249 uint64 total_jobs();
251 /* for unit tests */
252 size_t get_all_jobs_used();
255 extern struct ms tubes;
257 tube make_tube(const char *name);
258 void tube_dref(tube t);
259 void tube_iref(tube t);
260 tube tube_find(const char *name);
261 tube tube_find_or_make(const char *name);
262 #define TUBE_ASSIGN(a,b) (tube_dref(a), (a) = (b), tube_iref(a))
265 conn make_conn(int fd, char start_state, tube use, tube watch);
267 int conncmp(conn a, conn b);
268 void connrec(conn c, int i);
269 void connwant(conn c, const int mask, conn list);
270 void connsched(conn c);
272 void conn_close(conn c);
274 conn conn_remove(conn c);
275 void conn_insert(conn head, conn c);
277 int count_cur_conns();
278 uint count_tot_conns();
279 int count_cur_producers();
280 int count_cur_workers();
282 void conn_set_producer(conn c);
283 void conn_set_worker(conn c);
285 job soonest_job(conn c);
286 int has_reserved_this_job(conn c, job j);
287 int conn_has_close_deadline(conn c);
288 int conn_ready(conn c);
290 #define conn_waiting(c) ((c)->type & CONN_TYPE_WAITING)
/* Defined elsewhere.  NOTE(review): length is presumably NUM_PRIMES. */
extern size_t primes[];

extern size_t job_data_size_limit;
298 void prot_init();
299 void prottick(Srv *s);
301 conn remove_waiting_conn(conn c);
303 void enqueue_reserved_jobs(conn c);
305 void enter_drain_mode(int sig);
306 void h_accept(const int fd, const short which, Srv* srv);
307 void prot_remove_tube(tube t);
308 void prot_replay_binlog(job binlog_jobs);
/* NOTE(review): the meaning of the int result is not stated in this header. */
int make_server_socket(char *host_addr, char *port);
313 extern char *binlog_dir;
314 extern size_t binlog_size_limit;
315 #define BINLOG_SIZE_LIMIT_DEFAULT (10 << 20)
317 extern int enable_fsync;
318 extern size_t fsync_throttle_ms;
320 void binlog_init(job binlog_jobs);
322 /* Return the number of locks acquired: either 0 or 1. */
323 int binlog_lock();
325 /* Returns the number of jobs successfully written (either 0 or 1). */
326 int binlog_write_job(job j);
327 size_t binlog_reserve_space_put(job j);
328 size_t binlog_reserve_space_update(job j);
330 void binlog_shutdown();
331 const char *binlog_oldest_index();
332 const char *binlog_current_index();
/* Allocate disk space.
 * Expects fd's offset to be 0; may also reset fd's offset to 0.
 * Returns 0 on success, and a positive errno otherwise. */
int falloc(int fd, int len);