1use anyhow::{Context as _, Result};
22use chrono::{DateTime, Utc};
23use csv_async::AsyncSerializer;
24use futures::TryStreamExt;
25use mz_tls_util::make_tls;
26use std::fmt;
27use std::path::PathBuf;
28use std::str::FromStr;
29use std::sync::Arc;
30use std::time::Duration;
31use tokio::io::AsyncWriteExt;
32use tokio::sync::Mutex;
33use tokio_postgres::{
34 Client as PgClient, Config as PgConfig, Connection, SimpleQueryMessage, Socket, Transaction,
35};
36use tokio_util::io::StreamReader;
37
38use mz_ore::collections::HashMap;
39use mz_ore::retry::{self};
40use mz_ore::task::{self, JoinHandle};
41use postgres_openssl::{MakeTlsConnector, TlsStream};
42use tracing::{error, info};
43
44use crate::Context;
45use crate::utils::format_base_path;
46
/// How a system catalog relation is dumped.
#[derive(Debug, Clone)]
pub enum RelationCategory {
    /// Dumped once per cluster replica, with the session pinned to that
    /// cluster and replica, into a per-replica output directory.
    Introspection,
    /// Dumped through a `SUBSCRIBE` cursor into `<name>_subscribe.csv`.
    Retained,
    /// Dumped once with a plain `COPY (SELECT * FROM <name>)`.
    Basic,
}

/// A system catalog relation to dump, together with the way it is dumped.
#[derive(Debug, Clone)]
pub struct Relation {
    /// Unqualified relation name; resolved through the session search path.
    pub name: &'static str,
    /// Dump strategy for this relation.
    pub category: RelationCategory,
}

impl Relation {
    /// Builds a [`RelationCategory::Basic`] entry.
    const fn basic(name: &'static str) -> Self {
        Self {
            name,
            category: RelationCategory::Basic,
        }
    }

    /// Builds a [`RelationCategory::Retained`] entry.
    const fn retained(name: &'static str) -> Self {
        Self {
            name,
            category: RelationCategory::Retained,
        }
    }

    /// Builds a [`RelationCategory::Introspection`] entry.
    const fn introspection(name: &'static str) -> Self {
        Self {
            name,
            category: RelationCategory::Introspection,
        }
    }
}

/// Every relation that gets dumped, in dump order.
///
/// A name may appear once per category (for example as both `Basic` and
/// `Retained`, which produce different output files), but each
/// `(name, category)` pair is listed only once: a repeated pair would run the
/// same query again and overwrite the file it just wrote.
static RELATIONS: &[Relation] = &[
    Relation::basic("mz_audit_events"),
    Relation::basic("mz_databases"),
    Relation::basic("mz_schemas"),
    Relation::basic("mz_tables"),
    Relation::basic("mz_sources"),
    Relation::basic("mz_sinks"),
    Relation::basic("mz_views"),
    Relation::basic("mz_materialized_views"),
    Relation::basic("mz_secrets"),
    Relation::basic("mz_connections"),
    Relation::basic("mz_roles"),
    Relation::basic("mz_subscriptions"),
    Relation::basic("mz_object_fully_qualified_names"),
    Relation::basic("mz_sessions"),
    Relation::basic("mz_object_history"),
    Relation::basic("mz_object_lifetimes"),
    Relation::basic("mz_object_dependencies"),
    Relation::basic("mz_object_transitive_dependencies"),
    Relation::basic("mz_clusters"),
    Relation::basic("mz_indexes"),
    Relation::basic("mz_cluster_replicas"),
    Relation::basic("mz_cluster_replica_sizes"),
    Relation::basic("mz_cluster_replica_statuses"),
    Relation::basic("mz_cluster_replica_metrics_history"),
    Relation::retained("mz_compute_hydration_times"),
    Relation::basic("mz_materialization_dependencies"),
    Relation::basic("mz_cluster_replica_status_history"),
    Relation::basic("mz_wallclock_global_lag_recent_history"),
    Relation::basic("mz_global_frontiers"),
    Relation::basic("mz_cluster_replica_frontiers"),
    Relation::basic("mz_materialization_lag"),
    // Listed under two categories: one snapshot file and one SUBSCRIBE file.
    Relation::basic("mz_source_statistics_with_history"),
    Relation::retained("mz_source_statistics_with_history"),
    Relation::basic("mz_sink_statistics"),
    Relation::retained("mz_sink_statistics"),
    Relation::basic("mz_source_statuses"),
    Relation::basic("mz_sink_statuses"),
    Relation::basic("mz_source_status_history"),
    Relation::basic("mz_sink_status_history"),
    Relation::basic("mz_materialized_view_refresh_strategies"),
    Relation::basic("mz_cluster_schedules"),
    Relation::basic("mz_recent_storage_usage"),
    // Introspection relations: dumped once for every known cluster replica.
    Relation::introspection("mz_arrangement_sharing_per_worker"),
    Relation::introspection("mz_arrangement_sharing"),
    Relation::introspection("mz_arrangement_sizes_per_worker"),
    Relation::introspection("mz_dataflow_channels"),
    Relation::introspection("mz_dataflow_operators"),
    Relation::introspection("mz_dataflow_global_ids"),
    Relation::introspection("mz_dataflow_operator_dataflows_per_worker"),
    Relation::introspection("mz_dataflow_operator_dataflows"),
    Relation::introspection("mz_dataflow_operator_parents_per_worker"),
    Relation::introspection("mz_dataflow_operator_parents"),
    Relation::introspection("mz_compute_exports"),
    Relation::introspection("mz_dataflow_arrangement_sizes"),
    Relation::introspection("mz_expected_group_size_advice"),
    Relation::introspection("mz_compute_frontiers"),
    Relation::introspection("mz_dataflow_channel_operators_per_worker"),
    Relation::introspection("mz_dataflow_channel_operators"),
    Relation::introspection("mz_compute_import_frontiers"),
    Relation::introspection("mz_message_counts_per_worker"),
    Relation::introspection("mz_message_counts"),
    Relation::introspection("mz_active_peeks"),
    Relation::introspection("mz_compute_operator_durations_histogram_per_worker"),
    Relation::introspection("mz_compute_operator_durations_histogram"),
    Relation::introspection("mz_records_per_dataflow_operator_per_worker"),
    Relation::introspection("mz_records_per_dataflow_operator"),
    Relation::introspection("mz_records_per_dataflow_per_worker"),
    Relation::introspection("mz_records_per_dataflow"),
    Relation::introspection("mz_peek_durations_histogram_per_worker"),
    Relation::introspection("mz_peek_durations_histogram"),
    Relation::introspection("mz_dataflow_shutdown_durations_histogram_per_worker"),
    Relation::introspection("mz_dataflow_shutdown_durations_histogram"),
    Relation::introspection("mz_scheduling_elapsed_per_worker"),
    Relation::introspection("mz_scheduling_elapsed"),
    Relation::introspection("mz_scheduling_parks_histogram_per_worker"),
    Relation::introspection("mz_scheduling_parks_histogram"),
    Relation::introspection("mz_compute_lir_mapping_per_worker"),
    Relation::introspection("mz_lir_mapping"),
    Relation::introspection("mz_compute_error_counts"),
    Relation::introspection("mz_compute_error_counts_per_worker"),
    // `mz_cluster_replica_metrics_history` used to be repeated here; it is
    // already dumped by the `Basic` entry earlier in this list.
    Relation::basic("mz_webhook_sources"),
    Relation::basic("mz_cluster_replica_history"),
    Relation::basic("mz_source_statistics"),
    Relation::basic("mz_cluster_deployment_lineage"),
    Relation::basic("mz_show_indexes"),
    Relation::basic("mz_relations"),
    Relation::basic("mz_frontiers"),
    Relation::basic("mz_console_cluster_utilization_overview"),
    Relation::basic("mz_columns"),
    Relation::basic("mz_kafka_sources"),
    Relation::basic("mz_kafka_sinks"),
];
445
/// Connect timeout set on the PostgreSQL config, and also the total time
/// budget for connection retries.
static PG_CONNECTION_TIMEOUT: Duration = Duration::from_secs(30);
/// Total time budget, retries included, for dumping a single relation.
static PG_QUERY_TIMEOUT: Duration = Duration::from_secs(20);

/// Once this many dump errors have been counted against a cluster replica,
/// the remaining relations for that replica are skipped.
static MAX_CLUSTER_REPLICA_ERROR_COUNT: usize = 3;

/// Sets the session search path so relations can be referenced unqualified.
static SET_SEARCH_PATH_QUERY: &str = "SET search_path = mz_internal, mz_catalog, mz_introspection";
/// Lists every cluster replica as a `(cluster_name, replica_name)` pair.
static SELECT_CLUSTER_REPLICAS_QUERY: &str = "SELECT c.name as cluster_name, cr.name as replica_name FROM mz_clusters AS c JOIN mz_cluster_replicas AS cr ON c.id = cr.cluster_id;";
457
/// Identifies a cluster replica by the name of its cluster and its own name.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ClusterReplica {
    /// Name of the cluster the replica belongs to.
    pub cluster_name: String,
    /// Name of the replica within that cluster.
    pub replica_name: String,
}

impl Default for ClusterReplica {
    /// Returns the `mz_catalog_server` / `r1` replica.
    fn default() -> Self {
        ClusterReplica {
            cluster_name: String::from("mz_catalog_server"),
            replica_name: String::from("r1"),
        }
    }
}

impl fmt::Display for ClusterReplica {
    /// Renders the replica as `<cluster_name>.<replica_name>`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self {
            cluster_name,
            replica_name,
        } = self;
        write!(f, "{cluster_name}.{replica_name}")
    }
}
478
/// Dumps system catalog relations to CSV files over one PostgreSQL connection.
pub struct SystemCatalogDumper<'n> {
    /// Shared tool context; its `start_time` is used to build output paths.
    context: &'n Context,
    /// Client used for every query; locked for the duration of each dump attempt.
    pg_client: Arc<Mutex<PgClient>>,
    /// TLS connector, reused when sending query cancellation requests.
    pg_tls: MakeTlsConnector,
    /// Replicas discovered at construction time; introspection relations are
    /// dumped once per entry.
    cluster_replicas: Vec<ClusterReplica>,
    /// Handle of the spawned task that drives the connection. It is never
    /// read; it is only stored alongside the client.
    _pg_conn_handle: JoinHandle<Result<(), tokio_postgres::Error>>,
}
486
487pub async fn create_postgres_connection(
488 connection_string: &str,
489) -> Result<
490 (
491 PgClient,
492 Connection<Socket, TlsStream<Socket>>,
493 MakeTlsConnector,
494 ),
495 anyhow::Error,
496> {
497 let mut pg_config = PgConfig::from_str(connection_string)?;
498 pg_config.connect_timeout(PG_CONNECTION_TIMEOUT);
499 let tls = make_tls(&pg_config)?;
500 info!(
501 "Connecting to PostgreSQL server at {}...",
502 connection_string
503 );
504 let (pg_client, pg_conn) = retry::Retry::default()
505 .max_duration(PG_CONNECTION_TIMEOUT)
506 .retry_async_canceling(|_| {
507 let pg_config = pg_config.clone();
508 let tls = tls.clone();
509 async move { pg_config.connect(tls).await.map_err(|e| anyhow::anyhow!(e)) }
510 })
511 .await?;
512
513 Ok((pg_client, pg_conn, tls))
514}
515
516pub async fn write_copy_stream(
517 transaction: &Transaction<'_>,
518 copy_query: &str,
519 file: &mut tokio::fs::File,
520 relation_name: &str,
521) -> Result<(), anyhow::Error> {
522 let copy_stream = transaction
523 .copy_out(copy_query)
524 .await
525 .context(format!("Failed to COPY TO for {}", relation_name))?
526 .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e));
527 let copy_stream = std::pin::pin!(copy_stream);
528 let mut reader = StreamReader::new(copy_stream);
529 tokio::io::copy(&mut reader, file).await?;
530 file.sync_all().await?;
532
533 Ok::<(), anyhow::Error>(())
534}
535
536pub async fn copy_relation_to_csv(
537 transaction: &Transaction<'_>,
538 file_path_name: PathBuf,
539 column_names: &Vec<String>,
540 relation: &Relation,
541) -> Result<(), anyhow::Error> {
542 let mut file = tokio::fs::File::create(&file_path_name).await?;
543
544 match relation.category {
545 RelationCategory::Retained => {
546 let mut writer = AsyncSerializer::from_writer(file);
547 writer.serialize(column_names).await?;
548
549 transaction
550 .execute(
551 &format!("DECLARE c CURSOR FOR SUBSCRIBE TO {}", relation.name),
552 &[],
553 )
554 .await
555 .context("Failed to declare cursor")?;
556
557 let rows = transaction
561 .simple_query("FETCH ALL FROM c WITH (TIMEOUT '1')")
564 .await
565 .context("Failed to fetch all from cursor")?;
566
567 for row in rows {
568 if let SimpleQueryMessage::Row(row) = row {
569 let values: Vec<&str> = (0..row.len())
570 .map(|i| row.get(i).unwrap_or("")) .collect();
572 writer.serialize(&values).await?;
573 }
574 }
575 }
576 _ => {
577 file.write_all((column_names.join(",") + "\n").as_bytes())
579 .await?;
580 let copy_query = format!(
581 "COPY (SELECT * FROM {}) TO STDOUT WITH (FORMAT CSV)",
582 relation.name
583 );
584 write_copy_stream(transaction, ©_query, &mut file, relation.name).await?;
585 }
586 };
587
588 info!("Copied {} to {}", relation.name, file_path_name.display());
589 Ok::<(), anyhow::Error>(())
590}
591
592pub async fn query_column_names(
593 pg_client: &PgClient,
594 relation: &Relation,
595) -> Result<Vec<String>, anyhow::Error> {
596 let relation_name = relation.name;
597 let mut column_names = pg_client
599 .query(&format!("SHOW COLUMNS FROM {}", &relation_name), &[])
600 .await
601 .context(format!("Failed to get column names for {}", relation_name))?
602 .into_iter()
603 .map(|row| match row.try_get::<_, String>("name") {
604 Ok(name) => Some(name),
605 Err(_) => None,
606 })
607 .filter_map(|row| row)
608 .collect::<Vec<_>>();
609
610 match relation.category {
611 RelationCategory::Retained => {
612 column_names.splice(0..0, ["mz_timestamp".to_string(), "mz_diff".to_string()]);
613 }
614 _ => (),
615 }
616
617 Ok(column_names)
618}
619
620pub async fn query_relation(
621 transaction: &Transaction<'_>,
622 start_time: DateTime<Utc>,
623 relation: &Relation,
624 column_names: &Vec<String>,
625 cluster_replica: Option<&ClusterReplica>,
626) -> Result<(), anyhow::Error> {
627 let relation_name = relation.name;
628 let relation_category = &relation.category;
629
630 if let Some(cluster_replica) = &cluster_replica {
632 transaction
633 .execute(
634 &format!("SET LOCAL CLUSTER = '{}'", cluster_replica.cluster_name),
635 &[],
636 )
637 .await
638 .context(format!(
639 "Failed to set cluster to {}",
640 cluster_replica.cluster_name
641 ))?;
642 transaction
643 .execute(
644 &format!(
645 "SET LOCAL CLUSTER_REPLICA = '{}'",
646 cluster_replica.replica_name
647 ),
648 &[],
649 )
650 .await
651 .context(format!(
652 "Failed to set cluster replica to {}",
653 cluster_replica.replica_name
654 ))?;
655 }
656
657 match relation_category {
658 RelationCategory::Basic => {
659 let file_path = format_file_path(start_time, None);
660 let file_path_name = file_path.join(relation_name).with_extension("csv");
661 tokio::fs::create_dir_all(&file_path).await?;
662
663 copy_relation_to_csv(transaction, file_path_name, column_names, relation).await?;
664 }
665 RelationCategory::Introspection => {
666 let file_path = format_file_path(start_time, cluster_replica);
667 tokio::fs::create_dir_all(&file_path).await?;
668
669 let file_path_name = file_path.join(relation_name).with_extension("csv");
670
671 copy_relation_to_csv(transaction, file_path_name, column_names, relation).await?;
672 }
673 RelationCategory::Retained => {
674 let file_path = format_file_path(start_time, None);
676 let file_path_name = file_path
677 .join(format!("{}_subscribe", relation_name))
678 .with_extension("csv");
679 tokio::fs::create_dir_all(&file_path).await?;
680
681 copy_relation_to_csv(transaction, file_path_name, column_names, relation).await?;
682 }
683 }
684 Ok::<(), anyhow::Error>(())
685}
686
687impl<'n> SystemCatalogDumper<'n> {
688 pub async fn new(context: &'n Context, connection_string: &str) -> Result<Self, anyhow::Error> {
689 let (pg_client, pg_conn, pg_tls) = create_postgres_connection(connection_string).await?;
690
691 info!("Connected to PostgreSQL server at {}...", connection_string);
692
693 let handle = task::spawn(|| "postgres-connection", pg_conn);
694
695 pg_client
697 .execute(SET_SEARCH_PATH_QUERY, &[])
698 .await
699 .context("Failed to set search path")?;
700
701 let cluster_replicas = match pg_client.query(SELECT_CLUSTER_REPLICAS_QUERY, &[]).await {
703 Ok(rows) => rows
704 .into_iter()
705 .map(|row| {
706 let cluster_name = row.try_get::<_, String>("cluster_name");
707 let replica_name = row.try_get::<_, String>("replica_name");
708
709 if let (Ok(cluster_name), Ok(replica_name)) = (cluster_name, replica_name) {
710 Some(ClusterReplica {
711 cluster_name,
712 replica_name,
713 })
714 } else {
715 None
716 }
717 })
718 .filter_map(|row| row)
719 .collect::<Vec<_>>(),
720 Err(e) => {
721 error!("Failed to get replica names: {}", e);
722 vec![]
723 }
724 };
725
726 Ok(Self {
727 context,
728 pg_client: Arc::new(Mutex::new(pg_client)),
729 pg_tls,
730 cluster_replicas,
731 _pg_conn_handle: handle,
732 })
733 }
734
735 pub async fn dump_relation(
736 &self,
737 relation: &Relation,
738 cluster_replica: Option<&ClusterReplica>,
739 ) -> Result<(), anyhow::Error> {
740 info!(
741 "Copying relation {}{}{}",
742 relation.name,
743 match relation.category {
744 RelationCategory::Retained => " (subscribe history)",
745 _ => "",
746 },
747 cluster_replica.map_or_else(|| "".to_string(), |replica| format!(" in {}", replica))
748 );
749
750 let start_time = self.context.start_time;
751 let pg_client = &self.pg_client;
752
753 let relation_name = relation.name.to_string();
754
755 if let Err(err) = retry::Retry::default()
756 .max_duration(PG_QUERY_TIMEOUT)
757 .initial_backoff(Duration::from_secs(2))
758 .retry_async_canceling(|_| {
759 let start_time = start_time.clone();
760 let relation_name = relation.name;
761 let cluster_replica = cluster_replica.clone();
762
763 async move {
764 let mut pg_client = pg_client.lock().await;
766
767 match async {
768 let column_names = query_column_names(&pg_client, relation).await?;
771
772 let transaction = pg_client.transaction().await?;
773 query_relation(
774 &transaction,
775 start_time,
776 relation,
777 &column_names,
778 cluster_replica,
779 )
780 .await?;
781
782 Ok::<(), anyhow::Error>(())
783 }
784 .await
785 {
786 Ok(()) => Ok(()),
787 Err(err) => {
788 error!(
789 "{}: {:#}. Retrying...",
790 format_catalog_dump_error_message(relation_name, cluster_replica),
791 err
792 );
793 Err(err)
794 }
795 }
796 }
797 })
798 .await
799 {
800 let pg_client_lock = pg_client.lock().await;
801
802 let cancel_token = pg_client_lock.cancel_token();
803
804 if let Err(_) = async {
805 let tls = self.pg_tls.clone();
806
807 cancel_token.cancel_query(tls).await?;
808 Ok::<(), anyhow::Error>(())
809 }
810 .await
811 {
812 error!(
813 "Failed to cancel query for {}{}",
814 relation_name,
815 cluster_replica
816 .map_or_else(|| "".to_string(), |replica| format!(" for {}", replica))
817 );
818 }
819
820 return Err(err);
821 }
822
823 Ok(())
824 }
825
826 pub async fn dump_all_relations(&self) {
827 let cluster_replicas = &self.cluster_replicas;
828
829 let mut cluster_replica_error_counts: HashMap<ClusterReplica, usize> = HashMap::new();
831 for replica in cluster_replicas {
832 cluster_replica_error_counts
833 .entry(replica.clone())
834 .insert_entry(0);
835 }
836
837 let non_introspection_iter = RELATIONS
838 .iter()
839 .filter(|relation| {
840 matches!(
841 relation.category,
842 RelationCategory::Basic | RelationCategory::Retained
843 )
844 })
845 .map(|relation| (relation, None::<&ClusterReplica>));
846
847 let introspection_iter = RELATIONS
848 .iter()
849 .filter(|relation| matches!(relation.category, RelationCategory::Introspection))
850 .collect::<Vec<_>>();
851
852 let introspection_iter = cluster_replicas.iter().flat_map(|replica| {
853 introspection_iter
854 .iter()
855 .map(move |relation| (*relation, Some(replica)))
856 });
857
858 for (relation, replica) in non_introspection_iter.chain(introspection_iter) {
860 let replica_key = if let Some(replica) = replica {
861 replica
862 } else {
863 &ClusterReplica::default()
865 };
866
867 if cluster_replica_error_counts.get(replica_key).unwrap_or(&0)
871 >= &MAX_CLUSTER_REPLICA_ERROR_COUNT
872 {
873 info!(
874 "Skipping {}{}",
875 relation.name,
876 replica.map_or_else(|| "".to_string(), |replica| format!(" for {}", replica))
877 );
878 continue;
879 }
880
881 if let Err(err) = self.dump_relation(relation, replica).await {
882 error!(
883 "{}: {:#}.",
884 format_catalog_dump_error_message(relation.name, replica),
885 err,
886 );
887
888 if err.to_string().contains("deadline has elapsed") {
889 let docs_link = if replica.is_none()
890 || replica.map_or(false, |r| r.cluster_name == "mz_catalog_server")
891 {
892 "https://materialize.com/docs/self-managed/v25.1/installation/troubleshooting/#troubleshooting-console-unresponsiveness"
893 } else {
894 "https://materialize.com/docs/sql/alter-cluster/#resizing-1"
895 };
896 error!("Consider increasing the size of the cluster {}", docs_link);
897 }
898
899 let is_missing_catalog_item_err = match err.downcast_ref::<tokio_postgres::Error>()
900 {
901 Some(pg_err) => pg_err
902 .to_string()
903 .to_lowercase()
904 .contains("unknown catalog item"),
905 None => false,
906 };
907
908 if !is_missing_catalog_item_err {
914 cluster_replica_error_counts
915 .entry(replica_key.clone())
916 .and_modify(|count| *count += 1)
917 .or_insert(1);
918 }
919 }
920 }
921 }
922}
923
924fn format_catalog_dump_error_message(
925 relation_name: &str,
926 cluster_replica: Option<&ClusterReplica>,
927) -> String {
928 format!(
929 "Failed to dump relation {}{}",
930 relation_name,
931 cluster_replica.map_or_else(|| "".to_string(), |replica| format!(" for {}", replica))
932 )
933}
934
935fn format_file_path(date_time: DateTime<Utc>, cluster_replica: Option<&ClusterReplica>) -> PathBuf {
936 let path = format_base_path(date_time).join("system-catalog");
937 if let Some(cluster_replica) = cluster_replica {
938 path.join(cluster_replica.cluster_name.as_str())
939 .join(cluster_replica.replica_name.as_str())
940 } else {
941 path
942 }
943}