mz_sqllogictest/
runner.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! The Materialize-specific runner for sqllogictest.
11//!
//! SLT tests expect SQL statements and queries to execute serially, one after another.
//! To get the same results in Materialize, we track `current_timestamp` and increment it
//! whenever we execute a statement.
14//!
15//! The high-level workflow is:
16//!   for each record in the test file:
17//!     if record is a sql statement:
18//!       run sql in postgres, observe changes and copy them to materialize using LocalInput::Updates(..)
19//!       advance current_timestamp
20//!       promise to never send updates for times < current_timestamp using LocalInput::Watermark(..)
21//!       compare to expected results
22//!       if wrong, bail out and stop processing this file
23//!     if record is a sql query:
24//!       peek query at current_timestamp
25//!       compare to expected results
26//!       if wrong, record the error
27
28use std::collections::BTreeMap;
29use std::error::Error;
30use std::fs::{File, OpenOptions};
31use std::io::{Read, Seek, SeekFrom, Write};
32use std::net::{IpAddr, Ipv4Addr, SocketAddr};
33use std::path::Path;
34use std::sync::Arc;
35use std::sync::LazyLock;
36use std::time::Duration;
37use std::{env, fmt, ops, str, thread};
38
39use anyhow::{anyhow, bail};
40use bytes::BytesMut;
41use chrono::{DateTime, NaiveDateTime, NaiveTime, Utc};
42use fallible_iterator::FallibleIterator;
43use futures::sink::SinkExt;
44use itertools::Itertools;
45use maplit::btreemap;
46use md5::{Digest, Md5};
47use mz_adapter_types::bootstrap_builtin_cluster_config::{
48    ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR, BootstrapBuiltinClusterConfig,
49    CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR, PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR,
50    SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR, SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR,
51};
52use mz_catalog::config::ClusterReplicaSizeMap;
53use mz_controller::ControllerConfig;
54use mz_environmentd::CatalogConfig;
55use mz_license_keys::ValidatedLicenseKey;
56use mz_orchestrator_process::{ProcessOrchestrator, ProcessOrchestratorConfig};
57use mz_orchestrator_tracing::{TracingCliArgs, TracingOrchestrator};
58use mz_ore::cast::{CastFrom, ReinterpretCast};
59use mz_ore::channel::trigger;
60use mz_ore::error::ErrorExt;
61use mz_ore::metrics::MetricsRegistry;
62use mz_ore::now::SYSTEM_TIME;
63use mz_ore::retry::Retry;
64use mz_ore::task;
65use mz_ore::thread::{JoinHandleExt, JoinOnDropHandle};
66use mz_ore::tracing::TracingHandle;
67use mz_ore::url::SensitiveUrl;
68use mz_persist_client::PersistLocation;
69use mz_persist_client::cache::PersistClientCache;
70use mz_persist_client::cfg::PersistConfig;
71use mz_persist_client::rpc::{
72    MetricsSameProcessPubSubSender, PersistGrpcPubSubServer, PubSubClientConnection, PubSubSender,
73};
74use mz_pgrepr::{Interval, Jsonb, Numeric, UInt2, UInt4, UInt8, Value, oid};
75use mz_repr::ColumnName;
76use mz_repr::adt::date::Date;
77use mz_repr::adt::mz_acl_item::{AclItem, MzAclItem};
78use mz_repr::adt::numeric;
79use mz_secrets::SecretsController;
80use mz_server_core::listeners::{
81    AllowedRoles, AuthenticatorKind, BaseListenerConfig, HttpListenerConfig, HttpRoutesEnabled,
82    ListenersConfig, SqlListenerConfig,
83};
84use mz_sql::ast::{Expr, Raw, Statement};
85use mz_sql::catalog::EnvironmentId;
86use mz_sql_parser::ast::display::AstDisplay;
87use mz_sql_parser::ast::{
88    CreateIndexStatement, CreateViewStatement, CteBlock, Distinct, DropObjectsStatement, Ident,
89    IfExistsBehavior, ObjectType, OrderByExpr, Query, RawItemName, Select, SelectItem,
90    SelectStatement, SetExpr, Statement as AstStatement, TableFactor, TableWithJoins,
91    UnresolvedItemName, UnresolvedObjectName, ViewDefinition,
92};
93use mz_sql_parser::parser;
94use mz_storage_types::connections::ConnectionContext;
95use postgres_protocol::types;
96use regex::Regex;
97use tempfile::TempDir;
98use tokio::net::TcpListener;
99use tokio::runtime::Runtime;
100use tokio::sync::oneshot;
101use tokio_postgres::types::{FromSql, Kind as PgKind, Type as PgType};
102use tokio_postgres::{NoTls, Row, SimpleQueryMessage};
103use tokio_stream::wrappers::TcpListenerStream;
104use tower_http::cors::AllowOrigin;
105use tracing::{error, info};
106use uuid::Uuid;
107use uuid::fmt::Simple;
108
109use crate::ast::{Location, Mode, Output, QueryOutput, Record, Sort, Type};
110use crate::util;
111
/// The result of running a single sqllogictest record.
///
/// `Success` and `Warning` are the only outcomes that do not count as
/// failures (see `Outcome::failure`). The lifetime `'a` lets an outcome borrow
/// expected values (error patterns, column names, output) from the record that
/// was checked instead of copying them.
#[derive(Debug)]
pub enum Outcome<'a> {
    /// The record was not run because it uses something unsupported.
    Unsupported {
        error: anyhow::Error,
        location: Location,
    },
    /// Parsing the SQL failed with `error`.
    ParseFailure {
        error: anyhow::Error,
        location: Location,
    },
    /// Planning (or presumably executing) the SQL failed with `error`.
    PlanFailure {
        error: anyhow::Error,
        location: Location,
    },
    /// An error matching `expected_error` was expected, but none occurred.
    UnexpectedPlanSuccess {
        expected_error: &'a str,
        location: Location,
    },
    /// A statement affected a different number of rows than expected.
    WrongNumberOfRowsInserted {
        expected_count: u64,
        actual_count: u64,
        location: Location,
    },
    /// A query returned a different number of columns than expected.
    WrongColumnCount {
        expected_count: usize,
        actual_count: usize,
        location: Location,
    },
    /// The inferred column names differ from the expected ones.
    WrongColumnNames {
        expected_column_names: &'a Vec<ColumnName>,
        actual_column_names: Vec<ColumnName>,
        // Kept with the mismatch but not rendered by `Display`.
        actual_output: Output,
        location: Location,
    },
    /// The query's output did not match the expected output.
    OutputFailure {
        expected_output: &'a Output,
        // The rows as returned by the server, before formatting.
        actual_raw_output: Vec<Row>,
        // The actual output in the form compared against `expected_output`.
        actual_output: Output,
        location: Location,
    },
    /// Running the query directly and reading it through an indexed view
    /// produced different outcomes.
    InconsistentViewOutcome {
        query_outcome: Box<Outcome<'a>>,
        view_outcome: Box<Outcome<'a>>,
        location: Location,
    },
    /// `cause` made the runner bail out; per the module docs this presumably
    /// stops processing the rest of the file.
    Bail {
        cause: Box<Outcome<'a>>,
        location: Location,
    },
    /// `cause` is reported but does not count as a failure.
    Warning {
        cause: Box<Outcome<'a>>,
        location: Location,
    },
    /// The record behaved as expected.
    Success,
}
167
/// Number of distinct `Outcome` kinds; `Outcome::code` maps each kind to an
/// index in `0..NUM_OUTCOMES`.
const NUM_OUTCOMES: usize = 12;
/// Statistics index of `Outcome::Success`, the last outcome code.
const SUCCESS_OUTCOME: usize = NUM_OUTCOMES - 1;
/// Statistics index of `Outcome::Warning`, immediately before `Success`.
const WARNING_OUTCOME: usize = SUCCESS_OUTCOME - 1;
171
172impl<'a> Outcome<'a> {
173    fn code(&self) -> usize {
174        match self {
175            Outcome::Unsupported { .. } => 0,
176            Outcome::ParseFailure { .. } => 1,
177            Outcome::PlanFailure { .. } => 2,
178            Outcome::UnexpectedPlanSuccess { .. } => 3,
179            Outcome::WrongNumberOfRowsInserted { .. } => 4,
180            Outcome::WrongColumnCount { .. } => 5,
181            Outcome::WrongColumnNames { .. } => 6,
182            Outcome::OutputFailure { .. } => 7,
183            Outcome::InconsistentViewOutcome { .. } => 8,
184            Outcome::Bail { .. } => 9,
185            Outcome::Warning { .. } => 10,
186            Outcome::Success => 11,
187        }
188    }
189
190    fn success(&self) -> bool {
191        matches!(self, Outcome::Success)
192    }
193
194    fn failure(&self) -> bool {
195        !matches!(self, Outcome::Success) && !matches!(self, Outcome::Warning { .. })
196    }
197
198    /// Returns an error message that will match self. Appropriate for
199    /// rewriting error messages (i.e. not inserting error messages where we
200    /// currently expect success).
201    fn err_msg(&self) -> Option<String> {
202        match self {
203            Outcome::Unsupported { error, .. }
204            | Outcome::ParseFailure { error, .. }
205            | Outcome::PlanFailure { error, .. } => Some(
206                // This value gets fed back into regex to check that it matches
207                // `self`, so escape its meta characters.
208                regex::escape(
209                    // Take only the first string in the error message, which should be
210                    // sufficient for meaningfully matching the error.
211                    error.to_string_with_causes().split('\n').next().unwrap(),
212                )
213                // We need to undo the escaping of #. `regex::escape` escapes this because it
214                // expects that we use the `x` flag when building a regex, but this is not the case,
215                // so \# would end up being an invalid escape sequence, which would choke the
216                // parsing of the slt file the next time around.
217                .replace(r"\#", "#"),
218            ),
219            _ => None,
220        }
221    }
222}
223
224impl fmt::Display for Outcome<'_> {
225    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
226        use Outcome::*;
227        const INDENT: &str = "\n        ";
228        match self {
229            Unsupported { error, location } => write!(
230                f,
231                "Unsupported:{}:\n{}",
232                location,
233                error.display_with_causes()
234            ),
235            ParseFailure { error, location } => {
236                write!(
237                    f,
238                    "ParseFailure:{}:\n{}",
239                    location,
240                    error.display_with_causes()
241                )
242            }
243            PlanFailure { error, location } => write!(f, "PlanFailure:{}:\n{:#}", location, error),
244            UnexpectedPlanSuccess {
245                expected_error,
246                location,
247            } => write!(
248                f,
249                "UnexpectedPlanSuccess:{} expected error: {}",
250                location, expected_error
251            ),
252            WrongNumberOfRowsInserted {
253                expected_count,
254                actual_count,
255                location,
256            } => write!(
257                f,
258                "WrongNumberOfRowsInserted:{}{}expected: {}{}actually: {}",
259                location, INDENT, expected_count, INDENT, actual_count
260            ),
261            WrongColumnCount {
262                expected_count,
263                actual_count,
264                location,
265            } => write!(
266                f,
267                "WrongColumnCount:{}{}expected: {}{}actually: {}",
268                location, INDENT, expected_count, INDENT, actual_count
269            ),
270            WrongColumnNames {
271                expected_column_names,
272                actual_column_names,
273                actual_output: _,
274                location,
275            } => write!(
276                f,
277                "Wrong Column Names:{}:{}expected column names: {}{}inferred column names: {}",
278                location,
279                INDENT,
280                expected_column_names
281                    .iter()
282                    .map(|n| n.to_string())
283                    .collect::<Vec<_>>()
284                    .join(" "),
285                INDENT,
286                actual_column_names
287                    .iter()
288                    .map(|n| n.to_string())
289                    .collect::<Vec<_>>()
290                    .join(" ")
291            ),
292            OutputFailure {
293                expected_output,
294                actual_raw_output,
295                actual_output,
296                location,
297            } => write!(
298                f,
299                "OutputFailure:{}{}expected: {:?}{}actually: {:?}{}actual raw: {:?}",
300                location, INDENT, expected_output, INDENT, actual_output, INDENT, actual_raw_output
301            ),
302            InconsistentViewOutcome {
303                query_outcome,
304                view_outcome,
305                location,
306            } => write!(
307                f,
308                "InconsistentViewOutcome:{}{}expected from query: {:?}{}actually from indexed view: {:?}{}",
309                location, INDENT, query_outcome, INDENT, view_outcome, INDENT
310            ),
311            Bail { cause, location } => write!(f, "Bail:{} {}", location, cause),
312            Warning { cause, location } => write!(f, "Warning:{} {}", location, cause),
313            Success => f.write_str("Success"),
314        }
315    }
316}
317
/// Aggregated results of running sqllogictest records.
#[derive(Default, Debug)]
pub struct Outcomes {
    // One counter per outcome kind, presumably indexed by `Outcome::code`
    // (the index order used by `as_json` matches it).
    stats: [usize; NUM_OUTCOMES],
    // Free-form detail lines; `OutcomesDisplay` prints them one per line when
    // failure details are requested.
    details: Vec<String>,
}
323
324impl ops::AddAssign<Outcomes> for Outcomes {
325    fn add_assign(&mut self, rhs: Outcomes) {
326        for (lhs, rhs) in self.stats.iter_mut().zip_eq(rhs.stats.iter()) {
327            *lhs += rhs
328        }
329    }
330}
331impl Outcomes {
332    pub fn any_failed(&self) -> bool {
333        self.stats[SUCCESS_OUTCOME] + self.stats[WARNING_OUTCOME] < self.stats.iter().sum::<usize>()
334    }
335
336    pub fn as_json(&self) -> serde_json::Value {
337        serde_json::json!({
338            "unsupported": self.stats[0],
339            "parse_failure": self.stats[1],
340            "plan_failure": self.stats[2],
341            "unexpected_plan_success": self.stats[3],
342            "wrong_number_of_rows_affected": self.stats[4],
343            "wrong_column_count": self.stats[5],
344            "wrong_column_names": self.stats[6],
345            "output_failure": self.stats[7],
346            "inconsistent_view_outcome": self.stats[8],
347            "bail": self.stats[9],
348            "warning": self.stats[10],
349            "success": self.stats[11],
350        })
351    }
352
353    pub fn display(&self, no_fail: bool, failure_details: bool) -> OutcomesDisplay<'_> {
354        OutcomesDisplay {
355            inner: self,
356            no_fail,
357            failure_details,
358        }
359    }
360}
361
/// A `Display` adapter for `Outcomes`, created by `Outcomes::display`.
pub struct OutcomesDisplay<'a> {
    // The outcomes being rendered.
    inner: &'a Outcomes,
    // When set, failures are labelled `FAIL-IGNORE` instead of `FAIL`, and
    // detail lines (if enabled) are printed even when nothing failed.
    no_fail: bool,
    // When set, print the detail lines instead of the summary line, but only
    // if something failed or `no_fail` is set.
    failure_details: bool,
}
367
368impl<'a> fmt::Display for OutcomesDisplay<'a> {
369    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
370        let total: usize = self.inner.stats.iter().sum();
371        if self.failure_details
372            && (self.inner.stats[SUCCESS_OUTCOME] + self.inner.stats[WARNING_OUTCOME] != total
373                || self.no_fail)
374        {
375            for outcome in &self.inner.details {
376                writeln!(f, "{}", outcome)?;
377            }
378            Ok(())
379        } else {
380            write!(
381                f,
382                "{}:",
383                if self.inner.stats[SUCCESS_OUTCOME] + self.inner.stats[WARNING_OUTCOME] == total {
384                    "PASS"
385                } else if self.no_fail {
386                    "FAIL-IGNORE"
387                } else {
388                    "FAIL"
389                }
390            )?;
391            static NAMES: LazyLock<Vec<&'static str>> = LazyLock::new(|| {
392                vec![
393                    "unsupported",
394                    "parse-failure",
395                    "plan-failure",
396                    "unexpected-plan-success",
397                    "wrong-number-of-rows-inserted",
398                    "wrong-column-count",
399                    "wrong-column-names",
400                    "output-failure",
401                    "inconsistent-view-outcome",
402                    "bail",
403                    "warning",
404                    "success",
405                    "total",
406                ]
407            });
408            for (i, n) in self.inner.stats.iter().enumerate() {
409                if *n > 0 {
410                    write!(f, " {}={}", NAMES[i], n)?;
411                }
412            }
413            write!(f, " total={}", total)
414        }
415    }
416}
417
/// Information about a prepared query, carried by
/// `PrepareQueryOutcome::QueryPrepared`.
struct QueryInfo {
    // Whether the query is a `SELECT` (presumably; computed outside this view).
    is_select: bool,
    // Number of result columns, when known.
    num_attributes: Option<usize>,
}
422
/// Result of preparing a query: either the prepared query's information, or an
/// `Outcome` to report instead.
enum PrepareQueryOutcome<'a> {
    QueryPrepared(QueryInfo),
    Outcome(Outcome<'a>),
}
427
/// Runs sqllogictest records against a Materialize server started from
/// `config`.
pub struct Runner<'a> {
    config: &'a RunConfig<'a>,
    // The running server and its connections. `reset` drops the old value
    // and starts a new one; it is `None` in between, and stays `None` if
    // starting the new one fails.
    inner: Option<RunnerInner<'a>>,
}
432
/// State for one running server instance and the client connections to it.
pub struct RunnerInner<'a> {
    server_addr: SocketAddr,
    internal_server_addr: SocketAddr,
    password_server_addr: SocketAddr,
    internal_http_server_addr: SocketAddr,
    // Drop order matters for these fields. Rust drops struct fields in
    // declaration order, so the clients below are dropped before
    // `_shutdown_trigger`, `_server_thread` and `_temp_dir`.
    // NOTE(review): presumably this closes connections before the server is
    // shut down and its temporary directory removed — confirm.
    client: tokio_postgres::Client,
    // Connection used for system-level statements such as `ALTER SYSTEM`.
    system_client: tokio_postgres::Client,
    // Additional connections keyed by name.
    clients: BTreeMap<String, tokio_postgres::Client>,
    auto_index_tables: bool,
    auto_index_selects: bool,
    auto_transactions: bool,
    enable_table_keys: bool,
    verbosity: u8,
    stdout: &'a dyn WriteFmt,
    // The fields below are held only for their `Drop` behavior.
    _shutdown_trigger: trigger::Trigger,
    _server_thread: JoinOnDropHandle<()>,
    _temp_dir: TempDir,
}
452
/// A pgwire `Value` decoded from a result column's binary representation.
///
/// The `FromSql` impl below lists the column types that can be decoded.
#[derive(Debug)]
pub struct Slt(Value);
455
456impl<'a> FromSql<'a> for Slt {
457    fn from_sql(
458        ty: &PgType,
459        mut raw: &'a [u8],
460    ) -> Result<Self, Box<dyn Error + 'static + Send + Sync>> {
461        Ok(match *ty {
462            PgType::ACLITEM => Self(Value::AclItem(AclItem::decode_binary(
463                types::bytea_from_sql(raw),
464            )?)),
465            PgType::BOOL => Self(Value::Bool(types::bool_from_sql(raw)?)),
466            PgType::BYTEA => Self(Value::Bytea(types::bytea_from_sql(raw).to_vec())),
467            PgType::CHAR => Self(Value::Char(u8::from_be_bytes(
468                types::char_from_sql(raw)?.to_be_bytes(),
469            ))),
470            PgType::FLOAT4 => Self(Value::Float4(types::float4_from_sql(raw)?)),
471            PgType::FLOAT8 => Self(Value::Float8(types::float8_from_sql(raw)?)),
472            PgType::DATE => Self(Value::Date(Date::from_pg_epoch(types::int4_from_sql(
473                raw,
474            )?)?)),
475            PgType::INT2 => Self(Value::Int2(types::int2_from_sql(raw)?)),
476            PgType::INT4 => Self(Value::Int4(types::int4_from_sql(raw)?)),
477            PgType::INT8 => Self(Value::Int8(types::int8_from_sql(raw)?)),
478            PgType::INTERVAL => Self(Value::Interval(Interval::from_sql(ty, raw)?)),
479            PgType::JSONB => Self(Value::Jsonb(Jsonb::from_sql(ty, raw)?)),
480            PgType::NAME => Self(Value::Name(types::text_from_sql(raw)?.to_string())),
481            PgType::NUMERIC => Self(Value::Numeric(Numeric::from_sql(ty, raw)?)),
482            PgType::OID => Self(Value::Oid(types::oid_from_sql(raw)?)),
483            PgType::REGCLASS => Self(Value::Oid(types::oid_from_sql(raw)?)),
484            PgType::REGPROC => Self(Value::Oid(types::oid_from_sql(raw)?)),
485            PgType::REGTYPE => Self(Value::Oid(types::oid_from_sql(raw)?)),
486            PgType::TEXT | PgType::BPCHAR | PgType::VARCHAR => {
487                Self(Value::Text(types::text_from_sql(raw)?.to_string()))
488            }
489            PgType::TIME => Self(Value::Time(NaiveTime::from_sql(ty, raw)?)),
490            PgType::TIMESTAMP => Self(Value::Timestamp(
491                NaiveDateTime::from_sql(ty, raw)?.try_into()?,
492            )),
493            PgType::TIMESTAMPTZ => Self(Value::TimestampTz(
494                DateTime::<Utc>::from_sql(ty, raw)?.try_into()?,
495            )),
496            PgType::UUID => Self(Value::Uuid(Uuid::from_sql(ty, raw)?)),
497            PgType::RECORD => {
498                let num_fields = read_be_i32(&mut raw)?;
499                let mut tuple = vec![];
500                for _ in 0..num_fields {
501                    let oid = u32::reinterpret_cast(read_be_i32(&mut raw)?);
502                    let typ = match PgType::from_oid(oid) {
503                        Some(typ) => typ,
504                        None => return Err("unknown oid".into()),
505                    };
506                    let v = read_value::<Option<Slt>>(&typ, &mut raw)?;
507                    tuple.push(v.map(|v| v.0));
508                }
509                Self(Value::Record(tuple))
510            }
511            PgType::INT4_RANGE
512            | PgType::INT8_RANGE
513            | PgType::DATE_RANGE
514            | PgType::NUM_RANGE
515            | PgType::TS_RANGE
516            | PgType::TSTZ_RANGE => {
517                use mz_repr::adt::range::Range;
518                let range: Range<Slt> = Range::from_sql(ty, raw)?;
519                Self(Value::Range(range.into_bounds(|b| Box::new(b.0))))
520            }
521
522            _ => match ty.kind() {
523                PgKind::Array(arr_type) => {
524                    let arr = types::array_from_sql(raw)?;
525                    let elements: Vec<Option<Value>> = arr
526                        .values()
527                        .map(|v| match v {
528                            Some(v) => Ok(Some(Slt::from_sql(arr_type, v)?)),
529                            None => Ok(None),
530                        })
531                        .collect::<Vec<Option<Slt>>>()?
532                        .into_iter()
533                        // Map a Vec<Option<Slt>> to Vec<Option<Value>>.
534                        .map(|v| v.map(|v| v.0))
535                        .collect();
536
537                    Self(Value::Array {
538                        dims: arr
539                            .dimensions()
540                            .map(|d| {
541                                Ok(mz_repr::adt::array::ArrayDimension {
542                                    lower_bound: isize::cast_from(d.lower_bound),
543                                    length: usize::try_from(d.len)
544                                        .expect("cannot have negative length"),
545                                })
546                            })
547                            .collect()?,
548                        elements,
549                    })
550                }
551                _ => match ty.oid() {
552                    oid::TYPE_UINT2_OID => Self(Value::UInt2(UInt2::from_sql(ty, raw)?)),
553                    oid::TYPE_UINT4_OID => Self(Value::UInt4(UInt4::from_sql(ty, raw)?)),
554                    oid::TYPE_UINT8_OID => Self(Value::UInt8(UInt8::from_sql(ty, raw)?)),
555                    oid::TYPE_MZ_TIMESTAMP_OID => {
556                        let s = types::text_from_sql(raw)?;
557                        let t: mz_repr::Timestamp = s.parse()?;
558                        Self(Value::MzTimestamp(t))
559                    }
560                    oid::TYPE_MZ_ACL_ITEM_OID => Self(Value::MzAclItem(MzAclItem::decode_binary(
561                        types::bytea_from_sql(raw),
562                    )?)),
563                    _ => unreachable!(),
564                },
565            },
566        })
567    }
568    fn accepts(ty: &PgType) -> bool {
569        match ty.kind() {
570            PgKind::Array(_) | PgKind::Composite(_) => return true,
571            _ => {}
572        }
573        match ty.oid() {
574            oid::TYPE_UINT2_OID
575            | oid::TYPE_UINT4_OID
576            | oid::TYPE_UINT8_OID
577            | oid::TYPE_MZ_TIMESTAMP_OID
578            | oid::TYPE_MZ_ACL_ITEM_OID => return true,
579            _ => {}
580        }
581        matches!(
582            *ty,
583            PgType::ACLITEM
584                | PgType::BOOL
585                | PgType::BYTEA
586                | PgType::CHAR
587                | PgType::DATE
588                | PgType::FLOAT4
589                | PgType::FLOAT8
590                | PgType::INT2
591                | PgType::INT4
592                | PgType::INT8
593                | PgType::INTERVAL
594                | PgType::JSONB
595                | PgType::NAME
596                | PgType::NUMERIC
597                | PgType::OID
598                | PgType::REGCLASS
599                | PgType::REGPROC
600                | PgType::REGTYPE
601                | PgType::RECORD
602                | PgType::TEXT
603                | PgType::BPCHAR
604                | PgType::VARCHAR
605                | PgType::TIME
606                | PgType::TIMESTAMP
607                | PgType::TIMESTAMPTZ
608                | PgType::UUID
609                | PgType::INT4_RANGE
610                | PgType::INT4_RANGE_ARRAY
611                | PgType::INT8_RANGE
612                | PgType::INT8_RANGE_ARRAY
613                | PgType::DATE_RANGE
614                | PgType::DATE_RANGE_ARRAY
615                | PgType::NUM_RANGE
616                | PgType::NUM_RANGE_ARRAY
617                | PgType::TS_RANGE
618                | PgType::TS_RANGE_ARRAY
619                | PgType::TSTZ_RANGE
620                | PgType::TSTZ_RANGE_ARRAY
621        )
622    }
623}
624
// Adapted from postgres-types/src/private.rs.
/// Reads a big-endian `i32` from the front of `buf` and advances `buf` past
/// it.
///
/// Returns an error, leaving `buf` untouched, if fewer than four bytes remain.
fn read_be_i32(buf: &mut &[u8]) -> Result<i32, Box<dyn Error + Sync + Send>> {
    // Copy the inner slice reference so the split does not borrow `buf`.
    let remaining: &[u8] = *buf;
    match remaining.split_first_chunk::<4>() {
        Some((word, rest)) => {
            *buf = rest;
            Ok(i32::from_be_bytes(*word))
        }
        None => Err("invalid buffer size".into()),
    }
}
635
636// From postgres-types/src/private.rs.
637fn read_value<'a, T>(type_: &PgType, buf: &mut &'a [u8]) -> Result<T, Box<dyn Error + Sync + Send>>
638where
639    T: FromSql<'a>,
640{
641    let value = match usize::try_from(read_be_i32(buf)?) {
642        Err(_) => None,
643        Ok(len) => {
644            if len > buf.len() {
645                return Err("invalid buffer size".into());
646            }
647            let (head, tail) = buf.split_at(len);
648            *buf = tail;
649            Some(head)
650        }
651    };
652    T::from_sql_nullable(type_, value)
653}
654
/// Renders the decoded datum `d` as the text sqllogictest compares against
/// the expected output of a column declared with type `typ`.
///
/// `mode` selects between `Mode::Standard` rules (which the comments below tie
/// to sqlite) and `Mode::Cockroach` rules where they differ. `col` is used
/// only in the panic message.
///
/// # Panics
///
/// Panics in two cases:
/// - There is no formatting rule for `typ` combined with the datum's runtime
///   value.
/// - A numeric munge or rescale step fails.
fn format_datum(d: Slt, typ: &Type, mode: Mode, col: usize) -> String {
    match (typ, d.0) {
        (Type::Bool, Value::Bool(b)) => b.to_string(),

        (Type::Integer, Value::Int2(i)) => i.to_string(),
        (Type::Integer, Value::Int4(i)) => i.to_string(),
        (Type::Integer, Value::Int8(i)) => i.to_string(),
        (Type::Integer, Value::UInt2(u)) => u.0.to_string(),
        (Type::Integer, Value::UInt4(u)) => u.0.to_string(),
        (Type::Integer, Value::UInt8(u)) => u.0.to_string(),
        (Type::Integer, Value::Oid(i)) => i.to_string(),
        // Floats shown as integers: the `as` cast truncates toward zero.
        // TODO(benesch): rewrite to avoid `as`.
        #[allow(clippy::as_conversions)]
        (Type::Integer, Value::Float4(f)) => format!("{}", f as i64),
        // TODO(benesch): rewrite to avoid `as`.
        #[allow(clippy::as_conversions)]
        (Type::Integer, Value::Float8(f)) => format!("{}", f as i64),
        // This is so wrong, but sqlite needs it.
        (Type::Integer, Value::Text(_)) => "0".to_string(),
        (Type::Integer, Value::Bool(b)) => i8::from(b).to_string(),
        (Type::Integer, Value::Numeric(d)) => {
            let mut d = d.0.0.clone();
            let mut cx = numeric::cx_datum();
            // Truncate the decimal to match sqlite.
            if mode == Mode::Standard {
                cx.set_rounding(dec::Rounding::Down);
            }
            // Presumably rounds to an integral value using the context's
            // rounding mode (truncation in standard mode) — confirm.
            cx.round(&mut d);
            numeric::munge_numeric(&mut d).unwrap();
            d.to_standard_notation_string()
        }

        // NOTE(review): `{:.3}` precision is ignored when formatting integers,
        // so these arms print e.g. `5`, not `5.000` — confirm this is intended.
        (Type::Real, Value::Int2(i)) => format!("{:.3}", i),
        (Type::Real, Value::Int4(i)) => format!("{:.3}", i),
        (Type::Real, Value::Int8(i)) => format!("{:.3}", i),
        (Type::Real, Value::Float4(f)) => match mode {
            Mode::Standard => format!("{:.3}", f),
            Mode::Cockroach => format!("{}", f),
        },
        (Type::Real, Value::Float8(f)) => match mode {
            Mode::Standard => format!("{:.3}", f),
            Mode::Cockroach => format!("{}", f),
        },
        (Type::Real, Value::Numeric(d)) => match mode {
            Mode::Standard => {
                let mut d = d.0.0.clone();
                // Values with more than three fractional digits are
                // presumably rescaled to three — confirm `rescale` semantics.
                if d.exponent() < -3 {
                    numeric::rescale(&mut d, 3).unwrap();
                }
                numeric::munge_numeric(&mut d).unwrap();
                d.to_standard_notation_string()
            }
            Mode::Cockroach => d.0.0.to_standard_notation_string(),
        },

        // The empty string is spelled `(empty)` so it is visible in the
        // expected output.
        (Type::Text, Value::Text(s)) => {
            if s.is_empty() {
                "(empty)".to_string()
            } else {
                s
            }
        }
        (Type::Text, Value::Bool(b)) => b.to_string(),
        (Type::Text, Value::Float4(f)) => format!("{:.3}", f),
        (Type::Text, Value::Float8(f)) => format!("{:.3}", f),
        // Bytes are printed as text iff they are valid UTF-8. This
        // seems guaranteed to confuse everyone, but it is required for
        // compliance with the CockroachDB sqllogictest runner. [0]
        //
        // [0]: https://github.com/cockroachdb/cockroach/blob/970782487/pkg/sql/logictest/logic.go#L2038-L2043
        (Type::Text, Value::Bytea(b)) => match str::from_utf8(&b) {
            Ok(s) => s.to_string(),
            Err(_) => format!("{:?}", b),
        },
        (Type::Text, Value::Numeric(d)) => d.0.0.to_standard_notation_string(),
        // Everything else gets normal text encoding. This correctly handles things
        // like arrays, tuples, and strings that need to be quoted.
        (Type::Text, d) => {
            let mut buf = BytesMut::new();
            d.encode_text(&mut buf);
            String::from_utf8_lossy(&buf).into_owned()
        }

        (Type::Oid, Value::Oid(o)) => o.to_string(),

        (_, d) => panic!(
            "Don't know how to format {:?} as {:?} in column {}",
            d, typ, col,
        ),
    }
}
746
747fn format_row(row: &Row, types: &[Type], mode: Mode) -> Vec<String> {
748    let mut formatted: Vec<String> = vec![];
749    for i in 0..row.len() {
750        let t: Option<Slt> = row.get::<usize, Option<Slt>>(i);
751        let t: Option<String> = t.map(|d| format_datum(d, &types[i], mode, i));
752        formatted.push(match t {
753            Some(t) => t,
754            None => "NULL".into(),
755        });
756    }
757
758    formatted
759}
760
761impl<'a> Runner<'a> {
762    pub async fn start(config: &'a RunConfig<'a>) -> Result<Runner<'a>, anyhow::Error> {
763        let mut runner = Self {
764            config,
765            inner: None,
766        };
767        runner.reset().await?;
768        Ok(runner)
769    }
770
771    pub async fn reset(&mut self) -> Result<(), anyhow::Error> {
772        // Explicitly drop the old runner here to ensure that we wait for threads to terminate
773        // before starting a new runner
774        drop(self.inner.take());
775        self.inner = Some(RunnerInner::start(self.config).await?);
776
777        Ok(())
778    }
779
780    async fn run_record<'r>(
781        &mut self,
782        record: &'r Record<'r>,
783        in_transaction: &mut bool,
784    ) -> Result<Outcome<'r>, anyhow::Error> {
785        if let Record::ResetServer = record {
786            self.reset().await?;
787            Ok(Outcome::Success)
788        } else {
789            self.inner
790                .as_mut()
791                .expect("RunnerInner missing")
792                .run_record(record, in_transaction)
793                .await
794        }
795    }
796
797    async fn check_catalog(&self) -> Result<(), anyhow::Error> {
798        self.inner
799            .as_ref()
800            .expect("RunnerInner missing")
801            .check_catalog()
802            .await
803    }
804
805    async fn reset_database(&mut self) -> Result<(), anyhow::Error> {
806        let inner = self.inner.as_mut().expect("RunnerInner missing");
807
808        inner.client.batch_execute("ROLLBACK;").await?;
809
810        inner
811            .system_client
812            .batch_execute(
813                "ROLLBACK;
814                 SET cluster = mz_catalog_server;
815                 RESET cluster_replica;",
816            )
817            .await?;
818
819        inner
820            .system_client
821            .batch_execute("ALTER SYSTEM RESET ALL")
822            .await?;
823
824        // Drop all databases, then recreate the `materialize` database.
825        for row in inner
826            .system_client
827            .query("SELECT name FROM mz_databases", &[])
828            .await?
829        {
830            let name: &str = row.get("name");
831            inner
832                .system_client
833                .batch_execute(&format!("DROP DATABASE \"{name}\""))
834                .await?;
835        }
836        inner
837            .system_client
838            .batch_execute("CREATE DATABASE materialize")
839            .await?;
840
841        // Ensure quickstart cluster exists with one replica of size `self.config.replica_size`.
842        // We don't destroy the existing quickstart cluster replica if it exists, as turning
843        // on a cluster replica is exceptionally slow.
844        let mut needs_default_cluster = true;
845        for row in inner
846            .system_client
847            .query("SELECT name FROM mz_clusters WHERE id LIKE 'u%'", &[])
848            .await?
849        {
850            match row.get("name") {
851                "quickstart" => needs_default_cluster = false,
852                name => {
853                    inner
854                        .system_client
855                        .batch_execute(&format!("DROP CLUSTER {name}"))
856                        .await?
857                }
858            }
859        }
860        if needs_default_cluster {
861            inner
862                .system_client
863                .batch_execute("CREATE CLUSTER quickstart REPLICAS ()")
864                .await?;
865        }
866        let mut needs_default_replica = false;
867        let rows = inner
868            .system_client
869            .query(
870                "SELECT name, size FROM mz_cluster_replicas
871                 WHERE cluster_id = (SELECT id FROM mz_clusters WHERE name = 'quickstart')
872                 ORDER BY name",
873                &[],
874            )
875            .await?;
876        if rows.len() != self.config.replicas {
877            needs_default_replica = true;
878        } else {
879            for (i, row) in rows.iter().enumerate() {
880                let name: &str = row.get("name");
881                let size: &str = row.get("size");
882                if name != format!("r{i}") || size != self.config.replica_size {
883                    needs_default_replica = true;
884                    break;
885                }
886            }
887        }
888
889        if needs_default_replica {
890            inner
891                .system_client
892                .batch_execute("ALTER CLUSTER quickstart SET (MANAGED = false)")
893                .await?;
894            for row in inner
895                .system_client
896                .query(
897                    "SELECT name FROM mz_cluster_replicas
898                     WHERE cluster_id = (SELECT id FROM mz_clusters WHERE name = 'quickstart')",
899                    &[],
900                )
901                .await?
902            {
903                let name: &str = row.get("name");
904                inner
905                    .system_client
906                    .batch_execute(&format!("DROP CLUSTER REPLICA quickstart.{}", name))
907                    .await?;
908            }
909            for i in 1..=self.config.replicas {
910                inner
911                    .system_client
912                    .batch_execute(&format!(
913                        "CREATE CLUSTER REPLICA quickstart.r{i} SIZE '{}'",
914                        self.config.replica_size
915                    ))
916                    .await?;
917            }
918            inner
919                .system_client
920                .batch_execute("ALTER CLUSTER quickstart SET (MANAGED = true)")
921                .await?;
922        }
923
924        // Grant initial privileges.
925        inner
926            .system_client
927            .batch_execute("GRANT USAGE ON DATABASE materialize TO PUBLIC")
928            .await?;
929        inner
930            .system_client
931            .batch_execute("GRANT CREATE ON DATABASE materialize TO materialize")
932            .await?;
933        inner
934            .system_client
935            .batch_execute("GRANT CREATE ON SCHEMA materialize.public TO materialize")
936            .await?;
937        inner
938            .system_client
939            .batch_execute("GRANT USAGE ON CLUSTER quickstart TO PUBLIC")
940            .await?;
941        inner
942            .system_client
943            .batch_execute("GRANT CREATE ON CLUSTER quickstart TO materialize")
944            .await?;
945
946        // Some sqllogictests require more than the default amount of tables, so we increase the
947        // limit for all tests.
948        inner
949            .system_client
950            .simple_query("ALTER SYSTEM SET max_tables = 100")
951            .await?;
952
953        if inner.enable_table_keys {
954            inner
955                .system_client
956                .simple_query("ALTER SYSTEM SET unsafe_enable_table_keys = true")
957                .await?;
958        }
959
960        inner.ensure_fixed_features().await?;
961
962        inner.client = connect(inner.server_addr, None, None).await.unwrap();
963        inner.system_client = connect(inner.internal_server_addr, Some("mz_system"), None)
964            .await
965            .unwrap();
966        inner.clients = BTreeMap::new();
967
968        Ok(())
969    }
970}
971
972impl<'a> RunnerInner<'a> {
973    pub async fn start(config: &RunConfig<'a>) -> Result<RunnerInner<'a>, anyhow::Error> {
974        let temp_dir = tempfile::tempdir()?;
975        let scratch_dir = tempfile::tempdir()?;
976        let environment_id = EnvironmentId::for_tests();
977        let (consensus_uri, timestamp_oracle_url): (SensitiveUrl, SensitiveUrl) = {
978            let postgres_url = &config.postgres_url;
979            let prefix = &config.prefix;
980            info!(%postgres_url, "starting server");
981            let (client, conn) = Retry::default()
982                .max_tries(5)
983                .retry_async(|_| async {
984                    match tokio_postgres::connect(postgres_url, NoTls).await {
985                        Ok(c) => Ok(c),
986                        Err(e) => {
987                            error!(%e, "failed to connect to postgres");
988                            Err(e)
989                        }
990                    }
991                })
992                .await?;
993            task::spawn(|| "sqllogictest_connect", async move {
994                if let Err(e) = conn.await {
995                    panic!("connection error: {}", e);
996                }
997            });
998            client
999                .batch_execute(&format!(
1000                    "DROP SCHEMA IF EXISTS {prefix}_tsoracle CASCADE;
1001                     CREATE SCHEMA IF NOT EXISTS {prefix}_consensus;
1002                     CREATE SCHEMA {prefix}_tsoracle;"
1003                ))
1004                .await?;
1005            (
1006                format!("{postgres_url}?options=--search_path={prefix}_consensus")
1007                    .parse()
1008                    .expect("invalid consensus URI"),
1009                format!("{postgres_url}?options=--search_path={prefix}_tsoracle")
1010                    .parse()
1011                    .expect("invalid timestamp oracle URI"),
1012            )
1013        };
1014
1015        let secrets_dir = temp_dir.path().join("secrets");
1016        let orchestrator = Arc::new(
1017            ProcessOrchestrator::new(ProcessOrchestratorConfig {
1018                image_dir: env::current_exe()?.parent().unwrap().to_path_buf(),
1019                suppress_output: false,
1020                environment_id: environment_id.to_string(),
1021                secrets_dir: secrets_dir.clone(),
1022                command_wrapper: config
1023                    .orchestrator_process_wrapper
1024                    .as_ref()
1025                    .map_or(Ok(vec![]), |s| shell_words::split(s))?,
1026                propagate_crashes: true,
1027                tcp_proxy: None,
1028                scratch_directory: scratch_dir.path().to_path_buf(),
1029            })
1030            .await?,
1031        );
1032        let now = SYSTEM_TIME.clone();
1033        let metrics_registry = MetricsRegistry::new();
1034
1035        let persist_config = PersistConfig::new(
1036            &mz_environmentd::BUILD_INFO,
1037            now.clone(),
1038            mz_dyncfgs::all_dyncfgs(),
1039        );
1040        let persist_pubsub_server =
1041            PersistGrpcPubSubServer::new(&persist_config, &metrics_registry);
1042        let persist_pubsub_client = persist_pubsub_server.new_same_process_connection();
1043        let persist_pubsub_tcp_listener =
1044            TcpListener::bind(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0))
1045                .await
1046                .expect("pubsub addr binding");
1047        let persist_pubsub_server_port = persist_pubsub_tcp_listener
1048            .local_addr()
1049            .expect("pubsub addr has local addr")
1050            .port();
1051        info!("listening for persist pubsub connections on localhost:{persist_pubsub_server_port}");
1052        mz_ore::task::spawn(|| "persist_pubsub_server", async move {
1053            persist_pubsub_server
1054                .serve_with_stream(TcpListenerStream::new(persist_pubsub_tcp_listener))
1055                .await
1056                .expect("success")
1057        });
1058        let persist_clients =
1059            PersistClientCache::new(persist_config, &metrics_registry, |cfg, metrics| {
1060                let sender: Arc<dyn PubSubSender> = Arc::new(MetricsSameProcessPubSubSender::new(
1061                    cfg,
1062                    persist_pubsub_client.sender,
1063                    metrics,
1064                ));
1065                PubSubClientConnection::new(sender, persist_pubsub_client.receiver)
1066            });
1067        let persist_clients = Arc::new(persist_clients);
1068
1069        let secrets_controller = Arc::clone(&orchestrator);
1070        let connection_context = ConnectionContext::for_tests(orchestrator.reader());
1071        let orchestrator = Arc::new(TracingOrchestrator::new(
1072            orchestrator,
1073            config.tracing.clone(),
1074        ));
1075        let listeners_config = ListenersConfig {
1076            sql: btreemap! {
1077                "external".to_owned() => SqlListenerConfig {
1078                    addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
1079                    authenticator_kind: AuthenticatorKind::None,
1080                    allowed_roles: AllowedRoles::Normal,
1081                    enable_tls: false,
1082                },
1083                "internal".to_owned() => SqlListenerConfig {
1084                    addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
1085                    authenticator_kind: AuthenticatorKind::None,
1086                    allowed_roles: AllowedRoles::Internal,
1087                    enable_tls: false,
1088                },
1089                "password".to_owned() => SqlListenerConfig {
1090                    addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
1091                    authenticator_kind: AuthenticatorKind::Password,
1092                    allowed_roles: AllowedRoles::Normal,
1093                    enable_tls: false,
1094                },
1095            },
1096            http: btreemap![
1097                "external".to_owned() => HttpListenerConfig {
1098                    base: BaseListenerConfig {
1099                        addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
1100                        authenticator_kind: AuthenticatorKind::None,
1101                        allowed_roles: AllowedRoles::Normal,
1102                        enable_tls: false
1103                    },
1104                    routes: HttpRoutesEnabled {
1105                        base: true,
1106                        webhook: true,
1107                        internal: false,
1108                        metrics: false,
1109                        profiling: false,
1110                    },
1111                },
1112                "internal".to_owned() => HttpListenerConfig {
1113                    base: BaseListenerConfig {
1114                        addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
1115                        authenticator_kind: AuthenticatorKind::None,
1116                        allowed_roles: AllowedRoles::NormalAndInternal,
1117                        enable_tls: false
1118                    },
1119                    routes: HttpRoutesEnabled {
1120                        base: true,
1121                        webhook: true,
1122                        internal: true,
1123                        metrics: true,
1124                        profiling: true,
1125                    },
1126                },
1127            ],
1128        };
1129        let listeners = mz_environmentd::Listeners::bind(listeners_config).await?;
1130        let host_name = format!(
1131            "localhost:{}",
1132            listeners.http["external"].handle.local_addr.port()
1133        );
1134        let catalog_config = CatalogConfig {
1135            persist_clients: Arc::clone(&persist_clients),
1136            metrics: Arc::new(mz_catalog::durable::Metrics::new(&MetricsRegistry::new())),
1137        };
1138        let server_config = mz_environmentd::Config {
1139            catalog_config,
1140            timestamp_oracle_url: Some(timestamp_oracle_url),
1141            controller: ControllerConfig {
1142                build_info: &mz_environmentd::BUILD_INFO,
1143                orchestrator,
1144                clusterd_image: "clusterd".into(),
1145                init_container_image: None,
1146                deploy_generation: 0,
1147                persist_location: PersistLocation {
1148                    blob_uri: format!(
1149                        "file://{}/persist/blob",
1150                        config.persist_dir.path().display()
1151                    )
1152                    .parse()
1153                    .expect("invalid blob URI"),
1154                    consensus_uri,
1155                },
1156                persist_clients,
1157                now: SYSTEM_TIME.clone(),
1158                metrics_registry: metrics_registry.clone(),
1159                persist_pubsub_url: format!("http://localhost:{}", persist_pubsub_server_port),
1160                secrets_args: mz_service::secrets::SecretsReaderCliArgs {
1161                    secrets_reader: mz_service::secrets::SecretsControllerKind::LocalFile,
1162                    secrets_reader_local_file_dir: Some(secrets_dir),
1163                    secrets_reader_kubernetes_context: None,
1164                    secrets_reader_aws_prefix: None,
1165                    secrets_reader_name_prefix: None,
1166                },
1167                connection_context,
1168            },
1169            secrets_controller,
1170            cloud_resource_controller: None,
1171            tls: None,
1172            frontegg: None,
1173            cors_allowed_origin: AllowOrigin::list([]),
1174            unsafe_mode: true,
1175            all_features: false,
1176            metrics_registry,
1177            now,
1178            environment_id,
1179            cluster_replica_sizes: ClusterReplicaSizeMap::for_tests(),
1180            bootstrap_default_cluster_replica_size: config.replica_size.clone(),
1181            bootstrap_default_cluster_replication_factor: config
1182                .replicas
1183                .try_into()
1184                .expect("replicas must fit"),
1185            bootstrap_builtin_system_cluster_config: BootstrapBuiltinClusterConfig {
1186                replication_factor: SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR,
1187                size: config.replica_size.clone(),
1188            },
1189            bootstrap_builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig {
1190                replication_factor: CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR,
1191                size: config.replica_size.clone(),
1192            },
1193            bootstrap_builtin_probe_cluster_config: BootstrapBuiltinClusterConfig {
1194                replication_factor: PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR,
1195                size: config.replica_size.clone(),
1196            },
1197            bootstrap_builtin_support_cluster_config: BootstrapBuiltinClusterConfig {
1198                replication_factor: SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR,
1199                size: config.replica_size.clone(),
1200            },
1201            bootstrap_builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig {
1202                replication_factor: ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR,
1203                size: config.replica_size.clone(),
1204            },
1205            system_parameter_defaults: {
1206                let mut params = BTreeMap::new();
1207                params.insert(
1208                    "log_filter".to_string(),
1209                    config.tracing.startup_log_filter.to_string(),
1210                );
1211                params.extend(config.system_parameter_defaults.clone());
1212                params
1213            },
1214            availability_zones: Default::default(),
1215            tracing_handle: config.tracing_handle.clone(),
1216            storage_usage_collection_interval: Duration::from_secs(3600),
1217            storage_usage_retention_period: None,
1218            segment_api_key: None,
1219            segment_client_side: false,
1220            // SLT doesn't like eternally running tasks since it waits for them to finish inbetween SLT files
1221            test_only_dummy_segment_client: false,
1222            egress_addresses: vec![],
1223            aws_account_id: None,
1224            aws_privatelink_availability_zones: None,
1225            launchdarkly_sdk_key: None,
1226            launchdarkly_key_map: Default::default(),
1227            config_sync_file_path: None,
1228            config_sync_timeout: Duration::from_secs(30),
1229            config_sync_loop_interval: None,
1230            bootstrap_role: Some("materialize".into()),
1231            http_host_name: Some(host_name),
1232            internal_console_redirect_url: None,
1233            tls_reload_certs: mz_server_core::cert_reload_never_reload(),
1234            helm_chart_version: None,
1235            license_key: ValidatedLicenseKey::for_tests(),
1236            external_login_password_mz_system: None,
1237            force_builtin_schema_migration: None,
1238        };
1239        // We need to run the server on its own Tokio runtime, which in turn
1240        // requires its own thread, so that we can wait for any tasks spawned
1241        // by the server to be shutdown at the end of each file. If we were to
1242        // share a Tokio runtime, tasks from the last file's server would still
1243        // be live at the start of the next file's server.
1244        let (server_addr_tx, server_addr_rx): (oneshot::Sender<Result<_, anyhow::Error>>, _) =
1245            oneshot::channel();
1246        let (internal_server_addr_tx, internal_server_addr_rx) = oneshot::channel();
1247        let (password_server_addr_tx, password_server_addr_rx) = oneshot::channel();
1248        let (internal_http_server_addr_tx, internal_http_server_addr_rx) = oneshot::channel();
1249        let (shutdown_trigger, shutdown_trigger_rx) = trigger::channel();
1250        let server_thread = thread::spawn(|| {
1251            let runtime = match Runtime::new() {
1252                Ok(runtime) => runtime,
1253                Err(e) => {
1254                    server_addr_tx
1255                        .send(Err(e.into()))
1256                        .expect("receiver should not drop first");
1257                    return;
1258                }
1259            };
1260            let server = match runtime.block_on(listeners.serve(server_config)) {
1261                Ok(runtime) => runtime,
1262                Err(e) => {
1263                    server_addr_tx
1264                        .send(Err(e.into()))
1265                        .expect("receiver should not drop first");
1266                    return;
1267                }
1268            };
1269            server_addr_tx
1270                .send(Ok(server.sql_listener_handles["external"].local_addr))
1271                .expect("receiver should not drop first");
1272            internal_server_addr_tx
1273                .send(server.sql_listener_handles["internal"].local_addr)
1274                .expect("receiver should not drop first");
1275            password_server_addr_tx
1276                .send(server.sql_listener_handles["password"].local_addr)
1277                .expect("receiver should not drop first");
1278            internal_http_server_addr_tx
1279                .send(server.http_listener_handles["internal"].local_addr)
1280                .expect("receiver should not drop first");
1281            let _ = runtime.block_on(shutdown_trigger_rx);
1282        });
1283        let server_addr = server_addr_rx.await??;
1284        let internal_server_addr = internal_server_addr_rx.await?;
1285        let password_server_addr = password_server_addr_rx.await?;
1286        let internal_http_server_addr = internal_http_server_addr_rx.await?;
1287
1288        let system_client = connect(internal_server_addr, Some("mz_system"), None)
1289            .await
1290            .unwrap();
1291        let client = connect(server_addr, None, None).await.unwrap();
1292
1293        let inner = RunnerInner {
1294            server_addr,
1295            internal_server_addr,
1296            password_server_addr,
1297            internal_http_server_addr,
1298            _shutdown_trigger: shutdown_trigger,
1299            _server_thread: server_thread.join_on_drop(),
1300            _temp_dir: temp_dir,
1301            client,
1302            system_client,
1303            clients: BTreeMap::new(),
1304            auto_index_tables: config.auto_index_tables,
1305            auto_index_selects: config.auto_index_selects,
1306            auto_transactions: config.auto_transactions,
1307            enable_table_keys: config.enable_table_keys,
1308            verbosity: config.verbosity,
1309            stdout: config.stdout,
1310        };
1311        inner.ensure_fixed_features().await?;
1312
1313        Ok(inner)
1314    }
1315
1316    /// Set features that should be enabled regardless of whether reset-server was
1317    /// called. These features may be set conditionally depending on the run configuration.
1318    async fn ensure_fixed_features(&self) -> Result<(), anyhow::Error> {
1319        // We turn on enable_reduce_mfp_fusion, as we wish
1320        // to get as much coverage of these features as we can.
1321        // TODO(vmarcos): Remove this code when we retire this feature flag.
1322        self.system_client
1323            .execute("ALTER SYSTEM SET enable_reduce_mfp_fusion = on", &[])
1324            .await?;
1325
1326        // Dangerous functions are useful for tests so we enable it for all tests.
1327        self.system_client
1328            .execute("ALTER SYSTEM SET unsafe_enable_unsafe_functions = on", &[])
1329            .await?;
1330        Ok(())
1331    }
1332
1333    async fn run_record<'r>(
1334        &mut self,
1335        record: &'r Record<'r>,
1336        in_transaction: &mut bool,
1337    ) -> Result<Outcome<'r>, anyhow::Error> {
1338        match &record {
1339            Record::Statement {
1340                expected_error,
1341                rows_affected,
1342                sql,
1343                location,
1344            } => {
1345                if self.auto_transactions && *in_transaction {
1346                    self.client.execute("COMMIT", &[]).await?;
1347                    *in_transaction = false;
1348                }
1349                match self
1350                    .run_statement(*expected_error, *rows_affected, sql, location.clone())
1351                    .await?
1352                {
1353                    Outcome::Success => {
1354                        if self.auto_index_tables {
1355                            let additional = mutate(sql);
1356                            for stmt in additional {
1357                                self.client.execute(&stmt, &[]).await?;
1358                            }
1359                        }
1360                        Ok(Outcome::Success)
1361                    }
1362                    other => {
1363                        if expected_error.is_some() {
1364                            Ok(other)
1365                        } else {
1366                            // If we failed to execute a statement that was supposed to succeed,
1367                            // running the rest of the tests in this file will probably cause
1368                            // false positives, so just give up on the file entirely.
1369                            Ok(Outcome::Bail {
1370                                cause: Box::new(other),
1371                                location: location.clone(),
1372                            })
1373                        }
1374                    }
1375                }
1376            }
1377            Record::Query {
1378                sql,
1379                output,
1380                location,
1381            } => {
1382                self.run_query(sql, output, location.clone(), in_transaction)
1383                    .await
1384            }
1385            Record::Simple {
1386                conn,
1387                user,
1388                password,
1389                sql,
1390                sort,
1391                output,
1392                location,
1393                ..
1394            } => {
1395                self.run_simple(
1396                    *conn,
1397                    *user,
1398                    *password,
1399                    sql,
1400                    sort.clone(),
1401                    output,
1402                    location.clone(),
1403                )
1404                .await
1405            }
1406            Record::Copy {
1407                table_name,
1408                tsv_path,
1409            } => {
1410                let tsv = tokio::fs::read(tsv_path).await?;
1411                let copy = self
1412                    .client
1413                    .copy_in(&*format!("COPY {} FROM STDIN", table_name))
1414                    .await?;
1415                tokio::pin!(copy);
1416                copy.send(bytes::Bytes::from(tsv)).await?;
1417                copy.finish().await?;
1418                Ok(Outcome::Success)
1419            }
1420            _ => Ok(Outcome::Success),
1421        }
1422    }
1423
1424    async fn run_statement<'r>(
1425        &self,
1426        expected_error: Option<&'r str>,
1427        expected_rows_affected: Option<u64>,
1428        sql: &'r str,
1429        location: Location,
1430    ) -> Result<Outcome<'r>, anyhow::Error> {
1431        static UNSUPPORTED_INDEX_STATEMENT_REGEX: LazyLock<Regex> =
1432            LazyLock::new(|| Regex::new("^(CREATE UNIQUE INDEX|REINDEX)").unwrap());
1433        if UNSUPPORTED_INDEX_STATEMENT_REGEX.is_match(sql) {
1434            // sure, we totally made you an index
1435            return Ok(Outcome::Success);
1436        }
1437
1438        match self.client.execute(sql, &[]).await {
1439            Ok(actual) => {
1440                if let Some(expected_error) = expected_error {
1441                    return Ok(Outcome::UnexpectedPlanSuccess {
1442                        expected_error,
1443                        location,
1444                    });
1445                }
1446                match expected_rows_affected {
1447                    None => Ok(Outcome::Success),
1448                    Some(expected) => {
1449                        if expected != actual {
1450                            Ok(Outcome::WrongNumberOfRowsInserted {
1451                                expected_count: expected,
1452                                actual_count: actual,
1453                                location,
1454                            })
1455                        } else {
1456                            Ok(Outcome::Success)
1457                        }
1458                    }
1459                }
1460            }
1461            Err(error) => {
1462                if let Some(expected_error) = expected_error {
1463                    if Regex::new(expected_error)?.is_match(&error.to_string_with_causes()) {
1464                        return Ok(Outcome::Success);
1465                    }
1466                }
1467                Ok(Outcome::PlanFailure {
1468                    error: anyhow!(error),
1469                    location,
1470                })
1471            }
1472        }
1473    }
1474
1475    async fn prepare_query<'r>(
1476        &self,
1477        sql: &str,
1478        output: &'r Result<QueryOutput<'_>, &'r str>,
1479        location: Location,
1480        in_transaction: &mut bool,
1481    ) -> Result<PrepareQueryOutcome<'r>, anyhow::Error> {
1482        // get statement
1483        let statements = match mz_sql::parse::parse(sql) {
1484            Ok(statements) => statements,
1485            Err(e) => match output {
1486                Ok(_) => {
1487                    return Ok(PrepareQueryOutcome::Outcome(Outcome::ParseFailure {
1488                        error: e.into(),
1489                        location,
1490                    }));
1491                }
1492                Err(expected_error) => {
1493                    if Regex::new(expected_error)?.is_match(&e.to_string_with_causes()) {
1494                        return Ok(PrepareQueryOutcome::Outcome(Outcome::Success));
1495                    } else {
1496                        return Ok(PrepareQueryOutcome::Outcome(Outcome::ParseFailure {
1497                            error: e.into(),
1498                            location,
1499                        }));
1500                    }
1501                }
1502            },
1503        };
1504        let statement = match &*statements {
1505            [] => bail!("Got zero statements?"),
1506            [statement] => &statement.ast,
1507            _ => bail!("Got multiple statements: {:?}", statements),
1508        };
1509        let (is_select, num_attributes) = match statement {
1510            Statement::Select(stmt) => (true, derive_num_attributes(&stmt.query.body)),
1511            _ => (false, None),
1512        };
1513
1514        match output {
1515            Ok(_) => {
1516                if self.auto_transactions && !*in_transaction {
1517                    // No ISOLATION LEVEL SERIALIZABLE because of database-issues#5323
1518                    self.client.execute("BEGIN", &[]).await?;
1519                    *in_transaction = true;
1520                }
1521            }
1522            Err(_) => {
1523                if self.auto_transactions && *in_transaction {
1524                    self.client.execute("COMMIT", &[]).await?;
1525                    *in_transaction = false;
1526                }
1527            }
1528        }
1529
1530        // `SHOW` commands reference catalog schema, thus are not in the same timedomain and not
1531        // allowed in the same transaction, see:
1532        // https://materialize.com/docs/sql/begin/#same-timedomain-error
1533        match statement {
1534            Statement::Show(..) => {
1535                if self.auto_transactions && *in_transaction {
1536                    self.client.execute("COMMIT", &[]).await?;
1537                    *in_transaction = false;
1538                }
1539            }
1540            _ => (),
1541        }
1542        Ok(PrepareQueryOutcome::QueryPrepared(QueryInfo {
1543            is_select,
1544            num_attributes,
1545        }))
1546    }
1547
1548    async fn execute_query<'r>(
1549        &self,
1550        sql: &str,
1551        output: &'r Result<QueryOutput<'_>, &'r str>,
1552        location: Location,
1553    ) -> Result<Outcome<'r>, anyhow::Error> {
1554        let rows = match self.client.query(sql, &[]).await {
1555            Ok(rows) => rows,
1556            Err(error) => {
1557                let error_string = error.to_string_with_causes();
1558                return match output {
1559                    Ok(_) => {
1560                        if error_string.contains("supported") || error_string.contains("overload") {
1561                            // this is a failure, but it's caused by lack of support rather than by bugs
1562                            Ok(Outcome::Unsupported {
1563                                error: anyhow!(error),
1564                                location,
1565                            })
1566                        } else {
1567                            Ok(Outcome::PlanFailure {
1568                                error: anyhow!(error),
1569                                location,
1570                            })
1571                        }
1572                    }
1573                    Err(expected_error) => {
1574                        if Regex::new(expected_error)?.is_match(&error_string) {
1575                            Ok(Outcome::Success)
1576                        } else {
1577                            Ok(Outcome::PlanFailure {
1578                                error: anyhow!(error),
1579                                location,
1580                            })
1581                        }
1582                    }
1583                };
1584            }
1585        };
1586
1587        // unpack expected output
1588        let QueryOutput {
1589            sort,
1590            types: expected_types,
1591            column_names: expected_column_names,
1592            output: expected_output,
1593            mode,
1594            ..
1595        } = match output {
1596            Err(expected_error) => {
1597                return Ok(Outcome::UnexpectedPlanSuccess {
1598                    expected_error,
1599                    location,
1600                });
1601            }
1602            Ok(query_output) => query_output,
1603        };
1604
1605        // format output
1606        let mut formatted_rows = vec![];
1607        for row in &rows {
1608            if row.len() != expected_types.len() {
1609                return Ok(Outcome::WrongColumnCount {
1610                    expected_count: expected_types.len(),
1611                    actual_count: row.len(),
1612                    location,
1613                });
1614            }
1615            let row = format_row(row, expected_types, *mode);
1616            formatted_rows.push(row);
1617        }
1618
1619        // sort formatted output
1620        if let Sort::Row = sort {
1621            formatted_rows.sort();
1622        }
1623        let mut values = formatted_rows.into_iter().flatten().collect::<Vec<_>>();
1624        if let Sort::Value = sort {
1625            values.sort();
1626        }
1627
1628        // Various checks as long as there are returned rows.
1629        if let Some(row) = rows.get(0) {
1630            // check column names
1631            if let Some(expected_column_names) = expected_column_names {
1632                let actual_column_names = row
1633                    .columns()
1634                    .iter()
1635                    .map(|t| ColumnName::from(t.name()))
1636                    .collect::<Vec<_>>();
1637                if expected_column_names != &actual_column_names {
1638                    return Ok(Outcome::WrongColumnNames {
1639                        expected_column_names,
1640                        actual_column_names,
1641                        actual_output: Output::Values(values),
1642                        location,
1643                    });
1644                }
1645            }
1646        }
1647
1648        // check output
1649        match expected_output {
1650            Output::Values(expected_values) => {
1651                if values != *expected_values {
1652                    return Ok(Outcome::OutputFailure {
1653                        expected_output,
1654                        actual_raw_output: rows,
1655                        actual_output: Output::Values(values),
1656                        location,
1657                    });
1658                }
1659            }
1660            Output::Hashed {
1661                num_values,
1662                md5: expected_md5,
1663            } => {
1664                let mut hasher = Md5::new();
1665                for value in &values {
1666                    hasher.update(value);
1667                    hasher.update("\n");
1668                }
1669                let md5 = format!("{:x}", hasher.finalize());
1670                if values.len() != *num_values || md5 != *expected_md5 {
1671                    return Ok(Outcome::OutputFailure {
1672                        expected_output,
1673                        actual_raw_output: rows,
1674                        actual_output: Output::Hashed {
1675                            num_values: values.len(),
1676                            md5,
1677                        },
1678                        location,
1679                    });
1680                }
1681            }
1682        }
1683
1684        Ok(Outcome::Success)
1685    }
1686
1687    async fn execute_view_inner<'r>(
1688        &self,
1689        sql: &str,
1690        output: &'r Result<QueryOutput<'_>, &'r str>,
1691        location: Location,
1692    ) -> Result<Option<Outcome<'r>>, anyhow::Error> {
1693        print_sql_if(self.stdout, sql, self.verbosity >= 2);
1694        let sql_result = self.client.execute(sql, &[]).await;
1695
1696        // Evaluate if we already reached an outcome or not.
1697        let tentative_outcome = if let Err(view_error) = sql_result {
1698            if let Err(expected_error) = output {
1699                if Regex::new(expected_error)?.is_match(&view_error.to_string_with_causes()) {
1700                    Some(Outcome::Success)
1701                } else {
1702                    Some(Outcome::PlanFailure {
1703                        error: view_error.into(),
1704                        location: location.clone(),
1705                    })
1706                }
1707            } else {
1708                Some(Outcome::PlanFailure {
1709                    error: view_error.into(),
1710                    location: location.clone(),
1711                })
1712            }
1713        } else {
1714            None
1715        };
1716        Ok(tentative_outcome)
1717    }
1718
1719    async fn execute_view<'r>(
1720        &self,
1721        sql: &str,
1722        num_attributes: Option<usize>,
1723        output: &'r Result<QueryOutput<'_>, &'r str>,
1724        location: Location,
1725    ) -> Result<Outcome<'r>, anyhow::Error> {
1726        // Create indexed view SQL commands and execute `CREATE VIEW`.
1727        let expected_column_names = if let Ok(QueryOutput { column_names, .. }) = output {
1728            column_names.clone()
1729        } else {
1730            None
1731        };
1732        let (create_view, create_index, view_sql, drop_view) = generate_view_sql(
1733            sql,
1734            Uuid::new_v4().as_simple(),
1735            num_attributes,
1736            expected_column_names,
1737        );
1738        let tentative_outcome = self
1739            .execute_view_inner(create_view.as_str(), output, location.clone())
1740            .await?;
1741
1742        // Either we already have an outcome or alternatively,
1743        // we proceed to index and query the view.
1744        if let Some(view_outcome) = tentative_outcome {
1745            return Ok(view_outcome);
1746        }
1747
1748        let tentative_outcome = self
1749            .execute_view_inner(create_index.as_str(), output, location.clone())
1750            .await?;
1751
1752        let view_outcome;
1753        if let Some(outcome) = tentative_outcome {
1754            view_outcome = outcome;
1755        } else {
1756            print_sql_if(self.stdout, view_sql.as_str(), self.verbosity >= 2);
1757            view_outcome = self
1758                .execute_query(view_sql.as_str(), output, location.clone())
1759                .await?;
1760        }
1761
1762        // Remember to clean up after ourselves by dropping the view.
1763        print_sql_if(self.stdout, drop_view.as_str(), self.verbosity >= 2);
1764        self.client.execute(drop_view.as_str(), &[]).await?;
1765
1766        Ok(view_outcome)
1767    }
1768
1769    async fn run_query<'r>(
1770        &self,
1771        sql: &'r str,
1772        output: &'r Result<QueryOutput<'_>, &'r str>,
1773        location: Location,
1774        in_transaction: &mut bool,
1775    ) -> Result<Outcome<'r>, anyhow::Error> {
1776        let prepare_outcome = self
1777            .prepare_query(sql, output, location.clone(), in_transaction)
1778            .await?;
1779        match prepare_outcome {
1780            PrepareQueryOutcome::QueryPrepared(QueryInfo {
1781                is_select,
1782                num_attributes,
1783            }) => {
1784                let query_outcome = self.execute_query(sql, output, location.clone()).await?;
1785                if is_select && self.auto_index_selects {
1786                    let view_outcome = self
1787                        .execute_view(sql, None, output, location.clone())
1788                        .await?;
1789
1790                    // We compare here the query-based and view-based outcomes.
1791                    // We only produce a test failure if the outcomes are of different
1792                    // variant types, thus accepting smaller deviations in the details
1793                    // produced for each variant.
1794                    if std::mem::discriminant::<Outcome>(&query_outcome)
1795                        != std::mem::discriminant::<Outcome>(&view_outcome)
1796                    {
1797                        // Before producing a failure outcome, we try to obtain a new
1798                        // outcome for view-based execution exploiting analysis of the
1799                        // number of attributes. This two-level strategy can avoid errors
1800                        // produced by column ambiguity in the `SELECT`.
1801                        let view_outcome = if num_attributes.is_some() {
1802                            self.execute_view(sql, num_attributes, output, location.clone())
1803                                .await?
1804                        } else {
1805                            view_outcome
1806                        };
1807
1808                        if std::mem::discriminant::<Outcome>(&query_outcome)
1809                            != std::mem::discriminant::<Outcome>(&view_outcome)
1810                        {
1811                            let inconsistent_view_outcome = Outcome::InconsistentViewOutcome {
1812                                query_outcome: Box::new(query_outcome),
1813                                view_outcome: Box::new(view_outcome),
1814                                location: location.clone(),
1815                            };
1816                            // Determine if this inconsistent view outcome should be reported
1817                            // as an error or only as a warning.
1818                            let outcome = if should_warn(&inconsistent_view_outcome) {
1819                                Outcome::Warning {
1820                                    cause: Box::new(inconsistent_view_outcome),
1821                                    location: location.clone(),
1822                                }
1823                            } else {
1824                                inconsistent_view_outcome
1825                            };
1826                            return Ok(outcome);
1827                        }
1828                    }
1829                }
1830                Ok(query_outcome)
1831            }
1832            PrepareQueryOutcome::Outcome(outcome) => Ok(outcome),
1833        }
1834    }
1835
1836    async fn get_conn(
1837        &mut self,
1838        name: Option<&str>,
1839        user: Option<&str>,
1840        password: Option<&str>,
1841    ) -> Result<&tokio_postgres::Client, tokio_postgres::Error> {
1842        match name {
1843            None => Ok(&self.client),
1844            Some(name) => {
1845                if !self.clients.contains_key(name) {
1846                    let addr = if matches!(user, Some("mz_system") | Some("mz_support")) {
1847                        self.internal_server_addr
1848                    } else if password.is_some() {
1849                        // Use password server for password authentication
1850                        self.password_server_addr
1851                    } else {
1852                        self.server_addr
1853                    };
1854                    let client = connect(addr, user, password).await?;
1855                    self.clients.insert(name.into(), client);
1856                }
1857                Ok(self.clients.get(name).unwrap())
1858            }
1859        }
1860    }
1861
1862    async fn run_simple<'r>(
1863        &mut self,
1864        conn: Option<&'r str>,
1865        user: Option<&'r str>,
1866        password: Option<&'r str>,
1867        sql: &'r str,
1868        sort: Sort,
1869        output: &'r Output,
1870        location: Location,
1871    ) -> Result<Outcome<'r>, anyhow::Error> {
1872        let actual = match self.get_conn(conn, user, password).await {
1873            Ok(client) => match client.simple_query(sql).await {
1874                Ok(result) => {
1875                    let mut rows = Vec::new();
1876
1877                    for m in result.into_iter() {
1878                        match m {
1879                            SimpleQueryMessage::Row(row) => {
1880                                let mut s = vec![];
1881                                for i in 0..row.len() {
1882                                    s.push(row.get(i).unwrap_or("NULL"));
1883                                }
1884                                rows.push(s.join(","));
1885                            }
1886                            SimpleQueryMessage::CommandComplete(count) => {
1887                                // This applies any sort on the COMPLETE line as
1888                                // well, but we do the same for the expected output.
1889                                rows.push(format!("COMPLETE {}", count));
1890                            }
1891                            SimpleQueryMessage::RowDescription(_) => {}
1892                            _ => panic!("unexpected"),
1893                        }
1894                    }
1895
1896                    if let Sort::Row = sort {
1897                        rows.sort();
1898                    }
1899
1900                    Output::Values(rows)
1901                }
1902                // Errors can contain multiple lines (say if there are details), and rewrite
1903                // sticks them each on their own line, so we need to split up the lines here to
1904                // each be its own String in the Vec.
1905                Err(error) => Output::Values(
1906                    error
1907                        .to_string_with_causes()
1908                        .lines()
1909                        .map(|s| s.to_string())
1910                        .collect(),
1911                ),
1912            },
1913            Err(error) => Output::Values(
1914                error
1915                    .to_string_with_causes()
1916                    .lines()
1917                    .map(|s| s.to_string())
1918                    .collect(),
1919            ),
1920        };
1921        if *output != actual {
1922            Ok(Outcome::OutputFailure {
1923                expected_output: output,
1924                actual_raw_output: vec![],
1925                actual_output: actual,
1926                location,
1927            })
1928        } else {
1929            Ok(Outcome::Success)
1930        }
1931    }
1932
1933    async fn check_catalog(&self) -> Result<(), anyhow::Error> {
1934        let url = format!(
1935            "http://{}/api/catalog/check",
1936            self.internal_http_server_addr
1937        );
1938        let response: serde_json::Value = reqwest::get(&url).await?.json().await?;
1939
1940        if let Some(inconsistencies) = response.get("err") {
1941            let inconsistencies = serde_json::to_string_pretty(&inconsistencies)
1942                .expect("serializing Value cannot fail");
1943            Err(anyhow::anyhow!("Catalog inconsistency\n{inconsistencies}"))
1944        } else {
1945            Ok(())
1946        }
1947    }
1948}
1949
1950async fn connect(
1951    addr: SocketAddr,
1952    user: Option<&str>,
1953    password: Option<&str>,
1954) -> Result<tokio_postgres::Client, tokio_postgres::Error> {
1955    let mut config = tokio_postgres::Config::new();
1956    config.host(addr.ip().to_string());
1957    config.port(addr.port());
1958    config.user(user.unwrap_or("materialize"));
1959    if let Some(password) = password {
1960        config.password(password);
1961    }
1962    let (client, connection) = config.connect(NoTls).await?;
1963
1964    task::spawn(|| "sqllogictest_connect", async move {
1965        if let Err(e) = connection.await {
1966            eprintln!("connection error: {}", e);
1967        }
1968    });
1969    Ok(client)
1970}
1971
/// A sink for formatted text that is written through a shared reference.
///
/// Because it declares a `write_fmt` method, the standard `write!`/`writeln!`
/// macros can target a `&dyn WriteFmt` directly. Unlike `std::fmt::Write`, the
/// method returns nothing, so those macro invocations evaluate to `()`.
pub trait WriteFmt {
    fn write_fmt(&self, args: fmt::Arguments<'_>);
}
1975
/// Settings for a sqllogictest run.
pub struct RunConfig<'a> {
    /// Destination for progress output: per-file `--- <source>` headers, echoed
    /// records, and failure/warning reports.
    pub stdout: &'a dyn WriteFmt,
    pub stderr: &'a dyn WriteFmt,
    /// Reporting detail. At `>= 1`, records whose outcome is not a success are
    /// reported. At `>= 2`, every record is also echoed before it runs.
    pub verbosity: u8,
    pub postgres_url: String,
    pub prefix: String,
    pub no_fail: bool,
    /// Stop processing a file after its first failing record.
    pub fail_fast: bool,
    pub auto_index_tables: bool,
    /// NOTE(review): presumably copied into the runner's `auto_index_selects`, which
    /// makes `SELECT` queries also run through an indexed view — confirm.
    pub auto_index_selects: bool,
    /// NOTE(review): presumably copied into the runner's `auto_transactions`, which
    /// wraps consecutive queries in `BEGIN`/`COMMIT` — confirm.
    pub auto_transactions: bool,
    pub enable_table_keys: bool,
    pub orchestrator_process_wrapper: Option<String>,
    pub tracing: TracingCliArgs,
    pub tracing_handle: TracingHandle,
    pub system_parameter_defaults: BTreeMap<String, String>,
    /// Persist state is handled specially because:
    /// - Persist background workers do not necessarily shut down immediately once the server is
    ///   shut down, and may panic if their storage is deleted out from under them.
    /// - It's safe for different databases to reference the same state: all data is scoped by UUID.
    pub persist_dir: TempDir,
    pub replicas: usize,
    pub replica_size: String,
}
2000
2001fn print_record(config: &RunConfig<'_>, record: &Record) {
2002    match record {
2003        Record::Statement { sql, .. } | Record::Query { sql, .. } => print_sql(config.stdout, sql),
2004        _ => (),
2005    }
2006}
2007
2008fn print_sql_if<'a>(stdout: &'a dyn WriteFmt, sql: &str, cond: bool) {
2009    if cond {
2010        print_sql(stdout, sql)
2011    }
2012}
2013
2014fn print_sql<'a>(stdout: &'a dyn WriteFmt, sql: &str) {
2015    writeln!(stdout, "{}", crate::util::indent(sql, 4))
2016}
2017
/// Error-message patterns that downgrade an inconsistent view outcome to a warning.
///
/// They apply only when the direct query succeeded but the view-based run ended in a
/// plan failure. `should_warn` tests each pattern with an unanchored `is_match`
/// against the failure's full error text (including causes). A match on any entry
/// means the inconsistency is reported as a warning rather than an error.
const INCONSISTENT_VIEW_OUTCOME_WARNING_REGEXPS: [&str; 9] = [
    // The following are unfixable errors in indexed views given our
    // current constraints.
    "cannot materialize call to",
    "SHOW commands are not allowed in views",
    "cannot create view with unstable dependencies",
    "cannot use wildcard expansions or NATURAL JOINs in a view that depends on system objects",
    "no valid schema selected",
    r#"system schema '\w+' cannot be modified"#,
    r#"permission denied for (SCHEMA|CLUSTER) "(\w+\.)?\w+""#,
    // NOTE(vmarcos): Column ambiguity that could not be eliminated by our
    // currently implemented syntactic rewrites is considered unfixable.
    // In addition, if some column cannot be dealt with, e.g., in `ORDER BY`
    // references, we treat this condition as unfixable as well.
    r#"column "[\w\?]+" specified more than once"#,
    r#"column "(\w+\.)?\w+" does not exist"#,
];
2037
2038/// Evaluates if the given outcome should be returned directly or if it should
2039/// be wrapped as a warning. Note that this function should be used for outcomes
2040/// that can be judged in a context-independent manner, i.e., the outcome itself
2041/// provides enough information as to whether a warning should be emitted or not.
2042fn should_warn(outcome: &Outcome) -> bool {
2043    match outcome {
2044        Outcome::InconsistentViewOutcome {
2045            query_outcome,
2046            view_outcome,
2047            ..
2048        } => match (query_outcome.as_ref(), view_outcome.as_ref()) {
2049            (Outcome::Success, Outcome::PlanFailure { error, .. }) => {
2050                INCONSISTENT_VIEW_OUTCOME_WARNING_REGEXPS.iter().any(|s| {
2051                    Regex::new(s)
2052                        .expect("unexpected error in regular expression parsing")
2053                        .is_match(&error.to_string_with_causes())
2054                })
2055            }
2056            _ => false,
2057        },
2058        _ => false,
2059    }
2060}
2061
/// Runs every record parsed from `input` against a freshly reset database and
/// tallies the outcomes.
///
/// `source` is passed to the parser, printed as a `--- <source>` header, and
/// included in the panic message if a record cannot be run. Every outcome is
/// counted in `stats` by its code, and the rendered text of each failure is
/// appended to `details`. Processing stops after an `Outcome::Bail`, or after the
/// first failure when `fail_fast` is configured.
///
/// # Errors
///
/// Returns an error if resetting the database or parsing the records fails.
///
/// # Panics
///
/// Panics if `run_record` returns an `Err`, instead of propagating it.
pub async fn run_string(
    runner: &mut Runner<'_>,
    source: &str,
    input: &str,
) -> Result<Outcomes, anyhow::Error> {
    runner.reset_database().await?;

    let mut outcomes = Outcomes::default();
    let mut parser = crate::parser::Parser::new(source, input);
    // Transactions are currently relatively slow. Since sqllogictest runs in a single connection
    // there should be no difference in having longer running transactions.
    let mut in_transaction = false;
    writeln!(runner.config.stdout, "--- {}", source);

    for record in parser.parse_records()? {
        // In maximal-verbosity mode, print the query before attempting to run
        // it. Running the query might panic, so it is important to print out
        // what query we are trying to run *before* we panic.
        if runner.config.verbosity >= 2 {
            print_record(runner.config, &record);
        }

        // Errors from running a record are not propagated: they panic, naming the
        // source file.
        let outcome = runner
            .run_record(&record, &mut in_transaction)
            .await
            .map_err(|err| format!("In {}:\n{}", source, err))
            .unwrap();

        // Print warnings and failures in verbose mode.
        if runner.config.verbosity >= 1 && !outcome.success() {
            if runner.config.verbosity < 2 {
                // If `verbosity >= 2`, we'll already have printed the record,
                // so don't print it again. Yes, this is an ugly bit of logic.
                // Please don't try to consolidate it with the `print_record`
                // call above, as it's important to have a mode in which records
                // are printed before they are run, so that if running the
                // record panics, you can tell which record caused it.
                if !outcome.failure() {
                    writeln!(
                        runner.config.stdout,
                        "{}",
                        util::indent("Warning detected for: ", 4)
                    );
                }
                print_record(runner.config, &record);
            }
            if runner.config.verbosity >= 2 || outcome.failure() {
                writeln!(
                    runner.config.stdout,
                    "{}",
                    util::indent(&outcome.to_string(), 4)
                );
                writeln!(runner.config.stdout, "{}", util::indent("----", 4));
            }
        }

        // Every outcome is counted; only failures keep a detailed report.
        outcomes.stats[outcome.code()] += 1;
        if outcome.failure() {
            outcomes.details.push(format!("{}", outcome));
        }

        // A bail ends processing of this input regardless of `fail_fast`.
        if let Outcome::Bail { .. } = outcome {
            break;
        }

        if runner.config.fail_fast && outcome.failure() {
            break;
        }
    }
    Ok(outcomes)
}
2133
2134pub async fn run_file(runner: &mut Runner<'_>, filename: &Path) -> Result<Outcomes, anyhow::Error> {
2135    let mut input = String::new();
2136    File::open(filename)?.read_to_string(&mut input)?;
2137    let outcomes = run_string(runner, &format!("{}", filename.display()), &input).await?;
2138    runner.check_catalog().await?;
2139
2140    Ok(outcomes)
2141}
2142
2143pub async fn rewrite_file(runner: &mut Runner<'_>, filename: &Path) -> Result<(), anyhow::Error> {
2144    runner.reset_database().await?;
2145
2146    let mut file = OpenOptions::new().read(true).write(true).open(filename)?;
2147
2148    let mut input = String::new();
2149    file.read_to_string(&mut input)?;
2150
2151    let mut buf = RewriteBuffer::new(&input);
2152
2153    let mut parser = crate::parser::Parser::new(filename.to_str().unwrap_or(""), &input);
2154    writeln!(runner.config.stdout, "--- {}", filename.display());
2155    let mut in_transaction = false;
2156
2157    fn append_values_output(
2158        buf: &mut RewriteBuffer,
2159        input: &String,
2160        expected_output: &str,
2161        mode: &Mode,
2162        types: &Vec<Type>,
2163        column_names: Option<&Vec<ColumnName>>,
2164        actual_output: &Vec<String>,
2165        multiline: bool,
2166    ) {
2167        buf.append_header(input, expected_output, column_names);
2168
2169        for (i, row) in actual_output.chunks(types.len()).enumerate() {
2170            match mode {
2171                // In Cockroach mode, output each row on its own line, with
2172                // two spaces between each column.
2173                Mode::Cockroach => {
2174                    if i != 0 {
2175                        buf.append("\n");
2176                    }
2177
2178                    if row.len() == 0 {
2179                        // nothing to do
2180                    } else if row.len() == 1 {
2181                        // If there is only one column, then there is no need for space
2182                        // substitution, so we only do newline substitution.
2183                        if multiline {
2184                            buf.append(&row[0]);
2185                        } else {
2186                            buf.append(&row[0].replace('\n', "⏎"))
2187                        }
2188                    } else {
2189                        // Substitute spaces with ␠ to avoid mistaking the spaces in the result
2190                        // values with spaces that separate columns.
2191                        buf.append(
2192                            &row.iter()
2193                                .map(|col| {
2194                                    let mut col = col.replace(' ', "␠");
2195                                    if !multiline {
2196                                        col = col.replace('\n', "⏎");
2197                                    }
2198                                    col
2199                                })
2200                                .join("  "),
2201                        );
2202                    }
2203                }
2204                // In standard mode, output each value on its own line,
2205                // and ignore row boundaries.
2206                // No need to substitute spaces, because every value (not row) is on a separate
2207                // line. But we do need to substitute newlines.
2208                Mode::Standard => {
2209                    for (j, col) in row.iter().enumerate() {
2210                        if i != 0 || j != 0 {
2211                            buf.append("\n");
2212                        }
2213                        buf.append(&if multiline {
2214                            col.clone()
2215                        } else {
2216                            col.replace('\n', "⏎")
2217                        });
2218                    }
2219                }
2220            }
2221        }
2222    }
2223
2224    for record in parser.parse_records()? {
2225        let outcome = runner.run_record(&record, &mut in_transaction).await?;
2226
2227        match (&record, &outcome) {
2228            // If we see an output failure for a query, rewrite the expected output
2229            // to match the observed output.
2230            (
2231                Record::Query {
2232                    output:
2233                        Ok(QueryOutput {
2234                            mode,
2235                            output: Output::Values(_),
2236                            output_str: expected_output,
2237                            types,
2238                            column_names,
2239                            multiline,
2240                            ..
2241                        }),
2242                    ..
2243                },
2244                Outcome::OutputFailure {
2245                    actual_output: Output::Values(actual_output),
2246                    ..
2247                },
2248            ) => {
2249                append_values_output(
2250                    &mut buf,
2251                    &input,
2252                    expected_output,
2253                    mode,
2254                    types,
2255                    column_names.as_ref(),
2256                    actual_output,
2257                    *multiline,
2258                );
2259            }
2260            (
2261                Record::Query {
2262                    output:
2263                        Ok(QueryOutput {
2264                            mode,
2265                            output: Output::Values(_),
2266                            output_str: expected_output,
2267                            types,
2268                            multiline,
2269                            ..
2270                        }),
2271                    ..
2272                },
2273                Outcome::WrongColumnNames {
2274                    actual_column_names,
2275                    actual_output: Output::Values(actual_output),
2276                    ..
2277                },
2278            ) => {
2279                append_values_output(
2280                    &mut buf,
2281                    &input,
2282                    expected_output,
2283                    mode,
2284                    types,
2285                    Some(actual_column_names),
2286                    actual_output,
2287                    *multiline,
2288                );
2289            }
2290            (
2291                Record::Query {
2292                    output:
2293                        Ok(QueryOutput {
2294                            output: Output::Hashed { .. },
2295                            output_str: expected_output,
2296                            column_names,
2297                            ..
2298                        }),
2299                    ..
2300                },
2301                Outcome::OutputFailure {
2302                    actual_output: Output::Hashed { num_values, md5 },
2303                    ..
2304                },
2305            ) => {
2306                buf.append_header(&input, expected_output, column_names.as_ref());
2307
2308                buf.append(format!("{} values hashing to {}\n", num_values, md5).as_str())
2309            }
2310            (
2311                Record::Simple {
2312                    output_str: expected_output,
2313                    ..
2314                },
2315                Outcome::OutputFailure {
2316                    actual_output: Output::Values(actual_output),
2317                    ..
2318                },
2319            ) => {
2320                buf.append_header(&input, expected_output, None);
2321
2322                for (i, row) in actual_output.iter().enumerate() {
2323                    if i != 0 {
2324                        buf.append("\n");
2325                    }
2326                    buf.append(row);
2327                }
2328            }
2329            (
2330                Record::Query {
2331                    sql,
2332                    output: Err(err),
2333                    ..
2334                },
2335                outcome,
2336            )
2337            | (
2338                Record::Statement {
2339                    expected_error: Some(err),
2340                    sql,
2341                    ..
2342                },
2343                outcome,
2344            ) if outcome.err_msg().is_some() => {
2345                buf.rewrite_expected_error(&input, err, &outcome.err_msg().unwrap(), sql)
2346            }
2347            (_, Outcome::Success) => {}
2348            _ => bail!("unexpected: {:?} {:?}", record, outcome),
2349        }
2350    }
2351
2352    file.set_len(0)?;
2353    file.seek(SeekFrom::Start(0))?;
2354    file.write_all(buf.finish().as_bytes())?;
2355    file.sync_all()?;
2356    Ok(())
2357}
2358
2359/// Provides a means to rewrite the `.slt` file while iterating over it.
2360///
2361/// This struct takes the slt file as its `input`, tracks a cursor into it
2362/// (`input_offset`), and provides a buffer (`output`) to store the rewritten
2363/// results.
2364///
2365/// Functions that modify the file will lazily move `input` into `output` using
2366/// `flush_to`. However, those calls should all be interior to other functions.
2367#[derive(Debug)]
2368struct RewriteBuffer<'a> {
2369    input: &'a str,
2370    input_offset: usize,
2371    output: String,
2372}
2373
2374impl<'a> RewriteBuffer<'a> {
2375    fn new(input: &'a str) -> RewriteBuffer<'a> {
2376        RewriteBuffer {
2377            input,
2378            input_offset: 0,
2379            output: String::new(),
2380        }
2381    }
2382
2383    fn flush_to(&mut self, offset: usize) {
2384        assert!(offset >= self.input_offset);
2385        let chunk = &self.input[self.input_offset..offset];
2386        self.output.push_str(chunk);
2387        self.input_offset = offset;
2388    }
2389
2390    fn skip_to(&mut self, offset: usize) {
2391        assert!(offset >= self.input_offset);
2392        self.input_offset = offset;
2393    }
2394
2395    fn append(&mut self, s: &str) {
2396        self.output.push_str(s);
2397    }
2398
2399    fn append_header(
2400        &mut self,
2401        input: &String,
2402        expected_output: &str,
2403        column_names: Option<&Vec<ColumnName>>,
2404    ) {
2405        // Output everything before this record.
2406        // TODO(benesch): is it possible to rewrite this to avoid `as`?
2407        #[allow(clippy::as_conversions)]
2408        let offset = expected_output.as_ptr() as usize - input.as_ptr() as usize;
2409        self.flush_to(offset);
2410        self.skip_to(offset + expected_output.len());
2411
2412        // Attempt to install the result separator (----), if it does
2413        // not already exist.
2414        if self.peek_last(5) == "\n----" {
2415            self.append("\n");
2416        } else if self.peek_last(6) != "\n----\n" {
2417            self.append("\n----\n");
2418        }
2419
2420        let Some(names) = column_names else {
2421            return;
2422        };
2423        self.append(
2424            &names
2425                .iter()
2426                .map(|name| name.replace(' ', "␠"))
2427                .collect::<Vec<_>>()
2428                .join(" "),
2429        );
2430        self.append("\n");
2431    }
2432
2433    fn rewrite_expected_error(
2434        &mut self,
2435        input: &String,
2436        old_err: &str,
2437        new_err: &str,
2438        query: &str,
2439    ) {
2440        // Output everything before this error message.
2441        // TODO(benesch): is it possible to rewrite this to avoid `as`?
2442        #[allow(clippy::as_conversions)]
2443        let err_offset = old_err.as_ptr() as usize - input.as_ptr() as usize;
2444        self.flush_to(err_offset);
2445        self.append(new_err);
2446        self.append("\n");
2447        self.append(query);
2448        // TODO(benesch): is it possible to rewrite this to avoid `as`?
2449        #[allow(clippy::as_conversions)]
2450        self.skip_to(query.as_ptr() as usize - input.as_ptr() as usize + query.len())
2451    }
2452
2453    fn peek_last(&self, n: usize) -> &str {
2454        &self.output[self.output.len() - n..]
2455    }
2456
2457    fn finish(mut self) -> String {
2458        self.flush_to(self.input.len());
2459        self.output
2460    }
2461}
2462
2463/// Generates view creation, view indexing, view querying, and view
2464/// dropping SQL commands for a given `SELECT` query. If the number
2465/// of attributes produced by the query is known, the view commands
2466/// are specialized to avoid issues with column ambiguity. This
2467/// function is a helper for `--auto_index_selects` and assumes that
2468/// the provided input SQL has already been run through the parser,
2469/// resulting in a valid `SELECT` statement.
2470fn generate_view_sql(
2471    sql: &str,
2472    view_uuid: &Simple,
2473    num_attributes: Option<usize>,
2474    expected_column_names: Option<Vec<ColumnName>>,
2475) -> (String, String, String, String) {
2476    // To create the view, re-parse the sql; note that we must find exactly
2477    // one statement and it must be a `SELECT`.
2478    // NOTE(vmarcos): Direct string manipulation was attempted while
2479    // prototyping the code below, which avoids the extra parsing and
2480    // data structure cloning. However, running DDL is so slow that
2481    // it did not matter in terms of runtime. We can revisit this if
2482    // DDL cost drops dramatically in the future.
2483    let stmts = parser::parse_statements(sql).unwrap_or_default();
2484    assert!(stmts.len() == 1);
2485    let (query, query_as_of) = match &stmts[0].ast {
2486        Statement::Select(stmt) => (&stmt.query, &stmt.as_of),
2487        _ => unreachable!("This function should only be called for SELECTs"),
2488    };
2489
2490    // Prior to creating the view, process the `ORDER BY` clause of
2491    // the `SELECT` query, if any. Ordering is not preserved when a
2492    // view includes an `ORDER BY` clause and must be re-enforced by
2493    // an external `ORDER BY` clause when querying the view.
2494    let (view_order_by, extra_columns, distinct) = if num_attributes.is_none() {
2495        (query.order_by.clone(), vec![], None)
2496    } else {
2497        derive_order_by(&query.body, &query.order_by)
2498    };
2499
2500    // Since one-shot SELECT statements may contain ambiguous column names,
2501    // we either use the expected column names, if that option was
2502    // provided, or else just rename the output schema of the view
2503    // using numerically increasing attribute names, whenever possible.
2504    // This strategy makes it possible to use `CREATE INDEX`, thus
2505    // matching the behavior of the option `auto_index_tables`. However,
2506    // we may be presented with a `SELECT *` query, in which case the parser
2507    // does not produce sufficient information to allow us to compute
2508    // the number of output columns. In the latter case, we are supplied
2509    // with `None` for `num_attributes` and just employ the command
2510    // `CREATE DEFAULT INDEX` instead. Additionally, the view is created
2511    // without schema renaming. This strategy is insufficient to dodge
2512    // column name ambiguity in all cases, but we assume here that we
2513    // can adjust the (hopefully) small number of tests that eventually
2514    // challenge us in this particular way.
2515    let name = UnresolvedItemName(vec![Ident::new_unchecked(format!("v{}", view_uuid))]);
2516    let projection = expected_column_names.map_or_else(
2517        || {
2518            num_attributes.map_or(vec![], |n| {
2519                (1..=n)
2520                    .map(|i| Ident::new_unchecked(format!("a{i}")))
2521                    .collect()
2522            })
2523        },
2524        |cols| {
2525            cols.iter()
2526                .map(|c| Ident::new_unchecked(c.as_str()))
2527                .collect()
2528        },
2529    );
2530    let columns: Vec<Ident> = projection
2531        .iter()
2532        .cloned()
2533        .chain(extra_columns.iter().map(|item| {
2534            if let SelectItem::Expr {
2535                expr: _,
2536                alias: Some(ident),
2537            } = item
2538            {
2539                ident.clone()
2540            } else {
2541                unreachable!("alias must be given for extra column")
2542            }
2543        }))
2544        .collect();
2545
2546    // Build a `CREATE VIEW` with the columns computed above.
2547    let mut query = query.clone();
2548    if extra_columns.len() > 0 {
2549        match &mut query.body {
2550            SetExpr::Select(stmt) => stmt.projection.extend(extra_columns.iter().cloned()),
2551            _ => unimplemented!("cannot yet rewrite projections of nested queries"),
2552        }
2553    }
2554    let create_view = AstStatement::<Raw>::CreateView(CreateViewStatement {
2555        if_exists: IfExistsBehavior::Error,
2556        temporary: false,
2557        definition: ViewDefinition {
2558            name: name.clone(),
2559            columns: columns.clone(),
2560            query,
2561        },
2562    })
2563    .to_ast_string_stable();
2564
2565    // We then create either a `CREATE INDEX` or a `CREATE DEFAULT INDEX`
2566    // statement, depending on whether we could obtain the number of
2567    // attributes from the original `SELECT`.
2568    let create_index = AstStatement::<Raw>::CreateIndex(CreateIndexStatement {
2569        name: None,
2570        in_cluster: None,
2571        on_name: RawItemName::Name(name.clone()),
2572        key_parts: if columns.len() == 0 {
2573            None
2574        } else {
2575            Some(
2576                columns
2577                    .iter()
2578                    .map(|ident| Expr::Identifier(vec![ident.clone()]))
2579                    .collect(),
2580            )
2581        },
2582        with_options: Vec::new(),
2583        if_not_exists: false,
2584    })
2585    .to_ast_string_stable();
2586
2587    // Assert if DISTINCT semantics are unchanged from view
2588    let distinct_unneeded = extra_columns.len() == 0
2589        || match distinct {
2590            None | Some(Distinct::On(_)) => true,
2591            Some(Distinct::EntireRow) => false,
2592        };
2593    let distinct = if distinct_unneeded { None } else { distinct };
2594
2595    // `SELECT [* | {projection}] FROM {name} [ORDER BY {view_order_by}]`
2596    let view_sql = AstStatement::<Raw>::Select(SelectStatement {
2597        query: Query {
2598            ctes: CteBlock::Simple(vec![]),
2599            body: SetExpr::Select(Box::new(Select {
2600                distinct,
2601                projection: if projection.len() == 0 {
2602                    vec![SelectItem::Wildcard]
2603                } else {
2604                    projection
2605                        .iter()
2606                        .map(|ident| SelectItem::Expr {
2607                            expr: Expr::Identifier(vec![ident.clone()]),
2608                            alias: None,
2609                        })
2610                        .collect()
2611                },
2612                from: vec![TableWithJoins {
2613                    relation: TableFactor::Table {
2614                        name: RawItemName::Name(name.clone()),
2615                        alias: None,
2616                    },
2617                    joins: vec![],
2618                }],
2619                selection: None,
2620                group_by: vec![],
2621                having: None,
2622                qualify: None,
2623                options: vec![],
2624            })),
2625            order_by: view_order_by,
2626            limit: None,
2627            offset: None,
2628        },
2629        as_of: query_as_of.clone(),
2630    })
2631    .to_ast_string_stable();
2632
2633    // `DROP VIEW {name}`
2634    let drop_view = AstStatement::<Raw>::DropObjects(DropObjectsStatement {
2635        object_type: ObjectType::View,
2636        if_exists: false,
2637        names: vec![UnresolvedObjectName::Item(name)],
2638        cascade: false,
2639    })
2640    .to_ast_string_stable();
2641
2642    (create_view, create_index, view_sql, drop_view)
2643}
2644
2645/// Analyzes the provided query `body` to derive the number of
2646/// attributes in the query. We only consider syntactic cues,
2647/// so we may end up deriving `None` for the number of attributes
2648/// as a conservative approximation.
2649fn derive_num_attributes(body: &SetExpr<Raw>) -> Option<usize> {
2650    let Some((projection, _)) = find_projection(body) else {
2651        return None;
2652    };
2653    derive_num_attributes_from_projection(projection)
2654}
2655
2656/// Analyzes a query's `ORDER BY` clause to derive an `ORDER BY`
2657/// clause that makes numeric references to any expressions in
2658/// the projection and generated-attribute references to expressions
2659/// that need to be added as extra columns to the projection list.
2660/// The rewritten `ORDER BY` clause is then usable when querying a
2661/// view that contains the same `SELECT` as the given query.
2662/// This function returns both the rewritten `ORDER BY` clause
2663/// as well as a list of extra columns that need to be added
2664/// to the query's projection for the `ORDER BY` clause to
2665/// succeed.
2666fn derive_order_by(
2667    body: &SetExpr<Raw>,
2668    order_by: &Vec<OrderByExpr<Raw>>,
2669) -> (
2670    Vec<OrderByExpr<Raw>>,
2671    Vec<SelectItem<Raw>>,
2672    Option<Distinct<Raw>>,
2673) {
2674    let Some((projection, distinct)) = find_projection(body) else {
2675        return (vec![], vec![], None);
2676    };
2677    let (view_order_by, extra_columns) = derive_order_by_from_projection(projection, order_by);
2678    (view_order_by, extra_columns, distinct.clone())
2679}
2680
2681/// Finds the projection list in a `SELECT` query body.
2682fn find_projection(body: &SetExpr<Raw>) -> Option<(&Vec<SelectItem<Raw>>, &Option<Distinct<Raw>>)> {
2683    // Iterate to peel off the query body until the query's
2684    // projection list is found.
2685    let mut set_expr = body;
2686    loop {
2687        match set_expr {
2688            SetExpr::Select(select) => {
2689                return Some((&select.projection, &select.distinct));
2690            }
2691            SetExpr::SetOperation { left, .. } => set_expr = left.as_ref(),
2692            SetExpr::Query(query) => set_expr = &query.body,
2693            _ => return None,
2694        }
2695    }
2696}
2697
2698/// Computes the number of attributes that are obtained by the
2699/// projection of a `SELECT` query. The projection may include
2700/// wildcards, in which case the analysis just returns `None`.
2701fn derive_num_attributes_from_projection(projection: &Vec<SelectItem<Raw>>) -> Option<usize> {
2702    let mut num_attributes = 0usize;
2703    for item in projection.iter() {
2704        let SelectItem::Expr { expr, .. } = item else {
2705            return None;
2706        };
2707        match expr {
2708            Expr::QualifiedWildcard(..) | Expr::WildcardAccess(..) => {
2709                return None;
2710            }
2711            _ => {
2712                num_attributes += 1;
2713            }
2714        }
2715    }
2716    Some(num_attributes)
2717}
2718
2719/// Computes an `ORDER BY` clause with only numeric references
2720/// from given projection and `ORDER BY` of a `SELECT` query.
2721/// If the derivation fails to match a given expression, the
2722/// matched prefix is returned. Note that this could be empty.
2723fn derive_order_by_from_projection(
2724    projection: &Vec<SelectItem<Raw>>,
2725    order_by: &Vec<OrderByExpr<Raw>>,
2726) -> (Vec<OrderByExpr<Raw>>, Vec<SelectItem<Raw>>) {
2727    let mut view_order_by: Vec<OrderByExpr<Raw>> = vec![];
2728    let mut extra_columns: Vec<SelectItem<Raw>> = vec![];
2729    for order_by_expr in order_by.iter() {
2730        let query_expr = &order_by_expr.expr;
2731        let view_expr = match query_expr {
2732            Expr::Value(mz_sql_parser::ast::Value::Number(_)) => query_expr.clone(),
2733            _ => {
2734                // Find expression in query projection, if we can.
2735                if let Some(i) = projection.iter().position(|item| match item {
2736                    SelectItem::Expr { expr, alias } => {
2737                        expr == query_expr
2738                            || match query_expr {
2739                                Expr::Identifier(ident) => {
2740                                    ident.len() == 1 && Some(&ident[0]) == alias.as_ref()
2741                                }
2742                                _ => false,
2743                            }
2744                    }
2745                    SelectItem::Wildcard => false,
2746                }) {
2747                    Expr::Value(mz_sql_parser::ast::Value::Number((i + 1).to_string()))
2748                } else {
2749                    // If the expression is not found in the
2750                    // projection, add extra column.
2751                    let ident = Ident::new_unchecked(format!(
2752                        "a{}",
2753                        (projection.len() + extra_columns.len() + 1)
2754                    ));
2755                    extra_columns.push(SelectItem::Expr {
2756                        expr: query_expr.clone(),
2757                        alias: Some(ident.clone()),
2758                    });
2759                    Expr::Identifier(vec![ident])
2760                }
2761            }
2762        };
2763        view_order_by.push(OrderByExpr {
2764            expr: view_expr,
2765            asc: order_by_expr.asc,
2766            nulls_last: order_by_expr.nulls_last,
2767        });
2768    }
2769    (view_order_by, extra_columns)
2770}
2771
2772/// Returns extra statements to execute after `stmt` is executed.
2773fn mutate(sql: &str) -> Vec<String> {
2774    let stmts = parser::parse_statements(sql).unwrap_or_default();
2775    let mut additional = Vec::new();
2776    for stmt in stmts {
2777        match stmt.ast {
2778            AstStatement::CreateTable(stmt) => additional.push(
2779                // CREATE TABLE -> CREATE INDEX. Specify all columns manually in case CREATE
2780                // DEFAULT INDEX ever goes away.
2781                AstStatement::<Raw>::CreateIndex(CreateIndexStatement {
2782                    name: None,
2783                    in_cluster: None,
2784                    on_name: RawItemName::Name(stmt.name.clone()),
2785                    key_parts: Some(
2786                        stmt.columns
2787                            .iter()
2788                            .map(|def| Expr::Identifier(vec![def.name.clone()]))
2789                            .collect(),
2790                    ),
2791                    with_options: Vec::new(),
2792                    if_not_exists: false,
2793                })
2794                .to_ast_string_stable(),
2795            ),
2796            _ => {}
2797        }
2798    }
2799    additional
2800}
2801
2802#[mz_ore::test]
2803#[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `rust_psm_stack_pointer` on OS `linux`
2804fn test_generate_view_sql() {
2805    let uuid = Uuid::parse_str("67e5504410b1426f9247bb680e5fe0c8").unwrap();
2806    let cases = vec![
2807        (("SELECT * FROM t", None, None),
2808        (
2809            r#"CREATE VIEW "v67e5504410b1426f9247bb680e5fe0c8" AS SELECT * FROM "t""#.to_string(),
2810            r#"CREATE DEFAULT INDEX ON "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
2811            r#"SELECT * FROM "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
2812            r#"DROP VIEW "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
2813        )),
2814        (("SELECT a, b, c FROM t1, t2", Some(3), Some(vec![ColumnName::from("a"), ColumnName::from("b"), ColumnName::from("c")])),
2815        (
2816            r#"CREATE VIEW "v67e5504410b1426f9247bb680e5fe0c8" ("a", "b", "c") AS SELECT "a", "b", "c" FROM "t1", "t2""#.to_string(),
2817            r#"CREATE INDEX ON "v67e5504410b1426f9247bb680e5fe0c8" ("a", "b", "c")"#.to_string(),
2818            r#"SELECT "a", "b", "c" FROM "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
2819            r#"DROP VIEW "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
2820        )),
2821        (("SELECT a, b, c FROM t1, t2", Some(3), None),
2822        (
2823            r#"CREATE VIEW "v67e5504410b1426f9247bb680e5fe0c8" ("a1", "a2", "a3") AS SELECT "a", "b", "c" FROM "t1", "t2""#.to_string(),
2824            r#"CREATE INDEX ON "v67e5504410b1426f9247bb680e5fe0c8" ("a1", "a2", "a3")"#.to_string(),
2825            r#"SELECT "a1", "a2", "a3" FROM "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
2826            r#"DROP VIEW "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
2827        )),
2828        // A case with ambiguity that is accepted by the function, illustrating that
2829        // our measures to dodge this issue are imperfect.
2830        (("SELECT * FROM (SELECT a, sum(b) AS a FROM t GROUP BY a)", None, None),
2831        (
2832            r#"CREATE VIEW "v67e5504410b1426f9247bb680e5fe0c8" AS SELECT * FROM (SELECT "a", "sum"("b") AS "a" FROM "t" GROUP BY "a")"#.to_string(),
2833            r#"CREATE DEFAULT INDEX ON "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
2834            r#"SELECT * FROM "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
2835            r#"DROP VIEW "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
2836        )),
2837        (("SELECT a, b, b + d AS c, a + b AS d FROM t1, t2 ORDER BY a, c, a + b", Some(4), Some(vec![ColumnName::from("a"), ColumnName::from("b"), ColumnName::from("c"), ColumnName::from("d")])),
2838        (
2839            r#"CREATE VIEW "v67e5504410b1426f9247bb680e5fe0c8" ("a", "b", "c", "d") AS SELECT "a", "b", "b" + "d" AS "c", "a" + "b" AS "d" FROM "t1", "t2" ORDER BY "a", "c", "a" + "b""#.to_string(),
2840            r#"CREATE INDEX ON "v67e5504410b1426f9247bb680e5fe0c8" ("a", "b", "c", "d")"#.to_string(),
2841            r#"SELECT "a", "b", "c", "d" FROM "v67e5504410b1426f9247bb680e5fe0c8" ORDER BY 1, 3, 4"#.to_string(),
2842            r#"DROP VIEW "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
2843        )),
2844        (("((SELECT 1 AS a UNION SELECT 2 AS b) UNION SELECT 3 AS c) ORDER BY a", Some(1), None),
2845        (
2846            r#"CREATE VIEW "v67e5504410b1426f9247bb680e5fe0c8" ("a1") AS (SELECT 1 AS "a" UNION SELECT 2 AS "b") UNION SELECT 3 AS "c" ORDER BY "a""#.to_string(),
2847            r#"CREATE INDEX ON "v67e5504410b1426f9247bb680e5fe0c8" ("a1")"#.to_string(),
2848            r#"SELECT "a1" FROM "v67e5504410b1426f9247bb680e5fe0c8" ORDER BY 1"#.to_string(),
2849            r#"DROP VIEW "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
2850        )),
2851        (("SELECT * FROM (SELECT a, sum(b) AS a FROM t GROUP BY a) ORDER BY 1", None, None),
2852        (
2853            r#"CREATE VIEW "v67e5504410b1426f9247bb680e5fe0c8" AS SELECT * FROM (SELECT "a", "sum"("b") AS "a" FROM "t" GROUP BY "a") ORDER BY 1"#.to_string(),
2854            r#"CREATE DEFAULT INDEX ON "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
2855            r#"SELECT * FROM "v67e5504410b1426f9247bb680e5fe0c8" ORDER BY 1"#.to_string(),
2856            r#"DROP VIEW "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
2857        )),
2858        (("SELECT * FROM (SELECT a, sum(b) AS a FROM t GROUP BY a) ORDER BY a", None, None),
2859        (
2860            r#"CREATE VIEW "v67e5504410b1426f9247bb680e5fe0c8" AS SELECT * FROM (SELECT "a", "sum"("b") AS "a" FROM "t" GROUP BY "a") ORDER BY "a""#.to_string(),
2861            r#"CREATE DEFAULT INDEX ON "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
2862            r#"SELECT * FROM "v67e5504410b1426f9247bb680e5fe0c8" ORDER BY "a""#.to_string(),
2863            r#"DROP VIEW "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
2864        )),
2865        (("SELECT a, sum(b) AS a FROM t GROUP BY a, c ORDER BY a, c", Some(2), None),
2866        (
2867            r#"CREATE VIEW "v67e5504410b1426f9247bb680e5fe0c8" ("a1", "a2", "a3") AS SELECT "a", "sum"("b") AS "a", "c" AS "a3" FROM "t" GROUP BY "a", "c" ORDER BY "a", "c""#.to_string(),
2868            r#"CREATE INDEX ON "v67e5504410b1426f9247bb680e5fe0c8" ("a1", "a2", "a3")"#.to_string(),
2869            r#"SELECT "a1", "a2" FROM "v67e5504410b1426f9247bb680e5fe0c8" ORDER BY 1, "a3""#.to_string(),
2870            r#"DROP VIEW "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
2871        )),
2872        (("SELECT a, sum(b) AS a FROM t GROUP BY a, c ORDER BY c, a", Some(2), None),
2873        (
2874            r#"CREATE VIEW "v67e5504410b1426f9247bb680e5fe0c8" ("a1", "a2", "a3") AS SELECT "a", "sum"("b") AS "a", "c" AS "a3" FROM "t" GROUP BY "a", "c" ORDER BY "c", "a""#.to_string(),
2875            r#"CREATE INDEX ON "v67e5504410b1426f9247bb680e5fe0c8" ("a1", "a2", "a3")"#.to_string(),
2876            r#"SELECT "a1", "a2" FROM "v67e5504410b1426f9247bb680e5fe0c8" ORDER BY "a3", 1"#.to_string(),
2877            r#"DROP VIEW "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
2878        )),
2879    ];
2880    for ((sql, num_attributes, expected_column_names), expected) in cases {
2881        let view_sql =
2882            generate_view_sql(sql, uuid.as_simple(), num_attributes, expected_column_names);
2883        assert_eq!(expected, view_sql);
2884    }
2885}
2886
2887#[mz_ore::test]
2888fn test_mutate() {
2889    let cases = vec![
2890        ("CREATE TABLE t ()", vec![r#"CREATE INDEX ON "t" ()"#]),
2891        (
2892            "CREATE TABLE t (a INT)",
2893            vec![r#"CREATE INDEX ON "t" ("a")"#],
2894        ),
2895        (
2896            "CREATE TABLE t (a INT, b TEXT)",
2897            vec![r#"CREATE INDEX ON "t" ("a", "b")"#],
2898        ),
2899        // Invalid syntax works, just returns nothing.
2900        ("BAD SYNTAX", Vec::new()),
2901    ];
2902    for (sql, expected) in cases {
2903        let stmts = mutate(sql);
2904        assert_eq!(expected, stmts, "sql: {sql}");
2905    }
2906}