simplify conn allocation
[beanstalkd.git] / conn.c
blob7934736bc425a40546234196c5e227e1feb28675
1 #include <stdint.h>
2 #include <stdlib.h>
3 #include <string.h>
4 #include <errno.h>
5 #include <limits.h>
6 #include <unistd.h>
7 #include "dat.h"
9 #define SAFETY_MARGIN (1000000000) /* 1 second */
11 static int cur_conn_ct = 0, cur_worker_ct = 0, cur_producer_ct = 0;
12 static uint tot_conn_ct = 0;
14 static void
15 on_watch(ms a, tube t, size_t i)
17 tube_iref(t);
18 t->watching_ct++;
21 static void
22 on_ignore(ms a, tube t, size_t i)
24 t->watching_ct--;
25 tube_dref(t);
28 conn
29 make_conn(int fd, char start_state, tube use, tube watch)
31 job j;
32 conn c;
34 c = new(struct conn);
35 if (!c) return twarn("OOM"), (conn) 0;
37 ms_init(&c->watch, (ms_event_fn) on_watch, (ms_event_fn) on_ignore);
38 if (!ms_append(&c->watch, watch)) {
39 free(c);
40 return twarn("OOM"), (conn) 0;
43 c->use = NULL; /* initialize */
44 TUBE_ASSIGN(c->use, use);
45 use->using_ct++;
47 c->sock.fd = fd;
48 c->state = start_state;
49 c->type = 0;
50 c->cmd_read = 0;
51 c->pending_timeout = -1;
52 c->soonest_job = NULL;
53 c->in_job = c->out_job = NULL;
54 c->in_job_read = c->out_job_sent = 0;
55 c->prev = c->next = c; /* must be out of a linked list right now */
56 j = &c->reserved_jobs;
57 j->prev = j->next = j;
59 /* stats */
60 cur_conn_ct++;
61 tot_conn_ct++;
63 return c;
66 void
67 conn_set_producer(conn c)
69 if (c->type & CONN_TYPE_PRODUCER) return;
70 c->type |= CONN_TYPE_PRODUCER;
71 cur_producer_ct++; /* stats */
74 void
75 conn_set_worker(conn c)
77 if (c->type & CONN_TYPE_WORKER) return;
78 c->type |= CONN_TYPE_WORKER;
79 cur_worker_ct++; /* stats */
82 int
83 count_cur_conns()
85 return cur_conn_ct;
88 uint
89 count_tot_conns()
91 return tot_conn_ct;
94 int
95 count_cur_producers()
97 return cur_producer_ct;
101 count_cur_workers()
103 return cur_worker_ct;
106 static int
107 has_reserved_job(conn c)
109 return job_list_any_p(&c->reserved_jobs);
113 static int64
114 conntickat(conn c)
116 int margin = 0, should_timeout = 0;
117 int64 t = INT64_MAX;
119 if (conn_waiting(c)) {
120 margin = SAFETY_MARGIN;
123 if (has_reserved_job(c)) {
124 t = soonest_job(c)->r.deadline_at - nanoseconds() - margin;
125 should_timeout = 1;
127 if (c->pending_timeout >= 0) {
128 t = min(t, ((int64)c->pending_timeout) * 1000000000);
129 should_timeout = 1;
132 if (should_timeout) {
133 return nanoseconds() + t;
135 return 0;
139 void
140 connwant(conn c, int rw, conn list)
142 c->rw = rw;
143 conn_insert(list, c);
144 connsched(c);
148 void
149 connsched(conn c)
151 c->tickat = conntickat(c);
152 srvschedconn(c->srv, c);
156 static int
157 conn_list_any_p(conn head)
159 return head->next != head || head->prev != head;
162 conn
163 conn_remove(conn c)
165 if (!conn_list_any_p(c)) return NULL; /* not in a doubly-linked list */
167 c->next->prev = c->prev;
168 c->prev->next = c->next;
170 c->prev = c->next = c;
171 return c;
174 void
175 conn_insert(conn head, conn c)
177 if (conn_list_any_p(c)) return; /* already in a linked list */
179 c->prev = head->prev;
180 c->next = head;
181 head->prev->next = c;
182 head->prev = c;
185 /* return the reserved job with the earliest deadline,
186 * or NULL if there's no reserved job */
188 soonest_job(conn c)
190 job j = NULL;
191 job soonest = c->soonest_job;
193 if (soonest == NULL) {
194 for (j = c->reserved_jobs.next; j != &c->reserved_jobs; j = j->next) {
195 if (j->r.deadline_at <= (soonest ? : j)->r.deadline_at) soonest = j;
198 c->soonest_job = soonest;
199 return soonest;
203 has_reserved_this_job(conn c, job j)
205 return j && j->r.state == Reserved && j->reserver == c;
208 /* return true if c has a reserved job with less than one second until its
209 * deadline */
211 conn_has_close_deadline(conn c)
213 int64 t = nanoseconds();
214 job j = soonest_job(c);
216 return j && t >= j->r.deadline_at - SAFETY_MARGIN;
220 conn_ready(conn c)
222 size_t i;
224 for (i = 0; i < c->watch.used; i++) {
225 if (((tube) c->watch.items[i])->ready.len) return 1;
227 return 0;
232 connless(conn a, conn b)
234 return a->tickat < b->tickat;
238 void
239 connrec(conn c, int i)
241 c->tickpos = i;
245 void
246 conn_close(conn c)
248 sockwant(&c->sock, 0);
249 close(c->sock.fd);
251 job_free(c->in_job);
253 /* was this a peek or stats command? */
254 if (c->out_job && !c->out_job->r.id) job_free(c->out_job);
256 c->in_job = c->out_job = NULL;
257 c->in_job_read = 0;
259 if (c->type & CONN_TYPE_PRODUCER) cur_producer_ct--; /* stats */
260 if (c->type & CONN_TYPE_WORKER) cur_worker_ct--; /* stats */
262 cur_conn_ct--; /* stats */
264 remove_waiting_conn(c);
265 conn_remove(c);
266 if (has_reserved_job(c)) enqueue_reserved_jobs(c);
268 ms_clear(&c->watch);
269 c->use->using_ct--;
270 TUBE_ASSIGN(c->use, NULL);
272 free(c);