mz_compute/arrangement/
manager.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Management of arrangements across dataflows.
11
12use std::any::Any;
13use std::collections::BTreeMap;
14use std::rc::Rc;
15use std::time::Instant;
16
17use differential_dataflow::lattice::antichain_join;
18use differential_dataflow::operators::arrange::{Arranged, ShutdownButton, TraceAgent};
19use differential_dataflow::trace::TraceReader;
20use differential_dataflow::trace::wrappers::frontier::TraceFrontier;
21use mz_repr::{Diff, GlobalId, Timestamp};
22use timely::PartialOrder;
23use timely::dataflow::Scope;
24use timely::dataflow::operators::CapabilitySet;
25use timely::progress::frontier::{Antichain, AntichainRef};
26
27use crate::metrics::WorkerMetrics;
28use crate::typedefs::{ErrAgent, RowRowAgent};
29
/// A `TraceManager` stores maps from global identifiers to the primary arranged
/// representation of that collection.
pub struct TraceManager {
    /// The managed trace bundles, keyed by the identifier of the arranged collection.
    pub(crate) traces: BTreeMap<GlobalId, TraceBundle>,
    /// Worker metrics, updated by `maintenance` to report that maintenance is active and how
    /// much time it has taken in total.
    metrics: WorkerMetrics,
}
36
37impl TraceManager {
38    /// TODO(undocumented)
39    pub fn new(metrics: WorkerMetrics) -> Self {
40        TraceManager {
41            traces: BTreeMap::new(),
42            metrics,
43        }
44    }
45
46    /// performs maintenance work on the managed traces.
47    ///
48    /// In particular, this method enables the physical merging of batches, so that at most a logarithmic
49    /// number of batches need to be maintained. Any new batches introduced after this method is called
50    /// will not be physically merged until the method is called again. This is mostly due to limitations
51    /// of differential dataflow, which requires users to perform this explicitly; if that changes we may
52    /// be able to remove this code.
53    pub fn maintenance(&mut self) {
54        let start = Instant::now();
55        self.metrics.arrangement_maintenance_active_info.set(1);
56
57        let mut antichain = Antichain::new();
58        for bundle in self.traces.values_mut() {
59            bundle.oks.read_upper(&mut antichain);
60            bundle.oks.set_physical_compaction(antichain.borrow());
61            bundle.errs.read_upper(&mut antichain);
62            bundle.errs.set_physical_compaction(antichain.borrow());
63        }
64
65        let duration = start.elapsed().as_secs_f64();
66        self.metrics
67            .arrangement_maintenance_seconds_total
68            .inc_by(duration);
69        self.metrics.arrangement_maintenance_active_info.set(0);
70    }
71
72    /// Enables compaction of traces associated with the identifier.
73    ///
74    /// Compaction may not occur immediately, but once this method is called the
75    /// associated traces may not accumulate to the correct quantities for times
76    /// not in advance of `frontier`. Users should take care to only rely on
77    /// accumulations at times in advance of `frontier`.
78    pub fn allow_compaction(&mut self, id: GlobalId, frontier: AntichainRef<Timestamp>) {
79        if let Some(bundle) = self.traces.get_mut(&id) {
80            bundle.oks.set_logical_compaction(frontier);
81            bundle.errs.set_logical_compaction(frontier);
82        }
83    }
84
85    /// Returns a reference to the trace for `id`, should it exist.
86    pub fn get(&self, id: &GlobalId) -> Option<&TraceBundle> {
87        self.traces.get(id)
88    }
89
90    /// Returns a mutable reference to the trace for `id`, should it
91    /// exist.
92    pub fn get_mut(&mut self, id: &GlobalId) -> Option<&mut TraceBundle> {
93        self.traces.get_mut(id)
94    }
95
96    /// Binds the arrangement for `id` to `trace`.
97    pub fn set(&mut self, id: GlobalId, trace: TraceBundle) {
98        self.traces.insert(id, trace);
99    }
100
101    /// Removes the trace for `id`.
102    pub fn remove(&mut self, id: &GlobalId) -> Option<TraceBundle> {
103        self.traces.remove(id)
104    }
105}
106
107/// Handle to a trace that can be padded.
108///
109/// A padded trace contains empty data for all times greater than or equal to its `padded_since`
110/// and less than the logical compaction frontier of the inner `trace`.
111///
112/// This type is intentionally limited to only work with `mz_repr::Timestamp` times, because that
113/// is all that's required by `TraceManager`. It can be made to be more generic, at the cost of
114/// more complicated reasoning about the correct management of the involved frontiers.
115#[derive(Clone)]
116pub struct PaddedTrace<Tr>
117where
118    Tr: TraceReader<Time = Timestamp>,
119{
120    /// The wrapped trace.
121    trace: Tr,
122    /// The frontier from which the trace is padded, or `None` if it is not padded.
123    ///
124    /// Invariant: The contained frontier is less than the logical compaction frontier of `trace`.
125    ///
126    /// All methods of `PaddedTrace` are written to uphold this invariant. In particular,
127    /// `set_logical_compaction_frontier`  sets the `padded_since` to `None` if the new compaction
128    /// frontier is >= the previous compaction frontier of `trace`.
129    padded_since: Option<Antichain<Timestamp>>,
130}
131
132impl<Tr> From<Tr> for PaddedTrace<Tr>
133where
134    Tr: TraceReader<Time = Timestamp>,
135{
136    fn from(trace: Tr) -> Self {
137        Self {
138            trace,
139            padded_since: None,
140        }
141    }
142}
143
144impl<Tr> PaddedTrace<Tr>
145where
146    Tr: TraceReader<Time = Timestamp>,
147{
148    /// Turns this trace into a padded version that reports empty data for all times less than the
149    /// trace's current logical compaction frontier.
150    fn into_padded(mut self) -> Self {
151        let trace_since = self.trace.get_logical_compaction();
152        let minimum_frontier = Antichain::from_elem(Timestamp::MIN);
153        if PartialOrder::less_than(&minimum_frontier.borrow(), &trace_since) {
154            self.padded_since = Some(minimum_frontier);
155        }
156        self
157    }
158}
159
160impl<Tr> TraceReader for PaddedTrace<Tr>
161where
162    Tr: TraceReader<Time = Timestamp>,
163{
164    type Key<'a> = Tr::Key<'a>;
165    type Val<'a> = Tr::Val<'a>;
166    type Time = Tr::Time;
167    type TimeGat<'a> = Tr::TimeGat<'a>;
168    type Diff = Tr::Diff;
169    type DiffGat<'a> = Tr::DiffGat<'a>;
170    type Batch = Tr::Batch;
171    type Storage = Tr::Storage;
172    type Cursor = Tr::Cursor;
173
174    fn cursor_through(
175        &mut self,
176        upper: AntichainRef<Self::Time>,
177    ) -> Option<(Self::Cursor, Self::Storage)> {
178        self.trace.cursor_through(upper)
179    }
180
181    fn set_logical_compaction(&mut self, frontier: AntichainRef<Self::Time>) {
182        let Some(padded_since) = &mut self.padded_since else {
183            self.trace.set_logical_compaction(frontier);
184            return;
185        };
186
187        // If a padded trace is compacted to some frontier less than the inner trace's compaction
188        // frontier, advance the `padded_since`. Otherwise discard the padding and apply the
189        // compaction to the inner trace instead.
190        let trace_since = self.trace.get_logical_compaction();
191        if PartialOrder::less_than(&frontier, &trace_since) {
192            if PartialOrder::less_than(&padded_since.borrow(), &frontier) {
193                *padded_since = frontier.to_owned();
194            }
195        } else {
196            self.padded_since = None;
197            self.trace.set_logical_compaction(frontier);
198        }
199    }
200
201    fn get_logical_compaction(&mut self) -> AntichainRef<Self::Time> {
202        match &self.padded_since {
203            Some(since) => since.borrow(),
204            None => self.trace.get_logical_compaction(),
205        }
206    }
207
208    fn set_physical_compaction(&mut self, frontier: AntichainRef<Self::Time>) {
209        self.trace.set_physical_compaction(frontier);
210    }
211
212    fn get_physical_compaction(&mut self) -> AntichainRef<Self::Time> {
213        self.trace.get_logical_compaction()
214    }
215
216    fn map_batches<F: FnMut(&Self::Batch)>(&self, f: F) {
217        self.trace.map_batches(f)
218    }
219}
220
221impl<Tr> PaddedTrace<TraceAgent<Tr>>
222where
223    Tr: TraceReader<Time = Timestamp> + 'static,
224{
225    /// Import a trace restricted to a specific time interval `[since, until)`.
226    pub fn import_frontier_core<G>(
227        &mut self,
228        scope: &G,
229        name: &str,
230        since: Antichain<Tr::Time>,
231        until: Antichain<Tr::Time>,
232    ) -> (
233        Arranged<G, TraceFrontier<TraceAgent<Tr>>>,
234        ShutdownButton<CapabilitySet<Tr::Time>>,
235    )
236    where
237        G: Scope<Timestamp = Tr::Time>,
238    {
239        self.trace.import_frontier_core(scope, name, since, until)
240    }
241}
242
/// Bundles together traces for the successful computations (`oks`), the
/// failed computations (`errs`), and additional tokens that should share
/// the lifetime of the bundled traces (`to_drop`).
#[derive(Clone)]
pub struct TraceBundle {
    /// The trace for the successful computations.
    oks: PaddedTrace<RowRowAgent<Timestamp, Diff>>,
    /// The trace for the failed computations.
    errs: PaddedTrace<ErrAgent<Timestamp, Diff>>,
    /// Optional type-erased tokens kept alive together with the traces. They are held behind an
    /// `Rc`, so clones of the bundle share the same tokens.
    to_drop: Option<Rc<dyn Any>>,
}
252
253impl TraceBundle {
254    /// Constructs a new trace bundle out of an `oks` trace and `errs` trace.
255    pub fn new<O, E>(oks: O, errs: E) -> TraceBundle
256    where
257        O: Into<PaddedTrace<RowRowAgent<Timestamp, Diff>>>,
258        E: Into<PaddedTrace<ErrAgent<Timestamp, Diff>>>,
259    {
260        TraceBundle {
261            oks: oks.into(),
262            errs: errs.into(),
263            to_drop: None,
264        }
265    }
266
267    /// Adds tokens to be dropped when the trace bundle is dropped.
268    pub fn with_drop<T>(self, to_drop: T) -> TraceBundle
269    where
270        T: 'static,
271    {
272        TraceBundle {
273            to_drop: Some(Rc::new(Box::new(to_drop))),
274            ..self
275        }
276    }
277
278    /// Returns a mutable reference to the `oks` trace.
279    pub fn oks_mut(&mut self) -> &mut PaddedTrace<RowRowAgent<Timestamp, Diff>> {
280        &mut self.oks
281    }
282
283    /// Returns a mutable reference to the `errs` trace.
284    pub fn errs_mut(&mut self) -> &mut PaddedTrace<ErrAgent<Timestamp, Diff>> {
285        &mut self.errs
286    }
287
288    /// Returns a reference to the `to_drop` tokens.
289    pub fn to_drop(&self) -> &Option<Rc<dyn Any>> {
290        &self.to_drop
291    }
292
293    /// Returns the frontier up to which the traces have been allowed to compact.
294    pub fn compaction_frontier(&mut self) -> Antichain<Timestamp> {
295        antichain_join(
296            &self.oks.get_logical_compaction(),
297            &self.errs.get_logical_compaction(),
298        )
299    }
300
301    /// Turns this trace bundle into a padded version that reports empty data for all times less
302    /// than the traces' current logical compaction frontier.
303    ///
304    /// Note that the padded bundle represents a different TVC than the original one, it is unsound
305    /// to use it to "uncompact" an existing TVC. The only valid use of the padded bundle is to
306    /// initializa a new TVC.
307    pub fn into_padded(self) -> Self {
308        Self {
309            oks: self.oks.into_padded(),
310            errs: self.errs.into_padded(),
311            to_drop: self.to_drop,
312        }
313    }
314}