/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */

/*
 * Copyright (C) 2001-2010, Eduardo Silva P. <edsiper@gmail.com>
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU Library General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 */
26 #include <sys/epoll.h>
28 #include <sys/syscall.h>
32 #include "connection.h"
33 #include "scheduler.h"
44 /* Register thread information */
/*
 * mk_sched_register_thread(): allocate a scheduler node for worker thread
 * `tid`, which owns epoll descriptor `efd`; pre-allocate its connection
 * queue and link it into the global scheduler list.
 *
 * NOTE(review): this listing is incomplete — the original lines that
 * declare `i`, store tid/efd into `sl`, and walk the scheduler list to
 * position `aux` before `sl->idx = aux->idx + 1` are not visible here.
 * Confirm against the full source before editing.
 */
45 int mk_sched_register_thread(pthread_t tid
, int efd
)
48 struct sched_list_node
*sl
, *aux
;
/* Zero-initialized allocation of the scheduler node itself */
50 sl
= mk_mem_malloc_z(sizeof(struct sched_list_node
));
/* Connection queue: one slot per allowed worker connection */
54 sl
->queue
= mk_mem_malloc_z(sizeof(struct sched_connection
) *
55 config
->worker_capacity
);
56 sl
->request_handler
= NULL
;
/* Mark every slot available and pre-allocate its IPv4 text buffer */
59 for (i
= 0; i
< config
->worker_capacity
; i
++) {
60 /* Pre alloc IPv4 memory buffer (16 bytes: dotted-quad + NUL) */
61 sl
->queue
[i
].ipv4
.data
= mk_mem_malloc_z(16);
62 sl
->queue
[i
].status
= MK_SCHEDULER_CONN_AVAILABLE
;
/* NOTE(review): list-linking code is missing from this view; `aux`
 * presumably points at the current list tail when the next line runs —
 * verify against the full file. */
75 sl
->idx
= aux
->idx
+ 1;
/*
 * Create a thread which will be listening
 * for incoming file descriptors
 */
/*
 * mk_sched_launch_thread(): create one detached worker thread running
 * mk_sched_launch_epoll_loop() over a fresh epoll descriptor sized for
 * `max_events`, then register the thread in the scheduler list.
 *
 * NOTE(review): incomplete listing — the declarations of `efd`, `tid` and
 * `attr`, the mk_epoll_create() error path, and the function's return
 * statements are not visible in this chunk.
 */
85 int mk_sched_launch_thread(int max_events
)
90 sched_thread_conf
*thconf
;
91 pthread_mutex_t mutex_wait_register
;
93 /* Creating epoll file descriptor */
94 efd
= mk_epoll_create(max_events
);
/* Hold the mutex so the new thread can be serialized against
 * registration of its scheduler node */
100 pthread_mutex_init(&mutex_wait_register
, (pthread_mutexattr_t
*) NULL
);
101 pthread_mutex_lock(&mutex_wait_register
);
/* Per-thread configuration handed to the thread entry point;
 * ownership passes to the new thread */
103 thconf
= mk_mem_malloc(sizeof(sched_thread_conf
));
104 thconf
->epoll_fd
= efd
;
/* epoll event buffer is sized at twice max_events; rationale is not
 * visible in this chunk */
105 thconf
->epoll_max_events
= max_events
*2;
106 thconf
->max_events
= max_events
;
/* Worker threads are detached: they are never joined */
108 pthread_attr_init(&attr
);
109 pthread_attr_setdetachstate(&attr
, PTHREAD_CREATE_DETACHED
);
110 if (pthread_create(&tid
, &attr
, mk_sched_launch_epoll_loop
,
111 (void *) thconf
) != 0) {
112 perror("pthread_create");
116 /* Register working thread */
117 mk_sched_register_thread(tid
, efd
);
118 pthread_mutex_unlock(&mutex_wait_register
);
/* Created thread: all these calls run in the new thread's context */
/*
 * mk_sched_launch_epoll_loop(): entry point of each worker thread.
 * Sets up per-thread state (SIGPIPE handling, thread cache, plugin
 * thread hooks, epoll event handlers), records the kernel thread id in
 * its scheduler node, publishes its epoll fd via TLS, then enters the
 * epoll event loop.
 *
 * NOTE(review): incomplete listing — mk_epoll_init() is the last visible
 * statement; the function's return is not in view.
 */
124 void *mk_sched_launch_epoll_loop(void *thread_conf
)
126 sched_thread_conf
*thconf
;
127 struct sched_list_node
*thinfo
;
128 mk_epoll_handlers
*handler
;
130 /* Avoid SIGPIPE signals */
131 mk_signal_thread_sigpipe_safe();
133 thconf
= thread_conf
;
135 /* Init specific thread cache */
136 mk_cache_thread_init();
137 mk_plugin_core_thread();
139 /* Epoll event handlers */
140 handler
= mk_epoll_set_handlers((void *) mk_conn_read
,
141 (void *) mk_conn_write
,
142 (void *) mk_conn_error
,
143 (void *) mk_conn_close
,
144 (void *) mk_conn_timeout
);
146 /* Nasty way to export task id */
/* NOTE(review): mk_sched_get_thread_conf() appears twice (original
 * lines 148 and 150); the lines between them are missing here —
 * presumably a wait/retry until mk_sched_register_thread() publishes
 * this thread's node. Confirm against the full source. */
148 thinfo
= mk_sched_get_thread_conf();
150 thinfo
= mk_sched_get_thread_conf();
153 /* Glibc doesn't export to user space the gettid() syscall */
154 thinfo
->pid
= syscall(__NR_gettid
);
/* Publish this thread's epoll fd via TLS, then start the event loop */
156 mk_sched_set_thread_poll(thconf
->epoll_fd
);
157 mk_epoll_init(thconf
->epoll_fd
, handler
, thconf
->epoll_max_events
);
162 struct request_idx
*mk_sched_get_request_index()
164 return pthread_getspecific(request_index
);
167 void mk_sched_set_request_index(struct request_idx
*ri
)
169 pthread_setspecific(request_index
, (void *) ri
);
172 void mk_sched_set_thread_poll(int epoll
)
174 pthread_setspecific(epoll_fd
, (void *) (size_t) epoll
);
177 int mk_sched_get_thread_poll()
179 return (size_t) pthread_getspecific(epoll_fd
);
/*
 * mk_sched_get_thread_conf(): return the scheduler node whose tid matches
 * the calling thread.
 *
 * NOTE(review): incomplete listing — the declaration of `current`, the
 * initialization/iteration of `node` over the global scheduler list, and
 * the return statements are not visible in this chunk.
 */
182 struct sched_list_node
*mk_sched_get_thread_conf()
184 struct sched_list_node
*node
;
/* Identify the calling thread, then match it against each node's tid */
187 current
= pthread_self();
190 if (pthread_equal(node
->tid
, current
) != 0) {
/*
 * mk_sched_add_client(): place the accepted socket `remote_fd` into the
 * first available slot of `sched`'s connection queue, resolve its peer
 * IPv4 address, run the plugin stage hook, then register the fd with the
 * worker's epoll instance in read mode.
 *
 * NOTE(review): incomplete listing — declarations of `i`/`ret`, one
 * argument of mk_plugin_stage_run() (original line 214), closing braces
 * and the final return are not visible here.
 */
199 int mk_sched_add_client(struct sched_list_node
*sched
, int remote_fd
)
/* Linear scan for a free slot in the fixed-capacity queue */
203 for (i
= 0; i
< config
->worker_capacity
; i
++) {
204 if (sched
->queue
[i
].status
== MK_SCHEDULER_CONN_AVAILABLE
) {
206 MK_TRACE("[FD %i] Add", remote_fd
);
/* Fill the slot's pre-allocated IPv4 buffer with the peer address */
209 mk_socket_get_ip(remote_fd
, sched
->queue
[i
].ipv4
.data
);
210 mk_pointer_set( &sched
->queue
[i
].ipv4
, sched
->queue
[i
].ipv4
.data
);
212 /* Before continuing, run the plugin stage hook.
 * NOTE(review): the original comment said "stage 20" but the code runs
 * MK_PLUGIN_STAGE_10 — confirm which stage is intended. */
213 ret
= mk_plugin_stage_run(MK_PLUGIN_STAGE_10
,
215 &sched
->queue
[i
], NULL
, NULL
);
217 /* Close connection, otherwise continue */
218 if (ret
== MK_PLUGIN_RET_CLOSE_CONX
) {
219 mk_conn_close(remote_fd
);
220 return MK_PLUGIN_RET_CLOSE_CONX
;
223 /* Socket and status */
224 sched
->queue
[i
].socket
= remote_fd
;
225 sched
->queue
[i
].status
= MK_SCHEDULER_CONN_PENDING
;
226 sched
->queue
[i
].arrive_time
= log_current_utime
;
/* Watch the fd for read events on this worker's epoll instance */
228 mk_epoll_add(sched
->epoll_fd
, remote_fd
, MK_EPOLL_READ
,
229 MK_EPOLL_BEHAVIOR_TRIGGERED
);
/*
 * mk_sched_remove_client(): look up the scheduler slot holding
 * `remote_fd`, run plugin stage 50, and release the slot back to the
 * available state.
 *
 * NOTE(review): incomplete listing — the NULL-check branch structure
 * around `sc`, the socket close (original lines near 247-248), closing
 * braces and return statements are not visible here.
 */
237 int mk_sched_remove_client(struct sched_list_node
*sched
, int remote_fd
)
239 struct sched_connection
*sc
;
241 sc
= mk_sched_get_connection(sched
, remote_fd
);
244 MK_TRACE("[FD %i] Scheduler remove", remote_fd
);
246 /* Close socket and change status */
249 /* Invoke plugins in stage 50 */
250 mk_plugin_stage_run(MK_PLUGIN_STAGE_50
, remote_fd
, NULL
, NULL
, NULL
);
252 /* Change node status */
253 sc
->status
= MK_SCHEDULER_CONN_AVAILABLE
;
/* NOTE(review): BUG — the format string below uses "%i" but passes no
 * matching argument; it should pass remote_fd. Fix in the full source. */
259 MK_TRACE("[FD %i] Not found");
/*
 * mk_sched_get_connection(): return the sched_connection slot in
 * `sched`'s queue whose socket equals `remote_fd`, or (per the trailing
 * trace) fall through when no slot matches.
 *
 * NOTE(review): incomplete listing — the assignment at original line 271
 * is presumably guarded by a missing `if (!sched)` (lines 267-270 are
 * not visible), and the declaration of `i`, closing braces and return
 * statements are missing from this view.
 */
265 struct sched_connection
*mk_sched_get_connection(struct sched_list_node
266 *sched
, int remote_fd
)
/* Fall back to the calling thread's own scheduler node */
271 sched
= mk_sched_get_thread_conf();
274 MK_TRACE("[FD %i] No scheduler information", remote_fd
);
/* Linear scan of the fixed-capacity queue for the matching socket */
281 for (i
= 0; i
< config
->worker_capacity
; i
++) {
282 if (sched
->queue
[i
].socket
== remote_fd
) {
283 return &sched
->queue
[i
];
288 MK_TRACE("\n [%i] not found , why?\n\n", remote_fd
);
/*
 * mk_sched_check_timeouts(): sweep `sched`'s queue for two kinds of
 * expired connections — (1) PENDING connections whose arrival time plus
 * config->timeout has passed, and (2) INCOMPLETE in-flight requests,
 * which use config->timeout on a first connection and
 * config->keep_alive_timeout on reused (keep-alive) connections.
 * Expired connections are removed from the scheduler (and, for requests,
 * closed and purged from the request list).
 *
 * NOTE(review): incomplete listing — loop/brace structure, the `while`
 * driving the req_cl walk, the argument of the second MK_TRACE (original
 * lines 333-334), and the final return are not visible in this chunk.
 */
294 int mk_sched_check_timeouts(struct sched_list_node
*sched
)
296 int i
, client_timeout
;
297 struct request_idx
*req_idx
;
298 struct client_request
*req_cl
;
300 /* PENDING CONN TIMEOUT */
301 for (i
= 0; i
< config
->worker_capacity
; i
++) {
302 if (sched
->queue
[i
].status
== MK_SCHEDULER_CONN_PENDING
) {
/* Deadline = arrival time + configured timeout (seconds) */
303 client_timeout
= sched
->queue
[i
].arrive_time
+ config
->timeout
;
306 if (client_timeout
<= log_current_utime
) {
308 MK_TRACE("Scheduler, closing fd %i due TIMEOUT",
309 sched
->queue
[i
].socket
);
311 mk_sched_remove_client(sched
, sched
->queue
[i
].socket
);
316 /* PROCESSING CONN TIMEOUT */
/* Walk this thread's request list starting from its head */
317 req_idx
= mk_sched_get_request_index();
318 req_cl
= req_idx
->first
;
321 if (req_cl
->status
== MK_REQUEST_STATUS_INCOMPLETE
) {
/* First connection: plain timeout; reused keep-alive connection:
 * keep-alive timeout */
322 if (req_cl
->counter_connections
== 0) {
323 client_timeout
= req_cl
->init_time
+ config
->timeout
;
326 client_timeout
= req_cl
->init_time
+ config
->keep_alive_timeout
;
330 if (client_timeout
<= log_current_utime
) {
332 MK_TRACE("Scheduler, closing fd %i due to timeout (incomplete)",
/* Close the socket, drop it from the scheduler queue, and remove the
 * request entry */
335 close(req_cl
->socket
);
336 mk_sched_remove_client(sched
, req_cl
->socket
);
337 mk_request_client_remove(req_cl
->socket
);
340 req_cl
= req_cl
->next
;
/*
 * mk_sched_update_conn_status(): set the status field of the queue slot
 * in `sched` whose socket equals `remote_fd`.
 *
 * NOTE(review): incomplete listing — the declaration of `i`, closing
 * braces and the return are past the end of this view. Also suspicious:
 * this loop bounds on config->workers while every other queue scan in
 * this file uses config->worker_capacity (the queue's allocated size) —
 * likely a bug; confirm against the full source.
 */
346 int mk_sched_update_conn_status(struct sched_list_node
*sched
,
347 int remote_fd
, int status
)
355 for (i
= 0; i
< config
->workers
; i
++) {
356 if (sched
->queue
[i
].socket
== remote_fd
) {
357 sched
->queue
[i
].status
= status
;