timely/dataflow/operators/core/capture/extract.rs
//! Traits and types for extracting captured timely dataflow streams.
2
3use super::Event;
4use crate::{container::{SizableContainer, PushInto}};
5
/// Supports extracting a sequence of timestamps and data.
pub trait Extract<T, C> {
    /// Converts `self` into a sequence of timestamped data.
    ///
    /// Currently this is only implemented for `Receiver<Event<T, C>>`, and is used only
    /// to easily pull data out of a timely dataflow computation once it has completed.
    ///
    /// That implementation reads events until the channel is closed, keeps only the
    /// data-carrying events, and returns one `(time, container)` pair per distinct
    /// timestamp. Pairs appear in ascending timestamp order, the items inside each
    /// container are sorted, and timestamps whose containers hold no items are omitted.
    ///
    /// # Examples
    ///
    /// ```rust
    /// use std::rc::Rc;
    /// use std::sync::{Arc, Mutex};
    /// use timely::dataflow::Scope;
    /// use timely::dataflow::operators::{Capture, ToStream, Inspect};
    /// use timely::dataflow::operators::capture::{EventLink, Replay, Extract};
    ///
    /// // get send and recv endpoints, wrap send to share
    /// let (send, recv) = ::std::sync::mpsc::channel();
    /// let send = Arc::new(Mutex::new(send));
    ///
    /// timely::execute(timely::Config::thread(), move |worker| {
    ///
    ///     // this is only to validate the output.
    ///     let send = send.lock().unwrap().clone();
    ///
    ///     // these are to capture/replay the stream.
    ///     let handle1 = Rc::new(EventLink::new());
    ///     let handle2 = Some(handle1.clone());
    ///
    ///     worker.dataflow::<u64,_,_>(|scope1|
    ///         (0..10).to_stream(scope1)
    ///                .capture_into(handle1)
    ///     );
    ///
    ///     worker.dataflow(|scope2| {
    ///         handle2.replay_into(scope2)
    ///                .capture_into(send)
    ///     });
    /// }).unwrap();
    ///
    /// assert_eq!(recv.extract().into_iter().flat_map(|x| x.1).collect::<Vec<_>>(), (0..10).collect::<Vec<_>>());
    /// ```
    fn extract(self) -> Vec<(T, C)>;
}
50
51impl<T: Ord, C: SizableContainer> Extract<T, C> for ::std::sync::mpsc::Receiver<Event<T, C>>
52where
53 for<'a> C: PushInto<C::Item<'a>>,
54 for<'a> C::Item<'a>: Ord,
55{
56 fn extract(self) -> Vec<(T, C)> {
57 let mut staged = std::collections::BTreeMap::new();
58 for event in self {
59 if let Event::Messages(time, data) = event {
60 staged.entry(time)
61 .or_insert_with(Vec::new)
62 .push(data);
63 }
64 }
65 let mut result = Vec::new();
66 for (time, mut dataz) in staged.into_iter() {
67 let mut to_sort = Vec::new();
68 for data in dataz.iter_mut() {
69 to_sort.extend(data.drain());
70 }
71 to_sort.sort();
72 let mut sorted = C::default();
73 for datum in to_sort.into_iter() {
74 sorted.push(datum);
75 }
76 if !sorted.is_empty() {
77 result.push((time, sorted));
78 }
79 }
80 result
81 }
82}