Struct differential_dataflow::operators::iterate::Variable

source ·
pub struct Variable<G: Scope, D: Data, R: Abelian + 'static>
where G::Timestamp: Lattice,
{ /* private fields */ }
Expand description

A recursively defined collection.

The Variable struct supports differential dataflow programs that require more sophisticated iterative patterns than singly recursive iteration. For example, in mutual recursion two collections evolve simultaneously.

§Examples

The following example is equivalent to the example for the Iterate trait.

use timely::order::Product;
use timely::dataflow::Scope;

use differential_dataflow::input::Input;
use differential_dataflow::operators::iterate::Variable;

::timely::example(|scope| {

    let numbers = scope.new_collection_from(1 .. 10u32).1;

    scope.iterative::<u64,_,_>(|nested| {
        let summary = Product::new(Default::default(), 1);
        let variable = Variable::new_from(numbers.enter(nested), summary);
        let result = variable.map(|x| if x % 2 == 0 { x/2 } else { x })
                             .consolidate();
        variable.set(&result)
                .leave()
    });
})

Implementations§

source§

impl<G: Scope, D: Data, R: Abelian> Variable<G, D, R>
where G::Timestamp: Lattice,

source

pub fn new(scope: &mut G, step: <G::Timestamp as Timestamp>::Summary) -> Self

Creates a new initially empty Variable.

This method produces a simpler dataflow graph than new_from, and should be used whenever the variable has an empty input.

source

pub fn new_from( source: Collection<G, D, R>, step: <G::Timestamp as Timestamp>::Summary, ) -> Self

Creates a new Variable from a supplied source stream.

source

pub fn set(self, result: &Collection<G, D, R>) -> Collection<G, D, R>

Set the definition of the Variable to a collection.

This method binds the Variable to be equal to the supplied collection, which may be recursively defined in terms of the variable itself.

source

pub fn set_concat(self, result: &Collection<G, D, R>) -> Collection<G, D, R>

Set the definition of the Variable to a collection concatenated to self.

This method is a specialization of set which has the effect of concatenating result and self before calling set. This method avoids some dataflow complexity related to retracting the initial input, and will do less work in that case.

This behavior can also be achieved by using new to create an empty initial collection, and then using self.set(self.concat(result)).

Methods from Deref<Target = Collection<G, D, R>>§

source

pub fn consolidate(&self) -> Self

Aggregates the weights of equal records into at most one record.

This method uses the type D’s hashed() method to partition the data. The data are accumulated in place, each held back until their timestamp has completed.

§Examples
use differential_dataflow::input::Input;

::timely::example(|scope| {

    let x = scope.new_collection_from(1 .. 10u32).1;

    x.negate()
     .concat(&x)
     .consolidate() // <-- ensures cancellation occurs
     .assert_empty();
});
source

pub fn consolidate_named<Tr>(&self, name: &str) -> Self
where Tr: Trace<Time = G::Timestamp, Diff = R> + 'static, for<'a> Tr::Key<'a>: IntoOwned<'a, Owned = D>, Tr::Batch: Batch, Tr::Batcher: Batcher<Input = Vec<((D, ()), G::Timestamp, R)>>,

As consolidate but with the ability to name the operator and specify the trace type.

source

pub fn consolidate_stream(&self) -> Self

Aggregates the weights of equal records.

Unlike consolidate, this method does not exchange data and does not ensure that at most one copy of each (data, time) pair exists in the results. Instead, it acts on each batch of data and collapses equivalent (data, time) pairs found therein, suppressing any that accumulate to zero.

§Examples
use differential_dataflow::input::Input;

::timely::example(|scope| {

    let x = scope.new_collection_from(1 .. 10u32).1;

    // nothing to assert, as no particular guarantees.
    x.negate()
     .concat(&x)
     .consolidate_stream();
});
source

pub fn enter_dynamic(&self, _level: usize) -> Self

Enters a dynamically created scope which has `level` timestamp coordinates.

source

pub fn leave_dynamic(&self, level: usize) -> Self

Leaves a dynamically created scope which has `level` timestamp coordinates.

source

pub fn concat(&self, other: &Self) -> Self

Creates a new collection accumulating the contents of the two collections.

Despite the name, differential dataflow collections are unordered. This method is so named because the implementation is the concatenation of the stream of updates, but it corresponds to the addition of the two collections.

§Examples
use differential_dataflow::input::Input;

::timely::example(|scope| {

    let data = scope.new_collection_from(1 .. 10).1;

    let odds = data.filter(|x| x % 2 == 1);
    let evens = data.filter(|x| x % 2 == 0);

    odds.concat(&evens)
        .assert_eq(&data);
});
source

pub fn concatenate<I>(&self, sources: I) -> Self
where I: IntoIterator<Item = Self>,

Creates a new collection accumulating the contents of this collection and all of the supplied source collections.

Despite the name, differential dataflow collections are unordered. This method is so named because the implementation is the concatenation of the stream of updates, but it corresponds to the addition of the two collections.

§Examples
use differential_dataflow::input::Input;

::timely::example(|scope| {

    let data = scope.new_collection_from(1 .. 10).1;

    let odds = data.filter(|x| x % 2 == 1);
    let evens = data.filter(|x| x % 2 == 0);

    odds.concatenate(Some(evens))
        .assert_eq(&data);
});
source

pub fn enter_region<'a>( &self, child: &Child<'a, G, <G as ScopeParent>::Timestamp>, ) -> Collection<Child<'a, G, <G as ScopeParent>::Timestamp>, D, R, C>

This method is a specialization of enter to the case where the nested scope is a region. It removes the need for an operator that adjusts the timestamp.

source

pub fn inspect_container<F>(&self, func: F) -> Self
where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>) + 'static,

Applies a supplied function to each batch of updates.

This method is analogous to inspect, but operates on batches and reveals the timestamp of the timely dataflow capability associated with the batch of updates. The observed batching depends on how the system executes, and may vary run to run.

§Examples
use differential_dataflow::input::Input;

::timely::example(|scope| {
    scope.new_collection_from(1 .. 10).1
         .map_in_place(|x| *x *= 2)
         .filter(|x| x % 2 == 1)
         .inspect_container(|event| println!("event: {:?}", event));
});
source

pub fn probe(&self) -> Handle<G::Timestamp>

Attaches a timely dataflow probe to the output of a Collection.

This probe is used to determine when the state of the Collection has stabilized and can be read out.

source

pub fn probe_with(&self, handle: &mut Handle<G::Timestamp>) -> Self

Attaches a timely dataflow probe to the output of a Collection.

This probe is used to determine when the state of the Collection has stabilized and all updates have been observed. In addition, a probe is also often used to limit the number of rounds of input in flight at any moment; a computation can wait until the probe has caught up to the input before introducing more rounds of data, to avoid swamping the system.

source

pub fn scope(&self) -> G

The scope containing the underlying timely dataflow stream.

source

pub fn map<D2, L>(&self, logic: L) -> Collection<G, D2, R>
where D2: Data, L: FnMut(D) -> D2 + 'static,

Creates a new collection by applying the supplied function to each input element.

§Examples
use differential_dataflow::input::Input;

::timely::example(|scope| {
    scope.new_collection_from(1 .. 10).1
         .map(|x| x * 2)
         .filter(|x| x % 2 == 1)
         .assert_empty();
});
source

pub fn map_in_place<L>(&self, logic: L) -> Collection<G, D, R>
where L: FnMut(&mut D) + 'static,

Creates a new collection by applying the supplied function to each input element.

Although the name suggests in-place mutation, this function does not change the source collection, but rather re-uses the underlying allocations in its implementation. The method is semantically equivalent to map, but can be more efficient.

§Examples
use differential_dataflow::input::Input;

::timely::example(|scope| {
    scope.new_collection_from(1 .. 10).1
         .map_in_place(|x| *x *= 2)
         .filter(|x| x % 2 == 1)
         .assert_empty();
});
source

pub fn flat_map<I, L>(&self, logic: L) -> Collection<G, I::Item, R>
where G::Timestamp: Clone, I: IntoIterator, I::Item: Data, L: FnMut(D) -> I + 'static,

Creates a new collection by applying the supplied function to each input element and accumulating the results.

This method extracts an iterator from each input element, and extracts the full contents of the iterator. Be warned that if the iterators produce substantial amounts of data, they are currently fully drained before attempting to consolidate the results.

§Examples
use differential_dataflow::input::Input;

::timely::example(|scope| {
    scope.new_collection_from(1 .. 10).1
         .flat_map(|x| 0 .. x);
});
source

pub fn filter<L>(&self, logic: L) -> Collection<G, D, R>
where L: FnMut(&D) -> bool + 'static,

Creates a new collection containing those input records satisfying the supplied predicate.

§Examples
use differential_dataflow::input::Input;

::timely::example(|scope| {
    scope.new_collection_from(1 .. 10).1
         .map(|x| x * 2)
         .filter(|x| x % 2 == 1)
         .assert_empty();
});
source

pub fn explode<D2, R2, I, L>( &self, logic: L, ) -> Collection<G, D2, <R2 as Multiply<R>>::Output>
where D2: Data, R2: Semigroup + Multiply<R>, <R2 as Multiply<R>>::Output: Semigroup + 'static, I: IntoIterator<Item = (D2, R2)>, L: FnMut(D) -> I + 'static,

Replaces each record with another, with a new difference type.

This method is most commonly used to take records containing aggregatable data (e.g. numbers to be summed) and move the data into the difference component. This will allow differential dataflow to update in-place.

§Examples
use differential_dataflow::input::Input;

::timely::example(|scope| {

    let nums = scope.new_collection_from(0 .. 10).1;
    let x1 = nums.flat_map(|x| 0 .. x);
    let x2 = nums.map(|x| (x, 9 - x))
                 .explode(|(x,y)| Some((x,y)));

    x1.assert_eq(&x2);
});
source

pub fn join_function<D2, R2, I, L>( &self, logic: L, ) -> Collection<G, D2, <R2 as Multiply<R>>::Output>
where G::Timestamp: Lattice, D2: Data, R2: Semigroup + Multiply<R>, <R2 as Multiply<R>>::Output: Semigroup + 'static, I: IntoIterator<Item = (D2, G::Timestamp, R2)>, L: FnMut(D) -> I + 'static,

Joins each record against a collection defined by the function logic.

This method performs what is essentially a join with the collection of records (x, logic(x)). Rather than materialize this second relation, logic is applied to each record and the appropriate modifications made to the results, namely joining timestamps and multiplying differences.

§Examples

use differential_dataflow::input::Input;

::timely::example(|scope| {
    // creates `x` copies of `2*x` from time `3*x` until `4*x`,
    // for x from 0 through 9.
    scope.new_collection_from(0 .. 10isize).1
         .join_function(|x|
             //   data      time      diff
             vec![(2*x, (3*x) as u64,  x),
                  (2*x, (4*x) as u64, -x)]
          );
});
source

pub fn enter<'a, T>( &self, child: &Child<'a, G, T>, ) -> Collection<Child<'a, G, T>, D, R>
where T: Refines<<G as ScopeParent>::Timestamp>,

Brings a Collection into a nested scope.

§Examples
use timely::dataflow::Scope;
use differential_dataflow::input::Input;

::timely::example(|scope| {

    let data = scope.new_collection_from(1 .. 10).1;

    let result = scope.region(|child| {
        data.enter(child)
            .leave()
    });

    data.assert_eq(&result);
});
source

pub fn enter_at<'a, T, F>( &self, child: &Iterative<'a, G, T>, initial: F, ) -> Collection<Iterative<'a, G, T>, D, R>
where T: Timestamp + Hash, F: FnMut(&D) -> T + Clone + 'static, G::Timestamp: Hash,

Brings a Collection into a nested scope, at varying times.

The initial function indicates the time at which each element of the Collection should appear.

§Examples
use timely::dataflow::Scope;
use differential_dataflow::input::Input;

::timely::example(|scope| {

    let data = scope.new_collection_from(1 .. 10).1;

    let result = scope.iterative::<u64,_,_>(|child| {
        data.enter_at(child, |x| *x)
            .leave()
    });

    data.assert_eq(&result);
});
source

pub fn delay<F>(&self, func: F) -> Collection<G, D, R>
where F: FnMut(&G::Timestamp) -> G::Timestamp + Clone + 'static,

Delays each difference by a supplied function.

It is assumed that func only advances timestamps; this is not verified, and things may go horribly wrong if that assumption is incorrect. It is also critical that func be monotonic: if two times are ordered, they should have the same order once func is applied to them (this is because we advance the timely capability with the same logic, and it must remain less_equal to all of the data timestamps).

source

pub fn inspect<F>(&self, func: F) -> Collection<G, D, R>
where F: FnMut(&(D, G::Timestamp, R)) + 'static,

Applies a supplied function to each update.

This method is most commonly used to report information back to the user, often for debugging purposes. Any function can be used here, but be warned that the incremental nature of differential dataflow does not guarantee that it will be called as many times as you might expect.

The `(data, time, diff)` triples indicate a change `diff` to the frequency of `data` which takes effect at the logical time `time`. When times are totally ordered (for example, `usize`), these updates reflect the changes along the sequence of collections. For partially ordered times, the mathematics are more interesting and less intuitive, unfortunately.

§Examples
use differential_dataflow::input::Input;

::timely::example(|scope| {
    scope.new_collection_from(1 .. 10).1
         .map_in_place(|x| *x *= 2)
         .filter(|x| x % 2 == 1)
         .inspect(|x| println!("error: {:?}", x));
});
source

pub fn inspect_batch<F>(&self, func: F) -> Collection<G, D, R>
where F: FnMut(&G::Timestamp, &[(D, G::Timestamp, R)]) + 'static,

Applies a supplied function to each batch of updates.

This method is analogous to inspect, but operates on batches and reveals the timestamp of the timely dataflow capability associated with the batch of updates. The observed batching depends on how the system executes, and may vary run to run.

§Examples
use differential_dataflow::input::Input;

::timely::example(|scope| {
    scope.new_collection_from(1 .. 10).1
         .map_in_place(|x| *x *= 2)
         .filter(|x| x % 2 == 1)
         .inspect_batch(|t,xs| println!("errors @ {:?}: {:?}", t, xs));
});
source

pub fn assert_empty(&self)

Assert if the collection is ever non-empty.

Because this is a dataflow fragment, the test is only applied as the computation is run. If the computation is not run, or not run to completion, there may be un-exercised times at which the collection could be non-empty. Typically, a timely dataflow computation runs to completion on drop, and so clean exit from a program should indicate that this assertion never found cause to complain.

§Examples
use differential_dataflow::input::Input;

::timely::example(|scope| {
    scope.new_collection_from(1 .. 10).1
         .map(|x| x * 2)
         .filter(|x| x % 2 == 1)
         .assert_empty();
});
source

pub fn leave(&self) -> Collection<G, D, R>

Returns the final value of a Collection from a nested scope to its containing scope.

§Examples
use timely::dataflow::Scope;
use differential_dataflow::input::Input;

::timely::example(|scope| {

    let data = scope.new_collection_from(1 .. 10).1;

    let result = scope.region(|child| {
        data.enter(child)
            .leave()
    });

    data.assert_eq(&result);
});
source

pub fn leave_region(&self) -> Collection<G, D, R>

Returns the value of a Collection from a nested region to its containing scope.

This method is a specialization of leave to the case of a nested region. It removes the need for an operator that adjusts the timestamp.

source

pub fn negate(&self) -> Collection<G, D, R>

Creates a new collection whose counts are the negation of those in the input.

This method is most commonly used with concat to get those elements in one collection but not another. However, differential dataflow computations are still defined for all values of the difference type R, including negative counts.

§Examples
use differential_dataflow::input::Input;

::timely::example(|scope| {

    let data = scope.new_collection_from(1 .. 10).1;

    let odds = data.filter(|x| x % 2 == 1);
    let evens = data.filter(|x| x % 2 == 0);

    odds.negate()
        .concat(&data)
        .assert_eq(&evens);
});
source

pub fn assert_eq(&self, other: &Self)

Assert if the collections are ever different.

Because this is a dataflow fragment, the test is only applied as the computation is run. If the computation is not run, or not run to completion, there may be un-exercised times at which the collections could vary. Typically, a timely dataflow computation runs to completion on drop, and so clean exit from a program should indicate that this assertion never found cause to complain.

§Examples
use differential_dataflow::input::Input;

::timely::example(|scope| {

    let data = scope.new_collection_from(1 .. 10).1;

    let odds = data.filter(|x| x % 2 == 1);
    let evens = data.filter(|x| x % 2 == 0);

    odds.concat(&evens)
        .assert_eq(&data);
});

Trait Implementations§

source§

impl<G: Scope, D: Data, R: Abelian> Deref for Variable<G, D, R>
where G::Timestamp: Lattice,

§

type Target = Collection<G, D, R>

The resulting type after dereferencing.
source§

fn deref(&self) -> &Self::Target

Dereferences the value.

Auto Trait Implementations§

§

impl<G, D, R> Freeze for Variable<G, D, R>

§

impl<G, D, R> !RefUnwindSafe for Variable<G, D, R>

§

impl<G, D, R> !Send for Variable<G, D, R>

§

impl<G, D, R> !Sync for Variable<G, D, R>

§

impl<G, D, R> Unpin for Variable<G, D, R>
where <<G as ScopeParent>::Timestamp as Timestamp>::Summary: Unpin, G: Unpin, D: Unpin, R: Unpin, <G as ScopeParent>::Timestamp: Unpin,

§

impl<G, D, R> !UnwindSafe for Variable<G, D, R>

Blanket Implementations§

source§

impl<T> Any for T
where T: 'static + ?Sized,

source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
source§

impl<T> Borrow<T> for T
where T: ?Sized,

source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
source§

impl<T> From<T> for T

source§

fn from(t: T) -> T

Returns the argument unchanged.

source§

impl<T, U> Into<U> for T
where U: From<T>,

source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

source§

impl<'a, S, T> Semigroup<&'a S> for T
where T: Semigroup<S>,

source§

fn plus_equals(&mut self, rhs: &&'a S)

The method of std::ops::AddAssign, for types that do not implement AddAssign.
source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

§

type Error = Infallible

The type returned in the event of a conversion error.
source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.