anyrtpproxy almost finished
[anytun.git] / src / anyrtpproxy / anyrtpproxy.cpp
blobc96b0869e9c013c392ae0ee3c94f18980ce499e5
1 /*
2 * anytun
4 * The secure anycast tunneling protocol (satp) defines a protocol used
5 * for communication between any combination of unicast and anycast
6 * tunnel endpoints. It has less protocol overhead than IPSec in Tunnel
7 * mode and allows tunneling of every ETHER TYPE protocol (e.g.
8 * ethernet, ip, arp ...). satp directly includes cryptography and
9 * message authentication based on the methods used by SRTP. It is
10 * intended to deliver a generic, scalable and secure solution for
11 * tunneling and relaying of packets of any protocol.
14 * Copyright (C) 2007-2008 Othmar Gsenger, Erwin Nindl,
15 * Christian Pointner <satp@wirdorange.org>
17 * This file is part of Anytun.
19 * Anytun is free software: you can redistribute it and/or modify
20 * it under the terms of the GNU General Public License version 3 as
21 * published by the Free Software Foundation.
23 * Anytun is distributed in the hope that it will be useful,
24 * but WITHOUT ANY WARRANTY; without even the implied warranty of
25 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
26 * GNU General Public License for more details.
28 * You should have received a copy of the GNU General Public License
29 * along with anytun. If not, see <http://www.gnu.org/licenses/>.
32 #include <iostream>
34 #include <boost/asio.hpp>
36 #include <fcntl.h>
37 #include <pwd.h>
38 #include <grp.h>
40 #include "../datatypes.h"
42 #include "../log.h"
43 #include "../signalController.h"
44 #include "../buffer.h"
45 #include "connectionList.h"
46 #include "../rtpSessionTable.h"
47 #include "../syncCommand.h"
48 #include "../syncQueue.h"
49 #include "../syncClient.h"
50 //#include "../syncOnConnect.h"
52 #include "../threadUtils.hpp"
54 #include "commandHandler.h"
55 #include "callIdQueue.h"
57 #include "options.h"
58 #include "portWindow.h"
59 #include <map>
60 #include <fstream>
62 #define MAX_PACKET_SIZE 1500
64 void listener(RtpSession::proto::socket* sock1, RtpSession::proto::socket* sock2, std::string call_id, int dir, SyncQueue* queue, bool* running)
66 cLog.msg(Log::PRIO_NOTICE) << "listener(" << call_id << "/" << dir << ") started";
68 try
70 Buffer buf(u_int32_t(MAX_PACKET_SIZE));
71 RtpSession::proto::endpoint remote_end;
73 while(1) {
74 buf.setLength(MAX_PACKET_SIZE);
75 u_int32_t len=0;
76 if(dir == 1)
77 len = 0;//sock1->recvFromNonBlocking(buf.getBuf(), buf.getLength(), remote_end, 1000);
78 else if(dir == 2)
79 len = 0; //sock2->recvFromNonBlocking(buf.getBuf(), buf.getLength(), remote_end, 1000);
80 else break;
82 RtpSession& session = gRtpSessionTable.getSession(call_id);
83 if(session.isDead()) {
84 cLog.msg(Log::PRIO_NOTICE) << "listener(" << call_id << "/" << dir << ") session is dead, exiting";
85 break;
88 if(!len)
89 continue;
90 buf.setLength(len);
92 if((dir == 1 && remote_end != session.getRemoteEnd1()) ||
93 (dir == 2 && remote_end != session.getRemoteEnd2()))
95 if(gOpt.getNat() ||
96 (!gOpt.getNoNatOnce() && ((dir == 1 && !session.getSeen1()) ||
97 (dir == 2 && !session.getSeen2()))))
99 cLog.msg(Log::PRIO_NOTICE) << "listener(" << call_id << "/" << dir << ") setting remote host to "
100 << remote_end;
101 if(dir == 1)
102 session.setRemoteEnd1(remote_end);
103 if(dir == 2)
104 session.setRemoteEnd2(remote_end);
106 if(!gOpt.getNat()) { // with nat enabled sync is not needed
107 SyncCommand sc(call_id);
108 queue->push(sc);
111 else
112 continue;
114 session.setSeen1();
115 session.setSeen2();
117 if(dir == 1)
118 sock2->send_to(boost::asio::buffer(buf.getBuf(), buf.getLength()), session.getRemoteEnd2());
119 else if(dir == 2)
120 sock1->send_to(boost::asio::buffer(buf.getBuf(), buf.getLength()), session.getRemoteEnd1());
121 else break;
124 catch(std::exception &e)
126 cLog.msg(Log::PRIO_ERR) << "listener(" << call_id << "/" << dir << ") exiting because: " << e.what();
128 *running = false;
129 gCallIdQueue.push(call_id);
132 class ListenerData
134 public:
135 ListenerData()
137 ios1_ = new boost::asio::io_service();
138 sock1_ = new RtpSession::proto::socket(*ios1_);
139 ios2_ = new boost::asio::io_service();
140 sock2_ = new RtpSession::proto::socket(*ios2_);
142 ~ListenerData()
144 if(sock1_) delete sock1_;
145 if(ios1_) delete ios1_;
146 if(sock2_) delete sock2_;
147 if(ios2_) delete ios2_;
150 boost::asio::io_service* ios1_;
151 boost::asio::io_service* ios2_;
152 RtpSession::proto::socket* sock1_;
153 RtpSession::proto::socket* sock2_;
154 boost::thread* thread1_;
155 boost::thread* thread2_;
156 bool running1_;
157 bool running2_;
160 void listenerManager(void* p)
162 SyncQueue* queue_ = reinterpret_cast<SyncQueue*>(p);
164 std::map<std::string, ListenerData*> listenerMap;
165 while(1)
167 try
169 std::string call_id = gCallIdQueue.front(); // waits for semaphor and returns next call_id
170 gCallIdQueue.pop();
172 RtpSession& session = gRtpSessionTable.getSession(call_id);
173 if(!session.isComplete())
174 continue;
176 std::map<std::string, ListenerData*>::iterator it;
177 it = listenerMap.find(call_id);
178 if(it == listenerMap.end()) // listener Threads not existing yet
180 ListenerData* ld = new ListenerData();
182 ld->sock1_->open(session.getLocalEnd1().protocol());
183 ld->sock1_->bind(session.getLocalEnd1());
185 ld->sock2_->open(session.getLocalEnd2().protocol());
186 ld->sock2_->bind(session.getLocalEnd2());
188 ld->thread1_ = new boost::thread(boost::bind(listener, ld->sock1_, ld->sock2_, call_id, 1, queue_, &(ld->running1_)));
189 ld->thread2_ = new boost::thread(boost::bind(listener, ld->sock1_, ld->sock2_, call_id, 2, queue_, &(ld->running2_)));
191 std::pair<std::map<std::string, ListenerData*>::iterator, bool> ret;
192 ret = listenerMap.insert(std::map<std::string, ListenerData*>::value_type(call_id, ld));
193 continue;
196 if(!it->second->running1_ && !it->second->running2_)
198 cLog.msg(Log::PRIO_NOTICE) << "listenerManager both threads for '" << call_id << "' exited, cleaning up";
199 if(it->second->thread1_) {
200 it->second->thread1_->join();
201 delete it->second->thread1_;
203 if(it->second->thread2_) {
204 it->second->thread2_->join();
205 delete it->second->thread2_;
207 delete it->second;
208 listenerMap.erase(it);
209 gRtpSessionTable.delSession(call_id);
210 continue;
212 // TODO: reinit if session changed
214 catch(std::exception &e)
216 cLog.msg(Log::PRIO_ERR) << "listenerManager restarting after exception: " << e.what();
217 usleep(500); // in case of an hard error don't block cpu (this is ugly)
220 cLog.msg(Log::PRIO_ERR) << "listenerManager exiting because of unknown reason";
/**
 * Locks the process into a chroot jail and drops root privileges.
 *
 * Must be called as root. Looks up @p username, chroots into
 * @p chrootdir, changes the working directory to the new root and then
 * switches supplementary groups, gid and uid to those of the user.
 * Every failure is reported on stderr and terminates the process with
 * exit(-1).
 *
 * @param chrootdir directory that becomes the new root
 * @param username  account whose uid/gid the process switches to
 */
void chrootAndDrop(std::string const& chrootdir, std::string const& username)
{
  if (getuid() != 0)
  {
    std::cerr << "this programm has to be run as root in order to run in a chroot" << std::endl;
    exit(-1);
  }

  struct passwd *pw = getpwnam(username.c_str());
  if(!pw)
  {
    std::cerr << "unknown user " << username << std::endl;
    exit(-1);
  }

  if(chroot(chrootdir.c_str()))
  {
    std::cerr << "can't chroot to " << chrootdir << std::endl;
    exit(-1);
  }
  std::cout << "we are in chroot jail (" << chrootdir << ") now" << std::endl;

  // chroot() does not change the working directory; if this fails the
  // process would keep a cwd outside of the jail, so treat it as fatal
  if(chdir("/"))
  {
    std::cerr << "can't change working directory to / inside " << chrootdir << std::endl;
    exit(-1);
  }

  // NOTE(review): initgroups() runs after chroot(), so it presumably
  // consults the group database inside the jail -- confirm the jail
  // provides one.
  if (initgroups(pw->pw_name, pw->pw_gid) || setgid(pw->pw_gid) || setuid(pw->pw_uid))
  {
    std::cerr << "can't drop to user " << username << " " << pw->pw_uid << ":" << pw->pw_gid << std::endl;
    exit(-1);
  }
  std::cout << "dropped user to " << username << " " << pw->pw_uid << ":" << pw->pw_gid << std::endl;
}
/**
 * Detaches the process from its controlling terminal.
 *
 * Forks twice with a setsid() in between (the respective parents exit
 * with status 0), points stdin/stdout/stderr to /dev/null and sets the
 * umask to 027. A failing fork() is reported on stderr and ends the
 * process with exit(-1) instead of silently pretending success.
 */
void daemonize()
{
  pid_t pid = fork();
  if(pid < 0) {
    std::cerr << "daemonize: first fork failed" << std::endl;
    exit(-1);
  }
  if(pid) exit(0);   // parent

  setsid();

  pid = fork();
  if(pid < 0) {
    std::cerr << "daemonize: second fork failed" << std::endl;
    exit(-1);
  }
  if(pid) exit(0);   // session leader

//  std::cout << "running in background now..." << std::endl;

  int fd = open("/dev/null", O_RDWR);
  if(fd == -1) {
    // /dev/null is not available (main() may have chrooted before
    // calling this): at least detach from the inherited descriptors
    for(fd = 0; fd <= 2; ++fd)
      close(fd);
  }
  else {
    // redirect stdin, stdout and stderr
    for(int target = 0; target <= 2; ++target) {
      if(fd != target)
        dup2(fd, target);
    }
    if(fd > 2)
      close(fd);
  }

  umask(027);
}
276 class ThreadParam
278 public:
279 ThreadParam(SyncQueue & queue_,OptionConnectTo & connto_)
280 : queue(queue_),connto(connto_)
282 SyncQueue & queue;
283 OptionConnectTo & connto;
286 void syncConnector(void* p)
288 ThreadParam* param = reinterpret_cast<ThreadParam*>(p);
290 SyncClient sc ( param->connto.host, param->connto.port);
291 sc.run();
294 void syncListener(SyncQueue * queue)
298 boost::asio::io_service io_service;
299 SyncTcpConnection::proto::resolver resolver(io_service);
300 SyncTcpConnection::proto::endpoint e;
301 if(gOpt.getLocalSyncAddr()!="")
303 SyncTcpConnection::proto::resolver::query query(gOpt.getLocalSyncAddr(), gOpt.getLocalSyncPort());
304 e = *resolver.resolve(query);
305 } else {
306 SyncTcpConnection::proto::resolver::query query(gOpt.getLocalSyncPort());
307 e = *resolver.resolve(query);
311 SyncServer server(io_service,e);
312 // server.onConnect=boost::bind(syncOnConnect,_1);
313 queue->setSyncServerPtr(&server);
314 io_service.run();
316 catch (std::exception& e)
318 std::string addr = gOpt.getLocalSyncAddr() == "" ? "*" : gOpt.getLocalSyncAddr();
319 cLog.msg(Log::PRIO_ERR) << "sync: cannot bind to " << addr << ":" << gOpt.getLocalSyncPort()
320 << " (" << e.what() << ")" << std::endl;
325 int main(int argc, char* argv[])
327 // std::cout << "anyrtpproxy" << std::endl;
328 if(!gOpt.parse(argc, argv))
330 gOpt.printUsage();
331 exit(-1);
334 cLog.setLogName("anyrtpproxy");
335 cLog.msg(Log::PRIO_NOTICE) << "anyrtpproxy started...";
337 std::ofstream pidFile;
338 if(gOpt.getPidFile() != "") {
339 pidFile.open(gOpt.getPidFile().c_str());
340 if(!pidFile.is_open()) {
341 std::cout << "can't open pid file" << std::endl;
345 if(gOpt.getChroot())
346 chrootAndDrop(gOpt.getChrootDir(), gOpt.getUsername());
347 if(gOpt.getDaemonize())
348 daemonize();
350 if(pidFile.is_open()) {
351 pid_t pid = getpid();
352 pidFile << pid;
353 pidFile.close();
356 SignalController sig;
357 sig.init();
359 SyncQueue queue;
362 boost::thread listenerManagerThread(boost::bind(listenerManager,&queue));
365 // #ifndef ANYTUN_NOSYNC
366 // boost::thread * syncListenerThread;
367 // if(gOpt.getLocalSyncPort() != "")
368 // syncListenerThread = new boost::thread(boost::bind(syncListener,&queue));
370 // std::list<boost::thread *> connectThreads;
371 // for(ConnectToList::iterator it = connect_to.begin() ;it != connect_to.end(); ++it) {
372 // ThreadParam * point = new ThreadParam(dev, *src, cl, queue,*it);
373 // connectThreads.push_back(new boost::thread(boost::bind(syncConnector,point)));
374 // }
375 // #endif
379 // pthread_t syncListenerThread;
381 // ConnectToList connect_to = gOpt.getConnectTo();
382 // ThreadParam p( queue,*(new OptionConnectTo()));
383 // if ( gOpt.getLocalSyncPort())
384 // pthread_create(&syncListenerThread, NULL, syncListener, &p);
386 // std::list<pthread_t> connectThreads;
387 // for(ConnectToList::iterator it = connect_to.begin() ;it != connect_to.end(); ++it)
388 // {
389 // connectThreads.push_back(pthread_t());
390 // ThreadParam * point = new ThreadParam(queue,*it);
391 // pthread_create(& connectThreads.back(), NULL, syncConnector, point);
392 // }
394 PortWindow port_window(gOpt.getRtpStartPort(),gOpt.getRtpEndPort());
395 CommandHandler cmd(queue, gOpt.getControlInterface().addr_, gOpt.getControlInterface().port_,port_window);
397 int ret = sig.run();
398 return ret;