1use anyhow::{Context as _, Result};
22use csv_async::AsyncSerializer;
23use futures::TryStreamExt;
24use mz_tls_util::make_tls;
25use std::fmt;
26use std::path::PathBuf;
27use std::str::FromStr;
28use std::sync::Arc;
29use std::time::Duration;
30use tokio::io::AsyncWriteExt;
31use tokio::sync::Mutex;
32use tokio_postgres::{
33 Client as PgClient, Config as PgConfig, Connection, SimpleQueryMessage, Socket, Transaction,
34};
35use tokio_util::io::StreamReader;
36
37use mz_ore::collections::HashMap;
38use mz_ore::retry::{self};
39use mz_ore::task::{self, JoinHandle};
40use postgres_openssl::{MakeTlsConnector, TlsStream};
41use tracing::{info, warn};
42
/// Selects how a relation is exported and where its CSV file is written.
#[derive(Debug, Clone)]
pub enum RelationCategory {
    /// Exported once per known cluster replica; the CSV is written into a
    /// `<cluster_name>/<replica_name>` subdirectory of the dump directory.
    Introspection,
    /// Exported by reading from a `SUBSCRIBE` cursor; the CSV is written to
    /// `<name>_subscribe.csv` and is prefixed with `mz_timestamp` and
    /// `mz_diff` columns.
    Retained,
    /// Exported once with `COPY ... TO STDOUT`; the CSV is written to
    /// `<name>.csv` directly in the dump directory.
    Basic,
}
53
/// A single system catalog relation to export.
#[derive(Debug, Clone)]
pub struct Relation {
    /// Unqualified relation name; it is interpolated into the SQL statements
    /// and used as the CSV file stem.
    pub name: &'static str,
    /// Determines the export strategy and output location.
    pub category: RelationCategory,
}
59
60static SYSTEM_CATALOG_DUMP_DIR: &str = "system_catalog";
61static RELATIONS: &[Relation] = &[
64 Relation {
66 name: "mz_audit_events",
67 category: RelationCategory::Basic,
68 },
69 Relation {
70 name: "mz_databases",
71 category: RelationCategory::Basic,
72 },
73 Relation {
74 name: "mz_schemas",
75 category: RelationCategory::Basic,
76 },
77 Relation {
78 name: "mz_tables",
79 category: RelationCategory::Basic,
80 },
81 Relation {
82 name: "mz_sources",
83 category: RelationCategory::Basic,
84 },
85 Relation {
86 name: "mz_sinks",
87 category: RelationCategory::Basic,
88 },
89 Relation {
90 name: "mz_views",
91 category: RelationCategory::Basic,
92 },
93 Relation {
94 name: "mz_materialized_views",
95 category: RelationCategory::Basic,
96 },
97 Relation {
98 name: "mz_secrets",
99 category: RelationCategory::Basic,
100 },
101 Relation {
102 name: "mz_connections",
103 category: RelationCategory::Basic,
104 },
105 Relation {
106 name: "mz_roles",
107 category: RelationCategory::Basic,
108 },
109 Relation {
110 name: "mz_subscriptions",
111 category: RelationCategory::Basic,
112 },
113 Relation {
114 name: "mz_object_fully_qualified_names",
115 category: RelationCategory::Basic,
116 },
117 Relation {
118 name: "mz_sessions",
119 category: RelationCategory::Basic,
120 },
121 Relation {
122 name: "mz_object_history",
123 category: RelationCategory::Basic,
124 },
125 Relation {
126 name: "mz_object_lifetimes",
127 category: RelationCategory::Basic,
128 },
129 Relation {
130 name: "mz_object_dependencies",
131 category: RelationCategory::Basic,
132 },
133 Relation {
134 name: "mz_object_transitive_dependencies",
135 category: RelationCategory::Basic,
136 },
137 Relation {
139 name: "mz_clusters",
140 category: RelationCategory::Basic,
141 },
142 Relation {
143 name: "mz_indexes",
144 category: RelationCategory::Basic,
145 },
146 Relation {
147 name: "mz_cluster_replicas",
148 category: RelationCategory::Basic,
149 },
150 Relation {
151 name: "mz_cluster_replica_sizes",
152 category: RelationCategory::Basic,
153 },
154 Relation {
155 name: "mz_cluster_replica_statuses",
156 category: RelationCategory::Basic,
157 },
158 Relation {
159 name: "mz_cluster_replica_metrics_history",
160 category: RelationCategory::Basic,
161 },
162 Relation {
163 name: "mz_compute_hydration_times",
164 category: RelationCategory::Retained,
165 },
166 Relation {
167 name: "mz_materialization_dependencies",
168 category: RelationCategory::Basic,
169 },
170 Relation {
171 name: "mz_cluster_replica_status_history",
172 category: RelationCategory::Basic,
173 },
174 Relation {
176 name: "mz_wallclock_global_lag_recent_history",
177 category: RelationCategory::Basic,
178 },
179 Relation {
180 name: "mz_global_frontiers",
181 category: RelationCategory::Basic,
182 },
183 Relation {
184 name: "mz_cluster_replica_frontiers",
185 category: RelationCategory::Basic,
186 },
187 Relation {
188 name: "mz_materialization_lag",
189 category: RelationCategory::Basic,
190 },
191 Relation {
193 name: "mz_source_statistics_with_history",
194 category: RelationCategory::Basic,
195 },
196 Relation {
197 name: "mz_source_statistics_with_history",
198 category: RelationCategory::Retained,
199 },
200 Relation {
201 name: "mz_sink_statistics",
202 category: RelationCategory::Basic,
203 },
204 Relation {
205 name: "mz_sink_statistics",
206 category: RelationCategory::Retained,
207 },
208 Relation {
209 name: "mz_source_statuses",
210 category: RelationCategory::Basic,
211 },
212 Relation {
213 name: "mz_sink_statuses",
214 category: RelationCategory::Basic,
215 },
216 Relation {
217 name: "mz_source_status_history",
218 category: RelationCategory::Basic,
219 },
220 Relation {
221 name: "mz_sink_status_history",
222 category: RelationCategory::Basic,
223 },
224 Relation {
226 name: "mz_materialized_view_refresh_strategies",
227 category: RelationCategory::Basic,
228 },
229 Relation {
230 name: "mz_cluster_schedules",
231 category: RelationCategory::Basic,
232 },
233 Relation {
235 name: "mz_recent_storage_usage",
236 category: RelationCategory::Basic,
237 },
238 Relation {
240 name: "mz_arrangement_sharing_per_worker",
241 category: RelationCategory::Introspection,
242 },
243 Relation {
244 name: "mz_arrangement_sharing",
245 category: RelationCategory::Introspection,
246 },
247 Relation {
248 name: "mz_arrangement_sizes_per_worker",
249 category: RelationCategory::Introspection,
250 },
251 Relation {
252 name: "mz_dataflow_channels",
253 category: RelationCategory::Introspection,
254 },
255 Relation {
256 name: "mz_dataflow_operators",
257 category: RelationCategory::Introspection,
258 },
259 Relation {
260 name: "mz_dataflow_global_ids",
261 category: RelationCategory::Introspection,
262 },
263 Relation {
264 name: "mz_dataflow_operator_dataflows_per_worker",
265 category: RelationCategory::Introspection,
266 },
267 Relation {
268 name: "mz_dataflow_operator_dataflows",
269 category: RelationCategory::Introspection,
270 },
271 Relation {
272 name: "mz_dataflow_operator_parents_per_worker",
273 category: RelationCategory::Introspection,
274 },
275 Relation {
276 name: "mz_dataflow_operator_parents",
277 category: RelationCategory::Introspection,
278 },
279 Relation {
280 name: "mz_compute_exports",
281 category: RelationCategory::Introspection,
282 },
283 Relation {
284 name: "mz_dataflow_arrangement_sizes",
285 category: RelationCategory::Introspection,
286 },
287 Relation {
288 name: "mz_expected_group_size_advice",
289 category: RelationCategory::Introspection,
290 },
291 Relation {
292 name: "mz_compute_frontiers",
293 category: RelationCategory::Introspection,
294 },
295 Relation {
296 name: "mz_dataflow_channel_operators_per_worker",
297 category: RelationCategory::Introspection,
298 },
299 Relation {
300 name: "mz_dataflow_channel_operators",
301 category: RelationCategory::Introspection,
302 },
303 Relation {
304 name: "mz_compute_import_frontiers",
305 category: RelationCategory::Introspection,
306 },
307 Relation {
308 name: "mz_message_counts_per_worker",
309 category: RelationCategory::Introspection,
310 },
311 Relation {
312 name: "mz_message_counts",
313 category: RelationCategory::Introspection,
314 },
315 Relation {
316 name: "mz_active_peeks",
317 category: RelationCategory::Introspection,
318 },
319 Relation {
320 name: "mz_compute_operator_durations_histogram_per_worker",
321 category: RelationCategory::Introspection,
322 },
323 Relation {
324 name: "mz_compute_operator_durations_histogram",
325 category: RelationCategory::Introspection,
326 },
327 Relation {
328 name: "mz_records_per_dataflow_operator_per_worker",
329 category: RelationCategory::Introspection,
330 },
331 Relation {
332 name: "mz_records_per_dataflow_operator",
333 category: RelationCategory::Introspection,
334 },
335 Relation {
336 name: "mz_records_per_dataflow_per_worker",
337 category: RelationCategory::Introspection,
338 },
339 Relation {
340 name: "mz_records_per_dataflow",
341 category: RelationCategory::Introspection,
342 },
343 Relation {
344 name: "mz_peek_durations_histogram_per_worker",
345 category: RelationCategory::Introspection,
346 },
347 Relation {
348 name: "mz_peek_durations_histogram",
349 category: RelationCategory::Introspection,
350 },
351 Relation {
352 name: "mz_dataflow_shutdown_durations_histogram_per_worker",
353 category: RelationCategory::Introspection,
354 },
355 Relation {
356 name: "mz_dataflow_shutdown_durations_histogram",
357 category: RelationCategory::Introspection,
358 },
359 Relation {
360 name: "mz_scheduling_elapsed_per_worker",
361 category: RelationCategory::Introspection,
362 },
363 Relation {
364 name: "mz_scheduling_elapsed",
365 category: RelationCategory::Introspection,
366 },
367 Relation {
368 name: "mz_scheduling_parks_histogram_per_worker",
369 category: RelationCategory::Introspection,
370 },
371 Relation {
372 name: "mz_scheduling_parks_histogram",
373 category: RelationCategory::Introspection,
374 },
375 Relation {
376 name: "mz_compute_lir_mapping_per_worker",
377 category: RelationCategory::Introspection,
378 },
379 Relation {
380 name: "mz_lir_mapping",
381 category: RelationCategory::Introspection,
382 },
383 Relation {
384 name: "mz_compute_error_counts",
385 category: RelationCategory::Introspection,
386 },
387 Relation {
388 name: "mz_compute_error_counts_per_worker",
389 category: RelationCategory::Introspection,
390 },
391 Relation {
394 name: "mz_cluster_replica_metrics_history",
395 category: RelationCategory::Basic,
396 },
397 Relation {
398 name: "mz_webhook_sources",
399 category: RelationCategory::Basic,
400 },
401 Relation {
402 name: "mz_cluster_replica_history",
403 category: RelationCategory::Basic,
404 },
405 Relation {
406 name: "mz_source_statistics",
407 category: RelationCategory::Basic,
408 },
409 Relation {
410 name: "mz_cluster_deployment_lineage",
411 category: RelationCategory::Basic,
412 },
413 Relation {
414 name: "mz_show_indexes",
415 category: RelationCategory::Basic,
416 },
417 Relation {
418 name: "mz_relations",
419 category: RelationCategory::Basic,
420 },
421 Relation {
422 name: "mz_frontiers",
423 category: RelationCategory::Basic,
424 },
425 Relation {
426 name: "mz_console_cluster_utilization_overview",
427 category: RelationCategory::Basic,
428 },
429 Relation {
430 name: "mz_columns",
431 category: RelationCategory::Basic,
432 },
433 Relation {
434 name: "mz_kafka_sources",
435 category: RelationCategory::Basic,
436 },
437 Relation {
438 name: "mz_kafka_sinks",
439 category: RelationCategory::Basic,
440 },
441];
442
/// Used both as the connect timeout on the Postgres config and as the total
/// retry budget while establishing the connection.
static PG_CONNECTION_TIMEOUT: Duration = Duration::from_secs(30);
/// Total retry budget for exporting a single relation.
static PG_QUERY_TIMEOUT: Duration = Duration::from_secs(20);

/// Once this many counted failures have accumulated for a cluster replica,
/// the remaining relations for that replica are skipped.
static MAX_CLUSTER_REPLICA_ERROR_COUNT: usize = 3;

/// Executed once after connecting so relation names can be used unqualified.
static SET_SEARCH_PATH_QUERY: &str = "SET search_path = mz_internal, mz_catalog, mz_introspection";
/// Lists every (cluster name, replica name) pair known to the catalog.
static SELECT_CLUSTER_REPLICAS_QUERY: &str = "SELECT c.name as cluster_name, cr.name as replica_name FROM mz_clusters AS c JOIN mz_cluster_replicas AS cr ON c.id = cr.cluster_id;";
454
/// Identifies one replica of one cluster by name.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ClusterReplica {
    /// Name of the cluster the replica belongs to.
    pub cluster_name: String,
    /// Name of the replica within that cluster.
    pub replica_name: String,
}

impl Default for ClusterReplica {
    /// Returns the `mz_catalog_server.r1` replica.
    fn default() -> Self {
        let cluster_name = String::from("mz_catalog_server");
        let replica_name = String::from("r1");
        Self {
            cluster_name,
            replica_name,
        }
    }
}

impl fmt::Display for ClusterReplica {
    /// Renders the replica as `<cluster_name>.<replica_name>`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(&self.cluster_name)?;
        f.write_str(".")?;
        f.write_str(&self.replica_name)
    }
}
475
/// Exports system catalog relations to CSV files over a single Postgres
/// connection.
pub struct SystemCatalogDumper {
    /// Root directory; files are written below `<base_path>/system_catalog`.
    base_path: PathBuf,
    /// The one client used for every query, behind an async mutex.
    pg_client: Arc<Mutex<PgClient>>,
    /// TLS connector, cloned when a query cancellation request is sent.
    pg_tls: MakeTlsConnector,
    /// Cluster replicas discovered at construction time; may be empty if the
    /// discovery query failed.
    cluster_replicas: Vec<ClusterReplica>,
    /// Handle of the spawned task that drives the connection; stored so it
    /// lives as long as the dumper.
    _pg_conn_handle: JoinHandle<Result<(), tokio_postgres::Error>>,
}
483
484pub async fn create_postgres_connection(
485 connection_string: &str,
486) -> Result<
487 (
488 PgClient,
489 Connection<Socket, TlsStream<Socket>>,
490 MakeTlsConnector,
491 ),
492 anyhow::Error,
493> {
494 let mut pg_config = PgConfig::from_str(connection_string)?;
495 pg_config.connect_timeout(PG_CONNECTION_TIMEOUT);
496 let tls = make_tls(&pg_config)?;
497
498 let host_addr = pg_config.get_hosts().first();
499 let port = pg_config.get_ports().first();
500
501 let redacted_connection_string = if let (Some(host_addr), Some(port)) = (host_addr, port) {
503 format!(" at {:?} on port {}", host_addr, port)
504 } else {
505 "".to_string()
506 };
507
508 info!(
509 "Connecting to PostgreSQL server{}",
510 redacted_connection_string
511 );
512
513 let (pg_client, pg_conn) = retry::Retry::default()
514 .max_duration(PG_CONNECTION_TIMEOUT)
515 .retry_async_canceling(|_| {
516 let pg_config = pg_config.clone();
517 let tls = tls.clone();
518 async move { pg_config.connect(tls).await.map_err(|e| anyhow::anyhow!(e)) }
519 })
520 .await?;
521
522 info!(
523 "Connected to PostgreSQL server{}",
524 redacted_connection_string
525 );
526
527 Ok((pg_client, pg_conn, tls))
528}
529
530pub async fn write_copy_stream(
531 transaction: &Transaction<'_>,
532 copy_query: &str,
533 file: &mut tokio::fs::File,
534 relation_name: &str,
535) -> Result<(), anyhow::Error> {
536 let copy_stream = transaction
537 .copy_out(copy_query)
538 .await
539 .context(format!("Failed to COPY TO for {}", relation_name))?
540 .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e));
541 let copy_stream = std::pin::pin!(copy_stream);
542 let mut reader = StreamReader::new(copy_stream);
543 tokio::io::copy(&mut reader, file).await?;
544 file.sync_all().await?;
546
547 Ok::<(), anyhow::Error>(())
548}
549
550pub async fn copy_relation_to_csv(
551 transaction: &Transaction<'_>,
552 file_path_name: PathBuf,
553 column_names: &Vec<String>,
554 relation: &Relation,
555) -> Result<(), anyhow::Error> {
556 let mut file = tokio::fs::File::create(&file_path_name).await?;
557
558 match relation.category {
559 RelationCategory::Retained => {
560 let mut writer = AsyncSerializer::from_writer(file);
561 writer.serialize(column_names).await?;
562
563 transaction
564 .execute(
565 &format!("DECLARE c CURSOR FOR SUBSCRIBE TO {}", relation.name),
566 &[],
567 )
568 .await
569 .context("Failed to declare cursor")?;
570
571 let rows = transaction
575 .simple_query("FETCH ALL FROM c WITH (TIMEOUT '1')")
578 .await
579 .context("Failed to fetch all from cursor")?;
580
581 for row in rows {
582 if let SimpleQueryMessage::Row(row) = row {
583 let values: Vec<&str> = (0..row.len())
584 .map(|i| row.get(i).unwrap_or("")) .collect();
586 writer.serialize(&values).await?;
587 }
588 }
589 }
590 _ => {
591 file.write_all((column_names.join(",") + "\n").as_bytes())
593 .await?;
594 let copy_query = format!(
595 "COPY (SELECT * FROM {}) TO STDOUT WITH (FORMAT CSV)",
596 relation.name
597 );
598 write_copy_stream(transaction, ©_query, &mut file, relation.name).await?;
599 }
600 };
601
602 info!("Copied {} to {}", relation.name, file_path_name.display());
603 Ok::<(), anyhow::Error>(())
604}
605
606pub async fn query_column_names(
607 pg_client: &PgClient,
608 relation: &Relation,
609) -> Result<Vec<String>, anyhow::Error> {
610 let relation_name = relation.name;
611 let mut column_names = pg_client
613 .query(&format!("SHOW COLUMNS FROM {}", &relation_name), &[])
614 .await
615 .context(format!("Failed to get column names for {}", relation_name))?
616 .into_iter()
617 .map(|row| match row.try_get::<_, String>("name") {
618 Ok(name) => Some(name),
619 Err(_) => None,
620 })
621 .filter_map(|row| row)
622 .collect::<Vec<_>>();
623
624 match relation.category {
625 RelationCategory::Retained => {
626 column_names.splice(0..0, ["mz_timestamp".to_string(), "mz_diff".to_string()]);
627 }
628 _ => (),
629 }
630
631 Ok(column_names)
632}
633
634pub async fn query_relation(
635 transaction: &Transaction<'_>,
636 base_path: PathBuf,
637 relation: &Relation,
638 column_names: &Vec<String>,
639 cluster_replica: Option<&ClusterReplica>,
640) -> Result<(), anyhow::Error> {
641 let relation_name = relation.name;
642 let relation_category = &relation.category;
643
644 if let Some(cluster_replica) = &cluster_replica {
646 transaction
647 .execute(
648 &format!("SET LOCAL CLUSTER = '{}'", cluster_replica.cluster_name),
649 &[],
650 )
651 .await
652 .context(format!(
653 "Failed to set cluster to {}",
654 cluster_replica.cluster_name
655 ))?;
656 transaction
657 .execute(
658 &format!(
659 "SET LOCAL CLUSTER_REPLICA = '{}'",
660 cluster_replica.replica_name
661 ),
662 &[],
663 )
664 .await
665 .context(format!(
666 "Failed to set cluster replica to {}",
667 cluster_replica.replica_name
668 ))?;
669 }
670
671 match relation_category {
672 RelationCategory::Basic => {
673 let file_path = format_file_path(base_path, None);
674 let file_path_name = file_path.join(relation_name).with_extension("csv");
675 tokio::fs::create_dir_all(&file_path).await?;
676
677 copy_relation_to_csv(transaction, file_path_name, column_names, relation).await?;
678 }
679 RelationCategory::Introspection => {
680 let file_path = format_file_path(base_path, cluster_replica);
681 tokio::fs::create_dir_all(&file_path).await?;
682
683 let file_path_name = file_path.join(relation_name).with_extension("csv");
684
685 copy_relation_to_csv(transaction, file_path_name, column_names, relation).await?;
686 }
687 RelationCategory::Retained => {
688 let file_path = format_file_path(base_path, None);
690 let file_path_name = file_path
691 .join(format!("{}_subscribe", relation_name))
692 .with_extension("csv");
693 tokio::fs::create_dir_all(&file_path).await?;
694
695 copy_relation_to_csv(transaction, file_path_name, column_names, relation).await?;
696 }
697 }
698 Ok::<(), anyhow::Error>(())
699}
700
701impl SystemCatalogDumper {
702 pub async fn new(connection_url: &str, base_path: PathBuf) -> Result<Self, anyhow::Error> {
703 let (pg_client, pg_conn, pg_tls) = create_postgres_connection(connection_url).await?;
704
705 let handle = task::spawn(|| "postgres-connection", pg_conn);
706
707 pg_client
709 .execute(SET_SEARCH_PATH_QUERY, &[])
710 .await
711 .context("Failed to set search path")?;
712
713 let cluster_replicas = match pg_client.query(SELECT_CLUSTER_REPLICAS_QUERY, &[]).await {
715 Ok(rows) => rows
716 .into_iter()
717 .map(|row| {
718 let cluster_name = row.try_get::<_, String>("cluster_name");
719 let replica_name = row.try_get::<_, String>("replica_name");
720
721 if let (Ok(cluster_name), Ok(replica_name)) = (cluster_name, replica_name) {
722 Some(ClusterReplica {
723 cluster_name,
724 replica_name,
725 })
726 } else {
727 None
728 }
729 })
730 .filter_map(|row| row)
731 .collect::<Vec<_>>(),
732 Err(e) => {
733 warn!("Failed to get replica names: {}", e);
734 vec![]
735 }
736 };
737
738 Ok(Self {
739 base_path,
740 pg_client: Arc::new(Mutex::new(pg_client)),
741 pg_tls,
742 cluster_replicas,
743 _pg_conn_handle: handle,
744 })
745 }
746
747 pub async fn dump_relation(
748 &self,
749 relation: &Relation,
750 cluster_replica: Option<&ClusterReplica>,
751 ) -> Result<(), anyhow::Error> {
752 info!(
753 "Copying relation {}{}{}",
754 relation.name,
755 match relation.category {
756 RelationCategory::Retained => " (subscribe history)",
757 _ => "",
758 },
759 cluster_replica.map_or_else(|| "".to_string(), |replica| format!(" in {}", replica))
760 );
761
762 let base_path = self.base_path.clone();
763 let pg_client = &self.pg_client;
764
765 let relation_name = relation.name.to_string();
766
767 if let Err(err) = retry::Retry::default()
768 .max_duration(PG_QUERY_TIMEOUT)
769 .initial_backoff(Duration::from_secs(2))
770 .retry_async_canceling(|_| {
771 let base_path = base_path.clone();
772 let relation_name = relation.name;
773 let cluster_replica = cluster_replica.clone();
774
775 async move {
776 let mut pg_client = pg_client.lock().await;
778
779 match async {
780 let column_names = query_column_names(&pg_client, relation).await?;
783
784 let transaction = pg_client.transaction().await?;
785 query_relation(
786 &transaction,
787 base_path,
788 relation,
789 &column_names,
790 cluster_replica,
791 )
792 .await?;
793
794 Ok::<(), anyhow::Error>(())
795 }
796 .await
797 {
798 Ok(()) => Ok(()),
799 Err(err) => {
800 warn!(
801 "{}: {:#}. Retrying...",
802 format_catalog_dump_error_message(relation_name, cluster_replica),
803 err
804 );
805 Err(err)
806 }
807 }
808 }
809 })
810 .await
811 {
812 let pg_client_lock = pg_client.lock().await;
813
814 let cancel_token = pg_client_lock.cancel_token();
815
816 if let Err(_) = async {
817 let tls = self.pg_tls.clone();
818
819 cancel_token.cancel_query(tls).await?;
820 Ok::<(), anyhow::Error>(())
821 }
822 .await
823 {
824 warn!(
825 "Failed to cancel query for {}{}",
826 relation_name,
827 cluster_replica
828 .map_or_else(|| "".to_string(), |replica| format!(" for {}", replica))
829 );
830 }
831
832 return Err(err);
833 }
834
835 Ok(())
836 }
837
838 pub async fn dump_all_relations(&self) {
839 let cluster_replicas = &self.cluster_replicas;
840
841 let mut cluster_replica_error_counts: HashMap<ClusterReplica, usize> = HashMap::new();
843 for replica in cluster_replicas {
844 cluster_replica_error_counts
845 .entry(replica.clone())
846 .insert_entry(0);
847 }
848
849 let non_introspection_iter = RELATIONS
850 .iter()
851 .filter(|relation| {
852 matches!(
853 relation.category,
854 RelationCategory::Basic | RelationCategory::Retained
855 )
856 })
857 .map(|relation| (relation, None::<&ClusterReplica>));
858
859 let introspection_iter = RELATIONS
860 .iter()
861 .filter(|relation| matches!(relation.category, RelationCategory::Introspection))
862 .collect::<Vec<_>>();
863
864 let introspection_iter = cluster_replicas.iter().flat_map(|replica| {
865 introspection_iter
866 .iter()
867 .map(move |relation| (*relation, Some(replica)))
868 });
869
870 for (relation, replica) in non_introspection_iter.chain(introspection_iter) {
872 let replica_key = if let Some(replica) = replica {
873 replica
874 } else {
875 &ClusterReplica::default()
877 };
878
879 if cluster_replica_error_counts.get(replica_key).unwrap_or(&0)
883 >= &MAX_CLUSTER_REPLICA_ERROR_COUNT
884 {
885 info!(
886 "Skipping {}{}",
887 relation.name,
888 replica.map_or_else(|| "".to_string(), |replica| format!(" for {}", replica))
889 );
890 continue;
891 }
892
893 if let Err(err) = self.dump_relation(relation, replica).await {
894 warn!(
895 "{}: {:#}.",
896 format_catalog_dump_error_message(relation.name, replica),
897 err,
898 );
899
900 if err.to_string().contains("deadline has elapsed") {
901 let docs_link = if replica.is_none()
902 || replica.map_or(false, |r| r.cluster_name == "mz_catalog_server")
903 {
904 "https://materialize.com/docs/installation/troubleshooting/#troubleshooting-console-unresponsiveness"
905 } else {
906 "https://materialize.com/docs/sql/alter-cluster/#resizing-1"
907 };
908 warn!("Consider increasing the size of the cluster {}", docs_link);
909 }
910
911 let is_missing_catalog_item_err = match err.downcast_ref::<tokio_postgres::Error>()
912 {
913 Some(pg_err) => pg_err
914 .to_string()
915 .to_lowercase()
916 .contains("unknown catalog item"),
917 None => false,
918 };
919
920 if !is_missing_catalog_item_err {
926 cluster_replica_error_counts
927 .entry(replica_key.clone())
928 .and_modify(|count| *count += 1)
929 .or_insert(1);
930 }
931 }
932 }
933 }
934}
935
936fn format_catalog_dump_error_message(
937 relation_name: &str,
938 cluster_replica: Option<&ClusterReplica>,
939) -> String {
940 format!(
941 "Failed to dump relation {}{}",
942 relation_name,
943 cluster_replica.map_or_else(|| "".to_string(), |replica| format!(" for {}", replica))
944 )
945}
946
947fn format_file_path(base_path: PathBuf, cluster_replica: Option<&ClusterReplica>) -> PathBuf {
948 let path = base_path.join(SYSTEM_CATALOG_DUMP_DIR);
949 if let Some(cluster_replica) = cluster_replica {
950 path.join(cluster_replica.cluster_name.as_str())
951 .join(cluster_replica.replica_name.as_str())
952 } else {
953 path
954 }
955}