refactor
[beanstalkd.git] / dat.h
blob8ee840800288d1586afbad536eb41649cbb9a766
/*
 * Poison the <stdint.h> 32- and 64-bit type names: after this point each of
 * them expands to a do_not_use_* identifier, so an accidental use stands out
 * at compile time.  The rest of this header spells its sized integers int32,
 * uint32, int64 and uint64 instead.
 */
#define int32_t do_not_use_int32_t
#define uint32_t do_not_use_uint32_t
#define int64_t do_not_use_int64_t
#define uint64_t do_not_use_uint64_t
/*
 * Handle types.  Note that ms, job, tube and conn each name a POINTER to the
 * struct with the same tag: `job j` declares a `struct job *`.
 */
typedef struct ms *ms;
typedef struct job *job;
typedef struct tube *tube;
typedef struct conn *conn;

/* Heap, by contrast, is a plain alias for struct Heap (not a pointer). */
typedef struct Heap Heap;

/* Callback taking (int, short, void *); used for conn_set_evq() handlers and
 * for accept_handler. */
typedef void (*evh)(int, short, void *);

/* Callback type of the oninsert/onremove hooks stored in struct ms. */
typedef void (*ms_event_fn)(ms a, void *item, size_t i);

/* Callback types stored in struct Heap as `cmp` and `rec` respectively. */
typedef int (*Compare)(void *, void *);
typedef void (*Record)(void *, int);
/*
 * NUM_PRIMES is 48 on LP64 targets and 19 everywhere else.
 * NOTE(review): presumably the number of entries in primes[] -- confirm
 * against the file that defines that table.
 *
 * #ifdef is used rather than `#if _LP64` so that targets where the compiler
 * does not predefine _LP64 never evaluate an undefined macro (-Wundef clean).
 */
#ifdef _LP64
#define NUM_PRIMES 48
#else
#define NUM_PRIMES 19
#endif
/* Size of the name[] array in struct tube. */
#define MAX_TUBE_NAME_LEN 201

/* A command can be at most LINE_BUF_SIZE chars, including "\r\n". This value
 * MUST be enough to hold the longest possible command or reply line, which is
 * currently "USING a{200}\r\n".
 *
 * NOTE(review): that reply is exactly 6 + 200 + 2 = 208 characters, so it
 * fills a LINE_BUF_SIZE buffer with no room left for a terminating NUL.
 * struct conn declares reply_buf[LINE_BUF_SIZE] and documents it as
 * NUL-terminated; confirm how the code that formats replies handles a
 * 200-character tube name. */
#define LINE_BUF_SIZE 208
/*
 * Job state codes.
 * NOTE(review): presumably the values held in the `state` member of
 * struct job, which is listed among the fields written to the binlog -- if
 * so, renumbering these would change the on-disk format; confirm before
 * editing.
 */
#define JOB_STATE_INVALID 0
#define JOB_STATE_READY 1
#define JOB_STATE_RESERVED 2
#define JOB_STATE_BURIED 3
#define JOB_STATE_DELAYED 4
#define JOB_STATE_COPY 5
/* CONN_TYPE_* are bit masks: each value occupies its own bit, so several can
 * be OR-ed together and tested individually with `&` (see conn_waiting()). */
#define CONN_TYPE_PRODUCER 1
#define CONN_TYPE_WORKER 2
#define CONN_TYPE_WAITING 4
/* Smaller of a and b.  Each argument may be evaluated twice, so do not pass
 * expressions with side effects (e.g. min(i++, n)). */
#define min(a,b) ((a)<(b)?(a):(b))
/*
 * twarn()/twarnx() forward to warn()/warnx() with the call site's
 * "file:line in function: " prepended to the message.
 *
 * fmt MUST be a string literal: it is pasted onto the prefix by adjacent
 * string-literal concatenation.  `##__VA_ARGS__` drops the preceding comma
 * when no extra arguments are supplied, so twarnx("plain text") is valid.
 */
#define twarn(fmt, ...) warn("%s:%d in %s: " fmt, \
                             __FILE__, __LINE__, __func__, ##__VA_ARGS__)
#define twarnx(fmt, ...) warnx("%s:%d in %s: " fmt, \
                               __FILE__, __LINE__, __func__, ##__VA_ARGS__)
/*
 * dbgprintf(): printf-style output to stderr when DEBUG is defined; otherwise
 * it expands to a no-op expression and its arguments are not evaluated.
 * Both branches yield a void expression, so the macro can be used wherever an
 * expression statement is allowed.
 */
#ifdef DEBUG
#define dbgprintf(fmt, ...) ((void) fprintf(stderr, fmt, ##__VA_ARGS__))
#else
#define dbgprintf(fmt, ...) ((void) 0)
#endif
/* NOTE(review): presumably the priority cut-off behind stats.urgent_ct --
 * confirm which side of the threshold counts as urgent. */
#define URGENT_THRESHOLD 1024

/* Default job data size limit: 65535 (2^16 - 1). */
#define JOB_DATA_SIZE_LIMIT_DEFAULT ((1 << 16) - 1)
58 struct stats {
59 uint urgent_ct;
60 uint waiting_ct;
61 uint buried_ct;
62 uint reserved_ct;
63 uint pause_ct;
64 uint64 total_jobs_ct;
67 struct Heap {
68 int cap;
69 int len;
70 void **data;
71 Compare cmp;
72 Record rec;
75 struct ms {
76 size_t used, cap, last;
77 void **items;
78 ms_event_fn oninsert, onremove;
81 /* If you modify this struct, you MUST increment binlog format version in
82 * binlog.c. */
83 struct job {
85 /* persistent fields; these get written to the binlog */
86 uint64 id;
87 uint32 pri;
88 int64 delay;
89 int64 ttr;
90 int32 body_size;
91 int64 created_at;
92 int64 deadline_at;
93 uint32 reserve_ct;
94 uint32 timeout_ct;
95 uint32 release_ct;
96 uint32 bury_ct;
97 uint32 kick_ct;
98 uint8_t state;
100 /* bookeeping fields; these are in-memory only */
101 char pad[6];
102 tube tube;
103 job prev, next; /* linked list of jobs */
104 job ht_next; /* Next job in a hash table list */
105 size_t heap_index; /* where is this job in its current heap */
106 void *binlog;
107 void *reserver;
108 size_t reserved_binlog_space;
110 /* variable-size job data; written separately to the binlog */
111 char body[];
114 struct tube {
115 uint refs;
116 char name[MAX_TUBE_NAME_LEN];
117 Heap ready;
118 Heap delay;
119 struct job buried;
120 struct ms waiting; /* set of conns */
121 struct stats stat;
122 uint using_ct;
123 uint watching_ct;
124 int64 pause;
125 int64 deadline_at;
128 struct conn {
129 conn prev, next; /* linked list of connections */
130 int fd;
131 char state;
132 char type;
133 struct event evq;
134 int evmask;
135 int pending_timeout;
137 /* we cannot share this buffer with the reply line because we might read in
138 * command line data for a subsequent command, and we need to store it
139 * here. */
140 char cmd[LINE_BUF_SIZE]; /* this string is NOT NUL-terminated */
141 int cmd_len;
142 int cmd_read;
143 const char *reply;
144 int reply_len;
145 int reply_sent;
146 char reply_buf[LINE_BUF_SIZE]; /* this string IS NUL-terminated */
148 /* A job to be read from the client. */
149 job in_job;
151 /* Memoization of the soonest job */
152 job soonest_job;
154 /* How many bytes of in_job->body have been read so far. If in_job is NULL
155 * while in_job_read is nonzero, we are in bit bucket mode and
156 * in_job_read's meaning is inverted -- then it counts the bytes that
157 * remain to be thrown away. */
158 int in_job_read;
160 job out_job;
161 int out_job_sent;
162 struct job reserved_jobs; /* doubly-linked list header */
163 tube use;
164 struct ms watch;
167 void v();
169 void warn(const char *fmt, ...);
170 void warnx(const char *fmt, ...);
172 extern char *progname;
174 int64 nanoseconds();
175 void init_timeval(struct timeval *tv, int64 t);
177 void ms_init(ms a, ms_event_fn oninsert, ms_event_fn onremove);
178 void ms_clear(ms a);
179 int ms_append(ms a, void *item);
180 int ms_remove(ms a, void *item);
181 int ms_contains(ms a, void *item);
182 void *ms_take(ms a);
185 int heapinsert(Heap *h, void *x); /* return 1 on success, else 0 */
186 void* heapremove(Heap *h, int k);
189 #define make_job(pri,delay,ttr,body_size,tube) make_job_with_id(pri,delay,ttr,body_size,tube,0)
191 job allocate_job(int body_size);
192 job make_job_with_id(uint pri, int64 delay, int64 ttr,
193 int body_size, tube tube, uint64 id);
194 void job_free(job j);
196 /* Lookup a job by job ID */
197 job job_find(uint64 job_id);
199 /* the void* parameters are really job pointers */
200 void job_setheappos(void*, int);
201 int job_pri_cmp(void*, void*);
202 int job_delay_cmp(void*, void*);
204 job job_copy(job j);
206 const char * job_state(job j);
208 int job_list_any_p(job head);
209 job job_remove(job j);
210 void job_insert(job head, job j);
212 uint64 total_jobs();
214 /* for unit tests */
215 size_t get_all_jobs_used();
217 void job_init();
220 extern struct ms tubes;
222 tube make_tube(const char *name);
223 void tube_dref(tube t);
224 void tube_iref(tube t);
225 tube tube_find(const char *name);
226 tube tube_find_or_make(const char *name);
227 #define TUBE_ASSIGN(a,b) (tube_dref(a), (a) = (b), tube_iref(a))
230 conn make_conn(int fd, char start_state, tube use, tube watch);
232 int conn_set_evq(conn c, const int events, evh handler);
233 void conn_set_evmask(conn c, const int evmask, conn list);
234 int conn_update_net(conn c);
236 void conn_close(conn c);
238 conn conn_remove(conn c);
239 void conn_insert(conn head, conn c);
241 int count_cur_conns();
242 uint count_tot_conns();
243 int count_cur_producers();
244 int count_cur_workers();
246 void conn_set_producer(conn c);
247 void conn_set_worker(conn c);
249 job soonest_job(conn c);
250 int has_reserved_this_job(conn c, job j);
251 int conn_has_close_deadline(conn c);
252 int conn_ready(conn c);
/* Nonzero when the CONN_TYPE_WAITING bit is set in c->type.  The result is
 * the masked value itself, not a normalised 0/1. */
#define conn_waiting(c) ((c)->type & CONN_TYPE_WAITING)
/* NOTE(review): presumably NUM_PRIMES entries long -- confirm against the
 * file that defines the table. */
extern size_t primes[];

/* NOTE(review): presumably initialised from JOB_DATA_SIZE_LIMIT_DEFAULT --
 * confirm at the definition. */
extern size_t job_data_size_limit;
262 void prot_init();
264 conn remove_waiting_conn(conn c);
266 void enqueue_reserved_jobs(conn c);
268 void enter_drain_mode(int sig);
269 void h_accept(const int fd, const short which, struct event *ev);
270 void prot_remove_tube(tube t);
271 void prot_replay_binlog(job binlog_jobs);
274 int make_server_socket(char *host_addr, char *port);
276 void unbrake();
277 extern int listening;
278 extern evh accept_handler;
/* Binlog configuration variables. */
extern char *binlog_dir;
extern size_t binlog_size_limit;

/* 10 << 20 == 10485760, i.e. 10 MiB when read as a byte count. */
#define BINLOG_SIZE_LIMIT_DEFAULT (10 << 20)

extern int enable_fsync;
extern size_t fsync_throttle_ms;
288 void binlog_init(job binlog_jobs);
290 /* Return the number of locks acquired: either 0 or 1. */
291 int binlog_lock();
293 /* Returns the number of jobs successfully written (either 0 or 1). */
294 int binlog_write_job(job j);
295 size_t binlog_reserve_space_put(job j);
296 size_t binlog_reserve_space_update(job j);
298 void binlog_shutdown();
299 const char *binlog_oldest_index();
300 const char *binlog_current_index();