Type Alias Stream

pub type Stream<S, D> = StreamCore<S, Vec<D>>;

A stream batching data in vectors.

Aliased Type

struct Stream<S, D> { /* private fields */ }

Implementations

impl<S: Scope, C: Container> StreamCore<S, C>

pub fn connect_to<P: Push<Message<S::Timestamp, C>> + 'static>( &self, target: Target, pusher: P, identifier: usize, )

Connects the stream to a destination.

The destination is described both by a Target, for progress tracking information, and a P: Push where the records should actually be sent. The identifier is unique to the edge and is used only for logging purposes.

pub fn new(source: Source, output: TeeHelper<S::Timestamp, C>, scope: S) -> Self

Allocates a Stream from a supplied Source name and rendezvous point.

pub fn name(&self) -> &Source

The name of the stream’s source operator.

pub fn scope(&self) -> S

The scope immediately containing the stream.

pub fn container<D: Container>(self) -> StreamCore<S, D>
where Self: AsStream<S, D>,

Allows the assertion of a container type, for the benefit of type inference.

Trait Implementations

impl<G: Scope, D: Data> Accumulate<G, D> for Stream<G, D>

fn accumulate<A: Data>( &self, default: A, logic: impl Fn(&mut A, &mut Vec<D>) + 'static, ) -> Stream<G, A>

Accumulates records within a timestamp.

fn count(&self) -> Stream<G, usize>

Counts the number of records observed at each time.
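To make the per-timestamp behavior concrete, here is a minimal plain-Rust model; it is not timely code. It assumes input arrives as (time, batch) pairs and that logic is called once per batch. The name accumulate_model and its argument shapes are hypothetical, and a BTreeMap is used only so the result order is deterministic.

```rust
use std::collections::BTreeMap;

/// Hypothetical model of `accumulate`: one accumulator per timestamp,
/// seeded from `default` and updated by `logic` for every batch at that time.
fn accumulate_model<T: Ord, D, A: Clone>(
    batches: Vec<(T, Vec<D>)>,
    default: A,
    logic: impl Fn(&mut A, &mut Vec<D>),
) -> Vec<(T, A)> {
    let mut accums: BTreeMap<T, A> = BTreeMap::new();
    for (time, mut batch) in batches {
        // Create the accumulator on first sight of this time, then fold the batch in.
        let acc = accums.entry(time).or_insert_with(|| default.clone());
        logic(acc, &mut batch);
    }
    // One (time, accumulator) pair per distinct timestamp, in time order.
    accums.into_iter().collect()
}
```

In this model, count corresponds to an accumulator that starts at 0 and adds batch.len() for every batch.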

impl<S: Scope, K: ExchangeData + Hash + Eq, V: ExchangeData> Aggregate<S, K, V> for Stream<S, (K, V)>

fn aggregate<R: Data, D: Default + 'static, F: Fn(&K, V, &mut D) + 'static, E: Fn(K, D) -> R + 'static, H: Fn(&K) -> u64 + 'static>( &self, fold: F, emit: E, hash: H, ) -> Stream<S, R>
where S::Timestamp: Eq,

Aggregates data of the form (key, val), using user-supplied logic.
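As a rough illustration of how fold and emit cooperate, the sketch below models a single timestamp on a single worker in plain Rust. It is not timely code: aggregate_model is a hypothetical name, the hash closure is left out because the model has only one worker, and a BTreeMap is used only to keep the output order deterministic.

```rust
use std::collections::BTreeMap;

/// Hypothetical single-worker model of `aggregate` for one timestamp:
/// `fold` merges each value into per-key state, and `emit` turns the final
/// (key, state) pair into an output record.
fn aggregate_model<K, V, D, R>(
    records: Vec<(K, V)>,
    fold: impl Fn(&K, V, &mut D),
    emit: impl Fn(K, D) -> R,
) -> Vec<R>
where
    K: Ord,
    D: Default,
{
    let mut state: BTreeMap<K, D> = BTreeMap::new();
    for (key, val) in records {
        // Take the existing state for this key (or a default), fold, and put it back.
        let mut agg = state.remove(&key).unwrap_or_default();
        fold(&key, val, &mut agg);
        state.insert(key, agg);
    }
    // Once all records for the timestamp are folded, emit one record per key.
    state.into_iter().map(|(k, d)| emit(k, d)).collect()
}
```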

impl<S: Scope, D: Data> Branch<S, D> for Stream<S, D>

fn branch( &self, condition: impl Fn(&S::Timestamp, &D) -> bool + 'static, ) -> (Stream<S, D>, Stream<S, D>)

Takes one input stream and splits it into two output streams. For each record, the supplied closure is called with references to the record's time and data. If it returns true, the record is sent to the second returned stream; otherwise it is sent to the first.
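Because the ordering of the returned pair is easy to get backwards, here is a small plain-Rust model of the routing rule. It is not timely code; branch_model is a hypothetical name and the input is assumed to be a list of (time, record) pairs.

```rust
/// Hypothetical model of `branch`. Returns (first, second): records for which
/// `condition` is true go to `second`, all others go to `first`.
fn branch_model<T, D>(
    input: Vec<(T, D)>,
    condition: impl Fn(&T, &D) -> bool,
) -> (Vec<(T, D)>, Vec<(T, D)>) {
    let mut first = Vec::new();
    let mut second = Vec::new();
    for (time, data) in input {
        if condition(&time, &data) {
            second.push((time, data));
        } else {
            first.push((time, data));
        }
    }
    (first, second)
}
```

With a condition that tests for even values, the even records end up in the second element of the tuple and the odd ones in the first.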

impl<G: Scope, D: ExchangeData> Broadcast<D> for Stream<G, D>

fn broadcast(&self) -> Stream<G, D>

Broadcasts records to all workers.

impl<G: Scope, D: Data> Delay<G, D> for Stream<G, D>

fn delay<L: FnMut(&D, &G::Timestamp) -> G::Timestamp + 'static>( &self, func: L, ) -> Self

Advances the timestamp of records using a supplied function.

fn delay_total<L: FnMut(&D, &G::Timestamp) -> G::Timestamp + 'static>( &self, func: L, ) -> Self

Advances the timestamp of records using a supplied function.

fn delay_batch<L: FnMut(&G::Timestamp) -> G::Timestamp + 'static>( &self, func: L, ) -> Self

Advances the timestamp of batches of records using a supplied function.
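The following plain-Rust sketch models what "advances" means for the per-record delay. It is not timely code: delay_model is a hypothetical name, and the assertion encodes the assumption that a new timestamp may never be earlier than the original one.

```rust
/// Hypothetical model of `delay`: each record is re-stamped with the time
/// returned by `func`, which receives the record and its current time.
fn delay_model<T: PartialOrd, D>(
    input: Vec<(T, D)>,
    mut func: impl FnMut(&D, &T) -> T,
) -> Vec<(T, D)> {
    input
        .into_iter()
        .map(|(time, data)| {
            let new_time = func(&data, &time);
            // Assumption of this model: timestamps only move forward.
            assert!(new_time >= time, "delay may only advance timestamps");
            (new_time, data)
        })
        .collect()
}
```

A delay_batch model would look the same, except that the function would see only the timestamp and be applied once per batch.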

impl<G: Scope, D: Data> Filter<D> for Stream<G, D>

fn filter<P: FnMut(&D) -> bool + 'static>(&self, predicate: P) -> Stream<G, D>

Returns a new instance of self containing only records satisfying predicate.

impl<S: Scope, D: Data> Map<S, D> for Stream<S, D>

fn map_in_place<L: FnMut(&mut D) + 'static>(&self, logic: L) -> Stream<S, D>

Updates each element of the stream and yields the element, re-using memory where possible.

fn flat_map<I: IntoIterator, L: FnMut(D) -> I + 'static>( &self, logic: L, ) -> Stream<S, I::Item>
where I::Item: Data,

Consumes each element of the stream and yields some number of new elements.

fn map<D2: Data, L: FnMut(D) -> D2 + 'static>(&self, logic: L) -> Stream<S, D2>

Consumes each element of the stream and yields a new element.

impl<G: Scope, D: Data, D2: Data, F: Fn(D) -> (u64, D2) + 'static> Partition<G, D, D2, F> for Stream<G, D>

fn partition(&self, parts: u64, route: F) -> Vec<Stream<G, D2>>

Produces parts output streams, containing records produced and assigned by route.
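A plain-Rust model may help show how route both transforms a record and picks its output. This is a sketch, not timely code: partition_model is a hypothetical name, and it simply panics if route returns an index outside 0..parts.

```rust
/// Hypothetical model of `partition`: `route` maps each record to
/// (output index, output record); the result holds one Vec per output.
fn partition_model<D, D2>(
    input: Vec<D>,
    parts: u64,
    mut route: impl FnMut(D) -> (u64, D2),
) -> Vec<Vec<D2>> {
    let mut outputs: Vec<Vec<D2>> = (0..parts).map(|_| Vec::new()).collect();
    for record in input {
        let (part, out) = route(record);
        // Indexing panics for an out-of-range part in this model.
        outputs[part as usize].push(out);
    }
    outputs
}
```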

impl<S: Scope, T: Data, E: Data> ResultStream<S, T, E> for Stream<S, Result<T, E>>

fn ok(&self) -> Stream<S, T>

Returns a new instance of self containing only Ok records.

fn err(&self) -> Stream<S, E>

Returns a new instance of self containing only Err records.

fn map_ok<T2: Data, L: FnMut(T) -> T2 + 'static>( &self, logic: L, ) -> Stream<S, Result<T2, E>>

Returns a new instance of self applying logic on all Ok records.

fn map_err<E2: Data, L: FnMut(E) -> E2 + 'static>( &self, logic: L, ) -> Stream<S, Result<T, E2>>

Returns a new instance of self applying logic on all Err records.

fn and_then<T2: Data, L: FnMut(T) -> Result<T2, E> + 'static>( &self, logic: L, ) -> Stream<S, Result<T2, E>>

Returns a new instance of self applying logic on all Ok records, passing Err records through unchanged.

fn unwrap_or_else<L: FnMut(E) -> T + 'static>(&self, logic: L) -> Stream<S, T>

Returns a new instance of self applying logic on all Err records, passing the values of Ok records through.
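The ResultStream methods mirror the like-named methods on Rust's Result, applied record by record. The sketch below models four of them over a plain Vec; it is not timely code, and the *_model function names are hypothetical.

```rust
/// Hypothetical model of `ok`: keep the payloads of `Ok` records.
fn ok_model<T, E>(input: Vec<Result<T, E>>) -> Vec<T> {
    input.into_iter().filter_map(Result::ok).collect()
}

/// Hypothetical model of `err`: keep the payloads of `Err` records.
fn err_model<T, E>(input: Vec<Result<T, E>>) -> Vec<E> {
    input.into_iter().filter_map(Result::err).collect()
}

/// Hypothetical model of `and_then`: apply `logic` to `Ok` payloads,
/// leave `Err` records untouched.
fn and_then_model<T, T2, E>(
    input: Vec<Result<T, E>>,
    mut logic: impl FnMut(T) -> Result<T2, E>,
) -> Vec<Result<T2, E>> {
    input.into_iter().map(|r| r.and_then(&mut logic)).collect()
}

/// Hypothetical model of `unwrap_or_else`: `Ok` payloads pass through,
/// `Err` payloads are converted by `logic`.
fn unwrap_or_else_model<T, E>(
    input: Vec<Result<T, E>>,
    mut logic: impl FnMut(E) -> T,
) -> Vec<T> {
    input.into_iter().map(|r| r.unwrap_or_else(&mut logic)).collect()
}
```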

impl<S: Scope, K: ExchangeData + Hash + Eq, V: ExchangeData> StateMachine<S, K, V> for Stream<S, (K, V)>

fn state_machine<R: Data, D: Default + 'static, I: IntoIterator<Item = R>, F: Fn(&K, V, &mut D) -> (bool, I) + 'static, H: Fn(&K) -> u64 + 'static>( &self, fold: F, hash: H, ) -> Stream<S, R>
where S::Timestamp: Hash + Eq,

Tracks a state for each presented key, using user-supplied state transition logic.
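The fold closure returns a (bool, I) pair whose meaning is not spelled out in the summary above. The plain-Rust sketch below models one plausible reading, stated here as an assumption: the iterator carries records to emit, and a true flag means the state for that key is discarded. It is not timely code, state_machine_model is a hypothetical name, and the hash closure is omitted because the model has a single worker.

```rust
use std::collections::HashMap;
use std::hash::Hash;

/// Hypothetical single-worker model of `state_machine`.
fn state_machine_model<K, V, D, R, I>(
    input: Vec<(K, V)>,
    fold: impl Fn(&K, V, &mut D) -> (bool, I),
) -> Vec<R>
where
    K: Hash + Eq,
    D: Default,
    I: IntoIterator<Item = R>,
{
    let mut states: HashMap<K, D> = HashMap::new();
    let mut output = Vec::new();
    for (key, val) in input {
        // Fetch the key's state, or start from the default.
        let mut state = states.remove(&key).unwrap_or_default();
        let (discard, emitted) = fold(&key, val, &mut state);
        output.extend(emitted);
        // Assumption: `true` means the state for this key is dropped.
        if !discard {
            states.insert(key, state);
        }
    }
    output
}
```

For example, a fold that keeps a running sum per key and emits and resets once the sum reaches a threshold produces one output record per threshold crossing.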

impl<S: Scope, C> AsStream<S, C> for StreamCore<S, C>

fn as_stream(self) -> Self

Translates self to a StreamCore.

impl<S: Scope, C: Container + Data> BranchWhen<<S as ScopeParent>::Timestamp> for StreamCore<S, C>

fn branch_when( &self, condition: impl Fn(&S::Timestamp) -> bool + 'static, ) -> (Self, Self)

Takes one input stream and splits it into two output streams. For each time, the supplied closure is called. If it returns true, the records for that time are sent to the second returned stream; otherwise they are sent to the first.

impl<S: Scope, C: Container + Data> Capture<<S as ScopeParent>::Timestamp, C> for StreamCore<S, C>

fn capture_into<P: EventPusher<S::Timestamp, C> + 'static>( &self, event_pusher: P, )

Captures a stream of timestamped data for later replay.

fn capture(&self) -> Receiver<Event<T, C>>

Captures a stream using Rust’s MPSC channels.

impl<S: Scope, C> Clone for StreamCore<S, C>

fn clone(&self) -> Self

Returns a copy of the value.

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source.

impl<G: Scope, C: Container + Data> Concat<G, C> for StreamCore<G, C>

fn concat(&self, other: &StreamCore<G, C>) -> StreamCore<G, C>

Merges the contents of two streams.

impl<G: Scope, C: Container + Data> Concatenate<G, C> for StreamCore<G, C>

fn concatenate<I>(&self, sources: I) -> StreamCore<G, C>
where I: IntoIterator<Item = StreamCore<G, C>>,

Merges the contents of multiple streams.

impl<G: Scope, C: Container + Data> ConnectLoop<G, C> for StreamCore<G, C>

fn connect_loop(&self, handle: Handle<G, C>)

Connects a Stream to be the input of a loop variable.

impl<S, C> Debug for StreamCore<S, C>
where S: Scope,

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter.

impl<G: Scope, T: Timestamp + Refines<G::Timestamp>, C: Data + Container> Enter<G, T, C> for StreamCore<G, C>

fn enter<'a>(&self, scope: &Child<'a, G, T>) -> StreamCore<Child<'a, G, T>, C>

Moves the Stream argument into a child of its current Scope.

impl<G: Scope, C> Exchange<C> for StreamCore<G, C>
where C: SizableContainer + ExchangeData + ContainerBytes + for<'a> PushInto<C::Item<'a>>,

fn exchange<F>(&self, route: F) -> StreamCore<G, C>
where for<'a> F: FnMut(&C::Item<'a>) -> u64 + 'static,

Exchanges records between workers.
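As a rough picture of what the route closure controls, the plain-Rust sketch below distributes records across a fixed number of workers. It is not timely code: exchange_model and peers are hypothetical names, and reducing the routing value modulo the number of workers is an assumption of this model, since the summary above does not specify the mapping.

```rust
/// Hypothetical model of `exchange`: each record is assigned to a worker
/// based on the u64 returned by `route`.
fn exchange_model<D>(
    input: Vec<D>,
    peers: u64,
    mut route: impl FnMut(&D) -> u64,
) -> Vec<Vec<D>> {
    let mut workers: Vec<Vec<D>> = (0..peers).map(|_| Vec::new()).collect();
    for record in input {
        // Assumption: the routing value is reduced modulo the worker count.
        let target = (route(&record) % peers) as usize;
        workers[target].push(record);
    }
    workers
}
```

Under this model, records with equal routing values always land on the same worker, which is what keyed operators rely on.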

impl<G: Scope, C> Filter<C> for StreamCore<G, C>
where for<'a> C: PushInto<C::Item<'a>> + SizableContainer + Data,

fn filter<P: FnMut(&C::Item<'_>) -> bool + 'static>( &self, predicate: P, ) -> StreamCore<G, C>

Returns a new instance of self containing only records satisfying predicate.

impl<G: Scope, C: Container + Data> Inspect<G, C> for StreamCore<G, C>

fn inspect_core<F>(&self, func: F) -> Self
where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>) + 'static,

Runs a supplied closure on each observed data batch, and each frontier advancement.

fn inspect<F>(&self, func: F) -> Self
where F: for<'a> FnMut(C::ItemRef<'a>) + 'static,

Runs a supplied closure on each observed data element.

fn inspect_time<F>(&self, func: F) -> Self
where F: for<'a> FnMut(&G::Timestamp, C::ItemRef<'a>) + 'static,

Runs a supplied closure on each observed data element and associated time.

fn inspect_batch(&self, func: impl FnMut(&G::Timestamp, &C) + 'static) -> Self

Runs a supplied closure on each observed data batch (time and data container).

impl<G: Scope, C: Container + Data> InspectCore<G, C> for StreamCore<G, C>

fn inspect_container<F>(&self, func: F) -> StreamCore<G, C>
where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>) + 'static,

Runs a supplied closure on each observed container, and each frontier advancement.

impl<G: Scope, C: Container + Data, T: Timestamp + Refines<G::Timestamp>> Leave<G, C> for StreamCore<Child<'_, G, T>, C>

fn leave(&self) -> StreamCore<G, C>

Moves a Stream to the parent of its current Scope.

impl<S: Scope, C: Container + Data> Map<S, C> for StreamCore<S, C>

fn flat_map<C2, I, L>(&self, logic: L) -> StreamCore<S, C2>
where I: IntoIterator, C2: SizableContainer + PushInto<I::Item> + Data, L: FnMut(C::Item<'_>) -> I + 'static,

Consumes each element of the stream and yields some number of new elements.

fn map<C2, D2, L>(&self, logic: L) -> StreamCore<S, C2>
where C2: SizableContainer + PushInto<D2> + Data, L: FnMut(C::Item<'_>) -> D2 + 'static,

Consumes each element of the stream and yields a new element.

impl<S: Scope, C: Container + Data> OkErr<S, C> for StreamCore<S, C>

fn ok_err<C1, D1, C2, D2, L>( &self, logic: L, ) -> (StreamCore<S, C1>, StreamCore<S, C2>)
where C1: SizableContainer + PushInto<D1> + Data, C2: SizableContainer + PushInto<D2> + Data, L: FnMut(C::Item<'_>) -> Result<D1, D2> + 'static,

Takes one input stream and splits it into two output streams. For each record, the supplied closure is called with the data. If it returns Ok(x), then x will be sent to the first returned stream; otherwise, if it returns Err(e), then e will be sent to the second.
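The split described above can be modelled in a few lines of plain Rust. This is a sketch, not timely code; ok_err_model is a hypothetical name, and the two outputs are plain vectors standing in for the two returned streams.

```rust
/// Hypothetical model of `ok_err`. Returns (first, second): `Ok(x)` sends `x`
/// to `first`, `Err(e)` sends `e` to `second`. The two outputs may hold
/// different types.
fn ok_err_model<D, D1, D2>(
    input: Vec<D>,
    mut logic: impl FnMut(D) -> Result<D1, D2>,
) -> (Vec<D1>, Vec<D2>) {
    let mut first = Vec::new();
    let mut second = Vec::new();
    for record in input {
        match logic(record) {
            Ok(x) => first.push(x),
            Err(e) => second.push(e),
        }
    }
    (first, second)
}
```

A typical use is parsing: successfully parsed values flow to the first output while the unparseable originals are collected in the second.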

impl<G: Scope, C1: Container + Data> Operator<G, C1> for StreamCore<G, C1>

fn unary_frontier<CB, B, L, P>( &self, pact: P, name: &str, constructor: B, ) -> StreamCore<G, CB::Container>

Creates a new dataflow operator that partitions its input stream by a parallelization strategy pact, and repeatedly invokes logic, the function returned by the function passed as constructor. logic can read from the input stream, write to the output stream, and inspect the frontier at the input.

fn unary_notify<CB: ContainerBuilder, L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P::Puller>, &mut OutputHandleCore<'_, G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>, &mut Notificator<'_, G::Timestamp>) + 'static, P: ParallelizationContract<G::Timestamp, C1>>( &self, pact: P, name: &str, init: impl IntoIterator<Item = G::Timestamp>, logic: L, ) -> StreamCore<G, CB::Container>

Creates a new dataflow operator that partitions its input stream by a parallelization strategy pact, and repeatedly invokes the supplied closure logic. logic can read from the input stream, write to the output stream, and work with progress notifications through the Notificator it is given.

fn unary<CB, B, L, P>( &self, pact: P, name: &str, constructor: B, ) -> StreamCore<G, CB::Container>
where CB: ContainerBuilder, B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L, L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P::Puller>, &mut OutputHandleCore<'_, G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>) + 'static, P: ParallelizationContract<G::Timestamp, C1>,

Creates a new dataflow operator that partitions its input stream by a parallelization strategy pact, and repeatedly invokes logic, the function returned by the function passed as constructor. logic can read from the input stream, and write to the output stream.
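The "function returned by the function passed as constructor" pattern is easier to see in isolation. The plain-Rust sketch below is not timely code: run_unary_model is a hypothetical driver, the constructor receives a bare usize standing in for the capability and operator info, and input and output are plain vectors rather than handles.

```rust
/// Hypothetical stand-in for an operator: `constructor` runs once and returns
/// `logic`, which is then invoked once per input batch.
fn run_unary_model<B, L>(batches: Vec<Vec<i32>>, constructor: B) -> Vec<i32>
where
    B: FnOnce(usize) -> L,
    L: FnMut(&mut Vec<i32>, &mut Vec<i32>),
{
    // One-time setup: any state created here is captured by `logic`.
    let mut logic = constructor(0);
    let mut output = Vec::new();
    for mut batch in batches {
        // Repeated invocation: `logic` reads input and writes output.
        logic(&mut batch, &mut output);
    }
    output
}
```

The split lets per-operator state, such as a running total, be created once in the constructor and then owned by the returned closure across invocations.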

fn binary_frontier<C2, CB, B, L, P1, P2>( &self, other: &StreamCore<G, C2>, pact1: P1, pact2: P2, name: &str, constructor: B, ) -> StreamCore<G, CB::Container>

Creates a new dataflow operator that partitions its input streams by the parallelization strategies pact1 and pact2, and repeatedly invokes logic, the function returned by the function passed as constructor. logic can read from the input streams, write to the output stream, and inspect the frontier at the inputs.

fn binary_notify<C2: Container + Data, CB: ContainerBuilder, L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P1::Puller>, &mut InputHandleCore<G::Timestamp, C2, P2::Puller>, &mut OutputHandleCore<'_, G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>, &mut Notificator<'_, G::Timestamp>) + 'static, P1: ParallelizationContract<G::Timestamp, C1>, P2: ParallelizationContract<G::Timestamp, C2>>( &self, other: &StreamCore<G, C2>, pact1: P1, pact2: P2, name: &str, init: impl IntoIterator<Item = G::Timestamp>, logic: L, ) -> StreamCore<G, CB::Container>

Creates a new dataflow operator that partitions its input streams by the parallelization strategies pact1 and pact2, and repeatedly invokes the supplied closure logic. logic can read from the input streams, write to the output stream, and work with progress notifications through the Notificator it is given.

fn binary<C2, CB, B, L, P1, P2>( &self, other: &StreamCore<G, C2>, pact1: P1, pact2: P2, name: &str, constructor: B, ) -> StreamCore<G, CB::Container>
where C2: Container + Data, CB: ContainerBuilder, B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L, L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P1::Puller>, &mut InputHandleCore<G::Timestamp, C2, P2::Puller>, &mut OutputHandleCore<'_, G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>) + 'static, P1: ParallelizationContract<G::Timestamp, C1>, P2: ParallelizationContract<G::Timestamp, C2>,

Creates a new dataflow operator that partitions its input streams by the parallelization strategies pact1 and pact2, and repeatedly invokes logic, the function returned by the function passed as constructor. logic can read from the input streams and write to the output stream.

fn sink<L, P>(&self, pact: P, name: &str, logic: L)
where L: FnMut(&mut FrontieredInputHandleCore<'_, G::Timestamp, C1, P::Puller>) + 'static, P: ParallelizationContract<G::Timestamp, C1>,

Creates a new dataflow operator that partitions its input stream by a parallelization strategy pact, and repeatedly invokes the function logic, which can read from the input stream and inspect the frontier at the input.

impl<G: Scope, C: Container + Data> Partition<G, C> for StreamCore<G, C>

fn partition<CB, D2, F>( &self, parts: u64, route: F, ) -> Vec<StreamCore<G, CB::Container>>
where CB: ContainerBuilder + PushInto<D2>, CB::Container: Data, F: FnMut(C::Item<'_>) -> (u64, D2) + 'static,

Produces parts output streams, containing records produced and assigned by route.

impl<G: Scope, C: Container + Data> Probe<G, C> for StreamCore<G, C>

fn probe(&self) -> Handle<G::Timestamp>

Constructs a progress probe which indicates which timestamps have elapsed at the operator.

fn probe_with(&self, handle: &Handle<G::Timestamp>) -> StreamCore<G, C>

Inserts a progress probe in a stream.

impl<S: Scope, C: Container + Data> Reclock<S> for StreamCore<S, C>

fn reclock<TC: Container + Data>( &self, clock: &StreamCore<S, TC>, ) -> StreamCore<S, C>

Delays records until an input is observed on the clock input.

impl<S: Scope, C: Container + Data> SharedStream<S, C> for StreamCore<S, C>

fn shared(&self) -> StreamCore<S, Rc<C>>

Converts a stream into a stream of shared data.
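Since the return type wraps each container in an Rc, a plain-Rust sketch can show what sharing means at the container level. This is not timely code: shared_model and consumers are hypothetical names, and the motivation suggested here (handing one container to several consumers without copying its contents) is an assumption rather than something the summary states.

```rust
use std::rc::Rc;

/// Hypothetical model of sharing one container among several consumers:
/// every returned handle points at the same allocation.
fn shared_model<C>(batch: C, consumers: usize) -> Vec<Rc<C>> {
    let shared = Rc::new(batch);
    // Cloning an Rc bumps a reference count; the container itself is not copied.
    (0..consumers).map(|_| Rc::clone(&shared)).collect()
}
```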