This commit is contained in:
nora 2022-09-15 21:27:27 +02:00
parent 40a6bd3bce
commit 9f18e8cb03
15 changed files with 53 additions and 947 deletions

View file

@ -1,37 +1,64 @@
//! This module defines a load-balanced pool of services that adds new services when load is high.
//!
//! The pool uses `poll_ready` as a signal indicating whether additional services should be spawned
//! to handle the current level of load. Specifically, every time `poll_ready` on the inner service
//! returns `Ready`, [`Pool`] consider that a 0, and every time it returns `Pending`, [`Pool`]
//! considers it a 1. [`Pool`] then maintains an [exponential moving
//! average](https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average) over those
//! samples, which gives an estimate of how often the underlying service has been ready when it was
//! needed "recently" (see [`Builder::urgency`]). If the service is loaded (see
//! [`Builder::loaded_above`]), a new service is created and added to the underlying [`Balance`].
//! If the service is underutilized (see [`Builder::underutilized_below`]) and there are two or
//! more services, then the latest added service is removed. In either case, the load estimate is
//! reset to its initial value (see [`Builder::initial`]) to prevent services from being rapidly
//! added or removed.
#![deny(missing_docs)]
use super::p2c::Balance;
use crate::discover::Change;
use crate::discover::{Change, Discover};
use crate::load::Load;
use crate::make::MakeService;
use futures_core::{ready, Stream};
use futures_core::ready;
use futures_core::Stream;
use futures_util::future::{self, TryFutureExt};
use pin_project_lite::pin_project;
use rand::{rngs::SmallRng, Rng, SeedableRng};
use slab::Slab;
use std::hash::Hash;
use std::marker::PhantomData;
use std::{
fmt,
future::Future,
marker::PhantomData,
pin::Pin,
task::{Context, Poll},
};
use tokio::sync::oneshot;
use tower_service::Service;
use tracing::{debug, trace};
// NOTE(review): this is a stub version of the p2c `Balance` service — the
// constructor and both `Service` methods are `todo!()`. The type parameters
// and trait bounds mirror the real implementation so that the rest of the
// crate keeps compiling.
pub struct Balance<D, Req> {
    // Zero-sized marker carrying the discover (`D`) and request (`Req`)
    // type parameters; no runtime state is stored yet.
    _req: PhantomData<(D, Req)>,
}

impl<D, Req> Balance<D, Req>
where
    D: Discover,
    D::Service: Service<Req>,
    <D::Service as Service<Req>>::Error: Into<crate::BoxError>,
{
    /// Constructs a balancer over the service set yielded by `discover`.
    ///
    /// Currently unimplemented: calling this panics via `todo!()`.
    pub fn new(discover: D) -> Self {
        todo!()
    }
}

impl<D, Req> Service<Req> for Balance<D, Req>
where
    D: Discover + Unpin,
    D::Key: Hash + Clone,
    D::Error: Into<crate::BoxError>,
    D::Service: Service<Req> + Load,
    <D::Service as Load>::Metric: std::fmt::Debug,
    <D::Service as Service<Req>>::Error: Into<crate::BoxError>,
{
    type Response = <D::Service as Service<Req>>::Response;
    type Error = crate::BoxError;
    // The future type maps the inner service's error into `BoxError`.
    type Future = future::MapErr<
        <D::Service as Service<Req>>::Future,
        fn(<D::Service as Service<Req>>::Error) -> crate::BoxError,
    >;

    /// Unimplemented: panics via `todo!()`.
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        todo!()
    }

    /// Unimplemented: panics via `todo!()`.
    fn call(&mut self, request: Req) -> Self::Future {
        todo!()
    }
}
/// A wrapper around `MakeService` that discovers a new service when load is high, and removes a
/// service when load is low. See [`Pool`].
pub struct PoolDiscoverer<MS, Target, Request>
where
MS: MakeService<Target, Request>,
@ -50,16 +77,9 @@ where
}
}
/// A [builder] that lets you configure how a [`Pool`] determines whether the underlying service is
/// loaded or not. See the [module-level documentation](self) and the builder's methods for
/// details.
///
/// [builder]: https://rust-lang-nursery.github.io/api-guidelines/type-safety.html#builders-enable-construction-of-complex-values-c-builder
#[derive(Copy, Clone, Debug)]
pub struct Builder {}
impl Builder {
/// See [`Pool::new`].
pub fn build<MS, Target, Request>() -> ()
where
MS: MakeService<Target, Request>,
@ -76,9 +96,7 @@ impl Builder {
}
}
/// A dynamically sized, load-balanced pool of `Service` instances.
pub struct Pool<MS, Target, Request> {
// the Pin<Box<_>> here is needed since Balance requires the Service to be Unpin
balance: (MS, Target, Request),
}
@ -106,8 +124,6 @@ where
}
}
#[doc(hidden)]
#[derive(Debug)]
pub struct DropNotifyService<Svc> {
svc: Svc,
id: usize,

View file

@ -1,21 +0,0 @@
//! Error types for the [`tower::balance`] middleware.
//!
//! [`tower::balance`]: crate::balance
use std::fmt;
/// The balancer's endpoint discovery stream failed.
///
/// Wraps the boxed error produced by the underlying discovery stream so
/// callers can reach the root cause through [`std::error::Error::source`].
#[derive(Debug)]
pub struct Discover(pub(crate) crate::BoxError);

impl fmt::Display for Discover {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "load balancer discovery error: {}", self.0)
    }
}

impl std::error::Error for Discover {
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        // Expose the wrapped discovery error as this error's source.
        Some(&*self.0)
    }
}

View file

@ -1,61 +0,0 @@
//! Middleware that allows balancing load among multiple services.
//!
//! In larger systems, multiple endpoints are often available for a given service. As load
//! increases, you want to ensure that that load is spread evenly across the available services.
//! Otherwise, clients could see spikes in latency if their request goes to a particularly loaded
//! service, even when spare capacity is available to handle that request elsewhere.
//!
//! This module provides two pieces of middleware that help with this type of load balancing:
//!
//! First, [`p2c`] implements the "[Power of Two Random Choices]" algorithm, a simple but robust
//! technique for spreading load across services with only inexact load measurements. Use this if
//! the set of available services is not within your control, and you simply want to spread load
//! among that set of services.
//!
//! [Power of Two Random Choices]: http://www.eecs.harvard.edu/~michaelm/postscripts/handbook2001.pdf
//!
//! Second, [`pool`] implements a dynamically sized pool of services. It estimates the overall
//! current load by tracking successful and unsuccessful calls to [`poll_ready`], and uses an
//! exponentially weighted moving average to add (using [`MakeService`]) or remove (by dropping)
//! services in response to increases or decreases in load. Use this if you are able to
//! dynamically add more service endpoints to the system to handle added load.
//!
//! # Examples
//!
//! ```rust
//! # #[cfg(feature = "util")]
//! # #[cfg(feature = "load")]
//! # fn warnings_are_errors() {
//! use tower::balance::p2c::Balance;
//! use tower::load::Load;
//! use tower::{Service, ServiceExt};
//! use futures_util::pin_mut;
//! # use futures_core::Stream;
//! # use futures_util::StreamExt;
//!
//! async fn spread<Req, S: Service<Req> + Load>(svc1: S, svc2: S, reqs: impl Stream<Item = Req>)
//! where
//! S::Error: Into<tower::BoxError>,
//! # // this bound is pretty unfortunate, and the compiler does _not_ help
//! S::Metric: std::fmt::Debug,
//! {
//! // Spread load evenly across the two services
//! let p2c = Balance::new(tower::discover::ServiceList::new(vec![svc1, svc2]));
//!
//! // Issue all the requests that come in.
//! // Some will go to svc1, some will go to svc2.
//! pin_mut!(reqs);
//! let mut responses = p2c.call_all(reqs);
//! while let Some(rsp) = responses.next().await {
//! // ...
//! }
//! }
//! # }
//! ```
//!
//! [`MakeService`]: crate::MakeService
//! [`poll_ready`]: crate::Service::poll_ready
pub mod error;
pub mod p2c;
pub mod pool;

View file

@ -1,60 +0,0 @@
use super::MakeBalance;
use std::{fmt, marker::PhantomData};
use tower_layer::Layer;
/// Construct load balancers ([`Balance`]) over dynamic service sets ([`Discover`]) produced by the
/// "inner" service in response to requests coming from the "outer" service.
///
/// This construction may seem a little odd at first glance. This is not a layer that takes
/// requests and produces responses in the traditional sense. Instead, it is more like
/// [`MakeService`] in that it takes service _descriptors_ (see `Target` on [`MakeService`])
/// and produces _services_. Since [`Balance`] spreads requests across a _set_ of services,
/// the inner service should produce a [`Discover`], not just a single
/// [`Service`], given a service descriptor.
///
/// See the [module-level documentation](crate::balance) for details on load balancing.
///
/// [`Balance`]: crate::balance::p2c::Balance
/// [`Discover`]: crate::discover::Discover
/// [`MakeService`]: crate::MakeService
/// [`Service`]: crate::Service
pub struct MakeBalanceLayer<D, Req> {
    // `fn(D, Req)` is a zero-sized marker: it ties the layer to its type
    // parameters without storing values of either type.
    _marker: PhantomData<fn(D, Req)>,
}

impl<D, Req> MakeBalanceLayer<D, Req> {
    /// Build balancers using operating system entropy.
    pub fn new() -> Self {
        Self {
            _marker: PhantomData,
        }
    }
}

impl<D, Req> Default for MakeBalanceLayer<D, Req> {
    /// Equivalent to [`MakeBalanceLayer::new`].
    fn default() -> Self {
        Self::new()
    }
}

// Implemented by hand (instead of `#[derive(Clone)]`) so that cloning does
// not require `D: Clone` or `Req: Clone`.
impl<D, Req> Clone for MakeBalanceLayer<D, Req> {
    fn clone(&self) -> Self {
        Self {
            _marker: PhantomData,
        }
    }
}

impl<S, Req> Layer<S> for MakeBalanceLayer<S, Req> {
    type Service = MakeBalance<S, Req>;

    /// Wraps the inner make-service in a [`MakeBalance`].
    fn layer(&self, make_discover: S) -> Self::Service {
        MakeBalance::new(make_discover)
    }
}

// Manual `Debug` for the same reason as `Clone`: no bounds on `D`/`Req`.
impl<D, Req> fmt::Debug for MakeBalanceLayer<D, Req> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("MakeBalanceLayer").finish()
    }
}

View file

@ -1,125 +0,0 @@
use super::Balance;
use crate::discover::Discover;
use futures_core::ready;
use pin_project_lite::pin_project;
use std::hash::Hash;
use std::marker::PhantomData;
use std::{
fmt,
future::Future,
pin::Pin,
task::{Context, Poll},
};
use tower_service::Service;
/// Constructs load balancers over dynamic service sets produced by a wrapped "inner" service.
///
/// This is effectively an implementation of [`MakeService`] except that it forwards the service
/// descriptors (`Target`) to an inner service (`S`), and expects that service to produce a
/// service set in the form of a [`Discover`]. It then wraps the service set in a [`Balance`]
/// before returning it as the "made" service.
///
/// See the [module-level documentation](crate::balance) for details on load balancing.
///
/// [`MakeService`]: crate::MakeService
/// [`Discover`]: crate::discover::Discover
/// [`Balance`]: crate::balance::p2c::Balance
pub struct MakeBalance<S, Req> {
    // The inner service that produces a `Discover` for each target.
    inner: S,
    // `fn(Req)` marker: records the request type without storing a request
    // (and without imposing auto-trait requirements a stored `Req` would).
    _marker: PhantomData<fn(Req)>,
}

pin_project! {
    /// A [`Balance`] in the making.
    ///
    /// Resolves once the wrapped future produces the service set, then wraps
    /// it in a [`Balance`].
    ///
    /// [`Balance`]: crate::balance::p2c::Balance
    pub struct MakeFuture<F, Req> {
        // The inner future producing the `Discover`; pinned because we poll it.
        #[pin]
        inner: F,
        // Same request-type marker as on `MakeBalance`.
        _marker: PhantomData<fn(Req)>,
    }
}
impl<S, Req> MakeBalance<S, Req> {
/// Build balancers using operating system entropy.
pub fn new(make_discover: S) -> Self {
Self {
inner: make_discover,
_marker: PhantomData,
}
}
}
impl<S, Req> Clone for MakeBalance<S, Req>
where
S: Clone,
{
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
_marker: PhantomData,
}
}
}
impl<S, Target, Req> Service<Target> for MakeBalance<S, Req>
where
    S: Service<Target>,
    S::Response: Discover,
    <S::Response as Discover>::Key: Hash,
    <S::Response as Discover>::Service: Service<Req>,
    <<S::Response as Discover>::Service as Service<Req>>::Error: Into<crate::BoxError>,
{
    type Response = Balance<S::Response, Req>;
    type Error = S::Error;
    type Future = MakeFuture<S::Future, Req>;

    /// Readiness mirrors the inner make-service's readiness.
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.inner.poll_ready(cx)
    }

    /// Asks the inner service for the service set corresponding to `target`;
    /// the returned future resolves to a [`Balance`] over that set.
    fn call(&mut self, target: Target) -> Self::Future {
        MakeFuture {
            inner: self.inner.call(target),
            _marker: PhantomData,
        }
    }
}
impl<S, Req> fmt::Debug for MakeBalance<S, Req>
where
    S: fmt::Debug,
{
    /// Formats as `MakeBalance { inner: .. }`; the request-type marker is
    /// intentionally omitted from the output.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("MakeBalance")
            .field("inner", &self.inner)
            .finish()
    }
}
impl<F, T, E, Req> Future for MakeFuture<F, Req>
where
    F: Future<Output = Result<T, E>>,
    T: Discover,
    <T as Discover>::Key: Hash,
    <T as Discover>::Service: Service<Req>,
    <<T as Discover>::Service as Service<Req>>::Error: Into<crate::BoxError>,
{
    type Output = Result<Balance<T, Req>, E>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();
        // Wait for the inner future to yield the `Discover`, propagating its
        // error unchanged, then wrap the resulting service set in a balancer.
        let inner = ready!(this.inner.poll(cx))?;
        let svc = Balance::new(inner);
        Poll::Ready(Ok(svc))
    }
}
impl<F, Req> fmt::Debug for MakeFuture<F, Req>
where
    F: fmt::Debug,
{
    /// Formats as `MakeFuture { inner: .. }`; the request-type marker is
    /// intentionally omitted from the output.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("MakeFuture")
            .field("inner", &self.inner)
            .finish()
    }
}

View file

@ -1,41 +0,0 @@
//! This module implements the "[Power of Two Random Choices]" load balancing algorithm.
//!
//! It is a simple but robust technique for spreading load across services with only inexact load
//! measurements. As its name implies, whenever a request comes in, it samples two ready services
//! at random, and issues the request to whichever service is less loaded. How loaded a service is
//! is determined by the return value of [`Load`](crate::load::Load).
//!
//! As described in the [Finagle Guide][finagle]:
//!
//! > The algorithm randomly picks two services from the set of ready endpoints and
//! > selects the least loaded of the two. By repeatedly using this strategy, we can
//! > expect a manageable upper bound on the maximum load of any server.
//! >
//! > The maximum load variance between any two servers is bound by `ln(ln(n))` where
//! > `n` is the number of servers in the cluster.
//!
//! The balance service and layer implementations rely on _service discovery_ to provide the
//! underlying set of services to balance requests across. This happens through the
//! [`Discover`](crate::discover::Discover) trait, which is essentially a [`Stream`] that indicates
//! when services become available or go away. If you have a fixed set of services, consider using
//! [`ServiceList`](crate::discover::ServiceList).
//!
//! Since the load balancer needs to perform _random_ choices, the constructors in this module
//! usually come in two forms: one that uses randomness provided by the operating system, and one
//! that lets you specify the random seed to use. Usually the former is what you'll want, though
//! the latter may come in handy for reproducibility or to reduce reliance on the operating system.
//!
//! [Power of Two Random Choices]: http://www.eecs.harvard.edu/~michaelm/postscripts/handbook2001.pdf
//! [finagle]: https://twitter.github.io/finagle/guide/Clients.html#power-of-two-choices-p2c-least-loaded
//! [`Stream`]: https://docs.rs/futures/0.3/futures/stream/trait.Stream.html
mod layer;
mod make;
mod service;
#[cfg(test)]
mod test;
pub use layer::MakeBalanceLayer;
pub use make::{MakeBalance, MakeFuture};
pub use service::Balance;

View file

@ -1,58 +0,0 @@
use super::super::error;
use crate::discover::{Change, Discover};
use crate::load::Load;
use futures_core::ready;
use futures_util::future::{self, TryFutureExt};
use pin_project_lite::pin_project;
use rand::{rngs::SmallRng, Rng, SeedableRng};
use std::hash::Hash;
use std::marker::PhantomData;
use std::{
fmt,
future::Future,
pin::Pin,
task::{Context, Poll},
};
use tokio::sync::oneshot;
use tower_service::Service;
use tracing::{debug, trace};
// NOTE(review): a stubbed-out `Balance` — the constructor and both `Service`
// methods are `todo!()`. Type parameters and bounds match the real p2c
// balancer's public interface so dependent code still type-checks.
pub struct Balance<D, Req> {
    // Zero-sized marker for the discover (`D`) and request (`Req`) types;
    // no runtime state is stored yet.
    _req: PhantomData<(D, Req)>,
}

impl<D, Req> Balance<D, Req>
where
    D: Discover,
    D::Service: Service<Req>,
    <D::Service as Service<Req>>::Error: Into<crate::BoxError>,
{
    /// Constructs a balancer over the service set yielded by `discover`.
    ///
    /// Currently unimplemented: calling this panics via `todo!()`.
    pub fn new(discover: D) -> Self {
        todo!()
    }
}

impl<D, Req> Service<Req> for Balance<D, Req>
where
    D: Discover + Unpin,
    D::Key: Hash + Clone,
    D::Error: Into<crate::BoxError>,
    D::Service: Service<Req> + Load,
    <D::Service as Load>::Metric: std::fmt::Debug,
    <D::Service as Service<Req>>::Error: Into<crate::BoxError>,
{
    type Response = <D::Service as Service<Req>>::Response;
    type Error = crate::BoxError;
    // The future type maps the inner service's error into `BoxError`.
    type Future = future::MapErr<
        <D::Service as Service<Req>>::Future,
        fn(<D::Service as Service<Req>>::Error) -> crate::BoxError,
    >;

    /// Unimplemented: panics via `todo!()`.
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        todo!()
    }

    /// Unimplemented: panics via `todo!()`.
    fn call(&mut self, request: Req) -> Self::Future {
        todo!()
    }
}

View file

@ -1,125 +0,0 @@
use crate::discover::ServiceList;
use crate::load;
use futures_util::pin_mut;
use std::task::Poll;
use tokio_test::{assert_pending, assert_ready, assert_ready_ok, task};
use tower_test::{assert_request_eq, mock};
use super::*;
// An empty service set must never report readiness.
#[tokio::test]
async fn empty() {
    let empty: Vec<load::Constant<mock::Mock<(), &'static str>, usize>> = vec![];
    let disco = ServiceList::new(empty);
    let mut svc = mock::Spawn::new(Balance::new(disco));
    assert_pending!(svc.poll_ready());
}

// A single endpoint: the balancer discovers it, routes through it, and
// drops it once it fails.
#[tokio::test]
async fn single_endpoint() {
    let (mut svc, mut handle) = mock::spawn_with(|s| {
        let mock = load::Constant::new(s, 0);
        let disco = ServiceList::new(vec![mock].into_iter());
        Balance::new(disco)
    });

    // Endpoint is discovered but not yet ready.
    handle.allow(0);
    assert_pending!(svc.poll_ready());
    assert_eq!(
        svc.get_ref().len(),
        1,
        "balancer must have discovered endpoint"
    );

    // One request-response cycle through the single endpoint.
    handle.allow(1);
    assert_ready_ok!(svc.poll_ready());
    let mut fut = task::spawn(svc.call(()));
    assert_request_eq!(handle, ()).send_response(1);
    assert_eq!(assert_ready_ok!(fut.poll()), 1);

    // A failing endpoint must be evicted from the set.
    handle.allow(1);
    assert_ready_ok!(svc.poll_ready());
    handle.send_error("endpoint lost");
    assert_pending!(svc.poll_ready());
    assert!(
        svc.get_ref().is_empty(),
        "balancer must drop failed endpoints"
    );
}

// Two equally-loaded endpoints: readiness and routing must work whichever
// endpoint is available, and a failed endpoint must be removed.
#[tokio::test]
async fn two_endpoints_with_equal_load() {
    let (mock_a, handle_a) = mock::pair();
    let (mock_b, handle_b) = mock::pair();
    let mock_a = load::Constant::new(mock_a, 1);
    let mock_b = load::Constant::new(mock_b, 1);
    pin_mut!(handle_a);
    pin_mut!(handle_b);
    let disco = ServiceList::new(vec![mock_a, mock_b].into_iter());
    let mut svc = mock::Spawn::new(Balance::new(disco));

    // Neither endpoint is ready yet, but both should be discovered.
    handle_a.allow(0);
    handle_b.allow(0);
    assert_pending!(svc.poll_ready());
    assert_eq!(
        svc.get_ref().len(),
        2,
        "balancer must have discovered both endpoints"
    );

    // Only endpoint A is ready: requests must go to A.
    handle_a.allow(1);
    handle_b.allow(0);
    assert_ready_ok!(
        svc.poll_ready(),
        "must be ready when one of two services is ready"
    );
    {
        let mut fut = task::spawn(svc.call(()));
        assert_request_eq!(handle_a, ()).send_response("a");
        assert_eq!(assert_ready_ok!(fut.poll()), "a");
    }

    // Only endpoint B is ready: requests must go to B.
    handle_a.allow(0);
    handle_b.allow(1);
    assert_ready_ok!(
        svc.poll_ready(),
        "must be ready when both endpoints are ready"
    );
    {
        let mut fut = task::spawn(svc.call(()));
        assert_request_eq!(handle_b, ()).send_response("b");
        assert_eq!(assert_ready_ok!(fut.poll()), "b");
    }

    // Both ready: each request is served by whichever endpoint received it.
    handle_a.allow(1);
    handle_b.allow(1);
    for _ in 0..2 {
        assert_ready_ok!(
            svc.poll_ready(),
            "must be ready when both endpoints are ready"
        );
        let mut fut = task::spawn(svc.call(()));
        for (ref mut h, c) in &mut [(&mut handle_a, "a"), (&mut handle_b, "b")] {
            if let Poll::Ready(Some((_, tx))) = h.as_mut().poll_request() {
                tracing::info!("using {}", c);
                tx.send_response(c);
                h.allow(0);
            }
        }
        assert_ready_ok!(fut.poll());
    }

    // Failing endpoint A must leave only one endpoint in the set.
    handle_a.send_error("endpoint lost");
    assert_pending!(svc.poll_ready());
    assert_eq!(
        svc.get_ref().len(),
        1,
        "balancer must drop failed endpoints",
    );
}

View file

@ -1,190 +0,0 @@
use crate::load;
use futures_util::pin_mut;
use tokio_test::{assert_pending, assert_ready, assert_ready_ok, task};
use tower_test::{assert_request_eq, mock};
use super::*;
// A pool with default settings serves one request through its single
// backing service.
#[tokio::test]
async fn basic() {
    // start the pool
    let (mock, handle) = mock::pair::<(), load::Constant<mock::Mock<(), &'static str>, usize>>();
    pin_mut!(handle);
    let mut pool = mock::Spawn::new(Builder::new().build(mock, ()));
    assert_pending!(pool.poll_ready());
    // give the pool a backing service
    let (svc1_m, svc1) = mock::pair();
    pin_mut!(svc1);
    assert_request_eq!(handle, ()).send_response(load::Constant::new(svc1_m, 0));
    assert_ready_ok!(pool.poll_ready());
    // send a request to the one backing service
    let mut fut = task::spawn(pool.call(()));
    assert_pending!(fut.poll());
    assert_request_eq!(svc1, ()).send_response("foobar");
    assert_eq!(assert_ready_ok!(fut.poll()), "foobar");
}

// Under high load the pool must grow by asking the maker for more
// services, but never past `max_services`.
#[tokio::test]
async fn high_load() {
    // start the pool
    let (mock, handle) = mock::pair::<(), load::Constant<mock::Mock<(), &'static str>, usize>>();
    pin_mut!(handle);
    let pool = Builder::new()
        .urgency(1.0) // so _any_ Pending will add a service
        .underutilized_below(0.0) // so no Ready will remove a service
        .max_services(Some(2))
        .build(mock, ());
    let mut pool = mock::Spawn::new(pool);
    assert_pending!(pool.poll_ready());
    // give the pool a backing service
    let (svc1_m, svc1) = mock::pair();
    pin_mut!(svc1);
    svc1.allow(1);
    assert_request_eq!(handle, ()).send_response(load::Constant::new(svc1_m, 0));
    assert_ready_ok!(pool.poll_ready());
    // make the one backing service not ready
    let mut fut1 = task::spawn(pool.call(()));
    // if we poll_ready again, pool should notice that load is increasing
    // since urgency == 1.0, it should immediately enter high load
    assert_pending!(pool.poll_ready());
    // it should ask the maker for another service, so we give it one
    let (svc2_m, svc2) = mock::pair();
    pin_mut!(svc2);
    svc2.allow(1);
    assert_request_eq!(handle, ()).send_response(load::Constant::new(svc2_m, 0));
    // the pool should now be ready again for one more request
    assert_ready_ok!(pool.poll_ready());
    let mut fut2 = task::spawn(pool.call(()));
    assert_pending!(pool.poll_ready());
    // the pool should _not_ try to add another service
    // since we have max_services(2)
    assert_pending!(handle.as_mut().poll_request());
    // let's see that each service got one request
    assert_request_eq!(svc1, ()).send_response("foo");
    assert_request_eq!(svc2, ()).send_response("bar");
    assert_eq!(assert_ready_ok!(fut1.poll()), "foo");
    assert_eq!(assert_ready_ok!(fut2.poll()), "bar");
}

// Under low load the pool must shrink back down, but never below one
// service.
#[tokio::test]
async fn low_load() {
    // start the pool
    let (mock, handle) = mock::pair::<(), load::Constant<mock::Mock<(), &'static str>, usize>>();
    pin_mut!(handle);
    let pool = Builder::new()
        .urgency(1.0) // so any event will change the service count
        .build(mock, ());
    let mut pool = mock::Spawn::new(pool);
    assert_pending!(pool.poll_ready());
    // give the pool a backing service
    let (svc1_m, svc1) = mock::pair();
    pin_mut!(svc1);
    svc1.allow(1);
    assert_request_eq!(handle, ()).send_response(load::Constant::new(svc1_m, 0));
    assert_ready_ok!(pool.poll_ready());
    // cycling a request should now work
    let mut fut = task::spawn(pool.call(()));
    assert_request_eq!(svc1, ()).send_response("foo");
    assert_eq!(assert_ready_ok!(fut.poll()), "foo");
    // and pool should now not be ready (since svc1 isn't ready)
    // it should immediately try to add another service
    // which we give it
    assert_pending!(pool.poll_ready());
    let (svc2_m, svc2) = mock::pair();
    pin_mut!(svc2);
    svc2.allow(1);
    assert_request_eq!(handle, ()).send_response(load::Constant::new(svc2_m, 0));
    // pool is now ready
    // which (because of urgency == 1.0) should immediately cause it to drop a service
    // it'll drop svc1, so it'll still be ready
    assert_ready_ok!(pool.poll_ready());
    // and even with another ready, it won't drop svc2 since it's now the only service
    assert_ready_ok!(pool.poll_ready());
    // cycling a request should now work on svc2
    let mut fut = task::spawn(pool.call(()));
    assert_request_eq!(svc2, ()).send_response("foo");
    assert_eq!(assert_ready_ok!(fut.poll()), "foo");
    // and again (still svc2)
    svc2.allow(1);
    assert_ready_ok!(pool.poll_ready());
    let mut fut = task::spawn(pool.call(()));
    assert_request_eq!(svc2, ()).send_response("foo");
    assert_eq!(assert_ready_ok!(fut.poll()), "foo");
}

// A failed backing service must be replaced by asking the maker for a
// new one.
#[tokio::test]
async fn failing_service() {
    // start the pool
    let (mock, handle) = mock::pair::<(), load::Constant<mock::Mock<(), &'static str>, usize>>();
    pin_mut!(handle);
    let pool = Builder::new()
        .urgency(1.0) // so _any_ Pending will add a service
        .underutilized_below(0.0) // so no Ready will remove a service
        .build(mock, ());
    let mut pool = mock::Spawn::new(pool);
    assert_pending!(pool.poll_ready());
    // give the pool a backing service
    let (svc1_m, svc1) = mock::pair();
    pin_mut!(svc1);
    svc1.allow(1);
    assert_request_eq!(handle, ()).send_response(load::Constant::new(svc1_m, 0));
    assert_ready_ok!(pool.poll_ready());
    // one request-response cycle
    let mut fut = task::spawn(pool.call(()));
    assert_request_eq!(svc1, ()).send_response("foo");
    assert_eq!(assert_ready_ok!(fut.poll()), "foo");
    // now make svc1 fail, so it has to be removed
    svc1.send_error("ouch");
    // polling now should recognize the failed service,
    // try to create a new one, and then realize the maker isn't ready
    assert_pending!(pool.poll_ready());
    // then we release another service
    let (svc2_m, svc2) = mock::pair();
    pin_mut!(svc2);
    svc2.allow(1);
    assert_request_eq!(handle, ()).send_response(load::Constant::new(svc2_m, 0));
    // the pool should now be ready again
    assert_ready_ok!(pool.poll_ready());
    // and a cycle should work (and go through svc2)
    let mut fut = task::spawn(pool.call(()));
    assert_request_eq!(svc2, ()).send_response("bar");
    assert_eq!(assert_ready_ok!(fut.poll()), "bar");
}

View file

@ -1,8 +1,3 @@
mod error;
mod list;
pub use self::list::ServiceList;
use crate::sealed::Sealed;
use futures_core::TryStream;
use std::{
@ -10,6 +5,10 @@ use std::{
task::{Context, Poll},
};
// Private inline replacement for what appears to have been a separate
// `error` submodule (NOTE(review): inferred from the surrounding diff —
// confirm against the original module layout).
mod error {
    /// An uninhabited error type: no value of `Never` can ever be
    /// constructed, so a `Result<_, Never>` is statically always `Ok`.
    pub enum Never {}
}
pub trait Discover {
type Key: Eq;
type Service;

View file

@ -1,12 +0,0 @@
use std::{error::Error, fmt};
/// An uninhabited error type: no value of `Never` can ever exist, so any
/// `Result<_, Never>` is statically known to be `Ok`.
#[derive(Debug)]
pub enum Never {}

impl fmt::Display for Never {
    fn fmt(&self, _: &mut fmt::Formatter) -> fmt::Result {
        // An empty match on an uninhabited type — this body can never run,
        // and the compiler accepts the match as exhaustive.
        match *self {}
    }
}

impl Error for Never {}

View file

@ -1,61 +0,0 @@
use super::{error::Never, Change};
use futures_core::Stream;
use pin_project_lite::pin_project;
use std::iter::{Enumerate, IntoIterator};
use std::{
pin::Pin,
task::{Context, Poll},
};
use tower_service::Service;
pin_project! {
    /// Static service discovery based on a predetermined list of services.
    ///
    /// [`ServiceList`] is created with an initial list of services. The discovery
    /// process will yield this list once and do nothing after.
    #[derive(Debug)]
    pub struct ServiceList<T>
    where
        T: IntoIterator,
    {
        // Enumerated iterator over the remaining services; the position of
        // each service in the input list is used as its discovery key.
        inner: Enumerate<T::IntoIter>,
    }
}
impl<T, U> ServiceList<T>
where
    T: IntoIterator<Item = U>,
{
    /// Creates a discovery stream that announces each of `services` exactly
    /// once, keyed by its position in the input.
    #[allow(missing_docs)]
    pub fn new<Request>(services: T) -> ServiceList<T>
    where
        U: Service<Request>,
    {
        let inner = services.into_iter().enumerate();
        ServiceList { inner }
    }
}
impl<T, U> Stream for ServiceList<T>
where
    T: IntoIterator<Item = U>,
{
    type Item = Result<Change<usize, U>, Never>;

    /// Announces each remaining service as an `Insert` keyed by its original
    /// index; once the list is exhausted the stream ends (never pending).
    fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let next = self
            .project()
            .inner
            .next()
            .map(|(index, service)| Ok(Change::Insert(index, service)));
        Poll::Ready(next)
    }
}
// Compile-time check that `ServiceList` can be used directly over owned
// collections as well as over their iterators.
#[cfg(test)]
#[allow(dead_code)]
type ListVecTest<T> = ServiceList<Vec<T>>;

#[cfg(test)]
#[allow(dead_code)]
type ListVecIterTest<T> = ServiceList<::std::vec::IntoIter<T>>;

View file

@ -1,80 +0,0 @@
//! A constant [`Load`] implementation.
#[cfg(feature = "discover")]
use crate::discover::{Change, Discover};
#[cfg(feature = "discover")]
use futures_core::{ready, Stream};
#[cfg(feature = "discover")]
use std::pin::Pin;
use super::Load;
use pin_project_lite::pin_project;
use std::task::{Context, Poll};
use tower_service::Service;
pin_project! {
    #[derive(Debug)]
    /// Wraps a type so that it implements [`Load`] and returns a constant load metric.
    ///
    /// This load estimator is primarily useful for testing.
    pub struct Constant<T, M> {
        // The wrapped service (or, with the `discover` feature, a
        // discovery stream).
        inner: T,
        // The fixed metric reported for every `load()` call.
        load: M,
    }
}

// ===== impl Constant =====

impl<T, M: Copy> Constant<T, M> {
    /// Wraps a `T`-typed service with a constant `M`-typed load metric.
    pub fn new(inner: T, load: M) -> Self {
        Self { inner, load }
    }
}

impl<T, M: Copy + PartialOrd> Load for Constant<T, M> {
    type Metric = M;

    /// Always reports the metric this wrapper was constructed with.
    fn load(&self) -> M {
        self.load
    }
}

// `Service` is forwarded unchanged to the wrapped service; only the load
// estimate is affected by this wrapper.
impl<S, M, Request> Service<Request> for Constant<S, M>
where
    S: Service<Request>,
    M: Copy,
{
    type Response = S::Response;
    type Error = S::Error;
    type Future = S::Future;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.inner.poll_ready(cx)
    }

    fn call(&mut self, req: Request) -> Self::Future {
        self.inner.call(req)
    }
}

/// Proxies [`Discover`] such that all changes are wrapped with a constant load.
#[cfg(feature = "discover")]
#[cfg_attr(docsrs, doc(cfg(feature = "discover")))]
impl<D: Discover + Unpin, M: Copy> Stream for Constant<D, M> {
    type Item = Result<Change<D::Key, Constant<D::Service, M>>, D::Error>;

    /// Yields the next discovery change set.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        use self::Change::*;
        let this = self.project();
        // Newly inserted services are wrapped in `Constant` carrying this
        // estimator's metric; removals and stream termination pass through
        // untouched, as do errors (via `?`).
        let change = match ready!(Pin::new(this.inner).poll_discover(cx)).transpose()? {
            None => return Poll::Ready(None),
            Some(Insert(k, svc)) => Insert(k, Constant::new(svc, *this.load)),
            Some(Remove(k)) => Remove(k),
        };
        Poll::Ready(Some(Ok(change)))
    }
}

View file

@ -1,72 +1,9 @@
//! Service load measurement
//!
//! This module provides the [`Load`] trait, which allows measuring how loaded a service is.
//! It also provides several wrapper types that measure load in different ways:
//!
//! - [`Constant`] — Always returns the same constant load value for a service.
//! - [`PendingRequests`] — Measures load by tracking the number of in-flight requests.
//! - [`PeakEwma`] — Measures load using a moving average of the peak latency for the service.
//!
//! In general, you will want to use one of these when using the types in [`tower::balance`] which
//! balance services depending on their load. Which load metric to use depends on your exact
//! use-case, but the ones above should get you quite far!
//!
//! When the `discover` feature is enabled, wrapper types for [`Discover`] that
//! wrap the discovered services with the given load estimator are also provided.
//!
//! # When does a request complete?
//!
//! For many applications, the request life-cycle is relatively simple: when a service responds to
//! a request, that request is done, and the system can forget about it. However, for some
//! applications, the service may respond to the initial request while other parts of the system
//! are still acting on that request. In such an application, the system load must take these
//! requests into account as well, or risk the system underestimating its own load.
//!
//! To support these use-cases, the load estimators in this module are parameterized by the
//! [`TrackCompletion`] trait, with [`CompleteOnResponse`] as the default type. The behavior of
//! [`CompleteOnResponse`] is what you would normally expect for a request-response cycle: when the
//! response is produced, the request is considered "finished", and load goes down. This can be
//! overridden by your own user-defined type to track more complex request completion semantics. See
//! the documentation for [`completion`] for more details.
//!
//! # Examples
//!
//! ```rust
//! # #[cfg(feature = "util")]
//! use tower::util::ServiceExt;
//! # #[cfg(feature = "util")]
//! use tower::{load::Load, Service};
//! # #[cfg(feature = "util")]
//! async fn simple_balance<S1, S2, R>(
//! svc1: &mut S1,
//! svc2: &mut S2,
//! request: R
//! ) -> Result<S1::Response, S1::Error>
//! where
//! S1: Load + Service<R>,
//! S2: Load<Metric = S1::Metric> + Service<R, Response = S1::Response, Error = S1::Error>
//! {
//! if svc1.load() < svc2.load() {
//! svc1.ready().await?.call(request).await
//! } else {
//! svc2.ready().await?.call(request).await
//! }
//! }
//! ```
//!
//! [`tower::balance`]: crate::balance
//! [`Discover`]: crate::discover::Discover
//! [`CompleteOnResponse`]: crate::load::completion::CompleteOnResponse
// TODO: a custom completion example would be good here
pub mod completion;
mod constant;
pub mod peak_ewma;
pub mod pending_requests;
pub use self::{
completion::{CompleteOnResponse, TrackCompletion},
constant::Constant,
peak_ewma::PeakEwma,
pending_requests::PendingRequests,
};
@ -74,16 +11,7 @@ pub use self::{
#[cfg(feature = "discover")]
pub use self::{peak_ewma::PeakEwmaDiscover, pending_requests::PendingRequestsDiscover};
/// Types that implement this trait can give an estimate of how loaded they are.
///
/// See the module documentation for more details.
pub trait Load {
    /// A comparable load metric.
    ///
    /// Lesser values indicate that the service is less loaded, and should be preferred for new
    /// requests over another service with a higher value.
    type Metric: PartialOrd;

    /// Estimate the service's current load.
    ///
    /// NOTE(review): balancers compare these values when picking a service
    /// (per the module docs), so implementations should be cheap to call.
    fn load(&self) -> Self::Metric;
}