From 355f39fc53a34ed76ea56edd947bad4ff0da98a4 Mon Sep 17 00:00:00 2001 From: equinox Date: Tue, 25 Nov 2008 16:58:46 +0000 Subject: [PATCH] anyrtpproxy ported to boost threads and boost asio still some work git-svn-id: https://anytun.org/svn/anytun@612 2edecd69-f0ce-4815-94af-351a89d40aaa --- src/anyrtpproxy/anyrtpproxy.cpp | 183 +++++++++++++++++++------------------ src/anyrtpproxy/commandHandler.cpp | 2 +- src/anyrtpproxy/options.cpp | 2 +- 3 files changed, 96 insertions(+), 91 deletions(-) diff --git a/src/anyrtpproxy/anyrtpproxy.cpp b/src/anyrtpproxy/anyrtpproxy.cpp index bbfd412..91cb0e9 100644 --- a/src/anyrtpproxy/anyrtpproxy.cpp +++ b/src/anyrtpproxy/anyrtpproxy.cpp @@ -31,6 +31,8 @@ #include +#include + #include #include #include @@ -59,55 +61,29 @@ #define MAX_PACKET_SIZE 1500 +typedef boost::asio::ip::udp rtp_proto; -class ThreadParam -{ -public: - ThreadParam(SyncQueue & queue_,OptionConnectTo & connto_) - : queue(queue_),connto(connto_) - {}; - SyncQueue & queue; - OptionConnectTo & connto; -}; - -class ListenerThreadParam +void listener(rtp_proto::socket* sock1, rtp_proto::socket* sock2, std::string call_id, int dir, SyncQueue* queue, bool* running) { -public: - ListenerThreadParam(UDPSocket& s1, UDPSocket& s2, std::string c, int d, SyncQueue& q) : sock1_(s1), sock2_(s2), call_id_(c), - dir_(d), running_(true), queue_(q) - {}; - - UDPSocket& sock1_; - UDPSocket& sock2_; - std::string call_id_; - int dir_; - bool running_; - SyncQueue& queue_; -}; + cLog.msg(Log::PRIO_NOTICE) << "listener(" << call_id << "/" << dir << ") started"; -void* listener(void* p) -{ - ListenerThreadParam* param = reinterpret_cast(p); - - cLog.msg(Log::PRIO_NOTICE) << "listener(" << param->call_id_ << "/" << param->dir_ << ") started"; - try { Buffer buf(u_int32_t(MAX_PACKET_SIZE)); - string remote_addr; - u_int16_t remote_port; + rtp_proto::endpoint remote_end; + while(1) { buf.setLength(MAX_PACKET_SIZE); u_int32_t len=0; - if(param->dir_ == 1) - len = param->sock1_.recvFromNonBlocking(buf.getBuf(), buf.getLength(), remote_addr, remote_port, 1000); - else if(param->dir_ == 2) - len = param->sock2_.recvFromNonBlocking(buf.getBuf(), buf.getLength(), remote_addr, remote_port, 1000); + if(dir == 1) + len = 0;//sock1->recvFromNonBlocking(buf.getBuf(), buf.getLength(), remote_end, 1000); + else if(dir == 2) + len = 0; //sock2->recvFromNonBlocking(buf.getBuf(), buf.getLength(), remote_end, 1000); else break; - RtpSession& session = gRtpSessionTable.getSession(param->call_id_); + RtpSession& session = gRtpSessionTable.getSession(call_id); if(session.isDead()) { - cLog.msg(Log::PRIO_NOTICE) << "listener(" << param->call_id_ << "/" << param->dir_ << ") session is dead, exiting"; + cLog.msg(Log::PRIO_NOTICE) << "listener(" << call_id << "/" << dir << ") session is dead, exiting"; break; } @@ -115,27 +91,23 @@ void* listener(void* p) continue; buf.setLength(len); - if((param->dir_ == 1 && (remote_port != session.getRemotePort1() || remote_addr != session.getRemoteAddr1())) || - (param->dir_ == 2 && (remote_port != session.getRemotePort2() || remote_addr != session.getRemoteAddr2()))) +// if((dir == 1 && remote_end != session.getRemoteEnd1()) || +// (dir == 2 && remote_end != session.getRemoteEnd2())) { if(gOpt.getNat() || - (!gOpt.getNoNatOnce() && ((param->dir_ == 1 && !session.getSeen1()) || - (param->dir_ == 2 && !session.getSeen2())))) + (!gOpt.getNoNatOnce() && ((dir == 1 && !session.getSeen1()) || + (dir == 2 && !session.getSeen2())))) { - cLog.msg(Log::PRIO_NOTICE) << "listener(" << param->call_id_ << "/" << param->dir_ << ") setting remote host to " - << remote_addr << ":" << remote_port; - if(param->dir_ == 1) { - session.setRemotePort1(remote_port); - session.setRemoteAddr1(remote_addr); - } - if(param->dir_ == 2) { - session.setRemotePort2(remote_port); - session.setRemoteAddr2(remote_addr); - } + cLog.msg(Log::PRIO_NOTICE) << "listener(" << call_id << "/" << dir << ") setting remote host to " + << remote_end; +// if(dir == 1) +// session.setRemoteEnd1(remote_end); +// if(dir == 2) +// session.setRemoteEnd2(remote_end); if(!gOpt.getNat()) { // with nat enabled sync is not needed - SyncCommand sc(param->call_id_); - param->queue_.push(sc); + SyncCommand sc(call_id); + queue->push(sc); } } else @@ -144,37 +116,50 @@ void* listener(void* p) session.setSeen1(); session.setSeen2(); - if(param->dir_ == 1) - param->sock2_.sendTo(buf.getBuf(), buf.getLength(), session.getRemoteAddr2(), session.getRemotePort2()); - else if(param->dir_ == 2) - param->sock1_.sendTo(buf.getBuf(), buf.getLength(), session.getRemoteAddr1(), session.getRemotePort1()); - else break; +// if(dir == 1) +// sock2->send_to(boost::asio::buffer(buf.getBuf(), buf.getLength()), session.getRemoteEnd2()); +// else if(dir == 2) +// sock1->send_to(boost::asio::buffer(buf.getBuf(), buf.getLength()), session.getRemoteEnd1()); +// else break; } } catch(std::exception &e) { - cLog.msg(Log::PRIO_ERR) << "listener(" << param->call_id_ << "/" << param->dir_ << ") exiting because: " << e.what(); + cLog.msg(Log::PRIO_ERR) << "listener(" << call_id << "/" << dir << ") exiting because: " << e.what(); } - param->running_ = false; - gCallIdQueue.push(param->call_id_); - pthread_exit(NULL); + *running = false; + gCallIdQueue.push(call_id); } class ListenerData { public: - ListenerData(ListenerThreadParam lp1, ListenerThreadParam lp2) : param1_(lp1), param2_(lp2) - {}; - - UDPSocket* sock1_; - UDPSocket* sock2_; - pthread_t thread1_; - pthread_t thread2_; - ListenerThreadParam param1_; - ListenerThreadParam param2_; + ListenerData() + { + ios1_ = new boost::asio::io_service(); + sock1_ = new rtp_proto::socket(*ios1_); + ios2_ = new boost::asio::io_service(); + sock2_ = new rtp_proto::socket(*ios2_); + } + ~ListenerData() + { + if(sock1_) delete sock1_; + if(ios1_) delete ios1_; + if(sock2_) delete sock2_; + if(ios2_) delete ios2_; + } + + boost::asio::io_service* ios1_; + boost::asio::io_service* ios2_; + rtp_proto::socket* sock1_; + rtp_proto::socket* sock2_; + boost::thread* thread1_; + boost::thread* thread2_; + bool running1_; + bool running2_; }; -void* listenerManager(void* p) +void listenerManager(void* p) { SyncQueue* queue_ = reinterpret_cast(p); @@ -193,29 +178,40 @@ void* listenerManager(void* p) std::map::iterator it; it = listenerMap.find(call_id); if(it == listenerMap.end()) // listener Threads not existing yet - { - UDPSocket* sock1 = new UDPSocket(session.getLocalAddr(), session.getLocalPort1()); - UDPSocket* sock2 = new UDPSocket(session.getLocalAddr(), session.getLocalPort2()); - - ListenerData* ld = new ListenerData(ListenerThreadParam(*sock1, *sock2, call_id, 1, *queue_), - ListenerThreadParam(*sock1, *sock2, call_id, 2, *queue_)); - ld->sock1_ = sock1; - ld->sock2_ = sock2; - pthread_create(&(ld->thread1_), NULL, listener, &(ld->param1_)); - pthread_create(&(ld->thread2_), NULL, listener, &(ld->param2_)); - + { + ListenerData* ld = new ListenerData(); + + rtp_proto::resolver resolver1(*(ld->ios1_)); + rtp_proto::resolver::query query1(session.getLocalAddr(), session.getLocalPort1()); + rtp_proto::endpoint e1 = *resolver1.resolve(query1); + ld->sock1_->open(e1.protocol()); + ld->sock1_->bind(e1); + + rtp_proto::resolver resolver2(*(ld->ios2_)); + rtp_proto::resolver::query query2(session.getLocalAddr(), session.getLocalPort2()); + rtp_proto::endpoint e2 = *resolver2.resolve(query2); + ld->sock2_->open(e2.protocol()); + ld->sock2_->bind(e2); + + ld->thread1_ = new boost::thread(boost::bind(listener, ld->sock1_, ld->sock2_, call_id, 1, queue_, &(ld->running1_))); + ld->thread2_ = new boost::thread(boost::bind(listener, ld->sock1_, ld->sock2_, call_id, 2, queue_, &(ld->running2_))); + std::pair::iterator, bool> ret; ret = listenerMap.insert(std::map::value_type(call_id, ld)); continue; } - if(!it->second->param1_.running_ && !it->second->param2_.running_) + if(!it->second->running1_ && !it->second->running2_) { cLog.msg(Log::PRIO_NOTICE) << "listenerManager both threads for '" << call_id << "' exited, cleaning up"; - pthread_join(it->second->thread1_, NULL); - pthread_join(it->second->thread2_, NULL); - delete it->second->sock1_; - delete it->second->sock2_; + if(it->second->thread1_) { + it->second->thread1_->join(); + delete it->second->thread1_; + } + if(it->second->thread2_) { + it->second->thread2_->join(); + delete it->second->thread2_; + } delete it->second; listenerMap.erase(it); gRtpSessionTable.delSession(call_id); @@ -230,7 +226,6 @@ void* listenerManager(void* p) } } cLog.msg(Log::PRIO_ERR) << "listenerManager exiting because of unknown reason"; - pthread_exit(NULL); } void chrootAndDrop(string const& chrootdir, string const& username) @@ -286,6 +281,16 @@ void daemonize() umask(027); } +class ThreadParam +{ +public: + ThreadParam(SyncQueue & queue_,OptionConnectTo & connto_) + : queue(queue_),connto(connto_) + {}; + SyncQueue & queue; + OptionConnectTo & connto; +}; + void syncConnector(void* p) { ThreadParam* param = reinterpret_cast(p); diff --git a/src/anyrtpproxy/commandHandler.cpp b/src/anyrtpproxy/commandHandler.cpp index 495f6e6..c95d338 100644 --- a/src/anyrtpproxy/commandHandler.cpp +++ b/src/anyrtpproxy/commandHandler.cpp @@ -94,7 +94,7 @@ void CommandHandler::run(void* s) self->control_sock_.send_to(boost::asio::buffer(ret.c_str(), ret.length()), remote_end); } } - catch(SocketException &e) + catch(std::exception& e) { self->running_ = false; } diff --git a/src/anyrtpproxy/options.cpp b/src/anyrtpproxy/options.cpp index e3f3850..fa0879e 100644 --- a/src/anyrtpproxy/options.cpp +++ b/src/anyrtpproxy/options.cpp @@ -191,7 +191,7 @@ bool Options::parse(int argc, char* argv[]) bool Options::sanityCheck() { - if(!control_interface_.port_) control_interface_.port_ = 22222; + if(control_interface_.port_ == "") control_interface_.port_ = "22222"; return true; } -- 2.11.4.GIT