1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "net/tools/quic/quic_dispatcher.h"
9 #include "base/debug/stack_trace.h"
10 #include "base/logging.h"
11 #include "base/stl_util.h"
12 #include "net/quic/quic_blocked_writer_interface.h"
13 #include "net/quic/quic_utils.h"
14 #include "net/tools/quic/quic_default_packet_writer.h"
15 #include "net/tools/quic/quic_epoll_connection_helper.h"
16 #include "net/tools/quic/quic_packet_writer_wrapper.h"
17 #include "net/tools/quic/quic_socket_utils.h"
23 using base::StringPiece
;
26 class DeleteSessionsAlarm
: public EpollAlarm
{
28 explicit DeleteSessionsAlarm(QuicDispatcher
* dispatcher
)
29 : dispatcher_(dispatcher
) {
32 virtual int64
OnAlarm() OVERRIDE
{
33 EpollAlarm::OnAlarm();
34 dispatcher_
->DeleteSessions();
39 QuicDispatcher
* dispatcher_
;
42 class QuicDispatcher::QuicFramerVisitor
: public QuicFramerVisitorInterface
{
44 explicit QuicFramerVisitor(QuicDispatcher
* dispatcher
)
45 : dispatcher_(dispatcher
) {}
47 // QuicFramerVisitorInterface implementation
48 virtual void OnPacket() OVERRIDE
{}
49 virtual bool OnUnauthenticatedPublicHeader(
50 const QuicPacketPublicHeader
& header
) OVERRIDE
{
51 return dispatcher_
->OnUnauthenticatedPublicHeader(header
);
53 virtual bool OnUnauthenticatedHeader(
54 const QuicPacketHeader
& header
) OVERRIDE
{
55 dispatcher_
->OnUnauthenticatedHeader(header
);
58 virtual void OnError(QuicFramer
* framer
) OVERRIDE
{
59 DVLOG(1) << QuicUtils::ErrorToString(framer
->error());
62 // The following methods should never get called because we always return
63 // false from OnUnauthenticatedHeader(). As a result, we never process the
64 // payload of the packet.
65 virtual bool OnProtocolVersionMismatch(
66 QuicVersion
/*received_version*/) OVERRIDE
{
70 virtual void OnPublicResetPacket(
71 const QuicPublicResetPacket
& /*packet*/) OVERRIDE
{
74 virtual void OnVersionNegotiationPacket(
75 const QuicVersionNegotiationPacket
& /*packet*/) OVERRIDE
{
78 virtual void OnPacketComplete() OVERRIDE
{
81 virtual bool OnPacketHeader(const QuicPacketHeader
& /*header*/) OVERRIDE
{
85 virtual void OnRevivedPacket() OVERRIDE
{
88 virtual void OnFecProtectedPayload(StringPiece
/*payload*/) OVERRIDE
{
91 virtual bool OnStreamFrame(const QuicStreamFrame
& /*frame*/) OVERRIDE
{
95 virtual bool OnAckFrame(const QuicAckFrame
& /*frame*/) OVERRIDE
{
99 virtual bool OnCongestionFeedbackFrame(
100 const QuicCongestionFeedbackFrame
& /*frame*/) OVERRIDE
{
104 virtual bool OnStopWaitingFrame(
105 const QuicStopWaitingFrame
& /*frame*/) OVERRIDE
{
109 virtual bool OnRstStreamFrame(const QuicRstStreamFrame
& /*frame*/) OVERRIDE
{
113 virtual bool OnConnectionCloseFrame(
114 const QuicConnectionCloseFrame
& /*frame*/) OVERRIDE
{
118 virtual bool OnGoAwayFrame(const QuicGoAwayFrame
& /*frame*/) OVERRIDE
{
122 virtual bool OnWindowUpdateFrame(const QuicWindowUpdateFrame
& /*frame*/)
127 virtual bool OnBlockedFrame(const QuicBlockedFrame
& frame
) OVERRIDE
{
131 virtual void OnFecData(const QuicFecData
& /*fec*/) OVERRIDE
{
136 QuicDispatcher
* dispatcher_
;
139 QuicDispatcher::QuicDispatcher(const QuicConfig
& config
,
140 const QuicCryptoServerConfig
& crypto_config
,
141 const QuicVersionVector
& supported_versions
,
142 EpollServer
* epoll_server
)
144 crypto_config_(crypto_config
),
145 delete_sessions_alarm_(new DeleteSessionsAlarm(this)),
146 epoll_server_(epoll_server
),
147 helper_(new QuicEpollConnectionHelper(epoll_server_
)),
148 supported_versions_(supported_versions
),
149 current_packet_(NULL
),
150 framer_(supported_versions
, /*unused*/ QuicTime::Zero(), true),
151 framer_visitor_(new QuicFramerVisitor(this)) {
152 framer_
.set_visitor(framer_visitor_
.get());
155 QuicDispatcher::~QuicDispatcher() {
156 STLDeleteValues(&session_map_
);
157 STLDeleteElements(&closed_session_list_
);
160 void QuicDispatcher::Initialize(int fd
) {
161 DCHECK(writer_
== NULL
);
162 writer_
.reset(CreateWriterWrapper(CreateWriter(fd
)));
163 time_wait_list_manager_
.reset(
164 new QuicTimeWaitListManager(writer_
.get(), this,
165 epoll_server(), supported_versions()));
168 void QuicDispatcher::ProcessPacket(const IPEndPoint
& server_address
,
169 const IPEndPoint
& client_address
,
170 const QuicEncryptedPacket
& packet
) {
171 current_server_address_
= server_address
;
172 current_client_address_
= client_address
;
173 current_packet_
= &packet
;
174 // ProcessPacket will cause the packet to be dispatched in
175 // OnUnauthenticatedPublicHeader, or sent to the time wait list manager
176 // in OnAuthenticatedHeader.
177 framer_
.ProcessPacket(packet
);
178 // TODO(rjshade): Return a status describing if/why a packet was dropped,
179 // and log somehow. Maybe expose as a varz.
182 bool QuicDispatcher::OnUnauthenticatedPublicHeader(
183 const QuicPacketPublicHeader
& header
) {
184 QuicSession
* session
= NULL
;
186 QuicConnectionId connection_id
= header
.connection_id
;
187 SessionMap::iterator it
= session_map_
.find(connection_id
);
188 if (it
== session_map_
.end()) {
189 if (header
.reset_flag
) {
192 if (time_wait_list_manager_
->IsConnectionIdInTimeWait(connection_id
)) {
193 return HandlePacketForTimeWait(header
);
196 // Ensure the packet has a version negotiation bit set before creating a new
197 // session for it. All initial packets for a new connection are required to
198 // have the flag set. Otherwise it may be a stray packet.
199 if (header
.version_flag
) {
200 session
= CreateQuicSession(connection_id
, current_server_address_
,
201 current_client_address_
);
204 if (session
== NULL
) {
205 DVLOG(1) << "Failed to create session for " << connection_id
;
206 // Add this connection_id fo the time-wait state, to safely reject future
209 if (header
.version_flag
&&
210 !framer_
.IsSupportedVersion(header
.versions
.front())) {
211 // TODO(ianswett): Produce a no-version version negotiation packet.
215 // Use the version in the packet if possible, otherwise assume the latest.
216 QuicVersion version
= header
.version_flag
? header
.versions
.front() :
217 supported_versions_
.front();
218 time_wait_list_manager_
->AddConnectionIdToTimeWait(
219 connection_id
, version
, NULL
);
220 DCHECK(time_wait_list_manager_
->IsConnectionIdInTimeWait(connection_id
));
221 return HandlePacketForTimeWait(header
);
223 DVLOG(1) << "Created new session for " << connection_id
;
224 session_map_
.insert(make_pair(connection_id
, session
));
226 session
= it
->second
;
229 session
->connection()->ProcessUdpPacket(
230 current_server_address_
, current_client_address_
, *current_packet_
);
232 // Do not parse the packet further. The session will process it completely.
236 void QuicDispatcher::OnUnauthenticatedHeader(const QuicPacketHeader
& header
) {
237 DCHECK(time_wait_list_manager_
->IsConnectionIdInTimeWait(
238 header
.public_header
.connection_id
));
239 time_wait_list_manager_
->ProcessPacket(current_server_address_
,
240 current_client_address_
,
241 header
.public_header
.connection_id
,
242 header
.packet_sequence_number
);
245 void QuicDispatcher::CleanUpSession(SessionMap::iterator it
) {
246 QuicConnection
* connection
= it
->second
->connection();
247 QuicEncryptedPacket
* connection_close_packet
=
248 connection
->ReleaseConnectionClosePacket();
249 write_blocked_list_
.erase(connection
);
250 time_wait_list_manager_
->AddConnectionIdToTimeWait(it
->first
,
251 connection
->version(),
252 connection_close_packet
);
253 session_map_
.erase(it
);
256 void QuicDispatcher::DeleteSessions() {
257 STLDeleteElements(&closed_session_list_
);
260 void QuicDispatcher::OnCanWrite() {
261 // We got an EPOLLOUT: the socket should not be blocked.
262 writer_
->SetWritable();
264 // Give each writer one attempt to write.
265 int num_writers
= write_blocked_list_
.size();
266 for (int i
= 0; i
< num_writers
; ++i
) {
267 if (write_blocked_list_
.empty()) {
270 QuicBlockedWriterInterface
* blocked_writer
=
271 write_blocked_list_
.begin()->first
;
272 write_blocked_list_
.erase(write_blocked_list_
.begin());
273 blocked_writer
->OnCanWrite();
274 if (writer_
->IsWriteBlocked()) {
275 // We were unable to write. Wait for the next EPOLLOUT. The writer is
276 // responsible for adding itself to the blocked list via OnWriteBlocked().
282 bool QuicDispatcher::HasPendingWrites() const {
283 return !write_blocked_list_
.empty();
286 void QuicDispatcher::Shutdown() {
287 while (!session_map_
.empty()) {
288 QuicSession
* session
= session_map_
.begin()->second
;
289 session
->connection()->SendConnectionClose(QUIC_PEER_GOING_AWAY
);
290 // Validate that the session removes itself from the session map on close.
291 DCHECK(session_map_
.empty() || session_map_
.begin()->second
!= session
);
296 void QuicDispatcher::OnConnectionClosed(QuicConnectionId connection_id
,
297 QuicErrorCode error
) {
298 SessionMap::iterator it
= session_map_
.find(connection_id
);
299 if (it
== session_map_
.end()) {
300 LOG(DFATAL
) << "ConnectionId " << connection_id
301 << " does not exist in the session map. "
302 << "Error: " << QuicUtils::ErrorToString(error
);
303 LOG(DFATAL
) << base::debug::StackTrace().ToString();
307 DLOG_IF(INFO
, error
!= QUIC_NO_ERROR
) << "Closing connection ("
309 << ") due to error: "
310 << QuicUtils::ErrorToString(error
);
312 if (closed_session_list_
.empty()) {
313 epoll_server_
->RegisterAlarmApproximateDelta(
314 0, delete_sessions_alarm_
.get());
316 closed_session_list_
.push_back(it
->second
);
320 void QuicDispatcher::OnWriteBlocked(QuicBlockedWriterInterface
* writer
) {
321 DCHECK(writer_
->IsWriteBlocked());
322 write_blocked_list_
.insert(make_pair(writer
, true));
325 QuicPacketWriter
* QuicDispatcher::CreateWriter(int fd
) {
326 return new QuicDefaultPacketWriter(fd
);
329 QuicPacketWriterWrapper
* QuicDispatcher::CreateWriterWrapper(
330 QuicPacketWriter
* writer
) {
331 return new QuicPacketWriterWrapper(writer
);
334 QuicSession
* QuicDispatcher::CreateQuicSession(
335 QuicConnectionId connection_id
,
336 const IPEndPoint
& server_address
,
337 const IPEndPoint
& client_address
) {
338 QuicServerSession
* session
= new QuicServerSession(
340 CreateQuicConnection(connection_id
, server_address
, client_address
),
342 session
->InitializeSession(crypto_config_
);
346 QuicConnection
* QuicDispatcher::CreateQuicConnection(
347 QuicConnectionId connection_id
,
348 const IPEndPoint
& server_address
,
349 const IPEndPoint
& client_address
) {
350 return new QuicConnection(connection_id
, client_address
, helper_
.get(),
351 writer_
.get(), true, supported_versions_
);
354 void QuicDispatcher::set_writer(QuicPacketWriter
* writer
) {
355 writer_
->set_writer(writer
);
358 bool QuicDispatcher::HandlePacketForTimeWait(
359 const QuicPacketPublicHeader
& header
) {
360 if (header
.reset_flag
) {
361 // Public reset packets do not have sequence numbers, so ignore the packet.
365 // Switch the framer to the correct version, so that the sequence number can
366 // be parsed correctly.
367 framer_
.set_version(time_wait_list_manager_
->GetQuicVersionFromConnectionId(
368 header
.connection_id
));
370 // Continue parsing the packet to extract the sequence number. Then
371 // send it to the time wait manager in OnUnathenticatedHeader.