From 9f18e8cb032e664788569ceeb839bf37b8298b7a Mon Sep 17 00:00:00 2001 From: Nilstrieb <48135649+Nilstrieb@users.noreply.github.com> Date: Thu, 15 Sep 2022 21:27:27 +0200 Subject: [PATCH] delete --- .gitmodules | 3 - .../src/{balance/pool/mod.rs => balance.rs} | 82 +++++--- tower/tower/src/balance/error.rs | 21 -- tower/tower/src/balance/mod.rs | 61 ------ tower/tower/src/balance/p2c/layer.rs | 60 ------ tower/tower/src/balance/p2c/make.rs | 125 ------------ tower/tower/src/balance/p2c/mod.rs | 41 ---- tower/tower/src/balance/p2c/service.rs | 58 ------ tower/tower/src/balance/p2c/test.rs | 125 ------------ tower/tower/src/balance/pool/test.rs | 190 ------------------ .../src/{discover/mod.rs => discover.rs} | 9 +- tower/tower/src/discover/error.rs | 12 -- tower/tower/src/discover/list.rs | 61 ------ tower/tower/src/load/constant.rs | 80 -------- tower/tower/src/load/mod.rs | 72 ------- 15 files changed, 53 insertions(+), 947 deletions(-) delete mode 100644 .gitmodules rename tower/tower/src/{balance/pool/mod.rs => balance.rs} (57%) delete mode 100644 tower/tower/src/balance/error.rs delete mode 100644 tower/tower/src/balance/mod.rs delete mode 100644 tower/tower/src/balance/p2c/layer.rs delete mode 100644 tower/tower/src/balance/p2c/make.rs delete mode 100644 tower/tower/src/balance/p2c/mod.rs delete mode 100644 tower/tower/src/balance/p2c/service.rs delete mode 100644 tower/tower/src/balance/p2c/test.rs delete mode 100644 tower/tower/src/balance/pool/test.rs rename tower/tower/src/{discover/mod.rs => discover.rs} (94%) delete mode 100644 tower/tower/src/discover/error.rs delete mode 100644 tower/tower/src/discover/list.rs delete mode 100644 tower/tower/src/load/constant.rs diff --git a/.gitmodules b/.gitmodules deleted file mode 100644 index f80e285..0000000 --- a/.gitmodules +++ /dev/null @@ -1,3 +0,0 @@ -[submodule "tower"] - path = tower - url = https://github.com/tower-rs/tower.git diff --git a/tower/tower/src/balance/pool/mod.rs b/tower/tower/src/balance.rs similarity index 57% rename from tower/tower/src/balance/pool/mod.rs rename to tower/tower/src/balance.rs index 837b58a..fc75ab0 100644 --- a/tower/tower/src/balance/pool/mod.rs +++ b/tower/tower/src/balance.rs @@ -1,37 +1,64 @@ -//! This module defines a load-balanced pool of services that adds new services when load is high. -//! -//! The pool uses `poll_ready` as a signal indicating whether additional services should be spawned -//! to handle the current level of load. Specifically, every time `poll_ready` on the inner service -//! returns `Ready`, [`Pool`] consider that a 0, and every time it returns `Pending`, [`Pool`] -//! considers it a 1. [`Pool`] then maintains an [exponential moving -//! average](https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average) over those -//! samples, which gives an estimate of how often the underlying service has been ready when it was -//! needed "recently" (see [`Builder::urgency`]). If the service is loaded (see -//! [`Builder::loaded_above`]), a new service is created and added to the underlying [`Balance`]. -//! If the service is underutilized (see [`Builder::underutilized_below`]) and there are two or -//! more services, then the latest added service is removed. In either case, the load estimate is -//! reset to its initial value (see [`Builder::initial`] to prevent services from being rapidly -//! added or removed. -#![deny(missing_docs)] - -use super::p2c::Balance; -use crate::discover::Change; +use crate::discover::{Change, Discover}; use crate::load::Load; use crate::make::MakeService; -use futures_core::{ready, Stream}; +use futures_core::ready; +use futures_core::Stream; +use futures_util::future::{self, TryFutureExt}; use pin_project_lite::pin_project; +use rand::{rngs::SmallRng, Rng, SeedableRng}; use slab::Slab; +use std::hash::Hash; +use std::marker::PhantomData; use std::{ fmt, future::Future, - marker::PhantomData, pin::Pin, task::{Context, Poll}, }; +use tokio::sync::oneshot; use tower_service::Service; +use tracing::{debug, trace}; + +pub struct Balance { + _req: PhantomData<(D, Req)>, +} + +impl Balance +where + D: Discover, + D::Service: Service, + >::Error: Into, +{ + pub fn new(discover: D) -> Self { + todo!() + } +} + +impl Service for Balance +where + D: Discover + Unpin, + D::Key: Hash + Clone, + D::Error: Into, + D::Service: Service + Load, + ::Metric: std::fmt::Debug, + >::Error: Into, +{ + type Response = >::Response; + type Error = crate::BoxError; + type Future = future::MapErr< + >::Future, + fn(>::Error) -> crate::BoxError, + >; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + todo!() + } + + fn call(&mut self, request: Req) -> Self::Future { + todo!() + } +} -/// A wrapper around `MakeService` that discovers a new service when load is high, and removes a -/// service when load is low. See [`Pool`]. pub struct PoolDiscoverer where MS: MakeService, @@ -50,16 +77,9 @@ where } } -/// A [builder] that lets you configure how a [`Pool`] determines whether the underlying service is -/// loaded or not. See the [module-level documentation](self) and the builder's methods for -/// details. -/// -/// [builder]: https://rust-lang-nursery.github.io/api-guidelines/type-safety.html#builders-enable-construction-of-complex-values-c-builder -#[derive(Copy, Clone, Debug)] pub struct Builder {} impl Builder { - /// See [`Pool::new`]. pub fn build() -> () where MS: MakeService, @@ -76,9 +96,7 @@ impl Builder { } } -/// A dynamically sized, load-balanced pool of `Service` instances. pub struct Pool { - // the Pin> here is needed since Balance requires the Service to be Unpin balance: (MS, Target, Request), } @@ -106,8 +124,6 @@ where } } -#[doc(hidden)] -#[derive(Debug)] pub struct DropNotifyService { svc: Svc, id: usize, diff --git a/tower/tower/src/balance/error.rs b/tower/tower/src/balance/error.rs deleted file mode 100644 index 4d47630..0000000 --- a/tower/tower/src/balance/error.rs +++ /dev/null @@ -1,21 +0,0 @@ -//! Error types for the [`tower::balance`] middleware. -//! -//! [`tower::balance`]: crate::balance - -use std::fmt; - -/// The balancer's endpoint discovery stream failed. -#[derive(Debug)] -pub struct Discover(pub(crate) crate::BoxError); - -impl fmt::Display for Discover { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "load balancer discovery error: {}", self.0) - } -} - -impl std::error::Error for Discover { - fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { - Some(&*self.0) - } -} diff --git a/tower/tower/src/balance/mod.rs b/tower/tower/src/balance/mod.rs deleted file mode 100644 index 4e27f4a..0000000 --- a/tower/tower/src/balance/mod.rs +++ /dev/null @@ -1,61 +0,0 @@ -//! Middleware that allows balancing load among multiple services. -//! -//! In larger systems, multiple endpoints are often available for a given service. As load -//! increases, you want to ensure that that load is spread evenly across the available services. -//! Otherwise, clients could see spikes in latency if their request goes to a particularly loaded -//! service, even when spare capacity is available to handle that request elsewhere. -//! -//! This module provides two pieces of middleware that helps with this type of load balancing: -//! -//! First, [`p2c`] implements the "[Power of Two Random Choices]" algorithm, a simple but robust -//! technique for spreading load across services with only inexact load measurements. Use this if -//! the set of available services is not within your control, and you simply want to spread load -//! among that set of services. -//! -//! [Power of Two Random Choices]: http://www.eecs.harvard.edu/~michaelm/postscripts/handbook2001.pdf -//! -//! Second, [`pool`] implements a dynamically sized pool of services. It estimates the overall -//! current load by tracking successful and unsuccessful calls to [`poll_ready`], and uses an -//! exponentially weighted moving average to add (using [`MakeService`]) or remove (by dropping) -//! services in response to increases or decreases in load. Use this if you are able to -//! dynamically add more service endpoints to the system to handle added load. -//! -//! # Examples -//! -//! ```rust -//! # #[cfg(feature = "util")] -//! # #[cfg(feature = "load")] -//! # fn warnings_are_errors() { -//! use tower::balance::p2c::Balance; -//! use tower::load::Load; -//! use tower::{Service, ServiceExt}; -//! use futures_util::pin_mut; -//! # use futures_core::Stream; -//! # use futures_util::StreamExt; -//! -//! async fn spread + Load>(svc1: S, svc2: S, reqs: impl Stream) -//! where -//! S::Error: Into, -//! # // this bound is pretty unfortunate, and the compiler does _not_ help -//! S::Metric: std::fmt::Debug, -//! { -//! // Spread load evenly across the two services -//! let p2c = Balance::new(tower::discover::ServiceList::new(vec![svc1, svc2])); -//! -//! // Issue all the requests that come in. -//! // Some will go to svc1, some will go to svc2. -//! pin_mut!(reqs); -//! let mut responses = p2c.call_all(reqs); -//! while let Some(rsp) = responses.next().await { -//! // ... -//! } -//! } -//! # } -//! ``` -//! -//! [`MakeService`]: crate::MakeService -//! [`poll_ready`]: crate::Service::poll_ready - -pub mod error; -pub mod p2c; -pub mod pool; diff --git a/tower/tower/src/balance/p2c/layer.rs b/tower/tower/src/balance/p2c/layer.rs deleted file mode 100644 index ce59c6e..0000000 --- a/tower/tower/src/balance/p2c/layer.rs +++ /dev/null @@ -1,60 +0,0 @@ -use super::MakeBalance; -use std::{fmt, marker::PhantomData}; -use tower_layer::Layer; - -/// Construct load balancers ([`Balance`]) over dynamic service sets ([`Discover`]) produced by the -/// "inner" service in response to requests coming from the "outer" service. -/// -/// This construction may seem a little odd at first glance. This is not a layer that takes -/// requests and produces responses in the traditional sense. Instead, it is more like -/// [`MakeService`] in that it takes service _descriptors_ (see `Target` on [`MakeService`]) -/// and produces _services_. Since [`Balance`] spreads requests across a _set_ of services, -/// the inner service should produce a [`Discover`], not just a single -/// [`Service`], given a service descriptor. -/// -/// See the [module-level documentation](crate::balance) for details on load balancing. -/// -/// [`Balance`]: crate::balance::p2c::Balance -/// [`Discover`]: crate::discover::Discover -/// [`MakeService`]: crate::MakeService -/// [`Service`]: crate::Service -pub struct MakeBalanceLayer { - _marker: PhantomData, -} - -impl MakeBalanceLayer { - /// Build balancers using operating system entropy. - pub fn new() -> Self { - Self { - _marker: PhantomData, - } - } -} - -impl Default for MakeBalanceLayer { - fn default() -> Self { - Self::new() - } -} - -impl Clone for MakeBalanceLayer { - fn clone(&self) -> Self { - Self { - _marker: PhantomData, - } - } -} - -impl Layer for MakeBalanceLayer { - type Service = MakeBalance; - - fn layer(&self, make_discover: S) -> Self::Service { - MakeBalance::new(make_discover) - } -} - -impl fmt::Debug for MakeBalanceLayer { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct("MakeBalanceLayer").finish() - } -} diff --git a/tower/tower/src/balance/p2c/make.rs b/tower/tower/src/balance/p2c/make.rs deleted file mode 100644 index 538d703..0000000 --- a/tower/tower/src/balance/p2c/make.rs +++ /dev/null @@ -1,125 +0,0 @@ -use super::Balance; -use crate::discover::Discover; -use futures_core::ready; -use pin_project_lite::pin_project; -use std::hash::Hash; -use std::marker::PhantomData; -use std::{ - fmt, - future::Future, - pin::Pin, - task::{Context, Poll}, -}; -use tower_service::Service; - -/// Constructs load balancers over dynamic service sets produced by a wrapped "inner" service. -/// -/// This is effectively an implementation of [`MakeService`] except that it forwards the service -/// descriptors (`Target`) to an inner service (`S`), and expects that service to produce a -/// service set in the form of a [`Discover`]. It then wraps the service set in a [`Balance`] -/// before returning it as the "made" service. -/// -/// See the [module-level documentation](crate::balance) for details on load balancing. -/// -/// [`MakeService`]: crate::MakeService -/// [`Discover`]: crate::discover::Discover -/// [`Balance`]: crate::balance::p2c::Balance -pub struct MakeBalance { - inner: S, - _marker: PhantomData, -} - -pin_project! { - /// A [`Balance`] in the making. - /// - /// [`Balance`]: crate::balance::p2c::Balance - pub struct MakeFuture { - #[pin] - inner: F, - _marker: PhantomData, - } -} - -impl MakeBalance { - /// Build balancers using operating system entropy. - pub fn new(make_discover: S) -> Self { - Self { - inner: make_discover, - _marker: PhantomData, - } - } -} - -impl Clone for MakeBalance -where - S: Clone, -{ - fn clone(&self) -> Self { - Self { - inner: self.inner.clone(), - _marker: PhantomData, - } - } -} - -impl Service for MakeBalance -where - S: Service, - S::Response: Discover, - ::Key: Hash, - ::Service: Service, - <::Service as Service>::Error: Into, -{ - type Response = Balance; - type Error = S::Error; - type Future = MakeFuture; - - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - self.inner.poll_ready(cx) - } - - fn call(&mut self, target: Target) -> Self::Future { - MakeFuture { - inner: self.inner.call(target), - _marker: PhantomData, - } - } -} - -impl fmt::Debug for MakeBalance -where - S: fmt::Debug, -{ - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - let Self { inner, _marker } = self; - f.debug_struct("MakeBalance").field("inner", inner).finish() - } -} - -impl Future for MakeFuture -where - F: Future>, - T: Discover, - ::Key: Hash, - ::Service: Service, - <::Service as Service>::Error: Into, -{ - type Output = Result, E>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.project(); - let inner = ready!(this.inner.poll(cx))?; - let svc = Balance::new(inner); - Poll::Ready(Ok(svc)) - } -} - -impl fmt::Debug for MakeFuture -where - F: fmt::Debug, -{ - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - let Self { inner, _marker } = self; - f.debug_struct("MakeFuture").field("inner", inner).finish() - } -} diff --git a/tower/tower/src/balance/p2c/mod.rs b/tower/tower/src/balance/p2c/mod.rs deleted file mode 100644 index 0d4203c..0000000 --- a/tower/tower/src/balance/p2c/mod.rs +++ /dev/null @@ -1,41 +0,0 @@ -//! This module implements the "[Power of Two Random Choices]" load balancing algorithm. -//! -//! It is a simple but robust technique for spreading load across services with only inexact load -//! measurements. As its name implies, whenever a request comes in, it samples two ready services -//! at random, and issues the request to whichever service is less loaded. How loaded a service is -//! is determined by the return value of [`Load`](crate::load::Load). -//! -//! As described in the [Finagle Guide][finagle]: -//! -//! > The algorithm randomly picks two services from the set of ready endpoints and -//! > selects the least loaded of the two. By repeatedly using this strategy, we can -//! > expect a manageable upper bound on the maximum load of any server. -//! > -//! > The maximum load variance between any two servers is bound by `ln(ln(n))` where -//! > `n` is the number of servers in the cluster. -//! -//! The balance service and layer implementations rely on _service discovery_ to provide the -//! underlying set of services to balance requests across. This happens through the -//! [`Discover`](crate::discover::Discover) trait, which is essentially a [`Stream`] that indicates -//! when services become available or go away. If you have a fixed set of services, consider using -//! [`ServiceList`](crate::discover::ServiceList). -//! -//! Since the load balancer needs to perform _random_ choices, the constructors in this module -//! usually come in two forms: one that uses randomness provided by the operating system, and one -//! that lets you specify the random seed to use. Usually the former is what you'll want, though -//! the latter may come in handy for reproducability or to reduce reliance on the operating system. -//! -//! [Power of Two Random Choices]: http://www.eecs.harvard.edu/~michaelm/postscripts/handbook2001.pdf -//! [finagle]: https://twitter.github.io/finagle/guide/Clients.html#power-of-two-choices-p2c-least-loaded -//! [`Stream`]: https://docs.rs/futures/0.3/futures/stream/trait.Stream.html - -mod layer; -mod make; -mod service; - -#[cfg(test)] -mod test; - -pub use layer::MakeBalanceLayer; -pub use make::{MakeBalance, MakeFuture}; -pub use service::Balance; diff --git a/tower/tower/src/balance/p2c/service.rs b/tower/tower/src/balance/p2c/service.rs deleted file mode 100644 index 5843df4..0000000 --- a/tower/tower/src/balance/p2c/service.rs +++ /dev/null @@ -1,58 +0,0 @@ -use super::super::error; -use crate::discover::{Change, Discover}; -use crate::load::Load; -use futures_core::ready; -use futures_util::future::{self, TryFutureExt}; -use pin_project_lite::pin_project; -use rand::{rngs::SmallRng, Rng, SeedableRng}; -use std::hash::Hash; -use std::marker::PhantomData; -use std::{ - fmt, - future::Future, - pin::Pin, - task::{Context, Poll}, -}; -use tokio::sync::oneshot; -use tower_service::Service; -use tracing::{debug, trace}; - -pub struct Balance { - _req: PhantomData<(D, Req)>, -} - -impl Balance -where - D: Discover, - D::Service: Service, - >::Error: Into, -{ - pub fn new(discover: D) -> Self { - todo!() - } -} - -impl Service for Balance -where - D: Discover + Unpin, - D::Key: Hash + Clone, - D::Error: Into, - D::Service: Service + Load, - ::Metric: std::fmt::Debug, - >::Error: Into, -{ - type Response = >::Response; - type Error = crate::BoxError; - type Future = future::MapErr< - >::Future, - fn(>::Error) -> crate::BoxError, - >; - - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - todo!() - } - - fn call(&mut self, request: Req) -> Self::Future { - todo!() - } -} diff --git a/tower/tower/src/balance/p2c/test.rs b/tower/tower/src/balance/p2c/test.rs deleted file mode 100644 index 2370860..0000000 --- a/tower/tower/src/balance/p2c/test.rs +++ /dev/null @@ -1,125 +0,0 @@ -use crate::discover::ServiceList; -use crate::load; -use futures_util::pin_mut; -use std::task::Poll; -use tokio_test::{assert_pending, assert_ready, assert_ready_ok, task}; -use tower_test::{assert_request_eq, mock}; - -use super::*; - -#[tokio::test] -async fn empty() { - let empty: Vec, usize>> = vec![]; - let disco = ServiceList::new(empty); - let mut svc = mock::Spawn::new(Balance::new(disco)); - assert_pending!(svc.poll_ready()); -} - -#[tokio::test] -async fn single_endpoint() { - let (mut svc, mut handle) = mock::spawn_with(|s| { - let mock = load::Constant::new(s, 0); - let disco = ServiceList::new(vec![mock].into_iter()); - Balance::new(disco) - }); - - handle.allow(0); - assert_pending!(svc.poll_ready()); - assert_eq!( - svc.get_ref().len(), - 1, - "balancer must have discovered endpoint" - ); - - handle.allow(1); - assert_ready_ok!(svc.poll_ready()); - - let mut fut = task::spawn(svc.call(())); - - assert_request_eq!(handle, ()).send_response(1); - - assert_eq!(assert_ready_ok!(fut.poll()), 1); - handle.allow(1); - assert_ready_ok!(svc.poll_ready()); - - handle.send_error("endpoint lost"); - assert_pending!(svc.poll_ready()); - assert!( - svc.get_ref().is_empty(), - "balancer must drop failed endpoints" - ); -} - -#[tokio::test] -async fn two_endpoints_with_equal_load() { - let (mock_a, handle_a) = mock::pair(); - let (mock_b, handle_b) = mock::pair(); - let mock_a = load::Constant::new(mock_a, 1); - let mock_b = load::Constant::new(mock_b, 1); - - pin_mut!(handle_a); - pin_mut!(handle_b); - - let disco = ServiceList::new(vec![mock_a, mock_b].into_iter()); - let mut svc = mock::Spawn::new(Balance::new(disco)); - - handle_a.allow(0); - handle_b.allow(0); - assert_pending!(svc.poll_ready()); - assert_eq!( - svc.get_ref().len(), - 2, - "balancer must have discovered both endpoints" - ); - - handle_a.allow(1); - handle_b.allow(0); - assert_ready_ok!( - svc.poll_ready(), - "must be ready when one of two services is ready" - ); - { - let mut fut = task::spawn(svc.call(())); - assert_request_eq!(handle_a, ()).send_response("a"); - assert_eq!(assert_ready_ok!(fut.poll()), "a"); - } - - handle_a.allow(0); - handle_b.allow(1); - assert_ready_ok!( - svc.poll_ready(), - "must be ready when both endpoints are ready" - ); - { - let mut fut = task::spawn(svc.call(())); - assert_request_eq!(handle_b, ()).send_response("b"); - assert_eq!(assert_ready_ok!(fut.poll()), "b"); - } - - handle_a.allow(1); - handle_b.allow(1); - for _ in 0..2 { - assert_ready_ok!( - svc.poll_ready(), - "must be ready when both endpoints are ready" - ); - let mut fut = task::spawn(svc.call(())); - - for (ref mut h, c) in &mut [(&mut handle_a, "a"), (&mut handle_b, "b")] { - if let Poll::Ready(Some((_, tx))) = h.as_mut().poll_request() { - tracing::info!("using {}", c); - tx.send_response(c); - h.allow(0); - } - } - assert_ready_ok!(fut.poll()); - } - - handle_a.send_error("endpoint lost"); - assert_pending!(svc.poll_ready()); - assert_eq!( - svc.get_ref().len(), - 1, - "balancer must drop failed endpoints", - ); -} diff --git a/tower/tower/src/balance/pool/test.rs b/tower/tower/src/balance/pool/test.rs deleted file mode 100644 index 6861b25..0000000 --- a/tower/tower/src/balance/pool/test.rs +++ /dev/null @@ -1,190 +0,0 @@ -use crate::load; -use futures_util::pin_mut; -use tokio_test::{assert_pending, assert_ready, assert_ready_ok, task}; -use tower_test::{assert_request_eq, mock}; - -use super::*; - -#[tokio::test] -async fn basic() { - // start the pool - let (mock, handle) = mock::pair::<(), load::Constant, usize>>(); - pin_mut!(handle); - - let mut pool = mock::Spawn::new(Builder::new().build(mock, ())); - assert_pending!(pool.poll_ready()); - - // give the pool a backing service - let (svc1_m, svc1) = mock::pair(); - pin_mut!(svc1); - - assert_request_eq!(handle, ()).send_response(load::Constant::new(svc1_m, 0)); - assert_ready_ok!(pool.poll_ready()); - - // send a request to the one backing service - let mut fut = task::spawn(pool.call(())); - - assert_pending!(fut.poll()); - assert_request_eq!(svc1, ()).send_response("foobar"); - assert_eq!(assert_ready_ok!(fut.poll()), "foobar"); -} - -#[tokio::test] -async fn high_load() { - // start the pool - let (mock, handle) = mock::pair::<(), load::Constant, usize>>(); - pin_mut!(handle); - - let pool = Builder::new() - .urgency(1.0) // so _any_ Pending will add a service - .underutilized_below(0.0) // so no Ready will remove a service - .max_services(Some(2)) - .build(mock, ()); - let mut pool = mock::Spawn::new(pool); - assert_pending!(pool.poll_ready()); - - // give the pool a backing service - let (svc1_m, svc1) = mock::pair(); - pin_mut!(svc1); - - svc1.allow(1); - assert_request_eq!(handle, ()).send_response(load::Constant::new(svc1_m, 0)); - assert_ready_ok!(pool.poll_ready()); - - // make the one backing service not ready - let mut fut1 = task::spawn(pool.call(())); - - // if we poll_ready again, pool should notice that load is increasing - // since urgency == 1.0, it should immediately enter high load - assert_pending!(pool.poll_ready()); - // it should ask the maker for another service, so we give it one - let (svc2_m, svc2) = mock::pair(); - pin_mut!(svc2); - - svc2.allow(1); - assert_request_eq!(handle, ()).send_response(load::Constant::new(svc2_m, 0)); - - // the pool should now be ready again for one more request - assert_ready_ok!(pool.poll_ready()); - let mut fut2 = task::spawn(pool.call(())); - - assert_pending!(pool.poll_ready()); - - // the pool should _not_ try to add another service - // sicen we have max_services(2) - assert_pending!(handle.as_mut().poll_request()); - - // let see that each service got one request - assert_request_eq!(svc1, ()).send_response("foo"); - assert_request_eq!(svc2, ()).send_response("bar"); - assert_eq!(assert_ready_ok!(fut1.poll()), "foo"); - assert_eq!(assert_ready_ok!(fut2.poll()), "bar"); -} - -#[tokio::test] -async fn low_load() { - // start the pool - let (mock, handle) = mock::pair::<(), load::Constant, usize>>(); - pin_mut!(handle); - - let pool = Builder::new() - .urgency(1.0) // so any event will change the service count - .build(mock, ()); - - let mut pool = mock::Spawn::new(pool); - - assert_pending!(pool.poll_ready()); - - // give the pool a backing service - let (svc1_m, svc1) = mock::pair(); - pin_mut!(svc1); - - svc1.allow(1); - assert_request_eq!(handle, ()).send_response(load::Constant::new(svc1_m, 0)); - assert_ready_ok!(pool.poll_ready()); - - // cycling a request should now work - let mut fut = task::spawn(pool.call(())); - - assert_request_eq!(svc1, ()).send_response("foo"); - assert_eq!(assert_ready_ok!(fut.poll()), "foo"); - // and pool should now not be ready (since svc1 isn't ready) - // it should immediately try to add another service - // which we give it - assert_pending!(pool.poll_ready()); - let (svc2_m, svc2) = mock::pair(); - pin_mut!(svc2); - - svc2.allow(1); - assert_request_eq!(handle, ()).send_response(load::Constant::new(svc2_m, 0)); - // pool is now ready - // which (because of urgency == 1.0) should immediately cause it to drop a service - // it'll drop svc1, so it'll still be ready - assert_ready_ok!(pool.poll_ready()); - // and even with another ready, it won't drop svc2 since its now the only service - assert_ready_ok!(pool.poll_ready()); - - // cycling a request should now work on svc2 - let mut fut = task::spawn(pool.call(())); - - assert_request_eq!(svc2, ()).send_response("foo"); - assert_eq!(assert_ready_ok!(fut.poll()), "foo"); - - // and again (still svc2) - svc2.allow(1); - assert_ready_ok!(pool.poll_ready()); - let mut fut = task::spawn(pool.call(())); - - assert_request_eq!(svc2, ()).send_response("foo"); - assert_eq!(assert_ready_ok!(fut.poll()), "foo"); -} - -#[tokio::test] -async fn failing_service() { - // start the pool - let (mock, handle) = mock::pair::<(), load::Constant, usize>>(); - pin_mut!(handle); - - let pool = Builder::new() - .urgency(1.0) // so _any_ Pending will add a service - .underutilized_below(0.0) // so no Ready will remove a service - .build(mock, ()); - - let mut pool = mock::Spawn::new(pool); - - assert_pending!(pool.poll_ready()); - - // give the pool a backing service - let (svc1_m, svc1) = mock::pair(); - pin_mut!(svc1); - - svc1.allow(1); - assert_request_eq!(handle, ()).send_response(load::Constant::new(svc1_m, 0)); - assert_ready_ok!(pool.poll_ready()); - - // one request-response cycle - let mut fut = task::spawn(pool.call(())); - - assert_request_eq!(svc1, ()).send_response("foo"); - assert_eq!(assert_ready_ok!(fut.poll()), "foo"); - - // now make svc1 fail, so it has to be removed - svc1.send_error("ouch"); - // polling now should recognize the failed service, - // try to create a new one, and then realize the maker isn't ready - assert_pending!(pool.poll_ready()); - // then we release another service - let (svc2_m, svc2) = mock::pair(); - pin_mut!(svc2); - - svc2.allow(1); - assert_request_eq!(handle, ()).send_response(load::Constant::new(svc2_m, 0)); - - // the pool should now be ready again - assert_ready_ok!(pool.poll_ready()); - // and a cycle should work (and go through svc2) - let mut fut = task::spawn(pool.call(())); - - assert_request_eq!(svc2, ()).send_response("bar"); - assert_eq!(assert_ready_ok!(fut.poll()), "bar"); -} diff --git a/tower/tower/src/discover/mod.rs b/tower/tower/src/discover.rs similarity index 94% rename from tower/tower/src/discover/mod.rs rename to tower/tower/src/discover.rs index 4ec9dac..f8e3e83 100644 --- a/tower/tower/src/discover/mod.rs +++ b/tower/tower/src/discover.rs @@ -1,8 +1,3 @@ -mod error; -mod list; - -pub use self::list::ServiceList; - use crate::sealed::Sealed; use futures_core::TryStream; use std::{ @@ -10,6 +5,10 @@ use std::{ task::{Context, Poll}, }; +mod error { + pub enum Never {} +} + pub trait Discover { type Key: Eq; type Service; diff --git a/tower/tower/src/discover/error.rs b/tower/tower/src/discover/error.rs deleted file mode 100644 index cec3de4..0000000 --- a/tower/tower/src/discover/error.rs +++ /dev/null @@ -1,12 +0,0 @@ -use std::{error::Error, fmt}; - -#[derive(Debug)] -pub enum Never {} - -impl fmt::Display for Never { - fn fmt(&self, _: &mut fmt::Formatter) -> fmt::Result { - match *self {} - } -} - -impl Error for Never {} diff --git a/tower/tower/src/discover/list.rs b/tower/tower/src/discover/list.rs deleted file mode 100644 index b419802..0000000 --- a/tower/tower/src/discover/list.rs +++ /dev/null @@ -1,61 +0,0 @@ -use super::{error::Never, Change}; -use futures_core::Stream; -use pin_project_lite::pin_project; -use std::iter::{Enumerate, IntoIterator}; -use std::{ - pin::Pin, - task::{Context, Poll}, -}; -use tower_service::Service; - -pin_project! { - /// Static service discovery based on a predetermined list of services. - /// - /// [`ServiceList`] is created with an initial list of services. The discovery - /// process will yield this list once and do nothing after. - #[derive(Debug)] - pub struct ServiceList - where - T: IntoIterator, - { - inner: Enumerate, - } -} - -impl ServiceList -where - T: IntoIterator, -{ - #[allow(missing_docs)] - pub fn new(services: T) -> ServiceList - where - U: Service, - { - ServiceList { - inner: services.into_iter().enumerate(), - } - } -} - -impl Stream for ServiceList -where - T: IntoIterator, -{ - type Item = Result, Never>; - - fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { - match self.project().inner.next() { - Some((i, service)) => Poll::Ready(Some(Ok(Change::Insert(i, service)))), - None => Poll::Ready(None), - } - } -} - -// check that List can be directly over collections -#[cfg(test)] -#[allow(dead_code)] -type ListVecTest = ServiceList>; - -#[cfg(test)] -#[allow(dead_code)] -type ListVecIterTest = ServiceList<::std::vec::IntoIter>; diff --git a/tower/tower/src/load/constant.rs b/tower/tower/src/load/constant.rs deleted file mode 100644 index a7c874e..0000000 --- a/tower/tower/src/load/constant.rs +++ /dev/null @@ -1,80 +0,0 @@ -//! A constant [`Load`] implementation. - -#[cfg(feature = "discover")] -use crate::discover::{Change, Discover}; -#[cfg(feature = "discover")] -use futures_core::{ready, Stream}; -#[cfg(feature = "discover")] -use std::pin::Pin; - -use super::Load; -use pin_project_lite::pin_project; -use std::task::{Context, Poll}; -use tower_service::Service; - -pin_project! { - #[derive(Debug)] - /// Wraps a type so that it implements [`Load`] and returns a constant load metric. - /// - /// This load estimator is primarily useful for testing. - pub struct Constant { - inner: T, - load: M, - } -} - -// ===== impl Constant ===== - -impl Constant { - /// Wraps a `T`-typed service with a constant `M`-typed load metric. - pub fn new(inner: T, load: M) -> Self { - Self { inner, load } - } -} - -impl Load for Constant { - type Metric = M; - - fn load(&self) -> M { - self.load - } -} - -impl Service for Constant -where - S: Service, - M: Copy, -{ - type Response = S::Response; - type Error = S::Error; - type Future = S::Future; - - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - self.inner.poll_ready(cx) - } - - fn call(&mut self, req: Request) -> Self::Future { - self.inner.call(req) - } -} - -/// Proxies [`Discover`] such that all changes are wrapped with a constant load. -#[cfg(feature = "discover")] -#[cfg_attr(docsrs, doc(cfg(feature = "discover")))] -impl Stream for Constant { - type Item = Result>, D::Error>; - - /// Yields the next discovery change set. - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - use self::Change::*; - - let this = self.project(); - let change = match ready!(Pin::new(this.inner).poll_discover(cx)).transpose()? { - None => return Poll::Ready(None), - Some(Insert(k, svc)) => Insert(k, Constant::new(svc, *this.load)), - Some(Remove(k)) => Remove(k), - }; - - Poll::Ready(Some(Ok(change))) - } -} diff --git a/tower/tower/src/load/mod.rs b/tower/tower/src/load/mod.rs index 2f9a373..85b17be 100644 --- a/tower/tower/src/load/mod.rs +++ b/tower/tower/src/load/mod.rs @@ -1,72 +1,9 @@ -//! Service load measurement -//! -//! This module provides the [`Load`] trait, which allows measuring how loaded a service is. -//! It also provides several wrapper types that measure load in different ways: -//! -//! - [`Constant`] — Always returns the same constant load value for a service. -//! - [`PendingRequests`] — Measures load by tracking the number of in-flight requests. -//! - [`PeakEwma`] — Measures load using a moving average of the peak latency for the service. -//! -//! In general, you will want to use one of these when using the types in [`tower::balance`] which -//! balance services depending on their load. Which load metric to use depends on your exact -//! use-case, but the ones above should get you quite far! -//! -//! When the `discover` feature is enabled, wrapper types for [`Discover`] that -//! wrap the discovered services with the given load estimator are also provided. -//! -//! # When does a request complete? -//! -//! For many applications, the request life-cycle is relatively simple: when a service responds to -//! a request, that request is done, and the system can forget about it. However, for some -//! applications, the service may respond to the initial request while other parts of the system -//! are still acting on that request. In such an application, the system load must take these -//! requests into account as well, or risk the system underestimating its own load. -//! -//! To support these use-cases, the load estimators in this module are parameterized by the -//! [`TrackCompletion`] trait, with [`CompleteOnResponse`] as the default type. The behavior of -//! [`CompleteOnResponse`] is what you would normally expect for a request-response cycle: when the -//! response is produced, the request is considered "finished", and load goes down. This can be -//! overriden by your own user-defined type to track more complex request completion semantics. See -//! the documentation for [`completion`] for more details. -//! -//! # Examples -//! -//! ```rust -//! # #[cfg(feature = "util")] -//! use tower::util::ServiceExt; -//! # #[cfg(feature = "util")] -//! use tower::{load::Load, Service}; -//! # #[cfg(feature = "util")] -//! async fn simple_balance( -//! svc1: &mut S1, -//! svc2: &mut S2, -//! request: R -//! ) -> Result -//! where -//! S1: Load + Service, -//! S2: Load + Service -//! { -//! if svc1.load() < svc2.load() { -//! svc1.ready().await?.call(request).await -//! } else { -//! svc2.ready().await?.call(request).await -//! } -//! } -//! ``` -//! -//! [`tower::balance`]: crate::balance -//! [`Discover`]: crate::discover::Discover -//! [`CompleteOnResponse`]: crate::load::completion::CompleteOnResponse -// TODO: a custom completion example would be good here - pub mod completion; -mod constant; pub mod peak_ewma; pub mod pending_requests; pub use self::{ completion::{CompleteOnResponse, TrackCompletion}, - constant::Constant, peak_ewma::PeakEwma, pending_requests::PendingRequests, }; @@ -74,16 +11,7 @@ pub use self::{ #[cfg(feature = "discover")] pub use self::{peak_ewma::PeakEwmaDiscover, pending_requests::PendingRequestsDiscover}; -/// Types that implement this trait can give an estimate of how loaded they are. -/// -/// See the module documentation for more details. pub trait Load { - /// A comparable load metric. - /// - /// Lesser values indicate that the service is less loaded, and should be preferred for new - /// requests over another service with a higher value. type Metric: PartialOrd; - - /// Estimate the service's current load. fn load(&self) -> Self::Metric; }