1 // Copyright (c) 2009-2010 Satoshi Nakamoto
2 // Distributed under the MIT/X11 software license, see the accompanying
3 // file license.txt or http://www.opensource.org/licenses/mit-license.php.
7 void ThreadMessageHandler2(void* parg
);
8 void ThreadSocketHandler2(void* parg
);
9 void ThreadOpenConnections2(void* parg
);
10 bool OpenNetworkConnection(const CAddress
& addrConnect
);
17 // Global state variables
20 uint64 nLocalServices
= (fClient
? 0 : NODE_NETWORK
);
21 CAddress
addrLocalHost(0, DEFAULT_PORT
, nLocalServices
);
22 CNode
* pnodeLocalHost
= NULL
;
23 uint64 nLocalHostNonce
= 0;
24 array
<int, 10> vnThreadsRunning
;
25 SOCKET hListenSocket
= INVALID_SOCKET
;
26 int64 nThreadSocketHandlerHeartbeat
= INT64_MAX
;
28 vector
<CNode
*> vNodes
;
29 CCriticalSection cs_vNodes
;
30 map
<vector
<unsigned char>, CAddress
> mapAddresses
;
31 CCriticalSection cs_mapAddresses
;
32 map
<CInv
, CDataStream
> mapRelay
;
33 deque
<pair
<int64
, CInv
> > vRelayExpiration
;
34 CCriticalSection cs_mapRelay
;
35 map
<CInv
, int64
> mapAlreadyAskedFor
;
38 int fUseProxy
= false;
39 CAddress
addrProxy("127.0.0.1:9050");
45 void CNode::PushGetBlocks(CBlockIndex
* pindexBegin
, uint256 hashEnd
)
47 // Filter out duplicate requests
48 if (pindexBegin
== pindexLastGetBlocksBegin
&& hashEnd
== hashLastGetBlocksEnd
)
50 pindexLastGetBlocksBegin
= pindexBegin
;
51 hashLastGetBlocksEnd
= hashEnd
;
53 PushMessage("getblocks", CBlockLocator(pindexBegin
), hashEnd
);
60 bool ConnectSocket(const CAddress
& addrConnect
, SOCKET
& hSocketRet
)
62 hSocketRet
= INVALID_SOCKET
;
64 SOCKET hSocket
= socket(AF_INET
, SOCK_STREAM
, IPPROTO_TCP
);
65 if (hSocket
== INVALID_SOCKET
)
67 #if defined(__BSD__) || defined(__WXOSX__)
69 setsockopt(hSocket
, SOL_SOCKET
, SO_NOSIGPIPE
, (void*)&set
, sizeof(int));
72 bool fRoutable
= !(addrConnect
.GetByte(3) == 10 || (addrConnect
.GetByte(3) == 192 && addrConnect
.GetByte(2) == 168));
73 bool fProxy
= (fUseProxy
&& fRoutable
);
74 struct sockaddr_in sockaddr
= (fProxy
? addrProxy
.GetSockAddr() : addrConnect
.GetSockAddr());
76 if (connect(hSocket
, (struct sockaddr
*)&sockaddr
, sizeof(sockaddr
)) == SOCKET_ERROR
)
84 printf("proxy connecting %s\n", addrConnect
.ToStringLog().c_str());
85 char pszSocks4IP
[] = "\4\1\0\0\0\0\0\0user";
86 memcpy(pszSocks4IP
+ 2, &addrConnect
.port
, 2);
87 memcpy(pszSocks4IP
+ 4, &addrConnect
.ip
, 4);
88 char* pszSocks4
= pszSocks4IP
;
89 int nSize
= sizeof(pszSocks4IP
);
91 int ret
= send(hSocket
, pszSocks4
, nSize
, MSG_NOSIGNAL
);
95 return error("Error sending to proxy");
98 if (recv(hSocket
, pchRet
, 8, 0) != 8)
100 closesocket(hSocket
);
101 return error("Error reading proxy response");
103 if (pchRet
[1] != 0x5a)
105 closesocket(hSocket
);
106 if (pchRet
[1] != 0x5b)
107 printf("ERROR: Proxy returned error %d\n", pchRet
[1]);
110 printf("proxy connected %s\n", addrConnect
.ToStringLog().c_str());
113 hSocketRet
= hSocket
;
119 bool GetMyExternalIP2(const CAddress
& addrConnect
, const char* pszGet
, const char* pszKeyword
, unsigned int& ipRet
)
122 if (!ConnectSocket(addrConnect
, hSocket
))
123 return error("GetMyExternalIP() : connection to %s failed", addrConnect
.ToString().c_str());
125 send(hSocket
, pszGet
, strlen(pszGet
), MSG_NOSIGNAL
);
128 while (RecvLine(hSocket
, strLine
))
134 if (!RecvLine(hSocket
, strLine
))
136 closesocket(hSocket
);
139 if (strLine
.find(pszKeyword
) != -1)
141 strLine
= strLine
.substr(strLine
.find(pszKeyword
) + strlen(pszKeyword
));
145 closesocket(hSocket
);
146 if (strLine
.find("<"))
147 strLine
= strLine
.substr(0, strLine
.find("<"));
148 strLine
= strLine
.substr(strspn(strLine
.c_str(), " \t\n\r"));
149 while (strLine
.size() > 0 && isspace(strLine
[strLine
.size()-1]))
150 strLine
.resize(strLine
.size()-1);
151 CAddress
addr(strLine
.c_str());
152 printf("GetMyExternalIP() received [%s] %s\n", strLine
.c_str(), addr
.ToString().c_str());
153 if (addr
.ip
== 0 || addr
.ip
== INADDR_NONE
|| !addr
.IsRoutable())
159 closesocket(hSocket
);
160 return error("GetMyExternalIP() : connection closed");
164 bool GetMyExternalIP(unsigned int& ipRet
)
166 CAddress addrConnect
;
168 const char* pszKeyword
;
173 for (int nLookup
= 0; nLookup
<= 1; nLookup
++)
174 for (int nHost
= 1; nHost
<= 2; nHost
++)
178 addrConnect
= CAddress("70.86.96.218:80"); // www.ipaddressworld.com
182 struct hostent
* phostent
= gethostbyname("www.ipaddressworld.com");
183 if (phostent
&& phostent
->h_addr_list
&& phostent
->h_addr_list
[0])
184 addrConnect
= CAddress(*(u_long
*)phostent
->h_addr_list
[0], htons(80));
187 pszGet
= "GET /ip.php HTTP/1.1\r\n"
188 "Host: www.ipaddressworld.com\r\n"
189 "User-Agent: Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1)\r\n"
190 "Connection: close\r\n"
197 addrConnect
= CAddress("208.78.68.70:80"); // checkip.dyndns.org
201 struct hostent
* phostent
= gethostbyname("checkip.dyndns.org");
202 if (phostent
&& phostent
->h_addr_list
&& phostent
->h_addr_list
[0])
203 addrConnect
= CAddress(*(u_long
*)phostent
->h_addr_list
[0], htons(80));
206 pszGet
= "GET / HTTP/1.1\r\n"
207 "Host: checkip.dyndns.org\r\n"
208 "User-Agent: Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1)\r\n"
209 "Connection: close\r\n"
212 pszKeyword
= "Address:";
215 if (GetMyExternalIP2(addrConnect
, pszGet
, pszKeyword
, ipRet
))
226 bool AddAddress(CAddress addr
)
228 if (!addr
.IsRoutable())
230 if (addr
.ip
== addrLocalHost
.ip
)
232 CRITICAL_BLOCK(cs_mapAddresses
)
234 map
<vector
<unsigned char>, CAddress
>::iterator it
= mapAddresses
.find(addr
.GetKey());
235 if (it
== mapAddresses
.end())
238 printf("AddAddress(%s)\n", addr
.ToStringLog().c_str());
239 mapAddresses
.insert(make_pair(addr
.GetKey(), addr
));
240 CAddrDB().WriteAddress(addr
);
245 bool fUpdated
= false;
246 CAddress
& addrFound
= (*it
).second
;
247 if ((addrFound
.nServices
| addr
.nServices
) != addrFound
.nServices
)
249 // Services have been added
250 addrFound
.nServices
|= addr
.nServices
;
253 bool fCurrentlyOnline
= (GetAdjustedTime() - addr
.nTime
< 24 * 60 * 60);
254 int64 nUpdateInterval
= (fCurrentlyOnline
? 60 * 60 : 24 * 60 * 60);
255 if (addrFound
.nTime
< addr
.nTime
- nUpdateInterval
)
257 // Periodically update most recently seen time
258 addrFound
.nTime
= addr
.nTime
;
262 CAddrDB().WriteAddress(addrFound
);
268 void AddressCurrentlyConnected(const CAddress
& addr
)
270 CRITICAL_BLOCK(cs_mapAddresses
)
272 // Only if it's been published already
273 map
<vector
<unsigned char>, CAddress
>::iterator it
= mapAddresses
.find(addr
.GetKey());
274 if (it
!= mapAddresses
.end())
276 CAddress
& addrFound
= (*it
).second
;
277 int64 nUpdateInterval
= 20 * 60;
278 if (addrFound
.nTime
< GetAdjustedTime() - nUpdateInterval
)
280 // Periodically update most recently seen time
281 addrFound
.nTime
= GetAdjustedTime();
283 addrdb
.WriteAddress(addrFound
);
293 void AbandonRequests(void (*fn
)(void*, CDataStream
&), void* param1
)
295 // If the dialog might get closed before the reply comes back,
296 // call this in the destructor so it doesn't get called after it's deleted.
297 CRITICAL_BLOCK(cs_vNodes
)
299 foreach(CNode
* pnode
, vNodes
)
301 CRITICAL_BLOCK(pnode
->cs_mapRequests
)
303 for (map
<uint256
, CRequestTracker
>::iterator mi
= pnode
->mapRequests
.begin(); mi
!= pnode
->mapRequests
.end();)
305 CRequestTracker
& tracker
= (*mi
).second
;
306 if (tracker
.fn
== fn
&& tracker
.param1
== param1
)
307 pnode
->mapRequests
.erase(mi
++);
323 // Subscription methods for the broadcast and subscription system.
324 // Channel numbers are message numbers, i.e. MSG_TABLE and MSG_PRODUCT.
326 // The subscription system uses a meet-in-the-middle strategy.
327 // With 100,000 nodes, if senders broadcast to 1000 random nodes and receivers
328 // subscribe to 1000 random nodes, 99.995% (1 - 0.99^1000) of messages will get through.
331 bool AnySubscribed(unsigned int nChannel
)
333 if (pnodeLocalHost
->IsSubscribed(nChannel
))
335 CRITICAL_BLOCK(cs_vNodes
)
336 foreach(CNode
* pnode
, vNodes
)
337 if (pnode
->IsSubscribed(nChannel
))
342 bool CNode::IsSubscribed(unsigned int nChannel
)
344 if (nChannel
>= vfSubscribe
.size())
346 return vfSubscribe
[nChannel
];
349 void CNode::Subscribe(unsigned int nChannel
, unsigned int nHops
)
351 if (nChannel
>= vfSubscribe
.size())
354 if (!AnySubscribed(nChannel
))
357 CRITICAL_BLOCK(cs_vNodes
)
358 foreach(CNode
* pnode
, vNodes
)
360 pnode
->PushMessage("subscribe", nChannel
, nHops
);
363 vfSubscribe
[nChannel
] = true;
366 void CNode::CancelSubscribe(unsigned int nChannel
)
368 if (nChannel
>= vfSubscribe
.size())
371 // Prevent from relaying cancel if wasn't subscribed
372 if (!vfSubscribe
[nChannel
])
374 vfSubscribe
[nChannel
] = false;
376 if (!AnySubscribed(nChannel
))
378 // Relay subscription cancel
379 CRITICAL_BLOCK(cs_vNodes
)
380 foreach(CNode
* pnode
, vNodes
)
382 pnode
->PushMessage("sub-cancel", nChannel
);
394 CNode
* FindNode(unsigned int ip
)
396 CRITICAL_BLOCK(cs_vNodes
)
398 foreach(CNode
* pnode
, vNodes
)
399 if (pnode
->addr
.ip
== ip
)
405 CNode
* FindNode(CAddress addr
)
407 CRITICAL_BLOCK(cs_vNodes
)
409 foreach(CNode
* pnode
, vNodes
)
410 if (pnode
->addr
== addr
)
416 CNode
* ConnectNode(CAddress addrConnect
, int64 nTimeout
)
418 if (addrConnect
.ip
== addrLocalHost
.ip
)
421 // Look for an existing connection
422 CNode
* pnode
= FindNode(addrConnect
.ip
);
426 pnode
->AddRef(nTimeout
);
433 printf("trying connection %s lastseen=%.1fhrs lasttry=%.1fhrs\n",
434 addrConnect
.ToStringLog().c_str(),
435 (double)(addrConnect
.nTime
- GetAdjustedTime())/3600.0,
436 (double)(addrConnect
.nLastTry
- GetAdjustedTime())/3600.0);
438 CRITICAL_BLOCK(cs_mapAddresses
)
439 mapAddresses
[addrConnect
.GetKey()].nLastTry
= GetAdjustedTime();
443 if (ConnectSocket(addrConnect
, hSocket
))
446 printf("connected %s\n", addrConnect
.ToStringLog().c_str());
448 // Set to nonblocking
451 if (ioctlsocket(hSocket
, FIONBIO
, &nOne
) == SOCKET_ERROR
)
452 printf("ConnectSocket() : ioctlsocket nonblocking setting failed, error %d\n", WSAGetLastError());
454 if (fcntl(hSocket
, F_SETFL
, O_NONBLOCK
) == SOCKET_ERROR
)
455 printf("ConnectSocket() : fcntl nonblocking setting failed, error %d\n", errno
);
459 CNode
* pnode
= new CNode(hSocket
, addrConnect
, false);
461 pnode
->AddRef(nTimeout
);
464 CRITICAL_BLOCK(cs_vNodes
)
465 vNodes
.push_back(pnode
);
467 pnode
->nTimeConnected
= GetTime();
476 void CNode::CloseSocketDisconnect()
479 if (hSocket
!= INVALID_SOCKET
)
482 printf("%s ", DateTimeStrFormat("%x %H:%M:%S", GetTime()).c_str());
483 printf("disconnecting node %s\n", addr
.ToStringLog().c_str());
484 closesocket(hSocket
);
485 hSocket
= INVALID_SOCKET
;
489 void CNode::Cleanup()
491 // All of a nodes broadcasts and subscriptions are automatically torn down
492 // when it goes down, so a node has to stay up to keep its broadcast going.
494 // Cancel subscriptions
495 for (unsigned int nChannel
= 0; nChannel
< vfSubscribe
.size(); nChannel
++)
496 if (vfSubscribe
[nChannel
])
497 CancelSubscribe(nChannel
);
512 void ThreadSocketHandler(void* parg
)
514 IMPLEMENT_RANDOMIZE_STACK(ThreadSocketHandler(parg
));
517 vnThreadsRunning
[0]++;
518 ThreadSocketHandler2(parg
);
519 vnThreadsRunning
[0]--;
521 catch (std::exception
& e
) {
522 vnThreadsRunning
[0]--;
523 PrintException(&e
, "ThreadSocketHandler()");
525 vnThreadsRunning
[0]--;
526 throw; // support pthread_cancel()
528 printf("ThreadSocketHandler exiting\n");
531 void ThreadSocketHandler2(void* parg
)
533 printf("ThreadSocketHandler started\n");
534 list
<CNode
*> vNodesDisconnected
;
535 int nPrevNodeCount
= 0;
542 CRITICAL_BLOCK(cs_vNodes
)
544 // Disconnect unused nodes
545 vector
<CNode
*> vNodesCopy
= vNodes
;
546 foreach(CNode
* pnode
, vNodesCopy
)
548 if (pnode
->fDisconnect
||
549 (pnode
->GetRefCount() <= 0 && pnode
->vRecv
.empty() && pnode
->vSend
.empty()))
551 // remove from vNodes
552 vNodes
.erase(remove(vNodes
.begin(), vNodes
.end(), pnode
), vNodes
.end());
554 // close socket and cleanup
555 pnode
->CloseSocketDisconnect();
558 // hold in disconnected pool until all refs are released
559 pnode
->nReleaseTime
= max(pnode
->nReleaseTime
, GetTime() + 15 * 60);
560 if (pnode
->fNetworkNode
|| pnode
->fInbound
)
562 vNodesDisconnected
.push_back(pnode
);
566 // Delete disconnected nodes
567 list
<CNode
*> vNodesDisconnectedCopy
= vNodesDisconnected
;
568 foreach(CNode
* pnode
, vNodesDisconnectedCopy
)
570 // wait until threads are done using it
571 if (pnode
->GetRefCount() <= 0)
573 bool fDelete
= false;
574 TRY_CRITICAL_BLOCK(pnode
->cs_vSend
)
575 TRY_CRITICAL_BLOCK(pnode
->cs_vRecv
)
576 TRY_CRITICAL_BLOCK(pnode
->cs_mapRequests
)
577 TRY_CRITICAL_BLOCK(pnode
->cs_inventory
)
581 vNodesDisconnected
.remove(pnode
);
587 if (vNodes
.size() != nPrevNodeCount
)
589 nPrevNodeCount
= vNodes
.size();
595 // Find which sockets have data to receive
597 struct timeval timeout
;
599 timeout
.tv_usec
= 50000; // frequency to poll pnode->vSend
606 FD_ZERO(&fdsetError
);
607 SOCKET hSocketMax
= 0;
608 FD_SET(hListenSocket
, &fdsetRecv
);
609 hSocketMax
= max(hSocketMax
, hListenSocket
);
610 CRITICAL_BLOCK(cs_vNodes
)
612 foreach(CNode
* pnode
, vNodes
)
614 if (pnode
->hSocket
== INVALID_SOCKET
|| pnode
->hSocket
< 0)
616 FD_SET(pnode
->hSocket
, &fdsetRecv
);
617 FD_SET(pnode
->hSocket
, &fdsetError
);
618 hSocketMax
= max(hSocketMax
, pnode
->hSocket
);
619 TRY_CRITICAL_BLOCK(pnode
->cs_vSend
)
620 if (!pnode
->vSend
.empty())
621 FD_SET(pnode
->hSocket
, &fdsetSend
);
625 vnThreadsRunning
[0]--;
626 int nSelect
= select(hSocketMax
+ 1, &fdsetRecv
, &fdsetSend
, &fdsetError
, &timeout
);
627 vnThreadsRunning
[0]++;
630 if (nSelect
== SOCKET_ERROR
)
632 int nErr
= WSAGetLastError();
633 printf("socket select error %d\n", nErr
);
634 for (int i
= 0; i
<= hSocketMax
; i
++)
635 FD_SET(i
, &fdsetRecv
);
637 FD_ZERO(&fdsetError
);
638 Sleep(timeout
.tv_usec
/1000);
643 // Accept new connections
645 if (FD_ISSET(hListenSocket
, &fdsetRecv
))
647 struct sockaddr_in sockaddr
;
648 socklen_t len
= sizeof(sockaddr
);
649 SOCKET hSocket
= accept(hListenSocket
, (struct sockaddr
*)&sockaddr
, &len
);
650 CAddress
addr(sockaddr
);
651 if (hSocket
== INVALID_SOCKET
)
653 if (WSAGetLastError() != WSAEWOULDBLOCK
)
654 printf("socket error accept failed: %d\n", WSAGetLastError());
658 printf("accepted connection %s\n", addr
.ToStringLog().c_str());
659 CNode
* pnode
= new CNode(hSocket
, addr
, true);
661 CRITICAL_BLOCK(cs_vNodes
)
662 vNodes
.push_back(pnode
);
668 // Service each socket
670 vector
<CNode
*> vNodesCopy
;
671 CRITICAL_BLOCK(cs_vNodes
)
674 foreach(CNode
* pnode
, vNodesCopy
)
677 foreach(CNode
* pnode
, vNodesCopy
)
685 if (pnode
->hSocket
== INVALID_SOCKET
)
687 if (FD_ISSET(pnode
->hSocket
, &fdsetRecv
) || FD_ISSET(pnode
->hSocket
, &fdsetError
))
689 TRY_CRITICAL_BLOCK(pnode
->cs_vRecv
)
691 CDataStream
& vRecv
= pnode
->vRecv
;
692 unsigned int nPos
= vRecv
.size();
694 // typical socket buffer is 8K-64K
695 char pchBuf
[0x10000];
696 int nBytes
= recv(pnode
->hSocket
, pchBuf
, sizeof(pchBuf
), MSG_DONTWAIT
);
699 vRecv
.resize(nPos
+ nBytes
);
700 memcpy(&vRecv
[nPos
], pchBuf
, nBytes
);
701 pnode
->nLastRecv
= GetTime();
703 else if (nBytes
== 0)
705 // socket closed gracefully
706 if (!pnode
->fDisconnect
)
707 printf("socket closed\n");
708 pnode
->CloseSocketDisconnect();
713 int nErr
= WSAGetLastError();
714 if (nErr
!= WSAEWOULDBLOCK
&& nErr
!= WSAEMSGSIZE
&& nErr
!= WSAEINTR
&& nErr
!= WSAEINPROGRESS
)
716 if (!pnode
->fDisconnect
)
717 printf("socket recv error %d\n", nErr
);
718 pnode
->CloseSocketDisconnect();
727 if (pnode
->hSocket
== INVALID_SOCKET
)
729 if (FD_ISSET(pnode
->hSocket
, &fdsetSend
))
731 TRY_CRITICAL_BLOCK(pnode
->cs_vSend
)
733 CDataStream
& vSend
= pnode
->vSend
;
736 int nBytes
= send(pnode
->hSocket
, &vSend
[0], vSend
.size(), MSG_NOSIGNAL
| MSG_DONTWAIT
);
739 vSend
.erase(vSend
.begin(), vSend
.begin() + nBytes
);
740 pnode
->nLastSend
= GetTime();
745 int nErr
= WSAGetLastError();
746 if (nErr
!= WSAEWOULDBLOCK
&& nErr
!= WSAEMSGSIZE
&& nErr
!= WSAEINTR
&& nErr
!= WSAEINPROGRESS
)
748 printf("socket send error %d\n", nErr
);
749 pnode
->CloseSocketDisconnect();
757 // Inactivity checking
759 if (pnode
->vSend
.empty())
760 pnode
->nLastSendEmpty
= GetTime();
761 if (GetTime() - pnode
->nTimeConnected
> 60)
763 if (pnode
->nLastRecv
== 0 || pnode
->nLastSend
== 0)
765 printf("socket no message in first 60 seconds, %d %d\n", pnode
->nLastRecv
!= 0, pnode
->nLastSend
!= 0);
766 pnode
->fDisconnect
= true;
768 else if (GetTime() - pnode
->nLastSend
> 90*60 && GetTime() - pnode
->nLastSendEmpty
> 90*60)
770 printf("socket not sending\n");
771 pnode
->fDisconnect
= true;
773 else if (GetTime() - pnode
->nLastRecv
> 90*60)
775 printf("socket inactivity timeout\n");
776 pnode
->fDisconnect
= true;
780 CRITICAL_BLOCK(cs_vNodes
)
782 foreach(CNode
* pnode
, vNodesCopy
)
786 nThreadSocketHandlerHeartbeat
= GetTime();
803 unsigned int pnSeed
[] =
805 0x35218252, 0x9c9c9618, 0xda6bacad, 0xb9aca862, 0x97c235c6,
806 0x146f9562, 0xb67b9e4b, 0x87cf4bc0, 0xb83945d0, 0x984333ad,
807 0xbbeec555, 0x6f0eb440, 0xe0005318, 0x7797e460, 0xddc60fcc,
808 0xb3bbd24a, 0x1ac85746, 0x641846a6, 0x85ee1155, 0xbb2e7a4c,
809 0x9cb8514b, 0xfc342648, 0x62958fae, 0xd0a8c87a, 0xa800795b,
810 0xda8c814e, 0x256a0c80, 0x3f23ec63, 0xd565df43, 0x997d9044,
811 0xaa121448, 0xbed8688e, 0x59d09a5e, 0xb2931243, 0x3730ba18,
812 0xdd3462d0, 0x4e4d1448, 0x171df645, 0x84ee1155,
813 0x248ac445, 0x0e634444, 0x0ded1b63, 0x30c01e60,
814 0xa2b9a094, 0x29e4fd43, 0x9ce61b4c, 0xdae09744,
819 void ThreadOpenConnections(void* parg
)
821 IMPLEMENT_RANDOMIZE_STACK(ThreadOpenConnections(parg
));
824 vnThreadsRunning
[1]++;
825 ThreadOpenConnections2(parg
);
826 vnThreadsRunning
[1]--;
828 catch (std::exception
& e
) {
829 vnThreadsRunning
[1]--;
830 PrintException(&e
, "ThreadOpenConnections()");
832 vnThreadsRunning
[1]--;
833 PrintException(NULL
, "ThreadOpenConnections()");
835 printf("ThreadOpenConnections exiting\n");
838 void ThreadOpenConnections2(void* parg
)
840 printf("ThreadOpenConnections started\n");
842 // Connect to specific addresses
843 if (mapArgs
.count("-connect"))
845 for (int64 nLoop
= 0;; nLoop
++)
847 foreach(string strAddr
, mapMultiArgs
["-connect"])
849 CAddress
addr(strAddr
, NODE_NETWORK
);
851 OpenNetworkConnection(addr
);
852 for (int i
= 0; i
< 10 && i
< nLoop
; i
++)
862 // Connect to manually added nodes first
863 if (mapArgs
.count("-addnode"))
865 foreach(string strAddr
, mapMultiArgs
["-addnode"])
867 CAddress
addr(strAddr
, NODE_NETWORK
);
870 OpenNetworkConnection(addr
);
878 // Initiate network connections
879 int64 nStart
= GetTime();
883 vnThreadsRunning
[1]--;
885 const int nMaxConnections
= 8;
886 while (vNodes
.size() >= nMaxConnections
)
892 vnThreadsRunning
[1]++;
896 CRITICAL_BLOCK(cs_mapAddresses
)
898 // Add seed nodes if IRC isn't working
899 static bool fSeedUsed
;
900 bool fTOR
= (fUseProxy
&& addrProxy
.port
== htons(9050));
901 if (mapAddresses
.empty() && (GetTime() - nStart
> 60 || fTOR
))
903 for (int i
= 0; i
< ARRAYLEN(pnSeed
); i
++)
905 // It'll only connect to one or two seed nodes because once it connects,
906 // it'll get a pile of addresses with newer timestamps.
915 if (fSeedUsed
&& mapAddresses
.size() > ARRAYLEN(pnSeed
) + 100)
917 // Disconnect seed nodes
918 set
<unsigned int> setSeed(pnSeed
, pnSeed
+ ARRAYLEN(pnSeed
));
919 static int64 nSeedDisconnected
;
920 if (nSeedDisconnected
== 0)
922 nSeedDisconnected
= GetTime();
923 CRITICAL_BLOCK(cs_vNodes
)
924 foreach(CNode
* pnode
, vNodes
)
925 if (setSeed
.count(pnode
->addr
.ip
))
926 pnode
->fDisconnect
= true;
929 // Keep setting timestamps to 0 so they won't reconnect
930 if (GetTime() - nSeedDisconnected
< 60 * 60)
932 foreach(PAIRTYPE(const vector
<unsigned char>, CAddress
)& item
, mapAddresses
)
934 if (setSeed
.count(item
.second
.ip
))
936 item
.second
.nTime
= 0;
937 CAddrDB().WriteAddress(item
.second
);
946 // Choose an address to connect to based on most recently seen
948 CAddress addrConnect
;
949 int64 nBest
= INT64_MIN
;
951 // Do this here so we don't have to critsect vNodes inside mapAddresses critsect
952 set
<unsigned int> setConnected
;
953 CRITICAL_BLOCK(cs_vNodes
)
954 foreach(CNode
* pnode
, vNodes
)
955 setConnected
.insert(pnode
->addr
.ip
);
957 CRITICAL_BLOCK(cs_mapAddresses
)
959 foreach(const PAIRTYPE(vector
<unsigned char>, CAddress
)& item
, mapAddresses
)
961 const CAddress
& addr
= item
.second
;
962 if (!addr
.IsIPv4() || !addr
.IsValid() || setConnected
.count(addr
.ip
))
964 int64 nSinceLastSeen
= GetAdjustedTime() - addr
.nTime
;
965 int64 nSinceLastTry
= GetAdjustedTime() - addr
.nLastTry
;
967 // Randomize the order in a deterministic way, putting the standard port first
968 int64 nRandomizer
= (uint64
)(nStart
* 4951 + addr
.nLastTry
* 9567851 + addr
.ip
* 7789) % (2 * 60 * 60);
969 if (addr
.port
!= DEFAULT_PORT
)
970 nRandomizer
+= 2 * 60 * 60;
972 // Last seen Base retry frequency
982 int64 nDelay
= (int64
)(3600.0 * sqrt(fabs((double)nSinceLastSeen
) / 3600.0) + nRandomizer
);
984 // Fast reconnect for one hour after last seen
985 if (nSinceLastSeen
< 60 * 60)
988 // Limit retry frequency
989 if (nSinceLastTry
< nDelay
)
992 // If we have IRC, we'll be notified when they first come online,
993 // and again every 24 hours by the refresh broadcast.
994 if (nGotIRCAddresses
> 0 && vNodes
.size() >= 2 && nSinceLastSeen
> 24 * 60 * 60)
997 // Only try the old stuff if we don't have enough connections
998 if (vNodes
.size() >= 8 && nSinceLastSeen
> 24 * 60 * 60)
1001 // If multiple addresses are ready, prioritize by time since
1002 // last seen and time since last tried.
1003 int64 nScore
= min(nSinceLastTry
, (int64
)24 * 60 * 60) - nSinceLastSeen
- nRandomizer
;
1012 if (addrConnect
.IsValid())
1013 OpenNetworkConnection(addrConnect
);
1017 bool OpenNetworkConnection(const CAddress
& addrConnect
)
1020 // Initiate outbound network connection
1024 if (addrConnect
.ip
== addrLocalHost
.ip
|| !addrConnect
.IsIPv4() || FindNode(addrConnect
.ip
))
1027 vnThreadsRunning
[1]--;
1028 CNode
* pnode
= ConnectNode(addrConnect
);
1029 vnThreadsRunning
[1]++;
1034 pnode
->fNetworkNode
= true;
1036 if (addrLocalHost
.IsRoutable() && !fUseProxy
)
1038 // Advertise our address
1039 vector
<CAddress
> vAddr
;
1040 vAddr
.push_back(addrLocalHost
);
1041 pnode
->PushMessage("addr", vAddr
);
1044 // Get as many addresses as we can
1045 pnode
->PushMessage("getaddr");
1046 pnode
->fGetAddr
= true; // don't relay the results of the getaddr
1048 ////// should the one on the receiving end do this too?
1049 // Subscribe our local subscription list
1050 const unsigned int nHops
= 0;
1051 for (unsigned int nChannel
= 0; nChannel
< pnodeLocalHost
->vfSubscribe
.size(); nChannel
++)
1052 if (pnodeLocalHost
->vfSubscribe
[nChannel
])
1053 pnode
->PushMessage("subscribe", nChannel
, nHops
);
1065 void ThreadMessageHandler(void* parg
)
1067 IMPLEMENT_RANDOMIZE_STACK(ThreadMessageHandler(parg
));
1070 vnThreadsRunning
[2]++;
1071 ThreadMessageHandler2(parg
);
1072 vnThreadsRunning
[2]--;
1074 catch (std::exception
& e
) {
1075 vnThreadsRunning
[2]--;
1076 PrintException(&e
, "ThreadMessageHandler()");
1078 vnThreadsRunning
[2]--;
1079 PrintException(NULL
, "ThreadMessageHandler()");
1081 printf("ThreadMessageHandler exiting\n");
1084 void ThreadMessageHandler2(void* parg
)
1086 printf("ThreadMessageHandler started\n");
1087 SetThreadPriority(THREAD_PRIORITY_BELOW_NORMAL
);
1090 vector
<CNode
*> vNodesCopy
;
1091 CRITICAL_BLOCK(cs_vNodes
)
1093 vNodesCopy
= vNodes
;
1094 foreach(CNode
* pnode
, vNodesCopy
)
1098 // Poll the connected nodes for messages
1099 CNode
* pnodeTrickle
= NULL
;
1100 if (!vNodesCopy
.empty())
1101 pnodeTrickle
= vNodesCopy
[GetRand(vNodesCopy
.size())];
1102 foreach(CNode
* pnode
, vNodesCopy
)
1105 TRY_CRITICAL_BLOCK(pnode
->cs_vRecv
)
1106 ProcessMessages(pnode
);
1111 TRY_CRITICAL_BLOCK(pnode
->cs_vSend
)
1112 SendMessages(pnode
, pnode
== pnodeTrickle
);
1117 CRITICAL_BLOCK(cs_vNodes
)
1119 foreach(CNode
* pnode
, vNodesCopy
)
1123 // Wait and allow messages to bunch up
1124 vnThreadsRunning
[2]--;
1126 vnThreadsRunning
[2]++;
1140 bool BindListenPort(string
& strError
)
1146 // Initialize Windows Sockets
1148 int ret
= WSAStartup(MAKEWORD(2,2), &wsadata
);
1149 if (ret
!= NO_ERROR
)
1151 strError
= strprintf("Error: TCP/IP socket library failed to start (WSAStartup returned error %d)", ret
);
1152 printf("%s\n", strError
.c_str());
1157 // Create socket for listening for incoming connections
1158 hListenSocket
= socket(AF_INET
, SOCK_STREAM
, IPPROTO_TCP
);
1159 if (hListenSocket
== INVALID_SOCKET
)
1161 strError
= strprintf("Error: Couldn't open socket for incoming connections (socket returned error %d)", WSAGetLastError());
1162 printf("%s\n", strError
.c_str());
1166 #if defined(__BSD__) || defined(__WXOSX__)
1167 // Different way of disabling SIGPIPE on BSD
1168 setsockopt(hListenSocket
, SOL_SOCKET
, SO_NOSIGPIPE
, (void*)&nOne
, sizeof(int));
1172 // Allow binding if the port is still in TIME_WAIT state after
1173 // the program was closed and restarted. Not an issue on windows.
1174 setsockopt(hListenSocket
, SOL_SOCKET
, SO_REUSEADDR
, (void*)&nOne
, sizeof(int));
1178 // Set to nonblocking, incoming connections will also inherit this
1179 if (ioctlsocket(hListenSocket
, FIONBIO
, (u_long
*)&nOne
) == SOCKET_ERROR
)
1181 if (fcntl(hListenSocket
, F_SETFL
, O_NONBLOCK
) == SOCKET_ERROR
)
1184 strError
= strprintf("Error: Couldn't set properties on socket for incoming connections (error %d)", WSAGetLastError());
1185 printf("%s\n", strError
.c_str());
1189 // The sockaddr_in structure specifies the address family,
1190 // IP address, and port for the socket that is being bound
1191 struct sockaddr_in sockaddr
;
1192 memset(&sockaddr
, 0, sizeof(sockaddr
));
1193 sockaddr
.sin_family
= AF_INET
;
1194 sockaddr
.sin_addr
.s_addr
= INADDR_ANY
; // bind to all IPs on this computer
1195 sockaddr
.sin_port
= DEFAULT_PORT
;
1196 if (::bind(hListenSocket
, (struct sockaddr
*)&sockaddr
, sizeof(sockaddr
)) == SOCKET_ERROR
)
1198 int nErr
= WSAGetLastError();
1199 if (nErr
== WSAEADDRINUSE
)
1200 strError
= strprintf("Unable to bind to port %d on this computer. Bitcoin is probably already running.", ntohs(sockaddr
.sin_port
));
1202 strError
= strprintf("Error: Unable to bind to port %d on this computer (bind returned error %d)", ntohs(sockaddr
.sin_port
), nErr
);
1203 printf("%s\n", strError
.c_str());
1206 printf("Bound to port %d\n", ntohs(sockaddr
.sin_port
));
1208 // Listen for incoming connections
1209 if (listen(hListenSocket
, SOMAXCONN
) == SOCKET_ERROR
)
1211 strError
= strprintf("Error: Listening for incoming connections failed (listen returned error %d)", WSAGetLastError());
1212 printf("%s\n", strError
.c_str());
1219 void StartNode(void* parg
)
1221 if (pnodeLocalHost
== NULL
)
1222 pnodeLocalHost
= new CNode(INVALID_SOCKET
, CAddress("127.0.0.1", nLocalServices
));
1225 // Get local host ip
1226 char pszHostName
[1000] = "";
1227 if (gethostname(pszHostName
, sizeof(pszHostName
)) != SOCKET_ERROR
)
1229 struct hostent
* phostent
= gethostbyname(pszHostName
);
1232 // Take the first IP that isn't loopback 127.x.x.x
1233 for (int i
= 0; phostent
->h_addr_list
[i
] != NULL
; i
++)
1234 printf("host ip %d: %s\n", i
, CAddress(*(unsigned int*)phostent
->h_addr_list
[i
]).ToStringIP().c_str());
1235 for (int i
= 0; phostent
->h_addr_list
[i
] != NULL
; i
++)
1237 CAddress
addr(*(unsigned int*)phostent
->h_addr_list
[i
], DEFAULT_PORT
, nLocalServices
);
1238 if (addr
.IsValid() && addr
.GetByte(3) != 127)
1240 addrLocalHost
= addr
;
1247 // Get local host ip
1248 struct ifaddrs
* myaddrs
;
1249 if (getifaddrs(&myaddrs
) == 0)
1251 for (struct ifaddrs
* ifa
= myaddrs
; ifa
!= NULL
; ifa
= ifa
->ifa_next
)
1253 if (ifa
->ifa_addr
== NULL
) continue;
1254 if ((ifa
->ifa_flags
& IFF_UP
) == 0) continue;
1255 if (strcmp(ifa
->ifa_name
, "lo") == 0) continue;
1256 if (strcmp(ifa
->ifa_name
, "lo0") == 0) continue;
1258 if (ifa
->ifa_addr
->sa_family
== AF_INET
)
1260 struct sockaddr_in
* s4
= (struct sockaddr_in
*)(ifa
->ifa_addr
);
1261 if (inet_ntop(ifa
->ifa_addr
->sa_family
, (void*)&(s4
->sin_addr
), pszIP
, sizeof(pszIP
)) != NULL
)
1262 printf("ipv4 %s: %s\n", ifa
->ifa_name
, pszIP
);
1264 // Take the first IP that isn't loopback 127.x.x.x
1265 CAddress
addr(*(unsigned int*)&s4
->sin_addr
, DEFAULT_PORT
, nLocalServices
);
1266 if (addr
.IsValid() && addr
.GetByte(3) != 127)
1268 addrLocalHost
= addr
;
1272 else if (ifa
->ifa_addr
->sa_family
== AF_INET6
)
1274 struct sockaddr_in6
* s6
= (struct sockaddr_in6
*)(ifa
->ifa_addr
);
1275 if (inet_ntop(ifa
->ifa_addr
->sa_family
, (void*)&(s6
->sin6_addr
), pszIP
, sizeof(pszIP
)) != NULL
)
1276 printf("ipv6 %s: %s\n", ifa
->ifa_name
, pszIP
);
1279 freeifaddrs(myaddrs
);
1282 printf("addrLocalHost = %s\n", addrLocalHost
.ToString().c_str());
1284 // Get our external IP address for incoming connections
1287 // Proxies can't take incoming connections
1288 addrLocalHost
.ip
= CAddress("0.0.0.0").ip
;
1289 printf("addrLocalHost = %s\n", addrLocalHost
.ToString().c_str());
1293 if (addrIncoming
.IsValid())
1294 addrLocalHost
.ip
= addrIncoming
.ip
;
1296 if (GetMyExternalIP(addrLocalHost
.ip
))
1298 addrIncoming
= addrLocalHost
;
1299 CWalletDB().WriteSetting("addrIncoming", addrIncoming
);
1300 printf("addrLocalHost = %s\n", addrLocalHost
.ToString().c_str());
1308 // Get addresses from IRC and advertise ours
1309 if (!CreateThread(ThreadIRCSeed
, NULL
))
1310 printf("Error: CreateThread(ThreadIRCSeed) failed\n");
1312 // Send and receive from sockets, accept connections
1313 pthread_t hThreadSocketHandler
= CreateThread(ThreadSocketHandler
, NULL
, true);
1315 // Initiate outbound connections
1316 if (!CreateThread(ThreadOpenConnections
, NULL
))
1317 printf("Error: CreateThread(ThreadOpenConnections) failed\n");
1320 if (!CreateThread(ThreadMessageHandler
, NULL
))
1321 printf("Error: CreateThread(ThreadMessageHandler) failed\n");
1323 // Generate coins in the background
1324 GenerateBitcoins(fGenerateBitcoins
);
1327 // Thread monitoring
1328 // Not really needed anymore, the cause of the hanging was fixed
1335 if (GetTime() - nThreadSocketHandlerHeartbeat
> 15 * 60)
1337 // First see if closing sockets will free it
1338 printf("*** ThreadSocketHandler is stopped ***\n");
1339 CRITICAL_BLOCK(cs_vNodes
)
1341 foreach(CNode
* pnode
, vNodes
)
1344 TRY_CRITICAL_BLOCK(pnode
->cs_vRecv
)
1345 TRY_CRITICAL_BLOCK(pnode
->cs_vSend
)
1349 printf("*** closing socket\n");
1350 pnode
->CloseSocketDisconnect();
1357 if (GetTime() - nThreadSocketHandlerHeartbeat
< 60)
1360 // Hopefully it never comes to this.
1361 // We know it'll always be hung in the recv or send call.
1362 // cs_vRecv or cs_vSend may be left permanently unreleased,
1363 // but we always only use TRY_CRITICAL_SECTION on them.
1364 printf("*** Restarting ThreadSocketHandler ***\n");
1365 TerminateThread(hThreadSocketHandler
, 0);
1367 CloseHandle(hThreadSocketHandler
);
1369 vnThreadsRunning
[0] = 0;
1372 hThreadSocketHandler
= CreateThread(ThreadSocketHandler
, NULL
, true);
1373 nThreadSocketHandlerHeartbeat
= GetTime();
1380 printf("StopNode()\n");
1382 nTransactionsUpdated
++;
1383 int64 nStart
= GetTime();
1384 while (vnThreadsRunning
[0] > 0 || vnThreadsRunning
[2] > 0 || vnThreadsRunning
[3] > 0 || vnThreadsRunning
[4] > 0)
1386 if (GetTime() - nStart
> 20)
1390 if (vnThreadsRunning
[0] > 0) printf("ThreadSocketHandler still running\n");
1391 if (vnThreadsRunning
[1] > 0) printf("ThreadOpenConnections still running\n");
1392 if (vnThreadsRunning
[2] > 0) printf("ThreadMessageHandler still running\n");
1393 if (vnThreadsRunning
[3] > 0) printf("ThreadBitcoinMiner still running\n");
1394 if (vnThreadsRunning
[4] > 0) printf("ThreadRPCServer still running\n");
1395 while (vnThreadsRunning
[2] > 0 || vnThreadsRunning
[4] > 0)
1411 foreach(CNode
* pnode
, vNodes
)
1412 if (pnode
->hSocket
!= INVALID_SOCKET
)
1413 closesocket(pnode
->hSocket
);
1414 if (hListenSocket
!= INVALID_SOCKET
)
1415 if (closesocket(hListenSocket
) == SOCKET_ERROR
)
1416 printf("closesocket(hListenSocket) failed with error %d\n", WSAGetLastError());
1419 // Shutdown Windows Sockets
1424 instance_of_cnetcleanup
;