2 * Copyright (c) Meta Platforms, Inc. and affiliates.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
21 #include <folly/concurrency/CacheLocality.h>
25 template <typename DigestT
>
26 DigestBuilder
<DigestT
>::DigestBuilder(size_t bufferSize
, size_t digestSize
)
27 : bufferSize_(bufferSize
), digestSize_(digestSize
) {
28 auto& cl
= CacheLocality::system();
29 cpuLocalBuffers_
.resize(cl
.numCachesByLevel
[0]);
32 template <typename DigestT
>
33 DigestT DigestBuilder
<DigestT
>::build() {
34 std::vector
<std::vector
<double>> valuesVec
;
35 std::vector
<std::unique_ptr
<DigestT
>> digestPtrs
;
36 valuesVec
.reserve(cpuLocalBuffers_
.size());
37 digestPtrs
.reserve(cpuLocalBuffers_
.size());
39 for (auto& cpuLocalBuffer
: cpuLocalBuffers_
) {
40 std::unique_lock
<SharedMutexSuppressTSAN
> g(cpuLocalBuffer
.mutex
);
41 valuesVec
.push_back(std::move(cpuLocalBuffer
.buffer
));
42 if (cpuLocalBuffer
.digest
) {
43 digestPtrs
.push_back(std::move(cpuLocalBuffer
.digest
));
47 std::vector
<DigestT
> digests
;
48 digests
.reserve(digestPtrs
.size());
49 for (auto& digestPtr
: digestPtrs
) {
50 digests
.push_back(std::move(*digestPtr
));
54 for (const auto& vec
: valuesVec
) {
58 std::vector
<double> values
;
59 values
.reserve(count
);
60 for (const auto& vec
: valuesVec
) {
61 values
.insert(values
.end(), vec
.begin(), vec
.end());
63 DigestT
digest(digestSize_
);
64 digests
.push_back(digest
.merge(values
));
66 return DigestT::merge(digests
);
69 template <typename DigestT
>
70 void DigestBuilder
<DigestT
>::append(double value
) {
71 auto cpuLocalBuf
= &cpuLocalBuffers_
[AccessSpreader
<>::cachedCurrent(
72 cpuLocalBuffers_
.size())];
73 std::unique_lock
<SharedMutexSuppressTSAN
> g(cpuLocalBuf
->mutex
);
74 cpuLocalBuf
->buffer
.push_back(value
);
75 if (cpuLocalBuf
->buffer
.size() == bufferSize_
) {
76 if (!cpuLocalBuf
->digest
) {
77 cpuLocalBuf
->digest
= std::make_unique
<DigestT
>(digestSize_
);
79 *cpuLocalBuf
->digest
= cpuLocalBuf
->digest
->merge(cpuLocalBuf
->buffer
);
80 cpuLocalBuf
->buffer
.clear();