Bug 1814798 - pt 2. Add a PHCManager component to control PHC r=glandium,emilio
[gecko.git] / netwerk / protocol / http / Http3WebTransportStream.cpp
blobcd5a525e877ca5258b54219040905420ec646e24
1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* This Source Code Form is subject to the terms of the Mozilla Public
3 * License, v. 2.0. If a copy of the MPL was not distributed with this
4 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
6 #include "Http3WebTransportStream.h"
8 #include "HttpLog.h"
9 #include "Http3Session.h"
10 #include "Http3WebTransportSession.h"
11 #include "mozilla/TimeStamp.h"
12 #include "nsHttpHandler.h"
13 #include "nsIOService.h"
14 #include "nsIPipe.h"
15 #include "nsSocketTransportService2.h"
16 #include "nsIWebTransportStream.h"
18 namespace mozilla::net {
20 namespace {
22 // This is an nsAHttpTransaction that does nothing.
23 class DummyWebTransportStreamTransaction : public nsAHttpTransaction {
24 public:
25 NS_DECL_THREADSAFE_ISUPPORTS
27 DummyWebTransportStreamTransaction() = default;
29 void SetConnection(nsAHttpConnection*) override {}
30 nsAHttpConnection* Connection() override { return nullptr; }
31 void GetSecurityCallbacks(nsIInterfaceRequestor**) override {}
32 void OnTransportStatus(nsITransport* transport, nsresult status,
33 int64_t progress) override {}
34 bool IsDone() override { return false; }
35 nsresult Status() override { return NS_OK; }
36 uint32_t Caps() override { return 0; }
37 [[nodiscard]] nsresult ReadSegments(nsAHttpSegmentReader*, uint32_t,
38 uint32_t*) override {
39 return NS_OK;
41 [[nodiscard]] nsresult WriteSegments(nsAHttpSegmentWriter*, uint32_t,
42 uint32_t*) override {
43 return NS_OK;
45 void Close(nsresult reason) override {}
46 nsHttpConnectionInfo* ConnectionInfo() override { return nullptr; }
47 void SetProxyConnectFailed() override {}
48 nsHttpRequestHead* RequestHead() override { return nullptr; }
49 uint32_t Http1xTransactionCount() override { return 0; }
50 [[nodiscard]] nsresult TakeSubTransactions(
51 nsTArray<RefPtr<nsAHttpTransaction>>& outTransactions) override {
52 return NS_OK;
55 private:
56 virtual ~DummyWebTransportStreamTransaction() = default;
59 NS_IMPL_ISUPPORTS(DummyWebTransportStreamTransaction, nsISupportsWeakReference)
61 class WebTransportSendStreamStats : public nsIWebTransportSendStreamStats {
62 public:
63 NS_DECL_THREADSAFE_ISUPPORTS
65 explicit WebTransportSendStreamStats(uint64_t aSent, uint64_t aAcked)
66 : mTimeStamp(TimeStamp::Now()),
67 mTotalSent(aSent),
68 mTotalAcknowledged(aAcked) {}
70 NS_IMETHOD GetTimestamp(mozilla::TimeStamp* aTimestamp) override {
71 *aTimestamp = mTimeStamp;
72 return NS_OK;
74 NS_IMETHOD GetBytesSent(uint64_t* aBytesSent) override {
75 *aBytesSent = mTotalSent;
76 return NS_OK;
78 NS_IMETHOD GetBytesAcknowledged(uint64_t* aBytesAcknowledged) override {
79 *aBytesAcknowledged = mTotalAcknowledged;
80 return NS_OK;
83 private:
84 virtual ~WebTransportSendStreamStats() = default;
86 TimeStamp mTimeStamp;
87 uint64_t mTotalSent;
88 uint64_t mTotalAcknowledged;
91 NS_IMPL_ISUPPORTS(WebTransportSendStreamStats, nsIWebTransportSendStreamStats)
93 class WebTransportReceiveStreamStats
94 : public nsIWebTransportReceiveStreamStats {
95 public:
96 NS_DECL_THREADSAFE_ISUPPORTS
98 explicit WebTransportReceiveStreamStats(uint64_t aReceived)
99 : mTimeStamp(TimeStamp::Now()), mTotalReceived(aReceived) {}
101 NS_IMETHOD GetTimestamp(mozilla::TimeStamp* aTimestamp) override {
102 *aTimestamp = mTimeStamp;
103 return NS_OK;
105 NS_IMETHOD GetBytesReceived(uint64_t* aByteReceived) override {
106 *aByteReceived = mTotalReceived;
107 return NS_OK;
110 private:
111 virtual ~WebTransportReceiveStreamStats() = default;
113 TimeStamp mTimeStamp;
114 uint64_t mTotalReceived;
117 NS_IMPL_ISUPPORTS(WebTransportReceiveStreamStats,
118 nsIWebTransportReceiveStreamStats)
120 } // namespace
122 NS_IMPL_ISUPPORTS(Http3WebTransportStream, nsIInputStreamCallback)
124 Http3WebTransportStream::Http3WebTransportStream(
125 Http3Session* aSession, uint64_t aSessionId, WebTransportStreamType aType,
126 std::function<void(Result<RefPtr<Http3WebTransportStream>, nsresult>&&)>&&
127 aCallback)
128 : Http3StreamBase(new DummyWebTransportStreamTransaction(), aSession),
129 mSessionId(aSessionId),
130 mStreamType(aType),
131 mStreamRole(OUTGOING),
132 mStreamReadyCallback(std::move(aCallback)) {
133 LOG(("Http3WebTransportStream outgoing ctor %p", this));
136 Http3WebTransportStream::Http3WebTransportStream(Http3Session* aSession,
137 uint64_t aSessionId,
138 WebTransportStreamType aType,
139 uint64_t aStreamId)
140 : Http3StreamBase(new DummyWebTransportStreamTransaction(), aSession),
141 mSessionId(aSessionId),
142 mStreamType(aType),
143 mStreamRole(INCOMING),
144 // WAITING_DATA indicates we are waiting
145 // Http3WebTransportStream::OnInputStreamReady to be called.
146 mSendState(WAITING_DATA),
147 mStreamReadyCallback(nullptr) {
148 LOG(("Http3WebTransportStream incoming ctor %p", this));
149 mStreamId = aStreamId;
152 Http3WebTransportStream::~Http3WebTransportStream() {
153 LOG(("Http3WebTransportStream dtor %p", this));
156 nsresult Http3WebTransportStream::TryActivating() {
157 MOZ_ASSERT(OnSocketThread(), "not on socket thread");
158 return mSession->TryActivatingWebTransportStream(&mStreamId, this);
161 NS_IMETHODIMP Http3WebTransportStream::OnInputStreamReady(
162 nsIAsyncInputStream* aStream) {
163 LOG1(
164 ("Http3WebTransportStream::OnInputStreamReady [this=%p stream=%p "
165 "state=%d]",
166 this, aStream, mSendState));
167 if (mSendState == SEND_DONE) {
168 // already closed
169 return NS_OK;
172 mSendState = SENDING;
173 mSession->StreamHasDataToWrite(this);
174 return NS_OK;
177 nsresult Http3WebTransportStream::InitOutputPipe() {
178 nsCOMPtr<nsIAsyncOutputStream> out;
179 nsCOMPtr<nsIAsyncInputStream> in;
180 NS_NewPipe2(getter_AddRefs(in), getter_AddRefs(out), true, true,
181 nsIOService::gDefaultSegmentSize,
182 nsIOService::gDefaultSegmentCount);
185 MutexAutoLock lock(mMutex);
186 mSendStreamPipeIn = std::move(in);
187 mSendStreamPipeOut = std::move(out);
190 nsresult rv =
191 mSendStreamPipeIn->AsyncWait(this, 0, 0, gSocketTransportService);
192 if (NS_FAILED(rv)) {
193 return rv;
196 mSendState = WAITING_DATA;
197 return NS_OK;
200 nsresult Http3WebTransportStream::InitInputPipe() {
201 nsCOMPtr<nsIAsyncOutputStream> out;
202 nsCOMPtr<nsIAsyncInputStream> in;
203 NS_NewPipe2(getter_AddRefs(in), getter_AddRefs(out), true, true,
204 nsIOService::gDefaultSegmentSize,
205 nsIOService::gDefaultSegmentCount);
208 MutexAutoLock lock(mMutex);
209 mReceiveStreamPipeIn = std::move(in);
210 mReceiveStreamPipeOut = std::move(out);
213 mRecvState = READING;
214 return NS_OK;
217 void Http3WebTransportStream::GetWriterAndReader(
218 nsIAsyncOutputStream** aOutOutputStream,
219 nsIAsyncInputStream** aOutInputStream) {
220 nsCOMPtr<nsIAsyncOutputStream> output;
221 nsCOMPtr<nsIAsyncInputStream> input;
223 MutexAutoLock lock(mMutex);
224 output = mSendStreamPipeOut;
225 input = mReceiveStreamPipeIn;
228 output.forget(aOutOutputStream);
229 input.forget(aOutInputStream);
232 already_AddRefed<nsIWebTransportSendStreamStats>
233 Http3WebTransportStream::GetSendStreamStats() {
234 MOZ_ASSERT(OnSocketThread(), "not on socket thread");
236 nsCOMPtr<nsIWebTransportSendStreamStats> stats =
237 new WebTransportSendStreamStats(mTotalSent, mTotalAcknowledged);
238 return stats.forget();
241 already_AddRefed<nsIWebTransportReceiveStreamStats>
242 Http3WebTransportStream::GetReceiveStreamStats() {
243 MOZ_ASSERT(OnSocketThread(), "not on socket thread");
245 nsCOMPtr<nsIWebTransportReceiveStreamStats> stats =
246 new WebTransportReceiveStreamStats(mTotalReceived);
247 return stats.forget();
250 nsresult Http3WebTransportStream::OnReadSegment(const char* buf, uint32_t count,
251 uint32_t* countRead) {
252 MOZ_ASSERT(OnSocketThread(), "not on socket thread");
254 LOG(("Http3WebTransportStream::OnReadSegment count=%u state=%d [this=%p]",
255 count, mSendState, this));
257 nsresult rv = NS_OK;
259 switch (mSendState) {
260 case WAITING_TO_ACTIVATE:
261 rv = TryActivating();
262 if (rv == NS_BASE_STREAM_WOULD_BLOCK) {
263 LOG3(
264 ("Http3WebTransportStream::OnReadSegment %p cannot activate now. "
265 "queued.\n",
266 this));
267 break;
269 if (NS_FAILED(rv)) {
270 LOG3(
271 ("Http3WebTransportStream::OnReadSegment %p cannot activate "
272 "error=0x%" PRIx32 ".",
273 this, static_cast<uint32_t>(rv)));
274 mStreamReadyCallback(Err(rv));
275 mStreamReadyCallback = nullptr;
276 break;
279 rv = InitOutputPipe();
280 if (NS_SUCCEEDED(rv) && mStreamType == WebTransportStreamType::BiDi) {
281 rv = InitInputPipe();
283 if (NS_FAILED(rv)) {
284 LOG3(
285 ("Http3WebTransportStream::OnReadSegment %p failed to create pipe "
286 "error=0x%" PRIx32 ".",
287 this, static_cast<uint32_t>(rv)));
288 mSendState = SEND_DONE;
289 mStreamReadyCallback(Err(rv));
290 mStreamReadyCallback = nullptr;
291 break;
294 // Successfully activated.
295 mStreamReadyCallback(RefPtr{this});
296 mStreamReadyCallback = nullptr;
297 break;
298 case SENDING: {
299 rv = mSession->SendRequestBody(mStreamId, buf, count, countRead);
300 LOG3(
301 ("Http3WebTransportStream::OnReadSegment %p sending body returns "
302 "error=0x%" PRIx32 ".",
303 this, static_cast<uint32_t>(rv)));
304 mTotalSent += *countRead;
305 } break;
306 case WAITING_DATA:
307 // Still waiting
308 LOG3((
309 "Http3WebTransportStream::OnReadSegment %p Still waiting for data...",
310 this));
311 break;
312 case SEND_DONE:
313 LOG3(("Http3WebTransportStream::OnReadSegment %p called after SEND_DONE ",
314 this));
315 MOZ_ASSERT(false, "We are done sending this request!");
316 MOZ_ASSERT(mStreamReadyCallback);
317 rv = NS_ERROR_UNEXPECTED;
318 mStreamReadyCallback(Err(rv));
319 mStreamReadyCallback = nullptr;
320 break;
323 mSocketOutCondition = rv;
325 return mSocketOutCondition;
328 // static
329 nsresult Http3WebTransportStream::ReadRequestSegment(
330 nsIInputStream* stream, void* closure, const char* buf, uint32_t offset,
331 uint32_t count, uint32_t* countRead) {
332 Http3WebTransportStream* wtStream = (Http3WebTransportStream*)closure;
333 nsresult rv = wtStream->OnReadSegment(buf, count, countRead);
334 LOG(("Http3WebTransportStream::ReadRequestSegment %p read=%u", wtStream,
335 *countRead));
336 return rv;
339 nsresult Http3WebTransportStream::ReadSegments() {
340 MOZ_ASSERT(OnSocketThread(), "not on socket thread");
341 LOG(("Http3WebTransportStream::ReadSegments [this=%p]", this));
342 nsresult rv = NS_OK;
343 uint32_t sendBytes = 0;
344 bool again = true;
345 do {
346 sendBytes = 0;
347 rv = mSocketOutCondition = NS_OK;
348 LOG(("Http3WebTransportStream::ReadSegments state=%d [this=%p]", mSendState,
349 this));
350 switch (mSendState) {
351 case WAITING_TO_ACTIVATE: {
352 LOG3(
353 ("Http3WebTransportStream %p ReadSegments forcing OnReadSegment "
354 "call\n",
355 this));
356 uint32_t wasted = 0;
357 nsresult rv2 = OnReadSegment("", 0, &wasted);
358 LOG3((" OnReadSegment returned 0x%08" PRIx32,
359 static_cast<uint32_t>(rv2)));
360 if (mSendState != WAITING_DATA) {
361 break;
364 [[fallthrough]];
365 case WAITING_DATA:
366 [[fallthrough]];
367 case SENDING: {
368 if (mStreamRole == INCOMING &&
369 mStreamType == WebTransportStreamType::UniDi) {
370 rv = NS_OK;
371 break;
373 mSendState = SENDING;
374 rv = mSendStreamPipeIn->ReadSegments(ReadRequestSegment, this,
375 nsIOService::gDefaultSegmentSize,
376 &sendBytes);
377 } break;
378 case SEND_DONE: {
379 return NS_OK;
381 default:
382 sendBytes = 0;
383 rv = NS_OK;
384 break;
387 LOG(("Http3WebTransportStream::ReadSegments rv=0x%" PRIx32
388 " read=%u sock-cond=%" PRIx32 " again=%d mSendFin=%d [this=%p]",
389 static_cast<uint32_t>(rv), sendBytes,
390 static_cast<uint32_t>(mSocketOutCondition), again, mSendFin, this));
392 // XXX some streams return NS_BASE_STREAM_CLOSED to indicate EOF.
393 if (rv == NS_BASE_STREAM_CLOSED || !mPendingTasks.IsEmpty()) {
394 rv = NS_OK;
395 sendBytes = 0;
398 if (NS_FAILED(rv)) {
399 // if the writer didn't want to write any more data, then
400 // wait for the transaction to call ResumeSend.
401 if (rv == NS_BASE_STREAM_WOULD_BLOCK) {
402 mSendState = WAITING_DATA;
403 rv = mSendStreamPipeIn->AsyncWait(this, 0, 0, gSocketTransportService);
405 again = false;
407 // Got a WebTransport specific error
408 if (rv >= NS_ERROR_WEBTRANSPORT_CODE_BASE &&
409 rv <= NS_ERROR_WEBTRANSPORT_CODE_END) {
410 uint8_t errorCode = GetWebTransportErrorFromNSResult(rv);
411 mSendState = SEND_DONE;
412 Reset(WebTransportErrorToHttp3Error(errorCode));
413 rv = NS_OK;
415 } else if (NS_FAILED(mSocketOutCondition)) {
416 if (mSocketOutCondition != NS_BASE_STREAM_WOULD_BLOCK) {
417 rv = mSocketOutCondition;
419 again = false;
420 } else if (!sendBytes) {
421 mSendState = SEND_DONE;
422 rv = NS_OK;
423 again = false;
424 if (!mPendingTasks.IsEmpty()) {
425 LOG(("Has pending tasks to do"));
426 nsTArray<std::function<void()>> tasks = std::move(mPendingTasks);
427 for (const auto& task : tasks) {
428 task();
431 // Tell the underlying stream we're done
432 SendFin();
435 // write more to the socket until error or end-of-request...
436 } while (again && gHttpHandler->Active());
437 return rv;
440 nsresult Http3WebTransportStream::OnWriteSegment(char* buf, uint32_t count,
441 uint32_t* countWritten) {
442 MOZ_ASSERT(OnSocketThread(), "not on socket thread");
444 LOG(("Http3WebTransportStream::OnWriteSegment [this=%p, state=%d", this,
445 static_cast<uint32_t>(mRecvState)));
446 nsresult rv = NS_OK;
447 switch (mRecvState) {
448 case READING: {
449 rv = mSession->ReadResponseData(mStreamId, buf, count, countWritten,
450 &mFin);
451 if (*countWritten == 0) {
452 if (mFin) {
453 mRecvState = RECV_DONE;
454 rv = NS_BASE_STREAM_CLOSED;
455 } else {
456 rv = NS_BASE_STREAM_WOULD_BLOCK;
458 } else {
459 mTotalReceived += *countWritten;
460 if (mFin) {
461 mRecvState = RECEIVED_FIN;
464 } break;
465 case RECEIVED_FIN:
466 rv = NS_BASE_STREAM_CLOSED;
467 mRecvState = RECV_DONE;
468 break;
469 case RECV_DONE:
470 rv = NS_ERROR_UNEXPECTED;
471 break;
472 default:
473 rv = NS_ERROR_UNEXPECTED;
474 break;
477 // Remember the error received from lower layers. A stream pipe may overwrite
478 // it.
479 // If rv == NS_OK this will reset mSocketInCondition.
480 mSocketInCondition = rv;
482 return rv;
485 // static
486 nsresult Http3WebTransportStream::WritePipeSegment(nsIOutputStream* stream,
487 void* closure, char* buf,
488 uint32_t offset,
489 uint32_t count,
490 uint32_t* countWritten) {
491 Http3WebTransportStream* self = (Http3WebTransportStream*)closure;
493 nsresult rv = self->OnWriteSegment(buf, count, countWritten);
494 if (NS_FAILED(rv)) {
495 return rv;
498 LOG(("Http3WebTransportStream::WritePipeSegment %p written=%u", self,
499 *countWritten));
501 return rv;
504 nsresult Http3WebTransportStream::WriteSegments() {
505 MOZ_ASSERT(OnSocketThread(), "not on socket thread");
506 if (!mReceiveStreamPipeOut) {
507 return NS_OK;
510 LOG(("Http3WebTransportStream::WriteSegments [this=%p]", this));
512 nsresult rv = NS_OK;
513 uint32_t countWrittenSingle = 0;
514 bool again = true;
516 do {
517 mSocketInCondition = NS_OK;
518 countWrittenSingle = 0;
519 rv = mReceiveStreamPipeOut->WriteSegments(WritePipeSegment, this,
520 nsIOService::gDefaultSegmentSize,
521 &countWrittenSingle);
522 LOG(("Http3WebTransportStream::WriteSegments rv=0x%" PRIx32
523 " countWrittenSingle=%" PRIu32 " socketin=%" PRIx32 " [this=%p]",
524 static_cast<uint32_t>(rv), countWrittenSingle,
525 static_cast<uint32_t>(mSocketInCondition), this));
526 if (NS_FAILED(rv)) {
527 if (rv == NS_BASE_STREAM_WOULD_BLOCK) {
528 rv = NS_OK;
530 again = false;
531 } else if (NS_FAILED(mSocketInCondition)) {
532 if (mSocketInCondition != NS_BASE_STREAM_WOULD_BLOCK) {
533 rv = mSocketInCondition;
534 if (rv == NS_BASE_STREAM_CLOSED) {
535 mReceiveStreamPipeOut->Close();
536 rv = NS_OK;
539 again = false;
541 // read more from the socket until error...
542 } while (again && gHttpHandler->Active());
544 return rv;
547 bool Http3WebTransportStream::Done() const {
548 return mSendState == SEND_DONE && mRecvState == RECV_DONE;
551 void Http3WebTransportStream::Close(nsresult aResult) {
552 MOZ_ASSERT(OnSocketThread(), "not on socket thread");
553 LOG(("Http3WebTransportStream::Close [this=%p]", this));
554 mTransaction = nullptr;
555 if (mSendStreamPipeIn) {
556 mSendStreamPipeIn->AsyncWait(nullptr, 0, 0, nullptr);
557 mSendStreamPipeIn->CloseWithStatus(aResult);
559 if (mReceiveStreamPipeOut) {
560 mReceiveStreamPipeOut->AsyncWait(nullptr, 0, 0, nullptr);
561 mReceiveStreamPipeOut->CloseWithStatus(aResult);
563 mSendState = SEND_DONE;
564 mRecvState = RECV_DONE;
565 mSession = nullptr;
568 void Http3WebTransportStream::SendFin() {
569 MOZ_ASSERT(OnSocketThread(), "not on socket thread");
570 LOG(("Http3WebTransportStream::SendFin [this=%p mSendState=%d]", this,
571 mSendState));
573 if (mSendFin || !mSession || mResetError) {
574 // Already closed.
575 return;
578 mSendFin = true;
580 switch (mSendState) {
581 case SENDING: {
582 mPendingTasks.AppendElement([self = RefPtr{this}]() {
583 self->mSession->CloseSendingSide(self->mStreamId);
585 } break;
586 case WAITING_DATA:
587 mSendState = SEND_DONE;
588 [[fallthrough]];
589 case SEND_DONE:
590 mSession->CloseSendingSide(mStreamId);
591 // StreamHasDataToWrite needs to be called to trigger ProcessOutput.
592 mSession->StreamHasDataToWrite(this);
593 break;
594 default:
595 MOZ_ASSERT_UNREACHABLE("invalid mSendState!");
596 break;
600 void Http3WebTransportStream::Reset(uint64_t aErrorCode) {
601 MOZ_ASSERT(OnSocketThread(), "not on socket thread");
602 LOG(("Http3WebTransportStream::Reset [this=%p, mSendState=%d]", this,
603 mSendState));
605 if (mResetError || !mSession || mSendFin) {
606 // The stream is already reset.
607 return;
610 mResetError = Some(aErrorCode);
612 switch (mSendState) {
613 case SENDING: {
614 LOG(("Http3WebTransportStream::Reset [this=%p] reset after sending data",
615 this));
616 mPendingTasks.AppendElement([self = RefPtr{this}]() {
617 // "Reset" needs a special treatment here. If we are sending data and
618 // ResetWebTransportStream is called before Http3Session::ProcessOutput,
619 // neqo will drop the last piece of data.
620 NS_DispatchToCurrentThread(
621 NS_NewRunnableFunction("Http3WebTransportStream::Reset", [self]() {
622 self->mSession->ResetWebTransportStream(self, *self->mResetError);
623 self->mSession->StreamHasDataToWrite(self);
624 self->mSession->ConnectSlowConsumer(self);
625 }));
627 } break;
628 case WAITING_DATA:
629 mSendState = SEND_DONE;
630 [[fallthrough]];
631 case SEND_DONE:
632 mSession->ResetWebTransportStream(this, *mResetError);
633 // StreamHasDataToWrite needs to be called to trigger ProcessOutput.
634 mSession->StreamHasDataToWrite(this);
635 mSession->ConnectSlowConsumer(this);
636 break;
637 default:
638 MOZ_ASSERT_UNREACHABLE("invalid mSendState!");
639 break;
643 void Http3WebTransportStream::SendStopSending(uint8_t aErrorCode) {
644 MOZ_ASSERT(OnSocketThread(), "not on socket thread");
645 LOG(("Http3WebTransportStream::SendStopSending [this=%p, mSendState=%d]",
646 this, mSendState));
648 if (mSendState == WAITING_TO_ACTIVATE) {
649 return;
652 if (mStopSendingError || !mSession) {
653 return;
656 mStopSendingError = Some(aErrorCode);
658 mSession->StreamStopSending(this, *mStopSendingError);
659 // StreamHasDataToWrite needs to be called to trigger ProcessOutput.
660 mSession->StreamHasDataToWrite(this);
663 void Http3WebTransportStream::SetSendOrder(int64_t aSendOrder) {
664 mSession->SetSendOrder(this, aSendOrder);
667 } // namespace mozilla::net