use std::collections::BTreeMap;
use std::future::Future;
use std::net::ToSocketAddrs;
use std::path::PathBuf;
use std::sync::LazyLock;
use std::time::Duration;
use std::{env, fs};

use anyhow::{Context, anyhow, bail};
use async_trait::async_trait;
use aws_credential_types::provider::ProvideCredentials;
use aws_types::SdkConfig;
use futures::future::FutureExt;
use itertools::Itertools;
use mz_adapter::catalog::{Catalog, ConnCatalog};
use mz_adapter::session::Session;
use mz_build_info::BuildInfo;
use mz_catalog::config::ClusterReplicaSizeMap;
use mz_catalog::durable::BootstrapArgs;
use mz_kafka_util::client::{MzClientContext, create_new_client_config_simple};
use mz_ore::error::ErrorExt;
use mz_ore::metrics::MetricsRegistry;
use mz_ore::now::SYSTEM_TIME;
use mz_ore::retry::Retry;
use mz_ore::task;
use mz_ore::url::SensitiveUrl;
use mz_persist_client::cache::PersistClientCache;
use mz_persist_client::cfg::PersistConfig;
use mz_persist_client::rpc::PubSubClientConnection;
use mz_persist_client::{PersistClient, PersistLocation};
use mz_sql::catalog::EnvironmentId;
use mz_tls_util::make_tls;
use rdkafka::ClientConfig;
use rdkafka::producer::Producer;
use regex::{Captures, Regex};
use semver::Version;
use tokio_postgres::error::{DbError, SqlState};
use tracing::info;
use url::Url;

use crate::error::PosError;
use crate::parser::{
    Command, PosCommand, SqlExpectedError, SqlOutput, VersionConstraint, validate_ident,
};
use crate::util;
use crate::util::postgres::postgres_client;

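/// Consistency checks that can be run against the state of the system under test.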
pub mod consistency;

mod duckdb;
mod file;
mod fivetran;
mod http;
mod kafka;
mod mysql;
mod nop;
mod persist;
mod postgres;
mod protobuf;
mod psql;
mod s3;
mod schema_registry;
mod set;
mod skip_end;
mod skip_if;
mod sleep;
mod sql;
mod sql_server;
mod version_check;
mod webhook;

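/// Configuration for a testdrive run.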
#[derive(Debug, Clone)]
pub struct Config {
    pub arg_vars: BTreeMap<String, String>,
    pub seed: Option<u32>,
    pub reset: bool,
    pub temp_dir: Option<String>,
    pub source: Option<String>,
    pub default_timeout: Duration,
    pub default_max_tries: usize,
    pub initial_backoff: Duration,
    pub backoff_factor: f64,
    pub consistency_checks: consistency::Level,
    pub check_statement_logging: bool,
    pub rewrite_results: bool,

    pub materialize_pgconfig: tokio_postgres::Config,
    pub materialize_internal_pgconfig: tokio_postgres::Config,
    pub materialize_use_https: bool,
    pub materialize_http_port: u16,
    pub materialize_internal_http_port: u16,
    pub materialize_password_sql_port: u16,
    pub materialize_sasl_sql_port: u16,
    pub materialize_params: Vec<(String, String)>,
    pub materialize_catalog_config: Option<CatalogConfig>,
    pub build_info: &'static BuildInfo,
    pub materialize_cluster_replica_sizes: ClusterReplicaSizeMap,

    pub persist_consensus_url: Option<SensitiveUrl>,
    pub persist_blob_url: Option<SensitiveUrl>,

    pub kafka_addr: String,
    pub kafka_default_partitions: usize,
    pub kafka_opts: Vec<(String, String)>,
    pub schema_registry_url: Url,
    pub cert_path: Option<String>,
    pub cert_password: Option<String>,
    pub ccsr_username: Option<String>,
    pub ccsr_password: Option<String>,

    pub aws_config: SdkConfig,
    pub aws_account: String,

    pub fivetran_destination_url: String,
    pub fivetran_destination_files_path: String,
}

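/// Connection details and state for the Materialize instance under test.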
pub struct MaterializeState {
    catalog_config: Option<CatalogConfig>,

    sql_addr: String,
    use_https: bool,
    http_addr: String,
    internal_sql_addr: String,
    internal_http_addr: String,
    password_sql_addr: String,
    sasl_sql_addr: String,
    user: String,
    pgclient: tokio_postgres::Client,
    environment_id: EnvironmentId,
    bootstrap_args: BootstrapArgs,
}

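/// The mutable state that is threaded through the execution of a testdrive script.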
pub struct State {
    pub config: Config,

    arg_vars: BTreeMap<String, String>,
    cmd_vars: BTreeMap<String, String>,
    seed: u32,
    temp_path: PathBuf,
    _tempfile: Option<tempfile::TempDir>,
    default_timeout: Duration,
    timeout: Duration,
    max_tries: usize,
    initial_backoff: Duration,
    backoff_factor: f64,
    consistency_checks: consistency::Level,
    check_statement_logging: bool,
    consistency_checks_adhoc_skip: bool,
    regex: Option<Regex>,
    regex_replacement: String,
    error_line_count: usize,
    error_string: String,

    materialize: MaterializeState,

    persist_consensus_url: Option<SensitiveUrl>,
    persist_blob_url: Option<SensitiveUrl>,
    build_info: &'static BuildInfo,
    persist_clients: PersistClientCache,

    schema_registry_url: Url,
    ccsr_client: mz_ccsr::Client,
    kafka_addr: String,
    kafka_admin: rdkafka::admin::AdminClient<MzClientContext>,
    kafka_admin_opts: rdkafka::admin::AdminOptions,
    kafka_config: ClientConfig,
    kafka_default_partitions: usize,
    kafka_producer: rdkafka::producer::FutureProducer<MzClientContext>,
    kafka_topics: BTreeMap<String, usize>,

    aws_account: String,
    aws_config: SdkConfig,

    pub duckdb_clients: BTreeMap<String, std::sync::Arc<std::sync::Mutex<::duckdb::Connection>>>,
    mysql_clients: BTreeMap<String, mysql_async::Conn>,
    postgres_clients: BTreeMap<String, tokio_postgres::Client>,
    sql_server_clients: BTreeMap<String, mz_sql_server_util::Client>,

    fivetran_destination_url: String,
    fivetran_destination_files_path: String,

    rewrite_results: bool,
    pub rewrites: Vec<Rewrite>,
    pub rewrite_pos_start: usize,
    pub rewrite_pos_end: usize,
}

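/// A queued rewrite of a span of the input, produced when rewriting expected results.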
pub struct Rewrite {
    pub content: String,
    pub start: usize,
    pub end: usize,
}

impl State {
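    /// Populates `cmd_vars` with the built-in `testdrive.*` variables as well as
    /// the `env.*` and `arg.*` variables derived from the process environment and
    /// the configured argument variables.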
    pub async fn initialize_cmd_vars(&mut self) -> Result<(), anyhow::Error> {
        self.cmd_vars
            .insert("testdrive.kafka-addr".into(), self.kafka_addr.clone());
        self.cmd_vars.insert(
            "testdrive.kafka-addr-resolved".into(),
            self.kafka_addr
                .to_socket_addrs()
                .ok()
                .and_then(|mut addrs| addrs.next())
                .map(|addr| addr.to_string())
                .unwrap_or_else(|| "#RESOLUTION-FAILURE#".into()),
        );
        self.cmd_vars.insert(
            "testdrive.schema-registry-url".into(),
            self.schema_registry_url.to_string(),
        );
        self.cmd_vars
            .insert("testdrive.seed".into(), self.seed.to_string());
        self.cmd_vars.insert(
            "testdrive.temp-dir".into(),
            self.temp_path.display().to_string(),
        );
        self.cmd_vars
            .insert("testdrive.aws-region".into(), self.aws_region().into());
        self.cmd_vars
            .insert("testdrive.aws-endpoint".into(), self.aws_endpoint().into());
        self.cmd_vars
            .insert("testdrive.aws-account".into(), self.aws_account.clone());
        {
            let aws_credentials = self
                .aws_config
                .credentials_provider()
                .ok_or_else(|| anyhow!("no AWS credentials provider configured"))?
                .provide_credentials()
                .await
                .context("fetching AWS credentials")?;
            self.cmd_vars.insert(
                "testdrive.aws-access-key-id".into(),
                aws_credentials.access_key_id().to_owned(),
            );
            self.cmd_vars.insert(
                "testdrive.aws-secret-access-key".into(),
                aws_credentials.secret_access_key().to_owned(),
            );
            self.cmd_vars.insert(
                "testdrive.aws-token".into(),
                aws_credentials
                    .session_token()
                    .map(|token| token.to_owned())
                    .unwrap_or_else(String::new),
            );
        }
        self.cmd_vars.insert(
            "testdrive.materialize-environment-id".into(),
            self.materialize.environment_id.to_string(),
        );
        self.cmd_vars.insert(
            "testdrive.materialize-sql-addr".into(),
            self.materialize.sql_addr.clone(),
        );
        self.cmd_vars.insert(
            "testdrive.materialize-internal-sql-addr".into(),
            self.materialize.internal_sql_addr.clone(),
        );
        self.cmd_vars.insert(
            "testdrive.materialize-password-sql-addr".into(),
            self.materialize.password_sql_addr.clone(),
        );
        self.cmd_vars.insert(
            "testdrive.materialize-sasl-sql-addr".into(),
            self.materialize.sasl_sql_addr.clone(),
        );
        self.cmd_vars.insert(
            "testdrive.materialize-user".into(),
            self.materialize.user.clone(),
        );
        self.cmd_vars.insert(
            "testdrive.fivetran-destination-url".into(),
            self.fivetran_destination_url.clone(),
        );
        self.cmd_vars.insert(
            "testdrive.fivetran-destination-files-path".into(),
            self.fivetran_destination_files_path.clone(),
        );

        for (key, value) in env::vars() {
            self.cmd_vars.insert(format!("env.{}", key), value);
        }

        for (key, value) in &self.arg_vars {
            validate_ident(key)?;
            self.cmd_vars
                .insert(format!("arg.{}", key), value.to_string());
        }

        Ok(())
    }
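
    /// Opens a read-only copy of the durable catalog, if a catalog configuration is
    /// available, and applies `f` to a view of it scoped to a dummy session.
    /// Returns `None` when no catalog configuration was provided.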
    pub async fn with_catalog_copy<F, T>(
        &self,
        system_parameter_defaults: BTreeMap<String, String>,
        build_info: &'static BuildInfo,
        bootstrap_args: &BootstrapArgs,
        enable_expression_cache_override: Option<bool>,
        f: F,
    ) -> Result<Option<T>, anyhow::Error>
    where
        F: FnOnce(ConnCatalog) -> T,
    {
        async fn persist_client(
            persist_consensus_url: SensitiveUrl,
            persist_blob_url: SensitiveUrl,
            persist_clients: &PersistClientCache,
        ) -> Result<PersistClient, anyhow::Error> {
            let persist_location = PersistLocation {
                blob_uri: persist_blob_url,
                consensus_uri: persist_consensus_url,
            };
            Ok(persist_clients.open(persist_location).await?)
        }

        if let Some(CatalogConfig {
            persist_consensus_url,
            persist_blob_url,
        }) = &self.materialize.catalog_config
        {
            let persist_client = persist_client(
                persist_consensus_url.clone(),
                persist_blob_url.clone(),
                &self.persist_clients,
            )
            .await?;
            let catalog = Catalog::open_debug_read_only_persist_catalog_config(
                persist_client,
                SYSTEM_TIME.clone(),
                self.materialize.environment_id.clone(),
                system_parameter_defaults,
                build_info,
                bootstrap_args,
                enable_expression_cache_override,
            )
            .await?;
            let res = f(catalog.for_session(&Session::dummy()));
            catalog.expire().await;
            Ok(Some(res))
        } else {
            Ok(None)
        }
    }

    pub fn aws_endpoint(&self) -> &str {
        self.aws_config.endpoint_url().unwrap_or("")
    }

    pub fn aws_region(&self) -> &str {
        self.aws_config.region().map(|r| r.as_ref()).unwrap_or("")
    }

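    /// Clears the ad hoc "skip consistency checks" flag, returning whether it was set.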
    pub fn clear_skip_consistency_checks(&mut self) -> bool {
        std::mem::replace(&mut self.consistency_checks_adhoc_skip, false)
    }

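    /// Resets the Materialize instance to a clean state: resets system parameters,
    /// drops databases, idle user clusters, and roles left over from previous runs,
    /// then recreates the `materialize` database and re-grants privileges to the
    /// testdrive user.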
    pub async fn reset_materialize(&self) -> Result<(), anyhow::Error> {
        let (inner_client, _) = postgres_client(
            &format!(
                "postgres://mz_system:materialize@{}",
                self.materialize.internal_sql_addr
            ),
            self.default_timeout,
        )
        .await?;

        let version = inner_client
            .query_one("SELECT mz_version_num()", &[])
            .await
            .context("getting version of materialize")
            .map(|row| row.get::<_, i32>(0))?;

        let semver = inner_client
            .query_one("SELECT right(split_part(mz_version(), ' ', 1), -1)", &[])
            .await
            .context("getting semver of materialize")
            .map(|row| row.get::<_, String>(0))?
            .parse::<semver::Version>()
            .context("parsing semver of materialize")?;

        inner_client
            .batch_execute("ALTER SYSTEM RESET ALL")
            .await
            .context("resetting materialize state: ALTER SYSTEM RESET ALL")?;

        {
            let rename_version = Version::parse("0.128.0-dev.1").expect("known to be valid");
            let enable_unsafe_functions = if semver >= rename_version {
                "unsafe_enable_unsafe_functions"
            } else {
                "enable_unsafe_functions"
            };
            let res = inner_client
                .batch_execute(&format!("ALTER SYSTEM SET {enable_unsafe_functions} = on"))
                .await
                .context("enabling dangerous functions");
            if let Err(e) = res {
                match e.root_cause().downcast_ref::<DbError>() {
                    Some(e) if *e.code() == SqlState::CANT_CHANGE_RUNTIME_PARAM => {
                        info!(
                            "can't enable unsafe functions because the server is in safe mode; \
                             testdrive scripts will fail if they use unsafe functions",
                        );
                    }
                    _ => return Err(e),
                }
            }
        }

        for row in inner_client
            .query("SHOW DATABASES", &[])
            .await
            .context("resetting materialize state: SHOW DATABASES")?
        {
            let db_name: String = row.get(0);
            if db_name.starts_with("testdrive_no_reset_") {
                continue;
            }
            let query = format!(
                "DROP DATABASE {}",
                postgres_protocol::escape::escape_identifier(&db_name)
            );
            sql::print_query(&query, None);
            inner_client.batch_execute(&query).await.context(format!(
                "resetting materialize state: DROP DATABASE {}",
                db_name,
            ))?;
        }

        let inactive_user_clusters = "
        WITH
            active_user_clusters AS
            (
                SELECT DISTINCT cluster_id, object_id
                FROM
                    (
                        SELECT cluster_id, id FROM mz_catalog.mz_sources
                        UNION ALL SELECT cluster_id, id FROM mz_catalog.mz_sinks
                        UNION ALL
                            SELECT cluster_id, id
                            FROM mz_catalog.mz_materialized_views
                        UNION ALL
                            SELECT cluster_id, id FROM mz_catalog.mz_indexes
                        UNION ALL
                            SELECT cluster_id, id
                            FROM mz_internal.mz_subscriptions
                    )
                    AS t (cluster_id, object_id)
                WHERE cluster_id IS NOT NULL AND object_id LIKE 'u%'
            )
        SELECT name
        FROM mz_catalog.mz_clusters
        WHERE
            id NOT IN ( SELECT cluster_id FROM active_user_clusters ) AND id LIKE 'u%'
                AND
            owner_id LIKE 'u%';";

        let inactive_clusters = inner_client
            .query(inactive_user_clusters, &[])
            .await
            .context("resetting materialize state: inactive_user_clusters")?;

        if !inactive_clusters.is_empty() {
            println!("cleaning up user clusters from previous tests...")
        }

        for cluster_name in inactive_clusters {
            let cluster_name: String = cluster_name.get(0);
            if cluster_name.starts_with("testdrive_no_reset_") {
                continue;
            }
            let query = format!(
                "DROP CLUSTER {}",
                postgres_protocol::escape::escape_identifier(&cluster_name)
            );
            sql::print_query(&query, None);
            inner_client.batch_execute(&query).await.context(format!(
                "resetting materialize state: DROP CLUSTER {}",
                cluster_name,
            ))?;
        }

        inner_client
            .batch_execute("CREATE DATABASE materialize")
            .await
            .context("resetting materialize state: CREATE DATABASE materialize")?;

        if let Ok(rows) = inner_client.query("SELECT name FROM mz_roles", &[]).await {
            for row in rows {
                let role_name: String = row.get(0);
                if role_name == self.materialize.user || role_name.starts_with("mz_") {
                    continue;
                }
                let query = format!(
                    "DROP ROLE {}",
                    postgres_protocol::escape::escape_identifier(&role_name)
                );
                sql::print_query(&query, None);
                inner_client.batch_execute(&query).await.context(format!(
                    "resetting materialize state: DROP ROLE {}",
                    role_name,
                ))?;
            }
        }

        inner_client
            .batch_execute(&format!(
                "GRANT ALL PRIVILEGES ON SYSTEM TO {}",
                self.materialize.user
            ))
            .await?;

        inner_client
            .batch_execute("GRANT USAGE ON DATABASE materialize TO PUBLIC")
            .await?;
        inner_client
            .batch_execute(&format!(
                "GRANT ALL PRIVILEGES ON DATABASE materialize TO {}",
                self.materialize.user
            ))
            .await?;
        inner_client
            .batch_execute(&format!(
                "GRANT ALL PRIVILEGES ON SCHEMA materialize.public TO {}",
                self.materialize.user
            ))
            .await?;

        let cluster = match version {
            ..=8199 => "default",
            8200.. => "quickstart",
        };
        inner_client
            .batch_execute(&format!("GRANT USAGE ON CLUSTER {cluster} TO PUBLIC"))
            .await?;
        inner_client
            .batch_execute(&format!(
                "GRANT ALL PRIVILEGES ON CLUSTER {cluster} TO {}",
                self.materialize.user
            ))
            .await?;

        Ok(())
    }

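    /// Deletes Kafka topics and schema registry subjects with a `testdrive-` prefix
    /// that were left over from previous runs.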
    pub async fn reset_kafka(&self) -> Result<(), anyhow::Error> {
        let mut errors: Vec<anyhow::Error> = Vec::new();

        let metadata = self.kafka_producer.client().fetch_metadata(
            None,
            Some(std::cmp::max(Duration::from_secs(1), self.default_timeout)),
        )?;

        let testdrive_topics: Vec<_> = metadata
            .topics()
            .iter()
            .filter_map(|t| {
                if t.name().starts_with("testdrive-") {
                    Some(t.name())
                } else {
                    None
                }
            })
            .collect();

        if !testdrive_topics.is_empty() {
            match self
                .kafka_admin
                .delete_topics(&testdrive_topics, &self.kafka_admin_opts)
                .await
            {
                Ok(res) => {
                    if res.len() != testdrive_topics.len() {
                        errors.push(anyhow!(
                            "kafka topic deletion returned {} results, but exactly {} expected",
                            res.len(),
                            testdrive_topics.len()
                        ));
                    }
                    for (res, topic) in res.iter().zip_eq(testdrive_topics.iter()) {
                        match res {
                            Ok(_)
                            | Err((_, rdkafka::types::RDKafkaErrorCode::UnknownTopicOrPartition)) => {
                                ()
                            }
                            Err((_, err)) => {
                                errors.push(anyhow!("unable to delete {}: {}", topic, err));
                            }
                        }
                    }
                }
                Err(e) => {
                    errors.push(e.into());
                }
            };
        }

        match self
            .ccsr_client
            .list_subjects()
            .await
            .context("listing schema registry subjects")
        {
            Ok(subjects) => {
                let testdrive_subjects: Vec<_> = subjects
                    .iter()
                    .filter(|s| s.starts_with("testdrive-"))
                    .collect();

                for subject in testdrive_subjects {
                    match self.ccsr_client.delete_subject(subject).await {
                        Ok(()) | Err(mz_ccsr::DeleteError::SubjectNotFound) => (),
                        Err(e) => errors.push(e.into()),
                    }
                }
            }
            Err(e) => {
                errors.push(e);
            }
        }

        if errors.is_empty() {
            Ok(())
        } else {
            bail!(
                "deleting Kafka topics: {} errors: {}",
                errors.len(),
                errors
                    .into_iter()
                    .map(|e| e.to_string_with_causes())
                    .join("\n")
            );
        }
    }
}

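/// Connection information for the persist-backed catalog of the Materialize
/// instance under test.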
#[derive(Debug, Clone)]
pub struct CatalogConfig {
    pub persist_consensus_url: SensitiveUrl,
    pub persist_blob_url: SensitiveUrl,
}

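/// What the harness should do after a command runs: continue normally, or begin or
/// end skipping subsequent commands.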
pub enum ControlFlow {
    Continue,
    SkipBegin,
    SkipEnd,
}

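/// A parsed command that can be executed against the harness [`State`].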
#[async_trait]
pub(crate) trait Run {
    async fn run(self, state: &mut State) -> Result<ControlFlow, PosError>;
}

#[async_trait]
impl Run for PosCommand {
    async fn run(self, state: &mut State) -> Result<ControlFlow, PosError> {
        macro_rules! handle_version {
            ($version_constraint:expr) => {
                match $version_constraint {
                    Some(VersionConstraint { min, max }) => {
                        match version_check::run_version_check(min, max, state).await {
                            Ok(true) => return Ok(ControlFlow::Continue),
                            Ok(false) => {}
                            Err(err) => return Err(PosError::new(err, self.pos)),
                        }
                    }
                    None => {}
                }
            };
        }

        let wrap_err = |e| PosError::new(e, self.pos);
        let ignore_prefix = match &self.command {
            Command::Builtin(builtin, _) => Some(builtin.name.clone()),
            _ => None,
        };
        let subst = |msg: &str, vars: &BTreeMap<String, String>| {
            substitute_vars(msg, vars, &ignore_prefix, false).map_err(wrap_err)
        };
        let subst_re = |msg: &str, vars: &BTreeMap<String, String>| {
            substitute_vars(msg, vars, &ignore_prefix, true).map_err(wrap_err)
        };

        let r = match self.command {
            Command::Builtin(mut builtin, version_constraint) => {
                handle_version!(version_constraint);
                for val in builtin.args.values_mut() {
                    *val = subst(val, &state.cmd_vars)?;
                }
                for line in &mut builtin.input {
                    *line = subst(line, &state.cmd_vars)?;
                }
                match builtin.name.as_ref() {
                    "check-consistency" => consistency::run_consistency_checks(state).await,
                    "skip-consistency-checks" => {
                        consistency::skip_consistency_checks(builtin, state)
                    }
                    "check-shard-tombstone" => {
                        consistency::run_check_shard_tombstone(builtin, state).await
                    }
                    "duckdb-execute" => duckdb::run_execute(builtin, state).await,
                    "duckdb-query" => duckdb::run_query(builtin, state).await,
                    "fivetran-destination" => {
                        fivetran::run_destination_command(builtin, state).await
                    }
                    "file-append" => file::run_append(builtin, state).await,
                    "file-delete" => file::run_delete(builtin, state).await,
                    "http-request" => http::run_request(builtin, state).await,
                    "kafka-add-partitions" => kafka::run_add_partitions(builtin, state).await,
                    "kafka-create-topic" => kafka::run_create_topic(builtin, state).await,
                    "kafka-wait-topic" => kafka::run_wait_topic(builtin, state).await,
                    "kafka-delete-records" => kafka::run_delete_records(builtin, state).await,
                    "kafka-delete-topic-flaky" => kafka::run_delete_topic(builtin, state).await,
                    "kafka-ingest" => kafka::run_ingest(builtin, state).await,
                    "kafka-verify-data" => kafka::run_verify_data(builtin, state).await,
                    "kafka-verify-commit" => kafka::run_verify_commit(builtin, state).await,
                    "kafka-verify-topic" => kafka::run_verify_topic(builtin, state).await,
                    "mysql-connect" => mysql::run_connect(builtin, state).await,
                    "mysql-execute" => mysql::run_execute(builtin, state).await,
                    "nop" => nop::run_nop(),
                    "postgres-connect" => postgres::run_connect(builtin, state).await,
                    "postgres-execute" => postgres::run_execute(builtin, state).await,
                    "postgres-verify-slot" => postgres::run_verify_slot(builtin, state).await,
                    "protobuf-compile-descriptors" => {
                        protobuf::run_compile_descriptors(builtin, state).await
                    }
                    "psql-execute" => psql::run_execute(builtin, state).await,
                    "s3-verify-data" => s3::run_verify_data(builtin, state).await,
                    "s3-verify-keys" => s3::run_verify_keys(builtin, state).await,
                    "s3-file-upload" => s3::run_upload(builtin, state).await,
                    "s3-set-presigned-url" => s3::run_set_presigned_url(builtin, state).await,
                    "schema-registry-publish" => schema_registry::run_publish(builtin, state).await,
                    "schema-registry-verify" => schema_registry::run_verify(builtin, state).await,
                    "schema-registry-wait" => schema_registry::run_wait(builtin, state).await,
                    "skip-if" => skip_if::run_skip_if(builtin, state).await,
                    "skip-end" => skip_end::run_skip_end(),
                    "sql-server-connect" => sql_server::run_connect(builtin, state).await,
                    "sql-server-execute" => sql_server::run_execute(builtin, state).await,
                    "sql-server-set-from-sql" => sql_server::run_set_from_sql(builtin, state).await,
                    "persist-force-compaction" => {
                        persist::run_force_compaction(builtin, state).await
                    }
                    "random-sleep" => sleep::run_random_sleep(builtin),
                    "set-regex" => set::run_regex_set(builtin, state),
                    "unset-regex" => set::run_regex_unset(builtin, state),
                    "set-sql-timeout" => set::run_sql_timeout(builtin, state),
                    "set-max-tries" => set::run_max_tries(builtin, state),
                    "sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment" => {
                        sleep::run_sleep(builtin)
                    }
                    "set" => set::set_vars(builtin, state),
                    "set-arg-default" => set::run_set_arg_default(builtin, state),
                    "set-from-sql" => set::run_set_from_sql(builtin, state).await,
                    "set-from-file" => set::run_set_from_file(builtin, state).await,
                    "webhook-append" => webhook::run_append(builtin, state).await,
                    _ => {
                        return Err(PosError::new(
                            anyhow!("unknown built-in command {}", builtin.name),
                            self.pos,
                        ));
                    }
                }
            }
            Command::Sql(mut sql, version_constraint) => {
                handle_version!(version_constraint);
                sql.query = subst(&sql.query, &state.cmd_vars)?;
                if let SqlOutput::Full { expected_rows, .. } = &mut sql.expected_output {
                    for row in expected_rows {
                        for col in row {
                            *col = subst(col, &state.cmd_vars)?;
                        }
                    }
                }
                sql::run_sql(sql, state).await
            }
            Command::FailSql(mut sql, version_constraint) => {
                handle_version!(version_constraint);
                sql.query = subst(&sql.query, &state.cmd_vars)?;
                sql.expected_error = match &sql.expected_error {
                    SqlExpectedError::Contains(s) => {
                        SqlExpectedError::Contains(subst(s, &state.cmd_vars)?)
                    }
                    SqlExpectedError::Exact(s) => {
                        SqlExpectedError::Exact(subst(s, &state.cmd_vars)?)
                    }
                    SqlExpectedError::Regex(s) => {
                        SqlExpectedError::Regex(subst_re(s, &state.cmd_vars)?)
                    }
                    SqlExpectedError::Timeout => SqlExpectedError::Timeout,
                };
                sql::run_fail_sql(sql, state).await
            }
        };

        r.map_err(wrap_err)
    }
}

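/// Substitutes `${name}` references in `msg` with the corresponding values from
/// `vars`, regex-escaping the substituted values when `regex_escape` is true.
/// References whose name starts with `ignore_prefix` followed by a dot are left
/// untouched. For example, `${arg.table}` is replaced with the value stored under
/// the key `arg.table`; unknown names produce an error.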
fn substitute_vars(
    msg: &str,
    vars: &BTreeMap<String, String>,
    ignore_prefix: &Option<String>,
    regex_escape: bool,
) -> Result<String, anyhow::Error> {
    static RE: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"\$\{([^}]+)\}").unwrap());
    let mut err = None;
    let out = RE.replace_all(msg, |caps: &Captures| {
        let name = &caps[1];
        if let Some(ignore_prefix) = &ignore_prefix {
            if name.starts_with(format!("{}.", ignore_prefix).as_str()) {
                return caps.get(0).unwrap().as_str().to_string();
            }
        }

        if let Some(val) = vars.get(name) {
            if regex_escape {
                regex::escape(val)
            } else {
                val.to_string()
            }
        } else {
            err = Some(anyhow!("unknown variable: {}", name));
            "#VAR-MISSING#".to_string()
        }
    });
    match err {
        Some(err) => Err(err),
        None => Ok(out.into_owned()),
    }
}

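/// Builds the initial [`State`] for a run from the given [`Config`], setting up the
/// SQL, schema registry, and Kafka clients. Also returns a future that resolves when
/// the spawned SQL connection task ends, surfacing any connection error.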
pub async fn create_state(
    config: &Config,
) -> Result<(State, impl Future<Output = Result<(), anyhow::Error>>), anyhow::Error> {
    let seed = config.seed.unwrap_or_else(rand::random);

    let (_tempfile, temp_path) = match &config.temp_dir {
        Some(temp_dir) => {
            fs::create_dir_all(temp_dir).context("creating temporary directory")?;
            (None, PathBuf::from(&temp_dir))
        }
        _ => {
            let tempfile_handle = tempfile::tempdir().context("creating temporary directory")?;
            let temp_path = tempfile_handle.path().to_path_buf();
            (Some(tempfile_handle), temp_path)
        }
    };

    let materialize_catalog_config = config.materialize_catalog_config.clone();

    let materialize_url = util::postgres::config_url(&config.materialize_pgconfig)?;
    info!("Connecting to {}", materialize_url.as_str());
    let (pgclient, pgconn) = Retry::default()
        .max_duration(config.default_timeout)
        .retry_async_canceling(|_| async move {
            let mut pgconfig = config.materialize_pgconfig.clone();
            pgconfig.connect_timeout(config.default_timeout);
            let tls = make_tls(&pgconfig)?;
            pgconfig.connect(tls).await.map_err(|e| anyhow!(e))
        })
        .await?;

    let pgconn_task =
        task::spawn(|| "pgconn_task", pgconn).map(|join| join.context("running SQL connection"));

    let materialize_state =
        create_materialize_state(&config, materialize_catalog_config, pgclient).await?;

    let schema_registry_url = config.schema_registry_url.to_owned();

    let ccsr_client = {
        let mut ccsr_config = mz_ccsr::ClientConfig::new(schema_registry_url.clone());

        if let Some(cert_path) = &config.cert_path {
            let cert = fs::read(cert_path).context("reading cert")?;
            let pass = config.cert_password.as_deref().unwrap_or("").to_owned();
            let ident = mz_ccsr::tls::Identity::from_pkcs12_der(cert, pass)
                .context("reading keystore file as pkcs12")?;
            ccsr_config = ccsr_config.identity(ident);
        }

        if let Some(ccsr_username) = &config.ccsr_username {
            ccsr_config = ccsr_config.auth(ccsr_username.clone(), config.ccsr_password.clone());
        }

        ccsr_config.build().context("Creating CCSR client")?
    };

    let (kafka_addr, kafka_admin, kafka_admin_opts, kafka_producer, kafka_topics, kafka_config) = {
        use rdkafka::admin::{AdminClient, AdminOptions};
        use rdkafka::producer::FutureProducer;

        let mut kafka_config = create_new_client_config_simple();
        kafka_config.set("bootstrap.servers", &config.kafka_addr);
        kafka_config.set("group.id", "materialize-testdrive");
        kafka_config.set("auto.offset.reset", "earliest");
        kafka_config.set("isolation.level", "read_committed");
        if let Some(cert_path) = &config.cert_path {
            kafka_config.set("security.protocol", "ssl");
            kafka_config.set("ssl.keystore.location", cert_path);
            if let Some(cert_password) = &config.cert_password {
                kafka_config.set("ssl.keystore.password", cert_password);
            }
        }
        kafka_config.set("message.max.bytes", "15728640");

        for (key, value) in &config.kafka_opts {
            kafka_config.set(key, value);
        }

        let admin: AdminClient<_> = kafka_config
            .create_with_context(MzClientContext::default())
            .with_context(|| format!("opening Kafka connection: {}", config.kafka_addr))?;

        let admin_opts = AdminOptions::new().operation_timeout(Some(config.default_timeout));

        let producer: FutureProducer<_> = kafka_config
            .create_with_context(MzClientContext::default())
            .with_context(|| format!("opening Kafka producer connection: {}", config.kafka_addr))?;

        let topics = BTreeMap::new();

        (
            config.kafka_addr.to_owned(),
            admin,
            admin_opts,
            producer,
            topics,
            kafka_config,
        )
    };

    let mut state = State {
        config: config.clone(),

        arg_vars: config.arg_vars.clone(),
        cmd_vars: BTreeMap::new(),
        seed,
        temp_path,
        _tempfile,
        default_timeout: config.default_timeout,
        timeout: config.default_timeout,
        max_tries: config.default_max_tries,
        initial_backoff: config.initial_backoff,
        backoff_factor: config.backoff_factor,
        consistency_checks: config.consistency_checks,
        check_statement_logging: config.check_statement_logging,
        consistency_checks_adhoc_skip: false,
        regex: None,
        regex_replacement: set::DEFAULT_REGEX_REPLACEMENT.into(),
        rewrite_results: config.rewrite_results,
        error_line_count: 0,
        error_string: "".to_string(),

        materialize: materialize_state,

        persist_consensus_url: config.persist_consensus_url.clone(),
        persist_blob_url: config.persist_blob_url.clone(),
        build_info: config.build_info,
        persist_clients: PersistClientCache::new(
            PersistConfig::new_default_configs(config.build_info, SYSTEM_TIME.clone()),
            &MetricsRegistry::new(),
            |_, _| PubSubClientConnection::noop(),
        ),

        schema_registry_url,
        ccsr_client,
        kafka_addr,
        kafka_admin,
        kafka_admin_opts,
        kafka_config,
        kafka_default_partitions: config.kafka_default_partitions,
        kafka_producer,
        kafka_topics,

        aws_account: config.aws_account.clone(),
        aws_config: config.aws_config.clone(),

        duckdb_clients: BTreeMap::new(),
        mysql_clients: BTreeMap::new(),
        postgres_clients: BTreeMap::new(),
        sql_server_clients: BTreeMap::new(),

        fivetran_destination_url: config.fivetran_destination_url.clone(),
        fivetran_destination_files_path: config.fivetran_destination_files_path.clone(),

        rewrites: Vec::new(),
        rewrite_pos_start: 0,
        rewrite_pos_end: 0,
    };
    state.initialize_cmd_vars().await?;
    Ok((state, pgconn_task))
}

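/// Derives the [`MaterializeState`] for a run: applies the configured session
/// parameters, computes the various SQL and HTTP addresses from the connection
/// configuration, and queries the server for its environment ID.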
async fn create_materialize_state(
    config: &&Config,
    materialize_catalog_config: Option<CatalogConfig>,
    pgclient: tokio_postgres::Client,
) -> Result<MaterializeState, anyhow::Error> {
    let materialize_url = util::postgres::config_url(&config.materialize_pgconfig)?;
    let materialize_internal_url =
        util::postgres::config_url(&config.materialize_internal_pgconfig)?;

    for (key, value) in &config.materialize_params {
        pgclient
            .batch_execute(&format!("SET {key} = {value}"))
            .await
            .context("setting session parameter")?;
    }

    let materialize_user = config
        .materialize_pgconfig
        .get_user()
        .expect("testdrive URL must contain user")
        .to_string();

    let materialize_sql_addr = format!(
        "{}:{}",
        materialize_url.host_str().unwrap(),
        materialize_url.port().unwrap()
    );
    let materialize_http_addr = format!(
        "{}:{}",
        materialize_url.host_str().unwrap(),
        config.materialize_http_port
    );
    let materialize_internal_sql_addr = format!(
        "{}:{}",
        materialize_internal_url.host_str().unwrap(),
        materialize_internal_url.port().unwrap()
    );
    let materialize_password_sql_addr = format!(
        "{}:{}",
        materialize_url.host_str().unwrap(),
        config.materialize_password_sql_port
    );
    let materialize_sasl_sql_addr = format!(
        "{}:{}",
        materialize_url.host_str().unwrap(),
        config.materialize_sasl_sql_port
    );
    let materialize_internal_http_addr = format!(
        "{}:{}",
        materialize_internal_url.host_str().unwrap(),
        config.materialize_internal_http_port
    );
    let environment_id = pgclient
        .query_one("SELECT mz_environment_id()", &[])
        .await?
        .get::<_, String>(0)
        .parse()
        .context("parsing environment ID")?;

    let bootstrap_args = BootstrapArgs {
        cluster_replica_size_map: config.materialize_cluster_replica_sizes.clone(),
        default_cluster_replica_size: "ABC".to_string(),
        default_cluster_replication_factor: 1,
        bootstrap_role: None,
    };

    let materialize_state = MaterializeState {
        catalog_config: materialize_catalog_config,
        sql_addr: materialize_sql_addr,
        use_https: config.materialize_use_https,
        http_addr: materialize_http_addr,
        internal_sql_addr: materialize_internal_sql_addr,
        internal_http_addr: materialize_internal_http_addr,
        password_sql_addr: materialize_password_sql_addr,
        sasl_sql_addr: materialize_sasl_sql_addr,
        user: materialize_user,
        pgclient,
        environment_id,
        bootstrap_args,
    };

    Ok(materialize_state)
}