Backed out 2 changesets (bug 1865633) for causing xpc timeout failures CLOSED TREE
[gecko.git] / netwerk / test / http3server / src / main.rs
blobe6e474615cd91f200681d4bab38b0314b47c01cd
1 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
2 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
3 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
4 // option. This file may not be copied, modified, or distributed
5 // except according to those terms.
7 #![deny(warnings)]
9 use base64::prelude::*;
10 use neqo_common::{event::Provider, qdebug, qinfo, qtrace, Datagram, Header};
11 use neqo_crypto::{generate_ech_keys, init_db, AllowZeroRtt, AntiReplay};
12 use neqo_http3::{
13     Error, Http3OrWebTransportStream, Http3Parameters, Http3Server, Http3ServerEvent,
14     WebTransportRequest, WebTransportServerEvent, WebTransportSessionAcceptAction,
16 use neqo_transport::server::{ActiveConnectionRef, Server};
17 use neqo_transport::{
18     ConnectionEvent, ConnectionParameters, Output, RandomConnectionIdGenerator, StreamId,
19     StreamType,
21 use std::env;
23 use std::cell::RefCell;
24 use std::io;
25 use std::path::PathBuf;
26 use std::process::exit;
27 use std::rc::Rc;
28 use std::thread;
29 use std::time::{Duration, Instant};
31 use cfg_if::cfg_if;
32 use core::fmt::Display;
34 cfg_if! {
35     if #[cfg(not(target_os = "android"))] {
36         use std::sync::mpsc::{channel, Receiver, TryRecvError};
37         use hyper::body::HttpBody;
38         use hyper::header::{HeaderName, HeaderValue};
39         use hyper::{Body, Client, Method, Request};
40     }
43 use mio::net::UdpSocket;
44 use mio::{Events, Poll, PollOpt, Ready, Token};
45 use mio_extras::timer::{Builder, Timeout, Timer};
46 use std::cmp::{max, min};
47 use std::collections::hash_map::DefaultHasher;
48 use std::collections::HashMap;
49 use std::collections::HashSet;
50 use std::hash::{Hash, Hasher};
51 use std::mem;
52 use std::net::SocketAddr;
54 const MAX_TABLE_SIZE: u64 = 65536;
55 const MAX_BLOCKED_STREAMS: u16 = 10;
56 const PROTOCOLS: &[&str] = &["h3-29", "h3"];
57 const TIMER_TOKEN: Token = Token(0xffff);
58 const ECH_CONFIG_ID: u8 = 7;
59 const ECH_PUBLIC_NAME: &str = "public.example";
61 const HTTP_RESPONSE_WITH_WRONG_FRAME: &[u8] = &[
62     0x01, 0x06, 0x00, 0x00, 0xd9, 0x54, 0x01, 0x37, // headers
63     0x0, 0x3, 0x61, 0x62, 0x63, // the first data frame
64     0x3, 0x1, 0x5, // a cancel push frame that is not allowed
67 trait HttpServer: Display {
68     fn process(&mut self, dgram: Option<Datagram>) -> Output;
69     fn process_events(&mut self);
70     fn get_timeout(&self) -> Option<Duration> {
71         None
72     }
75 struct Http3TestServer {
76     server: Http3Server,
77     // This a map from a post request to amount of data ithas been received on the request.
78     // The respons will carry the amount of data received.
79     posts: HashMap<Http3OrWebTransportStream, usize>,
80     responses: HashMap<Http3OrWebTransportStream, Vec<u8>>,
81     current_connection_hash: u64,
82     sessions_to_close: HashMap<Instant, Vec<WebTransportRequest>>,
83     sessions_to_create_stream: Vec<(WebTransportRequest, StreamType, bool)>,
84     webtransport_bidi_stream: HashSet<Http3OrWebTransportStream>,
85     wt_unidi_conn_to_stream: HashMap<ActiveConnectionRef, Http3OrWebTransportStream>,
86     wt_unidi_echo_back: HashMap<Http3OrWebTransportStream, Http3OrWebTransportStream>,
87     received_datagram: Option<Vec<u8>>,
90 impl ::std::fmt::Display for Http3TestServer {
91     fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
92         write!(f, "{}", self.server)
93     }
96 impl Http3TestServer {
97     pub fn new(server: Http3Server) -> Self {
98         Self {
99             server,
100             posts: HashMap::new(),
101             responses: HashMap::new(),
102             current_connection_hash: 0,
103             sessions_to_close: HashMap::new(),
104             sessions_to_create_stream: Vec::new(),
105             webtransport_bidi_stream: HashSet::new(),
106             wt_unidi_conn_to_stream: HashMap::new(),
107             wt_unidi_echo_back: HashMap::new(),
108             received_datagram: None,
109         }
110     }
112     fn new_response(&mut self, mut stream: Http3OrWebTransportStream, mut data: Vec<u8>) {
113         if data.len() == 0 {
114             let _ = stream.stream_close_send();
115             return;
116         }
117         match stream.send_data(&data) {
118             Ok(sent) => {
119                 if sent < data.len() {
120                     self.responses.insert(stream, data.split_off(sent));
121                 } else {
122                     stream.stream_close_send().unwrap();
123                 }
124             }
125             Err(e) => {
126                 eprintln!("error is {:?}", e);
127             }
128         }
129     }
131     fn handle_stream_writable(&mut self, mut stream: Http3OrWebTransportStream) {
132         if let Some(data) = self.responses.get_mut(&stream) {
133             match stream.send_data(&data) {
134                 Ok(sent) => {
135                     if sent < data.len() {
136                         let new_d = (*data).split_off(sent);
137                         *data = new_d;
138                     } else {
139                         stream.stream_close_send().unwrap();
140                         self.responses.remove(&stream);
141                     }
142                 }
143                 Err(_) => {
144                     eprintln!("Unexpected error");
145                 }
146             }
147         }
148     }
150     fn maybe_close_session(&mut self) {
151         let now = Instant::now();
152         for (expires, sessions) in self.sessions_to_close.iter_mut() {
153             if *expires <= now {
154                 for s in sessions.iter_mut() {
155                     mem::drop(s.close_session(0, ""));
156                 }
157             }
158         }
159         self.sessions_to_close.retain(|expires, _| *expires >= now);
160     }
162     fn maybe_create_wt_stream(&mut self) {
163         if self.sessions_to_create_stream.is_empty() {
164             return;
165         }
166         let tuple = self.sessions_to_create_stream.pop().unwrap();
167         let mut session = tuple.0;
168         let mut wt_server_stream = session.create_stream(tuple.1).unwrap();
169         if tuple.1 == StreamType::UniDi {
170             if tuple.2 {
171                 wt_server_stream.send_data(b"qwerty").unwrap();
172                 wt_server_stream.stream_close_send().unwrap();
173             } else {
174                 // relaying Http3ServerEvent::Data to uni streams
175                 // slows down netwerk/test/unit/test_webtransport_simple.js
176                 // to the point of failure. Only do so when necessary.
177                 self.wt_unidi_conn_to_stream
178                     .insert(wt_server_stream.conn.clone(), wt_server_stream);
179             }
180         } else {
181             if tuple.2 {
182                 wt_server_stream.send_data(b"asdfg").unwrap();
183                 wt_server_stream.stream_close_send().unwrap();
184                 wt_server_stream
185                     .stream_stop_sending(Error::HttpNoError.code())
186                     .unwrap();
187             } else {
188                 self.webtransport_bidi_stream.insert(wt_server_stream);
189             }
190         }
191     }
194 impl HttpServer for Http3TestServer {
195     fn process(&mut self, dgram: Option<Datagram>) -> Output {
196         self.server.process(dgram, Instant::now())
197     }
199     fn process_events(&mut self) {
200         self.maybe_close_session();
201         self.maybe_create_wt_stream();
203         while let Some(event) = self.server.next_event() {
204             qtrace!("Event: {:?}", event);
205             match event {
206                 Http3ServerEvent::Headers {
207                     mut stream,
208                     headers,
209                     fin,
210                 } => {
211                     qtrace!("Headers (request={} fin={}): {:?}", stream, fin, headers);
213                     // Some responses do not have content-type. This is on purpose to exercise
214                     // UnknownDecoder code.
215                     let default_ret = b"Hello World".to_vec();
216                     let default_headers = vec![
217                         Header::new(":status", "200"),
218                         Header::new("cache-control", "no-cache"),
219                         Header::new("content-length", default_ret.len().to_string()),
220                         Header::new(
221                             "x-http3-conn-hash",
222                             self.current_connection_hash.to_string(),
223                         ),
224                     ];
226                     let path_hdr = headers.iter().find(|&h| h.name() == ":path");
227                     match path_hdr {
228                         Some(ph) if !ph.value().is_empty() => {
229                             let path = ph.value();
230                             qtrace!("Serve request {}", path);
231                             if path == "/Response421" {
232                                 let response_body = b"0123456789".to_vec();
233                                 stream
234                                     .send_headers(&[
235                                         Header::new(":status", "421"),
236                                         Header::new("cache-control", "no-cache"),
237                                         Header::new("content-type", "text/plain"),
238                                         Header::new(
239                                             "content-length",
240                                             response_body.len().to_string(),
241                                         ),
242                                     ])
243                                     .unwrap();
244                                 self.new_response(stream, response_body);
245                             } else if path == "/RequestCancelled" {
246                                 stream
247                                     .stream_stop_sending(Error::HttpRequestCancelled.code())
248                                     .unwrap();
249                                 stream
250                                     .stream_reset_send(Error::HttpRequestCancelled.code())
251                                     .unwrap();
252                             } else if path == "/VersionFallback" {
253                                 stream
254                                     .stream_stop_sending(Error::HttpVersionFallback.code())
255                                     .unwrap();
256                                 stream
257                                     .stream_reset_send(Error::HttpVersionFallback.code())
258                                     .unwrap();
259                             } else if path == "/EarlyResponse" {
260                                 stream
261                                     .stream_stop_sending(Error::HttpNoError.code())
262                                     .unwrap();
263                             } else if path == "/RequestRejected" {
264                                 stream
265                                     .stream_stop_sending(Error::HttpRequestRejected.code())
266                                     .unwrap();
267                                 stream
268                                     .stream_reset_send(Error::HttpRequestRejected.code())
269                                     .unwrap();
270                             } else if path == "/.well-known/http-opportunistic" {
271                                 let host_hdr = headers.iter().find(|&h| h.name() == ":authority");
272                                 match host_hdr {
273                                     Some(host) if !host.value().is_empty() => {
274                                         let mut content = b"[\"http://".to_vec();
275                                         content.extend(host.value().as_bytes());
276                                         content.extend(b"\"]".to_vec());
277                                         stream
278                                             .send_headers(&[
279                                                 Header::new(":status", "200"),
280                                                 Header::new("cache-control", "no-cache"),
281                                                 Header::new("content-type", "application/json"),
282                                                 Header::new(
283                                                     "content-length",
284                                                     content.len().to_string(),
285                                                 ),
286                                             ])
287                                             .unwrap();
288                                         self.new_response(stream, content);
289                                     }
290                                     _ => {
291                                         stream.send_headers(&default_headers).unwrap();
292                                         self.new_response(stream, default_ret);
293                                     }
294                                 }
295                             } else if path == "/no_body" {
296                                 stream
297                                     .send_headers(&[
298                                         Header::new(":status", "200"),
299                                         Header::new("cache-control", "no-cache"),
300                                     ])
301                                     .unwrap();
302                                 stream.stream_close_send().unwrap();
303                             } else if path == "/no_content_length" {
304                                 stream
305                                     .send_headers(&[
306                                         Header::new(":status", "200"),
307                                         Header::new("cache-control", "no-cache"),
308                                     ])
309                                     .unwrap();
310                                 self.new_response(stream, vec![b'a'; 4000]);
311                             } else if path == "/content_length_smaller" {
312                                 stream
313                                     .send_headers(&[
314                                         Header::new(":status", "200"),
315                                         Header::new("cache-control", "no-cache"),
316                                         Header::new("content-type", "text/plain"),
317                                         Header::new("content-length", 4000.to_string()),
318                                     ])
319                                     .unwrap();
320                                 self.new_response(stream, vec![b'a'; 8000]);
321                             } else if path == "/post" {
322                                 // Read all data before responding.
323                                 self.posts.insert(stream, 0);
324                             } else if path == "/priority_mirror" {
325                                 if let Some(priority) =
326                                     headers.iter().find(|h| h.name() == "priority")
327                                 {
328                                     stream
329                                         .send_headers(&[
330                                             Header::new(":status", "200"),
331                                             Header::new("cache-control", "no-cache"),
332                                             Header::new("content-type", "text/plain"),
333                                             Header::new("priority-mirror", priority.value()),
334                                             Header::new(
335                                                 "content-length",
336                                                 priority.value().len().to_string(),
337                                             ),
338                                         ])
339                                         .unwrap();
340                                     self.new_response(stream, priority.value().as_bytes().to_vec());
341                                 } else {
342                                     stream
343                                         .send_headers(&[
344                                             Header::new(":status", "200"),
345                                             Header::new("cache-control", "no-cache"),
346                                         ])
347                                         .unwrap();
348                                     stream.stream_close_send().unwrap();
349                                 }
350                             } else if path == "/103_response" {
351                                 if let Some(early_hint) =
352                                     headers.iter().find(|h| h.name() == "link-to-set")
353                                 {
354                                     for l in early_hint.value().split(',') {
355                                         stream
356                                             .send_headers(&[
357                                                 Header::new(":status", "103"),
358                                                 Header::new("link", l),
359                                             ])
360                                             .unwrap();
361                                     }
362                                 }
363                                 stream
364                                     .send_headers(&[
365                                         Header::new(":status", "200"),
366                                         Header::new("cache-control", "no-cache"),
367                                         Header::new("content-length", "0"),
368                                     ])
369                                     .unwrap();
370                                 stream.stream_close_send().unwrap();
371                             } else if path == "/get_webtransport_datagram" {
372                                 if let Some(vec_ref) = self.received_datagram.as_ref() {
373                                     stream
374                                         .send_headers(&[
375                                             Header::new(":status", "200"),
376                                             Header::new(
377                                                 "content-length",
378                                                 vec_ref.len().to_string(),
379                                             ),
380                                         ])
381                                         .unwrap();
382                                     self.new_response(stream, vec_ref.to_vec());
383                                     self.received_datagram = None;
384                                 } else {
385                                     stream
386                                         .send_headers(&[
387                                             Header::new(":status", "404"),
388                                             Header::new("cache-control", "no-cache"),
389                                         ])
390                                         .unwrap();
391                                     stream.stream_close_send().unwrap();
392                                 }
393                             } else {
394                                 match path.trim_matches(|p| p == '/').parse::<usize>() {
395                                     Ok(v) => {
396                                         stream
397                                             .send_headers(&[
398                                                 Header::new(":status", "200"),
399                                                 Header::new("cache-control", "no-cache"),
400                                                 Header::new("content-type", "text/plain"),
401                                                 Header::new("content-length", v.to_string()),
402                                             ])
403                                             .unwrap();
404                                         self.new_response(stream, vec![b'a'; v]);
405                                     }
406                                     Err(_) => {
407                                         stream.send_headers(&default_headers).unwrap();
408                                         self.new_response(stream, default_ret);
409                                     }
410                                 }
411                             }
412                         }
413                         _ => {
414                             stream.send_headers(&default_headers).unwrap();
415                             self.new_response(stream, default_ret);
416                         }
417                     }
418                 }
419                 Http3ServerEvent::Data {
420                     mut stream,
421                     data,
422                     fin,
423                 } => {
424                     // echo bidirectional input back to client
425                     if self.webtransport_bidi_stream.contains(&stream) {
426                         self.new_response(stream, data);
427                         break;
428                     }
430                     // echo unidirectional input to back to client
431                     // need to close or we hang
432                     if self.wt_unidi_echo_back.contains_key(&stream) {
433                         let mut echo_back = self.wt_unidi_echo_back.remove(&stream).unwrap();
434                         echo_back.send_data(&data).unwrap();
435                         echo_back.stream_close_send().unwrap();
436                         break;
437                     }
439                     if let Some(r) = self.posts.get_mut(&stream) {
440                         *r += data.len();
441                     }
442                     if fin {
443                         if let Some(r) = self.posts.remove(&stream) {
444                             let default_ret = b"Hello World".to_vec();
445                             stream
446                                 .send_headers(&[
447                                     Header::new(":status", "200"),
448                                     Header::new("cache-control", "no-cache"),
449                                     Header::new("x-data-received-length", r.to_string()),
450                                     Header::new("content-length", default_ret.len().to_string()),
451                                 ])
452                                 .unwrap();
453                             self.new_response(stream, default_ret);
454                         }
455                     }
456                 }
457                 Http3ServerEvent::DataWritable { stream } => self.handle_stream_writable(stream),
458                 Http3ServerEvent::StateChange { conn, state } => {
459                     if matches!(state, neqo_http3::Http3State::Connected) {
460                         let mut h = DefaultHasher::new();
461                         conn.hash(&mut h);
462                         self.current_connection_hash = h.finish();
463                     }
464                 }
465                 Http3ServerEvent::PriorityUpdate { .. } => {}
466                 Http3ServerEvent::StreamReset { stream, error } => {
467                     qtrace!("Http3ServerEvent::StreamReset {:?} {:?}", stream, error);
468                 }
469                 Http3ServerEvent::StreamStopSending { stream, error } => {
470                     qtrace!(
471                         "Http3ServerEvent::StreamStopSending {:?} {:?}",
472                         stream,
473                         error
474                     );
475                 }
476                 Http3ServerEvent::WebTransport(WebTransportServerEvent::NewSession {
477                     mut session,
478                     headers,
479                 }) => {
480                     qdebug!(
481                         "WebTransportServerEvent::NewSession {:?} {:?}",
482                         session,
483                         headers
484                     );
485                     let path_hdr = headers.iter().find(|&h| h.name() == ":path");
486                     match path_hdr {
487                         Some(ph) if !ph.value().is_empty() => {
488                             let path = ph.value();
489                             qtrace!("Serve request {}", path);
490                             if path == "/success" {
491                                 session
492                                     .response(&WebTransportSessionAcceptAction::Accept)
493                                     .unwrap();
494                             } else if path == "/redirect" {
495                                 session
496                                     .response(&WebTransportSessionAcceptAction::Reject(
497                                         [
498                                             Header::new(":status", "302"),
499                                             Header::new("location", "/"),
500                                         ]
501                                         .to_vec(),
502                                     ))
503                                     .unwrap();
504                             } else if path == "/reject" {
505                                 session
506                                     .response(&WebTransportSessionAcceptAction::Reject(
507                                         [Header::new(":status", "404")].to_vec(),
508                                     ))
509                                     .unwrap();
510                             } else if path == "/closeafter0ms" {
511                                 session
512                                     .response(&WebTransportSessionAcceptAction::Accept)
513                                     .unwrap();
514                                 let now = Instant::now();
515                                 if !self.sessions_to_close.contains_key(&now) {
516                                     self.sessions_to_close.insert(now, Vec::new());
517                                 }
518                                 self.sessions_to_close.get_mut(&now).unwrap().push(session);
519                             } else if path == "/closeafter100ms" {
520                                 session
521                                     .response(&WebTransportSessionAcceptAction::Accept)
522                                     .unwrap();
523                                 let expires = Instant::now() + Duration::from_millis(100);
524                                 if !self.sessions_to_close.contains_key(&expires) {
525                                     self.sessions_to_close.insert(expires, Vec::new());
526                                 }
527                                 self.sessions_to_close
528                                     .get_mut(&expires)
529                                     .unwrap()
530                                     .push(session);
531                             } else if path == "/create_unidi_stream" {
532                                 session
533                                     .response(&WebTransportSessionAcceptAction::Accept)
534                                     .unwrap();
535                                 self.sessions_to_create_stream.push((
536                                     session,
537                                     StreamType::UniDi,
538                                     false,
539                                 ));
540                             } else if path == "/create_unidi_stream_and_hello" {
541                                 session
542                                     .response(&WebTransportSessionAcceptAction::Accept)
543                                     .unwrap();
544                                 self.sessions_to_create_stream.push((
545                                     session,
546                                     StreamType::UniDi,
547                                     true,
548                                 ));
549                             } else if path == "/create_bidi_stream" {
550                                 session
551                                     .response(&WebTransportSessionAcceptAction::Accept)
552                                     .unwrap();
553                                 self.sessions_to_create_stream.push((
554                                     session,
555                                     StreamType::BiDi,
556                                     false,
557                                 ));
558                             } else if path == "/create_bidi_stream_and_hello" {
559                                 self.webtransport_bidi_stream.clear();
560                                 session
561                                     .response(&WebTransportSessionAcceptAction::Accept)
562                                     .unwrap();
563                                 self.sessions_to_create_stream.push((
564                                     session,
565                                     StreamType::BiDi,
566                                     true,
567                                 ));
568                             } else {
569                                 session
570                                     .response(&WebTransportSessionAcceptAction::Accept)
571                                     .unwrap();
572                             }
573                         }
574                         _ => {
575                             session
576                                 .response(&WebTransportSessionAcceptAction::Reject(
577                                     [Header::new(":status", "404")].to_vec(),
578                                 ))
579                                 .unwrap();
580                         }
581                     }
582                 }
583                 Http3ServerEvent::WebTransport(WebTransportServerEvent::SessionClosed {
584                     session,
585                     reason,
586                     headers: _,
587                 }) => {
588                     qdebug!(
589                         "WebTransportServerEvent::SessionClosed {:?} {:?}",
590                         session,
591                         reason
592                     );
593                 }
594                 Http3ServerEvent::WebTransport(WebTransportServerEvent::NewStream(stream)) => {
595                     // new stream could be from client-outgoing unidirectional
596                     // or bidirectional
597                     if !stream.stream_info.is_http() {
598                         if stream.stream_id().is_bidi() {
599                             self.webtransport_bidi_stream.insert(stream);
600                         } else {
601                             // Newly created stream happens on same connection
602                             // as the stream creation for client's incoming stream.
603                             // Link the streams with map for echo back
604                             if self.wt_unidi_conn_to_stream.contains_key(&stream.conn) {
605                                 let s = self.wt_unidi_conn_to_stream.remove(&stream.conn).unwrap();
606                                 self.wt_unidi_echo_back.insert(stream, s);
607                             }
608                         }
609                     }
610                 }
611                 Http3ServerEvent::WebTransport(WebTransportServerEvent::Datagram {
612                     session,
613                     datagram,
614                 }) => {
615                     qdebug!(
616                         "WebTransportServerEvent::Datagram {:?} {:?}",
617                         session,
618                         datagram
619                     );
620                     self.received_datagram = Some(datagram);
621                 }
622             }
623         }
624     }
626     fn get_timeout(&self) -> Option<Duration> {
627         if let Some(next) = self.sessions_to_close.keys().min() {
628             return Some(max(*next - Instant::now(), Duration::from_millis(0)));
629         }
630         None
631     }
634 impl HttpServer for Server {
635     fn process(&mut self, dgram: Option<Datagram>) -> Output {
636         self.process(dgram, Instant::now())
637     }
639     fn process_events(&mut self) {
640         let active_conns = self.active_connections();
641         for mut acr in active_conns {
642             loop {
643                 let event = match acr.borrow_mut().next_event() {
644                     None => break,
645                     Some(e) => e,
646                 };
647                 match event {
648                     ConnectionEvent::RecvStreamReadable { stream_id } => {
649                         if stream_id.is_bidi() && stream_id.is_client_initiated() {
650                             // We are only interesting in request streams
651                             acr.borrow_mut()
652                                 .stream_send(stream_id, HTTP_RESPONSE_WITH_WRONG_FRAME)
653                                 .expect("Read should succeed");
654                         }
655                     }
656                     _ => {}
657                 }
658             }
659         }
660     }
663 struct Http3ProxyServer {
664     server: Http3Server,
665     responses: HashMap<Http3OrWebTransportStream, Vec<u8>>,
666     server_port: i32,
667     request_header: HashMap<StreamId, Vec<Header>>,
668     request_body: HashMap<StreamId, Vec<u8>>,
669     #[cfg(not(target_os = "android"))]
670     stream_map: HashMap<StreamId, Http3OrWebTransportStream>,
671     #[cfg(not(target_os = "android"))]
672     response_to_send: HashMap<StreamId, Receiver<(Vec<Header>, Vec<u8>)>>,
675 impl ::std::fmt::Display for Http3ProxyServer {
676     fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
677         write!(f, "{}", self.server)
678     }
681 impl Http3ProxyServer {
682     pub fn new(server: Http3Server, server_port: i32) -> Self {
683         Self {
684             server,
685             responses: HashMap::new(),
686             server_port,
687             request_header: HashMap::new(),
688             request_body: HashMap::new(),
689             #[cfg(not(target_os = "android"))]
690             stream_map: HashMap::new(),
691             #[cfg(not(target_os = "android"))]
692             response_to_send: HashMap::new(),
693         }
694     }
696     #[cfg(not(target_os = "android"))]
697     fn new_response(&mut self, mut stream: Http3OrWebTransportStream, mut data: Vec<u8>) {
698         if data.len() == 0 {
699             let _ = stream.stream_close_send();
700             return;
701         }
702         match stream.send_data(&data) {
703             Ok(sent) => {
704                 if sent < data.len() {
705                     self.responses.insert(stream, data.split_off(sent));
706                 } else {
707                     stream.stream_close_send().unwrap();
708                 }
709             }
710             Err(e) => {
711                 eprintln!("error is {:?}", e);
712             }
713         }
714     }
716     fn handle_stream_writable(&mut self, mut stream: Http3OrWebTransportStream) {
717         if let Some(data) = self.responses.get_mut(&stream) {
718             match stream.send_data(&data) {
719                 Ok(sent) => {
720                     if sent < data.len() {
721                         let new_d = (*data).split_off(sent);
722                         *data = new_d;
723                     } else {
724                         stream.stream_close_send().unwrap();
725                         self.responses.remove(&stream);
726                     }
727                 }
728                 Err(_) => {
729                     eprintln!("Unexpected error");
730                 }
731             }
732         }
733     }
735     #[cfg(not(target_os = "android"))]
736     async fn fetch_url(
737         request: hyper::Request<Body>,
738         out_header: &mut Vec<Header>,
739         out_body: &mut Vec<u8>,
740     ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
741         let client = Client::new();
742         let mut resp = client.request(request).await?;
743         out_header.push(Header::new(":status", resp.status().as_str()));
744         for (key, value) in resp.headers() {
745             out_header.push(Header::new(
746                 key.as_str().to_ascii_lowercase(),
747                 match value.to_str() {
748                     Ok(str) => str,
749                     _ => "",
750                 },
751             ));
752         }
754         while let Some(chunk) = resp.body_mut().data().await {
755             match chunk {
756                 Ok(data) => {
757                     out_body.append(&mut data.to_vec());
758                 }
759                 _ => {}
760             }
761         }
763         Ok(())
764     }
766     #[cfg(not(target_os = "android"))]
767     fn fetch(
768         &mut self,
769         mut stream: Http3OrWebTransportStream,
770         request_headers: &Vec<Header>,
771         request_body: Vec<u8>,
772     ) {
773         let mut request: hyper::Request<Body> = Request::default();
774         let mut path = String::new();
775         for hdr in request_headers.iter() {
776             match hdr.name() {
777                 ":method" => {
778                     *request.method_mut() = Method::from_bytes(hdr.value().as_bytes()).unwrap();
779                 }
780                 ":scheme" => {}
781                 ":authority" => {
782                     request.headers_mut().insert(
783                         hyper::header::HOST,
784                         HeaderValue::from_str(hdr.value()).unwrap(),
785                     );
786                 }
787                 ":path" => {
788                     path = String::from(hdr.value());
789                 }
790                 _ => {
791                     if let Ok(hdr_name) = HeaderName::from_lowercase(hdr.name().as_bytes()) {
792                         request
793                             .headers_mut()
794                             .insert(hdr_name, HeaderValue::from_str(hdr.value()).unwrap());
795                     }
796                 }
797             }
798         }
799         *request.body_mut() = Body::from(request_body);
800         *request.uri_mut() =
801             match format!("http://127.0.0.1:{}{}", self.server_port.to_string(), path).parse() {
802                 Ok(uri) => uri,
803                 _ => {
804                     eprintln!("invalid uri: {}", path);
805                     stream
806                         .send_headers(&[
807                             Header::new(":status", "400"),
808                             Header::new("cache-control", "no-cache"),
809                             Header::new("content-length", "0"),
810                         ])
811                         .unwrap();
812                     return;
813                 }
814             };
815         qtrace!("request header: {:?}", request);
817         let (sender, receiver) = channel();
818         thread::spawn(move || {
819             let rt = tokio::runtime::Runtime::new().unwrap();
820             let mut h: Vec<Header> = Vec::new();
821             let mut data: Vec<u8> = Vec::new();
822             let _ = rt.block_on(Self::fetch_url(request, &mut h, &mut data));
823             qtrace!("response headers: {:?}", h);
824             qtrace!("res data: {:02X?}", data);
826             match sender.send((h, data)) {
827                 Ok(()) => {}
828                 _ => {
829                     eprintln!("sender.send failed");
830                 }
831             }
832         });
834         self.response_to_send.insert(stream.stream_id(), receiver);
835         self.stream_map.insert(stream.stream_id(), stream);
836     }
838     #[cfg(target_os = "android")]
839     fn fetch(
840         &mut self,
841         mut _stream: Http3OrWebTransportStream,
842         _request_headers: &Vec<Header>,
843         _request_body: Vec<u8>,
844     ) {
845         // do nothing
846     }
848     #[cfg(not(target_os = "android"))]
849     fn maybe_process_response(&mut self) {
850         let mut data_to_send = HashMap::new();
851         self.response_to_send
852             .retain(|id, receiver| match receiver.try_recv() {
853                 Ok((headers, body)) => {
854                     data_to_send.insert(*id, (headers.clone(), body.clone()));
855                     false
856                 }
857                 Err(TryRecvError::Empty) => true,
858                 Err(TryRecvError::Disconnected) => false,
859             });
860         while let Some(id) = data_to_send.keys().next().cloned() {
861             let mut stream = self.stream_map.remove(&id).unwrap();
862             let (header, data) = data_to_send.remove(&id).unwrap();
863             qtrace!("response headers: {:?}", header);
864             match stream.send_headers(&header) {
865                 Ok(()) => {
866                     self.new_response(stream, data);
867                 }
868                 _ => {}
869             }
870         }
871     }
874 impl HttpServer for Http3ProxyServer {
875     fn process(&mut self, dgram: Option<Datagram>) -> Output {
876         self.server.process(dgram, Instant::now())
877     }
879     fn process_events(&mut self) {
880         #[cfg(not(target_os = "android"))]
881         self.maybe_process_response();
882         while let Some(event) = self.server.next_event() {
883             qtrace!("Event: {:?}", event);
884             match event {
885                 Http3ServerEvent::Headers {
886                     mut stream,
887                     headers,
888                     fin: _,
889                 } => {
890                     qtrace!("Headers {:?}", headers);
891                     if self.server_port != -1 {
892                         let method_hdr = headers.iter().find(|&h| h.name() == ":method");
893                         match method_hdr {
894                             Some(method) => match method.value() {
895                                 "POST" => {
896                                     let content_length =
897                                         headers.iter().find(|&h| h.name() == "content-length");
898                                     if let Some(length_str) = content_length {
899                                         if let Ok(len) = length_str.value().parse::<u32>() {
900                                             if len > 0 {
901                                                 self.request_header
902                                                     .insert(stream.stream_id(), headers);
903                                                 self.request_body
904                                                     .insert(stream.stream_id(), Vec::new());
905                                             } else {
906                                                 self.fetch(stream, &headers, b"".to_vec());
907                                             }
908                                         }
909                                     }
910                                 }
911                                 _ => {
912                                     self.fetch(stream, &headers, b"".to_vec());
913                                 }
914                             },
915                             _ => {}
916                         }
917                     } else {
918                         let path_hdr = headers.iter().find(|&h| h.name() == ":path");
919                         match path_hdr {
920                             Some(ph) if !ph.value().is_empty() => {
921                                 let path = ph.value();
922                                 match &path[..6] {
923                                     "/port?" => {
924                                         let port = path[6..].parse::<i32>();
925                                         if let Ok(port) = port {
926                                             qtrace!("got port {}", port);
927                                             self.server_port = port;
928                                         }
929                                     }
930                                     _ => {}
931                                 }
932                             }
933                             _ => {}
934                         }
935                         stream
936                             .send_headers(&[
937                                 Header::new(":status", "200"),
938                                 Header::new("cache-control", "no-cache"),
939                                 Header::new("content-length", "0"),
940                             ])
941                             .unwrap();
942                     }
943                 }
944                 Http3ServerEvent::Data {
945                     stream,
946                     mut data,
947                     fin,
948                 } => {
949                     if let Some(d) = self.request_body.get_mut(&stream.stream_id()) {
950                         d.append(&mut data);
951                     }
952                     if fin {
953                         if let Some(d) = self.request_body.remove(&stream.stream_id()) {
954                             let headers = self.request_header.remove(&stream.stream_id()).unwrap();
955                             self.fetch(stream, &headers, d);
956                         }
957                     }
958                 }
959                 Http3ServerEvent::DataWritable { stream } => self.handle_stream_writable(stream),
960                 Http3ServerEvent::StateChange { .. } | Http3ServerEvent::PriorityUpdate { .. } => {}
961                 Http3ServerEvent::StreamReset { stream, error } => {
962                     qtrace!("Http3ServerEvent::StreamReset {:?} {:?}", stream, error);
963                 }
964                 Http3ServerEvent::StreamStopSending { stream, error } => {
965                     qtrace!(
966                         "Http3ServerEvent::StreamStopSending {:?} {:?}",
967                         stream,
968                         error
969                     );
970                 }
971                 Http3ServerEvent::WebTransport(_) => {}
972             }
973         }
974     }
977 #[derive(Default)]
978 struct NonRespondingServer {}
980 impl ::std::fmt::Display for NonRespondingServer {
981     fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
982         write!(f, "NonRespondingServer")
983     }
986 impl HttpServer for NonRespondingServer {
987     fn process(&mut self, _dgram: Option<Datagram>) -> Output {
988         Output::None
989     }
991     fn process_events(&mut self) {}
994 fn emit_packet(socket: &UdpSocket, out_dgram: Datagram) {
995     let res = match socket.send_to(&out_dgram, &out_dgram.destination()) {
996         Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => 0,
997         Err(err) => {
998             eprintln!("UDP send error: {:?}", err);
999             exit(1);
1000         }
1001         Ok(res) => res,
1002     };
1003     if res != out_dgram.len() {
1004         qinfo!("Unable to send all {} bytes of datagram", out_dgram.len());
1005     }
1008 fn process(
1009     server: &mut dyn HttpServer,
1010     svr_timeout: &mut Option<Timeout>,
1011     inx: usize,
1012     dgram: Option<Datagram>,
1013     timer: &mut Timer<usize>,
1014     socket: &mut UdpSocket,
1015 ) -> bool {
1016     match server.process(dgram) {
1017         Output::Datagram(dgram) => {
1018             emit_packet(socket, dgram);
1019             true
1020         }
1021         Output::Callback(mut new_timeout) => {
1022             if let Some(t) = server.get_timeout() {
1023                 new_timeout = min(new_timeout, t);
1024             }
1025             if let Some(svr_timeout) = svr_timeout {
1026                 timer.cancel_timeout(svr_timeout);
1027             }
1029             qinfo!("Setting timeout of {:?} for {}", new_timeout, server);
1030             if new_timeout > Duration::from_secs(1) {
1031                 new_timeout = Duration::from_secs(1);
1032             }
1033             *svr_timeout = Some(timer.set_timeout(new_timeout, inx));
1034             false
1035         }
1036         Output::None => {
1037             qdebug!("Output::None");
1038             false
1039         }
1040     }
1043 fn read_dgram(
1044     socket: &mut UdpSocket,
1045     local_address: &SocketAddr,
1046 ) -> Result<Option<Datagram>, io::Error> {
1047     let buf = &mut [0u8; 2048];
1048     let res = socket.recv_from(&mut buf[..]);
1049     if let Some(err) = res.as_ref().err() {
1050         if err.kind() != io::ErrorKind::WouldBlock {
1051             eprintln!("UDP recv error: {:?}", err);
1052         }
1053         return Ok(None);
1054     };
1056     let (sz, remote_addr) = res.unwrap();
1057     if sz == buf.len() {
1058         eprintln!("Might have received more than {} bytes", buf.len());
1059     }
1061     if sz == 0 {
1062         eprintln!("zero length datagram received?");
1063         Ok(None)
1064     } else {
1065         Ok(Some(Datagram::new(remote_addr, *local_address, &buf[..sz])))
1066     }
1069 enum ServerType {
1070     Http3,
1071     Http3Fail,
1072     Http3NoResponse,
1073     Http3Ech,
1074     Http3Proxy,
1077 struct ServersRunner {
1078     hosts: Vec<SocketAddr>,
1079     poll: Poll,
1080     sockets: Vec<UdpSocket>,
1081     servers: HashMap<SocketAddr, (Box<dyn HttpServer>, Option<Timeout>)>,
1082     timer: Timer<usize>,
1083     active_servers: HashSet<usize>,
1084     ech_config: Vec<u8>,
1087 impl ServersRunner {
1088     pub fn new() -> Result<Self, io::Error> {
1089         Ok(Self {
1090             hosts: Vec::new(),
1091             poll: Poll::new()?,
1092             sockets: Vec::new(),
1093             servers: HashMap::new(),
1094             timer: Builder::default()
1095                 .tick_duration(Duration::from_millis(1))
1096                 .build::<usize>(),
1097             active_servers: HashSet::new(),
1098             ech_config: Vec::new(),
1099         })
1100     }
1102     pub fn init(&mut self) {
1103         self.add_new_socket(0, ServerType::Http3, 0);
1104         self.add_new_socket(1, ServerType::Http3Fail, 0);
1105         self.add_new_socket(2, ServerType::Http3Ech, 0);
1107         let proxy_port = match env::var("MOZ_HTTP3_PROXY_PORT") {
1108             Ok(val) => val.parse::<u16>().unwrap(),
1109             _ => 0,
1110         };
1111         self.add_new_socket(3, ServerType::Http3Proxy, proxy_port);
1112         self.add_new_socket(5, ServerType::Http3NoResponse, 0);
1114         println!(
1115             "HTTP3 server listening on ports {}, {}, {}, {} and {}. EchConfig is @{}@",
1116             self.hosts[0].port(),
1117             self.hosts[1].port(),
1118             self.hosts[2].port(),
1119             self.hosts[3].port(),
1120             self.hosts[4].port(),
1121             BASE64_STANDARD.encode(&self.ech_config)
1122         );
1123         self.poll
1124             .register(&self.timer, TIMER_TOKEN, Ready::readable(), PollOpt::edge())
1125             .unwrap();
1126     }
1128     fn add_new_socket(&mut self, count: usize, server_type: ServerType, port: u16) -> u16 {
1129         let addr = format!("127.0.0.1:{}", port).parse().unwrap();
1131         let socket = match UdpSocket::bind(&addr) {
1132             Err(err) => {
1133                 eprintln!("Unable to bind UDP socket: {}", err);
1134                 exit(1)
1135             }
1136             Ok(s) => s,
1137         };
1139         let local_addr = match socket.local_addr() {
1140             Err(err) => {
1141                 eprintln!("Socket local address not bound: {}", err);
1142                 exit(1)
1143             }
1144             Ok(s) => s,
1145         };
1147         self.hosts.push(local_addr);
1149         self.poll
1150             .register(
1151                 &socket,
1152                 Token(count),
1153                 Ready::readable() | Ready::writable(),
1154                 PollOpt::edge(),
1155             )
1156             .unwrap();
1158         self.sockets.push(socket);
1159         let server = self.create_server(server_type);
1160         self.servers.insert(local_addr, (server, None));
1161         local_addr.port()
1162     }
1164     fn create_server(&mut self, server_type: ServerType) -> Box<dyn HttpServer> {
1165         let anti_replay = AntiReplay::new(Instant::now(), Duration::from_secs(10), 7, 14)
1166             .expect("unable to setup anti-replay");
1167         let cid_mgr = Rc::new(RefCell::new(RandomConnectionIdGenerator::new(10)));
1169         match server_type {
1170             ServerType::Http3 => Box::new(Http3TestServer::new(
1171                 Http3Server::new(
1172                     Instant::now(),
1173                     &[" HTTP2 Test Cert"],
1174                     PROTOCOLS,
1175                     anti_replay,
1176                     cid_mgr,
1177                     Http3Parameters::default()
1178                         .max_table_size_encoder(MAX_TABLE_SIZE)
1179                         .max_table_size_decoder(MAX_TABLE_SIZE)
1180                         .max_blocked_streams(MAX_BLOCKED_STREAMS)
1181                         .webtransport(true)
1182                         .connection_parameters(ConnectionParameters::default().datagram_size(1200)),
1183                     None,
1184                 )
1185                 .expect("We cannot make a server!"),
1186             )),
1187             ServerType::Http3Fail => Box::new(
1188                 Server::new(
1189                     Instant::now(),
1190                     &[" HTTP2 Test Cert"],
1191                     PROTOCOLS,
1192                     anti_replay,
1193                     Box::new(AllowZeroRtt {}),
1194                     cid_mgr,
1195                     ConnectionParameters::default(),
1196                 )
1197                 .expect("We cannot make a server!"),
1198             ),
1199             ServerType::Http3NoResponse => Box::new(NonRespondingServer::default()),
1200             ServerType::Http3Ech => {
1201                 let mut server = Box::new(Http3TestServer::new(
1202                     Http3Server::new(
1203                         Instant::now(),
1204                         &[" HTTP2 Test Cert"],
1205                         PROTOCOLS,
1206                         anti_replay,
1207                         cid_mgr,
1208                         Http3Parameters::default()
1209                             .max_table_size_encoder(MAX_TABLE_SIZE)
1210                             .max_table_size_decoder(MAX_TABLE_SIZE)
1211                             .max_blocked_streams(MAX_BLOCKED_STREAMS),
1212                         None,
1213                     )
1214                     .expect("We cannot make a server!"),
1215                 ));
1216                 let ref mut unboxed_server = (*server).server;
1217                 let (sk, pk) = generate_ech_keys().unwrap();
1218                 unboxed_server
1219                     .enable_ech(ECH_CONFIG_ID, ECH_PUBLIC_NAME, &sk, &pk)
1220                     .expect("unable to enable ech");
1221                 self.ech_config = Vec::from(unboxed_server.ech_config());
1222                 server
1223             }
1224             ServerType::Http3Proxy => {
1225                 let server_config = if env::var("MOZ_HTTP3_MOCHITEST").is_ok() {
1226                     ("mochitest-cert", 8888)
1227                 } else {
1228                     (" HTTP2 Test Cert", -1)
1229                 };
1230                 let server = Box::new(Http3ProxyServer::new(
1231                     Http3Server::new(
1232                         Instant::now(),
1233                         &[server_config.0],
1234                         PROTOCOLS,
1235                         anti_replay,
1236                         cid_mgr,
1237                         Http3Parameters::default()
1238                             .max_table_size_encoder(MAX_TABLE_SIZE)
1239                             .max_table_size_decoder(MAX_TABLE_SIZE)
1240                             .max_blocked_streams(MAX_BLOCKED_STREAMS)
1241                             .webtransport(true)
1242                             .connection_parameters(
1243                                 ConnectionParameters::default().datagram_size(1200),
1244                             ),
1245                         None,
1246                     )
1247                     .expect("We cannot make a server!"),
1248                     server_config.1,
1249                 ));
1250                 server
1251             }
1252         }
1253     }
1255     fn process_datagrams_and_events(
1256         &mut self,
1257         inx: usize,
1258         read_socket: bool,
1259     ) -> Result<(), io::Error> {
1260         if let Some(socket) = self.sockets.get_mut(inx) {
1261             if let Some((ref mut server, svr_timeout)) =
1262                 self.servers.get_mut(&socket.local_addr().unwrap())
1263             {
1264                 if read_socket {
1265                     loop {
1266                         let dgram = read_dgram(socket, &self.hosts[inx])?;
1267                         if dgram.is_none() {
1268                             break;
1269                         }
1270                         let _ = process(
1271                             &mut **server,
1272                             svr_timeout,
1273                             inx,
1274                             dgram,
1275                             &mut self.timer,
1276                             socket,
1277                         );
1278                     }
1279                 } else {
1280                     let _ = process(
1281                         &mut **server,
1282                         svr_timeout,
1283                         inx,
1284                         None,
1285                         &mut self.timer,
1286                         socket,
1287                     );
1288                 }
1289                 server.process_events();
1290                 if process(
1291                     &mut **server,
1292                     svr_timeout,
1293                     inx,
1294                     None,
1295                     &mut self.timer,
1296                     socket,
1297                 ) {
1298                     self.active_servers.insert(inx);
1299                 }
1300             }
1301         }
1302         Ok(())
1303     }
1305     fn process_active_conns(&mut self) -> Result<(), io::Error> {
1306         let curr_active = mem::take(&mut self.active_servers);
1307         for inx in curr_active {
1308             self.process_datagrams_and_events(inx, false)?;
1309         }
1310         Ok(())
1311     }
1313     fn process_timeout(&mut self) -> Result<(), io::Error> {
1314         while let Some(inx) = self.timer.poll() {
1315             qinfo!("Timer expired for {:?}", inx);
1316             self.process_datagrams_and_events(inx, false)?;
1317         }
1318         Ok(())
1319     }
1321     pub fn run(&mut self) -> Result<(), io::Error> {
1322         let mut events = Events::with_capacity(1024);
1323         loop {
1324             // If there are active servers do not block in poll.
1325             self.poll.poll(
1326                 &mut events,
1327                 if self.active_servers.is_empty() {
1328                     None
1329                 } else {
1330                     Some(Duration::from_millis(0))
1331                 },
1332             )?;
1334             for event in &events {
1335                 if event.token() == TIMER_TOKEN {
1336                     self.process_timeout()?;
1337                 } else {
1338                     self.process_datagrams_and_events(
1339                         event.token().0,
1340                         event.readiness().is_readable(),
1341                     )?;
1342                 }
1343             }
1344             self.process_active_conns()?;
1345         }
1346     }
1349 fn main() -> Result<(), io::Error> {
1350     let args: Vec<String> = env::args().collect();
1351     if args.len() < 2 {
1352         eprintln!("Wrong arguments.");
1353         exit(1)
1354     }
1356     // Read data from stdin and terminate the server if EOF is detected, which
1357     // means that runxpcshelltests.py ended without shutting down the server.
1358     thread::spawn(|| loop {
1359         let mut buffer = String::new();
1360         match io::stdin().read_line(&mut buffer) {
1361             Ok(n) => {
1362                 if n == 0 {
1363                     exit(0);
1364                 }
1365             }
1366             Err(_) => {
1367                 exit(0);
1368             }
1369         }
1370     });
1372     init_db(PathBuf::from(args[1].clone()));
1374     let mut servers_runner = ServersRunner::new()?;
1375     servers_runner.init();
1376     servers_runner.run()