document the new binlog stats
[beanstalkd.git] / conn.c
blob809fa4f01eef21a9e14f936bcf4d70938badb438
1 #include <stdint.h>
2 #include <stdlib.h>
3 #include <string.h>
4 #include <errno.h>
5 #include <limits.h>
6 #include <unistd.h>
7 #include "dat.h"
9 #define SAFETY_MARGIN (1000000000) /* 1 second */
11 /* Doubly-linked list of free connections. */
12 static struct conn pool = { &pool, &pool };
14 static int cur_conn_ct = 0, cur_worker_ct = 0, cur_producer_ct = 0;
15 static uint tot_conn_ct = 0;
17 static conn
18 conn_alloc()
20 conn c;
22 c = conn_remove(pool.next);
23 if (!c) {
24 c = malloc(sizeof(struct conn));
27 return memset(c, 0, sizeof *c);
30 static void
31 conn_free(conn c)
33 c->sock.fd = 0;
34 conn_insert(&pool, c);
37 static void
38 on_watch(ms a, tube t, size_t i)
40 tube_iref(t);
41 t->watching_ct++;
44 static void
45 on_ignore(ms a, tube t, size_t i)
47 t->watching_ct--;
48 tube_dref(t);
51 conn
52 make_conn(int fd, char start_state, tube use, tube watch)
54 job j;
55 conn c;
57 c = conn_alloc();
58 if (!c) return twarn("OOM"), (conn) 0;
60 ms_init(&c->watch, (ms_event_fn) on_watch, (ms_event_fn) on_ignore);
61 if (!ms_append(&c->watch, watch)) {
62 conn_free(c);
63 return twarn("OOM"), (conn) 0;
66 c->use = NULL; /* initialize */
67 TUBE_ASSIGN(c->use, use);
68 use->using_ct++;
70 c->sock.fd = fd;
71 c->state = start_state;
72 c->type = 0;
73 c->cmd_read = 0;
74 c->pending_timeout = -1;
75 c->soonest_job = NULL;
76 c->in_job = c->out_job = NULL;
77 c->in_job_read = c->out_job_sent = 0;
78 c->prev = c->next = c; /* must be out of a linked list right now */
79 j = &c->reserved_jobs;
80 j->prev = j->next = j;
82 /* stats */
83 cur_conn_ct++;
84 tot_conn_ct++;
86 return c;
89 void
90 conn_set_producer(conn c)
92 if (c->type & CONN_TYPE_PRODUCER) return;
93 c->type |= CONN_TYPE_PRODUCER;
94 cur_producer_ct++; /* stats */
97 void
98 conn_set_worker(conn c)
100 if (c->type & CONN_TYPE_WORKER) return;
101 c->type |= CONN_TYPE_WORKER;
102 cur_worker_ct++; /* stats */
106 count_cur_conns()
108 return cur_conn_ct;
111 uint
112 count_tot_conns()
114 return tot_conn_ct;
118 count_cur_producers()
120 return cur_producer_ct;
124 count_cur_workers()
126 return cur_worker_ct;
129 static int
130 has_reserved_job(conn c)
132 return job_list_any_p(&c->reserved_jobs);
136 static int64
137 conntickat(conn c)
139 int margin = 0, should_timeout = 0;
140 int64 t = INT64_MAX;
142 if (conn_waiting(c)) {
143 margin = SAFETY_MARGIN;
146 if (has_reserved_job(c)) {
147 t = soonest_job(c)->r.deadline_at - nanoseconds() - margin;
148 should_timeout = 1;
150 if (c->pending_timeout >= 0) {
151 t = min(t, ((int64)c->pending_timeout) * 1000000000);
152 should_timeout = 1;
155 if (should_timeout) {
156 return nanoseconds() + t;
158 return 0;
162 void
163 connwant(conn c, int rw, conn list)
165 c->rw = rw;
166 conn_insert(list, c);
167 connsched(c);
171 void
172 connsched(conn c)
174 c->tickat = conntickat(c);
175 srvschedconn(c->srv, c);
179 static int
180 conn_list_any_p(conn head)
182 return head->next != head || head->prev != head;
185 conn
186 conn_remove(conn c)
188 if (!conn_list_any_p(c)) return NULL; /* not in a doubly-linked list */
190 c->next->prev = c->prev;
191 c->prev->next = c->next;
193 c->prev = c->next = c;
194 return c;
197 void
198 conn_insert(conn head, conn c)
200 if (conn_list_any_p(c)) return; /* already in a linked list */
202 c->prev = head->prev;
203 c->next = head;
204 head->prev->next = c;
205 head->prev = c;
208 /* return the reserved job with the earliest deadline,
209 * or NULL if there's no reserved job */
211 soonest_job(conn c)
213 job j = NULL;
214 job soonest = c->soonest_job;
216 if (soonest == NULL) {
217 for (j = c->reserved_jobs.next; j != &c->reserved_jobs; j = j->next) {
218 if (j->r.deadline_at <= (soonest ? : j)->r.deadline_at) soonest = j;
221 c->soonest_job = soonest;
222 return soonest;
226 has_reserved_this_job(conn c, job j)
228 return j && j->r.state == Reserved && j->reserver == c;
231 /* return true if c has a reserved job with less than one second until its
232 * deadline */
234 conn_has_close_deadline(conn c)
236 int64 t = nanoseconds();
237 job j = soonest_job(c);
239 return j && t >= j->r.deadline_at - SAFETY_MARGIN;
243 conn_ready(conn c)
245 size_t i;
247 for (i = 0; i < c->watch.used; i++) {
248 if (((tube) c->watch.items[i])->ready.len) return 1;
250 return 0;
255 connless(conn a, conn b)
257 return a->tickat < b->tickat;
261 void
262 connrec(conn c, int i)
264 c->tickpos = i;
268 void
269 conn_close(conn c)
271 sockwant(&c->sock, 0);
272 close(c->sock.fd);
274 job_free(c->in_job);
276 /* was this a peek or stats command? */
277 if (c->out_job && !c->out_job->r.id) job_free(c->out_job);
279 c->in_job = c->out_job = NULL;
280 c->in_job_read = 0;
282 if (c->type & CONN_TYPE_PRODUCER) cur_producer_ct--; /* stats */
283 if (c->type & CONN_TYPE_WORKER) cur_worker_ct--; /* stats */
285 cur_conn_ct--; /* stats */
287 remove_waiting_conn(c);
288 conn_remove(c);
289 if (has_reserved_job(c)) enqueue_reserved_jobs(c);
291 ms_clear(&c->watch);
292 c->use->using_ct--;
293 TUBE_ASSIGN(c->use, NULL);
295 conn_free(c);