add changelog for 1.13
[beanstalkd.git] / conn.c
blob07272fa2c2063c608f5ef7160d79098cd4f8039c
1 #include "dat.h"
2 #include <errno.h>
3 #include <limits.h>
4 #include <stdint.h>
5 #include <stdio.h>
6 #include <stdlib.h>
7 #include <string.h>
8 #include <unistd.h>
10 #define SAFETY_MARGIN (1000000000) /* 1 second */
12 static int cur_conn_ct = 0, cur_worker_ct = 0, cur_producer_ct = 0;
13 static uint tot_conn_ct = 0;
14 int verbose = 0;
16 static void
17 on_watch(Ms *a, Tube *t, size_t i)
19 UNUSED_PARAMETER(a);
20 UNUSED_PARAMETER(i);
21 tube_iref(t);
22 t->watching_ct++;
25 static void
26 on_ignore(Ms *a, Tube *t, size_t i)
28 UNUSED_PARAMETER(a);
29 UNUSED_PARAMETER(i);
30 t->watching_ct--;
31 tube_dref(t);
34 Conn *
35 make_conn(int fd, char start_state, Tube *use, Tube *watch)
37 Conn *c = new(Conn);
38 if (!c) {
39 twarn("OOM");
40 return NULL;
43 ms_init(&c->watch, (ms_event_fn) on_watch, (ms_event_fn) on_ignore);
44 if (!ms_append(&c->watch, watch)) {
45 free(c);
46 twarn("OOM");
47 return NULL;
50 TUBE_ASSIGN(c->use, use);
51 use->using_ct++;
53 c->state = start_state;
54 c->pending_timeout = -1;
55 c->tickpos = 0; // Does not mean anything if in_conns is set to 0.
56 c->in_conns = 0;
58 // The list is empty.
59 job_list_reset(&c->reserved_jobs);
61 /* stats */
62 cur_conn_ct++;
63 tot_conn_ct++;
65 return c;
68 void
69 connsetproducer(Conn *c)
71 if (c->type & CONN_TYPE_PRODUCER) return;
72 c->type |= CONN_TYPE_PRODUCER;
73 cur_producer_ct++; /* stats */
76 void
77 connsetworker(Conn *c)
79 if (c->type & CONN_TYPE_WORKER) return;
80 c->type |= CONN_TYPE_WORKER;
81 cur_worker_ct++; /* stats */
84 int
85 count_cur_conns()
87 return cur_conn_ct;
90 uint
91 count_tot_conns()
93 return tot_conn_ct;
96 int
97 count_cur_producers()
99 return cur_producer_ct;
103 count_cur_workers()
105 return cur_worker_ct;
108 static int
109 has_reserved_job(Conn *c)
111 return !job_list_is_empty(&c->reserved_jobs);
115 // Returns positive nanoseconds when c should tick, 0 otherwise.
116 static int64
117 conntickat(Conn *c)
119 int margin = 0, should_timeout = 0;
120 int64 t = INT64_MAX;
122 if (conn_waiting(c)) {
123 margin = SAFETY_MARGIN;
126 if (has_reserved_job(c)) {
127 t = connsoonestjob(c)->r.deadline_at - nanoseconds() - margin;
128 should_timeout = 1;
130 if (c->pending_timeout >= 0) {
131 t = min(t, ((int64)c->pending_timeout) * 1000000000);
132 should_timeout = 1;
135 if (should_timeout) {
136 return nanoseconds() + t;
138 return 0;
142 // Remove c from the c->srv heap and reschedule it using the value
143 // returned by conntickat if there is an outstanding timeout in the c.
144 void
145 connsched(Conn *c)
147 if (c->in_conns) {
148 heapremove(&c->srv->conns, c->tickpos);
149 c->in_conns = 0;
151 c->tickat = conntickat(c);
152 if (c->tickat) {
153 heapinsert(&c->srv->conns, c);
154 c->in_conns = 1;
158 // conn_set_soonestjob updates c->soonest_job with j
159 // if j should be handled sooner than c->soonest_job.
160 static void
161 conn_set_soonestjob(Conn *c, Job *j) {
162 if (!c->soonest_job || j->r.deadline_at < c->soonest_job->r.deadline_at) {
163 c->soonest_job = j;
167 // Return the reserved job with the earliest deadline,
168 // or NULL if there's no reserved job.
169 Job *
170 connsoonestjob(Conn *c)
172 // use cached value and bail out.
173 if (c->soonest_job != NULL)
174 return c->soonest_job;
176 Job *j = NULL;
177 for (j = c->reserved_jobs.next; j != &c->reserved_jobs; j = j->next) {
178 conn_set_soonestjob(c, j);
180 return c->soonest_job;
183 void
184 conn_reserve_job(Conn *c, Job *j) {
185 j->tube->stat.reserved_ct++;
186 j->r.reserve_ct++;
188 j->r.deadline_at = nanoseconds() + j->r.ttr;
189 j->r.state = Reserved;
190 job_list_insert(&c->reserved_jobs, j);
191 j->reserver = c;
192 c->pending_timeout = -1;
193 conn_set_soonestjob(c, j);
196 // Return true if c has a reserved job with less than one second until its
197 // deadline.
199 conndeadlinesoon(Conn *c)
201 int64 t = nanoseconds();
202 Job *j = connsoonestjob(c);
204 return j && t >= j->r.deadline_at - SAFETY_MARGIN;
208 conn_ready(Conn *c)
210 size_t i;
212 for (i = 0; i < c->watch.len; i++) {
213 if (((Tube *) c->watch.items[i])->ready.len)
214 return 1;
216 return 0;
221 conn_less(void *ca, void *cb)
223 Conn *a = (Conn *)ca;
224 Conn *b = (Conn *)cb;
225 return a->tickat < b->tickat;
229 void
230 conn_setpos(void *c, size_t i)
232 ((Conn *)c)->tickpos = i;
236 void
237 connclose(Conn *c)
239 sockwant(&c->sock, 0);
240 close(c->sock.fd);
241 if (verbose) {
242 printf("close %d\n", c->sock.fd);
245 job_free(c->in_job);
247 /* was this a peek or stats command? */
248 if (c->out_job && c->out_job->r.state == Copy)
249 job_free(c->out_job);
251 c->in_job = c->out_job = NULL;
252 c->in_job_read = 0;
254 if (c->type & CONN_TYPE_PRODUCER) cur_producer_ct--; /* stats */
255 if (c->type & CONN_TYPE_WORKER) cur_worker_ct--; /* stats */
257 cur_conn_ct--; /* stats */
259 remove_waiting_conn(c);
260 if (has_reserved_job(c))
261 enqueue_reserved_jobs(c);
263 ms_clear(&c->watch);
264 c->use->using_ct--;
265 TUBE_ASSIGN(c->use, NULL);
267 if (c->in_conns) {
268 heapremove(&c->srv->conns, c->tickpos);
269 c->in_conns = 0;
272 free(c);