big svn cleanup
[anytun.git] / src / anyrtpproxy / anyrtpproxy.cpp
blobd29e4d4dd3154452dee021b0dd21888e478c9677
1 /*
2 * anytun
4 * The secure anycast tunneling protocol (satp) defines a protocol used
5 * for communication between any combination of unicast and anycast
6 * tunnel endpoints. It has less protocol overhead than IPSec in Tunnel
7 * mode and allows tunneling of every ETHER TYPE protocol (e.g.
8 * ethernet, ip, arp ...). satp directly includes cryptography and
9 * message authentication based on the methods used by SRTP. It is
10 * intended to deliver a generic, scaleable and secure solution for
11 * tunneling and relaying of packets of any protocol.
14 * Copyright (C) 2007 anytun.org <satp@wirdorange.org>
16 * This program is free software; you can redistribute it and/or modify
17 * it under the terms of the GNU General Public License version 2
18 * as published by the Free Software Foundation.
20 * This program is distributed in the hope that it will be useful,
21 * but WITHOUT ANY WARRANTY; without even the implied warranty of
22 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23 * GNU General Public License for more details.
25 * You should have received a copy of the GNU General Public License
26 * along with this program (see the file COPYING included with this
27 * distribution); if not, write to the Free Software Foundation, Inc.,
28 * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
31 #include <iostream>
33 #include <fcntl.h>
34 #include <pwd.h>
35 #include <grp.h>
37 #include "../datatypes.h"
39 #include "../log.h"
40 #include "../signalController.h"
41 #include "../PracticalSocket.h"
42 #include "../buffer.h"
43 #include "connectionList.h"
44 #include "../rtpSessionTable.h"
45 #include "../syncCommand.h"
46 #include "../syncQueue.h"
47 #include "../syncSocketHandler.h"
48 #include "../syncListenSocket.h"
50 #include "../syncSocket.h"
51 #include "../syncClientSocket.h"
52 #include "../threadUtils.hpp"
54 #include "commandHandler.h"
55 #include "callIdQueue.h"
57 #include "options.h"
58 #include "portWindow.h"
59 #include <map>
62 #define MAX_PACKET_SIZE 1500
65 class ThreadParam
67 public:
68 ThreadParam(SyncQueue & queue_,OptionConnectTo & connto_)
69 : queue(queue_),connto(connto_)
70 {};
71 SyncQueue & queue;
72 OptionConnectTo & connto;
75 class ListenerThreadParam
77 public:
78 ListenerThreadParam(UDPSocket& s1, UDPSocket& s2, std::string c, int d, SyncQueue& q) : sock1_(s1), sock2_(s2), call_id_(c),
79 dir_(d), running_(true), queue_(q)
80 {};
82 UDPSocket& sock1_;
83 UDPSocket& sock2_;
84 std::string call_id_;
85 int dir_;
86 bool running_;
87 SyncQueue& queue_;
90 void* listener(void* p)
92 ListenerThreadParam* param = reinterpret_cast<ListenerThreadParam*>(p);
94 cLog.msg(Log::PRIO_NOTICE) << "listener(" << param->call_id_ << "/" << param->dir_ << ") started";
96 try
98 Buffer buf(u_int32_t(MAX_PACKET_SIZE));
99 string remote_addr;
100 u_int16_t remote_port;
101 while(1) {
102 buf.setLength(MAX_PACKET_SIZE);
103 u_int32_t len=0;
104 if(param->dir_ == 1)
105 len = param->sock1_.recvFromNonBlocking(buf.getBuf(), buf.getLength(), remote_addr, remote_port, 1000);
106 else if(param->dir_ == 2)
107 len = param->sock2_.recvFromNonBlocking(buf.getBuf(), buf.getLength(), remote_addr, remote_port, 1000);
108 else break;
110 RtpSession& session = gRtpSessionTable.getSession(param->call_id_);
111 if(session.isDead()) {
112 cLog.msg(Log::PRIO_NOTICE) << "listener(" << param->call_id_ << "/" << param->dir_ << ") session is dead, exiting";
113 break;
116 if(!len)
117 continue;
118 buf.setLength(len);
120 if((param->dir_ == 1 && (remote_port != session.getRemotePort1() || remote_addr != session.getRemoteAddr1())) ||
121 (param->dir_ == 2 && (remote_port != session.getRemotePort2() || remote_addr != session.getRemoteAddr2())))
123 if(gOpt.getNat() ||
124 (!gOpt.getNoNatOnce() && ((param->dir_ == 1 && !session.getSeen1()) ||
125 (param->dir_ == 2 && !session.getSeen2()))))
127 cLog.msg(Log::PRIO_NOTICE) << "listener(" << param->call_id_ << "/" << param->dir_ << ") setting remote host to "
128 << remote_addr << ":" << remote_port;
129 if(param->dir_ == 1) {
130 session.setRemotePort1(remote_port);
131 session.setRemoteAddr1(remote_addr);
133 if(param->dir_ == 2) {
134 session.setRemotePort2(remote_port);
135 session.setRemoteAddr2(remote_addr);
138 if(!gOpt.getNat()) { // with nat enabled sync is not needed
139 SyncCommand sc(param->call_id_);
140 param->queue_.push(sc);
143 else
144 continue;
146 session.setSeen1();
147 session.setSeen2();
149 if(param->dir_ == 1)
150 param->sock2_.sendTo(buf.getBuf(), buf.getLength(), session.getRemoteAddr2(), session.getRemotePort2());
151 else if(param->dir_ == 2)
152 param->sock1_.sendTo(buf.getBuf(), buf.getLength(), session.getRemoteAddr1(), session.getRemotePort1());
153 else break;
156 catch(std::exception &e)
158 cLog.msg(Log::PRIO_ERR) << "listener(" << param->call_id_ << "/" << param->dir_ << ") exiting because: " << e.what();
160 param->running_ = false;
161 gCallIdQueue.push(param->call_id_);
162 pthread_exit(NULL);
165 class ListenerData
167 public:
168 ListenerData(ListenerThreadParam lp1, ListenerThreadParam lp2) : param1_(lp1), param2_(lp2)
171 UDPSocket* sock1_;
172 UDPSocket* sock2_;
173 pthread_t thread1_;
174 pthread_t thread2_;
175 ListenerThreadParam param1_;
176 ListenerThreadParam param2_;
179 void* listenerManager(void* p)
181 SyncQueue* queue_ = reinterpret_cast<SyncQueue*>(p);
183 std::map<std::string, ListenerData*> listenerMap;
184 while(1)
186 try
188 std::string call_id = gCallIdQueue.front(); // waits for semaphor and returns next call_id
189 gCallIdQueue.pop();
191 RtpSession& session = gRtpSessionTable.getSession(call_id);
192 if(!session.isComplete())
193 continue;
195 std::map<std::string, ListenerData*>::iterator it;
196 it = listenerMap.find(call_id);
197 if(it == listenerMap.end()) // listener Threads not existing yet
199 UDPSocket* sock1 = new UDPSocket(session.getLocalAddr(), session.getLocalPort1());
200 UDPSocket* sock2 = new UDPSocket(session.getLocalAddr(), session.getLocalPort2());
202 ListenerData* ld = new ListenerData(ListenerThreadParam(*sock1, *sock2, call_id, 1, *queue_),
203 ListenerThreadParam(*sock1, *sock2, call_id, 2, *queue_));
204 ld->sock1_ = sock1;
205 ld->sock2_ = sock2;
206 pthread_create(&(ld->thread1_), NULL, listener, &(ld->param1_));
207 pthread_create(&(ld->thread2_), NULL, listener, &(ld->param2_));
209 std::pair<std::map<std::string, ListenerData*>::iterator, bool> ret;
210 ret = listenerMap.insert(std::map<std::string, ListenerData*>::value_type(call_id, ld));
211 continue;
214 if(!it->second->param1_.running_ && !it->second->param2_.running_)
216 cLog.msg(Log::PRIO_NOTICE) << "listenerManager both threads for '" << call_id << "' exited, cleaning up";
217 pthread_join(it->second->thread1_, NULL);
218 pthread_join(it->second->thread2_, NULL);
219 delete it->second->sock1_;
220 delete it->second->sock2_;
221 delete it->second;
222 listenerMap.erase(it);
223 gRtpSessionTable.delSession(call_id);
224 continue;
226 // TODO: reinit if session changed
228 catch(std::exception &e)
230 cLog.msg(Log::PRIO_ERR) << "listenerManager restarting after exception: " << e.what();
231 usleep(500); // in case of an hard error don't block cpu (this is ugly)
234 cLog.msg(Log::PRIO_ERR) << "listenerManager exiting because of unknown reason";
235 pthread_exit(NULL);
/**
 * Locks the process into a chroot jail and drops root privileges.
 *
 * @param chrootdir  directory that becomes the new root
 * @param username   user whose uid, gid and supplementary groups are adopted
 *
 * The user is looked up before entering the jail. Every failure (not
 * running as root, unknown user, chroot/chdir failing, not being able to
 * change groups or user) prints a message to stderr and terminates the
 * process with exit(-1).
 */
void chrootAndDrop(std::string const& chrootdir, std::string const& username)
{
  if (getuid() != 0)
  {
    std::cerr << "this programm has to be run as root in order to run in a chroot" << std::endl;
    exit(-1);
  }

  struct passwd *pw = getpwnam(username.c_str());
  if(!pw)
  {
    std::cerr << "unknown user " << username << std::endl;
    exit(-1);
  }

  if(chroot(chrootdir.c_str()))
  {
    std::cerr << "can't chroot to " << chrootdir << std::endl;
    exit(-1);
  }
  // chroot() does not change the working directory; if this fails the
  // process would keep a working directory outside of the jail
  if(chdir("/"))
  {
    std::cerr << "can't change to root directory of chroot " << chrootdir << std::endl;
    exit(-1);
  }
  std::cout << "we are in chroot jail (" << chrootdir << ") now" << std::endl;

  // groups have to be changed while we are still root, the uid goes last
  if (initgroups(pw->pw_name, pw->pw_gid) || setgid(pw->pw_gid) || setuid(pw->pw_uid))
  {
    std::cerr << "can't drop to user " << username << " " << pw->pw_uid << ":" << pw->pw_gid << std::endl;
    exit(-1);
  }
  std::cout << "dropped user to " << username << " " << pw->pw_uid << ":" << pw->pw_gid << std::endl;
}
/**
 * Detaches the process from its terminal and continues in the background.
 *
 * Forks twice with a setsid() in between; the parent of each fork exits
 * with status 0. A failing fork() is reported on stderr and ends the
 * process with exit(-1). Afterwards all file descriptors are closed,
 * stdin/stdout/stderr are pointed at /dev/null on a best-effort basis and
 * the umask is set to 027.
 */
void daemonize()
{
  pid_t pid;

  pid = fork();
  if(pid < 0)
  {
    std::cerr << "daemonizing failed at fork(), exiting" << std::endl;
    exit(-1);
  }
  if(pid) exit(0);   // parent is done
  setsid();
  pid = fork();
  if(pid < 0)
  {
    std::cerr << "daemonizing failed at fork(), exiting" << std::endl;
    exit(-1);
  }
  if(pid) exit(0);   // session leader is done

  std::cout << "running in background now..." << std::endl;

  int fd;
  for (fd=getdtablesize();fd>=0;--fd) // close all file descriptors
    close(fd);
  // With everything closed the next descriptors handed out are 0, 1 and 2.
  // Failures are deliberately not fatal here, there is no place left to
  // report them to.
  fd=open("/dev/null",O_RDWR);        // stdin
  if(fd != -1)
  {
    if(dup(fd) == -1) {}              // stdout
    if(dup(fd) == -1) {}              // stderr
  }
  umask(027);
}
290 void* syncConnector(void* p )
292 ThreadParam* param = reinterpret_cast<ThreadParam*>(p);
294 SocketHandler h;
295 ConnectionList cl;
296 SyncClientSocket sock(h,cl);
297 sock.Open( param->connto.host, param->connto.port);
298 h.Add(&sock);
299 while (h.GetCount())
301 h.Select();
303 pthread_exit(NULL);
306 void* syncListener(void* p )
308 ThreadParam* param = reinterpret_cast<ThreadParam*>(p);
309 ConnectionList cl;
311 SyncSocketHandler h(param->queue);
312 SyncListenSocket<SyncSocket,ConnectionList> l(h,cl);
314 if (l.Bind(gOpt.getLocalSyncPort()))
315 pthread_exit(NULL);
317 Utility::ResolveLocal(); // resolve local hostname
318 h.Add(&l);
319 h.Select(1,0);
320 while (1) {
321 h.Select(1,0);
325 int main(int argc, char* argv[])
327 std::cout << "anyrtpproxy" << std::endl;
328 if(!gOpt.parse(argc, argv))
330 gOpt.printUsage();
331 exit(-1);
334 if(gOpt.getChroot())
335 chrootAndDrop(gOpt.getChrootDir(), gOpt.getUsername());
336 if(gOpt.getDaemonize())
337 daemonize();
339 cLog.setLogName("anyrtpproxy");
340 cLog.msg(Log::PRIO_NOTICE) << "anyrtpproxy started...";
342 SignalController sig;
343 sig.init();
345 SyncQueue queue;
347 pthread_t listenerManagerThread;
348 pthread_create(&listenerManagerThread, NULL, listenerManager, &queue);
349 pthread_detach(listenerManagerThread);
351 pthread_t syncListenerThread;
353 ConnectToList connect_to = gOpt.getConnectTo();
354 ThreadParam p( queue,*(new OptionConnectTo()));
355 if ( gOpt.getLocalSyncPort())
356 pthread_create(&syncListenerThread, NULL, syncListener, &p);
358 std::list<pthread_t> connectThreads;
359 for(ConnectToList::iterator it = connect_to.begin() ;it != connect_to.end(); ++it)
361 connectThreads.push_back(pthread_t());
362 ThreadParam * point = new ThreadParam(queue,*it);
363 pthread_create(& connectThreads.back(), NULL, syncConnector, point);
366 PortWindow port_window(gOpt.getRtpStartPort(),gOpt.getRtpEndPort());
367 CommandHandler cmd(queue, gOpt.getControlInterface().addr_, gOpt.getControlInterface().port_,port_window);
369 int ret = sig.run();
370 return ret;