1 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2 /* This Source Code Form is subject to the terms of the Mozilla Public
3 * License, v. 2.0. If a copy of the MPL was not distributed with this
4 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
6 #include "WebTransportStreamProxy.h"
8 #include "WebTransportLog.h"
9 #include "Http3WebTransportStream.h"
10 #include "nsProxyRelease.h"
11 #include "nsSocketTransportService2.h"
13 namespace mozilla::net
{
15 NS_IMPL_ADDREF(WebTransportStreamProxy
)
16 NS_IMPL_RELEASE(WebTransportStreamProxy
)
18 NS_INTERFACE_MAP_BEGIN(WebTransportStreamProxy
)
19 NS_INTERFACE_MAP_ENTRY_AMBIGUOUS(nsISupports
, nsIWebTransportReceiveStream
)
20 NS_INTERFACE_MAP_ENTRY(nsIWebTransportReceiveStream
)
21 NS_INTERFACE_MAP_ENTRY(nsIWebTransportSendStream
)
22 NS_INTERFACE_MAP_ENTRY(nsIWebTransportBidirectionalStream
)
25 WebTransportStreamProxy::WebTransportStreamProxy(
26 Http3WebTransportStream
* aStream
)
27 : mWebTransportStream(aStream
) {
28 nsCOMPtr
<nsIAsyncInputStream
> inputStream
;
29 nsCOMPtr
<nsIAsyncOutputStream
> outputStream
;
30 mWebTransportStream
->GetWriterAndReader(getter_AddRefs(outputStream
),
31 getter_AddRefs(inputStream
));
33 mWriter
= new AsyncOutputStreamWrapper(outputStream
);
36 mReader
= new AsyncInputStreamWrapper(inputStream
, mWebTransportStream
);
40 WebTransportStreamProxy::~WebTransportStreamProxy() {
41 // mWebTransportStream needs to be destroyed on the socket thread.
42 NS_ProxyRelease("WebTransportStreamProxy::~WebTransportStreamProxy",
43 gSocketTransportService
, mWebTransportStream
.forget());
46 NS_IMETHODIMP
WebTransportStreamProxy::SendStopSending(uint8_t aError
) {
47 if (!OnSocketThread()) {
48 RefPtr
<WebTransportStreamProxy
> self(this);
49 return gSocketTransportService
->Dispatch(
50 NS_NewRunnableFunction("WebTransportStreamProxy::SendStopSending",
51 [self
{std::move(self
)}, error(aError
)]() {
52 self
->SendStopSending(error
);
56 mWebTransportStream
->SendStopSending(aError
);
60 NS_IMETHODIMP
WebTransportStreamProxy::SendFin(void) {
62 return NS_ERROR_UNEXPECTED
;
67 if (!OnSocketThread()) {
68 RefPtr
<WebTransportStreamProxy
> self(this);
69 return gSocketTransportService
->Dispatch(NS_NewRunnableFunction(
70 "WebTransportStreamProxy::SendFin",
71 [self
{std::move(self
)}]() { self
->mWebTransportStream
->SendFin(); }));
74 mWebTransportStream
->SendFin();
78 NS_IMETHODIMP
WebTransportStreamProxy::Reset(uint8_t aErrorCode
) {
80 return NS_ERROR_UNEXPECTED
;
85 if (!OnSocketThread()) {
86 RefPtr
<WebTransportStreamProxy
> self(this);
87 return gSocketTransportService
->Dispatch(
88 NS_NewRunnableFunction("WebTransportStreamProxy::Reset",
89 [self
{std::move(self
)}, error(aErrorCode
)]() {
90 self
->mWebTransportStream
->Reset(error
);
94 mWebTransportStream
->Reset(aErrorCode
);
100 class StatsCallbackWrapper
: public nsIWebTransportStreamStatsCallback
{
102 NS_DECL_THREADSAFE_ISUPPORTS
104 explicit StatsCallbackWrapper(nsIWebTransportStreamStatsCallback
* aCallback
)
105 : mCallback(aCallback
), mTarget(GetCurrentSerialEventTarget()) {}
107 NS_IMETHOD
OnSendStatsAvailable(
108 nsIWebTransportSendStreamStats
* aStats
) override
{
109 if (!mTarget
->IsOnCurrentThread()) {
110 RefPtr
<StatsCallbackWrapper
> self(this);
111 nsCOMPtr
<nsIWebTransportSendStreamStats
> stats
= aStats
;
112 Unused
<< mTarget
->Dispatch(NS_NewRunnableFunction(
113 "StatsCallbackWrapper::OnSendStatsAvailable",
114 [self
{std::move(self
)}, stats
{std::move(stats
)}]() {
115 self
->OnSendStatsAvailable(stats
);
120 mCallback
->OnSendStatsAvailable(aStats
);
124 NS_IMETHOD
OnReceiveStatsAvailable(
125 nsIWebTransportReceiveStreamStats
* aStats
) override
{
126 if (!mTarget
->IsOnCurrentThread()) {
127 RefPtr
<StatsCallbackWrapper
> self(this);
128 nsCOMPtr
<nsIWebTransportReceiveStreamStats
> stats
= aStats
;
129 Unused
<< mTarget
->Dispatch(NS_NewRunnableFunction(
130 "StatsCallbackWrapper::OnReceiveStatsAvailable",
131 [self
{std::move(self
)}, stats
{std::move(stats
)}]() {
132 self
->OnReceiveStatsAvailable(stats
);
137 mCallback
->OnReceiveStatsAvailable(aStats
);
142 virtual ~StatsCallbackWrapper() {
143 NS_ProxyRelease("StatsCallbackWrapper::~StatsCallbackWrapper", mTarget
,
147 nsCOMPtr
<nsIWebTransportStreamStatsCallback
> mCallback
;
148 nsCOMPtr
<nsIEventTarget
> mTarget
;
151 NS_IMPL_ISUPPORTS(StatsCallbackWrapper
, nsIWebTransportStreamStatsCallback
)
155 NS_IMETHODIMP
WebTransportStreamProxy::GetSendStreamStats(
156 nsIWebTransportStreamStatsCallback
* aCallback
) {
157 if (!OnSocketThread()) {
158 RefPtr
<WebTransportStreamProxy
> self(this);
159 nsCOMPtr
<nsIWebTransportStreamStatsCallback
> callback
=
160 new StatsCallbackWrapper(aCallback
);
161 return gSocketTransportService
->Dispatch(NS_NewRunnableFunction(
162 "WebTransportStreamProxy::GetSendStreamStats",
163 [self
{std::move(self
)}, callback
{std::move(callback
)}]() {
164 self
->GetSendStreamStats(callback
);
168 nsCOMPtr
<nsIWebTransportSendStreamStats
> stats
=
169 mWebTransportStream
->GetSendStreamStats();
170 aCallback
->OnSendStatsAvailable(stats
);
174 NS_IMETHODIMP
WebTransportStreamProxy::GetReceiveStreamStats(
175 nsIWebTransportStreamStatsCallback
* aCallback
) {
176 if (!OnSocketThread()) {
177 RefPtr
<WebTransportStreamProxy
> self(this);
178 nsCOMPtr
<nsIWebTransportStreamStatsCallback
> callback
=
179 new StatsCallbackWrapper(aCallback
);
180 return gSocketTransportService
->Dispatch(NS_NewRunnableFunction(
181 "WebTransportStreamProxy::GetReceiveStreamStats",
182 [self
{std::move(self
)}, callback
{std::move(callback
)}]() {
183 self
->GetReceiveStreamStats(callback
);
187 nsCOMPtr
<nsIWebTransportReceiveStreamStats
> stats
=
188 mWebTransportStream
->GetReceiveStreamStats();
189 aCallback
->OnReceiveStatsAvailable(stats
);
193 NS_IMETHODIMP
WebTransportStreamProxy::GetHasReceivedFIN(
194 bool* aHasReceivedFIN
) {
195 *aHasReceivedFIN
= mWebTransportStream
->RecvDone();
199 NS_IMETHODIMP
WebTransportStreamProxy::GetInputStream(
200 nsIAsyncInputStream
** aOut
) {
202 return NS_ERROR_NOT_AVAILABLE
;
205 RefPtr
<AsyncInputStreamWrapper
> stream
= mReader
;
210 NS_IMETHODIMP
WebTransportStreamProxy::GetOutputStream(
211 nsIAsyncOutputStream
** aOut
) {
213 return NS_ERROR_NOT_AVAILABLE
;
216 RefPtr
<AsyncOutputStreamWrapper
> stream
= mWriter
;
221 NS_IMETHODIMP
WebTransportStreamProxy::GetStreamId(uint64_t* aId
) {
222 *aId
= mWebTransportStream
->StreamId();
226 NS_IMETHODIMP
WebTransportStreamProxy::SetSendOrder(int64_t aSendOrder
) {
227 mWebTransportStream
->SetSendOrder(aSendOrder
);
231 //------------------------------------------------------------------------------
232 // WebTransportStreamProxy::AsyncInputStreamWrapper
233 //------------------------------------------------------------------------------
235 NS_IMPL_ISUPPORTS(WebTransportStreamProxy::AsyncInputStreamWrapper
,
236 nsIInputStream
, nsIAsyncInputStream
)
238 WebTransportStreamProxy::AsyncInputStreamWrapper::AsyncInputStreamWrapper(
239 nsIAsyncInputStream
* aStream
, Http3WebTransportStream
* aWebTransportStream
)
240 : mStream(aStream
), mWebTransportStream(aWebTransportStream
) {}
242 WebTransportStreamProxy::AsyncInputStreamWrapper::~AsyncInputStreamWrapper() =
245 void WebTransportStreamProxy::AsyncInputStreamWrapper::MaybeCloseStream() {
246 if (!mWebTransportStream
->RecvDone()) {
250 uint64_t available
= 0;
251 Unused
<< Available(&available
);
253 // Don't close the InputStream if there's unread data available, since it
254 // would be lost. We exit above unless we know no more data will be received
260 ("AsyncInputStreamWrapper::MaybeCloseStream close stream due to FIN "
262 mWebTransportStream
.get()));
266 NS_IMETHODIMP
WebTransportStreamProxy::AsyncInputStreamWrapper::Close() {
267 return mStream
->CloseWithStatus(NS_BASE_STREAM_CLOSED
);
270 NS_IMETHODIMP
WebTransportStreamProxy::AsyncInputStreamWrapper::Available(
271 uint64_t* aAvailable
) {
272 return mStream
->Available(aAvailable
);
275 NS_IMETHODIMP
WebTransportStreamProxy::AsyncInputStreamWrapper::StreamStatus() {
276 return mStream
->StreamStatus();
279 NS_IMETHODIMP
WebTransportStreamProxy::AsyncInputStreamWrapper::Read(
280 char* aBuf
, uint32_t aCount
, uint32_t* aResult
) {
281 LOG(("WebTransportStreamProxy::AsyncInputStreamWrapper::Read %p", this));
282 nsresult rv
= mStream
->Read(aBuf
, aCount
, aResult
);
287 NS_IMETHODIMP
WebTransportStreamProxy::AsyncInputStreamWrapper::ReadSegments(
288 nsWriteSegmentFun aWriter
, void* aClosure
, uint32_t aCount
,
290 LOG(("WebTransportStreamProxy::AsyncInputStreamWrapper::ReadSegments %p",
292 nsresult rv
= mStream
->ReadSegments(aWriter
, aClosure
, aCount
, aResult
);
294 LOG((" Read %u bytes", *aResult
));
300 NS_IMETHODIMP
WebTransportStreamProxy::AsyncInputStreamWrapper::IsNonBlocking(
302 return mStream
->IsNonBlocking(aResult
);
305 NS_IMETHODIMP
WebTransportStreamProxy::AsyncInputStreamWrapper::CloseWithStatus(
307 return mStream
->CloseWithStatus(aStatus
);
310 NS_IMETHODIMP
WebTransportStreamProxy::AsyncInputStreamWrapper::AsyncWait(
311 nsIInputStreamCallback
* aCallback
, uint32_t aFlags
,
312 uint32_t aRequestedCount
, nsIEventTarget
* aEventTarget
) {
313 return mStream
->AsyncWait(aCallback
, aFlags
, aRequestedCount
, aEventTarget
);
316 //------------------------------------------------------------------------------
317 // WebTransportStreamProxy::AsyncOutputStreamWrapper
318 //------------------------------------------------------------------------------
320 NS_IMPL_ISUPPORTS(WebTransportStreamProxy::AsyncOutputStreamWrapper
,
321 nsIOutputStream
, nsIAsyncOutputStream
)
323 WebTransportStreamProxy::AsyncOutputStreamWrapper::AsyncOutputStreamWrapper(
324 nsIAsyncOutputStream
* aStream
)
325 : mStream(aStream
) {}
327 WebTransportStreamProxy::AsyncOutputStreamWrapper::~AsyncOutputStreamWrapper() =
330 NS_IMETHODIMP
WebTransportStreamProxy::AsyncOutputStreamWrapper::Flush() {
331 return mStream
->Flush();
335 WebTransportStreamProxy::AsyncOutputStreamWrapper::StreamStatus() {
336 return mStream
->StreamStatus();
339 NS_IMETHODIMP
WebTransportStreamProxy::AsyncOutputStreamWrapper::Write(
340 const char* aBuf
, uint32_t aCount
, uint32_t* aResult
) {
341 LOG(("WebTransportStreamProxy::AsyncOutputStreamWrapper::Write %p %u bytes",
343 return mStream
->Write(aBuf
, aCount
, aResult
);
346 NS_IMETHODIMP
WebTransportStreamProxy::AsyncOutputStreamWrapper::WriteFrom(
347 nsIInputStream
* aFromStream
, uint32_t aCount
, uint32_t* aResult
) {
348 return mStream
->WriteFrom(aFromStream
, aCount
, aResult
);
351 NS_IMETHODIMP
WebTransportStreamProxy::AsyncOutputStreamWrapper::WriteSegments(
352 nsReadSegmentFun aReader
, void* aClosure
, uint32_t aCount
,
354 return mStream
->WriteSegments(aReader
, aClosure
, aCount
, aResult
);
357 NS_IMETHODIMP
WebTransportStreamProxy::AsyncOutputStreamWrapper::AsyncWait(
358 nsIOutputStreamCallback
* aCallback
, uint32_t aFlags
,
359 uint32_t aRequestedCount
, nsIEventTarget
* aEventTarget
) {
360 return mStream
->AsyncWait(aCallback
, aFlags
, aRequestedCount
, aEventTarget
);
364 WebTransportStreamProxy::AsyncOutputStreamWrapper::CloseWithStatus(
366 return mStream
->CloseWithStatus(aStatus
);
369 NS_IMETHODIMP
WebTransportStreamProxy::AsyncOutputStreamWrapper::Close() {
370 return mStream
->Close();
373 NS_IMETHODIMP
WebTransportStreamProxy::AsyncOutputStreamWrapper::IsNonBlocking(
375 return mStream
->IsNonBlocking(aResult
);
378 } // namespace mozilla::net