release 1.10
[beanstalkd.git] / conn.c
blobcc45b617f23eac1ada4ea469483236d1d6329771
1 #include <stdint.h>
2 #include <stdio.h>
3 #include <stdlib.h>
4 #include <string.h>
5 #include <errno.h>
6 #include <limits.h>
7 #include <unistd.h>
8 #include "dat.h"
10 #define SAFETY_MARGIN (1000000000) /* 1 second */
12 static int cur_conn_ct = 0, cur_worker_ct = 0, cur_producer_ct = 0;
13 static uint tot_conn_ct = 0;
14 int verbose = 0;
16 static void
17 on_watch(ms a, tube t, size_t i)
19 tube_iref(t);
20 t->watching_ct++;
23 static void
24 on_ignore(ms a, tube t, size_t i)
26 t->watching_ct--;
27 tube_dref(t);
30 Conn *
31 make_conn(int fd, char start_state, tube use, tube watch)
33 job j;
34 Conn *c;
36 c = new(Conn);
37 if (!c) return twarn("OOM"), NULL;
39 ms_init(&c->watch, (ms_event_fn) on_watch, (ms_event_fn) on_ignore);
40 if (!ms_append(&c->watch, watch)) {
41 free(c);
42 return twarn("OOM"), NULL;
45 TUBE_ASSIGN(c->use, use);
46 use->using_ct++;
48 c->sock.fd = fd;
49 c->state = start_state;
50 c->pending_timeout = -1;
51 c->tickpos = -1;
52 j = &c->reserved_jobs;
53 j->prev = j->next = j;
55 /* stats */
56 cur_conn_ct++;
57 tot_conn_ct++;
59 return c;
62 void
63 connsetproducer(Conn *c)
65 if (c->type & CONN_TYPE_PRODUCER) return;
66 c->type |= CONN_TYPE_PRODUCER;
67 cur_producer_ct++; /* stats */
70 void
71 connsetworker(Conn *c)
73 if (c->type & CONN_TYPE_WORKER) return;
74 c->type |= CONN_TYPE_WORKER;
75 cur_worker_ct++; /* stats */
78 int
79 count_cur_conns()
81 return cur_conn_ct;
84 uint
85 count_tot_conns()
87 return tot_conn_ct;
90 int
91 count_cur_producers()
93 return cur_producer_ct;
96 int
97 count_cur_workers()
99 return cur_worker_ct;
102 static int
103 has_reserved_job(Conn *c)
105 return job_list_any_p(&c->reserved_jobs);
109 static int64
110 conntickat(Conn *c)
112 int margin = 0, should_timeout = 0;
113 int64 t = INT64_MAX;
115 if (conn_waiting(c)) {
116 margin = SAFETY_MARGIN;
119 if (has_reserved_job(c)) {
120 t = connsoonestjob(c)->r.deadline_at - nanoseconds() - margin;
121 should_timeout = 1;
123 if (c->pending_timeout >= 0) {
124 t = min(t, ((int64)c->pending_timeout) * 1000000000);
125 should_timeout = 1;
128 if (should_timeout) {
129 return nanoseconds() + t;
131 return 0;
135 void
136 connwant(Conn *c, int rw)
138 c->rw = rw;
139 connsched(c);
143 void
144 connsched(Conn *c)
146 if (c->tickpos > -1) {
147 heapremove(&c->srv->conns, c->tickpos);
149 c->tickat = conntickat(c);
150 if (c->tickat) {
151 heapinsert(&c->srv->conns, c);
156 /* return the reserved job with the earliest deadline,
157 * or NULL if there's no reserved job */
159 connsoonestjob(Conn *c)
161 job j = NULL;
162 job soonest = c->soonest_job;
164 if (soonest == NULL) {
165 for (j = c->reserved_jobs.next; j != &c->reserved_jobs; j = j->next) {
166 if (j->r.deadline_at <= (soonest ? : j)->r.deadline_at) soonest = j;
169 c->soonest_job = soonest;
170 return soonest;
174 /* return true if c has a reserved job with less than one second until its
175 * deadline */
177 conndeadlinesoon(Conn *c)
179 int64 t = nanoseconds();
180 job j = connsoonestjob(c);
182 return j && t >= j->r.deadline_at - SAFETY_MARGIN;
186 conn_ready(Conn *c)
188 size_t i;
190 for (i = 0; i < c->watch.used; i++) {
191 if (((tube) c->watch.items[i])->ready.len) return 1;
193 return 0;
198 connless(Conn *a, Conn *b)
200 return a->tickat < b->tickat;
204 void
205 connrec(Conn *c, int i)
207 c->tickpos = i;
211 void
212 connclose(Conn *c)
214 sockwant(&c->sock, 0);
215 close(c->sock.fd);
216 if (verbose) {
217 printf("close %d\n", c->sock.fd);
220 job_free(c->in_job);
222 /* was this a peek or stats command? */
223 if (c->out_job && !c->out_job->r.id) job_free(c->out_job);
225 c->in_job = c->out_job = NULL;
226 c->in_job_read = 0;
228 if (c->type & CONN_TYPE_PRODUCER) cur_producer_ct--; /* stats */
229 if (c->type & CONN_TYPE_WORKER) cur_worker_ct--; /* stats */
231 cur_conn_ct--; /* stats */
233 remove_waiting_conn(c);
234 if (has_reserved_job(c)) enqueue_reserved_jobs(c);
236 ms_clear(&c->watch);
237 c->use->using_ct--;
238 TUBE_ASSIGN(c->use, NULL);
240 if (c->tickpos > -1) {
241 heapremove(&c->srv->conns, c->tickpos);
244 free(c);