Plugins :: Add _mk_plugin_worker_init() callback
[MonkeyD.git] / src / scheduler.c
blob7d9b3dceb6c7e400ad6cf7d92cb0e7b135b2a93d
1 /* -*- Mode: C; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 8 -*- */
3 /* Monkey HTTP Daemon
4 * ------------------
5 * Copyright (C) 2008, Eduardo Silva P.
7 * This program is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation; either version 2 of the License, or
10 * (at your option) any later version.
12 * This program is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU Library General Public License for more details.
17 * You should have received a copy of the GNU General Public License
18 * along with this program; if not, write to the Free Software
19 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
22 #include <stdio.h>
23 #include <stdlib.h>
24 #include <errno.h>
25 #include <pthread.h>
26 #include <sys/epoll.h>
27 #include <unistd.h>
28 #include <sys/syscall.h>
29 #include <string.h>
31 #include "monkey.h"
32 #include "connection.h"
33 #include "scheduler.h"
34 #include "memory.h"
35 #include "epoll.h"
36 #include "request.h"
37 #include "cache.h"
38 #include "config.h"
39 #include "clock.h"
40 #include "signals.h"
42 /* Register thread information */
43 int mk_sched_register_thread(pthread_t tid, int efd)
45 int i;
46 struct sched_list_node *sr, *aux;
48 sr = mk_mem_malloc_z(sizeof(struct sched_list_node));
49 sr->tid = tid;
50 sr->pid = -1;
51 sr->epoll_fd = efd;
52 sr->queue = mk_mem_malloc_z(sizeof(struct sched_connection)*
53 config->worker_capacity);
54 sr->request_handler = NULL;
55 sr->next = NULL;
57 for(i=0; i<config->worker_capacity; i++){
58 sr->queue[i].status = MK_SCHEDULER_CONN_AVAILABLE;
61 if(!sched_list)
63 sr->idx = 1;
64 sched_list = sr;
65 return 0;
68 aux = sched_list;
69 while(aux->next)
71 aux = aux->next;
73 sr->idx = aux->idx + 1;
74 aux->next = sr;
76 return 0;
80 * Create thread which will be listening
81 * for incomings file descriptors
83 int mk_sched_launch_thread(int max_events)
85 int efd;
86 pthread_t tid;
87 pthread_attr_t attr;
88 sched_thread_conf *thconf;
89 pthread_mutex_t mutex_wait_register;
91 /* Creating epoll file descriptor */
92 efd = mk_epoll_create(max_events);
93 if(efd < 1)
95 return -1;
98 /* Thread stuff */
99 pthread_mutex_init(&mutex_wait_register,(pthread_mutexattr_t *) NULL);
100 pthread_mutex_lock(&mutex_wait_register);
102 thconf = mk_mem_malloc(sizeof(sched_thread_conf));
103 thconf->epoll_fd = efd;
104 thconf->max_events = max_events;
106 pthread_attr_init(&attr);
107 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
108 if(pthread_create(&tid, &attr, mk_sched_launch_epoll_loop,
109 (void *) thconf)!=0)
111 perror("pthread_create");
112 return -1;
115 /* Register working thread */
116 mk_sched_register_thread(tid, efd);
117 pthread_mutex_unlock(&mutex_wait_register);
119 return 0;
122 /* created thread, all this calls are in the thread context */
123 void *mk_sched_launch_epoll_loop(void *thread_conf)
125 sched_thread_conf *thconf;
126 struct sched_list_node *thinfo;
128 /* Avoid SIGPIPE signals */
129 mk_signal_thread_sigpipe_safe();
131 thconf = thread_conf;
133 /* Init specific thread cache */
134 mk_cache_thread_init();
135 mk_plugin_worker_startup();
137 mk_epoll_handlers *handler;
138 handler = mk_epoll_set_handlers((void *) mk_conn_read,
139 (void *) mk_conn_write,
140 (void *) mk_conn_error,
141 (void *) mk_conn_close,
142 (void *) mk_conn_timeout);
144 /* Nasty way to export task id */
145 usleep(1000);
146 thinfo = mk_sched_get_thread_conf();
147 while(!thinfo){
148 thinfo = mk_sched_get_thread_conf();
151 /* Glibc doesn't export to user space the gettid() syscall */
152 thinfo->pid = syscall(__NR_gettid);
154 mk_sched_set_thread_poll(thconf->epoll_fd);
155 mk_epoll_init(thconf->epoll_fd, handler, thconf->max_events);
157 return 0;
160 struct request_idx *mk_sched_get_request_index()
162 return pthread_getspecific(request_index);
165 void mk_sched_set_request_index(struct request_idx *ri)
167 pthread_setspecific(request_index, (void *)ri);
170 void mk_sched_set_thread_poll(int epoll)
172 pthread_setspecific(epoll_fd, (void *) epoll);
175 int mk_sched_get_thread_poll()
177 return (int) pthread_getspecific(epoll_fd);
180 struct sched_list_node *mk_sched_get_thread_conf()
182 struct sched_list_node *node;
183 pthread_t current;
185 current = pthread_self();
186 node = sched_list;
187 while(node){
188 if(pthread_equal(node->tid, current) != 0){
189 return node;
191 node = node->next;
194 return NULL;
198 void mk_sched_update_thread_status(int active, int closed)
200 struct sched_list_node *thnode;
202 thnode = mk_sched_get_thread_conf();
204 switch(active){
205 case MK_SCHEDULER_ACTIVE_UP:
206 thnode->active_requests++;
207 break;
208 case MK_SCHEDULER_ACTIVE_DOWN:
209 thnode->active_requests--;
210 break;
213 switch(closed){
214 case MK_SCHEDULER_CLOSED_UP:
215 thnode->closed_requests++;
216 break;
217 case MK_SCHEDULER_CLOSED_DOWN:
218 thnode->closed_requests--;
219 break;
223 int mk_sched_add_client(struct sched_list_node **sched, int remote_fd)
225 int i;
226 struct sched_list_node *l;
228 l = (struct sched_list_node *) *sched;
230 /* Look for an available slot */
231 for(i=0; i<config->worker_capacity; i++){
232 if(l->queue[i].status == MK_SCHEDULER_CONN_AVAILABLE){
233 l->queue[i].socket = remote_fd;
234 l->queue[i].status = MK_SCHEDULER_CONN_PENDING;
235 l->queue[i].arrive_time = log_current_utime;
237 mk_epoll_add_client(l->epoll_fd, remote_fd,
238 MK_EPOLL_BEHAVIOR_TRIGGERED);
239 return 0;
243 return -1;
246 int mk_sched_remove_client(struct sched_list_node **sched, int remote_fd)
248 struct sched_connection *sc;
250 sc = mk_sched_get_connection(sched, remote_fd);
251 if(sc){
252 close(remote_fd);
253 sc->status = MK_SCHEDULER_CONN_AVAILABLE;
254 return 0;
256 return -1;
259 struct sched_connection *mk_sched_get_connection(struct sched_list_node **sched,
260 int remote_fd)
262 int i;
263 struct sched_list_node *l;
265 if(!sched){
266 l = mk_sched_get_thread_conf();
268 else{
269 l = (struct sched_list_node *) *sched;
272 for(i=0; i<config->worker_capacity; i++){
273 if(l->queue[i].socket == remote_fd){
274 return &l->queue[i];
278 return NULL;
281 int mk_sched_check_timeouts(struct sched_list_node **sched)
283 int i;
284 struct request_idx *req_idx;
285 struct client_request *req_cl;
286 struct sched_list_node *l;
288 l = (struct sched_list_node *) *sched;
290 /* PENDING CONN TIMEOUT */
291 for(i=0; i<config->worker_capacity; i++){
292 if(l->queue[i].status == MK_SCHEDULER_CONN_PENDING){
293 if(l->queue[i].arrive_time + config->timeout <= log_current_utime){
294 mk_sched_remove_client(&l, l->queue[i].socket);
299 /* PROCESSING CONN TIMEOUT */
300 req_idx = mk_sched_get_request_index();
301 req_cl = req_idx->first;
303 while(req_cl){
304 if(req_cl->status == MK_REQUEST_STATUS_INCOMPLETE){
305 if((req_cl->init_time + config->timeout) >=
306 log_current_utime){
307 close(req_cl->socket);
310 req_cl = req_cl->next;
313 return 0;