This commit is contained in:
nora 2023-03-07 15:18:45 +01:00
parent 62c84026f6
commit 0b89e245d9
9 changed files with 6 additions and 586 deletions

View file

@ -1,88 +0,0 @@
#![feature(test)]
#![deny(warnings)]
extern crate test;
use bytes::Buf;
use futures_util::stream;
use futures_util::StreamExt;
use hyper::body::Body;
macro_rules! bench_stream {
($bencher:ident, bytes: $bytes:expr, count: $count:expr, $total_ident:ident, $body_pat:pat, $block:expr) => {{
let rt = tokio::runtime::Builder::new_current_thread()
.build()
.expect("rt build");
let $total_ident: usize = $bytes * $count;
$bencher.bytes = $total_ident as u64;
let __s: &'static [&'static [u8]] = &[&[b'x'; $bytes] as &[u8]; $count] as _;
$bencher.iter(|| {
rt.block_on(async {
let $body_pat = Body::wrap_stream(
stream::iter(__s.iter()).map(|&s| Ok::<_, std::convert::Infallible>(s)),
);
$block;
});
});
}};
}
macro_rules! benches {
($($name:ident, $bytes:expr, $count:expr;)+) => (
mod aggregate {
use super::*;
$(
#[bench]
fn $name(b: &mut test::Bencher) {
bench_stream!(b, bytes: $bytes, count: $count, total, body, {
let buf = hyper::body::aggregate(body).await.unwrap();
assert_eq!(buf.remaining(), total);
});
}
)+
}
mod manual_into_vec {
use super::*;
$(
#[bench]
fn $name(b: &mut test::Bencher) {
bench_stream!(b, bytes: $bytes, count: $count, total, mut body, {
let mut vec = Vec::new();
while let Some(chunk) = body.next().await {
vec.extend_from_slice(&chunk.unwrap());
}
assert_eq!(vec.len(), total);
});
}
)+
}
mod to_bytes {
use super::*;
$(
#[bench]
fn $name(b: &mut test::Bencher) {
bench_stream!(b, bytes: $bytes, count: $count, total, body, {
let bytes = hyper::body::to_bytes(body).await.unwrap();
assert_eq!(bytes.len(), total);
});
}
)+
}
)
}
// ===== Actual Benchmarks =====
benches! {
bytes_1_000_count_2, 1_000, 2;
bytes_1_000_count_10, 1_000, 10;
bytes_10_000_count_1, 10_000, 1;
bytes_10_000_count_10, 10_000, 10;
}

View file

@ -1,12 +0,0 @@
#![feature(test)]
#![deny(warnings)]
extern crate test;
use http::Uri;
use hyper::client::connect::HttpConnector;
use hyper::service::Service;
use std::net::SocketAddr;
use tokio::net::TcpListener;
#[bench]
fn http_connector(b: &mut test::Bencher) {
loop {}
}

View file

@ -1,133 +0,0 @@
#![feature(test)]
#![deny(warnings)]
extern crate test;
use std::net::SocketAddr;
use futures_util::future::join_all;
use hyper::client::HttpConnector;
use hyper::{body::HttpBody as _, Body, Method, Request, Response, Server};
#[bench]
fn http1_consecutive_x1_empty(b: &mut test::Bencher) {
loop {}
}
#[bench]
fn http1_consecutive_x1_req_10b(b: &mut test::Bencher) {
loop {}
}
#[bench]
fn http1_consecutive_x1_both_100kb(b: &mut test::Bencher) {
loop {}
}
#[bench]
fn http1_consecutive_x1_both_10mb(b: &mut test::Bencher) {
loop {}
}
#[bench]
fn http1_parallel_x10_empty(b: &mut test::Bencher) {
loop {}
}
#[bench]
fn http1_parallel_x10_req_10mb(b: &mut test::Bencher) {
loop {}
}
#[bench]
fn http1_parallel_x10_req_10kb_100_chunks(b: &mut test::Bencher) {
loop {}
}
#[bench]
fn http1_parallel_x10_res_1mb(b: &mut test::Bencher) {
loop {}
}
#[bench]
fn http1_parallel_x10_res_10mb(b: &mut test::Bencher) {
loop {}
}
const HTTP2_MAX_WINDOW: u32 = std::u32::MAX >> 1;
#[bench]
fn http2_consecutive_x1_empty(b: &mut test::Bencher) {
loop {}
}
#[bench]
fn http2_consecutive_x1_req_10b(b: &mut test::Bencher) {
loop {}
}
#[bench]
fn http2_consecutive_x1_req_100kb(b: &mut test::Bencher) {
loop {}
}
#[bench]
fn http2_parallel_x10_empty(b: &mut test::Bencher) {
loop {}
}
#[bench]
fn http2_parallel_x10_req_10mb(b: &mut test::Bencher) {
loop {}
}
#[bench]
fn http2_parallel_x10_req_10kb_100_chunks(b: &mut test::Bencher) {
loop {}
}
#[bench]
fn http2_parallel_x10_req_10kb_100_chunks_adaptive_window(b: &mut test::Bencher) {
loop {}
}
#[bench]
fn http2_parallel_x10_req_10kb_100_chunks_max_window(b: &mut test::Bencher) {
loop {}
}
#[bench]
fn http2_parallel_x10_res_1mb(b: &mut test::Bencher) {
loop {}
}
#[bench]
fn http2_parallel_x10_res_10mb(b: &mut test::Bencher) {
loop {}
}
struct Opts {
http2: bool,
http2_stream_window: Option<u32>,
http2_conn_window: Option<u32>,
http2_adaptive_window: bool,
parallel_cnt: u32,
request_method: Method,
request_body: Option<&'static [u8]>,
request_chunks: usize,
response_body: &'static [u8],
}
fn opts() -> Opts {
loop {}
}
impl Opts {
fn http2(mut self) -> Self {
loop {}
}
fn http2_stream_window(mut self, sz: impl Into<Option<u32>>) -> Self {
loop {}
}
fn http2_conn_window(mut self, sz: impl Into<Option<u32>>) -> Self {
loop {}
}
fn http2_adaptive_window(mut self) -> Self {
loop {}
}
fn method(mut self, m: Method) -> Self {
loop {}
}
fn request_body(mut self, body: &'static [u8]) -> Self {
loop {}
}
fn request_chunks(mut self, chunk: &'static [u8], cnt: usize) -> Self {
loop {}
}
fn response_body(mut self, body: &'static [u8]) -> Self {
loop {}
}
fn parallel(mut self, cnt: u32) -> Self {
loop {}
}
fn bench(self, b: &mut test::Bencher) {
loop {}
}
}
fn spawn_server(rt: &tokio::runtime::Runtime, opts: &Opts) -> SocketAddr {
loop {}
}

View file

@ -1,15 +0,0 @@
#![feature(test)]
#![deny(warnings)]
extern crate test;
use std::io::{Read, Write};
use std::net::TcpStream;
use std::sync::mpsc;
use std::time::Duration;
use tokio::sync::oneshot;
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Response, Server};
const PIPELINED_REQUESTS: usize = 16;
#[bench]
fn hello_world_16(b: &mut test::Bencher) {
loop {}
}

View file

@ -1,71 +0,0 @@
#![feature(test)]
#![deny(warnings)]
extern crate test;
use std::io::{Read, Write};
use std::net::{TcpListener, TcpStream};
use std::sync::mpsc;
use std::time::Duration;
use futures_util::{stream, StreamExt};
use tokio::sync::oneshot;
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Response, Server};
macro_rules! bench_server {
($b:ident, $header:expr, $body:expr) => {
{ let _ = pretty_env_logger::try_init(); let (_until_tx, until_rx) =
oneshot::channel::< () > (); let addr = { let (addr_tx, addr_rx) =
mpsc::channel(); std::thread::spawn(move || { let addr = "127.0.0.1:0".parse()
.unwrap(); let make_svc = make_service_fn(| _ | async { Ok::< _, hyper::Error >
(service_fn(| _ | async { Ok::< _, hyper::Error > (Response::builder()
.header($header .0, $header .1).header("content-type", "text/plain").body($body
()).unwrap(),) })) }); let rt = tokio::runtime::Builder::new_current_thread()
.enable_all().build().expect("rt build"); let srv = rt.block_on(async move {
Server::bind(& addr).serve(make_svc) }); addr_tx.send(srv.local_addr()).unwrap();
let graceful = srv.with_graceful_shutdown(async { until_rx.await.ok(); }); rt
.block_on(async move { if let Err(e) = graceful.await {
panic!("server error: {}", e); } }); }); addr_rx.recv().unwrap() }; let
total_bytes = { let mut tcp = TcpStream::connect(addr).unwrap(); tcp
.write_all(b"GET / HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n")
.unwrap(); let mut buf = Vec::new(); tcp.read_to_end(& mut buf).unwrap() }; let
mut tcp = TcpStream::connect(addr).unwrap(); tcp
.set_read_timeout(Some(Duration::from_secs(3))).unwrap(); let mut buf = [0u8;
8192]; $b .bytes = 35 + total_bytes as u64; $b .iter(|| { tcp
.write_all(b"GET / HTTP/1.1\r\nHost: localhost\r\n\r\n").unwrap(); let mut sum =
0; while sum < total_bytes { sum += tcp.read(& mut buf).unwrap(); }
assert_eq!(sum, total_bytes); }); }
};
}
fn body(b: &'static [u8]) -> hyper::Body {
loop {}
}
#[bench]
fn throughput_fixedsize_small_payload(b: &mut test::Bencher) {
loop {}
}
#[bench]
fn throughput_fixedsize_large_payload(b: &mut test::Bencher) {
loop {}
}
#[bench]
fn throughput_fixedsize_many_chunks(b: &mut test::Bencher) {
loop {}
}
#[bench]
fn throughput_chunked_small_payload(b: &mut test::Bencher) {
loop {}
}
#[bench]
fn throughput_chunked_large_payload(b: &mut test::Bencher) {
loop {}
}
#[bench]
fn throughput_chunked_many_chunks(b: &mut test::Bencher) {
loop {}
}
#[bench]
fn raw_tcp_throughput_small_payload(b: &mut test::Bencher) {
loop {}
}
#[bench]
fn raw_tcp_throughput_large_payload(b: &mut test::Bencher) {
loop {}
}

View file

@ -1,153 +1,3 @@
//! HTTP Server
//!
//! A `Server` is created to listen on a port, parse HTTP requests, and hand
//! them off to a `Service`.
//!
//! There are two levels of APIs provide for constructing HTTP servers:
//!
//! - The higher-level [`Server`](Server) type.
//! - The lower-level [`conn`](conn) module.
//!
//! # Server
//!
//! The [`Server`](Server) is main way to start listening for HTTP requests.
//! It wraps a listener with a [`MakeService`](crate::service), and then should
//! be executed to start serving requests.
//!
//! [`Server`](Server) accepts connections in both HTTP1 and HTTP2 by default.
//!
//! ## Examples
//!
//! ```no_run
//! use std::convert::Infallible;
//! use std::net::SocketAddr;
//! use hyper::{Body, Request, Response, Server};
//! use hyper::service::{make_service_fn, service_fn};
//!
//! async fn handle(_req: Request<Body>) -> Result<Response<Body>, Infallible> {
//! Ok(Response::new(Body::from("Hello World")))
//! }
//!
//! # #[cfg(feature = "runtime")]
//! #[tokio::main]
//! async fn main() {
//! // Construct our SocketAddr to listen on...
//! let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
//!
//! // And a MakeService to handle each connection...
//! let make_service = make_service_fn(|_conn| async {
//! Ok::<_, Infallible>(service_fn(handle))
//! });
//!
//! // Then bind and serve...
//! let server = Server::bind(&addr).serve(make_service);
//!
//! // And run forever...
//! if let Err(e) = server.await {
//! eprintln!("server error: {}", e);
//! }
//! }
//! # #[cfg(not(feature = "runtime"))]
//! # fn main() {}
//! ```
//!
//! If you don't need the connection and your service implements `Clone` you can use
//! [`tower::make::Shared`] instead of `make_service_fn` which is a bit simpler:
//!
//! ```no_run
//! # use std::convert::Infallible;
//! # use std::net::SocketAddr;
//! # use hyper::{Body, Request, Response, Server};
//! # use hyper::service::{make_service_fn, service_fn};
//! # use tower::make::Shared;
//! # async fn handle(_req: Request<Body>) -> Result<Response<Body>, Infallible> {
//! # Ok(Response::new(Body::from("Hello World")))
//! # }
//! # #[cfg(feature = "runtime")]
//! #[tokio::main]
//! async fn main() {
//! // Construct our SocketAddr to listen on...
//! let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
//!
//! // Shared is a MakeService that produces services by cloning an inner service...
//! let make_service = Shared::new(service_fn(handle));
//!
//! // Then bind and serve...
//! let server = Server::bind(&addr).serve(make_service);
//!
//! // And run forever...
//! if let Err(e) = server.await {
//! eprintln!("server error: {}", e);
//! }
//! }
//! # #[cfg(not(feature = "runtime"))]
//! # fn main() {}
//! ```
//!
//! Passing data to your request handler can be done like so:
//!
//! ```no_run
//! use std::convert::Infallible;
//! use std::net::SocketAddr;
//! use hyper::{Body, Request, Response, Server};
//! use hyper::service::{make_service_fn, service_fn};
//! # #[cfg(feature = "runtime")]
//! use hyper::server::conn::AddrStream;
//!
//! #[derive(Clone)]
//! struct AppContext {
//! // Whatever data your application needs can go here
//! }
//!
//! async fn handle(
//! context: AppContext,
//! addr: SocketAddr,
//! req: Request<Body>
//! ) -> Result<Response<Body>, Infallible> {
//! Ok(Response::new(Body::from("Hello World")))
//! }
//!
//! # #[cfg(feature = "runtime")]
//! #[tokio::main]
//! async fn main() {
//! let context = AppContext {
//! // ...
//! };
//!
//! // A `MakeService` that produces a `Service` to handle each connection.
//! let make_service = make_service_fn(move |conn: &AddrStream| {
//! // We have to clone the context to share it with each invocation of
//! // `make_service`. If your data doesn't implement `Clone` consider using
//! // an `std::sync::Arc`.
//! let context = context.clone();
//!
//! // You can grab the address of the incoming connection like so.
//! let addr = conn.remote_addr();
//!
//! // Create a `Service` for responding to the request.
//! let service = service_fn(move |req| {
//! handle(context.clone(), addr, req)
//! });
//!
//! // Return the service to hyper.
//! async move { Ok::<_, Infallible>(service) }
//! });
//!
//! // Run the server like above...
//! let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
//!
//! let server = Server::bind(&addr).serve(make_service);
//!
//! if let Err(e) = server.await {
//! eprintln!("server error: {}", e);
//! }
//! }
//! # #[cfg(not(feature = "runtime"))]
//! # fn main() {}
//! ```
//!
//! [`tower::make::Shared`]: https://docs.rs/tower/latest/tower/make/struct.Shared.html
pub mod accept;
pub mod conn;
#[cfg(feature = "tcp")]
@ -161,7 +11,6 @@ cfg_feature! {
pub(crate) mod server;
pub use self::server::Builder;
mod shutdown;
}
cfg_feature! {

View file

@ -11,7 +11,6 @@ use crate::common::exec::Exec;
use crate::common::exec::{ConnStreamExec, NewSvcExec};
use crate::common::{task, Future, Pin, Poll, Unpin};
use super::conn::{Http as Http_, UpgradeableConnection};
use super::shutdown::{Graceful, GracefulWatcher};
use crate::service::{HttpService, MakeServiceRef};
use self::new_svc::NewSvcTask;
pin_project! {
@ -108,13 +107,7 @@ where
/// let _ = tx.send(());
/// # }
/// ```
pub fn with_graceful_shutdown<F>(self, signal: F) -> Graceful<I, S, F, E>
where
F: Future<Output = ()>,
E: NewSvcExec<IO, S::Future, S::Service, E, GracefulWatcher>,
{
loop {}
}
fn poll_next_(
self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
@ -154,15 +147,7 @@ impl<I, E> Builder<I, E> {
pub fn http1_pipeline_flush(mut self, val: bool) -> Self {
loop {}
}
/// Set a timeout for reading client request headers. If a client does not
/// transmit the entire header within this time, the connection is closed.
///
/// Default is None.
#[cfg(all(feature = "http1", feature = "runtime"))]
#[cfg_attr(docsrs, doc(cfg(all(feature = "http1", feature = "runtime"))))]
pub(crate) fn http1_header_read_timeout(mut self, read_timeout: Duration) -> Self {
loop {}
}
///
pub fn serve<S, B>(self, _: S) -> Server<I, S>
where
@ -181,13 +166,9 @@ pub trait Watcher<I, S, E>: Clone {
pub(crate) struct NoopWatcher;
impl<I, S, E> Watcher<I, S, E> for NoopWatcher
where
I: AsyncRead + AsyncWrite + Unpin + Send + 'static,
S: HttpService<Body>,
E: ConnStreamExec<S::Future, S::ResBody>,
S::ResBody: 'static,
<S::ResBody as HttpBody>::Error: Into<Box<dyn StdError + Send + Sync>>,
{
type Future = UpgradeableConnection<I, S, E>;
type Future = ();
fn watch(&self) -> Self::Future {
loop {}
}
@ -204,7 +185,7 @@ pub(crate) mod new_svc {
pin_project! {
#[allow(missing_debug_implementations)]
pub struct NewSvcTask < I, N, S : HttpService < Body >, E, W : Watcher < I, S, E >> {
pub struct NewSvcTask < I, N, S, E, W : Watcher < I, S, E >> {
#[pin]
state : State <I, S, E, W >,
@ -216,7 +197,7 @@ pub(crate) mod new_svc {
pin_project! {
#[project = StateProj]
pub (super) enum State <I, S : HttpService < Body >, E, W : Watcher < I, S, E >> {
pub (super) enum State <I, S, E, W : Watcher < I, S, E >> {
Connecting { a: (I, S, W, E), },

View file

@ -1,55 +0,0 @@
use std::error::Error as StdError;
use pin_project_lite::pin_project;
use tokio::io::{AsyncRead, AsyncWrite};
use super::accept::Accept;
use super::conn::UpgradeableConnection;
use super::server::{Server, Watcher};
use crate::body::{Body, HttpBody};
use crate::common::drain::{Draining, Signal, Watch, Watching};
use crate::common::exec::{ConnStreamExec, NewSvcExec};
use crate::common::{task, Future, Pin, Poll, Unpin};
use crate::service::{HttpService, MakeServiceRef};
pin_project! {
#[allow(missing_debug_implementations)] pub struct Graceful < I, S, F, E > { #[pin]
state : State < I, S, F, E >, }
}
pin_project! {
#[project = StateProj] pub (super) enum State < I, S, F, E > { Running { drain :
Option < (Signal, Watch) >, #[pin] server : Server < I, S, E >, #[pin] signal : F, },
Draining { draining : Draining }, }
}
impl<I, S, F, E> Graceful<I, S, F, E> {}
impl<I, IO, IE, S, B, F, E> Future for Graceful<I, S, F, E>
where
I: Accept<Conn = IO, Error = IE>,
IE: Into<Box<dyn StdError + Send + Sync>>,
IO: AsyncRead + AsyncWrite + Unpin + Send + 'static,
S: MakeServiceRef<IO, Body, ResBody = B>,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
B: HttpBody + 'static,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
F: Future<Output = ()>,
E: ConnStreamExec<<S::Service as HttpService<Body>>::Future, B>,
E: NewSvcExec<IO, S::Future, S::Service, E, GracefulWatcher>,
{
type Output = crate::Result<()>;
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
loop {}
}
}
#[allow(missing_debug_implementations)]
#[derive(Clone)]
pub struct GracefulWatcher(Watch);
impl<I, S, E> Watcher<I, S, E> for GracefulWatcher
where
I: AsyncRead + AsyncWrite + Unpin + Send + 'static,
S: HttpService<Body>,
E: ConnStreamExec<S::Future, S::ResBody>,
S::ResBody: 'static,
<S::ResBody as HttpBody>::Error: Into<Box<dyn StdError + Send + Sync>>,
{
type Future = ();
fn watch(&self) -> Self::Future {
loop {}
}
}