// mz_debug/system_catalog_dumper.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License in the LICENSE file at the
// root of this repository, or online at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Dumps catalog information to files.
//!
//! We run queries serially rather than in parallel because all queries on a
//! pgwire connection are executed serially anyway. Running the queries
//! serially also makes cleaning up and aborting queries much easier.
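//!
//! A typical invocation (a sketch; constructing the [`Context`] is elided
//! here) looks like:
//!
//! ```ignore
//! let dumper = SystemCatalogDumper::new(&context, connection_string).await?;
//! dumper.dump_all_relations().await;
//! ```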

use anyhow::{Context as _, Result};
use chrono::{DateTime, Utc};
use csv_async::AsyncSerializer;
use futures::TryStreamExt;
use mz_tls_util::make_tls;
use std::fmt;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use tokio::io::AsyncWriteExt;
use tokio::sync::Mutex;
use tokio_postgres::{
    Client as PgClient, Config as PgConfig, Connection, SimpleQueryMessage, Socket, Transaction,
};
use tokio_util::io::StreamReader;

use mz_ore::collections::HashMap;
use mz_ore::retry;
use mz_ore::task::{self, JoinHandle};
use postgres_openssl::{MakeTlsConnector, TlsStream};
use tracing::{error, info};

use crate::Context;
use crate::utils::format_base_path;

#[derive(Debug, Clone)]
pub enum RelationCategory {
    /// Relations that belong to the `mz_introspection` schema.
    /// These relations require a cluster and replica to be specified.
    Introspection,
    /// Retained-metrics relations whose SUBSCRIBE output we also want to capture.
    Retained,
    /// Any other relation, dumped with a plain `SELECT * FROM`.
    Basic,
}

#[derive(Debug, Clone)]
pub struct Relation {
    pub name: &'static str,
    pub category: RelationCategory,
}

/// The list of relations to dump.
/// The relations are grouped and delimited by category comments (e.g. "Basic object information").
static RELATIONS: &[Relation] = &[
    // Basic object information
    Relation {
        name: "mz_audit_events",
        category: RelationCategory::Basic,
    },
    Relation {
        name: "mz_databases",
        category: RelationCategory::Basic,
    },
    Relation {
        name: "mz_schemas",
        category: RelationCategory::Basic,
    },
    Relation {
        name: "mz_tables",
        category: RelationCategory::Basic,
    },
    Relation {
        name: "mz_sources",
        category: RelationCategory::Basic,
    },
    Relation {
        name: "mz_sinks",
        category: RelationCategory::Basic,
    },
    Relation {
        name: "mz_views",
        category: RelationCategory::Basic,
    },
    Relation {
        name: "mz_materialized_views",
        category: RelationCategory::Basic,
    },
    Relation {
        name: "mz_secrets",
        category: RelationCategory::Basic,
    },
    Relation {
        name: "mz_connections",
        category: RelationCategory::Basic,
    },
    Relation {
        name: "mz_roles",
        category: RelationCategory::Basic,
    },
    Relation {
        name: "mz_subscriptions",
        category: RelationCategory::Basic,
    },
    Relation {
        name: "mz_object_fully_qualified_names",
        category: RelationCategory::Basic,
    },
    Relation {
        name: "mz_sessions",
        category: RelationCategory::Basic,
    },
    Relation {
        name: "mz_object_history",
        category: RelationCategory::Basic,
    },
    Relation {
        name: "mz_object_lifetimes",
        category: RelationCategory::Basic,
    },
    Relation {
        name: "mz_object_dependencies",
        category: RelationCategory::Basic,
    },
    Relation {
        name: "mz_object_transitive_dependencies",
        category: RelationCategory::Basic,
    },
    // Compute
    Relation {
        name: "mz_clusters",
        category: RelationCategory::Basic,
    },
    Relation {
        name: "mz_indexes",
        category: RelationCategory::Basic,
    },
    Relation {
        name: "mz_cluster_replicas",
        category: RelationCategory::Basic,
    },
    Relation {
        name: "mz_cluster_replica_sizes",
        category: RelationCategory::Basic,
    },
    Relation {
        name: "mz_cluster_replica_statuses",
        category: RelationCategory::Basic,
    },
    Relation {
        name: "mz_cluster_replica_metrics_history",
        category: RelationCategory::Basic,
    },
    Relation {
        name: "mz_compute_hydration_times",
        category: RelationCategory::Retained,
    },
    Relation {
        name: "mz_materialization_dependencies",
        category: RelationCategory::Basic,
    },
    Relation {
        name: "mz_cluster_replica_status_history",
        category: RelationCategory::Basic,
    },
    // Freshness
    Relation {
        name: "mz_wallclock_global_lag_recent_history",
        category: RelationCategory::Basic,
    },
    Relation {
        name: "mz_global_frontiers",
        category: RelationCategory::Basic,
    },
    Relation {
        name: "mz_cluster_replica_frontiers",
        category: RelationCategory::Basic,
    },
    Relation {
        name: "mz_materialization_lag",
        category: RelationCategory::Basic,
    },
    // Sources/sinks
    Relation {
        name: "mz_source_statistics_with_history",
        category: RelationCategory::Basic,
    },
    Relation {
        name: "mz_source_statistics_with_history",
        category: RelationCategory::Retained,
    },
    Relation {
        name: "mz_sink_statistics",
        category: RelationCategory::Basic,
    },
    Relation {
        name: "mz_sink_statistics",
        category: RelationCategory::Retained,
    },
    Relation {
        name: "mz_source_statuses",
        category: RelationCategory::Basic,
    },
    Relation {
        name: "mz_sink_statuses",
        category: RelationCategory::Basic,
    },
    Relation {
        name: "mz_source_status_history",
        category: RelationCategory::Basic,
    },
    Relation {
        name: "mz_sink_status_history",
        category: RelationCategory::Basic,
    },
    // REFRESH EVERY information
    Relation {
        name: "mz_materialized_view_refresh_strategies",
        category: RelationCategory::Basic,
    },
    Relation {
        name: "mz_cluster_schedules",
        category: RelationCategory::Basic,
    },
    // Persist
    Relation {
        name: "mz_recent_storage_usage",
        category: RelationCategory::Basic,
    },
    // Introspection relations
    Relation {
        name: "mz_arrangement_sharing_per_worker",
        category: RelationCategory::Introspection,
    },
    Relation {
        name: "mz_arrangement_sharing",
        category: RelationCategory::Introspection,
    },
    Relation {
        name: "mz_arrangement_sizes_per_worker",
        category: RelationCategory::Introspection,
    },
    Relation {
        name: "mz_dataflow_channels",
        category: RelationCategory::Introspection,
    },
    Relation {
        name: "mz_dataflow_operators",
        category: RelationCategory::Introspection,
    },
    Relation {
        name: "mz_dataflow_global_ids",
        category: RelationCategory::Introspection,
    },
    Relation {
        name: "mz_dataflow_operator_dataflows_per_worker",
        category: RelationCategory::Introspection,
    },
    Relation {
        name: "mz_dataflow_operator_dataflows",
        category: RelationCategory::Introspection,
    },
    Relation {
        name: "mz_dataflow_operator_parents_per_worker",
        category: RelationCategory::Introspection,
    },
    Relation {
        name: "mz_dataflow_operator_parents",
        category: RelationCategory::Introspection,
    },
    Relation {
        name: "mz_compute_exports",
        category: RelationCategory::Introspection,
    },
    Relation {
        name: "mz_dataflow_arrangement_sizes",
        category: RelationCategory::Introspection,
    },
    Relation {
        name: "mz_expected_group_size_advice",
        category: RelationCategory::Introspection,
    },
    Relation {
        name: "mz_compute_frontiers",
        category: RelationCategory::Introspection,
    },
    Relation {
        name: "mz_dataflow_channel_operators_per_worker",
        category: RelationCategory::Introspection,
    },
    Relation {
        name: "mz_dataflow_channel_operators",
        category: RelationCategory::Introspection,
    },
    Relation {
        name: "mz_compute_import_frontiers",
        category: RelationCategory::Introspection,
    },
    Relation {
        name: "mz_message_counts_per_worker",
        category: RelationCategory::Introspection,
    },
    Relation {
        name: "mz_message_counts",
        category: RelationCategory::Introspection,
    },
    Relation {
        name: "mz_active_peeks",
        category: RelationCategory::Introspection,
    },
    Relation {
        name: "mz_compute_operator_durations_histogram_per_worker",
        category: RelationCategory::Introspection,
    },
    Relation {
        name: "mz_compute_operator_durations_histogram",
        category: RelationCategory::Introspection,
    },
    Relation {
        name: "mz_records_per_dataflow_operator_per_worker",
        category: RelationCategory::Introspection,
    },
    Relation {
        name: "mz_records_per_dataflow_operator",
        category: RelationCategory::Introspection,
    },
    Relation {
        name: "mz_records_per_dataflow_per_worker",
        category: RelationCategory::Introspection,
    },
    Relation {
        name: "mz_records_per_dataflow",
        category: RelationCategory::Introspection,
    },
    Relation {
        name: "mz_peek_durations_histogram_per_worker",
        category: RelationCategory::Introspection,
    },
    Relation {
        name: "mz_peek_durations_histogram",
        category: RelationCategory::Introspection,
    },
    Relation {
        name: "mz_dataflow_shutdown_durations_histogram_per_worker",
        category: RelationCategory::Introspection,
    },
    Relation {
        name: "mz_dataflow_shutdown_durations_histogram",
        category: RelationCategory::Introspection,
    },
    Relation {
        name: "mz_scheduling_elapsed_per_worker",
        category: RelationCategory::Introspection,
    },
    Relation {
        name: "mz_scheduling_elapsed",
        category: RelationCategory::Introspection,
    },
    Relation {
        name: "mz_scheduling_parks_histogram_per_worker",
        category: RelationCategory::Introspection,
    },
    Relation {
        name: "mz_scheduling_parks_histogram",
        category: RelationCategory::Introspection,
    },
    Relation {
        name: "mz_compute_lir_mapping_per_worker",
        category: RelationCategory::Introspection,
    },
    Relation {
        name: "mz_lir_mapping",
        category: RelationCategory::Introspection,
    },
    Relation {
        name: "mz_compute_error_counts",
        category: RelationCategory::Introspection,
    },
    Relation {
        name: "mz_compute_error_counts_per_worker",
        category: RelationCategory::Introspection,
    },
    // Relations that are redundant with some of the above, but
    // are used by the Console.
    Relation {
        name: "mz_cluster_replica_metrics_history",
        category: RelationCategory::Basic,
    },
    Relation {
        name: "mz_webhook_sources",
        category: RelationCategory::Basic,
    },
    Relation {
        name: "mz_cluster_replica_history",
        category: RelationCategory::Basic,
    },
    Relation {
        name: "mz_source_statistics",
        category: RelationCategory::Basic,
    },
    Relation {
        name: "mz_cluster_deployment_lineage",
        category: RelationCategory::Basic,
    },
    Relation {
        name: "mz_show_indexes",
        category: RelationCategory::Basic,
    },
    Relation {
        name: "mz_relations",
        category: RelationCategory::Basic,
    },
    Relation {
        name: "mz_frontiers",
        category: RelationCategory::Basic,
    },
    Relation {
        name: "mz_console_cluster_utilization_overview",
        category: RelationCategory::Basic,
    },
    Relation {
        name: "mz_columns",
        category: RelationCategory::Basic,
    },
    Relation {
        name: "mz_kafka_sources",
        category: RelationCategory::Basic,
    },
    Relation {
        name: "mz_kafka_sinks",
        category: RelationCategory::Basic,
    },
];
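
// How each category is dumped (see `query_relation` below): `Basic` relations
// are streamed out with `COPY (SELECT * FROM ...)` into `<relation>.csv`;
// `Retained` relations get a SUBSCRIBE snapshot written to
// `<relation>_subscribe.csv`; `Introspection` relations are dumped once per
// cluster replica, into that replica's subdirectory.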

static PG_CONNECTION_TIMEOUT: Duration = Duration::from_secs(30);
/// Timeout for a single query.
// TODO (debug_tool3): Make this configurable.
static PG_QUERY_TIMEOUT: Duration = Duration::from_secs(20);

/// The maximum number of errors we tolerate for a cluster replica.
/// If a cluster replica produces more than this many errors, we skip it.
static MAX_CLUSTER_REPLICA_ERROR_COUNT: usize = 3;

static SET_SEARCH_PATH_QUERY: &str = "SET search_path = mz_internal, mz_catalog, mz_introspection";
static SELECT_CLUSTER_REPLICAS_QUERY: &str = "SELECT c.name as cluster_name, cr.name as replica_name FROM mz_clusters AS c JOIN mz_cluster_replicas AS cr ON c.id = cr.cluster_id;";

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ClusterReplica {
    pub cluster_name: String,
    pub replica_name: String,
}

impl Default for ClusterReplica {
    fn default() -> Self {
        Self {
            cluster_name: "mz_catalog_server".to_string(),
            replica_name: "r1".to_string(),
        }
    }
}

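// `ClusterReplica` displays as `<cluster>.<replica>`; e.g. the default value
// above renders as `mz_catalog_server.r1`. This is the form used in the log
// messages below.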
impl fmt::Display for ClusterReplica {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}.{}", self.cluster_name, self.replica_name)
    }
}

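/// Dumps system catalog relations to CSV files over a single pgwire connection.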
pub struct SystemCatalogDumper<'n> {
    context: &'n Context,
    pg_client: Arc<Mutex<PgClient>>,
    pg_tls: MakeTlsConnector,
    cluster_replicas: Vec<ClusterReplica>,
    _pg_conn_handle: JoinHandle<Result<(), tokio_postgres::Error>>,
}

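/// Connects to the target environment, retrying for up to
/// [`PG_CONNECTION_TIMEOUT`]. Returns the client, the underlying connection
/// (which the caller must spawn onto a task to drive the actual I/O), and the
/// TLS connector, which is kept around so in-flight queries can be cancelled
/// later.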
pub async fn create_postgres_connection(
    connection_string: &str,
) -> Result<
    (
        PgClient,
        Connection<Socket, TlsStream<Socket>>,
        MakeTlsConnector,
    ),
    anyhow::Error,
> {
    let mut pg_config = PgConfig::from_str(connection_string)?;
    pg_config.connect_timeout(PG_CONNECTION_TIMEOUT);
    let tls = make_tls(&pg_config)?;
    info!(
        "Connecting to PostgreSQL server at {}...",
        connection_string
    );
    let (pg_client, pg_conn) = retry::Retry::default()
        .max_duration(PG_CONNECTION_TIMEOUT)
        .retry_async_canceling(|_| {
            let pg_config = pg_config.clone();
            let tls = tls.clone();
            async move { pg_config.connect(tls).await.map_err(|e| anyhow::anyhow!(e)) }
        })
        .await?;

    Ok((pg_client, pg_conn, tls))
}

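/// Streams the output of `copy_query` (a `COPY ... TO STDOUT` statement) into
/// `file`, syncing the file to disk before returning.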
pub async fn write_copy_stream(
    transaction: &Transaction<'_>,
    copy_query: &str,
    file: &mut tokio::fs::File,
    relation_name: &str,
) -> Result<(), anyhow::Error> {
    let copy_stream = transaction
        .copy_out(copy_query)
        .await
        .context(format!("Failed to COPY TO for {}", relation_name))?
        .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e));
    let copy_stream = std::pin::pin!(copy_stream);
    let mut reader = StreamReader::new(copy_stream);
    tokio::io::copy(&mut reader, file).await?;
    // Ensure the file is flushed to disk.
    file.sync_all().await?;

    Ok::<(), anyhow::Error>(())
}

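/// Dumps a single relation to `file_path_name` as CSV: retained relations are
/// snapshotted through a `SUBSCRIBE` cursor, all others through
/// `COPY ... TO STDOUT`.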
pub async fn copy_relation_to_csv(
    transaction: &Transaction<'_>,
    file_path_name: PathBuf,
    column_names: &Vec<String>,
    relation: &Relation,
) -> Result<(), anyhow::Error> {
    let mut file = tokio::fs::File::create(&file_path_name).await?;

    match relation.category {
        RelationCategory::Retained => {
            let mut writer = AsyncSerializer::from_writer(file);
            writer.serialize(column_names).await?;

            transaction
                .execute(
                    &format!("DECLARE c CURSOR FOR SUBSCRIBE TO {}", relation.name),
                    &[],
                )
                .await
                .context("Failed to declare cursor")?;

            // We use simple_query here: for a prepared query, tokio-postgres
            // would issue an introspection SELECT to determine the result
            // types, which errors because SUBSCRIBE and SELECT queries cannot
            // be executed in the same transaction.
            let rows = transaction
                // A timeout of '1' returns the snapshot of the current state;
                // a timeout of '0' would return no results. We don't mind
                // receiving a few updates beyond the snapshot.
                .simple_query("FETCH ALL FROM c WITH (TIMEOUT '1')")
                .await
                .context("Failed to fetch all from cursor")?;

            for row in rows {
                if let SimpleQueryMessage::Row(row) = row {
                    let values: Vec<&str> = (0..row.len())
                        .map(|i| row.get(i).unwrap_or("")) // Serialize missing fields as empty strings.
                        .collect();
                    writer.serialize(&values).await?;
                }
            }
        }
        _ => {
            // TODO (SangJunBak): Use `WITH (HEADER TRUE)` once database-issues#2846 is implemented.
            file.write_all((column_names.join(",") + "\n").as_bytes())
                .await?;
            let copy_query = format!(
                "COPY (SELECT * FROM {}) TO STDOUT WITH (FORMAT CSV)",
                relation.name
            );
            write_copy_stream(transaction, &copy_query, &mut file, relation.name).await?;
        }
    };

    info!("Copied {} to {}", relation.name, file_path_name.display());
    Ok::<(), anyhow::Error>(())
}

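/// Returns the column names of `relation`, used to write the CSV header row.
/// For retained relations, `mz_timestamp` and `mz_diff` are prepended to match
/// the shape of SUBSCRIBE output.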
pub async fn query_column_names(
    pg_client: &PgClient,
    relation: &Relation,
) -> Result<Vec<String>, anyhow::Error> {
    let relation_name = relation.name;
    // We query the column names to write the header row of the CSV file.
    let mut column_names = pg_client
        .query(&format!("SHOW COLUMNS FROM {}", relation_name), &[])
        .await
        .context(format!("Failed to get column names for {}", relation_name))?
        .into_iter()
        .filter_map(|row| row.try_get::<_, String>("name").ok())
        .collect::<Vec<_>>();

    if matches!(relation.category, RelationCategory::Retained) {
        column_names.splice(0..0, ["mz_timestamp".to_string(), "mz_diff".to_string()]);
    }

    Ok(column_names)
}

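/// Dumps `relation` into the dump directory for `start_time`, first pinning
/// the session to `cluster_replica` when one is given (required for
/// introspection relations).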
pub async fn query_relation(
    transaction: &Transaction<'_>,
    start_time: DateTime<Utc>,
    relation: &Relation,
    column_names: &Vec<String>,
    cluster_replica: Option<&ClusterReplica>,
) -> Result<(), anyhow::Error> {
    let relation_name = relation.name;
    let relation_category = &relation.category;

    // Some queries (e.g. those on mz_introspection relations) require the cluster and replica to be set.
    if let Some(cluster_replica) = &cluster_replica {
        transaction
            .execute(
                &format!("SET LOCAL CLUSTER = '{}'", cluster_replica.cluster_name),
                &[],
            )
            .await
            .context(format!(
                "Failed to set cluster to {}",
                cluster_replica.cluster_name
            ))?;
        transaction
            .execute(
                &format!(
                    "SET LOCAL CLUSTER_REPLICA = '{}'",
                    cluster_replica.replica_name
                ),
                &[],
            )
            .await
            .context(format!(
                "Failed to set cluster replica to {}",
                cluster_replica.replica_name
            ))?;
    }

    match relation_category {
        RelationCategory::Basic => {
            let file_path = format_file_path(start_time, None);
            let file_path_name = file_path.join(relation_name).with_extension("csv");
            tokio::fs::create_dir_all(&file_path).await?;

            copy_relation_to_csv(transaction, file_path_name, column_names, relation).await?;
        }
        RelationCategory::Introspection => {
            let file_path = format_file_path(start_time, cluster_replica);
            tokio::fs::create_dir_all(&file_path).await?;

            let file_path_name = file_path.join(relation_name).with_extension("csv");

            copy_relation_to_csv(transaction, file_path_name, column_names, relation).await?;
        }
        RelationCategory::Retained => {
            // Copy the current state and the retained SUBSCRIBE state.
            let file_path = format_file_path(start_time, None);
            let file_path_name = file_path
                .join(format!("{}_subscribe", relation_name))
                .with_extension("csv");
            tokio::fs::create_dir_all(&file_path).await?;

            copy_relation_to_csv(transaction, file_path_name, column_names, relation).await?;
        }
    }
    Ok::<(), anyhow::Error>(())
}

impl<'n> SystemCatalogDumper<'n> {
    pub async fn new(context: &'n Context, connection_string: &str) -> Result<Self, anyhow::Error> {
        let (pg_client, pg_conn, pg_tls) = create_postgres_connection(connection_string).await?;

        info!("Connected to PostgreSQL server at {}...", connection_string);

        let handle = task::spawn(|| "postgres-connection", pg_conn);

        // Set the search path to the system catalog schemas.
        pg_client
            .execute(SET_SEARCH_PATH_QUERY, &[])
            .await
            .context("Failed to set search path")?;

        // We need the full list of cluster replicas to dump introspection relations.
        let cluster_replicas = match pg_client.query(SELECT_CLUSTER_REPLICAS_QUERY, &[]).await {
            Ok(rows) => rows
                .into_iter()
                .filter_map(|row| {
                    let cluster_name = row.try_get::<_, String>("cluster_name");
                    let replica_name = row.try_get::<_, String>("replica_name");

                    if let (Ok(cluster_name), Ok(replica_name)) = (cluster_name, replica_name) {
                        Some(ClusterReplica {
                            cluster_name,
                            replica_name,
                        })
                    } else {
                        None
                    }
                })
                .collect::<Vec<_>>(),
            Err(e) => {
                error!("Failed to get replica names: {}", e);
                vec![]
            }
        };

        Ok(Self {
            context,
            pg_client: Arc::new(Mutex::new(pg_client)),
            pg_tls,
            cluster_replicas,
            _pg_conn_handle: handle,
        })
    }

    pub async fn dump_relation(
        &self,
        relation: &Relation,
        cluster_replica: Option<&ClusterReplica>,
    ) -> Result<(), anyhow::Error> {
        info!(
            "Copying relation {}{}{}",
            relation.name,
            match relation.category {
                RelationCategory::Retained => " (subscribe history)",
                _ => "",
            },
            cluster_replica.map_or_else(|| "".to_string(), |replica| format!(" in {}", replica))
        );

        let start_time = self.context.start_time;
        let pg_client = &self.pg_client;

        let relation_name = relation.name.to_string();

        if let Err(err) = retry::Retry::default()
            .max_duration(PG_QUERY_TIMEOUT)
            .initial_backoff(Duration::from_secs(2))
            .retry_async_canceling(|_| {
                let start_time = start_time.clone();
                let relation_name = relation.name;
                let cluster_replica = cluster_replica.clone();

                async move {
                    // TODO (debug_tool3): Use a transaction for the entire dump instead of per query.
                    let mut pg_client = pg_client.lock().await;

                    match async {
                        // We cannot query the column names inside the transaction because SUBSCRIBE
                        // queries cannot be executed in the same transaction as SELECT and SHOW queries.
                        let column_names = query_column_names(&pg_client, relation).await?;

                        let transaction = pg_client.transaction().await?;
                        query_relation(
                            &transaction,
                            start_time,
                            relation,
                            &column_names,
                            cluster_replica,
                        )
                        .await?;

                        Ok::<(), anyhow::Error>(())
                    }
                    .await
                    {
                        Ok(()) => Ok(()),
                        Err(err) => {
                            error!(
                                "{}: {:#}. Retrying...",
                                format_catalog_dump_error_message(relation_name, cluster_replica),
                                err
                            );
                            Err(err)
                        }
                    }
                }
            })
            .await
        {
            // The retry timed out or was cancelled; cancel the in-flight query
            // so it doesn't keep running on the server.
            let pg_client_lock = pg_client.lock().await;

            let cancel_token = pg_client_lock.cancel_token();

            if cancel_token
                .cancel_query(self.pg_tls.clone())
                .await
                .is_err()
            {
                error!(
                    "Failed to cancel query for {}{}",
                    relation_name,
                    cluster_replica
                        .map_or_else(|| "".to_string(), |replica| format!(" in {}", replica))
                );
            }

            return Err(err);
        }

        Ok(())
    }

    pub async fn dump_all_relations(&self) {
        let cluster_replicas = &self.cluster_replicas;

        // Create a map counting errors per cluster replica.
        let mut cluster_replica_error_counts: HashMap<ClusterReplica, usize> = HashMap::new();
        for replica in cluster_replicas {
            cluster_replica_error_counts
                .entry(replica.clone())
                .or_insert(0);
        }

        let non_introspection_iter = RELATIONS
            .iter()
            .filter(|relation| {
                matches!(
                    relation.category,
                    RelationCategory::Basic | RelationCategory::Retained
                )
            })
            .map(|relation| (relation, None::<&ClusterReplica>));

        let introspection_iter = RELATIONS
            .iter()
            .filter(|relation| matches!(relation.category, RelationCategory::Introspection))
            .collect::<Vec<_>>();

        let introspection_iter = cluster_replicas.iter().flat_map(|replica| {
            introspection_iter
                .iter()
                .map(move |relation| (*relation, Some(replica)))
        });

        // Combine and iterate over all relation/replica pairs.
        for (relation, replica) in non_introspection_iter.chain(introspection_iter) {
            let replica_key = if let Some(replica) = replica {
                replica
            } else {
                // If no replica is given, we attribute the query to `mz_catalog_server`.
                &ClusterReplica::default()
            };

            // If the cluster replica has produced more than `MAX_CLUSTER_REPLICA_ERROR_COUNT`
            // errors, we skip it: we assume it is unresponsive and don't want to hold up the
            // queries that follow.
            if cluster_replica_error_counts
                .get(replica_key)
                .copied()
                .unwrap_or(0)
                >= MAX_CLUSTER_REPLICA_ERROR_COUNT
            {
                info!(
                    "Skipping {}{}",
                    relation.name,
                    replica.map_or_else(|| "".to_string(), |replica| format!(" for {}", replica))
                );
                continue;
            }

            if let Err(err) = self.dump_relation(relation, replica).await {
                error!(
                    "{}: {:#}.",
                    format_catalog_dump_error_message(relation.name, replica),
                    err,
                );

                if err.to_string().contains("deadline has elapsed") {
                    let docs_link = if replica.is_none()
                        || replica.map_or(false, |r| r.cluster_name == "mz_catalog_server")
                    {
                        "https://materialize.com/docs/self-managed/v25.1/installation/troubleshooting/#troubleshooting-console-unresponsiveness"
                    } else {
                        "https://materialize.com/docs/sql/alter-cluster/#resizing-1"
                    };
                    error!("Consider increasing the size of the cluster: {}", docs_link);
                }

                let is_missing_catalog_item_err = match err.downcast_ref::<tokio_postgres::Error>()
                {
                    Some(pg_err) => pg_err
                        .to_string()
                        .to_lowercase()
                        .contains("unknown catalog item"),
                    None => false,
                };

                // If the error is due to a missing catalog item, we don't count it as an error:
                // mz-debug is meant to be backwards compatible with older versions of
                // Materialize, so we expect some catalog items to be missing.
                if !is_missing_catalog_item_err {
                    cluster_replica_error_counts
                        .entry(replica_key.clone())
                        .and_modify(|count| *count += 1)
                        .or_insert(1);
                }
            }
        }
    }
}

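/// Formats the error message for a failed relation dump, e.g.
/// `Failed to dump relation mz_active_peeks for mz_catalog_server.r1`.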
fn format_catalog_dump_error_message(
    relation_name: &str,
    cluster_replica: Option<&ClusterReplica>,
) -> String {
    format!(
        "Failed to dump relation {}{}",
        relation_name,
        cluster_replica.map_or_else(|| "".to_string(), |replica| format!(" for {}", replica))
    )
}

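/// Returns the dump directory for `date_time`: `<base>/system-catalog`, or
/// `<base>/system-catalog/<cluster>/<replica>` when a cluster replica is
/// given.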
fn format_file_path(date_time: DateTime<Utc>, cluster_replica: Option<&ClusterReplica>) -> PathBuf {
    let path = format_base_path(date_time).join("system-catalog");
    if let Some(cluster_replica) = cluster_replica {
        path.join(cluster_replica.cluster_name.as_str())
            .join(cluster_replica.replica_name.as_str())
    } else {
        path
    }
}