mz_compute_client/protocol/history.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! A reducible history of compute commands.

use std::borrow::Borrow;
use std::collections::{BTreeMap, BTreeSet};

use mz_ore::cast::CastFrom;
use mz_ore::metrics::UIntGauge;
use mz_ore::{assert_none, soft_assert_or_log};
use timely::PartialOrder;
use timely::progress::Antichain;

use crate::controller::StorageCollections;
use crate::metrics::HistoryMetrics;
use crate::protocol::command::{ComputeCommand, ComputeParameters};

/// A history of compute commands that can be reduced to a minimal, equivalent form.
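///
/// # Example
///
/// A sketch of typical usage; `metrics`, the command payloads, and `send` are assumed to
/// come from the surrounding controller code (not a doctest):
///
/// ```ignore
/// let mut history = ComputeCommandHistory::new(metrics);
/// history.push(ComputeCommand::CreateDataflow(dataflow));
/// history.push(ComputeCommand::AllowCompaction { id, frontier });
/// // `push` reduces the history periodically; it can also be forced:
/// history.reduce();
/// // Replay the compact history, e.g. to a newly connected replica.
/// for command in history.iter() {
///     send(command);
/// }
/// ```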
#[derive(Debug)]
pub struct ComputeCommandHistory<M, T = mz_repr::Timestamp> {
    /// The number of commands at the last time we compacted the history.
    reduced_count: usize,
    /// The sequence of commands that should be applied.
    ///
    /// This list may not be "compact" in that there can be commands that could be optimized
    /// or removed given the context of other commands, for example compaction commands that
    /// can be unified, or dataflows that can be dropped due to allowed compaction.
    commands: Vec<ComputeCommand<T>>,
    /// Tracked metrics.
    metrics: HistoryMetrics<M>,
}

impl<M, T> ComputeCommandHistory<M, T>
where
    M: Borrow<UIntGauge>,
    T: timely::progress::Timestamp,
{
    /// Create a new command history, resetting the given metrics.
    pub fn new(metrics: HistoryMetrics<M>) -> Self {
        metrics.reset();

        Self {
            reduced_count: 0,
            commands: Vec::new(),
            metrics,
        }
    }

    /// Add a command to the history.
    ///
    /// This action reduces the history every time its length doubles relative to the last
    /// reduction.
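    ///
    /// # Example
    ///
    /// A sketch of the doubling heuristic with illustrative counts; `history` and
    /// `incoming_commands` are assumed to exist (not a doctest):
    ///
    /// ```ignore
    /// // Suppose the last `reduce` left 10 commands (`reduced_count == 10`).
    /// // The next 10 pushes only append and update metrics; the 21st push
    /// // makes `commands.len() > 2 * reduced_count` and triggers `reduce`.
    /// for command in incoming_commands {
    ///     history.push(command);
    /// }
    /// ```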
    pub fn push(&mut self, command: ComputeCommand<T>) {
        self.commands.push(command);

        if self.commands.len() > 2 * self.reduced_count {
            self.reduce();
        } else {
            // Refresh reported metrics. `reduce` already refreshes metrics, so we only need to do
            // that here in the non-reduce case.
            let command = self.commands.last().expect("pushed above");
            self.metrics
                .command_counts
                .for_command(command)
                .borrow()
                .inc();
            if matches!(command, ComputeCommand::CreateDataflow(_)) {
                self.metrics.dataflow_count.borrow().inc();
            }
        }
    }

    /// Reduces `self.commands` to a minimal form.
    ///
    /// This action not only simplifies the issued history, but importantly reduces the commands
    /// to only reference inputs from times that are still certain to be valid. Commands that
    /// allow compaction of a collection also remove certainty that the inputs will be available
    /// for times not greater than or equal to that compaction frontier.
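    ///
    /// # Example
    ///
    /// A sketch of the intended effect, with illustrative ids and frontiers (not a doctest):
    ///
    /// ```ignore
    /// // Before `reduce`:
    /// //   CreateDataflow(d1), CreateDataflow(d2),
    /// //   AllowCompaction { id: d1_export, frontier: [] },
    /// //   AllowCompaction { id: d2_export, frontier: [5] }
    /// // After `reduce`:
    /// //   CreateDataflow(d2)  // re-issued with `as_of = [5]`
    /// // `d1` is dropped because all its outputs were compacted away, and the
    /// // `AllowCompaction` for `d2_export` is subsumed by the advanced `as_of`.
    /// ```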
    pub fn reduce(&mut self) {
        // First determine what the final compacted frontiers will be for each collection.
        // These will determine for each collection whether the command that creates it is required,
        // and if required what `as_of` frontier should be used for its updated command.
        let mut final_frontiers = BTreeMap::new();
        let mut created_dataflows = Vec::new();
        let mut scheduled_collections = Vec::new();
        let mut live_peeks = BTreeMap::new();

        let mut create_inst_command = None;
        let mut create_timely_command = None;

        // Collect only the final configuration.
        // Note that this is only correct as long as all config parameters apply globally. If we
        // ever introduce parameters that only affect subsequent commands, we will have to
        // reconsider this approach.
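        // For example, two successive `UpdateConfiguration` commands collapse into a single
        // command: each later command is merged into the accumulated configuration, so replaying
        // the merged command is equivalent to replaying both in order.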
        let mut final_configuration = ComputeParameters::default();

        let mut initialization_complete = false;
        let mut allow_writes = false;

        for command in self.commands.drain(..) {
            match command {
                create_timely @ ComputeCommand::CreateTimely { .. } => {
                    assert_none!(create_timely_command);
                    create_timely_command = Some(create_timely);
                }
                // We should be able to handle the Create* commands, should this client need to be restartable.
                create_inst @ ComputeCommand::CreateInstance(_) => {
                    assert_none!(create_inst_command);
                    create_inst_command = Some(create_inst);
                }
                ComputeCommand::InitializationComplete => {
                    initialization_complete = true;
                }
                ComputeCommand::UpdateConfiguration(params) => {
                    final_configuration.update(*params);
                }
                ComputeCommand::CreateDataflow(dataflow) => {
                    created_dataflows.push(dataflow);
                }
                ComputeCommand::Schedule(id) => {
                    scheduled_collections.push(id);
                }
                ComputeCommand::AllowCompaction { id, frontier } => {
                    final_frontiers.insert(id, frontier.clone());
                }
                ComputeCommand::Peek(peek) => {
                    live_peeks.insert(peek.uuid, peek);
                }
                ComputeCommand::CancelPeek { uuid } => {
                    live_peeks.remove(&uuid);
                }
                ComputeCommand::AllowWrites => {
                    allow_writes = true;
                }
            }
        }

        // Update dataflow `as_of` frontiers according to allowed compaction.
        // One possible frontier is the empty frontier, indicating that the dataflow can be removed.
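        // For example, a dataflow created with `as_of = [3]` whose only export has been
        // allowed to compact to `[5]` is re-issued with `as_of = [5]`; if the export was
        // instead compacted to the empty frontier, the dataflow is discarded below.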
        for dataflow in created_dataflows.iter_mut() {
            let mut as_of = Antichain::new();
            let initial_as_of = dataflow.as_of.as_ref().unwrap();
            for id in dataflow.export_ids() {
                // If compaction has been allowed, use that frontier; otherwise use the initial `as_of`.
                if let Some(frontier) = final_frontiers.get(&id) {
                    as_of.extend(frontier.clone());
                } else {
                    as_of.extend(initial_as_of.clone());
                }
            }

            soft_assert_or_log!(
                PartialOrder::less_equal(initial_as_of, &as_of),
                "dataflow as-of regression: {:?} -> {:?} (exports={})",
                initial_as_of.elements(),
                as_of.elements(),
                dataflow.display_export_ids(),
            );

            // Remove compaction commands for collections whose frontier already equals the new
            // `as_of`; re-creating the dataflow at that `as_of` subsumes them.
            for id in dataflow.export_ids() {
                if let Some(frontier) = final_frontiers.get(&id) {
                    if frontier == &as_of {
                        final_frontiers.remove(&id);
                    }
                }
            }

            dataflow.as_of = Some(as_of);
        }

        // Discard dataflows whose outputs have all been allowed to compact away.
        created_dataflows.retain(|dataflow| dataflow.as_of != Some(Antichain::new()));
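        // Also prune `Schedule` commands for the discarded dataflows: a replica replaying the
        // history must not be asked to schedule a collection that is never created.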
        let retained_collections: BTreeSet<_> = created_dataflows
            .iter()
            .flat_map(|d| d.export_ids())
            .collect();
        scheduled_collections.retain(|id| retained_collections.contains(id));

        // Reconstitute the commands as a compact history.

        // When we update `metrics`, we need to be careful to not transiently report incorrect
        // counts, as they would be observable by other threads.
        let command_counts = &self.metrics.command_counts;
        let dataflow_count = &self.metrics.dataflow_count;

        let count = u64::from(create_timely_command.is_some());
        command_counts.create_timely.borrow().set(count);
        if let Some(create_timely_command) = create_timely_command {
            self.commands.push(create_timely_command);
        }

        let count = u64::from(create_inst_command.is_some());
        command_counts.create_instance.borrow().set(count);
        if let Some(create_inst_command) = create_inst_command {
            self.commands.push(create_inst_command);
        }

        let count = u64::from(!final_configuration.all_unset());
        command_counts.update_configuration.borrow().set(count);
        if !final_configuration.all_unset() {
            let config = Box::new(final_configuration);
            self.commands
                .push(ComputeCommand::UpdateConfiguration(config));
        }

        let count = u64::cast_from(created_dataflows.len());
        command_counts.create_dataflow.borrow().set(count);
        dataflow_count.borrow().set(count);
        for dataflow in created_dataflows {
            self.commands.push(ComputeCommand::CreateDataflow(dataflow));
        }

        let count = u64::cast_from(scheduled_collections.len());
        command_counts.schedule.borrow().set(count);
        for id in scheduled_collections {
            self.commands.push(ComputeCommand::Schedule(id));
        }

        let count = u64::cast_from(live_peeks.len());
        command_counts.peek.borrow().set(count);
        for peek in live_peeks.into_values() {
            self.commands.push(ComputeCommand::Peek(peek));
        }

        command_counts.cancel_peek.borrow().set(0);

        // Allow compaction only after emitting peek commands.
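        // Otherwise a replica replaying this history could compact a collection past a time
        // that one of the re-issued peeks still needs to read.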
        let count = u64::cast_from(final_frontiers.len());
        command_counts.allow_compaction.borrow().set(count);
        for (id, frontier) in final_frontiers {
            self.commands
                .push(ComputeCommand::AllowCompaction { id, frontier });
        }

        let count = u64::from(initialization_complete);
        command_counts.initialization_complete.borrow().set(count);
        if initialization_complete {
            self.commands.push(ComputeCommand::InitializationComplete);
        }

        if allow_writes {
            self.commands.push(ComputeCommand::AllowWrites);
        }

        self.reduced_count = self.commands.len();
    }

    /// Discard all peek commands.
    pub fn discard_peeks(&mut self) {
        self.commands.retain(|command| {
            use ComputeCommand::*;
            let is_peek = matches!(command, Peek(_) | CancelPeek { .. });
            if is_peek {
                self.metrics
                    .command_counts
                    .for_command(command)
                    .borrow()
                    .dec();
            }
            !is_peek
        });
    }

    /// Update the source import uppers to reflect the current state of the imported collections.
    ///
    /// This method should be called after compacting the history, to ensure that the dataflow
    /// descriptions do not mention storage collections that no longer exist. Its main purpose
    /// is to advance the uppers when connecting a new replica.
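    ///
    /// A sketch of the intended call sequence when connecting a replica; `storage_collections`
    /// and `replica_client` are hypothetical stand-ins for the surrounding controller code
    /// (not a doctest):
    ///
    /// ```ignore
    /// history.reduce();
    /// history.update_source_uppers(&storage_collections);
    /// for command in history.iter() {
    ///     replica_client.send(command.clone());
    /// }
    /// ```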
    pub fn update_source_uppers(&mut self, storage_collections: &StorageCollections<T>) {
        for command in &mut self.commands {
            if let ComputeCommand::CreateDataflow(dataflow) = command {
                for (id, (_, _, upper)) in dataflow.source_imports.iter_mut() {
                    let frontiers = storage_collections
                        .collection_frontiers(*id)
                        .expect("collection exists");

                    *upper = frontiers.write_frontier;
                }
            }
        }
    }

    /// Iterate through the contained commands.
    pub fn iter(&self) -> impl Iterator<Item = &ComputeCommand<T>> {
        self.commands.iter()
    }
}