differential_dataflow/operators/arrange/
agent.rs

1//! Shared read access to a trace.
2
3use std::rc::{Rc, Weak};
4use std::cell::RefCell;
5use std::collections::VecDeque;
6
7use timely::dataflow::Scope;
8use timely::dataflow::operators::generic::{OperatorInfo, source};
9use timely::progress::Timestamp;
10use timely::progress::{Antichain, frontier::AntichainRef};
11use timely::dataflow::operators::CapabilitySet;
12
13use crate::trace::{Trace, TraceReader, BatchReader};
14use crate::trace::wrappers::rc::TraceBox;
15
16use timely::scheduling::Activator;
17
18use super::{TraceWriter, TraceAgentQueueWriter, TraceAgentQueueReader, Arranged};
19use super::TraceReplayInstruction;
20
21use crate::trace::wrappers::frontier::{TraceFrontier, BatchFrontier};
22
23
/// A `TraceReader` wrapper which can be imported into other dataflows.
///
/// The `TraceAgent` is the default trace type produced by `arranged`, and it can be extracted
/// from the dataflow in which it was defined, and imported into other dataflows.
///
/// Each agent (including every clone) registers its own logical and physical compaction
/// frontiers as holds on the shared `TraceBox`; dropping the agent releases those holds.
pub struct TraceAgent<Tr: TraceReader> {
    // Shared, reference-counted trace box; every clone of this agent points at the same box.
    trace: Rc<RefCell<TraceBox<Tr>>>,
    // Weak handle to the listener queues owned by the `TraceWriter`; `new_listener` registers
    // new queues here if the writer (and thus the queues) still exists.
    queues: Weak<RefCell<Vec<TraceAgentQueueWriter<Tr>>>>,
    // This agent's logical compaction guarantee, held on the shared trace box.
    logical_compaction: Antichain<Tr::Time>,
    // This agent's physical compaction guarantee, held on the shared trace box.
    physical_compaction: Antichain<Tr::Time>,
    // Scratch antichain reused by `set_logical_compaction` / `set_physical_compaction`.
    temp_antichain: Antichain<Tr::Time>,

    // Information about the Timely operator this agent's trace is associated with.
    operator: OperatorInfo,
    // Optional logger used to record `TraceShare` events as handles are created and dropped.
    logging: Option<crate::logging::Logger>,
}
38
39impl<Tr: TraceReader> TraceReader for TraceAgent<Tr> {
40    type Key<'a> = Tr::Key<'a>;
41    type Val<'a> = Tr::Val<'a>;
42    type Time = Tr::Time;
43    type TimeGat<'a> = Tr::TimeGat<'a>;
44    type Diff = Tr::Diff;
45    type DiffGat<'a> = Tr::DiffGat<'a>;
46
47    type Batch = Tr::Batch;
48    type Storage = Tr::Storage;
49    type Cursor = Tr::Cursor;
50
51    fn set_logical_compaction(&mut self, frontier: AntichainRef<Tr::Time>) {
52        // This method does not enforce that `frontier` is greater or equal to `self.logical_compaction`.
53        // Instead, it determines the joint consequences of both guarantees and moves forward with that.
54        crate::lattice::antichain_join_into(&self.logical_compaction.borrow()[..], &frontier[..], &mut self.temp_antichain);
55        self.trace.borrow_mut().adjust_logical_compaction(self.logical_compaction.borrow(), self.temp_antichain.borrow());
56        ::std::mem::swap(&mut self.logical_compaction, &mut self.temp_antichain);
57        self.temp_antichain.clear();
58    }
59    fn get_logical_compaction(&mut self) -> AntichainRef<Tr::Time> {
60        self.logical_compaction.borrow()
61    }
62    fn set_physical_compaction(&mut self, frontier: AntichainRef<Tr::Time>) {
63        // This method does not enforce that `frontier` is greater or equal to `self.physical_compaction`.
64        // Instead, it determines the joint consequences of both guarantees and moves forward with that.
65        crate::lattice::antichain_join_into(&self.physical_compaction.borrow()[..], &frontier[..], &mut self.temp_antichain);
66        self.trace.borrow_mut().adjust_physical_compaction(self.physical_compaction.borrow(), self.temp_antichain.borrow());
67        ::std::mem::swap(&mut self.physical_compaction, &mut self.temp_antichain);
68        self.temp_antichain.clear();
69    }
70    fn get_physical_compaction(&mut self) -> AntichainRef<Tr::Time> {
71        self.physical_compaction.borrow()
72    }
73    fn cursor_through(&mut self, frontier: AntichainRef<Tr::Time>) -> Option<(Self::Cursor, Self::Storage)> {
74        self.trace.borrow_mut().trace.cursor_through(frontier)
75    }
76    fn map_batches<F: FnMut(&Self::Batch)>(&self, f: F) { self.trace.borrow().trace.map_batches(f) }
77}
78
79impl<Tr: TraceReader> TraceAgent<Tr> {
80    /// Creates a new agent from a trace reader.
81    pub fn new(trace: Tr, operator: OperatorInfo, logging: Option<crate::logging::Logger>) -> (Self, TraceWriter<Tr>)
82    where
83        Tr: Trace,
84    {
85        let trace = Rc::new(RefCell::new(TraceBox::new(trace)));
86        let queues = Rc::new(RefCell::new(Vec::new()));
87
88        if let Some(logging) = &logging {
89            logging.log(
90                crate::logging::TraceShare { operator: operator.global_id, diff: 1 }
91            );
92        }
93
94        let reader = TraceAgent {
95            trace: trace.clone(),
96            queues: Rc::downgrade(&queues),
97            logical_compaction: trace.borrow().logical_compaction.frontier().to_owned(),
98            physical_compaction: trace.borrow().physical_compaction.frontier().to_owned(),
99            temp_antichain: Antichain::new(),
100            operator,
101            logging,
102        };
103
104        let writer = TraceWriter::new(
105            vec![<Tr::Time as Timestamp>::minimum()],
106            Rc::downgrade(&trace),
107            queues,
108        );
109
110        (reader, writer)
111    }
112
113    /// Attaches a new shared queue to the trace.
114    ///
115    /// The queue is first populated with existing batches from the trace,
116    /// The queue will be immediately populated with existing historical batches from the trace, and until the reference
117    /// is dropped the queue will receive new batches as produced by the source `arrange` operator.
118    pub fn new_listener(&mut self, activator: Activator) -> TraceAgentQueueReader<Tr>
119    {
120        // create a new queue for progress and batch information.
121        let mut new_queue = VecDeque::new();
122
123        // add the existing batches from the trace
124        let mut upper = None;
125        self.trace
126            .borrow_mut()
127            .trace
128            .map_batches(|batch| {
129                new_queue.push_back(TraceReplayInstruction::Batch(batch.clone(), Some(<Tr::Time as Timestamp>::minimum())));
130                upper = Some(batch.upper().clone());
131            });
132
133        if let Some(upper) = upper {
134            new_queue.push_back(TraceReplayInstruction::Frontier(upper));
135        }
136
137        let reference = Rc::new((activator, RefCell::new(new_queue)));
138
139        // wraps the queue in a ref-counted ref cell and enqueue/return it.
140        if let Some(queue) = self.queues.upgrade() {
141            queue.borrow_mut().push(Rc::downgrade(&reference));
142        }
143        reference.0.activate();
144        reference
145    }
146
147    /// The [OperatorInfo] of the underlying Timely operator
148    pub fn operator(&self) -> &OperatorInfo {
149        &self.operator
150    }
151
152    /// Obtain a reference to the inner [`TraceBox`]. It is the caller's obligation to maintain
153    /// the trace box and this trace agent's invariants. Specifically, it is undefined behavior
154    /// to mutate the trace box. Keeping strong references can prevent resource reclamation.
155    ///
156    /// This method is subject to changes and removal and should not be considered part of a stable
157    /// interface.
158    pub fn trace_box_unstable(&self) -> Rc<RefCell<TraceBox<Tr>>> {
159        Rc::clone(&self.trace)
160    }
161}
162
impl<Tr: TraceReader+'static> TraceAgent<Tr> {
    /// Copies an existing collection into the supplied scope.
    ///
    /// This method creates an `Arranged` collection that should appear indistinguishable from applying `arrange`
    /// directly to the source collection brought into the local scope. The only caveat is that the initial state
    /// of the collection is its current state, and updates occur from this point forward. The historical changes
    /// the collection experienced in the past are accumulated, and the distinctions from the initial collection
    /// are no longer evident.
    ///
    /// The current behavior is that the introduced collection accumulates updates to some times less or equal
    /// to `self.get_logical_compaction()`. There is *not* currently a guarantee that the updates are accumulated *to*
    /// the frontier, and the resulting collection history may be weirdly partial until this point. In particular,
    /// the historical collection may move through configurations that did not actually occur, even if eventually
    /// arriving at the correct collection. This is probably a bug; although we get to the right place in the end,
    /// the intermediate computation could do something that the original computation did not, like diverge.
    ///
    /// I would expect the semantics to improve to "updates are advanced to `self.get_logical_compaction()`", which
    /// means the computation will run as if starting from exactly this frontier. It is not currently clear whose
    /// responsibility this should be (the trace/batch should only reveal these times, or an operator should know
    /// to advance times before using them).
    ///
    /// The source operator is named `"ArrangedSource"`; use `import_named` to choose a different name.
    ///
    /// # Examples
    ///
    /// ```
    /// use timely::Config;
    /// use differential_dataflow::input::Input;
    /// use differential_dataflow::operators::arrange::ArrangeBySelf;
    /// use differential_dataflow::operators::reduce::Reduce;
    /// use differential_dataflow::trace::Trace;
    ///
    /// ::timely::execute(Config::thread(), |worker| {
    ///
    ///     // create a first dataflow
    ///     let mut trace = worker.dataflow::<u32,_,_>(|scope| {
    ///         // create input handle and collection.
    ///         scope.new_collection_from(0 .. 10).1
    ///              .arrange_by_self()
    ///              .trace
    ///     });
    ///
    ///     // do some work.
    ///     worker.step();
    ///     worker.step();
    ///
    ///     // create a second dataflow
    ///     worker.dataflow(move |scope| {
    ///         trace.import(scope)
    ///              .reduce(move |_key, src, dst| dst.push((*src[0].0, 1)));
    ///     });
    ///
    /// }).unwrap();
    /// ```
    pub fn import<G>(&mut self, scope: &G) -> Arranged<G, TraceAgent<Tr>>
    where
        G: Scope<Timestamp=Tr::Time>,
    {
        self.import_named(scope, "ArrangedSource")
    }

    /// Same as `import`, but allows naming the source operator.
    pub fn import_named<G>(&mut self, scope: &G, name: &str) -> Arranged<G, TraceAgent<Tr>>
    where
        G: Scope<Timestamp=Tr::Time>,
    {
        // Drop ShutdownButton and return only the arrangement.
        // (`ShutdownButton` has no `Drop` impl, so dropping it does not press it.)
        self.import_core(scope, name).0
    }

    /// Imports an arrangement into the supplied scope.
    ///
    /// Returns the arrangement together with a `ShutdownButton`. Pressing the button drops the
    /// source operator's capabilities, after which the operator emits nothing further.
    ///
    /// # Examples
    ///
    /// ```
    /// use timely::Config;
    /// use timely::dataflow::ProbeHandle;
    /// use timely::dataflow::operators::Probe;
    /// use differential_dataflow::input::InputSession;
    /// use differential_dataflow::operators::arrange::ArrangeBySelf;
    /// use differential_dataflow::operators::reduce::Reduce;
    /// use differential_dataflow::trace::Trace;
    ///
    /// ::timely::execute(Config::thread(), |worker| {
    ///
    ///     let mut input = InputSession::<_,(),isize>::new();
    ///     let mut probe = ProbeHandle::new();
    ///
    ///     // create a first dataflow
    ///     let mut trace = worker.dataflow::<u32,_,_>(|scope| {
    ///         // create input handle and collection.
    ///         input.to_collection(scope)
    ///              .arrange_by_self()
    ///              .trace
    ///     });
    ///
    ///     // do some work.
    ///     worker.step();
    ///     worker.step();
    ///
    ///     // create a second dataflow
    ///     let mut shutdown = worker.dataflow(|scope| {
    ///         let (arrange, button) = trace.import_core(scope, "Import");
    ///         arrange.stream.probe_with(&mut probe);
    ///         button
    ///     });
    ///
    ///     worker.step();
    ///     worker.step();
    ///     assert!(!probe.done());
    ///
    ///     shutdown.press();
    ///
    ///     worker.step();
    ///     worker.step();
    ///     assert!(probe.done());
    ///
    /// }).unwrap();
    /// ```
    pub fn import_core<G>(&mut self, scope: &G, name: &str) -> (Arranged<G, TraceAgent<Tr>>, ShutdownButton<CapabilitySet<Tr::Time>>)
    where
        G: Scope<Timestamp=Tr::Time>,
    {
        // A fresh handle for the returned `Arranged`; cloning registers its compaction holds.
        let trace = self.clone();

        // Filled in by the `source` constructor closure below.
        let mut shutdown_button = None;

        let stream = {

            let shutdown_button_ref = &mut shutdown_button;
            source(scope, name, move |capability, info| {

                // Shared slot for the capabilities: the operator logic reads it, and the
                // `ShutdownButton` can empty it (set it to `None`) from outside.
                let capabilities = Rc::new(RefCell::new(Some(CapabilitySet::new())));

                // Register a listener queue (pre-filled with historical batches); its activator
                // is stored with the queue and triggered once by `new_listener`.
                let activator = scope.activator_for(Rc::clone(&info.address));
                let queue = self.new_listener(activator);

                // A second activator (consuming `info.address`) lets pressing the button
                // schedule this operator.
                let activator = scope.activator_for(info.address);
                *shutdown_button_ref = Some(ShutdownButton::new(capabilities.clone(), activator));

                capabilities.borrow_mut().as_mut().unwrap().insert(capability);

                move |output| {

                    let mut capabilities = capabilities.borrow_mut();
                    // `None` means the shutdown button was pressed; do nothing further.
                    if let Some(ref mut capabilities) = *capabilities {

                        let mut borrow = queue.1.borrow_mut();
                        for instruction in borrow.drain(..) {
                            match instruction {
                                // Progress: hold capabilities only at the reported frontier.
                                TraceReplayInstruction::Frontier(frontier) => {
                                    capabilities.downgrade(&frontier.borrow()[..]);
                                },
                                // Data: emit non-empty batches at their hinted time;
                                // batches without a hint are not emitted.
                                TraceReplayInstruction::Batch(batch, hint) => {
                                    if let Some(time) = hint {
                                        if !batch.is_empty() {
                                            let delayed = capabilities.delayed(&time);
                                            output.session(&delayed).give(batch);
                                        }
                                    }
                                }
                            }
                        }
                    }
                }
            })
        };

        // Relies on `source` having run the constructor closure above before returning.
        (Arranged { stream, trace }, shutdown_button.unwrap())
    }

    /// Imports an arrangement into the supplied scope.
    ///
    /// This variant of import uses the `get_logical_compaction` to forcibly advance timestamps in updates.
    /// It is equivalent to `import_frontier_core` with `since` set to the current logical
    /// compaction frontier and an empty (unbounded) `until`.
    ///
    /// # Examples
    ///
    /// ```
    /// use timely::Config;
    /// use timely::progress::frontier::AntichainRef;
    /// use timely::dataflow::ProbeHandle;
    /// use timely::dataflow::operators::Probe;
    /// use timely::dataflow::operators::Inspect;
    /// use differential_dataflow::input::InputSession;
    /// use differential_dataflow::operators::arrange::ArrangeBySelf;
    /// use differential_dataflow::operators::reduce::Reduce;
    /// use differential_dataflow::trace::Trace;
    /// use differential_dataflow::trace::TraceReader;
    /// use differential_dataflow::input::Input;
    ///
    /// ::timely::execute(Config::thread(), |worker| {
    ///
    ///     let mut probe = ProbeHandle::new();
    ///
    ///     // create a first dataflow
    ///     let (mut handle, mut trace) = worker.dataflow::<u32,_,_>(|scope| {
    ///         // create input handle and collection.
    ///         let (handle, stream) = scope.new_collection();
    ///         let trace = stream.arrange_by_self().trace;
    ///         (handle, trace)
    ///     });
    ///
    ///     handle.insert(0); handle.advance_to(1); handle.flush(); worker.step();
    ///     handle.remove(0); handle.advance_to(2); handle.flush(); worker.step();
    ///     handle.insert(1); handle.advance_to(3); handle.flush(); worker.step();
    ///     handle.remove(1); handle.advance_to(4); handle.flush(); worker.step();
    ///     handle.insert(0); handle.advance_to(5); handle.flush(); worker.step();
    ///
    ///     trace.set_logical_compaction(AntichainRef::new(&[5]));
    ///
    ///     // create a second dataflow
    ///     let mut shutdown = worker.dataflow(|scope| {
    ///         let (arrange, button) = trace.import_frontier(scope, "Import");
    ///         arrange
    ///             .as_collection(|k,v| (*k,*v))
    ///             .inner
    ///             .inspect(|(d,t,r)| {
    ///                 assert!(t >= &5);
    ///             })
    ///             .probe_with(&mut probe);
    ///
    ///         button
    ///     });
    ///
    ///     worker.step();
    ///     worker.step();
    ///     assert!(!probe.done());
    ///
    ///     shutdown.press();
    ///
    ///     worker.step();
    ///     worker.step();
    ///     assert!(probe.done());
    ///
    /// }).unwrap();
    /// ```
    pub fn import_frontier<G>(&mut self, scope: &G, name: &str) -> (Arranged<G, TraceFrontier<TraceAgent<Tr>>>, ShutdownButton<CapabilitySet<Tr::Time>>)
    where
        G: Scope<Timestamp=Tr::Time>,
        Tr: TraceReader,
    {
        // This frontier describes our only guarantee on the compaction frontier.
        let since = self.get_logical_compaction().to_owned();
        self.import_frontier_core(scope, name, since, Antichain::new())
    }

    /// Import a trace restricted to a specific time interval `[since, until)`.
    ///
    /// All updates present in the input trace will be first advanced to `since`, and then either emitted,
    /// or if greater or equal to `until`, suppressed. Once all times are certain to be greater or equal
    /// to `until` the operator capability will be dropped.
    ///
    /// Invoking this method with an `until` of `Antichain::new()` will perform no filtering, as the empty
    /// frontier indicates the end of times.
    pub fn import_frontier_core<G>(&mut self, scope: &G, name: &str, since: Antichain<Tr::Time>, until: Antichain<Tr::Time>) -> (Arranged<G, TraceFrontier<TraceAgent<Tr>>>, ShutdownButton<CapabilitySet<Tr::Time>>)
    where
        G: Scope<Timestamp=Tr::Time>,
        Tr: TraceReader,
    {
        // A fresh handle for the returned `Arranged`, wrapped so readers see `[since, until)`.
        let trace = self.clone();
        let trace = TraceFrontier::make_from(trace, since.borrow(), until.borrow());

        // Filled in by the `source` constructor closure below.
        let mut shutdown_button = None;

        let stream = {

            let shutdown_button_ref = &mut shutdown_button;
            source(scope, name, move |capability, info| {

                // Shared slot for the capabilities, emptied by the `ShutdownButton`.
                let capabilities = Rc::new(RefCell::new(Some(CapabilitySet::new())));

                // Register a listener queue (pre-filled with historical batches).
                let activator = scope.activator_for(Rc::clone(&info.address));
                let queue = self.new_listener(activator);

                // A second activator (consuming `info.address`) for the shutdown button.
                let activator = scope.activator_for(info.address);
                *shutdown_button_ref = Some(ShutdownButton::new(capabilities.clone(), activator));

                capabilities.borrow_mut().as_mut().unwrap().insert(capability);

                move |output| {

                    let mut capabilities = capabilities.borrow_mut();
                    // `None` means the shutdown button was pressed; do nothing further.
                    if let Some(ref mut capabilities) = *capabilities {
                        let mut borrow = queue.1.borrow_mut();
                        for instruction in borrow.drain(..) {
                            // If we have dropped the capabilities due to `until`, attempt no further work.
                            // Without the capabilities, we should soon be shut down (once this loop ends).
                            if !capabilities.is_empty() {
                                match instruction {
                                    TraceReplayInstruction::Frontier(frontier) => {
                                        // Once the frontier reaches `until`, no further output can
                                        // fall inside `[since, until)`, so release everything.
                                        if timely::PartialOrder::less_equal(&until, &frontier) {
                                            // It might be nice to actively *drop* `capabilities`, but it seems
                                            // complicated logically (i.e. we'd have to break out of the loop).
                                            capabilities.downgrade(&[]);
                                        } else {
                                            capabilities.downgrade(&frontier.borrow()[..]);
                                        }
                                    },
                                    TraceReplayInstruction::Batch(batch, hint) => {
                                        // Emit non-empty hinted batches, wrapped to apply `since`/`until`.
                                        if let Some(time) = hint {
                                            if !batch.is_empty() {
                                                let delayed = capabilities.delayed(&time);
                                                output.session(&delayed).give(BatchFrontier::make_from(batch, since.borrow(), until.borrow()));
                                            }
                                        }
                                    }
                                }
                            }
                        }
                    }
                }
            })
        };

        // Relies on `source` having run the constructor closure above before returning.
        (Arranged { stream, trace }, shutdown_button.unwrap())
    }
}
478
479
480
481/// Wrapper than can drop shared references.
482pub struct ShutdownButton<T> {
483    reference: Rc<RefCell<Option<T>>>,
484    activator: Activator,
485}
486
487impl<T> ShutdownButton<T> {
488    /// Creates a new ShutdownButton.
489    pub fn new(reference: Rc<RefCell<Option<T>>>, activator: Activator) -> Self {
490        Self { reference, activator }
491    }
492    /// Push the shutdown button, dropping the shared objects.
493    pub fn press(&mut self) {
494        *self.reference.borrow_mut() = None;
495        self.activator.activate();
496    }
497    /// Hotwires the button to one that is pressed if dropped.
498    pub fn press_on_drop(self) -> ShutdownDeadmans<T> {
499        ShutdownDeadmans {
500            button: self
501        }
502    }
503}
504
505/// A deadman's switch version of a shutdown button.
506///
507/// This type hosts a shutdown button and will press it when dropped.
508pub struct ShutdownDeadmans<T> {
509    button: ShutdownButton<T>,
510}
511
512impl<T> Drop for ShutdownDeadmans<T> {
513    fn drop(&mut self) {
514        self.button.press();
515    }
516}
517
518impl<Tr: TraceReader> Clone for TraceAgent<Tr> {
519    fn clone(&self) -> Self {
520
521        if let Some(logging) = &self.logging {
522            logging.log(
523                crate::logging::TraceShare { operator: self.operator.global_id, diff: 1 }
524            );
525        }
526
527        // increase counts for wrapped `TraceBox`.
528        let empty_frontier = Antichain::new();
529        self.trace.borrow_mut().adjust_logical_compaction(empty_frontier.borrow(), self.logical_compaction.borrow());
530        self.trace.borrow_mut().adjust_physical_compaction(empty_frontier.borrow(), self.physical_compaction.borrow());
531
532        TraceAgent {
533            trace: self.trace.clone(),
534            queues: self.queues.clone(),
535            logical_compaction: self.logical_compaction.clone(),
536            physical_compaction: self.physical_compaction.clone(),
537            operator: self.operator.clone(),
538            logging: self.logging.clone(),
539            temp_antichain: Antichain::new(),
540        }
541    }
542}
543
544impl<Tr: TraceReader> Drop for TraceAgent<Tr> {
545    fn drop(&mut self) {
546
547        if let Some(logging) = &self.logging {
548            logging.log(
549                crate::logging::TraceShare { operator: self.operator.global_id, diff: -1 }
550            );
551        }
552
553        // decrement borrow counts to remove all holds
554        let empty_frontier = Antichain::new();
555        self.trace.borrow_mut().adjust_logical_compaction(self.logical_compaction.borrow(), empty_frontier.borrow());
556        self.trace.borrow_mut().adjust_physical_compaction(self.physical_compaction.borrow(), empty_frontier.borrow());
557    }
558}