mz_test_util/
mz_client.rs1use anyhow::Result;
11use tokio::time::{self, Duration};
12use tokio_postgres::{Client, Error, NoTls, Row, error::SqlState};
13use tracing::{debug, info};
14
15use mz_ore::task;
16
17pub async fn client(host: &str, port: u16) -> Result<Client> {
20 let (mz_client, conn) = tokio_postgres::Config::new()
21 .user("materialize")
22 .host(host)
23 .port(port)
24 .connect(NoTls)
25 .await?;
26
27 task::spawn(|| "test_util_mz_client", async move {
30 if let Err(e) = conn.await {
31 panic!("connection error: {}", e);
32 }
33 });
34
35 Ok(mz_client)
36}
37
38pub async fn try_query(mz_client: &Client, query: &str, delay: Duration) -> Result<Vec<Row>> {
41 loop {
42 let timer = std::time::Instant::now();
43 match mz_client.query(&*query, &[]).await {
44 Ok(rows) => return Ok(rows),
45 Err(e) => check_error(e)?,
46 }
47 delay_for(timer.elapsed(), delay).await;
48 }
49}
50
51pub async fn try_query_one(mz_client: &Client, query: &str, delay: Duration) -> Result<Row> {
54 loop {
55 let timer = std::time::Instant::now();
56 match mz_client.query_one(&*query, &[]).await {
57 Ok(rows) => return Ok(rows),
58 Err(e) => check_error(e)?,
59 }
60 delay_for(timer.elapsed(), delay).await;
61 }
62}
63
64fn check_error(e: Error) -> Result<()> {
71 if e.code() == Some(&SqlState::SQL_STATEMENT_NOT_YET_COMPLETE) {
72 info!("Error querying, will try again... {}", e.to_string());
73 Ok(())
74 } else {
75 Err(anyhow::Error::from(e))
76 }
77}
78
79async fn delay_for(elapsed: Duration, delay: Duration) {
81 if elapsed < delay {
82 time::sleep(delay - elapsed).await;
83 } else {
84 info!(
85 "Expected to query for records in {:#?}, took {:#?}",
86 delay, elapsed
87 );
88 }
89}
90
91pub async fn show_sources(mz_client: &Client) -> Result<Vec<String>> {
93 let mut res = Vec::new();
94 for row in mz_client.query("SHOW SOURCES", &[]).await? {
95 res.push(row.get(0))
96 }
97
98 Ok(res)
99}
100
101pub async fn drop_source(mz_client: &Client, name: &str) -> Result<()> {
103 let q = format!("DROP SOURCE IF EXISTS {} CASCADE", name);
104 debug!("deleting source=> {}", q);
105 mz_client.execute(&*q, &[]).await?;
106 Ok(())
107}
108
109pub async fn drop_table(mz_client: &Client, name: &str) -> Result<()> {
111 let q = format!("DROP TABLE IF EXISTS {} CASCADE", name);
112 debug!("deleting table=> {}", q);
113 mz_client.execute(&*q, &[]).await?;
114 Ok(())
115}
116
117pub async fn drop_index(mz_client: &Client, name: &str) -> Result<()> {
119 let q = format!("DROP INDEX {}", name);
120 debug!("deleting index=> {}", q);
121 mz_client.execute(&*q, &[]).await?;
122 Ok(())
123}
124
125pub async fn execute(mz_client: &Client, query: &str) -> Result<u64> {
127 debug!("exec=> {}", query);
128 Ok(mz_client.execute(query, &[]).await?)
129}