More descriptive error message.
[beanstalkd.git] / conn.c
blob32f5f169122fb8bc071b3cf00a6f327b76f6157d
1 /* conn.c - network connection state */
3 /* Copyright (C) 2007 Keith Rarick and Philotic Inc.
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation, either version 3 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
19 #include <stdlib.h>
20 #include <stdio.h>
21 #include <errno.h>
22 #include <limits.h>
24 #include "conn.h"
25 #include "net.h"
26 #include "util.h"
27 #include "prot.h"
29 #define SAFETY_MARGIN (1 * SECOND)
31 /* Doubly-linked list of free connections. */
32 static struct conn pool = { &pool, &pool, 0 };
34 static int cur_conn_ct = 0, cur_worker_ct = 0, cur_producer_ct = 0;
35 static unsigned int tot_conn_ct = 0;
37 static conn
38 conn_alloc()
40 return conn_remove(pool.next) ? : malloc(sizeof(struct conn));
43 static void
44 conn_free(conn c)
46 c->fd = 0;
47 conn_insert(&pool, c);
50 static void
51 on_watch(ms a, tube t, size_t i)
53 tube_iref(t);
54 t->watching_ct++;
57 static void
58 on_ignore(ms a, tube t, size_t i)
60 t->watching_ct--;
61 tube_dref(t);
64 conn
65 make_conn(int fd, char start_state, tube use, tube watch)
67 job j;
68 conn c;
70 c = conn_alloc();
71 if (!c) return twarn("OOM"), (conn) 0;
73 ms_init(&c->watch, (ms_event_fn) on_watch, (ms_event_fn) on_ignore);
74 if (!ms_append(&c->watch, watch)) {
75 conn_free(c);
76 return twarn("OOM"), (conn) 0;
79 c->use = NULL; /* initialize */
80 TUBE_ASSIGN(c->use, use);
81 use->using_ct++;
83 c->fd = fd;
84 c->state = start_state;
85 c->type = 0;
86 c->cmd_read = 0;
87 c->pending_timeout = -1;
88 c->soonest_job = NULL;
89 c->in_job = c->out_job = NULL;
90 c->in_job_read = c->out_job_sent = 0;
91 c->prev = c->next = c; /* must be out of a linked list right now */
92 j = &c->reserved_jobs;
93 j->prev = j->next = j;
95 /* stats */
96 cur_conn_ct++;
97 tot_conn_ct++;
99 return c;
102 void
103 conn_set_producer(conn c)
105 if (c->type & CONN_TYPE_PRODUCER) return;
106 c->type |= CONN_TYPE_PRODUCER;
107 cur_producer_ct++; /* stats */
110 void
111 conn_set_worker(conn c)
113 if (c->type & CONN_TYPE_WORKER) return;
114 c->type |= CONN_TYPE_WORKER;
115 cur_worker_ct++; /* stats */
119 count_cur_conns()
121 return cur_conn_ct;
124 unsigned int
125 count_tot_conns()
127 return tot_conn_ct;
131 count_cur_producers()
133 return cur_producer_ct;
137 count_cur_workers()
139 return cur_worker_ct;
142 static int
143 has_reserved_job(conn c)
145 return job_list_any_p(&c->reserved_jobs);
149 conn_set_evq(conn c, const int events, evh handler)
151 int r, margin = 0, should_timeout = 0;
152 struct timeval tv = {INT_MAX, 0};
153 usec t = UINT64_MAX;
155 event_set(&c->evq, c->fd, events, handler, c);
157 if (conn_waiting(c)) margin = SAFETY_MARGIN;
158 if (has_reserved_job(c)) {
159 t = soonest_job(c)->deadline_at - now_usec() - margin;
160 should_timeout = 1;
162 if (c->pending_timeout >= 0) {
163 t = min(t, c->pending_timeout * SECOND);
164 should_timeout = 1;
166 if (should_timeout) timeval_from_usec(&tv, t);
168 r = event_add(&c->evq, should_timeout ? &tv : NULL);
169 if (r == -1) return twarn("event_add() err %d", errno), -1;
171 return 0;
175 conn_update_evq(conn c, const int events)
177 int r;
179 if (!c) return twarnx("c is NULL"), -1;
181 /* If it's been added, try to delete it first */
182 if (c->evq.ev_base) {
183 r = event_del(&c->evq);
184 if (r == -1) return twarn("event_del() err %d", errno), -1;
187 return conn_set_evq(c, events, c->evq.ev_callback);
190 static int
191 conn_list_any_p(conn head)
193 return head->next != head || head->prev != head;
196 conn
197 conn_remove(conn c)
199 if (!conn_list_any_p(c)) return NULL; /* not in a doubly-linked list */
201 c->next->prev = c->prev;
202 c->prev->next = c->next;
204 c->prev = c->next = c;
205 return c;
208 void
209 conn_insert(conn head, conn c)
211 if (conn_list_any_p(c)) return; /* already in a linked list */
213 c->prev = head->prev;
214 c->next = head;
215 head->prev->next = c;
216 head->prev = c;
219 /* return the reserved job with the earliest deadline,
220 * or NULL if there's no reserved job */
222 soonest_job(conn c)
224 job j = NULL;
225 job soonest = c->soonest_job;
227 if (soonest == NULL) {
228 for (j = c->reserved_jobs.next; j != &c->reserved_jobs; j = j->next) {
229 if (j->deadline_at <= (soonest ? : j)->deadline_at) soonest = j;
232 c->soonest_job = soonest;
233 return soonest;
237 has_reserved_this_job(conn c, job j)
239 return j && j->state == JOB_STATE_RESERVED && j->reserver == c;
242 /* return true if c has a reserved job with less than one second until its
243 * deadline */
245 conn_has_close_deadline(conn c)
247 usec t = now_usec();
248 job j = soonest_job(c);
250 return j && t >= j->deadline_at - SAFETY_MARGIN;
254 conn_ready(conn c)
256 size_t i;
258 for (i = 0; i < c->watch.used; i++) {
259 if (((tube) c->watch.items[i])->ready.used) return 1;
261 return 0;
264 void
265 conn_close(conn c)
267 event_del(&c->evq);
269 close(c->fd);
271 job_free(c->in_job);
273 /* was this a peek or stats command? */
274 if (c->out_job && !c->out_job->id) job_free(c->out_job);
276 c->in_job = c->out_job = NULL;
277 c->in_job_read = 0;
279 if (c->type & CONN_TYPE_PRODUCER) cur_producer_ct--; /* stats */
280 if (c->type & CONN_TYPE_WORKER) cur_worker_ct--; /* stats */
282 cur_conn_ct--; /* stats */
284 unbrake(NULL);
285 remove_waiting_conn(c);
286 conn_remove(c);
287 if (has_reserved_job(c)) enqueue_reserved_jobs(c);
289 ms_clear(&c->watch);
290 c->use->using_ct--;
291 TUBE_ASSIGN(c->use, NULL);
293 conn_free(c);