// mz_storage_controller/history.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! A reducible history of storage commands.

use std::collections::BTreeMap;

use mz_ore::cast::CastFrom;
use mz_storage_client::client::StorageCommand;
use mz_storage_client::metrics::HistoryMetrics;
use mz_storage_types::parameters::StorageParameters;
/// A history of storage commands.
#[derive(Debug)]
pub(crate) struct CommandHistory {
    /// The number of commands at the last time we compacted the history.
    ///
    /// `push` uses this to decide when to reduce again: whenever the live command count
    /// exceeds twice this value.
    reduced_count: usize,
    /// The sequence of commands that should be applied.
    ///
    /// This list may not be "compact" in that there can be commands that could be optimized or
    /// removed given the context of other commands, for example compaction commands that can be
    /// unified, or run commands that can be dropped due to allowed compaction.
    commands: Vec<StorageCommand>,
    /// Tracked metrics.
    ///
    /// Kept in sync with `commands` by `push` and `reduce`.
    metrics: HistoryMetrics,
}
impl CommandHistory {
    /// Constructs a new command history.
    ///
    /// Resets the given metrics so the reported counts start from zero, matching the
    /// initially empty history.
    pub fn new(metrics: HistoryMetrics) -> Self {
        metrics.reset();

        Self {
            reduced_count: 0,
            commands: Vec::new(),
            metrics,
        }
    }

    /// Returns an iterator over the contained storage commands.
    pub fn iter(&self) -> impl DoubleEndedIterator<Item = &StorageCommand> {
        self.commands.iter()
    }

    /// Adds a command to the history.
    ///
    /// This action will reduce the history every time it doubles.
    pub fn push(&mut self, command: StorageCommand) {
        self.commands.push(command);

        // Reduce whenever the history has doubled since the last reduction. Because
        // `reduced_count` starts at 0, the very first push also triggers a reduction.
        if self.commands.len() > 2 * self.reduced_count {
            self.reduce();
        } else {
            // Refresh reported metrics. `reduce` already refreshes metrics, so we only need to do
            // that here in the non-reduce case.
            let command = self.commands.last().expect("pushed above");
            self.metrics.command_counts.for_command(command).inc();
        }
    }

    /// Reduces the command history to a minimal form.
    ///
    /// This drains all commands, folds them into their final effects (latest
    /// ingestion/sink definitions, merged configuration, last compaction frontier per
    /// ID), discards objects that have been dropped (i.e. compacted to the empty
    /// frontier), and then reconstitutes a compact command sequence suitable for replay.
    pub fn reduce(&mut self) {
        use StorageCommand::*;

        let mut hello_command = None;
        let mut initialization_complete = false;
        let mut allow_writes = false;
        let mut final_compactions = BTreeMap::new();

        // Collect the final definitions of ingestions and sinks.
        // The same object ID can occur in multiple run commands when an object was altered. In
        // this scenario, we only want to send the most recent definition of the object.
        let mut final_ingestions = BTreeMap::new();
        let mut final_sinks = BTreeMap::new();
        let mut final_oneshot_ingestions = BTreeMap::new();

        // Collect only the final configuration.
        // Note that this means the final configuration is applied to all objects installed on the
        // new replica during initialization, even when the same objects where installed with an
        // older config on existing replicas. This is only correct as long as config parameters
        // don't affect the output of storage objects, as that would make different replicas write
        // different data, which is likely to produce inconsistencies.
        let mut final_configuration = StorageParameters::default();

        // Phase 1: fold the entire history into its final effects.
        for command in self.commands.drain(..) {
            match command {
                cmd @ Hello { .. } => hello_command = Some(cmd),
                InitializationComplete => initialization_complete = true,
                AllowWrites => allow_writes = true,
                UpdateConfiguration(params) => final_configuration.update(*params),
                RunIngestion(ingestion) => {
                    final_ingestions.insert(ingestion.id, ingestion);
                }
                RunSink(sink) => {
                    final_sinks.insert(sink.id, sink);
                }
                AllowCompaction(id, since) => {
                    final_compactions.insert(id, since);
                }
                RunOneshotIngestion(oneshot) => {
                    final_oneshot_ingestions.insert(oneshot.ingestion_id, oneshot);
                }
                // A cancellation erases the corresponding oneshot ingestion entirely, so
                // neither the run nor the cancel command needs to be replayed.
                CancelOneshotIngestion(uuid) => {
                    final_oneshot_ingestions.remove(&uuid);
                }
            }
        }

        // Phase 2: decide which ingestions, sinks, and compactions survive.
        let mut run_ingestions = Vec::new();
        let mut run_sinks = Vec::new();
        let mut allow_compaction = Vec::new();

        // Discard ingestions that have been dropped, keep the rest.
        for ingestion in final_ingestions.into_values() {
            // An empty compaction frontier for the ingestion's own ID means the ingestion
            // was dropped, so neither it nor its collections' frontiers need replaying.
            if let Some(frontier) = final_compactions.get(&ingestion.id) {
                if frontier.is_empty() {
                    continue;
                }
            }

            // Re-emit the compaction frontiers of the ingestion's collections after the
            // ingestion itself. Frontiers left behind in `final_compactions` (e.g. those
            // belonging to dropped ingestions, or to no retained object at all) are never
            // re-emitted.
            let compactions = ingestion
                .description
                .collection_ids()
                .filter_map(|id| final_compactions.remove(&id).map(|f| (id, f)));
            allow_compaction.extend(compactions);

            run_ingestions.push(ingestion);
        }

        // Discard sinks that have been dropped, keep the rest. Note that a non-empty
        // compaction frontier for a kept sink is absorbed here (removed from
        // `final_compactions`) rather than re-emitted as an `AllowCompaction` command;
        // the sink definition itself is replayed unchanged.
        for sink in final_sinks.into_values() {
            if let Some(frontier) = final_compactions.remove(&sink.id) {
                if frontier.is_empty() {
                    continue;
                }
            }

            run_sinks.push(sink);
        }

        // Phase 3: reconstitute the commands as a compact history.
        //
        // When we update `metrics`, we need to be careful to not transiently report incorrect
        // counts, as they would be observable by other threads. For example, we should not call
        // `metrics.reset()` here, since otherwise the command history would appear empty for a
        // brief amount of time.
        let command_counts = &self.metrics.command_counts;

        let count = u64::from(hello_command.is_some());
        command_counts.hello.set(count);
        if let Some(hello) = hello_command {
            self.commands.push(hello);
        }

        let count = u64::from(!final_configuration.all_unset());
        command_counts.update_configuration.set(count);
        if !final_configuration.all_unset() {
            let config = Box::new(final_configuration);
            self.commands
                .push(StorageCommand::UpdateConfiguration(config));
        }

        let count = u64::cast_from(run_ingestions.len());
        command_counts.run_ingestion.set(count);
        for ingestion in run_ingestions {
            self.commands.push(StorageCommand::RunIngestion(ingestion));
        }

        let count = u64::cast_from(run_sinks.len());
        command_counts.run_sink.set(count);
        for sink in run_sinks {
            self.commands.push(StorageCommand::RunSink(sink));
        }

        // Note: RunOneshotIngestion commands are reduced, as we receive
        // CancelOneshotIngestion commands.
        let count = u64::cast_from(final_oneshot_ingestions.len());
        command_counts.run_oneshot_ingestion.set(count);
        for ingestion in final_oneshot_ingestions.into_values() {
            self.commands
                .push(StorageCommand::RunOneshotIngestion(ingestion));
        }

        // Cancellations never survive a reduction (see phase 1), so this count is always zero.
        command_counts.cancel_oneshot_ingestion.set(0);

        let count = u64::cast_from(allow_compaction.len());
        command_counts.allow_compaction.set(count);
        for (id, since) in allow_compaction {
            self.commands
                .push(StorageCommand::AllowCompaction(id, since));
        }

        let count = u64::from(initialization_complete);
        command_counts.initialization_complete.set(count);
        if initialization_complete {
            self.commands.push(StorageCommand::InitializationComplete);
        }

        let count = u64::from(allow_writes);
        command_counts.allow_writes.set(count);
        if allow_writes {
            self.commands.push(StorageCommand::AllowWrites);
        }

        // Remember the reduced size, so `push` knows when the history has doubled again.
        self.reduced_count = self.commands.len();
    }
}
#[cfg(test)]
mod tests {
    use std::str::FromStr;

    use mz_cluster_client::metrics::ControllerMetrics;
    use mz_ore::metrics::MetricsRegistry;
    use mz_ore::url::SensitiveUrl;
    use mz_persist_types::PersistLocation;
    use mz_repr::{CatalogItemId, GlobalId, RelationDesc, SqlRelationType};
    use mz_storage_client::client::{RunIngestionCommand, RunSinkCommand};
    use mz_storage_client::metrics::StorageControllerMetrics;
    use mz_storage_types::connections::inline::InlinedConnection;
    use mz_storage_types::connections::{KafkaConnection, Tunnel};
    use mz_storage_types::controller::CollectionMetadata;
    use mz_storage_types::instances::StorageInstanceId;
    use mz_storage_types::sinks::{
        KafkaIdStyle, KafkaSinkCompressionType, KafkaSinkConnection, KafkaSinkFormat,
        KafkaSinkFormatType, SinkEnvelope, StorageSinkConnection, StorageSinkDesc,
    };
    use mz_storage_types::sources::load_generator::{
        LoadGenerator, LoadGeneratorOutput, LoadGeneratorSourceExportDetails,
    };
    use mz_storage_types::sources::{
        GenericSourceConnection, IngestionDescription, LoadGeneratorSourceConnection, SourceDesc,
        SourceEnvelope, SourceExport, SourceExportDataConfig, SourceExportDetails,
    };
    use timely::progress::Antichain;

    use super::*;

    /// Constructs an empty `CommandHistory` backed by a throwaway metrics registry.
    fn history() -> CommandHistory {
        let registry = MetricsRegistry::new();
        let controller_metrics = ControllerMetrics::new(&registry);
        let metrics = StorageControllerMetrics::new(&registry, controller_metrics)
            .for_instance(StorageInstanceId::system(0).expect("0 is a valid ID"))
            .for_history();

        CommandHistory::new(metrics)
    }

    /// Builds a minimal load-generator ingestion description.
    ///
    /// Source exports are created for the ingestion itself, the remap collection, and
    /// each of the given subsource IDs, all using in-memory persist locations and
    /// defaulted metadata.
    fn ingestion_description<S: Into<Vec<u64>>>(
        ingestion_id: u64,
        subsource_ids: S,
        remap_collection_id: u64,
    ) -> IngestionDescription<CollectionMetadata, InlinedConnection> {
        let export_ids = [ingestion_id, remap_collection_id]
            .into_iter()
            .chain(subsource_ids.into());
        let source_exports = export_ids
            .map(|id| {
                let export = SourceExport {
                    storage_metadata: CollectionMetadata {
                        persist_location: PersistLocation {
                            blob_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
                            consensus_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
                        },
                        data_shard: Default::default(),
                        relation_desc: RelationDesc::new(
                            SqlRelationType {
                                column_types: Default::default(),
                                keys: Default::default(),
                            },
                            Vec::<String>::new(),
                        ),
                        txns_shard: Default::default(),
                    },
                    details: SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails {
                        output: LoadGeneratorOutput::Default,
                    }),
                    data_config: SourceExportDataConfig {
                        encoding: Default::default(),
                        envelope: SourceEnvelope::CdcV2,
                    },
                };
                (GlobalId::User(id), export)
            })
            .collect();

        let connection = GenericSourceConnection::LoadGenerator(LoadGeneratorSourceConnection {
            load_generator: LoadGenerator::Auction,
            tick_micros: Default::default(),
            as_of: Default::default(),
            up_to: Default::default(),
        });

        IngestionDescription {
            desc: SourceDesc {
                connection,
                timestamp_interval: Default::default(),
            },
            remap_metadata: CollectionMetadata {
                persist_location: PersistLocation {
                    blob_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
                    consensus_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
                },
                data_shard: Default::default(),
                relation_desc: RelationDesc::new(
                    SqlRelationType {
                        column_types: Default::default(),
                        keys: Default::default(),
                    },
                    Vec::<String>::new(),
                ),
                txns_shard: Default::default(),
            },
            source_exports,
            instance_id: StorageInstanceId::system(0).expect("0 is a valid ID"),
            remap_collection_id: GlobalId::User(remap_collection_id),
        }
    }

    /// Builds a minimal Kafka sink description reading from `GlobalId::System(1)`,
    /// with all optional fields defaulted.
    fn sink_description() -> StorageSinkDesc<CollectionMetadata> {
        StorageSinkDesc {
            from: GlobalId::System(1),
            from_desc: RelationDesc::new(
                SqlRelationType {
                    column_types: Default::default(),
                    keys: Default::default(),
                },
                Vec::<String>::new(),
            ),
            connection: StorageSinkConnection::Kafka(KafkaSinkConnection {
                connection_id: CatalogItemId::System(2),
                connection: KafkaConnection {
                    brokers: Default::default(),
                    default_tunnel: Tunnel::Direct,
                    progress_topic: Default::default(),
                    progress_topic_options: Default::default(),
                    options: Default::default(),
                    tls: Default::default(),
                    sasl: Default::default(),
                },
                format: KafkaSinkFormat {
                    key_format: Default::default(),
                    value_format: KafkaSinkFormatType::Text,
                },
                relation_key_indices: Default::default(),
                key_desc_and_indices: Default::default(),
                headers_index: Default::default(),
                value_desc: RelationDesc::new(
                    SqlRelationType {
                        column_types: Default::default(),
                        keys: Default::default(),
                    },
                    Vec::<String>::new(),
                ),
                partition_by: Default::default(),
                topic: Default::default(),
                topic_options: Default::default(),
                compression_type: KafkaSinkCompressionType::None,
                progress_group_id: KafkaIdStyle::Legacy,
                transactional_id: KafkaIdStyle::Legacy,
                topic_metadata_refresh_interval: Default::default(),
            }),
            with_snapshot: Default::default(),
            version: Default::default(),
            envelope: SinkEnvelope::Upsert,
            as_of: Antichain::from_elem(0.into()),
            from_storage_metadata: CollectionMetadata {
                persist_location: PersistLocation {
                    blob_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
                    consensus_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
                },
                data_shard: Default::default(),
                relation_desc: RelationDesc::new(
                    SqlRelationType {
                        column_types: Default::default(),
                        keys: Default::default(),
                    },
                    Vec::<String>::new(),
                ),
                txns_shard: Default::default(),
            },
            to_storage_metadata: CollectionMetadata {
                persist_location: PersistLocation {
                    blob_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
                    consensus_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
                },
                data_shard: Default::default(),
                relation_desc: RelationDesc::new(
                    SqlRelationType {
                        column_types: Default::default(),
                        keys: Default::default(),
                    },
                    Vec::<String>::new(),
                ),
                txns_shard: Default::default(),
            },
            commit_interval: Default::default(),
        }
    }

    // An ingestion compacted to the empty frontier is dropped entirely, together with
    // the compaction frontiers of its subsource and remap collections.
    #[mz_ore::test]
    fn reduce_drops_dropped_ingestion() {
        let mut history = history();

        let commands = [
            StorageCommand::RunIngestion(Box::new(RunIngestionCommand {
                id: GlobalId::User(1),
                description: ingestion_description(1, [2], 3),
            })),
            StorageCommand::AllowCompaction(GlobalId::User(1), Antichain::new()),
            StorageCommand::AllowCompaction(GlobalId::User(2), Antichain::new()),
            StorageCommand::AllowCompaction(GlobalId::User(3), Antichain::new()),
        ];

        for cmd in commands {
            history.push(cmd);
        }

        history.reduce();

        let commands_after: Vec<_> = history.iter().collect();
        assert!(commands_after.is_empty(), "{:?}", commands_after);
    }

    // An ingestion with non-empty compaction frontiers survives unchanged: the run
    // command and all of its collections' compaction commands are re-emitted.
    #[mz_ore::test]
    fn reduce_keeps_compacted_ingestion() {
        let mut history = history();

        let commands = [
            StorageCommand::RunIngestion(Box::new(RunIngestionCommand {
                id: GlobalId::User(1),
                description: ingestion_description(1, [2], 3),
            })),
            StorageCommand::AllowCompaction(GlobalId::User(1), Antichain::from_elem(1.into())),
            StorageCommand::AllowCompaction(GlobalId::User(2), Antichain::from_elem(2.into())),
            StorageCommand::AllowCompaction(GlobalId::User(3), Antichain::from_elem(3.into())),
        ];

        for cmd in commands.clone() {
            history.push(cmd);
        }

        history.reduce();

        let commands_after: Vec<_> = history.iter().cloned().collect();
        assert_eq!(commands_after, commands);
    }

    // Dropping only a subsource (empty frontier for User(2)) must not drop the whole
    // ingestion; the history is already minimal and stays as-is.
    #[mz_ore::test]
    fn reduce_keeps_partially_dropped_ingestion() {
        let mut history = history();

        let commands = [
            StorageCommand::RunIngestion(Box::new(RunIngestionCommand {
                id: GlobalId::User(1),
                description: ingestion_description(1, [2], 3),
            })),
            StorageCommand::AllowCompaction(GlobalId::User(2), Antichain::new()),
        ];

        for cmd in commands.clone() {
            history.push(cmd);
        }

        history.reduce();

        let commands_after: Vec<_> = history.iter().cloned().collect();
        assert_eq!(commands_after, commands);
    }

    // A sink compacted to the empty frontier is dropped entirely.
    #[mz_ore::test]
    fn reduce_drops_dropped_sink() {
        let mut history = history();

        let commands = [
            StorageCommand::RunSink(Box::new(RunSinkCommand {
                id: GlobalId::User(1),
                description: sink_description(),
            })),
            StorageCommand::AllowCompaction(GlobalId::User(1), Antichain::new()),
        ];

        for cmd in commands {
            history.push(cmd);
        }

        history.reduce();

        let commands_after: Vec<_> = history.iter().collect();
        assert!(commands_after.is_empty(), "{:?}", commands_after);
    }

    // A sink with a non-empty compaction frontier is kept, with its definition
    // unchanged; the AllowCompaction command is absorbed rather than re-emitted.
    #[mz_ore::test]
    fn reduce_keeps_compacted_sink() {
        let mut history = history();

        let sink_desc = sink_description();
        let commands = [
            StorageCommand::RunSink(Box::new(RunSinkCommand {
                id: GlobalId::User(1),
                description: sink_desc.clone(),
            })),
            StorageCommand::AllowCompaction(GlobalId::User(1), Antichain::from_elem(42.into())),
        ];

        for cmd in commands {
            history.push(cmd);
        }

        history.reduce();

        let commands_after: Vec<_> = history.iter().cloned().collect();

        let expected_commands = [StorageCommand::RunSink(Box::new(RunSinkCommand {
            id: GlobalId::User(1),
            description: sink_desc,
        }))];

        assert_eq!(commands_after, expected_commands);
    }

    // Compaction commands for IDs that belong to no retained ingestion or sink are
    // dropped by reduction, whether their frontiers are empty or not.
    #[mz_ore::test]
    fn reduce_drops_stray_compactions() {
        let mut history = history();

        let commands = [
            StorageCommand::AllowCompaction(GlobalId::User(1), Antichain::new()),
            StorageCommand::AllowCompaction(GlobalId::User(2), Antichain::from_elem(1.into())),
            StorageCommand::AllowCompaction(GlobalId::User(2), Antichain::from_elem(2.into())),
        ];

        for cmd in commands {
            history.push(cmd);
        }

        history.reduce();

        let commands_after: Vec<_> = history.iter().collect();
        assert!(commands_after.is_empty(), "{:?}", commands_after);
    }
}