tower/balance/p2c/
service.rs

1use super::super::error;
2use crate::discover::{Change, Discover};
3use crate::load::Load;
4use crate::ready_cache::{error::Failed, ReadyCache};
5use futures_core::ready;
6use futures_util::future::{self, TryFutureExt};
7use pin_project_lite::pin_project;
8use rand::{rngs::SmallRng, Rng, SeedableRng};
9use std::hash::Hash;
10use std::marker::PhantomData;
11use std::{
12    fmt,
13    future::Future,
14    pin::Pin,
15    task::{Context, Poll},
16};
17use tokio::sync::oneshot;
18use tower_service::Service;
19use tracing::{debug, trace};
20
21/// Efficiently distributes requests across an arbitrary number of services.
22///
23/// See the [module-level documentation](..) for details.
24///
25/// Note that [`Balance`] requires that the [`Discover`] you use is [`Unpin`] in order to implement
26/// [`Service`]. This is because it needs to be accessed from [`Service::poll_ready`], which takes
27/// `&mut self`. You can achieve this easily by wrapping your [`Discover`] in [`Box::pin`] before you
28/// construct the [`Balance`] instance. For more details, see [#319].
29///
30/// [`Box::pin`]: std::boxed::Box::pin()
31/// [#319]: https://github.com/tower-rs/tower/issues/319
32pub struct Balance<D, Req>
33where
34    D: Discover,
35    D::Key: Hash,
36{
37    discover: D,
38
39    services: ReadyCache<D::Key, D::Service, Req>,
40    ready_index: Option<usize>,
41
42    rng: SmallRng,
43
44    _req: PhantomData<Req>,
45}
46
47impl<D: Discover, Req> fmt::Debug for Balance<D, Req>
48where
49    D: fmt::Debug,
50    D::Key: Hash + fmt::Debug,
51    D::Service: fmt::Debug,
52{
53    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
54        f.debug_struct("Balance")
55            .field("discover", &self.discover)
56            .field("services", &self.services)
57            .finish()
58    }
59}
60
61pin_project! {
62    /// A Future that becomes satisfied when an `S`-typed service is ready.
63    ///
64    /// May fail due to cancelation, i.e., if [`Discover`] removes the service from the service set.
65    struct UnreadyService<K, S, Req> {
66        key: Option<K>,
67        #[pin]
68        cancel: oneshot::Receiver<()>,
69        service: Option<S>,
70
71        _req: PhantomData<Req>,
72    }
73}
74
75enum Error<E> {
76    Inner(E),
77    Canceled,
78}
79
80impl<D, Req> Balance<D, Req>
81where
82    D: Discover,
83    D::Key: Hash,
84    D::Service: Service<Req>,
85    <D::Service as Service<Req>>::Error: Into<crate::BoxError>,
86{
87    /// Constructs a load balancer that uses operating system entropy.
88    pub fn new(discover: D) -> Self {
89        Self::from_rng(discover, &mut rand::thread_rng()).expect("ThreadRNG must be valid")
90    }
91
92    /// Constructs a load balancer seeded with the provided random number generator.
93    pub fn from_rng<R: Rng>(discover: D, rng: R) -> Result<Self, rand::Error> {
94        let rng = SmallRng::from_rng(rng)?;
95        Ok(Self {
96            rng,
97            discover,
98            services: ReadyCache::default(),
99            ready_index: None,
100
101            _req: PhantomData,
102        })
103    }
104
105    /// Returns the number of endpoints currently tracked by the balancer.
106    pub fn len(&self) -> usize {
107        self.services.len()
108    }
109
110    /// Returns whether or not the balancer is empty.
111    pub fn is_empty(&self) -> bool {
112        self.services.is_empty()
113    }
114}
115
116impl<D, Req> Balance<D, Req>
117where
118    D: Discover + Unpin,
119    D::Key: Hash + Clone,
120    D::Error: Into<crate::BoxError>,
121    D::Service: Service<Req> + Load,
122    <D::Service as Load>::Metric: std::fmt::Debug,
123    <D::Service as Service<Req>>::Error: Into<crate::BoxError>,
124{
125    /// Polls `discover` for updates, adding new items to `not_ready`.
126    ///
127    /// Removals may alter the order of either `ready` or `not_ready`.
128    fn update_pending_from_discover(
129        &mut self,
130        cx: &mut Context<'_>,
131    ) -> Poll<Option<Result<(), error::Discover>>> {
132        debug!("updating from discover");
133        loop {
134            match ready!(Pin::new(&mut self.discover).poll_discover(cx))
135                .transpose()
136                .map_err(|e| error::Discover(e.into()))?
137            {
138                None => return Poll::Ready(None),
139                Some(Change::Remove(key)) => {
140                    trace!("remove");
141                    self.services.evict(&key);
142                }
143                Some(Change::Insert(key, svc)) => {
144                    trace!("insert");
145                    // If this service already existed in the set, it will be
146                    // replaced as the new one becomes ready.
147                    self.services.push(key, svc);
148                }
149            }
150        }
151    }
152
153    fn promote_pending_to_ready(&mut self, cx: &mut Context<'_>) {
154        loop {
155            match self.services.poll_pending(cx) {
156                Poll::Ready(Ok(())) => {
157                    // There are no remaining pending services.
158                    debug_assert_eq!(self.services.pending_len(), 0);
159                    break;
160                }
161                Poll::Pending => {
162                    // None of the pending services are ready.
163                    debug_assert!(self.services.pending_len() > 0);
164                    break;
165                }
166                Poll::Ready(Err(error)) => {
167                    // An individual service was lost; continue processing
168                    // pending services.
169                    debug!(%error, "dropping failed endpoint");
170                }
171            }
172        }
173        trace!(
174            ready = %self.services.ready_len(),
175            pending = %self.services.pending_len(),
176            "poll_unready"
177        );
178    }
179
180    /// Performs P2C on inner services to find a suitable endpoint.
181    fn p2c_ready_index(&mut self) -> Option<usize> {
182        match self.services.ready_len() {
183            0 => None,
184            1 => Some(0),
185            len => {
186                // Get two distinct random indexes (in a random order) and
187                // compare the loads of the service at each index.
188                let idxs = rand::seq::index::sample(&mut self.rng, len, 2);
189
190                let aidx = idxs.index(0);
191                let bidx = idxs.index(1);
192                debug_assert_ne!(aidx, bidx, "random indices must be distinct");
193
194                let aload = self.ready_index_load(aidx);
195                let bload = self.ready_index_load(bidx);
196                let chosen = if aload <= bload { aidx } else { bidx };
197
198                trace!(
199                    a.index = aidx,
200                    a.load = ?aload,
201                    b.index = bidx,
202                    b.load = ?bload,
203                    chosen = if chosen == aidx { "a" } else { "b" },
204                    "p2c",
205                );
206                Some(chosen)
207            }
208        }
209    }
210
211    /// Accesses a ready endpoint by index and returns its current load.
212    fn ready_index_load(&self, index: usize) -> <D::Service as Load>::Metric {
213        let (_, svc) = self.services.get_ready_index(index).expect("invalid index");
214        svc.load()
215    }
216
217    pub(crate) fn discover_mut(&mut self) -> &mut D {
218        &mut self.discover
219    }
220}
221
222impl<D, Req> Service<Req> for Balance<D, Req>
223where
224    D: Discover + Unpin,
225    D::Key: Hash + Clone,
226    D::Error: Into<crate::BoxError>,
227    D::Service: Service<Req> + Load,
228    <D::Service as Load>::Metric: std::fmt::Debug,
229    <D::Service as Service<Req>>::Error: Into<crate::BoxError>,
230{
231    type Response = <D::Service as Service<Req>>::Response;
232    type Error = crate::BoxError;
233    type Future = future::MapErr<
234        <D::Service as Service<Req>>::Future,
235        fn(<D::Service as Service<Req>>::Error) -> crate::BoxError,
236    >;
237
238    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
239        // `ready_index` may have already been set by a prior invocation. These
240        // updates cannot disturb the order of existing ready services.
241        let _ = self.update_pending_from_discover(cx)?;
242        self.promote_pending_to_ready(cx);
243
244        loop {
245            // If a service has already been selected, ensure that it is ready.
246            // This ensures that the underlying service is ready immediately
247            // before a request is dispatched to it (i.e. in the same task
248            // invocation). If, e.g., a failure detector has changed the state
249            // of the service, it may be evicted from the ready set so that
250            // another service can be selected.
251            if let Some(index) = self.ready_index.take() {
252                match self.services.check_ready_index(cx, index) {
253                    Ok(true) => {
254                        // The service remains ready.
255                        self.ready_index = Some(index);
256                        return Poll::Ready(Ok(()));
257                    }
258                    Ok(false) => {
259                        // The service is no longer ready. Try to find a new one.
260                        trace!("ready service became unavailable");
261                    }
262                    Err(Failed(_, error)) => {
263                        // The ready endpoint failed, so log the error and try
264                        // to find a new one.
265                        debug!(%error, "endpoint failed");
266                    }
267                }
268            }
269
270            // Select a new service by comparing two at random and using the
271            // lesser-loaded service.
272            self.ready_index = self.p2c_ready_index();
273            if self.ready_index.is_none() {
274                debug_assert_eq!(self.services.ready_len(), 0);
275                // We have previously registered interest in updates from
276                // discover and pending services.
277                return Poll::Pending;
278            }
279        }
280    }
281
282    fn call(&mut self, request: Req) -> Self::Future {
283        let index = self.ready_index.take().expect("called before ready");
284        self.services
285            .call_ready_index(index, request)
286            .map_err(Into::into)
287    }
288}
289
290impl<K, S: Service<Req>, Req> Future for UnreadyService<K, S, Req> {
291    type Output = Result<(K, S), (K, Error<S::Error>)>;
292
293    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
294        let this = self.project();
295
296        if let Poll::Ready(Ok(())) = this.cancel.poll(cx) {
297            let key = this.key.take().expect("polled after ready");
298            return Poll::Ready(Err((key, Error::Canceled)));
299        }
300
301        let res = ready!(this
302            .service
303            .as_mut()
304            .expect("poll after ready")
305            .poll_ready(cx));
306
307        let key = this.key.take().expect("polled after ready");
308        let svc = this.service.take().expect("polled after ready");
309
310        match res {
311            Ok(()) => Poll::Ready(Ok((key, svc))),
312            Err(e) => Poll::Ready(Err((key, Error::Inner(e)))),
313        }
314    }
315}
316
317impl<K, S, Req> fmt::Debug for UnreadyService<K, S, Req>
318where
319    K: fmt::Debug,
320    S: fmt::Debug,
321{
322    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
323        let Self {
324            key,
325            cancel,
326            service,
327            _req,
328        } = self;
329        f.debug_struct("UnreadyService")
330            .field("key", key)
331            .field("cancel", cancel)
332            .field("service", service)
333            .finish()
334    }
335}