added rtpsession seen
[anytun.git] / anyrtpproxy / anyrtpproxy.cpp
blobcf5b69dcb14b6e8028a27b010d430d59f430db18
/*
 * anytun
 *
 * The secure anycast tunneling protocol (satp) defines a protocol used
 * for communication between any combination of unicast and anycast
 * tunnel endpoints. It has less protocol overhead than IPSec in Tunnel
 * mode and allows tunneling of every ETHER TYPE protocol (e.g.
 * ethernet, ip, arp ...). satp directly includes cryptography and
 * message authentication based on the methods used by SRTP. It is
 * intended to deliver a generic, scalable and secure solution for
 * tunneling and relaying of packets of any protocol.
 *
 *
 * Copyright (C) 2007 anytun.org <satp@wirdorange.org>
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program (see the file COPYING included with this
 * distribution); if not, write to the Free Software Foundation, Inc.,
 * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 */
31 #include <iostream>
33 #include <fcntl.h>
34 #include <pwd.h>
35 #include <grp.h>
37 #include "../datatypes.h"
39 #include "../log.h"
40 #include "../signalController.h"
41 #include "../PracticalSocket.h"
42 #include "../buffer.h"
43 #include "connectionList.h"
44 #include "../rtpSessionTable.h"
45 #include "../syncCommand.h"
46 #include "../syncQueue.h"
47 #include "../syncSocketHandler.h"
48 #include "../syncListenSocket.h"
50 #include "../syncSocket.h"
51 #include "../syncClientSocket.h"
52 #include "../threadUtils.hpp"
54 #include "commandHandler.h"
55 #include "callIdQueue.h"
57 #include "options.h"
58 #include "portWindow.h"
59 #include <map>
62 #define MAX_PACKET_SIZE 1500
65 class ThreadParam
67 public:
68 ThreadParam(SyncQueue & queue_,OptionConnectTo & connto_)
69 : queue(queue_),connto(connto_)
70 {};
71 SyncQueue & queue;
72 OptionConnectTo & connto;
75 class ListenerThreadParam
77 public:
78 ListenerThreadParam(UDPSocket& s1, UDPSocket& s2, std::string c, int d) : sock1_(s1), sock2_(s2), call_id_(c), dir_(d)
79 {};
81 UDPSocket& sock1_;
82 UDPSocket& sock2_;
83 std::string call_id_;
84 int dir_;
87 void* listener(void* p)
89 ListenerThreadParam* param = reinterpret_cast<ListenerThreadParam*>(p);
91 cLog.msg(Log::PRIO_ERR) << "listener(" << param->call_id_ << "/" << param->dir_ << ") started";
93 try
95 Buffer buf(u_int32_t(MAX_PACKET_SIZE));
96 string remote_addr;
97 u_int16_t remote_port;
98 while(1) {
99 buf.setLength(MAX_PACKET_SIZE);
100 u_int32_t len=0;
101 if(param->dir_ == 1)
102 len = param->sock1_.recvFrom(buf.getBuf(), buf.getLength(), remote_addr, remote_port);
103 else if(param->dir_ == 2)
104 len = param->sock2_.recvFrom(buf.getBuf(), buf.getLength(), remote_addr, remote_port);
105 else break;
106 buf.setLength(len);
108 RtpSession& session = gRtpSessionTable.getSession(param->call_id_);
109 if(session.isDead())
110 break;
112 if((param->dir_ == 1 && (remote_port != session.getRemotePort1() || remote_addr != session.getRemoteAddr1())) ||
113 (param->dir_ == 2 && (remote_port != session.getRemotePort2() || remote_addr != session.getRemoteAddr2())))
115 //TODO: if weak? don't check but save the new(?) remote addr into list
116 continue;
119 if(param->dir_ == 1)
121 param->sock2_.sendTo(buf.getBuf(), buf.getLength(), session.getRemoteAddr2(), session.getRemotePort2());
123 else if(param->dir_ == 2)
125 param->sock1_.sendTo(buf.getBuf(), buf.getLength(), session.getRemoteAddr1(), session.getRemotePort1());
129 catch(std::exception &e)
131 cLog.msg(Log::PRIO_ERR) << "listener(" << param->call_id_ << "/" << param->dir_ << ") exiting because: " << e.what();
133 cLog.msg(Log::PRIO_ERR) << "listener(" << param->call_id_ << "/" << param->dir_ << ") exiting normally";
135 pthread_exit(NULL);
138 class ListenerData
140 public:
141 ListenerData(ListenerThreadParam lp1, ListenerThreadParam lp2) : params1_(lp1), params2_(lp2)
144 UDPSocket* sock1_;
145 UDPSocket* sock2_;
146 pthread_t threads1_;
147 pthread_t threads2_;
148 ListenerThreadParam params1_;
149 ListenerThreadParam params2_;
152 void* listenerManager(void* dont_use_me)
154 try
156 std::map<std::string, ListenerData> listenerMap;
157 while(1)
159 std::string call_id = gCallIdQueue.front(); // waits for semaphor and returns next call_id
160 gCallIdQueue.pop();
162 RtpSession& session = gRtpSessionTable.getSession(call_id);
163 if(!session.isComplete())
164 continue;
166 std::map<std::string, ListenerData>::iterator it;
167 it = listenerMap.find(call_id);
168 if(it == listenerMap.end()) // listener Threads not existing yet
170 cLog.msg(Log::PRIO_ERR) << "listenerManager: open UDP Socket: "
171 << session.getLocalAddr() << ":" << session.getLocalPort1() << " "
172 << session.getLocalAddr() << ":" << session.getLocalPort2();
174 UDPSocket* sock1 = new UDPSocket(session.getLocalAddr(), session.getLocalPort1());
175 UDPSocket* sock2 = new UDPSocket(session.getLocalAddr(), session.getLocalPort2());
177 ListenerData ld(ListenerThreadParam(*sock1, *sock2, call_id, 1),
178 ListenerThreadParam(*sock1, *sock2, call_id, 2));
179 ld.sock1_ = sock1;
180 ld.sock2_ = sock2;
181 pthread_create(&(ld.threads1_), NULL, listener, &(ld.params1_));
182 pthread_create(&(ld.threads2_), NULL, listener, &(ld.params2_));
184 std::pair<std::map<std::string, ListenerData>::iterator, bool> ret;
185 ret = listenerMap.insert(std::map<std::string, ListenerData>::value_type(call_id, ld));
186 it = ret.first;
187 continue;
189 // TODO: reinit if session is changed or cleanup if it is daed
192 catch(std::exception &e)
194 cLog.msg(Log::PRIO_ERR) << "listenerManager exiting because: " << e.what();
196 pthread_exit(NULL);
199 void chrootAndDrop(string const& chrootdir, string const& username)
201 if (getuid() != 0)
203 std::cerr << "this programm has to be run as root in order to run in a chroot" << std::endl;
204 exit(-1);
207 struct passwd *pw = getpwnam(username.c_str());
208 if(pw) {
209 if(chroot(chrootdir.c_str()))
211 std::cerr << "can't chroot to " << chrootdir << std::endl;
212 exit(-1);
214 std::cout << "we are in chroot jail (" << chrootdir << ") now" << std::endl;
215 chdir("/");
216 if (initgroups(pw->pw_name, pw->pw_gid) || setgid(pw->pw_gid) || setuid(pw->pw_uid))
218 std::cerr << "can't drop to user " << username << " " << pw->pw_uid << ":" << pw->pw_gid << std::endl;
219 exit(-1);
221 std::cout << "dropped user to " << username << " " << pw->pw_uid << ":" << pw->pw_gid << std::endl;
223 else
225 std::cerr << "unknown user " << username << std::endl;
226 exit(-1);
/**
 * Detaches the process from the terminal and continues in the background.
 *
 * Forks twice with a setsid() in between; each parent exits with status 0.
 * The remaining process closes all file descriptors, opens /dev/null and
 * duplicates it twice, and sets the umask to 027.
 *
 * If one of the fork() calls fails, a message is printed to std::cerr and
 * the process exits with -1 instead of silently exiting with status 0.
 */
void daemonize()
{
  pid_t pid;

  pid = fork();
  if(pid < 0)
  {
    std::cerr << "daemonizing failed at fork()" << std::endl;
    exit(-1);
  }
  if(pid) exit(0);

  setsid();

  pid = fork();
  if(pid < 0)
  {
    std::cerr << "daemonizing failed at fork()" << std::endl;
    exit(-1);
  }
  if(pid) exit(0);

  std::cout << "running in background now..." << std::endl;

  int fd;
  for (fd=getdtablesize();fd>=0;--fd) // close all file descriptors
    close(fd);

  // With every descriptor closed, the open() below gets the lowest free
  // descriptor and the two dup() calls the next two ones.
  // NOTE(review): the results are not checked because nothing could be
  // reported here anymore; if /dev/null can't be opened the process keeps
  // running without stdin/stdout/stderr - confirm that this is intended.
  fd=open("/dev/null",O_RDWR);        // stdin
  dup(fd);                            // stdout
  dup(fd);                            // stderr
  umask(027);
}
251 void* syncConnector(void* p )
253 ThreadParam* param = reinterpret_cast<ThreadParam*>(p);
255 SocketHandler h;
256 ConnectionList cl;
257 SyncClientSocket sock(h,cl);
258 sock.Open( param->connto.host, param->connto.port);
259 h.Add(&sock);
260 while (h.GetCount())
262 h.Select();
264 pthread_exit(NULL);
267 void* syncListener(void* p )
269 ThreadParam* param = reinterpret_cast<ThreadParam*>(p);
270 ConnectionList cl;
272 SyncSocketHandler h(param->queue);
273 SyncListenSocket<SyncSocket,ConnectionList> l(h,cl);
275 if (l.Bind(gOpt.getLocalSyncPort()))
276 pthread_exit(NULL);
278 Utility::ResolveLocal(); // resolve local hostname
279 h.Add(&l);
280 h.Select(1,0);
281 while (1) {
282 h.Select(1,0);
286 int main(int argc, char* argv[])
288 std::cout << "anyrtpproxy" << std::endl;
289 if(!gOpt.parse(argc, argv))
291 gOpt.printUsage();
292 exit(-1);
295 if(gOpt.getChroot())
296 chrootAndDrop(gOpt.getChrootDir(), gOpt.getUsername());
297 if(gOpt.getDaemonize())
298 daemonize();
300 cLog.setLogName("anyrtpproxy");
301 cLog.msg(Log::PRIO_NOTICE) << "anyrtpproxy started...";
303 SignalController sig;
304 sig.init();
306 pthread_t listenerManagerThread;
307 pthread_create(&listenerManagerThread, NULL, listenerManager, NULL);
308 pthread_detach(listenerManagerThread);
310 pthread_t syncListenerThread;
312 SyncQueue queue;
313 ConnectToList connect_to = gOpt.getConnectTo();
314 ThreadParam p( queue,*(new OptionConnectTo()));
315 if ( gOpt.getLocalSyncPort())
316 pthread_create(&syncListenerThread, NULL, syncListener, &p);
318 std::list<pthread_t> connectThreads;
319 for(ConnectToList::iterator it = connect_to.begin() ;it != connect_to.end(); ++it)
321 connectThreads.push_back(pthread_t());
322 ThreadParam * point = new ThreadParam(queue,*it);
323 pthread_create(& connectThreads.back(), NULL, syncConnector, point);
326 PortWindow port_window(gOpt.getRtpStartPort(),gOpt.getRtpEndPort());
327 CommandHandler cmd(queue, gOpt.getControlInterface().addr_, gOpt.getControlInterface().port_,port_window);
329 int ret = sig.run();
330 return ret;