Skip to main content

mz_sqllogictest/
runner.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! The Materialize-specific runner for sqllogictest.
11//!
12//! slt tests expect a serialized execution of sql statements and queries.
13//! To get the same results in materialize we track current_timestamp and increment it whenever we execute a statement.
14//!
15//! The high-level workflow is:
16//!   for each record in the test file:
17//!     if record is a sql statement:
18//!       run sql in postgres, observe changes and copy them to materialize using LocalInput::Updates(..)
19//!       advance current_timestamp
20//!       promise to never send updates for times < current_timestamp using LocalInput::Watermark(..)
21//!       compare to expected results
22//!       if wrong, bail out and stop processing this file
23//!     if record is a sql query:
24//!       peek query at current_timestamp
25//!       compare to expected results
26//!       if wrong, record the error
27
28use std::collections::BTreeMap;
29use std::error::Error;
30use std::fs::{File, OpenOptions};
31use std::io::{Read, Seek, SeekFrom, Write};
32use std::net::{IpAddr, Ipv4Addr, SocketAddr};
33use std::path::Path;
34use std::sync::Arc;
35use std::sync::LazyLock;
36use std::time::Duration;
37use std::{env, fmt, ops, str, thread};
38
39use anyhow::{anyhow, bail};
40use bytes::BytesMut;
41use chrono::{DateTime, NaiveDateTime, NaiveTime, Utc};
42use fallible_iterator::FallibleIterator;
43use futures::sink::SinkExt;
44use itertools::Itertools;
45use maplit::btreemap;
46use md5::{Digest, Md5};
47use mz_adapter_types::bootstrap_builtin_cluster_config::{
48    ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR, BootstrapBuiltinClusterConfig,
49    CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR, PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR,
50    SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR, SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR,
51};
52use mz_catalog::config::ClusterReplicaSizeMap;
53use mz_controller::{ControllerConfig, ReplicaHttpLocator};
54use mz_environmentd::CatalogConfig;
55use mz_license_keys::ValidatedLicenseKey;
56use mz_orchestrator_process::{ProcessOrchestrator, ProcessOrchestratorConfig};
57use mz_orchestrator_tracing::{TracingCliArgs, TracingOrchestrator};
58use mz_ore::cast::{CastFrom, ReinterpretCast};
59use mz_ore::channel::trigger;
60use mz_ore::error::ErrorExt;
61use mz_ore::metrics::MetricsRegistry;
62use mz_ore::now::SYSTEM_TIME;
63use mz_ore::retry::Retry;
64use mz_ore::task;
65use mz_ore::thread::{JoinHandleExt, JoinOnDropHandle};
66use mz_ore::tracing::TracingHandle;
67use mz_ore::url::SensitiveUrl;
68use mz_persist_client::PersistLocation;
69use mz_persist_client::cache::PersistClientCache;
70use mz_persist_client::cfg::PersistConfig;
71use mz_persist_client::rpc::{
72    MetricsSameProcessPubSubSender, PersistGrpcPubSubServer, PubSubClientConnection, PubSubSender,
73};
74use mz_pgrepr::{Interval, Jsonb, Numeric, UInt2, UInt4, UInt8, Value, oid};
75use mz_repr::ColumnName;
76use mz_repr::adt::date::Date;
77use mz_repr::adt::mz_acl_item::{AclItem, MzAclItem};
78use mz_repr::adt::numeric;
79use mz_secrets::SecretsController;
80use mz_server_core::listeners::{
81    AllowedRoles, AuthenticatorKind, BaseListenerConfig, HttpListenerConfig, HttpRoutesEnabled,
82    ListenersConfig, SqlListenerConfig,
83};
84use mz_sql::ast::{Expr, Raw, Statement};
85use mz_sql::catalog::EnvironmentId;
86use mz_sql_parser::ast::display::AstDisplay;
87use mz_sql_parser::ast::{
88    CreateIndexStatement, CreateViewStatement, CteBlock, Distinct, DropObjectsStatement, Ident,
89    IfExistsBehavior, ObjectType, OrderByExpr, Query, RawItemName, Select, SelectItem,
90    SelectStatement, SetExpr, Statement as AstStatement, TableFactor, TableWithJoins,
91    UnresolvedItemName, UnresolvedObjectName, ViewDefinition,
92};
93use mz_sql_parser::parser;
94use mz_storage_types::connections::ConnectionContext;
95use postgres_protocol::types;
96use regex::Regex;
97use tempfile::TempDir;
98use tokio::net::TcpListener;
99use tokio::runtime::Runtime;
100use tokio::sync::oneshot;
101use tokio_postgres::types::{FromSql, Kind as PgKind, Type as PgType};
102use tokio_postgres::{NoTls, Row, SimpleQueryMessage};
103use tokio_stream::wrappers::TcpListenerStream;
104use tower_http::cors::AllowOrigin;
105use tracing::{error, info};
106use uuid::Uuid;
107use uuid::fmt::Simple;
108
109use crate::ast::{Location, Mode, Output, QueryOutput, Record, Sort, Type};
110use crate::util;
111
/// The result of running a single sqllogictest record.
///
/// Every variant except [`Outcome::Success`] records the [`Location`] it
/// refers to. Fields with the `'a` lifetime borrow the expected values rather
/// than copying them (presumably from the parsed record being run — confirm
/// against the call sites, which are outside this part of the file).
#[derive(Debug)]
pub enum Outcome<'a> {
    /// The record was reported as unsupported; `error` carries the details.
    Unsupported {
        error: anyhow::Error,
        location: Location,
    },
    /// A parse failure; `error` carries the details.
    ParseFailure {
        error: anyhow::Error,
        location: Location,
    },
    /// A plan failure. When `expected_error` is `Some`, the `Display` impl
    /// reports it as the pattern that `error` did not match.
    PlanFailure {
        error: anyhow::Error,
        expected_error: Option<String>,
        location: Location,
    },
    /// No error occurred although `expected_error` was expected.
    UnexpectedPlanSuccess {
        expected_error: &'a str,
        location: Location,
    },
    /// The reported row count differs from the expected one.
    WrongNumberOfRowsInserted {
        expected_count: u64,
        actual_count: u64,
        location: Location,
    },
    /// The number of result columns differs from the expected one.
    WrongColumnCount {
        expected_count: usize,
        actual_count: usize,
        location: Location,
    },
    /// The result column names differ from the expected ones.
    ///
    /// `actual_output` is carried along but not printed by `Display`.
    WrongColumnNames {
        expected_column_names: &'a Vec<ColumnName>,
        actual_column_names: Vec<ColumnName>,
        actual_output: Output,
        location: Location,
    },
    /// The formatted output differs from the expected output. Both the
    /// formatted (`actual_output`) and raw (`actual_raw_output`) results are
    /// kept so that `Display` can print them.
    OutputFailure {
        expected_output: &'a Output,
        actual_raw_output: Vec<Row>,
        actual_output: Output,
        location: Location,
    },
    /// A query and its indexed-view counterpart produced different outcomes
    /// (wording taken from the `Display` impl).
    InconsistentViewOutcome {
        query_outcome: Box<Outcome<'a>>,
        view_outcome: Box<Outcome<'a>>,
        location: Location,
    },
    /// Wraps the outcome (`cause`) that led to bailing out.
    Bail {
        cause: Box<Outcome<'a>>,
        location: Location,
    },
    /// Wraps an outcome (`cause`) that is reported as a warning; warnings are
    /// not counted as failures by `Outcome::failure`.
    Warning {
        cause: Box<Outcome<'a>>,
        location: Location,
    },
    /// The record behaved as expected.
    Success,
}
168
/// Number of distinct `Outcome` variants; this is the length of the
/// per-outcome counter array in `Outcomes`.
const NUM_OUTCOMES: usize = 12;
/// Index of the warning counter (10, the code `Outcome::code` assigns to
/// `Outcome::Warning`).
const WARNING_OUTCOME: usize = NUM_OUTCOMES - 2;
/// Index of the success counter (11, the code `Outcome::code` assigns to
/// `Outcome::Success`).
const SUCCESS_OUTCOME: usize = NUM_OUTCOMES - 1;
172
173impl<'a> Outcome<'a> {
174    fn code(&self) -> usize {
175        match self {
176            Outcome::Unsupported { .. } => 0,
177            Outcome::ParseFailure { .. } => 1,
178            Outcome::PlanFailure { .. } => 2,
179            Outcome::UnexpectedPlanSuccess { .. } => 3,
180            Outcome::WrongNumberOfRowsInserted { .. } => 4,
181            Outcome::WrongColumnCount { .. } => 5,
182            Outcome::WrongColumnNames { .. } => 6,
183            Outcome::OutputFailure { .. } => 7,
184            Outcome::InconsistentViewOutcome { .. } => 8,
185            Outcome::Bail { .. } => 9,
186            Outcome::Warning { .. } => 10,
187            Outcome::Success => 11,
188        }
189    }
190
191    fn success(&self) -> bool {
192        matches!(self, Outcome::Success)
193    }
194
195    fn failure(&self) -> bool {
196        !matches!(self, Outcome::Success) && !matches!(self, Outcome::Warning { .. })
197    }
198
199    /// Returns an error message that will match self. Appropriate for
200    /// rewriting error messages (i.e. not inserting error messages where we
201    /// currently expect success).
202    fn err_msg(&self) -> Option<String> {
203        match self {
204            Outcome::Unsupported { error, .. }
205            | Outcome::ParseFailure { error, .. }
206            | Outcome::PlanFailure { error, .. } => {
207                // Take only the first line, which should be sufficient for
208                // meaningfully matching the error.
209                let err_str = error.to_string_with_causes();
210                let err_str = err_str.split('\n').next().unwrap();
211                // Strip the "db error: ERROR: " prefix added by the postgres
212                // client library, as it's noisy and not useful for matching.
213                let err_str = err_str.strip_prefix("db error: ERROR: ").unwrap_or(err_str);
214                // This value gets fed back into regex to check that it matches
215                // `self`, so escape its meta characters. We need to undo the
216                // escaping of #. `regex::escape` escapes this because it
217                // expects that we use the `x` flag when building a regex, but
218                // this is not the case, so \# would end up being an invalid
219                // escape sequence, which would choke the parsing of the slt
220                // file the next time around.
221                Some(regex::escape(err_str).replace(r"\#", "#"))
222            }
223            _ => None,
224        }
225    }
226}
227
228impl fmt::Display for Outcome<'_> {
229    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
230        use Outcome::*;
231        const INDENT: &str = "\n        ";
232        match self {
233            Unsupported { error, location } => write!(
234                f,
235                "Unsupported:{}:\n{}",
236                location,
237                error.display_with_causes()
238            ),
239            ParseFailure { error, location } => {
240                write!(
241                    f,
242                    "ParseFailure:{}:\n{}",
243                    location,
244                    error.display_with_causes()
245                )
246            }
247            PlanFailure {
248                error,
249                expected_error,
250                location,
251            } => {
252                if let Some(expected_error) = expected_error {
253                    write!(
254                        f,
255                        "PlanFailure:{}:\nerror does not match expected pattern:\n  expected: /{}/\n  actual:    {}",
256                        location,
257                        expected_error,
258                        error.display_with_causes()
259                    )
260                } else {
261                    write!(f, "PlanFailure:{}:\n{:#}", location, error)
262                }
263            }
264            UnexpectedPlanSuccess {
265                expected_error,
266                location,
267            } => write!(
268                f,
269                "UnexpectedPlanSuccess:{} expected error: {}",
270                location, expected_error
271            ),
272            WrongNumberOfRowsInserted {
273                expected_count,
274                actual_count,
275                location,
276            } => write!(
277                f,
278                "WrongNumberOfRowsInserted:{}{}expected: {}{}actually: {}",
279                location, INDENT, expected_count, INDENT, actual_count
280            ),
281            WrongColumnCount {
282                expected_count,
283                actual_count,
284                location,
285            } => write!(
286                f,
287                "WrongColumnCount:{}{}expected: {}{}actually: {}",
288                location, INDENT, expected_count, INDENT, actual_count
289            ),
290            WrongColumnNames {
291                expected_column_names,
292                actual_column_names,
293                actual_output: _,
294                location,
295            } => write!(
296                f,
297                "Wrong Column Names:{}:{}expected column names: {}{}inferred column names: {}",
298                location,
299                INDENT,
300                expected_column_names
301                    .iter()
302                    .map(|n| n.to_string())
303                    .collect::<Vec<_>>()
304                    .join(" "),
305                INDENT,
306                actual_column_names
307                    .iter()
308                    .map(|n| n.to_string())
309                    .collect::<Vec<_>>()
310                    .join(" ")
311            ),
312            OutputFailure {
313                expected_output,
314                actual_raw_output,
315                actual_output,
316                location,
317            } => write!(
318                f,
319                "OutputFailure:{}{}expected: {:?}{}actually: {:?}{}actual raw: {:?}",
320                location, INDENT, expected_output, INDENT, actual_output, INDENT, actual_raw_output
321            ),
322            InconsistentViewOutcome {
323                query_outcome,
324                view_outcome,
325                location,
326            } => write!(
327                f,
328                "InconsistentViewOutcome:{}{}expected from query: {}{}actually from indexed view: {}",
329                location, INDENT, query_outcome, INDENT, view_outcome
330            ),
331            Bail { cause, location } => write!(f, "Bail:{} {}", location, cause),
332            Warning { cause, location } => write!(f, "Warning:{} {}", location, cause),
333            Success => f.write_str("Success"),
334        }
335    }
336}
337
/// Aggregated results of a run: one counter per outcome kind plus a list of
/// detail messages.
#[derive(Default, Debug)]
pub struct Outcomes {
    /// One counter per outcome kind. The slots at `SUCCESS_OUTCOME` and
    /// `WARNING_OUTCOME` hold the non-failure counts; presumably the array is
    /// indexed by `Outcome::code` (the code that fills it is not in this part
    /// of the file).
    stats: [usize; NUM_OUTCOMES],
    /// Detail lines, printed one per line by `OutcomesDisplay` when failure
    /// details are requested.
    details: Vec<String>,
}
343
344impl ops::AddAssign<Outcomes> for Outcomes {
345    fn add_assign(&mut self, rhs: Outcomes) {
346        for (lhs, rhs) in self.stats.iter_mut().zip_eq(rhs.stats.iter()) {
347            *lhs += rhs
348        }
349    }
350}
351impl Outcomes {
352    pub fn any_failed(&self) -> bool {
353        self.stats[SUCCESS_OUTCOME] + self.stats[WARNING_OUTCOME] < self.stats.iter().sum::<usize>()
354    }
355
356    pub fn as_json(&self) -> serde_json::Value {
357        serde_json::json!({
358            "unsupported": self.stats[0],
359            "parse_failure": self.stats[1],
360            "plan_failure": self.stats[2],
361            "unexpected_plan_success": self.stats[3],
362            "wrong_number_of_rows_affected": self.stats[4],
363            "wrong_column_count": self.stats[5],
364            "wrong_column_names": self.stats[6],
365            "output_failure": self.stats[7],
366            "inconsistent_view_outcome": self.stats[8],
367            "bail": self.stats[9],
368            "warning": self.stats[10],
369            "success": self.stats[11],
370        })
371    }
372
373    pub fn display(&self, no_fail: bool, failure_details: bool) -> OutcomesDisplay<'_> {
374        OutcomesDisplay {
375            inner: self,
376            no_fail,
377            failure_details,
378        }
379    }
380}
381
/// `Display` adapter for [`Outcomes`], created by `Outcomes::display`.
pub struct OutcomesDisplay<'a> {
    /// The outcomes to render.
    inner: &'a Outcomes,
    /// When set, a failing summary is labelled `FAIL-IGNORE` rather than
    /// `FAIL`, and detail output is produced even if nothing failed.
    no_fail: bool,
    /// When set, the detail lines are printed instead of the summary line
    /// (if there are failures or `no_fail` is set).
    failure_details: bool,
}
387
388impl<'a> fmt::Display for OutcomesDisplay<'a> {
389    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
390        let total: usize = self.inner.stats.iter().sum();
391        if self.failure_details
392            && (self.inner.stats[SUCCESS_OUTCOME] + self.inner.stats[WARNING_OUTCOME] != total
393                || self.no_fail)
394        {
395            for outcome in &self.inner.details {
396                writeln!(f, "{}", outcome)?;
397            }
398            Ok(())
399        } else {
400            write!(
401                f,
402                "{}:",
403                if self.inner.stats[SUCCESS_OUTCOME] + self.inner.stats[WARNING_OUTCOME] == total {
404                    "PASS"
405                } else if self.no_fail {
406                    "FAIL-IGNORE"
407                } else {
408                    "FAIL"
409                }
410            )?;
411            static NAMES: LazyLock<Vec<&'static str>> = LazyLock::new(|| {
412                vec![
413                    "unsupported",
414                    "parse-failure",
415                    "plan-failure",
416                    "unexpected-plan-success",
417                    "wrong-number-of-rows-inserted",
418                    "wrong-column-count",
419                    "wrong-column-names",
420                    "output-failure",
421                    "inconsistent-view-outcome",
422                    "bail",
423                    "warning",
424                    "success",
425                    "total",
426                ]
427            });
428            for (i, n) in self.inner.stats.iter().enumerate() {
429                if *n > 0 {
430                    write!(f, " {}={}", NAMES[i], n)?;
431                }
432            }
433            write!(f, " total={}", total)
434        }
435    }
436}
437
/// Information about a prepared query.
///
/// NOTE(review): the code that fills and reads this struct is outside this
/// part of the file; the field descriptions below are inferred from names.
struct QueryInfo {
    /// Whether the statement is a `SELECT` (presumably).
    is_select: bool,
    /// Number of result attributes, if known (presumably).
    num_attributes: Option<usize>,
}
442
/// Result of preparing a query: either the query information needed to go
/// on, or an [`Outcome`] to report instead.
enum PrepareQueryOutcome<'a> {
    /// Preparation produced query information.
    QueryPrepared(QueryInfo),
    /// Preparation produced an outcome directly.
    Outcome(Outcome<'a>),
}
447
/// A sqllogictest runner that owns a replaceable [`RunnerInner`].
pub struct Runner<'a> {
    /// Configuration used every time the inner runner is (re)started.
    config: &'a RunConfig<'a>,
    /// The current inner runner. It is `None` only while `reset` is
    /// replacing it; the other methods expect it to be present.
    inner: Option<RunnerInner<'a>>,
}
452
/// State of one started runner: server addresses, SQL clients, behaviour
/// flags, and the handles that keep the server alive.
///
/// NOTE(review): the code that populates these fields is outside this part
/// of the file; field roles not stated here are inferred from names only.
pub struct RunnerInner<'a> {
    // Socket addresses of the listeners belonging to this runner.
    server_addr: SocketAddr,
    internal_server_addr: SocketAddr,
    password_server_addr: SocketAddr,
    internal_http_server_addr: SocketAddr,
    // Drop order matters for these fields.
    // (Rust drops struct fields in declaration order, so do not reorder.)
    client: tokio_postgres::Client,
    system_client: tokio_postgres::Client,
    // Additional clients keyed by name.
    clients: BTreeMap<String, tokio_postgres::Client>,
    auto_index_tables: bool,
    auto_index_selects: bool,
    auto_transactions: bool,
    enable_table_keys: bool,
    verbose: bool,
    stdout: &'a dyn WriteFmt,
    // The underscore-prefixed fields are never read; they are held so that
    // they are dropped together with the runner.
    _shutdown_trigger: trigger::Trigger,
    _server_thread: JoinOnDropHandle<()>,
    _temp_dir: TempDir,
}
472
/// Newtype around [`Value`] with its own `FromSql` implementation, so that
/// result columns can be decoded into `Value`s for sqllogictest formatting.
#[derive(Debug)]
pub struct Slt(Value);
475
476impl<'a> FromSql<'a> for Slt {
477    fn from_sql(
478        ty: &PgType,
479        mut raw: &'a [u8],
480    ) -> Result<Self, Box<dyn Error + 'static + Send + Sync>> {
481        Ok(match *ty {
482            PgType::ACLITEM => Self(Value::AclItem(AclItem::decode_binary(
483                types::bytea_from_sql(raw),
484            )?)),
485            PgType::BOOL => Self(Value::Bool(types::bool_from_sql(raw)?)),
486            PgType::BYTEA => Self(Value::Bytea(types::bytea_from_sql(raw).to_vec())),
487            PgType::CHAR => Self(Value::Char(u8::from_be_bytes(
488                types::char_from_sql(raw)?.to_be_bytes(),
489            ))),
490            PgType::FLOAT4 => Self(Value::Float4(types::float4_from_sql(raw)?)),
491            PgType::FLOAT8 => Self(Value::Float8(types::float8_from_sql(raw)?)),
492            PgType::DATE => Self(Value::Date(Date::from_pg_epoch(types::int4_from_sql(
493                raw,
494            )?)?)),
495            PgType::INT2 => Self(Value::Int2(types::int2_from_sql(raw)?)),
496            PgType::INT4 => Self(Value::Int4(types::int4_from_sql(raw)?)),
497            PgType::INT8 => Self(Value::Int8(types::int8_from_sql(raw)?)),
498            PgType::INTERVAL => Self(Value::Interval(Interval::from_sql(ty, raw)?)),
499            PgType::JSONB => Self(Value::Jsonb(Jsonb::from_sql(ty, raw)?)),
500            PgType::NAME => Self(Value::Name(types::text_from_sql(raw)?.to_string())),
501            PgType::NUMERIC => Self(Value::Numeric(Numeric::from_sql(ty, raw)?)),
502            PgType::OID => Self(Value::Oid(types::oid_from_sql(raw)?)),
503            PgType::REGCLASS => Self(Value::Oid(types::oid_from_sql(raw)?)),
504            PgType::REGPROC => Self(Value::Oid(types::oid_from_sql(raw)?)),
505            PgType::REGTYPE => Self(Value::Oid(types::oid_from_sql(raw)?)),
506            PgType::TEXT | PgType::BPCHAR | PgType::VARCHAR => {
507                Self(Value::Text(types::text_from_sql(raw)?.to_string()))
508            }
509            PgType::TIME => Self(Value::Time(NaiveTime::from_sql(ty, raw)?)),
510            PgType::TIMESTAMP => Self(Value::Timestamp(
511                NaiveDateTime::from_sql(ty, raw)?.try_into()?,
512            )),
513            PgType::TIMESTAMPTZ => Self(Value::TimestampTz(
514                DateTime::<Utc>::from_sql(ty, raw)?.try_into()?,
515            )),
516            PgType::UUID => Self(Value::Uuid(Uuid::from_sql(ty, raw)?)),
517            PgType::RECORD => {
518                let num_fields = read_be_i32(&mut raw)?;
519                let mut tuple = vec![];
520                for _ in 0..num_fields {
521                    let oid = u32::reinterpret_cast(read_be_i32(&mut raw)?);
522                    let typ = match PgType::from_oid(oid) {
523                        Some(typ) => typ,
524                        None => return Err("unknown oid".into()),
525                    };
526                    let v = read_value::<Option<Slt>>(&typ, &mut raw)?;
527                    tuple.push(v.map(|v| v.0));
528                }
529                Self(Value::Record(tuple))
530            }
531            PgType::INT4_RANGE
532            | PgType::INT8_RANGE
533            | PgType::DATE_RANGE
534            | PgType::NUM_RANGE
535            | PgType::TS_RANGE
536            | PgType::TSTZ_RANGE => {
537                use mz_repr::adt::range::Range;
538                let range: Range<Slt> = Range::from_sql(ty, raw)?;
539                Self(Value::Range(range.into_bounds(|b| Box::new(b.0))))
540            }
541
542            _ => match ty.kind() {
543                PgKind::Array(arr_type) => {
544                    let arr = types::array_from_sql(raw)?;
545                    let elements: Vec<Option<Value>> = arr
546                        .values()
547                        .map(|v| match v {
548                            Some(v) => Ok(Some(Slt::from_sql(arr_type, v)?)),
549                            None => Ok(None),
550                        })
551                        .collect::<Vec<Option<Slt>>>()?
552                        .into_iter()
553                        // Map a Vec<Option<Slt>> to Vec<Option<Value>>.
554                        .map(|v| v.map(|v| v.0))
555                        .collect();
556
557                    Self(Value::Array {
558                        dims: arr
559                            .dimensions()
560                            .map(|d| {
561                                Ok(mz_repr::adt::array::ArrayDimension {
562                                    lower_bound: isize::cast_from(d.lower_bound),
563                                    length: usize::try_from(d.len)
564                                        .expect("cannot have negative length"),
565                                })
566                            })
567                            .collect()?,
568                        elements,
569                    })
570                }
571                _ => match ty.oid() {
572                    oid::TYPE_UINT2_OID => Self(Value::UInt2(UInt2::from_sql(ty, raw)?)),
573                    oid::TYPE_UINT4_OID => Self(Value::UInt4(UInt4::from_sql(ty, raw)?)),
574                    oid::TYPE_UINT8_OID => Self(Value::UInt8(UInt8::from_sql(ty, raw)?)),
575                    oid::TYPE_MZ_TIMESTAMP_OID => {
576                        let s = types::text_from_sql(raw)?;
577                        let t: mz_repr::Timestamp = s.parse()?;
578                        Self(Value::MzTimestamp(t))
579                    }
580                    oid::TYPE_MZ_ACL_ITEM_OID => Self(Value::MzAclItem(MzAclItem::decode_binary(
581                        types::bytea_from_sql(raw),
582                    )?)),
583                    _ => unreachable!(),
584                },
585            },
586        })
587    }
588    fn accepts(ty: &PgType) -> bool {
589        match ty.kind() {
590            PgKind::Array(_) | PgKind::Composite(_) => return true,
591            _ => {}
592        }
593        match ty.oid() {
594            oid::TYPE_UINT2_OID
595            | oid::TYPE_UINT4_OID
596            | oid::TYPE_UINT8_OID
597            | oid::TYPE_MZ_TIMESTAMP_OID
598            | oid::TYPE_MZ_ACL_ITEM_OID => return true,
599            _ => {}
600        }
601        matches!(
602            *ty,
603            PgType::ACLITEM
604                | PgType::BOOL
605                | PgType::BYTEA
606                | PgType::CHAR
607                | PgType::DATE
608                | PgType::FLOAT4
609                | PgType::FLOAT8
610                | PgType::INT2
611                | PgType::INT4
612                | PgType::INT8
613                | PgType::INTERVAL
614                | PgType::JSONB
615                | PgType::NAME
616                | PgType::NUMERIC
617                | PgType::OID
618                | PgType::REGCLASS
619                | PgType::REGPROC
620                | PgType::REGTYPE
621                | PgType::RECORD
622                | PgType::TEXT
623                | PgType::BPCHAR
624                | PgType::VARCHAR
625                | PgType::TIME
626                | PgType::TIMESTAMP
627                | PgType::TIMESTAMPTZ
628                | PgType::UUID
629                | PgType::INT4_RANGE
630                | PgType::INT4_RANGE_ARRAY
631                | PgType::INT8_RANGE
632                | PgType::INT8_RANGE_ARRAY
633                | PgType::DATE_RANGE
634                | PgType::DATE_RANGE_ARRAY
635                | PgType::NUM_RANGE
636                | PgType::NUM_RANGE_ARRAY
637                | PgType::TS_RANGE
638                | PgType::TS_RANGE_ARRAY
639                | PgType::TSTZ_RANGE
640                | PgType::TSTZ_RANGE_ARRAY
641        )
642    }
643}
644
// From postgres-types/src/private.rs.
/// Reads a big-endian `i32` from the front of `buf` and advances `buf` past
/// the four bytes consumed.
///
/// Fails with "invalid buffer size", leaving `buf` untouched, when fewer than
/// four bytes remain.
fn read_be_i32(buf: &mut &[u8]) -> Result<i32, Box<dyn Error + Sync + Send>> {
    let remaining: &[u8] = *buf;
    match remaining {
        [b0, b1, b2, b3, rest @ ..] => {
            *buf = rest;
            Ok(i32::from_be_bytes([*b0, *b1, *b2, *b3]))
        }
        _ => Err("invalid buffer size".into()),
    }
}
655
656// From postgres-types/src/private.rs.
657fn read_value<'a, T>(type_: &PgType, buf: &mut &'a [u8]) -> Result<T, Box<dyn Error + Sync + Send>>
658where
659    T: FromSql<'a>,
660{
661    let value = match usize::try_from(read_be_i32(buf)?) {
662        Err(_) => None,
663        Ok(len) => {
664            if len > buf.len() {
665                return Err("invalid buffer size".into());
666            }
667            let (head, tail) = buf.split_at(len);
668            *buf = tail;
669            Some(head)
670        }
671    };
672    T::from_sql_nullable(type_, value)
673}
674
675fn format_datum(d: Slt, typ: &Type, mode: Mode, col: usize) -> String {
676    match (typ, d.0) {
677        (Type::Bool, Value::Bool(b)) => b.to_string(),
678
679        (Type::Integer, Value::Int2(i)) => i.to_string(),
680        (Type::Integer, Value::Int4(i)) => i.to_string(),
681        (Type::Integer, Value::Int8(i)) => i.to_string(),
682        (Type::Integer, Value::UInt2(u)) => u.0.to_string(),
683        (Type::Integer, Value::UInt4(u)) => u.0.to_string(),
684        (Type::Integer, Value::UInt8(u)) => u.0.to_string(),
685        (Type::Integer, Value::Oid(i)) => i.to_string(),
686        // TODO(benesch): rewrite to avoid `as`.
687        #[allow(clippy::as_conversions)]
688        (Type::Integer, Value::Float4(f)) => format!("{}", f as i64),
689        // TODO(benesch): rewrite to avoid `as`.
690        #[allow(clippy::as_conversions)]
691        (Type::Integer, Value::Float8(f)) => format!("{}", f as i64),
692        // This is so wrong, but sqlite needs it.
693        (Type::Integer, Value::Text(_)) => "0".to_string(),
694        (Type::Integer, Value::Bool(b)) => i8::from(b).to_string(),
695        (Type::Integer, Value::Numeric(d)) => {
696            let mut d = d.0.0.clone();
697            let mut cx = numeric::cx_datum();
698            // Truncate the decimal to match sqlite.
699            if mode == Mode::Standard {
700                cx.set_rounding(dec::Rounding::Down);
701            }
702            cx.round(&mut d);
703            numeric::munge_numeric(&mut d).unwrap();
704            d.to_standard_notation_string()
705        }
706
707        (Type::Real, Value::Int2(i)) => format!("{:.3}", i),
708        (Type::Real, Value::Int4(i)) => format!("{:.3}", i),
709        (Type::Real, Value::Int8(i)) => format!("{:.3}", i),
710        (Type::Real, Value::Float4(f)) => match mode {
711            Mode::Standard => format!("{:.3}", f),
712            Mode::Cockroach => format!("{}", f),
713        },
714        (Type::Real, Value::Float8(f)) => match mode {
715            Mode::Standard => format!("{:.3}", f),
716            Mode::Cockroach => format!("{}", f),
717        },
718        (Type::Real, Value::Numeric(d)) => match mode {
719            Mode::Standard => {
720                let mut d = d.0.0.clone();
721                if d.exponent() < -3 {
722                    numeric::rescale(&mut d, 3).unwrap();
723                }
724                numeric::munge_numeric(&mut d).unwrap();
725                d.to_standard_notation_string()
726            }
727            Mode::Cockroach => d.0.0.to_standard_notation_string(),
728        },
729
730        (Type::Text, Value::Text(s)) => {
731            if s.is_empty() {
732                "(empty)".to_string()
733            } else {
734                s
735            }
736        }
737        (Type::Text, Value::Bool(b)) => b.to_string(),
738        (Type::Text, Value::Float4(f)) => format!("{:.3}", f),
739        (Type::Text, Value::Float8(f)) => format!("{:.3}", f),
740        // Bytes are printed as text iff they are valid UTF-8. This
741        // seems guaranteed to confuse everyone, but it is required for
742        // compliance with the CockroachDB sqllogictest runner. [0]
743        //
744        // [0]: https://github.com/cockroachdb/cockroach/blob/970782487/pkg/sql/logictest/logic.go#L2038-L2043
745        (Type::Text, Value::Bytea(b)) => match str::from_utf8(&b) {
746            Ok(s) => s.to_string(),
747            Err(_) => format!("{:?}", b),
748        },
749        (Type::Text, Value::Numeric(d)) => d.0.0.to_standard_notation_string(),
750        // Everything else gets normal text encoding. This correctly handles things
751        // like arrays, tuples, and strings that need to be quoted.
752        (Type::Text, d) => {
753            let mut buf = BytesMut::new();
754            d.encode_text(&mut buf);
755            String::from_utf8_lossy(&buf).into_owned()
756        }
757
758        (Type::Oid, Value::Oid(o)) => o.to_string(),
759
760        (_, d) => panic!(
761            "Don't know how to format {:?} as {:?} in column {}",
762            d, typ, col,
763        ),
764    }
765}
766
767fn format_row(row: &Row, types: &[Type], mode: Mode) -> Vec<String> {
768    let mut formatted: Vec<String> = vec![];
769    for i in 0..row.len() {
770        let t: Option<Slt> = row.get::<usize, Option<Slt>>(i);
771        let t: Option<String> = t.map(|d| format_datum(d, &types[i], mode, i));
772        formatted.push(match t {
773            Some(t) => t,
774            None => "NULL".into(),
775        });
776    }
777
778    formatted
779}
780
781impl<'a> Runner<'a> {
782    pub async fn start(config: &'a RunConfig<'a>) -> Result<Runner<'a>, anyhow::Error> {
783        let mut runner = Self {
784            config,
785            inner: None,
786        };
787        runner.reset().await?;
788        Ok(runner)
789    }
790
791    pub async fn reset(&mut self) -> Result<(), anyhow::Error> {
792        // Explicitly drop the old runner here to ensure that we wait for threads to terminate
793        // before starting a new runner
794        drop(self.inner.take());
795        self.inner = Some(RunnerInner::start(self.config).await?);
796
797        Ok(())
798    }
799
800    async fn run_record<'r>(
801        &mut self,
802        record: &'r Record<'r>,
803        in_transaction: &mut bool,
804    ) -> Result<Outcome<'r>, anyhow::Error> {
805        if let Record::ResetServer = record {
806            self.reset().await?;
807            Ok(Outcome::Success)
808        } else {
809            self.inner
810                .as_mut()
811                .expect("RunnerInner missing")
812                .run_record(record, in_transaction)
813                .await
814        }
815    }
816
817    async fn check_catalog(&self) -> Result<(), anyhow::Error> {
818        self.inner
819            .as_ref()
820            .expect("RunnerInner missing")
821            .check_catalog()
822            .await
823    }
824
825    async fn reset_database(&mut self) -> Result<(), anyhow::Error> {
826        let inner = self.inner.as_mut().expect("RunnerInner missing");
827
828        inner.client.batch_execute("ROLLBACK;").await?;
829
830        inner
831            .system_client
832            .batch_execute(
833                "ROLLBACK;
834                 SET cluster = mz_catalog_server;
835                 RESET cluster_replica;",
836            )
837            .await?;
838
839        inner
840            .system_client
841            .batch_execute("ALTER SYSTEM RESET ALL")
842            .await?;
843
844        // Drop all databases, then recreate the `materialize` database.
845        for row in inner
846            .system_client
847            .query("SELECT name FROM mz_databases", &[])
848            .await?
849        {
850            let name: &str = row.get("name");
851            inner
852                .system_client
853                .batch_execute(&format!("DROP DATABASE \"{name}\""))
854                .await?;
855        }
856        inner
857            .system_client
858            .batch_execute("CREATE DATABASE materialize")
859            .await?;
860
861        // Ensure quickstart cluster exists with one replica of size `self.config.replica_size`.
862        // We don't destroy the existing quickstart cluster replica if it exists, as turning
863        // on a cluster replica is exceptionally slow.
864        let mut needs_default_cluster = true;
865        for row in inner
866            .system_client
867            .query("SELECT name FROM mz_clusters WHERE id LIKE 'u%'", &[])
868            .await?
869        {
870            match row.get("name") {
871                "quickstart" => needs_default_cluster = false,
872                name => {
873                    inner
874                        .system_client
875                        .batch_execute(&format!("DROP CLUSTER {name}"))
876                        .await?
877                }
878            }
879        }
880        if needs_default_cluster {
881            inner
882                .system_client
883                .batch_execute("CREATE CLUSTER quickstart REPLICAS ()")
884                .await?;
885        }
886        let mut needs_default_replica = false;
887        let rows = inner
888            .system_client
889            .query(
890                "SELECT name, size FROM mz_cluster_replicas
891                 WHERE cluster_id = (SELECT id FROM mz_clusters WHERE name = 'quickstart')
892                 ORDER BY name",
893                &[],
894            )
895            .await?;
896        if rows.len() != self.config.replicas {
897            needs_default_replica = true;
898        } else {
899            for (i, row) in rows.iter().enumerate() {
900                let name: &str = row.get("name");
901                let size: &str = row.get("size");
902                if name != format!("r{i}") || size != self.config.replica_size {
903                    needs_default_replica = true;
904                    break;
905                }
906            }
907        }
908
909        if needs_default_replica {
910            inner
911                .system_client
912                .batch_execute("ALTER CLUSTER quickstart SET (MANAGED = false)")
913                .await?;
914            for row in inner
915                .system_client
916                .query(
917                    "SELECT name FROM mz_cluster_replicas
918                     WHERE cluster_id = (SELECT id FROM mz_clusters WHERE name = 'quickstart')",
919                    &[],
920                )
921                .await?
922            {
923                let name: &str = row.get("name");
924                inner
925                    .system_client
926                    .batch_execute(&format!("DROP CLUSTER REPLICA quickstart.{}", name))
927                    .await?;
928            }
929            for i in 1..=self.config.replicas {
930                inner
931                    .system_client
932                    .batch_execute(&format!(
933                        "CREATE CLUSTER REPLICA quickstart.r{i} SIZE '{}'",
934                        self.config.replica_size
935                    ))
936                    .await?;
937            }
938            inner
939                .system_client
940                .batch_execute("ALTER CLUSTER quickstart SET (MANAGED = true)")
941                .await?;
942        }
943
944        // Grant initial privileges.
945        inner
946            .system_client
947            .batch_execute("GRANT USAGE ON DATABASE materialize TO PUBLIC")
948            .await?;
949        inner
950            .system_client
951            .batch_execute("GRANT CREATE ON DATABASE materialize TO materialize")
952            .await?;
953        inner
954            .system_client
955            .batch_execute("GRANT CREATE ON SCHEMA materialize.public TO materialize")
956            .await?;
957        inner
958            .system_client
959            .batch_execute("GRANT USAGE ON CLUSTER quickstart TO PUBLIC")
960            .await?;
961        inner
962            .system_client
963            .batch_execute("GRANT CREATE ON CLUSTER quickstart TO materialize")
964            .await?;
965
966        // Some sqllogictests require more than the default amount of tables, so we increase the
967        // limit for all tests.
968        inner
969            .system_client
970            .simple_query("ALTER SYSTEM SET max_tables = 100")
971            .await?;
972
973        if inner.enable_table_keys {
974            inner
975                .system_client
976                .simple_query("ALTER SYSTEM SET unsafe_enable_table_keys = true")
977                .await?;
978        }
979
980        inner.ensure_fixed_features().await?;
981
982        inner.client = connect(inner.server_addr, None, None).await.unwrap();
983        inner.system_client = connect(inner.internal_server_addr, Some("mz_system"), None)
984            .await
985            .unwrap();
986        inner.clients = BTreeMap::new();
987
988        Ok(())
989    }
990}
991
992impl<'a> RunnerInner<'a> {
993    pub async fn start(config: &RunConfig<'a>) -> Result<RunnerInner<'a>, anyhow::Error> {
994        let temp_dir = tempfile::tempdir()?;
995        let scratch_dir = tempfile::tempdir()?;
996        let environment_id = EnvironmentId::for_tests();
997        let (consensus_uri, timestamp_oracle_url): (SensitiveUrl, SensitiveUrl) = {
998            let postgres_url = &config.postgres_url;
999            let prefix = &config.prefix;
1000            info!(%postgres_url, "starting server");
1001            let (client, conn) = Retry::default()
1002                .max_tries(5)
1003                .retry_async(|_| async {
1004                    match tokio_postgres::connect(postgres_url, NoTls).await {
1005                        Ok(c) => Ok(c),
1006                        Err(e) => {
1007                            error!(%e, "failed to connect to postgres");
1008                            Err(e)
1009                        }
1010                    }
1011                })
1012                .await?;
1013            task::spawn(|| "sqllogictest_connect", async move {
1014                if let Err(e) = conn.await {
1015                    panic!("connection error: {}", e);
1016                }
1017            });
1018            client
1019                .batch_execute(&format!(
1020                    "DROP SCHEMA IF EXISTS {prefix}_tsoracle CASCADE;
1021                     CREATE SCHEMA IF NOT EXISTS {prefix}_consensus;
1022                     CREATE SCHEMA {prefix}_tsoracle;"
1023                ))
1024                .await?;
1025            (
1026                format!("{postgres_url}?options=--search_path={prefix}_consensus")
1027                    .parse()
1028                    .expect("invalid consensus URI"),
1029                format!("{postgres_url}?options=--search_path={prefix}_tsoracle")
1030                    .parse()
1031                    .expect("invalid timestamp oracle URI"),
1032            )
1033        };
1034
1035        let secrets_dir = temp_dir.path().join("secrets");
1036        let orchestrator = Arc::new(
1037            ProcessOrchestrator::new(ProcessOrchestratorConfig {
1038                image_dir: env::current_exe()?.parent().unwrap().to_path_buf(),
1039                suppress_output: false,
1040                environment_id: environment_id.to_string(),
1041                secrets_dir: secrets_dir.clone(),
1042                command_wrapper: config
1043                    .orchestrator_process_wrapper
1044                    .as_ref()
1045                    .map_or(Ok(vec![]), |s| shell_words::split(s))?,
1046                propagate_crashes: true,
1047                tcp_proxy: None,
1048                scratch_directory: scratch_dir.path().to_path_buf(),
1049            })
1050            .await?,
1051        );
1052        let now = SYSTEM_TIME.clone();
1053        let metrics_registry = MetricsRegistry::new();
1054
1055        let persist_config = PersistConfig::new(
1056            &mz_environmentd::BUILD_INFO,
1057            now.clone(),
1058            mz_dyncfgs::all_dyncfgs(),
1059        );
1060        let persist_pubsub_server =
1061            PersistGrpcPubSubServer::new(&persist_config, &metrics_registry);
1062        let persist_pubsub_client = persist_pubsub_server.new_same_process_connection();
1063        let persist_pubsub_tcp_listener =
1064            TcpListener::bind(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0))
1065                .await
1066                .expect("pubsub addr binding");
1067        let persist_pubsub_server_port = persist_pubsub_tcp_listener
1068            .local_addr()
1069            .expect("pubsub addr has local addr")
1070            .port();
1071        info!("listening for persist pubsub connections on localhost:{persist_pubsub_server_port}");
1072        mz_ore::task::spawn(|| "persist_pubsub_server", async move {
1073            persist_pubsub_server
1074                .serve_with_stream(TcpListenerStream::new(persist_pubsub_tcp_listener))
1075                .await
1076                .expect("success")
1077        });
1078        let persist_clients =
1079            PersistClientCache::new(persist_config, &metrics_registry, |cfg, metrics| {
1080                let sender: Arc<dyn PubSubSender> = Arc::new(MetricsSameProcessPubSubSender::new(
1081                    cfg,
1082                    persist_pubsub_client.sender,
1083                    metrics,
1084                ));
1085                PubSubClientConnection::new(sender, persist_pubsub_client.receiver)
1086            });
1087        let persist_clients = Arc::new(persist_clients);
1088
1089        let secrets_controller = Arc::clone(&orchestrator);
1090        let connection_context = ConnectionContext::for_tests(orchestrator.reader());
1091        let orchestrator = Arc::new(TracingOrchestrator::new(
1092            orchestrator,
1093            config.tracing.clone(),
1094        ));
1095        let listeners_config = ListenersConfig {
1096            sql: btreemap! {
1097                "external".to_owned() => SqlListenerConfig {
1098                    addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
1099                    authenticator_kind: AuthenticatorKind::None,
1100                    allowed_roles: AllowedRoles::Normal,
1101                    enable_tls: false,
1102                },
1103                "internal".to_owned() => SqlListenerConfig {
1104                    addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
1105                    authenticator_kind: AuthenticatorKind::None,
1106                    allowed_roles: AllowedRoles::Internal,
1107                    enable_tls: false,
1108                },
1109                "password".to_owned() => SqlListenerConfig {
1110                    addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
1111                    authenticator_kind: AuthenticatorKind::Password,
1112                    allowed_roles: AllowedRoles::Normal,
1113                    enable_tls: false,
1114                },
1115            },
1116            http: btreemap![
1117                "external".to_owned() => HttpListenerConfig {
1118                    base: BaseListenerConfig {
1119                        addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
1120                        authenticator_kind: AuthenticatorKind::None,
1121                        allowed_roles: AllowedRoles::Normal,
1122                        enable_tls: false
1123                    },
1124                    routes: HttpRoutesEnabled {
1125                        base: true,
1126                        webhook: true,
1127                        internal: false,
1128                        metrics: false,
1129                        profiling: false,
1130                        mcp_agent: false,
1131                        mcp_developer: false,
1132                        console_config: true,
1133                    },
1134                },
1135                "internal".to_owned() => HttpListenerConfig {
1136                    base: BaseListenerConfig {
1137                        addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
1138                        authenticator_kind: AuthenticatorKind::None,
1139                        allowed_roles: AllowedRoles::NormalAndInternal,
1140                        enable_tls: false
1141                    },
1142                    routes: HttpRoutesEnabled {
1143                        base: true,
1144                        webhook: true,
1145                        internal: true,
1146                        metrics: true,
1147                        profiling: true,
1148                        mcp_agent: false,
1149                        mcp_developer: false,
1150                        console_config: false,
1151                    },
1152                },
1153            ],
1154        };
1155        let listeners = mz_environmentd::Listeners::bind(listeners_config).await?;
1156        let host_name = format!(
1157            "localhost:{}",
1158            listeners.http["external"].handle.local_addr.port()
1159        );
1160        let catalog_config = CatalogConfig {
1161            persist_clients: Arc::clone(&persist_clients),
1162            metrics: Arc::new(mz_catalog::durable::Metrics::new(&MetricsRegistry::new())),
1163        };
1164        let server_config = mz_environmentd::Config {
1165            catalog_config,
1166            timestamp_oracle_url: Some(timestamp_oracle_url),
1167            controller: ControllerConfig {
1168                build_info: &mz_environmentd::BUILD_INFO,
1169                orchestrator,
1170                clusterd_image: "clusterd".into(),
1171                init_container_image: None,
1172                deploy_generation: 0,
1173                persist_location: PersistLocation {
1174                    blob_uri: format!(
1175                        "file://{}/persist/blob",
1176                        config.persist_dir.path().display()
1177                    )
1178                    .parse()
1179                    .expect("invalid blob URI"),
1180                    consensus_uri,
1181                },
1182                persist_clients,
1183                now: SYSTEM_TIME.clone(),
1184                metrics_registry: metrics_registry.clone(),
1185                persist_pubsub_url: format!("http://localhost:{}", persist_pubsub_server_port),
1186                secrets_args: mz_service::secrets::SecretsReaderCliArgs {
1187                    secrets_reader: mz_service::secrets::SecretsControllerKind::LocalFile,
1188                    secrets_reader_local_file_dir: Some(secrets_dir),
1189                    secrets_reader_kubernetes_context: None,
1190                    secrets_reader_aws_prefix: None,
1191                    secrets_reader_name_prefix: None,
1192                },
1193                connection_context,
1194                replica_http_locator: Arc::new(ReplicaHttpLocator::default()),
1195            },
1196            secrets_controller,
1197            cloud_resource_controller: None,
1198            tls: None,
1199            frontegg: None,
1200            cors_allowed_origin: AllowOrigin::list([]),
1201            cors_allowed_origin_list: Vec::new(),
1202            unsafe_mode: true,
1203            all_features: false,
1204            metrics_registry,
1205            now,
1206            environment_id,
1207            cluster_replica_sizes: ClusterReplicaSizeMap::for_tests(),
1208            bootstrap_default_cluster_replica_size: config.replica_size.clone(),
1209            bootstrap_default_cluster_replication_factor: config
1210                .replicas
1211                .try_into()
1212                .expect("replicas must fit"),
1213            bootstrap_builtin_system_cluster_config: BootstrapBuiltinClusterConfig {
1214                replication_factor: SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR,
1215                size: config.replica_size.clone(),
1216            },
1217            bootstrap_builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig {
1218                replication_factor: CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR,
1219                size: config.replica_size.clone(),
1220            },
1221            bootstrap_builtin_probe_cluster_config: BootstrapBuiltinClusterConfig {
1222                replication_factor: PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR,
1223                size: config.replica_size.clone(),
1224            },
1225            bootstrap_builtin_support_cluster_config: BootstrapBuiltinClusterConfig {
1226                replication_factor: SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR,
1227                size: config.replica_size.clone(),
1228            },
1229            bootstrap_builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig {
1230                replication_factor: ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR,
1231                size: config.replica_size.clone(),
1232            },
1233            system_parameter_defaults: {
1234                let mut params = BTreeMap::new();
1235                params.insert(
1236                    "log_filter".to_string(),
1237                    config.tracing.startup_log_filter.to_string(),
1238                );
1239                params.extend(config.system_parameter_defaults.clone());
1240                params
1241            },
1242            availability_zones: Default::default(),
1243            tracing_handle: config.tracing_handle.clone(),
1244            storage_usage_collection_interval: Duration::from_secs(3600),
1245            storage_usage_retention_period: None,
1246            segment_api_key: None,
1247            segment_client_side: false,
1248            // SLT doesn't like eternally running tasks since it waits for them to finish inbetween SLT files
1249            test_only_dummy_segment_client: false,
1250            egress_addresses: vec![],
1251            aws_account_id: None,
1252            aws_privatelink_availability_zones: None,
1253            launchdarkly_sdk_key: None,
1254            launchdarkly_key_map: Default::default(),
1255            config_sync_file_path: None,
1256            config_sync_timeout: Duration::from_secs(30),
1257            config_sync_loop_interval: None,
1258            bootstrap_role: Some("materialize".into()),
1259            http_host_name: Some(host_name),
1260            internal_console_redirect_url: None,
1261            tls_reload_certs: mz_server_core::cert_reload_never_reload(),
1262            helm_chart_version: None,
1263            license_key: ValidatedLicenseKey::for_tests(),
1264            external_login_password_mz_system: None,
1265            force_builtin_schema_migration: None,
1266        };
1267        // We need to run the server on its own Tokio runtime, which in turn
1268        // requires its own thread, so that we can wait for any tasks spawned
1269        // by the server to be shutdown at the end of each file. If we were to
1270        // share a Tokio runtime, tasks from the last file's server would still
1271        // be live at the start of the next file's server.
1272        let (server_addr_tx, server_addr_rx): (oneshot::Sender<Result<_, anyhow::Error>>, _) =
1273            oneshot::channel();
1274        let (internal_server_addr_tx, internal_server_addr_rx) = oneshot::channel();
1275        let (password_server_addr_tx, password_server_addr_rx) = oneshot::channel();
1276        let (internal_http_server_addr_tx, internal_http_server_addr_rx) = oneshot::channel();
1277        let (shutdown_trigger, shutdown_trigger_rx) = trigger::channel();
1278        let server_thread = thread::spawn(|| {
1279            let runtime = match Runtime::new() {
1280                Ok(runtime) => runtime,
1281                Err(e) => {
1282                    server_addr_tx
1283                        .send(Err(e.into()))
1284                        .expect("receiver should not drop first");
1285                    return;
1286                }
1287            };
1288            let server = match runtime.block_on(listeners.serve(server_config)) {
1289                Ok(runtime) => runtime,
1290                Err(e) => {
1291                    server_addr_tx
1292                        .send(Err(e.into()))
1293                        .expect("receiver should not drop first");
1294                    return;
1295                }
1296            };
1297            server_addr_tx
1298                .send(Ok(server.sql_listener_handles["external"].local_addr))
1299                .expect("receiver should not drop first");
1300            internal_server_addr_tx
1301                .send(server.sql_listener_handles["internal"].local_addr)
1302                .expect("receiver should not drop first");
1303            password_server_addr_tx
1304                .send(server.sql_listener_handles["password"].local_addr)
1305                .expect("receiver should not drop first");
1306            internal_http_server_addr_tx
1307                .send(server.http_listener_handles["internal"].local_addr)
1308                .expect("receiver should not drop first");
1309            runtime.block_on(shutdown_trigger_rx);
1310        });
1311        let server_addr = server_addr_rx.await??;
1312        let internal_server_addr = internal_server_addr_rx.await?;
1313        let password_server_addr = password_server_addr_rx.await?;
1314        let internal_http_server_addr = internal_http_server_addr_rx.await?;
1315
1316        let system_client = connect(internal_server_addr, Some("mz_system"), None)
1317            .await
1318            .unwrap();
1319        let client = connect(server_addr, None, None).await.unwrap();
1320
1321        let inner = RunnerInner {
1322            server_addr,
1323            internal_server_addr,
1324            password_server_addr,
1325            internal_http_server_addr,
1326            _shutdown_trigger: shutdown_trigger,
1327            _server_thread: server_thread.join_on_drop(),
1328            _temp_dir: temp_dir,
1329            client,
1330            system_client,
1331            clients: BTreeMap::new(),
1332            auto_index_tables: config.auto_index_tables,
1333            auto_index_selects: config.auto_index_selects,
1334            auto_transactions: config.auto_transactions,
1335            enable_table_keys: config.enable_table_keys,
1336            verbose: config.verbose,
1337            stdout: config.stdout,
1338        };
1339        inner.ensure_fixed_features().await?;
1340
1341        Ok(inner)
1342    }
1343
1344    /// Set features that should be enabled regardless of whether reset-server was
1345    /// called. These features may be set conditionally depending on the run configuration.
1346    async fn ensure_fixed_features(&self) -> Result<(), anyhow::Error> {
1347        // We turn on enable_reduce_mfp_fusion, as we wish
1348        // to get as much coverage of these features as we can.
1349        // TODO(vmarcos): Remove this code when we retire this feature flag.
1350        self.system_client
1351            .execute("ALTER SYSTEM SET enable_reduce_mfp_fusion = on", &[])
1352            .await?;
1353
1354        // Dangerous functions are useful for tests so we enable it for all tests.
1355        self.system_client
1356            .execute("ALTER SYSTEM SET unsafe_enable_unsafe_functions = on", &[])
1357            .await?;
1358        Ok(())
1359    }
1360
1361    async fn run_record<'r>(
1362        &mut self,
1363        record: &'r Record<'r>,
1364        in_transaction: &mut bool,
1365    ) -> Result<Outcome<'r>, anyhow::Error> {
1366        match &record {
1367            Record::Statement {
1368                expected_error,
1369                rows_affected,
1370                sql,
1371                location,
1372            } => {
1373                if self.auto_transactions && *in_transaction {
1374                    self.client.execute("COMMIT", &[]).await?;
1375                    *in_transaction = false;
1376                }
1377                match self
1378                    .run_statement(*expected_error, *rows_affected, sql, location.clone())
1379                    .await?
1380                {
1381                    Outcome::Success => {
1382                        if self.auto_index_tables {
1383                            let additional = mutate(sql);
1384                            for stmt in additional {
1385                                self.client.execute(&stmt, &[]).await?;
1386                            }
1387                        }
1388                        Ok(Outcome::Success)
1389                    }
1390                    other => {
1391                        if expected_error.is_some() {
1392                            Ok(other)
1393                        } else {
1394                            // If we failed to execute a statement that was supposed to succeed,
1395                            // running the rest of the tests in this file will probably cause
1396                            // false positives, so just give up on the file entirely.
1397                            Ok(Outcome::Bail {
1398                                cause: Box::new(other),
1399                                location: location.clone(),
1400                            })
1401                        }
1402                    }
1403                }
1404            }
1405            Record::Query {
1406                sql,
1407                output,
1408                location,
1409            } => {
1410                self.run_query(sql, output, location.clone(), in_transaction)
1411                    .await
1412            }
1413            Record::Simple {
1414                conn,
1415                user,
1416                password,
1417                sql,
1418                sort,
1419                output,
1420                location,
1421                ..
1422            } => {
1423                self.run_simple(
1424                    *conn,
1425                    *user,
1426                    *password,
1427                    sql,
1428                    sort.clone(),
1429                    output,
1430                    location.clone(),
1431                )
1432                .await
1433            }
1434            Record::Copy {
1435                table_name,
1436                tsv_path,
1437            } => {
1438                let tsv = tokio::fs::read(tsv_path).await?;
1439                let copy = self
1440                    .client
1441                    .copy_in(&*format!("COPY {} FROM STDIN", table_name))
1442                    .await?;
1443                tokio::pin!(copy);
1444                copy.send(bytes::Bytes::from(tsv)).await?;
1445                copy.finish().await?;
1446                Ok(Outcome::Success)
1447            }
1448            _ => Ok(Outcome::Success),
1449        }
1450    }
1451
1452    async fn run_statement<'r>(
1453        &self,
1454        expected_error: Option<&'r str>,
1455        expected_rows_affected: Option<u64>,
1456        sql: &'r str,
1457        location: Location,
1458    ) -> Result<Outcome<'r>, anyhow::Error> {
1459        static UNSUPPORTED_INDEX_STATEMENT_REGEX: LazyLock<Regex> =
1460            LazyLock::new(|| Regex::new("^(CREATE UNIQUE INDEX|REINDEX)").unwrap());
1461        if UNSUPPORTED_INDEX_STATEMENT_REGEX.is_match(sql) {
1462            // sure, we totally made you an index
1463            return Ok(Outcome::Success);
1464        }
1465
1466        match self.client.execute(sql, &[]).await {
1467            Ok(actual) => {
1468                if let Some(expected_error) = expected_error {
1469                    return Ok(Outcome::UnexpectedPlanSuccess {
1470                        expected_error,
1471                        location,
1472                    });
1473                }
1474                match expected_rows_affected {
1475                    None => Ok(Outcome::Success),
1476                    Some(expected) => {
1477                        if expected != actual {
1478                            Ok(Outcome::WrongNumberOfRowsInserted {
1479                                expected_count: expected,
1480                                actual_count: actual,
1481                                location,
1482                            })
1483                        } else {
1484                            Ok(Outcome::Success)
1485                        }
1486                    }
1487                }
1488            }
1489            Err(error) => {
1490                if let Some(expected_error) = expected_error {
1491                    if Regex::new(expected_error)?.is_match(&error.to_string_with_causes()) {
1492                        return Ok(Outcome::Success);
1493                    }
1494                    return Ok(Outcome::PlanFailure {
1495                        error: anyhow!(error),
1496                        expected_error: Some(expected_error.to_string()),
1497                        location,
1498                    });
1499                }
1500                Ok(Outcome::PlanFailure {
1501                    error: anyhow!(error),
1502                    expected_error: None,
1503                    location,
1504                })
1505            }
1506        }
1507    }
1508
1509    async fn prepare_query<'r>(
1510        &self,
1511        sql: &str,
1512        output: &'r Result<QueryOutput<'_>, &'r str>,
1513        location: Location,
1514        in_transaction: &mut bool,
1515    ) -> Result<PrepareQueryOutcome<'r>, anyhow::Error> {
1516        // get statement
1517        let statements = match mz_sql::parse::parse(sql) {
1518            Ok(statements) => statements,
1519            Err(e) => match output {
1520                Ok(_) => {
1521                    return Ok(PrepareQueryOutcome::Outcome(Outcome::ParseFailure {
1522                        error: e.into(),
1523                        location,
1524                    }));
1525                }
1526                Err(expected_error) => {
1527                    if Regex::new(expected_error)?.is_match(&e.to_string_with_causes()) {
1528                        return Ok(PrepareQueryOutcome::Outcome(Outcome::Success));
1529                    } else {
1530                        return Ok(PrepareQueryOutcome::Outcome(Outcome::ParseFailure {
1531                            error: e.into(),
1532                            location,
1533                        }));
1534                    }
1535                }
1536            },
1537        };
1538        let statement = match &*statements {
1539            [] => bail!("Got zero statements?"),
1540            [statement] => &statement.ast,
1541            _ => bail!("Got multiple statements: {:?}", statements),
1542        };
1543        let (is_select, num_attributes) = match statement {
1544            Statement::Select(stmt) => (true, derive_num_attributes(&stmt.query.body)),
1545            _ => (false, None),
1546        };
1547
1548        match output {
1549            Ok(_) => {
1550                if self.auto_transactions && !*in_transaction {
1551                    // No ISOLATION LEVEL SERIALIZABLE because of database-issues#5323
1552                    self.client.execute("BEGIN", &[]).await?;
1553                    *in_transaction = true;
1554                }
1555            }
1556            Err(_) => {
1557                if self.auto_transactions && *in_transaction {
1558                    self.client.execute("COMMIT", &[]).await?;
1559                    *in_transaction = false;
1560                }
1561            }
1562        }
1563
1564        // `SHOW` commands reference catalog schema, thus are not in the same timedomain and not
1565        // allowed in the same transaction, see:
1566        // https://materialize.com/docs/sql/begin/#same-timedomain-error
1567        match statement {
1568            Statement::Show(..) => {
1569                if self.auto_transactions && *in_transaction {
1570                    self.client.execute("COMMIT", &[]).await?;
1571                    *in_transaction = false;
1572                }
1573            }
1574            _ => (),
1575        }
1576        Ok(PrepareQueryOutcome::QueryPrepared(QueryInfo {
1577            is_select,
1578            num_attributes,
1579        }))
1580    }
1581
    /// Runs `sql` on the main client and compares the result with `output`.
    ///
    /// `output` is either the expected query output (`Ok`) or a regex the
    /// error message is expected to match (`Err`). The checks run in this
    /// order, and the first one that fails determines the returned outcome:
    ///
    /// 1. query error vs. expected error (or `Unsupported`/`PlanFailure` when
    ///    no error was expected),
    /// 2. unexpected success when an error was expected,
    /// 3. column count of every returned row,
    /// 4. column names (only when at least one row was returned and names are
    ///    expected),
    /// 5. the formatted values, compared either directly or through an MD5
    ///    digest.
    ///
    /// # Errors
    ///
    /// Returns `Err` only when the expected error is not a valid regex; every
    /// test-level mismatch is reported through the returned `Outcome`.
    async fn execute_query<'r>(
        &self,
        sql: &str,
        output: &'r Result<QueryOutput<'_>, &'r str>,
        location: Location,
    ) -> Result<Outcome<'r>, anyhow::Error> {
        let rows = match self.client.query(sql, &[]).await {
            Ok(rows) => rows,
            Err(error) => {
                let error_string = error.to_string_with_causes();
                return match output {
                    Ok(_) => {
                        if error_string.contains("supported") || error_string.contains("overload") {
                            // This is a failure, but it is attributed to missing support
                            // (judged by the error text) rather than to a bug.
                            Ok(Outcome::Unsupported {
                                error: anyhow!(error),
                                location,
                            })
                        } else {
                            Ok(Outcome::PlanFailure {
                                error: anyhow!(error),
                                expected_error: None,
                                location,
                            })
                        }
                    }
                    Err(expected_error) => {
                        if Regex::new(expected_error)?.is_match(&error_string) {
                            Ok(Outcome::Success)
                        } else {
                            Ok(Outcome::PlanFailure {
                                error: anyhow!(error),
                                expected_error: Some(expected_error.to_string()),
                                location,
                            })
                        }
                    }
                };
            }
        };

        // Unpack the expected output; a query that succeeded although an
        // error was expected is reported right away.
        let QueryOutput {
            sort,
            types: expected_types,
            column_names: expected_column_names,
            output: expected_output,
            mode,
            ..
        } = match output {
            Err(expected_error) => {
                return Ok(Outcome::UnexpectedPlanSuccess {
                    expected_error,
                    location,
                });
            }
            Ok(query_output) => query_output,
        };

        // Format every row, bailing out on the first row whose column count
        // differs from the number of expected types.
        let mut formatted_rows = vec![];
        for row in &rows {
            if row.len() != expected_types.len() {
                return Ok(Outcome::WrongColumnCount {
                    expected_count: expected_types.len(),
                    actual_count: row.len(),
                    location,
                });
            }
            let row = format_row(row, expected_types, *mode);
            formatted_rows.push(row);
        }

        // Row sort orders whole formatted rows before they are flattened;
        // value sort orders the individual values after flattening.
        if let Sort::Row = sort {
            formatted_rows.sort();
        }
        let mut values = formatted_rows.into_iter().flatten().collect::<Vec<_>>();
        if let Sort::Value = sort {
            values.sort();
        }

        // Various checks as long as there are returned rows.
        if let Some(row) = rows.get(0) {
            // Column names are read from the first row, so they can only be
            // checked when the result is non-empty.
            if let Some(expected_column_names) = expected_column_names {
                let actual_column_names = row
                    .columns()
                    .iter()
                    .map(|t| ColumnName::from(t.name()))
                    .collect::<Vec<_>>();
                if expected_column_names != &actual_column_names {
                    return Ok(Outcome::WrongColumnNames {
                        expected_column_names,
                        actual_column_names,
                        actual_output: Output::Values(values),
                        location,
                    });
                }
            }
        }

        // Compare the values with the expected output.
        match expected_output {
            Output::Values(expected_values) => {
                if values != *expected_values {
                    return Ok(Outcome::OutputFailure {
                        expected_output,
                        actual_raw_output: rows,
                        actual_output: Output::Values(values),
                        location,
                    });
                }
            }
            Output::Hashed {
                num_values,
                md5: expected_md5,
            } => {
                // The digest covers every value followed by a newline, and is
                // rendered as lowercase hex.
                let mut hasher = Md5::new();
                for value in &values {
                    hasher.update(value);
                    hasher.update("\n");
                }
                let md5 = format!("{:x}", hasher.finalize());
                if values.len() != *num_values || md5 != *expected_md5 {
                    return Ok(Outcome::OutputFailure {
                        expected_output,
                        actual_raw_output: rows,
                        actual_output: Output::Hashed {
                            num_values: values.len(),
                            md5,
                        },
                        location,
                    });
                }
            }
        }

        Ok(Outcome::Success)
    }
1722
1723    async fn execute_view_inner<'r>(
1724        &self,
1725        sql: &str,
1726        output: &'r Result<QueryOutput<'_>, &'r str>,
1727        location: Location,
1728    ) -> Result<Option<Outcome<'r>>, anyhow::Error> {
1729        print_sql_if(self.stdout, sql, self.verbose);
1730        let sql_result = self.client.execute(sql, &[]).await;
1731
1732        // Evaluate if we already reached an outcome or not.
1733        let tentative_outcome = if let Err(view_error) = sql_result {
1734            if let Err(expected_error) = output {
1735                if Regex::new(expected_error)?.is_match(&view_error.to_string_with_causes()) {
1736                    Some(Outcome::Success)
1737                } else {
1738                    Some(Outcome::PlanFailure {
1739                        error: view_error.into(),
1740                        expected_error: Some(expected_error.to_string()),
1741                        location: location.clone(),
1742                    })
1743                }
1744            } else {
1745                Some(Outcome::PlanFailure {
1746                    error: view_error.into(),
1747                    expected_error: None,
1748                    location: location.clone(),
1749                })
1750            }
1751        } else {
1752            None
1753        };
1754        Ok(tentative_outcome)
1755    }
1756
1757    async fn execute_view<'r>(
1758        &self,
1759        sql: &str,
1760        num_attributes: Option<usize>,
1761        output: &'r Result<QueryOutput<'_>, &'r str>,
1762        location: Location,
1763    ) -> Result<Outcome<'r>, anyhow::Error> {
1764        // Create indexed view SQL commands and execute `CREATE VIEW`.
1765        let expected_column_names = if let Ok(QueryOutput { column_names, .. }) = output {
1766            column_names.clone()
1767        } else {
1768            None
1769        };
1770        let (create_view, create_index, view_sql, drop_view) = generate_view_sql(
1771            sql,
1772            Uuid::new_v4().as_simple(),
1773            num_attributes,
1774            expected_column_names,
1775        );
1776        let tentative_outcome = self
1777            .execute_view_inner(create_view.as_str(), output, location.clone())
1778            .await?;
1779
1780        // Either we already have an outcome or alternatively,
1781        // we proceed to index and query the view.
1782        if let Some(view_outcome) = tentative_outcome {
1783            return Ok(view_outcome);
1784        }
1785
1786        let tentative_outcome = self
1787            .execute_view_inner(create_index.as_str(), output, location.clone())
1788            .await?;
1789
1790        let view_outcome;
1791        if let Some(outcome) = tentative_outcome {
1792            view_outcome = outcome;
1793        } else {
1794            print_sql_if(self.stdout, view_sql.as_str(), self.verbose);
1795            view_outcome = self
1796                .execute_query(view_sql.as_str(), output, location.clone())
1797                .await?;
1798        }
1799
1800        // Remember to clean up after ourselves by dropping the view.
1801        print_sql_if(self.stdout, drop_view.as_str(), self.verbose);
1802        self.client.execute(drop_view.as_str(), &[]).await?;
1803
1804        Ok(view_outcome)
1805    }
1806
1807    async fn run_query<'r>(
1808        &self,
1809        sql: &'r str,
1810        output: &'r Result<QueryOutput<'_>, &'r str>,
1811        location: Location,
1812        in_transaction: &mut bool,
1813    ) -> Result<Outcome<'r>, anyhow::Error> {
1814        let prepare_outcome = self
1815            .prepare_query(sql, output, location.clone(), in_transaction)
1816            .await?;
1817        match prepare_outcome {
1818            PrepareQueryOutcome::QueryPrepared(QueryInfo {
1819                is_select,
1820                num_attributes,
1821            }) => {
1822                let query_outcome = self.execute_query(sql, output, location.clone()).await?;
1823                if is_select && self.auto_index_selects && query_outcome.success() {
1824                    let view_outcome = self
1825                        .execute_view(sql, None, output, location.clone())
1826                        .await?;
1827
1828                    if !view_outcome.success() {
1829                        // Before producing a failure outcome, we try to obtain a new
1830                        // outcome for view-based execution exploiting analysis of the
1831                        // number of attributes. This two-level strategy can avoid errors
1832                        // produced by column ambiguity in the `SELECT`.
1833                        let view_outcome = if num_attributes.is_some() {
1834                            self.execute_view(sql, num_attributes, output, location.clone())
1835                                .await?
1836                        } else {
1837                            view_outcome
1838                        };
1839
1840                        if !view_outcome.success() {
1841                            let inconsistent_view_outcome = Outcome::InconsistentViewOutcome {
1842                                query_outcome: Box::new(query_outcome),
1843                                view_outcome: Box::new(view_outcome),
1844                                location: location.clone(),
1845                            };
1846                            // Determine if this inconsistent view outcome should be reported
1847                            // as an error or only as a warning.
1848                            let outcome = if should_warn(&inconsistent_view_outcome) {
1849                                Outcome::Warning {
1850                                    cause: Box::new(inconsistent_view_outcome),
1851                                    location: location.clone(),
1852                                }
1853                            } else {
1854                                inconsistent_view_outcome
1855                            };
1856                            return Ok(outcome);
1857                        }
1858                    }
1859                }
1860                Ok(query_outcome)
1861            }
1862            PrepareQueryOutcome::Outcome(outcome) => Ok(outcome),
1863        }
1864    }
1865
1866    async fn get_conn(
1867        &mut self,
1868        name: Option<&str>,
1869        user: Option<&str>,
1870        password: Option<&str>,
1871    ) -> Result<&tokio_postgres::Client, tokio_postgres::Error> {
1872        match name {
1873            None => Ok(&self.client),
1874            Some(name) => {
1875                if !self.clients.contains_key(name) {
1876                    let addr = if matches!(user, Some("mz_system") | Some("mz_support")) {
1877                        self.internal_server_addr
1878                    } else if password.is_some() {
1879                        // Use password server for password authentication
1880                        self.password_server_addr
1881                    } else {
1882                        self.server_addr
1883                    };
1884                    let client = connect(addr, user, password).await?;
1885                    self.clients.insert(name.into(), client);
1886                }
1887                Ok(self.clients.get(name).unwrap())
1888            }
1889        }
1890    }
1891
1892    async fn run_simple<'r>(
1893        &mut self,
1894        conn: Option<&'r str>,
1895        user: Option<&'r str>,
1896        password: Option<&'r str>,
1897        sql: &'r str,
1898        sort: Sort,
1899        output: &'r Output,
1900        location: Location,
1901    ) -> Result<Outcome<'r>, anyhow::Error> {
1902        let actual = match self.get_conn(conn, user, password).await {
1903            Ok(client) => match client.simple_query(sql).await {
1904                Ok(result) => {
1905                    let mut rows = Vec::new();
1906
1907                    for m in result.into_iter() {
1908                        match m {
1909                            SimpleQueryMessage::Row(row) => {
1910                                let mut s = vec![];
1911                                for i in 0..row.len() {
1912                                    s.push(row.get(i).unwrap_or("NULL"));
1913                                }
1914                                rows.push(s.join(","));
1915                            }
1916                            SimpleQueryMessage::CommandComplete(count) => {
1917                                // This applies any sort on the COMPLETE line as
1918                                // well, but we do the same for the expected output.
1919                                rows.push(format!("COMPLETE {}", count));
1920                            }
1921                            SimpleQueryMessage::RowDescription(_) => {}
1922                            _ => panic!("unexpected"),
1923                        }
1924                    }
1925
1926                    if let Sort::Row = sort {
1927                        rows.sort();
1928                    }
1929
1930                    Output::Values(rows)
1931                }
1932                // Errors can contain multiple lines (say if there are details), and rewrite
1933                // sticks them each on their own line, so we need to split up the lines here to
1934                // each be its own String in the Vec.
1935                Err(error) => Output::Values(
1936                    error
1937                        .to_string_with_causes()
1938                        .lines()
1939                        .map(|s| s.to_string())
1940                        .collect(),
1941                ),
1942            },
1943            Err(error) => Output::Values(
1944                error
1945                    .to_string_with_causes()
1946                    .lines()
1947                    .map(|s| s.to_string())
1948                    .collect(),
1949            ),
1950        };
1951        if *output != actual {
1952            Ok(Outcome::OutputFailure {
1953                expected_output: output,
1954                actual_raw_output: vec![],
1955                actual_output: actual,
1956                location,
1957            })
1958        } else {
1959            Ok(Outcome::Success)
1960        }
1961    }
1962
1963    async fn check_catalog(&self) -> Result<(), anyhow::Error> {
1964        let url = format!(
1965            "http://{}/api/catalog/check",
1966            self.internal_http_server_addr
1967        );
1968        let response: serde_json::Value = reqwest::get(&url).await?.json().await?;
1969
1970        if let Some(inconsistencies) = response.get("err") {
1971            let inconsistencies = serde_json::to_string_pretty(&inconsistencies)
1972                .expect("serializing Value cannot fail");
1973            Err(anyhow::anyhow!("Catalog inconsistency\n{inconsistencies}"))
1974        } else {
1975            Ok(())
1976        }
1977    }
1978}
1979
1980async fn connect(
1981    addr: SocketAddr,
1982    user: Option<&str>,
1983    password: Option<&str>,
1984) -> Result<tokio_postgres::Client, tokio_postgres::Error> {
1985    let mut config = tokio_postgres::Config::new();
1986    config.host(addr.ip().to_string());
1987    config.port(addr.port());
1988    config.user(user.unwrap_or("materialize"));
1989    if let Some(password) = password {
1990        config.password(password);
1991    }
1992    let (client, connection) = config.connect(NoTls).await?;
1993
1994    task::spawn(|| "sqllogictest_connect", async move {
1995        if let Err(e) = connection.await {
1996            eprintln!("connection error: {}", e);
1997        }
1998    });
1999    Ok(client)
2000}
2001
/// A sink for formatted output.
///
/// The method is named `write_fmt` so that `write!`/`writeln!` can target a
/// `&dyn WriteFmt`. Unlike the standard `write_fmt` methods it takes `&self`
/// and returns nothing, so writing through it yields no `Result` to handle.
pub trait WriteFmt {
    /// Writes the pre-formatted arguments `fmt` to the sink.
    fn write_fmt(&self, fmt: fmt::Arguments<'_>);
}
2005
/// Configuration for a sqllogictest run.
///
/// NOTE(review): only `stdout`, `verbose`, `quiet` and `fail_fast` are read
/// directly in the nearby free functions; the remaining fields are consumed
/// elsewhere, so their notes are limited to what names and types show.
pub struct RunConfig<'a> {
    /// Sink for regular output: file headers, records, SQL text and outcome
    /// reports are written here.
    pub stdout: &'a dyn WriteFmt,
    /// Sink for error output.
    pub stderr: &'a dyn WriteFmt,
    /// When set, `run_string` prints every record before running it and,
    /// unless `quiet` is set, reports every unsuccessful outcome (warnings
    /// included).
    pub verbose: bool,
    /// When set, `run_string` prints nothing about unsuccessful outcomes.
    pub quiet: bool,
    pub postgres_url: String,
    pub prefix: String,
    pub no_fail: bool,
    /// When set, `run_string` stops at the first outcome that is a failure.
    pub fail_fast: bool,
    pub auto_index_tables: bool,
    // NOTE(review): presumably the source of the runner's `auto_index_selects`
    // flag that gates view-based re-execution of `SELECT`s — confirm.
    pub auto_index_selects: bool,
    // NOTE(review): presumably the source of the runner's `auto_transactions`
    // flag that gates the implicit `BEGIN`/`COMMIT` handling — confirm.
    pub auto_transactions: bool,
    pub enable_table_keys: bool,
    pub orchestrator_process_wrapper: Option<String>,
    pub tracing: TracingCliArgs,
    pub tracing_handle: TracingHandle,
    pub system_parameter_defaults: BTreeMap<String, String>,
    /// Persist state is handled specially because:
    /// - Persist background workers do not necessarily shut down immediately once the server is
    ///   shut down, and may panic if their storage is deleted out from under them.
    /// - It's safe for different databases to reference the same state: all data is scoped by UUID.
    pub persist_dir: TempDir,
    pub replicas: usize,
    pub replica_size: String,
}
2031
/// Number of spaces by which printed records and SQL statements are indented
/// (see `print_record` and `print_sql`).
const PRINT_INDENT: usize = 4;
2034
2035fn print_record(config: &RunConfig<'_>, record: &Record) {
2036    match record {
2037        Record::Statement { sql, .. } | Record::Query { sql, .. } => {
2038            print_sql(config.stdout, sql, None)
2039        }
2040        Record::Simple { conn, sql, .. } => print_sql(config.stdout, sql, *conn),
2041        Record::Copy {
2042            table_name,
2043            tsv_path,
2044        } => {
2045            writeln!(
2046                config.stdout,
2047                "{}slt copy {} from {}",
2048                " ".repeat(PRINT_INDENT),
2049                table_name,
2050                tsv_path
2051            )
2052        }
2053        Record::ResetServer => {
2054            writeln!(config.stdout, "{}reset-server", " ".repeat(PRINT_INDENT))
2055        }
2056        Record::Halt => {
2057            writeln!(config.stdout, "{}halt", " ".repeat(PRINT_INDENT))
2058        }
2059        Record::HashThreshold { threshold } => {
2060            writeln!(
2061                config.stdout,
2062                "{}hash-threshold {}",
2063                " ".repeat(PRINT_INDENT),
2064                threshold
2065            )
2066        }
2067    }
2068}
2069
2070fn print_sql_if<'a>(stdout: &'a dyn WriteFmt, sql: &str, cond: bool) {
2071    if cond {
2072        print_sql(stdout, sql, None)
2073    }
2074}
2075
2076fn print_sql<'a>(stdout: &'a dyn WriteFmt, sql: &str, conn: Option<&str>) {
2077    let text = if let Some(conn) = conn {
2078        format!("[conn={}] {}", conn, sql)
2079    } else {
2080        sql.to_string()
2081    };
2082    writeln!(stdout, "{}", util::indent(&text, PRINT_INDENT))
2083}
2084
/// Regular expressions for matching error messages that should force a plan failure
/// in an inconsistent view outcome into a warning if the corresponding query succeeds.
///
/// `should_warn` compiles each entry with `Regex::new` and tests it with
/// `is_match` against the error text including its causes, so an entry matches
/// anywhere in that text. Plain-text entries are therefore regexes too:
/// regex metacharacters in them must be escaped.
const INCONSISTENT_VIEW_OUTCOME_WARNING_REGEXPS: [&str; 9] = [
    // The following are unfixable errors in indexed views given our
    // current constraints.
    "cannot materialize call to",
    "SHOW commands are not allowed in views",
    "cannot create view with unstable dependencies",
    "cannot use wildcard expansions or NATURAL JOINs in a view that depends on system objects",
    "no valid schema selected",
    r#"system schema '\w+' cannot be modified"#,
    r#"permission denied for (SCHEMA|CLUSTER) "(\w+\.)?\w+""#,
    // NOTE(vmarcos): Column ambiguity that could not be eliminated by our
    // currently implemented syntactic rewrites is considered unfixable.
    // In addition, if some column cannot be dealt with, e.g., in `ORDER BY`
    // references, we treat this condition as unfixable as well.
    r#"column "[\w\?]+" specified more than once"#,
    r#"column "(\w+\.)?\w+" does not exist"#,
];
2104
2105/// Evaluates if the given outcome should be returned directly or if it should
2106/// be wrapped as a warning. Note that this function should be used for outcomes
2107/// that can be judged in a context-independent manner, i.e., the outcome itself
2108/// provides enough information as to whether a warning should be emitted or not.
2109fn should_warn(outcome: &Outcome) -> bool {
2110    match outcome {
2111        Outcome::InconsistentViewOutcome { view_outcome, .. } => match view_outcome.as_ref() {
2112            Outcome::PlanFailure { error, .. } => {
2113                INCONSISTENT_VIEW_OUTCOME_WARNING_REGEXPS.iter().any(|s| {
2114                    Regex::new(s)
2115                        .expect("unexpected error in regular expression parsing")
2116                        .is_match(&error.to_string_with_causes())
2117                })
2118            }
2119            _ => false,
2120        },
2121        _ => false,
2122    }
2123}
2124
2125pub async fn run_string(
2126    runner: &mut Runner<'_>,
2127    source: &str,
2128    input: &str,
2129) -> Result<Outcomes, anyhow::Error> {
2130    runner.reset_database().await?;
2131
2132    let mut outcomes = Outcomes::default();
2133    let mut parser = crate::parser::Parser::new(source, input);
2134    // Transactions are currently relatively slow. Since sqllogictest runs in a single connection
2135    // there should be no difference in having longer running transactions.
2136    let mut in_transaction = false;
2137    writeln!(runner.config.stdout, "--- {}", source);
2138
2139    for record in parser.parse_records()? {
2140        // In maximal-verbose mode, print the query before attempting to run
2141        // it. Running the query might panic, so it is important to print out
2142        // what query we are trying to run *before* we panic.
2143        if runner.config.verbose {
2144            print_record(runner.config, &record);
2145        }
2146
2147        let outcome = runner
2148            .run_record(&record, &mut in_transaction)
2149            .await
2150            .map_err(|err| format!("In {}:\n{}", source, err))
2151            .unwrap();
2152
2153        // Print warnings and failures in verbose mode.
2154        if !runner.config.quiet && !outcome.success() {
2155            if !runner.config.verbose {
2156                // If `verbose` is enabled, we'll already have printed the record,
2157                // so don't print it again. Yes, this is an ugly bit of logic.
2158                // Please don't try to consolidate it with the `print_record`
2159                // call above, as it's important to have a mode in which records
2160                // are printed before they are run, so that if running the
2161                // record panics, you can tell which record caused it.
2162                if !outcome.failure() {
2163                    writeln!(
2164                        runner.config.stdout,
2165                        "{}",
2166                        util::indent("Warning detected for: ", 4)
2167                    );
2168                }
2169                print_record(runner.config, &record);
2170            }
2171            if runner.config.verbose || outcome.failure() {
2172                writeln!(
2173                    runner.config.stdout,
2174                    "{}",
2175                    util::indent(&outcome.to_string(), 4)
2176                );
2177                writeln!(runner.config.stdout, "{}", util::indent("----", 4));
2178            }
2179        }
2180
2181        outcomes.stats[outcome.code()] += 1;
2182        if outcome.failure() {
2183            outcomes.details.push(format!("{}", outcome));
2184        }
2185
2186        if let Outcome::Bail { .. } = outcome {
2187            break;
2188        }
2189
2190        if runner.config.fail_fast && outcome.failure() {
2191            break;
2192        }
2193    }
2194    Ok(outcomes)
2195}
2196
2197pub async fn run_file(runner: &mut Runner<'_>, filename: &Path) -> Result<Outcomes, anyhow::Error> {
2198    let mut input = String::new();
2199    File::open(filename)?.read_to_string(&mut input)?;
2200    let outcomes = run_string(runner, &format!("{}", filename.display()), &input).await?;
2201    runner.check_catalog().await?;
2202
2203    Ok(outcomes)
2204}
2205
2206pub async fn rewrite_file(runner: &mut Runner<'_>, filename: &Path) -> Result<(), anyhow::Error> {
2207    runner.reset_database().await?;
2208
2209    let mut file = OpenOptions::new().read(true).write(true).open(filename)?;
2210
2211    let mut input = String::new();
2212    file.read_to_string(&mut input)?;
2213
2214    let mut buf = RewriteBuffer::new(&input);
2215
2216    let mut parser = crate::parser::Parser::new(filename.to_str().unwrap_or(""), &input);
2217    writeln!(runner.config.stdout, "--- {}", filename.display());
2218    let mut in_transaction = false;
2219
2220    fn append_values_output(
2221        buf: &mut RewriteBuffer,
2222        input: &String,
2223        expected_output: &str,
2224        mode: &Mode,
2225        types: &Vec<Type>,
2226        column_names: Option<&Vec<ColumnName>>,
2227        actual_output: &Vec<String>,
2228        multiline: bool,
2229    ) {
2230        buf.append_header(input, expected_output, column_names);
2231
2232        for (i, row) in actual_output.chunks(types.len()).enumerate() {
2233            match mode {
2234                // In Cockroach mode, output each row on its own line, with
2235                // two spaces between each column.
2236                Mode::Cockroach => {
2237                    if i != 0 {
2238                        buf.append("\n");
2239                    }
2240
2241                    if row.len() == 0 {
2242                        // nothing to do
2243                    } else if row.len() == 1 {
2244                        // If there is only one column, then there is no need for space
2245                        // substitution, so we only do newline substitution.
2246                        if multiline {
2247                            buf.append(&row[0]);
2248                        } else {
2249                            buf.append(&row[0].replace('\n', "⏎"))
2250                        }
2251                    } else {
2252                        // Substitute spaces with ␠ to avoid mistaking the spaces in the result
2253                        // values with spaces that separate columns.
2254                        buf.append(
2255                            &row.iter()
2256                                .map(|col| {
2257                                    let mut col = col.replace(' ', "␠");
2258                                    if !multiline {
2259                                        col = col.replace('\n', "⏎");
2260                                    }
2261                                    col
2262                                })
2263                                .join("  "),
2264                        );
2265                    }
2266                }
2267                // In standard mode, output each value on its own line,
2268                // and ignore row boundaries.
2269                // No need to substitute spaces, because every value (not row) is on a separate
2270                // line. But we do need to substitute newlines.
2271                Mode::Standard => {
2272                    for (j, col) in row.iter().enumerate() {
2273                        if i != 0 || j != 0 {
2274                            buf.append("\n");
2275                        }
2276                        buf.append(&if multiline {
2277                            col.clone()
2278                        } else {
2279                            col.replace('\n', "⏎")
2280                        });
2281                    }
2282                }
2283            }
2284        }
2285    }
2286
2287    for record in parser.parse_records()? {
2288        let outcome = runner.run_record(&record, &mut in_transaction).await?;
2289
2290        match (&record, &outcome) {
2291            // If we see an output failure for a query, rewrite the expected output
2292            // to match the observed output.
2293            (
2294                Record::Query {
2295                    output:
2296                        Ok(QueryOutput {
2297                            mode,
2298                            output: Output::Values(_),
2299                            output_str: expected_output,
2300                            types,
2301                            column_names,
2302                            multiline,
2303                            ..
2304                        }),
2305                    ..
2306                },
2307                Outcome::OutputFailure {
2308                    actual_output: Output::Values(actual_output),
2309                    ..
2310                },
2311            ) => {
2312                append_values_output(
2313                    &mut buf,
2314                    &input,
2315                    expected_output,
2316                    mode,
2317                    types,
2318                    column_names.as_ref(),
2319                    actual_output,
2320                    *multiline,
2321                );
2322            }
2323            (
2324                Record::Query {
2325                    output:
2326                        Ok(QueryOutput {
2327                            mode,
2328                            output: Output::Values(_),
2329                            output_str: expected_output,
2330                            types,
2331                            multiline,
2332                            ..
2333                        }),
2334                    ..
2335                },
2336                Outcome::WrongColumnNames {
2337                    actual_column_names,
2338                    actual_output: Output::Values(actual_output),
2339                    ..
2340                },
2341            ) => {
2342                append_values_output(
2343                    &mut buf,
2344                    &input,
2345                    expected_output,
2346                    mode,
2347                    types,
2348                    Some(actual_column_names),
2349                    actual_output,
2350                    *multiline,
2351                );
2352            }
2353            (
2354                Record::Query {
2355                    output:
2356                        Ok(QueryOutput {
2357                            output: Output::Hashed { .. },
2358                            output_str: expected_output,
2359                            column_names,
2360                            ..
2361                        }),
2362                    ..
2363                },
2364                Outcome::OutputFailure {
2365                    actual_output: Output::Hashed { num_values, md5 },
2366                    ..
2367                },
2368            ) => {
2369                buf.append_header(&input, expected_output, column_names.as_ref());
2370
2371                buf.append(format!("{} values hashing to {}\n", num_values, md5).as_str())
2372            }
2373            (
2374                Record::Simple {
2375                    output_str: expected_output,
2376                    ..
2377                },
2378                Outcome::OutputFailure {
2379                    actual_output: Output::Values(actual_output),
2380                    ..
2381                },
2382            ) => {
2383                buf.append_header(&input, expected_output, None);
2384
2385                for (i, row) in actual_output.iter().enumerate() {
2386                    if i != 0 {
2387                        buf.append("\n");
2388                    }
2389                    buf.append(row);
2390                }
2391            }
2392            (
2393                Record::Query {
2394                    sql,
2395                    output: Err(err),
2396                    ..
2397                },
2398                outcome,
2399            )
2400            | (
2401                Record::Statement {
2402                    expected_error: Some(err),
2403                    sql,
2404                    ..
2405                },
2406                outcome,
2407            ) if outcome.err_msg().is_some() => {
2408                buf.rewrite_expected_error(&input, err, &outcome.err_msg().unwrap(), sql)
2409            }
2410            (_, Outcome::Success) => {}
2411            _ => bail!("unexpected: {:?} {:?}", record, outcome),
2412        }
2413    }
2414
2415    file.set_len(0)?;
2416    file.seek(SeekFrom::Start(0))?;
2417    file.write_all(buf.finish().as_bytes())?;
2418    file.sync_all()?;
2419    Ok(())
2420}
2421
2422/// Provides a means to rewrite the `.slt` file while iterating over it.
2423///
2424/// This struct takes the slt file as its `input`, tracks a cursor into it
2425/// (`input_offset`), and provides a buffer (`output`) to store the rewritten
2426/// results.
2427///
2428/// Functions that modify the file will lazily move `input` into `output` using
2429/// `flush_to`. However, those calls should all be interior to other functions.
2430#[derive(Debug)]
2431struct RewriteBuffer<'a> {
2432    input: &'a str,
2433    input_offset: usize,
2434    output: String,
2435}
2436
2437impl<'a> RewriteBuffer<'a> {
2438    fn new(input: &'a str) -> RewriteBuffer<'a> {
2439        RewriteBuffer {
2440            input,
2441            input_offset: 0,
2442            output: String::new(),
2443        }
2444    }
2445
2446    fn flush_to(&mut self, offset: usize) {
2447        assert!(offset >= self.input_offset);
2448        let chunk = &self.input[self.input_offset..offset];
2449        self.output.push_str(chunk);
2450        self.input_offset = offset;
2451    }
2452
2453    fn skip_to(&mut self, offset: usize) {
2454        assert!(offset >= self.input_offset);
2455        self.input_offset = offset;
2456    }
2457
2458    fn append(&mut self, s: &str) {
2459        self.output.push_str(s);
2460    }
2461
2462    fn append_header(
2463        &mut self,
2464        input: &String,
2465        expected_output: &str,
2466        column_names: Option<&Vec<ColumnName>>,
2467    ) {
2468        // Output everything before this record.
2469        // TODO(benesch): is it possible to rewrite this to avoid `as`?
2470        #[allow(clippy::as_conversions)]
2471        let offset = expected_output.as_ptr() as usize - input.as_ptr() as usize;
2472        self.flush_to(offset);
2473        self.skip_to(offset + expected_output.len());
2474
2475        // Attempt to install the result separator (----), if it does
2476        // not already exist.
2477        if self.peek_last(5) == "\n----" {
2478            self.append("\n");
2479        } else if self.peek_last(6) != "\n----\n" {
2480            self.append("\n----\n");
2481        }
2482
2483        let Some(names) = column_names else {
2484            return;
2485        };
2486        self.append(
2487            &names
2488                .iter()
2489                .map(|name| name.replace(' ', "␠"))
2490                .collect::<Vec<_>>()
2491                .join(" "),
2492        );
2493        self.append("\n");
2494    }
2495
2496    fn rewrite_expected_error(
2497        &mut self,
2498        input: &String,
2499        old_err: &str,
2500        new_err: &str,
2501        query: &str,
2502    ) {
2503        // Output everything before this error message.
2504        // TODO(benesch): is it possible to rewrite this to avoid `as`?
2505        #[allow(clippy::as_conversions)]
2506        let err_offset = old_err.as_ptr() as usize - input.as_ptr() as usize;
2507        self.flush_to(err_offset);
2508        self.append(new_err);
2509        self.append("\n");
2510        self.append(query);
2511        // TODO(benesch): is it possible to rewrite this to avoid `as`?
2512        #[allow(clippy::as_conversions)]
2513        self.skip_to(query.as_ptr() as usize - input.as_ptr() as usize + query.len())
2514    }
2515
2516    fn peek_last(&self, n: usize) -> &str {
2517        &self.output[self.output.len() - n..]
2518    }
2519
2520    fn finish(mut self) -> String {
2521        self.flush_to(self.input.len());
2522        self.output
2523    }
2524}
2525
2526/// Generates view creation, view indexing, view querying, and view
2527/// dropping SQL commands for a given `SELECT` query. If the number
2528/// of attributes produced by the query is known, the view commands
2529/// are specialized to avoid issues with column ambiguity. This
2530/// function is a helper for `--auto_index_selects` and assumes that
2531/// the provided input SQL has already been run through the parser,
2532/// resulting in a valid `SELECT` statement.
2533fn generate_view_sql(
2534    sql: &str,
2535    view_uuid: &Simple,
2536    num_attributes: Option<usize>,
2537    expected_column_names: Option<Vec<ColumnName>>,
2538) -> (String, String, String, String) {
2539    // To create the view, re-parse the sql; note that we must find exactly
2540    // one statement and it must be a `SELECT`.
2541    // NOTE(vmarcos): Direct string manipulation was attempted while
2542    // prototyping the code below, which avoids the extra parsing and
2543    // data structure cloning. However, running DDL is so slow that
2544    // it did not matter in terms of runtime. We can revisit this if
2545    // DDL cost drops dramatically in the future.
2546    let stmts = parser::parse_statements(sql).unwrap_or_default();
2547    assert!(stmts.len() == 1);
2548    let (query, query_as_of) = match &stmts[0].ast {
2549        Statement::Select(stmt) => (&stmt.query, &stmt.as_of),
2550        _ => unreachable!("This function should only be called for SELECTs"),
2551    };
2552
2553    // Prior to creating the view, process the `ORDER BY` clause of
2554    // the `SELECT` query, if any. Ordering is not preserved when a
2555    // view includes an `ORDER BY` clause and must be re-enforced by
2556    // an external `ORDER BY` clause when querying the view.
2557    let (view_order_by, extra_columns, distinct) = if num_attributes.is_none() {
2558        (query.order_by.clone(), vec![], None)
2559    } else {
2560        derive_order_by(&query.body, &query.order_by)
2561    };
2562
2563    // Since one-shot SELECT statements may contain ambiguous column names,
2564    // we either use the expected column names, if that option was
2565    // provided, or else just rename the output schema of the view
2566    // using numerically increasing attribute names, whenever possible.
2567    // This strategy makes it possible to use `CREATE INDEX`, thus
2568    // matching the behavior of the option `auto_index_tables`. However,
2569    // we may be presented with a `SELECT *` query, in which case the parser
2570    // does not produce sufficient information to allow us to compute
2571    // the number of output columns. In the latter case, we are supplied
2572    // with `None` for `num_attributes` and just employ the command
2573    // `CREATE DEFAULT INDEX` instead. Additionally, the view is created
2574    // without schema renaming. This strategy is insufficient to dodge
2575    // column name ambiguity in all cases, but we assume here that we
2576    // can adjust the (hopefully) small number of tests that eventually
2577    // challenge us in this particular way.
2578    let name = UnresolvedItemName(vec![Ident::new_unchecked(format!("v{}", view_uuid))]);
2579    let projection = expected_column_names.map_or_else(
2580        || {
2581            num_attributes.map_or(vec![], |n| {
2582                (1..=n)
2583                    .map(|i| Ident::new_unchecked(format!("a{i}")))
2584                    .collect()
2585            })
2586        },
2587        |cols| {
2588            cols.iter()
2589                .map(|c| Ident::new_unchecked(c.as_str()))
2590                .collect()
2591        },
2592    );
2593    let columns: Vec<Ident> = projection
2594        .iter()
2595        .cloned()
2596        .chain(extra_columns.iter().map(|item| {
2597            if let SelectItem::Expr {
2598                expr: _,
2599                alias: Some(ident),
2600            } = item
2601            {
2602                ident.clone()
2603            } else {
2604                unreachable!("alias must be given for extra column")
2605            }
2606        }))
2607        .collect();
2608
2609    // Build a `CREATE VIEW` with the columns computed above.
2610    let mut query = query.clone();
2611    if extra_columns.len() > 0 {
2612        match &mut query.body {
2613            SetExpr::Select(stmt) => stmt.projection.extend(extra_columns.iter().cloned()),
2614            _ => unimplemented!("cannot yet rewrite projections of nested queries"),
2615        }
2616    }
2617    let create_view = AstStatement::<Raw>::CreateView(CreateViewStatement {
2618        if_exists: IfExistsBehavior::Error,
2619        temporary: false,
2620        definition: ViewDefinition {
2621            name: name.clone(),
2622            columns: columns.clone(),
2623            query,
2624        },
2625    })
2626    .to_ast_string_stable();
2627
2628    // We then create either a `CREATE INDEX` or a `CREATE DEFAULT INDEX`
2629    // statement, depending on whether we could obtain the number of
2630    // attributes from the original `SELECT`.
2631    let create_index = AstStatement::<Raw>::CreateIndex(CreateIndexStatement {
2632        name: None,
2633        in_cluster: None,
2634        on_name: RawItemName::Name(name.clone()),
2635        key_parts: if columns.len() == 0 {
2636            None
2637        } else {
2638            Some(
2639                columns
2640                    .iter()
2641                    .map(|ident| Expr::Identifier(vec![ident.clone()]))
2642                    .collect(),
2643            )
2644        },
2645        with_options: Vec::new(),
2646        if_not_exists: false,
2647    })
2648    .to_ast_string_stable();
2649
2650    // Assert if DISTINCT semantics are unchanged from view
2651    let distinct_unneeded = extra_columns.len() == 0
2652        || match distinct {
2653            None | Some(Distinct::On(_)) => true,
2654            Some(Distinct::EntireRow) => false,
2655        };
2656    let distinct = if distinct_unneeded { None } else { distinct };
2657
2658    // `SELECT [* | {projection}] FROM {name} [ORDER BY {view_order_by}]`
2659    let view_sql = AstStatement::<Raw>::Select(SelectStatement {
2660        query: Query {
2661            ctes: CteBlock::Simple(vec![]),
2662            body: SetExpr::Select(Box::new(Select {
2663                distinct,
2664                projection: if projection.len() == 0 {
2665                    vec![SelectItem::Wildcard]
2666                } else {
2667                    projection
2668                        .iter()
2669                        .map(|ident| SelectItem::Expr {
2670                            expr: Expr::Identifier(vec![ident.clone()]),
2671                            alias: None,
2672                        })
2673                        .collect()
2674                },
2675                from: vec![TableWithJoins {
2676                    relation: TableFactor::Table {
2677                        name: RawItemName::Name(name.clone()),
2678                        alias: None,
2679                    },
2680                    joins: vec![],
2681                }],
2682                selection: None,
2683                group_by: vec![],
2684                having: None,
2685                qualify: None,
2686                options: vec![],
2687            })),
2688            order_by: view_order_by,
2689            limit: None,
2690            offset: None,
2691        },
2692        as_of: query_as_of.clone(),
2693    })
2694    .to_ast_string_stable();
2695
2696    // `DROP VIEW {name}`
2697    let drop_view = AstStatement::<Raw>::DropObjects(DropObjectsStatement {
2698        object_type: ObjectType::View,
2699        if_exists: false,
2700        names: vec![UnresolvedObjectName::Item(name)],
2701        cascade: false,
2702    })
2703    .to_ast_string_stable();
2704
2705    (create_view, create_index, view_sql, drop_view)
2706}
2707
2708/// Analyzes the provided query `body` to derive the number of
2709/// attributes in the query. We only consider syntactic cues,
2710/// so we may end up deriving `None` for the number of attributes
2711/// as a conservative approximation.
2712fn derive_num_attributes(body: &SetExpr<Raw>) -> Option<usize> {
2713    let Some((projection, _)) = find_projection(body) else {
2714        return None;
2715    };
2716    derive_num_attributes_from_projection(projection)
2717}
2718
2719/// Analyzes a query's `ORDER BY` clause to derive an `ORDER BY`
2720/// clause that makes numeric references to any expressions in
2721/// the projection and generated-attribute references to expressions
2722/// that need to be added as extra columns to the projection list.
2723/// The rewritten `ORDER BY` clause is then usable when querying a
2724/// view that contains the same `SELECT` as the given query.
2725/// This function returns both the rewritten `ORDER BY` clause
2726/// as well as a list of extra columns that need to be added
2727/// to the query's projection for the `ORDER BY` clause to
2728/// succeed.
2729fn derive_order_by(
2730    body: &SetExpr<Raw>,
2731    order_by: &Vec<OrderByExpr<Raw>>,
2732) -> (
2733    Vec<OrderByExpr<Raw>>,
2734    Vec<SelectItem<Raw>>,
2735    Option<Distinct<Raw>>,
2736) {
2737    let Some((projection, distinct)) = find_projection(body) else {
2738        return (vec![], vec![], None);
2739    };
2740    let (view_order_by, extra_columns) = derive_order_by_from_projection(projection, order_by);
2741    (view_order_by, extra_columns, distinct.clone())
2742}
2743
2744/// Finds the projection list in a `SELECT` query body.
2745fn find_projection(body: &SetExpr<Raw>) -> Option<(&Vec<SelectItem<Raw>>, &Option<Distinct<Raw>>)> {
2746    // Iterate to peel off the query body until the query's
2747    // projection list is found.
2748    let mut set_expr = body;
2749    loop {
2750        match set_expr {
2751            SetExpr::Select(select) => {
2752                return Some((&select.projection, &select.distinct));
2753            }
2754            SetExpr::SetOperation { left, .. } => set_expr = left.as_ref(),
2755            SetExpr::Query(query) => set_expr = &query.body,
2756            _ => return None,
2757        }
2758    }
2759}
2760
2761/// Computes the number of attributes that are obtained by the
2762/// projection of a `SELECT` query. The projection may include
2763/// wildcards, in which case the analysis just returns `None`.
2764fn derive_num_attributes_from_projection(projection: &Vec<SelectItem<Raw>>) -> Option<usize> {
2765    let mut num_attributes = 0usize;
2766    for item in projection.iter() {
2767        let SelectItem::Expr { expr, .. } = item else {
2768            return None;
2769        };
2770        match expr {
2771            Expr::QualifiedWildcard(..) | Expr::WildcardAccess(..) => {
2772                return None;
2773            }
2774            _ => {
2775                num_attributes += 1;
2776            }
2777        }
2778    }
2779    Some(num_attributes)
2780}
2781
2782/// Computes an `ORDER BY` clause with only numeric references
2783/// from given projection and `ORDER BY` of a `SELECT` query.
2784/// If the derivation fails to match a given expression, the
2785/// matched prefix is returned. Note that this could be empty.
2786fn derive_order_by_from_projection(
2787    projection: &Vec<SelectItem<Raw>>,
2788    order_by: &Vec<OrderByExpr<Raw>>,
2789) -> (Vec<OrderByExpr<Raw>>, Vec<SelectItem<Raw>>) {
2790    let mut view_order_by: Vec<OrderByExpr<Raw>> = vec![];
2791    let mut extra_columns: Vec<SelectItem<Raw>> = vec![];
2792    for order_by_expr in order_by.iter() {
2793        let query_expr = &order_by_expr.expr;
2794        let view_expr = match query_expr {
2795            Expr::Value(mz_sql_parser::ast::Value::Number(_)) => query_expr.clone(),
2796            _ => {
2797                // Find expression in query projection, if we can.
2798                if let Some(i) = projection.iter().position(|item| match item {
2799                    SelectItem::Expr { expr, alias } => {
2800                        expr == query_expr
2801                            || match query_expr {
2802                                Expr::Identifier(ident) => {
2803                                    ident.len() == 1 && Some(&ident[0]) == alias.as_ref()
2804                                }
2805                                _ => false,
2806                            }
2807                    }
2808                    SelectItem::Wildcard => false,
2809                }) {
2810                    Expr::Value(mz_sql_parser::ast::Value::Number((i + 1).to_string()))
2811                } else {
2812                    // If the expression is not found in the
2813                    // projection, add extra column.
2814                    let ident = Ident::new_unchecked(format!(
2815                        "a{}",
2816                        (projection.len() + extra_columns.len() + 1)
2817                    ));
2818                    extra_columns.push(SelectItem::Expr {
2819                        expr: query_expr.clone(),
2820                        alias: Some(ident.clone()),
2821                    });
2822                    Expr::Identifier(vec![ident])
2823                }
2824            }
2825        };
2826        view_order_by.push(OrderByExpr {
2827            expr: view_expr,
2828            asc: order_by_expr.asc,
2829            nulls_last: order_by_expr.nulls_last,
2830        });
2831    }
2832    (view_order_by, extra_columns)
2833}
2834
2835/// Returns extra statements to execute after `stmt` is executed.
2836fn mutate(sql: &str) -> Vec<String> {
2837    let stmts = parser::parse_statements(sql).unwrap_or_default();
2838    let mut additional = Vec::new();
2839    for stmt in stmts {
2840        match stmt.ast {
2841            AstStatement::CreateTable(stmt) => additional.push(
2842                // CREATE TABLE -> CREATE INDEX. Specify all columns manually in case CREATE
2843                // DEFAULT INDEX ever goes away.
2844                AstStatement::<Raw>::CreateIndex(CreateIndexStatement {
2845                    name: None,
2846                    in_cluster: None,
2847                    on_name: RawItemName::Name(stmt.name.clone()),
2848                    key_parts: Some(
2849                        stmt.columns
2850                            .iter()
2851                            .map(|def| Expr::Identifier(vec![def.name.clone()]))
2852                            .collect(),
2853                    ),
2854                    with_options: Vec::new(),
2855                    if_not_exists: false,
2856                })
2857                .to_ast_string_stable(),
2858            ),
2859            _ => {}
2860        }
2861    }
2862    additional
2863}
2864
2865#[mz_ore::test]
2866#[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `rust_psm_stack_pointer` on OS `linux`
2867fn test_generate_view_sql() {
2868    let uuid = Uuid::parse_str("67e5504410b1426f9247bb680e5fe0c8").unwrap();
2869    let cases = vec![
2870        (("SELECT * FROM t", None, None),
2871        (
2872            r#"CREATE VIEW "v67e5504410b1426f9247bb680e5fe0c8" AS SELECT * FROM "t""#.to_string(),
2873            r#"CREATE DEFAULT INDEX ON "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
2874            r#"SELECT * FROM "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
2875            r#"DROP VIEW "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
2876        )),
2877        (("SELECT a, b, c FROM t1, t2", Some(3), Some(vec![ColumnName::from("a"), ColumnName::from("b"), ColumnName::from("c")])),
2878        (
2879            r#"CREATE VIEW "v67e5504410b1426f9247bb680e5fe0c8" ("a", "b", "c") AS SELECT "a", "b", "c" FROM "t1", "t2""#.to_string(),
2880            r#"CREATE INDEX ON "v67e5504410b1426f9247bb680e5fe0c8" ("a", "b", "c")"#.to_string(),
2881            r#"SELECT "a", "b", "c" FROM "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
2882            r#"DROP VIEW "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
2883        )),
2884        (("SELECT a, b, c FROM t1, t2", Some(3), None),
2885        (
2886            r#"CREATE VIEW "v67e5504410b1426f9247bb680e5fe0c8" ("a1", "a2", "a3") AS SELECT "a", "b", "c" FROM "t1", "t2""#.to_string(),
2887            r#"CREATE INDEX ON "v67e5504410b1426f9247bb680e5fe0c8" ("a1", "a2", "a3")"#.to_string(),
2888            r#"SELECT "a1", "a2", "a3" FROM "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
2889            r#"DROP VIEW "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
2890        )),
2891        // A case with ambiguity that is accepted by the function, illustrating that
2892        // our measures to dodge this issue are imperfect.
2893        (("SELECT * FROM (SELECT a, sum(b) AS a FROM t GROUP BY a)", None, None),
2894        (
2895            r#"CREATE VIEW "v67e5504410b1426f9247bb680e5fe0c8" AS SELECT * FROM (SELECT "a", "sum"("b") AS "a" FROM "t" GROUP BY "a")"#.to_string(),
2896            r#"CREATE DEFAULT INDEX ON "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
2897            r#"SELECT * FROM "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
2898            r#"DROP VIEW "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
2899        )),
2900        (("SELECT a, b, b + d AS c, a + b AS d FROM t1, t2 ORDER BY a, c, a + b", Some(4), Some(vec![ColumnName::from("a"), ColumnName::from("b"), ColumnName::from("c"), ColumnName::from("d")])),
2901        (
2902            r#"CREATE VIEW "v67e5504410b1426f9247bb680e5fe0c8" ("a", "b", "c", "d") AS SELECT "a", "b", "b" + "d" AS "c", "a" + "b" AS "d" FROM "t1", "t2" ORDER BY "a", "c", "a" + "b""#.to_string(),
2903            r#"CREATE INDEX ON "v67e5504410b1426f9247bb680e5fe0c8" ("a", "b", "c", "d")"#.to_string(),
2904            r#"SELECT "a", "b", "c", "d" FROM "v67e5504410b1426f9247bb680e5fe0c8" ORDER BY 1, 3, 4"#.to_string(),
2905            r#"DROP VIEW "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
2906        )),
2907        (("((SELECT 1 AS a UNION SELECT 2 AS b) UNION SELECT 3 AS c) ORDER BY a", Some(1), None),
2908        (
2909            r#"CREATE VIEW "v67e5504410b1426f9247bb680e5fe0c8" ("a1") AS (SELECT 1 AS "a" UNION SELECT 2 AS "b") UNION SELECT 3 AS "c" ORDER BY "a""#.to_string(),
2910            r#"CREATE INDEX ON "v67e5504410b1426f9247bb680e5fe0c8" ("a1")"#.to_string(),
2911            r#"SELECT "a1" FROM "v67e5504410b1426f9247bb680e5fe0c8" ORDER BY 1"#.to_string(),
2912            r#"DROP VIEW "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
2913        )),
2914        (("SELECT * FROM (SELECT a, sum(b) AS a FROM t GROUP BY a) ORDER BY 1", None, None),
2915        (
2916            r#"CREATE VIEW "v67e5504410b1426f9247bb680e5fe0c8" AS SELECT * FROM (SELECT "a", "sum"("b") AS "a" FROM "t" GROUP BY "a") ORDER BY 1"#.to_string(),
2917            r#"CREATE DEFAULT INDEX ON "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
2918            r#"SELECT * FROM "v67e5504410b1426f9247bb680e5fe0c8" ORDER BY 1"#.to_string(),
2919            r#"DROP VIEW "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
2920        )),
2921        (("SELECT * FROM (SELECT a, sum(b) AS a FROM t GROUP BY a) ORDER BY a", None, None),
2922        (
2923            r#"CREATE VIEW "v67e5504410b1426f9247bb680e5fe0c8" AS SELECT * FROM (SELECT "a", "sum"("b") AS "a" FROM "t" GROUP BY "a") ORDER BY "a""#.to_string(),
2924            r#"CREATE DEFAULT INDEX ON "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
2925            r#"SELECT * FROM "v67e5504410b1426f9247bb680e5fe0c8" ORDER BY "a""#.to_string(),
2926            r#"DROP VIEW "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
2927        )),
2928        (("SELECT a, sum(b) AS a FROM t GROUP BY a, c ORDER BY a, c", Some(2), None),
2929        (
2930            r#"CREATE VIEW "v67e5504410b1426f9247bb680e5fe0c8" ("a1", "a2", "a3") AS SELECT "a", "sum"("b") AS "a", "c" AS "a3" FROM "t" GROUP BY "a", "c" ORDER BY "a", "c""#.to_string(),
2931            r#"CREATE INDEX ON "v67e5504410b1426f9247bb680e5fe0c8" ("a1", "a2", "a3")"#.to_string(),
2932            r#"SELECT "a1", "a2" FROM "v67e5504410b1426f9247bb680e5fe0c8" ORDER BY 1, "a3""#.to_string(),
2933            r#"DROP VIEW "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
2934        )),
2935        (("SELECT a, sum(b) AS a FROM t GROUP BY a, c ORDER BY c, a", Some(2), None),
2936        (
2937            r#"CREATE VIEW "v67e5504410b1426f9247bb680e5fe0c8" ("a1", "a2", "a3") AS SELECT "a", "sum"("b") AS "a", "c" AS "a3" FROM "t" GROUP BY "a", "c" ORDER BY "c", "a""#.to_string(),
2938            r#"CREATE INDEX ON "v67e5504410b1426f9247bb680e5fe0c8" ("a1", "a2", "a3")"#.to_string(),
2939            r#"SELECT "a1", "a2" FROM "v67e5504410b1426f9247bb680e5fe0c8" ORDER BY "a3", 1"#.to_string(),
2940            r#"DROP VIEW "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
2941        )),
2942    ];
2943    for ((sql, num_attributes, expected_column_names), expected) in cases {
2944        let view_sql =
2945            generate_view_sql(sql, uuid.as_simple(), num_attributes, expected_column_names);
2946        assert_eq!(expected, view_sql);
2947    }
2948}
2949
2950#[mz_ore::test]
2951fn test_mutate() {
2952    let cases = vec![
2953        ("CREATE TABLE t ()", vec![r#"CREATE INDEX ON "t" ()"#]),
2954        (
2955            "CREATE TABLE t (a INT)",
2956            vec![r#"CREATE INDEX ON "t" ("a")"#],
2957        ),
2958        (
2959            "CREATE TABLE t (a INT, b TEXT)",
2960            vec![r#"CREATE INDEX ON "t" ("a", "b")"#],
2961        ),
2962        // Invalid syntax works, just returns nothing.
2963        ("BAD SYNTAX", Vec::new()),
2964    ];
2965    for (sql, expected) in cases {
2966        let stmts = mutate(sql);
2967        assert_eq!(expected, stmts, "sql: {sql}");
2968    }
2969}