Skip to main content

mz_testdrive/action/duckdb/
query.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::sync::Arc;
11
12use anyhow::{Context, anyhow};
13use duckdb::types::ValueRef;
14use mz_ore::retry::{Retry, RetryResult};
15
16use crate::action::duckdb::get_or_create_connection;
17use crate::action::{ControlFlow, State};
18use crate::parser::BuiltinCommand;
19
20pub async fn run_query(
21    mut cmd: BuiltinCommand,
22    state: &mut State,
23) -> Result<ControlFlow, anyhow::Error> {
24    let name = cmd.args.string("name")?;
25    let sort_rows = cmd.args.opt_bool("sort-rows")?.unwrap_or(false);
26    cmd.args.done()?;
27
28    // First line is the query, remaining lines are expected output
29    let mut lines = cmd.input.into_iter();
30    let query = lines
31        .next()
32        .ok_or_else(|| anyhow!("duckdb-query requires a query as the first input line"))?;
33    let mut expected_rows: Vec<String> = lines.collect();
34    if sort_rows {
35        expected_rows.sort();
36    }
37
38    let conn = get_or_create_connection(state, name).await?;
39    println!(">> {}", query);
40
41    // Reading from an external system (e.g. an Iceberg table backed by a sink
42    // with a non-zero COMMIT INTERVAL) is eventually consistent, so retry until
43    // the result matches the expectation or the timeout elapses, mirroring the
44    // behavior of testdrive's `>` SQL queries. Query-execution errors (e.g. the
45    // table not yet existing after a restart) are retried too.
46    Retry::default()
47        .initial_backoff(state.initial_backoff)
48        .factor(state.backoff_factor)
49        .max_duration(state.timeout)
50        .max_tries(state.max_tries)
51        .retry_async(|retry_state| {
52            let conn = Arc::clone(&conn);
53            let query = query.clone();
54            let expected_rows = &expected_rows;
55            async move {
56                let res = mz_ore::task::spawn_blocking(
57                    || "duckdb_query".to_string(),
58                    move || {
59                        let conn = conn.lock().map_err(|e| anyhow!("lock poisoned: {}", e))?;
60                        let mut stmt =
61                            conn.prepare(&query).context("preparing DuckDB query")?;
62                        let mut rows = stmt.query([]).context("executing DuckDB query")?;
63
64                        let mut result = Vec::new();
65                        while let Some(row) = rows.next()? {
66                            // Get column count from the row's statement
67                            let column_count = row.as_ref().column_count();
68                            let mut row_values = Vec::with_capacity(column_count);
69                            for i in 0..column_count {
70                                let val = row.get_ref(i)?;
71                                let formatted = format_value(&val);
72                                row_values.push(formatted);
73                            }
74                            result.push(row_values.join(" "));
75                        }
76                        Ok::<_, anyhow::Error>(result)
77                    },
78                )
79                .await;
80
81                let err = match res {
82                    Err(e) => e,
83                    Ok(mut actual_rows) => {
84                        if sort_rows {
85                            actual_rows.sort();
86                        }
87                        if actual_rows == *expected_rows {
88                            return RetryResult::Ok(());
89                        }
90                        anyhow!(
91                            "DuckDB query result mismatch\nexpected ({} rows):\n{}\n\nactual ({} rows):\n{}",
92                            expected_rows.len(),
93                            expected_rows.join("\n"),
94                            actual_rows.len(),
95                            actual_rows.join("\n")
96                        )
97                    }
98                };
99
100                // Only print the first retry notice in CI to avoid spamming the
101                // log; print every one locally for visibility.
102                if retry_state.i == 0 || !mz_ore::env::is_var_truthy("CI") {
103                    if let Some(backoff) = retry_state.next_backoff {
104                        if !backoff.is_zero() {
105                            println!(
106                                "{}\nrows didn't match; sleeping to see if the result catches up 🕑 {:.0?}",
107                                err, backoff
108                            );
109                        }
110                    }
111                }
112
113                RetryResult::RetryableErr(err)
114            }
115        })
116        .await?;
117
118    Ok(ControlFlow::Continue)
119}
120
121fn format_value(val: &ValueRef) -> String {
122    match val {
123        ValueRef::Null => "<null>".to_string(),
124        ValueRef::Boolean(b) => b.to_string(),
125        ValueRef::TinyInt(i) => i.to_string(),
126        ValueRef::SmallInt(i) => i.to_string(),
127        ValueRef::Int(i) => i.to_string(),
128        ValueRef::BigInt(i) => i.to_string(),
129        ValueRef::HugeInt(i) => i.to_string(),
130        ValueRef::UTinyInt(i) => i.to_string(),
131        ValueRef::USmallInt(i) => i.to_string(),
132        ValueRef::UInt(i) => i.to_string(),
133        ValueRef::UBigInt(i) => i.to_string(),
134        ValueRef::Float(f) => f.to_string(),
135        ValueRef::Double(f) => f.to_string(),
136        ValueRef::Text(bytes) => String::from_utf8_lossy(bytes).to_string(),
137        ValueRef::Blob(bytes) => format!("{:?}", bytes),
138        _ => format!("{:?}", val),
139    }
140}