1use std::collections::BTreeMap;
11use std::future::Future;
12
13use std::path::PathBuf;
14use std::sync::LazyLock;
15use std::time::Duration;
16use std::{env, fs};
17
18use anyhow::{Context, anyhow, bail};
19use async_trait::async_trait;
20use aws_credential_types::provider::ProvideCredentials;
21use aws_types::SdkConfig;
22use futures::future::FutureExt;
23use itertools::Itertools;
24use mz_adapter::catalog::{Catalog, ConnCatalog};
25use mz_adapter::session::Session;
26use mz_build_info::BuildInfo;
27use mz_catalog::config::ClusterReplicaSizeMap;
28use mz_catalog::durable::BootstrapArgs;
29use mz_ccsr::SubjectVersion;
30use mz_kafka_util::client::{MzClientContext, create_new_client_config_simple};
31use mz_ore::error::ErrorExt;
32use mz_ore::metrics::MetricsRegistry;
33use mz_ore::now::SYSTEM_TIME;
34use mz_ore::retry::Retry;
35use mz_ore::task;
36use mz_ore::url::SensitiveUrl;
37use mz_persist_client::cache::PersistClientCache;
38use mz_persist_client::cfg::PersistConfig;
39use mz_persist_client::rpc::PubSubClientConnection;
40use mz_persist_client::{PersistClient, PersistLocation};
41use mz_postgres_util::{
42 Sql, batch_execute as pg_batch_execute, query as pg_query, query_one as pg_query_one,
43 sql as pg_sql,
44};
45use mz_sql::catalog::EnvironmentId;
46use mz_tls_util::make_tls;
47use rdkafka::ClientConfig;
48use rdkafka::producer::Producer;
49use regex::{Captures, Regex};
50use semver::Version;
51use tokio_postgres::error::{DbError, SqlState};
52use tracing::info;
53use url::Url;
54
55use crate::error::PosError;
56use crate::parser::{
57 Command, PosCommand, SqlExpectedError, SqlOutput, VersionConstraint, validate_ident,
58};
59use crate::util;
60use crate::util::postgres::postgres_client;
61
62pub mod consistency;
63
64mod duckdb;
65mod file;
66mod fivetran;
67mod http;
68mod kafka;
69mod mysql;
70mod nop;
71mod persist;
72mod postgres;
73mod protobuf;
74mod psql;
75mod s3;
76mod schema_registry;
77mod set;
78mod skip_end;
79mod skip_if;
80mod sleep;
81mod sql;
82mod sql_server;
83mod version_check;
84mod webhook;
85
/// Configuration for a testdrive run, used by [`create_state`] to build the
/// initial [`State`].
#[derive(Debug, Clone)]
pub struct Config {
    /// Script arguments; each key is validated as an identifier and exposed
    /// to scripts as `${arg.<key>}`.
    pub arg_vars: BTreeMap<String, String>,
    /// Seed exposed as `${testdrive.seed}`. When `None`, a random zero-padded
    /// 10-digit number is generated.
    pub seed: Option<String>,
    /// NOTE(review): not read in this section; presumably controls whether
    /// Materialize/Kafka state is reset — confirm with callers.
    pub reset: bool,
    /// Directory exposed as `${testdrive.temp-dir}`. When `Some`, it is
    /// created if missing; when `None`, a fresh temporary directory is created
    /// and owned by the resulting `State`.
    pub temp_dir: Option<String>,
    /// NOTE(review): not read in this section; presumably a description of the
    /// script's origin — confirm with callers.
    pub source: Option<String>,
    /// Upper bound for the initial Materialize connection retries, the
    /// per-attempt connect timeout, the Kafka admin operation timeout, and the
    /// initial value of the state's SQL timeout.
    pub default_timeout: Duration,
    /// Initial value of the state's `max_tries`.
    pub default_max_tries: usize,
    /// Initial backoff copied into the state (presumably for query retries).
    pub initial_backoff: Duration,
    /// Backoff multiplier copied into the state (presumably for query retries).
    pub backoff_factor: f64,
    /// Consistency-check level copied into the state.
    pub consistency_checks: consistency::Level,
    /// Copied into the state's `check_statement_logging` flag.
    pub check_statement_logging: bool,
    /// Copied into the state's `rewrite_results` flag.
    pub rewrite_results: bool,

    /// Connection parameters for the primary Materialize SQL connection; must
    /// include a user.
    pub materialize_pgconfig: tokio_postgres::Config,
    /// Connection parameters whose host and port form the internal SQL
    /// address (`${testdrive.materialize-internal-sql-addr}`).
    pub materialize_internal_pgconfig: tokio_postgres::Config,
    /// Stored as `MaterializeState::use_https`.
    pub materialize_use_https: bool,
    /// HTTP port, combined with the primary SQL host.
    pub materialize_http_port: u16,
    /// Internal HTTP port, combined with the internal SQL host.
    pub materialize_internal_http_port: u16,
    /// Password-authenticated SQL port, combined with the primary SQL host.
    pub materialize_password_sql_port: u16,
    /// SASL-authenticated SQL port, combined with the primary SQL host.
    pub materialize_sasl_sql_port: u16,
    /// Session parameters applied with `SET key = value` on the primary
    /// connection; values are spliced into the statement verbatim.
    pub materialize_params: Vec<(String, String)>,
    /// When set, enables opening a read-only catalog copy from persist.
    pub materialize_catalog_config: Option<CatalogConfig>,
    /// Build information used for persist and catalog access.
    pub build_info: &'static BuildInfo,
    /// Replica sizes passed in the catalog bootstrap arguments.
    pub materialize_cluster_replica_sizes: ClusterReplicaSizeMap,

    /// Persist consensus URL, copied into the state.
    pub persist_consensus_url: Option<SensitiveUrl>,
    /// Persist blob URL, copied into the state.
    pub persist_blob_url: Option<SensitiveUrl>,

    /// Kafka `bootstrap.servers`, also exposed as `${testdrive.kafka-addr}`.
    pub kafka_addr: String,
    /// Default partition count copied into the state.
    pub kafka_default_partitions: usize,
    /// Extra librdkafka settings, applied after (and thus overriding) the
    /// built-in defaults.
    pub kafka_opts: Vec<(String, String)>,
    /// Schema registry URL, also exposed as `${testdrive.schema-registry-url}`.
    pub schema_registry_url: Url,
    /// Path to a PKCS#12 keystore used as the schema registry TLS identity and
    /// as the Kafka SSL keystore (which also switches Kafka to `ssl`).
    pub cert_path: Option<String>,
    /// Password for the keystore at `cert_path` (empty if `None` for CCSR).
    pub cert_password: Option<String>,
    /// Schema registry username; auth is configured only when this is set.
    pub ccsr_username: Option<String>,
    /// Schema registry password, used together with `ccsr_username`.
    pub ccsr_password: Option<String>,

    /// AWS SDK configuration supplying region, endpoint and credentials.
    pub aws_config: SdkConfig,
    /// Exposed as `${testdrive.aws-account}`.
    pub aws_account: String,

    /// Exposed as `${testdrive.fivetran-destination-url}`.
    pub fivetran_destination_url: String,
    /// Exposed as `${testdrive.fivetran-destination-files-path}`.
    pub fivetran_destination_files_path: String,
}
199
/// Connection details and handles for the Materialize instance under test,
/// built by `create_materialize_state`.
pub struct MaterializeState {
    /// Persist location of the durable catalog, if catalog access is enabled.
    catalog_config: Option<CatalogConfig>,

    /// `host:port` of the primary SQL endpoint.
    sql_addr: String,
    /// Copied from `Config::materialize_use_https`.
    use_https: bool,
    /// `host:port` of the HTTP endpoint (primary SQL host).
    http_addr: String,
    /// `host:port` of the internal SQL endpoint.
    internal_sql_addr: String,
    /// `host:port` of the internal HTTP endpoint (internal SQL host).
    internal_http_addr: String,
    /// `host:port` of the password-authenticated SQL endpoint.
    password_sql_addr: String,
    /// `host:port` of the SASL-authenticated SQL endpoint.
    sasl_sql_addr: String,
    /// User taken from the primary pgconfig.
    user: String,
    /// Client for the primary SQL connection (session parameters applied).
    pgclient: tokio_postgres::Client,
    /// Result of `SELECT mz_environment_id()`.
    environment_id: EnvironmentId,
    /// Arguments used when opening a catalog copy.
    bootstrap_args: BootstrapArgs,
}
215
/// Mutable state shared by every command while a testdrive script runs.
pub struct State {
    /// The configuration this state was created from.
    pub config: Config,

    // === Testdrive state. ===
    /// Script arguments; exposed to scripts as `arg.<name>`.
    arg_vars: BTreeMap<String, String>,
    /// Every variable available for `${...}` substitution in commands.
    cmd_vars: BTreeMap<String, String>,
    /// Seed exposed as `testdrive.seed`.
    seed: String,
    /// Directory exposed as `testdrive.temp-dir`.
    temp_path: PathBuf,
    /// Owns the auto-created temporary directory (if any) so it lives as
    /// long as this state.
    _tempfile: Option<tempfile::TempDir>,
    /// Copy of `Config::default_timeout`; also used for reset connections.
    default_timeout: Duration,
    /// Current timeout, initialized from `default_timeout`.
    /// NOTE(review): presumably changed by `set-sql-timeout` — confirm.
    timeout: Duration,
    /// Current retry budget, initialized from `Config::default_max_tries`.
    max_tries: usize,
    /// Copied from `Config::initial_backoff`.
    initial_backoff: Duration,
    /// Copied from `Config::backoff_factor`.
    backoff_factor: f64,
    /// Copied from `Config::consistency_checks`.
    consistency_checks: consistency::Level,
    /// Copied from `Config::check_statement_logging`.
    check_statement_logging: bool,
    /// Ad-hoc skip flag; read and cleared by `clear_skip_consistency_checks`.
    consistency_checks_adhoc_skip: bool,
    /// Active output-rewriting regex, if any (starts as `None`).
    regex: Option<Regex>,
    /// Replacement text for `regex`; starts as `set::DEFAULT_REGEX_REPLACEMENT`.
    regex_replacement: String,
    /// Starts at 0. NOTE(review): written by command implementations outside
    /// this section — confirm semantics there.
    error_line_count: usize,
    /// Starts empty. NOTE(review): written outside this section.
    error_string: String,

    // === Materialize state. ===
    materialize: MaterializeState,

    // === Persist state. ===
    /// Copied from `Config::persist_consensus_url`.
    persist_consensus_url: Option<SensitiveUrl>,
    /// Copied from `Config::persist_blob_url`.
    persist_blob_url: Option<SensitiveUrl>,
    /// Build information used for persist and catalog access.
    build_info: &'static BuildInfo,
    /// Cache of persist clients, constructed with a no-op PubSub connection.
    persist_clients: PersistClientCache,

    // === Confluent state. ===
    /// Schema registry URL, exposed as `testdrive.schema-registry-url`.
    schema_registry_url: Url,
    /// Schema registry client.
    ccsr_client: mz_ccsr::Client,
    /// Kafka bootstrap servers, exposed as `testdrive.kafka-addr`.
    kafka_addr: String,
    /// Kafka admin client.
    kafka_admin: rdkafka::admin::AdminClient<MzClientContext>,
    /// Admin options whose operation timeout is `Config::default_timeout`.
    kafka_admin_opts: rdkafka::admin::AdminOptions,
    /// The client configuration the admin client and producer were built from.
    kafka_config: ClientConfig,
    /// Copied from `Config::kafka_default_partitions`.
    kafka_default_partitions: usize,
    /// Kafka producer.
    kafka_producer: rdkafka::producer::FutureProducer<MzClientContext>,
    /// Starts empty. NOTE(review): presumably topic name -> partition count
    /// for topics created by the script — confirm in kafka.rs.
    kafka_topics: BTreeMap<String, usize>,

    // === AWS state. ===
    /// Exposed as `testdrive.aws-account`.
    aws_account: String,
    /// Source of region, endpoint and credentials.
    aws_config: SdkConfig,

    // === Database driver state. ===
    /// Named DuckDB connections; all driver maps start empty.
    pub duckdb_clients: BTreeMap<String, std::sync::Arc<std::sync::Mutex<::duckdb::Connection>>>,
    /// Named MySQL connections.
    mysql_clients: BTreeMap<String, mysql_async::Conn>,
    /// Named PostgreSQL connections.
    postgres_clients: BTreeMap<String, tokio_postgres::Client>,
    /// Named SQL Server connections.
    sql_server_clients: BTreeMap<String, mz_sql_server_util::Client>,

    // === Fivetran state. ===
    /// Exposed as `testdrive.fivetran-destination-url`.
    fivetran_destination_url: String,
    /// Exposed as `testdrive.fivetran-destination-files-path`.
    fivetran_destination_files_path: String,

    // === Result rewriting. ===
    /// Copied from `Config::rewrite_results`.
    rewrite_results: bool,
    /// Pending rewrites; starts empty.
    pub rewrites: Vec<Rewrite>,
    /// Starts at 0. NOTE(review): presumably the start of the span being
    /// rewritten — confirm with the writer.
    pub rewrite_pos_start: usize,
    /// Starts at 0. NOTE(review): presumably the end of that span.
    pub rewrite_pos_end: usize,
}
282
/// A pending replacement of part of the script with new content.
pub struct Rewrite {
    /// Replacement text.
    pub content: String,
    /// Start of the replaced span. NOTE(review): presumably a byte offset into
    /// the script — confirm with the code that applies rewrites.
    pub start: usize,
    /// End of the replaced span (same unit as `start`).
    pub end: usize,
}
288
289impl State {
290 pub async fn initialize_cmd_vars(&mut self) -> Result<(), anyhow::Error> {
291 self.cmd_vars
292 .insert("testdrive.kafka-addr".into(), self.kafka_addr.clone());
293 self.cmd_vars.insert(
294 "testdrive.schema-registry-url".into(),
295 self.schema_registry_url.to_string(),
296 );
297 self.cmd_vars
298 .insert("testdrive.seed".into(), self.seed.clone());
299 self.cmd_vars.insert(
300 "testdrive.temp-dir".into(),
301 self.temp_path.display().to_string(),
302 );
303 self.cmd_vars
304 .insert("testdrive.aws-region".into(), self.aws_region().into());
305 self.cmd_vars
306 .insert("testdrive.aws-endpoint".into(), self.aws_endpoint().into());
307 self.cmd_vars
308 .insert("testdrive.aws-account".into(), self.aws_account.clone());
309 {
310 let aws_credentials = self
311 .aws_config
312 .credentials_provider()
313 .ok_or_else(|| anyhow!("no AWS credentials provider configured"))?
314 .provide_credentials()
315 .await
316 .context("fetching AWS credentials")?;
317 self.cmd_vars.insert(
318 "testdrive.aws-access-key-id".into(),
319 aws_credentials.access_key_id().to_owned(),
320 );
321 self.cmd_vars.insert(
322 "testdrive.aws-secret-access-key".into(),
323 aws_credentials.secret_access_key().to_owned(),
324 );
325 self.cmd_vars.insert(
326 "testdrive.aws-token".into(),
327 aws_credentials
328 .session_token()
329 .map(|token| token.to_owned())
330 .unwrap_or_else(String::new),
331 );
332 }
333 self.cmd_vars.insert(
334 "testdrive.materialize-environment-id".into(),
335 self.materialize.environment_id.to_string(),
336 );
337 self.cmd_vars.insert(
338 "testdrive.materialize-sql-addr".into(),
339 self.materialize.sql_addr.clone(),
340 );
341 self.cmd_vars.insert(
342 "testdrive.materialize-internal-sql-addr".into(),
343 self.materialize.internal_sql_addr.clone(),
344 );
345 self.cmd_vars.insert(
346 "testdrive.materialize-password-sql-addr".into(),
347 self.materialize.password_sql_addr.clone(),
348 );
349 self.cmd_vars.insert(
350 "testdrive.materialize-sasl-sql-addr".into(),
351 self.materialize.sasl_sql_addr.clone(),
352 );
353 self.cmd_vars.insert(
354 "testdrive.materialize-user".into(),
355 self.materialize.user.clone(),
356 );
357 self.cmd_vars.insert(
358 "testdrive.fivetran-destination-url".into(),
359 self.fivetran_destination_url.clone(),
360 );
361 self.cmd_vars.insert(
362 "testdrive.fivetran-destination-files-path".into(),
363 self.fivetran_destination_files_path.clone(),
364 );
365
366 for (key, value) in env::vars() {
367 self.cmd_vars.insert(format!("env.{}", key), value);
368 }
369
370 for (key, value) in &self.arg_vars {
371 validate_ident(key)?;
372 self.cmd_vars
373 .insert(format!("arg.{}", key), value.to_string());
374 }
375
376 Ok(())
377 }
378 pub async fn with_catalog_copy<F, T>(
381 &self,
382 system_parameter_defaults: BTreeMap<String, String>,
383 build_info: &'static BuildInfo,
384 bootstrap_args: &BootstrapArgs,
385 enable_expression_cache_override: Option<bool>,
386 f: F,
387 ) -> Result<Option<T>, anyhow::Error>
388 where
389 F: FnOnce(ConnCatalog) -> T,
390 {
391 async fn persist_client(
392 persist_consensus_url: SensitiveUrl,
393 persist_blob_url: SensitiveUrl,
394 persist_clients: &PersistClientCache,
395 ) -> Result<PersistClient, anyhow::Error> {
396 let persist_location = PersistLocation {
397 blob_uri: persist_blob_url,
398 consensus_uri: persist_consensus_url,
399 };
400 Ok(persist_clients.open(persist_location).await?)
401 }
402
403 if let Some(CatalogConfig {
404 persist_consensus_url,
405 persist_blob_url,
406 }) = &self.materialize.catalog_config
407 {
408 let persist_client = persist_client(
409 persist_consensus_url.clone(),
410 persist_blob_url.clone(),
411 &self.persist_clients,
412 )
413 .await?;
414 let catalog = Catalog::open_debug_read_only_persist_catalog_config(
415 persist_client,
416 SYSTEM_TIME.clone(),
417 self.materialize.environment_id.clone(),
418 system_parameter_defaults,
419 build_info,
420 bootstrap_args,
421 enable_expression_cache_override,
422 )
423 .await?;
424 let res = f(catalog.for_session(&Session::dummy()));
425 catalog.expire().await;
426 Ok(Some(res))
427 } else {
428 Ok(None)
429 }
430 }
431
432 pub fn aws_endpoint(&self) -> &str {
433 self.aws_config.endpoint_url().unwrap_or("")
434 }
435
436 pub fn aws_region(&self) -> &str {
437 self.aws_config.region().map(|r| r.as_ref()).unwrap_or("")
438 }
439
440 pub fn clear_skip_consistency_checks(&mut self) -> bool {
443 std::mem::replace(&mut self.consistency_checks_adhoc_skip, false)
444 }
445
446 pub async fn reset_materialize(&self) -> Result<(), anyhow::Error> {
447 let (inner_client, _) = postgres_client(
448 &format!(
449 "postgres://mz_system:materialize@{}",
450 self.materialize.internal_sql_addr
451 ),
452 self.default_timeout,
453 )
454 .await?;
455
456 let version = pg_query_one(&inner_client, pg_sql!("SELECT mz_version_num()"), &[])
457 .await
458 .context("getting version of materialize")
459 .map(|row| row.get::<_, i32>(0))?;
460
461 let semver = pg_query_one(
462 &inner_client,
463 pg_sql!("SELECT right(split_part(mz_version(), ' ', 1), -1)"),
464 &[],
465 )
466 .await
467 .context("getting semver of materialize")
468 .map(|row| row.get::<_, String>(0))?
469 .parse::<semver::Version>()
470 .context("parsing semver of materialize")?;
471
472 pg_batch_execute(&inner_client, pg_sql!("ALTER SYSTEM RESET ALL"))
473 .await
474 .context("resetting materialize state: ALTER SYSTEM RESET ALL")?;
475
476 {
478 let rename_version = Version::parse("0.128.0-dev.1").expect("known to be valid");
479 let enable_unsafe_functions = if semver >= rename_version {
480 "unsafe_enable_unsafe_functions"
481 } else {
482 "enable_unsafe_functions"
483 };
484 let res = pg_batch_execute(
485 &inner_client,
486 pg_sql!(
487 "ALTER SYSTEM SET {} = on",
488 Sql::ident(enable_unsafe_functions)
489 ),
490 )
491 .await
492 .context("enabling dangerous functions");
493 if let Err(e) = res {
494 match e.root_cause().downcast_ref::<DbError>() {
495 Some(e) if *e.code() == SqlState::CANT_CHANGE_RUNTIME_PARAM => {
496 info!(
497 "can't enable unsafe functions because the server is safe mode; \
498 testdrive scripts will fail if they use unsafe functions",
499 );
500 }
501 _ => return Err(e),
502 }
503 }
504 }
505
506 for row in pg_query(&inner_client, pg_sql!("SHOW DATABASES"), &[])
507 .await
508 .context("resetting materialize state: SHOW DATABASES")?
509 {
510 let db_name: String = row.get(0);
511 if db_name.starts_with("testdrive_no_reset_") {
512 continue;
513 }
514 let drop_database = pg_sql!("DROP DATABASE {}", Sql::ident(&db_name));
515 sql::print_query(drop_database.as_str(), None);
516 pg_batch_execute(&inner_client, drop_database)
517 .await
518 .context(format!(
519 "resetting materialize state: DROP DATABASE {}",
520 db_name,
521 ))?;
522 }
523
524 let inactive_user_clusters = "
526 WITH
527 active_user_clusters AS
528 (
529 SELECT DISTINCT cluster_id, object_id
530 FROM
531 (
532 SELECT cluster_id, id FROM mz_catalog.mz_sources
533 UNION ALL SELECT cluster_id, id FROM mz_catalog.mz_sinks
534 UNION ALL
535 SELECT cluster_id, id
536 FROM mz_catalog.mz_materialized_views
537 UNION ALL
538 SELECT cluster_id, id FROM mz_catalog.mz_indexes
539 UNION ALL
540 SELECT cluster_id, id
541 FROM mz_internal.mz_subscriptions
542 )
543 AS t (cluster_id, object_id)
544 WHERE cluster_id IS NOT NULL AND object_id LIKE 'u%'
545 )
546 SELECT name
547 FROM mz_catalog.mz_clusters
548 WHERE
549 id NOT IN ( SELECT cluster_id FROM active_user_clusters ) AND id LIKE 'u%'
550 AND
551 owner_id LIKE 'u%';";
552
553 let inactive_clusters = pg_query(&inner_client, Sql::new(inactive_user_clusters), &[])
554 .await
555 .context("resetting materialize state: inactive_user_clusters")?;
556
557 if !inactive_clusters.is_empty() {
558 println!("cleaning up user clusters from previous tests...")
559 }
560
561 for cluster_name in inactive_clusters {
562 let cluster_name: String = cluster_name.get(0);
563 if cluster_name.starts_with("testdrive_no_reset_") {
564 continue;
565 }
566 let drop_cluster = pg_sql!("DROP CLUSTER {}", Sql::ident(&cluster_name));
567 sql::print_query(drop_cluster.as_str(), None);
568 pg_batch_execute(&inner_client, drop_cluster)
569 .await
570 .context(format!(
571 "resetting materialize state: DROP CLUSTER {}",
572 cluster_name,
573 ))?;
574 }
575
576 pg_batch_execute(&inner_client, pg_sql!("CREATE DATABASE materialize"))
577 .await
578 .context("resetting materialize state: CREATE DATABASE materialize")?;
579
580 if let Ok(rows) = pg_query(&inner_client, pg_sql!("SELECT name FROM mz_roles"), &[]).await {
584 for row in rows {
585 let role_name: String = row.get(0);
586 if role_name == self.materialize.user || role_name.starts_with("mz_") {
587 continue;
588 }
589 let drop_role = pg_sql!("DROP ROLE {}", Sql::ident(&role_name));
590 sql::print_query(drop_role.as_str(), None);
591 pg_batch_execute(&inner_client, drop_role)
592 .await
593 .context(format!(
594 "resetting materialize state: DROP ROLE {}",
595 role_name,
596 ))?;
597 }
598 }
599
600 pg_batch_execute(
602 &inner_client,
603 pg_sql!(
604 "GRANT ALL PRIVILEGES ON SYSTEM TO {}",
605 Sql::ident(&self.materialize.user)
606 ),
607 )
608 .await?;
609
610 pg_batch_execute(
612 &inner_client,
613 pg_sql!("GRANT USAGE ON DATABASE materialize TO PUBLIC"),
614 )
615 .await?;
616 pg_batch_execute(
617 &inner_client,
618 pg_sql!(
619 "GRANT ALL PRIVILEGES ON DATABASE materialize TO {}",
620 Sql::ident(&self.materialize.user)
621 ),
622 )
623 .await?;
624 pg_batch_execute(
625 &inner_client,
626 pg_sql!(
627 "GRANT ALL PRIVILEGES ON SCHEMA materialize.public TO {}",
628 Sql::ident(&self.materialize.user)
629 ),
630 )
631 .await?;
632
633 let cluster = match version {
634 ..=8199 => "default",
635 8200.. => "quickstart",
636 };
637 pg_batch_execute(
638 &inner_client,
639 pg_sql!("GRANT USAGE ON CLUSTER {} TO PUBLIC", Sql::ident(cluster)),
640 )
641 .await?;
642 pg_batch_execute(
643 &inner_client,
644 pg_sql!(
645 "GRANT ALL PRIVILEGES ON CLUSTER {} TO {}",
646 Sql::ident(cluster),
647 Sql::ident(&self.materialize.user)
648 ),
649 )
650 .await?;
651
652 Ok(())
653 }
654
655 pub async fn reset_kafka(&self) -> Result<(), anyhow::Error> {
657 use rdkafka::types::RDKafkaErrorCode;
658 let mut errors: Vec<anyhow::Error> = Vec::new();
659
660 let metadata = self.kafka_producer.client().fetch_metadata(
661 None,
662 Some(std::cmp::max(Duration::from_secs(1), self.default_timeout)),
663 )?;
664
665 let testdrive_topics: Vec<_> = metadata
666 .topics()
667 .iter()
668 .filter_map(|t| {
669 if t.name().starts_with("testdrive-") {
670 Some(t.name())
671 } else {
672 None
673 }
674 })
675 .collect();
676
677 if !testdrive_topics.is_empty() {
678 match self
679 .kafka_admin
680 .delete_topics(&testdrive_topics, &self.kafka_admin_opts)
681 .await
682 {
683 Ok(res) => {
684 if res.len() != testdrive_topics.len() {
685 errors.push(anyhow!(
686 "kafka topic deletion returned {} results, but exactly {} expected",
687 res.len(),
688 testdrive_topics.len()
689 ));
690 }
691 for (res, topic) in res.iter().zip_eq(testdrive_topics.iter()) {
692 match res {
693 Ok(_) | Err((_, RDKafkaErrorCode::UnknownTopicOrPartition)) => (),
694 Err((_, err)) => {
695 errors.push(anyhow!("unable to delete {}: {}", topic, err));
696 }
697 }
698 }
699 }
700 Err(e) => {
701 errors.push(e.into());
702 }
703 };
704 }
705
706 let schema_registry_errors = self.reset_schema_registry().await;
707
708 errors.extend(schema_registry_errors);
709 if errors.is_empty() {
710 Ok(())
711 } else {
712 bail!(
713 "deleting Kafka topics: {} errors: {}",
714 errors.len(),
715 errors
716 .into_iter()
717 .map(|e| e.to_string_with_causes())
718 .join("\n")
719 );
720 }
721 }
722
723 #[allow(clippy::disallowed_types)]
724 async fn reset_schema_registry(&self) -> Vec<anyhow::Error> {
725 use std::collections::HashMap;
726
727 let mut errors = Vec::new();
728 match self
729 .ccsr_client
730 .list_subjects()
731 .await
732 .context("listing schema registry subjects")
733 {
734 Ok(subjects) => {
735 let testdrive_subjects: Vec<_> = subjects
736 .into_iter()
737 .filter(|s| s.starts_with("testdrive-"))
738 .collect();
739
740 let mut graphs: HashMap<SubjectVersion, Vec<SubjectVersion>> = HashMap::new();
742
743 for subject in &testdrive_subjects {
744 match self.ccsr_client.get_subject_with_references(subject).await {
745 Ok((subj, refs)) => {
746 let refs: Vec<_> = refs
748 .into_iter()
749 .filter(|r| r.subject.starts_with("testdrive-"))
750 .collect();
751 graphs.insert(
752 SubjectVersion {
753 subject: subj.name,
754 version: subj.version,
755 },
756 refs,
757 );
758 }
759 Err(mz_ccsr::GetBySubjectError::SubjectNotFound) => {
760 }
762 Err(e) => {
763 errors.push(anyhow::anyhow!(
764 "failed to get references for subject {}: {}",
765 subject,
766 e
767 ));
768 }
769 }
770 }
771
772 let subjects_to_delete: Vec<_> = match mz_ccsr::topological_sort(&graphs) {
775 Ok(ordered) => {
776 let mut subjects: Vec<_> = ordered.into_iter().collect();
777 subjects.sort_by(|a, b| a.1.cmp(&b.1));
778 subjects.into_iter().map(|(s, _)| s.clone()).collect()
779 }
780 Err(_) => {
781 tracing::info!("Cycle detected, attempting to delete anyway");
782 graphs.into_keys().collect()
784 }
785 };
786
787 for subject in subjects_to_delete {
788 match self.ccsr_client.delete_subject(&subject.subject).await {
789 Ok(()) | Err(mz_ccsr::DeleteError::SubjectNotFound) => (),
790 Err(e) => errors.push(e.into()),
791 }
792 }
793 }
794 Err(e) => {
795 errors.push(e);
796 }
797 }
798 errors
799 }
800}
801
/// Persist location of Materialize's durable catalog. When configured, it is
/// used by `State::with_catalog_copy` to open a read-only catalog copy.
#[derive(Debug, Clone)]
pub struct CatalogConfig {
    /// Persist consensus URL (becomes `PersistLocation::consensus_uri`).
    pub persist_consensus_url: SensitiveUrl,
    /// Persist blob URL (becomes `PersistLocation::blob_uri`).
    pub persist_blob_url: SensitiveUrl,
}
810
/// What the script runner should do after a command finishes.
pub enum ControlFlow {
    /// Proceed with the next command. Also returned when a command is skipped
    /// by its version constraint.
    Continue,
    /// NOTE(review): presumably returned by `skip-if` to start skipping
    /// commands — confirm in skip_if.rs.
    SkipBegin,
    /// NOTE(review): presumably returned by `skip-end` to stop skipping —
    /// confirm in skip_end.rs.
    SkipEnd,
}
816
/// A command that can be executed against the testdrive [`State`].
#[async_trait]
pub(crate) trait Run {
    /// Executes the command, returning how the runner should proceed, or an
    /// error annotated with the command's position.
    async fn run(self, state: &mut State) -> Result<ControlFlow, PosError>;
}
821
// Running a positioned command: apply its version constraint, substitute
// `${...}` variables from `state.cmd_vars` into its text, then dispatch to the
// implementation. Every error is wrapped in a `PosError` carrying `self.pos`.
#[async_trait]
impl Run for PosCommand {
    async fn run(self, state: &mut State) -> Result<ControlFlow, PosError> {
        // Evaluates an optional version constraint. If
        // `version_check::run_version_check` returns `Ok(true)`, the command is
        // not executed and `run` returns `ControlFlow::Continue`; a failing
        // check is returned as an error at this command's position.
        // NOTE(review): presumably `true` means the server version falls
        // outside `min..max` — confirm in version_check.rs.
        macro_rules! handle_version {
            ($version_constraint:expr) => {
                match $version_constraint {
                    Some(VersionConstraint { min, max }) => {
                        match version_check::run_version_check(min, max, state).await {
                            Ok(true) => return Ok(ControlFlow::Continue),
                            Ok(false) => {}
                            Err(err) => return Err(PosError::new(err, self.pos)),
                        }
                    }
                    None => {}
                }
            };
        }

        let wrap_err = |e| PosError::new(e, self.pos);
        // For builtins, variables named `${<builtin-name>.…}` are left
        // unsubstituted here (see `substitute_vars`).
        let ignore_prefix = match &self.command {
            Command::Builtin(builtin, _) => Some(builtin.name.clone()),
            _ => None,
        };
        // Plain substitution.
        let subst = |msg: &str, vars: &BTreeMap<String, String>| {
            substitute_vars(msg, vars, &ignore_prefix, false).map_err(wrap_err)
        };
        // Substitution into a regex: substituted values are regex-escaped.
        let subst_re = |msg: &str, vars: &BTreeMap<String, String>| {
            substitute_vars(msg, vars, &ignore_prefix, true).map_err(wrap_err)
        };

        let r = match self.command {
            Command::Builtin(mut builtin, version_constraint) => {
                handle_version!(version_constraint);
                // Substitute into both the arguments and the input lines.
                for val in builtin.args.values_mut() {
                    *val = subst(val, &state.cmd_vars)?;
                }
                for line in &mut builtin.input {
                    *line = subst(line, &state.cmd_vars)?;
                }
                match builtin.name.as_ref() {
                    "check-consistency" => consistency::run_consistency_checks(state).await,
                    "skip-consistency-checks" => {
                        consistency::skip_consistency_checks(builtin, state)
                    }
                    "check-shard-tombstone" => {
                        consistency::run_check_shard_tombstone(builtin, state).await
                    }
                    "duckdb-execute" => duckdb::run_execute(builtin, state).await,
                    "duckdb-query" => duckdb::run_query(builtin, state).await,
                    "fivetran-destination" => {
                        fivetran::run_destination_command(builtin, state).await
                    }
                    "file-append" => file::run_append(builtin, state).await,
                    "file-delete" => file::run_delete(builtin, state).await,
                    "http-request" => http::run_request(builtin, state).await,
                    "kafka-add-partitions" => kafka::run_add_partitions(builtin, state).await,
                    "kafka-create-topic" => kafka::run_create_topic(builtin, state).await,
                    "kafka-wait-topic" => kafka::run_wait_topic(builtin, state).await,
                    "kafka-delete-records" => kafka::run_delete_records(builtin, state).await,
                    "kafka-delete-topic-flaky" => kafka::run_delete_topic(builtin, state).await,
                    "kafka-ingest" => kafka::run_ingest(builtin, state).await,
                    "kafka-verify-data" => kafka::run_verify_data(builtin, state).await,
                    "kafka-verify-commit" => kafka::run_verify_commit(builtin, state).await,
                    "kafka-verify-topic" => kafka::run_verify_topic(builtin, state).await,
                    "mysql-connect" => mysql::run_connect(builtin, state).await,
                    "mysql-execute" => mysql::run_execute(builtin, state).await,
                    "nop" => nop::run_nop(),
                    "postgres-connect" => postgres::run_connect(builtin, state).await,
                    "postgres-execute" => postgres::run_execute(builtin, state).await,
                    "postgres-verify-slot" => postgres::run_verify_slot(builtin, state).await,
                    "protobuf-compile-descriptors" => {
                        protobuf::run_compile_descriptors(builtin, state).await
                    }
                    "psql-execute" => psql::run_execute(builtin, state).await,
                    "s3-verify-data" => s3::run_verify_data(builtin, state).await,
                    "s3-verify-keys" => s3::run_verify_keys(builtin, state).await,
                    "s3-file-upload" => s3::run_upload(builtin, state).await,
                    "s3-set-presigned-url" => s3::run_set_presigned_url(builtin, state).await,
                    "s3-upload-parquet-types" => s3::run_upload_parquet_types(builtin, state).await,
                    "s3-upload-parquet-unsorted-map" => {
                        s3::run_upload_parquet_unsorted_map(builtin, state).await
                    }
                    "schema-registry-publish" => schema_registry::run_publish(builtin, state).await,
                    "schema-registry-verify" => schema_registry::run_verify(builtin, state).await,
                    "schema-registry-wait" => schema_registry::run_wait(builtin, state).await,
                    "skip-if" => skip_if::run_skip_if(builtin, state).await,
                    "skip-end" => skip_end::run_skip_end(),
                    "sql-server-connect" => sql_server::run_connect(builtin, state).await,
                    "sql-server-execute" => sql_server::run_execute(builtin, state).await,
                    "sql-server-set-from-sql" => sql_server::run_set_from_sql(builtin, state).await,
                    "persist-force-compaction" => {
                        persist::run_force_compaction(builtin, state).await
                    }
                    "random-sleep" => sleep::run_random_sleep(builtin),
                    "set-regex" => set::run_regex_set(builtin, state),
                    "unset-regex" => set::run_regex_unset(builtin, state),
                    "set-sql-timeout" => set::run_sql_timeout(builtin, state),
                    "set-max-tries" => set::run_max_tries(builtin, state),
                    "sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment" => {
                        sleep::run_sleep(builtin)
                    }
                    "set" => set::set_vars(builtin, state),
                    "set-arg-default" => set::run_set_arg_default(builtin, state),
                    "set-from-sql" => set::run_set_from_sql(builtin, state).await,
                    "set-from-file" => set::run_set_from_file(builtin, state).await,
                    "webhook-append" => webhook::run_append(builtin, state).await,
                    _ => {
                        return Err(PosError::new(
                            anyhow!("unknown built-in command {}", builtin.name),
                            self.pos,
                        ));
                    }
                }
            }
            Command::Sql(mut sql, version_constraint) => {
                handle_version!(version_constraint);
                // Substitute into the query and, for full output, every
                // expected cell.
                sql.query = subst(&sql.query, &state.cmd_vars)?;
                if let SqlOutput::Full { expected_rows, .. } = &mut sql.expected_output {
                    for row in expected_rows {
                        for col in row {
                            *col = subst(col, &state.cmd_vars)?;
                        }
                    }
                }
                sql::run_sql(sql, state).await
            }
            Command::FailSql(mut sql, version_constraint) => {
                handle_version!(version_constraint);
                sql.query = subst(&sql.query, &state.cmd_vars)?;
                // Regex expectations use regex-escaped substitution so variable
                // values match literally.
                sql.expected_error = match &sql.expected_error {
                    SqlExpectedError::Contains(s) => {
                        SqlExpectedError::Contains(subst(s, &state.cmd_vars)?)
                    }
                    SqlExpectedError::Exact(s) => {
                        SqlExpectedError::Exact(subst(s, &state.cmd_vars)?)
                    }
                    SqlExpectedError::Regex(s) => {
                        SqlExpectedError::Regex(subst_re(s, &state.cmd_vars)?)
                    }
                    SqlExpectedError::Timeout => SqlExpectedError::Timeout,
                };
                sql::run_fail_sql(sql, state).await
            }
        };

        r.map_err(wrap_err)
    }
}
972
973fn substitute_vars(
975 msg: &str,
976 vars: &BTreeMap<String, String>,
977 ignore_prefix: &Option<String>,
978 regex_escape: bool,
979) -> Result<String, anyhow::Error> {
980 static RE: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"\$\{([^}]+)\}").unwrap());
981 let mut err = None;
982 let out = RE.replace_all(msg, |caps: &Captures| {
983 let name = &caps[1];
984 if let Some(ignore_prefix) = &ignore_prefix {
985 if name.starts_with(format!("{}.", ignore_prefix).as_str()) {
986 return caps.get(0).unwrap().as_str().to_string();
988 }
989 }
990
991 if let Some(val) = vars.get(name) {
992 if regex_escape {
993 regex::escape(val)
994 } else {
995 val.to_string()
996 }
997 } else {
998 err = Some(anyhow!("unknown variable: {}", name));
999 "#VAR-MISSING#".to_string()
1000 }
1001 });
1002 match err {
1003 Some(err) => Err(err),
1004 None => Ok(out.into_owned()),
1005 }
1006}
1007
1008pub async fn create_state(
1016 config: &Config,
1017) -> Result<(State, impl Future<Output = Result<(), anyhow::Error>>), anyhow::Error> {
1018 let seed = config
1019 .seed
1020 .clone()
1021 .unwrap_or_else(|| format!("{:010}", rand::random::<u32>()));
1022
1023 let (_tempfile, temp_path) = match &config.temp_dir {
1024 Some(temp_dir) => {
1025 fs::create_dir_all(temp_dir).context("creating temporary directory")?;
1026 (None, PathBuf::from(&temp_dir))
1027 }
1028 _ => {
1029 let tempfile_handle = tempfile::tempdir().context("creating temporary directory")?;
1032 let temp_path = tempfile_handle.path().to_path_buf();
1033 (Some(tempfile_handle), temp_path)
1034 }
1035 };
1036
1037 let materialize_catalog_config = config.materialize_catalog_config.clone();
1038
1039 let materialize_url = util::postgres::config_url(&config.materialize_pgconfig)?;
1040 info!("Connecting to {}", materialize_url.as_str());
1041 let (pgclient, pgconn) = Retry::default()
1042 .max_duration(config.default_timeout)
1043 .retry_async_canceling(|_| async move {
1044 let mut pgconfig = config.materialize_pgconfig.clone();
1045 pgconfig.connect_timeout(config.default_timeout);
1046 let tls = make_tls(&pgconfig)?;
1047 pgconfig.connect(tls).await.map_err(|e| anyhow!(e))
1048 })
1049 .await?;
1050
1051 let pgconn_task =
1052 task::spawn(|| "pgconn_task", pgconn).map(|join| join.context("running SQL connection"));
1053
1054 let materialize_state =
1055 create_materialize_state(&config, materialize_catalog_config, pgclient).await?;
1056
1057 let schema_registry_url = config.schema_registry_url.to_owned();
1058
1059 let ccsr_client = {
1060 let mut ccsr_config = mz_ccsr::ClientConfig::new(schema_registry_url.clone());
1061
1062 if let Some(cert_path) = &config.cert_path {
1063 let cert = fs::read(cert_path).context("reading cert")?;
1064 let pass = config.cert_password.as_deref().unwrap_or("").to_owned();
1065 let ident = mz_ccsr::tls::Identity::from_pkcs12_der(cert, pass)
1066 .context("reading keystore file as pkcs12")?;
1067 ccsr_config = ccsr_config.identity(ident);
1068 }
1069
1070 if let Some(ccsr_username) = &config.ccsr_username {
1071 ccsr_config = ccsr_config.auth(ccsr_username.clone(), config.ccsr_password.clone());
1072 }
1073
1074 ccsr_config.build().context("Creating CCSR client")?
1075 };
1076
1077 let (kafka_addr, kafka_admin, kafka_admin_opts, kafka_producer, kafka_topics, kafka_config) = {
1078 use rdkafka::admin::{AdminClient, AdminOptions};
1079 use rdkafka::producer::FutureProducer;
1080
1081 let mut kafka_config = create_new_client_config_simple();
1082 kafka_config.set("bootstrap.servers", &config.kafka_addr);
1083 kafka_config.set("group.id", "materialize-testdrive");
1084 kafka_config.set("auto.offset.reset", "earliest");
1085 kafka_config.set("isolation.level", "read_committed");
1086 if let Some(cert_path) = &config.cert_path {
1087 kafka_config.set("security.protocol", "ssl");
1088 kafka_config.set("ssl.keystore.location", cert_path);
1089 if let Some(cert_password) = &config.cert_password {
1090 kafka_config.set("ssl.keystore.password", cert_password);
1091 }
1092 }
1093 kafka_config.set("message.max.bytes", "15728640");
1094
1095 for (key, value) in &config.kafka_opts {
1096 kafka_config.set(key, value);
1097 }
1098
1099 let admin: AdminClient<_> = kafka_config
1100 .create_with_context(MzClientContext::default())
1101 .with_context(|| format!("opening Kafka connection: {}", config.kafka_addr))?;
1102
1103 let admin_opts = AdminOptions::new().operation_timeout(Some(config.default_timeout));
1104
1105 let producer: FutureProducer<_> = kafka_config
1106 .create_with_context(MzClientContext::default())
1107 .with_context(|| format!("opening Kafka producer connection: {}", config.kafka_addr))?;
1108
1109 let topics = BTreeMap::new();
1110
1111 (
1112 config.kafka_addr.to_owned(),
1113 admin,
1114 admin_opts,
1115 producer,
1116 topics,
1117 kafka_config,
1118 )
1119 };
1120
1121 let mut state = State {
1122 config: config.clone(),
1123
1124 arg_vars: config.arg_vars.clone(),
1126 cmd_vars: BTreeMap::new(),
1127 seed,
1128 temp_path,
1129 _tempfile,
1130 default_timeout: config.default_timeout,
1131 timeout: config.default_timeout,
1132 max_tries: config.default_max_tries,
1133 initial_backoff: config.initial_backoff,
1134 backoff_factor: config.backoff_factor,
1135 consistency_checks: config.consistency_checks,
1136 check_statement_logging: config.check_statement_logging,
1137 consistency_checks_adhoc_skip: false,
1138 regex: None,
1139 regex_replacement: set::DEFAULT_REGEX_REPLACEMENT.into(),
1140 rewrite_results: config.rewrite_results,
1141 error_line_count: 0,
1142 error_string: "".to_string(),
1143
1144 materialize: materialize_state,
1146
1147 persist_consensus_url: config.persist_consensus_url.clone(),
1149 persist_blob_url: config.persist_blob_url.clone(),
1150 build_info: config.build_info,
1151 persist_clients: PersistClientCache::new(
1152 PersistConfig::new_default_configs(config.build_info, SYSTEM_TIME.clone()),
1153 &MetricsRegistry::new(),
1154 |_, _| PubSubClientConnection::noop(),
1155 ),
1156
1157 schema_registry_url,
1159 ccsr_client,
1160 kafka_addr,
1161 kafka_admin,
1162 kafka_admin_opts,
1163 kafka_config,
1164 kafka_default_partitions: config.kafka_default_partitions,
1165 kafka_producer,
1166 kafka_topics,
1167
1168 aws_account: config.aws_account.clone(),
1170 aws_config: config.aws_config.clone(),
1171
1172 duckdb_clients: BTreeMap::new(),
1174 mysql_clients: BTreeMap::new(),
1175 postgres_clients: BTreeMap::new(),
1176 sql_server_clients: BTreeMap::new(),
1177
1178 fivetran_destination_url: config.fivetran_destination_url.clone(),
1180 fivetran_destination_files_path: config.fivetran_destination_files_path.clone(),
1181
1182 rewrites: Vec::new(),
1183 rewrite_pos_start: 0,
1184 rewrite_pos_end: 0,
1185 };
1186 state.initialize_cmd_vars().await?;
1187 Ok((state, pgconn_task))
1188}
1189
1190async fn create_materialize_state(
1191 config: &&Config,
1192 materialize_catalog_config: Option<CatalogConfig>,
1193 pgclient: tokio_postgres::Client,
1194) -> Result<MaterializeState, anyhow::Error> {
1195 let materialize_url = util::postgres::config_url(&config.materialize_pgconfig)?;
1196 let materialize_internal_url =
1197 util::postgres::config_url(&config.materialize_internal_pgconfig)?;
1198
1199 for (key, value) in &config.materialize_params {
1200 #[allow(clippy::disallowed_methods)]
1202 pgclient
1203 .batch_execute(&format!("SET {key} = {value}"))
1204 .await
1205 .context("setting session parameter")?;
1206 }
1207
1208 let materialize_user = config
1209 .materialize_pgconfig
1210 .get_user()
1211 .expect("testdrive URL must contain user")
1212 .to_string();
1213
1214 let materialize_sql_addr = format!(
1215 "{}:{}",
1216 materialize_url.host_str().unwrap(),
1217 materialize_url.port().unwrap()
1218 );
1219 let materialize_http_addr = format!(
1220 "{}:{}",
1221 materialize_url.host_str().unwrap(),
1222 config.materialize_http_port
1223 );
1224 let materialize_internal_sql_addr = format!(
1225 "{}:{}",
1226 materialize_internal_url.host_str().unwrap(),
1227 materialize_internal_url.port().unwrap()
1228 );
1229 let materialize_password_sql_addr = format!(
1230 "{}:{}",
1231 materialize_url.host_str().unwrap(),
1232 config.materialize_password_sql_port
1233 );
1234 let materialize_sasl_sql_addr = format!(
1235 "{}:{}",
1236 materialize_url.host_str().unwrap(),
1237 config.materialize_sasl_sql_port
1238 );
1239 let materialize_internal_http_addr = format!(
1240 "{}:{}",
1241 materialize_internal_url.host_str().unwrap(),
1242 config.materialize_internal_http_port
1243 );
1244 let environment_id = pg_query_one(&pgclient, pg_sql!("SELECT mz_environment_id()"), &[])
1245 .await?
1246 .get::<_, String>(0)
1247 .parse()
1248 .context("parsing environment ID")?;
1249
1250 let bootstrap_args = BootstrapArgs {
1251 cluster_replica_size_map: config.materialize_cluster_replica_sizes.clone(),
1252 default_cluster_replica_size: "ABC".to_string(),
1253 default_cluster_replication_factor: 1,
1254 bootstrap_role: None,
1255 };
1256
1257 let materialize_state = MaterializeState {
1258 catalog_config: materialize_catalog_config,
1259 sql_addr: materialize_sql_addr,
1260 use_https: config.materialize_use_https,
1261 http_addr: materialize_http_addr,
1262 internal_sql_addr: materialize_internal_sql_addr,
1263 internal_http_addr: materialize_internal_http_addr,
1264 password_sql_addr: materialize_password_sql_addr,
1265 sasl_sql_addr: materialize_sasl_sql_addr,
1266 user: materialize_user,
1267 pgclient,
1268 environment_id,
1269 bootstrap_args,
1270 };
1271
1272 Ok(materialize_state)
1273}