/* Nanosecond margin used by the deadline computations in this file: it is
 * subtracted from a reserved job's deadline_at and compared with nanoseconds(). */
10 #define SAFETY_MARGIN (1000000000) /* 1 second */
/* Statistics counters. In the visible code cur_producer_ct and cur_worker_ct
 * are incremented the first time a connection is flagged as producer/worker,
 * and all three cur_* counters are decremented on the connection-close path. */
12 static int cur_conn_ct
= 0, cur_worker_ct
= 0, cur_producer_ct
= 0;
/* NOTE(review): no use of tot_conn_ct is visible in this excerpt; presumably a
 * running total of connections ever made -- confirm. */
13 static uint tot_conn_ct
= 0;
/* on_watch: callback taking an ms `a`, a tube `t` and an index `i`.
 * make_conn() passes it, cast to ms_event_fn, as the second argument of
 * ms_init() for a connection's watch set.
 * NOTE(review): the body is not visible in this excerpt; presumably it runs
 * when a tube is added to the watch set -- confirm. */
17 on_watch(ms a
, tube t
, size_t i
)
/* on_ignore: callback taking an ms `a`, a tube `t` and an index `i`.
 * make_conn() passes it, cast to ms_event_fn, as the third argument of
 * ms_init() for a connection's watch set.
 * NOTE(review): the body is not visible in this excerpt; presumably it runs
 * when a tube is removed from the watch set -- confirm. */
24 on_ignore(ms a
, tube t
, size_t i
)
/* make_conn: set up a connection object `c` for descriptor `fd`.
 *
 * Visible behaviour:
 *  - returns NULL (after twarn("OOM")) if c is NULL or if `watch` cannot be
 *    appended to c's watch set;
 *  - initialises c->watch with on_watch/on_ignore as its event callbacks and
 *    makes `watch` its first member;
 *  - assigns `use` to c->use, stores `start_state` in c->state, sets
 *    pending_timeout to -1 and makes reserved_jobs an empty list.
 *
 * NOTE(review): the allocation of c, the use of fd, any cleanup on the
 * ms_append failure path and the final return are on lines not visible in
 * this excerpt -- confirm. */
31 make_conn(int fd
, char start_state
, tube use
, tube watch
)
/* `return twarn("OOM"), NULL;` uses the comma operator: warn, then yield NULL */
37 if (!c
) return twarn("OOM"), NULL
;
39 ms_init(&c
->watch
, (ms_event_fn
) on_watch
, (ms_event_fn
) on_ignore
);
40 if (!ms_append(&c
->watch
, watch
)) {
42 return twarn("OOM"), NULL
;
45 TUBE_ASSIGN(c
->use
, use
);
49 c
->state
= start_state
;
/* -1 means "no pending timeout": the tick-time code only honours values >= 0 */
50 c
->pending_timeout
= -1;
/* empty circular list: the reserved_jobs sentinel links to itself both ways */
52 j
= &c
->reserved_jobs
;
53 j
->prev
= j
->next
= j
;
/* connsetproducer: flag connection c as a producer.
 * Does nothing if CONN_TYPE_PRODUCER is already set in c->type; otherwise it
 * sets the flag and increments cur_producer_ct, so each connection is counted
 * at most once. */
63 connsetproducer(Conn
*c
)
65 if (c
->type
& CONN_TYPE_PRODUCER
) return;
66 c
->type
|= CONN_TYPE_PRODUCER
;
67 cur_producer_ct
++; /* stats */
/* connsetworker: flag connection c as a worker.
 * Does nothing if CONN_TYPE_WORKER is already set in c->type; otherwise it
 * sets the flag and increments cur_worker_ct, so each connection is counted
 * at most once. */
71 connsetworker(Conn
*c
)
73 if (c
->type
& CONN_TYPE_WORKER
) return;
74 c
->type
|= CONN_TYPE_WORKER
;
75 cur_worker_ct
++; /* stats */
/* Returns the current value of the cur_producer_ct statistics counter.
 * NOTE(review): the enclosing function's header is not visible in this
 * excerpt; presumably a small stats accessor -- confirm. */
93 return cur_producer_ct
;
/* has_reserved_job: report whether c's reserved_jobs list holds any job.
 * Simply forwards the result of job_list_any_p() on that list. */
103 has_reserved_job(Conn
*c
)
105 return job_list_any_p(&c
->reserved_jobs
);
/* NOTE(review): the enclosing function header and several lines (including
 * the declaration of t and the places where should_timeout is set) are not
 * visible in this excerpt; presumably this is conntickat(), which connwant()
 * calls to fill c->tickat -- confirm.
 *
 * Visible logic: compute a relative delay t in nanoseconds and, when
 * should_timeout is set, return it as an absolute time (nanoseconds() + t).
 * What is returned otherwise is not visible here. */
112 int margin
= 0, should_timeout
= 0;
/* a waiting connection has SAFETY_MARGIN subtracted from its delay below */
115 if (conn_waiting(c
)) {
116 margin
= SAFETY_MARGIN
;
/* time remaining until the soonest reserved job's deadline, less the margin */
119 if (has_reserved_job(c
)) {
120 t
= connsoonestjob(c
)->r
.deadline_at
- nanoseconds() - margin
;
/* a non-negative pending_timeout caps t; it is widened to int64 and scaled by
 * 1000000000 before the comparison, i.e. it is held in seconds */
123 if (c
->pending_timeout
>= 0) {
124 t
= min(t
, ((int64
)c
->pending_timeout
) * 1000000000);
128 if (should_timeout
) {
129 return nanoseconds() + t
;
/* connwant: reschedule connection c in its server's heap of connections.
 * Visible steps: if c currently has a heap position (tickpos > -1) it is
 * removed from c->srv->conns, c->tickat is recomputed with conntickat(), and
 * c is inserted again.
 * NOTE(review): how `rw` is used, and whether the re-insert is conditional,
 * is on lines not visible in this excerpt -- confirm. */
136 connwant(Conn
*c
, int rw
)
146 if (c
->tickpos
> -1) {
147 heapremove(&c
->srv
->conns
, c
->tickpos
);
149 c
->tickat
= conntickat(c
);
151 heapinsert(&c
->srv
->conns
, c
);
156 /* return the reserved job with the earliest deadline,
157 * or NULL if there's no reserved job */
/* The answer is cached in c->soonest_job: the reserved_jobs list is scanned
 * only when that cache is NULL, and the result is stored back afterwards.
 *
 * `soonest ? : j` is the GNU C conditional with an omitted middle operand
 * (same as `soonest ? soonest : j`), so the first job examined is compared
 * with itself and becomes the initial candidate. Because the test is `<=`,
 * a later list entry replaces an earlier one with an equal deadline.
 *
 * NOTE(review): the declaration of j and the return statement are on lines
 * not visible in this excerpt. */
159 connsoonestjob(Conn
*c
)
162 job soonest
= c
->soonest_job
;
164 if (soonest
== NULL
) {
/* walk the circular list until we are back at the sentinel */
165 for (j
= c
->reserved_jobs
.next
; j
!= &c
->reserved_jobs
; j
= j
->next
) {
166 if (j
->r
.deadline_at
<= (soonest
? : j
)->r
.deadline_at
) soonest
= j
;
169 c
->soonest_job
= soonest
;
/* conndeadlinesoon: non-zero when connsoonestjob(c) yields a job and the
 * current time (nanoseconds()) is at or past that job's deadline_at minus
 * SAFETY_MARGIN; zero when c has no such job or the deadline is further off. */
174 /* return true if c has a reserved job with less than one second until its
177 conndeadlinesoon(Conn
*c
)
179 int64 t
= nanoseconds();
180 job j
= connsoonestjob(c
);
182 return j
&& t
>= j
->r
.deadline_at
- SAFETY_MARGIN
;
/* Scan every entry of c's watch set (items[0 .. used-1], each treated as a
 * tube) and return 1 as soon as one has a non-zero ready.len.
 * NOTE(review): the enclosing function header and the value returned when no
 * entry matches are not visible in this excerpt -- confirm. */
190 for (i
= 0; i
< c
->watch
.used
; i
++) {
191 if (((tube
) c
->watch
.items
[i
])->ready
.len
) return 1;
/* connless: ordering predicate on connections; non-zero when a's tickat is
 * strictly earlier than b's.
 * NOTE(review): presumably the comparison function for the srv->conns heap
 * -- confirm where the heap is set up. */
198 connless(Conn
*a
, Conn
*b
)
200 return a
->tickat
< b
->tickat
;
/* connrec: takes a connection c and an integer i.
 * NOTE(review): the body is not visible in this excerpt; presumably a heap
 * callback that records c's new heap index (c->tickpos) -- confirm. */
205 connrec(Conn
*c
, int i
)
/* Connection teardown path.
 * NOTE(review): the enclosing function header (presumably connclose) and some
 * intermediate lines are not visible in this excerpt, and the function may
 * continue past the last line shown -- confirm.
 *
 * Visible steps, in order: call sockwant() on the socket with 0, print the fd
 * being closed, free an id-less out_job, clear in_job/out_job, roll back the
 * statistics counters, drop c from the waiting set, hand any reserved jobs to
 * enqueue_reserved_jobs(), clear c->use, and remove c from the server's heap
 * if it is in it. */
214 sockwant(&c
->sock
, 0);
217 printf("close %d\n", c
->sock
.fd
);
222 /* was this a peek or stats command? */
/* only an out_job whose r.id is zero is freed here */
223 if (c
->out_job
&& !c
->out_job
->r
.id
) job_free(c
->out_job
);
225 c
->in_job
= c
->out_job
= NULL
;
/* undo the increments made when the connection was flagged producer/worker */
228 if (c
->type
& CONN_TYPE_PRODUCER
) cur_producer_ct
--; /* stats */
229 if (c
->type
& CONN_TYPE_WORKER
) cur_worker_ct
--; /* stats */
231 cur_conn_ct
--; /* stats */
233 remove_waiting_conn(c
);
234 if (has_reserved_job(c
)) enqueue_reserved_jobs(c
);
238 TUBE_ASSIGN(c
->use
, NULL
);
/* tickpos > -1 means c currently occupies a slot in the srv->conns heap */
240 if (c
->tickpos
> -1) {
241 heapremove(&c
->srv
->conns
, c
->tickpos
);