use std::collections::BTreeMap;
use std::future::Future;
use std::net::ToSocketAddrs;
use std::path::PathBuf;
use std::sync::LazyLock;
use std::time::Duration;
use std::{env, fs};

use anyhow::{Context, anyhow, bail};
use async_trait::async_trait;
use aws_credential_types::provider::ProvideCredentials;
use aws_types::SdkConfig;
use futures::future::FutureExt;
use itertools::Itertools;
use mz_adapter::catalog::{Catalog, ConnCatalog};
use mz_adapter::session::Session;
use mz_build_info::BuildInfo;
use mz_catalog::config::ClusterReplicaSizeMap;
use mz_catalog::durable::BootstrapArgs;
use mz_kafka_util::client::{MzClientContext, create_new_client_config_simple};
use mz_ore::error::ErrorExt;
use mz_ore::metrics::MetricsRegistry;
use mz_ore::now::SYSTEM_TIME;
use mz_ore::retry::Retry;
use mz_ore::task;
use mz_ore::url::SensitiveUrl;
use mz_persist_client::cache::PersistClientCache;
use mz_persist_client::cfg::PersistConfig;
use mz_persist_client::rpc::PubSubClientConnection;
use mz_persist_client::{PersistClient, PersistLocation};
use mz_sql::catalog::EnvironmentId;
use mz_tls_util::make_tls;
use rdkafka::ClientConfig;
use rdkafka::producer::Producer;
use regex::{Captures, Regex};
use semver::Version;
use tokio_postgres::error::{DbError, SqlState};
use tracing::info;
use url::Url;

use crate::error::PosError;
use crate::parser::{
    Command, PosCommand, SqlExpectedError, SqlOutput, VersionConstraint, validate_ident,
};
use crate::util;
use crate::util::postgres::postgres_client;

pub mod consistency;

mod file;
mod fivetran;
mod http;
mod kafka;
mod mysql;
mod nop;
mod persist;
mod postgres;
mod protobuf;
mod psql;
mod s3;
mod schema_registry;
mod set;
mod skip_end;
mod skip_if;
mod sleep;
mod sql;
mod sql_server;
mod version_check;
mod webhook;

/// Configuration for the testdrive runner.
#[derive(Debug)]
pub struct Config {
    /// Variables to make available to scripts as `${arg.NAME}`.
    pub arg_vars: BTreeMap<String, String>,
    /// The seed for this run, or `None` to pick a random one.
    pub seed: Option<u32>,
    /// Whether to reset Materialize and Kafka state before running scripts.
    pub reset: bool,
    /// A directory to use for temporary files, or `None` to create a fresh
    /// temporary directory.
    pub temp_dir: Option<String>,
    pub source: Option<String>,
    /// The default timeout for retryable operations.
    pub default_timeout: Duration,
    /// The default maximum number of tries for retryable operations.
    pub default_max_tries: usize,
    /// The initial backoff between retries.
    pub initial_backoff: Duration,
    /// The factor by which the backoff grows after each retry.
    pub backoff_factor: f64,
    /// The level of consistency checking to perform.
    pub consistency_checks: consistency::Level,
    /// Whether to rewrite expected results in the script files.
    pub rewrite_results: bool,

    /// The connection configuration for Materialize's external SQL port.
    pub materialize_pgconfig: tokio_postgres::Config,
    /// The connection configuration for Materialize's internal SQL port.
    pub materialize_internal_pgconfig: tokio_postgres::Config,
    /// Whether to use HTTPS for Materialize's HTTP endpoints.
    pub materialize_use_https: bool,
    /// The port of Materialize's external HTTP endpoint.
    pub materialize_http_port: u16,
    /// The port of Materialize's internal HTTP endpoint.
    pub materialize_internal_http_port: u16,
    /// The port of Materialize's password-authenticated SQL endpoint.
    pub materialize_password_sql_port: u16,
    /// The port of Materialize's SASL-authenticated SQL endpoint.
    pub materialize_sasl_sql_port: u16,
    /// Session parameters to set on the Materialize connection.
    pub materialize_params: Vec<(String, String)>,
    /// The catalog location, if direct catalog access is available.
    pub materialize_catalog_config: Option<CatalogConfig>,
    /// The build information of the Materialize instance under test.
    pub build_info: &'static BuildInfo,
    /// The cluster replica sizes known to the Materialize instance.
    pub materialize_cluster_replica_sizes: ClusterReplicaSizeMap,

    /// The URL of the persist consensus store, if configured.
    pub persist_consensus_url: Option<SensitiveUrl>,
    /// The URL of the persist blob store, if configured.
    pub persist_blob_url: Option<SensitiveUrl>,

    /// The address of the Kafka broker.
    pub kafka_addr: String,
    /// The default number of partitions for newly created topics.
    pub kafka_default_partitions: usize,
    /// Additional rdkafka client options.
    pub kafka_opts: Vec<(String, String)>,
    /// The URL of the Confluent Schema Registry.
    pub schema_registry_url: Url,
    /// The path to a PKCS#12 keystore for TLS, if required.
    pub cert_path: Option<String>,
    /// The password for the keystore, if any.
    pub cert_password: Option<String>,
    /// The username for the schema registry, if authentication is required.
    pub ccsr_username: Option<String>,
    /// The password for the schema registry, if authentication is required.
    pub ccsr_password: Option<String>,

    /// The AWS SDK configuration to use.
    pub aws_config: SdkConfig,
    /// The AWS account ID.
    pub aws_account: String,

    /// The URL of the Fivetran destination.
    pub fivetran_destination_url: String,
    /// The path for files exchanged with the Fivetran destination.
    pub fivetran_destination_files_path: String,
}

/// Connection addresses and metadata for the Materialize instance under test.
pub struct MaterializeState {
    catalog_config: Option<CatalogConfig>,

    sql_addr: String,
    use_https: bool,
    http_addr: String,
    internal_sql_addr: String,
    internal_http_addr: String,
    password_sql_addr: String,
    sasl_sql_addr: String,
    user: String,
    pgclient: tokio_postgres::Client,
    environment_id: EnvironmentId,
    bootstrap_args: BootstrapArgs,
}

/// The state threaded through a testdrive run and mutated by its commands.
pub struct State {
    // === Testdrive state. ===
    arg_vars: BTreeMap<String, String>,
    cmd_vars: BTreeMap<String, String>,
    seed: u32,
    temp_path: PathBuf,
    _tempfile: Option<tempfile::TempDir>,
    default_timeout: Duration,
    timeout: Duration,
    max_tries: usize,
    initial_backoff: Duration,
    backoff_factor: f64,
    consistency_checks: consistency::Level,
    consistency_checks_adhoc_skip: bool,
    regex: Option<Regex>,
    regex_replacement: String,
    error_line_count: usize,
    error_string: String,

    // === Materialize state. ===
    materialize: MaterializeState,

    // === Persist state. ===
    persist_consensus_url: Option<SensitiveUrl>,
    persist_blob_url: Option<SensitiveUrl>,
    build_info: &'static BuildInfo,
    persist_clients: PersistClientCache,

    // === Confluent state. ===
    schema_registry_url: Url,
    ccsr_client: mz_ccsr::Client,
    kafka_addr: String,
    kafka_admin: rdkafka::admin::AdminClient<MzClientContext>,
    kafka_admin_opts: rdkafka::admin::AdminOptions,
    kafka_config: ClientConfig,
    kafka_default_partitions: usize,
    kafka_producer: rdkafka::producer::FutureProducer<MzClientContext>,
    kafka_topics: BTreeMap<String, usize>,

    // === AWS state. ===
    aws_account: String,
    aws_config: SdkConfig,

    // === Database driver state. ===
    mysql_clients: BTreeMap<String, mysql_async::Conn>,
    postgres_clients: BTreeMap<String, tokio_postgres::Client>,
    sql_server_clients: BTreeMap<String, mz_sql_server_util::Client>,

    // === Fivetran state. ===
    fivetran_destination_url: String,
    fivetran_destination_files_path: String,

    // === Result-rewriting state. ===
    rewrite_results: bool,
    pub rewrites: Vec<Rewrite>,
    pub rewrite_pos_start: usize,
    pub rewrite_pos_end: usize,
}

/// A pending replacement of a span of the testdrive script with new content,
/// produced when rewriting expected results.
pub struct Rewrite {
    pub content: String,
    pub start: usize,
    pub end: usize,
}

impl State {
    pub async fn initialize_cmd_vars(&mut self) -> Result<(), anyhow::Error> {
        self.cmd_vars
            .insert("testdrive.kafka-addr".into(), self.kafka_addr.clone());
        self.cmd_vars.insert(
            "testdrive.kafka-addr-resolved".into(),
            self.kafka_addr
                .to_socket_addrs()
                .ok()
                .and_then(|mut addrs| addrs.next())
                .map(|addr| addr.to_string())
                .unwrap_or_else(|| "#RESOLUTION-FAILURE#".into()),
        );
        self.cmd_vars.insert(
            "testdrive.schema-registry-url".into(),
            self.schema_registry_url.to_string(),
        );
        self.cmd_vars
            .insert("testdrive.seed".into(), self.seed.to_string());
        self.cmd_vars.insert(
            "testdrive.temp-dir".into(),
            self.temp_path.display().to_string(),
        );
        self.cmd_vars
            .insert("testdrive.aws-region".into(), self.aws_region().into());
        self.cmd_vars
            .insert("testdrive.aws-endpoint".into(), self.aws_endpoint().into());
        self.cmd_vars
            .insert("testdrive.aws-account".into(), self.aws_account.clone());
        {
            let aws_credentials = self
                .aws_config
                .credentials_provider()
                .ok_or_else(|| anyhow!("no AWS credentials provider configured"))?
                .provide_credentials()
                .await
                .context("fetching AWS credentials")?;
            self.cmd_vars.insert(
                "testdrive.aws-access-key-id".into(),
                aws_credentials.access_key_id().to_owned(),
            );
            self.cmd_vars.insert(
                "testdrive.aws-secret-access-key".into(),
                aws_credentials.secret_access_key().to_owned(),
            );
            self.cmd_vars.insert(
                "testdrive.aws-token".into(),
                aws_credentials
                    .session_token()
                    .map(|token| token.to_owned())
                    .unwrap_or_else(String::new),
            );
        }
        self.cmd_vars.insert(
            "testdrive.materialize-environment-id".into(),
            self.materialize.environment_id.to_string(),
        );
        self.cmd_vars.insert(
            "testdrive.materialize-sql-addr".into(),
            self.materialize.sql_addr.clone(),
        );
        self.cmd_vars.insert(
            "testdrive.materialize-internal-sql-addr".into(),
            self.materialize.internal_sql_addr.clone(),
        );
        self.cmd_vars.insert(
            "testdrive.materialize-password-sql-addr".into(),
            self.materialize.password_sql_addr.clone(),
        );
        self.cmd_vars.insert(
            "testdrive.materialize-sasl-sql-addr".into(),
            self.materialize.sasl_sql_addr.clone(),
        );
        self.cmd_vars.insert(
            "testdrive.materialize-user".into(),
            self.materialize.user.clone(),
        );
        self.cmd_vars.insert(
            "testdrive.fivetran-destination-url".into(),
            self.fivetran_destination_url.clone(),
        );
        self.cmd_vars.insert(
            "testdrive.fivetran-destination-files-path".into(),
            self.fivetran_destination_files_path.clone(),
        );

        for (key, value) in env::vars() {
            self.cmd_vars.insert(format!("env.{}", key), value);
        }

        for (key, value) in &self.arg_vars {
            validate_ident(key)?;
            self.cmd_vars
                .insert(format!("arg.{}", key), value.to_string());
        }

        Ok(())
    }

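    // For illustration: testdrive scripts refer to the variables initialized
    // above with `${...}` syntax (see `substitute_vars` below), e.g.
    // `${testdrive.seed}` or `${testdrive.materialize-sql-addr}`. A
    // hypothetical script line, assuming the `kafka-create-topic` builtin:
    //
    //     $ kafka-create-topic topic=testdrive-data-${testdrive.seed}
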
    /// Opens a read-only copy of the durable catalog, if a catalog
    /// configuration is available, and applies `f` to a session-scoped view
    /// of it. Returns `None` when no catalog configuration was provided.
    pub async fn with_catalog_copy<F, T>(
        &self,
        system_parameter_defaults: BTreeMap<String, String>,
        build_info: &'static BuildInfo,
        bootstrap_args: &BootstrapArgs,
        enable_expression_cache_override: Option<bool>,
        f: F,
    ) -> Result<Option<T>, anyhow::Error>
    where
        F: FnOnce(ConnCatalog) -> T,
    {
        async fn persist_client(
            persist_consensus_url: SensitiveUrl,
            persist_blob_url: SensitiveUrl,
            persist_clients: &PersistClientCache,
        ) -> Result<PersistClient, anyhow::Error> {
            let persist_location = PersistLocation {
                blob_uri: persist_blob_url,
                consensus_uri: persist_consensus_url,
            };
            Ok(persist_clients.open(persist_location).await?)
        }

        if let Some(CatalogConfig {
            persist_consensus_url,
            persist_blob_url,
        }) = &self.materialize.catalog_config
        {
            let persist_client = persist_client(
                persist_consensus_url.clone(),
                persist_blob_url.clone(),
                &self.persist_clients,
            )
            .await?;
            let catalog = Catalog::open_debug_read_only_persist_catalog_config(
                persist_client,
                SYSTEM_TIME.clone(),
                self.materialize.environment_id.clone(),
                system_parameter_defaults,
                build_info,
                bootstrap_args,
                enable_expression_cache_override,
            )
            .await?;
            let res = f(catalog.for_session(&Session::dummy()));
            catalog.expire().await;
            Ok(Some(res))
        } else {
            Ok(None)
        }
    }

    pub fn aws_endpoint(&self) -> &str {
        self.aws_config.endpoint_url().unwrap_or("")
    }

    pub fn aws_region(&self) -> &str {
        self.aws_config.region().map(|r| r.as_ref()).unwrap_or("")
    }

    /// Resets the ad hoc consistency-check skip flag, returning whether it
    /// was previously set.
    pub fn clear_skip_consistency_checks(&mut self) -> bool {
        std::mem::replace(&mut self.consistency_checks_adhoc_skip, false)
    }

    /// Resets the Materialize instance to a pristine state, dropping the
    /// databases, clusters, and roles created by previous runs.
    pub async fn reset_materialize(&self) -> Result<(), anyhow::Error> {
        let (inner_client, _) = postgres_client(
            &format!(
                "postgres://mz_system:materialize@{}",
                self.materialize.internal_sql_addr
            ),
            self.default_timeout,
        )
        .await?;

        let version = inner_client
            .query_one("SELECT mz_version_num()", &[])
            .await
            .context("getting version of materialize")
            .map(|row| row.get::<_, i32>(0))?;

        let semver = inner_client
            .query_one("SELECT right(split_part(mz_version(), ' ', 1), -1)", &[])
            .await
            .context("getting semver of materialize")
            .map(|row| row.get::<_, String>(0))?
            .parse::<semver::Version>()
            .context("parsing semver of materialize")?;

        inner_client
            .batch_execute("ALTER SYSTEM RESET ALL")
            .await
            .context("resetting materialize state: ALTER SYSTEM RESET ALL")?;

        {
            // The system parameter was renamed in v0.128.0; use whichever
            // name this version of the server understands.
            let rename_version = Version::parse("0.128.0-dev.1").expect("known to be valid");
            let enable_unsafe_functions = if semver >= rename_version {
                "unsafe_enable_unsafe_functions"
            } else {
                "enable_unsafe_functions"
            };
            let res = inner_client
                .batch_execute(&format!("ALTER SYSTEM SET {enable_unsafe_functions} = on"))
                .await
                .context("enabling dangerous functions");
            if let Err(e) = res {
                match e.root_cause().downcast_ref::<DbError>() {
                    Some(e) if *e.code() == SqlState::CANT_CHANGE_RUNTIME_PARAM => {
                        info!(
                            "can't enable unsafe functions because the server is in safe mode; \
                             testdrive scripts will fail if they use unsafe functions",
                        );
                    }
                    _ => return Err(e),
                }
            }
        }

        for row in inner_client
            .query("SHOW DATABASES", &[])
            .await
            .context("resetting materialize state: SHOW DATABASES")?
        {
            let db_name: String = row.get(0);
            if db_name.starts_with("testdrive_no_reset_") {
                continue;
            }
            let query = format!(
                "DROP DATABASE {}",
                postgres_protocol::escape::escape_identifier(&db_name)
            );
            sql::print_query(&query, None);
            inner_client.batch_execute(&query).await.context(format!(
                "resetting materialize state: DROP DATABASE {}",
                db_name,
            ))?;
        }

        // Find user clusters that are owned by a user role and have no
        // remaining user objects on them.
        let inactive_user_clusters = "
        WITH
            active_user_clusters AS
            (
                SELECT DISTINCT cluster_id, object_id
                FROM
                    (
                        SELECT cluster_id, id FROM mz_catalog.mz_sources
                        UNION ALL SELECT cluster_id, id FROM mz_catalog.mz_sinks
                        UNION ALL
                            SELECT cluster_id, id
                            FROM mz_catalog.mz_materialized_views
                        UNION ALL
                            SELECT cluster_id, id FROM mz_catalog.mz_indexes
                        UNION ALL
                            SELECT cluster_id, id
                            FROM mz_internal.mz_subscriptions
                    )
                    AS t (cluster_id, object_id)
                WHERE cluster_id IS NOT NULL AND object_id LIKE 'u%'
            )
        SELECT name
        FROM mz_catalog.mz_clusters
        WHERE
            id NOT IN ( SELECT cluster_id FROM active_user_clusters ) AND id LIKE 'u%'
                AND
            owner_id LIKE 'u%';";

        let inactive_clusters = inner_client
            .query(inactive_user_clusters, &[])
            .await
            .context("resetting materialize state: inactive_user_clusters")?;

        if !inactive_clusters.is_empty() {
            println!("cleaning up user clusters from previous tests...")
        }

        for cluster_name in inactive_clusters {
            let cluster_name: String = cluster_name.get(0);
            if cluster_name.starts_with("testdrive_no_reset_") {
                continue;
            }
            let query = format!(
                "DROP CLUSTER {}",
                postgres_protocol::escape::escape_identifier(&cluster_name)
            );
            sql::print_query(&query, None);
            inner_client.batch_execute(&query).await.context(format!(
                "resetting materialize state: DROP CLUSTER {}",
                cluster_name,
            ))?;
        }

        inner_client
            .batch_execute("CREATE DATABASE materialize")
            .await
            .context("resetting materialize state: CREATE DATABASE materialize")?;

        // Drop all roles except system roles and the user under test. Errors
        // are tolerated, since the query may not succeed on every version.
        if let Ok(rows) = inner_client.query("SELECT name FROM mz_roles", &[]).await {
            for row in rows {
                let role_name: String = row.get(0);
                if role_name == self.materialize.user || role_name.starts_with("mz_") {
                    continue;
                }
                let query = format!(
                    "DROP ROLE {}",
                    postgres_protocol::escape::escape_identifier(&role_name)
                );
                sql::print_query(&query, None);
                inner_client.batch_execute(&query).await.context(format!(
                    "resetting materialize state: DROP ROLE {}",
                    role_name,
                ))?;
            }
        }

        // Give the test user broad privileges so that scripts need not worry
        // about access control.
        inner_client
            .batch_execute(&format!(
                "GRANT ALL PRIVILEGES ON SYSTEM TO {}",
                self.materialize.user
            ))
            .await?;

        inner_client
            .batch_execute("GRANT USAGE ON DATABASE materialize TO PUBLIC")
            .await?;
        inner_client
            .batch_execute(&format!(
                "GRANT ALL PRIVILEGES ON DATABASE materialize TO {}",
                self.materialize.user
            ))
            .await?;
        inner_client
            .batch_execute(&format!(
                "GRANT ALL PRIVILEGES ON SCHEMA materialize.public TO {}",
                self.materialize.user
            ))
            .await?;

        // The name of the built-in cluster depends on the server version.
        let cluster = match version {
            ..=8199 => "default",
            8200.. => "quickstart",
        };
        inner_client
            .batch_execute(&format!("GRANT USAGE ON CLUSTER {cluster} TO PUBLIC"))
            .await?;
        inner_client
            .batch_execute(&format!(
                "GRANT ALL PRIVILEGES ON CLUSTER {cluster} TO {}",
                self.materialize.user
            ))
            .await?;

        Ok(())
    }

    /// Deletes the Kafka topics and schema registry subjects created by
    /// previous testdrive runs (those prefixed with `testdrive-`).
    pub async fn reset_kafka(&self) -> Result<(), anyhow::Error> {
        let mut errors: Vec<anyhow::Error> = Vec::new();

        let metadata = self.kafka_producer.client().fetch_metadata(
            None,
            Some(std::cmp::max(Duration::from_secs(1), self.default_timeout)),
        )?;

        let testdrive_topics: Vec<_> = metadata
            .topics()
            .iter()
            .filter_map(|t| {
                if t.name().starts_with("testdrive-") {
                    Some(t.name())
                } else {
                    None
                }
            })
            .collect();

        if !testdrive_topics.is_empty() {
            match self
                .kafka_admin
                .delete_topics(&testdrive_topics, &self.kafka_admin_opts)
                .await
            {
                Ok(res) => {
                    if res.len() != testdrive_topics.len() {
                        errors.push(anyhow!(
                            "kafka topic deletion returned {} results, but exactly {} expected",
                            res.len(),
                            testdrive_topics.len()
                        ));
                    }
                    for (res, topic) in res.iter().zip_eq(testdrive_topics.iter()) {
                        match res {
                            Ok(_)
                            | Err((
                                _,
                                rdkafka::types::RDKafkaErrorCode::UnknownTopicOrPartition,
                            )) => (),
                            Err((_, err)) => {
                                errors.push(anyhow!("unable to delete {}: {}", topic, err));
                            }
                        }
                    }
                }
                Err(e) => {
                    errors.push(e.into());
                }
            };
        }

        match self
            .ccsr_client
            .list_subjects()
            .await
            .context("listing schema registry subjects")
        {
            Ok(subjects) => {
                let testdrive_subjects: Vec<_> = subjects
                    .iter()
                    .filter(|s| s.starts_with("testdrive-"))
                    .collect();

                for subject in testdrive_subjects {
                    match self.ccsr_client.delete_subject(subject).await {
                        Ok(()) | Err(mz_ccsr::DeleteError::SubjectNotFound) => (),
                        Err(e) => errors.push(e.into()),
                    }
                }
            }
            Err(e) => {
                errors.push(e);
            }
        }

        if errors.is_empty() {
            Ok(())
        } else {
            bail!(
                "deleting Kafka topics: {} errors: {}",
                errors.len(),
                errors
                    .into_iter()
                    .map(|e| e.to_string_with_causes())
                    .join("\n")
            );
        }
    }
}

/// The location of the Materialize catalog in persist.
#[derive(Debug, Clone)]
pub struct CatalogConfig {
    /// The URL of the persist consensus store.
    pub persist_consensus_url: SensitiveUrl,
    /// The URL of the persist blob store.
    pub persist_blob_url: SensitiveUrl,
}

/// The action the runner should take after executing a command.
pub enum ControlFlow {
    /// Proceed to the next command.
    Continue,
    /// Begin skipping commands until a skip-end marker is reached.
    SkipBegin,
    /// Stop skipping commands.
    SkipEnd,
}

#[async_trait]
pub(crate) trait Run {
    async fn run(self, state: &mut State) -> Result<ControlFlow, PosError>;
}

#[async_trait]
impl Run for PosCommand {
    async fn run(self, state: &mut State) -> Result<ControlFlow, PosError> {
        macro_rules! handle_version {
            ($version_constraint:expr) => {
                match $version_constraint {
                    Some(VersionConstraint { min, max }) => {
                        match version_check::run_version_check(min, max, state).await {
                            Ok(true) => return Ok(ControlFlow::Continue),
                            Ok(false) => {}
                            Err(err) => return Err(PosError::new(err, self.pos)),
                        }
                    }
                    None => {}
                }
            };
        }

        let wrap_err = |e| PosError::new(e, self.pos);
        // Variables prefixed with a builtin command's own name (e.g.
        // `${kafka-ingest.*}` for `kafka-ingest`) are left for the command
        // itself to substitute.
        let ignore_prefix = match &self.command {
            Command::Builtin(builtin, _) => Some(builtin.name.clone()),
            _ => None,
        };
        let subst = |msg: &str, vars: &BTreeMap<String, String>| {
            substitute_vars(msg, vars, &ignore_prefix, false).map_err(wrap_err)
        };
        let subst_re = |msg: &str, vars: &BTreeMap<String, String>| {
            substitute_vars(msg, vars, &ignore_prefix, true).map_err(wrap_err)
        };

        let r = match self.command {
            Command::Builtin(mut builtin, version_constraint) => {
                handle_version!(version_constraint);
                for val in builtin.args.values_mut() {
                    *val = subst(val, &state.cmd_vars)?;
                }
                for line in &mut builtin.input {
                    *line = subst(line, &state.cmd_vars)?;
                }
                match builtin.name.as_ref() {
                    "check-consistency" => consistency::run_consistency_checks(state).await,
                    "skip-consistency-checks" => {
                        consistency::skip_consistency_checks(builtin, state)
                    }
                    "check-shard-tombstone" => {
                        consistency::run_check_shard_tombstone(builtin, state).await
                    }
                    "fivetran-destination" => {
                        fivetran::run_destination_command(builtin, state).await
                    }
                    "file-append" => file::run_append(builtin, state).await,
                    "file-delete" => file::run_delete(builtin, state).await,
                    "http-request" => http::run_request(builtin, state).await,
                    "kafka-add-partitions" => kafka::run_add_partitions(builtin, state).await,
                    "kafka-create-topic" => kafka::run_create_topic(builtin, state).await,
                    "kafka-wait-topic" => kafka::run_wait_topic(builtin, state).await,
                    "kafka-delete-records" => kafka::run_delete_records(builtin, state).await,
                    "kafka-delete-topic-flaky" => kafka::run_delete_topic(builtin, state).await,
                    "kafka-ingest" => kafka::run_ingest(builtin, state).await,
                    "kafka-verify-data" => kafka::run_verify_data(builtin, state).await,
                    "kafka-verify-commit" => kafka::run_verify_commit(builtin, state).await,
                    "kafka-verify-topic" => kafka::run_verify_topic(builtin, state).await,
                    "mysql-connect" => mysql::run_connect(builtin, state).await,
                    "mysql-execute" => mysql::run_execute(builtin, state).await,
                    "nop" => nop::run_nop(),
                    "postgres-connect" => postgres::run_connect(builtin, state).await,
                    "postgres-execute" => postgres::run_execute(builtin, state).await,
                    "postgres-verify-slot" => postgres::run_verify_slot(builtin, state).await,
                    "protobuf-compile-descriptors" => {
                        protobuf::run_compile_descriptors(builtin, state).await
                    }
                    "psql-execute" => psql::run_execute(builtin, state).await,
                    "s3-verify-data" => s3::run_verify_data(builtin, state).await,
                    "s3-verify-keys" => s3::run_verify_keys(builtin, state).await,
                    "s3-file-upload" => s3::run_upload(builtin, state).await,
                    "s3-set-presigned-url" => s3::run_set_presigned_url(builtin, state).await,
                    "schema-registry-publish" => schema_registry::run_publish(builtin, state).await,
                    "schema-registry-verify" => schema_registry::run_verify(builtin, state).await,
                    "schema-registry-wait" => schema_registry::run_wait(builtin, state).await,
                    "skip-if" => skip_if::run_skip_if(builtin, state).await,
                    "skip-end" => skip_end::run_skip_end(),
                    "sql-server-connect" => sql_server::run_connect(builtin, state).await,
                    "sql-server-execute" => sql_server::run_execute(builtin, state).await,
                    "sql-server-set-from-sql" => sql_server::run_set_from_sql(builtin, state).await,
                    "persist-force-compaction" => {
                        persist::run_force_compaction(builtin, state).await
                    }
                    "random-sleep" => sleep::run_random_sleep(builtin),
                    "set-regex" => set::run_regex_set(builtin, state),
                    "unset-regex" => set::run_regex_unset(builtin, state),
                    "set-sql-timeout" => set::run_sql_timeout(builtin, state),
                    "set-max-tries" => set::run_max_tries(builtin, state),
                    "sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment" => {
                        sleep::run_sleep(builtin)
                    }
                    "set" => set::set_vars(builtin, state),
                    "set-arg-default" => set::run_set_arg_default(builtin, state),
                    "set-from-sql" => set::run_set_from_sql(builtin, state).await,
                    "set-from-file" => set::run_set_from_file(builtin, state).await,
                    "webhook-append" => webhook::run_append(builtin, state).await,
                    _ => {
                        return Err(PosError::new(
                            anyhow!("unknown built-in command {}", builtin.name),
                            self.pos,
                        ));
                    }
                }
            }
            Command::Sql(mut sql, version_constraint) => {
                handle_version!(version_constraint);
                sql.query = subst(&sql.query, &state.cmd_vars)?;
                if let SqlOutput::Full { expected_rows, .. } = &mut sql.expected_output {
                    for row in expected_rows {
                        for col in row {
                            *col = subst(col, &state.cmd_vars)?;
                        }
                    }
                }
                sql::run_sql(sql, state).await
            }
            Command::FailSql(mut sql, version_constraint) => {
                handle_version!(version_constraint);
                sql.query = subst(&sql.query, &state.cmd_vars)?;
                sql.expected_error = match &sql.expected_error {
                    SqlExpectedError::Contains(s) => {
                        SqlExpectedError::Contains(subst(s, &state.cmd_vars)?)
                    }
                    SqlExpectedError::Exact(s) => {
                        SqlExpectedError::Exact(subst(s, &state.cmd_vars)?)
                    }
                    SqlExpectedError::Regex(s) => {
                        SqlExpectedError::Regex(subst_re(s, &state.cmd_vars)?)
                    }
                    SqlExpectedError::Timeout => SqlExpectedError::Timeout,
                };
                sql::run_fail_sql(sql, state).await
            }
        };

        r.map_err(wrap_err)
    }
}

/// Substitutes `${name}` references in `msg` with values from `vars`,
/// optionally regex-escaping the substituted values. Variables whose name
/// starts with `ignore_prefix.` are left untouched. Referencing an unknown
/// variable is an error.
fn substitute_vars(
    msg: &str,
    vars: &BTreeMap<String, String>,
    ignore_prefix: &Option<String>,
    regex_escape: bool,
) -> Result<String, anyhow::Error> {
    static RE: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"\$\{([^}]+)\}").unwrap());
    let mut err = None;
    let out = RE.replace_all(msg, |caps: &Captures| {
        let name = &caps[1];
        if let Some(ignore_prefix) = &ignore_prefix {
            if name.starts_with(format!("{}.", ignore_prefix).as_str()) {
                // Leave the variable for the command itself to substitute.
                return caps.get(0).unwrap().as_str().to_string();
            }
        }

        if let Some(val) = vars.get(name) {
            if regex_escape {
                regex::escape(val)
            } else {
                val.to_string()
            }
        } else {
            err = Some(anyhow!("unknown variable: {}", name));
            "#VAR-MISSING#".to_string()
        }
    });
    match err {
        Some(err) => Err(err),
        None => Ok(out.into_owned()),
    }
}

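// A minimal sketch of the substitution semantics above, written against the
// standard Rust test harness (the surrounding crate may use its own test
// attributes; plain `#[test]` is an assumption here).
#[cfg(test)]
mod substitute_vars_tests {
    use super::*;

    #[test]
    fn known_vars_replaced_unknown_vars_error() {
        let mut vars = BTreeMap::new();
        vars.insert("testdrive.seed".to_string(), "42".to_string());

        // Known variables are replaced with their values.
        assert_eq!(
            substitute_vars("seed is ${testdrive.seed}", &vars, &None, false).unwrap(),
            "seed is 42"
        );

        // Referencing an unknown variable is an error.
        assert!(substitute_vars("${no.such.var}", &vars, &None, false).is_err());

        // With `regex_escape`, values are escaped for literal use in a regex.
        vars.insert("dotted".to_string(), "a.b".to_string());
        assert_eq!(
            substitute_vars("${dotted}", &vars, &None, true).unwrap(),
            r"a\.b"
        );

        // Variables under `ignore_prefix` pass through untouched, for the
        // command itself to substitute later.
        let prefix = Some("kafka-ingest".to_string());
        assert_eq!(
            substitute_vars("${kafka-ingest.iteration}", &vars, &prefix, false).unwrap(),
            "${kafka-ingest.iteration}"
        );
    }
}
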
/// Creates the `State` for a testdrive run from `config`, returning it
/// together with a future that drives the underlying SQL connection. The
/// future must be polled for the connection to make progress.
pub async fn create_state(
    config: &Config,
) -> Result<(State, impl Future<Output = Result<(), anyhow::Error>>), anyhow::Error> {
    let seed = config.seed.unwrap_or_else(rand::random);

    let (_tempfile, temp_path) = match &config.temp_dir {
        Some(temp_dir) => {
            fs::create_dir_all(temp_dir).context("creating temporary directory")?;
            (None, PathBuf::from(&temp_dir))
        }
        _ => {
            // Stash the tempdir handle in the state so that the directory
            // lives for as long as the state does.
            let tempfile_handle = tempfile::tempdir().context("creating temporary directory")?;
            let temp_path = tempfile_handle.path().to_path_buf();
            (Some(tempfile_handle), temp_path)
        }
    };

    let materialize_catalog_config = config.materialize_catalog_config.clone();

    let materialize_url = util::postgres::config_url(&config.materialize_pgconfig)?;
    info!("Connecting to {}", materialize_url.as_str());
    let (pgclient, pgconn) = Retry::default()
        .max_duration(config.default_timeout)
        .retry_async_canceling(|_| async move {
            let mut pgconfig = config.materialize_pgconfig.clone();
            pgconfig.connect_timeout(config.default_timeout);
            let tls = make_tls(&pgconfig)?;
            pgconfig.connect(tls).await.map_err(|e| anyhow!(e))
        })
        .await?;

    let pgconn_task =
        task::spawn(|| "pgconn_task", pgconn).map(|join| join.context("running SQL connection"));

    let materialize_state =
        create_materialize_state(config, materialize_catalog_config, pgclient).await?;

    let schema_registry_url = config.schema_registry_url.to_owned();

    let ccsr_client = {
        let mut ccsr_config = mz_ccsr::ClientConfig::new(schema_registry_url.clone());

        if let Some(cert_path) = &config.cert_path {
            let cert = fs::read(cert_path).context("reading cert")?;
            let pass = config.cert_password.as_deref().unwrap_or("").to_owned();
            let ident = mz_ccsr::tls::Identity::from_pkcs12_der(cert, pass)
                .context("reading keystore file as pkcs12")?;
            ccsr_config = ccsr_config.identity(ident);
        }

        if let Some(ccsr_username) = &config.ccsr_username {
            ccsr_config = ccsr_config.auth(ccsr_username.clone(), config.ccsr_password.clone());
        }

        ccsr_config.build().context("creating CCSR client")?
    };

    let (kafka_addr, kafka_admin, kafka_admin_opts, kafka_producer, kafka_topics, kafka_config) = {
        use rdkafka::admin::{AdminClient, AdminOptions};
        use rdkafka::producer::FutureProducer;

        let mut kafka_config = create_new_client_config_simple();
        kafka_config.set("bootstrap.servers", &config.kafka_addr);
        kafka_config.set("group.id", "materialize-testdrive");
        kafka_config.set("auto.offset.reset", "earliest");
        kafka_config.set("isolation.level", "read_committed");
        if let Some(cert_path) = &config.cert_path {
            kafka_config.set("security.protocol", "ssl");
            kafka_config.set("ssl.keystore.location", cert_path);
            if let Some(cert_password) = &config.cert_password {
                kafka_config.set("ssl.keystore.password", cert_password);
            }
        }
        kafka_config.set("message.max.bytes", "15728640");

        for (key, value) in &config.kafka_opts {
            kafka_config.set(key, value);
        }

        let admin: AdminClient<_> = kafka_config
            .create_with_context(MzClientContext::default())
            .with_context(|| format!("opening Kafka connection: {}", config.kafka_addr))?;

        let admin_opts = AdminOptions::new().operation_timeout(Some(config.default_timeout));

        let producer: FutureProducer<_> = kafka_config
            .create_with_context(MzClientContext::default())
            .with_context(|| format!("opening Kafka producer connection: {}", config.kafka_addr))?;

        let topics = BTreeMap::new();

        (
            config.kafka_addr.to_owned(),
            admin,
            admin_opts,
            producer,
            topics,
            kafka_config,
        )
    };

    let mut state = State {
        // === Testdrive state. ===
        arg_vars: config.arg_vars.clone(),
        cmd_vars: BTreeMap::new(),
        seed,
        temp_path,
        _tempfile,
        default_timeout: config.default_timeout,
        timeout: config.default_timeout,
        max_tries: config.default_max_tries,
        initial_backoff: config.initial_backoff,
        backoff_factor: config.backoff_factor,
        consistency_checks: config.consistency_checks,
        consistency_checks_adhoc_skip: false,
        regex: None,
        regex_replacement: set::DEFAULT_REGEX_REPLACEMENT.into(),
        rewrite_results: config.rewrite_results,
        error_line_count: 0,
        error_string: "".to_string(),

        // === Materialize state. ===
        materialize: materialize_state,

        // === Persist state. ===
        persist_consensus_url: config.persist_consensus_url.clone(),
        persist_blob_url: config.persist_blob_url.clone(),
        build_info: config.build_info,
        persist_clients: PersistClientCache::new(
            PersistConfig::new_default_configs(config.build_info, SYSTEM_TIME.clone()),
            &MetricsRegistry::new(),
            |_, _| PubSubClientConnection::noop(),
        ),

        // === Confluent state. ===
        schema_registry_url,
        ccsr_client,
        kafka_addr,
        kafka_admin,
        kafka_admin_opts,
        kafka_config,
        kafka_default_partitions: config.kafka_default_partitions,
        kafka_producer,
        kafka_topics,

        // === AWS state. ===
        aws_account: config.aws_account.clone(),
        aws_config: config.aws_config.clone(),

        // === Database driver state. ===
        mysql_clients: BTreeMap::new(),
        postgres_clients: BTreeMap::new(),
        sql_server_clients: BTreeMap::new(),

        // === Fivetran state. ===
        fivetran_destination_url: config.fivetran_destination_url.clone(),
        fivetran_destination_files_path: config.fivetran_destination_files_path.clone(),

        // === Result-rewriting state. ===
        rewrites: Vec::new(),
        rewrite_pos_start: 0,
        rewrite_pos_end: 0,
    };
    state.initialize_cmd_vars().await?;
    Ok((state, pgconn_task))
}

async fn create_materialize_state(
    config: &Config,
    materialize_catalog_config: Option<CatalogConfig>,
    pgclient: tokio_postgres::Client,
) -> Result<MaterializeState, anyhow::Error> {
    let materialize_url = util::postgres::config_url(&config.materialize_pgconfig)?;
    let materialize_internal_url =
        util::postgres::config_url(&config.materialize_internal_pgconfig)?;

    for (key, value) in &config.materialize_params {
        pgclient
            .batch_execute(&format!("SET {key} = {value}"))
            .await
            .context("setting session parameter")?;
    }

    let materialize_user = config
        .materialize_pgconfig
        .get_user()
        .expect("testdrive URL must contain user")
        .to_string();

    let materialize_sql_addr = format!(
        "{}:{}",
        materialize_url.host_str().unwrap(),
        materialize_url.port().unwrap()
    );
    let materialize_http_addr = format!(
        "{}:{}",
        materialize_url.host_str().unwrap(),
        config.materialize_http_port
    );
    let materialize_internal_sql_addr = format!(
        "{}:{}",
        materialize_internal_url.host_str().unwrap(),
        materialize_internal_url.port().unwrap()
    );
    let materialize_password_sql_addr = format!(
        "{}:{}",
        materialize_url.host_str().unwrap(),
        config.materialize_password_sql_port
    );
    let materialize_sasl_sql_addr = format!(
        "{}:{}",
        materialize_url.host_str().unwrap(),
        config.materialize_sasl_sql_port
    );
    let materialize_internal_http_addr = format!(
        "{}:{}",
        materialize_internal_url.host_str().unwrap(),
        config.materialize_internal_http_port
    );
    let environment_id = pgclient
        .query_one("SELECT mz_environment_id()", &[])
        .await?
        .get::<_, String>(0)
        .parse()
        .context("parsing environment ID")?;

    let bootstrap_args = BootstrapArgs {
        cluster_replica_size_map: config.materialize_cluster_replica_sizes.clone(),
        default_cluster_replica_size: "ABC".to_string(),
        default_cluster_replication_factor: 1,
        bootstrap_role: None,
    };

    let materialize_state = MaterializeState {
        catalog_config: materialize_catalog_config,
        sql_addr: materialize_sql_addr,
        use_https: config.materialize_use_https,
        http_addr: materialize_http_addr,
        internal_sql_addr: materialize_internal_sql_addr,
        internal_http_addr: materialize_internal_http_addr,
        password_sql_addr: materialize_password_sql_addr,
        sasl_sql_addr: materialize_sasl_sql_addr,
        user: materialize_user,
        pgclient,
        environment_id,
        bootstrap_args,
    };

    Ok(materialize_state)
}
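
// For illustration only: a hypothetical driver loop showing how the pieces in
// this module fit together (the real entry point lives elsewhere in this
// crate):
//
//     let (mut state, pgconn_task) = create_state(&config).await?;
//     if config.reset {
//         state.reset_materialize().await?;
//         state.reset_kafka().await?;
//     }
//     for cmd in commands {
//         match cmd.run(&mut state).await? {
//             ControlFlow::Continue => (),
//             ControlFlow::SkipBegin => { /* skip commands until SkipEnd */ }
//             ControlFlow::SkipEnd => (),
//         }
//     }
//     drop(state);
//     pgconn_task.await?;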