//! Extension trait and implementation for observing and acting on streamed data.
use crate::{Container, Data};
use crate::dataflow::channels::pact::Pipeline;
use crate::dataflow::{Scope, StreamCore};
use crate::dataflow::operators::generic::Operator;
/// Extension methods for observing the records, batches, and frontier changes of a stream.
///
/// Each method takes a closure to run on what is observed and returns `Self`.
/// The provided methods are all layered on top of [`Inspect::inspect_core`],
/// which is the only method an implementor has to supply.
pub trait Inspect<G: Scope, C: Container>: InspectCore<G, C> + Sized {
    /// Invokes `func` on every individual item of every observed container.
    ///
    /// The timestamp of the containing batch is not passed to `func`.
    ///
    /// # Examples
    /// ```
    /// use timely::dataflow::operators::{ToStream, Map, Inspect};
    ///
    /// timely::example(|scope| {
    ///     (0..10).to_stream(scope)
    ///            .inspect(|x| println!("seen: {:?}", x));
    /// });
    /// ```
    fn inspect<F>(&self, mut func: F) -> Self
    where
        F: for<'a> FnMut(C::ItemRef<'a>) + 'static,
    {
        self.inspect_batch(move |_time, container| {
            for item in container.iter() {
                func(item);
            }
        })
    }

    /// Invokes `func` on every individual item, together with the timestamp of its batch.
    ///
    /// # Examples
    /// ```
    /// use timely::dataflow::operators::{ToStream, Map, Inspect};
    ///
    /// timely::example(|scope| {
    ///     (0..10).to_stream(scope)
    ///            .inspect_time(|t, x| println!("seen at: {:?}\t{:?}", t, x));
    /// });
    /// ```
    fn inspect_time<F>(&self, mut func: F) -> Self
    where
        F: for<'a> FnMut(&G::Timestamp, C::ItemRef<'a>) + 'static,
    {
        self.inspect_batch(move |timestamp, container| {
            for item in container.iter() {
                func(timestamp, item);
            }
        })
    }

    /// Invokes `func` once per observed batch, passing its timestamp and the whole container.
    ///
    /// Frontier events delivered by [`Inspect::inspect_core`] are ignored here.
    ///
    /// # Examples
    /// ```
    /// use timely::dataflow::operators::{ToStream, Map, Inspect};
    ///
    /// timely::example(|scope| {
    ///     (0..10).to_stream(scope)
    ///            .inspect_batch(|t,xs| println!("seen at: {:?}\t{:?} records", t, xs.len()));
    /// });
    /// ```
    fn inspect_batch(&self, mut func: impl FnMut(&G::Timestamp, &C)+'static) -> Self {
        self.inspect_core(move |event| match event {
            Ok((timestamp, container)) => func(timestamp, container),
            // Frontier changes are of no interest to batch-level inspection.
            Err(_frontier) => {}
        })
    }

    /// Invokes `func` on each observed batch and on each frontier advancement.
    ///
    /// The two kinds of event are told apart with Rust's `Result` type: `Ok` carries
    /// a timestamp and its container, `Err` carries a frontier. A frontier is only
    /// presented when it has changed.
    ///
    /// # Examples
    /// ```
    /// use timely::dataflow::operators::{ToStream, Map, Inspect};
    ///
    /// timely::example(|scope| {
    ///     (0..10).to_stream(scope)
    ///            .inspect_core(|event| {
    ///                match event {
    ///                    Ok((time, data)) => println!("seen at: {:?}\t{:?} records", time, data.len()),
    ///                    Err(frontier) => println!("frontier advanced to {:?}", frontier),
    ///                }
    ///             });
    /// });
    /// ```
    fn inspect_core<F>(&self, func: F) -> Self
    where
        F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>) + 'static;
}
impl<G: Scope, C: Container + Data> Inspect<G, C> for StreamCore<G, C> {
fn inspect_core<F>(&self, func: F) -> Self where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>) + 'static {
self.inspect_container(func)
}
}
/// Container-level inspection of a stream.
pub trait InspectCore<G: Scope, C: Container> {
    /// Invokes `func` on each observed container and on each frontier advancement.
    ///
    /// The two kinds of event are told apart with Rust's `Result` type: `Ok` carries
    /// a timestamp and its container, `Err` carries a frontier. A frontier is only
    /// presented when it has changed.
    ///
    /// # Examples
    /// ```
    /// use timely::dataflow::operators::{ToStream, Map, InspectCore};
    ///
    /// timely::example(|scope| {
    ///     (0..10).to_stream(scope)
    ///            .inspect_container(|event| {
    ///                match event {
    ///                    Ok((time, data)) => println!("seen at: {:?}\t{:?} records", time, data.len()),
    ///                    Err(frontier) => println!("frontier advanced to {:?}", frontier),
    ///                }
    ///             });
    /// });
    /// ```
    fn inspect_container<F>(&self, func: F) -> StreamCore<G, C>
    where
        F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>) + 'static;
}
impl<G: Scope, C: Container + Data> InspectCore<G, C> for StreamCore<G, C> {
fn inspect_container<F>(&self, mut func: F) -> StreamCore<G, C>
where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>)+'static
{
use crate::progress::timestamp::Timestamp;
let mut frontier = crate::progress::Antichain::from_elem(G::Timestamp::minimum());
self.unary_frontier(Pipeline, "InspectBatch", move |_,_| move |input, output| {
if input.frontier.frontier() != frontier.borrow() {
frontier.clear();
frontier.extend(input.frontier.frontier().iter().cloned());
func(Err(frontier.elements()));
}
input.for_each(|time, data| {
func(Ok((&time, &*data)));
output.session(&time).give_container(data);
});
})
}
}