// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! A reducible history of compute commands.

use std::borrow::Borrow;
use std::collections::{BTreeMap, BTreeSet};

use mz_ore::cast::CastFrom;
use mz_ore::metrics::UIntGauge;
use mz_ore::{assert_none, soft_assert_or_log};
use timely::PartialOrder;
use timely::progress::Antichain;

use crate::controller::StorageCollections;
use crate::metrics::HistoryMetrics;
use crate::protocol::command::{ComputeCommand, ComputeParameters};

/// A history of compute commands that can be reduced to a minimal sequence of commands
/// with the same effect.
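///
/// Commands are appended with [`ComputeCommandHistory::push`] and compacted with
/// [`ComputeCommandHistory::reduce`]. A sketch of typical use (illustrative only;
/// construction of `metrics` and `commands`, and the `send` helper, are assumed):
///
/// ```ignore
/// let mut history = ComputeCommandHistory::new(metrics);
/// for command in commands {
///     history.push(command); // reduces automatically as the history grows
/// }
/// // Replay the (reduced) history, e.g. to a newly connected replica.
/// for command in history.iter() {
///     send(command);
/// }
/// ```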
#[derive(Debug)]
pub struct ComputeCommandHistory<M, T = mz_repr::Timestamp> {
    /// The number of commands at the last time we compacted the history.
    reduced_count: usize,
    /// The sequence of commands that should be applied.
    ///
    /// This list may not be "compact" in that there can be commands that could be optimized
    /// or removed given the context of other commands, for example compaction commands that
    /// can be unified, or dataflows that can be dropped due to allowed compaction.
    commands: Vec<ComputeCommand<T>>,
    /// Tracked metrics.
    metrics: HistoryMetrics<M>,
}

impl<M, T> ComputeCommandHistory<M, T>
where
    M: Borrow<UIntGauge>,
    T: timely::progress::Timestamp,
{
    /// Construct a new, empty command history, resetting the given metrics.
    pub fn new(metrics: HistoryMetrics<M>) -> Self {
        metrics.reset();

        Self {
            reduced_count: 0,
            commands: Vec::new(),
            metrics,
        }
    }

    /// Add a command to the history.
    ///
    /// This action reduces the history whenever its length exceeds twice its length as of
    /// the last reduction.
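    ///
    /// A sketch of the doubling behavior (illustrative only, not compiled; `history` and
    /// `command` are assumed):
    ///
    /// ```ignore
    /// // Suppose the last `reduce` left 10 commands, so `reduced_count == 10`.
    /// // Pushes accumulate until the history exceeds 2 * 10 = 20 commands,
    /// // at which point `push` invokes `reduce` internally.
    /// history.push(command);
    /// ```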
    pub fn push(&mut self, command: ComputeCommand<T>) {
        self.commands.push(command);

        if self.commands.len() > 2 * self.reduced_count {
            self.reduce();
        } else {
            // Refresh reported metrics. `reduce` already refreshes metrics, so we only need
            // to do that here in the non-reduce case.
            let command = self.commands.last().expect("pushed above");
            self.metrics
                .command_counts
                .for_command(command)
                .borrow()
                .inc();
            if matches!(command, ComputeCommand::CreateDataflow(_)) {
                self.metrics.dataflow_count.borrow().inc();
            }
        }
    }

    /// Reduces `self.commands` to a minimal form.
    ///
    /// This action not only simplifies the issued history, but importantly reduces the
    /// instructions to only reference inputs from times that are still certain to be valid.
    /// Commands that allow compaction of a collection also remove certainty that the inputs
    /// will be available for times not greater than or equal to that compaction frontier.
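    ///
    /// For example (a sketch, using set notation for frontiers), the history
    ///
    /// ```ignore
    /// CreateDataflow(df)                      // df exports `id`, as_of = {0}
    /// Schedule(id)
    /// AllowCompaction { id, frontier: {5} }
    /// ```
    ///
    /// reduces to `CreateDataflow(df)` with its `as_of` advanced to `{5}`, followed by
    /// `Schedule(id)`; the `AllowCompaction` is absorbed. Had the frontier been empty,
    /// the dataflow and its `Schedule` would have been dropped entirely.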
    pub fn reduce(&mut self) {
        // First determine what the final compacted frontiers will be for each collection.
        // These will determine for each collection whether the command that creates it is
        // required, and if required what `as_of` frontier should be used for its updated
        // command.
        let mut final_frontiers = BTreeMap::new();
        let mut created_dataflows = Vec::new();
        let mut scheduled_collections = Vec::new();
        let mut live_peeks = BTreeMap::new();

        let mut create_inst_command = None;
        let mut create_timely_command = None;

        // Collect only the final configuration.
        // Note that this is only correct as long as all config parameters apply globally. If
        // we ever introduce parameters that only affect subsequent commands, we will have to
        // reconsider this approach.
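        // E.g., two successive `UpdateConfiguration` commands collapse into a single one,
        // since `update` folds each set of parameters into `final_configuration` and only
        // the final value of each parameter survives.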
        let mut final_configuration = ComputeParameters::default();

        let mut initialization_complete = false;
        let mut allow_writes = false;

        for command in self.commands.drain(..) {
            match command {
                create_timely @ ComputeCommand::CreateTimely { .. } => {
                    assert_none!(create_timely_command);
                    create_timely_command = Some(create_timely);
                }
                // We should be able to handle the Create* commands, should this client need
                // to be restartable.
                create_inst @ ComputeCommand::CreateInstance(_) => {
                    assert_none!(create_inst_command);
                    create_inst_command = Some(create_inst);
                }
                ComputeCommand::InitializationComplete => {
                    initialization_complete = true;
                }
                ComputeCommand::UpdateConfiguration(params) => {
                    final_configuration.update(*params);
                }
                ComputeCommand::CreateDataflow(dataflow) => {
                    created_dataflows.push(dataflow);
                }
                ComputeCommand::Schedule(id) => {
                    scheduled_collections.push(id);
                }
                ComputeCommand::AllowCompaction { id, frontier } => {
                    final_frontiers.insert(id, frontier.clone());
                }
                ComputeCommand::Peek(peek) => {
                    live_peeks.insert(peek.uuid, peek);
                }
                ComputeCommand::CancelPeek { uuid } => {
                    live_peeks.remove(&uuid);
                }
                ComputeCommand::AllowWrites => {
                    allow_writes = true;
                }
            }
        }

        // Update dataflow `as_of` frontiers according to allowed compaction.
        // One possible frontier is the empty frontier, indicating that the dataflow can be
        // removed.
        for dataflow in created_dataflows.iter_mut() {
            let mut as_of = Antichain::new();
            let initial_as_of = dataflow.as_of.as_ref().unwrap();
            for id in dataflow.export_ids() {
                // If compaction has been allowed use that; otherwise use the initial `as_of`.
                if let Some(frontier) = final_frontiers.get(&id) {
                    as_of.extend(frontier.clone());
                } else {
                    as_of.extend(initial_as_of.clone());
                }
            }
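            // E.g., for a dataflow exporting `a` and `b` with initial `as_of` {0}, where
            // only `a` was compacted to {5}: extending the antichain keeps the minimal
            // element, so the dataflow's `as_of` stays {0} and it can still produce the
            // uncompacted export `b` from time 0 on.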

            soft_assert_or_log!(
                PartialOrder::less_equal(initial_as_of, &as_of),
                "dataflow as-of regression: {:?} -> {:?} (exports={})",
                initial_as_of.elements(),
                as_of.elements(),
                dataflow.display_export_ids(),
            );

            // Remove compaction for any collection that brought us to `as_of`.
            for id in dataflow.export_ids() {
                if let Some(frontier) = final_frontiers.get(&id) {
                    if frontier == &as_of {
                        final_frontiers.remove(&id);
                    }
                }
            }

            dataflow.as_of = Some(as_of);
        }

        // Discard dataflows whose outputs have all been allowed to compact away.
        created_dataflows.retain(|dataflow| dataflow.as_of != Some(Antichain::new()));
        let retained_collections: BTreeSet<_> = created_dataflows
            .iter()
            .flat_map(|d| d.export_ids())
            .collect();
        scheduled_collections.retain(|id| retained_collections.contains(id));

        // Reconstitute the commands as a compact history.

        // When we update `metrics`, we need to be careful to not transiently report incorrect
        // counts, as they would be observable by other threads.
        let command_counts = &self.metrics.command_counts;
        let dataflow_count = &self.metrics.dataflow_count;

        let count = u64::from(create_timely_command.is_some());
        command_counts.create_timely.borrow().set(count);
        if let Some(create_timely_command) = create_timely_command {
            self.commands.push(create_timely_command);
        }

        let count = u64::from(create_inst_command.is_some());
        command_counts.create_instance.borrow().set(count);
        if let Some(create_inst_command) = create_inst_command {
            self.commands.push(create_inst_command);
        }

        let count = u64::from(!final_configuration.all_unset());
        command_counts.update_configuration.borrow().set(count);
        if !final_configuration.all_unset() {
            let config = Box::new(final_configuration);
            self.commands
                .push(ComputeCommand::UpdateConfiguration(config));
        }

        let count = u64::cast_from(created_dataflows.len());
        command_counts.create_dataflow.borrow().set(count);
        dataflow_count.borrow().set(count);
        for dataflow in created_dataflows {
            self.commands.push(ComputeCommand::CreateDataflow(dataflow));
        }

        let count = u64::cast_from(scheduled_collections.len());
        command_counts.schedule.borrow().set(count);
        for id in scheduled_collections {
            self.commands.push(ComputeCommand::Schedule(id));
        }

        let count = u64::cast_from(live_peeks.len());
        command_counts.peek.borrow().set(count);
        for peek in live_peeks.into_values() {
            self.commands.push(ComputeCommand::Peek(peek));
        }

        command_counts.cancel_peek.borrow().set(0);

        // Allow compaction only after emitting peek commands, so that replicas do not
        // compact collections past times on which outstanding peeks still depend.
        let count = u64::cast_from(final_frontiers.len());
        command_counts.allow_compaction.borrow().set(count);
        for (id, frontier) in final_frontiers {
            self.commands
                .push(ComputeCommand::AllowCompaction { id, frontier });
        }

        let count = u64::from(initialization_complete);
        command_counts.initialization_complete.borrow().set(count);
        if initialization_complete {
            self.commands.push(ComputeCommand::InitializationComplete);
        }

        if allow_writes {
            self.commands.push(ComputeCommand::AllowWrites);
        }

        self.reduced_count = self.commands.len();
    }

    /// Discard all peek commands, decrementing the corresponding command-count metrics.
    pub fn discard_peeks(&mut self) {
        self.commands.retain(|command| {
            use ComputeCommand::*;
            let is_peek = matches!(command, Peek(_) | CancelPeek { .. });
            if is_peek {
                self.metrics
                    .command_counts
                    .for_command(command)
                    .borrow()
                    .dec();
            }
            !is_peek
        });
    }

    /// Update the source import uppers to reflect the current state of the imported
    /// collections.
    ///
    /// This method should be called after compacting the history to make sure that the
    /// dataflow descriptions do not mention storage collections that don't exist anymore.
    /// Its main purpose is to advance the uppers when connecting a new replica.
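    ///
    /// A sketch of the intended call sequence (illustrative only; obtaining
    /// `storage_collections` is elided):
    ///
    /// ```ignore
    /// history.reduce();
    /// history.update_source_uppers(&storage_collections);
    /// ```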
    pub fn update_source_uppers(&mut self, storage_collections: &StorageCollections<T>) {
        for command in &mut self.commands {
            if let ComputeCommand::CreateDataflow(dataflow) = command {
                for (id, (_, _, upper)) in dataflow.source_imports.iter_mut() {
                    let frontiers = storage_collections
                        .collection_frontiers(*id)
                        .expect("collection exists");

                    *upper = frontiers.write_frontier;
                }
            }
        }
    }

    /// Iterate through the contained commands.
    pub fn iter(&self) -> impl Iterator<Item = &ComputeCommand<T>> {
        self.commands.iter()
    }
}