mod backoff_reset_timer;
pub(crate) mod delayed_init;
mod event_flatten;
mod event_modify;
#[cfg(feature = "unstable-runtime-predicates")] mod predicate;
mod reflect;
mod stream_backoff;
mod watch_ext;
pub use backoff_reset_timer::ResetTimerBackoff;
pub use event_flatten::EventFlatten;
pub use event_modify::EventModify;
#[cfg(feature = "unstable-runtime-predicates")]
pub use predicate::{predicates, Predicate, PredicateFilter};
pub use reflect::Reflect;
pub use stream_backoff::StreamBackoff;
pub use watch_ext::WatchStreamExt;
use futures::{
stream::{self, Peekable},
Future, FutureExt, Stream, StreamExt, TryStream, TryStreamExt,
};
use pin_project::pin_project;
use std::{
fmt::Debug,
pin::{pin, Pin},
sync::{Arc, Mutex},
task::Poll,
};
use stream::IntoStream;
use tokio::{runtime::Handle, task::JoinHandle};
#[pin_project]
pub(crate) struct SplitCase<S: Stream, Case> {
inner: Arc<Mutex<Peekable<S>>>,
should_consume_item: fn(&S::Item) -> bool,
try_extract_item_case: fn(S::Item) -> Option<Case>,
}
impl<S, Case> Stream for SplitCase<S, Case>
where
S: Stream + Unpin,
S::Item: Debug,
{
type Item = Case;
#[allow(clippy::mut_mutex_lock)]
fn poll_next(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let this = self.project();
let inner = this.inner.lock().unwrap();
let mut inner = Pin::new(inner);
let inner_peek = pin!(inner.as_mut().peek());
match inner_peek.poll(cx) {
Poll::Ready(Some(x_ref)) => {
if (this.should_consume_item)(x_ref) {
let item = inner.as_mut().poll_next(cx);
match item {
Poll::Ready(Some(x)) => Poll::Ready(Some((this.try_extract_item_case)(x).expect(
"`try_extract_item_case` returned `None` despite `should_consume_item` returning `true`",
))),
res => panic!(
"Peekable::poll_next() returned {res:?} when Peekable::peek() returned Ready(Some(_))"
),
}
} else {
Poll::Pending
}
}
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
}
}
}
#[allow(clippy::type_complexity, clippy::arc_with_non_send_sync)]
fn trystream_split_result<S>(
stream: S,
) -> (
SplitCase<IntoStream<S>, S::Ok>,
SplitCase<IntoStream<S>, S::Error>,
)
where
S: TryStream + Unpin,
S::Ok: Debug,
S::Error: Debug,
{
let stream = Arc::new(Mutex::new(stream.into_stream().peekable()));
(
SplitCase {
inner: stream.clone(),
should_consume_item: Result::is_ok,
try_extract_item_case: Result::ok,
},
SplitCase {
inner: stream,
should_consume_item: Result::is_err,
try_extract_item_case: Result::err,
},
)
}
pub(crate) fn trystream_try_via<S1, S2>(
input_stream: S1,
make_via_stream: impl FnOnce(SplitCase<IntoStream<S1>, S1::Ok>) -> S2,
) -> impl Stream<Item = Result<S2::Ok, S1::Error>>
where
S1: TryStream + Unpin,
S2: TryStream<Error = S1::Error>,
S1::Ok: Debug,
S1::Error: Debug,
{
let (oks, errs) = trystream_split_result(input_stream); let via = make_via_stream(oks); stream::select(via.into_stream(), errs.map(Err)) }
pub struct CancelableJoinHandle<T> {
inner: JoinHandle<T>,
}
impl<T> CancelableJoinHandle<T>
where
T: Send + 'static,
{
pub fn spawn(future: impl Future<Output = T> + Send + 'static, runtime: &Handle) -> Self {
CancelableJoinHandle {
inner: runtime.spawn(future),
}
}
}
impl<T> Drop for CancelableJoinHandle<T> {
fn drop(&mut self) {
self.inner.abort()
}
}
impl<T> Future for CancelableJoinHandle<T> {
type Output = T;
fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
self.inner.poll_unpin(cx).map(
Result::unwrap,
)
}
}
#[pin_project]
pub(crate) struct OnComplete<S, F> {
#[pin]
stream: stream::Fuse<S>,
#[pin]
on_complete: F,
}
impl<S: Stream, F: Future<Output = ()>> Stream for OnComplete<S, F> {
type Item = S::Item;
fn poll_next(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
match this.stream.poll_next(cx) {
Poll::Ready(None) => match this.on_complete.poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(()) => Poll::Ready(None),
},
x => x,
}
}
}
pub(crate) trait KubeRuntimeStreamExt: Stream + Sized {
fn on_complete<F: Future<Output = ()>>(self, on_complete: F) -> OnComplete<Self, F> {
OnComplete {
stream: self.fuse(),
on_complete,
}
}
}
impl<S: Stream> KubeRuntimeStreamExt for S {}
#[cfg(test)]
mod tests {
use std::convert::Infallible;
use futures::stream::{self, StreamExt};
use super::trystream_try_via;
#[allow(dead_code)]
fn trystream_try_via_should_be_able_to_borrow() {
struct WeirdComplexObject {}
impl Drop for WeirdComplexObject {
fn drop(&mut self) {}
}
let mut x = WeirdComplexObject {};
let y = WeirdComplexObject {};
drop(trystream_try_via(
Box::pin(stream::once(async {
let _ = &mut x;
Result::<_, Infallible>::Ok(())
})),
|s| {
s.map(|()| {
let _ = &y;
Ok(())
})
},
));
}
}