9 #define SAFETY_MARGIN (1000000000) /* 1 second, expressed in nanoseconds (it is compared against nanoseconds() values) */
/* Statistics counters for current connections, current workers and current
 * producers. The worker and producer counts are incremented in
 * conn_set_worker() and conn_set_producer(). */
11 static int cur_conn_ct
= 0, cur_worker_ct
= 0, cur_producer_ct
= 0;
/* NOTE(review): presumably a running total of connections ever created;
 * confirm where it is updated. */
12 static uint tot_conn_ct
= 0;
/* Handed to ms_init() by make_conn (cast to ms_event_fn) as the first event
 * callback for a connection's watch set. */
15 on_watch(ms a
, tube t
, size_t i
)
/* Handed to ms_init() by make_conn (cast to ms_event_fn) as the second event
 * callback for a connection's watch set. */
22 on_ignore(ms a
, tube t
, size_t i
)
/* Build a connection whose initial state is start_state, whose used tube is
 * `use`, and whose watch set starts out containing `watch`.
 * On failure, warns "OOM" and returns a null conn. */
29 make_conn(int fd
, char start_state
, tube use
, tube watch
)
/* no connection object available: report it and give up */
35 if (!c
) return twarn("OOM"), (conn
) 0;
/* set up the watch set with on_watch / on_ignore as its two event callbacks */
37 ms_init(&c
->watch
, (ms_event_fn
) on_watch
, (ms_event_fn
) on_ignore
);
/* the initial watched tube could not be appended: treated as OOM as well */
38 if (!ms_append(&c
->watch
, watch
)) {
40 return twarn("OOM"), (conn
) 0;
/* c->use is given a defined value before TUBE_ASSIGN touches it.
 * NOTE(review): presumably TUBE_ASSIGN reads the old value — confirm
 * against the macro. */
43 c
->use
= NULL
; /* initialize */
44 TUBE_ASSIGN(c
->use
, use
);
48 c
->state
= start_state
;
/* negative means "no pending timeout": the tick computation only honors
 * pending_timeout when it is >= 0 */
51 c
->pending_timeout
= -1;
/* cached soonest job; NULL means it has not been computed yet */
52 c
->soonest_job
= NULL
;
/* no job is being read in or written out yet, so both byte counters are 0 */
53 c
->in_job
= c
->out_job
= NULL
;
54 c
->in_job_read
= c
->out_job_sent
= 0;
/* self-linked: conn_list_any_p(c) is false for a node in this state */
55 c
->prev
= c
->next
= c
; /* must be out of a linked list right now */
/* reserved_jobs is a list head; pointing it at itself makes the list empty */
56 j
= &c
->reserved_jobs
;
57 j
->prev
= j
->next
= j
;
/* Flag c as a producer and count it in cur_producer_ct. The early return
 * when CONN_TYPE_PRODUCER is already set keeps a connection from being
 * counted more than once. */
67 conn_set_producer(conn c
)
69 if (c
->type
& CONN_TYPE_PRODUCER
) return;
70 c
->type
|= CONN_TYPE_PRODUCER
;
71 cur_producer_ct
++; /* stats */
/* Flag c as a worker and count it in cur_worker_ct. The early return when
 * CONN_TYPE_WORKER is already set keeps a connection from being counted
 * more than once. */
75 conn_set_worker(conn c
)
77 if (c
->type
& CONN_TYPE_WORKER
) return;
78 c
->type
|= CONN_TYPE_WORKER
;
79 cur_worker_ct
++; /* stats */
97 return cur_producer_ct
;
103 return cur_worker_ct
;
/* Returns job_list_any_p() applied to c's reserved_jobs list head, i.e.
 * presumably nonzero when c holds at least one reserved job. */
107 has_reserved_job(conn c
)
109 return job_list_any_p(&c
->reserved_jobs
);
116 int margin
= 0, should_timeout
= 0;
119 if (conn_waiting(c
)) {
120 margin
= SAFETY_MARGIN
;
123 if (has_reserved_job(c
)) {
124 t
= soonest_job(c
)->r
.deadline_at
- nanoseconds() - margin
;
127 if (c
->pending_timeout
>= 0) {
128 t
= min(t
, ((int64
)c
->pending_timeout
) * 1000000000);
132 if (should_timeout
) {
133 return nanoseconds() + t
;
/* c is linked onto `list` with conn_insert(), which does nothing when c is
 * already on a list. */
140 connwant(conn c
, int rw
, conn list
)
143 conn_insert(list
, c
);
151 c
->tickat
= conntickat(c
);
152 srvschedconn(c
->srv
, c
);
/* Nonzero when head is linked to some node other than itself, i.e. its next
 * or prev pointer does not point back at head. A node whose prev and next
 * both point to itself (the state make_conn sets up) yields 0. */
157 conn_list_any_p(conn head
)
159 return head
->next
!= head
|| head
->prev
!= head
;
165 if (!conn_list_any_p(c
)) return NULL
; /* not in a doubly-linked list */
167 c
->next
->prev
= c
->prev
;
168 c
->prev
->next
= c
->next
;
170 c
->prev
= c
->next
= c
;
/* Link c into the list headed by head. A c that is already linked to
 * another node is left untouched. */
175 conn_insert(conn head
, conn c
)
177 if (conn_list_any_p(c
)) return; /* already in a linked list */
/* c's predecessor becomes head's current predecessor */
179 c
->prev
= head
->prev
;
/* and that predecessor now points forward to c */
181 head
->prev
->next
= c
;
185 /* return the reserved job with the earliest deadline,
186 * or NULL if there's no reserved job */
191 job soonest
= c
->soonest_job
;
193 if (soonest
== NULL
) {
194 for (j
= c
->reserved_jobs
.next
; j
!= &c
->reserved_jobs
; j
= j
->next
) {
195 if (j
->r
.deadline_at
<= (soonest
? : j
)->r
.deadline_at
) soonest
= j
;
198 c
->soonest_job
= soonest
;
/* Nonzero only when j is non-NULL, j's state is Reserved, and j's reserver
 * is c. The leading j test makes it safe to pass a NULL job. */
203 has_reserved_this_job(conn c
, job j
)
205 return j
&& j
->r
.state
== Reserved
&& j
->reserver
== c
;
208 /* return true if c has a reserved job with less than one second until its deadline */
211 conn_has_close_deadline(conn c
)
/* current time as reported by nanoseconds() */
213 int64 t
= nanoseconds();
/* result of soonest_job(c); a NULL result makes the function return 0 */
214 job j
= soonest_job(c
);
/* true once the current time is within SAFETY_MARGIN of j's deadline_at
 * (or already past it) */
216 return j
&& t
>= j
->r
.deadline_at
- SAFETY_MARGIN
;
224 for (i
= 0; i
< c
->watch
.used
; i
++) {
225 if (((tube
) c
->watch
.items
[i
])->ready
.len
) return 1;
/* Ordering predicate on tickat: nonzero when a's tickat is strictly less
 * than b's; equal values compare as not-less. */
232 connless(conn a
, conn b
)
234 return a
->tickat
< b
->tickat
;
239 connrec(conn c
, int i
)
248 sockwant(&c
->sock
, 0);
253 /* was this a peek or stats command? */
254 if (c
->out_job
&& !c
->out_job
->r
.id
) job_free(c
->out_job
);
256 c
->in_job
= c
->out_job
= NULL
;
259 if (c
->type
& CONN_TYPE_PRODUCER
) cur_producer_ct
--; /* stats */
260 if (c
->type
& CONN_TYPE_WORKER
) cur_worker_ct
--; /* stats */
262 cur_conn_ct
--; /* stats */
264 remove_waiting_conn(c
);
266 if (has_reserved_job(c
)) enqueue_reserved_jobs(c
);
270 TUBE_ASSIGN(c
->use
, NULL
);