/* Margin applied to reserved-job deadlines in this file. The value is used in
 * arithmetic with nanoseconds(), so it is expressed in nanoseconds. */
9 #define SAFETY_MARGIN (1000000000) /* 1 second */
11 /* Doubly-linked list of free connections. */
/* The first two members of pool are initialised to &pool itself; presumably
 * these are the prev/next links, so the list starts out empty (TODO confirm
 * against the struct conn declaration). */
12 static struct conn pool
= { &pool
, &pool
};
/* Statistics counters. cur_producer_ct and cur_worker_ct are incremented when
 * a connection is first flagged as producer/worker and decremented, together
 * with cur_conn_ct, in the teardown code further down. The updates to
 * tot_conn_ct are not visible in this excerpt. */
14 static int cur_conn_ct
= 0, cur_worker_ct
= 0, cur_producer_ct
= 0;
15 static uint tot_conn_ct
= 0;
/* Fragment of an allocation routine (its header is not visible here): the
 * visible lines take a connection from the free pool via conn_remove(), also
 * show a malloc() of a fresh struct conn, and return the object zero-filled.
 * NOTE(review): in the visible lines memset() is applied to c unconditionally;
 * if malloc() can return NULL on this path that would write through NULL --
 * confirm against the full function. */
22 c
= conn_remove(pool
.next
);
24 c
= malloc(sizeof(struct conn
));
27 return memset(c
, 0, sizeof *c
);
/* Fragment (header not visible): inserts c into the list headed by pool. */
34 conn_insert(&pool
, c
);
/* Passed to ms_init() as the first callback for a connection's watch set
 * (see make_conn). The body is not visible in this excerpt. */
38 on_watch(ms a
, tube t
, size_t i
)
/* Passed to ms_init() as the second callback for a connection's watch set
 * (see make_conn). The body is not visible in this excerpt. */
45 on_ignore(ms a
, tube t
, size_t i
)
/* Initialises a connection c (how c is obtained is not visible in this
 * excerpt): sets up its watch set with the on_watch/on_ignore callbacks and
 * appends `watch` to it, assigns `use` to c->use, records start_state, and
 * resets the timeout, job and list-link fields.
 * Returns (conn) 0 after twarn("OOM") if c is NULL or ms_append() fails.
 * The use of `fd` is not visible here. */
52 make_conn(int fd
, char start_state
, tube use
, tube watch
)
58 if (!c
) return twarn("OOM"), (conn
) 0;
60 ms_init(&c
->watch
, (ms_event_fn
) on_watch
, (ms_event_fn
) on_ignore
);
61 if (!ms_append(&c
->watch
, watch
)) {
63 return twarn("OOM"), (conn
) 0;
66 c
->use
= NULL
; /* initialize */
67 TUBE_ASSIGN(c
->use
, use
);
71 c
->state
= start_state
;
/* negative pending_timeout: the tick-time code only honours values >= 0 */
74 c
->pending_timeout
= -1;
/* NULL makes the soonest-job lookup rescan the reserved list on next use */
75 c
->soonest_job
= NULL
;
76 c
->in_job
= c
->out_job
= NULL
;
77 c
->in_job_read
= c
->out_job_sent
= 0;
78 c
->prev
= c
->next
= c
; /* must be out of a linked list right now */
/* self-link the head of c's reserved_jobs list */
79 j
= &c
->reserved_jobs
;
80 j
->prev
= j
->next
= j
;
/* Flags c as a producer. Does nothing if the flag is already set, so
 * cur_producer_ct is incremented only on the first call for a connection. */
90 conn_set_producer(conn c
)
92 if (c
->type
& CONN_TYPE_PRODUCER
) return;
93 c
->type
|= CONN_TYPE_PRODUCER
;
94 cur_producer_ct
++; /* stats */
/* Flags c as a worker. Does nothing if the flag is already set, so
 * cur_worker_ct is incremented only on the first call for a connection. */
98 conn_set_worker(conn c
)
100 if (c
->type
& CONN_TYPE_WORKER
) return;
101 c
->type
|= CONN_TYPE_WORKER
;
102 cur_worker_ct
++; /* stats */
/* Returns the current value of the cur_producer_ct counter. */
118 count_cur_producers()
120 return cur_producer_ct
;
/* Fragment (header not visible): returns the current value of cur_worker_ct. */
126 return cur_worker_ct
;
/* Returns the result of job_list_any_p() applied to c's reserved_jobs list. */
130 has_reserved_job(conn c
)
132 return job_list_any_p(&c
->reserved_jobs
);
/* Fragment that computes a tick time for c (header, the declaration of t and
 * the assignments to should_timeout are not visible in this excerpt).
 * - margin is SAFETY_MARGIN for a waiting connection, otherwise 0.
 * - With a reserved job, t is the time left until the soonest deadline, less
 *   the margin.
 * - A non-negative pending_timeout caps t; it is multiplied by 1000000000
 *   before the comparison, so it is apparently kept in seconds (TODO confirm).
 * - When should_timeout is set the result is the absolute time
 *   nanoseconds() + t. */
139 int margin
= 0, should_timeout
= 0;
142 if (conn_waiting(c
)) {
143 margin
= SAFETY_MARGIN
;
146 if (has_reserved_job(c
)) {
147 t
= soonest_job(c
)->r
.deadline_at
- nanoseconds() - margin
;
150 if (c
->pending_timeout
>= 0) {
151 t
= min(t
, ((int64
)c
->pending_timeout
) * 1000000000);
155 if (should_timeout
) {
156 return nanoseconds() + t
;
/* The visible part inserts c into `list` via conn_insert(). How `rw` is used
 * is not visible in this excerpt. */
163 connwant(conn c
, int rw
, conn list
)
166 conn_insert(list
, c
);
/* Fragment (header not visible): stores the result of conntickat(c) in
 * c->tickat, then passes c to srvschedconn() together with c->srv. */
174 c
->tickat
= conntickat(c
);
175 srvschedconn(c
->srv
, c
);
/* Returns nonzero when either link of head points at something other than
 * head itself, i.e. head is not self-linked. */
180 conn_list_any_p(conn head
)
182 return head
->next
!= head
|| head
->prev
!= head
;
/* Fragment (header and final return not visible): returns NULL when c is
 * self-linked; otherwise splices c out by joining its neighbours to each other
 * and then re-links c to itself. */
188 if (!conn_list_any_p(c
)) return NULL
; /* not in a doubly-linked list */
190 c
->next
->prev
= c
->prev
;
191 c
->prev
->next
= c
->next
;
193 c
->prev
= c
->next
= c
;
/* Visible part: does nothing if c is already linked to other nodes; otherwise
 * points c->prev at head's current predecessor and that predecessor's next at
 * c. The remaining link updates are not visible in this excerpt. */
198 conn_insert(conn head
, conn c
)
200 if (conn_list_any_p(c
)) return; /* already in a linked list */
202 c
->prev
= head
->prev
;
204 head
->prev
->next
= c
;
208 /* return the reserved job with the earliest deadline,
209 * or NULL if there's no reserved job */
/* Starts from the value cached in c->soonest_job; the list is only scanned
 * when that cache is NULL. */
214 job soonest
= c
->soonest_job
;
216 if (soonest
== NULL
) {
/* walk the circular reserved_jobs list until back at its head */
217 for (j
= c
->reserved_jobs
.next
; j
!= &c
->reserved_jobs
; j
= j
->next
) {
/* `x ? : y` is the GNU omitted-middle-operand conditional: it yields soonest
 * when non-NULL, else j, so the first job always becomes the candidate */
218 if (j
->r
.deadline_at
<= (soonest
? : j
)->r
.deadline_at
) soonest
= j
;
/* write the result back to the cache field */
221 c
->soonest_job
= soonest
;
/* True when j is non-NULL, its state is Reserved, and its reserver is c. */
226 has_reserved_this_job(conn c
, job j
)
228 return j
&& j
->r
.state
== Reserved
&& j
->reserver
== c
;
231 /* return true if c has a reserved job with less than one second until its deadline */
/* Compares the current time with the soonest reserved job's deadline minus
 * SAFETY_MARGIN; evaluates to false when soonest_job() returns NULL. */
234 conn_has_close_deadline(conn c
)
236 int64 t
= nanoseconds();
237 job j
= soonest_job(c
);
239 return j
&& t
>= j
->r
.deadline_at
- SAFETY_MARGIN
;
/* Fragment (header not visible): scans the c->watch.used entries of
 * c->watch.items, treating each as a tube, and returns 1 at the first one
 * whose ready.len is nonzero. */
247 for (i
= 0; i
< c
->watch
.used
; i
++) {
248 if (((tube
) c
->watch
.items
[i
])->ready
.len
) return 1;
/* Ordering predicate: true when a's tickat is strictly less than b's. */
255 connless(conn a
, conn b
)
257 return a
->tickat
< b
->tickat
;
/* The visible part calls sockwant() on c's socket with 0 as second argument;
 * presumably this clears the socket's event interest (TODO confirm against
 * sockwant()). How `i` is used is not visible in this excerpt. */
262 connrec(conn c
, int i
)
271 sockwant(&c
->sock
, 0);
/* Fragment of connection teardown (header not visible). In order, the visible
 * lines: free out_job when its r.id is zero, clear both job pointers, roll
 * back the statistics counters that apply to c, call remove_waiting_conn(),
 * hand any reserved jobs to enqueue_reserved_jobs(), and reset c->use to NULL
 * through TUBE_ASSIGN. */
276 /* was this a peek or stats command? */
277 if (c
->out_job
&& !c
->out_job
->r
.id
) job_free(c
->out_job
);
279 c
->in_job
= c
->out_job
= NULL
;
282 if (c
->type
& CONN_TYPE_PRODUCER
) cur_producer_ct
--; /* stats */
283 if (c
->type
& CONN_TYPE_WORKER
) cur_worker_ct
--; /* stats */
285 cur_conn_ct
--; /* stats */
287 remove_waiting_conn(c
);
289 if (has_reserved_job(c
)) enqueue_reserved_jobs(c
);
293 TUBE_ASSIGN(c
->use
, NULL
);