timely/dataflow/operators/core/
unordered_input.rs

//! Create new `StreamCore`s connected to external inputs.
2
3use std::rc::Rc;
4use std::cell::RefCell;
5
6use crate::Container;
7use crate::container::{ContainerBuilder, CapacityContainerBuilder};
8
9use crate::scheduling::{Schedule, ActivateOnDrop};
10
11use crate::progress::{Operate, operate::SharedProgress, Timestamp};
12use crate::progress::Source;
13use crate::progress::ChangeBatch;
14use crate::progress::operate::Connectivity;
15use crate::dataflow::channels::pushers::{Counter, Tee};
16use crate::dataflow::channels::pushers::buffer::{Buffer as PushBuffer, AutoflushSession};
17
18use crate::dataflow::operators::{ActivateCapability, Capability};
19
20use crate::dataflow::{Scope, StreamCore};
21
22/// Create a new `Stream` and `Handle` through which to supply input.
23pub trait UnorderedInput<G: Scope> {
24    /// Create a new capability-based [StreamCore] and [UnorderedHandle] through which to supply input. This
25    /// input supports multiple open epochs (timestamps) at the same time.
26    ///
27    /// The `new_unordered_input_core` method returns `((HandleCore, Capability), StreamCore)` where the `StreamCore` can be used
28    /// immediately for timely dataflow construction, `HandleCore` and `Capability` are later used to introduce
29    /// data into the timely dataflow computation.
30    ///
31    /// The `Capability` returned is for the default value of the timestamp type in use. The
32    /// capability can be dropped to inform the system that the input has advanced beyond the
33    /// capability's timestamp. To retain the ability to send, a new capability at a later timestamp
34    /// should be obtained first, via the `delayed` function for `Capability`.
35    ///
36    /// To communicate the end-of-input drop all available capabilities.
37    ///
38    /// # Examples
39    ///
40    /// ```
41    /// use std::sync::{Arc, Mutex};
42    ///
43    /// use timely::*;
44    /// use timely::dataflow::operators::{capture::Extract, Capture};
45    /// use timely::dataflow::operators::core::{UnorderedInput};
46    /// use timely::dataflow::Stream;
47    ///
48    /// // get send and recv endpoints, wrap send to share
49    /// let (send, recv) = ::std::sync::mpsc::channel();
50    /// let send = Arc::new(Mutex::new(send));
51    ///
52    /// timely::execute(Config::thread(), move |worker| {
53    ///
54    ///     // this is only to validate the output.
55    ///     let send = send.lock().unwrap().clone();
56    ///
57    ///     // create and capture the unordered input.
58    ///     let (mut input, mut cap) = worker.dataflow::<usize,_,_>(|scope| {
59    ///         let (input, stream) = scope.new_unordered_input();
60    ///         stream
61    ///             .container::<Vec<_>>()
62    ///             .capture_into(send);
63    ///         input
64    ///     });
65    ///
66    ///     // feed values 0..10 at times 0..10.
67    ///     for round in 0..10 {
68    ///         input.session(cap.clone()).give(round);
69    ///         cap = cap.delayed(&(round + 1));
70    ///         worker.step();
71    ///     }
72    /// }).unwrap();
73    ///
74    /// let extract = recv.extract();
75    /// for i in 0..10 {
76    ///     assert_eq!(extract[i], (i, vec![i]));
77    /// }
78    /// ```
79    fn new_unordered_input<CB: ContainerBuilder>(&mut self) -> ((UnorderedHandle<G::Timestamp, CB>, ActivateCapability<G::Timestamp>), StreamCore<G, CB::Container>);
80}
81
82impl<G: Scope> UnorderedInput<G> for G {
83    fn new_unordered_input<CB: ContainerBuilder>(&mut self) -> ((UnorderedHandle<G::Timestamp, CB>, ActivateCapability<G::Timestamp>), StreamCore<G, CB::Container>) {
84
85        let (output, registrar) = Tee::<G::Timestamp, CB::Container>::new();
86        let internal = Rc::new(RefCell::new(ChangeBatch::new()));
87        // let produced = Rc::new(RefCell::new(ChangeBatch::new()));
88        let cap = Capability::new(G::Timestamp::minimum(), Rc::clone(&internal));
89        let counter = Counter::new(output);
90        let produced = Rc::clone(counter.produced());
91        let peers = self.peers();
92
93        let index = self.allocate_operator_index();
94        let address = self.addr_for_child(index);
95
96        let cap = ActivateCapability::new(cap, Rc::clone(&address), self.activations());
97
98        let helper = UnorderedHandle::new(counter);
99
100        self.add_operator_with_index(Box::new(UnorderedOperator {
101            name: "UnorderedInput".to_owned(),
102            address,
103            shared_progress: Rc::new(RefCell::new(SharedProgress::new(0, 1))),
104            internal,
105            produced,
106            peers,
107        }), index);
108
109        ((helper, cap), StreamCore::new(Source::new(index, 0), registrar, self.clone()))
110    }
111}
112
113struct UnorderedOperator<T:Timestamp> {
114    name: String,
115    address: Rc<[usize]>,
116    shared_progress: Rc<RefCell<SharedProgress<T>>>,
117    internal:   Rc<RefCell<ChangeBatch<T>>>,
118    produced:   Rc<RefCell<ChangeBatch<T>>>,
119    peers:     usize,
120}
121
122impl<T:Timestamp> Schedule for UnorderedOperator<T> {
123    fn name(&self) -> &str { &self.name }
124    fn path(&self) -> &[usize] { &self.address[..] }
125    fn schedule(&mut self) -> bool {
126        let shared_progress = &mut *self.shared_progress.borrow_mut();
127        self.internal.borrow_mut().drain_into(&mut shared_progress.internals[0]);
128        self.produced.borrow_mut().drain_into(&mut shared_progress.produceds[0]);
129        false
130    }
131}
132
133impl<T:Timestamp> Operate<T> for UnorderedOperator<T> {
134    fn inputs(&self) -> usize { 0 }
135    fn outputs(&self) -> usize { 1 }
136
137    fn get_internal_summary(&mut self) -> (Connectivity<<T as Timestamp>::Summary>, Rc<RefCell<SharedProgress<T>>>) {
138        let mut borrow = self.internal.borrow_mut();
139        for (time, count) in borrow.drain() {
140            self.shared_progress.borrow_mut().internals[0].update(time, count * (self.peers as i64));
141        }
142        (Vec::new(), Rc::clone(&self.shared_progress))
143    }
144
145    fn notify_me(&self) -> bool { false }
146}
147
148/// A handle to an input [StreamCore], used to introduce data to a timely dataflow computation.
149#[derive(Debug)]
150pub struct UnorderedHandle<T: Timestamp, CB: ContainerBuilder> {
151    buffer: PushBuffer<T, CB, Counter<T, CB::Container, Tee<T, CB::Container>>>,
152}
153
154impl<T: Timestamp, CB: ContainerBuilder> UnorderedHandle<T, CB> {
155    fn new(pusher: Counter<T, CB::Container, Tee<T, CB::Container>>) -> UnorderedHandle<T, CB> {
156        UnorderedHandle {
157            buffer: PushBuffer::new(pusher),
158        }
159    }
160
161    /// Allocates a new automatically flushing session based on the supplied capability.
162    #[inline]
163    pub fn session_with_builder(&mut self, cap: ActivateCapability<T>) -> ActivateOnDrop<AutoflushSession<'_, T, CB, Counter<T, CB::Container, Tee<T, CB::Container>>>> {
164        ActivateOnDrop::new(self.buffer.autoflush_session_with_builder(cap.capability.clone()), Rc::clone(&cap.address), Rc::clone(&cap.activations))
165    }
166}
167
168impl<T: Timestamp, C: Container> UnorderedHandle<T, CapacityContainerBuilder<C>> {
169    /// Allocates a new automatically flushing session based on the supplied capability.
170    #[inline]
171    pub fn session(&mut self, cap: ActivateCapability<T>) -> ActivateOnDrop<AutoflushSession<'_, T, CapacityContainerBuilder<C>, Counter<T, C, Tee<T, C>>>> {
172        self.session_with_builder(cap)
173    }
174}