use std::collections::BTreeMap;
use std::error::Error;
use std::fs::{File, OpenOptions};
use std::io::{Read, Seek, SeekFrom, Write};
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::path::Path;
use std::sync::Arc;
use std::sync::LazyLock;
use std::time::Duration;
use std::{env, fmt, ops, str, thread};
use anyhow::{anyhow, bail};
use bytes::BytesMut;
use chrono::{DateTime, NaiveDateTime, NaiveTime, Utc};
use fallible_iterator::FallibleIterator;
use futures::sink::SinkExt;
use itertools::Itertools;
use md5::{Digest, Md5};
use mz_catalog::config::ClusterReplicaSizeMap;
use mz_controller::ControllerConfig;
use mz_environmentd::CatalogConfig;
use mz_orchestrator_process::{ProcessOrchestrator, ProcessOrchestratorConfig};
use mz_orchestrator_tracing::{TracingCliArgs, TracingOrchestrator};
use mz_ore::cast::{CastFrom, ReinterpretCast};
use mz_ore::channel::trigger;
use mz_ore::error::ErrorExt;
use mz_ore::metrics::MetricsRegistry;
use mz_ore::now::SYSTEM_TIME;
use mz_ore::retry::Retry;
use mz_ore::task;
use mz_ore::thread::{JoinHandleExt, JoinOnDropHandle};
use mz_ore::tracing::TracingHandle;
use mz_ore::url::SensitiveUrl;
use mz_persist_client::cache::PersistClientCache;
use mz_persist_client::cfg::PersistConfig;
use mz_persist_client::rpc::{
MetricsSameProcessPubSubSender, PersistGrpcPubSubServer, PubSubClientConnection, PubSubSender,
};
use mz_persist_client::PersistLocation;
use mz_pgrepr::{oid, Interval, Jsonb, Numeric, UInt2, UInt4, UInt8, Value};
use mz_repr::adt::date::Date;
use mz_repr::adt::mz_acl_item::{AclItem, MzAclItem};
use mz_repr::adt::numeric;
use mz_repr::ColumnName;
use mz_secrets::SecretsController;
use mz_sql::ast::{Expr, Raw, Statement};
use mz_sql::catalog::EnvironmentId;
use mz_sql_parser::ast::display::AstDisplay;
use mz_sql_parser::ast::{
CreateIndexStatement, CreateViewStatement, CteBlock, Distinct, DropObjectsStatement, Ident,
IfExistsBehavior, ObjectType, OrderByExpr, Query, RawItemName, Select, SelectItem,
SelectStatement, SetExpr, Statement as AstStatement, TableFactor, TableWithJoins,
UnresolvedItemName, UnresolvedObjectName, ViewDefinition,
};
use mz_sql_parser::parser;
use mz_storage_types::connections::ConnectionContext;
use postgres_protocol::types;
use regex::Regex;
use tempfile::TempDir;
use tokio::net::TcpListener;
use tokio::runtime::Runtime;
use tokio::sync::oneshot;
use tokio_postgres::types::{FromSql, Kind as PgKind, Type as PgType};
use tokio_postgres::{NoTls, Row, SimpleQueryMessage};
use tokio_stream::wrappers::TcpListenerStream;
use tower_http::cors::AllowOrigin;
use tracing::{error, info};
use uuid::fmt::Simple;
use uuid::Uuid;
use crate::ast::{Location, Mode, Output, QueryOutput, Record, Sort, Type};
use crate::util;
#[derive(Debug)]
pub enum Outcome<'a> {
Unsupported {
error: anyhow::Error,
location: Location,
},
ParseFailure {
error: anyhow::Error,
location: Location,
},
PlanFailure {
error: anyhow::Error,
location: Location,
},
UnexpectedPlanSuccess {
expected_error: &'a str,
location: Location,
},
WrongNumberOfRowsInserted {
expected_count: u64,
actual_count: u64,
location: Location,
},
WrongColumnCount {
expected_count: usize,
actual_count: usize,
location: Location,
},
WrongColumnNames {
expected_column_names: &'a Vec<ColumnName>,
actual_column_names: Vec<ColumnName>,
actual_output: Output,
location: Location,
},
OutputFailure {
expected_output: &'a Output,
actual_raw_output: Vec<Row>,
actual_output: Output,
location: Location,
},
InconsistentViewOutcome {
query_outcome: Box<Outcome<'a>>,
view_outcome: Box<Outcome<'a>>,
location: Location,
},
Bail {
cause: Box<Outcome<'a>>,
location: Location,
},
Warning {
cause: Box<Outcome<'a>>,
location: Location,
},
Success,
}
const NUM_OUTCOMES: usize = 12;
const WARNING_OUTCOME: usize = NUM_OUTCOMES - 2;
const SUCCESS_OUTCOME: usize = NUM_OUTCOMES - 1;
impl<'a> Outcome<'a> {
fn code(&self) -> usize {
match self {
Outcome::Unsupported { .. } => 0,
Outcome::ParseFailure { .. } => 1,
Outcome::PlanFailure { .. } => 2,
Outcome::UnexpectedPlanSuccess { .. } => 3,
Outcome::WrongNumberOfRowsInserted { .. } => 4,
Outcome::WrongColumnCount { .. } => 5,
Outcome::WrongColumnNames { .. } => 6,
Outcome::OutputFailure { .. } => 7,
Outcome::InconsistentViewOutcome { .. } => 8,
Outcome::Bail { .. } => 9,
Outcome::Warning { .. } => 10,
Outcome::Success => 11,
}
}
fn success(&self) -> bool {
matches!(self, Outcome::Success)
}
fn failure(&self) -> bool {
!matches!(self, Outcome::Success) && !matches!(self, Outcome::Warning { .. })
}
fn err_msg(&self) -> Option<String> {
match self {
Outcome::Unsupported { error, .. }
| Outcome::ParseFailure { error, .. }
| Outcome::PlanFailure { error, .. } => Some(
regex::escape(
error.to_string().split('\n').next().unwrap(),
),
),
_ => None,
}
}
}
impl fmt::Display for Outcome<'_> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
use Outcome::*;
const INDENT: &str = "\n ";
match self {
Unsupported { error, location } => write!(
f,
"Unsupported:{}:\n{}",
location,
error.display_with_causes()
),
ParseFailure { error, location } => {
write!(
f,
"ParseFailure:{}:\n{}",
location,
error.display_with_causes()
)
}
PlanFailure { error, location } => write!(f, "PlanFailure:{}:\n{:#}", location, error),
UnexpectedPlanSuccess {
expected_error,
location,
} => write!(
f,
"UnexpectedPlanSuccess:{} expected error: {}",
location, expected_error
),
WrongNumberOfRowsInserted {
expected_count,
actual_count,
location,
} => write!(
f,
"WrongNumberOfRowsInserted:{}{}expected: {}{}actually: {}",
location, INDENT, expected_count, INDENT, actual_count
),
WrongColumnCount {
expected_count,
actual_count,
location,
} => write!(
f,
"WrongColumnCount:{}{}expected: {}{}actually: {}",
location, INDENT, expected_count, INDENT, actual_count
),
WrongColumnNames {
expected_column_names,
actual_column_names,
actual_output: _,
location,
} => write!(
f,
"Wrong Column Names:{}:{}expected column names: {}{}inferred column names: {}",
location,
INDENT,
expected_column_names
.iter()
.map(|n| n.to_string())
.collect::<Vec<_>>()
.join(" "),
INDENT,
actual_column_names
.iter()
.map(|n| n.to_string())
.collect::<Vec<_>>()
.join(" ")
),
OutputFailure {
expected_output,
actual_raw_output,
actual_output,
location,
} => write!(
f,
"OutputFailure:{}{}expected: {:?}{}actually: {:?}{}actual raw: {:?}",
location, INDENT, expected_output, INDENT, actual_output, INDENT, actual_raw_output
),
InconsistentViewOutcome {
query_outcome,
view_outcome,
location,
} => write!(
f,
"InconsistentViewOutcome:{}{}expected from query: {:?}{}actually from indexed view: {:?}{}",
location, INDENT, query_outcome, INDENT, view_outcome, INDENT
),
Bail { cause, location } => write!(f, "Bail:{} {}", location, cause),
Warning { cause, location } => write!(f, "Warning:{} {}", location, cause),
Success => f.write_str("Success"),
}
}
}
#[derive(Default, Debug)]
pub struct Outcomes {
stats: [usize; NUM_OUTCOMES],
details: Vec<String>,
}
impl ops::AddAssign<Outcomes> for Outcomes {
fn add_assign(&mut self, rhs: Outcomes) {
for (lhs, rhs) in self.stats.iter_mut().zip(rhs.stats.iter()) {
*lhs += rhs
}
}
}
impl Outcomes {
pub fn any_failed(&self) -> bool {
self.stats[SUCCESS_OUTCOME] + self.stats[WARNING_OUTCOME] < self.stats.iter().sum::<usize>()
}
pub fn as_json(&self) -> serde_json::Value {
serde_json::json!({
"unsupported": self.stats[0],
"parse_failure": self.stats[1],
"plan_failure": self.stats[2],
"unexpected_plan_success": self.stats[3],
"wrong_number_of_rows_affected": self.stats[4],
"wrong_column_count": self.stats[5],
"wrong_column_names": self.stats[6],
"output_failure": self.stats[7],
"inconsistent_view_outcome": self.stats[8],
"bail": self.stats[9],
"warning": self.stats[10],
"success": self.stats[11],
})
}
pub fn display(&self, no_fail: bool, failure_details: bool) -> OutcomesDisplay<'_> {
OutcomesDisplay {
inner: self,
no_fail,
failure_details,
}
}
}
pub struct OutcomesDisplay<'a> {
inner: &'a Outcomes,
no_fail: bool,
failure_details: bool,
}
impl<'a> fmt::Display for OutcomesDisplay<'a> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let total: usize = self.inner.stats.iter().sum();
if self.failure_details
&& (self.inner.stats[SUCCESS_OUTCOME] + self.inner.stats[WARNING_OUTCOME] != total
|| self.no_fail)
{
for outcome in &self.inner.details {
writeln!(f, "{}", outcome)?;
}
Ok(())
} else {
write!(
f,
"{}:",
if self.inner.stats[SUCCESS_OUTCOME] + self.inner.stats[WARNING_OUTCOME] == total {
"PASS"
} else if self.no_fail {
"FAIL-IGNORE"
} else {
"FAIL"
}
)?;
static NAMES: LazyLock<Vec<&'static str>> = LazyLock::new(|| {
vec![
"unsupported",
"parse-failure",
"plan-failure",
"unexpected-plan-success",
"wrong-number-of-rows-inserted",
"wrong-column-count",
"wrong-column-names",
"output-failure",
"inconsistent-view-outcome",
"bail",
"warning",
"success",
"total",
]
});
for (i, n) in self.inner.stats.iter().enumerate() {
if *n > 0 {
write!(f, " {}={}", NAMES[i], n)?;
}
}
write!(f, " total={}", total)
}
}
}
struct QueryInfo {
is_select: bool,
num_attributes: Option<usize>,
}
enum PrepareQueryOutcome<'a> {
QueryPrepared(QueryInfo),
Outcome(Outcome<'a>),
}
pub struct Runner<'a> {
config: &'a RunConfig<'a>,
inner: Option<RunnerInner<'a>>,
}
pub struct RunnerInner<'a> {
server_addr: SocketAddr,
internal_server_addr: SocketAddr,
internal_http_server_addr: SocketAddr,
client: tokio_postgres::Client,
system_client: tokio_postgres::Client,
clients: BTreeMap<String, tokio_postgres::Client>,
auto_index_tables: bool,
auto_index_selects: bool,
auto_transactions: bool,
enable_table_keys: bool,
verbosity: usize,
stdout: &'a dyn WriteFmt,
_shutdown_trigger: trigger::Trigger,
_server_thread: JoinOnDropHandle<()>,
_temp_dir: TempDir,
}
#[derive(Debug)]
pub struct Slt(Value);
impl<'a> FromSql<'a> for Slt {
fn from_sql(
ty: &PgType,
mut raw: &'a [u8],
) -> Result<Self, Box<dyn Error + 'static + Send + Sync>> {
Ok(match *ty {
PgType::ACLITEM => Self(Value::AclItem(AclItem::decode_binary(
types::bytea_from_sql(raw),
)?)),
PgType::BOOL => Self(Value::Bool(types::bool_from_sql(raw)?)),
PgType::BYTEA => Self(Value::Bytea(types::bytea_from_sql(raw).to_vec())),
PgType::CHAR => Self(Value::Char(u8::from_be_bytes(
types::char_from_sql(raw)?.to_be_bytes(),
))),
PgType::FLOAT4 => Self(Value::Float4(types::float4_from_sql(raw)?)),
PgType::FLOAT8 => Self(Value::Float8(types::float8_from_sql(raw)?)),
PgType::DATE => Self(Value::Date(Date::from_pg_epoch(types::int4_from_sql(
raw,
)?)?)),
PgType::INT2 => Self(Value::Int2(types::int2_from_sql(raw)?)),
PgType::INT4 => Self(Value::Int4(types::int4_from_sql(raw)?)),
PgType::INT8 => Self(Value::Int8(types::int8_from_sql(raw)?)),
PgType::INTERVAL => Self(Value::Interval(Interval::from_sql(ty, raw)?)),
PgType::JSONB => Self(Value::Jsonb(Jsonb::from_sql(ty, raw)?)),
PgType::NAME => Self(Value::Name(types::text_from_sql(raw)?.to_string())),
PgType::NUMERIC => Self(Value::Numeric(Numeric::from_sql(ty, raw)?)),
PgType::OID => Self(Value::Oid(types::oid_from_sql(raw)?)),
PgType::REGCLASS => Self(Value::Oid(types::oid_from_sql(raw)?)),
PgType::REGPROC => Self(Value::Oid(types::oid_from_sql(raw)?)),
PgType::REGTYPE => Self(Value::Oid(types::oid_from_sql(raw)?)),
PgType::TEXT | PgType::BPCHAR | PgType::VARCHAR => {
Self(Value::Text(types::text_from_sql(raw)?.to_string()))
}
PgType::TIME => Self(Value::Time(NaiveTime::from_sql(ty, raw)?)),
PgType::TIMESTAMP => Self(Value::Timestamp(
NaiveDateTime::from_sql(ty, raw)?.try_into()?,
)),
PgType::TIMESTAMPTZ => Self(Value::TimestampTz(
DateTime::<Utc>::from_sql(ty, raw)?.try_into()?,
)),
PgType::UUID => Self(Value::Uuid(Uuid::from_sql(ty, raw)?)),
PgType::RECORD => {
let num_fields = read_be_i32(&mut raw)?;
let mut tuple = vec![];
for _ in 0..num_fields {
let oid = u32::reinterpret_cast(read_be_i32(&mut raw)?);
let typ = match PgType::from_oid(oid) {
Some(typ) => typ,
None => return Err("unknown oid".into()),
};
let v = read_value::<Option<Slt>>(&typ, &mut raw)?;
tuple.push(v.map(|v| v.0));
}
Self(Value::Record(tuple))
}
PgType::INT4_RANGE
| PgType::INT8_RANGE
| PgType::DATE_RANGE
| PgType::NUM_RANGE
| PgType::TS_RANGE
| PgType::TSTZ_RANGE => {
use mz_repr::adt::range::Range;
let range: Range<Slt> = Range::from_sql(ty, raw)?;
Self(Value::Range(range.into_bounds(|b| Box::new(b.0))))
}
_ => match ty.kind() {
PgKind::Array(arr_type) => {
let arr = types::array_from_sql(raw)?;
let elements: Vec<Option<Value>> = arr
.values()
.map(|v| match v {
Some(v) => Ok(Some(Slt::from_sql(arr_type, v)?)),
None => Ok(None),
})
.collect::<Vec<Option<Slt>>>()?
.into_iter()
.map(|v| v.map(|v| v.0))
.collect();
Self(Value::Array {
dims: arr
.dimensions()
.map(|d| {
Ok(mz_repr::adt::array::ArrayDimension {
lower_bound: isize::cast_from(d.lower_bound),
length: usize::try_from(d.len)
.expect("cannot have negative length"),
})
})
.collect()?,
elements,
})
}
_ => match ty.oid() {
oid::TYPE_UINT2_OID => Self(Value::UInt2(UInt2::from_sql(ty, raw)?)),
oid::TYPE_UINT4_OID => Self(Value::UInt4(UInt4::from_sql(ty, raw)?)),
oid::TYPE_UINT8_OID => Self(Value::UInt8(UInt8::from_sql(ty, raw)?)),
oid::TYPE_MZ_TIMESTAMP_OID => {
let s = types::text_from_sql(raw)?;
let t: mz_repr::Timestamp = s.parse()?;
Self(Value::MzTimestamp(t))
}
oid::TYPE_MZ_ACL_ITEM_OID => Self(Value::MzAclItem(MzAclItem::decode_binary(
types::bytea_from_sql(raw),
)?)),
_ => unreachable!(),
},
},
})
}
fn accepts(ty: &PgType) -> bool {
match ty.kind() {
PgKind::Array(_) | PgKind::Composite(_) => return true,
_ => {}
}
match ty.oid() {
oid::TYPE_UINT2_OID
| oid::TYPE_UINT4_OID
| oid::TYPE_UINT8_OID
| oid::TYPE_MZ_TIMESTAMP_OID
| oid::TYPE_MZ_ACL_ITEM_OID => return true,
_ => {}
}
matches!(
*ty,
PgType::ACLITEM
| PgType::BOOL
| PgType::BYTEA
| PgType::CHAR
| PgType::DATE
| PgType::FLOAT4
| PgType::FLOAT8
| PgType::INT2
| PgType::INT4
| PgType::INT8
| PgType::INTERVAL
| PgType::JSONB
| PgType::NAME
| PgType::NUMERIC
| PgType::OID
| PgType::REGCLASS
| PgType::REGPROC
| PgType::REGTYPE
| PgType::RECORD
| PgType::TEXT
| PgType::BPCHAR
| PgType::VARCHAR
| PgType::TIME
| PgType::TIMESTAMP
| PgType::TIMESTAMPTZ
| PgType::UUID
| PgType::INT4_RANGE
| PgType::INT4_RANGE_ARRAY
| PgType::INT8_RANGE
| PgType::INT8_RANGE_ARRAY
| PgType::DATE_RANGE
| PgType::DATE_RANGE_ARRAY
| PgType::NUM_RANGE
| PgType::NUM_RANGE_ARRAY
| PgType::TS_RANGE
| PgType::TS_RANGE_ARRAY
| PgType::TSTZ_RANGE
| PgType::TSTZ_RANGE_ARRAY
)
}
}
fn read_be_i32(buf: &mut &[u8]) -> Result<i32, Box<dyn Error + Sync + Send>> {
if buf.len() < 4 {
return Err("invalid buffer size".into());
}
let mut bytes = [0; 4];
bytes.copy_from_slice(&buf[..4]);
*buf = &buf[4..];
Ok(i32::from_be_bytes(bytes))
}
fn read_value<'a, T>(type_: &PgType, buf: &mut &'a [u8]) -> Result<T, Box<dyn Error + Sync + Send>>
where
T: FromSql<'a>,
{
let value = match usize::try_from(read_be_i32(buf)?) {
Err(_) => None,
Ok(len) => {
if len > buf.len() {
return Err("invalid buffer size".into());
}
let (head, tail) = buf.split_at(len);
*buf = tail;
Some(head)
}
};
T::from_sql_nullable(type_, value)
}
fn format_datum(d: Slt, typ: &Type, mode: Mode, col: usize) -> String {
match (typ, d.0) {
(Type::Bool, Value::Bool(b)) => b.to_string(),
(Type::Integer, Value::Int2(i)) => i.to_string(),
(Type::Integer, Value::Int4(i)) => i.to_string(),
(Type::Integer, Value::Int8(i)) => i.to_string(),
(Type::Integer, Value::UInt2(u)) => u.0.to_string(),
(Type::Integer, Value::UInt4(u)) => u.0.to_string(),
(Type::Integer, Value::UInt8(u)) => u.0.to_string(),
(Type::Integer, Value::Oid(i)) => i.to_string(),
#[allow(clippy::as_conversions)]
(Type::Integer, Value::Float4(f)) => format!("{}", f as i64),
#[allow(clippy::as_conversions)]
(Type::Integer, Value::Float8(f)) => format!("{}", f as i64),
(Type::Integer, Value::Text(_)) => "0".to_string(),
(Type::Integer, Value::Bool(b)) => i8::from(b).to_string(),
(Type::Integer, Value::Numeric(d)) => {
let mut d = d.0 .0.clone();
let mut cx = numeric::cx_datum();
if mode == Mode::Standard {
cx.set_rounding(dec::Rounding::Down);
}
cx.round(&mut d);
numeric::munge_numeric(&mut d).unwrap();
d.to_standard_notation_string()
}
(Type::Real, Value::Int2(i)) => format!("{:.3}", i),
(Type::Real, Value::Int4(i)) => format!("{:.3}", i),
(Type::Real, Value::Int8(i)) => format!("{:.3}", i),
(Type::Real, Value::Float4(f)) => match mode {
Mode::Standard => format!("{:.3}", f),
Mode::Cockroach => format!("{}", f),
},
(Type::Real, Value::Float8(f)) => match mode {
Mode::Standard => format!("{:.3}", f),
Mode::Cockroach => format!("{}", f),
},
(Type::Real, Value::Numeric(d)) => match mode {
Mode::Standard => {
let mut d = d.0 .0.clone();
if d.exponent() < -3 {
numeric::rescale(&mut d, 3).unwrap();
}
numeric::munge_numeric(&mut d).unwrap();
d.to_standard_notation_string()
}
Mode::Cockroach => d.0 .0.to_standard_notation_string(),
},
(Type::Text, Value::Text(s)) => {
if s.is_empty() {
"(empty)".to_string()
} else {
s
}
}
(Type::Text, Value::Bool(b)) => b.to_string(),
(Type::Text, Value::Float4(f)) => format!("{:.3}", f),
(Type::Text, Value::Float8(f)) => format!("{:.3}", f),
(Type::Text, Value::Bytea(b)) => match str::from_utf8(&b) {
Ok(s) => s.to_string(),
Err(_) => format!("{:?}", b),
},
(Type::Text, Value::Numeric(d)) => d.0 .0.to_standard_notation_string(),
(Type::Text, d) => {
let mut buf = BytesMut::new();
d.encode_text(&mut buf);
String::from_utf8_lossy(&buf).into_owned()
}
(Type::Oid, Value::Oid(o)) => o.to_string(),
(_, d) => panic!(
"Don't know how to format {:?} as {:?} in column {}",
d, typ, col,
),
}
}
fn format_row(row: &Row, types: &[Type], mode: Mode) -> Vec<String> {
let mut formatted: Vec<String> = vec![];
for i in 0..row.len() {
let t: Option<Slt> = row.get::<usize, Option<Slt>>(i);
let t: Option<String> = t.map(|d| format_datum(d, &types[i], mode, i));
formatted.push(match t {
Some(t) => t,
None => "NULL".into(),
});
}
formatted
}
impl<'a> Runner<'a> {
pub async fn start(config: &'a RunConfig<'a>) -> Result<Runner<'a>, anyhow::Error> {
let mut runner = Self {
config,
inner: None,
};
runner.reset().await?;
Ok(runner)
}
pub async fn reset(&mut self) -> Result<(), anyhow::Error> {
drop(self.inner.take());
self.inner = Some(RunnerInner::start(self.config).await?);
Ok(())
}
async fn run_record<'r>(
&mut self,
record: &'r Record<'r>,
in_transaction: &mut bool,
) -> Result<Outcome<'r>, anyhow::Error> {
if let Record::ResetServer = record {
self.reset().await?;
Ok(Outcome::Success)
} else {
self.inner
.as_mut()
.expect("RunnerInner missing")
.run_record(record, in_transaction)
.await
}
}
async fn check_catalog(&self) -> Result<(), anyhow::Error> {
self.inner
.as_ref()
.expect("RunnerInner missing")
.check_catalog()
.await
}
async fn reset_database(&mut self) -> Result<(), anyhow::Error> {
let inner = self.inner.as_mut().expect("RunnerInner missing");
inner.client.batch_execute("ROLLBACK;").await?;
inner
.system_client
.batch_execute(
"ROLLBACK;
SET cluster = mz_catalog_server;
RESET cluster_replica;",
)
.await?;
inner
.system_client
.batch_execute("ALTER SYSTEM RESET ALL")
.await?;
for row in inner
.system_client
.query("SELECT name FROM mz_databases", &[])
.await?
{
let name: &str = row.get("name");
inner
.system_client
.batch_execute(&format!("DROP DATABASE \"{name}\""))
.await?;
}
inner
.system_client
.batch_execute("CREATE DATABASE materialize")
.await?;
let mut needs_default_cluster = true;
for row in inner
.system_client
.query("SELECT name FROM mz_clusters WHERE id LIKE 'u%'", &[])
.await?
{
match row.get("name") {
"quickstart" => needs_default_cluster = false,
name => {
inner
.system_client
.batch_execute(&format!("DROP CLUSTER {name}"))
.await?
}
}
}
if needs_default_cluster {
inner
.system_client
.batch_execute("CREATE CLUSTER quickstart REPLICAS ()")
.await?;
}
let mut needs_default_replica = true;
for row in inner
.system_client
.query(
"SELECT name, size FROM mz_cluster_replicas
WHERE cluster_id = (SELECT id FROM mz_clusters WHERE name = 'quickstart')",
&[],
)
.await?
{
let name: &str = row.get("name");
let size: &str = row.get("size");
if name == "r1" && size == self.config.replicas.to_string() {
needs_default_replica = false;
} else {
inner
.system_client
.batch_execute(&format!("DROP CLUSTER REPLICA quickstart.{}", name))
.await?;
}
}
if needs_default_replica {
inner
.system_client
.batch_execute(&format!(
"CREATE CLUSTER REPLICA quickstart.r1 SIZE '{}'",
self.config.replicas
))
.await?;
}
inner
.system_client
.batch_execute("GRANT USAGE ON DATABASE materialize TO PUBLIC")
.await?;
inner
.system_client
.batch_execute("GRANT CREATE ON DATABASE materialize TO materialize")
.await?;
inner
.system_client
.batch_execute("GRANT CREATE ON SCHEMA materialize.public TO materialize")
.await?;
inner
.system_client
.batch_execute("GRANT USAGE ON CLUSTER quickstart TO PUBLIC")
.await?;
inner
.system_client
.batch_execute("GRANT CREATE ON CLUSTER quickstart TO materialize")
.await?;
inner
.system_client
.simple_query("ALTER SYSTEM SET max_tables = 100")
.await?;
if inner.enable_table_keys {
inner
.system_client
.simple_query("ALTER SYSTEM SET enable_table_keys = true")
.await?;
}
inner.ensure_fixed_features().await?;
inner.client = connect(inner.server_addr, None).await;
inner.system_client = connect(inner.internal_server_addr, Some("mz_system")).await;
inner.clients = BTreeMap::new();
Ok(())
}
}
impl<'a> RunnerInner<'a> {
pub async fn start(config: &RunConfig<'a>) -> Result<RunnerInner<'a>, anyhow::Error> {
let temp_dir = tempfile::tempdir()?;
let scratch_dir = tempfile::tempdir()?;
let environment_id = EnvironmentId::for_tests();
let (consensus_uri, timestamp_oracle_url): (SensitiveUrl, SensitiveUrl) = {
let postgres_url = &config.postgres_url;
info!(%postgres_url, "starting server");
let (client, conn) = Retry::default()
.max_tries(5)
.retry_async(|_| async {
match tokio_postgres::connect(postgres_url, NoTls).await {
Ok(c) => Ok(c),
Err(e) => {
error!(%e, "failed to connect to postgres");
Err(e)
}
}
})
.await?;
task::spawn(|| "sqllogictest_connect", async move {
if let Err(e) = conn.await {
panic!("connection error: {}", e);
}
});
client
.batch_execute(
"DROP SCHEMA IF EXISTS sqllogictest_tsoracle CASCADE;
CREATE SCHEMA IF NOT EXISTS sqllogictest_consensus;
CREATE SCHEMA sqllogictest_tsoracle;",
)
.await?;
(
format!("{postgres_url}?options=--search_path=sqllogictest_consensus")
.parse()
.expect("invalid consensus URI"),
format!("{postgres_url}?options=--search_path=sqllogictest_tsoracle")
.parse()
.expect("invalid timestamp oracle URI"),
)
};
let secrets_dir = temp_dir.path().join("secrets");
let orchestrator = Arc::new(
ProcessOrchestrator::new(ProcessOrchestratorConfig {
image_dir: env::current_exe()?.parent().unwrap().to_path_buf(),
suppress_output: false,
environment_id: environment_id.to_string(),
secrets_dir: secrets_dir.clone(),
command_wrapper: config
.orchestrator_process_wrapper
.as_ref()
.map_or(Ok(vec![]), |s| shell_words::split(s))?,
propagate_crashes: true,
tcp_proxy: None,
scratch_directory: scratch_dir.path().to_path_buf(),
})
.await?,
);
let now = SYSTEM_TIME.clone();
let metrics_registry = MetricsRegistry::new();
let persist_config = PersistConfig::new(
&mz_environmentd::BUILD_INFO,
now.clone(),
mz_dyncfgs::all_dyncfgs(),
);
let persist_pubsub_server =
PersistGrpcPubSubServer::new(&persist_config, &metrics_registry);
let persist_pubsub_client = persist_pubsub_server.new_same_process_connection();
let persist_pubsub_tcp_listener =
TcpListener::bind(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0))
.await
.expect("pubsub addr binding");
let persist_pubsub_server_port = persist_pubsub_tcp_listener
.local_addr()
.expect("pubsub addr has local addr")
.port();
info!("listening for persist pubsub connections on localhost:{persist_pubsub_server_port}");
mz_ore::task::spawn(|| "persist_pubsub_server", async move {
persist_pubsub_server
.serve_with_stream(TcpListenerStream::new(persist_pubsub_tcp_listener))
.await
.expect("success")
});
let persist_clients =
PersistClientCache::new(persist_config, &metrics_registry, |cfg, metrics| {
let sender: Arc<dyn PubSubSender> = Arc::new(MetricsSameProcessPubSubSender::new(
cfg,
persist_pubsub_client.sender,
metrics,
));
PubSubClientConnection::new(sender, persist_pubsub_client.receiver)
});
let persist_clients = Arc::new(persist_clients);
let secrets_controller = Arc::clone(&orchestrator);
let connection_context = ConnectionContext::for_tests(orchestrator.reader());
let orchestrator = Arc::new(TracingOrchestrator::new(
orchestrator,
config.tracing.clone(),
));
let listeners = mz_environmentd::Listeners::bind_any_local().await?;
let host_name = format!("localhost:{}", listeners.http_local_addr().port());
let catalog_config = CatalogConfig {
persist_clients: Arc::clone(&persist_clients),
metrics: Arc::new(mz_catalog::durable::Metrics::new(&MetricsRegistry::new())),
};
let server_config = mz_environmentd::Config {
catalog_config,
timestamp_oracle_url: Some(timestamp_oracle_url),
controller: ControllerConfig {
build_info: &mz_environmentd::BUILD_INFO,
orchestrator,
clusterd_image: "clusterd".into(),
init_container_image: None,
deploy_generation: 0,
persist_location: PersistLocation {
blob_uri: format!(
"file://{}/persist/blob",
config.persist_dir.path().display()
)
.parse()
.expect("invalid blob URI"),
consensus_uri,
},
persist_clients,
now: SYSTEM_TIME.clone(),
metrics_registry: metrics_registry.clone(),
persist_pubsub_url: format!("http://localhost:{}", persist_pubsub_server_port),
secrets_args: mz_service::secrets::SecretsReaderCliArgs {
secrets_reader: mz_service::secrets::SecretsControllerKind::LocalFile,
secrets_reader_local_file_dir: Some(secrets_dir),
secrets_reader_kubernetes_context: None,
secrets_reader_aws_prefix: None,
secrets_reader_name_prefix: None,
},
connection_context,
},
secrets_controller,
cloud_resource_controller: None,
tls: None,
frontegg: None,
cors_allowed_origin: AllowOrigin::list([]),
unsafe_mode: true,
all_features: false,
metrics_registry,
now,
environment_id,
cluster_replica_sizes: ClusterReplicaSizeMap::for_tests(),
bootstrap_default_cluster_replica_size: config.replicas.to_string(),
bootstrap_builtin_system_cluster_replica_size: config.replicas.to_string(),
bootstrap_builtin_catalog_server_cluster_replica_size: config.replicas.to_string(),
bootstrap_builtin_probe_cluster_replica_size: config.replicas.to_string(),
bootstrap_builtin_support_cluster_replica_size: config.replicas.to_string(),
bootstrap_builtin_analytics_cluster_replica_size: config.replicas.to_string(),
system_parameter_defaults: {
let mut params = BTreeMap::new();
params.insert(
"log_filter".to_string(),
config.tracing.startup_log_filter.to_string(),
);
params.extend(config.system_parameter_defaults.clone());
params
},
availability_zones: Default::default(),
tracing_handle: config.tracing_handle.clone(),
storage_usage_collection_interval: Duration::from_secs(3600),
storage_usage_retention_period: None,
segment_api_key: None,
segment_client_side: false,
egress_addresses: vec![],
aws_account_id: None,
aws_privatelink_availability_zones: None,
launchdarkly_sdk_key: None,
launchdarkly_key_map: Default::default(),
config_sync_timeout: Duration::from_secs(30),
config_sync_loop_interval: None,
bootstrap_role: Some("materialize".into()),
http_host_name: Some(host_name),
internal_console_redirect_url: None,
tls_reload_certs: mz_server_core::cert_reload_never_reload(),
helm_chart_version: None,
};
let (server_addr_tx, server_addr_rx): (oneshot::Sender<Result<_, anyhow::Error>>, _) =
oneshot::channel();
let (internal_server_addr_tx, internal_server_addr_rx) = oneshot::channel();
let (internal_http_server_addr_tx, internal_http_server_addr_rx) = oneshot::channel();
let (shutdown_trigger, shutdown_trigger_rx) = trigger::channel();
let server_thread = thread::spawn(|| {
let runtime = match Runtime::new() {
Ok(runtime) => runtime,
Err(e) => {
server_addr_tx
.send(Err(e.into()))
.expect("receiver should not drop first");
return;
}
};
let server = match runtime.block_on(listeners.serve(server_config)) {
Ok(runtime) => runtime,
Err(e) => {
server_addr_tx
.send(Err(e.into()))
.expect("receiver should not drop first");
return;
}
};
server_addr_tx
.send(Ok(server.sql_local_addr()))
.expect("receiver should not drop first");
internal_server_addr_tx
.send(server.internal_sql_local_addr())
.expect("receiver should not drop first");
internal_http_server_addr_tx
.send(server.internal_http_local_addr())
.expect("receiver should not drop first");
let _ = runtime.block_on(shutdown_trigger_rx);
});
let server_addr = server_addr_rx.await??;
let internal_server_addr = internal_server_addr_rx.await?;
let internal_http_server_addr = internal_http_server_addr_rx.await?;
let system_client = connect(internal_server_addr, Some("mz_system")).await;
let client = connect(server_addr, None).await;
let inner = RunnerInner {
server_addr,
internal_server_addr,
internal_http_server_addr,
_shutdown_trigger: shutdown_trigger,
_server_thread: server_thread.join_on_drop(),
_temp_dir: temp_dir,
client,
system_client,
clients: BTreeMap::new(),
auto_index_tables: config.auto_index_tables,
auto_index_selects: config.auto_index_selects,
auto_transactions: config.auto_transactions,
enable_table_keys: config.enable_table_keys,
verbosity: config.verbosity,
stdout: config.stdout,
};
inner.ensure_fixed_features().await?;
Ok(inner)
}
async fn ensure_fixed_features(&self) -> Result<(), anyhow::Error> {
self.system_client
.execute("ALTER SYSTEM SET enable_reduce_mfp_fusion = on", &[])
.await?;
self.system_client
.execute("ALTER SYSTEM SET enable_unsafe_functions = on", &[])
.await?;
Ok(())
}
async fn run_record<'r>(
&mut self,
record: &'r Record<'r>,
in_transaction: &mut bool,
) -> Result<Outcome<'r>, anyhow::Error> {
match &record {
Record::Statement {
expected_error,
rows_affected,
sql,
location,
} => {
if self.auto_transactions && *in_transaction {
self.client.execute("COMMIT", &[]).await?;
*in_transaction = false;
}
match self
.run_statement(*expected_error, *rows_affected, sql, location.clone())
.await?
{
Outcome::Success => {
if self.auto_index_tables {
let additional = mutate(sql);
for stmt in additional {
self.client.execute(&stmt, &[]).await?;
}
}
Ok(Outcome::Success)
}
other => {
if expected_error.is_some() {
Ok(other)
} else {
Ok(Outcome::Bail {
cause: Box::new(other),
location: location.clone(),
})
}
}
}
}
Record::Query {
sql,
output,
location,
} => {
self.run_query(sql, output, location.clone(), in_transaction)
.await
}
Record::Simple {
conn,
user,
sql,
output,
location,
..
} => {
self.run_simple(*conn, *user, sql, output, location.clone())
.await
}
Record::Copy {
table_name,
tsv_path,
} => {
let tsv = tokio::fs::read(tsv_path).await?;
let copy = self
.client
.copy_in(&*format!("COPY {} FROM STDIN", table_name))
.await?;
tokio::pin!(copy);
copy.send(bytes::Bytes::from(tsv)).await?;
copy.finish().await?;
Ok(Outcome::Success)
}
_ => Ok(Outcome::Success),
}
}
async fn run_statement<'r>(
&self,
expected_error: Option<&'r str>,
expected_rows_affected: Option<u64>,
sql: &'r str,
location: Location,
) -> Result<Outcome<'r>, anyhow::Error> {
static UNSUPPORTED_INDEX_STATEMENT_REGEX: LazyLock<Regex> =
LazyLock::new(|| Regex::new("^(CREATE UNIQUE INDEX|REINDEX)").unwrap());
if UNSUPPORTED_INDEX_STATEMENT_REGEX.is_match(sql) {
return Ok(Outcome::Success);
}
match self.client.execute(sql, &[]).await {
Ok(actual) => {
if let Some(expected_error) = expected_error {
return Ok(Outcome::UnexpectedPlanSuccess {
expected_error,
location,
});
}
match expected_rows_affected {
None => Ok(Outcome::Success),
Some(expected) => {
if expected != actual {
Ok(Outcome::WrongNumberOfRowsInserted {
expected_count: expected,
actual_count: actual,
location,
})
} else {
Ok(Outcome::Success)
}
}
}
}
Err(error) => {
if let Some(expected_error) = expected_error {
if Regex::new(expected_error)?.is_match(&format!("{:#}", error)) {
return Ok(Outcome::Success);
}
}
Ok(Outcome::PlanFailure {
error: anyhow!(error),
location,
})
}
}
}
async fn prepare_query<'r>(
&self,
sql: &str,
output: &'r Result<QueryOutput<'_>, &'r str>,
location: Location,
in_transaction: &mut bool,
) -> Result<PrepareQueryOutcome<'r>, anyhow::Error> {
let statements = match mz_sql::parse::parse(sql) {
Ok(statements) => statements,
Err(e) => match output {
Ok(_) => {
return Ok(PrepareQueryOutcome::Outcome(Outcome::ParseFailure {
error: e.into(),
location,
}));
}
Err(expected_error) => {
if Regex::new(expected_error)?.is_match(&format!("{:#}", e)) {
return Ok(PrepareQueryOutcome::Outcome(Outcome::Success));
} else {
return Ok(PrepareQueryOutcome::Outcome(Outcome::ParseFailure {
error: e.into(),
location,
}));
}
}
},
};
let statement = match &*statements {
[] => bail!("Got zero statements?"),
[statement] => &statement.ast,
_ => bail!("Got multiple statements: {:?}", statements),
};
let (is_select, num_attributes) = match statement {
Statement::Select(stmt) => (true, derive_num_attributes(&stmt.query.body)),
_ => (false, None),
};
match output {
Ok(_) => {
if self.auto_transactions && !*in_transaction {
self.client.execute("BEGIN", &[]).await?;
*in_transaction = true;
}
}
Err(_) => {
if self.auto_transactions && *in_transaction {
self.client.execute("COMMIT", &[]).await?;
*in_transaction = false;
}
}
}
match statement {
Statement::Show(..) => {
if self.auto_transactions && *in_transaction {
self.client.execute("COMMIT", &[]).await?;
*in_transaction = false;
}
}
_ => (),
}
Ok(PrepareQueryOutcome::QueryPrepared(QueryInfo {
is_select,
num_attributes,
}))
}
async fn execute_query<'r>(
&self,
sql: &str,
output: &'r Result<QueryOutput<'_>, &'r str>,
location: Location,
) -> Result<Outcome<'r>, anyhow::Error> {
let rows = match self.client.query(sql, &[]).await {
Ok(rows) => rows,
Err(error) => {
return match output {
Ok(_) => {
let error_string = format!("{}", error);
if error_string.contains("supported") || error_string.contains("overload") {
Ok(Outcome::Unsupported {
error: anyhow!(error),
location,
})
} else {
Ok(Outcome::PlanFailure {
error: anyhow!(error),
location,
})
}
}
Err(expected_error) => {
if Regex::new(expected_error)?.is_match(&format!("{:#}", error)) {
Ok(Outcome::Success)
} else {
Ok(Outcome::PlanFailure {
error: anyhow!(error),
location,
})
}
}
};
}
};
let QueryOutput {
sort,
types: expected_types,
column_names: expected_column_names,
output: expected_output,
mode,
..
} = match output {
Err(expected_error) => {
return Ok(Outcome::UnexpectedPlanSuccess {
expected_error,
location,
});
}
Ok(query_output) => query_output,
};
let mut formatted_rows = vec![];
for row in &rows {
if row.len() != expected_types.len() {
return Ok(Outcome::WrongColumnCount {
expected_count: expected_types.len(),
actual_count: row.len(),
location,
});
}
let row = format_row(row, expected_types, *mode);
formatted_rows.push(row);
}
if let Sort::Row = sort {
formatted_rows.sort();
}
let mut values = formatted_rows.into_iter().flatten().collect::<Vec<_>>();
if let Sort::Value = sort {
values.sort();
}
if let Some(row) = rows.get(0) {
if let Some(expected_column_names) = expected_column_names {
let actual_column_names = row
.columns()
.iter()
.map(|t| ColumnName::from(t.name()))
.collect::<Vec<_>>();
if expected_column_names != &actual_column_names {
return Ok(Outcome::WrongColumnNames {
expected_column_names,
actual_column_names,
actual_output: Output::Values(values),
location,
});
}
}
}
match expected_output {
Output::Values(expected_values) => {
if values != *expected_values {
return Ok(Outcome::OutputFailure {
expected_output,
actual_raw_output: rows,
actual_output: Output::Values(values),
location,
});
}
}
Output::Hashed {
num_values,
md5: expected_md5,
} => {
let mut hasher = Md5::new();
for value in &values {
hasher.update(value);
hasher.update("\n");
}
let md5 = format!("{:x}", hasher.finalize());
if values.len() != *num_values || md5 != *expected_md5 {
return Ok(Outcome::OutputFailure {
expected_output,
actual_raw_output: rows,
actual_output: Output::Hashed {
num_values: values.len(),
md5,
},
location,
});
}
}
}
Ok(Outcome::Success)
}
async fn execute_view_inner<'r>(
&self,
sql: &str,
output: &'r Result<QueryOutput<'_>, &'r str>,
location: Location,
) -> Result<Option<Outcome<'r>>, anyhow::Error> {
print_sql_if(self.stdout, sql, self.verbosity >= 2);
let sql_result = self.client.execute(sql, &[]).await;
let tentative_outcome = if let Err(view_error) = sql_result {
if let Err(expected_error) = output {
if Regex::new(expected_error)?.is_match(&format!("{:#}", view_error)) {
Some(Outcome::Success)
} else {
Some(Outcome::PlanFailure {
error: view_error.into(),
location: location.clone(),
})
}
} else {
Some(Outcome::PlanFailure {
error: view_error.into(),
location: location.clone(),
})
}
} else {
None
};
Ok(tentative_outcome)
}
async fn execute_view<'r>(
&self,
sql: &str,
num_attributes: Option<usize>,
output: &'r Result<QueryOutput<'_>, &'r str>,
location: Location,
) -> Result<Outcome<'r>, anyhow::Error> {
let expected_column_names = if let Ok(QueryOutput { column_names, .. }) = output {
column_names.clone()
} else {
None
};
let (create_view, create_index, view_sql, drop_view) = generate_view_sql(
sql,
Uuid::new_v4().as_simple(),
num_attributes,
expected_column_names,
);
let tentative_outcome = self
.execute_view_inner(create_view.as_str(), output, location.clone())
.await?;
if let Some(view_outcome) = tentative_outcome {
return Ok(view_outcome);
}
let tentative_outcome = self
.execute_view_inner(create_index.as_str(), output, location.clone())
.await?;
let view_outcome;
if let Some(outcome) = tentative_outcome {
view_outcome = outcome;
} else {
print_sql_if(self.stdout, view_sql.as_str(), self.verbosity >= 2);
view_outcome = self
.execute_query(view_sql.as_str(), output, location.clone())
.await?;
}
print_sql_if(self.stdout, drop_view.as_str(), self.verbosity >= 2);
self.client.execute(drop_view.as_str(), &[]).await?;
Ok(view_outcome)
}
async fn run_query<'r>(
&self,
sql: &'r str,
output: &'r Result<QueryOutput<'_>, &'r str>,
location: Location,
in_transaction: &mut bool,
) -> Result<Outcome<'r>, anyhow::Error> {
let prepare_outcome = self
.prepare_query(sql, output, location.clone(), in_transaction)
.await?;
match prepare_outcome {
PrepareQueryOutcome::QueryPrepared(QueryInfo {
is_select,
num_attributes,
}) => {
let query_outcome = self.execute_query(sql, output, location.clone()).await?;
if is_select && self.auto_index_selects {
let view_outcome = self
.execute_view(sql, None, output, location.clone())
.await?;
if std::mem::discriminant::<Outcome>(&query_outcome)
!= std::mem::discriminant::<Outcome>(&view_outcome)
{
let view_outcome = if num_attributes.is_some() {
self.execute_view(sql, num_attributes, output, location.clone())
.await?
} else {
view_outcome
};
if std::mem::discriminant::<Outcome>(&query_outcome)
!= std::mem::discriminant::<Outcome>(&view_outcome)
{
let inconsistent_view_outcome = Outcome::InconsistentViewOutcome {
query_outcome: Box::new(query_outcome),
view_outcome: Box::new(view_outcome),
location: location.clone(),
};
let outcome = if should_warn(&inconsistent_view_outcome) {
Outcome::Warning {
cause: Box::new(inconsistent_view_outcome),
location: location.clone(),
}
} else {
inconsistent_view_outcome
};
return Ok(outcome);
}
}
}
Ok(query_outcome)
}
PrepareQueryOutcome::Outcome(outcome) => Ok(outcome),
}
}
async fn get_conn(
&mut self,
name: Option<&str>,
user: Option<&str>,
) -> &tokio_postgres::Client {
match name {
None => &self.client,
Some(name) => {
if !self.clients.contains_key(name) {
let addr = if matches!(user, Some("mz_system") | Some("mz_support")) {
self.internal_server_addr
} else {
self.server_addr
};
let client = connect(addr, user).await;
self.clients.insert(name.into(), client);
}
self.clients.get(name).unwrap()
}
}
}
async fn run_simple<'r>(
&mut self,
conn: Option<&'r str>,
user: Option<&'r str>,
sql: &'r str,
output: &'r Output,
location: Location,
) -> Result<Outcome<'r>, anyhow::Error> {
let client = self.get_conn(conn, user).await;
let actual = Output::Values(match client.simple_query(sql).await {
Ok(result) => result
.into_iter()
.filter_map(|m| match m {
SimpleQueryMessage::Row(row) => {
let mut s = vec![];
for i in 0..row.len() {
s.push(row.get(i).unwrap_or("NULL"));
}
Some(s.join(","))
}
SimpleQueryMessage::CommandComplete(count) => {
Some(format!("COMPLETE {}", count))
}
SimpleQueryMessage::RowDescription(_) => None,
_ => panic!("unexpected"),
})
.collect::<Vec<_>>(),
Err(error) => error.to_string().lines().map(|s| s.to_string()).collect(),
});
if *output != actual {
Ok(Outcome::OutputFailure {
expected_output: output,
actual_raw_output: vec![],
actual_output: actual,
location,
})
} else {
Ok(Outcome::Success)
}
}
async fn check_catalog(&self) -> Result<(), anyhow::Error> {
let url = format!(
"http://{}/api/catalog/check",
self.internal_http_server_addr
);
let response: serde_json::Value = reqwest::get(&url).await?.json().await?;
if let Some(inconsistencies) = response.get("err") {
let inconsistencies = serde_json::to_string_pretty(&inconsistencies)
.expect("serializing Value cannot fail");
Err(anyhow::anyhow!("Catalog inconsistency\n{inconsistencies}"))
} else {
Ok(())
}
}
}
async fn connect(addr: SocketAddr, user: Option<&str>) -> tokio_postgres::Client {
let (client, connection) = tokio_postgres::connect(
&format!(
"host={} port={} user={}",
addr.ip(),
addr.port(),
user.unwrap_or("materialize")
),
NoTls,
)
.await
.unwrap();
task::spawn(|| "sqllogictest_connect", async move {
if let Err(e) = connection.await {
eprintln!("connection error: {}", e);
}
});
client
}
pub trait WriteFmt {
fn write_fmt(&self, fmt: fmt::Arguments<'_>);
}
pub struct RunConfig<'a> {
pub stdout: &'a dyn WriteFmt,
pub stderr: &'a dyn WriteFmt,
pub verbosity: usize,
pub postgres_url: String,
pub no_fail: bool,
pub fail_fast: bool,
pub auto_index_tables: bool,
pub auto_index_selects: bool,
pub auto_transactions: bool,
pub enable_table_keys: bool,
pub orchestrator_process_wrapper: Option<String>,
pub tracing: TracingCliArgs,
pub tracing_handle: TracingHandle,
pub system_parameter_defaults: BTreeMap<String, String>,
pub persist_dir: TempDir,
pub replicas: usize,
}
fn print_record(config: &RunConfig<'_>, record: &Record) {
match record {
Record::Statement { sql, .. } | Record::Query { sql, .. } => print_sql(config.stdout, sql),
_ => (),
}
}
fn print_sql_if<'a>(stdout: &'a dyn WriteFmt, sql: &str, cond: bool) {
if cond {
print_sql(stdout, sql)
}
}
fn print_sql<'a>(stdout: &'a dyn WriteFmt, sql: &str) {
writeln!(stdout, "{}", crate::util::indent(sql, 4))
}
const INCONSISTENT_VIEW_OUTCOME_WARNING_REGEXPS: [&str; 9] = [
"cannot materialize call to",
"SHOW commands are not allowed in views",
"cannot create view with unstable dependencies",
"cannot use wildcard expansions or NATURAL JOINs in a view that depends on system objects",
"no schema has been selected to create in",
r#"system schema '\w+' cannot be modified"#,
r#"permission denied for (SCHEMA|CLUSTER) "(\w+\.)?\w+""#,
r#"column "[\w\?]+" specified more than once"#,
r#"column "(\w+\.)?\w+" does not exist"#,
];
fn should_warn(outcome: &Outcome) -> bool {
match outcome {
Outcome::InconsistentViewOutcome {
query_outcome,
view_outcome,
..
} => match (query_outcome.as_ref(), view_outcome.as_ref()) {
(Outcome::Success, Outcome::PlanFailure { error, .. }) => {
INCONSISTENT_VIEW_OUTCOME_WARNING_REGEXPS.iter().any(|s| {
Regex::new(s)
.expect("unexpected error in regular expression parsing")
.is_match(&format!("{:#}", error))
})
}
_ => false,
},
_ => false,
}
}
pub async fn run_string(
runner: &mut Runner<'_>,
source: &str,
input: &str,
) -> Result<Outcomes, anyhow::Error> {
runner.reset_database().await?;
let mut outcomes = Outcomes::default();
let mut parser = crate::parser::Parser::new(source, input);
let mut in_transaction = false;
writeln!(runner.config.stdout, "--- {}", source);
for record in parser.parse_records()? {
if runner.config.verbosity >= 2 {
print_record(runner.config, &record);
}
let outcome = runner
.run_record(&record, &mut in_transaction)
.await
.map_err(|err| format!("In {}:\n{}", source, err))
.unwrap();
if runner.config.verbosity >= 1 && !outcome.success() {
if runner.config.verbosity < 2 {
if !outcome.failure() {
writeln!(
runner.config.stdout,
"{}",
util::indent("Warning detected for: ", 4)
);
}
print_record(runner.config, &record);
}
if runner.config.verbosity >= 2 || outcome.failure() {
writeln!(
runner.config.stdout,
"{}",
util::indent(&outcome.to_string(), 4)
);
writeln!(runner.config.stdout, "{}", util::indent("----", 4));
}
}
outcomes.stats[outcome.code()] += 1;
if outcome.failure() {
outcomes.details.push(format!("{}", outcome));
}
if let Outcome::Bail { .. } = outcome {
break;
}
if runner.config.fail_fast && outcome.failure() {
break;
}
}
Ok(outcomes)
}
pub async fn run_file(runner: &mut Runner<'_>, filename: &Path) -> Result<Outcomes, anyhow::Error> {
let mut input = String::new();
File::open(filename)?.read_to_string(&mut input)?;
let outcomes = run_string(runner, &format!("{}", filename.display()), &input).await?;
runner.check_catalog().await?;
Ok(outcomes)
}
pub async fn rewrite_file(runner: &mut Runner<'_>, filename: &Path) -> Result<(), anyhow::Error> {
runner.reset_database().await?;
let mut file = OpenOptions::new().read(true).write(true).open(filename)?;
let mut input = String::new();
file.read_to_string(&mut input)?;
let mut buf = RewriteBuffer::new(&input);
let mut parser = crate::parser::Parser::new(filename.to_str().unwrap_or(""), &input);
writeln!(runner.config.stdout, "--- {}", filename.display());
let mut in_transaction = false;
fn append_values_output(
buf: &mut RewriteBuffer,
input: &String,
expected_output: &str,
mode: &Mode,
types: &Vec<Type>,
column_names: Option<&Vec<ColumnName>>,
actual_output: &Vec<String>,
) {
buf.append_header(input, expected_output, column_names);
for (i, row) in actual_output.chunks(types.len()).enumerate() {
match mode {
Mode::Cockroach => {
if i != 0 {
buf.append("\n");
}
if row.len() <= 1 {
buf.append(&row.iter().join(" "));
} else {
buf.append(&row.iter().map(|col| col.replace(' ', "␠")).join(" "));
}
}
Mode::Standard => {
for (j, col) in row.iter().enumerate() {
if i != 0 || j != 0 {
buf.append("\n");
}
buf.append(col);
}
}
}
}
}
for record in parser.parse_records()? {
let outcome = runner.run_record(&record, &mut in_transaction).await?;
match (&record, &outcome) {
(
Record::Query {
output:
Ok(QueryOutput {
mode,
output: Output::Values(_),
output_str: expected_output,
types,
column_names,
..
}),
..
},
Outcome::OutputFailure {
actual_output: Output::Values(actual_output),
..
},
) => {
append_values_output(
&mut buf,
&input,
expected_output,
mode,
types,
column_names.as_ref(),
actual_output,
);
}
(
Record::Query {
output:
Ok(QueryOutput {
mode,
output: Output::Values(_),
output_str: expected_output,
types,
..
}),
..
},
Outcome::WrongColumnNames {
actual_column_names,
actual_output: Output::Values(actual_output),
..
},
) => {
append_values_output(
&mut buf,
&input,
expected_output,
mode,
types,
Some(actual_column_names),
actual_output,
);
}
(
Record::Query {
output:
Ok(QueryOutput {
output: Output::Hashed { .. },
output_str: expected_output,
column_names,
..
}),
..
},
Outcome::OutputFailure {
actual_output: Output::Hashed { num_values, md5 },
..
},
) => {
buf.append_header(&input, expected_output, column_names.as_ref());
buf.append(format!("{} values hashing to {}\n", num_values, md5).as_str())
}
(
Record::Simple {
output_str: expected_output,
..
},
Outcome::OutputFailure {
actual_output: Output::Values(actual_output),
..
},
) => {
buf.append_header(&input, expected_output, None);
for (i, row) in actual_output.iter().enumerate() {
if i != 0 {
buf.append("\n");
}
buf.append(row);
}
}
(
Record::Query {
sql,
output: Err(err),
..
},
outcome,
)
| (
Record::Statement {
expected_error: Some(err),
sql,
..
},
outcome,
) if outcome.err_msg().is_some() => {
buf.rewrite_expected_error(&input, err, &outcome.err_msg().unwrap(), sql)
}
(_, Outcome::Success) => {}
_ => bail!("unexpected: {:?} {:?}", record, outcome),
}
}
file.set_len(0)?;
file.seek(SeekFrom::Start(0))?;
file.write_all(buf.finish().as_bytes())?;
file.sync_all()?;
Ok(())
}
#[derive(Debug)]
struct RewriteBuffer<'a> {
input: &'a str,
input_offset: usize,
output: String,
}
impl<'a> RewriteBuffer<'a> {
fn new(input: &'a str) -> RewriteBuffer<'a> {
RewriteBuffer {
input,
input_offset: 0,
output: String::new(),
}
}
fn flush_to(&mut self, offset: usize) {
assert!(offset >= self.input_offset);
let chunk = &self.input[self.input_offset..offset];
self.output.push_str(chunk);
self.input_offset = offset;
}
fn skip_to(&mut self, offset: usize) {
assert!(offset >= self.input_offset);
self.input_offset = offset;
}
fn append(&mut self, s: &str) {
self.output.push_str(s);
}
fn append_header(
&mut self,
input: &String,
expected_output: &str,
column_names: Option<&Vec<ColumnName>>,
) {
#[allow(clippy::as_conversions)]
let offset = expected_output.as_ptr() as usize - input.as_ptr() as usize;
self.flush_to(offset);
self.skip_to(offset + expected_output.len());
if self.peek_last(5) == "\n----" {
self.append("\n");
} else if self.peek_last(6) != "\n----\n" {
self.append("\n----\n");
}
let Some(names) = column_names else {
return;
};
self.append(
&names
.iter()
.map(|name| name.as_str().replace('␠', " "))
.collect::<Vec<_>>()
.join(" "),
);
self.append("\n");
}
fn rewrite_expected_error(
&mut self,
input: &String,
old_err: &str,
new_err: &str,
query: &str,
) {
#[allow(clippy::as_conversions)]
let err_offset = old_err.as_ptr() as usize - input.as_ptr() as usize;
self.flush_to(err_offset);
self.append(new_err);
self.append("\n");
self.append(query);
#[allow(clippy::as_conversions)]
self.skip_to(query.as_ptr() as usize - input.as_ptr() as usize + query.len())
}
fn peek_last(&self, n: usize) -> &str {
&self.output[self.output.len() - n..]
}
fn finish(mut self) -> String {
self.flush_to(self.input.len());
self.output
}
}
fn generate_view_sql(
sql: &str,
view_uuid: &Simple,
num_attributes: Option<usize>,
expected_column_names: Option<Vec<ColumnName>>,
) -> (String, String, String, String) {
let stmts = parser::parse_statements(sql).unwrap_or_default();
assert!(stmts.len() == 1);
let (query, query_as_of) = match &stmts[0].ast {
Statement::Select(stmt) => (&stmt.query, &stmt.as_of),
_ => unreachable!("This function should only be called for SELECTs"),
};
let (view_order_by, extra_columns, distinct) = if num_attributes.is_none() {
(query.order_by.clone(), vec![], None)
} else {
derive_order_by(&query.body, &query.order_by)
};
let name = UnresolvedItemName(vec![Ident::new_unchecked(format!("v{}", view_uuid))]);
let projection = expected_column_names.map_or(
num_attributes.map_or(vec![], |n| {
(1..=n)
.map(|i| Ident::new_unchecked(format!("a{i}")))
.collect()
}),
|cols| {
cols.iter()
.map(|c| Ident::new_unchecked(c.as_str()))
.collect()
},
);
let columns: Vec<Ident> = projection
.iter()
.cloned()
.chain(extra_columns.iter().map(|item| {
if let SelectItem::Expr {
expr: _,
alias: Some(ident),
} = item
{
ident.clone()
} else {
unreachable!("alias must be given for extra column")
}
}))
.collect();
let mut query = query.clone();
if extra_columns.len() > 0 {
match &mut query.body {
SetExpr::Select(stmt) => stmt.projection.extend(extra_columns.iter().cloned()),
_ => unimplemented!("cannot yet rewrite projections of nested queries"),
}
}
let create_view = AstStatement::<Raw>::CreateView(CreateViewStatement {
if_exists: IfExistsBehavior::Error,
temporary: false,
definition: ViewDefinition {
name: name.clone(),
columns: columns.clone(),
query,
},
})
.to_ast_string_stable();
let create_index = AstStatement::<Raw>::CreateIndex(CreateIndexStatement {
name: None,
in_cluster: None,
on_name: RawItemName::Name(name.clone()),
key_parts: if columns.len() == 0 {
None
} else {
Some(
columns
.iter()
.map(|ident| Expr::Identifier(vec![ident.clone()]))
.collect(),
)
},
with_options: Vec::new(),
if_not_exists: false,
})
.to_ast_string_stable();
let distinct_unneeded = extra_columns.len() == 0
|| match distinct {
None | Some(Distinct::On(_)) => true,
Some(Distinct::EntireRow) => false,
};
let distinct = if distinct_unneeded { None } else { distinct };
let view_sql = AstStatement::<Raw>::Select(SelectStatement {
query: Query {
ctes: CteBlock::Simple(vec![]),
body: SetExpr::Select(Box::new(Select {
distinct,
projection: if projection.len() == 0 {
vec![SelectItem::Wildcard]
} else {
projection
.iter()
.map(|ident| SelectItem::Expr {
expr: Expr::Identifier(vec![ident.clone()]),
alias: None,
})
.collect()
},
from: vec![TableWithJoins {
relation: TableFactor::Table {
name: RawItemName::Name(name.clone()),
alias: None,
},
joins: vec![],
}],
selection: None,
group_by: vec![],
having: None,
options: vec![],
})),
order_by: view_order_by,
limit: None,
offset: None,
},
as_of: query_as_of.clone(),
})
.to_ast_string_stable();
let drop_view = AstStatement::<Raw>::DropObjects(DropObjectsStatement {
object_type: ObjectType::View,
if_exists: false,
names: vec![UnresolvedObjectName::Item(name)],
cascade: false,
})
.to_ast_string_stable();
(create_view, create_index, view_sql, drop_view)
}
fn derive_num_attributes(body: &SetExpr<Raw>) -> Option<usize> {
let Some((projection, _)) = find_projection(body) else {
return None;
};
derive_num_attributes_from_projection(projection)
}
fn derive_order_by(
body: &SetExpr<Raw>,
order_by: &Vec<OrderByExpr<Raw>>,
) -> (
Vec<OrderByExpr<Raw>>,
Vec<SelectItem<Raw>>,
Option<Distinct<Raw>>,
) {
let Some((projection, distinct)) = find_projection(body) else {
return (vec![], vec![], None);
};
let (view_order_by, extra_columns) = derive_order_by_from_projection(projection, order_by);
(view_order_by, extra_columns, distinct.clone())
}
fn find_projection(body: &SetExpr<Raw>) -> Option<(&Vec<SelectItem<Raw>>, &Option<Distinct<Raw>>)> {
let mut set_expr = body;
loop {
match set_expr {
SetExpr::Select(select) => {
return Some((&select.projection, &select.distinct));
}
SetExpr::SetOperation { left, .. } => set_expr = left.as_ref(),
SetExpr::Query(query) => set_expr = &query.body,
_ => return None,
}
}
}
fn derive_num_attributes_from_projection(projection: &Vec<SelectItem<Raw>>) -> Option<usize> {
let mut num_attributes = 0usize;
for item in projection.iter() {
let SelectItem::Expr { expr, .. } = item else {
return None;
};
match expr {
Expr::QualifiedWildcard(..) | Expr::WildcardAccess(..) => {
return None;
}
_ => {
num_attributes += 1;
}
}
}
Some(num_attributes)
}
fn derive_order_by_from_projection(
projection: &Vec<SelectItem<Raw>>,
order_by: &Vec<OrderByExpr<Raw>>,
) -> (Vec<OrderByExpr<Raw>>, Vec<SelectItem<Raw>>) {
let mut view_order_by: Vec<OrderByExpr<Raw>> = vec![];
let mut extra_columns: Vec<SelectItem<Raw>> = vec![];
for order_by_expr in order_by.iter() {
let query_expr = &order_by_expr.expr;
let view_expr = match query_expr {
Expr::Value(mz_sql_parser::ast::Value::Number(_)) => query_expr.clone(),
_ => {
if let Some(i) = projection.iter().position(|item| match item {
SelectItem::Expr { expr, alias } => {
expr == query_expr
|| match query_expr {
Expr::Identifier(ident) => {
ident.len() == 1 && Some(&ident[0]) == alias.as_ref()
}
_ => false,
}
}
SelectItem::Wildcard => false,
}) {
Expr::Value(mz_sql_parser::ast::Value::Number((i + 1).to_string()))
} else {
let ident = Ident::new_unchecked(format!(
"a{}",
(projection.len() + extra_columns.len() + 1)
));
extra_columns.push(SelectItem::Expr {
expr: query_expr.clone(),
alias: Some(ident.clone()),
});
Expr::Identifier(vec![ident])
}
}
};
view_order_by.push(OrderByExpr {
expr: view_expr,
asc: order_by_expr.asc,
nulls_last: order_by_expr.nulls_last,
});
}
(view_order_by, extra_columns)
}
fn mutate(sql: &str) -> Vec<String> {
let stmts = parser::parse_statements(sql).unwrap_or_default();
let mut additional = Vec::new();
for stmt in stmts {
match stmt.ast {
AstStatement::CreateTable(stmt) => additional.push(
AstStatement::<Raw>::CreateIndex(CreateIndexStatement {
name: None,
in_cluster: None,
on_name: RawItemName::Name(stmt.name.clone()),
key_parts: Some(
stmt.columns
.iter()
.map(|def| Expr::Identifier(vec![def.name.clone()]))
.collect(),
),
with_options: Vec::new(),
if_not_exists: false,
})
.to_ast_string_stable(),
),
_ => {}
}
}
additional
}
#[mz_ore::test]
#[cfg_attr(miri, ignore)] fn test_generate_view_sql() {
let uuid = Uuid::parse_str("67e5504410b1426f9247bb680e5fe0c8").unwrap();
let cases = vec![
(("SELECT * FROM t", None, None),
(
r#"CREATE VIEW "v67e5504410b1426f9247bb680e5fe0c8" AS SELECT * FROM "t""#.to_string(),
r#"CREATE DEFAULT INDEX ON "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
r#"SELECT * FROM "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
r#"DROP VIEW "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
)),
(("SELECT a, b, c FROM t1, t2", Some(3), Some(vec![ColumnName::from("a"), ColumnName::from("b"), ColumnName::from("c")])),
(
r#"CREATE VIEW "v67e5504410b1426f9247bb680e5fe0c8" ("a", "b", "c") AS SELECT "a", "b", "c" FROM "t1", "t2""#.to_string(),
r#"CREATE INDEX ON "v67e5504410b1426f9247bb680e5fe0c8" ("a", "b", "c")"#.to_string(),
r#"SELECT "a", "b", "c" FROM "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
r#"DROP VIEW "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
)),
(("SELECT a, b, c FROM t1, t2", Some(3), None),
(
r#"CREATE VIEW "v67e5504410b1426f9247bb680e5fe0c8" ("a1", "a2", "a3") AS SELECT "a", "b", "c" FROM "t1", "t2""#.to_string(),
r#"CREATE INDEX ON "v67e5504410b1426f9247bb680e5fe0c8" ("a1", "a2", "a3")"#.to_string(),
r#"SELECT "a1", "a2", "a3" FROM "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
r#"DROP VIEW "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
)),
(("SELECT * FROM (SELECT a, sum(b) AS a FROM t GROUP BY a)", None, None),
(
r#"CREATE VIEW "v67e5504410b1426f9247bb680e5fe0c8" AS SELECT * FROM (SELECT "a", "sum"("b") AS "a" FROM "t" GROUP BY "a")"#.to_string(),
r#"CREATE DEFAULT INDEX ON "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
r#"SELECT * FROM "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
r#"DROP VIEW "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
)),
(("SELECT a, b, b + d AS c, a + b AS d FROM t1, t2 ORDER BY a, c, a + b", Some(4), Some(vec![ColumnName::from("a"), ColumnName::from("b"), ColumnName::from("c"), ColumnName::from("d")])),
(
r#"CREATE VIEW "v67e5504410b1426f9247bb680e5fe0c8" ("a", "b", "c", "d") AS SELECT "a", "b", "b" + "d" AS "c", "a" + "b" AS "d" FROM "t1", "t2" ORDER BY "a", "c", "a" + "b""#.to_string(),
r#"CREATE INDEX ON "v67e5504410b1426f9247bb680e5fe0c8" ("a", "b", "c", "d")"#.to_string(),
r#"SELECT "a", "b", "c", "d" FROM "v67e5504410b1426f9247bb680e5fe0c8" ORDER BY 1, 3, 4"#.to_string(),
r#"DROP VIEW "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
)),
(("((SELECT 1 AS a UNION SELECT 2 AS b) UNION SELECT 3 AS c) ORDER BY a", Some(1), None),
(
r#"CREATE VIEW "v67e5504410b1426f9247bb680e5fe0c8" ("a1") AS (SELECT 1 AS "a" UNION SELECT 2 AS "b") UNION SELECT 3 AS "c" ORDER BY "a""#.to_string(),
r#"CREATE INDEX ON "v67e5504410b1426f9247bb680e5fe0c8" ("a1")"#.to_string(),
r#"SELECT "a1" FROM "v67e5504410b1426f9247bb680e5fe0c8" ORDER BY 1"#.to_string(),
r#"DROP VIEW "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
)),
(("SELECT * FROM (SELECT a, sum(b) AS a FROM t GROUP BY a) ORDER BY 1", None, None),
(
r#"CREATE VIEW "v67e5504410b1426f9247bb680e5fe0c8" AS SELECT * FROM (SELECT "a", "sum"("b") AS "a" FROM "t" GROUP BY "a") ORDER BY 1"#.to_string(),
r#"CREATE DEFAULT INDEX ON "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
r#"SELECT * FROM "v67e5504410b1426f9247bb680e5fe0c8" ORDER BY 1"#.to_string(),
r#"DROP VIEW "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
)),
(("SELECT * FROM (SELECT a, sum(b) AS a FROM t GROUP BY a) ORDER BY a", None, None),
(
r#"CREATE VIEW "v67e5504410b1426f9247bb680e5fe0c8" AS SELECT * FROM (SELECT "a", "sum"("b") AS "a" FROM "t" GROUP BY "a") ORDER BY "a""#.to_string(),
r#"CREATE DEFAULT INDEX ON "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
r#"SELECT * FROM "v67e5504410b1426f9247bb680e5fe0c8" ORDER BY "a""#.to_string(),
r#"DROP VIEW "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
)),
(("SELECT a, sum(b) AS a FROM t GROUP BY a, c ORDER BY a, c", Some(2), None),
(
r#"CREATE VIEW "v67e5504410b1426f9247bb680e5fe0c8" ("a1", "a2", "a3") AS SELECT "a", "sum"("b") AS "a", "c" AS "a3" FROM "t" GROUP BY "a", "c" ORDER BY "a", "c""#.to_string(),
r#"CREATE INDEX ON "v67e5504410b1426f9247bb680e5fe0c8" ("a1", "a2", "a3")"#.to_string(),
r#"SELECT "a1", "a2" FROM "v67e5504410b1426f9247bb680e5fe0c8" ORDER BY 1, "a3""#.to_string(),
r#"DROP VIEW "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
)),
(("SELECT a, sum(b) AS a FROM t GROUP BY a, c ORDER BY c, a", Some(2), None),
(
r#"CREATE VIEW "v67e5504410b1426f9247bb680e5fe0c8" ("a1", "a2", "a3") AS SELECT "a", "sum"("b") AS "a", "c" AS "a3" FROM "t" GROUP BY "a", "c" ORDER BY "c", "a""#.to_string(),
r#"CREATE INDEX ON "v67e5504410b1426f9247bb680e5fe0c8" ("a1", "a2", "a3")"#.to_string(),
r#"SELECT "a1", "a2" FROM "v67e5504410b1426f9247bb680e5fe0c8" ORDER BY "a3", 1"#.to_string(),
r#"DROP VIEW "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
)),
];
for ((sql, num_attributes, expected_column_names), expected) in cases {
let view_sql =
generate_view_sql(sql, uuid.as_simple(), num_attributes, expected_column_names);
assert_eq!(expected, view_sql);
}
}
#[mz_ore::test]
fn test_mutate() {
let cases = vec![
("CREATE TABLE t ()", vec![r#"CREATE INDEX ON "t" ()"#]),
(
"CREATE TABLE t (a INT)",
vec![r#"CREATE INDEX ON "t" ("a")"#],
),
(
"CREATE TABLE t (a INT, b TEXT)",
vec![r#"CREATE INDEX ON "t" ("a", "b")"#],
),
("BAD SYNTAX", Vec::new()),
];
for (sql, expected) in cases {
let stmts = mutate(sql);
assert_eq!(expected, stmts, "sql: {sql}");
}
}