1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
910//! Management of arrangements across dataflows.
1112use std::any::Any;
13use std::collections::BTreeMap;
14use std::rc::Rc;
15use std::time::Instant;
1617use differential_dataflow::lattice::antichain_join;
18use differential_dataflow::operators::arrange::{Arranged, ShutdownButton, TraceAgent};
19use differential_dataflow::trace::TraceReader;
20use differential_dataflow::trace::wrappers::frontier::TraceFrontier;
21use mz_repr::{Diff, GlobalId, Timestamp};
22use timely::PartialOrder;
23use timely::dataflow::Scope;
24use timely::dataflow::operators::CapabilitySet;
25use timely::progress::frontier::{Antichain, AntichainRef};
2627use crate::metrics::WorkerMetrics;
28use crate::typedefs::{ErrAgent, RowRowAgent};
2930/// A `TraceManager` stores maps from global identifiers to the primary arranged
31/// representation of that collection.
32pub struct TraceManager {
33pub(crate) traces: BTreeMap<GlobalId, TraceBundle>,
34 metrics: WorkerMetrics,
35}
3637impl TraceManager {
38/// TODO(undocumented)
39pub fn new(metrics: WorkerMetrics) -> Self {
40 TraceManager {
41 traces: BTreeMap::new(),
42 metrics,
43 }
44 }
4546/// performs maintenance work on the managed traces.
47 ///
48 /// In particular, this method enables the physical merging of batches, so that at most a logarithmic
49 /// number of batches need to be maintained. Any new batches introduced after this method is called
50 /// will not be physically merged until the method is called again. This is mostly due to limitations
51 /// of differential dataflow, which requires users to perform this explicitly; if that changes we may
52 /// be able to remove this code.
53pub fn maintenance(&mut self) {
54let start = Instant::now();
55self.metrics.arrangement_maintenance_active_info.set(1);
5657let mut antichain = Antichain::new();
58for bundle in self.traces.values_mut() {
59 bundle.oks.read_upper(&mut antichain);
60 bundle.oks.set_physical_compaction(antichain.borrow());
61 bundle.errs.read_upper(&mut antichain);
62 bundle.errs.set_physical_compaction(antichain.borrow());
63 }
6465let duration = start.elapsed().as_secs_f64();
66self.metrics
67 .arrangement_maintenance_seconds_total
68 .inc_by(duration);
69self.metrics.arrangement_maintenance_active_info.set(0);
70 }
7172/// Enables compaction of traces associated with the identifier.
73 ///
74 /// Compaction may not occur immediately, but once this method is called the
75 /// associated traces may not accumulate to the correct quantities for times
76 /// not in advance of `frontier`. Users should take care to only rely on
77 /// accumulations at times in advance of `frontier`.
78pub fn allow_compaction(&mut self, id: GlobalId, frontier: AntichainRef<Timestamp>) {
79if let Some(bundle) = self.traces.get_mut(&id) {
80 bundle.oks.set_logical_compaction(frontier);
81 bundle.errs.set_logical_compaction(frontier);
82 }
83 }
8485/// Returns a reference to the trace for `id`, should it exist.
86pub fn get(&self, id: &GlobalId) -> Option<&TraceBundle> {
87self.traces.get(id)
88 }
8990/// Returns a mutable reference to the trace for `id`, should it
91 /// exist.
92pub fn get_mut(&mut self, id: &GlobalId) -> Option<&mut TraceBundle> {
93self.traces.get_mut(id)
94 }
9596/// Binds the arrangement for `id` to `trace`.
97pub fn set(&mut self, id: GlobalId, trace: TraceBundle) {
98self.traces.insert(id, trace);
99 }
100101/// Removes the trace for `id`.
102pub fn remove(&mut self, id: &GlobalId) -> Option<TraceBundle> {
103self.traces.remove(id)
104 }
105}
106107/// Handle to a trace that can be padded.
108///
109/// A padded trace contains empty data for all times greater than or equal to its `padded_since`
110/// and less than the logical compaction frontier of the inner `trace`.
111///
112/// This type is intentionally limited to only work with `mz_repr::Timestamp` times, because that
113/// is all that's required by `TraceManager`. It can be made to be more generic, at the cost of
114/// more complicated reasoning about the correct management of the involved frontiers.
115#[derive(Clone)]
116pub struct PaddedTrace<Tr>
117where
118Tr: TraceReader<Time = Timestamp>,
119{
120/// The wrapped trace.
121trace: Tr,
122/// The frontier from which the trace is padded, or `None` if it is not padded.
123 ///
124 /// Invariant: The contained frontier is less than the logical compaction frontier of `trace`.
125 ///
126 /// All methods of `PaddedTrace` are written to uphold this invariant. In particular,
127 /// `set_logical_compaction_frontier` sets the `padded_since` to `None` if the new compaction
128 /// frontier is >= the previous compaction frontier of `trace`.
129padded_since: Option<Antichain<Timestamp>>,
130}
131132impl<Tr> From<Tr> for PaddedTrace<Tr>
133where
134Tr: TraceReader<Time = Timestamp>,
135{
136fn from(trace: Tr) -> Self {
137Self {
138 trace,
139 padded_since: None,
140 }
141 }
142}
143144impl<Tr> PaddedTrace<Tr>
145where
146Tr: TraceReader<Time = Timestamp>,
147{
148/// Turns this trace into a padded version that reports empty data for all times less than the
149 /// trace's current logical compaction frontier.
150fn into_padded(mut self) -> Self {
151let trace_since = self.trace.get_logical_compaction();
152let minimum_frontier = Antichain::from_elem(Timestamp::MIN);
153if PartialOrder::less_than(&minimum_frontier.borrow(), &trace_since) {
154self.padded_since = Some(minimum_frontier);
155 }
156self
157}
158}
159160impl<Tr> TraceReader for PaddedTrace<Tr>
161where
162Tr: TraceReader<Time = Timestamp>,
163{
164type Key<'a> = Tr::Key<'a>;
165type Val<'a> = Tr::Val<'a>;
166type Time = Tr::Time;
167type TimeGat<'a> = Tr::TimeGat<'a>;
168type Diff = Tr::Diff;
169type DiffGat<'a> = Tr::DiffGat<'a>;
170type Batch = Tr::Batch;
171type Storage = Tr::Storage;
172type Cursor = Tr::Cursor;
173174fn cursor_through(
175&mut self,
176 upper: AntichainRef<Self::Time>,
177 ) -> Option<(Self::Cursor, Self::Storage)> {
178self.trace.cursor_through(upper)
179 }
180181fn set_logical_compaction(&mut self, frontier: AntichainRef<Self::Time>) {
182let Some(padded_since) = &mut self.padded_since else {
183self.trace.set_logical_compaction(frontier);
184return;
185 };
186187// If a padded trace is compacted to some frontier less than the inner trace's compaction
188 // frontier, advance the `padded_since`. Otherwise discard the padding and apply the
189 // compaction to the inner trace instead.
190let trace_since = self.trace.get_logical_compaction();
191if PartialOrder::less_than(&frontier, &trace_since) {
192if PartialOrder::less_than(&padded_since.borrow(), &frontier) {
193*padded_since = frontier.to_owned();
194 }
195 } else {
196self.padded_since = None;
197self.trace.set_logical_compaction(frontier);
198 }
199 }
200201fn get_logical_compaction(&mut self) -> AntichainRef<Self::Time> {
202match &self.padded_since {
203Some(since) => since.borrow(),
204None => self.trace.get_logical_compaction(),
205 }
206 }
207208fn set_physical_compaction(&mut self, frontier: AntichainRef<Self::Time>) {
209self.trace.set_physical_compaction(frontier);
210 }
211212fn get_physical_compaction(&mut self) -> AntichainRef<Self::Time> {
213self.trace.get_logical_compaction()
214 }
215216fn map_batches<F: FnMut(&Self::Batch)>(&self, f: F) {
217self.trace.map_batches(f)
218 }
219}
220221impl<Tr> PaddedTrace<TraceAgent<Tr>>
222where
223Tr: TraceReader<Time = Timestamp> + 'static,
224{
225/// Import a trace restricted to a specific time interval `[since, until)`.
226pub fn import_frontier_core<G>(
227&mut self,
228 scope: &G,
229 name: &str,
230 since: Antichain<Tr::Time>,
231 until: Antichain<Tr::Time>,
232 ) -> (
233 Arranged<G, TraceFrontier<TraceAgent<Tr>>>,
234 ShutdownButton<CapabilitySet<Tr::Time>>,
235 )
236where
237G: Scope<Timestamp = Tr::Time>,
238 {
239self.trace.import_frontier_core(scope, name, since, until)
240 }
241}
242243/// Bundles together traces for the successful computations (`oks`), the
244/// failed computations (`errs`), additional tokens that should share
245/// the lifetime of the bundled traces (`to_drop`).
246#[derive(Clone)]
247pub struct TraceBundle {
248 oks: PaddedTrace<RowRowAgent<Timestamp, Diff>>,
249 errs: PaddedTrace<ErrAgent<Timestamp, Diff>>,
250 to_drop: Option<Rc<dyn Any>>,
251}
252253impl TraceBundle {
254/// Constructs a new trace bundle out of an `oks` trace and `errs` trace.
255pub fn new<O, E>(oks: O, errs: E) -> TraceBundle
256where
257O: Into<PaddedTrace<RowRowAgent<Timestamp, Diff>>>,
258 E: Into<PaddedTrace<ErrAgent<Timestamp, Diff>>>,
259 {
260 TraceBundle {
261 oks: oks.into(),
262 errs: errs.into(),
263 to_drop: None,
264 }
265 }
266267/// Adds tokens to be dropped when the trace bundle is dropped.
268pub fn with_drop<T>(self, to_drop: T) -> TraceBundle
269where
270T: 'static,
271 {
272 TraceBundle {
273 to_drop: Some(Rc::new(Box::new(to_drop))),
274 ..self
275}
276 }
277278/// Returns a mutable reference to the `oks` trace.
279pub fn oks_mut(&mut self) -> &mut PaddedTrace<RowRowAgent<Timestamp, Diff>> {
280&mut self.oks
281 }
282283/// Returns a mutable reference to the `errs` trace.
284pub fn errs_mut(&mut self) -> &mut PaddedTrace<ErrAgent<Timestamp, Diff>> {
285&mut self.errs
286 }
287288/// Returns a reference to the `to_drop` tokens.
289pub fn to_drop(&self) -> &Option<Rc<dyn Any>> {
290&self.to_drop
291 }
292293/// Returns the frontier up to which the traces have been allowed to compact.
294pub fn compaction_frontier(&mut self) -> Antichain<Timestamp> {
295 antichain_join(
296&self.oks.get_logical_compaction(),
297&self.errs.get_logical_compaction(),
298 )
299 }
300301/// Turns this trace bundle into a padded version that reports empty data for all times less
302 /// than the traces' current logical compaction frontier.
303 ///
304 /// Note that the padded bundle represents a different TVC than the original one, it is unsound
305 /// to use it to "uncompact" an existing TVC. The only valid use of the padded bundle is to
306 /// initializa a new TVC.
307pub fn into_padded(self) -> Self {
308Self {
309 oks: self.oks.into_padded(),
310 errs: self.errs.into_padded(),
311 to_drop: self.to_drop,
312 }
313 }
314}