Skip to main content

mz_test_util/
mz_client.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use anyhow::Result;
11use tokio::time::{self, Duration};
12use tokio_postgres::{Client, Error, NoTls, Row, error::SqlState};
13use tracing::{debug, info};
14
15use mz_ore::task;
16
17/// Create and return a new PostgreSQL client, spawning off the connection
18/// object along the way.
19pub async fn client(host: &str, port: u16) -> Result<Client> {
20    let (mz_client, conn) = tokio_postgres::Config::new()
21        .user("materialize")
22        .host(host)
23        .port(port)
24        .connect(NoTls)
25        .await?;
26
27    // The connection object performs the actual communication with the database,
28    // so spawn it off to run on its own.
29    task::spawn(|| "test_util_mz_client", async move {
30        if let Err(e) = conn.await {
31            panic!("connection error: {}", e);
32        }
33    });
34
35    Ok(mz_client)
36}
37
38/// Try running PostgresSQL's `query` function, checking for a common
39/// Materialize error in `check_error`.
40// This helper intentionally executes caller-provided test SQL.
41#[allow(clippy::disallowed_methods)]
42pub async fn try_query(mz_client: &Client, query: &str, delay: Duration) -> Result<Vec<Row>> {
43    loop {
44        let timer = std::time::Instant::now();
45        match mz_client.query(&*query, &[]).await {
46            Ok(rows) => return Ok(rows),
47            Err(e) => check_error(e)?,
48        }
49        delay_for(timer.elapsed(), delay).await;
50    }
51}
52
53/// Try running PostgreSQL's `query_one` function, checking for a common
54/// Materialize error in `check_error`.
55// This helper intentionally executes caller-provided test SQL.
56#[allow(clippy::disallowed_methods)]
57pub async fn try_query_one(mz_client: &Client, query: &str, delay: Duration) -> Result<Row> {
58    loop {
59        let timer = std::time::Instant::now();
60        match mz_client.query_one(&*query, &[]).await {
61            Ok(rows) => return Ok(rows),
62            Err(e) => check_error(e)?,
63        }
64        delay_for(timer.elapsed(), delay).await;
65    }
66}
67
68/// The SQL_STATEMENT_NOT_YET_COMPLETE error will surface if we query a view in
69/// Materialize before data exists for that view. It is common to hit this error
70/// just after creating a view, particularly in testing or demo code.
71///
72/// Since this error is likely transient, we should retry reading from the view
73/// instead of failing.
74fn check_error(e: Error) -> Result<()> {
75    if e.code() == Some(&SqlState::SQL_STATEMENT_NOT_YET_COMPLETE) {
76        info!("Error querying, will try again... {}", e.to_string());
77        Ok(())
78    } else {
79        Err(anyhow::Error::from(e))
80    }
81}
82
83/// Limit the queries per second against a view in Materialize.
84async fn delay_for(elapsed: Duration, delay: Duration) {
85    if elapsed < delay {
86        time::sleep(delay - elapsed).await;
87    } else {
88        info!(
89            "Expected to query for records in {:#?}, took {:#?}",
90            delay, elapsed
91        );
92    }
93}
94
95/// Run Materialize's `SHOW SOURCES` command
96// Test utility uses direct driver query for a fixed introspection statement.
97#[allow(clippy::disallowed_methods)]
98pub async fn show_sources(mz_client: &Client) -> Result<Vec<String>> {
99    let mut res = Vec::new();
100    for row in mz_client.query("SHOW SOURCES", &[]).await? {
101        res.push(row.get(0))
102    }
103
104    Ok(res)
105}
106
107/// Delete a source and all dependent views, if the source exists
108// Name is supplied by test code and this utility intentionally forwards SQL.
109#[allow(clippy::disallowed_methods)]
110pub async fn drop_source(mz_client: &Client, name: &str) -> Result<()> {
111    let q = format!("DROP SOURCE IF EXISTS {} CASCADE", name);
112    debug!("deleting source=> {}", q);
113    mz_client.execute(&*q, &[]).await?;
114    Ok(())
115}
116
117/// Delete a table and all dependent views, if the table exists
118// Name is supplied by test code and this utility intentionally forwards SQL.
119#[allow(clippy::disallowed_methods)]
120pub async fn drop_table(mz_client: &Client, name: &str) -> Result<()> {
121    let q = format!("DROP TABLE IF EXISTS {} CASCADE", name);
122    debug!("deleting table=> {}", q);
123    mz_client.execute(&*q, &[]).await?;
124    Ok(())
125}
126
127/// Delete an index
128// Name is supplied by test code and this utility intentionally forwards SQL.
129#[allow(clippy::disallowed_methods)]
130pub async fn drop_index(mz_client: &Client, name: &str) -> Result<()> {
131    let q = format!("DROP INDEX {}", name);
132    debug!("deleting index=> {}", q);
133    mz_client.execute(&*q, &[]).await?;
134    Ok(())
135}
136
137/// Run PostgreSQL's `execute` function
138// This helper intentionally executes caller-provided test SQL.
139#[allow(clippy::disallowed_methods)]
140pub async fn execute(mz_client: &Client, query: &str) -> Result<u64> {
141    debug!("exec=> {}", query);
142    Ok(mz_client.execute(query, &[]).await?)
143}