This fixes a bug in PHP/HH's crypt_blowfish implementation that can cause a short...
[hiphop-php.git] / hphp / util / extern-worker.cpp
blob1db0a77a3efcd39baf69759d27973e1099078638
1 /*
2 +----------------------------------------------------------------------+
3 | HipHop for PHP |
4 +----------------------------------------------------------------------+
5 | Copyright (c) 2010-present Facebook, Inc. (http://www.facebook.com) |
6 +----------------------------------------------------------------------+
7 | This source file is subject to version 3.01 of the PHP license, |
8 | that is bundled with this package in the file LICENSE, and is |
9 | available through the world-wide-web at the following url: |
10 | http://www.php.net/license/3_01.txt |
11 | If you did not receive a copy of the PHP license and are unable to |
12 | obtain it through the world-wide-web, please send a note to |
13 | license@php.net so we can mail you a copy immediately. |
14 +----------------------------------------------------------------------+
17 #include "hphp/util/extern-worker.h"
19 #include "hphp/util/assertions.h"
20 #include "hphp/util/current-executable.h"
21 #include "hphp/util/hash-map.h"
22 #include "hphp/util/logger.h"
23 #include "hphp/util/match.h"
24 #include "hphp/util/struct-log.h"
25 #include "hphp/util/trace.h"
27 #include <folly/FileUtil.h>
28 #include <folly/Subprocess.h>
29 #include <folly/gen/Base.h>
30 #include <folly/memory/UninitializedMemoryHacks.h>
32 #include <boost/filesystem.hpp>
34 #include <filesystem>
35 #include <mutex>
37 namespace fs = std::filesystem;
39 namespace HPHP::extern_worker {
41 //////////////////////////////////////////////////////////////////////
43 using namespace detail;
44 using namespace folly::gen;
46 //////////////////////////////////////////////////////////////////////
48 const char* const s_option = "--extern-worker";
50 std::atomic<uint64_t> RequestId::s_next{0};
51 std::atomic<uint64_t> RequestId::s_active{0};
53 const std::array<OutputType, 1> Client::s_valOutputType{OutputType::Val};
54 const std::array<OutputType, 1> Client::s_vecOutputType{OutputType::Vec};
55 const std::array<OutputType, 1> Client::s_optOutputType{OutputType::Opt};
57 ImplHook g_impl_hook{nullptr};
59 thread_local bool g_in_job{false};
61 //////////////////////////////////////////////////////////////////////
63 namespace {
65 //////////////////////////////////////////////////////////////////////
67 TRACE_SET_MOD(extern_worker);
69 //////////////////////////////////////////////////////////////////////
// When this flag follows s_option on the command-line, the worker
// runs in "local" mode and uses a different mechanism to read its
// inputs and write its outputs.
const char* const g_local_option = "--local";

// In local mode, the FD of the pipe over which output is reported
// back to the parent.
constexpr int g_local_pipe_fd = 3;

// For the subprocess impl, data whose size is <= this constant is
// stored "inline" in the ref itself instead of being written to
// disk. This value probably needs more tuning.
constexpr size_t g_inline_size = 64;
85 //////////////////////////////////////////////////////////////////////
87 struct Registry {
88 hphp_fast_string_map<JobBase*> registry;
89 std::mutex lock;
92 Registry& registry() {
93 static Registry registry;
94 return registry;
97 //////////////////////////////////////////////////////////////////////
99 // Represents an open file. Used this over readFile/writeFile if you
100 // want to maintain a persistent FD, or want to just read/write just a
101 // portion of the file. Only reading or appending to the file is
102 // supported.
103 struct FD {
104 FD(const fs::path& path, bool read, bool write, bool create)
105 : m_path{path}, m_offset{0} {
106 assertx(IMPLIES(create, write));
108 auto flags = O_CLOEXEC;
109 if (read) {
110 flags |= (write ? O_RDWR : O_RDONLY);
111 } else {
112 assertx(write);
113 flags |= O_WRONLY;
116 if (write) flags |= O_APPEND;
117 if (create) {
118 flags |= O_CREAT | O_EXCL;
119 // We're creating it, so the file is empty
120 m_offset = 0;
123 auto fd = folly::openNoInt(m_path.c_str(), flags);
124 if (fd < 0) {
125 throw Error{
126 folly::sformat(
127 "Unable to open {} [{}]",
128 m_path.native(), folly::errnoStr(errno)
132 m_fd = fd;
133 SCOPE_FAIL { ::close(m_fd); };
135 // If we're going to write to the file, but not creating it, we
136 // don't know what the end of the file is, so find the current
137 // offset.
138 if (write && !create) syncOffset();
140 ~FD() { if (m_fd >= 0) ::close(m_fd); }
142 FD(const FD&) = delete;
143 FD(FD&& o) noexcept : m_fd{o.m_fd}, m_path{o.m_path}, m_offset{o.m_offset}
144 { o.m_fd = -1; }
145 FD& operator=(const FD&) = delete;
146 FD& operator=(FD&& o) noexcept {
147 std::swap(m_fd, o.m_fd);
148 std::swap(m_path, o.m_path);
149 std::swap(m_offset, o.m_offset);
150 return *this;
153 const fs::path& path() const { return m_path; }
155 std::string read(size_t offset, size_t size) const {
156 assertx(m_fd >= 0);
158 std::string data;
159 folly::resizeWithoutInitialization(data, size);
161 auto const read = folly::preadFull(m_fd, data.data(), size, offset);
162 if (read == size) return data;
163 if (read < 0) {
164 throw Error{
165 folly::sformat(
166 "Failed reading {} bytes from {} at {} [{}]",
167 size, m_path.native(), offset, folly::errnoStr(errno)
171 throw Error{
172 folly::sformat(
173 "Partial read from {} at {} (expected {}, actual {})",
174 m_path.native(), offset, size, read
179 size_t append(const std::string& data) {
180 assertx(m_fd >= 0);
182 auto const written = folly::writeFull(m_fd, data.data(), data.size());
183 if (written < 0) {
184 throw Error{
185 folly::sformat(
186 "Failed writing {} bytes to {} [{}]",
187 data.size(), m_path.native(), folly::errnoStr(errno)
191 if (written != data.size()) {
192 throw Error{
193 folly::sformat(
194 "Partial write to {} (expected {}, actual {})",
195 m_path.native(), data.size(), written
199 auto const prev = m_offset;
200 m_offset += written;
201 return prev;
204 // Update m_offset to the end of the file. This is only needed if
205 // you've opened an already created file, or if you know someone
206 // else has written to it.
207 void syncOffset() {
208 assertx(m_fd >= 0);
209 auto const size = ::lseek(m_fd, 0, SEEK_END);
210 if (size < 0) {
211 throw Error{
212 folly::sformat(
213 "Unable to seek to end of {}",
214 m_path.native()
218 m_offset = size;
221 private:
222 int m_fd;
223 fs::path m_path;
224 size_t m_offset;
227 //////////////////////////////////////////////////////////////////////
229 // Read from the FD (which is assumed to be a pipe) and append the
230 // contents to the given string. Return true if the pipe is now
231 // closed, or false otherwise. The false case can only happen if the
232 // FD is non-blocking.
233 bool readFromPipe(int fd, std::string& s) {
234 auto realEnd = s.size();
235 SCOPE_EXIT { s.resize(realEnd); };
236 while (true) {
237 assertx(realEnd <= s.size());
238 auto spaceLeft = s.size() - realEnd;
239 while (spaceLeft < 1024) {
240 folly::resizeWithoutInitialization(
241 s, std::max<size_t>(4096, s.size() * 2)
243 spaceLeft = s.size() - realEnd;
245 auto const read =
246 folly::readNoInt(fd, s.data() + realEnd, spaceLeft);
247 if (read < 0) {
248 if (errno == EAGAIN) return false;
249 throw Error{
250 folly::sformat(
251 "Failed reading from pipe {} [{}]",
253 folly::errnoStr(errno)
256 } else if (read == 0) {
257 return true;
259 realEnd += read;
263 // Read from the FD (which is assumed to be a pipe) until it is closed
264 // (returns EOF). Returns all data read. Only use this with a blocking
265 // FD, as it will spin otherwise.
266 std::string readFromPipe(int fd) {
267 std::string out;
268 while (!readFromPipe(fd, out)) {}
269 return out;
272 // Write all of the given data to the FD (which is assumed to be a
273 // pipe).
274 void writeToPipe(int fd, const char* data, size_t size) {
275 auto const written = folly::writeFull(fd, data, size);
276 if (written < 0) {
277 throw Error{
278 folly::sformat(
279 "Failed writing {} bytes to pipe {} [{}]",
280 size, fd, folly::errnoStr(errno)
284 if (written != size) {
285 throw Error{
286 folly::sformat(
287 "Partial write to pipe {} (expected {}, actual {})",
288 fd, size, written
294 //////////////////////////////////////////////////////////////////////
297 * "Serialized" API:
299 * For this mode a worker expects to be invoked with --local, followed
300 * by the name of the command, the root of the blob files, and the
301 * name of the blob file to write results to.
303 * This mode uses blob files written locally, and communicates its
304 * input/output via serialized RefIds via stdin and pipes. It's meant
305 * for SubprocessImpl and is more efficient than the "File" mode
306 * below.
308 * See SubprocessImpl for more description.
311 struct SerializedSource : public detail::ISource {
312 SerializedSource(fs::path root, std::string s)
313 : m_source{std::move(s)}
314 , m_decoder{m_source.data(), m_source.size()}
315 , m_currentInput{0}
316 , m_numInputs{0}
317 , m_root{std::move(root)}
319 ~SerializedSource() = default;
321 std::string blob() override {
322 return refToBlob(decodeRefId(m_decoder));
324 Optional<std::string> optBlob() override {
325 auto r = decodeOptRefId(m_decoder);
326 if (!r) return std::nullopt;
327 return refToBlob(*r);
330 BlobVec variadic() override {
331 return from(decodeRefIdVec(m_decoder))
332 | map([&] (const RefId& r) { return refToBlob(r); })
333 | as<std::vector>();
336 void initDone() override {
337 assertx(m_numInputs == 0);
338 assertx(m_currentInput == 0);
339 m_decoder(m_numInputs);
341 bool inputEnd() const override {
342 return m_currentInput >= m_numInputs;
344 void nextInput() override {
345 assertx(!inputEnd());
346 ++m_currentInput;
348 void finish() override { m_decoder.assertDone(); }
350 static RefId decodeRefId(BlobDecoder& d) {
351 // Optimize RefId representation for space. If m_size is <= the
352 // inline size, we know that m_extra is zero, and that m_id has
353 // the same length as m_size, so we do not need to encode it
354 // twice.
355 decltype(RefId::m_size) size;
356 d(size);
357 if (size <= g_inline_size) {
358 assertx(d.remaining() >= size);
359 std::string id{(const char*)d.data(), size};
360 d.advance(size);
361 return RefId{std::move(id), size, 0};
362 } else {
363 decltype(RefId::m_id) id;
364 decltype(RefId::m_extra) offset;
365 d(id);
366 d(offset);
367 return RefId{std::move(id), size, offset};
371 static Optional<RefId> decodeOptRefId(BlobDecoder& d) {
372 bool present;
373 d(present);
374 if (!present) return std::nullopt;
375 return decodeRefId(d);
378 static IdVec decodeRefIdVec(BlobDecoder& d) {
379 std::vector<RefId> out;
380 size_t size;
381 d(size);
382 out.reserve(size);
383 std::generate_n(
384 std::back_inserter(out),
385 size,
386 [&] { return decodeRefId(d); }
388 return out;
391 private:
392 std::string refToBlob(const RefId& r) {
393 if (r.m_size <= g_inline_size) {
394 assertx(r.m_id.size() == r.m_size);
395 assertx(!r.m_extra);
396 return r.m_id;
399 fs::path path{r.m_id};
400 if (path.is_absolute()) {
401 return FD{path, true, false, false}.read(r.m_extra, r.m_size);
404 auto it = m_fdCache.find(path.native());
405 if (it == m_fdCache.end()) {
406 auto [elem, emplaced] = m_fdCache.emplace(
407 path.native(),
408 FD{m_root / path, true, false, false}
410 assertx(emplaced);
411 it = elem;
413 return it->second.read(r.m_extra, r.m_size);
416 std::string m_source;
417 BlobDecoder m_decoder;
418 size_t m_currentInput;
419 size_t m_numInputs;
421 fs::path m_root;
422 hphp_fast_map<std::string, FD> m_fdCache;
425 struct SerializedSink : public detail::ISink {
426 explicit SerializedSink(fs::path outputFile)
427 : m_fd{outputFile, false, true, false} {}
429 void blob(const std::string& b) override {
430 encodeRefId(makeRefId(b), m_encoder);
432 void optBlob(const Optional<std::string>& b) override {
433 encodeOptRefId(b ? makeRefId(*b) : Optional<RefId>{}, m_encoder);
435 void variadic(const BlobVec& v) override {
436 auto const refs = from(v)
437 | map([&] (const std::string& b) { return makeRefId(b); })
438 | as<std::vector>();
439 encodeRefIdVec(refs, m_encoder);
441 void nextOutput() override {}
442 void startFini() override {}
444 void finish() override {
445 writeToPipe(
446 g_local_pipe_fd,
447 (const char*)m_encoder.data(),
448 m_encoder.size()
450 ::close(g_local_pipe_fd);
453 static void encodeRefId(const RefId& r, BlobEncoder& e) {
454 // Optimized RefId encoding. If it's an inline ref, we can encode
455 // it more optimally. See above in SerializedSource::decodeRefId.
456 e(r.m_size);
457 if (r.m_size <= g_inline_size) {
458 assertx(r.m_id.size() == r.m_size);
459 assertx(!r.m_extra);
460 e.writeRaw(r.m_id.data(), r.m_id.size());
461 } else {
462 e(r.m_id);
463 e(r.m_extra);
467 static void encodeOptRefId(const Optional<RefId>& r, BlobEncoder& e) {
468 if (r) {
469 e(true);
470 encodeRefId(*r, e);
471 } else {
472 e(false);
476 static void encodeRefIdVec(const IdVec& v, BlobEncoder& e) {
477 e((size_t)v.size());
478 for (auto const& r : v) encodeRefId(r, e);
481 private:
482 RefId makeRefId(const std::string& b) {
483 if (b.size() <= g_inline_size) return RefId{b, b.size(), 0};
484 auto const offset = m_fd.append(b);
485 return RefId{m_fd.path().filename(), b.size(), offset};
488 FD m_fd;
489 BlobEncoder m_encoder;
492 //////////////////////////////////////////////////////////////////////
495 * "File" API:
497 * For this mode a worker expects to be invoked with the name of the
498 * command, followed by 3 paths to directories. The directories
499 * represent the config inputs (for init()), the inputs (for multiple
500 * runs()), and the last is the output directory, which will be
501 * written to.
503 * All three directories use the same format for representing
504 * data:
506 * - The first level contains numbered directories (from 0 to N). Each
507 * numbered directory represents a set of inputs/outputs. For outputs,
508 * the number corresponds to the matching number of the inputs. The
509 * config directory does not have this level, as there's only ever one
510 * of them.
512 * - The next level contains numbered files or directories (from 0 to
513 * N). Each file/directory specifies a particular input. The exact
514 * number depends on the job being executed (both sides need to
515 * agree). The format of the file or directory depends on that
516 * input/output's type.
518 * - If the input/output is a "normal" type, it will be a file
519 * containing the serialized data (using BlobEncoder) for that
520 * input/output. Strings are a special case. If the type is a
521 * std::string, it will not be serialized and the string will be
522 * stored directly (this makes it easier to represent files as their
523 * contents).
525 * - If the input/output is an Opt<T>, it will be represented like a
526 * "normal" type of type T, except it is not required to be
527 * present. If the file is not present (so a gap in the numbering), it
528 * is assumed to be std::nullopt.
530 * - If the input/output is a Variadic<T>, it will be a
531 * directory. Inside of that directory will be numbered files from
532 * 0-N, one for each element of the vector. The vector elements will
533 * be encoded as "normal" types.
535 * NB: Marker types cannot nest, so there aren't any possible deeper
536 * levels than this.
539 struct FileSource : public detail::ISource {
540 FileSource(fs::path config, fs::path input)
541 : m_configPath{std::move(config)}
542 , m_inputPath{std::move(input)}
543 , m_itemIdx{0}
544 , m_inputIdx{0}
545 , m_itemBase{m_configPath} {}
546 ~FileSource() = default;
548 std::string blob() override {
549 auto const filename = m_itemBase / folly::to<std::string>(m_itemIdx++);
550 return detail::readFile(filename);
552 Optional<std::string> optBlob() override {
553 auto const filename = m_itemBase / folly::to<std::string>(m_itemIdx++);
554 if (!fs::exists(filename)) return std::nullopt;
555 return detail::readFile(filename);
558 BlobVec variadic() override {
559 BlobVec out;
560 auto const vecBase = m_itemBase / folly::to<std::string>(m_itemIdx++);
561 for (size_t i = 0;; ++i) {
562 auto const valPath = vecBase / folly::to<std::string>(i);
563 // A break in the numbering means the end of the vector.
564 if (!fs::exists(valPath)) break;
565 out.emplace_back(detail::readFile(valPath));
567 return out;
570 void initDone() override {
571 assertx(m_itemBase == m_configPath);
572 assertx(m_inputIdx == 0);
573 m_itemIdx = 0;
574 m_itemBase = m_inputPath / "0";
577 bool inputEnd() const override {
578 assertx(m_itemBase == m_inputPath / folly::to<std::string>(m_inputIdx));
579 return !fs::exists(m_itemBase);
582 void nextInput() override {
583 assertx(m_itemBase == m_inputPath / folly::to<std::string>(m_inputIdx));
584 m_itemIdx = 0;
585 m_itemBase = m_inputPath / folly::to<std::string>(++m_inputIdx);
588 void finish() override {}
589 private:
590 fs::path m_configPath;
591 fs::path m_inputPath;
593 size_t m_itemIdx;
594 size_t m_inputIdx;
595 fs::path m_itemBase;
598 struct FileSink : public detail::ISink {
599 explicit FileSink(fs::path base)
600 : m_base{std::move(base)}
601 , m_itemIdx{0}
602 , m_outputIdx{0}
604 // We insist on a clean output directory.
605 if (!fs::create_directory(m_base)) {
606 throw Error{
607 folly::sformat("Output directory {} already exists", m_base.native())
612 void blob(const std::string& b) override { write(b); }
613 void optBlob(const Optional<std::string>& b) override {
614 if (!b) {
615 makeDir();
616 ++m_itemIdx;
617 } else {
618 write(*b);
621 void variadic(const BlobVec& v) override {
622 auto const dir = makeDir();
623 auto const vecDir = dir / folly::to<std::string>(m_itemIdx);
624 fs::create_directory(vecDir, dir);
625 for (size_t i = 0; i < v.size(); ++i) {
626 auto const& b = v[i];
627 writeFile(vecDir / folly::to<std::string>(i), b.data(), b.size());
629 ++m_itemIdx;
632 void nextOutput() override {
633 assertx(m_outputIdx.has_value());
634 ++*m_outputIdx;
635 m_itemIdx = 0;
637 void startFini() override {
638 assertx(m_outputIdx.has_value());
639 m_outputIdx.reset();
640 m_itemIdx = 0;
642 void finish() override {}
643 private:
644 fs::path currentDir() {
645 if (!m_outputIdx) return m_base / "fini";
646 return m_base / folly::to<std::string>(*m_outputIdx);
649 fs::path makeDir() {
650 auto const outputDir = currentDir();
651 if (!m_itemIdx) fs::create_directory(outputDir, m_base);
652 return outputDir;
655 void write(const std::string& b) {
656 auto const dir = makeDir();
657 writeFile(dir / folly::to<std::string>(m_itemIdx), b.data(), b.size());
658 ++m_itemIdx;
661 fs::path m_base;
662 size_t m_itemIdx;
663 Optional<size_t> m_outputIdx;
666 //////////////////////////////////////////////////////////////////////
// folly::writeFile wants something container-like. This wraps a
// ptr/length pair in just enough of a container interface, without
// copying the data (which is not owned).
struct Adaptor {
  size_t size() const { return m_size; }
  bool empty() const { return m_size == 0; }
  const char& operator[](size_t idx) const { return *(m_ptr + idx); }
  const char* m_ptr;
  size_t m_size;
};
678 //////////////////////////////////////////////////////////////////////
682 //////////////////////////////////////////////////////////////////////
684 namespace detail {
686 //////////////////////////////////////////////////////////////////////
688 // Wrappers around the folly functions with error handling
690 std::string readFile(const fs::path& path) {
691 std::string s;
692 if (!folly::readFile(path.c_str(), s)) {
693 throw Error{
694 folly::sformat(
695 "Unable to read input from {} [{}]",
696 path.c_str(), folly::errnoStr(errno)
700 return s;
703 void writeFile(const fs::path& path,
704 const char* ptr, size_t size) {
705 if (!folly::writeFile(Adaptor{ptr, size}, path.c_str())) {
706 throw Error{
707 folly::sformat(
708 "Unable to write output to {} [{}]",
709 path.c_str(), folly::errnoStr(errno)
715 //////////////////////////////////////////////////////////////////////
717 JobBase::JobBase(const std::string& name)
718 : m_name{name}
720 FTRACE(4, "registering remote worker command \"{}\"\n", m_name);
721 auto& r = registry();
722 std::lock_guard<std::mutex> _{r.lock};
723 auto const insert = r.registry.emplace(m_name, this);
724 always_assert(insert.second);
727 //////////////////////////////////////////////////////////////////////
731 //////////////////////////////////////////////////////////////////////
733 namespace {
735 // Parse the command-line and create the appropriate Source and Sink
736 // for input and output.
737 std::tuple<
738 std::unique_ptr<ISource>,
739 std::unique_ptr<ISink>,
740 std::string
742 parseOptions(int argc, char** argv) {
743 always_assert(argc > 1);
744 always_assert(!strcmp(argv[1], s_option));
746 if (argc >= 2 && !strcmp(argv[2], g_local_option)) {
747 if (argc != 6) {
748 std::cerr << "Usage: "
749 << argv[0]
750 << " " << s_option
751 << " " << g_local_option
752 << " <command name>"
753 << " <root>"
754 << " <output file>"
755 << std::endl;
756 return std::make_tuple(nullptr, nullptr, "");
759 std::string name{argv[3]};
760 fs::path root{argv[4]};
761 fs::path outputFile{argv[5]};
763 FTRACE(2, "extern worker run (local) (\"{}\", {}, {})\n",
764 name, root.native(), outputFile.native());
766 // Input comes from STDIN
767 auto source =
768 time("read-pipe", [] { return readFromPipe(STDIN_FILENO); });
769 return std::make_tuple(
770 std::make_unique<SerializedSource>(
771 std::move(root),
772 std::move(source)
774 std::make_unique<SerializedSink>(std::move(outputFile)),
775 std::move(name)
777 } else if (argc != 6) {
778 std::cerr << "Usage: "
779 << argv[0]
780 << " " << s_option
781 << " <command name>"
782 << " <config dir>"
783 << " <output dir>"
784 << " <input dir>"
785 << std::endl;
786 return std::make_tuple(nullptr, nullptr, "");
787 } else {
788 std::string name{argv[2]};
789 fs::path configPath{argv[3]};
790 fs::path outputPath{argv[4]};
791 fs::path inputPath{argv[5]};
793 FTRACE(2, "extern worker run(\"{}\", {}, {}, {})\n",
794 name, configPath.native(), outputPath.native(),
795 inputPath.native());
797 return std::make_tuple(
798 std::make_unique<FileSource>(
799 std::move(configPath),
800 std::move(inputPath)
802 std::make_unique<FileSink>(std::move(outputPath)),
803 std::move(name)
810 //////////////////////////////////////////////////////////////////////
812 int main(int argc, char** argv) {
813 Timer _{"main"};
815 try {
816 auto const [source, sink, name] = parseOptions(argc, argv);
817 if (!source) return EXIT_FAILURE;
819 // Lookup the registered job for the requested name.
820 auto const worker = [&, name = name] {
821 auto& r = registry();
822 std::lock_guard<std::mutex> _{r.lock};
823 auto const it = r.registry.find(name);
824 if (it == r.registry.end()) {
825 throw Error{folly::sformat("No command named `{}` registered", name)};
827 return it->second;
828 }();
830 g_in_job = true;
831 SCOPE_EXIT { g_in_job = false; };
833 // First do any global initialization.
834 time("init", [&, &source = source] { worker->init(*source); });
835 time("run-all", [&, &source = source, &sink = sink] {
836 // Then execute run() until we're out of inputs.
837 size_t run = 0;
838 while (!source->inputEnd()) {
839 time(
840 [&] { return folly::sformat("run {}", run); },
841 [&, &source = source, &sink = sink] { worker->run(*source, *sink); }
843 ++run;
845 source->finish();
847 // Do any cleanup and flush output to its final destination
848 time("fini", [&, &sink = sink] { worker->fini(*sink); });
849 time("flush", [&sink = sink] { sink->finish(); });
850 return 0;
851 } catch (const std::exception& exn) {
852 std::cerr << "Error: " << exn.what() << std::endl;
853 return EXIT_FAILURE;
857 //////////////////////////////////////////////////////////////////////
859 namespace {
861 //////////////////////////////////////////////////////////////////////
864 * The builtin implementation which uses fork+exec (and stores data on
865 * disk). It is always available, reasonably efficient, and assumed to
866 * be "reliable" (IE, never needs a fallback).
868 * All data is stored under the "root" (which is the working-dir
869 * specified in Options).
871 * Data is stored in blob files under the root, whose names are
872 * assigned numerically. The blob files are pooled. When a thread
873 * needs to write, it checks out a free file, append the data to it,
874 * then "returns" it back to the pool. The particular blob file data
875 * is stored to is therefore arbitrary. Files are not stored at all
876 * (since they're already on disk).
878 * The RefIds are just the path to the file, either a blob file, or
879 * some other file. If the data was stored in a blob file, the m_extra
880 * of the RefId will contain its offset in the blob file. For stored
881 * files, m_extra will always be zero.
883 * As an optimization, if the data is less or equal to g_inline_size,
884 * then its stored "inline" in the RefId. That is, m_id is the blob
885 * itself. Inline/non-inline blobs can be distinguished by the m_size
886 * field. This also applies to stored files (if sufficiently small,
887 * we'll read it and store it inline).
889 * Workers are forked, given their inputs, calculate outputs, then
890 * write their outputs and exit. Input is given to the worker via
891 * stdin (as a stream of serialized RefIds). As part of the
892 * command-line, the worker is given an output file to write its
893 * output to. It does (creating RefIds in the process), and reports
894 * the output RefIds via a pipe (g_local_pipe_fd).
896 struct SubprocessImpl : public Client::Impl {
897 SubprocessImpl(const Options&, Client&);
898 ~SubprocessImpl() override;
900 std::string session() const override { return m_fdManager->root().native(); }
902 bool isSubprocess() const override { return true; }
903 bool supportsOptimistic() const override { return false; }
904 bool isDisabled() const override { return false; }
906 coro::Task<BlobVec> load(const RequestId&, IdVec) override;
907 coro::Task<IdVec> store(const RequestId&, PathVec, BlobVec,
908 bool) override;
909 coro::Task<std::vector<RefValVec>>
910 exec(const RequestId&,
911 const std::string&,
912 RefValVec,
913 std::vector<RefValVec>,
914 const folly::Range<const OutputType*>&,
915 const folly::Range<const OutputType*>*) override;
917 private:
918 // Manage the pool of blob files
919 struct FDManager {
920 explicit FDManager(fs::path);
922 const fs::path& root() const { return m_root; }
924 // Acquire a FD to the given file to read from. If the path does
925 // not correspond to a blob file, nullptr is returned.
926 const FD* acquireForRead(const fs::path&);
927 // Acquire a FD to a blob file to append to. You don't have a say
928 // in which file you get. The FD must be returned via release()
929 // when done. You have exclusive access to this FD.
930 FD* acquireForAppend();
931 // Release a FD acquired from acquireForAppend.
932 void release(FD&);
933 private:
934 std::unique_ptr<FD> newFD();
936 fs::path m_root;
937 folly_concurrent_hash_map_simd<std::string, std::unique_ptr<FD>> m_fds;
939 std::mutex m_lock;
940 std::stack<FD*> m_forAppend;
941 size_t m_nextBlob;
944 // Similar to FDManager, but manages trace files. We have workers
945 // re-use trace files to avoid generating a huge number of time.
946 struct TraceFileManager {
947 fs::path get();
948 void put(fs::path);
949 private:
950 std::mutex m_lock;
951 std::stack<fs::path> m_paths;
952 size_t m_nextId{0};
955 static fs::path newRoot(const Options&);
957 coro::Task<std::string> doSubprocess(const RequestId&,
958 const std::string&,
959 std::string,
960 const fs::path&);
962 Options m_options;
963 std::unique_ptr<FDManager> m_fdManager;
964 TraceFileManager m_traceManager;
967 SubprocessImpl::SubprocessImpl(const Options& options, Client& parent)
968 : Impl{"subprocess", parent}
969 , m_options{options}
970 , m_fdManager{std::make_unique<FDManager>(newRoot(m_options))}
972 FTRACE(2, "Using subprocess extern-worker impl with root at {}\n",
973 m_fdManager->root().native());
974 Logger::FInfo(
975 "Using subprocess extern-worker at {}",
976 m_fdManager->root().native()
980 SubprocessImpl::~SubprocessImpl() {
981 auto const root = m_fdManager->root();
982 // Destroy FD manager first, to ensure no open FDs to any files
983 // while cleaning up.
984 m_fdManager.reset();
986 if (m_options.m_cleanup) {
987 Logger::FInfo(
988 "Cleaning up subprocess extern-worker at {}...",
989 root.native()
991 auto const before = std::chrono::steady_clock::now();
992 auto const removed = time(
993 "subprocess cleanup",
994 [&] {
995 std::error_code ec; // Suppress exceptions
996 return fs::remove_all(root, ec);
999 auto const elapsed = std::chrono::duration_cast<std::chrono::duration<double>>(
1000 std::chrono::steady_clock::now() - before
1001 ).count();
1002 FTRACE(2, "removed {} files\n", removed);
1003 Logger::FInfo(
1004 "Done cleaning up subprocess extern-worker. "
1005 "({} files removed) (took {})",
1006 removed, folly::prettyPrint(elapsed, folly::PRETTY_TIME_HMS, false)
1011 // Ensure we always have an unique root under the working directory.
1012 fs::path SubprocessImpl::newRoot(const Options& opts) {
1013 auto const base = opts.m_workingDir / "hphp-extern-worker";
1014 fs::create_directories(base);
1015 auto const full = base / boost::filesystem::unique_path(
1016 "%%%%-%%%%-%%%%-%%%%-%%%%-%%%%"
1017 ).native();
1018 fs::create_directory(full, base);
1019 return fs::canonical(full);
1022 coro::Task<BlobVec> SubprocessImpl::load(const RequestId& requestId,
1023 IdVec ids) {
1024 // Read every file corresponding to the id and return their
1025 // contents.
1026 auto out = from(ids)
1027 | mapped([&] (const RefId& id) {
1028 FTRACE(4, "{} reading blob from {}\n",
1029 requestId.tracePrefix(), id.toString());
1030 if (id.m_size <= g_inline_size) {
1031 // Inline data requires no file read
1032 assertx(id.m_id.size() == id.m_size);
1033 assertx(!id.m_extra);
1034 return id.m_id;
1036 if (auto const fd = m_fdManager->acquireForRead(id.m_id)) {
1037 // The data is in a blob file, so use a cached FD
1038 return fd->read(id.m_extra, id.m_size);
1040 // It's some other (non-blob) file. Create an ephemeral FD to
1041 // read it.
1042 return FD{id.m_id, true, false, false}.read(id.m_extra, id.m_size);
1044 | as<std::vector>();
1045 HPHP_CORO_MOVE_RETURN(out);
1048 coro::Task<IdVec> SubprocessImpl::store(const RequestId& requestId,
1049 PathVec paths,
1050 BlobVec blobs,
1051 bool) {
1052 // SubprocessImpl never "uploads" files, but it must write to disk
1053 // (which we classify as an upload).
1055 FD* fd = nullptr;
1056 SCOPE_EXIT { if (fd) m_fdManager->release(*fd); };
1058 // Update stats. Skip blobs which we'll store inline.
1059 for (auto const& b : blobs) {
1060 if (b.size() <= g_inline_size) continue;
1061 ++stats().blobsUploaded;
1062 stats().blobBytesUploaded += b.size();
1063 if (!fd) fd = m_fdManager->acquireForAppend();
1064 assertx(fd);
1067 auto out =
1068 ((from(paths)
1069 | mapped([&] (const fs::path& p) {
1070 auto fileSize = fs::file_size(p);
1071 if (fileSize <= g_inline_size) {
1072 FTRACE(4, "{} storing file {} inline\n",
1073 requestId.tracePrefix(),
1074 p.native());
1075 auto const contents = readFile(p);
1076 // Size of file could theoretically change between
1077 // stat-ing and reading it.
1078 if (contents.size() <= g_inline_size) {
1079 return RefId{contents, contents.size(), 0};
1081 fileSize = contents.size();
1083 // We distinguish blob files from non-blob files by making
1084 // sure non-blob files are always absolute paths (blob files
1085 // are always relative).
1086 return RefId{fs::canonical(p).native(), fileSize, 0};
1089 (from(blobs)
1090 | mapped([&] (const std::string& b) {
1091 if (b.size() <= g_inline_size) {
1092 FTRACE(4, "{} storing blob inline\n",
1093 requestId.tracePrefix());
1094 return RefId{b, b.size(), 0};
1096 FTRACE(4, "{} writing size {} blob to {}\n",
1097 requestId.tracePrefix(), b.size(), fd->path().native());
1098 auto const offset = fd->append(b);
1099 RefId r{fd->path().filename().native(), b.size(), offset};
1100 FTRACE(4, "{} written as {}\n", requestId.tracePrefix(), r.toString());
1101 return r;
1104 | as<std::vector>();
1105 HPHP_CORO_MOVE_RETURN(out);
1108 coro::Task<std::vector<RefValVec>>
1109 SubprocessImpl::exec(const RequestId& requestId,
1110 const std::string& command,
1111 RefValVec config,
1112 std::vector<RefValVec> inputs,
1113 const folly::Range<const OutputType*>& output,
1114 const folly::Range<const OutputType*>* finiOutput) {
1115 FTRACE(4, "{} executing \"{}\" ({} runs)\n",
1116 requestId.tracePrefix(), command, inputs.size());
1118 HPHP_CORO_SAFE_POINT;
1120 // Each set of inputs should always have the same size.
1121 if (debug && !inputs.empty()) {
1122 auto const size = inputs[0].size();
1123 for (size_t i = 1; i < inputs.size(); ++i) {
1124 always_assert(inputs[i].size() == size);
1128 // Encode all the inputs
1129 BlobEncoder encoder;
1130 auto const encodeParams = [&] (const RefValVec& params) {
1131 for (auto const& param : params) {
1132 match<void>(
1133 param,
1134 [&] (const RefId& id) {
1135 SerializedSink::encodeRefId(id, encoder);
1137 [&] (const Optional<RefId>& id) {
1138 SerializedSink::encodeOptRefId(id, encoder);
1140 [&] (const IdVec& ids) {
1141 SerializedSink::encodeRefIdVec(ids, encoder);
1146 encodeParams(config);
1147 encoder((size_t)inputs.size());
1148 for (auto const& input : inputs) encodeParams(input);
1150 // Acquire a FD. We're not actually going to write to it, but the
1151 // worker will. This ensures the worker has exclusive access to the
1152 // file while it's running.
1153 auto fd = m_fdManager->acquireForAppend();
1154 SCOPE_EXIT { if (fd) m_fdManager->release(*fd); };
1156 // Do the actual fork+exec.
1157 auto const outputBlob = HPHP_CORO_AWAIT(
1158 doSubprocess(
1159 requestId,
1160 command,
1161 std::string{(const char*)encoder.data(), encoder.size()},
1162 fd->path()
1165 // The worker (maybe) wrote to the file, so we need to re-sync our
1166 // tracked offset.
1167 fd->syncOffset();
1169 // Decode the output
1170 BlobDecoder decoder{outputBlob.data(), outputBlob.size()};
1171 auto const makeOutput = [&] (OutputType type) -> RefVal {
1172 switch (type) {
1173 case OutputType::Val:
1174 return SerializedSource::decodeRefId(decoder);
1175 case OutputType::Opt:
1176 return SerializedSource::decodeOptRefId(decoder);
1177 case OutputType::Vec:
1178 return SerializedSource::decodeRefIdVec(decoder);
1180 always_assert(false);
1183 auto const makeOutputs = [&] (const folly::Range<const OutputType*>& types) {
1184 RefValVec vec;
1185 vec.reserve(types.size());
1186 for (auto const type : types) {
1187 vec.emplace_back(makeOutput(type));
1189 return vec;
1192 std::vector<RefValVec> out;
1193 out.reserve(inputs.size() + (finiOutput ? 1 : 0));
1194 std::generate_n(
1195 std::back_inserter(out),
1196 inputs.size(),
1197 [&] { return makeOutputs(output); }
1199 if (finiOutput) out.emplace_back(makeOutputs(*finiOutput));
1201 decoder.assertDone();
1202 HPHP_CORO_MOVE_RETURN(out);
1205 coro::Task<std::string>
1206 SubprocessImpl::doSubprocess(const RequestId& requestId,
1207 const std::string& command,
1208 std::string inputBlob,
1209 const fs::path& outputPath) {
1210 std::vector<std::string> args{
1211 current_executable_path(),
1212 s_option,
1213 g_local_option,
1214 command,
1215 m_fdManager->root().native(),
1216 outputPath.native()
1219 // Propagate the TRACE option in the environment. We'll copy the
1220 // trace output into this process' trace output.
1221 std::vector<std::string> env;
1222 Optional<fs::path> traceFile;
1223 SCOPE_EXIT { if (traceFile) m_traceManager.put(std::move(*traceFile)); };
1224 if (auto const trace = getenv("TRACE")) {
1225 traceFile = m_traceManager.get();
1226 auto const fullPath = m_fdManager->root() / *traceFile;
1227 env.emplace_back(folly::sformat("TRACE={}", trace));
1228 env.emplace_back(folly::sformat("HPHP_TRACE_FILE={}", fullPath.c_str()));
1231 FTRACE(
1232 4, "{} executing subprocess for '{}'{}(input size: {})\n",
1233 requestId.tracePrefix(),
1234 command,
1235 traceFile
1236 ? folly::sformat(" (trace-file: {}) ", traceFile->native())
1237 : "",
1238 inputBlob.size()
1241 HPHP_CORO_SAFE_POINT;
1243 auto const before = std::chrono::steady_clock::now();
1245 // Do the actual fork+exec.
1246 folly::Subprocess subprocess{
1247 args,
1248 folly::Subprocess::Options{}
1249 .parentDeathSignal(SIGKILL)
1250 .pipeStdin()
1251 .pipeStdout()
1252 .pipeStderr()
1253 .fd(g_local_pipe_fd, folly::Subprocess::PIPE_OUT)
1254 .closeOtherFds(),
1255 nullptr,
1256 &env
1259 std::string output;
1260 std::string stderr;
1261 size_t inputWritten = 0;
1263 // Communicate with the worker. When available, read from worker's
1264 // stderr and output pipe and store the data. Throw everything else
1265 // away. Attempt to write inputs to the worker's stdin whenever it
1266 // has free space.
1267 subprocess.communicate(
1268 [&] (int parentFd, int childFd) { // Read
1269 if (childFd == g_local_pipe_fd) {
1270 return readFromPipe(parentFd, output);
1271 } else if (childFd == STDERR_FILENO) {
1272 return readFromPipe(parentFd, stderr);
1273 } else {
1274 // Worker is writing to some other random FD (including
1275 // stdout). Ignore it.
1276 char dummy[512];
1277 folly::readNoInt(parentFd, dummy, sizeof dummy);
1279 return false;
1281 [&] (int parentFd, int childFd) { // Write
1282 // Close any writable FD except stdin
1283 if (childFd != STDIN_FILENO) return true;
1284 // We've written all of the input, so close stdin. This will
1285 // signal to the worker that input is all sent.
1286 if (inputWritten >= inputBlob.size()) return true;
1288 // Otherwise write what we can
1289 auto const toWrite = inputBlob.size() - inputWritten;
1290 auto const written =
1291 folly::writeNoInt(parentFd, inputBlob.data() + inputWritten, toWrite);
1292 if (written < 0) {
1293 if (errno == EAGAIN) return false;
1294 throw Error{
1295 folly::sformat(
1296 "Failed writing {} bytes to subprocess input for '{}' [{}]",
1297 toWrite, command, folly::errnoStr(errno)
1300 } else if (written == 0) {
1301 return true;
1303 inputWritten += written;
1304 return false;
1308 auto const returnCode = subprocess.wait();
1309 auto const elapsed = std::chrono::steady_clock::now() - before;
1311 stats().execCpuUsec +=
1312 std::chrono::duration_cast<std::chrono::microseconds>(elapsed).count();
1313 ++stats().execAllocatedCores;
1315 FTRACE(
1317 "{} subprocess finished (took {}). "
1318 "Output size: {}, Return code: {}, Stderr: {}\n",
1319 requestId.tracePrefix(),
1320 prettyPrint(
1321 std::chrono::duration_cast<std::chrono::duration<double>>(
1322 elapsed
1323 ).count(),
1324 folly::PRETTY_TIME,
1325 false
1327 output.size(),
1328 returnCode.str(),
1329 stderr
1332 HPHP_CORO_SAFE_POINT;
1334 // Do this before checking the return code. If the process failed,
1335 // we want to capture anything it logged before throwing.
1336 if (traceFile && fs::exists(m_fdManager->root() / *traceFile)) {
1337 auto const contents = readFile(m_fdManager->root() / *traceFile);
1338 if (!contents.empty()) {
1339 Trace::ftraceRelease(
1340 "vvvvvvvvvvvvvvvvvv remote-exec ({} \"{}\") vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv\n"
1341 "{}"
1342 "^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n",
1343 requestId.toString(),
1344 command,
1345 contents
1350 if (!returnCode.exited() || returnCode.exitStatus() != 0) {
1351 throw WorkerError{
1352 folly::sformat(
1353 "Execution of `{}` failed: {}\nstderr:\n{}",
1354 command,
1355 returnCode.str(),
1356 stderr
1360 if (inputWritten != inputBlob.size()) {
1361 throw WorkerError{
1362 folly::sformat(
1363 "Execution of `{}` failed: Process failed to consume input\n",
1364 command
1369 HPHP_CORO_MOVE_RETURN(output);
1372 //////////////////////////////////////////////////////////////////////
1374 SubprocessImpl::FDManager::FDManager(fs::path root)
1375 : m_root{std::move(root)}
1376 , m_nextBlob{0}
1379 const FD* SubprocessImpl::FDManager::acquireForRead(const fs::path& path) {
1380 // Blob files are always relative
1381 if (path.is_absolute()) return nullptr;
1382 // Only already created blob files should be in m_fds. So, if it's
1383 // not present, it can't be a blob file.
1384 auto const it = m_fds.find(path.native());
1385 if (it == m_fds.end()) return nullptr;
1386 return it->second.get();
1389 FD* SubprocessImpl::FDManager::acquireForAppend() {
1390 // Use the blob file at the top of m_forAppend.
1391 std::scoped_lock<std::mutex> _{m_lock};
1392 if (m_forAppend.empty()) {
1393 // If empty, there's no free blob file. We need to create a new
1394 // one.
1395 auto fd = newFD();
1396 auto const path = fd->path();
1397 auto const [elem, emplaced] =
1398 m_fds.emplace(path.filename().native(), std::move(fd));
1399 assertx(emplaced);
1400 m_forAppend.push(elem->second.get());
1402 auto fd = m_forAppend.top();
1403 m_forAppend.pop();
1404 return fd;
1407 void SubprocessImpl::FDManager::release(FD& fd) {
1408 if (debug) {
1409 // Should be returning something cached
1410 auto const it = m_fds.find(fd.path().filename().native());
1411 always_assert(it != m_fds.end());
1412 always_assert(it->second.get() == &fd);
1414 std::scoped_lock<std::mutex> _{m_lock};
1415 m_forAppend.push(&fd);
1418 std::unique_ptr<FD> SubprocessImpl::FDManager::newFD() {
1419 // We deliberately keep the blob filename as small as possible
1420 // because they get serialized a lot and it keeps the input/output
1421 // sizes small.
1422 auto const id = m_nextBlob++;
1423 auto const filename = m_root / folly::to<std::string>(id);
1424 return std::make_unique<FD>(filename, true, true, true);
1427 //////////////////////////////////////////////////////////////////////
1429 fs::path SubprocessImpl::TraceFileManager::get() {
1430 std::scoped_lock<std::mutex> _{m_lock};
1431 if (m_paths.empty()) {
1432 auto const id = m_nextId++;
1433 m_paths.push(folly::sformat("trace-{:04}.log", id));
1435 auto path = std::move(m_paths.top());
1436 m_paths.pop();
1437 return path;
1440 void SubprocessImpl::TraceFileManager::put(fs::path path) {
1441 std::scoped_lock<std::mutex> _{m_lock};
1442 m_paths.push(std::move(path));
1445 //////////////////////////////////////////////////////////////////////
1449 //////////////////////////////////////////////////////////////////////
1451 Client::Client(folly::Executor::KeepAlive<> executor,
1452 const Options& options)
1453 : m_options{options}
1454 , m_stats{std::make_shared<Stats>()}
1455 , m_forceFallback{false}
1456 , m_fallbackSem{1}
1458 Timer _{"create impl"};
1459 // Look up which implementation to use. If a hook has been
1460 // registered, and we're allowed to use it according to the Options,
1461 // try to use it.
1462 if (g_impl_hook &&
1463 m_options.m_useSubprocess != Options::UseSubprocess::Always) {
1464 m_impl = g_impl_hook(m_options, executor, *this);
1466 // The hook can return nullptr even if registered. In each case, we
1467 // have no special implementation to use.
1468 if (!m_impl) {
1469 // Use the subprocess implementation (which is always available),
1470 // unless the Options specifies we shouldn't. If not, it's a fatal
1471 // error.
1472 if (m_options.m_useSubprocess == Options::UseSubprocess::Never) {
1473 throw Error{"No non-subprocess impl available"};
1475 m_impl = std::make_unique<SubprocessImpl>(m_options, *this);
1477 FTRACE(2, "created \"{}\" impl\n", m_impl->name());
1480 Client::~Client() {
1481 Timer _{[&] { return folly::sformat("destroy impl {}", m_impl->name()); }};
1482 m_impl.reset();
1483 m_fallbackImpl.reset();
1486 std::unique_ptr<Client::Impl> Client::makeFallbackImpl() {
1487 // This will be called once from within LockFreeLazy, so we only
1488 // emit this warning once.
1489 Logger::Warning(
1490 "Certain operations will use local fallback from this "
1491 "point on and may run slower."
1493 return std::make_unique<SubprocessImpl>(m_options, *this);
1496 coro::Task<Ref<std::string>> Client::storeFile(fs::path path,
1497 bool optimistic) {
1498 RequestId requestId{"store file"};
1500 FTRACE(
1501 2, "{} storing {}{}\n",
1502 requestId.tracePrefix(),
1503 path.native(),
1504 optimistic ? " (optimistically)" : ""
1507 ++m_stats->files;
1508 ++m_stats->storeCalls;
1509 SCOPE_EXIT {
1510 m_stats->storeLatencyUsec += std::chrono::duration_cast<
1511 std::chrono::microseconds
1512 >(requestId.elapsed()).count();
1515 auto wasFallback = false;
1516 auto ids = HPHP_CORO_AWAIT(tryWithFallback<IdVec>(
1517 [&] (Impl& i, bool isFallback) {
1518 if (isFallback) ++m_stats->fileFallbacks;
1519 return i.store(
1520 requestId,
1521 PathVec{path},
1523 optimistic
1526 wasFallback
1528 assertx(ids.size() == 1);
1530 Ref<std::string> ref{std::move(ids[0]), wasFallback};
1531 HPHP_CORO_MOVE_RETURN(ref);
1534 coro::Task<std::vector<Ref<std::string>>>
1535 Client::storeFile(std::vector<fs::path> paths,
1536 bool optimistic) {
1537 RequestId requestId{"store files"};
1539 FTRACE(
1540 2, "{} storing {} files{}\n",
1541 requestId.tracePrefix(),
1542 paths.size(),
1543 optimistic ? " (optimistically)" : ""
1545 ONTRACE(4, [&] {
1546 for (auto const& p : paths) {
1547 FTRACE(4, "{} storing {}\n", requestId.tracePrefix(), p.native());
1549 }());
1551 m_stats->files += paths.size();
1552 ++m_stats->storeCalls;
1553 SCOPE_EXIT {
1554 m_stats->storeLatencyUsec += std::chrono::duration_cast<
1555 std::chrono::microseconds
1556 >(requestId.elapsed()).count();
1559 auto const DEBUG_ONLY size = paths.size();
1560 auto wasFallback = false;
1561 auto ids = HPHP_CORO_AWAIT(tryWithFallback<IdVec>(
1562 [&] (Impl& i, bool isFallback) {
1563 if (isFallback) m_stats->fileFallbacks += paths.size();
1564 return i.store(requestId, paths, {}, optimistic);
1566 wasFallback
1568 assertx(ids.size() == size);
1570 auto out = from(ids)
1571 | move
1572 | mapped([&] (auto&& id) {
1573 return Ref<std::string>{std::move(id), wasFallback};
1575 | as<std::vector>();
1576 HPHP_CORO_MOVE_RETURN(out);
1579 //////////////////////////////////////////////////////////////////////
1581 // Sleep some random amount corresponding to the number of retries
1582 // we've tried.
1583 void Client::Impl::throttleSleep(size_t retry,
1584 std::chrono::milliseconds base) {
1585 // Each retry doubles the size of the window. We select a random
1586 // amount from within that.
1587 auto const scale = uint64_t{1} << std::min(retry, size_t{16});
1588 auto const window = std::chrono::microseconds{base} * scale;
1590 static std::mt19937_64 engine = [&] {
1591 std::random_device rd;
1592 std::seed_seq seed{rd(), rd(), rd(), rd(), rd(), rd(), rd(), rd()};
1593 return std::mt19937_64{seed};
1594 }();
1596 std::uniform_int_distribution<> distrib(0, window.count());
1597 auto const wait = [&] {
1598 static std::mutex lock;
1599 std::lock_guard<std::mutex> _{lock};
1600 return std::chrono::microseconds{distrib(engine)};
1601 }();
1603 // Use normal sleep here, not coro friendly sleep. The whole purpose
1604 // is to slow down execution and using a coro sleep will just allow
1605 // a lot of other actions to run.
1606 std::this_thread::sleep_for(wait);
1609 //////////////////////////////////////////////////////////////////////
1611 std::string Client::Stats::toString(const std::string& phase,
1612 const std::string& extra) const {
1613 auto const bytes = [] (size_t b) {
1614 auto s = folly::prettyPrint(
1616 folly::PRETTY_BYTES
1618 if (!s.empty() && s[s.size()-1] == ' ') s.resize(s.size()-1);
1619 return s;
1622 auto const usecs = [] (size_t t) {
1623 auto s = prettyPrint(
1624 double(t) / 1000000.0,
1625 folly::PRETTY_TIME_HMS,
1626 false
1628 if (!s.empty() && s[s.size()-1] == ' ') s.resize(s.size()-1);
1629 return s;
1632 auto const pct = [] (size_t a, size_t b) {
1633 if (!b) return 0.0;
1634 return double(a) / b * 100.0;
1637 auto const execs_ = execs.load();
1638 auto const allocatedCores = execAllocatedCores.load();
1639 auto const cpuUsecs = execCpuUsec.load();
1640 auto const execCalls_ = execCalls.load();
1641 auto const storeCalls_ = storeCalls.load();
1642 auto const loadCalls_ = loadCalls.load();
1644 return folly::sformat(
1645 " {}:{}\n"
1646 " Execs: {:,} total, {:,} cache-hits ({:.2f}%), {:,} optimistically, {:,} fallback\n"
1647 " Files: {:,} total, {:,} read, {:,} queried, {:,} uploaded ({}), {:,} fallback\n"
1648 " Blobs: {:,} total, {:,} queried, {:,} uploaded ({}), {:,} fallback\n"
1649 " Cpu: {} usage, {:,} allocated cores ({}/core)\n"
1650 " Mem: {} max used, {} reserved\n"
1651 " {:,} downloads ({}), {:,} throttles\n"
1652 " Avg Latency: {} exec, {} store, {} load",
1653 phase,
1654 extra.empty() ? "" : folly::sformat(" {}", extra),
1655 execs_,
1656 execCacheHits.load(),
1657 pct(execCacheHits.load(), execs_),
1658 optimisticExecs.load(),
1659 execFallbacks.load(),
1660 files.load(),
1661 filesRead.load(),
1662 filesQueried.load(),
1663 filesUploaded.load(),
1664 bytes(fileBytesUploaded.load()),
1665 fileFallbacks.load(),
1666 blobs.load(),
1667 blobsQueried.load(),
1668 blobsUploaded.load(),
1669 bytes(blobBytesUploaded.load()),
1670 blobFallbacks.load(),
1671 usecs(cpuUsecs),
1672 allocatedCores,
1673 usecs(allocatedCores ? (cpuUsecs / allocatedCores) : 0),
1674 bytes(execMaxUsedMem.load()),
1675 bytes(execReservedMem.load()),
1676 downloads.load(),
1677 bytes(bytesDownloaded.load()),
1678 throttles.load(),
1679 usecs(execCalls_ ? (execLatencyUsec.load() / execCalls_) : 0),
1680 usecs(storeCalls_ ? (storeLatencyUsec.load() / storeCalls_) : 0),
1681 usecs(loadCalls_ ? (loadLatencyUsec.load() / loadCalls_) : 0)
1685 void Client::Stats::logSample(const std::string& phase,
1686 StructuredLogEntry& sample) const {
1687 sample.setInt(phase + "_total_execs", execs.load());
1688 sample.setInt(phase + "_cache_hits", execCacheHits.load());
1689 sample.setInt(phase + "_optimistically", optimisticExecs.load());
1690 sample.setInt(phase + "_fallbacks", execFallbacks.load());
1692 sample.setInt(phase + "_total_files", files.load());
1693 sample.setInt(phase + "_file_reads", filesRead.load());
1694 sample.setInt(phase + "_file_queries", filesQueried.load());
1695 sample.setInt(phase + "_file_stores", filesUploaded.load());
1696 sample.setInt(phase + "_file_stores_bytes", fileBytesUploaded.load());
1697 sample.setInt(phase + "_file_fallbacks", fileFallbacks.load());
1699 sample.setInt(phase + "_total_blobs", blobs.load());
1700 sample.setInt(phase + "_blob_queries", blobsQueried.load());
1701 sample.setInt(phase + "_blob_stores", blobsUploaded.load());
1702 sample.setInt(phase + "_blob_stores_bytes", blobBytesUploaded.load());
1703 sample.setInt(phase + "_blob_fallbacks", blobFallbacks.load());
1705 sample.setInt(phase + "_total_loads", downloads.load());
1706 sample.setInt(phase + "_total_loads_bytes", bytesDownloaded.load());
1707 sample.setInt(phase + "_throttles", throttles.load());
1709 sample.setInt(phase + "_exec_cpu_usec", execCpuUsec.load());
1710 sample.setInt(phase + "_exec_allocated_cores", execAllocatedCores.load());
1711 sample.setInt(phase + "_exec_max_used_mem", execMaxUsedMem.load());
1712 sample.setInt(phase + "_exec_reserved_mem", execReservedMem.load());
1714 auto const execCalls_ = execCalls.load();
1715 auto const storeCalls_ = storeCalls.load();
1716 auto const loadCalls_ = loadCalls.load();
1718 sample.setInt(
1719 phase + "_avg_exec_latency_usec",
1720 execCalls_ ? (execLatencyUsec.load() / execCalls_) : 0
1722 sample.setInt(
1723 phase + "_avg_store_latency_usec",
1724 storeCalls_ ? (storeLatencyUsec.load() / storeCalls_) : 0
1726 sample.setInt(
1727 phase + "_avg_load_latency_usec",
1728 loadCalls_ ? (loadLatencyUsec.load() / loadCalls_) : 0
1732 //////////////////////////////////////////////////////////////////////