1use anyhow::{Context as _, Result};
22use csv_async::AsyncSerializer;
23use futures::TryStreamExt;
24use mz_sql_parser::ast::display::escaped_string_literal;
25use mz_tls_util::make_tls;
26use std::fmt;
27use std::path::PathBuf;
28use std::str::FromStr;
29use std::sync::Arc;
30use std::time::Duration;
31use tokio::io::AsyncWriteExt;
32use tokio::sync::Mutex;
33use tokio_postgres::{
34 Client as PgClient, Config as PgConfig, Connection, SimpleQueryMessage, Socket, Transaction,
35};
36use tokio_util::io::StreamReader;
37
38use mz_ore::collections::HashMap;
39use mz_ore::retry::{self};
40use mz_ore::task::{self, JoinHandle};
41use postgres_openssl::{MakeTlsConnector, TlsStream};
42use tracing::{info, warn};
43
/// How a system catalog relation is dumped.
#[derive(Debug, Clone)]
pub enum RelationCategory {
    /// Replica-specific introspection relation; dumped once per cluster
    /// replica into a `<cluster>/<replica>` subdirectory.
    Introspection,
    /// Relation with retained history; dumped via a `SUBSCRIBE` cursor rather
    /// than a plain `SELECT`, with `mz_timestamp`/`mz_diff` columns prepended.
    Retained,
    /// Ordinary relation; dumped with a single `COPY (SELECT *) TO STDOUT`.
    Basic,
    /// Derived relation; a temporary view is created from `sql` and then
    /// dumped like a `Basic` relation.
    Custom { sql: &'static str },
}
56
/// A system catalog relation to dump, paired with its dump strategy.
#[derive(Debug, Clone)]
pub struct Relation {
    /// Unqualified relation name; resolved through the search path set by
    /// `SET_SEARCH_PATH_QUERY`.
    pub name: &'static str,
    /// How the relation's contents are extracted (see `RelationCategory`).
    pub category: RelationCategory,
}
62
63static SYSTEM_CATALOG_DUMP_DIR: &str = "system_catalog";
64static RELATIONS: &[Relation] = &[
67 Relation {
69 name: "mz_audit_events",
70 category: RelationCategory::Basic,
71 },
72 Relation {
73 name: "mz_databases",
74 category: RelationCategory::Basic,
75 },
76 Relation {
77 name: "mz_schemas",
78 category: RelationCategory::Basic,
79 },
80 Relation {
81 name: "mz_tables",
82 category: RelationCategory::Basic,
83 },
84 Relation {
85 name: "mz_sources",
86 category: RelationCategory::Basic,
87 },
88 Relation {
89 name: "mz_sinks",
90 category: RelationCategory::Basic,
91 },
92 Relation {
93 name: "mz_views",
94 category: RelationCategory::Basic,
95 },
96 Relation {
97 name: "mz_materialized_views",
98 category: RelationCategory::Basic,
99 },
100 Relation {
101 name: "mz_secrets",
102 category: RelationCategory::Basic,
103 },
104 Relation {
105 name: "mz_connections",
106 category: RelationCategory::Basic,
107 },
108 Relation {
109 name: "mz_roles",
110 category: RelationCategory::Basic,
111 },
112 Relation {
113 name: "mz_subscriptions",
114 category: RelationCategory::Basic,
115 },
116 Relation {
117 name: "mz_object_fully_qualified_names",
118 category: RelationCategory::Basic,
119 },
120 Relation {
121 name: "mz_sessions",
122 category: RelationCategory::Basic,
123 },
124 Relation {
125 name: "mz_object_history",
126 category: RelationCategory::Basic,
127 },
128 Relation {
129 name: "mz_object_lifetimes",
130 category: RelationCategory::Basic,
131 },
132 Relation {
133 name: "mz_object_dependencies",
134 category: RelationCategory::Basic,
135 },
136 Relation {
137 name: "mz_object_transitive_dependencies",
138 category: RelationCategory::Basic,
139 },
140 Relation {
142 name: "mz_clusters",
143 category: RelationCategory::Basic,
144 },
145 Relation {
146 name: "mz_indexes",
147 category: RelationCategory::Basic,
148 },
149 Relation {
150 name: "mz_cluster_replicas",
151 category: RelationCategory::Basic,
152 },
153 Relation {
154 name: "mz_cluster_replica_sizes",
155 category: RelationCategory::Basic,
156 },
157 Relation {
158 name: "mz_cluster_replica_statuses",
159 category: RelationCategory::Basic,
160 },
161 Relation {
162 name: "mz_cluster_replica_metrics_history",
163 category: RelationCategory::Basic,
164 },
165 Relation {
166 name: "mz_compute_hydration_times",
167 category: RelationCategory::Retained,
168 },
169 Relation {
170 name: "mz_materialization_dependencies",
171 category: RelationCategory::Basic,
172 },
173 Relation {
174 name: "mz_cluster_replica_status_history",
175 category: RelationCategory::Basic,
176 },
177 Relation {
179 name: "mz_wallclock_global_lag_recent_history",
180 category: RelationCategory::Basic,
181 },
182 Relation {
183 name: "mz_global_frontiers",
184 category: RelationCategory::Basic,
185 },
186 Relation {
187 name: "mz_cluster_replica_frontiers",
188 category: RelationCategory::Basic,
189 },
190 Relation {
191 name: "mz_materialization_lag",
192 category: RelationCategory::Basic,
193 },
194 Relation {
196 name: "mz_source_statistics_with_history",
197 category: RelationCategory::Basic,
198 },
199 Relation {
200 name: "mz_source_statistics_with_history",
201 category: RelationCategory::Retained,
202 },
203 Relation {
204 name: "mz_sink_statistics",
205 category: RelationCategory::Basic,
206 },
207 Relation {
208 name: "mz_sink_statistics",
209 category: RelationCategory::Retained,
210 },
211 Relation {
212 name: "mz_source_statuses",
213 category: RelationCategory::Basic,
214 },
215 Relation {
216 name: "mz_sink_statuses",
217 category: RelationCategory::Basic,
218 },
219 Relation {
220 name: "mz_source_status_history",
221 category: RelationCategory::Basic,
222 },
223 Relation {
224 name: "mz_sink_status_history",
225 category: RelationCategory::Basic,
226 },
227 Relation {
229 name: "mz_materialized_view_refresh_strategies",
230 category: RelationCategory::Basic,
231 },
232 Relation {
233 name: "mz_cluster_schedules",
234 category: RelationCategory::Basic,
235 },
236 Relation {
238 name: "mz_recent_storage_usage",
239 category: RelationCategory::Basic,
240 },
241 Relation {
243 name: "mz_arrangement_sharing_per_worker",
244 category: RelationCategory::Introspection,
245 },
246 Relation {
247 name: "mz_arrangement_sharing",
248 category: RelationCategory::Introspection,
249 },
250 Relation {
251 name: "mz_arrangement_sizes_per_worker",
252 category: RelationCategory::Introspection,
253 },
254 Relation {
255 name: "mz_dataflow_channels",
256 category: RelationCategory::Introspection,
257 },
258 Relation {
259 name: "mz_dataflow_operators",
260 category: RelationCategory::Introspection,
261 },
262 Relation {
263 name: "mz_dataflow_global_ids",
264 category: RelationCategory::Introspection,
265 },
266 Relation {
267 name: "mz_dataflow_operator_dataflows_per_worker",
268 category: RelationCategory::Introspection,
269 },
270 Relation {
271 name: "mz_dataflow_operator_dataflows",
272 category: RelationCategory::Introspection,
273 },
274 Relation {
275 name: "mz_dataflow_operator_parents_per_worker",
276 category: RelationCategory::Introspection,
277 },
278 Relation {
279 name: "mz_dataflow_operator_parents",
280 category: RelationCategory::Introspection,
281 },
282 Relation {
283 name: "mz_compute_exports",
284 category: RelationCategory::Introspection,
285 },
286 Relation {
287 name: "mz_dataflow_arrangement_sizes",
288 category: RelationCategory::Introspection,
289 },
290 Relation {
291 name: "mz_expected_group_size_advice",
292 category: RelationCategory::Introspection,
293 },
294 Relation {
295 name: "mz_compute_frontiers",
296 category: RelationCategory::Introspection,
297 },
298 Relation {
299 name: "mz_dataflow_channel_operators_per_worker",
300 category: RelationCategory::Introspection,
301 },
302 Relation {
303 name: "mz_dataflow_channel_operators",
304 category: RelationCategory::Introspection,
305 },
306 Relation {
307 name: "mz_compute_import_frontiers",
308 category: RelationCategory::Introspection,
309 },
310 Relation {
311 name: "mz_message_counts_per_worker",
312 category: RelationCategory::Introspection,
313 },
314 Relation {
315 name: "mz_message_counts",
316 category: RelationCategory::Introspection,
317 },
318 Relation {
319 name: "mz_active_peeks",
320 category: RelationCategory::Introspection,
321 },
322 Relation {
323 name: "mz_compute_operator_durations_histogram_per_worker",
324 category: RelationCategory::Introspection,
325 },
326 Relation {
327 name: "mz_compute_operator_durations_histogram",
328 category: RelationCategory::Introspection,
329 },
330 Relation {
331 name: "mz_records_per_dataflow_operator_per_worker",
332 category: RelationCategory::Introspection,
333 },
334 Relation {
335 name: "mz_records_per_dataflow_operator",
336 category: RelationCategory::Introspection,
337 },
338 Relation {
339 name: "mz_records_per_dataflow_per_worker",
340 category: RelationCategory::Introspection,
341 },
342 Relation {
343 name: "mz_records_per_dataflow",
344 category: RelationCategory::Introspection,
345 },
346 Relation {
347 name: "mz_peek_durations_histogram_per_worker",
348 category: RelationCategory::Introspection,
349 },
350 Relation {
351 name: "mz_peek_durations_histogram",
352 category: RelationCategory::Introspection,
353 },
354 Relation {
355 name: "mz_scheduling_elapsed_per_worker",
356 category: RelationCategory::Introspection,
357 },
358 Relation {
359 name: "mz_scheduling_elapsed",
360 category: RelationCategory::Introspection,
361 },
362 Relation {
363 name: "mz_scheduling_parks_histogram_per_worker",
364 category: RelationCategory::Introspection,
365 },
366 Relation {
367 name: "mz_scheduling_parks_histogram",
368 category: RelationCategory::Introspection,
369 },
370 Relation {
371 name: "mz_compute_lir_mapping_per_worker",
372 category: RelationCategory::Introspection,
373 },
374 Relation {
375 name: "mz_lir_mapping",
376 category: RelationCategory::Introspection,
377 },
378 Relation {
379 name: "mz_compute_error_counts",
380 category: RelationCategory::Introspection,
381 },
382 Relation {
383 name: "mz_compute_error_counts_per_worker",
384 category: RelationCategory::Introspection,
385 },
386 Relation {
389 name: "mz_cluster_replica_metrics_history",
390 category: RelationCategory::Basic,
391 },
392 Relation {
393 name: "mz_webhook_sources",
394 category: RelationCategory::Basic,
395 },
396 Relation {
397 name: "mz_cluster_replica_history",
398 category: RelationCategory::Basic,
399 },
400 Relation {
401 name: "mz_source_statistics",
402 category: RelationCategory::Basic,
403 },
404 Relation {
405 name: "mz_cluster_deployment_lineage",
406 category: RelationCategory::Basic,
407 },
408 Relation {
409 name: "mz_show_indexes",
410 category: RelationCategory::Basic,
411 },
412 Relation {
413 name: "mz_relations",
414 category: RelationCategory::Basic,
415 },
416 Relation {
417 name: "mz_frontiers",
418 category: RelationCategory::Basic,
419 },
420 Relation {
421 name: "mz_console_cluster_utilization_overview",
422 category: RelationCategory::Basic,
423 },
424 Relation {
425 name: "mz_columns",
426 category: RelationCategory::Basic,
427 },
428 Relation {
429 name: "mz_kafka_sources",
430 category: RelationCategory::Basic,
431 },
432 Relation {
433 name: "mz_kafka_sinks",
434 category: RelationCategory::Basic,
435 },
436 Relation {
438 name: "daily_replica_credit_usage",
439 category: RelationCategory::Custom {
440 sql: r#"
441-- Capture replica create and drop events from the audit log
442with replica_events as (
443 select
444 details ->> 'replica_id' as replica_id,
445 details ->> 'replica_name' as replica_name,
446 details ->> 'cluster_name' as cluster_name,
447 details ->> 'logical_size' as replica_size,
448 event_type,
449 occurred_at
450 from mz_catalog.mz_audit_events
451 where
452 object_type = 'cluster-replica'
453 and event_type in ('create', 'drop')
454 and details ->> 'replica_id' like 'u%'
455),
456
457-- Pair each replica creation with its corresponding drop to determine lifespan
458replica_lifespans as (
459 select
460 c.replica_id,
461 c.replica_name,
462 c.cluster_name,
463 c.replica_size,
464 c.occurred_at as created_at,
465 min(d.occurred_at) as dropped_at
466 from replica_events as c
467 left join replica_events as d
468 on c.replica_id = d.replica_id
469 and d.event_type = 'drop'
470 and c.occurred_at < d.occurred_at
471 where c.event_type = 'create'
472 group by
473 c.replica_id,
474 c.replica_name,
475 c.cluster_name,
476 c.replica_size,
477 c.occurred_at
478),
479
480-- Break replica lifespans into per-day slices and compute seconds online per day
481daily_slices as (
482 select
483 l.replica_id,
484 l.replica_name,
485 l.cluster_name,
486 l.replica_size,
487 gs.day_start::date as usage_date,
488 greatest(
489 0::numeric,
490 extract(
491 epoch from
492 least(coalesce(l.dropped_at, now()), gs.day_start + interval '1 day')
493 - greatest(l.created_at, gs.day_start)
494 )::numeric
495 ) as seconds_online
496 from replica_lifespans as l
497 cross join lateral generate_series(
498 date_trunc('day', l.created_at),
499 date_trunc('day', coalesce(l.dropped_at, now())),
500 interval '1 day'
501 ) as gs(day_start)
502),
503
504-- Resolve replica heap limits from configured size definitions
505size_heap as (
506 select
507 size,
508 memory_bytes + disk_bytes as heap_limit
509 from mz_catalog.mz_cluster_replica_sizes
510 where disk_bytes is not null and disk_bytes > 0
511),
512
513-- Resolve replica heap limits from runtime metrics,
514-- since heap_limit is only discovered when the replica comes online
515metrics_heap as (
516 select distinct on (replica_id)
517 replica_id,
518 coalesce(heap_limit, memory_bytes + disk_bytes) as heap_limit
519 from mz_internal.mz_cluster_replica_metrics_history
520)
521
522-- Aggregate daily usage, report daily footprint, and compute
523-- M.1 credit equivalency (53 GiB-hour = 1.5 credits), billed per second
524select
525 s.usage_date,
526 mz_environment_id() as env_id,
527
528 sum(coalesce(sh.heap_limit, mh.heap_limit)) as total_heap_limit,
529
530 sum(coalesce(sh.heap_limit, mh.heap_limit) * s.seconds_online)
531 / (53.0 * 1024 * 1024 * 1024 * 3600.0)
532 * 1.5
533 as credit_equivalency
534from daily_slices as s
535left join size_heap as sh on s.replica_size = sh.size
536left join metrics_heap as mh on s.replica_id = mh.replica_id
537where s.seconds_online > 0
538group by s.usage_date
539order by s.usage_date;
540"#,
541 },
542 },
543];
544
/// Connect timeout for the PostgreSQL client, also used as the retry budget
/// when establishing the connection.
static PG_CONNECTION_TIMEOUT: Duration = Duration::from_secs(30);
/// Retry budget for dumping a single relation; when exhausted, the in-flight
/// server-side query is cancelled.
static PG_QUERY_TIMEOUT: Duration = Duration::from_secs(20);

/// After this many failed dumps attributed to one cluster replica, remaining
/// relations targeting that replica are skipped.
static MAX_CLUSTER_REPLICA_ERROR_COUNT: usize = 3;

/// Puts the catalog schemas on the search path so relations in `RELATIONS`
/// can be referenced by their unqualified names.
static SET_SEARCH_PATH_QUERY: &str = "SET search_path = mz_internal, mz_catalog, mz_introspection";
/// Enumerates every (cluster, replica) pair; used to find the targets for
/// per-replica introspection dumps.
static SELECT_CLUSTER_REPLICAS_QUERY: &str = "SELECT c.name as cluster_name, cr.name as replica_name FROM mz_clusters AS c JOIN mz_cluster_replicas AS cr ON c.id = cr.cluster_id;";
556
/// Identifies a single replica by cluster name and replica name.
///
/// `Eq + Hash` so it can key the per-replica error-count map in
/// `dump_all_relations`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ClusterReplica {
    pub cluster_name: String,
    pub replica_name: String,
}
562
563impl Default for ClusterReplica {
564 fn default() -> Self {
565 Self {
566 cluster_name: "mz_catalog_server".to_string(),
567 replica_name: "r1".to_string(),
568 }
569 }
570}
571
572impl fmt::Display for ClusterReplica {
573 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
574 write!(f, "{}.{}", self.cluster_name, self.replica_name)
575 }
576}
577
/// Dumps system catalog relations to CSV files under a base path.
pub struct SystemCatalogDumper {
    // Root directory; all output is written under `<base_path>/system_catalog`.
    base_path: PathBuf,
    // Shared client; a Mutex because transactions need exclusive (`&mut`) access.
    pg_client: Arc<Mutex<PgClient>>,
    // TLS connector kept around so in-flight queries can be cancelled.
    pg_tls: MakeTlsConnector,
    // Replicas discovered at construction; targets for introspection dumps.
    cluster_replicas: Vec<ClusterReplica>,
    // Keeps the task driving the connection's I/O alive for our lifetime.
    _pg_conn_handle: JoinHandle<Result<(), tokio_postgres::Error>>,
}
585
/// Establishes a TLS-capable PostgreSQL connection from a connection string.
///
/// Returns the client, the connection future — which the caller must spawn to
/// drive I/O — and the TLS connector, which is reused later for query
/// cancellation. Connection attempts are retried for up to
/// `PG_CONNECTION_TIMEOUT`.
pub async fn create_postgres_connection(
    connection_string: &str,
) -> Result<
    (
        PgClient,
        Connection<Socket, TlsStream<Socket>>,
        MakeTlsConnector,
    ),
    anyhow::Error,
> {
    let mut pg_config = PgConfig::from_str(connection_string)?;
    pg_config.connect_timeout(PG_CONNECTION_TIMEOUT);
    let tls = make_tls(&pg_config)?;

    let host_addr = pg_config.get_hosts().first();
    let port = pg_config.get_ports().first();

    // Log only host and port, never the full connection string — it may
    // contain credentials.
    let redacted_connection_string = if let (Some(host_addr), Some(port)) = (host_addr, port) {
        format!(" at {:?} on port {}", host_addr, port)
    } else {
        "".to_string()
    };

    info!(
        "Connecting to PostgreSQL server{}",
        redacted_connection_string
    );

    // Retry transient connect failures; the config and TLS connector are
    // cloned into each attempt's future.
    let (pg_client, pg_conn) = retry::Retry::default()
        .max_duration(PG_CONNECTION_TIMEOUT)
        .retry_async_canceling(|_| {
            let pg_config = pg_config.clone();
            let tls = tls.clone();
            async move { pg_config.connect(tls).await.map_err(|e| anyhow::anyhow!(e)) }
        })
        .await?;

    info!(
        "Connected to PostgreSQL server{}",
        redacted_connection_string
    );

    Ok((pg_client, pg_conn, tls))
}
631
/// Streams the output of a `COPY ... TO STDOUT` query into `file`.
///
/// `relation_name` is used only for error context. The file is fsync'd before
/// returning so a reported success means the bytes are on disk.
pub async fn write_copy_stream(
    transaction: &Transaction<'_>,
    copy_query: &str,
    file: &mut tokio::fs::File,
    relation_name: &str,
) -> Result<(), anyhow::Error> {
    // Map the COPY stream's errors into `std::io::Error` so the stream can
    // back a `StreamReader` (an `AsyncRead`).
    let copy_stream = transaction
        .copy_out(copy_query)
        .await
        .context(format!("Failed to COPY TO for {}", relation_name))?
        .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e));
    let copy_stream = std::pin::pin!(copy_stream);
    let mut reader = StreamReader::new(copy_stream);
    tokio::io::copy(&mut reader, file).await?;
    file.sync_all().await?;

    Ok::<(), anyhow::Error>(())
}
651
652pub async fn copy_relation_to_csv(
653 transaction: &Transaction<'_>,
654 file_path_name: PathBuf,
655 column_names: &Vec<String>,
656 relation: &Relation,
657) -> Result<(), anyhow::Error> {
658 let mut file = tokio::fs::File::create(&file_path_name).await?;
659
660 match relation.category {
661 RelationCategory::Retained => {
662 let mut writer = AsyncSerializer::from_writer(file);
663 writer.serialize(column_names).await?;
664
665 transaction
666 .execute(
667 &format!("DECLARE c CURSOR FOR SUBSCRIBE TO {}", relation.name),
668 &[],
669 )
670 .await
671 .context("Failed to declare cursor")?;
672
673 let rows = transaction
677 .simple_query("FETCH ALL FROM c WITH (TIMEOUT '1')")
680 .await
681 .context("Failed to fetch all from cursor")?;
682
683 for row in rows {
684 if let SimpleQueryMessage::Row(row) = row {
685 let values: Vec<&str> = (0..row.len())
686 .map(|i| row.get(i).unwrap_or("")) .collect();
688 writer.serialize(&values).await?;
689 }
690 }
691 }
692 _ => {
693 file.write_all((column_names.join(",") + "\n").as_bytes())
695 .await?;
696 let copy_query = format!(
697 "COPY (SELECT * FROM {}) TO STDOUT WITH (FORMAT CSV)",
698 relation.name
699 );
700 write_copy_stream(transaction, ©_query, &mut file, relation.name).await?;
701 }
702 };
703
704 info!("Copied {} to {}", relation.name, file_path_name.display());
705 Ok::<(), anyhow::Error>(())
706}
707
708pub async fn query_column_names(
709 pg_client: &PgClient,
710 relation: &Relation,
711) -> Result<Vec<String>, anyhow::Error> {
712 let relation_name = relation.name;
713 let mut column_names = pg_client
715 .query(&format!("SHOW COLUMNS FROM {}", &relation_name), &[])
716 .await
717 .context(format!("Failed to get column names for {}", relation_name))?
718 .into_iter()
719 .map(|row| match row.try_get::<_, String>("name") {
720 Ok(name) => Some(name),
721 Err(_) => None,
722 })
723 .filter_map(|row| row)
724 .collect::<Vec<_>>();
725
726 match relation.category {
727 RelationCategory::Retained => {
728 column_names.splice(0..0, ["mz_timestamp".to_string(), "mz_diff".to_string()]);
729 }
730 _ => (),
731 }
732
733 Ok(column_names)
734}
735
/// Dumps a single relation to CSV under `base_path`, optionally pinning the
/// transaction to a specific cluster replica first.
///
/// Output location depends on category: introspection relations are nested
/// under `<cluster>/<replica>`, retained-history dumps get a `_subscribe`
/// file-name suffix, and everything else lands in the dump root.
pub async fn query_relation(
    transaction: &Transaction<'_>,
    base_path: PathBuf,
    relation: &Relation,
    column_names: &Vec<String>,
    cluster_replica: Option<&ClusterReplica>,
) -> Result<(), anyhow::Error> {
    let relation_name = relation.name;
    let relation_category = &relation.category;

    // SET LOCAL scopes the cluster/replica choice to this transaction, so
    // subsequent queries read from the requested replica.
    if let Some(cluster_replica) = &cluster_replica {
        transaction
            .execute(
                &format!(
                    "SET LOCAL CLUSTER = {}",
                    escaped_string_literal(&cluster_replica.cluster_name)
                ),
                &[],
            )
            .await
            .context(format!(
                "Failed to set cluster to {}",
                cluster_replica.cluster_name
            ))?;
        transaction
            .execute(
                &format!(
                    "SET LOCAL CLUSTER_REPLICA = {}",
                    escaped_string_literal(&cluster_replica.replica_name)
                ),
                &[],
            )
            .await
            .context(format!(
                "Failed to set cluster replica to {}",
                cluster_replica.replica_name
            ))?;
    }

    match relation_category {
        RelationCategory::Basic | RelationCategory::Custom { .. } => {
            // Replica-independent output goes directly in the dump root.
            let file_path = format_file_path(base_path, None);
            let file_path_name = file_path.join(relation_name).with_extension("csv");
            tokio::fs::create_dir_all(&file_path).await?;

            copy_relation_to_csv(transaction, file_path_name, column_names, relation).await?;
        }
        RelationCategory::Introspection => {
            // Per-replica output is nested under <cluster>/<replica>.
            let file_path = format_file_path(base_path, cluster_replica);
            tokio::fs::create_dir_all(&file_path).await?;

            let file_path_name = file_path.join(relation_name).with_extension("csv");

            copy_relation_to_csv(transaction, file_path_name, column_names, relation).await?;
        }
        RelationCategory::Retained => {
            // `_subscribe` suffix keeps the retained-history dump from
            // colliding with a plain dump of the same relation.
            let file_path = format_file_path(base_path, None);
            let file_path_name = file_path
                .join(format!("{}_subscribe", relation_name))
                .with_extension("csv");
            tokio::fs::create_dir_all(&file_path).await?;

            copy_relation_to_csv(transaction, file_path_name, column_names, relation).await?;
        }
    }
    Ok::<(), anyhow::Error>(())
}
805
impl SystemCatalogDumper {
    /// Connects to `connection_url`, sets the catalog search path, and
    /// discovers the cluster replicas to target for introspection dumps.
    ///
    /// Replica discovery failures are logged and tolerated; the dump then
    /// proceeds without per-replica introspection output.
    pub async fn new(connection_url: &str, base_path: PathBuf) -> Result<Self, anyhow::Error> {
        let (pg_client, pg_conn, pg_tls) = create_postgres_connection(connection_url).await?;

        // The connection future must be driven for the client to make
        // progress; keep the handle so the task lives as long as the dumper.
        let handle = task::spawn(|| "postgres-connection", pg_conn);

        pg_client
            .execute(SET_SEARCH_PATH_QUERY, &[])
            .await
            .context("Failed to set search path")?;

        let cluster_replicas = match pg_client.query(SELECT_CLUSTER_REPLICAS_QUERY, &[]).await {
            Ok(rows) => rows
                .into_iter()
                .map(|row| {
                    let cluster_name = row.try_get::<_, String>("cluster_name");
                    let replica_name = row.try_get::<_, String>("replica_name");

                    // Skip rows whose columns fail to decode rather than
                    // aborting the whole dump.
                    if let (Ok(cluster_name), Ok(replica_name)) = (cluster_name, replica_name) {
                        Some(ClusterReplica {
                            cluster_name,
                            replica_name,
                        })
                    } else {
                        None
                    }
                })
                .filter_map(|row| row)
                .collect::<Vec<_>>(),
            Err(e) => {
                warn!("Failed to get replica names: {}", e);
                vec![]
            }
        };

        Ok(Self {
            base_path,
            pg_client: Arc::new(Mutex::new(pg_client)),
            pg_tls,
            cluster_replicas,
            _pg_conn_handle: handle,
        })
    }

    /// Dumps one relation to CSV, retrying failures for up to
    /// `PG_QUERY_TIMEOUT` and cancelling the server-side query when the retry
    /// budget is exhausted.
    pub async fn dump_relation(
        &self,
        relation: &Relation,
        cluster_replica: Option<&ClusterReplica>,
    ) -> Result<(), anyhow::Error> {
        info!(
            "Copying relation {}{}{}",
            relation.name,
            match relation.category {
                RelationCategory::Retained => " (subscribe history)",
                _ => "",
            },
            cluster_replica.map_or_else(|| "".to_string(), |replica| format!(" in {}", replica))
        );

        let base_path = self.base_path.clone();
        let pg_client = &self.pg_client;

        let relation_name = relation.name.to_string();

        // Custom relations are materialized as a temporary view (named after
        // the relation) so the generic dump path can `SELECT *` from them.
        if let RelationCategory::Custom { sql } = &relation.category {
            let pg_client_lock = pg_client.lock().await;
            pg_client_lock
                .execute(
                    &format!(
                        "CREATE OR REPLACE TEMPORARY VIEW {} AS {}",
                        relation.name, sql
                    ),
                    &[],
                )
                .await
                .context(format!(
                    "Failed to create temporary view for {}",
                    relation.name
                ))?;
        }

        if let Err(err) = retry::Retry::default()
            .max_duration(PG_QUERY_TIMEOUT)
            .initial_backoff(Duration::from_secs(2))
            .retry_async_canceling(|_| {
                let base_path = base_path.clone();
                let relation_name = relation.name;
                let cluster_replica = cluster_replica.clone();

                async move {
                    let mut pg_client = pg_client.lock().await;

                    match async {
                        let column_names = query_column_names(&pg_client, relation).await?;

                        // Run the dump inside a transaction so the SET LOCAL
                        // CLUSTER / CLUSTER_REPLICA settings don't leak to
                        // later queries on the shared client.
                        let transaction = pg_client.transaction().await?;
                        query_relation(
                            &transaction,
                            base_path,
                            relation,
                            &column_names,
                            cluster_replica,
                        )
                        .await?;

                        Ok::<(), anyhow::Error>(())
                    }
                    .await
                    {
                        Ok(()) => Ok(()),
                        Err(err) => {
                            warn!(
                                "{}: {:#}. Retrying...",
                                format_catalog_dump_error_message(relation_name, cluster_replica),
                                err
                            );
                            Err(err)
                        }
                    }
                }
            })
            .await
        {
            // Retry budget exhausted: cancel the in-flight server-side query
            // so it doesn't keep consuming resources after we give up.
            let pg_client_lock = pg_client.lock().await;

            let cancel_token = pg_client_lock.cancel_token();

            if let Err(_) = async {
                let tls = self.pg_tls.clone();

                cancel_token.cancel_query(tls).await?;
                Ok::<(), anyhow::Error>(())
            }
            .await
            {
                warn!(
                    "Failed to cancel query for {}{}",
                    relation_name,
                    cluster_replica
                        .map_or_else(|| "".to_string(), |replica| format!(" for {}", replica))
                );
            }

            return Err(err);
        }

        Ok(())
    }

    /// Dumps every relation in `RELATIONS`, continuing past individual
    /// failures. Replica-independent relations are dumped once; introspection
    /// relations are dumped once per cluster replica.
    pub async fn dump_all_relations(&self) {
        let cluster_replicas = &self.cluster_replicas;

        // Per-replica failure counters. A replica whose count reaches
        // MAX_CLUSTER_REPLICA_ERROR_COUNT is skipped for the rest of the run.
        let mut cluster_replica_error_counts: HashMap<ClusterReplica, usize> = HashMap::new();
        for replica in cluster_replicas {
            cluster_replica_error_counts
                .entry(replica.clone())
                .insert_entry(0);
        }

        // Relations dumped once, independent of any replica.
        let non_introspection_iter = RELATIONS
            .iter()
            .filter(|relation| {
                matches!(
                    relation.category,
                    RelationCategory::Basic
                        | RelationCategory::Retained
                        | RelationCategory::Custom { .. }
                )
            })
            .map(|relation| (relation, None::<&ClusterReplica>));

        let introspection_iter = RELATIONS
            .iter()
            .filter(|relation| matches!(relation.category, RelationCategory::Introspection))
            .collect::<Vec<_>>();

        // Cross product: each introspection relation is dumped on every
        // discovered replica.
        let introspection_iter = cluster_replicas.iter().flat_map(|replica| {
            introspection_iter
                .iter()
                .map(move |relation| (*relation, Some(replica)))
        });

        for (relation, replica) in non_introspection_iter.chain(introspection_iter) {
            // Replica-independent dumps are accounted against the default
            // (catalog server) replica.
            let replica_key = if let Some(replica) = replica {
                replica
            } else {
                &ClusterReplica::default()
            };

            if cluster_replica_error_counts.get(replica_key).unwrap_or(&0)
                >= &MAX_CLUSTER_REPLICA_ERROR_COUNT
            {
                info!(
                    "Skipping {}{}",
                    relation.name,
                    replica.map_or_else(|| "".to_string(), |replica| format!(" for {}", replica))
                );
                continue;
            }

            if let Err(err) = self.dump_relation(relation, replica).await {
                warn!(
                    "{}: {:#}.",
                    format_catalog_dump_error_message(relation.name, replica),
                    err,
                );

                // Timeouts usually indicate an under-provisioned cluster;
                // point the user at the relevant docs.
                if err.to_string().contains("deadline has elapsed") {
                    let docs_link = if replica.is_none()
                        || replica.map_or(false, |r| r.cluster_name == "mz_catalog_server")
                    {
                        "https://materialize.com/docs/installation/troubleshooting/#troubleshooting-console-unresponsiveness"
                    } else {
                        "https://materialize.com/docs/sql/alter-cluster/#resizing-1"
                    };
                    warn!("Consider increasing the size of the cluster {}", docs_link);
                }

                let is_missing_catalog_item_err = match err.downcast_ref::<tokio_postgres::Error>()
                {
                    Some(pg_err) => pg_err
                        .to_string()
                        .to_lowercase()
                        .contains("unknown catalog item"),
                    None => false,
                };

                // A missing catalog item is a property of the relation, not
                // the replica, so it doesn't count toward the replica's
                // error budget.
                if !is_missing_catalog_item_err {
                    cluster_replica_error_counts
                        .entry(replica_key.clone())
                        .and_modify(|count| *count += 1)
                        .or_insert(1);
                }
            }
        }
    }
}
1061
1062fn format_catalog_dump_error_message(
1063 relation_name: &str,
1064 cluster_replica: Option<&ClusterReplica>,
1065) -> String {
1066 format!(
1067 "Failed to dump relation {}{}",
1068 relation_name,
1069 cluster_replica.map_or_else(|| "".to_string(), |replica| format!(" for {}", replica))
1070 )
1071}
1072
1073fn format_file_path(base_path: PathBuf, cluster_replica: Option<&ClusterReplica>) -> PathBuf {
1074 let path = base_path.join(SYSTEM_CATALOG_DUMP_DIR);
1075 if let Some(cluster_replica) = cluster_replica {
1076 path.join(cluster_replica.cluster_name.as_str())
1077 .join(cluster_replica.replica_name.as_str())
1078 } else {
1079 path
1080 }
1081}