Struct timely::dataflow::stream::StreamCore
source · pub struct StreamCore<S: Scope, C> { /* private fields */ }
Expand description
Abstraction of a stream of C: Container
records timestamped with S::Timestamp
.
Internally Stream
maintains a list of data recipients who should be presented with data
produced by the source of the stream.
Implementations§
source§impl<S: Scope, C: Container> StreamCore<S, C>
impl<S: Scope, C: Container> StreamCore<S, C>
sourcepub fn connect_to<P: Push<Message<S::Timestamp, C>> + 'static>(
&self,
target: Target,
pusher: P,
identifier: usize,
)
pub fn connect_to<P: Push<Message<S::Timestamp, C>> + 'static>( &self, target: Target, pusher: P, identifier: usize, )
Connects the stream to a destination.
The destination is described both by a Target
, for progress tracking information, and a P: Push
where the
records should actually be sent. The identifier is unique to the edge and is used only for logging purposes.
sourcepub fn new(source: Source, output: TeeHelper<S::Timestamp, C>, scope: S) -> Self
pub fn new(source: Source, output: TeeHelper<S::Timestamp, C>, scope: S) -> Self
Allocates a Stream
from a supplied Source
name and rendezvous point.
sourcepub fn container<D: Container>(self) -> StreamCore<S, D>where
Self: AsStream<S, D>,
pub fn container<D: Container>(self) -> StreamCore<S, D>where
Self: AsStream<S, D>,
Allows the assertion of a container type, for the benefit of type inference.
Trait Implementations§
source§impl<S: Scope, C> AsStream<S, C> for StreamCore<S, C>
impl<S: Scope, C> AsStream<S, C> for StreamCore<S, C>
source§fn as_stream(self) -> Self
fn as_stream(self) -> Self
Translate
self
to a StreamCore.source§impl<S: Scope, C: Container + Data> BranchWhen<<S as ScopeParent>::Timestamp> for StreamCore<S, C>
impl<S: Scope, C: Container + Data> BranchWhen<<S as ScopeParent>::Timestamp> for StreamCore<S, C>
source§fn branch_when(
&self,
condition: impl Fn(&S::Timestamp) -> bool + 'static,
) -> (Self, Self)
fn branch_when( &self, condition: impl Fn(&S::Timestamp) -> bool + 'static, ) -> (Self, Self)
Takes one input stream and splits it into two output streams.
For each time, the supplied closure is called. If it returns
true
,
the records for that will be sent to the second returned stream, otherwise
they will be sent to the first. Read moresource§impl<S: Scope, C: Container + Data> Capture<<S as ScopeParent>::Timestamp, C> for StreamCore<S, C>
impl<S: Scope, C: Container + Data> Capture<<S as ScopeParent>::Timestamp, C> for StreamCore<S, C>
source§impl<S: Scope, C> Clone for StreamCore<S, C>
impl<S: Scope, C> Clone for StreamCore<S, C>
source§impl<G: Scope, C: Container + Data> Concat<G, C> for StreamCore<G, C>
impl<G: Scope, C: Container + Data> Concat<G, C> for StreamCore<G, C>
source§fn concat(&self, other: &StreamCore<G, C>) -> StreamCore<G, C>
fn concat(&self, other: &StreamCore<G, C>) -> StreamCore<G, C>
Merge the contents of two streams. Read more
source§impl<G: Scope, C: Container + Data> Concatenate<G, C> for StreamCore<G, C>
impl<G: Scope, C: Container + Data> Concatenate<G, C> for StreamCore<G, C>
source§fn concatenate<I>(&self, sources: I) -> StreamCore<G, C>where
I: IntoIterator<Item = StreamCore<G, C>>,
fn concatenate<I>(&self, sources: I) -> StreamCore<G, C>where
I: IntoIterator<Item = StreamCore<G, C>>,
Merge the contents of multiple streams. Read more
source§impl<G: Scope, C: Container + Data> ConnectLoop<G, C> for StreamCore<G, C>
impl<G: Scope, C: Container + Data> ConnectLoop<G, C> for StreamCore<G, C>
source§fn connect_loop(&self, handle: Handle<G, C>)
fn connect_loop(&self, handle: Handle<G, C>)
Connect a
Stream
to be the input of a loop variable. Read moresource§impl<S, C> Debug for StreamCore<S, C>where
S: Scope,
impl<S, C> Debug for StreamCore<S, C>where
S: Scope,
source§impl<G: Scope, T: Timestamp + Refines<G::Timestamp>, C: Data + Container> Enter<G, T, C> for StreamCore<G, C>
impl<G: Scope, T: Timestamp + Refines<G::Timestamp>, C: Data + Container> Enter<G, T, C> for StreamCore<G, C>
source§impl<G: Scope, C> Exchange<C> for StreamCore<G, C>
impl<G: Scope, C> Exchange<C> for StreamCore<G, C>
source§impl<G: Scope, C> Filter<C> for StreamCore<G, C>
impl<G: Scope, C> Filter<C> for StreamCore<G, C>
source§impl<G: Scope, C: Container + Data> Inspect<G, C> for StreamCore<G, C>
impl<G: Scope, C: Container + Data> Inspect<G, C> for StreamCore<G, C>
source§fn inspect_core<F>(&self, func: F) -> Self
fn inspect_core<F>(&self, func: F) -> Self
Runs a supplied closure on each observed data batch, and each frontier advancement. Read more
source§fn inspect<F>(&self, func: F) -> Self
fn inspect<F>(&self, func: F) -> Self
Runs a supplied closure on each observed data element. Read more
source§fn inspect_time<F>(&self, func: F) -> Self
fn inspect_time<F>(&self, func: F) -> Self
Runs a supplied closure on each observed data element and associated time. Read more
source§impl<G: Scope, C: Container + Data> InspectCore<G, C> for StreamCore<G, C>
impl<G: Scope, C: Container + Data> InspectCore<G, C> for StreamCore<G, C>
source§fn inspect_container<F>(&self, func: F) -> StreamCore<G, C>
fn inspect_container<F>(&self, func: F) -> StreamCore<G, C>
Runs a supplied closure on each observed container, and each frontier advancement. Read more
source§impl<G: Scope, C: Container + Data, T: Timestamp + Refines<G::Timestamp>> Leave<G, C> for StreamCore<Child<'_, G, T>, C>
impl<G: Scope, C: Container + Data, T: Timestamp + Refines<G::Timestamp>> Leave<G, C> for StreamCore<Child<'_, G, T>, C>
source§fn leave(&self) -> StreamCore<G, C>
fn leave(&self) -> StreamCore<G, C>
source§impl<S: Scope, C: Container + Data> Map<S, C> for StreamCore<S, C>
impl<S: Scope, C: Container + Data> Map<S, C> for StreamCore<S, C>
source§fn flat_map<C2, I, L>(&self, logic: L) -> StreamCore<S, C2>where
I: IntoIterator,
C2: SizableContainer + PushInto<I::Item> + Data,
L: FnMut(C::Item<'_>) -> I + 'static,
fn flat_map<C2, I, L>(&self, logic: L) -> StreamCore<S, C2>where
I: IntoIterator,
C2: SizableContainer + PushInto<I::Item> + Data,
L: FnMut(C::Item<'_>) -> I + 'static,
Consumes each element of the stream and yields some number of new elements. Read more
source§fn map<C2, D2, L>(&self, logic: L) -> StreamCore<S, C2>
fn map<C2, D2, L>(&self, logic: L) -> StreamCore<S, C2>
Consumes each element of the stream and yields a new element. Read more
source§impl<S: Scope, C: Container + Data> OkErr<S, C> for StreamCore<S, C>
impl<S: Scope, C: Container + Data> OkErr<S, C> for StreamCore<S, C>
source§fn ok_err<C1, D1, C2, D2, L>(
&self,
logic: L,
) -> (StreamCore<S, C1>, StreamCore<S, C2>)where
C1: SizableContainer + PushInto<D1> + Data,
C2: SizableContainer + PushInto<D2> + Data,
L: FnMut(C::Item<'_>) -> Result<D1, D2> + 'static,
fn ok_err<C1, D1, C2, D2, L>(
&self,
logic: L,
) -> (StreamCore<S, C1>, StreamCore<S, C2>)where
C1: SizableContainer + PushInto<D1> + Data,
C2: SizableContainer + PushInto<D2> + Data,
L: FnMut(C::Item<'_>) -> Result<D1, D2> + 'static,
Takes one input stream and splits it into two output streams.
For each record, the supplied closure is called with the data.
If it returns
Ok(x)
, then x
will be sent
to the first returned stream; otherwise, if it returns Err(e)
,
then e
will be sent to the second. Read moresource§impl<G: Scope, C1: Container + Data> Operator<G, C1> for StreamCore<G, C1>
impl<G: Scope, C1: Container + Data> Operator<G, C1> for StreamCore<G, C1>
source§fn unary_frontier<CB, B, L, P>(
&self,
pact: P,
name: &str,
constructor: B,
) -> StreamCore<G, CB::Container>where
CB: ContainerBuilder,
B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
L: FnMut(&mut FrontieredInputHandleCore<'_, G::Timestamp, C1, P::Puller>, &mut OutputHandleCore<'_, G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>) + 'static,
P: ParallelizationContract<G::Timestamp, C1>,
fn unary_frontier<CB, B, L, P>(
&self,
pact: P,
name: &str,
constructor: B,
) -> StreamCore<G, CB::Container>where
CB: ContainerBuilder,
B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
L: FnMut(&mut FrontieredInputHandleCore<'_, G::Timestamp, C1, P::Puller>, &mut OutputHandleCore<'_, G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>) + 'static,
P: ParallelizationContract<G::Timestamp, C1>,
Creates a new dataflow operator that partitions its input stream by a parallelization
strategy
pact
, and repeatedly invokes logic
, the function returned by the function passed as constructor
.
logic
can read from the input stream, write to the output stream, and inspect the frontier at the input. Read moresource§fn unary_notify<CB: ContainerBuilder, L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P::Puller>, &mut OutputHandleCore<'_, G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>, &mut Notificator<'_, G::Timestamp>) + 'static, P: ParallelizationContract<G::Timestamp, C1>>(
&self,
pact: P,
name: &str,
init: impl IntoIterator<Item = G::Timestamp>,
logic: L,
) -> StreamCore<G, CB::Container>
fn unary_notify<CB: ContainerBuilder, L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P::Puller>, &mut OutputHandleCore<'_, G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>, &mut Notificator<'_, G::Timestamp>) + 'static, P: ParallelizationContract<G::Timestamp, C1>>( &self, pact: P, name: &str, init: impl IntoIterator<Item = G::Timestamp>, logic: L, ) -> StreamCore<G, CB::Container>
Creates a new dataflow operator that partitions its input stream by a parallelization
strategy
pact
, and repeatedly invokes logic
, the function returned by the function passed as constructor
.
logic
can read from the input stream, write to the output stream, and inspect the frontier at the input. Read moresource§fn unary<CB, B, L, P>(
&self,
pact: P,
name: &str,
constructor: B,
) -> StreamCore<G, CB::Container>where
CB: ContainerBuilder,
B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P::Puller>, &mut OutputHandleCore<'_, G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>) + 'static,
P: ParallelizationContract<G::Timestamp, C1>,
fn unary<CB, B, L, P>(
&self,
pact: P,
name: &str,
constructor: B,
) -> StreamCore<G, CB::Container>where
CB: ContainerBuilder,
B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P::Puller>, &mut OutputHandleCore<'_, G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>) + 'static,
P: ParallelizationContract<G::Timestamp, C1>,
Creates a new dataflow operator that partitions its input stream by a parallelization
strategy
pact
, and repeatedly invokes logic
, the function returned by the function passed as constructor
.
logic
can read from the input stream, and write to the output stream. Read moresource§fn binary_frontier<C2, CB, B, L, P1, P2>(
&self,
other: &StreamCore<G, C2>,
pact1: P1,
pact2: P2,
name: &str,
constructor: B,
) -> StreamCore<G, CB::Container>where
C2: Container + Data,
CB: ContainerBuilder,
B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
L: FnMut(&mut FrontieredInputHandleCore<'_, G::Timestamp, C1, P1::Puller>, &mut FrontieredInputHandleCore<'_, G::Timestamp, C2, P2::Puller>, &mut OutputHandleCore<'_, G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>) + 'static,
P1: ParallelizationContract<G::Timestamp, C1>,
P2: ParallelizationContract<G::Timestamp, C2>,
fn binary_frontier<C2, CB, B, L, P1, P2>(
&self,
other: &StreamCore<G, C2>,
pact1: P1,
pact2: P2,
name: &str,
constructor: B,
) -> StreamCore<G, CB::Container>where
C2: Container + Data,
CB: ContainerBuilder,
B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
L: FnMut(&mut FrontieredInputHandleCore<'_, G::Timestamp, C1, P1::Puller>, &mut FrontieredInputHandleCore<'_, G::Timestamp, C2, P2::Puller>, &mut OutputHandleCore<'_, G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>) + 'static,
P1: ParallelizationContract<G::Timestamp, C1>,
P2: ParallelizationContract<G::Timestamp, C2>,
Creates a new dataflow operator that partitions its input streams by a parallelization
strategy
pact
, and repeatedly invokes logic
, the function returned by the function passed as constructor
.
logic
can read from the input streams, write to the output stream, and inspect the frontier at the inputs. Read moresource§fn binary_notify<C2: Container + Data, CB: ContainerBuilder, L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P1::Puller>, &mut InputHandleCore<G::Timestamp, C2, P2::Puller>, &mut OutputHandleCore<'_, G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>, &mut Notificator<'_, G::Timestamp>) + 'static, P1: ParallelizationContract<G::Timestamp, C1>, P2: ParallelizationContract<G::Timestamp, C2>>(
&self,
other: &StreamCore<G, C2>,
pact1: P1,
pact2: P2,
name: &str,
init: impl IntoIterator<Item = G::Timestamp>,
logic: L,
) -> StreamCore<G, CB::Container>
fn binary_notify<C2: Container + Data, CB: ContainerBuilder, L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P1::Puller>, &mut InputHandleCore<G::Timestamp, C2, P2::Puller>, &mut OutputHandleCore<'_, G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>, &mut Notificator<'_, G::Timestamp>) + 'static, P1: ParallelizationContract<G::Timestamp, C1>, P2: ParallelizationContract<G::Timestamp, C2>>( &self, other: &StreamCore<G, C2>, pact1: P1, pact2: P2, name: &str, init: impl IntoIterator<Item = G::Timestamp>, logic: L, ) -> StreamCore<G, CB::Container>
Creates a new dataflow operator that partitions its input streams by a parallelization
strategy
pact
, and repeatedly invokes logic
, the function returned by the function passed as constructor
.
logic
can read from the input streams, write to the output stream, and inspect the frontier at the inputs. Read moresource§fn binary<C2, CB, B, L, P1, P2>(
&self,
other: &StreamCore<G, C2>,
pact1: P1,
pact2: P2,
name: &str,
constructor: B,
) -> StreamCore<G, CB::Container>where
C2: Container + Data,
CB: ContainerBuilder,
B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P1::Puller>, &mut InputHandleCore<G::Timestamp, C2, P2::Puller>, &mut OutputHandleCore<'_, G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>) + 'static,
P1: ParallelizationContract<G::Timestamp, C1>,
P2: ParallelizationContract<G::Timestamp, C2>,
fn binary<C2, CB, B, L, P1, P2>(
&self,
other: &StreamCore<G, C2>,
pact1: P1,
pact2: P2,
name: &str,
constructor: B,
) -> StreamCore<G, CB::Container>where
C2: Container + Data,
CB: ContainerBuilder,
B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P1::Puller>, &mut InputHandleCore<G::Timestamp, C2, P2::Puller>, &mut OutputHandleCore<'_, G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>) + 'static,
P1: ParallelizationContract<G::Timestamp, C1>,
P2: ParallelizationContract<G::Timestamp, C2>,
Creates a new dataflow operator that partitions its input streams by a parallelization
strategy
pact
, and repeatedly invokes logic
, the function returned by the function passed as constructor
.
logic
can read from the input streams, write to the output stream, and inspect the frontier at the inputs. Read moresource§fn sink<L, P>(&self, pact: P, name: &str, logic: L)where
L: FnMut(&mut FrontieredInputHandleCore<'_, G::Timestamp, C1, P::Puller>) + 'static,
P: ParallelizationContract<G::Timestamp, C1>,
fn sink<L, P>(&self, pact: P, name: &str, logic: L)where
L: FnMut(&mut FrontieredInputHandleCore<'_, G::Timestamp, C1, P::Puller>) + 'static,
P: ParallelizationContract<G::Timestamp, C1>,
Creates a new dataflow operator that partitions its input stream by a parallelization
strategy
pact
, and repeatedly invokes the function logic
which can read from the input stream
and inspect the frontier at the input. Read moresource§impl<G: Scope, C: Container + Data> Partition<G, C> for StreamCore<G, C>
impl<G: Scope, C: Container + Data> Partition<G, C> for StreamCore<G, C>
source§fn partition<CB, D2, F>(
&self,
parts: u64,
route: F,
) -> Vec<StreamCore<G, CB::Container>>where
CB: ContainerBuilder,
CB::Container: SizableContainer + PushInto<D2> + Data,
F: FnMut(C::Item<'_>) -> (u64, D2) + 'static,
fn partition<CB, D2, F>(
&self,
parts: u64,
route: F,
) -> Vec<StreamCore<G, CB::Container>>where
CB: ContainerBuilder,
CB::Container: SizableContainer + PushInto<D2> + Data,
F: FnMut(C::Item<'_>) -> (u64, D2) + 'static,
source§impl<S: Scope, C: Container + Data> Reclock<S> for StreamCore<S, C>
impl<S: Scope, C: Container + Data> Reclock<S> for StreamCore<S, C>
source§fn reclock<TC: Container + Data>(
&self,
clock: &StreamCore<S, TC>,
) -> StreamCore<S, C>
fn reclock<TC: Container + Data>( &self, clock: &StreamCore<S, TC>, ) -> StreamCore<S, C>
Delays records until an input is observed on the
clock
input. Read moreConvert a stream into a stream of shared data Read more
Auto Trait Implementations§
impl<S, C> Freeze for StreamCore<S, C>where
S: Freeze,
impl<S, C> !RefUnwindSafe for StreamCore<S, C>
impl<S, C> !Send for StreamCore<S, C>
impl<S, C> !Sync for StreamCore<S, C>
impl<S, C> Unpin for StreamCore<S, C>where
S: Unpin,
impl<S, C> !UnwindSafe for StreamCore<S, C>
Blanket Implementations§
source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more
source§impl<T> CloneToUninit for Twhere
T: Clone,
impl<T> CloneToUninit for Twhere
T: Clone,
source§default unsafe fn clone_to_uninit(&self, dst: *mut T)
default unsafe fn clone_to_uninit(&self, dst: *mut T)
🔬This is a nightly-only experimental API. (
clone_to_uninit
)