This commit is contained in:
nora 2023-03-07 16:31:04 +01:00
parent a434a3de71
commit 270f547952
5 changed files with 69 additions and 582 deletions

View file

@ -5,25 +5,20 @@ use std::time::Duration;
use futures_util::future::{self, Either, FutureExt as _, TryFutureExt as _};
use http::uri::{Port, Scheme};
use http::{Request, Response, Uri, Version};
use tracing::{debug, trace};
use super::conn;
use super::connect::{self, sealed::Connect, Alpn, Connected, Connection};
use super::pool::{
self, Key as PoolKey, Pool, Poolable, Pooled, Reservation,
};
use super::pool::{self, Key as PoolKey, Pool, Poolable, Pooled, Reservation};
#[cfg(feature = "tcp")]
use super::HttpConnector;
use crate::body::{Body, HttpBody};
use crate::common::{
exec::BoxSendFuture, sync_wrapper::SyncWrapper, lazy as hyper_lazy, task, Future,
Lazy, Pin, Poll,
exec::BoxSendFuture, lazy as hyper_lazy, sync_wrapper::SyncWrapper, task, Future, Lazy, Pin,
Poll,
};
use crate::rt::Executor;
use http::uri::{Port, Scheme};
use http::{Request, Response, Uri, Version};
use tracing::{debug, trace};
#[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))]
pub struct Client<C, B = Body> {
@ -39,23 +34,12 @@ struct Config {
ver: Ver,
}
#[must_use = "futures do nothing unless polled"]
pub struct ResponseFuture {
inner: SyncWrapper<
Pin<Box<dyn Future<Output = crate::Result<Response<Body>>> + Send>>,
>,
inner: SyncWrapper<Pin<Box<dyn Future<Output = crate::Result<Response<Body>>> + Send>>>,
}
#[cfg(feature = "tcp")]
impl Client<HttpConnector, Body> {
#[cfg_attr(docsrs, doc(cfg(feature = "tcp")))]
#[inline]
pub(crate) fn new() -> Client<HttpConnector, Body> {
@ -68,200 +52,7 @@ impl Default for Client<HttpConnector, Body> {
loop {}
}
}
impl Client<(), Body> {
#[inline]
pub(crate) fn builder() -> Builder {
loop {}
}
}
impl<C, B> Client<C, B>
where
C: Connect + Clone + Send + Sync + 'static,
B: HttpBody + Send + 'static,
B::Data: Send,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
{
pub(crate) fn get(&self, uri: Uri) -> ResponseFuture
where
B: Default,
{
loop {}
}
pub(crate) fn request(&self, mut req: Request<B>) -> ResponseFuture {
loop {}
}
async fn retryably_send_request(
self,
mut req: Request<B>,
pool_key: PoolKey,
) -> crate::Result<Response<Body>> {
loop {}
}
async fn send_request(
&self,
mut req: Request<B>,
pool_key: PoolKey,
) -> Result<Response<Body>, ClientError<B>> {
loop {}
}
async fn connection_for(
&self,
pool_key: PoolKey,
) -> Result<Pooled<PoolClient<B>>, ClientConnectError> {
loop {}
}
fn connect_to(
&self,
pool_key: PoolKey,
) -> impl Lazy<Output = crate::Result<Pooled<PoolClient<B>>>> + Unpin {
let executor = self.conn_builder.exec.clone();
let pool = self.pool.clone();
#[cfg(not(feature = "http2"))]
let conn_builder = self.conn_builder.clone();
#[cfg(feature = "http2")]
let mut conn_builder = self.conn_builder.clone();
let ver = self.config.ver;
let is_ver_h2 = ver == Ver::Http2;
let connector = self.connector.clone();
let dst = domain_as_uri(pool_key.clone());
hyper_lazy(move || {
let connecting = match pool.connecting(&pool_key, ver) {
Some(lock) => lock,
None => {
let canceled = crate::Error::new_canceled()
.with("HTTP/2 connection in progress");
return Either::Right(future::err(canceled));
}
};
Either::Left(
connector
.connect(connect::sealed::Internal, dst)
.map_err(crate::Error::new_connect)
.and_then(move |io| {
let connected = io.connected();
let connecting = if connected.alpn == Alpn::H2 && !is_ver_h2 {
match connecting.alpn_h2(&pool) {
Some(lock) => {
trace!("ALPN negotiated h2, updating pool");
lock
}
None => {
let canceled = crate::Error::new_canceled()
.with("ALPN upgraded to HTTP/2");
return Either::Right(future::err(canceled));
}
}
} else {
connecting
};
#[cfg_attr(not(feature = "http2"), allow(unused))]
let is_h2 = is_ver_h2 || connected.alpn == Alpn::H2;
#[cfg(feature = "http2")]
{
conn_builder.http2_only(is_h2);
}
Either::Left(
Box::pin(async move {
let (tx, conn) = conn_builder.handshake(io).await?;
trace!(
"handshake complete, spawning background dispatcher task"
);
executor
.execute(
conn
.map_err(|e| debug!("client connection error: {}", e))
.map(|_| ()),
);
let tx = tx.when_ready().await?;
let tx = {
#[cfg(feature = "http2")]
{
if is_h2 {
PoolTx::Http2(tx.into_http2())
} else {
PoolTx::Http1(tx)
}
}
#[cfg(not(feature = "http2"))] PoolTx::Http1(tx)
};
Ok(
pool
.pooled(
connecting,
PoolClient {
conn_info: connected,
tx,
},
),
)
}),
)
}),
)
})
}
}
impl<C, B> tower_service::Service<Request<B>> for Client<C, B>
where
C: Connect + Clone + Send + Sync + 'static,
@ -272,10 +63,7 @@ where
type Response = Response<Body>;
type Error = crate::Error;
type Future = ResponseFuture;
fn poll_ready(
&mut self,
_: &mut task::Context<'_>,
) -> Poll<Result<(), Self::Error>> {
fn poll_ready(&mut self, _: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
loop {}
}
fn call(&mut self, req: Request<B>) -> Self::Future {
@ -292,10 +80,7 @@ where
type Response = Response<Body>;
type Error = crate::Error;
type Future = ResponseFuture;
fn poll_ready(
&mut self,
_: &mut task::Context<'_>,
) -> Poll<Result<(), Self::Error>> {
fn poll_ready(&mut self, _: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
loop {}
}
fn call(&mut self, req: Request<B>) -> Self::Future {
@ -396,12 +181,14 @@ where
#[allow(missing_debug_implementations)]
enum ClientError<B> {
Normal(crate::Error),
Canceled { connection_reused: bool, req: Request<B>, reason: crate::Error },
Canceled {
connection_reused: bool,
req: Request<B>,
reason: crate::Error,
},
}
impl<B> ClientError<B> {
fn map_with_reused(
conn_reused: bool,
) -> impl Fn((crate::Error, Option<Request<B>>)) -> Self {
fn map_with_reused(conn_reused: bool) -> impl Fn((crate::Error, Option<Request<B>>)) -> Self {
move |(err, orig_req)| {
if let Some(req) = orig_req {
ClientError::Canceled {
@ -450,24 +237,6 @@ fn is_schema_secure(uri: &Uri) -> bool {
loop {}
}
#[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))]
#[derive(Clone)]
pub struct Builder {
@ -496,11 +265,7 @@ impl Builder {
{
loop {}
}
pub(crate) fn pool_idle_timeout<D>(&mut self, val: D) -> &mut Self
where
D: Into<Option<Duration>>,
@ -512,198 +277,61 @@ impl Builder {
pub(crate) fn max_idle_per_host(&mut self, max_idle: usize) -> &mut Self {
loop {}
}
pub(crate) fn pool_max_idle_per_host(&mut self, max_idle: usize) -> &mut Self {
loop {}
}
pub(crate) fn http1_read_buf_exact_size(&mut self, sz: usize) -> &mut Self {
loop {}
}
#[cfg(feature = "http1")]
#[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
pub(crate) fn http1_max_buf_size(&mut self, max: usize) -> &mut Self {
loop {}
}
pub(crate) fn http1_allow_spaces_after_header_name_in_responses(
&mut self,
val: bool,
) -> &mut Self {
loop {}
}
pub(crate) fn http1_allow_obsolete_multiline_headers_in_responses(
&mut self,
val: bool,
) -> &mut Self {
loop {}
}
pub(crate) fn http1_ignore_invalid_headers_in_responses(
&mut self,
val: bool,
) -> &mut Builder {
pub(crate) fn http1_ignore_invalid_headers_in_responses(&mut self, val: bool) -> &mut Builder {
loop {}
}
pub(crate) fn http1_writev(&mut self, enabled: bool) -> &mut Builder {
loop {}
}
pub(crate) fn http1_title_case_headers(&mut self, val: bool) -> &mut Self {
loop {}
}
pub(crate) fn http1_preserve_header_case(&mut self, val: bool) -> &mut Self {
loop {}
}
pub(crate) fn http09_responses(&mut self, val: bool) -> &mut Self {
loop {}
}
#[cfg(feature = "http2")]
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
pub(crate) fn http2_only(&mut self, val: bool) -> &mut Self {
loop {}
}
#[cfg(feature = "http2")]
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
pub(crate) fn http2_initial_stream_window_size(
@ -712,11 +340,7 @@ impl Builder {
) -> &mut Self {
loop {}
}
#[cfg(feature = "http2")]
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
pub(crate) fn http2_initial_connection_window_size(
@ -725,39 +349,19 @@ impl Builder {
) -> &mut Self {
loop {}
}
#[cfg(feature = "http2")]
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
pub(crate) fn http2_adaptive_window(&mut self, enabled: bool) -> &mut Self {
loop {}
}
#[cfg(feature = "http2")]
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
pub(crate) fn http2_max_frame_size(
&mut self,
sz: impl Into<Option<u32>>,
) -> &mut Self {
pub(crate) fn http2_max_frame_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
loop {}
}
#[cfg(feature = "runtime")]
#[cfg(feature = "http2")]
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
@ -767,101 +371,50 @@ impl Builder {
) -> &mut Self {
loop {}
}
#[cfg(feature = "runtime")]
#[cfg(feature = "http2")]
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
pub(crate) fn http2_keep_alive_timeout(&mut self, timeout: Duration) -> &mut Self {
loop {}
}
#[cfg(feature = "runtime")]
#[cfg(feature = "http2")]
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
pub(crate) fn http2_keep_alive_while_idle(&mut self, enabled: bool) -> &mut Self {
loop {}
}
#[cfg(feature = "http2")]
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
pub(crate) fn http2_max_concurrent_reset_streams(
&mut self,
max: usize,
) -> &mut Self {
pub(crate) fn http2_max_concurrent_reset_streams(&mut self, max: usize) -> &mut Self {
loop {}
}
#[cfg(feature = "http2")]
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
pub(crate) fn http2_max_send_buf_size(&mut self, max: usize) -> &mut Self {
loop {}
}
#[inline]
pub(crate) fn retry_canceled_requests(&mut self, val: bool) -> &mut Self {
loop {}
}
#[inline]
pub(crate) fn set_host(&mut self, val: bool) -> &mut Self {
loop {}
}
pub(crate) fn executor<E>(&mut self, exec: E) -> &mut Self
where
E: Executor<BoxSendFuture> + Send + Sync + 'static,
{
loop {}
}
#[cfg(feature = "tcp")]
pub(crate) fn build_http<B>(&self) -> Client<HttpConnector, B>
where
@ -870,7 +423,7 @@ impl Builder {
{
loop {}
}
pub(crate) fn build<C, B>(&self, connector: C) -> Client<C, B>
where
C: Connect + Clone,

View file

@ -1,7 +1,3 @@
use std::fmt;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
#[cfg(all(feature = "server", any(feature = "http1", feature = "http2")))]
use crate::body::Body;
#[cfg(feature = "server")]
@ -13,6 +9,10 @@ use crate::rt::Executor;
use crate::server::server::{new_svc::NewSvcTask, Watcher};
#[cfg(all(feature = "server", any(feature = "http1", feature = "http2")))]
use crate::service::HttpService;
use std::fmt;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
#[cfg(feature = "server")]
pub trait ConnStreamExec<F, B: HttpBody>: Clone {
fn execute_h2stream(&mut self, fut: H2Stream<F, B>);
@ -27,14 +27,6 @@ pub enum Exec {
Default,
Executor(Arc<dyn Executor<BoxSendFuture> + Send + Sync>),
}
impl Exec {
pub(crate) fn execute<F>(&self, fut: F)
where
F: Future<Output = ()> + Send + 'static,
{
loop {}
}
}
impl fmt::Debug for Exec {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
loop {}

View file

@ -5,32 +5,26 @@
//! - The [`Accept`](Accept) trait used to asynchronously accept incoming
//! connections.
//! - Utilities like `poll_fn` to ease creating a custom `Accept`.
#[cfg(feature = "stream")]
use futures_core::Stream;
#[cfg(feature = "stream")]
use pin_project_lite::pin_project;
use crate::common::{
task::{self, Poll},
Pin,
};
#[cfg(feature = "stream")]
use futures_core::Stream;
#[cfg(feature = "stream")]
use pin_project_lite::pin_project;
pub trait Accept {
type Conn;
type Error;
fn poll_accept(
self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
) -> Poll<Option<Result<Self::Conn, Self::Error>>>;
}
#[cfg(feature = "stream")]
pub fn from_stream<S, IO, E>(stream: S) -> impl Accept<Conn = IO, Error = E>
where
@ -49,7 +43,7 @@ where
self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
) -> Poll<Option<Result<Self::Conn, Self::Error>>> {
self.project().stream.poll_next(cx)
loop {}
}
}
FromStream { stream }

View file

@ -41,11 +41,11 @@ impl Server<AddrIncoming, ()> {
}
#[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))]
impl<I, IO, IE, S, B, E> Future for Server<I, S, E>
impl<I, IO, IE, S, E> Future for Server<I, S, E>
where
I: Accept<Conn = IO, Error = IE>,
IE: Into<Box<dyn StdError + Send + Sync>>,
S: MakeServiceRef<IO, Body, ResBody = B>,
S: MakeServiceRef<IO, Body>,
E: NewSvcExec<IO, S::Future, S::Service, E, NoopWatcher>,
{
type Output = ();

View file

@ -1,23 +1,17 @@
use std::error::Error as StdError;
use std::fmt;
use tokio::io::{AsyncRead, AsyncWrite};
use super::{HttpService, Service};
use crate::body::HttpBody;
use crate::common::{task, Future, Poll};
use std::error::Error as StdError;
use std::fmt;
use tokio::io::{AsyncRead, AsyncWrite};
pub(crate) trait MakeConnection<Target>: self::sealed::Sealed<(Target,)> {
type Connection: AsyncRead + AsyncWrite;
type Error;
type Future: Future<Output = Result<Self::Connection, Self::Error>>;
fn poll_ready(
&mut self,
cx: &mut task::Context<'_>,
) -> Poll<Result<(), Self::Error>>;
fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>>;
fn make_connection(&mut self, target: Target) -> Self::Future;
}
impl<S, Target> self::sealed::Sealed<(Target,)> for S
where
S: Service<Target>,
{}
impl<S, Target> self::sealed::Sealed<(Target,)> for S where S: Service<Target> {}
impl<S, Target> MakeConnection<Target> for S
where
S: Service<Target>,
@ -26,10 +20,7 @@ where
type Connection = S::Response;
type Error = S::Error;
type Future = S::Future;
fn poll_ready(
&mut self,
cx: &mut task::Context<'_>,
) -> Poll<Result<(), Self::Error>> {
fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
loop {}
}
fn make_connection(&mut self, target: Target) -> Self::Future {
@ -43,10 +34,7 @@ pub trait MakeServiceRef<Target, ReqBody>: self::sealed::Sealed<(Target, ReqBody
type MakeError: Into<Box<dyn StdError + Send + Sync>>;
type Future: Future<Output = Result<Self::Service, Self::MakeError>>;
type __DontNameMe: self::sealed::CantImpl;
fn poll_ready_ref(
&mut self,
cx: &mut task::Context<'_>,
) -> Poll<Result<(), Self::MakeError>>;
fn poll_ready_ref(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::MakeError>>;
fn make_service_ref(&mut self, target: &Target) -> Self::Future;
}
impl<T, Target, E, ME, S, F, IB, OB> MakeServiceRef<Target, IB> for T
@ -65,10 +53,7 @@ where
type MakeError = ME;
type Future = F;
type __DontNameMe = self::sealed::CantName;
fn poll_ready_ref(
&mut self,
cx: &mut task::Context<'_>,
) -> Poll<Result<(), Self::MakeError>> {
fn poll_ready_ref(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::MakeError>> {
loop {}
}
fn make_service_ref(&mut self, target: &Target) -> Self::Future {
@ -81,42 +66,8 @@ where
S: HttpService<B1, ResBody = B2>,
B1: HttpBody,
B2: HttpBody,
{}
{
}
pub fn make_service_fn<F, Target, Ret>(f: F) -> MakeServiceFn<F>
where
@ -139,10 +90,7 @@ where
type Error = MkErr;
type Response = Svc;
type Future = Ret;
fn poll_ready(
&mut self,
_cx: &mut task::Context<'_>,
) -> Poll<Result<(), Self::Error>> {
fn poll_ready(&mut self, _cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
loop {}
}
fn call(&mut self, target: &'t Target) -> Self::Future {