1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
910use std::str::FromStr;
11use std::time::Duration;
1213use anyhow::{Context, anyhow, bail};
14use mz_ore::retry::Retry;
15use mz_ore::task;
16use mz_tls_util::make_tls;
17use tokio_postgres::config::Host;
18use tokio_postgres::{Client, Config};
19use url::Url;
2021/// Constructs a URL from PostgreSQL configuration parameters.
22///
23/// Returns an error if the set of configuration parameters is not representable
24/// as a URL, e.g., if there are multiple hosts.
25pub fn config_url(config: &Config) -> Result<Url, anyhow::Error> {
26let mut url = Url::parse("postgresql://").unwrap();
2728let host = match config.get_hosts() {
29 [] => "localhost".into(),
30 [Host::Tcp(host)] => host.clone(),
31 [Host::Unix(path)] => path.display().to_string(),
32_ => bail!("Materialize URL cannot contain multiple hosts"),
33 };
34 url.set_host(Some(&host))
35 .context("parsing Materialize host")?;
3637 url.set_port(Some(match config.get_ports() {
38 [] => 5432,
39 [port] => *port,
40_ => bail!("Materialize URL cannot contain multiple ports"),
41 }))
42 .expect("known to be valid to set port");
4344if let Some(user) = config.get_user() {
45 url.set_username(user)
46 .expect("known to be valid to set username");
47 }
4849Ok(url)
50}
5152pub async fn postgres_client(
53 url: &str,
54 default_timeout: Duration,
55) -> Result<(Client, task::JoinHandle<Result<(), tokio_postgres::Error>>), anyhow::Error> {
56let (client, connection) = Retry::default()
57 .max_duration(default_timeout)
58 .retry_async_canceling(|_| async move {
59let pgconfig = &mut Config::from_str(url)?;
60 pgconfig.connect_timeout(default_timeout);
61let tls = make_tls(pgconfig)?;
62 pgconfig.connect(tls).await.map_err(|e| anyhow!(e))
63 })
64 .await?;
6566println!("Connecting to PostgreSQL server at {}...", url);
67let handle = task::spawn(|| "postgres_client_task", connection);
6869Ok((client, handle))
70}