// mz_compute_client/protocol/history.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! A reducible history of compute commands.

use std::borrow::Borrow;
use std::collections::{BTreeMap, BTreeSet};

use mz_ore::cast::CastFrom;
use mz_ore::metrics::UIntGauge;
use mz_ore::{assert_none, soft_assert_or_log};
use timely::PartialOrder;
use timely::progress::Antichain;

use crate::controller::StorageCollections;
use crate::metrics::HistoryMetrics;
use crate::protocol::command::{ComputeCommand, ComputeParameters};

/// A history of [`ComputeCommand`]s that can be reduced to an equivalent but
/// more compact command sequence (see [`ComputeCommandHistory::reduce`]).
#[derive(Debug)]
pub struct ComputeCommandHistory<M> {
    /// The number of commands at the last time we compacted the history.
    ///
    /// Used by `push` to decide when the history has doubled and should be
    /// reduced again.
    reduced_count: usize,
    /// The sequence of commands that should be applied.
    ///
    /// This list may not be "compact" in that there can be commands that could be optimized
    /// or removed given the context of other commands, for example compaction commands that
    /// can be unified, or dataflows that can be dropped due to allowed compaction.
    commands: Vec<ComputeCommand>,
    /// Tracked metrics.
    ///
    /// `M` is anything that can be borrowed as a `UIntGauge`; counts are kept
    /// in sync with `commands` by `push`, `reduce`, and `discard_peeks`.
    metrics: HistoryMetrics<M>,
}

impl<M> ComputeCommandHistory<M>
where
    M: Borrow<UIntGauge>,
{
44    /// TODO(database-issues#7533): Add documentation.
45    pub fn new(metrics: HistoryMetrics<M>) -> Self {
46        metrics.reset();
47
48        Self {
49            reduced_count: 0,
50            commands: Vec::new(),
51            metrics,
52        }
53    }
55    /// Add a command to the history.
56    ///
57    /// This action will reduce the history every time it doubles.
58    pub fn push(&mut self, command: ComputeCommand) {
59        self.commands.push(command);
60
61        if self.commands.len() > 2 * self.reduced_count {
62            self.reduce();
63        } else {
64            // Refresh reported metrics. `reduce` already refreshes metrics, so we only need to do
65            // that here in the non-reduce case.
66            let command = self.commands.last().expect("pushed above");
67            self.metrics
68                .command_counts
69                .for_command(command)
70                .borrow()
71                .inc();
72            if matches!(command, ComputeCommand::CreateDataflow(_)) {
73                self.metrics.dataflow_count.borrow().inc();
74            }
75        }
76    }
78    /// Reduces `self.history` to a minimal form.
79    ///
80    /// This action not only simplifies the issued history, but importantly reduces the instructions
81    /// to only reference inputs from times that are still certain to be valid. Commands that allow
82    /// compaction of a collection also remove certainty that the inputs will be available for times
83    /// not greater or equal to that compaction frontier.
84    pub fn reduce(&mut self) {
85        // First determine what the final compacted frontiers will be for each collection.
86        // These will determine for each collection whether the command that creates it is required,
87        // and if required what `as_of` frontier should be used for its updated command.
88        let mut final_frontiers = BTreeMap::new();
89        let mut created_dataflows = Vec::new();
90        let mut scheduled_collections = Vec::new();
91        let mut live_peeks = BTreeMap::new();
92
93        let mut hello_command = None;
94        let mut create_inst_command = None;
95
96        // Collect only the final configuration.
97        // Note that this is only correct as long as all config parameters apply globally. If we
98        // ever introduce parameters that only affect subsequent commands, we will have to
99        // reconsider this approach.
100        let mut final_configuration = ComputeParameters::default();
101
102        let mut initialization_complete = false;
103        let mut allow_writes = BTreeSet::new();
104
105        for command in self.commands.drain(..) {
106            match command {
107                hello @ ComputeCommand::Hello { .. } => {
108                    assert_none!(hello_command);
109                    hello_command = Some(hello);
110                }
111                // We should be able to handle the Create* commands, should this client need to be restartable.
112                create_inst @ ComputeCommand::CreateInstance(_) => {
113                    assert_none!(create_inst_command);
114                    create_inst_command = Some(create_inst);
115                }
116                ComputeCommand::InitializationComplete => {
117                    initialization_complete = true;
118                }
119                ComputeCommand::UpdateConfiguration(params) => {
120                    final_configuration.update(*params);
121                }
122                ComputeCommand::CreateDataflow(dataflow) => {
123                    created_dataflows.push(dataflow);
124                }
125                ComputeCommand::Schedule(id) => {
126                    scheduled_collections.push(id);
127                }
128                ComputeCommand::AllowCompaction { id, frontier } => {
129                    final_frontiers.insert(id, frontier.clone());
130                }
131                ComputeCommand::Peek(peek) => {
132                    live_peeks.insert(peek.uuid, peek);
133                }
134                ComputeCommand::CancelPeek { uuid } => {
135                    live_peeks.remove(&uuid);
136                }
137                ComputeCommand::AllowWrites(id) => {
138                    allow_writes.insert(id);
139                }
140            }
141        }
142
143        // Update dataflow `as_of` frontiers according to allowed compaction.
144        // One possible frontier is the empty frontier, indicating that the dataflow can be removed.
145        for dataflow in created_dataflows.iter_mut() {
146            let mut as_of = Antichain::new();
147            let initial_as_of = dataflow.as_of.as_ref().unwrap();
148            for id in dataflow.export_ids() {
149                // If compaction has been allowed use that; otherwise use the initial `as_of`.
150                if let Some(frontier) = final_frontiers.get(&id) {
151                    as_of.extend(frontier.clone());
152                } else {
153                    as_of.extend(initial_as_of.clone());
154                }
155            }
156
157            soft_assert_or_log!(
158                PartialOrder::less_equal(initial_as_of, &as_of),
159                "dataflow as-of regression: {:?} -> {:?} (exports={})",
160                initial_as_of.elements(),
161                as_of.elements(),
162                dataflow.display_export_ids(),
163            );
164
165            // Remove compaction for any collection that brought us to `as_of`.
166            for id in dataflow.export_ids() {
167                if let Some(frontier) = final_frontiers.get(&id) {
168                    if frontier == &as_of {
169                        final_frontiers.remove(&id);
170                    }
171                }
172            }
173
174            dataflow.as_of = Some(as_of);
175        }
176
177        // Discard dataflows whose outputs have all been allowed to compact away.
178        created_dataflows.retain(|dataflow| dataflow.as_of != Some(Antichain::new()));
179        let retained_collections: BTreeSet<_> = created_dataflows
180            .iter()
181            .flat_map(|d| d.export_ids())
182            .collect();
183        scheduled_collections.retain(|id| retained_collections.contains(id));
184        // Retain any `AllowWrites` command for dataflows that still exist.
185        allow_writes.retain(|id| retained_collections.contains(id));
186
187        // Reconstitute the commands as a compact history.
188
189        // When we update `metrics`, we need to be careful to not transiently report incorrect
190        // counts, as they would be observable by other threads.
191        let command_counts = &self.metrics.command_counts;
192        let dataflow_count = &self.metrics.dataflow_count;
193
194        let count = u64::from(hello_command.is_some());
195        command_counts.hello.borrow().set(count);
196        if let Some(hello) = hello_command {
197            self.commands.push(hello);
198        }
199
200        let count = u64::from(create_inst_command.is_some());
201        command_counts.create_instance.borrow().set(count);
202        if let Some(create_inst_command) = create_inst_command {
203            self.commands.push(create_inst_command);
204        }
205
206        let count = u64::from(!final_configuration.all_unset());
207        command_counts.update_configuration.borrow().set(count);
208        if !final_configuration.all_unset() {
209            let config = Box::new(final_configuration);
210            self.commands
211                .push(ComputeCommand::UpdateConfiguration(config));
212        }
213
214        let count = u64::cast_from(created_dataflows.len());
215        command_counts.create_dataflow.borrow().set(count);
216        dataflow_count.borrow().set(count);
217        for dataflow in created_dataflows {
218            self.commands.push(ComputeCommand::CreateDataflow(dataflow));
219        }
220
221        let count = u64::cast_from(scheduled_collections.len());
222        command_counts.schedule.borrow().set(count);
223        for id in scheduled_collections {
224            self.commands.push(ComputeCommand::Schedule(id));
225        }
226
227        let count = u64::cast_from(live_peeks.len());
228        command_counts.peek.borrow().set(count);
229        for peek in live_peeks.into_values() {
230            self.commands.push(ComputeCommand::Peek(peek));
231        }
232
233        command_counts.cancel_peek.borrow().set(0);
234
235        // Allow compaction only after emitting peek commands.
236        let count = u64::cast_from(final_frontiers.len());
237        command_counts.allow_compaction.borrow().set(count);
238        for (id, frontier) in final_frontiers {
239            self.commands
240                .push(ComputeCommand::AllowCompaction { id, frontier });
241        }
242
243        let count = u64::cast_from(allow_writes.len());
244        command_counts.allow_writes.borrow().set(count);
245        for id in allow_writes {
246            self.commands.push(ComputeCommand::AllowWrites(id));
247        }
248
249        let count = u64::from(initialization_complete);
250        command_counts.initialization_complete.borrow().set(count);
251        if initialization_complete {
252            self.commands.push(ComputeCommand::InitializationComplete);
253        }
254
255        self.reduced_count = self.commands.len();
256    }
258    /// Discard all peek commands.
259    pub fn discard_peeks(&mut self) {
260        self.commands.retain(|command| {
261            use ComputeCommand::*;
262            let is_peek = matches!(command, Peek(_) | CancelPeek { .. });
263            if is_peek {
264                self.metrics
265                    .command_counts
266                    .for_command(command)
267                    .borrow()
268                    .dec();
269            }
270            !is_peek
271        });
272    }
274    /// Update the source import uppers to reflect the current state of the imported collections.
275    ///
276    /// This method should be called after compacting the history to make sure that the dataflow
277    /// descriptions do not mention storage collections that don't exist anymore. Its main
278    /// purpose is to advance the uppers when connecting a new replica.
279    pub fn update_source_uppers(&mut self, storage_collections: &StorageCollections) {
280        for command in &mut self.commands {
281            if let ComputeCommand::CreateDataflow(dataflow) = command {
282                for (id, import) in dataflow.source_imports.iter_mut() {
283                    let frontiers = storage_collections
284                        .collection_frontiers(*id)
285                        .expect("collection exists");
286
287                    import.upper = frontiers.write_frontier;
288                }
289            }
290        }
291    }
293    /// Iterate through the contained commands.
294    pub fn iter(&self) -> impl Iterator<Item = &ComputeCommand> {
295        self.commands.iter()
296    }
}