1use std::collections::BTreeMap;
11use std::future::Future;
12
13use std::path::PathBuf;
14use std::sync::LazyLock;
15use std::time::Duration;
16use std::{env, fs};
17
18use anyhow::{Context, anyhow, bail};
19use async_trait::async_trait;
20use aws_credential_types::provider::ProvideCredentials;
21use aws_types::SdkConfig;
22use futures::future::FutureExt;
23use itertools::Itertools;
24use mz_adapter::catalog::{Catalog, ConnCatalog};
25use mz_adapter::session::Session;
26use mz_build_info::BuildInfo;
27use mz_catalog::config::ClusterReplicaSizeMap;
28use mz_catalog::durable::BootstrapArgs;
29use mz_ccsr::SubjectVersion;
30use mz_kafka_util::client::{MzClientContext, create_new_client_config_simple};
31use mz_ore::error::ErrorExt;
32use mz_ore::metrics::MetricsRegistry;
33use mz_ore::now::SYSTEM_TIME;
34use mz_ore::retry::Retry;
35use mz_ore::task;
36use mz_ore::url::SensitiveUrl;
37use mz_persist_client::cache::PersistClientCache;
38use mz_persist_client::cfg::PersistConfig;
39use mz_persist_client::rpc::PubSubClientConnection;
40use mz_persist_client::{PersistClient, PersistLocation};
41use mz_sql::catalog::EnvironmentId;
42use mz_tls_util::make_tls;
43use rdkafka::ClientConfig;
44use rdkafka::producer::Producer;
45use regex::{Captures, Regex};
46use semver::Version;
47use tokio_postgres::error::{DbError, SqlState};
48use tracing::info;
49use url::Url;
50
51use crate::error::PosError;
52use crate::parser::{
53 Command, PosCommand, SqlExpectedError, SqlOutput, VersionConstraint, validate_ident,
54};
55use crate::util;
56use crate::util::postgres::postgres_client;
57
/// Consistency-check commands and configuration (public so callers can set
/// the check level).
pub mod consistency;

// One private submodule per family of testdrive actions; each is dispatched
// by name in `Run::run` below.
mod duckdb;
mod file;
mod fivetran;
mod http;
mod kafka;
mod mysql;
mod nop;
mod persist;
mod postgres;
mod protobuf;
mod psql;
mod s3;
mod schema_registry;
mod set;
mod skip_end;
mod skip_if;
mod sleep;
mod sql;
mod sql_server;
mod version_check;
mod webhook;
81
/// Static configuration for a testdrive run.
///
/// NOTE(review): original doc comments appear to have been stripped from this
/// struct; the descriptions below are reconstructed from usage in this file —
/// confirm against upstream where hedged.
#[derive(Debug, Clone)]
pub struct Config {
    // === Testdrive options. ===
    /// Variables supplied on the command line; exposed to scripts as `arg.*`.
    pub arg_vars: BTreeMap<String, String>,
    /// Seed for this run; a random zero-padded 10-digit seed is generated
    /// when unset (see `create_state`).
    pub seed: Option<String>,
    /// NOTE(review): not read within this file — presumably whether to reset
    /// state around script runs; confirm at the caller.
    pub reset: bool,
    /// Directory for temporary files; when unset an auto-removed tempdir is
    /// created instead.
    pub temp_dir: Option<String>,
    /// NOTE(review): not read within this file — presumably the script source
    /// location; confirm at the caller.
    pub source: Option<String>,
    /// Default timeout for retriable operations.
    pub default_timeout: Duration,
    /// Default maximum number of tries for retriable operations.
    pub default_max_tries: usize,
    /// Initial backoff between retries.
    pub initial_backoff: Duration,
    /// Multiplier applied to the backoff after each retry.
    pub backoff_factor: f64,
    /// How thorough the consistency checks should be.
    pub consistency_checks: consistency::Level,
    /// Whether to verify statement logging.
    pub check_statement_logging: bool,
    /// Whether to rewrite expected results in place rather than fail on
    /// mismatches.
    pub rewrite_results: bool,

    // === Materialize options. ===
    /// Connection config for the external SQL port.
    pub materialize_pgconfig: tokio_postgres::Config,
    /// Connection config for the internal (`mz_system`) SQL port.
    pub materialize_internal_pgconfig: tokio_postgres::Config,
    /// Whether HTTP endpoints are reached over HTTPS.
    pub materialize_use_https: bool,
    /// External HTTP port.
    pub materialize_http_port: u16,
    /// Internal HTTP port.
    pub materialize_internal_http_port: u16,
    /// SQL port requiring password authentication.
    pub materialize_password_sql_port: u16,
    /// SQL port using SASL authentication.
    pub materialize_sasl_sql_port: u16,
    /// Session parameters to `SET` after connecting.
    pub materialize_params: Vec<(String, String)>,
    /// Location of the durable catalog, when available for inspection.
    pub materialize_catalog_config: Option<CatalogConfig>,
    /// Build information of the binary under test.
    pub build_info: &'static BuildInfo,
    /// Known cluster replica sizes.
    pub materialize_cluster_replica_sizes: ClusterReplicaSizeMap,

    // === Persist options. ===
    /// URL of the persist consensus store, if any.
    pub persist_consensus_url: Option<SensitiveUrl>,
    /// URL of the persist blob store, if any.
    pub persist_blob_url: Option<SensitiveUrl>,

    // === Confluent options. ===
    /// Kafka bootstrap address.
    pub kafka_addr: String,
    /// Default partition count for topics created by scripts.
    pub kafka_default_partitions: usize,
    /// Extra librdkafka options; applied last, so they may override defaults.
    pub kafka_opts: Vec<(String, String)>,
    /// Schema registry URL.
    pub schema_registry_url: Url,
    /// Optional PKCS#12 keystore path for TLS client authentication (shared
    /// by the Kafka and schema registry clients).
    pub cert_path: Option<String>,
    /// Password for the keystore.
    pub cert_password: Option<String>,
    /// Optional basic-auth username for the schema registry.
    pub ccsr_username: Option<String>,
    /// Optional basic-auth password for the schema registry.
    pub ccsr_password: Option<String>,

    // === AWS options. ===
    /// AWS SDK configuration (region, endpoint, credentials provider).
    pub aws_config: SdkConfig,
    /// AWS account ID; exposed as `${testdrive.aws-account}`.
    pub aws_account: String,

    // === Fivetran options. ===
    /// Address of the Fivetran destination under test.
    pub fivetran_destination_url: String,
    /// Files path for the Fivetran destination.
    /// NOTE(review): semantics inferred from the name — confirm.
    pub fivetran_destination_files_path: String,
}
195
/// Runtime connection state for the Materialize instance under test.
pub struct MaterializeState {
    /// Location of the durable catalog, when available.
    catalog_config: Option<CatalogConfig>,

    /// External SQL address (`host:port`).
    sql_addr: String,
    /// Whether HTTP endpoints use HTTPS.
    use_https: bool,
    /// External HTTP address (`host:port`).
    http_addr: String,
    /// Internal SQL address; used e.g. to connect as `mz_system` on reset.
    internal_sql_addr: String,
    /// Internal HTTP address.
    internal_http_addr: String,
    /// SQL address requiring password authentication.
    password_sql_addr: String,
    /// SQL address using SASL authentication.
    sasl_sql_addr: String,
    /// User taken from the external SQL connection config.
    user: String,
    /// Live SQL client on the external port.
    pgclient: tokio_postgres::Client,
    /// The server's environment ID, from `mz_environment_id()`.
    environment_id: EnvironmentId,
    /// Arguments for opening a debug read-only catalog copy.
    bootstrap_args: BootstrapArgs,
}
211
/// All mutable state accumulated while executing a testdrive script.
pub struct State {
    /// The configuration this state was created from.
    pub config: Config,

    // === Testdrive state. ===
    /// Command-line variables, exposed to scripts as `arg.*`.
    arg_vars: BTreeMap<String, String>,
    /// Variables available for `${...}` substitution.
    cmd_vars: BTreeMap<String, String>,
    /// Seed for this run, exposed as `${testdrive.seed}`.
    seed: String,
    /// Temporary directory, exposed as `${testdrive.temp-dir}`.
    temp_path: PathBuf,
    /// Keeps the auto-created tempdir alive (and thus un-deleted) for the
    /// lifetime of this state.
    _tempfile: Option<tempfile::TempDir>,
    /// Baseline timeout (also used for reset connections).
    default_timeout: Duration,
    /// Current timeout, adjustable via the `set-sql-timeout` builtin.
    timeout: Duration,
    /// Current max tries, adjustable via the `set-max-tries` builtin.
    max_tries: usize,
    initial_backoff: Duration,
    backoff_factor: f64,
    /// Configured consistency-check level.
    consistency_checks: consistency::Level,
    check_statement_logging: bool,
    /// One-shot flag set by `skip-consistency-checks`; cleared when read via
    /// `clear_skip_consistency_checks`.
    consistency_checks_adhoc_skip: bool,
    /// Output-scrubbing regex installed via `set-regex`.
    regex: Option<Regex>,
    /// Replacement text for `regex` matches.
    regex_replacement: String,
    error_line_count: usize,
    error_string: String,

    // === Materialize state. ===
    materialize: MaterializeState,

    // === Persist state. ===
    persist_consensus_url: Option<SensitiveUrl>,
    persist_blob_url: Option<SensitiveUrl>,
    build_info: &'static BuildInfo,
    /// Cache of persist clients used when opening catalog copies.
    persist_clients: PersistClientCache,

    // === Confluent state. ===
    schema_registry_url: Url,
    ccsr_client: mz_ccsr::Client,
    kafka_addr: String,
    kafka_admin: rdkafka::admin::AdminClient<MzClientContext>,
    kafka_admin_opts: rdkafka::admin::AdminOptions,
    kafka_config: ClientConfig,
    kafka_default_partitions: usize,
    kafka_producer: rdkafka::producer::FutureProducer<MzClientContext>,
    /// Topic name → partition count.
    /// NOTE(review): inferred from the type; populated by the kafka actions —
    /// confirm there.
    kafka_topics: BTreeMap<String, usize>,

    // === AWS state. ===
    aws_account: String,
    aws_config: SdkConfig,

    // === External database clients, keyed by connection name. ===
    pub duckdb_clients: BTreeMap<String, std::sync::Arc<std::sync::Mutex<::duckdb::Connection>>>,
    mysql_clients: BTreeMap<String, mysql_async::Conn>,
    postgres_clients: BTreeMap<String, tokio_postgres::Client>,
    sql_server_clients: BTreeMap<String, mz_sql_server_util::Client>,

    // === Fivetran state. ===
    fivetran_destination_url: String,
    fivetran_destination_files_path: String,

    // === Result-rewriting state (`rewrite_results`). ===
    rewrite_results: bool,
    /// Pending source rewrites to apply at the end of the run.
    pub rewrites: Vec<Rewrite>,
    /// Start offset of the span currently being rewritten.
    pub rewrite_pos_start: usize,
    /// End offset of the span currently being rewritten.
    pub rewrite_pos_end: usize,
}
278
/// A pending replacement of a span of the testdrive script source, produced
/// when result rewriting is enabled.
pub struct Rewrite {
    /// The replacement text.
    pub content: String,
    /// Start offset of the replaced span.
    /// NOTE(review): units (byte vs. char offset) not visible here — confirm
    /// against where rewrites are applied.
    pub start: usize,
    /// End offset of the replaced span.
    pub end: usize,
}
284
285impl State {
286 pub async fn initialize_cmd_vars(&mut self) -> Result<(), anyhow::Error> {
287 self.cmd_vars
288 .insert("testdrive.kafka-addr".into(), self.kafka_addr.clone());
289 self.cmd_vars.insert(
290 "testdrive.schema-registry-url".into(),
291 self.schema_registry_url.to_string(),
292 );
293 self.cmd_vars
294 .insert("testdrive.seed".into(), self.seed.clone());
295 self.cmd_vars.insert(
296 "testdrive.temp-dir".into(),
297 self.temp_path.display().to_string(),
298 );
299 self.cmd_vars
300 .insert("testdrive.aws-region".into(), self.aws_region().into());
301 self.cmd_vars
302 .insert("testdrive.aws-endpoint".into(), self.aws_endpoint().into());
303 self.cmd_vars
304 .insert("testdrive.aws-account".into(), self.aws_account.clone());
305 {
306 let aws_credentials = self
307 .aws_config
308 .credentials_provider()
309 .ok_or_else(|| anyhow!("no AWS credentials provider configured"))?
310 .provide_credentials()
311 .await
312 .context("fetching AWS credentials")?;
313 self.cmd_vars.insert(
314 "testdrive.aws-access-key-id".into(),
315 aws_credentials.access_key_id().to_owned(),
316 );
317 self.cmd_vars.insert(
318 "testdrive.aws-secret-access-key".into(),
319 aws_credentials.secret_access_key().to_owned(),
320 );
321 self.cmd_vars.insert(
322 "testdrive.aws-token".into(),
323 aws_credentials
324 .session_token()
325 .map(|token| token.to_owned())
326 .unwrap_or_else(String::new),
327 );
328 }
329 self.cmd_vars.insert(
330 "testdrive.materialize-environment-id".into(),
331 self.materialize.environment_id.to_string(),
332 );
333 self.cmd_vars.insert(
334 "testdrive.materialize-sql-addr".into(),
335 self.materialize.sql_addr.clone(),
336 );
337 self.cmd_vars.insert(
338 "testdrive.materialize-internal-sql-addr".into(),
339 self.materialize.internal_sql_addr.clone(),
340 );
341 self.cmd_vars.insert(
342 "testdrive.materialize-password-sql-addr".into(),
343 self.materialize.password_sql_addr.clone(),
344 );
345 self.cmd_vars.insert(
346 "testdrive.materialize-sasl-sql-addr".into(),
347 self.materialize.sasl_sql_addr.clone(),
348 );
349 self.cmd_vars.insert(
350 "testdrive.materialize-user".into(),
351 self.materialize.user.clone(),
352 );
353 self.cmd_vars.insert(
354 "testdrive.fivetran-destination-url".into(),
355 self.fivetran_destination_url.clone(),
356 );
357 self.cmd_vars.insert(
358 "testdrive.fivetran-destination-files-path".into(),
359 self.fivetran_destination_files_path.clone(),
360 );
361
362 for (key, value) in env::vars() {
363 self.cmd_vars.insert(format!("env.{}", key), value);
364 }
365
366 for (key, value) in &self.arg_vars {
367 validate_ident(key)?;
368 self.cmd_vars
369 .insert(format!("arg.{}", key), value.to_string());
370 }
371
372 Ok(())
373 }
    /// Opens a read-only copy of the durable catalog (when a catalog config is
    /// available) and runs `f` against a [`ConnCatalog`] for a dummy session.
    ///
    /// Returns `Ok(None)` when no catalog config was provided, otherwise
    /// `Ok(Some(..))` with `f`'s result. The catalog is expired before
    /// returning so its persist resources are released.
    pub async fn with_catalog_copy<F, T>(
        &self,
        system_parameter_defaults: BTreeMap<String, String>,
        build_info: &'static BuildInfo,
        bootstrap_args: &BootstrapArgs,
        enable_expression_cache_override: Option<bool>,
        f: F,
    ) -> Result<Option<T>, anyhow::Error>
    where
        F: FnOnce(ConnCatalog) -> T,
    {
        // Helper: open a persist client at the given consensus/blob location.
        async fn persist_client(
            persist_consensus_url: SensitiveUrl,
            persist_blob_url: SensitiveUrl,
            persist_clients: &PersistClientCache,
        ) -> Result<PersistClient, anyhow::Error> {
            let persist_location = PersistLocation {
                blob_uri: persist_blob_url,
                consensus_uri: persist_consensus_url,
            };
            Ok(persist_clients.open(persist_location).await?)
        }

        if let Some(CatalogConfig {
            persist_consensus_url,
            persist_blob_url,
        }) = &self.materialize.catalog_config
        {
            let persist_client = persist_client(
                persist_consensus_url.clone(),
                persist_blob_url.clone(),
                &self.persist_clients,
            )
            .await?;
            let catalog = Catalog::open_debug_read_only_persist_catalog_config(
                persist_client,
                SYSTEM_TIME.clone(),
                self.materialize.environment_id.clone(),
                system_parameter_defaults,
                build_info,
                bootstrap_args,
                enable_expression_cache_override,
            )
            .await?;
            let res = f(catalog.for_session(&Session::dummy()));
            // Expire (don't just drop) the catalog to release its resources.
            catalog.expire().await;
            Ok(Some(res))
        } else {
            Ok(None)
        }
    }
427
428 pub fn aws_endpoint(&self) -> &str {
429 self.aws_config.endpoint_url().unwrap_or("")
430 }
431
432 pub fn aws_region(&self) -> &str {
433 self.aws_config.region().map(|r| r.as_ref()).unwrap_or("")
434 }
435
436 pub fn clear_skip_consistency_checks(&mut self) -> bool {
439 std::mem::replace(&mut self.consistency_checks_adhoc_skip, false)
440 }
441
    /// Resets the Materialize instance to a clean slate between scripts:
    /// resets system parameters, drops leftover databases, inactive user
    /// clusters, and user roles, recreates the `materialize` database, and
    /// re-grants default privileges to the testdrive user.
    ///
    /// Connects as `mz_system` over the internal SQL address. Databases and
    /// clusters whose names start with `testdrive_no_reset_` are preserved.
    pub async fn reset_materialize(&self) -> Result<(), anyhow::Error> {
        let (inner_client, _) = postgres_client(
            &format!(
                "postgres://mz_system:materialize@{}",
                self.materialize.internal_sql_addr
            ),
            self.default_timeout,
        )
        .await?;

        // Numeric version gates the default-cluster name below.
        let version = inner_client
            .query_one("SELECT mz_version_num()", &[])
            .await
            .context("getting version of materialize")
            .map(|row| row.get::<_, i32>(0))?;

        // Semver gates the renamed `enable_unsafe_functions` parameter below.
        let semver = inner_client
            .query_one("SELECT right(split_part(mz_version(), ' ', 1), -1)", &[])
            .await
            .context("getting semver of materialize")
            .map(|row| row.get::<_, String>(0))?
            .parse::<semver::Version>()
            .context("parsing semver of materialize")?;

        inner_client
            .batch_execute("ALTER SYSTEM RESET ALL")
            .await
            .context("resetting materialize state: ALTER SYSTEM RESET ALL")?;

        {
            // The parameter gained an `unsafe_` prefix as of 0.128.0-dev.1;
            // pick whichever name this server understands.
            let rename_version = Version::parse("0.128.0-dev.1").expect("known to be valid");
            let enable_unsafe_functions = if semver >= rename_version {
                "unsafe_enable_unsafe_functions"
            } else {
                "enable_unsafe_functions"
            };
            let res = inner_client
                .batch_execute(&format!("ALTER SYSTEM SET {enable_unsafe_functions} = on"))
                .await
                .context("enabling dangerous functions");
            if let Err(e) = res {
                match e.root_cause().downcast_ref::<DbError>() {
                    // Safe-mode servers refuse this parameter; log and carry
                    // on rather than failing the whole reset.
                    Some(e) if *e.code() == SqlState::CANT_CHANGE_RUNTIME_PARAM => {
                        info!(
                            "can't enable unsafe functions because the server is safe mode; \
                             testdrive scripts will fail if they use unsafe functions",
                        );
                    }
                    _ => return Err(e),
                }
            }
        }

        // Drop every database except those explicitly marked no-reset.
        for row in inner_client
            .query("SHOW DATABASES", &[])
            .await
            .context("resetting materialize state: SHOW DATABASES")?
        {
            let db_name: String = row.get(0);
            if db_name.starts_with("testdrive_no_reset_") {
                continue;
            }
            let query = format!(
                "DROP DATABASE {}",
                postgres_protocol::escape::escape_identifier(&db_name)
            );
            sql::print_query(&query, None);
            inner_client.batch_execute(&query).await.context(format!(
                "resetting materialize state: DROP DATABASE {}",
                db_name,
            ))?;
        }

        // User-owned clusters (ids and owners starting with 'u') that host no
        // user objects — sources, sinks, materialized views, indexes, or
        // subscriptions — are leftovers from previous tests.
        let inactive_user_clusters = "
        WITH
            active_user_clusters AS
            (
                SELECT DISTINCT cluster_id, object_id
                FROM
                    (
                        SELECT cluster_id, id FROM mz_catalog.mz_sources
                        UNION ALL SELECT cluster_id, id FROM mz_catalog.mz_sinks
                        UNION ALL
                        SELECT cluster_id, id
                        FROM mz_catalog.mz_materialized_views
                        UNION ALL
                        SELECT cluster_id, id FROM mz_catalog.mz_indexes
                        UNION ALL
                        SELECT cluster_id, id
                        FROM mz_internal.mz_subscriptions
                    )
                    AS t (cluster_id, object_id)
                WHERE cluster_id IS NOT NULL AND object_id LIKE 'u%'
            )
        SELECT name
        FROM mz_catalog.mz_clusters
        WHERE
            id NOT IN ( SELECT cluster_id FROM active_user_clusters ) AND id LIKE 'u%'
                AND
            owner_id LIKE 'u%';";

        let inactive_clusters = inner_client
            .query(inactive_user_clusters, &[])
            .await
            .context("resetting materialize state: inactive_user_clusters")?;

        if !inactive_clusters.is_empty() {
            println!("cleaning up user clusters from previous tests...")
        }

        for cluster_name in inactive_clusters {
            let cluster_name: String = cluster_name.get(0);
            if cluster_name.starts_with("testdrive_no_reset_") {
                continue;
            }
            let query = format!(
                "DROP CLUSTER {}",
                postgres_protocol::escape::escape_identifier(&cluster_name)
            );
            sql::print_query(&query, None);
            inner_client.batch_execute(&query).await.context(format!(
                "resetting materialize state: DROP CLUSTER {}",
                cluster_name,
            ))?;
        }

        inner_client
            .batch_execute("CREATE DATABASE materialize")
            .await
            .context("resetting materialize state: CREATE DATABASE materialize")?;

        // Best effort: drop user roles other than the testdrive user and the
        // built-in `mz_` roles. A failure to list roles is silently ignored.
        if let Ok(rows) = inner_client.query("SELECT name FROM mz_roles", &[]).await {
            for row in rows {
                let role_name: String = row.get(0);
                if role_name == self.materialize.user || role_name.starts_with("mz_") {
                    continue;
                }
                let query = format!(
                    "DROP ROLE {}",
                    postgres_protocol::escape::escape_identifier(&role_name)
                );
                sql::print_query(&query, None);
                inner_client.batch_execute(&query).await.context(format!(
                    "resetting materialize state: DROP ROLE {}",
                    role_name,
                ))?;
            }
        }

        // Re-grant the privileges scripts rely on.
        inner_client
            .batch_execute(&format!(
                "GRANT ALL PRIVILEGES ON SYSTEM TO {}",
                self.materialize.user
            ))
            .await?;

        inner_client
            .batch_execute("GRANT USAGE ON DATABASE materialize TO PUBLIC")
            .await?;
        inner_client
            .batch_execute(&format!(
                "GRANT ALL PRIVILEGES ON DATABASE materialize TO {}",
                self.materialize.user
            ))
            .await?;
        inner_client
            .batch_execute(&format!(
                "GRANT ALL PRIVILEGES ON SCHEMA materialize.public TO {}",
                self.materialize.user
            ))
            .await?;

        // The default cluster's name changed at version number 8200
        // (presumably the v0.82 release — confirm).
        let cluster = match version {
            ..=8199 => "default",
            8200.. => "quickstart",
        };
        inner_client
            .batch_execute(&format!("GRANT USAGE ON CLUSTER {cluster} TO PUBLIC"))
            .await?;
        inner_client
            .batch_execute(&format!(
                "GRANT ALL PRIVILEGES ON CLUSTER {cluster} TO {}",
                self.materialize.user
            ))
            .await?;

        Ok(())
    }
637
    /// Deletes all `testdrive-`-prefixed Kafka topics and schema registry
    /// subjects, attempting every deletion before reporting the collected
    /// errors as a single failure.
    pub async fn reset_kafka(&self) -> Result<(), anyhow::Error> {
        use rdkafka::types::RDKafkaErrorCode;
        let mut errors: Vec<anyhow::Error> = Vec::new();

        // Give the metadata fetch at least one second, even if the configured
        // default timeout is shorter.
        let metadata = self.kafka_producer.client().fetch_metadata(
            None,
            Some(std::cmp::max(Duration::from_secs(1), self.default_timeout)),
        )?;

        let testdrive_topics: Vec<_> = metadata
            .topics()
            .iter()
            .filter_map(|t| {
                if t.name().starts_with("testdrive-") {
                    Some(t.name())
                } else {
                    None
                }
            })
            .collect();

        if !testdrive_topics.is_empty() {
            match self
                .kafka_admin
                .delete_topics(&testdrive_topics, &self.kafka_admin_opts)
                .await
            {
                Ok(res) => {
                    // Sanity check: the admin API should return one result
                    // per requested topic.
                    if res.len() != testdrive_topics.len() {
                        errors.push(anyhow!(
                            "kafka topic deletion returned {} results, but exactly {} expected",
                            res.len(),
                            testdrive_topics.len()
                        ));
                    }
                    for (res, topic) in res.iter().zip_eq(testdrive_topics.iter()) {
                        match res {
                            // An already-deleted topic is not an error.
                            Ok(_) | Err((_, RDKafkaErrorCode::UnknownTopicOrPartition)) => (),
                            Err((_, err)) => {
                                errors.push(anyhow!("unable to delete {}: {}", topic, err));
                            }
                        }
                    }
                }
                Err(e) => {
                    errors.push(e.into());
                }
            };
        }

        // Clean up schema registry subjects too, folding in any errors.
        let schema_registry_errors = self.reset_schema_registry().await;

        errors.extend(schema_registry_errors);
        if errors.is_empty() {
            Ok(())
        } else {
            bail!(
                "deleting Kafka topics: {} errors: {}",
                errors.len(),
                errors
                    .into_iter()
                    .map(|e| e.to_string_with_causes())
                    .join("\n")
            );
        }
    }
705
    /// Deletes all `testdrive-`-prefixed schema registry subjects, returning
    /// every error encountered rather than failing fast.
    ///
    /// Subjects may reference one another, so deletion is attempted in an
    /// order derived from a topological sort of the reference graph; on a
    /// cycle, deletion proceeds in arbitrary order anyway.
    #[allow(clippy::disallowed_types)]
    async fn reset_schema_registry(&self) -> Vec<anyhow::Error> {
        use std::collections::HashMap;

        let mut errors = Vec::new();
        match self
            .ccsr_client
            .list_subjects()
            .await
            .context("listing schema registry subjects")
        {
            Ok(subjects) => {
                let testdrive_subjects: Vec<_> = subjects
                    .into_iter()
                    .filter(|s| s.starts_with("testdrive-"))
                    .collect();

                // Reference graph: subject version -> the testdrive subjects
                // it references.
                let mut graphs: HashMap<SubjectVersion, Vec<SubjectVersion>> = HashMap::new();

                for subject in &testdrive_subjects {
                    match self.ccsr_client.get_subject_with_references(subject).await {
                        Ok((subj, refs)) => {
                            // Only testdrive-owned references matter for
                            // ordering deletions.
                            let refs: Vec<_> = refs
                                .into_iter()
                                .filter(|r| r.subject.starts_with("testdrive-"))
                                .collect();
                            graphs.insert(
                                SubjectVersion {
                                    subject: subj.name,
                                    version: subj.version,
                                },
                                refs,
                            );
                        }
                        // Subject disappeared between listing and lookup;
                        // nothing to clean up.
                        Err(mz_ccsr::GetBySubjectError::SubjectNotFound) => {
                        }
                        Err(e) => {
                            errors.push(anyhow::anyhow!(
                                "failed to get references for subject {}: {}",
                                subject,
                                e
                            ));
                        }
                    }
                }

                let subjects_to_delete: Vec<_> = match mz_ccsr::topological_sort(&graphs) {
                    Ok(ordered) => {
                        let mut subjects: Vec<_> = ordered.into_iter().collect();
                        // NOTE(review): sorts by the tuple's second element —
                        // presumably a depth/order key from the sort; confirm
                        // against `topological_sort`'s contract.
                        subjects.sort_by(|a, b| a.1.cmp(&b.1));
                        subjects.into_iter().map(|(s, _)| s.clone()).collect()
                    }
                    Err(_) => {
                        // Cyclic references: best-effort deletion in
                        // arbitrary order.
                        tracing::info!("Cycle detected, attempting to delete anyway");
                        graphs.into_keys().collect()
                    }
                };

                for subject in subjects_to_delete {
                    match self.ccsr_client.delete_subject(&subject.subject).await {
                        // An already-deleted subject counts as success.
                        Ok(()) | Err(mz_ccsr::DeleteError::SubjectNotFound) => (),
                        Err(e) => errors.push(e.into()),
                    }
                }
            }
            Err(e) => {
                errors.push(e);
            }
        }
        errors
    }
783}
784
/// Where the durable catalog's persist data lives.
#[derive(Debug, Clone)]
pub struct CatalogConfig {
    /// URL of the persist consensus store (may contain credentials, hence
    /// [`SensitiveUrl`]).
    pub persist_consensus_url: SensitiveUrl,
    /// URL of the persist blob store.
    pub persist_blob_url: SensitiveUrl,
}
793
/// How the script driver should proceed after running a command.
pub enum ControlFlow {
    /// Continue with the next command.
    Continue,
    /// Start skipping subsequent commands (produced by the skip builtins —
    /// see the dispatch in `Run::run`).
    SkipBegin,
    /// Stop skipping commands.
    SkipEnd,
}
799
/// A parsed command that can be executed against the current [`State`].
#[async_trait]
pub(crate) trait Run {
    /// Executes the command, returning how the script should proceed; errors
    /// carry the command's source position.
    async fn run(self, state: &mut State) -> Result<ControlFlow, PosError>;
}
804
#[async_trait]
impl Run for PosCommand {
    /// Substitutes `${...}` variables into the command's text, then dispatches
    /// to the matching action implementation. All errors are annotated with
    /// the command's source position.
    async fn run(self, state: &mut State) -> Result<ControlFlow, PosError> {
        // If the command carries a version constraint that excludes the
        // server's version, skip it by returning `Continue` early.
        macro_rules! handle_version {
            ($version_constraint:expr) => {
                match $version_constraint {
                    Some(VersionConstraint { min, max }) => {
                        match version_check::run_version_check(min, max, state).await {
                            Ok(true) => return Ok(ControlFlow::Continue),
                            Ok(false) => {}
                            Err(err) => return Err(PosError::new(err, self.pos)),
                        }
                    }
                    None => {}
                }
            };
        }

        let wrap_err = |e| PosError::new(e, self.pos);
        // Builtin commands leave `${<builtin-name>.*}` variables untouched so
        // each action can do its own late substitution.
        let ignore_prefix = match &self.command {
            Command::Builtin(builtin, _) => Some(builtin.name.clone()),
            _ => None,
        };
        let subst = |msg: &str, vars: &BTreeMap<String, String>| {
            substitute_vars(msg, vars, &ignore_prefix, false).map_err(wrap_err)
        };
        // Like `subst`, but regex-escapes substituted values (for regex
        // expectations).
        let subst_re = |msg: &str, vars: &BTreeMap<String, String>| {
            substitute_vars(msg, vars, &ignore_prefix, true).map_err(wrap_err)
        };

        let r = match self.command {
            Command::Builtin(mut builtin, version_constraint) => {
                handle_version!(version_constraint);
                // Substitute variables in all arguments and input lines
                // before dispatching by builtin name.
                for val in builtin.args.values_mut() {
                    *val = subst(val, &state.cmd_vars)?;
                }
                for line in &mut builtin.input {
                    *line = subst(line, &state.cmd_vars)?;
                }
                match builtin.name.as_ref() {
                    "check-consistency" => consistency::run_consistency_checks(state).await,
                    "skip-consistency-checks" => {
                        consistency::skip_consistency_checks(builtin, state)
                    }
                    "check-shard-tombstone" => {
                        consistency::run_check_shard_tombstone(builtin, state).await
                    }
                    "duckdb-execute" => duckdb::run_execute(builtin, state).await,
                    "duckdb-query" => duckdb::run_query(builtin, state).await,
                    "fivetran-destination" => {
                        fivetran::run_destination_command(builtin, state).await
                    }
                    "file-append" => file::run_append(builtin, state).await,
                    "file-delete" => file::run_delete(builtin, state).await,
                    "http-request" => http::run_request(builtin, state).await,
                    "kafka-add-partitions" => kafka::run_add_partitions(builtin, state).await,
                    "kafka-create-topic" => kafka::run_create_topic(builtin, state).await,
                    "kafka-wait-topic" => kafka::run_wait_topic(builtin, state).await,
                    "kafka-delete-records" => kafka::run_delete_records(builtin, state).await,
                    "kafka-delete-topic-flaky" => kafka::run_delete_topic(builtin, state).await,
                    "kafka-ingest" => kafka::run_ingest(builtin, state).await,
                    "kafka-verify-data" => kafka::run_verify_data(builtin, state).await,
                    "kafka-verify-commit" => kafka::run_verify_commit(builtin, state).await,
                    "kafka-verify-topic" => kafka::run_verify_topic(builtin, state).await,
                    "mysql-connect" => mysql::run_connect(builtin, state).await,
                    "mysql-execute" => mysql::run_execute(builtin, state).await,
                    "nop" => nop::run_nop(),
                    "postgres-connect" => postgres::run_connect(builtin, state).await,
                    "postgres-execute" => postgres::run_execute(builtin, state).await,
                    "postgres-verify-slot" => postgres::run_verify_slot(builtin, state).await,
                    "protobuf-compile-descriptors" => {
                        protobuf::run_compile_descriptors(builtin, state).await
                    }
                    "psql-execute" => psql::run_execute(builtin, state).await,
                    "s3-verify-data" => s3::run_verify_data(builtin, state).await,
                    "s3-verify-keys" => s3::run_verify_keys(builtin, state).await,
                    "s3-file-upload" => s3::run_upload(builtin, state).await,
                    "s3-set-presigned-url" => s3::run_set_presigned_url(builtin, state).await,
                    "s3-upload-parquet-types" => s3::run_upload_parquet_types(builtin, state).await,
                    "s3-upload-parquet-unsorted-map" => {
                        s3::run_upload_parquet_unsorted_map(builtin, state).await
                    }
                    "schema-registry-publish" => schema_registry::run_publish(builtin, state).await,
                    "schema-registry-verify" => schema_registry::run_verify(builtin, state).await,
                    "schema-registry-wait" => schema_registry::run_wait(builtin, state).await,
                    "skip-if" => skip_if::run_skip_if(builtin, state).await,
                    "skip-end" => skip_end::run_skip_end(),
                    "sql-server-connect" => sql_server::run_connect(builtin, state).await,
                    "sql-server-execute" => sql_server::run_execute(builtin, state).await,
                    "sql-server-set-from-sql" => sql_server::run_set_from_sql(builtin, state).await,
                    "persist-force-compaction" => {
                        persist::run_force_compaction(builtin, state).await
                    }
                    "random-sleep" => sleep::run_random_sleep(builtin),
                    "set-regex" => set::run_regex_set(builtin, state),
                    "unset-regex" => set::run_regex_unset(builtin, state),
                    "set-sql-timeout" => set::run_sql_timeout(builtin, state),
                    "set-max-tries" => set::run_max_tries(builtin, state),
                    "sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment" => {
                        sleep::run_sleep(builtin)
                    }
                    "set" => set::set_vars(builtin, state),
                    "set-arg-default" => set::run_set_arg_default(builtin, state),
                    "set-from-sql" => set::run_set_from_sql(builtin, state).await,
                    "set-from-file" => set::run_set_from_file(builtin, state).await,
                    "webhook-append" => webhook::run_append(builtin, state).await,
                    _ => {
                        return Err(PosError::new(
                            anyhow!("unknown built-in command {}", builtin.name),
                            self.pos,
                        ));
                    }
                }
            }
            Command::Sql(mut sql, version_constraint) => {
                handle_version!(version_constraint);
                // Substitute variables in the query and the expected rows.
                sql.query = subst(&sql.query, &state.cmd_vars)?;
                if let SqlOutput::Full { expected_rows, .. } = &mut sql.expected_output {
                    for row in expected_rows {
                        for col in row {
                            *col = subst(col, &state.cmd_vars)?;
                        }
                    }
                }
                sql::run_sql(sql, state).await
            }
            Command::FailSql(mut sql, version_constraint) => {
                handle_version!(version_constraint);
                sql.query = subst(&sql.query, &state.cmd_vars)?;
                // Expected errors may contain variables too; regex
                // expectations get the escaping substitution.
                sql.expected_error = match &sql.expected_error {
                    SqlExpectedError::Contains(s) => {
                        SqlExpectedError::Contains(subst(s, &state.cmd_vars)?)
                    }
                    SqlExpectedError::Exact(s) => {
                        SqlExpectedError::Exact(subst(s, &state.cmd_vars)?)
                    }
                    SqlExpectedError::Regex(s) => {
                        SqlExpectedError::Regex(subst_re(s, &state.cmd_vars)?)
                    }
                    SqlExpectedError::Timeout => SqlExpectedError::Timeout,
                };
                sql::run_fail_sql(sql, state).await
            }
        };

        r.map_err(wrap_err)
    }
}
955
956fn substitute_vars(
958 msg: &str,
959 vars: &BTreeMap<String, String>,
960 ignore_prefix: &Option<String>,
961 regex_escape: bool,
962) -> Result<String, anyhow::Error> {
963 static RE: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"\$\{([^}]+)\}").unwrap());
964 let mut err = None;
965 let out = RE.replace_all(msg, |caps: &Captures| {
966 let name = &caps[1];
967 if let Some(ignore_prefix) = &ignore_prefix {
968 if name.starts_with(format!("{}.", ignore_prefix).as_str()) {
969 return caps.get(0).unwrap().as_str().to_string();
971 }
972 }
973
974 if let Some(val) = vars.get(name) {
975 if regex_escape {
976 regex::escape(val)
977 } else {
978 val.to_string()
979 }
980 } else {
981 err = Some(anyhow!("unknown variable: {}", name));
982 "#VAR-MISSING#".to_string()
983 }
984 });
985 match err {
986 Some(err) => Err(err),
987 None => Ok(out.into_owned()),
988 }
989}
990
/// Builds the initial [`State`] for a testdrive run, plus a future that
/// drives the underlying SQL connection and must be awaited/polled for the
/// client to make progress.
///
/// Connects to Materialize (retrying up to the default timeout), constructs
/// the schema registry and Kafka clients, and initializes command variables.
pub async fn create_state(
    config: &Config,
) -> Result<(State, impl Future<Output = Result<(), anyhow::Error>>), anyhow::Error> {
    // Use the configured seed, or generate a random zero-padded 10-digit one.
    let seed = config
        .seed
        .clone()
        .unwrap_or_else(|| format!("{:010}", rand::random::<u32>()));

    // Use the configured temp dir (created if missing), or an auto-removed
    // tempdir whose handle must stay alive in the returned state.
    let (_tempfile, temp_path) = match &config.temp_dir {
        Some(temp_dir) => {
            fs::create_dir_all(temp_dir).context("creating temporary directory")?;
            (None, PathBuf::from(&temp_dir))
        }
        _ => {
            let tempfile_handle = tempfile::tempdir().context("creating temporary directory")?;
            let temp_path = tempfile_handle.path().to_path_buf();
            (Some(tempfile_handle), temp_path)
        }
    };

    let materialize_catalog_config = config.materialize_catalog_config.clone();

    let materialize_url = util::postgres::config_url(&config.materialize_pgconfig)?;
    info!("Connecting to {}", materialize_url.as_str());
    // Retry the initial connection until the default timeout elapses.
    let (pgclient, pgconn) = Retry::default()
        .max_duration(config.default_timeout)
        .retry_async_canceling(|_| async move {
            let mut pgconfig = config.materialize_pgconfig.clone();
            pgconfig.connect_timeout(config.default_timeout);
            let tls = make_tls(&pgconfig)?;
            pgconfig.connect(tls).await.map_err(|e| anyhow!(e))
        })
        .await?;

    // The tokio-postgres connection object must be driven on its own task;
    // its eventual result is surfaced through the returned future.
    let pgconn_task =
        task::spawn(|| "pgconn_task", pgconn).map(|join| join.context("running SQL connection"));

    let materialize_state =
        create_materialize_state(&config, materialize_catalog_config, pgclient).await?;

    let schema_registry_url = config.schema_registry_url.to_owned();

    // Schema registry client, with optional TLS identity and basic auth.
    let ccsr_client = {
        let mut ccsr_config = mz_ccsr::ClientConfig::new(schema_registry_url.clone());

        if let Some(cert_path) = &config.cert_path {
            let cert = fs::read(cert_path).context("reading cert")?;
            let pass = config.cert_password.as_deref().unwrap_or("").to_owned();
            let ident = mz_ccsr::tls::Identity::from_pkcs12_der(cert, pass)
                .context("reading keystore file as pkcs12")?;
            ccsr_config = ccsr_config.identity(ident);
        }

        if let Some(ccsr_username) = &config.ccsr_username {
            ccsr_config = ccsr_config.auth(ccsr_username.clone(), config.ccsr_password.clone());
        }

        ccsr_config.build().context("Creating CCSR client")?
    };

    // Kafka admin client and producer built from one shared client config.
    let (kafka_addr, kafka_admin, kafka_admin_opts, kafka_producer, kafka_topics, kafka_config) = {
        use rdkafka::admin::{AdminClient, AdminOptions};
        use rdkafka::producer::FutureProducer;

        let mut kafka_config = create_new_client_config_simple();
        kafka_config.set("bootstrap.servers", &config.kafka_addr);
        kafka_config.set("group.id", "materialize-testdrive");
        kafka_config.set("auto.offset.reset", "earliest");
        kafka_config.set("isolation.level", "read_committed");
        if let Some(cert_path) = &config.cert_path {
            kafka_config.set("security.protocol", "ssl");
            kafka_config.set("ssl.keystore.location", cert_path);
            if let Some(cert_password) = &config.cert_password {
                kafka_config.set("ssl.keystore.password", cert_password);
            }
        }
        kafka_config.set("message.max.bytes", "15728640");

        // User-supplied options come last so they can override the defaults
        // above.
        for (key, value) in &config.kafka_opts {
            kafka_config.set(key, value);
        }

        let admin: AdminClient<_> = kafka_config
            .create_with_context(MzClientContext::default())
            .with_context(|| format!("opening Kafka connection: {}", config.kafka_addr))?;

        let admin_opts = AdminOptions::new().operation_timeout(Some(config.default_timeout));

        let producer: FutureProducer<_> = kafka_config
            .create_with_context(MzClientContext::default())
            .with_context(|| format!("opening Kafka producer connection: {}", config.kafka_addr))?;

        let topics = BTreeMap::new();

        (
            config.kafka_addr.to_owned(),
            admin,
            admin_opts,
            producer,
            topics,
            kafka_config,
        )
    };

    let mut state = State {
        config: config.clone(),

        // === Testdrive state. ===
        arg_vars: config.arg_vars.clone(),
        cmd_vars: BTreeMap::new(),
        seed,
        temp_path,
        _tempfile,
        default_timeout: config.default_timeout,
        timeout: config.default_timeout,
        max_tries: config.default_max_tries,
        initial_backoff: config.initial_backoff,
        backoff_factor: config.backoff_factor,
        consistency_checks: config.consistency_checks,
        check_statement_logging: config.check_statement_logging,
        consistency_checks_adhoc_skip: false,
        regex: None,
        regex_replacement: set::DEFAULT_REGEX_REPLACEMENT.into(),
        rewrite_results: config.rewrite_results,
        error_line_count: 0,
        error_string: "".to_string(),

        // === Materialize state. ===
        materialize: materialize_state,

        // === Persist state. ===
        persist_consensus_url: config.persist_consensus_url.clone(),
        persist_blob_url: config.persist_blob_url.clone(),
        build_info: config.build_info,
        persist_clients: PersistClientCache::new(
            PersistConfig::new_default_configs(config.build_info, SYSTEM_TIME.clone()),
            &MetricsRegistry::new(),
            |_, _| PubSubClientConnection::noop(),
        ),

        // === Confluent state. ===
        schema_registry_url,
        ccsr_client,
        kafka_addr,
        kafka_admin,
        kafka_admin_opts,
        kafka_config,
        kafka_default_partitions: config.kafka_default_partitions,
        kafka_producer,
        kafka_topics,

        // === AWS state. ===
        aws_account: config.aws_account.clone(),
        aws_config: config.aws_config.clone(),

        // === Database client state. ===
        duckdb_clients: BTreeMap::new(),
        mysql_clients: BTreeMap::new(),
        postgres_clients: BTreeMap::new(),
        sql_server_clients: BTreeMap::new(),

        // === Fivetran state. ===
        fivetran_destination_url: config.fivetran_destination_url.clone(),
        fivetran_destination_files_path: config.fivetran_destination_files_path.clone(),

        rewrites: Vec::new(),
        rewrite_pos_start: 0,
        rewrite_pos_end: 0,
    };
    state.initialize_cmd_vars().await?;
    Ok((state, pgconn_task))
}
1172
1173async fn create_materialize_state(
1174 config: &&Config,
1175 materialize_catalog_config: Option<CatalogConfig>,
1176 pgclient: tokio_postgres::Client,
1177) -> Result<MaterializeState, anyhow::Error> {
1178 let materialize_url = util::postgres::config_url(&config.materialize_pgconfig)?;
1179 let materialize_internal_url =
1180 util::postgres::config_url(&config.materialize_internal_pgconfig)?;
1181
1182 for (key, value) in &config.materialize_params {
1183 pgclient
1184 .batch_execute(&format!("SET {key} = {value}"))
1185 .await
1186 .context("setting session parameter")?;
1187 }
1188
1189 let materialize_user = config
1190 .materialize_pgconfig
1191 .get_user()
1192 .expect("testdrive URL must contain user")
1193 .to_string();
1194
1195 let materialize_sql_addr = format!(
1196 "{}:{}",
1197 materialize_url.host_str().unwrap(),
1198 materialize_url.port().unwrap()
1199 );
1200 let materialize_http_addr = format!(
1201 "{}:{}",
1202 materialize_url.host_str().unwrap(),
1203 config.materialize_http_port
1204 );
1205 let materialize_internal_sql_addr = format!(
1206 "{}:{}",
1207 materialize_internal_url.host_str().unwrap(),
1208 materialize_internal_url.port().unwrap()
1209 );
1210 let materialize_password_sql_addr = format!(
1211 "{}:{}",
1212 materialize_url.host_str().unwrap(),
1213 config.materialize_password_sql_port
1214 );
1215 let materialize_sasl_sql_addr = format!(
1216 "{}:{}",
1217 materialize_url.host_str().unwrap(),
1218 config.materialize_sasl_sql_port
1219 );
1220 let materialize_internal_http_addr = format!(
1221 "{}:{}",
1222 materialize_internal_url.host_str().unwrap(),
1223 config.materialize_internal_http_port
1224 );
1225 let environment_id = pgclient
1226 .query_one("SELECT mz_environment_id()", &[])
1227 .await?
1228 .get::<_, String>(0)
1229 .parse()
1230 .context("parsing environment ID")?;
1231
1232 let bootstrap_args = BootstrapArgs {
1233 cluster_replica_size_map: config.materialize_cluster_replica_sizes.clone(),
1234 default_cluster_replica_size: "ABC".to_string(),
1235 default_cluster_replication_factor: 1,
1236 bootstrap_role: None,
1237 };
1238
1239 let materialize_state = MaterializeState {
1240 catalog_config: materialize_catalog_config,
1241 sql_addr: materialize_sql_addr,
1242 use_https: config.materialize_use_https,
1243 http_addr: materialize_http_addr,
1244 internal_sql_addr: materialize_internal_sql_addr,
1245 internal_http_addr: materialize_internal_http_addr,
1246 password_sql_addr: materialize_password_sql_addr,
1247 sasl_sql_addr: materialize_sasl_sql_addr,
1248 user: materialize_user,
1249 pgclient,
1250 environment_id,
1251 bootstrap_args,
1252 };
1253
1254 Ok(materialize_state)
1255}