Trait timely::dataflow::operators::core::input::Input

source ·
pub trait Input: Scope {
    // Required methods
    fn new_input<C: Container + Data>(
        &mut self,
    ) -> (Handle<<Self as ScopeParent>::Timestamp, CapacityContainerBuilder<C>>, StreamCore<Self, C>);
    fn new_input_with_builder<CB: ContainerBuilder>(
        &mut self,
    ) -> (Handle<<Self as ScopeParent>::Timestamp, CB>, StreamCore<Self, CB::Container>);
    fn input_from<CB: ContainerBuilder>(
        &mut self,
        handle: &mut Handle<<Self as ScopeParent>::Timestamp, CB>,
    ) -> StreamCore<Self, CB::Container>;
}
Expand description

Create a new Stream and Handle through which to supply input.

Required Methods§

source

fn new_input<C: Container + Data>( &mut self, ) -> (Handle<<Self as ScopeParent>::Timestamp, CapacityContainerBuilder<C>>, StreamCore<Self, C>)

Create a new StreamCore and Handle through which to supply input.

The new_input method returns a pair (Handle, StreamCore) where the StreamCore can be used immediately for timely dataflow construction, and the Handle is later used to introduce data into the timely dataflow computation.

The Handle also provides a means to indicate to timely dataflow that the input has advanced beyond certain timestamps, allowing timely to issue progress notifications.

§Examples
use timely::*;
use timely::dataflow::operators::core::{Input, Inspect};

// construct and execute a timely dataflow
timely::execute(Config::thread(), |worker| {

    // add an input and base computation off of it
    let mut input = worker.dataflow(|scope| {
        let (input, stream) = scope.new_input::<Vec<_>>();
        stream.inspect(|x| println!("hello {:?}", x));
        input
    });

    // introduce input, advance computation
    for round in 0..10 {
        input.send(round);
        input.advance_to(round + 1);
        worker.step();
    }
});
source

fn new_input_with_builder<CB: ContainerBuilder>( &mut self, ) -> (Handle<<Self as ScopeParent>::Timestamp, CB>, StreamCore<Self, CB::Container>)

Create a new StreamCore and Handle through which to supply input.

The new_input method returns a pair (Handle, StreamCore) where the StreamCore can be used immediately for timely dataflow construction, and the Handle is later used to introduce data into the timely dataflow computation.

The Handle also provides a means to indicate to timely dataflow that the input has advanced beyond certain timestamps, allowing timely to issue progress notifications.

§Examples
use std::rc::Rc;
use timely::*;
use timely::dataflow::operators::core::{Input, Inspect};
use timely::container::CapacityContainerBuilder;

// construct and execute a timely dataflow
timely::execute(Config::thread(), |worker| {

    // add an input and base computation off of it
    let mut input = worker.dataflow(|scope| {
        let (input, stream) = scope.new_input_with_builder::<CapacityContainerBuilder<Rc<Vec<_>>>>();
        stream.inspect(|x| println!("hello {:?}", x));
        input
    });

    // introduce input, advance computation
    for round in 0..10 {
        input.send_batch(&mut Rc::new(vec![round]));
        input.advance_to(round + 1);
        worker.step();
    }
});
source

fn input_from<CB: ContainerBuilder>( &mut self, handle: &mut Handle<<Self as ScopeParent>::Timestamp, CB>, ) -> StreamCore<Self, CB::Container>

Create a new stream from a supplied interactive handle.

This method creates a new timely stream whose data are supplied interactively through the handle argument. Each handle may be used multiple times (or not at all), and will clone data as appropriate if it as attached to more than one stream.

§Examples
use timely::*;
use timely::dataflow::operators::core::{Input, Inspect};
use timely::dataflow::operators::core::input::Handle;

// construct and execute a timely dataflow
timely::execute(Config::thread(), |worker| {

    // add an input and base computation off of it
    let mut input = Handle::new();
    worker.dataflow(|scope| {
        scope.input_from(&mut input)
             .container::<Vec<_>>()
             .inspect(|x| println!("hello {:?}", x));
    });

    // introduce input, advance computation
    for round in 0..10 {
        input.send(round);
        input.advance_to(round + 1);
        worker.step();
    }
});

Object Safety§

This trait is not object safe.

Implementors§

source§

impl<G: Scope> Input for G