1use std::collections::BTreeMap;
29use std::error::Error;
30use std::fs::{File, OpenOptions};
31use std::io::{Read, Seek, SeekFrom, Write};
32use std::net::{IpAddr, Ipv4Addr, SocketAddr};
33use std::path::Path;
34use std::sync::Arc;
35use std::sync::LazyLock;
36use std::time::Duration;
37use std::{env, fmt, ops, str, thread};
38
39use anyhow::{anyhow, bail};
40use bytes::BytesMut;
41use chrono::{DateTime, NaiveDateTime, NaiveTime, Utc};
42use fallible_iterator::FallibleIterator;
43use futures::sink::SinkExt;
44use itertools::Itertools;
45use maplit::btreemap;
46use md5::{Digest, Md5};
47use mz_adapter_types::bootstrap_builtin_cluster_config::{
48 ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR, BootstrapBuiltinClusterConfig,
49 CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR, PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR,
50 SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR, SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR,
51};
52use mz_catalog::config::ClusterReplicaSizeMap;
53use mz_controller::{ControllerConfig, ReplicaHttpLocator};
54use mz_environmentd::CatalogConfig;
55use mz_license_keys::ValidatedLicenseKey;
56use mz_orchestrator_process::{ProcessOrchestrator, ProcessOrchestratorConfig};
57use mz_orchestrator_tracing::{TracingCliArgs, TracingOrchestrator};
58use mz_ore::cast::{CastFrom, ReinterpretCast};
59use mz_ore::channel::trigger;
60use mz_ore::error::ErrorExt;
61use mz_ore::metrics::MetricsRegistry;
62use mz_ore::now::SYSTEM_TIME;
63use mz_ore::retry::Retry;
64use mz_ore::sql;
65use mz_ore::sql::Sql;
66use mz_ore::task;
67use mz_ore::thread::{JoinHandleExt, JoinOnDropHandle};
68use mz_ore::tracing::TracingHandle;
69use mz_ore::url::SensitiveUrl;
70use mz_persist_client::PersistLocation;
71use mz_persist_client::cache::PersistClientCache;
72use mz_persist_client::cfg::PersistConfig;
73use mz_persist_client::rpc::{
74 MetricsSameProcessPubSubSender, PersistGrpcPubSubServer, PubSubClientConnection, PubSubSender,
75};
76use mz_pgrepr::{Interval, Jsonb, Numeric, UInt2, UInt4, UInt8, Value, oid};
77use mz_repr::ColumnName;
78use mz_repr::adt::date::Date;
79use mz_repr::adt::mz_acl_item::{AclItem, MzAclItem};
80use mz_repr::adt::numeric;
81use mz_secrets::SecretsController;
82use mz_server_core::listeners::{
83 AllowedRoles, AuthenticatorKind, BaseListenerConfig, HttpListenerConfig, HttpRoutesEnabled,
84 ListenersConfig, SqlListenerConfig,
85};
86use mz_sql::ast::{Expr, Raw, Statement};
87use mz_sql::catalog::EnvironmentId;
88use mz_sql_parser::ast::display::AstDisplay;
89use mz_sql_parser::ast::{
90 CreateIndexStatement, CreateViewStatement, CteBlock, Distinct, DropObjectsStatement, Ident,
91 IfExistsBehavior, ObjectType, OrderByExpr, Query, RawItemName, Select, SelectItem,
92 SelectStatement, SetExpr, Statement as AstStatement, TableFactor, TableWithJoins,
93 UnresolvedItemName, UnresolvedObjectName, ViewDefinition,
94};
95use mz_sql_parser::parser;
96use mz_storage_types::connections::ConnectionContext;
97use postgres_protocol::types;
98use regex::Regex;
99use tempfile::TempDir;
100use tokio::net::TcpListener;
101use tokio::runtime::Runtime;
102use tokio::sync::oneshot;
103use tokio_postgres::types::{FromSql, Kind as PgKind, Type as PgType};
104use tokio_postgres::{NoTls, Row, SimpleQueryMessage};
105use tokio_stream::wrappers::TcpListenerStream;
106use tower_http::cors::AllowOrigin;
107use tracing::{error, info};
108use uuid::Uuid;
109use uuid::fmt::Simple;
110
111use crate::ast::{Location, Mode, Output, QueryOutput, Record, Sort, Type};
112use crate::util;
113
/// The result of running one sqllogictest record.
///
/// Every variant except [`Outcome::Success`] records the [`Location`] it is
/// reported at. The `'a` lifetime lets variants borrow the expectations they
/// were checked against (error pattern, column names, output) instead of
/// owning copies.
#[derive(Debug)]
pub enum Outcome<'a> {
    /// An error reported under the `Unsupported` category.
    Unsupported {
        error: anyhow::Error,
        location: Location,
    },
    /// An error reported under the `ParseFailure` category.
    ParseFailure {
        error: anyhow::Error,
        location: Location,
    },
    /// An error reported under the `PlanFailure` category.
    ///
    /// When `expected_error` is present, it is the pattern that the actual
    /// `error` did not match.
    PlanFailure {
        error: anyhow::Error,
        expected_error: Option<String>,
        location: Location,
    },
    /// An error described by `expected_error` was expected, but none was
    /// reported.
    UnexpectedPlanSuccess {
        expected_error: &'a str,
        location: Location,
    },
    /// The reported row count differs from the expected one.
    WrongNumberOfRowsInserted {
        expected_count: u64,
        actual_count: u64,
        location: Location,
    },
    /// The number of result columns differs from the expected one.
    WrongColumnCount {
        expected_count: usize,
        actual_count: usize,
        location: Location,
    },
    /// The result column names differ from the expected ones.
    ///
    /// `actual_output` is carried along but is not included in the
    /// `Display` rendering of this variant.
    WrongColumnNames {
        expected_column_names: &'a Vec<ColumnName>,
        actual_column_names: Vec<ColumnName>,
        actual_output: Output,
        location: Location,
    },
    /// The formatted output differs from the expected output.
    ///
    /// `actual_raw_output` holds the unformatted rows alongside the formatted
    /// `actual_output`.
    OutputFailure {
        expected_output: &'a Output,
        actual_raw_output: Vec<Row>,
        actual_output: Output,
        location: Location,
    },
    /// The outcome obtained from a query and the outcome obtained from the
    /// corresponding indexed view disagree.
    InconsistentViewOutcome {
        query_outcome: Box<Outcome<'a>>,
        view_outcome: Box<Outcome<'a>>,
        location: Location,
    },
    /// Wraps the outcome (`cause`) that is reported as a `Bail`.
    Bail {
        cause: Box<Outcome<'a>>,
        location: Location,
    },
    /// Wraps an outcome (`cause`) that is reported as a warning; warnings are
    /// neither a success nor a failure (see `Outcome::failure`).
    Warning {
        cause: Box<Outcome<'a>>,
        location: Location,
    },
    /// The record behaved as expected.
    Success,
}
170
/// Number of `Outcome` variants; also the length of the per-outcome counter
/// array kept in `Outcomes::stats`.
const NUM_OUTCOMES: usize = 12;
/// Index of the warning counter (10); equals the value `Outcome::code`
/// returns for `Outcome::Warning`.
const WARNING_OUTCOME: usize = NUM_OUTCOMES - 2;
/// Index of the success counter (11); equals the value `Outcome::code`
/// returns for `Outcome::Success`.
const SUCCESS_OUTCOME: usize = NUM_OUTCOMES - 1;
174
175impl<'a> Outcome<'a> {
176 fn code(&self) -> usize {
177 match self {
178 Outcome::Unsupported { .. } => 0,
179 Outcome::ParseFailure { .. } => 1,
180 Outcome::PlanFailure { .. } => 2,
181 Outcome::UnexpectedPlanSuccess { .. } => 3,
182 Outcome::WrongNumberOfRowsInserted { .. } => 4,
183 Outcome::WrongColumnCount { .. } => 5,
184 Outcome::WrongColumnNames { .. } => 6,
185 Outcome::OutputFailure { .. } => 7,
186 Outcome::InconsistentViewOutcome { .. } => 8,
187 Outcome::Bail { .. } => 9,
188 Outcome::Warning { .. } => 10,
189 Outcome::Success => 11,
190 }
191 }
192
193 fn success(&self) -> bool {
194 matches!(self, Outcome::Success)
195 }
196
197 fn failure(&self) -> bool {
198 !matches!(self, Outcome::Success) && !matches!(self, Outcome::Warning { .. })
199 }
200
201 fn err_msg(&self) -> Option<String> {
205 match self {
206 Outcome::Unsupported { error, .. }
207 | Outcome::ParseFailure { error, .. }
208 | Outcome::PlanFailure { error, .. } => {
209 let err_str = error.to_string_with_causes();
212 let err_str = err_str.split('\n').next().unwrap();
213 let err_str = err_str.strip_prefix("db error: ERROR: ").unwrap_or(err_str);
216 Some(regex::escape(err_str).replace(r"\#", "#"))
224 }
225 _ => None,
226 }
227 }
228}
229
230impl fmt::Display for Outcome<'_> {
231 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
232 use Outcome::*;
233 const INDENT: &str = "\n ";
234 match self {
235 Unsupported { error, location } => write!(
236 f,
237 "Unsupported:{}:\n{}",
238 location,
239 error.display_with_causes()
240 ),
241 ParseFailure { error, location } => {
242 write!(
243 f,
244 "ParseFailure:{}:\n{}",
245 location,
246 error.display_with_causes()
247 )
248 }
249 PlanFailure {
250 error,
251 expected_error,
252 location,
253 } => {
254 if let Some(expected_error) = expected_error {
255 write!(
256 f,
257 "PlanFailure:{}:\nerror does not match expected pattern:\n expected: /{}/\n actual: {}",
258 location,
259 expected_error,
260 error.display_with_causes()
261 )
262 } else {
263 write!(f, "PlanFailure:{}:\n{:#}", location, error)
264 }
265 }
266 UnexpectedPlanSuccess {
267 expected_error,
268 location,
269 } => write!(
270 f,
271 "UnexpectedPlanSuccess:{} expected error: {}",
272 location, expected_error
273 ),
274 WrongNumberOfRowsInserted {
275 expected_count,
276 actual_count,
277 location,
278 } => write!(
279 f,
280 "WrongNumberOfRowsInserted:{}{}expected: {}{}actually: {}",
281 location, INDENT, expected_count, INDENT, actual_count
282 ),
283 WrongColumnCount {
284 expected_count,
285 actual_count,
286 location,
287 } => write!(
288 f,
289 "WrongColumnCount:{}{}expected: {}{}actually: {}",
290 location, INDENT, expected_count, INDENT, actual_count
291 ),
292 WrongColumnNames {
293 expected_column_names,
294 actual_column_names,
295 actual_output: _,
296 location,
297 } => write!(
298 f,
299 "Wrong Column Names:{}:{}expected column names: {}{}inferred column names: {}",
300 location,
301 INDENT,
302 expected_column_names
303 .iter()
304 .map(|n| n.to_string())
305 .collect::<Vec<_>>()
306 .join(" "),
307 INDENT,
308 actual_column_names
309 .iter()
310 .map(|n| n.to_string())
311 .collect::<Vec<_>>()
312 .join(" ")
313 ),
314 OutputFailure {
315 expected_output,
316 actual_raw_output,
317 actual_output,
318 location,
319 } => write!(
320 f,
321 "OutputFailure:{}{}expected: {:?}{}actually: {:?}{}actual raw: {:?}",
322 location, INDENT, expected_output, INDENT, actual_output, INDENT, actual_raw_output
323 ),
324 InconsistentViewOutcome {
325 query_outcome,
326 view_outcome,
327 location,
328 } => write!(
329 f,
330 "InconsistentViewOutcome:{}{}expected from query: {}{}actually from indexed view: {}",
331 location, INDENT, query_outcome, INDENT, view_outcome
332 ),
333 Bail { cause, location } => write!(f, "Bail:{} {}", location, cause),
334 Warning { cause, location } => write!(f, "Warning:{} {}", location, cause),
335 Success => f.write_str("Success"),
336 }
337 }
338}
339
/// Aggregated results of a run: one counter per `Outcome` variant plus a list
/// of detail strings.
#[derive(Default, Debug)]
pub struct Outcomes {
    /// Per-outcome counters, indexed by the value returned by `Outcome::code`.
    stats: [usize; NUM_OUTCOMES],
    /// Detail lines printed by `OutcomesDisplay` when failure details are
    /// requested.
    details: Vec<String>,
}
345
346impl ops::AddAssign<Outcomes> for Outcomes {
347 fn add_assign(&mut self, rhs: Outcomes) {
348 for (lhs, rhs) in self.stats.iter_mut().zip_eq(rhs.stats.iter()) {
349 *lhs += rhs
350 }
351 }
352}
353impl Outcomes {
354 pub fn any_failed(&self) -> bool {
355 self.stats[SUCCESS_OUTCOME] + self.stats[WARNING_OUTCOME] < self.stats.iter().sum::<usize>()
356 }
357
358 pub fn as_json(&self) -> serde_json::Value {
359 serde_json::json!({
360 "unsupported": self.stats[0],
361 "parse_failure": self.stats[1],
362 "plan_failure": self.stats[2],
363 "unexpected_plan_success": self.stats[3],
364 "wrong_number_of_rows_affected": self.stats[4],
365 "wrong_column_count": self.stats[5],
366 "wrong_column_names": self.stats[6],
367 "output_failure": self.stats[7],
368 "inconsistent_view_outcome": self.stats[8],
369 "bail": self.stats[9],
370 "warning": self.stats[10],
371 "success": self.stats[11],
372 })
373 }
374
375 pub fn display(&self, no_fail: bool, failure_details: bool) -> OutcomesDisplay<'_> {
376 OutcomesDisplay {
377 inner: self,
378 no_fail,
379 failure_details,
380 }
381 }
382}
383
/// Display adapter for [`Outcomes`], created by `Outcomes::display`.
pub struct OutcomesDisplay<'a> {
    /// The outcomes being rendered.
    inner: &'a Outcomes,
    /// When set, a failing summary is labelled `FAIL-IGNORE` instead of
    /// `FAIL`, and detail lines are printed even if nothing failed.
    no_fail: bool,
    /// When set, detail lines are printed instead of the summary line,
    /// provided something failed or `no_fail` is set.
    failure_details: bool,
}
389
390impl<'a> fmt::Display for OutcomesDisplay<'a> {
391 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
392 let total: usize = self.inner.stats.iter().sum();
393 if self.failure_details
394 && (self.inner.stats[SUCCESS_OUTCOME] + self.inner.stats[WARNING_OUTCOME] != total
395 || self.no_fail)
396 {
397 for outcome in &self.inner.details {
398 writeln!(f, "{}", outcome)?;
399 }
400 Ok(())
401 } else {
402 write!(
403 f,
404 "{}:",
405 if self.inner.stats[SUCCESS_OUTCOME] + self.inner.stats[WARNING_OUTCOME] == total {
406 "PASS"
407 } else if self.no_fail {
408 "FAIL-IGNORE"
409 } else {
410 "FAIL"
411 }
412 )?;
413 static NAMES: LazyLock<Vec<&'static str>> = LazyLock::new(|| {
414 vec![
415 "unsupported",
416 "parse-failure",
417 "plan-failure",
418 "unexpected-plan-success",
419 "wrong-number-of-rows-inserted",
420 "wrong-column-count",
421 "wrong-column-names",
422 "output-failure",
423 "inconsistent-view-outcome",
424 "bail",
425 "warning",
426 "success",
427 "total",
428 ]
429 });
430 for (i, n) in self.inner.stats.iter().enumerate() {
431 if *n > 0 {
432 write!(f, " {}={}", NAMES[i], n)?;
433 }
434 }
435 write!(f, " total={}", total)
436 }
437 }
438}
439
/// Facts gathered about a query before it is run.
struct QueryInfo {
    /// Whether the statement is a `SELECT` (NOTE(review): inferred from the
    /// field name; confirm against where it is populated).
    is_select: bool,
    /// Number of output attributes, when known (NOTE(review): presumably the
    /// projected column count; confirm against where it is populated).
    num_attributes: Option<usize>,
    /// Whether the query carries an `AS OF` clause (NOTE(review): inferred
    /// from the field name; confirm against where it is populated).
    has_as_of: bool,
}
450
/// Result of preparing a query: either the gathered [`QueryInfo`], or an
/// [`Outcome`] that was already determined during preparation.
enum PrepareQueryOutcome<'a> {
    /// Preparation succeeded and produced information about the query.
    QueryPrepared(QueryInfo),
    /// Preparation already settled the record's outcome.
    Outcome(Outcome<'a>),
}
455
/// Drives sqllogictest records against a server instance held in `inner`.
pub struct Runner<'a> {
    /// Configuration used every time the inner runner is (re)started.
    config: &'a RunConfig<'a>,
    /// The running server state; replaced wholesale by `Runner::reset`.
    inner: Option<RunnerInner<'a>>,
    /// Regex/replacement pairs accumulated from `Record::Replace` records and
    /// handed to the inner runner with every record it runs.
    replacements: Vec<(Regex, String)>,
}
464
/// State of one started server plus the client connections used to talk to it.
pub struct RunnerInner<'a> {
    /// Address that `client` connects to.
    server_addr: SocketAddr,
    /// Address that `system_client` connects to (as user `mz_system`).
    internal_server_addr: SocketAddr,
    /// NOTE(review): presumably the address of the password-authenticated SQL
    /// listener; confirm against `RunnerInner::start`.
    password_server_addr: SocketAddr,
    /// NOTE(review): presumably the address of the internal HTTP listener;
    /// confirm against `RunnerInner::start`.
    internal_http_server_addr: SocketAddr,
    /// Default client connection.
    client: tokio_postgres::Client,
    /// Client connection used for administrative statements in
    /// `Runner::reset_database`.
    system_client: tokio_postgres::Client,
    /// Additional client connections keyed by name; cleared by
    /// `Runner::reset_database`.
    clients: BTreeMap<String, tokio_postgres::Client>,
    /// NOTE(review): behavior flags, presumably copied from the run
    /// configuration; their exact effect is not visible in this part of the file.
    auto_index_tables: bool,
    auto_index_selects: bool,
    auto_transactions: bool,
    /// When set, `Runner::reset_database` enables `unsafe_enable_table_keys`.
    enable_table_keys: bool,
    verbose: bool,
    /// Sink for the runner's textual output.
    stdout: &'a dyn WriteFmt,
    /// The underscore-prefixed fields below are never read by name;
    /// presumably they are held only so they are dropped together with the
    /// runner (TODO confirm).
    _shutdown_trigger: trigger::Trigger,
    _server_thread: JoinOnDropHandle<()>,
    _temp_dir: TempDir,
}
484
/// Newtype around [`Value`] with its own `FromSql` implementation, used to
/// decode result columns.
#[derive(Debug)]
pub struct Slt(Value);
487
488impl<'a> FromSql<'a> for Slt {
489 fn from_sql(
490 ty: &PgType,
491 mut raw: &'a [u8],
492 ) -> Result<Self, Box<dyn Error + 'static + Send + Sync>> {
493 Ok(match *ty {
494 PgType::ACLITEM => Self(Value::AclItem(AclItem::decode_binary(
495 types::bytea_from_sql(raw),
496 )?)),
497 PgType::BOOL => Self(Value::Bool(types::bool_from_sql(raw)?)),
498 PgType::BYTEA => Self(Value::Bytea(types::bytea_from_sql(raw).to_vec())),
499 PgType::CHAR => Self(Value::Char(u8::from_be_bytes(
500 types::char_from_sql(raw)?.to_be_bytes(),
501 ))),
502 PgType::FLOAT4 => Self(Value::Float4(types::float4_from_sql(raw)?)),
503 PgType::FLOAT8 => Self(Value::Float8(types::float8_from_sql(raw)?)),
504 PgType::DATE => Self(Value::Date(Date::from_pg_epoch(types::int4_from_sql(
505 raw,
506 )?)?)),
507 PgType::INT2 => Self(Value::Int2(types::int2_from_sql(raw)?)),
508 PgType::INT4 => Self(Value::Int4(types::int4_from_sql(raw)?)),
509 PgType::INT8 => Self(Value::Int8(types::int8_from_sql(raw)?)),
510 PgType::INTERVAL => Self(Value::Interval(Interval::from_sql(ty, raw)?)),
511 PgType::JSONB => Self(Value::Jsonb(Jsonb::from_sql(ty, raw)?)),
512 PgType::NAME => Self(Value::Name(types::text_from_sql(raw)?.to_string())),
513 PgType::NUMERIC => Self(Value::Numeric(Numeric::from_sql(ty, raw)?)),
514 PgType::OID => Self(Value::Oid(types::oid_from_sql(raw)?)),
515 PgType::REGCLASS => Self(Value::Oid(types::oid_from_sql(raw)?)),
516 PgType::REGPROC => Self(Value::Oid(types::oid_from_sql(raw)?)),
517 PgType::REGTYPE => Self(Value::Oid(types::oid_from_sql(raw)?)),
518 PgType::TEXT | PgType::BPCHAR | PgType::VARCHAR => {
519 Self(Value::Text(types::text_from_sql(raw)?.to_string()))
520 }
521 PgType::TIME => Self(Value::Time(NaiveTime::from_sql(ty, raw)?)),
522 PgType::TIMESTAMP => Self(Value::Timestamp(
523 NaiveDateTime::from_sql(ty, raw)?.try_into()?,
524 )),
525 PgType::TIMESTAMPTZ => Self(Value::TimestampTz(
526 DateTime::<Utc>::from_sql(ty, raw)?.try_into()?,
527 )),
528 PgType::UUID => Self(Value::Uuid(Uuid::from_sql(ty, raw)?)),
529 PgType::RECORD => {
530 let num_fields = read_be_i32(&mut raw)?;
531 let mut tuple = vec![];
532 for _ in 0..num_fields {
533 let oid = u32::reinterpret_cast(read_be_i32(&mut raw)?);
534 let typ = match PgType::from_oid(oid) {
535 Some(typ) => typ,
536 None => return Err("unknown oid".into()),
537 };
538 let v = read_value::<Option<Slt>>(&typ, &mut raw)?;
539 tuple.push(v.map(|v| v.0));
540 }
541 Self(Value::Record(tuple))
542 }
543 PgType::INT4_RANGE
544 | PgType::INT8_RANGE
545 | PgType::DATE_RANGE
546 | PgType::NUM_RANGE
547 | PgType::TS_RANGE
548 | PgType::TSTZ_RANGE => {
549 use mz_repr::adt::range::Range;
550 let range: Range<Slt> = Range::from_sql(ty, raw)?;
551 Self(Value::Range(range.into_bounds(|b| Box::new(b.0))))
552 }
553
554 _ => match ty.kind() {
555 PgKind::Array(arr_type) => {
556 let arr = types::array_from_sql(raw)?;
557 let elements: Vec<Option<Value>> = arr
558 .values()
559 .map(|v| match v {
560 Some(v) => Ok(Some(Slt::from_sql(arr_type, v)?)),
561 None => Ok(None),
562 })
563 .collect::<Vec<Option<Slt>>>()?
564 .into_iter()
565 .map(|v| v.map(|v| v.0))
567 .collect();
568
569 Self(Value::Array {
570 dims: arr
571 .dimensions()
572 .map(|d| {
573 Ok(mz_repr::adt::array::ArrayDimension {
574 lower_bound: isize::cast_from(d.lower_bound),
575 length: usize::try_from(d.len)
576 .expect("cannot have negative length"),
577 })
578 })
579 .collect()?,
580 elements,
581 })
582 }
583 _ => match ty.oid() {
584 oid::TYPE_UINT2_OID => Self(Value::UInt2(UInt2::from_sql(ty, raw)?)),
585 oid::TYPE_UINT4_OID => Self(Value::UInt4(UInt4::from_sql(ty, raw)?)),
586 oid::TYPE_UINT8_OID => Self(Value::UInt8(UInt8::from_sql(ty, raw)?)),
587 oid::TYPE_MZ_TIMESTAMP_OID => {
588 let s = types::text_from_sql(raw)?;
589 let t: mz_repr::Timestamp = s.parse()?;
590 Self(Value::MzTimestamp(t))
591 }
592 oid::TYPE_MZ_ACL_ITEM_OID => Self(Value::MzAclItem(MzAclItem::decode_binary(
593 types::bytea_from_sql(raw),
594 )?)),
595 _ => unreachable!(),
596 },
597 },
598 })
599 }
600 fn accepts(ty: &PgType) -> bool {
601 match ty.kind() {
602 PgKind::Array(_) | PgKind::Composite(_) => return true,
603 _ => {}
604 }
605 match ty.oid() {
606 oid::TYPE_UINT2_OID
607 | oid::TYPE_UINT4_OID
608 | oid::TYPE_UINT8_OID
609 | oid::TYPE_MZ_TIMESTAMP_OID
610 | oid::TYPE_MZ_ACL_ITEM_OID => return true,
611 _ => {}
612 }
613 matches!(
614 *ty,
615 PgType::ACLITEM
616 | PgType::BOOL
617 | PgType::BYTEA
618 | PgType::CHAR
619 | PgType::DATE
620 | PgType::FLOAT4
621 | PgType::FLOAT8
622 | PgType::INT2
623 | PgType::INT4
624 | PgType::INT8
625 | PgType::INTERVAL
626 | PgType::JSONB
627 | PgType::NAME
628 | PgType::NUMERIC
629 | PgType::OID
630 | PgType::REGCLASS
631 | PgType::REGPROC
632 | PgType::REGTYPE
633 | PgType::RECORD
634 | PgType::TEXT
635 | PgType::BPCHAR
636 | PgType::VARCHAR
637 | PgType::TIME
638 | PgType::TIMESTAMP
639 | PgType::TIMESTAMPTZ
640 | PgType::UUID
641 | PgType::INT4_RANGE
642 | PgType::INT4_RANGE_ARRAY
643 | PgType::INT8_RANGE
644 | PgType::INT8_RANGE_ARRAY
645 | PgType::DATE_RANGE
646 | PgType::DATE_RANGE_ARRAY
647 | PgType::NUM_RANGE
648 | PgType::NUM_RANGE_ARRAY
649 | PgType::TS_RANGE
650 | PgType::TS_RANGE_ARRAY
651 | PgType::TSTZ_RANGE
652 | PgType::TSTZ_RANGE_ARRAY
653 )
654 }
655}
656
/// Reads a big-endian `i32` from the front of `buf` and advances `buf` past
/// the four bytes consumed.
///
/// # Errors
///
/// Returns an error, leaving `buf` untouched, when fewer than four bytes
/// remain.
fn read_be_i32(buf: &mut &[u8]) -> Result<i32, Box<dyn Error + Sync + Send>> {
    let remaining: &[u8] = *buf;
    match remaining {
        [b0, b1, b2, b3, rest @ ..] => {
            *buf = rest;
            Ok(i32::from_be_bytes([*b0, *b1, *b2, *b3]))
        }
        _ => Err("invalid buffer size".into()),
    }
}
667
668fn read_value<'a, T>(type_: &PgType, buf: &mut &'a [u8]) -> Result<T, Box<dyn Error + Sync + Send>>
670where
671 T: FromSql<'a>,
672{
673 let value = match usize::try_from(read_be_i32(buf)?) {
674 Err(_) => None,
675 Ok(len) => {
676 if len > buf.len() {
677 return Err("invalid buffer size".into());
678 }
679 let (head, tail) = buf.split_at(len);
680 *buf = tail;
681 Some(head)
682 }
683 };
684 T::from_sql_nullable(type_, value)
685}
686
687fn format_datum(d: Slt, typ: &Type, mode: Mode, col: usize) -> String {
688 match (typ, d.0) {
689 (Type::Bool, Value::Bool(b)) => b.to_string(),
690
691 (Type::Integer, Value::Int2(i)) => i.to_string(),
692 (Type::Integer, Value::Int4(i)) => i.to_string(),
693 (Type::Integer, Value::Int8(i)) => i.to_string(),
694 (Type::Integer, Value::UInt2(u)) => u.0.to_string(),
695 (Type::Integer, Value::UInt4(u)) => u.0.to_string(),
696 (Type::Integer, Value::UInt8(u)) => u.0.to_string(),
697 (Type::Integer, Value::Oid(i)) => i.to_string(),
698 #[allow(clippy::as_conversions)]
700 (Type::Integer, Value::Float4(f)) => format!("{}", f as i64),
701 #[allow(clippy::as_conversions)]
703 (Type::Integer, Value::Float8(f)) => format!("{}", f as i64),
704 (Type::Integer, Value::Text(_)) => "0".to_string(),
706 (Type::Integer, Value::Bool(b)) => i8::from(b).to_string(),
707 (Type::Integer, Value::Numeric(d)) => {
708 let mut d = d.0.0.clone();
709 let mut cx = numeric::cx_datum();
710 if mode == Mode::Standard {
712 cx.set_rounding(dec::Rounding::Down);
713 }
714 cx.round(&mut d);
715 numeric::munge_numeric(&mut d).unwrap();
716 d.to_standard_notation_string()
717 }
718
719 (Type::Real, Value::Int2(i)) => format!("{:.3}", i),
720 (Type::Real, Value::Int4(i)) => format!("{:.3}", i),
721 (Type::Real, Value::Int8(i)) => format!("{:.3}", i),
722 (Type::Real, Value::Float4(f)) => match mode {
723 Mode::Standard => format!("{:.3}", f),
724 Mode::Cockroach => format!("{}", f),
725 },
726 (Type::Real, Value::Float8(f)) => match mode {
727 Mode::Standard => format!("{:.3}", f),
728 Mode::Cockroach => format!("{}", f),
729 },
730 (Type::Real, Value::Numeric(d)) => match mode {
731 Mode::Standard => {
732 let mut d = d.0.0.clone();
733 if d.exponent() < -3 {
734 numeric::rescale(&mut d, 3).unwrap();
735 }
736 numeric::munge_numeric(&mut d).unwrap();
737 d.to_standard_notation_string()
738 }
739 Mode::Cockroach => d.0.0.to_standard_notation_string(),
740 },
741
742 (Type::Text, Value::Text(s)) => {
743 if s.is_empty() {
744 "(empty)".to_string()
745 } else {
746 s
747 }
748 }
749 (Type::Text, Value::Bool(b)) => b.to_string(),
750 (Type::Text, Value::Float4(f)) => format!("{:.3}", f),
751 (Type::Text, Value::Float8(f)) => format!("{:.3}", f),
752 (Type::Text, Value::Bytea(b)) => match str::from_utf8(&b) {
758 Ok(s) => s.to_string(),
759 Err(_) => format!("{:?}", b),
760 },
761 (Type::Text, Value::Numeric(d)) => d.0.0.to_standard_notation_string(),
762 (Type::Text, d) => {
765 let mut buf = BytesMut::new();
766 d.encode_text(&mut buf);
767 String::from_utf8_lossy(&buf).into_owned()
768 }
769
770 (Type::Oid, Value::Oid(o)) => o.to_string(),
771
772 (_, d) => panic!(
773 "Don't know how to format {:?} as {:?} in column {}",
774 d, typ, col,
775 ),
776 }
777}
778
779fn format_row(row: &Row, types: &[Type], mode: Mode) -> Vec<String> {
780 let mut formatted: Vec<String> = vec![];
781 for i in 0..row.len() {
782 let t: Option<Slt> = row.get::<usize, Option<Slt>>(i);
783 let t: Option<String> = t.map(|d| format_datum(d, &types[i], mode, i));
784 formatted.push(match t {
785 Some(t) => t,
786 None => "NULL".into(),
787 });
788 }
789
790 formatted
791}
792
793impl<'a> Runner<'a> {
794 pub async fn start(config: &'a RunConfig<'a>) -> Result<Runner<'a>, anyhow::Error> {
795 let mut runner = Self {
796 config,
797 inner: None,
798 replacements: Vec::new(),
799 };
800 runner.reset().await?;
801 Ok(runner)
802 }
803
804 pub async fn reset(&mut self) -> Result<(), anyhow::Error> {
805 drop(self.inner.take());
808 self.inner = Some(RunnerInner::start(self.config).await?);
809
810 Ok(())
811 }
812
813 async fn run_record<'r>(
814 &mut self,
815 record: &'r Record<'r>,
816 in_transaction: &mut bool,
817 ) -> Result<Outcome<'r>, anyhow::Error> {
818 if let Record::ResetServer = record {
819 self.reset().await?;
820 Ok(Outcome::Success)
821 } else if let Record::Replace {
822 pattern,
823 replacement,
824 } = record
825 {
826 let regex = Regex::new(pattern).expect("replace regex validated by parser");
828 self.replacements.push((regex, replacement.clone()));
829 Ok(Outcome::Success)
830 } else {
831 self.inner
832 .as_mut()
833 .expect("RunnerInner missing")
834 .run_record(record, in_transaction, &self.replacements)
835 .await
836 }
837 }
838
839 async fn check_catalog(&self) -> Result<(), anyhow::Error> {
840 self.inner
841 .as_ref()
842 .expect("RunnerInner missing")
843 .check_catalog()
844 .await
845 }
846
847 #[allow(clippy::disallowed_methods)]
848 async fn reset_database(&mut self) -> Result<(), anyhow::Error> {
849 let inner = self.inner.as_mut().expect("RunnerInner missing");
850
851 inner.client.batch_execute("ROLLBACK;").await?;
852
853 inner
854 .system_client
855 .batch_execute(
856 "ROLLBACK;
857 SET cluster = mz_catalog_server;
858 RESET cluster_replica;",
859 )
860 .await?;
861
862 inner
863 .system_client
864 .batch_execute("ALTER SYSTEM RESET ALL")
865 .await?;
866
867 for row in inner
869 .system_client
870 .query("SELECT name FROM mz_databases", &[])
871 .await?
872 {
873 let name: &str = row.get("name");
874 inner
875 .system_client
876 .batch_execute(sql!("DROP DATABASE {}", Sql::ident(name)).as_str())
877 .await?;
878 }
879 inner
880 .system_client
881 .batch_execute("CREATE DATABASE materialize")
882 .await?;
883
884 let mut needs_default_cluster = true;
888 for row in inner
889 .system_client
890 .query("SELECT name FROM mz_clusters WHERE id LIKE 'u%'", &[])
891 .await?
892 {
893 match row.get("name") {
894 "quickstart" => needs_default_cluster = false,
895 name => {
896 inner
897 .system_client
898 .batch_execute(sql!("DROP CLUSTER {}", Sql::ident(name)).as_str())
899 .await?
900 }
901 }
902 }
903 if needs_default_cluster {
904 inner
905 .system_client
906 .batch_execute("CREATE CLUSTER quickstart REPLICAS ()")
907 .await?;
908 }
909 let mut needs_default_replica = false;
910 let rows = inner
911 .system_client
912 .query(
913 "SELECT name, size FROM mz_cluster_replicas
914 WHERE cluster_id = (SELECT id FROM mz_clusters WHERE name = 'quickstart')
915 ORDER BY name",
916 &[],
917 )
918 .await?;
919 if rows.len() != self.config.replicas {
920 needs_default_replica = true;
921 } else {
922 for (i, row) in rows.iter().enumerate() {
923 let name: &str = row.get("name");
924 let size: &str = row.get("size");
925 if name != format!("r{}", i + 1) || size != self.config.replica_size {
926 needs_default_replica = true;
927 break;
928 }
929 }
930 }
931
932 if needs_default_replica {
933 inner
934 .system_client
935 .batch_execute("ALTER CLUSTER quickstart SET (MANAGED = false)")
936 .await?;
937 for row in inner
938 .system_client
939 .query(
940 "SELECT name FROM mz_cluster_replicas
941 WHERE cluster_id = (SELECT id FROM mz_clusters WHERE name = 'quickstart')",
942 &[],
943 )
944 .await?
945 {
946 let name: &str = row.get("name");
947 inner
948 .system_client
949 .batch_execute(
950 sql!("DROP CLUSTER REPLICA quickstart.{}", Sql::ident(name)).as_str(),
951 )
952 .await?;
953 }
954 for i in 1..=self.config.replicas {
955 inner
956 .system_client
957 .batch_execute(
958 sql!(
959 "CREATE CLUSTER REPLICA quickstart.r{} SIZE {}",
960 i,
961 Sql::literal(&self.config.replica_size)
962 )
963 .as_str(),
964 )
965 .await?;
966 }
967 inner
968 .system_client
969 .batch_execute("ALTER CLUSTER quickstart SET (MANAGED = true)")
970 .await?;
971 }
972
973 inner
975 .system_client
976 .batch_execute("GRANT USAGE ON DATABASE materialize TO PUBLIC")
977 .await?;
978 inner
979 .system_client
980 .batch_execute("GRANT CREATE ON DATABASE materialize TO materialize")
981 .await?;
982 inner
983 .system_client
984 .batch_execute("GRANT CREATE ON SCHEMA materialize.public TO materialize")
985 .await?;
986 inner
987 .system_client
988 .batch_execute("GRANT USAGE ON CLUSTER quickstart TO PUBLIC")
989 .await?;
990 inner
991 .system_client
992 .batch_execute("GRANT CREATE ON CLUSTER quickstart TO materialize")
993 .await?;
994
995 inner
998 .system_client
999 .simple_query("ALTER SYSTEM SET max_tables = 100")
1000 .await?;
1001
1002 if inner.enable_table_keys {
1003 inner
1004 .system_client
1005 .simple_query("ALTER SYSTEM SET unsafe_enable_table_keys = true")
1006 .await?;
1007 }
1008
1009 inner.ensure_fixed_features().await?;
1010
1011 inner.client = connect(inner.server_addr, None, None).await.unwrap();
1012 inner.system_client = connect(inner.internal_server_addr, Some("mz_system"), None)
1013 .await
1014 .unwrap();
1015 inner.clients = BTreeMap::new();
1016
1017 Ok(())
1018 }
1019}
1020
1021impl<'a> RunnerInner<'a> {
1022 pub async fn start(config: &RunConfig<'a>) -> Result<RunnerInner<'a>, anyhow::Error> {
1023 let temp_dir = tempfile::tempdir()?;
1024 let scratch_dir = tempfile::tempdir()?;
1025 let environment_id = EnvironmentId::for_tests();
1026 let (consensus_uri, timestamp_oracle_url): (SensitiveUrl, SensitiveUrl) = {
1027 let postgres_url = &config.postgres_url;
1028 let prefix = &config.prefix;
1029 info!(%postgres_url, "starting server");
1030 let (client, conn) = Retry::default()
1031 .max_tries(5)
1032 .retry_async(|_| async {
1033 match tokio_postgres::connect(postgres_url, NoTls).await {
1034 Ok(c) => Ok(c),
1035 Err(e) => {
1036 error!(%e, "failed to connect to postgres");
1037 Err(e)
1038 }
1039 }
1040 })
1041 .await?;
1042 task::spawn(|| "sqllogictest_connect", async move {
1043 if let Err(e) = conn.await {
1044 panic!("connection error: {}", e);
1045 }
1046 });
1047 #[allow(clippy::disallowed_methods)]
1050 client
1051 .batch_execute(&format!(
1052 "DROP SCHEMA IF EXISTS {prefix}_tsoracle CASCADE;
1053 CREATE SCHEMA IF NOT EXISTS {prefix}_consensus;
1054 CREATE SCHEMA {prefix}_tsoracle;"
1055 ))
1056 .await?;
1057 (
1058 format!("{postgres_url}?options=--search_path={prefix}_consensus")
1059 .parse()
1060 .expect("invalid consensus URI"),
1061 format!("{postgres_url}?options=--search_path={prefix}_tsoracle")
1062 .parse()
1063 .expect("invalid timestamp oracle URI"),
1064 )
1065 };
1066
1067 let secrets_dir = temp_dir.path().join("secrets");
1068 let orchestrator = Arc::new(
1069 ProcessOrchestrator::new(ProcessOrchestratorConfig {
1070 image_dir: env::current_exe()?.parent().unwrap().to_path_buf(),
1071 suppress_output: false,
1072 environment_id: environment_id.to_string(),
1073 secrets_dir: secrets_dir.clone(),
1074 command_wrapper: config
1075 .orchestrator_process_wrapper
1076 .as_ref()
1077 .map_or(Ok(vec![]), |s| shell_words::split(s))?,
1078 propagate_crashes: true,
1079 tcp_proxy: None,
1080 scratch_directory: scratch_dir.path().to_path_buf(),
1081 })
1082 .await?,
1083 );
1084 let now = SYSTEM_TIME.clone();
1085 let metrics_registry = MetricsRegistry::new();
1086
1087 let persist_config = PersistConfig::new(
1088 &mz_environmentd::BUILD_INFO,
1089 now.clone(),
1090 mz_dyncfgs::all_dyncfgs(),
1091 );
1092 let persist_pubsub_server =
1093 PersistGrpcPubSubServer::new(&persist_config, &metrics_registry);
1094 let persist_pubsub_client = persist_pubsub_server.new_same_process_connection();
1095 let persist_pubsub_tcp_listener =
1096 TcpListener::bind(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0))
1097 .await
1098 .expect("pubsub addr binding");
1099 let persist_pubsub_server_port = persist_pubsub_tcp_listener
1100 .local_addr()
1101 .expect("pubsub addr has local addr")
1102 .port();
1103 info!("listening for persist pubsub connections on localhost:{persist_pubsub_server_port}");
1104 mz_ore::task::spawn(|| "persist_pubsub_server", async move {
1105 persist_pubsub_server
1106 .serve_with_stream(TcpListenerStream::new(persist_pubsub_tcp_listener))
1107 .await
1108 .expect("success")
1109 });
1110 let persist_clients =
1111 PersistClientCache::new(persist_config, &metrics_registry, |cfg, metrics| {
1112 let sender: Arc<dyn PubSubSender> = Arc::new(MetricsSameProcessPubSubSender::new(
1113 cfg,
1114 persist_pubsub_client.sender,
1115 metrics,
1116 ));
1117 PubSubClientConnection::new(sender, persist_pubsub_client.receiver)
1118 });
1119 let persist_clients = Arc::new(persist_clients);
1120
1121 let secrets_controller = Arc::clone(&orchestrator);
1122 let connection_context = ConnectionContext::for_tests(orchestrator.reader());
1123 let orchestrator = Arc::new(TracingOrchestrator::new(
1124 orchestrator,
1125 config.tracing.clone(),
1126 ));
1127 let listeners_config = ListenersConfig {
1128 sql: btreemap! {
1129 "external".to_owned() => SqlListenerConfig {
1130 addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
1131 authenticator_kind: AuthenticatorKind::None,
1132 allowed_roles: AllowedRoles::Normal,
1133 enable_tls: false,
1134 },
1135 "internal".to_owned() => SqlListenerConfig {
1136 addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
1137 authenticator_kind: AuthenticatorKind::None,
1138 allowed_roles: AllowedRoles::Internal,
1139 enable_tls: false,
1140 },
1141 "password".to_owned() => SqlListenerConfig {
1142 addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
1143 authenticator_kind: AuthenticatorKind::Password,
1144 allowed_roles: AllowedRoles::Normal,
1145 enable_tls: false,
1146 },
1147 },
1148 http: btreemap![
1149 "external".to_owned() => HttpListenerConfig {
1150 base: BaseListenerConfig {
1151 addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
1152 authenticator_kind: AuthenticatorKind::None,
1153 allowed_roles: AllowedRoles::Normal,
1154 enable_tls: false
1155 },
1156 routes: HttpRoutesEnabled {
1157 base: true,
1158 webhook: true,
1159 internal: false,
1160 metrics: false,
1161 profiling: false,
1162 mcp_agent: false,
1163 mcp_developer: false,
1164 console_config: true,
1165 },
1166 },
1167 "internal".to_owned() => HttpListenerConfig {
1168 base: BaseListenerConfig {
1169 addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
1170 authenticator_kind: AuthenticatorKind::None,
1171 allowed_roles: AllowedRoles::NormalAndInternal,
1172 enable_tls: false
1173 },
1174 routes: HttpRoutesEnabled {
1175 base: true,
1176 webhook: true,
1177 internal: true,
1178 metrics: true,
1179 profiling: true,
1180 mcp_agent: false,
1181 mcp_developer: false,
1182 console_config: false,
1183 },
1184 },
1185 ],
1186 };
1187 let listeners = mz_environmentd::Listeners::bind(listeners_config).await?;
1188 let host_name = format!(
1189 "localhost:{}",
1190 listeners.http["external"].handle.local_addr.port()
1191 );
1192 let catalog_config = CatalogConfig {
1193 persist_clients: Arc::clone(&persist_clients),
1194 metrics: Arc::new(mz_catalog::durable::Metrics::new(&MetricsRegistry::new())),
1195 };
1196 let server_config = mz_environmentd::Config {
1197 catalog_config,
1198 timestamp_oracle_url: Some(timestamp_oracle_url),
1199 controller: ControllerConfig {
1200 build_info: &mz_environmentd::BUILD_INFO,
1201 orchestrator,
1202 clusterd_image: "clusterd".into(),
1203 init_container_image: None,
1204 deploy_generation: 0,
1205 persist_location: PersistLocation {
1206 blob_uri: format!(
1207 "file://{}/persist/blob",
1208 config.persist_dir.path().display()
1209 )
1210 .parse()
1211 .expect("invalid blob URI"),
1212 consensus_uri,
1213 },
1214 persist_clients,
1215 now: SYSTEM_TIME.clone(),
1216 metrics_registry: metrics_registry.clone(),
1217 persist_pubsub_url: format!("http://localhost:{}", persist_pubsub_server_port),
1218 secrets_args: mz_service::secrets::SecretsReaderCliArgs {
1219 secrets_reader: mz_service::secrets::SecretsControllerKind::LocalFile,
1220 secrets_reader_local_file_dir: Some(secrets_dir),
1221 secrets_reader_kubernetes_context: None,
1222 secrets_reader_aws_prefix: None,
1223 secrets_reader_name_prefix: None,
1224 },
1225 connection_context,
1226 replica_http_locator: Arc::new(ReplicaHttpLocator::default()),
1227 },
1228 secrets_controller,
1229 cloud_resource_controller: None,
1230 tls: None,
1231 frontegg: None,
1232 frontegg_oauth_issuer_url: None,
1233 cors_allowed_origin: AllowOrigin::list([]),
1234 cors_allowed_origin_list: Vec::new(),
1235 unsafe_mode: true,
1236 all_features: false,
1237 metrics_registry,
1238 now,
1239 environment_id,
1240 cluster_replica_sizes: ClusterReplicaSizeMap::for_tests(),
1241 bootstrap_default_cluster_replica_size: config.replica_size.clone(),
1242 bootstrap_default_cluster_replication_factor: config
1243 .replicas
1244 .try_into()
1245 .expect("replicas must fit"),
1246 bootstrap_builtin_system_cluster_config: BootstrapBuiltinClusterConfig {
1247 replication_factor: SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR,
1248 size: config.replica_size.clone(),
1249 },
1250 bootstrap_builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig {
1251 replication_factor: CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR,
1252 size: config.replica_size.clone(),
1253 },
1254 bootstrap_builtin_probe_cluster_config: BootstrapBuiltinClusterConfig {
1255 replication_factor: PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR,
1256 size: config.replica_size.clone(),
1257 },
1258 bootstrap_builtin_support_cluster_config: BootstrapBuiltinClusterConfig {
1259 replication_factor: SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR,
1260 size: config.replica_size.clone(),
1261 },
1262 bootstrap_builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig {
1263 replication_factor: ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR,
1264 size: config.replica_size.clone(),
1265 },
1266 system_parameter_defaults: {
1267 let mut params = BTreeMap::new();
1268 params.insert(
1269 "log_filter".to_string(),
1270 config.tracing.startup_log_filter.to_string(),
1271 );
1272 params.extend(config.system_parameter_defaults.clone());
1273 params
1274 },
1275 availability_zones: Default::default(),
1276 tracing_handle: config.tracing_handle.clone(),
1277 storage_usage_collection_interval: Duration::from_secs(3600),
1278 storage_usage_retention_period: None,
1279 segment_api_key: None,
1280 segment_client_side: false,
1281 test_only_dummy_segment_client: false,
1283 egress_addresses: vec![],
1284 aws_account_id: None,
1285 aws_privatelink_availability_zones: None,
1286 launchdarkly_sdk_key: None,
1287 launchdarkly_key_map: Default::default(),
1288 config_sync_file_path: None,
1289 config_sync_timeout: Duration::from_secs(30),
1290 config_sync_loop_interval: None,
1291 bootstrap_role: Some("materialize".into()),
1292 http_host_name: Some(host_name),
1293 internal_console_redirect_url: None,
1294 tls_reload_certs: mz_server_core::cert_reload_never_reload(),
1295 helm_chart_version: None,
1296 license_key: ValidatedLicenseKey::for_tests(),
1297 external_login_password_mz_system: None,
1298 force_builtin_schema_migration: None,
1299 };
1300 let (server_addr_tx, server_addr_rx): (oneshot::Sender<Result<_, anyhow::Error>>, _) =
1306 oneshot::channel();
1307 let (internal_server_addr_tx, internal_server_addr_rx) = oneshot::channel();
1308 let (password_server_addr_tx, password_server_addr_rx) = oneshot::channel();
1309 let (internal_http_server_addr_tx, internal_http_server_addr_rx) = oneshot::channel();
1310 let (shutdown_trigger, shutdown_trigger_rx) = trigger::channel();
1311 let server_thread = thread::spawn(|| {
1312 let runtime = match Runtime::new() {
1313 Ok(runtime) => runtime,
1314 Err(e) => {
1315 server_addr_tx
1316 .send(Err(e.into()))
1317 .expect("receiver should not drop first");
1318 return;
1319 }
1320 };
1321 let server = match runtime.block_on(listeners.serve(server_config)) {
1322 Ok(runtime) => runtime,
1323 Err(e) => {
1324 server_addr_tx
1325 .send(Err(e.into()))
1326 .expect("receiver should not drop first");
1327 return;
1328 }
1329 };
1330 server_addr_tx
1331 .send(Ok(server.sql_listener_handles["external"].local_addr))
1332 .expect("receiver should not drop first");
1333 internal_server_addr_tx
1334 .send(server.sql_listener_handles["internal"].local_addr)
1335 .expect("receiver should not drop first");
1336 password_server_addr_tx
1337 .send(server.sql_listener_handles["password"].local_addr)
1338 .expect("receiver should not drop first");
1339 internal_http_server_addr_tx
1340 .send(server.http_listener_handles["internal"].local_addr)
1341 .expect("receiver should not drop first");
1342 runtime.block_on(shutdown_trigger_rx);
1343 });
1344 let server_addr = server_addr_rx.await??;
1345 let internal_server_addr = internal_server_addr_rx.await?;
1346 let password_server_addr = password_server_addr_rx.await?;
1347 let internal_http_server_addr = internal_http_server_addr_rx.await?;
1348
1349 let system_client = connect(internal_server_addr, Some("mz_system"), None)
1350 .await
1351 .unwrap();
1352 let client = connect(server_addr, None, None).await.unwrap();
1353
1354 let inner = RunnerInner {
1355 server_addr,
1356 internal_server_addr,
1357 password_server_addr,
1358 internal_http_server_addr,
1359 _shutdown_trigger: shutdown_trigger,
1360 _server_thread: server_thread.join_on_drop(),
1361 _temp_dir: temp_dir,
1362 client,
1363 system_client,
1364 clients: BTreeMap::new(),
1365 auto_index_tables: config.auto_index_tables,
1366 auto_index_selects: config.auto_index_selects,
1367 auto_transactions: config.auto_transactions,
1368 enable_table_keys: config.enable_table_keys,
1369 verbose: config.verbose,
1370 stdout: config.stdout,
1371 };
1372 inner.ensure_fixed_features().await?;
1373
1374 Ok(inner)
1375 }
1376
1377 #[allow(clippy::disallowed_methods)]
1380 async fn ensure_fixed_features(&self) -> Result<(), anyhow::Error> {
1381 self.system_client
1385 .execute("ALTER SYSTEM SET enable_reduce_mfp_fusion = on", &[])
1386 .await?;
1387
1388 self.system_client
1390 .execute("ALTER SYSTEM SET unsafe_enable_unsafe_functions = on", &[])
1391 .await?;
1392 Ok(())
1393 }
1394
1395 #[allow(clippy::disallowed_methods)]
1396 async fn run_record<'r>(
1397 &mut self,
1398 record: &'r Record<'r>,
1399 in_transaction: &mut bool,
1400 replacements: &[(Regex, String)],
1401 ) -> Result<Outcome<'r>, anyhow::Error> {
1402 match &record {
1403 Record::Statement {
1404 expected_error,
1405 rows_affected,
1406 sql,
1407 location,
1408 } => {
1409 if self.auto_transactions && *in_transaction {
1410 self.client.execute("COMMIT", &[]).await?;
1411 *in_transaction = false;
1412 }
1413 match self
1414 .run_statement(*expected_error, *rows_affected, sql, location.clone())
1415 .await?
1416 {
1417 Outcome::Success => {
1418 if self.auto_index_tables {
1419 let additional = mutate(sql);
1420 for stmt in additional {
1421 self.client.execute(&stmt, &[]).await?;
1422 }
1423 }
1424 Ok(Outcome::Success)
1425 }
1426 other => {
1427 if expected_error.is_some() {
1428 Ok(other)
1429 } else {
1430 Ok(Outcome::Bail {
1434 cause: Box::new(other),
1435 location: location.clone(),
1436 })
1437 }
1438 }
1439 }
1440 }
1441 Record::Query {
1442 sql,
1443 output,
1444 location,
1445 } => {
1446 self.run_query(sql, output, location.clone(), in_transaction, replacements)
1447 .await
1448 }
1449 Record::Simple {
1450 conn,
1451 user,
1452 password,
1453 sql,
1454 sort,
1455 output,
1456 location,
1457 ..
1458 } => {
1459 self.run_simple(
1460 *conn,
1461 *user,
1462 *password,
1463 sql,
1464 sort.clone(),
1465 output,
1466 location.clone(),
1467 )
1468 .await
1469 }
1470 Record::Copy {
1471 table_name,
1472 tsv_path,
1473 } => {
1474 let tsv = tokio::fs::read(tsv_path).await?;
1475 let copy = self
1476 .client
1477 .copy_in(sql!("COPY {} FROM STDIN", Sql::ident(*table_name)).as_str())
1478 .await?;
1479 tokio::pin!(copy);
1480 copy.send(bytes::Bytes::from(tsv)).await?;
1481 copy.finish().await?;
1482 Ok(Outcome::Success)
1483 }
1484 _ => Ok(Outcome::Success),
1485 }
1486 }
1487
1488 #[allow(clippy::disallowed_methods)]
1489 async fn run_statement<'r>(
1490 &self,
1491 expected_error: Option<&'r str>,
1492 expected_rows_affected: Option<u64>,
1493 sql: &'r str,
1494 location: Location,
1495 ) -> Result<Outcome<'r>, anyhow::Error> {
1496 static UNSUPPORTED_INDEX_STATEMENT_REGEX: LazyLock<Regex> =
1497 LazyLock::new(|| Regex::new("^(CREATE UNIQUE INDEX|REINDEX)").unwrap());
1498 if UNSUPPORTED_INDEX_STATEMENT_REGEX.is_match(sql) {
1499 return Ok(Outcome::Success);
1501 }
1502
1503 match self.client.execute(sql, &[]).await {
1504 Ok(actual) => {
1505 if let Some(expected_error) = expected_error {
1506 return Ok(Outcome::UnexpectedPlanSuccess {
1507 expected_error,
1508 location,
1509 });
1510 }
1511 match expected_rows_affected {
1512 None => Ok(Outcome::Success),
1513 Some(expected) => {
1514 if expected != actual {
1515 Ok(Outcome::WrongNumberOfRowsInserted {
1516 expected_count: expected,
1517 actual_count: actual,
1518 location,
1519 })
1520 } else {
1521 Ok(Outcome::Success)
1522 }
1523 }
1524 }
1525 }
1526 Err(error) => {
1527 if let Some(expected_error) = expected_error {
1528 if Regex::new(expected_error)?.is_match(&error.to_string_with_causes()) {
1529 return Ok(Outcome::Success);
1530 }
1531 return Ok(Outcome::PlanFailure {
1532 error: anyhow!(error),
1533 expected_error: Some(expected_error.to_string()),
1534 location,
1535 });
1536 }
1537 Ok(Outcome::PlanFailure {
1538 error: anyhow!(error),
1539 expected_error: None,
1540 location,
1541 })
1542 }
1543 }
1544 }
1545
1546 #[allow(clippy::disallowed_methods)]
1547 async fn prepare_query<'r>(
1548 &self,
1549 sql: &str,
1550 output: &'r Result<QueryOutput<'_>, &'r str>,
1551 location: Location,
1552 in_transaction: &mut bool,
1553 ) -> Result<PrepareQueryOutcome<'r>, anyhow::Error> {
1554 let statements = match mz_sql::parse::parse(sql) {
1556 Ok(statements) => statements,
1557 Err(e) => match output {
1558 Ok(_) => {
1559 return Ok(PrepareQueryOutcome::Outcome(Outcome::ParseFailure {
1560 error: e.into(),
1561 location,
1562 }));
1563 }
1564 Err(expected_error) => {
1565 if Regex::new(expected_error)?.is_match(&e.to_string_with_causes()) {
1566 return Ok(PrepareQueryOutcome::Outcome(Outcome::Success));
1567 } else {
1568 return Ok(PrepareQueryOutcome::Outcome(Outcome::ParseFailure {
1569 error: e.into(),
1570 location,
1571 }));
1572 }
1573 }
1574 },
1575 };
1576 let statement = match &*statements {
1577 [] => bail!("Got zero statements?"),
1578 [statement] => &statement.ast,
1579 _ => bail!("Got multiple statements: {:?}", statements),
1580 };
1581 let (is_select, num_attributes, has_as_of) = match statement {
1582 Statement::Select(stmt) => (
1583 true,
1584 derive_num_attributes(&stmt.query.body),
1585 stmt.as_of.is_some(),
1586 ),
1587 _ => (false, None, false),
1588 };
1589
1590 match output {
1591 Ok(_) => {
1592 if self.auto_transactions && !*in_transaction {
1593 self.client.execute("BEGIN", &[]).await?;
1595 *in_transaction = true;
1596 }
1597 }
1598 Err(_) => {
1599 if self.auto_transactions && *in_transaction {
1600 self.client.execute("COMMIT", &[]).await?;
1601 *in_transaction = false;
1602 }
1603 }
1604 }
1605
1606 match statement {
1610 Statement::Show(..) => {
1611 if self.auto_transactions && *in_transaction {
1612 self.client.execute("COMMIT", &[]).await?;
1613 *in_transaction = false;
1614 }
1615 }
1616 _ => (),
1617 }
1618 Ok(PrepareQueryOutcome::QueryPrepared(QueryInfo {
1619 is_select,
1620 num_attributes,
1621 has_as_of,
1622 }))
1623 }
1624
1625 #[allow(clippy::disallowed_methods)]
1626 async fn execute_query<'r>(
1627 &self,
1628 sql: &str,
1629 output: &'r Result<QueryOutput<'_>, &'r str>,
1630 location: Location,
1631 replacements: &[(Regex, String)],
1632 ) -> Result<Outcome<'r>, anyhow::Error> {
1633 let rows = match self.client.query(sql, &[]).await {
1634 Ok(rows) => rows,
1635 Err(error) => {
1636 let error_string = error.to_string_with_causes();
1637 return match output {
1638 Ok(_) => {
1639 if error_string.contains("supported") || error_string.contains("overload") {
1640 Ok(Outcome::Unsupported {
1642 error: anyhow!(error),
1643 location,
1644 })
1645 } else {
1646 Ok(Outcome::PlanFailure {
1647 error: anyhow!(error),
1648 expected_error: None,
1649 location,
1650 })
1651 }
1652 }
1653 Err(expected_error) => {
1654 if Regex::new(expected_error)?.is_match(&error_string) {
1655 Ok(Outcome::Success)
1656 } else {
1657 Ok(Outcome::PlanFailure {
1658 error: anyhow!(error),
1659 expected_error: Some(expected_error.to_string()),
1660 location,
1661 })
1662 }
1663 }
1664 };
1665 }
1666 };
1667
1668 let QueryOutput {
1670 sort,
1671 types: expected_types,
1672 column_names: expected_column_names,
1673 output: expected_output,
1674 mode,
1675 ..
1676 } = match output {
1677 Err(expected_error) => {
1678 return Ok(Outcome::UnexpectedPlanSuccess {
1679 expected_error,
1680 location,
1681 });
1682 }
1683 Ok(query_output) => query_output,
1684 };
1685
1686 let mut formatted_rows = vec![];
1688 for row in &rows {
1689 if row.len() != expected_types.len() {
1690 return Ok(Outcome::WrongColumnCount {
1691 expected_count: expected_types.len(),
1692 actual_count: row.len(),
1693 location,
1694 });
1695 }
1696 let row = format_row(row, expected_types, *mode);
1697 formatted_rows.push(row);
1698 }
1699
1700 if let Sort::Row = sort {
1702 formatted_rows.sort();
1703 }
1704 let mut values = formatted_rows.into_iter().flatten().collect::<Vec<_>>();
1705 if let Sort::Value = sort {
1706 values.sort();
1707 }
1708
1709 if !replacements.is_empty() {
1714 for value in &mut values {
1715 for (regex, replacement) in replacements {
1716 *value = regex.replace_all(value, replacement.as_str()).into_owned();
1717 }
1718 }
1719 }
1720
1721 if let Some(row) = rows.get(0) {
1723 if let Some(expected_column_names) = expected_column_names {
1725 let actual_column_names = row
1726 .columns()
1727 .iter()
1728 .map(|t| ColumnName::from(t.name()))
1729 .collect::<Vec<_>>();
1730 if expected_column_names != &actual_column_names {
1731 return Ok(Outcome::WrongColumnNames {
1732 expected_column_names,
1733 actual_column_names,
1734 actual_output: Output::Values(values),
1735 location,
1736 });
1737 }
1738 }
1739 }
1740
1741 match expected_output {
1743 Output::Values(expected_values) => {
1744 if values != *expected_values {
1745 return Ok(Outcome::OutputFailure {
1746 expected_output,
1747 actual_raw_output: rows,
1748 actual_output: Output::Values(values),
1749 location,
1750 });
1751 }
1752 }
1753 Output::Hashed {
1754 num_values,
1755 md5: expected_md5,
1756 } => {
1757 let mut hasher = Md5::new();
1758 for value in &values {
1759 hasher.update(value);
1760 hasher.update("\n");
1761 }
1762 let md5 = format!("{:x}", hasher.finalize());
1763 if values.len() != *num_values || md5 != *expected_md5 {
1764 return Ok(Outcome::OutputFailure {
1765 expected_output,
1766 actual_raw_output: rows,
1767 actual_output: Output::Hashed {
1768 num_values: values.len(),
1769 md5,
1770 },
1771 location,
1772 });
1773 }
1774 }
1775 }
1776
1777 Ok(Outcome::Success)
1778 }
1779
1780 #[allow(clippy::disallowed_methods)]
1781 async fn execute_view_inner<'r>(
1782 &self,
1783 sql: &str,
1784 output: &'r Result<QueryOutput<'_>, &'r str>,
1785 location: Location,
1786 ) -> Result<Option<Outcome<'r>>, anyhow::Error> {
1787 print_sql_if(self.stdout, sql, self.verbose);
1788 let sql_result = self.client.execute(sql, &[]).await;
1789
1790 let tentative_outcome = if let Err(view_error) = sql_result {
1792 if let Err(expected_error) = output {
1793 if Regex::new(expected_error)?.is_match(&view_error.to_string_with_causes()) {
1794 Some(Outcome::Success)
1795 } else {
1796 Some(Outcome::PlanFailure {
1797 error: view_error.into(),
1798 expected_error: Some(expected_error.to_string()),
1799 location: location.clone(),
1800 })
1801 }
1802 } else {
1803 Some(Outcome::PlanFailure {
1804 error: view_error.into(),
1805 expected_error: None,
1806 location: location.clone(),
1807 })
1808 }
1809 } else {
1810 None
1811 };
1812 Ok(tentative_outcome)
1813 }
1814
1815 #[allow(clippy::disallowed_methods)]
1816 async fn execute_view<'r>(
1817 &self,
1818 sql: &str,
1819 num_attributes: Option<usize>,
1820 output: &'r Result<QueryOutput<'_>, &'r str>,
1821 location: Location,
1822 replacements: &[(Regex, String)],
1823 ) -> Result<Outcome<'r>, anyhow::Error> {
1824 let expected_column_names = if let Ok(QueryOutput { column_names, .. }) = output {
1826 column_names.clone()
1827 } else {
1828 None
1829 };
1830 let (create_view, create_index, view_sql, drop_view) = generate_view_sql(
1831 sql,
1832 Uuid::new_v4().as_simple(),
1833 num_attributes,
1834 expected_column_names,
1835 );
1836 let tentative_outcome = self
1837 .execute_view_inner(create_view.as_str(), output, location.clone())
1838 .await?;
1839
1840 if let Some(view_outcome) = tentative_outcome {
1843 return Ok(view_outcome);
1844 }
1845
1846 let tentative_outcome = self
1847 .execute_view_inner(create_index.as_str(), output, location.clone())
1848 .await?;
1849
1850 let view_outcome;
1851 if let Some(outcome) = tentative_outcome {
1852 view_outcome = outcome;
1853 } else {
1854 print_sql_if(self.stdout, view_sql.as_str(), self.verbose);
1855 view_outcome = self
1856 .execute_query(view_sql.as_str(), output, location.clone(), replacements)
1857 .await?;
1858 }
1859
1860 print_sql_if(self.stdout, drop_view.as_str(), self.verbose);
1862 self.client.execute(drop_view.as_str(), &[]).await?;
1863
1864 Ok(view_outcome)
1865 }
1866
1867 async fn run_query<'r>(
1868 &self,
1869 sql: &'r str,
1870 output: &'r Result<QueryOutput<'_>, &'r str>,
1871 location: Location,
1872 in_transaction: &mut bool,
1873 replacements: &[(Regex, String)],
1874 ) -> Result<Outcome<'r>, anyhow::Error> {
1875 let prepare_outcome = self
1876 .prepare_query(sql, output, location.clone(), in_transaction)
1877 .await?;
1878 match prepare_outcome {
1879 PrepareQueryOutcome::QueryPrepared(QueryInfo {
1880 is_select,
1881 num_attributes,
1882 has_as_of,
1883 }) => {
1884 let query_outcome = self
1885 .execute_query(sql, output, location.clone(), replacements)
1886 .await?;
1887 if is_select && !has_as_of && self.auto_index_selects && query_outcome.success() {
1893 let view_outcome = self
1894 .execute_view(sql, None, output, location.clone(), replacements)
1895 .await?;
1896
1897 if !view_outcome.success() {
1898 let view_outcome = if num_attributes.is_some() {
1903 self.execute_view(
1904 sql,
1905 num_attributes,
1906 output,
1907 location.clone(),
1908 replacements,
1909 )
1910 .await?
1911 } else {
1912 view_outcome
1913 };
1914
1915 if !view_outcome.success() {
1916 let inconsistent_view_outcome = Outcome::InconsistentViewOutcome {
1917 query_outcome: Box::new(query_outcome),
1918 view_outcome: Box::new(view_outcome),
1919 location: location.clone(),
1920 };
1921 let outcome = if should_warn(&inconsistent_view_outcome) {
1924 Outcome::Warning {
1925 cause: Box::new(inconsistent_view_outcome),
1926 location: location.clone(),
1927 }
1928 } else {
1929 inconsistent_view_outcome
1930 };
1931 return Ok(outcome);
1932 }
1933 }
1934 }
1935 Ok(query_outcome)
1936 }
1937 PrepareQueryOutcome::Outcome(outcome) => Ok(outcome),
1938 }
1939 }
1940
1941 async fn get_conn(
1942 &mut self,
1943 name: Option<&str>,
1944 user: Option<&str>,
1945 password: Option<&str>,
1946 ) -> Result<&tokio_postgres::Client, tokio_postgres::Error> {
1947 match name {
1948 None => Ok(&self.client),
1949 Some(name) => {
1950 if !self.clients.contains_key(name) {
1951 let addr = if matches!(user, Some("mz_system") | Some("mz_support")) {
1952 self.internal_server_addr
1953 } else if password.is_some() {
1954 self.password_server_addr
1956 } else {
1957 self.server_addr
1958 };
1959 let client = connect(addr, user, password).await?;
1960 self.clients.insert(name.into(), client);
1961 }
1962 Ok(self.clients.get(name).unwrap())
1963 }
1964 }
1965 }
1966
1967 #[allow(clippy::disallowed_methods)]
1968 async fn run_simple<'r>(
1969 &mut self,
1970 conn: Option<&'r str>,
1971 user: Option<&'r str>,
1972 password: Option<&'r str>,
1973 sql: &'r str,
1974 sort: Sort,
1975 output: &'r Output,
1976 location: Location,
1977 ) -> Result<Outcome<'r>, anyhow::Error> {
1978 let actual = match self.get_conn(conn, user, password).await {
1979 Ok(client) => match client.simple_query(sql).await {
1980 Ok(result) => {
1981 let mut rows = Vec::new();
1982
1983 for m in result.into_iter() {
1984 match m {
1985 SimpleQueryMessage::Row(row) => {
1986 let mut s = vec![];
1987 for i in 0..row.len() {
1988 s.push(row.get(i).unwrap_or("NULL"));
1989 }
1990 rows.push(s.join(","));
1991 }
1992 SimpleQueryMessage::CommandComplete(count) => {
1993 rows.push(format!("COMPLETE {}", count));
1996 }
1997 SimpleQueryMessage::RowDescription(_) => {}
1998 _ => panic!("unexpected"),
1999 }
2000 }
2001
2002 if let Sort::Row = sort {
2003 rows.sort();
2004 }
2005
2006 Output::Values(rows)
2007 }
2008 Err(error) => Output::Values(
2012 error
2013 .to_string_with_causes()
2014 .lines()
2015 .map(|s| s.to_string())
2016 .collect(),
2017 ),
2018 },
2019 Err(error) => Output::Values(
2020 error
2021 .to_string_with_causes()
2022 .lines()
2023 .map(|s| s.to_string())
2024 .collect(),
2025 ),
2026 };
2027 if *output != actual {
2028 Ok(Outcome::OutputFailure {
2029 expected_output: output,
2030 actual_raw_output: vec![],
2031 actual_output: actual,
2032 location,
2033 })
2034 } else {
2035 Ok(Outcome::Success)
2036 }
2037 }
2038
2039 async fn check_catalog(&self) -> Result<(), anyhow::Error> {
2040 let url = format!(
2041 "http://{}/api/catalog/check",
2042 self.internal_http_server_addr
2043 );
2044 let response: serde_json::Value = reqwest::get(&url).await?.json().await?;
2045
2046 if let Some(inconsistencies) = response.get("err") {
2047 let inconsistencies = serde_json::to_string_pretty(&inconsistencies)
2048 .expect("serializing Value cannot fail");
2049 Err(anyhow::anyhow!("Catalog inconsistency\n{inconsistencies}"))
2050 } else {
2051 Ok(())
2052 }
2053 }
2054}
2055
2056async fn connect(
2057 addr: SocketAddr,
2058 user: Option<&str>,
2059 password: Option<&str>,
2060) -> Result<tokio_postgres::Client, tokio_postgres::Error> {
2061 let mut config = tokio_postgres::Config::new();
2062 config.host(addr.ip().to_string());
2063 config.port(addr.port());
2064 config.user(user.unwrap_or("materialize"));
2065 if let Some(password) = password {
2066 config.password(password);
2067 }
2068 let (client, connection) = config.connect(NoTls).await?;
2069
2070 task::spawn(|| "sqllogictest_connect", async move {
2071 if let Err(e) = connection.await {
2072 eprintln!("connection error: {}", e);
2073 }
2074 });
2075 Ok(client)
2076}
2077
/// Minimal formatted-output sink used for the runner's stdout and stderr.
///
/// Because the method is called `write_fmt`, `write!` and `writeln!` can
/// target a `&dyn WriteFmt` directly. It takes `&self` and returns nothing,
/// so a write cannot report an error to the caller.
pub trait WriteFmt {
    /// Writes the pre-formatted arguments to the sink.
    fn write_fmt(&self, fmt: fmt::Arguments<'_>);
}
2081
/// Configuration for a sqllogictest run.
///
/// NOTE(review): field descriptions below are limited to what this part of
/// the file shows; fields without a description are not referenced here.
pub struct RunConfig<'a> {
    /// Sink that records, outcomes, and file headers are printed to.
    pub stdout: &'a dyn WriteFmt,
    pub stderr: &'a dyn WriteFmt,
    /// Echo every record before running it, and print the outcome of every
    /// non-successful record (not just failures).
    pub verbose: bool,
    /// Suppress the per-record reporting of non-successful outcomes.
    pub quiet: bool,
    pub postgres_url: String,
    pub prefix: String,
    pub no_fail: bool,
    /// Stop running a file at the first failing record.
    pub fail_fast: bool,
    /// After each successful statement, also run the statements produced by
    /// `mutate` for it.
    pub auto_index_tables: bool,
    /// Re-run each successful `SELECT` (without `AS OF`) through an indexed
    /// view and compare the outcomes.
    pub auto_index_selects: bool,
    /// Automatically issue `BEGIN`/`COMMIT` around queries.
    pub auto_transactions: bool,
    pub enable_table_keys: bool,
    pub orchestrator_process_wrapper: Option<String>,
    /// Tracing arguments; handed to the tracing orchestrator, and the source
    /// of the default `log_filter` system parameter.
    pub tracing: TracingCliArgs,
    /// Tracing handle passed on to the server configuration.
    pub tracing_handle: TracingHandle,
    /// Extra system parameter defaults for the server; these take precedence
    /// over the `log_filter` default derived from `tracing`.
    pub system_parameter_defaults: BTreeMap<String, String>,
    /// Directory under which the persist blob storage (`persist/blob`) lives.
    pub persist_dir: TempDir,
    /// Replication factor of the bootstrapped default cluster.
    pub replicas: usize,
    /// Replica size used for the bootstrapped default and builtin clusters.
    pub replica_size: String,
}
2107
/// Number of spaces by which echoed records and SQL text are indented.
const PRINT_INDENT: usize = 4;
2110
2111fn print_record(config: &RunConfig<'_>, record: &Record) {
2112 match record {
2113 Record::Statement { sql, .. } | Record::Query { sql, .. } => {
2114 print_sql(config.stdout, sql, None)
2115 }
2116 Record::Simple { conn, sql, .. } => print_sql(config.stdout, sql, *conn),
2117 Record::Copy {
2118 table_name,
2119 tsv_path,
2120 } => {
2121 writeln!(
2122 config.stdout,
2123 "{}slt copy {} from {}",
2124 " ".repeat(PRINT_INDENT),
2125 table_name,
2126 tsv_path
2127 )
2128 }
2129 Record::ResetServer => {
2130 writeln!(config.stdout, "{}reset-server", " ".repeat(PRINT_INDENT))
2131 }
2132 Record::Halt => {
2133 writeln!(config.stdout, "{}halt", " ".repeat(PRINT_INDENT))
2134 }
2135 Record::HashThreshold { threshold } => {
2136 writeln!(
2137 config.stdout,
2138 "{}hash-threshold {}",
2139 " ".repeat(PRINT_INDENT),
2140 threshold
2141 )
2142 }
2143 Record::Replace {
2144 pattern,
2145 replacement,
2146 } => {
2147 writeln!(
2148 config.stdout,
2149 "{}replace {} {}",
2150 " ".repeat(PRINT_INDENT),
2151 pattern,
2152 replacement
2153 )
2154 }
2155 }
2156}
2157
2158fn print_sql_if<'a>(stdout: &'a dyn WriteFmt, sql: &str, cond: bool) {
2159 if cond {
2160 print_sql(stdout, sql, None)
2161 }
2162}
2163
2164fn print_sql<'a>(stdout: &'a dyn WriteFmt, sql: &str, conn: Option<&str>) {
2165 let text = if let Some(conn) = conn {
2166 format!("[conn={}] {}", conn, sql)
2167 } else {
2168 sql.to_string()
2169 };
2170 writeln!(stdout, "{}", util::indent(&text, PRINT_INDENT))
2171}
2172
/// Regular expressions for view errors that are tolerated.
///
/// `should_warn` matches these against the error of a view that failed to
/// plan; if any of them matches, the inconsistency between the query outcome
/// and the view outcome is reported as a warning rather than as a failure.
const INCONSISTENT_VIEW_OUTCOME_WARNING_REGEXPS: [&str; 9] = [
    "cannot materialize call to",
    "SHOW commands are not allowed in views",
    "cannot create view with unstable dependencies",
    "cannot use wildcard expansions or NATURAL JOINs in a view that depends on system objects",
    "no valid schema selected",
    r#"system schema '\w+' cannot be modified"#,
    r#"permission denied for (SCHEMA|CLUSTER) "(\w+\.)?\w+""#,
    r#"column "[\w\?]+" specified more than once"#,
    r#"column "(\w+\.)?\w+" does not exist"#,
];
2192
2193fn should_warn(outcome: &Outcome) -> bool {
2198 match outcome {
2199 Outcome::InconsistentViewOutcome { view_outcome, .. } => match view_outcome.as_ref() {
2200 Outcome::PlanFailure { error, .. } => {
2201 INCONSISTENT_VIEW_OUTCOME_WARNING_REGEXPS.iter().any(|s| {
2202 Regex::new(s)
2203 .expect("unexpected error in regular expression parsing")
2204 .is_match(&error.to_string_with_causes())
2205 })
2206 }
2207 _ => false,
2208 },
2209 _ => false,
2210 }
2211}
2212
2213pub async fn run_string(
2214 runner: &mut Runner<'_>,
2215 source: &str,
2216 input: &str,
2217) -> Result<Outcomes, anyhow::Error> {
2218 runner.reset_database().await?;
2219 runner.replacements.clear();
2222
2223 let mut outcomes = Outcomes::default();
2224 let mut parser = crate::parser::Parser::new(source, input);
2225 let mut in_transaction = false;
2228 writeln!(runner.config.stdout, "--- {}", source);
2229
2230 for record in parser.parse_records()? {
2231 if runner.config.verbose {
2235 print_record(runner.config, &record);
2236 }
2237
2238 let outcome = runner
2239 .run_record(&record, &mut in_transaction)
2240 .await
2241 .map_err(|err| format!("In {}:\n{}", source, err))
2242 .unwrap();
2243
2244 if !runner.config.quiet && !outcome.success() {
2246 if !runner.config.verbose {
2247 if !outcome.failure() {
2254 writeln!(
2255 runner.config.stdout,
2256 "{}",
2257 util::indent("Warning detected for: ", 4)
2258 );
2259 }
2260 print_record(runner.config, &record);
2261 }
2262 if runner.config.verbose || outcome.failure() {
2263 writeln!(
2264 runner.config.stdout,
2265 "{}",
2266 util::indent(&outcome.to_string(), 4)
2267 );
2268 writeln!(runner.config.stdout, "{}", util::indent("----", 4));
2269 }
2270 }
2271
2272 outcomes.stats[outcome.code()] += 1;
2273 if outcome.failure() {
2274 outcomes.details.push(format!("{}", outcome));
2275 }
2276
2277 if let Outcome::Bail { .. } = outcome {
2278 break;
2279 }
2280
2281 if runner.config.fail_fast && outcome.failure() {
2282 break;
2283 }
2284 }
2285 Ok(outcomes)
2286}
2287
2288pub async fn run_file(runner: &mut Runner<'_>, filename: &Path) -> Result<Outcomes, anyhow::Error> {
2289 let mut input = String::new();
2290 File::open(filename)?.read_to_string(&mut input)?;
2291 let outcomes = run_string(runner, &format!("{}", filename.display()), &input).await?;
2292 runner.check_catalog().await?;
2293
2294 Ok(outcomes)
2295}
2296
2297pub async fn rewrite_file(runner: &mut Runner<'_>, filename: &Path) -> Result<(), anyhow::Error> {
2298 runner.reset_database().await?;
2299 runner.replacements.clear();
2302
2303 let mut file = OpenOptions::new().read(true).write(true).open(filename)?;
2304
2305 let mut input = String::new();
2306 file.read_to_string(&mut input)?;
2307
2308 let mut buf = RewriteBuffer::new(&input);
2309
2310 let mut parser = crate::parser::Parser::new(filename.to_str().unwrap_or(""), &input);
2311 writeln!(runner.config.stdout, "--- {}", filename.display());
2312 let mut in_transaction = false;
2313
2314 fn append_values_output(
2315 buf: &mut RewriteBuffer,
2316 input: &String,
2317 expected_output: &str,
2318 mode: &Mode,
2319 types: &Vec<Type>,
2320 column_names: Option<&Vec<ColumnName>>,
2321 actual_output: &Vec<String>,
2322 multiline: bool,
2323 ) {
2324 buf.append_header(input, expected_output, column_names);
2325
2326 for (i, row) in actual_output.chunks(types.len()).enumerate() {
2327 match mode {
2328 Mode::Cockroach => {
2331 if i != 0 {
2332 buf.append("\n");
2333 }
2334
2335 if row.len() == 0 {
2336 } else if row.len() == 1 {
2338 if multiline {
2341 buf.append(&row[0]);
2342 } else {
2343 buf.append(&row[0].replace('\n', "⏎"))
2344 }
2345 } else {
2346 buf.append(
2349 &row.iter()
2350 .map(|col| {
2351 let mut col = col.replace(' ', "␠");
2352 if !multiline {
2353 col = col.replace('\n', "⏎");
2354 }
2355 col
2356 })
2357 .join(" "),
2358 );
2359 }
2360 }
2361 Mode::Standard => {
2366 for (j, col) in row.iter().enumerate() {
2367 if i != 0 || j != 0 {
2368 buf.append("\n");
2369 }
2370 buf.append(&if multiline {
2371 col.clone()
2372 } else {
2373 col.replace('\n', "⏎")
2374 });
2375 }
2376 }
2377 }
2378 }
2379 }
2380
2381 for record in parser.parse_records()? {
2382 let outcome = runner.run_record(&record, &mut in_transaction).await?;
2383
2384 match (&record, &outcome) {
2385 (
2388 Record::Query {
2389 output:
2390 Ok(QueryOutput {
2391 mode,
2392 output: Output::Values(_),
2393 output_str: expected_output,
2394 types,
2395 column_names,
2396 multiline,
2397 ..
2398 }),
2399 ..
2400 },
2401 Outcome::OutputFailure {
2402 actual_output: Output::Values(actual_output),
2403 ..
2404 },
2405 ) => {
2406 append_values_output(
2407 &mut buf,
2408 &input,
2409 expected_output,
2410 mode,
2411 types,
2412 column_names.as_ref(),
2413 actual_output,
2414 *multiline,
2415 );
2416 }
2417 (
2418 Record::Query {
2419 output:
2420 Ok(QueryOutput {
2421 mode,
2422 output: Output::Values(_),
2423 output_str: expected_output,
2424 types,
2425 multiline,
2426 ..
2427 }),
2428 ..
2429 },
2430 Outcome::WrongColumnNames {
2431 actual_column_names,
2432 actual_output: Output::Values(actual_output),
2433 ..
2434 },
2435 ) => {
2436 append_values_output(
2437 &mut buf,
2438 &input,
2439 expected_output,
2440 mode,
2441 types,
2442 Some(actual_column_names),
2443 actual_output,
2444 *multiline,
2445 );
2446 }
2447 (
2448 Record::Query {
2449 output:
2450 Ok(QueryOutput {
2451 output: Output::Hashed { .. },
2452 output_str: expected_output,
2453 column_names,
2454 ..
2455 }),
2456 ..
2457 },
2458 Outcome::OutputFailure {
2459 actual_output: Output::Hashed { num_values, md5 },
2460 ..
2461 },
2462 ) => {
2463 buf.append_header(&input, expected_output, column_names.as_ref());
2464
2465 buf.append(format!("{} values hashing to {}\n", num_values, md5).as_str())
2466 }
2467 (
2468 Record::Simple {
2469 output_str: expected_output,
2470 ..
2471 },
2472 Outcome::OutputFailure {
2473 actual_output: Output::Values(actual_output),
2474 ..
2475 },
2476 ) => {
2477 buf.append_header(&input, expected_output, None);
2478
2479 for (i, row) in actual_output.iter().enumerate() {
2480 if i != 0 {
2481 buf.append("\n");
2482 }
2483 buf.append(row);
2484 }
2485 }
2486 (
2487 Record::Query {
2488 sql,
2489 output: Err(err),
2490 ..
2491 },
2492 outcome,
2493 )
2494 | (
2495 Record::Statement {
2496 expected_error: Some(err),
2497 sql,
2498 ..
2499 },
2500 outcome,
2501 ) if outcome.err_msg().is_some() => {
2502 buf.rewrite_expected_error(&input, err, &outcome.err_msg().unwrap(), sql)
2503 }
2504 (_, Outcome::Success) => {}
2505 _ => bail!("unexpected: {:?} {:?}", record, outcome),
2506 }
2507 }
2508
2509 file.set_len(0)?;
2510 file.seek(SeekFrom::Start(0))?;
2511 file.write_all(buf.finish().as_bytes())?;
2512 file.sync_all()?;
2513 Ok(())
2514}
2515
2516#[derive(Debug)]
2525struct RewriteBuffer<'a> {
2526 input: &'a str,
2527 input_offset: usize,
2528 output: String,
2529}
2530
2531impl<'a> RewriteBuffer<'a> {
2532 fn new(input: &'a str) -> RewriteBuffer<'a> {
2533 RewriteBuffer {
2534 input,
2535 input_offset: 0,
2536 output: String::new(),
2537 }
2538 }
2539
2540 fn flush_to(&mut self, offset: usize) {
2541 assert!(offset >= self.input_offset);
2542 let chunk = &self.input[self.input_offset..offset];
2543 self.output.push_str(chunk);
2544 self.input_offset = offset;
2545 }
2546
2547 fn skip_to(&mut self, offset: usize) {
2548 assert!(offset >= self.input_offset);
2549 self.input_offset = offset;
2550 }
2551
2552 fn append(&mut self, s: &str) {
2553 self.output.push_str(s);
2554 }
2555
2556 fn append_header(
2557 &mut self,
2558 input: &String,
2559 expected_output: &str,
2560 column_names: Option<&Vec<ColumnName>>,
2561 ) {
2562 #[allow(clippy::as_conversions)]
2565 let offset = expected_output.as_ptr() as usize - input.as_ptr() as usize;
2566 self.flush_to(offset);
2567 self.skip_to(offset + expected_output.len());
2568
2569 if self.peek_last(5) == "\n----" {
2572 self.append("\n");
2573 } else if self.peek_last(6) != "\n----\n" {
2574 self.append("\n----\n");
2575 }
2576
2577 let Some(names) = column_names else {
2578 return;
2579 };
2580 self.append(
2581 &names
2582 .iter()
2583 .map(|name| name.replace(' ', "␠"))
2584 .collect::<Vec<_>>()
2585 .join(" "),
2586 );
2587 self.append("\n");
2588 }
2589
2590 fn rewrite_expected_error(
2591 &mut self,
2592 input: &String,
2593 old_err: &str,
2594 new_err: &str,
2595 query: &str,
2596 ) {
2597 #[allow(clippy::as_conversions)]
2600 let err_offset = old_err.as_ptr() as usize - input.as_ptr() as usize;
2601 self.flush_to(err_offset);
2602 self.append(new_err);
2603 self.append("\n");
2604 self.append(query);
2605 #[allow(clippy::as_conversions)]
2607 self.skip_to(query.as_ptr() as usize - input.as_ptr() as usize + query.len())
2608 }
2609
2610 fn peek_last(&self, n: usize) -> &str {
2611 &self.output[self.output.len() - n..]
2612 }
2613
2614 fn finish(mut self) -> String {
2615 self.flush_to(self.input.len());
2616 self.output
2617 }
2618}
2619
2620fn generate_view_sql(
2628 sql: &str,
2629 view_uuid: &Simple,
2630 num_attributes: Option<usize>,
2631 expected_column_names: Option<Vec<ColumnName>>,
2632) -> (String, String, String, String) {
2633 let stmts = parser::parse_statements(sql).unwrap_or_default();
2641 assert!(stmts.len() == 1);
2642 let (query, query_as_of) = match &stmts[0].ast {
2643 Statement::Select(stmt) => (&stmt.query, &stmt.as_of),
2644 _ => unreachable!("This function should only be called for SELECTs"),
2645 };
2646
2647 let (view_order_by, extra_columns, distinct) = if num_attributes.is_none() {
2652 (query.order_by.clone(), vec![], None)
2653 } else {
2654 derive_order_by(&query.body, &query.order_by)
2655 };
2656
2657 let name = UnresolvedItemName(vec![Ident::new_unchecked(format!("v{}", view_uuid))]);
2673 let projection = expected_column_names.map_or_else(
2674 || {
2675 num_attributes.map_or(vec![], |n| {
2676 (1..=n)
2677 .map(|i| Ident::new_unchecked(format!("a{i}")))
2678 .collect()
2679 })
2680 },
2681 |cols| {
2682 cols.iter()
2683 .map(|c| Ident::new_unchecked(c.as_str()))
2684 .collect()
2685 },
2686 );
2687 let columns: Vec<Ident> = projection
2688 .iter()
2689 .cloned()
2690 .chain(extra_columns.iter().map(|item| {
2691 if let SelectItem::Expr {
2692 expr: _,
2693 alias: Some(ident),
2694 } = item
2695 {
2696 ident.clone()
2697 } else {
2698 unreachable!("alias must be given for extra column")
2699 }
2700 }))
2701 .collect();
2702
2703 let mut query = query.clone();
2705 if extra_columns.len() > 0 {
2706 match &mut query.body {
2707 SetExpr::Select(stmt) => stmt.projection.extend(extra_columns.iter().cloned()),
2708 _ => unimplemented!("cannot yet rewrite projections of nested queries"),
2709 }
2710 }
2711 let create_view = AstStatement::<Raw>::CreateView(CreateViewStatement {
2712 if_exists: IfExistsBehavior::Error,
2713 temporary: false,
2714 definition: ViewDefinition {
2715 name: name.clone(),
2716 columns: columns.clone(),
2717 query,
2718 },
2719 })
2720 .to_ast_string_stable();
2721
2722 let create_index = AstStatement::<Raw>::CreateIndex(CreateIndexStatement {
2726 name: None,
2727 in_cluster: None,
2728 on_name: RawItemName::Name(name.clone()),
2729 key_parts: if columns.len() == 0 {
2730 None
2731 } else {
2732 Some(
2733 columns
2734 .iter()
2735 .map(|ident| Expr::Identifier(vec![ident.clone()]))
2736 .collect(),
2737 )
2738 },
2739 with_options: Vec::new(),
2740 if_not_exists: false,
2741 })
2742 .to_ast_string_stable();
2743
2744 let distinct_unneeded = extra_columns.len() == 0
2746 || match distinct {
2747 None | Some(Distinct::On(_)) => true,
2748 Some(Distinct::EntireRow) => false,
2749 };
2750 let distinct = if distinct_unneeded { None } else { distinct };
2751
2752 let view_sql = AstStatement::<Raw>::Select(SelectStatement {
2754 query: Query {
2755 ctes: CteBlock::Simple(vec![]),
2756 body: SetExpr::Select(Box::new(Select {
2757 distinct,
2758 projection: if projection.len() == 0 {
2759 vec![SelectItem::Wildcard]
2760 } else {
2761 projection
2762 .iter()
2763 .map(|ident| SelectItem::Expr {
2764 expr: Expr::Identifier(vec![ident.clone()]),
2765 alias: None,
2766 })
2767 .collect()
2768 },
2769 from: vec![TableWithJoins {
2770 relation: TableFactor::Table {
2771 name: RawItemName::Name(name.clone()),
2772 alias: None,
2773 },
2774 joins: vec![],
2775 }],
2776 selection: None,
2777 group_by: vec![],
2778 having: None,
2779 qualify: None,
2780 options: vec![],
2781 })),
2782 order_by: view_order_by,
2783 limit: None,
2784 offset: None,
2785 },
2786 as_of: query_as_of.clone(),
2787 })
2788 .to_ast_string_stable();
2789
2790 let drop_view = AstStatement::<Raw>::DropObjects(DropObjectsStatement {
2792 object_type: ObjectType::View,
2793 if_exists: false,
2794 names: vec![UnresolvedObjectName::Item(name)],
2795 cascade: false,
2796 })
2797 .to_ast_string_stable();
2798
2799 (create_view, create_index, view_sql, drop_view)
2800}
2801
2802fn derive_num_attributes(body: &SetExpr<Raw>) -> Option<usize> {
2807 let Some((projection, _)) = find_projection(body) else {
2808 return None;
2809 };
2810 derive_num_attributes_from_projection(projection)
2811}
2812
2813fn derive_order_by(
2824 body: &SetExpr<Raw>,
2825 order_by: &Vec<OrderByExpr<Raw>>,
2826) -> (
2827 Vec<OrderByExpr<Raw>>,
2828 Vec<SelectItem<Raw>>,
2829 Option<Distinct<Raw>>,
2830) {
2831 let Some((projection, distinct)) = find_projection(body) else {
2832 return (vec![], vec![], None);
2833 };
2834 let (view_order_by, extra_columns) = derive_order_by_from_projection(projection, order_by);
2835 (view_order_by, extra_columns, distinct.clone())
2836}
2837
2838fn find_projection(body: &SetExpr<Raw>) -> Option<(&Vec<SelectItem<Raw>>, &Option<Distinct<Raw>>)> {
2840 let mut set_expr = body;
2843 loop {
2844 match set_expr {
2845 SetExpr::Select(select) => {
2846 return Some((&select.projection, &select.distinct));
2847 }
2848 SetExpr::SetOperation { left, .. } => set_expr = left.as_ref(),
2849 SetExpr::Query(query) => set_expr = &query.body,
2850 _ => return None,
2851 }
2852 }
2853}
2854
2855fn derive_num_attributes_from_projection(projection: &Vec<SelectItem<Raw>>) -> Option<usize> {
2859 let mut num_attributes = 0usize;
2860 for item in projection.iter() {
2861 let SelectItem::Expr { expr, .. } = item else {
2862 return None;
2863 };
2864 match expr {
2865 Expr::QualifiedWildcard(..) | Expr::WildcardAccess(..) => {
2866 return None;
2867 }
2868 _ => {
2869 num_attributes += 1;
2870 }
2871 }
2872 }
2873 Some(num_attributes)
2874}
2875
2876fn derive_order_by_from_projection(
2881 projection: &Vec<SelectItem<Raw>>,
2882 order_by: &Vec<OrderByExpr<Raw>>,
2883) -> (Vec<OrderByExpr<Raw>>, Vec<SelectItem<Raw>>) {
2884 let mut view_order_by: Vec<OrderByExpr<Raw>> = vec![];
2885 let mut extra_columns: Vec<SelectItem<Raw>> = vec![];
2886 for order_by_expr in order_by.iter() {
2887 let query_expr = &order_by_expr.expr;
2888 let view_expr = match query_expr {
2889 Expr::Value(mz_sql_parser::ast::Value::Number(_)) => query_expr.clone(),
2890 _ => {
2891 if let Some(i) = projection.iter().position(|item| match item {
2893 SelectItem::Expr { expr, alias } => {
2894 expr == query_expr
2895 || match query_expr {
2896 Expr::Identifier(ident) => {
2897 ident.len() == 1 && Some(&ident[0]) == alias.as_ref()
2898 }
2899 _ => false,
2900 }
2901 }
2902 SelectItem::Wildcard => false,
2903 }) {
2904 Expr::Value(mz_sql_parser::ast::Value::Number((i + 1).to_string()))
2905 } else {
2906 let ident = Ident::new_unchecked(format!(
2909 "a{}",
2910 (projection.len() + extra_columns.len() + 1)
2911 ));
2912 extra_columns.push(SelectItem::Expr {
2913 expr: query_expr.clone(),
2914 alias: Some(ident.clone()),
2915 });
2916 Expr::Identifier(vec![ident])
2917 }
2918 }
2919 };
2920 view_order_by.push(OrderByExpr {
2921 expr: view_expr,
2922 asc: order_by_expr.asc,
2923 nulls_last: order_by_expr.nulls_last,
2924 });
2925 }
2926 (view_order_by, extra_columns)
2927}
2928
2929fn mutate(sql: &str) -> Vec<String> {
2931 let stmts = parser::parse_statements(sql).unwrap_or_default();
2932 let mut additional = Vec::new();
2933 for stmt in stmts {
2934 match stmt.ast {
2935 AstStatement::CreateTable(stmt) => additional.push(
2936 AstStatement::<Raw>::CreateIndex(CreateIndexStatement {
2939 name: None,
2940 in_cluster: None,
2941 on_name: RawItemName::Name(stmt.name.clone()),
2942 key_parts: Some(
2943 stmt.columns
2944 .iter()
2945 .map(|def| Expr::Identifier(vec![def.name.clone()]))
2946 .collect(),
2947 ),
2948 with_options: Vec::new(),
2949 if_not_exists: false,
2950 })
2951 .to_ast_string_stable(),
2952 ),
2953 _ => {}
2954 }
2955 }
2956 additional
2957}
2958
2959#[mz_ore::test]
2960#[cfg_attr(miri, ignore)] fn test_generate_view_sql() {
2962 let uuid = Uuid::parse_str("67e5504410b1426f9247bb680e5fe0c8").unwrap();
2963 let cases = vec![
2964 (("SELECT * FROM t", None, None),
2965 (
2966 r#"CREATE VIEW "v67e5504410b1426f9247bb680e5fe0c8" AS SELECT * FROM "t""#.to_string(),
2967 r#"CREATE DEFAULT INDEX ON "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
2968 r#"SELECT * FROM "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
2969 r#"DROP VIEW "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
2970 )),
2971 (("SELECT a, b, c FROM t1, t2", Some(3), Some(vec![ColumnName::from("a"), ColumnName::from("b"), ColumnName::from("c")])),
2972 (
2973 r#"CREATE VIEW "v67e5504410b1426f9247bb680e5fe0c8" ("a", "b", "c") AS SELECT "a", "b", "c" FROM "t1", "t2""#.to_string(),
2974 r#"CREATE INDEX ON "v67e5504410b1426f9247bb680e5fe0c8" ("a", "b", "c")"#.to_string(),
2975 r#"SELECT "a", "b", "c" FROM "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
2976 r#"DROP VIEW "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
2977 )),
2978 (("SELECT a, b, c FROM t1, t2", Some(3), None),
2979 (
2980 r#"CREATE VIEW "v67e5504410b1426f9247bb680e5fe0c8" ("a1", "a2", "a3") AS SELECT "a", "b", "c" FROM "t1", "t2""#.to_string(),
2981 r#"CREATE INDEX ON "v67e5504410b1426f9247bb680e5fe0c8" ("a1", "a2", "a3")"#.to_string(),
2982 r#"SELECT "a1", "a2", "a3" FROM "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
2983 r#"DROP VIEW "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
2984 )),
2985 (("SELECT * FROM (SELECT a, sum(b) AS a FROM t GROUP BY a)", None, None),
2988 (
2989 r#"CREATE VIEW "v67e5504410b1426f9247bb680e5fe0c8" AS SELECT * FROM (SELECT "a", "sum"("b") AS "a" FROM "t" GROUP BY "a")"#.to_string(),
2990 r#"CREATE DEFAULT INDEX ON "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
2991 r#"SELECT * FROM "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
2992 r#"DROP VIEW "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
2993 )),
2994 (("SELECT a, b, b + d AS c, a + b AS d FROM t1, t2 ORDER BY a, c, a + b", Some(4), Some(vec![ColumnName::from("a"), ColumnName::from("b"), ColumnName::from("c"), ColumnName::from("d")])),
2995 (
2996 r#"CREATE VIEW "v67e5504410b1426f9247bb680e5fe0c8" ("a", "b", "c", "d") AS SELECT "a", "b", "b" + "d" AS "c", "a" + "b" AS "d" FROM "t1", "t2" ORDER BY "a", "c", "a" + "b""#.to_string(),
2997 r#"CREATE INDEX ON "v67e5504410b1426f9247bb680e5fe0c8" ("a", "b", "c", "d")"#.to_string(),
2998 r#"SELECT "a", "b", "c", "d" FROM "v67e5504410b1426f9247bb680e5fe0c8" ORDER BY 1, 3, 4"#.to_string(),
2999 r#"DROP VIEW "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
3000 )),
3001 (("((SELECT 1 AS a UNION SELECT 2 AS b) UNION SELECT 3 AS c) ORDER BY a", Some(1), None),
3002 (
3003 r#"CREATE VIEW "v67e5504410b1426f9247bb680e5fe0c8" ("a1") AS (SELECT 1 AS "a" UNION SELECT 2 AS "b") UNION SELECT 3 AS "c" ORDER BY "a""#.to_string(),
3004 r#"CREATE INDEX ON "v67e5504410b1426f9247bb680e5fe0c8" ("a1")"#.to_string(),
3005 r#"SELECT "a1" FROM "v67e5504410b1426f9247bb680e5fe0c8" ORDER BY 1"#.to_string(),
3006 r#"DROP VIEW "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
3007 )),
3008 (("SELECT * FROM (SELECT a, sum(b) AS a FROM t GROUP BY a) ORDER BY 1", None, None),
3009 (
3010 r#"CREATE VIEW "v67e5504410b1426f9247bb680e5fe0c8" AS SELECT * FROM (SELECT "a", "sum"("b") AS "a" FROM "t" GROUP BY "a") ORDER BY 1"#.to_string(),
3011 r#"CREATE DEFAULT INDEX ON "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
3012 r#"SELECT * FROM "v67e5504410b1426f9247bb680e5fe0c8" ORDER BY 1"#.to_string(),
3013 r#"DROP VIEW "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
3014 )),
3015 (("SELECT * FROM (SELECT a, sum(b) AS a FROM t GROUP BY a) ORDER BY a", None, None),
3016 (
3017 r#"CREATE VIEW "v67e5504410b1426f9247bb680e5fe0c8" AS SELECT * FROM (SELECT "a", "sum"("b") AS "a" FROM "t" GROUP BY "a") ORDER BY "a""#.to_string(),
3018 r#"CREATE DEFAULT INDEX ON "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
3019 r#"SELECT * FROM "v67e5504410b1426f9247bb680e5fe0c8" ORDER BY "a""#.to_string(),
3020 r#"DROP VIEW "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
3021 )),
3022 (("SELECT a, sum(b) AS a FROM t GROUP BY a, c ORDER BY a, c", Some(2), None),
3023 (
3024 r#"CREATE VIEW "v67e5504410b1426f9247bb680e5fe0c8" ("a1", "a2", "a3") AS SELECT "a", "sum"("b") AS "a", "c" AS "a3" FROM "t" GROUP BY "a", "c" ORDER BY "a", "c""#.to_string(),
3025 r#"CREATE INDEX ON "v67e5504410b1426f9247bb680e5fe0c8" ("a1", "a2", "a3")"#.to_string(),
3026 r#"SELECT "a1", "a2" FROM "v67e5504410b1426f9247bb680e5fe0c8" ORDER BY 1, "a3""#.to_string(),
3027 r#"DROP VIEW "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
3028 )),
3029 (("SELECT a, sum(b) AS a FROM t GROUP BY a, c ORDER BY c, a", Some(2), None),
3030 (
3031 r#"CREATE VIEW "v67e5504410b1426f9247bb680e5fe0c8" ("a1", "a2", "a3") AS SELECT "a", "sum"("b") AS "a", "c" AS "a3" FROM "t" GROUP BY "a", "c" ORDER BY "c", "a""#.to_string(),
3032 r#"CREATE INDEX ON "v67e5504410b1426f9247bb680e5fe0c8" ("a1", "a2", "a3")"#.to_string(),
3033 r#"SELECT "a1", "a2" FROM "v67e5504410b1426f9247bb680e5fe0c8" ORDER BY "a3", 1"#.to_string(),
3034 r#"DROP VIEW "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
3035 )),
3036 ];
3037 for ((sql, num_attributes, expected_column_names), expected) in cases {
3038 let view_sql =
3039 generate_view_sql(sql, uuid.as_simple(), num_attributes, expected_column_names);
3040 assert_eq!(expected, view_sql);
3041 }
3042}
3043
3044#[mz_ore::test]
3045fn test_mutate() {
3046 let cases = vec![
3047 ("CREATE TABLE t ()", vec![r#"CREATE INDEX ON "t" ()"#]),
3048 (
3049 "CREATE TABLE t (a INT)",
3050 vec![r#"CREATE INDEX ON "t" ("a")"#],
3051 ),
3052 (
3053 "CREATE TABLE t (a INT, b TEXT)",
3054 vec![r#"CREATE INDEX ON "t" ("a", "b")"#],
3055 ),
3056 ("BAD SYNTAX", Vec::new()),
3058 ];
3059 for (sql, expected) in cases {
3060 let stmts = mutate(sql);
3061 assert_eq!(expected, stmts, "sql: {sql}");
3062 }
3063}