mz_testdrive/util/
postgres.rs1use std::str::FromStr;
11use std::time::Duration;
12
13use anyhow::{Context, anyhow, bail};
14use mz_ore::retry::Retry;
15use mz_ore::task;
16use mz_tls_util::make_tls;
17use tokio_postgres::config::Host;
18use tokio_postgres::{Client, Config};
19use url::Url;
20
21pub fn config_url(config: &Config) -> Result<Url, anyhow::Error> {
26 let mut url = Url::parse("postgresql://").unwrap();
27
28 let host = match config.get_hosts() {
29 [] => "localhost".into(),
30 [Host::Tcp(host)] => host.clone(),
31 [Host::Unix(path)] => path.display().to_string(),
32 _ => bail!("Materialize URL cannot contain multiple hosts"),
33 };
34 url.set_host(Some(&host))
35 .context("parsing Materialize host")?;
36
37 url.set_port(Some(match config.get_ports() {
38 [] => 5432,
39 [port] => *port,
40 _ => bail!("Materialize URL cannot contain multiple ports"),
41 }))
42 .expect("known to be valid to set port");
43
44 if let Some(user) = config.get_user() {
45 url.set_username(user)
46 .expect("known to be valid to set username");
47 }
48
49 Ok(url)
50}
51
52pub async fn postgres_client(
53 url: &str,
54 default_timeout: Duration,
55) -> Result<(Client, task::JoinHandle<Result<(), tokio_postgres::Error>>), anyhow::Error> {
56 let (client, connection) = Retry::default()
57 .max_duration(default_timeout)
58 .retry_async_canceling(|_| async move {
59 let pgconfig = &mut Config::from_str(url)?;
60 pgconfig.connect_timeout(default_timeout);
61 let tls = make_tls(pgconfig)?;
62 pgconfig.connect(tls).await.map_err(|e| anyhow!(e))
63 })
64 .await?;
65
66 if url.contains("mzp_") {
67 println!("Connecting to PostgreSQL server at [REDACTED]...");
68 } else {
69 println!("Connecting to PostgreSQL server at {}...", url);
70 }
71 let handle = task::spawn(|| "postgres_client_task", connection);
72
73 Ok((client, handle))
74}