timely/dataflow/operators/core/capture/
event.rs1use columnar::Columnar;
8use serde::{Deserialize, Serialize};
9
10#[derive(Debug, Clone, Hash, Ord, PartialOrd, Eq, PartialEq, Deserialize, Serialize, Columnar)]
12pub enum Event<T, C> {
13 Progress(Vec<(T, i64)>),
15 Messages(T, C),
17}
18
19pub trait EventIterator<T: Clone, C: Clone> {
26 fn next(&mut self) -> Option<std::borrow::Cow<'_, Event<T, C>>>;
28}
29
30pub trait EventPusher<T, C> {
32 fn push(&mut self, event: Event<T, C>);
34}
35
36impl<T, C> EventPusher<T, C> for ::std::sync::mpsc::Sender<Event<T, C>> {
38 fn push(&mut self, event: Event<T, C>) {
39 let _ = self.send(event);
42 }
43}
44
45pub mod link {
47
48 use std::borrow::Cow;
49 use std::rc::Rc;
50 use std::cell::RefCell;
51
52 use super::{Event, EventPusher, EventIterator};
53
54 pub struct EventLink<T, C> {
56 pub event: Option<Event<T, C>>,
61 pub next: RefCell<Option<Rc<EventLink<T, C>>>>,
63 }
64
65 impl<T, C> EventLink<T, C> {
66 pub fn new() -> EventLink<T, C> {
68 EventLink { event: None, next: RefCell::new(None) }
69 }
70 }
71
72 impl<T, C> EventPusher<T, C> for Rc<EventLink<T, C>> {
74 fn push(&mut self, event: Event<T, C>) {
75 *self.next.borrow_mut() = Some(Rc::new(EventLink { event: Some(event), next: RefCell::new(None) }));
76 let next = Rc::clone(self.next.borrow().as_ref().unwrap());
77 *self = next;
78 }
79 }
80
81 impl<T: Clone, C: Clone> EventIterator<T, C> for Rc<EventLink<T, C>> {
82 fn next(&mut self) -> Option<Cow<'_, Event<T, C>>> {
83 let is_some = self.next.borrow().is_some();
84 if is_some {
85 let next = Rc::clone(self.next.borrow().as_ref().unwrap());
86 *self = next;
87 if let Some(this) = Rc::get_mut(self) {
88 this.event.take().map(Cow::Owned)
89 }
90 else {
91 self.event.as_ref().map(Cow::Borrowed)
92 }
93 }
94 else {
95 None
96 }
97 }
98 }
99
100 impl<T, C> Drop for EventLink<T, C> {
102 fn drop(&mut self) {
103 while let Some(link) = self.next.replace(None) {
104 if let Ok(head) = Rc::try_unwrap(link) {
105 *self = head;
106 }
107 }
108 }
109 }
110
111 impl<T, C> Default for EventLink<T, C> {
112 fn default() -> Self {
113 Self::new()
114 }
115 }
116
/// Builds a long chain of links and lets it go out of scope; the test passes if
/// dropping the chain completes.
#[test]
fn avoid_stack_overflow_in_drop() {
    // A much shorter chain is used when running under Miri.
    let limit = if cfg!(miri) { 1_000 } else { 1_000_000 };
    let mut event1 = Rc::new(EventLink::<(), ()>::new());
    // A second handle to the first link keeps the whole chain alive until scope end.
    let _event2 = Rc::clone(&event1);
    (0..limit).for_each(|_| event1.push(Event::Progress(vec![])));
}
129}
130
131pub mod link_sync {
133
134 use std::borrow::Cow;
135 use std::sync::{Arc, Mutex};
136
137 use super::{Event, EventPusher, EventIterator};
138
139 pub struct EventLink<T, C> {
141 pub event: Option<Event<T, C>>,
146 pub next: Mutex<Option<Arc<EventLink<T, C>>>>,
148 }
149
150 impl<T, C> EventLink<T, C> {
151 pub fn new() -> EventLink<T, C> {
153 EventLink { event: None, next: Mutex::new(None) }
154 }
155 }
156
157 impl<T, C> EventPusher<T, C> for Arc<EventLink<T, C>> {
158 fn push(&mut self, event: Event<T, C>) {
159 let mut guard = self.next.lock().unwrap();
160 *guard = Some(Arc::new(EventLink { event: Some(event), next: Mutex::new(None) }));
161 let next = Arc::clone(guard.as_ref().unwrap());
162 drop(guard);
163 *self = next;
164 }
165 }
166
167 impl<T: Clone, C: Clone> EventIterator<T, C> for Arc<EventLink<T, C>> {
168 fn next(&mut self) -> Option<Cow<'_, Event<T, C>>> {
169 let is_some = self.next.lock().unwrap().is_some();
170 if is_some {
171 let next = Arc::clone(self.next.lock().unwrap().as_ref().unwrap());
172 *self = next;
173 if let Some(this) = Arc::get_mut(self) {
174 this.event.take().map(Cow::Owned)
175 }
176 else {
177 self.event.as_ref().map(Cow::Borrowed)
178 }
179 }
180 else {
181 None
182 }
183 }
184 }
185
186 impl<T, C> Drop for EventLink<T, C> {
188 fn drop(&mut self) {
189 while let Some(link) = self.next.get_mut().unwrap().take() {
190 if let Ok(head) = Arc::try_unwrap(link) {
191 *self = head;
192 }
193 }
194 }
195 }
196
197 impl<T, C> Default for EventLink<T, C> {
198 fn default() -> Self {
199 Self::new()
200 }
201 }
202
/// Builds a long chain of links and lets it go out of scope; the test passes if
/// dropping the chain completes.
#[test]
fn avoid_stack_overflow_in_drop() {
    // A much shorter chain is used when running under Miri.
    let limit = if cfg!(miri) { 1_000 } else { 1_000_000 };
    let mut event1 = Arc::new(EventLink::<(), ()>::new());
    // A second handle to the first link keeps the whole chain alive until scope end.
    let _event2 = Arc::clone(&event1);
    (0..limit).for_each(|_| event1.push(Event::Progress(vec![])));
}
215}
216
217pub mod binary {
219
220 use std::borrow::Cow;
221 use std::io::ErrorKind;
222 use std::ops::DerefMut;
223 use std::sync::Arc;
224
225 use serde::{de::DeserializeOwned, Serialize};
226 use timely_communication::allocator::zero_copy::bytes_slab::{BytesRefill, BytesSlab};
227
228 use super::{Event, EventPusher, EventIterator};
229
/// Wraps a `std::io::Write` so that events of type `Event<T, C>` can be pushed into it.
pub struct EventWriter<T, C, W>
where
    W: ::std::io::Write,
{
    /// Destination of the encoded events.
    stream: W,
    /// Records the event type parameters; no `T` or `C` value is stored.
    phant: ::std::marker::PhantomData<(T, C)>,
}

impl<T, C, W> EventWriter<T, C, W>
where
    W: ::std::io::Write,
{
    /// Creates a writer that sends its output to `w`.
    pub fn new(w: W) -> Self {
        EventWriter {
            stream: w,
            phant: ::std::marker::PhantomData,
        }
    }
}
245
246 impl<T: Serialize, C: Serialize, W: ::std::io::Write> EventPusher<T, C> for EventWriter<T, C, W> {
247 fn push(&mut self, event: Event<T, C>) {
248 let len = ::bincode::serialized_size(&event).expect("Event bincode failed");
250 self.stream.write_all(&len.to_le_bytes()).expect("Event write failed");
251 ::bincode::serialize_into(&mut self.stream, &event).expect("Event bincode failed");
252 }
253 }
254
255 pub struct EventReader<T, C, R: ::std::io::Read> {
257 reader: R,
258 buf: BytesSlab,
259 phant: ::std::marker::PhantomData<(T, C)>,
260 }
261
262 impl<T, C, R: ::std::io::Read> EventReader<T, C, R> {
263 pub fn new(r: R) -> Self {
265 let refill = BytesRefill {
266 logic: Arc::new(|size| {
267 Box::new(vec![0_u8; size]) as Box<dyn DerefMut<Target = [u8]>>
268 }),
269 limit: None,
270 };
271 Self {
272 reader: r,
273 buf: BytesSlab::new(20, refill),
274 phant: ::std::marker::PhantomData,
275 }
276 }
277 }
278
279 impl<T: DeserializeOwned + Clone, C: DeserializeOwned + Clone, R: ::std::io::Read> EventIterator<T, C> for EventReader<T, C, R> {
280 fn next(&mut self) -> Option<Cow<'_, Event<T, C>>> {
281 self.buf.ensure_capacity(1);
282 match self.reader.read(self.buf.empty()) {
284 Ok(n) => self.buf.make_valid(n),
285 Err(e) if e.kind() == ErrorKind::WouldBlock => {}
286 Err(e) => panic!("read failed: {e}"),
287 };
288
289 let valid = self.buf.valid();
290 if valid.len() >= 8 {
291 let event_len = u64::from_le_bytes([
292 valid[0], valid[1], valid[2], valid[3], valid[4], valid[5], valid[6], valid[7],
293 ]);
294 let required_bytes = (event_len + 8) as usize;
295 if valid.len() >= required_bytes {
296 let bytes = self.buf.extract(required_bytes);
297 let event = ::bincode::deserialize(&bytes[8..]).expect("Event decode failed");
298 Some(Cow::Owned(event))
299 } else {
300 None
301 }
302 } else {
303 None
304 }
305 }
306 }
307}