1use std::collections::BTreeMap;
13use std::fmt::Debug;
14use std::fs::File;
15use std::io::{self, Write};
16use std::path::PathBuf;
17use std::process;
18use std::sync::Arc;
19use std::sync::LazyLock;
20use std::time::Instant;
21
22use anyhow::Context;
23use clap::Parser;
24use futures::future::FutureExt;
25use mz_adapter::catalog::{Catalog, InitializeStateResult};
26use mz_adapter_types::bootstrap_builtin_cluster_config::{
27 ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR, BootstrapBuiltinClusterConfig,
28 CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR, PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR,
29 SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR, SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR,
30};
31use mz_build_info::{BuildInfo, build_info};
32use mz_catalog::config::{BuiltinItemMigrationConfig, ClusterReplicaSizeMap, StateConfig};
33use mz_catalog::durable::debug::{
34 AuditLogCollection, ClusterCollection, ClusterIntrospectionSourceIndexCollection,
35 ClusterReplicaCollection, Collection, CollectionTrace, CollectionType, CommentCollection,
36 ConfigCollection, DatabaseCollection, DebugCatalogState, DefaultPrivilegeCollection,
37 IdAllocatorCollection, ItemCollection, NetworkPolicyCollection, RoleAuthCollection,
38 RoleCollection, SchemaCollection, SettingCollection, SourceReferencesCollection,
39 StorageCollectionMetadataCollection, SystemConfigurationCollection,
40 SystemItemMappingCollection, SystemPrivilegeCollection, Trace, TxnWalShardCollection,
41 UnfinalizedShardsCollection,
42};
43use mz_catalog::durable::{
44 BootstrapArgs, OpenableDurableCatalogState, persist_backed_catalog_state,
45};
46use mz_catalog::memory::objects::CatalogItem;
47use mz_cloud_resources::AwsExternalIdPrefix;
48use mz_orchestrator_tracing::{StaticTracingConfig, TracingCliArgs};
49use mz_ore::cli::{self, CliConfig};
50use mz_ore::collections::HashSet;
51use mz_ore::error::ErrorExt;
52use mz_ore::metrics::MetricsRegistry;
53use mz_ore::now::SYSTEM_TIME;
54use mz_ore::url::SensitiveUrl;
55use mz_persist_client::cache::PersistClientCache;
56use mz_persist_client::cfg::PersistConfig;
57use mz_persist_client::rpc::PubSubClientConnection;
58use mz_persist_client::{Diagnostics, PersistClient, PersistLocation};
59use mz_repr::{Diff, Timestamp};
60use mz_service::secrets::SecretsReaderCliArgs;
61use mz_sql::catalog::EnvironmentId;
62use mz_storage_types::StorageDiff;
63use mz_storage_types::connections::ConnectionContext;
64use mz_storage_types::controller::StorageError;
65use mz_storage_types::sources::SourceData;
66use serde::{Deserialize, Serialize};
67use tracing::{Instrument, error};
68
/// Build metadata for this binary, produced by the `build_info!` macro.
69pub const BUILD_INFO: BuildInfo = build_info!();
/// Human-readable version string derived from [`BUILD_INFO`].
///
/// Computed lazily on first access; used as the `version` of the clap command below.
70pub static VERSION: LazyLock<String> = LazyLock::new(|| BUILD_INFO.human_version(None));
71
72#[derive(Parser, Debug)]
73#[clap(name = "catalog", next_line_help = true, version = VERSION.as_str())]
74pub struct Args {
75 #[clap(long, env = "ENVIRONMENT_ID")]
79 environment_id: EnvironmentId,
80 #[clap(long, env = "PERSIST_BLOB_URL")]
82 persist_blob_url: SensitiveUrl,
83 #[clap(long, env = "PERSIST_CONSENSUS_URL")]
85 persist_consensus_url: SensitiveUrl,
86 #[clap(long, env = "AWS_EXTERNAL_ID", value_name = "ID", value_parser = AwsExternalIdPrefix::new_from_cli_argument_or_environment_variable)]
91 aws_external_id_prefix: Option<AwsExternalIdPrefix>,
92
93 #[clap(long, env = "AWS_CONNECTION_ROLE_ARN")]
96 aws_connection_role_arn: Option<String>,
97 #[clap(flatten)]
99 tracing: TracingCliArgs,
100 #[clap(long)]
102 deploy_generation: Option<u64>,
103
104 #[clap(subcommand)]
105 action: Action,
106}
107
108#[derive(Debug, Clone, clap::Subcommand)]
109enum Action {
110 Dump {
115 #[clap(long)]
118 ignore_large_collections: bool,
119 #[clap(long, short = 'i', action = clap::ArgAction::Append)]
121 ignore: Vec<CollectionType>,
122 #[clap(long)]
124 stats_only: bool,
125 #[clap(long, short = 'c')]
127 consolidate: bool,
128 target: Option<PathBuf>,
130 },
131 Epoch {
133 target: Option<PathBuf>,
135 },
136 Edit {
138 collection: String,
140 key: serde_json::Value,
142 value: serde_json::Value,
144 },
145 Delete {
147 collection: String,
149 key: serde_json::Value,
151 },
152 UpgradeCheck {
158 #[clap(flatten)]
159 secrets: SecretsReaderCliArgs,
160 cluster_replica_sizes: String,
162 },
163}
164
165#[tokio::main]
166async fn main() {
167 let args: Args = cli::parse_args(CliConfig {
168 env_prefix: Some("MZ_CATALOG_DEBUG_"),
169 enable_version_flag: true,
170 });
171
172 let (_, _tracing_guard) = args
173 .tracing
174 .configure_tracing(
175 StaticTracingConfig {
176 service_name: "catalog-debug",
177 build_info: BUILD_INFO,
178 },
179 MetricsRegistry::new(),
180 )
181 .await
182 .expect("failed to init tracing");
183
184 if let Err(err) = run(args).await {
185 eprintln!(
186 "catalog-debug: fatal: {}\nbacktrace: {}",
187 err.display_with_causes(),
188 err.backtrace()
189 );
190 process::exit(1);
191 }
192}
193
194async fn run(args: Args) -> Result<(), anyhow::Error> {
195 let metrics_registry = MetricsRegistry::new();
196 let start = Instant::now();
197 let persist_config = PersistConfig::new_default_configs(&BUILD_INFO, SYSTEM_TIME.clone());
200 let persist_clients = PersistClientCache::new(persist_config, &metrics_registry, |_, _| {
201 PubSubClientConnection::noop()
202 });
203 let persist_location = PersistLocation {
204 blob_uri: args.persist_blob_url.clone(),
205 consensus_uri: args.persist_consensus_url.clone(),
206 };
207 let persist_client = persist_clients.open(persist_location).await?;
208 let organization_id = args.environment_id.organization_id();
209 let metrics = Arc::new(mz_catalog::durable::Metrics::new(&metrics_registry));
210 let openable_state = persist_backed_catalog_state(
211 persist_client.clone(),
212 organization_id,
213 BUILD_INFO.semver_version(),
214 args.deploy_generation,
215 metrics,
216 )
217 .await?;
218
219 match args.action.clone() {
220 Action::Dump {
221 ignore_large_collections,
222 ignore,
223 stats_only,
224 consolidate,
225 target,
226 } => {
227 let ignore: HashSet<_> = ignore.into_iter().collect();
228 let target: Box<dyn Write> = if let Some(path) = target {
229 Box::new(File::create(path)?)
230 } else {
231 Box::new(io::stdout().lock())
232 };
233 dump(
234 openable_state,
235 ignore_large_collections,
236 ignore,
237 stats_only,
238 consolidate,
239 target,
240 )
241 .await
242 }
243 Action::Epoch { target } => {
244 let target: Box<dyn Write> = if let Some(path) = target {
245 Box::new(File::create(path)?)
246 } else {
247 Box::new(io::stdout().lock())
248 };
249 epoch(openable_state, target).await
250 }
251 Action::Edit {
252 collection,
253 key,
254 value,
255 } => edit(openable_state, collection, key, value).await,
256 Action::Delete { collection, key } => delete(openable_state, collection, key).await,
257 Action::UpgradeCheck {
258 secrets,
259 cluster_replica_sizes,
260 } => {
261 let cluster_replica_sizes =
262 ClusterReplicaSizeMap::parse_from_str(&cluster_replica_sizes, false)
263 .context("parsing replica size map")?;
264 upgrade_check(
265 args,
266 openable_state,
267 persist_client,
268 secrets,
269 cluster_replica_sizes,
270 start,
271 )
272 .await
273 }
274 }
275}
276
// Dispatches a generic async handler on a runtime `CollectionType`.
//
// Expands to a `match` over `$kind` in which every arm calls `$handler::<C>($($param),*)` with
// the collection marker type `C` that corresponds to the variant, awaits the result, and
// propagates errors with `?`. It must therefore be used inside an `async` context whose return
// type can absorb the handler's error. The macro evaluates to the handler's success value.
macro_rules! for_collection {
    ($kind:expr, $handler:ident $(, $param:expr)*) => {
        match $kind {
            CollectionType::AuditLog => $handler::<AuditLogCollection>($($param),*).await?,
            CollectionType::ComputeInstance => $handler::<ClusterCollection>($($param),*).await?,
            CollectionType::ComputeIntrospectionSourceIndex => $handler::<ClusterIntrospectionSourceIndexCollection>($($param),*).await?,
            CollectionType::ComputeReplicas => $handler::<ClusterReplicaCollection>($($param),*).await?,
            CollectionType::Comments => $handler::<CommentCollection>($($param),*).await?,
            CollectionType::Config => $handler::<ConfigCollection>($($param),*).await?,
            CollectionType::Database => $handler::<DatabaseCollection>($($param),*).await?,
            CollectionType::DefaultPrivileges => $handler::<DefaultPrivilegeCollection>($($param),*).await?,
            CollectionType::IdAlloc => $handler::<IdAllocatorCollection>($($param),*).await?,
            CollectionType::Item => $handler::<ItemCollection>($($param),*).await?,
            CollectionType::NetworkPolicy => $handler::<NetworkPolicyCollection>($($param),*).await?,
            CollectionType::Role => $handler::<RoleCollection>($($param),*).await?,
            CollectionType::RoleAuth => $handler::<RoleAuthCollection>($($param),*).await?,
            CollectionType::Schema => $handler::<SchemaCollection>($($param),*).await?,
            CollectionType::Setting => $handler::<SettingCollection>($($param),*).await?,
            CollectionType::SourceReferences => $handler::<SourceReferencesCollection>($($param),*).await?,
            CollectionType::SystemConfiguration => $handler::<SystemConfigurationCollection>($($param),*).await?,
            CollectionType::SystemGidMapping => $handler::<SystemItemMappingCollection>($($param),*).await?,
            CollectionType::SystemPrivileges => $handler::<SystemPrivilegeCollection>($($param),*).await?,
            CollectionType::StorageCollectionMetadata => $handler::<StorageCollectionMetadataCollection>($($param),*).await?,
            CollectionType::UnfinalizedShard => $handler::<UnfinalizedShardsCollection>($($param),*).await?,
            CollectionType::TxnWalShard => $handler::<TxnWalShardCollection>($($param),*).await?,
        }
    };
}
307
308async fn edit(
309 openable_state: Box<dyn OpenableDurableCatalogState>,
310 collection: String,
311 key: serde_json::Value,
312 value: serde_json::Value,
313) -> Result<(), anyhow::Error> {
314 async fn edit_col<T: Collection>(
315 mut debug_state: DebugCatalogState,
316 key: serde_json::Value,
317 value: serde_json::Value,
318 ) -> Result<serde_json::Value, anyhow::Error>
319 where
320 for<'a> T::Key: PartialEq + Eq + Debug + Clone + Deserialize<'a>,
321 for<'a> T::Value: Debug + Clone + Serialize + Deserialize<'a>,
322 {
323 let key: T::Key = serde_json::from_value(key)?;
324 let value: T::Value = serde_json::from_value(value)?;
325 let prev = debug_state.edit::<T>(key.clone(), value.clone()).await?;
326 Ok(serde_json::to_value(prev)?)
327 }
328
329 let collection_type: CollectionType = collection.parse()?;
330 let debug_state = openable_state.open_debug().await?;
331 let prev = for_collection!(collection_type, edit_col, debug_state, key, value);
332 println!("previous value: {prev:?}");
333 Ok(())
334}
335
336async fn delete(
337 openable_state: Box<dyn OpenableDurableCatalogState>,
338 collection: String,
339 key: serde_json::Value,
340) -> Result<(), anyhow::Error> {
341 async fn delete_col<T: Collection>(
342 mut debug_state: DebugCatalogState,
343 key: serde_json::Value,
344 ) -> Result<(), anyhow::Error>
345 where
346 for<'a> T::Key: PartialEq + Eq + Debug + Clone + Deserialize<'a>,
347 T::Value: Debug,
348 {
349 let key: T::Key = serde_json::from_value(key)?;
350 debug_state.delete::<T>(key.clone()).await?;
351 Ok(())
352 }
353
354 let collection_type: CollectionType = collection.parse()?;
355 let debug_state = openable_state.open_debug().await?;
356 for_collection!(collection_type, delete_col, debug_state, key);
357 Ok(())
358}
359
360async fn dump(
361 mut openable_state: Box<dyn OpenableDurableCatalogState>,
362 ignore_large_collections: bool,
363 ignore: HashSet<CollectionType>,
364 stats_only: bool,
365 consolidate: bool,
366 mut target: impl Write,
367) -> Result<(), anyhow::Error> {
368 fn dump_col<T: Collection>(
369 data: &mut BTreeMap<String, DumpedCollection>,
370 trace: CollectionTrace<T>,
371 ignore: &HashSet<CollectionType>,
372 stats_only: bool,
373 consolidate: bool,
374 ) where
375 T::Key: Serialize + Debug + 'static,
376 T::Value: Serialize + Debug + 'static,
377 {
378 if ignore.contains(&T::collection_type()) {
379 return;
380 }
381
382 let entries: Vec<_> = trace
383 .values
384 .into_iter()
385 .map(|((k, v), timestamp, diff)| {
386 let key_json = serde_json::to_string(&k).expect("must serialize");
387 let value_json = serde_json::to_string(&v).expect("must serialize");
388 DumpedEntry {
389 key: Box::new(k),
390 value: Box::new(v),
391 key_json: UnescapedDebug(key_json),
392 value_json: UnescapedDebug(value_json),
393 timestamp,
394 diff,
395 }
396 })
397 .collect();
398
399 let total_count = entries.len();
400 let addition_count = entries
401 .iter()
402 .filter(|entry| entry.diff == Diff::ONE)
403 .count();
404 let retraction_count = entries
405 .iter()
406 .filter(|entry| entry.diff == Diff::MINUS_ONE)
407 .count();
408 let entries = if stats_only { None } else { Some(entries) };
409 let dumped_col = DumpedCollection {
410 total_count,
411 addition_count,
412 retraction_count,
413 entries,
414 };
415 let name = T::name();
416
417 if consolidate && retraction_count != 0 {
418 error!(
419 "{name} catalog collection has corrupt entries, there should be no retractions in a consolidated catalog, but there are {retraction_count} retractions"
420 );
421 }
422
423 data.insert(name, dumped_col);
424 }
425
426 let mut data = BTreeMap::new();
427 let Trace {
428 audit_log,
429 clusters,
430 introspection_sources,
431 cluster_replicas,
432 comments,
433 configs,
434 databases,
435 default_privileges,
436 id_allocator,
437 items,
438 network_policies,
439 roles,
440 role_auth,
441 schemas,
442 settings,
443 source_references,
444 system_object_mappings,
445 system_configurations,
446 system_privileges,
447 storage_collection_metadata,
448 unfinalized_shards,
449 txn_wal_shard,
450 } = if consolidate {
451 openable_state.trace_consolidated().await?
452 } else {
453 openable_state.trace_unconsolidated().await?
454 };
455
456 if !ignore_large_collections {
457 dump_col(&mut data, audit_log, &ignore, stats_only, consolidate);
458 }
459 dump_col(&mut data, clusters, &ignore, stats_only, consolidate);
460 dump_col(
461 &mut data,
462 introspection_sources,
463 &ignore,
464 stats_only,
465 consolidate,
466 );
467 dump_col(
468 &mut data,
469 cluster_replicas,
470 &ignore,
471 stats_only,
472 consolidate,
473 );
474 dump_col(&mut data, comments, &ignore, stats_only, consolidate);
475 dump_col(&mut data, configs, &ignore, stats_only, consolidate);
476 dump_col(&mut data, databases, &ignore, stats_only, consolidate);
477 dump_col(
478 &mut data,
479 default_privileges,
480 &ignore,
481 stats_only,
482 consolidate,
483 );
484 dump_col(&mut data, id_allocator, &ignore, stats_only, consolidate);
485 dump_col(&mut data, items, &ignore, stats_only, consolidate);
486 dump_col(
487 &mut data,
488 network_policies,
489 &ignore,
490 stats_only,
491 consolidate,
492 );
493 dump_col(&mut data, roles, &ignore, stats_only, consolidate);
494 dump_col(&mut data, role_auth, &ignore, stats_only, consolidate);
495 dump_col(&mut data, schemas, &ignore, stats_only, consolidate);
496 dump_col(&mut data, settings, &ignore, stats_only, consolidate);
497 dump_col(
498 &mut data,
499 source_references,
500 &ignore,
501 stats_only,
502 consolidate,
503 );
504 dump_col(
505 &mut data,
506 system_configurations,
507 &ignore,
508 stats_only,
509 consolidate,
510 );
511 dump_col(
512 &mut data,
513 system_object_mappings,
514 &ignore,
515 stats_only,
516 consolidate,
517 );
518 dump_col(
519 &mut data,
520 system_privileges,
521 &ignore,
522 stats_only,
523 consolidate,
524 );
525 dump_col(
526 &mut data,
527 storage_collection_metadata,
528 &ignore,
529 stats_only,
530 consolidate,
531 );
532 dump_col(
533 &mut data,
534 unfinalized_shards,
535 &ignore,
536 stats_only,
537 consolidate,
538 );
539 dump_col(&mut data, txn_wal_shard, &ignore, stats_only, consolidate);
540
541 writeln!(&mut target, "{data:#?}")?;
542 Ok(())
543}
544
545async fn epoch(
546 mut openable_state: Box<dyn OpenableDurableCatalogState>,
547 mut target: impl Write,
548) -> Result<(), anyhow::Error> {
549 let epoch = openable_state.epoch().await?;
550 writeln!(&mut target, "Epoch: {epoch:#?}")?;
551 Ok(())
552}
553
554async fn upgrade_check(
555 args: Args,
556 openable_state: Box<dyn OpenableDurableCatalogState>,
557 persist_client: PersistClient,
558 secrets: SecretsReaderCliArgs,
559 cluster_replica_sizes: ClusterReplicaSizeMap,
560 start: Instant,
561) -> Result<(), anyhow::Error> {
562 let secrets_reader = secrets.load().await.context("loading secrets reader")?;
563
564 let now = SYSTEM_TIME.clone();
565 let mut storage = openable_state
566 .open_savepoint(
567 now().into(),
568 &BootstrapArgs {
569 default_cluster_replica_size:
570 "DEFAULT CLUSTER REPLICA SIZE IS ONLY USED FOR NEW ENVIRONMENTS".into(),
571 default_cluster_replication_factor: 1,
572 bootstrap_role: None,
573 cluster_replica_size_map: cluster_replica_sizes.clone(),
574 },
575 )
576 .await?
577 .0;
578
579 let builtin_clusters_replica_size = cluster_replica_sizes
582 .0
583 .first_key_value()
584 .expect("we must have at least a single valid replica size")
585 .0
586 .clone();
587
588 let boot_ts = now().into();
589 let read_only = true;
590 let InitializeStateResult {
594 state,
595 migrated_storage_collections_0dt: _,
596 new_builtin_collections: _,
597 builtin_table_updates: _,
598 last_seen_version,
599 expr_cache_handle: _,
600 cached_global_exprs: _,
601 uncached_local_exprs: _,
602 } = Catalog::initialize_state(
603 StateConfig {
604 unsafe_mode: true,
605 all_features: false,
606 build_info: &BUILD_INFO,
607 environment_id: args.environment_id.clone(),
608 read_only,
609 now,
610 boot_ts,
611 skip_migrations: false,
612 cluster_replica_sizes,
613 builtin_system_cluster_config: BootstrapBuiltinClusterConfig {
614 size: builtin_clusters_replica_size.clone(),
615 replication_factor: SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR,
616 },
617 builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig {
618 size: builtin_clusters_replica_size.clone(),
619 replication_factor: CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR,
620 },
621 builtin_probe_cluster_config: BootstrapBuiltinClusterConfig {
622 size: builtin_clusters_replica_size.clone(),
623 replication_factor: PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR,
624 },
625 builtin_support_cluster_config: BootstrapBuiltinClusterConfig {
626 size: builtin_clusters_replica_size.clone(),
627 replication_factor: SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR,
628 },
629 builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig {
630 size: builtin_clusters_replica_size.clone(),
631 replication_factor: ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR,
632 },
633 system_parameter_defaults: Default::default(),
634 remote_system_parameters: None,
635 availability_zones: vec![],
636 egress_addresses: vec![],
637 aws_principal_context: None,
638 aws_privatelink_availability_zones: None,
639 http_host_name: None,
640 connection_context: ConnectionContext::from_cli_args(
641 args.environment_id.to_string(),
642 &args.tracing.startup_log_filter,
643 args.aws_external_id_prefix,
644 args.aws_connection_role_arn,
645 secrets_reader,
646 None,
647 ),
648 builtin_item_migration_config: BuiltinItemMigrationConfig {
649 persist_client: PersistClient::new_for_tests().await,
652 read_only,
653 },
654 persist_client: persist_client.clone(),
655 enable_expression_cache_override: None,
656 enable_0dt_deployment: true,
657 helm_chart_version: None,
658 },
659 &mut storage,
660 )
661 .instrument(tracing::info_span!("catalog::initialize_state"))
662 .boxed()
663 .await?;
664 let dur = start.elapsed();
665
666 let msg = format!(
667 "catalog upgrade from {} to {} would succeed in about {} ms",
668 last_seen_version,
669 &BUILD_INFO.human_version(None),
670 dur.as_millis(),
671 );
672 println!("{msg}");
673
674 let storage_entries = state
676 .get_entries()
677 .filter_map(|(_item_id, entry)| match entry.item() {
678 CatalogItem::Table(table) => Some((table.global_id_writes(), table.desc.latest())),
680 CatalogItem::Source(source) => Some((source.global_id(), source.desc.clone())),
681 CatalogItem::ContinualTask(ct) => Some((ct.global_id(), ct.desc.clone())),
682 CatalogItem::MaterializedView(mv) => Some((mv.global_id(), mv.desc.clone())),
683 CatalogItem::Log(_)
684 | CatalogItem::View(_)
685 | CatalogItem::Sink(_)
686 | CatalogItem::Index(_)
687 | CatalogItem::Type(_)
688 | CatalogItem::Func(_)
689 | CatalogItem::Secret(_)
690 | CatalogItem::Connection(_) => None,
691 });
692
693 let mut storage_errors = BTreeMap::default();
694 for (gid, item_desc) in storage_entries {
695 let maybe_shard_id = state
698 .storage_metadata()
699 .get_collection_shard::<Timestamp>(gid);
700 let shard_id = match maybe_shard_id {
701 Ok(shard_id) => shard_id,
702 Err(StorageError::IdentifierMissing(_)) => {
703 println!("no shard_id found for {gid}, continuing...");
704 continue;
705 }
706 Err(err) => {
707 storage_errors.insert(gid, err.to_string());
709 continue;
710 }
711 };
712 println!("checking Persist schema info for {gid}: {shard_id}");
713
714 let diagnostics = Diagnostics {
715 shard_name: gid.to_string(),
716 handle_purpose: "catalog upgrade check".to_string(),
717 };
718 let persisted_schema = persist_client
719 .latest_schema::<SourceData, (), Timestamp, StorageDiff>(shard_id, diagnostics)
720 .await
721 .expect("invalid persist usage");
722 let Some((_schema_id, persisted_relation_desc, _)) = persisted_schema else {
726 println!("no schema found for {gid} '{shard_id}', continuing...");
727 continue;
728 };
729
730 let persisted_data_type =
731 mz_persist_types::columnar::data_type::<SourceData>(&persisted_relation_desc)?;
732 let new_data_type = mz_persist_types::columnar::data_type::<SourceData>(&item_desc)?;
733
734 let migration =
735 mz_persist_types::schema::backward_compatible(&persisted_data_type, &new_data_type);
736 if migration.is_none() {
737 let msg = format!(
738 "invalid Persist schema migration!\nshard_id: {}\npersisted: {:?}\n{:?}\nnew: {:?}\n{:?}",
739 shard_id, persisted_relation_desc, persisted_data_type, item_desc, new_data_type,
740 );
741 storage_errors.insert(gid, msg);
742 }
743 }
744
745 if !storage_errors.is_empty() {
746 anyhow::bail!("validation of storage objects failed! errors: {storage_errors:?}")
747 } else {
748 Ok(())
749 }
750}
751
752struct DumpedCollection {
753 total_count: usize,
754 addition_count: usize,
755 retraction_count: usize,
756 entries: Option<Vec<DumpedEntry>>,
757}
758
759impl std::fmt::Debug for DumpedCollection {
760 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
761 let mut debug = f.debug_struct("");
762 let debug = debug.field("total_count", &self.total_count);
763 let debug = debug.field("addition_count", &self.addition_count);
764 let debug = debug.field("retraction_count", &self.retraction_count);
765 let debug = match &self.entries {
766 Some(entries) => debug.field("entries", entries),
767 None => debug,
768 };
769 debug.finish()
770 }
771}
772
773struct DumpedEntry {
774 key: Box<dyn std::fmt::Debug>,
775 value: Box<dyn std::fmt::Debug>,
776 key_json: UnescapedDebug,
777 value_json: UnescapedDebug,
778 timestamp: Timestamp,
779 diff: Diff,
780}
781
782impl std::fmt::Debug for DumpedEntry {
783 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
784 f.debug_struct("")
785 .field("key", &self.key)
786 .field("value", &self.value)
787 .field("key_json", &self.key_json)
788 .field("value_json", &self.value_json)
789 .field("timestamp", &self.timestamp)
790 .field("diff", &self.diff)
791 .finish()
792 }
793}
794
/// A string whose `Debug` output is the raw text wrapped in single quotes.
///
/// Unlike `String`'s own `Debug` implementation, nothing is escaped, so JSON held in here
/// stays readable and can be copied out of the dump verbatim.
struct UnescapedDebug(String);

impl std::fmt::Debug for UnescapedDebug {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let Self(raw) = self;
        f.write_fmt(format_args!("'{}'", raw))
    }
}