timely/dataflow/operators/core/capture/
extract.rs

//! Traits and types for extracting captured timely dataflow streams.
2
3use super::Event;
4use crate::{container::{SizableContainer, PushInto}};
5
/// Supports extracting a sequence of timestamped data.
pub trait Extract<T, C> {
    /// Consumes `self` and converts it into a sequence of `(timestamp, data)` pairs.
    ///
    /// Currently this is only implemented for `Receiver<Event<T, C>>`, and is used only
    /// to easily pull data out of a timely dataflow computation once it has completed.
    ///
    /// # Examples
    ///
    /// ```rust
    /// use std::rc::Rc;
    /// use std::sync::{Arc, Mutex};
    /// use timely::dataflow::Scope;
    /// use timely::dataflow::operators::{Capture, ToStream, Inspect};
    /// use timely::dataflow::operators::capture::{EventLink, Replay, Extract};
    ///
    /// // get send and recv endpoints, wrap send to share
    /// let (send, recv) = ::std::sync::mpsc::channel();
    /// let send = Arc::new(Mutex::new(send));
    ///
    /// timely::execute(timely::Config::thread(), move |worker| {
    ///
    ///     // this is only to validate the output.
    ///     let send = send.lock().unwrap().clone();
    ///
    ///     // these are to capture/replay the stream.
    ///     let handle1 = Rc::new(EventLink::new());
    ///     let handle2 = Some(handle1.clone());
    ///
    ///     worker.dataflow::<u64,_,_>(|scope1|
    ///         (0..10).to_stream(scope1)
    ///                .capture_into(handle1)
    ///     );
    ///
    ///     worker.dataflow(|scope2| {
    ///         handle2.replay_into(scope2)
    ///                .capture_into(send)
    ///     });
    /// }).unwrap();
    ///
    /// assert_eq!(recv.extract().into_iter().flat_map(|x| x.1).collect::<Vec<_>>(), (0..10).collect::<Vec<_>>());
    /// ```
    fn extract(self) -> Vec<(T, C)>;
}
50
51impl<T: Ord, C: SizableContainer> Extract<T, C> for ::std::sync::mpsc::Receiver<Event<T, C>>
52where
53    for<'a> C: PushInto<C::Item<'a>>,
54    for<'a> C::Item<'a>: Ord,
55{
56    fn extract(self) -> Vec<(T, C)> {
57        let mut staged = std::collections::BTreeMap::new();
58        for event in self {
59            if let Event::Messages(time, data) = event {
60                staged.entry(time)
61                      .or_insert_with(Vec::new)
62                      .push(data);
63            }
64        }
65        let mut result = Vec::new();
66        for (time, mut dataz) in staged.into_iter() {
67            let mut to_sort = Vec::new();
68            for data in dataz.iter_mut() {
69                to_sort.extend(data.drain());
70            }
71            to_sort.sort();
72            let mut sorted = C::default();
73            for datum in to_sort.into_iter() {
74                sorted.push(datum);
75            }
76            if !sorted.is_empty() {
77                result.push((time, sorted));
78            }
79        }
80        result
81    }
82}