// mz_debug/src/system_catalog_dumper.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License in the LICENSE file at the
// root of this repository, or online at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Dumps catalog information to files.
//! We run queries in serial rather than parallel because all queries in the pgwire
//! connection are run in serial anyways. Running the queries in serial also makes
//! cleaning up / aborting queries much easier.
21use anyhow::{Context as _, Result};
22use csv_async::AsyncSerializer;
23use futures::TryStreamExt;
24use mz_tls_util::make_tls;
25use std::fmt;
26use std::path::PathBuf;
27use std::str::FromStr;
28use std::sync::Arc;
29use std::time::Duration;
30use tokio::io::AsyncWriteExt;
31use tokio::sync::Mutex;
32use tokio_postgres::{
33    Client as PgClient, Config as PgConfig, Connection, SimpleQueryMessage, Socket, Transaction,
34};
35use tokio_util::io::StreamReader;
36
37use mz_ore::collections::HashMap;
38use mz_ore::retry::{self};
39use mz_ore::task::{self, JoinHandle};
40use postgres_openssl::{MakeTlsConnector, TlsStream};
41use tracing::{info, warn};
42
/// How a relation should be read and where its dump should be written.
#[derive(Debug, Clone)]
pub enum RelationCategory {
    /// For relations that belong in the `mz_introspection` schema.
    /// These relations require a replica name to be specified.
    Introspection,
    /// For relations that are retained metric objects that we'd like to get the SUBSCRIBE output for.
    Retained,
    /// Other relations that we want to do a SELECT * FROM on.
    Basic,
}
53
/// A system catalog relation to dump, paired with the dump strategy
/// ([`RelationCategory`]) used to read it.
#[derive(Debug, Clone)]
pub struct Relation {
    // Unqualified name; resolved via the search path set by `SET_SEARCH_PATH_QUERY`.
    pub name: &'static str,
    // Determines the query used (SELECT vs. SUBSCRIBE) and the output location.
    pub category: RelationCategory,
}
59
60static SYSTEM_CATALOG_DUMP_DIR: &str = "system_catalog";
61/// This list is used to determine which relations to dump.
62/// The relations are grouped and delimited by their category (i.e. Basic object information)
63static RELATIONS: &[Relation] = &[
64    // Basic object information
65    Relation {
66        name: "mz_audit_events",
67        category: RelationCategory::Basic,
68    },
69    Relation {
70        name: "mz_databases",
71        category: RelationCategory::Basic,
72    },
73    Relation {
74        name: "mz_schemas",
75        category: RelationCategory::Basic,
76    },
77    Relation {
78        name: "mz_tables",
79        category: RelationCategory::Basic,
80    },
81    Relation {
82        name: "mz_sources",
83        category: RelationCategory::Basic,
84    },
85    Relation {
86        name: "mz_sinks",
87        category: RelationCategory::Basic,
88    },
89    Relation {
90        name: "mz_views",
91        category: RelationCategory::Basic,
92    },
93    Relation {
94        name: "mz_materialized_views",
95        category: RelationCategory::Basic,
96    },
97    Relation {
98        name: "mz_secrets",
99        category: RelationCategory::Basic,
100    },
101    Relation {
102        name: "mz_connections",
103        category: RelationCategory::Basic,
104    },
105    Relation {
106        name: "mz_roles",
107        category: RelationCategory::Basic,
108    },
109    Relation {
110        name: "mz_subscriptions",
111        category: RelationCategory::Basic,
112    },
113    Relation {
114        name: "mz_object_fully_qualified_names",
115        category: RelationCategory::Basic,
116    },
117    Relation {
118        name: "mz_sessions",
119        category: RelationCategory::Basic,
120    },
121    Relation {
122        name: "mz_object_history",
123        category: RelationCategory::Basic,
124    },
125    Relation {
126        name: "mz_object_lifetimes",
127        category: RelationCategory::Basic,
128    },
129    Relation {
130        name: "mz_object_dependencies",
131        category: RelationCategory::Basic,
132    },
133    Relation {
134        name: "mz_object_transitive_dependencies",
135        category: RelationCategory::Basic,
136    },
137    // Compute
138    Relation {
139        name: "mz_clusters",
140        category: RelationCategory::Basic,
141    },
142    Relation {
143        name: "mz_indexes",
144        category: RelationCategory::Basic,
145    },
146    Relation {
147        name: "mz_cluster_replicas",
148        category: RelationCategory::Basic,
149    },
150    Relation {
151        name: "mz_cluster_replica_sizes",
152        category: RelationCategory::Basic,
153    },
154    Relation {
155        name: "mz_cluster_replica_statuses",
156        category: RelationCategory::Basic,
157    },
158    Relation {
159        name: "mz_cluster_replica_metrics_history",
160        category: RelationCategory::Basic,
161    },
162    Relation {
163        name: "mz_compute_hydration_times",
164        category: RelationCategory::Retained,
165    },
166    Relation {
167        name: "mz_materialization_dependencies",
168        category: RelationCategory::Basic,
169    },
170    Relation {
171        name: "mz_cluster_replica_status_history",
172        category: RelationCategory::Basic,
173    },
174    // Freshness
175    Relation {
176        name: "mz_wallclock_global_lag_recent_history",
177        category: RelationCategory::Basic,
178    },
179    Relation {
180        name: "mz_global_frontiers",
181        category: RelationCategory::Basic,
182    },
183    Relation {
184        name: "mz_cluster_replica_frontiers",
185        category: RelationCategory::Basic,
186    },
187    Relation {
188        name: "mz_materialization_lag",
189        category: RelationCategory::Basic,
190    },
191    // Sources/sinks
192    Relation {
193        name: "mz_source_statistics_with_history",
194        category: RelationCategory::Basic,
195    },
196    Relation {
197        name: "mz_source_statistics_with_history",
198        category: RelationCategory::Retained,
199    },
200    Relation {
201        name: "mz_sink_statistics",
202        category: RelationCategory::Basic,
203    },
204    Relation {
205        name: "mz_sink_statistics",
206        category: RelationCategory::Retained,
207    },
208    Relation {
209        name: "mz_source_statuses",
210        category: RelationCategory::Basic,
211    },
212    Relation {
213        name: "mz_sink_statuses",
214        category: RelationCategory::Basic,
215    },
216    Relation {
217        name: "mz_source_status_history",
218        category: RelationCategory::Basic,
219    },
220    Relation {
221        name: "mz_sink_status_history",
222        category: RelationCategory::Basic,
223    },
224    // Refresh every information
225    Relation {
226        name: "mz_materialized_view_refresh_strategies",
227        category: RelationCategory::Basic,
228    },
229    Relation {
230        name: "mz_cluster_schedules",
231        category: RelationCategory::Basic,
232    },
233    // Persist
234    Relation {
235        name: "mz_recent_storage_usage",
236        category: RelationCategory::Basic,
237    },
238    // Introspection relations
239    Relation {
240        name: "mz_arrangement_sharing_per_worker",
241        category: RelationCategory::Introspection,
242    },
243    Relation {
244        name: "mz_arrangement_sharing",
245        category: RelationCategory::Introspection,
246    },
247    Relation {
248        name: "mz_arrangement_sizes_per_worker",
249        category: RelationCategory::Introspection,
250    },
251    Relation {
252        name: "mz_dataflow_channels",
253        category: RelationCategory::Introspection,
254    },
255    Relation {
256        name: "mz_dataflow_operators",
257        category: RelationCategory::Introspection,
258    },
259    Relation {
260        name: "mz_dataflow_global_ids",
261        category: RelationCategory::Introspection,
262    },
263    Relation {
264        name: "mz_dataflow_operator_dataflows_per_worker",
265        category: RelationCategory::Introspection,
266    },
267    Relation {
268        name: "mz_dataflow_operator_dataflows",
269        category: RelationCategory::Introspection,
270    },
271    Relation {
272        name: "mz_dataflow_operator_parents_per_worker",
273        category: RelationCategory::Introspection,
274    },
275    Relation {
276        name: "mz_dataflow_operator_parents",
277        category: RelationCategory::Introspection,
278    },
279    Relation {
280        name: "mz_compute_exports",
281        category: RelationCategory::Introspection,
282    },
283    Relation {
284        name: "mz_dataflow_arrangement_sizes",
285        category: RelationCategory::Introspection,
286    },
287    Relation {
288        name: "mz_expected_group_size_advice",
289        category: RelationCategory::Introspection,
290    },
291    Relation {
292        name: "mz_compute_frontiers",
293        category: RelationCategory::Introspection,
294    },
295    Relation {
296        name: "mz_dataflow_channel_operators_per_worker",
297        category: RelationCategory::Introspection,
298    },
299    Relation {
300        name: "mz_dataflow_channel_operators",
301        category: RelationCategory::Introspection,
302    },
303    Relation {
304        name: "mz_compute_import_frontiers",
305        category: RelationCategory::Introspection,
306    },
307    Relation {
308        name: "mz_message_counts_per_worker",
309        category: RelationCategory::Introspection,
310    },
311    Relation {
312        name: "mz_message_counts",
313        category: RelationCategory::Introspection,
314    },
315    Relation {
316        name: "mz_active_peeks",
317        category: RelationCategory::Introspection,
318    },
319    Relation {
320        name: "mz_compute_operator_durations_histogram_per_worker",
321        category: RelationCategory::Introspection,
322    },
323    Relation {
324        name: "mz_compute_operator_durations_histogram",
325        category: RelationCategory::Introspection,
326    },
327    Relation {
328        name: "mz_records_per_dataflow_operator_per_worker",
329        category: RelationCategory::Introspection,
330    },
331    Relation {
332        name: "mz_records_per_dataflow_operator",
333        category: RelationCategory::Introspection,
334    },
335    Relation {
336        name: "mz_records_per_dataflow_per_worker",
337        category: RelationCategory::Introspection,
338    },
339    Relation {
340        name: "mz_records_per_dataflow",
341        category: RelationCategory::Introspection,
342    },
343    Relation {
344        name: "mz_peek_durations_histogram_per_worker",
345        category: RelationCategory::Introspection,
346    },
347    Relation {
348        name: "mz_peek_durations_histogram",
349        category: RelationCategory::Introspection,
350    },
351    Relation {
352        name: "mz_dataflow_shutdown_durations_histogram_per_worker",
353        category: RelationCategory::Introspection,
354    },
355    Relation {
356        name: "mz_dataflow_shutdown_durations_histogram",
357        category: RelationCategory::Introspection,
358    },
359    Relation {
360        name: "mz_scheduling_elapsed_per_worker",
361        category: RelationCategory::Introspection,
362    },
363    Relation {
364        name: "mz_scheduling_elapsed",
365        category: RelationCategory::Introspection,
366    },
367    Relation {
368        name: "mz_scheduling_parks_histogram_per_worker",
369        category: RelationCategory::Introspection,
370    },
371    Relation {
372        name: "mz_scheduling_parks_histogram",
373        category: RelationCategory::Introspection,
374    },
375    Relation {
376        name: "mz_compute_lir_mapping_per_worker",
377        category: RelationCategory::Introspection,
378    },
379    Relation {
380        name: "mz_lir_mapping",
381        category: RelationCategory::Introspection,
382    },
383    Relation {
384        name: "mz_compute_error_counts",
385        category: RelationCategory::Introspection,
386    },
387    Relation {
388        name: "mz_compute_error_counts_per_worker",
389        category: RelationCategory::Introspection,
390    },
391    // Relations that are redundant with some of the above, but
392    // are used by the Console.
393    Relation {
394        name: "mz_cluster_replica_metrics_history",
395        category: RelationCategory::Basic,
396    },
397    Relation {
398        name: "mz_webhook_sources",
399        category: RelationCategory::Basic,
400    },
401    Relation {
402        name: "mz_cluster_replica_history",
403        category: RelationCategory::Basic,
404    },
405    Relation {
406        name: "mz_source_statistics",
407        category: RelationCategory::Basic,
408    },
409    Relation {
410        name: "mz_cluster_deployment_lineage",
411        category: RelationCategory::Basic,
412    },
413    Relation {
414        name: "mz_show_indexes",
415        category: RelationCategory::Basic,
416    },
417    Relation {
418        name: "mz_relations",
419        category: RelationCategory::Basic,
420    },
421    Relation {
422        name: "mz_frontiers",
423        category: RelationCategory::Basic,
424    },
425    Relation {
426        name: "mz_console_cluster_utilization_overview",
427        category: RelationCategory::Basic,
428    },
429    Relation {
430        name: "mz_columns",
431        category: RelationCategory::Basic,
432    },
433    Relation {
434        name: "mz_kafka_sources",
435        category: RelationCategory::Basic,
436    },
437    Relation {
438        name: "mz_kafka_sinks",
439        category: RelationCategory::Basic,
440    },
441];
442
/// Timeout for establishing (and retrying) the pgwire connection.
static PG_CONNECTION_TIMEOUT: Duration = Duration::from_secs(30);
/// Timeout for a query.
// TODO (debug_tool3): Make this configurable.
static PG_QUERY_TIMEOUT: Duration = Duration::from_secs(20);

/// The maximum number of errors we tolerate for a cluster replica.
/// If a cluster replica has more than this many errors, we skip it.
static MAX_CLUSTER_REPLICA_ERROR_COUNT: usize = 3;

// Puts the system catalog schemas on the search path so relations in
// `RELATIONS` can be referenced without schema qualification.
static SET_SEARCH_PATH_QUERY: &str = "SET search_path = mz_internal, mz_catalog, mz_introspection";
// Lists every (cluster, replica) pair; introspection relations are dumped once
// per replica returned by this query.
static SELECT_CLUSTER_REPLICAS_QUERY: &str = "SELECT c.name as cluster_name, cr.name as replica_name FROM mz_clusters AS c JOIN mz_cluster_replicas AS cr ON c.id = cr.cluster_id;";
454
/// Identifies a single replica of a cluster by (cluster name, replica name).
/// Hashable so it can key the per-replica error-count map.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ClusterReplica {
    pub cluster_name: String,
    pub replica_name: String,
}
460
461impl Default for ClusterReplica {
462    fn default() -> Self {
463        Self {
464            cluster_name: "mz_catalog_server".to_string(),
465            replica_name: "r1".to_string(),
466        }
467    }
468}
469
470impl fmt::Display for ClusterReplica {
471    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
472        write!(f, "{}.{}", self.cluster_name, self.replica_name)
473    }
474}
475
476pub struct SystemCatalogDumper {
477    base_path: PathBuf,
478    pg_client: Arc<Mutex<PgClient>>,
479    pg_tls: MakeTlsConnector,
480    cluster_replicas: Vec<ClusterReplica>,
481    _pg_conn_handle: JoinHandle<Result<(), tokio_postgres::Error>>,
482}
483
484pub async fn create_postgres_connection(
485    connection_string: &str,
486) -> Result<
487    (
488        PgClient,
489        Connection<Socket, TlsStream<Socket>>,
490        MakeTlsConnector,
491    ),
492    anyhow::Error,
493> {
494    let mut pg_config = PgConfig::from_str(connection_string)?;
495    pg_config.connect_timeout(PG_CONNECTION_TIMEOUT);
496    let tls = make_tls(&pg_config)?;
497
498    let host_addr = pg_config.get_hosts().first();
499    let port = pg_config.get_ports().first();
500
501    // The original connection string can contain the username and password, so we only print the host and port.
502    let redacted_connection_string = if let (Some(host_addr), Some(port)) = (host_addr, port) {
503        format!(" at {:?} on port {}", host_addr, port)
504    } else {
505        "".to_string()
506    };
507
508    info!(
509        "Connecting to PostgreSQL server{}",
510        redacted_connection_string
511    );
512
513    let (pg_client, pg_conn) = retry::Retry::default()
514        .max_duration(PG_CONNECTION_TIMEOUT)
515        .retry_async_canceling(|_| {
516            let pg_config = pg_config.clone();
517            let tls = tls.clone();
518            async move { pg_config.connect(tls).await.map_err(|e| anyhow::anyhow!(e)) }
519        })
520        .await?;
521
522    info!(
523        "Connected to PostgreSQL server{}",
524        redacted_connection_string
525    );
526
527    Ok((pg_client, pg_conn, tls))
528}
529
530pub async fn write_copy_stream(
531    transaction: &Transaction<'_>,
532    copy_query: &str,
533    file: &mut tokio::fs::File,
534    relation_name: &str,
535) -> Result<(), anyhow::Error> {
536    let copy_stream = transaction
537        .copy_out(copy_query)
538        .await
539        .context(format!("Failed to COPY TO for {}", relation_name))?
540        .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e));
541    let copy_stream = std::pin::pin!(copy_stream);
542    let mut reader = StreamReader::new(copy_stream);
543    tokio::io::copy(&mut reader, file).await?;
544    // Ensure the file is flushed to disk.
545    file.sync_all().await?;
546
547    Ok::<(), anyhow::Error>(())
548}
549
/// Dumps a single relation into `file_path_name` as CSV, with a header row
/// built from `column_names`.
///
/// `Retained` relations are read through a `SUBSCRIBE` cursor so the dump
/// includes the retained history (with `mz_timestamp`/`mz_diff` columns);
/// all other categories are streamed out with `COPY (SELECT * ...)`.
pub async fn copy_relation_to_csv(
    transaction: &Transaction<'_>,
    file_path_name: PathBuf,
    column_names: &Vec<String>,
    relation: &Relation,
) -> Result<(), anyhow::Error> {
    let mut file = tokio::fs::File::create(&file_path_name).await?;

    match relation.category {
        RelationCategory::Retained => {
            // csv-async handles quoting/escaping of the SUBSCRIBE rows.
            let mut writer = AsyncSerializer::from_writer(file);
            writer.serialize(column_names).await?;

            transaction
                .execute(
                    &format!("DECLARE c CURSOR FOR SUBSCRIBE TO {}", relation.name),
                    &[],
                )
                .await
                .context("Failed to declare cursor")?;

            // We need to use simple_query, otherwise tokio-postgres will run an introspection SELECT query to figure out the types since it'll
            // try to prepare the query. This causes an error since SUBSCRIBEs and SELECT queries are not allowed to be executed in the same transaction.
            // Thus we use simple_query to avoid the introspection query.
            let rows = transaction
                // We use a timeout of '1' to receive the snapshot of the current state. A timeout of '0' will return no results.
                // We also don't care if we get more than just the snapshot.
                .simple_query("FETCH ALL FROM c WITH (TIMEOUT '1')")
                .await
                .context("Failed to fetch all from cursor")?;

            for row in rows {
                if let SimpleQueryMessage::Row(row) = row {
                    // NULL fields come back as None; serialize them as empty strings.
                    let values: Vec<&str> = (0..row.len())
                        .map(|i| row.get(i).unwrap_or("")) // Convert each field to String
                        .collect();
                    writer.serialize(&values).await?;
                }
            }
        }
        _ => {
            // TODO (SangJunBak): Use `WITH (HEADER TRUE)` once database-issues#2846 is implemented.
            file.write_all((column_names.join(",") + "\n").as_bytes())
                .await?;
            let copy_query = format!(
                "COPY (SELECT * FROM {}) TO STDOUT WITH (FORMAT CSV)",
                relation.name
            );
            write_copy_stream(transaction, &copy_query, &mut file, relation.name).await?;
        }
    };

    info!("Copied {} to {}", relation.name, file_path_name.display());
    Ok::<(), anyhow::Error>(())
}
605
606pub async fn query_column_names(
607    pg_client: &PgClient,
608    relation: &Relation,
609) -> Result<Vec<String>, anyhow::Error> {
610    let relation_name = relation.name;
611    // We query the column names to write the header row of the CSV file.
612    let mut column_names = pg_client
613        .query(&format!("SHOW COLUMNS FROM {}", &relation_name), &[])
614        .await
615        .context(format!("Failed to get column names for {}", relation_name))?
616        .into_iter()
617        .map(|row| match row.try_get::<_, String>("name") {
618            Ok(name) => Some(name),
619            Err(_) => None,
620        })
621        .filter_map(|row| row)
622        .collect::<Vec<_>>();
623
624    match relation.category {
625        RelationCategory::Retained => {
626            column_names.splice(0..0, ["mz_timestamp".to_string(), "mz_diff".to_string()]);
627        }
628        _ => (),
629    }
630
631    Ok(column_names)
632}
633
/// Dumps `relation` inside `transaction` as a CSV file under `base_path`.
///
/// If `cluster_replica` is provided, the transaction is pinned to that
/// cluster/replica with `SET LOCAL` before querying — required for
/// introspection relations, whose contents are per-replica.
///
/// Output locations by category:
/// - `Basic`: `<base>/system_catalog/<name>.csv`
/// - `Introspection`: `<base>/system_catalog/<cluster>/<replica>/<name>.csv`
/// - `Retained`: `<base>/system_catalog/<name>_subscribe.csv`
pub async fn query_relation(
    transaction: &Transaction<'_>,
    base_path: PathBuf,
    relation: &Relation,
    column_names: &Vec<String>,
    cluster_replica: Option<&ClusterReplica>,
) -> Result<(), anyhow::Error> {
    let relation_name = relation.name;
    let relation_category = &relation.category;

    // Some queries (i.e. mz_introspection relations) require the cluster and replica to be set.
    // NOTE(review): names are interpolated unquoted into SET LOCAL; assumed safe
    // because they come from the catalog itself, not user input.
    if let Some(cluster_replica) = &cluster_replica {
        transaction
            .execute(
                &format!("SET LOCAL CLUSTER = '{}'", cluster_replica.cluster_name),
                &[],
            )
            .await
            .context(format!(
                "Failed to set cluster to {}",
                cluster_replica.cluster_name
            ))?;
        transaction
            .execute(
                &format!(
                    "SET LOCAL CLUSTER_REPLICA = '{}'",
                    cluster_replica.replica_name
                ),
                &[],
            )
            .await
            .context(format!(
                "Failed to set cluster replica to {}",
                cluster_replica.replica_name
            ))?;
    }

    match relation_category {
        RelationCategory::Basic => {
            // Basic relations are replica-independent, so they go in the shared directory.
            let file_path = format_file_path(base_path, None);
            let file_path_name = file_path.join(relation_name).with_extension("csv");
            tokio::fs::create_dir_all(&file_path).await?;

            copy_relation_to_csv(transaction, file_path_name, column_names, relation).await?;
        }
        RelationCategory::Introspection => {
            // Per-replica output: the same relation is dumped once per replica.
            let file_path = format_file_path(base_path, cluster_replica);
            tokio::fs::create_dir_all(&file_path).await?;

            let file_path_name = file_path.join(relation_name).with_extension("csv");

            copy_relation_to_csv(transaction, file_path_name, column_names, relation).await?;
        }
        RelationCategory::Retained => {
            // Copy the current state and retained subscribe state
            let file_path = format_file_path(base_path, None);
            let file_path_name = file_path
                .join(format!("{}_subscribe", relation_name))
                .with_extension("csv");
            tokio::fs::create_dir_all(&file_path).await?;

            copy_relation_to_csv(transaction, file_path_name, column_names, relation).await?;
        }
    }
    Ok::<(), anyhow::Error>(())
}
700
impl SystemCatalogDumper {
    /// Connects to the target environment and discovers the set of cluster
    /// replicas whose introspection relations will be dumped.
    ///
    /// The connection's background task is spawned here and held for the
    /// lifetime of the dumper. Failure to list replicas is downgraded to a
    /// warning: basic relations can still be dumped without them.
    pub async fn new(connection_url: &str, base_path: PathBuf) -> Result<Self, anyhow::Error> {
        let (pg_client, pg_conn, pg_tls) = create_postgres_connection(connection_url).await?;

        let handle = task::spawn(|| "postgres-connection", pg_conn);

        // Set search path to system catalog tables
        pg_client
            .execute(SET_SEARCH_PATH_QUERY, &[])
            .await
            .context("Failed to set search path")?;

        // We need to get all cluster replicas to dump introspection relations.
        let cluster_replicas = match pg_client.query(SELECT_CLUSTER_REPLICAS_QUERY, &[]).await {
            Ok(rows) => rows
                .into_iter()
                .map(|row| {
                    let cluster_name = row.try_get::<_, String>("cluster_name");
                    let replica_name = row.try_get::<_, String>("replica_name");

                    // Rows with unreadable names are dropped rather than failing the dump.
                    if let (Ok(cluster_name), Ok(replica_name)) = (cluster_name, replica_name) {
                        Some(ClusterReplica {
                            cluster_name,
                            replica_name,
                        })
                    } else {
                        None
                    }
                })
                .filter_map(|row| row)
                .collect::<Vec<_>>(),
            Err(e) => {
                warn!("Failed to get replica names: {}", e);
                vec![]
            }
        };

        Ok(Self {
            base_path,
            pg_client: Arc::new(Mutex::new(pg_client)),
            pg_tls,
            cluster_replicas,
            _pg_conn_handle: handle,
        })
    }

    /// Dumps one relation to CSV, retrying the whole query (column lookup +
    /// transaction) with backoff until `PG_QUERY_TIMEOUT` elapses.
    ///
    /// On final failure the in-flight query is cancelled via the client's
    /// cancel token (so it doesn't keep running server-side) and the error is
    /// returned to the caller.
    pub async fn dump_relation(
        &self,
        relation: &Relation,
        cluster_replica: Option<&ClusterReplica>,
    ) -> Result<(), anyhow::Error> {
        info!(
            "Copying relation {}{}{}",
            relation.name,
            match relation.category {
                RelationCategory::Retained => " (subscribe history)",
                _ => "",
            },
            cluster_replica.map_or_else(|| "".to_string(), |replica| format!(" in {}", replica))
        );

        let base_path = self.base_path.clone();
        let pg_client = &self.pg_client;

        let relation_name = relation.name.to_string();

        if let Err(err) = retry::Retry::default()
            .max_duration(PG_QUERY_TIMEOUT)
            .initial_backoff(Duration::from_secs(2))
            .retry_async_canceling(|_| {
                let base_path = base_path.clone();
                let relation_name = relation.name;
                let cluster_replica = cluster_replica.clone();

                async move {
                    // TODO (debug_tool3): Use a transaction for the entire dump instead of per query.
                    // The lock serializes dumps on the single pgwire connection; it is
                    // released when this attempt's future completes (or is cancelled).
                    let mut pg_client = pg_client.lock().await;

                    match async {
                        // We cannot query the column names in the transaction because SUBSCRIBE queries
                        // cannot be executed with SELECT and SHOW queries in the same transaction.
                        let column_names = query_column_names(&pg_client, relation).await?;

                        // The transaction is only used for reads and `SET LOCAL` scoping,
                        // so it is dropped (never committed) after the dump.
                        let transaction = pg_client.transaction().await?;
                        query_relation(
                            &transaction,
                            base_path,
                            relation,
                            &column_names,
                            cluster_replica,
                        )
                        .await?;

                        Ok::<(), anyhow::Error>(())
                    }
                    .await
                    {
                        Ok(()) => Ok(()),
                        Err(err) => {
                            warn!(
                                "{}: {:#}. Retrying...",
                                format_catalog_dump_error_message(relation_name, cluster_replica),
                                err
                            );
                            Err(err)
                        }
                    }
                }
            })
            .await
        {
            // All retries failed (or timed out): cancel whatever query may
            // still be running server-side before reporting the error.
            let pg_client_lock = pg_client.lock().await;

            let cancel_token = pg_client_lock.cancel_token();

            if let Err(_) = async {
                let tls = self.pg_tls.clone();

                cancel_token.cancel_query(tls).await?;
                Ok::<(), anyhow::Error>(())
            }
            .await
            {
                // Cancellation is best-effort; log and keep going.
                warn!(
                    "Failed to cancel query for {}{}",
                    relation_name,
                    cluster_replica
                        .map_or_else(|| "".to_string(), |replica| format!(" for {}", replica))
                );
            }

            return Err(err);
        }

        Ok(())
    }

    /// Dumps every relation in `RELATIONS`: basic/retained relations once,
    /// and introspection relations once per known cluster replica.
    ///
    /// Errors are logged rather than propagated, so one bad relation does not
    /// abort the whole dump. A replica that accumulates
    /// `MAX_CLUSTER_REPLICA_ERROR_COUNT` errors is skipped from then on;
    /// "unknown catalog item" errors are not counted, since older Materialize
    /// versions legitimately lack some relations.
    pub async fn dump_all_relations(&self) {
        let cluster_replicas = &self.cluster_replicas;

        // Create a map to count errors by cluster replica.
        let mut cluster_replica_error_counts: HashMap<ClusterReplica, usize> = HashMap::new();
        for replica in cluster_replicas {
            cluster_replica_error_counts
                .entry(replica.clone())
                .insert_entry(0);
        }

        let non_introspection_iter = RELATIONS
            .iter()
            .filter(|relation| {
                matches!(
                    relation.category,
                    RelationCategory::Basic | RelationCategory::Retained
                )
            })
            .map(|relation| (relation, None::<&ClusterReplica>));

        let introspection_iter = RELATIONS
            .iter()
            .filter(|relation| matches!(relation.category, RelationCategory::Introspection))
            .collect::<Vec<_>>();

        // Cross product: every introspection relation is dumped on every replica.
        let introspection_iter = cluster_replicas.iter().flat_map(|replica| {
            introspection_iter
                .iter()
                .map(move |relation| (*relation, Some(replica)))
        });

        // Combine and iterate over all relation/replica pairs
        for (relation, replica) in non_introspection_iter.chain(introspection_iter) {
            let replica_key = if let Some(replica) = replica {
                replica
            } else {
                // If the replica is null, we assume it's mz_catalog_server.
                &ClusterReplica::default()
            };

            // If the cluster replica has more than `MAX_CLUSTER_REPLICA_ERROR_COUNT` errors,
            // we can skip it since we can assume it's not responsive and don't want to hold up
            // following queries.
            if cluster_replica_error_counts.get(replica_key).unwrap_or(&0)
                >= &MAX_CLUSTER_REPLICA_ERROR_COUNT
            {
                info!(
                    "Skipping {}{}",
                    relation.name,
                    replica.map_or_else(|| "".to_string(), |replica| format!(" for {}", replica))
                );
                continue;
            }

            if let Err(err) = self.dump_relation(relation, replica).await {
                warn!(
                    "{}: {:#}.",
                    format_catalog_dump_error_message(relation.name, replica),
                    err,
                );

                // Timeouts usually mean the serving cluster is under-provisioned;
                // point the user at the relevant docs.
                if err.to_string().contains("deadline has elapsed") {
                    let docs_link = if replica.is_none()
                        || replica.map_or(false, |r| r.cluster_name == "mz_catalog_server")
                    {
                        "https://materialize.com/docs/installation/troubleshooting/#troubleshooting-console-unresponsiveness"
                    } else {
                        "https://materialize.com/docs/sql/alter-cluster/#resizing-1"
                    };
                    warn!("Consider increasing the size of the cluster {}", docs_link);
                }

                let is_missing_catalog_item_err = match err.downcast_ref::<tokio_postgres::Error>()
                {
                    Some(pg_err) => pg_err
                        .to_string()
                        .to_lowercase()
                        .contains("unknown catalog item"),
                    None => false,
                };

                // If the error is due to a missing catalog item,
                // we don't count it as an error since we expect some
                // catalog items to be missing. This is because mz-debug
                // is meant to be backwards compatible with older versions
                // of Materialize.
                if !is_missing_catalog_item_err {
                    cluster_replica_error_counts
                        .entry(replica_key.clone())
                        .and_modify(|count| *count += 1)
                        .or_insert(1);
                }
            }
        }
    }
}
935
936fn format_catalog_dump_error_message(
937    relation_name: &str,
938    cluster_replica: Option<&ClusterReplica>,
939) -> String {
940    format!(
941        "Failed to dump relation {}{}",
942        relation_name,
943        cluster_replica.map_or_else(|| "".to_string(), |replica| format!(" for {}", replica))
944    )
945}
946
947fn format_file_path(base_path: PathBuf, cluster_replica: Option<&ClusterReplica>) -> PathBuf {
948    let path = base_path.join(SYSTEM_CATALOG_DUMP_DIR);
949    if let Some(cluster_replica) = cluster_replica {
950        path.join(cluster_replica.cluster_name.as_str())
951            .join(cluster_replica.replica_name.as_str())
952    } else {
953        path
954    }
955}