mz_storage_controller/history.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! A reducible history of storage commands.

use std::collections::BTreeMap;

use mz_ore::cast::CastFrom;
use mz_storage_client::client::StorageCommand;
use mz_storage_client::metrics::HistoryMetrics;
use mz_storage_types::parameters::StorageParameters;
use timely::order::TotalOrder;

/// A history of storage commands.
#[derive(Debug)]
pub(crate) struct CommandHistory<T> {
    /// The number of commands at the last time we compacted the history.
    reduced_count: usize,
    /// The sequence of commands that should be applied.
    ///
    /// This list may not be "compact" in that there can be commands that could be optimized or
    /// removed given the context of other commands, for example compaction commands that can be
    /// unified, or run commands that can be dropped due to allowed compaction.
    commands: Vec<StorageCommand<T>>,
    /// Tracked metrics.
    metrics: HistoryMetrics,
}

impl<T: timely::progress::Timestamp + TotalOrder> CommandHistory<T> {
    /// Constructs a new command history.
    pub fn new(metrics: HistoryMetrics) -> Self {
        metrics.reset();

        Self {
            reduced_count: 0,
            commands: Vec::new(),
            metrics,
        }
    }

    /// Returns an iterator over the contained storage commands.
    pub fn iter(&self) -> impl DoubleEndedIterator<Item = &StorageCommand<T>> {
        self.commands.iter()
    }

    /// Adds a command to the history.
    ///
    /// This action also reduces the history whenever the number of commands has doubled since
    /// the last reduction.
    pub fn push(&mut self, command: StorageCommand<T>) {
        self.commands.push(command);

        if self.commands.len() > 2 * self.reduced_count {
            self.reduce();
        } else {
            // Refresh reported metrics. `reduce` already refreshes metrics, so we only need to do
            // that here in the non-reduce case.
            let command = self.commands.last().expect("pushed above");
            self.metrics.command_counts.for_command(command).inc();
        }
    }

    /// Reduces the command history to a minimal form.
    pub fn reduce(&mut self) {
        use StorageCommand::*;

        let mut hello_command = None;
        let mut initialization_complete = false;
        let mut allow_writes = false;
        let mut final_compactions = BTreeMap::new();

        // Collect the final definitions of ingestions and sinks.
        // The same object ID can occur in multiple run commands when an object was altered. In
        // this scenario, we only want to send the most recent definition of the object.
        let mut final_ingestions = BTreeMap::new();
        let mut final_sinks = BTreeMap::new();
        let mut final_oneshot_ingestions = BTreeMap::new();

        // Collect only the final configuration.
        // Note that this means the final configuration is applied to all objects installed on the
        // new replica during initialization, even when the same objects were installed with an
        // older config on existing replicas. This is only correct as long as config parameters
        // don't affect the output of storage objects, as that would make different replicas write
        // different data, which is likely to produce inconsistencies.
        let mut final_configuration = StorageParameters::default();

        for command in self.commands.drain(..) {
            match command {
                cmd @ Hello { .. } => hello_command = Some(cmd),
                InitializationComplete => initialization_complete = true,
                AllowWrites => allow_writes = true,
                UpdateConfiguration(params) => final_configuration.update(*params),
                RunIngestion(ingestion) => {
                    final_ingestions.insert(ingestion.id, ingestion);
                }
                RunSink(sink) => {
                    final_sinks.insert(sink.id, sink);
                }
                AllowCompaction(id, since) => {
                    final_compactions.insert(id, since);
                }
                RunOneshotIngestion(oneshot) => {
                    final_oneshot_ingestions.insert(oneshot.ingestion_id, oneshot);
                }
                CancelOneshotIngestion(uuid) => {
                    final_oneshot_ingestions.remove(&uuid);
                }
            }
        }

        let mut run_ingestions = Vec::new();
        let mut run_sinks = Vec::new();
        let mut allow_compaction = Vec::new();

        // Discard ingestions that have been dropped, keep the rest.
        for ingestion in final_ingestions.into_values() {
            if let Some(frontier) = final_compactions.get(&ingestion.id) {
                if frontier.is_empty() {
                    continue;
                }
            }

            let compactions = ingestion
                .description
                .collection_ids()
                .filter_map(|id| final_compactions.remove(&id).map(|f| (id, f)));
            allow_compaction.extend(compactions);

            run_ingestions.push(ingestion);
        }

        // Discard sinks that have been dropped, keep the rest.
        for sink in final_sinks.into_values() {
            if let Some(frontier) = final_compactions.remove(&sink.id) {
                if frontier.is_empty() {
                    continue;
                }
            }

            run_sinks.push(sink);
        }

        // Reconstitute the commands as a compact history.
        //
        // When we update `metrics`, we need to be careful to not transiently report incorrect
        // counts, as they would be observable by other threads. For example, we should not call
        // `metrics.reset()` here, since otherwise the command history would appear empty for a
        // brief amount of time.
        let command_counts = &self.metrics.command_counts;

        let count = u64::from(hello_command.is_some());
        command_counts.hello.set(count);
        if let Some(hello) = hello_command {
            self.commands.push(hello);
        }

        let count = u64::from(!final_configuration.all_unset());
        command_counts.update_configuration.set(count);
        if !final_configuration.all_unset() {
            let config = Box::new(final_configuration);
            self.commands
                .push(StorageCommand::UpdateConfiguration(config));
        }

        let count = u64::cast_from(run_ingestions.len());
        command_counts.run_ingestion.set(count);
        for ingestion in run_ingestions {
            self.commands.push(StorageCommand::RunIngestion(ingestion));
        }

        let count = u64::cast_from(run_sinks.len());
        command_counts.run_sink.set(count);
        for sink in run_sinks {
            self.commands.push(StorageCommand::RunSink(sink));
        }

        // Note: `RunOneshotIngestion` commands are reduced away as we receive their
        // corresponding `CancelOneshotIngestion` commands.
        let count = u64::cast_from(final_oneshot_ingestions.len());
        command_counts.run_oneshot_ingestion.set(count);
        for ingestion in final_oneshot_ingestions.into_values() {
            self.commands
                .push(StorageCommand::RunOneshotIngestion(ingestion));
        }

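        // Every `CancelOneshotIngestion` in the history has already removed its oneshot
        // ingestion above, so no cancellation commands remain after a reduce.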
        command_counts.cancel_oneshot_ingestion.set(0);

        let count = u64::cast_from(allow_compaction.len());
        command_counts.allow_compaction.set(count);
        for (id, since) in allow_compaction {
            self.commands
                .push(StorageCommand::AllowCompaction(id, since));
        }

        let count = u64::from(initialization_complete);
        command_counts.initialization_complete.set(count);
        if initialization_complete {
            self.commands.push(StorageCommand::InitializationComplete);
        }

        let count = u64::from(allow_writes);
        command_counts.allow_writes.set(count);
        if allow_writes {
            self.commands.push(StorageCommand::AllowWrites);
        }

        self.reduced_count = self.commands.len();
    }
}

#[cfg(test)]
mod tests {
    use std::str::FromStr;

    use mz_cluster_client::metrics::ControllerMetrics;
    use mz_ore::metrics::MetricsRegistry;
    use mz_ore::url::SensitiveUrl;
    use mz_persist_types::PersistLocation;
    use mz_repr::{CatalogItemId, GlobalId, RelationDesc, SqlRelationType};
    use mz_storage_client::client::{RunIngestionCommand, RunSinkCommand};
    use mz_storage_client::metrics::StorageControllerMetrics;
    use mz_storage_types::connections::inline::InlinedConnection;
    use mz_storage_types::connections::{KafkaConnection, Tunnel};
    use mz_storage_types::controller::CollectionMetadata;
    use mz_storage_types::instances::StorageInstanceId;
    use mz_storage_types::sinks::{
        KafkaIdStyle, KafkaSinkCompressionType, KafkaSinkConnection, KafkaSinkFormat,
        KafkaSinkFormatType, SinkEnvelope, StorageSinkConnection, StorageSinkDesc,
    };
    use mz_storage_types::sources::load_generator::{
        LoadGenerator, LoadGeneratorOutput, LoadGeneratorSourceExportDetails,
    };
    use mz_storage_types::sources::{
        GenericSourceConnection, IngestionDescription, LoadGeneratorSourceConnection, SourceDesc,
        SourceEnvelope, SourceExport, SourceExportDataConfig, SourceExportDetails,
    };
    use timely::progress::Antichain;

    use super::*;

    fn history() -> CommandHistory<u64> {
        let registry = MetricsRegistry::new();
        let controller_metrics = ControllerMetrics::new(&registry);
        let metrics = StorageControllerMetrics::new(&registry, controller_metrics)
            .for_instance(StorageInstanceId::system(0).expect("0 is a valid ID"))
            .for_history();

        CommandHistory::new(metrics)
    }

    fn ingestion_description<S: Into<Vec<u64>>>(
        ingestion_id: u64,
        subsource_ids: S,
        remap_collection_id: u64,
    ) -> IngestionDescription<CollectionMetadata, InlinedConnection> {
        let export_ids = [ingestion_id, remap_collection_id]
            .into_iter()
            .chain(subsource_ids.into());
        let source_exports = export_ids
            .map(|id| {
                let export = SourceExport {
                    storage_metadata: CollectionMetadata {
                        persist_location: PersistLocation {
                            blob_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
                            consensus_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
                        },
                        data_shard: Default::default(),
                        relation_desc: RelationDesc::new(
                            SqlRelationType {
                                column_types: Default::default(),
                                keys: Default::default(),
                            },
                            Vec::<String>::new(),
                        ),
                        txns_shard: Default::default(),
                    },
                    details: SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails {
                        output: LoadGeneratorOutput::Default,
                    }),
                    data_config: SourceExportDataConfig {
                        encoding: Default::default(),
                        envelope: SourceEnvelope::CdcV2,
                    },
                };
                (GlobalId::User(id), export)
            })
            .collect();

        let connection = GenericSourceConnection::LoadGenerator(LoadGeneratorSourceConnection {
            load_generator: LoadGenerator::Auction,
            tick_micros: Default::default(),
            as_of: Default::default(),
            up_to: Default::default(),
        });

        IngestionDescription {
            desc: SourceDesc {
                connection,
                timestamp_interval: Default::default(),
            },
            remap_metadata: CollectionMetadata {
                persist_location: PersistLocation {
                    blob_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
                    consensus_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
                },
                data_shard: Default::default(),
                relation_desc: RelationDesc::new(
                    SqlRelationType {
                        column_types: Default::default(),
                        keys: Default::default(),
                    },
                    Vec::<String>::new(),
                ),
                txns_shard: Default::default(),
            },
            source_exports,
            instance_id: StorageInstanceId::system(0).expect("0 is a valid ID"),
            remap_collection_id: GlobalId::User(remap_collection_id),
        }
    }

    fn sink_description() -> StorageSinkDesc<CollectionMetadata, u64> {
        StorageSinkDesc {
            from: GlobalId::System(1),
            from_desc: RelationDesc::new(
                SqlRelationType {
                    column_types: Default::default(),
                    keys: Default::default(),
                },
                Vec::<String>::new(),
            ),
            connection: StorageSinkConnection::Kafka(KafkaSinkConnection {
                connection_id: CatalogItemId::System(2),
                connection: KafkaConnection {
                    brokers: Default::default(),
                    default_tunnel: Tunnel::Direct,
                    progress_topic: Default::default(),
                    progress_topic_options: Default::default(),
                    options: Default::default(),
                    tls: Default::default(),
                    sasl: Default::default(),
                },
                format: KafkaSinkFormat {
                    key_format: Default::default(),
                    value_format: KafkaSinkFormatType::Text,
                },
                relation_key_indices: Default::default(),
                key_desc_and_indices: Default::default(),
                headers_index: Default::default(),
                value_desc: RelationDesc::new(
                    SqlRelationType {
                        column_types: Default::default(),
                        keys: Default::default(),
                    },
                    Vec::<String>::new(),
                ),
                partition_by: Default::default(),
                topic: Default::default(),
                topic_options: Default::default(),
                compression_type: KafkaSinkCompressionType::None,
                progress_group_id: KafkaIdStyle::Legacy,
                transactional_id: KafkaIdStyle::Legacy,
                topic_metadata_refresh_interval: Default::default(),
            }),
            with_snapshot: Default::default(),
            version: Default::default(),
            envelope: SinkEnvelope::Upsert,
            as_of: Antichain::from_elem(0),
            from_storage_metadata: CollectionMetadata {
                persist_location: PersistLocation {
                    blob_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
                    consensus_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
                },
                data_shard: Default::default(),
                relation_desc: RelationDesc::new(
                    SqlRelationType {
                        column_types: Default::default(),
                        keys: Default::default(),
                    },
                    Vec::<String>::new(),
                ),
                txns_shard: Default::default(),
            },
            to_storage_metadata: CollectionMetadata {
                persist_location: PersistLocation {
                    blob_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
                    consensus_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
                },
                data_shard: Default::default(),
                relation_desc: RelationDesc::new(
                    SqlRelationType {
                        column_types: Default::default(),
                        keys: Default::default(),
                    },
                    Vec::<String>::new(),
                ),
                txns_shard: Default::default(),
            },
        }
    }

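    // A minimal sketch exercising the doubling heuristic in `push`: `reduce` drops stray
    // compaction commands, so repeatedly pushing them should not let the history grow
    // without bound.
    #[mz_ore::test]
    fn push_reduces_on_doubling() {
        let mut history = history();

        for _ in 0..100 {
            let cmd = StorageCommand::AllowCompaction(GlobalId::User(1), Antichain::new());
            history.push(cmd);
        }

        // Without the automatic reduction, all 100 commands would still be present.
        assert!(history.iter().count() < 100);
    }
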
    #[mz_ore::test]
    fn reduce_drops_dropped_ingestion() {
        let mut history = history();

        let commands = [
            StorageCommand::RunIngestion(Box::new(RunIngestionCommand {
                id: GlobalId::User(1),
                description: ingestion_description(1, [2], 3),
            })),
            StorageCommand::AllowCompaction(GlobalId::User(1), Antichain::new()),
            StorageCommand::AllowCompaction(GlobalId::User(2), Antichain::new()),
            StorageCommand::AllowCompaction(GlobalId::User(3), Antichain::new()),
        ];

        for cmd in commands {
            history.push(cmd);
        }

        history.reduce();

        let commands_after: Vec<_> = history.iter().collect();
        assert!(commands_after.is_empty(), "{:?}", commands_after);
    }

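    // A sketch of the "most recent definition wins" behavior described in `reduce`: when
    // the same ingestion ID occurs in multiple run commands (e.g. after an alter), only
    // the latest definition should survive a reduction.
    #[mz_ore::test]
    fn reduce_keeps_latest_ingestion_definition() {
        let mut history = history();

        let old_cmd = StorageCommand::RunIngestion(Box::new(RunIngestionCommand {
            id: GlobalId::User(1),
            description: ingestion_description(1, [2], 3),
        }));
        let new_cmd = StorageCommand::RunIngestion(Box::new(RunIngestionCommand {
            id: GlobalId::User(1),
            description: ingestion_description(1, [2, 4], 3),
        }));

        history.push(old_cmd);
        history.push(new_cmd.clone());

        history.reduce();

        let commands_after: Vec<_> = history.iter().cloned().collect();
        assert_eq!(commands_after, [new_cmd]);
    }
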
    #[mz_ore::test]
    fn reduce_keeps_compacted_ingestion() {
        let mut history = history();

        let commands = [
            StorageCommand::RunIngestion(Box::new(RunIngestionCommand {
                id: GlobalId::User(1),
                description: ingestion_description(1, [2], 3),
            })),
            StorageCommand::AllowCompaction(GlobalId::User(1), Antichain::from_elem(1)),
            StorageCommand::AllowCompaction(GlobalId::User(2), Antichain::from_elem(2)),
            StorageCommand::AllowCompaction(GlobalId::User(3), Antichain::from_elem(3)),
        ];

        for cmd in commands.clone() {
            history.push(cmd);
        }

        history.reduce();

        let commands_after: Vec<_> = history.iter().cloned().collect();
        assert_eq!(commands_after, commands);
    }

    #[mz_ore::test]
    fn reduce_keeps_partially_dropped_ingestion() {
        let mut history = history();

        let commands = [
            StorageCommand::RunIngestion(Box::new(RunIngestionCommand {
                id: GlobalId::User(1),
                description: ingestion_description(1, [2], 3),
            })),
            StorageCommand::AllowCompaction(GlobalId::User(2), Antichain::new()),
        ];

        for cmd in commands.clone() {
            history.push(cmd);
        }

        history.reduce();

        let commands_after: Vec<_> = history.iter().cloned().collect();
        assert_eq!(commands_after, commands);
    }

    #[mz_ore::test]
    fn reduce_drops_dropped_sink() {
        let mut history = history();

        let commands = [
            StorageCommand::RunSink(Box::new(RunSinkCommand {
                id: GlobalId::User(1),
                description: sink_description(),
            })),
            StorageCommand::AllowCompaction(GlobalId::User(1), Antichain::new()),
        ];

        for cmd in commands {
            history.push(cmd);
        }

        history.reduce();

        let commands_after: Vec<_> = history.iter().collect();
        assert!(commands_after.is_empty(), "{:?}", commands_after);
    }

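    // A sketch: `reduce` retains the `InitializationComplete` and `AllowWrites` markers,
    // re-emitting them after the object commands in the order `reduce` reconstructs them.
    #[mz_ore::test]
    fn reduce_keeps_lifecycle_commands() {
        let mut history = history();

        history.push(StorageCommand::InitializationComplete);
        history.push(StorageCommand::AllowWrites);

        history.reduce();

        let commands_after: Vec<_> = history.iter().cloned().collect();
        let expected = [
            StorageCommand::InitializationComplete,
            StorageCommand::AllowWrites,
        ];
        assert_eq!(commands_after, expected);
    }
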
    #[mz_ore::test]
    fn reduce_keeps_compacted_sink() {
        let mut history = history();

        let sink_desc = sink_description();
        let commands = [
            StorageCommand::RunSink(Box::new(RunSinkCommand {
                id: GlobalId::User(1),
                description: sink_desc.clone(),
            })),
            StorageCommand::AllowCompaction(GlobalId::User(1), Antichain::from_elem(42)),
        ];

        for cmd in commands {
            history.push(cmd);
        }

        history.reduce();

        let commands_after: Vec<_> = history.iter().cloned().collect();

        let expected_commands = [StorageCommand::RunSink(Box::new(RunSinkCommand {
            id: GlobalId::User(1),
            description: sink_desc,
        }))];

        assert_eq!(commands_after, expected_commands);
    }

    #[mz_ore::test]
    fn reduce_drops_stray_compactions() {
        let mut history = history();

        let commands = [
            StorageCommand::AllowCompaction(GlobalId::User(1), Antichain::new()),
            StorageCommand::AllowCompaction(GlobalId::User(2), Antichain::from_elem(1)),
            StorageCommand::AllowCompaction(GlobalId::User(2), Antichain::from_elem(2)),
        ];

        for cmd in commands {
            history.push(cmd);
        }

        history.reduce();

        let commands_after: Vec<_> = history.iter().collect();
        assert!(commands_after.is_empty(), "{:?}", commands_after);
    }
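
    // A sketch, assuming `StorageParameters::default()` leaves all parameters unset:
    // `reduce` folds all `UpdateConfiguration` commands into a single final configuration
    // and emits no command at all when that configuration is empty.
    #[mz_ore::test]
    fn reduce_drops_unset_configuration() {
        let mut history = history();

        history.push(StorageCommand::UpdateConfiguration(Box::new(
            StorageParameters::default(),
        )));
        history.push(StorageCommand::UpdateConfiguration(Box::new(
            StorageParameters::default(),
        )));

        history.reduce();

        let commands_after: Vec<_> = history.iter().collect();
        assert!(commands_after.is_empty(), "{:?}", commands_after);
    }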
}