mz_testdrive/util/
postgres.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::str::FromStr;
11use std::time::Duration;
12
13use anyhow::{Context, anyhow, bail};
14use mz_ore::retry::Retry;
15use mz_ore::task;
16use mz_tls_util::make_tls;
17use tokio_postgres::config::Host;
18use tokio_postgres::{Client, Config};
19use url::Url;
20
21/// Constructs a URL from PostgreSQL configuration parameters.
22///
23/// Returns an error if the set of configuration parameters is not representable
24/// as a URL, e.g., if there are multiple hosts.
25pub fn config_url(config: &Config) -> Result<Url, anyhow::Error> {
26    let mut url = Url::parse("postgresql://").unwrap();
27
28    let host = match config.get_hosts() {
29        [] => "localhost".into(),
30        [Host::Tcp(host)] => host.clone(),
31        [Host::Unix(path)] => path.display().to_string(),
32        _ => bail!("Materialize URL cannot contain multiple hosts"),
33    };
34    url.set_host(Some(&host))
35        .context("parsing Materialize host")?;
36
37    url.set_port(Some(match config.get_ports() {
38        [] => 5432,
39        [port] => *port,
40        _ => bail!("Materialize URL cannot contain multiple ports"),
41    }))
42    .expect("known to be valid to set port");
43
44    if let Some(user) = config.get_user() {
45        url.set_username(user)
46            .expect("known to be valid to set username");
47    }
48
49    Ok(url)
50}
51
52pub async fn postgres_client(
53    url: &str,
54    default_timeout: Duration,
55) -> Result<(Client, task::JoinHandle<Result<(), tokio_postgres::Error>>), anyhow::Error> {
56    let (client, connection) = Retry::default()
57        .max_duration(default_timeout)
58        .retry_async_canceling(|_| async move {
59            let pgconfig = &mut Config::from_str(url)?;
60            pgconfig.connect_timeout(default_timeout);
61            let tls = make_tls(pgconfig)?;
62            pgconfig.connect(tls).await.map_err(|e| anyhow!(e))
63        })
64        .await?;
65
66    println!("Connecting to PostgreSQL server at {}...", url);
67    let handle = task::spawn(|| "postgres_client_task", connection);
68
69    Ok((client, handle))
70}