Skip to main content

mz_sqllogictest/
runner.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! The Materialize-specific runner for sqllogictest.
11//!
12//! slt tests expect a serialized execution of sql statements and queries.
13//! To get the same results in materialize we track current_timestamp and increment it whenever we execute a statement.
14//!
15//! The high-level workflow is:
16//!   for each record in the test file:
17//!     if record is a sql statement:
18//!       run sql in postgres, observe changes and copy them to materialize using LocalInput::Updates(..)
19//!       advance current_timestamp
20//!       promise to never send updates for times < current_timestamp using LocalInput::Watermark(..)
21//!       compare to expected results
22//!       if wrong, bail out and stop processing this file
23//!     if record is a sql query:
24//!       peek query at current_timestamp
25//!       compare to expected results
26//!       if wrong, record the error
27
28use std::collections::BTreeMap;
29use std::error::Error;
30use std::fs::{File, OpenOptions};
31use std::io::{Read, Seek, SeekFrom, Write};
32use std::net::{IpAddr, Ipv4Addr, SocketAddr};
33use std::path::Path;
34use std::sync::Arc;
35use std::sync::LazyLock;
36use std::time::Duration;
37use std::{env, fmt, ops, str, thread};
38
39use anyhow::{anyhow, bail};
40use bytes::BytesMut;
41use chrono::{DateTime, NaiveDateTime, NaiveTime, Utc};
42use fallible_iterator::FallibleIterator;
43use futures::sink::SinkExt;
44use itertools::Itertools;
45use maplit::btreemap;
46use md5::{Digest, Md5};
47use mz_adapter_types::bootstrap_builtin_cluster_config::{
48    ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR, BootstrapBuiltinClusterConfig,
49    CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR, PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR,
50    SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR, SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR,
51};
52use mz_catalog::config::ClusterReplicaSizeMap;
53use mz_controller::{ControllerConfig, ReplicaHttpLocator};
54use mz_environmentd::CatalogConfig;
55use mz_license_keys::ValidatedLicenseKey;
56use mz_orchestrator_process::{ProcessOrchestrator, ProcessOrchestratorConfig};
57use mz_orchestrator_tracing::{TracingCliArgs, TracingOrchestrator};
58use mz_ore::cast::{CastFrom, ReinterpretCast};
59use mz_ore::channel::trigger;
60use mz_ore::error::ErrorExt;
61use mz_ore::metrics::MetricsRegistry;
62use mz_ore::now::SYSTEM_TIME;
63use mz_ore::retry::Retry;
64use mz_ore::sql;
65use mz_ore::sql::Sql;
66use mz_ore::task;
67use mz_ore::thread::{JoinHandleExt, JoinOnDropHandle};
68use mz_ore::tracing::TracingHandle;
69use mz_ore::url::SensitiveUrl;
70use mz_persist_client::PersistLocation;
71use mz_persist_client::cache::PersistClientCache;
72use mz_persist_client::cfg::PersistConfig;
73use mz_persist_client::rpc::{
74    MetricsSameProcessPubSubSender, PersistGrpcPubSubServer, PubSubClientConnection, PubSubSender,
75};
76use mz_pgrepr::{Interval, Jsonb, Numeric, UInt2, UInt4, UInt8, Value, oid};
77use mz_repr::ColumnName;
78use mz_repr::adt::date::Date;
79use mz_repr::adt::mz_acl_item::{AclItem, MzAclItem};
80use mz_repr::adt::numeric;
81use mz_secrets::SecretsController;
82use mz_server_core::listeners::{
83    AllowedRoles, AuthenticatorKind, BaseListenerConfig, HttpListenerConfig, HttpRoutesEnabled,
84    ListenersConfig, SqlListenerConfig,
85};
86use mz_sql::ast::{Expr, Raw, Statement};
87use mz_sql::catalog::EnvironmentId;
88use mz_sql_parser::ast::display::AstDisplay;
89use mz_sql_parser::ast::{
90    CreateIndexStatement, CreateViewStatement, CteBlock, Distinct, DropObjectsStatement, Ident,
91    IfExistsBehavior, ObjectType, OrderByExpr, Query, RawItemName, Select, SelectItem,
92    SelectStatement, SetExpr, Statement as AstStatement, TableFactor, TableWithJoins,
93    UnresolvedItemName, UnresolvedObjectName, ViewDefinition,
94};
95use mz_sql_parser::parser;
96use mz_storage_types::connections::ConnectionContext;
97use postgres_protocol::types;
98use regex::Regex;
99use tempfile::TempDir;
100use tokio::net::TcpListener;
101use tokio::runtime::Runtime;
102use tokio::sync::oneshot;
103use tokio_postgres::types::{FromSql, Kind as PgKind, Type as PgType};
104use tokio_postgres::{NoTls, Row, SimpleQueryMessage};
105use tokio_stream::wrappers::TcpListenerStream;
106use tower_http::cors::AllowOrigin;
107use tracing::{error, info};
108use uuid::Uuid;
109use uuid::fmt::Simple;
110
111use crate::ast::{Location, Mode, Output, QueryOutput, Record, Sort, Type};
112use crate::util;
113
114#[derive(Debug)]
115pub enum Outcome<'a> {
116    Unsupported {
117        error: anyhow::Error,
118        location: Location,
119    },
120    ParseFailure {
121        error: anyhow::Error,
122        location: Location,
123    },
124    PlanFailure {
125        error: anyhow::Error,
126        expected_error: Option<String>,
127        location: Location,
128    },
129    UnexpectedPlanSuccess {
130        expected_error: &'a str,
131        location: Location,
132    },
133    WrongNumberOfRowsInserted {
134        expected_count: u64,
135        actual_count: u64,
136        location: Location,
137    },
138    WrongColumnCount {
139        expected_count: usize,
140        actual_count: usize,
141        location: Location,
142    },
143    WrongColumnNames {
144        expected_column_names: &'a Vec<ColumnName>,
145        actual_column_names: Vec<ColumnName>,
146        actual_output: Output,
147        location: Location,
148    },
149    OutputFailure {
150        expected_output: &'a Output,
151        actual_raw_output: Vec<Row>,
152        actual_output: Output,
153        location: Location,
154    },
155    InconsistentViewOutcome {
156        query_outcome: Box<Outcome<'a>>,
157        view_outcome: Box<Outcome<'a>>,
158        location: Location,
159    },
160    Bail {
161        cause: Box<Outcome<'a>>,
162        location: Location,
163    },
164    Warning {
165        cause: Box<Outcome<'a>>,
166        location: Location,
167    },
168    Success,
169}
170
/// Number of distinct [`Outcome`] variants, i.e. the length of the
/// per-kind counter array in [`Outcomes`].
const NUM_OUTCOMES: usize = 12;
/// Counter index of `Outcome::Warning`: the slot just before success.
const WARNING_OUTCOME: usize = SUCCESS_OUTCOME - 1;
/// Counter index of `Outcome::Success`: the last slot.
const SUCCESS_OUTCOME: usize = NUM_OUTCOMES - 1;
174
175impl<'a> Outcome<'a> {
176    fn code(&self) -> usize {
177        match self {
178            Outcome::Unsupported { .. } => 0,
179            Outcome::ParseFailure { .. } => 1,
180            Outcome::PlanFailure { .. } => 2,
181            Outcome::UnexpectedPlanSuccess { .. } => 3,
182            Outcome::WrongNumberOfRowsInserted { .. } => 4,
183            Outcome::WrongColumnCount { .. } => 5,
184            Outcome::WrongColumnNames { .. } => 6,
185            Outcome::OutputFailure { .. } => 7,
186            Outcome::InconsistentViewOutcome { .. } => 8,
187            Outcome::Bail { .. } => 9,
188            Outcome::Warning { .. } => 10,
189            Outcome::Success => 11,
190        }
191    }
192
193    fn success(&self) -> bool {
194        matches!(self, Outcome::Success)
195    }
196
197    fn failure(&self) -> bool {
198        !matches!(self, Outcome::Success) && !matches!(self, Outcome::Warning { .. })
199    }
200
201    /// Returns an error message that will match self. Appropriate for
202    /// rewriting error messages (i.e. not inserting error messages where we
203    /// currently expect success).
204    fn err_msg(&self) -> Option<String> {
205        match self {
206            Outcome::Unsupported { error, .. }
207            | Outcome::ParseFailure { error, .. }
208            | Outcome::PlanFailure { error, .. } => {
209                // Take only the first line, which should be sufficient for
210                // meaningfully matching the error.
211                let err_str = error.to_string_with_causes();
212                let err_str = err_str.split('\n').next().unwrap();
213                // Strip the "db error: ERROR: " prefix added by the postgres
214                // client library, as it's noisy and not useful for matching.
215                let err_str = err_str.strip_prefix("db error: ERROR: ").unwrap_or(err_str);
216                // This value gets fed back into regex to check that it matches
217                // `self`, so escape its meta characters. We need to undo the
218                // escaping of #. `regex::escape` escapes this because it
219                // expects that we use the `x` flag when building a regex, but
220                // this is not the case, so \# would end up being an invalid
221                // escape sequence, which would choke the parsing of the slt
222                // file the next time around.
223                Some(regex::escape(err_str).replace(r"\#", "#"))
224            }
225            _ => None,
226        }
227    }
228}
229
230impl fmt::Display for Outcome<'_> {
231    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
232        use Outcome::*;
233        const INDENT: &str = "\n        ";
234        match self {
235            Unsupported { error, location } => write!(
236                f,
237                "Unsupported:{}:\n{}",
238                location,
239                error.display_with_causes()
240            ),
241            ParseFailure { error, location } => {
242                write!(
243                    f,
244                    "ParseFailure:{}:\n{}",
245                    location,
246                    error.display_with_causes()
247                )
248            }
249            PlanFailure {
250                error,
251                expected_error,
252                location,
253            } => {
254                if let Some(expected_error) = expected_error {
255                    write!(
256                        f,
257                        "PlanFailure:{}:\nerror does not match expected pattern:\n  expected: /{}/\n  actual:    {}",
258                        location,
259                        expected_error,
260                        error.display_with_causes()
261                    )
262                } else {
263                    write!(f, "PlanFailure:{}:\n{:#}", location, error)
264                }
265            }
266            UnexpectedPlanSuccess {
267                expected_error,
268                location,
269            } => write!(
270                f,
271                "UnexpectedPlanSuccess:{} expected error: {}",
272                location, expected_error
273            ),
274            WrongNumberOfRowsInserted {
275                expected_count,
276                actual_count,
277                location,
278            } => write!(
279                f,
280                "WrongNumberOfRowsInserted:{}{}expected: {}{}actually: {}",
281                location, INDENT, expected_count, INDENT, actual_count
282            ),
283            WrongColumnCount {
284                expected_count,
285                actual_count,
286                location,
287            } => write!(
288                f,
289                "WrongColumnCount:{}{}expected: {}{}actually: {}",
290                location, INDENT, expected_count, INDENT, actual_count
291            ),
292            WrongColumnNames {
293                expected_column_names,
294                actual_column_names,
295                actual_output: _,
296                location,
297            } => write!(
298                f,
299                "Wrong Column Names:{}:{}expected column names: {}{}inferred column names: {}",
300                location,
301                INDENT,
302                expected_column_names
303                    .iter()
304                    .map(|n| n.to_string())
305                    .collect::<Vec<_>>()
306                    .join(" "),
307                INDENT,
308                actual_column_names
309                    .iter()
310                    .map(|n| n.to_string())
311                    .collect::<Vec<_>>()
312                    .join(" ")
313            ),
314            OutputFailure {
315                expected_output,
316                actual_raw_output,
317                actual_output,
318                location,
319            } => write!(
320                f,
321                "OutputFailure:{}{}expected: {:?}{}actually: {:?}{}actual raw: {:?}",
322                location, INDENT, expected_output, INDENT, actual_output, INDENT, actual_raw_output
323            ),
324            InconsistentViewOutcome {
325                query_outcome,
326                view_outcome,
327                location,
328            } => write!(
329                f,
330                "InconsistentViewOutcome:{}{}expected from query: {}{}actually from indexed view: {}",
331                location, INDENT, query_outcome, INDENT, view_outcome
332            ),
333            Bail { cause, location } => write!(f, "Bail:{} {}", location, cause),
334            Warning { cause, location } => write!(f, "Warning:{} {}", location, cause),
335            Success => f.write_str("Success"),
336        }
337    }
338}
339
340#[derive(Default, Debug)]
341pub struct Outcomes {
342    stats: [usize; NUM_OUTCOMES],
343    details: Vec<String>,
344}
345
346impl ops::AddAssign<Outcomes> for Outcomes {
347    fn add_assign(&mut self, rhs: Outcomes) {
348        for (lhs, rhs) in self.stats.iter_mut().zip_eq(rhs.stats.iter()) {
349            *lhs += rhs
350        }
351    }
352}
353impl Outcomes {
354    pub fn any_failed(&self) -> bool {
355        self.stats[SUCCESS_OUTCOME] + self.stats[WARNING_OUTCOME] < self.stats.iter().sum::<usize>()
356    }
357
358    pub fn as_json(&self) -> serde_json::Value {
359        serde_json::json!({
360            "unsupported": self.stats[0],
361            "parse_failure": self.stats[1],
362            "plan_failure": self.stats[2],
363            "unexpected_plan_success": self.stats[3],
364            "wrong_number_of_rows_affected": self.stats[4],
365            "wrong_column_count": self.stats[5],
366            "wrong_column_names": self.stats[6],
367            "output_failure": self.stats[7],
368            "inconsistent_view_outcome": self.stats[8],
369            "bail": self.stats[9],
370            "warning": self.stats[10],
371            "success": self.stats[11],
372        })
373    }
374
375    pub fn display(&self, no_fail: bool, failure_details: bool) -> OutcomesDisplay<'_> {
376        OutcomesDisplay {
377            inner: self,
378            no_fail,
379            failure_details,
380        }
381    }
382}
383
384pub struct OutcomesDisplay<'a> {
385    inner: &'a Outcomes,
386    no_fail: bool,
387    failure_details: bool,
388}
389
390impl<'a> fmt::Display for OutcomesDisplay<'a> {
391    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
392        let total: usize = self.inner.stats.iter().sum();
393        if self.failure_details
394            && (self.inner.stats[SUCCESS_OUTCOME] + self.inner.stats[WARNING_OUTCOME] != total
395                || self.no_fail)
396        {
397            for outcome in &self.inner.details {
398                writeln!(f, "{}", outcome)?;
399            }
400            Ok(())
401        } else {
402            write!(
403                f,
404                "{}:",
405                if self.inner.stats[SUCCESS_OUTCOME] + self.inner.stats[WARNING_OUTCOME] == total {
406                    "PASS"
407                } else if self.no_fail {
408                    "FAIL-IGNORE"
409                } else {
410                    "FAIL"
411                }
412            )?;
413            static NAMES: LazyLock<Vec<&'static str>> = LazyLock::new(|| {
414                vec![
415                    "unsupported",
416                    "parse-failure",
417                    "plan-failure",
418                    "unexpected-plan-success",
419                    "wrong-number-of-rows-inserted",
420                    "wrong-column-count",
421                    "wrong-column-names",
422                    "output-failure",
423                    "inconsistent-view-outcome",
424                    "bail",
425                    "warning",
426                    "success",
427                    "total",
428                ]
429            });
430            for (i, n) in self.inner.stats.iter().enumerate() {
431                if *n > 0 {
432                    write!(f, " {}={}", NAMES[i], n)?;
433                }
434            }
435            write!(f, " total={}", total)
436        }
437    }
438}
439
/// Facts about a prepared query that influence how it is executed and checked.
struct QueryInfo {
    /// Whether the statement is a `SELECT`.
    is_select: bool,
    /// Number of output attributes, when known (presumably the result column
    /// count; confirm against the producer of this value).
    num_attributes: Option<usize>,
    /// Set when the `SELECT` has an `AS OF` clause. Such queries are skipped
    /// by the `--auto-index-selects` consistency check: re-running them
    /// against a freshly created indexed view is racy, because the view's
    /// `since` advances with time and may overtake the historical `AS OF`.
    /// That produces a spurious "could not find a valid timestamp" failure.
    has_as_of: bool,
}
450
451enum PrepareQueryOutcome<'a> {
452    QueryPrepared(QueryInfo),
453    Outcome(Outcome<'a>),
454}
455
456pub struct Runner<'a> {
457    config: &'a RunConfig<'a>,
458    inner: Option<RunnerInner<'a>>,
459    /// Active `replace` substitutions, applied to actual query output before it
460    /// is compared/rewritten. Held on the outer `Runner` so they survive a
461    /// `reset-server`. See `Record::Replace`.
462    replacements: Vec<(Regex, String)>,
463}
464
465pub struct RunnerInner<'a> {
466    server_addr: SocketAddr,
467    internal_server_addr: SocketAddr,
468    password_server_addr: SocketAddr,
469    internal_http_server_addr: SocketAddr,
470    // Drop order matters for these fields.
471    client: tokio_postgres::Client,
472    system_client: tokio_postgres::Client,
473    clients: BTreeMap<String, tokio_postgres::Client>,
474    auto_index_tables: bool,
475    auto_index_selects: bool,
476    auto_transactions: bool,
477    enable_table_keys: bool,
478    verbose: bool,
479    stdout: &'a dyn WriteFmt,
480    _shutdown_trigger: trigger::Trigger,
481    _server_thread: JoinOnDropHandle<()>,
482    _temp_dir: TempDir,
483}
484
485#[derive(Debug)]
486pub struct Slt(Value);
487
488impl<'a> FromSql<'a> for Slt {
489    fn from_sql(
490        ty: &PgType,
491        mut raw: &'a [u8],
492    ) -> Result<Self, Box<dyn Error + 'static + Send + Sync>> {
493        Ok(match *ty {
494            PgType::ACLITEM => Self(Value::AclItem(AclItem::decode_binary(
495                types::bytea_from_sql(raw),
496            )?)),
497            PgType::BOOL => Self(Value::Bool(types::bool_from_sql(raw)?)),
498            PgType::BYTEA => Self(Value::Bytea(types::bytea_from_sql(raw).to_vec())),
499            PgType::CHAR => Self(Value::Char(u8::from_be_bytes(
500                types::char_from_sql(raw)?.to_be_bytes(),
501            ))),
502            PgType::FLOAT4 => Self(Value::Float4(types::float4_from_sql(raw)?)),
503            PgType::FLOAT8 => Self(Value::Float8(types::float8_from_sql(raw)?)),
504            PgType::DATE => Self(Value::Date(Date::from_pg_epoch(types::int4_from_sql(
505                raw,
506            )?)?)),
507            PgType::INT2 => Self(Value::Int2(types::int2_from_sql(raw)?)),
508            PgType::INT4 => Self(Value::Int4(types::int4_from_sql(raw)?)),
509            PgType::INT8 => Self(Value::Int8(types::int8_from_sql(raw)?)),
510            PgType::INTERVAL => Self(Value::Interval(Interval::from_sql(ty, raw)?)),
511            PgType::JSONB => Self(Value::Jsonb(Jsonb::from_sql(ty, raw)?)),
512            PgType::NAME => Self(Value::Name(types::text_from_sql(raw)?.to_string())),
513            PgType::NUMERIC => Self(Value::Numeric(Numeric::from_sql(ty, raw)?)),
514            PgType::OID => Self(Value::Oid(types::oid_from_sql(raw)?)),
515            PgType::REGCLASS => Self(Value::Oid(types::oid_from_sql(raw)?)),
516            PgType::REGPROC => Self(Value::Oid(types::oid_from_sql(raw)?)),
517            PgType::REGTYPE => Self(Value::Oid(types::oid_from_sql(raw)?)),
518            PgType::TEXT | PgType::BPCHAR | PgType::VARCHAR => {
519                Self(Value::Text(types::text_from_sql(raw)?.to_string()))
520            }
521            PgType::TIME => Self(Value::Time(NaiveTime::from_sql(ty, raw)?)),
522            PgType::TIMESTAMP => Self(Value::Timestamp(
523                NaiveDateTime::from_sql(ty, raw)?.try_into()?,
524            )),
525            PgType::TIMESTAMPTZ => Self(Value::TimestampTz(
526                DateTime::<Utc>::from_sql(ty, raw)?.try_into()?,
527            )),
528            PgType::UUID => Self(Value::Uuid(Uuid::from_sql(ty, raw)?)),
529            PgType::RECORD => {
530                let num_fields = read_be_i32(&mut raw)?;
531                let mut tuple = vec![];
532                for _ in 0..num_fields {
533                    let oid = u32::reinterpret_cast(read_be_i32(&mut raw)?);
534                    let typ = match PgType::from_oid(oid) {
535                        Some(typ) => typ,
536                        None => return Err("unknown oid".into()),
537                    };
538                    let v = read_value::<Option<Slt>>(&typ, &mut raw)?;
539                    tuple.push(v.map(|v| v.0));
540                }
541                Self(Value::Record(tuple))
542            }
543            PgType::INT4_RANGE
544            | PgType::INT8_RANGE
545            | PgType::DATE_RANGE
546            | PgType::NUM_RANGE
547            | PgType::TS_RANGE
548            | PgType::TSTZ_RANGE => {
549                use mz_repr::adt::range::Range;
550                let range: Range<Slt> = Range::from_sql(ty, raw)?;
551                Self(Value::Range(range.into_bounds(|b| Box::new(b.0))))
552            }
553
554            _ => match ty.kind() {
555                PgKind::Array(arr_type) => {
556                    let arr = types::array_from_sql(raw)?;
557                    let elements: Vec<Option<Value>> = arr
558                        .values()
559                        .map(|v| match v {
560                            Some(v) => Ok(Some(Slt::from_sql(arr_type, v)?)),
561                            None => Ok(None),
562                        })
563                        .collect::<Vec<Option<Slt>>>()?
564                        .into_iter()
565                        // Map a Vec<Option<Slt>> to Vec<Option<Value>>.
566                        .map(|v| v.map(|v| v.0))
567                        .collect();
568
569                    Self(Value::Array {
570                        dims: arr
571                            .dimensions()
572                            .map(|d| {
573                                Ok(mz_repr::adt::array::ArrayDimension {
574                                    lower_bound: isize::cast_from(d.lower_bound),
575                                    length: usize::try_from(d.len)
576                                        .expect("cannot have negative length"),
577                                })
578                            })
579                            .collect()?,
580                        elements,
581                    })
582                }
583                _ => match ty.oid() {
584                    oid::TYPE_UINT2_OID => Self(Value::UInt2(UInt2::from_sql(ty, raw)?)),
585                    oid::TYPE_UINT4_OID => Self(Value::UInt4(UInt4::from_sql(ty, raw)?)),
586                    oid::TYPE_UINT8_OID => Self(Value::UInt8(UInt8::from_sql(ty, raw)?)),
587                    oid::TYPE_MZ_TIMESTAMP_OID => {
588                        let s = types::text_from_sql(raw)?;
589                        let t: mz_repr::Timestamp = s.parse()?;
590                        Self(Value::MzTimestamp(t))
591                    }
592                    oid::TYPE_MZ_ACL_ITEM_OID => Self(Value::MzAclItem(MzAclItem::decode_binary(
593                        types::bytea_from_sql(raw),
594                    )?)),
595                    _ => unreachable!(),
596                },
597            },
598        })
599    }
600    fn accepts(ty: &PgType) -> bool {
601        match ty.kind() {
602            PgKind::Array(_) | PgKind::Composite(_) => return true,
603            _ => {}
604        }
605        match ty.oid() {
606            oid::TYPE_UINT2_OID
607            | oid::TYPE_UINT4_OID
608            | oid::TYPE_UINT8_OID
609            | oid::TYPE_MZ_TIMESTAMP_OID
610            | oid::TYPE_MZ_ACL_ITEM_OID => return true,
611            _ => {}
612        }
613        matches!(
614            *ty,
615            PgType::ACLITEM
616                | PgType::BOOL
617                | PgType::BYTEA
618                | PgType::CHAR
619                | PgType::DATE
620                | PgType::FLOAT4
621                | PgType::FLOAT8
622                | PgType::INT2
623                | PgType::INT4
624                | PgType::INT8
625                | PgType::INTERVAL
626                | PgType::JSONB
627                | PgType::NAME
628                | PgType::NUMERIC
629                | PgType::OID
630                | PgType::REGCLASS
631                | PgType::REGPROC
632                | PgType::REGTYPE
633                | PgType::RECORD
634                | PgType::TEXT
635                | PgType::BPCHAR
636                | PgType::VARCHAR
637                | PgType::TIME
638                | PgType::TIMESTAMP
639                | PgType::TIMESTAMPTZ
640                | PgType::UUID
641                | PgType::INT4_RANGE
642                | PgType::INT4_RANGE_ARRAY
643                | PgType::INT8_RANGE
644                | PgType::INT8_RANGE_ARRAY
645                | PgType::DATE_RANGE
646                | PgType::DATE_RANGE_ARRAY
647                | PgType::NUM_RANGE
648                | PgType::NUM_RANGE_ARRAY
649                | PgType::TS_RANGE
650                | PgType::TS_RANGE_ARRAY
651                | PgType::TSTZ_RANGE
652                | PgType::TSTZ_RANGE_ARRAY
653        )
654    }
655}
656
// From postgres-types/src/private.rs.
/// Reads a big-endian `i32` from the front of `buf` and advances `buf` past
/// it. Fails, leaving `buf` untouched, when fewer than four bytes remain.
fn read_be_i32(buf: &mut &[u8]) -> Result<i32, Box<dyn Error + Sync + Send>> {
    let remaining: &[u8] = *buf;
    let Some((prefix, rest)) = remaining.split_first_chunk::<4>() else {
        return Err("invalid buffer size".into());
    };
    *buf = rest;
    Ok(i32::from_be_bytes(*prefix))
}
667
668// From postgres-types/src/private.rs.
669fn read_value<'a, T>(type_: &PgType, buf: &mut &'a [u8]) -> Result<T, Box<dyn Error + Sync + Send>>
670where
671    T: FromSql<'a>,
672{
673    let value = match usize::try_from(read_be_i32(buf)?) {
674        Err(_) => None,
675        Ok(len) => {
676            if len > buf.len() {
677                return Err("invalid buffer size".into());
678            }
679            let (head, tail) = buf.split_at(len);
680            *buf = tail;
681            Some(head)
682        }
683    };
684    T::from_sql_nullable(type_, value)
685}
686
687fn format_datum(d: Slt, typ: &Type, mode: Mode, col: usize) -> String {
688    match (typ, d.0) {
689        (Type::Bool, Value::Bool(b)) => b.to_string(),
690
691        (Type::Integer, Value::Int2(i)) => i.to_string(),
692        (Type::Integer, Value::Int4(i)) => i.to_string(),
693        (Type::Integer, Value::Int8(i)) => i.to_string(),
694        (Type::Integer, Value::UInt2(u)) => u.0.to_string(),
695        (Type::Integer, Value::UInt4(u)) => u.0.to_string(),
696        (Type::Integer, Value::UInt8(u)) => u.0.to_string(),
697        (Type::Integer, Value::Oid(i)) => i.to_string(),
698        // TODO(benesch): rewrite to avoid `as`.
699        #[allow(clippy::as_conversions)]
700        (Type::Integer, Value::Float4(f)) => format!("{}", f as i64),
701        // TODO(benesch): rewrite to avoid `as`.
702        #[allow(clippy::as_conversions)]
703        (Type::Integer, Value::Float8(f)) => format!("{}", f as i64),
704        // This is so wrong, but sqlite needs it.
705        (Type::Integer, Value::Text(_)) => "0".to_string(),
706        (Type::Integer, Value::Bool(b)) => i8::from(b).to_string(),
707        (Type::Integer, Value::Numeric(d)) => {
708            let mut d = d.0.0.clone();
709            let mut cx = numeric::cx_datum();
710            // Truncate the decimal to match sqlite.
711            if mode == Mode::Standard {
712                cx.set_rounding(dec::Rounding::Down);
713            }
714            cx.round(&mut d);
715            numeric::munge_numeric(&mut d).unwrap();
716            d.to_standard_notation_string()
717        }
718
719        (Type::Real, Value::Int2(i)) => format!("{:.3}", i),
720        (Type::Real, Value::Int4(i)) => format!("{:.3}", i),
721        (Type::Real, Value::Int8(i)) => format!("{:.3}", i),
722        (Type::Real, Value::Float4(f)) => match mode {
723            Mode::Standard => format!("{:.3}", f),
724            Mode::Cockroach => format!("{}", f),
725        },
726        (Type::Real, Value::Float8(f)) => match mode {
727            Mode::Standard => format!("{:.3}", f),
728            Mode::Cockroach => format!("{}", f),
729        },
730        (Type::Real, Value::Numeric(d)) => match mode {
731            Mode::Standard => {
732                let mut d = d.0.0.clone();
733                if d.exponent() < -3 {
734                    numeric::rescale(&mut d, 3).unwrap();
735                }
736                numeric::munge_numeric(&mut d).unwrap();
737                d.to_standard_notation_string()
738            }
739            Mode::Cockroach => d.0.0.to_standard_notation_string(),
740        },
741
742        (Type::Text, Value::Text(s)) => {
743            if s.is_empty() {
744                "(empty)".to_string()
745            } else {
746                s
747            }
748        }
749        (Type::Text, Value::Bool(b)) => b.to_string(),
750        (Type::Text, Value::Float4(f)) => format!("{:.3}", f),
751        (Type::Text, Value::Float8(f)) => format!("{:.3}", f),
752        // Bytes are printed as text iff they are valid UTF-8. This
753        // seems guaranteed to confuse everyone, but it is required for
754        // compliance with the CockroachDB sqllogictest runner. [0]
755        //
756        // [0]: https://github.com/cockroachdb/cockroach/blob/970782487/pkg/sql/logictest/logic.go#L2038-L2043
757        (Type::Text, Value::Bytea(b)) => match str::from_utf8(&b) {
758            Ok(s) => s.to_string(),
759            Err(_) => format!("{:?}", b),
760        },
761        (Type::Text, Value::Numeric(d)) => d.0.0.to_standard_notation_string(),
762        // Everything else gets normal text encoding. This correctly handles things
763        // like arrays, tuples, and strings that need to be quoted.
764        (Type::Text, d) => {
765            let mut buf = BytesMut::new();
766            d.encode_text(&mut buf);
767            String::from_utf8_lossy(&buf).into_owned()
768        }
769
770        (Type::Oid, Value::Oid(o)) => o.to_string(),
771
772        (_, d) => panic!(
773            "Don't know how to format {:?} as {:?} in column {}",
774            d, typ, col,
775        ),
776    }
777}
778
779fn format_row(row: &Row, types: &[Type], mode: Mode) -> Vec<String> {
780    let mut formatted: Vec<String> = vec![];
781    for i in 0..row.len() {
782        let t: Option<Slt> = row.get::<usize, Option<Slt>>(i);
783        let t: Option<String> = t.map(|d| format_datum(d, &types[i], mode, i));
784        formatted.push(match t {
785            Some(t) => t,
786            None => "NULL".into(),
787        });
788    }
789
790    formatted
791}
792
793impl<'a> Runner<'a> {
794    pub async fn start(config: &'a RunConfig<'a>) -> Result<Runner<'a>, anyhow::Error> {
795        let mut runner = Self {
796            config,
797            inner: None,
798            replacements: Vec::new(),
799        };
800        runner.reset().await?;
801        Ok(runner)
802    }
803
804    pub async fn reset(&mut self) -> Result<(), anyhow::Error> {
805        // Explicitly drop the old runner here to ensure that we wait for threads to terminate
806        // before starting a new runner
807        drop(self.inner.take());
808        self.inner = Some(RunnerInner::start(self.config).await?);
809
810        Ok(())
811    }
812
813    async fn run_record<'r>(
814        &mut self,
815        record: &'r Record<'r>,
816        in_transaction: &mut bool,
817    ) -> Result<Outcome<'r>, anyhow::Error> {
818        if let Record::ResetServer = record {
819            self.reset().await?;
820            Ok(Outcome::Success)
821        } else if let Record::Replace {
822            pattern,
823            replacement,
824        } = record
825        {
826            // Validated at parse time, so this compile cannot fail.
827            let regex = Regex::new(pattern).expect("replace regex validated by parser");
828            self.replacements.push((regex, replacement.clone()));
829            Ok(Outcome::Success)
830        } else {
831            self.inner
832                .as_mut()
833                .expect("RunnerInner missing")
834                .run_record(record, in_transaction, &self.replacements)
835                .await
836        }
837    }
838
839    async fn check_catalog(&self) -> Result<(), anyhow::Error> {
840        self.inner
841            .as_ref()
842            .expect("RunnerInner missing")
843            .check_catalog()
844            .await
845    }
846
847    #[allow(clippy::disallowed_methods)]
848    async fn reset_database(&mut self) -> Result<(), anyhow::Error> {
849        let inner = self.inner.as_mut().expect("RunnerInner missing");
850
851        inner.client.batch_execute("ROLLBACK;").await?;
852
853        inner
854            .system_client
855            .batch_execute(
856                "ROLLBACK;
857                 SET cluster = mz_catalog_server;
858                 RESET cluster_replica;",
859            )
860            .await?;
861
862        inner
863            .system_client
864            .batch_execute("ALTER SYSTEM RESET ALL")
865            .await?;
866
867        // Drop all databases, then recreate the `materialize` database.
868        for row in inner
869            .system_client
870            .query("SELECT name FROM mz_databases", &[])
871            .await?
872        {
873            let name: &str = row.get("name");
874            inner
875                .system_client
876                .batch_execute(sql!("DROP DATABASE {}", Sql::ident(name)).as_str())
877                .await?;
878        }
879        inner
880            .system_client
881            .batch_execute("CREATE DATABASE materialize")
882            .await?;
883
884        // Ensure quickstart cluster exists with one replica of size `self.config.replica_size`.
885        // We don't destroy the existing quickstart cluster replica if it exists, as turning
886        // on a cluster replica is exceptionally slow.
887        let mut needs_default_cluster = true;
888        for row in inner
889            .system_client
890            .query("SELECT name FROM mz_clusters WHERE id LIKE 'u%'", &[])
891            .await?
892        {
893            match row.get("name") {
894                "quickstart" => needs_default_cluster = false,
895                name => {
896                    inner
897                        .system_client
898                        .batch_execute(sql!("DROP CLUSTER {}", Sql::ident(name)).as_str())
899                        .await?
900                }
901            }
902        }
903        if needs_default_cluster {
904            inner
905                .system_client
906                .batch_execute("CREATE CLUSTER quickstart REPLICAS ()")
907                .await?;
908        }
909        let mut needs_default_replica = false;
910        let rows = inner
911            .system_client
912            .query(
913                "SELECT name, size FROM mz_cluster_replicas
914                 WHERE cluster_id = (SELECT id FROM mz_clusters WHERE name = 'quickstart')
915                 ORDER BY name",
916                &[],
917            )
918            .await?;
919        if rows.len() != self.config.replicas {
920            needs_default_replica = true;
921        } else {
922            for (i, row) in rows.iter().enumerate() {
923                let name: &str = row.get("name");
924                let size: &str = row.get("size");
925                if name != format!("r{}", i + 1) || size != self.config.replica_size {
926                    needs_default_replica = true;
927                    break;
928                }
929            }
930        }
931
932        if needs_default_replica {
933            inner
934                .system_client
935                .batch_execute("ALTER CLUSTER quickstart SET (MANAGED = false)")
936                .await?;
937            for row in inner
938                .system_client
939                .query(
940                    "SELECT name FROM mz_cluster_replicas
941                     WHERE cluster_id = (SELECT id FROM mz_clusters WHERE name = 'quickstart')",
942                    &[],
943                )
944                .await?
945            {
946                let name: &str = row.get("name");
947                inner
948                    .system_client
949                    .batch_execute(
950                        sql!("DROP CLUSTER REPLICA quickstart.{}", Sql::ident(name)).as_str(),
951                    )
952                    .await?;
953            }
954            for i in 1..=self.config.replicas {
955                inner
956                    .system_client
957                    .batch_execute(
958                        sql!(
959                            "CREATE CLUSTER REPLICA quickstart.r{} SIZE {}",
960                            i,
961                            Sql::literal(&self.config.replica_size)
962                        )
963                        .as_str(),
964                    )
965                    .await?;
966            }
967            inner
968                .system_client
969                .batch_execute("ALTER CLUSTER quickstart SET (MANAGED = true)")
970                .await?;
971        }
972
973        // Grant initial privileges.
974        inner
975            .system_client
976            .batch_execute("GRANT USAGE ON DATABASE materialize TO PUBLIC")
977            .await?;
978        inner
979            .system_client
980            .batch_execute("GRANT CREATE ON DATABASE materialize TO materialize")
981            .await?;
982        inner
983            .system_client
984            .batch_execute("GRANT CREATE ON SCHEMA materialize.public TO materialize")
985            .await?;
986        inner
987            .system_client
988            .batch_execute("GRANT USAGE ON CLUSTER quickstart TO PUBLIC")
989            .await?;
990        inner
991            .system_client
992            .batch_execute("GRANT CREATE ON CLUSTER quickstart TO materialize")
993            .await?;
994
995        // Some sqllogictests require more than the default amount of tables, so we increase the
996        // limit for all tests.
997        inner
998            .system_client
999            .simple_query("ALTER SYSTEM SET max_tables = 100")
1000            .await?;
1001
1002        if inner.enable_table_keys {
1003            inner
1004                .system_client
1005                .simple_query("ALTER SYSTEM SET unsafe_enable_table_keys = true")
1006                .await?;
1007        }
1008
1009        inner.ensure_fixed_features().await?;
1010
1011        inner.client = connect(inner.server_addr, None, None).await.unwrap();
1012        inner.system_client = connect(inner.internal_server_addr, Some("mz_system"), None)
1013            .await
1014            .unwrap();
1015        inner.clients = BTreeMap::new();
1016
1017        Ok(())
1018    }
1019}
1020
1021impl<'a> RunnerInner<'a> {
1022    pub async fn start(config: &RunConfig<'a>) -> Result<RunnerInner<'a>, anyhow::Error> {
1023        let temp_dir = tempfile::tempdir()?;
1024        let scratch_dir = tempfile::tempdir()?;
1025        let environment_id = EnvironmentId::for_tests();
1026        let (consensus_uri, timestamp_oracle_url): (SensitiveUrl, SensitiveUrl) = {
1027            let postgres_url = &config.postgres_url;
1028            let prefix = &config.prefix;
1029            info!(%postgres_url, "starting server");
1030            let (client, conn) = Retry::default()
1031                .max_tries(5)
1032                .retry_async(|_| async {
1033                    match tokio_postgres::connect(postgres_url, NoTls).await {
1034                        Ok(c) => Ok(c),
1035                        Err(e) => {
1036                            error!(%e, "failed to connect to postgres");
1037                            Err(e)
1038                        }
1039                    }
1040                })
1041                .await?;
1042            task::spawn(|| "sqllogictest_connect", async move {
1043                if let Err(e) = conn.await {
1044                    panic!("connection error: {}", e);
1045                }
1046            });
1047            // `prefix` is generated by sqllogictest harness configuration and
1048            // cannot be represented with composable `Sql`.
1049            #[allow(clippy::disallowed_methods)]
1050            client
1051                .batch_execute(&format!(
1052                    "DROP SCHEMA IF EXISTS {prefix}_tsoracle CASCADE;
1053                     CREATE SCHEMA IF NOT EXISTS {prefix}_consensus;
1054                     CREATE SCHEMA {prefix}_tsoracle;"
1055                ))
1056                .await?;
1057            (
1058                format!("{postgres_url}?options=--search_path={prefix}_consensus")
1059                    .parse()
1060                    .expect("invalid consensus URI"),
1061                format!("{postgres_url}?options=--search_path={prefix}_tsoracle")
1062                    .parse()
1063                    .expect("invalid timestamp oracle URI"),
1064            )
1065        };
1066
1067        let secrets_dir = temp_dir.path().join("secrets");
1068        let orchestrator = Arc::new(
1069            ProcessOrchestrator::new(ProcessOrchestratorConfig {
1070                image_dir: env::current_exe()?.parent().unwrap().to_path_buf(),
1071                suppress_output: false,
1072                environment_id: environment_id.to_string(),
1073                secrets_dir: secrets_dir.clone(),
1074                command_wrapper: config
1075                    .orchestrator_process_wrapper
1076                    .as_ref()
1077                    .map_or(Ok(vec![]), |s| shell_words::split(s))?,
1078                propagate_crashes: true,
1079                tcp_proxy: None,
1080                scratch_directory: scratch_dir.path().to_path_buf(),
1081            })
1082            .await?,
1083        );
1084        let now = SYSTEM_TIME.clone();
1085        let metrics_registry = MetricsRegistry::new();
1086
1087        let persist_config = PersistConfig::new(
1088            &mz_environmentd::BUILD_INFO,
1089            now.clone(),
1090            mz_dyncfgs::all_dyncfgs(),
1091        );
1092        let persist_pubsub_server =
1093            PersistGrpcPubSubServer::new(&persist_config, &metrics_registry);
1094        let persist_pubsub_client = persist_pubsub_server.new_same_process_connection();
1095        let persist_pubsub_tcp_listener =
1096            TcpListener::bind(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0))
1097                .await
1098                .expect("pubsub addr binding");
1099        let persist_pubsub_server_port = persist_pubsub_tcp_listener
1100            .local_addr()
1101            .expect("pubsub addr has local addr")
1102            .port();
1103        info!("listening for persist pubsub connections on localhost:{persist_pubsub_server_port}");
1104        mz_ore::task::spawn(|| "persist_pubsub_server", async move {
1105            persist_pubsub_server
1106                .serve_with_stream(TcpListenerStream::new(persist_pubsub_tcp_listener))
1107                .await
1108                .expect("success")
1109        });
1110        let persist_clients =
1111            PersistClientCache::new(persist_config, &metrics_registry, |cfg, metrics| {
1112                let sender: Arc<dyn PubSubSender> = Arc::new(MetricsSameProcessPubSubSender::new(
1113                    cfg,
1114                    persist_pubsub_client.sender,
1115                    metrics,
1116                ));
1117                PubSubClientConnection::new(sender, persist_pubsub_client.receiver)
1118            });
1119        let persist_clients = Arc::new(persist_clients);
1120
1121        let secrets_controller = Arc::clone(&orchestrator);
1122        let connection_context = ConnectionContext::for_tests(orchestrator.reader());
1123        let orchestrator = Arc::new(TracingOrchestrator::new(
1124            orchestrator,
1125            config.tracing.clone(),
1126        ));
1127        let listeners_config = ListenersConfig {
1128            sql: btreemap! {
1129                "external".to_owned() => SqlListenerConfig {
1130                    addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
1131                    authenticator_kind: AuthenticatorKind::None,
1132                    allowed_roles: AllowedRoles::Normal,
1133                    enable_tls: false,
1134                },
1135                "internal".to_owned() => SqlListenerConfig {
1136                    addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
1137                    authenticator_kind: AuthenticatorKind::None,
1138                    allowed_roles: AllowedRoles::Internal,
1139                    enable_tls: false,
1140                },
1141                "password".to_owned() => SqlListenerConfig {
1142                    addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
1143                    authenticator_kind: AuthenticatorKind::Password,
1144                    allowed_roles: AllowedRoles::Normal,
1145                    enable_tls: false,
1146                },
1147            },
1148            http: btreemap![
1149                "external".to_owned() => HttpListenerConfig {
1150                    base: BaseListenerConfig {
1151                        addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
1152                        authenticator_kind: AuthenticatorKind::None,
1153                        allowed_roles: AllowedRoles::Normal,
1154                        enable_tls: false
1155                    },
1156                    routes: HttpRoutesEnabled {
1157                        base: true,
1158                        webhook: true,
1159                        internal: false,
1160                        metrics: false,
1161                        profiling: false,
1162                        mcp_agent: false,
1163                        mcp_developer: false,
1164                        console_config: true,
1165                    },
1166                },
1167                "internal".to_owned() => HttpListenerConfig {
1168                    base: BaseListenerConfig {
1169                        addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
1170                        authenticator_kind: AuthenticatorKind::None,
1171                        allowed_roles: AllowedRoles::NormalAndInternal,
1172                        enable_tls: false
1173                    },
1174                    routes: HttpRoutesEnabled {
1175                        base: true,
1176                        webhook: true,
1177                        internal: true,
1178                        metrics: true,
1179                        profiling: true,
1180                        mcp_agent: false,
1181                        mcp_developer: false,
1182                        console_config: false,
1183                    },
1184                },
1185            ],
1186        };
1187        let listeners = mz_environmentd::Listeners::bind(listeners_config).await?;
1188        let host_name = format!(
1189            "localhost:{}",
1190            listeners.http["external"].handle.local_addr.port()
1191        );
1192        let catalog_config = CatalogConfig {
1193            persist_clients: Arc::clone(&persist_clients),
1194            metrics: Arc::new(mz_catalog::durable::Metrics::new(&MetricsRegistry::new())),
1195        };
1196        let server_config = mz_environmentd::Config {
1197            catalog_config,
1198            timestamp_oracle_url: Some(timestamp_oracle_url),
1199            controller: ControllerConfig {
1200                build_info: &mz_environmentd::BUILD_INFO,
1201                orchestrator,
1202                clusterd_image: "clusterd".into(),
1203                init_container_image: None,
1204                deploy_generation: 0,
1205                persist_location: PersistLocation {
1206                    blob_uri: format!(
1207                        "file://{}/persist/blob",
1208                        config.persist_dir.path().display()
1209                    )
1210                    .parse()
1211                    .expect("invalid blob URI"),
1212                    consensus_uri,
1213                },
1214                persist_clients,
1215                now: SYSTEM_TIME.clone(),
1216                metrics_registry: metrics_registry.clone(),
1217                persist_pubsub_url: format!("http://localhost:{}", persist_pubsub_server_port),
1218                secrets_args: mz_service::secrets::SecretsReaderCliArgs {
1219                    secrets_reader: mz_service::secrets::SecretsControllerKind::LocalFile,
1220                    secrets_reader_local_file_dir: Some(secrets_dir),
1221                    secrets_reader_kubernetes_context: None,
1222                    secrets_reader_aws_prefix: None,
1223                    secrets_reader_name_prefix: None,
1224                },
1225                connection_context,
1226                replica_http_locator: Arc::new(ReplicaHttpLocator::default()),
1227            },
1228            secrets_controller,
1229            cloud_resource_controller: None,
1230            tls: None,
1231            frontegg: None,
1232            frontegg_oauth_issuer_url: None,
1233            cors_allowed_origin: AllowOrigin::list([]),
1234            cors_allowed_origin_list: Vec::new(),
1235            unsafe_mode: true,
1236            all_features: false,
1237            metrics_registry,
1238            now,
1239            environment_id,
1240            cluster_replica_sizes: ClusterReplicaSizeMap::for_tests(),
1241            bootstrap_default_cluster_replica_size: config.replica_size.clone(),
1242            bootstrap_default_cluster_replication_factor: config
1243                .replicas
1244                .try_into()
1245                .expect("replicas must fit"),
1246            bootstrap_builtin_system_cluster_config: BootstrapBuiltinClusterConfig {
1247                replication_factor: SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR,
1248                size: config.replica_size.clone(),
1249            },
1250            bootstrap_builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig {
1251                replication_factor: CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR,
1252                size: config.replica_size.clone(),
1253            },
1254            bootstrap_builtin_probe_cluster_config: BootstrapBuiltinClusterConfig {
1255                replication_factor: PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR,
1256                size: config.replica_size.clone(),
1257            },
1258            bootstrap_builtin_support_cluster_config: BootstrapBuiltinClusterConfig {
1259                replication_factor: SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR,
1260                size: config.replica_size.clone(),
1261            },
1262            bootstrap_builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig {
1263                replication_factor: ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR,
1264                size: config.replica_size.clone(),
1265            },
1266            system_parameter_defaults: {
1267                let mut params = BTreeMap::new();
1268                params.insert(
1269                    "log_filter".to_string(),
1270                    config.tracing.startup_log_filter.to_string(),
1271                );
1272                params.extend(config.system_parameter_defaults.clone());
1273                params
1274            },
1275            availability_zones: Default::default(),
1276            tracing_handle: config.tracing_handle.clone(),
1277            storage_usage_collection_interval: Duration::from_secs(3600),
1278            storage_usage_retention_period: None,
1279            segment_api_key: None,
1280            segment_client_side: false,
1281            // SLT doesn't like eternally running tasks since it waits for them to finish inbetween SLT files
1282            test_only_dummy_segment_client: false,
1283            egress_addresses: vec![],
1284            aws_account_id: None,
1285            aws_privatelink_availability_zones: None,
1286            launchdarkly_sdk_key: None,
1287            launchdarkly_key_map: Default::default(),
1288            config_sync_file_path: None,
1289            config_sync_timeout: Duration::from_secs(30),
1290            config_sync_loop_interval: None,
1291            bootstrap_role: Some("materialize".into()),
1292            http_host_name: Some(host_name),
1293            internal_console_redirect_url: None,
1294            tls_reload_certs: mz_server_core::cert_reload_never_reload(),
1295            helm_chart_version: None,
1296            license_key: ValidatedLicenseKey::for_tests(),
1297            external_login_password_mz_system: None,
1298            force_builtin_schema_migration: None,
1299        };
1300        // We need to run the server on its own Tokio runtime, which in turn
1301        // requires its own thread, so that we can wait for any tasks spawned
1302        // by the server to be shutdown at the end of each file. If we were to
1303        // share a Tokio runtime, tasks from the last file's server would still
1304        // be live at the start of the next file's server.
1305        let (server_addr_tx, server_addr_rx): (oneshot::Sender<Result<_, anyhow::Error>>, _) =
1306            oneshot::channel();
1307        let (internal_server_addr_tx, internal_server_addr_rx) = oneshot::channel();
1308        let (password_server_addr_tx, password_server_addr_rx) = oneshot::channel();
1309        let (internal_http_server_addr_tx, internal_http_server_addr_rx) = oneshot::channel();
1310        let (shutdown_trigger, shutdown_trigger_rx) = trigger::channel();
1311        let server_thread = thread::spawn(|| {
1312            let runtime = match Runtime::new() {
1313                Ok(runtime) => runtime,
1314                Err(e) => {
1315                    server_addr_tx
1316                        .send(Err(e.into()))
1317                        .expect("receiver should not drop first");
1318                    return;
1319                }
1320            };
1321            let server = match runtime.block_on(listeners.serve(server_config)) {
1322                Ok(runtime) => runtime,
1323                Err(e) => {
1324                    server_addr_tx
1325                        .send(Err(e.into()))
1326                        .expect("receiver should not drop first");
1327                    return;
1328                }
1329            };
1330            server_addr_tx
1331                .send(Ok(server.sql_listener_handles["external"].local_addr))
1332                .expect("receiver should not drop first");
1333            internal_server_addr_tx
1334                .send(server.sql_listener_handles["internal"].local_addr)
1335                .expect("receiver should not drop first");
1336            password_server_addr_tx
1337                .send(server.sql_listener_handles["password"].local_addr)
1338                .expect("receiver should not drop first");
1339            internal_http_server_addr_tx
1340                .send(server.http_listener_handles["internal"].local_addr)
1341                .expect("receiver should not drop first");
1342            runtime.block_on(shutdown_trigger_rx);
1343        });
1344        let server_addr = server_addr_rx.await??;
1345        let internal_server_addr = internal_server_addr_rx.await?;
1346        let password_server_addr = password_server_addr_rx.await?;
1347        let internal_http_server_addr = internal_http_server_addr_rx.await?;
1348
1349        let system_client = connect(internal_server_addr, Some("mz_system"), None)
1350            .await
1351            .unwrap();
1352        let client = connect(server_addr, None, None).await.unwrap();
1353
1354        let inner = RunnerInner {
1355            server_addr,
1356            internal_server_addr,
1357            password_server_addr,
1358            internal_http_server_addr,
1359            _shutdown_trigger: shutdown_trigger,
1360            _server_thread: server_thread.join_on_drop(),
1361            _temp_dir: temp_dir,
1362            client,
1363            system_client,
1364            clients: BTreeMap::new(),
1365            auto_index_tables: config.auto_index_tables,
1366            auto_index_selects: config.auto_index_selects,
1367            auto_transactions: config.auto_transactions,
1368            enable_table_keys: config.enable_table_keys,
1369            verbose: config.verbose,
1370            stdout: config.stdout,
1371        };
1372        inner.ensure_fixed_features().await?;
1373
1374        Ok(inner)
1375    }
1376
1377    /// Set features that should be enabled regardless of whether reset-server was
1378    /// called. These features may be set conditionally depending on the run configuration.
1379    #[allow(clippy::disallowed_methods)]
1380    async fn ensure_fixed_features(&self) -> Result<(), anyhow::Error> {
1381        // We turn on enable_reduce_mfp_fusion, as we wish
1382        // to get as much coverage of these features as we can.
1383        // TODO(vmarcos): Remove this code when we retire this feature flag.
1384        self.system_client
1385            .execute("ALTER SYSTEM SET enable_reduce_mfp_fusion = on", &[])
1386            .await?;
1387
1388        // Dangerous functions are useful for tests so we enable it for all tests.
1389        self.system_client
1390            .execute("ALTER SYSTEM SET unsafe_enable_unsafe_functions = on", &[])
1391            .await?;
1392        Ok(())
1393    }
1394
1395    #[allow(clippy::disallowed_methods)]
1396    async fn run_record<'r>(
1397        &mut self,
1398        record: &'r Record<'r>,
1399        in_transaction: &mut bool,
1400        replacements: &[(Regex, String)],
1401    ) -> Result<Outcome<'r>, anyhow::Error> {
1402        match &record {
1403            Record::Statement {
1404                expected_error,
1405                rows_affected,
1406                sql,
1407                location,
1408            } => {
1409                if self.auto_transactions && *in_transaction {
1410                    self.client.execute("COMMIT", &[]).await?;
1411                    *in_transaction = false;
1412                }
1413                match self
1414                    .run_statement(*expected_error, *rows_affected, sql, location.clone())
1415                    .await?
1416                {
1417                    Outcome::Success => {
1418                        if self.auto_index_tables {
1419                            let additional = mutate(sql);
1420                            for stmt in additional {
1421                                self.client.execute(&stmt, &[]).await?;
1422                            }
1423                        }
1424                        Ok(Outcome::Success)
1425                    }
1426                    other => {
1427                        if expected_error.is_some() {
1428                            Ok(other)
1429                        } else {
1430                            // If we failed to execute a statement that was supposed to succeed,
1431                            // running the rest of the tests in this file will probably cause
1432                            // false positives, so just give up on the file entirely.
1433                            Ok(Outcome::Bail {
1434                                cause: Box::new(other),
1435                                location: location.clone(),
1436                            })
1437                        }
1438                    }
1439                }
1440            }
1441            Record::Query {
1442                sql,
1443                output,
1444                location,
1445            } => {
1446                self.run_query(sql, output, location.clone(), in_transaction, replacements)
1447                    .await
1448            }
1449            Record::Simple {
1450                conn,
1451                user,
1452                password,
1453                sql,
1454                sort,
1455                output,
1456                location,
1457                ..
1458            } => {
1459                self.run_simple(
1460                    *conn,
1461                    *user,
1462                    *password,
1463                    sql,
1464                    sort.clone(),
1465                    output,
1466                    location.clone(),
1467                )
1468                .await
1469            }
1470            Record::Copy {
1471                table_name,
1472                tsv_path,
1473            } => {
1474                let tsv = tokio::fs::read(tsv_path).await?;
1475                let copy = self
1476                    .client
1477                    .copy_in(sql!("COPY {} FROM STDIN", Sql::ident(*table_name)).as_str())
1478                    .await?;
1479                tokio::pin!(copy);
1480                copy.send(bytes::Bytes::from(tsv)).await?;
1481                copy.finish().await?;
1482                Ok(Outcome::Success)
1483            }
1484            _ => Ok(Outcome::Success),
1485        }
1486    }
1487
1488    #[allow(clippy::disallowed_methods)]
1489    async fn run_statement<'r>(
1490        &self,
1491        expected_error: Option<&'r str>,
1492        expected_rows_affected: Option<u64>,
1493        sql: &'r str,
1494        location: Location,
1495    ) -> Result<Outcome<'r>, anyhow::Error> {
1496        static UNSUPPORTED_INDEX_STATEMENT_REGEX: LazyLock<Regex> =
1497            LazyLock::new(|| Regex::new("^(CREATE UNIQUE INDEX|REINDEX)").unwrap());
1498        if UNSUPPORTED_INDEX_STATEMENT_REGEX.is_match(sql) {
1499            // sure, we totally made you an index
1500            return Ok(Outcome::Success);
1501        }
1502
1503        match self.client.execute(sql, &[]).await {
1504            Ok(actual) => {
1505                if let Some(expected_error) = expected_error {
1506                    return Ok(Outcome::UnexpectedPlanSuccess {
1507                        expected_error,
1508                        location,
1509                    });
1510                }
1511                match expected_rows_affected {
1512                    None => Ok(Outcome::Success),
1513                    Some(expected) => {
1514                        if expected != actual {
1515                            Ok(Outcome::WrongNumberOfRowsInserted {
1516                                expected_count: expected,
1517                                actual_count: actual,
1518                                location,
1519                            })
1520                        } else {
1521                            Ok(Outcome::Success)
1522                        }
1523                    }
1524                }
1525            }
1526            Err(error) => {
1527                if let Some(expected_error) = expected_error {
1528                    if Regex::new(expected_error)?.is_match(&error.to_string_with_causes()) {
1529                        return Ok(Outcome::Success);
1530                    }
1531                    return Ok(Outcome::PlanFailure {
1532                        error: anyhow!(error),
1533                        expected_error: Some(expected_error.to_string()),
1534                        location,
1535                    });
1536                }
1537                Ok(Outcome::PlanFailure {
1538                    error: anyhow!(error),
1539                    expected_error: None,
1540                    location,
1541                })
1542            }
1543        }
1544    }
1545
1546    #[allow(clippy::disallowed_methods)]
1547    async fn prepare_query<'r>(
1548        &self,
1549        sql: &str,
1550        output: &'r Result<QueryOutput<'_>, &'r str>,
1551        location: Location,
1552        in_transaction: &mut bool,
1553    ) -> Result<PrepareQueryOutcome<'r>, anyhow::Error> {
1554        // get statement
1555        let statements = match mz_sql::parse::parse(sql) {
1556            Ok(statements) => statements,
1557            Err(e) => match output {
1558                Ok(_) => {
1559                    return Ok(PrepareQueryOutcome::Outcome(Outcome::ParseFailure {
1560                        error: e.into(),
1561                        location,
1562                    }));
1563                }
1564                Err(expected_error) => {
1565                    if Regex::new(expected_error)?.is_match(&e.to_string_with_causes()) {
1566                        return Ok(PrepareQueryOutcome::Outcome(Outcome::Success));
1567                    } else {
1568                        return Ok(PrepareQueryOutcome::Outcome(Outcome::ParseFailure {
1569                            error: e.into(),
1570                            location,
1571                        }));
1572                    }
1573                }
1574            },
1575        };
1576        let statement = match &*statements {
1577            [] => bail!("Got zero statements?"),
1578            [statement] => &statement.ast,
1579            _ => bail!("Got multiple statements: {:?}", statements),
1580        };
1581        let (is_select, num_attributes, has_as_of) = match statement {
1582            Statement::Select(stmt) => (
1583                true,
1584                derive_num_attributes(&stmt.query.body),
1585                stmt.as_of.is_some(),
1586            ),
1587            _ => (false, None, false),
1588        };
1589
1590        match output {
1591            Ok(_) => {
1592                if self.auto_transactions && !*in_transaction {
1593                    // No ISOLATION LEVEL SERIALIZABLE because of database-issues#5323
1594                    self.client.execute("BEGIN", &[]).await?;
1595                    *in_transaction = true;
1596                }
1597            }
1598            Err(_) => {
1599                if self.auto_transactions && *in_transaction {
1600                    self.client.execute("COMMIT", &[]).await?;
1601                    *in_transaction = false;
1602                }
1603            }
1604        }
1605
1606        // `SHOW` commands reference catalog schema, thus are not in the same timedomain and not
1607        // allowed in the same transaction, see:
1608        // https://materialize.com/docs/sql/begin/#same-timedomain-error
1609        match statement {
1610            Statement::Show(..) => {
1611                if self.auto_transactions && *in_transaction {
1612                    self.client.execute("COMMIT", &[]).await?;
1613                    *in_transaction = false;
1614                }
1615            }
1616            _ => (),
1617        }
1618        Ok(PrepareQueryOutcome::QueryPrepared(QueryInfo {
1619            is_select,
1620            num_attributes,
1621            has_as_of,
1622        }))
1623    }
1624
1625    #[allow(clippy::disallowed_methods)]
1626    async fn execute_query<'r>(
1627        &self,
1628        sql: &str,
1629        output: &'r Result<QueryOutput<'_>, &'r str>,
1630        location: Location,
1631        replacements: &[(Regex, String)],
1632    ) -> Result<Outcome<'r>, anyhow::Error> {
1633        let rows = match self.client.query(sql, &[]).await {
1634            Ok(rows) => rows,
1635            Err(error) => {
1636                let error_string = error.to_string_with_causes();
1637                return match output {
1638                    Ok(_) => {
1639                        if error_string.contains("supported") || error_string.contains("overload") {
1640                            // this is a failure, but it's caused by lack of support rather than by bugs
1641                            Ok(Outcome::Unsupported {
1642                                error: anyhow!(error),
1643                                location,
1644                            })
1645                        } else {
1646                            Ok(Outcome::PlanFailure {
1647                                error: anyhow!(error),
1648                                expected_error: None,
1649                                location,
1650                            })
1651                        }
1652                    }
1653                    Err(expected_error) => {
1654                        if Regex::new(expected_error)?.is_match(&error_string) {
1655                            Ok(Outcome::Success)
1656                        } else {
1657                            Ok(Outcome::PlanFailure {
1658                                error: anyhow!(error),
1659                                expected_error: Some(expected_error.to_string()),
1660                                location,
1661                            })
1662                        }
1663                    }
1664                };
1665            }
1666        };
1667
1668        // unpack expected output
1669        let QueryOutput {
1670            sort,
1671            types: expected_types,
1672            column_names: expected_column_names,
1673            output: expected_output,
1674            mode,
1675            ..
1676        } = match output {
1677            Err(expected_error) => {
1678                return Ok(Outcome::UnexpectedPlanSuccess {
1679                    expected_error,
1680                    location,
1681                });
1682            }
1683            Ok(query_output) => query_output,
1684        };
1685
1686        // format output
1687        let mut formatted_rows = vec![];
1688        for row in &rows {
1689            if row.len() != expected_types.len() {
1690                return Ok(Outcome::WrongColumnCount {
1691                    expected_count: expected_types.len(),
1692                    actual_count: row.len(),
1693                    location,
1694                });
1695            }
1696            let row = format_row(row, expected_types, *mode);
1697            formatted_rows.push(row);
1698        }
1699
1700        // sort formatted output
1701        if let Sort::Row = sort {
1702            formatted_rows.sort();
1703        }
1704        let mut values = formatted_rows.into_iter().flatten().collect::<Vec<_>>();
1705        if let Sort::Value = sort {
1706            values.sort();
1707        }
1708
1709        // Apply any `replace` substitutions to the actual output, so that
1710        // non-deterministic tokens (e.g. a folded `mz_now()` timestamp) are
1711        // masked before the output is compared against, or rewritten into, the
1712        // expected output.
1713        if !replacements.is_empty() {
1714            for value in &mut values {
1715                for (regex, replacement) in replacements {
1716                    *value = regex.replace_all(value, replacement.as_str()).into_owned();
1717                }
1718            }
1719        }
1720
1721        // Various checks as long as there are returned rows.
1722        if let Some(row) = rows.get(0) {
1723            // check column names
1724            if let Some(expected_column_names) = expected_column_names {
1725                let actual_column_names = row
1726                    .columns()
1727                    .iter()
1728                    .map(|t| ColumnName::from(t.name()))
1729                    .collect::<Vec<_>>();
1730                if expected_column_names != &actual_column_names {
1731                    return Ok(Outcome::WrongColumnNames {
1732                        expected_column_names,
1733                        actual_column_names,
1734                        actual_output: Output::Values(values),
1735                        location,
1736                    });
1737                }
1738            }
1739        }
1740
1741        // check output
1742        match expected_output {
1743            Output::Values(expected_values) => {
1744                if values != *expected_values {
1745                    return Ok(Outcome::OutputFailure {
1746                        expected_output,
1747                        actual_raw_output: rows,
1748                        actual_output: Output::Values(values),
1749                        location,
1750                    });
1751                }
1752            }
1753            Output::Hashed {
1754                num_values,
1755                md5: expected_md5,
1756            } => {
1757                let mut hasher = Md5::new();
1758                for value in &values {
1759                    hasher.update(value);
1760                    hasher.update("\n");
1761                }
1762                let md5 = format!("{:x}", hasher.finalize());
1763                if values.len() != *num_values || md5 != *expected_md5 {
1764                    return Ok(Outcome::OutputFailure {
1765                        expected_output,
1766                        actual_raw_output: rows,
1767                        actual_output: Output::Hashed {
1768                            num_values: values.len(),
1769                            md5,
1770                        },
1771                        location,
1772                    });
1773                }
1774            }
1775        }
1776
1777        Ok(Outcome::Success)
1778    }
1779
1780    #[allow(clippy::disallowed_methods)]
1781    async fn execute_view_inner<'r>(
1782        &self,
1783        sql: &str,
1784        output: &'r Result<QueryOutput<'_>, &'r str>,
1785        location: Location,
1786    ) -> Result<Option<Outcome<'r>>, anyhow::Error> {
1787        print_sql_if(self.stdout, sql, self.verbose);
1788        let sql_result = self.client.execute(sql, &[]).await;
1789
1790        // Evaluate if we already reached an outcome or not.
1791        let tentative_outcome = if let Err(view_error) = sql_result {
1792            if let Err(expected_error) = output {
1793                if Regex::new(expected_error)?.is_match(&view_error.to_string_with_causes()) {
1794                    Some(Outcome::Success)
1795                } else {
1796                    Some(Outcome::PlanFailure {
1797                        error: view_error.into(),
1798                        expected_error: Some(expected_error.to_string()),
1799                        location: location.clone(),
1800                    })
1801                }
1802            } else {
1803                Some(Outcome::PlanFailure {
1804                    error: view_error.into(),
1805                    expected_error: None,
1806                    location: location.clone(),
1807                })
1808            }
1809        } else {
1810            None
1811        };
1812        Ok(tentative_outcome)
1813    }
1814
1815    #[allow(clippy::disallowed_methods)]
1816    async fn execute_view<'r>(
1817        &self,
1818        sql: &str,
1819        num_attributes: Option<usize>,
1820        output: &'r Result<QueryOutput<'_>, &'r str>,
1821        location: Location,
1822        replacements: &[(Regex, String)],
1823    ) -> Result<Outcome<'r>, anyhow::Error> {
1824        // Create indexed view SQL commands and execute `CREATE VIEW`.
1825        let expected_column_names = if let Ok(QueryOutput { column_names, .. }) = output {
1826            column_names.clone()
1827        } else {
1828            None
1829        };
1830        let (create_view, create_index, view_sql, drop_view) = generate_view_sql(
1831            sql,
1832            Uuid::new_v4().as_simple(),
1833            num_attributes,
1834            expected_column_names,
1835        );
1836        let tentative_outcome = self
1837            .execute_view_inner(create_view.as_str(), output, location.clone())
1838            .await?;
1839
1840        // Either we already have an outcome or alternatively,
1841        // we proceed to index and query the view.
1842        if let Some(view_outcome) = tentative_outcome {
1843            return Ok(view_outcome);
1844        }
1845
1846        let tentative_outcome = self
1847            .execute_view_inner(create_index.as_str(), output, location.clone())
1848            .await?;
1849
1850        let view_outcome;
1851        if let Some(outcome) = tentative_outcome {
1852            view_outcome = outcome;
1853        } else {
1854            print_sql_if(self.stdout, view_sql.as_str(), self.verbose);
1855            view_outcome = self
1856                .execute_query(view_sql.as_str(), output, location.clone(), replacements)
1857                .await?;
1858        }
1859
1860        // Remember to clean up after ourselves by dropping the view.
1861        print_sql_if(self.stdout, drop_view.as_str(), self.verbose);
1862        self.client.execute(drop_view.as_str(), &[]).await?;
1863
1864        Ok(view_outcome)
1865    }
1866
1867    async fn run_query<'r>(
1868        &self,
1869        sql: &'r str,
1870        output: &'r Result<QueryOutput<'_>, &'r str>,
1871        location: Location,
1872        in_transaction: &mut bool,
1873        replacements: &[(Regex, String)],
1874    ) -> Result<Outcome<'r>, anyhow::Error> {
1875        let prepare_outcome = self
1876            .prepare_query(sql, output, location.clone(), in_transaction)
1877            .await?;
1878        match prepare_outcome {
1879            PrepareQueryOutcome::QueryPrepared(QueryInfo {
1880                is_select,
1881                num_attributes,
1882                has_as_of,
1883            }) => {
1884                let query_outcome = self
1885                    .execute_query(sql, output, location.clone(), replacements)
1886                    .await?;
1887                // `AS OF` queries are excluded from the indexed-view consistency
1888                // check: re-running them against a freshly created indexed view
1889                // is racy, since the view's `since` can advance past a historical
1890                // `AS OF` and spuriously fail with "could not find a valid
1891                // timestamp" even though the one-shot query succeeded.
1892                if is_select && !has_as_of && self.auto_index_selects && query_outcome.success() {
1893                    let view_outcome = self
1894                        .execute_view(sql, None, output, location.clone(), replacements)
1895                        .await?;
1896
1897                    if !view_outcome.success() {
1898                        // Before producing a failure outcome, we try to obtain a new
1899                        // outcome for view-based execution exploiting analysis of the
1900                        // number of attributes. This two-level strategy can avoid errors
1901                        // produced by column ambiguity in the `SELECT`.
1902                        let view_outcome = if num_attributes.is_some() {
1903                            self.execute_view(
1904                                sql,
1905                                num_attributes,
1906                                output,
1907                                location.clone(),
1908                                replacements,
1909                            )
1910                            .await?
1911                        } else {
1912                            view_outcome
1913                        };
1914
1915                        if !view_outcome.success() {
1916                            let inconsistent_view_outcome = Outcome::InconsistentViewOutcome {
1917                                query_outcome: Box::new(query_outcome),
1918                                view_outcome: Box::new(view_outcome),
1919                                location: location.clone(),
1920                            };
1921                            // Determine if this inconsistent view outcome should be reported
1922                            // as an error or only as a warning.
1923                            let outcome = if should_warn(&inconsistent_view_outcome) {
1924                                Outcome::Warning {
1925                                    cause: Box::new(inconsistent_view_outcome),
1926                                    location: location.clone(),
1927                                }
1928                            } else {
1929                                inconsistent_view_outcome
1930                            };
1931                            return Ok(outcome);
1932                        }
1933                    }
1934                }
1935                Ok(query_outcome)
1936            }
1937            PrepareQueryOutcome::Outcome(outcome) => Ok(outcome),
1938        }
1939    }
1940
1941    async fn get_conn(
1942        &mut self,
1943        name: Option<&str>,
1944        user: Option<&str>,
1945        password: Option<&str>,
1946    ) -> Result<&tokio_postgres::Client, tokio_postgres::Error> {
1947        match name {
1948            None => Ok(&self.client),
1949            Some(name) => {
1950                if !self.clients.contains_key(name) {
1951                    let addr = if matches!(user, Some("mz_system") | Some("mz_support")) {
1952                        self.internal_server_addr
1953                    } else if password.is_some() {
1954                        // Use password server for password authentication
1955                        self.password_server_addr
1956                    } else {
1957                        self.server_addr
1958                    };
1959                    let client = connect(addr, user, password).await?;
1960                    self.clients.insert(name.into(), client);
1961                }
1962                Ok(self.clients.get(name).unwrap())
1963            }
1964        }
1965    }
1966
1967    #[allow(clippy::disallowed_methods)]
1968    async fn run_simple<'r>(
1969        &mut self,
1970        conn: Option<&'r str>,
1971        user: Option<&'r str>,
1972        password: Option<&'r str>,
1973        sql: &'r str,
1974        sort: Sort,
1975        output: &'r Output,
1976        location: Location,
1977    ) -> Result<Outcome<'r>, anyhow::Error> {
1978        let actual = match self.get_conn(conn, user, password).await {
1979            Ok(client) => match client.simple_query(sql).await {
1980                Ok(result) => {
1981                    let mut rows = Vec::new();
1982
1983                    for m in result.into_iter() {
1984                        match m {
1985                            SimpleQueryMessage::Row(row) => {
1986                                let mut s = vec![];
1987                                for i in 0..row.len() {
1988                                    s.push(row.get(i).unwrap_or("NULL"));
1989                                }
1990                                rows.push(s.join(","));
1991                            }
1992                            SimpleQueryMessage::CommandComplete(count) => {
1993                                // This applies any sort on the COMPLETE line as
1994                                // well, but we do the same for the expected output.
1995                                rows.push(format!("COMPLETE {}", count));
1996                            }
1997                            SimpleQueryMessage::RowDescription(_) => {}
1998                            _ => panic!("unexpected"),
1999                        }
2000                    }
2001
2002                    if let Sort::Row = sort {
2003                        rows.sort();
2004                    }
2005
2006                    Output::Values(rows)
2007                }
2008                // Errors can contain multiple lines (say if there are details), and rewrite
2009                // sticks them each on their own line, so we need to split up the lines here to
2010                // each be its own String in the Vec.
2011                Err(error) => Output::Values(
2012                    error
2013                        .to_string_with_causes()
2014                        .lines()
2015                        .map(|s| s.to_string())
2016                        .collect(),
2017                ),
2018            },
2019            Err(error) => Output::Values(
2020                error
2021                    .to_string_with_causes()
2022                    .lines()
2023                    .map(|s| s.to_string())
2024                    .collect(),
2025            ),
2026        };
2027        if *output != actual {
2028            Ok(Outcome::OutputFailure {
2029                expected_output: output,
2030                actual_raw_output: vec![],
2031                actual_output: actual,
2032                location,
2033            })
2034        } else {
2035            Ok(Outcome::Success)
2036        }
2037    }
2038
2039    async fn check_catalog(&self) -> Result<(), anyhow::Error> {
2040        let url = format!(
2041            "http://{}/api/catalog/check",
2042            self.internal_http_server_addr
2043        );
2044        let response: serde_json::Value = reqwest::get(&url).await?.json().await?;
2045
2046        if let Some(inconsistencies) = response.get("err") {
2047            let inconsistencies = serde_json::to_string_pretty(&inconsistencies)
2048                .expect("serializing Value cannot fail");
2049            Err(anyhow::anyhow!("Catalog inconsistency\n{inconsistencies}"))
2050        } else {
2051            Ok(())
2052        }
2053    }
2054}
2055
2056async fn connect(
2057    addr: SocketAddr,
2058    user: Option<&str>,
2059    password: Option<&str>,
2060) -> Result<tokio_postgres::Client, tokio_postgres::Error> {
2061    let mut config = tokio_postgres::Config::new();
2062    config.host(addr.ip().to_string());
2063    config.port(addr.port());
2064    config.user(user.unwrap_or("materialize"));
2065    if let Some(password) = password {
2066        config.password(password);
2067    }
2068    let (client, connection) = config.connect(NoTls).await?;
2069
2070    task::spawn(|| "sqllogictest_connect", async move {
2071        if let Err(e) = connection.await {
2072            eprintln!("connection error: {}", e);
2073        }
2074    });
2075    Ok(client)
2076}
2077
/// Output sink used for the runner's `stdout` and `stderr` (see `RunConfig`).
///
/// Providing a `write_fmt` method lets `write!`/`writeln!` target a `&dyn WriteFmt`
/// directly. It takes `&self`, so a shared reference is enough to write. It returns `()`,
/// so write errors cannot be reported back to the caller.
pub trait WriteFmt {
    /// Writes already-formatted arguments to the sink.
    fn write_fmt(&self, fmt: fmt::Arguments<'_>);
}
2081
/// Options controlling how sqllogictest files are run and how results are reported.
pub struct RunConfig<'a> {
    /// Sink for progress output: the `--- <file>` header, echoed records, and
    /// warning/failure reports.
    pub stdout: &'a dyn WriteFmt,
    pub stderr: &'a dyn WriteFmt,
    /// Echo every record before it runs, so the culprit is known if running it panics.
    /// Unless `quiet` is set, also print the outcome text of warnings, not just failures.
    pub verbose: bool,
    /// Suppress the per-record warning/failure reports printed while a file runs.
    pub quiet: bool,
    pub postgres_url: String,
    pub prefix: String,
    pub no_fail: bool,
    /// Stop processing a file after its first failing record.
    pub fail_fast: bool,
    pub auto_index_tables: bool,
    /// NOTE(review): presumably feeds the runner's `auto_index_selects`, which enables the
    /// indexed-view consistency check for `SELECT` queries. Confirm where the runner is
    /// constructed.
    pub auto_index_selects: bool,
    /// NOTE(review): presumably feeds the runner's `auto_transactions`, which wraps queries
    /// expected to succeed in an implicit transaction. Confirm where the runner is
    /// constructed.
    pub auto_transactions: bool,
    pub enable_table_keys: bool,
    pub orchestrator_process_wrapper: Option<String>,
    pub tracing: TracingCliArgs,
    pub tracing_handle: TracingHandle,
    pub system_parameter_defaults: BTreeMap<String, String>,
    /// Persist state is handled specially because:
    /// - Persist background workers do not necessarily shut down immediately once the server is
    ///   shut down, and may panic if their storage is deleted out from under them.
    /// - It's safe for different databases to reference the same state: all data is scoped by UUID.
    pub persist_dir: TempDir,
    pub replicas: usize,
    pub replica_size: String,
}
2107
/// Number of spaces by which echoed records, SQL statements and outcome reports are
/// indented in the runner's output.
const PRINT_INDENT: usize = 4;
2110
2111fn print_record(config: &RunConfig<'_>, record: &Record) {
2112    match record {
2113        Record::Statement { sql, .. } | Record::Query { sql, .. } => {
2114            print_sql(config.stdout, sql, None)
2115        }
2116        Record::Simple { conn, sql, .. } => print_sql(config.stdout, sql, *conn),
2117        Record::Copy {
2118            table_name,
2119            tsv_path,
2120        } => {
2121            writeln!(
2122                config.stdout,
2123                "{}slt copy {} from {}",
2124                " ".repeat(PRINT_INDENT),
2125                table_name,
2126                tsv_path
2127            )
2128        }
2129        Record::ResetServer => {
2130            writeln!(config.stdout, "{}reset-server", " ".repeat(PRINT_INDENT))
2131        }
2132        Record::Halt => {
2133            writeln!(config.stdout, "{}halt", " ".repeat(PRINT_INDENT))
2134        }
2135        Record::HashThreshold { threshold } => {
2136            writeln!(
2137                config.stdout,
2138                "{}hash-threshold {}",
2139                " ".repeat(PRINT_INDENT),
2140                threshold
2141            )
2142        }
2143        Record::Replace {
2144            pattern,
2145            replacement,
2146        } => {
2147            writeln!(
2148                config.stdout,
2149                "{}replace {}  {}",
2150                " ".repeat(PRINT_INDENT),
2151                pattern,
2152                replacement
2153            )
2154        }
2155    }
2156}
2157
2158fn print_sql_if<'a>(stdout: &'a dyn WriteFmt, sql: &str, cond: bool) {
2159    if cond {
2160        print_sql(stdout, sql, None)
2161    }
2162}
2163
2164fn print_sql<'a>(stdout: &'a dyn WriteFmt, sql: &str, conn: Option<&str>) {
2165    let text = if let Some(conn) = conn {
2166        format!("[conn={}] {}", conn, sql)
2167    } else {
2168        sql.to_string()
2169    };
2170    writeln!(stdout, "{}", util::indent(&text, PRINT_INDENT))
2171}
2172
/// Error-message patterns that turn an inconsistent view outcome into a warning.
///
/// A view outcome is only checked when the one-shot query succeeded. If the same query
/// run through an indexed view then fails with a `PlanFailure`, `should_warn` matches
/// these patterns (unanchored, so anywhere in the text) against the failure's full cause
/// chain. On any match, the inconsistency is reported as a warning rather than an error.
const INCONSISTENT_VIEW_OUTCOME_WARNING_REGEXPS: [&str; 9] = [
    // The following are unfixable errors in indexed views given our
    // current constraints.
    "cannot materialize call to",
    "SHOW commands are not allowed in views",
    "cannot create view with unstable dependencies",
    "cannot use wildcard expansions or NATURAL JOINs in a view that depends on system objects",
    "no valid schema selected",
    r#"system schema '\w+' cannot be modified"#,
    r#"permission denied for (SCHEMA|CLUSTER) "(\w+\.)?\w+""#,
    // NOTE(vmarcos): Column ambiguity that could not be eliminated by our
    // currently implemented syntactic rewrites is considered unfixable.
    // In addition, if some column cannot be dealt with, e.g., in `ORDER BY`
    // references, we treat this condition as unfixable as well.
    r#"column "[\w\?]+" specified more than once"#,
    r#"column "(\w+\.)?\w+" does not exist"#,
];
2192
2193/// Evaluates if the given outcome should be returned directly or if it should
2194/// be wrapped as a warning. Note that this function should be used for outcomes
2195/// that can be judged in a context-independent manner, i.e., the outcome itself
2196/// provides enough information as to whether a warning should be emitted or not.
2197fn should_warn(outcome: &Outcome) -> bool {
2198    match outcome {
2199        Outcome::InconsistentViewOutcome { view_outcome, .. } => match view_outcome.as_ref() {
2200            Outcome::PlanFailure { error, .. } => {
2201                INCONSISTENT_VIEW_OUTCOME_WARNING_REGEXPS.iter().any(|s| {
2202                    Regex::new(s)
2203                        .expect("unexpected error in regular expression parsing")
2204                        .is_match(&error.to_string_with_causes())
2205                })
2206            }
2207            _ => false,
2208        },
2209        _ => false,
2210    }
2211}
2212
2213pub async fn run_string(
2214    runner: &mut Runner<'_>,
2215    source: &str,
2216    input: &str,
2217) -> Result<Outcomes, anyhow::Error> {
2218    runner.reset_database().await?;
2219    // `replace` directives are scoped to a single file; clear any registered by
2220    // a previous file so they do not leak into this one.
2221    runner.replacements.clear();
2222
2223    let mut outcomes = Outcomes::default();
2224    let mut parser = crate::parser::Parser::new(source, input);
2225    // Transactions are currently relatively slow. Since sqllogictest runs in a single connection
2226    // there should be no difference in having longer running transactions.
2227    let mut in_transaction = false;
2228    writeln!(runner.config.stdout, "--- {}", source);
2229
2230    for record in parser.parse_records()? {
2231        // In maximal-verbose mode, print the query before attempting to run
2232        // it. Running the query might panic, so it is important to print out
2233        // what query we are trying to run *before* we panic.
2234        if runner.config.verbose {
2235            print_record(runner.config, &record);
2236        }
2237
2238        let outcome = runner
2239            .run_record(&record, &mut in_transaction)
2240            .await
2241            .map_err(|err| format!("In {}:\n{}", source, err))
2242            .unwrap();
2243
2244        // Print warnings and failures in verbose mode.
2245        if !runner.config.quiet && !outcome.success() {
2246            if !runner.config.verbose {
2247                // If `verbose` is enabled, we'll already have printed the record,
2248                // so don't print it again. Yes, this is an ugly bit of logic.
2249                // Please don't try to consolidate it with the `print_record`
2250                // call above, as it's important to have a mode in which records
2251                // are printed before they are run, so that if running the
2252                // record panics, you can tell which record caused it.
2253                if !outcome.failure() {
2254                    writeln!(
2255                        runner.config.stdout,
2256                        "{}",
2257                        util::indent("Warning detected for: ", 4)
2258                    );
2259                }
2260                print_record(runner.config, &record);
2261            }
2262            if runner.config.verbose || outcome.failure() {
2263                writeln!(
2264                    runner.config.stdout,
2265                    "{}",
2266                    util::indent(&outcome.to_string(), 4)
2267                );
2268                writeln!(runner.config.stdout, "{}", util::indent("----", 4));
2269            }
2270        }
2271
2272        outcomes.stats[outcome.code()] += 1;
2273        if outcome.failure() {
2274            outcomes.details.push(format!("{}", outcome));
2275        }
2276
2277        if let Outcome::Bail { .. } = outcome {
2278            break;
2279        }
2280
2281        if runner.config.fail_fast && outcome.failure() {
2282            break;
2283        }
2284    }
2285    Ok(outcomes)
2286}
2287
2288pub async fn run_file(runner: &mut Runner<'_>, filename: &Path) -> Result<Outcomes, anyhow::Error> {
2289    let mut input = String::new();
2290    File::open(filename)?.read_to_string(&mut input)?;
2291    let outcomes = run_string(runner, &format!("{}", filename.display()), &input).await?;
2292    runner.check_catalog().await?;
2293
2294    Ok(outcomes)
2295}
2296
/// Re-runs every record in the `.slt` file at `filename` and rewrites the file
/// in place so that its expectations match what was actually observed.
///
/// The database is reset first, and any `replace` directives registered by a
/// previously processed file are discarded. Each record is then run and,
/// depending on the record/outcome pair:
///
/// * a values query with an output failure or wrong column names gets its
///   expected output block replaced by the actual results (the column-name
///   line uses the record's own names on an output failure and the actual
///   names on a column-name mismatch);
/// * a hashed query with an output failure gets a fresh
///   `N values hashing to MD5` line;
/// * a simple record with an output failure gets the actual output lines;
/// * a query whose expected output is an error (`output: Err(..)`), or a
///   statement with an `expected_error`, gets that error text replaced by the
///   actual error message, when the outcome carries one;
/// * a successful record is left as is.
///
/// The rewritten contents are written back only after every record has been
/// processed, so an early error leaves the file untouched.
///
/// # Errors
///
/// Returns an error if the file cannot be opened, read, or written, if parsing
/// the records fails, if running a record fails, or if a record produces an
/// outcome that cannot be rewritten automatically.
pub async fn rewrite_file(runner: &mut Runner<'_>, filename: &Path) -> Result<(), anyhow::Error> {
    runner.reset_database().await?;
    // `replace` directives are scoped to a single file; clear any registered by
    // a previous file so they do not leak into this one.
    runner.replacements.clear();

    // Opened read+write: the same handle is later truncated and rewritten.
    let mut file = OpenOptions::new().read(true).write(true).open(filename)?;

    let mut input = String::new();
    file.read_to_string(&mut input)?;

    // The buffer copies untouched spans of `input` verbatim and splices in
    // replacements for the spans being rewritten.
    let mut buf = RewriteBuffer::new(&input);

    let mut parser = crate::parser::Parser::new(filename.to_str().unwrap_or(""), &input);
    writeln!(runner.config.stdout, "--- {}", filename.display());
    let mut in_transaction = false;

    /// Replaces `expected_output` (a subslice of `input`) in `buf` with
    /// `actual_output`, preceded by the header written by
    /// `RewriteBuffer::append_header`.
    ///
    /// `actual_output` is a flat list of values that is split into rows of
    /// `types.len()` values each. Unless `multiline` is set, newlines inside
    /// values are rendered as `⏎`.
    // NOTE(review): `chunks` panics when `types` is empty; presumably every
    // values query declares at least one column type — confirm in the parser.
    fn append_values_output(
        buf: &mut RewriteBuffer,
        input: &String,
        expected_output: &str,
        mode: &Mode,
        types: &Vec<Type>,
        column_names: Option<&Vec<ColumnName>>,
        actual_output: &Vec<String>,
        multiline: bool,
    ) {
        buf.append_header(input, expected_output, column_names);

        for (i, row) in actual_output.chunks(types.len()).enumerate() {
            match mode {
                // In Cockroach mode, output each row on its own line, with
                // two spaces between each column.
                Mode::Cockroach => {
                    if i != 0 {
                        buf.append("\n");
                    }

                    if row.len() == 0 {
                        // nothing to do
                    } else if row.len() == 1 {
                        // If there is only one column, then there is no need for space
                        // substitution, so we only do newline substitution.
                        if multiline {
                            buf.append(&row[0]);
                        } else {
                            buf.append(&row[0].replace('\n', "⏎"))
                        }
                    } else {
                        // Substitute spaces with ␠ to avoid mistaking the spaces in the result
                        // values with spaces that separate columns.
                        buf.append(
                            &row.iter()
                                .map(|col| {
                                    let mut col = col.replace(' ', "␠");
                                    if !multiline {
                                        col = col.replace('\n', "⏎");
                                    }
                                    col
                                })
                                .join("  "),
                        );
                    }
                }
                // In standard mode, output each value on its own line,
                // and ignore row boundaries.
                // No need to substitute spaces, because every value (not row) is on a separate
                // line. But we do need to substitute newlines.
                Mode::Standard => {
                    for (j, col) in row.iter().enumerate() {
                        if i != 0 || j != 0 {
                            buf.append("\n");
                        }
                        buf.append(&if multiline {
                            col.clone()
                        } else {
                            col.replace('\n', "⏎")
                        });
                    }
                }
            }
        }
    }

    for record in parser.parse_records()? {
        let outcome = runner.run_record(&record, &mut in_transaction).await?;

        match (&record, &outcome) {
            // If we see an output failure for a query, rewrite the expected output
            // to match the observed output.
            (
                Record::Query {
                    output:
                        Ok(QueryOutput {
                            mode,
                            output: Output::Values(_),
                            output_str: expected_output,
                            types,
                            column_names,
                            multiline,
                            ..
                        }),
                    ..
                },
                Outcome::OutputFailure {
                    actual_output: Output::Values(actual_output),
                    ..
                },
            ) => {
                append_values_output(
                    &mut buf,
                    &input,
                    expected_output,
                    mode,
                    types,
                    column_names.as_ref(),
                    actual_output,
                    *multiline,
                );
            }
            // Wrong column names: rewrite the output block using the column
            // names that were actually observed.
            (
                Record::Query {
                    output:
                        Ok(QueryOutput {
                            mode,
                            output: Output::Values(_),
                            output_str: expected_output,
                            types,
                            multiline,
                            ..
                        }),
                    ..
                },
                Outcome::WrongColumnNames {
                    actual_column_names,
                    actual_output: Output::Values(actual_output),
                    ..
                },
            ) => {
                append_values_output(
                    &mut buf,
                    &input,
                    expected_output,
                    mode,
                    types,
                    Some(actual_column_names),
                    actual_output,
                    *multiline,
                );
            }
            // Hashed results are rewritten as a single summary line.
            (
                Record::Query {
                    output:
                        Ok(QueryOutput {
                            output: Output::Hashed { .. },
                            output_str: expected_output,
                            column_names,
                            ..
                        }),
                    ..
                },
                Outcome::OutputFailure {
                    actual_output: Output::Hashed { num_values, md5 },
                    ..
                },
            ) => {
                buf.append_header(&input, expected_output, column_names.as_ref());

                buf.append(format!("{} values hashing to {}\n", num_values, md5).as_str())
            }
            // Simple records: write each actual output line verbatim.
            (
                Record::Simple {
                    output_str: expected_output,
                    ..
                },
                Outcome::OutputFailure {
                    actual_output: Output::Values(actual_output),
                    ..
                },
            ) => {
                buf.append_header(&input, expected_output, None);

                for (i, row) in actual_output.iter().enumerate() {
                    if i != 0 {
                        buf.append("\n");
                    }
                    buf.append(row);
                }
            }
            // Expected-error records: swap in the actual error message. The
            // guard ensures `err_msg()` is `Some`, so the `unwrap` cannot fail.
            (
                Record::Query {
                    sql,
                    output: Err(err),
                    ..
                },
                outcome,
            )
            | (
                Record::Statement {
                    expected_error: Some(err),
                    sql,
                    ..
                },
                outcome,
            ) if outcome.err_msg().is_some() => {
                buf.rewrite_expected_error(&input, err, &outcome.err_msg().unwrap(), sql)
            }
            (_, Outcome::Success) => {}
            // Anything else cannot be fixed mechanically; abort before the
            // file is modified.
            _ => bail!("unexpected: {:?} {:?}", record, outcome),
        }
    }

    // Every record has been processed: replace the file's contents with the
    // rewritten text (truncate, rewind, write, and flush to disk).
    file.set_len(0)?;
    file.seek(SeekFrom::Start(0))?;
    file.write_all(buf.finish().as_bytes())?;
    file.sync_all()?;
    Ok(())
}
2515
2516/// Provides a means to rewrite the `.slt` file while iterating over it.
2517///
2518/// This struct takes the slt file as its `input`, tracks a cursor into it
2519/// (`input_offset`), and provides a buffer (`output`) to store the rewritten
2520/// results.
2521///
2522/// Functions that modify the file will lazily move `input` into `output` using
2523/// `flush_to`. However, those calls should all be interior to other functions.
2524#[derive(Debug)]
2525struct RewriteBuffer<'a> {
2526    input: &'a str,
2527    input_offset: usize,
2528    output: String,
2529}
2530
2531impl<'a> RewriteBuffer<'a> {
2532    fn new(input: &'a str) -> RewriteBuffer<'a> {
2533        RewriteBuffer {
2534            input,
2535            input_offset: 0,
2536            output: String::new(),
2537        }
2538    }
2539
2540    fn flush_to(&mut self, offset: usize) {
2541        assert!(offset >= self.input_offset);
2542        let chunk = &self.input[self.input_offset..offset];
2543        self.output.push_str(chunk);
2544        self.input_offset = offset;
2545    }
2546
2547    fn skip_to(&mut self, offset: usize) {
2548        assert!(offset >= self.input_offset);
2549        self.input_offset = offset;
2550    }
2551
2552    fn append(&mut self, s: &str) {
2553        self.output.push_str(s);
2554    }
2555
2556    fn append_header(
2557        &mut self,
2558        input: &String,
2559        expected_output: &str,
2560        column_names: Option<&Vec<ColumnName>>,
2561    ) {
2562        // Output everything before this record.
2563        // TODO(benesch): is it possible to rewrite this to avoid `as`?
2564        #[allow(clippy::as_conversions)]
2565        let offset = expected_output.as_ptr() as usize - input.as_ptr() as usize;
2566        self.flush_to(offset);
2567        self.skip_to(offset + expected_output.len());
2568
2569        // Attempt to install the result separator (----), if it does
2570        // not already exist.
2571        if self.peek_last(5) == "\n----" {
2572            self.append("\n");
2573        } else if self.peek_last(6) != "\n----\n" {
2574            self.append("\n----\n");
2575        }
2576
2577        let Some(names) = column_names else {
2578            return;
2579        };
2580        self.append(
2581            &names
2582                .iter()
2583                .map(|name| name.replace(' ', "␠"))
2584                .collect::<Vec<_>>()
2585                .join(" "),
2586        );
2587        self.append("\n");
2588    }
2589
2590    fn rewrite_expected_error(
2591        &mut self,
2592        input: &String,
2593        old_err: &str,
2594        new_err: &str,
2595        query: &str,
2596    ) {
2597        // Output everything before this error message.
2598        // TODO(benesch): is it possible to rewrite this to avoid `as`?
2599        #[allow(clippy::as_conversions)]
2600        let err_offset = old_err.as_ptr() as usize - input.as_ptr() as usize;
2601        self.flush_to(err_offset);
2602        self.append(new_err);
2603        self.append("\n");
2604        self.append(query);
2605        // TODO(benesch): is it possible to rewrite this to avoid `as`?
2606        #[allow(clippy::as_conversions)]
2607        self.skip_to(query.as_ptr() as usize - input.as_ptr() as usize + query.len())
2608    }
2609
2610    fn peek_last(&self, n: usize) -> &str {
2611        &self.output[self.output.len() - n..]
2612    }
2613
2614    fn finish(mut self) -> String {
2615        self.flush_to(self.input.len());
2616        self.output
2617    }
2618}
2619
2620/// Generates view creation, view indexing, view querying, and view
2621/// dropping SQL commands for a given `SELECT` query. If the number
2622/// of attributes produced by the query is known, the view commands
2623/// are specialized to avoid issues with column ambiguity. This
2624/// function is a helper for `--auto_index_selects` and assumes that
2625/// the provided input SQL has already been run through the parser,
2626/// resulting in a valid `SELECT` statement.
2627fn generate_view_sql(
2628    sql: &str,
2629    view_uuid: &Simple,
2630    num_attributes: Option<usize>,
2631    expected_column_names: Option<Vec<ColumnName>>,
2632) -> (String, String, String, String) {
2633    // To create the view, re-parse the sql; note that we must find exactly
2634    // one statement and it must be a `SELECT`.
2635    // NOTE(vmarcos): Direct string manipulation was attempted while
2636    // prototyping the code below, which avoids the extra parsing and
2637    // data structure cloning. However, running DDL is so slow that
2638    // it did not matter in terms of runtime. We can revisit this if
2639    // DDL cost drops dramatically in the future.
2640    let stmts = parser::parse_statements(sql).unwrap_or_default();
2641    assert!(stmts.len() == 1);
2642    let (query, query_as_of) = match &stmts[0].ast {
2643        Statement::Select(stmt) => (&stmt.query, &stmt.as_of),
2644        _ => unreachable!("This function should only be called for SELECTs"),
2645    };
2646
2647    // Prior to creating the view, process the `ORDER BY` clause of
2648    // the `SELECT` query, if any. Ordering is not preserved when a
2649    // view includes an `ORDER BY` clause and must be re-enforced by
2650    // an external `ORDER BY` clause when querying the view.
2651    let (view_order_by, extra_columns, distinct) = if num_attributes.is_none() {
2652        (query.order_by.clone(), vec![], None)
2653    } else {
2654        derive_order_by(&query.body, &query.order_by)
2655    };
2656
2657    // Since one-shot SELECT statements may contain ambiguous column names,
2658    // we either use the expected column names, if that option was
2659    // provided, or else just rename the output schema of the view
2660    // using numerically increasing attribute names, whenever possible.
2661    // This strategy makes it possible to use `CREATE INDEX`, thus
2662    // matching the behavior of the option `auto_index_tables`. However,
2663    // we may be presented with a `SELECT *` query, in which case the parser
2664    // does not produce sufficient information to allow us to compute
2665    // the number of output columns. In the latter case, we are supplied
2666    // with `None` for `num_attributes` and just employ the command
2667    // `CREATE DEFAULT INDEX` instead. Additionally, the view is created
2668    // without schema renaming. This strategy is insufficient to dodge
2669    // column name ambiguity in all cases, but we assume here that we
2670    // can adjust the (hopefully) small number of tests that eventually
2671    // challenge us in this particular way.
2672    let name = UnresolvedItemName(vec![Ident::new_unchecked(format!("v{}", view_uuid))]);
2673    let projection = expected_column_names.map_or_else(
2674        || {
2675            num_attributes.map_or(vec![], |n| {
2676                (1..=n)
2677                    .map(|i| Ident::new_unchecked(format!("a{i}")))
2678                    .collect()
2679            })
2680        },
2681        |cols| {
2682            cols.iter()
2683                .map(|c| Ident::new_unchecked(c.as_str()))
2684                .collect()
2685        },
2686    );
2687    let columns: Vec<Ident> = projection
2688        .iter()
2689        .cloned()
2690        .chain(extra_columns.iter().map(|item| {
2691            if let SelectItem::Expr {
2692                expr: _,
2693                alias: Some(ident),
2694            } = item
2695            {
2696                ident.clone()
2697            } else {
2698                unreachable!("alias must be given for extra column")
2699            }
2700        }))
2701        .collect();
2702
2703    // Build a `CREATE VIEW` with the columns computed above.
2704    let mut query = query.clone();
2705    if extra_columns.len() > 0 {
2706        match &mut query.body {
2707            SetExpr::Select(stmt) => stmt.projection.extend(extra_columns.iter().cloned()),
2708            _ => unimplemented!("cannot yet rewrite projections of nested queries"),
2709        }
2710    }
2711    let create_view = AstStatement::<Raw>::CreateView(CreateViewStatement {
2712        if_exists: IfExistsBehavior::Error,
2713        temporary: false,
2714        definition: ViewDefinition {
2715            name: name.clone(),
2716            columns: columns.clone(),
2717            query,
2718        },
2719    })
2720    .to_ast_string_stable();
2721
2722    // We then create either a `CREATE INDEX` or a `CREATE DEFAULT INDEX`
2723    // statement, depending on whether we could obtain the number of
2724    // attributes from the original `SELECT`.
2725    let create_index = AstStatement::<Raw>::CreateIndex(CreateIndexStatement {
2726        name: None,
2727        in_cluster: None,
2728        on_name: RawItemName::Name(name.clone()),
2729        key_parts: if columns.len() == 0 {
2730            None
2731        } else {
2732            Some(
2733                columns
2734                    .iter()
2735                    .map(|ident| Expr::Identifier(vec![ident.clone()]))
2736                    .collect(),
2737            )
2738        },
2739        with_options: Vec::new(),
2740        if_not_exists: false,
2741    })
2742    .to_ast_string_stable();
2743
2744    // Assert if DISTINCT semantics are unchanged from view
2745    let distinct_unneeded = extra_columns.len() == 0
2746        || match distinct {
2747            None | Some(Distinct::On(_)) => true,
2748            Some(Distinct::EntireRow) => false,
2749        };
2750    let distinct = if distinct_unneeded { None } else { distinct };
2751
2752    // `SELECT [* | {projection}] FROM {name} [ORDER BY {view_order_by}]`
2753    let view_sql = AstStatement::<Raw>::Select(SelectStatement {
2754        query: Query {
2755            ctes: CteBlock::Simple(vec![]),
2756            body: SetExpr::Select(Box::new(Select {
2757                distinct,
2758                projection: if projection.len() == 0 {
2759                    vec![SelectItem::Wildcard]
2760                } else {
2761                    projection
2762                        .iter()
2763                        .map(|ident| SelectItem::Expr {
2764                            expr: Expr::Identifier(vec![ident.clone()]),
2765                            alias: None,
2766                        })
2767                        .collect()
2768                },
2769                from: vec![TableWithJoins {
2770                    relation: TableFactor::Table {
2771                        name: RawItemName::Name(name.clone()),
2772                        alias: None,
2773                    },
2774                    joins: vec![],
2775                }],
2776                selection: None,
2777                group_by: vec![],
2778                having: None,
2779                qualify: None,
2780                options: vec![],
2781            })),
2782            order_by: view_order_by,
2783            limit: None,
2784            offset: None,
2785        },
2786        as_of: query_as_of.clone(),
2787    })
2788    .to_ast_string_stable();
2789
2790    // `DROP VIEW {name}`
2791    let drop_view = AstStatement::<Raw>::DropObjects(DropObjectsStatement {
2792        object_type: ObjectType::View,
2793        if_exists: false,
2794        names: vec![UnresolvedObjectName::Item(name)],
2795        cascade: false,
2796    })
2797    .to_ast_string_stable();
2798
2799    (create_view, create_index, view_sql, drop_view)
2800}
2801
2802/// Analyzes the provided query `body` to derive the number of
2803/// attributes in the query. We only consider syntactic cues,
2804/// so we may end up deriving `None` for the number of attributes
2805/// as a conservative approximation.
2806fn derive_num_attributes(body: &SetExpr<Raw>) -> Option<usize> {
2807    let Some((projection, _)) = find_projection(body) else {
2808        return None;
2809    };
2810    derive_num_attributes_from_projection(projection)
2811}
2812
2813/// Analyzes a query's `ORDER BY` clause to derive an `ORDER BY`
2814/// clause that makes numeric references to any expressions in
2815/// the projection and generated-attribute references to expressions
2816/// that need to be added as extra columns to the projection list.
2817/// The rewritten `ORDER BY` clause is then usable when querying a
2818/// view that contains the same `SELECT` as the given query.
2819/// This function returns both the rewritten `ORDER BY` clause
2820/// as well as a list of extra columns that need to be added
2821/// to the query's projection for the `ORDER BY` clause to
2822/// succeed.
2823fn derive_order_by(
2824    body: &SetExpr<Raw>,
2825    order_by: &Vec<OrderByExpr<Raw>>,
2826) -> (
2827    Vec<OrderByExpr<Raw>>,
2828    Vec<SelectItem<Raw>>,
2829    Option<Distinct<Raw>>,
2830) {
2831    let Some((projection, distinct)) = find_projection(body) else {
2832        return (vec![], vec![], None);
2833    };
2834    let (view_order_by, extra_columns) = derive_order_by_from_projection(projection, order_by);
2835    (view_order_by, extra_columns, distinct.clone())
2836}
2837
2838/// Finds the projection list in a `SELECT` query body.
2839fn find_projection(body: &SetExpr<Raw>) -> Option<(&Vec<SelectItem<Raw>>, &Option<Distinct<Raw>>)> {
2840    // Iterate to peel off the query body until the query's
2841    // projection list is found.
2842    let mut set_expr = body;
2843    loop {
2844        match set_expr {
2845            SetExpr::Select(select) => {
2846                return Some((&select.projection, &select.distinct));
2847            }
2848            SetExpr::SetOperation { left, .. } => set_expr = left.as_ref(),
2849            SetExpr::Query(query) => set_expr = &query.body,
2850            _ => return None,
2851        }
2852    }
2853}
2854
2855/// Computes the number of attributes that are obtained by the
2856/// projection of a `SELECT` query. The projection may include
2857/// wildcards, in which case the analysis just returns `None`.
2858fn derive_num_attributes_from_projection(projection: &Vec<SelectItem<Raw>>) -> Option<usize> {
2859    let mut num_attributes = 0usize;
2860    for item in projection.iter() {
2861        let SelectItem::Expr { expr, .. } = item else {
2862            return None;
2863        };
2864        match expr {
2865            Expr::QualifiedWildcard(..) | Expr::WildcardAccess(..) => {
2866                return None;
2867            }
2868            _ => {
2869                num_attributes += 1;
2870            }
2871        }
2872    }
2873    Some(num_attributes)
2874}
2875
2876/// Computes an `ORDER BY` clause with only numeric references
2877/// from given projection and `ORDER BY` of a `SELECT` query.
2878/// If the derivation fails to match a given expression, the
2879/// matched prefix is returned. Note that this could be empty.
2880fn derive_order_by_from_projection(
2881    projection: &Vec<SelectItem<Raw>>,
2882    order_by: &Vec<OrderByExpr<Raw>>,
2883) -> (Vec<OrderByExpr<Raw>>, Vec<SelectItem<Raw>>) {
2884    let mut view_order_by: Vec<OrderByExpr<Raw>> = vec![];
2885    let mut extra_columns: Vec<SelectItem<Raw>> = vec![];
2886    for order_by_expr in order_by.iter() {
2887        let query_expr = &order_by_expr.expr;
2888        let view_expr = match query_expr {
2889            Expr::Value(mz_sql_parser::ast::Value::Number(_)) => query_expr.clone(),
2890            _ => {
2891                // Find expression in query projection, if we can.
2892                if let Some(i) = projection.iter().position(|item| match item {
2893                    SelectItem::Expr { expr, alias } => {
2894                        expr == query_expr
2895                            || match query_expr {
2896                                Expr::Identifier(ident) => {
2897                                    ident.len() == 1 && Some(&ident[0]) == alias.as_ref()
2898                                }
2899                                _ => false,
2900                            }
2901                    }
2902                    SelectItem::Wildcard => false,
2903                }) {
2904                    Expr::Value(mz_sql_parser::ast::Value::Number((i + 1).to_string()))
2905                } else {
2906                    // If the expression is not found in the
2907                    // projection, add extra column.
2908                    let ident = Ident::new_unchecked(format!(
2909                        "a{}",
2910                        (projection.len() + extra_columns.len() + 1)
2911                    ));
2912                    extra_columns.push(SelectItem::Expr {
2913                        expr: query_expr.clone(),
2914                        alias: Some(ident.clone()),
2915                    });
2916                    Expr::Identifier(vec![ident])
2917                }
2918            }
2919        };
2920        view_order_by.push(OrderByExpr {
2921            expr: view_expr,
2922            asc: order_by_expr.asc,
2923            nulls_last: order_by_expr.nulls_last,
2924        });
2925    }
2926    (view_order_by, extra_columns)
2927}
2928
2929/// Returns extra statements to execute after `stmt` is executed.
2930fn mutate(sql: &str) -> Vec<String> {
2931    let stmts = parser::parse_statements(sql).unwrap_or_default();
2932    let mut additional = Vec::new();
2933    for stmt in stmts {
2934        match stmt.ast {
2935            AstStatement::CreateTable(stmt) => additional.push(
2936                // CREATE TABLE -> CREATE INDEX. Specify all columns manually in case CREATE
2937                // DEFAULT INDEX ever goes away.
2938                AstStatement::<Raw>::CreateIndex(CreateIndexStatement {
2939                    name: None,
2940                    in_cluster: None,
2941                    on_name: RawItemName::Name(stmt.name.clone()),
2942                    key_parts: Some(
2943                        stmt.columns
2944                            .iter()
2945                            .map(|def| Expr::Identifier(vec![def.name.clone()]))
2946                            .collect(),
2947                    ),
2948                    with_options: Vec::new(),
2949                    if_not_exists: false,
2950                })
2951                .to_ast_string_stable(),
2952            ),
2953            _ => {}
2954        }
2955    }
2956    additional
2957}
2958
2959#[mz_ore::test]
2960#[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `rust_psm_stack_pointer` on OS `linux`
2961fn test_generate_view_sql() {
2962    let uuid = Uuid::parse_str("67e5504410b1426f9247bb680e5fe0c8").unwrap();
2963    let cases = vec![
2964        (("SELECT * FROM t", None, None),
2965        (
2966            r#"CREATE VIEW "v67e5504410b1426f9247bb680e5fe0c8" AS SELECT * FROM "t""#.to_string(),
2967            r#"CREATE DEFAULT INDEX ON "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
2968            r#"SELECT * FROM "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
2969            r#"DROP VIEW "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
2970        )),
2971        (("SELECT a, b, c FROM t1, t2", Some(3), Some(vec![ColumnName::from("a"), ColumnName::from("b"), ColumnName::from("c")])),
2972        (
2973            r#"CREATE VIEW "v67e5504410b1426f9247bb680e5fe0c8" ("a", "b", "c") AS SELECT "a", "b", "c" FROM "t1", "t2""#.to_string(),
2974            r#"CREATE INDEX ON "v67e5504410b1426f9247bb680e5fe0c8" ("a", "b", "c")"#.to_string(),
2975            r#"SELECT "a", "b", "c" FROM "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
2976            r#"DROP VIEW "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
2977        )),
2978        (("SELECT a, b, c FROM t1, t2", Some(3), None),
2979        (
2980            r#"CREATE VIEW "v67e5504410b1426f9247bb680e5fe0c8" ("a1", "a2", "a3") AS SELECT "a", "b", "c" FROM "t1", "t2""#.to_string(),
2981            r#"CREATE INDEX ON "v67e5504410b1426f9247bb680e5fe0c8" ("a1", "a2", "a3")"#.to_string(),
2982            r#"SELECT "a1", "a2", "a3" FROM "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
2983            r#"DROP VIEW "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
2984        )),
2985        // A case with ambiguity that is accepted by the function, illustrating that
2986        // our measures to dodge this issue are imperfect.
2987        (("SELECT * FROM (SELECT a, sum(b) AS a FROM t GROUP BY a)", None, None),
2988        (
2989            r#"CREATE VIEW "v67e5504410b1426f9247bb680e5fe0c8" AS SELECT * FROM (SELECT "a", "sum"("b") AS "a" FROM "t" GROUP BY "a")"#.to_string(),
2990            r#"CREATE DEFAULT INDEX ON "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
2991            r#"SELECT * FROM "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
2992            r#"DROP VIEW "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
2993        )),
2994        (("SELECT a, b, b + d AS c, a + b AS d FROM t1, t2 ORDER BY a, c, a + b", Some(4), Some(vec![ColumnName::from("a"), ColumnName::from("b"), ColumnName::from("c"), ColumnName::from("d")])),
2995        (
2996            r#"CREATE VIEW "v67e5504410b1426f9247bb680e5fe0c8" ("a", "b", "c", "d") AS SELECT "a", "b", "b" + "d" AS "c", "a" + "b" AS "d" FROM "t1", "t2" ORDER BY "a", "c", "a" + "b""#.to_string(),
2997            r#"CREATE INDEX ON "v67e5504410b1426f9247bb680e5fe0c8" ("a", "b", "c", "d")"#.to_string(),
2998            r#"SELECT "a", "b", "c", "d" FROM "v67e5504410b1426f9247bb680e5fe0c8" ORDER BY 1, 3, 4"#.to_string(),
2999            r#"DROP VIEW "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
3000        )),
3001        (("((SELECT 1 AS a UNION SELECT 2 AS b) UNION SELECT 3 AS c) ORDER BY a", Some(1), None),
3002        (
3003            r#"CREATE VIEW "v67e5504410b1426f9247bb680e5fe0c8" ("a1") AS (SELECT 1 AS "a" UNION SELECT 2 AS "b") UNION SELECT 3 AS "c" ORDER BY "a""#.to_string(),
3004            r#"CREATE INDEX ON "v67e5504410b1426f9247bb680e5fe0c8" ("a1")"#.to_string(),
3005            r#"SELECT "a1" FROM "v67e5504410b1426f9247bb680e5fe0c8" ORDER BY 1"#.to_string(),
3006            r#"DROP VIEW "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
3007        )),
3008        (("SELECT * FROM (SELECT a, sum(b) AS a FROM t GROUP BY a) ORDER BY 1", None, None),
3009        (
3010            r#"CREATE VIEW "v67e5504410b1426f9247bb680e5fe0c8" AS SELECT * FROM (SELECT "a", "sum"("b") AS "a" FROM "t" GROUP BY "a") ORDER BY 1"#.to_string(),
3011            r#"CREATE DEFAULT INDEX ON "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
3012            r#"SELECT * FROM "v67e5504410b1426f9247bb680e5fe0c8" ORDER BY 1"#.to_string(),
3013            r#"DROP VIEW "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
3014        )),
3015        (("SELECT * FROM (SELECT a, sum(b) AS a FROM t GROUP BY a) ORDER BY a", None, None),
3016        (
3017            r#"CREATE VIEW "v67e5504410b1426f9247bb680e5fe0c8" AS SELECT * FROM (SELECT "a", "sum"("b") AS "a" FROM "t" GROUP BY "a") ORDER BY "a""#.to_string(),
3018            r#"CREATE DEFAULT INDEX ON "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
3019            r#"SELECT * FROM "v67e5504410b1426f9247bb680e5fe0c8" ORDER BY "a""#.to_string(),
3020            r#"DROP VIEW "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
3021        )),
3022        (("SELECT a, sum(b) AS a FROM t GROUP BY a, c ORDER BY a, c", Some(2), None),
3023        (
3024            r#"CREATE VIEW "v67e5504410b1426f9247bb680e5fe0c8" ("a1", "a2", "a3") AS SELECT "a", "sum"("b") AS "a", "c" AS "a3" FROM "t" GROUP BY "a", "c" ORDER BY "a", "c""#.to_string(),
3025            r#"CREATE INDEX ON "v67e5504410b1426f9247bb680e5fe0c8" ("a1", "a2", "a3")"#.to_string(),
3026            r#"SELECT "a1", "a2" FROM "v67e5504410b1426f9247bb680e5fe0c8" ORDER BY 1, "a3""#.to_string(),
3027            r#"DROP VIEW "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
3028        )),
3029        (("SELECT a, sum(b) AS a FROM t GROUP BY a, c ORDER BY c, a", Some(2), None),
3030        (
3031            r#"CREATE VIEW "v67e5504410b1426f9247bb680e5fe0c8" ("a1", "a2", "a3") AS SELECT "a", "sum"("b") AS "a", "c" AS "a3" FROM "t" GROUP BY "a", "c" ORDER BY "c", "a""#.to_string(),
3032            r#"CREATE INDEX ON "v67e5504410b1426f9247bb680e5fe0c8" ("a1", "a2", "a3")"#.to_string(),
3033            r#"SELECT "a1", "a2" FROM "v67e5504410b1426f9247bb680e5fe0c8" ORDER BY "a3", 1"#.to_string(),
3034            r#"DROP VIEW "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
3035        )),
3036    ];
3037    for ((sql, num_attributes, expected_column_names), expected) in cases {
3038        let view_sql =
3039            generate_view_sql(sql, uuid.as_simple(), num_attributes, expected_column_names);
3040        assert_eq!(expected, view_sql);
3041    }
3042}
3043
3044#[mz_ore::test]
3045fn test_mutate() {
3046    let cases = vec![
3047        ("CREATE TABLE t ()", vec![r#"CREATE INDEX ON "t" ()"#]),
3048        (
3049            "CREATE TABLE t (a INT)",
3050            vec![r#"CREATE INDEX ON "t" ("a")"#],
3051        ),
3052        (
3053            "CREATE TABLE t (a INT, b TEXT)",
3054            vec![r#"CREATE INDEX ON "t" ("a", "b")"#],
3055        ),
3056        // Invalid syntax works, just returns nothing.
3057        ("BAD SYNTAX", Vec::new()),
3058    ];
3059    for (sql, expected) in cases {
3060        let stmts = mutate(sql);
3061        assert_eq!(expected, stmts, "sql: {sql}");
3062    }
3063}