2 +----------------------------------------------------------------------+
4 +----------------------------------------------------------------------+
5 | Copyright (c) 2010-present Facebook, Inc. (http://www.facebook.com) |
6 +----------------------------------------------------------------------+
7 | This source file is subject to version 3.01 of the PHP license, |
8 | that is bundled with this package in the file LICENSE, and is |
9 | available through the world-wide-web at the following url: |
10 | http://www.php.net/license/3_01.txt |
11 | If you did not receive a copy of the PHP license and are unable to |
12 | obtain it through the world-wide-web, please send a note to |
13 | license@php.net so we can mail you a copy immediately. |
14 +----------------------------------------------------------------------+
17 #include "hphp/util/extern-worker.h"
19 #include "hphp/util/assertions.h"
20 #include "hphp/util/current-executable.h"
21 #include "hphp/util/hash-map.h"
22 #include "hphp/util/logger.h"
23 #include "hphp/util/match.h"
24 #include "hphp/util/struct-log.h"
25 #include "hphp/util/trace.h"
27 #include <folly/FileUtil.h>
28 #include <folly/Subprocess.h>
29 #include <folly/gen/Base.h>
30 #include <folly/memory/UninitializedMemoryHacks.h>
32 #include <boost/filesystem.hpp>
37 namespace fs
= std::filesystem
;
39 namespace HPHP::extern_worker
{
41 //////////////////////////////////////////////////////////////////////
43 using namespace detail
;
44 using namespace folly::gen
;
46 //////////////////////////////////////////////////////////////////////
// Flag string which parseOptions() requires as argv[1].
48 const char* const s_option
= "--extern-worker";
// Out-of-class definitions of RequestId's static atomic counters, both
// starting at zero. NOTE(review): their exact meaning is fixed by
// RequestId's declaration, which is not visible here -- confirm there.
50 std::atomic
<uint64_t> RequestId::s_next
{0};
51 std::atomic
<uint64_t> RequestId::s_active
{0};
// Out-of-class definitions of Client's one-element OutputType arrays,
// one each for Val, Vec and Opt.
53 const std::array
<OutputType
, 1> Client::s_valOutputType
{OutputType::Val
};
54 const std::array
<OutputType
, 1> Client::s_vecOutputType
{OutputType::Vec
};
55 const std::array
<OutputType
, 1> Client::s_optOutputType
{OutputType::Opt
};
// Global hook, initially null. NOTE(review): presumably lets another
// implementation be plugged in -- confirm against ImplHook's declaration.
57 ImplHook g_impl_hook
{nullptr};
// Per-thread flag, initially false; main() below resets it to false on
// scope exit.
59 thread_local
bool g_in_job
{false};
61 //////////////////////////////////////////////////////////////////////
65 //////////////////////////////////////////////////////////////////////
67 TRACE_SET_MOD(extern_worker
);
69 //////////////////////////////////////////////////////////////////////
71 // If passed via the command-line, the worker is running in "local"
72 // mode and will use a different mechanism to read inputs/write
74 const char* const g_local_option
= "--local";
76 // If running in local mode, the FD of the pipe used to communicate
77 // output back to the parent.
78 constexpr int g_local_pipe_fd
= 3;
80 // For the subprocess impl, if the size of the data is <= this
81 // constant, we'll store it "inline" in the ref itself, rather than
82 // writing it to disk. This value probably needs more tuning.
83 constexpr size_t g_inline_size
= 64;
85 //////////////////////////////////////////////////////////////////////
88 hphp_fast_string_map
<JobBase
*> registry
;
92 Registry
& registry() {
93 static Registry registry
;
97 //////////////////////////////////////////////////////////////////////
99 // Represents an open file. Use this over readFile/writeFile if you
100 // want to maintain a persistent FD, or want to read/write just a
101 // portion of the file. Only reading or appending to the file is
104 FD(const fs::path
& path
, bool read
, bool write
, bool create
)
105 : m_path
{path
}, m_offset
{0} {
106 assertx(IMPLIES(create
, write
));
108 auto flags
= O_CLOEXEC
;
110 flags
|= (write
? O_RDWR
: O_RDONLY
);
116 if (write
) flags
|= O_APPEND
;
118 flags
|= O_CREAT
| O_EXCL
;
119 // We're creating it, so the file is empty
123 auto fd
= folly::openNoInt(m_path
.c_str(), flags
);
127 "Unable to open {} [{}]",
128 m_path
.native(), folly::errnoStr(errno
)
133 SCOPE_FAIL
{ ::close(m_fd
); };
135 // If we're going to write to the file, but not creating it, we
136 // don't know what the end of the file is, so find the current
138 if (write
&& !create
) syncOffset();
// Closes the descriptor on destruction. A negative m_fd is treated as
// holding nothing and is left alone.
140 ~FD() { if (m_fd
>= 0) ::close(m_fd
); }
// Copy construction is disabled.
142 FD(const FD
&) = delete;
143 FD(FD
&& o
) noexcept
: m_fd
{o
.m_fd
}, m_path
{o
.m_path
}, m_offset
{o
.m_offset
}
// Copy assignment is disabled.
145 FD
& operator=(const FD
&) = delete;
146 FD
& operator=(FD
&& o
) noexcept
{
147 std::swap(m_fd
, o
.m_fd
);
148 std::swap(m_path
, o
.m_path
);
149 std::swap(m_offset
, o
.m_offset
);
// Returns the stored path (m_path) by const reference.
153 const fs::path
& path() const { return m_path
; }
155 std::string
read(size_t offset
, size_t size
) const {
159 folly::resizeWithoutInitialization(data
, size
);
161 auto const read
= folly::preadFull(m_fd
, data
.data(), size
, offset
);
162 if (read
== size
) return data
;
166 "Failed reading {} bytes from {} at {} [{}]",
167 size
, m_path
.native(), offset
, folly::errnoStr(errno
)
173 "Partial read from {} at {} (expected {}, actual {})",
174 m_path
.native(), offset
, size
, read
179 size_t append(const std::string
& data
) {
182 auto const written
= folly::writeFull(m_fd
, data
.data(), data
.size());
186 "Failed writing {} bytes to {} [{}]",
187 data
.size(), m_path
.native(), folly::errnoStr(errno
)
191 if (written
!= data
.size()) {
194 "Partial write to {} (expected {}, actual {})",
195 m_path
.native(), data
.size(), written
199 auto const prev
= m_offset
;
204 // Update m_offset to the end of the file. This is only needed if
205 // you've opened an already created file, or if you know someone
206 // else has written to it.
209 auto const size
= ::lseek(m_fd
, 0, SEEK_END
);
213 "Unable to seek to end of {}",
227 //////////////////////////////////////////////////////////////////////
229 // Read from the FD (which is assumed to be a pipe) and append the
230 // contents to the given string. Return true if the pipe is now
231 // closed, or false otherwise. The false case can only happen if the
232 // FD is non-blocking.
233 bool readFromPipe(int fd
, std::string
& s
) {
234 auto realEnd
= s
.size();
235 SCOPE_EXIT
{ s
.resize(realEnd
); };
237 assertx(realEnd
<= s
.size());
238 auto spaceLeft
= s
.size() - realEnd
;
239 while (spaceLeft
< 1024) {
240 folly::resizeWithoutInitialization(
241 s
, std::max
<size_t>(4096, s
.size() * 2)
243 spaceLeft
= s
.size() - realEnd
;
246 folly::readNoInt(fd
, s
.data() + realEnd
, spaceLeft
);
248 if (errno
== EAGAIN
) return false;
251 "Failed reading from pipe {} [{}]",
253 folly::errnoStr(errno
)
256 } else if (read
== 0) {
263 // Read from the FD (which is assumed to be a pipe) until it is closed
264 // (returns EOF). Returns all data read. Only use this with a blocking
265 // FD, as it will spin otherwise.
266 std::string
readFromPipe(int fd
) {
268 while (!readFromPipe(fd
, out
)) {}
272 // Write all of the given data to the FD (which is assumed to be a
274 void writeToPipe(int fd
, const char* data
, size_t size
) {
275 auto const written
= folly::writeFull(fd
, data
, size
);
279 "Failed writing {} bytes to pipe {} [{}]",
280 size
, fd
, folly::errnoStr(errno
)
284 if (written
!= size
) {
287 "Partial write to pipe {} (expected {}, actual {})",
294 //////////////////////////////////////////////////////////////////////
299 * For this mode a worker expects to be invoked with --local, followed
300 * by the name of the command, the root of the blob files, and the
301 * name of the blob file to write results to.
303 * This mode uses blob files written locally, and communicates its
304 * input/output via serialized RefIds via stdin and pipes. It's meant
305 * for SubprocessImpl and is more efficient than the "File" mode
308 * See SubprocessImpl for more description.
311 struct SerializedSource
: public detail::ISource
{
312 SerializedSource(fs::path root
, std::string s
)
313 : m_source
{std::move(s
)}
314 , m_decoder
{m_source
.data(), m_source
.size()}
317 , m_root
{std::move(root
)}
319 ~SerializedSource() = default;
321 std::string
blob() override
{
322 return refToBlob(decodeRefId(m_decoder
));
324 Optional
<std::string
> optBlob() override
{
325 auto r
= decodeOptRefId(m_decoder
);
326 if (!r
) return std::nullopt
;
327 return refToBlob(*r
);
330 BlobVec
variadic() override
{
331 return from(decodeRefIdVec(m_decoder
))
332 | map([&] (const RefId
& r
) { return refToBlob(r
); })
336 void initDone() override
{
337 assertx(m_numInputs
== 0);
338 assertx(m_currentInput
== 0);
339 m_decoder(m_numInputs
);
341 bool inputEnd() const override
{
342 return m_currentInput
>= m_numInputs
;
344 void nextInput() override
{
345 assertx(!inputEnd());
// Delegates to the decoder's assertDone(). NOTE(review): presumably
// checks that all of m_source was consumed -- confirm against
// BlobDecoder.
348 void finish() override
{ m_decoder
.assertDone(); }
350 static RefId
decodeRefId(BlobDecoder
& d
) {
351 // Optimize RefId representation for space. If m_size is <= the
352 // inline size, we know that m_extra is zero, and that m_id has
353 // the same length as m_size, so we do not need to encode it
355 decltype(RefId::m_size
) size
;
357 if (size
<= g_inline_size
) {
358 assertx(d
.remaining() >= size
);
359 std::string id
{(const char*)d
.data(), size
};
361 return RefId
{std::move(id
), size
, 0};
363 decltype(RefId::m_id
) id
;
364 decltype(RefId::m_extra
) offset
;
367 return RefId
{std::move(id
), size
, offset
};
371 static Optional
<RefId
> decodeOptRefId(BlobDecoder
& d
) {
374 if (!present
) return std::nullopt
;
375 return decodeRefId(d
);
378 static IdVec
decodeRefIdVec(BlobDecoder
& d
) {
379 std::vector
<RefId
> out
;
384 std::back_inserter(out
),
386 [&] { return decodeRefId(d
); }
392 std::string
refToBlob(const RefId
& r
) {
393 if (r
.m_size
<= g_inline_size
) {
394 assertx(r
.m_id
.size() == r
.m_size
);
399 fs::path path
{r
.m_id
};
400 if (path
.is_absolute()) {
401 return FD
{path
, true, false, false}.read(r
.m_extra
, r
.m_size
);
404 auto it
= m_fdCache
.find(path
.native());
405 if (it
== m_fdCache
.end()) {
406 auto [elem
, emplaced
] = m_fdCache
.emplace(
408 FD
{m_root
/ path
, true, false, false}
413 return it
->second
.read(r
.m_extra
, r
.m_size
);
416 std::string m_source
;
417 BlobDecoder m_decoder
;
418 size_t m_currentInput
;
422 hphp_fast_map
<std::string
, FD
> m_fdCache
;
425 struct SerializedSink
: public detail::ISink
{
426 explicit SerializedSink(fs::path outputFile
)
427 : m_fd
{outputFile
, false, true, false} {}
429 void blob(const std::string
& b
) override
{
430 encodeRefId(makeRefId(b
), m_encoder
);
432 void optBlob(const Optional
<std::string
>& b
) override
{
433 encodeOptRefId(b
? makeRefId(*b
) : Optional
<RefId
>{}, m_encoder
);
435 void variadic(const BlobVec
& v
) override
{
436 auto const refs
= from(v
)
437 | map([&] (const std::string
& b
) { return makeRefId(b
); })
439 encodeRefIdVec(refs
, m_encoder
);
// No-ops: this sink takes no action on either notification.
441 void nextOutput() override
{}
442 void startFini() override
{}
444 void finish() override
{
447 (const char*)m_encoder
.data(),
450 ::close(g_local_pipe_fd
);
453 static void encodeRefId(const RefId
& r
, BlobEncoder
& e
) {
454 // Optimized RefId encoding. If it's an inline ref, we can encode
455 // it more optimally. See above in SerializedSource::decodeRefId.
457 if (r
.m_size
<= g_inline_size
) {
458 assertx(r
.m_id
.size() == r
.m_size
);
460 e
.writeRaw(r
.m_id
.data(), r
.m_id
.size());
467 static void encodeOptRefId(const Optional
<RefId
>& r
, BlobEncoder
& e
) {
476 static void encodeRefIdVec(const IdVec
& v
, BlobEncoder
& e
) {
478 for (auto const& r
: v
) encodeRefId(r
, e
);
482 RefId
makeRefId(const std::string
& b
) {
483 if (b
.size() <= g_inline_size
) return RefId
{b
, b
.size(), 0};
484 auto const offset
= m_fd
.append(b
);
485 return RefId
{m_fd
.path().filename(), b
.size(), offset
};
489 BlobEncoder m_encoder
;
492 //////////////////////////////////////////////////////////////////////
497 * For this mode a worker expects to be invoked with the name of the
498 * command, followed by 3 paths to directories. The directories
499 * represent the config inputs (for init()), the inputs (for multiple
500 * runs()), and the last is the output directory, which will be
503 * All three directories use the same format for representing
506 * - The first level contains numbered directories (from 0 to N). Each
507 * numbered directory represents a set of inputs/outputs. For outputs,
508 * the number corresponds to the matching number of the inputs. The
509 * config directory does not have this level, as there's only ever one
512 * - The next level contains numbered files or directories (from 0 to
513 * N). Each file/directory specifies a particular input. The exact
514 * number depends on the job being executed (both sides need to
515 * agree). The format of the file or directory depends on that
516 * input/output's type.
518 * - If the input/output is a "normal" type, it will be a file
519 * containing the serialized data (using BlobEncoder) for that
520 * input/output. Strings are a special case. If the type is a
521 * std::string, it will not be serialized and the string will be
522 * stored directly (this makes it easier to represent files as their
525 * - If the input/output is an Opt<T>, it will be represented like a
526 * "normal" type of type T, except it is not required to be
527 * present. If the file is not present (so a gap in the numbering), it
528 * is assumed to be std::nullopt.
530 * - If the input/output is a Variadic<T>, it will be a
531 * directory. Inside of that directory will be numbered files from
532 * 0-N, one for each element of the vector. The vector elements will
533 * be encoded as "normal" types.
535 * NB: Marker types cannot nest, so there aren't any possible deeper
539 struct FileSource
: public detail::ISource
{
540 FileSource(fs::path config
, fs::path input
)
541 : m_configPath
{std::move(config
)}
542 , m_inputPath
{std::move(input
)}
545 , m_itemBase
{m_configPath
} {}
546 ~FileSource() = default;
548 std::string
blob() override
{
549 auto const filename
= m_itemBase
/ folly::to
<std::string
>(m_itemIdx
++);
550 return detail::readFile(filename
);
552 Optional
<std::string
> optBlob() override
{
553 auto const filename
= m_itemBase
/ folly::to
<std::string
>(m_itemIdx
++);
554 if (!fs::exists(filename
)) return std::nullopt
;
555 return detail::readFile(filename
);
558 BlobVec
variadic() override
{
560 auto const vecBase
= m_itemBase
/ folly::to
<std::string
>(m_itemIdx
++);
561 for (size_t i
= 0;; ++i
) {
562 auto const valPath
= vecBase
/ folly::to
<std::string
>(i
);
563 // A break in the numbering means the end of the vector.
564 if (!fs::exists(valPath
)) break;
565 out
.emplace_back(detail::readFile(valPath
));
570 void initDone() override
{
571 assertx(m_itemBase
== m_configPath
);
572 assertx(m_inputIdx
== 0);
574 m_itemBase
= m_inputPath
/ "0";
577 bool inputEnd() const override
{
578 assertx(m_itemBase
== m_inputPath
/ folly::to
<std::string
>(m_inputIdx
));
579 return !fs::exists(m_itemBase
);
582 void nextInput() override
{
583 assertx(m_itemBase
== m_inputPath
/ folly::to
<std::string
>(m_inputIdx
));
585 m_itemBase
= m_inputPath
/ folly::to
<std::string
>(++m_inputIdx
);
// No-op: FileSource does nothing on finish.
588 void finish() override
{}
590 fs::path m_configPath
;
591 fs::path m_inputPath
;
598 struct FileSink
: public detail::ISink
{
599 explicit FileSink(fs::path base
)
600 : m_base
{std::move(base
)}
604 // We insist on a clean output directory.
605 if (!fs::create_directory(m_base
)) {
607 folly::sformat("Output directory {} already exists", m_base
.native())
// Forwards the blob unchanged to write().
612 void blob(const std::string
& b
) override
{ write(b
); }
613 void optBlob(const Optional
<std::string
>& b
) override
{
621 void variadic(const BlobVec
& v
) override
{
622 auto const dir
= makeDir();
623 auto const vecDir
= dir
/ folly::to
<std::string
>(m_itemIdx
);
624 fs::create_directory(vecDir
, dir
);
625 for (size_t i
= 0; i
< v
.size(); ++i
) {
626 auto const& b
= v
[i
];
627 writeFile(vecDir
/ folly::to
<std::string
>(i
), b
.data(), b
.size());
632 void nextOutput() override
{
633 assertx(m_outputIdx
.has_value());
637 void startFini() override
{
638 assertx(m_outputIdx
.has_value());
// No-op: FileSink does nothing on finish.
642 void finish() override
{}
644 fs::path
currentDir() {
645 if (!m_outputIdx
) return m_base
/ "fini";
646 return m_base
/ folly::to
<std::string
>(*m_outputIdx
);
650 auto const outputDir
= currentDir();
651 if (!m_itemIdx
) fs::create_directory(outputDir
, m_base
);
655 void write(const std::string
& b
) {
656 auto const dir
= makeDir();
657 writeFile(dir
/ folly::to
<std::string
>(m_itemIdx
), b
.data(), b
.size());
663 Optional
<size_t> m_outputIdx
;
666 //////////////////////////////////////////////////////////////////////
668 // folly::writeFile expects a container-like input, so this makes a
669 // ptr/length pair behave like one (without copying).
// Number of bytes in the wrapped range.
671 size_t size() const { return m_size
; }
// True when the wrapped range has zero length.
672 bool empty() const { return !size(); }
// Unchecked access to the byte at idx.
673 const char& operator[](size_t idx
) const { return m_ptr
[idx
]; }
678 //////////////////////////////////////////////////////////////////////
682 //////////////////////////////////////////////////////////////////////
686 //////////////////////////////////////////////////////////////////////
688 // Wrappers around the folly functions with error handling
690 std::string
readFile(const fs::path
& path
) {
692 if (!folly::readFile(path
.c_str(), s
)) {
695 "Unable to read input from {} [{}]",
696 path
.c_str(), folly::errnoStr(errno
)
703 void writeFile(const fs::path
& path
,
704 const char* ptr
, size_t size
) {
705 if (!folly::writeFile(Adaptor
{ptr
, size
}, path
.c_str())) {
708 "Unable to write output to {} [{}]",
709 path
.c_str(), folly::errnoStr(errno
)
715 //////////////////////////////////////////////////////////////////////
717 JobBase::JobBase(const std::string
& name
)
720 FTRACE(4, "registering remote worker command \"{}\"\n", m_name
);
721 auto& r
= registry();
722 std::lock_guard
<std::mutex
> _
{r
.lock
};
723 auto const insert
= r
.registry
.emplace(m_name
, this);
724 always_assert(insert
.second
);
727 //////////////////////////////////////////////////////////////////////
731 //////////////////////////////////////////////////////////////////////
735 // Parse the command-line and create the appropriate Source and Sink
736 // for input and output.
738 std::unique_ptr
<ISource
>,
739 std::unique_ptr
<ISink
>,
742 parseOptions(int argc
, char** argv
) {
743 always_assert(argc
> 1);
744 always_assert(!strcmp(argv
[1], s_option
));
746 if (argc
>= 2 && !strcmp(argv
[2], g_local_option
)) {
748 std::cerr
<< "Usage: "
751 << " " << g_local_option
756 return std::make_tuple(nullptr, nullptr, "");
759 std::string name
{argv
[3]};
760 fs::path root
{argv
[4]};
761 fs::path outputFile
{argv
[5]};
763 FTRACE(2, "extern worker run (local) (\"{}\", {}, {})\n",
764 name
, root
.native(), outputFile
.native());
766 // Input comes from STDIN
768 time("read-pipe", [] { return readFromPipe(STDIN_FILENO
); });
769 return std::make_tuple(
770 std::make_unique
<SerializedSource
>(
774 std::make_unique
<SerializedSink
>(std::move(outputFile
)),
777 } else if (argc
!= 6) {
778 std::cerr
<< "Usage: "
786 return std::make_tuple(nullptr, nullptr, "");
788 std::string name
{argv
[2]};
789 fs::path configPath
{argv
[3]};
790 fs::path outputPath
{argv
[4]};
791 fs::path inputPath
{argv
[5]};
793 FTRACE(2, "extern worker run(\"{}\", {}, {}, {})\n",
794 name
, configPath
.native(), outputPath
.native(),
797 return std::make_tuple(
798 std::make_unique
<FileSource
>(
799 std::move(configPath
),
802 std::make_unique
<FileSink
>(std::move(outputPath
)),
810 //////////////////////////////////////////////////////////////////////
812 int main(int argc
, char** argv
) {
816 auto const [source
, sink
, name
] = parseOptions(argc
, argv
);
817 if (!source
) return EXIT_FAILURE
;
819 // Lookup the registered job for the requested name.
820 auto const worker
= [&, name
= name
] {
821 auto& r
= registry();
822 std::lock_guard
<std::mutex
> _
{r
.lock
};
823 auto const it
= r
.registry
.find(name
);
824 if (it
== r
.registry
.end()) {
825 throw Error
{folly::sformat("No command named `{}` registered", name
)};
831 SCOPE_EXIT
{ g_in_job
= false; };
833 // First do any global initialization.
834 time("init", [&, &source
= source
] { worker
->init(*source
); });
835 time("run-all", [&, &source
= source
, &sink
= sink
] {
836 // Then execute run() until we're out of inputs.
838 while (!source
->inputEnd()) {
840 [&] { return folly::sformat("run {}", run
); },
841 [&, &source
= source
, &sink
= sink
] { worker
->run(*source
, *sink
); }
847 // Do any cleanup and flush output to its final destination
848 time("fini", [&, &sink
= sink
] { worker
->fini(*sink
); });
849 time("flush", [&sink
= sink
] { sink
->finish(); });
851 } catch (const std::exception
& exn
) {
852 std::cerr
<< "Error: " << exn
.what() << std::endl
;
857 //////////////////////////////////////////////////////////////////////
861 //////////////////////////////////////////////////////////////////////
864 * The builtin implementation which uses fork+exec (and stores data on
865 * disk). It is always available, reasonably efficient, and assumed to
866 * be "reliable" (IE, never needs a fallback).
868 * All data is stored under the "root" (which is the working-dir
869 * specified in Options).
871 * Data is stored in blob files under the root, whose names are
872 * assigned numerically. The blob files are pooled. When a thread
873 * needs to write, it checks out a free file, appends the data to it,
874 * then "returns" it back to the pool. The particular blob file data
875 * is stored to is therefore arbitrary. Files are not stored at all
876 * (since they're already on disk).
878 * The RefIds are just the path to the file, either a blob file, or
879 * some other file. If the data was stored in a blob file, the m_extra
880 * of the RefId will contain its offset in the blob file. For stored
881 * files, m_extra will always be zero.
883 * As an optimization, if the data is less than or equal to g_inline_size,
884 * then it's stored "inline" in the RefId. That is, m_id is the blob
885 * itself. Inline/non-inline blobs can be distinguished by the m_size
886 * field. This also applies to stored files (if sufficiently small,
887 * we'll read it and store it inline).
889 * Workers are forked, given their inputs, calculate outputs, then
890 * write their outputs and exit. Input is given to the worker via
891 * stdin (as a stream of serialized RefIds). As part of the
892 * command-line, the worker is given an output file to write its
893 * output to. It does (creating RefIds in the process), and reports
894 * the output RefIds via a pipe (g_local_pipe_fd).
896 struct SubprocessImpl
: public Client::Impl
{
897 SubprocessImpl(const Options
&, Client
&);
898 ~SubprocessImpl() override
;
// The session string is the native form of the FDManager's root path.
900 std::string
session() const override
{ return m_fdManager
->root().native(); }
// Fixed answers for this implementation: isSubprocess() is true, while
// supportsOptimistic() and isDisabled() are both false.
902 bool isSubprocess() const override
{ return true; }
903 bool supportsOptimistic() const override
{ return false; }
904 bool isDisabled() const override
{ return false; }
906 coro::Task
<BlobVec
> load(const RequestId
&, IdVec
) override
;
907 coro::Task
<IdVec
> store(const RequestId
&, PathVec
, BlobVec
,
909 coro::Task
<std::vector
<RefValVec
>>
910 exec(const RequestId
&,
913 std::vector
<RefValVec
>,
914 const folly::Range
<const OutputType
*>&,
915 const folly::Range
<const OutputType
*>*) override
;
918 // Manage the pool of blob files
920 explicit FDManager(fs::path
);
// Returns the stored root path (m_root) by const reference.
922 const fs::path
& root() const { return m_root
; }
924 // Acquire a FD to the given file to read from. If the path does
925 // not correspond to a blob file, nullptr is returned.
926 const FD
* acquireForRead(const fs::path
&);
927 // Acquire a FD to a blob file to append to. You don't have a say
928 // in which file you get. The FD must be returned via release()
929 // when done. You have exclusive access to this FD.
930 FD
* acquireForAppend();
931 // Release a FD acquired from acquireForAppend.
934 std::unique_ptr
<FD
> newFD();
937 folly_concurrent_hash_map_simd
<std::string
, std::unique_ptr
<FD
>> m_fds
;
940 std::stack
<FD
*> m_forAppend
;
944 // Similar to FDManager, but manages trace files. We have workers
945 // re-use trace files to avoid generating a huge number of them.
946 struct TraceFileManager
{
951 std::stack
<fs::path
> m_paths
;
955 static fs::path
newRoot(const Options
&);
957 coro::Task
<std::string
> doSubprocess(const RequestId
&,
963 std::unique_ptr
<FDManager
> m_fdManager
;
964 TraceFileManager m_traceManager
;
967 SubprocessImpl::SubprocessImpl(const Options
& options
, Client
& parent
)
968 : Impl
{"subprocess", parent
}
970 , m_fdManager
{std::make_unique
<FDManager
>(newRoot(m_options
))}
972 FTRACE(2, "Using subprocess extern-worker impl with root at {}\n",
973 m_fdManager
->root().native());
975 "Using subprocess extern-worker at {}",
976 m_fdManager
->root().native()
980 SubprocessImpl::~SubprocessImpl() {
981 auto const root
= m_fdManager
->root();
982 // Destroy FD manager first, to ensure no open FDs to any files
983 // while cleaning up.
986 if (m_options
.m_cleanup
) {
988 "Cleaning up subprocess extern-worker at {}...",
991 auto const before
= std::chrono::steady_clock::now();
992 auto const removed
= time(
993 "subprocess cleanup",
995 std::error_code ec
; // Suppress exceptions
996 return fs::remove_all(root
, ec
);
999 auto const elapsed
= std::chrono::duration_cast
<std::chrono::duration
<double>>(
1000 std::chrono::steady_clock::now() - before
1002 FTRACE(2, "removed {} files\n", removed
);
1004 "Done cleaning up subprocess extern-worker. "
1005 "({} files removed) (took {})",
1006 removed
, folly::prettyPrint(elapsed
, folly::PRETTY_TIME_HMS
, false)
1011 // Ensure we always have an unique root under the working directory.
1012 fs::path
SubprocessImpl::newRoot(const Options
& opts
) {
1013 auto const base
= opts
.m_workingDir
/ "hphp-extern-worker";
1014 fs::create_directories(base
);
1015 auto const full
= base
/ boost::filesystem::unique_path(
1016 "%%%%-%%%%-%%%%-%%%%-%%%%-%%%%"
1018 fs::create_directory(full
, base
);
1019 return fs::canonical(full
);
1022 coro::Task
<BlobVec
> SubprocessImpl::load(const RequestId
& requestId
,
1024 // Read every file corresponding to the id and return their
1026 auto out
= from(ids
)
1027 | mapped([&] (const RefId
& id
) {
1028 FTRACE(4, "{} reading blob from {}\n",
1029 requestId
.tracePrefix(), id
.toString());
1030 if (id
.m_size
<= g_inline_size
) {
1031 // Inline data requires no file read
1032 assertx(id
.m_id
.size() == id
.m_size
);
1033 assertx(!id
.m_extra
);
1036 if (auto const fd
= m_fdManager
->acquireForRead(id
.m_id
)) {
1037 // The data is in a blob file, so use a cached FD
1038 return fd
->read(id
.m_extra
, id
.m_size
);
1040 // It's some other (non-blob) file. Create an ephemeral FD to
1042 return FD
{id
.m_id
, true, false, false}.read(id
.m_extra
, id
.m_size
);
1044 | as
<std::vector
>();
1045 HPHP_CORO_MOVE_RETURN(out
);
1048 coro::Task
<IdVec
> SubprocessImpl::store(const RequestId
& requestId
,
1052 // SubprocessImpl never "uploads" files, but it must write to disk
1053 // (which we classify as an upload).
1056 SCOPE_EXIT
{ if (fd
) m_fdManager
->release(*fd
); };
1058 // Update stats. Skip blobs which we'll store inline.
1059 for (auto const& b
: blobs
) {
1060 if (b
.size() <= g_inline_size
) continue;
1061 ++stats().blobsUploaded
;
1062 stats().blobBytesUploaded
+= b
.size();
1063 if (!fd
) fd
= m_fdManager
->acquireForAppend();
1069 | mapped([&] (const fs::path
& p
) {
1070 auto fileSize
= fs::file_size(p
);
1071 if (fileSize
<= g_inline_size
) {
1072 FTRACE(4, "{} storing file {} inline\n",
1073 requestId
.tracePrefix(),
1075 auto const contents
= readFile(p
);
1076 // Size of file could theoretically change between
1077 // stat-ing and reading it.
1078 if (contents
.size() <= g_inline_size
) {
1079 return RefId
{contents
, contents
.size(), 0};
1081 fileSize
= contents
.size();
1083 // We distinguish blob files from non-blob files by making
1084 // sure non-blob files are always absolute paths (blob files
1085 // are always relative).
1086 return RefId
{fs::canonical(p
).native(), fileSize
, 0};
1090 | mapped([&] (const std::string
& b
) {
1091 if (b
.size() <= g_inline_size
) {
1092 FTRACE(4, "{} storing blob inline\n",
1093 requestId
.tracePrefix());
1094 return RefId
{b
, b
.size(), 0};
1096 FTRACE(4, "{} writing size {} blob to {}\n",
1097 requestId
.tracePrefix(), b
.size(), fd
->path().native());
1098 auto const offset
= fd
->append(b
);
1099 RefId r
{fd
->path().filename().native(), b
.size(), offset
};
1100 FTRACE(4, "{} written as {}\n", requestId
.tracePrefix(), r
.toString());
1104 | as
<std::vector
>();
1105 HPHP_CORO_MOVE_RETURN(out
);
1108 coro::Task
<std::vector
<RefValVec
>>
1109 SubprocessImpl::exec(const RequestId
& requestId
,
1110 const std::string
& command
,
1112 std::vector
<RefValVec
> inputs
,
1113 const folly::Range
<const OutputType
*>& output
,
1114 const folly::Range
<const OutputType
*>* finiOutput
) {
1115 FTRACE(4, "{} executing \"{}\" ({} runs)\n",
1116 requestId
.tracePrefix(), command
, inputs
.size());
1118 HPHP_CORO_SAFE_POINT
;
1120 // Each set of inputs should always have the same size.
1121 if (debug
&& !inputs
.empty()) {
1122 auto const size
= inputs
[0].size();
1123 for (size_t i
= 1; i
< inputs
.size(); ++i
) {
1124 always_assert(inputs
[i
].size() == size
);
1128 // Encode all the inputs
1129 BlobEncoder encoder
;
1130 auto const encodeParams
= [&] (const RefValVec
& params
) {
1131 for (auto const& param
: params
) {
1134 [&] (const RefId
& id
) {
1135 SerializedSink::encodeRefId(id
, encoder
);
1137 [&] (const Optional
<RefId
>& id
) {
1138 SerializedSink::encodeOptRefId(id
, encoder
);
1140 [&] (const IdVec
& ids
) {
1141 SerializedSink::encodeRefIdVec(ids
, encoder
);
1146 encodeParams(config
);
1147 encoder((size_t)inputs
.size());
1148 for (auto const& input
: inputs
) encodeParams(input
);
1150 // Acquire a FD. We're not actually going to write to it, but the
1151 // worker will. This ensures the worker has exclusive access to the
1152 // file while it's running.
1153 auto fd
= m_fdManager
->acquireForAppend();
1154 SCOPE_EXIT
{ if (fd
) m_fdManager
->release(*fd
); };
1156 // Do the actual fork+exec.
1157 auto const outputBlob
= HPHP_CORO_AWAIT(
1161 std::string
{(const char*)encoder
.data(), encoder
.size()},
1165 // The worker (maybe) wrote to the file, so we need to re-sync our
1169 // Decode the output
1170 BlobDecoder decoder
{outputBlob
.data(), outputBlob
.size()};
1171 auto const makeOutput
= [&] (OutputType type
) -> RefVal
{
1173 case OutputType::Val
:
1174 return SerializedSource::decodeRefId(decoder
);
1175 case OutputType::Opt
:
1176 return SerializedSource::decodeOptRefId(decoder
);
1177 case OutputType::Vec
:
1178 return SerializedSource::decodeRefIdVec(decoder
);
1180 always_assert(false);
1183 auto const makeOutputs
= [&] (const folly::Range
<const OutputType
*>& types
) {
1185 vec
.reserve(types
.size());
1186 for (auto const type
: types
) {
1187 vec
.emplace_back(makeOutput(type
));
1192 std::vector
<RefValVec
> out
;
1193 out
.reserve(inputs
.size() + (finiOutput
? 1 : 0));
1195 std::back_inserter(out
),
1197 [&] { return makeOutputs(output
); }
1199 if (finiOutput
) out
.emplace_back(makeOutputs(*finiOutput
));
1201 decoder
.assertDone();
1202 HPHP_CORO_MOVE_RETURN(out
);
// Spawn a worker subprocess for `command`, stream `inputBlob` to its
// stdin, and return the bytes the worker wrote to g_local_pipe_fd.
//
//   requestId  - used to prefix/label trace output for this request.
//   command    - job name; appears in the trace and error messages.
//   inputBlob  - bytes to write to the worker's stdin.
//   outputPath - presumably forwarded to the worker on its command
//                line -- TODO confirm.
//
// The time spent is added to stats().execCpuUsec and
// stats().execAllocatedCores is bumped. If TRACE is set in the
// environment, the worker traces into a per-request file whose
// contents are copied into this process' trace output. A worker that
// does not exit with status 0, or that does not consume all of its
// input, is reported as an execution failure (per the comment below
// this throws; the exception type should be confirmed).
1205 coro::Task
<std::string
>
1206 SubprocessImpl::doSubprocess(const RequestId
& requestId
,
1207 const std::string
& command
,
1208 std::string inputBlob
,
1209 const fs::path
& outputPath
) {
// The argument list starts with the path of the current executable.
1210 std::vector
<std::string
> args
{
1211 current_executable_path(),
1215 m_fdManager
->root().native(),
1219 // Propagate the TRACE option in the environment. We'll copy the
1220 // trace output into this process' trace output.
1221 std::vector
<std::string
> env
;
1222 Optional
<fs::path
> traceFile
;
// Hand the trace file name back to the manager on every exit path.
1223 SCOPE_EXIT
{ if (traceFile
) m_traceManager
.put(std::move(*traceFile
)); };
1224 if (auto const trace
= getenv("TRACE")) {
1225 traceFile
= m_traceManager
.get();
// Trace file names are relative; resolve against the FD manager's root.
1226 auto const fullPath
= m_fdManager
->root() / *traceFile
;
1227 env
.emplace_back(folly::sformat("TRACE={}", trace
));
1228 env
.emplace_back(folly::sformat("HPHP_TRACE_FILE={}", fullPath
.c_str()));
1232 4, "{} executing subprocess for '{}'{}(input size: {})\n",
1233 requestId
.tracePrefix(),
1236 ? folly::sformat(" (trace-file: {}) ", traceFile
->native())
1241 HPHP_CORO_SAFE_POINT
;
// Start of the interval that is charged to the exec stats below.
1243 auto const before
= std::chrono::steady_clock::now();
1245 // Do the actual fork+exec.
1246 folly::Subprocess subprocess
{
1248 folly::Subprocess::Options
{}
1249 .parentDeathSignal(SIGKILL
)
1253 .fd(g_local_pipe_fd
, folly::Subprocess::PIPE_OUT
)
// Number of bytes of inputBlob already written to the worker's stdin.
1261 size_t inputWritten
= 0;
1263 // Communicate with the worker. When available, read from worker's
1264 // stderr and output pipe and store the data. Throw everything else
1265 // away. Attempt to write inputs to the worker's stdin whenever it
1267 subprocess
.communicate(
1268 [&] (int parentFd
, int childFd
) { // Read
1269 if (childFd
== g_local_pipe_fd
) {
1270 return readFromPipe(parentFd
, output
);
1271 } else if (childFd
== STDERR_FILENO
) {
1272 return readFromPipe(parentFd
, stderr
);
1274 // Worker is writing to some other random FD (including
1275 // stdout). Ignore it.
1277 folly::readNoInt(parentFd
, dummy
, sizeof dummy
);
1281 [&] (int parentFd
, int childFd
) { // Write
1282 // Close any writable FD except stdin
1283 if (childFd
!= STDIN_FILENO
) return true;
1284 // We've written all of the input, so close stdin. This will
1285 // signal to the worker that input is all sent.
1286 if (inputWritten
>= inputBlob
.size()) return true;
1288 // Otherwise write what we can
1289 auto const toWrite
= inputBlob
.size() - inputWritten
;
1290 auto const written
=
1291 folly::writeNoInt(parentFd
, inputBlob
.data() + inputWritten
, toWrite
);
// EAGAIN: nothing could be written right now. Returning false
// (rather than true, which closes the FD as above) presumably leaves
// stdin open so the write is attempted again later.
1293 if (errno
== EAGAIN
) return false;
1296 "Failed writing {} bytes to subprocess input for '{}' [{}]",
1297 toWrite
, command
, folly::errnoStr(errno
)
1300 } else if (written
== 0) {
1303 inputWritten
+= written
;
1308 auto const returnCode
= subprocess
.wait();
1309 auto const elapsed
= std::chrono::steady_clock::now() - before
;
// Note that this is elapsed steady-clock time for the whole
// subprocess, recorded under the "CPU usec" counter.
1311 stats().execCpuUsec
+=
1312 std::chrono::duration_cast
<std::chrono::microseconds
>(elapsed
).count();
1313 ++stats().execAllocatedCores
;
1317 "{} subprocess finished (took {}). "
1318 "Output size: {}, Return code: {}, Stderr: {}\n",
1319 requestId
.tracePrefix(),
1321 std::chrono::duration_cast
<std::chrono::duration
<double>>(
1332 HPHP_CORO_SAFE_POINT
;
1334 // Do this before checking the return code. If the process failed,
1335 // we want to capture anything it logged before throwing.
1336 if (traceFile
&& fs::exists(m_fdManager
->root() / *traceFile
)) {
1337 auto const contents
= readFile(m_fdManager
->root() / *traceFile
);
1338 if (!contents
.empty()) {
1339 Trace::ftraceRelease(
1340 "vvvvvvvvvvvvvvvvvv remote-exec ({} \"{}\") vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv\n"
1342 "^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n",
1343 requestId
.toString(),
// Anything other than a normal exit with status 0 is a failure.
1350 if (!returnCode
.exited() || returnCode
.exitStatus() != 0) {
1353 "Execution of `{}` failed: {}\nstderr:\n{}",
// A worker that exited cleanly but left input unread is also treated
// as a failure.
1360 if (inputWritten
!= inputBlob
.size()) {
1363 "Execution of `{}` failed: Process failed to consume input\n",
1369 HPHP_CORO_MOVE_RETURN(output
);
1372 //////////////////////////////////////////////////////////////////////
// Construct a manager for blob files. `root` is stored in m_root,
// which is the directory new blob files are created under.
1374 SubprocessImpl::FDManager::FDManager(fs::path root
)
1375 : m_root
{std::move(root
)}
1379 const FD
* SubprocessImpl::FDManager::acquireForRead(const fs::path
& path
) {
1380 // Blob files are always relative
1381 if (path
.is_absolute()) return nullptr;
1382 // Only already created blob files should be in m_fds. So, if it's
1383 // not present, it can't be a blob file.
1384 auto const it
= m_fds
.find(path
.native());
1385 if (it
== m_fds
.end()) return nullptr;
1386 return it
->second
.get();
// Hand out a blob file that can be appended to. The whole operation
// runs under m_lock. If m_forAppend has no free blob file, a new one
// is registered in m_fds (keyed by its bare filename) and pushed onto
// m_forAppend first. The file handed out is the one on top of
// m_forAppend; release() pushes it back once the caller is done.
1389 FD
* SubprocessImpl::FDManager::acquireForAppend() {
1390 // Use the blob file at the top of m_forAppend.
1391 std::scoped_lock
<std::mutex
> _
{m_lock
};
1392 if (m_forAppend
.empty()) {
1393 // If empty, there's no free blob file. We need to create a new
// Copy the path out first: `fd` is moved into m_fds just below.
1396 auto const path
= fd
->path();
1397 auto const [elem
, emplaced
] =
1398 m_fds
.emplace(path
.filename().native(), std::move(fd
));
// m_fds keeps ownership; m_forAppend only holds a raw pointer.
1400 m_forAppend
.push(elem
->second
.get());
1402 auto fd
= m_forAppend
.top();
// Give back a blob file so that it can be handed out for appending
// again. `fd` must be the instance that m_fds tracks under its
// filename; this is asserted. The file is then pushed onto
// m_forAppend under m_lock.
//
// NOTE(review): the m_fds lookup happens before m_lock is taken;
// presumably m_fds is safe for concurrent lookups -- confirm.
1407 void SubprocessImpl::FDManager::release(FD
& fd
) {
1409 // Should be returning something cached
1410 auto const it
= m_fds
.find(fd
.path().filename().native());
1411 always_assert(it
!= m_fds
.end());
1412 always_assert(it
->second
.get() == &fd
);
1414 std::scoped_lock
<std::mutex
> _
{m_lock
};
1415 m_forAppend
.push(&fd
);
1418 std::unique_ptr
<FD
> SubprocessImpl::FDManager::newFD() {
1419 // We deliberately keep the blob filename as small as possible
1420 // because they get serialized a lot and it keeps the input/output
1422 auto const id
= m_nextBlob
++;
1423 auto const filename
= m_root
/ folly::to
<std::string
>(id
);
1424 return std::make_unique
<FD
>(filename
, true, true, true);
1427 //////////////////////////////////////////////////////////////////////
// Obtain a trace file name for exclusive use by one subprocess.
// Names previously handed back via put() are reused; only when none
// is available is a new "trace-NNNN.log" name generated from
// m_nextId. All access to m_paths happens under m_lock.
1429 fs::path
SubprocessImpl::TraceFileManager::get() {
1430 std::scoped_lock
<std::mutex
> _
{m_lock
};
1431 if (m_paths
.empty()) {
1432 auto const id
= m_nextId
++;
1433 m_paths
.push(folly::sformat("trace-{:04}.log", id
));
// m_paths is guaranteed non-empty here; take the top entry.
1435 auto path
= std::move(m_paths
.top());
1440 void SubprocessImpl::TraceFileManager::put(fs::path path
) {
1441 std::scoped_lock
<std::mutex
> _
{m_lock
};
1442 m_paths
.push(std::move(path
));
1445 //////////////////////////////////////////////////////////////////////
1449 //////////////////////////////////////////////////////////////////////
// Construct a client and choose the implementation (m_impl) that
// backs it.
//
//   executor - passed to the implementation hook, if one is used.
//   options  - copied into m_options; m_useSubprocess controls
//              which implementation may be chosen.
//
// Throws Error if no implementation was obtained and the options
// forbid the subprocess implementation (UseSubprocess::Never).
1451 Client::Client(folly::Executor::KeepAlive
<> executor
,
1452 const Options
& options
)
1453 : m_options
{options
}
1454 , m_stats
{std::make_shared
<Stats
>()}
1455 , m_forceFallback
{false}
// Times the implementation lookup/creation below.
1458 Timer _
{"create impl"};
1459 // Look up which implementation to use. If a hook has been
1460 // registered, and we're allowed to use it according to the Options,
1463 m_options
.m_useSubprocess
!= Options::UseSubprocess::Always
) {
1464 m_impl
= g_impl_hook(m_options
, executor
, *this);
1466 // The hook can return nullptr even if registered. In each case, we
1467 // have no special implementation to use.
1469 // Use the subprocess implementation (which is always available),
1470 // unless the Options specifies we shouldn't. If not, it's a fatal
1472 if (m_options
.m_useSubprocess
== Options::UseSubprocess::Never
) {
1473 throw Error
{"No non-subprocess impl available"};
1475 m_impl
= std::make_unique
<SubprocessImpl
>(m_options
, *this);
1477 FTRACE(2, "created \"{}\" impl\n", m_impl
->name());
// NOTE(review): presumably the body of Client's destructor -- confirm.
// Time the teardown. The label is produced lazily by the lambda, which
// reads the implementation's name through m_impl.
1481 Timer _
{[&] { return folly::sformat("destroy impl {}", m_impl
->name()); }};
// Drop the fallback implementation (if any).
1483 m_fallbackImpl
.reset();
// Create the implementation used for local fallback. The fallback
// is always a SubprocessImpl built from this client's options. A
// message is emitted to tell the user that operations may run slower
// from now on.
1486 std::unique_ptr
<Client::Impl
> Client::makeFallbackImpl() {
1487 // This will be called once from within LockFreeLazy, so we only
1488 // emit this warning once.
1490 "Certain operations will use local fallback from this "
1491 "point on and may run slower."
1493 return std::make_unique
<SubprocessImpl
>(m_options
, *this);
// Store a single file and return a Ref to it.
//
//   path       - the file to store.
//   optimistic - only visibly used for the trace message here;
//                presumably also forwarded to the implementation --
//                TODO confirm.
//
// The store is attempted via tryWithFallback(); the returned Ref
// carries the `wasFallback` flag alongside the id. Updates the
// storeCalls, storeLatencyUsec and (on fallback) fileFallbacks stats.
1496 coro::Task
<Ref
<std::string
>> Client::storeFile(fs::path path
,
// Identifies this request in trace output and measures its latency.
1498 RequestId requestId
{"store file"};
1501 2, "{} storing {}{}\n",
1502 requestId
.tracePrefix(),
1504 optimistic
? " (optimistically)" : ""
1508 ++m_stats
->storeCalls
;
// Record how long the request has been running, in microseconds.
1510 m_stats
->storeLatencyUsec
+= std::chrono::duration_cast
<
1511 std::chrono::microseconds
1512 >(requestId
.elapsed()).count();
1515 auto wasFallback
= false;
1516 auto ids
= HPHP_CORO_AWAIT(tryWithFallback
<IdVec
>(
1517 [&] (Impl
& i
, bool isFallback
) {
1518 if (isFallback
) ++m_stats
->fileFallbacks
;
// One file was stored, so exactly one id is expected back.
1528 assertx(ids
.size() == 1);
1530 Ref
<std::string
> ref
{std::move(ids
[0]), wasFallback
};
1531 HPHP_CORO_MOVE_RETURN(ref
);
// Store a batch of files and return one Ref per input path.
//
//   paths      - the files to store.
//   optimistic - forwarded to the implementation's store() call.
//
// The store is attempted via tryWithFallback(); every returned Ref
// carries the same `wasFallback` flag. Updates the files, storeCalls,
// storeLatencyUsec and (on fallback) fileFallbacks stats.
1534 coro::Task
<std::vector
<Ref
<std::string
>>>
1535 Client::storeFile(std::vector
<fs::path
> paths
,
// Identifies this request in trace output and measures its latency.
1537 RequestId requestId
{"store files"};
1540 2, "{} storing {} files{}\n",
1541 requestId
.tracePrefix(),
1543 optimistic
? " (optimistically)" : ""
1546 for (auto const& p
: paths
) {
1547 FTRACE(4, "{} storing {}\n", requestId
.tracePrefix(), p
.native());
1551 m_stats
->files
+= paths
.size();
1552 ++m_stats
->storeCalls
;
// Record how long the request has been running, in microseconds.
1554 m_stats
->storeLatencyUsec
+= std::chrono::duration_cast
<
1555 std::chrono::microseconds
1556 >(requestId
.elapsed()).count();
// Remembered only for the size check after the store (debug builds).
1559 auto const DEBUG_ONLY size
= paths
.size();
1560 auto wasFallback
= false;
1561 auto ids
= HPHP_CORO_AWAIT(tryWithFallback
<IdVec
>(
1562 [&] (Impl
& i
, bool isFallback
) {
// Every file in the batch counts as a fallback.
1563 if (isFallback
) m_stats
->fileFallbacks
+= paths
.size();
1564 return i
.store(requestId
, paths
, {}, optimistic
);
// One id is expected back per input path.
1568 assertx(ids
.size() == size
);
// Wrap each id in a Ref tagged with whether fallback was used.
1570 auto out
= from(ids
)
1572 | mapped([&] (auto&& id
) {
1573 return Ref
<std::string
>{std::move(id
), wasFallback
};
1575 | as
<std::vector
>();
1576 HPHP_CORO_MOVE_RETURN(out
);
1579 //////////////////////////////////////////////////////////////////////
1581 // Sleep some random amount corresponding to the number of retries
1583 void Client::Impl::throttleSleep(size_t retry
,
1584 std::chrono::milliseconds base
) {
1585 // Each retry doubles the size of the window. We select a random
1586 // amount from within that.
1587 auto const scale
= uint64_t{1} << std::min(retry
, size_t{16});
1588 auto const window
= std::chrono::microseconds
{base
} * scale
;
1590 static std::mt19937_64 engine
= [&] {
1591 std::random_device rd
;
1592 std::seed_seq seed
{rd(), rd(), rd(), rd(), rd(), rd(), rd(), rd()};
1593 return std::mt19937_64
{seed
};
1596 std::uniform_int_distribution
<> distrib(0, window
.count());
1597 auto const wait
= [&] {
1598 static std::mutex lock
;
1599 std::lock_guard
<std::mutex
> _
{lock
};
1600 return std::chrono::microseconds
{distrib(engine
)};
1603 // Use normal sleep here, not coro friendly sleep. The whole purpose
1604 // is to slow down execution and using a coro sleep will just allow
1605 // a lot of other actions to run.
1606 std::this_thread::sleep_for(wait
);
1609 //////////////////////////////////////////////////////////////////////
// Render the counters as a multi-line, human-readable report
// covering execs, files, blobs, CPU, memory, downloads/throttles and
// average latencies.
//
//   phase - presumably labels the report -- TODO confirm.
//   extra - optional additional text; when non-empty it is inserted
//           into the report preceded by a space.
//
// Averages and per-core figures are reported as zero when their
// divisor is zero.
1611 std::string
Client::Stats::toString(const std::string
& phase
,
1612 const std::string
& extra
) const {
// Pretty-print a byte count, dropping a trailing space if present.
1613 auto const bytes
= [] (size_t b
) {
1614 auto s
= folly::prettyPrint(
1618 if (!s
.empty() && s
[s
.size()-1] == ' ') s
.resize(s
.size()-1);
// Pretty-print a duration given in microseconds (converted to
// seconds first), dropping a trailing space if present.
1622 auto const usecs
= [] (size_t t
) {
1623 auto s
= prettyPrint(
1624 double(t
) / 1000000.0,
1625 folly::PRETTY_TIME_HMS
,
1628 if (!s
.empty() && s
[s
.size()-1] == ' ') s
.resize(s
.size()-1);
// `a` as a percentage of `b`.
1632 auto const pct
= [] (size_t a
, size_t b
) {
1634 return double(a
) / b
* 100.0;
// Load these once so that the zero checks and the divisions below
// operate on the same values.
1637 auto const execs_
= execs
.load();
1638 auto const allocatedCores
= execAllocatedCores
.load();
1639 auto const cpuUsecs
= execCpuUsec
.load();
1640 auto const execCalls_
= execCalls
.load();
1641 auto const storeCalls_
= storeCalls
.load();
1642 auto const loadCalls_
= loadCalls
.load();
1644 return folly::sformat(
1646 " Execs: {:,} total, {:,} cache-hits ({:.2f}%), {:,} optimistically, {:,} fallback\n"
1647 " Files: {:,} total, {:,} read, {:,} queried, {:,} uploaded ({}), {:,} fallback\n"
1648 " Blobs: {:,} total, {:,} queried, {:,} uploaded ({}), {:,} fallback\n"
1649 " Cpu: {} usage, {:,} allocated cores ({}/core)\n"
1650 " Mem: {} max used, {} reserved\n"
1651 " {:,} downloads ({}), {:,} throttles\n"
1652 " Avg Latency: {} exec, {} store, {} load",
1654 extra
.empty() ? "" : folly::sformat(" {}", extra
),
1656 execCacheHits
.load(),
1657 pct(execCacheHits
.load(), execs_
),
1658 optimisticExecs
.load(),
1659 execFallbacks
.load(),
1662 filesQueried
.load(),
1663 filesUploaded
.load(),
1664 bytes(fileBytesUploaded
.load()),
1665 fileFallbacks
.load(),
1667 blobsQueried
.load(),
1668 blobsUploaded
.load(),
1669 bytes(blobBytesUploaded
.load()),
1670 blobFallbacks
.load(),
1673 usecs(allocatedCores
? (cpuUsecs
/ allocatedCores
) : 0),
1674 bytes(execMaxUsedMem
.load()),
1675 bytes(execReservedMem
.load()),
1677 bytes(bytesDownloaded
.load()),
1679 usecs(execCalls_
? (execLatencyUsec
.load() / execCalls_
) : 0),
1680 usecs(storeCalls_
? (storeLatencyUsec
.load() / storeCalls_
) : 0),
1681 usecs(loadCalls_
? (loadLatencyUsec
.load() / loadCalls_
) : 0)
// Record the counters into a structured log entry.
//
//   phase  - prefix for every key written; keys have the form
//            "<phase>_<counter name>".
//   sample - the entry which receives the values.
//
// Average latencies are reported as zero when the corresponding call
// count is zero.
1685 void Client::Stats::logSample(const std::string
& phase
,
1686 StructuredLogEntry
& sample
) const {
// Execs
1687 sample
.setInt(phase
+ "_total_execs", execs
.load());
1688 sample
.setInt(phase
+ "_cache_hits", execCacheHits
.load());
1689 sample
.setInt(phase
+ "_optimistically", optimisticExecs
.load());
1690 sample
.setInt(phase
+ "_fallbacks", execFallbacks
.load());
// Files
1692 sample
.setInt(phase
+ "_total_files", files
.load());
1693 sample
.setInt(phase
+ "_file_reads", filesRead
.load());
1694 sample
.setInt(phase
+ "_file_queries", filesQueried
.load());
1695 sample
.setInt(phase
+ "_file_stores", filesUploaded
.load());
1696 sample
.setInt(phase
+ "_file_stores_bytes", fileBytesUploaded
.load());
1697 sample
.setInt(phase
+ "_file_fallbacks", fileFallbacks
.load());
// Blobs
1699 sample
.setInt(phase
+ "_total_blobs", blobs
.load());
1700 sample
.setInt(phase
+ "_blob_queries", blobsQueried
.load());
1701 sample
.setInt(phase
+ "_blob_stores", blobsUploaded
.load());
1702 sample
.setInt(phase
+ "_blob_stores_bytes", blobBytesUploaded
.load());
1703 sample
.setInt(phase
+ "_blob_fallbacks", blobFallbacks
.load());
// Downloads and throttling
1705 sample
.setInt(phase
+ "_total_loads", downloads
.load());
1706 sample
.setInt(phase
+ "_total_loads_bytes", bytesDownloaded
.load());
1707 sample
.setInt(phase
+ "_throttles", throttles
.load());
// Execution resources
1709 sample
.setInt(phase
+ "_exec_cpu_usec", execCpuUsec
.load());
1710 sample
.setInt(phase
+ "_exec_allocated_cores", execAllocatedCores
.load());
1711 sample
.setInt(phase
+ "_exec_max_used_mem", execMaxUsedMem
.load());
1712 sample
.setInt(phase
+ "_exec_reserved_mem", execReservedMem
.load());
// Load the call counts once so that the zero checks and the
// divisions below operate on the same values.
1714 auto const execCalls_
= execCalls
.load();
1715 auto const storeCalls_
= storeCalls
.load();
1716 auto const loadCalls_
= loadCalls
.load();
1719 phase
+ "_avg_exec_latency_usec",
1720 execCalls_
? (execLatencyUsec
.load() / execCalls_
) : 0
1723 phase
+ "_avg_store_latency_usec",
1724 storeCalls_
? (storeLatencyUsec
.load() / storeCalls_
) : 0
1727 phase
+ "_avg_load_latency_usec",
1728 loadCalls_
? (loadLatencyUsec
.load() / loadCalls_
) : 0
1732 //////////////////////////////////////////////////////////////////////