differential_dataflow/operators/arrange/agent.rs
1//! Shared read access to a trace.
2
3use std::rc::{Rc, Weak};
4use std::cell::RefCell;
5use std::collections::VecDeque;
6
7use timely::dataflow::Scope;
8use timely::dataflow::operators::generic::{OperatorInfo, source};
9use timely::progress::Timestamp;
10use timely::progress::{Antichain, frontier::AntichainRef};
11use timely::dataflow::operators::CapabilitySet;
12
13use crate::trace::{Trace, TraceReader, BatchReader};
14use crate::trace::wrappers::rc::TraceBox;
15
16use timely::scheduling::Activator;
17
18use super::{TraceWriter, TraceAgentQueueWriter, TraceAgentQueueReader, Arranged};
19use super::TraceReplayInstruction;
20
21use crate::trace::wrappers::frontier::{TraceFrontier, BatchFrontier};
22
23
/// A `TraceReader` wrapper which can be imported into other dataflows.
///
/// The `TraceAgent` is the default trace type produced by `arranged`, and it can be extracted
/// from the dataflow in which it was defined, and imported into other dataflows.
pub struct TraceAgent<Tr: TraceReader> {
    /// The wrapped trace, shared (via `Rc`) with every clone of this agent.
    trace: Rc<RefCell<TraceBox<Tr>>>,
    /// Weak handle to the list of listener queues; `new_listener` registers new queues here.
    queues: Weak<RefCell<Vec<TraceAgentQueueWriter<Tr>>>>,
    /// The logical compaction frontier this agent currently has registered with `trace`.
    logical_compaction: Antichain<Tr::Time>,
    /// The physical compaction frontier this agent currently has registered with `trace`.
    physical_compaction: Antichain<Tr::Time>,
    /// Scratch antichain used while computing a new compaction frontier; cleared after each use.
    temp_antichain: Antichain<Tr::Time>,

    /// The `OperatorInfo` of the underlying Timely operator, exposed through `operator()`.
    operator: OperatorInfo,
    /// Optional logger that receives a `TraceShare` event when an agent is created, cloned, or dropped.
    logging: Option<crate::logging::Logger>,
}
38
39use crate::trace::implementations::WithLayout;
// The agent adopts the layout of the trace type it wraps.
impl<Tr: TraceReader> WithLayout for TraceAgent<Tr> {
    // Forwarded unchanged from the wrapped trace type.
    type Layout = Tr::Layout;
}
43
44impl<Tr: TraceReader> TraceReader for TraceAgent<Tr> {
45
46 type Batch = Tr::Batch;
47 type Storage = Tr::Storage;
48 type Cursor = Tr::Cursor;
49
50 fn set_logical_compaction(&mut self, frontier: AntichainRef<Tr::Time>) {
51 // This method does not enforce that `frontier` is greater or equal to `self.logical_compaction`.
52 // Instead, it determines the joint consequences of both guarantees and moves forward with that.
53 crate::lattice::antichain_join_into(&self.logical_compaction.borrow()[..], &frontier[..], &mut self.temp_antichain);
54 self.trace.borrow_mut().adjust_logical_compaction(self.logical_compaction.borrow(), self.temp_antichain.borrow());
55 ::std::mem::swap(&mut self.logical_compaction, &mut self.temp_antichain);
56 self.temp_antichain.clear();
57 }
58 fn get_logical_compaction(&mut self) -> AntichainRef<'_, Tr::Time> {
59 self.logical_compaction.borrow()
60 }
61 fn set_physical_compaction(&mut self, frontier: AntichainRef<'_, Tr::Time>) {
62 // This method does not enforce that `frontier` is greater or equal to `self.physical_compaction`.
63 // Instead, it determines the joint consequences of both guarantees and moves forward with that.
64 crate::lattice::antichain_join_into(&self.physical_compaction.borrow()[..], &frontier[..], &mut self.temp_antichain);
65 self.trace.borrow_mut().adjust_physical_compaction(self.physical_compaction.borrow(), self.temp_antichain.borrow());
66 ::std::mem::swap(&mut self.physical_compaction, &mut self.temp_antichain);
67 self.temp_antichain.clear();
68 }
69 fn get_physical_compaction(&mut self) -> AntichainRef<'_, Tr::Time> {
70 self.physical_compaction.borrow()
71 }
72 fn cursor_through(&mut self, frontier: AntichainRef<'_, Tr::Time>) -> Option<(Self::Cursor, Self::Storage)> {
73 self.trace.borrow_mut().trace.cursor_through(frontier)
74 }
75 fn map_batches<F: FnMut(&Self::Batch)>(&self, f: F) { self.trace.borrow().trace.map_batches(f) }
76}
77
78impl<Tr: TraceReader> TraceAgent<Tr> {
79 /// Creates a new agent from a trace reader.
80 pub fn new(trace: Tr, operator: OperatorInfo, logging: Option<crate::logging::Logger>) -> (Self, TraceWriter<Tr>)
81 where
82 Tr: Trace,
83 {
84 let trace = Rc::new(RefCell::new(TraceBox::new(trace)));
85 let queues = Rc::new(RefCell::new(Vec::new()));
86
87 if let Some(logging) = &logging {
88 logging.log(
89 crate::logging::TraceShare { operator: operator.global_id, diff: 1 }
90 );
91 }
92
93 let reader = TraceAgent {
94 trace: trace.clone(),
95 queues: Rc::downgrade(&queues),
96 logical_compaction: trace.borrow().logical_compaction.frontier().to_owned(),
97 physical_compaction: trace.borrow().physical_compaction.frontier().to_owned(),
98 temp_antichain: Antichain::new(),
99 operator,
100 logging,
101 };
102
103 let writer = TraceWriter::new(
104 vec![<Tr::Time as Timestamp>::minimum()],
105 Rc::downgrade(&trace),
106 queues,
107 );
108
109 (reader, writer)
110 }
111
112 /// Attaches a new shared queue to the trace.
113 ///
114 /// The queue is first populated with existing batches from the trace,
115 /// The queue will be immediately populated with existing historical batches from the trace, and until the reference
116 /// is dropped the queue will receive new batches as produced by the source `arrange` operator.
117 pub fn new_listener(&mut self, activator: Activator) -> TraceAgentQueueReader<Tr>
118 {
119 // create a new queue for progress and batch information.
120 let mut new_queue = VecDeque::new();
121
122 // add the existing batches from the trace
123 let mut upper = None;
124 self.trace
125 .borrow_mut()
126 .trace
127 .map_batches(|batch| {
128 new_queue.push_back(TraceReplayInstruction::Batch(batch.clone(), Some(<Tr::Time as Timestamp>::minimum())));
129 upper = Some(batch.upper().clone());
130 });
131
132 if let Some(upper) = upper {
133 new_queue.push_back(TraceReplayInstruction::Frontier(upper));
134 }
135
136 let reference = Rc::new((activator, RefCell::new(new_queue)));
137
138 // wraps the queue in a ref-counted ref cell and enqueue/return it.
139 if let Some(queue) = self.queues.upgrade() {
140 queue.borrow_mut().push(Rc::downgrade(&reference));
141 }
142 reference.0.activate();
143 reference
144 }
145
146 /// The [OperatorInfo] of the underlying Timely operator
147 pub fn operator(&self) -> &OperatorInfo {
148 &self.operator
149 }
150
151 /// Obtain a reference to the inner [`TraceBox`]. It is the caller's obligation to maintain
152 /// the trace box and this trace agent's invariants. Specifically, it is undefined behavior
153 /// to mutate the trace box. Keeping strong references can prevent resource reclamation.
154 ///
155 /// This method is subject to changes and removal and should not be considered part of a stable
156 /// interface.
157 pub fn trace_box_unstable(&self) -> Rc<RefCell<TraceBox<Tr>>> {
158 Rc::clone(&self.trace)
159 }
160}
161
162impl<Tr: TraceReader+'static> TraceAgent<Tr> {
163 /// Copies an existing collection into the supplied scope.
164 ///
165 /// This method creates an `Arranged` collection that should appear indistinguishable from applying `arrange`
166 /// directly to the source collection brought into the local scope. The only caveat is that the initial state
167 /// of the collection is its current state, and updates occur from this point forward. The historical changes
168 /// the collection experienced in the past are accumulated, and the distinctions from the initial collection
169 /// are no longer evident.
170 ///
171 /// The current behavior is that the introduced collection accumulates updates to some times less or equal
172 /// to `self.get_logical_compaction()`. There is *not* currently a guarantee that the updates are accumulated *to*
173 /// the frontier, and the resulting collection history may be weirdly partial until this point. In particular,
174 /// the historical collection may move through configurations that did not actually occur, even if eventually
175 /// arriving at the correct collection. This is probably a bug; although we get to the right place in the end,
176 /// the intermediate computation could do something that the original computation did not, like diverge.
177 ///
178 /// I would expect the semantics to improve to "updates are advanced to `self.get_logical_compaction()`", which
179 /// means the computation will run as if starting from exactly this frontier. It is not currently clear whose
180 /// responsibility this should be (the trace/batch should only reveal these times, or an operator should know
181 /// to advance times before using them).
182 ///
183 /// # Examples
184 ///
185 /// ```
186 /// use timely::Config;
187 /// use differential_dataflow::input::Input;
188 /// use differential_dataflow::operators::arrange::ArrangeBySelf;
189 /// use differential_dataflow::operators::reduce::Reduce;
190 /// use differential_dataflow::trace::Trace;
191 ///
192 /// ::timely::execute(Config::thread(), |worker| {
193 ///
194 /// // create a first dataflow
195 /// let mut trace = worker.dataflow::<u32,_,_>(|scope| {
196 /// // create input handle and collection.
197 /// scope.new_collection_from(0 .. 10).1
198 /// .arrange_by_self()
199 /// .trace
200 /// });
201 ///
202 /// // do some work.
203 /// worker.step();
204 /// worker.step();
205 ///
206 /// // create a second dataflow
207 /// worker.dataflow(move |scope| {
208 /// trace.import(scope)
209 /// .reduce(move |_key, src, dst| dst.push((*src[0].0, 1)));
210 /// });
211 ///
212 /// }).unwrap();
213 /// ```
214 pub fn import<G>(&mut self, scope: &G) -> Arranged<G, TraceAgent<Tr>>
215 where
216 G: Scope<Timestamp=Tr::Time>,
217 {
218 self.import_named(scope, "ArrangedSource")
219 }
220
221 /// Same as `import`, but allows to name the source.
222 pub fn import_named<G>(&mut self, scope: &G, name: &str) -> Arranged<G, TraceAgent<Tr>>
223 where
224 G: Scope<Timestamp=Tr::Time>,
225 {
226 // Drop ShutdownButton and return only the arrangement.
227 self.import_core(scope, name).0
228 }
229
230 /// Imports an arrangement into the supplied scope.
231 ///
232 /// # Examples
233 ///
234 /// ```
235 /// use timely::Config;
236 /// use timely::dataflow::ProbeHandle;
237 /// use timely::dataflow::operators::Probe;
238 /// use differential_dataflow::input::InputSession;
239 /// use differential_dataflow::operators::arrange::ArrangeBySelf;
240 /// use differential_dataflow::operators::reduce::Reduce;
241 /// use differential_dataflow::trace::Trace;
242 ///
243 /// ::timely::execute(Config::thread(), |worker| {
244 ///
245 /// let mut input = InputSession::<_,(),isize>::new();
246 /// let mut probe = ProbeHandle::new();
247 ///
248 /// // create a first dataflow
249 /// let mut trace = worker.dataflow::<u32,_,_>(|scope| {
250 /// // create input handle and collection.
251 /// input.to_collection(scope)
252 /// .arrange_by_self()
253 /// .trace
254 /// });
255 ///
256 /// // do some work.
257 /// worker.step();
258 /// worker.step();
259 ///
260 /// // create a second dataflow
261 /// let mut shutdown = worker.dataflow(|scope| {
262 /// let (arrange, button) = trace.import_core(scope, "Import");
263 /// arrange.stream.probe_with(&mut probe);
264 /// button
265 /// });
266 ///
267 /// worker.step();
268 /// worker.step();
269 /// assert!(!probe.done());
270 ///
271 /// shutdown.press();
272 ///
273 /// worker.step();
274 /// worker.step();
275 /// assert!(probe.done());
276 ///
277 /// }).unwrap();
278 /// ```
279 pub fn import_core<G>(&mut self, scope: &G, name: &str) -> (Arranged<G, TraceAgent<Tr>>, ShutdownButton<CapabilitySet<Tr::Time>>)
280 where
281 G: Scope<Timestamp=Tr::Time>,
282 {
283 let trace = self.clone();
284
285 let mut shutdown_button = None;
286
287 let stream = {
288
289 let shutdown_button_ref = &mut shutdown_button;
290 source(scope, name, move |capability, info| {
291
292 let capabilities = Rc::new(RefCell::new(Some(CapabilitySet::new())));
293
294 let activator = scope.activator_for(Rc::clone(&info.address));
295 let queue = self.new_listener(activator);
296
297 let activator = scope.activator_for(info.address);
298 *shutdown_button_ref = Some(ShutdownButton::new(capabilities.clone(), activator));
299
300 capabilities.borrow_mut().as_mut().unwrap().insert(capability);
301
302 move |output| {
303
304 let mut capabilities = capabilities.borrow_mut();
305 if let Some(ref mut capabilities) = *capabilities {
306
307 let mut borrow = queue.1.borrow_mut();
308 for instruction in borrow.drain(..) {
309 match instruction {
310 TraceReplayInstruction::Frontier(frontier) => {
311 capabilities.downgrade(&frontier.borrow()[..]);
312 },
313 TraceReplayInstruction::Batch(batch, hint) => {
314 if let Some(time) = hint {
315 if !batch.is_empty() {
316 let delayed = capabilities.delayed(&time);
317 output.session(&delayed).give(batch);
318 }
319 }
320 }
321 }
322 }
323 }
324 }
325 })
326 };
327
328 (Arranged { stream, trace }, shutdown_button.unwrap())
329 }
330
331 /// Imports an arrangement into the supplied scope.
332 ///
333 /// This variant of import uses the `get_logical_compaction` to forcibly advance timestamps in updates.
334 ///
335 /// # Examples
336 ///
337 /// ```
338 /// use timely::Config;
339 /// use timely::progress::frontier::AntichainRef;
340 /// use timely::dataflow::ProbeHandle;
341 /// use timely::dataflow::operators::Probe;
342 /// use timely::dataflow::operators::Inspect;
343 /// use differential_dataflow::input::InputSession;
344 /// use differential_dataflow::operators::arrange::ArrangeBySelf;
345 /// use differential_dataflow::operators::reduce::Reduce;
346 /// use differential_dataflow::trace::Trace;
347 /// use differential_dataflow::trace::TraceReader;
348 /// use differential_dataflow::input::Input;
349 ///
350 /// ::timely::execute(Config::thread(), |worker| {
351 ///
352 /// let mut probe = ProbeHandle::new();
353 ///
354 /// // create a first dataflow
355 /// let (mut handle, mut trace) = worker.dataflow::<u32,_,_>(|scope| {
356 /// // create input handle and collection.
357 /// let (handle, stream) = scope.new_collection();
358 /// let trace = stream.arrange_by_self().trace;
359 /// (handle, trace)
360 /// });
361 ///
362 /// handle.insert(0); handle.advance_to(1); handle.flush(); worker.step();
363 /// handle.remove(0); handle.advance_to(2); handle.flush(); worker.step();
364 /// handle.insert(1); handle.advance_to(3); handle.flush(); worker.step();
365 /// handle.remove(1); handle.advance_to(4); handle.flush(); worker.step();
366 /// handle.insert(0); handle.advance_to(5); handle.flush(); worker.step();
367 ///
368 /// trace.set_logical_compaction(AntichainRef::new(&[5]));
369 ///
370 /// // create a second dataflow
371 /// let mut shutdown = worker.dataflow(|scope| {
372 /// let (arrange, button) = trace.import_frontier(scope, "Import");
373 /// arrange
374 /// .as_collection(|k,v| (*k,*v))
375 /// .inner
376 /// .inspect(|(d,t,r)| {
377 /// assert!(t >= &5);
378 /// })
379 /// .probe_with(&mut probe);
380 ///
381 /// button
382 /// });
383 ///
384 /// worker.step();
385 /// worker.step();
386 /// assert!(!probe.done());
387 ///
388 /// shutdown.press();
389 ///
390 /// worker.step();
391 /// worker.step();
392 /// assert!(probe.done());
393 ///
394 /// }).unwrap();
395 /// ```
396 pub fn import_frontier<G>(&mut self, scope: &G, name: &str) -> (Arranged<G, TraceFrontier<TraceAgent<Tr>>>, ShutdownButton<CapabilitySet<Tr::Time>>)
397 where
398 G: Scope<Timestamp=Tr::Time>,
399 Tr: TraceReader,
400 {
401 // This frontier describes our only guarantee on the compaction frontier.
402 let since = self.get_logical_compaction().to_owned();
403 self.import_frontier_core(scope, name, since, Antichain::new())
404 }
405
406 /// Import a trace restricted to a specific time interval `[since, until)`.
407 ///
408 /// All updates present in the input trace will be first advanced to `since`, and then either emitted,
409 /// or if greater or equal to `until`, suppressed. Once all times are certain to be greater or equal
410 /// to `until` the operator capability will be dropped.
411 ///
412 /// Invoking this method with an `until` of `Antichain::new()` will perform no filtering, as the empty
413 /// frontier indicates the end of times.
414 pub fn import_frontier_core<G>(&mut self, scope: &G, name: &str, since: Antichain<Tr::Time>, until: Antichain<Tr::Time>) -> (Arranged<G, TraceFrontier<TraceAgent<Tr>>>, ShutdownButton<CapabilitySet<Tr::Time>>)
415 where
416 G: Scope<Timestamp=Tr::Time>,
417 Tr: TraceReader,
418 {
419 let trace = self.clone();
420 let trace = TraceFrontier::make_from(trace, since.borrow(), until.borrow());
421
422 let mut shutdown_button = None;
423
424 let stream = {
425
426 let shutdown_button_ref = &mut shutdown_button;
427 source(scope, name, move |capability, info| {
428
429 let capabilities = Rc::new(RefCell::new(Some(CapabilitySet::new())));
430
431 let activator = scope.activator_for(Rc::clone(&info.address));
432 let queue = self.new_listener(activator);
433
434 let activator = scope.activator_for(info.address);
435 *shutdown_button_ref = Some(ShutdownButton::new(capabilities.clone(), activator));
436
437 capabilities.borrow_mut().as_mut().unwrap().insert(capability);
438
439 move |output| {
440
441 let mut capabilities = capabilities.borrow_mut();
442 if let Some(ref mut capabilities) = *capabilities {
443 let mut borrow = queue.1.borrow_mut();
444 for instruction in borrow.drain(..) {
445 // If we have dropped the capabilities due to `until`, attempt no further work.
446 // Without the capabilities, we should soon be shut down (once this loop ends).
447 if !capabilities.is_empty() {
448 match instruction {
449 TraceReplayInstruction::Frontier(frontier) => {
450 if timely::PartialOrder::less_equal(&until, &frontier) {
451 // It might be nice to actively *drop* `capabilities`, but it seems
452 // complicated logically (i.e. we'd have to break out of the loop).
453 capabilities.downgrade(&[]);
454 } else {
455 capabilities.downgrade(&frontier.borrow()[..]);
456 }
457 },
458 TraceReplayInstruction::Batch(batch, hint) => {
459 if let Some(time) = hint {
460 if !batch.is_empty() {
461 let delayed = capabilities.delayed(&time);
462 output.session(&delayed).give(BatchFrontier::make_from(batch, since.borrow(), until.borrow()));
463 }
464 }
465 }
466 }
467 }
468 }
469 }
470 }
471 })
472 };
473
474 (Arranged { stream, trace }, shutdown_button.unwrap())
475 }
476}
477
478
479
/// Wrapper that can drop shared references.
pub struct ShutdownButton<T> {
    /// Shared slot holding the objects to drop; `press` sets it to `None`.
    reference: Rc<RefCell<Option<T>>>,
    /// Activated by `press` right after the slot has been cleared.
    activator: Activator,
}
485
486impl<T> ShutdownButton<T> {
487 /// Creates a new ShutdownButton.
488 pub fn new(reference: Rc<RefCell<Option<T>>>, activator: Activator) -> Self {
489 Self { reference, activator }
490 }
491 /// Push the shutdown button, dropping the shared objects.
492 pub fn press(&mut self) {
493 *self.reference.borrow_mut() = None;
494 self.activator.activate();
495 }
496 /// Hotwires the button to one that is pressed if dropped.
497 pub fn press_on_drop(self) -> ShutdownDeadmans<T> {
498 ShutdownDeadmans {
499 button: self
500 }
501 }
502}
503
/// A deadman's switch version of a shutdown button.
///
/// This type hosts a shutdown button and will press it when dropped.
pub struct ShutdownDeadmans<T> {
    /// The hosted button, pressed by this type's `Drop` implementation.
    button: ShutdownButton<T>,
}
510
511impl<T> Drop for ShutdownDeadmans<T> {
512 fn drop(&mut self) {
513 self.button.press();
514 }
515}
516
517impl<Tr: TraceReader> Clone for TraceAgent<Tr> {
518 fn clone(&self) -> Self {
519
520 if let Some(logging) = &self.logging {
521 logging.log(
522 crate::logging::TraceShare { operator: self.operator.global_id, diff: 1 }
523 );
524 }
525
526 // increase counts for wrapped `TraceBox`.
527 let empty_frontier = Antichain::new();
528 self.trace.borrow_mut().adjust_logical_compaction(empty_frontier.borrow(), self.logical_compaction.borrow());
529 self.trace.borrow_mut().adjust_physical_compaction(empty_frontier.borrow(), self.physical_compaction.borrow());
530
531 TraceAgent {
532 trace: self.trace.clone(),
533 queues: self.queues.clone(),
534 logical_compaction: self.logical_compaction.clone(),
535 physical_compaction: self.physical_compaction.clone(),
536 operator: self.operator.clone(),
537 logging: self.logging.clone(),
538 temp_antichain: Antichain::new(),
539 }
540 }
541}
542
543impl<Tr: TraceReader> Drop for TraceAgent<Tr> {
544 fn drop(&mut self) {
545
546 if let Some(logging) = &self.logging {
547 logging.log(
548 crate::logging::TraceShare { operator: self.operator.global_id, diff: -1 }
549 );
550 }
551
552 // decrement borrow counts to remove all holds
553 let empty_frontier = Antichain::new();
554 self.trace.borrow_mut().adjust_logical_compaction(self.logical_compaction.borrow(), empty_frontier.borrow());
555 self.trace.borrow_mut().adjust_physical_compaction(self.physical_compaction.borrow(), empty_frontier.borrow());
556 }
557}