timely/dataflow/operators/core/capture/extract.rs
1//! Traits and types for extracting captured timely dataflow streams.
2
3use super::Event;
4use crate::Container;
5use crate::container::{SizableContainer, DrainContainer, PushInto};
6
/// Extraction of captured data as a sequence of timestamped containers.
pub trait Extract<T, C> {
    /// Consumes `self` and returns its contents as `(timestamp, container)` pairs.
    ///
    /// At present the only implementation is for `Receiver<Event<T, C>>`. Its purpose
    /// is to make it easy to pull data out of a timely dataflow computation after that
    /// computation has finished.
    ///
    /// # Examples
    ///
    /// ```rust
    /// use std::rc::Rc;
    /// use std::sync::{Arc, Mutex};
    /// use timely::dataflow::Scope;
    /// use timely::dataflow::operators::{Capture, ToStream, Inspect};
    /// use timely::dataflow::operators::capture::{EventLink, Replay, Extract};
    ///
    /// // create both channel endpoints; the sender is wrapped so it can be shared
    /// let (send, recv) = ::std::sync::mpsc::channel();
    /// let send = Arc::new(Mutex::new(send));
    ///
    /// timely::execute(timely::Config::thread(), move |worker| {
    ///
    ///     // the channel exists only so the output can be checked afterwards.
    ///     let send = send.lock().unwrap().clone();
    ///
    ///     // handles used to capture the stream and replay it.
    ///     let handle1 = Rc::new(EventLink::new());
    ///     let handle2 = Some(handle1.clone());
    ///
    ///     worker.dataflow::<u64,_,_>(|scope1|
    ///         (0..10).to_stream(scope1)
    ///                .capture_into(handle1)
    ///     );
    ///
    ///     worker.dataflow(|scope2| {
    ///         handle2.replay_into(scope2)
    ///                .capture_into(send)
    ///     });
    /// }).unwrap();
    ///
    /// assert_eq!(recv.extract().into_iter().flat_map(|x| x.1).collect::<Vec<_>>(), (0..10).collect::<Vec<_>>());
    /// ```
    fn extract(self) -> Vec<(T, C)>;
}
51
52impl<T: Ord, C: SizableContainer> Extract<T, C> for ::std::sync::mpsc::Receiver<Event<T, C>>
53where
54 for<'a> C: Container + DrainContainer<Item<'a>: Ord> + PushInto<C::Item<'a>>,
55{
56 fn extract(self) -> Vec<(T, C)> {
57 let mut staged = std::collections::BTreeMap::new();
58 for event in self {
59 if let Event::Messages(time, data) = event {
60 staged.entry(time)
61 .or_insert_with(Vec::new)
62 .push(data);
63 }
64 }
65 let mut result = Vec::new();
66 for (time, mut dataz) in staged.into_iter() {
67 let mut to_sort = Vec::new();
68 for data in dataz.iter_mut() {
69 to_sort.extend(data.drain());
70 }
71 to_sort.sort();
72 let mut sorted = C::default();
73 for datum in to_sort.into_iter() {
74 sorted.push_into(datum);
75 }
76 if !sorted.is_empty() {
77 result.push((time, sorted));
78 }
79 }
80 result
81 }
82}