1 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
2 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
3 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
4 // option. This file may not be copied, modified, or distributed
5 // except according to those terms.
9 use base64::prelude::*;
10 use neqo_common::{event::Provider, qdebug, qinfo, qtrace, Datagram, Header};
11 use neqo_crypto::{generate_ech_keys, init_db, AllowZeroRtt, AntiReplay};
13 Error, Http3OrWebTransportStream, Http3Parameters, Http3Server, Http3ServerEvent,
14 WebTransportRequest, WebTransportServerEvent, WebTransportSessionAcceptAction,
16 use neqo_transport::server::{ActiveConnectionRef, Server};
18 ConnectionEvent, ConnectionParameters, Output, RandomConnectionIdGenerator, StreamId,
23 use std::cell::RefCell;
25 use std::path::PathBuf;
26 use std::process::exit;
29 use std::time::{Duration, Instant};
32 use core::fmt::Display;
35 if #[cfg(not(target_os = "android"))] {
36 use std::sync::mpsc::{channel, Receiver, TryRecvError};
37 use hyper::body::HttpBody;
38 use hyper::header::{HeaderName, HeaderValue};
39 use hyper::{Body, Client, Method, Request};
43 use mio::net::UdpSocket;
44 use mio::{Events, Poll, PollOpt, Ready, Token};
45 use mio_extras::timer::{Builder, Timeout, Timer};
46 use std::cmp::{max, min};
47 use std::collections::hash_map::DefaultHasher;
48 use std::collections::HashMap;
49 use std::collections::HashSet;
50 use std::hash::{Hash, Hasher};
52 use std::net::SocketAddr;
// Settings and fixtures shared by the test servers defined in this file.
// Table size handed to both `max_table_size_encoder` and `max_table_size_decoder`.
54 const MAX_TABLE_SIZE: u64 = 65536;
// Value handed to `Http3Parameters::max_blocked_streams`.
55 const MAX_BLOCKED_STREAMS: u16 = 10;
// NOTE(review): presumably the protocol list given to the server constructors; the
// call sites are on lines not visible in this extract -- confirm.
56 const PROTOCOLS: &[&str] = &["h3-29", "h3"];
// Poll token the shared timer is registered under; `run` treats events with this
// token as timer expirations.
57 const TIMER_TOKEN: Token = Token(0xffff);
// Config id and public name passed to `enable_ech` for the ECH-enabled server.
58 const ECH_CONFIG_ID: u8 = 7;
59 const ECH_PUBLIC_NAME: &str = "public.example";
// Raw bytes that the `HttpServer` impl for the plain transport `Server` writes to
// client-initiated bidirectional streams.
61 const HTTP_RESPONSE_WITH_WRONG_FRAME: &[u8] = &[
62 0x01, 0x06, 0x00, 0x00, 0xd9, 0x54, 0x01, 0x37, // headers
63 0x0, 0x3, 0x61, 0x62, 0x63, // the first data frame
64 0x3, 0x1, 0x5, // a cancel push frame that is not allowed
// Interface through which the event loop drives every kind of test server.
// Implementors must also be `Display` (the timeout log line prints the server).
67 trait HttpServer: Display {
// Hands an optional incoming datagram to the server and returns its `Output`.
68 fn process(&mut self, dgram: Option<Datagram>) -> Output;
// Handles the events the server has queued up.
69 fn process_events(&mut self);
// Extra deadline a server may request; the event loop takes the minimum of this
// and the callback timeout. The default body is on lines not visible here.
70 fn get_timeout(&self) -> Option<Duration> {
// State of the main HTTP/3 + WebTransport test server.
// (The wrapped `server` field is declared on a line not visible in this extract.)
75 struct Http3TestServer {
77 // This is a map from a post request to the amount of data that has been received on the request.
78 // The response will carry the amount of data received.
79 posts: HashMap<Http3OrWebTransportStream, usize>,
// Unsent tail of a response body, keyed by stream; looked up again in
// `handle_stream_writable`.
80 responses: HashMap<Http3OrWebTransportStream, Vec<u8>>,
// Updated from a `DefaultHasher` when a connection reaches the `Connected` state;
// its string form is included in the default response headers.
81 current_connection_hash: u64,
// WebTransport sessions to close, keyed by the instant at which they expire.
82 sessions_to_close: HashMap<Instant, Vec<WebTransportRequest>>,
// Pending server-initiated WebTransport stream creations, consumed one per call
// by `maybe_create_wt_stream`.
83 sessions_to_create_stream: Vec<(WebTransportRequest, StreamType, bool)>,
// Streams whose incoming data is echoed back via `new_response`.
84 webtransport_bidi_stream: HashSet<Http3OrWebTransportStream>,
// Server-created stream remembered per connection until a new client stream
// arrives on that connection.
85 wt_unidi_conn_to_stream: HashMap<ActiveConnectionRef, Http3OrWebTransportStream>,
// Maps an incoming stream to the server stream its data is echoed on.
86 wt_unidi_echo_back: HashMap<Http3OrWebTransportStream, Http3OrWebTransportStream>,
// Last WebTransport datagram received; served and cleared by
// "/get_webtransport_datagram".
87 received_datagram: Option<Vec<u8>>,
// Displays the test server by delegating to the wrapped `server` field.
90 impl ::std::fmt::Display for Http3TestServer {
91 fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
92 write!(f, "{}", self.server)
// Inherent helpers of the test server: construction, response sending and the
// deferred WebTransport work executed at the start of `process_events`.
96 impl Http3TestServer {
// Wraps `server`; every bookkeeping collection starts out empty.
97 pub fn new(server: Http3Server) -> Self {
100 posts: HashMap::new(),
101 responses: HashMap::new(),
102 current_connection_hash: 0,
103 sessions_to_close: HashMap::new(),
104 sessions_to_create_stream: Vec::new(),
105 webtransport_bidi_stream: HashSet::new(),
106 wt_unidi_conn_to_stream: HashMap::new(),
107 wt_unidi_echo_back: HashMap::new(),
108 received_datagram: None,
// Writes `data` to `stream`. When fewer than `data.len()` bytes are accepted, the
// unsent tail is parked in `responses`. The send side is closed via
// `stream_close_send`; a send error is only printed. The guards around the
// individual statements are on lines not visible in this extract.
112 fn new_response(&mut self, mut stream: Http3OrWebTransportStream, mut data: Vec<u8>) {
// Result deliberately ignored here (best effort).
114 let _ = stream.stream_close_send();
117 match stream.send_data(&data) {
119 if sent < data.len() {
120 self.responses.insert(stream, data.split_off(sent));
122 stream.stream_close_send().unwrap();
126 eprintln!("error is {:?}", e);
// Continues a parked response: sends the buffered bytes for `stream`, keeps the
// remainder if the write was partial, otherwise closes the send side and drops
// the entry.
131 fn handle_stream_writable(&mut self, mut stream: Http3OrWebTransportStream) {
132 if let Some(data) = self.responses.get_mut(&stream) {
133 match stream.send_data(&data) {
135 if sent < data.len() {
136 let new_d = (*data).split_off(sent);
139 stream.stream_close_send().unwrap();
140 self.responses.remove(&stream);
144 eprintln!("Unexpected error");
// Calls `close_session(0, "")` on scheduled sessions and then prunes the map.
// NOTE(review): the expiry guard sits on an elided line. If it closes entries with
// `expires <= now`, `retain(.. >= now)` keeps an entry whose key equals `now`
// after it was closed -- confirm whether `> now` was intended.
150 fn maybe_close_session(&mut self) {
151 let now = Instant::now();
152 for (expires, sessions) in self.sessions_to_close.iter_mut() {
154 for s in sessions.iter_mut() {
// The result of closing is intentionally discarded.
155 mem::drop(s.close_session(0, ""));
159 self.sessions_to_close.retain(|expires, _| *expires >= now);
// Handles at most one pending stream-creation request per call: pops the most
// recently queued entry and creates a stream of the requested type on its session.
162 fn maybe_create_wt_stream(&mut self) {
163 if self.sessions_to_create_stream.is_empty() {
166 let tuple = self.sessions_to_create_stream.pop().unwrap();
167 let mut session = tuple.0;
168 let mut wt_server_stream = session.create_stream(tuple.1).unwrap();
169 if tuple.1 == StreamType::UniDi {
171 wt_server_stream.send_data(b"qwerty").unwrap();
172 wt_server_stream.stream_close_send().unwrap();
174 // relaying Http3ServerEvent::Data to uni streams
175 // slows down netwerk/test/unit/test_webtransport_simple.js
176 // to the point of failure. Only do so when necessary.
177 self.wt_unidi_conn_to_stream
178 .insert(wt_server_stream.conn.clone(), wt_server_stream);
182 wt_server_stream.send_data(b"asdfg").unwrap();
183 wt_server_stream.stream_close_send().unwrap();
185 .stream_stop_sending(Error::HttpNoError.code())
// Registered so that later data on this stream is echoed back.
188 self.webtransport_bidi_stream.insert(wt_server_stream);
// Event handling of the main test server. Request paths select canned behaviors
// (status codes, resets, body sizes, WebTransport actions) exercised by tests.
194 impl HttpServer for Http3TestServer {
// Forwards the datagram to the wrapped server, stamped with the current time.
195 fn process(&mut self, dgram: Option<Datagram>) -> Output {
196 self.server.process(dgram, Instant::now())
// Runs the deferred WebTransport work first, then drains the server's event queue.
199 fn process_events(&mut self) {
200 self.maybe_close_session();
201 self.maybe_create_wt_stream();
203 while let Some(event) = self.server.next_event() {
204 qtrace!("Event: {:?}", event);
// Request headers arrived: dispatch on the ":path" header.
206 Http3ServerEvent::Headers {
211 qtrace!("Headers (request={} fin={}): {:?}", stream, fin, headers);
213 // Some responses do not have content-type. This is on purpose to exercise
214 // UnknownDecoder code.
215 let default_ret = b"Hello World".to_vec();
216 let default_headers = vec![
217 Header::new(":status", "200"),
218 Header::new("cache-control", "no-cache"),
219 Header::new("content-length", default_ret.len().to_string()),
222 self.current_connection_hash.to_string(),
226 let path_hdr = headers.iter().find(|&h| h.name() == ":path");
228 Some(ph) if !ph.value().is_empty() => {
229 let path = ph.value();
230 qtrace!("Serve request {}", path);
231 if path == "/Response421" {
232 let response_body = b"0123456789".to_vec();
235 Header::new(":status", "421"),
236 Header::new("cache-control", "no-cache"),
237 Header::new("content-type", "text/plain"),
240 response_body.len().to_string(),
244 self.new_response(stream, response_body);
245 } else if path == "/RequestCancelled" {
247 .stream_stop_sending(Error::HttpRequestCancelled.code())
250 .stream_reset_send(Error::HttpRequestCancelled.code())
252 } else if path == "/VersionFallback" {
254 .stream_stop_sending(Error::HttpVersionFallback.code())
257 .stream_reset_send(Error::HttpVersionFallback.code())
259 } else if path == "/EarlyResponse" {
261 .stream_stop_sending(Error::HttpNoError.code())
263 } else if path == "/RequestRejected" {
265 .stream_stop_sending(Error::HttpRequestRejected.code())
268 .stream_reset_send(Error::HttpRequestRejected.code())
270 } else if path == "/.well-known/http-opportunistic" {
271 let host_hdr = headers.iter().find(|&h| h.name() == ":authority");
273 Some(host) if !host.value().is_empty() => {
// Body is the bytes `["http://<authority>"]`.
274 let mut content = b"[\"http://".to_vec();
275 content.extend(host.value().as_bytes());
276 content.extend(b"\"]".to_vec());
279 Header::new(":status", "200"),
280 Header::new("cache-control", "no-cache"),
281 Header::new("content-type", "application/json"),
284 content.len().to_string(),
288 self.new_response(stream, content);
291 stream.send_headers(&default_headers).unwrap();
292 self.new_response(stream, default_ret);
295 } else if path == "/no_body" {
298 Header::new(":status", "200"),
299 Header::new("cache-control", "no-cache"),
302 stream.stream_close_send().unwrap();
303 } else if path == "/no_content_length" {
306 Header::new(":status", "200"),
307 Header::new("cache-control", "no-cache"),
310 self.new_response(stream, vec![b'a'; 4000]);
311 } else if path == "/content_length_smaller" {
314 Header::new(":status", "200"),
315 Header::new("cache-control", "no-cache"),
316 Header::new("content-type", "text/plain"),
// Declared length (4000) is deliberately smaller than the 8000-byte body sent below.
317 Header::new("content-length", 4000.to_string()),
320 self.new_response(stream, vec![b'a'; 8000]);
321 } else if path == "/post" {
322 // Read all data before responding.
323 self.posts.insert(stream, 0);
324 } else if path == "/priority_mirror" {
325 if let Some(priority) =
326 headers.iter().find(|h| h.name() == "priority")
330 Header::new(":status", "200"),
331 Header::new("cache-control", "no-cache"),
332 Header::new("content-type", "text/plain"),
333 Header::new("priority-mirror", priority.value()),
336 priority.value().len().to_string(),
340 self.new_response(stream, priority.value().as_bytes().to_vec());
344 Header::new(":status", "200"),
345 Header::new("cache-control", "no-cache"),
348 stream.stream_close_send().unwrap();
350 } else if path == "/103_response" {
351 if let Some(early_hint) =
352 headers.iter().find(|h| h.name() == "link-to-set")
// One 103 header block per comma-separated value of "link-to-set".
354 for l in early_hint.value().split(',') {
357 Header::new(":status", "103"),
358 Header::new("link", l),
365 Header::new(":status", "200"),
366 Header::new("cache-control", "no-cache"),
367 Header::new("content-length", "0"),
370 stream.stream_close_send().unwrap();
371 } else if path == "/get_webtransport_datagram" {
372 if let Some(vec_ref) = self.received_datagram.as_ref() {
375 Header::new(":status", "200"),
378 vec_ref.len().to_string(),
382 self.new_response(stream, vec_ref.to_vec());
// The stored datagram is served only once.
383 self.received_datagram = None;
387 Header::new(":status", "404"),
388 Header::new("cache-control", "no-cache"),
391 stream.stream_close_send().unwrap();
// Any other path: a numeric path (slashes trimmed) requests a body of that many
// 'a' bytes.
394 match path.trim_matches(|p| p == '/').parse::<usize>() {
398 Header::new(":status", "200"),
399 Header::new("cache-control", "no-cache"),
400 Header::new("content-type", "text/plain"),
401 Header::new("content-length", v.to_string()),
404 self.new_response(stream, vec![b'a'; v]);
407 stream.send_headers(&default_headers).unwrap();
408 self.new_response(stream, default_ret);
414 stream.send_headers(&default_headers).unwrap();
415 self.new_response(stream, default_ret);
// Body data arrived: echo it for WebTransport streams, or account for it on a
// pending "/post" request.
419 Http3ServerEvent::Data {
424 // echo bidirectional input back to client
425 if self.webtransport_bidi_stream.contains(&stream) {
426 self.new_response(stream, data);
430 // echo unidirectional input back to client
431 // need to close or we hang
432 if self.wt_unidi_echo_back.contains_key(&stream) {
433 let mut echo_back = self.wt_unidi_echo_back.remove(&stream).unwrap();
434 echo_back.send_data(&data).unwrap();
435 echo_back.stream_close_send().unwrap();
439 if let Some(r) = self.posts.get_mut(&stream) {
443 if let Some(r) = self.posts.remove(&stream) {
444 let default_ret = b"Hello World".to_vec();
447 Header::new(":status", "200"),
448 Header::new("cache-control", "no-cache"),
// Reports the value stored in `posts` for this stream back to the client.
449 Header::new("x-data-received-length", r.to_string()),
450 Header::new("content-length", default_ret.len().to_string()),
453 self.new_response(stream, default_ret);
457 Http3ServerEvent::DataWritable { stream } => self.handle_stream_writable(stream),
458 Http3ServerEvent::StateChange { conn, state } => {
// On `Connected`, a fresh hash is stored as `current_connection_hash` (what is
// fed to the hasher is on an elided line).
459 if matches!(state, neqo_http3::Http3State::Connected) {
460 let mut h = DefaultHasher::new();
462 self.current_connection_hash = h.finish();
465 Http3ServerEvent::PriorityUpdate { .. } => {}
466 Http3ServerEvent::StreamReset { stream, error } => {
467 qtrace!("Http3ServerEvent::StreamReset {:?} {:?}", stream, error);
469 Http3ServerEvent::StreamStopSending { stream, error } => {
471 "Http3ServerEvent::StreamStopSending {:?} {:?}",
// New WebTransport session: the ":path" header picks accept/reject and any
// follow-up action (delayed close, server-created streams).
476 Http3ServerEvent::WebTransport(WebTransportServerEvent::NewSession {
481 "WebTransportServerEvent::NewSession {:?} {:?}",
485 let path_hdr = headers.iter().find(|&h| h.name() == ":path");
487 Some(ph) if !ph.value().is_empty() => {
488 let path = ph.value();
489 qtrace!("Serve request {}", path);
490 if path == "/success" {
492 .response(&WebTransportSessionAcceptAction::Accept)
494 } else if path == "/redirect" {
496 .response(&WebTransportSessionAcceptAction::Reject(
498 Header::new(":status", "302"),
499 Header::new("location", "/"),
504 } else if path == "/reject" {
506 .response(&WebTransportSessionAcceptAction::Reject(
507 [Header::new(":status", "404")].to_vec(),
510 } else if path == "/closeafter0ms" {
512 .response(&WebTransportSessionAcceptAction::Accept)
514 let now = Instant::now();
// NOTE(review): `contains_key` + `insert` + `get_mut().unwrap()` is the pattern
// `entry(now).or_default().push(session)` expresses with a single lookup.
515 if !self.sessions_to_close.contains_key(&now) {
516 self.sessions_to_close.insert(now, Vec::new());
518 self.sessions_to_close.get_mut(&now).unwrap().push(session);
519 } else if path == "/closeafter100ms" {
521 .response(&WebTransportSessionAcceptAction::Accept)
523 let expires = Instant::now() + Duration::from_millis(100);
524 if !self.sessions_to_close.contains_key(&expires) {
525 self.sessions_to_close.insert(expires, Vec::new());
527 self.sessions_to_close
531 } else if path == "/create_unidi_stream" {
533 .response(&WebTransportSessionAcceptAction::Accept)
535 self.sessions_to_create_stream.push((
540 } else if path == "/create_unidi_stream_and_hello" {
542 .response(&WebTransportSessionAcceptAction::Accept)
544 self.sessions_to_create_stream.push((
549 } else if path == "/create_bidi_stream" {
551 .response(&WebTransportSessionAcceptAction::Accept)
553 self.sessions_to_create_stream.push((
558 } else if path == "/create_bidi_stream_and_hello" {
559 self.webtransport_bidi_stream.clear();
561 .response(&WebTransportSessionAcceptAction::Accept)
563 self.sessions_to_create_stream.push((
570 .response(&WebTransportSessionAcceptAction::Accept)
576 .response(&WebTransportSessionAcceptAction::Reject(
577 [Header::new(":status", "404")].to_vec(),
583 Http3ServerEvent::WebTransport(WebTransportServerEvent::SessionClosed {
589 "WebTransportServerEvent::SessionClosed {:?} {:?}",
594 Http3ServerEvent::WebTransport(WebTransportServerEvent::NewStream(stream)) => {
595 // new stream could be from client-outgoing unidirectional
597 if !stream.stream_info.is_http() {
598 if stream.stream_id().is_bidi() {
599 self.webtransport_bidi_stream.insert(stream);
601 // Newly created stream happens on same connection
602 // as the stream creation for client's incoming stream.
603 // Link the streams with map for echo back
604 if self.wt_unidi_conn_to_stream.contains_key(&stream.conn) {
605 let s = self.wt_unidi_conn_to_stream.remove(&stream.conn).unwrap();
606 self.wt_unidi_echo_back.insert(stream, s);
611 Http3ServerEvent::WebTransport(WebTransportServerEvent::Datagram {
616 "WebTransportServerEvent::Datagram {:?} {:?}",
// Remember the datagram so "/get_webtransport_datagram" can return it.
620 self.received_datagram = Some(datagram);
// Time until the earliest scheduled session close, if any is scheduled.
// NOTE(review): `*next - Instant::now()` relies on `Instant` subtraction
// saturating at zero when `next` is already in the past; older Rust versions
// panicked here. `saturating_duration_since` would state the intent explicitly.
626 fn get_timeout(&self) -> Option<Duration> {
627 if let Some(next) = self.sessions_to_close.keys().min() {
628 return Some(max(*next - Instant::now(), Duration::from_millis(0)));
// Lets a plain transport `Server` be driven by the same event loop. It answers
// client-initiated bidirectional streams with `HTTP_RESPONSE_WITH_WRONG_FRAME`.
634 impl HttpServer for Server {
635 fn process(&mut self, dgram: Option<Datagram>) -> Output {
// Calls the two-argument `process` (datagram plus timestamp), not this
// one-argument trait method.
636 self.process(dgram, Instant::now())
// Walks the active connections and handles their pending connection events.
639 fn process_events(&mut self) {
640 let active_conns = self.active_connections();
641 for mut acr in active_conns {
643 let event = match acr.borrow_mut().next_event() {
648 ConnectionEvent::RecvStreamReadable { stream_id } => {
649 if stream_id.is_bidi() && stream_id.is_client_initiated() {
650 // We are only interested in request streams
652 .stream_send(stream_id, HTTP_RESPONSE_WITH_WRONG_FRAME)
653 .expect("Read should succeed");
// State of the proxying test server, which relays HTTP/3 requests to
// `http://127.0.0.1:<server_port>` (see `fetch`). The `server` and `server_port`
// fields used elsewhere are declared on lines not visible in this extract.
663 struct Http3ProxyServer {
// Unsent tail of a response body, keyed by stream; flushed in `handle_stream_writable`.
665 responses: HashMap<Http3OrWebTransportStream, Vec<u8>>,
// Headers and accumulated body of a request held back until its data has arrived.
667 request_header: HashMap<StreamId, Vec<Header>>,
668 request_body: HashMap<StreamId, Vec<u8>>,
// Stream on which the relayed response for a given stream id will be written.
669 #[cfg(not(target_os = "android"))]
670 stream_map: HashMap<StreamId, Http3OrWebTransportStream>,
// Receiving end of the channel on which the fetch thread delivers
// (response headers, response body).
671 #[cfg(not(target_os = "android"))]
672 response_to_send: HashMap<StreamId, Receiver<(Vec<Header>, Vec<u8>)>>,
// Displays the proxy server by delegating to the wrapped `server` field.
675 impl ::std::fmt::Display for Http3ProxyServer {
676 fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
677 write!(f, "{}", self.server)
// Inherent helpers of the proxy server: construction, response sending, and
// relaying requests through a hyper client on a helper thread.
681 impl Http3ProxyServer {
// Wraps `server`; `server_port` is the local port requests are relayed to
// (the event handler treats -1 as "not configured yet").
682 pub fn new(server: Http3Server, server_port: i32) -> Self {
685 responses: HashMap::new(),
687 request_header: HashMap::new(),
688 request_body: HashMap::new(),
689 #[cfg(not(target_os = "android"))]
690 stream_map: HashMap::new(),
691 #[cfg(not(target_os = "android"))]
692 response_to_send: HashMap::new(),
// Writes `data` to `stream`; a partially accepted body leaves its unsent tail in
// `responses`. The send side is closed via `stream_close_send`; a send error is
// only printed. The guards around the statements are on elided lines.
696 #[cfg(not(target_os = "android"))]
697 fn new_response(&mut self, mut stream: Http3OrWebTransportStream, mut data: Vec<u8>) {
// Result deliberately ignored here (best effort).
699 let _ = stream.stream_close_send();
702 match stream.send_data(&data) {
704 if sent < data.len() {
705 self.responses.insert(stream, data.split_off(sent));
707 stream.stream_close_send().unwrap();
711 eprintln!("error is {:?}", e);
// Continues a parked response: sends the buffered bytes for `stream`, keeps the
// remainder if the write was partial, otherwise closes the send side and drops
// the entry.
716 fn handle_stream_writable(&mut self, mut stream: Http3OrWebTransportStream) {
717 if let Some(data) = self.responses.get_mut(&stream) {
718 match stream.send_data(&data) {
720 if sent < data.len() {
721 let new_d = (*data).split_off(sent);
724 stream.stream_close_send().unwrap();
725 self.responses.remove(&stream);
729 eprintln!("Unexpected error");
// `fetch_url` (its `fn` line is elided): performs `request` with a hyper client.
// It appends ":status" plus the lower-cased response headers to `out_header` and
// the body chunks to `out_body`.
735 #[cfg(not(target_os = "android"))]
737 request: hyper::Request<Body>,
738 out_header: &mut Vec<Header>,
739 out_body: &mut Vec<u8>,
740 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
741 let client = Client::new();
742 let mut resp = client.request(request).await?;
743 out_header.push(Header::new(":status", resp.status().as_str()));
744 for (key, value) in resp.headers() {
745 out_header.push(Header::new(
746 key.as_str().to_ascii_lowercase(),
747 match value.to_str() {
754 while let Some(chunk) = resp.body_mut().data().await {
757 out_body.append(&mut data.to_vec());
// `fetch` (its `fn` line is elided): rebuilds the HTTP/3 request as a hyper
// request aimed at `http://127.0.0.1:<server_port><path>` and runs it on a new
// thread. The receiver for the result is stored under the stream id.
// NOTE(review): `&Vec<Header>` could be `&[Header]`.
766 #[cfg(not(target_os = "android"))]
769 mut stream: Http3OrWebTransportStream,
770 request_headers: &Vec<Header>,
771 request_body: Vec<u8>,
773 let mut request: hyper::Request<Body> = Request::default();
774 let mut path = String::new();
775 for hdr in request_headers.iter() {
778 *request.method_mut() = Method::from_bytes(hdr.value().as_bytes()).unwrap();
782 request.headers_mut().insert(
784 HeaderValue::from_str(hdr.value()).unwrap(),
788 path = String::from(hdr.value());
// Headers whose name is not a valid lower-case `HeaderName` are skipped.
791 if let Ok(hdr_name) = HeaderName::from_lowercase(hdr.name().as_bytes()) {
794 .insert(hdr_name, HeaderValue::from_str(hdr.value()).unwrap());
799 *request.body_mut() = Body::from(request_body);
// NOTE(review): `.to_string()` is redundant inside `format!`.
801 match format!("http://127.0.0.1:{}{}", self.server_port.to_string(), path).parse() {
// An unparsable URI is reported and answered with status 400.
804 eprintln!("invalid uri: {}", path);
807 Header::new(":status", "400"),
808 Header::new("cache-control", "no-cache"),
809 Header::new("content-length", "0"),
815 qtrace!("request header: {:?}", request);
// The fetch runs on its own thread with its own tokio runtime; the result comes
// back over the channel. A failed fetch still sends whatever headers and data
// were collected.
817 let (sender, receiver) = channel();
818 thread::spawn(move || {
819 let rt = tokio::runtime::Runtime::new().unwrap();
820 let mut h: Vec<Header> = Vec::new();
821 let mut data: Vec<u8> = Vec::new();
822 let _ = rt.block_on(Self::fetch_url(request, &mut h, &mut data));
823 qtrace!("response headers: {:?}", h);
824 qtrace!("res data: {:02X?}", data);
826 match sender.send((h, data)) {
829 eprintln!("sender.send failed");
834 self.response_to_send.insert(stream.stream_id(), receiver);
835 self.stream_map.insert(stream.stream_id(), stream);
// Android variant of `fetch`: arguments are unused (body not visible here).
838 #[cfg(target_os = "android")]
841 mut _stream: Http3OrWebTransportStream,
842 _request_headers: &Vec<Header>,
843 _request_body: Vec<u8>,
// Polls every pending receiver without blocking and sends any completed
// responses on their streams.
848 #[cfg(not(target_os = "android"))]
849 fn maybe_process_response(&mut self) {
850 let mut data_to_send = HashMap::new();
851 self.response_to_send
852 .retain(|id, receiver| match receiver.try_recv() {
853 Ok((headers, body)) => {
// NOTE(review): `headers` and `body` are already owned here; the clones look
// unnecessary.
854 data_to_send.insert(*id, (headers.clone(), body.clone()));
// Nothing yet: keep waiting. Sender gone: drop the receiver.
857 Err(TryRecvError::Empty) => true,
858 Err(TryRecvError::Disconnected) => false,
860 while let Some(id) = data_to_send.keys().next().cloned() {
861 let mut stream = self.stream_map.remove(&id).unwrap();
862 let (header, data) = data_to_send.remove(&id).unwrap();
863 qtrace!("response headers: {:?}", header);
864 match stream.send_headers(&header) {
866 self.new_response(stream, data);
// Event handling of the proxy server. Once `server_port` is set, requests are
// relayed via `fetch`; before that, a request path can supply the port.
874 impl HttpServer for Http3ProxyServer {
// Forwards the datagram to the wrapped server, stamped with the current time.
875 fn process(&mut self, dgram: Option<Datagram>) -> Output {
876 self.server.process(dgram, Instant::now())
// Delivers finished relayed responses first (non-Android), then drains the
// server's event queue.
879 fn process_events(&mut self) {
880 #[cfg(not(target_os = "android"))]
881 self.maybe_process_response();
882 while let Some(event) = self.server.next_event() {
883 qtrace!("Event: {:?}", event);
885 Http3ServerEvent::Headers {
890 qtrace!("Headers {:?}", headers);
// A port is configured: relay the request. When a "content-length" header parses
// as a number, headers and an empty body buffer are stored for the `Data` events;
// other visible paths call `fetch` with an empty body. Exact branch conditions
// are on elided lines.
891 if self.server_port != -1 {
892 let method_hdr = headers.iter().find(|&h| h.name() == ":method");
894 Some(method) => match method.value() {
897 headers.iter().find(|&h| h.name() == "content-length");
898 if let Some(length_str) = content_length {
899 if let Ok(len) = length_str.value().parse::<u32>() {
902 .insert(stream.stream_id(), headers);
904 .insert(stream.stream_id(), Vec::new());
906 self.fetch(stream, &headers, b"".to_vec());
912 self.fetch(stream, &headers, b"".to_vec());
918 let path_hdr = headers.iter().find(|&h| h.name() == ":path");
920 Some(ph) if !ph.value().is_empty() => {
921 let path = ph.value();
// The port number is read from the path starting at byte offset 6.
// NOTE(review): `path[6..]` panics for a shorter path unless the elided line
// above checks a prefix first -- confirm.
924 let port = path[6..].parse::<i32>();
925 if let Ok(port) = port {
926 qtrace!("got port {}", port);
927 self.server_port = port;
937 Header::new(":status", "200"),
938 Header::new("cache-control", "no-cache"),
939 Header::new("content-length", "0"),
// Body data for a stored request; once the stored entry is removed, the request
// is relayed with the collected body.
944 Http3ServerEvent::Data {
949 if let Some(d) = self.request_body.get_mut(&stream.stream_id()) {
953 if let Some(d) = self.request_body.remove(&stream.stream_id()) {
954 let headers = self.request_header.remove(&stream.stream_id()).unwrap();
955 self.fetch(stream, &headers, d);
959 Http3ServerEvent::DataWritable { stream } => self.handle_stream_writable(stream),
960 Http3ServerEvent::StateChange { .. } | Http3ServerEvent::PriorityUpdate { .. } => {}
961 Http3ServerEvent::StreamReset { stream, error } => {
962 qtrace!("Http3ServerEvent::StreamReset {:?} {:?}", stream, error);
964 Http3ServerEvent::StreamStopSending { stream, error } => {
966 "Http3ServerEvent::StreamStopSending {:?} {:?}",
// WebTransport events are ignored by the proxy.
971 Http3ServerEvent::WebTransport(_) => {}
// Field-less server type used for `ServerType::Http3NoResponse`.
978 struct NonRespondingServer {}
// Displays as the fixed string "NonRespondingServer".
980 impl ::std::fmt::Display for NonRespondingServer {
981 fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
982 write!(f, "NonRespondingServer")
// `HttpServer` impl that ignores its input: the datagram parameter is unused and
// there are no events to process. (The body of `process` is not visible here.)
986 impl HttpServer for NonRespondingServer {
987 fn process(&mut self, _dgram: Option<Datagram>) -> Output {
991 fn process_events(&mut self) {}
// Sends `out_dgram` to its destination address over `socket`. `WouldBlock` is
// counted as zero bytes sent, other errors are printed, and a short send is
// logged rather than retried.
994 fn emit_packet(socket: &UdpSocket, out_dgram: Datagram) {
995 let res = match socket.send_to(&out_dgram, &out_dgram.destination()) {
996 Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => 0,
998 eprintln!("UDP send error: {:?}", err);
1003 if res != out_dgram.len() {
1004 qinfo!("Unable to send all {} bytes of datagram", out_dgram.len());
// (The `fn` line of this free function is elided from this extract.)
// Feeds `dgram` to `server` and acts on the resulting `Output`: a datagram is
// sent on `socket`, a callback re-arms the per-server timer, and `None` is logged.
1009 server: &mut dyn HttpServer,
1010 svr_timeout: &mut Option<Timeout>,
1012 dgram: Option<Datagram>,
1013 timer: &mut Timer<usize>,
1014 socket: &mut UdpSocket,
1016 match server.process(dgram) {
1017 Output::Datagram(dgram) => {
1018 emit_packet(socket, dgram);
1021 Output::Callback(mut new_timeout) => {
// The server's own deadline (e.g. a scheduled session close) may be earlier.
1022 if let Some(t) = server.get_timeout() {
1023 new_timeout = min(new_timeout, t);
// Replace any timeout previously armed for this server.
1025 if let Some(svr_timeout) = svr_timeout {
1026 timer.cancel_timeout(svr_timeout);
1029 qinfo!("Setting timeout of {:?} for {}", new_timeout, server);
// Timeouts are capped at one second.
1030 if new_timeout > Duration::from_secs(1) {
1031 new_timeout = Duration::from_secs(1);
1033 *svr_timeout = Some(timer.set_timeout(new_timeout, inx));
1037 qdebug!("Output::None");
// (The `fn` line is elided; the call site names this function `read_dgram`.)
// Receives one UDP datagram from `socket` into a 2048-byte buffer and wraps it,
// with the sender and `local_address`, in a `Datagram`. What is returned on the
// error and zero-length paths is on lines not visible here.
1044 socket: &mut UdpSocket,
1045 local_address: &SocketAddr,
1046 ) -> Result<Option<Datagram>, io::Error> {
1047 let buf = &mut [0u8; 2048];
1048 let res = socket.recv_from(&mut buf[..]);
1049 if let Some(err) = res.as_ref().err() {
// `WouldBlock` is not reported as an error.
1050 if err.kind() != io::ErrorKind::WouldBlock {
1051 eprintln!("UDP recv error: {:?}", err);
1056 let (sz, remote_addr) = res.unwrap();
// A completely filled buffer may mean the datagram was truncated.
1057 if sz == buf.len() {
1058 eprintln!("Might have received more than {} bytes", buf.len());
1062 eprintln!("zero length datagram received?");
1065 Ok(Some(Datagram::new(remote_addr, *local_address, &buf[..sz])))
// Owns the sockets, servers and timer of the process and runs the event loop.
// (A `poll` field used by `init` is declared on a line not visible here.)
1077 struct ServersRunner {
// Local address of each socket, in creation order.
1078 hosts: Vec<SocketAddr>,
1080 sockets: Vec<UdpSocket>,
// Server bound to each local address, with its currently armed timeout, if any.
1081 servers: HashMap<SocketAddr, (Box<dyn HttpServer>, Option<Timeout>)>,
// Shared timer; the `usize` payload is the server index passed to `set_timeout`.
1082 timer: Timer<usize>,
// Indices of servers to process again on the next loop iteration without
// blocking in poll.
1083 active_servers: HashSet<usize>,
// ECH config taken from the ECH server; printed base64-encoded at startup.
1084 ech_config: Vec<u8>,
// Construction, socket/server setup and the poll loop of the runner.
1087 impl ServersRunner {
// Creates a runner with no sockets or servers and a timer with a 1 ms tick.
1088 pub fn new() -> Result<Self, io::Error> {
1092 sockets: Vec::new(),
1093 servers: HashMap::new(),
1094 timer: Builder::default()
1095 .tick_duration(Duration::from_millis(1))
1097 active_servers: HashSet::new(),
1098 ech_config: Vec::new(),
// Creates the five servers, prints their ports and the ECH config, and registers
// the timer with the poll.
1102 pub fn init(&mut self) {
1103 self.add_new_socket(0, ServerType::Http3, 0);
1104 self.add_new_socket(1, ServerType::Http3Fail, 0);
1105 self.add_new_socket(2, ServerType::Http3Ech, 0);
// NOTE(review): a non-numeric MOZ_HTTP3_PROXY_PORT makes this `unwrap` panic.
1107 let proxy_port = match env::var("MOZ_HTTP3_PROXY_PORT") {
1108 Ok(val) => val.parse::<u16>().unwrap(),
1111 self.add_new_socket(3, ServerType::Http3Proxy, proxy_port);
// NOTE(review): `count` jumps from 3 to 5 while this is the fifth socket
// (`hosts[4]` below). If `count` becomes the poll token later used to index
// `sockets`, 5 would not match index 4 -- confirm against the elided lines.
1112 self.add_new_socket(5, ServerType::Http3NoResponse, 0);
1115 "HTTP3 server listening on ports {}, {}, {}, {} and {}. EchConfig is @{}@",
1116 self.hosts[0].port(),
1117 self.hosts[1].port(),
1118 self.hosts[2].port(),
1119 self.hosts[3].port(),
1120 self.hosts[4].port(),
1121 BASE64_STANDARD.encode(&self.ech_config)
1124 .register(&self.timer, TIMER_TOKEN, Ready::readable(), PollOpt::edge())
// Binds a UDP socket on 127.0.0.1:`port`, records its local address, and creates
// the server of `server_type` for that address. The use of `count` and the
// returned value are on lines not visible here.
1128 fn add_new_socket(&mut self, count: usize, server_type: ServerType, port: u16) -> u16 {
1129 let addr = format!("127.0.0.1:{}", port).parse().unwrap();
1131 let socket = match UdpSocket::bind(&addr) {
1133 eprintln!("Unable to bind UDP socket: {}", err);
1139 let local_addr = match socket.local_addr() {
1141 eprintln!("Socket local address not bound: {}", err);
1147 self.hosts.push(local_addr);
1153 Ready::readable() | Ready::writable(),
1158 self.sockets.push(socket);
1159 let server = self.create_server(server_type);
1160 self.servers.insert(local_addr, (server, None));
// Builds the boxed server for `server_type`. For the ECH server it also enables
// ECH with freshly generated keys and stores the resulting config in
// `self.ech_config`.
1164 fn create_server(&mut self, server_type: ServerType) -> Box<dyn HttpServer> {
1165 let anti_replay = AntiReplay::new(Instant::now(), Duration::from_secs(10), 7, 14)
1166 .expect("unable to setup anti-replay");
1167 let cid_mgr = Rc::new(RefCell::new(RandomConnectionIdGenerator::new(10)));
1170 ServerType::Http3 => Box::new(Http3TestServer::new(
1173 &[" HTTP2 Test Cert"],
1177 Http3Parameters::default()
1178 .max_table_size_encoder(MAX_TABLE_SIZE)
1179 .max_table_size_decoder(MAX_TABLE_SIZE)
1180 .max_blocked_streams(MAX_BLOCKED_STREAMS)
1182 .connection_parameters(ConnectionParameters::default().datagram_size(1200)),
1185 .expect("We cannot make a server!"),
1187 ServerType::Http3Fail => Box::new(
1190 &[" HTTP2 Test Cert"],
1193 Box::new(AllowZeroRtt {}),
1195 ConnectionParameters::default(),
1197 .expect("We cannot make a server!"),
1199 ServerType::Http3NoResponse => Box::new(NonRespondingServer::default()),
1200 ServerType::Http3Ech => {
1201 let mut server = Box::new(Http3TestServer::new(
1204 &[" HTTP2 Test Cert"],
1208 Http3Parameters::default()
1209 .max_table_size_encoder(MAX_TABLE_SIZE)
1210 .max_table_size_decoder(MAX_TABLE_SIZE)
1211 .max_blocked_streams(MAX_BLOCKED_STREAMS),
1214 .expect("We cannot make a server!"),
1216 let ref mut unboxed_server = (*server).server;
1217 let (sk, pk) = generate_ech_keys().unwrap();
1219 .enable_ech(ECH_CONFIG_ID, ECH_PUBLIC_NAME, &sk, &pk)
1220 .expect("unable to enable ech");
1221 self.ech_config = Vec::from(unboxed_server.ech_config());
1224 ServerType::Http3Proxy => {
// With MOZ_HTTP3_MOCHITEST set, the proxy uses "mochitest-cert" and relays to
// port 8888; otherwise the port starts as -1 (not configured).
1225 let server_config = if env::var("MOZ_HTTP3_MOCHITEST").is_ok() {
1226 ("mochitest-cert", 8888)
1228 (" HTTP2 Test Cert", -1)
1230 let server = Box::new(Http3ProxyServer::new(
1237 Http3Parameters::default()
1238 .max_table_size_encoder(MAX_TABLE_SIZE)
1239 .max_table_size_decoder(MAX_TABLE_SIZE)
1240 .max_blocked_streams(MAX_BLOCKED_STREAMS)
1242 .connection_parameters(
1243 ConnectionParameters::default().datagram_size(1200),
1247 .expect("We cannot make a server!"),
// Looks up the socket with index `inx` and the server bound to its local address,
// reads datagrams, lets the server process its events and marks the index active.
// Most of the body is on lines not visible in this extract.
1255 fn process_datagrams_and_events(
1259 ) -> Result<(), io::Error> {
1260 if let Some(socket) = self.sockets.get_mut(inx) {
1261 if let Some((ref mut server, svr_timeout)) =
1262 self.servers.get_mut(&socket.local_addr().unwrap())
1266 let dgram = read_dgram(socket, &self.hosts[inx])?;
1267 if dgram.is_none() {
1289 server.process_events();
1298 self.active_servers.insert(inx);
// Re-processes every server marked active; the set is taken (left empty) first so
// that processing can mark servers active again.
1305 fn process_active_conns(&mut self) -> Result<(), io::Error> {
1306 let curr_active = mem::take(&mut self.active_servers);
1307 for inx in curr_active {
1308 self.process_datagrams_and_events(inx, false)?;
// Processes every server whose timer has expired.
1313 fn process_timeout(&mut self) -> Result<(), io::Error> {
1314 while let Some(inx) = self.timer.poll() {
1315 qinfo!("Timer expired for {:?}", inx);
1316 self.process_datagrams_and_events(inx, false)?;
// Event loop: polls for events, dispatches timer and socket events, then
// re-processes the active servers.
1321 pub fn run(&mut self) -> Result<(), io::Error> {
1322 let mut events = Events::with_capacity(1024);
1324 // If there are active servers do not block in poll.
1327 if self.active_servers.is_empty() {
1330 Some(Duration::from_millis(0))
1334 for event in &events {
1335 if event.token() == TIMER_TOKEN {
1336 self.process_timeout()?;
1338 self.process_datagrams_and_events(
1340 event.readiness().is_readable(),
1344 self.process_active_conns()?;
// Entry point: checks the arguments, starts a thread that watches stdin,
// initializes the database from the path in the first argument, then sets up and
// runs the servers.
1349 fn main() -> Result<(), io::Error> {
1350 let args: Vec<String> = env::args().collect();
1352 eprintln!("Wrong arguments.");
1356 // Read data from stdin and terminate the server if EOF is detected, which
1357 // means that runxpcshelltests.py ended without shutting down the server.
1358 thread::spawn(|| loop {
1359 let mut buffer = String::new();
1360 match io::stdin().read_line(&mut buffer) {
// `args[1]` is the path handed to `init_db`.
1372 init_db(PathBuf::from(args[1].clone()));
1374 let mut servers_runner = ServersRunner::new()?;
1375 servers_runner.init();
1376 servers_runner.run()