1use std::collections::BTreeMap;
11use std::future::Future;
12
13use std::path::PathBuf;
14use std::sync::LazyLock;
15use std::time::Duration;
16use std::{env, fs};
17
18use anyhow::{Context, anyhow, bail};
19use async_trait::async_trait;
20use aws_credential_types::provider::ProvideCredentials;
21use aws_types::SdkConfig;
22use futures::future::FutureExt;
23use itertools::Itertools;
24use mz_adapter::catalog::{Catalog, ConnCatalog};
25use mz_adapter::session::Session;
26use mz_build_info::BuildInfo;
27use mz_catalog::config::ClusterReplicaSizeMap;
28use mz_catalog::durable::BootstrapArgs;
29use mz_ccsr::SubjectVersion;
30use mz_kafka_util::client::{MzClientContext, create_new_client_config_simple};
31use mz_ore::error::ErrorExt;
32use mz_ore::metrics::MetricsRegistry;
33use mz_ore::now::SYSTEM_TIME;
34use mz_ore::retry::Retry;
35use mz_ore::task;
36use mz_ore::url::SensitiveUrl;
37use mz_persist_client::cache::PersistClientCache;
38use mz_persist_client::cfg::PersistConfig;
39use mz_persist_client::rpc::PubSubClientConnection;
40use mz_persist_client::{PersistClient, PersistLocation};
41use mz_postgres_util::{
42 Sql, batch_execute as pg_batch_execute, query as pg_query, query_one as pg_query_one,
43 sql as pg_sql,
44};
45use mz_sql::catalog::EnvironmentId;
46use mz_tls_util::make_tls;
47use rdkafka::ClientConfig;
48use rdkafka::producer::Producer;
49use regex::{Captures, Regex};
50use semver::Version;
51use tokio_postgres::error::{DbError, SqlState};
52use tracing::info;
53use url::Url;
54
55use crate::error::PosError;
56use crate::parser::{
57 Command, PosCommand, SqlExpectedError, SqlOutput, VersionConstraint, validate_ident,
58};
59use crate::util;
60use crate::util::postgres::postgres_client;
61
62pub mod consistency;
63
64mod duckdb;
65mod file;
66mod fivetran;
67mod glue;
68mod http;
69mod kafka;
70mod mysql;
71mod nop;
72mod persist;
73mod postgres;
74mod protobuf;
75mod psql;
76mod s3;
77mod schema_registry;
78mod set;
79mod skip_end;
80mod skip_if;
81mod sleep;
82mod sql;
83mod sql_server;
84mod version_check;
85mod webhook;
86
/// Configuration for a testdrive run, consumed by [`create_state`].
#[derive(Debug, Clone)]
pub struct Config {
    /// Script arguments, exposed to scripts as `${arg.<name>}`. Each name is
    /// checked with `validate_ident` when the command variables are built.
    pub arg_vars: BTreeMap<String, String>,
    /// Seed exposed as `${testdrive.seed}`. When `None`, `create_state`
    /// generates a random `u32` formatted as a zero-padded ten-digit string.
    pub seed: Option<String>,
    /// Not read in this chunk; presumably controls whether Materialize/Kafka
    /// state is reset before running — TODO confirm at the call site.
    pub reset: bool,
    /// Directory exposed as `${testdrive.temp-dir}`. When set, it is created
    /// (including parents) if missing; when `None`, a fresh temporary
    /// directory is created and its handle is owned by the `State`.
    pub temp_dir: Option<String>,
    /// Not read in this chunk; presumably identifies where the script came
    /// from — TODO confirm.
    pub source: Option<String>,
    /// Default timeout. Also bounds the retry loop for the initial
    /// Materialize connection, each connection attempt, and Kafka admin
    /// operations; used as the initial value of the state's timeout.
    pub default_timeout: Duration,
    /// Initial value of the state's maximum number of tries.
    pub default_max_tries: usize,
    /// Initial backoff copied into the state; presumably the first retry
    /// delay — TODO confirm.
    pub initial_backoff: Duration,
    /// Backoff factor copied into the state; presumably multiplies the delay
    /// between retries — TODO confirm.
    pub backoff_factor: f64,
    /// Level of consistency checking to perform.
    pub consistency_checks: consistency::Level,
    /// Copied into the state; presumably enables statement-logging checks.
    pub check_statement_logging: bool,
    /// Copied into the state; presumably enables rewriting expected results
    /// in the script instead of failing — TODO confirm.
    pub rewrite_results: bool,

    /// Connection config for the main Materialize SQL endpoint. Also the
    /// source of the Materialize user and the host used for the HTTP,
    /// password, and SASL addresses.
    pub materialize_pgconfig: tokio_postgres::Config,
    /// Connection config for the internal Materialize SQL endpoint; its host
    /// is also used for the internal HTTP address.
    pub materialize_internal_pgconfig: tokio_postgres::Config,
    /// Copied into the Materialize state; presumably selects HTTPS for HTTP
    /// requests — TODO confirm.
    pub materialize_use_https: bool,
    /// Port combined with the SQL host to form the HTTP address.
    pub materialize_http_port: u16,
    /// Port combined with the internal SQL host to form the internal HTTP
    /// address.
    pub materialize_internal_http_port: u16,
    /// Port combined with the SQL host to form the password-auth SQL address.
    pub materialize_password_sql_port: u16,
    /// Port combined with the SQL host to form the SASL-auth SQL address.
    pub materialize_sasl_sql_port: u16,
    /// `(name, value)` pairs applied with `SET name = value` on the main
    /// connection at startup. They are interpolated unescaped.
    pub materialize_params: Vec<(String, String)>,
    /// When set, allows opening a read-only copy of the catalog from persist.
    pub materialize_catalog_config: Option<CatalogConfig>,
    /// Build information used for persist configuration.
    pub build_info: &'static BuildInfo,
    /// Replica sizes placed into the bootstrap arguments.
    pub materialize_cluster_replica_sizes: ClusterReplicaSizeMap,

    /// Copied into the state; not read in this chunk.
    pub persist_consensus_url: Option<SensitiveUrl>,
    /// Copied into the state; not read in this chunk.
    pub persist_blob_url: Option<SensitiveUrl>,

    /// Kafka bootstrap servers; exposed as `${testdrive.kafka-addr}`.
    pub kafka_addr: String,
    /// Copied into the state; presumably the partition count for created
    /// topics — TODO confirm.
    pub kafka_default_partitions: usize,
    /// Extra librdkafka options, applied after testdrive's defaults so they
    /// override them.
    pub kafka_opts: Vec<(String, String)>,
    /// Schema registry URL; exposed as `${testdrive.schema-registry-url}`.
    pub schema_registry_url: Url,
    /// Path to a PKCS#12 keystore. When set, it is used as the schema
    /// registry client identity and as the Kafka SSL keystore (enabling
    /// `security.protocol=ssl`).
    pub cert_path: Option<String>,
    /// Password for the keystore at `cert_path` (empty when `None` for the
    /// schema registry client).
    pub cert_password: Option<String>,
    /// Schema registry username; auth is only configured when this is set.
    pub ccsr_username: Option<String>,
    /// Schema registry password, passed alongside `ccsr_username`.
    pub ccsr_password: Option<String>,

    /// AWS SDK configuration; its region, endpoint, and credentials are
    /// exposed as `testdrive.aws-*` variables.
    pub aws_config: SdkConfig,
    /// AWS account, exposed as `${testdrive.aws-account}`.
    pub aws_account: String,

    /// Exposed as `${testdrive.fivetran-destination-url}`.
    pub fivetran_destination_url: String,
    /// Exposed as `${testdrive.fivetran-destination-files-path}`.
    pub fivetran_destination_files_path: String,
}
200
/// Connection details for the Materialize instance under test.
pub struct MaterializeState {
    /// When set, `State::with_catalog_copy` opens a read-only catalog copy
    /// from this persist location.
    catalog_config: Option<CatalogConfig>,

    /// `host:port` of the main SQL endpoint.
    sql_addr: String,
    /// Copied from `Config::materialize_use_https`.
    use_https: bool,
    /// `host:port` of the HTTP endpoint (SQL host + HTTP port).
    http_addr: String,
    /// `host:port` of the internal SQL endpoint; `State::reset_materialize`
    /// connects here as `mz_system`.
    internal_sql_addr: String,
    /// `host:port` of the internal HTTP endpoint.
    internal_http_addr: String,
    /// `host:port` of the password-auth SQL endpoint.
    password_sql_addr: String,
    /// `host:port` of the SASL-auth SQL endpoint.
    sasl_sql_addr: String,
    /// User from the main pgconfig. `State::reset_materialize` keeps this
    /// role and grants it privileges.
    user: String,
    /// Client on the main SQL endpoint, with `Config::materialize_params`
    /// already applied via `SET`.
    pgclient: tokio_postgres::Client,
    /// Result of `SELECT mz_environment_id()` at startup.
    environment_id: EnvironmentId,
    /// Bootstrap arguments built at startup; not read in this chunk.
    bootstrap_args: BootstrapArgs,
}
216
/// Mutable state shared by all commands of a testdrive run.
pub struct State {
    /// The configuration this state was created from.
    pub config: Config,

    /// Script arguments, exposed as `${arg.<name>}`.
    arg_vars: BTreeMap<String, String>,
    /// Variables available for `${name}` substitution in commands.
    cmd_vars: BTreeMap<String, String>,
    /// Seed exposed as `${testdrive.seed}`.
    seed: String,
    /// Directory exposed as `${testdrive.temp-dir}`.
    temp_path: PathBuf,
    /// Handle for an auto-created temporary directory (`None` when
    /// `Config::temp_dir` was given); held so the directory stays alive.
    _tempfile: Option<tempfile::TempDir>,
    /// Default timeout from the config.
    default_timeout: Duration,
    /// Current timeout; starts at `default_timeout`. Presumably changed by
    /// `set-sql-timeout` — TODO confirm.
    timeout: Duration,
    /// Current maximum number of tries; starts at `Config::default_max_tries`.
    max_tries: usize,
    /// Backoff copied from the config.
    initial_backoff: Duration,
    /// Backoff factor copied from the config.
    backoff_factor: f64,
    /// Level of consistency checking to perform.
    consistency_checks: consistency::Level,
    /// Copied from the config.
    check_statement_logging: bool,
    /// One-shot flag reset (and returned) by `clear_skip_consistency_checks`.
    consistency_checks_adhoc_skip: bool,
    /// Optional regex, presumably set by `set-regex` and cleared by
    /// `unset-regex` — TODO confirm.
    regex: Option<Regex>,
    /// Replacement used with `regex`; starts at
    /// `set::DEFAULT_REGEX_REPLACEMENT`.
    regex_replacement: String,
    /// Starts at 0; not read in this chunk.
    error_line_count: usize,
    /// Starts empty; not read in this chunk.
    error_string: String,

    /// Connection details for Materialize.
    materialize: MaterializeState,

    /// Copied from the config; not read in this chunk.
    persist_consensus_url: Option<SensitiveUrl>,
    /// Copied from the config; not read in this chunk.
    persist_blob_url: Option<SensitiveUrl>,
    /// Build information from the config.
    build_info: &'static BuildInfo,
    /// Cache used to open persist clients.
    persist_clients: PersistClientCache,

    /// Schema registry URL.
    schema_registry_url: Url,
    /// Schema registry client.
    ccsr_client: mz_ccsr::Client,
    /// Kafka bootstrap servers.
    kafka_addr: String,
    /// Kafka admin client.
    kafka_admin: rdkafka::admin::AdminClient<MzClientContext>,
    /// Admin options (operation timeout = default timeout).
    kafka_admin_opts: rdkafka::admin::AdminOptions,
    /// The client config the admin client and producer were created from.
    kafka_config: ClientConfig,
    /// Copied from the config.
    kafka_default_partitions: usize,
    /// Kafka producer; also used to fetch cluster metadata.
    kafka_producer: rdkafka::producer::FutureProducer<MzClientContext>,
    /// Starts empty; presumably topic name -> partition count — TODO confirm.
    kafka_topics: BTreeMap<String, usize>,

    /// AWS account, exposed as `${testdrive.aws-account}`.
    aws_account: String,
    /// AWS SDK configuration.
    aws_config: SdkConfig,

    /// Named DuckDB connections.
    pub duckdb_clients: BTreeMap<String, std::sync::Arc<std::sync::Mutex<::duckdb::Connection>>>,
    /// Named MySQL connections.
    mysql_clients: BTreeMap<String, mysql_async::Conn>,
    /// Named Postgres connections.
    postgres_clients: BTreeMap<String, tokio_postgres::Client>,
    /// Named SQL Server connections.
    sql_server_clients: BTreeMap<String, mz_sql_server_util::Client>,

    /// Fivetran destination URL.
    fivetran_destination_url: String,
    /// Fivetran destination files path.
    fivetran_destination_files_path: String,

    /// Copied from the config.
    rewrite_results: bool,
    /// Pending rewrites; presumably applied to the script when
    /// `rewrite_results` is set — TODO confirm.
    pub rewrites: Vec<Rewrite>,
    /// Starts at 0; presumably start offset of the command being rewritten.
    pub rewrite_pos_start: usize,
    /// Starts at 0; presumably end offset of the command being rewritten.
    pub rewrite_pos_end: usize,
}
283
/// A pending replacement of a span of the script with new content.
pub struct Rewrite {
    /// Replacement text.
    pub content: String,
    /// Presumably the start offset of the replaced span — TODO confirm units.
    pub start: usize,
    /// Presumably the end offset of the replaced span — TODO confirm units.
    pub end: usize,
}
289
290impl State {
291 pub async fn initialize_cmd_vars(&mut self) -> Result<(), anyhow::Error> {
292 self.cmd_vars
293 .insert("testdrive.kafka-addr".into(), self.kafka_addr.clone());
294 self.cmd_vars.insert(
295 "testdrive.schema-registry-url".into(),
296 self.schema_registry_url.to_string(),
297 );
298 self.cmd_vars
299 .insert("testdrive.seed".into(), self.seed.clone());
300 self.cmd_vars.insert(
301 "testdrive.temp-dir".into(),
302 self.temp_path.display().to_string(),
303 );
304 self.cmd_vars
305 .insert("testdrive.aws-region".into(), self.aws_region().into());
306 self.cmd_vars
307 .insert("testdrive.aws-endpoint".into(), self.aws_endpoint().into());
308 self.cmd_vars
309 .insert("testdrive.aws-account".into(), self.aws_account.clone());
310 {
311 let aws_credentials = self
312 .aws_config
313 .credentials_provider()
314 .ok_or_else(|| anyhow!("no AWS credentials provider configured"))?
315 .provide_credentials()
316 .await
317 .context("fetching AWS credentials")?;
318 self.cmd_vars.insert(
319 "testdrive.aws-access-key-id".into(),
320 aws_credentials.access_key_id().to_owned(),
321 );
322 self.cmd_vars.insert(
323 "testdrive.aws-secret-access-key".into(),
324 aws_credentials.secret_access_key().to_owned(),
325 );
326 self.cmd_vars.insert(
327 "testdrive.aws-token".into(),
328 aws_credentials
329 .session_token()
330 .map(|token| token.to_owned())
331 .unwrap_or_else(String::new),
332 );
333 }
334 self.cmd_vars.insert(
335 "testdrive.materialize-environment-id".into(),
336 self.materialize.environment_id.to_string(),
337 );
338 self.cmd_vars.insert(
339 "testdrive.materialize-sql-addr".into(),
340 self.materialize.sql_addr.clone(),
341 );
342 self.cmd_vars.insert(
343 "testdrive.materialize-internal-sql-addr".into(),
344 self.materialize.internal_sql_addr.clone(),
345 );
346 self.cmd_vars.insert(
347 "testdrive.materialize-password-sql-addr".into(),
348 self.materialize.password_sql_addr.clone(),
349 );
350 self.cmd_vars.insert(
351 "testdrive.materialize-sasl-sql-addr".into(),
352 self.materialize.sasl_sql_addr.clone(),
353 );
354 self.cmd_vars.insert(
355 "testdrive.materialize-user".into(),
356 self.materialize.user.clone(),
357 );
358 self.cmd_vars.insert(
359 "testdrive.fivetran-destination-url".into(),
360 self.fivetran_destination_url.clone(),
361 );
362 self.cmd_vars.insert(
363 "testdrive.fivetran-destination-files-path".into(),
364 self.fivetran_destination_files_path.clone(),
365 );
366
367 for (key, value) in env::vars() {
368 self.cmd_vars.insert(format!("env.{}", key), value);
369 }
370
371 for (key, value) in &self.arg_vars {
372 validate_ident(key)?;
373 self.cmd_vars
374 .insert(format!("arg.{}", key), value.to_string());
375 }
376
377 Ok(())
378 }
379 pub async fn with_catalog_copy<F, T>(
382 &self,
383 system_parameter_defaults: BTreeMap<String, String>,
384 build_info: &'static BuildInfo,
385 bootstrap_args: &BootstrapArgs,
386 enable_expression_cache_override: Option<bool>,
387 f: F,
388 ) -> Result<Option<T>, anyhow::Error>
389 where
390 F: FnOnce(ConnCatalog) -> T,
391 {
392 async fn persist_client(
393 persist_consensus_url: SensitiveUrl,
394 persist_blob_url: SensitiveUrl,
395 persist_clients: &PersistClientCache,
396 ) -> Result<PersistClient, anyhow::Error> {
397 let persist_location = PersistLocation {
398 blob_uri: persist_blob_url,
399 consensus_uri: persist_consensus_url,
400 };
401 Ok(persist_clients.open(persist_location).await?)
402 }
403
404 if let Some(CatalogConfig {
405 persist_consensus_url,
406 persist_blob_url,
407 }) = &self.materialize.catalog_config
408 {
409 let persist_client = persist_client(
410 persist_consensus_url.clone(),
411 persist_blob_url.clone(),
412 &self.persist_clients,
413 )
414 .await?;
415 let catalog = Catalog::open_debug_read_only_persist_catalog_config(
416 persist_client,
417 SYSTEM_TIME.clone(),
418 self.materialize.environment_id.clone(),
419 system_parameter_defaults,
420 build_info,
421 bootstrap_args,
422 enable_expression_cache_override,
423 )
424 .await?;
425 let res = f(catalog.for_session(&Session::dummy()));
426 catalog.expire().await;
427 Ok(Some(res))
428 } else {
429 Ok(None)
430 }
431 }
432
433 pub fn aws_endpoint(&self) -> &str {
434 self.aws_config.endpoint_url().unwrap_or("")
435 }
436
437 pub fn aws_region(&self) -> &str {
438 self.aws_config.region().map(|r| r.as_ref()).unwrap_or("")
439 }
440
441 pub fn clear_skip_consistency_checks(&mut self) -> bool {
444 std::mem::replace(&mut self.consistency_checks_adhoc_skip, false)
445 }
446
447 pub async fn reset_materialize(&self) -> Result<(), anyhow::Error> {
448 let (inner_client, _) = postgres_client(
449 &format!(
450 "postgres://mz_system:materialize@{}",
451 self.materialize.internal_sql_addr
452 ),
453 self.default_timeout,
454 )
455 .await?;
456
457 let version = pg_query_one(&inner_client, pg_sql!("SELECT mz_version_num()"), &[])
458 .await
459 .context("getting version of materialize")
460 .map(|row| row.get::<_, i32>(0))?;
461
462 let semver = pg_query_one(
463 &inner_client,
464 pg_sql!("SELECT right(split_part(mz_version(), ' ', 1), -1)"),
465 &[],
466 )
467 .await
468 .context("getting semver of materialize")
469 .map(|row| row.get::<_, String>(0))?
470 .parse::<semver::Version>()
471 .context("parsing semver of materialize")?;
472
473 pg_batch_execute(&inner_client, pg_sql!("ALTER SYSTEM RESET ALL"))
474 .await
475 .context("resetting materialize state: ALTER SYSTEM RESET ALL")?;
476
477 {
479 let rename_version = Version::parse("0.128.0-dev.1").expect("known to be valid");
480 let enable_unsafe_functions = if semver >= rename_version {
481 "unsafe_enable_unsafe_functions"
482 } else {
483 "enable_unsafe_functions"
484 };
485 let res = pg_batch_execute(
486 &inner_client,
487 pg_sql!(
488 "ALTER SYSTEM SET {} = on",
489 Sql::ident(enable_unsafe_functions)
490 ),
491 )
492 .await
493 .context("enabling dangerous functions");
494 if let Err(e) = res {
495 match e.root_cause().downcast_ref::<DbError>() {
496 Some(e) if *e.code() == SqlState::CANT_CHANGE_RUNTIME_PARAM => {
497 info!(
498 "can't enable unsafe functions because the server is safe mode; \
499 testdrive scripts will fail if they use unsafe functions",
500 );
501 }
502 _ => return Err(e),
503 }
504 }
505 }
506
507 for row in pg_query(&inner_client, pg_sql!("SHOW DATABASES"), &[])
508 .await
509 .context("resetting materialize state: SHOW DATABASES")?
510 {
511 let db_name: String = row.get(0);
512 if db_name.starts_with("testdrive_no_reset_") {
513 continue;
514 }
515 let drop_database = pg_sql!("DROP DATABASE {}", Sql::ident(&db_name));
516 sql::print_query(drop_database.as_str(), None);
517 pg_batch_execute(&inner_client, drop_database)
518 .await
519 .context(format!(
520 "resetting materialize state: DROP DATABASE {}",
521 db_name,
522 ))?;
523 }
524
525 let inactive_user_clusters = "
527 WITH
528 active_user_clusters AS
529 (
530 SELECT DISTINCT cluster_id, object_id
531 FROM
532 (
533 SELECT cluster_id, id FROM mz_catalog.mz_sources
534 UNION ALL SELECT cluster_id, id FROM mz_catalog.mz_sinks
535 UNION ALL
536 SELECT cluster_id, id
537 FROM mz_catalog.mz_materialized_views
538 UNION ALL
539 SELECT cluster_id, id FROM mz_catalog.mz_indexes
540 UNION ALL
541 SELECT cluster_id, id
542 FROM mz_internal.mz_subscriptions
543 )
544 AS t (cluster_id, object_id)
545 WHERE cluster_id IS NOT NULL AND object_id LIKE 'u%'
546 )
547 SELECT name
548 FROM mz_catalog.mz_clusters
549 WHERE
550 id NOT IN ( SELECT cluster_id FROM active_user_clusters ) AND id LIKE 'u%'
551 AND
552 owner_id LIKE 'u%';";
553
554 let inactive_clusters = pg_query(&inner_client, Sql::new(inactive_user_clusters), &[])
555 .await
556 .context("resetting materialize state: inactive_user_clusters")?;
557
558 if !inactive_clusters.is_empty() {
559 println!("cleaning up user clusters from previous tests...")
560 }
561
562 for cluster_name in inactive_clusters {
563 let cluster_name: String = cluster_name.get(0);
564 if cluster_name.starts_with("testdrive_no_reset_") {
565 continue;
566 }
567 let drop_cluster = pg_sql!("DROP CLUSTER {}", Sql::ident(&cluster_name));
568 sql::print_query(drop_cluster.as_str(), None);
569 pg_batch_execute(&inner_client, drop_cluster)
570 .await
571 .context(format!(
572 "resetting materialize state: DROP CLUSTER {}",
573 cluster_name,
574 ))?;
575 }
576
577 pg_batch_execute(&inner_client, pg_sql!("CREATE DATABASE materialize"))
578 .await
579 .context("resetting materialize state: CREATE DATABASE materialize")?;
580
581 if let Ok(rows) = pg_query(&inner_client, pg_sql!("SELECT name FROM mz_roles"), &[]).await {
585 for row in rows {
586 let role_name: String = row.get(0);
587 if role_name == self.materialize.user || role_name.starts_with("mz_") {
588 continue;
589 }
590 let drop_role = pg_sql!("DROP ROLE {}", Sql::ident(&role_name));
591 sql::print_query(drop_role.as_str(), None);
592 pg_batch_execute(&inner_client, drop_role)
593 .await
594 .context(format!(
595 "resetting materialize state: DROP ROLE {}",
596 role_name,
597 ))?;
598 }
599 }
600
601 pg_batch_execute(
603 &inner_client,
604 pg_sql!(
605 "GRANT ALL PRIVILEGES ON SYSTEM TO {}",
606 Sql::ident(&self.materialize.user)
607 ),
608 )
609 .await?;
610
611 pg_batch_execute(
613 &inner_client,
614 pg_sql!("GRANT USAGE ON DATABASE materialize TO PUBLIC"),
615 )
616 .await?;
617 pg_batch_execute(
618 &inner_client,
619 pg_sql!(
620 "GRANT ALL PRIVILEGES ON DATABASE materialize TO {}",
621 Sql::ident(&self.materialize.user)
622 ),
623 )
624 .await?;
625 pg_batch_execute(
626 &inner_client,
627 pg_sql!(
628 "GRANT ALL PRIVILEGES ON SCHEMA materialize.public TO {}",
629 Sql::ident(&self.materialize.user)
630 ),
631 )
632 .await?;
633
634 let cluster = match version {
635 ..=8199 => "default",
636 8200.. => "quickstart",
637 };
638 pg_batch_execute(
639 &inner_client,
640 pg_sql!("GRANT USAGE ON CLUSTER {} TO PUBLIC", Sql::ident(cluster)),
641 )
642 .await?;
643 pg_batch_execute(
644 &inner_client,
645 pg_sql!(
646 "GRANT ALL PRIVILEGES ON CLUSTER {} TO {}",
647 Sql::ident(cluster),
648 Sql::ident(&self.materialize.user)
649 ),
650 )
651 .await?;
652
653 Ok(())
654 }
655
656 pub async fn reset_kafka(&self) -> Result<(), anyhow::Error> {
658 use rdkafka::types::RDKafkaErrorCode;
659 let mut errors: Vec<anyhow::Error> = Vec::new();
660
661 let metadata = self.kafka_producer.client().fetch_metadata(
662 None,
663 Some(std::cmp::max(Duration::from_secs(1), self.default_timeout)),
664 )?;
665
666 let testdrive_topics: Vec<_> = metadata
667 .topics()
668 .iter()
669 .filter_map(|t| {
670 if t.name().starts_with("testdrive-") {
671 Some(t.name())
672 } else {
673 None
674 }
675 })
676 .collect();
677
678 if !testdrive_topics.is_empty() {
679 match self
680 .kafka_admin
681 .delete_topics(&testdrive_topics, &self.kafka_admin_opts)
682 .await
683 {
684 Ok(res) => {
685 if res.len() != testdrive_topics.len() {
686 errors.push(anyhow!(
687 "kafka topic deletion returned {} results, but exactly {} expected",
688 res.len(),
689 testdrive_topics.len()
690 ));
691 }
692 for (res, topic) in res.iter().zip_eq(testdrive_topics.iter()) {
693 match res {
694 Ok(_) | Err((_, RDKafkaErrorCode::UnknownTopicOrPartition)) => (),
695 Err((_, err)) => {
696 errors.push(anyhow!("unable to delete {}: {}", topic, err));
697 }
698 }
699 }
700 }
701 Err(e) => {
702 errors.push(e.into());
703 }
704 };
705 }
706
707 let schema_registry_errors = self.reset_schema_registry().await;
708
709 errors.extend(schema_registry_errors);
710 if errors.is_empty() {
711 Ok(())
712 } else {
713 bail!(
714 "deleting Kafka topics: {} errors: {}",
715 errors.len(),
716 errors
717 .into_iter()
718 .map(|e| e.to_string_with_causes())
719 .join("\n")
720 );
721 }
722 }
723
724 #[allow(clippy::disallowed_types)]
725 async fn reset_schema_registry(&self) -> Vec<anyhow::Error> {
726 use std::collections::HashMap;
727
728 let mut errors = Vec::new();
729 match self
730 .ccsr_client
731 .list_subjects()
732 .await
733 .context("listing schema registry subjects")
734 {
735 Ok(subjects) => {
736 let testdrive_subjects: Vec<_> = subjects
737 .into_iter()
738 .filter(|s| s.starts_with("testdrive-"))
739 .collect();
740
741 let mut graphs: HashMap<SubjectVersion, Vec<SubjectVersion>> = HashMap::new();
743
744 for subject in &testdrive_subjects {
745 match self.ccsr_client.get_subject_with_references(subject).await {
746 Ok((subj, refs)) => {
747 let refs: Vec<_> = refs
749 .into_iter()
750 .filter(|r| r.subject.starts_with("testdrive-"))
751 .collect();
752 graphs.insert(
753 SubjectVersion {
754 subject: subj.name,
755 version: subj.version,
756 },
757 refs,
758 );
759 }
760 Err(mz_ccsr::GetBySubjectError::SubjectNotFound) => {
761 }
763 Err(e) => {
764 errors.push(anyhow::anyhow!(
765 "failed to get references for subject {}: {}",
766 subject,
767 e
768 ));
769 }
770 }
771 }
772
773 let subjects_to_delete: Vec<_> = match mz_ccsr::topological_sort(&graphs) {
776 Ok(ordered) => {
777 let mut subjects: Vec<_> = ordered.into_iter().collect();
778 subjects.sort_by(|a, b| a.1.cmp(&b.1));
779 subjects.into_iter().map(|(s, _)| s.clone()).collect()
780 }
781 Err(_) => {
782 tracing::info!("Cycle detected, attempting to delete anyway");
783 graphs.into_keys().collect()
785 }
786 };
787
788 for subject in subjects_to_delete {
789 match self.ccsr_client.delete_subject(&subject.subject).await {
790 Ok(()) | Err(mz_ccsr::DeleteError::SubjectNotFound) => (),
791 Err(e) => errors.push(e.into()),
792 }
793 }
794 }
795 Err(e) => {
796 errors.push(e);
797 }
798 }
799 errors
800 }
801}
802
/// Persist location of the durable catalog, used to open a read-only copy.
#[derive(Debug, Clone)]
pub struct CatalogConfig {
    /// Consensus URL of the catalog's persist location.
    pub persist_consensus_url: SensitiveUrl,
    /// Blob URL of the catalog's persist location.
    pub persist_blob_url: SensitiveUrl,
}
811
/// Outcome of running one command; presumably consumed by the script driver
/// to decide how to proceed.
pub enum ControlFlow {
    /// Proceed normally. Also returned when a command is skipped because of
    /// its version constraint.
    Continue,
    /// Presumably starts a skipped region (e.g. from `skip-if`) — TODO confirm.
    SkipBegin,
    /// Presumably ends a skipped region (e.g. from `skip-end`) — TODO confirm.
    SkipEnd,
}
817
/// A command that can be executed against the testdrive [`State`].
#[async_trait]
pub(crate) trait Run {
    /// Consumes the command, runs it against `state`, and returns how
    /// execution should continue.
    async fn run(self, state: &mut State) -> Result<ControlFlow, PosError>;
}
822
/// Runs a positioned command: applies its version constraint, substitutes
/// `${...}` variables, dispatches it, and attaches its position to errors.
#[async_trait]
impl Run for PosCommand {
    async fn run(self, state: &mut State) -> Result<ControlFlow, PosError> {
        // Evaluates an optional version constraint. On `Ok(true)` from the
        // version check, the command is not executed and `Continue` is
        // returned; check errors are reported at this command's position.
        macro_rules! handle_version {
            ($version_constraint:expr) => {
                match $version_constraint {
                    Some(VersionConstraint { min, max }) => {
                        match version_check::run_version_check(min, max, state).await {
                            Ok(true) => return Ok(ControlFlow::Continue),
                            Ok(false) => {}
                            Err(err) => return Err(PosError::new(err, self.pos)),
                        }
                    }
                    None => {}
                }
            };
        }

        let wrap_err = |e| PosError::new(e, self.pos);
        // For a builtin, variables namespaced under the builtin's own name
        // (`${<name>.x}`) are left verbatim by substitution. NOTE(review):
        // presumably the builtin expands them itself — confirm.
        let ignore_prefix = match &self.command {
            Command::Builtin(builtin, _) => Some(builtin.name.clone()),
            _ => None,
        };
        // Plain substitution.
        let subst = |msg: &str, vars: &BTreeMap<String, String>| {
            substitute_vars(msg, vars, &ignore_prefix, false).map_err(wrap_err)
        };
        // Substitution that regex-escapes values, for expected-error regexes.
        let subst_re = |msg: &str, vars: &BTreeMap<String, String>| {
            substitute_vars(msg, vars, &ignore_prefix, true).map_err(wrap_err)
        };

        let r = match self.command {
            Command::Builtin(mut builtin, version_constraint) => {
                handle_version!(version_constraint);
                // Substitute variables in both arguments and input lines.
                for val in builtin.args.values_mut() {
                    *val = subst(val, &state.cmd_vars)?;
                }
                for line in &mut builtin.input {
                    *line = subst(line, &state.cmd_vars)?;
                }
                match builtin.name.as_ref() {
                    "check-consistency" => consistency::run_consistency_checks(state).await,
                    "skip-consistency-checks" => {
                        consistency::skip_consistency_checks(builtin, state)
                    }
                    "check-shard-tombstone" => {
                        consistency::run_check_shard_tombstone(builtin, state).await
                    }
                    "duckdb-execute" => duckdb::run_execute(builtin, state).await,
                    "duckdb-query" => duckdb::run_query(builtin, state).await,
                    "fivetran-destination" => {
                        fivetran::run_destination_command(builtin, state).await
                    }
                    "file-append" => file::run_append(builtin, state).await,
                    "file-delete" => file::run_delete(builtin, state).await,
                    "glue-create-schema" => glue::run_create_schema(builtin, state).await,
                    "http-request" => http::run_request(builtin, state).await,
                    "kafka-add-partitions" => kafka::run_add_partitions(builtin, state).await,
                    "kafka-create-topic" => kafka::run_create_topic(builtin, state).await,
                    "kafka-wait-topic" => kafka::run_wait_topic(builtin, state).await,
                    "kafka-delete-records" => kafka::run_delete_records(builtin, state).await,
                    "kafka-delete-topic-flaky" => kafka::run_delete_topic(builtin, state).await,
                    "kafka-ingest" => kafka::run_ingest(builtin, state).await,
                    "kafka-verify-data" => kafka::run_verify_data(builtin, state).await,
                    "kafka-verify-commit" => kafka::run_verify_commit(builtin, state).await,
                    "kafka-verify-topic" => kafka::run_verify_topic(builtin, state).await,
                    "mysql-connect" => mysql::run_connect(builtin, state).await,
                    "mysql-execute" => mysql::run_execute(builtin, state).await,
                    "nop" => nop::run_nop(),
                    "postgres-connect" => postgres::run_connect(builtin, state).await,
                    "postgres-execute" => postgres::run_execute(builtin, state).await,
                    "postgres-verify-slot" => postgres::run_verify_slot(builtin, state).await,
                    "protobuf-compile-descriptors" => {
                        protobuf::run_compile_descriptors(builtin, state).await
                    }
                    "psql-execute" => psql::run_execute(builtin, state).await,
                    "s3-verify-data" => s3::run_verify_data(builtin, state).await,
                    "s3-verify-keys" => s3::run_verify_keys(builtin, state).await,
                    "s3-file-upload" => s3::run_upload(builtin, state).await,
                    "s3-set-presigned-url" => s3::run_set_presigned_url(builtin, state).await,
                    "s3-upload-parquet-types" => s3::run_upload_parquet_types(builtin, state).await,
                    "s3-upload-parquet-unsorted-map" => {
                        s3::run_upload_parquet_unsorted_map(builtin, state).await
                    }
                    "schema-registry-publish" => schema_registry::run_publish(builtin, state).await,
                    "schema-registry-verify" => schema_registry::run_verify(builtin, state).await,
                    "schema-registry-wait" => schema_registry::run_wait(builtin, state).await,
                    "skip-if" => skip_if::run_skip_if(builtin, state).await,
                    "skip-end" => skip_end::run_skip_end(),
                    "sql-server-connect" => sql_server::run_connect(builtin, state).await,
                    "sql-server-execute" => sql_server::run_execute(builtin, state).await,
                    "sql-server-set-from-sql" => sql_server::run_set_from_sql(builtin, state).await,
                    "persist-force-compaction" => {
                        persist::run_force_compaction(builtin, state).await
                    }
                    "random-sleep" => sleep::run_random_sleep(builtin),
                    "set-regex" => set::run_regex_set(builtin, state),
                    "unset-regex" => set::run_regex_unset(builtin, state),
                    "set-sql-timeout" => set::run_sql_timeout(builtin, state),
                    "set-max-tries" => set::run_max_tries(builtin, state),
                    "sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment" => {
                        sleep::run_sleep(builtin)
                    }
                    "set" => set::set_vars(builtin, state),
                    "set-arg-default" => set::run_set_arg_default(builtin, state),
                    "set-from-sql" => set::run_set_from_sql(builtin, state).await,
                    "set-from-file" => set::run_set_from_file(builtin, state).await,
                    "webhook-append" => webhook::run_append(builtin, state).await,
                    _ => {
                        return Err(PosError::new(
                            anyhow!("unknown built-in command {}", builtin.name),
                            self.pos,
                        ));
                    }
                }
            }
            Command::Sql(mut sql, version_constraint) => {
                handle_version!(version_constraint);
                // Substitute in the query and, for full output, every
                // expected cell.
                sql.query = subst(&sql.query, &state.cmd_vars)?;
                if let SqlOutput::Full { expected_rows, .. } = &mut sql.expected_output {
                    for row in expected_rows {
                        for col in row {
                            *col = subst(col, &state.cmd_vars)?;
                        }
                    }
                }
                sql::run_sql(sql, state).await
            }
            Command::FailSql(mut sql, version_constraint) => {
                handle_version!(version_constraint);
                sql.query = subst(&sql.query, &state.cmd_vars)?;
                // Regex expectations get regex-escaped values so substituted
                // text matches literally.
                sql.expected_error = match &sql.expected_error {
                    SqlExpectedError::Contains(s) => {
                        SqlExpectedError::Contains(subst(s, &state.cmd_vars)?)
                    }
                    SqlExpectedError::Exact(s) => {
                        SqlExpectedError::Exact(subst(s, &state.cmd_vars)?)
                    }
                    SqlExpectedError::Regex(s) => {
                        SqlExpectedError::Regex(subst_re(s, &state.cmd_vars)?)
                    }
                    SqlExpectedError::Timeout => SqlExpectedError::Timeout,
                };
                sql::run_fail_sql(sql, state).await
            }
        };

        r.map_err(wrap_err)
    }
}
974
975fn substitute_vars(
977 msg: &str,
978 vars: &BTreeMap<String, String>,
979 ignore_prefix: &Option<String>,
980 regex_escape: bool,
981) -> Result<String, anyhow::Error> {
982 static RE: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"\$\{([^}]+)\}").unwrap());
983 let mut err = None;
984 let out = RE.replace_all(msg, |caps: &Captures| {
985 let name = &caps[1];
986 if let Some(ignore_prefix) = &ignore_prefix {
987 if name.starts_with(format!("{}.", ignore_prefix).as_str()) {
988 return caps.get(0).unwrap().as_str().to_string();
990 }
991 }
992
993 if let Some(val) = vars.get(name) {
994 if regex_escape {
995 regex::escape(val)
996 } else {
997 val.to_string()
998 }
999 } else {
1000 err = Some(anyhow!("unknown variable: {}", name));
1001 "#VAR-MISSING#".to_string()
1002 }
1003 });
1004 match err {
1005 Some(err) => Err(err),
1006 None => Ok(out.into_owned()),
1007 }
1008}
1009
1010pub async fn create_state(
1018 config: &Config,
1019) -> Result<(State, impl Future<Output = Result<(), anyhow::Error>>), anyhow::Error> {
1020 let seed = config
1021 .seed
1022 .clone()
1023 .unwrap_or_else(|| format!("{:010}", rand::random::<u32>()));
1024
1025 let (_tempfile, temp_path) = match &config.temp_dir {
1026 Some(temp_dir) => {
1027 fs::create_dir_all(temp_dir).context("creating temporary directory")?;
1028 (None, PathBuf::from(&temp_dir))
1029 }
1030 _ => {
1031 let tempfile_handle = tempfile::tempdir().context("creating temporary directory")?;
1034 let temp_path = tempfile_handle.path().to_path_buf();
1035 (Some(tempfile_handle), temp_path)
1036 }
1037 };
1038
1039 let materialize_catalog_config = config.materialize_catalog_config.clone();
1040
1041 let materialize_url = util::postgres::config_url(&config.materialize_pgconfig)?;
1042 info!("Connecting to {}", materialize_url.as_str());
1043 let (pgclient, pgconn) = Retry::default()
1044 .max_duration(config.default_timeout)
1045 .retry_async_canceling(|_| async move {
1046 let mut pgconfig = config.materialize_pgconfig.clone();
1047 pgconfig.connect_timeout(config.default_timeout);
1048 let tls = make_tls(&pgconfig)?;
1049 pgconfig.connect(tls).await.map_err(|e| anyhow!(e))
1050 })
1051 .await?;
1052
1053 let pgconn_task =
1054 task::spawn(|| "pgconn_task", pgconn).map(|join| join.context("running SQL connection"));
1055
1056 let materialize_state =
1057 create_materialize_state(&config, materialize_catalog_config, pgclient).await?;
1058
1059 let schema_registry_url = config.schema_registry_url.to_owned();
1060
1061 let ccsr_client = {
1062 let mut ccsr_config = mz_ccsr::ClientConfig::new(schema_registry_url.clone());
1063
1064 if let Some(cert_path) = &config.cert_path {
1065 let cert = fs::read(cert_path).context("reading cert")?;
1066 let pass = config.cert_password.as_deref().unwrap_or("").to_owned();
1067 let ident = mz_ccsr::tls::Identity::from_pkcs12_der(cert, pass)
1068 .context("reading keystore file as pkcs12")?;
1069 ccsr_config = ccsr_config.identity(ident);
1070 }
1071
1072 if let Some(ccsr_username) = &config.ccsr_username {
1073 ccsr_config = ccsr_config.auth(ccsr_username.clone(), config.ccsr_password.clone());
1074 }
1075
1076 ccsr_config.build().context("Creating CCSR client")?
1077 };
1078
1079 let (kafka_addr, kafka_admin, kafka_admin_opts, kafka_producer, kafka_topics, kafka_config) = {
1080 use rdkafka::admin::{AdminClient, AdminOptions};
1081 use rdkafka::producer::FutureProducer;
1082
1083 let mut kafka_config = create_new_client_config_simple();
1084 kafka_config.set("bootstrap.servers", &config.kafka_addr);
1085 kafka_config.set("group.id", "materialize-testdrive");
1086 kafka_config.set("auto.offset.reset", "earliest");
1087 kafka_config.set("isolation.level", "read_committed");
1088 if let Some(cert_path) = &config.cert_path {
1089 kafka_config.set("security.protocol", "ssl");
1090 kafka_config.set("ssl.keystore.location", cert_path);
1091 if let Some(cert_password) = &config.cert_password {
1092 kafka_config.set("ssl.keystore.password", cert_password);
1093 }
1094 }
1095 kafka_config.set("message.max.bytes", "15728640");
1096
1097 for (key, value) in &config.kafka_opts {
1098 kafka_config.set(key, value);
1099 }
1100
1101 let admin: AdminClient<_> = kafka_config
1102 .create_with_context(MzClientContext::default())
1103 .with_context(|| format!("opening Kafka connection: {}", config.kafka_addr))?;
1104
1105 let admin_opts = AdminOptions::new().operation_timeout(Some(config.default_timeout));
1106
1107 let producer: FutureProducer<_> = kafka_config
1108 .create_with_context(MzClientContext::default())
1109 .with_context(|| format!("opening Kafka producer connection: {}", config.kafka_addr))?;
1110
1111 let topics = BTreeMap::new();
1112
1113 (
1114 config.kafka_addr.to_owned(),
1115 admin,
1116 admin_opts,
1117 producer,
1118 topics,
1119 kafka_config,
1120 )
1121 };
1122
1123 let mut state = State {
1124 config: config.clone(),
1125
1126 arg_vars: config.arg_vars.clone(),
1128 cmd_vars: BTreeMap::new(),
1129 seed,
1130 temp_path,
1131 _tempfile,
1132 default_timeout: config.default_timeout,
1133 timeout: config.default_timeout,
1134 max_tries: config.default_max_tries,
1135 initial_backoff: config.initial_backoff,
1136 backoff_factor: config.backoff_factor,
1137 consistency_checks: config.consistency_checks,
1138 check_statement_logging: config.check_statement_logging,
1139 consistency_checks_adhoc_skip: false,
1140 regex: None,
1141 regex_replacement: set::DEFAULT_REGEX_REPLACEMENT.into(),
1142 rewrite_results: config.rewrite_results,
1143 error_line_count: 0,
1144 error_string: "".to_string(),
1145
1146 materialize: materialize_state,
1148
1149 persist_consensus_url: config.persist_consensus_url.clone(),
1151 persist_blob_url: config.persist_blob_url.clone(),
1152 build_info: config.build_info,
1153 persist_clients: PersistClientCache::new(
1154 PersistConfig::new_default_configs(config.build_info, SYSTEM_TIME.clone()),
1155 &MetricsRegistry::new(),
1156 |_, _| PubSubClientConnection::noop(),
1157 ),
1158
1159 schema_registry_url,
1161 ccsr_client,
1162 kafka_addr,
1163 kafka_admin,
1164 kafka_admin_opts,
1165 kafka_config,
1166 kafka_default_partitions: config.kafka_default_partitions,
1167 kafka_producer,
1168 kafka_topics,
1169
1170 aws_account: config.aws_account.clone(),
1172 aws_config: config.aws_config.clone(),
1173
1174 duckdb_clients: BTreeMap::new(),
1176 mysql_clients: BTreeMap::new(),
1177 postgres_clients: BTreeMap::new(),
1178 sql_server_clients: BTreeMap::new(),
1179
1180 fivetran_destination_url: config.fivetran_destination_url.clone(),
1182 fivetran_destination_files_path: config.fivetran_destination_files_path.clone(),
1183
1184 rewrites: Vec::new(),
1185 rewrite_pos_start: 0,
1186 rewrite_pos_end: 0,
1187 };
1188 state.initialize_cmd_vars().await?;
1189 Ok((state, pgconn_task))
1190}
1191
1192async fn create_materialize_state(
1193 config: &&Config,
1194 materialize_catalog_config: Option<CatalogConfig>,
1195 pgclient: tokio_postgres::Client,
1196) -> Result<MaterializeState, anyhow::Error> {
1197 let materialize_url = util::postgres::config_url(&config.materialize_pgconfig)?;
1198 let materialize_internal_url =
1199 util::postgres::config_url(&config.materialize_internal_pgconfig)?;
1200
1201 for (key, value) in &config.materialize_params {
1202 #[allow(clippy::disallowed_methods)]
1204 pgclient
1205 .batch_execute(&format!("SET {key} = {value}"))
1206 .await
1207 .context("setting session parameter")?;
1208 }
1209
1210 let materialize_user = config
1211 .materialize_pgconfig
1212 .get_user()
1213 .expect("testdrive URL must contain user")
1214 .to_string();
1215
1216 let materialize_sql_addr = format!(
1217 "{}:{}",
1218 materialize_url.host_str().unwrap(),
1219 materialize_url.port().unwrap()
1220 );
1221 let materialize_http_addr = format!(
1222 "{}:{}",
1223 materialize_url.host_str().unwrap(),
1224 config.materialize_http_port
1225 );
1226 let materialize_internal_sql_addr = format!(
1227 "{}:{}",
1228 materialize_internal_url.host_str().unwrap(),
1229 materialize_internal_url.port().unwrap()
1230 );
1231 let materialize_password_sql_addr = format!(
1232 "{}:{}",
1233 materialize_url.host_str().unwrap(),
1234 config.materialize_password_sql_port
1235 );
1236 let materialize_sasl_sql_addr = format!(
1237 "{}:{}",
1238 materialize_url.host_str().unwrap(),
1239 config.materialize_sasl_sql_port
1240 );
1241 let materialize_internal_http_addr = format!(
1242 "{}:{}",
1243 materialize_internal_url.host_str().unwrap(),
1244 config.materialize_internal_http_port
1245 );
1246 let environment_id = pg_query_one(&pgclient, pg_sql!("SELECT mz_environment_id()"), &[])
1247 .await?
1248 .get::<_, String>(0)
1249 .parse()
1250 .context("parsing environment ID")?;
1251
1252 let bootstrap_args = BootstrapArgs {
1253 cluster_replica_size_map: config.materialize_cluster_replica_sizes.clone(),
1254 default_cluster_replica_size: "ABC".to_string(),
1255 default_cluster_replication_factor: 1,
1256 bootstrap_role: None,
1257 };
1258
1259 let materialize_state = MaterializeState {
1260 catalog_config: materialize_catalog_config,
1261 sql_addr: materialize_sql_addr,
1262 use_https: config.materialize_use_https,
1263 http_addr: materialize_http_addr,
1264 internal_sql_addr: materialize_internal_sql_addr,
1265 internal_http_addr: materialize_internal_http_addr,
1266 password_sql_addr: materialize_password_sql_addr,
1267 sasl_sql_addr: materialize_sasl_sql_addr,
1268 user: materialize_user,
1269 pgclient,
1270 environment_id,
1271 bootstrap_args,
1272 };
1273
1274 Ok(materialize_state)
1275}