Revert of Revert of Add a WorkerScheduler and a WebThreadImplForWorker (patchset...
[chromium-blink-merge.git] / net / quic / quic_stream_sequencer.cc
blob7f06ec46d9a6600e3fa3475c0c9852471e4b8fcf
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "net/quic/quic_stream_sequencer.h"
7 #include <algorithm>
8 #include <limits>
10 #include "base/logging.h"
11 #include "base/metrics/sparse_histogram.h"
12 #include "net/quic/reliable_quic_stream.h"
14 using std::min;
15 using std::numeric_limits;
16 using std::string;
18 namespace net {
20 QuicStreamSequencer::QuicStreamSequencer(ReliableQuicStream* quic_stream)
21 : stream_(quic_stream),
22 num_bytes_consumed_(0),
23 close_offset_(numeric_limits<QuicStreamOffset>::max()),
24 blocked_(false),
25 num_bytes_buffered_(0),
26 num_frames_received_(0),
27 num_duplicate_frames_received_(0),
28 num_early_frames_received_(0) {
31 QuicStreamSequencer::~QuicStreamSequencer() {
34 void QuicStreamSequencer::OnStreamFrame(const QuicStreamFrame& frame) {
35 ++num_frames_received_;
36 if (IsDuplicate(frame)) {
37 ++num_duplicate_frames_received_;
38 // Silently ignore duplicates.
39 return;
42 if (FrameOverlapsBufferedData(frame)) {
43 stream_->CloseConnectionWithDetails(
44 QUIC_INVALID_STREAM_FRAME, "Stream frame overlaps with buffered data.");
45 return;
48 QuicStreamOffset byte_offset = frame.offset;
49 size_t data_len = frame.data.TotalBufferSize();
50 if (data_len == 0 && !frame.fin) {
51 // Stream frames must have data or a fin flag.
52 stream_->CloseConnectionWithDetails(QUIC_INVALID_STREAM_FRAME,
53 "Empty stream frame without FIN set.");
54 return;
57 if (frame.fin) {
58 CloseStreamAtOffset(frame.offset + data_len);
59 if (data_len == 0) {
60 return;
64 IOVector data;
65 data.AppendIovec(frame.data.iovec(), frame.data.Size());
67 if (byte_offset > num_bytes_consumed_) {
68 ++num_early_frames_received_;
71 // If the frame has arrived in-order then we can process it immediately, only
72 // buffering if the stream is unable to process it.
73 if (!blocked_ && byte_offset == num_bytes_consumed_) {
74 DVLOG(1) << "Processing byte offset " << byte_offset;
75 size_t bytes_consumed = 0;
76 for (size_t i = 0; i < data.Size(); ++i) {
77 bytes_consumed += stream_->ProcessRawData(
78 static_cast<char*>(data.iovec()[i].iov_base),
79 data.iovec()[i].iov_len);
81 num_bytes_consumed_ += bytes_consumed;
82 stream_->AddBytesConsumed(bytes_consumed);
84 if (MaybeCloseStream()) {
85 return;
87 if (bytes_consumed > data_len) {
88 stream_->Reset(QUIC_ERROR_PROCESSING_STREAM);
89 return;
90 } else if (bytes_consumed == data_len) {
91 FlushBufferedFrames();
92 return; // it's safe to ack this frame.
93 } else {
94 // Set ourselves up to buffer what's left.
95 data_len -= bytes_consumed;
96 data.Consume(bytes_consumed);
97 byte_offset += bytes_consumed;
101 // Buffer any remaining data to be consumed by the stream when ready.
102 for (size_t i = 0; i < data.Size(); ++i) {
103 DVLOG(1) << "Buffering stream data at offset " << byte_offset;
104 const iovec& iov = data.iovec()[i];
105 buffered_frames_.insert(std::make_pair(
106 byte_offset, string(static_cast<char*>(iov.iov_base), iov.iov_len)));
107 byte_offset += iov.iov_len;
108 num_bytes_buffered_ += iov.iov_len;
110 return;
113 void QuicStreamSequencer::CloseStreamAtOffset(QuicStreamOffset offset) {
114 const QuicStreamOffset kMaxOffset = numeric_limits<QuicStreamOffset>::max();
116 // If we have a scheduled termination or close, any new offset should match
117 // it.
118 if (close_offset_ != kMaxOffset && offset != close_offset_) {
119 stream_->Reset(QUIC_MULTIPLE_TERMINATION_OFFSETS);
120 return;
123 close_offset_ = offset;
125 MaybeCloseStream();
128 bool QuicStreamSequencer::MaybeCloseStream() {
129 if (!blocked_ && IsClosed()) {
130 DVLOG(1) << "Passing up termination, as we've processed "
131 << num_bytes_consumed_ << " of " << close_offset_
132 << " bytes.";
133 // Technically it's an error if num_bytes_consumed isn't exactly
134 // equal, but error handling seems silly at this point.
135 stream_->OnFinRead();
136 buffered_frames_.clear();
137 num_bytes_buffered_ = 0;
138 return true;
140 return false;
143 int QuicStreamSequencer::GetReadableRegions(iovec* iov, size_t iov_len) {
144 DCHECK(!blocked_);
145 FrameMap::iterator it = buffered_frames_.begin();
146 size_t index = 0;
147 QuicStreamOffset offset = num_bytes_consumed_;
148 while (it != buffered_frames_.end() && index < iov_len) {
149 if (it->first != offset) return index;
151 iov[index].iov_base = static_cast<void*>(
152 const_cast<char*>(it->second.data()));
153 iov[index].iov_len = it->second.size();
154 offset += it->second.size();
156 ++index;
157 ++it;
159 return index;
162 int QuicStreamSequencer::Readv(const struct iovec* iov, size_t iov_len) {
163 DCHECK(!blocked_);
164 FrameMap::iterator it = buffered_frames_.begin();
165 size_t iov_index = 0;
166 size_t iov_offset = 0;
167 size_t frame_offset = 0;
168 QuicStreamOffset initial_bytes_consumed = num_bytes_consumed_;
170 while (iov_index < iov_len &&
171 it != buffered_frames_.end() &&
172 it->first == num_bytes_consumed_) {
173 int bytes_to_read = min(iov[iov_index].iov_len - iov_offset,
174 it->second.size() - frame_offset);
176 char* iov_ptr = static_cast<char*>(iov[iov_index].iov_base) + iov_offset;
177 memcpy(iov_ptr,
178 it->second.data() + frame_offset, bytes_to_read);
179 frame_offset += bytes_to_read;
180 iov_offset += bytes_to_read;
182 if (iov[iov_index].iov_len == iov_offset) {
183 // We've filled this buffer.
184 iov_offset = 0;
185 ++iov_index;
187 if (it->second.size() == frame_offset) {
188 // We've copied this whole frame
189 RecordBytesConsumed(it->second.size());
190 buffered_frames_.erase(it);
191 it = buffered_frames_.begin();
192 frame_offset = 0;
195 // We've finished copying. If we have a partial frame, update it.
196 if (frame_offset != 0) {
197 buffered_frames_.insert(std::make_pair(it->first + frame_offset,
198 it->second.substr(frame_offset)));
199 buffered_frames_.erase(buffered_frames_.begin());
200 RecordBytesConsumed(frame_offset);
202 return static_cast<int>(num_bytes_consumed_ - initial_bytes_consumed);
205 bool QuicStreamSequencer::HasBytesToRead() const {
206 FrameMap::const_iterator it = buffered_frames_.begin();
208 return it != buffered_frames_.end() && it->first == num_bytes_consumed_;
211 bool QuicStreamSequencer::IsClosed() const {
212 return num_bytes_consumed_ >= close_offset_;
215 bool QuicStreamSequencer::FrameOverlapsBufferedData(
216 const QuicStreamFrame& frame) const {
217 if (buffered_frames_.empty()) {
218 return false;
221 FrameMap::const_iterator next_frame =
222 buffered_frames_.lower_bound(frame.offset);
223 // Duplicate frames should have been dropped in IsDuplicate.
224 DCHECK(next_frame == buffered_frames_.end() ||
225 next_frame->first != frame.offset);
227 // If there is a buffered frame with a higher starting offset, then we check
228 // to see if the new frame runs into the higher frame.
229 if (next_frame != buffered_frames_.end() &&
230 (frame.offset + frame.data.TotalBufferSize()) > next_frame->first) {
231 DVLOG(1) << "New frame overlaps next frame: " << frame.offset << " + "
232 << frame.data.TotalBufferSize() << " > " << next_frame->first;
233 return true;
236 // If there is a buffered frame with a lower starting offset, then we check
237 // to see if the buffered frame runs into the new frame.
238 if (next_frame != buffered_frames_.begin()) {
239 FrameMap::const_iterator preceeding_frame = --next_frame;
240 QuicStreamOffset offset = preceeding_frame->first;
241 uint64 data_length = preceeding_frame->second.length();
242 if ((offset + data_length) > frame.offset) {
243 DVLOG(1) << "Preceeding frame overlaps new frame: " << offset << " + "
244 << data_length << " > " << frame.offset;
245 return true;
248 return false;
251 bool QuicStreamSequencer::IsDuplicate(const QuicStreamFrame& frame) const {
252 // A frame is duplicate if the frame offset is smaller than our bytes consumed
253 // or we have stored the frame in our map.
254 // TODO(pwestin): Is it possible that a new frame contain more data even if
255 // the offset is the same?
256 return frame.offset < num_bytes_consumed_ ||
257 buffered_frames_.find(frame.offset) != buffered_frames_.end();
260 void QuicStreamSequencer::SetBlockedUntilFlush() {
261 blocked_ = true;
264 void QuicStreamSequencer::FlushBufferedFrames() {
265 blocked_ = false;
266 FrameMap::iterator it = buffered_frames_.find(num_bytes_consumed_);
267 while (it != buffered_frames_.end()) {
268 DVLOG(1) << "Flushing buffered packet at offset " << it->first;
269 string* data = &it->second;
270 size_t bytes_consumed = stream_->ProcessRawData(data->c_str(),
271 data->size());
272 RecordBytesConsumed(bytes_consumed);
273 if (MaybeCloseStream()) {
274 return;
276 if (bytes_consumed > data->size()) {
277 stream_->Reset(QUIC_ERROR_PROCESSING_STREAM); // Programming error
278 return;
279 } else if (bytes_consumed == data->size()) {
280 buffered_frames_.erase(it);
281 it = buffered_frames_.find(num_bytes_consumed_);
282 } else {
283 string new_data = it->second.substr(bytes_consumed);
284 buffered_frames_.erase(it);
285 buffered_frames_.insert(std::make_pair(num_bytes_consumed_, new_data));
286 return;
289 MaybeCloseStream();
292 void QuicStreamSequencer::RecordBytesConsumed(size_t bytes_consumed) {
293 num_bytes_consumed_ += bytes_consumed;
294 num_bytes_buffered_ -= bytes_consumed;
296 stream_->AddBytesConsumed(bytes_consumed);
299 } // namespace net