Removed trailing whitespaces and CRLFs
[getmangos.git] / src / game / WorldSocketMgr.cpp
bloba7558e354e6bc1976523cc153ffaea59bb14a529
1 /*
2 * Copyright (C) 2005-2008,2007 MaNGOS <http://getmangos.com/>
4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by
6 * the Free Software Foundation; either version 2 of the License, or
7 * (at your option) any later version.
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
14 * You should have received a copy of the GNU General Public License
15 * along with this program; if not, write to the Free Software
16 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19 /** \file WorldSocketMgr.cpp
20 * \ingroup u2w
21 * \author Derex <derex101@gmail.com>
24 #include "WorldSocketMgr.h"
26 #include <ace/ACE.h>
27 #include <ace/Log_Msg.h>
28 #include <ace/Reactor.h>
29 #include <ace/Reactor_Impl.h>
30 #include <ace/TP_Reactor.h>
31 #include <ace/Dev_Poll_Reactor.h>
32 #include <ace/Guard_T.h>
33 #include <ace/Atomic_Op.h>
34 #include <ace/os_include/arpa/os_inet.h>
35 #include <ace/os_include/netinet/os_tcp.h>
36 #include <ace/os_include/sys/os_types.h>
37 #include <ace/os_include/sys/os_socket.h>
39 #include <set>
41 #include "Log.h"
42 #include "Common.h"
43 #include "Config/ConfigEnv.h"
44 #include "Database/DatabaseEnv.h"
45 #include "WorldSocket.h"
47 /**
48 * This is a helper class to WorldSocketMgr ,that manages
49 * network threads, and assigning connections from acceptor thread
50 * to other network threads
52 class ReactorRunnable : protected ACE_Task_Base
54 public:
56 ReactorRunnable () :
57 m_ThreadId (-1),
58 m_Connections (0),
59 m_Reactor (0)
61 ACE_Reactor_Impl* imp = 0;
63 #if defined (ACE_HAS_EVENT_POLL) || defined (ACE_HAS_DEV_POLL)
65 imp = new ACE_Dev_Poll_Reactor ();
67 imp->max_notify_iterations (128);
68 imp->restart (1);
70 #else
72 imp = new ACE_TP_Reactor ();
73 imp->max_notify_iterations (128);
75 #endif
77 m_Reactor = new ACE_Reactor (imp, 1);
80 virtual ~ReactorRunnable ()
82 this->Stop ();
83 this->Wait ();
85 if (m_Reactor)
86 delete m_Reactor;
89 void Stop ()
91 m_Reactor->end_reactor_event_loop ();
94 int Start ()
96 if (m_ThreadId != -1)
97 return -1;
99 return (m_ThreadId = this->activate ());
102 void Wait () { ACE_Task_Base::wait (); }
104 long Connections ()
106 return static_cast<long> (m_Connections.value ());
109 int AddSocket (WorldSocket* sock)
111 ACE_GUARD_RETURN (ACE_Thread_Mutex, Guard, m_NewSockets_Lock, -1);
113 ++m_Connections;
114 sock->AddReference();
115 sock->reactor (m_Reactor);
116 m_NewSockets.insert (sock);
118 return 0;
121 ACE_Reactor* GetReactor ()
123 return m_Reactor;
126 protected:
128 void AddNewSockets ()
130 ACE_GUARD (ACE_Thread_Mutex, Guard, m_NewSockets_Lock);
132 if (m_NewSockets.empty ())
133 return;
135 for (SocketSet::iterator i = m_NewSockets.begin (); i != m_NewSockets.end (); ++i)
137 WorldSocket* sock = (*i);
139 if (sock->IsClosed ())
141 sock->RemoveReference ();
142 --m_Connections;
144 else
145 m_Sockets.insert (sock);
148 m_NewSockets.clear ();
151 virtual int svc ()
153 DEBUG_LOG ("Network Thread Starting");
155 WorldDatabase.ThreadStart ();
157 ACE_ASSERT (m_Reactor);
159 SocketSet::iterator i, t;
161 while (!m_Reactor->reactor_event_loop_done ())
163 // dont be too smart to move this outside the loop
164 // the run_reactor_event_loop will modify interval
165 ACE_Time_Value interval (0, 10000);
167 if (m_Reactor->run_reactor_event_loop (interval) == -1)
168 break;
170 AddNewSockets ();
172 for (i = m_Sockets.begin (); i != m_Sockets.end ();)
174 if ((*i)->Update () == -1)
176 t = i;
177 i++;
178 (*t)->CloseSocket ();
179 (*t)->RemoveReference ();
180 --m_Connections;
181 m_Sockets.erase (t);
183 else
184 i++;
188 WorldDatabase.ThreadEnd ();
190 DEBUG_LOG ("Network Thread Exitting");
192 return 0;
195 private:
196 typedef ACE_Atomic_Op<ACE_SYNCH_MUTEX, long> AtomicInt;
197 typedef std::set<WorldSocket*> SocketSet;
199 ACE_Reactor* m_Reactor;
200 AtomicInt m_Connections;
201 int m_ThreadId;
203 SocketSet m_Sockets;
205 SocketSet m_NewSockets;
206 ACE_Thread_Mutex m_NewSockets_Lock;
209 WorldSocketMgr::WorldSocketMgr () :
210 m_NetThreadsCount (0),
211 m_NetThreads (0),
212 m_SockOutKBuff (-1),
213 m_SockOutUBuff (65536),
214 m_UseNoDelay (true),
215 m_Acceptor (0)
219 WorldSocketMgr::~WorldSocketMgr ()
221 if (m_NetThreads)
222 delete [] m_NetThreads;
224 if(m_Acceptor)
225 delete m_Acceptor;
229 WorldSocketMgr::StartReactiveIO (ACE_UINT16 port, const char* address)
231 m_UseNoDelay = sConfig.GetBoolDefault ("Network.TcpNodelay", true);
233 int num_threads = sConfig.GetIntDefault ("Network.Threads", 1);
235 if (num_threads <= 0)
237 sLog.outError ("Network.Threads is wrong in your config file");
238 return -1;
241 m_NetThreadsCount = static_cast<size_t> (num_threads + 1);
243 m_NetThreads = new ReactorRunnable[m_NetThreadsCount];
245 sLog.outBasic ("Max alowed socket connections %d",ACE::max_handles ());
247 // -1 means use default
248 m_SockOutKBuff = sConfig.GetIntDefault ("Network.OutKBuff", -1);
250 m_SockOutUBuff = sConfig.GetIntDefault ("Network.OutUBuff", 65536);
252 if ( m_SockOutUBuff <= 0 )
254 sLog.outError ("Network.OutUBuff is wrong in your config file");
255 return -1;
258 WorldSocket::Acceptor *acc = new WorldSocket::Acceptor;
259 m_Acceptor = acc;
261 ACE_INET_Addr listen_addr (port, address);
263 if (acc->open (listen_addr, m_NetThreads[0].GetReactor (), ACE_NONBLOCK) == -1)
265 sLog.outError ("Failed to open acceptor ,check if the port is free");
266 return -1;
269 for (size_t i = 0; i < m_NetThreadsCount; ++i)
270 m_NetThreads[i].Start ();
272 return 0;
276 WorldSocketMgr::StartNetwork (ACE_UINT16 port, const char* address)
278 if (!sLog.IsOutDebug ())
279 ACE_Log_Msg::instance ()->priority_mask (LM_ERROR, ACE_Log_Msg::PROCESS);
281 if (this->StartReactiveIO (port, address) == -1)
282 return -1;
284 return 0;
287 void
288 WorldSocketMgr::StopNetwork ()
290 if (m_Acceptor)
292 WorldSocket::Acceptor* acc = dynamic_cast<WorldSocket::Acceptor*> (m_Acceptor);
294 if (acc)
295 acc->close ();
298 if (m_NetThreadsCount != 0)
300 for (size_t i = 0; i < m_NetThreadsCount; ++i)
301 m_NetThreads[i].Stop ();
304 this->Wait ();
307 void
308 WorldSocketMgr::Wait ()
310 if (m_NetThreadsCount != 0)
312 for (size_t i = 0; i < m_NetThreadsCount; ++i)
313 m_NetThreads[i].Wait ();
318 WorldSocketMgr::OnSocketOpen (WorldSocket* sock)
320 // set some options here
321 if (m_SockOutKBuff >= 0)
323 if (sock->peer ().set_option (SOL_SOCKET,
324 SO_SNDBUF,
325 (void*) & m_SockOutKBuff,
326 sizeof (int)) == -1 && errno != ENOTSUP)
328 sLog.outError ("WorldSocketMgr::OnSocketOpen set_option SO_SNDBUF");
329 return -1;
333 static const int ndoption = 1;
335 // Set TCP_NODELAY.
336 if (m_UseNoDelay)
338 if (sock->peer ().set_option (ACE_IPPROTO_TCP,
339 TCP_NODELAY,
340 (void*)&ndoption,
341 sizeof (int)) == -1)
343 sLog.outError ("WorldSocketMgr::OnSocketOpen: peer ().set_option TCP_NODELAY errno = %s", ACE_OS::strerror (errno));
344 return -1;
348 sock->m_OutBufferSize = static_cast<size_t> (m_SockOutUBuff);
350 // we skip the Acceptor Thread
351 size_t min = 1;
353 ACE_ASSERT (m_NetThreadsCount >= 1);
355 for (size_t i = 1; i < m_NetThreadsCount; ++i)
356 if (m_NetThreads[i].Connections () < m_NetThreads[min].Connections ())
357 min = i;
359 return m_NetThreads[min].AddSocket (sock);
362 WorldSocketMgr*
363 WorldSocketMgr::Instance ()
365 return ACE_Singleton<WorldSocketMgr,ACE_Thread_Mutex>::instance();