1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
910use anyhow::Result;
11use tokio::time::{self, Duration};
12use tokio_postgres::{Client, Error, NoTls, Row, error::SqlState};
13use tracing::{debug, info};
1415use mz_ore::task;
1617/// Create and return a new PostgreSQL client, spawning off the connection
18/// object along the way.
19pub async fn client(host: &str, port: u16) -> Result<Client> {
20let (mz_client, conn) = tokio_postgres::Config::new()
21 .user("materialize")
22 .host(host)
23 .port(port)
24 .connect(NoTls)
25 .await?;
2627// The connection object performs the actual communication with the database,
28 // so spawn it off to run on its own.
29task::spawn(|| "test_util_mz_client", async move {
30if let Err(e) = conn.await {
31panic!("connection error: {}", e);
32 }
33 });
3435Ok(mz_client)
36}
3738/// Try running PostgresSQL's `query` function, checking for a common
39/// Materialize error in `check_error`.
40pub async fn try_query(mz_client: &Client, query: &str, delay: Duration) -> Result<Vec<Row>> {
41loop {
42let timer = std::time::Instant::now();
43match mz_client.query(&*query, &[]).await {
44Ok(rows) => return Ok(rows),
45Err(e) => check_error(e)?,
46 }
47 delay_for(timer.elapsed(), delay).await;
48 }
49}
5051/// Try running PostgreSQL's `query_one` function, checking for a common
52/// Materialize error in `check_error`.
53pub async fn try_query_one(mz_client: &Client, query: &str, delay: Duration) -> Result<Row> {
54loop {
55let timer = std::time::Instant::now();
56match mz_client.query_one(&*query, &[]).await {
57Ok(rows) => return Ok(rows),
58Err(e) => check_error(e)?,
59 }
60 delay_for(timer.elapsed(), delay).await;
61 }
62}
6364/// The SQL_STATEMENT_NOT_YET_COMPLETE error will surface if we query a view in
65/// Materialize before data exists for that view. It is common to hit this error
66/// just after creating a view, particularly in testing or demo code.
67///
68/// Since this error is likely transient, we should retry reading from the view
69/// instead of failing.
70fn check_error(e: Error) -> Result<()> {
71if e.code() == Some(&SqlState::SQL_STATEMENT_NOT_YET_COMPLETE) {
72info!("Error querying, will try again... {}", e.to_string());
73Ok(())
74 } else {
75Err(anyhow::Error::from(e))
76 }
77}
7879/// Limit the queries per second against a view in Materialize.
80async fn delay_for(elapsed: Duration, delay: Duration) {
81if elapsed < delay {
82 time::sleep(delay - elapsed).await;
83 } else {
84info!(
85"Expected to query for records in {:#?}, took {:#?}",
86 delay, elapsed
87 );
88 }
89}
9091/// Run Materialize's `SHOW SOURCES` command
92pub async fn show_sources(mz_client: &Client) -> Result<Vec<String>> {
93let mut res = Vec::new();
94for row in mz_client.query("SHOW SOURCES", &[]).await? {
95 res.push(row.get(0))
96 }
9798Ok(res)
99}
100101/// Delete a source and all dependent views, if the source exists
102pub async fn drop_source(mz_client: &Client, name: &str) -> Result<()> {
103let q = format!("DROP SOURCE IF EXISTS {} CASCADE", name);
104debug!("deleting source=> {}", q);
105 mz_client.execute(&*q, &[]).await?;
106Ok(())
107}
108109/// Delete a table and all dependent views, if the table exists
110pub async fn drop_table(mz_client: &Client, name: &str) -> Result<()> {
111let q = format!("DROP TABLE IF EXISTS {} CASCADE", name);
112debug!("deleting table=> {}", q);
113 mz_client.execute(&*q, &[]).await?;
114Ok(())
115}
116117/// Delete an index
118pub async fn drop_index(mz_client: &Client, name: &str) -> Result<()> {
119let q = format!("DROP INDEX {}", name);
120debug!("deleting index=> {}", q);
121 mz_client.execute(&*q, &[]).await?;
122Ok(())
123}
124125/// Run PostgreSQL's `execute` function
126pub async fn execute(mz_client: &Client, query: &str) -> Result<u64> {
127debug!("exec=> {}", query);
128Ok(mz_client.execute(query, &[]).await?)
129}