differential_dataflow/trace/wrappers/
rc.rs1use std::rc::Rc;
15use std::cell::RefCell;
16
17use timely::progress::{Antichain, frontier::{AntichainRef, MutableAntichain}};
18
19use crate::trace::TraceReader;
20
21pub struct TraceBox<Tr: TraceReader> {
27 pub logical_compaction: MutableAntichain<Tr::Time>,
29 pub physical_compaction: MutableAntichain<Tr::Time>,
31 pub trace: Tr,
33}
34
35impl<Tr: TraceReader> TraceBox<Tr> {
36 pub fn new(mut trace: Tr) -> Self {
41
42 let mut logical_compaction = MutableAntichain::new();
43 logical_compaction.update_iter(trace.get_logical_compaction().iter().cloned().map(|t| (t,1)));
44 let mut physical_compaction = MutableAntichain::new();
45 physical_compaction.update_iter(trace.get_physical_compaction().iter().cloned().map(|t| (t,1)));
46
47 TraceBox {
48 logical_compaction,
49 physical_compaction,
50 trace,
51 }
52 }
53 #[inline]
55 pub fn adjust_logical_compaction(&mut self, lower: AntichainRef<Tr::Time>, upper: AntichainRef<Tr::Time>) {
56 self.logical_compaction.update_iter(upper.iter().cloned().map(|t| (t,1)));
57 self.logical_compaction.update_iter(lower.iter().cloned().map(|t| (t,-1)));
58 self.trace.set_logical_compaction(self.logical_compaction.frontier());
59 }
60 #[inline]
62 pub fn adjust_physical_compaction(&mut self, lower: AntichainRef<Tr::Time>, upper: AntichainRef<Tr::Time>) {
63 self.physical_compaction.update_iter(upper.iter().cloned().map(|t| (t,1)));
64 self.physical_compaction.update_iter(lower.iter().cloned().map(|t| (t,-1)));
65 self.trace.set_physical_compaction(self.physical_compaction.frontier());
66 }
67}
68
69pub struct TraceRc<Tr: TraceReader> {
75 logical_compaction: Antichain<Tr::Time>,
76 physical_compaction: Antichain<Tr::Time>,
77 pub wrapper: Rc<RefCell<TraceBox<Tr>>>,
79}
80
81use crate::trace::WithLayout;
82impl<Tr: TraceReader> WithLayout for TraceRc<Tr> {
83 type Layout = Tr::Layout;
84}
85
86impl<Tr: TraceReader> TraceReader for TraceRc<Tr> {
87
88 type Batch = Tr::Batch;
89 type Storage = Tr::Storage;
90 type Cursor = Tr::Cursor;
91
92 fn set_logical_compaction(&mut self, frontier: AntichainRef<'_, Tr::Time>) {
98 self.wrapper.borrow_mut().adjust_logical_compaction(self.logical_compaction.borrow(), frontier);
99 self.logical_compaction = frontier.to_owned();
100 }
101 fn get_logical_compaction(&mut self) -> AntichainRef<'_, Tr::Time> { self.logical_compaction.borrow() }
102 fn set_physical_compaction(&mut self, frontier: AntichainRef<'_, Tr::Time>) {
104 self.wrapper.borrow_mut().adjust_physical_compaction(self.physical_compaction.borrow(), frontier);
105 self.physical_compaction = frontier.to_owned();
106 }
107 fn get_physical_compaction(&mut self) -> AntichainRef<'_, Tr::Time> { self.physical_compaction.borrow() }
108 fn cursor_through(&mut self, frontier: AntichainRef<'_, Tr::Time>) -> Option<(Tr::Cursor, Tr::Storage)> {
110 ::std::cell::RefCell::borrow_mut(&self.wrapper).trace.cursor_through(frontier)
111 }
112
113 fn map_batches<F: FnMut(&Self::Batch)>(&self, f: F) {
114 ::std::cell::RefCell::borrow(&self.wrapper).trace.map_batches(f)
115 }
116}
117
118impl<Tr: TraceReader> TraceRc<Tr> {
119 pub fn make_from(trace: Tr) -> (Self, Rc<RefCell<TraceBox<Tr>>>) {
121
122 let wrapped = Rc::new(RefCell::new(TraceBox::new(trace)));
123
124 let handle = TraceRc {
125 logical_compaction: wrapped.borrow().logical_compaction.frontier().to_owned(),
126 physical_compaction: wrapped.borrow().physical_compaction.frontier().to_owned(),
127 wrapper: wrapped.clone(),
128 };
129
130 (handle, wrapped)
131 }
132}
133
134impl<Tr: TraceReader> Clone for TraceRc<Tr> {
135 fn clone(&self) -> Self {
136 self.wrapper.borrow_mut().adjust_logical_compaction(Antichain::new().borrow(), self.logical_compaction.borrow());
138 self.wrapper.borrow_mut().adjust_physical_compaction(Antichain::new().borrow(), self.physical_compaction.borrow());
139 TraceRc {
140 logical_compaction: self.logical_compaction.clone(),
141 physical_compaction: self.physical_compaction.clone(),
142 wrapper: self.wrapper.clone(),
143 }
144 }
145}
146
147impl<Tr: TraceReader> Drop for TraceRc<Tr> {
148 fn drop(&mut self) {
149 self.wrapper.borrow_mut().adjust_logical_compaction(self.logical_compaction.borrow(), Antichain::new().borrow());
150 self.wrapper.borrow_mut().adjust_physical_compaction(self.physical_compaction.borrow(), Antichain::new().borrow());
151 self.logical_compaction = Antichain::new();
152 self.physical_compaction = Antichain::new();
153 }
154}