3 Copyright (C) 2013 celeron55, Perttu Ahola <celeron55@gmail.com>
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU Lesser General Public License as published by
7 the Free Software Foundation; either version 2.1 of the License, or
8 (at your option) any later version.
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU Lesser General Public License for more details.
15 You should have received a copy of the GNU Lesser General Public License along
16 with this program; if not, write to the Free Software Foundation, Inc.,
17 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20 #include "httpfetch.h"
21 #include "porting.h" // for sleep_ms(), get_sysinfo(), secure_rand_fill_buf()
28 #include "network/socket.h" // for select()
29 #include "threading/event.h"
31 #include "exceptions.h"
34 #include "util/container.h"
35 #include "util/thread.h"
40 std::mutex g_httpfetch_mutex
;
41 std::map
<unsigned long, std::queue
<HTTPFetchResult
> > g_httpfetch_results
;
42 PcgRandom g_callerid_randomness
;
44 HTTPFetchRequest::HTTPFetchRequest() :
45 timeout(g_settings
->getS32("curl_timeout")),
46 connect_timeout(timeout
),
47 useragent(std::string(PROJECT_NAME_C
"/") + g_version_hash
+ " (" + porting::get_sysinfo() + ")")
52 static void httpfetch_deliver_result(const HTTPFetchResult
&fetch_result
)
54 unsigned long caller
= fetch_result
.caller
;
55 if (caller
!= HTTPFETCH_DISCARD
) {
56 MutexAutoLock
lock(g_httpfetch_mutex
);
57 g_httpfetch_results
[caller
].push(fetch_result
);
// Aborts pending work for a caller; defined further below, once for the
// cURL backend and once for the dummy (USE_CURL=0) backend.
static void httpfetch_request_clear(unsigned long caller);
63 unsigned long httpfetch_caller_alloc()
65 MutexAutoLock
lock(g_httpfetch_mutex
);
67 // Check each caller ID except HTTPFETCH_DISCARD
68 const unsigned long discard
= HTTPFETCH_DISCARD
;
69 for (unsigned long caller
= discard
+ 1; caller
!= discard
; ++caller
) {
70 std::map
<unsigned long, std::queue
<HTTPFetchResult
> >::iterator
71 it
= g_httpfetch_results
.find(caller
);
72 if (it
== g_httpfetch_results
.end()) {
73 verbosestream
<< "httpfetch_caller_alloc: allocating "
74 << caller
<< std::endl
;
75 // Access element to create it
76 g_httpfetch_results
[caller
];
81 FATAL_ERROR("httpfetch_caller_alloc: ran out of caller IDs");
85 unsigned long httpfetch_caller_alloc_secure()
87 MutexAutoLock
lock(g_httpfetch_mutex
);
89 // Generate random caller IDs and make sure they're not
90 // already used or equal to HTTPFETCH_DISCARD
91 // Give up after 100 tries to prevent infinite loop
96 caller
= (((u64
) g_callerid_randomness
.next()) << 32) |
97 g_callerid_randomness
.next();
100 FATAL_ERROR("httpfetch_caller_alloc_secure: ran out of caller IDs");
101 return HTTPFETCH_DISCARD
;
103 } while (g_httpfetch_results
.find(caller
) != g_httpfetch_results
.end());
105 verbosestream
<< "httpfetch_caller_alloc_secure: allocating "
106 << caller
<< std::endl
;
108 // Access element to create it
109 g_httpfetch_results
[caller
];
113 void httpfetch_caller_free(unsigned long caller
)
115 verbosestream
<<"httpfetch_caller_free: freeing "
118 httpfetch_request_clear(caller
);
119 if (caller
!= HTTPFETCH_DISCARD
) {
120 MutexAutoLock
lock(g_httpfetch_mutex
);
121 g_httpfetch_results
.erase(caller
);
125 bool httpfetch_async_get(unsigned long caller
, HTTPFetchResult
&fetch_result
)
127 MutexAutoLock
lock(g_httpfetch_mutex
);
129 // Check that caller exists
130 std::map
<unsigned long, std::queue
<HTTPFetchResult
> >::iterator
131 it
= g_httpfetch_results
.find(caller
);
132 if (it
== g_httpfetch_results
.end())
135 // Check that result queue is nonempty
136 std::queue
<HTTPFetchResult
> &caller_results
= it
->second
;
137 if (caller_results
.empty())
141 fetch_result
= caller_results
.front();
142 caller_results
.pop();
147 #include <curl/curl.h>
150 USE_CURL is on: use cURL based httpfetch implementation
/*
	cURL write callback: appends the received chunk to the std::ostringstream
	passed as userdata and reports the whole chunk as consumed.
*/
static size_t httpfetch_writefunction(
		char *ptr, size_t size, size_t nmemb, void *userdata)
{
	const size_t total = size * nmemb;
	static_cast<std::ostringstream *>(userdata)->write(ptr, total);
	return total;
}
/*
	cURL write callback that throws the data away. It still reports every
	byte as handled, because any other return value makes cURL abort.
*/
static size_t httpfetch_discardfunction(
		char *ptr, size_t size, size_t nmemb, void *userdata)
{
	(void)ptr;
	(void)userdata;
	return nmemb * size;
}
170 std::list
<CURL
*> handles
;
173 CurlHandlePool() = default;
177 for (std::list
<CURL
*>::iterator it
= handles
.begin();
178 it
!= handles
.end(); ++it
) {
179 curl_easy_cleanup(*it
);
185 if (handles
.empty()) {
186 curl
= curl_easy_init();
188 errorstream
<<"curl_easy_init returned NULL"<<std::endl
;
192 curl
= handles
.front();
197 void free(CURL
*handle
)
200 handles
.push_back(handle
);
204 class HTTPFetchOngoing
207 HTTPFetchOngoing(const HTTPFetchRequest
&request
, CurlHandlePool
*pool
);
210 CURLcode
start(CURLM
*multi
);
211 const HTTPFetchResult
* complete(CURLcode res
);
213 const HTTPFetchRequest
&getRequest() const { return request
; };
214 const CURL
*getEasyHandle() const { return curl
; };
217 CurlHandlePool
*pool
;
220 HTTPFetchRequest request
;
221 HTTPFetchResult result
;
222 std::ostringstream oss
;
223 struct curl_slist
*http_header
;
228 HTTPFetchOngoing::HTTPFetchOngoing(const HTTPFetchRequest
&request_
,
229 CurlHandlePool
*pool_
):
235 oss(std::ios::binary
),
239 curl
= pool
->alloc();
244 // Set static cURL options
245 curl_easy_setopt(curl
, CURLOPT_NOSIGNAL
, 1);
246 curl_easy_setopt(curl
, CURLOPT_FAILONERROR
, 1);
247 curl_easy_setopt(curl
, CURLOPT_FOLLOWLOCATION
, 1);
248 curl_easy_setopt(curl
, CURLOPT_MAXREDIRS
, 3);
249 curl_easy_setopt(curl
, CURLOPT_ENCODING
, "gzip");
251 std::string bind_address
= g_settings
->get("bind_address");
252 if (!bind_address
.empty()) {
253 curl_easy_setopt(curl
, CURLOPT_INTERFACE
, bind_address
.c_str());
256 if (!g_settings
->getBool("enable_ipv6")) {
257 curl_easy_setopt(curl
, CURLOPT_IPRESOLVE
, CURL_IPRESOLVE_V4
);
260 #if LIBCURL_VERSION_NUM >= 0x071304
261 // Restrict protocols so that curl vulnerabilities in
262 // other protocols don't affect us.
263 // These settings were introduced in curl 7.19.4.
269 curl_easy_setopt(curl
, CURLOPT_PROTOCOLS
, protocols
);
270 curl_easy_setopt(curl
, CURLOPT_REDIR_PROTOCOLS
, protocols
);
273 // Set cURL options based on HTTPFetchRequest
274 curl_easy_setopt(curl
, CURLOPT_URL
,
275 request
.url
.c_str());
276 curl_easy_setopt(curl
, CURLOPT_TIMEOUT_MS
,
278 curl_easy_setopt(curl
, CURLOPT_CONNECTTIMEOUT_MS
,
279 request
.connect_timeout
);
281 if (!request
.useragent
.empty())
282 curl_easy_setopt(curl
, CURLOPT_USERAGENT
, request
.useragent
.c_str());
284 // Set up a write callback that writes to the
285 // ostringstream ongoing->oss, unless the data
286 // is to be discarded
287 if (request
.caller
== HTTPFETCH_DISCARD
) {
288 curl_easy_setopt(curl
, CURLOPT_WRITEFUNCTION
,
289 httpfetch_discardfunction
);
290 curl_easy_setopt(curl
, CURLOPT_WRITEDATA
, NULL
);
292 curl_easy_setopt(curl
, CURLOPT_WRITEFUNCTION
,
293 httpfetch_writefunction
);
294 curl_easy_setopt(curl
, CURLOPT_WRITEDATA
, &oss
);
297 // Set data from fields or raw_data
298 if (request
.multipart
) {
299 curl_httppost
*last
= NULL
;
300 for (StringMap::iterator it
= request
.fields
.begin();
301 it
!= request
.fields
.end(); ++it
) {
302 curl_formadd(&post
, &last
,
303 CURLFORM_NAMELENGTH
, it
->first
.size(),
304 CURLFORM_PTRNAME
, it
->first
.c_str(),
305 CURLFORM_CONTENTSLENGTH
, it
->second
.size(),
306 CURLFORM_PTRCONTENTS
, it
->second
.c_str(),
309 curl_easy_setopt(curl
, CURLOPT_HTTPPOST
, post
);
310 // request.post_fields must now *never* be
311 // modified until CURLOPT_HTTPPOST is cleared
313 switch (request
.method
) {
315 curl_easy_setopt(curl
, CURLOPT_HTTPGET
, 1);
318 curl_easy_setopt(curl
, CURLOPT_POST
, 1);
321 curl_easy_setopt(curl
, CURLOPT_CUSTOMREQUEST
, "PUT");
324 curl_easy_setopt(curl
, CURLOPT_CUSTOMREQUEST
, "DELETE");
327 if (request
.method
!= HTTP_GET
) {
328 if (!request
.raw_data
.empty()) {
329 curl_easy_setopt(curl
, CURLOPT_POSTFIELDSIZE
,
330 request
.raw_data
.size());
331 curl_easy_setopt(curl
, CURLOPT_POSTFIELDS
,
332 request
.raw_data
.c_str());
333 } else if (!request
.fields
.empty()) {
335 for (auto &field
: request
.fields
) {
338 str
+= urlencode(field
.first
);
340 str
+= urlencode(field
.second
);
342 curl_easy_setopt(curl
, CURLOPT_POSTFIELDSIZE
,
344 curl_easy_setopt(curl
, CURLOPT_COPYPOSTFIELDS
,
349 // Set additional HTTP headers
350 for (const std::string
&extra_header
: request
.extra_headers
) {
351 http_header
= curl_slist_append(http_header
, extra_header
.c_str());
353 curl_easy_setopt(curl
, CURLOPT_HTTPHEADER
, http_header
);
355 if (!g_settings
->getBool("curl_verify_cert")) {
356 curl_easy_setopt(curl
, CURLOPT_SSL_VERIFYPEER
, false);
360 CURLcode
HTTPFetchOngoing::start(CURLM
*multi_
)
363 return CURLE_FAILED_INIT
;
366 // Easy interface (sync)
367 return curl_easy_perform(curl
);
370 // Multi interface (async)
371 CURLMcode mres
= curl_multi_add_handle(multi_
, curl
);
372 if (mres
!= CURLM_OK
) {
373 errorstream
<< "curl_multi_add_handle"
374 << " returned error code " << mres
376 return CURLE_FAILED_INIT
;
378 multi
= multi_
; // store for curl_multi_remove_handle
382 const HTTPFetchResult
* HTTPFetchOngoing::complete(CURLcode res
)
384 result
.succeeded
= (res
== CURLE_OK
);
385 result
.timeout
= (res
== CURLE_OPERATION_TIMEDOUT
);
386 result
.data
= oss
.str();
388 // Get HTTP/FTP response code
389 result
.response_code
= 0;
390 if (curl
&& (curl_easy_getinfo(curl
, CURLINFO_RESPONSE_CODE
,
391 &result
.response_code
) != CURLE_OK
)) {
392 // We failed to get a return code, make sure it is still 0
393 result
.response_code
= 0;
396 if (res
!= CURLE_OK
) {
397 errorstream
<< request
.url
<< " not found ("
398 << curl_easy_strerror(res
) << ")"
399 << " (response code " << result
.response_code
<< ")"
406 HTTPFetchOngoing::~HTTPFetchOngoing()
409 CURLMcode mres
= curl_multi_remove_handle(multi
, curl
);
410 if (mres
!= CURLM_OK
) {
411 errorstream
<< "curl_multi_remove_handle"
412 << " returned error code " << mres
417 // Set safe options for the reusable cURL handle
418 curl_easy_setopt(curl
, CURLOPT_WRITEFUNCTION
,
419 httpfetch_discardfunction
);
420 curl_easy_setopt(curl
, CURLOPT_WRITEDATA
, NULL
);
421 curl_easy_setopt(curl
, CURLOPT_POSTFIELDS
, NULL
);
423 curl_easy_setopt(curl
, CURLOPT_HTTPHEADER
, NULL
);
424 curl_slist_free_all(http_header
);
427 curl_easy_setopt(curl
, CURLOPT_HTTPPOST
, NULL
);
431 // Store the cURL handle for reuse
436 class CurlFetchThread
: public Thread
447 HTTPFetchRequest fetch_request
;
452 MutexedQueue
<Request
> m_requests
;
453 size_t m_parallel_limit
;
455 // Variables exclusively used within thread
456 std::vector
<HTTPFetchOngoing
*> m_all_ongoing
;
457 std::list
<HTTPFetchRequest
> m_queued_fetches
;
460 CurlFetchThread(int parallel_limit
) :
463 if (parallel_limit
>= 1)
464 m_parallel_limit
= parallel_limit
;
466 m_parallel_limit
= 1;
469 void requestFetch(const HTTPFetchRequest
&fetch_request
)
473 req
.fetch_request
= fetch_request
;
475 m_requests
.push_back(req
);
478 void requestClear(unsigned long caller
, Event
*event
)
482 req
.fetch_request
.caller
= caller
;
484 m_requests
.push_back(req
);
490 req
.type
= RT_WAKEUP
;
492 m_requests
.push_back(req
);
496 // Handle a request from some other thread
497 // E.g. new fetch; clear fetches for one caller; wake up
498 void processRequest(const Request
&req
)
500 if (req
.type
== RT_FETCH
) {
501 // New fetch, queue until there are less
502 // than m_parallel_limit ongoing fetches
503 m_queued_fetches
.push_back(req
.fetch_request
);
505 // see processQueued() for what happens next
508 else if (req
.type
== RT_CLEAR
) {
509 unsigned long caller
= req
.fetch_request
.caller
;
511 // Abort all ongoing fetches for the caller
512 for (std::vector
<HTTPFetchOngoing
*>::iterator
513 it
= m_all_ongoing
.begin();
514 it
!= m_all_ongoing
.end();) {
515 if ((*it
)->getRequest().caller
== caller
) {
517 it
= m_all_ongoing
.erase(it
);
523 // Also abort all queued fetches for the caller
524 for (std::list
<HTTPFetchRequest
>::iterator
525 it
= m_queued_fetches
.begin();
526 it
!= m_queued_fetches
.end();) {
527 if ((*it
).caller
== caller
)
528 it
= m_queued_fetches
.erase(it
);
533 else if (req
.type
== RT_WAKEUP
) {
534 // Wakeup: Nothing to do, thread is awake at this point
537 if (req
.event
!= NULL
)
541 // Start new ongoing fetches if m_parallel_limit allows
542 void processQueued(CurlHandlePool
*pool
)
544 while (m_all_ongoing
.size() < m_parallel_limit
&&
545 !m_queued_fetches
.empty()) {
546 HTTPFetchRequest request
= m_queued_fetches
.front();
547 m_queued_fetches
.pop_front();
549 // Create ongoing fetch data and make a cURL handle
550 // Set cURL options based on HTTPFetchRequest
551 HTTPFetchOngoing
*ongoing
=
552 new HTTPFetchOngoing(request
, pool
);
554 // Initiate the connection (curl_multi_add_handle)
555 CURLcode res
= ongoing
->start(m_multi
);
556 if (res
== CURLE_OK
) {
557 m_all_ongoing
.push_back(ongoing
);
560 httpfetch_deliver_result(*ongoing
->complete(res
));
566 // Process CURLMsg (indicates completion of a fetch)
567 void processCurlMessage(CURLMsg
*msg
)
569 // Determine which ongoing fetch the message pertains to
572 for (i
= 0; i
< m_all_ongoing
.size(); ++i
) {
573 if (m_all_ongoing
[i
]->getEasyHandle() == msg
->easy_handle
) {
578 if (msg
->msg
== CURLMSG_DONE
&& found
) {
579 // m_all_ongoing[i] succeeded or failed.
580 HTTPFetchOngoing
*ongoing
= m_all_ongoing
[i
];
581 httpfetch_deliver_result(*ongoing
->complete(msg
->data
.result
));
583 m_all_ongoing
.erase(m_all_ongoing
.begin() + i
);
587 // Wait for a request from another thread, or timeout elapses
588 void waitForRequest(long timeout
)
590 if (m_queued_fetches
.empty()) {
592 Request req
= m_requests
.pop_front(timeout
);
595 catch (ItemNotFoundException
&e
) {}
599 // Wait until some IO happens, or timeout elapses
600 void waitForIO(long timeout
)
606 long select_timeout
= -1;
607 struct timeval select_tv
;
610 FD_ZERO(&read_fd_set
);
611 FD_ZERO(&write_fd_set
);
612 FD_ZERO(&exc_fd_set
);
614 mres
= curl_multi_fdset(m_multi
, &read_fd_set
,
615 &write_fd_set
, &exc_fd_set
, &max_fd
);
616 if (mres
!= CURLM_OK
) {
617 errorstream
<<"curl_multi_fdset"
618 <<" returned error code "<<mres
623 mres
= curl_multi_timeout(m_multi
, &select_timeout
);
624 if (mres
!= CURLM_OK
) {
625 errorstream
<<"curl_multi_timeout"
626 <<" returned error code "<<mres
631 // Limit timeout so new requests get through
632 if (select_timeout
< 0 || select_timeout
> timeout
)
633 select_timeout
= timeout
;
635 if (select_timeout
> 0) {
636 // in Winsock it is forbidden to pass three empty
637 // fd_sets to select(), so in that case use sleep_ms
639 select_tv
.tv_sec
= select_timeout
/ 1000;
640 select_tv
.tv_usec
= (select_timeout
% 1000) * 1000;
641 int retval
= select(max_fd
+ 1, &read_fd_set
,
642 &write_fd_set
, &exc_fd_set
,
646 errorstream
<<"select returned error code "
647 <<WSAGetLastError()<<std::endl
;
649 errorstream
<<"select returned error code "
655 sleep_ms(select_timeout
);
664 m_multi
= curl_multi_init();
665 if (m_multi
== NULL
) {
666 errorstream
<<"curl_multi_init returned NULL\n";
670 FATAL_ERROR_IF(!m_all_ongoing
.empty(), "Expected empty");
672 while (!stopRequested()) {
673 BEGIN_DEBUG_EXCEPTION_HANDLER
676 Handle new async requests
679 while (!m_requests
.empty()) {
680 Request req
= m_requests
.pop_frontNoEx();
683 processQueued(&pool
);
686 Handle ongoing async requests
689 int still_ongoing
= 0;
690 while (curl_multi_perform(m_multi
, &still_ongoing
) ==
691 CURLM_CALL_MULTI_PERFORM
)
695 Handle completed async requests
697 if (still_ongoing
< (int) m_all_ongoing
.size()) {
700 msg
= curl_multi_info_read(m_multi
, &msgs_in_queue
);
701 while (msg
!= NULL
) {
702 processCurlMessage(msg
);
703 msg
= curl_multi_info_read(m_multi
, &msgs_in_queue
);
708 If there are ongoing requests, wait for data
709 (with a timeout of 100ms so that new requests
712 If no ongoing requests, wait for a new request.
713 (Possibly an empty request that signals
714 that the thread should be stopped.)
716 if (m_all_ongoing
.empty())
717 waitForRequest(100000000);
721 END_DEBUG_EXCEPTION_HANDLER
724 // Call curl_multi_remove_handle and cleanup easy handles
725 for (HTTPFetchOngoing
*i
: m_all_ongoing
) {
728 m_all_ongoing
.clear();
730 m_queued_fetches
.clear();
732 CURLMcode mres
= curl_multi_cleanup(m_multi
);
733 if (mres
!= CURLM_OK
) {
734 errorstream
<<"curl_multi_cleanup"
735 <<" returned error code "<<mres
743 CurlFetchThread
*g_httpfetch_thread
= NULL
;
745 void httpfetch_init(int parallel_limit
)
747 verbosestream
<<"httpfetch_init: parallel_limit="<<parallel_limit
750 CURLcode res
= curl_global_init(CURL_GLOBAL_DEFAULT
);
751 FATAL_ERROR_IF(res
!= CURLE_OK
, "CURL init failed");
753 g_httpfetch_thread
= new CurlFetchThread(parallel_limit
);
755 // Initialize g_callerid_randomness for httpfetch_caller_alloc_secure
757 porting::secure_rand_fill_buf(randbuf
, sizeof(u64
) * 2);
758 g_callerid_randomness
= PcgRandom(randbuf
[0], randbuf
[1]);
761 void httpfetch_cleanup()
763 verbosestream
<<"httpfetch_cleanup: cleaning up"<<std::endl
;
765 g_httpfetch_thread
->stop();
766 g_httpfetch_thread
->requestWakeUp();
767 g_httpfetch_thread
->wait();
768 delete g_httpfetch_thread
;
770 curl_global_cleanup();
773 void httpfetch_async(const HTTPFetchRequest
&fetch_request
)
775 g_httpfetch_thread
->requestFetch(fetch_request
);
776 if (!g_httpfetch_thread
->isRunning())
777 g_httpfetch_thread
->start();
780 static void httpfetch_request_clear(unsigned long caller
)
782 if (g_httpfetch_thread
->isRunning()) {
784 g_httpfetch_thread
->requestClear(caller
, &event
);
787 g_httpfetch_thread
->requestClear(caller
, NULL
);
791 void httpfetch_sync(const HTTPFetchRequest
&fetch_request
,
792 HTTPFetchResult
&fetch_result
)
794 // Create ongoing fetch data and make a cURL handle
795 // Set cURL options based on HTTPFetchRequest
797 HTTPFetchOngoing
ongoing(fetch_request
, &pool
);
798 // Do the fetch (curl_easy_perform)
799 CURLcode res
= ongoing
.start(NULL
);
800 // Update fetch result
801 fetch_result
= *ongoing
.complete(res
);
809 Dummy httpfetch implementation that always returns an error.
// USE_CURL=0: nothing to initialise.
void httpfetch_init(int parallel_limit)
{
	(void)parallel_limit;
}
// USE_CURL=0: nothing to tear down.
void httpfetch_cleanup()
{
}
820 void httpfetch_async(const HTTPFetchRequest
&fetch_request
)
822 errorstream
<< "httpfetch_async: unable to fetch " << fetch_request
.url
823 << " because USE_CURL=0" << std::endl
;
825 HTTPFetchResult
fetch_result(fetch_request
); // sets succeeded = false etc.
826 httpfetch_deliver_result(fetch_result
);
// USE_CURL=0: there is never anything pending to clear.
static void httpfetch_request_clear(unsigned long caller)
{
	(void)caller;
}
833 void httpfetch_sync(const HTTPFetchRequest
&fetch_request
,
834 HTTPFetchResult
&fetch_result
)
836 errorstream
<< "httpfetch_sync: unable to fetch " << fetch_request
.url
837 << " because USE_CURL=0" << std::endl
;
839 fetch_result
= HTTPFetchResult(fetch_request
); // sets succeeded = false etc.