Backed out changeset a47a1ff34d2a (bug 1853444) for causing failures on /webtransport...
[gecko.git] / netwerk / protocol / webtransport / WebTransportStreamProxy.cpp
blob54578d580e09d80448f370c837196f6336828301
1 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2 /* This Source Code Form is subject to the terms of the Mozilla Public
3 * License, v. 2.0. If a copy of the MPL was not distributed with this
4 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
6 #include "WebTransportStreamProxy.h"
8 #include "WebTransportLog.h"
9 #include "Http3WebTransportStream.h"
10 #include "nsProxyRelease.h"
11 #include "nsSocketTransportService2.h"
13 namespace mozilla::net {
15 NS_IMPL_ADDREF(WebTransportStreamProxy)
16 NS_IMPL_RELEASE(WebTransportStreamProxy)
18 NS_INTERFACE_MAP_BEGIN(WebTransportStreamProxy)
19 NS_INTERFACE_MAP_ENTRY_AMBIGUOUS(nsISupports, nsIWebTransportReceiveStream)
20 NS_INTERFACE_MAP_ENTRY(nsIWebTransportReceiveStream)
21 NS_INTERFACE_MAP_ENTRY(nsIWebTransportSendStream)
22 NS_INTERFACE_MAP_ENTRY(nsIWebTransportBidirectionalStream)
23 NS_INTERFACE_MAP_END
25 WebTransportStreamProxy::WebTransportStreamProxy(
26 Http3WebTransportStream* aStream)
27 : mWebTransportStream(aStream) {
28 nsCOMPtr<nsIAsyncInputStream> inputStream;
29 nsCOMPtr<nsIAsyncOutputStream> outputStream;
30 mWebTransportStream->GetWriterAndReader(getter_AddRefs(outputStream),
31 getter_AddRefs(inputStream));
32 if (outputStream) {
33 mWriter = new AsyncOutputStreamWrapper(outputStream);
35 if (inputStream) {
36 mReader = new AsyncInputStreamWrapper(inputStream, mWebTransportStream);
40 WebTransportStreamProxy::~WebTransportStreamProxy() {
41 // mWebTransportStream needs to be destroyed on the socket thread.
42 NS_ProxyRelease("WebTransportStreamProxy::~WebTransportStreamProxy",
43 gSocketTransportService, mWebTransportStream.forget());
46 NS_IMETHODIMP WebTransportStreamProxy::SendStopSending(uint8_t aError) {
47 if (!OnSocketThread()) {
48 RefPtr<WebTransportStreamProxy> self(this);
49 return gSocketTransportService->Dispatch(
50 NS_NewRunnableFunction("WebTransportStreamProxy::SendStopSending",
51 [self{std::move(self)}, error(aError)]() {
52 self->SendStopSending(error);
53 }));
56 mWebTransportStream->SendStopSending(aError);
57 return NS_OK;
60 NS_IMETHODIMP WebTransportStreamProxy::SendFin(void) {
61 if (!mWriter) {
62 return NS_ERROR_UNEXPECTED;
65 mWriter->Close();
67 if (!OnSocketThread()) {
68 RefPtr<WebTransportStreamProxy> self(this);
69 return gSocketTransportService->Dispatch(NS_NewRunnableFunction(
70 "WebTransportStreamProxy::SendFin",
71 [self{std::move(self)}]() { self->mWebTransportStream->SendFin(); }));
74 mWebTransportStream->SendFin();
75 return NS_OK;
78 NS_IMETHODIMP WebTransportStreamProxy::Reset(uint8_t aErrorCode) {
79 if (!mWriter) {
80 return NS_ERROR_UNEXPECTED;
83 mWriter->Close();
85 if (!OnSocketThread()) {
86 RefPtr<WebTransportStreamProxy> self(this);
87 return gSocketTransportService->Dispatch(
88 NS_NewRunnableFunction("WebTransportStreamProxy::Reset",
89 [self{std::move(self)}, error(aErrorCode)]() {
90 self->mWebTransportStream->Reset(error);
91 }));
94 mWebTransportStream->Reset(aErrorCode);
95 return NS_OK;
98 namespace {
100 class StatsCallbackWrapper : public nsIWebTransportStreamStatsCallback {
101 public:
102 NS_DECL_THREADSAFE_ISUPPORTS
104 explicit StatsCallbackWrapper(nsIWebTransportStreamStatsCallback* aCallback)
105 : mCallback(aCallback), mTarget(GetCurrentSerialEventTarget()) {}
107 NS_IMETHOD OnSendStatsAvailable(
108 nsIWebTransportSendStreamStats* aStats) override {
109 if (!mTarget->IsOnCurrentThread()) {
110 RefPtr<StatsCallbackWrapper> self(this);
111 nsCOMPtr<nsIWebTransportSendStreamStats> stats = aStats;
112 Unused << mTarget->Dispatch(NS_NewRunnableFunction(
113 "StatsCallbackWrapper::OnSendStatsAvailable",
114 [self{std::move(self)}, stats{std::move(stats)}]() {
115 self->OnSendStatsAvailable(stats);
116 }));
117 return NS_OK;
120 mCallback->OnSendStatsAvailable(aStats);
121 return NS_OK;
124 NS_IMETHOD OnReceiveStatsAvailable(
125 nsIWebTransportReceiveStreamStats* aStats) override {
126 if (!mTarget->IsOnCurrentThread()) {
127 RefPtr<StatsCallbackWrapper> self(this);
128 nsCOMPtr<nsIWebTransportReceiveStreamStats> stats = aStats;
129 Unused << mTarget->Dispatch(NS_NewRunnableFunction(
130 "StatsCallbackWrapper::OnReceiveStatsAvailable",
131 [self{std::move(self)}, stats{std::move(stats)}]() {
132 self->OnReceiveStatsAvailable(stats);
133 }));
134 return NS_OK;
137 mCallback->OnReceiveStatsAvailable(aStats);
138 return NS_OK;
141 private:
142 virtual ~StatsCallbackWrapper() {
143 NS_ProxyRelease("StatsCallbackWrapper::~StatsCallbackWrapper", mTarget,
144 mCallback.forget());
147 nsCOMPtr<nsIWebTransportStreamStatsCallback> mCallback;
148 nsCOMPtr<nsIEventTarget> mTarget;
151 NS_IMPL_ISUPPORTS(StatsCallbackWrapper, nsIWebTransportStreamStatsCallback)
153 } // namespace
155 NS_IMETHODIMP WebTransportStreamProxy::GetSendStreamStats(
156 nsIWebTransportStreamStatsCallback* aCallback) {
157 if (!OnSocketThread()) {
158 RefPtr<WebTransportStreamProxy> self(this);
159 nsCOMPtr<nsIWebTransportStreamStatsCallback> callback =
160 new StatsCallbackWrapper(aCallback);
161 return gSocketTransportService->Dispatch(NS_NewRunnableFunction(
162 "WebTransportStreamProxy::GetSendStreamStats",
163 [self{std::move(self)}, callback{std::move(callback)}]() {
164 self->GetSendStreamStats(callback);
165 }));
168 nsCOMPtr<nsIWebTransportSendStreamStats> stats =
169 mWebTransportStream->GetSendStreamStats();
170 aCallback->OnSendStatsAvailable(stats);
171 return NS_OK;
174 NS_IMETHODIMP WebTransportStreamProxy::GetReceiveStreamStats(
175 nsIWebTransportStreamStatsCallback* aCallback) {
176 if (!OnSocketThread()) {
177 RefPtr<WebTransportStreamProxy> self(this);
178 nsCOMPtr<nsIWebTransportStreamStatsCallback> callback =
179 new StatsCallbackWrapper(aCallback);
180 return gSocketTransportService->Dispatch(NS_NewRunnableFunction(
181 "WebTransportStreamProxy::GetReceiveStreamStats",
182 [self{std::move(self)}, callback{std::move(callback)}]() {
183 self->GetReceiveStreamStats(callback);
184 }));
187 nsCOMPtr<nsIWebTransportReceiveStreamStats> stats =
188 mWebTransportStream->GetReceiveStreamStats();
189 aCallback->OnReceiveStatsAvailable(stats);
190 return NS_OK;
193 NS_IMETHODIMP WebTransportStreamProxy::GetHasReceivedFIN(
194 bool* aHasReceivedFIN) {
195 *aHasReceivedFIN = mWebTransportStream->RecvDone();
196 return NS_OK;
199 NS_IMETHODIMP WebTransportStreamProxy::GetInputStream(
200 nsIAsyncInputStream** aOut) {
201 if (!mReader) {
202 return NS_ERROR_NOT_AVAILABLE;
205 RefPtr<AsyncInputStreamWrapper> stream = mReader;
206 stream.forget(aOut);
207 return NS_OK;
210 NS_IMETHODIMP WebTransportStreamProxy::GetOutputStream(
211 nsIAsyncOutputStream** aOut) {
212 if (!mWriter) {
213 return NS_ERROR_NOT_AVAILABLE;
216 RefPtr<AsyncOutputStreamWrapper> stream = mWriter;
217 stream.forget(aOut);
218 return NS_OK;
221 NS_IMETHODIMP WebTransportStreamProxy::GetStreamId(uint64_t* aId) {
222 *aId = mWebTransportStream->StreamId();
223 return NS_OK;
226 NS_IMETHODIMP WebTransportStreamProxy::SetSendOrder(int64_t aSendOrder) {
227 mWebTransportStream->SetSendOrder(aSendOrder);
228 return NS_OK;
231 //------------------------------------------------------------------------------
232 // WebTransportStreamProxy::AsyncInputStreamWrapper
233 //------------------------------------------------------------------------------
235 NS_IMPL_ISUPPORTS(WebTransportStreamProxy::AsyncInputStreamWrapper,
236 nsIInputStream, nsIAsyncInputStream)
238 WebTransportStreamProxy::AsyncInputStreamWrapper::AsyncInputStreamWrapper(
239 nsIAsyncInputStream* aStream, Http3WebTransportStream* aWebTransportStream)
240 : mStream(aStream), mWebTransportStream(aWebTransportStream) {}
242 WebTransportStreamProxy::AsyncInputStreamWrapper::~AsyncInputStreamWrapper() =
243 default;
245 void WebTransportStreamProxy::AsyncInputStreamWrapper::MaybeCloseStream() {
246 if (!mWebTransportStream->RecvDone()) {
247 return;
250 uint64_t available = 0;
251 Unused << Available(&available);
252 if (available) {
253 // Don't close the InputStream if there's unread data available, since it
254 // would be lost. We exit above unless we know no more data will be received
255 // for the stream.
256 return;
259 LOG(
260 ("AsyncInputStreamWrapper::MaybeCloseStream close stream due to FIN "
261 "stream=%p",
262 mWebTransportStream.get()));
263 Close();
266 NS_IMETHODIMP WebTransportStreamProxy::AsyncInputStreamWrapper::Close() {
267 return mStream->CloseWithStatus(NS_BASE_STREAM_CLOSED);
270 NS_IMETHODIMP WebTransportStreamProxy::AsyncInputStreamWrapper::Available(
271 uint64_t* aAvailable) {
272 return mStream->Available(aAvailable);
275 NS_IMETHODIMP WebTransportStreamProxy::AsyncInputStreamWrapper::StreamStatus() {
276 return mStream->StreamStatus();
279 NS_IMETHODIMP WebTransportStreamProxy::AsyncInputStreamWrapper::Read(
280 char* aBuf, uint32_t aCount, uint32_t* aResult) {
281 LOG(("WebTransportStreamProxy::AsyncInputStreamWrapper::Read %p", this));
282 nsresult rv = mStream->Read(aBuf, aCount, aResult);
283 MaybeCloseStream();
284 return rv;
287 NS_IMETHODIMP WebTransportStreamProxy::AsyncInputStreamWrapper::ReadSegments(
288 nsWriteSegmentFun aWriter, void* aClosure, uint32_t aCount,
289 uint32_t* aResult) {
290 LOG(("WebTransportStreamProxy::AsyncInputStreamWrapper::ReadSegments %p",
291 this));
292 nsresult rv = mStream->ReadSegments(aWriter, aClosure, aCount, aResult);
293 if (*aResult > 0) {
294 LOG((" Read %u bytes", *aResult));
296 MaybeCloseStream();
297 return rv;
300 NS_IMETHODIMP WebTransportStreamProxy::AsyncInputStreamWrapper::IsNonBlocking(
301 bool* aResult) {
302 return mStream->IsNonBlocking(aResult);
305 NS_IMETHODIMP WebTransportStreamProxy::AsyncInputStreamWrapper::CloseWithStatus(
306 nsresult aStatus) {
307 return mStream->CloseWithStatus(aStatus);
310 NS_IMETHODIMP WebTransportStreamProxy::AsyncInputStreamWrapper::AsyncWait(
311 nsIInputStreamCallback* aCallback, uint32_t aFlags,
312 uint32_t aRequestedCount, nsIEventTarget* aEventTarget) {
313 return mStream->AsyncWait(aCallback, aFlags, aRequestedCount, aEventTarget);
316 //------------------------------------------------------------------------------
317 // WebTransportStreamProxy::AsyncOutputStreamWrapper
318 //------------------------------------------------------------------------------
320 NS_IMPL_ISUPPORTS(WebTransportStreamProxy::AsyncOutputStreamWrapper,
321 nsIOutputStream, nsIAsyncOutputStream)
323 WebTransportStreamProxy::AsyncOutputStreamWrapper::AsyncOutputStreamWrapper(
324 nsIAsyncOutputStream* aStream)
325 : mStream(aStream) {}
327 WebTransportStreamProxy::AsyncOutputStreamWrapper::~AsyncOutputStreamWrapper() =
328 default;
330 NS_IMETHODIMP WebTransportStreamProxy::AsyncOutputStreamWrapper::Flush() {
331 return mStream->Flush();
334 NS_IMETHODIMP
335 WebTransportStreamProxy::AsyncOutputStreamWrapper::StreamStatus() {
336 return mStream->StreamStatus();
339 NS_IMETHODIMP WebTransportStreamProxy::AsyncOutputStreamWrapper::Write(
340 const char* aBuf, uint32_t aCount, uint32_t* aResult) {
341 LOG(("WebTransportStreamProxy::AsyncOutputStreamWrapper::Write %p %u bytes",
342 this, aCount));
343 return mStream->Write(aBuf, aCount, aResult);
346 NS_IMETHODIMP WebTransportStreamProxy::AsyncOutputStreamWrapper::WriteFrom(
347 nsIInputStream* aFromStream, uint32_t aCount, uint32_t* aResult) {
348 return mStream->WriteFrom(aFromStream, aCount, aResult);
351 NS_IMETHODIMP WebTransportStreamProxy::AsyncOutputStreamWrapper::WriteSegments(
352 nsReadSegmentFun aReader, void* aClosure, uint32_t aCount,
353 uint32_t* aResult) {
354 return mStream->WriteSegments(aReader, aClosure, aCount, aResult);
357 NS_IMETHODIMP WebTransportStreamProxy::AsyncOutputStreamWrapper::AsyncWait(
358 nsIOutputStreamCallback* aCallback, uint32_t aFlags,
359 uint32_t aRequestedCount, nsIEventTarget* aEventTarget) {
360 return mStream->AsyncWait(aCallback, aFlags, aRequestedCount, aEventTarget);
363 NS_IMETHODIMP
364 WebTransportStreamProxy::AsyncOutputStreamWrapper::CloseWithStatus(
365 nsresult aStatus) {
366 return mStream->CloseWithStatus(aStatus);
369 NS_IMETHODIMP WebTransportStreamProxy::AsyncOutputStreamWrapper::Close() {
370 return mStream->Close();
373 NS_IMETHODIMP WebTransportStreamProxy::AsyncOutputStreamWrapper::IsNonBlocking(
374 bool* aResult) {
375 return mStream->IsNonBlocking(aResult);
378 } // namespace mozilla::net