/* -*- Mode: C; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 8 -*- */

/*
 *  Copyright (C) 2008, Eduardo Silva P.
 *
 *  This program is free software; you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License as published by
 *  the Free Software Foundation; either version 2 of the License, or
 *  (at your option) any later version.
 *
 *  This program is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *  GNU Library General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License
 *  along with this program; if not, write to the Free Software
 *  Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
 */
#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
#include <sys/epoll.h>
#include <sys/syscall.h>

#include "connection.h"
#include "scheduler.h"
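
/*
 * Scheduler overview: each worker thread owns an epoll file descriptor plus a
 * fixed array of 'struct sched_connection' slots (config->worker_capacity
 * entries). The functions below register workers, hand accepted sockets to a
 * worker queue and expire connections that sit idle for too long.
 */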
/* Register thread information */
int mk_sched_register_thread(pthread_t tid, int efd)
{
        int i;
        struct sched_list_node *sr, *aux;

        sr = mk_mem_malloc_z(sizeof(struct sched_list_node));
        sr->tid = tid;
        sr->epoll_fd = efd;
        sr->queue = mk_mem_malloc_z(sizeof(struct sched_connection)*
                                    config->worker_capacity);
        sr->request_handler = NULL;

        for(i=0; i<config->worker_capacity; i++){
                sr->queue[i].status = MK_SCHEDULER_CONN_AVAILABLE;
        }

        /* ... walk the scheduler list so 'aux' points at the current tail,
         * then append 'sr'; the new worker takes the next index ... */
        sr->idx = aux->idx + 1;

        return 0;
}
/*
 * Create a thread that will listen for incoming file descriptors
 */
int mk_sched_launch_thread(int max_events)
{
        int efd;
        pthread_t tid;
        pthread_attr_t attr;
        sched_thread_conf *thconf;
        pthread_mutex_t mutex_wait_register;

        /* Creating epoll file descriptor */
        efd = mk_epoll_create(max_events);

        pthread_mutex_init(&mutex_wait_register, (pthread_mutexattr_t *) NULL);
        pthread_mutex_lock(&mutex_wait_register);

        thconf = mk_mem_malloc(sizeof(sched_thread_conf));
        thconf->epoll_fd = efd;
        thconf->max_events = max_events;

        pthread_attr_init(&attr);
        pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
        if(pthread_create(&tid, &attr, mk_sched_launch_epoll_loop,
                          (void *) thconf) != 0){
                perror("pthread_create");
                return -1;
        }

        /* Register working thread */
        mk_sched_register_thread(tid, efd);
        pthread_mutex_unlock(&mutex_wait_register);

        return 0;
}
/* Created thread: all of these calls run in the thread context */
void *mk_sched_launch_epoll_loop(void *thread_conf)
{
        sched_thread_conf *thconf;
        struct sched_list_node *thinfo;
        mk_epoll_handlers *handler;

        /* Avoid SIGPIPE signals */
        mk_signal_thread_sigpipe_safe();

        thconf = thread_conf;

        /* Init specific thread cache */
        mk_cache_thread_init();
        mk_plugin_worker_startup();

        handler = mk_epoll_set_handlers((void *) mk_conn_read,
                                        (void *) mk_conn_write,
                                        (void *) mk_conn_error,
                                        (void *) mk_conn_close,
                                        (void *) mk_conn_timeout);

        /* Nasty way to export the task id */
        thinfo = mk_sched_get_thread_conf();
        while(!thinfo){
                thinfo = mk_sched_get_thread_conf();
        }

        /* Glibc doesn't export the gettid() syscall to user space */
        thinfo->pid = syscall(__NR_gettid);

        mk_sched_set_thread_poll(thconf->epoll_fd);
        mk_epoll_init(thconf->epoll_fd, handler, thconf->max_events);

        return 0;
}
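
/*
 * Note: the handler table and max_events are handed to mk_epoll_init(), so
 * the epoll_wait() dispatch loop presumably runs inside that call; the
 * function above only prepares the per-thread state before giving up control.
 */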
struct request_idx *mk_sched_get_request_index()
{
        return pthread_getspecific(request_index);
}
void mk_sched_set_request_index(struct request_idx *ri)
{
        pthread_setspecific(request_index, (void *) ri);
}
void mk_sched_set_thread_poll(int epoll)
{
        pthread_setspecific(epoll_fd, (void *) epoll);
}
int mk_sched_get_thread_poll()
{
        return (int) pthread_getspecific(epoll_fd);
}
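
/*
 * The epoll descriptor is stored in thread-specific data by casting the int
 * through void *: not strictly portable, but a common trick for small integer
 * values and it keeps the per-thread lookup allocation-free.
 */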
struct sched_list_node *mk_sched_get_thread_conf()
{
        struct sched_list_node *node;
        pthread_t current;

        current = pthread_self();

        /* ... walk the list of registered worker nodes ... */
        if(pthread_equal(node->tid, current) != 0){
                return node;
        }
        /* ... */

        return NULL;
}
void mk_sched_update_thread_status(int active, int closed)
{
        struct sched_list_node *thnode;

        thnode = mk_sched_get_thread_conf();

        switch(active){
        case MK_SCHEDULER_ACTIVE_UP:
                thnode->active_requests++;
                break;
        case MK_SCHEDULER_ACTIVE_DOWN:
                thnode->active_requests--;
                break;
        }

        switch(closed){
        case MK_SCHEDULER_CLOSED_UP:
                thnode->closed_requests++;
                break;
        case MK_SCHEDULER_CLOSED_DOWN:
                thnode->closed_requests--;
                break;
        }
}
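
/*
 * Example (hypothetical call site): when a worker finishes and closes a
 * request it might call
 *
 *     mk_sched_update_thread_status(MK_SCHEDULER_ACTIVE_DOWN,
 *                                   MK_SCHEDULER_CLOSED_UP);
 *
 * The real call sites live in the connection handling code, not here.
 */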
int mk_sched_add_client(struct sched_list_node **sched, int remote_fd)
{
        int i;
        struct sched_list_node *l;

        l = (struct sched_list_node *) *sched;

        /* Look for an available slot */
        for(i=0; i<config->worker_capacity; i++){
                if(l->queue[i].status == MK_SCHEDULER_CONN_AVAILABLE){
                        l->queue[i].socket = remote_fd;
                        l->queue[i].status = MK_SCHEDULER_CONN_PENDING;
                        l->queue[i].arrive_time = log_current_utime;

                        mk_epoll_add_client(l->epoll_fd, remote_fd,
                                            MK_EPOLL_BEHAVIOR_TRIGGERED);
                        return 0;
                }
        }

        return -1;
}
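
/*
 * Design note: the queue is a plain fixed-size array scanned linearly, so
 * adding or looking up a client costs O(config->worker_capacity); cheap as
 * long as the per-worker capacity stays small.
 */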
int mk_sched_remove_client(struct sched_list_node **sched, int remote_fd)
{
        struct sched_connection *sc;

        sc = mk_sched_get_connection(sched, remote_fd);
        if(!sc){
                return -1;
        }
        sc->status = MK_SCHEDULER_CONN_AVAILABLE;

        return 0;
}
struct sched_connection *mk_sched_get_connection(struct sched_list_node **sched,
                                                 int remote_fd)
{
        int i;
        struct sched_list_node *l;

        if(!sched){
                l = mk_sched_get_thread_conf();
        }
        else{
                l = (struct sched_list_node *) *sched;
        }

        for(i=0; i<config->worker_capacity; i++){
                if(l->queue[i].socket == remote_fd){
                        return &l->queue[i];
                }
        }

        return NULL;
}
int mk_sched_check_timeouts(struct sched_list_node **sched)
{
        int i;
        struct request_idx *req_idx;
        struct client_request *req_cl;
        struct sched_list_node *l;

        l = (struct sched_list_node *) *sched;

        /* PENDING CONN TIMEOUT */
        for(i=0; i<config->worker_capacity; i++){
                if(l->queue[i].status == MK_SCHEDULER_CONN_PENDING){
                        if(l->queue[i].arrive_time + config->timeout <= log_current_utime){
                                mk_sched_remove_client(&l, l->queue[i].socket);
                        }
                }
        }

        /* PROCESSING CONN TIMEOUT */
        req_idx = mk_sched_get_request_index();
        req_cl = req_idx->first;

        while(req_cl){
                if(req_cl->status == MK_REQUEST_STATUS_INCOMPLETE){
                        if((req_cl->init_time + config->timeout) <= log_current_utime){
                                close(req_cl->socket);
                        }
                }
                req_cl = req_cl->next;
        }

        return 0;
}