mz_storage_controller/
history.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! A reducible history of storage commands.
12use std::collections::BTreeMap;
13
14use mz_ore::cast::CastFrom;
15use mz_storage_client::client::StorageCommand;
16use mz_storage_client::metrics::HistoryMetrics;
17use mz_storage_types::parameters::StorageParameters;
18use timely::order::TotalOrder;
19
/// A history of storage commands.
///
/// The history is an append-only log of commands that can be periodically reduced (see
/// [`CommandHistory::reduce`]) to an equivalent minimal form, keeping its length proportional to
/// the live state rather than to the total number of commands ever issued.
#[derive(Debug)]
pub(crate) struct CommandHistory<T> {
    /// The number of commands at the last time we compacted the history.
    ///
    /// Used by `push` to decide when to reduce again: we reduce whenever the history grows to
    /// more than twice this size.
    reduced_count: usize,
    /// The sequence of commands that should be applied.
    ///
    /// This list may not be "compact" in that there can be commands that could be optimized or
    /// removed given the context of other commands, for example compaction commands that can be
    /// unified, or run commands that can be dropped due to allowed compaction.
    commands: Vec<StorageCommand<T>>,
    /// Tracked metrics.
    ///
    /// Kept in sync with `commands` by `push` and `reduce`, so the reported per-command counts
    /// always reflect the current history contents.
    metrics: HistoryMetrics,
}
34
impl<T: timely::progress::Timestamp + TotalOrder> CommandHistory<T> {
    /// Constructs a new, empty command history.
    ///
    /// Resets the given metrics so their reported counts match the (empty) history.
    pub fn new(metrics: HistoryMetrics) -> Self {
        metrics.reset();

        Self {
            reduced_count: 0,
            commands: Vec::new(),
            metrics,
        }
    }

    /// Returns an iterator over the contained storage commands.
    pub fn iter(&self) -> impl DoubleEndedIterator<Item = &StorageCommand<T>> {
        self.commands.iter()
    }

    /// Adds a command to the history.
    ///
    /// This action will reduce the history every time it doubles. Reducing on doubling keeps the
    /// amortized cost of `push` constant while bounding the history to at most twice its reduced
    /// size.
    pub fn push(&mut self, command: StorageCommand<T>) {
        self.commands.push(command);

        if self.commands.len() > 2 * self.reduced_count {
            self.reduce();
        } else {
            // Refresh reported metrics. `reduce` already refreshes metrics, so we only need to do
            // that here in the non-reduce case.
            let command = self.commands.last().expect("pushed above");
            self.metrics.command_counts.for_command(command).inc();
        }
    }

    /// Reduces the command history to a minimal form.
    ///
    /// The reduced history consists of, in replay order: the latest `Hello`, the merged
    /// configuration, the final definition of each live ingestion and sink, uncancelled oneshot
    /// ingestions, the retained compaction commands, and finally `InitializationComplete` /
    /// `AllowWrites` if they were ever issued.
    pub fn reduce(&mut self) {
        use StorageCommand::*;

        let mut hello_command = None;
        let mut initialization_complete = false;
        let mut allow_writes = false;
        let mut final_compactions = BTreeMap::new();

        // Collect the final definitions of ingestions and sinks.
        // The same object ID can occur in multiple run commands when an object was altered. In
        // this scenario, we only want to send the most recent definition of the object.
        let mut final_ingestions = BTreeMap::new();
        let mut final_sinks = BTreeMap::new();
        let mut final_oneshot_ingestions = BTreeMap::new();

        // Collect only the final configuration.
        // Note that this means the final configuration is applied to all objects installed on the
        // new replica during initialization, even when the same objects where installed with an
        // older config on existing replicas. This is only correct as long as config parameters
        // don't affect the output of storage objects, as that would make different replicas write
        // different data, which is likely to produce inconsistencies.
        let mut final_configuration = StorageParameters::default();

        for command in self.commands.drain(..) {
            match command {
                cmd @ Hello { .. } => hello_command = Some(cmd),
                InitializationComplete => initialization_complete = true,
                AllowWrites => allow_writes = true,
                UpdateConfiguration(params) => final_configuration.update(*params),
                RunIngestion(ingestion) => {
                    // A later definition for the same ID replaces an earlier one.
                    final_ingestions.insert(ingestion.id, ingestion);
                }
                RunSink(sink) => {
                    final_sinks.insert(sink.id, sink);
                }
                AllowCompaction(id, since) => {
                    // A later compaction for the same ID replaces an earlier one.
                    final_compactions.insert(id, since);
                }
                RunOneshotIngestion(oneshot) => {
                    final_oneshot_ingestions.insert(oneshot.ingestion_id, oneshot);
                }
                CancelOneshotIngestion(uuid) => {
                    // A cancellation annihilates the matching run command, so neither needs to be
                    // replayed.
                    final_oneshot_ingestions.remove(&uuid);
                }
            }
        }

        let mut run_ingestions = Vec::new();
        let mut run_sinks = Vec::new();
        let mut allow_compaction = Vec::new();

        // Discard ingestions that have been dropped (i.e. compacted to the empty frontier), keep
        // the rest.
        for ingestion in final_ingestions.into_values() {
            if let Some(frontier) = final_compactions.get(&ingestion.id) {
                if frontier.is_empty() {
                    continue;
                }
            }

            // Claim the final compaction for each collection exported by this live ingestion, so
            // it gets replayed after the ingestion command. Anything still left in
            // `final_compactions` at the end of `reduce` (compactions for dropped or otherwise
            // unknown objects) is discarded, never re-emitted.
            let compactions = ingestion
                .description
                .collection_ids()
                .filter_map(|id| final_compactions.remove(&id).map(|f| (id, f)));
            allow_compaction.extend(compactions);

            run_ingestions.push(ingestion);
        }

        // Discard sinks that have been dropped (i.e. compacted to the empty frontier), keep the
        // rest. Note that a sink's compaction command is absorbed here and not re-emitted; the
        // sink description itself is replayed unchanged.
        for sink in final_sinks.into_values() {
            if let Some(frontier) = final_compactions.remove(&sink.id) {
                if frontier.is_empty() {
                    continue;
                }
            }

            run_sinks.push(sink);
        }

        // Reconstitute the commands as a compact history.
        //
        // When we update `metrics`, we need to be careful to not transiently report incorrect
        // counts, as they would be observable by other threads. For example, we should not call
        // `metrics.reset()` here, since otherwise the command history would appear empty for a
        // brief amount of time.
        let command_counts = &self.metrics.command_counts;

        let count = u64::from(hello_command.is_some());
        command_counts.hello.set(count);
        if let Some(hello) = hello_command {
            self.commands.push(hello);
        }

        let count = u64::from(!final_configuration.all_unset());
        command_counts.update_configuration.set(count);
        if !final_configuration.all_unset() {
            let config = Box::new(final_configuration);
            self.commands
                .push(StorageCommand::UpdateConfiguration(config));
        }

        let count = u64::cast_from(run_ingestions.len());
        command_counts.run_ingestion.set(count);
        for ingestion in run_ingestions {
            self.commands.push(StorageCommand::RunIngestion(ingestion));
        }

        let count = u64::cast_from(run_sinks.len());
        command_counts.run_sink.set(count);
        for sink in run_sinks {
            self.commands.push(StorageCommand::RunSink(sink));
        }

        // Note: RunOneshotIngestion commands are reduced, as we receive
        // CancelOneshotIngestion commands.
        let count = u64::cast_from(final_oneshot_ingestions.len());
        command_counts.run_oneshot_ingestion.set(count);
        for ingestion in final_oneshot_ingestions.into_values() {
            self.commands
                .push(StorageCommand::RunOneshotIngestion(ingestion));
        }

        // Every cancellation was either applied to `final_oneshot_ingestions` above or targeted
        // an ingestion that was never run, so the reduced history contains none.
        command_counts.cancel_oneshot_ingestion.set(0);

        let count = u64::cast_from(allow_compaction.len());
        command_counts.allow_compaction.set(count);
        for (id, since) in allow_compaction {
            self.commands
                .push(StorageCommand::AllowCompaction(id, since));
        }

        let count = u64::from(initialization_complete);
        command_counts.initialization_complete.set(count);
        if initialization_complete {
            self.commands.push(StorageCommand::InitializationComplete);
        }

        let count = u64::from(allow_writes);
        command_counts.allow_writes.set(count);
        if allow_writes {
            self.commands.push(StorageCommand::AllowWrites);
        }

        // Remember the reduced size so `push` knows when the history has doubled again.
        self.reduced_count = self.commands.len();
    }
}
215
#[cfg(test)]
mod tests {
    use std::str::FromStr;

    use mz_cluster_client::metrics::ControllerMetrics;
    use mz_ore::metrics::MetricsRegistry;
    use mz_ore::url::SensitiveUrl;
    use mz_persist_types::PersistLocation;
    use mz_repr::{CatalogItemId, GlobalId, RelationDesc, RelationType};
    use mz_storage_client::client::{RunIngestionCommand, RunSinkCommand};
    use mz_storage_client::metrics::StorageControllerMetrics;
    use mz_storage_types::connections::inline::InlinedConnection;
    use mz_storage_types::connections::{KafkaConnection, Tunnel};
    use mz_storage_types::controller::CollectionMetadata;
    use mz_storage_types::instances::StorageInstanceId;
    use mz_storage_types::sinks::{
        KafkaIdStyle, KafkaSinkCompressionType, KafkaSinkConnection, KafkaSinkFormat,
        KafkaSinkFormatType, SinkEnvelope, StorageSinkConnection, StorageSinkDesc,
    };
    use mz_storage_types::sources::load_generator::{
        LoadGenerator, LoadGeneratorOutput, LoadGeneratorSourceExportDetails,
    };
    use mz_storage_types::sources::{
        GenericSourceConnection, IngestionDescription, LoadGeneratorSourceConnection,
        SourceConnection, SourceDesc, SourceEnvelope, SourceExport, SourceExportDataConfig,
        SourceExportDetails,
    };
    use timely::progress::Antichain;

    use super::*;

    /// Returns an empty `CommandHistory` wired up to a fresh metrics registry.
    fn history() -> CommandHistory<u64> {
        let registry = MetricsRegistry::new();
        let controller_metrics = ControllerMetrics::new(&registry);
        let metrics = StorageControllerMetrics::new(&registry, controller_metrics)
            .for_instance(StorageInstanceId::system(0).expect("0 is a valid ID"))
            .for_history();

        CommandHistory::new(metrics)
    }

    /// Builds a load-generator `IngestionDescription` with source exports for the ingestion
    /// itself, the remap collection, and each of `subsource_ids`.
    ///
    /// NOTE(review): the tests below assume `description.collection_ids()` yields exactly these
    /// export IDs — confirm against `IngestionDescription::collection_ids` if that changes.
    fn ingestion_description<S: Into<Vec<u64>>>(
        ingestion_id: u64,
        subsource_ids: S,
        remap_collection_id: u64,
    ) -> IngestionDescription<CollectionMetadata, InlinedConnection> {
        // One `SourceExport` per exported collection; the metadata is a minimal in-memory
        // (`mem://`) persist configuration with empty relation descriptions.
        let export_ids = [ingestion_id, remap_collection_id]
            .into_iter()
            .chain(subsource_ids.into());
        let source_exports = export_ids
            .map(|id| {
                let export = SourceExport {
                    storage_metadata: CollectionMetadata {
                        persist_location: PersistLocation {
                            blob_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
                            consensus_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
                        },
                        remap_shard: Default::default(),
                        data_shard: Default::default(),
                        relation_desc: RelationDesc::new(
                            RelationType {
                                column_types: Default::default(),
                                keys: Default::default(),
                            },
                            Vec::<String>::new(),
                        ),
                        txns_shard: Default::default(),
                    },
                    details: SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails {
                        output: LoadGeneratorOutput::Default,
                    }),
                    data_config: SourceExportDataConfig {
                        encoding: Default::default(),
                        envelope: SourceEnvelope::CdcV2,
                    },
                };
                (GlobalId::User(id), export)
            })
            .collect();

        let connection = GenericSourceConnection::LoadGenerator(LoadGeneratorSourceConnection {
            load_generator: LoadGenerator::Auction,
            tick_micros: Default::default(),
            as_of: Default::default(),
            up_to: Default::default(),
        });
        let primary_export_details = connection.primary_export_details();

        IngestionDescription {
            desc: SourceDesc {
                connection,
                primary_export: SourceExportDataConfig {
                    encoding: Default::default(),
                    envelope: SourceEnvelope::CdcV2,
                },
                primary_export_details,
                timestamp_interval: Default::default(),
            },
            ingestion_metadata: CollectionMetadata {
                persist_location: PersistLocation {
                    blob_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
                    consensus_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
                },
                remap_shard: Default::default(),
                data_shard: Default::default(),
                relation_desc: RelationDesc::new(
                    RelationType {
                        column_types: Default::default(),
                        keys: Default::default(),
                    },
                    Vec::<String>::new(),
                ),
                txns_shard: Default::default(),
            },
            source_exports,
            instance_id: StorageInstanceId::system(0).expect("0 is a valid ID"),
            remap_collection_id: GlobalId::User(remap_collection_id),
        }
    }

    /// Builds a minimal Kafka `StorageSinkDesc` (mostly defaulted fields, as-of 0) for use in
    /// the sink-related reduction tests.
    fn sink_description() -> StorageSinkDesc<CollectionMetadata, u64> {
        StorageSinkDesc {
            from: GlobalId::System(1),
            from_desc: RelationDesc::new(
                RelationType {
                    column_types: Default::default(),
                    keys: Default::default(),
                },
                Vec::<String>::new(),
            ),
            connection: StorageSinkConnection::Kafka(KafkaSinkConnection {
                connection_id: CatalogItemId::System(2),
                connection: KafkaConnection {
                    brokers: Default::default(),
                    default_tunnel: Tunnel::Direct,
                    progress_topic: Default::default(),
                    progress_topic_options: Default::default(),
                    options: Default::default(),
                    tls: Default::default(),
                    sasl: Default::default(),
                },
                format: KafkaSinkFormat {
                    key_format: Default::default(),
                    value_format: KafkaSinkFormatType::Text,
                },
                relation_key_indices: Default::default(),
                key_desc_and_indices: Default::default(),
                headers_index: Default::default(),
                value_desc: RelationDesc::new(
                    RelationType {
                        column_types: Default::default(),
                        keys: Default::default(),
                    },
                    Vec::<String>::new(),
                ),
                partition_by: Default::default(),
                topic: Default::default(),
                topic_options: Default::default(),
                compression_type: KafkaSinkCompressionType::None,
                progress_group_id: KafkaIdStyle::Legacy,
                transactional_id: KafkaIdStyle::Legacy,
                topic_metadata_refresh_interval: Default::default(),
            }),
            with_snapshot: Default::default(),
            version: Default::default(),
            envelope: SinkEnvelope::Upsert,
            as_of: Antichain::from_elem(0),
            from_storage_metadata: CollectionMetadata {
                persist_location: PersistLocation {
                    blob_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
                    consensus_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
                },
                remap_shard: Default::default(),
                data_shard: Default::default(),
                relation_desc: RelationDesc::new(
                    RelationType {
                        column_types: Default::default(),
                        keys: Default::default(),
                    },
                    Vec::<String>::new(),
                ),
                txns_shard: Default::default(),
            },
            to_storage_metadata: CollectionMetadata {
                persist_location: PersistLocation {
                    blob_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
                    consensus_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
                },
                remap_shard: Default::default(),
                data_shard: Default::default(),
                relation_desc: RelationDesc::new(
                    RelationType {
                        column_types: Default::default(),
                        keys: Default::default(),
                    },
                    Vec::<String>::new(),
                ),
                txns_shard: Default::default(),
            },
        }
    }

    /// An ingestion compacted to the empty frontier is dropped entirely by `reduce`, along with
    /// all compaction commands for its collections.
    #[mz_ore::test]
    fn reduce_drops_dropped_ingestion() {
        let mut history = history();

        let commands = [
            StorageCommand::RunIngestion(Box::new(RunIngestionCommand {
                id: GlobalId::User(1),
                description: ingestion_description(1, [2], 3),
            })),
            StorageCommand::AllowCompaction(GlobalId::User(1), Antichain::new()),
            StorageCommand::AllowCompaction(GlobalId::User(2), Antichain::new()),
            StorageCommand::AllowCompaction(GlobalId::User(3), Antichain::new()),
        ];

        for cmd in commands {
            history.push(cmd);
        }

        history.reduce();

        let commands_after: Vec<_> = history.iter().collect();
        assert!(commands_after.is_empty(), "{:?}", commands_after);
    }

    /// An ingestion compacted to non-empty frontiers survives `reduce` unchanged, including the
    /// compaction commands for all of its collections (and in the same order).
    #[mz_ore::test]
    fn reduce_keeps_compacted_ingestion() {
        let mut history = history();

        let commands = [
            StorageCommand::RunIngestion(Box::new(RunIngestionCommand {
                id: GlobalId::User(1),
                description: ingestion_description(1, [2], 3),
            })),
            StorageCommand::AllowCompaction(GlobalId::User(1), Antichain::from_elem(1)),
            StorageCommand::AllowCompaction(GlobalId::User(2), Antichain::from_elem(2)),
            StorageCommand::AllowCompaction(GlobalId::User(3), Antichain::from_elem(3)),
        ];

        for cmd in commands.clone() {
            history.push(cmd);
        }

        history.reduce();

        let commands_after: Vec<_> = history.iter().cloned().collect();
        assert_eq!(commands_after, commands);
    }

    /// An ingestion whose subsource (but not the ingestion itself) was compacted to the empty
    /// frontier is kept by `reduce`, together with the subsource's compaction command.
    #[mz_ore::test]
    fn reduce_keeps_partially_dropped_ingestion() {
        let mut history = history();

        let commands = [
            StorageCommand::RunIngestion(Box::new(RunIngestionCommand {
                id: GlobalId::User(1),
                description: ingestion_description(1, [2], 3),
            })),
            StorageCommand::AllowCompaction(GlobalId::User(2), Antichain::new()),
        ];

        for cmd in commands.clone() {
            history.push(cmd);
        }

        history.reduce();

        let commands_after: Vec<_> = history.iter().cloned().collect();
        assert_eq!(commands_after, commands);
    }

    /// A sink compacted to the empty frontier is dropped entirely by `reduce`.
    #[mz_ore::test]
    fn reduce_drops_dropped_sink() {
        let mut history = history();

        let commands = [
            StorageCommand::RunSink(Box::new(RunSinkCommand {
                id: GlobalId::User(1),
                description: sink_description(),
            })),
            StorageCommand::AllowCompaction(GlobalId::User(1), Antichain::new()),
        ];

        for cmd in commands {
            history.push(cmd);
        }

        history.reduce();

        let commands_after: Vec<_> = history.iter().collect();
        assert!(commands_after.is_empty(), "{:?}", commands_after);
    }

    /// A sink compacted to a non-empty frontier is kept by `reduce`; its compaction command is
    /// absorbed and the sink description (including its original as-of) is replayed unchanged.
    #[mz_ore::test]
    fn reduce_keeps_compacted_sink() {
        let mut history = history();

        let sink_desc = sink_description();
        let commands = [
            StorageCommand::RunSink(Box::new(RunSinkCommand {
                id: GlobalId::User(1),
                description: sink_desc.clone(),
            })),
            StorageCommand::AllowCompaction(GlobalId::User(1), Antichain::from_elem(42)),
        ];

        for cmd in commands {
            history.push(cmd);
        }

        history.reduce();

        let commands_after: Vec<_> = history.iter().cloned().collect();

        let expected_commands = [StorageCommand::RunSink(Box::new(RunSinkCommand {
            id: GlobalId::User(1),
            description: sink_desc,
        }))];

        assert_eq!(commands_after, expected_commands);
    }

    /// Compaction commands for objects that never appeared in a run command are discarded by
    /// `reduce`, whether their frontiers are empty or not.
    #[mz_ore::test]
    fn reduce_drops_stray_compactions() {
        let mut history = history();

        let commands = [
            StorageCommand::AllowCompaction(GlobalId::User(1), Antichain::new()),
            StorageCommand::AllowCompaction(GlobalId::User(2), Antichain::from_elem(1)),
            StorageCommand::AllowCompaction(GlobalId::User(2), Antichain::from_elem(2)),
        ];

        for cmd in commands {
            history.push(cmd);
        }

        history.reduce();

        let commands_after: Vec<_> = history.iter().collect();
        assert!(commands_after.is_empty(), "{:?}", commands_after);
    }
}