This commit is contained in:
nora 2023-03-07 17:04:26 +01:00
parent 1d163b6fef
commit 20031c3035
3 changed files with 33 additions and 97 deletions

View file

@ -149,32 +149,6 @@ impl<'a, T> Drop for OptGuard<'a, T> {
loop {}
}
}
cfg_server! {
impl < S, B > Server < S, B > where S : HttpService < B >, { pub (crate) fn
new(service : S) -> Server < S, B > { Server { in_flight : Box::pin(None), service, }
} pub (crate) fn into_service(self) -> S { self.service } } impl < S : HttpService <
B >, B > Unpin for Server < S, B > {} impl < S, Bs > Dispatch for Server < S, Body >
where S : HttpService < Body, ResBody = Bs >, S::Error : Into < Box < dyn StdError +
Send + Sync >>, Bs : HttpBody, { type PollItem = MessageHead < http::StatusCode >;
type PollBody = Bs; type PollError = S::Error; type RecvItem = RequestHead; fn
poll_msg(mut self : Pin <& mut Self >, cx : & mut task::Context <'_ >,) -> Poll <
Option < Result < (Self::PollItem, Self::PollBody), Self::PollError >>> { let mut
this = self.as_mut(); let ret = if let Some(ref mut fut) = this.in_flight.as_mut()
.as_pin_mut() { let resp = ready!(fut.as_mut().poll(cx) ?); let (parts, body) = resp
.into_parts(); let head = MessageHead { version : parts.version, subject : parts
.status, headers : parts.headers, extensions : parts.extensions, };
Poll::Ready(Some(Ok((head, body)))) } else {
unreachable!("poll_msg shouldn't be called if no inflight"); }; this.in_flight
.set(None); ret } fn recv_msg(& mut self, msg : crate ::Result < (Self::RecvItem,
Body) >) -> crate ::Result < () > { let (msg, body) = msg ?; let mut req =
Request::new(body); * req.method_mut() = msg.subject.0; * req.uri_mut() = msg.subject
.1; * req.headers_mut() = msg.headers; * req.version_mut() = msg.version; * req
.extensions_mut() = msg.extensions; let fut = self.service.call(req); self.in_flight
.set(Some(fut)); Ok(()) } fn poll_ready(& mut self, cx : & mut task::Context <'_ >)
-> Poll < Result < (), () >> { if self.in_flight.is_some() { Poll::Pending } else {
self.service.poll_ready(cx).map_err(| _e | { trace!("service closed"); }) } } fn
should_poll(& self) -> bool { self.in_flight.is_some() } }
}
cfg_client! {
impl < B > Client < B > { pub (crate) fn new(rx : ClientRx < B >) -> Client < B > {
Client { callback : None, rx, rx_closed : false, } } } impl < B > Dispatch for Client

View file

@ -42,17 +42,15 @@
//! }
//! # }
//! ```
#[cfg(
all(
any(feature = "http1", feature = "http2"),
not(all(feature = "http1", feature = "http2"))
)
)]
#[cfg(feature = "http2")]
use crate::common::io::Rewind;
#[cfg(all(
any(feature = "http1", feature = "http2"),
not(all(feature = "http1", feature = "http2"))
))]
use std::marker::PhantomData;
#[cfg(all(any(feature = "http1", feature = "http2"), feature = "runtime"))]
use std::time::Duration;
#[cfg(feature = "http2")]
use crate::common::io::Rewind;
cfg_feature! {
#![any(feature = "http1", feature = "http2")] use std::error::Error as StdError; use
std::fmt; use bytes::Bytes; use pin_project_lite::pin_project; use tokio::io:: {
@ -66,11 +64,6 @@ cfg_feature! {
#[cfg(feature = "tcp")]
pub use super::tcp::{AddrIncoming, AddrStream};
#[derive(Clone, Debug)]
#[cfg(any(feature = "http1", feature = "http2"))]
#[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))]
@ -81,32 +74,24 @@ pub(crate) struct Http<E = Exec> {
#[cfg(any(feature = "http1", feature = "http2"))]
#[derive(Clone, Debug, PartialEq)]
enum ConnectionMode {
#[cfg(feature = "http1")]
H1Only,
#[cfg(feature = "http2")]
H2Only,
#[cfg(all(feature = "http1", feature = "http2"))]
Fallback,
}
#[cfg(any(feature = "http1", feature = "http2"))]
pin_project! {
#[doc = " A future binding a connection with a Service."] #[doc = ""] #[doc =
" Polling this future will drive HTTP forward."] #[must_use =
"futures do nothing unless polled"] #[cfg_attr(docsrs, doc(cfg(any(feature = "http1",
feature = "http2"))))] pub struct Connection < T, S, E = Exec > where S : HttpService
< Body >, { pub (super) conn : Option < ProtoServer < T, S::ResBody, S, E >>,
pub struct Connection < T, S, E = Exec > { pub (super) conn : Option < ProtoServer < T, S, S, E >>,
fallback : Fallback < E >, }
}
#[cfg(feature = "http1")]
type Http1Dispatcher<T, B, S> = proto::h1::Dispatcher<
proto::h1::dispatch::Server<S, Body>,
B,
T,
proto::ServerTransaction,
>;
type Http1Dispatcher<T, B, S> =
proto::h1::Dispatcher<proto::h1::dispatch::Server<S, Body>, B, T, proto::ServerTransaction>;
#[cfg(all(not(feature = "http1"), feature = "http2"))]
type Http1Dispatcher<T, B, S> = (Never, PhantomData<(T, Box<Pin<B>>, Box<Pin<S>>)>);
#[cfg(feature = "http2")]
@ -118,9 +103,11 @@ type Http2Server<T, B, S, E> = (
);
#[cfg(any(feature = "http1", feature = "http2"))]
pin_project! {
#[project = ProtoServerProj] pub (super) enum ProtoServer < T, B, S, E = Exec > where
S : HttpService < Body >, B : HttpBody, { H1 { #[pin] h1 : Http1Dispatcher < T, B, S
>, }, H2 { #[pin] h2 : Http2Server < T, B, S, E >, }, }
#[project = ProtoServerProj] pub (super) enum ProtoServer < T, B, S, E = Exec > {
H1 { #[pin] h1 : (T, B, S), }, H2 { #[pin] h2 : (T, B, S ,E), },
}
}
#[cfg(all(feature = "http1", feature = "http2"))]
#[derive(Clone, Debug)]
@ -128,12 +115,10 @@ enum Fallback<E> {
ToHttp2(proto::h2::server::Config, E),
Http1Only,
}
#[cfg(
all(
any(feature = "http1", feature = "http2"),
not(all(feature = "http1", feature = "http2"))
)
)]
#[cfg(all(
any(feature = "http1", feature = "http2"),
not(all(feature = "http1", feature = "http2"))
))]
type Fallback<E> = PhantomData<E>;
#[cfg(any(feature = "http1", feature = "http2"))]
impl Http {}
@ -157,8 +142,7 @@ mod upgrades {
use super::*;
#[must_use = "futures do nothing unless polled"]
#[allow(missing_debug_implementations)]
pub struct UpgradeableConnection<T, S, E>
{
pub struct UpgradeableConnection<T, S, E> {
pub(super) inner: (T, S, E),
}
impl<I, B, S, E> UpgradeableConnection<I, S, E>
@ -169,7 +153,8 @@ mod upgrades {
B: HttpBody + 'static,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
E: ConnStreamExec<S::Future, B>,
{}
{
}
impl<I, B, S, E> Future for UpgradeableConnection<I, S, E>
where
S: HttpService<Body, ResBody = B>,
@ -180,10 +165,7 @@ mod upgrades {
E: ConnStreamExec<S::Future, B>,
{
type Output = crate::Result<()>;
fn poll(
mut self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
) -> Poll<Self::Output> {
fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
loop {}
}
}

View file

@ -1,26 +1,14 @@
use std::error::Error as StdError;
use crate::body::HttpBody;
use crate::common::{task, Future, Poll};
use crate::common::Future;
use crate::{Request, Response};
use std::error::Error as StdError;
pub trait HttpService<ReqBody>: sealed::Sealed<ReqBody> {
type ResBody: HttpBody;
type Error: Into<Box<dyn StdError + Send + Sync>>;
type ResBody;
type Error;
type Future: Future<Output = Result<Response<Self::ResBody>, Self::Error>>;
#[doc(hidden)]
fn poll_ready(
&mut self,
cx: &mut task::Context<'_>,
) -> Poll<Result<(), Self::Error>>;
#[doc(hidden)]
fn call(&mut self, req: Request<ReqBody>) -> Self::Future;
}
impl<T, B1, B2> HttpService<B1> for T
where
@ -31,21 +19,13 @@ where
type ResBody = B2;
type Error = T::Error;
type Future = T::Future;
fn poll_ready(
&mut self,
cx: &mut task::Context<'_>,
) -> Poll<Result<(), Self::Error>> {
loop {}
}
fn call(&mut self, req: Request<B1>) -> Self::Future {
loop {}
}
}
impl<T, B1, B2> sealed::Sealed<B1> for T
where
T: tower_service::Service<Request<B1>, Response = Response<B2>>,
B2: HttpBody,
{}
{
}
mod sealed {
pub trait Sealed<T> {}
}