factor out common bsd code
[beanstalkd.git] / conn.c
blobe7b2d444db5dbb4c50a995c4e4b5720c40436691
1 /* conn.c - network connection state */
3 /* Copyright (C) 2007 Keith Rarick and Philotic Inc.
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation, either version 3 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
19 #include <stdint.h>
20 #include <stdlib.h>
21 #include <string.h>
22 #include <errno.h>
23 #include <limits.h>
24 #include <unistd.h>
25 #include "dat.h"
27 #define SAFETY_MARGIN (1000000000) /* 1 second */
29 /* Doubly-linked list of free connections. */
30 static struct conn pool = { &pool, &pool };
32 static int cur_conn_ct = 0, cur_worker_ct = 0, cur_producer_ct = 0;
33 static uint tot_conn_ct = 0;
35 static conn
36 conn_alloc()
38 conn c;
40 c = conn_remove(pool.next);
41 if (!c) {
42 c = malloc(sizeof(struct conn));
45 return memset(c, 0, sizeof *c);
48 static void
49 conn_free(conn c)
51 c->sock.fd = 0;
52 conn_insert(&pool, c);
55 static void
56 on_watch(ms a, tube t, size_t i)
58 tube_iref(t);
59 t->watching_ct++;
62 static void
63 on_ignore(ms a, tube t, size_t i)
65 t->watching_ct--;
66 tube_dref(t);
69 conn
70 make_conn(int fd, char start_state, tube use, tube watch)
72 job j;
73 conn c;
75 c = conn_alloc();
76 if (!c) return twarn("OOM"), (conn) 0;
78 ms_init(&c->watch, (ms_event_fn) on_watch, (ms_event_fn) on_ignore);
79 if (!ms_append(&c->watch, watch)) {
80 conn_free(c);
81 return twarn("OOM"), (conn) 0;
84 c->use = NULL; /* initialize */
85 TUBE_ASSIGN(c->use, use);
86 use->using_ct++;
88 c->sock.fd = fd;
89 c->state = start_state;
90 c->type = 0;
91 c->cmd_read = 0;
92 c->pending_timeout = -1;
93 c->soonest_job = NULL;
94 c->in_job = c->out_job = NULL;
95 c->in_job_read = c->out_job_sent = 0;
96 c->prev = c->next = c; /* must be out of a linked list right now */
97 j = &c->reserved_jobs;
98 j->prev = j->next = j;
100 /* stats */
101 cur_conn_ct++;
102 tot_conn_ct++;
104 return c;
107 void
108 conn_set_producer(conn c)
110 if (c->type & CONN_TYPE_PRODUCER) return;
111 c->type |= CONN_TYPE_PRODUCER;
112 cur_producer_ct++; /* stats */
115 void
116 conn_set_worker(conn c)
118 if (c->type & CONN_TYPE_WORKER) return;
119 c->type |= CONN_TYPE_WORKER;
120 cur_worker_ct++; /* stats */
124 count_cur_conns()
126 return cur_conn_ct;
129 uint
130 count_tot_conns()
132 return tot_conn_ct;
136 count_cur_producers()
138 return cur_producer_ct;
142 count_cur_workers()
144 return cur_worker_ct;
147 static int
148 has_reserved_job(conn c)
150 return job_list_any_p(&c->reserved_jobs);
154 static int64
155 conntickat(conn c)
157 int margin = 0, should_timeout = 0;
158 int64 t = INT64_MAX;
160 if (conn_waiting(c)) {
161 margin = SAFETY_MARGIN;
164 if (has_reserved_job(c)) {
165 t = soonest_job(c)->deadline_at - nanoseconds() - margin;
166 should_timeout = 1;
168 if (c->pending_timeout >= 0) {
169 t = min(t, ((int64)c->pending_timeout) * 1000000000);
170 should_timeout = 1;
173 if (should_timeout) {
174 return nanoseconds() + t;
176 return 0;
180 void
181 connwant(conn c, int rw, conn list)
183 c->rw = rw;
184 conn_insert(list, c);
185 connsched(c);
189 void
190 connsched(conn c)
192 c->tickat = conntickat(c);
193 srvschedconn(c->srv, c);
197 static int
198 conn_list_any_p(conn head)
200 return head->next != head || head->prev != head;
203 conn
204 conn_remove(conn c)
206 if (!conn_list_any_p(c)) return NULL; /* not in a doubly-linked list */
208 c->next->prev = c->prev;
209 c->prev->next = c->next;
211 c->prev = c->next = c;
212 return c;
215 void
216 conn_insert(conn head, conn c)
218 if (conn_list_any_p(c)) return; /* already in a linked list */
220 c->prev = head->prev;
221 c->next = head;
222 head->prev->next = c;
223 head->prev = c;
226 /* return the reserved job with the earliest deadline,
227 * or NULL if there's no reserved job */
229 soonest_job(conn c)
231 job j = NULL;
232 job soonest = c->soonest_job;
234 if (soonest == NULL) {
235 for (j = c->reserved_jobs.next; j != &c->reserved_jobs; j = j->next) {
236 if (j->deadline_at <= (soonest ? : j)->deadline_at) soonest = j;
239 c->soonest_job = soonest;
240 return soonest;
244 has_reserved_this_job(conn c, job j)
246 return j && j->state == JOB_STATE_RESERVED && j->reserver == c;
249 /* return true if c has a reserved job with less than one second until its
250 * deadline */
252 conn_has_close_deadline(conn c)
254 int64 t = nanoseconds();
255 job j = soonest_job(c);
257 return j && t >= j->deadline_at - SAFETY_MARGIN;
261 conn_ready(conn c)
263 size_t i;
265 for (i = 0; i < c->watch.used; i++) {
266 if (((tube) c->watch.items[i])->ready.len) return 1;
268 return 0;
273 conncmp(conn a, conn b)
275 if (a->tickat > b->tickat) return 1;
276 if (a->tickat < b->tickat) return -1;
277 return 0;
281 void
282 connrec(conn c, int i)
284 c->tickpos = i;
288 void
289 conn_close(conn c)
291 sockwant(&c->sock, 0);
292 close(c->sock.fd);
294 job_free(c->in_job);
296 /* was this a peek or stats command? */
297 if (c->out_job && !c->out_job->id) job_free(c->out_job);
299 c->in_job = c->out_job = NULL;
300 c->in_job_read = 0;
302 if (c->type & CONN_TYPE_PRODUCER) cur_producer_ct--; /* stats */
303 if (c->type & CONN_TYPE_WORKER) cur_worker_ct--; /* stats */
305 cur_conn_ct--; /* stats */
307 remove_waiting_conn(c);
308 conn_remove(c);
309 if (has_reserved_job(c)) enqueue_reserved_jobs(c);
311 ms_clear(&c->watch);
312 c->use->using_ct--;
313 TUBE_ASSIGN(c->use, NULL);
315 conn_free(c);