differential_dataflow/trace/wrappers/
rc.rs

1//! A reference-counted wrapper sharing one owned trace.
2//!
3//! The types in this module, `TraceBox` and `TraceRc`, are meant to parallel `RcBox` and `Rc` in `std::rc`.
4//!
5//! The first type is an owned trace with some information about the cumulative requirements of the shared
6//! handles. This is roughly how much progress each has made, so we know which "read capabilities" they have
7//! collectively dropped, and when it is safe to inform the trace of such progress.
8//!
9//! The second type is a wrapper which presents as a `TraceReader`, but whose methods for advancing its read
10//! capabilities interact with the `TraceBox` rather than directly with the owned trace. Ideally, instances
11//! of `TraceRc` should appear indistinguishable from the underlying trace from a reading perspective, with the
12//! exception that the trace may not compact its representation as fast as if it were exclusively owned.
13
14use std::rc::Rc;
15use std::cell::RefCell;
16
17use timely::progress::{Antichain, frontier::{AntichainRef, MutableAntichain}};
18
19use crate::trace::TraceReader;
20
21/// A wrapper around a trace which tracks the frontiers of all referees.
22///
23/// This is an internal type, unlikely to be useful to higher-level programs, but exposed just in case.
24/// This type is equivalent to a `RefCell`, in that it wraps the mutable state that multiple referrers
25/// may influence.
26pub struct TraceBox<Tr: TraceReader> {
27    /// accumulated holds on times for advancement.
28    pub logical_compaction: MutableAntichain<Tr::Time>,
29    /// accumulated holds on times for distinction.
30    pub physical_compaction: MutableAntichain<Tr::Time>,
31    /// The wrapped trace.
32    pub trace: Tr,
33}
34
35impl<Tr: TraceReader> TraceBox<Tr> {
36    /// Moves an existing trace into a shareable trace wrapper.
37    ///
38    /// The trace may already exist and have non-initial advance and distinguish frontiers. The boxing
39    /// process will fish these out and make sure that they are used for the initial read capabilities.
40    pub fn new(mut trace: Tr) -> Self {
41
42        let mut logical_compaction = MutableAntichain::new();
43        logical_compaction.update_iter(trace.get_logical_compaction().iter().cloned().map(|t| (t,1)));
44        let mut physical_compaction = MutableAntichain::new();
45        physical_compaction.update_iter(trace.get_physical_compaction().iter().cloned().map(|t| (t,1)));
46
47        TraceBox {
48            logical_compaction,
49            physical_compaction,
50            trace,
51        }
52    }
53    /// Replaces elements of `lower` with those of `upper`.
54    #[inline]
55    pub fn adjust_logical_compaction(&mut self, lower: AntichainRef<Tr::Time>, upper: AntichainRef<Tr::Time>) {
56        self.logical_compaction.update_iter(upper.iter().cloned().map(|t| (t,1)));
57        self.logical_compaction.update_iter(lower.iter().cloned().map(|t| (t,-1)));
58        self.trace.set_logical_compaction(self.logical_compaction.frontier());
59    }
60    /// Replaces elements of `lower` with those of `upper`.
61    #[inline]
62    pub fn adjust_physical_compaction(&mut self, lower: AntichainRef<Tr::Time>, upper: AntichainRef<Tr::Time>) {
63        self.physical_compaction.update_iter(upper.iter().cloned().map(|t| (t,1)));
64        self.physical_compaction.update_iter(lower.iter().cloned().map(|t| (t,-1)));
65        self.trace.set_physical_compaction(self.physical_compaction.frontier());
66    }
67}
68
69/// A handle to a shared trace.
70///
71/// As long as the handle exists, the wrapped trace should continue to exist and will not advance its
72/// timestamps past the frontier maintained by the handle. The intent is that such a handle appears as
73/// if it is a privately maintained trace, despite being backed by shared resources.
74pub struct TraceRc<Tr: TraceReader> {
75    logical_compaction: Antichain<Tr::Time>,
76    physical_compaction: Antichain<Tr::Time>,
77    /// Wrapped trace. Please be gentle when using.
78    pub wrapper: Rc<RefCell<TraceBox<Tr>>>,
79}
80
81use crate::trace::WithLayout;
82impl<Tr: TraceReader> WithLayout for TraceRc<Tr> {
83    type Layout = Tr::Layout;
84}
85
86impl<Tr: TraceReader> TraceReader for TraceRc<Tr> {
87
88    type Batch = Tr::Batch;
89    type Storage = Tr::Storage;
90    type Cursor = Tr::Cursor;
91
92    /// Sets frontier to now be elements in `frontier`.
93    ///
94    /// This change may not have immediately observable effects. It informs the shared trace that this
95    /// handle no longer requires access to times other than those in the future of `frontier`, but if
96    /// there are other handles to the same trace, it may not yet be able to compact.
97    fn set_logical_compaction(&mut self, frontier: AntichainRef<'_, Tr::Time>) {
98        self.wrapper.borrow_mut().adjust_logical_compaction(self.logical_compaction.borrow(), frontier);
99        self.logical_compaction = frontier.to_owned();
100    }
101    fn get_logical_compaction(&mut self) -> AntichainRef<'_, Tr::Time> { self.logical_compaction.borrow() }
102    /// Allows the trace to compact batches of times before `frontier`.
103    fn set_physical_compaction(&mut self, frontier: AntichainRef<'_, Tr::Time>) {
104        self.wrapper.borrow_mut().adjust_physical_compaction(self.physical_compaction.borrow(), frontier);
105        self.physical_compaction = frontier.to_owned();
106    }
107    fn get_physical_compaction(&mut self) -> AntichainRef<'_, Tr::Time> { self.physical_compaction.borrow() }
108    /// Creates a new cursor over the wrapped trace.
109    fn cursor_through(&mut self, frontier: AntichainRef<'_, Tr::Time>) -> Option<(Tr::Cursor, Tr::Storage)> {
110        ::std::cell::RefCell::borrow_mut(&self.wrapper).trace.cursor_through(frontier)
111    }
112
113    fn map_batches<F: FnMut(&Self::Batch)>(&self, f: F) {
114        ::std::cell::RefCell::borrow(&self.wrapper).trace.map_batches(f)
115    }
116}
117
118impl<Tr: TraceReader> TraceRc<Tr> {
119    /// Allocates a new handle from an existing wrapped wrapper.
120    pub fn make_from(trace: Tr) -> (Self, Rc<RefCell<TraceBox<Tr>>>) {
121
122        let wrapped = Rc::new(RefCell::new(TraceBox::new(trace)));
123
124        let handle = TraceRc {
125            logical_compaction: wrapped.borrow().logical_compaction.frontier().to_owned(),
126            physical_compaction: wrapped.borrow().physical_compaction.frontier().to_owned(),
127            wrapper: wrapped.clone(),
128        };
129
130        (handle, wrapped)
131    }
132}
133
134impl<Tr: TraceReader> Clone for TraceRc<Tr> {
135    fn clone(&self) -> Self {
136        // increase ref counts for this frontier
137        self.wrapper.borrow_mut().adjust_logical_compaction(Antichain::new().borrow(), self.logical_compaction.borrow());
138        self.wrapper.borrow_mut().adjust_physical_compaction(Antichain::new().borrow(), self.physical_compaction.borrow());
139        TraceRc {
140            logical_compaction: self.logical_compaction.clone(),
141            physical_compaction: self.physical_compaction.clone(),
142            wrapper: self.wrapper.clone(),
143        }
144    }
145}
146
147impl<Tr: TraceReader> Drop for TraceRc<Tr> {
148    fn drop(&mut self) {
149        self.wrapper.borrow_mut().adjust_logical_compaction(self.logical_compaction.borrow(), Antichain::new().borrow());
150        self.wrapper.borrow_mut().adjust_physical_compaction(self.physical_compaction.borrow(), Antichain::new().borrow());
151        self.logical_compaction = Antichain::new();
152        self.physical_compaction = Antichain::new();
153    }
154}