mz_test_util/
mz_client.rs1use anyhow::Result;
11use tokio::time::{self, Duration};
12use tokio_postgres::{Client, Error, NoTls, Row, error::SqlState};
13use tracing::{debug, info};
14
15use mz_ore::task;
16
17pub async fn client(host: &str, port: u16) -> Result<Client> {
20 let (mz_client, conn) = tokio_postgres::Config::new()
21 .user("materialize")
22 .host(host)
23 .port(port)
24 .connect(NoTls)
25 .await?;
26
27 task::spawn(|| "test_util_mz_client", async move {
30 if let Err(e) = conn.await {
31 panic!("connection error: {}", e);
32 }
33 });
34
35 Ok(mz_client)
36}
37
38#[allow(clippy::disallowed_methods)]
42pub async fn try_query(mz_client: &Client, query: &str, delay: Duration) -> Result<Vec<Row>> {
43 loop {
44 let timer = std::time::Instant::now();
45 match mz_client.query(&*query, &[]).await {
46 Ok(rows) => return Ok(rows),
47 Err(e) => check_error(e)?,
48 }
49 delay_for(timer.elapsed(), delay).await;
50 }
51}
52
53#[allow(clippy::disallowed_methods)]
57pub async fn try_query_one(mz_client: &Client, query: &str, delay: Duration) -> Result<Row> {
58 loop {
59 let timer = std::time::Instant::now();
60 match mz_client.query_one(&*query, &[]).await {
61 Ok(rows) => return Ok(rows),
62 Err(e) => check_error(e)?,
63 }
64 delay_for(timer.elapsed(), delay).await;
65 }
66}
67
68fn check_error(e: Error) -> Result<()> {
75 if e.code() == Some(&SqlState::SQL_STATEMENT_NOT_YET_COMPLETE) {
76 info!("Error querying, will try again... {}", e.to_string());
77 Ok(())
78 } else {
79 Err(anyhow::Error::from(e))
80 }
81}
82
83async fn delay_for(elapsed: Duration, delay: Duration) {
85 if elapsed < delay {
86 time::sleep(delay - elapsed).await;
87 } else {
88 info!(
89 "Expected to query for records in {:#?}, took {:#?}",
90 delay, elapsed
91 );
92 }
93}
94
95#[allow(clippy::disallowed_methods)]
98pub async fn show_sources(mz_client: &Client) -> Result<Vec<String>> {
99 let mut res = Vec::new();
100 for row in mz_client.query("SHOW SOURCES", &[]).await? {
101 res.push(row.get(0))
102 }
103
104 Ok(res)
105}
106
107#[allow(clippy::disallowed_methods)]
110pub async fn drop_source(mz_client: &Client, name: &str) -> Result<()> {
111 let q = format!("DROP SOURCE IF EXISTS {} CASCADE", name);
112 debug!("deleting source=> {}", q);
113 mz_client.execute(&*q, &[]).await?;
114 Ok(())
115}
116
117#[allow(clippy::disallowed_methods)]
120pub async fn drop_table(mz_client: &Client, name: &str) -> Result<()> {
121 let q = format!("DROP TABLE IF EXISTS {} CASCADE", name);
122 debug!("deleting table=> {}", q);
123 mz_client.execute(&*q, &[]).await?;
124 Ok(())
125}
126
127#[allow(clippy::disallowed_methods)]
130pub async fn drop_index(mz_client: &Client, name: &str) -> Result<()> {
131 let q = format!("DROP INDEX {}", name);
132 debug!("deleting index=> {}", q);
133 mz_client.execute(&*q, &[]).await?;
134 Ok(())
135}
136
137#[allow(clippy::disallowed_methods)]
140pub async fn execute(mz_client: &Client, query: &str) -> Result<u64> {
141 debug!("exec=> {}", query);
142 Ok(mz_client.execute(query, &[]).await?)
143}