Skip to main content

mz_sqllogictest/
runner.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! The Materialize-specific runner for sqllogictest.
11//!
12//! slt tests expect a serialized execution of sql statements and queries.
13//! To get the same results in materialize we track current_timestamp and increment it whenever we execute a statement.
14//!
15//! The high-level workflow is:
16//!   for each record in the test file:
17//!     if record is a sql statement:
18//!       run sql in postgres, observe changes and copy them to materialize using LocalInput::Updates(..)
19//!       advance current_timestamp
20//!       promise to never send updates for times < current_timestamp using LocalInput::Watermark(..)
21//!       compare to expected results
22//!       if wrong, bail out and stop processing this file
23//!     if record is a sql query:
24//!       peek query at current_timestamp
25//!       compare to expected results
26//!       if wrong, record the error
27
28use std::collections::BTreeMap;
29use std::error::Error;
30use std::fs::{File, OpenOptions};
31use std::io::{Read, Seek, SeekFrom, Write};
32use std::net::{IpAddr, Ipv4Addr, SocketAddr};
33use std::path::Path;
34use std::sync::Arc;
35use std::sync::LazyLock;
36use std::time::Duration;
37use std::{env, fmt, ops, str, thread};
38
39use anyhow::{anyhow, bail};
40use bytes::BytesMut;
41use chrono::{DateTime, NaiveDateTime, NaiveTime, Utc};
42use fallible_iterator::FallibleIterator;
43use futures::sink::SinkExt;
44use itertools::Itertools;
45use maplit::btreemap;
46use md5::{Digest, Md5};
47use mz_adapter_types::bootstrap_builtin_cluster_config::{
48    ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR, BootstrapBuiltinClusterConfig,
49    CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR, PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR,
50    SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR, SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR,
51};
52use mz_catalog::config::ClusterReplicaSizeMap;
53use mz_controller::{ControllerConfig, ReplicaHttpLocator};
54use mz_environmentd::CatalogConfig;
55use mz_license_keys::ValidatedLicenseKey;
56use mz_orchestrator_process::{ProcessOrchestrator, ProcessOrchestratorConfig};
57use mz_orchestrator_tracing::{TracingCliArgs, TracingOrchestrator};
58use mz_ore::cast::{CastFrom, ReinterpretCast};
59use mz_ore::channel::trigger;
60use mz_ore::error::ErrorExt;
61use mz_ore::metrics::MetricsRegistry;
62use mz_ore::now::SYSTEM_TIME;
63use mz_ore::retry::Retry;
64use mz_ore::task;
65use mz_ore::thread::{JoinHandleExt, JoinOnDropHandle};
66use mz_ore::tracing::TracingHandle;
67use mz_ore::url::SensitiveUrl;
68use mz_persist_client::PersistLocation;
69use mz_persist_client::cache::PersistClientCache;
70use mz_persist_client::cfg::PersistConfig;
71use mz_persist_client::rpc::{
72    MetricsSameProcessPubSubSender, PersistGrpcPubSubServer, PubSubClientConnection, PubSubSender,
73};
74use mz_pgrepr::{Interval, Jsonb, Numeric, UInt2, UInt4, UInt8, Value, oid};
75use mz_repr::ColumnName;
76use mz_repr::adt::date::Date;
77use mz_repr::adt::mz_acl_item::{AclItem, MzAclItem};
78use mz_repr::adt::numeric;
79use mz_secrets::SecretsController;
80use mz_server_core::listeners::{
81    AllowedRoles, AuthenticatorKind, BaseListenerConfig, HttpListenerConfig, HttpRoutesEnabled,
82    ListenersConfig, SqlListenerConfig,
83};
84use mz_sql::ast::{Expr, Raw, Statement};
85use mz_sql::catalog::EnvironmentId;
86use mz_sql_parser::ast::display::AstDisplay;
87use mz_sql_parser::ast::{
88    CreateIndexStatement, CreateViewStatement, CteBlock, Distinct, DropObjectsStatement, Ident,
89    IfExistsBehavior, ObjectType, OrderByExpr, Query, RawItemName, Select, SelectItem,
90    SelectStatement, SetExpr, Statement as AstStatement, TableFactor, TableWithJoins,
91    UnresolvedItemName, UnresolvedObjectName, ViewDefinition,
92};
93use mz_sql_parser::parser;
94use mz_storage_types::connections::ConnectionContext;
95use postgres_protocol::types;
96use regex::Regex;
97use tempfile::TempDir;
98use tokio::net::TcpListener;
99use tokio::runtime::Runtime;
100use tokio::sync::oneshot;
101use tokio_postgres::types::{FromSql, Kind as PgKind, Type as PgType};
102use tokio_postgres::{NoTls, Row, SimpleQueryMessage};
103use tokio_stream::wrappers::TcpListenerStream;
104use tower_http::cors::AllowOrigin;
105use tracing::{error, info};
106use uuid::Uuid;
107use uuid::fmt::Simple;
108
109use crate::ast::{Location, Mode, Output, QueryOutput, Record, Sort, Type};
110use crate::util;
111
112#[derive(Debug)]
113pub enum Outcome<'a> {
114    Unsupported {
115        error: anyhow::Error,
116        location: Location,
117    },
118    ParseFailure {
119        error: anyhow::Error,
120        location: Location,
121    },
122    PlanFailure {
123        error: anyhow::Error,
124        location: Location,
125    },
126    UnexpectedPlanSuccess {
127        expected_error: &'a str,
128        location: Location,
129    },
130    WrongNumberOfRowsInserted {
131        expected_count: u64,
132        actual_count: u64,
133        location: Location,
134    },
135    WrongColumnCount {
136        expected_count: usize,
137        actual_count: usize,
138        location: Location,
139    },
140    WrongColumnNames {
141        expected_column_names: &'a Vec<ColumnName>,
142        actual_column_names: Vec<ColumnName>,
143        actual_output: Output,
144        location: Location,
145    },
146    OutputFailure {
147        expected_output: &'a Output,
148        actual_raw_output: Vec<Row>,
149        actual_output: Output,
150        location: Location,
151    },
152    InconsistentViewOutcome {
153        query_outcome: Box<Outcome<'a>>,
154        view_outcome: Box<Outcome<'a>>,
155        location: Location,
156    },
157    Bail {
158        cause: Box<Outcome<'a>>,
159        location: Location,
160    },
161    Warning {
162        cause: Box<Outcome<'a>>,
163        location: Location,
164    },
165    Success,
166}
167
/// Number of distinct `Outcome` variants, i.e. the number of counters kept in
/// `Outcomes::stats`.
const NUM_OUTCOMES: usize = 12;
/// Counter index for `Outcome::Success`: always the last slot.
const SUCCESS_OUTCOME: usize = NUM_OUTCOMES - 1;
/// Counter index for `Outcome::Warning`: the slot just before success.
const WARNING_OUTCOME: usize = SUCCESS_OUTCOME - 1;
171
172impl<'a> Outcome<'a> {
173    fn code(&self) -> usize {
174        match self {
175            Outcome::Unsupported { .. } => 0,
176            Outcome::ParseFailure { .. } => 1,
177            Outcome::PlanFailure { .. } => 2,
178            Outcome::UnexpectedPlanSuccess { .. } => 3,
179            Outcome::WrongNumberOfRowsInserted { .. } => 4,
180            Outcome::WrongColumnCount { .. } => 5,
181            Outcome::WrongColumnNames { .. } => 6,
182            Outcome::OutputFailure { .. } => 7,
183            Outcome::InconsistentViewOutcome { .. } => 8,
184            Outcome::Bail { .. } => 9,
185            Outcome::Warning { .. } => 10,
186            Outcome::Success => 11,
187        }
188    }
189
190    fn success(&self) -> bool {
191        matches!(self, Outcome::Success)
192    }
193
194    fn failure(&self) -> bool {
195        !matches!(self, Outcome::Success) && !matches!(self, Outcome::Warning { .. })
196    }
197
198    /// Returns an error message that will match self. Appropriate for
199    /// rewriting error messages (i.e. not inserting error messages where we
200    /// currently expect success).
201    fn err_msg(&self) -> Option<String> {
202        match self {
203            Outcome::Unsupported { error, .. }
204            | Outcome::ParseFailure { error, .. }
205            | Outcome::PlanFailure { error, .. } => {
206                // Take only the first line, which should be sufficient for
207                // meaningfully matching the error.
208                let err_str = error.to_string_with_causes();
209                let err_str = err_str.split('\n').next().unwrap();
210                // Strip the "db error: ERROR: " prefix added by the postgres
211                // client library, as it's noisy and not useful for matching.
212                let err_str = err_str.strip_prefix("db error: ERROR: ").unwrap_or(err_str);
213                // This value gets fed back into regex to check that it matches
214                // `self`, so escape its meta characters. We need to undo the
215                // escaping of #. `regex::escape` escapes this because it
216                // expects that we use the `x` flag when building a regex, but
217                // this is not the case, so \# would end up being an invalid
218                // escape sequence, which would choke the parsing of the slt
219                // file the next time around.
220                Some(regex::escape(err_str).replace(r"\#", "#"))
221            }
222            _ => None,
223        }
224    }
225}
226
227impl fmt::Display for Outcome<'_> {
228    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
229        use Outcome::*;
230        const INDENT: &str = "\n        ";
231        match self {
232            Unsupported { error, location } => write!(
233                f,
234                "Unsupported:{}:\n{}",
235                location,
236                error.display_with_causes()
237            ),
238            ParseFailure { error, location } => {
239                write!(
240                    f,
241                    "ParseFailure:{}:\n{}",
242                    location,
243                    error.display_with_causes()
244                )
245            }
246            PlanFailure { error, location } => write!(f, "PlanFailure:{}:\n{:#}", location, error),
247            UnexpectedPlanSuccess {
248                expected_error,
249                location,
250            } => write!(
251                f,
252                "UnexpectedPlanSuccess:{} expected error: {}",
253                location, expected_error
254            ),
255            WrongNumberOfRowsInserted {
256                expected_count,
257                actual_count,
258                location,
259            } => write!(
260                f,
261                "WrongNumberOfRowsInserted:{}{}expected: {}{}actually: {}",
262                location, INDENT, expected_count, INDENT, actual_count
263            ),
264            WrongColumnCount {
265                expected_count,
266                actual_count,
267                location,
268            } => write!(
269                f,
270                "WrongColumnCount:{}{}expected: {}{}actually: {}",
271                location, INDENT, expected_count, INDENT, actual_count
272            ),
273            WrongColumnNames {
274                expected_column_names,
275                actual_column_names,
276                actual_output: _,
277                location,
278            } => write!(
279                f,
280                "Wrong Column Names:{}:{}expected column names: {}{}inferred column names: {}",
281                location,
282                INDENT,
283                expected_column_names
284                    .iter()
285                    .map(|n| n.to_string())
286                    .collect::<Vec<_>>()
287                    .join(" "),
288                INDENT,
289                actual_column_names
290                    .iter()
291                    .map(|n| n.to_string())
292                    .collect::<Vec<_>>()
293                    .join(" ")
294            ),
295            OutputFailure {
296                expected_output,
297                actual_raw_output,
298                actual_output,
299                location,
300            } => write!(
301                f,
302                "OutputFailure:{}{}expected: {:?}{}actually: {:?}{}actual raw: {:?}",
303                location, INDENT, expected_output, INDENT, actual_output, INDENT, actual_raw_output
304            ),
305            InconsistentViewOutcome {
306                query_outcome,
307                view_outcome,
308                location,
309            } => write!(
310                f,
311                "InconsistentViewOutcome:{}{}expected from query: {}{}actually from indexed view: {}",
312                location, INDENT, query_outcome, INDENT, view_outcome
313            ),
314            Bail { cause, location } => write!(f, "Bail:{} {}", location, cause),
315            Warning { cause, location } => write!(f, "Warning:{} {}", location, cause),
316            Success => f.write_str("Success"),
317        }
318    }
319}
320
321#[derive(Default, Debug)]
322pub struct Outcomes {
323    stats: [usize; NUM_OUTCOMES],
324    details: Vec<String>,
325}
326
327impl ops::AddAssign<Outcomes> for Outcomes {
328    fn add_assign(&mut self, rhs: Outcomes) {
329        for (lhs, rhs) in self.stats.iter_mut().zip_eq(rhs.stats.iter()) {
330            *lhs += rhs
331        }
332    }
333}
334impl Outcomes {
335    pub fn any_failed(&self) -> bool {
336        self.stats[SUCCESS_OUTCOME] + self.stats[WARNING_OUTCOME] < self.stats.iter().sum::<usize>()
337    }
338
339    pub fn as_json(&self) -> serde_json::Value {
340        serde_json::json!({
341            "unsupported": self.stats[0],
342            "parse_failure": self.stats[1],
343            "plan_failure": self.stats[2],
344            "unexpected_plan_success": self.stats[3],
345            "wrong_number_of_rows_affected": self.stats[4],
346            "wrong_column_count": self.stats[5],
347            "wrong_column_names": self.stats[6],
348            "output_failure": self.stats[7],
349            "inconsistent_view_outcome": self.stats[8],
350            "bail": self.stats[9],
351            "warning": self.stats[10],
352            "success": self.stats[11],
353        })
354    }
355
356    pub fn display(&self, no_fail: bool, failure_details: bool) -> OutcomesDisplay<'_> {
357        OutcomesDisplay {
358            inner: self,
359            no_fail,
360            failure_details,
361        }
362    }
363}
364
365pub struct OutcomesDisplay<'a> {
366    inner: &'a Outcomes,
367    no_fail: bool,
368    failure_details: bool,
369}
370
371impl<'a> fmt::Display for OutcomesDisplay<'a> {
372    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
373        let total: usize = self.inner.stats.iter().sum();
374        if self.failure_details
375            && (self.inner.stats[SUCCESS_OUTCOME] + self.inner.stats[WARNING_OUTCOME] != total
376                || self.no_fail)
377        {
378            for outcome in &self.inner.details {
379                writeln!(f, "{}", outcome)?;
380            }
381            Ok(())
382        } else {
383            write!(
384                f,
385                "{}:",
386                if self.inner.stats[SUCCESS_OUTCOME] + self.inner.stats[WARNING_OUTCOME] == total {
387                    "PASS"
388                } else if self.no_fail {
389                    "FAIL-IGNORE"
390                } else {
391                    "FAIL"
392                }
393            )?;
394            static NAMES: LazyLock<Vec<&'static str>> = LazyLock::new(|| {
395                vec![
396                    "unsupported",
397                    "parse-failure",
398                    "plan-failure",
399                    "unexpected-plan-success",
400                    "wrong-number-of-rows-inserted",
401                    "wrong-column-count",
402                    "wrong-column-names",
403                    "output-failure",
404                    "inconsistent-view-outcome",
405                    "bail",
406                    "warning",
407                    "success",
408                    "total",
409                ]
410            });
411            for (i, n) in self.inner.stats.iter().enumerate() {
412                if *n > 0 {
413                    write!(f, " {}={}", NAMES[i], n)?;
414                }
415            }
416            write!(f, " total={}", total)
417        }
418    }
419}
420
421struct QueryInfo {
422    is_select: bool,
423    num_attributes: Option<usize>,
424}
425
426enum PrepareQueryOutcome<'a> {
427    QueryPrepared(QueryInfo),
428    Outcome(Outcome<'a>),
429}
430
431pub struct Runner<'a> {
432    config: &'a RunConfig<'a>,
433    inner: Option<RunnerInner<'a>>,
434}
435
436pub struct RunnerInner<'a> {
437    server_addr: SocketAddr,
438    internal_server_addr: SocketAddr,
439    password_server_addr: SocketAddr,
440    internal_http_server_addr: SocketAddr,
441    // Drop order matters for these fields.
442    client: tokio_postgres::Client,
443    system_client: tokio_postgres::Client,
444    clients: BTreeMap<String, tokio_postgres::Client>,
445    auto_index_tables: bool,
446    auto_index_selects: bool,
447    auto_transactions: bool,
448    enable_table_keys: bool,
449    verbose: bool,
450    stdout: &'a dyn WriteFmt,
451    _shutdown_trigger: trigger::Trigger,
452    _server_thread: JoinOnDropHandle<()>,
453    _temp_dir: TempDir,
454}
455
456#[derive(Debug)]
457pub struct Slt(Value);
458
459impl<'a> FromSql<'a> for Slt {
460    fn from_sql(
461        ty: &PgType,
462        mut raw: &'a [u8],
463    ) -> Result<Self, Box<dyn Error + 'static + Send + Sync>> {
464        Ok(match *ty {
465            PgType::ACLITEM => Self(Value::AclItem(AclItem::decode_binary(
466                types::bytea_from_sql(raw),
467            )?)),
468            PgType::BOOL => Self(Value::Bool(types::bool_from_sql(raw)?)),
469            PgType::BYTEA => Self(Value::Bytea(types::bytea_from_sql(raw).to_vec())),
470            PgType::CHAR => Self(Value::Char(u8::from_be_bytes(
471                types::char_from_sql(raw)?.to_be_bytes(),
472            ))),
473            PgType::FLOAT4 => Self(Value::Float4(types::float4_from_sql(raw)?)),
474            PgType::FLOAT8 => Self(Value::Float8(types::float8_from_sql(raw)?)),
475            PgType::DATE => Self(Value::Date(Date::from_pg_epoch(types::int4_from_sql(
476                raw,
477            )?)?)),
478            PgType::INT2 => Self(Value::Int2(types::int2_from_sql(raw)?)),
479            PgType::INT4 => Self(Value::Int4(types::int4_from_sql(raw)?)),
480            PgType::INT8 => Self(Value::Int8(types::int8_from_sql(raw)?)),
481            PgType::INTERVAL => Self(Value::Interval(Interval::from_sql(ty, raw)?)),
482            PgType::JSONB => Self(Value::Jsonb(Jsonb::from_sql(ty, raw)?)),
483            PgType::NAME => Self(Value::Name(types::text_from_sql(raw)?.to_string())),
484            PgType::NUMERIC => Self(Value::Numeric(Numeric::from_sql(ty, raw)?)),
485            PgType::OID => Self(Value::Oid(types::oid_from_sql(raw)?)),
486            PgType::REGCLASS => Self(Value::Oid(types::oid_from_sql(raw)?)),
487            PgType::REGPROC => Self(Value::Oid(types::oid_from_sql(raw)?)),
488            PgType::REGTYPE => Self(Value::Oid(types::oid_from_sql(raw)?)),
489            PgType::TEXT | PgType::BPCHAR | PgType::VARCHAR => {
490                Self(Value::Text(types::text_from_sql(raw)?.to_string()))
491            }
492            PgType::TIME => Self(Value::Time(NaiveTime::from_sql(ty, raw)?)),
493            PgType::TIMESTAMP => Self(Value::Timestamp(
494                NaiveDateTime::from_sql(ty, raw)?.try_into()?,
495            )),
496            PgType::TIMESTAMPTZ => Self(Value::TimestampTz(
497                DateTime::<Utc>::from_sql(ty, raw)?.try_into()?,
498            )),
499            PgType::UUID => Self(Value::Uuid(Uuid::from_sql(ty, raw)?)),
500            PgType::RECORD => {
501                let num_fields = read_be_i32(&mut raw)?;
502                let mut tuple = vec![];
503                for _ in 0..num_fields {
504                    let oid = u32::reinterpret_cast(read_be_i32(&mut raw)?);
505                    let typ = match PgType::from_oid(oid) {
506                        Some(typ) => typ,
507                        None => return Err("unknown oid".into()),
508                    };
509                    let v = read_value::<Option<Slt>>(&typ, &mut raw)?;
510                    tuple.push(v.map(|v| v.0));
511                }
512                Self(Value::Record(tuple))
513            }
514            PgType::INT4_RANGE
515            | PgType::INT8_RANGE
516            | PgType::DATE_RANGE
517            | PgType::NUM_RANGE
518            | PgType::TS_RANGE
519            | PgType::TSTZ_RANGE => {
520                use mz_repr::adt::range::Range;
521                let range: Range<Slt> = Range::from_sql(ty, raw)?;
522                Self(Value::Range(range.into_bounds(|b| Box::new(b.0))))
523            }
524
525            _ => match ty.kind() {
526                PgKind::Array(arr_type) => {
527                    let arr = types::array_from_sql(raw)?;
528                    let elements: Vec<Option<Value>> = arr
529                        .values()
530                        .map(|v| match v {
531                            Some(v) => Ok(Some(Slt::from_sql(arr_type, v)?)),
532                            None => Ok(None),
533                        })
534                        .collect::<Vec<Option<Slt>>>()?
535                        .into_iter()
536                        // Map a Vec<Option<Slt>> to Vec<Option<Value>>.
537                        .map(|v| v.map(|v| v.0))
538                        .collect();
539
540                    Self(Value::Array {
541                        dims: arr
542                            .dimensions()
543                            .map(|d| {
544                                Ok(mz_repr::adt::array::ArrayDimension {
545                                    lower_bound: isize::cast_from(d.lower_bound),
546                                    length: usize::try_from(d.len)
547                                        .expect("cannot have negative length"),
548                                })
549                            })
550                            .collect()?,
551                        elements,
552                    })
553                }
554                _ => match ty.oid() {
555                    oid::TYPE_UINT2_OID => Self(Value::UInt2(UInt2::from_sql(ty, raw)?)),
556                    oid::TYPE_UINT4_OID => Self(Value::UInt4(UInt4::from_sql(ty, raw)?)),
557                    oid::TYPE_UINT8_OID => Self(Value::UInt8(UInt8::from_sql(ty, raw)?)),
558                    oid::TYPE_MZ_TIMESTAMP_OID => {
559                        let s = types::text_from_sql(raw)?;
560                        let t: mz_repr::Timestamp = s.parse()?;
561                        Self(Value::MzTimestamp(t))
562                    }
563                    oid::TYPE_MZ_ACL_ITEM_OID => Self(Value::MzAclItem(MzAclItem::decode_binary(
564                        types::bytea_from_sql(raw),
565                    )?)),
566                    _ => unreachable!(),
567                },
568            },
569        })
570    }
571    fn accepts(ty: &PgType) -> bool {
572        match ty.kind() {
573            PgKind::Array(_) | PgKind::Composite(_) => return true,
574            _ => {}
575        }
576        match ty.oid() {
577            oid::TYPE_UINT2_OID
578            | oid::TYPE_UINT4_OID
579            | oid::TYPE_UINT8_OID
580            | oid::TYPE_MZ_TIMESTAMP_OID
581            | oid::TYPE_MZ_ACL_ITEM_OID => return true,
582            _ => {}
583        }
584        matches!(
585            *ty,
586            PgType::ACLITEM
587                | PgType::BOOL
588                | PgType::BYTEA
589                | PgType::CHAR
590                | PgType::DATE
591                | PgType::FLOAT4
592                | PgType::FLOAT8
593                | PgType::INT2
594                | PgType::INT4
595                | PgType::INT8
596                | PgType::INTERVAL
597                | PgType::JSONB
598                | PgType::NAME
599                | PgType::NUMERIC
600                | PgType::OID
601                | PgType::REGCLASS
602                | PgType::REGPROC
603                | PgType::REGTYPE
604                | PgType::RECORD
605                | PgType::TEXT
606                | PgType::BPCHAR
607                | PgType::VARCHAR
608                | PgType::TIME
609                | PgType::TIMESTAMP
610                | PgType::TIMESTAMPTZ
611                | PgType::UUID
612                | PgType::INT4_RANGE
613                | PgType::INT4_RANGE_ARRAY
614                | PgType::INT8_RANGE
615                | PgType::INT8_RANGE_ARRAY
616                | PgType::DATE_RANGE
617                | PgType::DATE_RANGE_ARRAY
618                | PgType::NUM_RANGE
619                | PgType::NUM_RANGE_ARRAY
620                | PgType::TS_RANGE
621                | PgType::TS_RANGE_ARRAY
622                | PgType::TSTZ_RANGE
623                | PgType::TSTZ_RANGE_ARRAY
624        )
625    }
626}
627
// From postgres-types/src/private.rs.
/// Reads a big-endian `i32` from the front of `buf` and advances `buf` past
/// the four bytes consumed.
///
/// Returns an error, leaving `buf` untouched, if fewer than four bytes remain.
fn read_be_i32(buf: &mut &[u8]) -> Result<i32, Box<dyn Error + Sync + Send>> {
    let Some((head, rest)) = buf.split_first_chunk::<4>() else {
        return Err("invalid buffer size".into());
    };
    let value = i32::from_be_bytes(*head);
    *buf = rest;
    Ok(value)
}
638
639// From postgres-types/src/private.rs.
640fn read_value<'a, T>(type_: &PgType, buf: &mut &'a [u8]) -> Result<T, Box<dyn Error + Sync + Send>>
641where
642    T: FromSql<'a>,
643{
644    let value = match usize::try_from(read_be_i32(buf)?) {
645        Err(_) => None,
646        Ok(len) => {
647            if len > buf.len() {
648                return Err("invalid buffer size".into());
649            }
650            let (head, tail) = buf.split_at(len);
651            *buf = tail;
652            Some(head)
653        }
654    };
655    T::from_sql_nullable(type_, value)
656}
657
658fn format_datum(d: Slt, typ: &Type, mode: Mode, col: usize) -> String {
659    match (typ, d.0) {
660        (Type::Bool, Value::Bool(b)) => b.to_string(),
661
662        (Type::Integer, Value::Int2(i)) => i.to_string(),
663        (Type::Integer, Value::Int4(i)) => i.to_string(),
664        (Type::Integer, Value::Int8(i)) => i.to_string(),
665        (Type::Integer, Value::UInt2(u)) => u.0.to_string(),
666        (Type::Integer, Value::UInt4(u)) => u.0.to_string(),
667        (Type::Integer, Value::UInt8(u)) => u.0.to_string(),
668        (Type::Integer, Value::Oid(i)) => i.to_string(),
669        // TODO(benesch): rewrite to avoid `as`.
670        #[allow(clippy::as_conversions)]
671        (Type::Integer, Value::Float4(f)) => format!("{}", f as i64),
672        // TODO(benesch): rewrite to avoid `as`.
673        #[allow(clippy::as_conversions)]
674        (Type::Integer, Value::Float8(f)) => format!("{}", f as i64),
675        // This is so wrong, but sqlite needs it.
676        (Type::Integer, Value::Text(_)) => "0".to_string(),
677        (Type::Integer, Value::Bool(b)) => i8::from(b).to_string(),
678        (Type::Integer, Value::Numeric(d)) => {
679            let mut d = d.0.0.clone();
680            let mut cx = numeric::cx_datum();
681            // Truncate the decimal to match sqlite.
682            if mode == Mode::Standard {
683                cx.set_rounding(dec::Rounding::Down);
684            }
685            cx.round(&mut d);
686            numeric::munge_numeric(&mut d).unwrap();
687            d.to_standard_notation_string()
688        }
689
690        (Type::Real, Value::Int2(i)) => format!("{:.3}", i),
691        (Type::Real, Value::Int4(i)) => format!("{:.3}", i),
692        (Type::Real, Value::Int8(i)) => format!("{:.3}", i),
693        (Type::Real, Value::Float4(f)) => match mode {
694            Mode::Standard => format!("{:.3}", f),
695            Mode::Cockroach => format!("{}", f),
696        },
697        (Type::Real, Value::Float8(f)) => match mode {
698            Mode::Standard => format!("{:.3}", f),
699            Mode::Cockroach => format!("{}", f),
700        },
701        (Type::Real, Value::Numeric(d)) => match mode {
702            Mode::Standard => {
703                let mut d = d.0.0.clone();
704                if d.exponent() < -3 {
705                    numeric::rescale(&mut d, 3).unwrap();
706                }
707                numeric::munge_numeric(&mut d).unwrap();
708                d.to_standard_notation_string()
709            }
710            Mode::Cockroach => d.0.0.to_standard_notation_string(),
711        },
712
713        (Type::Text, Value::Text(s)) => {
714            if s.is_empty() {
715                "(empty)".to_string()
716            } else {
717                s
718            }
719        }
720        (Type::Text, Value::Bool(b)) => b.to_string(),
721        (Type::Text, Value::Float4(f)) => format!("{:.3}", f),
722        (Type::Text, Value::Float8(f)) => format!("{:.3}", f),
723        // Bytes are printed as text iff they are valid UTF-8. This
724        // seems guaranteed to confuse everyone, but it is required for
725        // compliance with the CockroachDB sqllogictest runner. [0]
726        //
727        // [0]: https://github.com/cockroachdb/cockroach/blob/970782487/pkg/sql/logictest/logic.go#L2038-L2043
728        (Type::Text, Value::Bytea(b)) => match str::from_utf8(&b) {
729            Ok(s) => s.to_string(),
730            Err(_) => format!("{:?}", b),
731        },
732        (Type::Text, Value::Numeric(d)) => d.0.0.to_standard_notation_string(),
733        // Everything else gets normal text encoding. This correctly handles things
734        // like arrays, tuples, and strings that need to be quoted.
735        (Type::Text, d) => {
736            let mut buf = BytesMut::new();
737            d.encode_text(&mut buf);
738            String::from_utf8_lossy(&buf).into_owned()
739        }
740
741        (Type::Oid, Value::Oid(o)) => o.to_string(),
742
743        (_, d) => panic!(
744            "Don't know how to format {:?} as {:?} in column {}",
745            d, typ, col,
746        ),
747    }
748}
749
750fn format_row(row: &Row, types: &[Type], mode: Mode) -> Vec<String> {
751    let mut formatted: Vec<String> = vec![];
752    for i in 0..row.len() {
753        let t: Option<Slt> = row.get::<usize, Option<Slt>>(i);
754        let t: Option<String> = t.map(|d| format_datum(d, &types[i], mode, i));
755        formatted.push(match t {
756            Some(t) => t,
757            None => "NULL".into(),
758        });
759    }
760
761    formatted
762}
763
764impl<'a> Runner<'a> {
765    pub async fn start(config: &'a RunConfig<'a>) -> Result<Runner<'a>, anyhow::Error> {
766        let mut runner = Self {
767            config,
768            inner: None,
769        };
770        runner.reset().await?;
771        Ok(runner)
772    }
773
774    pub async fn reset(&mut self) -> Result<(), anyhow::Error> {
775        // Explicitly drop the old runner here to ensure that we wait for threads to terminate
776        // before starting a new runner
777        drop(self.inner.take());
778        self.inner = Some(RunnerInner::start(self.config).await?);
779
780        Ok(())
781    }
782
783    async fn run_record<'r>(
784        &mut self,
785        record: &'r Record<'r>,
786        in_transaction: &mut bool,
787    ) -> Result<Outcome<'r>, anyhow::Error> {
788        if let Record::ResetServer = record {
789            self.reset().await?;
790            Ok(Outcome::Success)
791        } else {
792            self.inner
793                .as_mut()
794                .expect("RunnerInner missing")
795                .run_record(record, in_transaction)
796                .await
797        }
798    }
799
800    async fn check_catalog(&self) -> Result<(), anyhow::Error> {
801        self.inner
802            .as_ref()
803            .expect("RunnerInner missing")
804            .check_catalog()
805            .await
806    }
807
808    async fn reset_database(&mut self) -> Result<(), anyhow::Error> {
809        let inner = self.inner.as_mut().expect("RunnerInner missing");
810
811        inner.client.batch_execute("ROLLBACK;").await?;
812
813        inner
814            .system_client
815            .batch_execute(
816                "ROLLBACK;
817                 SET cluster = mz_catalog_server;
818                 RESET cluster_replica;",
819            )
820            .await?;
821
822        inner
823            .system_client
824            .batch_execute("ALTER SYSTEM RESET ALL")
825            .await?;
826
827        // Drop all databases, then recreate the `materialize` database.
828        for row in inner
829            .system_client
830            .query("SELECT name FROM mz_databases", &[])
831            .await?
832        {
833            let name: &str = row.get("name");
834            inner
835                .system_client
836                .batch_execute(&format!("DROP DATABASE \"{name}\""))
837                .await?;
838        }
839        inner
840            .system_client
841            .batch_execute("CREATE DATABASE materialize")
842            .await?;
843
844        // Ensure quickstart cluster exists with one replica of size `self.config.replica_size`.
845        // We don't destroy the existing quickstart cluster replica if it exists, as turning
846        // on a cluster replica is exceptionally slow.
847        let mut needs_default_cluster = true;
848        for row in inner
849            .system_client
850            .query("SELECT name FROM mz_clusters WHERE id LIKE 'u%'", &[])
851            .await?
852        {
853            match row.get("name") {
854                "quickstart" => needs_default_cluster = false,
855                name => {
856                    inner
857                        .system_client
858                        .batch_execute(&format!("DROP CLUSTER {name}"))
859                        .await?
860                }
861            }
862        }
863        if needs_default_cluster {
864            inner
865                .system_client
866                .batch_execute("CREATE CLUSTER quickstart REPLICAS ()")
867                .await?;
868        }
869        let mut needs_default_replica = false;
870        let rows = inner
871            .system_client
872            .query(
873                "SELECT name, size FROM mz_cluster_replicas
874                 WHERE cluster_id = (SELECT id FROM mz_clusters WHERE name = 'quickstart')
875                 ORDER BY name",
876                &[],
877            )
878            .await?;
879        if rows.len() != self.config.replicas {
880            needs_default_replica = true;
881        } else {
882            for (i, row) in rows.iter().enumerate() {
883                let name: &str = row.get("name");
884                let size: &str = row.get("size");
885                if name != format!("r{i}") || size != self.config.replica_size {
886                    needs_default_replica = true;
887                    break;
888                }
889            }
890        }
891
892        if needs_default_replica {
893            inner
894                .system_client
895                .batch_execute("ALTER CLUSTER quickstart SET (MANAGED = false)")
896                .await?;
897            for row in inner
898                .system_client
899                .query(
900                    "SELECT name FROM mz_cluster_replicas
901                     WHERE cluster_id = (SELECT id FROM mz_clusters WHERE name = 'quickstart')",
902                    &[],
903                )
904                .await?
905            {
906                let name: &str = row.get("name");
907                inner
908                    .system_client
909                    .batch_execute(&format!("DROP CLUSTER REPLICA quickstart.{}", name))
910                    .await?;
911            }
912            for i in 1..=self.config.replicas {
913                inner
914                    .system_client
915                    .batch_execute(&format!(
916                        "CREATE CLUSTER REPLICA quickstart.r{i} SIZE '{}'",
917                        self.config.replica_size
918                    ))
919                    .await?;
920            }
921            inner
922                .system_client
923                .batch_execute("ALTER CLUSTER quickstart SET (MANAGED = true)")
924                .await?;
925        }
926
927        // Grant initial privileges.
928        inner
929            .system_client
930            .batch_execute("GRANT USAGE ON DATABASE materialize TO PUBLIC")
931            .await?;
932        inner
933            .system_client
934            .batch_execute("GRANT CREATE ON DATABASE materialize TO materialize")
935            .await?;
936        inner
937            .system_client
938            .batch_execute("GRANT CREATE ON SCHEMA materialize.public TO materialize")
939            .await?;
940        inner
941            .system_client
942            .batch_execute("GRANT USAGE ON CLUSTER quickstart TO PUBLIC")
943            .await?;
944        inner
945            .system_client
946            .batch_execute("GRANT CREATE ON CLUSTER quickstart TO materialize")
947            .await?;
948
949        // Some sqllogictests require more than the default amount of tables, so we increase the
950        // limit for all tests.
951        inner
952            .system_client
953            .simple_query("ALTER SYSTEM SET max_tables = 100")
954            .await?;
955
956        if inner.enable_table_keys {
957            inner
958                .system_client
959                .simple_query("ALTER SYSTEM SET unsafe_enable_table_keys = true")
960                .await?;
961        }
962
963        inner.ensure_fixed_features().await?;
964
965        inner.client = connect(inner.server_addr, None, None).await.unwrap();
966        inner.system_client = connect(inner.internal_server_addr, Some("mz_system"), None)
967            .await
968            .unwrap();
969        inner.clients = BTreeMap::new();
970
971        Ok(())
972    }
973}
974
975impl<'a> RunnerInner<'a> {
    /// Starts a new Materialize server for one sqllogictest run and connects
    /// clients to it.
    ///
    /// Steps, in order:
    /// 1. Connect to the Postgres instance at `config.postgres_url` (up to 5
    ///    tries) and prepare the `{prefix}_consensus` and `{prefix}_tsoracle`
    ///    schemas used for persist consensus and the timestamp oracle.
    /// 2. Build a process orchestrator, a persist pubsub server listening on
    ///    an ephemeral localhost port, and a persist client cache.
    /// 3. Bind SQL listeners (`external`, `internal`, `password`) and HTTP
    ///    listeners (`external`, `internal`) on ephemeral localhost ports.
    /// 4. Serve environmentd on a dedicated thread with its own Tokio runtime,
    ///    receiving the bound addresses back over oneshot channels.
    /// 5. Connect a default client and an `mz_system` client, then apply
    ///    [`Self::ensure_fixed_features`].
    ///
    /// # Errors
    ///
    /// Returns an error if Postgres setup, temp-dir creation, orchestrator
    /// creation, listener binding, runtime creation, or server startup fails.
    ///
    /// # Panics
    ///
    /// Panics if the pubsub listener cannot be bound or if either SQL client
    /// cannot connect to the freshly started server.
    pub async fn start(config: &RunConfig<'a>) -> Result<RunnerInner<'a>, anyhow::Error> {
        let temp_dir = tempfile::tempdir()?;
        let scratch_dir = tempfile::tempdir()?;
        let environment_id = EnvironmentId::for_tests();
        // Connect to the backing Postgres, reset the schemas used by persist
        // consensus and the timestamp oracle, and build a URL for each that
        // pins `search_path` to its schema.
        let (consensus_uri, timestamp_oracle_url): (SensitiveUrl, SensitiveUrl) = {
            let postgres_url = &config.postgres_url;
            let prefix = &config.prefix;
            info!(%postgres_url, "starting server");
            let (client, conn) = Retry::default()
                .max_tries(5)
                .retry_async(|_| async {
                    match tokio_postgres::connect(postgres_url, NoTls).await {
                        Ok(c) => Ok(c),
                        Err(e) => {
                            error!(%e, "failed to connect to postgres");
                            Err(e)
                        }
                    }
                })
                .await?;
            // The connection must be driven for `client` to make progress; a
            // connection error panics this background task.
            task::spawn(|| "sqllogictest_connect", async move {
                if let Err(e) = conn.await {
                    panic!("connection error: {}", e);
                }
            });
            // The timestamp oracle schema is always recreated from scratch,
            // while an existing consensus schema is left in place.
            client
                .batch_execute(&format!(
                    "DROP SCHEMA IF EXISTS {prefix}_tsoracle CASCADE;
                     CREATE SCHEMA IF NOT EXISTS {prefix}_consensus;
                     CREATE SCHEMA {prefix}_tsoracle;"
                ))
                .await?;
            (
                format!("{postgres_url}?options=--search_path={prefix}_consensus")
                    .parse()
                    .expect("invalid consensus URI"),
                format!("{postgres_url}?options=--search_path={prefix}_tsoracle")
                    .parse()
                    .expect("invalid timestamp oracle URI"),
            )
        };

        // Process orchestrator whose service images are looked up in the
        // directory containing the current executable.
        let secrets_dir = temp_dir.path().join("secrets");
        let orchestrator = Arc::new(
            ProcessOrchestrator::new(ProcessOrchestratorConfig {
                image_dir: env::current_exe()?.parent().unwrap().to_path_buf(),
                suppress_output: false,
                environment_id: environment_id.to_string(),
                secrets_dir: secrets_dir.clone(),
                command_wrapper: config
                    .orchestrator_process_wrapper
                    .as_ref()
                    .map_or(Ok(vec![]), |s| shell_words::split(s))?,
                propagate_crashes: true,
                tcp_proxy: None,
                scratch_directory: scratch_dir.path().to_path_buf(),
            })
            .await?,
        );
        let now = SYSTEM_TIME.clone();
        let metrics_registry = MetricsRegistry::new();

        // Persist pubsub: serve it over TCP on an ephemeral localhost port
        // (its URL is handed to the controller below, presumably for clusterd
        // processes) and connect this process's persist clients to it via a
        // same-process connection.
        let persist_config = PersistConfig::new(
            &mz_environmentd::BUILD_INFO,
            now.clone(),
            mz_dyncfgs::all_dyncfgs(),
        );
        let persist_pubsub_server =
            PersistGrpcPubSubServer::new(&persist_config, &metrics_registry);
        let persist_pubsub_client = persist_pubsub_server.new_same_process_connection();
        let persist_pubsub_tcp_listener =
            TcpListener::bind(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0))
                .await
                .expect("pubsub addr binding");
        let persist_pubsub_server_port = persist_pubsub_tcp_listener
            .local_addr()
            .expect("pubsub addr has local addr")
            .port();
        info!("listening for persist pubsub connections on localhost:{persist_pubsub_server_port}");
        mz_ore::task::spawn(|| "persist_pubsub_server", async move {
            persist_pubsub_server
                .serve_with_stream(TcpListenerStream::new(persist_pubsub_tcp_listener))
                .await
                .expect("success")
        });
        let persist_clients =
            PersistClientCache::new(persist_config, &metrics_registry, |cfg, metrics| {
                let sender: Arc<dyn PubSubSender> = Arc::new(MetricsSameProcessPubSubSender::new(
                    cfg,
                    persist_pubsub_client.sender,
                    metrics,
                ));
                PubSubClientConnection::new(sender, persist_pubsub_client.receiver)
            });
        let persist_clients = Arc::new(persist_clients);

        let secrets_controller = Arc::clone(&orchestrator);
        let connection_context = ConnectionContext::for_tests(orchestrator.reader());
        let orchestrator = Arc::new(TracingOrchestrator::new(
            orchestrator,
            config.tracing.clone(),
        ));
        // Every listener binds port 0 on localhost so the OS picks a free
        // port; the actual addresses are reported back once the server runs.
        // The `internal` SQL listener admits internal roles (the `mz_system`
        // client below connects through it); the `password` SQL listener uses
        // password authentication.
        let listeners_config = ListenersConfig {
            sql: btreemap! {
                "external".to_owned() => SqlListenerConfig {
                    addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
                    authenticator_kind: AuthenticatorKind::None,
                    allowed_roles: AllowedRoles::Normal,
                    enable_tls: false,
                },
                "internal".to_owned() => SqlListenerConfig {
                    addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
                    authenticator_kind: AuthenticatorKind::None,
                    allowed_roles: AllowedRoles::Internal,
                    enable_tls: false,
                },
                "password".to_owned() => SqlListenerConfig {
                    addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
                    authenticator_kind: AuthenticatorKind::Password,
                    allowed_roles: AllowedRoles::Normal,
                    enable_tls: false,
                },
            },
            http: btreemap![
                "external".to_owned() => HttpListenerConfig {
                    base: BaseListenerConfig {
                        addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
                        authenticator_kind: AuthenticatorKind::None,
                        allowed_roles: AllowedRoles::Normal,
                        enable_tls: false
                    },
                    routes: HttpRoutesEnabled {
                        base: true,
                        webhook: true,
                        internal: false,
                        metrics: false,
                        profiling: false,
                    },
                },
                "internal".to_owned() => HttpListenerConfig {
                    base: BaseListenerConfig {
                        addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
                        authenticator_kind: AuthenticatorKind::None,
                        allowed_roles: AllowedRoles::NormalAndInternal,
                        enable_tls: false
                    },
                    routes: HttpRoutesEnabled {
                        base: true,
                        webhook: true,
                        internal: true,
                        metrics: true,
                        profiling: true,
                    },
                },
            ],
        };
        let listeners = mz_environmentd::Listeners::bind(listeners_config).await?;
        // The external HTTP listener's port is already known after binding
        // and is advertised as the server's HTTP host name.
        let host_name = format!(
            "localhost:{}",
            listeners.http["external"].handle.local_addr.port()
        );
        let catalog_config = CatalogConfig {
            persist_clients: Arc::clone(&persist_clients),
            metrics: Arc::new(mz_catalog::durable::Metrics::new(&MetricsRegistry::new())),
        };
        let server_config = mz_environmentd::Config {
            catalog_config,
            timestamp_oracle_url: Some(timestamp_oracle_url),
            controller: ControllerConfig {
                build_info: &mz_environmentd::BUILD_INFO,
                orchestrator,
                clusterd_image: "clusterd".into(),
                init_container_image: None,
                deploy_generation: 0,
                persist_location: PersistLocation {
                    blob_uri: format!(
                        "file://{}/persist/blob",
                        config.persist_dir.path().display()
                    )
                    .parse()
                    .expect("invalid blob URI"),
                    consensus_uri,
                },
                persist_clients,
                now: SYSTEM_TIME.clone(),
                metrics_registry: metrics_registry.clone(),
                persist_pubsub_url: format!("http://localhost:{}", persist_pubsub_server_port),
                secrets_args: mz_service::secrets::SecretsReaderCliArgs {
                    secrets_reader: mz_service::secrets::SecretsControllerKind::LocalFile,
                    secrets_reader_local_file_dir: Some(secrets_dir),
                    secrets_reader_kubernetes_context: None,
                    secrets_reader_aws_prefix: None,
                    secrets_reader_name_prefix: None,
                },
                connection_context,
                replica_http_locator: Arc::new(ReplicaHttpLocator::default()),
            },
            secrets_controller,
            cloud_resource_controller: None,
            tls: None,
            frontegg: None,
            cors_allowed_origin: AllowOrigin::list([]),
            unsafe_mode: true,
            all_features: false,
            metrics_registry,
            now,
            environment_id,
            cluster_replica_sizes: ClusterReplicaSizeMap::for_tests(),
            bootstrap_default_cluster_replica_size: config.replica_size.clone(),
            bootstrap_default_cluster_replication_factor: config
                .replicas
                .try_into()
                .expect("replicas must fit"),
            // Every builtin cluster uses the configured replica size together
            // with its own default replication factor.
            bootstrap_builtin_system_cluster_config: BootstrapBuiltinClusterConfig {
                replication_factor: SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR,
                size: config.replica_size.clone(),
            },
            bootstrap_builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig {
                replication_factor: CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR,
                size: config.replica_size.clone(),
            },
            bootstrap_builtin_probe_cluster_config: BootstrapBuiltinClusterConfig {
                replication_factor: PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR,
                size: config.replica_size.clone(),
            },
            bootstrap_builtin_support_cluster_config: BootstrapBuiltinClusterConfig {
                replication_factor: SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR,
                size: config.replica_size.clone(),
            },
            bootstrap_builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig {
                replication_factor: ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR,
                size: config.replica_size.clone(),
            },
            // `log_filter` is seeded from the tracing config; entries from
            // `config.system_parameter_defaults` are applied afterwards and
            // therefore override it.
            system_parameter_defaults: {
                let mut params = BTreeMap::new();
                params.insert(
                    "log_filter".to_string(),
                    config.tracing.startup_log_filter.to_string(),
                );
                params.extend(config.system_parameter_defaults.clone());
                params
            },
            availability_zones: Default::default(),
            tracing_handle: config.tracing_handle.clone(),
            storage_usage_collection_interval: Duration::from_secs(3600),
            storage_usage_retention_period: None,
            segment_api_key: None,
            segment_client_side: false,
            // SLT doesn't like eternally running tasks since it waits for them to finish in between SLT files
            test_only_dummy_segment_client: false,
            egress_addresses: vec![],
            aws_account_id: None,
            aws_privatelink_availability_zones: None,
            launchdarkly_sdk_key: None,
            launchdarkly_key_map: Default::default(),
            config_sync_file_path: None,
            config_sync_timeout: Duration::from_secs(30),
            config_sync_loop_interval: None,
            bootstrap_role: Some("materialize".into()),
            http_host_name: Some(host_name),
            internal_console_redirect_url: None,
            tls_reload_certs: mz_server_core::cert_reload_never_reload(),
            helm_chart_version: None,
            license_key: ValidatedLicenseKey::for_tests(),
            external_login_password_mz_system: None,
            force_builtin_schema_migration: None,
        };
        // We need to run the server on its own Tokio runtime, which in turn
        // requires its own thread, so that we can wait for any tasks spawned
        // by the server to be shutdown at the end of each file. If we were to
        // share a Tokio runtime, tasks from the last file's server would still
        // be live at the start of the next file's server.
        //
        // The server thread reports each listener's bound address back over a
        // oneshot channel. Only the first channel carries a `Result`, so that
        // a startup failure can be reported instead of an address.
        let (server_addr_tx, server_addr_rx): (oneshot::Sender<Result<_, anyhow::Error>>, _) =
            oneshot::channel();
        let (internal_server_addr_tx, internal_server_addr_rx) = oneshot::channel();
        let (password_server_addr_tx, password_server_addr_rx) = oneshot::channel();
        let (internal_http_server_addr_tx, internal_http_server_addr_rx) = oneshot::channel();
        let (shutdown_trigger, shutdown_trigger_rx) = trigger::channel();
        let server_thread = thread::spawn(|| {
            let runtime = match Runtime::new() {
                Ok(runtime) => runtime,
                Err(e) => {
                    server_addr_tx
                        .send(Err(e.into()))
                        .expect("receiver should not drop first");
                    return;
                }
            };
            // NOTE: despite the arm's binding name, on success this is the
            // running server handle, not a runtime.
            let server = match runtime.block_on(listeners.serve(server_config)) {
                Ok(runtime) => runtime,
                Err(e) => {
                    server_addr_tx
                        .send(Err(e.into()))
                        .expect("receiver should not drop first");
                    return;
                }
            };
            server_addr_tx
                .send(Ok(server.sql_listener_handles["external"].local_addr))
                .expect("receiver should not drop first");
            internal_server_addr_tx
                .send(server.sql_listener_handles["internal"].local_addr)
                .expect("receiver should not drop first");
            password_server_addr_tx
                .send(server.sql_listener_handles["password"].local_addr)
                .expect("receiver should not drop first");
            internal_http_server_addr_tx
                .send(server.http_listener_handles["internal"].local_addr)
                .expect("receiver should not drop first");
            // Keep the runtime, and with it the server's tasks, alive until
            // the shutdown trigger fires (presumably when the
            // `_shutdown_trigger` held by the returned `RunnerInner` is dropped).
            let _ = runtime.block_on(shutdown_trigger_rx);
        });
        // The first `?` fails if the server thread exited without sending;
        // the second propagates a startup error the thread reported.
        let server_addr = server_addr_rx.await??;
        let internal_server_addr = internal_server_addr_rx.await?;
        let password_server_addr = password_server_addr_rx.await?;
        let internal_http_server_addr = internal_http_server_addr_rx.await?;

        // `mz_system` connects through the internal listener; the default
        // client uses the external one.
        let system_client = connect(internal_server_addr, Some("mz_system"), None)
            .await
            .unwrap();
        let client = connect(server_addr, None, None).await.unwrap();

        let inner = RunnerInner {
            server_addr,
            internal_server_addr,
            password_server_addr,
            internal_http_server_addr,
            _shutdown_trigger: shutdown_trigger,
            _server_thread: server_thread.join_on_drop(),
            _temp_dir: temp_dir,
            client,
            system_client,
            clients: BTreeMap::new(),
            auto_index_tables: config.auto_index_tables,
            auto_index_selects: config.auto_index_selects,
            auto_transactions: config.auto_transactions,
            enable_table_keys: config.enable_table_keys,
            verbose: config.verbose,
            stdout: config.stdout,
        };
        inner.ensure_fixed_features().await?;

        Ok(inner)
    }
1319
1320    /// Set features that should be enabled regardless of whether reset-server was
1321    /// called. These features may be set conditionally depending on the run configuration.
1322    async fn ensure_fixed_features(&self) -> Result<(), anyhow::Error> {
1323        // We turn on enable_reduce_mfp_fusion, as we wish
1324        // to get as much coverage of these features as we can.
1325        // TODO(vmarcos): Remove this code when we retire this feature flag.
1326        self.system_client
1327            .execute("ALTER SYSTEM SET enable_reduce_mfp_fusion = on", &[])
1328            .await?;
1329
1330        // Dangerous functions are useful for tests so we enable it for all tests.
1331        self.system_client
1332            .execute("ALTER SYSTEM SET unsafe_enable_unsafe_functions = on", &[])
1333            .await?;
1334        Ok(())
1335    }
1336
1337    async fn run_record<'r>(
1338        &mut self,
1339        record: &'r Record<'r>,
1340        in_transaction: &mut bool,
1341    ) -> Result<Outcome<'r>, anyhow::Error> {
1342        match &record {
1343            Record::Statement {
1344                expected_error,
1345                rows_affected,
1346                sql,
1347                location,
1348            } => {
1349                if self.auto_transactions && *in_transaction {
1350                    self.client.execute("COMMIT", &[]).await?;
1351                    *in_transaction = false;
1352                }
1353                match self
1354                    .run_statement(*expected_error, *rows_affected, sql, location.clone())
1355                    .await?
1356                {
1357                    Outcome::Success => {
1358                        if self.auto_index_tables {
1359                            let additional = mutate(sql);
1360                            for stmt in additional {
1361                                self.client.execute(&stmt, &[]).await?;
1362                            }
1363                        }
1364                        Ok(Outcome::Success)
1365                    }
1366                    other => {
1367                        if expected_error.is_some() {
1368                            Ok(other)
1369                        } else {
1370                            // If we failed to execute a statement that was supposed to succeed,
1371                            // running the rest of the tests in this file will probably cause
1372                            // false positives, so just give up on the file entirely.
1373                            Ok(Outcome::Bail {
1374                                cause: Box::new(other),
1375                                location: location.clone(),
1376                            })
1377                        }
1378                    }
1379                }
1380            }
1381            Record::Query {
1382                sql,
1383                output,
1384                location,
1385            } => {
1386                self.run_query(sql, output, location.clone(), in_transaction)
1387                    .await
1388            }
1389            Record::Simple {
1390                conn,
1391                user,
1392                password,
1393                sql,
1394                sort,
1395                output,
1396                location,
1397                ..
1398            } => {
1399                self.run_simple(
1400                    *conn,
1401                    *user,
1402                    *password,
1403                    sql,
1404                    sort.clone(),
1405                    output,
1406                    location.clone(),
1407                )
1408                .await
1409            }
1410            Record::Copy {
1411                table_name,
1412                tsv_path,
1413            } => {
1414                let tsv = tokio::fs::read(tsv_path).await?;
1415                let copy = self
1416                    .client
1417                    .copy_in(&*format!("COPY {} FROM STDIN", table_name))
1418                    .await?;
1419                tokio::pin!(copy);
1420                copy.send(bytes::Bytes::from(tsv)).await?;
1421                copy.finish().await?;
1422                Ok(Outcome::Success)
1423            }
1424            _ => Ok(Outcome::Success),
1425        }
1426    }
1427
1428    async fn run_statement<'r>(
1429        &self,
1430        expected_error: Option<&'r str>,
1431        expected_rows_affected: Option<u64>,
1432        sql: &'r str,
1433        location: Location,
1434    ) -> Result<Outcome<'r>, anyhow::Error> {
1435        static UNSUPPORTED_INDEX_STATEMENT_REGEX: LazyLock<Regex> =
1436            LazyLock::new(|| Regex::new("^(CREATE UNIQUE INDEX|REINDEX)").unwrap());
1437        if UNSUPPORTED_INDEX_STATEMENT_REGEX.is_match(sql) {
1438            // sure, we totally made you an index
1439            return Ok(Outcome::Success);
1440        }
1441
1442        match self.client.execute(sql, &[]).await {
1443            Ok(actual) => {
1444                if let Some(expected_error) = expected_error {
1445                    return Ok(Outcome::UnexpectedPlanSuccess {
1446                        expected_error,
1447                        location,
1448                    });
1449                }
1450                match expected_rows_affected {
1451                    None => Ok(Outcome::Success),
1452                    Some(expected) => {
1453                        if expected != actual {
1454                            Ok(Outcome::WrongNumberOfRowsInserted {
1455                                expected_count: expected,
1456                                actual_count: actual,
1457                                location,
1458                            })
1459                        } else {
1460                            Ok(Outcome::Success)
1461                        }
1462                    }
1463                }
1464            }
1465            Err(error) => {
1466                if let Some(expected_error) = expected_error {
1467                    if Regex::new(expected_error)?.is_match(&error.to_string_with_causes()) {
1468                        return Ok(Outcome::Success);
1469                    }
1470                }
1471                Ok(Outcome::PlanFailure {
1472                    error: anyhow!(error),
1473                    location,
1474                })
1475            }
1476        }
1477    }
1478
1479    async fn prepare_query<'r>(
1480        &self,
1481        sql: &str,
1482        output: &'r Result<QueryOutput<'_>, &'r str>,
1483        location: Location,
1484        in_transaction: &mut bool,
1485    ) -> Result<PrepareQueryOutcome<'r>, anyhow::Error> {
1486        // get statement
1487        let statements = match mz_sql::parse::parse(sql) {
1488            Ok(statements) => statements,
1489            Err(e) => match output {
1490                Ok(_) => {
1491                    return Ok(PrepareQueryOutcome::Outcome(Outcome::ParseFailure {
1492                        error: e.into(),
1493                        location,
1494                    }));
1495                }
1496                Err(expected_error) => {
1497                    if Regex::new(expected_error)?.is_match(&e.to_string_with_causes()) {
1498                        return Ok(PrepareQueryOutcome::Outcome(Outcome::Success));
1499                    } else {
1500                        return Ok(PrepareQueryOutcome::Outcome(Outcome::ParseFailure {
1501                            error: e.into(),
1502                            location,
1503                        }));
1504                    }
1505                }
1506            },
1507        };
1508        let statement = match &*statements {
1509            [] => bail!("Got zero statements?"),
1510            [statement] => &statement.ast,
1511            _ => bail!("Got multiple statements: {:?}", statements),
1512        };
1513        let (is_select, num_attributes) = match statement {
1514            Statement::Select(stmt) => (true, derive_num_attributes(&stmt.query.body)),
1515            _ => (false, None),
1516        };
1517
1518        match output {
1519            Ok(_) => {
1520                if self.auto_transactions && !*in_transaction {
1521                    // No ISOLATION LEVEL SERIALIZABLE because of database-issues#5323
1522                    self.client.execute("BEGIN", &[]).await?;
1523                    *in_transaction = true;
1524                }
1525            }
1526            Err(_) => {
1527                if self.auto_transactions && *in_transaction {
1528                    self.client.execute("COMMIT", &[]).await?;
1529                    *in_transaction = false;
1530                }
1531            }
1532        }
1533
1534        // `SHOW` commands reference catalog schema, thus are not in the same timedomain and not
1535        // allowed in the same transaction, see:
1536        // https://materialize.com/docs/sql/begin/#same-timedomain-error
1537        match statement {
1538            Statement::Show(..) => {
1539                if self.auto_transactions && *in_transaction {
1540                    self.client.execute("COMMIT", &[]).await?;
1541                    *in_transaction = false;
1542                }
1543            }
1544            _ => (),
1545        }
1546        Ok(PrepareQueryOutcome::QueryPrepared(QueryInfo {
1547            is_select,
1548            num_attributes,
1549        }))
1550    }
1551
1552    async fn execute_query<'r>(
1553        &self,
1554        sql: &str,
1555        output: &'r Result<QueryOutput<'_>, &'r str>,
1556        location: Location,
1557    ) -> Result<Outcome<'r>, anyhow::Error> {
1558        let rows = match self.client.query(sql, &[]).await {
1559            Ok(rows) => rows,
1560            Err(error) => {
1561                let error_string = error.to_string_with_causes();
1562                return match output {
1563                    Ok(_) => {
1564                        if error_string.contains("supported") || error_string.contains("overload") {
1565                            // this is a failure, but it's caused by lack of support rather than by bugs
1566                            Ok(Outcome::Unsupported {
1567                                error: anyhow!(error),
1568                                location,
1569                            })
1570                        } else {
1571                            Ok(Outcome::PlanFailure {
1572                                error: anyhow!(error),
1573                                location,
1574                            })
1575                        }
1576                    }
1577                    Err(expected_error) => {
1578                        if Regex::new(expected_error)?.is_match(&error_string) {
1579                            Ok(Outcome::Success)
1580                        } else {
1581                            Ok(Outcome::PlanFailure {
1582                                error: anyhow!(
1583                                    "error does not match expected pattern:\n  expected: /{}/\n  actual:    {}",
1584                                    expected_error,
1585                                    error_string
1586                                ),
1587                                location,
1588                            })
1589                        }
1590                    }
1591                };
1592            }
1593        };
1594
1595        // unpack expected output
1596        let QueryOutput {
1597            sort,
1598            types: expected_types,
1599            column_names: expected_column_names,
1600            output: expected_output,
1601            mode,
1602            ..
1603        } = match output {
1604            Err(expected_error) => {
1605                return Ok(Outcome::UnexpectedPlanSuccess {
1606                    expected_error,
1607                    location,
1608                });
1609            }
1610            Ok(query_output) => query_output,
1611        };
1612
1613        // format output
1614        let mut formatted_rows = vec![];
1615        for row in &rows {
1616            if row.len() != expected_types.len() {
1617                return Ok(Outcome::WrongColumnCount {
1618                    expected_count: expected_types.len(),
1619                    actual_count: row.len(),
1620                    location,
1621                });
1622            }
1623            let row = format_row(row, expected_types, *mode);
1624            formatted_rows.push(row);
1625        }
1626
1627        // sort formatted output
1628        if let Sort::Row = sort {
1629            formatted_rows.sort();
1630        }
1631        let mut values = formatted_rows.into_iter().flatten().collect::<Vec<_>>();
1632        if let Sort::Value = sort {
1633            values.sort();
1634        }
1635
1636        // Various checks as long as there are returned rows.
1637        if let Some(row) = rows.get(0) {
1638            // check column names
1639            if let Some(expected_column_names) = expected_column_names {
1640                let actual_column_names = row
1641                    .columns()
1642                    .iter()
1643                    .map(|t| ColumnName::from(t.name()))
1644                    .collect::<Vec<_>>();
1645                if expected_column_names != &actual_column_names {
1646                    return Ok(Outcome::WrongColumnNames {
1647                        expected_column_names,
1648                        actual_column_names,
1649                        actual_output: Output::Values(values),
1650                        location,
1651                    });
1652                }
1653            }
1654        }
1655
1656        // check output
1657        match expected_output {
1658            Output::Values(expected_values) => {
1659                if values != *expected_values {
1660                    return Ok(Outcome::OutputFailure {
1661                        expected_output,
1662                        actual_raw_output: rows,
1663                        actual_output: Output::Values(values),
1664                        location,
1665                    });
1666                }
1667            }
1668            Output::Hashed {
1669                num_values,
1670                md5: expected_md5,
1671            } => {
1672                let mut hasher = Md5::new();
1673                for value in &values {
1674                    hasher.update(value);
1675                    hasher.update("\n");
1676                }
1677                let md5 = format!("{:x}", hasher.finalize());
1678                if values.len() != *num_values || md5 != *expected_md5 {
1679                    return Ok(Outcome::OutputFailure {
1680                        expected_output,
1681                        actual_raw_output: rows,
1682                        actual_output: Output::Hashed {
1683                            num_values: values.len(),
1684                            md5,
1685                        },
1686                        location,
1687                    });
1688                }
1689            }
1690        }
1691
1692        Ok(Outcome::Success)
1693    }
1694
1695    async fn execute_view_inner<'r>(
1696        &self,
1697        sql: &str,
1698        output: &'r Result<QueryOutput<'_>, &'r str>,
1699        location: Location,
1700    ) -> Result<Option<Outcome<'r>>, anyhow::Error> {
1701        print_sql_if(self.stdout, sql, self.verbose);
1702        let sql_result = self.client.execute(sql, &[]).await;
1703
1704        // Evaluate if we already reached an outcome or not.
1705        let tentative_outcome = if let Err(view_error) = sql_result {
1706            if let Err(expected_error) = output {
1707                if Regex::new(expected_error)?.is_match(&view_error.to_string_with_causes()) {
1708                    Some(Outcome::Success)
1709                } else {
1710                    Some(Outcome::PlanFailure {
1711                        error: anyhow!(
1712                            "error does not match expected pattern:\n  expected: /{}/\n  actual:    {}",
1713                            expected_error,
1714                            view_error.to_string_with_causes()
1715                        ),
1716                        location: location.clone(),
1717                    })
1718                }
1719            } else {
1720                Some(Outcome::PlanFailure {
1721                    error: view_error.into(),
1722                    location: location.clone(),
1723                })
1724            }
1725        } else {
1726            None
1727        };
1728        Ok(tentative_outcome)
1729    }
1730
1731    async fn execute_view<'r>(
1732        &self,
1733        sql: &str,
1734        num_attributes: Option<usize>,
1735        output: &'r Result<QueryOutput<'_>, &'r str>,
1736        location: Location,
1737    ) -> Result<Outcome<'r>, anyhow::Error> {
1738        // Create indexed view SQL commands and execute `CREATE VIEW`.
1739        let expected_column_names = if let Ok(QueryOutput { column_names, .. }) = output {
1740            column_names.clone()
1741        } else {
1742            None
1743        };
1744        let (create_view, create_index, view_sql, drop_view) = generate_view_sql(
1745            sql,
1746            Uuid::new_v4().as_simple(),
1747            num_attributes,
1748            expected_column_names,
1749        );
1750        let tentative_outcome = self
1751            .execute_view_inner(create_view.as_str(), output, location.clone())
1752            .await?;
1753
1754        // Either we already have an outcome or alternatively,
1755        // we proceed to index and query the view.
1756        if let Some(view_outcome) = tentative_outcome {
1757            return Ok(view_outcome);
1758        }
1759
1760        let tentative_outcome = self
1761            .execute_view_inner(create_index.as_str(), output, location.clone())
1762            .await?;
1763
1764        let view_outcome;
1765        if let Some(outcome) = tentative_outcome {
1766            view_outcome = outcome;
1767        } else {
1768            print_sql_if(self.stdout, view_sql.as_str(), self.verbose);
1769            view_outcome = self
1770                .execute_query(view_sql.as_str(), output, location.clone())
1771                .await?;
1772        }
1773
1774        // Remember to clean up after ourselves by dropping the view.
1775        print_sql_if(self.stdout, drop_view.as_str(), self.verbose);
1776        self.client.execute(drop_view.as_str(), &[]).await?;
1777
1778        Ok(view_outcome)
1779    }
1780
1781    async fn run_query<'r>(
1782        &self,
1783        sql: &'r str,
1784        output: &'r Result<QueryOutput<'_>, &'r str>,
1785        location: Location,
1786        in_transaction: &mut bool,
1787    ) -> Result<Outcome<'r>, anyhow::Error> {
1788        let prepare_outcome = self
1789            .prepare_query(sql, output, location.clone(), in_transaction)
1790            .await?;
1791        match prepare_outcome {
1792            PrepareQueryOutcome::QueryPrepared(QueryInfo {
1793                is_select,
1794                num_attributes,
1795            }) => {
1796                let query_outcome = self.execute_query(sql, output, location.clone()).await?;
1797                if is_select && self.auto_index_selects {
1798                    let view_outcome = self
1799                        .execute_view(sql, None, output, location.clone())
1800                        .await?;
1801
1802                    // We compare here the query-based and view-based outcomes.
1803                    // We only produce a test failure if the outcomes are of different
1804                    // variant types, thus accepting smaller deviations in the details
1805                    // produced for each variant.
1806                    if std::mem::discriminant::<Outcome>(&query_outcome)
1807                        != std::mem::discriminant::<Outcome>(&view_outcome)
1808                    {
1809                        // Before producing a failure outcome, we try to obtain a new
1810                        // outcome for view-based execution exploiting analysis of the
1811                        // number of attributes. This two-level strategy can avoid errors
1812                        // produced by column ambiguity in the `SELECT`.
1813                        let view_outcome = if num_attributes.is_some() {
1814                            self.execute_view(sql, num_attributes, output, location.clone())
1815                                .await?
1816                        } else {
1817                            view_outcome
1818                        };
1819
1820                        if std::mem::discriminant::<Outcome>(&query_outcome)
1821                            != std::mem::discriminant::<Outcome>(&view_outcome)
1822                        {
1823                            let inconsistent_view_outcome = Outcome::InconsistentViewOutcome {
1824                                query_outcome: Box::new(query_outcome),
1825                                view_outcome: Box::new(view_outcome),
1826                                location: location.clone(),
1827                            };
1828                            // Determine if this inconsistent view outcome should be reported
1829                            // as an error or only as a warning.
1830                            let outcome = if should_warn(&inconsistent_view_outcome) {
1831                                Outcome::Warning {
1832                                    cause: Box::new(inconsistent_view_outcome),
1833                                    location: location.clone(),
1834                                }
1835                            } else {
1836                                inconsistent_view_outcome
1837                            };
1838                            return Ok(outcome);
1839                        }
1840                    }
1841                }
1842                Ok(query_outcome)
1843            }
1844            PrepareQueryOutcome::Outcome(outcome) => Ok(outcome),
1845        }
1846    }
1847
1848    async fn get_conn(
1849        &mut self,
1850        name: Option<&str>,
1851        user: Option<&str>,
1852        password: Option<&str>,
1853    ) -> Result<&tokio_postgres::Client, tokio_postgres::Error> {
1854        match name {
1855            None => Ok(&self.client),
1856            Some(name) => {
1857                if !self.clients.contains_key(name) {
1858                    let addr = if matches!(user, Some("mz_system") | Some("mz_support")) {
1859                        self.internal_server_addr
1860                    } else if password.is_some() {
1861                        // Use password server for password authentication
1862                        self.password_server_addr
1863                    } else {
1864                        self.server_addr
1865                    };
1866                    let client = connect(addr, user, password).await?;
1867                    self.clients.insert(name.into(), client);
1868                }
1869                Ok(self.clients.get(name).unwrap())
1870            }
1871        }
1872    }
1873
1874    async fn run_simple<'r>(
1875        &mut self,
1876        conn: Option<&'r str>,
1877        user: Option<&'r str>,
1878        password: Option<&'r str>,
1879        sql: &'r str,
1880        sort: Sort,
1881        output: &'r Output,
1882        location: Location,
1883    ) -> Result<Outcome<'r>, anyhow::Error> {
1884        let actual = match self.get_conn(conn, user, password).await {
1885            Ok(client) => match client.simple_query(sql).await {
1886                Ok(result) => {
1887                    let mut rows = Vec::new();
1888
1889                    for m in result.into_iter() {
1890                        match m {
1891                            SimpleQueryMessage::Row(row) => {
1892                                let mut s = vec![];
1893                                for i in 0..row.len() {
1894                                    s.push(row.get(i).unwrap_or("NULL"));
1895                                }
1896                                rows.push(s.join(","));
1897                            }
1898                            SimpleQueryMessage::CommandComplete(count) => {
1899                                // This applies any sort on the COMPLETE line as
1900                                // well, but we do the same for the expected output.
1901                                rows.push(format!("COMPLETE {}", count));
1902                            }
1903                            SimpleQueryMessage::RowDescription(_) => {}
1904                            _ => panic!("unexpected"),
1905                        }
1906                    }
1907
1908                    if let Sort::Row = sort {
1909                        rows.sort();
1910                    }
1911
1912                    Output::Values(rows)
1913                }
1914                // Errors can contain multiple lines (say if there are details), and rewrite
1915                // sticks them each on their own line, so we need to split up the lines here to
1916                // each be its own String in the Vec.
1917                Err(error) => Output::Values(
1918                    error
1919                        .to_string_with_causes()
1920                        .lines()
1921                        .map(|s| s.to_string())
1922                        .collect(),
1923                ),
1924            },
1925            Err(error) => Output::Values(
1926                error
1927                    .to_string_with_causes()
1928                    .lines()
1929                    .map(|s| s.to_string())
1930                    .collect(),
1931            ),
1932        };
1933        if *output != actual {
1934            Ok(Outcome::OutputFailure {
1935                expected_output: output,
1936                actual_raw_output: vec![],
1937                actual_output: actual,
1938                location,
1939            })
1940        } else {
1941            Ok(Outcome::Success)
1942        }
1943    }
1944
1945    async fn check_catalog(&self) -> Result<(), anyhow::Error> {
1946        let url = format!(
1947            "http://{}/api/catalog/check",
1948            self.internal_http_server_addr
1949        );
1950        let response: serde_json::Value = reqwest::get(&url).await?.json().await?;
1951
1952        if let Some(inconsistencies) = response.get("err") {
1953            let inconsistencies = serde_json::to_string_pretty(&inconsistencies)
1954                .expect("serializing Value cannot fail");
1955            Err(anyhow::anyhow!("Catalog inconsistency\n{inconsistencies}"))
1956        } else {
1957            Ok(())
1958        }
1959    }
1960}
1961
1962async fn connect(
1963    addr: SocketAddr,
1964    user: Option<&str>,
1965    password: Option<&str>,
1966) -> Result<tokio_postgres::Client, tokio_postgres::Error> {
1967    let mut config = tokio_postgres::Config::new();
1968    config.host(addr.ip().to_string());
1969    config.port(addr.port());
1970    config.user(user.unwrap_or("materialize"));
1971    if let Some(password) = password {
1972        config.password(password);
1973    }
1974    let (client, connection) = config.connect(NoTls).await?;
1975
1976    task::spawn(|| "sqllogictest_connect", async move {
1977        if let Err(e) = connection.await {
1978            eprintln!("connection error: {}", e);
1979        }
1980    });
1981    Ok(client)
1982}
1983
/// An output sink for the runner's human-readable progress and failure reports.
///
/// The method is named `write_fmt` so the standard `write!`/`writeln!` macros can
/// target implementors directly (e.g. `writeln!(config.stdout, ...)`). It takes
/// `&self` and returns `()`, so implementors must handle (or ignore) write errors
/// themselves.
pub trait WriteFmt {
    fn write_fmt(&self, fmt: fmt::Arguments<'_>);
}
1987
/// Settings for a sqllogictest run.
pub struct RunConfig<'a> {
    /// Sink for progress output: per-file headers, echoed records and SQL, and
    /// warning/failure reports.
    pub stdout: &'a dyn WriteFmt,
    /// Sink for error output.
    pub stderr: &'a dyn WriteFmt,
    /// Print each record before running it, so a record that panics can be
    /// identified, and report every non-successful outcome, including warnings.
    pub verbose: bool,
    /// Suppress per-record reports of warnings and failures.
    pub quiet: bool,
    pub postgres_url: String,
    pub prefix: String,
    pub no_fail: bool,
    /// Stop processing a script after its first failing record.
    pub fail_fast: bool,
    pub auto_index_tables: bool,
    pub auto_index_selects: bool,
    pub auto_transactions: bool,
    pub enable_table_keys: bool,
    pub orchestrator_process_wrapper: Option<String>,
    pub tracing: TracingCliArgs,
    pub tracing_handle: TracingHandle,
    pub system_parameter_defaults: BTreeMap<String, String>,
    /// Persist state is handled specially because:
    /// - Persist background workers do not necessarily shut down immediately once the server is
    ///   shut down, and may panic if their storage is deleted out from under them.
    /// - It's safe for different databases to reference the same state: all data is scoped by UUID.
    pub persist_dir: TempDir,
    pub replicas: usize,
    pub replica_size: String,
}
2013
/// Number of spaces used to indent echoed records and SQL statements in the
/// runner's output.
const PRINT_INDENT: usize = 4;
2016
2017fn print_record(config: &RunConfig<'_>, record: &Record) {
2018    match record {
2019        Record::Statement { sql, .. } | Record::Query { sql, .. } => {
2020            print_sql(config.stdout, sql, None)
2021        }
2022        Record::Simple { conn, sql, .. } => print_sql(config.stdout, sql, *conn),
2023        Record::Copy {
2024            table_name,
2025            tsv_path,
2026        } => {
2027            writeln!(
2028                config.stdout,
2029                "{}slt copy {} from {}",
2030                " ".repeat(PRINT_INDENT),
2031                table_name,
2032                tsv_path
2033            )
2034        }
2035        Record::ResetServer => {
2036            writeln!(config.stdout, "{}reset-server", " ".repeat(PRINT_INDENT))
2037        }
2038        Record::Halt => {
2039            writeln!(config.stdout, "{}halt", " ".repeat(PRINT_INDENT))
2040        }
2041        Record::HashThreshold { threshold } => {
2042            writeln!(
2043                config.stdout,
2044                "{}hash-threshold {}",
2045                " ".repeat(PRINT_INDENT),
2046                threshold
2047            )
2048        }
2049    }
2050}
2051
2052fn print_sql_if<'a>(stdout: &'a dyn WriteFmt, sql: &str, cond: bool) {
2053    if cond {
2054        print_sql(stdout, sql, None)
2055    }
2056}
2057
2058fn print_sql<'a>(stdout: &'a dyn WriteFmt, sql: &str, conn: Option<&str>) {
2059    let text = if let Some(conn) = conn {
2060        format!("[conn={}] {}", conn, sql)
2061    } else {
2062        sql.to_string()
2063    };
2064    writeln!(stdout, "{}", util::indent(&text, PRINT_INDENT))
2065}
2066
/// Error-message patterns that downgrade an inconsistent view outcome to a warning.
///
/// These are matched against the view-based `PlanFailure` of an
/// `InconsistentViewOutcome`. If the plain query succeeded but the indexed-view
/// variant failed with an error matching any of these patterns, the
/// inconsistency is reported as a warning rather than a failure (see
/// `should_warn`).
const INCONSISTENT_VIEW_OUTCOME_WARNING_REGEXPS: [&str; 9] = [
    // The following are unfixable errors in indexed views given our
    // current constraints.
    "cannot materialize call to",
    "SHOW commands are not allowed in views",
    "cannot create view with unstable dependencies",
    "cannot use wildcard expansions or NATURAL JOINs in a view that depends on system objects",
    "no valid schema selected",
    r#"system schema '\w+' cannot be modified"#,
    r#"permission denied for (SCHEMA|CLUSTER) "(\w+\.)?\w+""#,
    // NOTE(vmarcos): Column ambiguity that could not be eliminated by our
    // currently implemented syntactic rewrites is considered unfixable.
    // In addition, if some column cannot be dealt with, e.g., in `ORDER BY`
    // references, we treat this condition as unfixable as well.
    r#"column "[\w\?]+" specified more than once"#,
    r#"column "(\w+\.)?\w+" does not exist"#,
];
2086
2087/// Evaluates if the given outcome should be returned directly or if it should
2088/// be wrapped as a warning. Note that this function should be used for outcomes
2089/// that can be judged in a context-independent manner, i.e., the outcome itself
2090/// provides enough information as to whether a warning should be emitted or not.
2091fn should_warn(outcome: &Outcome) -> bool {
2092    match outcome {
2093        Outcome::InconsistentViewOutcome {
2094            query_outcome,
2095            view_outcome,
2096            ..
2097        } => match (query_outcome.as_ref(), view_outcome.as_ref()) {
2098            (Outcome::Success, Outcome::PlanFailure { error, .. }) => {
2099                INCONSISTENT_VIEW_OUTCOME_WARNING_REGEXPS.iter().any(|s| {
2100                    Regex::new(s)
2101                        .expect("unexpected error in regular expression parsing")
2102                        .is_match(&error.to_string_with_causes())
2103                })
2104            }
2105            _ => false,
2106        },
2107        _ => false,
2108    }
2109}
2110
2111pub async fn run_string(
2112    runner: &mut Runner<'_>,
2113    source: &str,
2114    input: &str,
2115) -> Result<Outcomes, anyhow::Error> {
2116    runner.reset_database().await?;
2117
2118    let mut outcomes = Outcomes::default();
2119    let mut parser = crate::parser::Parser::new(source, input);
2120    // Transactions are currently relatively slow. Since sqllogictest runs in a single connection
2121    // there should be no difference in having longer running transactions.
2122    let mut in_transaction = false;
2123    writeln!(runner.config.stdout, "--- {}", source);
2124
2125    for record in parser.parse_records()? {
2126        // In maximal-verbose mode, print the query before attempting to run
2127        // it. Running the query might panic, so it is important to print out
2128        // what query we are trying to run *before* we panic.
2129        if runner.config.verbose {
2130            print_record(runner.config, &record);
2131        }
2132
2133        let outcome = runner
2134            .run_record(&record, &mut in_transaction)
2135            .await
2136            .map_err(|err| format!("In {}:\n{}", source, err))
2137            .unwrap();
2138
2139        // Print warnings and failures in verbose mode.
2140        if !runner.config.quiet && !outcome.success() {
2141            if !runner.config.verbose {
2142                // If `verbose` is enabled, we'll already have printed the record,
2143                // so don't print it again. Yes, this is an ugly bit of logic.
2144                // Please don't try to consolidate it with the `print_record`
2145                // call above, as it's important to have a mode in which records
2146                // are printed before they are run, so that if running the
2147                // record panics, you can tell which record caused it.
2148                if !outcome.failure() {
2149                    writeln!(
2150                        runner.config.stdout,
2151                        "{}",
2152                        util::indent("Warning detected for: ", 4)
2153                    );
2154                }
2155                print_record(runner.config, &record);
2156            }
2157            if runner.config.verbose || outcome.failure() {
2158                writeln!(
2159                    runner.config.stdout,
2160                    "{}",
2161                    util::indent(&outcome.to_string(), 4)
2162                );
2163                writeln!(runner.config.stdout, "{}", util::indent("----", 4));
2164            }
2165        }
2166
2167        outcomes.stats[outcome.code()] += 1;
2168        if outcome.failure() {
2169            outcomes.details.push(format!("{}", outcome));
2170        }
2171
2172        if let Outcome::Bail { .. } = outcome {
2173            break;
2174        }
2175
2176        if runner.config.fail_fast && outcome.failure() {
2177            break;
2178        }
2179    }
2180    Ok(outcomes)
2181}
2182
2183pub async fn run_file(runner: &mut Runner<'_>, filename: &Path) -> Result<Outcomes, anyhow::Error> {
2184    let mut input = String::new();
2185    File::open(filename)?.read_to_string(&mut input)?;
2186    let outcomes = run_string(runner, &format!("{}", filename.display()), &input).await?;
2187    runner.check_catalog().await?;
2188
2189    Ok(outcomes)
2190}
2191
2192pub async fn rewrite_file(runner: &mut Runner<'_>, filename: &Path) -> Result<(), anyhow::Error> {
2193    runner.reset_database().await?;
2194
2195    let mut file = OpenOptions::new().read(true).write(true).open(filename)?;
2196
2197    let mut input = String::new();
2198    file.read_to_string(&mut input)?;
2199
2200    let mut buf = RewriteBuffer::new(&input);
2201
2202    let mut parser = crate::parser::Parser::new(filename.to_str().unwrap_or(""), &input);
2203    writeln!(runner.config.stdout, "--- {}", filename.display());
2204    let mut in_transaction = false;
2205
2206    fn append_values_output(
2207        buf: &mut RewriteBuffer,
2208        input: &String,
2209        expected_output: &str,
2210        mode: &Mode,
2211        types: &Vec<Type>,
2212        column_names: Option<&Vec<ColumnName>>,
2213        actual_output: &Vec<String>,
2214        multiline: bool,
2215    ) {
2216        buf.append_header(input, expected_output, column_names);
2217
2218        for (i, row) in actual_output.chunks(types.len()).enumerate() {
2219            match mode {
2220                // In Cockroach mode, output each row on its own line, with
2221                // two spaces between each column.
2222                Mode::Cockroach => {
2223                    if i != 0 {
2224                        buf.append("\n");
2225                    }
2226
2227                    if row.len() == 0 {
2228                        // nothing to do
2229                    } else if row.len() == 1 {
2230                        // If there is only one column, then there is no need for space
2231                        // substitution, so we only do newline substitution.
2232                        if multiline {
2233                            buf.append(&row[0]);
2234                        } else {
2235                            buf.append(&row[0].replace('\n', "⏎"))
2236                        }
2237                    } else {
2238                        // Substitute spaces with ␠ to avoid mistaking the spaces in the result
2239                        // values with spaces that separate columns.
2240                        buf.append(
2241                            &row.iter()
2242                                .map(|col| {
2243                                    let mut col = col.replace(' ', "␠");
2244                                    if !multiline {
2245                                        col = col.replace('\n', "⏎");
2246                                    }
2247                                    col
2248                                })
2249                                .join("  "),
2250                        );
2251                    }
2252                }
2253                // In standard mode, output each value on its own line,
2254                // and ignore row boundaries.
2255                // No need to substitute spaces, because every value (not row) is on a separate
2256                // line. But we do need to substitute newlines.
2257                Mode::Standard => {
2258                    for (j, col) in row.iter().enumerate() {
2259                        if i != 0 || j != 0 {
2260                            buf.append("\n");
2261                        }
2262                        buf.append(&if multiline {
2263                            col.clone()
2264                        } else {
2265                            col.replace('\n', "⏎")
2266                        });
2267                    }
2268                }
2269            }
2270        }
2271    }
2272
2273    for record in parser.parse_records()? {
2274        let outcome = runner.run_record(&record, &mut in_transaction).await?;
2275
2276        match (&record, &outcome) {
2277            // If we see an output failure for a query, rewrite the expected output
2278            // to match the observed output.
2279            (
2280                Record::Query {
2281                    output:
2282                        Ok(QueryOutput {
2283                            mode,
2284                            output: Output::Values(_),
2285                            output_str: expected_output,
2286                            types,
2287                            column_names,
2288                            multiline,
2289                            ..
2290                        }),
2291                    ..
2292                },
2293                Outcome::OutputFailure {
2294                    actual_output: Output::Values(actual_output),
2295                    ..
2296                },
2297            ) => {
2298                append_values_output(
2299                    &mut buf,
2300                    &input,
2301                    expected_output,
2302                    mode,
2303                    types,
2304                    column_names.as_ref(),
2305                    actual_output,
2306                    *multiline,
2307                );
2308            }
2309            (
2310                Record::Query {
2311                    output:
2312                        Ok(QueryOutput {
2313                            mode,
2314                            output: Output::Values(_),
2315                            output_str: expected_output,
2316                            types,
2317                            multiline,
2318                            ..
2319                        }),
2320                    ..
2321                },
2322                Outcome::WrongColumnNames {
2323                    actual_column_names,
2324                    actual_output: Output::Values(actual_output),
2325                    ..
2326                },
2327            ) => {
2328                append_values_output(
2329                    &mut buf,
2330                    &input,
2331                    expected_output,
2332                    mode,
2333                    types,
2334                    Some(actual_column_names),
2335                    actual_output,
2336                    *multiline,
2337                );
2338            }
2339            (
2340                Record::Query {
2341                    output:
2342                        Ok(QueryOutput {
2343                            output: Output::Hashed { .. },
2344                            output_str: expected_output,
2345                            column_names,
2346                            ..
2347                        }),
2348                    ..
2349                },
2350                Outcome::OutputFailure {
2351                    actual_output: Output::Hashed { num_values, md5 },
2352                    ..
2353                },
2354            ) => {
2355                buf.append_header(&input, expected_output, column_names.as_ref());
2356
2357                buf.append(format!("{} values hashing to {}\n", num_values, md5).as_str())
2358            }
2359            (
2360                Record::Simple {
2361                    output_str: expected_output,
2362                    ..
2363                },
2364                Outcome::OutputFailure {
2365                    actual_output: Output::Values(actual_output),
2366                    ..
2367                },
2368            ) => {
2369                buf.append_header(&input, expected_output, None);
2370
2371                for (i, row) in actual_output.iter().enumerate() {
2372                    if i != 0 {
2373                        buf.append("\n");
2374                    }
2375                    buf.append(row);
2376                }
2377            }
2378            (
2379                Record::Query {
2380                    sql,
2381                    output: Err(err),
2382                    ..
2383                },
2384                outcome,
2385            )
2386            | (
2387                Record::Statement {
2388                    expected_error: Some(err),
2389                    sql,
2390                    ..
2391                },
2392                outcome,
2393            ) if outcome.err_msg().is_some() => {
2394                buf.rewrite_expected_error(&input, err, &outcome.err_msg().unwrap(), sql)
2395            }
2396            (_, Outcome::Success) => {}
2397            _ => bail!("unexpected: {:?} {:?}", record, outcome),
2398        }
2399    }
2400
2401    file.set_len(0)?;
2402    file.seek(SeekFrom::Start(0))?;
2403    file.write_all(buf.finish().as_bytes())?;
2404    file.sync_all()?;
2405    Ok(())
2406}
2407
2408/// Provides a means to rewrite the `.slt` file while iterating over it.
2409///
2410/// This struct takes the slt file as its `input`, tracks a cursor into it
2411/// (`input_offset`), and provides a buffer (`output`) to store the rewritten
2412/// results.
2413///
2414/// Functions that modify the file will lazily move `input` into `output` using
2415/// `flush_to`. However, those calls should all be interior to other functions.
2416#[derive(Debug)]
2417struct RewriteBuffer<'a> {
2418    input: &'a str,
2419    input_offset: usize,
2420    output: String,
2421}
2422
2423impl<'a> RewriteBuffer<'a> {
2424    fn new(input: &'a str) -> RewriteBuffer<'a> {
2425        RewriteBuffer {
2426            input,
2427            input_offset: 0,
2428            output: String::new(),
2429        }
2430    }
2431
2432    fn flush_to(&mut self, offset: usize) {
2433        assert!(offset >= self.input_offset);
2434        let chunk = &self.input[self.input_offset..offset];
2435        self.output.push_str(chunk);
2436        self.input_offset = offset;
2437    }
2438
2439    fn skip_to(&mut self, offset: usize) {
2440        assert!(offset >= self.input_offset);
2441        self.input_offset = offset;
2442    }
2443
2444    fn append(&mut self, s: &str) {
2445        self.output.push_str(s);
2446    }
2447
2448    fn append_header(
2449        &mut self,
2450        input: &String,
2451        expected_output: &str,
2452        column_names: Option<&Vec<ColumnName>>,
2453    ) {
2454        // Output everything before this record.
2455        // TODO(benesch): is it possible to rewrite this to avoid `as`?
2456        #[allow(clippy::as_conversions)]
2457        let offset = expected_output.as_ptr() as usize - input.as_ptr() as usize;
2458        self.flush_to(offset);
2459        self.skip_to(offset + expected_output.len());
2460
2461        // Attempt to install the result separator (----), if it does
2462        // not already exist.
2463        if self.peek_last(5) == "\n----" {
2464            self.append("\n");
2465        } else if self.peek_last(6) != "\n----\n" {
2466            self.append("\n----\n");
2467        }
2468
2469        let Some(names) = column_names else {
2470            return;
2471        };
2472        self.append(
2473            &names
2474                .iter()
2475                .map(|name| name.replace(' ', "␠"))
2476                .collect::<Vec<_>>()
2477                .join(" "),
2478        );
2479        self.append("\n");
2480    }
2481
2482    fn rewrite_expected_error(
2483        &mut self,
2484        input: &String,
2485        old_err: &str,
2486        new_err: &str,
2487        query: &str,
2488    ) {
2489        // Output everything before this error message.
2490        // TODO(benesch): is it possible to rewrite this to avoid `as`?
2491        #[allow(clippy::as_conversions)]
2492        let err_offset = old_err.as_ptr() as usize - input.as_ptr() as usize;
2493        self.flush_to(err_offset);
2494        self.append(new_err);
2495        self.append("\n");
2496        self.append(query);
2497        // TODO(benesch): is it possible to rewrite this to avoid `as`?
2498        #[allow(clippy::as_conversions)]
2499        self.skip_to(query.as_ptr() as usize - input.as_ptr() as usize + query.len())
2500    }
2501
2502    fn peek_last(&self, n: usize) -> &str {
2503        &self.output[self.output.len() - n..]
2504    }
2505
2506    fn finish(mut self) -> String {
2507        self.flush_to(self.input.len());
2508        self.output
2509    }
2510}
2511
2512/// Generates view creation, view indexing, view querying, and view
2513/// dropping SQL commands for a given `SELECT` query. If the number
2514/// of attributes produced by the query is known, the view commands
2515/// are specialized to avoid issues with column ambiguity. This
2516/// function is a helper for `--auto_index_selects` and assumes that
2517/// the provided input SQL has already been run through the parser,
2518/// resulting in a valid `SELECT` statement.
2519fn generate_view_sql(
2520    sql: &str,
2521    view_uuid: &Simple,
2522    num_attributes: Option<usize>,
2523    expected_column_names: Option<Vec<ColumnName>>,
2524) -> (String, String, String, String) {
2525    // To create the view, re-parse the sql; note that we must find exactly
2526    // one statement and it must be a `SELECT`.
2527    // NOTE(vmarcos): Direct string manipulation was attempted while
2528    // prototyping the code below, which avoids the extra parsing and
2529    // data structure cloning. However, running DDL is so slow that
2530    // it did not matter in terms of runtime. We can revisit this if
2531    // DDL cost drops dramatically in the future.
2532    let stmts = parser::parse_statements(sql).unwrap_or_default();
2533    assert!(stmts.len() == 1);
2534    let (query, query_as_of) = match &stmts[0].ast {
2535        Statement::Select(stmt) => (&stmt.query, &stmt.as_of),
2536        _ => unreachable!("This function should only be called for SELECTs"),
2537    };
2538
2539    // Prior to creating the view, process the `ORDER BY` clause of
2540    // the `SELECT` query, if any. Ordering is not preserved when a
2541    // view includes an `ORDER BY` clause and must be re-enforced by
2542    // an external `ORDER BY` clause when querying the view.
2543    let (view_order_by, extra_columns, distinct) = if num_attributes.is_none() {
2544        (query.order_by.clone(), vec![], None)
2545    } else {
2546        derive_order_by(&query.body, &query.order_by)
2547    };
2548
2549    // Since one-shot SELECT statements may contain ambiguous column names,
2550    // we either use the expected column names, if that option was
2551    // provided, or else just rename the output schema of the view
2552    // using numerically increasing attribute names, whenever possible.
2553    // This strategy makes it possible to use `CREATE INDEX`, thus
2554    // matching the behavior of the option `auto_index_tables`. However,
2555    // we may be presented with a `SELECT *` query, in which case the parser
2556    // does not produce sufficient information to allow us to compute
2557    // the number of output columns. In the latter case, we are supplied
2558    // with `None` for `num_attributes` and just employ the command
2559    // `CREATE DEFAULT INDEX` instead. Additionally, the view is created
2560    // without schema renaming. This strategy is insufficient to dodge
2561    // column name ambiguity in all cases, but we assume here that we
2562    // can adjust the (hopefully) small number of tests that eventually
2563    // challenge us in this particular way.
2564    let name = UnresolvedItemName(vec![Ident::new_unchecked(format!("v{}", view_uuid))]);
2565    let projection = expected_column_names.map_or_else(
2566        || {
2567            num_attributes.map_or(vec![], |n| {
2568                (1..=n)
2569                    .map(|i| Ident::new_unchecked(format!("a{i}")))
2570                    .collect()
2571            })
2572        },
2573        |cols| {
2574            cols.iter()
2575                .map(|c| Ident::new_unchecked(c.as_str()))
2576                .collect()
2577        },
2578    );
2579    let columns: Vec<Ident> = projection
2580        .iter()
2581        .cloned()
2582        .chain(extra_columns.iter().map(|item| {
2583            if let SelectItem::Expr {
2584                expr: _,
2585                alias: Some(ident),
2586            } = item
2587            {
2588                ident.clone()
2589            } else {
2590                unreachable!("alias must be given for extra column")
2591            }
2592        }))
2593        .collect();
2594
2595    // Build a `CREATE VIEW` with the columns computed above.
2596    let mut query = query.clone();
2597    if extra_columns.len() > 0 {
2598        match &mut query.body {
2599            SetExpr::Select(stmt) => stmt.projection.extend(extra_columns.iter().cloned()),
2600            _ => unimplemented!("cannot yet rewrite projections of nested queries"),
2601        }
2602    }
2603    let create_view = AstStatement::<Raw>::CreateView(CreateViewStatement {
2604        if_exists: IfExistsBehavior::Error,
2605        temporary: false,
2606        definition: ViewDefinition {
2607            name: name.clone(),
2608            columns: columns.clone(),
2609            query,
2610        },
2611    })
2612    .to_ast_string_stable();
2613
2614    // We then create either a `CREATE INDEX` or a `CREATE DEFAULT INDEX`
2615    // statement, depending on whether we could obtain the number of
2616    // attributes from the original `SELECT`.
2617    let create_index = AstStatement::<Raw>::CreateIndex(CreateIndexStatement {
2618        name: None,
2619        in_cluster: None,
2620        on_name: RawItemName::Name(name.clone()),
2621        key_parts: if columns.len() == 0 {
2622            None
2623        } else {
2624            Some(
2625                columns
2626                    .iter()
2627                    .map(|ident| Expr::Identifier(vec![ident.clone()]))
2628                    .collect(),
2629            )
2630        },
2631        with_options: Vec::new(),
2632        if_not_exists: false,
2633    })
2634    .to_ast_string_stable();
2635
2636    // Assert if DISTINCT semantics are unchanged from view
2637    let distinct_unneeded = extra_columns.len() == 0
2638        || match distinct {
2639            None | Some(Distinct::On(_)) => true,
2640            Some(Distinct::EntireRow) => false,
2641        };
2642    let distinct = if distinct_unneeded { None } else { distinct };
2643
2644    // `SELECT [* | {projection}] FROM {name} [ORDER BY {view_order_by}]`
2645    let view_sql = AstStatement::<Raw>::Select(SelectStatement {
2646        query: Query {
2647            ctes: CteBlock::Simple(vec![]),
2648            body: SetExpr::Select(Box::new(Select {
2649                distinct,
2650                projection: if projection.len() == 0 {
2651                    vec![SelectItem::Wildcard]
2652                } else {
2653                    projection
2654                        .iter()
2655                        .map(|ident| SelectItem::Expr {
2656                            expr: Expr::Identifier(vec![ident.clone()]),
2657                            alias: None,
2658                        })
2659                        .collect()
2660                },
2661                from: vec![TableWithJoins {
2662                    relation: TableFactor::Table {
2663                        name: RawItemName::Name(name.clone()),
2664                        alias: None,
2665                    },
2666                    joins: vec![],
2667                }],
2668                selection: None,
2669                group_by: vec![],
2670                having: None,
2671                qualify: None,
2672                options: vec![],
2673            })),
2674            order_by: view_order_by,
2675            limit: None,
2676            offset: None,
2677        },
2678        as_of: query_as_of.clone(),
2679    })
2680    .to_ast_string_stable();
2681
2682    // `DROP VIEW {name}`
2683    let drop_view = AstStatement::<Raw>::DropObjects(DropObjectsStatement {
2684        object_type: ObjectType::View,
2685        if_exists: false,
2686        names: vec![UnresolvedObjectName::Item(name)],
2687        cascade: false,
2688    })
2689    .to_ast_string_stable();
2690
2691    (create_view, create_index, view_sql, drop_view)
2692}
2693
2694/// Analyzes the provided query `body` to derive the number of
2695/// attributes in the query. We only consider syntactic cues,
2696/// so we may end up deriving `None` for the number of attributes
2697/// as a conservative approximation.
2698fn derive_num_attributes(body: &SetExpr<Raw>) -> Option<usize> {
2699    let Some((projection, _)) = find_projection(body) else {
2700        return None;
2701    };
2702    derive_num_attributes_from_projection(projection)
2703}
2704
2705/// Analyzes a query's `ORDER BY` clause to derive an `ORDER BY`
2706/// clause that makes numeric references to any expressions in
2707/// the projection and generated-attribute references to expressions
2708/// that need to be added as extra columns to the projection list.
2709/// The rewritten `ORDER BY` clause is then usable when querying a
2710/// view that contains the same `SELECT` as the given query.
2711/// This function returns both the rewritten `ORDER BY` clause
2712/// as well as a list of extra columns that need to be added
2713/// to the query's projection for the `ORDER BY` clause to
2714/// succeed.
2715fn derive_order_by(
2716    body: &SetExpr<Raw>,
2717    order_by: &Vec<OrderByExpr<Raw>>,
2718) -> (
2719    Vec<OrderByExpr<Raw>>,
2720    Vec<SelectItem<Raw>>,
2721    Option<Distinct<Raw>>,
2722) {
2723    let Some((projection, distinct)) = find_projection(body) else {
2724        return (vec![], vec![], None);
2725    };
2726    let (view_order_by, extra_columns) = derive_order_by_from_projection(projection, order_by);
2727    (view_order_by, extra_columns, distinct.clone())
2728}
2729
2730/// Finds the projection list in a `SELECT` query body.
2731fn find_projection(body: &SetExpr<Raw>) -> Option<(&Vec<SelectItem<Raw>>, &Option<Distinct<Raw>>)> {
2732    // Iterate to peel off the query body until the query's
2733    // projection list is found.
2734    let mut set_expr = body;
2735    loop {
2736        match set_expr {
2737            SetExpr::Select(select) => {
2738                return Some((&select.projection, &select.distinct));
2739            }
2740            SetExpr::SetOperation { left, .. } => set_expr = left.as_ref(),
2741            SetExpr::Query(query) => set_expr = &query.body,
2742            _ => return None,
2743        }
2744    }
2745}
2746
2747/// Computes the number of attributes that are obtained by the
2748/// projection of a `SELECT` query. The projection may include
2749/// wildcards, in which case the analysis just returns `None`.
2750fn derive_num_attributes_from_projection(projection: &Vec<SelectItem<Raw>>) -> Option<usize> {
2751    let mut num_attributes = 0usize;
2752    for item in projection.iter() {
2753        let SelectItem::Expr { expr, .. } = item else {
2754            return None;
2755        };
2756        match expr {
2757            Expr::QualifiedWildcard(..) | Expr::WildcardAccess(..) => {
2758                return None;
2759            }
2760            _ => {
2761                num_attributes += 1;
2762            }
2763        }
2764    }
2765    Some(num_attributes)
2766}
2767
2768/// Computes an `ORDER BY` clause with only numeric references
2769/// from given projection and `ORDER BY` of a `SELECT` query.
2770/// If the derivation fails to match a given expression, the
2771/// matched prefix is returned. Note that this could be empty.
2772fn derive_order_by_from_projection(
2773    projection: &Vec<SelectItem<Raw>>,
2774    order_by: &Vec<OrderByExpr<Raw>>,
2775) -> (Vec<OrderByExpr<Raw>>, Vec<SelectItem<Raw>>) {
2776    let mut view_order_by: Vec<OrderByExpr<Raw>> = vec![];
2777    let mut extra_columns: Vec<SelectItem<Raw>> = vec![];
2778    for order_by_expr in order_by.iter() {
2779        let query_expr = &order_by_expr.expr;
2780        let view_expr = match query_expr {
2781            Expr::Value(mz_sql_parser::ast::Value::Number(_)) => query_expr.clone(),
2782            _ => {
2783                // Find expression in query projection, if we can.
2784                if let Some(i) = projection.iter().position(|item| match item {
2785                    SelectItem::Expr { expr, alias } => {
2786                        expr == query_expr
2787                            || match query_expr {
2788                                Expr::Identifier(ident) => {
2789                                    ident.len() == 1 && Some(&ident[0]) == alias.as_ref()
2790                                }
2791                                _ => false,
2792                            }
2793                    }
2794                    SelectItem::Wildcard => false,
2795                }) {
2796                    Expr::Value(mz_sql_parser::ast::Value::Number((i + 1).to_string()))
2797                } else {
2798                    // If the expression is not found in the
2799                    // projection, add extra column.
2800                    let ident = Ident::new_unchecked(format!(
2801                        "a{}",
2802                        (projection.len() + extra_columns.len() + 1)
2803                    ));
2804                    extra_columns.push(SelectItem::Expr {
2805                        expr: query_expr.clone(),
2806                        alias: Some(ident.clone()),
2807                    });
2808                    Expr::Identifier(vec![ident])
2809                }
2810            }
2811        };
2812        view_order_by.push(OrderByExpr {
2813            expr: view_expr,
2814            asc: order_by_expr.asc,
2815            nulls_last: order_by_expr.nulls_last,
2816        });
2817    }
2818    (view_order_by, extra_columns)
2819}
2820
2821/// Returns extra statements to execute after `stmt` is executed.
2822fn mutate(sql: &str) -> Vec<String> {
2823    let stmts = parser::parse_statements(sql).unwrap_or_default();
2824    let mut additional = Vec::new();
2825    for stmt in stmts {
2826        match stmt.ast {
2827            AstStatement::CreateTable(stmt) => additional.push(
2828                // CREATE TABLE -> CREATE INDEX. Specify all columns manually in case CREATE
2829                // DEFAULT INDEX ever goes away.
2830                AstStatement::<Raw>::CreateIndex(CreateIndexStatement {
2831                    name: None,
2832                    in_cluster: None,
2833                    on_name: RawItemName::Name(stmt.name.clone()),
2834                    key_parts: Some(
2835                        stmt.columns
2836                            .iter()
2837                            .map(|def| Expr::Identifier(vec![def.name.clone()]))
2838                            .collect(),
2839                    ),
2840                    with_options: Vec::new(),
2841                    if_not_exists: false,
2842                })
2843                .to_ast_string_stable(),
2844            ),
2845            _ => {}
2846        }
2847    }
2848    additional
2849}
2850
/// Checks the SQL that `generate_view_sql` produces for a range of `SELECT`
/// shapes. The cases cover:
/// - unknown vs. known attribute counts;
/// - with and without expected column names;
/// - `ORDER BY` items that map to projection positions;
/// - `ORDER BY` items that require extra view columns.
#[mz_ore::test]
#[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `rust_psm_stack_pointer` on OS `linux`
fn test_generate_view_sql() {
    let uuid = Uuid::parse_str("67e5504410b1426f9247bb680e5fe0c8").unwrap();
    // Each case is `((sql, num_attributes, expected_column_names),
    // (create_view, create_index, view_sql, drop_view))`.
    let cases = vec![
        // Unknown attribute count: default index and no column renaming.
        (("SELECT * FROM t", None, None),
        (
            r#"CREATE VIEW "v67e5504410b1426f9247bb680e5fe0c8" AS SELECT * FROM "t""#.to_string(),
            r#"CREATE DEFAULT INDEX ON "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
            r#"SELECT * FROM "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
            r#"DROP VIEW "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
        )),
        // Expected column names become the view's column names.
        (("SELECT a, b, c FROM t1, t2", Some(3), Some(vec![ColumnName::from("a"), ColumnName::from("b"), ColumnName::from("c")])),
        (
            r#"CREATE VIEW "v67e5504410b1426f9247bb680e5fe0c8" ("a", "b", "c") AS SELECT "a", "b", "c" FROM "t1", "t2""#.to_string(),
            r#"CREATE INDEX ON "v67e5504410b1426f9247bb680e5fe0c8" ("a", "b", "c")"#.to_string(),
            r#"SELECT "a", "b", "c" FROM "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
            r#"DROP VIEW "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
        )),
        // Known attribute count without expected names: columns are renamed
        // `a1`..`aN`.
        (("SELECT a, b, c FROM t1, t2", Some(3), None),
        (
            r#"CREATE VIEW "v67e5504410b1426f9247bb680e5fe0c8" ("a1", "a2", "a3") AS SELECT "a", "b", "c" FROM "t1", "t2""#.to_string(),
            r#"CREATE INDEX ON "v67e5504410b1426f9247bb680e5fe0c8" ("a1", "a2", "a3")"#.to_string(),
            r#"SELECT "a1", "a2", "a3" FROM "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
            r#"DROP VIEW "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
        )),
        // A case with ambiguity that is accepted by the function, illustrating that
        // our measures to dodge this issue are imperfect.
        (("SELECT * FROM (SELECT a, sum(b) AS a FROM t GROUP BY a)", None, None),
        (
            r#"CREATE VIEW "v67e5504410b1426f9247bb680e5fe0c8" AS SELECT * FROM (SELECT "a", "sum"("b") AS "a" FROM "t" GROUP BY "a")"#.to_string(),
            r#"CREATE DEFAULT INDEX ON "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
            r#"SELECT * FROM "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
            r#"DROP VIEW "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
        )),
        // `ORDER BY` items found in the projection (as the same expression or
        // via an alias) become numeric references in the outer query.
        (("SELECT a, b, b + d AS c, a + b AS d FROM t1, t2 ORDER BY a, c, a + b", Some(4), Some(vec![ColumnName::from("a"), ColumnName::from("b"), ColumnName::from("c"), ColumnName::from("d")])),
        (
            r#"CREATE VIEW "v67e5504410b1426f9247bb680e5fe0c8" ("a", "b", "c", "d") AS SELECT "a", "b", "b" + "d" AS "c", "a" + "b" AS "d" FROM "t1", "t2" ORDER BY "a", "c", "a" + "b""#.to_string(),
            r#"CREATE INDEX ON "v67e5504410b1426f9247bb680e5fe0c8" ("a", "b", "c", "d")"#.to_string(),
            r#"SELECT "a", "b", "c", "d" FROM "v67e5504410b1426f9247bb680e5fe0c8" ORDER BY 1, 3, 4"#.to_string(),
            r#"DROP VIEW "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
        )),
        // For set operations, `ORDER BY` is resolved against the projection
        // of the leftmost `SELECT`.
        (("((SELECT 1 AS a UNION SELECT 2 AS b) UNION SELECT 3 AS c) ORDER BY a", Some(1), None),
        (
            r#"CREATE VIEW "v67e5504410b1426f9247bb680e5fe0c8" ("a1") AS (SELECT 1 AS "a" UNION SELECT 2 AS "b") UNION SELECT 3 AS "c" ORDER BY "a""#.to_string(),
            r#"CREATE INDEX ON "v67e5504410b1426f9247bb680e5fe0c8" ("a1")"#.to_string(),
            r#"SELECT "a1" FROM "v67e5504410b1426f9247bb680e5fe0c8" ORDER BY 1"#.to_string(),
            r#"DROP VIEW "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
        )),
        // With an unknown attribute count, the `ORDER BY` is copied to the
        // outer query unchanged (the next two cases).
        (("SELECT * FROM (SELECT a, sum(b) AS a FROM t GROUP BY a) ORDER BY 1", None, None),
        (
            r#"CREATE VIEW "v67e5504410b1426f9247bb680e5fe0c8" AS SELECT * FROM (SELECT "a", "sum"("b") AS "a" FROM "t" GROUP BY "a") ORDER BY 1"#.to_string(),
            r#"CREATE DEFAULT INDEX ON "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
            r#"SELECT * FROM "v67e5504410b1426f9247bb680e5fe0c8" ORDER BY 1"#.to_string(),
            r#"DROP VIEW "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
        )),
        (("SELECT * FROM (SELECT a, sum(b) AS a FROM t GROUP BY a) ORDER BY a", None, None),
        (
            r#"CREATE VIEW "v67e5504410b1426f9247bb680e5fe0c8" AS SELECT * FROM (SELECT "a", "sum"("b") AS "a" FROM "t" GROUP BY "a") ORDER BY "a""#.to_string(),
            r#"CREATE DEFAULT INDEX ON "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
            r#"SELECT * FROM "v67e5504410b1426f9247bb680e5fe0c8" ORDER BY "a""#.to_string(),
            r#"DROP VIEW "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
        )),
        // Ordering by a column missing from the projection adds an extra view
        // column (`a3`). That column is indexed and ordered by, but not
        // selected by the outer query.
        (("SELECT a, sum(b) AS a FROM t GROUP BY a, c ORDER BY a, c", Some(2), None),
        (
            r#"CREATE VIEW "v67e5504410b1426f9247bb680e5fe0c8" ("a1", "a2", "a3") AS SELECT "a", "sum"("b") AS "a", "c" AS "a3" FROM "t" GROUP BY "a", "c" ORDER BY "a", "c""#.to_string(),
            r#"CREATE INDEX ON "v67e5504410b1426f9247bb680e5fe0c8" ("a1", "a2", "a3")"#.to_string(),
            r#"SELECT "a1", "a2" FROM "v67e5504410b1426f9247bb680e5fe0c8" ORDER BY 1, "a3""#.to_string(),
            r#"DROP VIEW "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
        )),
        // Same as above, with the extra column ordered first.
        (("SELECT a, sum(b) AS a FROM t GROUP BY a, c ORDER BY c, a", Some(2), None),
        (
            r#"CREATE VIEW "v67e5504410b1426f9247bb680e5fe0c8" ("a1", "a2", "a3") AS SELECT "a", "sum"("b") AS "a", "c" AS "a3" FROM "t" GROUP BY "a", "c" ORDER BY "c", "a""#.to_string(),
            r#"CREATE INDEX ON "v67e5504410b1426f9247bb680e5fe0c8" ("a1", "a2", "a3")"#.to_string(),
            r#"SELECT "a1", "a2" FROM "v67e5504410b1426f9247bb680e5fe0c8" ORDER BY "a3", 1"#.to_string(),
            r#"DROP VIEW "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
        )),
    ];
    for ((sql, num_attributes, expected_column_names), expected) in cases {
        let view_sql =
            generate_view_sql(sql, uuid.as_simple(), num_attributes, expected_column_names);
        assert_eq!(expected, view_sql);
    }
}
2935
2936#[mz_ore::test]
2937fn test_mutate() {
2938    let cases = vec![
2939        ("CREATE TABLE t ()", vec![r#"CREATE INDEX ON "t" ()"#]),
2940        (
2941            "CREATE TABLE t (a INT)",
2942            vec![r#"CREATE INDEX ON "t" ("a")"#],
2943        ),
2944        (
2945            "CREATE TABLE t (a INT, b TEXT)",
2946            vec![r#"CREATE INDEX ON "t" ("a", "b")"#],
2947        ),
2948        // Invalid syntax works, just returns nothing.
2949        ("BAD SYNTAX", Vec::new()),
2950    ];
2951    for (sql, expected) in cases {
2952        let stmts = mutate(sql);
2953        assert_eq!(expected, stmts, "sql: {sql}");
2954    }
2955}