1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76
//! Create new `Streams` connected to external inputs.
use crate::Data;
use crate::container::CapacityContainerBuilder;
use crate::dataflow::operators::{ActivateCapability};
use crate::dataflow::operators::core::{UnorderedInput as UnorderedInputCore, UnorderedHandle as UnorderedHandleCore};
use crate::dataflow::{Stream, Scope};
/// Scope extension for building capability-driven inputs.
pub trait UnorderedInput<G>
where
    G: Scope,
{
    /// Builds an input that is driven by capabilities and returns
    /// `((Handle, Capability), Stream)`. Because data is sent under explicitly held
    /// capabilities, several epochs (timestamps) may be open at the same time.
    ///
    /// The returned `Stream` is usable right away while the dataflow is being constructed.
    /// The `Handle` and `Capability` are kept and used afterwards to introduce data into the
    /// running computation.
    ///
    /// The initial `Capability` is for the default value of the timestamp type in use.
    /// Dropping a capability tells the system that the input has advanced beyond that
    /// capability's timestamp. To keep the ability to send, first obtain a capability for a
    /// later timestamp with the `delayed` function of `Capability`, and only then drop the
    /// old one.
    ///
    /// End-of-input is signalled by dropping every capability that is still held.
    ///
    /// # Examples
    ///
    /// ```
    /// use std::sync::{Arc, Mutex};
    ///
    /// use timely::*;
    /// use timely::dataflow::operators::*;
    /// use timely::dataflow::operators::capture::Extract;
    /// use timely::dataflow::Stream;
    ///
    /// // get send and recv endpoints, wrap send to share
    /// let (send, recv) = ::std::sync::mpsc::channel();
    /// let send = Arc::new(Mutex::new(send));
    ///
    /// timely::execute(Config::thread(), move |worker| {
    ///
    ///     // this is only to validate the output.
    ///     let send = send.lock().unwrap().clone();
    ///
    ///     // create and capture the unordered input.
    ///     let (mut input, mut cap) = worker.dataflow::<usize,_,_>(|scope| {
    ///         let (input, stream) = scope.new_unordered_input();
    ///         stream.capture_into(send);
    ///         input
    ///     });
    ///
    ///     // feed values 0..10 at times 0..10.
    ///     for round in 0..10 {
    ///         input.session(cap.clone()).give(round);
    ///         cap = cap.delayed(&(round + 1));
    ///         worker.step();
    ///     }
    /// }).unwrap();
    ///
    /// let extract = recv.extract();
    /// for i in 0..10 {
    ///     assert_eq!(extract[i], (i, vec![i]));
    /// }
    /// ```
    fn new_unordered_input<D>(
        &mut self,
    ) -> (
        (UnorderedHandle<G::Timestamp, D>, ActivateCapability<G::Timestamp>),
        Stream<G, D>,
    )
    where
        D: Data;
}
/// Blanket implementation: every `Scope` can create an unordered input.
impl<G> UnorderedInput<G> for G
where
    G: Scope,
{
    /// Forwards to the `core` operator's `new_unordered_input`; the declared return type
    /// selects the `Vec`-backed [`UnorderedHandle`].
    fn new_unordered_input<D>(
        &mut self,
    ) -> (
        (UnorderedHandle<G::Timestamp, D>, ActivateCapability<G::Timestamp>),
        Stream<G, D>,
    )
    where
        D: Data,
    {
        UnorderedInputCore::new_unordered_input(self)
    }
}
/// An unordered handle specialized to vectors.
///
/// This is the `core` unordered handle with its container builder fixed to
/// `CapacityContainerBuilder<Vec<D>>`. `T` is the timestamp type and `D` is the type of
/// the individual records supplied through the handle.
pub type UnorderedHandle<T, D> = UnorderedHandleCore<T, CapacityContainerBuilder<Vec<D>>>;