mz_compute/arrangement/
manager.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Management of arrangements across dataflows.
11
12use std::any::Any;
13use std::collections::BTreeMap;
14use std::rc::Rc;
15use std::time::Instant;
16
17use differential_dataflow::lattice::antichain_join;
18use differential_dataflow::operators::arrange::{Arranged, ShutdownButton, TraceAgent};
19use differential_dataflow::trace::TraceReader;
20use differential_dataflow::trace::implementations::WithLayout;
21use differential_dataflow::trace::wrappers::frontier::TraceFrontier;
22use mz_repr::{Diff, GlobalId, Timestamp};
23use timely::PartialOrder;
24use timely::dataflow::Scope;
25use timely::dataflow::operators::CapabilitySet;
26use timely::progress::Timestamp as _;
27use timely::progress::frontier::{Antichain, AntichainRef};
28
29use crate::metrics::WorkerMetrics;
30use crate::typedefs::{ErrAgent, RowRowAgent};
31
/// A `TraceManager` stores maps from global identifiers to the primary arranged
/// representation of that collection.
pub struct TraceManager {
    /// The managed trace bundles, keyed by the identifier they are bound to.
    pub(crate) traces: BTreeMap<GlobalId, TraceBundle>,
    /// Worker metrics updated by `maintenance` (an "active" flag and the cumulative number of
    /// seconds spent in maintenance).
    metrics: WorkerMetrics,
}
38
39impl TraceManager {
40    /// TODO(undocumented)
41    pub fn new(metrics: WorkerMetrics) -> Self {
42        TraceManager {
43            traces: BTreeMap::new(),
44            metrics,
45        }
46    }
47
48    /// performs maintenance work on the managed traces.
49    ///
50    /// In particular, this method enables the physical merging of batches, so that at most a logarithmic
51    /// number of batches need to be maintained. Any new batches introduced after this method is called
52    /// will not be physically merged until the method is called again. This is mostly due to limitations
53    /// of differential dataflow, which requires users to perform this explicitly; if that changes we may
54    /// be able to remove this code.
55    pub fn maintenance(&mut self) {
56        let start = Instant::now();
57        self.metrics.arrangement_maintenance_active_info.set(1);
58
59        let mut antichain = Antichain::new();
60        for bundle in self.traces.values_mut() {
61            bundle.oks.read_upper(&mut antichain);
62            bundle.oks.set_physical_compaction(antichain.borrow());
63            bundle.errs.read_upper(&mut antichain);
64            bundle.errs.set_physical_compaction(antichain.borrow());
65        }
66
67        let duration = start.elapsed().as_secs_f64();
68        self.metrics
69            .arrangement_maintenance_seconds_total
70            .inc_by(duration);
71        self.metrics.arrangement_maintenance_active_info.set(0);
72    }
73
74    /// Enables compaction of traces associated with the identifier.
75    ///
76    /// Compaction may not occur immediately, but once this method is called the
77    /// associated traces may not accumulate to the correct quantities for times
78    /// not in advance of `frontier`. Users should take care to only rely on
79    /// accumulations at times in advance of `frontier`.
80    pub fn allow_compaction(&mut self, id: GlobalId, frontier: AntichainRef<Timestamp>) {
81        if let Some(bundle) = self.traces.get_mut(&id) {
82            bundle.oks.set_logical_compaction(frontier);
83            bundle.errs.set_logical_compaction(frontier);
84        }
85    }
86
87    /// Returns a reference to the trace for `id`, should it exist.
88    pub fn get(&self, id: &GlobalId) -> Option<&TraceBundle> {
89        self.traces.get(id)
90    }
91
92    /// Returns a mutable reference to the trace for `id`, should it
93    /// exist.
94    pub fn get_mut(&mut self, id: &GlobalId) -> Option<&mut TraceBundle> {
95        self.traces.get_mut(id)
96    }
97
98    /// Binds the arrangement for `id` to `trace`.
99    pub fn set(&mut self, id: GlobalId, trace: TraceBundle) {
100        self.traces.insert(id, trace);
101    }
102
103    /// Removes the trace for `id`.
104    pub fn remove(&mut self, id: &GlobalId) -> Option<TraceBundle> {
105        self.traces.remove(id)
106    }
107}
108
/// Handle to a trace that can be padded.
///
/// A padded trace contains empty data for all times greater than or equal to its `padded_since`
/// and less than the logical compaction frontier of the inner `trace`.
///
/// This type is intentionally limited to only work with `mz_repr::Timestamp` times, because that
/// is all that's required by `TraceManager`. It can be made to be more generic, at the cost of
/// more complicated reasoning about the correct management of the involved frontiers.
#[derive(Clone)]
pub struct PaddedTrace<Tr>
where
    Tr: TraceReader,
{
    /// The wrapped trace.
    trace: Tr,
    /// The frontier from which the trace is padded, or `None` if it is not padded.
    ///
    /// Invariant: The contained frontier is less than the logical compaction frontier of `trace`.
    ///
    /// All methods of `PaddedTrace` are written to uphold this invariant. In particular,
    /// `set_logical_compaction` sets the `padded_since` to `None` if the new compaction
    /// frontier is not less than the previous compaction frontier of `trace`.
    padded_since: Option<Antichain<Tr::Time>>,
}
133
134impl<Tr> From<Tr> for PaddedTrace<Tr>
135where
136    Tr: TraceReader,
137{
138    fn from(trace: Tr) -> Self {
139        Self {
140            trace,
141            padded_since: None,
142        }
143    }
144}
145
146impl<Tr> PaddedTrace<Tr>
147where
148    Tr: TraceReader,
149{
150    /// Turns this trace into a padded version that reports empty data for all times less than the
151    /// trace's current logical compaction frontier.
152    fn into_padded(mut self) -> Self {
153        let trace_since = self.trace.get_logical_compaction();
154        let minimum_frontier = Antichain::from_elem(Tr::Time::minimum());
155        if PartialOrder::less_than(&minimum_frontier.borrow(), &trace_since) {
156            self.padded_since = Some(minimum_frontier);
157        }
158        self
159    }
160}
161
162impl<Tr: TraceReader> WithLayout for PaddedTrace<Tr> {
163    type Layout = Tr::Layout;
164}
165
166impl<Tr> TraceReader for PaddedTrace<Tr>
167where
168    Tr: TraceReader,
169{
170    type Batch = Tr::Batch;
171    type Storage = Tr::Storage;
172    type Cursor = Tr::Cursor;
173
174    fn cursor_through(
175        &mut self,
176        upper: AntichainRef<Self::Time>,
177    ) -> Option<(Self::Cursor, Self::Storage)> {
178        self.trace.cursor_through(upper)
179    }
180
181    fn set_logical_compaction(&mut self, frontier: AntichainRef<Self::Time>) {
182        let Some(padded_since) = &mut self.padded_since else {
183            self.trace.set_logical_compaction(frontier);
184            return;
185        };
186
187        // If a padded trace is compacted to some frontier less than the inner trace's compaction
188        // frontier, advance the `padded_since`. Otherwise discard the padding and apply the
189        // compaction to the inner trace instead.
190        let trace_since = self.trace.get_logical_compaction();
191        if PartialOrder::less_than(&frontier, &trace_since) {
192            if PartialOrder::less_than(&padded_since.borrow(), &frontier) {
193                *padded_since = frontier.to_owned();
194            }
195        } else {
196            self.padded_since = None;
197            self.trace.set_logical_compaction(frontier);
198        }
199    }
200
201    fn get_logical_compaction(&mut self) -> AntichainRef<'_, Self::Time> {
202        match &self.padded_since {
203            Some(since) => since.borrow(),
204            None => self.trace.get_logical_compaction(),
205        }
206    }
207
208    fn set_physical_compaction(&mut self, frontier: AntichainRef<Self::Time>) {
209        self.trace.set_physical_compaction(frontier);
210    }
211
212    fn get_physical_compaction(&mut self) -> AntichainRef<'_, Self::Time> {
213        self.trace.get_logical_compaction()
214    }
215
216    fn map_batches<F: FnMut(&Self::Batch)>(&self, f: F) {
217        self.trace.map_batches(f)
218    }
219}
220
221impl<Tr> PaddedTrace<TraceAgent<Tr>>
222where
223    Tr: TraceReader<Time = Timestamp> + 'static,
224{
225    /// Import a trace restricted to a specific time interval `[since, until)`.
226    pub fn import_frontier_core<G>(
227        &mut self,
228        scope: &G,
229        name: &str,
230        since: Antichain<Tr::Time>,
231        until: Antichain<Tr::Time>,
232    ) -> (
233        Arranged<G, TraceFrontier<TraceAgent<Tr>>>,
234        ShutdownButton<CapabilitySet<Tr::Time>>,
235    )
236    where
237        G: Scope<Timestamp = Tr::Time>,
238    {
239        self.trace.import_frontier_core(scope, name, since, until)
240    }
241}
242
/// Bundles together traces for the successful computations (`oks`), the
/// failed computations (`errs`), and additional tokens that should share
/// the lifetime of the bundled traces (`to_drop`).
#[derive(Clone)]
pub struct TraceBundle {
    /// Trace of the successful computations.
    oks: PaddedTrace<RowRowAgent<Timestamp, Diff>>,
    /// Trace of the failed computations.
    errs: PaddedTrace<ErrAgent<Timestamp, Diff>>,
    /// Optional tokens, shared between clones of this bundle through the `Rc`.
    to_drop: Option<Rc<dyn Any>>,
}
252
253impl TraceBundle {
254    /// Constructs a new trace bundle out of an `oks` trace and `errs` trace.
255    pub fn new<O, E>(oks: O, errs: E) -> TraceBundle
256    where
257        O: Into<PaddedTrace<RowRowAgent<Timestamp, Diff>>>,
258        E: Into<PaddedTrace<ErrAgent<Timestamp, Diff>>>,
259    {
260        TraceBundle {
261            oks: oks.into(),
262            errs: errs.into(),
263            to_drop: None,
264        }
265    }
266
267    /// Adds tokens to be dropped when the trace bundle is dropped.
268    pub fn with_drop<T>(self, to_drop: T) -> TraceBundle
269    where
270        T: 'static,
271    {
272        TraceBundle {
273            to_drop: Some(Rc::new(Box::new(to_drop))),
274            ..self
275        }
276    }
277
278    /// Returns a mutable reference to the `oks` trace.
279    pub fn oks_mut(&mut self) -> &mut PaddedTrace<RowRowAgent<Timestamp, Diff>> {
280        &mut self.oks
281    }
282
283    /// Returns a mutable reference to the `errs` trace.
284    pub fn errs_mut(&mut self) -> &mut PaddedTrace<ErrAgent<Timestamp, Diff>> {
285        &mut self.errs
286    }
287
288    /// Returns a reference to the `to_drop` tokens.
289    pub fn to_drop(&self) -> &Option<Rc<dyn Any>> {
290        &self.to_drop
291    }
292
293    /// Returns the frontier up to which the traces have been allowed to compact.
294    pub fn compaction_frontier(&mut self) -> Antichain<Timestamp> {
295        antichain_join(
296            &self.oks.get_logical_compaction(),
297            &self.errs.get_logical_compaction(),
298        )
299    }
300
301    /// Turns this trace bundle into a padded version that reports empty data for all times less
302    /// than the traces' current logical compaction frontier.
303    ///
304    /// Note that the padded bundle represents a different TVC than the original one, it is unsound
305    /// to use it to "uncompact" an existing TVC. The only valid use of the padded bundle is to
306    /// initializa a new TVC.
307    pub fn into_padded(self) -> Self {
308        Self {
309            oks: self.oks.into_padded(),
310            errs: self.errs.into_padded(),
311            to_drop: self.to_drop,
312        }
313    }
314}