mz_compute/arrangement/
manager.rs1use std::any::Any;
13use std::collections::BTreeMap;
14use std::rc::Rc;
15use std::time::Instant;
16
17use differential_dataflow::lattice::antichain_join;
18use differential_dataflow::operators::arrange::{Arranged, ShutdownButton, TraceAgent};
19use differential_dataflow::trace::TraceReader;
20use differential_dataflow::trace::implementations::WithLayout;
21use differential_dataflow::trace::wrappers::frontier::TraceFrontier;
22use mz_repr::{Diff, GlobalId, Timestamp};
23use timely::PartialOrder;
24use timely::dataflow::Scope;
25use timely::dataflow::operators::CapabilitySet;
26use timely::progress::Timestamp as _;
27use timely::progress::frontier::{Antichain, AntichainRef};
28
29use crate::metrics::WorkerMetrics;
30use crate::typedefs::{ErrAgent, RowRowAgent};
31
32pub struct TraceManager {
35 pub(crate) traces: BTreeMap<GlobalId, TraceBundle>,
36 metrics: WorkerMetrics,
37}
38
39impl TraceManager {
40 pub fn new(metrics: WorkerMetrics) -> Self {
42 TraceManager {
43 traces: BTreeMap::new(),
44 metrics,
45 }
46 }
47
48 pub fn maintenance(&mut self) {
56 let start = Instant::now();
57 self.metrics.arrangement_maintenance_active_info.set(1);
58
59 let mut antichain = Antichain::new();
60 for bundle in self.traces.values_mut() {
61 bundle.oks.read_upper(&mut antichain);
62 bundle.oks.set_physical_compaction(antichain.borrow());
63 bundle.errs.read_upper(&mut antichain);
64 bundle.errs.set_physical_compaction(antichain.borrow());
65 }
66
67 let duration = start.elapsed().as_secs_f64();
68 self.metrics
69 .arrangement_maintenance_seconds_total
70 .inc_by(duration);
71 self.metrics.arrangement_maintenance_active_info.set(0);
72 }
73
74 pub fn allow_compaction(&mut self, id: GlobalId, frontier: AntichainRef<Timestamp>) {
81 if let Some(bundle) = self.traces.get_mut(&id) {
82 bundle.oks.set_logical_compaction(frontier);
83 bundle.errs.set_logical_compaction(frontier);
84 }
85 }
86
87 pub fn get(&self, id: &GlobalId) -> Option<&TraceBundle> {
89 self.traces.get(id)
90 }
91
92 pub fn get_mut(&mut self, id: &GlobalId) -> Option<&mut TraceBundle> {
95 self.traces.get_mut(id)
96 }
97
98 pub fn set(&mut self, id: GlobalId, trace: TraceBundle) {
100 self.traces.insert(id, trace);
101 }
102
103 pub fn remove(&mut self, id: &GlobalId) -> Option<TraceBundle> {
105 self.traces.remove(id)
106 }
107}
108
109#[derive(Clone)]
118pub struct PaddedTrace<Tr>
119where
120 Tr: TraceReader,
121{
122 trace: Tr,
124 padded_since: Option<Antichain<Tr::Time>>,
132}
133
134impl<Tr> From<Tr> for PaddedTrace<Tr>
135where
136 Tr: TraceReader,
137{
138 fn from(trace: Tr) -> Self {
139 Self {
140 trace,
141 padded_since: None,
142 }
143 }
144}
145
146impl<Tr> PaddedTrace<Tr>
147where
148 Tr: TraceReader,
149{
150 fn into_padded(mut self) -> Self {
153 let trace_since = self.trace.get_logical_compaction();
154 let minimum_frontier = Antichain::from_elem(Tr::Time::minimum());
155 if PartialOrder::less_than(&minimum_frontier.borrow(), &trace_since) {
156 self.padded_since = Some(minimum_frontier);
157 }
158 self
159 }
160}
161
162impl<Tr: TraceReader> WithLayout for PaddedTrace<Tr> {
163 type Layout = Tr::Layout;
164}
165
166impl<Tr> TraceReader for PaddedTrace<Tr>
167where
168 Tr: TraceReader,
169{
170 type Batch = Tr::Batch;
171 type Storage = Tr::Storage;
172 type Cursor = Tr::Cursor;
173
174 fn cursor_through(
175 &mut self,
176 upper: AntichainRef<Self::Time>,
177 ) -> Option<(Self::Cursor, Self::Storage)> {
178 self.trace.cursor_through(upper)
179 }
180
181 fn set_logical_compaction(&mut self, frontier: AntichainRef<Self::Time>) {
182 let Some(padded_since) = &mut self.padded_since else {
183 self.trace.set_logical_compaction(frontier);
184 return;
185 };
186
187 let trace_since = self.trace.get_logical_compaction();
191 if PartialOrder::less_than(&frontier, &trace_since) {
192 if PartialOrder::less_than(&padded_since.borrow(), &frontier) {
193 *padded_since = frontier.to_owned();
194 }
195 } else {
196 self.padded_since = None;
197 self.trace.set_logical_compaction(frontier);
198 }
199 }
200
201 fn get_logical_compaction(&mut self) -> AntichainRef<'_, Self::Time> {
202 match &self.padded_since {
203 Some(since) => since.borrow(),
204 None => self.trace.get_logical_compaction(),
205 }
206 }
207
208 fn set_physical_compaction(&mut self, frontier: AntichainRef<Self::Time>) {
209 self.trace.set_physical_compaction(frontier);
210 }
211
212 fn get_physical_compaction(&mut self) -> AntichainRef<'_, Self::Time> {
213 self.trace.get_logical_compaction()
214 }
215
216 fn map_batches<F: FnMut(&Self::Batch)>(&self, f: F) {
217 self.trace.map_batches(f)
218 }
219}
220
221impl<Tr> PaddedTrace<TraceAgent<Tr>>
222where
223 Tr: TraceReader<Time = Timestamp> + 'static,
224{
225 pub fn import_frontier_core<G>(
227 &mut self,
228 scope: &G,
229 name: &str,
230 since: Antichain<Tr::Time>,
231 until: Antichain<Tr::Time>,
232 ) -> (
233 Arranged<G, TraceFrontier<TraceAgent<Tr>>>,
234 ShutdownButton<CapabilitySet<Tr::Time>>,
235 )
236 where
237 G: Scope<Timestamp = Tr::Time>,
238 {
239 self.trace.import_frontier_core(scope, name, since, until)
240 }
241}
242
243#[derive(Clone)]
247pub struct TraceBundle {
248 oks: PaddedTrace<RowRowAgent<Timestamp, Diff>>,
249 errs: PaddedTrace<ErrAgent<Timestamp, Diff>>,
250 to_drop: Option<Rc<dyn Any>>,
251}
252
253impl TraceBundle {
254 pub fn new<O, E>(oks: O, errs: E) -> TraceBundle
256 where
257 O: Into<PaddedTrace<RowRowAgent<Timestamp, Diff>>>,
258 E: Into<PaddedTrace<ErrAgent<Timestamp, Diff>>>,
259 {
260 TraceBundle {
261 oks: oks.into(),
262 errs: errs.into(),
263 to_drop: None,
264 }
265 }
266
267 pub fn with_drop<T>(self, to_drop: T) -> TraceBundle
269 where
270 T: 'static,
271 {
272 TraceBundle {
273 to_drop: Some(Rc::new(Box::new(to_drop))),
274 ..self
275 }
276 }
277
278 pub fn oks_mut(&mut self) -> &mut PaddedTrace<RowRowAgent<Timestamp, Diff>> {
280 &mut self.oks
281 }
282
283 pub fn errs_mut(&mut self) -> &mut PaddedTrace<ErrAgent<Timestamp, Diff>> {
285 &mut self.errs
286 }
287
288 pub fn to_drop(&self) -> &Option<Rc<dyn Any>> {
290 &self.to_drop
291 }
292
293 pub fn compaction_frontier(&mut self) -> Antichain<Timestamp> {
295 antichain_join(
296 &self.oks.get_logical_compaction(),
297 &self.errs.get_logical_compaction(),
298 )
299 }
300
301 pub fn into_padded(self) -> Self {
308 Self {
309 oks: self.oks.into_padded(),
310 errs: self.errs.into_padded(),
311 to_drop: self.to_drop,
312 }
313 }
314}