1use super::super::error;
2use crate::discover::{Change, Discover};
3use crate::load::Load;
4use crate::ready_cache::{error::Failed, ReadyCache};
5use futures_core::ready;
6use futures_util::future::{self, TryFutureExt};
7use pin_project_lite::pin_project;
8use rand::{rngs::SmallRng, Rng, SeedableRng};
9use std::hash::Hash;
10use std::marker::PhantomData;
11use std::{
12 fmt,
13 future::Future,
14 pin::Pin,
15 task::{Context, Poll},
16};
17use tokio::sync::oneshot;
18use tower_service::Service;
19use tracing::{debug, trace};
20
21pub struct Balance<D, Req>
33where
34 D: Discover,
35 D::Key: Hash,
36{
37 discover: D,
38
39 services: ReadyCache<D::Key, D::Service, Req>,
40 ready_index: Option<usize>,
41
42 rng: SmallRng,
43
44 _req: PhantomData<Req>,
45}
46
47impl<D: Discover, Req> fmt::Debug for Balance<D, Req>
48where
49 D: fmt::Debug,
50 D::Key: Hash + fmt::Debug,
51 D::Service: fmt::Debug,
52{
53 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
54 f.debug_struct("Balance")
55 .field("discover", &self.discover)
56 .field("services", &self.services)
57 .finish()
58 }
59}
60
pin_project! {
    /// Future that completes once the wrapped service reports readiness, or
    /// once a cancellation signal is received on `cancel`.
    struct UnreadyService<K, S, Req> {
        // Key identifying the service; taken out when the future completes.
        key: Option<K>,
        // Receives `()` to cancel the wait; pinned so that it can be polled.
        #[pin]
        cancel: oneshot::Receiver<()>,
        // Service being driven to readiness; taken out once `poll_ready` completes.
        service: Option<S>,

        // Marker for the request type the service accepts.
        _req: PhantomData<Req>,
    }
}
74
/// Reason an `UnreadyService` future finished without yielding a ready service.
///
/// `Debug` is derived so that results carrying this type can be logged with
/// `{:?}` and used with `Result::unwrap` / `Result::expect`.
#[derive(Debug)]
enum Error<E> {
    /// The inner service's `poll_ready` returned this error.
    Inner(E),
    /// A cancellation signal arrived before the service became ready.
    Canceled,
}
79
80impl<D, Req> Balance<D, Req>
81where
82 D: Discover,
83 D::Key: Hash,
84 D::Service: Service<Req>,
85 <D::Service as Service<Req>>::Error: Into<crate::BoxError>,
86{
87 pub fn new(discover: D) -> Self {
89 Self::from_rng(discover, &mut rand::thread_rng()).expect("ThreadRNG must be valid")
90 }
91
92 pub fn from_rng<R: Rng>(discover: D, rng: R) -> Result<Self, rand::Error> {
94 let rng = SmallRng::from_rng(rng)?;
95 Ok(Self {
96 rng,
97 discover,
98 services: ReadyCache::default(),
99 ready_index: None,
100
101 _req: PhantomData,
102 })
103 }
104
105 pub fn len(&self) -> usize {
107 self.services.len()
108 }
109
110 pub fn is_empty(&self) -> bool {
112 self.services.is_empty()
113 }
114}
115
116impl<D, Req> Balance<D, Req>
117where
118 D: Discover + Unpin,
119 D::Key: Hash + Clone,
120 D::Error: Into<crate::BoxError>,
121 D::Service: Service<Req> + Load,
122 <D::Service as Load>::Metric: std::fmt::Debug,
123 <D::Service as Service<Req>>::Error: Into<crate::BoxError>,
124{
125 fn update_pending_from_discover(
129 &mut self,
130 cx: &mut Context<'_>,
131 ) -> Poll<Option<Result<(), error::Discover>>> {
132 debug!("updating from discover");
133 loop {
134 match ready!(Pin::new(&mut self.discover).poll_discover(cx))
135 .transpose()
136 .map_err(|e| error::Discover(e.into()))?
137 {
138 None => return Poll::Ready(None),
139 Some(Change::Remove(key)) => {
140 trace!("remove");
141 self.services.evict(&key);
142 }
143 Some(Change::Insert(key, svc)) => {
144 trace!("insert");
145 self.services.push(key, svc);
148 }
149 }
150 }
151 }
152
153 fn promote_pending_to_ready(&mut self, cx: &mut Context<'_>) {
154 loop {
155 match self.services.poll_pending(cx) {
156 Poll::Ready(Ok(())) => {
157 debug_assert_eq!(self.services.pending_len(), 0);
159 break;
160 }
161 Poll::Pending => {
162 debug_assert!(self.services.pending_len() > 0);
164 break;
165 }
166 Poll::Ready(Err(error)) => {
167 debug!(%error, "dropping failed endpoint");
170 }
171 }
172 }
173 trace!(
174 ready = %self.services.ready_len(),
175 pending = %self.services.pending_len(),
176 "poll_unready"
177 );
178 }
179
180 fn p2c_ready_index(&mut self) -> Option<usize> {
182 match self.services.ready_len() {
183 0 => None,
184 1 => Some(0),
185 len => {
186 let idxs = rand::seq::index::sample(&mut self.rng, len, 2);
189
190 let aidx = idxs.index(0);
191 let bidx = idxs.index(1);
192 debug_assert_ne!(aidx, bidx, "random indices must be distinct");
193
194 let aload = self.ready_index_load(aidx);
195 let bload = self.ready_index_load(bidx);
196 let chosen = if aload <= bload { aidx } else { bidx };
197
198 trace!(
199 a.index = aidx,
200 a.load = ?aload,
201 b.index = bidx,
202 b.load = ?bload,
203 chosen = if chosen == aidx { "a" } else { "b" },
204 "p2c",
205 );
206 Some(chosen)
207 }
208 }
209 }
210
211 fn ready_index_load(&self, index: usize) -> <D::Service as Load>::Metric {
213 let (_, svc) = self.services.get_ready_index(index).expect("invalid index");
214 svc.load()
215 }
216
217 pub(crate) fn discover_mut(&mut self) -> &mut D {
218 &mut self.discover
219 }
220}
221
222impl<D, Req> Service<Req> for Balance<D, Req>
223where
224 D: Discover + Unpin,
225 D::Key: Hash + Clone,
226 D::Error: Into<crate::BoxError>,
227 D::Service: Service<Req> + Load,
228 <D::Service as Load>::Metric: std::fmt::Debug,
229 <D::Service as Service<Req>>::Error: Into<crate::BoxError>,
230{
231 type Response = <D::Service as Service<Req>>::Response;
232 type Error = crate::BoxError;
233 type Future = future::MapErr<
234 <D::Service as Service<Req>>::Future,
235 fn(<D::Service as Service<Req>>::Error) -> crate::BoxError,
236 >;
237
238 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
239 let _ = self.update_pending_from_discover(cx)?;
242 self.promote_pending_to_ready(cx);
243
244 loop {
245 if let Some(index) = self.ready_index.take() {
252 match self.services.check_ready_index(cx, index) {
253 Ok(true) => {
254 self.ready_index = Some(index);
256 return Poll::Ready(Ok(()));
257 }
258 Ok(false) => {
259 trace!("ready service became unavailable");
261 }
262 Err(Failed(_, error)) => {
263 debug!(%error, "endpoint failed");
266 }
267 }
268 }
269
270 self.ready_index = self.p2c_ready_index();
273 if self.ready_index.is_none() {
274 debug_assert_eq!(self.services.ready_len(), 0);
275 return Poll::Pending;
278 }
279 }
280 }
281
282 fn call(&mut self, request: Req) -> Self::Future {
283 let index = self.ready_index.take().expect("called before ready");
284 self.services
285 .call_ready_index(index, request)
286 .map_err(Into::into)
287 }
288}
289
290impl<K, S: Service<Req>, Req> Future for UnreadyService<K, S, Req> {
291 type Output = Result<(K, S), (K, Error<S::Error>)>;
292
293 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
294 let this = self.project();
295
296 if let Poll::Ready(Ok(())) = this.cancel.poll(cx) {
297 let key = this.key.take().expect("polled after ready");
298 return Poll::Ready(Err((key, Error::Canceled)));
299 }
300
301 let res = ready!(this
302 .service
303 .as_mut()
304 .expect("poll after ready")
305 .poll_ready(cx));
306
307 let key = this.key.take().expect("polled after ready");
308 let svc = this.service.take().expect("polled after ready");
309
310 match res {
311 Ok(()) => Poll::Ready(Ok((key, svc))),
312 Err(e) => Poll::Ready(Err((key, Error::Inner(e)))),
313 }
314 }
315}
316
317impl<K, S, Req> fmt::Debug for UnreadyService<K, S, Req>
318where
319 K: fmt::Debug,
320 S: fmt::Debug,
321{
322 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
323 let Self {
324 key,
325 cancel,
326 service,
327 _req,
328 } = self;
329 f.debug_struct("UnreadyService")
330 .field("key", key)
331 .field("cancel", cancel)
332 .field("service", service)
333 .finish()
334 }
335}