pub trait Operator<G: Scope, C1: Container> {
// Required methods
fn unary_frontier<CB, B, L, P>(
&self,
pact: P,
name: &str,
constructor: B,
) -> StreamCore<G, CB::Container>
where CB: ContainerBuilder,
B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
L: FnMut(&mut FrontieredInputHandleCore<'_, G::Timestamp, C1, P::Puller>, &mut OutputHandleCore<'_, G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>) + 'static,
P: ParallelizationContract<G::Timestamp, C1>;
fn unary_notify<CB: ContainerBuilder, L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P::Puller>, &mut OutputHandleCore<'_, G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>, &mut Notificator<'_, G::Timestamp>) + 'static, P: ParallelizationContract<G::Timestamp, C1>>(
&self,
pact: P,
name: &str,
init: impl IntoIterator<Item = G::Timestamp>,
logic: L,
) -> StreamCore<G, CB::Container>;
fn unary<CB, B, L, P>(
&self,
pact: P,
name: &str,
constructor: B,
) -> StreamCore<G, CB::Container>
where CB: ContainerBuilder,
B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P::Puller>, &mut OutputHandleCore<'_, G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>) + 'static,
P: ParallelizationContract<G::Timestamp, C1>;
fn binary_frontier<C2, CB, B, L, P1, P2>(
&self,
other: &StreamCore<G, C2>,
pact1: P1,
pact2: P2,
name: &str,
constructor: B,
) -> StreamCore<G, CB::Container>
where C2: Container,
CB: ContainerBuilder,
B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
L: FnMut(&mut FrontieredInputHandleCore<'_, G::Timestamp, C1, P1::Puller>, &mut FrontieredInputHandleCore<'_, G::Timestamp, C2, P2::Puller>, &mut OutputHandleCore<'_, G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>) + 'static,
P1: ParallelizationContract<G::Timestamp, C1>,
P2: ParallelizationContract<G::Timestamp, C2>;
fn binary_notify<C2: Container, CB: ContainerBuilder, L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P1::Puller>, &mut InputHandleCore<G::Timestamp, C2, P2::Puller>, &mut OutputHandleCore<'_, G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>, &mut Notificator<'_, G::Timestamp>) + 'static, P1: ParallelizationContract<G::Timestamp, C1>, P2: ParallelizationContract<G::Timestamp, C2>>(
&self,
other: &StreamCore<G, C2>,
pact1: P1,
pact2: P2,
name: &str,
init: impl IntoIterator<Item = G::Timestamp>,
logic: L,
) -> StreamCore<G, CB::Container>;
fn binary<C2, CB, B, L, P1, P2>(
&self,
other: &StreamCore<G, C2>,
pact1: P1,
pact2: P2,
name: &str,
constructor: B,
) -> StreamCore<G, CB::Container>
where C2: Container,
CB: ContainerBuilder,
B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P1::Puller>, &mut InputHandleCore<G::Timestamp, C2, P2::Puller>, &mut OutputHandleCore<'_, G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>) + 'static,
P1: ParallelizationContract<G::Timestamp, C1>,
P2: ParallelizationContract<G::Timestamp, C2>;
fn sink<L, P>(&self, pact: P, name: &str, logic: L)
where L: FnMut(&mut FrontieredInputHandleCore<'_, G::Timestamp, C1, P::Puller>) + 'static,
P: ParallelizationContract<G::Timestamp, C1>;
}
Expand description
Methods to construct generic streaming and blocking operators.
Required Methods§
sourcefn unary_frontier<CB, B, L, P>(
&self,
pact: P,
name: &str,
constructor: B,
) -> StreamCore<G, CB::Container>where
CB: ContainerBuilder,
B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
L: FnMut(&mut FrontieredInputHandleCore<'_, G::Timestamp, C1, P::Puller>, &mut OutputHandleCore<'_, G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>) + 'static,
P: ParallelizationContract<G::Timestamp, C1>,
fn unary_frontier<CB, B, L, P>(
&self,
pact: P,
name: &str,
constructor: B,
) -> StreamCore<G, CB::Container>where
CB: ContainerBuilder,
B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
L: FnMut(&mut FrontieredInputHandleCore<'_, G::Timestamp, C1, P::Puller>, &mut OutputHandleCore<'_, G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>) + 'static,
P: ParallelizationContract<G::Timestamp, C1>,
Creates a new dataflow operator that partitions its input stream by a parallelization
strategy pact
, and repeatedly invokes logic
, the function returned by the function passed as constructor
.
logic
can read from the input stream, write to the output stream, and inspect the frontier at the input.
§Examples
use std::collections::HashMap;
use timely::dataflow::operators::{ToStream, FrontierNotificator};
use timely::dataflow::operators::generic::Operator;
use timely::dataflow::channels::pact::Pipeline;
fn main() {
timely::example(|scope| {
(0u64..10).to_stream(scope)
.unary_frontier(Pipeline, "example", |default_cap, _info| {
let mut cap = Some(default_cap.delayed(&12));
let mut notificator = FrontierNotificator::new();
let mut stash = HashMap::new();
move |input, output| {
if let Some(ref c) = cap.take() {
output.session(&c).give(12);
}
while let Some((time, data)) = input.next() {
stash.entry(time.time().clone())
.or_insert(Vec::new())
.extend(data.drain(..));
}
notificator.for_each(&[input.frontier()], |time, _not| {
if let Some(mut vec) = stash.remove(time.time()) {
output.session(&time).give_iterator(vec.drain(..));
}
});
}
})
.container::<Vec<_>>();
});
}
sourcefn unary_notify<CB: ContainerBuilder, L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P::Puller>, &mut OutputHandleCore<'_, G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>, &mut Notificator<'_, G::Timestamp>) + 'static, P: ParallelizationContract<G::Timestamp, C1>>(
&self,
pact: P,
name: &str,
init: impl IntoIterator<Item = G::Timestamp>,
logic: L,
) -> StreamCore<G, CB::Container>
fn unary_notify<CB: ContainerBuilder, L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P::Puller>, &mut OutputHandleCore<'_, G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>, &mut Notificator<'_, G::Timestamp>) + 'static, P: ParallelizationContract<G::Timestamp, C1>>( &self, pact: P, name: &str, init: impl IntoIterator<Item = G::Timestamp>, logic: L, ) -> StreamCore<G, CB::Container>
Creates a new dataflow operator that partitions its input stream by a parallelization
strategy pact
, and repeatedly invokes logic
, the function returned by the function passed as constructor
.
logic
can read from the input stream, write to the output stream, and inspect the frontier at the input.
§Examples
use std::collections::HashMap;
use timely::dataflow::operators::{ToStream, FrontierNotificator};
use timely::dataflow::operators::generic::Operator;
use timely::dataflow::channels::pact::Pipeline;
fn main() {
timely::example(|scope| {
(0u64..10)
.to_stream(scope)
.unary_notify(Pipeline, "example", None, move |input, output, notificator| {
input.for_each(|time, data| {
output.session(&time).give_container(data);
notificator.notify_at(time.retain());
});
notificator.for_each(|time, _cnt, _not| {
println!("notified at {:?}", time);
});
});
});
}
sourcefn unary<CB, B, L, P>(
&self,
pact: P,
name: &str,
constructor: B,
) -> StreamCore<G, CB::Container>where
CB: ContainerBuilder,
B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P::Puller>, &mut OutputHandleCore<'_, G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>) + 'static,
P: ParallelizationContract<G::Timestamp, C1>,
fn unary<CB, B, L, P>(
&self,
pact: P,
name: &str,
constructor: B,
) -> StreamCore<G, CB::Container>where
CB: ContainerBuilder,
B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P::Puller>, &mut OutputHandleCore<'_, G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>) + 'static,
P: ParallelizationContract<G::Timestamp, C1>,
Creates a new dataflow operator that partitions its input stream by a parallelization
strategy pact
, and repeatedly invokes logic
, the function returned by the function passed as constructor
.
logic
can read from the input stream, and write to the output stream.
§Examples
use timely::dataflow::operators::{ToStream, FrontierNotificator};
use timely::dataflow::operators::generic::operator::Operator;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::Scope;
timely::example(|scope| {
(0u64..10).to_stream(scope)
.unary(Pipeline, "example", |default_cap, _info| {
let mut cap = Some(default_cap.delayed(&12));
move |input, output| {
if let Some(ref c) = cap.take() {
output.session(&c).give(100);
}
while let Some((time, data)) = input.next() {
output.session(&time).give_container(data);
}
}
});
});
sourcefn binary_frontier<C2, CB, B, L, P1, P2>(
&self,
other: &StreamCore<G, C2>,
pact1: P1,
pact2: P2,
name: &str,
constructor: B,
) -> StreamCore<G, CB::Container>where
C2: Container,
CB: ContainerBuilder,
B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
L: FnMut(&mut FrontieredInputHandleCore<'_, G::Timestamp, C1, P1::Puller>, &mut FrontieredInputHandleCore<'_, G::Timestamp, C2, P2::Puller>, &mut OutputHandleCore<'_, G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>) + 'static,
P1: ParallelizationContract<G::Timestamp, C1>,
P2: ParallelizationContract<G::Timestamp, C2>,
fn binary_frontier<C2, CB, B, L, P1, P2>(
&self,
other: &StreamCore<G, C2>,
pact1: P1,
pact2: P2,
name: &str,
constructor: B,
) -> StreamCore<G, CB::Container>where
C2: Container,
CB: ContainerBuilder,
B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
L: FnMut(&mut FrontieredInputHandleCore<'_, G::Timestamp, C1, P1::Puller>, &mut FrontieredInputHandleCore<'_, G::Timestamp, C2, P2::Puller>, &mut OutputHandleCore<'_, G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>) + 'static,
P1: ParallelizationContract<G::Timestamp, C1>,
P2: ParallelizationContract<G::Timestamp, C2>,
Creates a new dataflow operator that partitions its input streams by a parallelization
strategy pact
, and repeatedly invokes logic
, the function returned by the function passed as constructor
.
logic
can read from the input streams, write to the output stream, and inspect the frontier at the inputs.
§Examples
use std::collections::HashMap;
use timely::dataflow::operators::{Input, Inspect, FrontierNotificator};
use timely::dataflow::operators::generic::operator::Operator;
use timely::dataflow::channels::pact::Pipeline;
timely::execute(timely::Config::thread(), |worker| {
let (mut in1, mut in2) = worker.dataflow::<usize,_,_>(|scope| {
let (in1_handle, in1) = scope.new_input();
let (in2_handle, in2) = scope.new_input();
in1.binary_frontier(&in2, Pipeline, Pipeline, "example", |mut _default_cap, _info| {
let mut notificator = FrontierNotificator::new();
let mut stash = HashMap::new();
move |input1, input2, output| {
while let Some((time, data)) = input1.next() {
stash.entry(time.time().clone()).or_insert(Vec::new()).extend(data.drain(..));
notificator.notify_at(time.retain());
}
while let Some((time, data)) = input2.next() {
stash.entry(time.time().clone()).or_insert(Vec::new()).extend(data.drain(..));
notificator.notify_at(time.retain());
}
notificator.for_each(&[input1.frontier(), input2.frontier()], |time, _not| {
if let Some(mut vec) = stash.remove(time.time()) {
output.session(&time).give_iterator(vec.drain(..));
}
});
}
})
.container::<Vec<_>>()
.inspect_batch(|t, x| println!("{:?} -> {:?}", t, x));
(in1_handle, in2_handle)
});
for i in 1..10 {
in1.send(i - 1);
in1.advance_to(i);
in2.send(i - 1);
in2.advance_to(i);
}
}).unwrap();
sourcefn binary_notify<C2: Container, CB: ContainerBuilder, L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P1::Puller>, &mut InputHandleCore<G::Timestamp, C2, P2::Puller>, &mut OutputHandleCore<'_, G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>, &mut Notificator<'_, G::Timestamp>) + 'static, P1: ParallelizationContract<G::Timestamp, C1>, P2: ParallelizationContract<G::Timestamp, C2>>(
&self,
other: &StreamCore<G, C2>,
pact1: P1,
pact2: P2,
name: &str,
init: impl IntoIterator<Item = G::Timestamp>,
logic: L,
) -> StreamCore<G, CB::Container>
fn binary_notify<C2: Container, CB: ContainerBuilder, L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P1::Puller>, &mut InputHandleCore<G::Timestamp, C2, P2::Puller>, &mut OutputHandleCore<'_, G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>, &mut Notificator<'_, G::Timestamp>) + 'static, P1: ParallelizationContract<G::Timestamp, C1>, P2: ParallelizationContract<G::Timestamp, C2>>( &self, other: &StreamCore<G, C2>, pact1: P1, pact2: P2, name: &str, init: impl IntoIterator<Item = G::Timestamp>, logic: L, ) -> StreamCore<G, CB::Container>
Creates a new dataflow operator that partitions its input streams by a parallelization
strategy pact
, and repeatedly invokes logic
, the function returned by the function passed as constructor
.
logic
can read from the input streams, write to the output stream, and inspect the frontier at the inputs.
§Examples
use std::collections::HashMap;
use timely::dataflow::operators::{Input, Inspect, FrontierNotificator};
use timely::dataflow::operators::generic::operator::Operator;
use timely::dataflow::channels::pact::Pipeline;
timely::execute(timely::Config::thread(), |worker| {
let (mut in1, mut in2) = worker.dataflow::<usize,_,_>(|scope| {
let (in1_handle, in1) = scope.new_input();
let (in2_handle, in2) = scope.new_input();
in1.binary_notify(&in2, Pipeline, Pipeline, "example", None, move |input1, input2, output, notificator| {
input1.for_each(|time, data| {
output.session(&time).give_container(data);
notificator.notify_at(time.retain());
});
input2.for_each(|time, data| {
output.session(&time).give_container(data);
notificator.notify_at(time.retain());
});
notificator.for_each(|time, _cnt, _not| {
println!("notified at {:?}", time);
});
});
(in1_handle, in2_handle)
});
for i in 1..10 {
in1.send(i - 1);
in1.advance_to(i);
in2.send(i - 1);
in2.advance_to(i);
}
}).unwrap();
sourcefn binary<C2, CB, B, L, P1, P2>(
&self,
other: &StreamCore<G, C2>,
pact1: P1,
pact2: P2,
name: &str,
constructor: B,
) -> StreamCore<G, CB::Container>where
C2: Container,
CB: ContainerBuilder,
B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P1::Puller>, &mut InputHandleCore<G::Timestamp, C2, P2::Puller>, &mut OutputHandleCore<'_, G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>) + 'static,
P1: ParallelizationContract<G::Timestamp, C1>,
P2: ParallelizationContract<G::Timestamp, C2>,
fn binary<C2, CB, B, L, P1, P2>(
&self,
other: &StreamCore<G, C2>,
pact1: P1,
pact2: P2,
name: &str,
constructor: B,
) -> StreamCore<G, CB::Container>where
C2: Container,
CB: ContainerBuilder,
B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
L: FnMut(&mut InputHandleCore<G::Timestamp, C1, P1::Puller>, &mut InputHandleCore<G::Timestamp, C2, P2::Puller>, &mut OutputHandleCore<'_, G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>) + 'static,
P1: ParallelizationContract<G::Timestamp, C1>,
P2: ParallelizationContract<G::Timestamp, C2>,
Creates a new dataflow operator that partitions its input streams by a parallelization
strategy pact
, and repeatedly invokes logic
, the function returned by the function passed as constructor
.
logic
can read from the input streams, write to the output stream, and inspect the frontier at the inputs.
§Examples
use timely::dataflow::operators::{ToStream, Inspect, FrontierNotificator};
use timely::dataflow::operators::generic::operator::Operator;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::Scope;
timely::example(|scope| {
let stream2 = (0u64..10).to_stream(scope);
(0u64..10).to_stream(scope)
.binary(&stream2, Pipeline, Pipeline, "example", |default_cap, _info| {
let mut cap = Some(default_cap.delayed(&12));
move |input1, input2, output| {
if let Some(ref c) = cap.take() {
output.session(&c).give(100);
}
while let Some((time, data)) = input1.next() {
output.session(&time).give_container(data);
}
while let Some((time, data)) = input2.next() {
output.session(&time).give_container(data);
}
}
}).inspect(|x| println!("{:?}", x));
});
sourcefn sink<L, P>(&self, pact: P, name: &str, logic: L)where
L: FnMut(&mut FrontieredInputHandleCore<'_, G::Timestamp, C1, P::Puller>) + 'static,
P: ParallelizationContract<G::Timestamp, C1>,
fn sink<L, P>(&self, pact: P, name: &str, logic: L)where
L: FnMut(&mut FrontieredInputHandleCore<'_, G::Timestamp, C1, P::Puller>) + 'static,
P: ParallelizationContract<G::Timestamp, C1>,
Creates a new dataflow operator that partitions its input stream by a parallelization
strategy pact
, and repeatedly invokes the function logic
which can read from the input stream
and inspect the frontier at the input.
§Examples
use timely::dataflow::operators::{ToStream, FrontierNotificator};
use timely::dataflow::operators::generic::operator::Operator;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::Scope;
timely::example(|scope| {
(0u64..10)
.to_stream(scope)
.sink(Pipeline, "example", |input| {
while let Some((time, data)) = input.next() {
for datum in data.iter() {
println!("{:?}:\t{:?}", time, datum);
}
}
});
});