4 * The secure anycast tunneling protocol (satp) defines a protocol used
5 * for communication between any combination of unicast and anycast
6 * tunnel endpoints. It has less protocol overhead than IPSec in Tunnel
7 * mode and allows tunneling of every ETHER TYPE protocol (e.g.
8 * ethernet, ip, arp ...). satp directly includes cryptography and
9 * message authentication based on the methodes used by SRTP. It is
10 * intended to deliver a generic, scaleable and secure solution for
11 * tunneling and relaying of packets of any protocol.
14 * Copyright (C) 2007-2008 Othmar Gsenger, Erwin Nindl,
15 * Christian Pointner <satp@wirdorange.org>
17 * This file is part of Anytun.
19 * Anytun is free software: you can redistribute it and/or modify
20 * it under the terms of the GNU General Public License version 3 as
21 * published by the Free Software Foundation.
23 * Anytun is distributed in the hope that it will be useful,
24 * but WITHOUT ANY WARRANTY; without even the implied warranty of
25 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
26 * GNU General Public License for more details.
28 * You should have received a copy of the GNU General Public License
29 * along with anytun. If not, see <http://www.gnu.org/licenses/>.
34 #include <boost/asio.hpp>
40 #include "../datatypes.h"
43 #include "../signalController.h"
44 #include "../buffer.h"
45 #include "connectionList.h"
46 #include "rtpSessionTable.h"
47 #include "syncRtpCommand.h"
48 #include "../syncQueue.h"
49 #include "../syncClient.h"
50 #include "syncOnConnect.hpp"
52 #include "../threadUtils.hpp"
54 #include "commandHandler.h"
55 #include "callIdQueue.h"
58 #include "portWindow.h"
62 #define MAX_PACKET_SIZE 1500
64 void listener(RtpSession::proto::socket
* sock1
, RtpSession::proto::socket
* sock2
, std::string call_id
, int dir
, SyncQueue
* queue
, bool* running
)
66 cLog
.msg(Log::PRIO_NOTICE
) << "listener(" << call_id
<< "/" << dir
<< ") started";
70 Buffer
buf(u_int32_t(MAX_PACKET_SIZE
));
71 RtpSession::proto::endpoint remote_end
;
74 buf
.setLength(MAX_PACKET_SIZE
);
77 len
= 0;//sock1->recvFromNonBlocking(buf.getBuf(), buf.getLength(), remote_end, 1000);
79 len
= 0; //sock2->recvFromNonBlocking(buf.getBuf(), buf.getLength(), remote_end, 1000);
82 RtpSession
& session
= gRtpSessionTable
.getSession(call_id
);
83 if(session
.isDead()) {
84 cLog
.msg(Log::PRIO_NOTICE
) << "listener(" << call_id
<< "/" << dir
<< ") session is dead, exiting";
92 if((dir
== 1 && remote_end
!= session
.getRemoteEnd1()) ||
93 (dir
== 2 && remote_end
!= session
.getRemoteEnd2()))
96 (!gOpt
.getNoNatOnce() && ((dir
== 1 && !session
.getSeen1()) ||
97 (dir
== 2 && !session
.getSeen2()))))
99 cLog
.msg(Log::PRIO_NOTICE
) << "listener(" << call_id
<< "/" << dir
<< ") setting remote host to "
102 session
.setRemoteEnd1(remote_end
);
104 session
.setRemoteEnd2(remote_end
);
106 if(!gOpt
.getNat()) { // with nat enabled sync is not needed
107 SyncRtpCommand
sc(call_id
);
118 sock2
->send_to(boost::asio::buffer(buf
.getBuf(), buf
.getLength()), session
.getRemoteEnd2());
120 sock1
->send_to(boost::asio::buffer(buf
.getBuf(), buf
.getLength()), session
.getRemoteEnd1());
124 catch(std::exception
&e
)
126 cLog
.msg(Log::PRIO_ERR
) << "listener(" << call_id
<< "/" << dir
<< ") exiting because: " << e
.what();
129 gCallIdQueue
.push(call_id
);
135 ListenerData() : sock1_(ios1_
), sock2_(ios2_
) {}
137 boost::asio::io_service ios1_
;
138 boost::asio::io_service ios2_
;
139 RtpSession::proto::socket sock1_
;
140 RtpSession::proto::socket sock2_
;
141 boost::thread
* thread1_
;
142 boost::thread
* thread2_
;
147 void listenerManager(void* p
)
149 SyncQueue
* queue_
= reinterpret_cast<SyncQueue
*>(p
);
151 std::map
<std::string
, ListenerData
*> listenerMap
;
156 std::string call_id
= gCallIdQueue
.front(); // waits for semaphor and returns next call_id
159 RtpSession
& session
= gRtpSessionTable
.getSession(call_id
);
160 if(!session
.isComplete())
163 std::map
<std::string
, ListenerData
*>::iterator it
;
164 it
= listenerMap
.find(call_id
);
165 if(it
== listenerMap
.end()) // listener Threads not existing yet
167 ListenerData
* ld
= new ListenerData();
169 ld
->sock1_
.open(session
.getLocalEnd1().protocol());
170 ld
->sock1_
.bind(session
.getLocalEnd1());
172 ld
->sock2_
.open(session
.getLocalEnd2().protocol());
173 ld
->sock2_
.bind(session
.getLocalEnd2());
175 ld
->thread1_
= new boost::thread(boost::bind(listener
, &(ld
->sock1_
), &(ld
->sock2_
), call_id
, 1, queue_
, &(ld
->running1_
)));
176 ld
->thread2_
= new boost::thread(boost::bind(listener
, &(ld
->sock1_
), &(ld
->sock2_
), call_id
, 2, queue_
, &(ld
->running2_
)));
178 std::pair
<std::map
<std::string
, ListenerData
*>::iterator
, bool> ret
;
179 ret
= listenerMap
.insert(std::map
<std::string
, ListenerData
*>::value_type(call_id
, ld
));
183 if(!it
->second
->running1_
&& !it
->second
->running2_
)
185 cLog
.msg(Log::PRIO_NOTICE
) << "listenerManager both threads for '" << call_id
<< "' exited, cleaning up";
186 if(it
->second
->thread1_
) {
187 it
->second
->thread1_
->join();
188 delete it
->second
->thread1_
;
190 if(it
->second
->thread2_
) {
191 it
->second
->thread2_
->join();
192 delete it
->second
->thread2_
;
195 listenerMap
.erase(it
);
196 gRtpSessionTable
.delSession(call_id
);
199 // TODO: reinit if session changed
201 catch(std::exception
&e
)
203 cLog
.msg(Log::PRIO_ERR
) << "listenerManager restarting after exception: " << e
.what();
204 usleep(500); // in case of an hard error don't block cpu (this is ugly)
207 cLog
.msg(Log::PRIO_ERR
) << "listenerManager exiting because of unknown reason";
210 void chrootAndDrop(string
const& chrootdir
, string
const& username
)
214 std::cerr
<< "this programm has to be run as root in order to run in a chroot" << std::endl
;
218 struct passwd
*pw
= getpwnam(username
.c_str());
220 if(chroot(chrootdir
.c_str()))
222 std::cerr
<< "can't chroot to " << chrootdir
<< std::endl
;
225 std::cout
<< "we are in chroot jail (" << chrootdir
<< ") now" << std::endl
;
227 if (initgroups(pw
->pw_name
, pw
->pw_gid
) || setgid(pw
->pw_gid
) || setuid(pw
->pw_uid
))
229 std::cerr
<< "can't drop to user " << username
<< " " << pw
->pw_uid
<< ":" << pw
->pw_gid
<< std::endl
;
232 std::cout
<< "dropped user to " << username
<< " " << pw
->pw_uid
<< ":" << pw
->pw_gid
<< std::endl
;
236 std::cerr
<< "unknown user " << username
<< std::endl
;
251 // std::cout << "running in background now..." << std::endl;
254 // for (fd=getdtablesize();fd>=0;--fd) // close all file descriptors
255 for (fd
=0;fd
<=2;fd
++) // close all file descriptors
257 fd
=open("/dev/null",O_RDWR
); // stdin
266 ThreadParam(SyncQueue
& queue_
,OptionConnectTo
& connto_
)
267 : queue(queue_
),connto(connto_
)
270 OptionConnectTo
& connto
;
273 void syncConnector(void* p
)
275 ThreadParam
* param
= reinterpret_cast<ThreadParam
*>(p
);
277 SyncClient
sc ( param
->connto
.host
, param
->connto
.port
);
281 void syncListener(SyncQueue
* queue
)
285 boost::asio::io_service io_service
;
286 SyncTcpConnection::proto::resolver
resolver(io_service
);
287 SyncTcpConnection::proto::endpoint e
;
288 if(gOpt
.getLocalSyncAddr()!="")
290 SyncTcpConnection::proto::resolver::query
query(gOpt
.getLocalSyncAddr(), gOpt
.getLocalSyncPort());
291 e
= *resolver
.resolve(query
);
293 SyncTcpConnection::proto::resolver::query
query(gOpt
.getLocalSyncPort());
294 e
= *resolver
.resolve(query
);
298 SyncServer
server(io_service
,e
);
299 server
.onConnect
=boost::bind(syncOnConnect
,_1
);
300 queue
->setSyncServerPtr(&server
);
303 catch (std::exception
& e
)
305 std::string addr
= gOpt
.getLocalSyncAddr() == "" ? "*" : gOpt
.getLocalSyncAddr();
306 cLog
.msg(Log::PRIO_ERR
) << "sync: cannot bind to " << addr
<< ":" << gOpt
.getLocalSyncPort()
307 << " (" << e
.what() << ")" << std::endl
;
312 int main(int argc
, char* argv
[])
314 // std::cout << "anyrtpproxy" << std::endl;
315 if(!gOpt
.parse(argc
, argv
))
321 cLog
.setLogName("anyrtpproxy");
322 cLog
.msg(Log::PRIO_NOTICE
) << "anyrtpproxy started...";
324 std::ofstream pidFile
;
325 if(gOpt
.getPidFile() != "") {
326 pidFile
.open(gOpt
.getPidFile().c_str());
327 if(!pidFile
.is_open()) {
328 std::cout
<< "can't open pid file" << std::endl
;
333 chrootAndDrop(gOpt
.getChrootDir(), gOpt
.getUsername());
334 if(gOpt
.getDaemonize())
337 if(pidFile
.is_open()) {
338 pid_t pid
= getpid();
343 SignalController sig
;
349 boost::thread
listenerManagerThread(boost::bind(listenerManager
,&queue
));
352 // #ifndef ANYTUN_NOSYNC
353 // boost::thread * syncListenerThread;
354 // if(gOpt.getLocalSyncPort() != "")
355 // syncListenerThread = new boost::thread(boost::bind(syncListener,&queue));
357 // std::list<boost::thread *> connectThreads;
358 // for(ConnectToList::iterator it = connect_to.begin() ;it != connect_to.end(); ++it) {
359 // ThreadParam * point = new ThreadParam(dev, *src, cl, queue,*it);
360 // connectThreads.push_back(new boost::thread(boost::bind(syncConnector,point)));
366 // pthread_t syncListenerThread;
368 // ConnectToList connect_to = gOpt.getConnectTo();
369 // ThreadParam p( queue,*(new OptionConnectTo()));
370 // if ( gOpt.getLocalSyncPort())
371 // pthread_create(&syncListenerThread, NULL, syncListener, &p);
373 // std::list<pthread_t> connectThreads;
374 // for(ConnectToList::iterator it = connect_to.begin() ;it != connect_to.end(); ++it)
376 // connectThreads.push_back(pthread_t());
377 // ThreadParam * point = new ThreadParam(queue,*it);
378 // pthread_create(& connectThreads.back(), NULL, syncConnector, point);
381 PortWindow
port_window(gOpt
.getRtpStartPort(),gOpt
.getRtpEndPort());
382 CommandHandler
cmd(queue
, gOpt
.getControlInterface().addr_
, gOpt
.getControlInterface().port_
,port_window
);