Fix threading issue in CookieMonster task ordering.
[chromium-blink-merge.git] / net / quic / quic_session.cc
blob77494b511812c03e9df704d3317927a2e6d2f2ee
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "net/quic/quic_session.h"
7 #include "base/stl_util.h"
8 #include "net/quic/crypto/proof_verifier.h"
9 #include "net/quic/quic_connection.h"
10 #include "net/ssl/ssl_info.h"
12 using base::StringPiece;
13 using base::hash_map;
14 using base::hash_set;
15 using std::make_pair;
16 using std::vector;
18 namespace net {
// Size at which AddPrematurelyClosedStream() evicts an existing entry from
// prematurely_closed_streams_ before recording a new stream id.
20 const size_t kMaxPrematurelyClosedStreamsTracked = 20;
// Size at which AddZombieStream() closes an existing zombie stream before
// recording a new one in zombie_streams_.
21 const size_t kMaxZombieStreams = 20;
// Log-line prefix naming the endpoint that emitted the message. The expansion
// reads is_server_, so it is only usable where that name is in scope.
23 #define ENDPOINT (is_server_ ? "Server: " : " Client: ")
25 // We want to make sure we delete any closed streams in a safe manner.
26 // To avoid deleting a stream in mid-operation, we have a simple shim between
27 // us and the stream, so we can delete any streams when we return from
28 // processing.
30 // We could just override the base methods, but this makes it easier to make
31 // sure we don't miss any.
32 class VisitorShim : public QuicConnectionVisitorInterface {
33 public:
34 explicit VisitorShim(QuicSession* session) : session_(session) {}
36 virtual bool OnStreamFrames(const vector<QuicStreamFrame>& frames) OVERRIDE {
37 bool accepted = session_->OnStreamFrames(frames);
38 session_->PostProcessAfterData();
39 return accepted;
41 virtual void OnRstStream(const QuicRstStreamFrame& frame) OVERRIDE {
42 session_->OnRstStream(frame);
43 session_->PostProcessAfterData();
46 virtual void OnGoAway(const QuicGoAwayFrame& frame) OVERRIDE {
47 session_->OnGoAway(frame);
48 session_->PostProcessAfterData();
51 virtual bool OnCanWrite() OVERRIDE {
52 bool rc = session_->OnCanWrite();
53 session_->PostProcessAfterData();
54 return rc;
57 virtual void OnSuccessfulVersionNegotiation(
58 const QuicVersion& version) OVERRIDE {
59 session_->OnSuccessfulVersionNegotiation(version);
62 virtual void OnConnectionClosed(QuicErrorCode error,
63 bool from_peer) OVERRIDE {
64 session_->OnConnectionClosed(error, from_peer);
65 // The session will go away, so don't bother with cleanup.
68 virtual bool HasPendingHandshake() const OVERRIDE {
69 return session_->HasPendingHandshake();
72 private:
73 QuicSession* session_;
76 QuicSession::QuicSession(QuicConnection* connection,
77 const QuicConfig& config,
78 bool is_server)
79 : connection_(connection),
80 visitor_shim_(new VisitorShim(this)),
81 config_(config),
82 max_open_streams_(config_.max_streams_per_connection()),
83 next_stream_id_(is_server ? 2 : 3),
84 is_server_(is_server),
85 largest_peer_created_stream_id_(0),
86 error_(QUIC_NO_ERROR),
87 goaway_received_(false),
88 goaway_sent_(false),
89 has_pending_handshake_(false) {
91 connection_->set_visitor(visitor_shim_.get());
92 connection_->SetIdleNetworkTimeout(config_.idle_connection_state_lifetime());
93 if (connection_->connected()) {
94 connection_->SetOverallConnectionTimeout(
95 config_.max_time_before_crypto_handshake());
97 // TODO(satyamshekhar): Set congestion control and ICSL also.
100 QuicSession::~QuicSession() {
101 STLDeleteElements(&closed_streams_);
102 STLDeleteValues(&stream_map_);
105 bool QuicSession::OnStreamFrames(const vector<QuicStreamFrame>& frames) {
106 for (size_t i = 0; i < frames.size(); ++i) {
107 // TODO(rch) deal with the error case of stream id 0
108 if (IsClosedStream(frames[i].stream_id)) {
109 // If we get additional frames for a stream where we didn't process
110 // headers, it's highly likely our compression context will end up
111 // permanently out of sync with the peer's, so we give up and close the
112 // connection.
113 if (ContainsKey(prematurely_closed_streams_, frames[i].stream_id)) {
114 connection()->SendConnectionClose(
115 QUIC_STREAM_RST_BEFORE_HEADERS_DECOMPRESSED);
116 return false;
118 continue;
121 ReliableQuicStream* stream = GetStream(frames[i].stream_id);
122 if (stream == NULL) return false;
123 if (!stream->WillAcceptStreamFrame(frames[i])) return false;
125 // TODO(alyssar) check against existing connection address: if changed, make
126 // sure we update the connection.
129 for (size_t i = 0; i < frames.size(); ++i) {
130 QuicStreamId stream_id = frames[i].stream_id;
131 ReliableQuicStream* stream = GetStream(stream_id);
132 if (!stream) {
133 continue;
135 stream->OnStreamFrame(frames[i]);
137 // If the stream had been prematurely closed, and the
138 // headers are now decompressed, then we are finally finished
139 // with this stream.
140 if (ContainsKey(zombie_streams_, stream_id) &&
141 stream->headers_decompressed()) {
142 CloseZombieStream(stream_id);
146 while (!decompression_blocked_streams_.empty()) {
147 QuicHeaderId header_id = decompression_blocked_streams_.begin()->first;
148 if (header_id != decompressor_.current_header_id()) {
149 break;
151 QuicStreamId stream_id = decompression_blocked_streams_.begin()->second;
152 decompression_blocked_streams_.erase(header_id);
153 ReliableQuicStream* stream = GetStream(stream_id);
154 if (!stream) {
155 connection()->SendConnectionClose(
156 QUIC_STREAM_RST_BEFORE_HEADERS_DECOMPRESSED);
157 return false;
159 stream->OnDecompressorAvailable();
161 return true;
164 void QuicSession::OnRstStream(const QuicRstStreamFrame& frame) {
165 ReliableQuicStream* stream = GetStream(frame.stream_id);
166 if (!stream) {
167 return; // Errors are handled by GetStream.
169 if (ContainsKey(zombie_streams_, stream->id())) {
170 // If this was a zombie stream then we close it out now.
171 CloseZombieStream(stream->id());
172 // However, since the headers still have not been decompressed, we want to
173 // mark it a prematurely closed so that if we ever receive frames
174 // for this stream we can close the connection.
175 DCHECK(!stream->headers_decompressed());
176 AddPrematurelyClosedStream(frame.stream_id);
177 return;
179 stream->OnStreamReset(frame.error_code);
182 void QuicSession::OnGoAway(const QuicGoAwayFrame& frame) {
183 DCHECK(frame.last_good_stream_id < next_stream_id_);
184 goaway_received_ = true;
187 void QuicSession::OnConnectionClosed(QuicErrorCode error, bool from_peer) {
188 DCHECK(!connection_->connected());
189 if (error_ == QUIC_NO_ERROR) {
190 error_ = error;
193 while (stream_map_.size() != 0) {
194 ReliableStreamMap::iterator it = stream_map_.begin();
195 QuicStreamId id = it->first;
196 it->second->OnConnectionClosed(error, from_peer);
197 // The stream should call CloseStream as part of OnConnectionClosed.
198 if (stream_map_.find(id) != stream_map_.end()) {
199 LOG(DFATAL) << ENDPOINT
200 << "Stream failed to close under OnConnectionClosed";
201 CloseStream(id);
206 bool QuicSession::OnCanWrite() {
207 // We latch this here rather than doing a traditional loop, because streams
208 // may be modifying the list as we loop.
209 int remaining_writes = write_blocked_streams_.NumBlockedStreams();
211 while (!connection_->HasQueuedData() &&
212 remaining_writes > 0) {
213 DCHECK(write_blocked_streams_.HasWriteBlockedStreams());
214 int index = write_blocked_streams_.GetHighestPriorityWriteBlockedList();
215 if (index == -1) {
216 LOG(DFATAL) << "WriteBlockedStream is missing";
217 connection_->CloseConnection(QUIC_INTERNAL_ERROR, false);
218 return true; // We have no write blocked streams.
220 QuicStreamId stream_id = write_blocked_streams_.PopFront(index);
221 if (stream_id == kCryptoStreamId) {
222 has_pending_handshake_ = false; // We just popped it.
224 ReliableQuicStream* stream = GetStream(stream_id);
225 if (stream != NULL) {
226 // If the stream can't write all bytes, it'll re-add itself to the blocked
227 // list.
228 stream->OnCanWrite();
230 --remaining_writes;
233 return !write_blocked_streams_.HasWriteBlockedStreams();
236 bool QuicSession::HasPendingHandshake() const {
237 return has_pending_handshake_;
240 QuicConsumedData QuicSession::WritevData(QuicStreamId id,
241 const struct iovec* iov,
242 int iov_count,
243 QuicStreamOffset offset,
244 bool fin) {
245 IOVector data;
246 data.AppendIovec(iov, iov_count);
247 return connection_->SendStreamData(id, data, offset, fin);
250 void QuicSession::SendRstStream(QuicStreamId id,
251 QuicRstStreamErrorCode error) {
252 connection_->SendRstStream(id, error);
253 CloseStreamInner(id, true);
256 void QuicSession::SendGoAway(QuicErrorCode error_code, const string& reason) {
257 goaway_sent_ = true;
258 connection_->SendGoAway(error_code, largest_peer_created_stream_id_, reason);
261 void QuicSession::CloseStream(QuicStreamId stream_id) {
262 CloseStreamInner(stream_id, false);
265 void QuicSession::CloseStreamInner(QuicStreamId stream_id,
266 bool locally_reset) {
267 DLOG(INFO) << ENDPOINT << "Closing stream " << stream_id;
269 ReliableStreamMap::iterator it = stream_map_.find(stream_id);
270 if (it == stream_map_.end()) {
271 DLOG(INFO) << ENDPOINT << "Stream is already closed: " << stream_id;
272 return;
274 ReliableQuicStream* stream = it->second;
275 if (connection_->connected() && !stream->headers_decompressed()) {
276 // If the stream is being closed locally (for example a client cancelling
277 // a request before receiving the response) then we need to make sure that
278 // we keep the stream alive long enough to process any response or
279 // RST_STREAM frames.
280 if (locally_reset && !is_server_) {
281 AddZombieStream(stream_id);
282 return;
285 // This stream has been closed before the headers were decompressed.
286 // This might cause problems with head of line blocking of headers.
287 // If the peer sent headers which were lost but we now close the stream
288 // we will never be able to decompress headers for other streams.
289 // To deal with this, we keep track of streams which have been closed
290 // prematurely. If we ever receive data frames for this steam, then we
291 // know there actually has been a problem and we close the connection.
292 AddPrematurelyClosedStream(stream->id());
294 closed_streams_.push_back(it->second);
295 if (ContainsKey(zombie_streams_, stream->id())) {
296 zombie_streams_.erase(stream->id());
298 stream_map_.erase(it);
299 stream->OnClose();
302 void QuicSession::AddZombieStream(QuicStreamId stream_id) {
303 if (zombie_streams_.size() == kMaxZombieStreams) {
304 QuicStreamId oldest_zombie_stream_id = zombie_streams_.begin()->first;
305 CloseZombieStream(oldest_zombie_stream_id);
306 // However, since the headers still have not been decompressed, we want to
307 // mark it a prematurely closed so that if we ever receive frames
308 // for this stream we can close the connection.
309 AddPrematurelyClosedStream(oldest_zombie_stream_id);
311 zombie_streams_.insert(make_pair(stream_id, true));
314 void QuicSession::CloseZombieStream(QuicStreamId stream_id) {
315 DCHECK(ContainsKey(zombie_streams_, stream_id));
316 zombie_streams_.erase(stream_id);
317 ReliableQuicStream* stream = GetStream(stream_id);
318 if (!stream) {
319 return;
321 stream_map_.erase(stream_id);
322 stream->OnClose();
323 closed_streams_.push_back(stream);
326 void QuicSession::AddPrematurelyClosedStream(QuicStreamId stream_id) {
327 if (prematurely_closed_streams_.size() ==
328 kMaxPrematurelyClosedStreamsTracked) {
329 prematurely_closed_streams_.erase(prematurely_closed_streams_.begin());
331 prematurely_closed_streams_.insert(make_pair(stream_id, true));
334 bool QuicSession::IsEncryptionEstablished() {
335 return GetCryptoStream()->encryption_established();
338 bool QuicSession::IsCryptoHandshakeConfirmed() {
339 return GetCryptoStream()->handshake_confirmed();
342 void QuicSession::OnCryptoHandshakeEvent(CryptoHandshakeEvent event) {
343 switch (event) {
344 // TODO(satyamshekhar): Move the logic of setting the encrypter/decrypter
345 // to QuicSession since it is the glue.
346 case ENCRYPTION_FIRST_ESTABLISHED:
347 break;
349 case ENCRYPTION_REESTABLISHED:
350 // Retransmit originally packets that were sent, since they can't be
351 // decrypted by the peer.
352 connection_->RetransmitUnackedPackets(
353 QuicConnection::INITIAL_ENCRYPTION_ONLY);
354 break;
356 case HANDSHAKE_CONFIRMED:
357 LOG_IF(DFATAL, !config_.negotiated()) << ENDPOINT
358 << "Handshake confirmed without parameter negotiation.";
359 connection_->SetIdleNetworkTimeout(
360 config_.idle_connection_state_lifetime());
361 connection_->SetOverallConnectionTimeout(QuicTime::Delta::Infinite());
362 max_open_streams_ = config_.max_streams_per_connection();
363 break;
365 default:
366 LOG(ERROR) << ENDPOINT << "Got unknown handshake event: " << event;
370 void QuicSession::OnCryptoHandshakeMessageSent(
371 const CryptoHandshakeMessage& message) {
374 void QuicSession::OnCryptoHandshakeMessageReceived(
375 const CryptoHandshakeMessage& message) {
378 QuicConfig* QuicSession::config() {
379 return &config_;
382 void QuicSession::ActivateStream(ReliableQuicStream* stream) {
383 DLOG(INFO) << ENDPOINT << "num_streams: " << stream_map_.size()
384 << ". activating " << stream->id();
385 DCHECK_EQ(stream_map_.count(stream->id()), 0u);
386 stream_map_[stream->id()] = stream;
389 QuicStreamId QuicSession::GetNextStreamId() {
390 QuicStreamId id = next_stream_id_;
391 next_stream_id_ += 2;
392 return id;
395 ReliableQuicStream* QuicSession::GetStream(const QuicStreamId stream_id) {
396 if (stream_id == kCryptoStreamId) {
397 return GetCryptoStream();
400 ReliableStreamMap::iterator it = stream_map_.find(stream_id);
401 if (it != stream_map_.end()) {
402 return it->second;
405 if (IsClosedStream(stream_id)) {
406 return NULL;
409 if (stream_id % 2 == next_stream_id_ % 2) {
410 // We've received a frame for a locally-created stream that is not
411 // currently active. This is an error.
412 connection()->SendConnectionClose(QUIC_PACKET_FOR_NONEXISTENT_STREAM);
413 return NULL;
416 return GetIncomingReliableStream(stream_id);
419 ReliableQuicStream* QuicSession::GetIncomingReliableStream(
420 QuicStreamId stream_id) {
421 if (IsClosedStream(stream_id)) {
422 return NULL;
425 if (goaway_sent_) {
426 // We've already sent a GoAway
427 SendRstStream(stream_id, QUIC_STREAM_PEER_GOING_AWAY);
428 return NULL;
431 implicitly_created_streams_.erase(stream_id);
432 if (stream_id > largest_peer_created_stream_id_) {
433 // TODO(rch) add unit test for this
434 if (stream_id - largest_peer_created_stream_id_ > kMaxStreamIdDelta) {
435 connection()->SendConnectionClose(QUIC_INVALID_STREAM_ID);
436 return NULL;
438 if (largest_peer_created_stream_id_ == 0) {
439 largest_peer_created_stream_id_= 1;
441 for (QuicStreamId id = largest_peer_created_stream_id_ + 2;
442 id < stream_id;
443 id += 2) {
444 implicitly_created_streams_.insert(id);
446 largest_peer_created_stream_id_ = stream_id;
448 ReliableQuicStream* stream = CreateIncomingReliableStream(stream_id);
449 if (stream == NULL) {
450 return NULL;
452 ActivateStream(stream);
453 return stream;
456 bool QuicSession::IsClosedStream(QuicStreamId id) {
457 DCHECK_NE(0u, id);
458 if (id == kCryptoStreamId) {
459 return false;
461 if (ContainsKey(zombie_streams_, id)) {
462 return true;
464 if (ContainsKey(stream_map_, id)) {
465 // Stream is active
466 return false;
468 if (id % 2 == next_stream_id_ % 2) {
469 // Locally created streams are strictly in-order. If the id is in the
470 // range of created streams and it's not active, it must have been closed.
471 return id < next_stream_id_;
473 // For peer created streams, we also need to consider implicitly created
474 // streams.
475 return id <= largest_peer_created_stream_id_ &&
476 implicitly_created_streams_.count(id) == 0;
479 size_t QuicSession::GetNumOpenStreams() const {
480 return stream_map_.size() + implicitly_created_streams_.size() -
481 zombie_streams_.size();
484 void QuicSession::MarkWriteBlocked(QuicStreamId id, QuicPriority priority) {
485 if (id == kCryptoStreamId) {
486 DCHECK(!has_pending_handshake_);
487 has_pending_handshake_ = true;
488 // TODO(jar): Be sure to use the highest priority for the crypto stream,
489 // perhaps by adding a "special" priority for it that is higher than
490 // kHighestPriority.
491 priority = kHighestPriority;
493 write_blocked_streams_.PushBack(id, priority);
496 void QuicSession::MarkDecompressionBlocked(QuicHeaderId header_id,
497 QuicStreamId stream_id) {
498 decompression_blocked_streams_[header_id] = stream_id;
501 bool QuicSession::GetSSLInfo(SSLInfo* ssl_info) {
502 NOTIMPLEMENTED();
503 return false;
506 void QuicSession::PostProcessAfterData() {
507 STLDeleteElements(&closed_streams_);
508 closed_streams_.clear();
511 } // namespace net