1 #define int32_t do_not_use_int32_t
2 #define uint32_t do_not_use_uint32_t
3 #define int64_t do_not_use_int64_t
4 #define uint64_t do_not_use_uint64_t
7 typedef struct job
*job
;
8 typedef struct tube
*tube
;
9 typedef struct conn
*conn
;
10 typedef struct Heap Heap
;
12 typedef void(*evh
)(int, short, void *);
13 typedef void(*ms_event_fn
)(ms a
, void *item
, size_t i
);
14 typedef int(*Compare
)(void*, void*);
15 typedef void(*Record
)(void*, int);
23 #define MAX_TUBE_NAME_LEN 201
25 /* A command can be at most LINE_BUF_SIZE chars, including "\r\n". This value
26 * MUST be enough to hold the longest possible command or reply line, which is
27 * currently "USING a{200}\r\n". */
28 #define LINE_BUF_SIZE 208
30 #define JOB_STATE_INVALID 0
31 #define JOB_STATE_READY 1
32 #define JOB_STATE_RESERVED 2
33 #define JOB_STATE_BURIED 3
34 #define JOB_STATE_DELAYED 4
35 #define JOB_STATE_COPY 5
37 /* CONN_TYPE_* are bit masks */
38 #define CONN_TYPE_PRODUCER 1
39 #define CONN_TYPE_WORKER 2
40 #define CONN_TYPE_WAITING 4
42 #define min(a,b) ((a)<(b)?(a):(b))
44 #define twarn(fmt, args...) warn("%s:%d in %s: " fmt, \
45 __FILE__, __LINE__, __func__, ##args)
46 #define twarnx(fmt, args...) warnx("%s:%d in %s: " fmt, \
47 __FILE__, __LINE__, __func__, ##args)
50 #define dbgprintf(fmt, args...) ((void) fprintf(stderr, fmt, ##args))
52 #define dbgprintf(fmt, ...) ((void) 0)
55 #define URGENT_THRESHOLD 1024
56 #define JOB_DATA_SIZE_LIMIT_DEFAULT ((1 << 16) - 1)
76 size_t used
, cap
, last
;
78 ms_event_fn oninsert
, onremove
;
81 /* If you modify this struct, you MUST increment binlog format version in
85 /* persistent fields; these get written to the binlog */
100 /* bookeeping fields; these are in-memory only */
103 job prev
, next
; /* linked list of jobs */
104 job ht_next
; /* Next job in a hash table list */
105 size_t heap_index
; /* where is this job in its current heap */
108 size_t reserved_binlog_space
;
110 /* variable-size job data; written separately to the binlog */
116 char name
[MAX_TUBE_NAME_LEN
];
120 struct ms waiting
; /* set of conns */
129 conn prev
, next
; /* linked list of connections */
137 /* we cannot share this buffer with the reply line because we might read in
138 * command line data for a subsequent command, and we need to store it
140 char cmd
[LINE_BUF_SIZE
]; /* this string is NOT NUL-terminated */
146 char reply_buf
[LINE_BUF_SIZE
]; /* this string IS NUL-terminated */
148 /* A job to be read from the client. */
151 /* Memoization of the soonest job */
154 /* How many bytes of in_job->body have been read so far. If in_job is NULL
155 * while in_job_read is nonzero, we are in bit bucket mode and
156 * in_job_read's meaning is inverted -- then it counts the bytes that
157 * remain to be thrown away. */
162 struct job reserved_jobs
; /* doubly-linked list header */
169 void warn(const char *fmt
, ...);
170 void warnx(const char *fmt
, ...);
172 extern char *progname
;
175 void init_timeval(struct timeval
*tv
, int64 t
);
177 void ms_init(ms a
, ms_event_fn oninsert
, ms_event_fn onremove
);
179 int ms_append(ms a
, void *item
);
180 int ms_remove(ms a
, void *item
);
181 int ms_contains(ms a
, void *item
);
185 int heapinsert(Heap
*h
, void *x
); /* return 1 on success, else 0 */
186 void* heapremove(Heap
*h
, int k
);
189 #define make_job(pri,delay,ttr,body_size,tube) make_job_with_id(pri,delay,ttr,body_size,tube,0)
191 job
allocate_job(int body_size
);
192 job
make_job_with_id(uint pri
, int64 delay
, int64 ttr
,
193 int body_size
, tube tube
, uint64 id
);
194 void job_free(job j
);
196 /* Lookup a job by job ID */
197 job
job_find(uint64 job_id
);
199 /* the void* parameters are really job pointers */
200 void job_setheappos(void*, int);
201 int job_pri_cmp(void*, void*);
202 int job_delay_cmp(void*, void*);
206 const char * job_state(job j
);
208 int job_list_any_p(job head
);
209 job
job_remove(job j
);
210 void job_insert(job head
, job j
);
215 size_t get_all_jobs_used();
220 extern struct ms tubes
;
222 tube
make_tube(const char *name
);
223 void tube_dref(tube t
);
224 void tube_iref(tube t
);
225 tube
tube_find(const char *name
);
226 tube
tube_find_or_make(const char *name
);
227 #define TUBE_ASSIGN(a,b) (tube_dref(a), (a) = (b), tube_iref(a))
230 conn
make_conn(int fd
, char start_state
, tube use
, tube watch
);
232 int conn_set_evq(conn c
, const int events
, evh handler
);
233 void conn_set_evmask(conn c
, const int evmask
, conn list
);
234 int conn_update_net(conn c
);
236 void conn_close(conn c
);
238 conn
conn_remove(conn c
);
239 void conn_insert(conn head
, conn c
);
241 int count_cur_conns();
242 uint
count_tot_conns();
243 int count_cur_producers();
244 int count_cur_workers();
246 void conn_set_producer(conn c
);
247 void conn_set_worker(conn c
);
249 job
soonest_job(conn c
);
250 int has_reserved_this_job(conn c
, job j
);
251 int conn_has_close_deadline(conn c
);
252 int conn_ready(conn c
);
254 #define conn_waiting(c) ((c)->type & CONN_TYPE_WAITING)
257 extern size_t primes
[];
260 extern size_t job_data_size_limit
;
264 conn
remove_waiting_conn(conn c
);
266 void enqueue_reserved_jobs(conn c
);
268 void enter_drain_mode(int sig
);
269 void h_accept(const int fd
, const short which
, struct event
*ev
);
270 void prot_remove_tube(tube t
);
271 void prot_replay_binlog(job binlog_jobs
);
274 int make_server_socket(char *host_addr
, char *port
);
277 extern int listening
;
278 extern evh accept_handler
;
281 extern char *binlog_dir
;
282 extern size_t binlog_size_limit
;
283 #define BINLOG_SIZE_LIMIT_DEFAULT (10 << 20)
285 extern int enable_fsync
;
286 extern size_t fsync_throttle_ms
;
288 void binlog_init(job binlog_jobs
);
290 /* Return the number of locks acquired: either 0 or 1. */
293 /* Returns the number of jobs successfully written (either 0 or 1). */
294 int binlog_write_job(job j
);
295 size_t binlog_reserve_space_put(job j
);
296 size_t binlog_reserve_space_update(job j
);
298 void binlog_shutdown();
299 const char *binlog_oldest_index();
300 const char *binlog_current_index();