Trait mz_timely_util::operator::StreamExt
source · pub trait StreamExt<G, C1>{
// Required methods
fn unary_fallible<DCB, ECB, B, P>(
&self,
pact: P,
name: &str,
constructor: B,
) -> (StreamCore<G, DCB::Container>, StreamCore<G, ECB::Container>)
where DCB: ContainerBuilder,
ECB: ContainerBuilder,
B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> Box<dyn FnMut(&mut InputHandleCore<G::Timestamp, C1, P::Puller>, &mut OutputHandleCore<'_, G::Timestamp, DCB, Tee<G::Timestamp, DCB::Container>>, &mut OutputHandleCore<'_, G::Timestamp, ECB, Tee<G::Timestamp, ECB::Container>>) + 'static>,
P: ParallelizationContract<G::Timestamp, C1>;
fn unary_async<CB, P, B, BFut>(
&self,
pact: P,
name: String,
constructor: B,
) -> StreamCore<G, CB::Container>
where CB: ContainerBuilder,
B: FnOnce(Capability<G::Timestamp>, OperatorInfo, AsyncInputHandle<G::Timestamp, C1, ConnectedToOne>, AsyncOutputHandle<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>) -> BFut,
BFut: Future + 'static,
P: ParallelizationContract<G::Timestamp, C1>;
fn binary_async<C2, CB, P1, P2, B, BFut>(
&self,
other: &StreamCore<G, C2>,
pact1: P1,
pact2: P2,
name: String,
constructor: B,
) -> StreamCore<G, CB::Container>
where C2: Container,
CB: ContainerBuilder,
B: FnOnce(Capability<G::Timestamp>, OperatorInfo, AsyncInputHandle<G::Timestamp, C1, ConnectedToOne>, AsyncInputHandle<G::Timestamp, C2, ConnectedToOne>, AsyncOutputHandle<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>) -> BFut,
BFut: Future + 'static,
P1: ParallelizationContract<G::Timestamp, C1>,
P2: ParallelizationContract<G::Timestamp, C2>;
fn sink_async<P, B, BFut>(&self, pact: P, name: String, constructor: B)
where B: FnOnce(OperatorInfo, AsyncInputHandle<G::Timestamp, C1, Disconnected>) -> BFut,
BFut: Future + 'static,
P: ParallelizationContract<G::Timestamp, C1>;
fn flat_map_fallible<DCB, ECB, D2, E, I, L>(
&self,
name: &str,
logic: L,
) -> (StreamCore<G, DCB::Container>, StreamCore<G, ECB::Container>)
where DCB: ContainerBuilder + PushInto<D2>,
ECB: ContainerBuilder + PushInto<E>,
I: IntoIterator<Item = Result<D2, E>>,
L: for<'a> FnMut(C1::Item<'a>) -> I + 'static;
fn expire_stream_at(
&self,
name: &str,
expiration: G::Timestamp,
token: Weak<()>,
) -> StreamCore<G, C1>;
fn pass_through<CB, R>(
&self,
name: &str,
unit: R,
) -> StreamCore<G, CB::Container>
where CB: ContainerBuilder + for<'a> PushInto<(C1::Item<'a>, G::Timestamp, R)>,
R: Data;
fn with_token(&self, token: Weak<()>) -> StreamCore<G, C1>;
fn distribute(&self) -> StreamCore<G, C1>
where C1: ExchangeData;
// Provided method
fn map_fallible<DCB, ECB, D2, E, L>(
&self,
name: &str,
logic: L,
) -> (StreamCore<G, DCB::Container>, StreamCore<G, ECB::Container>)
where DCB: ContainerBuilder + PushInto<D2>,
ECB: ContainerBuilder + PushInto<E>,
L: for<'a> FnMut(C1::Item<'a>) -> Result<D2, E> + 'static { ... }
}
Expand description
Extension methods for timely StreamCore
s.
Required Methods§
sourcefn unary_fallible<DCB, ECB, B, P>(
&self,
pact: P,
name: &str,
constructor: B,
) -> (StreamCore<G, DCB::Container>, StreamCore<G, ECB::Container>)where
DCB: ContainerBuilder,
ECB: ContainerBuilder,
B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> Box<dyn FnMut(&mut InputHandleCore<G::Timestamp, C1, P::Puller>, &mut OutputHandleCore<'_, G::Timestamp, DCB, Tee<G::Timestamp, DCB::Container>>, &mut OutputHandleCore<'_, G::Timestamp, ECB, Tee<G::Timestamp, ECB::Container>>) + 'static>,
P: ParallelizationContract<G::Timestamp, C1>,
fn unary_fallible<DCB, ECB, B, P>(
&self,
pact: P,
name: &str,
constructor: B,
) -> (StreamCore<G, DCB::Container>, StreamCore<G, ECB::Container>)where
DCB: ContainerBuilder,
ECB: ContainerBuilder,
B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> Box<dyn FnMut(&mut InputHandleCore<G::Timestamp, C1, P::Puller>, &mut OutputHandleCore<'_, G::Timestamp, DCB, Tee<G::Timestamp, DCB::Container>>, &mut OutputHandleCore<'_, G::Timestamp, ECB, Tee<G::Timestamp, ECB::Container>>) + 'static>,
P: ParallelizationContract<G::Timestamp, C1>,
Like timely::dataflow::operators::generic::operator::Operator::unary
,
but the logic function can handle failures.
Creates a new dataflow operator that partitions its input stream by a
parallelization strategy pact
and repeatedly invokes logic
, the
function returned by the function passed as constructor
. The logic
function can read to the input stream and write to either of two output
streams, where the first output stream represents successful
computations and the second output stream represents failed
computations.
sourcefn unary_async<CB, P, B, BFut>(
&self,
pact: P,
name: String,
constructor: B,
) -> StreamCore<G, CB::Container>where
CB: ContainerBuilder,
B: FnOnce(Capability<G::Timestamp>, OperatorInfo, AsyncInputHandle<G::Timestamp, C1, ConnectedToOne>, AsyncOutputHandle<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>) -> BFut,
BFut: Future + 'static,
P: ParallelizationContract<G::Timestamp, C1>,
fn unary_async<CB, P, B, BFut>(
&self,
pact: P,
name: String,
constructor: B,
) -> StreamCore<G, CB::Container>where
CB: ContainerBuilder,
B: FnOnce(Capability<G::Timestamp>, OperatorInfo, AsyncInputHandle<G::Timestamp, C1, ConnectedToOne>, AsyncOutputHandle<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>) -> BFut,
BFut: Future + 'static,
P: ParallelizationContract<G::Timestamp, C1>,
Creates a new dataflow operator that partitions its input stream by a parallelization strategy pact, and repeatedly schedules logic, the future returned by the function passed as constructor. logic can read from the input stream, and write to the output stream.
sourcefn binary_async<C2, CB, P1, P2, B, BFut>(
&self,
other: &StreamCore<G, C2>,
pact1: P1,
pact2: P2,
name: String,
constructor: B,
) -> StreamCore<G, CB::Container>where
C2: Container,
CB: ContainerBuilder,
B: FnOnce(Capability<G::Timestamp>, OperatorInfo, AsyncInputHandle<G::Timestamp, C1, ConnectedToOne>, AsyncInputHandle<G::Timestamp, C2, ConnectedToOne>, AsyncOutputHandle<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>) -> BFut,
BFut: Future + 'static,
P1: ParallelizationContract<G::Timestamp, C1>,
P2: ParallelizationContract<G::Timestamp, C2>,
fn binary_async<C2, CB, P1, P2, B, BFut>(
&self,
other: &StreamCore<G, C2>,
pact1: P1,
pact2: P2,
name: String,
constructor: B,
) -> StreamCore<G, CB::Container>where
C2: Container,
CB: ContainerBuilder,
B: FnOnce(Capability<G::Timestamp>, OperatorInfo, AsyncInputHandle<G::Timestamp, C1, ConnectedToOne>, AsyncInputHandle<G::Timestamp, C2, ConnectedToOne>, AsyncOutputHandle<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>) -> BFut,
BFut: Future + 'static,
P1: ParallelizationContract<G::Timestamp, C1>,
P2: ParallelizationContract<G::Timestamp, C2>,
Creates a new dataflow operator that partitions its input streams by a parallelization strategy pact, and repeatedly schedules logic, the future returned by the function passed as constructor. logic can read from the input streams, and write to the output stream.
sourcefn sink_async<P, B, BFut>(&self, pact: P, name: String, constructor: B)where
B: FnOnce(OperatorInfo, AsyncInputHandle<G::Timestamp, C1, Disconnected>) -> BFut,
BFut: Future + 'static,
P: ParallelizationContract<G::Timestamp, C1>,
fn sink_async<P, B, BFut>(&self, pact: P, name: String, constructor: B)where
B: FnOnce(OperatorInfo, AsyncInputHandle<G::Timestamp, C1, Disconnected>) -> BFut,
BFut: Future + 'static,
P: ParallelizationContract<G::Timestamp, C1>,
Creates a new dataflow operator that partitions its input stream by a parallelization strategy pact, and repeatedly schedules logic which can read from the input stream and inspect the frontier at the input.
sourcefn flat_map_fallible<DCB, ECB, D2, E, I, L>(
&self,
name: &str,
logic: L,
) -> (StreamCore<G, DCB::Container>, StreamCore<G, ECB::Container>)where
DCB: ContainerBuilder + PushInto<D2>,
ECB: ContainerBuilder + PushInto<E>,
I: IntoIterator<Item = Result<D2, E>>,
L: for<'a> FnMut(C1::Item<'a>) -> I + 'static,
fn flat_map_fallible<DCB, ECB, D2, E, I, L>(
&self,
name: &str,
logic: L,
) -> (StreamCore<G, DCB::Container>, StreamCore<G, ECB::Container>)where
DCB: ContainerBuilder + PushInto<D2>,
ECB: ContainerBuilder + PushInto<E>,
I: IntoIterator<Item = Result<D2, E>>,
L: for<'a> FnMut(C1::Item<'a>) -> I + 'static,
Like timely::dataflow::operators::map::Map::flat_map
, but logic
is allowed to fail. The first returned stream will contain the
successful applications of logic
, while the second returned stream
will contain the failed applications.
sourcefn expire_stream_at(
&self,
name: &str,
expiration: G::Timestamp,
token: Weak<()>,
) -> StreamCore<G, C1>
fn expire_stream_at( &self, name: &str, expiration: G::Timestamp, token: Weak<()>, ) -> StreamCore<G, C1>
Block progress of the frontier at expiration
time, unless the token is dropped.
sourcefn pass_through<CB, R>(
&self,
name: &str,
unit: R,
) -> StreamCore<G, CB::Container>
fn pass_through<CB, R>( &self, name: &str, unit: R, ) -> StreamCore<G, CB::Container>
Take a Timely stream and convert it to a Differential stream, where each diff is “1” and each time is the current Timely timestamp.
sourcefn with_token(&self, token: Weak<()>) -> StreamCore<G, C1>
fn with_token(&self, token: Weak<()>) -> StreamCore<G, C1>
Wraps the stream with an operator that passes through all received inputs as long as the provided token can be upgraded. Once the token cannot be upgraded anymore, all data flowing into the operator is dropped.
sourcefn distribute(&self) -> StreamCore<G, C1>where
C1: ExchangeData,
fn distribute(&self) -> StreamCore<G, C1>where
C1: ExchangeData,
Distributes the data of the stream to all workers in a round-robin fashion.
Provided Methods§
sourcefn map_fallible<DCB, ECB, D2, E, L>(
&self,
name: &str,
logic: L,
) -> (StreamCore<G, DCB::Container>, StreamCore<G, ECB::Container>)where
DCB: ContainerBuilder + PushInto<D2>,
ECB: ContainerBuilder + PushInto<E>,
L: for<'a> FnMut(C1::Item<'a>) -> Result<D2, E> + 'static,
fn map_fallible<DCB, ECB, D2, E, L>(
&self,
name: &str,
logic: L,
) -> (StreamCore<G, DCB::Container>, StreamCore<G, ECB::Container>)where
DCB: ContainerBuilder + PushInto<D2>,
ECB: ContainerBuilder + PushInto<E>,
L: for<'a> FnMut(C1::Item<'a>) -> Result<D2, E> + 'static,
Like timely::dataflow::operators::map::Map::map
, but logic
is allowed to fail. The first returned stream will contain the
successful applications of logic
, while the second returned stream
will contain the failed applications.
Object Safety§
Implementations on Foreign Types§
source§impl<G, C1> StreamExt<G, C1> for StreamCore<G, C1>
impl<G, C1> StreamExt<G, C1> for StreamCore<G, C1>
source§fn sink_async<P, B, BFut>(&self, pact: P, name: String, constructor: B)where
B: FnOnce(OperatorInfo, AsyncInputHandle<G::Timestamp, C1, Disconnected>) -> BFut,
BFut: Future + 'static,
P: ParallelizationContract<G::Timestamp, C1>,
fn sink_async<P, B, BFut>(&self, pact: P, name: String, constructor: B)where
B: FnOnce(OperatorInfo, AsyncInputHandle<G::Timestamp, C1, Disconnected>) -> BFut,
BFut: Future + 'static,
P: ParallelizationContract<G::Timestamp, C1>,
Creates a new dataflow operator that partitions its input stream by a parallelization strategy pact, and repeatedly schedules logic which can read from the input stream and inspect the frontier at the input.