/// Methods to construct generic streaming and blocking operators.
///
/// `G` is the scope the stream lives in and `D1` is the container type carried by
/// the stream the operator is attached to. Every method takes a `name`
/// (presumably a human-readable label for the operator — confirm against the
/// implementation) and one parallelization contract per input.
pub trait Operator<G: Scope, D1: Container> {
    // Required methods

    /// Creates an operator that partitions its input by `pact` and repeatedly
    /// invokes the closure returned by `constructor`.
    ///
    /// `constructor` receives a capability and the operator's `OperatorInfo`.
    /// The returned closure is handed a frontier-aware input handle and an
    /// output handle, so it can read input, write output, and inspect the
    /// input frontier.
    fn unary_frontier<D2, B, L, P>(
        &self,
        pact: P,
        name: &str,
        constructor: B,
    ) -> StreamCore<G, D2>
    where
        D2: Container,
        B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
        L: FnMut(
                &mut FrontieredInputHandleCore<'_, G::Timestamp, D1, P::Puller>,
                &mut OutputHandleCore<'_, G::Timestamp, D2, TeeCore<G::Timestamp, D2>>,
            ) + 'static,
        P: ParallelizationContractCore<G::Timestamp, D1>;

    /// Creates an operator that partitions its input by `pact` and repeatedly
    /// invokes `logic` with the input handle, the output handle, and a
    /// `Notificator`.
    ///
    /// Unlike `unary` and `unary_frontier`, `logic` is passed directly rather
    /// than being returned from a constructor.
    /// NOTE(review): `init` is presumably a set of timestamps for which
    /// notifications are requested up front — confirm against the implementation.
    fn unary_notify<D2, L, P>(
        &self,
        pact: P,
        name: &str,
        init: impl IntoIterator<Item = G::Timestamp>,
        logic: L,
    ) -> StreamCore<G, D2>
    where
        D2: Container,
        L: FnMut(
                &mut InputHandleCore<G::Timestamp, D1, P::Puller>,
                &mut OutputHandleCore<'_, G::Timestamp, D2, TeeCore<G::Timestamp, D2>>,
                &mut Notificator<'_, G::Timestamp>,
            ) + 'static,
        P: ParallelizationContractCore<G::Timestamp, D1>;

    /// Creates an operator that partitions its input by `pact` and repeatedly
    /// invokes the closure returned by `constructor`.
    ///
    /// The returned closure is handed a plain input handle and an output
    /// handle: it can read from the input stream and write to the output stream.
    fn unary<D2, B, L, P>(
        &self,
        pact: P,
        name: &str,
        constructor: B,
    ) -> StreamCore<G, D2>
    where
        D2: Container,
        B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
        L: FnMut(
                &mut InputHandleCore<G::Timestamp, D1, P::Puller>,
                &mut OutputHandleCore<'_, G::Timestamp, D2, TeeCore<G::Timestamp, D2>>,
            ) + 'static,
        P: ParallelizationContractCore<G::Timestamp, D1>;

    /// Two-input counterpart of `unary_frontier`.
    ///
    /// This stream is partitioned by `pact1` and `other` by `pact2`. The closure
    /// returned by `constructor` receives a frontier-aware handle for each input
    /// plus the output handle.
    fn binary_frontier<D2, D3, B, L, P1, P2>(
        &self,
        other: &StreamCore<G, D2>,
        pact1: P1,
        pact2: P2,
        name: &str,
        constructor: B,
    ) -> StreamCore<G, D3>
    where
        D2: Container,
        D3: Container,
        B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
        L: FnMut(
                &mut FrontieredInputHandleCore<'_, G::Timestamp, D1, P1::Puller>,
                &mut FrontieredInputHandleCore<'_, G::Timestamp, D2, P2::Puller>,
                &mut OutputHandleCore<'_, G::Timestamp, D3, TeeCore<G::Timestamp, D3>>,
            ) + 'static,
        P1: ParallelizationContractCore<G::Timestamp, D1>,
        P2: ParallelizationContractCore<G::Timestamp, D2>;

    /// Two-input counterpart of `unary_notify`.
    ///
    /// This stream is partitioned by `pact1` and `other` by `pact2`. `logic` is
    /// passed directly and receives both input handles, the output handle, and a
    /// `Notificator`.
    /// NOTE(review): `init` is presumably a set of timestamps for which
    /// notifications are requested up front — confirm against the implementation.
    fn binary_notify<D2, D3, L, P1, P2>(
        &self,
        other: &StreamCore<G, D2>,
        pact1: P1,
        pact2: P2,
        name: &str,
        init: impl IntoIterator<Item = G::Timestamp>,
        logic: L,
    ) -> StreamCore<G, D3>
    where
        D2: Container,
        D3: Container,
        L: FnMut(
                &mut InputHandleCore<G::Timestamp, D1, P1::Puller>,
                &mut InputHandleCore<G::Timestamp, D2, P2::Puller>,
                &mut OutputHandleCore<'_, G::Timestamp, D3, TeeCore<G::Timestamp, D3>>,
                &mut Notificator<'_, G::Timestamp>,
            ) + 'static,
        P1: ParallelizationContractCore<G::Timestamp, D1>,
        P2: ParallelizationContractCore<G::Timestamp, D2>;

    /// Two-input counterpart of `unary`.
    ///
    /// This stream is partitioned by `pact1` and `other` by `pact2`. The closure
    /// returned by `constructor` receives a plain handle for each input plus the
    /// output handle.
    fn binary<D2, D3, B, L, P1, P2>(
        &self,
        other: &StreamCore<G, D2>,
        pact1: P1,
        pact2: P2,
        name: &str,
        constructor: B,
    ) -> StreamCore<G, D3>
    where
        D2: Container,
        D3: Container,
        B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
        L: FnMut(
                &mut InputHandleCore<G::Timestamp, D1, P1::Puller>,
                &mut InputHandleCore<G::Timestamp, D2, P2::Puller>,
                &mut OutputHandleCore<'_, G::Timestamp, D3, TeeCore<G::Timestamp, D3>>,
            ) + 'static,
        P1: ParallelizationContractCore<G::Timestamp, D1>,
        P2: ParallelizationContractCore<G::Timestamp, D2>;

    /// Creates an operator with no output that partitions its input by `pact`
    /// and repeatedly invokes `logic`, which can read from the input stream and
    /// inspect the frontier at the input.
    fn sink<L, P>(&self, pact: P, name: &str, logic: L)
    where
        L: FnMut(&mut FrontieredInputHandleCore<'_, G::Timestamp, D1, P::Puller>) + 'static,
        P: ParallelizationContractCore<G::Timestamp, D1>;
}
Expand description
Methods to construct generic streaming and blocking operators.
Required Methods§
source
fn unary_frontier<D2, B, L, P>(
&self,
pact: P,
name: &str,
constructor: B
) -> StreamCore<G, D2>
where
D2: Container,
B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
L: FnMut(&mut FrontieredInputHandleCore<'_, G::Timestamp, D1, P::Puller>, &mut OutputHandleCore<'_, G::Timestamp, D2, TeeCore<G::Timestamp, D2>>) + 'static,
P: ParallelizationContractCore<G::Timestamp, D1>,
fn unary_frontier<D2, B, L, P>( &self, pact: P, name: &str, constructor: B ) -> StreamCore<G, D2>where D2: Container, B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L, L: FnMut(&mut FrontieredInputHandleCore<'_, G::Timestamp, D1, P::Puller>, &mut OutputHandleCore<'_, G::Timestamp, D2, TeeCore<G::Timestamp, D2>>) + 'static, P: ParallelizationContractCore<G::Timestamp, D1>,
Creates a new dataflow operator that partitions its input stream by a parallelization strategy `pact`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`.
`logic` can read from the input stream, write to the output stream, and inspect the frontier at the input.
Examples
use std::collections::HashMap;
use timely::dataflow::operators::{ToStream, FrontierNotificator};
use timely::dataflow::operators::generic::Operator;
use timely::dataflow::channels::pact::Pipeline;
fn main() {
    timely::example(|scope| {
        (0u64..10).to_stream(scope)
            .unary_frontier(Pipeline, "example", |default_cap, _info| {
                // Capability delayed to time 12; consumed on the first invocation
                // of the returned closure to emit a single record.
                let mut cap = Some(default_cap.delayed(&12));
                let mut notificator = FrontierNotificator::new();
                // Records received so far, grouped by the timestamp they arrived at.
                let mut stash = HashMap::new();
                // Reusable buffer that incoming batches are swapped into.
                let mut vector = Vec::new();
                move |input, output| {
                    if let Some(ref c) = cap.take() {
                        output.session(&c).give(12);
                    }
                    while let Some((time, data)) = input.next() {
                        data.swap(&mut vector);
                        stash.entry(time.time().clone())
                             .or_insert_with(Vec::new)
                             .extend(vector.drain(..));
                        // Ask to be notified for this time. Without this request
                        // the `for_each` below has nothing to deliver and the
                        // stashed records are never emitted.
                        notificator.notify_at(time.retain());
                    }
                    // Release each stashed batch once the notificator delivers its
                    // time, based on the current input frontier.
                    notificator.for_each(&[input.frontier()], |time, _not| {
                        if let Some(mut vec) = stash.remove(time.time()) {
                            output.session(&time).give_iterator(vec.drain(..));
                        }
                    });
                }
            });
    });
}
source
fn unary_notify<D2: Container, L: FnMut(&mut InputHandleCore<G::Timestamp, D1, P::Puller>, &mut OutputHandleCore<'_, G::Timestamp, D2, TeeCore<G::Timestamp, D2>>, &mut Notificator<'_, G::Timestamp>) + 'static, P: ParallelizationContractCore<G::Timestamp, D1>>(
&self,
pact: P,
name: &str,
init: impl IntoIterator<Item = G::Timestamp>,
logic: L
) -> StreamCore<G, D2>
fn unary_notify<D2: Container, L: FnMut(&mut InputHandleCore<G::Timestamp, D1, P::Puller>, &mut OutputHandleCore<'_, G::Timestamp, D2, TeeCore<G::Timestamp, D2>>, &mut Notificator<'_, G::Timestamp>) + 'static, P: ParallelizationContractCore<G::Timestamp, D1>>( &self, pact: P, name: &str, init: impl IntoIterator<Item = G::Timestamp>, logic: L ) -> StreamCore<G, D2>
Creates a new dataflow operator that partitions its input stream by a parallelization strategy `pact`, and repeatedly invokes the closure `logic`, which is passed directly (this method takes no `constructor`).
`logic` can read from the input stream, write to the output stream, and use the supplied notificator to request and receive notifications.
Examples
use std::collections::HashMap;
use timely::dataflow::operators::{ToStream, FrontierNotificator};
use timely::dataflow::operators::generic::Operator;
use timely::dataflow::channels::pact::Pipeline;
fn main() {
    timely::example(|scope| {
        // Reusable buffer that each incoming batch is swapped into.
        let mut buffer = Vec::new();
        let numbers = (0u64..10).to_stream(scope);
        numbers.unary_notify(Pipeline, "example", None, move |input, output, notificator| {
            // Forward every batch unchanged and request a notification for its time.
            input.for_each(|cap, batch| {
                batch.swap(&mut buffer);
                output.session(&cap).give_vec(&mut buffer);
                notificator.notify_at(cap.retain());
            });
            // Report each notification as it is delivered.
            notificator.for_each(|cap, _count, _notificator| {
                println!("notified at {:?}", cap);
            });
        });
    });
}
source
fn unary<D2, B, L, P>(
&self,
pact: P,
name: &str,
constructor: B
) -> StreamCore<G, D2>
where
D2: Container,
B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
L: FnMut(&mut InputHandleCore<G::Timestamp, D1, P::Puller>, &mut OutputHandleCore<'_, G::Timestamp, D2, TeeCore<G::Timestamp, D2>>) + 'static,
P: ParallelizationContractCore<G::Timestamp, D1>,
fn unary<D2, B, L, P>( &self, pact: P, name: &str, constructor: B ) -> StreamCore<G, D2>where D2: Container, B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L, L: FnMut(&mut InputHandleCore<G::Timestamp, D1, P::Puller>, &mut OutputHandleCore<'_, G::Timestamp, D2, TeeCore<G::Timestamp, D2>>) + 'static, P: ParallelizationContractCore<G::Timestamp, D1>,
Creates a new dataflow operator that partitions its input stream by a parallelization strategy `pact`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`.
`logic` can read from the input stream and write to the output stream.
Examples
use timely::dataflow::operators::{ToStream, FrontierNotificator};
use timely::dataflow::operators::generic::operator::Operator;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::Scope;
timely::example(|scope| {
    let numbers = (0u64..10).to_stream(scope);
    numbers.unary(Pipeline, "example", |default_cap, _info| {
        // Capability delayed to time 12; spent on the first invocation below.
        let mut pending = Some(default_cap.delayed(&12));
        // Reusable buffer that each incoming batch is swapped into.
        let mut buffer = Vec::new();
        move |input, output| {
            if let Some(held) = pending.take() {
                output.session(&held).give(100);
            }
            // Forward every input batch unchanged at its own time.
            while let Some((cap, batch)) = input.next() {
                batch.swap(&mut buffer);
                output.session(&cap).give_vec(&mut buffer);
            }
        }
    });
});
source
fn binary_frontier<D2, D3, B, L, P1, P2>(
&self,
other: &StreamCore<G, D2>,
pact1: P1,
pact2: P2,
name: &str,
constructor: B
) -> StreamCore<G, D3>
where
D2: Container,
D3: Container,
B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
L: FnMut(&mut FrontieredInputHandleCore<'_, G::Timestamp, D1, P1::Puller>, &mut FrontieredInputHandleCore<'_, G::Timestamp, D2, P2::Puller>, &mut OutputHandleCore<'_, G::Timestamp, D3, TeeCore<G::Timestamp, D3>>) + 'static,
P1: ParallelizationContractCore<G::Timestamp, D1>,
P2: ParallelizationContractCore<G::Timestamp, D2>,
fn binary_frontier<D2, D3, B, L, P1, P2>( &self, other: &StreamCore<G, D2>, pact1: P1, pact2: P2, name: &str, constructor: B ) -> StreamCore<G, D3>where D2: Container, D3: Container, B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L, L: FnMut(&mut FrontieredInputHandleCore<'_, G::Timestamp, D1, P1::Puller>, &mut FrontieredInputHandleCore<'_, G::Timestamp, D2, P2::Puller>, &mut OutputHandleCore<'_, G::Timestamp, D3, TeeCore<G::Timestamp, D3>>) + 'static, P1: ParallelizationContractCore<G::Timestamp, D1>, P2: ParallelizationContractCore<G::Timestamp, D2>,
Creates a new dataflow operator that partitions its two input streams by the parallelization strategies `pact1` (for this stream) and `pact2` (for `other`), and repeatedly invokes `logic`, the function returned by the function passed as `constructor`.
`logic` can read from the input streams, write to the output stream, and inspect the frontier at the inputs.
Examples
use std::collections::HashMap;
use timely::dataflow::operators::{Input, Inspect, FrontierNotificator};
use timely::dataflow::operators::generic::operator::Operator;
use timely::dataflow::channels::pact::Pipeline;
timely::execute(timely::Config::thread(), |worker| {
    // Build the dataflow and keep the two input handles so data can be fed in below.
    let (mut left, mut right) = worker.dataflow::<usize,_,_>(|scope| {
        let (left_handle, left_stream) = scope.new_input();
        let (right_handle, right_stream) = scope.new_input();
        left_stream
            .binary_frontier(&right_stream, Pipeline, Pipeline, "example", |_default_cap, _info| {
                let mut notificator = FrontierNotificator::new();
                // Records from both inputs, grouped by timestamp.
                let mut stash = HashMap::new();
                let mut buffer1 = Vec::new();
                let mut buffer2 = Vec::new();
                move |input1, input2, output| {
                    // Stash everything from the first input and request a notification.
                    while let Some((cap, batch)) = input1.next() {
                        batch.swap(&mut buffer1);
                        stash
                            .entry(cap.time().clone())
                            .or_insert_with(Vec::new)
                            .extend(buffer1.drain(..));
                        notificator.notify_at(cap.retain());
                    }
                    // Same treatment for the second input.
                    while let Some((cap, batch)) = input2.next() {
                        batch.swap(&mut buffer2);
                        stash
                            .entry(cap.time().clone())
                            .or_insert_with(Vec::new)
                            .extend(buffer2.drain(..));
                        notificator.notify_at(cap.retain());
                    }
                    // Emit each stashed batch when its notification is delivered,
                    // based on both input frontiers.
                    notificator.for_each(&[input1.frontier(), input2.frontier()], |cap, _not| {
                        if let Some(mut records) = stash.remove(cap.time()) {
                            output.session(&cap).give_iterator(records.drain(..));
                        }
                    });
                }
            })
            .inspect_batch(|t, x| println!("{:?} -> {:?}", t, x));
        (left_handle, right_handle)
    });
    // Feed one record per round into each input, then advance both to the next round.
    for round in 1..10 {
        left.send(round - 1);
        left.advance_to(round);
        right.send(round - 1);
        right.advance_to(round);
    }
}).unwrap();
source
fn binary_notify<D2: Container, D3: Container, L: FnMut(&mut InputHandleCore<G::Timestamp, D1, P1::Puller>, &mut InputHandleCore<G::Timestamp, D2, P2::Puller>, &mut OutputHandleCore<'_, G::Timestamp, D3, TeeCore<G::Timestamp, D3>>, &mut Notificator<'_, G::Timestamp>) + 'static, P1: ParallelizationContractCore<G::Timestamp, D1>, P2: ParallelizationContractCore<G::Timestamp, D2>>(
&self,
other: &StreamCore<G, D2>,
pact1: P1,
pact2: P2,
name: &str,
init: impl IntoIterator<Item = G::Timestamp>,
logic: L
) -> StreamCore<G, D3>
fn binary_notify<D2: Container, D3: Container, L: FnMut(&mut InputHandleCore<G::Timestamp, D1, P1::Puller>, &mut InputHandleCore<G::Timestamp, D2, P2::Puller>, &mut OutputHandleCore<'_, G::Timestamp, D3, TeeCore<G::Timestamp, D3>>, &mut Notificator<'_, G::Timestamp>) + 'static, P1: ParallelizationContractCore<G::Timestamp, D1>, P2: ParallelizationContractCore<G::Timestamp, D2>>( &self, other: &StreamCore<G, D2>, pact1: P1, pact2: P2, name: &str, init: impl IntoIterator<Item = G::Timestamp>, logic: L ) -> StreamCore<G, D3>
Creates a new dataflow operator that partitions its two input streams by the parallelization strategies `pact1` (for this stream) and `pact2` (for `other`), and repeatedly invokes the closure `logic`, which is passed directly (this method takes no `constructor`).
`logic` can read from the input streams, write to the output stream, and use the supplied notificator to request and receive notifications.
Examples
use std::collections::HashMap;
use timely::dataflow::operators::{Input, Inspect, FrontierNotificator};
use timely::dataflow::operators::generic::operator::Operator;
use timely::dataflow::channels::pact::Pipeline;
timely::execute(timely::Config::thread(), |worker| {
    // Build the dataflow and keep the two input handles so data can be fed in below.
    let (mut left, mut right) = worker.dataflow::<usize,_,_>(|scope| {
        let (left_handle, left_stream) = scope.new_input();
        let (right_handle, right_stream) = scope.new_input();
        // Reusable buffers, one per input.
        let mut buffer1 = Vec::new();
        let mut buffer2 = Vec::new();
        left_stream.binary_notify(
            &right_stream,
            Pipeline,
            Pipeline,
            "example",
            None,
            move |input1, input2, output, notificator| {
                // Forward each batch from the first input and request a notification.
                input1.for_each(|cap, batch| {
                    batch.swap(&mut buffer1);
                    output.session(&cap).give_vec(&mut buffer1);
                    notificator.notify_at(cap.retain());
                });
                // Same treatment for the second input.
                input2.for_each(|cap, batch| {
                    batch.swap(&mut buffer2);
                    output.session(&cap).give_vec(&mut buffer2);
                    notificator.notify_at(cap.retain());
                });
                // Report each notification as it is delivered.
                notificator.for_each(|cap, _count, _notificator| {
                    println!("notified at {:?}", cap);
                });
            },
        );
        (left_handle, right_handle)
    });
    // Feed one record per round into each input, then advance both to the next round.
    for round in 1..10 {
        left.send(round - 1);
        left.advance_to(round);
        right.send(round - 1);
        right.advance_to(round);
    }
}).unwrap();
source
fn binary<D2, D3, B, L, P1, P2>(
&self,
other: &StreamCore<G, D2>,
pact1: P1,
pact2: P2,
name: &str,
constructor: B
) -> StreamCore<G, D3>
where
D2: Container,
D3: Container,
B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
L: FnMut(&mut InputHandleCore<G::Timestamp, D1, P1::Puller>, &mut InputHandleCore<G::Timestamp, D2, P2::Puller>, &mut OutputHandleCore<'_, G::Timestamp, D3, TeeCore<G::Timestamp, D3>>) + 'static,
P1: ParallelizationContractCore<G::Timestamp, D1>,
P2: ParallelizationContractCore<G::Timestamp, D2>,
fn binary<D2, D3, B, L, P1, P2>( &self, other: &StreamCore<G, D2>, pact1: P1, pact2: P2, name: &str, constructor: B ) -> StreamCore<G, D3>where D2: Container, D3: Container, B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L, L: FnMut(&mut InputHandleCore<G::Timestamp, D1, P1::Puller>, &mut InputHandleCore<G::Timestamp, D2, P2::Puller>, &mut OutputHandleCore<'_, G::Timestamp, D3, TeeCore<G::Timestamp, D3>>) + 'static, P1: ParallelizationContractCore<G::Timestamp, D1>, P2: ParallelizationContractCore<G::Timestamp, D2>,
Creates a new dataflow operator that partitions its two input streams by the parallelization strategies `pact1` (for this stream) and `pact2` (for `other`), and repeatedly invokes `logic`, the function returned by the function passed as `constructor`.
`logic` can read from the input streams and write to the output stream.
Examples
use timely::dataflow::operators::{ToStream, Inspect, FrontierNotificator};
use timely::dataflow::operators::generic::operator::Operator;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::Scope;
timely::example(|scope| {
    let second = (0u64..10).to_stream(scope);
    let first = (0u64..10).to_stream(scope);
    first
        .binary(&second, Pipeline, Pipeline, "example", |default_cap, _info| {
            // Capability delayed to time 12; spent on the first invocation below.
            let mut pending = Some(default_cap.delayed(&12));
            // Reusable buffers, one per input.
            let mut buffer1 = Vec::new();
            let mut buffer2 = Vec::new();
            move |input1, input2, output| {
                if let Some(held) = pending.take() {
                    output.session(&held).give(100);
                }
                // Forward everything from the first input, then from the second.
                while let Some((cap, batch)) = input1.next() {
                    batch.swap(&mut buffer1);
                    output.session(&cap).give_vec(&mut buffer1);
                }
                while let Some((cap, batch)) = input2.next() {
                    batch.swap(&mut buffer2);
                    output.session(&cap).give_vec(&mut buffer2);
                }
            }
        })
        .inspect(|x| println!("{:?}", x));
});
source
fn sink<L, P>(&self, pact: P, name: &str, logic: L)
where
L: FnMut(&mut FrontieredInputHandleCore<'_, G::Timestamp, D1, P::Puller>) + 'static,
P: ParallelizationContractCore<G::Timestamp, D1>,
fn sink<L, P>(&self, pact: P, name: &str, logic: L)where L: FnMut(&mut FrontieredInputHandleCore<'_, G::Timestamp, D1, P::Puller>) + 'static, P: ParallelizationContractCore<G::Timestamp, D1>,
Creates a new dataflow operator that partitions its input stream by a parallelization strategy `pact`, and repeatedly invokes the function `logic`, which can read from the input stream and inspect the frontier at the input.
Examples
use timely::dataflow::operators::{ToStream, FrontierNotificator};
use timely::dataflow::operators::generic::operator::Operator;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::Scope;
timely::example(|scope| {
    let numbers = (0u64..10).to_stream(scope);
    numbers.sink(Pipeline, "example", |input| {
        // Print every record together with the time of the batch it arrived in.
        while let Some((cap, batch)) = input.next() {
            batch
                .iter()
                .for_each(|item| println!("{:?}:\t{:?}", cap, item));
        }
    });
});