From 004407f97bf35bd4be05f7c98ed5802eae80518d Mon Sep 17 00:00:00 2001 From: equinox Date: Tue, 18 Mar 2008 18:45:46 +0000 Subject: [PATCH] anyrtpproxy cleanup works now git-svn-id: https://anytun.org/svn/anytun@436 2edecd69-f0ce-4815-94af-351a89d40aaa --- PracticalSocket.cpp | 42 +++++++++++++++++++++++++++ PracticalSocket.h | 25 ++++++++++++++++ anyrtpproxy/anyrtpproxy.cpp | 71 +++++++++++++++++++++++++++------------------ 3 files changed, 110 insertions(+), 28 deletions(-) diff --git a/PracticalSocket.cpp b/PracticalSocket.cpp index 5d39557..6f7b51c 100644 --- a/PracticalSocket.cpp +++ b/PracticalSocket.cpp @@ -62,6 +62,7 @@ #include // For inet_addr() #include // For close() #include // For sockaddr_in + #include typedef void raw_type; // Type used for raw data on this platform #endif @@ -250,6 +251,24 @@ int CommunicatingSocket::recv(void *buffer, int bufferLen) return rtn; } +int CommunicatingSocket::recvNonBlocking(void *buffer, int bufferLen, int timeOut) + throw(SocketException) +{ + struct pollfd pfd[1]; + pfd[0].fd = sockDesc; + pfd[0].events = POLLIN; + int rtn = poll(pfd,1,timeOut); + if(rtn > 0) { + if ((rtn = ::recv(sockDesc, (raw_type *) buffer, bufferLen, 0)) < 0) { + throw SocketException("non blocking receive failed", true); + } + if(!rtn) { + throw SocketException("connection closed by peer", false); + } + } + return rtn; +} + string CommunicatingSocket::getForeignAddress() throw(SocketException) { sockaddr_in addr; @@ -399,6 +418,29 @@ int UDPSocket::recvFrom(void *buffer, int bufferLen, string &sourceAddress, return rtn; } +int UDPSocket::recvFromNonBlocking(void *buffer, int bufferLen, string &sourceAddress, + unsigned short &sourcePort, int timeOut) throw(SocketException) { + sockaddr_in clntAddr; + socklen_t addrLen = sizeof(clntAddr); + struct pollfd pfd[1]; + pfd[0].fd = sockDesc; + pfd[0].events = POLLIN; + int rtn = poll(pfd,1,timeOut); + if(rtn > 0) { + if ((rtn = recvfrom(sockDesc, (raw_type *) buffer, bufferLen, 0, + (sockaddr *) &clntAddr, (socklen_t *) &addrLen)) < 0) { + throw SocketException("Receive failed (recvfrom())", true); + } + if(!rtn) { + throw SocketException("connection closed by peer", false); + } + } + sourceAddress = inet_ntoa(clntAddr.sin_addr); + sourcePort = ntohs(clntAddr.sin_port); + + return rtn; +} + void UDPSocket::setMulticastTTL(unsigned char multicastTTL) throw(SocketException) { if (setsockopt(sockDesc, IPPROTO_IP, IP_MULTICAST_TTL, (raw_type *) &multicastTTL, sizeof(multicastTTL)) < 0) { diff --git a/PracticalSocket.h b/PracticalSocket.h index 0192cfd..58c6424 100644 --- a/PracticalSocket.h +++ b/PracticalSocket.h @@ -202,6 +202,17 @@ public: int recv(void *buffer, int bufferLen) throw(SocketException); /** + * Read into the given buffer up to bufferLen bytes data from this + * socket. Call connect() before recvNonBlocking(). + * @param buffer buffer to receive the data + * @param bufferLen maximum number of bytes to read into buffer + * @param timeout timout in ms + * @return number of bytes read, 0 for timeout, and -1 for error + * @exception SocketException thrown if unable to receive data + */ + int recvNonBlocking(void *buffer, int bufferLen, int timeout) throw(SocketException); + + /** * Get the foreign address. Call connect() before calling recv() * @return foreign address * @exception SocketException thrown if unable to fetch foreign address @@ -348,6 +359,20 @@ public: unsigned short &sourcePort) throw(SocketException); /** + * Read read up to bufferLen bytes data from this socket. The given buffer + * is where the data will be placed + * @param buffer buffer to receive data + * @param bufferLen maximum number of bytes to receive + * @param sourceAddress address of datagram source + * @param sourcePort port of data source + * @param timeout int ms + * @return number of bytes received and -1 for error + * @exception SocketException thrown if unable to receive datagram + */ + int recvFromNonBlocking(void *buffer, int bufferLen, string &sourceAddress, + unsigned short &sourcePort, int timeout) throw(SocketException); + + /** * Set the multicast TTL * @param multicastTTL multicast TTL * @exception SocketException thrown if unable to set TTL diff --git a/anyrtpproxy/anyrtpproxy.cpp b/anyrtpproxy/anyrtpproxy.cpp index cf5b69d..a0f94fa 100644 --- a/anyrtpproxy/anyrtpproxy.cpp +++ b/anyrtpproxy/anyrtpproxy.cpp @@ -75,20 +75,22 @@ public: class ListenerThreadParam { public: - ListenerThreadParam(UDPSocket& s1, UDPSocket& s2, std::string c, int d) : sock1_(s1), sock2_(s2), call_id_(c), dir_(d) + ListenerThreadParam(UDPSocket& s1, UDPSocket& s2, std::string c, int d) : sock1_(s1), sock2_(s2), call_id_(c), + dir_(d), running_(true) {}; UDPSocket& sock1_; UDPSocket& sock2_; std::string call_id_; int dir_; + bool running_; }; void* listener(void* p) { ListenerThreadParam* param = reinterpret_cast(p); - cLog.msg(Log::PRIO_ERR) << "listener(" << param->call_id_ << "/" << param->dir_ << ") started"; + cLog.msg(Log::PRIO_NOTICE) << "listener(" << param->call_id_ << "/" << param->dir_ << ") started"; try { @@ -99,16 +101,21 @@ void* listener(void* p) buf.setLength(MAX_PACKET_SIZE); u_int32_t len=0; if(param->dir_ == 1) - len = param->sock1_.recvFrom(buf.getBuf(), buf.getLength(), remote_addr, remote_port); + len = param->sock1_.recvFromNonBlocking(buf.getBuf(), buf.getLength(), remote_addr, remote_port, 1000); else if(param->dir_ == 2) - len = param->sock2_.recvFrom(buf.getBuf(), buf.getLength(), remote_addr, remote_port); + len = param->sock2_.recvFromNonBlocking(buf.getBuf(), buf.getLength(), remote_addr, remote_port, 1000); else break; - buf.setLength(len); RtpSession& session = gRtpSessionTable.getSession(param->call_id_); - if(session.isDead()) + if(session.isDead()) { + cLog.msg(Log::PRIO_NOTICE) << "listener(" << param->call_id_ << "/" << param->dir_ << ") session is dead, exiting"; break; + } + if(!len) + continue; + buf.setLength(len); + if((param->dir_ == 1 && (remote_port != session.getRemotePort1() || remote_addr != session.getRemoteAddr1())) || (param->dir_ == 2 && (remote_port != session.getRemotePort2() || remote_addr != session.getRemoteAddr2()))) { @@ -130,30 +137,30 @@ void* listener(void* p) { cLog.msg(Log::PRIO_ERR) << "listener(" << param->call_id_ << "/" << param->dir_ << ") exiting because: " << e.what(); } - cLog.msg(Log::PRIO_ERR) << "listener(" << param->call_id_ << "/" << param->dir_ << ") exiting normally"; - + param->running_ = false; + gCallIdQueue.push(param->call_id_); pthread_exit(NULL); } class ListenerData { public: - ListenerData(ListenerThreadParam lp1, ListenerThreadParam lp2) : params1_(lp1), params2_(lp2) + ListenerData(ListenerThreadParam lp1, ListenerThreadParam lp2) : param1_(lp1), param2_(lp2) {}; UDPSocket* sock1_; UDPSocket* sock2_; - pthread_t threads1_; - pthread_t threads2_; - ListenerThreadParam params1_; - ListenerThreadParam params2_; + pthread_t thread1_; + pthread_t thread2_; + ListenerThreadParam param1_; + ListenerThreadParam param2_; }; void* listenerManager(void* dont_use_me) { try { - std::map listenerMap; + std::map listenerMap; while(1) { std::string call_id = gCallIdQueue.front(); // waits for semaphor and returns next call_id @@ -163,27 +170,35 @@ void* listenerManager(void* dont_use_me) if(!session.isComplete()) continue; - std::map::iterator it; + std::map::iterator it; it = listenerMap.find(call_id); if(it == listenerMap.end()) // listener Threads not existing yet { - cLog.msg(Log::PRIO_ERR) << "listenerManager: open UDP Socket: " - << session.getLocalAddr() << ":" << session.getLocalPort1() << " " - << session.getLocalAddr() << ":" << session.getLocalPort2(); - UDPSocket* sock1 = new UDPSocket(session.getLocalAddr(), session.getLocalPort1()); UDPSocket* sock2 = new UDPSocket(session.getLocalAddr(), session.getLocalPort2()); - ListenerData ld(ListenerThreadParam(*sock1, *sock2, call_id, 1), - ListenerThreadParam(*sock1, *sock2, call_id, 2)); - ld.sock1_ = sock1; - ld.sock2_ = sock2; - pthread_create(&(ld.threads1_), NULL, listener, &(ld.params1_)); - pthread_create(&(ld.threads2_), NULL, listener, &(ld.params2_)); + ListenerData* ld = new ListenerData(ListenerThreadParam(*sock1, *sock2, call_id, 1), + ListenerThreadParam(*sock1, *sock2, call_id, 2)); + ld->sock1_ = sock1; + ld->sock2_ = sock2; + pthread_create(&(ld->thread1_), NULL, listener, &(ld->param1_)); + pthread_create(&(ld->thread2_), NULL, listener, &(ld->param2_)); - std::pair::iterator, bool> ret; - ret = listenerMap.insert(std::map::value_type(call_id, ld)); - it = ret.first; + std::pair::iterator, bool> ret; + ret = listenerMap.insert(std::map::value_type(call_id, ld)); + continue; + } + + if(!it->second->param1_.running_ && !it->second->param2_.running_) + { + cLog.msg(Log::PRIO_NOTICE) << "listenerManager both threads for '" << call_id << "' exited, cleaning up"; + pthread_join(it->second->thread1_, NULL); + pthread_join(it->second->thread2_, NULL); + delete it->second->sock1_; + delete it->second->sock2_; + delete it->second; + listenerMap.erase(it); + gRtpSessionTable.delSession(call_id); continue; } // TODO: reinit if session is changed or cleanup if it is daed -- 2.11.4.GIT