pub struct Variable<G, C>{ /* private fields */ }Expand description
A recursively defined collection.
The Variable struct allows differential dataflow programs requiring more sophisticated
iterative patterns than singly recursive iteration. For example: in mutual recursion two
collections evolve simultaneously.
§Examples
The following example is equivalent to the example for the Iterate trait.
use timely::order::Product;
use timely::dataflow::Scope;
use differential_dataflow::input::Input;
use differential_dataflow::operators::iterate::Variable;
::timely::example(|scope| {
let numbers = scope.new_collection_from(1 .. 10u32).1;
scope.iterative::<u64,_,_>(|nested| {
let summary = Product::new(Default::default(), 1);
let variable = Variable::new_from(numbers.enter(nested), summary);
let result = variable.map(|x| if x % 2 == 0 { x/2 } else { x })
.consolidate();
variable.set(&result)
.leave()
});
})Implementations§
Source§impl<G, C> Variable<G, C>
impl<G, C> Variable<G, C>
Sourcepub fn new(scope: &mut G, step: <G::Timestamp as Timestamp>::Summary) -> Self
pub fn new(scope: &mut G, step: <G::Timestamp as Timestamp>::Summary) -> Self
Creates a new initially empty Variable.
This method produces a simpler dataflow graph than new_from, and should
be used whenever the variable has an empty input.
Sourcepub fn new_from(
source: Collection<G, C>,
step: <G::Timestamp as Timestamp>::Summary,
) -> Self
pub fn new_from( source: Collection<G, C>, step: <G::Timestamp as Timestamp>::Summary, ) -> Self
Creates a new Variable from a supplied source stream.
Sourcepub fn set(self, result: &Collection<G, C>) -> Collection<G, C>
pub fn set(self, result: &Collection<G, C>) -> Collection<G, C>
Set the definition of the Variable to a collection.
This method binds the Variable to be equal to the supplied collection,
which may be recursively defined in terms of the variable itself.
Sourcepub fn set_concat(self, result: &Collection<G, C>) -> Collection<G, C>
pub fn set_concat(self, result: &Collection<G, C>) -> Collection<G, C>
Set the definition of the Variable to a collection concatenated to self.
This method is a specialization of set which has the effect of concatenating
result and self before calling set. This method avoids some dataflow
complexity related to retracting the initial input, and will do less work in
that case.
This behavior can also be achieved by using new to create an empty initial
collection, and then using self.set(self.concat(result)).
Methods from Deref<Target = Collection<G, C>>§
Sourcepub fn concat(&self, other: &Self) -> Self
pub fn concat(&self, other: &Self) -> Self
Creates a new collection accumulating the contents of the two collections.
Despite the name, differential dataflow collections are unordered. This method is so named because the implementation is the concatenation of the stream of updates, but it corresponds to the addition of the two collections.
§Examples
use differential_dataflow::input::Input;
::timely::example(|scope| {
let data = scope.new_collection_from(1 .. 10).1;
let odds = data.filter(|x| x % 2 == 1);
let evens = data.filter(|x| x % 2 == 0);
odds.concat(&evens)
.assert_eq(&data);
});Sourcepub fn concatenate<I>(&self, sources: I) -> Selfwhere
I: IntoIterator<Item = Self>,
pub fn concatenate<I>(&self, sources: I) -> Selfwhere
I: IntoIterator<Item = Self>,
Creates a new collection accumulating the contents of the two collections.
Despite the name, differential dataflow collections are unordered. This method is so named because the implementation is the concatenation of the stream of updates, but it corresponds to the addition of the two collections.
§Examples
use differential_dataflow::input::Input;
::timely::example(|scope| {
let data = scope.new_collection_from(1 .. 10).1;
let odds = data.filter(|x| x % 2 == 1);
let evens = data.filter(|x| x % 2 == 0);
odds.concatenate(Some(evens))
.assert_eq(&data);
});Sourcepub fn enter_region<'a>(
&self,
child: &Child<'a, G, <G as ScopeParent>::Timestamp>,
) -> Collection<Child<'a, G, <G as ScopeParent>::Timestamp>, C>
pub fn enter_region<'a>( &self, child: &Child<'a, G, <G as ScopeParent>::Timestamp>, ) -> Collection<Child<'a, G, <G as ScopeParent>::Timestamp>, C>
This method is a specialization of enter to the case where the nested scope is a region.
It removes the need for an operator that adjusts the timestamp.
Sourcepub fn inspect_container<F>(&self, func: F) -> Self
pub fn inspect_container<F>(&self, func: F) -> Self
Applies a supplied function to each batch of updates.
This method is analogous to inspect, but operates on batches and reveals the timestamp of the
timely dataflow capability associated with the batch of updates. The observed batching depends
on how the system executes, and may vary run to run.
§Examples
use differential_dataflow::input::Input;
::timely::example(|scope| {
scope.new_collection_from(1 .. 10).1
.map_in_place(|x| *x *= 2)
.filter(|x| x % 2 == 1)
.inspect_container(|event| println!("event: {:?}", event));
});Sourcepub fn probe(&self) -> Handle<G::Timestamp>
pub fn probe(&self) -> Handle<G::Timestamp>
Attaches a timely dataflow probe to the output of a Collection.
This probe is used to determine when the state of the Collection has stabilized and can be read out.
Sourcepub fn probe_with(&self, handle: &Handle<G::Timestamp>) -> Self
pub fn probe_with(&self, handle: &Handle<G::Timestamp>) -> Self
Attaches a timely dataflow probe to the output of a Collection.
This probe is used to determine when the state of the Collection has stabilized and all updates observed. In addition, a probe is also often use to limit the number of rounds of input in flight at any moment; a computation can wait until the probe has caught up to the input before introducing more rounds of data, to avoid swamping the system.
Sourcepub fn negate(&self) -> Selfwhere
C: Negate,
pub fn negate(&self) -> Selfwhere
C: Negate,
Creates a new collection whose counts are the negation of those in the input.
This method is most commonly used with concat to get those element in one collection but not another.
However, differential dataflow computations are still defined for all values of the difference type R,
including negative counts.
§Examples
use differential_dataflow::input::Input;
::timely::example(|scope| {
let data = scope.new_collection_from(1 .. 10).1;
let odds = data.filter(|x| x % 2 == 1);
let evens = data.filter(|x| x % 2 == 0);
odds.negate()
.concat(&data)
.assert_eq(&evens);
});Sourcepub fn enter<'a, T>(
&self,
child: &Child<'a, G, T>,
) -> Collection<Child<'a, G, T>, <C as Enter<<G as ScopeParent>::Timestamp, T>>::InnerContainer>where
C: Enter<<G as ScopeParent>::Timestamp, T, InnerContainer: Container>,
T: Refines<<G as ScopeParent>::Timestamp>,
pub fn enter<'a, T>(
&self,
child: &Child<'a, G, T>,
) -> Collection<Child<'a, G, T>, <C as Enter<<G as ScopeParent>::Timestamp, T>>::InnerContainer>where
C: Enter<<G as ScopeParent>::Timestamp, T, InnerContainer: Container>,
T: Refines<<G as ScopeParent>::Timestamp>,
Brings a Collection into a nested scope.
§Examples
use timely::dataflow::Scope;
use differential_dataflow::input::Input;
::timely::example(|scope| {
let data = scope.new_collection_from(1 .. 10).1;
let result = scope.region(|child| {
data.enter(child)
.leave()
});
data.assert_eq(&result);
});Sourcepub fn results_in(&self, step: <G::Timestamp as Timestamp>::Summary) -> Self
pub fn results_in(&self, step: <G::Timestamp as Timestamp>::Summary) -> Self
Advances a timestamp in the stream according to the timestamp actions on the path.
The path may advance the timestamp sufficiently that it is no longer valid, for example if incrementing fields would result in integer overflow. In this case, the record is dropped.
§Examples
use timely::dataflow::Scope;
use timely::dataflow::operators::{ToStream, Concat, Inspect, BranchWhen};
use differential_dataflow::input::Input;
timely::example(|scope| {
let summary1 = 5;
let data = scope.new_collection_from(1 .. 10).1;
/// Applies `results_in` on every timestamp in the collection.
data.results_in(summary1);
});Trait Implementations§
Auto Trait Implementations§
impl<G, C> Freeze for Variable<G, C>
impl<G, C> !RefUnwindSafe for Variable<G, C>
impl<G, C> !Send for Variable<G, C>
impl<G, C> !Sync for Variable<G, C>
impl<G, C> Unpin for Variable<G, C>where
<<G as ScopeParent>::Timestamp as Timestamp>::Summary: Unpin,
G: Unpin,
C: Unpin,
<G as ScopeParent>::Timestamp: Unpin,
impl<G, C> !UnwindSafe for Variable<G, C>
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Source§impl<T> IntoEither for T
impl<T> IntoEither for T
Source§fn into_either(self, into_left: bool) -> Either<Self, Self>
fn into_either(self, into_left: bool) -> Either<Self, Self>
self into a Left variant of Either<Self, Self>
if into_left is true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read moreSource§fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
self into a Left variant of Either<Self, Self>
if into_left(&self) returns true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read moreSource§impl<'a, S, T> Semigroup<&'a S> for Twhere
T: Semigroup<S>,
impl<'a, S, T> Semigroup<&'a S> for Twhere
T: Semigroup<S>,
Source§fn plus_equals(&mut self, rhs: &&'a S)
fn plus_equals(&mut self, rhs: &&'a S)
std::ops::AddAssign, for types that do not implement AddAssign.