tower lol

This commit is contained in:
nora 2022-09-15 21:21:20 +02:00
parent 1e415960ec
commit 40a6bd3bce
49 changed files with 4626 additions and 1 deletions

1
tower

@ -1 +0,0 @@
Subproject commit 4dc34c8d57b1e448c29cccb9f91468f6105ae461

2
tower/.gitignore vendored Normal file
View file

@ -0,0 +1,2 @@
target
Cargo.lock

8
tower/Cargo.toml Normal file
View file

@ -0,0 +1,8 @@
[workspace]
members = [
"tower",
"tower-layer",
"tower-service",
"tower-test",
]

25
tower/LICENSE Normal file
View file

@ -0,0 +1,25 @@
Copyright (c) 2019 Tower Contributors
Permission is hereby granted, free of charge, to any
person obtaining a copy of this software and associated
documentation files (the "Software"), to deal in the
Software without restriction, including without
limitation the rights to use, copy, modify, merge,
publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software
is furnished to do so, subject to the following
conditions:
The above copyright notice and this permission notice
shall be included in all copies or substantial portions
of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.

53
tower/README.md Normal file
View file

@ -0,0 +1,53 @@
# Tower
Tower is a library of modular and reusable components for building robust
networking clients and servers.
[![Crates.io][crates-badge]][crates-url]
[![Documentation][docs-badge]][docs-url]
[![Documentation (master)][docs-master-badge]][docs-master-url]
[![MIT licensed][mit-badge]][mit-url]
[![Build Status][actions-badge]][actions-url]
[![Discord chat][discord-badge]][discord-url]
[crates-badge]: https://img.shields.io/crates/v/tower.svg
[crates-url]: https://crates.io/crates/tower
[docs-badge]: https://docs.rs/tower/badge.svg
[docs-url]: https://docs.rs/tower
[docs-master-badge]: https://img.shields.io/badge/docs-master-blue
[docs-master-url]: https://tower-rs.github.io/tower/tower
[mit-badge]: https://img.shields.io/badge/license-MIT-blue.svg
[mit-url]: LICENSE
[actions-badge]: https://github.com/tower-rs/tower/workflows/CI/badge.svg
[actions-url]:https://github.com/tower-rs/tower/actions?query=workflow%3ACI
[discord-badge]: https://img.shields.io/discord/500028886025895936?logo=discord&label=discord&logoColor=white
[discord-url]: https://discord.gg/EeF3cQw
## Overview
Tower aims to make it as easy as possible to build robust networking clients and
servers. It is protocol agnostic, but is designed around a request / response
pattern. If your protocol is entirely stream based, Tower may not be a good fit.
## Supported Rust Versions
Tower will keep a rolling MSRV (minimum supported Rust version) policy of **at
least** 6 months. When increasing the MSRV, the new Rust version must have been
released at least six months ago. The current MSRV is 1.49.0.
## Getting Started
If you're brand new to Tower and want to start with the basics we recommend you
check out some of our [guides].
## License
This project is licensed under the [MIT license](LICENSE).
### Contribution
Unless you explicitly state otherwise, any contribution intentionally submitted
for inclusion in Tower by you, shall be licensed as MIT, without any additional
terms or conditions.
[guides]: https://github.com/tower-rs/tower/tree/master/guides

23
tower/deny.toml Normal file
View file

@ -0,0 +1,23 @@
[advisories]
vulnerability = "deny"
unmaintained = "warn"
notice = "warn"
ignore = ["RUSTSEC-2020-0159"]
[licenses]
unlicensed = "deny"
allow = []
deny = []
copyleft = "warn"
allow-osi-fsf-free = "either"
confidence-threshold = 0.8
[bans]
multiple-versions = "deny"
highlight = "all"
skip = []
[sources]
unknown-registry = "warn"
unknown-git = "warn"
allow-git = []

8
tower/netlify.toml Normal file
View file

@ -0,0 +1,8 @@
[build]
command = "rustup install nightly --profile minimal && cargo doc --features=full --no-deps && cp -r target/doc _netlify_out"
environment = { RUSTDOCFLAGS= "--cfg docsrs" }
publish = "_netlify_out"
[[redirects]]
from = "/"
to = "/tower"

View file

@ -0,0 +1,30 @@
# 0.3.1 (January 7, 2021)
### Added
- Added `layer_fn`, for constructing a `Layer` from a function taking
a `Service` and returning a different `Service` ([#491])
- Added an implementation of `Layer` for `&Layer` ([#446])
- Multiple documentation improvements ([#487], [#490])
[#491]: https://github.com/tower-rs/tower/pull/491
[#446]: https://github.com/tower-rs/tower/pull/446
[#487]: https://github.com/tower-rs/tower/pull/487
[#490]: https://github.com/tower-rs/tower/pull/490
# 0.3.0 (November 29, 2019)
- Move layer builder from `tower-util` to tower-layer.
# 0.3.0-alpha.2 (September 30, 2019)
- Move to `futures-*-preview 0.3.0-alpha.19`
- Move to `pin-project 0.4`
# 0.3.0-alpha.1 (September 11, 2019)
- Move to `std::future`
# 0.1.0 (April 26, 2019)
- Initial release

View file

@ -0,0 +1,26 @@
[package]
name = "tower-layer"
# When releasing to crates.io:
# - Update doc url
# - Cargo.toml
# - README.md
# - Update CHANGELOG.md.
# - Create "v0.1.x" git tag.
version = "0.3.1"
authors = ["Tower Maintainers <team@tower-rs.com>"]
license = "MIT"
readme = "README.md"
repository = "https://github.com/tower-rs/tower"
homepage = "https://github.com/tower-rs/tower"
documentation = "https://docs.rs/tower-layer/0.3.0-alpha.2"
description = """
Decorates a `Service` to allow easy composition between `Service`s.
"""
categories = ["asynchronous", "network-programming"]
edition = "2018"
[dependencies]
[dev-dependencies]
tower-service = { version = "0.3.0", path = "../tower-service" }
tower = { version = "0.4", path = "../tower" }

25
tower/tower-layer/LICENSE Normal file
View file

@ -0,0 +1,25 @@
Copyright (c) 2019 Tower Contributors
Permission is hereby granted, free of charge, to any
person obtaining a copy of this software and associated
documentation files (the "Software"), to deal in the
Software without restriction, including without
limitation the rights to use, copy, modify, merge,
publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software
is furnished to do so, subject to the following
conditions:
The above copyright notice and this permission notice
shall be included in all copies or substantial portions
of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.

View file

@ -0,0 +1,43 @@
# Tower Layer
Decorates a [Tower] `Service`, transforming either the request or the response.
[![Crates.io][crates-badge]][crates-url]
[![Documentation][docs-badge]][docs-url]
[![Documentation (master)][docs-master-badge]][docs-master-url]
[![MIT licensed][mit-badge]][mit-url]
[![Build Status][actions-badge]][actions-url]
[![Discord chat][discord-badge]][discord-url]
[crates-badge]: https://img.shields.io/crates/v/tower-layer.svg
[crates-url]: https://crates.io/crates/tower-layer
[docs-badge]: https://docs.rs/tower-layer/badge.svg
[docs-url]: https://docs.rs/tower-layer
[docs-master-badge]: https://img.shields.io/badge/docs-master-blue
[docs-master-url]: https://tower-rs.github.io/tower/tower_layer
[mit-badge]: https://img.shields.io/badge/license-MIT-blue.svg
[mit-url]: LICENSE
[actions-badge]: https://github.com/tower-rs/tower/workflows/CI/badge.svg
[actions-url]:https://github.com/tower-rs/tower/actions?query=workflow%3ACI
[discord-badge]: https://img.shields.io/discord/500028886025895936?logo=discord&label=discord&logoColor=white
[discord-url]: https://discord.gg/EeF3cQw
## Overview
Often, many of the pieces needed for writing network applications can be
reused across multiple services. The `Layer` trait can be used to write
reusable components that can be applied to very different kinds of services;
for example, it can be applied to services operating on different protocols,
and to both the client and server side of a network transaction.
## License
This project is licensed under the [MIT license](LICENSE).
### Contribution
Unless you explicitly state otherwise, any contribution intentionally submitted
for inclusion in Tower by you, shall be licensed as MIT, without any additional
terms or conditions.
[Tower]: https://crates.io/crates/tower

View file

@ -0,0 +1,37 @@
use super::Layer;
use std::fmt;
/// A no-op middleware.
///
/// When wrapping a [`Service`], the [`Identity`] layer returns the provided
/// service without modifying it.
///
/// [`Service`]: https://docs.rs/tower-service/latest/tower_service/trait.Service.html
#[derive(Default, Clone)]
pub struct Identity {
_p: (),
}
impl Identity {
/// Create a new [`Identity`] value
pub fn new() -> Identity {
Identity { _p: () }
}
}
/// Decorates a [`Service`], transforming either the request or the response.
///
/// [`Service`]: https://docs.rs/tower-service/latest/tower_service/trait.Service.html
impl<S> Layer<S> for Identity {
type Service = S;
fn layer(&self, inner: S) -> Self::Service {
inner
}
}
impl fmt::Debug for Identity {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Identity").finish()
}
}

View file

@ -0,0 +1,114 @@
use super::Layer;
use std::fmt;
/// Returns a new [`LayerFn`] that implements [`Layer`] by calling the
/// given function.
///
/// The [`Layer::layer`] method takes a type implementing [`Service`] and
/// returns a different type implementing [`Service`]. In many cases, this can
/// be implemented by a function or a closure. The [`LayerFn`] helper allows
/// writing simple [`Layer`] implementations without needing the boilerplate of
/// a new struct implementing [`Layer`].
///
/// # Example
/// ```rust
/// # use tower::Service;
/// # use std::task::{Poll, Context};
/// # use tower_layer::{Layer, layer_fn};
/// # use std::fmt;
/// # use std::convert::Infallible;
/// #
/// // A middleware that logs requests before forwarding them to another service
/// pub struct LogService<S> {
/// target: &'static str,
/// service: S,
/// }
///
/// impl<S, Request> Service<Request> for LogService<S>
/// where
/// S: Service<Request>,
/// Request: fmt::Debug,
/// {
/// type Response = S::Response;
/// type Error = S::Error;
/// type Future = S::Future;
///
/// fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
/// self.service.poll_ready(cx)
/// }
///
/// fn call(&mut self, request: Request) -> Self::Future {
/// // Log the request
/// println!("request = {:?}, target = {:?}", request, self.target);
///
/// self.service.call(request)
/// }
/// }
///
/// // A `Layer` that wraps services in `LogService`
/// let log_layer = layer_fn(|service| {
/// LogService {
/// service,
/// target: "tower-docs",
/// }
/// });
///
/// // An example service. This one uppercases strings
/// let uppercase_service = tower::service_fn(|request: String| async move {
/// Ok::<_, Infallible>(request.to_uppercase())
/// });
///
/// // Wrap our service in a `LogService` so requests are logged.
/// let wrapped_service = log_layer.layer(uppercase_service);
/// ```
///
/// [`Service`]: https://docs.rs/tower-service/latest/tower_service/trait.Service.html
/// [`Layer::layer`]: crate::Layer::layer
pub fn layer_fn<T>(f: T) -> LayerFn<T> {
LayerFn { f }
}
/// A `Layer` implemented by a closure. See the docs for [`layer_fn`] for more details.
#[derive(Clone, Copy)]
pub struct LayerFn<F> {
f: F,
}
impl<F, S, Out> Layer<S> for LayerFn<F>
where
F: Fn(S) -> Out,
{
type Service = Out;
fn layer(&self, inner: S) -> Self::Service {
(self.f)(inner)
}
}
impl<F> fmt::Debug for LayerFn<F> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("LayerFn")
.field("f", &format_args!("{}", std::any::type_name::<F>()))
.finish()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[allow(dead_code)]
#[test]
fn layer_fn_has_useful_debug_impl() {
struct WrappedService<S> {
inner: S,
}
let layer = layer_fn(|svc| WrappedService { inner: svc });
let _svc = layer.layer("foo");
assert_eq!(
"LayerFn { f: tower_layer::layer_fn::tests::layer_fn_has_useful_debug_impl::{{closure}} }".to_string(),
format!("{:?}", layer),
);
}
}

View file

@ -0,0 +1,111 @@
#![warn(
missing_debug_implementations,
missing_docs,
rust_2018_idioms,
unreachable_pub
)]
#![forbid(unsafe_code)]
// `rustdoc::broken_intra_doc_links` is checked on CI
//! Layer traits and extensions.
//!
//! A layer decorates an service and provides additional functionality. It
//! allows other services to be composed with the service that implements layer.
//!
//! A middleware implements the [`Layer`] and [`Service`] trait.
//!
//! [`Service`]: https://docs.rs/tower/latest/tower/trait.Service.html
mod identity;
mod layer_fn;
mod stack;
pub use self::{
identity::Identity,
layer_fn::{layer_fn, LayerFn},
stack::Stack,
};
/// Decorates a [`Service`], transforming either the request or the response.
///
/// Often, many of the pieces needed for writing network applications can be
/// reused across multiple services. The `Layer` trait can be used to write
/// reusable components that can be applied to very different kinds of services;
/// for example, it can be applied to services operating on different protocols,
/// and to both the client and server side of a network transaction.
///
/// # Log
///
/// Take request logging as an example:
///
/// ```rust
/// # use tower_service::Service;
/// # use std::task::{Poll, Context};
/// # use tower_layer::Layer;
/// # use std::fmt;
///
/// pub struct LogLayer {
/// target: &'static str,
/// }
///
/// impl<S> Layer<S> for LogLayer {
/// type Service = LogService<S>;
///
/// fn layer(&self, service: S) -> Self::Service {
/// LogService {
/// target: self.target,
/// service
/// }
/// }
/// }
///
/// // This service implements the Log behavior
/// pub struct LogService<S> {
/// target: &'static str,
/// service: S,
/// }
///
/// impl<S, Request> Service<Request> for LogService<S>
/// where
/// S: Service<Request>,
/// Request: fmt::Debug,
/// {
/// type Response = S::Response;
/// type Error = S::Error;
/// type Future = S::Future;
///
/// fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
/// self.service.poll_ready(cx)
/// }
///
/// fn call(&mut self, request: Request) -> Self::Future {
/// // Insert log statement here or other functionality
/// println!("request = {:?}, target = {:?}", request, self.target);
/// self.service.call(request)
/// }
/// }
/// ```
///
/// The above log implementation is decoupled from the underlying protocol and
/// is also decoupled from client or server concerns. In other words, the same
/// log middleware could be used in either a client or a server.
///
/// [`Service`]: https://docs.rs/tower/latest/tower/trait.Service.html
pub trait Layer<S> {
/// The wrapped service
type Service;
/// Wrap the given service with the middleware, returning a new service
/// that has been decorated with the middleware.
fn layer(&self, inner: S) -> Self::Service;
}
impl<'a, T, S> Layer<S> for &'a T
where
T: ?Sized + Layer<S>,
{
type Service = T::Service;
fn layer(&self, inner: S) -> Self::Service {
(**self).layer(inner)
}
}

View file

@ -0,0 +1,62 @@
use super::Layer;
use std::fmt;
/// Two middlewares chained together.
#[derive(Clone)]
pub struct Stack<Inner, Outer> {
inner: Inner,
outer: Outer,
}
impl<Inner, Outer> Stack<Inner, Outer> {
/// Create a new `Stack`.
pub fn new(inner: Inner, outer: Outer) -> Self {
Stack { inner, outer }
}
}
impl<S, Inner, Outer> Layer<S> for Stack<Inner, Outer>
where
Inner: Layer<S>,
Outer: Layer<Inner::Service>,
{
type Service = Outer::Service;
fn layer(&self, service: S) -> Self::Service {
let inner = self.inner.layer(service);
self.outer.layer(inner)
}
}
impl<Inner, Outer> fmt::Debug for Stack<Inner, Outer>
where
Inner: fmt::Debug,
Outer: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
// The generated output of nested `Stack`s is very noisy and makes
// it harder to understand what is in a `ServiceBuilder`.
//
// Instead, this output is designed assuming that a `Stack` is
// usually quite nested, and inside a `ServiceBuilder`. Therefore,
// this skips using `f.debug_struct()`, since each one would force
// a new layer of indentation.
//
// - In compact mode, a nested stack ends up just looking like a flat
// list of layers.
//
// - In pretty mode, while a newline is inserted between each layer,
// the `DebugStruct` used in the `ServiceBuilder` will inject padding
// to that each line is at the same indentation level.
//
// Also, the order of [outer, inner] is important, since it reflects
// the order that the layers were added to the stack.
if f.alternate() {
// pretty
write!(f, "{:#?},\n{:#?}", self.outer, self.inner)
} else {
write!(f, "{:?}, {:?}", self.outer, self.inner)
}
}
}

View file

@ -0,0 +1,63 @@
# Unreleased
- None
# 0.3.2 (June 17, 2022)
## Added
- **docs**: Clarify subtlety around cloning and readiness in the `Service` docs
([#548])
- **docs**: Clarify details around shared resource consumption in `poll_ready()`
([#662])
[#548]: https://github.com/tower-rs/tower/pull/548
[#662]: https://github.com/tower-rs/tower/pull/662
# 0.3.1 (November 29, 2019)
- Improve example in `Service` docs. ([#510])
[#510]: https://github.com/tower-rs/tower/pull/510
# 0.3.0 (November 29, 2019)
- Update to `futures 0.3`.
- Update documentation for `std::future::Future`.
# 0.3.0-alpha.2 (September 30, 2019)
- Documentation fixes.
# 0.3.0-alpha.1 (Aug 20, 2019)
* Switch to `std::future::Future`
# 0.2.0 (Dec 12, 2018)
* Change `Service`'s `Request` associated type to be a generic instead.
* Before:
```rust
impl Service for Client {
type Request = HttpRequest;
type Response = HttpResponse;
// ...
}
```
* After:
```rust
impl Service<HttpRequest> for Client {
type Response = HttpResponse;
// ...
}
```
* Remove `NewService`, use `tower_util::MakeService` instead.
* Remove `Service::ready` and `Ready`, use `tower_util::ServiceExt` instead.
# 0.1.0 (Aug 9, 2018)
* Initial release

View file

@ -0,0 +1,28 @@
[package]
name = "tower-service"
# When releasing to crates.io:
# - Update doc url
# - Cargo.toml
# - README.md
# - Update CHANGELOG.md.
# - Create "v0.3.x" git tag.
version = "0.3.2"
authors = ["Tower Maintainers <team@tower-rs.com>"]
license = "MIT"
readme = "README.md"
repository = "https://github.com/tower-rs/tower"
homepage = "https://github.com/tower-rs/tower"
documentation = "https://docs.rs/tower-service/0.3.2"
description = """
Trait representing an asynchronous, request / response based, client or server.
"""
categories = ["asynchronous", "network-programming"]
edition = "2018"
[dependencies]
[dev-dependencies]
http = "0.2"
tower-layer = { version = "0.3", path = "../tower-layer" }
tokio = { version = "1", features = ["macros", "time"] }
futures = "0.3"

View file

@ -0,0 +1,25 @@
Copyright (c) 2019 Tower Contributors
Permission is hereby granted, free of charge, to any
person obtaining a copy of this software and associated
documentation files (the "Software"), to deal in the
Software without restriction, including without
limitation the rights to use, copy, modify, merge,
publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software
is furnished to do so, subject to the following
conditions:
The above copyright notice and this permission notice
shall be included in all copies or substantial portions
of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.

View file

@ -0,0 +1,56 @@
# Tower Service
The foundational `Service` trait that [Tower] is based on.
[![Crates.io][crates-badge]][crates-url]
[![Documentation][docs-badge]][docs-url]
[![Documentation (master)][docs-master-badge]][docs-master-url]
[![MIT licensed][mit-badge]][mit-url]
[![Build Status][actions-badge]][actions-url]
[![Discord chat][discord-badge]][discord-url]
[crates-badge]: https://img.shields.io/crates/v/tower-service.svg
[crates-url]: https://crates.io/crates/tower-service
[docs-badge]: https://docs.rs/tower-service/badge.svg
[docs-url]: https://docs.rs/tower-service
[docs-master-badge]: https://img.shields.io/badge/docs-master-blue
[docs-master-url]: https://tower-rs.github.io/tower/tower_service
[mit-badge]: https://img.shields.io/badge/license-MIT-blue.svg
[mit-url]: LICENSE
[actions-badge]: https://github.com/tower-rs/tower/workflows/CI/badge.svg
[actions-url]:https://github.com/tower-rs/tower/actions?query=workflow%3ACI
[discord-badge]: https://img.shields.io/discord/500028886025895936?logo=discord&label=discord&logoColor=white
[discord-url]: https://discord.gg/EeF3cQw
## Overview
The [`Service`] trait provides the foundation upon which [Tower] is built. It is a
simple, but powerful trait. At its heart, `Service` is just an asynchronous
function of request to response.
```
async fn(Request) -> Result<Response, Error>
```
Implementations of `Service` take a request, the type of which varies per
protocol, and returns a future representing the eventual completion or failure
of the response.
Services are used to represent both clients and servers. An *instance* of
`Service` is used through a client; a server *implements* `Service`.
By using standardizing the interface, middleware can be created. Middleware
*implement* `Service` by passing the request to another `Service`. The
middleware may take actions such as modify the request.
[`Service`]: https://docs.rs/tower-service/latest/tower_service/trait.Service.html
[Tower]: https://crates.io/crates/tower
## License
This project is licensed under the [MIT license](LICENSE).
### Contribution
Unless you explicitly state otherwise, any contribution intentionally submitted
for inclusion in Tower by you, shall be licensed as MIT, without any additional
terms or conditions.

View file

@ -0,0 +1,20 @@
#![warn(rust_2018_idioms, unreachable_pub)]
#![forbid(unsafe_code)]
use std::future::Future;
use std::task::{Context, Poll};
pub trait Service<Request> {
/// Responses given by the service.
type Response;
/// Errors produced by the service.
type Error;
/// The future response value.
type Future: Future<Output = Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
fn call(&mut self, req: Request) -> Self::Future;
}

418
tower/tower/CHANGELOG.md Normal file
View file

@ -0,0 +1,418 @@
# Changelog
All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
# Unreleased
- None.
# 0.4.13 (June 17, 2022)
### Added
- **load_shed**: Public constructor for `Overloaded` error ([#661])
### Fixed
- **util**: Fix hang with `call_all` when the `Stream` of requests is pending
([#656])
- **ready_cache**: Ensure cancelation is observed by pending services ([#668],
fixes [#415])
- **docs**: Fix a missing section header due to a typo ([#646])
- **docs**: Fix broken links to `Service` trait ([#659])
[#661]: https://github.com/tower-rs/tower/pull/661
[#656]: https://github.com/tower-rs/tower/pull/656
[#668]: https://github.com/tower-rs/tower/pull/668
[#415]: https://github.com/tower-rs/tower/pull/415
[#646]: https://github.com/tower-rs/tower/pull/646
[#659]: https://github.com/tower-rs/tower/pull/659
# 0.4.12 (February 16, 2022)
### Fixed
- **hedge**, **load**, **retry**: Fix use of `Instant` operations that can panic
on platforms where `Instant` is not monotonic ([#633])
- Disable `attributes` feature on `tracing` dependency ([#623])
- Remove unused dependencies and dependency features with some feature
combinations ([#603], [#602])
- **docs**: Fix a typo in the RustDoc for `Buffer` ([#622])
### Changed
- Updated minimum supported Rust version (MSRV) to 1.49.0.
- **hedge**: Updated `hdrhistogram` dependency to v7.0 ([#602])
- Updated `tokio-util` dependency to v0.7 ([#638])
[#633]: https://github.com/tower-rs/tower/pull/633
[#623]: https://github.com/tower-rs/tower/pull/623
[#603]: https://github.com/tower-rs/tower/pull/603
[#602]: https://github.com/tower-rs/tower/pull/602
[#622]: https://github.com/tower-rs/tower/pull/622
[#638]: https://github.com/tower-rs/tower/pull/638
# 0.4.11 (November 18, 2021)
### Added
- **util**: Add `BoxCloneService` which is a `Clone + Send` boxed `Service` ([#615])
- **util**: Add `ServiceExt::boxed` and `ServiceExt::boxed_clone` for applying the
`BoxService` and `BoxCloneService` middleware ([#616])
- **builder**: Add `ServiceBuilder::boxed` and `ServiceBuilder::boxed_clone` for
applying `BoxService` and `BoxCloneService` layers ([#616])
### Fixed
- **util**: Remove redundant `F: Clone` bound from `ServiceExt::map_request` ([#607])
- **util**: Remove unnecessary `Debug` bounds from `impl Debug for BoxService` ([#617])
- **util**: Remove unnecessary `Debug` bounds from `impl Debug for UnsyncBoxService` ([#617])
- **balance**: Remove redundant `Req: Clone` bound from `Clone` impls
for `MakeBalance`, and `MakeBalanceLayer` ([#607])
- **balance**: Remove redundant `Req: Debug` bound from `Debug` impls
for `MakeBalance`, `MakeFuture`, `Balance`, and `Pool` ([#607])
- **ready-cache**: Remove redundant `Req: Debug` bound from `Debug` impl
for `ReadyCache` ([#607])
- **steer**: Remove redundant `Req: Debug` bound from `Debug` impl
for `Steer` ([#607])
- **docs**: Fix `doc(cfg(...))` attributes
of `PeakEwmaDiscover`, and `PendingRequestsDiscover` ([#610])
[#607]: https://github.com/tower-rs/tower/pull/607
[#610]: https://github.com/tower-rs/tower/pull/610
[#615]: https://github.com/tower-rs/tower/pull/615
[#616]: https://github.com/tower-rs/tower/pull/616
[#617]: https://github.com/tower-rs/tower/pull/617
# 0.4.10 (October 19, 2021)
- Fix accidental breaking change when using the
`rustdoc::broken_intra_doc_links` lint ([#605])
- Clarify that tower's minimum supported rust version is 1.46 ([#605])
[#605]: https://github.com/tower-rs/tower/pull/605
# 0.4.9 (October 13, 2021)
- Migrate to [pin-project-lite] ([#595])
- **builder**: Implement `Layer` for `ServiceBuilder` ([#600])
- **builder**: Add `ServiceBuilder::and_then` analogous to
`ServiceExt::and_then` ([#601])
[#600]: https://github.com/tower-rs/tower/pull/600
[#601]: https://github.com/tower-rs/tower/pull/601
[#595]: https://github.com/tower-rs/tower/pull/595
[pin-project-lite]: https://crates.io/crates/pin-project-lite
# 0.4.8 (May 28, 2021)
- **builder**: Add `ServiceBuilder::map_result` analogous to
`ServiceExt::map_result` ([#583])
- **limit**: Add `GlobalConcurrencyLimitLayer` to allow reusing a concurrency
limit across multiple services ([#574])
[#574]: https://github.com/tower-rs/tower/pull/574
[#583]: https://github.com/tower-rs/tower/pull/583
# 0.4.7 (April 27, 2021)
### Added
- **builder**: Add `ServiceBuilder::check_service` to check the request,
response, and error types of the output service. ([#576])
- **builder**: Add `ServiceBuilder::check_service_clone` to check the output
service can be cloned. ([#576])
### Fixed
- **spawn_ready**: Abort spawned background tasks when the `SpawnReady` service
is dropped, fixing a potential task/resource leak (#[581])
- Fixed broken documentation links ([#578])
[#576]: https://github.com/tower-rs/tower/pull/576
[#578]: https://github.com/tower-rs/tower/pull/578
[#581]: https://github.com/tower-rs/tower/pull/581
# 0.4.6 (February 26, 2021)
### Deprecated
- **util**: Deprecated `ServiceExt::ready_and` (renamed to `ServiceExt::ready`).
([#567])
- **util**: Deprecated `ReadyAnd` future (renamed to `Ready`). ([#567])
### Added
- **builder**: Add `ServiceBuilder::layer_fn` to add a layer built from a
function. ([#560])
- **builder**: Add `ServiceBuilder::map_future` for transforming the futures
produced by a service. ([#559])
- **builder**: Add `ServiceBuilder::service_fn` for applying `Layer`s to an
async function using `util::service_fn`. ([#564])
- **util**: Add example for `service_fn`. ([#563])
- **util**: Add `BoxLayer` for creating boxed `Layer` trait objects. ([#569])
[#567]: https://github.com/tower-rs/tower/pull/567
[#560]: https://github.com/tower-rs/tower/pull/560
[#559]: https://github.com/tower-rs/tower/pull/559
[#564]: https://github.com/tower-rs/tower/pull/564
[#563]: https://github.com/tower-rs/tower/pull/563
[#569]: https://github.com/tower-rs/tower/pull/569
# 0.4.5 (February 10, 2021)
### Added
- **util**: Add `ServiceExt::map_future`. ([#542])
- **builder**: Add `ServiceBuilder::option_layer` to optionally add a layer. ([#555])
- **make**: Add `Shared` which lets you implement `MakeService` by cloning a
service. ([#533])
### Fixed
- **util**: Make combinators that contain closures implement `Debug`. They
previously wouldn't since closures never implement `Debug`. ([#552])
- **steer**: Implement `Clone` for `Steer`. ([#554])
- **spawn-ready**: SpawnReady now propagates the current `tracing` span to
spawned tasks ([#557])
- Only pull in `tracing` for the features that need it. ([#551])
[#542]: https://github.com/tower-rs/tower/pull/542
[#555]: https://github.com/tower-rs/tower/pull/555
[#557]: https://github.com/tower-rs/tower/pull/557
[#533]: https://github.com/tower-rs/tower/pull/533
[#551]: https://github.com/tower-rs/tower/pull/551
[#554]: https://github.com/tower-rs/tower/pull/554
[#552]: https://github.com/tower-rs/tower/pull/552
# 0.4.4 (January 20, 2021)
### Added
- **util**: Implement `Layer` for `Either<A, B>`. ([#531])
- **util**: Implement `Clone` for `FilterLayer`. ([#535])
- **timeout**: Implement `Clone` for `TimeoutLayer`. ([#535])
- **limit**: Implement `Clone` for `RateLimitLayer`. ([#535])
### Fixed
- Added "full" feature which turns on all other features. ([#532])
- **spawn-ready**: Avoid oneshot allocations. ([#538])
[#531]: https://github.com/tower-rs/tower/pull/531
[#532]: https://github.com/tower-rs/tower/pull/532
[#535]: https://github.com/tower-rs/tower/pull/535
[#538]: https://github.com/tower-rs/tower/pull/538
# 0.4.3 (January 13, 2021)
### Added
- **filter**: `Filter::check` and `AsyncFilter::check` methods which check a
request against the filter's `Predicate` ([#521])
- **filter**: Added `get_ref`, `get_mut`, and `into_inner` methods to `Filter`
and `AsyncFilter`, allowing access to the wrapped service ([#522])
- **util**: Added `layer` associated function to `AndThen`, `Then`,
`MapRequest`, `MapResponse`, and `MapResult` types. These return a `Layer`
that produces middleware of that type, as a convenience to avoid having to
import the `Layer` type separately. ([#524])
- **util**: Added missing `Clone` impls to `AndThenLayer`, `MapRequestLayer`,
and `MapErrLayer`, when the mapped function implements `Clone` ([#525])
- **util**: Added `FutureService::new` constructor, with less restrictive bounds
than the `future_service` free function ([#523])
[#521]: https://github.com/tower-rs/tower/pull/521
[#522]: https://github.com/tower-rs/tower/pull/522
[#523]: https://github.com/tower-rs/tower/pull/523
[#524]: https://github.com/tower-rs/tower/pull/524
[#525]: https://github.com/tower-rs/tower/pull/525
# 0.4.2 (January 11, 2021)
### Added
- Export `layer_fn` and `LayerFn` from the `tower::layer` module. ([#516])
### Fixed
- Fix missing `Sync` implementation for `Buffer` and `ConcurrencyLimit` ([#518])
[#518]: https://github.com/tower-rs/tower/pull/518
[#516]: https://github.com/tower-rs/tower/pull/516
# 0.4.1 (January 7, 2021)
### Fixed
- Updated `tower-layer` to 0.3.1 to fix broken re-exports.
# 0.4.0 (January 7, 2021)
This is a major breaking release including a large number of changes. In
particular, this release updates `tower` to depend on Tokio 1.0, and moves all
middleware into the `tower` crate. In addition, Tower 0.4 reworks several
middleware APIs, as well as introducing new ones.
This release does *not* change the core `Service` or `Layer` traits, so `tower`
0.4 still depends on `tower-service` 0.3 and `tower-layer` 0.3. This means that
`tower` 0.4 is still compatible with libraries that depend on those crates.
### Added
- **make**: Added `MakeService::into_service` and `MakeService::as_service` for
converting `MakeService`s into `Service`s ([#492])
- **steer**: Added `steer` middleware for routing requests to one of a set of
services ([#426])
- **util**: Added `MapRequest` middleware and `ServiceExt::map_request`, for
applying a function to a request before passing it to the inner service
([#435])
- **util**: Added `MapResponse` middleware and `ServiceExt::map_response`, for
applying a function to the `Response` type of an inner service after its
future completes ([#435])
- **util**: Added `MapErr` middleware and `ServiceExt::map_err`, for
applying a function to the `Error` returned by an inner service if it fails
([#396])
- **util**: Added `MapResult` middleware and `ServiceExt::map_result`, for
applying a function to the `Result` returned by an inner service's future
regardless of whether it succeeds or fails ([#499])
- **util**: Added `Then` middleware and `ServiceExt::then`, for chaining another
future after an inner service's future completes (with a `Response` or an
`Error`) ([#500])
- **util**: Added `AndThen` middleware and `ServiceExt::and_then`, for
chaining another future after an inner service's future completes successfully
([#485])
- **util**: Added `layer_fn`, for constructing a `Layer` from a function taking
a `Service` and returning a different `Service` ([#491])
- **util**: Added `FutureService`, which implements `Service` for a
`Future` whose `Output` type is a `Service` ([#496])
- **util**: Added `BoxService::layer` and `UnsyncBoxService::layer`, to make
constructing layers more ergonomic ([#503])
- **layer**: Added `Layer` impl for `&Layer` ([#446])
- **retry**: Added `Retry::get_ref`, `Retry::get_mut`, and `Retry::into_inner`
to access the inner service ([#463])
- **timeout**: Added `Timeout::get_ref`, `Timeout::get_mut`, and
`Timeout::into_inner` to access the inner service ([#463])
- **buffer**: Added `Clone` and `Copy` impls for `BufferLayer` (#[493])
- Several documentation improvements ([#442], [#444], [#445], [#449], [#487],
[#490], [#506]])
### Changed
- All middleware `tower-*` crates were merged into `tower` and placed
behind feature flags ([#432])
- Updated Tokio dependency to 1.0 ([#489])
- **builder**: Make `ServiceBuilder::service` take `self` by reference rather
than by value ([#504])
- **reconnect**: Return errors from `MakeService` in the response future, rather than
in `poll_ready`, allowing the reconnect service to be reused when a reconnect
fails ([#386], [#437])
- **discover**: Changed `Discover` to be a sealed trait alias for a
`TryStream<Item = Change>`. `Discover` implementations are now written by
implementing `Stream`. ([#443])
- **load**: Renamed the `Instrument` trait to `TrackCompletion` ([#445])
- **load**: Renamed `NoInstrument` to `CompleteOnResponse` ([#445])
- **balance**: Renamed `BalanceLayer` to `MakeBalanceLayer` ([#449])
- **balance**: Renamed `BalanceMake` to `MakeBalance` ([#449])
- **ready-cache**: Changed `ready_cache::error::Failed`'s `fmt::Debug` impl to
require the key type to also implement `fmt::Debug` ([#467])
- **filter**: Changed `Filter` and `Predicate` to use a synchronous function as
a predicate ([#508])
- **filter**: Renamed the previous `Filter` and `Predicate` (where `Predicate`s
returned a `Future`) to `AsyncFilter` and `AsyncPredicate` ([#508])
- **filter**: `Predicate`s now take a `Request` type by value and may return a
new request, potentially of a different type ([#508])
- **filter**: `Predicate`s may now return an error of any type ([#508])
### Fixed
- **limit**: Fixed an issue where `RateLimit` services do not reset the remaining
count when rate limiting ([#438], [#439])
- **util**: Fixed a bug where `oneshot` futures panic if the service does not
immediately become ready ([#447])
- **ready-cache**: Fixed `ready_cache::error::Failed` not returning inner error types
via `Error::source` ([#467])
- **hedge**: Fixed an interaction with `buffer` where `buffer` slots were
eagerly reserved for hedge requests even if they were not sent ([#472])
- **hedge**: Fixed the use of a fixed 10 second bound on the hedge latency
histogram resulting on errors with longer-lived requests. The latency
histogram now automatically resizes ([#484])
- **buffer**: Fixed an issue where tasks waiting for buffer capacity were not
woken when a buffer is dropped, potentially resulting in a task leak ([#480])
### Removed
- Remove `ServiceExt::ready`.
- **discover**: Removed `discover::stream` module, since `Discover` is now an
alias for `Stream` ([#443])
- **buffer**: Removed `MakeBalance::from_rng`, which caused all balancers to use
the same RNG ([#497])
[#432]: https://github.com/tower-rs/tower/pull/432
[#426]: https://github.com/tower-rs/tower/pull/426
[#435]: https://github.com/tower-rs/tower/pull/435
[#499]: https://github.com/tower-rs/tower/pull/499
[#386]: https://github.com/tower-rs/tower/pull/386
[#437]: https://github.com/tower-rs/tower/pull/487
[#438]: https://github.com/tower-rs/tower/pull/438
[#437]: https://github.com/tower-rs/tower/pull/439
[#443]: https://github.com/tower-rs/tower/pull/443
[#442]: https://github.com/tower-rs/tower/pull/442
[#444]: https://github.com/tower-rs/tower/pull/444
[#445]: https://github.com/tower-rs/tower/pull/445
[#446]: https://github.com/tower-rs/tower/pull/446
[#447]: https://github.com/tower-rs/tower/pull/447
[#449]: https://github.com/tower-rs/tower/pull/449
[#463]: https://github.com/tower-rs/tower/pull/463
[#396]: https://github.com/tower-rs/tower/pull/396
[#467]: https://github.com/tower-rs/tower/pull/467
[#472]: https://github.com/tower-rs/tower/pull/472
[#480]: https://github.com/tower-rs/tower/pull/480
[#484]: https://github.com/tower-rs/tower/pull/484
[#489]: https://github.com/tower-rs/tower/pull/489
[#497]: https://github.com/tower-rs/tower/pull/497
[#487]: https://github.com/tower-rs/tower/pull/487
[#493]: https://github.com/tower-rs/tower/pull/493
[#491]: https://github.com/tower-rs/tower/pull/491
[#495]: https://github.com/tower-rs/tower/pull/495
[#503]: https://github.com/tower-rs/tower/pull/503
[#504]: https://github.com/tower-rs/tower/pull/504
[#492]: https://github.com/tower-rs/tower/pull/492
[#500]: https://github.com/tower-rs/tower/pull/500
[#490]: https://github.com/tower-rs/tower/pull/490
[#506]: https://github.com/tower-rs/tower/pull/506
[#508]: https://github.com/tower-rs/tower/pull/508
[#485]: https://github.com/tower-rs/tower/pull/485
# 0.3.1 (January 17, 2020)
- Allow opting out of tracing/log (#410).
# 0.3.0 (December 19, 2019)
- Update all tower based crates to `0.3`.
- Update to `tokio 0.2`
- Update to `futures 0.3`
# 0.3.0-alpha.2 (September 30, 2019)
- Move to `futures-*-preview 0.3.0-alpha.19`
- Move to `pin-project 0.4`
# 0.3.0-alpha.1a (September 13, 2019)
- Update `tower-buffer` to `0.3.0-alpha.1b`
# 0.3.0-alpha.1 (September 11, 2019)
- Move to `std::future`
# 0.1.1 (July 19, 2019)
- Add `ServiceBuilder::into_inner`
# 0.1.0 (April 26, 2019)
- Initial release

107
tower/tower/Cargo.toml Normal file
View file

@ -0,0 +1,107 @@
[package]
name = "tower"
# When releasing to crates.io:
# - Update doc url
# - Cargo.toml
# - README.md
# - Update CHANGELOG.md.
# - Create "vX.X.X" git tag.
version = "0.4.13"
authors = ["Tower Maintainers <team@tower-rs.com>"]
license = "MIT"
readme = "README.md"
repository = "https://github.com/tower-rs/tower"
homepage = "https://github.com/tower-rs/tower"
documentation = "https://docs.rs/tower/0.4.13"
description = """
Tower is a library of modular and reusable components for building robust
clients and servers.
"""
categories = ["asynchronous", "network-programming"]
keywords = ["io", "async", "non-blocking", "futures", "service"]
edition = "2018"
rust-version = "1.49.0"
[features]
default = ["log"]
# Internal
__common = ["futures-core", "pin-project-lite"]
full = [
"balance",
"buffer",
"discover",
"filter",
"hedge",
"limit",
"load",
"load-shed",
"make",
"ready-cache",
"reconnect",
"retry",
"spawn-ready",
"steer",
"timeout",
"util",
]
# FIXME: Use weak dependency once available (https://github.com/rust-lang/cargo/issues/8832)
log = ["tracing/log"]
balance = ["discover", "load", "ready-cache", "make", "rand", "slab"]
buffer = ["__common", "tokio/sync", "tokio/rt", "tokio-util", "tracing"]
discover = ["__common"]
filter = ["__common", "futures-util"]
hedge = ["util", "filter", "futures-util", "hdrhistogram", "tokio/time", "tracing"]
limit = ["__common", "tokio/time", "tokio/sync", "tokio-util", "tracing"]
load = ["__common", "tokio/time", "tracing"]
load-shed = ["__common"]
make = ["futures-util", "pin-project-lite", "tokio/io-std"]
ready-cache = ["futures-core", "futures-util", "indexmap", "tokio/sync", "tracing", "pin-project-lite"]
reconnect = ["make", "tokio/io-std", "tracing"]
retry = ["__common", "tokio/time"]
spawn-ready = ["__common", "futures-util", "tokio/sync", "tokio/rt", "util", "tracing"]
steer = []
timeout = ["pin-project-lite", "tokio/time"]
util = ["__common", "futures-util", "pin-project"]
[dependencies]
tower-layer = { version = "0.3.1", path = "../tower-layer" }
tower-service = { version = "0.3.1", path = "../tower-service" }
futures-core = { version = "0.3", optional = true }
futures-util = { version = "0.3", default-features = false, features = ["alloc"], optional = true }
hdrhistogram = { version = "7.0", optional = true, default-features = false }
indexmap = { version = "1.0.2", optional = true }
rand = { version = "0.8", features = ["small_rng"], optional = true }
slab = { version = "0.4", optional = true }
tokio = { version = "1.6", optional = true, features = ["sync"] }
tokio-stream = { version = "0.1.0", optional = true }
tokio-util = { version = "0.7.0", default-features = false, optional = true }
tracing = { version = "0.1.2", default-features = false, features = ["std"], optional = true }
pin-project = { version = "1", optional = true }
pin-project-lite = { version = "0.2.7", optional = true }
[dev-dependencies]
futures = "0.3"
hdrhistogram = { version = "7.0", default-features = false }
pin-project-lite = "0.2.7"
tokio = { version = "1.6.2", features = ["macros", "sync", "test-util", "rt-multi-thread"] }
tokio-stream = "0.1"
tokio-test = "0.4"
tower-test = { version = "0.4", path = "../tower-test" }
tracing-subscriber = { version = "0.3", default-features = false, features = ["fmt", "ansi"] }
http = "0.2"
lazy_static = "1.4.0"
[package.metadata.docs.rs]
all-features = true
rustdoc-args = ["--cfg", "docsrs"]
[package.metadata.playground]
features = ["full"]
[[example]]
name = "tower-balance"
path = "examples/tower-balance.rs"
required-features = ["full"]

25
tower/tower/LICENSE Normal file
View file

@ -0,0 +1,25 @@
Copyright (c) 2019 Tower Contributors
Permission is hereby granted, free of charge, to any
person obtaining a copy of this software and associated
documentation files (the "Software"), to deal in the
Software without restriction, including without
limitation the rights to use, copy, modify, merge,
publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software
is furnished to do so, subject to the following
conditions:
The above copyright notice and this permission notice
shall be included in all copies or substantial portions
of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.

190
tower/tower/README.md Normal file
View file

@ -0,0 +1,190 @@
# Tower
Tower is a library of modular and reusable components for building robust
networking clients and servers.
[![Crates.io][crates-badge]][crates-url]
[![Documentation][docs-badge]][docs-url]
[![Documentation (master)][docs-master-badge]][docs-master-url]
[![MIT licensed][mit-badge]][mit-url]
[![Build Status][actions-badge]][actions-url]
[![Discord chat][discord-badge]][discord-url]
[crates-badge]: https://img.shields.io/crates/v/tower.svg
[crates-url]: https://crates.io/crates/tower
[docs-badge]: https://docs.rs/tower/badge.svg
[docs-url]: https://docs.rs/tower
[docs-master-badge]: https://img.shields.io/badge/docs-master-blue
[docs-master-url]: https://tower-rs.github.io/tower/tower
[mit-badge]: https://img.shields.io/badge/license-MIT-blue.svg
[mit-url]: LICENSE
[actions-badge]: https://github.com/tower-rs/tower/workflows/CI/badge.svg
[actions-url]:https://github.com/tower-rs/tower/actions?query=workflow%3ACI
[discord-badge]: https://img.shields.io/discord/500028886025895936?logo=discord&label=discord&logoColor=white
[discord-url]: https://discord.gg/EeF3cQw
## Overview
Tower aims to make it as easy as possible to build robust networking clients and
servers. It is protocol agnostic, but is designed around a request / response
pattern. If your protocol is entirely stream based, Tower may not be a good fit.
Tower provides a simple core abstraction, the [`Service`] trait, which
represents an asynchronous function taking a request and returning either a
response or an error. This abstraction can be used to model both clients and
servers.
Generic components, like [timeouts], [rate limiting], and [load balancing],
can be modeled as [`Service`]s that wrap some inner service and apply
additional behavior before or after the inner service is called. This allows
implementing these components in a protocol-agnostic, composable way. Typically,
such services are referred to as _middleware_.
An additional abstraction, the [`Layer`] trait, is used to compose
middleware with [`Service`]s. If a [`Service`] can be thought of as an
asynchronous function from a request type to a response type, a [`Layer`] is
a function taking a [`Service`] of one type and returning a [`Service`] of a
different type. The [`ServiceBuilder`] type is used to add middleware to a
service by composing it with multiple multiple [`Layer`]s.
### The Tower Ecosystem
Tower is made up of the following crates:
* [`tower`] (this crate)
* [`tower-service`]
* [`tower-layer`]
* [`tower-test`]
Since the [`Service`] and [`Layer`] traits are important integration points
for all libraries using Tower, they are kept as stable as possible, and
breaking changes are made rarely. Therefore, they are defined in separate
crates, [`tower-service`] and [`tower-layer`]. This crate contains
re-exports of those core traits, implementations of commonly-used
middleware, and [utilities] for working with [`Service`]s and [`Layer`]s.
Finally, the [`tower-test`] crate provides tools for testing programs using
Tower.
## Usage
Tower provides an abstraction layer, and generic implementations of various
middleware. This means that the `tower` crate on its own does *not* provide
a working implementation of a network client or server. Instead, Tower's
[`Service` trait][`Service`] provides an integration point between
application code, libraries providing middleware implementations, and
libraries that implement servers and/or clients for various network
protocols.
Depending on your particular use case, you might use Tower in several ways:
* **Implementing application logic** for a networked program. You might
use the [`Service`] trait to model your application's behavior, and use
the middleware [provided by this crate][all_layers] and by other libraries
to add functionality to clients and servers provided by one or more
protocol implementations.
* **Implementing middleware** to add custom behavior to network clients and
servers in a reusable manner. This might be general-purpose middleware
(and if it is, please consider releasing your middleware as a library for
other Tower users!) or application-specific behavior that needs to be
shared between multiple clients or servers.
* **Implementing a network protocol**. Libraries that implement network
protocols (such as HTTP) can depend on `tower-service` to use the
[`Service`] trait as an integration point between the protocol and user
code. For example, a client for some protocol might implement [`Service`],
allowing users to add arbitrary Tower middleware to those clients.
Similarly, a server might be created from a user-provided [`Service`].
Additionally, when a network protocol requires functionality already
provided by existing Tower middleware, a protocol implementation might use
Tower middleware internally, as well as as an integration point.
### Library Support
A number of third-party libraries support Tower and the [`Service`] trait.
The following is an incomplete list of such libraries:
* [`hyper`]: A fast and correct low-level HTTP implementation.
* [`tonic`]: A [gRPC-over-HTTP/2][grpc] implementation built on top of
[`hyper`]. See [here][tonic-examples] for examples of using [`tonic`] with
Tower.
* [`warp`]: A lightweight, composable web framework. See
[here][warp-service] for details on using [`warp`] with Tower.
* [`tower-lsp`] and its fork, [`lspower`]: implementations of the [Language
Server Protocol][lsp] based on Tower.
* [`kube`]: Kubernetes client and futures controller runtime. [`kube::Client`]
makes use of the Tower ecosystem: [`tower`], [`tower-http`], and
[`tower-test`]. See [here][kube-example-minimal] and
[here][kube-example-trace] for examples of using [`kube`] with Tower.
[`hyper`]: https://crates.io/crates/hyper
[`tonic`]: https://crates.io/crates/tonic
[tonic-examples]: https://github.com/hyperium/tonic/tree/master/examples/src/tower
[grpc]: https://grpc.io
[`warp`]: https://crates.io/crates/warp
[warp-service]: https://docs.rs/warp/0.2.5/warp/fn.service.html
[`tower-lsp`]: https://crates.io/crates/tower-lsp
[`lspower`]: https://crates.io/crates/lspower
[lsp]: https://microsoft.github.io/language-server-protocol/
[`kube`]: https://crates.io/crates/kube
[`kube::Client`]: https://docs.rs/kube/latest/kube/struct.Client.html
[kube-example-minimal]: https://github.com/clux/kube-rs/blob/master/examples/custom_client.rs
[kube-example-trace]: https://github.com/clux/kube-rs/blob/master/examples/custom_client_trace.rs
[`tower-http`]: https://crates.io/crates/tower-http
If you're the maintainer of a crate that supports Tower, we'd love to add
your crate to this list! Please [open a PR] adding a brief description of
your library!
### Getting Started
The various middleware implementations provided by this crate are feature
flagged, so that users can only compile the parts of Tower they need. By
default, all the optional middleware are disabled.
To get started using all of Tower's optional middleware, add this to your
`Cargo.toml`:
```toml
tower = { version = "0.4", features = ["full"] }
```
Alternatively, you can only enable some features. For example, to enable
only the [`retry`] and [`timeout`][timeouts] middleware, write:
```toml
tower = { version = "0.4", features = ["retry", "timeout"] }
```
See [here][all_layers] for a complete list of all middleware provided by
Tower.
[`Service`]: https://docs.rs/tower/latest/tower/trait.Service.html
[`Layer`]: https://docs.rs/tower/latest/tower/trait.Layer.html
[all_layers]: https://docs.rs/tower/latest/tower/#modules
[timeouts]: https://docs.rs/tower/latest/tower/timeout/
[rate limiting]: https://docs.rs/tower/latest/tower/limit/rate
[load balancing]: https://docs.rs/tower/latest/tower/balance/
[`ServiceBuilder`]: https://docs.rs/tower/latest/tower/struct.ServiceBuilder.html
[utilities]: https://docs.rs/tower/latest/tower/trait.ServiceExt.html
[`tower`]: https://crates.io/crates/tower
[`tower-service`]: https://crates.io/crates/tower-service
[`tower-layer`]: https://crates.io/crates/tower-layer
[`tower-test`]: https://crates.io/crates/tower-test
[`retry`]: https://docs.rs/tower/latest/tower/retry
[open a PR]: https://github.com/tower-rs/tower/compare
## Supported Rust Versions
Tower will keep a rolling MSRV (minimum supported Rust version) policy of **at
least** 6 months. When increasing the MSRV, the new Rust version must have been
released at least six months ago. The current MSRV is 1.49.0.
## License
This project is licensed under the [MIT license](LICENSE).
### Contribution
Unless you explicitly state otherwise, any contribution intentionally submitted
for inclusion in Tower by you, shall be licensed as MIT, without any additional
terms or conditions.

View file

@ -0,0 +1,21 @@
//! Error types for the [`tower::balance`] middleware.
//!
//! [`tower::balance`]: crate::balance
use std::fmt;
/// The balancer's endpoint discovery stream failed.
#[derive(Debug)]
pub struct Discover(pub(crate) crate::BoxError);
impl fmt::Display for Discover {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "load balancer discovery error: {}", self.0)
}
}
impl std::error::Error for Discover {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
Some(&*self.0)
}
}

View file

@ -0,0 +1,61 @@
//! Middleware that allows balancing load among multiple services.
//!
//! In larger systems, multiple endpoints are often available for a given service. As load
//! increases, you want to ensure that that load is spread evenly across the available services.
//! Otherwise, clients could see spikes in latency if their request goes to a particularly loaded
//! service, even when spare capacity is available to handle that request elsewhere.
//!
//! This module provides two pieces of middleware that helps with this type of load balancing:
//!
//! First, [`p2c`] implements the "[Power of Two Random Choices]" algorithm, a simple but robust
//! technique for spreading load across services with only inexact load measurements. Use this if
//! the set of available services is not within your control, and you simply want to spread load
//! among that set of services.
//!
//! [Power of Two Random Choices]: http://www.eecs.harvard.edu/~michaelm/postscripts/handbook2001.pdf
//!
//! Second, [`pool`] implements a dynamically sized pool of services. It estimates the overall
//! current load by tracking successful and unsuccessful calls to [`poll_ready`], and uses an
//! exponentially weighted moving average to add (using [`MakeService`]) or remove (by dropping)
//! services in response to increases or decreases in load. Use this if you are able to
//! dynamically add more service endpoints to the system to handle added load.
//!
//! # Examples
//!
//! ```rust
//! # #[cfg(feature = "util")]
//! # #[cfg(feature = "load")]
//! # fn warnings_are_errors() {
//! use tower::balance::p2c::Balance;
//! use tower::load::Load;
//! use tower::{Service, ServiceExt};
//! use futures_util::pin_mut;
//! # use futures_core::Stream;
//! # use futures_util::StreamExt;
//!
//! async fn spread<Req, S: Service<Req> + Load>(svc1: S, svc2: S, reqs: impl Stream<Item = Req>)
//! where
//! S::Error: Into<tower::BoxError>,
//! # // this bound is pretty unfortunate, and the compiler does _not_ help
//! S::Metric: std::fmt::Debug,
//! {
//! // Spread load evenly across the two services
//! let p2c = Balance::new(tower::discover::ServiceList::new(vec![svc1, svc2]));
//!
//! // Issue all the requests that come in.
//! // Some will go to svc1, some will go to svc2.
//! pin_mut!(reqs);
//! let mut responses = p2c.call_all(reqs);
//! while let Some(rsp) = responses.next().await {
//! // ...
//! }
//! }
//! # }
//! ```
//!
//! [`MakeService`]: crate::MakeService
//! [`poll_ready`]: crate::Service::poll_ready
pub mod error;
pub mod p2c;
pub mod pool;

View file

@ -0,0 +1,60 @@
use super::MakeBalance;
use std::{fmt, marker::PhantomData};
use tower_layer::Layer;
/// Construct load balancers ([`Balance`]) over dynamic service sets ([`Discover`]) produced by the
/// "inner" service in response to requests coming from the "outer" service.
///
/// This construction may seem a little odd at first glance. This is not a layer that takes
/// requests and produces responses in the traditional sense. Instead, it is more like
/// [`MakeService`] in that it takes service _descriptors_ (see `Target` on [`MakeService`])
/// and produces _services_. Since [`Balance`] spreads requests across a _set_ of services,
/// the inner service should produce a [`Discover`], not just a single
/// [`Service`], given a service descriptor.
///
/// See the [module-level documentation](crate::balance) for details on load balancing.
///
/// [`Balance`]: crate::balance::p2c::Balance
/// [`Discover`]: crate::discover::Discover
/// [`MakeService`]: crate::MakeService
/// [`Service`]: crate::Service
pub struct MakeBalanceLayer<D, Req> {
_marker: PhantomData<fn(D, Req)>,
}
impl<D, Req> MakeBalanceLayer<D, Req> {
/// Build balancers using operating system entropy.
pub fn new() -> Self {
Self {
_marker: PhantomData,
}
}
}
impl<D, Req> Default for MakeBalanceLayer<D, Req> {
fn default() -> Self {
Self::new()
}
}
impl<D, Req> Clone for MakeBalanceLayer<D, Req> {
fn clone(&self) -> Self {
Self {
_marker: PhantomData,
}
}
}
impl<S, Req> Layer<S> for MakeBalanceLayer<S, Req> {
type Service = MakeBalance<S, Req>;
fn layer(&self, make_discover: S) -> Self::Service {
MakeBalance::new(make_discover)
}
}
impl<D, Req> fmt::Debug for MakeBalanceLayer<D, Req> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("MakeBalanceLayer").finish()
}
}

View file

@ -0,0 +1,125 @@
use super::Balance;
use crate::discover::Discover;
use futures_core::ready;
use pin_project_lite::pin_project;
use std::hash::Hash;
use std::marker::PhantomData;
use std::{
fmt,
future::Future,
pin::Pin,
task::{Context, Poll},
};
use tower_service::Service;
/// Constructs load balancers over dynamic service sets produced by a wrapped "inner" service.
///
/// This is effectively an implementation of [`MakeService`] except that it forwards the service
/// descriptors (`Target`) to an inner service (`S`), and expects that service to produce a
/// service set in the form of a [`Discover`]. It then wraps the service set in a [`Balance`]
/// before returning it as the "made" service.
///
/// See the [module-level documentation](crate::balance) for details on load balancing.
///
/// [`MakeService`]: crate::MakeService
/// [`Discover`]: crate::discover::Discover
/// [`Balance`]: crate::balance::p2c::Balance
pub struct MakeBalance<S, Req> {
inner: S,
_marker: PhantomData<fn(Req)>,
}
pin_project! {
/// A [`Balance`] in the making.
///
/// [`Balance`]: crate::balance::p2c::Balance
pub struct MakeFuture<F, Req> {
#[pin]
inner: F,
_marker: PhantomData<fn(Req)>,
}
}
impl<S, Req> MakeBalance<S, Req> {
/// Build balancers using operating system entropy.
pub fn new(make_discover: S) -> Self {
Self {
inner: make_discover,
_marker: PhantomData,
}
}
}
impl<S, Req> Clone for MakeBalance<S, Req>
where
S: Clone,
{
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
_marker: PhantomData,
}
}
}
impl<S, Target, Req> Service<Target> for MakeBalance<S, Req>
where
S: Service<Target>,
S::Response: Discover,
<S::Response as Discover>::Key: Hash,
<S::Response as Discover>::Service: Service<Req>,
<<S::Response as Discover>::Service as Service<Req>>::Error: Into<crate::BoxError>,
{
type Response = Balance<S::Response, Req>;
type Error = S::Error;
type Future = MakeFuture<S::Future, Req>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx)
}
fn call(&mut self, target: Target) -> Self::Future {
MakeFuture {
inner: self.inner.call(target),
_marker: PhantomData,
}
}
}
impl<S, Req> fmt::Debug for MakeBalance<S, Req>
where
S: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let Self { inner, _marker } = self;
f.debug_struct("MakeBalance").field("inner", inner).finish()
}
}
impl<F, T, E, Req> Future for MakeFuture<F, Req>
where
F: Future<Output = Result<T, E>>,
T: Discover,
<T as Discover>::Key: Hash,
<T as Discover>::Service: Service<Req>,
<<T as Discover>::Service as Service<Req>>::Error: Into<crate::BoxError>,
{
type Output = Result<Balance<T, Req>, E>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let inner = ready!(this.inner.poll(cx))?;
let svc = Balance::new(inner);
Poll::Ready(Ok(svc))
}
}
impl<F, Req> fmt::Debug for MakeFuture<F, Req>
where
F: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let Self { inner, _marker } = self;
f.debug_struct("MakeFuture").field("inner", inner).finish()
}
}

View file

@ -0,0 +1,41 @@
//! This module implements the "[Power of Two Random Choices]" load balancing algorithm.
//!
//! It is a simple but robust technique for spreading load across services with only inexact load
//! measurements. As its name implies, whenever a request comes in, it samples two ready services
//! at random, and issues the request to whichever service is less loaded. How loaded a service is
//! is determined by the return value of [`Load`](crate::load::Load).
//!
//! As described in the [Finagle Guide][finagle]:
//!
//! > The algorithm randomly picks two services from the set of ready endpoints and
//! > selects the least loaded of the two. By repeatedly using this strategy, we can
//! > expect a manageable upper bound on the maximum load of any server.
//! >
//! > The maximum load variance between any two servers is bound by `ln(ln(n))` where
//! > `n` is the number of servers in the cluster.
//!
//! The balance service and layer implementations rely on _service discovery_ to provide the
//! underlying set of services to balance requests across. This happens through the
//! [`Discover`](crate::discover::Discover) trait, which is essentially a [`Stream`] that indicates
//! when services become available or go away. If you have a fixed set of services, consider using
//! [`ServiceList`](crate::discover::ServiceList).
//!
//! Since the load balancer needs to perform _random_ choices, the constructors in this module
//! usually come in two forms: one that uses randomness provided by the operating system, and one
//! that lets you specify the random seed to use. Usually the former is what you'll want, though
//! the latter may come in handy for reproducability or to reduce reliance on the operating system.
//!
//! [Power of Two Random Choices]: http://www.eecs.harvard.edu/~michaelm/postscripts/handbook2001.pdf
//! [finagle]: https://twitter.github.io/finagle/guide/Clients.html#power-of-two-choices-p2c-least-loaded
//! [`Stream`]: https://docs.rs/futures/0.3/futures/stream/trait.Stream.html
mod layer;
mod make;
mod service;
#[cfg(test)]
mod test;
pub use layer::MakeBalanceLayer;
pub use make::{MakeBalance, MakeFuture};
pub use service::Balance;

View file

@ -0,0 +1,58 @@
use super::super::error;
use crate::discover::{Change, Discover};
use crate::load::Load;
use futures_core::ready;
use futures_util::future::{self, TryFutureExt};
use pin_project_lite::pin_project;
use rand::{rngs::SmallRng, Rng, SeedableRng};
use std::hash::Hash;
use std::marker::PhantomData;
use std::{
fmt,
future::Future,
pin::Pin,
task::{Context, Poll},
};
use tokio::sync::oneshot;
use tower_service::Service;
use tracing::{debug, trace};
pub struct Balance<D, Req> {
_req: PhantomData<(D, Req)>,
}
impl<D, Req> Balance<D, Req>
where
D: Discover,
D::Service: Service<Req>,
<D::Service as Service<Req>>::Error: Into<crate::BoxError>,
{
pub fn new(discover: D) -> Self {
todo!()
}
}
impl<D, Req> Service<Req> for Balance<D, Req>
where
D: Discover + Unpin,
D::Key: Hash + Clone,
D::Error: Into<crate::BoxError>,
D::Service: Service<Req> + Load,
<D::Service as Load>::Metric: std::fmt::Debug,
<D::Service as Service<Req>>::Error: Into<crate::BoxError>,
{
type Response = <D::Service as Service<Req>>::Response;
type Error = crate::BoxError;
type Future = future::MapErr<
<D::Service as Service<Req>>::Future,
fn(<D::Service as Service<Req>>::Error) -> crate::BoxError,
>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
todo!()
}
fn call(&mut self, request: Req) -> Self::Future {
todo!()
}
}

View file

@ -0,0 +1,125 @@
use crate::discover::ServiceList;
use crate::load;
use futures_util::pin_mut;
use std::task::Poll;
use tokio_test::{assert_pending, assert_ready, assert_ready_ok, task};
use tower_test::{assert_request_eq, mock};
use super::*;
#[tokio::test]
async fn empty() {
let empty: Vec<load::Constant<mock::Mock<(), &'static str>, usize>> = vec![];
let disco = ServiceList::new(empty);
let mut svc = mock::Spawn::new(Balance::new(disco));
assert_pending!(svc.poll_ready());
}
#[tokio::test]
async fn single_endpoint() {
let (mut svc, mut handle) = mock::spawn_with(|s| {
let mock = load::Constant::new(s, 0);
let disco = ServiceList::new(vec![mock].into_iter());
Balance::new(disco)
});
handle.allow(0);
assert_pending!(svc.poll_ready());
assert_eq!(
svc.get_ref().len(),
1,
"balancer must have discovered endpoint"
);
handle.allow(1);
assert_ready_ok!(svc.poll_ready());
let mut fut = task::spawn(svc.call(()));
assert_request_eq!(handle, ()).send_response(1);
assert_eq!(assert_ready_ok!(fut.poll()), 1);
handle.allow(1);
assert_ready_ok!(svc.poll_ready());
handle.send_error("endpoint lost");
assert_pending!(svc.poll_ready());
assert!(
svc.get_ref().is_empty(),
"balancer must drop failed endpoints"
);
}
#[tokio::test]
async fn two_endpoints_with_equal_load() {
let (mock_a, handle_a) = mock::pair();
let (mock_b, handle_b) = mock::pair();
let mock_a = load::Constant::new(mock_a, 1);
let mock_b = load::Constant::new(mock_b, 1);
pin_mut!(handle_a);
pin_mut!(handle_b);
let disco = ServiceList::new(vec![mock_a, mock_b].into_iter());
let mut svc = mock::Spawn::new(Balance::new(disco));
handle_a.allow(0);
handle_b.allow(0);
assert_pending!(svc.poll_ready());
assert_eq!(
svc.get_ref().len(),
2,
"balancer must have discovered both endpoints"
);
handle_a.allow(1);
handle_b.allow(0);
assert_ready_ok!(
svc.poll_ready(),
"must be ready when one of two services is ready"
);
{
let mut fut = task::spawn(svc.call(()));
assert_request_eq!(handle_a, ()).send_response("a");
assert_eq!(assert_ready_ok!(fut.poll()), "a");
}
handle_a.allow(0);
handle_b.allow(1);
assert_ready_ok!(
svc.poll_ready(),
"must be ready when both endpoints are ready"
);
{
let mut fut = task::spawn(svc.call(()));
assert_request_eq!(handle_b, ()).send_response("b");
assert_eq!(assert_ready_ok!(fut.poll()), "b");
}
handle_a.allow(1);
handle_b.allow(1);
for _ in 0..2 {
assert_ready_ok!(
svc.poll_ready(),
"must be ready when both endpoints are ready"
);
let mut fut = task::spawn(svc.call(()));
for (ref mut h, c) in &mut [(&mut handle_a, "a"), (&mut handle_b, "b")] {
if let Poll::Ready(Some((_, tx))) = h.as_mut().poll_request() {
tracing::info!("using {}", c);
tx.send_response(c);
h.allow(0);
}
}
assert_ready_ok!(fut.poll());
}
handle_a.send_error("endpoint lost");
assert_pending!(svc.poll_ready());
assert_eq!(
svc.get_ref().len(),
1,
"balancer must drop failed endpoints",
);
}

View file

@ -0,0 +1,142 @@
//! This module defines a load-balanced pool of services that adds new services when load is high.
//!
//! The pool uses `poll_ready` as a signal indicating whether additional services should be spawned
//! to handle the current level of load. Specifically, every time `poll_ready` on the inner service
//! returns `Ready`, [`Pool`] consider that a 0, and every time it returns `Pending`, [`Pool`]
//! considers it a 1. [`Pool`] then maintains an [exponential moving
//! average](https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average) over those
//! samples, which gives an estimate of how often the underlying service has been ready when it was
//! needed "recently" (see [`Builder::urgency`]). If the service is loaded (see
//! [`Builder::loaded_above`]), a new service is created and added to the underlying [`Balance`].
//! If the service is underutilized (see [`Builder::underutilized_below`]) and there are two or
//! more services, then the latest added service is removed. In either case, the load estimate is
//! reset to its initial value (see [`Builder::initial`] to prevent services from being rapidly
//! added or removed.
#![deny(missing_docs)]
use super::p2c::Balance;
use crate::discover::Change;
use crate::load::Load;
use crate::make::MakeService;
use futures_core::{ready, Stream};
use pin_project_lite::pin_project;
use slab::Slab;
use std::{
fmt,
future::Future,
marker::PhantomData,
pin::Pin,
task::{Context, Poll},
};
use tower_service::Service;
/// A wrapper around `MakeService` that discovers a new service when load is high, and removes a
/// service when load is low. See [`Pool`].
pub struct PoolDiscoverer<MS, Target, Request>
where
MS: MakeService<Target, Request>,
{
_p: PhantomData<(MS, Target, Request)>,
}
impl<MS, Target, Request> Stream for PoolDiscoverer<MS, Target, Request>
where
MS: MakeService<Target, Request>,
{
type Item = Result<(Change<usize, DropNotifyService<MS::Service>>), MS::MakeError>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
todo!()
}
}
/// A [builder] that lets you configure how a [`Pool`] determines whether the underlying service is
/// loaded or not. See the [module-level documentation](self) and the builder's methods for
/// details.
///
/// [builder]: https://rust-lang-nursery.github.io/api-guidelines/type-safety.html#builders-enable-construction-of-complex-values-c-builder
#[derive(Copy, Clone, Debug)]
pub struct Builder {}
impl Builder {
/// See [`Pool::new`].
pub fn build<MS, Target, Request>() -> ()
where
MS: MakeService<Target, Request>,
MS::Service: Load,
<MS::Service as Load>::Metric: std::fmt::Debug,
MS::MakeError: Into<crate::BoxError>,
MS::Error: Into<crate::BoxError>,
{
let d: PoolDiscoverer<MS, Target, Request> = todo!();
let x = Balance::new(Box::pin(d));
todo!()
}
}
/// A dynamically sized, load-balanced pool of `Service` instances.
pub struct Pool<MS, Target, Request> {
// the Pin<Box<_>> here is needed since Balance requires the Service to be Unpin
balance: (MS, Target, Request),
}
type PinBalance<S, Request> = Balance<Pin<Box<S>>, Request>;
impl<MS, Target, Req> Service<Req> for Pool<MS, Target, Req>
where
MS: MakeService<Target, Req>,
MS::Service: Load,
<MS::Service as Load>::Metric: std::fmt::Debug,
MS::MakeError: Into<crate::BoxError>,
MS::Error: Into<crate::BoxError>,
Target: Clone,
{
type Response = <PinBalance<PoolDiscoverer<MS, Target, Req>, Req> as Service<Req>>::Response;
type Error = <PinBalance<PoolDiscoverer<MS, Target, Req>, Req> as Service<Req>>::Error;
type Future = <PinBalance<PoolDiscoverer<MS, Target, Req>, Req> as Service<Req>>::Future;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
todo!()
}
fn call(&mut self, req: Req) -> Self::Future {
todo!()
}
}
#[doc(hidden)]
#[derive(Debug)]
pub struct DropNotifyService<Svc> {
svc: Svc,
id: usize,
notify: tokio::sync::mpsc::UnboundedSender<usize>,
}
impl<Svc> Drop for DropNotifyService<Svc> {
fn drop(&mut self) {
todo!()
}
}
impl<Svc: Load> Load for DropNotifyService<Svc> {
type Metric = Svc::Metric;
fn load(&self) -> Self::Metric {
todo!()
}
}
impl<Request, Svc: Service<Request>> Service<Request> for DropNotifyService<Svc> {
type Response = Svc::Response;
type Future = Svc::Future;
type Error = Svc::Error;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
todo!()
}
fn call(&mut self, req: Request) -> Self::Future {
todo!()
}
}

View file

@ -0,0 +1,190 @@
use crate::load;
use futures_util::pin_mut;
use tokio_test::{assert_pending, assert_ready, assert_ready_ok, task};
use tower_test::{assert_request_eq, mock};
use super::*;
#[tokio::test]
async fn basic() {
// start the pool
let (mock, handle) = mock::pair::<(), load::Constant<mock::Mock<(), &'static str>, usize>>();
pin_mut!(handle);
let mut pool = mock::Spawn::new(Builder::new().build(mock, ()));
assert_pending!(pool.poll_ready());
// give the pool a backing service
let (svc1_m, svc1) = mock::pair();
pin_mut!(svc1);
assert_request_eq!(handle, ()).send_response(load::Constant::new(svc1_m, 0));
assert_ready_ok!(pool.poll_ready());
// send a request to the one backing service
let mut fut = task::spawn(pool.call(()));
assert_pending!(fut.poll());
assert_request_eq!(svc1, ()).send_response("foobar");
assert_eq!(assert_ready_ok!(fut.poll()), "foobar");
}
#[tokio::test]
async fn high_load() {
// start the pool
let (mock, handle) = mock::pair::<(), load::Constant<mock::Mock<(), &'static str>, usize>>();
pin_mut!(handle);
let pool = Builder::new()
.urgency(1.0) // so _any_ Pending will add a service
.underutilized_below(0.0) // so no Ready will remove a service
.max_services(Some(2))
.build(mock, ());
let mut pool = mock::Spawn::new(pool);
assert_pending!(pool.poll_ready());
// give the pool a backing service
let (svc1_m, svc1) = mock::pair();
pin_mut!(svc1);
svc1.allow(1);
assert_request_eq!(handle, ()).send_response(load::Constant::new(svc1_m, 0));
assert_ready_ok!(pool.poll_ready());
// make the one backing service not ready
let mut fut1 = task::spawn(pool.call(()));
// if we poll_ready again, pool should notice that load is increasing
// since urgency == 1.0, it should immediately enter high load
assert_pending!(pool.poll_ready());
// it should ask the maker for another service, so we give it one
let (svc2_m, svc2) = mock::pair();
pin_mut!(svc2);
svc2.allow(1);
assert_request_eq!(handle, ()).send_response(load::Constant::new(svc2_m, 0));
// the pool should now be ready again for one more request
assert_ready_ok!(pool.poll_ready());
let mut fut2 = task::spawn(pool.call(()));
assert_pending!(pool.poll_ready());
// the pool should _not_ try to add another service
// sicen we have max_services(2)
assert_pending!(handle.as_mut().poll_request());
// let see that each service got one request
assert_request_eq!(svc1, ()).send_response("foo");
assert_request_eq!(svc2, ()).send_response("bar");
assert_eq!(assert_ready_ok!(fut1.poll()), "foo");
assert_eq!(assert_ready_ok!(fut2.poll()), "bar");
}
#[tokio::test]
async fn low_load() {
// start the pool
let (mock, handle) = mock::pair::<(), load::Constant<mock::Mock<(), &'static str>, usize>>();
pin_mut!(handle);
let pool = Builder::new()
.urgency(1.0) // so any event will change the service count
.build(mock, ());
let mut pool = mock::Spawn::new(pool);
assert_pending!(pool.poll_ready());
// give the pool a backing service
let (svc1_m, svc1) = mock::pair();
pin_mut!(svc1);
svc1.allow(1);
assert_request_eq!(handle, ()).send_response(load::Constant::new(svc1_m, 0));
assert_ready_ok!(pool.poll_ready());
// cycling a request should now work
let mut fut = task::spawn(pool.call(()));
assert_request_eq!(svc1, ()).send_response("foo");
assert_eq!(assert_ready_ok!(fut.poll()), "foo");
// and pool should now not be ready (since svc1 isn't ready)
// it should immediately try to add another service
// which we give it
assert_pending!(pool.poll_ready());
let (svc2_m, svc2) = mock::pair();
pin_mut!(svc2);
svc2.allow(1);
assert_request_eq!(handle, ()).send_response(load::Constant::new(svc2_m, 0));
// pool is now ready
// which (because of urgency == 1.0) should immediately cause it to drop a service
// it'll drop svc1, so it'll still be ready
assert_ready_ok!(pool.poll_ready());
// and even with another ready, it won't drop svc2 since its now the only service
assert_ready_ok!(pool.poll_ready());
// cycling a request should now work on svc2
let mut fut = task::spawn(pool.call(()));
assert_request_eq!(svc2, ()).send_response("foo");
assert_eq!(assert_ready_ok!(fut.poll()), "foo");
// and again (still svc2)
svc2.allow(1);
assert_ready_ok!(pool.poll_ready());
let mut fut = task::spawn(pool.call(()));
assert_request_eq!(svc2, ()).send_response("foo");
assert_eq!(assert_ready_ok!(fut.poll()), "foo");
}
#[tokio::test]
async fn failing_service() {
// start the pool
let (mock, handle) = mock::pair::<(), load::Constant<mock::Mock<(), &'static str>, usize>>();
pin_mut!(handle);
let pool = Builder::new()
.urgency(1.0) // so _any_ Pending will add a service
.underutilized_below(0.0) // so no Ready will remove a service
.build(mock, ());
let mut pool = mock::Spawn::new(pool);
assert_pending!(pool.poll_ready());
// give the pool a backing service
let (svc1_m, svc1) = mock::pair();
pin_mut!(svc1);
svc1.allow(1);
assert_request_eq!(handle, ()).send_response(load::Constant::new(svc1_m, 0));
assert_ready_ok!(pool.poll_ready());
// one request-response cycle
let mut fut = task::spawn(pool.call(()));
assert_request_eq!(svc1, ()).send_response("foo");
assert_eq!(assert_ready_ok!(fut.poll()), "foo");
// now make svc1 fail, so it has to be removed
svc1.send_error("ouch");
// polling now should recognize the failed service,
// try to create a new one, and then realize the maker isn't ready
assert_pending!(pool.poll_ready());
// then we release another service
let (svc2_m, svc2) = mock::pair();
pin_mut!(svc2);
svc2.allow(1);
assert_request_eq!(handle, ()).send_response(load::Constant::new(svc2_m, 0));
// the pool should now be ready again
assert_ready_ok!(pool.poll_ready());
// and a cycle should work (and go through svc2)
let mut fut = task::spawn(pool.call(()));
assert_request_eq!(svc2, ()).send_response("bar");
assert_eq!(assert_ready_ok!(fut.poll()), "bar");
}

828
tower/tower/src/builder.rs Normal file
View file

@ -0,0 +1,828 @@
//! Builder types to compose layers and services
use tower_layer::{Identity, Layer, Stack};
use tower_service::Service;
use std::fmt;
/// Declaratively construct [`Service`] values.
///
/// [`ServiceBuilder`] provides a [builder-like interface][builder] for composing
/// layers to be applied to a [`Service`].
///
/// # Service
///
/// A [`Service`] is a trait representing an asynchronous function of a request
/// to a response. It is similar to `async fn(Request) -> Result<Response, Error>`.
///
/// A [`Service`] is typically bound to a single transport, such as a TCP
/// connection. It defines how _all_ inbound or outbound requests are handled
/// by that connection.
///
/// [builder]: https://doc.rust-lang.org/1.0.0/style/ownership/builders.html
///
/// # Order
///
/// The order in which layers are added impacts how requests are handled. Layers
/// that are added first will be called with the request first. The argument to
/// `service` will be last to see the request.
///
/// ```
/// # // this (and other) doctest is ignored because we don't have a way
/// # // to say that it should only be run with cfg(feature = "...")
/// # use tower::Service;
/// # use tower::builder::ServiceBuilder;
/// # #[cfg(all(feature = "buffer", feature = "limit"))]
/// # async fn wrap<S>(svc: S) where S: Service<(), Error = &'static str> + 'static + Send, S::Future: Send {
/// ServiceBuilder::new()
/// .buffer(100)
/// .concurrency_limit(10)
/// .service(svc)
/// # ;
/// # }
/// ```
///
/// In the above example, the buffer layer receives the request first followed
/// by `concurrency_limit`. `buffer` enables up to 100 request to be in-flight
/// **on top of** the requests that have already been forwarded to the next
/// layer. Combined with `concurrency_limit`, this allows up to 110 requests to be
/// in-flight.
///
/// ```
/// # use tower::Service;
/// # use tower::builder::ServiceBuilder;
/// # #[cfg(all(feature = "buffer", feature = "limit"))]
/// # async fn wrap<S>(svc: S) where S: Service<(), Error = &'static str> + 'static + Send, S::Future: Send {
/// ServiceBuilder::new()
/// .concurrency_limit(10)
/// .buffer(100)
/// .service(svc)
/// # ;
/// # }
/// ```
///
/// The above example is similar, but the order of layers is reversed. Now,
/// `concurrency_limit` applies first and only allows 10 requests to be in-flight
/// total.
///
/// # Examples
///
/// A [`Service`] stack with a single layer:
///
/// ```
/// # use tower::Service;
/// # use tower::builder::ServiceBuilder;
/// # #[cfg(feature = "limit")]
/// # use tower::limit::concurrency::ConcurrencyLimitLayer;
/// # #[cfg(feature = "limit")]
/// # async fn wrap<S>(svc: S) where S: Service<(), Error = &'static str> + 'static + Send, S::Future: Send {
/// ServiceBuilder::new()
/// .concurrency_limit(5)
/// .service(svc);
/// # ;
/// # }
/// ```
///
/// A [`Service`] stack with _multiple_ layers that contain rate limiting,
/// in-flight request limits, and a channel-backed, clonable [`Service`]:
///
/// ```
/// # use tower::Service;
/// # use tower::builder::ServiceBuilder;
/// # use std::time::Duration;
/// # #[cfg(all(feature = "buffer", feature = "limit"))]
/// # async fn wrap<S>(svc: S) where S: Service<(), Error = &'static str> + 'static + Send, S::Future: Send {
/// ServiceBuilder::new()
/// .buffer(5)
/// .concurrency_limit(5)
/// .rate_limit(5, Duration::from_secs(1))
/// .service(svc);
/// # ;
/// # }
/// ```
///
/// [`Service`]: crate::Service
#[derive(Clone)]
pub struct ServiceBuilder<L> {
layer: L,
}
impl Default for ServiceBuilder<Identity> {
fn default() -> Self {
Self::new()
}
}
impl ServiceBuilder<Identity> {
/// Create a new [`ServiceBuilder`].
pub fn new() -> Self {
ServiceBuilder {
layer: Identity::new(),
}
}
}
impl<L> ServiceBuilder<L> {
/// Add a new layer `T` into the [`ServiceBuilder`].
///
/// This wraps the inner service with the service provided by a user-defined
/// [`Layer`]. The provided layer must implement the [`Layer`] trait.
///
/// [`Layer`]: crate::Layer
pub fn layer<T>(self, layer: T) -> ServiceBuilder<Stack<T, L>> {
ServiceBuilder {
layer: Stack::new(layer, self.layer),
}
}
/// Optionally add a new layer `T` into the [`ServiceBuilder`].
///
/// ```
/// # use std::time::Duration;
/// # use tower::Service;
/// # use tower::builder::ServiceBuilder;
/// # use tower::timeout::TimeoutLayer;
/// # async fn wrap<S>(svc: S) where S: Service<(), Error = &'static str> + 'static + Send, S::Future: Send {
/// # let timeout = Some(Duration::new(10, 0));
/// // Apply a timeout if configured
/// ServiceBuilder::new()
/// .option_layer(timeout.map(TimeoutLayer::new))
/// .service(svc)
/// # ;
/// # }
/// ```
#[cfg(feature = "util")]
#[cfg_attr(docsrs, doc(cfg(feature = "util")))]
pub fn option_layer<T>(
self,
layer: Option<T>,
) -> ServiceBuilder<Stack<crate::util::Either<T, Identity>, L>> {
self.layer(crate::util::option_layer(layer))
}
/// Add a [`Layer`] built from a function that accepts a service and returns another service.
///
/// See the documentation for [`layer_fn`] for more details.
///
/// [`layer_fn`]: crate::layer::layer_fn
pub fn layer_fn<F>(self, f: F) -> ServiceBuilder<Stack<crate::layer::LayerFn<F>, L>> {
self.layer(crate::layer::layer_fn(f))
}
/// Buffer requests when the next layer is not ready.
///
/// This wraps the inner service with an instance of the [`Buffer`]
/// middleware.
///
/// [`Buffer`]: crate::buffer
#[cfg(feature = "buffer")]
#[cfg_attr(docsrs, doc(cfg(feature = "buffer")))]
pub fn buffer<Request>(
self,
bound: usize,
) -> ServiceBuilder<Stack<crate::buffer::BufferLayer<Request>, L>> {
self.layer(crate::buffer::BufferLayer::new(bound))
}
/// Limit the max number of in-flight requests.
///
/// A request is in-flight from the time the request is received until the
/// response future completes. This includes the time spent in the next
/// layers.
///
/// This wraps the inner service with an instance of the
/// [`ConcurrencyLimit`] middleware.
///
/// [`ConcurrencyLimit`]: crate::limit::concurrency
#[cfg(feature = "limit")]
#[cfg_attr(docsrs, doc(cfg(feature = "limit")))]
pub fn concurrency_limit(
self,
max: usize,
) -> ServiceBuilder<Stack<crate::limit::ConcurrencyLimitLayer, L>> {
self.layer(crate::limit::ConcurrencyLimitLayer::new(max))
}
/// Drop requests when the next layer is unable to respond to requests.
///
/// Usually, when a service or middleware does not have capacity to process a
/// request (i.e., [`poll_ready`] returns [`Pending`]), the caller waits until
/// capacity becomes available.
///
/// [`LoadShed`] immediately responds with an error when the next layer is
/// out of capacity.
///
/// This wraps the inner service with an instance of the [`LoadShed`]
/// middleware.
///
/// [`LoadShed`]: crate::load_shed
/// [`poll_ready`]: crate::Service::poll_ready
/// [`Pending`]: std::task::Poll::Pending
#[cfg(feature = "load-shed")]
#[cfg_attr(docsrs, doc(cfg(feature = "load-shed")))]
pub fn load_shed(self) -> ServiceBuilder<Stack<crate::load_shed::LoadShedLayer, L>> {
self.layer(crate::load_shed::LoadShedLayer::new())
}
/// Limit requests to at most `num` per the given duration.
///
/// This wraps the inner service with an instance of the [`RateLimit`]
/// middleware.
///
/// [`RateLimit`]: crate::limit::rate
#[cfg(feature = "limit")]
#[cfg_attr(docsrs, doc(cfg(feature = "limit")))]
pub fn rate_limit(
self,
num: u64,
per: std::time::Duration,
) -> ServiceBuilder<Stack<crate::limit::RateLimitLayer, L>> {
self.layer(crate::limit::RateLimitLayer::new(num, per))
}
/// Retry failed requests according to the given [retry policy][policy].
///
/// `policy` determines which failed requests will be retried. It must
/// implement the [`retry::Policy`][policy] trait.
///
/// This wraps the inner service with an instance of the [`Retry`]
/// middleware.
///
/// [`Retry`]: crate::retry
/// [policy]: crate::retry::Policy
#[cfg(feature = "retry")]
#[cfg_attr(docsrs, doc(cfg(feature = "retry")))]
pub fn retry<P>(self, policy: P) -> ServiceBuilder<Stack<crate::retry::RetryLayer<P>, L>> {
self.layer(crate::retry::RetryLayer::new(policy))
}
/// Fail requests that take longer than `timeout`.
///
/// If the next layer takes more than `timeout` to respond to a request,
/// processing is terminated and an error is returned.
///
/// This wraps the inner service with an instance of the [`timeout`]
/// middleware.
///
/// [`timeout`]: crate::timeout
#[cfg(feature = "timeout")]
#[cfg_attr(docsrs, doc(cfg(feature = "timeout")))]
pub fn timeout(
self,
timeout: std::time::Duration,
) -> ServiceBuilder<Stack<crate::timeout::TimeoutLayer, L>> {
self.layer(crate::timeout::TimeoutLayer::new(timeout))
}
/// Conditionally reject requests based on `predicate`.
///
/// `predicate` must implement the [`Predicate`] trait.
///
/// This wraps the inner service with an instance of the [`Filter`]
/// middleware.
///
/// [`Filter`]: crate::filter
/// [`Predicate`]: crate::filter::Predicate
#[cfg(feature = "filter")]
#[cfg_attr(docsrs, doc(cfg(feature = "filter")))]
pub fn filter<P>(
self,
predicate: P,
) -> ServiceBuilder<Stack<crate::filter::FilterLayer<P>, L>> {
self.layer(crate::filter::FilterLayer::new(predicate))
}
/// Conditionally reject requests based on an asynchronous `predicate`.
///
/// `predicate` must implement the [`AsyncPredicate`] trait.
///
/// This wraps the inner service with an instance of the [`AsyncFilter`]
/// middleware.
///
/// [`AsyncFilter`]: crate::filter::AsyncFilter
/// [`AsyncPredicate`]: crate::filter::AsyncPredicate
#[cfg(feature = "filter")]
#[cfg_attr(docsrs, doc(cfg(feature = "filter")))]
pub fn filter_async<P>(
self,
predicate: P,
) -> ServiceBuilder<Stack<crate::filter::AsyncFilterLayer<P>, L>> {
self.layer(crate::filter::AsyncFilterLayer::new(predicate))
}
/// Map one request type to another.
///
/// This wraps the inner service with an instance of the [`MapRequest`]
/// middleware.
///
/// # Examples
///
/// Changing the type of a request:
///
/// ```rust
/// use tower::ServiceBuilder;
/// use tower::ServiceExt;
///
/// # #[tokio::main]
/// # async fn main() -> Result<(), ()> {
/// // Suppose we have some `Service` whose request type is `String`:
/// let string_svc = tower::service_fn(|request: String| async move {
/// println!("request: {}", request);
/// Ok(())
/// });
///
/// // ...but we want to call that service with a `usize`. What do we do?
///
/// let usize_svc = ServiceBuilder::new()
/// // Add a middlware that converts the request type to a `String`:
/// .map_request(|request: usize| format!("{}", request))
/// // ...and wrap the string service with that middleware:
/// .service(string_svc);
///
/// // Now, we can call that service with a `usize`:
/// usize_svc.oneshot(42).await?;
/// # Ok(())
/// # }
/// ```
///
/// Modifying the request value:
///
/// ```rust
/// use tower::ServiceBuilder;
/// use tower::ServiceExt;
///
/// # #[tokio::main]
/// # async fn main() -> Result<(), ()> {
/// // A service that takes a number and returns it:
/// let svc = tower::service_fn(|request: usize| async move {
/// Ok(request)
/// });
///
/// let svc = ServiceBuilder::new()
/// // Add a middleware that adds 1 to each request
/// .map_request(|request: usize| request + 1)
/// .service(svc);
///
/// let response = svc.oneshot(1).await?;
/// assert_eq!(response, 2);
/// # Ok(())
/// # }
/// ```
///
/// [`MapRequest`]: crate::util::MapRequest
#[cfg(feature = "util")]
#[cfg_attr(docsrs, doc(cfg(feature = "util")))]
pub fn map_request<F, R1, R2>(
self,
f: F,
) -> ServiceBuilder<Stack<crate::util::MapRequestLayer<F>, L>>
where
F: FnMut(R1) -> R2 + Clone,
{
self.layer(crate::util::MapRequestLayer::new(f))
}
/// Map one response type to another.
///
/// This wraps the inner service with an instance of the [`MapResponse`]
/// middleware.
///
/// See the documentation for the [`map_response` combinator] for details.
///
/// [`MapResponse`]: crate::util::MapResponse
/// [`map_response` combinator]: crate::util::ServiceExt::map_response
#[cfg(feature = "util")]
#[cfg_attr(docsrs, doc(cfg(feature = "util")))]
pub fn map_response<F>(
self,
f: F,
) -> ServiceBuilder<Stack<crate::util::MapResponseLayer<F>, L>> {
self.layer(crate::util::MapResponseLayer::new(f))
}
/// Map one error type to another.
///
/// This wraps the inner service with an instance of the [`MapErr`]
/// middleware.
///
/// See the documentation for the [`map_err` combinator] for details.
///
/// [`MapErr`]: crate::util::MapErr
/// [`map_err` combinator]: crate::util::ServiceExt::map_err
#[cfg(feature = "util")]
#[cfg_attr(docsrs, doc(cfg(feature = "util")))]
pub fn map_err<F>(self, f: F) -> ServiceBuilder<Stack<crate::util::MapErrLayer<F>, L>> {
self.layer(crate::util::MapErrLayer::new(f))
}
/// Composes a function that transforms futures produced by the service.
///
/// This wraps the inner service with an instance of the [`MapFutureLayer`] middleware.
///
/// See the documentation for the [`map_future`] combinator for details.
///
/// [`MapFutureLayer`]: crate::util::MapFutureLayer
/// [`map_future`]: crate::util::ServiceExt::map_future
#[cfg(feature = "util")]
#[cfg_attr(docsrs, doc(cfg(feature = "util")))]
pub fn map_future<F>(self, f: F) -> ServiceBuilder<Stack<crate::util::MapFutureLayer<F>, L>> {
self.layer(crate::util::MapFutureLayer::new(f))
}
/// Apply an asynchronous function after the service, regardless of whether the future
/// succeeds or fails.
///
/// This wraps the inner service with an instance of the [`Then`]
/// middleware.
///
/// This is similar to the [`map_response`] and [`map_err`] functions,
/// except that the *same* function is invoked when the service's future
/// completes, whether it completes successfully or fails. This function
/// takes the [`Result`] returned by the service's future, and returns a
/// [`Result`].
///
/// See the documentation for the [`then` combinator] for details.
///
/// [`Then`]: crate::util::Then
/// [`then` combinator]: crate::util::ServiceExt::then
/// [`map_response`]: ServiceBuilder::map_response
/// [`map_err`]: ServiceBuilder::map_err
#[cfg(feature = "util")]
#[cfg_attr(docsrs, doc(cfg(feature = "util")))]
pub fn then<F>(self, f: F) -> ServiceBuilder<Stack<crate::util::ThenLayer<F>, L>> {
self.layer(crate::util::ThenLayer::new(f))
}
/// Executes a new future after this service's future resolves. This does
/// not alter the behaviour of the [`poll_ready`] method.
///
/// This method can be used to change the [`Response`] type of the service
/// into a different type. You can use this method to chain along a computation once the
/// service's response has been resolved.
///
/// This wraps the inner service with an instance of the [`AndThen`]
/// middleware.
///
/// See the documentation for the [`and_then` combinator] for details.
///
/// [`Response`]: crate::Service::Response
/// [`poll_ready`]: crate::Service::poll_ready
/// [`and_then` combinator]: crate::util::ServiceExt::and_then
/// [`AndThen`]: crate::util::AndThen
#[cfg(feature = "util")]
#[cfg_attr(docsrs, doc(cfg(feature = "util")))]
pub fn and_then<F>(self, f: F) -> ServiceBuilder<Stack<crate::util::AndThenLayer<F>, L>> {
self.layer(crate::util::AndThenLayer::new(f))
}
/// Maps this service's result type (`Result<Self::Response, Self::Error>`)
/// to a different value, regardless of whether the future succeeds or
/// fails.
///
/// This wraps the inner service with an instance of the [`MapResult`]
/// middleware.
///
/// See the documentation for the [`map_result` combinator] for details.
///
/// [`map_result` combinator]: crate::util::ServiceExt::map_result
/// [`MapResult`]: crate::util::MapResult
#[cfg(feature = "util")]
#[cfg_attr(docsrs, doc(cfg(feature = "util")))]
pub fn map_result<F>(self, f: F) -> ServiceBuilder<Stack<crate::util::MapResultLayer<F>, L>> {
self.layer(crate::util::MapResultLayer::new(f))
}
/// Returns the underlying `Layer` implementation.
pub fn into_inner(self) -> L {
self.layer
}
/// Wrap the service `S` with the middleware provided by this
/// [`ServiceBuilder`]'s [`Layer`]'s, returning a new [`Service`].
///
/// [`Layer`]: crate::Layer
/// [`Service`]: crate::Service
pub fn service<S>(&self, service: S) -> L::Service
where
L: Layer<S>,
{
self.layer.layer(service)
}
/// Wrap the async function `F` with the middleware provided by this [`ServiceBuilder`]'s
/// [`Layer`]s, returning a new [`Service`].
///
/// This is a convenience method which is equivalent to calling
/// [`ServiceBuilder::service`] with a [`service_fn`], like this:
///
/// ```rust
/// # use tower::{ServiceBuilder, service_fn};
/// # async fn handler_fn(_: ()) -> Result<(), ()> { Ok(()) }
/// # let _ = {
/// ServiceBuilder::new()
/// // ...
/// .service(service_fn(handler_fn))
/// # };
/// ```
///
/// # Example
///
/// ```rust
/// use std::time::Duration;
/// use tower::{ServiceBuilder, ServiceExt, BoxError, service_fn};
///
/// # #[tokio::main]
/// # async fn main() -> Result<(), BoxError> {
/// async fn handle(request: &'static str) -> Result<&'static str, BoxError> {
/// Ok(request)
/// }
///
/// let svc = ServiceBuilder::new()
/// .buffer(1024)
/// .timeout(Duration::from_secs(10))
/// .service_fn(handle);
///
/// let response = svc.oneshot("foo").await?;
///
/// assert_eq!(response, "foo");
/// # Ok(())
/// # }
/// ```
///
/// [`Layer`]: crate::Layer
/// [`Service`]: crate::Service
/// [`service_fn`]: crate::service_fn
#[cfg(feature = "util")]
#[cfg_attr(docsrs, doc(cfg(feature = "util")))]
pub fn service_fn<F>(self, f: F) -> L::Service
where
L: Layer<crate::util::ServiceFn<F>>,
{
self.service(crate::util::service_fn(f))
}
/// Check that the builder implements `Clone`.
///
/// This can be useful when debugging type errors in `ServiceBuilder`s with lots of layers.
///
/// Doesn't actually change the builder but serves as a type check.
///
/// # Example
///
/// ```rust
/// use tower::ServiceBuilder;
///
/// let builder = ServiceBuilder::new()
/// // Do something before processing the request
/// .map_request(|request: String| {
/// println!("got request!");
/// request
/// })
/// // Ensure our `ServiceBuilder` can be cloned
/// .check_clone()
/// // Do something after processing the request
/// .map_response(|response: String| {
/// println!("got response!");
/// response
/// });
/// ```
#[inline]
pub fn check_clone(self) -> Self
where
Self: Clone,
{
self
}
/// Check that the builder when given a service of type `S` produces a service that implements
/// `Clone`.
///
/// This can be useful when debugging type errors in `ServiceBuilder`s with lots of layers.
///
/// Doesn't actually change the builder but serves as a type check.
///
/// # Example
///
/// ```rust
/// use tower::ServiceBuilder;
///
/// # #[derive(Clone)]
/// # struct MyService;
/// #
/// let builder = ServiceBuilder::new()
/// // Do something before processing the request
/// .map_request(|request: String| {
/// println!("got request!");
/// request
/// })
/// // Ensure that the service produced when given a `MyService` implements
/// .check_service_clone::<MyService>()
/// // Do something after processing the request
/// .map_response(|response: String| {
/// println!("got response!");
/// response
/// });
/// ```
#[inline]
pub fn check_service_clone<S>(self) -> Self
where
L: Layer<S>,
L::Service: Clone,
{
self
}
/// Check that the builder when given a service of type `S` produces a service with the given
/// request, response, and error types.
///
/// This can be useful when debugging type errors in `ServiceBuilder`s with lots of layers.
///
/// Doesn't actually change the builder but serves as a type check.
///
/// # Example
///
/// ```rust
/// use tower::ServiceBuilder;
/// use std::task::{Poll, Context};
/// use tower::{Service, ServiceExt};
///
/// // An example service
/// struct MyService;
///
/// impl Service<Request> for MyService {
/// type Response = Response;
/// type Error = Error;
/// type Future = futures_util::future::Ready<Result<Response, Error>>;
///
/// fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
/// // ...
/// # todo!()
/// }
///
/// fn call(&mut self, request: Request) -> Self::Future {
/// // ...
/// # todo!()
/// }
/// }
///
/// struct Request;
/// struct Response;
/// struct Error;
///
/// struct WrappedResponse(Response);
///
/// let builder = ServiceBuilder::new()
/// // At this point in the builder if given a `MyService` it produces a service that
/// // accepts `Request`s, produces `Response`s, and fails with `Error`s
/// .check_service::<MyService, Request, Response, Error>()
/// // Wrap responses in `WrappedResponse`
/// .map_response(|response: Response| WrappedResponse(response))
/// // Now the response type will be `WrappedResponse`
/// .check_service::<MyService, _, WrappedResponse, _>();
/// ```
#[inline]
pub fn check_service<S, T, U, E>(self) -> Self
where
L: Layer<S>,
L::Service: Service<T, Response = U, Error = E>,
{
self
}
/// This wraps the inner service with the [`Layer`] returned by [`BoxService::layer()`].
///
/// See that method for more details.
///
/// # Example
///
/// ```
/// use tower::{Service, ServiceBuilder, BoxError, util::BoxService};
/// use std::time::Duration;
/// #
/// # struct Request;
/// # struct Response;
/// # impl Response {
/// # fn new() -> Self { Self }
/// # }
///
/// let service: BoxService<Request, Response, BoxError> = ServiceBuilder::new()
/// .boxed()
/// .load_shed()
/// .concurrency_limit(64)
/// .timeout(Duration::from_secs(10))
/// .service_fn(|req: Request| async {
/// Ok::<_, BoxError>(Response::new())
/// });
/// # let service = assert_service(service);
/// # fn assert_service<S, R>(svc: S) -> S
/// # where S: Service<R> { svc }
/// ```
///
/// [`BoxService::layer()`]: crate::util::BoxService::layer()
#[cfg(feature = "util")]
#[cfg_attr(docsrs, doc(cfg(feature = "util")))]
pub fn boxed<S, R>(
self,
) -> ServiceBuilder<
Stack<
tower_layer::LayerFn<
fn(
L::Service,
) -> crate::util::BoxService<
R,
<L::Service as Service<R>>::Response,
<L::Service as Service<R>>::Error,
>,
>,
L,
>,
>
where
L: Layer<S>,
L::Service: Service<R> + Send + 'static,
<L::Service as Service<R>>::Future: Send + 'static,
{
self.layer(crate::util::BoxService::layer())
}
/// This wraps the inner service with the [`Layer`] returned by [`BoxCloneService::layer()`].
///
/// This is similar to the [`boxed`] method, but it requires that `Self` implement
/// [`Clone`], and the returned boxed service implements [`Clone`].
///
/// See [`BoxCloneService`] for more details.
///
/// # Example
///
/// ```
/// use tower::{Service, ServiceBuilder, BoxError, util::BoxCloneService};
/// use std::time::Duration;
/// #
/// # struct Request;
/// # struct Response;
/// # impl Response {
/// # fn new() -> Self { Self }
/// # }
///
/// let service: BoxCloneService<Request, Response, BoxError> = ServiceBuilder::new()
/// .boxed_clone()
/// .load_shed()
/// .concurrency_limit(64)
/// .timeout(Duration::from_secs(10))
/// .service_fn(|req: Request| async {
/// Ok::<_, BoxError>(Response::new())
/// });
/// # let service = assert_service(service);
///
/// // The boxed service can still be cloned.
/// service.clone();
/// # fn assert_service<S, R>(svc: S) -> S
/// # where S: Service<R> { svc }
/// ```
///
/// [`BoxCloneService::layer()`]: crate::util::BoxCloneService::layer()
/// [`BoxCloneService`]: crate::util::BoxCloneService
/// [`boxed`]: Self::boxed
#[cfg(feature = "util")]
#[cfg_attr(docsrs, doc(cfg(feature = "util")))]
pub fn boxed_clone<S, R>(
self,
) -> ServiceBuilder<
Stack<
tower_layer::LayerFn<
fn(
L::Service,
) -> crate::util::BoxCloneService<
R,
<L::Service as Service<R>>::Response,
<L::Service as Service<R>>::Error,
>,
>,
L,
>,
>
where
L: Layer<S>,
L::Service: Service<R> + Clone + Send + 'static,
<L::Service as Service<R>>::Future: Send + 'static,
{
self.layer(crate::util::BoxCloneService::layer())
}
}
impl<L: fmt::Debug> fmt::Debug for ServiceBuilder<L> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_tuple("ServiceBuilder").field(&self.layer).finish()
}
}
impl<S, L> Layer<S> for ServiceBuilder<L>
where
L: Layer<S>,
{
type Service = L::Service;
fn layer(&self, inner: S) -> Self::Service {
self.layer.layer(inner)
}
}

View file

@ -0,0 +1,12 @@
use std::{error::Error, fmt};
#[derive(Debug)]
pub enum Never {}
impl fmt::Display for Never {
fn fmt(&self, _: &mut fmt::Formatter) -> fmt::Result {
match *self {}
}
}
impl Error for Never {}

View file

@ -0,0 +1,61 @@
use super::{error::Never, Change};
use futures_core::Stream;
use pin_project_lite::pin_project;
use std::iter::{Enumerate, IntoIterator};
use std::{
pin::Pin,
task::{Context, Poll},
};
use tower_service::Service;
pin_project! {
/// Static service discovery based on a predetermined list of services.
///
/// [`ServiceList`] is created with an initial list of services. The discovery
/// process will yield this list once and do nothing after.
#[derive(Debug)]
pub struct ServiceList<T>
where
T: IntoIterator,
{
inner: Enumerate<T::IntoIter>,
}
}
impl<T, U> ServiceList<T>
where
T: IntoIterator<Item = U>,
{
#[allow(missing_docs)]
pub fn new<Request>(services: T) -> ServiceList<T>
where
U: Service<Request>,
{
ServiceList {
inner: services.into_iter().enumerate(),
}
}
}
impl<T, U> Stream for ServiceList<T>
where
T: IntoIterator<Item = U>,
{
type Item = Result<Change<usize, U>, Never>;
fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match self.project().inner.next() {
Some((i, service)) => Poll::Ready(Some(Ok(Change::Insert(i, service)))),
None => Poll::Ready(None),
}
}
}
// check that List can be directly over collections
#[cfg(test)]
#[allow(dead_code)]
type ListVecTest<T> = ServiceList<Vec<T>>;
#[cfg(test)]
#[allow(dead_code)]
type ListVecIterTest<T> = ServiceList<::std::vec::IntoIter<T>>;

View file

@ -0,0 +1,52 @@
mod error;
mod list;
pub use self::list::ServiceList;
use crate::sealed::Sealed;
use futures_core::TryStream;
use std::{
pin::Pin,
task::{Context, Poll},
};
pub trait Discover {
type Key: Eq;
type Service;
type Error;
fn poll_discover(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> (Poll<Option<Result<Change<Self::Key, Self::Service>, Self::Error>>>);
}
impl<K, S, E, D: ?Sized> Sealed<Change<(), ()>> for D
where
D: TryStream<Ok = Change<K, S>, Error = E>,
K: Eq,
{
}
impl<K, S, E, D: ?Sized> Discover for D
where
D: TryStream<Ok = Change<K, S>, Error = E>,
K: Eq,
{
type Key = K;
type Service = S;
type Error = E;
fn poll_discover(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<D::Ok, D::Error>>> {
todo!()
}
}
/// A change in the service set.
#[derive(Debug)]
pub enum Change<K, V> {
Insert(K, V),
Remove(K),
}

14
tower/tower/src/layer.rs Normal file
View file

@ -0,0 +1,14 @@
//! A collection of [`Layer`] based tower services
//!
//! [`Layer`]: crate::Layer
pub use tower_layer::{layer_fn, Layer, LayerFn};
/// Utilities for combining layers
///
/// [`Identity`]: crate::layer::util::Identity
/// [`Layer`]: crate::Layer
/// [`Stack`]: crate::layer::util::Stack
pub mod util {
pub use tower_layer::{Identity, Stack};
}

51
tower/tower/src/lib.rs Normal file
View file

@ -0,0 +1,51 @@
#![warn(
missing_debug_implementations,
missing_docs,
rust_2018_idioms,
unreachable_pub
)]
#![forbid(unsafe_code)]
#![allow(elided_lifetimes_in_paths, clippy::type_complexity)]
#![cfg_attr(test, allow(clippy::float_cmp))]
#![cfg_attr(docsrs, feature(doc_cfg))]
#![allow(warnings)]
#[macro_use]
pub(crate) mod macros;
#[cfg(feature = "balance")]
#[cfg_attr(docsrs, doc(cfg(feature = "balance")))]
pub mod balance;
#[cfg(feature = "discover")]
#[cfg_attr(docsrs, doc(cfg(feature = "discover")))]
pub mod discover;
#[cfg(feature = "load")]
#[cfg_attr(docsrs, doc(cfg(feature = "load")))]
pub mod load;
#[cfg(feature = "make")]
#[cfg_attr(docsrs, doc(cfg(feature = "make")))]
pub mod make;
pub mod builder;
pub mod layer;
#[doc(inline)]
pub use crate::builder::ServiceBuilder;
#[cfg(feature = "make")]
#[cfg_attr(docsrs, doc(cfg(feature = "make")))]
#[doc(inline)]
pub use crate::make::MakeService;
#[allow(unreachable_pub)]
mod sealed {
pub trait Sealed<T> {}
}
/// Alias for a type-erased error type.
pub type BoxError = Box<dyn std::error::Error + Send + Sync>;

View file

@ -0,0 +1,95 @@
//! Application-specific request completion semantics.
use futures_core::ready;
use pin_project_lite::pin_project;
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
/// Attaches `H`-typed completion tracker to `V` typed values.
///
/// Handles (of type `H`) are intended to be RAII guards that primarily implement [`Drop`] and update
/// load metric state as they are dropped. This trait allows implementors to "forward" the handle
/// to later parts of the request-handling pipeline, so that the handle is only dropped when the
/// request has truly completed.
///
/// This utility allows load metrics to have a protocol-agnostic means to track streams past their
/// initial response future. For example, if `V` represents an HTTP response type, an
/// implementation could add `H`-typed handles to each response's extensions to detect when all the
/// response's extensions have been dropped.
///
/// A base `impl<H, V> TrackCompletion<H, V> for CompleteOnResponse` is provided to drop the handle
/// once the response future is resolved. This is appropriate when a response is discrete and
/// cannot comprise multiple messages.
///
/// In many cases, the `Output` type is simply `V`. However, [`TrackCompletion`] may alter the type
/// in order to instrument it appropriately. For example, an HTTP [`TrackCompletion`] may modify
/// the body type: so a [`TrackCompletion`] that takes values of type
/// [`http::Response<A>`][response] may output values of type [`http::Response<B>`][response].
///
/// [response]: https://docs.rs/http/latest/http/response/struct.Response.html
pub trait TrackCompletion<H, V>: Clone {
/// The instrumented value type.
type Output;
/// Attaches a `H`-typed handle to a `V`-typed value.
fn track_completion(&self, handle: H, value: V) -> Self::Output;
}
/// A [`TrackCompletion`] implementation that considers the request completed when the response
/// future is resolved.
#[derive(Clone, Copy, Debug, Default)]
#[non_exhaustive]
pub struct CompleteOnResponse;
pin_project! {
/// Attaches a `C`-typed completion tracker to the result of an `F`-typed [`Future`].
#[derive(Debug)]
pub struct TrackCompletionFuture<F, C, H> {
#[pin]
future: F,
handle: Option<H>,
completion: C,
}
}
// ===== impl InstrumentFuture =====
impl<F, C, H> TrackCompletionFuture<F, C, H> {
/// Wraps a future, propagating the tracker into its value if successful.
pub fn new(completion: C, handle: H, future: F) -> Self {
TrackCompletionFuture {
future,
completion,
handle: Some(handle),
}
}
}
impl<F, C, H, T, E> Future for TrackCompletionFuture<F, C, H>
where
F: Future<Output = Result<T, E>>,
C: TrackCompletion<H, T>,
{
type Output = Result<C::Output, E>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let rsp = ready!(this.future.poll(cx))?;
let h = this.handle.take().expect("handle");
Poll::Ready(Ok(this.completion.track_completion(h, rsp)))
}
}
// ===== CompleteOnResponse =====
impl<H, V> TrackCompletion<H, V> for CompleteOnResponse {
type Output = V;
fn track_completion(&self, handle: H, value: V) -> V {
drop(handle);
value
}
}

View file

@ -0,0 +1,80 @@
//! A constant [`Load`] implementation.
#[cfg(feature = "discover")]
use crate::discover::{Change, Discover};
#[cfg(feature = "discover")]
use futures_core::{ready, Stream};
#[cfg(feature = "discover")]
use std::pin::Pin;
use super::Load;
use pin_project_lite::pin_project;
use std::task::{Context, Poll};
use tower_service::Service;
pin_project! {
#[derive(Debug)]
/// Wraps a type so that it implements [`Load`] and returns a constant load metric.
///
/// This load estimator is primarily useful for testing.
pub struct Constant<T, M> {
inner: T,
load: M,
}
}
// ===== impl Constant =====
impl<T, M: Copy> Constant<T, M> {
/// Wraps a `T`-typed service with a constant `M`-typed load metric.
pub fn new(inner: T, load: M) -> Self {
Self { inner, load }
}
}
impl<T, M: Copy + PartialOrd> Load for Constant<T, M> {
type Metric = M;
fn load(&self) -> M {
self.load
}
}
impl<S, M, Request> Service<Request> for Constant<S, M>
where
S: Service<Request>,
M: Copy,
{
type Response = S::Response;
type Error = S::Error;
type Future = S::Future;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx)
}
fn call(&mut self, req: Request) -> Self::Future {
self.inner.call(req)
}
}
/// Proxies [`Discover`] such that all changes are wrapped with a constant load.
#[cfg(feature = "discover")]
#[cfg_attr(docsrs, doc(cfg(feature = "discover")))]
impl<D: Discover + Unpin, M: Copy> Stream for Constant<D, M> {
type Item = Result<Change<D::Key, Constant<D::Service, M>>, D::Error>;
/// Yields the next discovery change set.
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
use self::Change::*;
let this = self.project();
let change = match ready!(Pin::new(this.inner).poll_discover(cx)).transpose()? {
None => return Poll::Ready(None),
Some(Insert(k, svc)) => Insert(k, Constant::new(svc, *this.load)),
Some(Remove(k)) => Remove(k),
};
Poll::Ready(Some(Ok(change)))
}
}

View file

@ -0,0 +1,89 @@
//! Service load measurement
//!
//! This module provides the [`Load`] trait, which allows measuring how loaded a service is.
//! It also provides several wrapper types that measure load in different ways:
//!
//! - [`Constant`] — Always returns the same constant load value for a service.
//! - [`PendingRequests`] — Measures load by tracking the number of in-flight requests.
//! - [`PeakEwma`] — Measures load using a moving average of the peak latency for the service.
//!
//! In general, you will want to use one of these when using the types in [`tower::balance`] which
//! balance services depending on their load. Which load metric to use depends on your exact
//! use-case, but the ones above should get you quite far!
//!
//! When the `discover` feature is enabled, wrapper types for [`Discover`] that
//! wrap the discovered services with the given load estimator are also provided.
//!
//! # When does a request complete?
//!
//! For many applications, the request life-cycle is relatively simple: when a service responds to
//! a request, that request is done, and the system can forget about it. However, for some
//! applications, the service may respond to the initial request while other parts of the system
//! are still acting on that request. In such an application, the system load must take these
//! requests into account as well, or risk the system underestimating its own load.
//!
//! To support these use-cases, the load estimators in this module are parameterized by the
//! [`TrackCompletion`] trait, with [`CompleteOnResponse`] as the default type. The behavior of
//! [`CompleteOnResponse`] is what you would normally expect for a request-response cycle: when the
//! response is produced, the request is considered "finished", and load goes down. This can be
//! overriden by your own user-defined type to track more complex request completion semantics. See
//! the documentation for [`completion`] for more details.
//!
//! # Examples
//!
//! ```rust
//! # #[cfg(feature = "util")]
//! use tower::util::ServiceExt;
//! # #[cfg(feature = "util")]
//! use tower::{load::Load, Service};
//! # #[cfg(feature = "util")]
//! async fn simple_balance<S1, S2, R>(
//! svc1: &mut S1,
//! svc2: &mut S2,
//! request: R
//! ) -> Result<S1::Response, S1::Error>
//! where
//! S1: Load + Service<R>,
//! S2: Load<Metric = S1::Metric> + Service<R, Response = S1::Response, Error = S1::Error>
//! {
//! if svc1.load() < svc2.load() {
//! svc1.ready().await?.call(request).await
//! } else {
//! svc2.ready().await?.call(request).await
//! }
//! }
//! ```
//!
//! [`tower::balance`]: crate::balance
//! [`Discover`]: crate::discover::Discover
//! [`CompleteOnResponse`]: crate::load::completion::CompleteOnResponse
// TODO: a custom completion example would be good here
pub mod completion;
mod constant;
pub mod peak_ewma;
pub mod pending_requests;
pub use self::{
completion::{CompleteOnResponse, TrackCompletion},
constant::Constant,
peak_ewma::PeakEwma,
pending_requests::PendingRequests,
};
#[cfg(feature = "discover")]
pub use self::{peak_ewma::PeakEwmaDiscover, pending_requests::PendingRequestsDiscover};
/// Types that implement this trait can give an estimate of how loaded they are.
///
/// See the module documentation for more details.
pub trait Load {
/// A comparable load metric.
///
/// Lesser values indicate that the service is less loaded, and should be preferred for new
/// requests over another service with a higher value.
type Metric: PartialOrd;
/// Estimate the service's current load.
fn load(&self) -> Self::Metric;
}

View file

@ -0,0 +1,407 @@
//! A `Load` implementation that measures load using the PeakEWMA response latency.
#[cfg(feature = "discover")]
use crate::discover::{Change, Discover};
#[cfg(feature = "discover")]
use futures_core::{ready, Stream};
#[cfg(feature = "discover")]
use pin_project_lite::pin_project;
#[cfg(feature = "discover")]
use std::pin::Pin;
use super::completion::{CompleteOnResponse, TrackCompletion, TrackCompletionFuture};
use super::Load;
use std::task::{Context, Poll};
use std::{
sync::{Arc, Mutex},
time::Duration,
};
use tokio::time::Instant;
use tower_service::Service;
use tracing::trace;
/// Measures the load of the underlying service using Peak-EWMA load measurement.
///
/// [`PeakEwma`] implements [`Load`] with the [`Cost`] metric that estimates the amount of
/// pending work to an endpoint. Work is calculated by multiplying the
/// exponentially-weighted moving average (EWMA) of response latencies by the number of
/// pending requests. The Peak-EWMA algorithm is designed to be especially sensitive to
/// worst-case latencies. Over time, the peak latency value decays towards the moving
/// average of latencies to the endpoint.
///
/// When no latency information has been measured for an endpoint, an arbitrary default
/// RTT of 1 second is used to prevent the endpoint from being overloaded before a
/// meaningful baseline can be established..
///
/// ## Note
///
/// This is derived from [Finagle][finagle], which is distributed under the Apache V2
/// license. Copyright 2017, Twitter Inc.
///
/// [finagle]:
/// https://github.com/twitter/finagle/blob/9cc08d15216497bb03a1cafda96b7266cfbbcff1/finagle-core/src/main/scala/com/twitter/finagle/loadbalancer/PeakEwma.scala
#[derive(Debug)]
pub struct PeakEwma<S, C = CompleteOnResponse> {
service: S,
decay_ns: f64,
rtt_estimate: Arc<Mutex<RttEstimate>>,
completion: C,
}
#[cfg(feature = "discover")]
pin_project! {
/// Wraps a `D`-typed stream of discovered services with `PeakEwma`.
#[cfg_attr(docsrs, doc(cfg(feature = "discover")))]
#[derive(Debug)]
pub struct PeakEwmaDiscover<D, C = CompleteOnResponse> {
#[pin]
discover: D,
decay_ns: f64,
default_rtt: Duration,
completion: C,
}
}
/// Represents the relative cost of communicating with a service.
///
/// The underlying value estimates the amount of pending work to a service: the Peak-EWMA
/// latency estimate multiplied by the number of pending requests.
#[derive(Copy, Clone, Debug, PartialEq, PartialOrd)]
pub struct Cost(f64);
/// Tracks an in-flight request and updates the RTT-estimate on Drop.
#[derive(Debug)]
pub struct Handle {
sent_at: Instant,
decay_ns: f64,
rtt_estimate: Arc<Mutex<RttEstimate>>,
}
/// Holds the current RTT estimate and the last time this value was updated.
#[derive(Debug)]
struct RttEstimate {
update_at: Instant,
rtt_ns: f64,
}
const NANOS_PER_MILLI: f64 = 1_000_000.0;
// ===== impl PeakEwma =====
impl<S, C> PeakEwma<S, C> {
/// Wraps an `S`-typed service so that its load is tracked by the EWMA of its peak latency.
pub fn new(service: S, default_rtt: Duration, decay_ns: f64, completion: C) -> Self {
debug_assert!(decay_ns > 0.0, "decay_ns must be positive");
Self {
service,
decay_ns,
rtt_estimate: Arc::new(Mutex::new(RttEstimate::new(nanos(default_rtt)))),
completion,
}
}
fn handle(&self) -> Handle {
Handle {
decay_ns: self.decay_ns,
sent_at: Instant::now(),
rtt_estimate: self.rtt_estimate.clone(),
}
}
}
impl<S, C, Request> Service<Request> for PeakEwma<S, C>
where
S: Service<Request>,
C: TrackCompletion<Handle, S::Response>,
{
type Response = C::Output;
type Error = S::Error;
type Future = TrackCompletionFuture<S::Future, C, Handle>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.service.poll_ready(cx)
}
fn call(&mut self, req: Request) -> Self::Future {
TrackCompletionFuture::new(
self.completion.clone(),
self.handle(),
self.service.call(req),
)
}
}
impl<S, C> Load for PeakEwma<S, C> {
type Metric = Cost;
fn load(&self) -> Self::Metric {
let pending = Arc::strong_count(&self.rtt_estimate) as u32 - 1;
// Update the RTT estimate to account for decay since the last update.
// If an estimate has not been established, a default is provided
let estimate = self.update_estimate();
let cost = Cost(estimate * f64::from(pending + 1));
trace!(
"load estimate={:.0}ms pending={} cost={:?}",
estimate / NANOS_PER_MILLI,
pending,
cost,
);
cost
}
}
impl<S, C> PeakEwma<S, C> {
fn update_estimate(&self) -> f64 {
let mut rtt = self.rtt_estimate.lock().expect("peak ewma prior_estimate");
rtt.decay(self.decay_ns)
}
}
// ===== impl PeakEwmaDiscover =====
#[cfg(feature = "discover")]
impl<D, C> PeakEwmaDiscover<D, C> {
/// Wraps a `D`-typed [`Discover`] so that services have a [`PeakEwma`] load metric.
///
/// The provided `default_rtt` is used as the default RTT estimate for newly
/// added services.
///
/// They `decay` value determines over what time period a RTT estimate should
/// decay.
pub fn new<Request>(discover: D, default_rtt: Duration, decay: Duration, completion: C) -> Self
where
D: Discover,
D::Service: Service<Request>,
C: TrackCompletion<Handle, <D::Service as Service<Request>>::Response>,
{
PeakEwmaDiscover {
discover,
decay_ns: nanos(decay),
default_rtt,
completion,
}
}
}
#[cfg(feature = "discover")]
#[cfg_attr(docsrs, doc(cfg(feature = "discover")))]
impl<D, C> Stream for PeakEwmaDiscover<D, C>
where
D: Discover,
C: Clone,
{
type Item = Result<Change<D::Key, PeakEwma<D::Service, C>>, D::Error>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
let change = match ready!(this.discover.poll_discover(cx)).transpose()? {
None => return Poll::Ready(None),
Some(Change::Remove(k)) => Change::Remove(k),
Some(Change::Insert(k, svc)) => {
let peak_ewma = PeakEwma::new(
svc,
*this.default_rtt,
*this.decay_ns,
this.completion.clone(),
);
Change::Insert(k, peak_ewma)
}
};
Poll::Ready(Some(Ok(change)))
}
}
// ===== impl RttEstimate =====
impl RttEstimate {
fn new(rtt_ns: f64) -> Self {
debug_assert!(0.0 < rtt_ns, "rtt must be positive");
Self {
rtt_ns,
update_at: Instant::now(),
}
}
/// Decays the RTT estimate with a decay period of `decay_ns`.
fn decay(&mut self, decay_ns: f64) -> f64 {
// Updates with a 0 duration so that the estimate decays towards 0.
let now = Instant::now();
self.update(now, now, decay_ns)
}
/// Updates the Peak-EWMA RTT estimate.
///
/// The elapsed time from `sent_at` to `recv_at` is added
fn update(&mut self, sent_at: Instant, recv_at: Instant, decay_ns: f64) -> f64 {
debug_assert!(
sent_at <= recv_at,
"recv_at={:?} after sent_at={:?}",
recv_at,
sent_at
);
let rtt = nanos(recv_at.saturating_duration_since(sent_at));
let now = Instant::now();
debug_assert!(
self.update_at <= now,
"update_at={:?} in the future",
self.update_at
);
self.rtt_ns = if self.rtt_ns < rtt {
// For Peak-EWMA, always use the worst-case (peak) value as the estimate for
// subsequent requests.
trace!(
"update peak rtt={}ms prior={}ms",
rtt / NANOS_PER_MILLI,
self.rtt_ns / NANOS_PER_MILLI,
);
rtt
} else {
// When an RTT is observed that is less than the estimated RTT, we decay the
// prior estimate according to how much time has elapsed since the last
// update. The inverse of the decay is used to scale the estimate towards the
// observed RTT value.
let elapsed = nanos(now.saturating_duration_since(self.update_at));
let decay = (-elapsed / decay_ns).exp();
let recency = 1.0 - decay;
let next_estimate = (self.rtt_ns * decay) + (rtt * recency);
trace!(
"update rtt={:03.0}ms decay={:06.0}ns; next={:03.0}ms",
rtt / NANOS_PER_MILLI,
self.rtt_ns - next_estimate,
next_estimate / NANOS_PER_MILLI,
);
next_estimate
};
self.update_at = now;
self.rtt_ns
}
}
// ===== impl Handle =====
impl Drop for Handle {
fn drop(&mut self) {
let recv_at = Instant::now();
if let Ok(mut rtt) = self.rtt_estimate.lock() {
rtt.update(self.sent_at, recv_at, self.decay_ns);
}
}
}
// ===== impl Cost =====
// Utility that converts durations to nanos in f64.
//
// Due to a lossy transformation, the maximum value that can be represented is ~585 years,
// which, I hope, is more than enough to represent request latencies.
fn nanos(d: Duration) -> f64 {
const NANOS_PER_SEC: u64 = 1_000_000_000;
let n = f64::from(d.subsec_nanos());
let s = d.as_secs().saturating_mul(NANOS_PER_SEC) as f64;
n + s
}
#[cfg(test)]
mod tests {
use futures_util::future;
use std::time::Duration;
use tokio::time;
use tokio_test::{assert_ready, assert_ready_ok, task};
use super::*;
struct Svc;
impl Service<()> for Svc {
type Response = ();
type Error = ();
type Future = future::Ready<Result<(), ()>>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), ()>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, (): ()) -> Self::Future {
future::ok(())
}
}
/// The default RTT estimate decays, so that new nodes are considered if the
/// default RTT is too high.
#[tokio::test]
async fn default_decay() {
time::pause();
let svc = PeakEwma::new(
Svc,
Duration::from_millis(10),
NANOS_PER_MILLI * 1_000.0,
CompleteOnResponse,
);
let Cost(load) = svc.load();
assert_eq!(load, 10.0 * NANOS_PER_MILLI);
time::advance(Duration::from_millis(100)).await;
let Cost(load) = svc.load();
assert!(9.0 * NANOS_PER_MILLI < load && load < 10.0 * NANOS_PER_MILLI);
time::advance(Duration::from_millis(100)).await;
let Cost(load) = svc.load();
assert!(8.0 * NANOS_PER_MILLI < load && load < 9.0 * NANOS_PER_MILLI);
}
// The default RTT estimate decays, so that new nodes are considered if the default RTT is too
// high.
#[tokio::test]
async fn compound_decay() {
time::pause();
let mut svc = PeakEwma::new(
Svc,
Duration::from_millis(20),
NANOS_PER_MILLI * 1_000.0,
CompleteOnResponse,
);
assert_eq!(svc.load(), Cost(20.0 * NANOS_PER_MILLI));
time::advance(Duration::from_millis(100)).await;
let mut rsp0 = task::spawn(svc.call(()));
assert!(svc.load() > Cost(20.0 * NANOS_PER_MILLI));
time::advance(Duration::from_millis(100)).await;
let mut rsp1 = task::spawn(svc.call(()));
assert!(svc.load() > Cost(40.0 * NANOS_PER_MILLI));
time::advance(Duration::from_millis(100)).await;
let () = assert_ready_ok!(rsp0.poll());
assert_eq!(svc.load(), Cost(400_000_000.0));
time::advance(Duration::from_millis(100)).await;
let () = assert_ready_ok!(rsp1.poll());
assert_eq!(svc.load(), Cost(200_000_000.0));
// Check that values decay as time elapses
time::advance(Duration::from_secs(1)).await;
assert!(svc.load() < Cost(100_000_000.0));
time::advance(Duration::from_secs(10)).await;
assert!(svc.load() < Cost(100_000.0));
}
#[test]
fn nanos() {
assert_eq!(super::nanos(Duration::new(0, 0)), 0.0);
assert_eq!(super::nanos(Duration::new(0, 123)), 123.0);
assert_eq!(super::nanos(Duration::new(1, 23)), 1_000_000_023.0);
assert_eq!(
super::nanos(Duration::new(::std::u64::MAX, 999_999_999)),
18446744074709553000.0
);
}
}

View file

@ -0,0 +1,216 @@
//! A [`Load`] implementation that measures load using the number of in-flight requests.
#[cfg(feature = "discover")]
use crate::discover::{Change, Discover};
#[cfg(feature = "discover")]
use futures_core::{ready, Stream};
#[cfg(feature = "discover")]
use pin_project_lite::pin_project;
#[cfg(feature = "discover")]
use std::pin::Pin;
use super::completion::{CompleteOnResponse, TrackCompletion, TrackCompletionFuture};
use super::Load;
use std::sync::Arc;
use std::task::{Context, Poll};
use tower_service::Service;
/// Measures the load of the underlying service using the number of currently-pending requests.
#[derive(Debug)]
pub struct PendingRequests<S, C = CompleteOnResponse> {
service: S,
ref_count: RefCount,
completion: C,
}
/// Shared between instances of [`PendingRequests`] and [`Handle`] to track active references.
#[derive(Clone, Debug, Default)]
struct RefCount(Arc<()>);
#[cfg(feature = "discover")]
pin_project! {
/// Wraps a `D`-typed stream of discovered services with [`PendingRequests`].
#[cfg_attr(docsrs, doc(cfg(feature = "discover")))]
#[derive(Debug)]
pub struct PendingRequestsDiscover<D, C = CompleteOnResponse> {
#[pin]
discover: D,
completion: C,
}
}
/// Represents the number of currently-pending requests to a given service.
#[derive(Clone, Copy, Debug, Default, PartialOrd, PartialEq, Ord, Eq)]
pub struct Count(usize);
/// Tracks an in-flight request by reference count.
#[derive(Debug)]
pub struct Handle(RefCount);
// ===== impl PendingRequests =====
impl<S, C> PendingRequests<S, C> {
/// Wraps an `S`-typed service so that its load is tracked by the number of pending requests.
pub fn new(service: S, completion: C) -> Self {
Self {
service,
completion,
ref_count: RefCount::default(),
}
}
fn handle(&self) -> Handle {
Handle(self.ref_count.clone())
}
}
impl<S, C> Load for PendingRequests<S, C> {
type Metric = Count;
fn load(&self) -> Count {
// Count the number of references that aren't `self`.
Count(self.ref_count.ref_count() - 1)
}
}
impl<S, C, Request> Service<Request> for PendingRequests<S, C>
where
S: Service<Request>,
C: TrackCompletion<Handle, S::Response>,
{
type Response = C::Output;
type Error = S::Error;
type Future = TrackCompletionFuture<S::Future, C, Handle>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.service.poll_ready(cx)
}
fn call(&mut self, req: Request) -> Self::Future {
TrackCompletionFuture::new(
self.completion.clone(),
self.handle(),
self.service.call(req),
)
}
}
// ===== impl PendingRequestsDiscover =====
#[cfg(feature = "discover")]
impl<D, C> PendingRequestsDiscover<D, C> {
/// Wraps a [`Discover`], wrapping all of its services with [`PendingRequests`].
pub fn new<Request>(discover: D, completion: C) -> Self
where
D: Discover,
D::Service: Service<Request>,
C: TrackCompletion<Handle, <D::Service as Service<Request>>::Response>,
{
Self {
discover,
completion,
}
}
}
#[cfg(feature = "discover")]
impl<D, C> Stream for PendingRequestsDiscover<D, C>
where
D: Discover,
C: Clone,
{
type Item = Result<Change<D::Key, PendingRequests<D::Service, C>>, D::Error>;
/// Yields the next discovery change set.
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
use self::Change::*;
let this = self.project();
let change = match ready!(this.discover.poll_discover(cx)).transpose()? {
None => return Poll::Ready(None),
Some(Insert(k, svc)) => Insert(k, PendingRequests::new(svc, this.completion.clone())),
Some(Remove(k)) => Remove(k),
};
Poll::Ready(Some(Ok(change)))
}
}
// ==== RefCount ====
impl RefCount {
pub(crate) fn ref_count(&self) -> usize {
Arc::strong_count(&self.0)
}
}
#[cfg(test)]
mod tests {
use super::*;
use futures_util::future;
use std::task::{Context, Poll};
struct Svc;
impl Service<()> for Svc {
type Response = ();
type Error = ();
type Future = future::Ready<Result<(), ()>>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), ()>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, (): ()) -> Self::Future {
future::ok(())
}
}
#[test]
fn default() {
let mut svc = PendingRequests::new(Svc, CompleteOnResponse);
assert_eq!(svc.load(), Count(0));
let rsp0 = svc.call(());
assert_eq!(svc.load(), Count(1));
let rsp1 = svc.call(());
assert_eq!(svc.load(), Count(2));
let () = tokio_test::block_on(rsp0).unwrap();
assert_eq!(svc.load(), Count(1));
let () = tokio_test::block_on(rsp1).unwrap();
assert_eq!(svc.load(), Count(0));
}
#[test]
fn with_completion() {
#[derive(Clone)]
struct IntoHandle;
impl TrackCompletion<Handle, ()> for IntoHandle {
type Output = Handle;
fn track_completion(&self, i: Handle, (): ()) -> Handle {
i
}
}
let mut svc = PendingRequests::new(Svc, IntoHandle);
assert_eq!(svc.load(), Count(0));
let rsp = svc.call(());
assert_eq!(svc.load(), Count(1));
let i0 = tokio_test::block_on(rsp).unwrap();
assert_eq!(svc.load(), Count(1));
let rsp = svc.call(());
assert_eq!(svc.load(), Count(2));
let i1 = tokio_test::block_on(rsp).unwrap();
assert_eq!(svc.load(), Count(2));
drop(i1);
assert_eq!(svc.load(), Count(1));
drop(i0);
assert_eq!(svc.load(), Count(0));
}
}

42
tower/tower/src/macros.rs Normal file
View file

@ -0,0 +1,42 @@
#[cfg(any(
feature = "util",
feature = "spawn-ready",
feature = "filter",
feature = "make"
))]
macro_rules! opaque_future {
($(#[$m:meta])* pub type $name:ident<$($param:ident),+> = $actual:ty;) => {
pin_project_lite::pin_project! {
$(#[$m])*
pub struct $name<$($param),+> {
#[pin]
inner: $actual
}
}
impl<$($param),+> $name<$($param),+> {
pub(crate) fn new(inner: $actual) -> Self {
Self {
inner
}
}
}
impl<$($param),+> std::fmt::Debug for $name<$($param),+> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_tuple(stringify!($name)).field(&format_args!("...")).finish()
}
}
impl<$($param),+> std::future::Future for $name<$($param),+>
where
$actual: std::future::Future,
{
type Output = <$actual as std::future::Future>::Output;
#[inline]
fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Self::Output> {
self.project().inner.poll(cx)
}
}
}
}

View file

@ -0,0 +1,47 @@
use crate::sealed::Sealed;
use std::future::Future;
use std::task::{Context, Poll};
use tokio::io::{AsyncRead, AsyncWrite};
use tower_service::Service;
/// The [`MakeConnection`] trait is used to create transports.
///
/// The goal of this service is to allow composable methods for creating
/// `AsyncRead + AsyncWrite` transports. This could mean creating a TLS
/// based connection or using some other method to authenticate the connection.
pub trait MakeConnection<Target>: Sealed<(Target,)> {
/// The transport provided by this service
type Connection: AsyncRead + AsyncWrite;
/// Errors produced by the connecting service
type Error;
/// The future that eventually produces the transport
type Future: Future<Output = Result<Self::Connection, Self::Error>>;
/// Returns `Poll::Ready(Ok(()))` when it is able to make more connections.
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
/// Connect and return a transport asynchronously
fn make_connection(&mut self, target: Target) -> Self::Future;
}
impl<S, Target> Sealed<(Target,)> for S where S: Service<Target> {}
impl<C, Target> MakeConnection<Target> for C
where
C: Service<Target>,
C::Response: AsyncRead + AsyncWrite,
{
type Connection = C::Response;
type Error = C::Error;
type Future = C::Future;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Service::poll_ready(self, cx)
}
fn make_connection(&mut self, target: Target) -> Self::Future {
Service::call(self, target)
}
}

View file

@ -0,0 +1,150 @@
//! Contains [`MakeService`] which is a trait alias for a [`Service`] of [`Service`]s.
use crate::sealed::Sealed;
use std::fmt;
use std::future::Future;
use std::marker::PhantomData;
use std::task::{Context, Poll};
use tower_service::Service;
pub(crate) mod shared;
/// Creates new [`Service`] values.
///
/// Acts as a service factory. This is useful for cases where new [`Service`]
/// values must be produced. One case is a TCP server listener. The listener
/// accepts new TCP streams, obtains a new [`Service`] value using the
/// [`MakeService`] trait, and uses that new [`Service`] value to process inbound
/// requests on that new TCP stream.
///
/// This is essentially a trait alias for a [`Service`] of [`Service`]s.
pub trait MakeService<Target, Request> {
type Response;
type Error;
type Service: Service<Request, Response = Self::Response, Error = Self::Error>;
type MakeError;
type Future;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::MakeError>>;
fn make_service(&mut self, target: Target) -> Self::Future;
}
impl<M, S, Target, Request> Sealed<(Target, Request)> for M
where
M: Service<Target, Response = S>,
S: Service<Request>,
{
}
impl<M, S, Target, Request> MakeService<Target, Request> for M
where
M: Service<Target, Response = S>,
S: Service<Request>,
{
type Response = S::Response;
type Error = S::Error;
type Service = S;
type MakeError = M::Error;
type Future = M::Future;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::MakeError>> {
Service::poll_ready(self, cx)
}
fn make_service(&mut self, target: Target) -> Self::Future {
Service::call(self, target)
}
}
/// Service returned by [`MakeService::into_service`][into].
///
/// See the documentation on [`into_service`][into] for details.
///
/// [into]: MakeService::into_service
pub struct IntoService<M, Request> {
make: M,
_marker: PhantomData<Request>,
}
impl<M, Request> Clone for IntoService<M, Request>
where
M: Clone,
{
fn clone(&self) -> Self {
Self {
make: self.make.clone(),
_marker: PhantomData,
}
}
}
impl<M, Request> fmt::Debug for IntoService<M, Request>
where
M: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("IntoService")
.field("make", &self.make)
.finish()
}
}
impl<M, S, Target, Request> Service<Target> for IntoService<M, Request>
where
M: Service<Target, Response = S>,
S: Service<Request>,
{
type Response = M::Response;
type Error = M::Error;
type Future = M::Future;
#[inline]
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.make.poll_ready(cx)
}
#[inline]
fn call(&mut self, target: Target) -> Self::Future {
self.make.make_service(target)
}
}
/// Service returned by [`MakeService::as_service`][as].
///
/// See the documentation on [`as_service`][as] for details.
///
/// [as]: MakeService::as_service
pub struct AsService<'a, M, Request> {
make: &'a mut M,
_marker: PhantomData<Request>,
}
impl<M, Request> fmt::Debug for AsService<'_, M, Request>
where
M: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("AsService")
.field("make", &self.make)
.finish()
}
}
impl<M, S, Target, Request> Service<Target> for AsService<'_, M, Request>
where
M: Service<Target, Response = S>,
S: Service<Request>,
{
type Response = M::Response;
type Error = M::Error;
type Future = M::Future;
#[inline]
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.make.poll_ready(cx)
}
#[inline]
fn call(&mut self, target: Target) -> Self::Future {
self.make.make_service(target)
}
}

View file

@ -0,0 +1,146 @@
use std::convert::Infallible;
use std::task::{Context, Poll};
use tower_service::Service;
/// A [`MakeService`] that produces services by cloning an inner service.
///
/// [`MakeService`]: super::MakeService
///
/// # Example
///
/// ```
/// # use std::task::{Context, Poll};
/// # use std::pin::Pin;
/// # use std::convert::Infallible;
/// use tower::make::{MakeService, Shared};
/// use tower::buffer::Buffer;
/// use tower::Service;
/// use futures::future::{Ready, ready};
///
/// // An example connection type
/// struct Connection {}
///
/// // An example request type
/// struct Request {}
///
/// // An example response type
/// struct Response {}
///
/// // Some service that doesn't implement `Clone`
/// struct MyService;
///
/// impl Service<Request> for MyService {
/// type Response = Response;
/// type Error = Infallible;
/// type Future = Ready<Result<Response, Infallible>>;
///
/// fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
/// Poll::Ready(Ok(()))
/// }
///
/// fn call(&mut self, req: Request) -> Self::Future {
/// ready(Ok(Response {}))
/// }
/// }
///
/// // Example function that runs a service by accepting new connections and using
/// // `Make` to create new services that might be bound to the connection.
/// //
/// // This is similar to what you might find in hyper.
/// async fn serve_make_service<Make>(make: Make)
/// where
/// Make: MakeService<Connection, Request>
/// {
/// // ...
/// }
///
/// # async {
/// // Our service
/// let svc = MyService;
///
/// // Make it `Clone` by putting a channel in front
/// let buffered = Buffer::new(svc, 1024);
///
/// // Convert it into a `MakeService`
/// let make = Shared::new(buffered);
///
/// // Run the service and just ignore the `Connection`s as `MyService` doesn't need them
/// serve_make_service(make).await;
/// # };
/// ```
#[derive(Debug, Clone, Copy)]
pub struct Shared<S> {
service: S,
}
impl<S> Shared<S> {
/// Create a new [`Shared`] from a service.
pub fn new(service: S) -> Self {
Self { service }
}
}
impl<S, T> Service<T> for Shared<S>
where
S: Clone,
{
type Response = S;
type Error = Infallible;
type Future = SharedFuture<S>;
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, _target: T) -> Self::Future {
SharedFuture::new(futures_util::future::ready(Ok(self.service.clone())))
}
}
opaque_future! {
/// Response future from [`Shared`] services.
pub type SharedFuture<S> = futures_util::future::Ready<Result<S, Infallible>>;
}
#[cfg(test)]
mod tests {
use super::*;
use crate::make::MakeService;
use crate::service_fn;
use futures::future::poll_fn;
async fn echo<R>(req: R) -> Result<R, Infallible> {
Ok(req)
}
#[tokio::test]
async fn as_make_service() {
let mut shared = Shared::new(service_fn(echo::<&'static str>));
poll_fn(|cx| MakeService::<(), _>::poll_ready(&mut shared, cx))
.await
.unwrap();
let mut svc = shared.make_service(()).await.unwrap();
poll_fn(|cx| svc.poll_ready(cx)).await.unwrap();
let res = svc.call("foo").await.unwrap();
assert_eq!(res, "foo");
}
#[tokio::test]
async fn as_make_service_into_service() {
let shared = Shared::new(service_fn(echo::<&'static str>));
let mut shared = MakeService::<(), _>::into_service(shared);
poll_fn(|cx| Service::<()>::poll_ready(&mut shared, cx))
.await
.unwrap();
let mut svc = shared.call(()).await.unwrap();
poll_fn(|cx| svc.poll_ready(cx)).await.unwrap();
let res = svc.call("foo").await.unwrap();
assert_eq!(res, "foo");
}
}

View file

@ -0,0 +1,14 @@
//! Trait aliases for Services that produce specific types of Responses.
mod make_connection;
mod make_service;
pub use self::make_connection::MakeConnection;
pub use self::make_service::shared::Shared;
pub use self::make_service::{AsService, IntoService, MakeService};
pub mod future {
//! Future types
pub use super::make_service::shared::SharedFuture;
}