4 * The secure anycast tunneling protocol (satp) defines a protocol used
5 * for communication between any combination of unicast and anycast
6 * tunnel endpoints. It has less protocol overhead than IPSec in Tunnel
7 * mode and allows tunneling of every ETHER TYPE protocol (e.g.
8 * ethernet, ip, arp ...). satp directly includes cryptography and
9 * message authentication based on the methods used by SRTP. It is
10 * intended to deliver a generic, scalable and secure solution for
11 * tunneling and relaying of packets of any protocol.
14 * Copyright (C) 2007-2008 Othmar Gsenger, Erwin Nindl,
15 * Christian Pointner <satp@wirdorange.org>
17 * This file is part of Anytun.
19 * Anytun is free software: you can redistribute it and/or modify
20 * it under the terms of the GNU General Public License version 3 as
21 * published by the Free Software Foundation.
23 * Anytun is distributed in the hope that it will be useful,
24 * but WITHOUT ANY WARRANTY; without even the implied warranty of
25 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
26 * GNU General Public License for more details.
28 * You should have received a copy of the GNU General Public License
29 * along with anytun. If not, see <http://www.gnu.org/licenses/>.
34 #include <boost/asio.hpp>
40 #include "../datatypes.h"
43 #include "../signalController.h"
44 #include "../buffer.h"
45 #include "connectionList.h"
46 #include "../rtpSessionTable.h"
47 #include "../syncCommand.h"
48 #include "../syncQueue.h"
49 #include "../syncClient.h"
50 //#include "../syncOnConnect.h"
52 #include "../threadUtils.hpp"
54 #include "commandHandler.h"
55 #include "callIdQueue.h"
58 #include "portWindow.h"
62 #define MAX_PACKET_SIZE 1500
// Thread routine serving one direction (dir) of the RTP session identified by
// call_id.
//
// Visible behaviour:
//   - logs a start notice, then works with a Buffer of MAX_PACKET_SIZE bytes;
//   - looks the session up in gRtpSessionTable and logs/leaves when it is dead;
//   - may update the session's remote endpoint from the packet sender and, when
//     NAT handling is off, builds a SyncCommand for the call;
//   - forwards the buffer with send_to() on sock2 -> getRemoteEnd2() or on
//     sock1 -> getRemoteEnd1();
//   - logs any std::exception and pushes call_id onto gCallIdQueue.
//
// Parameters:
//   sock1, sock2 - sockets of the two call legs (not owned by this function)
//   call_id      - key of the session in gRtpSessionTable
//   dir          - 1 or 2; selects which leg this thread handles
//   queue        - sync queue; its use is not visible in this excerpt
//   running      - flag shared with listenerManager; presumably set true on
//                  entry and false on exit - TODO confirm
//
// NOTE(review): the control-flow lines (loop, try, if/else on dir) are missing
// from this excerpt; the per-line notes below only describe what is visible.
64 void listener(RtpSession::proto::socket
* sock1
, RtpSession::proto::socket
* sock2
, std::string call_id
, int dir
, SyncQueue
* queue
, bool* running
)
66 cLog
.msg(Log::PRIO_NOTICE
) << "listener(" << call_id
<< "/" << dir
<< ") started";
// Receive buffer sized for one packet, plus the endpoint the packet came from.
70 Buffer
buf(u_int32_t(MAX_PACKET_SIZE
));
71 RtpSession::proto::endpoint remote_end
;
// Reset the usable length to the full capacity before receiving.
74 buf
.setLength(MAX_PACKET_SIZE
);
// NOTE(review): both receive calls are commented out, so len is assigned 0 in
// either case and remote_end is never filled in by the code shown here.
77 len
= 0;//sock1->recvFromNonBlocking(buf.getBuf(), buf.getLength(), remote_end, 1000);
79 len
= 0; //sock2->recvFromNonBlocking(buf.getBuf(), buf.getLength(), remote_end, 1000);
// Fetch the session for this call and stop when it has been marked dead.
82 RtpSession
& session
= gRtpSessionTable
.getSession(call_id
);
83 if(session
.isDead()) {
84 cLog
.msg(Log::PRIO_NOTICE
) << "listener(" << call_id
<< "/" << dir
<< ") session is dead, exiting";
// Sender differs from the remote endpoint stored for this direction.
92 if((dir
== 1 && remote_end
!= session
.getRemoteEnd1()) ||
93 (dir
== 2 && remote_end
!= session
.getRemoteEnd2()))
// With "no nat once" disabled: this direction has not been seen yet.
96 (!gOpt
.getNoNatOnce() && ((dir
== 1 && !session
.getSeen1()) ||
97 (dir
== 2 && !session
.getSeen2()))))
99 cLog
.msg(Log::PRIO_NOTICE
) << "listener(" << call_id
<< "/" << dir
<< ") setting remote host to "
// Store the sender as the new remote endpoint (which setter runs presumably
// depends on dir - the selecting condition is not visible).
102 session
.setRemoteEnd1(remote_end
);
104 session
.setRemoteEnd2(remote_end
);
106 if(!gOpt
.getNat()) { // with nat enabled sync is not needed
107 SyncCommand
sc(call_id
);
// Relay the payload to the other leg (presumably sock2 for dir 1 and sock1 for
// dir 2 - TODO confirm against the missing condition lines).
118 sock2
->send_to(boost::asio::buffer(buf
.getBuf(), buf
.getLength()), session
.getRemoteEnd2());
120 sock1
->send_to(boost::asio::buffer(buf
.getBuf(), buf
.getLength()), session
.getRemoteEnd1());
// Any std::exception is logged with its message.
124 catch(std::exception
&e
)
126 cLog
.msg(Log::PRIO_ERR
) << "listener(" << call_id
<< "/" << dir
<< ") exiting because: " << e
.what();
// Queue the call id again; listenerManager takes ids from this queue.
129 gCallIdQueue
.push(call_id
);
// Per-call bookkeeping used by listenerManager (the class header is not
// visible in this excerpt). Holds one io_service and one socket per call leg
// plus the two worker thread pointers.
//
// Allocation: each socket is constructed on its own heap-allocated io_service.
137 ios1_
= new boost::asio::io_service();
138 sock1_
= new RtpSession::proto::socket(*ios1_
);
139 ios2_
= new boost::asio::io_service();
140 sock2_
= new RtpSession::proto::socket(*ios2_
);
// Cleanup: every socket is deleted before the io_service it was built on.
// The thread pointers are not deleted here; listenerManager deletes them.
144 if(sock1_
) delete sock1_
;
145 if(ios1_
) delete ios1_
;
146 if(sock2_
) delete sock2_
;
147 if(ios2_
) delete ios2_
;
// Raw pointers; the objects behind ios*/sock* are created and destroyed by
// the statements above.
150 boost::asio::io_service
* ios1_
;
151 boost::asio::io_service
* ios2_
;
152 RtpSession::proto::socket
* sock1_
;
153 RtpSession::proto::socket
* sock2_
;
// Worker threads for direction 1 and 2, assigned by listenerManager.
154 boost::thread
* thread1_
;
155 boost::thread
* thread2_
;
// Thread routine that creates and reaps the listener threads of every call.
//
// p is reinterpreted as a SyncQueue* and handed on to each listener.
//
// Visible behaviour per call id taken from gCallIdQueue:
//   - the session is fetched from gRtpSessionTable and checked with
//     isComplete();
//   - when the id has no entry in listenerMap, a ListenerData is created, both
//     sockets are opened and bound to the session's local endpoints, two
//     listener threads (dir 1 and dir 2) are started and the entry is stored;
//   - when both running flags of an existing entry are false, the threads are
//     joined and deleted, the map entry is erased and the session is removed.
// A std::exception is logged and followed by a short usleep().
//
// NOTE(review): the running flags are plain bools handed to the worker threads
// by pointer and read here; no synchronisation is visible in this excerpt.
// NOTE(review): loop/try/else lines are missing from this excerpt, so the exact
// nesting of the steps above cannot be confirmed from here.
160 void listenerManager(void* p
)
162 SyncQueue
* queue_
= reinterpret_cast<SyncQueue
*>(p
);
// call_id -> state of the two listener threads serving that call.
164 std::map
<std::string
, ListenerData
*> listenerMap
;
169 std::string call_id
= gCallIdQueue
.front(); // waits for semaphore and returns next call_id
172 RtpSession
& session
= gRtpSessionTable
.getSession(call_id
);
173 if(!session
.isComplete())
176 std::map
<std::string
, ListenerData
*>::iterator it
;
177 it
= listenerMap
.find(call_id
);
178 if(it
== listenerMap
.end()) // listener Threads not existing yet
180 ListenerData
* ld
= new ListenerData();
// Open and bind one socket per leg on the session's local endpoints.
182 ld
->sock1_
->open(session
.getLocalEnd1().protocol());
183 ld
->sock1_
->bind(session
.getLocalEnd1());
185 ld
->sock2_
->open(session
.getLocalEnd2().protocol());
186 ld
->sock2_
->bind(session
.getLocalEnd2());
// Both threads receive both sockets; they differ in dir (1 / 2) and in the
// running flag they are given.
188 ld
->thread1_
= new boost::thread(boost::bind(listener
, ld
->sock1_
, ld
->sock2_
, call_id
, 1, queue_
, &(ld
->running1_
)));
189 ld
->thread2_
= new boost::thread(boost::bind(listener
, ld
->sock1_
, ld
->sock2_
, call_id
, 2, queue_
, &(ld
->running2_
)));
191 std::pair
<std::map
<std::string
, ListenerData
*>::iterator
, bool> ret
;
192 ret
= listenerMap
.insert(std::map
<std::string
, ListenerData
*>::value_type(call_id
, ld
));
// Existing entry whose two workers have both finished: reap it.
196 if(!it
->second
->running1_
&& !it
->second
->running2_
)
198 cLog
.msg(Log::PRIO_NOTICE
) << "listenerManager both threads for '" << call_id
<< "' exited, cleaning up";
// join() before delete so a thread object is never destroyed while joinable.
199 if(it
->second
->thread1_
) {
200 it
->second
->thread1_
->join();
201 delete it
->second
->thread1_
;
203 if(it
->second
->thread2_
) {
204 it
->second
->thread2_
->join();
205 delete it
->second
->thread2_
;
// Drop the bookkeeping entry and the session itself.
208 listenerMap
.erase(it
);
209 gRtpSessionTable
.delSession(call_id
);
212 // TODO: reinit if session changed
214 catch(std::exception
&e
)
216 cLog
.msg(Log::PRIO_ERR
) << "listenerManager restarting after exception: " << e
.what();
217 usleep(500); // in case of a hard error don't block cpu (this is ugly); note: 500 microseconds
220 cLog
.msg(Log::PRIO_ERR
) << "listenerManager exiting because of unknown reason";
// Confines the process to chrootdir and switches to the account `username`.
//
// Visible steps: the account is looked up with getpwnam() before chroot() is
// called; afterwards supplementary groups, gid and uid are changed in that
// order (initgroups, setgid, setuid). Progress is reported on std::cout and
// failures on std::cerr.
//
// Parameters:
//   chrootdir - directory passed to chroot()
//   username  - account name passed to getpwnam()
//
// NOTE(review): the conditions guarding the messages and whatever happens
// after a failure (exit/return) are not visible in this excerpt.
223 void chrootAndDrop(string
const& chrootdir
, string
const& username
)
227 std::cerr
<< "this programm has to be run as root in order to run in a chroot" << std::endl
;
// Resolve the account while the password database is still reachable, i.e.
// before the root directory changes.
231 struct passwd
*pw
= getpwnam(username
.c_str());
// chroot() returns non-zero on failure.
233 if(chroot(chrootdir
.c_str()))
235 std::cerr
<< "can't chroot to " << chrootdir
<< std::endl
;
238 std::cout
<< "we are in chroot jail (" << chrootdir
<< ") now" << std::endl
;
// The first call that fails (non-zero) short-circuits the rest; setuid runs
// last, after the group changes.
240 if (initgroups(pw
->pw_name
, pw
->pw_gid
) || setgid(pw
->pw_gid
) || setuid(pw
->pw_uid
))
242 std::cerr
<< "can't drop to user " << username
<< " " << pw
->pw_uid
<< ":" << pw
->pw_gid
<< std::endl
;
245 std::cout
<< "dropped user to " << username
<< " " << pw
->pw_uid
<< ":" << pw
->pw_gid
<< std::endl
;
// Presumably the branch taken when getpwnam() returned no entry - TODO confirm.
249 std::cerr
<< "unknown user " << username
<< std::endl
;
// Fragment of the daemonizing helper (its header is not visible in this
// excerpt): walks the standard descriptors and reopens /dev/null.
264 // std::cout << "running in background now..." << std::endl;
267 // for (fd=getdtablesize();fd>=0;--fd) // close all file descriptors
// NOTE(review): the active loop only visits descriptors 0..2; the disabled
// loop above is the variant that covers the whole descriptor table.
268 for (fd
=0;fd
<=2;fd
++) // close all file descriptors
// Opened read/write; presumably lands on descriptor 0 once 0..2 are closed.
270 fd
=open("/dev/null",O_RDWR
); // stdin
// Argument bundle for syncConnector: the constructor stores the sync queue and
// the connect-to option in the members `queue` and `connto`.
// NOTE(review): connto is a reference member (the declaration of `queue` is
// not visible), so the referenced option must outlive this object.
279 ThreadParam(SyncQueue
& queue_
,OptionConnectTo
& connto_
)
280 : queue(queue_
),connto(connto_
)
283 OptionConnectTo
& connto
;
// Thread routine for an outgoing sync connection: p is reinterpreted as a
// ThreadParam* and a SyncClient is constructed for the host/port stored in its
// connto member. What is done with the client afterwards is not visible in
// this excerpt.
286 void syncConnector(void* p
)
288 ThreadParam
* param
= reinterpret_cast<ThreadParam
*>(p
);
290 SyncClient
sc ( param
->connto
.host
, param
->connto
.port
);
// Thread routine for the incoming sync side: resolves the local sync endpoint,
// constructs a SyncServer on it and registers the server with `queue`.
//
// The endpoint is resolved from address + port when a local sync address is
// configured, otherwise from the port alone. A std::exception is logged as a
// bind failure, with "*" shown in place of an empty address.
//
// NOTE(review): `server` is a local object whose address is stored in the
// queue via setSyncServerPtr(); the code that keeps this function running (or
// resets the pointer) is not visible in this excerpt.
294 void syncListener(SyncQueue
* queue
)
298 boost::asio::io_service io_service
;
299 SyncTcpConnection::proto::resolver
resolver(io_service
);
300 SyncTcpConnection::proto::endpoint e
;
301 if(gOpt
.getLocalSyncAddr()!="")
// Explicit local address configured: resolve address and port together.
303 SyncTcpConnection::proto::resolver::query
query(gOpt
.getLocalSyncAddr(), gOpt
.getLocalSyncPort());
// Only the first resolver result is used.
304 e
= *resolver
.resolve(query
);
// No address configured: resolve from the port only.
306 SyncTcpConnection::proto::resolver::query
query(gOpt
.getLocalSyncPort());
307 e
= *resolver
.resolve(query
);
311 SyncServer
server(io_service
,e
);
312 // server.onConnect=boost::bind(syncOnConnect,_1);
313 queue
->setSyncServerPtr(&server
);
316 catch (std::exception
& e
)
318 std::string addr
= gOpt
.getLocalSyncAddr() == "" ? "*" : gOpt
.getLocalSyncAddr();
319 cLog
.msg(Log::PRIO_ERR
) << "sync: cannot bind to " << addr
<< ":" << gOpt
.getLocalSyncPort()
320 << " (" << e
.what() << ")" << std::endl
;
325 int main(int argc
, char* argv
[])
327 // std::cout << "anyrtpproxy" << std::endl;
328 if(!gOpt
.parse(argc
, argv
))
334 cLog
.setLogName("anyrtpproxy");
335 cLog
.msg(Log::PRIO_NOTICE
) << "anyrtpproxy started...";
337 std::ofstream pidFile
;
338 if(gOpt
.getPidFile() != "") {
339 pidFile
.open(gOpt
.getPidFile().c_str());
340 if(!pidFile
.is_open()) {
341 std::cout
<< "can't open pid file" << std::endl
;
346 chrootAndDrop(gOpt
.getChrootDir(), gOpt
.getUsername());
347 if(gOpt
.getDaemonize())
350 if(pidFile
.is_open()) {
351 pid_t pid
= getpid();
356 SignalController sig
;
362 boost::thread
listenerManagerThread(boost::bind(listenerManager
,&queue
));
365 // #ifndef ANYTUN_NOSYNC
366 // boost::thread * syncListenerThread;
367 // if(gOpt.getLocalSyncPort() != "")
368 // syncListenerThread = new boost::thread(boost::bind(syncListener,&queue));
370 // std::list<boost::thread *> connectThreads;
371 // for(ConnectToList::iterator it = connect_to.begin() ;it != connect_to.end(); ++it) {
372 // ThreadParam * point = new ThreadParam(dev, *src, cl, queue,*it);
373 // connectThreads.push_back(new boost::thread(boost::bind(syncConnector,point)));
379 // pthread_t syncListenerThread;
381 // ConnectToList connect_to = gOpt.getConnectTo();
382 // ThreadParam p( queue,*(new OptionConnectTo()));
383 // if ( gOpt.getLocalSyncPort())
384 // pthread_create(&syncListenerThread, NULL, syncListener, &p);
386 // std::list<pthread_t> connectThreads;
387 // for(ConnectToList::iterator it = connect_to.begin() ;it != connect_to.end(); ++it)
389 // connectThreads.push_back(pthread_t());
390 // ThreadParam * point = new ThreadParam(queue,*it);
391 // pthread_create(& connectThreads.back(), NULL, syncConnector, point);
394 PortWindow
port_window(gOpt
.getRtpStartPort(),gOpt
.getRtpEndPort());
395 CommandHandler
cmd(queue
, gOpt
.getControlInterface().addr_
, gOpt
.getControlInterface().port_
,port_window
);