2 * Copyright (C) 2005-2009 MaNGOS <http://getmangos.com/>
4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by
6 * the Free Software Foundation; either version 2 of the License, or
7 * (at your option) any later version.
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
14 * You should have received a copy of the GNU General Public License
15 * along with this program; if not, write to the Free Software
16 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19 /** \file WorldSocketMgr.cpp
21 * \author Derex <derex101@gmail.com>
24 #include "WorldSocketMgr.h"
27 #include <ace/Log_Msg.h>
28 #include <ace/Reactor.h>
29 #include <ace/Reactor_Impl.h>
30 #include <ace/TP_Reactor.h>
31 #include <ace/Dev_Poll_Reactor.h>
32 #include <ace/Guard_T.h>
33 #include <ace/Atomic_Op.h>
34 #include <ace/os_include/arpa/os_inet.h>
35 #include <ace/os_include/netinet/os_tcp.h>
36 #include <ace/os_include/sys/os_types.h>
37 #include <ace/os_include/sys/os_socket.h>
43 #include "Config/ConfigEnv.h"
44 #include "Database/DatabaseEnv.h"
45 #include "WorldSocket.h"
48 * This is a helper class for WorldSocketMgr that manages the
49 * network threads and assigns connections from the acceptor thread
50 * to the other network threads.
// Helper for WorldSocketMgr: wraps one ACE_Reactor (m_Reactor) behind an
// ACE_Task_Base and tracks the sockets assigned to it in m_NewSockets/m_Sockets.
// NOTE(review): this listing dropped brace-only lines, access specifiers and
// several signatures/statements, so the comments below describe only the
// statements that are actually visible.
52 class ReactorRunnable
: protected ACE_Task_Base
// Reactor implementation to wrap; starts out null and is assigned below.
61 ACE_Reactor_Impl
* imp
= 0;
// Use the dev-poll reactor when ACE reports event-poll or /dev/poll support.
63 #if defined (ACE_HAS_EVENT_POLL) || defined (ACE_HAS_DEV_POLL)
65 imp
= new ACE_Dev_Poll_Reactor ();
67 imp
->max_notify_iterations (128);
// Thread-pool reactor variant -- presumably the #else branch (the directive
// itself is not visible in this listing).
72 imp
= new ACE_TP_Reactor ();
73 imp
->max_notify_iterations (128);
// Wrap the chosen implementation in the ACE_Reactor kept in m_Reactor.
// NOTE(review): the second argument (1) looks like ACE_Reactor's
// delete-implementation flag -- confirm against the ACE documentation.
77 m_Reactor
= new ACE_Reactor (imp
, 1);
// Destructor; only its signature is visible here.
80 virtual ~ReactorRunnable ()
// Asks the reactor to end its event loop (enclosing signature not visible;
// WorldSocketMgr::StopNetwork calls Stop () on every runnable).
91 m_Reactor
->end_reactor_event_loop ();
// Activates the task; activate ()'s result is stored in m_ThreadId and returned.
99 return (m_ThreadId
= activate ());
// Waits on the underlying ACE_Task_Base.
102 void Wait () { ACE_Task_Base::wait (); }
// Returns the current value of the m_Connections counter as a long.
106 return static_cast<long> (m_Connections
.value ());
// Queues a socket for this runnable: takes m_NewSockets_Lock (the macro
// returns -1 from this function if the guard cannot be acquired), adds a
// reference to the socket, points the socket at this runnable's reactor and
// inserts it into m_NewSockets.
109 int AddSocket (WorldSocket
* sock
)
111 ACE_GUARD_RETURN (ACE_Thread_Mutex
, Guard
, m_NewSockets_Lock
, -1);
114 sock
->AddReference();
115 sock
->reactor (m_Reactor
);
116 m_NewSockets
.insert (sock
);
// Accessor for the reactor; its body is not visible in this listing.
121 ACE_Reactor
* GetReactor ()
// Processes the pending set under m_NewSockets_Lock: sockets that are already
// closed have their reference removed, sockets are inserted into m_Sockets
// (presumably in an else branch; that keyword line is not visible), and the
// pending set is cleared at the end.
128 void AddNewSockets ()
130 ACE_GUARD (ACE_Thread_Mutex
, Guard
, m_NewSockets_Lock
);
// Check for an empty pending set (the guarded statement is not visible).
132 if (m_NewSockets
.empty ())
135 for (SocketSet::const_iterator i
= m_NewSockets
.begin (); i
!= m_NewSockets
.end (); ++i
)
137 WorldSocket
* sock
= (*i
);
139 if (sock
->IsClosed ())
141 sock
->RemoveReference ();
145 m_Sockets
.insert (sock
);
148 m_NewSockets
.clear ();
// Thread body fragment (its signature is not visible): brackets the work with
// WorldDatabase.ThreadStart ()/ThreadEnd (), runs the reactor event loop in
// short slices and updates the sockets held in m_Sockets.
153 DEBUG_LOG ("Network Thread Starting");
155 WorldDatabase
.ThreadStart ();
157 ACE_ASSERT (m_Reactor
);
159 SocketSet::iterator i
, t
;
// Loop until the reactor reports that its event loop is done.
161 while (!m_Reactor
->reactor_event_loop_done ())
163 // don't be too smart and move this outside the loop:
164 // run_reactor_event_loop will modify interval
165 ACE_Time_Value
interval (0, 10000);
// Test for a -1 result from the event loop (the statement it guards is not visible).
167 if (m_Reactor
->run_reactor_event_loop (interval
) == -1)
// Update every socket in m_Sockets; one whose Update () returns -1 is closed
// and has its reference removed. The loop header has no increment.
// NOTE(review): t is presumably assigned from i, and the iterator advanced /
// the entry erased, on lines that are not visible here -- confirm.
172 for (i
= m_Sockets
.begin (); i
!= m_Sockets
.end ();)
174 if ((*i
)->Update () == -1)
178 (*t
)->CloseSocket ();
179 (*t
)->RemoveReference ();
188 WorldDatabase
.ThreadEnd ();
190 DEBUG_LOG ("Network Thread Exitting");
// Atomic long counter type, used for m_Connections.
196 typedef ACE_Atomic_Op
<ACE_SYNCH_MUTEX
, long> AtomicInt
;
// Set of raw WorldSocket pointers.
197 typedef std::set
<WorldSocket
*> SocketSet
;
// Reactor created in the constructor fragment above.
199 ACE_Reactor
* m_Reactor
;
// Connection counter read by Connections ().
200 AtomicInt m_Connections
;
// Sockets handed over by AddSocket () that AddNewSockets () has not processed yet.
205 SocketSet m_NewSockets
;
// Mutex taken by AddSocket () and AddNewSockets () around m_NewSockets.
206 ACE_Thread_Mutex m_NewSockets_Lock
;
// Constructor. The visible initializers set m_NetThreadsCount to 0 and
// m_SockOutUBuff to 65536; any other initializers and the body are not
// visible in this listing.
209 WorldSocketMgr::WorldSocketMgr () :
210 m_NetThreadsCount (0),
213 m_SockOutUBuff (65536),
// Destructor. Releases the ReactorRunnable array that StartReactiveIO
// allocates with new[]; other statements, if any, are not visible here.
219 WorldSocketMgr::~WorldSocketMgr ()
222 delete [] m_NetThreads
;
// Reads the network settings from the configuration, allocates the
// ReactorRunnable array, opens the acceptor on address:port using the reactor
// of m_NetThreads[0], and starts every runnable.
// NOTE(review): the return type line, the statements following each error
// log, and the final return are not visible in this listing; StartNetwork
// compares this function's result with -1.
229 WorldSocketMgr::StartReactiveIO (ACE_UINT16 port
, const char* address
)
// "Network.TcpNodelay" defaults to true.
231 m_UseNoDelay
= sConfig
.GetBoolDefault ("Network.TcpNodelay", true);
// "Network.Threads" defaults to 1; non-positive values are reported as an error.
233 int num_threads
= sConfig
.GetIntDefault ("Network.Threads", 1);
235 if (num_threads
<= 0)
237 sLog
.outError ("Network.Threads is wrong in your config file");
// One runnable more than configured: index 0 supplies the reactor for the
// acceptor below, and OnSocketOpen distributes sockets starting at index 1.
241 m_NetThreadsCount
= static_cast<size_t> (num_threads
+ 1);
243 m_NetThreads
= new ReactorRunnable
[m_NetThreadsCount
];
245 sLog
.outBasic ("Max allowed socket connections %d",ACE::max_handles ());
247 // -1 means use default
248 m_SockOutKBuff
= sConfig
.GetIntDefault ("Network.OutKBuff", -1);
// "Network.OutUBuff" defaults to 65536; non-positive values are reported as an error.
250 m_SockOutUBuff
= sConfig
.GetIntDefault ("Network.OutUBuff", 65536);
252 if ( m_SockOutUBuff
<= 0 )
254 sLog
.outError ("Network.OutUBuff is wrong in your config file");
// NOTE(review): where acc is stored (StopNetwork reads m_Acceptor) is not
// visible in this listing.
258 WorldSocket::Acceptor
*acc
= new WorldSocket::Acceptor
;
261 ACE_INET_Addr
listen_addr (port
, address
);
// Open the acceptor with ACE_NONBLOCK on the first runnable's reactor; a -1 result is logged.
263 if (acc
->open (listen_addr
, m_NetThreads
[0].GetReactor (), ACE_NONBLOCK
) == -1)
265 sLog
.outError ("Failed to open acceptor ,check if the port is free");
// Start every runnable, including index 0.
269 for (size_t i
= 0; i
< m_NetThreadsCount
; ++i
)
270 m_NetThreads
[i
].Start ();
// Entry point for starting the network layer on address:port.
// NOTE(review): the return type and the statements that follow the -1 check
// are not visible in this listing.
276 WorldSocketMgr::StartNetwork (ACE_UINT16 port
, const char* address
)
// Unless debug output is enabled, set ACE's process-wide log priority mask to LM_ERROR.
278 if (!sLog
.IsOutDebug ())
279 ACE_Log_Msg::instance ()->priority_mask (LM_ERROR
, ACE_Log_Msg::PROCESS
);
// Delegate to StartReactiveIO and test its result for -1.
281 if (StartReactiveIO (port
, address
) == -1)
// Stops the network layer: casts m_Acceptor to WorldSocket::Acceptor and
// calls Stop () on every network thread.
// NOTE(review): what is done with acc after the cast is not visible in this
// listing.
288 WorldSocketMgr::StopNetwork ()
292 WorldSocket::Acceptor
* acc
= dynamic_cast<WorldSocket::Acceptor
*> (m_Acceptor
);
// Skip the loop when no threads were created (count is 0 until StartReactiveIO sets it).
298 if (m_NetThreadsCount
!= 0)
300 for (size_t i
= 0; i
< m_NetThreadsCount
; ++i
)
301 m_NetThreads
[i
].Stop ();
// Calls Wait () on every network thread; does nothing when no threads exist.
308 WorldSocketMgr::Wait ()
310 if (m_NetThreadsCount
!= 0)
312 for (size_t i
= 0; i
< m_NetThreadsCount
; ++i
)
313 m_NetThreads
[i
].Wait ();
// Configures a newly opened socket and assigns it to a network thread:
// applies the configured socket options, copies m_SockOutUBuff into the
// socket's m_OutBufferSize, then passes the socket to AddSocket () of the
// runnable selected by comparing Connections () counts. Returns AddSocket ()'s
// result.
// NOTE(review): the return type, the option-name arguments of both
// set_option calls, and the declaration/update of min are not visible in
// this listing.
318 WorldSocketMgr::OnSocketOpen (WorldSocket
* sock
)
320 // set some options here
// Only touch the kernel send buffer when a non-negative size was configured
// (StartReactiveIO documents -1 as "use default").
321 if (m_SockOutKBuff
>= 0)
// A failure is logged unless errno is ENOTSUP.
323 if (sock
->peer ().set_option (SOL_SOCKET
,
325 (void*) & m_SockOutKBuff
,
326 sizeof (int)) == -1 && errno
!= ENOTSUP
)
328 sLog
.outError ("WorldSocketMgr::OnSocketOpen set_option SO_SNDBUF");
// Option value 1 for the TCP-level option set below.
// NOTE(review): presumably guarded by m_UseNoDelay; the condition is not visible.
333 static const int ndoption
= 1;
338 if (sock
->peer ().set_option (ACE_IPPROTO_TCP
,
343 sLog
.outError ("WorldSocketMgr::OnSocketOpen: peer ().set_option TCP_NODELAY errno = %s", ACE_OS::strerror (errno
));
// User-space output buffer size read from "Network.OutUBuff".
348 sock
->m_OutBufferSize
= static_cast<size_t> (m_SockOutUBuff
);
350 // we skip the Acceptor Thread
// NOTE(review): the scan below starts at index 1; if min also starts at 1,
// this assert (>= 1) does not guarantee that m_NetThreads[1] exists -- it
// would need >= 2. Confirm against the non-visible declaration of min.
353 ACE_ASSERT (m_NetThreadsCount
>= 1);
// Compare each runnable from index 1 upward against the current candidate.
355 for (size_t i
= 1; i
< m_NetThreadsCount
; ++i
)
356 if (m_NetThreads
[i
].Connections () < m_NetThreads
[min
].Connections ())
359 return m_NetThreads
[min
].AddSocket (sock
);
// Returns the process-wide WorldSocketMgr obtained from
// ACE_Singleton<WorldSocketMgr, ACE_Thread_Mutex>::instance ().
// NOTE(review): the return type line is not visible in this listing.
363 WorldSocketMgr::Instance ()
365 return ACE_Singleton
<WorldSocketMgr
,ACE_Thread_Mutex
>::instance();