timely/dataflow/operators/core/capture/extract.rs
1//! Traits and types for extracting captured timely dataflow streams.
2
3use super::Event;
4use crate::Container;
5use crate::container::{SizableContainer, DrainContainer, PushInto};
6
/// Extraction of captured `(timestamp, data)` pairs from an event source.
pub trait Extract<T, C> {
    /// Consumes `self` and returns the captured data as a list of
    /// `(timestamp, container)` pairs.
    ///
    /// The implementation provided alongside this trait is for
    /// `std::sync::mpsc::Receiver<Event<T, C>>`. Its main purpose is to make it
    /// easy to pull data out of a timely dataflow computation once that
    /// computation has completed.
    ///
    /// # Examples
    ///
    /// ```rust
    /// use std::rc::Rc;
    /// use std::sync::{Arc, Mutex};
    /// use timely::dataflow::Scope;
    /// use timely::dataflow::operators::{Capture, ToStream, Inspect};
    /// use timely::dataflow::operators::capture::{EventLink, Replay, Extract};
    ///
    /// // get send and recv endpoints, wrap send to share
    /// let (send, recv) = ::std::sync::mpsc::channel();
    /// let send = Arc::new(Mutex::new(send));
    ///
    /// timely::execute(timely::Config::thread(), move |worker| {
    ///
    ///     // this is only to validate the output.
    ///     let send = send.lock().unwrap().clone();
    ///
    ///     // these are to capture/replay the stream.
    ///     let handle1 = Rc::new(EventLink::new());
    ///     let handle2 = Some(handle1.clone());
    ///
    ///     worker.dataflow::<u64,_,_>(|scope1|
    ///         (0..10).to_stream(scope1)
    ///                .container::<Vec<_>>()
    ///                .capture_into(handle1)
    ///     );
    ///
    ///     worker.dataflow(|scope2| {
    ///         handle2.replay_into(scope2)
    ///                .capture_into(send)
    ///     });
    /// }).unwrap();
    ///
    /// assert_eq!(recv.extract().into_iter().flat_map(|x| x.1).collect::<Vec<_>>(), (0..10).collect::<Vec<_>>());
    /// ```
    fn extract(self) -> Vec<(T, C)>;
}
52
53impl<T: Ord, C: SizableContainer> Extract<T, C> for ::std::sync::mpsc::Receiver<Event<T, C>>
54where
55 for<'a> C: Container + DrainContainer<Item<'a>: Ord> + PushInto<C::Item<'a>>,
56{
57 fn extract(self) -> Vec<(T, C)> {
58 let mut staged = std::collections::BTreeMap::new();
59 for event in self {
60 if let Event::Messages(time, data) = event {
61 staged.entry(time)
62 .or_insert_with(Vec::new)
63 .push(data);
64 }
65 }
66 let mut result = Vec::new();
67 for (time, mut dataz) in staged.into_iter() {
68 let mut to_sort = Vec::new();
69 for data in dataz.iter_mut() {
70 to_sort.extend(data.drain());
71 }
72 to_sort.sort();
73 let mut sorted = C::default();
74 for datum in to_sort.into_iter() {
75 sorted.push_into(datum);
76 }
77 if !sorted.is_empty() {
78 result.push((time, sorted));
79 }
80 }
81 result
82 }
83}