1 typedef unsigned char uchar
;
3 typedef unsigned int uint
;
5 typedef uint32_t uint32
;
7 typedef uint64_t uint64
;
9 #define int8_t do_not_use_int8_t
10 #define uint8_t do_not_use_uint8_t
11 #define int32_t do_not_use_int32_t
12 #define uint32_t do_not_use_uint32_t
13 #define int64_t do_not_use_int64_t
14 #define uint64_t do_not_use_uint64_t
16 typedef struct ms
*ms
;
17 typedef struct job
*job
;
18 typedef struct tube
*tube
;
19 typedef struct conn
*conn
;
20 typedef struct Heap Heap
;
21 typedef struct Jobrec Jobrec
;
22 typedef struct File File
;
23 typedef struct Socket Socket
;
24 typedef struct Srv Srv
;
25 typedef struct Wal Wal
;
27 typedef void(*evh
)(int, short, void *);
28 typedef void(*ms_event_fn
)(ms a
, void *item
, size_t i
);
29 typedef void(*Handle
)(void*, int);
30 typedef int(*Less
)(void*, void*);
31 typedef void(*Record
)(void*, int);
39 #define MAX_TUBE_NAME_LEN 201
41 /* A command can be at most LINE_BUF_SIZE chars, including "\r\n". This value
42 * MUST be enough to hold the longest possible command or reply line, which is
43 * currently "USING a{200}\r\n". */
44 #define LINE_BUF_SIZE 208
46 /* CONN_TYPE_* are bit masks */
47 #define CONN_TYPE_PRODUCER 1
48 #define CONN_TYPE_WORKER 2
49 #define CONN_TYPE_WAITING 4
51 #define min(a,b) ((a)<(b)?(a):(b))
53 #define twarn(fmt, args...) warn("%s:%d in %s: " fmt, \
54 __FILE__, __LINE__, __func__, ##args)
55 #define twarnx(fmt, args...) warnx("%s:%d in %s: " fmt, \
56 __FILE__, __LINE__, __func__, ##args)
59 #define dbgprintf(fmt, args...) ((void) fprintf(stderr, fmt, ##args))
61 #define dbgprintf(fmt, ...) ((void) 0)
64 #define URGENT_THRESHOLD 1024
65 #define JOB_DATA_SIZE_LIMIT_DEFAULT ((1 << 16) - 1)
67 extern const char version
[];
86 int heapinsert(Heap
*h
, void *x
);
87 void* heapremove(Heap
*h
, int k
);
97 void sockinit(Handle tick
, void *x
, int64 ns
);
98 int sockwant(Socket
*s
, int rw
);
99 void sockmain(void); // does not return
102 size_t used
, cap
, last
;
104 ms_event_fn oninsert
, onremove
;
122 // if you modify this struct, you must increment Walver above
140 Jobrec r
; // persistent fields; these get written to the wal
142 /* bookeeping fields; these are in-memory only */
145 job prev
, next
; /* linked list of jobs */
146 job ht_next
; /* Next job in a hash table list */
147 size_t heap_index
; /* where is this job in its current heap */
155 char body
[]; // written separately to the wal
160 char name
[MAX_TUBE_NAME_LEN
];
163 struct ms waiting
; /* set of conns */
173 conn prev
, next
; /* linked list of connections */
178 int rw
; // currently want: 'r' or 'w'
180 int64 tickat
; // time at which to do more work
181 int tickpos
; // position in srv->conns
183 /* we cannot share this buffer with the reply line because we might read in
184 * command line data for a subsequent command, and we need to store it
186 char cmd
[LINE_BUF_SIZE
]; /* this string is NOT NUL-terminated */
192 char reply_buf
[LINE_BUF_SIZE
]; /* this string IS NUL-terminated */
194 /* A job to be read from the client. */
197 /* Memoization of the soonest job */
200 /* How many bytes of in_job->body have been read so far. If in_job is NULL
201 * while in_job_read is nonzero, we are in bit bucket mode and
202 * in_job_read's meaning is inverted -- then it counts the bytes that
203 * remain to be thrown away. */
210 struct job reserved_jobs
; /* doubly-linked list header */
216 void warn(const char *fmt
, ...);
217 void warnx(const char *fmt
, ...);
218 char* fmtalloc(char *fmt
, ...);
220 #define new(T) zalloc(sizeof(T))
222 extern const char *progname
;
224 int64
nanoseconds(void);
225 int falloc(int fd
, int len
);
228 void ms_init(ms a
, ms_event_fn oninsert
, ms_event_fn onremove
);
230 int ms_append(ms a
, void *item
);
231 int ms_remove(ms a
, void *item
);
232 int ms_contains(ms a
, void *item
);
236 #define make_job(pri,delay,ttr,body_size,tube) make_job_with_id(pri,delay,ttr,body_size,tube,0)
238 job
allocate_job(int body_size
);
239 job
make_job_with_id(uint pri
, int64 delay
, int64 ttr
,
240 int body_size
, tube tube
, uint64 id
);
241 void job_free(job j
);
243 /* Lookup a job by job ID */
244 job
job_find(uint64 job_id
);
246 /* the void* parameters are really job pointers */
247 void job_setheappos(void*, int);
248 int job_pri_less(void*, void*);
249 int job_delay_less(void*, void*);
253 const char * job_state(job j
);
255 int job_list_any_p(job head
);
256 job
job_remove(job j
);
257 void job_insert(job head
, job j
);
259 uint64
total_jobs(void);
262 size_t get_all_jobs_used(void);
265 extern struct ms tubes
;
267 tube
make_tube(const char *name
);
268 void tube_dref(tube t
);
269 void tube_iref(tube t
);
270 tube
tube_find(const char *name
);
271 tube
tube_find_or_make(const char *name
);
272 #define TUBE_ASSIGN(a,b) (tube_dref(a), (a) = (b), tube_iref(a))
275 conn
make_conn(int fd
, char start_state
, tube use
, tube watch
);
277 int connless(conn a
, conn b
);
278 void connrec(conn c
, int i
);
279 void connwant(conn c
, const int mask
, conn list
);
280 void connsched(conn c
);
282 void conn_close(conn c
);
284 conn
conn_remove(conn c
);
285 void conn_insert(conn head
, conn c
);
287 int count_cur_conns(void);
288 uint
count_tot_conns(void);
289 int count_cur_producers(void);
290 int count_cur_workers(void);
292 void conn_set_producer(conn c
);
293 void conn_set_worker(conn c
);
295 job
soonest_job(conn c
);
296 int has_reserved_this_job(conn c
, job j
);
297 int conn_has_close_deadline(conn c
);
298 int conn_ready(conn c
);
300 #define conn_waiting(c) ((c)->type & CONN_TYPE_WAITING)
303 extern size_t primes
[];
306 extern size_t job_data_size_limit
;
308 void prot_init(void);
309 void prottick(Srv
*s
);
311 conn
remove_waiting_conn(conn c
);
313 void enqueue_reserved_jobs(conn c
);
315 void enter_drain_mode(int sig
);
316 void h_accept(const int fd
, const short which
, Srv
* srv
);
317 void prot_remove_tube(tube t
);
318 void prot_replay(Srv
*s
, job list
);
321 int make_server_socket(char *host_addr
, char *port
);
326 Filesizedef
= (10 << 20)
338 int resv
; // bytes reserved
339 int alive
; // bytes in use
340 int64 nmig
; // migrations
341 int64 nrec
; // records written ever
345 int nocomp
; // disable binlog compaction?
347 int waldirlock(Wal
*);
348 void walinit(Wal
*, job list
);
349 int walwrite(Wal
*, job
);
351 int walresvput(Wal
*, job
);
352 int walresvupdate(Wal
*, job
);
360 int iswopen
; // is open for writing
367 struct job jlist
; // jobs written in this file
369 int fileinit(File
*, Wal
*, int);
370 Wal
* fileadd(File
*, Wal
*);
371 void fileincref(File
*);
372 void filedecref(File
*);
373 void fileaddjob(File
*, job
);
374 void filermjob(File
*, job
);
375 int fileread(File
*, job list
);
376 void filewopen(File
*);
377 void filewclose(File
*);
378 int filewrjobshort(File
*, job
);
379 int filewrjobfull(File
*, job
);
388 void srvaccept(Srv
*s
, int ev
);
389 void srvschedconn(Srv
*srv
, conn c
);
390 void srvtick(Srv
*s
, int ev
);