differential_dataflow/trace/wrappers/
rc.rs1use std::rc::Rc;
15use std::cell::RefCell;
16
17use timely::progress::{Antichain, frontier::{AntichainRef, MutableAntichain}};
18
19use crate::trace::TraceReader;
20
21pub struct TraceBox<Tr: TraceReader> {
27 pub logical_compaction: MutableAntichain<Tr::Time>,
29 pub physical_compaction: MutableAntichain<Tr::Time>,
31 pub trace: Tr,
33}
34
35impl<Tr: TraceReader> TraceBox<Tr> {
36 pub fn new(mut trace: Tr) -> Self {
41
42 let mut logical_compaction = MutableAntichain::new();
43 logical_compaction.update_iter(trace.get_logical_compaction().iter().cloned().map(|t| (t,1)));
44 let mut physical_compaction = MutableAntichain::new();
45 physical_compaction.update_iter(trace.get_physical_compaction().iter().cloned().map(|t| (t,1)));
46
47 TraceBox {
48 logical_compaction,
49 physical_compaction,
50 trace,
51 }
52 }
53 #[inline]
55 pub fn adjust_logical_compaction(&mut self, lower: AntichainRef<Tr::Time>, upper: AntichainRef<Tr::Time>) {
56 self.logical_compaction.update_iter(upper.iter().cloned().map(|t| (t,1)));
57 self.logical_compaction.update_iter(lower.iter().cloned().map(|t| (t,-1)));
58 self.trace.set_logical_compaction(self.logical_compaction.frontier());
59 }
60 #[inline]
62 pub fn adjust_physical_compaction(&mut self, lower: AntichainRef<Tr::Time>, upper: AntichainRef<Tr::Time>) {
63 self.physical_compaction.update_iter(upper.iter().cloned().map(|t| (t,1)));
64 self.physical_compaction.update_iter(lower.iter().cloned().map(|t| (t,-1)));
65 self.trace.set_physical_compaction(self.physical_compaction.frontier());
66 }
67}
68
69pub struct TraceRc<Tr: TraceReader> {
75 logical_compaction: Antichain<Tr::Time>,
76 physical_compaction: Antichain<Tr::Time>,
77 pub wrapper: Rc<RefCell<TraceBox<Tr>>>,
79}
80
81impl<Tr: TraceReader> TraceReader for TraceRc<Tr> {
82 type Key<'a> = Tr::Key<'a>;
83 type Val<'a> = Tr::Val<'a>;
84 type Time = Tr::Time;
85 type TimeGat<'a> = Tr::TimeGat<'a>;
86 type Diff = Tr::Diff;
87 type DiffGat<'a> = Tr::DiffGat<'a>;
88
89 type Batch = Tr::Batch;
90 type Storage = Tr::Storage;
91 type Cursor = Tr::Cursor;
92
93 fn set_logical_compaction(&mut self, frontier: AntichainRef<Tr::Time>) {
99 self.wrapper.borrow_mut().adjust_logical_compaction(self.logical_compaction.borrow(), frontier);
100 self.logical_compaction = frontier.to_owned();
101 }
102 fn get_logical_compaction(&mut self) -> AntichainRef<Tr::Time> { self.logical_compaction.borrow() }
103 fn set_physical_compaction(&mut self, frontier: AntichainRef<Tr::Time>) {
105 self.wrapper.borrow_mut().adjust_physical_compaction(self.physical_compaction.borrow(), frontier);
106 self.physical_compaction = frontier.to_owned();
107 }
108 fn get_physical_compaction(&mut self) -> AntichainRef<Tr::Time> { self.physical_compaction.borrow() }
109 fn cursor_through(&mut self, frontier: AntichainRef<Tr::Time>) -> Option<(Tr::Cursor, Tr::Storage)> {
111 ::std::cell::RefCell::borrow_mut(&self.wrapper).trace.cursor_through(frontier)
112 }
113
114 fn map_batches<F: FnMut(&Self::Batch)>(&self, f: F) {
115 ::std::cell::RefCell::borrow(&self.wrapper).trace.map_batches(f)
116 }
117}
118
119impl<Tr: TraceReader> TraceRc<Tr> {
120 pub fn make_from(trace: Tr) -> (Self, Rc<RefCell<TraceBox<Tr>>>) {
122
123 let wrapped = Rc::new(RefCell::new(TraceBox::new(trace)));
124
125 let handle = TraceRc {
126 logical_compaction: wrapped.borrow().logical_compaction.frontier().to_owned(),
127 physical_compaction: wrapped.borrow().physical_compaction.frontier().to_owned(),
128 wrapper: wrapped.clone(),
129 };
130
131 (handle, wrapped)
132 }
133}
134
135impl<Tr: TraceReader> Clone for TraceRc<Tr> {
136 fn clone(&self) -> Self {
137 self.wrapper.borrow_mut().adjust_logical_compaction(Antichain::new().borrow(), self.logical_compaction.borrow());
139 self.wrapper.borrow_mut().adjust_physical_compaction(Antichain::new().borrow(), self.physical_compaction.borrow());
140 TraceRc {
141 logical_compaction: self.logical_compaction.clone(),
142 physical_compaction: self.physical_compaction.clone(),
143 wrapper: self.wrapper.clone(),
144 }
145 }
146}
147
148impl<Tr: TraceReader> Drop for TraceRc<Tr> {
149 fn drop(&mut self) {
150 self.wrapper.borrow_mut().adjust_logical_compaction(self.logical_compaction.borrow(), Antichain::new().borrow());
151 self.wrapper.borrow_mut().adjust_physical_compaction(self.physical_compaction.borrow(), Antichain::new().borrow());
152 self.logical_compaction = Antichain::new();
153 self.physical_compaction = Antichain::new();
154 }
155}