use crate::container::{Container, PushContainer, PushInto};
use crate::dataflow::channels::pact::Pipeline;
use crate::dataflow::{Scope, StreamCore};
use crate::dataflow::operators::generic::operator::Operator;
pub trait Filter<C: Container> {
fn filter<P: FnMut(&C::Item<'_>)->bool+'static>(&self, predicate: P) -> Self;
}
impl<G: Scope, C: PushContainer> Filter<C> for StreamCore<G, C>
where
for<'a> C::Item<'a>: PushInto<C>
{
fn filter<P: FnMut(&C::Item<'_>)->bool+'static>(&self, mut predicate: P) -> StreamCore<G, C> {
let mut container = Default::default();
self.unary(Pipeline, "Filter", move |_,_| move |input, output| {
input.for_each(|time, data| {
data.swap(&mut container);
if !container.is_empty() {
output.session(&time).give_iterator(container.drain().filter(&mut predicate));
}
});
})
}
}