differential_dataflow/operators/arrange/agent.rs
1//! Shared read access to a trace.
2
3use std::rc::{Rc, Weak};
4use std::cell::RefCell;
5use std::collections::VecDeque;
6
7use timely::dataflow::Scope;
8use timely::dataflow::operators::generic::{OperatorInfo, source};
9use timely::progress::Timestamp;
10use timely::progress::{Antichain, frontier::AntichainRef};
11use timely::dataflow::operators::CapabilitySet;
12
13use crate::trace::{Trace, TraceReader, BatchReader};
14use crate::trace::wrappers::rc::TraceBox;
15
16use timely::scheduling::Activator;
17
18use super::{TraceWriter, TraceAgentQueueWriter, TraceAgentQueueReader, Arranged};
19use super::TraceReplayInstruction;
20
21use crate::trace::wrappers::frontier::{TraceFrontier, BatchFrontier};
22
23
24/// A `TraceReader` wrapper which can be imported into other dataflows.
25///
26/// The `TraceAgent` is the default trace type produced by `arranged`, and it can be extracted
27/// from the dataflow in which it was defined, and imported into other dataflows.
28pub struct TraceAgent<Tr: TraceReader> {
29 trace: Rc<RefCell<TraceBox<Tr>>>,
30 queues: Weak<RefCell<Vec<TraceAgentQueueWriter<Tr>>>>,
31 logical_compaction: Antichain<Tr::Time>,
32 physical_compaction: Antichain<Tr::Time>,
33 temp_antichain: Antichain<Tr::Time>,
34
35 operator: OperatorInfo,
36 logging: Option<crate::logging::Logger>,
37}
38
39impl<Tr: TraceReader> TraceReader for TraceAgent<Tr> {
40 type Key<'a> = Tr::Key<'a>;
41 type Val<'a> = Tr::Val<'a>;
42 type Time = Tr::Time;
43 type TimeGat<'a> = Tr::TimeGat<'a>;
44 type Diff = Tr::Diff;
45 type DiffGat<'a> = Tr::DiffGat<'a>;
46
47 type Batch = Tr::Batch;
48 type Storage = Tr::Storage;
49 type Cursor = Tr::Cursor;
50
51 fn set_logical_compaction(&mut self, frontier: AntichainRef<Tr::Time>) {
52 // This method does not enforce that `frontier` is greater or equal to `self.logical_compaction`.
53 // Instead, it determines the joint consequences of both guarantees and moves forward with that.
54 crate::lattice::antichain_join_into(&self.logical_compaction.borrow()[..], &frontier[..], &mut self.temp_antichain);
55 self.trace.borrow_mut().adjust_logical_compaction(self.logical_compaction.borrow(), self.temp_antichain.borrow());
56 ::std::mem::swap(&mut self.logical_compaction, &mut self.temp_antichain);
57 self.temp_antichain.clear();
58 }
59 fn get_logical_compaction(&mut self) -> AntichainRef<Tr::Time> {
60 self.logical_compaction.borrow()
61 }
62 fn set_physical_compaction(&mut self, frontier: AntichainRef<Tr::Time>) {
63 // This method does not enforce that `frontier` is greater or equal to `self.physical_compaction`.
64 // Instead, it determines the joint consequences of both guarantees and moves forward with that.
65 crate::lattice::antichain_join_into(&self.physical_compaction.borrow()[..], &frontier[..], &mut self.temp_antichain);
66 self.trace.borrow_mut().adjust_physical_compaction(self.physical_compaction.borrow(), self.temp_antichain.borrow());
67 ::std::mem::swap(&mut self.physical_compaction, &mut self.temp_antichain);
68 self.temp_antichain.clear();
69 }
70 fn get_physical_compaction(&mut self) -> AntichainRef<Tr::Time> {
71 self.physical_compaction.borrow()
72 }
73 fn cursor_through(&mut self, frontier: AntichainRef<Tr::Time>) -> Option<(Self::Cursor, Self::Storage)> {
74 self.trace.borrow_mut().trace.cursor_through(frontier)
75 }
76 fn map_batches<F: FnMut(&Self::Batch)>(&self, f: F) { self.trace.borrow().trace.map_batches(f) }
77}
78
79impl<Tr: TraceReader> TraceAgent<Tr> {
80 /// Creates a new agent from a trace reader.
81 pub fn new(trace: Tr, operator: OperatorInfo, logging: Option<crate::logging::Logger>) -> (Self, TraceWriter<Tr>)
82 where
83 Tr: Trace,
84 {
85 let trace = Rc::new(RefCell::new(TraceBox::new(trace)));
86 let queues = Rc::new(RefCell::new(Vec::new()));
87
88 if let Some(logging) = &logging {
89 logging.log(
90 crate::logging::TraceShare { operator: operator.global_id, diff: 1 }
91 );
92 }
93
94 let reader = TraceAgent {
95 trace: trace.clone(),
96 queues: Rc::downgrade(&queues),
97 logical_compaction: trace.borrow().logical_compaction.frontier().to_owned(),
98 physical_compaction: trace.borrow().physical_compaction.frontier().to_owned(),
99 temp_antichain: Antichain::new(),
100 operator,
101 logging,
102 };
103
104 let writer = TraceWriter::new(
105 vec![<Tr::Time as Timestamp>::minimum()],
106 Rc::downgrade(&trace),
107 queues,
108 );
109
110 (reader, writer)
111 }
112
113 /// Attaches a new shared queue to the trace.
114 ///
115 /// The queue is first populated with existing batches from the trace,
116 /// The queue will be immediately populated with existing historical batches from the trace, and until the reference
117 /// is dropped the queue will receive new batches as produced by the source `arrange` operator.
118 pub fn new_listener(&mut self, activator: Activator) -> TraceAgentQueueReader<Tr>
119 {
120 // create a new queue for progress and batch information.
121 let mut new_queue = VecDeque::new();
122
123 // add the existing batches from the trace
124 let mut upper = None;
125 self.trace
126 .borrow_mut()
127 .trace
128 .map_batches(|batch| {
129 new_queue.push_back(TraceReplayInstruction::Batch(batch.clone(), Some(<Tr::Time as Timestamp>::minimum())));
130 upper = Some(batch.upper().clone());
131 });
132
133 if let Some(upper) = upper {
134 new_queue.push_back(TraceReplayInstruction::Frontier(upper));
135 }
136
137 let reference = Rc::new((activator, RefCell::new(new_queue)));
138
139 // wraps the queue in a ref-counted ref cell and enqueue/return it.
140 if let Some(queue) = self.queues.upgrade() {
141 queue.borrow_mut().push(Rc::downgrade(&reference));
142 }
143 reference.0.activate();
144 reference
145 }
146
147 /// The [OperatorInfo] of the underlying Timely operator
148 pub fn operator(&self) -> &OperatorInfo {
149 &self.operator
150 }
151
152 /// Obtain a reference to the inner [`TraceBox`]. It is the caller's obligation to maintain
153 /// the trace box and this trace agent's invariants. Specifically, it is undefined behavior
154 /// to mutate the trace box. Keeping strong references can prevent resource reclamation.
155 ///
156 /// This method is subject to changes and removal and should not be considered part of a stable
157 /// interface.
158 pub fn trace_box_unstable(&self) -> Rc<RefCell<TraceBox<Tr>>> {
159 Rc::clone(&self.trace)
160 }
161}
162
163impl<Tr: TraceReader+'static> TraceAgent<Tr> {
164 /// Copies an existing collection into the supplied scope.
165 ///
166 /// This method creates an `Arranged` collection that should appear indistinguishable from applying `arrange`
167 /// directly to the source collection brought into the local scope. The only caveat is that the initial state
168 /// of the collection is its current state, and updates occur from this point forward. The historical changes
169 /// the collection experienced in the past are accumulated, and the distinctions from the initial collection
170 /// are no longer evident.
171 ///
172 /// The current behavior is that the introduced collection accumulates updates to some times less or equal
173 /// to `self.get_logical_compaction()`. There is *not* currently a guarantee that the updates are accumulated *to*
174 /// the frontier, and the resulting collection history may be weirdly partial until this point. In particular,
175 /// the historical collection may move through configurations that did not actually occur, even if eventually
176 /// arriving at the correct collection. This is probably a bug; although we get to the right place in the end,
177 /// the intermediate computation could do something that the original computation did not, like diverge.
178 ///
179 /// I would expect the semantics to improve to "updates are advanced to `self.get_logical_compaction()`", which
180 /// means the computation will run as if starting from exactly this frontier. It is not currently clear whose
181 /// responsibility this should be (the trace/batch should only reveal these times, or an operator should know
182 /// to advance times before using them).
183 ///
184 /// # Examples
185 ///
186 /// ```
187 /// use timely::Config;
188 /// use differential_dataflow::input::Input;
189 /// use differential_dataflow::operators::arrange::ArrangeBySelf;
190 /// use differential_dataflow::operators::reduce::Reduce;
191 /// use differential_dataflow::trace::Trace;
192 ///
193 /// ::timely::execute(Config::thread(), |worker| {
194 ///
195 /// // create a first dataflow
196 /// let mut trace = worker.dataflow::<u32,_,_>(|scope| {
197 /// // create input handle and collection.
198 /// scope.new_collection_from(0 .. 10).1
199 /// .arrange_by_self()
200 /// .trace
201 /// });
202 ///
203 /// // do some work.
204 /// worker.step();
205 /// worker.step();
206 ///
207 /// // create a second dataflow
208 /// worker.dataflow(move |scope| {
209 /// trace.import(scope)
210 /// .reduce(move |_key, src, dst| dst.push((*src[0].0, 1)));
211 /// });
212 ///
213 /// }).unwrap();
214 /// ```
215 pub fn import<G>(&mut self, scope: &G) -> Arranged<G, TraceAgent<Tr>>
216 where
217 G: Scope<Timestamp=Tr::Time>,
218 {
219 self.import_named(scope, "ArrangedSource")
220 }
221
222 /// Same as `import`, but allows to name the source.
223 pub fn import_named<G>(&mut self, scope: &G, name: &str) -> Arranged<G, TraceAgent<Tr>>
224 where
225 G: Scope<Timestamp=Tr::Time>,
226 {
227 // Drop ShutdownButton and return only the arrangement.
228 self.import_core(scope, name).0
229 }
230
231 /// Imports an arrangement into the supplied scope.
232 ///
233 /// # Examples
234 ///
235 /// ```
236 /// use timely::Config;
237 /// use timely::dataflow::ProbeHandle;
238 /// use timely::dataflow::operators::Probe;
239 /// use differential_dataflow::input::InputSession;
240 /// use differential_dataflow::operators::arrange::ArrangeBySelf;
241 /// use differential_dataflow::operators::reduce::Reduce;
242 /// use differential_dataflow::trace::Trace;
243 ///
244 /// ::timely::execute(Config::thread(), |worker| {
245 ///
246 /// let mut input = InputSession::<_,(),isize>::new();
247 /// let mut probe = ProbeHandle::new();
248 ///
249 /// // create a first dataflow
250 /// let mut trace = worker.dataflow::<u32,_,_>(|scope| {
251 /// // create input handle and collection.
252 /// input.to_collection(scope)
253 /// .arrange_by_self()
254 /// .trace
255 /// });
256 ///
257 /// // do some work.
258 /// worker.step();
259 /// worker.step();
260 ///
261 /// // create a second dataflow
262 /// let mut shutdown = worker.dataflow(|scope| {
263 /// let (arrange, button) = trace.import_core(scope, "Import");
264 /// arrange.stream.probe_with(&mut probe);
265 /// button
266 /// });
267 ///
268 /// worker.step();
269 /// worker.step();
270 /// assert!(!probe.done());
271 ///
272 /// shutdown.press();
273 ///
274 /// worker.step();
275 /// worker.step();
276 /// assert!(probe.done());
277 ///
278 /// }).unwrap();
279 /// ```
280 pub fn import_core<G>(&mut self, scope: &G, name: &str) -> (Arranged<G, TraceAgent<Tr>>, ShutdownButton<CapabilitySet<Tr::Time>>)
281 where
282 G: Scope<Timestamp=Tr::Time>,
283 {
284 let trace = self.clone();
285
286 let mut shutdown_button = None;
287
288 let stream = {
289
290 let shutdown_button_ref = &mut shutdown_button;
291 source(scope, name, move |capability, info| {
292
293 let capabilities = Rc::new(RefCell::new(Some(CapabilitySet::new())));
294
295 let activator = scope.activator_for(Rc::clone(&info.address));
296 let queue = self.new_listener(activator);
297
298 let activator = scope.activator_for(info.address);
299 *shutdown_button_ref = Some(ShutdownButton::new(capabilities.clone(), activator));
300
301 capabilities.borrow_mut().as_mut().unwrap().insert(capability);
302
303 move |output| {
304
305 let mut capabilities = capabilities.borrow_mut();
306 if let Some(ref mut capabilities) = *capabilities {
307
308 let mut borrow = queue.1.borrow_mut();
309 for instruction in borrow.drain(..) {
310 match instruction {
311 TraceReplayInstruction::Frontier(frontier) => {
312 capabilities.downgrade(&frontier.borrow()[..]);
313 },
314 TraceReplayInstruction::Batch(batch, hint) => {
315 if let Some(time) = hint {
316 if !batch.is_empty() {
317 let delayed = capabilities.delayed(&time);
318 output.session(&delayed).give(batch);
319 }
320 }
321 }
322 }
323 }
324 }
325 }
326 })
327 };
328
329 (Arranged { stream, trace }, shutdown_button.unwrap())
330 }
331
332 /// Imports an arrangement into the supplied scope.
333 ///
334 /// This variant of import uses the `get_logical_compaction` to forcibly advance timestamps in updates.
335 ///
336 /// # Examples
337 ///
338 /// ```
339 /// use timely::Config;
340 /// use timely::progress::frontier::AntichainRef;
341 /// use timely::dataflow::ProbeHandle;
342 /// use timely::dataflow::operators::Probe;
343 /// use timely::dataflow::operators::Inspect;
344 /// use differential_dataflow::input::InputSession;
345 /// use differential_dataflow::operators::arrange::ArrangeBySelf;
346 /// use differential_dataflow::operators::reduce::Reduce;
347 /// use differential_dataflow::trace::Trace;
348 /// use differential_dataflow::trace::TraceReader;
349 /// use differential_dataflow::input::Input;
350 ///
351 /// ::timely::execute(Config::thread(), |worker| {
352 ///
353 /// let mut probe = ProbeHandle::new();
354 ///
355 /// // create a first dataflow
356 /// let (mut handle, mut trace) = worker.dataflow::<u32,_,_>(|scope| {
357 /// // create input handle and collection.
358 /// let (handle, stream) = scope.new_collection();
359 /// let trace = stream.arrange_by_self().trace;
360 /// (handle, trace)
361 /// });
362 ///
363 /// handle.insert(0); handle.advance_to(1); handle.flush(); worker.step();
364 /// handle.remove(0); handle.advance_to(2); handle.flush(); worker.step();
365 /// handle.insert(1); handle.advance_to(3); handle.flush(); worker.step();
366 /// handle.remove(1); handle.advance_to(4); handle.flush(); worker.step();
367 /// handle.insert(0); handle.advance_to(5); handle.flush(); worker.step();
368 ///
369 /// trace.set_logical_compaction(AntichainRef::new(&[5]));
370 ///
371 /// // create a second dataflow
372 /// let mut shutdown = worker.dataflow(|scope| {
373 /// let (arrange, button) = trace.import_frontier(scope, "Import");
374 /// arrange
375 /// .as_collection(|k,v| (*k,*v))
376 /// .inner
377 /// .inspect(|(d,t,r)| {
378 /// assert!(t >= &5);
379 /// })
380 /// .probe_with(&mut probe);
381 ///
382 /// button
383 /// });
384 ///
385 /// worker.step();
386 /// worker.step();
387 /// assert!(!probe.done());
388 ///
389 /// shutdown.press();
390 ///
391 /// worker.step();
392 /// worker.step();
393 /// assert!(probe.done());
394 ///
395 /// }).unwrap();
396 /// ```
397 pub fn import_frontier<G>(&mut self, scope: &G, name: &str) -> (Arranged<G, TraceFrontier<TraceAgent<Tr>>>, ShutdownButton<CapabilitySet<Tr::Time>>)
398 where
399 G: Scope<Timestamp=Tr::Time>,
400 Tr: TraceReader,
401 {
402 // This frontier describes our only guarantee on the compaction frontier.
403 let since = self.get_logical_compaction().to_owned();
404 self.import_frontier_core(scope, name, since, Antichain::new())
405 }
406
407 /// Import a trace restricted to a specific time interval `[since, until)`.
408 ///
409 /// All updates present in the input trace will be first advanced to `since`, and then either emitted,
410 /// or if greater or equal to `until`, suppressed. Once all times are certain to be greater or equal
411 /// to `until` the operator capability will be dropped.
412 ///
413 /// Invoking this method with an `until` of `Antichain::new()` will perform no filtering, as the empty
414 /// frontier indicates the end of times.
415 pub fn import_frontier_core<G>(&mut self, scope: &G, name: &str, since: Antichain<Tr::Time>, until: Antichain<Tr::Time>) -> (Arranged<G, TraceFrontier<TraceAgent<Tr>>>, ShutdownButton<CapabilitySet<Tr::Time>>)
416 where
417 G: Scope<Timestamp=Tr::Time>,
418 Tr: TraceReader,
419 {
420 let trace = self.clone();
421 let trace = TraceFrontier::make_from(trace, since.borrow(), until.borrow());
422
423 let mut shutdown_button = None;
424
425 let stream = {
426
427 let shutdown_button_ref = &mut shutdown_button;
428 source(scope, name, move |capability, info| {
429
430 let capabilities = Rc::new(RefCell::new(Some(CapabilitySet::new())));
431
432 let activator = scope.activator_for(Rc::clone(&info.address));
433 let queue = self.new_listener(activator);
434
435 let activator = scope.activator_for(info.address);
436 *shutdown_button_ref = Some(ShutdownButton::new(capabilities.clone(), activator));
437
438 capabilities.borrow_mut().as_mut().unwrap().insert(capability);
439
440 move |output| {
441
442 let mut capabilities = capabilities.borrow_mut();
443 if let Some(ref mut capabilities) = *capabilities {
444 let mut borrow = queue.1.borrow_mut();
445 for instruction in borrow.drain(..) {
446 // If we have dropped the capabilities due to `until`, attempt no further work.
447 // Without the capabilities, we should soon be shut down (once this loop ends).
448 if !capabilities.is_empty() {
449 match instruction {
450 TraceReplayInstruction::Frontier(frontier) => {
451 if timely::PartialOrder::less_equal(&until, &frontier) {
452 // It might be nice to actively *drop* `capabilities`, but it seems
453 // complicated logically (i.e. we'd have to break out of the loop).
454 capabilities.downgrade(&[]);
455 } else {
456 capabilities.downgrade(&frontier.borrow()[..]);
457 }
458 },
459 TraceReplayInstruction::Batch(batch, hint) => {
460 if let Some(time) = hint {
461 if !batch.is_empty() {
462 let delayed = capabilities.delayed(&time);
463 output.session(&delayed).give(BatchFrontier::make_from(batch, since.borrow(), until.borrow()));
464 }
465 }
466 }
467 }
468 }
469 }
470 }
471 }
472 })
473 };
474
475 (Arranged { stream, trace }, shutdown_button.unwrap())
476 }
477}
478
479
480
/// Wrapper that can drop shared references.
///
/// Pressing the button replaces the shared `Option<T>` with `None` and then
/// triggers the stored activator.
pub struct ShutdownButton<T> {
    // Shared slot whose contents are dropped when the button is pressed.
    reference: Rc<RefCell<Option<T>>>,
    // Activated after the slot has been cleared.
    activator: Activator,
}
486
487impl<T> ShutdownButton<T> {
488 /// Creates a new ShutdownButton.
489 pub fn new(reference: Rc<RefCell<Option<T>>>, activator: Activator) -> Self {
490 Self { reference, activator }
491 }
492 /// Push the shutdown button, dropping the shared objects.
493 pub fn press(&mut self) {
494 *self.reference.borrow_mut() = None;
495 self.activator.activate();
496 }
497 /// Hotwires the button to one that is pressed if dropped.
498 pub fn press_on_drop(self) -> ShutdownDeadmans<T> {
499 ShutdownDeadmans {
500 button: self
501 }
502 }
503}
504
/// A deadman's switch version of a shutdown button.
///
/// This type hosts a shutdown button and will press it when dropped.
pub struct ShutdownDeadmans<T> {
    // The wrapped button; pressed by this type's `Drop` implementation.
    button: ShutdownButton<T>,
}
511
512impl<T> Drop for ShutdownDeadmans<T> {
513 fn drop(&mut self) {
514 self.button.press();
515 }
516}
517
518impl<Tr: TraceReader> Clone for TraceAgent<Tr> {
519 fn clone(&self) -> Self {
520
521 if let Some(logging) = &self.logging {
522 logging.log(
523 crate::logging::TraceShare { operator: self.operator.global_id, diff: 1 }
524 );
525 }
526
527 // increase counts for wrapped `TraceBox`.
528 let empty_frontier = Antichain::new();
529 self.trace.borrow_mut().adjust_logical_compaction(empty_frontier.borrow(), self.logical_compaction.borrow());
530 self.trace.borrow_mut().adjust_physical_compaction(empty_frontier.borrow(), self.physical_compaction.borrow());
531
532 TraceAgent {
533 trace: self.trace.clone(),
534 queues: self.queues.clone(),
535 logical_compaction: self.logical_compaction.clone(),
536 physical_compaction: self.physical_compaction.clone(),
537 operator: self.operator.clone(),
538 logging: self.logging.clone(),
539 temp_antichain: Antichain::new(),
540 }
541 }
542}
543
544impl<Tr: TraceReader> Drop for TraceAgent<Tr> {
545 fn drop(&mut self) {
546
547 if let Some(logging) = &self.logging {
548 logging.log(
549 crate::logging::TraceShare { operator: self.operator.global_id, diff: -1 }
550 );
551 }
552
553 // decrement borrow counts to remove all holds
554 let empty_frontier = Antichain::new();
555 self.trace.borrow_mut().adjust_logical_compaction(self.logical_compaction.borrow(), empty_frontier.borrow());
556 self.trace.borrow_mut().adjust_physical_compaction(self.physical_compaction.borrow(), empty_frontier.borrow());
557 }
558}