1 /* conn.c - network connection state */
3 /* Copyright (C) 2007 Keith Rarick and Philotic Inc.
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation, either version 3 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>. */
/* Lead time subtracted from a reserved job's deadline when
 * conn_has_close_deadline() decides whether that deadline is imminent. */
29 #define SAFETY_MARGIN 1 /* seconds */
31 /* Doubly-linked list of free connections. */
/* Sentinel list head. Its first two members are initialized to point back
 * at the sentinel itself (presumably prev/next -- struct layout is not
 * visible here), which is the state conn_list_any_p() reports as "not
 * linked", i.e. an empty pool. */
32 static struct conn pool
= { &pool
, &pool
, 0 };
/* Statistics counters for the connections currently open, and how many of
 * them are flagged as workers or producers. The worker/producer counts are
 * incremented by conn_set_worker()/conn_set_producer() and all three are
 * decremented in the teardown path further down the file. */
34 static int cur_conn_ct
= 0, cur_worker_ct
= 0, cur_producer_ct
= 0;
/* NOTE(review): presumably the lifetime total of connections created; no
 * update of this counter is visible in this excerpt -- confirm. */
35 static unsigned int tot_conn_ct
= 0;
/* NOTE(review): the header of the enclosing function is not visible in this
 * excerpt (presumably the connection allocator).
 *
 * Try to take the first entry off the free pool. conn_remove() yields NULL
 * when its argument is not linked into a list (empty pool); the GNU
 * "a ?: b" extension then falls through to a fresh malloc(). The result can
 * still be NULL if malloc() fails, and freshly malloc'ed memory is
 * uninitialized. */
40 return conn_remove(pool
.next
) ? : malloc(sizeof(struct conn
));
/* NOTE(review): the header of the enclosing function is not visible in this
 * excerpt (presumably the routine that releases a connection).
 *
 * Hand c to the free pool for later reuse instead of freeing it.
 * conn_insert() silently does nothing if c is still linked into a list. */
47 conn_insert(&pool
, c
);
/* Callback handed to ms_init() as the first event handler of a connection's
 * watch set (see make_conn). The body is not visible in this excerpt;
 * presumably it runs when tube t is added to set a at index i -- confirm
 * against the ms implementation. */
51 on_watch(ms a
, tube t
, size_t i
)
/* Callback handed to ms_init() as the second event handler of a
 * connection's watch set (see make_conn). The body is not visible in this
 * excerpt; presumably it runs when tube t is removed from set a -- confirm
 * against the ms implementation. */
58 on_ignore(ms a
, tube t
, size_t i
)
/* Build and initialize a connection.
 *
 *   fd          - descriptor for the connection (its assignment is not
 *                 visible in this excerpt)
 *   start_state - initial value for c->state
 *   use         - tube stored in c->use
 *   watch       - first tube appended to the connection's watch set
 *
 * Returns NULL, after reporting "OOM" through twarn(), when no connection
 * object is available or the watch set cannot be extended.
 * NOTE(review): several lines of this function (local declarations, the
 * allocation itself, the success return) are missing from this excerpt. */
65 make_conn(int fd
, char start_state
, tube use
, tube watch
)
/* no connection object could be obtained */
71 if (!c
) return twarn("OOM"), NULL
;
/* set up the watch set with its two callbacks, then add the initial tube */
73 ms_init(&c
->watch
, (ms_event_fn
) on_watch
, (ms_event_fn
) on_ignore
);
74 if (!ms_append(&c
->watch
, watch
)) {
76 return twarn("OOM"), NULL
;
/* c->use is cleared before TUBE_ASSIGN is applied to it; presumably the
 * macro inspects the previous value, which would otherwise be garbage in
 * freshly allocated memory -- confirm against the macro definition */
79 c
->use
= NULL
; /* initialize */
80 TUBE_ASSIGN(c
->use
, use
);
84 c
->state
= start_state
;
/* no job is being received or sent yet */
87 c
->in_job
= c
->out_job
= NULL
;
88 c
->in_job_read
= c
->out_job_sent
= 0;
/* self-linked means "in no list" (see conn_list_any_p) */
89 c
->prev
= c
->next
= c
; /* must be out of a linked list right now */
/* reserved_jobs is the sentinel head of this connection's reserved-job
 * list; self-linking it makes the list empty */
90 j
= &c
->reserved_jobs
;
91 j
->prev
= j
->next
= j
;
/* Flag connection c as a producer and count it in cur_producer_ct.
 * Does nothing when the CONN_TYPE_PRODUCER bit is already set, so repeated
 * calls do not count the same connection twice. */
101 conn_set_producer(conn c
)
103 if (c
->type
& CONN_TYPE_PRODUCER
) return;
104 c
->type
|= CONN_TYPE_PRODUCER
;
105 cur_producer_ct
++; /* stats */
/* Flag connection c as a worker and count it in cur_worker_ct.
 * Does nothing when the CONN_TYPE_WORKER bit is already set, so repeated
 * calls do not count the same connection twice. */
109 conn_set_worker(conn c
)
111 if (c
->type
& CONN_TYPE_WORKER
) return;
112 c
->type
|= CONN_TYPE_WORKER
;
113 cur_worker_ct
++; /* stats */
/* Return the current value of the producer counter, i.e. how many open
 * connections are flagged CONN_TYPE_PRODUCER. */
129 count_cur_producers()
131 return cur_producer_ct
;
/* NOTE(review): the header of the enclosing function is not visible in this
 * excerpt (presumably the worker counterpart of count_cur_producers).
 * Returns the current value of the worker counter. */
137 return cur_worker_ct
;
/* Return the result of job_list_any_p() on c's reserved-job list head,
 * i.e. non-zero when the connection holds at least one reserved job
 * (assuming job_list_any_p mirrors conn_list_any_p -- its definition is not
 * in this file excerpt). */
141 has_reserved_job(conn c
)
143 return job_list_any_p(&c
->reserved_jobs
);
/* (Re)initialize c's event for descriptor c->fd and register it.
 *
 *   events  - event flags passed through to event_set()
 *   handler - callback passed to event_set(); c itself is the callback
 *             argument
 *
 * When c holds reserved jobs the event is added with a timeout so that it
 * fires around the soonest job deadline; otherwise it is added without a
 * timeout. Returns -1 after reporting through twarn() when event_add()
 * fails.
 * NOTE(review): the declarations of r and margin and the success return
 * are missing from this excerpt. */
147 conn_set_evq(conn c
, const int events
, evh handler
)
150 struct timeval tv
= {0, 0};
152 event_set(&c
->evq
, c
->fd
, events
, handler
, c
);
/* a waiting connection wakes up one second earlier */
154 if (conn_waiting(c
)) margin
= 1;
155 if (has_reserved_job(c
)) {
/* seconds left until the earliest deadline, minus the margin ... */
156 time_t t
= soonest_job(c
)->deadline
- time(NULL
) - margin
;
/* ... clamped so an overdue deadline yields an immediate timeout */
157 tv
.tv_sec
= t
> 0 ? t
: 0;
/* NULL timeout: wait for I/O only */
160 r
= event_add(&c
->evq
, has_reserved_job(c
) ? &tv
: NULL
);
161 if (r
== -1) return twarn("event_add() err %d", errno
), -1;
/* Re-register c's event with a new set of event flags while keeping the
 * callback that is already stored in the event.
 *
 * Returns -1 when c is NULL (reported through twarnx) or when event_del()
 * fails (reported through twarn); otherwise returns whatever
 * conn_set_evq() returns.
 * NOTE(review): the declaration of r and the closing braces are missing
 * from this excerpt. */
167 conn_update_evq(conn c
, const int events
)
171 if (!c
) return twarnx("c is NULL"), -1;
173 /* If it's been added, try to delete it first */
/* a non-NULL ev_base is used here as the "event was set up" indicator */
174 if (c
->evq
.ev_base
) {
175 r
= event_del(&c
->evq
);
176 if (r
== -1) return twarn("event_del() err %d", errno
), -1;
/* reuse the handler currently installed on the event */
179 return conn_set_evq(c
, events
, c
->evq
.ev_callback
);
/* Return non-zero when head is linked to anything other than itself.
 * For a list sentinel this means "the list is non-empty"; for an ordinary
 * connection it means "this connection is currently in some list".
 * A node counts as unlinked only when both next and prev point back at
 * the node itself. */
183 conn_list_any_p(conn head
)
185 return head
->next
!= head
|| head
->prev
!= head
;
/* NOTE(review): the header and the final return of the enclosing function
 * are not visible in this excerpt (the call in the pool allocator suggests
 * this is conn_remove(conn c)).
 *
 * Unlink c from whatever doubly-linked list it is in. Yields NULL when c is
 * not linked anywhere -- which includes the case where c is an empty list's
 * sentinel. */
191 if (!conn_list_any_p(c
)) return NULL
; /* not in a doubly-linked list */
/* make the neighbours point past c */
193 c
->next
->prev
= c
->prev
;
194 c
->prev
->next
= c
->next
;
/* self-link c so conn_list_any_p(c) is false from now on */
196 c
->prev
= c
->next
= c
;
/* Link c into the list headed by head, directly after head's current
 * predecessor (i.e. at the tail of the list). Silently does nothing when c
 * is already linked into a list.
 * NOTE(review): the statements that set c->next and head->prev are missing
 * from this excerpt. */
201 conn_insert(conn head
, conn c
)
203 if (conn_list_any_p(c
)) return; /* already in a linked list */
/* c goes after the old tail ... */
205 c
->prev
= head
->prev
;
/* ... and the old tail now points forward to c */
207 head
->prev
->next
= c
;
211 /* return the reserved job with the earliest deadline,
212 * or NULL if there's no reserved job */
/* NOTE(review): the function header and the final return are not visible
 * in this excerpt. */
216 job j
, soonest
= NULL
;
/* walk the circular reserved-job list until we are back at its sentinel */
218 for (j
= c
->reserved_jobs
.next
; j
!= &c
->reserved_jobs
; j
= j
->next
) {
/* GNU "a ?: b": while nothing has been picked yet, j is compared with
 * itself and therefore always taken; "<=" means a later job with an equal
 * deadline replaces the earlier pick */
219 if (j
->deadline
<= (soonest
? : j
)->deadline
) soonest
= j
;
/* Return 1 when needle is one of the jobs on c's reserved-job list.
 * The comparison is by pointer identity; a NULL needle never matches a
 * list entry.
 * NOTE(review): the declaration of j and the not-found return are missing
 * from this excerpt. */
225 has_reserved_this_job(conn c
, job needle
)
/* linear scan of the circular list, stopping at the sentinel */
229 for (j
= c
->reserved_jobs
.next
; j
!= &c
->reserved_jobs
; j
= j
->next
) {
230 if (needle
== j
) return 1;
235 /* return true if c has a reserved job whose deadline is at most SAFETY_MARGIN second(s) away (or has already passed) */
/* Report whether c's most urgent reserved job is about to hit its deadline.
 * Evaluates to false when c has no reserved job (soonest_job() yields NULL),
 * and to true once the current time is within SAFETY_MARGIN seconds of the
 * earliest deadline, or past it. */
238 conn_has_close_deadline(conn c
)
240 time_t t
= time(NULL
);
241 job j
= soonest_job(c
);
243 return j
&& t
>= j
->deadline
- SAFETY_MARGIN
;
/* NOTE(review): the header of the enclosing function, the declaration of i
 * and the fall-through return are not visible in this excerpt.
 *
 * Scan every tube in c's watch set and return 1 as soon as one of them has
 * a non-zero ready.used (presumably: at least one ready job in that tube). */
251 for (i
= 0; i
< c
->watch
.used
; i
++) {
/* watch.items[] holds untyped entries; each one is treated as a tube */
252 if (((tube
) c
->watch
.items
[i
])->ready
.used
) return 1;
/* NOTE(review): the header of the enclosing function and a number of its
 * statements are not visible in this excerpt (presumably the connection
 * teardown routine). The visible part releases per-connection job state,
 * updates the statistics counters and detaches c from shared structures. */
266 /* was this a peek or stats command? */
/* out_job is freed only when it is not one of c's reserved jobs; reserved
 * jobs stay alive for the enqueue_reserved_jobs() call further down */
267 if (!has_reserved_this_job(c
, c
->out_job
)) job_free(c
->out_job
);
269 c
->in_job
= c
->out_job
= NULL
;
/* undo the counting done by conn_set_producer()/conn_set_worker() */
272 if (c
->type
& CONN_TYPE_PRODUCER
) cur_producer_ct
--; /* stats */
273 if (c
->type
& CONN_TYPE_WORKER
) cur_worker_ct
--; /* stats */
275 cur_conn_ct
--; /* stats */
278 remove_waiting_conn(c
);
/* presumably hands the still-reserved jobs back for other connections --
 * confirm against enqueue_reserved_jobs() */
280 if (has_reserved_job(c
)) enqueue_reserved_jobs(c
);
/* clear the tube held in c->use (presumably dropping a reference -- confirm
 * against the TUBE_ASSIGN macro) */
284 TUBE_ASSIGN(c
->use
, NULL
);