1 /* conn.c - network connection state */
3 /* Copyright (C) 2007 Keith Rarick and Philotic Inc.
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation, either version 3 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>. */
/* Slack, in seconds, that conn_has_close_deadline() subtracts from a
 * reserved job's deadline before comparing it with the current time. */
30 #define SAFETY_MARGIN 1 /* seconds */
32 /* Doubly-linked list of free connections. */
/* `pool` is the list head. Its first two initializers point back at the
 * head itself (presumably the prev/next links -- confirm against the
 * struct conn layout), so an empty pool is a ring of one node. */
33 static struct conn pool
= { &pool
, &pool
, 0 };
/* Live counters; they are adjusted at the statements tagged "stats". */
35 static int cur_conn_ct
= 0, cur_worker_ct
= 0, cur_producer_ct
= 0;
/* NOTE(review): presumably the number of connections ever created; verify
 * where it is incremented. */
36 static unsigned int tot_conn_ct
= 0;
/* Prefer recycling: conn_remove() is handed the node that follows the pool
 * head. When it yields NULL, the GNU "?:" form (middle operand omitted)
 * falls through to malloc(), whose memory is not zeroed. */
41 return conn_remove(pool
.next
) ? : malloc(sizeof(struct conn
));
/* Link c onto the free pool so that a later allocation can pick it up. */
48 conn_insert(&pool
, c
);
/* First ms_event_fn callback that make_conn() registers on a connection's
 * watch set through ms_init().
 * NOTE(review): presumably invoked when tube t is added to set a at index
 * i -- confirm against the ms implementation. */
52 on_watch(ms a
, tube t
, size_t i
)
/* Second ms_event_fn callback that make_conn() registers on a connection's
 * watch set through ms_init().
 * NOTE(review): presumably invoked when tube t is removed from set a at
 * index i -- confirm against the ms implementation. */
59 on_ignore(ms a
, tube t
, size_t i
)
/* Set up a connection that starts in start_state, uses tube `use` and
 * initially watches tube `watch`.
 *
 * Returns NULL, after logging "OOM" through twarn(), when no connection
 * could be obtained or the watched tube could not be appended. */
66 make_conn(int fd
, char start_state
, tube use
, tube watch
)
/* The comma expression logs first and then yields NULL as the result. */
72 if (!c
) return twarn("OOM"), NULL
;
/* Install the watch/ignore callbacks before the first tube is appended. */
74 ms_init(&c
->watch
, (ms_event_fn
) on_watch
, (ms_event_fn
) on_ignore
);
/* Appending the initial watched tube can fail. */
75 if (!ms_append(&c
->watch
, watch
)) {
77 return twarn("OOM"), NULL
;
/* NOTE(review): c->use is cleared before TUBE_ASSIGN, presumably because
 * the macro looks at the previous value -- confirm in its definition. */
80 c
->use
= NULL
; /* initialize */
81 TUBE_ASSIGN(c
->use
, use
);
85 c
->state
= start_state
;
/* -1 disables the pending timeout: conn_set_evq() only honours values
 * that are >= 0. */
88 c
->pending_timeout
= -1;
/* NULL here means "not computed yet" for the soonest-job lookup. */
89 c
->soonest_job
= NULL
;
90 c
->in_job
= c
->out_job
= NULL
;
91 c
->in_job_read
= c
->out_job_sent
= 0;
92 c
->prev
= c
->next
= c
; /* must be out of a linked list right now */
/* Empty reserved-job list: the head links to itself in both directions. */
93 j
= &c
->reserved_jobs
;
94 j
->prev
= j
->next
= j
;
/* Flag c as a producer and count it. The early return keeps a connection
 * that already carries CONN_TYPE_PRODUCER from being counted twice. */
104 conn_set_producer(conn c
)
106 if (c
->type
& CONN_TYPE_PRODUCER
) return;
107 c
->type
|= CONN_TYPE_PRODUCER
;
108 cur_producer_ct
++; /* stats */
/* Flag c as a worker and count it. The early return keeps a connection
 * that already carries CONN_TYPE_WORKER from being counted twice. */
112 conn_set_worker(conn c
)
114 if (c
->type
& CONN_TYPE_WORKER
) return;
115 c
->type
|= CONN_TYPE_WORKER
;
116 cur_worker_ct
++; /* stats */
/* Report the current value of the live producer counter. */
132 count_cur_producers()
134 return cur_producer_ct
;
/* Report the current value of the live worker counter. */
140 return cur_worker_ct
;
/* Ask job_list_any_p() about the list headed by c->reserved_jobs and pass
 * its answer straight back. */
144 has_reserved_job(conn c
)
146 return job_list_any_p(&c
->reserved_jobs
);
/* Configure c->evq for `events` on c->fd with `handler` (c itself is the
 * callback argument) and add it, optionally with a timeout.
 *
 * Returns -1, after logging through twarn(), when event_add() fails. */
150 conn_set_evq(conn c
, const int events
, evh handler
)
152 int r
, margin
= 0, should_timeout
= 0;
/* Start from the largest value so that min() below can only shorten it. */
153 struct timeval tv
= {INT_MAX
, 0};
155 event_set(&c
->evq
, c
->fd
, events
, handler
, c
);
/* A connection that conn_waiting() reports gets one second of margin
 * taken off the deadline computed below. */
157 if (conn_waiting(c
)) margin
= 1;
158 if (has_reserved_job(c
)) {
/* Seconds left until the soonest reserved job's deadline, less margin. */
159 time_t t
= soonest_job(c
)->deadline
- time(NULL
) - margin
;
/* Clamp so that an already-passed deadline yields 0, not a negative. */
160 tv
.tv_sec
= t
> 0 ? t
: 0;
/* A non-negative pending timeout may shorten the wait further. */
163 if (c
->pending_timeout
>= 0) {
164 tv
.tv_sec
= min(tv
.tv_sec
, c
->pending_timeout
);
/* tv is handed over only when should_timeout is non-zero; with NULL the
 * event is added without a timeout. */
168 r
= event_add(&c
->evq
, should_timeout
? &tv
: NULL
);
169 if (r
== -1) return twarn("event_add() err %d", errno
), -1;
/* Re-register c's event with a new `events` mask while keeping the callback
 * already stored in c->evq.
 *
 * Returns -1 when c is NULL or event_del() fails (both are logged);
 * otherwise the result of conn_set_evq(). */
175 conn_update_evq(conn c
, const int events
)
179 if (!c
) return twarnx("c is NULL"), -1;
181 /* If it's been added, try to delete it first */
/* A non-NULL ev_base is the test used for "has been added". */
182 if (c
->evq
.ev_base
) {
183 r
= event_del(&c
->evq
);
184 if (r
== -1) return twarn("event_del() err %d", errno
), -1;
187 return conn_set_evq(c
, events
, c
->evq
.ev_callback
);
/* Non-zero when head is linked to anything other than itself: used on a
 * list head it means "list not empty", used on a plain connection it
 * means "currently on some list". */
191 conn_list_any_p(conn head
)
193 return head
->next
!= head
|| head
->prev
!= head
;
/* A self-linked c is not on any list, so there is nothing to unlink. */
199 if (!conn_list_any_p(c
)) return NULL
; /* not in a doubly-linked list */
/* Splice c out by joining its two neighbours to each other. */
201 c
->next
->prev
= c
->prev
;
202 c
->prev
->next
= c
->next
;
/* Make c self-linked again so that conn_list_any_p(c) is false. */
204 c
->prev
= c
->next
= c
;
/* Link c into the list headed by head, next to head's current predecessor.
 * Does nothing when c is already on a list. */
209 conn_insert(conn head
, conn c
)
211 if (conn_list_any_p(c
)) return; /* already in a linked list */
/* c adopts head's current predecessor as its own prev ... */
213 c
->prev
= head
->prev
;
/* ... and that predecessor's next link is pointed at c. */
215 head
->prev
->next
= c
;
219 /* return the reserved job with the earliest deadline,
220 * or NULL if there's no reserved job */
/* c->soonest_job acts as a cache: the list is walked only while it is
 * NULL, and the outcome is written back afterwards. */
225 job soonest
= c
->soonest_job
;
227 if (soonest
== NULL
) {
/* Walk the ring until the head (&c->reserved_jobs) comes round again. */
228 for (j
= c
->reserved_jobs
.next
; j
!= &c
->reserved_jobs
; j
= j
->next
) {
/* While soonest is still NULL the GNU "?:" form compares j with itself,
 * so the first job is always taken. Because of "<=", a later job with an
 * equal deadline replaces an earlier one. */
229 if (j
->deadline
<= (soonest
? : j
)->deadline
) soonest
= j
;
232 c
->soonest_job
= soonest
;
/* True only when j is non-NULL, is in JOB_STATE_RESERVED, and names c as
 * its reserver. The && chain keeps a NULL j from being dereferenced. */
237 has_reserved_this_job(conn c
, job j
)
239 return j
&& j
->state
== JOB_STATE_RESERVED
&& j
->reserver
== c
;
242 /* return true if c has a reserved job with less than one second until its deadline */
245 conn_has_close_deadline(conn c
)
247 time_t t
= time(NULL
);
248 job j
= soonest_job(c
);
/* j is tested first so that a NULL result from soonest_job() is never
 * dereferenced. The deadline counts as close once the current time has
 * reached deadline - SAFETY_MARGIN. */
250 return j
&& t
>= j
->deadline
- SAFETY_MARGIN
;
/* Scan every tube in c's watch set and answer 1 at the first one whose
 * ready.used is non-zero. */
258 for (i
= 0; i
< c
->watch
.used
; i
++) {
/* Items of the watch set are cast to tube before the ready member is read. */
259 if (((tube
) c
->watch
.items
[i
])->ready
.used
) return 1;
273 /* was this a peek or stats command? */
/* An out_job whose id is zero is freed here; one with a non-zero id is
 * left alone. */
274 if (c
->out_job
&& !c
->out_job
->id
) job_free(c
->out_job
);
276 c
->in_job
= c
->out_job
= NULL
;
/* Undo the counts taken by conn_set_producer() and conn_set_worker(). */
279 if (c
->type
& CONN_TYPE_PRODUCER
) cur_producer_ct
--; /* stats */
280 if (c
->type
& CONN_TYPE_WORKER
) cur_worker_ct
--; /* stats */
282 cur_conn_ct
--; /* stats */
285 remove_waiting_conn(c
);
/* Jobs still reserved by c are handed to enqueue_reserved_jobs(). */
287 if (has_reserved_job(c
)) enqueue_reserved_jobs(c
);
/* NOTE(review): presumably drops the reference c holds on its "use" tube;
 * confirm in the TUBE_ASSIGN definition. */
291 TUBE_ASSIGN(c
->use
, NULL
);