From 0b89e245d95379e9a4d82d9f47bc2daf89219ffb Mon Sep 17 00:00:00 2001 From: nils <48135649+Nilstrieb@users.noreply.github.com> Date: Tue, 7 Mar 2023 15:18:45 +0100 Subject: [PATCH] loop --- hyper/benches/body.rs | 88 -------------------- hyper/benches/connect.rs | 12 --- hyper/benches/end_to_end.rs | 133 ------------------------------ hyper/benches/pipeline.rs | 15 ---- hyper/benches/server.rs | 71 ---------------- hyper/src/server/mod.rs | 151 ----------------------------------- hyper/src/server/server.rs | 29 ++----- hyper/src/server/shutdown.rs | 55 ------------- warp/src/server.rs | 38 +-------- 9 files changed, 6 insertions(+), 586 deletions(-) delete mode 100644 hyper/benches/body.rs delete mode 100644 hyper/benches/connect.rs delete mode 100644 hyper/benches/end_to_end.rs delete mode 100644 hyper/benches/pipeline.rs delete mode 100644 hyper/benches/server.rs delete mode 100644 hyper/src/server/shutdown.rs diff --git a/hyper/benches/body.rs b/hyper/benches/body.rs deleted file mode 100644 index 255914d..0000000 --- a/hyper/benches/body.rs +++ /dev/null @@ -1,88 +0,0 @@ -#![feature(test)] -#![deny(warnings)] - -extern crate test; - -use bytes::Buf; -use futures_util::stream; -use futures_util::StreamExt; -use hyper::body::Body; - -macro_rules! bench_stream { - ($bencher:ident, bytes: $bytes:expr, count: $count:expr, $total_ident:ident, $body_pat:pat, $block:expr) => {{ - let rt = tokio::runtime::Builder::new_current_thread() - .build() - .expect("rt build"); - - let $total_ident: usize = $bytes * $count; - $bencher.bytes = $total_ident as u64; - let __s: &'static [&'static [u8]] = &[&[b'x'; $bytes] as &[u8]; $count] as _; - - $bencher.iter(|| { - rt.block_on(async { - let $body_pat = Body::wrap_stream( - stream::iter(__s.iter()).map(|&s| Ok::<_, std::convert::Infallible>(s)), - ); - $block; - }); - }); - }}; -} - -macro_rules! benches { - ($($name:ident, $bytes:expr, $count:expr;)+) => ( - mod aggregate { - use super::*; - - $( - #[bench] - fn $name(b: &mut test::Bencher) { - bench_stream!(b, bytes: $bytes, count: $count, total, body, { - let buf = hyper::body::aggregate(body).await.unwrap(); - assert_eq!(buf.remaining(), total); - }); - } - )+ - } - - mod manual_into_vec { - use super::*; - - $( - #[bench] - fn $name(b: &mut test::Bencher) { - bench_stream!(b, bytes: $bytes, count: $count, total, mut body, { - let mut vec = Vec::new(); - while let Some(chunk) = body.next().await { - vec.extend_from_slice(&chunk.unwrap()); - } - assert_eq!(vec.len(), total); - }); - } - )+ - } - - mod to_bytes { - use super::*; - - $( - #[bench] - fn $name(b: &mut test::Bencher) { - bench_stream!(b, bytes: $bytes, count: $count, total, body, { - let bytes = hyper::body::to_bytes(body).await.unwrap(); - assert_eq!(bytes.len(), total); - }); - } - )+ - } - ) -} - -// ===== Actual Benchmarks ===== - -benches! { - bytes_1_000_count_2, 1_000, 2; - bytes_1_000_count_10, 1_000, 10; - bytes_10_000_count_1, 10_000, 1; - bytes_10_000_count_10, 10_000, 10; -} diff --git a/hyper/benches/connect.rs b/hyper/benches/connect.rs deleted file mode 100644 index 04aab8e..0000000 --- a/hyper/benches/connect.rs +++ /dev/null @@ -1,12 +0,0 @@ -#![feature(test)] -#![deny(warnings)] -extern crate test; -use http::Uri; -use hyper::client::connect::HttpConnector; -use hyper::service::Service; -use std::net::SocketAddr; -use tokio::net::TcpListener; -#[bench] -fn http_connector(b: &mut test::Bencher) { - loop {} -} diff --git a/hyper/benches/end_to_end.rs b/hyper/benches/end_to_end.rs deleted file mode 100644 index 7aa6e0f..0000000 --- a/hyper/benches/end_to_end.rs +++ /dev/null @@ -1,133 +0,0 @@ -#![feature(test)] -#![deny(warnings)] -extern crate test; -use std::net::SocketAddr; -use futures_util::future::join_all; -use hyper::client::HttpConnector; -use hyper::{body::HttpBody as _, Body, Method, Request, Response, Server}; -#[bench] -fn http1_consecutive_x1_empty(b: &mut test::Bencher) { - loop {} -} -#[bench] -fn http1_consecutive_x1_req_10b(b: &mut test::Bencher) { - loop {} -} -#[bench] -fn http1_consecutive_x1_both_100kb(b: &mut test::Bencher) { - loop {} -} -#[bench] -fn http1_consecutive_x1_both_10mb(b: &mut test::Bencher) { - loop {} -} -#[bench] -fn http1_parallel_x10_empty(b: &mut test::Bencher) { - loop {} -} -#[bench] -fn http1_parallel_x10_req_10mb(b: &mut test::Bencher) { - loop {} -} -#[bench] -fn http1_parallel_x10_req_10kb_100_chunks(b: &mut test::Bencher) { - loop {} -} -#[bench] -fn http1_parallel_x10_res_1mb(b: &mut test::Bencher) { - loop {} -} -#[bench] -fn http1_parallel_x10_res_10mb(b: &mut test::Bencher) { - loop {} -} -const HTTP2_MAX_WINDOW: u32 = std::u32::MAX >> 1; -#[bench] -fn http2_consecutive_x1_empty(b: &mut test::Bencher) { - loop {} -} -#[bench] -fn http2_consecutive_x1_req_10b(b: &mut test::Bencher) { - loop {} -} -#[bench] -fn http2_consecutive_x1_req_100kb(b: &mut test::Bencher) { - loop {} -} -#[bench] -fn http2_parallel_x10_empty(b: &mut test::Bencher) { - loop {} -} -#[bench] -fn http2_parallel_x10_req_10mb(b: &mut test::Bencher) { - loop {} -} -#[bench] -fn http2_parallel_x10_req_10kb_100_chunks(b: &mut test::Bencher) { - loop {} -} -#[bench] -fn http2_parallel_x10_req_10kb_100_chunks_adaptive_window(b: &mut test::Bencher) { - loop {} -} -#[bench] -fn http2_parallel_x10_req_10kb_100_chunks_max_window(b: &mut test::Bencher) { - loop {} -} -#[bench] -fn http2_parallel_x10_res_1mb(b: &mut test::Bencher) { - loop {} -} -#[bench] -fn http2_parallel_x10_res_10mb(b: &mut test::Bencher) { - loop {} -} -struct Opts { - http2: bool, - http2_stream_window: Option, - http2_conn_window: Option, - http2_adaptive_window: bool, - parallel_cnt: u32, - request_method: Method, - request_body: Option<&'static [u8]>, - request_chunks: usize, - response_body: &'static [u8], -} -fn opts() -> Opts { - loop {} -} -impl Opts { - fn http2(mut self) -> Self { - loop {} - } - fn http2_stream_window(mut self, sz: impl Into>) -> Self { - loop {} - } - fn http2_conn_window(mut self, sz: impl Into>) -> Self { - loop {} - } - fn http2_adaptive_window(mut self) -> Self { - loop {} - } - fn method(mut self, m: Method) -> Self { - loop {} - } - fn request_body(mut self, body: &'static [u8]) -> Self { - loop {} - } - fn request_chunks(mut self, chunk: &'static [u8], cnt: usize) -> Self { - loop {} - } - fn response_body(mut self, body: &'static [u8]) -> Self { - loop {} - } - fn parallel(mut self, cnt: u32) -> Self { - loop {} - } - fn bench(self, b: &mut test::Bencher) { - loop {} - } -} -fn spawn_server(rt: &tokio::runtime::Runtime, opts: &Opts) -> SocketAddr { - loop {} -} diff --git a/hyper/benches/pipeline.rs b/hyper/benches/pipeline.rs deleted file mode 100644 index f9eaf0d..0000000 --- a/hyper/benches/pipeline.rs +++ /dev/null @@ -1,15 +0,0 @@ -#![feature(test)] -#![deny(warnings)] -extern crate test; -use std::io::{Read, Write}; -use std::net::TcpStream; -use std::sync::mpsc; -use std::time::Duration; -use tokio::sync::oneshot; -use hyper::service::{make_service_fn, service_fn}; -use hyper::{Body, Response, Server}; -const PIPELINED_REQUESTS: usize = 16; -#[bench] -fn hello_world_16(b: &mut test::Bencher) { - loop {} -} diff --git a/hyper/benches/server.rs b/hyper/benches/server.rs deleted file mode 100644 index df2fe08..0000000 --- a/hyper/benches/server.rs +++ /dev/null @@ -1,71 +0,0 @@ -#![feature(test)] -#![deny(warnings)] -extern crate test; -use std::io::{Read, Write}; -use std::net::{TcpListener, TcpStream}; -use std::sync::mpsc; -use std::time::Duration; -use futures_util::{stream, StreamExt}; -use tokio::sync::oneshot; -use hyper::service::{make_service_fn, service_fn}; -use hyper::{Body, Response, Server}; -macro_rules! bench_server { - ($b:ident, $header:expr, $body:expr) => { - { let _ = pretty_env_logger::try_init(); let (_until_tx, until_rx) = - oneshot::channel::< () > (); let addr = { let (addr_tx, addr_rx) = - mpsc::channel(); std::thread::spawn(move || { let addr = "127.0.0.1:0".parse() - .unwrap(); let make_svc = make_service_fn(| _ | async { Ok::< _, hyper::Error > - (service_fn(| _ | async { Ok::< _, hyper::Error > (Response::builder() - .header($header .0, $header .1).header("content-type", "text/plain").body($body - ()).unwrap(),) })) }); let rt = tokio::runtime::Builder::new_current_thread() - .enable_all().build().expect("rt build"); let srv = rt.block_on(async move { - Server::bind(& addr).serve(make_svc) }); addr_tx.send(srv.local_addr()).unwrap(); - let graceful = srv.with_graceful_shutdown(async { until_rx.await.ok(); }); rt - .block_on(async move { if let Err(e) = graceful.await { - panic!("server error: {}", e); } }); }); addr_rx.recv().unwrap() }; let - total_bytes = { let mut tcp = TcpStream::connect(addr).unwrap(); tcp - .write_all(b"GET / HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n") - .unwrap(); let mut buf = Vec::new(); tcp.read_to_end(& mut buf).unwrap() }; let - mut tcp = TcpStream::connect(addr).unwrap(); tcp - .set_read_timeout(Some(Duration::from_secs(3))).unwrap(); let mut buf = [0u8; - 8192]; $b .bytes = 35 + total_bytes as u64; $b .iter(|| { tcp - .write_all(b"GET / HTTP/1.1\r\nHost: localhost\r\n\r\n").unwrap(); let mut sum = - 0; while sum < total_bytes { sum += tcp.read(& mut buf).unwrap(); } - assert_eq!(sum, total_bytes); }); } - }; -} -fn body(b: &'static [u8]) -> hyper::Body { - loop {} -} -#[bench] -fn throughput_fixedsize_small_payload(b: &mut test::Bencher) { - loop {} -} -#[bench] -fn throughput_fixedsize_large_payload(b: &mut test::Bencher) { - loop {} -} -#[bench] -fn throughput_fixedsize_many_chunks(b: &mut test::Bencher) { - loop {} -} -#[bench] -fn throughput_chunked_small_payload(b: &mut test::Bencher) { - loop {} -} -#[bench] -fn throughput_chunked_large_payload(b: &mut test::Bencher) { - loop {} -} -#[bench] -fn throughput_chunked_many_chunks(b: &mut test::Bencher) { - loop {} -} -#[bench] -fn raw_tcp_throughput_small_payload(b: &mut test::Bencher) { - loop {} -} -#[bench] -fn raw_tcp_throughput_large_payload(b: &mut test::Bencher) { - loop {} -} diff --git a/hyper/src/server/mod.rs b/hyper/src/server/mod.rs index e763d0e..2b4d6ca 100644 --- a/hyper/src/server/mod.rs +++ b/hyper/src/server/mod.rs @@ -1,153 +1,3 @@ -//! HTTP Server -//! -//! A `Server` is created to listen on a port, parse HTTP requests, and hand -//! them off to a `Service`. -//! -//! There are two levels of APIs provide for constructing HTTP servers: -//! -//! - The higher-level [`Server`](Server) type. -//! - The lower-level [`conn`](conn) module. -//! -//! # Server -//! -//! The [`Server`](Server) is main way to start listening for HTTP requests. -//! It wraps a listener with a [`MakeService`](crate::service), and then should -//! be executed to start serving requests. -//! -//! [`Server`](Server) accepts connections in both HTTP1 and HTTP2 by default. -//! -//! ## Examples -//! -//! ```no_run -//! use std::convert::Infallible; -//! use std::net::SocketAddr; -//! use hyper::{Body, Request, Response, Server}; -//! use hyper::service::{make_service_fn, service_fn}; -//! -//! async fn handle(_req: Request) -> Result, Infallible> { -//! Ok(Response::new(Body::from("Hello World"))) -//! } -//! -//! # #[cfg(feature = "runtime")] -//! #[tokio::main] -//! async fn main() { -//! // Construct our SocketAddr to listen on... -//! let addr = SocketAddr::from(([127, 0, 0, 1], 3000)); -//! -//! // And a MakeService to handle each connection... -//! let make_service = make_service_fn(|_conn| async { -//! Ok::<_, Infallible>(service_fn(handle)) -//! }); -//! -//! // Then bind and serve... -//! let server = Server::bind(&addr).serve(make_service); -//! -//! // And run forever... -//! if let Err(e) = server.await { -//! eprintln!("server error: {}", e); -//! } -//! } -//! # #[cfg(not(feature = "runtime"))] -//! # fn main() {} -//! ``` -//! -//! If you don't need the connection and your service implements `Clone` you can use -//! [`tower::make::Shared`] instead of `make_service_fn` which is a bit simpler: -//! -//! ```no_run -//! # use std::convert::Infallible; -//! # use std::net::SocketAddr; -//! # use hyper::{Body, Request, Response, Server}; -//! # use hyper::service::{make_service_fn, service_fn}; -//! # use tower::make::Shared; -//! # async fn handle(_req: Request) -> Result, Infallible> { -//! # Ok(Response::new(Body::from("Hello World"))) -//! # } -//! # #[cfg(feature = "runtime")] -//! #[tokio::main] -//! async fn main() { -//! // Construct our SocketAddr to listen on... -//! let addr = SocketAddr::from(([127, 0, 0, 1], 3000)); -//! -//! // Shared is a MakeService that produces services by cloning an inner service... -//! let make_service = Shared::new(service_fn(handle)); -//! -//! // Then bind and serve... -//! let server = Server::bind(&addr).serve(make_service); -//! -//! // And run forever... -//! if let Err(e) = server.await { -//! eprintln!("server error: {}", e); -//! } -//! } -//! # #[cfg(not(feature = "runtime"))] -//! # fn main() {} -//! ``` -//! -//! Passing data to your request handler can be done like so: -//! -//! ```no_run -//! use std::convert::Infallible; -//! use std::net::SocketAddr; -//! use hyper::{Body, Request, Response, Server}; -//! use hyper::service::{make_service_fn, service_fn}; -//! # #[cfg(feature = "runtime")] -//! use hyper::server::conn::AddrStream; -//! -//! #[derive(Clone)] -//! struct AppContext { -//! // Whatever data your application needs can go here -//! } -//! -//! async fn handle( -//! context: AppContext, -//! addr: SocketAddr, -//! req: Request -//! ) -> Result, Infallible> { -//! Ok(Response::new(Body::from("Hello World"))) -//! } -//! -//! # #[cfg(feature = "runtime")] -//! #[tokio::main] -//! async fn main() { -//! let context = AppContext { -//! // ... -//! }; -//! -//! // A `MakeService` that produces a `Service` to handle each connection. -//! let make_service = make_service_fn(move |conn: &AddrStream| { -//! // We have to clone the context to share it with each invocation of -//! // `make_service`. If your data doesn't implement `Clone` consider using -//! // an `std::sync::Arc`. -//! let context = context.clone(); -//! -//! // You can grab the address of the incoming connection like so. -//! let addr = conn.remote_addr(); -//! -//! // Create a `Service` for responding to the request. -//! let service = service_fn(move |req| { -//! handle(context.clone(), addr, req) -//! }); -//! -//! // Return the service to hyper. -//! async move { Ok::<_, Infallible>(service) } -//! }); -//! -//! // Run the server like above... -//! let addr = SocketAddr::from(([127, 0, 0, 1], 3000)); -//! -//! let server = Server::bind(&addr).serve(make_service); -//! -//! if let Err(e) = server.await { -//! eprintln!("server error: {}", e); -//! } -//! } -//! # #[cfg(not(feature = "runtime"))] -//! # fn main() {} -//! ``` -//! -//! [`tower::make::Shared`]: https://docs.rs/tower/latest/tower/make/struct.Shared.html - pub mod accept; pub mod conn; #[cfg(feature = "tcp")] @@ -161,7 +11,6 @@ cfg_feature! { pub(crate) mod server; pub use self::server::Builder; - mod shutdown; } cfg_feature! { diff --git a/hyper/src/server/server.rs b/hyper/src/server/server.rs index 749e720..59160ca 100644 --- a/hyper/src/server/server.rs +++ b/hyper/src/server/server.rs @@ -11,7 +11,6 @@ use crate::common::exec::Exec; use crate::common::exec::{ConnStreamExec, NewSvcExec}; use crate::common::{task, Future, Pin, Poll, Unpin}; use super::conn::{Http as Http_, UpgradeableConnection}; -use super::shutdown::{Graceful, GracefulWatcher}; use crate::service::{HttpService, MakeServiceRef}; use self::new_svc::NewSvcTask; pin_project! { @@ -108,13 +107,7 @@ where /// let _ = tx.send(()); /// # } /// ``` - pub fn with_graceful_shutdown(self, signal: F) -> Graceful - where - F: Future, - E: NewSvcExec, - { - loop {} - } + fn poll_next_( self: Pin<&mut Self>, cx: &mut task::Context<'_>, @@ -154,15 +147,7 @@ impl Builder { pub fn http1_pipeline_flush(mut self, val: bool) -> Self { loop {} } - /// Set a timeout for reading client request headers. If a client does not - /// transmit the entire header within this time, the connection is closed. - /// - /// Default is None. - #[cfg(all(feature = "http1", feature = "runtime"))] - #[cfg_attr(docsrs, doc(cfg(all(feature = "http1", feature = "runtime"))))] - pub(crate) fn http1_header_read_timeout(mut self, read_timeout: Duration) -> Self { - loop {} - } + /// pub fn serve(self, _: S) -> Server where @@ -181,13 +166,9 @@ pub trait Watcher: Clone { pub(crate) struct NoopWatcher; impl Watcher for NoopWatcher where - I: AsyncRead + AsyncWrite + Unpin + Send + 'static, S: HttpService, - E: ConnStreamExec, - S::ResBody: 'static, - ::Error: Into>, { - type Future = UpgradeableConnection; + type Future = (); fn watch(&self) -> Self::Future { loop {} } @@ -204,7 +185,7 @@ pub(crate) mod new_svc { pin_project! { #[allow(missing_debug_implementations)] - pub struct NewSvcTask < I, N, S : HttpService < Body >, E, W : Watcher < I, S, E >> { + pub struct NewSvcTask < I, N, S, E, W : Watcher < I, S, E >> { #[pin] state : State , @@ -216,7 +197,7 @@ pub(crate) mod new_svc { pin_project! { #[project = StateProj] - pub (super) enum State , E, W : Watcher < I, S, E >> { + pub (super) enum State > { Connecting { a: (I, S, W, E), }, diff --git a/hyper/src/server/shutdown.rs b/hyper/src/server/shutdown.rs deleted file mode 100644 index 120bf18..0000000 --- a/hyper/src/server/shutdown.rs +++ /dev/null @@ -1,55 +0,0 @@ -use std::error::Error as StdError; -use pin_project_lite::pin_project; -use tokio::io::{AsyncRead, AsyncWrite}; -use super::accept::Accept; -use super::conn::UpgradeableConnection; -use super::server::{Server, Watcher}; -use crate::body::{Body, HttpBody}; -use crate::common::drain::{Draining, Signal, Watch, Watching}; -use crate::common::exec::{ConnStreamExec, NewSvcExec}; -use crate::common::{task, Future, Pin, Poll, Unpin}; -use crate::service::{HttpService, MakeServiceRef}; -pin_project! { - #[allow(missing_debug_implementations)] pub struct Graceful < I, S, F, E > { #[pin] - state : State < I, S, F, E >, } -} -pin_project! { - #[project = StateProj] pub (super) enum State < I, S, F, E > { Running { drain : - Option < (Signal, Watch) >, #[pin] server : Server < I, S, E >, #[pin] signal : F, }, - Draining { draining : Draining }, } -} -impl Graceful {} -impl Future for Graceful -where - I: Accept, - IE: Into>, - IO: AsyncRead + AsyncWrite + Unpin + Send + 'static, - S: MakeServiceRef, - S::Error: Into>, - B: HttpBody + 'static, - B::Error: Into>, - F: Future, - E: ConnStreamExec<>::Future, B>, - E: NewSvcExec, -{ - type Output = crate::Result<()>; - fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { - loop {} - } -} -#[allow(missing_debug_implementations)] -#[derive(Clone)] -pub struct GracefulWatcher(Watch); -impl Watcher for GracefulWatcher -where - I: AsyncRead + AsyncWrite + Unpin + Send + 'static, - S: HttpService, - E: ConnStreamExec, - S::ResBody: 'static, - ::Error: Into>, -{ - type Future = (); - fn watch(&self) -> Self::Future { - loop {} - } -} diff --git a/warp/src/server.rs b/warp/src/server.rs index cefbc11..89e91f5 100644 --- a/warp/src/server.rs +++ b/warp/src/server.rs @@ -200,45 +200,10 @@ where /// // Spawn the server into a runtime /// tokio::task::spawn(server); /// - /// // Later, start the shutdown... - /// let _ = tx.send(()); - /// # } - /// ``` - pub fn bind_with_graceful_shutdown( - self, - addr: impl Into + 'static, - signal: impl Future + Send + 'static, - ) -> (SocketAddr, impl Future + 'static) { - let (addr, srv) = bind!(self, addr); - let fut = srv - .with_graceful_shutdown(signal) - .map(|result| { - if let Err(err) = result { - tracing::error!("server error: {}", err) - } - }); - (addr, fut) - } /// Create a server with graceful shutdown signal. /// /// When the signal completes, the server will start the graceful shutdown - /// process. - pub fn try_bind_with_graceful_shutdown( - self, - addr: impl Into + 'static, - signal: impl Future + Send + 'static, - ) -> Result<(SocketAddr, impl Future + 'static), crate::Error> { - let addr = addr.into(); - let (addr, srv) = try_bind!(self, & addr).map_err(crate::Error::new)?; - let srv = srv - .with_graceful_shutdown(signal) - .map(|result| { - if let Err(err) = result { - tracing::error!("server error: {}", err) - } - }); - Ok((addr, srv)) - } + /// Setup this `Server` with a specific stream of incoming connections. /// /// This can be used for Unix Domain Sockets, or TLS, etc. @@ -282,7 +247,6 @@ where ) .http1_pipeline_flush(pipeline) .serve(service) - .with_graceful_shutdown(signal) .await; if let Err(err) = srv { tracing::error!("server error: {}", err);