mz_sqllogictest/
runner.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! The Materialize-specific runner for sqllogictest.
11//!
12//! slt tests expect a serialized execution of sql statements and queries.
13//! To get the same results in materialize we track current_timestamp and increment it whenever we execute a statement.
14//!
15//! The high-level workflow is:
16//!   for each record in the test file:
17//!     if record is a sql statement:
18//!       run sql in postgres, observe changes and copy them to materialize using LocalInput::Updates(..)
19//!       advance current_timestamp
20//!       promise to never send updates for times < current_timestamp using LocalInput::Watermark(..)
21//!       compare to expected results
22//!       if wrong, bail out and stop processing this file
23//!     if record is a sql query:
24//!       peek query at current_timestamp
25//!       compare to expected results
26//!       if wrong, record the error
27
28use std::collections::BTreeMap;
29use std::error::Error;
30use std::fs::{File, OpenOptions};
31use std::io::{Read, Seek, SeekFrom, Write};
32use std::net::{IpAddr, Ipv4Addr, SocketAddr};
33use std::path::Path;
34use std::sync::Arc;
35use std::sync::LazyLock;
36use std::time::Duration;
37use std::{env, fmt, ops, str, thread};
38
39use anyhow::{anyhow, bail};
40use bytes::BytesMut;
41use chrono::{DateTime, NaiveDateTime, NaiveTime, Utc};
42use fallible_iterator::FallibleIterator;
43use futures::sink::SinkExt;
44use itertools::Itertools;
45use maplit::btreemap;
46use md5::{Digest, Md5};
47use mz_adapter_types::bootstrap_builtin_cluster_config::{
48    ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR, BootstrapBuiltinClusterConfig,
49    CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR, PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR,
50    SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR, SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR,
51};
52use mz_catalog::config::ClusterReplicaSizeMap;
53use mz_controller::ControllerConfig;
54use mz_environmentd::CatalogConfig;
55use mz_license_keys::ValidatedLicenseKey;
56use mz_orchestrator_process::{ProcessOrchestrator, ProcessOrchestratorConfig};
57use mz_orchestrator_tracing::{TracingCliArgs, TracingOrchestrator};
58use mz_ore::cast::{CastFrom, ReinterpretCast};
59use mz_ore::channel::trigger;
60use mz_ore::error::ErrorExt;
61use mz_ore::metrics::MetricsRegistry;
62use mz_ore::now::SYSTEM_TIME;
63use mz_ore::retry::Retry;
64use mz_ore::task;
65use mz_ore::thread::{JoinHandleExt, JoinOnDropHandle};
66use mz_ore::tracing::TracingHandle;
67use mz_ore::url::SensitiveUrl;
68use mz_persist_client::PersistLocation;
69use mz_persist_client::cache::PersistClientCache;
70use mz_persist_client::cfg::PersistConfig;
71use mz_persist_client::rpc::{
72    MetricsSameProcessPubSubSender, PersistGrpcPubSubServer, PubSubClientConnection, PubSubSender,
73};
74use mz_pgrepr::{Interval, Jsonb, Numeric, UInt2, UInt4, UInt8, Value, oid};
75use mz_repr::ColumnName;
76use mz_repr::adt::date::Date;
77use mz_repr::adt::mz_acl_item::{AclItem, MzAclItem};
78use mz_repr::adt::numeric;
79use mz_secrets::SecretsController;
80use mz_server_core::listeners::{
81    AllowedRoles, AuthenticatorKind, BaseListenerConfig, HttpListenerConfig, HttpRoutesEnabled,
82    ListenersConfig, SqlListenerConfig,
83};
84use mz_sql::ast::{Expr, Raw, Statement};
85use mz_sql::catalog::EnvironmentId;
86use mz_sql_parser::ast::display::AstDisplay;
87use mz_sql_parser::ast::{
88    CreateIndexStatement, CreateViewStatement, CteBlock, Distinct, DropObjectsStatement, Ident,
89    IfExistsBehavior, ObjectType, OrderByExpr, Query, RawItemName, Select, SelectItem,
90    SelectStatement, SetExpr, Statement as AstStatement, TableFactor, TableWithJoins,
91    UnresolvedItemName, UnresolvedObjectName, ViewDefinition,
92};
93use mz_sql_parser::parser;
94use mz_storage_types::connections::ConnectionContext;
95use postgres_protocol::types;
96use regex::Regex;
97use tempfile::TempDir;
98use tokio::net::TcpListener;
99use tokio::runtime::Runtime;
100use tokio::sync::oneshot;
101use tokio_postgres::types::{FromSql, Kind as PgKind, Type as PgType};
102use tokio_postgres::{NoTls, Row, SimpleQueryMessage};
103use tokio_stream::wrappers::TcpListenerStream;
104use tower_http::cors::AllowOrigin;
105use tracing::{error, info};
106use uuid::Uuid;
107use uuid::fmt::Simple;
108
109use crate::ast::{Location, Mode, Output, QueryOutput, Record, Sort, Type};
110use crate::util;
111
/// The result of running a single sqllogictest record.
///
/// Every variant except `Success` records the `location` it applies to.
/// The `'a` lifetime covers the expectation data (`expected_error`,
/// `expected_column_names`, `expected_output`) that is borrowed rather than
/// copied into the outcome.
#[derive(Debug)]
pub enum Outcome<'a> {
    /// Carries the error describing what was unsupported.
    Unsupported {
        error: anyhow::Error,
        location: Location,
    },
    /// Carries the error produced by a parse failure.
    ParseFailure {
        error: anyhow::Error,
        location: Location,
    },
    /// Carries the error produced by a plan failure.
    PlanFailure {
        error: anyhow::Error,
        location: Location,
    },
    /// An error matching `expected_error` was expected; going by the variant
    /// name, planning succeeded instead (confirm at the construction sites).
    UnexpectedPlanSuccess {
        expected_error: &'a str,
        location: Location,
    },
    /// The expected and actual row counts differ.
    WrongNumberOfRowsInserted {
        expected_count: u64,
        actual_count: u64,
        location: Location,
    },
    /// The expected and actual column counts differ.
    WrongColumnCount {
        expected_count: usize,
        actual_count: usize,
        location: Location,
    },
    /// The expected and actual column names differ. `actual_output` is kept
    /// alongside the names but is not part of the `Display` rendering.
    WrongColumnNames {
        expected_column_names: &'a Vec<ColumnName>,
        actual_column_names: Vec<ColumnName>,
        actual_output: Output,
        location: Location,
    },
    /// The expected and actual output differ. Both the formatted output and
    /// the raw rows it was derived from are retained for reporting.
    OutputFailure {
        expected_output: &'a Output,
        actual_raw_output: Vec<Row>,
        actual_output: Output,
        location: Location,
    },
    /// Pairs the outcome obtained from a query with the outcome obtained from
    /// an indexed view (per the `Display` rendering of this variant).
    InconsistentViewOutcome {
        query_outcome: Box<Outcome<'a>>,
        view_outcome: Box<Outcome<'a>>,
        location: Location,
    },
    /// Wraps the outcome that caused a bail-out.
    Bail {
        cause: Box<Outcome<'a>>,
        location: Location,
    },
    /// Wraps an outcome that is reported as a warning; `Outcome::failure`
    /// does not treat it as a failure.
    Warning {
        cause: Box<Outcome<'a>>,
        location: Location,
    },
    /// The record behaved as expected.
    Success,
}
167
/// Number of distinct outcome codes; this is the length of `Outcomes::stats`.
const NUM_OUTCOMES: usize = 12;
/// Index of the warning counter in `Outcomes::stats` (the code returned by
/// `Outcome::code` for `Outcome::Warning`).
const WARNING_OUTCOME: usize = NUM_OUTCOMES - 2;
/// Index of the success counter in `Outcomes::stats` (the code returned by
/// `Outcome::code` for `Outcome::Success`).
const SUCCESS_OUTCOME: usize = NUM_OUTCOMES - 1;
171
172impl<'a> Outcome<'a> {
173    fn code(&self) -> usize {
174        match self {
175            Outcome::Unsupported { .. } => 0,
176            Outcome::ParseFailure { .. } => 1,
177            Outcome::PlanFailure { .. } => 2,
178            Outcome::UnexpectedPlanSuccess { .. } => 3,
179            Outcome::WrongNumberOfRowsInserted { .. } => 4,
180            Outcome::WrongColumnCount { .. } => 5,
181            Outcome::WrongColumnNames { .. } => 6,
182            Outcome::OutputFailure { .. } => 7,
183            Outcome::InconsistentViewOutcome { .. } => 8,
184            Outcome::Bail { .. } => 9,
185            Outcome::Warning { .. } => 10,
186            Outcome::Success => 11,
187        }
188    }
189
190    fn success(&self) -> bool {
191        matches!(self, Outcome::Success)
192    }
193
194    fn failure(&self) -> bool {
195        !matches!(self, Outcome::Success) && !matches!(self, Outcome::Warning { .. })
196    }
197
198    /// Returns an error message that will match self. Appropriate for
199    /// rewriting error messages (i.e. not inserting error messages where we
200    /// currently expect success).
201    fn err_msg(&self) -> Option<String> {
202        match self {
203            Outcome::Unsupported { error, .. }
204            | Outcome::ParseFailure { error, .. }
205            | Outcome::PlanFailure { error, .. } => Some(
206                // This value gets fed back into regex to check that it matches
207                // `self`, so escape its meta characters.
208                regex::escape(
209                    // Take only the first string in the error message, which should be
210                    // sufficient for meaningfully matching the error.
211                    error.to_string().split('\n').next().unwrap(),
212                )
213                // We need to undo the escaping of #. `regex::escape` escapes this because it
214                // expects that we use the `x` flag when building a regex, but this is not the case,
215                // so \# would end up being an invalid escape sequence, which would choke the
216                // parsing of the slt file the next time around.
217                .replace(r"\#", "#"),
218            ),
219            _ => None,
220        }
221    }
222}
223
224impl fmt::Display for Outcome<'_> {
225    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
226        use Outcome::*;
227        const INDENT: &str = "\n        ";
228        match self {
229            Unsupported { error, location } => write!(
230                f,
231                "Unsupported:{}:\n{}",
232                location,
233                error.display_with_causes()
234            ),
235            ParseFailure { error, location } => {
236                write!(
237                    f,
238                    "ParseFailure:{}:\n{}",
239                    location,
240                    error.display_with_causes()
241                )
242            }
243            PlanFailure { error, location } => write!(f, "PlanFailure:{}:\n{:#}", location, error),
244            UnexpectedPlanSuccess {
245                expected_error,
246                location,
247            } => write!(
248                f,
249                "UnexpectedPlanSuccess:{} expected error: {}",
250                location, expected_error
251            ),
252            WrongNumberOfRowsInserted {
253                expected_count,
254                actual_count,
255                location,
256            } => write!(
257                f,
258                "WrongNumberOfRowsInserted:{}{}expected: {}{}actually: {}",
259                location, INDENT, expected_count, INDENT, actual_count
260            ),
261            WrongColumnCount {
262                expected_count,
263                actual_count,
264                location,
265            } => write!(
266                f,
267                "WrongColumnCount:{}{}expected: {}{}actually: {}",
268                location, INDENT, expected_count, INDENT, actual_count
269            ),
270            WrongColumnNames {
271                expected_column_names,
272                actual_column_names,
273                actual_output: _,
274                location,
275            } => write!(
276                f,
277                "Wrong Column Names:{}:{}expected column names: {}{}inferred column names: {}",
278                location,
279                INDENT,
280                expected_column_names
281                    .iter()
282                    .map(|n| n.to_string())
283                    .collect::<Vec<_>>()
284                    .join(" "),
285                INDENT,
286                actual_column_names
287                    .iter()
288                    .map(|n| n.to_string())
289                    .collect::<Vec<_>>()
290                    .join(" ")
291            ),
292            OutputFailure {
293                expected_output,
294                actual_raw_output,
295                actual_output,
296                location,
297            } => write!(
298                f,
299                "OutputFailure:{}{}expected: {:?}{}actually: {:?}{}actual raw: {:?}",
300                location, INDENT, expected_output, INDENT, actual_output, INDENT, actual_raw_output
301            ),
302            InconsistentViewOutcome {
303                query_outcome,
304                view_outcome,
305                location,
306            } => write!(
307                f,
308                "InconsistentViewOutcome:{}{}expected from query: {:?}{}actually from indexed view: {:?}{}",
309                location, INDENT, query_outcome, INDENT, view_outcome, INDENT
310            ),
311            Bail { cause, location } => write!(f, "Bail:{} {}", location, cause),
312            Warning { cause, location } => write!(f, "Warning:{} {}", location, cause),
313            Success => f.write_str("Success"),
314        }
315    }
316}
317
/// Aggregated results: one counter per outcome code plus free-form detail
/// lines.
#[derive(Default, Debug)]
pub struct Outcomes {
    /// Number of occurrences of each outcome, indexed by outcome code
    /// (see `WARNING_OUTCOME` and `SUCCESS_OUTCOME` for the last two slots).
    stats: [usize; NUM_OUTCOMES],
    /// Detail lines printed, one per line, by `OutcomesDisplay` when failure
    /// details are requested.
    details: Vec<String>,
}
323
324impl ops::AddAssign<Outcomes> for Outcomes {
325    fn add_assign(&mut self, rhs: Outcomes) {
326        for (lhs, rhs) in self.stats.iter_mut().zip(rhs.stats.iter()) {
327            *lhs += rhs
328        }
329    }
330}
331impl Outcomes {
332    pub fn any_failed(&self) -> bool {
333        self.stats[SUCCESS_OUTCOME] + self.stats[WARNING_OUTCOME] < self.stats.iter().sum::<usize>()
334    }
335
336    pub fn as_json(&self) -> serde_json::Value {
337        serde_json::json!({
338            "unsupported": self.stats[0],
339            "parse_failure": self.stats[1],
340            "plan_failure": self.stats[2],
341            "unexpected_plan_success": self.stats[3],
342            "wrong_number_of_rows_affected": self.stats[4],
343            "wrong_column_count": self.stats[5],
344            "wrong_column_names": self.stats[6],
345            "output_failure": self.stats[7],
346            "inconsistent_view_outcome": self.stats[8],
347            "bail": self.stats[9],
348            "warning": self.stats[10],
349            "success": self.stats[11],
350        })
351    }
352
353    pub fn display(&self, no_fail: bool, failure_details: bool) -> OutcomesDisplay<'_> {
354        OutcomesDisplay {
355            inner: self,
356            no_fail,
357            failure_details,
358        }
359    }
360}
361
/// `Display` adapter for `Outcomes`, created by `Outcomes::display`.
pub struct OutcomesDisplay<'a> {
    /// The outcomes being rendered.
    inner: &'a Outcomes,
    /// When set, a failing summary is labelled `FAIL-IGNORE` instead of
    /// `FAIL`, and detail output is produced even if nothing failed.
    no_fail: bool,
    /// When set, the detail lines are printed in place of the one-line
    /// summary whenever something failed (or `no_fail` is set).
    failure_details: bool,
}
367
368impl<'a> fmt::Display for OutcomesDisplay<'a> {
369    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
370        let total: usize = self.inner.stats.iter().sum();
371        if self.failure_details
372            && (self.inner.stats[SUCCESS_OUTCOME] + self.inner.stats[WARNING_OUTCOME] != total
373                || self.no_fail)
374        {
375            for outcome in &self.inner.details {
376                writeln!(f, "{}", outcome)?;
377            }
378            Ok(())
379        } else {
380            write!(
381                f,
382                "{}:",
383                if self.inner.stats[SUCCESS_OUTCOME] + self.inner.stats[WARNING_OUTCOME] == total {
384                    "PASS"
385                } else if self.no_fail {
386                    "FAIL-IGNORE"
387                } else {
388                    "FAIL"
389                }
390            )?;
391            static NAMES: LazyLock<Vec<&'static str>> = LazyLock::new(|| {
392                vec![
393                    "unsupported",
394                    "parse-failure",
395                    "plan-failure",
396                    "unexpected-plan-success",
397                    "wrong-number-of-rows-inserted",
398                    "wrong-column-count",
399                    "wrong-column-names",
400                    "output-failure",
401                    "inconsistent-view-outcome",
402                    "bail",
403                    "warning",
404                    "success",
405                    "total",
406                ]
407            });
408            for (i, n) in self.inner.stats.iter().enumerate() {
409                if *n > 0 {
410                    write!(f, " {}={}", NAMES[i], n)?;
411                }
412            }
413            write!(f, " total={}", total)
414        }
415    }
416}
417
/// Facts about a prepared query, carried by
/// `PrepareQueryOutcome::QueryPrepared`.
struct QueryInfo {
    /// Whether the query is a `SELECT`.
    is_select: bool,
    /// Number of attributes of the query, when known.
    // NOTE(review): presumably the number of output columns — confirm where
    // this struct is built.
    num_attributes: Option<usize>,
}
422
/// Result of preparing a query: either information about the prepared query,
/// or an `Outcome` to report instead.
enum PrepareQueryOutcome<'a> {
    /// Preparation produced the given query information.
    QueryPrepared(QueryInfo),
    /// Preparation ended with this outcome.
    Outcome(Outcome<'a>),
}
427
/// Runs sqllogictest records against a `RunnerInner`, which can be torn down
/// and recreated via `reset`.
pub struct Runner<'a> {
    /// Configuration used every time a new `RunnerInner` is started.
    config: &'a RunConfig<'a>,
    /// The current inner runner; `None` only while it is being replaced.
    inner: Option<RunnerInner<'a>>,
}
432
/// State of a single running server instance together with the client
/// connections used to talk to it.
pub struct RunnerInner<'a> {
    // Socket addresses associated with the server.
    // NOTE(review): presumably the SQL, internal SQL and internal HTTP
    // listeners respectively — confirm in `RunnerInner::start`.
    server_addr: SocketAddr,
    internal_server_addr: SocketAddr,
    internal_http_server_addr: SocketAddr,
    // Drop order matters for these fields.
    /// Primary client connection.
    client: tokio_postgres::Client,
    /// Client connection used for system-level statements such as
    /// `ALTER SYSTEM RESET ALL`.
    system_client: tokio_postgres::Client,
    /// Additional client connections, keyed by name.
    clients: BTreeMap<String, tokio_postgres::Client>,
    // Behavior switches for the runner.
    auto_index_tables: bool,
    auto_index_selects: bool,
    auto_transactions: bool,
    enable_table_keys: bool,
    /// Output verbosity level.
    verbosity: u8,
    /// Sink for the runner's formatted output.
    stdout: &'a dyn WriteFmt,
    // The underscore-prefixed fields are held only for their lifetime.
    // NOTE(review): presumably dropping them shuts the server down, joins its
    // thread and deletes the temporary directory — confirm against the types.
    _shutdown_trigger: trigger::Trigger,
    _server_thread: JoinOnDropHandle<()>,
    _temp_dir: TempDir,
}
451
/// Newtype around `mz_pgrepr::Value` so that a `FromSql` implementation can be
/// provided for it in this file.
#[derive(Debug)]
pub struct Slt(Value);
454
455impl<'a> FromSql<'a> for Slt {
456    fn from_sql(
457        ty: &PgType,
458        mut raw: &'a [u8],
459    ) -> Result<Self, Box<dyn Error + 'static + Send + Sync>> {
460        Ok(match *ty {
461            PgType::ACLITEM => Self(Value::AclItem(AclItem::decode_binary(
462                types::bytea_from_sql(raw),
463            )?)),
464            PgType::BOOL => Self(Value::Bool(types::bool_from_sql(raw)?)),
465            PgType::BYTEA => Self(Value::Bytea(types::bytea_from_sql(raw).to_vec())),
466            PgType::CHAR => Self(Value::Char(u8::from_be_bytes(
467                types::char_from_sql(raw)?.to_be_bytes(),
468            ))),
469            PgType::FLOAT4 => Self(Value::Float4(types::float4_from_sql(raw)?)),
470            PgType::FLOAT8 => Self(Value::Float8(types::float8_from_sql(raw)?)),
471            PgType::DATE => Self(Value::Date(Date::from_pg_epoch(types::int4_from_sql(
472                raw,
473            )?)?)),
474            PgType::INT2 => Self(Value::Int2(types::int2_from_sql(raw)?)),
475            PgType::INT4 => Self(Value::Int4(types::int4_from_sql(raw)?)),
476            PgType::INT8 => Self(Value::Int8(types::int8_from_sql(raw)?)),
477            PgType::INTERVAL => Self(Value::Interval(Interval::from_sql(ty, raw)?)),
478            PgType::JSONB => Self(Value::Jsonb(Jsonb::from_sql(ty, raw)?)),
479            PgType::NAME => Self(Value::Name(types::text_from_sql(raw)?.to_string())),
480            PgType::NUMERIC => Self(Value::Numeric(Numeric::from_sql(ty, raw)?)),
481            PgType::OID => Self(Value::Oid(types::oid_from_sql(raw)?)),
482            PgType::REGCLASS => Self(Value::Oid(types::oid_from_sql(raw)?)),
483            PgType::REGPROC => Self(Value::Oid(types::oid_from_sql(raw)?)),
484            PgType::REGTYPE => Self(Value::Oid(types::oid_from_sql(raw)?)),
485            PgType::TEXT | PgType::BPCHAR | PgType::VARCHAR => {
486                Self(Value::Text(types::text_from_sql(raw)?.to_string()))
487            }
488            PgType::TIME => Self(Value::Time(NaiveTime::from_sql(ty, raw)?)),
489            PgType::TIMESTAMP => Self(Value::Timestamp(
490                NaiveDateTime::from_sql(ty, raw)?.try_into()?,
491            )),
492            PgType::TIMESTAMPTZ => Self(Value::TimestampTz(
493                DateTime::<Utc>::from_sql(ty, raw)?.try_into()?,
494            )),
495            PgType::UUID => Self(Value::Uuid(Uuid::from_sql(ty, raw)?)),
496            PgType::RECORD => {
497                let num_fields = read_be_i32(&mut raw)?;
498                let mut tuple = vec![];
499                for _ in 0..num_fields {
500                    let oid = u32::reinterpret_cast(read_be_i32(&mut raw)?);
501                    let typ = match PgType::from_oid(oid) {
502                        Some(typ) => typ,
503                        None => return Err("unknown oid".into()),
504                    };
505                    let v = read_value::<Option<Slt>>(&typ, &mut raw)?;
506                    tuple.push(v.map(|v| v.0));
507                }
508                Self(Value::Record(tuple))
509            }
510            PgType::INT4_RANGE
511            | PgType::INT8_RANGE
512            | PgType::DATE_RANGE
513            | PgType::NUM_RANGE
514            | PgType::TS_RANGE
515            | PgType::TSTZ_RANGE => {
516                use mz_repr::adt::range::Range;
517                let range: Range<Slt> = Range::from_sql(ty, raw)?;
518                Self(Value::Range(range.into_bounds(|b| Box::new(b.0))))
519            }
520
521            _ => match ty.kind() {
522                PgKind::Array(arr_type) => {
523                    let arr = types::array_from_sql(raw)?;
524                    let elements: Vec<Option<Value>> = arr
525                        .values()
526                        .map(|v| match v {
527                            Some(v) => Ok(Some(Slt::from_sql(arr_type, v)?)),
528                            None => Ok(None),
529                        })
530                        .collect::<Vec<Option<Slt>>>()?
531                        .into_iter()
532                        // Map a Vec<Option<Slt>> to Vec<Option<Value>>.
533                        .map(|v| v.map(|v| v.0))
534                        .collect();
535
536                    Self(Value::Array {
537                        dims: arr
538                            .dimensions()
539                            .map(|d| {
540                                Ok(mz_repr::adt::array::ArrayDimension {
541                                    lower_bound: isize::cast_from(d.lower_bound),
542                                    length: usize::try_from(d.len)
543                                        .expect("cannot have negative length"),
544                                })
545                            })
546                            .collect()?,
547                        elements,
548                    })
549                }
550                _ => match ty.oid() {
551                    oid::TYPE_UINT2_OID => Self(Value::UInt2(UInt2::from_sql(ty, raw)?)),
552                    oid::TYPE_UINT4_OID => Self(Value::UInt4(UInt4::from_sql(ty, raw)?)),
553                    oid::TYPE_UINT8_OID => Self(Value::UInt8(UInt8::from_sql(ty, raw)?)),
554                    oid::TYPE_MZ_TIMESTAMP_OID => {
555                        let s = types::text_from_sql(raw)?;
556                        let t: mz_repr::Timestamp = s.parse()?;
557                        Self(Value::MzTimestamp(t))
558                    }
559                    oid::TYPE_MZ_ACL_ITEM_OID => Self(Value::MzAclItem(MzAclItem::decode_binary(
560                        types::bytea_from_sql(raw),
561                    )?)),
562                    _ => unreachable!(),
563                },
564            },
565        })
566    }
567    fn accepts(ty: &PgType) -> bool {
568        match ty.kind() {
569            PgKind::Array(_) | PgKind::Composite(_) => return true,
570            _ => {}
571        }
572        match ty.oid() {
573            oid::TYPE_UINT2_OID
574            | oid::TYPE_UINT4_OID
575            | oid::TYPE_UINT8_OID
576            | oid::TYPE_MZ_TIMESTAMP_OID
577            | oid::TYPE_MZ_ACL_ITEM_OID => return true,
578            _ => {}
579        }
580        matches!(
581            *ty,
582            PgType::ACLITEM
583                | PgType::BOOL
584                | PgType::BYTEA
585                | PgType::CHAR
586                | PgType::DATE
587                | PgType::FLOAT4
588                | PgType::FLOAT8
589                | PgType::INT2
590                | PgType::INT4
591                | PgType::INT8
592                | PgType::INTERVAL
593                | PgType::JSONB
594                | PgType::NAME
595                | PgType::NUMERIC
596                | PgType::OID
597                | PgType::REGCLASS
598                | PgType::REGPROC
599                | PgType::REGTYPE
600                | PgType::RECORD
601                | PgType::TEXT
602                | PgType::BPCHAR
603                | PgType::VARCHAR
604                | PgType::TIME
605                | PgType::TIMESTAMP
606                | PgType::TIMESTAMPTZ
607                | PgType::UUID
608                | PgType::INT4_RANGE
609                | PgType::INT4_RANGE_ARRAY
610                | PgType::INT8_RANGE
611                | PgType::INT8_RANGE_ARRAY
612                | PgType::DATE_RANGE
613                | PgType::DATE_RANGE_ARRAY
614                | PgType::NUM_RANGE
615                | PgType::NUM_RANGE_ARRAY
616                | PgType::TS_RANGE
617                | PgType::TS_RANGE_ARRAY
618                | PgType::TSTZ_RANGE
619                | PgType::TSTZ_RANGE_ARRAY
620        )
621    }
622}
623
// From postgres-types/src/private.rs.
/// Reads a big-endian `i32` from the front of `buf` and advances `buf` past
/// the four bytes consumed.
///
/// Returns an "invalid buffer size" error, leaving `buf` untouched, when fewer
/// than four bytes remain.
fn read_be_i32(buf: &mut &[u8]) -> Result<i32, Box<dyn Error + Sync + Send>> {
    let Some((head, rest)) = buf.split_first_chunk::<4>() else {
        return Err("invalid buffer size".into());
    };
    let value = i32::from_be_bytes(*head);
    *buf = rest;
    Ok(value)
}
634
635// From postgres-types/src/private.rs.
636fn read_value<'a, T>(type_: &PgType, buf: &mut &'a [u8]) -> Result<T, Box<dyn Error + Sync + Send>>
637where
638    T: FromSql<'a>,
639{
640    let value = match usize::try_from(read_be_i32(buf)?) {
641        Err(_) => None,
642        Ok(len) => {
643            if len > buf.len() {
644                return Err("invalid buffer size".into());
645            }
646            let (head, tail) = buf.split_at(len);
647            *buf = tail;
648            Some(head)
649        }
650    };
651    T::from_sql_nullable(type_, value)
652}
653
654fn format_datum(d: Slt, typ: &Type, mode: Mode, col: usize) -> String {
655    match (typ, d.0) {
656        (Type::Bool, Value::Bool(b)) => b.to_string(),
657
658        (Type::Integer, Value::Int2(i)) => i.to_string(),
659        (Type::Integer, Value::Int4(i)) => i.to_string(),
660        (Type::Integer, Value::Int8(i)) => i.to_string(),
661        (Type::Integer, Value::UInt2(u)) => u.0.to_string(),
662        (Type::Integer, Value::UInt4(u)) => u.0.to_string(),
663        (Type::Integer, Value::UInt8(u)) => u.0.to_string(),
664        (Type::Integer, Value::Oid(i)) => i.to_string(),
665        // TODO(benesch): rewrite to avoid `as`.
666        #[allow(clippy::as_conversions)]
667        (Type::Integer, Value::Float4(f)) => format!("{}", f as i64),
668        // TODO(benesch): rewrite to avoid `as`.
669        #[allow(clippy::as_conversions)]
670        (Type::Integer, Value::Float8(f)) => format!("{}", f as i64),
671        // This is so wrong, but sqlite needs it.
672        (Type::Integer, Value::Text(_)) => "0".to_string(),
673        (Type::Integer, Value::Bool(b)) => i8::from(b).to_string(),
674        (Type::Integer, Value::Numeric(d)) => {
675            let mut d = d.0.0.clone();
676            let mut cx = numeric::cx_datum();
677            // Truncate the decimal to match sqlite.
678            if mode == Mode::Standard {
679                cx.set_rounding(dec::Rounding::Down);
680            }
681            cx.round(&mut d);
682            numeric::munge_numeric(&mut d).unwrap();
683            d.to_standard_notation_string()
684        }
685
686        (Type::Real, Value::Int2(i)) => format!("{:.3}", i),
687        (Type::Real, Value::Int4(i)) => format!("{:.3}", i),
688        (Type::Real, Value::Int8(i)) => format!("{:.3}", i),
689        (Type::Real, Value::Float4(f)) => match mode {
690            Mode::Standard => format!("{:.3}", f),
691            Mode::Cockroach => format!("{}", f),
692        },
693        (Type::Real, Value::Float8(f)) => match mode {
694            Mode::Standard => format!("{:.3}", f),
695            Mode::Cockroach => format!("{}", f),
696        },
697        (Type::Real, Value::Numeric(d)) => match mode {
698            Mode::Standard => {
699                let mut d = d.0.0.clone();
700                if d.exponent() < -3 {
701                    numeric::rescale(&mut d, 3).unwrap();
702                }
703                numeric::munge_numeric(&mut d).unwrap();
704                d.to_standard_notation_string()
705            }
706            Mode::Cockroach => d.0.0.to_standard_notation_string(),
707        },
708
709        (Type::Text, Value::Text(s)) => {
710            if s.is_empty() {
711                "(empty)".to_string()
712            } else {
713                s
714            }
715        }
716        (Type::Text, Value::Bool(b)) => b.to_string(),
717        (Type::Text, Value::Float4(f)) => format!("{:.3}", f),
718        (Type::Text, Value::Float8(f)) => format!("{:.3}", f),
719        // Bytes are printed as text iff they are valid UTF-8. This
720        // seems guaranteed to confuse everyone, but it is required for
721        // compliance with the CockroachDB sqllogictest runner. [0]
722        //
723        // [0]: https://github.com/cockroachdb/cockroach/blob/970782487/pkg/sql/logictest/logic.go#L2038-L2043
724        (Type::Text, Value::Bytea(b)) => match str::from_utf8(&b) {
725            Ok(s) => s.to_string(),
726            Err(_) => format!("{:?}", b),
727        },
728        (Type::Text, Value::Numeric(d)) => d.0.0.to_standard_notation_string(),
729        // Everything else gets normal text encoding. This correctly handles things
730        // like arrays, tuples, and strings that need to be quoted.
731        (Type::Text, d) => {
732            let mut buf = BytesMut::new();
733            d.encode_text(&mut buf);
734            String::from_utf8_lossy(&buf).into_owned()
735        }
736
737        (Type::Oid, Value::Oid(o)) => o.to_string(),
738
739        (_, d) => panic!(
740            "Don't know how to format {:?} as {:?} in column {}",
741            d, typ, col,
742        ),
743    }
744}
745
746fn format_row(row: &Row, types: &[Type], mode: Mode) -> Vec<String> {
747    let mut formatted: Vec<String> = vec![];
748    for i in 0..row.len() {
749        let t: Option<Slt> = row.get::<usize, Option<Slt>>(i);
750        let t: Option<String> = t.map(|d| format_datum(d, &types[i], mode, i));
751        formatted.push(match t {
752            Some(t) => t,
753            None => "NULL".into(),
754        });
755    }
756
757    formatted
758}
759
760impl<'a> Runner<'a> {
761    pub async fn start(config: &'a RunConfig<'a>) -> Result<Runner<'a>, anyhow::Error> {
762        let mut runner = Self {
763            config,
764            inner: None,
765        };
766        runner.reset().await?;
767        Ok(runner)
768    }
769
770    pub async fn reset(&mut self) -> Result<(), anyhow::Error> {
771        // Explicitly drop the old runner here to ensure that we wait for threads to terminate
772        // before starting a new runner
773        drop(self.inner.take());
774        self.inner = Some(RunnerInner::start(self.config).await?);
775
776        Ok(())
777    }
778
779    async fn run_record<'r>(
780        &mut self,
781        record: &'r Record<'r>,
782        in_transaction: &mut bool,
783    ) -> Result<Outcome<'r>, anyhow::Error> {
784        if let Record::ResetServer = record {
785            self.reset().await?;
786            Ok(Outcome::Success)
787        } else {
788            self.inner
789                .as_mut()
790                .expect("RunnerInner missing")
791                .run_record(record, in_transaction)
792                .await
793        }
794    }
795
796    async fn check_catalog(&self) -> Result<(), anyhow::Error> {
797        self.inner
798            .as_ref()
799            .expect("RunnerInner missing")
800            .check_catalog()
801            .await
802    }
803
804    async fn reset_database(&mut self) -> Result<(), anyhow::Error> {
805        let inner = self.inner.as_mut().expect("RunnerInner missing");
806
807        inner.client.batch_execute("ROLLBACK;").await?;
808
809        inner
810            .system_client
811            .batch_execute(
812                "ROLLBACK;
813                 SET cluster = mz_catalog_server;
814                 RESET cluster_replica;",
815            )
816            .await?;
817
818        inner
819            .system_client
820            .batch_execute("ALTER SYSTEM RESET ALL")
821            .await?;
822
823        // Drop all databases, then recreate the `materialize` database.
824        for row in inner
825            .system_client
826            .query("SELECT name FROM mz_databases", &[])
827            .await?
828        {
829            let name: &str = row.get("name");
830            inner
831                .system_client
832                .batch_execute(&format!("DROP DATABASE \"{name}\""))
833                .await?;
834        }
835        inner
836            .system_client
837            .batch_execute("CREATE DATABASE materialize")
838            .await?;
839
840        // Ensure quickstart cluster exists with one replica of size `self.config.replica_size`.
841        // We don't destroy the existing quickstart cluster replica if it exists, as turning
842        // on a cluster replica is exceptionally slow.
843        let mut needs_default_cluster = true;
844        for row in inner
845            .system_client
846            .query("SELECT name FROM mz_clusters WHERE id LIKE 'u%'", &[])
847            .await?
848        {
849            match row.get("name") {
850                "quickstart" => needs_default_cluster = false,
851                name => {
852                    inner
853                        .system_client
854                        .batch_execute(&format!("DROP CLUSTER {name}"))
855                        .await?
856                }
857            }
858        }
859        if needs_default_cluster {
860            inner
861                .system_client
862                .batch_execute("CREATE CLUSTER quickstart REPLICAS ()")
863                .await?;
864        }
865        let mut needs_default_replica = false;
866        let rows = inner
867            .system_client
868            .query(
869                "SELECT name, size FROM mz_cluster_replicas
870                 WHERE cluster_id = (SELECT id FROM mz_clusters WHERE name = 'quickstart')
871                 ORDER BY name",
872                &[],
873            )
874            .await?;
875        if rows.len() != self.config.replicas {
876            needs_default_replica = true;
877        } else {
878            for (i, row) in rows.iter().enumerate() {
879                let name: &str = row.get("name");
880                let size: &str = row.get("size");
881                if name != format!("r{i}") || size != self.config.replica_size.to_string() {
882                    needs_default_replica = true;
883                    break;
884                }
885            }
886        }
887
888        if needs_default_replica {
889            inner
890                .system_client
891                .batch_execute("ALTER CLUSTER quickstart SET (MANAGED = false)")
892                .await?;
893            for row in inner
894                .system_client
895                .query(
896                    "SELECT name FROM mz_cluster_replicas
897                     WHERE cluster_id = (SELECT id FROM mz_clusters WHERE name = 'quickstart')",
898                    &[],
899                )
900                .await?
901            {
902                let name: &str = row.get("name");
903                inner
904                    .system_client
905                    .batch_execute(&format!("DROP CLUSTER REPLICA quickstart.{}", name))
906                    .await?;
907            }
908            for i in 1..=self.config.replicas {
909                inner
910                    .system_client
911                    .batch_execute(&format!(
912                        "CREATE CLUSTER REPLICA quickstart.r{i} SIZE '{}'",
913                        self.config.replica_size
914                    ))
915                    .await?;
916            }
917            inner
918                .system_client
919                .batch_execute("ALTER CLUSTER quickstart SET (MANAGED = true)")
920                .await?;
921        }
922
923        // Grant initial privileges.
924        inner
925            .system_client
926            .batch_execute("GRANT USAGE ON DATABASE materialize TO PUBLIC")
927            .await?;
928        inner
929            .system_client
930            .batch_execute("GRANT CREATE ON DATABASE materialize TO materialize")
931            .await?;
932        inner
933            .system_client
934            .batch_execute("GRANT CREATE ON SCHEMA materialize.public TO materialize")
935            .await?;
936        inner
937            .system_client
938            .batch_execute("GRANT USAGE ON CLUSTER quickstart TO PUBLIC")
939            .await?;
940        inner
941            .system_client
942            .batch_execute("GRANT CREATE ON CLUSTER quickstart TO materialize")
943            .await?;
944
945        // Some sqllogictests require more than the default amount of tables, so we increase the
946        // limit for all tests.
947        inner
948            .system_client
949            .simple_query("ALTER SYSTEM SET max_tables = 100")
950            .await?;
951
952        if inner.enable_table_keys {
953            inner
954                .system_client
955                .simple_query("ALTER SYSTEM SET unsafe_enable_table_keys = true")
956                .await?;
957        }
958
959        inner.ensure_fixed_features().await?;
960
961        inner.client = connect(inner.server_addr, None).await;
962        inner.system_client = connect(inner.internal_server_addr, Some("mz_system")).await;
963        inner.clients = BTreeMap::new();
964
965        Ok(())
966    }
967}
968
969impl<'a> RunnerInner<'a> {
970    pub async fn start(config: &RunConfig<'a>) -> Result<RunnerInner<'a>, anyhow::Error> {
971        let temp_dir = tempfile::tempdir()?;
972        let scratch_dir = tempfile::tempdir()?;
973        let environment_id = EnvironmentId::for_tests();
974        let (consensus_uri, timestamp_oracle_url): (SensitiveUrl, SensitiveUrl) = {
975            let postgres_url = &config.postgres_url;
976            let prefix = &config.prefix;
977            info!(%postgres_url, "starting server");
978            let (client, conn) = Retry::default()
979                .max_tries(5)
980                .retry_async(|_| async {
981                    match tokio_postgres::connect(postgres_url, NoTls).await {
982                        Ok(c) => Ok(c),
983                        Err(e) => {
984                            error!(%e, "failed to connect to postgres");
985                            Err(e)
986                        }
987                    }
988                })
989                .await?;
990            task::spawn(|| "sqllogictest_connect", async move {
991                if let Err(e) = conn.await {
992                    panic!("connection error: {}", e);
993                }
994            });
995            client
996                .batch_execute(&format!(
997                    "DROP SCHEMA IF EXISTS {prefix}_tsoracle CASCADE;
998                     CREATE SCHEMA IF NOT EXISTS {prefix}_consensus;
999                     CREATE SCHEMA {prefix}_tsoracle;"
1000                ))
1001                .await?;
1002            (
1003                format!("{postgres_url}?options=--search_path={prefix}_consensus")
1004                    .parse()
1005                    .expect("invalid consensus URI"),
1006                format!("{postgres_url}?options=--search_path={prefix}_tsoracle")
1007                    .parse()
1008                    .expect("invalid timestamp oracle URI"),
1009            )
1010        };
1011
1012        let secrets_dir = temp_dir.path().join("secrets");
1013        let orchestrator = Arc::new(
1014            ProcessOrchestrator::new(ProcessOrchestratorConfig {
1015                image_dir: env::current_exe()?.parent().unwrap().to_path_buf(),
1016                suppress_output: false,
1017                environment_id: environment_id.to_string(),
1018                secrets_dir: secrets_dir.clone(),
1019                command_wrapper: config
1020                    .orchestrator_process_wrapper
1021                    .as_ref()
1022                    .map_or(Ok(vec![]), |s| shell_words::split(s))?,
1023                propagate_crashes: true,
1024                tcp_proxy: None,
1025                scratch_directory: scratch_dir.path().to_path_buf(),
1026            })
1027            .await?,
1028        );
1029        let now = SYSTEM_TIME.clone();
1030        let metrics_registry = MetricsRegistry::new();
1031
1032        let persist_config = PersistConfig::new(
1033            &mz_environmentd::BUILD_INFO,
1034            now.clone(),
1035            mz_dyncfgs::all_dyncfgs(),
1036        );
1037        let persist_pubsub_server =
1038            PersistGrpcPubSubServer::new(&persist_config, &metrics_registry);
1039        let persist_pubsub_client = persist_pubsub_server.new_same_process_connection();
1040        let persist_pubsub_tcp_listener =
1041            TcpListener::bind(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0))
1042                .await
1043                .expect("pubsub addr binding");
1044        let persist_pubsub_server_port = persist_pubsub_tcp_listener
1045            .local_addr()
1046            .expect("pubsub addr has local addr")
1047            .port();
1048        info!("listening for persist pubsub connections on localhost:{persist_pubsub_server_port}");
1049        mz_ore::task::spawn(|| "persist_pubsub_server", async move {
1050            persist_pubsub_server
1051                .serve_with_stream(TcpListenerStream::new(persist_pubsub_tcp_listener))
1052                .await
1053                .expect("success")
1054        });
1055        let persist_clients =
1056            PersistClientCache::new(persist_config, &metrics_registry, |cfg, metrics| {
1057                let sender: Arc<dyn PubSubSender> = Arc::new(MetricsSameProcessPubSubSender::new(
1058                    cfg,
1059                    persist_pubsub_client.sender,
1060                    metrics,
1061                ));
1062                PubSubClientConnection::new(sender, persist_pubsub_client.receiver)
1063            });
1064        let persist_clients = Arc::new(persist_clients);
1065
1066        let secrets_controller = Arc::clone(&orchestrator);
1067        let connection_context = ConnectionContext::for_tests(orchestrator.reader());
1068        let orchestrator = Arc::new(TracingOrchestrator::new(
1069            orchestrator,
1070            config.tracing.clone(),
1071        ));
1072        let listeners_config = ListenersConfig {
1073            sql: btreemap! {
1074                "external".to_owned() => SqlListenerConfig {
1075                    addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
1076                    authenticator_kind: AuthenticatorKind::None,
1077                    allowed_roles: AllowedRoles::Normal,
1078                    enable_tls: false,
1079                },
1080                "internal".to_owned() => SqlListenerConfig {
1081                    addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
1082                    authenticator_kind: AuthenticatorKind::None,
1083                    allowed_roles: AllowedRoles::Internal,
1084                    enable_tls: false,
1085                },
1086            },
1087            http: btreemap![
1088                "external".to_owned() => HttpListenerConfig {
1089                    base: BaseListenerConfig {
1090                        addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
1091                        authenticator_kind: AuthenticatorKind::None,
1092                        allowed_roles: AllowedRoles::Normal,
1093                        enable_tls: false
1094                    },
1095                    routes: HttpRoutesEnabled {
1096                        base: true,
1097                        webhook: true,
1098                        internal: false,
1099                        metrics: false,
1100                        profiling: false,
1101                    },
1102                },
1103                "internal".to_owned() => HttpListenerConfig {
1104                    base: BaseListenerConfig {
1105                        addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
1106                        authenticator_kind: AuthenticatorKind::None,
1107                        allowed_roles: AllowedRoles::NormalAndInternal,
1108                        enable_tls: false
1109                    },
1110                    routes: HttpRoutesEnabled {
1111                        base: true,
1112                        webhook: true,
1113                        internal: true,
1114                        metrics: true,
1115                        profiling: true,
1116                    },
1117                },
1118            ],
1119        };
1120        let listeners = mz_environmentd::Listeners::bind(listeners_config).await?;
1121        let host_name = format!(
1122            "localhost:{}",
1123            listeners.http["external"].handle.local_addr.port()
1124        );
1125        let catalog_config = CatalogConfig {
1126            persist_clients: Arc::clone(&persist_clients),
1127            metrics: Arc::new(mz_catalog::durable::Metrics::new(&MetricsRegistry::new())),
1128        };
1129        let server_config = mz_environmentd::Config {
1130            catalog_config,
1131            timestamp_oracle_url: Some(timestamp_oracle_url),
1132            controller: ControllerConfig {
1133                build_info: &mz_environmentd::BUILD_INFO,
1134                orchestrator,
1135                clusterd_image: "clusterd".into(),
1136                init_container_image: None,
1137                deploy_generation: 0,
1138                persist_location: PersistLocation {
1139                    blob_uri: format!(
1140                        "file://{}/persist/blob",
1141                        config.persist_dir.path().display()
1142                    )
1143                    .parse()
1144                    .expect("invalid blob URI"),
1145                    consensus_uri,
1146                },
1147                persist_clients,
1148                now: SYSTEM_TIME.clone(),
1149                metrics_registry: metrics_registry.clone(),
1150                persist_pubsub_url: format!("http://localhost:{}", persist_pubsub_server_port),
1151                secrets_args: mz_service::secrets::SecretsReaderCliArgs {
1152                    secrets_reader: mz_service::secrets::SecretsControllerKind::LocalFile,
1153                    secrets_reader_local_file_dir: Some(secrets_dir),
1154                    secrets_reader_kubernetes_context: None,
1155                    secrets_reader_aws_prefix: None,
1156                    secrets_reader_name_prefix: None,
1157                },
1158                connection_context,
1159            },
1160            secrets_controller,
1161            cloud_resource_controller: None,
1162            tls: None,
1163            frontegg: None,
1164            cors_allowed_origin: AllowOrigin::list([]),
1165            unsafe_mode: true,
1166            all_features: false,
1167            metrics_registry,
1168            now,
1169            environment_id,
1170            cluster_replica_sizes: ClusterReplicaSizeMap::for_tests(),
1171            bootstrap_default_cluster_replica_size: config.replica_size.to_string(),
1172            bootstrap_default_cluster_replication_factor: config
1173                .replicas
1174                .try_into()
1175                .expect("replicas must fit"),
1176            bootstrap_builtin_system_cluster_config: BootstrapBuiltinClusterConfig {
1177                replication_factor: SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR,
1178                size: config.replica_size.to_string(),
1179            },
1180            bootstrap_builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig {
1181                replication_factor: CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR,
1182                size: config.replica_size.to_string(),
1183            },
1184            bootstrap_builtin_probe_cluster_config: BootstrapBuiltinClusterConfig {
1185                replication_factor: PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR,
1186                size: config.replica_size.to_string(),
1187            },
1188            bootstrap_builtin_support_cluster_config: BootstrapBuiltinClusterConfig {
1189                replication_factor: SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR,
1190                size: config.replica_size.to_string(),
1191            },
1192            bootstrap_builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig {
1193                replication_factor: ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR,
1194                size: config.replica_size.to_string(),
1195            },
1196            system_parameter_defaults: {
1197                let mut params = BTreeMap::new();
1198                params.insert(
1199                    "log_filter".to_string(),
1200                    config.tracing.startup_log_filter.to_string(),
1201                );
1202                params.extend(config.system_parameter_defaults.clone());
1203                params
1204            },
1205            availability_zones: Default::default(),
1206            tracing_handle: config.tracing_handle.clone(),
1207            storage_usage_collection_interval: Duration::from_secs(3600),
1208            storage_usage_retention_period: None,
1209            segment_api_key: None,
1210            segment_client_side: false,
1211            // SLT doesn't like eternally running tasks since it waits for them to finish inbetween SLT files
1212            test_only_dummy_segment_client: false,
1213            egress_addresses: vec![],
1214            aws_account_id: None,
1215            aws_privatelink_availability_zones: None,
1216            launchdarkly_sdk_key: None,
1217            launchdarkly_key_map: Default::default(),
1218            config_sync_file_path: None,
1219            config_sync_timeout: Duration::from_secs(30),
1220            config_sync_loop_interval: None,
1221            bootstrap_role: Some("materialize".into()),
1222            http_host_name: Some(host_name),
1223            internal_console_redirect_url: None,
1224            tls_reload_certs: mz_server_core::cert_reload_never_reload(),
1225            helm_chart_version: None,
1226            license_key: ValidatedLicenseKey::for_tests(),
1227            external_login_password_mz_system: None,
1228        };
1229        // We need to run the server on its own Tokio runtime, which in turn
1230        // requires its own thread, so that we can wait for any tasks spawned
1231        // by the server to be shutdown at the end of each file. If we were to
1232        // share a Tokio runtime, tasks from the last file's server would still
1233        // be live at the start of the next file's server.
1234        let (server_addr_tx, server_addr_rx): (oneshot::Sender<Result<_, anyhow::Error>>, _) =
1235            oneshot::channel();
1236        let (internal_server_addr_tx, internal_server_addr_rx) = oneshot::channel();
1237        let (internal_http_server_addr_tx, internal_http_server_addr_rx) = oneshot::channel();
1238        let (shutdown_trigger, shutdown_trigger_rx) = trigger::channel();
1239        let server_thread = thread::spawn(|| {
1240            let runtime = match Runtime::new() {
1241                Ok(runtime) => runtime,
1242                Err(e) => {
1243                    server_addr_tx
1244                        .send(Err(e.into()))
1245                        .expect("receiver should not drop first");
1246                    return;
1247                }
1248            };
1249            let server = match runtime.block_on(listeners.serve(server_config)) {
1250                Ok(runtime) => runtime,
1251                Err(e) => {
1252                    server_addr_tx
1253                        .send(Err(e.into()))
1254                        .expect("receiver should not drop first");
1255                    return;
1256                }
1257            };
1258            server_addr_tx
1259                .send(Ok(server.sql_listener_handles["external"].local_addr))
1260                .expect("receiver should not drop first");
1261            internal_server_addr_tx
1262                .send(server.sql_listener_handles["internal"].local_addr)
1263                .expect("receiver should not drop first");
1264            internal_http_server_addr_tx
1265                .send(server.http_listener_handles["internal"].local_addr)
1266                .expect("receiver should not drop first");
1267            let _ = runtime.block_on(shutdown_trigger_rx);
1268        });
1269        let server_addr = server_addr_rx.await??;
1270        let internal_server_addr = internal_server_addr_rx.await?;
1271        let internal_http_server_addr = internal_http_server_addr_rx.await?;
1272
1273        let system_client = connect(internal_server_addr, Some("mz_system")).await;
1274        let client = connect(server_addr, None).await;
1275
1276        let inner = RunnerInner {
1277            server_addr,
1278            internal_server_addr,
1279            internal_http_server_addr,
1280            _shutdown_trigger: shutdown_trigger,
1281            _server_thread: server_thread.join_on_drop(),
1282            _temp_dir: temp_dir,
1283            client,
1284            system_client,
1285            clients: BTreeMap::new(),
1286            auto_index_tables: config.auto_index_tables,
1287            auto_index_selects: config.auto_index_selects,
1288            auto_transactions: config.auto_transactions,
1289            enable_table_keys: config.enable_table_keys,
1290            verbosity: config.verbosity,
1291            stdout: config.stdout,
1292        };
1293        inner.ensure_fixed_features().await?;
1294
1295        Ok(inner)
1296    }
1297
1298    /// Set features that should be enabled regardless of whether reset-server was
1299    /// called. These features may be set conditionally depending on the run configuration.
1300    async fn ensure_fixed_features(&self) -> Result<(), anyhow::Error> {
1301        // We turn on enable_reduce_mfp_fusion, as we wish
1302        // to get as much coverage of these features as we can.
1303        // TODO(vmarcos): Remove this code when we retire this feature flag.
1304        self.system_client
1305            .execute("ALTER SYSTEM SET enable_reduce_mfp_fusion = on", &[])
1306            .await?;
1307
1308        // Dangerous functions are useful for tests so we enable it for all tests.
1309        self.system_client
1310            .execute("ALTER SYSTEM SET unsafe_enable_unsafe_functions = on", &[])
1311            .await?;
1312        Ok(())
1313    }
1314
1315    async fn run_record<'r>(
1316        &mut self,
1317        record: &'r Record<'r>,
1318        in_transaction: &mut bool,
1319    ) -> Result<Outcome<'r>, anyhow::Error> {
1320        match &record {
1321            Record::Statement {
1322                expected_error,
1323                rows_affected,
1324                sql,
1325                location,
1326            } => {
1327                if self.auto_transactions && *in_transaction {
1328                    self.client.execute("COMMIT", &[]).await?;
1329                    *in_transaction = false;
1330                }
1331                match self
1332                    .run_statement(*expected_error, *rows_affected, sql, location.clone())
1333                    .await?
1334                {
1335                    Outcome::Success => {
1336                        if self.auto_index_tables {
1337                            let additional = mutate(sql);
1338                            for stmt in additional {
1339                                self.client.execute(&stmt, &[]).await?;
1340                            }
1341                        }
1342                        Ok(Outcome::Success)
1343                    }
1344                    other => {
1345                        if expected_error.is_some() {
1346                            Ok(other)
1347                        } else {
1348                            // If we failed to execute a statement that was supposed to succeed,
1349                            // running the rest of the tests in this file will probably cause
1350                            // false positives, so just give up on the file entirely.
1351                            Ok(Outcome::Bail {
1352                                cause: Box::new(other),
1353                                location: location.clone(),
1354                            })
1355                        }
1356                    }
1357                }
1358            }
1359            Record::Query {
1360                sql,
1361                output,
1362                location,
1363            } => {
1364                self.run_query(sql, output, location.clone(), in_transaction)
1365                    .await
1366            }
1367            Record::Simple {
1368                conn,
1369                user,
1370                sql,
1371                sort,
1372                output,
1373                location,
1374                ..
1375            } => {
1376                self.run_simple(*conn, *user, sql, sort.clone(), output, location.clone())
1377                    .await
1378            }
1379            Record::Copy {
1380                table_name,
1381                tsv_path,
1382            } => {
1383                let tsv = tokio::fs::read(tsv_path).await?;
1384                let copy = self
1385                    .client
1386                    .copy_in(&*format!("COPY {} FROM STDIN", table_name))
1387                    .await?;
1388                tokio::pin!(copy);
1389                copy.send(bytes::Bytes::from(tsv)).await?;
1390                copy.finish().await?;
1391                Ok(Outcome::Success)
1392            }
1393            _ => Ok(Outcome::Success),
1394        }
1395    }
1396
1397    async fn run_statement<'r>(
1398        &self,
1399        expected_error: Option<&'r str>,
1400        expected_rows_affected: Option<u64>,
1401        sql: &'r str,
1402        location: Location,
1403    ) -> Result<Outcome<'r>, anyhow::Error> {
1404        static UNSUPPORTED_INDEX_STATEMENT_REGEX: LazyLock<Regex> =
1405            LazyLock::new(|| Regex::new("^(CREATE UNIQUE INDEX|REINDEX)").unwrap());
1406        if UNSUPPORTED_INDEX_STATEMENT_REGEX.is_match(sql) {
1407            // sure, we totally made you an index
1408            return Ok(Outcome::Success);
1409        }
1410
1411        match self.client.execute(sql, &[]).await {
1412            Ok(actual) => {
1413                if let Some(expected_error) = expected_error {
1414                    return Ok(Outcome::UnexpectedPlanSuccess {
1415                        expected_error,
1416                        location,
1417                    });
1418                }
1419                match expected_rows_affected {
1420                    None => Ok(Outcome::Success),
1421                    Some(expected) => {
1422                        if expected != actual {
1423                            Ok(Outcome::WrongNumberOfRowsInserted {
1424                                expected_count: expected,
1425                                actual_count: actual,
1426                                location,
1427                            })
1428                        } else {
1429                            Ok(Outcome::Success)
1430                        }
1431                    }
1432                }
1433            }
1434            Err(error) => {
1435                if let Some(expected_error) = expected_error {
1436                    if Regex::new(expected_error)?.is_match(&format!("{:#}", error)) {
1437                        return Ok(Outcome::Success);
1438                    }
1439                }
1440                Ok(Outcome::PlanFailure {
1441                    error: anyhow!(error),
1442                    location,
1443                })
1444            }
1445        }
1446    }
1447
1448    async fn prepare_query<'r>(
1449        &self,
1450        sql: &str,
1451        output: &'r Result<QueryOutput<'_>, &'r str>,
1452        location: Location,
1453        in_transaction: &mut bool,
1454    ) -> Result<PrepareQueryOutcome<'r>, anyhow::Error> {
1455        // get statement
1456        let statements = match mz_sql::parse::parse(sql) {
1457            Ok(statements) => statements,
1458            Err(e) => match output {
1459                Ok(_) => {
1460                    return Ok(PrepareQueryOutcome::Outcome(Outcome::ParseFailure {
1461                        error: e.into(),
1462                        location,
1463                    }));
1464                }
1465                Err(expected_error) => {
1466                    if Regex::new(expected_error)?.is_match(&format!("{:#}", e)) {
1467                        return Ok(PrepareQueryOutcome::Outcome(Outcome::Success));
1468                    } else {
1469                        return Ok(PrepareQueryOutcome::Outcome(Outcome::ParseFailure {
1470                            error: e.into(),
1471                            location,
1472                        }));
1473                    }
1474                }
1475            },
1476        };
1477        let statement = match &*statements {
1478            [] => bail!("Got zero statements?"),
1479            [statement] => &statement.ast,
1480            _ => bail!("Got multiple statements: {:?}", statements),
1481        };
1482        let (is_select, num_attributes) = match statement {
1483            Statement::Select(stmt) => (true, derive_num_attributes(&stmt.query.body)),
1484            _ => (false, None),
1485        };
1486
1487        match output {
1488            Ok(_) => {
1489                if self.auto_transactions && !*in_transaction {
1490                    // No ISOLATION LEVEL SERIALIZABLE because of database-issues#5323
1491                    self.client.execute("BEGIN", &[]).await?;
1492                    *in_transaction = true;
1493                }
1494            }
1495            Err(_) => {
1496                if self.auto_transactions && *in_transaction {
1497                    self.client.execute("COMMIT", &[]).await?;
1498                    *in_transaction = false;
1499                }
1500            }
1501        }
1502
1503        // `SHOW` commands reference catalog schema, thus are not in the same timedomain and not
1504        // allowed in the same transaction, see:
1505        // https://materialize.com/docs/sql/begin/#same-timedomain-error
1506        match statement {
1507            Statement::Show(..) => {
1508                if self.auto_transactions && *in_transaction {
1509                    self.client.execute("COMMIT", &[]).await?;
1510                    *in_transaction = false;
1511                }
1512            }
1513            _ => (),
1514        }
1515        Ok(PrepareQueryOutcome::QueryPrepared(QueryInfo {
1516            is_select,
1517            num_attributes,
1518        }))
1519    }
1520
1521    async fn execute_query<'r>(
1522        &self,
1523        sql: &str,
1524        output: &'r Result<QueryOutput<'_>, &'r str>,
1525        location: Location,
1526    ) -> Result<Outcome<'r>, anyhow::Error> {
1527        let rows = match self.client.query(sql, &[]).await {
1528            Ok(rows) => rows,
1529            Err(error) => {
1530                return match output {
1531                    Ok(_) => {
1532                        let error_string = format!("{}", error);
1533                        if error_string.contains("supported") || error_string.contains("overload") {
1534                            // this is a failure, but it's caused by lack of support rather than by bugs
1535                            Ok(Outcome::Unsupported {
1536                                error: anyhow!(error),
1537                                location,
1538                            })
1539                        } else {
1540                            Ok(Outcome::PlanFailure {
1541                                error: anyhow!(error),
1542                                location,
1543                            })
1544                        }
1545                    }
1546                    Err(expected_error) => {
1547                        if Regex::new(expected_error)?.is_match(&format!("{:#}", error)) {
1548                            Ok(Outcome::Success)
1549                        } else {
1550                            Ok(Outcome::PlanFailure {
1551                                error: anyhow!(error),
1552                                location,
1553                            })
1554                        }
1555                    }
1556                };
1557            }
1558        };
1559
1560        // unpack expected output
1561        let QueryOutput {
1562            sort,
1563            types: expected_types,
1564            column_names: expected_column_names,
1565            output: expected_output,
1566            mode,
1567            ..
1568        } = match output {
1569            Err(expected_error) => {
1570                return Ok(Outcome::UnexpectedPlanSuccess {
1571                    expected_error,
1572                    location,
1573                });
1574            }
1575            Ok(query_output) => query_output,
1576        };
1577
1578        // format output
1579        let mut formatted_rows = vec![];
1580        for row in &rows {
1581            if row.len() != expected_types.len() {
1582                return Ok(Outcome::WrongColumnCount {
1583                    expected_count: expected_types.len(),
1584                    actual_count: row.len(),
1585                    location,
1586                });
1587            }
1588            let row = format_row(row, expected_types, *mode);
1589            formatted_rows.push(row);
1590        }
1591
1592        // sort formatted output
1593        if let Sort::Row = sort {
1594            formatted_rows.sort();
1595        }
1596        let mut values = formatted_rows.into_iter().flatten().collect::<Vec<_>>();
1597        if let Sort::Value = sort {
1598            values.sort();
1599        }
1600
1601        // Various checks as long as there are returned rows.
1602        if let Some(row) = rows.get(0) {
1603            // check column names
1604            if let Some(expected_column_names) = expected_column_names {
1605                let actual_column_names = row
1606                    .columns()
1607                    .iter()
1608                    .map(|t| ColumnName::from(t.name()))
1609                    .collect::<Vec<_>>();
1610                if expected_column_names != &actual_column_names {
1611                    return Ok(Outcome::WrongColumnNames {
1612                        expected_column_names,
1613                        actual_column_names,
1614                        actual_output: Output::Values(values),
1615                        location,
1616                    });
1617                }
1618            }
1619        }
1620
1621        // check output
1622        match expected_output {
1623            Output::Values(expected_values) => {
1624                if values != *expected_values {
1625                    return Ok(Outcome::OutputFailure {
1626                        expected_output,
1627                        actual_raw_output: rows,
1628                        actual_output: Output::Values(values),
1629                        location,
1630                    });
1631                }
1632            }
1633            Output::Hashed {
1634                num_values,
1635                md5: expected_md5,
1636            } => {
1637                let mut hasher = Md5::new();
1638                for value in &values {
1639                    hasher.update(value);
1640                    hasher.update("\n");
1641                }
1642                let md5 = format!("{:x}", hasher.finalize());
1643                if values.len() != *num_values || md5 != *expected_md5 {
1644                    return Ok(Outcome::OutputFailure {
1645                        expected_output,
1646                        actual_raw_output: rows,
1647                        actual_output: Output::Hashed {
1648                            num_values: values.len(),
1649                            md5,
1650                        },
1651                        location,
1652                    });
1653                }
1654            }
1655        }
1656
1657        Ok(Outcome::Success)
1658    }
1659
1660    async fn execute_view_inner<'r>(
1661        &self,
1662        sql: &str,
1663        output: &'r Result<QueryOutput<'_>, &'r str>,
1664        location: Location,
1665    ) -> Result<Option<Outcome<'r>>, anyhow::Error> {
1666        print_sql_if(self.stdout, sql, self.verbosity >= 2);
1667        let sql_result = self.client.execute(sql, &[]).await;
1668
1669        // Evaluate if we already reached an outcome or not.
1670        let tentative_outcome = if let Err(view_error) = sql_result {
1671            if let Err(expected_error) = output {
1672                if Regex::new(expected_error)?.is_match(&format!("{:#}", view_error)) {
1673                    Some(Outcome::Success)
1674                } else {
1675                    Some(Outcome::PlanFailure {
1676                        error: view_error.into(),
1677                        location: location.clone(),
1678                    })
1679                }
1680            } else {
1681                Some(Outcome::PlanFailure {
1682                    error: view_error.into(),
1683                    location: location.clone(),
1684                })
1685            }
1686        } else {
1687            None
1688        };
1689        Ok(tentative_outcome)
1690    }
1691
1692    async fn execute_view<'r>(
1693        &self,
1694        sql: &str,
1695        num_attributes: Option<usize>,
1696        output: &'r Result<QueryOutput<'_>, &'r str>,
1697        location: Location,
1698    ) -> Result<Outcome<'r>, anyhow::Error> {
1699        // Create indexed view SQL commands and execute `CREATE VIEW`.
1700        let expected_column_names = if let Ok(QueryOutput { column_names, .. }) = output {
1701            column_names.clone()
1702        } else {
1703            None
1704        };
1705        let (create_view, create_index, view_sql, drop_view) = generate_view_sql(
1706            sql,
1707            Uuid::new_v4().as_simple(),
1708            num_attributes,
1709            expected_column_names,
1710        );
1711        let tentative_outcome = self
1712            .execute_view_inner(create_view.as_str(), output, location.clone())
1713            .await?;
1714
1715        // Either we already have an outcome or alternatively,
1716        // we proceed to index and query the view.
1717        if let Some(view_outcome) = tentative_outcome {
1718            return Ok(view_outcome);
1719        }
1720
1721        let tentative_outcome = self
1722            .execute_view_inner(create_index.as_str(), output, location.clone())
1723            .await?;
1724
1725        let view_outcome;
1726        if let Some(outcome) = tentative_outcome {
1727            view_outcome = outcome;
1728        } else {
1729            print_sql_if(self.stdout, view_sql.as_str(), self.verbosity >= 2);
1730            view_outcome = self
1731                .execute_query(view_sql.as_str(), output, location.clone())
1732                .await?;
1733        }
1734
1735        // Remember to clean up after ourselves by dropping the view.
1736        print_sql_if(self.stdout, drop_view.as_str(), self.verbosity >= 2);
1737        self.client.execute(drop_view.as_str(), &[]).await?;
1738
1739        Ok(view_outcome)
1740    }
1741
1742    async fn run_query<'r>(
1743        &self,
1744        sql: &'r str,
1745        output: &'r Result<QueryOutput<'_>, &'r str>,
1746        location: Location,
1747        in_transaction: &mut bool,
1748    ) -> Result<Outcome<'r>, anyhow::Error> {
1749        let prepare_outcome = self
1750            .prepare_query(sql, output, location.clone(), in_transaction)
1751            .await?;
1752        match prepare_outcome {
1753            PrepareQueryOutcome::QueryPrepared(QueryInfo {
1754                is_select,
1755                num_attributes,
1756            }) => {
1757                let query_outcome = self.execute_query(sql, output, location.clone()).await?;
1758                if is_select && self.auto_index_selects {
1759                    let view_outcome = self
1760                        .execute_view(sql, None, output, location.clone())
1761                        .await?;
1762
1763                    // We compare here the query-based and view-based outcomes.
1764                    // We only produce a test failure if the outcomes are of different
1765                    // variant types, thus accepting smaller deviations in the details
1766                    // produced for each variant.
1767                    if std::mem::discriminant::<Outcome>(&query_outcome)
1768                        != std::mem::discriminant::<Outcome>(&view_outcome)
1769                    {
1770                        // Before producing a failure outcome, we try to obtain a new
1771                        // outcome for view-based execution exploiting analysis of the
1772                        // number of attributes. This two-level strategy can avoid errors
1773                        // produced by column ambiguity in the `SELECT`.
1774                        let view_outcome = if num_attributes.is_some() {
1775                            self.execute_view(sql, num_attributes, output, location.clone())
1776                                .await?
1777                        } else {
1778                            view_outcome
1779                        };
1780
1781                        if std::mem::discriminant::<Outcome>(&query_outcome)
1782                            != std::mem::discriminant::<Outcome>(&view_outcome)
1783                        {
1784                            let inconsistent_view_outcome = Outcome::InconsistentViewOutcome {
1785                                query_outcome: Box::new(query_outcome),
1786                                view_outcome: Box::new(view_outcome),
1787                                location: location.clone(),
1788                            };
1789                            // Determine if this inconsistent view outcome should be reported
1790                            // as an error or only as a warning.
1791                            let outcome = if should_warn(&inconsistent_view_outcome) {
1792                                Outcome::Warning {
1793                                    cause: Box::new(inconsistent_view_outcome),
1794                                    location: location.clone(),
1795                                }
1796                            } else {
1797                                inconsistent_view_outcome
1798                            };
1799                            return Ok(outcome);
1800                        }
1801                    }
1802                }
1803                Ok(query_outcome)
1804            }
1805            PrepareQueryOutcome::Outcome(outcome) => Ok(outcome),
1806        }
1807    }
1808
1809    async fn get_conn(
1810        &mut self,
1811        name: Option<&str>,
1812        user: Option<&str>,
1813    ) -> &tokio_postgres::Client {
1814        match name {
1815            None => &self.client,
1816            Some(name) => {
1817                if !self.clients.contains_key(name) {
1818                    let addr = if matches!(user, Some("mz_system") | Some("mz_support")) {
1819                        self.internal_server_addr
1820                    } else {
1821                        self.server_addr
1822                    };
1823                    let client = connect(addr, user).await;
1824                    self.clients.insert(name.into(), client);
1825                }
1826                self.clients.get(name).unwrap()
1827            }
1828        }
1829    }
1830
1831    async fn run_simple<'r>(
1832        &mut self,
1833        conn: Option<&'r str>,
1834        user: Option<&'r str>,
1835        sql: &'r str,
1836        sort: Sort,
1837        output: &'r Output,
1838        location: Location,
1839    ) -> Result<Outcome<'r>, anyhow::Error> {
1840        let client = self.get_conn(conn, user).await;
1841        let actual = Output::Values(match client.simple_query(sql).await {
1842            Ok(result) => {
1843                let mut rows = Vec::new();
1844
1845                for m in result.into_iter() {
1846                    match m {
1847                        SimpleQueryMessage::Row(row) => {
1848                            let mut s = vec![];
1849                            for i in 0..row.len() {
1850                                s.push(row.get(i).unwrap_or("NULL"));
1851                            }
1852                            rows.push(s.join(","));
1853                        }
1854                        SimpleQueryMessage::CommandComplete(count) => {
1855                            // This applies any sort on the COMPLETE line as
1856                            // well, but we do the same for the expected output.
1857                            rows.push(format!("COMPLETE {}", count));
1858                        }
1859                        SimpleQueryMessage::RowDescription(_) => {}
1860                        _ => panic!("unexpected"),
1861                    }
1862                }
1863
1864                if let Sort::Row = sort {
1865                    rows.sort();
1866                }
1867
1868                rows
1869            }
1870            // Errors can contain multiple lines (say if there are details), and rewrite
1871            // sticks them each on their own line, so we need to split up the lines here to
1872            // each be its own String in the Vec.
1873            Err(error) => error.to_string().lines().map(|s| s.to_string()).collect(),
1874        });
1875        if *output != actual {
1876            Ok(Outcome::OutputFailure {
1877                expected_output: output,
1878                actual_raw_output: vec![],
1879                actual_output: actual,
1880                location,
1881            })
1882        } else {
1883            Ok(Outcome::Success)
1884        }
1885    }
1886
1887    async fn check_catalog(&self) -> Result<(), anyhow::Error> {
1888        let url = format!(
1889            "http://{}/api/catalog/check",
1890            self.internal_http_server_addr
1891        );
1892        let response: serde_json::Value = reqwest::get(&url).await?.json().await?;
1893
1894        if let Some(inconsistencies) = response.get("err") {
1895            let inconsistencies = serde_json::to_string_pretty(&inconsistencies)
1896                .expect("serializing Value cannot fail");
1897            Err(anyhow::anyhow!("Catalog inconsistency\n{inconsistencies}"))
1898        } else {
1899            Ok(())
1900        }
1901    }
1902}
1903
1904async fn connect(addr: SocketAddr, user: Option<&str>) -> tokio_postgres::Client {
1905    let (client, connection) = tokio_postgres::connect(
1906        &format!(
1907            "host={} port={} user={}",
1908            addr.ip(),
1909            addr.port(),
1910            user.unwrap_or("materialize")
1911        ),
1912        NoTls,
1913    )
1914    .await
1915    .unwrap();
1916
1917    task::spawn(|| "sqllogictest_connect", async move {
1918        if let Err(e) = connection.await {
1919            eprintln!("connection error: {}", e);
1920        }
1921    });
1922    client
1923}
1924
/// A sink for formatted text.
///
/// The single method mirrors the name and argument of the standard
/// `write_fmt` methods, which lets `write!`/`writeln!` be invoked directly on
/// a `&dyn WriteFmt` (as `print_sql` does).
pub trait WriteFmt {
    /// Writes the pre-formatted arguments `fmt` to the sink.
    ///
    /// Takes `&self` and returns nothing, so callers have no error to handle.
    fn write_fmt(&self, fmt: fmt::Arguments<'_>);
}
1928
/// Settings consulted while running or rewriting test files; the entry points
/// in this file read them through `runner.config`.
pub struct RunConfig<'a> {
    /// Sink for regular output such as the per-file header, the SQL of
    /// records and the rendered outcomes.
    pub stdout: &'a dyn WriteFmt,
    pub stderr: &'a dyn WriteFmt,
    /// Output level used by `run_string`: at `>= 1` warnings and failures are
    /// printed as they occur, at `>= 2` every record is additionally printed
    /// before it is run.
    pub verbosity: u8,
    pub postgres_url: String,
    pub prefix: String,
    pub no_fail: bool,
    /// When set, `run_string` stops processing a file after the first failing
    /// outcome.
    pub fail_fast: bool,
    pub auto_index_tables: bool,
    pub auto_index_selects: bool,
    pub auto_transactions: bool,
    pub enable_table_keys: bool,
    pub orchestrator_process_wrapper: Option<String>,
    pub tracing: TracingCliArgs,
    pub tracing_handle: TracingHandle,
    pub system_parameter_defaults: BTreeMap<String, String>,
    /// Persist state is handled specially because:
    /// - Persist background workers do not necessarily shut down immediately once the server is
    ///   shut down, and may panic if their storage is deleted out from under them.
    /// - It's safe for different databases to reference the same state: all data is scoped by UUID.
    pub persist_dir: TempDir,
    pub replicas: usize,
    pub replica_size: usize,
}
1953
1954fn print_record(config: &RunConfig<'_>, record: &Record) {
1955    match record {
1956        Record::Statement { sql, .. } | Record::Query { sql, .. } => print_sql(config.stdout, sql),
1957        _ => (),
1958    }
1959}
1960
1961fn print_sql_if<'a>(stdout: &'a dyn WriteFmt, sql: &str, cond: bool) {
1962    if cond {
1963        print_sql(stdout, sql)
1964    }
1965}
1966
1967fn print_sql<'a>(stdout: &'a dyn WriteFmt, sql: &str) {
1968    writeln!(stdout, "{}", crate::util::indent(sql, 4))
1969}
1970
/// Regular expressions for view errors that are tolerated.
///
/// When a query succeeds but its indexed-view counterpart ends in a plan
/// failure, `should_warn` matches the view error (rendered with `{:#}`)
/// against these patterns. A match downgrades the inconsistent view outcome
/// from a failure to a warning.
///
/// Every entry is compiled as a regular expression, so literal entries must
/// not contain unescaped metacharacters they do not intend.
const INCONSISTENT_VIEW_OUTCOME_WARNING_REGEXPS: [&str; 9] = [
    // The following are unfixable errors in indexed views given our
    // current constraints.
    "cannot materialize call to",
    "SHOW commands are not allowed in views",
    "cannot create view with unstable dependencies",
    "cannot use wildcard expansions or NATURAL JOINs in a view that depends on system objects",
    "no valid schema selected",
    r#"system schema '\w+' cannot be modified"#,
    r#"permission denied for (SCHEMA|CLUSTER) "(\w+\.)?\w+""#,
    // NOTE(vmarcos): Column ambiguity that could not be eliminated by our
    // currently implemented syntactic rewrites is considered unfixable.
    // In addition, if some column cannot be dealt with, e.g., in `ORDER BY`
    // references, we treat this condition as unfixable as well.
    r#"column "[\w\?]+" specified more than once"#,
    r#"column "(\w+\.)?\w+" does not exist"#,
];
1990
1991/// Evaluates if the given outcome should be returned directly or if it should
1992/// be wrapped as a warning. Note that this function should be used for outcomes
1993/// that can be judged in a context-independent manner, i.e., the outcome itself
1994/// provides enough information as to whether a warning should be emitted or not.
1995fn should_warn(outcome: &Outcome) -> bool {
1996    match outcome {
1997        Outcome::InconsistentViewOutcome {
1998            query_outcome,
1999            view_outcome,
2000            ..
2001        } => match (query_outcome.as_ref(), view_outcome.as_ref()) {
2002            (Outcome::Success, Outcome::PlanFailure { error, .. }) => {
2003                INCONSISTENT_VIEW_OUTCOME_WARNING_REGEXPS.iter().any(|s| {
2004                    Regex::new(s)
2005                        .expect("unexpected error in regular expression parsing")
2006                        .is_match(&format!("{:#}", error))
2007                })
2008            }
2009            _ => false,
2010        },
2011        _ => false,
2012    }
2013}
2014
2015pub async fn run_string(
2016    runner: &mut Runner<'_>,
2017    source: &str,
2018    input: &str,
2019) -> Result<Outcomes, anyhow::Error> {
2020    runner.reset_database().await?;
2021
2022    let mut outcomes = Outcomes::default();
2023    let mut parser = crate::parser::Parser::new(source, input);
2024    // Transactions are currently relatively slow. Since sqllogictest runs in a single connection
2025    // there should be no difference in having longer running transactions.
2026    let mut in_transaction = false;
2027    writeln!(runner.config.stdout, "--- {}", source);
2028
2029    for record in parser.parse_records()? {
2030        // In maximal-verbosity mode, print the query before attempting to run
2031        // it. Running the query might panic, so it is important to print out
2032        // what query we are trying to run *before* we panic.
2033        if runner.config.verbosity >= 2 {
2034            print_record(runner.config, &record);
2035        }
2036
2037        let outcome = runner
2038            .run_record(&record, &mut in_transaction)
2039            .await
2040            .map_err(|err| format!("In {}:\n{}", source, err))
2041            .unwrap();
2042
2043        // Print warnings and failures in verbose mode.
2044        if runner.config.verbosity >= 1 && !outcome.success() {
2045            if runner.config.verbosity < 2 {
2046                // If `verbosity >= 2`, we'll already have printed the record,
2047                // so don't print it again. Yes, this is an ugly bit of logic.
2048                // Please don't try to consolidate it with the `print_record`
2049                // call above, as it's important to have a mode in which records
2050                // are printed before they are run, so that if running the
2051                // record panics, you can tell which record caused it.
2052                if !outcome.failure() {
2053                    writeln!(
2054                        runner.config.stdout,
2055                        "{}",
2056                        util::indent("Warning detected for: ", 4)
2057                    );
2058                }
2059                print_record(runner.config, &record);
2060            }
2061            if runner.config.verbosity >= 2 || outcome.failure() {
2062                writeln!(
2063                    runner.config.stdout,
2064                    "{}",
2065                    util::indent(&outcome.to_string(), 4)
2066                );
2067                writeln!(runner.config.stdout, "{}", util::indent("----", 4));
2068            }
2069        }
2070
2071        outcomes.stats[outcome.code()] += 1;
2072        if outcome.failure() {
2073            outcomes.details.push(format!("{}", outcome));
2074        }
2075
2076        if let Outcome::Bail { .. } = outcome {
2077            break;
2078        }
2079
2080        if runner.config.fail_fast && outcome.failure() {
2081            break;
2082        }
2083    }
2084    Ok(outcomes)
2085}
2086
2087pub async fn run_file(runner: &mut Runner<'_>, filename: &Path) -> Result<Outcomes, anyhow::Error> {
2088    let mut input = String::new();
2089    File::open(filename)?.read_to_string(&mut input)?;
2090    let outcomes = run_string(runner, &format!("{}", filename.display()), &input).await?;
2091    runner.check_catalog().await?;
2092
2093    Ok(outcomes)
2094}
2095
2096pub async fn rewrite_file(runner: &mut Runner<'_>, filename: &Path) -> Result<(), anyhow::Error> {
2097    runner.reset_database().await?;
2098
2099    let mut file = OpenOptions::new().read(true).write(true).open(filename)?;
2100
2101    let mut input = String::new();
2102    file.read_to_string(&mut input)?;
2103
2104    let mut buf = RewriteBuffer::new(&input);
2105
2106    let mut parser = crate::parser::Parser::new(filename.to_str().unwrap_or(""), &input);
2107    writeln!(runner.config.stdout, "--- {}", filename.display());
2108    let mut in_transaction = false;
2109
2110    fn append_values_output(
2111        buf: &mut RewriteBuffer,
2112        input: &String,
2113        expected_output: &str,
2114        mode: &Mode,
2115        types: &Vec<Type>,
2116        column_names: Option<&Vec<ColumnName>>,
2117        actual_output: &Vec<String>,
2118        multiline: bool,
2119    ) {
2120        buf.append_header(input, expected_output, column_names);
2121
2122        for (i, row) in actual_output.chunks(types.len()).enumerate() {
2123            match mode {
2124                // In Cockroach mode, output each row on its own line, with
2125                // two spaces between each column.
2126                Mode::Cockroach => {
2127                    if i != 0 {
2128                        buf.append("\n");
2129                    }
2130
2131                    if row.len() == 0 {
2132                        // nothing to do
2133                    } else if row.len() == 1 {
2134                        // If there is only one column, then there is no need for space
2135                        // substitution, so we only do newline substitution.
2136                        if multiline {
2137                            buf.append(&row[0]);
2138                        } else {
2139                            buf.append(&row[0].replace('\n', "⏎"))
2140                        }
2141                    } else {
2142                        // Substitute spaces with ␠ to avoid mistaking the spaces in the result
2143                        // values with spaces that separate columns.
2144                        buf.append(
2145                            &row.iter()
2146                                .map(|col| {
2147                                    let mut col = col.replace(' ', "␠");
2148                                    if !multiline {
2149                                        col = col.replace('\n', "⏎");
2150                                    }
2151                                    col
2152                                })
2153                                .join("  "),
2154                        );
2155                    }
2156                }
2157                // In standard mode, output each value on its own line,
2158                // and ignore row boundaries.
2159                // No need to substitute spaces, because every value (not row) is on a separate
2160                // line. But we do need to substitute newlines.
2161                Mode::Standard => {
2162                    for (j, col) in row.iter().enumerate() {
2163                        if i != 0 || j != 0 {
2164                            buf.append("\n");
2165                        }
2166                        buf.append(&if multiline {
2167                            col.clone()
2168                        } else {
2169                            col.replace('\n', "⏎")
2170                        });
2171                    }
2172                }
2173            }
2174        }
2175    }
2176
2177    for record in parser.parse_records()? {
2178        let outcome = runner.run_record(&record, &mut in_transaction).await?;
2179
2180        match (&record, &outcome) {
2181            // If we see an output failure for a query, rewrite the expected output
2182            // to match the observed output.
2183            (
2184                Record::Query {
2185                    output:
2186                        Ok(QueryOutput {
2187                            mode,
2188                            output: Output::Values(_),
2189                            output_str: expected_output,
2190                            types,
2191                            column_names,
2192                            multiline,
2193                            ..
2194                        }),
2195                    ..
2196                },
2197                Outcome::OutputFailure {
2198                    actual_output: Output::Values(actual_output),
2199                    ..
2200                },
2201            ) => {
2202                append_values_output(
2203                    &mut buf,
2204                    &input,
2205                    expected_output,
2206                    mode,
2207                    types,
2208                    column_names.as_ref(),
2209                    actual_output,
2210                    *multiline,
2211                );
2212            }
2213            (
2214                Record::Query {
2215                    output:
2216                        Ok(QueryOutput {
2217                            mode,
2218                            output: Output::Values(_),
2219                            output_str: expected_output,
2220                            types,
2221                            multiline,
2222                            ..
2223                        }),
2224                    ..
2225                },
2226                Outcome::WrongColumnNames {
2227                    actual_column_names,
2228                    actual_output: Output::Values(actual_output),
2229                    ..
2230                },
2231            ) => {
2232                append_values_output(
2233                    &mut buf,
2234                    &input,
2235                    expected_output,
2236                    mode,
2237                    types,
2238                    Some(actual_column_names),
2239                    actual_output,
2240                    *multiline,
2241                );
2242            }
2243            (
2244                Record::Query {
2245                    output:
2246                        Ok(QueryOutput {
2247                            output: Output::Hashed { .. },
2248                            output_str: expected_output,
2249                            column_names,
2250                            ..
2251                        }),
2252                    ..
2253                },
2254                Outcome::OutputFailure {
2255                    actual_output: Output::Hashed { num_values, md5 },
2256                    ..
2257                },
2258            ) => {
2259                buf.append_header(&input, expected_output, column_names.as_ref());
2260
2261                buf.append(format!("{} values hashing to {}\n", num_values, md5).as_str())
2262            }
2263            (
2264                Record::Simple {
2265                    output_str: expected_output,
2266                    ..
2267                },
2268                Outcome::OutputFailure {
2269                    actual_output: Output::Values(actual_output),
2270                    ..
2271                },
2272            ) => {
2273                buf.append_header(&input, expected_output, None);
2274
2275                for (i, row) in actual_output.iter().enumerate() {
2276                    if i != 0 {
2277                        buf.append("\n");
2278                    }
2279                    buf.append(row);
2280                }
2281            }
2282            (
2283                Record::Query {
2284                    sql,
2285                    output: Err(err),
2286                    ..
2287                },
2288                outcome,
2289            )
2290            | (
2291                Record::Statement {
2292                    expected_error: Some(err),
2293                    sql,
2294                    ..
2295                },
2296                outcome,
2297            ) if outcome.err_msg().is_some() => {
2298                buf.rewrite_expected_error(&input, err, &outcome.err_msg().unwrap(), sql)
2299            }
2300            (_, Outcome::Success) => {}
2301            _ => bail!("unexpected: {:?} {:?}", record, outcome),
2302        }
2303    }
2304
2305    file.set_len(0)?;
2306    file.seek(SeekFrom::Start(0))?;
2307    file.write_all(buf.finish().as_bytes())?;
2308    file.sync_all()?;
2309    Ok(())
2310}
2311
/// Provides a means to rewrite the `.slt` file while iterating over it.
///
/// This struct takes the slt file as its `input`, tracks a cursor into it
/// (`input_offset`), and provides a buffer (`output`) to store the rewritten
/// results.
///
/// Functions that modify the file will lazily move `input` into `output` using
/// `flush_to`. However, those calls should all be interior to other functions.
#[derive(Debug)]
struct RewriteBuffer<'a> {
    /// The original contents that are being rewritten.
    input: &'a str,
    /// Byte offset into `input` up to which the text has already been either
    /// copied into `output` (`flush_to`) or dropped (`skip_to`).
    input_offset: usize,
    /// The rewritten contents accumulated so far.
    output: String,
}
2326
2327impl<'a> RewriteBuffer<'a> {
2328    fn new(input: &'a str) -> RewriteBuffer<'a> {
2329        RewriteBuffer {
2330            input,
2331            input_offset: 0,
2332            output: String::new(),
2333        }
2334    }
2335
2336    fn flush_to(&mut self, offset: usize) {
2337        assert!(offset >= self.input_offset);
2338        let chunk = &self.input[self.input_offset..offset];
2339        self.output.push_str(chunk);
2340        self.input_offset = offset;
2341    }
2342
2343    fn skip_to(&mut self, offset: usize) {
2344        assert!(offset >= self.input_offset);
2345        self.input_offset = offset;
2346    }
2347
2348    fn append(&mut self, s: &str) {
2349        self.output.push_str(s);
2350    }
2351
2352    fn append_header(
2353        &mut self,
2354        input: &String,
2355        expected_output: &str,
2356        column_names: Option<&Vec<ColumnName>>,
2357    ) {
2358        // Output everything before this record.
2359        // TODO(benesch): is it possible to rewrite this to avoid `as`?
2360        #[allow(clippy::as_conversions)]
2361        let offset = expected_output.as_ptr() as usize - input.as_ptr() as usize;
2362        self.flush_to(offset);
2363        self.skip_to(offset + expected_output.len());
2364
2365        // Attempt to install the result separator (----), if it does
2366        // not already exist.
2367        if self.peek_last(5) == "\n----" {
2368            self.append("\n");
2369        } else if self.peek_last(6) != "\n----\n" {
2370            self.append("\n----\n");
2371        }
2372
2373        let Some(names) = column_names else {
2374            return;
2375        };
2376        self.append(
2377            &names
2378                .iter()
2379                .map(|name| name.replace(' ', "␠"))
2380                .collect::<Vec<_>>()
2381                .join(" "),
2382        );
2383        self.append("\n");
2384    }
2385
2386    fn rewrite_expected_error(
2387        &mut self,
2388        input: &String,
2389        old_err: &str,
2390        new_err: &str,
2391        query: &str,
2392    ) {
2393        // Output everything before this error message.
2394        // TODO(benesch): is it possible to rewrite this to avoid `as`?
2395        #[allow(clippy::as_conversions)]
2396        let err_offset = old_err.as_ptr() as usize - input.as_ptr() as usize;
2397        self.flush_to(err_offset);
2398        self.append(new_err);
2399        self.append("\n");
2400        self.append(query);
2401        // TODO(benesch): is it possible to rewrite this to avoid `as`?
2402        #[allow(clippy::as_conversions)]
2403        self.skip_to(query.as_ptr() as usize - input.as_ptr() as usize + query.len())
2404    }
2405
2406    fn peek_last(&self, n: usize) -> &str {
2407        &self.output[self.output.len() - n..]
2408    }
2409
2410    fn finish(mut self) -> String {
2411        self.flush_to(self.input.len());
2412        self.output
2413    }
2414}
2415
2416/// Generates view creation, view indexing, view querying, and view
2417/// dropping SQL commands for a given `SELECT` query. If the number
2418/// of attributes produced by the query is known, the view commands
2419/// are specialized to avoid issues with column ambiguity. This
2420/// function is a helper for `--auto_index_selects` and assumes that
2421/// the provided input SQL has already been run through the parser,
2422/// resulting in a valid `SELECT` statement.
2423fn generate_view_sql(
2424    sql: &str,
2425    view_uuid: &Simple,
2426    num_attributes: Option<usize>,
2427    expected_column_names: Option<Vec<ColumnName>>,
2428) -> (String, String, String, String) {
2429    // To create the view, re-parse the sql; note that we must find exactly
2430    // one statement and it must be a `SELECT`.
2431    // NOTE(vmarcos): Direct string manipulation was attempted while
2432    // prototyping the code below, which avoids the extra parsing and
2433    // data structure cloning. However, running DDL is so slow that
2434    // it did not matter in terms of runtime. We can revisit this if
2435    // DDL cost drops dramatically in the future.
2436    let stmts = parser::parse_statements(sql).unwrap_or_default();
2437    assert!(stmts.len() == 1);
2438    let (query, query_as_of) = match &stmts[0].ast {
2439        Statement::Select(stmt) => (&stmt.query, &stmt.as_of),
2440        _ => unreachable!("This function should only be called for SELECTs"),
2441    };
2442
2443    // Prior to creating the view, process the `ORDER BY` clause of
2444    // the `SELECT` query, if any. Ordering is not preserved when a
2445    // view includes an `ORDER BY` clause and must be re-enforced by
2446    // an external `ORDER BY` clause when querying the view.
2447    let (view_order_by, extra_columns, distinct) = if num_attributes.is_none() {
2448        (query.order_by.clone(), vec![], None)
2449    } else {
2450        derive_order_by(&query.body, &query.order_by)
2451    };
2452
2453    // Since one-shot SELECT statements may contain ambiguous column names,
2454    // we either use the expected column names, if that option was
2455    // provided, or else just rename the output schema of the view
2456    // using numerically increasing attribute names, whenever possible.
2457    // This strategy makes it possible to use `CREATE INDEX`, thus
2458    // matching the behavior of the option `auto_index_tables`. However,
2459    // we may be presented with a `SELECT *` query, in which case the parser
2460    // does not produce sufficient information to allow us to compute
2461    // the number of output columns. In the latter case, we are supplied
2462    // with `None` for `num_attributes` and just employ the command
2463    // `CREATE DEFAULT INDEX` instead. Additionally, the view is created
2464    // without schema renaming. This strategy is insufficient to dodge
2465    // column name ambiguity in all cases, but we assume here that we
2466    // can adjust the (hopefully) small number of tests that eventually
2467    // challenge us in this particular way.
2468    let name = UnresolvedItemName(vec![Ident::new_unchecked(format!("v{}", view_uuid))]);
2469    let projection = expected_column_names.map_or_else(
2470        || {
2471            num_attributes.map_or(vec![], |n| {
2472                (1..=n)
2473                    .map(|i| Ident::new_unchecked(format!("a{i}")))
2474                    .collect()
2475            })
2476        },
2477        |cols| {
2478            cols.iter()
2479                .map(|c| Ident::new_unchecked(c.as_str()))
2480                .collect()
2481        },
2482    );
2483    let columns: Vec<Ident> = projection
2484        .iter()
2485        .cloned()
2486        .chain(extra_columns.iter().map(|item| {
2487            if let SelectItem::Expr {
2488                expr: _,
2489                alias: Some(ident),
2490            } = item
2491            {
2492                ident.clone()
2493            } else {
2494                unreachable!("alias must be given for extra column")
2495            }
2496        }))
2497        .collect();
2498
2499    // Build a `CREATE VIEW` with the columns computed above.
2500    let mut query = query.clone();
2501    if extra_columns.len() > 0 {
2502        match &mut query.body {
2503            SetExpr::Select(stmt) => stmt.projection.extend(extra_columns.iter().cloned()),
2504            _ => unimplemented!("cannot yet rewrite projections of nested queries"),
2505        }
2506    }
2507    let create_view = AstStatement::<Raw>::CreateView(CreateViewStatement {
2508        if_exists: IfExistsBehavior::Error,
2509        temporary: false,
2510        definition: ViewDefinition {
2511            name: name.clone(),
2512            columns: columns.clone(),
2513            query,
2514        },
2515    })
2516    .to_ast_string_stable();
2517
2518    // We then create either a `CREATE INDEX` or a `CREATE DEFAULT INDEX`
2519    // statement, depending on whether we could obtain the number of
2520    // attributes from the original `SELECT`.
2521    let create_index = AstStatement::<Raw>::CreateIndex(CreateIndexStatement {
2522        name: None,
2523        in_cluster: None,
2524        on_name: RawItemName::Name(name.clone()),
2525        key_parts: if columns.len() == 0 {
2526            None
2527        } else {
2528            Some(
2529                columns
2530                    .iter()
2531                    .map(|ident| Expr::Identifier(vec![ident.clone()]))
2532                    .collect(),
2533            )
2534        },
2535        with_options: Vec::new(),
2536        if_not_exists: false,
2537    })
2538    .to_ast_string_stable();
2539
2540    // Assert if DISTINCT semantics are unchanged from view
2541    let distinct_unneeded = extra_columns.len() == 0
2542        || match distinct {
2543            None | Some(Distinct::On(_)) => true,
2544            Some(Distinct::EntireRow) => false,
2545        };
2546    let distinct = if distinct_unneeded { None } else { distinct };
2547
2548    // `SELECT [* | {projection}] FROM {name} [ORDER BY {view_order_by}]`
2549    let view_sql = AstStatement::<Raw>::Select(SelectStatement {
2550        query: Query {
2551            ctes: CteBlock::Simple(vec![]),
2552            body: SetExpr::Select(Box::new(Select {
2553                distinct,
2554                projection: if projection.len() == 0 {
2555                    vec![SelectItem::Wildcard]
2556                } else {
2557                    projection
2558                        .iter()
2559                        .map(|ident| SelectItem::Expr {
2560                            expr: Expr::Identifier(vec![ident.clone()]),
2561                            alias: None,
2562                        })
2563                        .collect()
2564                },
2565                from: vec![TableWithJoins {
2566                    relation: TableFactor::Table {
2567                        name: RawItemName::Name(name.clone()),
2568                        alias: None,
2569                    },
2570                    joins: vec![],
2571                }],
2572                selection: None,
2573                group_by: vec![],
2574                having: None,
2575                qualify: None,
2576                options: vec![],
2577            })),
2578            order_by: view_order_by,
2579            limit: None,
2580            offset: None,
2581        },
2582        as_of: query_as_of.clone(),
2583    })
2584    .to_ast_string_stable();
2585
2586    // `DROP VIEW {name}`
2587    let drop_view = AstStatement::<Raw>::DropObjects(DropObjectsStatement {
2588        object_type: ObjectType::View,
2589        if_exists: false,
2590        names: vec![UnresolvedObjectName::Item(name)],
2591        cascade: false,
2592    })
2593    .to_ast_string_stable();
2594
2595    (create_view, create_index, view_sql, drop_view)
2596}
2597
2598/// Analyzes the provided query `body` to derive the number of
2599/// attributes in the query. We only consider syntactic cues,
2600/// so we may end up deriving `None` for the number of attributes
2601/// as a conservative approximation.
2602fn derive_num_attributes(body: &SetExpr<Raw>) -> Option<usize> {
2603    let Some((projection, _)) = find_projection(body) else {
2604        return None;
2605    };
2606    derive_num_attributes_from_projection(projection)
2607}
2608
2609/// Analyzes a query's `ORDER BY` clause to derive an `ORDER BY`
2610/// clause that makes numeric references to any expressions in
2611/// the projection and generated-attribute references to expressions
2612/// that need to be added as extra columns to the projection list.
2613/// The rewritten `ORDER BY` clause is then usable when querying a
2614/// view that contains the same `SELECT` as the given query.
2615/// This function returns both the rewritten `ORDER BY` clause
2616/// as well as a list of extra columns that need to be added
2617/// to the query's projection for the `ORDER BY` clause to
2618/// succeed.
2619fn derive_order_by(
2620    body: &SetExpr<Raw>,
2621    order_by: &Vec<OrderByExpr<Raw>>,
2622) -> (
2623    Vec<OrderByExpr<Raw>>,
2624    Vec<SelectItem<Raw>>,
2625    Option<Distinct<Raw>>,
2626) {
2627    let Some((projection, distinct)) = find_projection(body) else {
2628        return (vec![], vec![], None);
2629    };
2630    let (view_order_by, extra_columns) = derive_order_by_from_projection(projection, order_by);
2631    (view_order_by, extra_columns, distinct.clone())
2632}
2633
2634/// Finds the projection list in a `SELECT` query body.
2635fn find_projection(body: &SetExpr<Raw>) -> Option<(&Vec<SelectItem<Raw>>, &Option<Distinct<Raw>>)> {
2636    // Iterate to peel off the query body until the query's
2637    // projection list is found.
2638    let mut set_expr = body;
2639    loop {
2640        match set_expr {
2641            SetExpr::Select(select) => {
2642                return Some((&select.projection, &select.distinct));
2643            }
2644            SetExpr::SetOperation { left, .. } => set_expr = left.as_ref(),
2645            SetExpr::Query(query) => set_expr = &query.body,
2646            _ => return None,
2647        }
2648    }
2649}
2650
2651/// Computes the number of attributes that are obtained by the
2652/// projection of a `SELECT` query. The projection may include
2653/// wildcards, in which case the analysis just returns `None`.
2654fn derive_num_attributes_from_projection(projection: &Vec<SelectItem<Raw>>) -> Option<usize> {
2655    let mut num_attributes = 0usize;
2656    for item in projection.iter() {
2657        let SelectItem::Expr { expr, .. } = item else {
2658            return None;
2659        };
2660        match expr {
2661            Expr::QualifiedWildcard(..) | Expr::WildcardAccess(..) => {
2662                return None;
2663            }
2664            _ => {
2665                num_attributes += 1;
2666            }
2667        }
2668    }
2669    Some(num_attributes)
2670}
2671
2672/// Computes an `ORDER BY` clause with only numeric references
2673/// from given projection and `ORDER BY` of a `SELECT` query.
2674/// If the derivation fails to match a given expression, the
2675/// matched prefix is returned. Note that this could be empty.
2676fn derive_order_by_from_projection(
2677    projection: &Vec<SelectItem<Raw>>,
2678    order_by: &Vec<OrderByExpr<Raw>>,
2679) -> (Vec<OrderByExpr<Raw>>, Vec<SelectItem<Raw>>) {
2680    let mut view_order_by: Vec<OrderByExpr<Raw>> = vec![];
2681    let mut extra_columns: Vec<SelectItem<Raw>> = vec![];
2682    for order_by_expr in order_by.iter() {
2683        let query_expr = &order_by_expr.expr;
2684        let view_expr = match query_expr {
2685            Expr::Value(mz_sql_parser::ast::Value::Number(_)) => query_expr.clone(),
2686            _ => {
2687                // Find expression in query projection, if we can.
2688                if let Some(i) = projection.iter().position(|item| match item {
2689                    SelectItem::Expr { expr, alias } => {
2690                        expr == query_expr
2691                            || match query_expr {
2692                                Expr::Identifier(ident) => {
2693                                    ident.len() == 1 && Some(&ident[0]) == alias.as_ref()
2694                                }
2695                                _ => false,
2696                            }
2697                    }
2698                    SelectItem::Wildcard => false,
2699                }) {
2700                    Expr::Value(mz_sql_parser::ast::Value::Number((i + 1).to_string()))
2701                } else {
2702                    // If the expression is not found in the
2703                    // projection, add extra column.
2704                    let ident = Ident::new_unchecked(format!(
2705                        "a{}",
2706                        (projection.len() + extra_columns.len() + 1)
2707                    ));
2708                    extra_columns.push(SelectItem::Expr {
2709                        expr: query_expr.clone(),
2710                        alias: Some(ident.clone()),
2711                    });
2712                    Expr::Identifier(vec![ident])
2713                }
2714            }
2715        };
2716        view_order_by.push(OrderByExpr {
2717            expr: view_expr,
2718            asc: order_by_expr.asc,
2719            nulls_last: order_by_expr.nulls_last,
2720        });
2721    }
2722    (view_order_by, extra_columns)
2723}
2724
2725/// Returns extra statements to execute after `stmt` is executed.
2726fn mutate(sql: &str) -> Vec<String> {
2727    let stmts = parser::parse_statements(sql).unwrap_or_default();
2728    let mut additional = Vec::new();
2729    for stmt in stmts {
2730        match stmt.ast {
2731            AstStatement::CreateTable(stmt) => additional.push(
2732                // CREATE TABLE -> CREATE INDEX. Specify all columns manually in case CREATE
2733                // DEFAULT INDEX ever goes away.
2734                AstStatement::<Raw>::CreateIndex(CreateIndexStatement {
2735                    name: None,
2736                    in_cluster: None,
2737                    on_name: RawItemName::Name(stmt.name.clone()),
2738                    key_parts: Some(
2739                        stmt.columns
2740                            .iter()
2741                            .map(|def| Expr::Identifier(vec![def.name.clone()]))
2742                            .collect(),
2743                    ),
2744                    with_options: Vec::new(),
2745                    if_not_exists: false,
2746                })
2747                .to_ast_string_stable(),
2748            ),
2749            _ => {}
2750        }
2751    }
2752    additional
2753}
2754
2755#[mz_ore::test]
2756#[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `rust_psm_stack_pointer` on OS `linux`
2757fn test_generate_view_sql() {
2758    let uuid = Uuid::parse_str("67e5504410b1426f9247bb680e5fe0c8").unwrap();
2759    let cases = vec![
2760        (("SELECT * FROM t", None, None),
2761        (
2762            r#"CREATE VIEW "v67e5504410b1426f9247bb680e5fe0c8" AS SELECT * FROM "t""#.to_string(),
2763            r#"CREATE DEFAULT INDEX ON "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
2764            r#"SELECT * FROM "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
2765            r#"DROP VIEW "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
2766        )),
2767        (("SELECT a, b, c FROM t1, t2", Some(3), Some(vec![ColumnName::from("a"), ColumnName::from("b"), ColumnName::from("c")])),
2768        (
2769            r#"CREATE VIEW "v67e5504410b1426f9247bb680e5fe0c8" ("a", "b", "c") AS SELECT "a", "b", "c" FROM "t1", "t2""#.to_string(),
2770            r#"CREATE INDEX ON "v67e5504410b1426f9247bb680e5fe0c8" ("a", "b", "c")"#.to_string(),
2771            r#"SELECT "a", "b", "c" FROM "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
2772            r#"DROP VIEW "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
2773        )),
2774        (("SELECT a, b, c FROM t1, t2", Some(3), None),
2775        (
2776            r#"CREATE VIEW "v67e5504410b1426f9247bb680e5fe0c8" ("a1", "a2", "a3") AS SELECT "a", "b", "c" FROM "t1", "t2""#.to_string(),
2777            r#"CREATE INDEX ON "v67e5504410b1426f9247bb680e5fe0c8" ("a1", "a2", "a3")"#.to_string(),
2778            r#"SELECT "a1", "a2", "a3" FROM "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
2779            r#"DROP VIEW "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
2780        )),
2781        // A case with ambiguity that is accepted by the function, illustrating that
2782        // our measures to dodge this issue are imperfect.
2783        (("SELECT * FROM (SELECT a, sum(b) AS a FROM t GROUP BY a)", None, None),
2784        (
2785            r#"CREATE VIEW "v67e5504410b1426f9247bb680e5fe0c8" AS SELECT * FROM (SELECT "a", "sum"("b") AS "a" FROM "t" GROUP BY "a")"#.to_string(),
2786            r#"CREATE DEFAULT INDEX ON "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
2787            r#"SELECT * FROM "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
2788            r#"DROP VIEW "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
2789        )),
2790        (("SELECT a, b, b + d AS c, a + b AS d FROM t1, t2 ORDER BY a, c, a + b", Some(4), Some(vec![ColumnName::from("a"), ColumnName::from("b"), ColumnName::from("c"), ColumnName::from("d")])),
2791        (
2792            r#"CREATE VIEW "v67e5504410b1426f9247bb680e5fe0c8" ("a", "b", "c", "d") AS SELECT "a", "b", "b" + "d" AS "c", "a" + "b" AS "d" FROM "t1", "t2" ORDER BY "a", "c", "a" + "b""#.to_string(),
2793            r#"CREATE INDEX ON "v67e5504410b1426f9247bb680e5fe0c8" ("a", "b", "c", "d")"#.to_string(),
2794            r#"SELECT "a", "b", "c", "d" FROM "v67e5504410b1426f9247bb680e5fe0c8" ORDER BY 1, 3, 4"#.to_string(),
2795            r#"DROP VIEW "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
2796        )),
2797        (("((SELECT 1 AS a UNION SELECT 2 AS b) UNION SELECT 3 AS c) ORDER BY a", Some(1), None),
2798        (
2799            r#"CREATE VIEW "v67e5504410b1426f9247bb680e5fe0c8" ("a1") AS (SELECT 1 AS "a" UNION SELECT 2 AS "b") UNION SELECT 3 AS "c" ORDER BY "a""#.to_string(),
2800            r#"CREATE INDEX ON "v67e5504410b1426f9247bb680e5fe0c8" ("a1")"#.to_string(),
2801            r#"SELECT "a1" FROM "v67e5504410b1426f9247bb680e5fe0c8" ORDER BY 1"#.to_string(),
2802            r#"DROP VIEW "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
2803        )),
2804        (("SELECT * FROM (SELECT a, sum(b) AS a FROM t GROUP BY a) ORDER BY 1", None, None),
2805        (
2806            r#"CREATE VIEW "v67e5504410b1426f9247bb680e5fe0c8" AS SELECT * FROM (SELECT "a", "sum"("b") AS "a" FROM "t" GROUP BY "a") ORDER BY 1"#.to_string(),
2807            r#"CREATE DEFAULT INDEX ON "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
2808            r#"SELECT * FROM "v67e5504410b1426f9247bb680e5fe0c8" ORDER BY 1"#.to_string(),
2809            r#"DROP VIEW "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
2810        )),
2811        (("SELECT * FROM (SELECT a, sum(b) AS a FROM t GROUP BY a) ORDER BY a", None, None),
2812        (
2813            r#"CREATE VIEW "v67e5504410b1426f9247bb680e5fe0c8" AS SELECT * FROM (SELECT "a", "sum"("b") AS "a" FROM "t" GROUP BY "a") ORDER BY "a""#.to_string(),
2814            r#"CREATE DEFAULT INDEX ON "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
2815            r#"SELECT * FROM "v67e5504410b1426f9247bb680e5fe0c8" ORDER BY "a""#.to_string(),
2816            r#"DROP VIEW "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
2817        )),
2818        (("SELECT a, sum(b) AS a FROM t GROUP BY a, c ORDER BY a, c", Some(2), None),
2819        (
2820            r#"CREATE VIEW "v67e5504410b1426f9247bb680e5fe0c8" ("a1", "a2", "a3") AS SELECT "a", "sum"("b") AS "a", "c" AS "a3" FROM "t" GROUP BY "a", "c" ORDER BY "a", "c""#.to_string(),
2821            r#"CREATE INDEX ON "v67e5504410b1426f9247bb680e5fe0c8" ("a1", "a2", "a3")"#.to_string(),
2822            r#"SELECT "a1", "a2" FROM "v67e5504410b1426f9247bb680e5fe0c8" ORDER BY 1, "a3""#.to_string(),
2823            r#"DROP VIEW "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
2824        )),
2825        (("SELECT a, sum(b) AS a FROM t GROUP BY a, c ORDER BY c, a", Some(2), None),
2826        (
2827            r#"CREATE VIEW "v67e5504410b1426f9247bb680e5fe0c8" ("a1", "a2", "a3") AS SELECT "a", "sum"("b") AS "a", "c" AS "a3" FROM "t" GROUP BY "a", "c" ORDER BY "c", "a""#.to_string(),
2828            r#"CREATE INDEX ON "v67e5504410b1426f9247bb680e5fe0c8" ("a1", "a2", "a3")"#.to_string(),
2829            r#"SELECT "a1", "a2" FROM "v67e5504410b1426f9247bb680e5fe0c8" ORDER BY "a3", 1"#.to_string(),
2830            r#"DROP VIEW "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
2831        )),
2832    ];
2833    for ((sql, num_attributes, expected_column_names), expected) in cases {
2834        let view_sql =
2835            generate_view_sql(sql, uuid.as_simple(), num_attributes, expected_column_names);
2836        assert_eq!(expected, view_sql);
2837    }
2838}
2839
2840#[mz_ore::test]
2841fn test_mutate() {
2842    let cases = vec![
2843        ("CREATE TABLE t ()", vec![r#"CREATE INDEX ON "t" ()"#]),
2844        (
2845            "CREATE TABLE t (a INT)",
2846            vec![r#"CREATE INDEX ON "t" ("a")"#],
2847        ),
2848        (
2849            "CREATE TABLE t (a INT, b TEXT)",
2850            vec![r#"CREATE INDEX ON "t" ("a", "b")"#],
2851        ),
2852        // Invalid syntax works, just returns nothing.
2853        ("BAD SYNTAX", Vec::new()),
2854    ];
2855    for (sql, expected) in cases {
2856        let stmts = mutate(sql);
2857        assert_eq!(expected, stmts, "sql: {sql}");
2858    }
2859}