5 typedef unsigned char uchar
;
7 typedef unsigned int uint
;
9 typedef uint32_t uint32
;
10 typedef int64_t int64
;
11 typedef uint64_t uint64
;
13 #define int8_t do_not_use_int8_t
14 #define uint8_t do_not_use_uint8_t
15 #define int32_t do_not_use_int32_t
16 #define uint32_t do_not_use_uint32_t
17 #define int64_t do_not_use_int64_t
18 #define uint64_t do_not_use_uint64_t
20 typedef struct ms
*ms
;
21 typedef struct job
*job
;
22 typedef struct tube
*tube
;
23 typedef struct Conn Conn
;
24 typedef struct Heap Heap
;
25 typedef struct Jobrec Jobrec
;
26 typedef struct File File
;
27 typedef struct Socket Socket
;
28 typedef struct Server Server
;
29 typedef struct Wal Wal
;
31 typedef void(*ms_event_fn
)(ms a
, void *item
, size_t i
);
32 typedef void(*Handle
)(void*, int rw
);
33 typedef int(*Less
)(void*, void*);
34 typedef void(*Record
)(void*, int);
35 typedef int(FAlloc
)(int, int);
43 #define MAX_TUBE_NAME_LEN 201
45 /* A command can be at most LINE_BUF_SIZE chars, including "\r\n". This value
46 * MUST be enough to hold the longest possible command ("pause-tube a{200} 4294967295\r\n")
47 * or reply line ("USING a{200}\r\n"). */
48 #define LINE_BUF_SIZE 224
50 /* CONN_TYPE_* are bit masks */
51 #define CONN_TYPE_PRODUCER 1
52 #define CONN_TYPE_WORKER 2
53 #define CONN_TYPE_WAITING 4
55 #define min(a,b) ((a)<(b)?(a):(b))
57 #define URGENT_THRESHOLD 1024
58 #define JOB_DATA_SIZE_LIMIT_DEFAULT ((1 << 16) - 1)
60 extern const char version
[];
62 extern struct Server srv
;
64 // Replaced by tests to simulate failures.
65 extern FAlloc
*falloc
;
73 uint64 total_delete_ct
;
85 int heapinsert(Heap
*h
, void *x
);
86 void* heapremove(Heap
*h
, int k
);
96 int sockwant(Socket
*, int);
97 int socknext(Socket
**, int64
);
100 size_t used
, cap
, last
;
102 ms_event_fn oninsert
, onremove
;
120 // if you modify this struct, you must increment Walver above
138 Jobrec r
; // persistent fields; these get written to the wal
140 /* bookeeping fields; these are in-memory only */
143 job prev
, next
; /* linked list of jobs */
144 job ht_next
; /* Next job in a hash table list */
145 size_t heap_index
; /* where is this job in its current heap */
153 char body
[]; // written separately to the wal
158 char name
[MAX_TUBE_NAME_LEN
];
161 struct ms waiting
; /* set of conns */
171 #define twarn(fmt, args...) warn("%s:%d in %s: " fmt, \
172 __FILE__, __LINE__, __func__, ##args)
173 #define twarnx(fmt, args...) warnx("%s:%d in %s: " fmt, \
174 __FILE__, __LINE__, __func__, ##args)
176 void warn(const char *fmt
, ...) __attribute__((format(printf
, 1, 2)));
177 void warnx(const char *fmt
, ...) __attribute__((format(printf
, 1, 2)));
178 char* fmtalloc(char *fmt
, ...) __attribute__((format(printf
, 1, 2)));
180 #define new(T) zalloc(sizeof(T))
181 void optparse(Server
*, char**);
183 extern const char *progname
;
185 int64
nanoseconds(void);
186 int rawfalloc(int fd
, int len
);
189 void ms_init(ms a
, ms_event_fn oninsert
, ms_event_fn onremove
);
191 int ms_append(ms a
, void *item
);
192 int ms_remove(ms a
, void *item
);
193 int ms_contains(ms a
, void *item
);
197 #define make_job(pri,delay,ttr,body_size,tube) make_job_with_id(pri,delay,ttr,body_size,tube,0)
199 job
allocate_job(int body_size
);
200 job
make_job_with_id(uint pri
, int64 delay
, int64 ttr
,
201 int body_size
, tube tube
, uint64 id
);
202 void job_free(job j
);
204 /* Lookup a job by job ID */
205 job
job_find(uint64 job_id
);
207 /* the void* parameters are really job pointers */
208 void job_setheappos(void*, int);
209 int job_pri_less(void*, void*);
210 int job_delay_less(void*, void*);
214 const char * job_state(job j
);
216 int job_list_any_p(job head
);
217 job
job_remove(job j
);
218 void job_insert(job head
, job j
);
220 uint64
total_jobs(void);
223 size_t get_all_jobs_used(void);
226 extern struct ms tubes
;
228 tube
make_tube(const char *name
);
229 void tube_dref(tube t
);
230 void tube_iref(tube t
);
231 tube
tube_find(const char *name
);
232 tube
tube_find_or_make(const char *name
);
233 #define TUBE_ASSIGN(a,b) (tube_dref(a), (a) = (b), tube_iref(a))
236 Conn
*make_conn(int fd
, char start_state
, tube use
, tube watch
);
238 int count_cur_conns(void);
239 uint
count_tot_conns(void);
240 int count_cur_producers(void);
241 int count_cur_workers(void);
244 extern size_t primes
[];
247 extern size_t job_data_size_limit
;
249 void prot_init(void);
250 int64
prottick(Server
*s
);
252 Conn
*remove_waiting_conn(Conn
*c
);
254 void enqueue_reserved_jobs(Conn
*c
);
256 void enter_drain_mode(int sig
);
257 void h_accept(const int fd
, const short which
, Server
* srv
);
258 void prot_remove_tube(tube t
);
259 int prot_replay(Server
*s
, job list
);
262 int make_server_socket(char *host_addr
, char *port
);
272 int64 tickat
; // time at which to do more work
273 int tickpos
; // position in srv->conns
274 job soonest_job
; // memoization of the soonest job
275 int rw
; // currently want: 'r', 'w', or 'h'
279 char cmd
[LINE_BUF_SIZE
]; // this string is NOT NUL-terminated
286 char reply_buf
[LINE_BUF_SIZE
]; // this string IS NUL-terminated
288 // How many bytes of in_job->body have been read so far. If in_job is NULL
289 // while in_job_read is nonzero, we are in bit bucket mode and
290 // in_job_read's meaning is inverted -- then it counts the bytes that
291 // remain to be thrown away.
293 job in_job
; // a job to be read from the client
299 struct job reserved_jobs
; // linked list header
301 int connless(Conn
*a
, Conn
*b
);
302 void connrec(Conn
*c
, int i
);
303 void connwant(Conn
*c
, int rw
);
304 void connsched(Conn
*c
);
305 void connclose(Conn
*c
);
306 void connsetproducer(Conn
*c
);
307 void connsetworker(Conn
*c
);
308 job
connsoonestjob(Conn
*c
);
309 int conndeadlinesoon(Conn
*c
);
310 int conn_ready(Conn
*c
);
311 #define conn_waiting(c) ((c)->type & CONN_TYPE_WAITING)
318 Filesizedef
= (10 << 20)
330 int64 resv
; // bytes reserved
331 int64 alive
; // bytes in use
332 int64 nmig
; // migrations
333 int64 nrec
; // records written ever
337 int nocomp
; // disable binlog compaction?
339 int waldirlock(Wal
*);
340 void walinit(Wal
*, job list
);
341 int walwrite(Wal
*, job
);
343 int walresvput(Wal
*, job
);
344 int walresvupdate(Wal
*, job
);
352 int iswopen
; // is open for writing
359 struct job jlist
; // jobs written in this file
361 int fileinit(File
*, Wal
*, int);
362 Wal
* fileadd(File
*, Wal
*);
363 void fileincref(File
*);
364 void filedecref(File
*);
365 void fileaddjob(File
*, job
);
366 void filermjob(File
*, job
);
367 int fileread(File
*, job list
);
368 void filewopen(File
*);
369 void filewclose(File
*);
370 int filewrjobshort(File
*, job
);
371 int filewrjobfull(File
*, job
);
374 #define Portdef "11300"
385 void srvserve(Server
*srv
);
386 void srvaccept(Server
*s
, int ev
);