pub trait UnorderedInput<G: Scope> {
    // Required method
    fn new_unordered_input<D: Data>(
        &mut self,
    ) -> ((UnorderedHandle<G::Timestamp, D>, ActivateCapability<G::Timestamp>), Stream<G, D>);
}

Create a new Stream and Handle through which to supply input.

Required Methods§

fn new_unordered_input<D: Data>( &mut self, ) -> ((UnorderedHandle<G::Timestamp, D>, ActivateCapability<G::Timestamp>), Stream<G, D>)

Create a new capability-based Stream and Handle through which to supply input. This input supports multiple open epochs (timestamps) at the same time.

The new_unordered_input method returns ((Handle, Capability), Stream). The Stream can be used immediately to construct the timely dataflow; the Handle and Capability are used later to introduce data into the computation.

The returned Capability is for the default value of the timestamp type in use. Dropping the capability informs the system that the input has advanced beyond the capability’s timestamp. To retain the ability to send, first obtain a new capability at a later timestamp via the capability’s delayed method, and only then drop the old one.

To signal the end of input, drop all outstanding capabilities.
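The capability rules above can be pictured with a small standalone model. This is only a sketch under simplifying assumptions: it uses totally ordered u64 timestamps and does not use timely at all. The HeldCapabilities type and its frontier, delayed, and drop_one methods, as well as the demo function, are hypothetical names invented for illustration, and timely's real progress tracking is more involved than this.

```rust
use std::collections::BTreeMap;

/// Toy stand-in for the capabilities held on one input.
/// Hypothetical type for illustration only; not part of timely.
#[derive(Debug)]
pub struct HeldCapabilities {
    // timestamp -> number of capabilities currently held at that timestamp
    counts: BTreeMap<u64, usize>,
}

impl HeldCapabilities {
    /// Start with one capability at the default timestamp (0 for u64).
    pub fn new() -> Self {
        let mut counts = BTreeMap::new();
        counts.insert(u64::default(), 1);
        HeldCapabilities { counts }
    }

    /// Earliest timestamp at which data may still be sent,
    /// or None once every capability has been dropped (end of input).
    pub fn frontier(&self) -> Option<u64> {
        self.counts.keys().next().copied()
    }

    /// Obtain a capability at `to` from one held at `from`.
    /// Returns false if `from` is not held or `to` is earlier than `from`.
    pub fn delayed(&mut self, from: u64, to: u64) -> bool {
        if to < from || !self.counts.contains_key(&from) {
            return false;
        }
        *self.counts.entry(to).or_insert(0) += 1;
        true
    }

    /// Drop one capability at `time`. Returns false if none is held there.
    pub fn drop_one(&mut self, time: u64) -> bool {
        let remaining = match self.counts.get_mut(&time) {
            Some(n) => {
                *n -= 1;
                *n
            }
            None => return false,
        };
        if remaining == 0 {
            self.counts.remove(&time);
        }
        true
    }
}

/// Records the frontier after each step: hold 0, open 3 alongside it,
/// drop 0, then drop 3.
pub fn demo() -> Vec<Option<u64>> {
    let mut caps = HeldCapabilities::new();
    let mut seen = vec![caps.frontier()];
    caps.delayed(0, 3); // two epochs are now open at the same time
    seen.push(caps.frontier());
    caps.drop_one(0); // the input advances beyond timestamp 0
    seen.push(caps.frontier());
    caps.drop_one(3); // nothing is held any more: end of input
    seen.push(caps.frontier());
    seen
}
```

In this model, demo() returns [Some(0), Some(0), Some(3), None]. The earliest open timestamp moves only when the last capability at that timestamp is dropped, which is why a later capability should be obtained before the earlier one is released.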

§Examples
use std::sync::{Arc, Mutex};

use timely::*;
use timely::dataflow::operators::*;
use timely::dataflow::operators::capture::Extract;
use timely::dataflow::Stream;

// get send and recv endpoints, wrap send to share
let (send, recv) = ::std::sync::mpsc::channel();
let send = Arc::new(Mutex::new(send));

timely::execute(Config::thread(), move |worker| {

    // this is only to validate the output.
    let send = send.lock().unwrap().clone();

    // create and capture the unordered input.
    let (mut input, mut cap) = worker.dataflow::<usize,_,_>(|scope| {
        let (input, stream) = scope.new_unordered_input();
        stream.capture_into(send);
        input
    });

    // feed values 0..10 at times 0..10.
    for round in 0..10 {
        input.session(cap.clone()).give(round);
        cap = cap.delayed(&(round + 1));
        worker.step();
    }
}).unwrap();

let extract = recv.extract();
for i in 0..10 {
    assert_eq!(extract[i], (i, vec![i]));
}

Object Safety§

This trait is not object safe.

Implementors§

impl<G: Scope> UnorderedInput<G> for G