1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
//! Traits and types for extracting captured timely dataflow streams.

use super::Event;
use crate::{container::{SizableContainer, PushInto}};

/// Supports extracting a sequence of timestamp and data.
pub trait Extract<T, C> {
    /// Converts `self` into a sequence of timestamped data.
    ///
    /// Currently this is only implemented for `Receiver<Event<T, C>>`, and is used only
    /// to easily pull data out of a timely dataflow computation once it has completed.
    ///
    /// # Examples
    ///
    /// ```rust
    /// use std::rc::Rc;
    /// use std::sync::{Arc, Mutex};
    /// use timely::dataflow::Scope;
    /// use timely::dataflow::operators::{Capture, ToStream, Inspect};
    /// use timely::dataflow::operators::capture::{EventLink, Replay, Extract};
    ///
    /// // get send and recv endpoints, wrap send to share
    /// let (send, recv) = ::std::sync::mpsc::channel();
    /// let send = Arc::new(Mutex::new(send));
    ///
    /// timely::execute(timely::Config::thread(), move |worker| {
    ///
    ///     // this is only to validate the output.
    ///     let send = send.lock().unwrap().clone();
    ///
    ///     // these are to capture/replay the stream.
    ///     let handle1 = Rc::new(EventLink::new());
    ///     let handle2 = Some(handle1.clone());
    ///
    ///     worker.dataflow::<u64,_,_>(|scope1|
    ///         (0..10).to_stream(scope1)
    ///                .capture_into(handle1)
    ///     );
    ///
    ///     worker.dataflow(|scope2| {
    ///         handle2.replay_into(scope2)
    ///                .capture_into(send)
    ///     });
    /// }).unwrap();
    ///
    /// assert_eq!(recv.extract().into_iter().flat_map(|x| x.1).collect::<Vec<_>>(), (0..10).collect::<Vec<_>>());
    /// ```
    fn extract(self) -> Vec<(T, C)>;
}

impl<T: Ord, C: SizableContainer> Extract<T, C> for ::std::sync::mpsc::Receiver<Event<T, C>>
where
    for<'a> C: PushInto<C::Item<'a>>,
    for<'a> C::Item<'a>: Ord,
{
    fn extract(self) -> Vec<(T, C)> {
        let mut staged = std::collections::BTreeMap::new();
        for event in self {
            if let Event::Messages(time, data) = event {
                staged.entry(time)
                      .or_insert_with(Vec::new)
                      .push(data);
            }
        }
        let mut result = Vec::new();
        for (time, mut dataz) in staged.into_iter() {
            let mut to_sort = Vec::new();
            for data in dataz.iter_mut() {
                to_sort.extend(data.drain());
            }
            to_sort.sort();
            let mut sorted = C::default();
            for datum in to_sort.into_iter() {
                sorted.push(datum);
            }
            if !sorted.is_empty() {
                result.push((time, sorted));
            }
        }
        result
    }
}