Skip to main content

timely/dataflow/operators/core/capture/
event.rs

1//! Traits and types describing timely dataflow events.
2//!
3//! The `Event` type describes the information an operator can observe about a timely dataflow
4//! stream. There are two types of events, (i) the receipt of data and (ii) reports of progress
5//! of timestamps.
6
7use columnar::Columnar;
8use serde::{Deserialize, Serialize};
9
10/// Data and progress events of the captured stream.
11#[derive(Debug, Clone, Hash, Ord, PartialOrd, Eq, PartialEq, Deserialize, Serialize, Columnar)]
12pub enum Event<T, C> {
13    /// Progress received via `push_external_progress`.
14    Progress(Vec<(T, i64)>),
15    /// Messages received via the data stream.
16    Messages(T, C),
17}
18
19/// Iterates over contained `Event<T, C>`.
20///
21/// The `EventIterator` trait describes types that can iterate over `Cow`s of events,
22/// and which can be used to replay a stream into a new timely dataflow computation.
23///
24/// This method is not simply an iterator because of the lifetime in the result.
25pub trait EventIterator<T: Clone, C: Clone> {
26    /// Iterates over `Cow<Event<T, C>>` elements.
27    fn next(&mut self) -> Option<std::borrow::Cow<'_, Event<T, C>>>;
28}
29
30/// Receives `Event<T, C>` events.
31pub trait EventPusher<T, C> {
32    /// Provides a new `Event<T, D>` to the pusher.
33    fn push(&mut self, event: Event<T, C>);
34}
35
36// implementation for the linked list behind a `Handle`.
37impl<T, C> EventPusher<T, C> for ::std::sync::mpsc::Sender<Event<T, C>> {
38    fn push(&mut self, event: Event<T, C>) {
39        // NOTE: An Err(x) result just means "data not accepted" most likely
40        //       because the receiver is gone. No need to panic.
41        let _ = self.send(event);
42    }
43}
44
45/// A linked-list event pusher and iterator.
46pub mod link {
47
48    use std::borrow::Cow;
49    use std::rc::Rc;
50    use std::cell::RefCell;
51
52    use super::{Event, EventPusher, EventIterator};
53
54    /// A linked list of Event<T, C>.
55    pub struct EventLink<T, C> {
56        /// An event, if one exists.
57        ///
58        /// An event might not exist, if either we want to insert a `None` and have the output iterator pause,
59        /// or in the case of the very first linked list element, which has no event when constructed.
60        pub event: Option<Event<T, C>>,
61        /// The next event, if it exists.
62        pub next: RefCell<Option<Rc<EventLink<T, C>>>>,
63    }
64
65    impl<T, C> EventLink<T, C> {
66        /// Allocates a new `EventLink`.
67        pub fn new() -> EventLink<T, C> {
68            EventLink { event: None, next: RefCell::new(None) }
69        }
70    }
71
72    // implementation for the linked list behind a `Handle`.
73    impl<T, C> EventPusher<T, C> for Rc<EventLink<T, C>> {
74        fn push(&mut self, event: Event<T, C>) {
75            *self.next.borrow_mut() = Some(Rc::new(EventLink { event: Some(event), next: RefCell::new(None) }));
76            let next = Rc::clone(self.next.borrow().as_ref().unwrap());
77            *self = next;
78        }
79    }
80
81    impl<T: Clone, C: Clone> EventIterator<T, C> for Rc<EventLink<T, C>> {
82        fn next(&mut self) -> Option<Cow<'_, Event<T, C>>> {
83            let is_some = self.next.borrow().is_some();
84            if is_some {
85                let next = Rc::clone(self.next.borrow().as_ref().unwrap());
86                *self = next;
87                if let Some(this) = Rc::get_mut(self) {
88                    this.event.take().map(Cow::Owned)
89                }
90                else {
91                    self.event.as_ref().map(Cow::Borrowed)
92                }
93            }
94            else {
95                None
96            }
97        }
98    }
99
100    // Drop implementation to prevent stack overflow through naive drop impl.
101    impl<T, C> Drop for EventLink<T, C> {
102        fn drop(&mut self) {
103            while let Some(link) = self.next.replace(None) {
104                if let Ok(head) = Rc::try_unwrap(link) {
105                    *self = head;
106                }
107            }
108        }
109    }
110
111    impl<T, C> Default for EventLink<T, C> {
112        fn default() -> Self {
113            Self::new()
114        }
115    }
116
117    #[test]
118    fn avoid_stack_overflow_in_drop() {
119        #[cfg(miri)]
120        let limit = 1_000;
121        #[cfg(not(miri))]
122        let limit = 1_000_000;
123        let mut event1 = Rc::new(EventLink::<(),()>::new());
124        let _event2 = Rc::clone(&event1);
125        for _ in 0 .. limit {
126            event1.push(Event::Progress(vec![]));
127        }
128    }
129}
130
131/// A thread-safe linked-list event pusher and iterator.
132pub mod link_sync {
133
134    use std::borrow::Cow;
135    use std::sync::{Arc, Mutex};
136
137    use super::{Event, EventPusher, EventIterator};
138
139    /// A linked list of Event<T, C> usable across threads.
140    pub struct EventLink<T, C> {
141        /// An event, if one exists.
142        ///
143        /// An event might not exist, if either we want to insert a `None` and have the output iterator pause,
144        /// or in the case of the very first linked list element, which has no event when constructed.
145        pub event: Option<Event<T, C>>,
146        /// The next event, if it exists.
147        pub next: Mutex<Option<Arc<EventLink<T, C>>>>,
148    }
149
150    impl<T, C> EventLink<T, C> {
151        /// Allocates a new `EventLink`.
152        pub fn new() -> EventLink<T, C> {
153            EventLink { event: None, next: Mutex::new(None) }
154        }
155    }
156
157    impl<T, C> EventPusher<T, C> for Arc<EventLink<T, C>> {
158        fn push(&mut self, event: Event<T, C>) {
159            let mut guard = self.next.lock().unwrap();
160            *guard = Some(Arc::new(EventLink { event: Some(event), next: Mutex::new(None) }));
161            let next = Arc::clone(guard.as_ref().unwrap());
162            drop(guard);
163            *self = next;
164        }
165    }
166
167    impl<T: Clone, C: Clone> EventIterator<T, C> for Arc<EventLink<T, C>> {
168        fn next(&mut self) -> Option<Cow<'_, Event<T, C>>> {
169            let is_some = self.next.lock().unwrap().is_some();
170            if is_some {
171                let next = Arc::clone(self.next.lock().unwrap().as_ref().unwrap());
172                *self = next;
173                if let Some(this) = Arc::get_mut(self) {
174                    this.event.take().map(Cow::Owned)
175                }
176                else {
177                    self.event.as_ref().map(Cow::Borrowed)
178                }
179            }
180            else {
181                None
182            }
183        }
184    }
185
186    // Drop implementation to prevent stack overflow through naive drop impl.
187    impl<T, C> Drop for EventLink<T, C> {
188        fn drop(&mut self) {
189            while let Some(link) = self.next.get_mut().unwrap().take() {
190                if let Ok(head) = Arc::try_unwrap(link) {
191                    *self = head;
192                }
193            }
194        }
195    }
196
197    impl<T, C> Default for EventLink<T, C> {
198        fn default() -> Self {
199            Self::new()
200        }
201    }
202
203    #[test]
204    fn avoid_stack_overflow_in_drop() {
205        #[cfg(miri)]
206        let limit = 1_000;
207        #[cfg(not(miri))]
208        let limit = 1_000_000;
209        let mut event1 = Arc::new(EventLink::<(),()>::new());
210        let _event2 = Arc::clone(&event1);
211        for _ in 0 .. limit {
212            event1.push(Event::Progress(vec![]));
213        }
214    }
215}
216
217/// A binary event pusher and iterator.
218pub mod binary {
219
220    use std::borrow::Cow;
221    use std::io::ErrorKind;
222    use std::ops::DerefMut;
223    use std::sync::Arc;
224
225    use serde::{de::DeserializeOwned, Serialize};
226    use timely_communication::allocator::zero_copy::bytes_slab::{BytesRefill, BytesSlab};
227
228    use super::{Event, EventPusher, EventIterator};
229
230    /// A wrapper for `W: Write` implementing `EventPusher<T, C>`.
231    pub struct EventWriter<T, C, W: ::std::io::Write> {
232        stream: W,
233        phant: ::std::marker::PhantomData<(T, C)>,
234    }
235
236    impl<T, C, W: ::std::io::Write> EventWriter<T, C, W> {
237        /// Allocates a new `EventWriter` wrapping a supplied writer.
238        pub fn new(w: W) -> Self {
239            Self {
240                stream: w,
241                phant: ::std::marker::PhantomData,
242            }
243        }
244    }
245
246    impl<T: Serialize, C: Serialize, W: ::std::io::Write> EventPusher<T, C> for EventWriter<T, C, W> {
247        fn push(&mut self, event: Event<T, C>) {
248            // TODO: `push` has no mechanism to report errors, so we `unwrap`.
249            let len = ::bincode::serialized_size(&event).expect("Event bincode failed");
250            self.stream.write_all(&len.to_le_bytes()).expect("Event write failed");
251            ::bincode::serialize_into(&mut self.stream, &event).expect("Event bincode failed");
252        }
253    }
254
255    /// A Wrapper for `R: Read` implementing `EventIterator<T, D>`.
256    pub struct EventReader<T, C, R: ::std::io::Read> {
257        reader: R,
258        buf: BytesSlab,
259        phant: ::std::marker::PhantomData<(T, C)>,
260    }
261
262    impl<T, C, R: ::std::io::Read> EventReader<T, C, R> {
263        /// Allocates a new `EventReader` wrapping a supplied reader.
264        pub fn new(r: R) -> Self {
265            let refill = BytesRefill {
266                logic: Arc::new(|size| {
267                    Box::new(vec![0_u8; size]) as Box<dyn DerefMut<Target = [u8]>>
268                }),
269                limit: None,
270            };
271            Self {
272                reader: r,
273                buf: BytesSlab::new(20, refill),
274                phant: ::std::marker::PhantomData,
275            }
276        }
277    }
278
279    impl<T: DeserializeOwned + Clone, C: DeserializeOwned + Clone, R: ::std::io::Read> EventIterator<T, C> for EventReader<T, C, R> {
280        fn next(&mut self) -> Option<Cow<'_, Event<T, C>>> {
281            self.buf.ensure_capacity(1);
282            // Attempt to read some more bytes into self.buffer.
283            match self.reader.read(self.buf.empty()) {
284                Ok(n) => self.buf.make_valid(n),
285                Err(e) if e.kind() == ErrorKind::WouldBlock => {}
286                Err(e) => panic!("read failed: {e}"),
287            };
288
289            let valid = self.buf.valid();
290            if valid.len() >= 8 {
291                let event_len = u64::from_le_bytes([
292                    valid[0], valid[1], valid[2], valid[3], valid[4], valid[5], valid[6], valid[7],
293                ]);
294                let required_bytes = (event_len + 8) as usize;
295                if valid.len() >= required_bytes {
296                    let bytes = self.buf.extract(required_bytes);
297                    let event = ::bincode::deserialize(&bytes[8..]).expect("Event decode failed");
298                    Some(Cow::Owned(event))
299                } else {
300                    None
301                }
302            } else {
303                None
304            }
305        }
306    }
307}