This commit is contained in:
nora 2023-03-07 15:47:19 +01:00
parent e1ebd97c91
commit c4644d42c3
2 changed files with 62 additions and 332 deletions

View file

@ -1,18 +1,17 @@
use std::error::Error as StdError; use self::new_svc::NewSvcTask;
#[cfg(feature = "tcp")]
use std::net::{SocketAddr, TcpListener as StdTcpListener};
use pin_project_lite::pin_project;
use tokio::io::{AsyncRead, AsyncWrite};
use super::accept::Accept; use super::accept::Accept;
use super::conn::Http as Http_;
#[cfg(all(feature = "tcp"))] #[cfg(all(feature = "tcp"))]
use super::tcp::AddrIncoming; use super::tcp::AddrIncoming;
use crate::body::{Body, HttpBody}; use crate::body::{Body, HttpBody};
use crate::common::exec::Exec; use crate::common::exec::Exec;
use crate::common::exec::{ConnStreamExec, NewSvcExec}; use crate::common::exec::{ConnStreamExec, NewSvcExec};
use crate::common::{task, Future, Pin, Poll, Unpin}; use crate::common::{task, Future, Pin, Poll, Unpin};
use super::conn::{Http as Http_, UpgradeableConnection};
use crate::service::{HttpService, MakeServiceRef}; use crate::service::{HttpService, MakeServiceRef};
use self::new_svc::NewSvcTask; use pin_project_lite::pin_project;
use std::error::Error as StdError;
#[cfg(feature = "tcp")]
use tokio::io::{AsyncRead, AsyncWrite};
pin_project! { pin_project! {
#[doc = #[doc =
" A listening HTTP server that accepts connections in both HTTP1 and HTTP2 by default."] " A listening HTTP server that accepts connections in both HTTP1 and HTTP2 by default."]
@ -21,9 +20,9 @@ pin_project! {
" handlers. It is built using the [`Builder`](Builder), and the future"] #[doc = " handlers. It is built using the [`Builder`](Builder), and the future"] #[doc =
" completes when the server has been shutdown. It should be run by an"] #[doc = " completes when the server has been shutdown. It should be run by an"] #[doc =
" `Executor`."] " `Executor`."]
pub struct Server < I, S, E = Exec > { pub struct Server < I, S, E = Exec > {
#[pin] incoming : I, #[pin] incoming : I,
make_service : S, make_service : S,
protocol : Http_ < E >, protocol : Http_ < E >,
@ -38,7 +37,6 @@ pub struct Builder<I, E = Exec> {
} }
#[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))] #[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))]
impl<I> Server<I, ()> { impl<I> Server<I, ()> {
pub fn builder(incoming: I) -> Builder<I> { pub fn builder(incoming: I) -> Builder<I> {
loop {} loop {}
} }
@ -49,72 +47,12 @@ impl<I> Server<I, ()> {
doc(cfg(all(feature = "tcp", any(feature = "http1", feature = "http2")))) doc(cfg(all(feature = "tcp", any(feature = "http1", feature = "http2"))))
)] )]
impl Server<AddrIncoming, ()> { impl Server<AddrIncoming, ()> {
pub fn bind() -> Builder<AddrIncoming> { pub fn bind() -> Builder<AddrIncoming> {
loop {} loop {}
} }
} }
#[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))]
impl<I, IO, IE, S, E, B> Server<I, S, E>
where
I: Accept<Conn = IO, Error = IE>,
IE: Into<Box<dyn StdError + Send + Sync>>,
IO: AsyncRead + AsyncWrite + Unpin + Send + 'static,
S: MakeServiceRef<IO, Body, ResBody = B>,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
B: HttpBody + 'static,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
E: ConnStreamExec<<S::Service as HttpService<Body>>::Future, B>,
{
fn poll_next_(
self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
) -> Poll<Option<crate::Result<Connecting<IO, S::Future, E>>>> {
loop {}
}
}
#[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))] #[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))]
impl<I, IO, IE, S, B, E> Future for Server<I, S, E> impl<I, IO, IE, S, B, E> Future for Server<I, S, E>
where where
@ -131,12 +69,8 @@ where
type Output = crate::Result<()>; type Output = crate::Result<()>;
fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
loop { loop {
if let Some(connecting) = ready!(self.as_mut().poll_next_(cx) ?) { let fut = NewSvcTask::new(NoopWatcher);
let fut = NewSvcTask::new(connecting, NoopWatcher); self.as_mut().project().protocol.exec.execute_new_svc(fut);
self.as_mut().project().protocol.exec.execute_new_svc(fut);
} else {
loop {}
}
} }
} }
} }
@ -148,7 +82,6 @@ impl<I, E> Builder<I, E> {
loop {} loop {}
} }
pub fn serve<S, B>(self, _: S) -> Server<I, S> pub fn serve<S, B>(self, _: S) -> Server<I, S>
where where
I: Accept, I: Accept,
@ -174,38 +107,38 @@ where
} }
} }
pub(crate) mod new_svc { pub(crate) mod new_svc {
use std::error::Error as StdError;
use tokio::io::{AsyncRead, AsyncWrite};
use super::{Connecting, Watcher}; use super::{Connecting, Watcher};
use crate::body::{Body, HttpBody}; use crate::body::{Body, HttpBody};
use crate::common::exec::ConnStreamExec; use crate::common::exec::ConnStreamExec;
use crate::common::{task, Future, Pin, Poll, Unpin}; use crate::common::{task, Future, Pin, Poll, Unpin};
use crate::service::HttpService; use crate::service::HttpService;
use pin_project_lite::pin_project; use pin_project_lite::pin_project;
use std::error::Error as StdError;
use tokio::io::{AsyncRead, AsyncWrite};
pin_project! { pin_project! {
#[allow(missing_debug_implementations)] #[allow(missing_debug_implementations)]
pub struct NewSvcTask < I, N, S, E, W : Watcher < I, S, E >> { pub struct NewSvcTask < I, N, S, E, W : Watcher < I, S, E >> {
#[pin] #[pin]
state : State <I, S, E, W >, state : State <I, S, E, W >,
a: (N) a: (N)
} }
} }
pin_project! { pin_project! {
#[project = StateProj] #[project = StateProj]
pub (super) enum State <I, S, E, W : Watcher < I, S, E >> { pub (super) enum State <I, S, E, W : Watcher < I, S, E >> {
Connecting { a: (I, S, W, E), }, Connecting { a: (I, S, W, E), },
Connected { #[pin] future : W::Future, }, Connected { #[pin] future : W::Future, },
} }
} }
impl<I, N, S: HttpService<Body>, E, W: Watcher<I, S, E>> NewSvcTask<I, N, S, E, W> { impl<I, N, S: HttpService<Body>, E, W: Watcher<I, S, E>> NewSvcTask<I, N, S, E, W> {
pub(super) fn new(connecting: Connecting<I, N, E>, watcher: W) -> Self { pub(super) fn new(watcher: W) -> Self {
loop {} loop {}
} }
} }
@ -230,7 +163,13 @@ pin_project! {
#[doc = " A future building a new `Service` to a `Connection`."] #[doc = ""] #[doc = #[doc = " A future building a new `Service` to a `Connection`."] #[doc = ""] #[doc =
" Wraps the future returned from `MakeService` into one that returns"] #[doc = " Wraps the future returned from `MakeService` into one that returns"] #[doc =
" a `Connection`."] #[must_use = "futures do nothing unless polled"] #[derive(Debug)] " a `Connection`."] #[must_use = "futures do nothing unless polled"] #[derive(Debug)]
#[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))] pub struct #[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))]
Connecting < I, F, E = Exec > { #[pin] future : F, io : Option < I >, protocol :
Http_ < E >, } pub struct Connecting < F, E = Exec > {
#[pin] future : F,
protocol :
Http_ < E >,
}
} }

View file

@ -1,21 +1,21 @@
use crate::filter::Filter;
use crate::reject::IsReject;
use crate::reply::Reply;
#[cfg(feature = "tls")] #[cfg(feature = "tls")]
use crate::tls::TlsConfigBuilder; use crate::tls::TlsConfigBuilder;
use crate::transport::Transport;
use futures_util::{future, FutureExt, TryFuture, TryStream, TryStreamExt};
use hyper::server::conn::AddrIncoming;
use hyper::service::{make_service_fn, service_fn};
use hyper::Server as HyperServer;
use std::convert::Infallible; use std::convert::Infallible;
use std::error::Error as StdError; use std::error::Error as StdError;
use std::future::Future; use std::future::Future;
use std::net::SocketAddr; use std::net::SocketAddr;
#[cfg(feature = "tls")] #[cfg(feature = "tls")]
use std::path::Path; use std::path::Path;
use futures_util::{future, FutureExt, TryFuture, TryStream, TryStreamExt};
use hyper::server::conn::AddrIncoming;
use hyper::service::{make_service_fn, service_fn};
use hyper::Server as HyperServer;
use tokio::io::{AsyncRead, AsyncWrite}; use tokio::io::{AsyncRead, AsyncWrite};
use tracing::Instrument; use tracing::Instrument;
use crate::filter::Filter;
use crate::reject::IsReject;
use crate::reply::Reply;
use crate::transport::Transport;
pub fn serve<F>(filter: F) -> Server<F> pub fn serve<F>(filter: F) -> Server<F>
where where
@ -32,74 +32,22 @@ pub struct Server<F> {
filter: F, filter: F,
} }
#[cfg(feature = "tls")] #[cfg(feature = "tls")]
pub struct TlsServer<F> { pub struct TlsServer<F> {
server: Server<F>, server: Server<F>,
tls: TlsConfigBuilder, tls: TlsConfigBuilder,
} }
macro_rules! into_service {
($into:expr) => {
{ let inner = crate ::service($into); make_service_fn(move | transport | { let
inner = inner.clone(); let remote_addr = Transport::remote_addr(transport);
future::ok::< _, Infallible > (service_fn(move | req | { inner
.call_with_addr(req, remote_addr) })) }) }
};
}
macro_rules! addr_incoming {
($addr:expr) => {
{ let mut incoming = AddrIncoming::bind($addr) ?; incoming.set_nodelay(true); let
addr = incoming.local_addr(); (addr, incoming) }
};
}
macro_rules! bind_inner {
($this:ident, $addr:expr) => {
{ let service = into_service!($this .filter); let (addr, incoming) =
addr_incoming!($addr); let srv = HyperServer::builder(incoming)
.http1_pipeline_flush($this .pipeline).serve(service); Ok::< _, hyper::Error >
((addr, srv)) }
};
(tls : $this:ident, $addr:expr) => {
{ let service = into_service!($this .server.filter); let (addr, incoming) =
addr_incoming!($addr); let tls = $this .tls.build() ?; let srv =
HyperServer::builder(crate ::tls::TlsAcceptor::new(tls, incoming))
.http1_pipeline_flush($this .server.pipeline).serve(service); Ok::< _, Box < dyn
std::error::Error + Send + Sync >> ((addr, srv)) }
};
}
macro_rules! bind {
($this:ident, $addr:expr) => {
{ let addr = $addr .into(); (| addr | bind_inner!($this, addr)) (& addr)
.unwrap_or_else(| e | { panic!("error binding to {}: {}", addr, e); }) }
};
(tls : $this:ident, $addr:expr) => {
{ let addr = $addr .into(); (| addr | bind_inner!(tls : $this, addr)) (& addr)
.unwrap_or_else(| e | { panic!("error binding to {}: {}", addr, e); }) }
};
}
macro_rules! try_bind {
($this:ident, $addr:expr) => {
{ (| addr | bind_inner!($this, addr)) ($addr) }
};
(tls : $this:ident, $addr:expr) => {
{ (| addr | bind_inner!(tls : $this, addr)) ($addr) }
};
}
impl<F> Server<F> impl<F> Server<F>
where where
F: Filter + Clone + Send + Sync + 'static, F: Filter + Clone + Send + Sync + 'static,
<F::Future as TryFuture>::Ok: Reply, <F::Future as TryFuture>::Ok: Reply,
<F::Future as TryFuture>::Error: IsReject, <F::Future as TryFuture>::Error: IsReject,
{ {
pub async fn run(self, addr: impl Into<SocketAddr>) { pub async fn run(self, addr: impl Into<SocketAddr>) {
loop {} loop {}
} }
pub async fn run_incoming<I>(self, incoming: I) pub async fn run_incoming<I>(self, incoming: I)
where where
I: TryStream + Send, I: TryStream + Send,
@ -108,107 +56,22 @@ where
{ {
loop {} loop {}
} }
pub fn bind(self, addr: impl Into<SocketAddr> + 'static) -> impl Future<Output = ()> + 'static {
async {}
pub fn bind(
self,
addr: impl Into<SocketAddr> + 'static,
) -> impl Future<Output = ()> + 'static {
let (_, fut) = self.bind_ephemeral(addr);
fut
} }
pub async fn try_bind(self, addr: impl Into<SocketAddr>) { pub async fn try_bind(self, addr: impl Into<SocketAddr>) {
loop {} loop {}
} }
pub fn bind_ephemeral( pub fn bind_ephemeral(
self, self,
addr: impl Into<SocketAddr>, addr: impl Into<SocketAddr>,
) -> (SocketAddr, impl Future<Output = ()> + 'static) { ) -> (SocketAddr, impl Future<Output = ()> + 'static) {
let (addr, srv) = bind!(self, addr); (addr.into(), async {})
let srv = srv
.map(|result| {
if let Err(err) = result {
tracing::error!("server error: {}", err)
}
});
(addr, srv)
} }
pub fn try_bind_ephemeral(
self,
addr: impl Into<SocketAddr>,
) -> Result<(SocketAddr, impl Future<Output = ()> + 'static), crate::Error> {
let addr = addr.into();
let (addr, srv) = try_bind!(self, & addr).map_err(crate::Error::new)?;
let srv = srv
.map(|result| {
if let Err(err) = result {
tracing::error!("server error: {}", err)
}
});
Ok((addr, srv))
}
pub fn serve_incoming<I>(self, incoming: I) -> impl Future<Output = ()> pub fn serve_incoming<I>(self, incoming: I) -> impl Future<Output = ()>
where where
I: TryStream + Send, I: TryStream + Send,
@ -219,15 +82,7 @@ where
self.serve_incoming2(incoming) self.serve_incoming2(incoming)
.instrument(tracing::info_span!("Server::serve_incoming")) .instrument(tracing::info_span!("Server::serve_incoming"))
} }
pub fn serve_incoming_with_graceful_shutdown<I>( pub fn serve_incoming_with_graceful_shutdown<I>(
self, self,
incoming: I, incoming: I,
@ -238,23 +93,7 @@ where
I::Ok: AsyncRead + AsyncWrite + Send + 'static + Unpin, I::Ok: AsyncRead + AsyncWrite + Send + 'static + Unpin,
I::Error: Into<Box<dyn StdError + Send + Sync>>, I::Error: Into<Box<dyn StdError + Send + Sync>>,
{ {
let incoming = incoming.map_ok(crate::transport::LiftIo); async move { loop {} }
let service = into_service!(self.filter);
let pipeline = self.pipeline;
async move {
let srv = HyperServer::builder(
hyper::server::accept::from_stream(incoming.into_stream()),
)
.http1_pipeline_flush(pipeline)
.serve(service)
.await;
if let Err(err) = srv {
tracing::error!("server error: {}", err);
}
}
.instrument(
tracing::info_span!("Server::serve_incoming_with_graceful_shutdown"),
)
} }
async fn serve_incoming2<I>(self, incoming: I) async fn serve_incoming2<I>(self, incoming: I)
where where
@ -268,9 +107,7 @@ where
pub fn unstable_pipeline(mut self) -> Self { pub fn unstable_pipeline(mut self) -> Self {
loop {} loop {}
} }
#[cfg(feature = "tls")] #[cfg(feature = "tls")]
pub fn tls(self) -> TlsServer<F> { pub fn tls(self) -> TlsServer<F> {
loop {} loop {}
@ -283,69 +120,38 @@ where
<F::Future as TryFuture>::Ok: Reply, <F::Future as TryFuture>::Ok: Reply,
<F::Future as TryFuture>::Error: IsReject, <F::Future as TryFuture>::Error: IsReject,
{ {
pub fn key_path(self, path: impl AsRef<Path>) -> Self { pub fn key_path(self, path: impl AsRef<Path>) -> Self {
loop {} loop {}
} }
pub fn cert_path(self, path: impl AsRef<Path>) -> Self { pub fn cert_path(self, path: impl AsRef<Path>) -> Self {
loop {} loop {}
} }
pub fn client_auth_optional_path(self, path: impl AsRef<Path>) -> Self { pub fn client_auth_optional_path(self, path: impl AsRef<Path>) -> Self {
loop {} loop {}
} }
pub fn client_auth_required_path(self, path: impl AsRef<Path>) -> Self { pub fn client_auth_required_path(self, path: impl AsRef<Path>) -> Self {
loop {} loop {}
} }
pub fn key(self, key: impl AsRef<[u8]>) -> Self { pub fn key(self, key: impl AsRef<[u8]>) -> Self {
loop {} loop {}
} }
pub fn cert(self, cert: impl AsRef<[u8]>) -> Self { pub fn cert(self, cert: impl AsRef<[u8]>) -> Self {
loop {} loop {}
} }
pub fn client_auth_optional(self, trust_anchor: impl AsRef<[u8]>) -> Self { pub fn client_auth_optional(self, trust_anchor: impl AsRef<[u8]>) -> Self {
loop {} loop {}
} }
pub fn client_auth_required(self, trust_anchor: impl AsRef<[u8]>) -> Self { pub fn client_auth_required(self, trust_anchor: impl AsRef<[u8]>) -> Self {
loop {} loop {}
} }
pub fn ocsp_resp(self, resp: impl AsRef<[u8]>) -> Self { pub fn ocsp_resp(self, resp: impl AsRef<[u8]>) -> Self {
loop {} loop {}
} }
@ -355,37 +161,22 @@ where
{ {
loop {} loop {}
} }
pub async fn run(self, addr: impl Into<SocketAddr>) { pub async fn run(self, addr: impl Into<SocketAddr>) {
loop {} loop {}
} }
pub async fn bind(self, addr: impl Into<SocketAddr>) { pub async fn bind(self, addr: impl Into<SocketAddr>) {
loop {} loop {}
} }
pub fn bind_ephemeral( pub fn bind_ephemeral(
self, self,
addr: impl Into<SocketAddr>, addr: impl Into<SocketAddr>,
) -> (SocketAddr, impl Future<Output = ()> + 'static) { ) -> (SocketAddr, impl Future<Output = ()> + 'static) {
loop {} loop {}
} }
pub fn bind_with_graceful_shutdown( pub fn bind_with_graceful_shutdown(
self, self,
addr: impl Into<SocketAddr> + 'static, addr: impl Into<SocketAddr> + 'static,