use std::collections::BTreeMap;
use std::future::Future;
use std::net::ToSocketAddrs;
use std::path::PathBuf;
use std::sync::LazyLock;
use std::time::Duration;
use std::{env, fs};

use anyhow::{Context, anyhow, bail};
use async_trait::async_trait;
use aws_credential_types::provider::ProvideCredentials;
use aws_types::SdkConfig;
use futures::future::FutureExt;
use itertools::Itertools;
use mz_adapter::catalog::{Catalog, ConnCatalog};
use mz_adapter::session::Session;
use mz_build_info::BuildInfo;
use mz_catalog::config::ClusterReplicaSizeMap;
use mz_catalog::durable::BootstrapArgs;
use mz_kafka_util::client::{MzClientContext, create_new_client_config_simple};
use mz_ore::error::ErrorExt;
use mz_ore::metrics::MetricsRegistry;
use mz_ore::now::SYSTEM_TIME;
use mz_ore::retry::Retry;
use mz_ore::task;
use mz_ore::url::SensitiveUrl;
use mz_persist_client::cache::PersistClientCache;
use mz_persist_client::cfg::PersistConfig;
use mz_persist_client::rpc::PubSubClientConnection;
use mz_persist_client::{PersistClient, PersistLocation};
use mz_sql::catalog::EnvironmentId;
use mz_tls_util::make_tls;
use rdkafka::ClientConfig;
use rdkafka::producer::Producer;
use regex::{Captures, Regex};
use semver::Version;
use tokio_postgres::error::{DbError, SqlState};
use tracing::info;
use url::Url;

use crate::error::PosError;
use crate::parser::{
    Command, PosCommand, SqlExpectedError, SqlOutput, VersionConstraint, validate_ident,
};
use crate::util;
use crate::util::postgres::postgres_client;

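// Implementations of the built-in testdrive actions, one module per command
// family; see the `Run` impl below for dispatch.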
pub mod consistency;

mod file;
mod fivetran;
mod http;
mod kafka;
mod mysql;
mod nop;
mod persist;
mod postgres;
mod protobuf;
mod psql;
mod s3;
mod schema_registry;
mod set;
mod skip_end;
mod skip_if;
mod sleep;
mod sql;
mod sql_server;
mod version_check;
mod webhook;

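/// Static configuration for a testdrive run.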
#[derive(Debug, Clone)]
pub struct Config {
    // === Testdrive options. ===
    /// Variables to make available to scripts as `${arg.NAME}`.
    pub arg_vars: BTreeMap<String, String>,
    /// A number used to distinguish each run; chosen at random when unset.
    pub seed: Option<u32>,
    /// Whether to reset Materialize and external state between scripts.
    pub reset: bool,
    /// Force the use of this temporary directory rather than a random one.
    pub temp_dir: Option<String>,
    /// The source string to attribute errors to.
    pub source: Option<String>,
    /// The default timeout for cancelable operations.
    pub default_timeout: Duration,
    /// The default number of attempts for retriable operations.
    pub default_max_tries: usize,
    /// The initial backoff for retriable operations.
    pub initial_backoff: Duration,
    /// The multiplier applied to the backoff after each retry.
    pub backoff_factor: f64,
    /// The level of consistency checks to run.
    pub consistency_checks: consistency::Level,
    /// Whether to check that executed statements are logged.
    pub check_statement_logging: bool,
    /// Whether to rewrite wrong results in the script instead of failing.
    pub rewrite_results: bool,

    // === Materialize options. ===
    /// The pgwire connection parameters for the Materialize instance.
    pub materialize_pgconfig: tokio_postgres::Config,
    /// The internal pgwire connection parameters for the Materialize instance.
    pub materialize_internal_pgconfig: tokio_postgres::Config,
    /// Whether to use HTTPS for the HTTP endpoints.
    pub materialize_use_https: bool,
    /// The port of the public HTTP endpoint.
    pub materialize_http_port: u16,
    /// The port of the internal HTTP endpoint.
    pub materialize_internal_http_port: u16,
    /// The port of the password-authenticated SQL endpoint.
    pub materialize_password_sql_port: u16,
    /// The port of the SASL-authenticated SQL endpoint.
    pub materialize_sasl_sql_port: u16,
    /// Session parameters to set after connecting.
    pub materialize_params: Vec<(String, String)>,
    /// An optional catalog configuration.
    pub materialize_catalog_config: Option<CatalogConfig>,
    /// Build information for the binary under test.
    pub build_info: &'static BuildInfo,
    /// The configured cluster replica sizes.
    pub materialize_cluster_replica_sizes: ClusterReplicaSizeMap,

    // === Persist options. ===
    /// The URL of the persist consensus system.
    pub persist_consensus_url: Option<SensitiveUrl>,
    /// The URL of the persist blob storage.
    pub persist_blob_url: Option<SensitiveUrl>,

    // === Kafka and schema registry options. ===
    /// The address of the Kafka broker to interact with.
    pub kafka_addr: String,
    /// The default number of partitions for newly created topics.
    pub kafka_default_partitions: usize,
    /// Additional rdkafka options for the broker connection.
    pub kafka_opts: Vec<(String, String)>,
    /// The URL of the schema registry to connect to.
    pub schema_registry_url: Url,
    /// An optional path to a TLS certificate for client authentication.
    pub cert_path: Option<String>,
    /// An optional password for the TLS certificate.
    pub cert_password: Option<String>,
    /// An optional schema registry username.
    pub ccsr_username: Option<String>,
    /// An optional schema registry password.
    pub ccsr_password: Option<String>,

    // === AWS options. ===
    /// The configuration to use when connecting to AWS.
    pub aws_config: SdkConfig,
    /// The ID of the AWS account that `aws_config` configures.
    pub aws_account: String,

    // === Fivetran options. ===
    /// The address of the running Fivetran destination.
    pub fivetran_destination_url: String,
    /// A directory accessible to the Fivetran destination.
    pub fivetran_destination_files_path: String,
}

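/// Connection addresses and clients for the Materialize instance under test.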
pub struct MaterializeState {
    catalog_config: Option<CatalogConfig>,

    sql_addr: String,
    use_https: bool,
    http_addr: String,
    internal_sql_addr: String,
    internal_http_addr: String,
    password_sql_addr: String,
    sasl_sql_addr: String,
    user: String,
    pgclient: tokio_postgres::Client,
    environment_id: EnvironmentId,
    bootstrap_args: BootstrapArgs,
}

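/// The mutable state threaded through the execution of a testdrive script.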
pub struct State {
    pub config: Config,

    arg_vars: BTreeMap<String, String>,
    cmd_vars: BTreeMap<String, String>,
    seed: u32,
    temp_path: PathBuf,
    _tempfile: Option<tempfile::TempDir>,
    default_timeout: Duration,
    timeout: Duration,
    max_tries: usize,
    initial_backoff: Duration,
    backoff_factor: f64,
    consistency_checks: consistency::Level,
    check_statement_logging: bool,
    consistency_checks_adhoc_skip: bool,
    regex: Option<Regex>,
    regex_replacement: String,
    error_line_count: usize,
    error_string: String,

    materialize: MaterializeState,

    persist_consensus_url: Option<SensitiveUrl>,
    persist_blob_url: Option<SensitiveUrl>,
    build_info: &'static BuildInfo,
    persist_clients: PersistClientCache,

    schema_registry_url: Url,
    ccsr_client: mz_ccsr::Client,
    kafka_addr: String,
    kafka_admin: rdkafka::admin::AdminClient<MzClientContext>,
    kafka_admin_opts: rdkafka::admin::AdminOptions,
    kafka_config: ClientConfig,
    kafka_default_partitions: usize,
    kafka_producer: rdkafka::producer::FutureProducer<MzClientContext>,
    kafka_topics: BTreeMap<String, usize>,

    aws_account: String,
    aws_config: SdkConfig,

    mysql_clients: BTreeMap<String, mysql_async::Conn>,
    postgres_clients: BTreeMap<String, tokio_postgres::Client>,
    sql_server_clients: BTreeMap<String, mz_sql_server_util::Client>,

    fivetran_destination_url: String,
    fivetran_destination_files_path: String,

    rewrite_results: bool,
    pub rewrites: Vec<Rewrite>,
    pub rewrite_pos_start: usize,
    pub rewrite_pos_end: usize,
}

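/// A rewrite of the script source collected when result rewriting is enabled:
/// `content` replaces the span between `start` and `end`.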
pub struct Rewrite {
    pub content: String,
    pub start: usize,
    pub end: usize,
}

impl State {
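    /// Populates the built-in `${testdrive.*}`, `${env.*}`, and `${arg.*}`
    /// variables (e.g. `${testdrive.seed}`) that scripts can reference.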
    pub async fn initialize_cmd_vars(&mut self) -> Result<(), anyhow::Error> {
        self.cmd_vars
            .insert("testdrive.kafka-addr".into(), self.kafka_addr.clone());
        self.cmd_vars.insert(
            "testdrive.kafka-addr-resolved".into(),
            self.kafka_addr
                .to_socket_addrs()
                .ok()
                .and_then(|mut addrs| addrs.next())
                .map(|addr| addr.to_string())
                .unwrap_or_else(|| "#RESOLUTION-FAILURE#".into()),
        );
        self.cmd_vars.insert(
            "testdrive.schema-registry-url".into(),
            self.schema_registry_url.to_string(),
        );
        self.cmd_vars
            .insert("testdrive.seed".into(), self.seed.to_string());
        self.cmd_vars.insert(
            "testdrive.temp-dir".into(),
            self.temp_path.display().to_string(),
        );
        self.cmd_vars
            .insert("testdrive.aws-region".into(), self.aws_region().into());
        self.cmd_vars
            .insert("testdrive.aws-endpoint".into(), self.aws_endpoint().into());
        self.cmd_vars
            .insert("testdrive.aws-account".into(), self.aws_account.clone());
        {
            let aws_credentials = self
                .aws_config
                .credentials_provider()
                .ok_or_else(|| anyhow!("no AWS credentials provider configured"))?
                .provide_credentials()
                .await
                .context("fetching AWS credentials")?;
            self.cmd_vars.insert(
                "testdrive.aws-access-key-id".into(),
                aws_credentials.access_key_id().to_owned(),
            );
            self.cmd_vars.insert(
                "testdrive.aws-secret-access-key".into(),
                aws_credentials.secret_access_key().to_owned(),
            );
            self.cmd_vars.insert(
                "testdrive.aws-token".into(),
                aws_credentials
                    .session_token()
                    .map(|token| token.to_owned())
                    .unwrap_or_else(String::new),
            );
        }
        self.cmd_vars.insert(
            "testdrive.materialize-environment-id".into(),
            self.materialize.environment_id.to_string(),
        );
        self.cmd_vars.insert(
            "testdrive.materialize-sql-addr".into(),
            self.materialize.sql_addr.clone(),
        );
        self.cmd_vars.insert(
            "testdrive.materialize-internal-sql-addr".into(),
            self.materialize.internal_sql_addr.clone(),
        );
        self.cmd_vars.insert(
            "testdrive.materialize-password-sql-addr".into(),
            self.materialize.password_sql_addr.clone(),
        );
        self.cmd_vars.insert(
            "testdrive.materialize-sasl-sql-addr".into(),
            self.materialize.sasl_sql_addr.clone(),
        );
        self.cmd_vars.insert(
            "testdrive.materialize-user".into(),
            self.materialize.user.clone(),
        );
        self.cmd_vars.insert(
            "testdrive.fivetran-destination-url".into(),
            self.fivetran_destination_url.clone(),
        );
        self.cmd_vars.insert(
            "testdrive.fivetran-destination-files-path".into(),
            self.fivetran_destination_files_path.clone(),
        );

        for (key, value) in env::vars() {
            self.cmd_vars.insert(format!("env.{}", key), value);
        }

        for (key, value) in &self.arg_vars {
            validate_ident(key)?;
            self.cmd_vars
                .insert(format!("arg.{}", key), value.to_string());
        }

        Ok(())
    }
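
    /// Opens a read-only copy of the persisted catalog and applies `f` to it,
    /// returning `None` when no catalog configuration is available.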
    pub async fn with_catalog_copy<F, T>(
        &self,
        system_parameter_defaults: BTreeMap<String, String>,
        build_info: &'static BuildInfo,
        bootstrap_args: &BootstrapArgs,
        enable_expression_cache_override: Option<bool>,
        f: F,
    ) -> Result<Option<T>, anyhow::Error>
    where
        F: FnOnce(ConnCatalog) -> T,
    {
        async fn persist_client(
            persist_consensus_url: SensitiveUrl,
            persist_blob_url: SensitiveUrl,
            persist_clients: &PersistClientCache,
        ) -> Result<PersistClient, anyhow::Error> {
            let persist_location = PersistLocation {
                blob_uri: persist_blob_url,
                consensus_uri: persist_consensus_url,
            };
            Ok(persist_clients.open(persist_location).await?)
        }

        if let Some(CatalogConfig {
            persist_consensus_url,
            persist_blob_url,
        }) = &self.materialize.catalog_config
        {
            let persist_client = persist_client(
                persist_consensus_url.clone(),
                persist_blob_url.clone(),
                &self.persist_clients,
            )
            .await?;
            let catalog = Catalog::open_debug_read_only_persist_catalog_config(
                persist_client,
                SYSTEM_TIME.clone(),
                self.materialize.environment_id.clone(),
                system_parameter_defaults,
                build_info,
                bootstrap_args,
                enable_expression_cache_override,
            )
            .await?;
            let res = f(catalog.for_session(&Session::dummy()));
            catalog.expire().await;
            Ok(Some(res))
        } else {
            Ok(None)
        }
    }

    /// Returns the configured AWS endpoint, or the empty string if none is
    /// set.
    pub fn aws_endpoint(&self) -> &str {
        self.aws_config.endpoint_url().unwrap_or("")
    }

    /// Returns the configured AWS region, or the empty string if none is set.
    pub fn aws_region(&self) -> &str {
        self.aws_config.region().map(|r| r.as_ref()).unwrap_or("")
    }

    /// Clears the ad hoc "skip consistency checks" flag and returns whether
    /// it was set.
    pub fn clear_skip_consistency_checks(&mut self) -> bool {
        std::mem::replace(&mut self.consistency_checks_adhoc_skip, false)
    }

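    /// Resets Materialize to a pristine state: drops testdrive-created
    /// databases, clusters, and roles, recreates the `materialize` database,
    /// and restores default privileges.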
    pub async fn reset_materialize(&self) -> Result<(), anyhow::Error> {
        let (inner_client, _) = postgres_client(
            &format!(
                "postgres://mz_system:materialize@{}",
                self.materialize.internal_sql_addr
            ),
            self.default_timeout,
        )
        .await?;

        let version = inner_client
            .query_one("SELECT mz_version_num()", &[])
            .await
            .context("getting version of materialize")
            .map(|row| row.get::<_, i32>(0))?;

        let semver = inner_client
            .query_one("SELECT right(split_part(mz_version(), ' ', 1), -1)", &[])
            .await
            .context("getting semver of materialize")
            .map(|row| row.get::<_, String>(0))?
            .parse::<semver::Version>()
            .context("parsing semver of materialize")?;

        inner_client
            .batch_execute("ALTER SYSTEM RESET ALL")
            .await
            .context("resetting materialize state: ALTER SYSTEM RESET ALL")?;

        {
            // The parameter was renamed to `unsafe_enable_unsafe_functions` in
            // v0.128.0; use the old name on earlier versions.
            let rename_version = Version::parse("0.128.0-dev.1").expect("known to be valid");
            let enable_unsafe_functions = if semver >= rename_version {
                "unsafe_enable_unsafe_functions"
            } else {
                "enable_unsafe_functions"
            };
            let res = inner_client
                .batch_execute(&format!("ALTER SYSTEM SET {enable_unsafe_functions} = on"))
                .await
                .context("enabling unsafe functions");
            if let Err(e) = res {
                match e.root_cause().downcast_ref::<DbError>() {
                    Some(e) if *e.code() == SqlState::CANT_CHANGE_RUNTIME_PARAM => {
                        info!(
                            "can't enable unsafe functions because the server is in safe mode; \
                             testdrive scripts will fail if they use unsafe functions",
                        );
                    }
                    _ => return Err(e),
                }
            }
        }

        for row in inner_client
            .query("SHOW DATABASES", &[])
            .await
            .context("resetting materialize state: SHOW DATABASES")?
        {
            let db_name: String = row.get(0);
            if db_name.starts_with("testdrive_no_reset_") {
                continue;
            }
            let query = format!(
                "DROP DATABASE {}",
                postgres_protocol::escape::escape_identifier(&db_name)
            );
            sql::print_query(&query, None);
            inner_client.batch_execute(&query).await.context(format!(
                "resetting materialize state: DROP DATABASE {}",
                db_name,
            ))?;
        }

        // Find all user clusters that no user objects depend on, and are thus
        // safe to drop.
        let inactive_user_clusters = "
        WITH
            active_user_clusters AS
            (
                SELECT DISTINCT cluster_id, object_id
                FROM
                    (
                        SELECT cluster_id, id FROM mz_catalog.mz_sources
                        UNION ALL SELECT cluster_id, id FROM mz_catalog.mz_sinks
                        UNION ALL
                            SELECT cluster_id, id
                            FROM mz_catalog.mz_materialized_views
                        UNION ALL
                            SELECT cluster_id, id FROM mz_catalog.mz_indexes
                        UNION ALL
                            SELECT cluster_id, id
                            FROM mz_internal.mz_subscriptions
                    )
                    AS t (cluster_id, object_id)
                WHERE cluster_id IS NOT NULL AND object_id LIKE 'u%'
            )
        SELECT name
        FROM mz_catalog.mz_clusters
        WHERE
            id NOT IN ( SELECT cluster_id FROM active_user_clusters ) AND id LIKE 'u%'
                AND
            owner_id LIKE 'u%';";

        let inactive_clusters = inner_client
            .query(inactive_user_clusters, &[])
            .await
            .context("resetting materialize state: inactive_user_clusters")?;

        if !inactive_clusters.is_empty() {
            println!("cleaning up user clusters from previous tests...")
        }

        for cluster_name in inactive_clusters {
            let cluster_name: String = cluster_name.get(0);
            if cluster_name.starts_with("testdrive_no_reset_") {
                continue;
            }
            let query = format!(
                "DROP CLUSTER {}",
                postgres_protocol::escape::escape_identifier(&cluster_name)
            );
            sql::print_query(&query, None);
            inner_client.batch_execute(&query).await.context(format!(
                "resetting materialize state: DROP CLUSTER {}",
                cluster_name,
            ))?;
        }

        inner_client
            .batch_execute("CREATE DATABASE materialize")
            .await
            .context("resetting materialize state: CREATE DATABASE materialize")?;

        // Drop all roles except the current user and system roles, ignoring
        // failures on versions that do not support roles.
        if let Ok(rows) = inner_client.query("SELECT name FROM mz_roles", &[]).await {
            for row in rows {
                let role_name: String = row.get(0);
                if role_name == self.materialize.user || role_name.starts_with("mz_") {
                    continue;
                }
                let query = format!(
                    "DROP ROLE {}",
                    postgres_protocol::escape::escape_identifier(&role_name)
                );
                sql::print_query(&query, None);
                inner_client.batch_execute(&query).await.context(format!(
                    "resetting materialize state: DROP ROLE {}",
                    role_name,
                ))?;
            }
        }

        // Restore the default privileges for the testdrive user.
        inner_client
            .batch_execute(&format!(
                "GRANT ALL PRIVILEGES ON SYSTEM TO {}",
                self.materialize.user
            ))
            .await?;

        inner_client
            .batch_execute("GRANT USAGE ON DATABASE materialize TO PUBLIC")
            .await?;
        inner_client
            .batch_execute(&format!(
                "GRANT ALL PRIVILEGES ON DATABASE materialize TO {}",
                self.materialize.user
            ))
            .await?;
        inner_client
            .batch_execute(&format!(
                "GRANT ALL PRIVILEGES ON SCHEMA materialize.public TO {}",
                self.materialize.user
            ))
            .await?;

        // The default cluster was renamed from `default` to `quickstart` in
        // v0.82.0 (version number 8200).
        let cluster = match version {
            ..=8199 => "default",
            8200.. => "quickstart",
        };
        inner_client
            .batch_execute(&format!("GRANT USAGE ON CLUSTER {cluster} TO PUBLIC"))
            .await?;
        inner_client
            .batch_execute(&format!(
                "GRANT ALL PRIVILEGES ON CLUSTER {cluster} TO {}",
                self.materialize.user
            ))
            .await?;

        Ok(())
    }

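    /// Deletes the Kafka topics and schema registry subjects created by
    /// previous testdrive runs (those prefixed with `testdrive-`).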
    pub async fn reset_kafka(&self) -> Result<(), anyhow::Error> {
        let mut errors: Vec<anyhow::Error> = Vec::new();

        let metadata = self.kafka_producer.client().fetch_metadata(
            None,
            Some(std::cmp::max(Duration::from_secs(1), self.default_timeout)),
        )?;

        let testdrive_topics: Vec<_> = metadata
            .topics()
            .iter()
            .filter_map(|t| {
                if t.name().starts_with("testdrive-") {
                    Some(t.name())
                } else {
                    None
                }
            })
            .collect();

        if !testdrive_topics.is_empty() {
            match self
                .kafka_admin
                .delete_topics(&testdrive_topics, &self.kafka_admin_opts)
                .await
            {
                Ok(res) => {
                    if res.len() != testdrive_topics.len() {
                        errors.push(anyhow!(
                            "kafka topic deletion returned {} results, but exactly {} expected",
                            res.len(),
                            testdrive_topics.len()
                        ));
                    }
                    for (res, topic) in res.iter().zip_eq(testdrive_topics.iter()) {
                        match res {
                            Ok(_)
                            | Err((_, rdkafka::types::RDKafkaErrorCode::UnknownTopicOrPartition)) => {
                                ()
                            }
                            Err((_, err)) => {
                                errors.push(anyhow!("unable to delete {}: {}", topic, err));
                            }
                        }
                    }
                }
                Err(e) => {
                    errors.push(e.into());
                }
            };
        }

        match self
            .ccsr_client
            .list_subjects()
            .await
            .context("listing schema registry subjects")
        {
            Ok(subjects) => {
                let testdrive_subjects: Vec<_> = subjects
                    .iter()
                    .filter(|s| s.starts_with("testdrive-"))
                    .collect();

                for subject in testdrive_subjects {
                    match self.ccsr_client.delete_subject(subject).await {
                        Ok(()) | Err(mz_ccsr::DeleteError::SubjectNotFound) => (),
                        Err(e) => errors.push(e.into()),
                    }
                }
            }
            Err(e) => {
                errors.push(e);
            }
        }

        if errors.is_empty() {
            Ok(())
        } else {
            bail!(
                "resetting Kafka and schema registry state: {} errors: {}",
                errors.len(),
                errors
                    .into_iter()
                    .map(|e| e.to_string_with_causes())
                    .join("\n")
            );
        }
    }
}

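/// Configuration for opening the Materialize catalog stored in persist.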
#[derive(Debug, Clone)]
pub struct CatalogConfig {
    /// The URL of the persist consensus system.
    pub persist_consensus_url: SensitiveUrl,
    /// The URL of the persist blob storage.
    pub persist_blob_url: SensitiveUrl,
}

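/// Tells the runner how to proceed after a command executes.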
pub enum ControlFlow {
    /// Continue with the next command.
    Continue,
    /// Begin skipping commands.
    SkipBegin,
    /// Stop skipping commands.
    SkipEnd,
}

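/// A testdrive command that can be run against the current [`State`].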
#[async_trait]
pub(crate) trait Run {
    async fn run(self, state: &mut State) -> Result<ControlFlow, PosError>;
}

#[async_trait]
impl Run for PosCommand {
    async fn run(self, state: &mut State) -> Result<ControlFlow, PosError> {
        macro_rules! handle_version {
            ($version_constraint:expr) => {
                match $version_constraint {
                    Some(VersionConstraint { min, max }) => {
                        match version_check::run_version_check(min, max, state).await {
                            Ok(true) => return Ok(ControlFlow::Continue),
                            Ok(false) => {}
                            Err(err) => return Err(PosError::new(err, self.pos)),
                        }
                    }
                    None => {}
                }
            };
        }

        let wrap_err = |e| PosError::new(e, self.pos);
        // Variables whose prefix matches the current built-in command's name
        // are left unsubstituted here; the command may expand them itself.
        let ignore_prefix = match &self.command {
            Command::Builtin(builtin, _) => Some(builtin.name.clone()),
            _ => None,
        };
        let subst = |msg: &str, vars: &BTreeMap<String, String>| {
            substitute_vars(msg, vars, &ignore_prefix, false).map_err(wrap_err)
        };
        let subst_re = |msg: &str, vars: &BTreeMap<String, String>| {
            substitute_vars(msg, vars, &ignore_prefix, true).map_err(wrap_err)
        };

        let r = match self.command {
            Command::Builtin(mut builtin, version_constraint) => {
                handle_version!(version_constraint);
                for val in builtin.args.values_mut() {
                    *val = subst(val, &state.cmd_vars)?;
                }
                for line in &mut builtin.input {
                    *line = subst(line, &state.cmd_vars)?;
                }
                match builtin.name.as_ref() {
                    "check-consistency" => consistency::run_consistency_checks(state).await,
                    "skip-consistency-checks" => {
                        consistency::skip_consistency_checks(builtin, state)
                    }
                    "check-shard-tombstone" => {
                        consistency::run_check_shard_tombstone(builtin, state).await
                    }
                    "fivetran-destination" => {
                        fivetran::run_destination_command(builtin, state).await
                    }
                    "file-append" => file::run_append(builtin, state).await,
                    "file-delete" => file::run_delete(builtin, state).await,
                    "http-request" => http::run_request(builtin, state).await,
                    "kafka-add-partitions" => kafka::run_add_partitions(builtin, state).await,
                    "kafka-create-topic" => kafka::run_create_topic(builtin, state).await,
                    "kafka-wait-topic" => kafka::run_wait_topic(builtin, state).await,
                    "kafka-delete-records" => kafka::run_delete_records(builtin, state).await,
                    "kafka-delete-topic-flaky" => kafka::run_delete_topic(builtin, state).await,
                    "kafka-ingest" => kafka::run_ingest(builtin, state).await,
                    "kafka-verify-data" => kafka::run_verify_data(builtin, state).await,
                    "kafka-verify-commit" => kafka::run_verify_commit(builtin, state).await,
                    "kafka-verify-topic" => kafka::run_verify_topic(builtin, state).await,
                    "mysql-connect" => mysql::run_connect(builtin, state).await,
                    "mysql-execute" => mysql::run_execute(builtin, state).await,
                    "nop" => nop::run_nop(),
                    "postgres-connect" => postgres::run_connect(builtin, state).await,
                    "postgres-execute" => postgres::run_execute(builtin, state).await,
                    "postgres-verify-slot" => postgres::run_verify_slot(builtin, state).await,
                    "protobuf-compile-descriptors" => {
                        protobuf::run_compile_descriptors(builtin, state).await
                    }
                    "psql-execute" => psql::run_execute(builtin, state).await,
                    "s3-verify-data" => s3::run_verify_data(builtin, state).await,
                    "s3-verify-keys" => s3::run_verify_keys(builtin, state).await,
                    "s3-file-upload" => s3::run_upload(builtin, state).await,
                    "s3-set-presigned-url" => s3::run_set_presigned_url(builtin, state).await,
                    "schema-registry-publish" => schema_registry::run_publish(builtin, state).await,
                    "schema-registry-verify" => schema_registry::run_verify(builtin, state).await,
                    "schema-registry-wait" => schema_registry::run_wait(builtin, state).await,
                    "skip-if" => skip_if::run_skip_if(builtin, state).await,
                    "skip-end" => skip_end::run_skip_end(),
                    "sql-server-connect" => sql_server::run_connect(builtin, state).await,
                    "sql-server-execute" => sql_server::run_execute(builtin, state).await,
                    "sql-server-set-from-sql" => sql_server::run_set_from_sql(builtin, state).await,
                    "persist-force-compaction" => {
                        persist::run_force_compaction(builtin, state).await
                    }
                    "random-sleep" => sleep::run_random_sleep(builtin),
                    "set-regex" => set::run_regex_set(builtin, state),
                    "unset-regex" => set::run_regex_unset(builtin, state),
                    "set-sql-timeout" => set::run_sql_timeout(builtin, state),
                    "set-max-tries" => set::run_max_tries(builtin, state),
                    "sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment" => {
                        sleep::run_sleep(builtin)
                    }
                    "set" => set::set_vars(builtin, state),
                    "set-arg-default" => set::run_set_arg_default(builtin, state),
                    "set-from-sql" => set::run_set_from_sql(builtin, state).await,
                    "set-from-file" => set::run_set_from_file(builtin, state).await,
                    "webhook-append" => webhook::run_append(builtin, state).await,
                    _ => {
                        return Err(PosError::new(
                            anyhow!("unknown built-in command {}", builtin.name),
                            self.pos,
                        ));
                    }
                }
            }
            Command::Sql(mut sql, version_constraint) => {
                handle_version!(version_constraint);
                sql.query = subst(&sql.query, &state.cmd_vars)?;
                if let SqlOutput::Full { expected_rows, .. } = &mut sql.expected_output {
                    for row in expected_rows {
                        for col in row {
                            *col = subst(col, &state.cmd_vars)?;
                        }
                    }
                }
                sql::run_sql(sql, state).await
            }
            Command::FailSql(mut sql, version_constraint) => {
                handle_version!(version_constraint);
                sql.query = subst(&sql.query, &state.cmd_vars)?;
                sql.expected_error = match &sql.expected_error {
                    SqlExpectedError::Contains(s) => {
                        SqlExpectedError::Contains(subst(s, &state.cmd_vars)?)
                    }
                    SqlExpectedError::Exact(s) => {
                        SqlExpectedError::Exact(subst(s, &state.cmd_vars)?)
                    }
                    SqlExpectedError::Regex(s) => {
                        SqlExpectedError::Regex(subst_re(s, &state.cmd_vars)?)
                    }
                    SqlExpectedError::Timeout => SqlExpectedError::Timeout,
                };
                sql::run_fail_sql(sql, state).await
            }
        };

        r.map_err(wrap_err)
    }
}

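/// Substitutes `${var}` references in `msg` with their values from `vars`.
/// Variables under `ignore_prefix` are left untouched, and values are
/// regex-escaped when `regex_escape` is set.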
fn substitute_vars(
    msg: &str,
    vars: &BTreeMap<String, String>,
    ignore_prefix: &Option<String>,
    regex_escape: bool,
) -> Result<String, anyhow::Error> {
    static RE: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"\$\{([^}]+)\}").unwrap());
    let mut err = None;
    let out = RE.replace_all(msg, |caps: &Captures| {
        let name = &caps[1];
        if let Some(ignore_prefix) = &ignore_prefix {
            if name.starts_with(format!("{}.", ignore_prefix).as_str()) {
                // Leave the variable as-is for the command to substitute.
                return caps.get(0).unwrap().as_str().to_string();
            }
        }

        if let Some(val) = vars.get(name) {
            if regex_escape {
                regex::escape(val)
            } else {
                val.to_string()
            }
        } else {
            err = Some(anyhow!("unknown variable: {}", name));
            "#VAR-MISSING#".to_string()
        }
    });
    match err {
        Some(err) => Err(err),
        None => Ok(out.into_owned()),
    }
}

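/// Creates the initial [`State`] from `config`, connecting to Materialize,
/// Kafka, and the schema registry. Also returns a future that reports the
/// outcome of the underlying SQL connection task.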
pub async fn create_state(
    config: &Config,
) -> Result<(State, impl Future<Output = Result<(), anyhow::Error>>), anyhow::Error> {
    let seed = config.seed.unwrap_or_else(rand::random);

    let (_tempfile, temp_path) = match &config.temp_dir {
        Some(temp_dir) => {
            fs::create_dir_all(temp_dir).context("creating temporary directory")?;
            (None, PathBuf::from(&temp_dir))
        }
        _ => {
            // Stash the tempfile handle so that the temporary directory is
            // not cleaned up until the `State` is dropped.
            let tempfile_handle = tempfile::tempdir().context("creating temporary directory")?;
            let temp_path = tempfile_handle.path().to_path_buf();
            (Some(tempfile_handle), temp_path)
        }
    };

    let materialize_catalog_config = config.materialize_catalog_config.clone();

    let materialize_url = util::postgres::config_url(&config.materialize_pgconfig)?;
    info!("Connecting to {}", materialize_url.as_str());
    let (pgclient, pgconn) = Retry::default()
        .max_duration(config.default_timeout)
        .retry_async_canceling(|_| async move {
            let mut pgconfig = config.materialize_pgconfig.clone();
            pgconfig.connect_timeout(config.default_timeout);
            let tls = make_tls(&pgconfig)?;
            pgconfig.connect(tls).await.map_err(|e| anyhow!(e))
        })
        .await?;

    let pgconn_task =
        task::spawn(|| "pgconn_task", pgconn).map(|join| join.context("running SQL connection"));

    let materialize_state =
        create_materialize_state(config, materialize_catalog_config, pgclient).await?;

    let schema_registry_url = config.schema_registry_url.to_owned();

    let ccsr_client = {
        let mut ccsr_config = mz_ccsr::ClientConfig::new(schema_registry_url.clone());

        if let Some(cert_path) = &config.cert_path {
            let cert = fs::read(cert_path).context("reading cert")?;
            let pass = config.cert_password.as_deref().unwrap_or("").to_owned();
            let ident = mz_ccsr::tls::Identity::from_pkcs12_der(cert, pass)
                .context("reading keystore file as pkcs12")?;
            ccsr_config = ccsr_config.identity(ident);
        }

        if let Some(ccsr_username) = &config.ccsr_username {
            ccsr_config = ccsr_config.auth(ccsr_username.clone(), config.ccsr_password.clone());
        }

        ccsr_config.build().context("creating CCSR client")?
    };

    let (kafka_addr, kafka_admin, kafka_admin_opts, kafka_producer, kafka_topics, kafka_config) = {
        use rdkafka::admin::{AdminClient, AdminOptions};
        use rdkafka::producer::FutureProducer;

        let mut kafka_config = create_new_client_config_simple();
        kafka_config.set("bootstrap.servers", &config.kafka_addr);
        kafka_config.set("group.id", "materialize-testdrive");
        kafka_config.set("auto.offset.reset", "earliest");
        kafka_config.set("isolation.level", "read_committed");
        if let Some(cert_path) = &config.cert_path {
            kafka_config.set("security.protocol", "ssl");
            kafka_config.set("ssl.keystore.location", cert_path);
            if let Some(cert_password) = &config.cert_password {
                kafka_config.set("ssl.keystore.password", cert_password);
            }
        }
        kafka_config.set("message.max.bytes", "15728640");

        for (key, value) in &config.kafka_opts {
            kafka_config.set(key, value);
        }

        let admin: AdminClient<_> = kafka_config
            .create_with_context(MzClientContext::default())
            .with_context(|| format!("opening Kafka connection: {}", config.kafka_addr))?;

        let admin_opts = AdminOptions::new().operation_timeout(Some(config.default_timeout));

        let producer: FutureProducer<_> = kafka_config
            .create_with_context(MzClientContext::default())
            .with_context(|| format!("opening Kafka producer connection: {}", config.kafka_addr))?;

        let topics = BTreeMap::new();

        (
            config.kafka_addr.to_owned(),
            admin,
            admin_opts,
            producer,
            topics,
            kafka_config,
        )
    };

    let mut state = State {
        config: config.clone(),

        arg_vars: config.arg_vars.clone(),
        cmd_vars: BTreeMap::new(),
        seed,
        temp_path,
        _tempfile,
        default_timeout: config.default_timeout,
        timeout: config.default_timeout,
        max_tries: config.default_max_tries,
        initial_backoff: config.initial_backoff,
        backoff_factor: config.backoff_factor,
        consistency_checks: config.consistency_checks,
        check_statement_logging: config.check_statement_logging,
        consistency_checks_adhoc_skip: false,
        regex: None,
        regex_replacement: set::DEFAULT_REGEX_REPLACEMENT.into(),
        rewrite_results: config.rewrite_results,
        error_line_count: 0,
        error_string: "".to_string(),

        materialize: materialize_state,

        persist_consensus_url: config.persist_consensus_url.clone(),
        persist_blob_url: config.persist_blob_url.clone(),
        build_info: config.build_info,
        persist_clients: PersistClientCache::new(
            PersistConfig::new_default_configs(config.build_info, SYSTEM_TIME.clone()),
            &MetricsRegistry::new(),
            |_, _| PubSubClientConnection::noop(),
        ),

        schema_registry_url,
        ccsr_client,
        kafka_addr,
        kafka_admin,
        kafka_admin_opts,
        kafka_config,
        kafka_default_partitions: config.kafka_default_partitions,
        kafka_producer,
        kafka_topics,

        aws_account: config.aws_account.clone(),
        aws_config: config.aws_config.clone(),

        mysql_clients: BTreeMap::new(),
        postgres_clients: BTreeMap::new(),
        sql_server_clients: BTreeMap::new(),

        fivetran_destination_url: config.fivetran_destination_url.clone(),
        fivetran_destination_files_path: config.fivetran_destination_files_path.clone(),

        rewrites: Vec::new(),
        rewrite_pos_start: 0,
        rewrite_pos_end: 0,
    };
    state.initialize_cmd_vars().await?;
    Ok((state, pgconn_task))
}

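/// Applies the configured session parameters and derives the various
/// Materialize endpoint addresses to assemble a [`MaterializeState`].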
async fn create_materialize_state(
    config: &Config,
    materialize_catalog_config: Option<CatalogConfig>,
    pgclient: tokio_postgres::Client,
) -> Result<MaterializeState, anyhow::Error> {
    let materialize_url = util::postgres::config_url(&config.materialize_pgconfig)?;
    let materialize_internal_url =
        util::postgres::config_url(&config.materialize_internal_pgconfig)?;

    for (key, value) in &config.materialize_params {
        pgclient
            .batch_execute(&format!("SET {key} = {value}"))
            .await
            .context("setting session parameter")?;
    }

    let materialize_user = config
        .materialize_pgconfig
        .get_user()
        .expect("testdrive URL must contain user")
        .to_string();

    let materialize_sql_addr = format!(
        "{}:{}",
        materialize_url.host_str().unwrap(),
        materialize_url.port().unwrap()
    );
    let materialize_http_addr = format!(
        "{}:{}",
        materialize_url.host_str().unwrap(),
        config.materialize_http_port
    );
    let materialize_internal_sql_addr = format!(
        "{}:{}",
        materialize_internal_url.host_str().unwrap(),
        materialize_internal_url.port().unwrap()
    );
    let materialize_password_sql_addr = format!(
        "{}:{}",
        materialize_url.host_str().unwrap(),
        config.materialize_password_sql_port
    );
    let materialize_sasl_sql_addr = format!(
        "{}:{}",
        materialize_url.host_str().unwrap(),
        config.materialize_sasl_sql_port
    );
    let materialize_internal_http_addr = format!(
        "{}:{}",
        materialize_internal_url.host_str().unwrap(),
        config.materialize_internal_http_port
    );
    let environment_id = pgclient
        .query_one("SELECT mz_environment_id()", &[])
        .await?
        .get::<_, String>(0)
        .parse()
        .context("parsing environment ID")?;

    let bootstrap_args = BootstrapArgs {
        cluster_replica_size_map: config.materialize_cluster_replica_sizes.clone(),
        default_cluster_replica_size: "ABC".to_string(),
        default_cluster_replication_factor: 1,
        bootstrap_role: None,
    };

    let materialize_state = MaterializeState {
        catalog_config: materialize_catalog_config,
        sql_addr: materialize_sql_addr,
        use_https: config.materialize_use_https,
        http_addr: materialize_http_addr,
        internal_sql_addr: materialize_internal_sql_addr,
        internal_http_addr: materialize_internal_http_addr,
        password_sql_addr: materialize_password_sql_addr,
        sasl_sql_addr: materialize_sasl_sql_addr,
        user: materialize_user,
        pgclient,
        environment_id,
        bootstrap_args,
    };

    Ok(materialize_state)
}