timely/dataflow/operators/core/capture/
extract.rs

1//! Traits and types for extracting captured timely dataflow streams.
2
3use super::Event;
4use crate::Container;
5use crate::container::{SizableContainer, DrainContainer, PushInto};
6
/// Extraction of captured stream contents as a list of `(timestamp, data)` pairs.
pub trait Extract<T, C> {
    /// Consumes `self` and returns the data it holds, paired with timestamps.
    ///
    /// At present the sole implementor is `Receiver<Event<T, C>>`. The method is a
    /// convenience for getting data out of a timely dataflow computation after that
    /// computation has finished, for instance to validate its output in a test.
    ///
    /// The `Receiver` implementation reports times in ascending order, merges every
    /// container captured at one time into a single container whose contents are
    /// sorted, and leaves out times for which no data was captured.
    ///
    /// # Examples
    ///
    /// ```rust
    /// use std::rc::Rc;
    /// use std::sync::{Arc, Mutex};
    /// use timely::dataflow::Scope;
    /// use timely::dataflow::operators::{Capture, ToStream, Inspect};
    /// use timely::dataflow::operators::capture::{EventLink, Replay, Extract};
    ///
    /// // get send and recv endpoints, wrap send to share
    /// let (send, recv) = ::std::sync::mpsc::channel();
    /// let send = Arc::new(Mutex::new(send));
    ///
    /// timely::execute(timely::Config::thread(), move |worker| {
    ///
    ///     // this is only to validate the output.
    ///     let send = send.lock().unwrap().clone();
    ///
    ///     // these are to capture/replay the stream.
    ///     let handle1 = Rc::new(EventLink::new());
    ///     let handle2 = Some(handle1.clone());
    ///
    ///     worker.dataflow::<u64,_,_>(|scope1|
    ///         (0..10).to_stream(scope1)
    ///                .capture_into(handle1)
    ///     );
    ///
    ///     worker.dataflow(|scope2| {
    ///         handle2.replay_into(scope2)
    ///                .capture_into(send)
    ///     });
    /// }).unwrap();
    ///
    /// assert_eq!(recv.extract().into_iter().flat_map(|x| x.1).collect::<Vec<_>>(), (0..10).collect::<Vec<_>>());
    /// ```
    fn extract(self) -> Vec<(T, C)>;
}
51
52impl<T: Ord, C: SizableContainer> Extract<T, C> for ::std::sync::mpsc::Receiver<Event<T, C>>
53where
54    for<'a> C: Container + DrainContainer<Item<'a>: Ord> + PushInto<C::Item<'a>>,
55{
56    fn extract(self) -> Vec<(T, C)> {
57        let mut staged = std::collections::BTreeMap::new();
58        for event in self {
59            if let Event::Messages(time, data) = event {
60                staged.entry(time)
61                      .or_insert_with(Vec::new)
62                      .push(data);
63            }
64        }
65        let mut result = Vec::new();
66        for (time, mut dataz) in staged.into_iter() {
67            let mut to_sort = Vec::new();
68            for data in dataz.iter_mut() {
69                to_sort.extend(data.drain());
70            }
71            to_sort.sort();
72            let mut sorted = C::default();
73            for datum in to_sort.into_iter() {
74                sorted.push_into(datum);
75            }
76            if !sorted.is_empty() {
77                result.push((time, sorted));
78            }
79        }
80        result
81    }
82}