pub struct StreamCore<S: Scope, C> { /* private fields */ }Expand description
Abstraction of a stream of C: Container records timestamped with S::Timestamp.
Internally Stream maintains a list of data recipients who should be presented with data
produced by the source of the stream.
Implementations§
Source§impl<S: Scope, C> StreamCore<S, C>
impl<S: Scope, C> StreamCore<S, C>
Sourcepub fn connect_to<P: Push<Message<S::Timestamp, C>> + 'static>(
&self,
target: Target,
pusher: P,
identifier: usize,
)
pub fn connect_to<P: Push<Message<S::Timestamp, C>> + 'static>( &self, target: Target, pusher: P, identifier: usize, )
Connects the stream to a destination.
The destination is described both by a Target, for progress tracking information, and a P: Push where the
records should actually be sent. The identifier is unique to the edge and is used only for logging purposes.
Sourcepub fn new(source: Source, output: TeeHelper<S::Timestamp, C>, scope: S) -> Self
pub fn new(source: Source, output: TeeHelper<S::Timestamp, C>, scope: S) -> Self
Allocates a Stream from a supplied Source name and rendezvous point.
Sourcepub fn container<C2>(self) -> StreamCore<S, C2>where
Self: AsStream<S, C2>,
pub fn container<C2>(self) -> StreamCore<S, C2>where
Self: AsStream<S, C2>,
Allows the assertion of a container type, for the benefit of type inference.
Trait Implementations§
Source§impl<S: Scope, C> AsStream<S, C> for StreamCore<S, C>
impl<S: Scope, C> AsStream<S, C> for StreamCore<S, C>
Source§fn as_stream(self) -> Self
fn as_stream(self) -> Self
Translate
self to a StreamCore.Source§impl<S: Scope, C: Container> BranchWhen<<S as ScopeParent>::Timestamp> for StreamCore<S, C>
impl<S: Scope, C: Container> BranchWhen<<S as ScopeParent>::Timestamp> for StreamCore<S, C>
Source§fn branch_when(
&self,
condition: impl Fn(&S::Timestamp) -> bool + 'static,
) -> (Self, Self)
fn branch_when( &self, condition: impl Fn(&S::Timestamp) -> bool + 'static, ) -> (Self, Self)
Takes one input stream and splits it into two output streams.
For each time, the supplied closure is called. If it returns
true,
the records for that will be sent to the second returned stream, otherwise
they will be sent to the first. Read moreSource§impl<S: Scope, C: Container> Capture<<S as ScopeParent>::Timestamp, C> for StreamCore<S, C>
impl<S: Scope, C: Container> Capture<<S as ScopeParent>::Timestamp, C> for StreamCore<S, C>
Source§impl<S: Scope, C> Clone for StreamCore<S, C>
impl<S: Scope, C> Clone for StreamCore<S, C>
Source§impl<G: Scope, C: Container> Concat<G, C> for StreamCore<G, C>
impl<G: Scope, C: Container> Concat<G, C> for StreamCore<G, C>
Source§fn concat(&self, other: &StreamCore<G, C>) -> StreamCore<G, C>
fn concat(&self, other: &StreamCore<G, C>) -> StreamCore<G, C>
Merge the contents of two streams. Read more
Source§impl<G: Scope, C: Container> Concatenate<G, C> for StreamCore<G, C>
impl<G: Scope, C: Container> Concatenate<G, C> for StreamCore<G, C>
Source§fn concatenate<I>(&self, sources: I) -> StreamCore<G, C>where
I: IntoIterator<Item = StreamCore<G, C>>,
fn concatenate<I>(&self, sources: I) -> StreamCore<G, C>where
I: IntoIterator<Item = StreamCore<G, C>>,
Merge the contents of multiple streams. Read more
Source§impl<G: Scope, C: Container> ConnectLoop<G, C> for StreamCore<G, C>
impl<G: Scope, C: Container> ConnectLoop<G, C> for StreamCore<G, C>
Source§fn connect_loop(&self, handle: Handle<G, C>)
fn connect_loop(&self, handle: Handle<G, C>)
Connect a
Stream to be the input of a loop variable. Read moreSource§impl<S, C> Debug for StreamCore<S, C>where
S: Scope,
impl<S, C> Debug for StreamCore<S, C>where
S: Scope,
Source§impl<G: Scope, T: Timestamp + Refines<G::Timestamp>, C: Container> Enter<G, T, C> for StreamCore<G, C>
impl<G: Scope, T: Timestamp + Refines<G::Timestamp>, C: Container> Enter<G, T, C> for StreamCore<G, C>
Source§impl<G: Scope, C> Exchange<C> for StreamCore<G, C>where
C: Container + SizableContainer + DrainContainer + Send + ContainerBytes + for<'a> PushInto<C::Item<'a>>,
impl<G: Scope, C> Exchange<C> for StreamCore<G, C>where
C: Container + SizableContainer + DrainContainer + Send + ContainerBytes + for<'a> PushInto<C::Item<'a>>,
Source§impl<G: Scope, C> Filter<C> for StreamCore<G, C>
impl<G: Scope, C> Filter<C> for StreamCore<G, C>
Source§impl<G: Scope, C: Container + IterContainer> Inspect<G, C> for StreamCore<G, C>
impl<G: Scope, C: Container + IterContainer> Inspect<G, C> for StreamCore<G, C>
Source§fn inspect_core<F>(&self, func: F) -> Self
fn inspect_core<F>(&self, func: F) -> Self
Runs a supplied closure on each observed data batch, and each frontier advancement. Read more
Source§fn inspect<F>(&self, func: F) -> Self
fn inspect<F>(&self, func: F) -> Self
Runs a supplied closure on each observed data element. Read more
Source§fn inspect_time<F>(&self, func: F) -> Self
fn inspect_time<F>(&self, func: F) -> Self
Runs a supplied closure on each observed data element and associated time. Read more
Source§impl<G: Scope, C: Container> InspectCore<G, C> for StreamCore<G, C>
impl<G: Scope, C: Container> InspectCore<G, C> for StreamCore<G, C>
Source§fn inspect_container<F>(&self, func: F) -> StreamCore<G, C>
fn inspect_container<F>(&self, func: F) -> StreamCore<G, C>
Runs a supplied closure on each observed container, and each frontier advancement. Read more
Source§impl<G: Scope, C: Container, T: Timestamp + Refines<G::Timestamp>> Leave<G, C> for StreamCore<Child<'_, G, T>, C>
impl<G: Scope, C: Container, T: Timestamp + Refines<G::Timestamp>> Leave<G, C> for StreamCore<Child<'_, G, T>, C>
Source§fn leave(&self) -> StreamCore<G, C>
fn leave(&self) -> StreamCore<G, C>
Source§impl<S: Scope, C: Container + DrainContainer> Map<S, C> for StreamCore<S, C>
impl<S: Scope, C: Container + DrainContainer> Map<S, C> for StreamCore<S, C>
Source§fn flat_map<C2, I, L>(&self, logic: L) -> StreamCore<S, C2>where
I: IntoIterator,
C2: Container + SizableContainer + PushInto<I::Item>,
L: FnMut(C::Item<'_>) -> I + 'static,
fn flat_map<C2, I, L>(&self, logic: L) -> StreamCore<S, C2>where
I: IntoIterator,
C2: Container + SizableContainer + PushInto<I::Item>,
L: FnMut(C::Item<'_>) -> I + 'static,
Consumes each element of the stream and yields some number of new elements. Read more
Source§fn map<C2, D2, L>(&self, logic: L) -> StreamCore<S, C2>
fn map<C2, D2, L>(&self, logic: L) -> StreamCore<S, C2>
Consumes each element of the stream and yields a new element. Read more
Source§fn flat_map_builder<'t, I, L>(
&'t self,
logic: L,
) -> FlatMapBuilder<'t, Self, C, L, I>
fn flat_map_builder<'t, I, L>( &'t self, logic: L, ) -> FlatMapBuilder<'t, Self, C, L, I>
Creates a
FlatMapBuilder, which allows chaining of iterator logic before finalization into a stream. Read moreSource§impl<S: Scope, C: Container + DrainContainer> OkErr<S, C> for StreamCore<S, C>
impl<S: Scope, C: Container + DrainContainer> OkErr<S, C> for StreamCore<S, C>
Source§fn ok_err<C1, D1, C2, D2, L>(
&self,
logic: L,
) -> (StreamCore<S, C1>, StreamCore<S, C2>)where
C1: Container + SizableContainer + PushInto<D1>,
C2: Container + SizableContainer + PushInto<D2>,
L: FnMut(C::Item<'_>) -> Result<D1, D2> + 'static,
fn ok_err<C1, D1, C2, D2, L>(
&self,
logic: L,
) -> (StreamCore<S, C1>, StreamCore<S, C2>)where
C1: Container + SizableContainer + PushInto<D1>,
C2: Container + SizableContainer + PushInto<D2>,
L: FnMut(C::Item<'_>) -> Result<D1, D2> + 'static,
Takes one input stream and splits it into two output streams.
For each record, the supplied closure is called with the data.
If it returns
Ok(x), then x will be sent
to the first returned stream; otherwise, if it returns Err(e),
then e will be sent to the second. Read moreSource§impl<G: Scope, C1: Container> Operator<G, C1> for StreamCore<G, C1>
impl<G: Scope, C1: Container> Operator<G, C1> for StreamCore<G, C1>
Source§fn unary_frontier<CB, B, L, P>(
&self,
pact: P,
name: &str,
constructor: B,
) -> StreamCore<G, CB::Container>where
CB: ContainerBuilder,
B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
L: FnMut(&mut FrontieredInputHandleCore<'_, G::Timestamp, C1, P::Puller>, &mut OutputHandleCore<'_, G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>) + 'static,
P: ParallelizationContract<G::Timestamp, C1>,
fn unary_frontier<CB, B, L, P>(
&self,
pact: P,
name: &str,
constructor: B,
) -> StreamCore<G, CB::Container>where
CB: ContainerBuilder,
B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
L: FnMut(&mut FrontieredInputHandleCore<'_, G::Timestamp, C1, P::Puller>, &mut OutputHandleCore<'_, G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>) + 'static,
P: ParallelizationContract<G::Timestamp, C1>,
Creates a new dataflow operator that partitions its input stream by a parallelization
strategy
pact, and repeatedly invokes logic, the function returned by the function passed as constructor.
logic can read from the input stream, write to the output stream, and inspect the frontier at the input. Read moreSource§fn unary_notify<CB: ContainerBuilder, L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P::Puller>, &mut OutputHandleCore<'_, G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>, &mut Notificator<'_, G::Timestamp>) + 'static, P: ParallelizationContract<G::Timestamp, C1>>(
&self,
pact: P,
name: &str,
init: impl IntoIterator<Item = G::Timestamp>,
logic: L,
) -> StreamCore<G, CB::Container>
fn unary_notify<CB: ContainerBuilder, L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P::Puller>, &mut OutputHandleCore<'_, G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>, &mut Notificator<'_, G::Timestamp>) + 'static, P: ParallelizationContract<G::Timestamp, C1>>( &self, pact: P, name: &str, init: impl IntoIterator<Item = G::Timestamp>, logic: L, ) -> StreamCore<G, CB::Container>
Creates a new dataflow operator that partitions its input stream by a parallelization
strategy
pact, and repeatedly invokes logic, the function returned by the function passed as constructor.
logic can read from the input stream, write to the output stream, and inspect the frontier at the input. Read moreSource§fn unary<CB, B, L, P>(
&self,
pact: P,
name: &str,
constructor: B,
) -> StreamCore<G, CB::Container>where
CB: ContainerBuilder,
B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P::Puller>, &mut OutputHandleCore<'_, G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>) + 'static,
P: ParallelizationContract<G::Timestamp, C1>,
fn unary<CB, B, L, P>(
&self,
pact: P,
name: &str,
constructor: B,
) -> StreamCore<G, CB::Container>where
CB: ContainerBuilder,
B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P::Puller>, &mut OutputHandleCore<'_, G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>) + 'static,
P: ParallelizationContract<G::Timestamp, C1>,
Creates a new dataflow operator that partitions its input stream by a parallelization
strategy
pact, and repeatedly invokes logic, the function returned by the function passed as constructor.
logic can read from the input stream, and write to the output stream. Read moreSource§fn binary_frontier<C2, CB, B, L, P1, P2>(
&self,
other: &StreamCore<G, C2>,
pact1: P1,
pact2: P2,
name: &str,
constructor: B,
) -> StreamCore<G, CB::Container>where
C2: Container,
CB: ContainerBuilder,
B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
L: FnMut(&mut FrontieredInputHandleCore<'_, G::Timestamp, C1, P1::Puller>, &mut FrontieredInputHandleCore<'_, G::Timestamp, C2, P2::Puller>, &mut OutputHandleCore<'_, G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>) + 'static,
P1: ParallelizationContract<G::Timestamp, C1>,
P2: ParallelizationContract<G::Timestamp, C2>,
fn binary_frontier<C2, CB, B, L, P1, P2>(
&self,
other: &StreamCore<G, C2>,
pact1: P1,
pact2: P2,
name: &str,
constructor: B,
) -> StreamCore<G, CB::Container>where
C2: Container,
CB: ContainerBuilder,
B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
L: FnMut(&mut FrontieredInputHandleCore<'_, G::Timestamp, C1, P1::Puller>, &mut FrontieredInputHandleCore<'_, G::Timestamp, C2, P2::Puller>, &mut OutputHandleCore<'_, G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>) + 'static,
P1: ParallelizationContract<G::Timestamp, C1>,
P2: ParallelizationContract<G::Timestamp, C2>,
Creates a new dataflow operator that partitions its input streams by a parallelization
strategy
pact, and repeatedly invokes logic, the function returned by the function passed as constructor.
logic can read from the input streams, write to the output stream, and inspect the frontier at the inputs. Read moreSource§fn binary_notify<C2: Container, CB: ContainerBuilder, L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P1::Puller>, &mut InputHandleCore<G::Timestamp, C2, P2::Puller>, &mut OutputHandleCore<'_, G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>, &mut Notificator<'_, G::Timestamp>) + 'static, P1: ParallelizationContract<G::Timestamp, C1>, P2: ParallelizationContract<G::Timestamp, C2>>(
&self,
other: &StreamCore<G, C2>,
pact1: P1,
pact2: P2,
name: &str,
init: impl IntoIterator<Item = G::Timestamp>,
logic: L,
) -> StreamCore<G, CB::Container>
fn binary_notify<C2: Container, CB: ContainerBuilder, L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P1::Puller>, &mut InputHandleCore<G::Timestamp, C2, P2::Puller>, &mut OutputHandleCore<'_, G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>, &mut Notificator<'_, G::Timestamp>) + 'static, P1: ParallelizationContract<G::Timestamp, C1>, P2: ParallelizationContract<G::Timestamp, C2>>( &self, other: &StreamCore<G, C2>, pact1: P1, pact2: P2, name: &str, init: impl IntoIterator<Item = G::Timestamp>, logic: L, ) -> StreamCore<G, CB::Container>
Creates a new dataflow operator that partitions its input streams by a parallelization
strategy
pact, and repeatedly invokes logic, the function returned by the function passed as constructor.
logic can read from the input streams, write to the output stream, and inspect the frontier at the inputs. Read moreSource§fn binary<C2, CB, B, L, P1, P2>(
&self,
other: &StreamCore<G, C2>,
pact1: P1,
pact2: P2,
name: &str,
constructor: B,
) -> StreamCore<G, CB::Container>where
C2: Container,
CB: ContainerBuilder,
B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P1::Puller>, &mut InputHandleCore<G::Timestamp, C2, P2::Puller>, &mut OutputHandleCore<'_, G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>) + 'static,
P1: ParallelizationContract<G::Timestamp, C1>,
P2: ParallelizationContract<G::Timestamp, C2>,
fn binary<C2, CB, B, L, P1, P2>(
&self,
other: &StreamCore<G, C2>,
pact1: P1,
pact2: P2,
name: &str,
constructor: B,
) -> StreamCore<G, CB::Container>where
C2: Container,
CB: ContainerBuilder,
B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P1::Puller>, &mut InputHandleCore<G::Timestamp, C2, P2::Puller>, &mut OutputHandleCore<'_, G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>) + 'static,
P1: ParallelizationContract<G::Timestamp, C1>,
P2: ParallelizationContract<G::Timestamp, C2>,
Creates a new dataflow operator that partitions its input streams by a parallelization
strategy
pact, and repeatedly invokes logic, the function returned by the function passed as constructor.
logic can read from the input streams, write to the output stream, and inspect the frontier at the inputs. Read moreSource§fn sink<L, P>(&self, pact: P, name: &str, logic: L)where
L: FnMut(&mut FrontieredInputHandleCore<'_, G::Timestamp, C1, P::Puller>) + 'static,
P: ParallelizationContract<G::Timestamp, C1>,
fn sink<L, P>(&self, pact: P, name: &str, logic: L)where
L: FnMut(&mut FrontieredInputHandleCore<'_, G::Timestamp, C1, P::Puller>) + 'static,
P: ParallelizationContract<G::Timestamp, C1>,
Creates a new dataflow operator that partitions its input stream by a parallelization
strategy
pact, and repeatedly invokes the function logic which can read from the input stream
and inspect the frontier at the input. Read moreSource§impl<G: Scope, C: Container + DrainContainer> Partition<G, C> for StreamCore<G, C>
impl<G: Scope, C: Container + DrainContainer> Partition<G, C> for StreamCore<G, C>
Source§impl<S: Scope, C: Container> Reclock<S> for StreamCore<S, C>
impl<S: Scope, C: Container> Reclock<S> for StreamCore<S, C>
Source§fn reclock<TC: Container>(&self, clock: &StreamCore<S, TC>) -> StreamCore<S, C>
fn reclock<TC: Container>(&self, clock: &StreamCore<S, TC>) -> StreamCore<S, C>
Delays records until an input is observed on the
clock input. Read moreConvert a stream into a stream of shared data Read more
Auto Trait Implementations§
impl<S, C> Freeze for StreamCore<S, C>where
S: Freeze,
impl<S, C> !RefUnwindSafe for StreamCore<S, C>
impl<S, C> !Send for StreamCore<S, C>
impl<S, C> !Sync for StreamCore<S, C>
impl<S, C> Unpin for StreamCore<S, C>where
S: Unpin,
impl<S, C> !UnwindSafe for StreamCore<S, C>
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more
Source§impl<T> CloneToUninit for Twhere
T: Clone,
impl<T> CloneToUninit for Twhere
T: Clone,
Source§impl<T> IntoEither for T
impl<T> IntoEither for T
Source§fn into_either(self, into_left: bool) -> Either<Self, Self>
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts
self into a Left variant of Either<Self, Self>
if into_left is true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read moreSource§fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts
self into a Left variant of Either<Self, Self>
if into_left(&self) returns true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read more