Skip to main content

timely/dataflow/operators/core/
unordered_input.rs

1//! Create new `Stream`s connected to external inputs.
2
3use std::rc::Rc;
4use std::cell::RefCell;
5
6use crate::ContainerBuilder;
7use crate::scheduling::{Schedule, ActivateOnDrop};
8
9use crate::progress::{Operate, operate::SharedProgress, Timestamp};
10use crate::progress::Source;
11use crate::progress::ChangeBatch;
12use crate::progress::operate::Connectivity;
13use crate::dataflow::channels::pushers::{Counter, Output, Tee};
14use crate::dataflow::operators::generic::{OutputBuilder, OutputBuilderSession};
15use crate::dataflow::operators::{ActivateCapability, Capability};
16use crate::dataflow::{Scope, Stream};
17
18use crate::scheduling::Activations;
19
20/// Create a new `Stream` and `Handle` through which to supply input.
21pub trait UnorderedInput<'scope, T: Timestamp> {
22    /// Create a new capability-based [Stream] and [UnorderedHandle] through which to supply input. This
23    /// input supports multiple open epochs (timestamps) at the same time.
24    ///
25    /// The `new_unordered_input_core` method returns `((HandleCore, Capability), Stream)` where the `Stream` can be used
26    /// immediately for timely dataflow construction, `HandleCore` and `Capability` are later used to introduce
27    /// data into the timely dataflow computation.
28    ///
29    /// The `Capability` returned is for the default value of the timestamp type in use. The
30    /// capability can be dropped to inform the system that the input has advanced beyond the
31    /// capability's timestamp. To retain the ability to send, a new capability at a later timestamp
32    /// should be obtained first, via the `delayed` function for `Capability`.
33    ///
34    /// To communicate the end-of-input drop all available capabilities.
35    ///
36    /// # Examples
37    ///
38    /// ```
39    /// use std::sync::{Arc, Mutex};
40    ///
41    /// use timely::*;
42    /// use timely::dataflow::operators::{capture::Extract, Capture};
43    /// use timely::dataflow::operators::core::{UnorderedInput};
44    ///
45    /// // get send and recv endpoints, wrap send to share
46    /// let (send, recv) = ::std::sync::mpsc::channel();
47    /// let send = Arc::new(Mutex::new(send));
48    ///
49    /// timely::execute(Config::thread(), move |worker| {
50    ///
51    ///     // this is only to validate the output.
52    ///     let send = send.lock().unwrap().clone();
53    ///
54    ///     // create and capture the unordered input.
55    ///     let (mut input, mut cap) = worker.dataflow::<usize,_,_>(|scope| {
56    ///         let (input, stream) = scope.new_unordered_input();
57    ///         stream
58    ///             .container::<Vec<_>>()
59    ///             .capture_into(send);
60    ///         input
61    ///     });
62    ///
63    ///     // feed values 0..10 at times 0..10.
64    ///     for round in 0..10 {
65    ///         input.activate().session(&cap).give(round);
66    ///         cap = cap.delayed(&(round + 1));
67    ///         worker.step();
68    ///     }
69    /// }).unwrap();
70    ///
71    /// let extract = recv.extract();
72    /// for i in 0..10 {
73    ///     assert_eq!(extract[i], (i, vec![i]));
74    /// }
75    /// ```
76    fn new_unordered_input<CB: ContainerBuilder>(&self) -> ((UnorderedHandle<T, CB>, ActivateCapability<T>), Stream<'scope, T, CB::Container>);
77}
78
79impl<'scope, T: Timestamp> UnorderedInput<'scope, T> for Scope<'scope, T> {
80    fn new_unordered_input<CB: ContainerBuilder>(&self) -> ((UnorderedHandle<T, CB>, ActivateCapability<T>), Stream<'scope, T, CB::Container>) {
81
82        let (output, registrar) = Tee::<T, CB::Container>::new();
83        let internal = Rc::new(RefCell::new(ChangeBatch::new()));
84        // let produced = Rc::new(RefCell::new(ChangeBatch::new()));
85        let cap = Capability::new(T::minimum(), Rc::clone(&internal));
86        let counter = Counter::new(output);
87        let produced = Rc::clone(counter.produced());
88        let counter = Output::new(counter, Rc::clone(&internal), 0);
89        let peers = self.peers();
90
91        let slot = self.reserve_operator();
92        let index = slot.index();
93        let address = slot.addr();
94
95        let cap = ActivateCapability::new(cap, Rc::clone(&address), self.activations());
96
97        let helper = UnorderedHandle::new(counter, Rc::clone(&address), self.activations());
98
99        slot.install(Box::new(UnorderedOperator {
100            name: "UnorderedInput".to_owned(),
101            address,
102            shared_progress: Rc::new(RefCell::new(SharedProgress::new(0, 1))),
103            internal,
104            produced,
105            peers,
106        }));
107
108        ((helper, cap), Stream::new(Source::new(index, 0), registrar, *self))
109    }
110}
111
112struct UnorderedOperator<T:Timestamp> {
113    name: String,
114    address: Rc<[usize]>,
115    shared_progress: Rc<RefCell<SharedProgress<T>>>,
116    internal:   Rc<RefCell<ChangeBatch<T>>>,
117    produced:   Rc<RefCell<ChangeBatch<T>>>,
118    peers:     usize,
119}
120
121impl<T:Timestamp> Schedule for UnorderedOperator<T> {
122    fn name(&self) -> &str { &self.name }
123    fn path(&self) -> &[usize] { &self.address[..] }
124    fn schedule(&mut self) -> bool {
125        let shared_progress = &mut *self.shared_progress.borrow_mut();
126        self.internal.borrow_mut().drain_into(&mut shared_progress.internals[0]);
127        self.produced.borrow_mut().drain_into(&mut shared_progress.produceds[0]);
128        false
129    }
130}
131
132use crate::progress::operate::FrontierInterest;
133impl<T:Timestamp> Operate<T> for UnorderedOperator<T> {
134    fn inputs(&self) -> usize { 0 }
135    fn outputs(&self) -> usize { 1 }
136
137    fn initialize(self: Box<Self>) -> (Connectivity<<T as Timestamp>::Summary>, Rc<RefCell<SharedProgress<T>>>, Box<dyn Schedule>) {
138        for (time, count) in self.internal.borrow_mut().drain() {
139            self.shared_progress.borrow_mut().internals[0].update(time, count * (self.peers as i64));
140        }
141        (Vec::new(), Rc::clone(&self.shared_progress), self)
142    }
143
144    fn notify_me(&self) -> &[FrontierInterest] { &[] }
145}
146
147/// A handle to an input [Stream], used to introduce data to a timely dataflow computation.
148pub struct UnorderedHandle<T: Timestamp, CB: ContainerBuilder> {
149    output: OutputBuilder<T, CB>,
150    address: Rc<[usize]>,
151    activations: Rc<RefCell<Activations>>,
152}
153
154impl<T: Timestamp, CB: ContainerBuilder> UnorderedHandle<T, CB> {
155    fn new(output: Output<T, CB::Container>, address: Rc<[usize]>, activations: Rc<RefCell<Activations>>) -> Self {
156        Self {
157            output: OutputBuilder::from(output),
158            address,
159            activations,
160        }
161    }
162
163    /// Allocates a new automatically flushing session based on the supplied capability.
164    #[inline]
165    pub fn activate(&mut self) -> ActivateOnDrop<OutputBuilderSession<'_, T, CB>> {
166        ActivateOnDrop::new(self.output.activate(), Rc::clone(&self.address), Rc::clone(&self.activations))
167    }
168}