1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "content/browser/byte_stream.h"
11 #include "base/bind.h"
12 #include "base/location.h"
13 #include "base/memory/ref_counted.h"
14 #include "base/sequenced_task_runner.h"
19 typedef std::deque
<std::pair
<scoped_refptr
<net::IOBuffer
>, size_t> >
22 class ByteStreamReaderImpl
;
24 // A poor man's weak pointer; a RefCountedThreadSafe boolean that can be
25 // cleared in an object destructor and accessed to check for object
26 // existence. We can't use weak pointers because they're tightly tied to
27 // threads rather than task runners.
28 // TODO(rdsmith): A better solution would be extending weak pointers
29 // to support SequencedTaskRunners.
30 struct LifetimeFlag
: public base::RefCountedThreadSafe
<LifetimeFlag
> {
32 LifetimeFlag() : is_alive(true) { }
36 friend class base::RefCountedThreadSafe
<LifetimeFlag
>;
37 virtual ~LifetimeFlag() { }
40 DISALLOW_COPY_AND_ASSIGN(LifetimeFlag
);
43 // For both ByteStreamWriterImpl and ByteStreamReaderImpl, Construction and
44 // SetPeer may happen anywhere; all other operations on each class must
45 // happen in the context of their SequencedTaskRunner.
46 class ByteStreamWriterImpl
: public ByteStreamWriter
{
48 ByteStreamWriterImpl(scoped_refptr
<base::SequencedTaskRunner
> task_runner
,
49 scoped_refptr
<LifetimeFlag
> lifetime_flag
,
51 ~ByteStreamWriterImpl() override
;
53 // Must be called before any operations are performed.
54 void SetPeer(ByteStreamReaderImpl
* peer
,
55 scoped_refptr
<base::SequencedTaskRunner
> peer_task_runner
,
56 scoped_refptr
<LifetimeFlag
> peer_lifetime_flag
);
58 // Overridden from ByteStreamWriter.
59 bool Write(scoped_refptr
<net::IOBuffer
> buffer
, size_t byte_count
) override
;
60 void Flush() override
;
61 void Close(int status
) override
;
62 void RegisterCallback(const base::Closure
& source_callback
) override
;
63 size_t GetTotalBufferedBytes() const override
;
65 // PostTask target from |ByteStreamReaderImpl::MaybeUpdateInput|.
66 static void UpdateWindow(scoped_refptr
<LifetimeFlag
> lifetime_flag
,
67 ByteStreamWriterImpl
* target
,
68 size_t bytes_consumed
);
71 // Called from UpdateWindow when object existence has been validated.
72 void UpdateWindowInternal(size_t bytes_consumed
);
74 void PostToPeer(bool complete
, int status
);
76 const size_t total_buffer_size_
;
78 // All data objects in this class are only valid to access on
79 // this task runner except as otherwise noted.
80 scoped_refptr
<base::SequencedTaskRunner
> my_task_runner_
;
82 // True while this object is alive.
83 scoped_refptr
<LifetimeFlag
> my_lifetime_flag_
;
85 base::Closure space_available_callback_
;
86 ContentVector input_contents_
;
87 size_t input_contents_size_
;
89 // ** Peer information.
91 scoped_refptr
<base::SequencedTaskRunner
> peer_task_runner_
;
93 // How much we've sent to the output that for flow control purposes we
94 // must assume hasn't been read yet.
95 size_t output_size_used_
;
97 // Only valid to access on peer_task_runner_.
98 scoped_refptr
<LifetimeFlag
> peer_lifetime_flag_
;
100 // Only valid to access on peer_task_runner_ if
101 // |*peer_lifetime_flag_ == true|
102 ByteStreamReaderImpl
* peer_
;
105 class ByteStreamReaderImpl
: public ByteStreamReader
{
107 ByteStreamReaderImpl(scoped_refptr
<base::SequencedTaskRunner
> task_runner
,
108 scoped_refptr
<LifetimeFlag
> lifetime_flag
,
110 ~ByteStreamReaderImpl() override
;
112 // Must be called before any operations are performed.
113 void SetPeer(ByteStreamWriterImpl
* peer
,
114 scoped_refptr
<base::SequencedTaskRunner
> peer_task_runner
,
115 scoped_refptr
<LifetimeFlag
> peer_lifetime_flag
);
117 // Overridden from ByteStreamReader.
118 StreamState
Read(scoped_refptr
<net::IOBuffer
>* data
, size_t* length
) override
;
119 int GetStatus() const override
;
120 void RegisterCallback(const base::Closure
& sink_callback
) override
;
122 // PostTask target from |ByteStreamWriterImpl::Write| and
123 // |ByteStreamWriterImpl::Close|.
124 // Receive data from our peer.
125 // static because it may be called after the object it is targeting
126 // has been destroyed. It may not access |*target|
127 // if |*object_lifetime_flag| is false.
128 static void TransferData(
129 scoped_refptr
<LifetimeFlag
> object_lifetime_flag
,
130 ByteStreamReaderImpl
* target
,
131 scoped_ptr
<ContentVector
> transfer_buffer
,
132 size_t transfer_buffer_bytes
,
133 bool source_complete
,
137 // Called from TransferData once object existence has been validated.
138 void TransferDataInternal(
139 scoped_ptr
<ContentVector
> transfer_buffer
,
140 size_t transfer_buffer_bytes
,
141 bool source_complete
,
144 void MaybeUpdateInput();
146 const size_t total_buffer_size_
;
148 scoped_refptr
<base::SequencedTaskRunner
> my_task_runner_
;
150 // True while this object is alive.
151 scoped_refptr
<LifetimeFlag
> my_lifetime_flag_
;
153 ContentVector available_contents_
;
155 bool received_status_
;
158 base::Closure data_available_callback_
;
160 // Time of last point at which data in stream transitioned from full
161 // to non-full. Nulled when a callback is sent.
162 base::Time last_non_full_time_
;
164 // ** Peer information
166 scoped_refptr
<base::SequencedTaskRunner
> peer_task_runner_
;
168 // How much has been removed from this class that we haven't told
169 // the input about yet.
170 size_t unreported_consumed_bytes_
;
172 // Only valid to access on peer_task_runner_.
173 scoped_refptr
<LifetimeFlag
> peer_lifetime_flag_
;
175 // Only valid to access on peer_task_runner_ if
176 // |*peer_lifetime_flag_ == true|
177 ByteStreamWriterImpl
* peer_
;
180 ByteStreamWriterImpl::ByteStreamWriterImpl(
181 scoped_refptr
<base::SequencedTaskRunner
> task_runner
,
182 scoped_refptr
<LifetimeFlag
> lifetime_flag
,
184 : total_buffer_size_(buffer_size
),
185 my_task_runner_(task_runner
),
186 my_lifetime_flag_(lifetime_flag
),
187 input_contents_size_(0),
188 output_size_used_(0),
190 DCHECK(my_lifetime_flag_
.get());
191 my_lifetime_flag_
->is_alive
= true;
194 ByteStreamWriterImpl::~ByteStreamWriterImpl() {
195 // No RunsTasksOnCurrentThread() check to allow deleting a created writer
196 // before we start using it. Once started, should be deleted on the specified
198 my_lifetime_flag_
->is_alive
= false;
201 void ByteStreamWriterImpl::SetPeer(
202 ByteStreamReaderImpl
* peer
,
203 scoped_refptr
<base::SequencedTaskRunner
> peer_task_runner
,
204 scoped_refptr
<LifetimeFlag
> peer_lifetime_flag
) {
206 peer_task_runner_
= peer_task_runner
;
207 peer_lifetime_flag_
= peer_lifetime_flag
;
210 bool ByteStreamWriterImpl::Write(
211 scoped_refptr
<net::IOBuffer
> buffer
, size_t byte_count
) {
212 DCHECK(my_task_runner_
->RunsTasksOnCurrentThread());
216 // TODO(tyoshino): Discuss with content/browser/download developer and if
217 // they're fine with, set smaller limit and make it configurable.
218 size_t space_limit
= std::numeric_limits
<size_t>::max() -
219 GetTotalBufferedBytes();
220 if (byte_count
> space_limit
) {
221 // TODO(tyoshino): Tell the user that Write() failed.
226 input_contents_
.push_back(std::make_pair(buffer
, byte_count
));
227 input_contents_size_
+= byte_count
;
229 // Arbitrarily, we buffer to a third of the total size before sending.
230 if (input_contents_size_
> total_buffer_size_
/ kFractionBufferBeforeSending
)
231 PostToPeer(false, 0);
233 return GetTotalBufferedBytes() <= total_buffer_size_
;
236 void ByteStreamWriterImpl::Flush() {
237 DCHECK(my_task_runner_
->RunsTasksOnCurrentThread());
238 if (input_contents_size_
> 0)
239 PostToPeer(false, 0);
242 void ByteStreamWriterImpl::Close(int status
) {
243 DCHECK(my_task_runner_
->RunsTasksOnCurrentThread());
244 PostToPeer(true, status
);
247 void ByteStreamWriterImpl::RegisterCallback(
248 const base::Closure
& source_callback
) {
249 DCHECK(my_task_runner_
->RunsTasksOnCurrentThread());
250 space_available_callback_
= source_callback
;
253 size_t ByteStreamWriterImpl::GetTotalBufferedBytes() const {
254 DCHECK(my_task_runner_
->RunsTasksOnCurrentThread());
255 // This sum doesn't overflow since Write() fails if this sum is going to
257 return input_contents_size_
+ output_size_used_
;
261 void ByteStreamWriterImpl::UpdateWindow(
262 scoped_refptr
<LifetimeFlag
> lifetime_flag
, ByteStreamWriterImpl
* target
,
263 size_t bytes_consumed
) {
264 // If the target object isn't alive anymore, we do nothing.
265 if (!lifetime_flag
->is_alive
) return;
267 target
->UpdateWindowInternal(bytes_consumed
);
270 void ByteStreamWriterImpl::UpdateWindowInternal(size_t bytes_consumed
) {
271 DCHECK(my_task_runner_
->RunsTasksOnCurrentThread());
273 bool was_above_limit
= GetTotalBufferedBytes() > total_buffer_size_
;
275 DCHECK_GE(output_size_used_
, bytes_consumed
);
276 output_size_used_
-= bytes_consumed
;
278 // Callback if we were above the limit and we're now <= to it.
279 bool no_longer_above_limit
= GetTotalBufferedBytes() <= total_buffer_size_
;
281 if (no_longer_above_limit
&& was_above_limit
&&
282 !space_available_callback_
.is_null())
283 space_available_callback_
.Run();
286 void ByteStreamWriterImpl::PostToPeer(bool complete
, int status
) {
287 DCHECK(my_task_runner_
->RunsTasksOnCurrentThread());
288 // Valid contexts in which to call.
289 DCHECK(complete
|| 0 != input_contents_size_
);
291 scoped_ptr
<ContentVector
> transfer_buffer
;
292 size_t buffer_size
= 0;
293 if (0 != input_contents_size_
) {
294 transfer_buffer
.reset(new ContentVector
);
295 transfer_buffer
->swap(input_contents_
);
296 buffer_size
= input_contents_size_
;
297 output_size_used_
+= input_contents_size_
;
298 input_contents_size_
= 0;
300 peer_task_runner_
->PostTask(
301 FROM_HERE
, base::Bind(
302 &ByteStreamReaderImpl::TransferData
,
305 base::Passed(&transfer_buffer
),
311 ByteStreamReaderImpl::ByteStreamReaderImpl(
312 scoped_refptr
<base::SequencedTaskRunner
> task_runner
,
313 scoped_refptr
<LifetimeFlag
> lifetime_flag
,
315 : total_buffer_size_(buffer_size
),
316 my_task_runner_(task_runner
),
317 my_lifetime_flag_(lifetime_flag
),
318 received_status_(false),
320 unreported_consumed_bytes_(0),
322 DCHECK(my_lifetime_flag_
.get());
323 my_lifetime_flag_
->is_alive
= true;
326 ByteStreamReaderImpl::~ByteStreamReaderImpl() {
327 // No RunsTasksOnCurrentThread() check to allow deleting a created writer
328 // before we start using it. Once started, should be deleted on the specified
330 my_lifetime_flag_
->is_alive
= false;
333 void ByteStreamReaderImpl::SetPeer(
334 ByteStreamWriterImpl
* peer
,
335 scoped_refptr
<base::SequencedTaskRunner
> peer_task_runner
,
336 scoped_refptr
<LifetimeFlag
> peer_lifetime_flag
) {
338 peer_task_runner_
= peer_task_runner
;
339 peer_lifetime_flag_
= peer_lifetime_flag
;
342 ByteStreamReaderImpl::StreamState
343 ByteStreamReaderImpl::Read(scoped_refptr
<net::IOBuffer
>* data
,
345 DCHECK(my_task_runner_
->RunsTasksOnCurrentThread());
347 if (available_contents_
.size()) {
348 *data
= available_contents_
.front().first
;
349 *length
= available_contents_
.front().second
;
350 available_contents_
.pop_front();
351 unreported_consumed_bytes_
+= *length
;
354 return STREAM_HAS_DATA
;
356 if (received_status_
) {
357 return STREAM_COMPLETE
;
362 int ByteStreamReaderImpl::GetStatus() const {
363 DCHECK(my_task_runner_
->RunsTasksOnCurrentThread());
364 DCHECK(received_status_
);
368 void ByteStreamReaderImpl::RegisterCallback(
369 const base::Closure
& sink_callback
) {
370 DCHECK(my_task_runner_
->RunsTasksOnCurrentThread());
372 data_available_callback_
= sink_callback
;
376 void ByteStreamReaderImpl::TransferData(
377 scoped_refptr
<LifetimeFlag
> object_lifetime_flag
,
378 ByteStreamReaderImpl
* target
,
379 scoped_ptr
<ContentVector
> transfer_buffer
,
381 bool source_complete
,
383 // If our target is no longer alive, do nothing.
384 if (!object_lifetime_flag
->is_alive
) return;
386 target
->TransferDataInternal(
387 transfer_buffer
.Pass(), buffer_size
, source_complete
, status
);
390 void ByteStreamReaderImpl::TransferDataInternal(
391 scoped_ptr
<ContentVector
> transfer_buffer
,
393 bool source_complete
,
395 DCHECK(my_task_runner_
->RunsTasksOnCurrentThread());
397 bool was_empty
= available_contents_
.empty();
399 if (transfer_buffer
) {
400 available_contents_
.insert(available_contents_
.end(),
401 transfer_buffer
->begin(),
402 transfer_buffer
->end());
405 if (source_complete
) {
406 received_status_
= true;
410 // Callback on transition from empty to non-empty, or
412 if (((was_empty
&& !available_contents_
.empty()) ||
414 !data_available_callback_
.is_null())
415 data_available_callback_
.Run();
418 // Decide whether or not to send the input a window update.
419 // Currently we do that whenever we've got unreported consumption
420 // greater than 1/3 of total size.
421 void ByteStreamReaderImpl::MaybeUpdateInput() {
422 DCHECK(my_task_runner_
->RunsTasksOnCurrentThread());
424 if (unreported_consumed_bytes_
<=
425 total_buffer_size_
/ kFractionReadBeforeWindowUpdate
)
428 peer_task_runner_
->PostTask(
429 FROM_HERE
, base::Bind(
430 &ByteStreamWriterImpl::UpdateWindow
,
433 unreported_consumed_bytes_
));
434 unreported_consumed_bytes_
= 0;
439 const int ByteStreamWriter::kFractionBufferBeforeSending
= 3;
440 const int ByteStreamReader::kFractionReadBeforeWindowUpdate
= 3;
442 ByteStreamReader::~ByteStreamReader() { }
444 ByteStreamWriter::~ByteStreamWriter() { }
446 void CreateByteStream(
447 scoped_refptr
<base::SequencedTaskRunner
> input_task_runner
,
448 scoped_refptr
<base::SequencedTaskRunner
> output_task_runner
,
450 scoped_ptr
<ByteStreamWriter
>* input
,
451 scoped_ptr
<ByteStreamReader
>* output
) {
452 scoped_refptr
<LifetimeFlag
> input_flag(new LifetimeFlag());
453 scoped_refptr
<LifetimeFlag
> output_flag(new LifetimeFlag());
455 ByteStreamWriterImpl
* in
= new ByteStreamWriterImpl(
456 input_task_runner
, input_flag
, buffer_size
);
457 ByteStreamReaderImpl
* out
= new ByteStreamReaderImpl(
458 output_task_runner
, output_flag
, buffer_size
);
460 in
->SetPeer(out
, output_task_runner
, output_flag
);
461 out
->SetPeer(in
, input_task_runner
, input_flag
);
466 } // namespace content