Add Eduardo's email to copyright header files
[MonkeyD.git] / src / scheduler.c
blob bf7d59b84036d69c9e92481087aa773815802ee7
1 /* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
3 /* Monkey HTTP Daemon
4 * ------------------
5 * Copyright (C) 2001-2010, Eduardo Silva P. <edsiper@gmail.com>
7 * This program is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation; either version 2 of the License, or
10 * (at your option) any later version.
12 * This program is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU Library General Public License for more details.
17 * You should have received a copy of the GNU General Public License
18 * along with this program; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <errno.h>
#include <pthread.h>
#include <sys/epoll.h>
#include <unistd.h>
#include <sys/syscall.h>
#include <string.h>

#include "monkey.h"
#include "connection.h"
#include "scheduler.h"
#include "memory.h"
#include "epoll.h"
#include "request.h"
#include "cache.h"
#include "config.h"
#include "clock.h"
#include "signals.h"
#include "plugin.h"
#include "utils.h"
44 /* Register thread information */
45 int mk_sched_register_thread(pthread_t tid, int efd)
47 int i;
48 struct sched_list_node *sr, *aux;
50 sr = mk_mem_malloc_z(sizeof(struct sched_list_node));
51 sr->tid = tid;
52 sr->pid = -1;
53 sr->epoll_fd = efd;
54 sr->queue = mk_mem_malloc_z(sizeof(struct sched_connection) *
55 config->worker_capacity);
56 sr->request_handler = NULL;
57 sr->next = NULL;
59 for (i = 0; i < config->worker_capacity; i++) {
60 sr->queue[i].status = MK_SCHEDULER_CONN_AVAILABLE;
63 if (!sched_list) {
64 sr->idx = 1;
65 sched_list = sr;
66 return 0;
69 aux = sched_list;
70 while (aux->next) {
71 aux = aux->next;
73 sr->idx = aux->idx + 1;
74 aux->next = sr;
76 return 0;
80 * Create thread which will be listening
81 * for incomings file descriptors
83 int mk_sched_launch_thread(int max_events)
85 int efd;
86 pthread_t tid;
87 pthread_attr_t attr;
88 sched_thread_conf *thconf;
89 pthread_mutex_t mutex_wait_register;
91 /* Creating epoll file descriptor */
92 efd = mk_epoll_create(max_events);
93 if (efd < 1) {
94 return -1;
97 /* Thread stuff */
98 pthread_mutex_init(&mutex_wait_register, (pthread_mutexattr_t *) NULL);
99 pthread_mutex_lock(&mutex_wait_register);
101 thconf = mk_mem_malloc(sizeof(sched_thread_conf));
102 thconf->epoll_fd = efd;
103 thconf->max_events = max_events;
105 pthread_attr_init(&attr);
106 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
107 if (pthread_create(&tid, &attr, mk_sched_launch_epoll_loop,
108 (void *) thconf) != 0) {
109 perror("pthread_create");
110 return -1;
113 /* Register working thread */
114 mk_sched_register_thread(tid, efd);
115 pthread_mutex_unlock(&mutex_wait_register);
117 return 0;
120 /* created thread, all this calls are in the thread context */
121 void *mk_sched_launch_epoll_loop(void *thread_conf)
123 sched_thread_conf *thconf;
124 struct sched_list_node *thinfo;
125 mk_epoll_handlers *handler;
127 /* Avoid SIGPIPE signals */
128 mk_signal_thread_sigpipe_safe();
130 thconf = thread_conf;
132 /* Init specific thread cache */
133 mk_cache_thread_init();
134 mk_plugin_worker_startup();
136 /* Epoll event handlers */
137 handler = mk_epoll_set_handlers((void *) mk_conn_read,
138 (void *) mk_conn_write,
139 (void *) mk_conn_error,
140 (void *) mk_conn_close,
141 (void *) mk_conn_timeout);
143 /* Nasty way to export task id */
144 usleep(1000);
145 thinfo = mk_sched_get_thread_conf();
146 while (!thinfo) {
147 thinfo = mk_sched_get_thread_conf();
150 /* Glibc doesn't export to user space the gettid() syscall */
151 thinfo->pid = syscall(__NR_gettid);
153 mk_sched_set_thread_poll(thconf->epoll_fd);
154 mk_epoll_init(thconf->epoll_fd, handler, thconf->max_events);
156 return 0;
159 struct request_idx *mk_sched_get_request_index()
161 return pthread_getspecific(request_index);
164 void mk_sched_set_request_index(struct request_idx *ri)
166 pthread_setspecific(request_index, (void *) ri);
169 void mk_sched_set_thread_poll(int epoll)
171 pthread_setspecific(epoll_fd, (void *) epoll);
174 int mk_sched_get_thread_poll()
176 return (int) pthread_getspecific(epoll_fd);
179 struct sched_list_node *mk_sched_get_thread_conf()
181 struct sched_list_node *node;
182 pthread_t current;
184 current = pthread_self();
185 node = sched_list;
186 while (node) {
187 if (pthread_equal(node->tid, current) != 0) {
188 return node;
190 node = node->next;
193 return NULL;
196 void mk_sched_update_thread_status(struct sched_list_node *sched,
197 int active, int closed)
199 if (!sched) {
200 sched = mk_sched_get_thread_conf();
203 switch (active) {
204 case MK_SCHEDULER_ACTIVE_UP:
205 sched->active_requests++;
206 break;
207 case MK_SCHEDULER_ACTIVE_DOWN:
208 sched->active_requests--;
209 break;
212 switch (closed) {
213 case MK_SCHEDULER_CLOSED_UP:
214 sched->closed_requests++;
215 break;
216 case MK_SCHEDULER_CLOSED_DOWN:
217 sched->closed_requests--;
218 break;
222 int mk_sched_add_client(struct sched_list_node *sched, int remote_fd)
224 unsigned int i, ret;
226 /* Look for an available slot */
227 for (i = 0; i < config->worker_capacity; i++) {
228 if (sched->queue[i].status == MK_SCHEDULER_CONN_AVAILABLE) {
229 /* Set IP */
230 sched->queue[i].ipv4.data = mk_mem_malloc_z(16);
231 mk_socket_get_ip(remote_fd, sched->queue[i].ipv4.data);
232 mk_pointer_set( &sched->queue[i].ipv4, sched->queue[i].ipv4.data );
234 /* Before to continue, we need to run plugin stage 20 */
235 ret = mk_plugin_stage_run(MK_PLUGIN_STAGE_20,
236 remote_fd,
237 &sched->queue[i], NULL, NULL);
239 /* Close connection, otherwise continue */
240 if (ret == MK_PLUGIN_RET_CLOSE_CONX) {
241 mk_conn_close(remote_fd);
242 return MK_PLUGIN_RET_CLOSE_CONX;
245 /* Socket and status */
246 sched->queue[i].socket = remote_fd;
247 sched->queue[i].status = MK_SCHEDULER_CONN_PENDING;
248 sched->queue[i].arrive_time = log_current_utime;
250 mk_epoll_add_client(sched->epoll_fd, remote_fd, MK_EPOLL_READ,
251 MK_EPOLL_BEHAVIOR_TRIGGERED);
252 return 0;
256 return -1;
259 int mk_sched_remove_client(struct sched_list_node *sched, int remote_fd)
261 struct sched_connection *sc;
263 sc = mk_sched_get_connection(sched, remote_fd);
264 if (sc) {
265 close(remote_fd);
266 sc->status = MK_SCHEDULER_CONN_AVAILABLE;
267 return 0;
269 return -1;
272 struct sched_connection *mk_sched_get_connection(struct sched_list_node
273 *sched, int remote_fd)
275 int i;
277 if (!sched) {
278 sched = mk_sched_get_thread_conf();
279 if (!sched) {
280 close(remote_fd);
281 return NULL;
285 for (i = 0; i < config->worker_capacity; i++) {
286 if (sched->queue[i].socket == remote_fd) {
287 return &sched->queue[i];
291 return NULL;
294 int mk_sched_check_timeouts(struct sched_list_node *sched)
296 int i;
297 struct request_idx *req_idx;
298 struct client_request *req_cl;
300 /* PENDING CONN TIMEOUT */
301 for (i = 0; i < config->worker_capacity; i++) {
302 if (sched->queue[i].status == MK_SCHEDULER_CONN_PENDING) {
303 if (sched->queue[i].arrive_time + config->timeout <=
304 log_current_utime) {
305 #ifdef TRACE
306 MK_TRACE("Scheduler, closing fd %i due TIMEOUT",
307 sched->queue[i].socket);
308 #endif
309 mk_sched_remove_client(sched, sched->queue[i].socket);
314 /* PROCESSING CONN TIMEOUT */
315 req_idx = mk_sched_get_request_index();
316 req_cl = req_idx->first;
318 while (req_cl) {
319 if (req_cl->status == MK_REQUEST_STATUS_INCOMPLETE) {
320 if ((req_cl->init_time + config->timeout) >= log_current_utime) {
321 #ifdef TRACE
322 MK_TRACE("Scheduler, closing fd %i due to timeout (incomplete)",
323 req_cl->socket);
324 #endif
325 close(req_cl->socket);
328 req_cl = req_cl->next;
331 return 0;
334 int mk_sched_update_conn_status(struct sched_list_node *sched,
335 int remote_fd, int status)
337 int i;
339 if (!sched) {
340 return -1;
343 for (i = 0; i < config->workers; i++) {
344 if (sched->queue[i].socket == remote_fd) {
345 sched->queue[i].status = status;
346 return 0;
349 return 0;