Remove DnsConfigServiceTest.GetSystemConfig test
[chromium-blink-merge.git] / content / child / shared_memory_data_consumer_handle.cc
blobf5d6e69f5654f0ed6bc13c0519ae3f9c830cfe07
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "content/child/shared_memory_data_consumer_handle.h"
7 #include <algorithm>
8 #include <deque>
9 #include <vector>
11 #include "base/bind.h"
12 #include "base/message_loop/message_loop.h"
13 #include "base/single_thread_task_runner.h"
14 #include "base/synchronization/lock.h"
15 #include "content/public/child/fixed_received_data.h"
17 namespace content {
19 namespace {
21 class DelegateThreadSafeReceivedData final
22 : public RequestPeer::ThreadSafeReceivedData {
23 public:
24 explicit DelegateThreadSafeReceivedData(
25 scoped_ptr<RequestPeer::ReceivedData> data)
26 : data_(data.Pass()),
27 task_runner_(base::MessageLoop::current()->task_runner()) {}
28 ~DelegateThreadSafeReceivedData() override {
29 if (!task_runner_->BelongsToCurrentThread()) {
30 // Delete the data on the original thread.
31 task_runner_->DeleteSoon(FROM_HERE, data_.release());
35 const char* payload() const override { return data_->payload(); }
36 int length() const override { return data_->length(); }
37 int encoded_length() const override { return data_->encoded_length(); }
39 private:
40 scoped_ptr<RequestPeer::ReceivedData> data_;
41 scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
43 DISALLOW_COPY_AND_ASSIGN(DelegateThreadSafeReceivedData);
46 } // namespace
48 using Result = blink::WebDataConsumerHandle::Result;
50 class SharedMemoryDataConsumerHandle::Context final
51 : public base::RefCountedThreadSafe<Context> {
52 public:
53 Context()
54 : result_(Ok),
55 first_offset_(0),
56 client_(nullptr),
57 is_reader_active_(true) {}
59 bool IsEmpty() const { return queue_.empty(); }
60 void Clear() {
61 for (auto& data : queue_) {
62 delete data;
64 queue_.clear();
65 first_offset_ = 0;
66 client_ = nullptr;
68 void Notify() {
69 // Note that this function is not protected by |lock_| (actually it
70 // shouldn't be) but |notification_task_runner_| is thread-safe.
72 if (notification_task_runner_->BelongsToCurrentThread()) {
73 NotifyImmediately();
74 } else {
75 notification_task_runner_->PostTask(
76 FROM_HERE, base::Bind(&Context::NotifyImmediately, this));
80 RequestPeer::ThreadSafeReceivedData* Top() { return queue_.front(); }
81 void Push(scoped_ptr<RequestPeer::ThreadSafeReceivedData> data) {
82 queue_.push_back(data.release());
84 size_t first_offset() const { return first_offset_; }
85 Result result() const { return result_; }
86 void set_result(Result r) { result_ = r; }
87 Client* client() { return client_; }
88 void SetClient(Client* client) {
89 if (client) {
90 notification_task_runner_ = base::MessageLoop::current()->task_runner();
91 client_ = client;
92 } else {
93 notification_task_runner_ = nullptr;
94 client_ = nullptr;
97 bool is_reader_active() const { return is_reader_active_; }
98 void set_is_reader_active(bool b) { is_reader_active_ = b; }
99 void Consume(size_t s) {
100 first_offset_ += s;
101 auto top = Top();
102 if (static_cast<size_t>(top->length()) <= first_offset_) {
103 delete top;
104 queue_.pop_front();
105 first_offset_ = 0;
108 base::Lock& lock() { return lock_; }
110 private:
111 friend class base::RefCountedThreadSafe<Context>;
112 ~Context() {
113 // This is necessary because the queue stores raw pointers.
114 Clear();
117 void NotifyImmediately() {
118 // As we can assume that all reader-side methods are called on this
119 // thread (see WebDataConsumerHandle comments), we don't need to lock.
120 if (client_)
121 client_->didGetReadable();
124 base::Lock lock_;
125 // |result_| stores the ultimate state of this handle if it has. Otherwise,
126 // |Ok| is set.
127 Result result_;
128 // TODO(yhirano): Use std::deque<scoped_ptr<ThreadSafeReceivedData>>
129 // once it is allowed.
130 std::deque<RequestPeer::ThreadSafeReceivedData*> queue_;
131 size_t first_offset_;
132 Client* client_;
133 scoped_refptr<base::SingleThreadTaskRunner> notification_task_runner_;
134 bool is_reader_active_;
136 DISALLOW_COPY_AND_ASSIGN(Context);
139 SharedMemoryDataConsumerHandle::Writer::Writer(
140 const scoped_refptr<Context>& context,
141 BackpressureMode mode)
142 : context_(context), mode_(mode) {
145 SharedMemoryDataConsumerHandle::Writer::~Writer() {
146 Close();
149 void SharedMemoryDataConsumerHandle::Writer::AddData(
150 scoped_ptr<RequestPeer::ReceivedData> data) {
151 if (!data->length()) {
152 // We omit empty data.
153 return;
156 bool needs_notification = false;
158 base::AutoLock lock(context_->lock());
159 if (!context_->is_reader_active()) {
160 // No one is interested in the data.
161 return;
164 needs_notification = context_->client() && context_->IsEmpty();
165 scoped_ptr<RequestPeer::ThreadSafeReceivedData> data_to_pass;
166 if (mode_ == kApplyBackpressure) {
167 data_to_pass =
168 make_scoped_ptr(new DelegateThreadSafeReceivedData(data.Pass()));
169 } else {
170 data_to_pass = make_scoped_ptr(new FixedReceivedData(data.get()));
172 context_->Push(data_to_pass.Pass());
175 if (needs_notification)
176 context_->Notify();
179 void SharedMemoryDataConsumerHandle::Writer::Close() {
180 bool needs_notification = false;
183 base::AutoLock lock(context_->lock());
184 if (context_->result() == Ok) {
185 context_->set_result(Done);
186 needs_notification = context_->client() && context_->IsEmpty();
189 if (needs_notification)
190 context_->Notify();
193 SharedMemoryDataConsumerHandle::SharedMemoryDataConsumerHandle(
194 BackpressureMode mode,
195 scoped_ptr<Writer>* writer)
196 : context_(new Context) {
197 writer->reset(new Writer(context_, mode));
200 SharedMemoryDataConsumerHandle::~SharedMemoryDataConsumerHandle() {
201 base::AutoLock lock(context_->lock());
202 context_->set_is_reader_active(false);
203 context_->Clear();
206 Result SharedMemoryDataConsumerHandle::read(void* data,
207 size_t size,
208 Flags flags,
209 size_t* read_size_to_return) {
210 base::AutoLock lock(context_->lock());
212 size_t total_read_size = 0;
213 *read_size_to_return = 0;
214 if (context_->result() != Ok && context_->result() != Done)
215 return context_->result();
217 while (!context_->IsEmpty() && total_read_size < size) {
218 const auto& top = context_->Top();
219 size_t readable = top->length() - context_->first_offset();
220 size_t writable = size - total_read_size;
221 size_t read_size = std::min(readable, writable);
222 const char* begin = top->payload() + context_->first_offset();
223 std::copy(begin, begin + read_size,
224 static_cast<char*>(data) + total_read_size);
225 total_read_size += read_size;
226 context_->Consume(read_size);
228 *read_size_to_return = total_read_size;
229 return total_read_size ? Ok : context_->result() == Done ? Done : ShouldWait;
232 Result SharedMemoryDataConsumerHandle::beginRead(const void** buffer,
233 Flags flags,
234 size_t* available) {
235 *buffer = nullptr;
236 *available = 0;
238 base::AutoLock lock(context_->lock());
240 if (context_->result() != Ok && context_->result() != Done)
241 return context_->result();
243 if (context_->IsEmpty())
244 return context_->result() == Done ? Done : ShouldWait;
246 const auto& top = context_->Top();
247 *buffer = top->payload() + context_->first_offset();
248 *available = top->length() - context_->first_offset();
250 return Ok;
253 Result SharedMemoryDataConsumerHandle::endRead(size_t read_size) {
254 base::AutoLock lock(context_->lock());
256 if (context_->IsEmpty())
257 return UnexpectedError;
259 context_->Consume(read_size);
260 return Ok;
263 void SharedMemoryDataConsumerHandle::registerClient(Client* client) {
264 bool needs_notification = false;
266 base::AutoLock lock(context_->lock());
268 context_->SetClient(client);
269 needs_notification = !context_->IsEmpty();
271 if (needs_notification)
272 context_->Notify();
275 void SharedMemoryDataConsumerHandle::unregisterClient() {
276 base::AutoLock lock(context_->lock());
278 context_->SetClient(nullptr);
281 } // namespace content