pub trait UnorderedInputCore<G: Scope> {
    // Required method
    fn new_unordered_input_core<D: Container>(
        &mut self
    ) -> ((UnorderedHandleCore<G::Timestamp, D>, ActivateCapability<G::Timestamp>), StreamCore<G, D>);
}
Expand description

Create a new Stream and Handle through which to supply input.

Required Methods§

source

fn new_unordered_input_core<D: Container>( &mut self ) -> ((UnorderedHandleCore<G::Timestamp, D>, ActivateCapability<G::Timestamp>), StreamCore<G, D>)

Create a new capability-based StreamCore and UnorderedHandleCore through which to supply input. This input supports multiple open epochs (timestamps) at the same time.

The new_unordered_input_core method returns ((HandleCore, Capability), StreamCore) where the StreamCore can be used immediately for timely dataflow construction, HandleCore and Capability are later used to introduce data into the timely dataflow computation.

The Capability returned is for the default value of the timestamp type in use. The capability can be dropped to inform the system that the input has advanced beyond the capability’s timestamp. To retain the ability to send, a new capability at a later timestamp should be obtained first, via the delayed function for Capability.

To communicate the end-of-input drop all available capabilities.

§Examples
use std::sync::{Arc, Mutex};

use timely::*;
use timely::dataflow::operators::*;
use timely::dataflow::operators::capture::Extract;
use timely::dataflow::Stream;

// get send and recv endpoints, wrap send to share
let (send, recv) = ::std::sync::mpsc::channel();
let send = Arc::new(Mutex::new(send));

timely::execute(Config::thread(), move |worker| {

    // this is only to validate the output.
    let send = send.lock().unwrap().clone();

    // create and capture the unordered input.
    let (mut input, mut cap) = worker.dataflow::<usize,_,_>(|scope| {
        let (input, stream) = scope.new_unordered_input_core();
        stream.capture_into(send);
        input
    });

    // feed values 0..10 at times 0..10.
    for round in 0..10 {
        input.session(cap.clone()).give(round);
        cap = cap.delayed(&(round + 1));
        worker.step();
    }
}).unwrap();

let extract = recv.extract();
for i in 0..10 {
    assert_eq!(extract[i], (i, vec![i]));
}

Object Safety§

This trait is not object safe.

Implementors§