Skip to main content

mz_debug/
system_catalog_dumper.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License in the LICENSE file at the
6// root of this repository, or online at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Dumps catalog information to files.
17//! We run queries in serial rather than parallel because all queries in the pgwire
18//! connection are run in serial anyways. Running the queries in serial also makes
19//! cleaning up / aborting queries much easier.
20
21use anyhow::{Context as _, Result};
22use csv_async::AsyncSerializer;
23use futures::TryStreamExt;
24use mz_sql_parser::ast::display::escaped_string_literal;
25use mz_tls_util::make_tls;
26use std::fmt;
27use std::path::PathBuf;
28use std::str::FromStr;
29use std::sync::Arc;
30use std::time::Duration;
31use tokio::io::AsyncWriteExt;
32use tokio::sync::Mutex;
33use tokio_postgres::{
34    Client as PgClient, Config as PgConfig, Connection, SimpleQueryMessage, Socket, Transaction,
35};
36use tokio_util::io::StreamReader;
37
38use mz_ore::collections::HashMap;
39use mz_ore::retry::{self};
40use mz_ore::task::{self, JoinHandle};
41use postgres_openssl::{MakeTlsConnector, TlsStream};
42use tracing::{info, warn};
43
/// How a relation in [`RELATIONS`] should be dumped.
#[derive(Debug, Clone)]
pub enum RelationCategory {
    /// For relations that belong in the `mz_introspection` schema.
    /// These relations require a replica name to be specified.
    Introspection,
    /// For relations that are retained metric objects that we'd like to get the SUBSCRIBE output for.
    Retained,
    /// Other relations that we want to do a SELECT * FROM on.
    Basic,
    /// A custom query that doesn't map to a single relation.
    Custom {
        /// The SQL text that is materialized as a temporary view named after the entry.
        sql: &'static str,
    },
}
56
/// A single entry in the dump list: a relation (or custom query) plus the
/// strategy used to dump it.
#[derive(Debug, Clone)]
pub struct Relation {
    /// Unqualified relation name; resolved via the search path set by
    /// `SET_SEARCH_PATH_QUERY`, and also used as the output CSV file stem.
    pub name: &'static str,
    /// How this relation is dumped (SELECT, SUBSCRIBE, or custom SQL).
    pub category: RelationCategory,
}
62
63static SYSTEM_CATALOG_DUMP_DIR: &str = "system_catalog";
64/// This list is used to determine which relations to dump.
65/// The relations are grouped and delimited by their category (i.e. Basic object information)
66static RELATIONS: &[Relation] = &[
67    // Basic object information
68    Relation {
69        name: "mz_audit_events",
70        category: RelationCategory::Basic,
71    },
72    Relation {
73        name: "mz_databases",
74        category: RelationCategory::Basic,
75    },
76    Relation {
77        name: "mz_schemas",
78        category: RelationCategory::Basic,
79    },
80    Relation {
81        name: "mz_tables",
82        category: RelationCategory::Basic,
83    },
84    Relation {
85        name: "mz_sources",
86        category: RelationCategory::Basic,
87    },
88    Relation {
89        name: "mz_sinks",
90        category: RelationCategory::Basic,
91    },
92    Relation {
93        name: "mz_views",
94        category: RelationCategory::Basic,
95    },
96    Relation {
97        name: "mz_materialized_views",
98        category: RelationCategory::Basic,
99    },
100    Relation {
101        name: "mz_secrets",
102        category: RelationCategory::Basic,
103    },
104    Relation {
105        name: "mz_connections",
106        category: RelationCategory::Basic,
107    },
108    Relation {
109        name: "mz_roles",
110        category: RelationCategory::Basic,
111    },
112    Relation {
113        name: "mz_subscriptions",
114        category: RelationCategory::Basic,
115    },
116    Relation {
117        name: "mz_object_fully_qualified_names",
118        category: RelationCategory::Basic,
119    },
120    Relation {
121        name: "mz_sessions",
122        category: RelationCategory::Basic,
123    },
124    Relation {
125        name: "mz_object_history",
126        category: RelationCategory::Basic,
127    },
128    Relation {
129        name: "mz_object_lifetimes",
130        category: RelationCategory::Basic,
131    },
132    Relation {
133        name: "mz_object_dependencies",
134        category: RelationCategory::Basic,
135    },
136    Relation {
137        name: "mz_object_transitive_dependencies",
138        category: RelationCategory::Basic,
139    },
140    // Compute
141    Relation {
142        name: "mz_clusters",
143        category: RelationCategory::Basic,
144    },
145    Relation {
146        name: "mz_indexes",
147        category: RelationCategory::Basic,
148    },
149    Relation {
150        name: "mz_cluster_replicas",
151        category: RelationCategory::Basic,
152    },
153    Relation {
154        name: "mz_cluster_replica_sizes",
155        category: RelationCategory::Basic,
156    },
157    Relation {
158        name: "mz_cluster_replica_statuses",
159        category: RelationCategory::Basic,
160    },
161    Relation {
162        name: "mz_cluster_replica_metrics_history",
163        category: RelationCategory::Basic,
164    },
165    Relation {
166        name: "mz_compute_hydration_times",
167        category: RelationCategory::Retained,
168    },
169    Relation {
170        name: "mz_materialization_dependencies",
171        category: RelationCategory::Basic,
172    },
173    Relation {
174        name: "mz_cluster_replica_status_history",
175        category: RelationCategory::Basic,
176    },
177    // Freshness
178    Relation {
179        name: "mz_wallclock_global_lag_recent_history",
180        category: RelationCategory::Basic,
181    },
182    Relation {
183        name: "mz_global_frontiers",
184        category: RelationCategory::Basic,
185    },
186    Relation {
187        name: "mz_cluster_replica_frontiers",
188        category: RelationCategory::Basic,
189    },
190    Relation {
191        name: "mz_materialization_lag",
192        category: RelationCategory::Basic,
193    },
194    // Sources/sinks
195    Relation {
196        name: "mz_source_statistics_with_history",
197        category: RelationCategory::Basic,
198    },
199    Relation {
200        name: "mz_source_statistics_with_history",
201        category: RelationCategory::Retained,
202    },
203    Relation {
204        name: "mz_sink_statistics",
205        category: RelationCategory::Basic,
206    },
207    Relation {
208        name: "mz_sink_statistics",
209        category: RelationCategory::Retained,
210    },
211    Relation {
212        name: "mz_source_statuses",
213        category: RelationCategory::Basic,
214    },
215    Relation {
216        name: "mz_sink_statuses",
217        category: RelationCategory::Basic,
218    },
219    Relation {
220        name: "mz_source_status_history",
221        category: RelationCategory::Basic,
222    },
223    Relation {
224        name: "mz_sink_status_history",
225        category: RelationCategory::Basic,
226    },
227    // Refresh every information
228    Relation {
229        name: "mz_materialized_view_refresh_strategies",
230        category: RelationCategory::Basic,
231    },
232    Relation {
233        name: "mz_cluster_schedules",
234        category: RelationCategory::Basic,
235    },
236    // Persist
237    Relation {
238        name: "mz_recent_storage_usage",
239        category: RelationCategory::Basic,
240    },
241    // Introspection relations
242    Relation {
243        name: "mz_arrangement_sharing_per_worker",
244        category: RelationCategory::Introspection,
245    },
246    Relation {
247        name: "mz_arrangement_sharing",
248        category: RelationCategory::Introspection,
249    },
250    Relation {
251        name: "mz_arrangement_sizes_per_worker",
252        category: RelationCategory::Introspection,
253    },
254    Relation {
255        name: "mz_dataflow_channels",
256        category: RelationCategory::Introspection,
257    },
258    Relation {
259        name: "mz_dataflow_operators",
260        category: RelationCategory::Introspection,
261    },
262    Relation {
263        name: "mz_dataflow_global_ids",
264        category: RelationCategory::Introspection,
265    },
266    Relation {
267        name: "mz_dataflow_operator_dataflows_per_worker",
268        category: RelationCategory::Introspection,
269    },
270    Relation {
271        name: "mz_dataflow_operator_dataflows",
272        category: RelationCategory::Introspection,
273    },
274    Relation {
275        name: "mz_dataflow_operator_parents_per_worker",
276        category: RelationCategory::Introspection,
277    },
278    Relation {
279        name: "mz_dataflow_operator_parents",
280        category: RelationCategory::Introspection,
281    },
282    Relation {
283        name: "mz_compute_exports",
284        category: RelationCategory::Introspection,
285    },
286    Relation {
287        name: "mz_dataflow_arrangement_sizes",
288        category: RelationCategory::Introspection,
289    },
290    Relation {
291        name: "mz_expected_group_size_advice",
292        category: RelationCategory::Introspection,
293    },
294    Relation {
295        name: "mz_compute_frontiers",
296        category: RelationCategory::Introspection,
297    },
298    Relation {
299        name: "mz_dataflow_channel_operators_per_worker",
300        category: RelationCategory::Introspection,
301    },
302    Relation {
303        name: "mz_dataflow_channel_operators",
304        category: RelationCategory::Introspection,
305    },
306    Relation {
307        name: "mz_compute_import_frontiers",
308        category: RelationCategory::Introspection,
309    },
310    Relation {
311        name: "mz_message_counts_per_worker",
312        category: RelationCategory::Introspection,
313    },
314    Relation {
315        name: "mz_message_counts",
316        category: RelationCategory::Introspection,
317    },
318    Relation {
319        name: "mz_active_peeks",
320        category: RelationCategory::Introspection,
321    },
322    Relation {
323        name: "mz_compute_operator_durations_histogram_per_worker",
324        category: RelationCategory::Introspection,
325    },
326    Relation {
327        name: "mz_compute_operator_durations_histogram",
328        category: RelationCategory::Introspection,
329    },
330    Relation {
331        name: "mz_records_per_dataflow_operator_per_worker",
332        category: RelationCategory::Introspection,
333    },
334    Relation {
335        name: "mz_records_per_dataflow_operator",
336        category: RelationCategory::Introspection,
337    },
338    Relation {
339        name: "mz_records_per_dataflow_per_worker",
340        category: RelationCategory::Introspection,
341    },
342    Relation {
343        name: "mz_records_per_dataflow",
344        category: RelationCategory::Introspection,
345    },
346    Relation {
347        name: "mz_peek_durations_histogram_per_worker",
348        category: RelationCategory::Introspection,
349    },
350    Relation {
351        name: "mz_peek_durations_histogram",
352        category: RelationCategory::Introspection,
353    },
354    Relation {
355        name: "mz_scheduling_elapsed_per_worker",
356        category: RelationCategory::Introspection,
357    },
358    Relation {
359        name: "mz_scheduling_elapsed",
360        category: RelationCategory::Introspection,
361    },
362    Relation {
363        name: "mz_scheduling_parks_histogram_per_worker",
364        category: RelationCategory::Introspection,
365    },
366    Relation {
367        name: "mz_scheduling_parks_histogram",
368        category: RelationCategory::Introspection,
369    },
370    Relation {
371        name: "mz_compute_lir_mapping_per_worker",
372        category: RelationCategory::Introspection,
373    },
374    Relation {
375        name: "mz_lir_mapping",
376        category: RelationCategory::Introspection,
377    },
378    Relation {
379        name: "mz_compute_error_counts",
380        category: RelationCategory::Introspection,
381    },
382    Relation {
383        name: "mz_compute_error_counts_per_worker",
384        category: RelationCategory::Introspection,
385    },
386    // Relations that are redundant with some of the above, but
387    // are used by the Console.
388    Relation {
389        name: "mz_cluster_replica_metrics_history",
390        category: RelationCategory::Basic,
391    },
392    Relation {
393        name: "mz_webhook_sources",
394        category: RelationCategory::Basic,
395    },
396    Relation {
397        name: "mz_cluster_replica_history",
398        category: RelationCategory::Basic,
399    },
400    Relation {
401        name: "mz_source_statistics",
402        category: RelationCategory::Basic,
403    },
404    Relation {
405        name: "mz_cluster_deployment_lineage",
406        category: RelationCategory::Basic,
407    },
408    Relation {
409        name: "mz_show_indexes",
410        category: RelationCategory::Basic,
411    },
412    Relation {
413        name: "mz_relations",
414        category: RelationCategory::Basic,
415    },
416    Relation {
417        name: "mz_frontiers",
418        category: RelationCategory::Basic,
419    },
420    Relation {
421        name: "mz_console_cluster_utilization_overview",
422        category: RelationCategory::Basic,
423    },
424    Relation {
425        name: "mz_columns",
426        category: RelationCategory::Basic,
427    },
428    Relation {
429        name: "mz_kafka_sources",
430        category: RelationCategory::Basic,
431    },
432    Relation {
433        name: "mz_kafka_sinks",
434        category: RelationCategory::Basic,
435    },
436    // Custom queries
437    Relation {
438        name: "daily_replica_credit_usage",
439        category: RelationCategory::Custom {
440            sql: r#"
441-- Capture replica create and drop events from the audit log
442with replica_events as (
443    select
444        details ->> 'replica_id' as replica_id,
445        details ->> 'replica_name' as replica_name,
446        details ->> 'cluster_name' as cluster_name,
447        details ->> 'logical_size' as replica_size,
448        event_type,
449        occurred_at
450    from mz_catalog.mz_audit_events
451    where
452        object_type = 'cluster-replica'
453        and event_type in ('create', 'drop')
454        and details ->> 'replica_id' like 'u%'
455),
456
457-- Pair each replica creation with its corresponding drop to determine lifespan
458replica_lifespans as (
459    select
460        c.replica_id,
461        c.replica_name,
462        c.cluster_name,
463        c.replica_size,
464        c.occurred_at as created_at,
465        min(d.occurred_at) as dropped_at
466    from replica_events as c
467    left join replica_events as d
468        on  c.replica_id = d.replica_id
469        and d.event_type = 'drop'
470        and c.occurred_at < d.occurred_at
471    where c.event_type = 'create'
472    group by
473        c.replica_id,
474        c.replica_name,
475        c.cluster_name,
476        c.replica_size,
477        c.occurred_at
478),
479
480-- Break replica lifespans into per-day slices and compute seconds online per day
481daily_slices as (
482    select
483        l.replica_id,
484        l.replica_name,
485        l.cluster_name,
486        l.replica_size,
487        gs.day_start::date as usage_date,
488        greatest(
489            0::numeric,
490            extract(
491                epoch from
492                    least(coalesce(l.dropped_at, now()), gs.day_start + interval '1 day')
493                  - greatest(l.created_at, gs.day_start)
494            )::numeric
495        ) as seconds_online
496    from replica_lifespans as l
497    cross join lateral generate_series(
498        date_trunc('day', l.created_at),
499        date_trunc('day', coalesce(l.dropped_at, now())),
500        interval '1 day'
501    ) as gs(day_start)
502),
503
504-- Resolve replica heap limits from configured size definitions
505size_heap as (
506    select
507        size,
508        memory_bytes + disk_bytes as heap_limit
509    from mz_catalog.mz_cluster_replica_sizes
510    where disk_bytes is not null and disk_bytes > 0
511),
512
513-- Resolve replica heap limits from runtime metrics,
514-- since heap_limit is only discovered when the replica comes online
515metrics_heap as (
516    select distinct on (replica_id)
517        replica_id,
518        coalesce(heap_limit, memory_bytes + disk_bytes) as heap_limit
519    from mz_internal.mz_cluster_replica_metrics_history
520)
521
522-- Aggregate daily usage, report daily footprint, and compute
523-- M.1 credit equivalency (53 GiB-hour = 1.5 credits), billed per second
524select
525    s.usage_date,
526    mz_environment_id() as env_id,
527
528    sum(coalesce(sh.heap_limit, mh.heap_limit)) as total_heap_limit,
529
530    sum(coalesce(sh.heap_limit, mh.heap_limit) * s.seconds_online)
531        / (53.0 * 1024 * 1024 * 1024 * 3600.0)
532        * 1.5
533        as credit_equivalency
534from daily_slices as s
535left join size_heap as sh on s.replica_size = sh.size
536left join metrics_heap as mh on s.replica_id = mh.replica_id
537where s.seconds_online > 0
538group by s.usage_date
539order by s.usage_date;
540"#,
541        },
542    },
543];
544
/// Timeout for establishing the pgwire connection; also used as the overall
/// retry budget in `create_postgres_connection`.
static PG_CONNECTION_TIMEOUT: Duration = Duration::from_secs(30);
/// Timeout for a query.
// TODO (debug_tool3): Make this configurable.
static PG_QUERY_TIMEOUT: Duration = Duration::from_secs(20);

/// The maximum number of errors we tolerate for a cluster replica.
/// If a cluster replica has more than this many errors, we skip it.
static MAX_CLUSTER_REPLICA_ERROR_COUNT: usize = 3;

/// Puts the system catalog schemas on the search path so the unqualified
/// relation names in `RELATIONS` resolve without schema prefixes.
static SET_SEARCH_PATH_QUERY: &str = "SET search_path = mz_internal, mz_catalog, mz_introspection";
/// Enumerates every (cluster, replica) pair; the result drives the
/// per-replica introspection dumps.
static SELECT_CLUSTER_REPLICAS_QUERY: &str = "SELECT c.name as cluster_name, cr.name as replica_name FROM mz_clusters AS c JOIN mz_cluster_replicas AS cr ON c.id = cr.cluster_id;";
556
/// Identifies one replica of one cluster; used to target introspection
/// queries via `SET LOCAL CLUSTER` / `SET LOCAL CLUSTER_REPLICA`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ClusterReplica {
    // Cluster name as reported by mz_clusters.name.
    pub cluster_name: String,
    // Replica name as reported by mz_cluster_replicas.name.
    pub replica_name: String,
}
562
563impl Default for ClusterReplica {
564    fn default() -> Self {
565        Self {
566            cluster_name: "mz_catalog_server".to_string(),
567            replica_name: "r1".to_string(),
568        }
569    }
570}
571
572impl fmt::Display for ClusterReplica {
573    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
574        write!(f, "{}.{}", self.cluster_name, self.replica_name)
575    }
576}
577
/// Dumps system catalog relations to CSV files under `base_path`.
pub struct SystemCatalogDumper {
    // Root directory the per-relation CSV files are written beneath.
    base_path: PathBuf,
    // Shared pgwire client; a tokio Mutex because it is locked across awaits.
    pg_client: Arc<Mutex<PgClient>>,
    // TLS connector, kept so in-flight queries can be cancelled over TLS.
    pg_tls: MakeTlsConnector,
    // All (cluster, replica) pairs discovered at construction time.
    cluster_replicas: Vec<ClusterReplica>,
    // Drives the pgwire connection; held only so it is not dropped early.
    _pg_conn_handle: JoinHandle<Result<(), tokio_postgres::Error>>,
}
585
586pub async fn create_postgres_connection(
587    connection_string: &str,
588) -> Result<
589    (
590        PgClient,
591        Connection<Socket, TlsStream<Socket>>,
592        MakeTlsConnector,
593    ),
594    anyhow::Error,
595> {
596    let mut pg_config = PgConfig::from_str(connection_string)?;
597    pg_config.connect_timeout(PG_CONNECTION_TIMEOUT);
598    let tls = make_tls(&pg_config)?;
599
600    let host_addr = pg_config.get_hosts().first();
601    let port = pg_config.get_ports().first();
602
603    // The original connection string can contain the username and password, so we only print the host and port.
604    let redacted_connection_string = if let (Some(host_addr), Some(port)) = (host_addr, port) {
605        format!(" at {:?} on port {}", host_addr, port)
606    } else {
607        "".to_string()
608    };
609
610    info!(
611        "Connecting to PostgreSQL server{}",
612        redacted_connection_string
613    );
614
615    let (pg_client, pg_conn) = retry::Retry::default()
616        .max_duration(PG_CONNECTION_TIMEOUT)
617        .retry_async_canceling(|_| {
618            let pg_config = pg_config.clone();
619            let tls = tls.clone();
620            async move { pg_config.connect(tls).await.map_err(|e| anyhow::anyhow!(e)) }
621        })
622        .await?;
623
624    info!(
625        "Connected to PostgreSQL server{}",
626        redacted_connection_string
627    );
628
629    Ok((pg_client, pg_conn, tls))
630}
631
632pub async fn write_copy_stream(
633    transaction: &Transaction<'_>,
634    copy_query: &str,
635    file: &mut tokio::fs::File,
636    relation_name: &str,
637) -> Result<(), anyhow::Error> {
638    let copy_stream = transaction
639        .copy_out(copy_query)
640        .await
641        .context(format!("Failed to COPY TO for {}", relation_name))?
642        .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e));
643    let copy_stream = std::pin::pin!(copy_stream);
644    let mut reader = StreamReader::new(copy_stream);
645    tokio::io::copy(&mut reader, file).await?;
646    // Ensure the file is flushed to disk.
647    file.sync_all().await?;
648
649    Ok::<(), anyhow::Error>(())
650}
651
652pub async fn copy_relation_to_csv(
653    transaction: &Transaction<'_>,
654    file_path_name: PathBuf,
655    column_names: &Vec<String>,
656    relation: &Relation,
657) -> Result<(), anyhow::Error> {
658    let mut file = tokio::fs::File::create(&file_path_name).await?;
659
660    match relation.category {
661        RelationCategory::Retained => {
662            let mut writer = AsyncSerializer::from_writer(file);
663            writer.serialize(column_names).await?;
664
665            transaction
666                .execute(
667                    &format!("DECLARE c CURSOR FOR SUBSCRIBE TO {}", relation.name),
668                    &[],
669                )
670                .await
671                .context("Failed to declare cursor")?;
672
673            // We need to use simple_query, otherwise tokio-postgres will run an introspection SELECT query to figure out the types since it'll
674            // try to prepare the query. This causes an error since SUBSCRIBEs and SELECT queries are not allowed to be executed in the same transaction.
675            // Thus we use simple_query to avoid the introspection query.
676            let rows = transaction
677                // We use a timeout of '1' to receive the snapshot of the current state. A timeout of '0' will return no results.
678                // We also don't care if we get more than just the snapshot.
679                .simple_query("FETCH ALL FROM c WITH (TIMEOUT '1')")
680                .await
681                .context("Failed to fetch all from cursor")?;
682
683            for row in rows {
684                if let SimpleQueryMessage::Row(row) = row {
685                    let values: Vec<&str> = (0..row.len())
686                        .map(|i| row.get(i).unwrap_or("")) // Convert each field to String
687                        .collect();
688                    writer.serialize(&values).await?;
689                }
690            }
691        }
692        _ => {
693            // TODO (SangJunBak): Use `WITH (HEADER TRUE)` once database-issues#2846 is implemented.
694            file.write_all((column_names.join(",") + "\n").as_bytes())
695                .await?;
696            let copy_query = format!(
697                "COPY (SELECT * FROM {}) TO STDOUT WITH (FORMAT CSV)",
698                relation.name
699            );
700            write_copy_stream(transaction, &copy_query, &mut file, relation.name).await?;
701        }
702    };
703
704    info!("Copied {} to {}", relation.name, file_path_name.display());
705    Ok::<(), anyhow::Error>(())
706}
707
708pub async fn query_column_names(
709    pg_client: &PgClient,
710    relation: &Relation,
711) -> Result<Vec<String>, anyhow::Error> {
712    let relation_name = relation.name;
713    // We query the column names to write the header row of the CSV file.
714    let mut column_names = pg_client
715        .query(&format!("SHOW COLUMNS FROM {}", &relation_name), &[])
716        .await
717        .context(format!("Failed to get column names for {}", relation_name))?
718        .into_iter()
719        .map(|row| match row.try_get::<_, String>("name") {
720            Ok(name) => Some(name),
721            Err(_) => None,
722        })
723        .filter_map(|row| row)
724        .collect::<Vec<_>>();
725
726    match relation.category {
727        RelationCategory::Retained => {
728            column_names.splice(0..0, ["mz_timestamp".to_string(), "mz_diff".to_string()]);
729        }
730        _ => (),
731    }
732
733    Ok(column_names)
734}
735
736pub async fn query_relation(
737    transaction: &Transaction<'_>,
738    base_path: PathBuf,
739    relation: &Relation,
740    column_names: &Vec<String>,
741    cluster_replica: Option<&ClusterReplica>,
742) -> Result<(), anyhow::Error> {
743    let relation_name = relation.name;
744    let relation_category = &relation.category;
745
746    // Some queries (i.e. mz_introspection relations) require the cluster and replica to be set.
747    if let Some(cluster_replica) = &cluster_replica {
748        transaction
749            .execute(
750                &format!(
751                    "SET LOCAL CLUSTER = {}",
752                    escaped_string_literal(&cluster_replica.cluster_name)
753                ),
754                &[],
755            )
756            .await
757            .context(format!(
758                "Failed to set cluster to {}",
759                cluster_replica.cluster_name
760            ))?;
761        transaction
762            .execute(
763                &format!(
764                    "SET LOCAL CLUSTER_REPLICA = {}",
765                    escaped_string_literal(&cluster_replica.replica_name)
766                ),
767                &[],
768            )
769            .await
770            .context(format!(
771                "Failed to set cluster replica to {}",
772                cluster_replica.replica_name
773            ))?;
774    }
775
776    match relation_category {
777        RelationCategory::Basic | RelationCategory::Custom { .. } => {
778            let file_path = format_file_path(base_path, None);
779            let file_path_name = file_path.join(relation_name).with_extension("csv");
780            tokio::fs::create_dir_all(&file_path).await?;
781
782            copy_relation_to_csv(transaction, file_path_name, column_names, relation).await?;
783        }
784        RelationCategory::Introspection => {
785            let file_path = format_file_path(base_path, cluster_replica);
786            tokio::fs::create_dir_all(&file_path).await?;
787
788            let file_path_name = file_path.join(relation_name).with_extension("csv");
789
790            copy_relation_to_csv(transaction, file_path_name, column_names, relation).await?;
791        }
792        RelationCategory::Retained => {
793            // Copy the current state and retained subscribe state
794            let file_path = format_file_path(base_path, None);
795            let file_path_name = file_path
796                .join(format!("{}_subscribe", relation_name))
797                .with_extension("csv");
798            tokio::fs::create_dir_all(&file_path).await?;
799
800            copy_relation_to_csv(transaction, file_path_name, column_names, relation).await?;
801        }
802    }
803    Ok::<(), anyhow::Error>(())
804}
805
806impl SystemCatalogDumper {
807    pub async fn new(connection_url: &str, base_path: PathBuf) -> Result<Self, anyhow::Error> {
808        let (pg_client, pg_conn, pg_tls) = create_postgres_connection(connection_url).await?;
809
810        let handle = task::spawn(|| "postgres-connection", pg_conn);
811
812        // Set search path to system catalog tables
813        pg_client
814            .execute(SET_SEARCH_PATH_QUERY, &[])
815            .await
816            .context("Failed to set search path")?;
817
818        // We need to get all cluster replicas to dump introspection relations.
819        let cluster_replicas = match pg_client.query(SELECT_CLUSTER_REPLICAS_QUERY, &[]).await {
820            Ok(rows) => rows
821                .into_iter()
822                .map(|row| {
823                    let cluster_name = row.try_get::<_, String>("cluster_name");
824                    let replica_name = row.try_get::<_, String>("replica_name");
825
826                    if let (Ok(cluster_name), Ok(replica_name)) = (cluster_name, replica_name) {
827                        Some(ClusterReplica {
828                            cluster_name,
829                            replica_name,
830                        })
831                    } else {
832                        None
833                    }
834                })
835                .filter_map(|row| row)
836                .collect::<Vec<_>>(),
837            Err(e) => {
838                warn!("Failed to get replica names: {}", e);
839                vec![]
840            }
841        };
842
843        Ok(Self {
844            base_path,
845            pg_client: Arc::new(Mutex::new(pg_client)),
846            pg_tls,
847            cluster_replicas,
848            _pg_conn_handle: handle,
849        })
850    }
851
    /// Dumps one relation to CSV, retrying on failure.
    ///
    /// The copy is retried with backoff for up to `PG_QUERY_TIMEOUT`; if the
    /// retry budget is exhausted, the in-flight server-side query is cancelled
    /// via the client's cancel token before the error is returned.
    pub async fn dump_relation(
        &self,
        relation: &Relation,
        cluster_replica: Option<&ClusterReplica>,
    ) -> Result<(), anyhow::Error> {
        info!(
            "Copying relation {}{}{}",
            relation.name,
            match relation.category {
                RelationCategory::Retained => " (subscribe history)",
                _ => "",
            },
            cluster_replica.map_or_else(|| "".to_string(), |replica| format!(" in {}", replica))
        );

        let base_path = self.base_path.clone();
        let pg_client = &self.pg_client;

        let relation_name = relation.name.to_string();

        // For custom queries, create a temporary view so the retry loop
        // can treat them identically to basic relations.
        if let RelationCategory::Custom { sql } = &relation.category {
            let pg_client_lock = pg_client.lock().await;
            pg_client_lock
                .execute(
                    &format!(
                        "CREATE OR REPLACE TEMPORARY VIEW {} AS {}",
                        relation.name, sql
                    ),
                    &[],
                )
                .await
                .context(format!(
                    "Failed to create temporary view for {}",
                    relation.name
                ))?;
        }

        if let Err(err) = retry::Retry::default()
            .max_duration(PG_QUERY_TIMEOUT)
            .initial_backoff(Duration::from_secs(2))
            .retry_async_canceling(|_| {
                // Fresh clones per attempt: the closure may run several times.
                let base_path = base_path.clone();
                let relation_name = relation.name;
                let cluster_replica = cluster_replica.clone();

                async move {
                    // TODO (debug_tool3): Use a transaction for the entire dump instead of per query.
                    // Held across the whole attempt so attempts never interleave
                    // on the shared connection.
                    let mut pg_client = pg_client.lock().await;

                    match async {
                        // We cannot query the column names in the transaction because SUBSCRIBE queries
                        // cannot be executed with SELECT and SHOW queries in the same transaction.
                        let column_names = query_column_names(&pg_client, relation).await?;

                        // NOTE(review): the transaction is dropped (rolled back)
                        // rather than committed — harmless here since the dump
                        // is read-only, but confirm that is intentional.
                        let transaction = pg_client.transaction().await?;
                        query_relation(
                            &transaction,
                            base_path,
                            relation,
                            &column_names,
                            cluster_replica,
                        )
                        .await?;

                        Ok::<(), anyhow::Error>(())
                    }
                    .await
                    {
                        Ok(()) => Ok(()),
                        Err(err) => {
                            // Log and rethrow so the retry policy sees the failure.
                            warn!(
                                "{}: {:#}. Retrying...",
                                format_catalog_dump_error_message(relation_name, cluster_replica),
                                err
                            );
                            Err(err)
                        }
                    }
                }
            })
            .await
        {
            // Retries exhausted: cancel whatever is still running server-side
            // so it does not linger after we give up.
            let pg_client_lock = pg_client.lock().await;

            let cancel_token = pg_client_lock.cancel_token();

            if let Err(_) = async {
                let tls = self.pg_tls.clone();

                cancel_token.cancel_query(tls).await?;
                Ok::<(), anyhow::Error>(())
            }
            .await
            {
                // Cancellation failure is non-fatal; the original error below
                // is what the caller needs to see.
                warn!(
                    "Failed to cancel query for {}{}",
                    relation_name,
                    cluster_replica
                        .map_or_else(|| "".to_string(), |replica| format!(" for {}", replica))
                );
            }

            return Err(err);
        }

        Ok(())
    }
961
962    pub async fn dump_all_relations(&self) {
963        let cluster_replicas = &self.cluster_replicas;
964
965        // Create a map to count errors by cluster replica..
966        let mut cluster_replica_error_counts: HashMap<ClusterReplica, usize> = HashMap::new();
967        for replica in cluster_replicas {
968            cluster_replica_error_counts
969                .entry(replica.clone())
970                .insert_entry(0);
971        }
972
973        let non_introspection_iter = RELATIONS
974            .iter()
975            .filter(|relation| {
976                matches!(
977                    relation.category,
978                    RelationCategory::Basic
979                        | RelationCategory::Retained
980                        | RelationCategory::Custom { .. }
981                )
982            })
983            .map(|relation| (relation, None::<&ClusterReplica>));
984
985        let introspection_iter = RELATIONS
986            .iter()
987            .filter(|relation| matches!(relation.category, RelationCategory::Introspection))
988            .collect::<Vec<_>>();
989
990        let introspection_iter = cluster_replicas.iter().flat_map(|replica| {
991            introspection_iter
992                .iter()
993                .map(move |relation| (*relation, Some(replica)))
994        });
995
996        // Combine and iterate over all relation/replica pairs
997        for (relation, replica) in non_introspection_iter.chain(introspection_iter) {
998            let replica_key = if let Some(replica) = replica {
999                replica
1000            } else {
1001                // If the replica is null, we assume it's  mz_catalog_server.
1002                &ClusterReplica::default()
1003            };
1004
1005            // If the cluster replica has more than `MAX_CLUSTER_REPLICA_ERROR_COUNT` errors,
1006            // we can skip it since we can assume it's not responsive and don't want to hold up
1007            // following queries.
1008            if cluster_replica_error_counts.get(replica_key).unwrap_or(&0)
1009                >= &MAX_CLUSTER_REPLICA_ERROR_COUNT
1010            {
1011                info!(
1012                    "Skipping {}{}",
1013                    relation.name,
1014                    replica.map_or_else(|| "".to_string(), |replica| format!(" for {}", replica))
1015                );
1016                continue;
1017            }
1018
1019            if let Err(err) = self.dump_relation(relation, replica).await {
1020                warn!(
1021                    "{}: {:#}.",
1022                    format_catalog_dump_error_message(relation.name, replica),
1023                    err,
1024                );
1025
1026                if err.to_string().contains("deadline has elapsed") {
1027                    let docs_link = if replica.is_none()
1028                        || replica.map_or(false, |r| r.cluster_name == "mz_catalog_server")
1029                    {
1030                        "https://materialize.com/docs/installation/troubleshooting/#troubleshooting-console-unresponsiveness"
1031                    } else {
1032                        "https://materialize.com/docs/sql/alter-cluster/#resizing-1"
1033                    };
1034                    warn!("Consider increasing the size of the cluster {}", docs_link);
1035                }
1036
1037                let is_missing_catalog_item_err = match err.downcast_ref::<tokio_postgres::Error>()
1038                {
1039                    Some(pg_err) => pg_err
1040                        .to_string()
1041                        .to_lowercase()
1042                        .contains("unknown catalog item"),
1043                    None => false,
1044                };
1045
1046                // If the error is due to a missing catalog item,
1047                // we don't count it as an error since we expect some
1048                // catalog items to be missing. This is because mz-debug
1049                // is meant to be backwards compatible with older versions
1050                // of Materialize.
1051                if !is_missing_catalog_item_err {
1052                    cluster_replica_error_counts
1053                        .entry(replica_key.clone())
1054                        .and_modify(|count| *count += 1)
1055                        .or_insert(1);
1056                }
1057            }
1058        }
1059    }
1060}
1061
1062fn format_catalog_dump_error_message(
1063    relation_name: &str,
1064    cluster_replica: Option<&ClusterReplica>,
1065) -> String {
1066    format!(
1067        "Failed to dump relation {}{}",
1068        relation_name,
1069        cluster_replica.map_or_else(|| "".to_string(), |replica| format!(" for {}", replica))
1070    )
1071}
1072
1073fn format_file_path(base_path: PathBuf, cluster_replica: Option<&ClusterReplica>) -> PathBuf {
1074    let path = base_path.join(SYSTEM_CATALOG_DUMP_DIR);
1075    if let Some(cluster_replica) = cluster_replica {
1076        path.join(cluster_replica.cluster_name.as_str())
1077            .join(cluster_replica.replica_name.as_str())
1078    } else {
1079        path
1080    }
1081}