mz_storage_controller/history.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! A reducible history of storage commands.

use std::collections::BTreeMap;

use mz_dyncfg::ConfigValHandle;
use mz_ore::cast::CastFrom;
use mz_storage_client::client::StorageCommand;
use mz_storage_client::metrics::HistoryMetrics;
use mz_storage_types::parameters::StorageParameters;
use timely::PartialOrder;
use timely::order::TotalOrder;

/// A history of storage commands.
#[derive(Debug)]
pub(crate) struct CommandHistory<T> {
    /// The number of commands at the last time we compacted the history.
    reduced_count: usize,
    /// The sequence of commands that should be applied.
    ///
    /// This list may not be "compact" in that there can be commands that could be optimized or
    /// removed given the context of other commands, for example compaction commands that can be
    /// unified, or run commands that can be dropped due to allowed compaction.
    commands: Vec<StorageCommand<T>>,
    /// Tracked metrics.
    metrics: HistoryMetrics,
    /// Config: whether to use the snapshot-frontier optimization for sinks.
    enable_snapshot_frontier: ConfigValHandle<bool>,
}

impl<T: timely::progress::Timestamp + TotalOrder> CommandHistory<T> {
    /// Constructs a new command history.
    pub fn new(metrics: HistoryMetrics, enable_snapshot_frontier: ConfigValHandle<bool>) -> Self {
        metrics.reset();

        Self {
            reduced_count: 0,
            commands: Vec::new(),
            metrics,
            enable_snapshot_frontier,
        }
    }

    /// Returns an iterator over the contained storage commands.
    pub fn iter(&self) -> impl DoubleEndedIterator<Item = &StorageCommand<T>> {
        self.commands.iter()
    }

    /// Adds a command to the history.
    ///
    /// Pushing a command also reduces the history whenever its length exceeds twice the length
    /// observed at the last reduction.
    pub fn push(&mut self, command: StorageCommand<T>) {
        use StorageCommand::*;

        self.commands.push(command);

        if self.commands.len() > 2 * self.reduced_count {
            self.reduce();
        } else {
            // Refresh reported metrics. `reduce` already refreshes metrics, so we only need to do
            // that here in the non-reduce case.
            let command = self.commands.last().expect("pushed above");
            let metrics = &self.metrics;
            match command {
                CreateTimely { .. } => metrics.create_timely_count.inc(),
                InitializationComplete => metrics.initialization_complete_count.inc(),
                AllowWrites => metrics.allow_writes_count.inc(),
                UpdateConfiguration(_) => metrics.update_configuration_count.inc(),
                RunIngestion(_) => metrics.run_ingestions_count.inc(),
                RunSink(_) => metrics.run_sinks_count.inc(),
                AllowCompaction(_, _) => metrics.allow_compaction_count.inc(),
                RunOneshotIngestion(_) | CancelOneshotIngestion { .. } => {
                    // TODO(cf2): Add metrics for oneshot ingestions.
                }
            }
        }
    }

    /// Reduces the command history to a minimal form.
    pub fn reduce(&mut self) {
        use StorageCommand::*;

        let mut create_timely_command = None;
        let mut initialization_complete = false;
        let mut allow_writes = false;
        let mut final_compactions = BTreeMap::new();

        // Collect the final definitions of ingestions and sinks.
        // The same object ID can occur in multiple run commands when an object was altered. In
        // this scenario, we only want to send the most recent definition of the object.
        let mut final_ingestions = BTreeMap::new();
        let mut final_sinks = BTreeMap::new();
        let mut final_oneshot_ingestions = BTreeMap::new();

        // Collect only the final configuration.
        // Note that this means the final configuration is applied to all objects installed on the
        // new replica during initialization, even when the same objects were installed with an
        // older config on existing replicas. This is only correct as long as config parameters
        // don't affect the output of storage objects, as that would make different replicas write
        // different data, which is likely to produce inconsistencies.
        let mut final_configuration = StorageParameters::default();

        for command in self.commands.drain(..) {
            match command {
                cmd @ CreateTimely { .. } => create_timely_command = Some(cmd),
                InitializationComplete => initialization_complete = true,
                AllowWrites => allow_writes = true,
                UpdateConfiguration(params) => final_configuration.update(*params),
                RunIngestion(ingestion) => {
                    final_ingestions.insert(ingestion.id, ingestion);
                }
                RunSink(sink) => {
                    final_sinks.insert(sink.id, sink);
                }
                AllowCompaction(id, since) => {
                    final_compactions.insert(id, since);
                }
                RunOneshotIngestion(oneshot) => {
                    final_oneshot_ingestions.insert(oneshot.ingestion_id, oneshot);
                }
                CancelOneshotIngestion(uuid) => {
                    final_oneshot_ingestions.remove(&uuid);
                }
            }
        }

        let mut run_ingestions = Vec::new();
        let mut run_sinks = Vec::new();
        let mut allow_compaction = Vec::new();

        // Discard ingestions that have been dropped, keep the rest.
        for ingestion in final_ingestions.into_values() {
            if let Some(frontier) = final_compactions.get(&ingestion.id) {
                if frontier.is_empty() {
                    continue;
                }
            }

            let compactions = ingestion
                .description
                .collection_ids()
                .filter_map(|id| final_compactions.remove(&id).map(|f| (id, f)));
            allow_compaction.extend(compactions);

            run_ingestions.push(ingestion);
        }

        // Discard sinks that have been dropped, advance the as-of of the rest.
        for mut sink in final_sinks.into_values() {
            if let Some(frontier) = final_compactions.remove(&sink.id) {
                if frontier.is_empty() {
                    continue;
                }
                // The as-of is at least the implied capability of the sink collection. If the as-of
                // advances for an existing export, that can only be because the implied capability
                // has advanced, which means that the write frontier has advanced, which means the
                // snapshot has definitely been written out.
                if PartialOrder::less_than(&sink.description.as_of, &frontier) {
                    sink.description.as_of = frontier;
                    if self.enable_snapshot_frontier.get() {
                        sink.description.with_snapshot = false;
                    }
                }
            }

            run_sinks.push(sink);
        }

        // Reconstitute the commands as a compact history.
        //
        // When we update `metrics`, we need to be careful to not transiently report incorrect
        // counts, as they would be observable by other threads. For example, we should not call
        // `metrics.reset()` here, since otherwise the command history would appear empty for a
        // brief amount of time.

        let count = u64::from(create_timely_command.is_some());
        self.metrics.create_timely_count.set(count);
        if let Some(create_timely_command) = create_timely_command {
            self.commands.push(create_timely_command);
        }

        let count = u64::from(!final_configuration.all_unset());
        self.metrics.update_configuration_count.set(count);
        if !final_configuration.all_unset() {
            let config = Box::new(final_configuration);
            self.commands
                .push(StorageCommand::UpdateConfiguration(config));
        }

        let count = u64::cast_from(run_ingestions.len());
        self.metrics.run_ingestions_count.set(count);
        for ingestion in run_ingestions {
            self.commands.push(StorageCommand::RunIngestion(ingestion));
        }

        let count = u64::cast_from(run_sinks.len());
        self.metrics.run_sinks_count.set(count);
        for sink in run_sinks {
            self.commands.push(StorageCommand::RunSink(sink));
        }

        // Note: `RunOneshotIngestion` commands are reduced when we receive a corresponding
        // `CancelOneshotIngestion` command.
        //
        // TODO(cf2): Record metrics on the number of OneshotIngestion commands.
        for ingestion in final_oneshot_ingestions.into_values() {
            self.commands
                .push(StorageCommand::RunOneshotIngestion(ingestion));
        }

        let count = u64::cast_from(allow_compaction.len());
        self.metrics.allow_compaction_count.set(count);
        for (id, since) in allow_compaction {
            self.commands
                .push(StorageCommand::AllowCompaction(id, since));
        }

        let count = u64::from(initialization_complete);
        self.metrics.initialization_complete_count.set(count);
        if initialization_complete {
            self.commands.push(StorageCommand::InitializationComplete);
        }

        let count = u64::from(allow_writes);
        self.metrics.allow_writes_count.set(count);
        if allow_writes {
            self.commands.push(StorageCommand::AllowWrites);
        }

        self.reduced_count = self.commands.len();
    }
}

#[cfg(test)]
mod tests {
    use std::str::FromStr;

    use mz_cluster_client::metrics::ControllerMetrics;
    use mz_ore::metrics::MetricsRegistry;
    use mz_ore::url::SensitiveUrl;
    use mz_persist_types::PersistLocation;
    use mz_repr::{CatalogItemId, GlobalId, RelationDesc, RelationType};
    use mz_storage_client::client::{RunIngestionCommand, RunSinkCommand};
    use mz_storage_client::metrics::StorageControllerMetrics;
    use mz_storage_types::connections::inline::InlinedConnection;
    use mz_storage_types::connections::{KafkaConnection, Tunnel};
    use mz_storage_types::controller::CollectionMetadata;
    use mz_storage_types::instances::StorageInstanceId;
    use mz_storage_types::sinks::{
        KafkaIdStyle, KafkaSinkCompressionType, KafkaSinkConnection, KafkaSinkFormat,
        KafkaSinkFormatType, SinkEnvelope, StorageSinkConnection, StorageSinkDesc,
    };
    use mz_storage_types::sources::load_generator::{
        LoadGenerator, LoadGeneratorOutput, LoadGeneratorSourceExportDetails,
    };
    use mz_storage_types::sources::{
        GenericSourceConnection, IngestionDescription, LoadGeneratorSourceConnection,
        SourceConnection, SourceDesc, SourceEnvelope, SourceExport, SourceExportDataConfig,
        SourceExportDetails,
    };
    use timely::progress::Antichain;

    use super::*;

    fn history() -> CommandHistory<u64> {
        let registry = MetricsRegistry::new();
        let controller_metrics = ControllerMetrics::new(&registry);
        let metrics = StorageControllerMetrics::new(&registry, controller_metrics)
            .for_instance(StorageInstanceId::system(0).expect("0 is a valid ID"))
            .for_history();

        CommandHistory::new(metrics, ConfigValHandle::disconnected(true))
    }

    fn ingestion_description<S: Into<Vec<u64>>>(
        ingestion_id: u64,
        subsource_ids: S,
        remap_collection_id: u64,
    ) -> IngestionDescription<CollectionMetadata, InlinedConnection> {
        let export_ids = [ingestion_id, remap_collection_id]
            .into_iter()
            .chain(subsource_ids.into());
        let source_exports = export_ids
            .map(|id| {
                let export = SourceExport {
                    storage_metadata: CollectionMetadata {
                        persist_location: PersistLocation {
                            blob_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
                            consensus_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
                        },
                        remap_shard: Default::default(),
                        data_shard: Default::default(),
                        relation_desc: RelationDesc::new(
                            RelationType {
                                column_types: Default::default(),
                                keys: Default::default(),
                            },
                            Vec::<String>::new(),
                        ),
                        txns_shard: Default::default(),
                    },
                    details: SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails {
                        output: LoadGeneratorOutput::Default,
                    }),
                    data_config: SourceExportDataConfig {
                        encoding: Default::default(),
                        envelope: SourceEnvelope::CdcV2,
                    },
                };
                (GlobalId::User(id), export)
            })
            .collect();

        let connection = GenericSourceConnection::LoadGenerator(LoadGeneratorSourceConnection {
            load_generator: LoadGenerator::Auction,
            tick_micros: Default::default(),
            as_of: Default::default(),
            up_to: Default::default(),
        });
        let primary_export_details = connection.primary_export_details();

        IngestionDescription {
            desc: SourceDesc {
                connection,
                primary_export: SourceExportDataConfig {
                    encoding: Default::default(),
                    envelope: SourceEnvelope::CdcV2,
                },
                primary_export_details,
                timestamp_interval: Default::default(),
            },
            ingestion_metadata: CollectionMetadata {
                persist_location: PersistLocation {
                    blob_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
                    consensus_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
                },
                remap_shard: Default::default(),
                data_shard: Default::default(),
                relation_desc: RelationDesc::new(
                    RelationType {
                        column_types: Default::default(),
                        keys: Default::default(),
                    },
                    Vec::<String>::new(),
                ),
                txns_shard: Default::default(),
            },
            source_exports,
            instance_id: StorageInstanceId::system(0).expect("0 is a valid ID"),
            remap_collection_id: GlobalId::User(remap_collection_id),
        }
    }

    fn sink_description() -> StorageSinkDesc<CollectionMetadata, u64> {
        StorageSinkDesc {
            from: GlobalId::System(1),
            from_desc: RelationDesc::new(
                RelationType {
                    column_types: Default::default(),
                    keys: Default::default(),
                },
                Vec::<String>::new(),
            ),
            connection: StorageSinkConnection::Kafka(KafkaSinkConnection {
                connection_id: CatalogItemId::System(2),
                connection: KafkaConnection {
                    brokers: Default::default(),
                    default_tunnel: Tunnel::Direct,
                    progress_topic: Default::default(),
                    progress_topic_options: Default::default(),
                    options: Default::default(),
                    tls: Default::default(),
                    sasl: Default::default(),
                },
                format: KafkaSinkFormat {
                    key_format: Default::default(),
                    value_format: KafkaSinkFormatType::Text,
                },
                relation_key_indices: Default::default(),
                key_desc_and_indices: Default::default(),
                headers_index: Default::default(),
                value_desc: RelationDesc::new(
                    RelationType {
                        column_types: Default::default(),
                        keys: Default::default(),
                    },
                    Vec::<String>::new(),
                ),
                partition_by: Default::default(),
                topic: Default::default(),
                topic_options: Default::default(),
                compression_type: KafkaSinkCompressionType::None,
                progress_group_id: KafkaIdStyle::Legacy,
                transactional_id: KafkaIdStyle::Legacy,
                topic_metadata_refresh_interval: Default::default(),
            }),
            with_snapshot: Default::default(),
            version: Default::default(),
            envelope: SinkEnvelope::Upsert,
            as_of: Antichain::from_elem(0),
            from_storage_metadata: CollectionMetadata {
                persist_location: PersistLocation {
                    blob_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
                    consensus_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
                },
                remap_shard: Default::default(),
                data_shard: Default::default(),
                relation_desc: RelationDesc::new(
                    RelationType {
                        column_types: Default::default(),
                        keys: Default::default(),
                    },
                    Vec::<String>::new(),
                ),
                txns_shard: Default::default(),
            },
            to_storage_metadata: CollectionMetadata {
                persist_location: PersistLocation {
                    blob_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
                    consensus_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
                },
                remap_shard: Default::default(),
                data_shard: Default::default(),
                relation_desc: RelationDesc::new(
                    RelationType {
                        column_types: Default::default(),
                        keys: Default::default(),
                    },
                    Vec::<String>::new(),
                ),
                txns_shard: Default::default(),
            },
        }
    }

    #[mz_ore::test]
    fn reduce_drops_dropped_ingestion() {
        let mut history = history();

        let commands = [
            StorageCommand::RunIngestion(Box::new(RunIngestionCommand {
                id: GlobalId::User(1),
                description: ingestion_description(1, [2], 3),
            })),
            StorageCommand::AllowCompaction(GlobalId::User(1), Antichain::new()),
            StorageCommand::AllowCompaction(GlobalId::User(2), Antichain::new()),
            StorageCommand::AllowCompaction(GlobalId::User(3), Antichain::new()),
        ];

        for cmd in commands {
            history.push(cmd);
        }

        history.reduce();

        let commands_after: Vec<_> = history.iter().collect();
        assert!(commands_after.is_empty(), "{:?}", commands_after);
    }
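
    // A sketch exercising the amortized reduction policy described on `push`: the
    // history is reduced whenever it grows to more than twice its last reduced
    // size, so a long stream of compaction commands for one object stays short.
    // The test name and the bound of 10 are illustrative choices, not upstream
    // fixtures.
    #[mz_ore::test]
    fn push_reduces_when_history_doubles() {
        let mut history = history();

        history.push(StorageCommand::RunIngestion(Box::new(RunIngestionCommand {
            id: GlobalId::User(1),
            description: ingestion_description(1, [2], 3),
        })));

        for t in 0..100 {
            let since = Antichain::from_elem(t);
            history.push(StorageCommand::AllowCompaction(GlobalId::User(1), since));
        }

        // Rather than all 101 pushed commands, the history retains only the
        // ingestion and a handful of compaction commands.
        let num_commands = history.iter().count();
        assert!(num_commands < 10, "history too long: {num_commands}");
    }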

    #[mz_ore::test]
    fn reduce_keeps_compacted_ingestion() {
        let mut history = history();

        let commands = [
            StorageCommand::RunIngestion(Box::new(RunIngestionCommand {
                id: GlobalId::User(1),
                description: ingestion_description(1, [2], 3),
            })),
            StorageCommand::AllowCompaction(GlobalId::User(1), Antichain::from_elem(1)),
            StorageCommand::AllowCompaction(GlobalId::User(2), Antichain::from_elem(2)),
            StorageCommand::AllowCompaction(GlobalId::User(3), Antichain::from_elem(3)),
        ];

        for cmd in commands.clone() {
            history.push(cmd);
        }

        history.reduce();

        let commands_after: Vec<_> = history.iter().cloned().collect();
        assert_eq!(commands_after, commands);
    }
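
    // A sketch of the "most recent definition wins" behavior noted in `reduce`:
    // when the same ingestion ID is run twice, as happens when an object is
    // altered, only the latest definition survives reduction. The test name is an
    // illustrative addition.
    #[mz_ore::test]
    fn reduce_keeps_only_latest_ingestion_definition() {
        let mut history = history();

        let commands = [
            StorageCommand::RunIngestion(Box::new(RunIngestionCommand {
                id: GlobalId::User(1),
                description: ingestion_description(1, [2], 3),
            })),
            StorageCommand::RunIngestion(Box::new(RunIngestionCommand {
                id: GlobalId::User(1),
                description: ingestion_description(1, [2, 4], 3),
            })),
        ];

        for cmd in commands.clone() {
            history.push(cmd);
        }

        history.reduce();

        let commands_after: Vec<_> = history.iter().cloned().collect();
        assert_eq!(commands_after, &commands[1..]);
    }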

    #[mz_ore::test]
    fn reduce_keeps_partially_dropped_ingestion() {
        let mut history = history();

        let commands = [
            StorageCommand::RunIngestion(Box::new(RunIngestionCommand {
                id: GlobalId::User(1),
                description: ingestion_description(1, [2], 3),
            })),
            StorageCommand::AllowCompaction(GlobalId::User(2), Antichain::new()),
        ];

        for cmd in commands.clone() {
            history.push(cmd);
        }

        history.reduce();

        let commands_after: Vec<_> = history.iter().cloned().collect();
        assert_eq!(commands_after, commands);
    }

    #[mz_ore::test]
    fn reduce_drops_dropped_sink() {
        let mut history = history();

        let commands = [
            StorageCommand::RunSink(Box::new(RunSinkCommand {
                id: GlobalId::User(1),
                description: sink_description(),
            })),
            StorageCommand::AllowCompaction(GlobalId::User(1), Antichain::new()),
        ];

        for cmd in commands {
            history.push(cmd);
        }

        history.reduce();

        let commands_after: Vec<_> = history.iter().collect();
        assert!(commands_after.is_empty(), "{:?}", commands_after);
    }

    #[mz_ore::test]
    fn reduce_keeps_compacted_sink() {
        let mut history = history();

        let sink_desc = sink_description();
        let commands = [
            StorageCommand::RunSink(Box::new(RunSinkCommand {
                id: GlobalId::User(1),
                description: sink_desc.clone(),
            })),
            StorageCommand::AllowCompaction(GlobalId::User(1), Antichain::from_elem(42)),
        ];

        for cmd in commands {
            history.push(cmd);
        }

        history.reduce();

        let commands_after: Vec<_> = history.iter().cloned().collect();

        let expected_sink_desc = StorageSinkDesc {
            as_of: Antichain::from_elem(42),
            ..sink_desc
        };
        let expected_commands = [StorageCommand::RunSink(Box::new(RunSinkCommand {
            id: GlobalId::User(1),
            description: expected_sink_desc,
        }))];

        assert_eq!(commands_after, expected_commands);
    }
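
    // A sketch of the snapshot-frontier optimization gated by
    // `enable_snapshot_frontier`, which the `history` helper above enables: when
    // compaction advances the as-of of a sink, its snapshot must already have
    // been written out, so `with_snapshot` can be cleared. The test name is an
    // illustrative addition.
    #[mz_ore::test]
    fn reduce_clears_snapshot_flag_of_compacted_sink() {
        let mut history = history();

        let sink_desc = StorageSinkDesc {
            with_snapshot: true,
            ..sink_description()
        };
        let commands = [
            StorageCommand::RunSink(Box::new(RunSinkCommand {
                id: GlobalId::User(1),
                description: sink_desc.clone(),
            })),
            StorageCommand::AllowCompaction(GlobalId::User(1), Antichain::from_elem(42)),
        ];

        for cmd in commands {
            history.push(cmd);
        }

        history.reduce();

        let commands_after: Vec<_> = history.iter().cloned().collect();

        let expected_sink_desc = StorageSinkDesc {
            as_of: Antichain::from_elem(42),
            with_snapshot: false,
            ..sink_desc
        };
        let expected_commands = [StorageCommand::RunSink(Box::new(RunSinkCommand {
            id: GlobalId::User(1),
            description: expected_sink_desc,
        }))];

        assert_eq!(commands_after, expected_commands);
    }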

    #[mz_ore::test]
    fn reduce_drops_stray_compactions() {
        let mut history = history();

        let commands = [
            StorageCommand::AllowCompaction(GlobalId::User(1), Antichain::new()),
            StorageCommand::AllowCompaction(GlobalId::User(2), Antichain::from_elem(1)),
            StorageCommand::AllowCompaction(GlobalId::User(2), Antichain::from_elem(2)),
        ];

        for cmd in commands {
            history.push(cmd);
        }

        history.reduce();

        let commands_after: Vec<_> = history.iter().collect();
        assert!(commands_after.is_empty(), "{:?}", commands_after);
    }
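
    // A sketch of configuration reduction: `reduce` merges all
    // `UpdateConfiguration` commands into at most one and drops the result
    // entirely while every parameter is still unset, as is the case for default
    // parameters. The test name is an illustrative addition.
    #[mz_ore::test]
    fn reduce_merges_unset_configuration_updates() {
        let mut history = history();

        for _ in 0..3 {
            let params = Box::new(StorageParameters::default());
            history.push(StorageCommand::UpdateConfiguration(params));
        }

        history.reduce();

        let commands_after: Vec<_> = history.iter().collect();
        assert!(commands_after.is_empty(), "{:?}", commands_after);
    }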
}