Trait mz_timely_util::operator::StreamExt

source ·
pub trait StreamExt<G, C1>
where C1: Container, G: Scope,
{ // Required methods fn unary_fallible<DCB, ECB, B, P>( &self, pact: P, name: &str, constructor: B, ) -> (StreamCore<G, DCB::Container>, StreamCore<G, ECB::Container>) where DCB: ContainerBuilder, ECB: ContainerBuilder, B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> Box<dyn FnMut(&mut InputHandleCore<G::Timestamp, C1, P::Puller>, &mut OutputHandleCore<'_, G::Timestamp, DCB, Tee<G::Timestamp, DCB::Container>>, &mut OutputHandleCore<'_, G::Timestamp, ECB, Tee<G::Timestamp, ECB::Container>>) + 'static>, P: ParallelizationContract<G::Timestamp, C1>; fn unary_async<CB, P, B, BFut>( &self, pact: P, name: String, constructor: B, ) -> StreamCore<G, CB::Container> where CB: ContainerBuilder, B: FnOnce(Capability<G::Timestamp>, OperatorInfo, AsyncInputHandle<G::Timestamp, C1, ConnectedToOne>, AsyncOutputHandle<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>) -> BFut, BFut: Future + 'static, P: ParallelizationContract<G::Timestamp, C1>; fn binary_async<C2, CB, P1, P2, B, BFut>( &self, other: &StreamCore<G, C2>, pact1: P1, pact2: P2, name: String, constructor: B, ) -> StreamCore<G, CB::Container> where C2: Container + 'static, CB: ContainerBuilder, B: FnOnce(Capability<G::Timestamp>, OperatorInfo, AsyncInputHandle<G::Timestamp, C1, ConnectedToOne>, AsyncInputHandle<G::Timestamp, C2, ConnectedToOne>, AsyncOutputHandle<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>) -> BFut, BFut: Future + 'static, P1: ParallelizationContract<G::Timestamp, C1>, P2: ParallelizationContract<G::Timestamp, C2>; fn sink_async<P, B, BFut>(&self, pact: P, name: String, constructor: B) where B: FnOnce(OperatorInfo, AsyncInputHandle<G::Timestamp, C1, Disconnected>) -> BFut, BFut: Future + 'static, P: ParallelizationContract<G::Timestamp, C1>; fn flat_map_fallible<DCB, ECB, D2, E, I, L>( &self, name: &str, logic: L, ) -> (StreamCore<G, DCB::Container>, StreamCore<G, ECB::Container>) where DCB: ContainerBuilder + PushInto<D2>, ECB: ContainerBuilder + PushInto<E>, I: IntoIterator<Item = Result<D2, E>>, L: for<'a> FnMut(C1::Item<'a>) -> I + 'static; fn expire_stream_at( &self, name: &str, expiration: G::Timestamp, token: Weak<()>, ) -> StreamCore<G, C1>; fn pass_through<CB, R>( &self, name: &str, unit: R, ) -> StreamCore<G, CB::Container> where CB: ContainerBuilder + for<'a> PushInto<(C1::Item<'a>, G::Timestamp, R)>, R: Data; fn with_token(&self, token: Weak<()>) -> StreamCore<G, C1>; fn distribute(&self) -> StreamCore<G, C1> where C1: ContainerBytes + Send; // Provided method fn map_fallible<DCB, ECB, D2, E, L>( &self, name: &str, logic: L, ) -> (StreamCore<G, DCB::Container>, StreamCore<G, ECB::Container>) where DCB: ContainerBuilder + PushInto<D2>, ECB: ContainerBuilder + PushInto<E>, L: for<'a> FnMut(C1::Item<'a>) -> Result<D2, E> + 'static { ... } }
Expand description

Extension methods for timely StreamCores.

Required Methods§

source

fn unary_fallible<DCB, ECB, B, P>( &self, pact: P, name: &str, constructor: B, ) -> (StreamCore<G, DCB::Container>, StreamCore<G, ECB::Container>)
where DCB: ContainerBuilder, ECB: ContainerBuilder, B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> Box<dyn FnMut(&mut InputHandleCore<G::Timestamp, C1, P::Puller>, &mut OutputHandleCore<'_, G::Timestamp, DCB, Tee<G::Timestamp, DCB::Container>>, &mut OutputHandleCore<'_, G::Timestamp, ECB, Tee<G::Timestamp, ECB::Container>>) + 'static>, P: ParallelizationContract<G::Timestamp, C1>,

Like timely::dataflow::operators::generic::operator::Operator::unary, but the logic function can handle failures.

Creates a new dataflow operator that partitions its input stream by a parallelization strategy pact and repeatedly invokes logic, the function returned by the function passed as constructor. The logic function can read to the input stream and write to either of two output streams, where the first output stream represents successful computations and the second output stream represents failed computations.

source

fn unary_async<CB, P, B, BFut>( &self, pact: P, name: String, constructor: B, ) -> StreamCore<G, CB::Container>

Creates a new dataflow operator that partitions its input stream by a parallelization strategy pact, and repeatedly schedules logic, the future returned by the function passed as constructor. logic can read from the input stream, and write to the output stream.

source

fn binary_async<C2, CB, P1, P2, B, BFut>( &self, other: &StreamCore<G, C2>, pact1: P1, pact2: P2, name: String, constructor: B, ) -> StreamCore<G, CB::Container>

Creates a new dataflow operator that partitions its input streams by a parallelization strategy pact, and repeatedly schedules logic, the future returned by the function passed as constructor. logic can read from the input streams, and write to the output stream.

source

fn sink_async<P, B, BFut>(&self, pact: P, name: String, constructor: B)
where B: FnOnce(OperatorInfo, AsyncInputHandle<G::Timestamp, C1, Disconnected>) -> BFut, BFut: Future + 'static, P: ParallelizationContract<G::Timestamp, C1>,

Creates a new dataflow operator that partitions its input stream by a parallelization strategy pact, and repeatedly schedules logic which can read from the input stream and inspect the frontier at the input.

source

fn flat_map_fallible<DCB, ECB, D2, E, I, L>( &self, name: &str, logic: L, ) -> (StreamCore<G, DCB::Container>, StreamCore<G, ECB::Container>)
where DCB: ContainerBuilder + PushInto<D2>, ECB: ContainerBuilder + PushInto<E>, I: IntoIterator<Item = Result<D2, E>>, L: for<'a> FnMut(C1::Item<'a>) -> I + 'static,

Like timely::dataflow::operators::map::Map::flat_map, but logic is allowed to fail. The first returned stream will contain the successful applications of logic, while the second returned stream will contain the failed applications.

source

fn expire_stream_at( &self, name: &str, expiration: G::Timestamp, token: Weak<()>, ) -> StreamCore<G, C1>

Block progress of the frontier at expiration time, unless the token is dropped.

source

fn pass_through<CB, R>( &self, name: &str, unit: R, ) -> StreamCore<G, CB::Container>
where CB: ContainerBuilder + for<'a> PushInto<(C1::Item<'a>, G::Timestamp, R)>, R: Data,

Take a Timely stream and convert it to a Differential stream, where each diff is “1” and each time is the current Timely timestamp.

source

fn with_token(&self, token: Weak<()>) -> StreamCore<G, C1>

Wraps the stream with an operator that passes through all received inputs as long as the provided token can be upgraded. Once the token cannot be upgraded anymore, all data flowing into the operator is dropped.

source

fn distribute(&self) -> StreamCore<G, C1>
where C1: ContainerBytes + Send,

Distributes the data of the stream to all workers in a round-robin fashion.

Provided Methods§

source

fn map_fallible<DCB, ECB, D2, E, L>( &self, name: &str, logic: L, ) -> (StreamCore<G, DCB::Container>, StreamCore<G, ECB::Container>)
where DCB: ContainerBuilder + PushInto<D2>, ECB: ContainerBuilder + PushInto<E>, L: for<'a> FnMut(C1::Item<'a>) -> Result<D2, E> + 'static,

Like timely::dataflow::operators::map::Map::map, but logic is allowed to fail. The first returned stream will contain the successful applications of logic, while the second returned stream will contain the failed applications.

Object Safety§

This trait is not object safe.

Implementations on Foreign Types§

source§

impl<G, C1> StreamExt<G, C1> for StreamCore<G, C1>
where C1: Container + Data, G: Scope,

source§

fn sink_async<P, B, BFut>(&self, pact: P, name: String, constructor: B)
where B: FnOnce(OperatorInfo, AsyncInputHandle<G::Timestamp, C1, Disconnected>) -> BFut, BFut: Future + 'static, P: ParallelizationContract<G::Timestamp, C1>,

Creates a new dataflow operator that partitions its input stream by a parallelization strategy pact, and repeatedly schedules logic which can read from the input stream and inspect the frontier at the input.

source§

fn unary_fallible<DCB, ECB, B, P>( &self, pact: P, name: &str, constructor: B, ) -> (StreamCore<G, DCB::Container>, StreamCore<G, ECB::Container>)
where DCB: ContainerBuilder, ECB: ContainerBuilder, B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> Box<dyn FnMut(&mut InputHandleCore<G::Timestamp, C1, P::Puller>, &mut OutputHandleCore<'_, G::Timestamp, DCB, Tee<G::Timestamp, DCB::Container>>, &mut OutputHandleCore<'_, G::Timestamp, ECB, Tee<G::Timestamp, ECB::Container>>) + 'static>, P: ParallelizationContract<G::Timestamp, C1>,

source§

fn unary_async<CB, P, B, BFut>( &self, pact: P, name: String, constructor: B, ) -> StreamCore<G, CB::Container>

source§

fn binary_async<C2, CB, P1, P2, B, BFut>( &self, other: &StreamCore<G, C2>, pact1: P1, pact2: P2, name: String, constructor: B, ) -> StreamCore<G, CB::Container>

source§

fn flat_map_fallible<DCB, ECB, D2, E, I, L>( &self, name: &str, logic: L, ) -> (StreamCore<G, DCB::Container>, StreamCore<G, ECB::Container>)
where DCB: ContainerBuilder + PushInto<D2>, ECB: ContainerBuilder + PushInto<E>, I: IntoIterator<Item = Result<D2, E>>, L: for<'a> FnMut(C1::Item<'a>) -> I + 'static,

source§

fn expire_stream_at( &self, name: &str, expiration: G::Timestamp, token: Weak<()>, ) -> StreamCore<G, C1>

source§

fn pass_through<CB, R>( &self, name: &str, unit: R, ) -> StreamCore<G, CB::Container>
where CB: ContainerBuilder + for<'a> PushInto<(C1::Item<'a>, G::Timestamp, R)>, R: Data,

source§

fn with_token(&self, token: Weak<()>) -> StreamCore<G, C1>

source§

fn distribute(&self) -> StreamCore<G, C1>
where C1: ContainerBytes + Send,

Implementors§