1use std::collections::BTreeMap;
29use std::error::Error;
30use std::fs::{File, OpenOptions};
31use std::io::{Read, Seek, SeekFrom, Write};
32use std::net::{IpAddr, Ipv4Addr, SocketAddr};
33use std::path::Path;
34use std::sync::Arc;
35use std::sync::LazyLock;
36use std::time::Duration;
37use std::{env, fmt, ops, str, thread};
38
39use anyhow::{anyhow, bail};
40use bytes::BytesMut;
41use chrono::{DateTime, NaiveDateTime, NaiveTime, Utc};
42use fallible_iterator::FallibleIterator;
43use futures::sink::SinkExt;
44use itertools::Itertools;
45use maplit::btreemap;
46use md5::{Digest, Md5};
47use mz_adapter_types::bootstrap_builtin_cluster_config::{
48 ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR, BootstrapBuiltinClusterConfig,
49 CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR, PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR,
50 SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR, SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR,
51};
52use mz_catalog::config::ClusterReplicaSizeMap;
53use mz_controller::ControllerConfig;
54use mz_environmentd::CatalogConfig;
55use mz_license_keys::ValidatedLicenseKey;
56use mz_orchestrator_process::{ProcessOrchestrator, ProcessOrchestratorConfig};
57use mz_orchestrator_tracing::{TracingCliArgs, TracingOrchestrator};
58use mz_ore::cast::{CastFrom, ReinterpretCast};
59use mz_ore::channel::trigger;
60use mz_ore::error::ErrorExt;
61use mz_ore::metrics::MetricsRegistry;
62use mz_ore::now::SYSTEM_TIME;
63use mz_ore::retry::Retry;
64use mz_ore::task;
65use mz_ore::thread::{JoinHandleExt, JoinOnDropHandle};
66use mz_ore::tracing::TracingHandle;
67use mz_ore::url::SensitiveUrl;
68use mz_persist_client::PersistLocation;
69use mz_persist_client::cache::PersistClientCache;
70use mz_persist_client::cfg::PersistConfig;
71use mz_persist_client::rpc::{
72 MetricsSameProcessPubSubSender, PersistGrpcPubSubServer, PubSubClientConnection, PubSubSender,
73};
74use mz_pgrepr::{Interval, Jsonb, Numeric, UInt2, UInt4, UInt8, Value, oid};
75use mz_repr::ColumnName;
76use mz_repr::adt::date::Date;
77use mz_repr::adt::mz_acl_item::{AclItem, MzAclItem};
78use mz_repr::adt::numeric;
79use mz_secrets::SecretsController;
80use mz_server_core::listeners::{
81 AllowedRoles, AuthenticatorKind, BaseListenerConfig, HttpListenerConfig, HttpRoutesEnabled,
82 ListenersConfig, SqlListenerConfig,
83};
84use mz_sql::ast::{Expr, Raw, Statement};
85use mz_sql::catalog::EnvironmentId;
86use mz_sql_parser::ast::display::AstDisplay;
87use mz_sql_parser::ast::{
88 CreateIndexStatement, CreateViewStatement, CteBlock, Distinct, DropObjectsStatement, Ident,
89 IfExistsBehavior, ObjectType, OrderByExpr, Query, RawItemName, Select, SelectItem,
90 SelectStatement, SetExpr, Statement as AstStatement, TableFactor, TableWithJoins,
91 UnresolvedItemName, UnresolvedObjectName, ViewDefinition,
92};
93use mz_sql_parser::parser;
94use mz_storage_types::connections::ConnectionContext;
95use postgres_protocol::types;
96use regex::Regex;
97use tempfile::TempDir;
98use tokio::net::TcpListener;
99use tokio::runtime::Runtime;
100use tokio::sync::oneshot;
101use tokio_postgres::types::{FromSql, Kind as PgKind, Type as PgType};
102use tokio_postgres::{NoTls, Row, SimpleQueryMessage};
103use tokio_stream::wrappers::TcpListenerStream;
104use tower_http::cors::AllowOrigin;
105use tracing::{error, info};
106use uuid::Uuid;
107use uuid::fmt::Simple;
108
109use crate::ast::{Location, Mode, Output, QueryOutput, Record, Sort, Type};
110use crate::util;
111
/// The result of running a single record.
///
/// Every variant except [`Outcome::Success`] records the `location` it is
/// reported at. The `'a` lifetime ties borrowed expectations (expected error
/// text, expected column names, expected output) to the data they were read
/// from.
#[derive(Debug)]
pub enum Outcome<'a> {
    /// Reported with the `Unsupported` label together with the causing error.
    Unsupported {
        error: anyhow::Error,
        location: Location,
    },
    /// Reported with the `ParseFailure` label together with the causing error.
    ParseFailure {
        error: anyhow::Error,
        location: Location,
    },
    /// Reported with the `PlanFailure` label together with the causing error.
    PlanFailure {
        error: anyhow::Error,
        location: Location,
    },
    /// An error matching `expected_error` was expected, but none occurred.
    UnexpectedPlanSuccess {
        expected_error: &'a str,
        location: Location,
    },
    /// The reported row count differs from the expected one.
    WrongNumberOfRowsInserted {
        expected_count: u64,
        actual_count: u64,
        location: Location,
    },
    /// The number of columns differs from the expected one.
    WrongColumnCount {
        expected_count: usize,
        actual_count: usize,
        location: Location,
    },
    /// The column names differ from the expected ones.
    ///
    /// `actual_output` is carried along but is not included in the `Display`
    /// rendering of this variant.
    WrongColumnNames {
        expected_column_names: &'a Vec<ColumnName>,
        actual_column_names: Vec<ColumnName>,
        actual_output: Output,
        location: Location,
    },
    /// The produced output differs from the expected output.
    ///
    /// Both the formatted (`actual_output`) and the raw (`actual_raw_output`)
    /// results are kept so they can be shown side by side.
    OutputFailure {
        expected_output: &'a Output,
        actual_raw_output: Vec<Row>,
        actual_output: Output,
        location: Location,
    },
    /// The outcome of a query and the outcome obtained through an indexed
    /// view disagree.
    InconsistentViewOutcome {
        query_outcome: Box<Outcome<'a>>,
        view_outcome: Box<Outcome<'a>>,
        location: Location,
    },
    /// Wraps the outcome (`cause`) that is reported under the `Bail` label.
    Bail {
        cause: Box<Outcome<'a>>,
        location: Location,
    },
    /// Wraps an outcome (`cause`) that is reported as a warning; warnings do
    /// not count as failures (see `Outcome::failure`).
    Warning {
        cause: Box<Outcome<'a>>,
        location: Location,
    },
    /// The record behaved as expected.
    Success,
}
167
/// Number of `Outcome` variants; this is the length of the per-variant
/// counter array kept in `Outcomes`.
const NUM_OUTCOMES: usize = 12;
/// Counter index of `Outcome::Warning` (the value `Outcome::code` returns for it).
const WARNING_OUTCOME: usize = NUM_OUTCOMES - 2;
/// Counter index of `Outcome::Success` (the value `Outcome::code` returns for it).
const SUCCESS_OUTCOME: usize = NUM_OUTCOMES - 1;
171
172impl<'a> Outcome<'a> {
173 fn code(&self) -> usize {
174 match self {
175 Outcome::Unsupported { .. } => 0,
176 Outcome::ParseFailure { .. } => 1,
177 Outcome::PlanFailure { .. } => 2,
178 Outcome::UnexpectedPlanSuccess { .. } => 3,
179 Outcome::WrongNumberOfRowsInserted { .. } => 4,
180 Outcome::WrongColumnCount { .. } => 5,
181 Outcome::WrongColumnNames { .. } => 6,
182 Outcome::OutputFailure { .. } => 7,
183 Outcome::InconsistentViewOutcome { .. } => 8,
184 Outcome::Bail { .. } => 9,
185 Outcome::Warning { .. } => 10,
186 Outcome::Success => 11,
187 }
188 }
189
190 fn success(&self) -> bool {
191 matches!(self, Outcome::Success)
192 }
193
194 fn failure(&self) -> bool {
195 !matches!(self, Outcome::Success) && !matches!(self, Outcome::Warning { .. })
196 }
197
198 fn err_msg(&self) -> Option<String> {
202 match self {
203 Outcome::Unsupported { error, .. }
204 | Outcome::ParseFailure { error, .. }
205 | Outcome::PlanFailure { error, .. } => Some(
206 regex::escape(
209 error.to_string().split('\n').next().unwrap(),
212 )
213 .replace(r"\#", "#"),
218 ),
219 _ => None,
220 }
221 }
222}
223
224impl fmt::Display for Outcome<'_> {
225 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
226 use Outcome::*;
227 const INDENT: &str = "\n ";
228 match self {
229 Unsupported { error, location } => write!(
230 f,
231 "Unsupported:{}:\n{}",
232 location,
233 error.display_with_causes()
234 ),
235 ParseFailure { error, location } => {
236 write!(
237 f,
238 "ParseFailure:{}:\n{}",
239 location,
240 error.display_with_causes()
241 )
242 }
243 PlanFailure { error, location } => write!(f, "PlanFailure:{}:\n{:#}", location, error),
244 UnexpectedPlanSuccess {
245 expected_error,
246 location,
247 } => write!(
248 f,
249 "UnexpectedPlanSuccess:{} expected error: {}",
250 location, expected_error
251 ),
252 WrongNumberOfRowsInserted {
253 expected_count,
254 actual_count,
255 location,
256 } => write!(
257 f,
258 "WrongNumberOfRowsInserted:{}{}expected: {}{}actually: {}",
259 location, INDENT, expected_count, INDENT, actual_count
260 ),
261 WrongColumnCount {
262 expected_count,
263 actual_count,
264 location,
265 } => write!(
266 f,
267 "WrongColumnCount:{}{}expected: {}{}actually: {}",
268 location, INDENT, expected_count, INDENT, actual_count
269 ),
270 WrongColumnNames {
271 expected_column_names,
272 actual_column_names,
273 actual_output: _,
274 location,
275 } => write!(
276 f,
277 "Wrong Column Names:{}:{}expected column names: {}{}inferred column names: {}",
278 location,
279 INDENT,
280 expected_column_names
281 .iter()
282 .map(|n| n.to_string())
283 .collect::<Vec<_>>()
284 .join(" "),
285 INDENT,
286 actual_column_names
287 .iter()
288 .map(|n| n.to_string())
289 .collect::<Vec<_>>()
290 .join(" ")
291 ),
292 OutputFailure {
293 expected_output,
294 actual_raw_output,
295 actual_output,
296 location,
297 } => write!(
298 f,
299 "OutputFailure:{}{}expected: {:?}{}actually: {:?}{}actual raw: {:?}",
300 location, INDENT, expected_output, INDENT, actual_output, INDENT, actual_raw_output
301 ),
302 InconsistentViewOutcome {
303 query_outcome,
304 view_outcome,
305 location,
306 } => write!(
307 f,
308 "InconsistentViewOutcome:{}{}expected from query: {:?}{}actually from indexed view: {:?}{}",
309 location, INDENT, query_outcome, INDENT, view_outcome, INDENT
310 ),
311 Bail { cause, location } => write!(f, "Bail:{} {}", location, cause),
312 Warning { cause, location } => write!(f, "Warning:{} {}", location, cause),
313 Success => f.write_str("Success"),
314 }
315 }
316}
317
/// Aggregated results: a counter per `Outcome` variant plus textual details.
#[derive(Default, Debug)]
pub struct Outcomes {
    /// One counter per outcome kind, indexed by the value `Outcome::code` returns.
    stats: [usize; NUM_OUTCOMES],
    /// Pre-rendered detail lines; printed one per line by `OutcomesDisplay`
    /// when failure details are requested.
    details: Vec<String>,
}
323
324impl ops::AddAssign<Outcomes> for Outcomes {
325 fn add_assign(&mut self, rhs: Outcomes) {
326 for (lhs, rhs) in self.stats.iter_mut().zip(rhs.stats.iter()) {
327 *lhs += rhs
328 }
329 }
330}
331impl Outcomes {
332 pub fn any_failed(&self) -> bool {
333 self.stats[SUCCESS_OUTCOME] + self.stats[WARNING_OUTCOME] < self.stats.iter().sum::<usize>()
334 }
335
336 pub fn as_json(&self) -> serde_json::Value {
337 serde_json::json!({
338 "unsupported": self.stats[0],
339 "parse_failure": self.stats[1],
340 "plan_failure": self.stats[2],
341 "unexpected_plan_success": self.stats[3],
342 "wrong_number_of_rows_affected": self.stats[4],
343 "wrong_column_count": self.stats[5],
344 "wrong_column_names": self.stats[6],
345 "output_failure": self.stats[7],
346 "inconsistent_view_outcome": self.stats[8],
347 "bail": self.stats[9],
348 "warning": self.stats[10],
349 "success": self.stats[11],
350 })
351 }
352
353 pub fn display(&self, no_fail: bool, failure_details: bool) -> OutcomesDisplay<'_> {
354 OutcomesDisplay {
355 inner: self,
356 no_fail,
357 failure_details,
358 }
359 }
360}
361
/// `Display` adapter for `Outcomes`, created by `Outcomes::display`.
pub struct OutcomesDisplay<'a> {
    /// The outcomes being rendered.
    inner: &'a Outcomes,
    /// When set, a run with failures is labelled `FAIL-IGNORE` instead of `FAIL`.
    no_fail: bool,
    /// When set, the detail lines are printed instead of the summary line
    /// whenever there are failures or `no_fail` is set.
    failure_details: bool,
}
367
368impl<'a> fmt::Display for OutcomesDisplay<'a> {
369 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
370 let total: usize = self.inner.stats.iter().sum();
371 if self.failure_details
372 && (self.inner.stats[SUCCESS_OUTCOME] + self.inner.stats[WARNING_OUTCOME] != total
373 || self.no_fail)
374 {
375 for outcome in &self.inner.details {
376 writeln!(f, "{}", outcome)?;
377 }
378 Ok(())
379 } else {
380 write!(
381 f,
382 "{}:",
383 if self.inner.stats[SUCCESS_OUTCOME] + self.inner.stats[WARNING_OUTCOME] == total {
384 "PASS"
385 } else if self.no_fail {
386 "FAIL-IGNORE"
387 } else {
388 "FAIL"
389 }
390 )?;
391 static NAMES: LazyLock<Vec<&'static str>> = LazyLock::new(|| {
392 vec![
393 "unsupported",
394 "parse-failure",
395 "plan-failure",
396 "unexpected-plan-success",
397 "wrong-number-of-rows-inserted",
398 "wrong-column-count",
399 "wrong-column-names",
400 "output-failure",
401 "inconsistent-view-outcome",
402 "bail",
403 "warning",
404 "success",
405 "total",
406 ]
407 });
408 for (i, n) in self.inner.stats.iter().enumerate() {
409 if *n > 0 {
410 write!(f, " {}={}", NAMES[i], n)?;
411 }
412 }
413 write!(f, " total={}", total)
414 }
415 }
416}
417
/// Facts about a prepared query (carried by `PrepareQueryOutcome::QueryPrepared`).
struct QueryInfo {
    /// Whether the query is a `SELECT` (as the field name indicates).
    is_select: bool,
    /// Number of attributes of the query, if known.
    num_attributes: Option<usize>,
}
422
/// Result of preparing a query: either information about the prepared query
/// or an `Outcome` to report instead.
enum PrepareQueryOutcome<'a> {
    /// Preparation produced a `QueryInfo`.
    QueryPrepared(QueryInfo),
    /// Preparation yielded an outcome directly.
    Outcome(Outcome<'a>),
}
427
/// Runs records against a server instance that can be replaced via `reset`.
pub struct Runner<'a> {
    /// Configuration used every time a `RunnerInner` is started.
    config: &'a RunConfig<'a>,
    /// The current server instance. It is `None` only until the first `reset`
    /// completes, or after a `reset` whose restart failed.
    inner: Option<RunnerInner<'a>>,
}
432
/// State of one running server instance together with its client connections.
pub struct RunnerInner<'a> {
    /// Address `client` connects to.
    server_addr: SocketAddr,
    /// Address `system_client` connects to.
    internal_server_addr: SocketAddr,
    /// Address of the internal HTTP server.
    internal_http_server_addr: SocketAddr,
    /// Connection to `server_addr` opened without an explicit user.
    client: tokio_postgres::Client,
    /// Connection to `internal_server_addr` opened as user `mz_system`.
    system_client: tokio_postgres::Client,
    /// Additional connections keyed by name; emptied by `Runner::reset_database`.
    clients: BTreeMap<String, tokio_postgres::Client>,
    auto_index_tables: bool,
    auto_index_selects: bool,
    auto_transactions: bool,
    /// When set, `Runner::reset_database` enables the
    /// `unsafe_enable_table_keys` system parameter.
    enable_table_keys: bool,
    verbosity: u8,
    /// Sink for textual output.
    stdout: &'a dyn WriteFmt,
    // NOTE(review): the underscore-prefixed fields below are presumably held
    // only so that they are dropped together with this struct — confirm.
    _shutdown_trigger: trigger::Trigger,
    _server_thread: JoinOnDropHandle<()>,
    _temp_dir: TempDir,
}
451
/// Newtype around `Value` that implements `FromSql`, so that result columns
/// can be decoded into a `Value` (see `format_row`).
#[derive(Debug)]
pub struct Slt(Value);
454
impl<'a> FromSql<'a> for Slt {
    /// Decodes the binary value `raw` of type `ty` into the matching `Value`
    /// variant.
    ///
    /// Well-known types are dispatched on `ty` itself; anything else is
    /// dispatched first on its kind (arrays) and then on its OID (the custom
    /// unsigned-integer, `mz_timestamp` and `mz_aclitem` types).
    ///
    /// # Errors
    ///
    /// Returns an error if `raw` cannot be decoded as `ty`.
    ///
    /// # Panics
    ///
    /// Panics via `unreachable!()` if `ty` has no decoder here, and if an
    /// array dimension reports a negative length.
    fn from_sql(
        ty: &PgType,
        mut raw: &'a [u8],
    ) -> Result<Self, Box<dyn Error + 'static + Send + Sync>> {
        Ok(match *ty {
            PgType::ACLITEM => Self(Value::AclItem(AclItem::decode_binary(
                types::bytea_from_sql(raw),
            )?)),
            PgType::BOOL => Self(Value::Bool(types::bool_from_sql(raw)?)),
            PgType::BYTEA => Self(Value::Bytea(types::bytea_from_sql(raw).to_vec())),
            // `char_from_sql` yields an `i8`; reinterpret its byte as a `u8`.
            PgType::CHAR => Self(Value::Char(u8::from_be_bytes(
                types::char_from_sql(raw)?.to_be_bytes(),
            ))),
            PgType::FLOAT4 => Self(Value::Float4(types::float4_from_sql(raw)?)),
            PgType::FLOAT8 => Self(Value::Float8(types::float8_from_sql(raw)?)),
            // Dates are decoded as an `i32` and converted with `Date::from_pg_epoch`.
            PgType::DATE => Self(Value::Date(Date::from_pg_epoch(types::int4_from_sql(
                raw,
            )?)?)),
            PgType::INT2 => Self(Value::Int2(types::int2_from_sql(raw)?)),
            PgType::INT4 => Self(Value::Int4(types::int4_from_sql(raw)?)),
            PgType::INT8 => Self(Value::Int8(types::int8_from_sql(raw)?)),
            PgType::INTERVAL => Self(Value::Interval(Interval::from_sql(ty, raw)?)),
            PgType::JSONB => Self(Value::Jsonb(Jsonb::from_sql(ty, raw)?)),
            PgType::NAME => Self(Value::Name(types::text_from_sql(raw)?.to_string())),
            PgType::NUMERIC => Self(Value::Numeric(Numeric::from_sql(ty, raw)?)),
            // All OID-like types are decoded as a plain OID.
            PgType::OID => Self(Value::Oid(types::oid_from_sql(raw)?)),
            PgType::REGCLASS => Self(Value::Oid(types::oid_from_sql(raw)?)),
            PgType::REGPROC => Self(Value::Oid(types::oid_from_sql(raw)?)),
            PgType::REGTYPE => Self(Value::Oid(types::oid_from_sql(raw)?)),
            PgType::TEXT | PgType::BPCHAR | PgType::VARCHAR => {
                Self(Value::Text(types::text_from_sql(raw)?.to_string()))
            }
            PgType::TIME => Self(Value::Time(NaiveTime::from_sql(ty, raw)?)),
            PgType::TIMESTAMP => Self(Value::Timestamp(
                NaiveDateTime::from_sql(ty, raw)?.try_into()?,
            )),
            PgType::TIMESTAMPTZ => Self(Value::TimestampTz(
                DateTime::<Utc>::from_sql(ty, raw)?.try_into()?,
            )),
            PgType::UUID => Self(Value::Uuid(Uuid::from_sql(ty, raw)?)),
            PgType::RECORD => {
                // Layout as read here: a field count, then per field a type
                // OID followed by a length-prefixed (possibly NULL) value.
                let num_fields = read_be_i32(&mut raw)?;
                let mut tuple = vec![];
                for _ in 0..num_fields {
                    let oid = u32::reinterpret_cast(read_be_i32(&mut raw)?);
                    let typ = match PgType::from_oid(oid) {
                        Some(typ) => typ,
                        None => return Err("unknown oid".into()),
                    };
                    let v = read_value::<Option<Slt>>(&typ, &mut raw)?;
                    tuple.push(v.map(|v| v.0));
                }
                Self(Value::Record(tuple))
            }
            PgType::INT4_RANGE
            | PgType::INT8_RANGE
            | PgType::DATE_RANGE
            | PgType::NUM_RANGE
            | PgType::TS_RANGE
            | PgType::TSTZ_RANGE => {
                use mz_repr::adt::range::Range;
                // Decode the bounds as `Slt` values, then unwrap each bound
                // into a boxed `Value`.
                let range: Range<Slt> = Range::from_sql(ty, raw)?;
                Self(Value::Range(range.into_bounds(|b| Box::new(b.0))))
            }

            _ => match ty.kind() {
                PgKind::Array(arr_type) => {
                    let arr = types::array_from_sql(raw)?;
                    // Decode each element recursively with the element type;
                    // NULL elements stay `None`.
                    let elements: Vec<Option<Value>> = arr
                        .values()
                        .map(|v| match v {
                            Some(v) => Ok(Some(Slt::from_sql(arr_type, v)?)),
                            None => Ok(None),
                        })
                        .collect::<Vec<Option<Slt>>>()?
                        .into_iter()
                        .map(|v| v.map(|v| v.0))
                        .collect();

                    Self(Value::Array {
                        dims: arr
                            .dimensions()
                            .map(|d| {
                                Ok(mz_repr::adt::array::ArrayDimension {
                                    lower_bound: isize::cast_from(d.lower_bound),
                                    length: usize::try_from(d.len)
                                        .expect("cannot have negative length"),
                                })
                            })
                            .collect()?,
                        elements,
                    })
                }
                // Remaining types are identified by OID only.
                _ => match ty.oid() {
                    oid::TYPE_UINT2_OID => Self(Value::UInt2(UInt2::from_sql(ty, raw)?)),
                    oid::TYPE_UINT4_OID => Self(Value::UInt4(UInt4::from_sql(ty, raw)?)),
                    oid::TYPE_UINT8_OID => Self(Value::UInt8(UInt8::from_sql(ty, raw)?)),
                    oid::TYPE_MZ_TIMESTAMP_OID => {
                        // The value is read as text and parsed.
                        let s = types::text_from_sql(raw)?;
                        let t: mz_repr::Timestamp = s.parse()?;
                        Self(Value::MzTimestamp(t))
                    }
                    oid::TYPE_MZ_ACL_ITEM_OID => Self(Value::MzAclItem(MzAclItem::decode_binary(
                        types::bytea_from_sql(raw),
                    )?)),
                    // NOTE(review): `accepts` admits `PgKind::Composite`, but
                    // only `PgType::RECORD` has a decoder above; other
                    // composite types appear to end up here — confirm.
                    _ => unreachable!(),
                },
            },
        })
    }

    /// Returns `true` for every type this implementation claims to decode:
    /// all array and composite kinds, the custom OID-identified types, and
    /// the listed built-in types.
    fn accepts(ty: &PgType) -> bool {
        match ty.kind() {
            PgKind::Array(_) | PgKind::Composite(_) => return true,
            _ => {}
        }
        match ty.oid() {
            oid::TYPE_UINT2_OID
            | oid::TYPE_UINT4_OID
            | oid::TYPE_UINT8_OID
            | oid::TYPE_MZ_TIMESTAMP_OID
            | oid::TYPE_MZ_ACL_ITEM_OID => return true,
            _ => {}
        }
        matches!(
            *ty,
            PgType::ACLITEM
                | PgType::BOOL
                | PgType::BYTEA
                | PgType::CHAR
                | PgType::DATE
                | PgType::FLOAT4
                | PgType::FLOAT8
                | PgType::INT2
                | PgType::INT4
                | PgType::INT8
                | PgType::INTERVAL
                | PgType::JSONB
                | PgType::NAME
                | PgType::NUMERIC
                | PgType::OID
                | PgType::REGCLASS
                | PgType::REGPROC
                | PgType::REGTYPE
                | PgType::RECORD
                | PgType::TEXT
                | PgType::BPCHAR
                | PgType::VARCHAR
                | PgType::TIME
                | PgType::TIMESTAMP
                | PgType::TIMESTAMPTZ
                | PgType::UUID
                | PgType::INT4_RANGE
                | PgType::INT4_RANGE_ARRAY
                | PgType::INT8_RANGE
                | PgType::INT8_RANGE_ARRAY
                | PgType::DATE_RANGE
                | PgType::DATE_RANGE_ARRAY
                | PgType::NUM_RANGE
                | PgType::NUM_RANGE_ARRAY
                | PgType::TS_RANGE
                | PgType::TS_RANGE_ARRAY
                | PgType::TSTZ_RANGE
                | PgType::TSTZ_RANGE_ARRAY
        )
    }
}
623
/// Reads a big-endian `i32` from the front of `buf` and advances `buf` past
/// the four bytes consumed.
///
/// # Errors
///
/// Returns an error, leaving `buf` untouched, if fewer than four bytes remain.
fn read_be_i32(buf: &mut &[u8]) -> Result<i32, Box<dyn Error + Sync + Send>> {
    const WIDTH: usize = 4;

    if buf.len() < WIDTH {
        return Err("invalid buffer size".into());
    }
    let (word, rest) = buf.split_at(WIDTH);
    *buf = rest;
    Ok(i32::from_be_bytes([word[0], word[1], word[2], word[3]]))
}
634
635fn read_value<'a, T>(type_: &PgType, buf: &mut &'a [u8]) -> Result<T, Box<dyn Error + Sync + Send>>
637where
638 T: FromSql<'a>,
639{
640 let value = match usize::try_from(read_be_i32(buf)?) {
641 Err(_) => None,
642 Ok(len) => {
643 if len > buf.len() {
644 return Err("invalid buffer size".into());
645 }
646 let (head, tail) = buf.split_at(len);
647 *buf = tail;
648 Some(head)
649 }
650 };
651 T::from_sql_nullable(type_, value)
652}
653
654fn format_datum(d: Slt, typ: &Type, mode: Mode, col: usize) -> String {
655 match (typ, d.0) {
656 (Type::Bool, Value::Bool(b)) => b.to_string(),
657
658 (Type::Integer, Value::Int2(i)) => i.to_string(),
659 (Type::Integer, Value::Int4(i)) => i.to_string(),
660 (Type::Integer, Value::Int8(i)) => i.to_string(),
661 (Type::Integer, Value::UInt2(u)) => u.0.to_string(),
662 (Type::Integer, Value::UInt4(u)) => u.0.to_string(),
663 (Type::Integer, Value::UInt8(u)) => u.0.to_string(),
664 (Type::Integer, Value::Oid(i)) => i.to_string(),
665 #[allow(clippy::as_conversions)]
667 (Type::Integer, Value::Float4(f)) => format!("{}", f as i64),
668 #[allow(clippy::as_conversions)]
670 (Type::Integer, Value::Float8(f)) => format!("{}", f as i64),
671 (Type::Integer, Value::Text(_)) => "0".to_string(),
673 (Type::Integer, Value::Bool(b)) => i8::from(b).to_string(),
674 (Type::Integer, Value::Numeric(d)) => {
675 let mut d = d.0.0.clone();
676 let mut cx = numeric::cx_datum();
677 if mode == Mode::Standard {
679 cx.set_rounding(dec::Rounding::Down);
680 }
681 cx.round(&mut d);
682 numeric::munge_numeric(&mut d).unwrap();
683 d.to_standard_notation_string()
684 }
685
686 (Type::Real, Value::Int2(i)) => format!("{:.3}", i),
687 (Type::Real, Value::Int4(i)) => format!("{:.3}", i),
688 (Type::Real, Value::Int8(i)) => format!("{:.3}", i),
689 (Type::Real, Value::Float4(f)) => match mode {
690 Mode::Standard => format!("{:.3}", f),
691 Mode::Cockroach => format!("{}", f),
692 },
693 (Type::Real, Value::Float8(f)) => match mode {
694 Mode::Standard => format!("{:.3}", f),
695 Mode::Cockroach => format!("{}", f),
696 },
697 (Type::Real, Value::Numeric(d)) => match mode {
698 Mode::Standard => {
699 let mut d = d.0.0.clone();
700 if d.exponent() < -3 {
701 numeric::rescale(&mut d, 3).unwrap();
702 }
703 numeric::munge_numeric(&mut d).unwrap();
704 d.to_standard_notation_string()
705 }
706 Mode::Cockroach => d.0.0.to_standard_notation_string(),
707 },
708
709 (Type::Text, Value::Text(s)) => {
710 if s.is_empty() {
711 "(empty)".to_string()
712 } else {
713 s
714 }
715 }
716 (Type::Text, Value::Bool(b)) => b.to_string(),
717 (Type::Text, Value::Float4(f)) => format!("{:.3}", f),
718 (Type::Text, Value::Float8(f)) => format!("{:.3}", f),
719 (Type::Text, Value::Bytea(b)) => match str::from_utf8(&b) {
725 Ok(s) => s.to_string(),
726 Err(_) => format!("{:?}", b),
727 },
728 (Type::Text, Value::Numeric(d)) => d.0.0.to_standard_notation_string(),
729 (Type::Text, d) => {
732 let mut buf = BytesMut::new();
733 d.encode_text(&mut buf);
734 String::from_utf8_lossy(&buf).into_owned()
735 }
736
737 (Type::Oid, Value::Oid(o)) => o.to_string(),
738
739 (_, d) => panic!(
740 "Don't know how to format {:?} as {:?} in column {}",
741 d, typ, col,
742 ),
743 }
744}
745
746fn format_row(row: &Row, types: &[Type], mode: Mode) -> Vec<String> {
747 let mut formatted: Vec<String> = vec![];
748 for i in 0..row.len() {
749 let t: Option<Slt> = row.get::<usize, Option<Slt>>(i);
750 let t: Option<String> = t.map(|d| format_datum(d, &types[i], mode, i));
751 formatted.push(match t {
752 Some(t) => t,
753 None => "NULL".into(),
754 });
755 }
756
757 formatted
758}
759
760impl<'a> Runner<'a> {
761 pub async fn start(config: &'a RunConfig<'a>) -> Result<Runner<'a>, anyhow::Error> {
762 let mut runner = Self {
763 config,
764 inner: None,
765 };
766 runner.reset().await?;
767 Ok(runner)
768 }
769
770 pub async fn reset(&mut self) -> Result<(), anyhow::Error> {
771 drop(self.inner.take());
774 self.inner = Some(RunnerInner::start(self.config).await?);
775
776 Ok(())
777 }
778
779 async fn run_record<'r>(
780 &mut self,
781 record: &'r Record<'r>,
782 in_transaction: &mut bool,
783 ) -> Result<Outcome<'r>, anyhow::Error> {
784 if let Record::ResetServer = record {
785 self.reset().await?;
786 Ok(Outcome::Success)
787 } else {
788 self.inner
789 .as_mut()
790 .expect("RunnerInner missing")
791 .run_record(record, in_transaction)
792 .await
793 }
794 }
795
796 async fn check_catalog(&self) -> Result<(), anyhow::Error> {
797 self.inner
798 .as_ref()
799 .expect("RunnerInner missing")
800 .check_catalog()
801 .await
802 }
803
804 async fn reset_database(&mut self) -> Result<(), anyhow::Error> {
805 let inner = self.inner.as_mut().expect("RunnerInner missing");
806
807 inner.client.batch_execute("ROLLBACK;").await?;
808
809 inner
810 .system_client
811 .batch_execute(
812 "ROLLBACK;
813 SET cluster = mz_catalog_server;
814 RESET cluster_replica;",
815 )
816 .await?;
817
818 inner
819 .system_client
820 .batch_execute("ALTER SYSTEM RESET ALL")
821 .await?;
822
823 for row in inner
825 .system_client
826 .query("SELECT name FROM mz_databases", &[])
827 .await?
828 {
829 let name: &str = row.get("name");
830 inner
831 .system_client
832 .batch_execute(&format!("DROP DATABASE \"{name}\""))
833 .await?;
834 }
835 inner
836 .system_client
837 .batch_execute("CREATE DATABASE materialize")
838 .await?;
839
840 let mut needs_default_cluster = true;
844 for row in inner
845 .system_client
846 .query("SELECT name FROM mz_clusters WHERE id LIKE 'u%'", &[])
847 .await?
848 {
849 match row.get("name") {
850 "quickstart" => needs_default_cluster = false,
851 name => {
852 inner
853 .system_client
854 .batch_execute(&format!("DROP CLUSTER {name}"))
855 .await?
856 }
857 }
858 }
859 if needs_default_cluster {
860 inner
861 .system_client
862 .batch_execute("CREATE CLUSTER quickstart REPLICAS ()")
863 .await?;
864 }
865 let mut needs_default_replica = false;
866 let rows = inner
867 .system_client
868 .query(
869 "SELECT name, size FROM mz_cluster_replicas
870 WHERE cluster_id = (SELECT id FROM mz_clusters WHERE name = 'quickstart')
871 ORDER BY name",
872 &[],
873 )
874 .await?;
875 if rows.len() != self.config.replicas {
876 needs_default_replica = true;
877 } else {
878 for (i, row) in rows.iter().enumerate() {
879 let name: &str = row.get("name");
880 let size: &str = row.get("size");
881 if name != format!("r{i}") || size != self.config.replica_size.to_string() {
882 needs_default_replica = true;
883 break;
884 }
885 }
886 }
887
888 if needs_default_replica {
889 inner
890 .system_client
891 .batch_execute("ALTER CLUSTER quickstart SET (MANAGED = false)")
892 .await?;
893 for row in inner
894 .system_client
895 .query(
896 "SELECT name FROM mz_cluster_replicas
897 WHERE cluster_id = (SELECT id FROM mz_clusters WHERE name = 'quickstart')",
898 &[],
899 )
900 .await?
901 {
902 let name: &str = row.get("name");
903 inner
904 .system_client
905 .batch_execute(&format!("DROP CLUSTER REPLICA quickstart.{}", name))
906 .await?;
907 }
908 for i in 1..=self.config.replicas {
909 inner
910 .system_client
911 .batch_execute(&format!(
912 "CREATE CLUSTER REPLICA quickstart.r{i} SIZE '{}'",
913 self.config.replica_size
914 ))
915 .await?;
916 }
917 inner
918 .system_client
919 .batch_execute("ALTER CLUSTER quickstart SET (MANAGED = true)")
920 .await?;
921 }
922
923 inner
925 .system_client
926 .batch_execute("GRANT USAGE ON DATABASE materialize TO PUBLIC")
927 .await?;
928 inner
929 .system_client
930 .batch_execute("GRANT CREATE ON DATABASE materialize TO materialize")
931 .await?;
932 inner
933 .system_client
934 .batch_execute("GRANT CREATE ON SCHEMA materialize.public TO materialize")
935 .await?;
936 inner
937 .system_client
938 .batch_execute("GRANT USAGE ON CLUSTER quickstart TO PUBLIC")
939 .await?;
940 inner
941 .system_client
942 .batch_execute("GRANT CREATE ON CLUSTER quickstart TO materialize")
943 .await?;
944
945 inner
948 .system_client
949 .simple_query("ALTER SYSTEM SET max_tables = 100")
950 .await?;
951
952 if inner.enable_table_keys {
953 inner
954 .system_client
955 .simple_query("ALTER SYSTEM SET unsafe_enable_table_keys = true")
956 .await?;
957 }
958
959 inner.ensure_fixed_features().await?;
960
961 inner.client = connect(inner.server_addr, None).await;
962 inner.system_client = connect(inner.internal_server_addr, Some("mz_system")).await;
963 inner.clients = BTreeMap::new();
964
965 Ok(())
966 }
967}
968
969impl<'a> RunnerInner<'a> {
970 pub async fn start(config: &RunConfig<'a>) -> Result<RunnerInner<'a>, anyhow::Error> {
971 let temp_dir = tempfile::tempdir()?;
972 let scratch_dir = tempfile::tempdir()?;
973 let environment_id = EnvironmentId::for_tests();
974 let (consensus_uri, timestamp_oracle_url): (SensitiveUrl, SensitiveUrl) = {
975 let postgres_url = &config.postgres_url;
976 let prefix = &config.prefix;
977 info!(%postgres_url, "starting server");
978 let (client, conn) = Retry::default()
979 .max_tries(5)
980 .retry_async(|_| async {
981 match tokio_postgres::connect(postgres_url, NoTls).await {
982 Ok(c) => Ok(c),
983 Err(e) => {
984 error!(%e, "failed to connect to postgres");
985 Err(e)
986 }
987 }
988 })
989 .await?;
990 task::spawn(|| "sqllogictest_connect", async move {
991 if let Err(e) = conn.await {
992 panic!("connection error: {}", e);
993 }
994 });
995 client
996 .batch_execute(&format!(
997 "DROP SCHEMA IF EXISTS {prefix}_tsoracle CASCADE;
998 CREATE SCHEMA IF NOT EXISTS {prefix}_consensus;
999 CREATE SCHEMA {prefix}_tsoracle;"
1000 ))
1001 .await?;
1002 (
1003 format!("{postgres_url}?options=--search_path={prefix}_consensus")
1004 .parse()
1005 .expect("invalid consensus URI"),
1006 format!("{postgres_url}?options=--search_path={prefix}_tsoracle")
1007 .parse()
1008 .expect("invalid timestamp oracle URI"),
1009 )
1010 };
1011
1012 let secrets_dir = temp_dir.path().join("secrets");
1013 let orchestrator = Arc::new(
1014 ProcessOrchestrator::new(ProcessOrchestratorConfig {
1015 image_dir: env::current_exe()?.parent().unwrap().to_path_buf(),
1016 suppress_output: false,
1017 environment_id: environment_id.to_string(),
1018 secrets_dir: secrets_dir.clone(),
1019 command_wrapper: config
1020 .orchestrator_process_wrapper
1021 .as_ref()
1022 .map_or(Ok(vec![]), |s| shell_words::split(s))?,
1023 propagate_crashes: true,
1024 tcp_proxy: None,
1025 scratch_directory: scratch_dir.path().to_path_buf(),
1026 })
1027 .await?,
1028 );
1029 let now = SYSTEM_TIME.clone();
1030 let metrics_registry = MetricsRegistry::new();
1031
1032 let persist_config = PersistConfig::new(
1033 &mz_environmentd::BUILD_INFO,
1034 now.clone(),
1035 mz_dyncfgs::all_dyncfgs(),
1036 );
1037 let persist_pubsub_server =
1038 PersistGrpcPubSubServer::new(&persist_config, &metrics_registry);
1039 let persist_pubsub_client = persist_pubsub_server.new_same_process_connection();
1040 let persist_pubsub_tcp_listener =
1041 TcpListener::bind(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0))
1042 .await
1043 .expect("pubsub addr binding");
1044 let persist_pubsub_server_port = persist_pubsub_tcp_listener
1045 .local_addr()
1046 .expect("pubsub addr has local addr")
1047 .port();
1048 info!("listening for persist pubsub connections on localhost:{persist_pubsub_server_port}");
1049 mz_ore::task::spawn(|| "persist_pubsub_server", async move {
1050 persist_pubsub_server
1051 .serve_with_stream(TcpListenerStream::new(persist_pubsub_tcp_listener))
1052 .await
1053 .expect("success")
1054 });
1055 let persist_clients =
1056 PersistClientCache::new(persist_config, &metrics_registry, |cfg, metrics| {
1057 let sender: Arc<dyn PubSubSender> = Arc::new(MetricsSameProcessPubSubSender::new(
1058 cfg,
1059 persist_pubsub_client.sender,
1060 metrics,
1061 ));
1062 PubSubClientConnection::new(sender, persist_pubsub_client.receiver)
1063 });
1064 let persist_clients = Arc::new(persist_clients);
1065
1066 let secrets_controller = Arc::clone(&orchestrator);
1067 let connection_context = ConnectionContext::for_tests(orchestrator.reader());
1068 let orchestrator = Arc::new(TracingOrchestrator::new(
1069 orchestrator,
1070 config.tracing.clone(),
1071 ));
1072 let listeners_config = ListenersConfig {
1073 sql: btreemap! {
1074 "external".to_owned() => SqlListenerConfig {
1075 addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
1076 authenticator_kind: AuthenticatorKind::None,
1077 allowed_roles: AllowedRoles::Normal,
1078 enable_tls: false,
1079 },
1080 "internal".to_owned() => SqlListenerConfig {
1081 addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
1082 authenticator_kind: AuthenticatorKind::None,
1083 allowed_roles: AllowedRoles::Internal,
1084 enable_tls: false,
1085 },
1086 },
1087 http: btreemap![
1088 "external".to_owned() => HttpListenerConfig {
1089 base: BaseListenerConfig {
1090 addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
1091 authenticator_kind: AuthenticatorKind::None,
1092 allowed_roles: AllowedRoles::Normal,
1093 enable_tls: false
1094 },
1095 routes: HttpRoutesEnabled {
1096 base: true,
1097 webhook: true,
1098 internal: false,
1099 metrics: false,
1100 profiling: false,
1101 },
1102 },
1103 "internal".to_owned() => HttpListenerConfig {
1104 base: BaseListenerConfig {
1105 addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
1106 authenticator_kind: AuthenticatorKind::None,
1107 allowed_roles: AllowedRoles::NormalAndInternal,
1108 enable_tls: false
1109 },
1110 routes: HttpRoutesEnabled {
1111 base: true,
1112 webhook: true,
1113 internal: true,
1114 metrics: true,
1115 profiling: true,
1116 },
1117 },
1118 ],
1119 };
1120 let listeners = mz_environmentd::Listeners::bind(listeners_config).await?;
1121 let host_name = format!(
1122 "localhost:{}",
1123 listeners.http["external"].handle.local_addr.port()
1124 );
1125 let catalog_config = CatalogConfig {
1126 persist_clients: Arc::clone(&persist_clients),
1127 metrics: Arc::new(mz_catalog::durable::Metrics::new(&MetricsRegistry::new())),
1128 };
1129 let server_config = mz_environmentd::Config {
1130 catalog_config,
1131 timestamp_oracle_url: Some(timestamp_oracle_url),
1132 controller: ControllerConfig {
1133 build_info: &mz_environmentd::BUILD_INFO,
1134 orchestrator,
1135 clusterd_image: "clusterd".into(),
1136 init_container_image: None,
1137 deploy_generation: 0,
1138 persist_location: PersistLocation {
1139 blob_uri: format!(
1140 "file://{}/persist/blob",
1141 config.persist_dir.path().display()
1142 )
1143 .parse()
1144 .expect("invalid blob URI"),
1145 consensus_uri,
1146 },
1147 persist_clients,
1148 now: SYSTEM_TIME.clone(),
1149 metrics_registry: metrics_registry.clone(),
1150 persist_pubsub_url: format!("http://localhost:{}", persist_pubsub_server_port),
1151 secrets_args: mz_service::secrets::SecretsReaderCliArgs {
1152 secrets_reader: mz_service::secrets::SecretsControllerKind::LocalFile,
1153 secrets_reader_local_file_dir: Some(secrets_dir),
1154 secrets_reader_kubernetes_context: None,
1155 secrets_reader_aws_prefix: None,
1156 secrets_reader_name_prefix: None,
1157 },
1158 connection_context,
1159 },
1160 secrets_controller,
1161 cloud_resource_controller: None,
1162 tls: None,
1163 frontegg: None,
1164 cors_allowed_origin: AllowOrigin::list([]),
1165 unsafe_mode: true,
1166 all_features: false,
1167 metrics_registry,
1168 now,
1169 environment_id,
1170 cluster_replica_sizes: ClusterReplicaSizeMap::for_tests(),
1171 bootstrap_default_cluster_replica_size: config.replica_size.to_string(),
1172 bootstrap_default_cluster_replication_factor: config
1173 .replicas
1174 .try_into()
1175 .expect("replicas must fit"),
1176 bootstrap_builtin_system_cluster_config: BootstrapBuiltinClusterConfig {
1177 replication_factor: SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR,
1178 size: config.replica_size.to_string(),
1179 },
1180 bootstrap_builtin_catalog_server_cluster_config: BootstrapBuiltinClusterConfig {
1181 replication_factor: CATALOG_SERVER_CLUSTER_DEFAULT_REPLICATION_FACTOR,
1182 size: config.replica_size.to_string(),
1183 },
1184 bootstrap_builtin_probe_cluster_config: BootstrapBuiltinClusterConfig {
1185 replication_factor: PROBE_CLUSTER_DEFAULT_REPLICATION_FACTOR,
1186 size: config.replica_size.to_string(),
1187 },
1188 bootstrap_builtin_support_cluster_config: BootstrapBuiltinClusterConfig {
1189 replication_factor: SUPPORT_CLUSTER_DEFAULT_REPLICATION_FACTOR,
1190 size: config.replica_size.to_string(),
1191 },
1192 bootstrap_builtin_analytics_cluster_config: BootstrapBuiltinClusterConfig {
1193 replication_factor: ANALYTICS_CLUSTER_DEFAULT_REPLICATION_FACTOR,
1194 size: config.replica_size.to_string(),
1195 },
1196 system_parameter_defaults: {
1197 let mut params = BTreeMap::new();
1198 params.insert(
1199 "log_filter".to_string(),
1200 config.tracing.startup_log_filter.to_string(),
1201 );
1202 params.extend(config.system_parameter_defaults.clone());
1203 params
1204 },
1205 availability_zones: Default::default(),
1206 tracing_handle: config.tracing_handle.clone(),
1207 storage_usage_collection_interval: Duration::from_secs(3600),
1208 storage_usage_retention_period: None,
1209 segment_api_key: None,
1210 segment_client_side: false,
1211 test_only_dummy_segment_client: false,
1213 egress_addresses: vec![],
1214 aws_account_id: None,
1215 aws_privatelink_availability_zones: None,
1216 launchdarkly_sdk_key: None,
1217 launchdarkly_key_map: Default::default(),
1218 config_sync_file_path: None,
1219 config_sync_timeout: Duration::from_secs(30),
1220 config_sync_loop_interval: None,
1221 bootstrap_role: Some("materialize".into()),
1222 http_host_name: Some(host_name),
1223 internal_console_redirect_url: None,
1224 tls_reload_certs: mz_server_core::cert_reload_never_reload(),
1225 helm_chart_version: None,
1226 license_key: ValidatedLicenseKey::for_tests(),
1227 external_login_password_mz_system: None,
1228 };
1229 let (server_addr_tx, server_addr_rx): (oneshot::Sender<Result<_, anyhow::Error>>, _) =
1235 oneshot::channel();
1236 let (internal_server_addr_tx, internal_server_addr_rx) = oneshot::channel();
1237 let (internal_http_server_addr_tx, internal_http_server_addr_rx) = oneshot::channel();
1238 let (shutdown_trigger, shutdown_trigger_rx) = trigger::channel();
1239 let server_thread = thread::spawn(|| {
1240 let runtime = match Runtime::new() {
1241 Ok(runtime) => runtime,
1242 Err(e) => {
1243 server_addr_tx
1244 .send(Err(e.into()))
1245 .expect("receiver should not drop first");
1246 return;
1247 }
1248 };
1249 let server = match runtime.block_on(listeners.serve(server_config)) {
1250 Ok(runtime) => runtime,
1251 Err(e) => {
1252 server_addr_tx
1253 .send(Err(e.into()))
1254 .expect("receiver should not drop first");
1255 return;
1256 }
1257 };
1258 server_addr_tx
1259 .send(Ok(server.sql_listener_handles["external"].local_addr))
1260 .expect("receiver should not drop first");
1261 internal_server_addr_tx
1262 .send(server.sql_listener_handles["internal"].local_addr)
1263 .expect("receiver should not drop first");
1264 internal_http_server_addr_tx
1265 .send(server.http_listener_handles["internal"].local_addr)
1266 .expect("receiver should not drop first");
1267 let _ = runtime.block_on(shutdown_trigger_rx);
1268 });
1269 let server_addr = server_addr_rx.await??;
1270 let internal_server_addr = internal_server_addr_rx.await?;
1271 let internal_http_server_addr = internal_http_server_addr_rx.await?;
1272
1273 let system_client = connect(internal_server_addr, Some("mz_system")).await;
1274 let client = connect(server_addr, None).await;
1275
1276 let inner = RunnerInner {
1277 server_addr,
1278 internal_server_addr,
1279 internal_http_server_addr,
1280 _shutdown_trigger: shutdown_trigger,
1281 _server_thread: server_thread.join_on_drop(),
1282 _temp_dir: temp_dir,
1283 client,
1284 system_client,
1285 clients: BTreeMap::new(),
1286 auto_index_tables: config.auto_index_tables,
1287 auto_index_selects: config.auto_index_selects,
1288 auto_transactions: config.auto_transactions,
1289 enable_table_keys: config.enable_table_keys,
1290 verbosity: config.verbosity,
1291 stdout: config.stdout,
1292 };
1293 inner.ensure_fixed_features().await?;
1294
1295 Ok(inner)
1296 }
1297
1298 async fn ensure_fixed_features(&self) -> Result<(), anyhow::Error> {
1301 self.system_client
1305 .execute("ALTER SYSTEM SET enable_reduce_mfp_fusion = on", &[])
1306 .await?;
1307
1308 self.system_client
1310 .execute("ALTER SYSTEM SET unsafe_enable_unsafe_functions = on", &[])
1311 .await?;
1312 Ok(())
1313 }
1314
1315 async fn run_record<'r>(
1316 &mut self,
1317 record: &'r Record<'r>,
1318 in_transaction: &mut bool,
1319 ) -> Result<Outcome<'r>, anyhow::Error> {
1320 match &record {
1321 Record::Statement {
1322 expected_error,
1323 rows_affected,
1324 sql,
1325 location,
1326 } => {
1327 if self.auto_transactions && *in_transaction {
1328 self.client.execute("COMMIT", &[]).await?;
1329 *in_transaction = false;
1330 }
1331 match self
1332 .run_statement(*expected_error, *rows_affected, sql, location.clone())
1333 .await?
1334 {
1335 Outcome::Success => {
1336 if self.auto_index_tables {
1337 let additional = mutate(sql);
1338 for stmt in additional {
1339 self.client.execute(&stmt, &[]).await?;
1340 }
1341 }
1342 Ok(Outcome::Success)
1343 }
1344 other => {
1345 if expected_error.is_some() {
1346 Ok(other)
1347 } else {
1348 Ok(Outcome::Bail {
1352 cause: Box::new(other),
1353 location: location.clone(),
1354 })
1355 }
1356 }
1357 }
1358 }
1359 Record::Query {
1360 sql,
1361 output,
1362 location,
1363 } => {
1364 self.run_query(sql, output, location.clone(), in_transaction)
1365 .await
1366 }
1367 Record::Simple {
1368 conn,
1369 user,
1370 sql,
1371 sort,
1372 output,
1373 location,
1374 ..
1375 } => {
1376 self.run_simple(*conn, *user, sql, sort.clone(), output, location.clone())
1377 .await
1378 }
1379 Record::Copy {
1380 table_name,
1381 tsv_path,
1382 } => {
1383 let tsv = tokio::fs::read(tsv_path).await?;
1384 let copy = self
1385 .client
1386 .copy_in(&*format!("COPY {} FROM STDIN", table_name))
1387 .await?;
1388 tokio::pin!(copy);
1389 copy.send(bytes::Bytes::from(tsv)).await?;
1390 copy.finish().await?;
1391 Ok(Outcome::Success)
1392 }
1393 _ => Ok(Outcome::Success),
1394 }
1395 }
1396
1397 async fn run_statement<'r>(
1398 &self,
1399 expected_error: Option<&'r str>,
1400 expected_rows_affected: Option<u64>,
1401 sql: &'r str,
1402 location: Location,
1403 ) -> Result<Outcome<'r>, anyhow::Error> {
1404 static UNSUPPORTED_INDEX_STATEMENT_REGEX: LazyLock<Regex> =
1405 LazyLock::new(|| Regex::new("^(CREATE UNIQUE INDEX|REINDEX)").unwrap());
1406 if UNSUPPORTED_INDEX_STATEMENT_REGEX.is_match(sql) {
1407 return Ok(Outcome::Success);
1409 }
1410
1411 match self.client.execute(sql, &[]).await {
1412 Ok(actual) => {
1413 if let Some(expected_error) = expected_error {
1414 return Ok(Outcome::UnexpectedPlanSuccess {
1415 expected_error,
1416 location,
1417 });
1418 }
1419 match expected_rows_affected {
1420 None => Ok(Outcome::Success),
1421 Some(expected) => {
1422 if expected != actual {
1423 Ok(Outcome::WrongNumberOfRowsInserted {
1424 expected_count: expected,
1425 actual_count: actual,
1426 location,
1427 })
1428 } else {
1429 Ok(Outcome::Success)
1430 }
1431 }
1432 }
1433 }
1434 Err(error) => {
1435 if let Some(expected_error) = expected_error {
1436 if Regex::new(expected_error)?.is_match(&format!("{:#}", error)) {
1437 return Ok(Outcome::Success);
1438 }
1439 }
1440 Ok(Outcome::PlanFailure {
1441 error: anyhow!(error),
1442 location,
1443 })
1444 }
1445 }
1446 }
1447
1448 async fn prepare_query<'r>(
1449 &self,
1450 sql: &str,
1451 output: &'r Result<QueryOutput<'_>, &'r str>,
1452 location: Location,
1453 in_transaction: &mut bool,
1454 ) -> Result<PrepareQueryOutcome<'r>, anyhow::Error> {
1455 let statements = match mz_sql::parse::parse(sql) {
1457 Ok(statements) => statements,
1458 Err(e) => match output {
1459 Ok(_) => {
1460 return Ok(PrepareQueryOutcome::Outcome(Outcome::ParseFailure {
1461 error: e.into(),
1462 location,
1463 }));
1464 }
1465 Err(expected_error) => {
1466 if Regex::new(expected_error)?.is_match(&format!("{:#}", e)) {
1467 return Ok(PrepareQueryOutcome::Outcome(Outcome::Success));
1468 } else {
1469 return Ok(PrepareQueryOutcome::Outcome(Outcome::ParseFailure {
1470 error: e.into(),
1471 location,
1472 }));
1473 }
1474 }
1475 },
1476 };
1477 let statement = match &*statements {
1478 [] => bail!("Got zero statements?"),
1479 [statement] => &statement.ast,
1480 _ => bail!("Got multiple statements: {:?}", statements),
1481 };
1482 let (is_select, num_attributes) = match statement {
1483 Statement::Select(stmt) => (true, derive_num_attributes(&stmt.query.body)),
1484 _ => (false, None),
1485 };
1486
1487 match output {
1488 Ok(_) => {
1489 if self.auto_transactions && !*in_transaction {
1490 self.client.execute("BEGIN", &[]).await?;
1492 *in_transaction = true;
1493 }
1494 }
1495 Err(_) => {
1496 if self.auto_transactions && *in_transaction {
1497 self.client.execute("COMMIT", &[]).await?;
1498 *in_transaction = false;
1499 }
1500 }
1501 }
1502
1503 match statement {
1507 Statement::Show(..) => {
1508 if self.auto_transactions && *in_transaction {
1509 self.client.execute("COMMIT", &[]).await?;
1510 *in_transaction = false;
1511 }
1512 }
1513 _ => (),
1514 }
1515 Ok(PrepareQueryOutcome::QueryPrepared(QueryInfo {
1516 is_select,
1517 num_attributes,
1518 }))
1519 }
1520
1521 async fn execute_query<'r>(
1522 &self,
1523 sql: &str,
1524 output: &'r Result<QueryOutput<'_>, &'r str>,
1525 location: Location,
1526 ) -> Result<Outcome<'r>, anyhow::Error> {
1527 let rows = match self.client.query(sql, &[]).await {
1528 Ok(rows) => rows,
1529 Err(error) => {
1530 return match output {
1531 Ok(_) => {
1532 let error_string = format!("{}", error);
1533 if error_string.contains("supported") || error_string.contains("overload") {
1534 Ok(Outcome::Unsupported {
1536 error: anyhow!(error),
1537 location,
1538 })
1539 } else {
1540 Ok(Outcome::PlanFailure {
1541 error: anyhow!(error),
1542 location,
1543 })
1544 }
1545 }
1546 Err(expected_error) => {
1547 if Regex::new(expected_error)?.is_match(&format!("{:#}", error)) {
1548 Ok(Outcome::Success)
1549 } else {
1550 Ok(Outcome::PlanFailure {
1551 error: anyhow!(error),
1552 location,
1553 })
1554 }
1555 }
1556 };
1557 }
1558 };
1559
1560 let QueryOutput {
1562 sort,
1563 types: expected_types,
1564 column_names: expected_column_names,
1565 output: expected_output,
1566 mode,
1567 ..
1568 } = match output {
1569 Err(expected_error) => {
1570 return Ok(Outcome::UnexpectedPlanSuccess {
1571 expected_error,
1572 location,
1573 });
1574 }
1575 Ok(query_output) => query_output,
1576 };
1577
1578 let mut formatted_rows = vec![];
1580 for row in &rows {
1581 if row.len() != expected_types.len() {
1582 return Ok(Outcome::WrongColumnCount {
1583 expected_count: expected_types.len(),
1584 actual_count: row.len(),
1585 location,
1586 });
1587 }
1588 let row = format_row(row, expected_types, *mode);
1589 formatted_rows.push(row);
1590 }
1591
1592 if let Sort::Row = sort {
1594 formatted_rows.sort();
1595 }
1596 let mut values = formatted_rows.into_iter().flatten().collect::<Vec<_>>();
1597 if let Sort::Value = sort {
1598 values.sort();
1599 }
1600
1601 if let Some(row) = rows.get(0) {
1603 if let Some(expected_column_names) = expected_column_names {
1605 let actual_column_names = row
1606 .columns()
1607 .iter()
1608 .map(|t| ColumnName::from(t.name()))
1609 .collect::<Vec<_>>();
1610 if expected_column_names != &actual_column_names {
1611 return Ok(Outcome::WrongColumnNames {
1612 expected_column_names,
1613 actual_column_names,
1614 actual_output: Output::Values(values),
1615 location,
1616 });
1617 }
1618 }
1619 }
1620
1621 match expected_output {
1623 Output::Values(expected_values) => {
1624 if values != *expected_values {
1625 return Ok(Outcome::OutputFailure {
1626 expected_output,
1627 actual_raw_output: rows,
1628 actual_output: Output::Values(values),
1629 location,
1630 });
1631 }
1632 }
1633 Output::Hashed {
1634 num_values,
1635 md5: expected_md5,
1636 } => {
1637 let mut hasher = Md5::new();
1638 for value in &values {
1639 hasher.update(value);
1640 hasher.update("\n");
1641 }
1642 let md5 = format!("{:x}", hasher.finalize());
1643 if values.len() != *num_values || md5 != *expected_md5 {
1644 return Ok(Outcome::OutputFailure {
1645 expected_output,
1646 actual_raw_output: rows,
1647 actual_output: Output::Hashed {
1648 num_values: values.len(),
1649 md5,
1650 },
1651 location,
1652 });
1653 }
1654 }
1655 }
1656
1657 Ok(Outcome::Success)
1658 }
1659
1660 async fn execute_view_inner<'r>(
1661 &self,
1662 sql: &str,
1663 output: &'r Result<QueryOutput<'_>, &'r str>,
1664 location: Location,
1665 ) -> Result<Option<Outcome<'r>>, anyhow::Error> {
1666 print_sql_if(self.stdout, sql, self.verbosity >= 2);
1667 let sql_result = self.client.execute(sql, &[]).await;
1668
1669 let tentative_outcome = if let Err(view_error) = sql_result {
1671 if let Err(expected_error) = output {
1672 if Regex::new(expected_error)?.is_match(&format!("{:#}", view_error)) {
1673 Some(Outcome::Success)
1674 } else {
1675 Some(Outcome::PlanFailure {
1676 error: view_error.into(),
1677 location: location.clone(),
1678 })
1679 }
1680 } else {
1681 Some(Outcome::PlanFailure {
1682 error: view_error.into(),
1683 location: location.clone(),
1684 })
1685 }
1686 } else {
1687 None
1688 };
1689 Ok(tentative_outcome)
1690 }
1691
1692 async fn execute_view<'r>(
1693 &self,
1694 sql: &str,
1695 num_attributes: Option<usize>,
1696 output: &'r Result<QueryOutput<'_>, &'r str>,
1697 location: Location,
1698 ) -> Result<Outcome<'r>, anyhow::Error> {
1699 let expected_column_names = if let Ok(QueryOutput { column_names, .. }) = output {
1701 column_names.clone()
1702 } else {
1703 None
1704 };
1705 let (create_view, create_index, view_sql, drop_view) = generate_view_sql(
1706 sql,
1707 Uuid::new_v4().as_simple(),
1708 num_attributes,
1709 expected_column_names,
1710 );
1711 let tentative_outcome = self
1712 .execute_view_inner(create_view.as_str(), output, location.clone())
1713 .await?;
1714
1715 if let Some(view_outcome) = tentative_outcome {
1718 return Ok(view_outcome);
1719 }
1720
1721 let tentative_outcome = self
1722 .execute_view_inner(create_index.as_str(), output, location.clone())
1723 .await?;
1724
1725 let view_outcome;
1726 if let Some(outcome) = tentative_outcome {
1727 view_outcome = outcome;
1728 } else {
1729 print_sql_if(self.stdout, view_sql.as_str(), self.verbosity >= 2);
1730 view_outcome = self
1731 .execute_query(view_sql.as_str(), output, location.clone())
1732 .await?;
1733 }
1734
1735 print_sql_if(self.stdout, drop_view.as_str(), self.verbosity >= 2);
1737 self.client.execute(drop_view.as_str(), &[]).await?;
1738
1739 Ok(view_outcome)
1740 }
1741
1742 async fn run_query<'r>(
1743 &self,
1744 sql: &'r str,
1745 output: &'r Result<QueryOutput<'_>, &'r str>,
1746 location: Location,
1747 in_transaction: &mut bool,
1748 ) -> Result<Outcome<'r>, anyhow::Error> {
1749 let prepare_outcome = self
1750 .prepare_query(sql, output, location.clone(), in_transaction)
1751 .await?;
1752 match prepare_outcome {
1753 PrepareQueryOutcome::QueryPrepared(QueryInfo {
1754 is_select,
1755 num_attributes,
1756 }) => {
1757 let query_outcome = self.execute_query(sql, output, location.clone()).await?;
1758 if is_select && self.auto_index_selects {
1759 let view_outcome = self
1760 .execute_view(sql, None, output, location.clone())
1761 .await?;
1762
1763 if std::mem::discriminant::<Outcome>(&query_outcome)
1768 != std::mem::discriminant::<Outcome>(&view_outcome)
1769 {
1770 let view_outcome = if num_attributes.is_some() {
1775 self.execute_view(sql, num_attributes, output, location.clone())
1776 .await?
1777 } else {
1778 view_outcome
1779 };
1780
1781 if std::mem::discriminant::<Outcome>(&query_outcome)
1782 != std::mem::discriminant::<Outcome>(&view_outcome)
1783 {
1784 let inconsistent_view_outcome = Outcome::InconsistentViewOutcome {
1785 query_outcome: Box::new(query_outcome),
1786 view_outcome: Box::new(view_outcome),
1787 location: location.clone(),
1788 };
1789 let outcome = if should_warn(&inconsistent_view_outcome) {
1792 Outcome::Warning {
1793 cause: Box::new(inconsistent_view_outcome),
1794 location: location.clone(),
1795 }
1796 } else {
1797 inconsistent_view_outcome
1798 };
1799 return Ok(outcome);
1800 }
1801 }
1802 }
1803 Ok(query_outcome)
1804 }
1805 PrepareQueryOutcome::Outcome(outcome) => Ok(outcome),
1806 }
1807 }
1808
1809 async fn get_conn(
1810 &mut self,
1811 name: Option<&str>,
1812 user: Option<&str>,
1813 ) -> &tokio_postgres::Client {
1814 match name {
1815 None => &self.client,
1816 Some(name) => {
1817 if !self.clients.contains_key(name) {
1818 let addr = if matches!(user, Some("mz_system") | Some("mz_support")) {
1819 self.internal_server_addr
1820 } else {
1821 self.server_addr
1822 };
1823 let client = connect(addr, user).await;
1824 self.clients.insert(name.into(), client);
1825 }
1826 self.clients.get(name).unwrap()
1827 }
1828 }
1829 }
1830
1831 async fn run_simple<'r>(
1832 &mut self,
1833 conn: Option<&'r str>,
1834 user: Option<&'r str>,
1835 sql: &'r str,
1836 sort: Sort,
1837 output: &'r Output,
1838 location: Location,
1839 ) -> Result<Outcome<'r>, anyhow::Error> {
1840 let client = self.get_conn(conn, user).await;
1841 let actual = Output::Values(match client.simple_query(sql).await {
1842 Ok(result) => {
1843 let mut rows = Vec::new();
1844
1845 for m in result.into_iter() {
1846 match m {
1847 SimpleQueryMessage::Row(row) => {
1848 let mut s = vec![];
1849 for i in 0..row.len() {
1850 s.push(row.get(i).unwrap_or("NULL"));
1851 }
1852 rows.push(s.join(","));
1853 }
1854 SimpleQueryMessage::CommandComplete(count) => {
1855 rows.push(format!("COMPLETE {}", count));
1858 }
1859 SimpleQueryMessage::RowDescription(_) => {}
1860 _ => panic!("unexpected"),
1861 }
1862 }
1863
1864 if let Sort::Row = sort {
1865 rows.sort();
1866 }
1867
1868 rows
1869 }
1870 Err(error) => error.to_string().lines().map(|s| s.to_string()).collect(),
1874 });
1875 if *output != actual {
1876 Ok(Outcome::OutputFailure {
1877 expected_output: output,
1878 actual_raw_output: vec![],
1879 actual_output: actual,
1880 location,
1881 })
1882 } else {
1883 Ok(Outcome::Success)
1884 }
1885 }
1886
1887 async fn check_catalog(&self) -> Result<(), anyhow::Error> {
1888 let url = format!(
1889 "http://{}/api/catalog/check",
1890 self.internal_http_server_addr
1891 );
1892 let response: serde_json::Value = reqwest::get(&url).await?.json().await?;
1893
1894 if let Some(inconsistencies) = response.get("err") {
1895 let inconsistencies = serde_json::to_string_pretty(&inconsistencies)
1896 .expect("serializing Value cannot fail");
1897 Err(anyhow::anyhow!("Catalog inconsistency\n{inconsistencies}"))
1898 } else {
1899 Ok(())
1900 }
1901 }
1902}
1903
1904async fn connect(addr: SocketAddr, user: Option<&str>) -> tokio_postgres::Client {
1905 let (client, connection) = tokio_postgres::connect(
1906 &format!(
1907 "host={} port={} user={}",
1908 addr.ip(),
1909 addr.port(),
1910 user.unwrap_or("materialize")
1911 ),
1912 NoTls,
1913 )
1914 .await
1915 .unwrap();
1916
1917 task::spawn(|| "sqllogictest_connect", async move {
1918 if let Err(e) = connection.await {
1919 eprintln!("connection error: {}", e);
1920 }
1921 });
1922 client
1923}
1924
/// A sink for formatted text, used for the runner's `stdout` and `stderr`.
///
/// The method takes `&self` and returns nothing, so `write!` and `writeln!`
/// can be pointed at a shared `&dyn WriteFmt` and used as plain statements
/// with no result to handle.
pub trait WriteFmt {
    /// Writes the already formatted arguments `fmt` to the sink.
    fn write_fmt(&self, fmt: fmt::Arguments<'_>);
}
1928
/// Settings for a test run, shared by every file the runner executes.
pub struct RunConfig<'a> {
    /// Sink for regular output: file banners, echoed SQL and outcome details.
    pub stdout: &'a dyn WriteFmt,
    /// Second output sink.
    // NOTE(review): presumably receives error output; no use is visible in
    // this part of the file, confirm against the callers.
    pub stderr: &'a dyn WriteFmt,
    /// Output detail level. At 1 and above, records that did not succeed are
    /// reported; at 2 and above every record is echoed before it runs, along
    /// with the SQL generated for view checks.
    pub verbosity: u8,
    // NOTE(review): presumably the URL of the Postgres instance backing the
    // server under test; not used in this part of the file, confirm.
    pub postgres_url: String,
    // NOTE(review): meaning not visible in this part of the file, confirm.
    pub prefix: String,
    // NOTE(review): not consulted in this part of the file; presumably
    // affects how the caller treats failures, confirm.
    pub no_fail: bool,
    /// Stop running a file at the first record whose outcome is a failure.
    pub fail_fast: bool,
    /// After each successful statement, also execute the statements that
    /// `mutate` derives from it.
    pub auto_index_tables: bool,
    /// Additionally run every `SELECT` query through an indexed view and
    /// compare the kind of outcome with that of the plain query.
    pub auto_index_selects: bool,
    /// Automatically issue `BEGIN` before queries that expect output, and
    /// `COMMIT` before statements, queries that expect an error, and `SHOW`.
    pub auto_transactions: bool,
    // NOTE(review): copied into the runner state; its effect is not visible
    // in this part of the file, confirm.
    pub enable_table_keys: bool,
    // NOTE(review): presumably a command wrapper handed to the process
    // orchestrator; not used in this part of the file, confirm.
    pub orchestrator_process_wrapper: Option<String>,
    /// Tracing options; `startup_log_filter` seeds the `log_filter` system
    /// parameter default of the server.
    pub tracing: TracingCliArgs,
    /// Tracing handle, cloned into the server configuration.
    pub tracing_handle: TracingHandle,
    /// Extra system parameter defaults. They are applied after `log_filter`
    /// has been seeded, so an entry for `log_filter` replaces the seed.
    pub system_parameter_defaults: BTreeMap<String, String>,
    /// Directory for persist data; the blob store lives in `persist/blob`
    /// below it.
    pub persist_dir: TempDir,
    /// Replication factor of the default cluster. The server start-up panics
    /// if the value does not fit the target integer type.
    pub replicas: usize,
    /// Replica size, passed on in string form for the default cluster and
    /// for the builtin clusters.
    pub replica_size: usize,
}
1953
1954fn print_record(config: &RunConfig<'_>, record: &Record) {
1955 match record {
1956 Record::Statement { sql, .. } | Record::Query { sql, .. } => print_sql(config.stdout, sql),
1957 _ => (),
1958 }
1959}
1960
1961fn print_sql_if<'a>(stdout: &'a dyn WriteFmt, sql: &str, cond: bool) {
1962 if cond {
1963 print_sql(stdout, sql)
1964 }
1965}
1966
1967fn print_sql<'a>(stdout: &'a dyn WriteFmt, sql: &str) {
1968 writeln!(stdout, "{}", crate::util::indent(sql, 4))
1969}
1970
/// Regular expressions describing plan errors of the view variant of a query
/// that are tolerated when the plain query succeeded.
///
/// `should_warn` matches them against the formatted error of the view
/// outcome; a match turns the inconsistency into a warning. The entries are
/// compiled as regular expressions and matched unanchored, so regex
/// metacharacters in them are significant.
const INCONSISTENT_VIEW_OUTCOME_WARNING_REGEXPS: [&str; 9] = [
    "cannot materialize call to",
    "SHOW commands are not allowed in views",
    "cannot create view with unstable dependencies",
    "cannot use wildcard expansions or NATURAL JOINs in a view that depends on system objects",
    "no valid schema selected",
    r#"system schema '\w+' cannot be modified"#,
    r#"permission denied for (SCHEMA|CLUSTER) "(\w+\.)?\w+""#,
    r#"column "[\w\?]+" specified more than once"#,
    r#"column "(\w+\.)?\w+" does not exist"#,
];
1990
1991fn should_warn(outcome: &Outcome) -> bool {
1996 match outcome {
1997 Outcome::InconsistentViewOutcome {
1998 query_outcome,
1999 view_outcome,
2000 ..
2001 } => match (query_outcome.as_ref(), view_outcome.as_ref()) {
2002 (Outcome::Success, Outcome::PlanFailure { error, .. }) => {
2003 INCONSISTENT_VIEW_OUTCOME_WARNING_REGEXPS.iter().any(|s| {
2004 Regex::new(s)
2005 .expect("unexpected error in regular expression parsing")
2006 .is_match(&format!("{:#}", error))
2007 })
2008 }
2009 _ => false,
2010 },
2011 _ => false,
2012 }
2013}
2014
2015pub async fn run_string(
2016 runner: &mut Runner<'_>,
2017 source: &str,
2018 input: &str,
2019) -> Result<Outcomes, anyhow::Error> {
2020 runner.reset_database().await?;
2021
2022 let mut outcomes = Outcomes::default();
2023 let mut parser = crate::parser::Parser::new(source, input);
2024 let mut in_transaction = false;
2027 writeln!(runner.config.stdout, "--- {}", source);
2028
2029 for record in parser.parse_records()? {
2030 if runner.config.verbosity >= 2 {
2034 print_record(runner.config, &record);
2035 }
2036
2037 let outcome = runner
2038 .run_record(&record, &mut in_transaction)
2039 .await
2040 .map_err(|err| format!("In {}:\n{}", source, err))
2041 .unwrap();
2042
2043 if runner.config.verbosity >= 1 && !outcome.success() {
2045 if runner.config.verbosity < 2 {
2046 if !outcome.failure() {
2053 writeln!(
2054 runner.config.stdout,
2055 "{}",
2056 util::indent("Warning detected for: ", 4)
2057 );
2058 }
2059 print_record(runner.config, &record);
2060 }
2061 if runner.config.verbosity >= 2 || outcome.failure() {
2062 writeln!(
2063 runner.config.stdout,
2064 "{}",
2065 util::indent(&outcome.to_string(), 4)
2066 );
2067 writeln!(runner.config.stdout, "{}", util::indent("----", 4));
2068 }
2069 }
2070
2071 outcomes.stats[outcome.code()] += 1;
2072 if outcome.failure() {
2073 outcomes.details.push(format!("{}", outcome));
2074 }
2075
2076 if let Outcome::Bail { .. } = outcome {
2077 break;
2078 }
2079
2080 if runner.config.fail_fast && outcome.failure() {
2081 break;
2082 }
2083 }
2084 Ok(outcomes)
2085}
2086
2087pub async fn run_file(runner: &mut Runner<'_>, filename: &Path) -> Result<Outcomes, anyhow::Error> {
2088 let mut input = String::new();
2089 File::open(filename)?.read_to_string(&mut input)?;
2090 let outcomes = run_string(runner, &format!("{}", filename.display()), &input).await?;
2091 runner.check_catalog().await?;
2092
2093 Ok(outcomes)
2094}
2095
2096pub async fn rewrite_file(runner: &mut Runner<'_>, filename: &Path) -> Result<(), anyhow::Error> {
2097 runner.reset_database().await?;
2098
2099 let mut file = OpenOptions::new().read(true).write(true).open(filename)?;
2100
2101 let mut input = String::new();
2102 file.read_to_string(&mut input)?;
2103
2104 let mut buf = RewriteBuffer::new(&input);
2105
2106 let mut parser = crate::parser::Parser::new(filename.to_str().unwrap_or(""), &input);
2107 writeln!(runner.config.stdout, "--- {}", filename.display());
2108 let mut in_transaction = false;
2109
2110 fn append_values_output(
2111 buf: &mut RewriteBuffer,
2112 input: &String,
2113 expected_output: &str,
2114 mode: &Mode,
2115 types: &Vec<Type>,
2116 column_names: Option<&Vec<ColumnName>>,
2117 actual_output: &Vec<String>,
2118 multiline: bool,
2119 ) {
2120 buf.append_header(input, expected_output, column_names);
2121
2122 for (i, row) in actual_output.chunks(types.len()).enumerate() {
2123 match mode {
2124 Mode::Cockroach => {
2127 if i != 0 {
2128 buf.append("\n");
2129 }
2130
2131 if row.len() == 0 {
2132 } else if row.len() == 1 {
2134 if multiline {
2137 buf.append(&row[0]);
2138 } else {
2139 buf.append(&row[0].replace('\n', "⏎"))
2140 }
2141 } else {
2142 buf.append(
2145 &row.iter()
2146 .map(|col| {
2147 let mut col = col.replace(' ', "␠");
2148 if !multiline {
2149 col = col.replace('\n', "⏎");
2150 }
2151 col
2152 })
2153 .join(" "),
2154 );
2155 }
2156 }
2157 Mode::Standard => {
2162 for (j, col) in row.iter().enumerate() {
2163 if i != 0 || j != 0 {
2164 buf.append("\n");
2165 }
2166 buf.append(&if multiline {
2167 col.clone()
2168 } else {
2169 col.replace('\n', "⏎")
2170 });
2171 }
2172 }
2173 }
2174 }
2175 }
2176
2177 for record in parser.parse_records()? {
2178 let outcome = runner.run_record(&record, &mut in_transaction).await?;
2179
2180 match (&record, &outcome) {
2181 (
2184 Record::Query {
2185 output:
2186 Ok(QueryOutput {
2187 mode,
2188 output: Output::Values(_),
2189 output_str: expected_output,
2190 types,
2191 column_names,
2192 multiline,
2193 ..
2194 }),
2195 ..
2196 },
2197 Outcome::OutputFailure {
2198 actual_output: Output::Values(actual_output),
2199 ..
2200 },
2201 ) => {
2202 append_values_output(
2203 &mut buf,
2204 &input,
2205 expected_output,
2206 mode,
2207 types,
2208 column_names.as_ref(),
2209 actual_output,
2210 *multiline,
2211 );
2212 }
2213 (
2214 Record::Query {
2215 output:
2216 Ok(QueryOutput {
2217 mode,
2218 output: Output::Values(_),
2219 output_str: expected_output,
2220 types,
2221 multiline,
2222 ..
2223 }),
2224 ..
2225 },
2226 Outcome::WrongColumnNames {
2227 actual_column_names,
2228 actual_output: Output::Values(actual_output),
2229 ..
2230 },
2231 ) => {
2232 append_values_output(
2233 &mut buf,
2234 &input,
2235 expected_output,
2236 mode,
2237 types,
2238 Some(actual_column_names),
2239 actual_output,
2240 *multiline,
2241 );
2242 }
2243 (
2244 Record::Query {
2245 output:
2246 Ok(QueryOutput {
2247 output: Output::Hashed { .. },
2248 output_str: expected_output,
2249 column_names,
2250 ..
2251 }),
2252 ..
2253 },
2254 Outcome::OutputFailure {
2255 actual_output: Output::Hashed { num_values, md5 },
2256 ..
2257 },
2258 ) => {
2259 buf.append_header(&input, expected_output, column_names.as_ref());
2260
2261 buf.append(format!("{} values hashing to {}\n", num_values, md5).as_str())
2262 }
2263 (
2264 Record::Simple {
2265 output_str: expected_output,
2266 ..
2267 },
2268 Outcome::OutputFailure {
2269 actual_output: Output::Values(actual_output),
2270 ..
2271 },
2272 ) => {
2273 buf.append_header(&input, expected_output, None);
2274
2275 for (i, row) in actual_output.iter().enumerate() {
2276 if i != 0 {
2277 buf.append("\n");
2278 }
2279 buf.append(row);
2280 }
2281 }
2282 (
2283 Record::Query {
2284 sql,
2285 output: Err(err),
2286 ..
2287 },
2288 outcome,
2289 )
2290 | (
2291 Record::Statement {
2292 expected_error: Some(err),
2293 sql,
2294 ..
2295 },
2296 outcome,
2297 ) if outcome.err_msg().is_some() => {
2298 buf.rewrite_expected_error(&input, err, &outcome.err_msg().unwrap(), sql)
2299 }
2300 (_, Outcome::Success) => {}
2301 _ => bail!("unexpected: {:?} {:?}", record, outcome),
2302 }
2303 }
2304
2305 file.set_len(0)?;
2306 file.seek(SeekFrom::Start(0))?;
2307 file.write_all(buf.finish().as_bytes())?;
2308 file.sync_all()?;
2309 Ok(())
2310}
2311
2312#[derive(Debug)]
2321struct RewriteBuffer<'a> {
2322 input: &'a str,
2323 input_offset: usize,
2324 output: String,
2325}
2326
2327impl<'a> RewriteBuffer<'a> {
2328 fn new(input: &'a str) -> RewriteBuffer<'a> {
2329 RewriteBuffer {
2330 input,
2331 input_offset: 0,
2332 output: String::new(),
2333 }
2334 }
2335
2336 fn flush_to(&mut self, offset: usize) {
2337 assert!(offset >= self.input_offset);
2338 let chunk = &self.input[self.input_offset..offset];
2339 self.output.push_str(chunk);
2340 self.input_offset = offset;
2341 }
2342
2343 fn skip_to(&mut self, offset: usize) {
2344 assert!(offset >= self.input_offset);
2345 self.input_offset = offset;
2346 }
2347
2348 fn append(&mut self, s: &str) {
2349 self.output.push_str(s);
2350 }
2351
2352 fn append_header(
2353 &mut self,
2354 input: &String,
2355 expected_output: &str,
2356 column_names: Option<&Vec<ColumnName>>,
2357 ) {
2358 #[allow(clippy::as_conversions)]
2361 let offset = expected_output.as_ptr() as usize - input.as_ptr() as usize;
2362 self.flush_to(offset);
2363 self.skip_to(offset + expected_output.len());
2364
2365 if self.peek_last(5) == "\n----" {
2368 self.append("\n");
2369 } else if self.peek_last(6) != "\n----\n" {
2370 self.append("\n----\n");
2371 }
2372
2373 let Some(names) = column_names else {
2374 return;
2375 };
2376 self.append(
2377 &names
2378 .iter()
2379 .map(|name| name.replace(' ', "␠"))
2380 .collect::<Vec<_>>()
2381 .join(" "),
2382 );
2383 self.append("\n");
2384 }
2385
2386 fn rewrite_expected_error(
2387 &mut self,
2388 input: &String,
2389 old_err: &str,
2390 new_err: &str,
2391 query: &str,
2392 ) {
2393 #[allow(clippy::as_conversions)]
2396 let err_offset = old_err.as_ptr() as usize - input.as_ptr() as usize;
2397 self.flush_to(err_offset);
2398 self.append(new_err);
2399 self.append("\n");
2400 self.append(query);
2401 #[allow(clippy::as_conversions)]
2403 self.skip_to(query.as_ptr() as usize - input.as_ptr() as usize + query.len())
2404 }
2405
2406 fn peek_last(&self, n: usize) -> &str {
2407 &self.output[self.output.len() - n..]
2408 }
2409
2410 fn finish(mut self) -> String {
2411 self.flush_to(self.input.len());
2412 self.output
2413 }
2414}
2415
2416fn generate_view_sql(
2424 sql: &str,
2425 view_uuid: &Simple,
2426 num_attributes: Option<usize>,
2427 expected_column_names: Option<Vec<ColumnName>>,
2428) -> (String, String, String, String) {
2429 let stmts = parser::parse_statements(sql).unwrap_or_default();
2437 assert!(stmts.len() == 1);
2438 let (query, query_as_of) = match &stmts[0].ast {
2439 Statement::Select(stmt) => (&stmt.query, &stmt.as_of),
2440 _ => unreachable!("This function should only be called for SELECTs"),
2441 };
2442
2443 let (view_order_by, extra_columns, distinct) = if num_attributes.is_none() {
2448 (query.order_by.clone(), vec![], None)
2449 } else {
2450 derive_order_by(&query.body, &query.order_by)
2451 };
2452
2453 let name = UnresolvedItemName(vec![Ident::new_unchecked(format!("v{}", view_uuid))]);
2469 let projection = expected_column_names.map_or_else(
2470 || {
2471 num_attributes.map_or(vec![], |n| {
2472 (1..=n)
2473 .map(|i| Ident::new_unchecked(format!("a{i}")))
2474 .collect()
2475 })
2476 },
2477 |cols| {
2478 cols.iter()
2479 .map(|c| Ident::new_unchecked(c.as_str()))
2480 .collect()
2481 },
2482 );
2483 let columns: Vec<Ident> = projection
2484 .iter()
2485 .cloned()
2486 .chain(extra_columns.iter().map(|item| {
2487 if let SelectItem::Expr {
2488 expr: _,
2489 alias: Some(ident),
2490 } = item
2491 {
2492 ident.clone()
2493 } else {
2494 unreachable!("alias must be given for extra column")
2495 }
2496 }))
2497 .collect();
2498
2499 let mut query = query.clone();
2501 if extra_columns.len() > 0 {
2502 match &mut query.body {
2503 SetExpr::Select(stmt) => stmt.projection.extend(extra_columns.iter().cloned()),
2504 _ => unimplemented!("cannot yet rewrite projections of nested queries"),
2505 }
2506 }
2507 let create_view = AstStatement::<Raw>::CreateView(CreateViewStatement {
2508 if_exists: IfExistsBehavior::Error,
2509 temporary: false,
2510 definition: ViewDefinition {
2511 name: name.clone(),
2512 columns: columns.clone(),
2513 query,
2514 },
2515 })
2516 .to_ast_string_stable();
2517
2518 let create_index = AstStatement::<Raw>::CreateIndex(CreateIndexStatement {
2522 name: None,
2523 in_cluster: None,
2524 on_name: RawItemName::Name(name.clone()),
2525 key_parts: if columns.len() == 0 {
2526 None
2527 } else {
2528 Some(
2529 columns
2530 .iter()
2531 .map(|ident| Expr::Identifier(vec![ident.clone()]))
2532 .collect(),
2533 )
2534 },
2535 with_options: Vec::new(),
2536 if_not_exists: false,
2537 })
2538 .to_ast_string_stable();
2539
2540 let distinct_unneeded = extra_columns.len() == 0
2542 || match distinct {
2543 None | Some(Distinct::On(_)) => true,
2544 Some(Distinct::EntireRow) => false,
2545 };
2546 let distinct = if distinct_unneeded { None } else { distinct };
2547
2548 let view_sql = AstStatement::<Raw>::Select(SelectStatement {
2550 query: Query {
2551 ctes: CteBlock::Simple(vec![]),
2552 body: SetExpr::Select(Box::new(Select {
2553 distinct,
2554 projection: if projection.len() == 0 {
2555 vec![SelectItem::Wildcard]
2556 } else {
2557 projection
2558 .iter()
2559 .map(|ident| SelectItem::Expr {
2560 expr: Expr::Identifier(vec![ident.clone()]),
2561 alias: None,
2562 })
2563 .collect()
2564 },
2565 from: vec![TableWithJoins {
2566 relation: TableFactor::Table {
2567 name: RawItemName::Name(name.clone()),
2568 alias: None,
2569 },
2570 joins: vec![],
2571 }],
2572 selection: None,
2573 group_by: vec![],
2574 having: None,
2575 qualify: None,
2576 options: vec![],
2577 })),
2578 order_by: view_order_by,
2579 limit: None,
2580 offset: None,
2581 },
2582 as_of: query_as_of.clone(),
2583 })
2584 .to_ast_string_stable();
2585
2586 let drop_view = AstStatement::<Raw>::DropObjects(DropObjectsStatement {
2588 object_type: ObjectType::View,
2589 if_exists: false,
2590 names: vec![UnresolvedObjectName::Item(name)],
2591 cascade: false,
2592 })
2593 .to_ast_string_stable();
2594
2595 (create_view, create_index, view_sql, drop_view)
2596}
2597
2598fn derive_num_attributes(body: &SetExpr<Raw>) -> Option<usize> {
2603 let Some((projection, _)) = find_projection(body) else {
2604 return None;
2605 };
2606 derive_num_attributes_from_projection(projection)
2607}
2608
2609fn derive_order_by(
2620 body: &SetExpr<Raw>,
2621 order_by: &Vec<OrderByExpr<Raw>>,
2622) -> (
2623 Vec<OrderByExpr<Raw>>,
2624 Vec<SelectItem<Raw>>,
2625 Option<Distinct<Raw>>,
2626) {
2627 let Some((projection, distinct)) = find_projection(body) else {
2628 return (vec![], vec![], None);
2629 };
2630 let (view_order_by, extra_columns) = derive_order_by_from_projection(projection, order_by);
2631 (view_order_by, extra_columns, distinct.clone())
2632}
2633
2634fn find_projection(body: &SetExpr<Raw>) -> Option<(&Vec<SelectItem<Raw>>, &Option<Distinct<Raw>>)> {
2636 let mut set_expr = body;
2639 loop {
2640 match set_expr {
2641 SetExpr::Select(select) => {
2642 return Some((&select.projection, &select.distinct));
2643 }
2644 SetExpr::SetOperation { left, .. } => set_expr = left.as_ref(),
2645 SetExpr::Query(query) => set_expr = &query.body,
2646 _ => return None,
2647 }
2648 }
2649}
2650
2651fn derive_num_attributes_from_projection(projection: &Vec<SelectItem<Raw>>) -> Option<usize> {
2655 let mut num_attributes = 0usize;
2656 for item in projection.iter() {
2657 let SelectItem::Expr { expr, .. } = item else {
2658 return None;
2659 };
2660 match expr {
2661 Expr::QualifiedWildcard(..) | Expr::WildcardAccess(..) => {
2662 return None;
2663 }
2664 _ => {
2665 num_attributes += 1;
2666 }
2667 }
2668 }
2669 Some(num_attributes)
2670}
2671
2672fn derive_order_by_from_projection(
2677 projection: &Vec<SelectItem<Raw>>,
2678 order_by: &Vec<OrderByExpr<Raw>>,
2679) -> (Vec<OrderByExpr<Raw>>, Vec<SelectItem<Raw>>) {
2680 let mut view_order_by: Vec<OrderByExpr<Raw>> = vec![];
2681 let mut extra_columns: Vec<SelectItem<Raw>> = vec![];
2682 for order_by_expr in order_by.iter() {
2683 let query_expr = &order_by_expr.expr;
2684 let view_expr = match query_expr {
2685 Expr::Value(mz_sql_parser::ast::Value::Number(_)) => query_expr.clone(),
2686 _ => {
2687 if let Some(i) = projection.iter().position(|item| match item {
2689 SelectItem::Expr { expr, alias } => {
2690 expr == query_expr
2691 || match query_expr {
2692 Expr::Identifier(ident) => {
2693 ident.len() == 1 && Some(&ident[0]) == alias.as_ref()
2694 }
2695 _ => false,
2696 }
2697 }
2698 SelectItem::Wildcard => false,
2699 }) {
2700 Expr::Value(mz_sql_parser::ast::Value::Number((i + 1).to_string()))
2701 } else {
2702 let ident = Ident::new_unchecked(format!(
2705 "a{}",
2706 (projection.len() + extra_columns.len() + 1)
2707 ));
2708 extra_columns.push(SelectItem::Expr {
2709 expr: query_expr.clone(),
2710 alias: Some(ident.clone()),
2711 });
2712 Expr::Identifier(vec![ident])
2713 }
2714 }
2715 };
2716 view_order_by.push(OrderByExpr {
2717 expr: view_expr,
2718 asc: order_by_expr.asc,
2719 nulls_last: order_by_expr.nulls_last,
2720 });
2721 }
2722 (view_order_by, extra_columns)
2723}
2724
2725fn mutate(sql: &str) -> Vec<String> {
2727 let stmts = parser::parse_statements(sql).unwrap_or_default();
2728 let mut additional = Vec::new();
2729 for stmt in stmts {
2730 match stmt.ast {
2731 AstStatement::CreateTable(stmt) => additional.push(
2732 AstStatement::<Raw>::CreateIndex(CreateIndexStatement {
2735 name: None,
2736 in_cluster: None,
2737 on_name: RawItemName::Name(stmt.name.clone()),
2738 key_parts: Some(
2739 stmt.columns
2740 .iter()
2741 .map(|def| Expr::Identifier(vec![def.name.clone()]))
2742 .collect(),
2743 ),
2744 with_options: Vec::new(),
2745 if_not_exists: false,
2746 })
2747 .to_ast_string_stable(),
2748 ),
2749 _ => {}
2750 }
2751 }
2752 additional
2753}
2754
/// Checks `generate_view_sql` against a table of queries.
///
/// Each case pairs the inputs `(sql, num_attributes, expected_column_names)`
/// with the expected `(create_view, create_index, view_sql, drop_view)`
/// statements for a fixed view UUID.
#[mz_ore::test]
#[cfg_attr(miri, ignore)] // skipped when running under Miri
fn test_generate_view_sql() {
    let uuid = Uuid::parse_str("67e5504410b1426f9247bb680e5fe0c8").unwrap();
    let cases = vec![
        // Wildcard projection, attribute count unknown: no column list, and a
        // default index.
        (("SELECT * FROM t", None, None),
        (
            r#"CREATE VIEW "v67e5504410b1426f9247bb680e5fe0c8" AS SELECT * FROM "t""#.to_string(),
            r#"CREATE DEFAULT INDEX ON "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
            r#"SELECT * FROM "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
            r#"DROP VIEW "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
        )),
        // Known attribute count with expected column names: the names are
        // used for the view's columns.
        (("SELECT a, b, c FROM t1, t2", Some(3), Some(vec![ColumnName::from("a"), ColumnName::from("b"), ColumnName::from("c")])),
        (
            r#"CREATE VIEW "v67e5504410b1426f9247bb680e5fe0c8" ("a", "b", "c") AS SELECT "a", "b", "c" FROM "t1", "t2""#.to_string(),
            r#"CREATE INDEX ON "v67e5504410b1426f9247bb680e5fe0c8" ("a", "b", "c")"#.to_string(),
            r#"SELECT "a", "b", "c" FROM "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
            r#"DROP VIEW "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
        )),
        // Known attribute count without column names: columns are named
        // `a1`..`a3`.
        (("SELECT a, b, c FROM t1, t2", Some(3), None),
        (
            r#"CREATE VIEW "v67e5504410b1426f9247bb680e5fe0c8" ("a1", "a2", "a3") AS SELECT "a", "b", "c" FROM "t1", "t2""#.to_string(),
            r#"CREATE INDEX ON "v67e5504410b1426f9247bb680e5fe0c8" ("a1", "a2", "a3")"#.to_string(),
            r#"SELECT "a1", "a2", "a3" FROM "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
            r#"DROP VIEW "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
        )),
        // Wildcard over a subquery whose two output columns share the name
        // `a`; attribute count unknown, so the query is passed through as-is.
        (("SELECT * FROM (SELECT a, sum(b) AS a FROM t GROUP BY a)", None, None),
        (
            r#"CREATE VIEW "v67e5504410b1426f9247bb680e5fe0c8" AS SELECT * FROM (SELECT "a", "sum"("b") AS "a" FROM "t" GROUP BY "a")"#.to_string(),
            r#"CREATE DEFAULT INDEX ON "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
            r#"SELECT * FROM "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
            r#"DROP VIEW "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
        )),
        // `ORDER BY` by column, by alias, and by an expression that appears
        // in the projection: all become ordinals in the query on the view.
        (("SELECT a, b, b + d AS c, a + b AS d FROM t1, t2 ORDER BY a, c, a + b", Some(4), Some(vec![ColumnName::from("a"), ColumnName::from("b"), ColumnName::from("c"), ColumnName::from("d")])),
        (
            r#"CREATE VIEW "v67e5504410b1426f9247bb680e5fe0c8" ("a", "b", "c", "d") AS SELECT "a", "b", "b" + "d" AS "c", "a" + "b" AS "d" FROM "t1", "t2" ORDER BY "a", "c", "a" + "b""#.to_string(),
            r#"CREATE INDEX ON "v67e5504410b1426f9247bb680e5fe0c8" ("a", "b", "c", "d")"#.to_string(),
            r#"SELECT "a", "b", "c", "d" FROM "v67e5504410b1426f9247bb680e5fe0c8" ORDER BY 1, 3, 4"#.to_string(),
            r#"DROP VIEW "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
        )),
        // Nested set operations: the `ORDER BY` alias is resolved against the
        // leftmost `SELECT`.
        (("((SELECT 1 AS a UNION SELECT 2 AS b) UNION SELECT 3 AS c) ORDER BY a", Some(1), None),
        (
            r#"CREATE VIEW "v67e5504410b1426f9247bb680e5fe0c8" ("a1") AS (SELECT 1 AS "a" UNION SELECT 2 AS "b") UNION SELECT 3 AS "c" ORDER BY "a""#.to_string(),
            r#"CREATE INDEX ON "v67e5504410b1426f9247bb680e5fe0c8" ("a1")"#.to_string(),
            r#"SELECT "a1" FROM "v67e5504410b1426f9247bb680e5fe0c8" ORDER BY 1"#.to_string(),
            r#"DROP VIEW "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
        )),
        // Attribute count unknown with an ordinal `ORDER BY`: the ordering is
        // reused unchanged.
        (("SELECT * FROM (SELECT a, sum(b) AS a FROM t GROUP BY a) ORDER BY 1", None, None),
        (
            r#"CREATE VIEW "v67e5504410b1426f9247bb680e5fe0c8" AS SELECT * FROM (SELECT "a", "sum"("b") AS "a" FROM "t" GROUP BY "a") ORDER BY 1"#.to_string(),
            r#"CREATE DEFAULT INDEX ON "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
            r#"SELECT * FROM "v67e5504410b1426f9247bb680e5fe0c8" ORDER BY 1"#.to_string(),
            r#"DROP VIEW "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
        )),
        // Attribute count unknown with a named `ORDER BY`: the ordering is
        // reused unchanged.
        (("SELECT * FROM (SELECT a, sum(b) AS a FROM t GROUP BY a) ORDER BY a", None, None),
        (
            r#"CREATE VIEW "v67e5504410b1426f9247bb680e5fe0c8" AS SELECT * FROM (SELECT "a", "sum"("b") AS "a" FROM "t" GROUP BY "a") ORDER BY "a""#.to_string(),
            r#"CREATE DEFAULT INDEX ON "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
            r#"SELECT * FROM "v67e5504410b1426f9247bb680e5fe0c8" ORDER BY "a""#.to_string(),
            r#"DROP VIEW "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
        )),
        // `ORDER BY` on a column (`c`) that is not projected: it is added to
        // the view as extra column `a3`, indexed, and used for ordering, but
        // not projected by the query on the view.
        (("SELECT a, sum(b) AS a FROM t GROUP BY a, c ORDER BY a, c", Some(2), None),
        (
            r#"CREATE VIEW "v67e5504410b1426f9247bb680e5fe0c8" ("a1", "a2", "a3") AS SELECT "a", "sum"("b") AS "a", "c" AS "a3" FROM "t" GROUP BY "a", "c" ORDER BY "a", "c""#.to_string(),
            r#"CREATE INDEX ON "v67e5504410b1426f9247bb680e5fe0c8" ("a1", "a2", "a3")"#.to_string(),
            r#"SELECT "a1", "a2" FROM "v67e5504410b1426f9247bb680e5fe0c8" ORDER BY 1, "a3""#.to_string(),
            r#"DROP VIEW "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
        )),
        // Same as the previous case with the ordering keys swapped.
        (("SELECT a, sum(b) AS a FROM t GROUP BY a, c ORDER BY c, a", Some(2), None),
        (
            r#"CREATE VIEW "v67e5504410b1426f9247bb680e5fe0c8" ("a1", "a2", "a3") AS SELECT "a", "sum"("b") AS "a", "c" AS "a3" FROM "t" GROUP BY "a", "c" ORDER BY "c", "a""#.to_string(),
            r#"CREATE INDEX ON "v67e5504410b1426f9247bb680e5fe0c8" ("a1", "a2", "a3")"#.to_string(),
            r#"SELECT "a1", "a2" FROM "v67e5504410b1426f9247bb680e5fe0c8" ORDER BY "a3", 1"#.to_string(),
            r#"DROP VIEW "v67e5504410b1426f9247bb680e5fe0c8""#.to_string(),
        )),
    ];
    // Every case must produce exactly the expected four statements.
    for ((sql, num_attributes, expected_column_names), expected) in cases {
        let view_sql =
            generate_view_sql(sql, uuid.as_simple(), num_attributes, expected_column_names);
        assert_eq!(expected, view_sql);
    }
}
2839
2840#[mz_ore::test]
2841fn test_mutate() {
2842 let cases = vec![
2843 ("CREATE TABLE t ()", vec![r#"CREATE INDEX ON "t" ()"#]),
2844 (
2845 "CREATE TABLE t (a INT)",
2846 vec![r#"CREATE INDEX ON "t" ("a")"#],
2847 ),
2848 (
2849 "CREATE TABLE t (a INT, b TEXT)",
2850 vec![r#"CREATE INDEX ON "t" ("a", "b")"#],
2851 ),
2852 ("BAD SYNTAX", Vec::new()),
2854 ];
2855 for (sql, expected) in cases {
2856 let stmts = mutate(sql);
2857 assert_eq!(expected, stmts, "sql: {sql}");
2858 }
2859}