General code output cleanup
[fmail.git] / src / threadpool.cpp
blob497a248ea52b63b754a6bbf03debbabd730a0985
1 /*
2 libfmail: Threaded Load handler with Thread Pool
4 Copyright (C) 2007 Carlos Daniel Ruvalcaba Valenzuela <clsdaniel@gmail.com>
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 2 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License along
17 with this program; if not, write to the Free Software Foundation, Inc.,
18 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21 #include <libfmail.h>
23 /* Worker Thread, has a sempahore for signaling */
24 class ThreadLoadHandler : public Thread, public Semaphore{
25 private:
26 Socket *s;
27 ProtocolHandler *ph;
28 public:
29 int onRun;
30 ThreadLoadHandler(ProtocolHandler *pht) : Thread(), Semaphore(){
31 s = NULL;
32 ph = pht;
33 onRun = 1;
36 /* Set client socket */
37 void setSocket(Socket *sock){
38 s = sock;
41 /* Deprecated */
42 int isReady(){
43 return 0;
46 /* Thread code */
47 int Run(){
48 while (onRun){
49 /* Wait for a job, decrement semaphore */
50 Wait();
52 if (s){
53 /* Run the handler */
54 ph->Handle(s);
57 s = NULL;
59 /* Decrement the semaphore */
60 Wait();
62 return 0;
66 ThreadLoad::ThreadLoad(ProtocolHandler *ph, int tpoolSize){
67 int i;
68 poolSize = tpoolSize;
69 ThreadLoadHandler **tpool;
71 /* Allocate an array for the worker threads */
72 pool = (void**)malloc(sizeof(ThreadLoadHandler*) * poolSize);
73 tpool = (ThreadLoadHandler**)pool;
75 /* Create and start worker threads, all should just wait for job */
76 for (i = 0; i < poolSize; i++){
77 tpool[i] = new ThreadLoadHandler(ph);
78 tpool[i]->Start();
82 ThreadLoad::~ThreadLoad(){
83 int unclean, i;
84 ThreadLoadHandler **tpool;
86 tpool = (ThreadLoadHandler**)pool;
88 unclean = 1;
89 while(unclean){
90 unclean = 0;
91 for(i = 0; i < poolSize; i++){
92 if (tpool[i]->onRun){
93 if (tpool[i]->isReady()){
94 tpool[i]->onRun = 0;
95 tpool[i]->Post();
96 }else{
97 unclean = 1;
103 for(i = 0; i < poolSize; i++){
104 delete tpool[i];
106 free(pool);
107 pool = NULL;
110 /* Boss thread handling code */
111 int ThreadLoad::Dispatch(Socket *sock, ProtocolHandler *ph){
112 int i, dispatched;
113 ThreadLoadHandler **tpool;
115 (void)ph;
116 tpool = (ThreadLoadHandler**)pool;
118 dispatched = 0;
119 i = 0;
121 while (dispatched == 0){
122 /* Check semaphore value, if 0 the thread is waiting for work */
123 if (tpool[i]->getValue() == 0){
124 /* Handle the socket to the thread (work) */
125 tpool[i]->setSocket(sock);
127 /* Increment twice the semaphore, this to prevent
128 * handling work to a thread already doing something but
129 * with semaphore value of 0 */
130 tpool[i]->Post();
131 tpool[i]->Post();
132 dispatched = 1;
134 i++;
135 /* Wrap i around pool size */
136 if (i == poolSize)
137 i = 0;
139 return 0;