Fix 64-bit warnings and issues
[MonkeyD.git] / src / scheduler.c
blob578bf4d4ec37e5e40260c5327ae78c55479cb967
1 /* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
3 /* Monkey HTTP Daemon
4 * ------------------
5 * Copyright (C) 2001-2010, Eduardo Silva P. <edsiper@gmail.com>
7 * This program is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation; either version 2 of the License, or
10 * (at your option) any later version.
12 * This program is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU Library General Public License for more details.
17 * You should have received a copy of the GNU General Public License
18 * along with this program; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
22 #include <stdio.h>
23 #include <stdlib.h>
24 #include <errno.h>
25 #include <pthread.h>
26 #include <sys/epoll.h>
27 #include <unistd.h>
28 #include <sys/syscall.h>
29 #include <string.h>
31 #include "monkey.h"
32 #include "connection.h"
33 #include "scheduler.h"
34 #include "memory.h"
35 #include "epoll.h"
36 #include "request.h"
37 #include "cache.h"
38 #include "config.h"
39 #include "clock.h"
40 #include "signals.h"
41 #include "plugin.h"
42 #include "utils.h"
44 /* Register thread information */
45 int mk_sched_register_thread(pthread_t tid, int efd)
47 int i;
48 struct sched_list_node *sl, *aux;
50 sl = mk_mem_malloc_z(sizeof(struct sched_list_node));
51 sl->tid = tid;
52 sl->pid = -1;
53 sl->epoll_fd = efd;
54 sl->queue = mk_mem_malloc_z(sizeof(struct sched_connection) *
55 config->worker_capacity);
56 sl->request_handler = NULL;
57 sl->next = NULL;
59 for (i = 0; i < config->worker_capacity; i++) {
60 /* Pre alloc IPv4 memory buffer */
61 sl->queue[i].ipv4.data = mk_mem_malloc_z(16);
62 sl->queue[i].status = MK_SCHEDULER_CONN_AVAILABLE;
65 if (!sched_list) {
66 sl->idx = 1;
67 sched_list = sl;
68 return 0;
71 aux = sched_list;
72 while (aux->next) {
73 aux = aux->next;
75 sl->idx = aux->idx + 1;
76 aux->next = sl;
78 return 0;
82 * Create thread which will be listening
83 * for incomings file descriptors
85 int mk_sched_launch_thread(int max_events)
87 int efd;
88 pthread_t tid;
89 pthread_attr_t attr;
90 sched_thread_conf *thconf;
91 pthread_mutex_t mutex_wait_register;
93 /* Creating epoll file descriptor */
94 efd = mk_epoll_create(max_events);
95 if (efd < 1) {
96 return -1;
99 /* Thread stuff */
100 pthread_mutex_init(&mutex_wait_register, (pthread_mutexattr_t *) NULL);
101 pthread_mutex_lock(&mutex_wait_register);
103 thconf = mk_mem_malloc(sizeof(sched_thread_conf));
104 thconf->epoll_fd = efd;
105 thconf->epoll_max_events = max_events*2;
106 thconf->max_events = max_events;
108 pthread_attr_init(&attr);
109 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
110 if (pthread_create(&tid, &attr, mk_sched_launch_epoll_loop,
111 (void *) thconf) != 0) {
112 perror("pthread_create");
113 return -1;
116 /* Register working thread */
117 mk_sched_register_thread(tid, efd);
118 pthread_mutex_unlock(&mutex_wait_register);
120 return 0;
123 /* created thread, all this calls are in the thread context */
124 void *mk_sched_launch_epoll_loop(void *thread_conf)
126 sched_thread_conf *thconf;
127 struct sched_list_node *thinfo;
128 mk_epoll_handlers *handler;
130 /* Avoid SIGPIPE signals */
131 mk_signal_thread_sigpipe_safe();
133 thconf = thread_conf;
135 /* Init specific thread cache */
136 mk_cache_thread_init();
137 mk_plugin_core_thread();
139 /* Epoll event handlers */
140 handler = mk_epoll_set_handlers((void *) mk_conn_read,
141 (void *) mk_conn_write,
142 (void *) mk_conn_error,
143 (void *) mk_conn_close,
144 (void *) mk_conn_timeout);
146 /* Nasty way to export task id */
147 usleep(1000);
148 thinfo = mk_sched_get_thread_conf();
149 while (!thinfo) {
150 thinfo = mk_sched_get_thread_conf();
153 /* Glibc doesn't export to user space the gettid() syscall */
154 thinfo->pid = syscall(__NR_gettid);
156 mk_sched_set_thread_poll(thconf->epoll_fd);
157 mk_epoll_init(thconf->epoll_fd, handler, thconf->epoll_max_events);
159 return 0;
162 struct request_idx *mk_sched_get_request_index()
164 return pthread_getspecific(request_index);
167 void mk_sched_set_request_index(struct request_idx *ri)
169 pthread_setspecific(request_index, (void *) ri);
172 void mk_sched_set_thread_poll(int epoll)
174 pthread_setspecific(epoll_fd, (void *) (size_t) epoll);
177 int mk_sched_get_thread_poll()
179 return (size_t) pthread_getspecific(epoll_fd);
182 struct sched_list_node *mk_sched_get_thread_conf()
184 struct sched_list_node *node;
185 pthread_t current;
187 current = pthread_self();
188 node = sched_list;
189 while (node) {
190 if (pthread_equal(node->tid, current) != 0) {
191 return node;
193 node = node->next;
196 return NULL;
199 int mk_sched_add_client(struct sched_list_node *sched, int remote_fd)
201 unsigned int i, ret;
203 for (i = 0; i < config->worker_capacity; i++) {
204 if (sched->queue[i].status == MK_SCHEDULER_CONN_AVAILABLE) {
205 #ifdef TRACE
206 MK_TRACE("[FD %i] Add", remote_fd);
207 #endif
208 /* Set IP */
209 mk_socket_get_ip(remote_fd, sched->queue[i].ipv4.data);
210 mk_pointer_set( &sched->queue[i].ipv4, sched->queue[i].ipv4.data );
212 /* Before to continue, we need to run plugin stage 20 */
213 ret = mk_plugin_stage_run(MK_PLUGIN_STAGE_10,
214 remote_fd,
215 &sched->queue[i], NULL, NULL);
217 /* Close connection, otherwise continue */
218 if (ret == MK_PLUGIN_RET_CLOSE_CONX) {
219 mk_conn_close(remote_fd);
220 return MK_PLUGIN_RET_CLOSE_CONX;
223 /* Socket and status */
224 sched->queue[i].socket = remote_fd;
225 sched->queue[i].status = MK_SCHEDULER_CONN_PENDING;
226 sched->queue[i].arrive_time = log_current_utime;
228 mk_epoll_add(sched->epoll_fd, remote_fd, MK_EPOLL_READ,
229 MK_EPOLL_BEHAVIOR_TRIGGERED);
230 return 0;
234 return -1;
237 int mk_sched_remove_client(struct sched_list_node *sched, int remote_fd)
239 struct sched_connection *sc;
241 sc = mk_sched_get_connection(sched, remote_fd);
242 if (sc) {
243 #ifdef TRACE
244 MK_TRACE("[FD %i] Scheduler remove", remote_fd);
245 #endif
246 /* Close socket and change status */
247 close(remote_fd);
249 /* Invoke plugins in stage 50 */
250 mk_plugin_stage_run(MK_PLUGIN_STAGE_50, remote_fd, NULL, NULL, NULL);
252 /* Change node status */
253 sc->status = MK_SCHEDULER_CONN_AVAILABLE;
254 sc->socket = -1;
255 return 0;
257 #ifdef TRACE
258 else {
259 MK_TRACE("[FD %i] Not found");
261 #endif
262 return -1;
265 struct sched_connection *mk_sched_get_connection(struct sched_list_node
266 *sched, int remote_fd)
268 int i;
270 if (!sched) {
271 sched = mk_sched_get_thread_conf();
272 if (!sched) {
273 #ifdef TRACE
274 MK_TRACE("[FD %i] No scheduler information", remote_fd);
275 #endif
276 close(remote_fd);
277 return NULL;
281 for (i = 0; i < config->worker_capacity; i++) {
282 if (sched->queue[i].socket == remote_fd) {
283 return &sched->queue[i];
287 #ifdef TRACE
288 MK_TRACE("\n [%i] not found , why?\n\n", remote_fd);
289 #endif
291 return NULL;
294 int mk_sched_check_timeouts(struct sched_list_node *sched)
296 int i, client_timeout;
297 struct request_idx *req_idx;
298 struct client_request *req_cl;
300 /* PENDING CONN TIMEOUT */
301 for (i = 0; i < config->worker_capacity; i++) {
302 if (sched->queue[i].status == MK_SCHEDULER_CONN_PENDING) {
303 client_timeout = sched->queue[i].arrive_time + config->timeout;
305 /* Check timeout */
306 if (client_timeout <= log_current_utime) {
307 #ifdef TRACE
308 MK_TRACE("Scheduler, closing fd %i due TIMEOUT",
309 sched->queue[i].socket);
310 #endif
311 mk_sched_remove_client(sched, sched->queue[i].socket);
316 /* PROCESSING CONN TIMEOUT */
317 req_idx = mk_sched_get_request_index();
318 req_cl = req_idx->first;
320 while (req_cl) {
321 if (req_cl->status == MK_REQUEST_STATUS_INCOMPLETE) {
322 if (req_cl->counter_connections == 0) {
323 client_timeout = req_cl->init_time + config->timeout;
325 else {
326 client_timeout = req_cl->init_time + config->keep_alive_timeout;
329 /* Check timeout */
330 if (client_timeout <= log_current_utime) {
331 #ifdef TRACE
332 MK_TRACE("Scheduler, closing fd %i due to timeout (incomplete)",
333 req_cl->socket);
334 #endif
335 close(req_cl->socket);
336 mk_sched_remove_client(sched, req_cl->socket);
337 mk_request_client_remove(req_cl->socket);
340 req_cl = req_cl->next;
343 return 0;
346 int mk_sched_update_conn_status(struct sched_list_node *sched,
347 int remote_fd, int status)
349 int i;
351 if (!sched) {
352 return -1;
355 for (i = 0; i < config->workers; i++) {
356 if (sched->queue[i].socket == remote_fd) {
357 sched->queue[i].status = status;
358 return 0;
361 return 0;