mz_catalog_debug/
main.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Debug utility for Catalog storage.
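//!
//! A typical invocation looks roughly like the following (the binary name and
//! URLs are illustrative placeholders; the flags mirror the `Args` struct
//! below and can also be set via `MZ_CATALOG_DEBUG_`-prefixed environment
//! variables):
//!
//! ```text
//! catalog-debug \
//!     --environment-id <ENVIRONMENT_ID> \
//!     --persist-blob-url <BLOB_URL> \
//!     --persist-consensus-url <CONSENSUS_URL> \
//!     dump --consolidate --stats-only
//! ```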

use std::collections::BTreeMap;
use std::fmt::Debug;
use std::fs::File;
use std::io::{self, Write};
use std::path::PathBuf;
use std::process;
use std::sync::Arc;
use std::sync::LazyLock;
use std::time::Instant;

use anyhow::Context;
use clap::Parser;
use futures::future::FutureExt;
use mz_adapter::catalog::{Catalog, InitializeStateResult};
use mz_adapter_types::bootstrap_builtin_cluster_config::{
    ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR, BootstrapBuiltinClusterConfig,
    CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR, PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR,
    SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR, SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR,
};
use mz_build_info::{BuildInfo, build_info};
use mz_catalog::config::{BuiltinItemMigrationConfig, ClusterReplicaSizeMap, StateConfig};
use mz_catalog::durable::debug::{
    AuditLogCollection, ClusterCollection, ClusterIntrospectionSourceIndexCollection,
    ClusterReplicaCollection, Collection, CollectionTrace, CollectionType, CommentCollection,
    ConfigCollection, DatabaseCollection, DebugCatalogState, DefaultPrivilegeCollection,
    IdAllocatorCollection, ItemCollection, NetworkPolicyCollection, RoleAuthCollection,
    RoleCollection, SchemaCollection, SettingCollection, SourceReferencesCollection,
    StorageCollectionMetadataCollection, SystemConfigurationCollection,
    SystemItemMappingCollection, SystemPrivilegeCollection, Trace, TxnWalShardCollection,
    UnfinalizedShardsCollection,
};
use mz_catalog::durable::{
    BootstrapArgs, OpenableDurableCatalogState, persist_backed_catalog_state,
};
use mz_catalog::memory::objects::CatalogItem;
use mz_cloud_resources::AwsExternalIdPrefix;
use mz_orchestrator_tracing::{StaticTracingConfig, TracingCliArgs};
use mz_ore::cli::{self, CliConfig};
use mz_ore::collections::HashSet;
use mz_ore::error::ErrorExt;
use mz_ore::metrics::MetricsRegistry;
use mz_ore::now::SYSTEM_TIME;
use mz_ore::url::SensitiveUrl;
use mz_persist_client::cache::PersistClientCache;
use mz_persist_client::cfg::PersistConfig;
use mz_persist_client::rpc::PubSubClientConnection;
use mz_persist_client::{Diagnostics, PersistClient, PersistLocation};
use mz_repr::{Diff, Timestamp};
use mz_service::secrets::SecretsReaderCliArgs;
use mz_sql::catalog::EnvironmentId;
use mz_storage_types::StorageDiff;
use mz_storage_types::connections::ConnectionContext;
use mz_storage_types::controller::StorageError;
use mz_storage_types::sources::SourceData;
use serde::{Deserialize, Serialize};
use tracing::{Instrument, error};

pub const BUILD_INFO: BuildInfo = build_info!();
pub static VERSION: LazyLock<String> = LazyLock::new(|| BUILD_INFO.human_version(None));

#[derive(Parser, Debug)]
#[clap(name = "catalog", next_line_help = true, version = VERSION.as_str())]
pub struct Args {
    // === Persist options. ===
    /// An opaque identifier for the environment in which this process is
    /// running.
    #[clap(long, env = "ENVIRONMENT_ID")]
    environment_id: EnvironmentId,
    /// Where the persist library should store its blob data.
    #[clap(long, env = "PERSIST_BLOB_URL")]
    persist_blob_url: SensitiveUrl,
    /// Where the persist library should perform consensus.
    #[clap(long, env = "PERSIST_CONSENSUS_URL")]
    persist_consensus_url: SensitiveUrl,
    // === Cloud options. ===
    /// An external ID to be supplied to all AWS AssumeRole operations.
    ///
    /// Details: <https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_create_for-user_externalid.html>
    #[clap(long, env = "AWS_EXTERNAL_ID", value_name = "ID", value_parser = AwsExternalIdPrefix::new_from_cli_argument_or_environment_variable)]
    aws_external_id_prefix: Option<AwsExternalIdPrefix>,

    /// The ARN for a Materialize-controlled role to assume before assuming
    /// a customer's requested role for an AWS connection.
    #[clap(long, env = "AWS_CONNECTION_ROLE_ARN")]
    aws_connection_role_arn: Option<String>,
    // === Tracing options. ===
    #[clap(flatten)]
    tracing: TracingCliArgs,
    // === Other options. ===
    #[clap(long)]
    deploy_generation: Option<u64>,

    #[clap(subcommand)]
    action: Action,
}

#[derive(Debug, Clone, clap::Subcommand)]
enum Action {
    /// Dumps the catalog contents, by default to stdout, in a human-readable
    /// format. Includes JSON for each key and value that can be hand-edited
    /// and then passed to the `edit` or `delete` commands. Also includes
    /// statistics for each collection.
    Dump {
        /// Ignores the `audit_log` and `storage_usage` collections, which are
        /// often extremely large and rarely useful for debugging.
        #[clap(long)]
        ignore_large_collections: bool,
        /// A list of collections to ignore.
        #[clap(long, short = 'i', action = clap::ArgAction::Append)]
        ignore: Vec<CollectionType>,
        /// Only dumps the statistics of each collection, not the contents.
        #[clap(long)]
        stats_only: bool,
        /// Consolidates the catalog contents.
        #[clap(long, short = 'c')]
        consolidate: bool,
        /// Write output to the specified path. Defaults to stdout.
        target: Option<PathBuf>,
    },
    /// Prints the current epoch.
    Epoch {
        /// Write output to the specified path. Defaults to stdout.
        target: Option<PathBuf>,
    },
    /// Edits a single item in a collection in the catalog.
    Edit {
        /// The name of the catalog collection to edit.
        collection: String,
        /// The JSON-encoded key that identifies the item to edit.
        key: serde_json::Value,
        /// The new JSON-encoded value for the item.
        value: serde_json::Value,
    },
    /// Deletes a single item in a collection in the catalog.
    Delete {
        /// The name of the catalog collection to delete from.
        collection: String,
        /// The JSON-encoded key that identifies the item to delete.
        key: serde_json::Value,
    },
    /// Checks whether the specified catalog could be upgraded from its current
    /// state to the adapter catalog at the version of this binary. Prints a
    /// success or error message. Exits with 0 if the upgrade would succeed,
    /// otherwise non-zero. Can be used against a running environmentd without
    /// interfering with it or committing any data to the catalog.
    UpgradeCheck {
        #[clap(flatten)]
        secrets: SecretsReaderCliArgs,
        /// Map of cluster name to resource specification. Check the README for
        /// the latest values.
        cluster_replica_sizes: String,
    },
}

#[tokio::main]
async fn main() {
    let args: Args = cli::parse_args(CliConfig {
        env_prefix: Some("MZ_CATALOG_DEBUG_"),
        enable_version_flag: true,
    });

    let (_, _tracing_guard) = args
        .tracing
        .configure_tracing(
            StaticTracingConfig {
                service_name: "catalog-debug",
                build_info: BUILD_INFO,
            },
            MetricsRegistry::new(),
        )
        .await
        .expect("failed to init tracing");

    if let Err(err) = run(args).await {
        eprintln!(
            "catalog-debug: fatal: {}\nbacktrace: {}",
            err.display_with_causes(),
            err.backtrace()
        );
        process::exit(1);
    }
}

async fn run(args: Args) -> Result<(), anyhow::Error> {
    let metrics_registry = MetricsRegistry::new();
    let start = Instant::now();
    // It's important that the version in this `BUILD_INFO` is kept in sync with the build
    // info used to write data to the persist catalog.
    let persist_config = PersistConfig::new_default_configs(&BUILD_INFO, SYSTEM_TIME.clone());
    let persist_clients = PersistClientCache::new(persist_config, &metrics_registry, |_, _| {
        PubSubClientConnection::noop()
    });
    let persist_location = PersistLocation {
        blob_uri: args.persist_blob_url.clone(),
        consensus_uri: args.persist_consensus_url.clone(),
    };
    let persist_client = persist_clients.open(persist_location).await?;
    let organization_id = args.environment_id.organization_id();
    let metrics = Arc::new(mz_catalog::durable::Metrics::new(&metrics_registry));
    let openable_state = persist_backed_catalog_state(
        persist_client.clone(),
        organization_id,
        BUILD_INFO.semver_version(),
        args.deploy_generation,
        metrics,
    )
    .await?;

    match args.action.clone() {
        Action::Dump {
            ignore_large_collections,
            ignore,
            stats_only,
            consolidate,
            target,
        } => {
            let ignore: HashSet<_> = ignore.into_iter().collect();
            let target: Box<dyn Write> = if let Some(path) = target {
                Box::new(File::create(path)?)
            } else {
                Box::new(io::stdout().lock())
            };
            dump(
                openable_state,
                ignore_large_collections,
                ignore,
                stats_only,
                consolidate,
                target,
            )
            .await
        }
        Action::Epoch { target } => {
            let target: Box<dyn Write> = if let Some(path) = target {
                Box::new(File::create(path)?)
            } else {
                Box::new(io::stdout().lock())
            };
            epoch(openable_state, target).await
        }
        Action::Edit {
            collection,
            key,
            value,
        } => edit(openable_state, collection, key, value).await,
        Action::Delete { collection, key } => delete(openable_state, collection, key).await,
        Action::UpgradeCheck {
            secrets,
            cluster_replica_sizes,
        } => {
            let cluster_replica_sizes =
                ClusterReplicaSizeMap::parse_from_str(&cluster_replica_sizes, false)
                    .context("parsing replica size map")?;
            upgrade_check(
                args,
                openable_state,
                persist_client,
                secrets,
                cluster_replica_sizes,
                start,
            )
            .await
        }
    }
}

/// Macro to help call a function `$fn` with the correct generic parameter that
/// matches `$collection_type`.
macro_rules! for_collection {
    ($collection_type:expr, $fn:ident $(, $arg:expr)*) => {
        match $collection_type {
            CollectionType::AuditLog => $fn::<AuditLogCollection>($($arg),*).await?,
            CollectionType::ComputeInstance => $fn::<ClusterCollection>($($arg),*).await?,
            CollectionType::ComputeIntrospectionSourceIndex => $fn::<ClusterIntrospectionSourceIndexCollection>($($arg),*).await?,
            CollectionType::ComputeReplicas => $fn::<ClusterReplicaCollection>($($arg),*).await?,
            CollectionType::Comments => $fn::<CommentCollection>($($arg),*).await?,
            CollectionType::Config => $fn::<ConfigCollection>($($arg),*).await?,
            CollectionType::Database => $fn::<DatabaseCollection>($($arg),*).await?,
            CollectionType::DefaultPrivileges => $fn::<DefaultPrivilegeCollection>($($arg),*).await?,
            CollectionType::IdAlloc => $fn::<IdAllocatorCollection>($($arg),*).await?,
            CollectionType::Item => $fn::<ItemCollection>($($arg),*).await?,
            CollectionType::NetworkPolicy => $fn::<NetworkPolicyCollection>($($arg),*).await?,
            CollectionType::Role => $fn::<RoleCollection>($($arg),*).await?,
            CollectionType::RoleAuth => $fn::<RoleAuthCollection>($($arg),*).await?,
            CollectionType::Schema => $fn::<SchemaCollection>($($arg),*).await?,
            CollectionType::Setting => $fn::<SettingCollection>($($arg),*).await?,
            CollectionType::SourceReferences => $fn::<SourceReferencesCollection>($($arg),*).await?,
            CollectionType::SystemConfiguration => $fn::<SystemConfigurationCollection>($($arg),*).await?,
            CollectionType::SystemGidMapping => $fn::<SystemItemMappingCollection>($($arg),*).await?,
            CollectionType::SystemPrivileges => $fn::<SystemPrivilegeCollection>($($arg),*).await?,
            CollectionType::StorageCollectionMetadata => $fn::<StorageCollectionMetadataCollection>($($arg),*).await?,
            CollectionType::UnfinalizedShard => $fn::<UnfinalizedShardsCollection>($($arg),*).await?,
            CollectionType::TxnWalShard => $fn::<TxnWalShardCollection>($($arg),*).await?,
        }
    };
}
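
// For example, `for_collection!(collection_type, edit_col, debug_state, key, value)`
// expands to a match whose `CollectionType::Item` arm is:
//
//     CollectionType::Item => edit_col::<ItemCollection>(debug_state, key, value).await?,
//
// so each caller only needs to provide a function generic over `T: Collection`.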
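/// A hypothetical invocation (the collection name and JSON shapes below are
/// illustrative; in practice, copy the key/value JSON from the output of
/// `dump` for the target collection):
///
/// ```text
/// catalog-debug ... edit setting '{"name": "catalog_content_version"}' '{"value": "v0.1.0"}'
/// ```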
async fn edit(
    openable_state: Box<dyn OpenableDurableCatalogState>,
    collection: String,
    key: serde_json::Value,
    value: serde_json::Value,
) -> Result<(), anyhow::Error> {
    async fn edit_col<T: Collection>(
        mut debug_state: DebugCatalogState,
        key: serde_json::Value,
        value: serde_json::Value,
    ) -> Result<serde_json::Value, anyhow::Error>
    where
        for<'a> T::Key: PartialEq + Eq + Debug + Clone + Deserialize<'a>,
        for<'a> T::Value: Debug + Clone + Serialize + Deserialize<'a>,
    {
        let key: T::Key = serde_json::from_value(key)?;
        let value: T::Value = serde_json::from_value(value)?;
        let prev = debug_state.edit::<T>(key.clone(), value.clone()).await?;
        Ok(serde_json::to_value(prev)?)
    }

    let collection_type: CollectionType = collection.parse()?;
    let debug_state = openable_state.open_debug().await?;
    let prev = for_collection!(collection_type, edit_col, debug_state, key, value);
    println!("previous value: {prev:?}");
    Ok(())
}
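/// A hypothetical invocation (the collection name and key JSON are
/// illustrative; take the key JSON from the output of `dump`):
///
/// ```text
/// catalog-debug ... delete database '{"id": 1}'
/// ```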
async fn delete(
    openable_state: Box<dyn OpenableDurableCatalogState>,
    collection: String,
    key: serde_json::Value,
) -> Result<(), anyhow::Error> {
    async fn delete_col<T: Collection>(
        mut debug_state: DebugCatalogState,
        key: serde_json::Value,
    ) -> Result<(), anyhow::Error>
    where
        for<'a> T::Key: PartialEq + Eq + Debug + Clone + Deserialize<'a>,
        T::Value: Debug,
    {
        let key: T::Key = serde_json::from_value(key)?;
        debug_state.delete::<T>(key.clone()).await?;
        Ok(())
    }

    let collection_type: CollectionType = collection.parse()?;
    let debug_state = openable_state.open_debug().await?;
    for_collection!(collection_type, delete_col, debug_state, key);
    Ok(())
}
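/// The output is the `Debug` rendering of a `BTreeMap<String, DumpedCollection>`
/// keyed by collection name, so each collection appears roughly as:
///
/// ```text
/// "item": {
///     total_count: 2,
///     addition_count: 2,
///     retraction_count: 0,
///     entries: Some([ ... ]),
/// },
/// ```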
async fn dump(
    mut openable_state: Box<dyn OpenableDurableCatalogState>,
    ignore_large_collections: bool,
    ignore: HashSet<CollectionType>,
    stats_only: bool,
    consolidate: bool,
    mut target: impl Write,
) -> Result<(), anyhow::Error> {
    fn dump_col<T: Collection>(
        data: &mut BTreeMap<String, DumpedCollection>,
        trace: CollectionTrace<T>,
        ignore: &HashSet<CollectionType>,
        stats_only: bool,
        consolidate: bool,
    ) where
        T::Key: Serialize + Debug + 'static,
        T::Value: Serialize + Debug + 'static,
    {
        if ignore.contains(&T::collection_type()) {
            return;
        }

        let entries: Vec<_> = trace
            .values
            .into_iter()
            .map(|((k, v), timestamp, diff)| {
                let key_json = serde_json::to_string(&k).expect("must serialize");
                let value_json = serde_json::to_string(&v).expect("must serialize");
                DumpedEntry {
                    key: Box::new(k),
                    value: Box::new(v),
                    key_json: UnescapedDebug(key_json),
                    value_json: UnescapedDebug(value_json),
                    timestamp,
                    diff,
                }
            })
            .collect();

        let total_count = entries.len();
        let addition_count = entries
            .iter()
            .filter(|entry| entry.diff == Diff::ONE)
            .count();
        let retraction_count = entries
            .iter()
            .filter(|entry| entry.diff == Diff::MINUS_ONE)
            .count();
        let entries = if stats_only { None } else { Some(entries) };
        let dumped_col = DumpedCollection {
            total_count,
            addition_count,
            retraction_count,
            entries,
        };
        let name = T::name();

        if consolidate && retraction_count != 0 {
            error!(
                "{name} catalog collection has corrupt entries: there should be no retractions in a consolidated catalog, but there are {retraction_count} retractions"
            );
        }

        data.insert(name, dumped_col);
    }

    let mut data = BTreeMap::new();
    let Trace {
        audit_log,
        clusters,
        introspection_sources,
        cluster_replicas,
        comments,
        configs,
        databases,
        default_privileges,
        id_allocator,
        items,
        network_policies,
        roles,
        role_auth,
        schemas,
        settings,
        source_references,
        system_object_mappings,
        system_configurations,
        system_privileges,
        storage_collection_metadata,
        unfinalized_shards,
        txn_wal_shard,
    } = if consolidate {
        openable_state.trace_consolidated().await?
    } else {
        openable_state.trace_unconsolidated().await?
    };

    if !ignore_large_collections {
        dump_col(&mut data, audit_log, &ignore, stats_only, consolidate);
    }
    dump_col(&mut data, clusters, &ignore, stats_only, consolidate);
    dump_col(
        &mut data,
        introspection_sources,
        &ignore,
        stats_only,
        consolidate,
    );
    dump_col(
        &mut data,
        cluster_replicas,
        &ignore,
        stats_only,
        consolidate,
    );
    dump_col(&mut data, comments, &ignore, stats_only, consolidate);
    dump_col(&mut data, configs, &ignore, stats_only, consolidate);
    dump_col(&mut data, databases, &ignore, stats_only, consolidate);
    dump_col(
        &mut data,
        default_privileges,
        &ignore,
        stats_only,
        consolidate,
    );
    dump_col(&mut data, id_allocator, &ignore, stats_only, consolidate);
    dump_col(&mut data, items, &ignore, stats_only, consolidate);
    dump_col(
        &mut data,
        network_policies,
        &ignore,
        stats_only,
        consolidate,
    );
    dump_col(&mut data, roles, &ignore, stats_only, consolidate);
    dump_col(&mut data, role_auth, &ignore, stats_only, consolidate);
    dump_col(&mut data, schemas, &ignore, stats_only, consolidate);
    dump_col(&mut data, settings, &ignore, stats_only, consolidate);
    dump_col(
        &mut data,
        source_references,
        &ignore,
        stats_only,
        consolidate,
    );
    dump_col(
        &mut data,
        system_configurations,
        &ignore,
        stats_only,
        consolidate,
    );
    dump_col(
        &mut data,
        system_object_mappings,
        &ignore,
        stats_only,
        consolidate,
    );
    dump_col(
        &mut data,
        system_privileges,
        &ignore,
        stats_only,
        consolidate,
    );
    dump_col(
        &mut data,
        storage_collection_metadata,
        &ignore,
        stats_only,
        consolidate,
    );
    dump_col(
        &mut data,
        unfinalized_shards,
        &ignore,
        stats_only,
        consolidate,
    );
    dump_col(&mut data, txn_wal_shard, &ignore, stats_only, consolidate);

    writeln!(&mut target, "{data:#?}")?;
    Ok(())
}

async fn epoch(
    mut openable_state: Box<dyn OpenableDurableCatalogState>,
    mut target: impl Write,
) -> Result<(), anyhow::Error> {
    let epoch = openable_state.epoch().await?;
    writeln!(&mut target, "Epoch: {epoch:#?}")?;
    Ok(())
}

async fn upgrade_check(
    args: Args,
    openable_state: Box<dyn OpenableDurableCatalogState>,
    persist_client: PersistClient,
    secrets: SecretsReaderCliArgs,
    cluster_replica_sizes: ClusterReplicaSizeMap,
    start: Instant,
) -> Result<(), anyhow::Error> {
    let secrets_reader = secrets.load().await.context("loading secrets reader")?;

    let now = SYSTEM_TIME.clone();
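    // Savepoint mode opens an in-memory fork of the durable catalog state: reads see the
    // real contents, but nothing written below (including any migrations applied by the
    // upgrade check) is ever committed back, so this is safe against a live environment.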
    let mut storage = openable_state
        .open_savepoint(
            now().into(),
            &BootstrapArgs {
                default_cluster_replica_size:
                    "DEFAULT CLUSTER REPLICA SIZE IS ONLY USED FOR NEW ENVIRONMENTS".into(),
                default_cluster_replication_factor: 1,
                bootstrap_role: None,
                cluster_replica_size_map: cluster_replica_sizes.clone(),
            },
        )
        .await?
        .0;

    // If this upgrade adds new builtin replicas, then we need to assign them some size. It
    // doesn't really matter which size, since it's not persisted, so we pick an arbitrary
    // valid one.
    let builtin_clusters_replica_size = cluster_replica_sizes
        .0
        .first_key_value()
        .expect("we must have at least a single valid replica size")
        .0
        .clone();

    let boot_ts = now().into();
    let read_only = true;
    // BOXED FUTURE: As of Nov 2023, the Future returned by this function was 7.5KB. It would
    // get stored on the stack, which is bad for runtime performance and blows up our stack
    // usage. Because of that, we purposefully move this Future onto the heap (i.e., Box it).
    let InitializeStateResult {
        state,
        migrated_storage_collections_0dt: _,
        new_builtin_collections: _,
        builtin_table_updates: _,
        last_seen_version,
        expr_cache_handle: _,
        cached_global_exprs: _,
        uncached_local_exprs: _,
    } = Catalog::initialize_state(
        StateConfig {
            unsafe_mode: true,
            all_features: false,
            build_info: &BUILD_INFO,
            environment_id: args.environment_id.clone(),
            read_only,
            now,
            boot_ts,
            skip_migrations: false,
            cluster_replica_sizes,
            builtin_system_cluster_config: BootstrapBuiltinClusterConfig {
                size: builtin_clusters_replica_size.clone(),
                replication_factor: SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR,
            },
            builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig {
                size: builtin_clusters_replica_size.clone(),
                replication_factor: CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR,
            },
            builtin_probe_cluster_config: BootstrapBuiltinClusterConfig {
                size: builtin_clusters_replica_size.clone(),
                replication_factor: PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR,
            },
            builtin_support_cluster_config: BootstrapBuiltinClusterConfig {
                size: builtin_clusters_replica_size.clone(),
                replication_factor: SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR,
            },
            builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig {
                size: builtin_clusters_replica_size.clone(),
                replication_factor: ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR,
            },
            system_parameter_defaults: Default::default(),
            remote_system_parameters: None,
            availability_zones: vec![],
            egress_addresses: vec![],
            aws_principal_context: None,
            aws_privatelink_availability_zones: None,
            http_host_name: None,
            connection_context: ConnectionContext::from_cli_args(
                args.environment_id.to_string(),
                &args.tracing.startup_log_filter,
                args.aws_external_id_prefix,
                args.aws_connection_role_arn,
                secrets_reader,
                None,
            ),
            builtin_item_migration_config: BuiltinItemMigrationConfig {
                // We don't actually want to write anything down, so use an in-memory persist
                // client.
                persist_client: PersistClient::new_for_tests().await,
                read_only,
            },
            persist_client: persist_client.clone(),
            enable_expression_cache_override: None,
            enable_0dt_deployment: true,
            helm_chart_version: None,
        },
        &mut storage,
    )
    .instrument(tracing::info_span!("catalog::initialize_state"))
    .boxed()
    .await?;
    let dur = start.elapsed();

    let msg = format!(
        "catalog upgrade from {} to {} would succeed in about {} ms",
        last_seen_version,
        &BUILD_INFO.human_version(None),
        dur.as_millis(),
    );
    println!("{msg}");

    // Check that we can evolve the schema for all Persist shards.
    let storage_entries = state
        .get_entries()
        .filter_map(|(_item_id, entry)| match entry.item() {
            // TODO(alter_table): Handle multiple versions of tables.
            CatalogItem::Table(table) => Some((table.global_id_writes(), table.desc.latest())),
            CatalogItem::Source(source) => Some((source.global_id(), source.desc.clone())),
            CatalogItem::ContinualTask(ct) => Some((ct.global_id(), ct.desc.clone())),
            CatalogItem::MaterializedView(mv) => Some((mv.global_id(), mv.desc.clone())),
            CatalogItem::Log(_)
            | CatalogItem::View(_)
            | CatalogItem::Sink(_)
            | CatalogItem::Index(_)
            | CatalogItem::Type(_)
            | CatalogItem::Func(_)
            | CatalogItem::Secret(_)
            | CatalogItem::Connection(_) => None,
        });

    let mut storage_errors = BTreeMap::default();
    for (gid, item_desc) in storage_entries {
        // If a new version adds a BuiltinTable or BuiltinSource, we won't have created the
        // shard yet, so there isn't anything to check.
        let maybe_shard_id = state
            .storage_metadata()
            .get_collection_shard::<Timestamp>(gid);
        let shard_id = match maybe_shard_id {
            Ok(shard_id) => shard_id,
            Err(StorageError::IdentifierMissing(_)) => {
                println!("no shard_id found for {gid}, continuing...");
                continue;
            }
            Err(err) => {
                // Collect errors instead of bailing on the first one.
                storage_errors.insert(gid, err.to_string());
                continue;
            }
        };
        println!("checking Persist schema info for {gid}: {shard_id}");

        let diagnostics = Diagnostics {
            shard_name: gid.to_string(),
            handle_purpose: "catalog upgrade check".to_string(),
        };
        let persisted_schema = persist_client
            .latest_schema::<SourceData, (), Timestamp, StorageDiff>(shard_id, diagnostics)
            .await
            .expect("invalid persist usage");
        // If a BuiltinTable or BuiltinSource changed in the new version (e.g., a new column
        // was added), then we'll potentially have a new shard, but no writes will have
        // occurred, so no schema will be registered.
        let Some((_schema_id, persisted_relation_desc, _)) = persisted_schema else {
            println!("no schema found for {gid} '{shard_id}', continuing...");
            continue;
        };

        let persisted_data_type =
            mz_persist_types::columnar::data_type::<SourceData>(&persisted_relation_desc)?;
        let new_data_type = mz_persist_types::columnar::data_type::<SourceData>(&item_desc)?;

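        // `backward_compatible` returns `Some` only if data written with the persisted
        // schema can still be read with the new one (e.g., adding a nullable column is
        // typically allowed, while changing an existing column's type is not).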
        let migration =
            mz_persist_types::schema::backward_compatible(&persisted_data_type, &new_data_type);
        if migration.is_none() {
            let msg = format!(
                "invalid Persist schema migration!\nshard_id: {}\npersisted: {:?}\n{:?}\nnew: {:?}\n{:?}",
                shard_id, persisted_relation_desc, persisted_data_type, item_desc, new_data_type,
            );
            storage_errors.insert(gid, msg);
        }
    }

    if !storage_errors.is_empty() {
        anyhow::bail!("validation of storage objects failed! errors: {storage_errors:?}")
    } else {
        Ok(())
    }
}

struct DumpedCollection {
    total_count: usize,
    addition_count: usize,
    retraction_count: usize,
    entries: Option<Vec<DumpedEntry>>,
}

impl std::fmt::Debug for DumpedCollection {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let mut debug = f.debug_struct("");
        let debug = debug.field("total_count", &self.total_count);
        let debug = debug.field("addition_count", &self.addition_count);
        let debug = debug.field("retraction_count", &self.retraction_count);
        let debug = match &self.entries {
            Some(entries) => debug.field("entries", entries),
            None => debug,
        };
        debug.finish()
    }
}

struct DumpedEntry {
    key: Box<dyn std::fmt::Debug>,
    value: Box<dyn std::fmt::Debug>,
    key_json: UnescapedDebug,
    value_json: UnescapedDebug,
    timestamp: Timestamp,
    diff: Diff,
}

impl std::fmt::Debug for DumpedEntry {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        f.debug_struct("")
            .field("key", &self.key)
            .field("value", &self.value)
            .field("key_json", &self.key_json)
            .field("value_json", &self.value_json)
            .field("timestamp", &self.timestamp)
            .field("diff", &self.diff)
            .finish()
    }
}

// We want to auto-format things with `Debug`, but not print `\` before the `"` in JSON
// values, so we implement our own `Debug` impl that doesn't escape.
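// For example, the standard `Debug` impl for `String` would render a JSON value as
// "{\"name\":\"u1\"}", whereas `UnescapedDebug` renders it as '{"name":"u1"}'.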
struct UnescapedDebug(String);

impl std::fmt::Debug for UnescapedDebug {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "'{}'", &self.0)
    }
}