mz_sqllogictest/
runner.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! The Materialize-specific runner for sqllogictest.
11//!
12//! slt tests expect a serialized execution of sql statements and queries.
13//! To get the same results in materialize we track current_timestamp and increment it whenever we execute a statement.
14//!
15//! The high-level workflow is:
16//!   for each record in the test file:
17//!     if record is a sql statement:
18//!       run sql in postgres, observe changes and copy them to materialize using LocalInput::Updates(..)
19//!       advance current_timestamp
20//!       promise to never send updates for times < current_timestamp using LocalInput::Watermark(..)
21//!       compare to expected results
22//!       if wrong, bail out and stop processing this file
23//!     if record is a sql query:
24//!       peek query at current_timestamp
25//!       compare to expected results
26//!       if wrong, record the error
27
28use std::collections::BTreeMap;
29use std::error::Error;
30use std::fs::{File, OpenOptions};
31use std::io::{Read, Seek, SeekFrom, Write};
32use std::net::{IpAddr, Ipv4Addr, SocketAddr};
33use std::path::Path;
34use std::sync::Arc;
35use std::sync::LazyLock;
36use std::time::Duration;
37use std::{env, fmt, ops, str, thread};
38
39use anyhow::{anyhow, bail};
40use bytes::BytesMut;
41use chrono::{DateTime, NaiveDateTime, NaiveTime, Utc};
42use fallible_iterator::FallibleIterator;
43use futures::sink::SinkExt;
44use itertools::Itertools;
45use maplit::btreemap;
46use md5::{Digest, Md5};
47use mz_adapter_types::bootstrap_builtin_cluster_config::{
48    ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR, BootstrapBuiltinClusterConfig,
49    CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR, PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR,
50    SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR, SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR,
51};
52use mz_catalog::config::ClusterReplicaSizeMap;
53use mz_controller::ControllerConfig;
54use mz_environmentd::CatalogConfig;
55use mz_license_keys::ValidatedLicenseKey;
56use mz_orchestrator_process::{ProcessOrchestrator, ProcessOrchestratorConfig};
57use mz_orchestrator_tracing::{TracingCliArgs, TracingOrchestrator};
58use mz_ore::cast::{CastFrom, ReinterpretCast};
59use mz_ore::channel::trigger;
60use mz_ore::error::ErrorExt;
61use mz_ore::metrics::MetricsRegistry;
62use mz_ore::now::SYSTEM_TIME;
63use mz_ore::retry::Retry;
64use mz_ore::task;
65use mz_ore::thread::{JoinHandleExt, JoinOnDropHandle};
66use mz_ore::tracing::TracingHandle;
67use mz_ore::url::SensitiveUrl;
68use mz_persist_client::PersistLocation;
69use mz_persist_client::cache::PersistClientCache;
70use mz_persist_client::cfg::PersistConfig;
71use mz_persist_client::rpc::{
72    MetricsSameProcessPubSubSender, PersistGrpcPubSubServer, PubSubClientConnection, PubSubSender,
73};
74use mz_pgrepr::{Interval, Jsonb, Numeric, UInt2, UInt4, UInt8, Value, oid};
75use mz_repr::ColumnName;
76use mz_repr::adt::date::Date;
77use mz_repr::adt::mz_acl_item::{AclItem, MzAclItem};
78use mz_repr::adt::numeric;
79use mz_secrets::SecretsController;
80use mz_server_core::listeners::{
81    AllowedRoles, AuthenticatorKind, BaseListenerConfig, HttpListenerConfig, HttpRoutesEnabled,
82    ListenersConfig, SqlListenerConfig,
83};
84use mz_sql::ast::{Expr, Raw, Statement};
85use mz_sql::catalog::EnvironmentId;
86use mz_sql_parser::ast::display::AstDisplay;
87use mz_sql_parser::ast::{
88    CreateIndexStatement, CreateViewStatement, CteBlock, Distinct, DropObjectsStatement, Ident,
89    IfExistsBehavior, ObjectType, OrderByExpr, Query, RawItemName, Select, SelectItem,
90    SelectStatement, SetExpr, Statement as AstStatement, TableFactor, TableWithJoins,
91    UnresolvedItemName, UnresolvedObjectName, ViewDefinition,
92};
93use mz_sql_parser::parser;
94use mz_storage_types::connections::ConnectionContext;
95use postgres_protocol::types;
96use regex::Regex;
97use tempfile::TempDir;
98use tokio::net::TcpListener;
99use tokio::runtime::Runtime;
100use tokio::sync::oneshot;
101use tokio_postgres::types::{FromSql, Kind as PgKind, Type as PgType};
102use tokio_postgres::{NoTls, Row, SimpleQueryMessage};
103use tokio_stream::wrappers::TcpListenerStream;
104use tower_http::cors::AllowOrigin;
105use tracing::{error, info};
106use uuid::Uuid;
107use uuid::fmt::Simple;
108
109use crate::ast::{Location, Mode, Output, QueryOutput, Record, Sort, Type};
110use crate::util;
111
/// The outcome of running a single sqllogictest record.
///
/// Every variant other than [`Outcome::Success`] records the [`Location`] it
/// applies to. Fields typed with the `'a` lifetime borrow the expected values
/// instead of owning them.
#[derive(Debug)]
pub enum Outcome<'a> {
    /// An error reported under the "Unsupported" label.
    Unsupported {
        error: anyhow::Error,
        location: Location,
    },
    /// An error reported under the "ParseFailure" label.
    ParseFailure {
        error: anyhow::Error,
        location: Location,
    },
    /// An error reported under the "PlanFailure" label.
    PlanFailure {
        error: anyhow::Error,
        location: Location,
    },
    /// An error matching `expected_error` was expected, but none occurred.
    UnexpectedPlanSuccess {
        expected_error: &'a str,
        location: Location,
    },
    /// The reported row count differs from the expected row count.
    WrongNumberOfRowsInserted {
        expected_count: u64,
        actual_count: u64,
        location: Location,
    },
    /// The number of result columns differs from the expected number.
    WrongColumnCount {
        expected_count: usize,
        actual_count: usize,
        location: Location,
    },
    /// The result column names differ from the expected column names.
    WrongColumnNames {
        expected_column_names: &'a Vec<ColumnName>,
        actual_column_names: Vec<ColumnName>,
        // Carried along with the mismatch but not printed by `Display`.
        actual_output: Output,
        location: Location,
    },
    /// The formatted output differs from the expected output. The raw rows
    /// are kept alongside the formatted output and are included when the
    /// outcome is displayed.
    OutputFailure {
        expected_output: &'a Output,
        actual_raw_output: Vec<Row>,
        actual_output: Output,
        location: Location,
    },
    /// Two outcomes for the same record disagree: `query_outcome` is
    /// displayed as "expected from query" and `view_outcome` as "actually
    /// from indexed view".
    InconsistentViewOutcome {
        query_outcome: Box<Outcome<'a>>,
        view_outcome: Box<Outcome<'a>>,
        location: Location,
    },
    /// Wraps the outcome (`cause`) that led to bailing out.
    Bail {
        cause: Box<Outcome<'a>>,
        location: Location,
    },
    /// Wraps an outcome (`cause`) that is reported as a warning; `failure()`
    /// returns `false` for this variant.
    Warning {
        cause: Box<Outcome<'a>>,
        location: Location,
    },
    /// The record behaved as expected.
    Success,
}
167
/// Number of [`Outcome`] variants; also the length of `Outcomes::stats`.
const NUM_OUTCOMES: usize = 12;
/// Index of the warning counter; equals the value `Outcome::code` returns for
/// `Outcome::Warning` (10).
const WARNING_OUTCOME: usize = NUM_OUTCOMES - 2;
/// Index of the success counter; equals the value `Outcome::code` returns for
/// `Outcome::Success` (11).
const SUCCESS_OUTCOME: usize = NUM_OUTCOMES - 1;
171
172impl<'a> Outcome<'a> {
173    fn code(&self) -> usize {
174        match self {
175            Outcome::Unsupported { .. } => 0,
176            Outcome::ParseFailure { .. } => 1,
177            Outcome::PlanFailure { .. } => 2,
178            Outcome::UnexpectedPlanSuccess { .. } => 3,
179            Outcome::WrongNumberOfRowsInserted { .. } => 4,
180            Outcome::WrongColumnCount { .. } => 5,
181            Outcome::WrongColumnNames { .. } => 6,
182            Outcome::OutputFailure { .. } => 7,
183            Outcome::InconsistentViewOutcome { .. } => 8,
184            Outcome::Bail { .. } => 9,
185            Outcome::Warning { .. } => 10,
186            Outcome::Success => 11,
187        }
188    }
189
190    fn success(&self) -> bool {
191        matches!(self, Outcome::Success)
192    }
193
194    fn failure(&self) -> bool {
195        !matches!(self, Outcome::Success) && !matches!(self, Outcome::Warning { .. })
196    }
197
198    /// Returns an error message that will match self. Appropriate for
199    /// rewriting error messages (i.e. not inserting error messages where we
200    /// currently expect success).
201    fn err_msg(&self) -> Option<String> {
202        match self {
203            Outcome::Unsupported { error, .. }
204            | Outcome::ParseFailure { error, .. }
205            | Outcome::PlanFailure { error, .. } => Some(
206                // This value gets fed back into regex to check that it matches
207                // `self`, so escape its meta characters.
208                regex::escape(
209                    // Take only the first string in the error message, which should be
210                    // sufficient for meaningfully matching the error.
211                    error.to_string().split('\n').next().unwrap(),
212                )
213                // We need to undo the escaping of #. `regex::escape` escapes this because it
214                // expects that we use the `x` flag when building a regex, but this is not the case,
215                // so \# would end up being an invalid escape sequence, which would choke the
216                // parsing of the slt file the next time around.
217                .replace(r"\#", "#"),
218            ),
219            _ => None,
220        }
221    }
222}
223
224impl fmt::Display for Outcome<'_> {
225    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
226        use Outcome::*;
227        const INDENT: &str = "\n        ";
228        match self {
229            Unsupported { error, location } => write!(
230                f,
231                "Unsupported:{}:\n{}",
232                location,
233                error.display_with_causes()
234            ),
235            ParseFailure { error, location } => {
236                write!(
237                    f,
238                    "ParseFailure:{}:\n{}",
239                    location,
240                    error.display_with_causes()
241                )
242            }
243            PlanFailure { error, location } => write!(f, "PlanFailure:{}:\n{:#}", location, error),
244            UnexpectedPlanSuccess {
245                expected_error,
246                location,
247            } => write!(
248                f,
249                "UnexpectedPlanSuccess:{} expected error: {}",
250                location, expected_error
251            ),
252            WrongNumberOfRowsInserted {
253                expected_count,
254                actual_count,
255                location,
256            } => write!(
257                f,
258                "WrongNumberOfRowsInserted:{}{}expected: {}{}actually: {}",
259                location, INDENT, expected_count, INDENT, actual_count
260            ),
261            WrongColumnCount {
262                expected_count,
263                actual_count,
264                location,
265            } => write!(
266                f,
267                "WrongColumnCount:{}{}expected: {}{}actually: {}",
268                location, INDENT, expected_count, INDENT, actual_count
269            ),
270            WrongColumnNames {
271                expected_column_names,
272                actual_column_names,
273                actual_output: _,
274                location,
275            } => write!(
276                f,
277                "Wrong Column Names:{}:{}expected column names: {}{}inferred column names: {}",
278                location,
279                INDENT,
280                expected_column_names
281                    .iter()
282                    .map(|n| n.to_string())
283                    .collect::<Vec<_>>()
284                    .join(" "),
285                INDENT,
286                actual_column_names
287                    .iter()
288                    .map(|n| n.to_string())
289                    .collect::<Vec<_>>()
290                    .join(" ")
291            ),
292            OutputFailure {
293                expected_output,
294                actual_raw_output,
295                actual_output,
296                location,
297            } => write!(
298                f,
299                "OutputFailure:{}{}expected: {:?}{}actually: {:?}{}actual raw: {:?}",
300                location, INDENT, expected_output, INDENT, actual_output, INDENT, actual_raw_output
301            ),
302            InconsistentViewOutcome {
303                query_outcome,
304                view_outcome,
305                location,
306            } => write!(
307                f,
308                "InconsistentViewOutcome:{}{}expected from query: {:?}{}actually from indexed view: {:?}{}",
309                location, INDENT, query_outcome, INDENT, view_outcome, INDENT
310            ),
311            Bail { cause, location } => write!(f, "Bail:{} {}", location, cause),
312            Warning { cause, location } => write!(f, "Warning:{} {}", location, cause),
313            Success => f.write_str("Success"),
314        }
315    }
316}
317
/// Aggregated results: per-kind counters plus free-form detail messages.
#[derive(Default, Debug)]
pub struct Outcomes {
    /// One counter per outcome kind. `as_json` reads the slots in the same
    /// order as the codes returned by `Outcome::code` (0 = unsupported, ...,
    /// 11 = success).
    stats: [usize; NUM_OUTCOMES],
    /// Detail lines, printed one per line by `OutcomesDisplay` when failure
    /// details are requested.
    details: Vec<String>,
}
323
324impl ops::AddAssign<Outcomes> for Outcomes {
325    fn add_assign(&mut self, rhs: Outcomes) {
326        for (lhs, rhs) in self.stats.iter_mut().zip_eq(rhs.stats.iter()) {
327            *lhs += rhs
328        }
329    }
330}
331impl Outcomes {
332    pub fn any_failed(&self) -> bool {
333        self.stats[SUCCESS_OUTCOME] + self.stats[WARNING_OUTCOME] < self.stats.iter().sum::<usize>()
334    }
335
336    pub fn as_json(&self) -> serde_json::Value {
337        serde_json::json!({
338            "unsupported": self.stats[0],
339            "parse_failure": self.stats[1],
340            "plan_failure": self.stats[2],
341            "unexpected_plan_success": self.stats[3],
342            "wrong_number_of_rows_affected": self.stats[4],
343            "wrong_column_count": self.stats[5],
344            "wrong_column_names": self.stats[6],
345            "output_failure": self.stats[7],
346            "inconsistent_view_outcome": self.stats[8],
347            "bail": self.stats[9],
348            "warning": self.stats[10],
349            "success": self.stats[11],
350        })
351    }
352
353    pub fn display(&self, no_fail: bool, failure_details: bool) -> OutcomesDisplay<'_> {
354        OutcomesDisplay {
355            inner: self,
356            no_fail,
357            failure_details,
358        }
359    }
360}
361
/// Display adapter for [`Outcomes`], created by `Outcomes::display`.
pub struct OutcomesDisplay<'a> {
    /// The outcomes being rendered.
    inner: &'a Outcomes,
    /// When set, a non-passing summary is labelled "FAIL-IGNORE" instead of
    /// "FAIL", and the detail lines are printed (if `failure_details` is set)
    /// even when everything passed.
    no_fail: bool,
    /// When set, the detail lines are printed instead of the summary line
    /// whenever something did not pass or `no_fail` is set.
    failure_details: bool,
}
367
368impl<'a> fmt::Display for OutcomesDisplay<'a> {
369    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
370        let total: usize = self.inner.stats.iter().sum();
371        if self.failure_details
372            && (self.inner.stats[SUCCESS_OUTCOME] + self.inner.stats[WARNING_OUTCOME] != total
373                || self.no_fail)
374        {
375            for outcome in &self.inner.details {
376                writeln!(f, "{}", outcome)?;
377            }
378            Ok(())
379        } else {
380            write!(
381                f,
382                "{}:",
383                if self.inner.stats[SUCCESS_OUTCOME] + self.inner.stats[WARNING_OUTCOME] == total {
384                    "PASS"
385                } else if self.no_fail {
386                    "FAIL-IGNORE"
387                } else {
388                    "FAIL"
389                }
390            )?;
391            static NAMES: LazyLock<Vec<&'static str>> = LazyLock::new(|| {
392                vec![
393                    "unsupported",
394                    "parse-failure",
395                    "plan-failure",
396                    "unexpected-plan-success",
397                    "wrong-number-of-rows-inserted",
398                    "wrong-column-count",
399                    "wrong-column-names",
400                    "output-failure",
401                    "inconsistent-view-outcome",
402                    "bail",
403                    "warning",
404                    "success",
405                    "total",
406                ]
407            });
408            for (i, n) in self.inner.stats.iter().enumerate() {
409                if *n > 0 {
410                    write!(f, " {}={}", NAMES[i], n)?;
411                }
412            }
413            write!(f, " total={}", total)
414        }
415    }
416}
417
/// Information about a prepared query, carried by
/// `PrepareQueryOutcome::QueryPrepared`.
struct QueryInfo {
    // NOTE(review): presumably whether the statement is a `SELECT` — confirm
    // against the code that constructs this struct.
    is_select: bool,
    // NOTE(review): presumably the number of output columns, when known —
    // confirm against the code that constructs this struct.
    num_attributes: Option<usize>,
}
422
/// Result of preparing a query: either the query's [`QueryInfo`] or an
/// [`Outcome`] to use instead.
enum PrepareQueryOutcome<'a> {
    /// The query was prepared; carries the information gathered about it.
    QueryPrepared(QueryInfo),
    /// Preparation produced an outcome rather than a prepared query.
    Outcome(Outcome<'a>),
}
427
/// Owns the run configuration and the current [`RunnerInner`], which is
/// dropped and recreated by `reset`.
pub struct Runner<'a> {
    /// Configuration passed to `RunnerInner::start` on every (re)start.
    config: &'a RunConfig<'a>,
    /// The current inner runner. It is `None` only before the first `reset`
    /// completes or while a `reset` is in progress.
    inner: Option<RunnerInner<'a>>,
}
432
/// State of one running test environment: server addresses, SQL client
/// connections, behavior flags, and the handles that keep the server alive.
pub struct RunnerInner<'a> {
    server_addr: SocketAddr,
    internal_server_addr: SocketAddr,
    password_server_addr: SocketAddr,
    internal_http_server_addr: SocketAddr,
    // Drop order matters for these fields.
    /// Default client connection.
    client: tokio_postgres::Client,
    /// Client used for system-level statements such as `ALTER SYSTEM` and
    /// dropping/creating databases and clusters.
    system_client: tokio_postgres::Client,
    /// Additional client connections, keyed by name.
    clients: BTreeMap<String, tokio_postgres::Client>,
    auto_index_tables: bool,
    auto_index_selects: bool,
    auto_transactions: bool,
    enable_table_keys: bool,
    verbosity: u8,
    stdout: &'a dyn WriteFmt,
    // The underscore-prefixed fields are never read; they are held only so
    // that they live (and are dropped) together with this struct.
    _shutdown_trigger: trigger::Trigger,
    _server_thread: JoinOnDropHandle<()>,
    _temp_dir: TempDir,
}
452
/// Newtype around [`Value`] so that a sqllogictest-specific [`FromSql`]
/// implementation can be attached to it.
#[derive(Debug)]
pub struct Slt(Value);
455
impl<'a> FromSql<'a> for Slt {
    /// Decodes the raw bytes of a non-NULL column value of type `ty` into the
    /// matching [`Value`] variant, wrapped in [`Slt`].
    ///
    /// Well-known PostgreSQL types are matched first, then array kinds, then
    /// the Materialize-specific OIDs.
    ///
    /// # Panics
    ///
    /// Panics via `unreachable!()` if `ty` matches none of the handled cases,
    /// and via `expect` if an array dimension reports a negative length.
    fn from_sql(
        ty: &PgType,
        mut raw: &'a [u8],
    ) -> Result<Self, Box<dyn Error + 'static + Send + Sync>> {
        Ok(match *ty {
            PgType::ACLITEM => Self(Value::AclItem(AclItem::decode_binary(
                types::bytea_from_sql(raw),
            )?)),
            PgType::BOOL => Self(Value::Bool(types::bool_from_sql(raw)?)),
            PgType::BYTEA => Self(Value::Bytea(types::bytea_from_sql(raw).to_vec())),
            // `char_from_sql` yields an `i8`; its byte is reinterpreted as a `u8`.
            PgType::CHAR => Self(Value::Char(u8::from_be_bytes(
                types::char_from_sql(raw)?.to_be_bytes(),
            ))),
            PgType::FLOAT4 => Self(Value::Float4(types::float4_from_sql(raw)?)),
            PgType::FLOAT8 => Self(Value::Float8(types::float8_from_sql(raw)?)),
            // Dates are read as a 4-byte integer and converted with
            // `Date::from_pg_epoch`.
            PgType::DATE => Self(Value::Date(Date::from_pg_epoch(types::int4_from_sql(
                raw,
            )?)?)),
            PgType::INT2 => Self(Value::Int2(types::int2_from_sql(raw)?)),
            PgType::INT4 => Self(Value::Int4(types::int4_from_sql(raw)?)),
            PgType::INT8 => Self(Value::Int8(types::int8_from_sql(raw)?)),
            PgType::INTERVAL => Self(Value::Interval(Interval::from_sql(ty, raw)?)),
            PgType::JSONB => Self(Value::Jsonb(Jsonb::from_sql(ty, raw)?)),
            PgType::NAME => Self(Value::Name(types::text_from_sql(raw)?.to_string())),
            PgType::NUMERIC => Self(Value::Numeric(Numeric::from_sql(ty, raw)?)),
            // All OID-like types are decoded to `Value::Oid`.
            PgType::OID => Self(Value::Oid(types::oid_from_sql(raw)?)),
            PgType::REGCLASS => Self(Value::Oid(types::oid_from_sql(raw)?)),
            PgType::REGPROC => Self(Value::Oid(types::oid_from_sql(raw)?)),
            PgType::REGTYPE => Self(Value::Oid(types::oid_from_sql(raw)?)),
            PgType::TEXT | PgType::BPCHAR | PgType::VARCHAR => {
                Self(Value::Text(types::text_from_sql(raw)?.to_string()))
            }
            PgType::TIME => Self(Value::Time(NaiveTime::from_sql(ty, raw)?)),
            PgType::TIMESTAMP => Self(Value::Timestamp(
                NaiveDateTime::from_sql(ty, raw)?.try_into()?,
            )),
            PgType::TIMESTAMPTZ => Self(Value::TimestampTz(
                DateTime::<Utc>::from_sql(ty, raw)?.try_into()?,
            )),
            PgType::UUID => Self(Value::Uuid(Uuid::from_sql(ty, raw)?)),
            PgType::RECORD => {
                // Layout as read here: a big-endian i32 field count, then for
                // each field a big-endian type OID followed by a
                // length-prefixed value (a negative length means NULL, see
                // `read_value`).
                let num_fields = read_be_i32(&mut raw)?;
                let mut tuple = vec![];
                for _ in 0..num_fields {
                    let oid = u32::reinterpret_cast(read_be_i32(&mut raw)?);
                    let typ = match PgType::from_oid(oid) {
                        Some(typ) => typ,
                        None => return Err("unknown oid".into()),
                    };
                    let v = read_value::<Option<Slt>>(&typ, &mut raw)?;
                    tuple.push(v.map(|v| v.0));
                }
                Self(Value::Record(tuple))
            }
            PgType::INT4_RANGE
            | PgType::INT8_RANGE
            | PgType::DATE_RANGE
            | PgType::NUM_RANGE
            | PgType::TS_RANGE
            | PgType::TSTZ_RANGE => {
                use mz_repr::adt::range::Range;
                // Decode the bounds as `Slt` and then unwrap each bound to its
                // inner `Value`.
                let range: Range<Slt> = Range::from_sql(ty, raw)?;
                Self(Value::Range(range.into_bounds(|b| Box::new(b.0))))
            }

            // Not one of the well-known types above: dispatch on the type's
            // kind, and finally on its OID.
            _ => match ty.kind() {
                PgKind::Array(arr_type) => {
                    let arr = types::array_from_sql(raw)?;
                    // Decode every element recursively with the element type;
                    // NULL elements stay `None`.
                    let elements: Vec<Option<Value>> = arr
                        .values()
                        .map(|v| match v {
                            Some(v) => Ok(Some(Slt::from_sql(arr_type, v)?)),
                            None => Ok(None),
                        })
                        .collect::<Vec<Option<Slt>>>()?
                        .into_iter()
                        // Map a Vec<Option<Slt>> to Vec<Option<Value>>.
                        .map(|v| v.map(|v| v.0))
                        .collect();

                    Self(Value::Array {
                        dims: arr
                            .dimensions()
                            .map(|d| {
                                Ok(mz_repr::adt::array::ArrayDimension {
                                    lower_bound: isize::cast_from(d.lower_bound),
                                    length: usize::try_from(d.len)
                                        .expect("cannot have negative length"),
                                })
                            })
                            .collect()?,
                        elements,
                    })
                }
                _ => match ty.oid() {
                    oid::TYPE_UINT2_OID => Self(Value::UInt2(UInt2::from_sql(ty, raw)?)),
                    oid::TYPE_UINT4_OID => Self(Value::UInt4(UInt4::from_sql(ty, raw)?)),
                    oid::TYPE_UINT8_OID => Self(Value::UInt8(UInt8::from_sql(ty, raw)?)),
                    oid::TYPE_MZ_TIMESTAMP_OID => {
                        // Read as text and parsed, unlike the other arms.
                        let s = types::text_from_sql(raw)?;
                        let t: mz_repr::Timestamp = s.parse()?;
                        Self(Value::MzTimestamp(t))
                    }
                    oid::TYPE_MZ_ACL_ITEM_OID => Self(Value::MzAclItem(MzAclItem::decode_binary(
                        types::bytea_from_sql(raw),
                    )?)),
                    // NOTE(review): relies on callers consulting `accepts`
                    // before calling `from_sql`. `accepts` also returns `true`
                    // for every `PgKind::Composite` type, but only
                    // `PgType::RECORD` has an arm above, so another composite
                    // type would appear to reach this branch — confirm.
                    _ => unreachable!(),
                },
            },
        })
    }
    /// Reports whether `from_sql` is willing to decode values of type `ty`:
    /// any array or composite kind, the Materialize-specific OIDs, and the
    /// listed well-known PostgreSQL types.
    fn accepts(ty: &PgType) -> bool {
        // Arrays and composites are accepted regardless of their
        // element/field types.
        match ty.kind() {
            PgKind::Array(_) | PgKind::Composite(_) => return true,
            _ => {}
        }
        // Types identified by OID constants from `mz_pgrepr::oid`.
        match ty.oid() {
            oid::TYPE_UINT2_OID
            | oid::TYPE_UINT4_OID
            | oid::TYPE_UINT8_OID
            | oid::TYPE_MZ_TIMESTAMP_OID
            | oid::TYPE_MZ_ACL_ITEM_OID => return true,
            _ => {}
        }
        matches!(
            *ty,
            PgType::ACLITEM
                | PgType::BOOL
                | PgType::BYTEA
                | PgType::CHAR
                | PgType::DATE
                | PgType::FLOAT4
                | PgType::FLOAT8
                | PgType::INT2
                | PgType::INT4
                | PgType::INT8
                | PgType::INTERVAL
                | PgType::JSONB
                | PgType::NAME
                | PgType::NUMERIC
                | PgType::OID
                | PgType::REGCLASS
                | PgType::REGPROC
                | PgType::REGTYPE
                | PgType::RECORD
                | PgType::TEXT
                | PgType::BPCHAR
                | PgType::VARCHAR
                | PgType::TIME
                | PgType::TIMESTAMP
                | PgType::TIMESTAMPTZ
                | PgType::UUID
                | PgType::INT4_RANGE
                | PgType::INT4_RANGE_ARRAY
                | PgType::INT8_RANGE
                | PgType::INT8_RANGE_ARRAY
                | PgType::DATE_RANGE
                | PgType::DATE_RANGE_ARRAY
                | PgType::NUM_RANGE
                | PgType::NUM_RANGE_ARRAY
                | PgType::TS_RANGE
                | PgType::TS_RANGE_ARRAY
                | PgType::TSTZ_RANGE
                | PgType::TSTZ_RANGE_ARRAY
        )
    }
}
624
// From postgres-types/src/private.rs.
/// Reads a big-endian `i32` from the front of `buf` and advances `buf` past
/// the four bytes consumed.
///
/// Returns an "invalid buffer size" error, leaving `buf` untouched, when fewer
/// than four bytes remain.
fn read_be_i32(buf: &mut &[u8]) -> Result<i32, Box<dyn Error + Sync + Send>> {
    // Copy the inner shared slice reference out so that the pieces split off
    // below can be stored back into `*buf`.
    let data: &[u8] = *buf;
    match data.split_first_chunk::<4>() {
        Some((head, rest)) => {
            *buf = rest;
            Ok(i32::from_be_bytes(*head))
        }
        None => Err("invalid buffer size".into()),
    }
}
635
636// From postgres-types/src/private.rs.
637fn read_value<'a, T>(type_: &PgType, buf: &mut &'a [u8]) -> Result<T, Box<dyn Error + Sync + Send>>
638where
639    T: FromSql<'a>,
640{
641    let value = match usize::try_from(read_be_i32(buf)?) {
642        Err(_) => None,
643        Ok(len) => {
644            if len > buf.len() {
645                return Err("invalid buffer size".into());
646            }
647            let (head, tail) = buf.split_at(len);
648            *buf = tail;
649            Some(head)
650        }
651    };
652    T::from_sql_nullable(type_, value)
653}
654
655fn format_datum(d: Slt, typ: &Type, mode: Mode, col: usize) -> String {
656    match (typ, d.0) {
657        (Type::Bool, Value::Bool(b)) => b.to_string(),
658
659        (Type::Integer, Value::Int2(i)) => i.to_string(),
660        (Type::Integer, Value::Int4(i)) => i.to_string(),
661        (Type::Integer, Value::Int8(i)) => i.to_string(),
662        (Type::Integer, Value::UInt2(u)) => u.0.to_string(),
663        (Type::Integer, Value::UInt4(u)) => u.0.to_string(),
664        (Type::Integer, Value::UInt8(u)) => u.0.to_string(),
665        (Type::Integer, Value::Oid(i)) => i.to_string(),
666        // TODO(benesch): rewrite to avoid `as`.
667        #[allow(clippy::as_conversions)]
668        (Type::Integer, Value::Float4(f)) => format!("{}", f as i64),
669        // TODO(benesch): rewrite to avoid `as`.
670        #[allow(clippy::as_conversions)]
671        (Type::Integer, Value::Float8(f)) => format!("{}", f as i64),
672        // This is so wrong, but sqlite needs it.
673        (Type::Integer, Value::Text(_)) => "0".to_string(),
674        (Type::Integer, Value::Bool(b)) => i8::from(b).to_string(),
675        (Type::Integer, Value::Numeric(d)) => {
676            let mut d = d.0.0.clone();
677            let mut cx = numeric::cx_datum();
678            // Truncate the decimal to match sqlite.
679            if mode == Mode::Standard {
680                cx.set_rounding(dec::Rounding::Down);
681            }
682            cx.round(&mut d);
683            numeric::munge_numeric(&mut d).unwrap();
684            d.to_standard_notation_string()
685        }
686
687        (Type::Real, Value::Int2(i)) => format!("{:.3}", i),
688        (Type::Real, Value::Int4(i)) => format!("{:.3}", i),
689        (Type::Real, Value::Int8(i)) => format!("{:.3}", i),
690        (Type::Real, Value::Float4(f)) => match mode {
691            Mode::Standard => format!("{:.3}", f),
692            Mode::Cockroach => format!("{}", f),
693        },
694        (Type::Real, Value::Float8(f)) => match mode {
695            Mode::Standard => format!("{:.3}", f),
696            Mode::Cockroach => format!("{}", f),
697        },
698        (Type::Real, Value::Numeric(d)) => match mode {
699            Mode::Standard => {
700                let mut d = d.0.0.clone();
701                if d.exponent() < -3 {
702                    numeric::rescale(&mut d, 3).unwrap();
703                }
704                numeric::munge_numeric(&mut d).unwrap();
705                d.to_standard_notation_string()
706            }
707            Mode::Cockroach => d.0.0.to_standard_notation_string(),
708        },
709
710        (Type::Text, Value::Text(s)) => {
711            if s.is_empty() {
712                "(empty)".to_string()
713            } else {
714                s
715            }
716        }
717        (Type::Text, Value::Bool(b)) => b.to_string(),
718        (Type::Text, Value::Float4(f)) => format!("{:.3}", f),
719        (Type::Text, Value::Float8(f)) => format!("{:.3}", f),
720        // Bytes are printed as text iff they are valid UTF-8. This
721        // seems guaranteed to confuse everyone, but it is required for
722        // compliance with the CockroachDB sqllogictest runner. [0]
723        //
724        // [0]: https://github.com/cockroachdb/cockroach/blob/970782487/pkg/sql/logictest/logic.go#L2038-L2043
725        (Type::Text, Value::Bytea(b)) => match str::from_utf8(&b) {
726            Ok(s) => s.to_string(),
727            Err(_) => format!("{:?}", b),
728        },
729        (Type::Text, Value::Numeric(d)) => d.0.0.to_standard_notation_string(),
730        // Everything else gets normal text encoding. This correctly handles things
731        // like arrays, tuples, and strings that need to be quoted.
732        (Type::Text, d) => {
733            let mut buf = BytesMut::new();
734            d.encode_text(&mut buf);
735            String::from_utf8_lossy(&buf).into_owned()
736        }
737
738        (Type::Oid, Value::Oid(o)) => o.to_string(),
739
740        (_, d) => panic!(
741            "Don't know how to format {:?} as {:?} in column {}",
742            d, typ, col,
743        ),
744    }
745}
746
747fn format_row(row: &Row, types: &[Type], mode: Mode) -> Vec<String> {
748    let mut formatted: Vec<String> = vec![];
749    for i in 0..row.len() {
750        let t: Option<Slt> = row.get::<usize, Option<Slt>>(i);
751        let t: Option<String> = t.map(|d| format_datum(d, &types[i], mode, i));
752        formatted.push(match t {
753            Some(t) => t,
754            None => "NULL".into(),
755        });
756    }
757
758    formatted
759}
760
761impl<'a> Runner<'a> {
762    pub async fn start(config: &'a RunConfig<'a>) -> Result<Runner<'a>, anyhow::Error> {
763        let mut runner = Self {
764            config,
765            inner: None,
766        };
767        runner.reset().await?;
768        Ok(runner)
769    }
770
771    pub async fn reset(&mut self) -> Result<(), anyhow::Error> {
772        // Explicitly drop the old runner here to ensure that we wait for threads to terminate
773        // before starting a new runner
774        drop(self.inner.take());
775        self.inner = Some(RunnerInner::start(self.config).await?);
776
777        Ok(())
778    }
779
780    async fn run_record<'r>(
781        &mut self,
782        record: &'r Record<'r>,
783        in_transaction: &mut bool,
784    ) -> Result<Outcome<'r>, anyhow::Error> {
785        if let Record::ResetServer = record {
786            self.reset().await?;
787            Ok(Outcome::Success)
788        } else {
789            self.inner
790                .as_mut()
791                .expect("RunnerInner missing")
792                .run_record(record, in_transaction)
793                .await
794        }
795    }
796
797    async fn check_catalog(&self) -> Result<(), anyhow::Error> {
798        self.inner
799            .as_ref()
800            .expect("RunnerInner missing")
801            .check_catalog()
802            .await
803    }
804
805    async fn reset_database(&mut self) -> Result<(), anyhow::Error> {
806        let inner = self.inner.as_mut().expect("RunnerInner missing");
807
808        inner.client.batch_execute("ROLLBACK;").await?;
809
810        inner
811            .system_client
812            .batch_execute(
813                "ROLLBACK;
814                 SET cluster = mz_catalog_server;
815                 RESET cluster_replica;",
816            )
817            .await?;
818
819        inner
820            .system_client
821            .batch_execute("ALTER SYSTEM RESET ALL")
822            .await?;
823
824        // Drop all databases, then recreate the `materialize` database.
825        for row in inner
826            .system_client
827            .query("SELECT name FROM mz_databases", &[])
828            .await?
829        {
830            let name: &str = row.get("name");
831            inner
832                .system_client
833                .batch_execute(&format!("DROP DATABASE \"{name}\""))
834                .await?;
835        }
836        inner
837            .system_client
838            .batch_execute("CREATE DATABASE materialize")
839            .await?;
840
841        // Ensure quickstart cluster exists with one replica of size `self.config.replica_size`.
842        // We don't destroy the existing quickstart cluster replica if it exists, as turning
843        // on a cluster replica is exceptionally slow.
844        let mut needs_default_cluster = true;
845        for row in inner
846            .system_client
847            .query("SELECT name FROM mz_clusters WHERE id LIKE 'u%'", &[])
848            .await?
849        {
850            match row.get("name") {
851                "quickstart" => needs_default_cluster = false,
852                name => {
853                    inner
854                        .system_client
855                        .batch_execute(&format!("DROP CLUSTER {name}"))
856                        .await?
857                }
858            }
859        }
860        if needs_default_cluster {
861            inner
862                .system_client
863                .batch_execute("CREATE CLUSTER quickstart REPLICAS ()")
864                .await?;
865        }
866        let mut needs_default_replica = false;
867        let rows = inner
868            .system_client
869            .query(
870                "SELECT name, size FROM mz_cluster_replicas
871                 WHERE cluster_id = (SELECT id FROM mz_clusters WHERE name = 'quickstart')
872                 ORDER BY name",
873                &[],
874            )
875            .await?;
876        if rows.len() != self.config.replicas {
877            needs_default_replica = true;
878        } else {
879            for (i, row) in rows.iter().enumerate() {
880                let name: &str = row.get("name");
881                let size: &str = row.get("size");
882                if name != format!("r{i}") || size != self.config.replica_size {
883                    needs_default_replica = true;
884                    break;
885                }
886            }
887        }
888
889        if needs_default_replica {
890            inner
891                .system_client
892                .batch_execute("ALTER CLUSTER quickstart SET (MANAGED = false)")
893                .await?;
894            for row in inner
895                .system_client
896                .query(
897                    "SELECT name FROM mz_cluster_replicas
898                     WHERE cluster_id = (SELECT id FROM mz_clusters WHERE name = 'quickstart')",
899                    &[],
900                )
901                .await?
902            {
903                let name: &str = row.get("name");
904                inner
905                    .system_client
906                    .batch_execute(&format!("DROP CLUSTER REPLICA quickstart.{}", name))
907                    .await?;
908            }
909            for i in 1..=self.config.replicas {
910                inner
911                    .system_client
912                    .batch_execute(&format!(
913                        "CREATE CLUSTER REPLICA quickstart.r{i} SIZE '{}'",
914                        self.config.replica_size
915                    ))
916                    .await?;
917            }
918            inner
919                .system_client
920                .batch_execute("ALTER CLUSTER quickstart SET (MANAGED = true)")
921                .await?;
922        }
923
924        // Grant initial privileges.
925        inner
926            .system_client
927            .batch_execute("GRANT USAGE ON DATABASE materialize TO PUBLIC")
928            .await?;
929        inner
930            .system_client
931            .batch_execute("GRANT CREATE ON DATABASE materialize TO materialize")
932            .await?;
933        inner
934            .system_client
935            .batch_execute("GRANT CREATE ON SCHEMA materialize.public TO materialize")
936            .await?;
937        inner
938            .system_client
939            .batch_execute("GRANT USAGE ON CLUSTER quickstart TO PUBLIC")
940            .await?;
941        inner
942            .system_client
943            .batch_execute("GRANT CREATE ON CLUSTER quickstart TO materialize")
944            .await?;
945
946        // Some sqllogictests require more than the default amount of tables, so we increase the
947        // limit for all tests.
948        inner
949            .system_client
950            .simple_query("ALTER SYSTEM SET max_tables = 100")
951            .await?;
952
953        if inner.enable_table_keys {
954            inner
955                .system_client
956                .simple_query("ALTER SYSTEM SET unsafe_enable_table_keys = true")
957                .await?;
958        }
959
960        inner.ensure_fixed_features().await?;
961
962        inner.client = connect(inner.server_addr, None, None).await.unwrap();
963        inner.system_client = connect(inner.internal_server_addr, Some("mz_system"), None)
964            .await
965            .unwrap();
966        inner.clients = BTreeMap::new();
967
968        Ok(())
969    }
970}
971
972impl<'a> RunnerInner<'a> {
    /// Starts a fresh server instance for `config` and connects to it.
    ///
    /// The steps are:
    ///   1. Connect to the Postgres instance at `config.postgres_url` (retrying up to five
    ///      times), drop and recreate the `{prefix}_tsoracle` schema, and create the
    ///      `{prefix}_consensus` schema if it does not already exist.
    ///   2. Build the process orchestrator, the persist client cache with a same-process
    ///      pubsub connection, and the SQL and HTTP listeners, all bound to ephemeral
    ///      ports on localhost.
    ///   3. Serve the listeners on a dedicated thread that owns its own Tokio runtime,
    ///      and receive the bound listener addresses back over oneshot channels.
    ///   4. Open the `mz_system` connection on the internal SQL listener and the default
    ///      user connection on the external SQL listener, then apply the fixed feature
    ///      settings.
    ///
    /// # Errors
    ///
    /// Returns an error if any of the fallible setup steps above fails, including a
    /// failure to create the runtime or to serve the listeners on the server thread.
    ///
    /// # Panics
    ///
    /// Panics if the derived consensus, timestamp oracle, or blob URIs do not parse, if
    /// the pubsub listener cannot be bound, if `config.replicas` does not fit the
    /// replication factor type, or if either client connection cannot be established.
    pub async fn start(config: &RunConfig<'a>) -> Result<RunnerInner<'a>, anyhow::Error> {
        let temp_dir = tempfile::tempdir()?;
        let scratch_dir = tempfile::tempdir()?;
        let environment_id = EnvironmentId::for_tests();
        // Prepare the Postgres schemas and derive the two URLs that point at them.
        let (consensus_uri, timestamp_oracle_url): (SensitiveUrl, SensitiveUrl) = {
            let postgres_url = &config.postgres_url;
            let prefix = &config.prefix;
            info!(%postgres_url, "starting server");
            let (client, conn) = Retry::default()
                .max_tries(5)
                .retry_async(|_| async {
                    match tokio_postgres::connect(postgres_url, NoTls).await {
                        Ok(c) => Ok(c),
                        Err(e) => {
                            error!(%e, "failed to connect to postgres");
                            Err(e)
                        }
                    }
                })
                .await?;
            // The connection object must be driven on its own task for `client` to work.
            task::spawn(|| "sqllogictest_connect", async move {
                if let Err(e) = conn.await {
                    panic!("connection error: {}", e);
                }
            });
            // The timestamp oracle schema is always recreated from scratch, whereas the
            // consensus schema is only created when it is missing.
            client
                .batch_execute(&format!(
                    "DROP SCHEMA IF EXISTS {prefix}_tsoracle CASCADE;
                     CREATE SCHEMA IF NOT EXISTS {prefix}_consensus;
                     CREATE SCHEMA {prefix}_tsoracle;"
                ))
                .await?;
            (
                format!("{postgres_url}?options=--search_path={prefix}_consensus")
                    .parse()
                    .expect("invalid consensus URI"),
                format!("{postgres_url}?options=--search_path={prefix}_tsoracle")
                    .parse()
                    .expect("invalid timestamp oracle URI"),
            )
        };

        let secrets_dir = temp_dir.path().join("secrets");
        let orchestrator = Arc::new(
            ProcessOrchestrator::new(ProcessOrchestratorConfig {
                // Service images are looked up next to the currently running executable.
                image_dir: env::current_exe()?.parent().unwrap().to_path_buf(),
                suppress_output: false,
                environment_id: environment_id.to_string(),
                secrets_dir: secrets_dir.clone(),
                command_wrapper: config
                    .orchestrator_process_wrapper
                    .as_ref()
                    .map_or(Ok(vec![]), |s| shell_words::split(s))?,
                propagate_crashes: true,
                tcp_proxy: None,
                scratch_directory: scratch_dir.path().to_path_buf(),
            })
            .await?,
        );
        let now = SYSTEM_TIME.clone();
        let metrics_registry = MetricsRegistry::new();

        let persist_config = PersistConfig::new(
            &mz_environmentd::BUILD_INFO,
            now.clone(),
            mz_dyncfgs::all_dyncfgs(),
        );
        // Persist pubsub: a gRPC server on an ephemeral localhost port, plus a
        // same-process connection that the local persist client cache uses.
        let persist_pubsub_server =
            PersistGrpcPubSubServer::new(&persist_config, &metrics_registry);
        let persist_pubsub_client = persist_pubsub_server.new_same_process_connection();
        let persist_pubsub_tcp_listener =
            TcpListener::bind(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0))
                .await
                .expect("pubsub addr binding");
        let persist_pubsub_server_port = persist_pubsub_tcp_listener
            .local_addr()
            .expect("pubsub addr has local addr")
            .port();
        info!("listening for persist pubsub connections on localhost:{persist_pubsub_server_port}");
        mz_ore::task::spawn(|| "persist_pubsub_server", async move {
            persist_pubsub_server
                .serve_with_stream(TcpListenerStream::new(persist_pubsub_tcp_listener))
                .await
                .expect("success")
        });
        let persist_clients =
            PersistClientCache::new(persist_config, &metrics_registry, |cfg, metrics| {
                let sender: Arc<dyn PubSubSender> = Arc::new(MetricsSameProcessPubSubSender::new(
                    cfg,
                    persist_pubsub_client.sender,
                    metrics,
                ));
                PubSubClientConnection::new(sender, persist_pubsub_client.receiver)
            });
        let persist_clients = Arc::new(persist_clients);

        // The process orchestrator doubles as the secrets controller; it is then wrapped
        // in a tracing orchestrator before being handed to the controller config.
        let secrets_controller = Arc::clone(&orchestrator);
        let connection_context = ConnectionContext::for_tests(orchestrator.reader());
        let orchestrator = Arc::new(TracingOrchestrator::new(
            orchestrator,
            config.tracing.clone(),
        ));
        // Every listener binds to port 0 on localhost; the actual addresses are read
        // back from the listener handles once bound.
        let listeners_config = ListenersConfig {
            sql: btreemap! {
                "external".to_owned() => SqlListenerConfig {
                    addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
                    authenticator_kind: AuthenticatorKind::None,
                    allowed_roles: AllowedRoles::Normal,
                    enable_tls: false,
                },
                "internal".to_owned() => SqlListenerConfig {
                    addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
                    authenticator_kind: AuthenticatorKind::None,
                    allowed_roles: AllowedRoles::Internal,
                    enable_tls: false,
                },
                "password".to_owned() => SqlListenerConfig {
                    addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
                    authenticator_kind: AuthenticatorKind::Password,
                    allowed_roles: AllowedRoles::Normal,
                    enable_tls: false,
                },
            },
            http: btreemap![
                "external".to_owned() => HttpListenerConfig {
                    base: BaseListenerConfig {
                        addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
                        authenticator_kind: AuthenticatorKind::None,
                        allowed_roles: AllowedRoles::Normal,
                        enable_tls: false
                    },
                    routes: HttpRoutesEnabled {
                        base: true,
                        webhook: true,
                        internal: false,
                        metrics: false,
                        profiling: false,
                    },
                },
                "internal".to_owned() => HttpListenerConfig {
                    base: BaseListenerConfig {
                        addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
                        authenticator_kind: AuthenticatorKind::None,
                        allowed_roles: AllowedRoles::NormalAndInternal,
                        enable_tls: false
                    },
                    routes: HttpRoutesEnabled {
                        base: true,
                        webhook: true,
                        internal: true,
                        metrics: true,
                        profiling: true,
                    },
                },
            ],
        };
        let listeners = mz_environmentd::Listeners::bind(listeners_config).await?;
        // The advertised HTTP host name uses the port the external HTTP listener bound to.
        let host_name = format!(
            "localhost:{}",
            listeners.http["external"].handle.local_addr.port()
        );
        let catalog_config = CatalogConfig {
            persist_clients: Arc::clone(&persist_clients),
            metrics: Arc::new(mz_catalog::durable::Metrics::new(&MetricsRegistry::new())),
        };
        let server_config = mz_environmentd::Config {
            catalog_config,
            timestamp_oracle_url: Some(timestamp_oracle_url),
            controller: ControllerConfig {
                build_info: &mz_environmentd::BUILD_INFO,
                orchestrator,
                clusterd_image: "clusterd".into(),
                init_container_image: None,
                deploy_generation: 0,
                persist_location: PersistLocation {
                    blob_uri: format!(
                        "file://{}/persist/blob",
                        config.persist_dir.path().display()
                    )
                    .parse()
                    .expect("invalid blob URI"),
                    consensus_uri,
                },
                persist_clients,
                now: SYSTEM_TIME.clone(),
                metrics_registry: metrics_registry.clone(),
                persist_pubsub_url: format!("http://localhost:{}", persist_pubsub_server_port),
                secrets_args: mz_service::secrets::SecretsReaderCliArgs {
                    secrets_reader: mz_service::secrets::SecretsControllerKind::LocalFile,
                    secrets_reader_local_file_dir: Some(secrets_dir),
                    secrets_reader_kubernetes_context: None,
                    secrets_reader_aws_prefix: None,
                    secrets_reader_name_prefix: None,
                },
                connection_context,
            },
            secrets_controller,
            cloud_resource_controller: None,
            tls: None,
            frontegg: None,
            cors_allowed_origin: AllowOrigin::list([]),
            unsafe_mode: true,
            all_features: false,
            metrics_registry,
            now,
            environment_id,
            cluster_replica_sizes: ClusterReplicaSizeMap::for_tests(),
            bootstrap_default_cluster_replica_size: config.replica_size.clone(),
            bootstrap_default_cluster_replication_factor: config
                .replicas
                .try_into()
                .expect("replicas must fit"),
            // All builtin clusters use the configured replica size.
            bootstrap_builtin_system_cluster_config: BootstrapBuiltinClusterConfig {
                replication_factor: SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR,
                size: config.replica_size.clone(),
            },
            bootstrap_builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig {
                replication_factor: CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR,
                size: config.replica_size.clone(),
            },
            bootstrap_builtin_probe_cluster_config: BootstrapBuiltinClusterConfig {
                replication_factor: PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR,
                size: config.replica_size.clone(),
            },
            bootstrap_builtin_support_cluster_config: BootstrapBuiltinClusterConfig {
                replication_factor: SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR,
                size: config.replica_size.clone(),
            },
            bootstrap_builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig {
                replication_factor: ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR,
                size: config.replica_size.clone(),
            },
            system_parameter_defaults: {
                // Seed `log_filter` from the tracing config; the configured defaults are
                // applied afterwards, so they win if they also set `log_filter`.
                let mut params = BTreeMap::new();
                params.insert(
                    "log_filter".to_string(),
                    config.tracing.startup_log_filter.to_string(),
                );
                params.extend(config.system_parameter_defaults.clone());
                params
            },
            availability_zones: Default::default(),
            tracing_handle: config.tracing_handle.clone(),
            storage_usage_collection_interval: Duration::from_secs(3600),
            storage_usage_retention_period: None,
            segment_api_key: None,
            segment_client_side: false,
            // SLT doesn't like eternally running tasks since it waits for them to finish in
            // between SLT files.
            test_only_dummy_segment_client: false,
            egress_addresses: vec![],
            aws_account_id: None,
            aws_privatelink_availability_zones: None,
            launchdarkly_sdk_key: None,
            launchdarkly_key_map: Default::default(),
            config_sync_file_path: None,
            config_sync_timeout: Duration::from_secs(30),
            config_sync_loop_interval: None,
            bootstrap_role: Some("materialize".into()),
            http_host_name: Some(host_name),
            internal_console_redirect_url: None,
            tls_reload_certs: mz_server_core::cert_reload_never_reload(),
            helm_chart_version: None,
            license_key: ValidatedLicenseKey::for_tests(),
            external_login_password_mz_system: None,
        };
        // We need to run the server on its own Tokio runtime, which in turn
        // requires its own thread, so that we can wait for any tasks spawned
        // by the server to be shutdown at the end of each file. If we were to
        // share a Tokio runtime, tasks from the last file's server would still
        // be live at the start of the next file's server.
        //
        // Only the first channel carries a `Result`: it reports a startup failure on the
        // server thread (runtime creation or serving) back to this function.
        let (server_addr_tx, server_addr_rx): (oneshot::Sender<Result<_, anyhow::Error>>, _) =
            oneshot::channel();
        let (internal_server_addr_tx, internal_server_addr_rx) = oneshot::channel();
        let (password_server_addr_tx, password_server_addr_rx) = oneshot::channel();
        let (internal_http_server_addr_tx, internal_http_server_addr_rx) = oneshot::channel();
        let (shutdown_trigger, shutdown_trigger_rx) = trigger::channel();
        let server_thread = thread::spawn(|| {
            let runtime = match Runtime::new() {
                Ok(runtime) => runtime,
                Err(e) => {
                    server_addr_tx
                        .send(Err(e.into()))
                        .expect("receiver should not drop first");
                    return;
                }
            };
            // NOTE: the `Ok` binding below is named `runtime` but holds the served server.
            let server = match runtime.block_on(listeners.serve(server_config)) {
                Ok(runtime) => runtime,
                Err(e) => {
                    server_addr_tx
                        .send(Err(e.into()))
                        .expect("receiver should not drop first");
                    return;
                }
            };
            server_addr_tx
                .send(Ok(server.sql_listener_handles["external"].local_addr))
                .expect("receiver should not drop first");
            internal_server_addr_tx
                .send(server.sql_listener_handles["internal"].local_addr)
                .expect("receiver should not drop first");
            password_server_addr_tx
                .send(server.sql_listener_handles["password"].local_addr)
                .expect("receiver should not drop first");
            internal_http_server_addr_tx
                .send(server.http_listener_handles["internal"].local_addr)
                .expect("receiver should not drop first");
            // Keep the thread (and with it the runtime and server) alive until the
            // shutdown trigger resolves.
            let _ = runtime.block_on(shutdown_trigger_rx);
        });
        // The first `?` handles a dropped sender, the second a startup failure reported
        // by the server thread.
        let server_addr = server_addr_rx.await??;
        let internal_server_addr = internal_server_addr_rx.await?;
        let password_server_addr = password_server_addr_rx.await?;
        let internal_http_server_addr = internal_http_server_addr_rx.await?;

        let system_client = connect(internal_server_addr, Some("mz_system"), None)
            .await
            .unwrap();
        let client = connect(server_addr, None, None).await.unwrap();

        let inner = RunnerInner {
            server_addr,
            internal_server_addr,
            password_server_addr,
            internal_http_server_addr,
            _shutdown_trigger: shutdown_trigger,
            _server_thread: server_thread.join_on_drop(),
            _temp_dir: temp_dir,
            client,
            system_client,
            clients: BTreeMap::new(),
            auto_index_tables: config.auto_index_tables,
            auto_index_selects: config.auto_index_selects,
            auto_transactions: config.auto_transactions,
            enable_table_keys: config.enable_table_keys,
            verbosity: config.verbosity,
            stdout: config.stdout,
        };
        inner.ensure_fixed_features().await?;

        Ok(inner)
    }
1314
1315    /// Set features that should be enabled regardless of whether reset-server was
1316    /// called. These features may be set conditionally depending on the run configuration.
1317    async fn ensure_fixed_features(&self) -> Result<(), anyhow::Error> {
1318        // We turn on enable_reduce_mfp_fusion, as we wish
1319        // to get as much coverage of these features as we can.
1320        // TODO(vmarcos): Remove this code when we retire this feature flag.
1321        self.system_client
1322            .execute("ALTER SYSTEM SET enable_reduce_mfp_fusion = on", &[])
1323            .await?;
1324
1325        // Dangerous functions are useful for tests so we enable it for all tests.
1326        self.system_client
1327            .execute("ALTER SYSTEM SET unsafe_enable_unsafe_functions = on", &[])
1328            .await?;
1329        Ok(())
1330    }
1331
1332    async fn run_record<'r>(
1333        &mut self,
1334        record: &'r Record<'r>,
1335        in_transaction: &mut bool,
1336    ) -> Result<Outcome<'r>, anyhow::Error> {
1337        match &record {
1338            Record::Statement {
1339                expected_error,
1340                rows_affected,
1341                sql,
1342                location,
1343            } => {
1344                if self.auto_transactions && *in_transaction {
1345                    self.client.execute("COMMIT", &[]).await?;
1346                    *in_transaction = false;
1347                }
1348                match self
1349                    .run_statement(*expected_error, *rows_affected, sql, location.clone())
1350                    .await?
1351                {
1352                    Outcome::Success => {
1353                        if self.auto_index_tables {
1354                            let additional = mutate(sql);
1355                            for stmt in additional {
1356                                self.client.execute(&stmt, &[]).await?;
1357                            }
1358                        }
1359                        Ok(Outcome::Success)
1360                    }
1361                    other => {
1362                        if expected_error.is_some() {
1363                            Ok(other)
1364                        } else {
1365                            // If we failed to execute a statement that was supposed to succeed,
1366                            // running the rest of the tests in this file will probably cause
1367                            // false positives, so just give up on the file entirely.
1368                            Ok(Outcome::Bail {
1369                                cause: Box::new(other),
1370                                location: location.clone(),
1371                            })
1372                        }
1373                    }
1374                }
1375            }
1376            Record::Query {
1377                sql,
1378                output,
1379                location,
1380            } => {
1381                self.run_query(sql, output, location.clone(), in_transaction)
1382                    .await
1383            }
1384            Record::Simple {
1385                conn,
1386                user,
1387                password,
1388                sql,
1389                sort,
1390                output,
1391                location,
1392                ..
1393            } => {
1394                self.run_simple(
1395                    *conn,
1396                    *user,
1397                    *password,
1398                    sql,
1399                    sort.clone(),
1400                    output,
1401                    location.clone(),
1402                )
1403                .await
1404            }
1405            Record::Copy {
1406                table_name,
1407                tsv_path,
1408            } => {
1409                let tsv = tokio::fs::read(tsv_path).await?;
1410                let copy = self
1411                    .client
1412                    .copy_in(&*format!("COPY {} FROM STDIN", table_name))
1413                    .await?;
1414                tokio::pin!(copy);
1415                copy.send(bytes::Bytes::from(tsv)).await?;
1416                copy.finish().await?;
1417                Ok(Outcome::Success)
1418            }
1419            _ => Ok(Outcome::Success),
1420        }
1421    }
1422
1423    async fn run_statement<'r>(
1424        &self,
1425        expected_error: Option<&'r str>,
1426        expected_rows_affected: Option<u64>,
1427        sql: &'r str,
1428        location: Location,
1429    ) -> Result<Outcome<'r>, anyhow::Error> {
1430        static UNSUPPORTED_INDEX_STATEMENT_REGEX: LazyLock<Regex> =
1431            LazyLock::new(|| Regex::new("^(CREATE UNIQUE INDEX|REINDEX)").unwrap());
1432        if UNSUPPORTED_INDEX_STATEMENT_REGEX.is_match(sql) {
1433            // sure, we totally made you an index
1434            return Ok(Outcome::Success);
1435        }
1436
1437        match self.client.execute(sql, &[]).await {
1438            Ok(actual) => {
1439                if let Some(expected_error) = expected_error {
1440                    return Ok(Outcome::UnexpectedPlanSuccess {
1441                        expected_error,
1442                        location,
1443                    });
1444                }
1445                match expected_rows_affected {
1446                    None => Ok(Outcome::Success),
1447                    Some(expected) => {
1448                        if expected != actual {
1449                            Ok(Outcome::WrongNumberOfRowsInserted {
1450                                expected_count: expected,
1451                                actual_count: actual,
1452                                location,
1453                            })
1454                        } else {
1455                            Ok(Outcome::Success)
1456                        }
1457                    }
1458                }
1459            }
1460            Err(error) => {
1461                if let Some(expected_error) = expected_error {
1462                    if Regex::new(expected_error)?.is_match(&format!("{:#}", error)) {
1463                        return Ok(Outcome::Success);
1464                    }
1465                }
1466                Ok(Outcome::PlanFailure {
1467                    error: anyhow!(error),
1468                    location,
1469                })
1470            }
1471        }
1472    }
1473
1474    async fn prepare_query<'r>(
1475        &self,
1476        sql: &str,
1477        output: &'r Result<QueryOutput<'_>, &'r str>,
1478        location: Location,
1479        in_transaction: &mut bool,
1480    ) -> Result<PrepareQueryOutcome<'r>, anyhow::Error> {
1481        // get statement
1482        let statements = match mz_sql::parse::parse(sql) {
1483            Ok(statements) => statements,
1484            Err(e) => match output {
1485                Ok(_) => {
1486                    return Ok(PrepareQueryOutcome::Outcome(Outcome::ParseFailure {
1487                        error: e.into(),
1488                        location,
1489                    }));
1490                }
1491                Err(expected_error) => {
1492                    if Regex::new(expected_error)?.is_match(&format!("{:#}", e)) {
1493                        return Ok(PrepareQueryOutcome::Outcome(Outcome::Success));
1494                    } else {
1495                        return Ok(PrepareQueryOutcome::Outcome(Outcome::ParseFailure {
1496                            error: e.into(),
1497                            location,
1498                        }));
1499                    }
1500                }
1501            },
1502        };
1503        let statement = match &*statements {
1504            [] => bail!("Got zero statements?"),
1505            [statement] => &statement.ast,
1506            _ => bail!("Got multiple statements: {:?}", statements),
1507        };
1508        let (is_select, num_attributes) = match statement {
1509            Statement::Select(stmt) => (true, derive_num_attributes(&stmt.query.body)),
1510            _ => (false, None),
1511        };
1512
1513        match output {
1514            Ok(_) => {
1515                if self.auto_transactions && !*in_transaction {
1516                    // No ISOLATION LEVEL SERIALIZABLE because of database-issues#5323
1517                    self.client.execute("BEGIN", &[]).await?;
1518                    *in_transaction = true;
1519                }
1520            }
1521            Err(_) => {
1522                if self.auto_transactions && *in_transaction {
1523                    self.client.execute("COMMIT", &[]).await?;
1524                    *in_transaction = false;
1525                }
1526            }
1527        }
1528
1529        // `SHOW` commands reference catalog schema, thus are not in the same timedomain and not
1530        // allowed in the same transaction, see:
1531        // https://materialize.com/docs/sql/begin/#same-timedomain-error
1532        match statement {
1533            Statement::Show(..) => {
1534                if self.auto_transactions && *in_transaction {
1535                    self.client.execute("COMMIT", &[]).await?;
1536                    *in_transaction = false;
1537                }
1538            }
1539            _ => (),
1540        }
1541        Ok(PrepareQueryOutcome::QueryPrepared(QueryInfo {
1542            is_select,
1543            num_attributes,
1544        }))
1545    }
1546
1547    async fn execute_query<'r>(
1548        &self,
1549        sql: &str,
1550        output: &'r Result<QueryOutput<'_>, &'r str>,
1551        location: Location,
1552    ) -> Result<Outcome<'r>, anyhow::Error> {
1553        let rows = match self.client.query(sql, &[]).await {
1554            Ok(rows) => rows,
1555            Err(error) => {
1556                return match output {
1557                    Ok(_) => {
1558                        let error_string = format!("{}", error);
1559                        if error_string.contains("supported") || error_string.contains("overload") {
1560                            // this is a failure, but it's caused by lack of support rather than by bugs
1561                            Ok(Outcome::Unsupported {
1562                                error: anyhow!(error),
1563                                location,
1564                            })
1565                        } else {
1566                            Ok(Outcome::PlanFailure {
1567                                error: anyhow!(error),
1568                                location,
1569                            })
1570                        }
1571                    }
1572                    Err(expected_error) => {
1573                        if Regex::new(expected_error)?.is_match(&format!("{:#}", error)) {
1574                            Ok(Outcome::Success)
1575                        } else {
1576                            Ok(Outcome::PlanFailure {
1577                                error: anyhow!(error),
1578                                location,
1579                            })
1580                        }
1581                    }
1582                };
1583            }
1584        };
1585
1586        // unpack expected output
1587        let QueryOutput {
1588            sort,
1589            types: expected_types,
1590            column_names: expected_column_names,
1591            output: expected_output,
1592            mode,
1593            ..
1594        } = match output {
1595            Err(expected_error) => {
1596                return Ok(Outcome::UnexpectedPlanSuccess {
1597                    expected_error,
1598                    location,
1599                });
1600            }
1601            Ok(query_output) => query_output,
1602        };
1603
1604        // format output
1605        let mut formatted_rows = vec![];
1606        for row in &rows {
1607            if row.len() != expected_types.len() {
1608                return Ok(Outcome::WrongColumnCount {
1609                    expected_count: expected_types.len(),
1610                    actual_count: row.len(),
1611                    location,
1612                });
1613            }
1614            let row = format_row(row, expected_types, *mode);
1615            formatted_rows.push(row);
1616        }
1617
1618        // sort formatted output
1619        if let Sort::Row = sort {
1620            formatted_rows.sort();
1621        }
1622        let mut values = formatted_rows.into_iter().flatten().collect::<Vec<_>>();
1623        if let Sort::Value = sort {
1624            values.sort();
1625        }
1626
1627        // Various checks as long as there are returned rows.
1628        if let Some(row) = rows.get(0) {
1629            // check column names
1630            if let Some(expected_column_names) = expected_column_names {
1631                let actual_column_names = row
1632                    .columns()
1633                    .iter()
1634                    .map(|t| ColumnName::from(t.name()))
1635                    .collect::<Vec<_>>();
1636                if expected_column_names != &actual_column_names {
1637                    return Ok(Outcome::WrongColumnNames {
1638                        expected_column_names,
1639                        actual_column_names,
1640                        actual_output: Output::Values(values),
1641                        location,
1642                    });
1643                }
1644            }
1645        }
1646
1647        // check output
1648        match expected_output {
1649            Output::Values(expected_values) => {
1650                if values != *expected_values {
1651                    return Ok(Outcome::OutputFailure {
1652                        expected_output,
1653                        actual_raw_output: rows,
1654                        actual_output: Output::Values(values),
1655                        location,
1656                    });
1657                }
1658            }
1659            Output::Hashed {
1660                num_values,
1661                md5: expected_md5,
1662            } => {
1663                let mut hasher = Md5::new();
1664                for value in &values {
1665                    hasher.update(value);
1666                    hasher.update("\n");
1667                }
1668                let md5 = format!("{:x}", hasher.finalize());
1669                if values.len() != *num_values || md5 != *expected_md5 {
1670                    return Ok(Outcome::OutputFailure {
1671                        expected_output,
1672                        actual_raw_output: rows,
1673                        actual_output: Output::Hashed {
1674                            num_values: values.len(),
1675                            md5,
1676                        },
1677                        location,
1678                    });
1679                }
1680            }
1681        }
1682
1683        Ok(Outcome::Success)
1684    }
1685
1686    async fn execute_view_inner<'r>(
1687        &self,
1688        sql: &str,
1689        output: &'r Result<QueryOutput<'_>, &'r str>,
1690        location: Location,
1691    ) -> Result<Option<Outcome<'r>>, anyhow::Error> {
1692        print_sql_if(self.stdout, sql, self.verbosity >= 2);
1693        let sql_result = self.client.execute(sql, &[]).await;
1694
1695        // Evaluate if we already reached an outcome or not.
1696        let tentative_outcome = if let Err(view_error) = sql_result {
1697            if let Err(expected_error) = output {
1698                if Regex::new(expected_error)?.is_match(&format!("{:#}", view_error)) {
1699                    Some(Outcome::Success)
1700                } else {
1701                    Some(Outcome::PlanFailure {
1702                        error: view_error.into(),
1703                        location: location.clone(),
1704                    })
1705                }
1706            } else {
1707                Some(Outcome::PlanFailure {
1708                    error: view_error.into(),
1709                    location: location.clone(),
1710                })
1711            }
1712        } else {
1713            None
1714        };
1715        Ok(tentative_outcome)
1716    }
1717
1718    async fn execute_view<'r>(
1719        &self,
1720        sql: &str,
1721        num_attributes: Option<usize>,
1722        output: &'r Result<QueryOutput<'_>, &'r str>,
1723        location: Location,
1724    ) -> Result<Outcome<'r>, anyhow::Error> {
1725        // Create indexed view SQL commands and execute `CREATE VIEW`.
1726        let expected_column_names = if let Ok(QueryOutput { column_names, .. }) = output {
1727            column_names.clone()
1728        } else {
1729            None
1730        };
1731        let (create_view, create_index, view_sql, drop_view) = generate_view_sql(
1732            sql,
1733            Uuid::new_v4().as_simple(),
1734            num_attributes,
1735            expected_column_names,
1736        );
1737        let tentative_outcome = self
1738            .execute_view_inner(create_view.as_str(), output, location.clone())
1739            .await?;
1740
1741        // Either we already have an outcome or alternatively,
1742        // we proceed to index and query the view.
1743        if let Some(view_outcome) = tentative_outcome {
1744            return Ok(view_outcome);
1745        }
1746
1747        let tentative_outcome = self
1748            .execute_view_inner(create_index.as_str(), output, location.clone())
1749            .await?;
1750
1751        let view_outcome;
1752        if let Some(outcome) = tentative_outcome {
1753            view_outcome = outcome;
1754        } else {
1755            print_sql_if(self.stdout, view_sql.as_str(), self.verbosity >= 2);
1756            view_outcome = self
1757                .execute_query(view_sql.as_str(), output, location.clone())
1758                .await?;
1759        }
1760
1761        // Remember to clean up after ourselves by dropping the view.
1762        print_sql_if(self.stdout, drop_view.as_str(), self.verbosity >= 2);
1763        self.client.execute(drop_view.as_str(), &[]).await?;
1764
1765        Ok(view_outcome)
1766    }
1767
1768    async fn run_query<'r>(
1769        &self,
1770        sql: &'r str,
1771        output: &'r Result<QueryOutput<'_>, &'r str>,
1772        location: Location,
1773        in_transaction: &mut bool,
1774    ) -> Result<Outcome<'r>, anyhow::Error> {
1775        let prepare_outcome = self
1776            .prepare_query(sql, output, location.clone(), in_transaction)
1777            .await?;
1778        match prepare_outcome {
1779            PrepareQueryOutcome::QueryPrepared(QueryInfo {
1780                is_select,
1781                num_attributes,
1782            }) => {
1783                let query_outcome = self.execute_query(sql, output, location.clone()).await?;
1784                if is_select && self.auto_index_selects {
1785                    let view_outcome = self
1786                        .execute_view(sql, None, output, location.clone())
1787                        .await?;
1788
1789                    // We compare here the query-based and view-based outcomes.
1790                    // We only produce a test failure if the outcomes are of different
1791                    // variant types, thus accepting smaller deviations in the details
1792                    // produced for each variant.
1793                    if std::mem::discriminant::<Outcome>(&query_outcome)
1794                        != std::mem::discriminant::<Outcome>(&view_outcome)
1795                    {
1796                        // Before producing a failure outcome, we try to obtain a new
1797                        // outcome for view-based execution exploiting analysis of the
1798                        // number of attributes. This two-level strategy can avoid errors
1799                        // produced by column ambiguity in the `SELECT`.
1800                        let view_outcome = if num_attributes.is_some() {
1801                            self.execute_view(sql, num_attributes, output, location.clone())
1802                                .await?
1803                        } else {
1804                            view_outcome
1805                        };
1806
1807                        if std::mem::discriminant::<Outcome>(&query_outcome)
1808                            != std::mem::discriminant::<Outcome>(&view_outcome)
1809                        {
1810                            let inconsistent_view_outcome = Outcome::InconsistentViewOutcome {
1811                                query_outcome: Box::new(query_outcome),
1812                                view_outcome: Box::new(view_outcome),
1813                                location: location.clone(),
1814                            };
1815                            // Determine if this inconsistent view outcome should be reported
1816                            // as an error or only as a warning.
1817                            let outcome = if should_warn(&inconsistent_view_outcome) {
1818                                Outcome::Warning {
1819                                    cause: Box::new(inconsistent_view_outcome),
1820                                    location: location.clone(),
1821                                }
1822                            } else {
1823                                inconsistent_view_outcome
1824                            };
1825                            return Ok(outcome);
1826                        }
1827                    }
1828                }
1829                Ok(query_outcome)
1830            }
1831            PrepareQueryOutcome::Outcome(outcome) => Ok(outcome),
1832        }
1833    }
1834
1835    async fn get_conn(
1836        &mut self,
1837        name: Option<&str>,
1838        user: Option<&str>,
1839        password: Option<&str>,
1840    ) -> Result<&tokio_postgres::Client, tokio_postgres::Error> {
1841        match name {
1842            None => Ok(&self.client),
1843            Some(name) => {
1844                if !self.clients.contains_key(name) {
1845                    let addr = if matches!(user, Some("mz_system") | Some("mz_support")) {
1846                        self.internal_server_addr
1847                    } else if password.is_some() {
1848                        // Use password server for password authentication
1849                        self.password_server_addr
1850                    } else {
1851                        self.server_addr
1852                    };
1853                    let client = connect(addr, user, password).await?;
1854                    self.clients.insert(name.into(), client);
1855                }
1856                Ok(self.clients.get(name).unwrap())
1857            }
1858        }
1859    }
1860
1861    async fn run_simple<'r>(
1862        &mut self,
1863        conn: Option<&'r str>,
1864        user: Option<&'r str>,
1865        password: Option<&'r str>,
1866        sql: &'r str,
1867        sort: Sort,
1868        output: &'r Output,
1869        location: Location,
1870    ) -> Result<Outcome<'r>, anyhow::Error> {
1871        let actual = match self.get_conn(conn, user, password).await {
1872            Ok(client) => match client.simple_query(sql).await {
1873                Ok(result) => {
1874                    let mut rows = Vec::new();
1875
1876                    for m in result.into_iter() {
1877                        match m {
1878                            SimpleQueryMessage::Row(row) => {
1879                                let mut s = vec![];
1880                                for i in 0..row.len() {
1881                                    s.push(row.get(i).unwrap_or("NULL"));
1882                                }
1883                                rows.push(s.join(","));
1884                            }
1885                            SimpleQueryMessage::CommandComplete(count) => {
1886                                // This applies any sort on the COMPLETE line as
1887                                // well, but we do the same for the expected output.
1888                                rows.push(format!("COMPLETE {}", count));
1889                            }
1890                            SimpleQueryMessage::RowDescription(_) => {}
1891                            _ => panic!("unexpected"),
1892                        }
1893                    }
1894
1895                    if let Sort::Row = sort {
1896                        rows.sort();
1897                    }
1898
1899                    Output::Values(rows)
1900                }
1901                // Errors can contain multiple lines (say if there are details), and rewrite
1902                // sticks them each on their own line, so we need to split up the lines here to
1903                // each be its own String in the Vec.
1904                Err(error) => {
1905                    Output::Values(error.to_string().lines().map(|s| s.to_string()).collect())
1906                }
1907            },
1908            Err(error) => {
1909                Output::Values(error.to_string().lines().map(|s| s.to_string()).collect())
1910            }
1911        };
1912        if *output != actual {
1913            Ok(Outcome::OutputFailure {
1914                expected_output: output,
1915                actual_raw_output: vec![],
1916                actual_output: actual,
1917                location,
1918            })
1919        } else {
1920            Ok(Outcome::Success)
1921        }
1922    }
1923
1924    async fn check_catalog(&self) -> Result<(), anyhow::Error> {
1925        let url = format!(
1926            "http://{}/api/catalog/check",
1927            self.internal_http_server_addr
1928        );
1929        let response: serde_json::Value = reqwest::get(&url).await?.json().await?;
1930
1931        if let Some(inconsistencies) = response.get("err") {
1932            let inconsistencies = serde_json::to_string_pretty(&inconsistencies)
1933                .expect("serializing Value cannot fail");
1934            Err(anyhow::anyhow!("Catalog inconsistency\n{inconsistencies}"))
1935        } else {
1936            Ok(())
1937        }
1938    }
1939}
1940
1941async fn connect(
1942    addr: SocketAddr,
1943    user: Option<&str>,
1944    password: Option<&str>,
1945) -> Result<tokio_postgres::Client, tokio_postgres::Error> {
1946    let mut config = tokio_postgres::Config::new();
1947    config.host(addr.ip().to_string());
1948    config.port(addr.port());
1949    config.user(user.unwrap_or("materialize"));
1950    if let Some(password) = password {
1951        config.password(password);
1952    }
1953    let (client, connection) = config.connect(NoTls).await?;
1954
1955    task::spawn(|| "sqllogictest_connect", async move {
1956        if let Err(e) = connection.await {
1957            eprintln!("connection error: {}", e);
1958        }
1959    });
1960    Ok(client)
1961}
1962
/// An output sink that accepts pre-formatted arguments through a shared
/// reference.
///
/// The method is named `write_fmt` so that `write!`/`writeln!` can be used
/// directly on a `&dyn WriteFmt`, as the printing helpers in this file do.
pub trait WriteFmt {
    /// Writes the formatted arguments to the sink. There is no return value,
    /// so implementations have no way to report a failure to the caller.
    fn write_fmt(&self, fmt: fmt::Arguments<'_>);
}
1966
/// Configuration for a sqllogictest run.
pub struct RunConfig<'a> {
    /// Sink for regular output: the `--- <source>` header, echoed SQL and
    /// formatted outcomes are written here.
    pub stdout: &'a dyn WriteFmt,
    pub stderr: &'a dyn WriteFmt,
    /// Output detail level. At `>= 1`, records that did not succeed are
    /// reported; at `>= 2`, every record is printed before it is run and all
    /// non-successful outcomes are printed in full.
    pub verbosity: u8,
    pub postgres_url: String,
    pub prefix: String,
    pub no_fail: bool,
    /// When set, `run_string` stops processing a file at the first failing
    /// outcome.
    pub fail_fast: bool,
    pub auto_index_tables: bool,
    // NOTE(review): presumably the source of the runner's `auto_index_selects`
    // flag, which makes `SELECT`s get re-checked through an indexed view —
    // confirm where the runner copies it from.
    pub auto_index_selects: bool,
    // NOTE(review): presumably the source of the runner's `auto_transactions`
    // flag, which wraps queries in implicit `BEGIN`/`COMMIT` — confirm.
    pub auto_transactions: bool,
    pub enable_table_keys: bool,
    pub orchestrator_process_wrapper: Option<String>,
    pub tracing: TracingCliArgs,
    pub tracing_handle: TracingHandle,
    pub system_parameter_defaults: BTreeMap<String, String>,
    /// Persist state is handled specially because:
    /// - Persist background workers do not necessarily shut down immediately once the server is
    ///   shut down, and may panic if their storage is deleted out from under them.
    /// - It's safe for different databases to reference the same state: all data is scoped by UUID.
    pub persist_dir: TempDir,
    pub replicas: usize,
    pub replica_size: String,
}
1991
1992fn print_record(config: &RunConfig<'_>, record: &Record) {
1993    match record {
1994        Record::Statement { sql, .. } | Record::Query { sql, .. } => print_sql(config.stdout, sql),
1995        _ => (),
1996    }
1997}
1998
1999fn print_sql_if<'a>(stdout: &'a dyn WriteFmt, sql: &str, cond: bool) {
2000    if cond {
2001        print_sql(stdout, sql)
2002    }
2003}
2004
2005fn print_sql<'a>(stdout: &'a dyn WriteFmt, sql: &str) {
2006    writeln!(stdout, "{}", crate::util::indent(sql, 4))
2007}
2008
/// Regular expressions for matching error messages that should force a plan failure
/// in an inconsistent view outcome into a warning if the corresponding query succeeds.
///
/// `should_warn` compiles each entry with `Regex::new` and tests it with
/// `is_match` against the `{:#}`-formatted error of the view's plan failure;
/// a single matching entry is enough to downgrade the outcome.
const INCONSISTENT_VIEW_OUTCOME_WARNING_REGEXPS: [&str; 9] = [
    // The following are unfixable errors in indexed views given our
    // current constraints.
    "cannot materialize call to",
    "SHOW commands are not allowed in views",
    "cannot create view with unstable dependencies",
    "cannot use wildcard expansions or NATURAL JOINs in a view that depends on system objects",
    "no valid schema selected",
    r#"system schema '\w+' cannot be modified"#,
    r#"permission denied for (SCHEMA|CLUSTER) "(\w+\.)?\w+""#,
    // NOTE(vmarcos): Column ambiguity that could not be eliminated by our
    // currently implemented syntactic rewrites is considered unfixable.
    // In addition, if some column cannot be dealt with, e.g., in `ORDER BY`
    // references, we treat this condition as unfixable as well.
    r#"column "[\w\?]+" specified more than once"#,
    r#"column "(\w+\.)?\w+" does not exist"#,
];
2028
2029/// Evaluates if the given outcome should be returned directly or if it should
2030/// be wrapped as a warning. Note that this function should be used for outcomes
2031/// that can be judged in a context-independent manner, i.e., the outcome itself
2032/// provides enough information as to whether a warning should be emitted or not.
2033fn should_warn(outcome: &Outcome) -> bool {
2034    match outcome {
2035        Outcome::InconsistentViewOutcome {
2036            query_outcome,
2037            view_outcome,
2038            ..
2039        } => match (query_outcome.as_ref(), view_outcome.as_ref()) {
2040            (Outcome::Success, Outcome::PlanFailure { error, .. }) => {
2041                INCONSISTENT_VIEW_OUTCOME_WARNING_REGEXPS.iter().any(|s| {
2042                    Regex::new(s)
2043                        .expect("unexpected error in regular expression parsing")
2044                        .is_match(&format!("{:#}", error))
2045                })
2046            }
2047            _ => false,
2048        },
2049        _ => false,
2050    }
2051}
2052
2053pub async fn run_string(
2054    runner: &mut Runner<'_>,
2055    source: &str,
2056    input: &str,
2057) -> Result<Outcomes, anyhow::Error> {
2058    runner.reset_database().await?;
2059
2060    let mut outcomes = Outcomes::default();
2061    let mut parser = crate::parser::Parser::new(source, input);
2062    // Transactions are currently relatively slow. Since sqllogictest runs in a single connection
2063    // there should be no difference in having longer running transactions.
2064    let mut in_transaction = false;
2065    writeln!(runner.config.stdout, "--- {}", source);
2066
2067    for record in parser.parse_records()? {
2068        // In maximal-verbosity mode, print the query before attempting to run
2069        // it. Running the query might panic, so it is important to print out
2070        // what query we are trying to run *before* we panic.
2071        if runner.config.verbosity >= 2 {
2072            print_record(runner.config, &record);
2073        }
2074
2075        let outcome = runner
2076            .run_record(&record, &mut in_transaction)
2077            .await
2078            .map_err(|err| format!("In {}:\n{}", source, err))
2079            .unwrap();
2080
2081        // Print warnings and failures in verbose mode.
2082        if runner.config.verbosity >= 1 && !outcome.success() {
2083            if runner.config.verbosity < 2 {
2084                // If `verbosity >= 2`, we'll already have printed the record,
2085                // so don't print it again. Yes, this is an ugly bit of logic.
2086                // Please don't try to consolidate it with the `print_record`
2087                // call above, as it's important to have a mode in which records
2088                // are printed before they are run, so that if running the
2089                // record panics, you can tell which record caused it.
2090                if !outcome.failure() {
2091                    writeln!(
2092                        runner.config.stdout,
2093                        "{}",
2094                        util::indent("Warning detected for: ", 4)
2095                    );
2096                }
2097                print_record(runner.config, &record);
2098            }
2099            if runner.config.verbosity >= 2 || outcome.failure() {
2100                writeln!(
2101                    runner.config.stdout,
2102                    "{}",
2103                    util::indent(&outcome.to_string(), 4)
2104                );
2105                writeln!(runner.config.stdout, "{}", util::indent("----", 4));
2106            }
2107        }
2108
2109        outcomes.stats[outcome.code()] += 1;
2110        if outcome.failure() {
2111            outcomes.details.push(format!("{}", outcome));
2112        }
2113
2114        if let Outcome::Bail { .. } = outcome {
2115            break;
2116        }
2117
2118        if runner.config.fail_fast && outcome.failure() {
2119            break;
2120        }
2121    }
2122    Ok(outcomes)
2123}
2124
2125pub async fn run_file(runner: &mut Runner<'_>, filename: &Path) -> Result<Outcomes, anyhow::Error> {
2126    let mut input = String::new();
2127    File::open(filename)?.read_to_string(&mut input)?;
2128    let outcomes = run_string(runner, &format!("{}", filename.display()), &input).await?;
2129    runner.check_catalog().await?;
2130
2131    Ok(outcomes)
2132}
2133
2134pub async fn rewrite_file(runner: &mut Runner<'_>, filename: &Path) -> Result<(), anyhow::Error> {
2135    runner.reset_database().await?;
2136
2137    let mut file = OpenOptions::new().read(true).write(true).open(filename)?;
2138
2139    let mut input = String::new();
2140    file.read_to_string(&mut input)?;
2141
2142    let mut buf = RewriteBuffer::new(&input);
2143
2144    let mut parser = crate::parser::Parser::new(filename.to_str().unwrap_or(""), &input);
2145    writeln!(runner.config.stdout, "--- {}", filename.display());
2146    let mut in_transaction = false;
2147
2148    fn append_values_output(
2149        buf: &mut RewriteBuffer,
2150        input: &String,
2151        expected_output: &str,
2152        mode: &Mode,
2153        types: &Vec<Type>,
2154        column_names: Option<&Vec<ColumnName>>,
2155        actual_output: &Vec<String>,
2156        multiline: bool,
2157    ) {
2158        buf.append_header(input, expected_output, column_names);
2159
2160        for (i, row) in actual_output.chunks(types.len()).enumerate() {
2161            match mode {
2162                // In Cockroach mode, output each row on its own line, with
2163                // two spaces between each column.
2164                Mode::Cockroach => {
2165                    if i != 0 {
2166                        buf.append("\n");
2167                    }
2168
2169                    if row.len() == 0 {
2170                        // nothing to do
2171                    } else if row.len() == 1 {
2172                        // If there is only one column, then there is no need for space
2173                        // substitution, so we only do newline substitution.
2174                        if multiline {
2175                            buf.append(&row[0]);
2176                        } else {
2177                            buf.append(&row[0].replace('\n', "⏎"))
2178                        }
2179                    } else {
2180                        // Substitute spaces with ␠ to avoid mistaking the spaces in the result
2181                        // values with spaces that separate columns.
2182                        buf.append(
2183                            &row.iter()
2184                                .map(|col| {
2185                                    let mut col = col.replace(' ', "␠");
2186                                    if !multiline {
2187                                        col = col.replace('\n', "⏎");
2188                                    }
2189                                    col
2190                                })
2191                                .join("  "),
2192                        );
2193                    }
2194                }
2195                // In standard mode, output each value on its own line,
2196                // and ignore row boundaries.
2197                // No need to substitute spaces, because every value (not row) is on a separate
2198                // line. But we do need to substitute newlines.
2199                Mode::Standard => {
2200                    for (j, col) in row.iter().enumerate() {
2201                        if i != 0 || j != 0 {
2202                            buf.append("\n");
2203                        }
2204                        buf.append(&if multiline {
2205                            col.clone()
2206                        } else {
2207                            col.replace('\n', "⏎")
2208                        });
2209                    }
2210                }
2211            }
2212        }
2213    }
2214
2215    for record in parser.parse_records()? {
2216        let outcome = runner.run_record(&record, &mut in_transaction).await?;
2217
2218        match (&record, &outcome) {
2219            // If we see an output failure for a query, rewrite the expected output
2220            // to match the observed output.
2221            (
2222                Record::Query {
2223                    output:
2224                        Ok(QueryOutput {
2225                            mode,
2226                            output: Output::Values(_),
2227                            output_str: expected_output,
2228                            types,
2229                            column_names,
2230                            multiline,
2231                            ..
2232                        }),
2233                    ..
2234                },
2235                Outcome::OutputFailure {
2236                    actual_output: Output::Values(actual_output),
2237                    ..
2238                },
2239            ) => {
2240                append_values_output(
2241                    &mut buf,
2242                    &input,
2243                    expected_output,
2244                    mode,
2245                    types,
2246                    column_names.as_ref(),
2247                    actual_output,
2248                    *multiline,
2249                );
2250            }
2251            (
2252                Record::Query {
2253                    output:
2254                        Ok(QueryOutput {
2255                            mode,
2256                            output: Output::Values(_),
2257                            output_str: expected_output,
2258                            types,
2259                            multiline,
2260                            ..
2261                        }),
2262                    ..
2263                },
2264                Outcome::WrongColumnNames {
2265                    actual_column_names,
2266                    actual_output: Output::Values(actual_output),
2267                    ..
2268                },
2269            ) => {
2270                append_values_output(
2271                    &mut buf,
2272                    &input,
2273                    expected_output,
2274                    mode,
2275                    types,
2276                    Some(actual_column_names),
2277                    actual_output,
2278                    *multiline,
2279                );
2280            }
2281            (
2282                Record::Query {
2283                    output:
2284                        Ok(QueryOutput {
2285                            output: Output::Hashed { .. },
2286                            output_str: expected_output,
2287                            column_names,
2288                            ..
2289                        }),
2290                    ..
2291                },
2292                Outcome::OutputFailure {
2293                    actual_output: Output::Hashed { num_values, md5 },
2294                    ..
2295                },
2296            ) => {
2297                buf.append_header(&input, expected_output, column_names.as_ref());
2298
2299                buf.append(format!("{} values hashing to {}\n", num_values, md5).as_str())
2300            }
2301            (
2302                Record::Simple {
2303                    output_str: expected_output,
2304                    ..
2305                },
2306                Outcome::OutputFailure {
2307                    actual_output: Output::Values(actual_output),
2308                    ..
2309                },
2310            ) => {
2311                buf.append_header(&input, expected_output, None);
2312
2313                for (i, row) in actual_output.iter().enumerate() {
2314                    if i != 0 {
2315                        buf.append("\n");
2316                    }
2317                    buf.append(row);
2318                }
2319            }
2320            (
2321                Record::Query {
2322                    sql,
2323                    output: Err(err),
2324                    ..
2325                },
2326                outcome,
2327            )
2328            | (
2329                Record::Statement {
2330                    expected_error: Some(err),
2331                    sql,
2332                    ..
2333                },
2334                outcome,
2335            ) if outcome.err_msg().is_some() => {
2336                buf.rewrite_expected_error(&input, err, &outcome.err_msg().unwrap(), sql)
2337            }
2338            (_, Outcome::Success) => {}
2339            _ => bail!("unexpected: {:?} {:?}", record, outcome),
2340        }
2341    }
2342
2343    file.set_len(0)?;
2344    file.seek(SeekFrom::Start(0))?;
2345    file.write_all(buf.finish().as_bytes())?;
2346    file.sync_all()?;
2347    Ok(())
2348}
2349
2350/// Provides a means to rewrite the `.slt` file while iterating over it.
2351///
2352/// This struct takes the slt file as its `input`, tracks a cursor into it
2353/// (`input_offset`), and provides a buffer (`output`) to store the rewritten
2354/// results.
2355///
2356/// Functions that modify the file will lazily move `input` into `output` using
2357/// `flush_to`. However, those calls should all be interior to other functions.
2358#[derive(Debug)]
2359struct RewriteBuffer<'a> {
2360    input: &'a str,
2361    input_offset: usize,
2362    output: String,
2363}
2364
2365impl<'a> RewriteBuffer<'a> {
2366    fn new(input: &'a str) -> RewriteBuffer<'a> {
2367        RewriteBuffer {
2368            input,
2369            input_offset: 0,
2370            output: String::new(),
2371        }
2372    }
2373
2374    fn flush_to(&mut self, offset: usize) {
2375        assert!(offset >= self.input_offset);
2376        let chunk = &self.input[self.input_offset..offset];
2377        self.output.push_str(chunk);
2378        self.input_offset = offset;
2379    }
2380
2381    fn skip_to(&mut self, offset: usize) {
2382        assert!(offset >= self.input_offset);
2383        self.input_offset = offset;
2384    }
2385
2386    fn append(&mut self, s: &str) {
2387        self.output.push_str(s);
2388    }
2389
2390    fn append_header(
2391        &mut self,
2392        input: &String,
2393        expected_output: &str,
2394        column_names: Option<&Vec<ColumnName>>,
2395    ) {
2396        // Output everything before this record.
2397        // TODO(benesch): is it possible to rewrite this to avoid `as`?
2398        #[allow(clippy::as_conversions)]
2399        let offset = expected_output.as_ptr() as usize - input.as_ptr() as usize;
2400        self.flush_to(offset);
2401        self.skip_to(offset + expected_output.len());
2402
2403        // Attempt to install the result separator (----), if it does
2404        // not already exist.
2405        if self.peek_last(5) == "\n----" {
2406            self.append("\n");
2407        } else if self.peek_last(6) != "\n----\n" {
2408            self.append("\n----\n");
2409        }
2410
2411        let Some(names) = column_names else {
2412            return;
2413        };
2414        self.append(
2415            &names
2416                .iter()
2417                .map(|name| name.replace(' ', "␠"))
2418                .collect::<Vec<_>>()
2419                .join(" "),
2420        );
2421        self.append("\n");
2422    }
2423
2424    fn rewrite_expected_error(
2425        &mut self,
2426        input: &String,
2427        old_err: &str,
2428        new_err: &str,
2429        query: &str,
2430    ) {
2431        // Output everything before this error message.
2432        // TODO(benesch): is it possible to rewrite this to avoid `as`?
2433        #[allow(clippy::as_conversions)]
2434        let err_offset = old_err.as_ptr() as usize - input.as_ptr() as usize;
2435        self.flush_to(err_offset);
2436        self.append(new_err);
2437        self.append("\n");
2438        self.append(query);
2439        // TODO(benesch): is it possible to rewrite this to avoid `as`?
2440        #[allow(clippy::as_conversions)]
2441        self.skip_to(query.as_ptr() as usize - input.as_ptr() as usize + query.len())
2442    }
2443
2444    fn peek_last(&self, n: usize) -> &str {
2445        &self.output[self.output.len() - n..]
2446    }
2447
2448    fn finish(mut self) -> String {
2449        self.flush_to(self.input.len());
2450        self.output
2451    }
2452}
2453
2454/// Generates view creation, view indexing, view querying, and view
2455/// dropping SQL commands for a given `SELECT` query. If the number
2456/// of attributes produced by the query is known, the view commands
2457/// are specialized to avoid issues with column ambiguity. This
2458/// function is a helper for `--auto_index_selects` and assumes that
2459/// the provided input SQL has already been run through the parser,
2460/// resulting in a valid `SELECT` statement.
2461fn generate_view_sql(
2462    sql: &str,
2463    view_uuid: &Simple,
2464    num_attributes: Option<usize>,
2465    expected_column_names: Option<Vec<ColumnName>>,
2466) -> (String, String, String, String) {
2467    // To create the view, re-parse the sql; note that we must find exactly
2468    // one statement and it must be a `SELECT`.
2469    // NOTE(vmarcos): Direct string manipulation was attempted while
2470    // prototyping the code below, which avoids the extra parsing and
2471    // data structure cloning. However, running DDL is so slow that
2472    // it did not matter in terms of runtime. We can revisit this if
2473    // DDL cost drops dramatically in the future.
2474    let stmts = parser::parse_statements(sql).unwrap_or_default();
2475    assert!(stmts.len() == 1);
2476    let (query, query_as_of) = match &stmts[0].ast {
2477        Statement::Select(stmt) => (&stmt.query, &stmt.as_of),
2478        _ => unreachable!("This function should only be called for SELECTs"),
2479    };
2480
2481    // Prior to creating the view, process the `ORDER BY` clause of
2482    // the `SELECT` query, if any. Ordering is not preserved when a
2483    // view includes an `ORDER BY` clause and must be re-enforced by
2484    // an external `ORDER BY` clause when querying the view.
2485    let (view_order_by, extra_columns, distinct) = if num_attributes.is_none() {
2486        (query.order_by.clone(), vec![], None)
2487    } else {
2488        derive_order_by(&query.body, &query.order_by)
2489    };
2490
2491    // Since one-shot SELECT statements may contain ambiguous column names,
2492    // we either use the expected column names, if that option was
2493    // provided, or else just rename the output schema of the view
2494    // using numerically increasing attribute names, whenever possible.
2495    // This strategy makes it possible to use `CREATE INDEX`, thus
2496    // matching the behavior of the option `auto_index_tables`. However,
2497    // we may be presented with a `SELECT *` query, in which case the parser
2498    // does not produce sufficient information to allow us to compute
2499    // the number of output columns. In the latter case, we are supplied
2500    // with `None` for `num_attributes` and just employ the command
2501    // `CREATE DEFAULT INDEX` instead. Additionally, the view is created
2502    // without schema renaming. This strategy is insufficient to dodge
2503    // column name ambiguity in all cases, but we assume here that we
2504    // can adjust the (hopefully) small number of tests that eventually
2505    // challenge us in this particular way.
2506    let name = UnresolvedItemName(vec![Ident::new_unchecked(format!("v{}", view_uuid))]);
2507    let projection = expected_column_names.map_or_else(
2508        || {
2509            num_attributes.map_or(vec![], |n| {
2510                (1..=n)
2511                    .map(|i| Ident::new_unchecked(format!("a{i}")))
2512                    .collect()
2513            })
2514        },
2515        |cols| {
2516            cols.iter()
2517                .map(|c| Ident::new_unchecked(c.as_str()))
2518                .collect()
2519        },
2520    );
2521    let columns: Vec<Ident> = projection
2522        .iter()
2523        .cloned()
2524        .chain(extra_columns.iter().map(|item| {
2525            if let SelectItem::Expr {
2526                expr: _,
2527                alias: Some(ident),
2528            } = item
2529            {
2530                ident.clone()
2531            } else {
2532                unreachable!("alias must be given for extra column")
2533            }
2534        }))
2535        .collect();
2536
2537    // Build a `CREATE VIEW` with the columns computed above.
2538    let mut query = query.clone();
2539    if extra_columns.len() > 0 {
2540        match &mut query.body {
2541            SetExpr::Select(stmt) => stmt.projection.extend(extra_columns.iter().cloned()),
2542            _ => unimplemented!("cannot yet rewrite projections of nested queries"),
2543        }
2544    }
2545    let create_view = AstStatement::<Raw>::CreateView(CreateViewStatement {
2546        if_exists: IfExistsBehavior::Error,
2547        temporary: false,
2548        definition: ViewDefinition {
2549            name: name.clone(),
2550            columns: columns.clone(),
2551            query,
2552        },
2553    })
2554    .to_ast_string_stable();
2555
2556    // We then create either a `CREATE INDEX` or a `CREATE DEFAULT INDEX`
2557    // statement, depending on whether we could obtain the number of
2558    // attributes from the original `SELECT`.
2559    let create_index = AstStatement::<Raw>::CreateIndex(CreateIndexStatement {
2560        name: None,
2561        in_cluster: None,
2562        on_name: RawItemName::Name(name.clone()),
2563        key_parts: if columns.len() == 0 {
2564            None
2565        } else {
2566            Some(
2567                columns
2568                    .iter()
2569                    .map(|ident| Expr::Identifier(vec![ident.clone()]))
2570                    .collect(),
2571            )
2572        },
2573        with_options: Vec::new(),
2574        if_not_exists: false,
2575    })
2576    .to_ast_string_stable();
2577
2578    // Assert if DISTINCT semantics are unchanged from view
2579    let distinct_unneeded = extra_columns.len() == 0
2580        || match distinct {
2581            None | Some(Distinct::On(_)) => true,
2582            Some(Distinct::EntireRow) => false,
2583        };
2584    let distinct = if distinct_unneeded { None } else { distinct };
2585
2586    // `SELECT [* | {projection}] FROM {name} [ORDER BY {view_order_by}]`
2587    let view_sql = AstStatement::<Raw>::Select(SelectStatement {
2588        query: Query {
2589            ctes: CteBlock::Simple(vec![]),
2590            body: SetExpr::Select(Box::new(Select {
2591                distinct,
2592                projection: if projection.len() == 0 {
2593                    vec![SelectItem::Wildcard]
2594                } else {
2595                    projection
2596                        .iter()
2597                        .map(|ident| SelectItem::Expr {
2598                            expr: Expr::Identifier(vec![ident.clone()]),
2599                            alias: None,
2600                        })
2601                        .collect()
2602                },
2603                from: vec![TableWithJoins {
2604                    relation: TableFactor::Table {
2605                        name: RawItemName::Name(name.clone()),
2606                        alias: None,
2607                    },
2608                    joins: vec![],
2609                }],
2610                selection: None,
2611                group_by: vec![],
2612                having: None,
2613                qualify: None,
2614                options: vec![],
2615            })),
2616            order_by: view_order_by,
2617            limit: None,
2618            offset: None,
2619        },
2620        as_of: query_as_of.clone(),
2621    })
2622    .to_ast_string_stable();
2623
2624    // `DROP VIEW {name}`
2625    let drop_view = AstStatement::<Raw>::DropObjects(DropObjectsStatement {
2626        object_type: ObjectType::View,
2627        if_exists: false,
2628        names: vec![UnresolvedObjectName::Item(name)],
2629        cascade: false,
2630    })
2631    .to_ast_string_stable();
2632
2633    (create_view, create_index, view_sql, drop_view)
2634}
2635
2636/// Analyzes the provided query `body` to derive the number of
2637/// attributes in the query. We only consider syntactic cues,
2638/// so we may end up deriving `None` for the number of attributes
2639/// as a conservative approximation.
2640fn derive_num_attributes(body: &SetExpr<Raw>) -> Option<usize> {
2641    let Some((projection, _)) = find_projection(body) else {
2642        return None;
2643    };
2644    derive_num_attributes_from_projection(projection)
2645}
2646
2647/// Analyzes a query's `ORDER BY` clause to derive an `ORDER BY`
2648/// clause that makes numeric references to any expressions in
2649/// the projection and generated-attribute references to expressions
2650/// that need to be added as extra columns to the projection list.
2651/// The rewritten `ORDER BY` clause is then usable when querying a
2652/// view that contains the same `SELECT` as the given query.
2653/// This function returns both the rewritten `ORDER BY` clause
2654/// as well as a list of extra columns that need to be added
2655/// to the query's projection for the `ORDER BY` clause to
2656/// succeed.
2657fn derive_order_by(
2658    body: &SetExpr<Raw>,
2659    order_by: &Vec<OrderByExpr<Raw>>,
2660) -> (
2661    Vec<OrderByExpr<Raw>>,
2662    Vec<SelectItem<Raw>>,
2663    Option<Distinct<Raw>>,
2664) {
2665    let Some((projection, distinct)) = find_projection(body) else {
2666        return (vec![], vec![], None);
2667    };
2668    let (view_order_by, extra_columns) = derive_order_by_from_projection(projection, order_by);
2669    (view_order_by, extra_columns, distinct.clone())
2670}
2671
2672/// Finds the projection list in a `SELECT` query body.
2673fn find_projection(body: &SetExpr<Raw>) -> Option<(&Vec<SelectItem<Raw>>, &Option<Distinct<Raw>>)> {
2674    // Iterate to peel off the query body until the query's
2675    // projection list is found.
2676    let mut set_expr = body;
2677    loop {
2678        match set_expr {
2679            SetExpr::Select(select) => {
2680                return Some((&select.projection, &select.distinct));
2681            }
2682            SetExpr::SetOperation { left, .. } => set_expr = left.as_ref(),
2683            SetExpr::Query(query) => set_expr = &query.body,
2684            _ => return None,
2685        }
2686    }
2687}
2688
2689/// Computes the number of attributes that are obtained by the
2690/// projection of a `SELECT` query. The projection may include
2691/// wildcards, in which case the analysis just returns `None`.
2692fn derive_num_attributes_from_projection(projection: &Vec<SelectItem<Raw>>) -> Option<usize> {
2693    let mut num_attributes = 0usize;
2694    for item in projection.iter() {
2695        let SelectItem::Expr { expr, .. } = item else {
2696            return None;
2697        };
2698        match expr {
2699            Expr::QualifiedWildcard(..) | Expr::WildcardAccess(..) => {
2700                return None;
2701            }
2702            _ => {
2703                num_attributes += 1;
2704            }
2705        }
2706    }
2707    Some(num_attributes)
2708}
2709
2710/// Computes an `ORDER BY` clause with only numeric references
2711/// from given projection and `ORDER BY` of a `SELECT` query.
2712/// If the derivation fails to match a given expression, the
2713/// matched prefix is returned. Note that this could be empty.
2714fn derive_order_by_from_projection(
2715    projection: &Vec<SelectItem<Raw>>,
2716    order_by: &Vec<OrderByExpr<Raw>>,
2717) -> (Vec<OrderByExpr<Raw>>, Vec<SelectItem<Raw>>) {
2718    let mut view_order_by: Vec<OrderByExpr<Raw>> = vec![];
2719    let mut extra_columns: Vec<SelectItem<Raw>> = vec![];
2720    for order_by_expr in order_by.iter() {
2721        let query_expr = &order_by_expr.expr;
2722        let view_expr = match query_expr {
2723            Expr::Value(mz_sql_parser::ast::Value::Number(_)) => query_expr.clone(),
2724            _ => {
2725                // Find expression in query projection, if we can.
2726                if let Some(i) = projection.iter().position(|item| match item {
2727                    SelectItem::Expr { expr, alias } => {
2728                        expr == query_expr
2729                            || match query_expr {
2730                                Expr::Identifier(ident) => {
2731                                    ident.len() == 1 && Some(&ident[0]) == alias.as_ref()
2732                                }
2733                                _ => false,
2734                            }
2735                    }
2736                    SelectItem::Wildcard => false,
2737                }) {
2738                    Expr::Value(mz_sql_parser::ast::Value::Number((i + 1).to_string()))
2739                } else {
2740                    // If the expression is not found in the
2741                    // projection, add extra column.
2742                    let ident = Ident::new_unchecked(format!(
2743                        "a{}",
2744                        (projection.len() + extra_columns.len() + 1)
2745                    ));
2746                    extra_columns.push(SelectItem::Expr {
2747                        expr: query_expr.clone(),
2748                        alias: Some(ident.clone()),
2749                    });
2750                    Expr::Identifier(vec![ident])
2751                }
2752            }
2753        };
2754        view_order_by.push(OrderByExpr {
2755            expr: view_expr,
2756            asc: order_by_expr.asc,
2757            nulls_last: order_by_expr.nulls_last,
2758        });
2759    }
2760    (view_order_by, extra_columns)
2761}
2762
2763/// Returns extra statements to execute after `stmt` is executed.
2764fn mutate(sql: &str) -> Vec<String> {
2765    let stmts = parser::parse_statements(sql).unwrap_or_default();
2766    let mut additional = Vec::new();
2767    for stmt in stmts {
2768        match stmt.ast {
2769            AstStatement::CreateTable(stmt) => additional.push(
2770                // CREATE TABLE -> CREATE INDEX. Specify all columns manually in case CREATE
2771                // DEFAULT INDEX ever goes away.
2772                AstStatement::<Raw>::CreateIndex(CreateIndexStatement {
2773                    name: None,
2774                    in_cluster: None,
2775                    on_name: RawItemName::Name(stmt.name.clone()),
2776                    key_parts: Some(
2777                        stmt.columns
2778                            .iter()
2779                            .map(|def| Expr::Identifier(vec![def.name.clone()]))
2780                            .collect(),
2781                    ),
2782                    with_options: Vec::new(),
2783                    if_not_exists: false,
2784                })
2785                .to_ast_string_stable(),
2786            ),
2787            _ => {}
2788        }
2789    }
2790    additional
2791}
2792
2793#[mz_ore::test]
2794#[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `rust_psm_stack_pointer` on OS `linux`
2795fn test_generate_view_sql() {
2796    let uuid = Uuid::parse_str("67e5504410b1426f9247bb680e5fe0c8").unwrap();
2797    let cases = vec![
2798        (("SELECT * FROM t", None, None),
2799        (
2800            r#"CREATE VIEW "v67e5504410b1426f9247bb680e5fe0c8" AS SELECT * FROM "t""#.to_string(),
2801            r#"CREATE DEFAULT INDEX ON "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
2802            r#"SELECT * FROM "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
2803            r#"DROP VIEW "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
2804        )),
2805        (("SELECT a, b, c FROM t1, t2", Some(3), Some(vec![ColumnName::from("a"), ColumnName::from("b"), ColumnName::from("c")])),
2806        (
2807            r#"CREATE VIEW "v67e5504410b1426f9247bb680e5fe0c8" ("a", "b", "c") AS SELECT "a", "b", "c" FROM "t1", "t2""#.to_string(),
2808            r#"CREATE INDEX ON "v67e5504410b1426f9247bb680e5fe0c8" ("a", "b", "c")"#.to_string(),
2809            r#"SELECT "a", "b", "c" FROM "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
2810            r#"DROP VIEW "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
2811        )),
2812        (("SELECT a, b, c FROM t1, t2", Some(3), None),
2813        (
2814            r#"CREATE VIEW "v67e5504410b1426f9247bb680e5fe0c8" ("a1", "a2", "a3") AS SELECT "a", "b", "c" FROM "t1", "t2""#.to_string(),
2815            r#"CREATE INDEX ON "v67e5504410b1426f9247bb680e5fe0c8" ("a1", "a2", "a3")"#.to_string(),
2816            r#"SELECT "a1", "a2", "a3" FROM "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
2817            r#"DROP VIEW "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
2818        )),
2819        // A case with ambiguity that is accepted by the function, illustrating that
2820        // our measures to dodge this issue are imperfect.
2821        (("SELECT * FROM (SELECT a, sum(b) AS a FROM t GROUP BY a)", None, None),
2822        (
2823            r#"CREATE VIEW "v67e5504410b1426f9247bb680e5fe0c8" AS SELECT * FROM (SELECT "a", "sum"("b") AS "a" FROM "t" GROUP BY "a")"#.to_string(),
2824            r#"CREATE DEFAULT INDEX ON "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
2825            r#"SELECT * FROM "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
2826            r#"DROP VIEW "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
2827        )),
2828        (("SELECT a, b, b + d AS c, a + b AS d FROM t1, t2 ORDER BY a, c, a + b", Some(4), Some(vec![ColumnName::from("a"), ColumnName::from("b"), ColumnName::from("c"), ColumnName::from("d")])),
2829        (
2830            r#"CREATE VIEW "v67e5504410b1426f9247bb680e5fe0c8" ("a", "b", "c", "d") AS SELECT "a", "b", "b" + "d" AS "c", "a" + "b" AS "d" FROM "t1", "t2" ORDER BY "a", "c", "a" + "b""#.to_string(),
2831            r#"CREATE INDEX ON "v67e5504410b1426f9247bb680e5fe0c8" ("a", "b", "c", "d")"#.to_string(),
2832            r#"SELECT "a", "b", "c", "d" FROM "v67e5504410b1426f9247bb680e5fe0c8" ORDER BY 1, 3, 4"#.to_string(),
2833            r#"DROP VIEW "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
2834        )),
2835        (("((SELECT 1 AS a UNION SELECT 2 AS b) UNION SELECT 3 AS c) ORDER BY a", Some(1), None),
2836        (
2837            r#"CREATE VIEW "v67e5504410b1426f9247bb680e5fe0c8" ("a1") AS (SELECT 1 AS "a" UNION SELECT 2 AS "b") UNION SELECT 3 AS "c" ORDER BY "a""#.to_string(),
2838            r#"CREATE INDEX ON "v67e5504410b1426f9247bb680e5fe0c8" ("a1")"#.to_string(),
2839            r#"SELECT "a1" FROM "v67e5504410b1426f9247bb680e5fe0c8" ORDER BY 1"#.to_string(),
2840            r#"DROP VIEW "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
2841        )),
2842        (("SELECT * FROM (SELECT a, sum(b) AS a FROM t GROUP BY a) ORDER BY 1", None, None),
2843        (
2844            r#"CREATE VIEW "v67e5504410b1426f9247bb680e5fe0c8" AS SELECT * FROM (SELECT "a", "sum"("b") AS "a" FROM "t" GROUP BY "a") ORDER BY 1"#.to_string(),
2845            r#"CREATE DEFAULT INDEX ON "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
2846            r#"SELECT * FROM "v67e5504410b1426f9247bb680e5fe0c8" ORDER BY 1"#.to_string(),
2847            r#"DROP VIEW "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
2848        )),
2849        (("SELECT * FROM (SELECT a, sum(b) AS a FROM t GROUP BY a) ORDER BY a", None, None),
2850        (
2851            r#"CREATE VIEW "v67e5504410b1426f9247bb680e5fe0c8" AS SELECT * FROM (SELECT "a", "sum"("b") AS "a" FROM "t" GROUP BY "a") ORDER BY "a""#.to_string(),
2852            r#"CREATE DEFAULT INDEX ON "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
2853            r#"SELECT * FROM "v67e5504410b1426f9247bb680e5fe0c8" ORDER BY "a""#.to_string(),
2854            r#"DROP VIEW "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
2855        )),
2856        (("SELECT a, sum(b) AS a FROM t GROUP BY a, c ORDER BY a, c", Some(2), None),
2857        (
2858            r#"CREATE VIEW "v67e5504410b1426f9247bb680e5fe0c8" ("a1", "a2", "a3") AS SELECT "a", "sum"("b") AS "a", "c" AS "a3" FROM "t" GROUP BY "a", "c" ORDER BY "a", "c""#.to_string(),
2859            r#"CREATE INDEX ON "v67e5504410b1426f9247bb680e5fe0c8" ("a1", "a2", "a3")"#.to_string(),
2860            r#"SELECT "a1", "a2" FROM "v67e5504410b1426f9247bb680e5fe0c8" ORDER BY 1, "a3""#.to_string(),
2861            r#"DROP VIEW "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
2862        )),
2863        (("SELECT a, sum(b) AS a FROM t GROUP BY a, c ORDER BY c, a", Some(2), None),
2864        (
2865            r#"CREATE VIEW "v67e5504410b1426f9247bb680e5fe0c8" ("a1", "a2", "a3") AS SELECT "a", "sum"("b") AS "a", "c" AS "a3" FROM "t" GROUP BY "a", "c" ORDER BY "c", "a""#.to_string(),
2866            r#"CREATE INDEX ON "v67e5504410b1426f9247bb680e5fe0c8" ("a1", "a2", "a3")"#.to_string(),
2867            r#"SELECT "a1", "a2" FROM "v67e5504410b1426f9247bb680e5fe0c8" ORDER BY "a3", 1"#.to_string(),
2868            r#"DROP VIEW "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
2869        )),
2870    ];
2871    for ((sql, num_attributes, expected_column_names), expected) in cases {
2872        let view_sql =
2873            generate_view_sql(sql, uuid.as_simple(), num_attributes, expected_column_names);
2874        assert_eq!(expected, view_sql);
2875    }
2876}
2877
2878#[mz_ore::test]
2879fn test_mutate() {
2880    let cases = vec![
2881        ("CREATE TABLE t ()", vec![r#"CREATE INDEX ON "t" ()"#]),
2882        (
2883            "CREATE TABLE t (a INT)",
2884            vec![r#"CREATE INDEX ON "t" ("a")"#],
2885        ),
2886        (
2887            "CREATE TABLE t (a INT, b TEXT)",
2888            vec![r#"CREATE INDEX ON "t" ("a", "b")"#],
2889        ),
2890        // Invalid syntax works, just returns nothing.
2891        ("BAD SYNTAX", Vec::new()),
2892    ];
2893    for (sql, expected) in cases {
2894        let stmts = mutate(sql);
2895        assert_eq!(expected, stmts, "sql: {sql}");
2896    }
2897}