1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "net/quic/quic_session.h"
7 #include "base/stl_util.h"
8 #include "net/quic/crypto/proof_verifier.h"
9 #include "net/quic/quic_connection.h"
10 #include "net/ssl/ssl_info.h"
12 using base::StringPiece
;
20 const size_t kMaxPrematurelyClosedStreamsTracked
= 20;
21 const size_t kMaxZombieStreams
= 20;
23 #define ENDPOINT (is_server_ ? "Server: " : " Client: ")
25 // We want to make sure we delete any closed streams in a safe manner.
26 // To avoid deleting a stream in mid-operation, we have a simple shim between
27 // us and the stream, so we can delete any streams when we return from
30 // We could just override the base methods, but this makes it easier to make
31 // sure we don't miss any.
32 class VisitorShim
: public QuicConnectionVisitorInterface
{
34 explicit VisitorShim(QuicSession
* session
) : session_(session
) {}
36 virtual bool OnStreamFrames(const vector
<QuicStreamFrame
>& frames
) OVERRIDE
{
37 bool accepted
= session_
->OnStreamFrames(frames
);
38 session_
->PostProcessAfterData();
41 virtual void OnRstStream(const QuicRstStreamFrame
& frame
) OVERRIDE
{
42 session_
->OnRstStream(frame
);
43 session_
->PostProcessAfterData();
46 virtual void OnGoAway(const QuicGoAwayFrame
& frame
) OVERRIDE
{
47 session_
->OnGoAway(frame
);
48 session_
->PostProcessAfterData();
51 virtual bool OnCanWrite() OVERRIDE
{
52 bool rc
= session_
->OnCanWrite();
53 session_
->PostProcessAfterData();
57 virtual void OnSuccessfulVersionNegotiation(
58 const QuicVersion
& version
) OVERRIDE
{
59 session_
->OnSuccessfulVersionNegotiation(version
);
62 virtual void OnConnectionClosed(QuicErrorCode error
,
63 bool from_peer
) OVERRIDE
{
64 session_
->OnConnectionClosed(error
, from_peer
);
65 // The session will go away, so don't bother with cleanup.
68 virtual bool HasPendingHandshake() const OVERRIDE
{
69 return session_
->HasPendingHandshake();
73 QuicSession
* session_
;
76 QuicSession::QuicSession(QuicConnection
* connection
,
77 const QuicConfig
& config
,
79 : connection_(connection
),
80 visitor_shim_(new VisitorShim(this)),
82 max_open_streams_(config_
.max_streams_per_connection()),
83 next_stream_id_(is_server
? 2 : 3),
84 is_server_(is_server
),
85 largest_peer_created_stream_id_(0),
86 error_(QUIC_NO_ERROR
),
87 goaway_received_(false),
89 has_pending_handshake_(false) {
91 connection_
->set_visitor(visitor_shim_
.get());
92 connection_
->SetIdleNetworkTimeout(config_
.idle_connection_state_lifetime());
93 if (connection_
->connected()) {
94 connection_
->SetOverallConnectionTimeout(
95 config_
.max_time_before_crypto_handshake());
97 // TODO(satyamshekhar): Set congestion control and ICSL also.
100 QuicSession::~QuicSession() {
101 STLDeleteElements(&closed_streams_
);
102 STLDeleteValues(&stream_map_
);
105 bool QuicSession::OnStreamFrames(const vector
<QuicStreamFrame
>& frames
) {
106 for (size_t i
= 0; i
< frames
.size(); ++i
) {
107 // TODO(rch) deal with the error case of stream id 0
108 if (IsClosedStream(frames
[i
].stream_id
)) {
109 // If we get additional frames for a stream where we didn't process
110 // headers, it's highly likely our compression context will end up
111 // permanently out of sync with the peer's, so we give up and close the
113 if (ContainsKey(prematurely_closed_streams_
, frames
[i
].stream_id
)) {
114 connection()->SendConnectionClose(
115 QUIC_STREAM_RST_BEFORE_HEADERS_DECOMPRESSED
);
121 ReliableQuicStream
* stream
= GetStream(frames
[i
].stream_id
);
122 if (stream
== NULL
) return false;
123 if (!stream
->WillAcceptStreamFrame(frames
[i
])) return false;
125 // TODO(alyssar) check against existing connection address: if changed, make
126 // sure we update the connection.
129 for (size_t i
= 0; i
< frames
.size(); ++i
) {
130 QuicStreamId stream_id
= frames
[i
].stream_id
;
131 ReliableQuicStream
* stream
= GetStream(stream_id
);
135 stream
->OnStreamFrame(frames
[i
]);
137 // If the stream had been prematurely closed, and the
138 // headers are now decompressed, then we are finally finished
140 if (ContainsKey(zombie_streams_
, stream_id
) &&
141 stream
->headers_decompressed()) {
142 CloseZombieStream(stream_id
);
146 while (!decompression_blocked_streams_
.empty()) {
147 QuicHeaderId header_id
= decompression_blocked_streams_
.begin()->first
;
148 if (header_id
!= decompressor_
.current_header_id()) {
151 QuicStreamId stream_id
= decompression_blocked_streams_
.begin()->second
;
152 decompression_blocked_streams_
.erase(header_id
);
153 ReliableQuicStream
* stream
= GetStream(stream_id
);
155 connection()->SendConnectionClose(
156 QUIC_STREAM_RST_BEFORE_HEADERS_DECOMPRESSED
);
159 stream
->OnDecompressorAvailable();
164 void QuicSession::OnRstStream(const QuicRstStreamFrame
& frame
) {
165 ReliableQuicStream
* stream
= GetStream(frame
.stream_id
);
167 return; // Errors are handled by GetStream.
169 if (ContainsKey(zombie_streams_
, stream
->id())) {
170 // If this was a zombie stream then we close it out now.
171 CloseZombieStream(stream
->id());
172 // However, since the headers still have not been decompressed, we want to
173 // mark it a prematurely closed so that if we ever receive frames
174 // for this stream we can close the connection.
175 DCHECK(!stream
->headers_decompressed());
176 AddPrematurelyClosedStream(frame
.stream_id
);
179 stream
->OnStreamReset(frame
.error_code
);
182 void QuicSession::OnGoAway(const QuicGoAwayFrame
& frame
) {
183 DCHECK(frame
.last_good_stream_id
< next_stream_id_
);
184 goaway_received_
= true;
187 void QuicSession::OnConnectionClosed(QuicErrorCode error
, bool from_peer
) {
188 DCHECK(!connection_
->connected());
189 if (error_
== QUIC_NO_ERROR
) {
193 while (stream_map_
.size() != 0) {
194 ReliableStreamMap::iterator it
= stream_map_
.begin();
195 QuicStreamId id
= it
->first
;
196 it
->second
->OnConnectionClosed(error
, from_peer
);
197 // The stream should call CloseStream as part of OnConnectionClosed.
198 if (stream_map_
.find(id
) != stream_map_
.end()) {
199 LOG(DFATAL
) << ENDPOINT
200 << "Stream failed to close under OnConnectionClosed";
206 bool QuicSession::OnCanWrite() {
207 // We latch this here rather than doing a traditional loop, because streams
208 // may be modifying the list as we loop.
209 int remaining_writes
= write_blocked_streams_
.NumBlockedStreams();
211 while (!connection_
->HasQueuedData() &&
212 remaining_writes
> 0) {
213 DCHECK(write_blocked_streams_
.HasWriteBlockedStreams());
214 int index
= write_blocked_streams_
.GetHighestPriorityWriteBlockedList();
216 LOG(DFATAL
) << "WriteBlockedStream is missing";
217 connection_
->CloseConnection(QUIC_INTERNAL_ERROR
, false);
218 return true; // We have no write blocked streams.
220 QuicStreamId stream_id
= write_blocked_streams_
.PopFront(index
);
221 if (stream_id
== kCryptoStreamId
) {
222 has_pending_handshake_
= false; // We just popped it.
224 ReliableQuicStream
* stream
= GetStream(stream_id
);
225 if (stream
!= NULL
) {
226 // If the stream can't write all bytes, it'll re-add itself to the blocked
228 stream
->OnCanWrite();
233 return !write_blocked_streams_
.HasWriteBlockedStreams();
236 bool QuicSession::HasPendingHandshake() const {
237 return has_pending_handshake_
;
240 QuicConsumedData
QuicSession::WritevData(QuicStreamId id
,
241 const struct iovec
* iov
,
243 QuicStreamOffset offset
,
246 data
.AppendIovec(iov
, iov_count
);
247 return connection_
->SendStreamData(id
, data
, offset
, fin
);
250 void QuicSession::SendRstStream(QuicStreamId id
,
251 QuicRstStreamErrorCode error
) {
252 connection_
->SendRstStream(id
, error
);
253 CloseStreamInner(id
, true);
256 void QuicSession::SendGoAway(QuicErrorCode error_code
, const string
& reason
) {
258 connection_
->SendGoAway(error_code
, largest_peer_created_stream_id_
, reason
);
261 void QuicSession::CloseStream(QuicStreamId stream_id
) {
262 CloseStreamInner(stream_id
, false);
265 void QuicSession::CloseStreamInner(QuicStreamId stream_id
,
266 bool locally_reset
) {
267 DLOG(INFO
) << ENDPOINT
<< "Closing stream " << stream_id
;
269 ReliableStreamMap::iterator it
= stream_map_
.find(stream_id
);
270 if (it
== stream_map_
.end()) {
271 DLOG(INFO
) << ENDPOINT
<< "Stream is already closed: " << stream_id
;
274 ReliableQuicStream
* stream
= it
->second
;
275 if (connection_
->connected() && !stream
->headers_decompressed()) {
276 // If the stream is being closed locally (for example a client cancelling
277 // a request before receiving the response) then we need to make sure that
278 // we keep the stream alive long enough to process any response or
279 // RST_STREAM frames.
280 if (locally_reset
&& !is_server_
) {
281 AddZombieStream(stream_id
);
285 // This stream has been closed before the headers were decompressed.
286 // This might cause problems with head of line blocking of headers.
287 // If the peer sent headers which were lost but we now close the stream
288 // we will never be able to decompress headers for other streams.
289 // To deal with this, we keep track of streams which have been closed
290 // prematurely. If we ever receive data frames for this steam, then we
291 // know there actually has been a problem and we close the connection.
292 AddPrematurelyClosedStream(stream
->id());
294 closed_streams_
.push_back(it
->second
);
295 if (ContainsKey(zombie_streams_
, stream
->id())) {
296 zombie_streams_
.erase(stream
->id());
298 stream_map_
.erase(it
);
302 void QuicSession::AddZombieStream(QuicStreamId stream_id
) {
303 if (zombie_streams_
.size() == kMaxZombieStreams
) {
304 QuicStreamId oldest_zombie_stream_id
= zombie_streams_
.begin()->first
;
305 CloseZombieStream(oldest_zombie_stream_id
);
306 // However, since the headers still have not been decompressed, we want to
307 // mark it a prematurely closed so that if we ever receive frames
308 // for this stream we can close the connection.
309 AddPrematurelyClosedStream(oldest_zombie_stream_id
);
311 zombie_streams_
.insert(make_pair(stream_id
, true));
314 void QuicSession::CloseZombieStream(QuicStreamId stream_id
) {
315 DCHECK(ContainsKey(zombie_streams_
, stream_id
));
316 zombie_streams_
.erase(stream_id
);
317 ReliableQuicStream
* stream
= GetStream(stream_id
);
321 stream_map_
.erase(stream_id
);
323 closed_streams_
.push_back(stream
);
326 void QuicSession::AddPrematurelyClosedStream(QuicStreamId stream_id
) {
327 if (prematurely_closed_streams_
.size() ==
328 kMaxPrematurelyClosedStreamsTracked
) {
329 prematurely_closed_streams_
.erase(prematurely_closed_streams_
.begin());
331 prematurely_closed_streams_
.insert(make_pair(stream_id
, true));
334 bool QuicSession::IsEncryptionEstablished() {
335 return GetCryptoStream()->encryption_established();
338 bool QuicSession::IsCryptoHandshakeConfirmed() {
339 return GetCryptoStream()->handshake_confirmed();
342 void QuicSession::OnCryptoHandshakeEvent(CryptoHandshakeEvent event
) {
344 // TODO(satyamshekhar): Move the logic of setting the encrypter/decrypter
345 // to QuicSession since it is the glue.
346 case ENCRYPTION_FIRST_ESTABLISHED
:
349 case ENCRYPTION_REESTABLISHED
:
350 // Retransmit originally packets that were sent, since they can't be
351 // decrypted by the peer.
352 connection_
->RetransmitUnackedPackets(
353 QuicConnection::INITIAL_ENCRYPTION_ONLY
);
356 case HANDSHAKE_CONFIRMED
:
357 LOG_IF(DFATAL
, !config_
.negotiated()) << ENDPOINT
358 << "Handshake confirmed without parameter negotiation.";
359 connection_
->SetIdleNetworkTimeout(
360 config_
.idle_connection_state_lifetime());
361 connection_
->SetOverallConnectionTimeout(QuicTime::Delta::Infinite());
362 max_open_streams_
= config_
.max_streams_per_connection();
366 LOG(ERROR
) << ENDPOINT
<< "Got unknown handshake event: " << event
;
370 void QuicSession::OnCryptoHandshakeMessageSent(
371 const CryptoHandshakeMessage
& message
) {
374 void QuicSession::OnCryptoHandshakeMessageReceived(
375 const CryptoHandshakeMessage
& message
) {
378 QuicConfig
* QuicSession::config() {
382 void QuicSession::ActivateStream(ReliableQuicStream
* stream
) {
383 DLOG(INFO
) << ENDPOINT
<< "num_streams: " << stream_map_
.size()
384 << ". activating " << stream
->id();
385 DCHECK_EQ(stream_map_
.count(stream
->id()), 0u);
386 stream_map_
[stream
->id()] = stream
;
389 QuicStreamId
QuicSession::GetNextStreamId() {
390 QuicStreamId id
= next_stream_id_
;
391 next_stream_id_
+= 2;
395 ReliableQuicStream
* QuicSession::GetStream(const QuicStreamId stream_id
) {
396 if (stream_id
== kCryptoStreamId
) {
397 return GetCryptoStream();
400 ReliableStreamMap::iterator it
= stream_map_
.find(stream_id
);
401 if (it
!= stream_map_
.end()) {
405 if (IsClosedStream(stream_id
)) {
409 if (stream_id
% 2 == next_stream_id_
% 2) {
410 // We've received a frame for a locally-created stream that is not
411 // currently active. This is an error.
412 connection()->SendConnectionClose(QUIC_PACKET_FOR_NONEXISTENT_STREAM
);
416 return GetIncomingReliableStream(stream_id
);
419 ReliableQuicStream
* QuicSession::GetIncomingReliableStream(
420 QuicStreamId stream_id
) {
421 if (IsClosedStream(stream_id
)) {
426 // We've already sent a GoAway
427 SendRstStream(stream_id
, QUIC_STREAM_PEER_GOING_AWAY
);
431 implicitly_created_streams_
.erase(stream_id
);
432 if (stream_id
> largest_peer_created_stream_id_
) {
433 // TODO(rch) add unit test for this
434 if (stream_id
- largest_peer_created_stream_id_
> kMaxStreamIdDelta
) {
435 connection()->SendConnectionClose(QUIC_INVALID_STREAM_ID
);
438 if (largest_peer_created_stream_id_
== 0) {
439 largest_peer_created_stream_id_
= 1;
441 for (QuicStreamId id
= largest_peer_created_stream_id_
+ 2;
444 implicitly_created_streams_
.insert(id
);
446 largest_peer_created_stream_id_
= stream_id
;
448 ReliableQuicStream
* stream
= CreateIncomingReliableStream(stream_id
);
449 if (stream
== NULL
) {
452 ActivateStream(stream
);
456 bool QuicSession::IsClosedStream(QuicStreamId id
) {
458 if (id
== kCryptoStreamId
) {
461 if (ContainsKey(zombie_streams_
, id
)) {
464 if (ContainsKey(stream_map_
, id
)) {
468 if (id
% 2 == next_stream_id_
% 2) {
469 // Locally created streams are strictly in-order. If the id is in the
470 // range of created streams and it's not active, it must have been closed.
471 return id
< next_stream_id_
;
473 // For peer created streams, we also need to consider implicitly created
475 return id
<= largest_peer_created_stream_id_
&&
476 implicitly_created_streams_
.count(id
) == 0;
479 size_t QuicSession::GetNumOpenStreams() const {
480 return stream_map_
.size() + implicitly_created_streams_
.size() -
481 zombie_streams_
.size();
484 void QuicSession::MarkWriteBlocked(QuicStreamId id
, QuicPriority priority
) {
485 if (id
== kCryptoStreamId
) {
486 DCHECK(!has_pending_handshake_
);
487 has_pending_handshake_
= true;
488 // TODO(jar): Be sure to use the highest priority for the crypto stream,
489 // perhaps by adding a "special" priority for it that is higher than
491 priority
= kHighestPriority
;
493 write_blocked_streams_
.PushBack(id
, priority
);
496 void QuicSession::MarkDecompressionBlocked(QuicHeaderId header_id
,
497 QuicStreamId stream_id
) {
498 decompression_blocked_streams_
[header_id
] = stream_id
;
501 bool QuicSession::GetSSLInfo(SSLInfo
* ssl_info
) {
506 void QuicSession::PostProcessAfterData() {
507 STLDeleteElements(&closed_streams_
);
508 closed_streams_
.clear();