Trait Operator

Source
/// Methods to construct generic streaming and blocking operators.
///
/// `G` is the scope the stream lives in and `C1` is the container type carried
/// by the stream the operator is attached to.
pub trait Operator<G: Scope, C1> {
    // Required methods

    /// Builds a single-input operator whose `logic` also sees the input frontier.
    ///
    /// The input is partitioned according to `pact`. `constructor` is called
    /// once with a capability and the operator's info and returns `logic`,
    /// which is then invoked repeatedly with the input session paired with the
    /// input frontier, and with an output session.
    fn unary_frontier<CB, B, L, P>(
        &self,
        pact: P,
        name: &str,
        constructor: B,
    ) -> StreamCore<G, CB::Container>
    where
        CB: ContainerBuilder,
        B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
        L: FnMut(
                (InputSession<'_, G::Timestamp, C1, P::Puller>, &MutableAntichain<G::Timestamp>),
                &mut OutputBuilderSession<'_, G::Timestamp, CB>,
            ) + 'static,
        P: ParallelizationContract<G::Timestamp, C1>;

    /// Builds a single-input operator whose `logic` is handed a notificator.
    ///
    /// The input is partitioned according to `pact`. `logic` is supplied
    /// directly (there is no constructor) and receives the input session, an
    /// output session and a `Notificator`. `init` is an iterator of timestamps.
    fn unary_notify<CB, L, P>(
        &self,
        pact: P,
        name: &str,
        init: impl IntoIterator<Item = G::Timestamp>,
        logic: L,
    ) -> StreamCore<G, CB::Container>
    where
        CB: ContainerBuilder,
        L: FnMut(
                InputSession<'_, G::Timestamp, C1, P::Puller>,
                &mut OutputBuilderSession<'_, G::Timestamp, CB>,
                &mut Notificator<'_, G::Timestamp>,
            ) + 'static,
        P: ParallelizationContract<G::Timestamp, C1>;

    /// Builds a single-input operator whose `logic` sees only input and output.
    ///
    /// The input is partitioned according to `pact`. `constructor` is called
    /// once with a capability and the operator's info and returns `logic`,
    /// which is then invoked repeatedly with the input session and an output
    /// session.
    fn unary<CB, B, L, P>(
        &self,
        pact: P,
        name: &str,
        constructor: B,
    ) -> StreamCore<G, CB::Container>
    where
        CB: ContainerBuilder,
        B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
        L: FnMut(
                InputSession<'_, G::Timestamp, C1, P::Puller>,
                &mut OutputBuilderSession<'_, G::Timestamp, CB>,
            ) + 'static,
        P: ParallelizationContract<G::Timestamp, C1>;

    /// Builds a two-input operator whose `logic` also sees both input frontiers.
    ///
    /// This stream is partitioned according to `pact1` and `other` according
    /// to `pact2`. `constructor` returns `logic`, which receives each input
    /// session paired with that input's frontier, and an output session.
    fn binary_frontier<C2, CB, B, L, P1, P2>(
        &self,
        other: &StreamCore<G, C2>,
        pact1: P1,
        pact2: P2,
        name: &str,
        constructor: B,
    ) -> StreamCore<G, CB::Container>
    where
        C2: Container,
        CB: ContainerBuilder,
        B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
        L: FnMut(
                (InputSession<'_, G::Timestamp, C1, P1::Puller>, &MutableAntichain<G::Timestamp>),
                (InputSession<'_, G::Timestamp, C2, P2::Puller>, &MutableAntichain<G::Timestamp>),
                &mut OutputBuilderSession<'_, G::Timestamp, CB>,
            ) + 'static,
        P1: ParallelizationContract<G::Timestamp, C1>,
        P2: ParallelizationContract<G::Timestamp, C2>;

    /// Builds a two-input operator whose `logic` is handed a notificator.
    ///
    /// This stream is partitioned according to `pact1` and `other` according
    /// to `pact2`. `logic` is supplied directly (there is no constructor) and
    /// receives both input sessions, an output session and a `Notificator`.
    /// `init` is an iterator of timestamps.
    fn binary_notify<C2, CB, L, P1, P2>(
        &self,
        other: &StreamCore<G, C2>,
        pact1: P1,
        pact2: P2,
        name: &str,
        init: impl IntoIterator<Item = G::Timestamp>,
        logic: L,
    ) -> StreamCore<G, CB::Container>
    where
        C2: Container,
        CB: ContainerBuilder,
        L: FnMut(
                InputSession<'_, G::Timestamp, C1, P1::Puller>,
                InputSession<'_, G::Timestamp, C2, P2::Puller>,
                &mut OutputBuilderSession<'_, G::Timestamp, CB>,
                &mut Notificator<'_, G::Timestamp>,
            ) + 'static,
        P1: ParallelizationContract<G::Timestamp, C1>,
        P2: ParallelizationContract<G::Timestamp, C2>;

    /// Builds a two-input operator whose `logic` sees only inputs and output.
    ///
    /// This stream is partitioned according to `pact1` and `other` according
    /// to `pact2`. `constructor` returns `logic`, which receives both input
    /// sessions and an output session; no frontier is passed to it.
    fn binary<C2, CB, B, L, P1, P2>(
        &self,
        other: &StreamCore<G, C2>,
        pact1: P1,
        pact2: P2,
        name: &str,
        constructor: B,
    ) -> StreamCore<G, CB::Container>
    where
        C2: Container,
        CB: ContainerBuilder,
        B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
        L: FnMut(
                InputSession<'_, G::Timestamp, C1, P1::Puller>,
                InputSession<'_, G::Timestamp, C2, P2::Puller>,
                &mut OutputBuilderSession<'_, G::Timestamp, CB>,
            ) + 'static,
        P1: ParallelizationContract<G::Timestamp, C1>,
        P2: ParallelizationContract<G::Timestamp, C2>;

    /// Builds an operator with one input and no output stream.
    ///
    /// The input is partitioned according to `pact`. `logic` is invoked
    /// repeatedly with the input session paired with the input frontier.
    fn sink<L, P>(&self, pact: P, name: &str, logic: L)
    where
        L: FnMut(
                (InputSession<'_, G::Timestamp, C1, P::Puller>, &MutableAntichain<G::Timestamp>),
            ) + 'static,
        P: ParallelizationContract<G::Timestamp, C1>;
}
Expand description

Methods to construct generic streaming and blocking operators.

Required Methods§

Source

fn unary_frontier<CB, B, L, P>( &self, pact: P, name: &str, constructor: B, ) -> StreamCore<G, CB::Container>
where CB: ContainerBuilder, B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L, L: FnMut((InputSession<'_, G::Timestamp, C1, P::Puller>, &MutableAntichain<G::Timestamp>), &mut OutputBuilderSession<'_, G::Timestamp, CB>) + 'static, P: ParallelizationContract<G::Timestamp, C1>,

Creates a new dataflow operator that partitions its input stream by a parallelization strategy pact, and repeatedly invokes logic, the function returned by the function passed as constructor. logic can read from the input stream, write to the output stream, and inspect the frontier at the input.

§Examples
use std::collections::HashMap;
use timely::dataflow::operators::{ToStream, FrontierNotificator};
use timely::dataflow::operators::generic::Operator;
use timely::dataflow::channels::pact::Pipeline;

// Stashes incoming data by time and emits each stash once the notificator
// delivers that time.
timely::example(|scope| {
    (0u64..10).to_stream(scope)
        .unary_frontier(Pipeline, "example", |default_cap, _info| {
            let mut cap = Some(default_cap.delayed(&12));
            let mut notificator = FrontierNotificator::default();
            let mut stash = HashMap::new();
            move |(input, frontier), output| {
                // `take()` leaves `None` behind, so this branch runs on the
                // first invocation only.
                if let Some(ref c) = cap.take() {
                    output.session(&c).give(12);
                }
                input.for_each_time(|time, data| {
                    stash.entry(time.time().clone())
                         .or_insert(Vec::new())
                         .extend(data.flat_map(|d| d.drain(..)));
                    // Request a notification for this time so that the
                    // `for_each` below is handed it and drains the stash entry.
                    notificator.notify_at(time.retain());
                });
                notificator.for_each(&[frontier], |time, _not| {
                    if let Some(mut vec) = stash.remove(time.time()) {
                        output.session(&time).give_iterator(vec.drain(..));
                    }
                });
            }
        })
        .container::<Vec<_>>();
});
Source

fn unary_notify<CB: ContainerBuilder, L: FnMut(InputSession<'_, G::Timestamp, C1, P::Puller>, &mut OutputBuilderSession<'_, G::Timestamp, CB>, &mut Notificator<'_, G::Timestamp>) + 'static, P: ParallelizationContract<G::Timestamp, C1>>( &self, pact: P, name: &str, init: impl IntoIterator<Item = G::Timestamp>, logic: L, ) -> StreamCore<G, CB::Container>

Creates a new dataflow operator that partitions its input stream by a parallelization strategy pact, and repeatedly invokes logic, which is passed in directly rather than returned by a constructor. logic can read from the input stream, write to the output stream, and request and receive notifications through the notificator it is handed.

§Examples
// NOTE(review): `HashMap` and `FrontierNotificator` are not referenced below.
use std::collections::HashMap;
use timely::dataflow::operators::{ToStream, FrontierNotificator};
use timely::dataflow::operators::generic::Operator;
use timely::dataflow::channels::pact::Pipeline;

// Passes received containers to the output at the same time and prints every
// notification it is given.
timely::example(|scope| {
    (0u64..10)
        .to_stream(scope)
        // `None` is an empty `IntoIterator`, so no timestamps are passed as `init`.
        .unary_notify(Pipeline, "example", None, move |input, output, notificator| {
            input.for_each_time(|time, data| {
                output.session(&time).give_containers(data);
                // Request a notification for the time of this batch.
                notificator.notify_at(time.retain());
            });
            notificator.for_each(|time, _cnt, _not| {
                println!("notified at {:?}", time);
            });
        });
});
Source

fn unary<CB, B, L, P>( &self, pact: P, name: &str, constructor: B, ) -> StreamCore<G, CB::Container>
where CB: ContainerBuilder, B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L, L: FnMut(InputSession<'_, G::Timestamp, C1, P::Puller>, &mut OutputBuilderSession<'_, G::Timestamp, CB>) + 'static, P: ParallelizationContract<G::Timestamp, C1>,

Creates a new dataflow operator that partitions its input stream by a parallelization strategy pact, and repeatedly invokes logic, the function returned by the function passed as constructor. logic can read from the input stream, and write to the output stream.

§Examples
// NOTE(review): `FrontierNotificator` is not referenced below, and no method is
// called on `scope` directly, so `Scope` also looks unused — confirm.
use timely::dataflow::operators::{ToStream, FrontierNotificator};
use timely::dataflow::operators::generic::operator::Operator;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::Scope;

// Emits one extra record (100) and otherwise passes received containers to the
// output at the time they arrived with.
timely::example(|scope| {
    (0u64..10).to_stream(scope)
        .unary(Pipeline, "example", |default_cap, _info| {
            // Capability derived from the default one via `delayed(&12)`.
            let mut cap = Some(default_cap.delayed(&12));
            move |input, output| {
                // `take()` leaves `None` behind, so this branch runs on the
                // first invocation only.
                if let Some(ref c) = cap.take() {
                    output.session(&c).give(100);
                }
                input.for_each_time(|time, data| {
                    output.session(&time).give_containers(data);
                });
            }
        });
});
Source

fn binary_frontier<C2, CB, B, L, P1, P2>( &self, other: &StreamCore<G, C2>, pact1: P1, pact2: P2, name: &str, constructor: B, ) -> StreamCore<G, CB::Container>
where C2: Container, CB: ContainerBuilder, B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L, L: FnMut((InputSession<'_, G::Timestamp, C1, P1::Puller>, &MutableAntichain<G::Timestamp>), (InputSession<'_, G::Timestamp, C2, P2::Puller>, &MutableAntichain<G::Timestamp>), &mut OutputBuilderSession<'_, G::Timestamp, CB>) + 'static, P1: ParallelizationContract<G::Timestamp, C1>, P2: ParallelizationContract<G::Timestamp, C2>,

Creates a new dataflow operator that partitions its two input streams by the parallelization strategies pact1 and pact2 respectively, and repeatedly invokes logic, the function returned by the function passed as constructor. logic can read from the input streams, write to the output stream, and inspect the frontier at each input.

§Examples
use std::collections::HashMap;
use timely::dataflow::operators::{Input, Inspect, FrontierNotificator};
use timely::dataflow::operators::generic::operator::Operator;
use timely::dataflow::channels::pact::Pipeline;

// Merges two inputs: data from both is stashed per time and emitted when the
// notificator delivers that time.
timely::execute(timely::Config::thread(), |worker| {
   let (mut in1, mut in2) = worker.dataflow::<usize,_,_>(|scope| {
       let (in1_handle, in1) = scope.new_input();
       let (in2_handle, in2) = scope.new_input();
       // The default capability is not used by this example.
       in1.binary_frontier(&in2, Pipeline, Pipeline, "example", |mut _default_cap, _info| {
           let mut notificator = FrontierNotificator::default();
           let mut stash = HashMap::new();
           move |(input1, frontier1), (input2, frontier2), output| {
               // Both inputs share one stash and one notificator.
               input1.for_each_time(|time, data| {
                   stash.entry(time.time().clone()).or_insert(Vec::new()).extend(data.flat_map(|d| d.drain(..)));
                   notificator.notify_at(time.retain());
               });
               input2.for_each_time(|time, data| {
                   stash.entry(time.time().clone()).or_insert(Vec::new()).extend(data.flat_map(|d| d.drain(..)));
                   notificator.notify_at(time.retain());
               });
               // Both input frontiers are handed to the notificator.
               notificator.for_each(&[frontier1, frontier2], |time, _not| {
                   if let Some(mut vec) = stash.remove(time.time()) {
                       output.session(&time).give_iterator(vec.drain(..));
                   }
               });
           }
       })
       .container::<Vec<_>>()
       .inspect_batch(|t, x| println!("{:?} -> {:?}", t, x));

       (in1_handle, in2_handle)
   });

   // Feed both inputs the same values, advancing each to `i` after sending `i - 1`.
   for i in 1..10 {
       in1.send(i - 1);
       in1.advance_to(i);
       in2.send(i - 1);
       in2.advance_to(i);
   }
}).unwrap();
Source

fn binary_notify<C2: Container, CB: ContainerBuilder, L: FnMut(InputSession<'_, G::Timestamp, C1, P1::Puller>, InputSession<'_, G::Timestamp, C2, P2::Puller>, &mut OutputBuilderSession<'_, G::Timestamp, CB>, &mut Notificator<'_, G::Timestamp>) + 'static, P1: ParallelizationContract<G::Timestamp, C1>, P2: ParallelizationContract<G::Timestamp, C2>>( &self, other: &StreamCore<G, C2>, pact1: P1, pact2: P2, name: &str, init: impl IntoIterator<Item = G::Timestamp>, logic: L, ) -> StreamCore<G, CB::Container>

Creates a new dataflow operator that partitions its two input streams by the parallelization strategies pact1 and pact2 respectively, and repeatedly invokes logic, which is passed in directly rather than returned by a constructor. logic can read from the input streams, write to the output stream, and request and receive notifications through the notificator it is handed.

§Examples
// NOTE(review): `HashMap`, `Inspect` and `FrontierNotificator` are not
// referenced below.
use std::collections::HashMap;
use timely::dataflow::operators::{Input, Inspect, FrontierNotificator};
use timely::dataflow::operators::generic::operator::Operator;
use timely::dataflow::channels::pact::Pipeline;

// Passes containers from both inputs to the output and prints every
// notification it is given.
timely::execute(timely::Config::thread(), |worker| {
   let (mut in1, mut in2) = worker.dataflow::<usize,_,_>(|scope| {
       let (in1_handle, in1) = scope.new_input();
       let (in2_handle, in2) = scope.new_input();

       // `None` is an empty `IntoIterator`, so no timestamps are passed as `init`.
       in1.binary_notify(&in2, Pipeline, Pipeline, "example", None, move |input1, input2, output, notificator| {
           input1.for_each_time(|time, data| {
               output.session(&time).give_containers(data);
               // Request a notification for the time of this batch.
               notificator.notify_at(time.retain());
           });
           input2.for_each_time(|time, data| {
               output.session(&time).give_containers(data);
               notificator.notify_at(time.retain());
           });
           notificator.for_each(|time, _cnt, _not| {
               println!("notified at {:?}", time);
           });
       });

       (in1_handle, in2_handle)
   });

   // Feed both inputs the same values, advancing each to `i` after sending `i - 1`.
   for i in 1..10 {
       in1.send(i - 1);
       in1.advance_to(i);
       in2.send(i - 1);
       in2.advance_to(i);
   }
}).unwrap();
Source

fn binary<C2, CB, B, L, P1, P2>( &self, other: &StreamCore<G, C2>, pact1: P1, pact2: P2, name: &str, constructor: B, ) -> StreamCore<G, CB::Container>
where C2: Container, CB: ContainerBuilder, B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L, L: FnMut(InputSession<'_, G::Timestamp, C1, P1::Puller>, InputSession<'_, G::Timestamp, C2, P2::Puller>, &mut OutputBuilderSession<'_, G::Timestamp, CB>) + 'static, P1: ParallelizationContract<G::Timestamp, C1>, P2: ParallelizationContract<G::Timestamp, C2>,

Creates a new dataflow operator that partitions its two input streams by the parallelization strategies pact1 and pact2 respectively, and repeatedly invokes logic, the function returned by the function passed as constructor. logic can read from the input streams and write to the output stream.

§Examples
// NOTE(review): `FrontierNotificator` is not referenced below, and no method is
// called on `scope` directly, so `Scope` also looks unused — confirm.
use timely::dataflow::operators::{ToStream, Inspect, FrontierNotificator};
use timely::dataflow::operators::generic::operator::Operator;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::Scope;

// Emits one extra record (100) and otherwise passes containers from both
// inputs to the single output, printing each record downstream.
timely::example(|scope| {
    let stream2 = (0u64..10).to_stream(scope);
    (0u64..10).to_stream(scope)
        .binary(&stream2, Pipeline, Pipeline, "example", |default_cap, _info| {
            // Capability derived from the default one via `delayed(&12)`.
            let mut cap = Some(default_cap.delayed(&12));
            move |input1, input2, output| {
                // `take()` leaves `None` behind, so this branch runs on the
                // first invocation only.
                if let Some(ref c) = cap.take() {
                    output.session(&c).give(100);
                }
                input1.for_each_time(|time, data| output.session(&time).give_containers(data));
                input2.for_each_time(|time, data| output.session(&time).give_containers(data));
            }
        }).inspect(|x| println!("{:?}", x));
});
Source

fn sink<L, P>(&self, pact: P, name: &str, logic: L)
where L: FnMut((InputSession<'_, G::Timestamp, C1, P::Puller>, &MutableAntichain<G::Timestamp>)) + 'static, P: ParallelizationContract<G::Timestamp, C1>,

Creates a new dataflow operator that partitions its input stream by a parallelization strategy pact, and repeatedly invokes the function logic which can read from the input stream and inspect the frontier at the input.

§Examples
// NOTE(review): `FrontierNotificator` is not referenced below, and no method is
// called on `scope` directly, so `Scope` also looks unused — confirm.
use timely::dataflow::operators::{ToStream, FrontierNotificator};
use timely::dataflow::operators::generic::operator::Operator;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::Scope;

// Prints every received datum together with the time it arrived at.
timely::example(|scope| {
    (0u64..10)
        .to_stream(scope)
        // `frontier` is bound but not used by this example.
        .sink(Pipeline, "example", |(input, frontier)| {
            input.for_each_time(|time, data| {
                for datum in data.flatten() {
                    println!("{:?}:\t{:?}", time, datum);
                }
            });
        });
});

Dyn Compatibility§

This trait is not dyn compatible.

In older versions of Rust, dyn compatibility was called "object safety", so this trait is not object safe.

Implementors§

Source§

impl<G: Scope, C1: Container> Operator<G, C1> for StreamCore<G, C1>