timely/dataflow/operators/core/capture/capture.rs
1//! Traits and types for capturing timely dataflow streams.
2//!
3//! All timely dataflow streams can be captured, but there are many ways to capture
//! these streams. A stream may be captured (via `capture_into`) into any type implementing `EventPusher`,
5//! and there are several default implementations, including a linked-list, Rust's MPSC
6//! queue, and a binary serializer wrapping any `W: Write`.
7
8use crate::dataflow::{Scope, Stream};
9use crate::dataflow::channels::pact::Pipeline;
10use crate::dataflow::channels::pullers::Counter as PullCounter;
11use crate::dataflow::operators::generic::builder_raw::OperatorBuilder;
12
13use crate::Container;
14use crate::progress::ChangeBatch;
15use crate::progress::Timestamp;
16
17use super::{Event, EventPusher};
18
19/// Capture a stream of timestamped data for later replay.
20pub trait Capture<T: Timestamp, C: Container> : Sized {
21 /// Captures a stream of timestamped data for later replay.
22 ///
23 /// # Examples
24 ///
25 /// The type `Rc<EventLink<T,D>>` implements a typed linked list,
26 /// and can be captured into and replayed from.
27 ///
28 /// ```rust
29 /// use std::rc::Rc;
30 /// use std::sync::{Arc, Mutex};
31 /// use timely::dataflow::Scope;
32 /// use timely::dataflow::operators::{Capture, ToStream};
33 /// use timely::dataflow::operators::capture::{EventLink, Replay, Extract};
34 ///
35 /// // get send and recv endpoints, wrap send to share
36 /// let (send, recv) = ::std::sync::mpsc::channel();
37 /// let send = Arc::new(Mutex::new(send));
38 ///
39 /// timely::execute(timely::Config::thread(), move |worker| {
40 ///
41 /// // this is only to validate the output.
42 /// let send = send.lock().unwrap().clone();
43 ///
44 /// // these are to capture/replay the stream.
45 /// let handle1 = Rc::new(EventLink::new());
46 /// let handle2 = Some(handle1.clone());
47 ///
48 /// worker.dataflow::<u64,_,_>(|scope1|
49 /// (0..10).to_stream(scope1)
50 /// .container::<Vec<_>>()
51 /// .capture_into(handle1)
52 /// );
53 ///
54 /// worker.dataflow(|scope2| {
55 /// handle2.replay_into(scope2)
56 /// .capture_into(send)
57 /// });
58 /// }).unwrap();
59 ///
60 /// assert_eq!(recv.extract()[0].1, (0..10).collect::<Vec<_>>());
61 /// ```
62 ///
63 /// The types `EventWriter<T, D, W>` and `EventReader<T, D, R>` can be
64 /// captured into and replayed from, respectively. They use binary writers
65 /// and readers respectively, and can be backed by files, network sockets,
66 /// etc.
67 ///
68 /// ```
69 /// use std::rc::Rc;
70 /// use std::net::{TcpListener, TcpStream};
71 /// use std::sync::{Arc, Mutex};
72 /// use timely::dataflow::Scope;
73 /// use timely::dataflow::operators::{Capture, ToStream};
74 /// use timely::dataflow::operators::capture::{EventReader, EventWriter, Replay, Extract};
75 ///
76 /// # #[cfg(miri)] fn main() {}
77 /// # #[cfg(not(miri))]
78 /// # fn main() {
79 /// // get send and recv endpoints, wrap send to share
80 /// let (send0, recv0) = ::std::sync::mpsc::channel();
81 /// let send0 = Arc::new(Mutex::new(send0));
82 ///
83 /// // these allow us to capture / replay a timely stream.
84 /// let list = TcpListener::bind("127.0.0.1:8001").unwrap();
85 /// let send = TcpStream::connect("127.0.0.1:8001").unwrap();
86 /// let recv = list.incoming().next().unwrap().unwrap();
87 /// recv.set_nonblocking(true).unwrap();
88 ///
89 /// std::thread::scope(move |s| {
90 /// s.spawn(move || timely::example(move |scope1| {
91 /// (0..10u64)
92 /// .to_stream(scope1)
93 /// .container::<Vec<_>>()
94 /// .capture_into(EventWriter::new(send))
95 /// }));
96 /// s.spawn(move || timely::example(move |scope2| {
97 /// // this is only to validate the output.
98 /// let send0 = send0.lock().unwrap().clone();
99 /// Some(EventReader::<_,Vec<u64>,_>::new(recv))
100 /// .replay_into(scope2)
101 /// .capture_into(send0)
102 /// }));
103 /// });
104 ///
105 /// assert_eq!(recv0.extract()[0].1, (0..10).collect::<Vec<_>>());
106 /// # }
107 /// ```
108 fn capture_into<P: EventPusher<T, C>+'static>(self, pusher: P);
109
110 /// Captures a stream using Rust's MPSC channels.
111 fn capture(self) -> ::std::sync::mpsc::Receiver<Event<T, C>> {
112 let (send, recv) = ::std::sync::mpsc::channel();
113 self.capture_into(send);
114 recv
115 }
116}
117
118impl<S: Scope, C: Container> Capture<S::Timestamp, C> for Stream<S, C> {
119 fn capture_into<P: EventPusher<S::Timestamp, C>+'static>(self, mut event_pusher: P) {
120
121 let mut builder = OperatorBuilder::new("Capture".to_owned(), self.scope());
122 let mut input = PullCounter::new(builder.new_input(self, Pipeline));
123 let mut started = false;
124
125 builder.build(
126 move |progress| {
127
128 if !started {
129 // discard initial capability.
130 progress.frontiers[0].update(Timestamp::minimum(), -1);
131 started = true;
132 }
133 if !progress.frontiers[0].is_empty() {
134 // transmit any frontier progress.
135 let to_send = ::std::mem::replace(&mut progress.frontiers[0], ChangeBatch::new());
136 event_pusher.push(Event::Progress(to_send.into_inner().to_vec()));
137 }
138
139 // turn each received message into an event.
140 while let Some(message) = input.next() {
141 let time = &message.time;
142 let data = &mut message.data;
143 let vector = std::mem::take(data);
144 event_pusher.push(Event::Messages(time.clone(), vector));
145 }
146 input.consumed().borrow_mut().drain_into(&mut progress.consumeds[0]);
147 false
148 }
149 );
150 }
151}