Land Recent QUIC Changes.
[chromium-blink-merge.git] / net / tools / quic / quic_dispatcher.cc
blob92e5d706e23175260e3c4fdfe4f3162256c31a39
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "net/tools/quic/quic_dispatcher.h"
7 #include <errno.h>
9 #include "base/debug/stack_trace.h"
10 #include "base/logging.h"
11 #include "base/stl_util.h"
12 #include "net/quic/quic_blocked_writer_interface.h"
13 #include "net/quic/quic_utils.h"
14 #include "net/tools/quic/quic_default_packet_writer.h"
15 #include "net/tools/quic/quic_epoll_connection_helper.h"
16 #include "net/tools/quic/quic_packet_writer_wrapper.h"
17 #include "net/tools/quic/quic_socket_utils.h"
19 namespace net {
21 namespace tools {
23 using base::StringPiece;
24 using std::make_pair;
26 class DeleteSessionsAlarm : public EpollAlarm {
27 public:
28 explicit DeleteSessionsAlarm(QuicDispatcher* dispatcher)
29 : dispatcher_(dispatcher) {
32 virtual int64 OnAlarm() OVERRIDE {
33 EpollAlarm::OnAlarm();
34 dispatcher_->DeleteSessions();
35 return 0;
38 private:
39 QuicDispatcher* dispatcher_;
42 class QuicDispatcher::QuicFramerVisitor : public QuicFramerVisitorInterface {
43 public:
44 explicit QuicFramerVisitor(QuicDispatcher* dispatcher)
45 : dispatcher_(dispatcher) {}
47 // QuicFramerVisitorInterface implementation
48 virtual void OnPacket() OVERRIDE {}
49 virtual bool OnUnauthenticatedPublicHeader(
50 const QuicPacketPublicHeader& header) OVERRIDE {
51 return dispatcher_->OnUnauthenticatedPublicHeader(header);
53 virtual bool OnUnauthenticatedHeader(
54 const QuicPacketHeader& header) OVERRIDE {
55 dispatcher_->OnUnauthenticatedHeader(header);
56 return false;
58 virtual void OnError(QuicFramer* framer) OVERRIDE {
59 DVLOG(1) << QuicUtils::ErrorToString(framer->error());
62 // The following methods should never get called because we always return
63 // false from OnUnauthenticatedHeader(). As a result, we never process the
64 // payload of the packet.
65 virtual bool OnProtocolVersionMismatch(
66 QuicVersion /*received_version*/) OVERRIDE {
67 DCHECK(false);
68 return false;
70 virtual void OnPublicResetPacket(
71 const QuicPublicResetPacket& /*packet*/) OVERRIDE {
72 DCHECK(false);
74 virtual void OnVersionNegotiationPacket(
75 const QuicVersionNegotiationPacket& /*packet*/) OVERRIDE {
76 DCHECK(false);
78 virtual void OnPacketComplete() OVERRIDE {
79 DCHECK(false);
81 virtual bool OnPacketHeader(const QuicPacketHeader& /*header*/) OVERRIDE {
82 DCHECK(false);
83 return false;
85 virtual void OnRevivedPacket() OVERRIDE {
86 DCHECK(false);
88 virtual void OnFecProtectedPayload(StringPiece /*payload*/) OVERRIDE {
89 DCHECK(false);
91 virtual bool OnStreamFrame(const QuicStreamFrame& /*frame*/) OVERRIDE {
92 DCHECK(false);
93 return false;
95 virtual bool OnAckFrame(const QuicAckFrame& /*frame*/) OVERRIDE {
96 DCHECK(false);
97 return false;
99 virtual bool OnCongestionFeedbackFrame(
100 const QuicCongestionFeedbackFrame& /*frame*/) OVERRIDE {
101 DCHECK(false);
102 return false;
104 virtual bool OnStopWaitingFrame(
105 const QuicStopWaitingFrame& /*frame*/) OVERRIDE {
106 DCHECK(false);
107 return false;
109 virtual bool OnRstStreamFrame(const QuicRstStreamFrame& /*frame*/) OVERRIDE {
110 DCHECK(false);
111 return false;
113 virtual bool OnConnectionCloseFrame(
114 const QuicConnectionCloseFrame & /*frame*/) OVERRIDE {
115 DCHECK(false);
116 return false;
118 virtual bool OnGoAwayFrame(const QuicGoAwayFrame& /*frame*/) OVERRIDE {
119 DCHECK(false);
120 return false;
122 virtual bool OnWindowUpdateFrame(const QuicWindowUpdateFrame& /*frame*/)
123 OVERRIDE {
124 DCHECK(false);
125 return false;
127 virtual bool OnBlockedFrame(const QuicBlockedFrame& frame) OVERRIDE {
128 DCHECK(false);
129 return false;
131 virtual void OnFecData(const QuicFecData& /*fec*/) OVERRIDE {
132 DCHECK(false);
135 private:
136 QuicDispatcher* dispatcher_;
139 QuicDispatcher::QuicDispatcher(const QuicConfig& config,
140 const QuicCryptoServerConfig& crypto_config,
141 const QuicVersionVector& supported_versions,
142 EpollServer* epoll_server)
143 : config_(config),
144 crypto_config_(crypto_config),
145 delete_sessions_alarm_(new DeleteSessionsAlarm(this)),
146 epoll_server_(epoll_server),
147 helper_(new QuicEpollConnectionHelper(epoll_server_)),
148 supported_versions_(supported_versions),
149 current_packet_(NULL),
150 framer_(supported_versions, /*unused*/ QuicTime::Zero(), true),
151 framer_visitor_(new QuicFramerVisitor(this)) {
152 framer_.set_visitor(framer_visitor_.get());
155 QuicDispatcher::~QuicDispatcher() {
156 STLDeleteValues(&session_map_);
157 STLDeleteElements(&closed_session_list_);
160 void QuicDispatcher::Initialize(int fd) {
161 DCHECK(writer_ == NULL);
162 writer_.reset(CreateWriterWrapper(CreateWriter(fd)));
163 time_wait_list_manager_.reset(
164 new QuicTimeWaitListManager(writer_.get(), this,
165 epoll_server(), supported_versions()));
168 void QuicDispatcher::ProcessPacket(const IPEndPoint& server_address,
169 const IPEndPoint& client_address,
170 const QuicEncryptedPacket& packet) {
171 current_server_address_ = server_address;
172 current_client_address_ = client_address;
173 current_packet_ = &packet;
174 // ProcessPacket will cause the packet to be dispatched in
175 // OnUnauthenticatedPublicHeader, or sent to the time wait list manager
176 // in OnAuthenticatedHeader.
177 framer_.ProcessPacket(packet);
178 // TODO(rjshade): Return a status describing if/why a packet was dropped,
179 // and log somehow. Maybe expose as a varz.
182 bool QuicDispatcher::OnUnauthenticatedPublicHeader(
183 const QuicPacketPublicHeader& header) {
184 QuicSession* session = NULL;
186 QuicConnectionId connection_id = header.connection_id;
187 SessionMap::iterator it = session_map_.find(connection_id);
188 if (it == session_map_.end()) {
189 if (header.reset_flag) {
190 return false;
192 if (time_wait_list_manager_->IsConnectionIdInTimeWait(connection_id)) {
193 return HandlePacketForTimeWait(header);
196 // Ensure the packet has a version negotiation bit set before creating a new
197 // session for it. All initial packets for a new connection are required to
198 // have the flag set. Otherwise it may be a stray packet.
199 if (header.version_flag) {
200 session = CreateQuicSession(connection_id, current_server_address_,
201 current_client_address_);
204 if (session == NULL) {
205 DVLOG(1) << "Failed to create session for " << connection_id;
206 // Add this connection_id fo the time-wait state, to safely reject future
207 // packets.
209 if (header.version_flag &&
210 !framer_.IsSupportedVersion(header.versions.front())) {
211 // TODO(ianswett): Produce a no-version version negotiation packet.
212 return false;
215 // Use the version in the packet if possible, otherwise assume the latest.
216 QuicVersion version = header.version_flag ? header.versions.front() :
217 supported_versions_.front();
218 time_wait_list_manager_->AddConnectionIdToTimeWait(
219 connection_id, version, NULL);
220 DCHECK(time_wait_list_manager_->IsConnectionIdInTimeWait(connection_id));
221 return HandlePacketForTimeWait(header);
223 DVLOG(1) << "Created new session for " << connection_id;
224 session_map_.insert(make_pair(connection_id, session));
225 } else {
226 session = it->second;
229 session->connection()->ProcessUdpPacket(
230 current_server_address_, current_client_address_, *current_packet_);
232 // Do not parse the packet further. The session will process it completely.
233 return false;
236 void QuicDispatcher::OnUnauthenticatedHeader(const QuicPacketHeader& header) {
237 DCHECK(time_wait_list_manager_->IsConnectionIdInTimeWait(
238 header.public_header.connection_id));
239 time_wait_list_manager_->ProcessPacket(current_server_address_,
240 current_client_address_,
241 header.public_header.connection_id,
242 header.packet_sequence_number);
245 void QuicDispatcher::CleanUpSession(SessionMap::iterator it) {
246 QuicConnection* connection = it->second->connection();
247 QuicEncryptedPacket* connection_close_packet =
248 connection->ReleaseConnectionClosePacket();
249 write_blocked_list_.erase(connection);
250 time_wait_list_manager_->AddConnectionIdToTimeWait(it->first,
251 connection->version(),
252 connection_close_packet);
253 session_map_.erase(it);
256 void QuicDispatcher::DeleteSessions() {
257 STLDeleteElements(&closed_session_list_);
260 void QuicDispatcher::OnCanWrite() {
261 // We got an EPOLLOUT: the socket should not be blocked.
262 writer_->SetWritable();
264 // Give each writer one attempt to write.
265 int num_writers = write_blocked_list_.size();
266 for (int i = 0; i < num_writers; ++i) {
267 if (write_blocked_list_.empty()) {
268 return;
270 QuicBlockedWriterInterface* blocked_writer =
271 write_blocked_list_.begin()->first;
272 write_blocked_list_.erase(write_blocked_list_.begin());
273 blocked_writer->OnCanWrite();
274 if (writer_->IsWriteBlocked()) {
275 // We were unable to write. Wait for the next EPOLLOUT. The writer is
276 // responsible for adding itself to the blocked list via OnWriteBlocked().
277 return;
282 bool QuicDispatcher::HasPendingWrites() const {
283 return !write_blocked_list_.empty();
286 void QuicDispatcher::Shutdown() {
287 while (!session_map_.empty()) {
288 QuicSession* session = session_map_.begin()->second;
289 session->connection()->SendConnectionClose(QUIC_PEER_GOING_AWAY);
290 // Validate that the session removes itself from the session map on close.
291 DCHECK(session_map_.empty() || session_map_.begin()->second != session);
293 DeleteSessions();
296 void QuicDispatcher::OnConnectionClosed(QuicConnectionId connection_id,
297 QuicErrorCode error) {
298 SessionMap::iterator it = session_map_.find(connection_id);
299 if (it == session_map_.end()) {
300 LOG(DFATAL) << "ConnectionId " << connection_id
301 << " does not exist in the session map. "
302 << "Error: " << QuicUtils::ErrorToString(error);
303 LOG(DFATAL) << base::debug::StackTrace().ToString();
304 return;
307 DLOG_IF(INFO, error != QUIC_NO_ERROR) << "Closing connection ("
308 << connection_id
309 << ") due to error: "
310 << QuicUtils::ErrorToString(error);
312 if (closed_session_list_.empty()) {
313 epoll_server_->RegisterAlarmApproximateDelta(
314 0, delete_sessions_alarm_.get());
316 closed_session_list_.push_back(it->second);
317 CleanUpSession(it);
320 void QuicDispatcher::OnWriteBlocked(QuicBlockedWriterInterface* writer) {
321 DCHECK(writer_->IsWriteBlocked());
322 write_blocked_list_.insert(make_pair(writer, true));
325 QuicPacketWriter* QuicDispatcher::CreateWriter(int fd) {
326 return new QuicDefaultPacketWriter(fd);
329 QuicPacketWriterWrapper* QuicDispatcher::CreateWriterWrapper(
330 QuicPacketWriter* writer) {
331 return new QuicPacketWriterWrapper(writer);
334 QuicSession* QuicDispatcher::CreateQuicSession(
335 QuicConnectionId connection_id,
336 const IPEndPoint& server_address,
337 const IPEndPoint& client_address) {
338 QuicServerSession* session = new QuicServerSession(
339 config_,
340 CreateQuicConnection(connection_id, server_address, client_address),
341 this);
342 session->InitializeSession(crypto_config_);
343 return session;
346 QuicConnection* QuicDispatcher::CreateQuicConnection(
347 QuicConnectionId connection_id,
348 const IPEndPoint& server_address,
349 const IPEndPoint& client_address) {
350 return new QuicConnection(connection_id, client_address, helper_.get(),
351 writer_.get(), true, supported_versions_);
354 void QuicDispatcher::set_writer(QuicPacketWriter* writer) {
355 writer_->set_writer(writer);
358 bool QuicDispatcher::HandlePacketForTimeWait(
359 const QuicPacketPublicHeader& header) {
360 if (header.reset_flag) {
361 // Public reset packets do not have sequence numbers, so ignore the packet.
362 return false;
365 // Switch the framer to the correct version, so that the sequence number can
366 // be parsed correctly.
367 framer_.set_version(time_wait_list_manager_->GetQuicVersionFromConnectionId(
368 header.connection_id));
370 // Continue parsing the packet to extract the sequence number. Then
371 // send it to the time wait manager in OnUnathenticatedHeader.
372 return true;
375 } // namespace tools
376 } // namespace net