From b1cba78afa722137cb6c7b05e02d9d1ddf73a49f Mon Sep 17 00:00:00 2001 From: Nilstrieb <48135649+Nilstrieb@users.noreply.github.com> Date: Thu, 15 Sep 2022 21:30:58 +0200 Subject: [PATCH] DELETE --- tower/tower/src/lib.rs | 38 +-- tower/tower/src/load/completion.rs | 95 ------ tower/tower/src/load/mod.rs | 17 - tower/tower/src/load/peak_ewma.rs | 407 ----------------------- tower/tower/src/load/pending_requests.rs | 216 ------------ tower/tower/src/make/make_connection.rs | 47 --- tower/tower/src/make/mod.rs | 2 - 7 files changed, 7 insertions(+), 815 deletions(-) delete mode 100644 tower/tower/src/load/completion.rs delete mode 100644 tower/tower/src/load/mod.rs delete mode 100644 tower/tower/src/load/peak_ewma.rs delete mode 100644 tower/tower/src/load/pending_requests.rs delete mode 100644 tower/tower/src/make/make_connection.rs diff --git a/tower/tower/src/lib.rs b/tower/tower/src/lib.rs index a5ffbb8..9d822a5 100644 --- a/tower/tower/src/lib.rs +++ b/tower/tower/src/lib.rs @@ -1,47 +1,16 @@ -#![warn( - missing_debug_implementations, - missing_docs, - rust_2018_idioms, - unreachable_pub -)] -#![forbid(unsafe_code)] -#![allow(elided_lifetimes_in_paths, clippy::type_complexity)] -#![cfg_attr(test, allow(clippy::float_cmp))] -#![cfg_attr(docsrs, feature(doc_cfg))] - #![allow(warnings)] #[macro_use] pub(crate) mod macros; -#[cfg(feature = "balance")] -#[cfg_attr(docsrs, doc(cfg(feature = "balance")))] pub mod balance; -#[cfg(feature = "discover")] -#[cfg_attr(docsrs, doc(cfg(feature = "discover")))] pub mod discover; -#[cfg(feature = "load")] -#[cfg_attr(docsrs, doc(cfg(feature = "load")))] -pub mod load; - - -#[cfg(feature = "make")] -#[cfg_attr(docsrs, doc(cfg(feature = "make")))] pub mod make; - pub mod builder; pub mod layer; -#[doc(inline)] -pub use crate::builder::ServiceBuilder; -#[cfg(feature = "make")] -#[cfg_attr(docsrs, doc(cfg(feature = "make")))] -#[doc(inline)] -pub use crate::make::MakeService; - - #[allow(unreachable_pub)] mod sealed { pub trait Sealed {} @@ -49,3 +18,10 @@ mod sealed { /// Alias for a type-erased error type. pub type BoxError = Box; + +mod load { + pub trait Load { + type Metric; + fn load(&self) -> Self::Metric; + } +} diff --git a/tower/tower/src/load/completion.rs b/tower/tower/src/load/completion.rs deleted file mode 100644 index 3c14a7f..0000000 --- a/tower/tower/src/load/completion.rs +++ /dev/null @@ -1,95 +0,0 @@ -//! Application-specific request completion semantics. - -use futures_core::ready; -use pin_project_lite::pin_project; -use std::{ - future::Future, - pin::Pin, - task::{Context, Poll}, -}; - -/// Attaches `H`-typed completion tracker to `V` typed values. -/// -/// Handles (of type `H`) are intended to be RAII guards that primarily implement [`Drop`] and update -/// load metric state as they are dropped. This trait allows implementors to "forward" the handle -/// to later parts of the request-handling pipeline, so that the handle is only dropped when the -/// request has truly completed. -/// -/// This utility allows load metrics to have a protocol-agnostic means to track streams past their -/// initial response future. For example, if `V` represents an HTTP response type, an -/// implementation could add `H`-typed handles to each response's extensions to detect when all the -/// response's extensions have been dropped. -/// -/// A base `impl TrackCompletion for CompleteOnResponse` is provided to drop the handle -/// once the response future is resolved. This is appropriate when a response is discrete and -/// cannot comprise multiple messages. -/// -/// In many cases, the `Output` type is simply `V`. However, [`TrackCompletion`] may alter the type -/// in order to instrument it appropriately. For example, an HTTP [`TrackCompletion`] may modify -/// the body type: so a [`TrackCompletion`] that takes values of type -/// [`http::Response`][response] may output values of type [`http::Response`][response]. -/// -/// [response]: https://docs.rs/http/latest/http/response/struct.Response.html -pub trait TrackCompletion: Clone { - /// The instrumented value type. - type Output; - - /// Attaches a `H`-typed handle to a `V`-typed value. - fn track_completion(&self, handle: H, value: V) -> Self::Output; -} - -/// A [`TrackCompletion`] implementation that considers the request completed when the response -/// future is resolved. -#[derive(Clone, Copy, Debug, Default)] -#[non_exhaustive] -pub struct CompleteOnResponse; - -pin_project! { - /// Attaches a `C`-typed completion tracker to the result of an `F`-typed [`Future`]. - #[derive(Debug)] - pub struct TrackCompletionFuture { - #[pin] - future: F, - handle: Option, - completion: C, - } -} - -// ===== impl InstrumentFuture ===== - -impl TrackCompletionFuture { - /// Wraps a future, propagating the tracker into its value if successful. - pub fn new(completion: C, handle: H, future: F) -> Self { - TrackCompletionFuture { - future, - completion, - handle: Some(handle), - } - } -} - -impl Future for TrackCompletionFuture -where - F: Future>, - C: TrackCompletion, -{ - type Output = Result; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.project(); - let rsp = ready!(this.future.poll(cx))?; - let h = this.handle.take().expect("handle"); - Poll::Ready(Ok(this.completion.track_completion(h, rsp))) - } -} - -// ===== CompleteOnResponse ===== - -impl TrackCompletion for CompleteOnResponse { - type Output = V; - - fn track_completion(&self, handle: H, value: V) -> V { - drop(handle); - value - } -} diff --git a/tower/tower/src/load/mod.rs b/tower/tower/src/load/mod.rs deleted file mode 100644 index 85b17be..0000000 --- a/tower/tower/src/load/mod.rs +++ /dev/null @@ -1,17 +0,0 @@ -pub mod completion; -pub mod peak_ewma; -pub mod pending_requests; - -pub use self::{ - completion::{CompleteOnResponse, TrackCompletion}, - peak_ewma::PeakEwma, - pending_requests::PendingRequests, -}; - -#[cfg(feature = "discover")] -pub use self::{peak_ewma::PeakEwmaDiscover, pending_requests::PendingRequestsDiscover}; - -pub trait Load { - type Metric: PartialOrd; - fn load(&self) -> Self::Metric; -} diff --git a/tower/tower/src/load/peak_ewma.rs b/tower/tower/src/load/peak_ewma.rs deleted file mode 100644 index 61ac201..0000000 --- a/tower/tower/src/load/peak_ewma.rs +++ /dev/null @@ -1,407 +0,0 @@ -//! A `Load` implementation that measures load using the PeakEWMA response latency. - -#[cfg(feature = "discover")] -use crate::discover::{Change, Discover}; -#[cfg(feature = "discover")] -use futures_core::{ready, Stream}; -#[cfg(feature = "discover")] -use pin_project_lite::pin_project; -#[cfg(feature = "discover")] -use std::pin::Pin; - -use super::completion::{CompleteOnResponse, TrackCompletion, TrackCompletionFuture}; -use super::Load; -use std::task::{Context, Poll}; -use std::{ - sync::{Arc, Mutex}, - time::Duration, -}; -use tokio::time::Instant; -use tower_service::Service; -use tracing::trace; - -/// Measures the load of the underlying service using Peak-EWMA load measurement. -/// -/// [`PeakEwma`] implements [`Load`] with the [`Cost`] metric that estimates the amount of -/// pending work to an endpoint. Work is calculated by multiplying the -/// exponentially-weighted moving average (EWMA) of response latencies by the number of -/// pending requests. The Peak-EWMA algorithm is designed to be especially sensitive to -/// worst-case latencies. Over time, the peak latency value decays towards the moving -/// average of latencies to the endpoint. -/// -/// When no latency information has been measured for an endpoint, an arbitrary default -/// RTT of 1 second is used to prevent the endpoint from being overloaded before a -/// meaningful baseline can be established.. -/// -/// ## Note -/// -/// This is derived from [Finagle][finagle], which is distributed under the Apache V2 -/// license. Copyright 2017, Twitter Inc. -/// -/// [finagle]: -/// https://github.com/twitter/finagle/blob/9cc08d15216497bb03a1cafda96b7266cfbbcff1/finagle-core/src/main/scala/com/twitter/finagle/loadbalancer/PeakEwma.scala -#[derive(Debug)] -pub struct PeakEwma { - service: S, - decay_ns: f64, - rtt_estimate: Arc>, - completion: C, -} - -#[cfg(feature = "discover")] -pin_project! { - /// Wraps a `D`-typed stream of discovered services with `PeakEwma`. - #[cfg_attr(docsrs, doc(cfg(feature = "discover")))] - #[derive(Debug)] - pub struct PeakEwmaDiscover { - #[pin] - discover: D, - decay_ns: f64, - default_rtt: Duration, - completion: C, - } -} - -/// Represents the relative cost of communicating with a service. -/// -/// The underlying value estimates the amount of pending work to a service: the Peak-EWMA -/// latency estimate multiplied by the number of pending requests. -#[derive(Copy, Clone, Debug, PartialEq, PartialOrd)] -pub struct Cost(f64); - -/// Tracks an in-flight request and updates the RTT-estimate on Drop. -#[derive(Debug)] -pub struct Handle { - sent_at: Instant, - decay_ns: f64, - rtt_estimate: Arc>, -} - -/// Holds the current RTT estimate and the last time this value was updated. -#[derive(Debug)] -struct RttEstimate { - update_at: Instant, - rtt_ns: f64, -} - -const NANOS_PER_MILLI: f64 = 1_000_000.0; - -// ===== impl PeakEwma ===== - -impl PeakEwma { - /// Wraps an `S`-typed service so that its load is tracked by the EWMA of its peak latency. - pub fn new(service: S, default_rtt: Duration, decay_ns: f64, completion: C) -> Self { - debug_assert!(decay_ns > 0.0, "decay_ns must be positive"); - Self { - service, - decay_ns, - rtt_estimate: Arc::new(Mutex::new(RttEstimate::new(nanos(default_rtt)))), - completion, - } - } - - fn handle(&self) -> Handle { - Handle { - decay_ns: self.decay_ns, - sent_at: Instant::now(), - rtt_estimate: self.rtt_estimate.clone(), - } - } -} - -impl Service for PeakEwma -where - S: Service, - C: TrackCompletion, -{ - type Response = C::Output; - type Error = S::Error; - type Future = TrackCompletionFuture; - - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - self.service.poll_ready(cx) - } - - fn call(&mut self, req: Request) -> Self::Future { - TrackCompletionFuture::new( - self.completion.clone(), - self.handle(), - self.service.call(req), - ) - } -} - -impl Load for PeakEwma { - type Metric = Cost; - - fn load(&self) -> Self::Metric { - let pending = Arc::strong_count(&self.rtt_estimate) as u32 - 1; - - // Update the RTT estimate to account for decay since the last update. - // If an estimate has not been established, a default is provided - let estimate = self.update_estimate(); - - let cost = Cost(estimate * f64::from(pending + 1)); - trace!( - "load estimate={:.0}ms pending={} cost={:?}", - estimate / NANOS_PER_MILLI, - pending, - cost, - ); - cost - } -} - -impl PeakEwma { - fn update_estimate(&self) -> f64 { - let mut rtt = self.rtt_estimate.lock().expect("peak ewma prior_estimate"); - rtt.decay(self.decay_ns) - } -} - -// ===== impl PeakEwmaDiscover ===== - -#[cfg(feature = "discover")] -impl PeakEwmaDiscover { - /// Wraps a `D`-typed [`Discover`] so that services have a [`PeakEwma`] load metric. - /// - /// The provided `default_rtt` is used as the default RTT estimate for newly - /// added services. - /// - /// They `decay` value determines over what time period a RTT estimate should - /// decay. - pub fn new(discover: D, default_rtt: Duration, decay: Duration, completion: C) -> Self - where - D: Discover, - D::Service: Service, - C: TrackCompletion>::Response>, - { - PeakEwmaDiscover { - discover, - decay_ns: nanos(decay), - default_rtt, - completion, - } - } -} - -#[cfg(feature = "discover")] -#[cfg_attr(docsrs, doc(cfg(feature = "discover")))] -impl Stream for PeakEwmaDiscover -where - D: Discover, - C: Clone, -{ - type Item = Result>, D::Error>; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let this = self.project(); - let change = match ready!(this.discover.poll_discover(cx)).transpose()? { - None => return Poll::Ready(None), - Some(Change::Remove(k)) => Change::Remove(k), - Some(Change::Insert(k, svc)) => { - let peak_ewma = PeakEwma::new( - svc, - *this.default_rtt, - *this.decay_ns, - this.completion.clone(), - ); - Change::Insert(k, peak_ewma) - } - }; - - Poll::Ready(Some(Ok(change))) - } -} - -// ===== impl RttEstimate ===== - -impl RttEstimate { - fn new(rtt_ns: f64) -> Self { - debug_assert!(0.0 < rtt_ns, "rtt must be positive"); - Self { - rtt_ns, - update_at: Instant::now(), - } - } - - /// Decays the RTT estimate with a decay period of `decay_ns`. - fn decay(&mut self, decay_ns: f64) -> f64 { - // Updates with a 0 duration so that the estimate decays towards 0. - let now = Instant::now(); - self.update(now, now, decay_ns) - } - - /// Updates the Peak-EWMA RTT estimate. - /// - /// The elapsed time from `sent_at` to `recv_at` is added - fn update(&mut self, sent_at: Instant, recv_at: Instant, decay_ns: f64) -> f64 { - debug_assert!( - sent_at <= recv_at, - "recv_at={:?} after sent_at={:?}", - recv_at, - sent_at - ); - let rtt = nanos(recv_at.saturating_duration_since(sent_at)); - - let now = Instant::now(); - debug_assert!( - self.update_at <= now, - "update_at={:?} in the future", - self.update_at - ); - - self.rtt_ns = if self.rtt_ns < rtt { - // For Peak-EWMA, always use the worst-case (peak) value as the estimate for - // subsequent requests. - trace!( - "update peak rtt={}ms prior={}ms", - rtt / NANOS_PER_MILLI, - self.rtt_ns / NANOS_PER_MILLI, - ); - rtt - } else { - // When an RTT is observed that is less than the estimated RTT, we decay the - // prior estimate according to how much time has elapsed since the last - // update. The inverse of the decay is used to scale the estimate towards the - // observed RTT value. - let elapsed = nanos(now.saturating_duration_since(self.update_at)); - let decay = (-elapsed / decay_ns).exp(); - let recency = 1.0 - decay; - let next_estimate = (self.rtt_ns * decay) + (rtt * recency); - trace!( - "update rtt={:03.0}ms decay={:06.0}ns; next={:03.0}ms", - rtt / NANOS_PER_MILLI, - self.rtt_ns - next_estimate, - next_estimate / NANOS_PER_MILLI, - ); - next_estimate - }; - self.update_at = now; - - self.rtt_ns - } -} - -// ===== impl Handle ===== - -impl Drop for Handle { - fn drop(&mut self) { - let recv_at = Instant::now(); - - if let Ok(mut rtt) = self.rtt_estimate.lock() { - rtt.update(self.sent_at, recv_at, self.decay_ns); - } - } -} - -// ===== impl Cost ===== - -// Utility that converts durations to nanos in f64. -// -// Due to a lossy transformation, the maximum value that can be represented is ~585 years, -// which, I hope, is more than enough to represent request latencies. -fn nanos(d: Duration) -> f64 { - const NANOS_PER_SEC: u64 = 1_000_000_000; - let n = f64::from(d.subsec_nanos()); - let s = d.as_secs().saturating_mul(NANOS_PER_SEC) as f64; - n + s -} - -#[cfg(test)] -mod tests { - use futures_util::future; - use std::time::Duration; - use tokio::time; - use tokio_test::{assert_ready, assert_ready_ok, task}; - - use super::*; - - struct Svc; - impl Service<()> for Svc { - type Response = (); - type Error = (); - type Future = future::Ready>; - - fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - - fn call(&mut self, (): ()) -> Self::Future { - future::ok(()) - } - } - - /// The default RTT estimate decays, so that new nodes are considered if the - /// default RTT is too high. - #[tokio::test] - async fn default_decay() { - time::pause(); - - let svc = PeakEwma::new( - Svc, - Duration::from_millis(10), - NANOS_PER_MILLI * 1_000.0, - CompleteOnResponse, - ); - let Cost(load) = svc.load(); - assert_eq!(load, 10.0 * NANOS_PER_MILLI); - - time::advance(Duration::from_millis(100)).await; - let Cost(load) = svc.load(); - assert!(9.0 * NANOS_PER_MILLI < load && load < 10.0 * NANOS_PER_MILLI); - - time::advance(Duration::from_millis(100)).await; - let Cost(load) = svc.load(); - assert!(8.0 * NANOS_PER_MILLI < load && load < 9.0 * NANOS_PER_MILLI); - } - - // The default RTT estimate decays, so that new nodes are considered if the default RTT is too - // high. - #[tokio::test] - async fn compound_decay() { - time::pause(); - - let mut svc = PeakEwma::new( - Svc, - Duration::from_millis(20), - NANOS_PER_MILLI * 1_000.0, - CompleteOnResponse, - ); - assert_eq!(svc.load(), Cost(20.0 * NANOS_PER_MILLI)); - - time::advance(Duration::from_millis(100)).await; - let mut rsp0 = task::spawn(svc.call(())); - assert!(svc.load() > Cost(20.0 * NANOS_PER_MILLI)); - - time::advance(Duration::from_millis(100)).await; - let mut rsp1 = task::spawn(svc.call(())); - assert!(svc.load() > Cost(40.0 * NANOS_PER_MILLI)); - - time::advance(Duration::from_millis(100)).await; - let () = assert_ready_ok!(rsp0.poll()); - assert_eq!(svc.load(), Cost(400_000_000.0)); - - time::advance(Duration::from_millis(100)).await; - let () = assert_ready_ok!(rsp1.poll()); - assert_eq!(svc.load(), Cost(200_000_000.0)); - - // Check that values decay as time elapses - time::advance(Duration::from_secs(1)).await; - assert!(svc.load() < Cost(100_000_000.0)); - - time::advance(Duration::from_secs(10)).await; - assert!(svc.load() < Cost(100_000.0)); - } - - #[test] - fn nanos() { - assert_eq!(super::nanos(Duration::new(0, 0)), 0.0); - assert_eq!(super::nanos(Duration::new(0, 123)), 123.0); - assert_eq!(super::nanos(Duration::new(1, 23)), 1_000_000_023.0); - assert_eq!( - super::nanos(Duration::new(::std::u64::MAX, 999_999_999)), - 18446744074709553000.0 - ); - } -} diff --git a/tower/tower/src/load/pending_requests.rs b/tower/tower/src/load/pending_requests.rs deleted file mode 100644 index 3d8689b..0000000 --- a/tower/tower/src/load/pending_requests.rs +++ /dev/null @@ -1,216 +0,0 @@ -//! A [`Load`] implementation that measures load using the number of in-flight requests. - -#[cfg(feature = "discover")] -use crate::discover::{Change, Discover}; -#[cfg(feature = "discover")] -use futures_core::{ready, Stream}; -#[cfg(feature = "discover")] -use pin_project_lite::pin_project; -#[cfg(feature = "discover")] -use std::pin::Pin; - -use super::completion::{CompleteOnResponse, TrackCompletion, TrackCompletionFuture}; -use super::Load; -use std::sync::Arc; -use std::task::{Context, Poll}; -use tower_service::Service; - -/// Measures the load of the underlying service using the number of currently-pending requests. -#[derive(Debug)] -pub struct PendingRequests { - service: S, - ref_count: RefCount, - completion: C, -} - -/// Shared between instances of [`PendingRequests`] and [`Handle`] to track active references. -#[derive(Clone, Debug, Default)] -struct RefCount(Arc<()>); - -#[cfg(feature = "discover")] -pin_project! { - /// Wraps a `D`-typed stream of discovered services with [`PendingRequests`]. - #[cfg_attr(docsrs, doc(cfg(feature = "discover")))] - #[derive(Debug)] - pub struct PendingRequestsDiscover { - #[pin] - discover: D, - completion: C, - } -} - -/// Represents the number of currently-pending requests to a given service. -#[derive(Clone, Copy, Debug, Default, PartialOrd, PartialEq, Ord, Eq)] -pub struct Count(usize); - -/// Tracks an in-flight request by reference count. -#[derive(Debug)] -pub struct Handle(RefCount); - -// ===== impl PendingRequests ===== - -impl PendingRequests { - /// Wraps an `S`-typed service so that its load is tracked by the number of pending requests. - pub fn new(service: S, completion: C) -> Self { - Self { - service, - completion, - ref_count: RefCount::default(), - } - } - - fn handle(&self) -> Handle { - Handle(self.ref_count.clone()) - } -} - -impl Load for PendingRequests { - type Metric = Count; - - fn load(&self) -> Count { - // Count the number of references that aren't `self`. - Count(self.ref_count.ref_count() - 1) - } -} - -impl Service for PendingRequests -where - S: Service, - C: TrackCompletion, -{ - type Response = C::Output; - type Error = S::Error; - type Future = TrackCompletionFuture; - - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - self.service.poll_ready(cx) - } - - fn call(&mut self, req: Request) -> Self::Future { - TrackCompletionFuture::new( - self.completion.clone(), - self.handle(), - self.service.call(req), - ) - } -} - -// ===== impl PendingRequestsDiscover ===== - -#[cfg(feature = "discover")] -impl PendingRequestsDiscover { - /// Wraps a [`Discover`], wrapping all of its services with [`PendingRequests`]. - pub fn new(discover: D, completion: C) -> Self - where - D: Discover, - D::Service: Service, - C: TrackCompletion>::Response>, - { - Self { - discover, - completion, - } - } -} - -#[cfg(feature = "discover")] -impl Stream for PendingRequestsDiscover -where - D: Discover, - C: Clone, -{ - type Item = Result>, D::Error>; - - /// Yields the next discovery change set. - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - use self::Change::*; - - let this = self.project(); - let change = match ready!(this.discover.poll_discover(cx)).transpose()? { - None => return Poll::Ready(None), - Some(Insert(k, svc)) => Insert(k, PendingRequests::new(svc, this.completion.clone())), - Some(Remove(k)) => Remove(k), - }; - - Poll::Ready(Some(Ok(change))) - } -} - -// ==== RefCount ==== - -impl RefCount { - pub(crate) fn ref_count(&self) -> usize { - Arc::strong_count(&self.0) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use futures_util::future; - use std::task::{Context, Poll}; - - struct Svc; - impl Service<()> for Svc { - type Response = (); - type Error = (); - type Future = future::Ready>; - - fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - - fn call(&mut self, (): ()) -> Self::Future { - future::ok(()) - } - } - - #[test] - fn default() { - let mut svc = PendingRequests::new(Svc, CompleteOnResponse); - assert_eq!(svc.load(), Count(0)); - - let rsp0 = svc.call(()); - assert_eq!(svc.load(), Count(1)); - - let rsp1 = svc.call(()); - assert_eq!(svc.load(), Count(2)); - - let () = tokio_test::block_on(rsp0).unwrap(); - assert_eq!(svc.load(), Count(1)); - - let () = tokio_test::block_on(rsp1).unwrap(); - assert_eq!(svc.load(), Count(0)); - } - - #[test] - fn with_completion() { - #[derive(Clone)] - struct IntoHandle; - impl TrackCompletion for IntoHandle { - type Output = Handle; - fn track_completion(&self, i: Handle, (): ()) -> Handle { - i - } - } - - let mut svc = PendingRequests::new(Svc, IntoHandle); - assert_eq!(svc.load(), Count(0)); - - let rsp = svc.call(()); - assert_eq!(svc.load(), Count(1)); - let i0 = tokio_test::block_on(rsp).unwrap(); - assert_eq!(svc.load(), Count(1)); - - let rsp = svc.call(()); - assert_eq!(svc.load(), Count(2)); - let i1 = tokio_test::block_on(rsp).unwrap(); - assert_eq!(svc.load(), Count(2)); - - drop(i1); - assert_eq!(svc.load(), Count(1)); - - drop(i0); - assert_eq!(svc.load(), Count(0)); - } -} diff --git a/tower/tower/src/make/make_connection.rs b/tower/tower/src/make/make_connection.rs deleted file mode 100644 index 9566cc6..0000000 --- a/tower/tower/src/make/make_connection.rs +++ /dev/null @@ -1,47 +0,0 @@ -use crate::sealed::Sealed; -use std::future::Future; -use std::task::{Context, Poll}; -use tokio::io::{AsyncRead, AsyncWrite}; -use tower_service::Service; - -/// The [`MakeConnection`] trait is used to create transports. -/// -/// The goal of this service is to allow composable methods for creating -/// `AsyncRead + AsyncWrite` transports. This could mean creating a TLS -/// based connection or using some other method to authenticate the connection. -pub trait MakeConnection: Sealed<(Target,)> { - /// The transport provided by this service - type Connection: AsyncRead + AsyncWrite; - - /// Errors produced by the connecting service - type Error; - - /// The future that eventually produces the transport - type Future: Future>; - - /// Returns `Poll::Ready(Ok(()))` when it is able to make more connections. - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll>; - - /// Connect and return a transport asynchronously - fn make_connection(&mut self, target: Target) -> Self::Future; -} - -impl Sealed<(Target,)> for S where S: Service {} - -impl MakeConnection for C -where - C: Service, - C::Response: AsyncRead + AsyncWrite, -{ - type Connection = C::Response; - type Error = C::Error; - type Future = C::Future; - - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - Service::poll_ready(self, cx) - } - - fn make_connection(&mut self, target: Target) -> Self::Future { - Service::call(self, target) - } -} diff --git a/tower/tower/src/make/mod.rs b/tower/tower/src/make/mod.rs index a377f2a..41b20c7 100644 --- a/tower/tower/src/make/mod.rs +++ b/tower/tower/src/make/mod.rs @@ -1,9 +1,7 @@ //! Trait aliases for Services that produce specific types of Responses. -mod make_connection; mod make_service; -pub use self::make_connection::MakeConnection; pub use self::make_service::shared::Shared; pub use self::make_service::{AsService, IntoService, MakeService};