mz_testdrive/action/duckdb/
query.rs1use std::sync::Arc;
11
12use anyhow::{Context, anyhow};
13use duckdb::types::ValueRef;
14use mz_ore::retry::{Retry, RetryResult};
15
16use crate::action::duckdb::get_or_create_connection;
17use crate::action::{ControlFlow, State};
18use crate::parser::BuiltinCommand;
19
20pub async fn run_query(
21 mut cmd: BuiltinCommand,
22 state: &mut State,
23) -> Result<ControlFlow, anyhow::Error> {
24 let name = cmd.args.string("name")?;
25 let sort_rows = cmd.args.opt_bool("sort-rows")?.unwrap_or(false);
26 cmd.args.done()?;
27
28 let mut lines = cmd.input.into_iter();
30 let query = lines
31 .next()
32 .ok_or_else(|| anyhow!("duckdb-query requires a query as the first input line"))?;
33 let mut expected_rows: Vec<String> = lines.collect();
34 if sort_rows {
35 expected_rows.sort();
36 }
37
38 let conn = get_or_create_connection(state, name).await?;
39 println!(">> {}", query);
40
41 Retry::default()
47 .initial_backoff(state.initial_backoff)
48 .factor(state.backoff_factor)
49 .max_duration(state.timeout)
50 .max_tries(state.max_tries)
51 .retry_async(|retry_state| {
52 let conn = Arc::clone(&conn);
53 let query = query.clone();
54 let expected_rows = &expected_rows;
55 async move {
56 let res = mz_ore::task::spawn_blocking(
57 || "duckdb_query".to_string(),
58 move || {
59 let conn = conn.lock().map_err(|e| anyhow!("lock poisoned: {}", e))?;
60 let mut stmt =
61 conn.prepare(&query).context("preparing DuckDB query")?;
62 let mut rows = stmt.query([]).context("executing DuckDB query")?;
63
64 let mut result = Vec::new();
65 while let Some(row) = rows.next()? {
66 let column_count = row.as_ref().column_count();
68 let mut row_values = Vec::with_capacity(column_count);
69 for i in 0..column_count {
70 let val = row.get_ref(i)?;
71 let formatted = format_value(&val);
72 row_values.push(formatted);
73 }
74 result.push(row_values.join(" "));
75 }
76 Ok::<_, anyhow::Error>(result)
77 },
78 )
79 .await;
80
81 let err = match res {
82 Err(e) => e,
83 Ok(mut actual_rows) => {
84 if sort_rows {
85 actual_rows.sort();
86 }
87 if actual_rows == *expected_rows {
88 return RetryResult::Ok(());
89 }
90 anyhow!(
91 "DuckDB query result mismatch\nexpected ({} rows):\n{}\n\nactual ({} rows):\n{}",
92 expected_rows.len(),
93 expected_rows.join("\n"),
94 actual_rows.len(),
95 actual_rows.join("\n")
96 )
97 }
98 };
99
100 if retry_state.i == 0 || !mz_ore::env::is_var_truthy("CI") {
103 if let Some(backoff) = retry_state.next_backoff {
104 if !backoff.is_zero() {
105 println!(
106 "{}\nrows didn't match; sleeping to see if the result catches up 🕑 {:.0?}",
107 err, backoff
108 );
109 }
110 }
111 }
112
113 RetryResult::RetryableErr(err)
114 }
115 })
116 .await?;
117
118 Ok(ControlFlow::Continue)
119}
120
121fn format_value(val: &ValueRef) -> String {
122 match val {
123 ValueRef::Null => "<null>".to_string(),
124 ValueRef::Boolean(b) => b.to_string(),
125 ValueRef::TinyInt(i) => i.to_string(),
126 ValueRef::SmallInt(i) => i.to_string(),
127 ValueRef::Int(i) => i.to_string(),
128 ValueRef::BigInt(i) => i.to_string(),
129 ValueRef::HugeInt(i) => i.to_string(),
130 ValueRef::UTinyInt(i) => i.to_string(),
131 ValueRef::USmallInt(i) => i.to_string(),
132 ValueRef::UInt(i) => i.to_string(),
133 ValueRef::UBigInt(i) => i.to_string(),
134 ValueRef::Float(f) => f.to_string(),
135 ValueRef::Double(f) => f.to_string(),
136 ValueRef::Text(bytes) => String::from_utf8_lossy(bytes).to_string(),
137 ValueRef::Blob(bytes) => format!("{:?}", bytes),
138 _ => format!("{:?}", val),
139 }
140}