use std::collections::BTreeMap;
use std::future::Future;
use std::net::ToSocketAddrs;
use std::path::PathBuf;
use std::sync::LazyLock;
use std::time::Duration;
use std::{env, fs};

use anyhow::{Context, anyhow, bail};
use async_trait::async_trait;
use aws_credential_types::provider::ProvideCredentials;
use aws_types::SdkConfig;
use futures::future::FutureExt;
use itertools::Itertools;
use mz_adapter::catalog::{Catalog, ConnCatalog};
use mz_adapter::session::Session;
use mz_build_info::BuildInfo;
use mz_catalog::config::ClusterReplicaSizeMap;
use mz_catalog::durable::BootstrapArgs;
use mz_ccsr::SubjectVersion;
use mz_kafka_util::client::{MzClientContext, create_new_client_config_simple};
use mz_ore::error::ErrorExt;
use mz_ore::metrics::MetricsRegistry;
use mz_ore::now::SYSTEM_TIME;
use mz_ore::retry::Retry;
use mz_ore::task;
use mz_ore::url::SensitiveUrl;
use mz_persist_client::cache::PersistClientCache;
use mz_persist_client::cfg::PersistConfig;
use mz_persist_client::rpc::PubSubClientConnection;
use mz_persist_client::{PersistClient, PersistLocation};
use mz_sql::catalog::EnvironmentId;
use mz_tls_util::make_tls;
use rdkafka::ClientConfig;
use rdkafka::producer::Producer;
use regex::{Captures, Regex};
use semver::Version;
use tokio_postgres::error::{DbError, SqlState};
use tracing::info;
use url::Url;

use crate::error::PosError;
use crate::parser::{
    Command, PosCommand, SqlExpectedError, SqlOutput, VersionConstraint, validate_ident,
};
use crate::util;
use crate::util::postgres::postgres_client;

pub mod consistency;

mod duckdb;
mod file;
mod fivetran;
mod http;
mod kafka;
mod mysql;
mod nop;
mod persist;
mod postgres;
mod protobuf;
mod psql;
mod s3;
mod schema_registry;
mod set;
mod skip_end;
mod skip_if;
mod sleep;
mod sql;
mod sql_server;
mod version_check;
mod webhook;

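/// Static configuration for a testdrive run, assembled by the caller before
/// any scripts execute.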
#[derive(Debug, Clone)]
pub struct Config {
    pub arg_vars: BTreeMap<String, String>,
    pub seed: Option<String>,
    pub reset: bool,
    pub temp_dir: Option<String>,
    pub source: Option<String>,
    pub default_timeout: Duration,
    pub default_max_tries: usize,
    pub initial_backoff: Duration,
    pub backoff_factor: f64,
    pub consistency_checks: consistency::Level,
    pub check_statement_logging: bool,
    pub rewrite_results: bool,

    pub materialize_pgconfig: tokio_postgres::Config,
    pub materialize_internal_pgconfig: tokio_postgres::Config,
    pub materialize_use_https: bool,
    pub materialize_http_port: u16,
    pub materialize_internal_http_port: u16,
    pub materialize_password_sql_port: u16,
    pub materialize_sasl_sql_port: u16,
    pub materialize_params: Vec<(String, String)>,
    pub materialize_catalog_config: Option<CatalogConfig>,
    pub build_info: &'static BuildInfo,
    pub materialize_cluster_replica_sizes: ClusterReplicaSizeMap,

    pub persist_consensus_url: Option<SensitiveUrl>,
    pub persist_blob_url: Option<SensitiveUrl>,

    pub kafka_addr: String,
    pub kafka_default_partitions: usize,
    pub kafka_opts: Vec<(String, String)>,
    pub schema_registry_url: Url,
    pub cert_path: Option<String>,
    pub cert_password: Option<String>,
    pub ccsr_username: Option<String>,
    pub ccsr_password: Option<String>,

    pub aws_config: SdkConfig,
    pub aws_account: String,

    pub fivetran_destination_url: String,
    pub fivetran_destination_files_path: String,
}

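/// Connection addresses and session state for the Materialize instance under
/// test.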
pub struct MaterializeState {
    catalog_config: Option<CatalogConfig>,

    sql_addr: String,
    use_https: bool,
    http_addr: String,
    internal_sql_addr: String,
    internal_http_addr: String,
    password_sql_addr: String,
    sasl_sql_addr: String,
    user: String,
    pgclient: tokio_postgres::Client,
    environment_id: EnvironmentId,
    bootstrap_args: BootstrapArgs,
}

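/// Mutable state threaded through the execution of a testdrive script.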
pub struct State {
    pub config: Config,

    arg_vars: BTreeMap<String, String>,
    cmd_vars: BTreeMap<String, String>,
    seed: String,
    temp_path: PathBuf,
    _tempfile: Option<tempfile::TempDir>,
    default_timeout: Duration,
    timeout: Duration,
    max_tries: usize,
    initial_backoff: Duration,
    backoff_factor: f64,
    consistency_checks: consistency::Level,
    check_statement_logging: bool,
    consistency_checks_adhoc_skip: bool,
    regex: Option<Regex>,
    regex_replacement: String,
    error_line_count: usize,
    error_string: String,

    materialize: MaterializeState,

    persist_consensus_url: Option<SensitiveUrl>,
    persist_blob_url: Option<SensitiveUrl>,
    build_info: &'static BuildInfo,
    persist_clients: PersistClientCache,

    schema_registry_url: Url,
    ccsr_client: mz_ccsr::Client,
    kafka_addr: String,
    kafka_admin: rdkafka::admin::AdminClient<MzClientContext>,
    kafka_admin_opts: rdkafka::admin::AdminOptions,
    kafka_config: ClientConfig,
    kafka_default_partitions: usize,
    kafka_producer: rdkafka::producer::FutureProducer<MzClientContext>,
    kafka_topics: BTreeMap<String, usize>,

    aws_account: String,
    aws_config: SdkConfig,

    pub duckdb_clients: BTreeMap<String, std::sync::Arc<std::sync::Mutex<::duckdb::Connection>>>,
    mysql_clients: BTreeMap<String, mysql_async::Conn>,
    postgres_clients: BTreeMap<String, tokio_postgres::Client>,
    sql_server_clients: BTreeMap<String, mz_sql_server_util::Client>,

    fivetran_destination_url: String,
    fivetran_destination_files_path: String,

    rewrite_results: bool,
    pub rewrites: Vec<Rewrite>,
    pub rewrite_pos_start: usize,
    pub rewrite_pos_end: usize,
}

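/// A pending rewrite of a region of the input file, recorded when running
/// with `rewrite_results` enabled.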
pub struct Rewrite {
    pub content: String,
    pub start: usize,
    pub end: usize,
}

impl State {
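    /// Populates the built-in `testdrive.*`, `env.*`, and `arg.*` variables
    /// that scripts reference via `${...}` substitution.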
    pub async fn initialize_cmd_vars(&mut self) -> Result<(), anyhow::Error> {
        self.cmd_vars
            .insert("testdrive.kafka-addr".into(), self.kafka_addr.clone());
        self.cmd_vars.insert(
            "testdrive.kafka-addr-resolved".into(),
            self.kafka_addr
                .to_socket_addrs()
                .ok()
                .and_then(|mut addrs| addrs.next())
                .map(|addr| addr.to_string())
                .unwrap_or_else(|| "#RESOLUTION-FAILURE#".into()),
        );
        self.cmd_vars.insert(
            "testdrive.schema-registry-url".into(),
            self.schema_registry_url.to_string(),
        );
        self.cmd_vars
            .insert("testdrive.seed".into(), self.seed.clone());
        self.cmd_vars.insert(
            "testdrive.temp-dir".into(),
            self.temp_path.display().to_string(),
        );
        self.cmd_vars
            .insert("testdrive.aws-region".into(), self.aws_region().into());
        self.cmd_vars
            .insert("testdrive.aws-endpoint".into(), self.aws_endpoint().into());
        self.cmd_vars
            .insert("testdrive.aws-account".into(), self.aws_account.clone());
        {
            let aws_credentials = self
                .aws_config
                .credentials_provider()
                .ok_or_else(|| anyhow!("no AWS credentials provider configured"))?
                .provide_credentials()
                .await
                .context("fetching AWS credentials")?;
            self.cmd_vars.insert(
                "testdrive.aws-access-key-id".into(),
                aws_credentials.access_key_id().to_owned(),
            );
            self.cmd_vars.insert(
                "testdrive.aws-secret-access-key".into(),
                aws_credentials.secret_access_key().to_owned(),
            );
            self.cmd_vars.insert(
                "testdrive.aws-token".into(),
                aws_credentials
                    .session_token()
                    .map(|token| token.to_owned())
                    .unwrap_or_else(String::new),
            );
        }
        self.cmd_vars.insert(
            "testdrive.materialize-environment-id".into(),
            self.materialize.environment_id.to_string(),
        );
        self.cmd_vars.insert(
            "testdrive.materialize-sql-addr".into(),
            self.materialize.sql_addr.clone(),
        );
        self.cmd_vars.insert(
            "testdrive.materialize-internal-sql-addr".into(),
            self.materialize.internal_sql_addr.clone(),
        );
        self.cmd_vars.insert(
            "testdrive.materialize-password-sql-addr".into(),
            self.materialize.password_sql_addr.clone(),
        );
        self.cmd_vars.insert(
            "testdrive.materialize-sasl-sql-addr".into(),
            self.materialize.sasl_sql_addr.clone(),
        );
        self.cmd_vars.insert(
            "testdrive.materialize-user".into(),
            self.materialize.user.clone(),
        );
        self.cmd_vars.insert(
            "testdrive.fivetran-destination-url".into(),
            self.fivetran_destination_url.clone(),
        );
        self.cmd_vars.insert(
            "testdrive.fivetran-destination-files-path".into(),
            self.fivetran_destination_files_path.clone(),
        );

        for (key, value) in env::vars() {
            self.cmd_vars.insert(format!("env.{}", key), value);
        }

        for (key, value) in &self.arg_vars {
            validate_ident(key)?;
            self.cmd_vars
                .insert(format!("arg.{}", key), value.to_string());
        }

        Ok(())
    }
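
    /// Opens a read-only copy of the durable catalog (when a catalog config
    /// was provided) and runs `f` against it, returning `None` when testdrive
    /// is not connected to the catalog.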
    pub async fn with_catalog_copy<F, T>(
        &self,
        system_parameter_defaults: BTreeMap<String, String>,
        build_info: &'static BuildInfo,
        bootstrap_args: &BootstrapArgs,
        enable_expression_cache_override: Option<bool>,
        f: F,
    ) -> Result<Option<T>, anyhow::Error>
    where
        F: FnOnce(ConnCatalog) -> T,
    {
        async fn persist_client(
            persist_consensus_url: SensitiveUrl,
            persist_blob_url: SensitiveUrl,
            persist_clients: &PersistClientCache,
        ) -> Result<PersistClient, anyhow::Error> {
            let persist_location = PersistLocation {
                blob_uri: persist_blob_url,
                consensus_uri: persist_consensus_url,
            };
            Ok(persist_clients.open(persist_location).await?)
        }

        if let Some(CatalogConfig {
            persist_consensus_url,
            persist_blob_url,
        }) = &self.materialize.catalog_config
        {
            let persist_client = persist_client(
                persist_consensus_url.clone(),
                persist_blob_url.clone(),
                &self.persist_clients,
            )
            .await?;
            let catalog = Catalog::open_debug_read_only_persist_catalog_config(
                persist_client,
                SYSTEM_TIME.clone(),
                self.materialize.environment_id.clone(),
                system_parameter_defaults,
                build_info,
                bootstrap_args,
                enable_expression_cache_override,
            )
            .await?;
            let res = f(catalog.for_session(&Session::dummy()));
            catalog.expire().await;
            Ok(Some(res))
        } else {
            Ok(None)
        }
    }

    pub fn aws_endpoint(&self) -> &str {
        self.aws_config.endpoint_url().unwrap_or("")
    }

    pub fn aws_region(&self) -> &str {
        self.aws_config.region().map(|r| r.as_ref()).unwrap_or("")
    }

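    /// Reports whether consistency checks were ad hoc skipped for the current
    /// command, resetting the flag to `false` in the process.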
    pub fn clear_skip_consistency_checks(&mut self) -> bool {
        std::mem::replace(&mut self.consistency_checks_adhoc_skip, false)
    }

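    /// Resets the Materialize instance to a pristine state: restores system
    /// parameters to their defaults and drops the databases, clusters, and
    /// roles left behind by earlier testdrive runs.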
    pub async fn reset_materialize(&self) -> Result<(), anyhow::Error> {
        let (inner_client, _) = postgres_client(
            &format!(
                "postgres://mz_system:materialize@{}",
                self.materialize.internal_sql_addr
            ),
            self.default_timeout,
        )
        .await?;

        let version = inner_client
            .query_one("SELECT mz_version_num()", &[])
            .await
            .context("getting version of materialize")
            .map(|row| row.get::<_, i32>(0))?;

        let semver = inner_client
            .query_one("SELECT right(split_part(mz_version(), ' ', 1), -1)", &[])
            .await
            .context("getting semver of materialize")
            .map(|row| row.get::<_, String>(0))?
            .parse::<semver::Version>()
            .context("parsing semver of materialize")?;

        inner_client
            .batch_execute("ALTER SYSTEM RESET ALL")
            .await
            .context("resetting materialize state: ALTER SYSTEM RESET ALL")?;

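        // Unsafe functions are useful in tests, so attempt to enable them.
        // The parameter was renamed around v0.128.0, and a server running in
        // safe mode rejects the change outright, so both cases are handled
        // below.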
        {
            let rename_version = Version::parse("0.128.0-dev.1").expect("known to be valid");
            let enable_unsafe_functions = if semver >= rename_version {
                "unsafe_enable_unsafe_functions"
            } else {
                "enable_unsafe_functions"
            };
            let res = inner_client
                .batch_execute(&format!("ALTER SYSTEM SET {enable_unsafe_functions} = on"))
                .await
                .context("enabling unsafe functions");
            if let Err(e) = res {
                match e.root_cause().downcast_ref::<DbError>() {
                    Some(e) if *e.code() == SqlState::CANT_CHANGE_RUNTIME_PARAM => {
                        info!(
                            "can't enable unsafe functions because the server is in safe mode; \
                             testdrive scripts will fail if they use unsafe functions",
                        );
                    }
                    _ => return Err(e),
                }
            }
        }

        for row in inner_client
            .query("SHOW DATABASES", &[])
            .await
            .context("resetting materialize state: SHOW DATABASES")?
        {
            let db_name: String = row.get(0);
            if db_name.starts_with("testdrive_no_reset_") {
                continue;
            }
            let query = format!(
                "DROP DATABASE {}",
                postgres_protocol::escape::escape_identifier(&db_name)
            );
            sql::print_query(&query, None);
            inner_client.batch_execute(&query).await.context(format!(
                "resetting materialize state: DROP DATABASE {}",
                db_name,
            ))?;
        }

        let inactive_user_clusters = "
        WITH
            active_user_clusters AS
            (
                SELECT DISTINCT cluster_id, object_id
                FROM
                    (
                        SELECT cluster_id, id FROM mz_catalog.mz_sources
                        UNION ALL SELECT cluster_id, id FROM mz_catalog.mz_sinks
                        UNION ALL
                            SELECT cluster_id, id
                            FROM mz_catalog.mz_materialized_views
                        UNION ALL
                            SELECT cluster_id, id FROM mz_catalog.mz_indexes
                        UNION ALL
                            SELECT cluster_id, id
                            FROM mz_internal.mz_subscriptions
                    )
                    AS t (cluster_id, object_id)
                WHERE cluster_id IS NOT NULL AND object_id LIKE 'u%'
            )
        SELECT name
        FROM mz_catalog.mz_clusters
        WHERE
            id NOT IN ( SELECT cluster_id FROM active_user_clusters ) AND id LIKE 'u%'
                AND
            owner_id LIKE 'u%';";

        let inactive_clusters = inner_client
            .query(inactive_user_clusters, &[])
            .await
            .context("resetting materialize state: inactive_user_clusters")?;

        if !inactive_clusters.is_empty() {
            println!("cleaning up user clusters from previous tests...")
        }

        for cluster_name in inactive_clusters {
            let cluster_name: String = cluster_name.get(0);
            if cluster_name.starts_with("testdrive_no_reset_") {
                continue;
            }
            let query = format!(
                "DROP CLUSTER {}",
                postgres_protocol::escape::escape_identifier(&cluster_name)
            );
            sql::print_query(&query, None);
            inner_client.batch_execute(&query).await.context(format!(
                "resetting materialize state: DROP CLUSTER {}",
                cluster_name,
            ))?;
        }

        inner_client
            .batch_execute("CREATE DATABASE materialize")
            .await
            .context("resetting materialize state: CREATE DATABASE materialize")?;

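        // Drop any roles left over from previous runs. If the query fails for
        // whatever reason, role cleanup is skipped rather than failing the
        // reset.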
        if let Ok(rows) = inner_client.query("SELECT name FROM mz_roles", &[]).await {
            for row in rows {
                let role_name: String = row.get(0);
                if role_name == self.materialize.user || role_name.starts_with("mz_") {
                    continue;
                }
                let query = format!(
                    "DROP ROLE {}",
                    postgres_protocol::escape::escape_identifier(&role_name)
                );
                sql::print_query(&query, None);
                inner_client.batch_execute(&query).await.context(format!(
                    "resetting materialize state: DROP ROLE {}",
                    role_name,
                ))?;
            }
        }

        inner_client
            .batch_execute(&format!(
                "GRANT ALL PRIVILEGES ON SYSTEM TO {}",
                self.materialize.user
            ))
            .await?;

        inner_client
            .batch_execute("GRANT USAGE ON DATABASE materialize TO PUBLIC")
            .await?;
        inner_client
            .batch_execute(&format!(
                "GRANT ALL PRIVILEGES ON DATABASE materialize TO {}",
                self.materialize.user
            ))
            .await?;
        inner_client
            .batch_execute(&format!(
                "GRANT ALL PRIVILEGES ON SCHEMA materialize.public TO {}",
                self.materialize.user
            ))
            .await?;

        let cluster = match version {
            ..=8199 => "default",
            8200.. => "quickstart",
        };
        inner_client
            .batch_execute(&format!("GRANT USAGE ON CLUSTER {cluster} TO PUBLIC"))
            .await?;
        inner_client
            .batch_execute(&format!(
                "GRANT ALL PRIVILEGES ON CLUSTER {cluster} TO {}",
                self.materialize.user
            ))
            .await?;

        Ok(())
    }

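    /// Deletes all Kafka topics and schema registry subjects whose names
    /// start with `testdrive-`, collecting errors instead of failing fast.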
    pub async fn reset_kafka(&self) -> Result<(), anyhow::Error> {
        use rdkafka::types::RDKafkaErrorCode;
        let mut errors: Vec<anyhow::Error> = Vec::new();

        let metadata = self.kafka_producer.client().fetch_metadata(
            None,
            Some(std::cmp::max(Duration::from_secs(1), self.default_timeout)),
        )?;

        let testdrive_topics: Vec<_> = metadata
            .topics()
            .iter()
            .filter_map(|t| {
                if t.name().starts_with("testdrive-") {
                    Some(t.name())
                } else {
                    None
                }
            })
            .collect();

        if !testdrive_topics.is_empty() {
            match self
                .kafka_admin
                .delete_topics(&testdrive_topics, &self.kafka_admin_opts)
                .await
            {
                Ok(res) => {
                    if res.len() != testdrive_topics.len() {
                        errors.push(anyhow!(
                            "kafka topic deletion returned {} results, but exactly {} were expected",
                            res.len(),
                            testdrive_topics.len()
                        ));
                    } else {
                        for (res, topic) in res.iter().zip_eq(testdrive_topics.iter()) {
                            match res {
                                Ok(_) | Err((_, RDKafkaErrorCode::UnknownTopicOrPartition)) => (),
                                Err((_, err)) => {
                                    errors.push(anyhow!("unable to delete {}: {}", topic, err));
                                }
                            }
                        }
                    }
                }
                Err(e) => {
                    errors.push(e.into());
                }
            };
        }

        let schema_registry_errors = self.reset_schema_registry().await;

        errors.extend(schema_registry_errors);
        if errors.is_empty() {
            Ok(())
        } else {
            bail!(
                "resetting Kafka and schema registry state: {} errors: {}",
                errors.len(),
                errors
                    .into_iter()
                    .map(|e| e.to_string_with_causes())
                    .join("\n")
            );
        }
    }
714
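
    /// Deletes all `testdrive-`-prefixed schema registry subjects, attempting
    /// to delete them in an order that respects references between subjects.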
    #[allow(clippy::disallowed_types)]
    async fn reset_schema_registry(&self) -> Vec<anyhow::Error> {
        use std::collections::HashMap;

        let mut errors = Vec::new();
        match self
            .ccsr_client
            .list_subjects()
            .await
            .context("listing schema registry subjects")
        {
            Ok(subjects) => {
                let testdrive_subjects: Vec<_> = subjects
                    .into_iter()
                    .filter(|s| s.starts_with("testdrive-"))
                    .collect();

                let mut graphs: HashMap<SubjectVersion, Vec<SubjectVersion>> = HashMap::new();

                for subject in &testdrive_subjects {
                    match self.ccsr_client.get_subject_with_references(subject).await {
                        Ok((subj, refs)) => {
                            let refs: Vec<_> = refs
                                .into_iter()
                                .filter(|r| r.subject.starts_with("testdrive-"))
                                .collect();
                            graphs.insert(
                                SubjectVersion {
                                    subject: subj.name,
                                    version: subj.version,
                                },
                                refs,
                            );
                        }
                        Err(mz_ccsr::GetBySubjectError::SubjectNotFound) => {}
                        Err(e) => {
                            errors.push(anyhow::anyhow!(
                                "failed to get references for subject {}: {}",
                                subject,
                                e
                            ));
                        }
                    }
                }

                let subjects_to_delete: Vec<_> = match mz_ccsr::topological_sort(&graphs) {
                    Ok(ordered) => {
                        let mut subjects: Vec<_> = ordered.into_iter().collect();
                        subjects.sort_by(|a, b| a.1.cmp(&b.1));
                        subjects.into_iter().map(|(s, _)| s.clone()).collect()
                    }
                    Err(_) => {
                        tracing::info!("cycle detected, attempting to delete anyway");
                        graphs.into_keys().collect()
                    }
                };

                for subject in subjects_to_delete {
                    match self.ccsr_client.delete_subject(&subject.subject).await {
                        Ok(()) | Err(mz_ccsr::DeleteError::SubjectNotFound) => (),
                        Err(e) => errors.push(e.into()),
                    }
                }
            }
            Err(e) => {
                errors.push(e);
            }
        }
        errors
    }
}

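/// Configuration for opening the Materialize catalog stored in persist.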
#[derive(Debug, Clone)]
pub struct CatalogConfig {
    pub persist_consensus_url: SensitiveUrl,
    pub persist_blob_url: SensitiveUrl,
}

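/// The control flow a command requests of the script runner: proceed
/// normally, or begin/end skipping commands.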
pub enum ControlFlow {
    Continue,
    SkipBegin,
    SkipEnd,
}

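/// A command that can be executed against the current [`State`].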
#[async_trait]
pub(crate) trait Run {
    async fn run(self, state: &mut State) -> Result<ControlFlow, PosError>;
}

#[async_trait]
impl Run for PosCommand {
    async fn run(self, state: &mut State) -> Result<ControlFlow, PosError> {
        macro_rules! handle_version {
            ($version_constraint:expr) => {
                match $version_constraint {
                    Some(VersionConstraint { min, max }) => {
                        match version_check::run_version_check(min, max, state).await {
                            Ok(true) => return Ok(ControlFlow::Continue),
                            Ok(false) => {}
                            Err(err) => return Err(PosError::new(err, self.pos)),
                        }
                    }
                    None => {}
                }
            };
        }

        let wrap_err = |e| PosError::new(e, self.pos);
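        // Variables prefixed with the name of the builtin command being run
        // are not substituted here; the command is expected to handle them
        // itself.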
        let ignore_prefix = match &self.command {
            Command::Builtin(builtin, _) => Some(builtin.name.clone()),
            _ => None,
        };
        let subst = |msg: &str, vars: &BTreeMap<String, String>| {
            substitute_vars(msg, vars, &ignore_prefix, false).map_err(wrap_err)
        };
        let subst_re = |msg: &str, vars: &BTreeMap<String, String>| {
            substitute_vars(msg, vars, &ignore_prefix, true).map_err(wrap_err)
        };

        let r = match self.command {
            Command::Builtin(mut builtin, version_constraint) => {
                handle_version!(version_constraint);
                for val in builtin.args.values_mut() {
                    *val = subst(val, &state.cmd_vars)?;
                }
                for line in &mut builtin.input {
                    *line = subst(line, &state.cmd_vars)?;
                }
                match builtin.name.as_ref() {
                    "check-consistency" => consistency::run_consistency_checks(state).await,
                    "skip-consistency-checks" => {
                        consistency::skip_consistency_checks(builtin, state)
                    }
                    "check-shard-tombstone" => {
                        consistency::run_check_shard_tombstone(builtin, state).await
                    }
                    "duckdb-execute" => duckdb::run_execute(builtin, state).await,
                    "duckdb-query" => duckdb::run_query(builtin, state).await,
                    "fivetran-destination" => {
                        fivetran::run_destination_command(builtin, state).await
                    }
                    "file-append" => file::run_append(builtin, state).await,
                    "file-delete" => file::run_delete(builtin, state).await,
                    "http-request" => http::run_request(builtin, state).await,
                    "kafka-add-partitions" => kafka::run_add_partitions(builtin, state).await,
                    "kafka-create-topic" => kafka::run_create_topic(builtin, state).await,
                    "kafka-wait-topic" => kafka::run_wait_topic(builtin, state).await,
                    "kafka-delete-records" => kafka::run_delete_records(builtin, state).await,
                    "kafka-delete-topic-flaky" => kafka::run_delete_topic(builtin, state).await,
                    "kafka-ingest" => kafka::run_ingest(builtin, state).await,
                    "kafka-verify-data" => kafka::run_verify_data(builtin, state).await,
                    "kafka-verify-commit" => kafka::run_verify_commit(builtin, state).await,
                    "kafka-verify-topic" => kafka::run_verify_topic(builtin, state).await,
                    "mysql-connect" => mysql::run_connect(builtin, state).await,
                    "mysql-execute" => mysql::run_execute(builtin, state).await,
                    "nop" => nop::run_nop(),
                    "postgres-connect" => postgres::run_connect(builtin, state).await,
                    "postgres-execute" => postgres::run_execute(builtin, state).await,
                    "postgres-verify-slot" => postgres::run_verify_slot(builtin, state).await,
                    "protobuf-compile-descriptors" => {
                        protobuf::run_compile_descriptors(builtin, state).await
                    }
                    "psql-execute" => psql::run_execute(builtin, state).await,
                    "s3-verify-data" => s3::run_verify_data(builtin, state).await,
                    "s3-verify-keys" => s3::run_verify_keys(builtin, state).await,
                    "s3-file-upload" => s3::run_upload(builtin, state).await,
                    "s3-set-presigned-url" => s3::run_set_presigned_url(builtin, state).await,
                    "schema-registry-publish" => schema_registry::run_publish(builtin, state).await,
                    "schema-registry-verify" => schema_registry::run_verify(builtin, state).await,
                    "schema-registry-wait" => schema_registry::run_wait(builtin, state).await,
                    "skip-if" => skip_if::run_skip_if(builtin, state).await,
                    "skip-end" => skip_end::run_skip_end(),
                    "sql-server-connect" => sql_server::run_connect(builtin, state).await,
                    "sql-server-execute" => sql_server::run_execute(builtin, state).await,
                    "sql-server-set-from-sql" => sql_server::run_set_from_sql(builtin, state).await,
                    "persist-force-compaction" => {
                        persist::run_force_compaction(builtin, state).await
                    }
                    "random-sleep" => sleep::run_random_sleep(builtin),
                    "set-regex" => set::run_regex_set(builtin, state),
                    "unset-regex" => set::run_regex_unset(builtin, state),
                    "set-sql-timeout" => set::run_sql_timeout(builtin, state),
                    "set-max-tries" => set::run_max_tries(builtin, state),
                    "sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment" => {
                        sleep::run_sleep(builtin)
                    }
                    "set" => set::set_vars(builtin, state),
                    "set-arg-default" => set::run_set_arg_default(builtin, state),
                    "set-from-sql" => set::run_set_from_sql(builtin, state).await,
                    "set-from-file" => set::run_set_from_file(builtin, state).await,
                    "webhook-append" => webhook::run_append(builtin, state).await,
                    _ => {
                        return Err(PosError::new(
                            anyhow!("unknown built-in command {}", builtin.name),
                            self.pos,
                        ));
                    }
                }
            }
            Command::Sql(mut sql, version_constraint) => {
                handle_version!(version_constraint);
                sql.query = subst(&sql.query, &state.cmd_vars)?;
                if let SqlOutput::Full { expected_rows, .. } = &mut sql.expected_output {
                    for row in expected_rows {
                        for col in row {
                            *col = subst(col, &state.cmd_vars)?;
                        }
                    }
                }
                sql::run_sql(sql, state).await
            }
            Command::FailSql(mut sql, version_constraint) => {
                handle_version!(version_constraint);
                sql.query = subst(&sql.query, &state.cmd_vars)?;
                sql.expected_error = match &sql.expected_error {
                    SqlExpectedError::Contains(s) => {
                        SqlExpectedError::Contains(subst(s, &state.cmd_vars)?)
                    }
                    SqlExpectedError::Exact(s) => {
                        SqlExpectedError::Exact(subst(s, &state.cmd_vars)?)
                    }
                    SqlExpectedError::Regex(s) => {
                        SqlExpectedError::Regex(subst_re(s, &state.cmd_vars)?)
                    }
                    SqlExpectedError::Timeout => SqlExpectedError::Timeout,
                };
                sql::run_fail_sql(sql, state).await
            }
        };

        r.map_err(wrap_err)
    }
}

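/// Substitutes each `${name}` reference in `msg` with the corresponding value
/// from `vars`. References whose name begins with `ignore_prefix` followed by
/// a `.` are left untouched, and substituted values are regex-escaped when
/// `regex_escape` is set. Fails if any other referenced variable is unknown.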
fn substitute_vars(
    msg: &str,
    vars: &BTreeMap<String, String>,
    ignore_prefix: &Option<String>,
    regex_escape: bool,
) -> Result<String, anyhow::Error> {
    static RE: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"\$\{([^}]+)\}").unwrap());
    let mut err = None;
    let out = RE.replace_all(msg, |caps: &Captures| {
        let name = &caps[1];
        if let Some(ignore_prefix) = &ignore_prefix {
            if name.starts_with(format!("{}.", ignore_prefix).as_str()) {
                return caps.get(0).unwrap().as_str().to_string();
            }
        }

        if let Some(val) = vars.get(name) {
            if regex_escape {
                regex::escape(val)
            } else {
                val.to_string()
            }
        } else {
            err = Some(anyhow!("unknown variable: {}", name));
            "#VAR-MISSING#".to_string()
        }
    });
    match err {
        Some(err) => Err(err),
        None => Ok(out.into_owned()),
    }
}

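/// Initializes a [`State`] for the given [`Config`], connecting to
/// Materialize, Kafka, and the schema registry. Returns the state together
/// with a future that resolves when the underlying SQL connection terminates.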
pub async fn create_state(
    config: &Config,
) -> Result<(State, impl Future<Output = Result<(), anyhow::Error>>), anyhow::Error> {
    let seed = config
        .seed
        .clone()
        .unwrap_or_else(|| format!("{:010}", rand::random::<u32>()));

    let (_tempfile, temp_path) = match &config.temp_dir {
        Some(temp_dir) => {
            fs::create_dir_all(temp_dir).context("creating temporary directory")?;
            (None, PathBuf::from(&temp_dir))
        }
        _ => {
            let tempfile_handle = tempfile::tempdir().context("creating temporary directory")?;
            let temp_path = tempfile_handle.path().to_path_buf();
            (Some(tempfile_handle), temp_path)
        }
    };

    let materialize_catalog_config = config.materialize_catalog_config.clone();

    let materialize_url = util::postgres::config_url(&config.materialize_pgconfig)?;
    info!("Connecting to {}", materialize_url.as_str());
    let (pgclient, pgconn) = Retry::default()
        .max_duration(config.default_timeout)
        .retry_async_canceling(|_| async move {
            let mut pgconfig = config.materialize_pgconfig.clone();
            pgconfig.connect_timeout(config.default_timeout);
            let tls = make_tls(&pgconfig)?;
            pgconfig.connect(tls).await.map_err(|e| anyhow!(e))
        })
        .await?;

    let pgconn_task =
        task::spawn(|| "pgconn_task", pgconn).map(|join| join.context("running SQL connection"));

    let materialize_state =
        create_materialize_state(config, materialize_catalog_config, pgclient).await?;

    let schema_registry_url = config.schema_registry_url.to_owned();

    let ccsr_client = {
        let mut ccsr_config = mz_ccsr::ClientConfig::new(schema_registry_url.clone());

        if let Some(cert_path) = &config.cert_path {
            let cert = fs::read(cert_path).context("reading cert")?;
            let pass = config.cert_password.as_deref().unwrap_or("").to_owned();
            let ident = mz_ccsr::tls::Identity::from_pkcs12_der(cert, pass)
                .context("reading keystore file as pkcs12")?;
            ccsr_config = ccsr_config.identity(ident);
        }

        if let Some(ccsr_username) = &config.ccsr_username {
            ccsr_config = ccsr_config.auth(ccsr_username.clone(), config.ccsr_password.clone());
        }

        ccsr_config.build().context("creating CCSR client")?
    };

    let (kafka_addr, kafka_admin, kafka_admin_opts, kafka_producer, kafka_topics, kafka_config) = {
        use rdkafka::admin::{AdminClient, AdminOptions};
        use rdkafka::producer::FutureProducer;

        let mut kafka_config = create_new_client_config_simple();
        kafka_config.set("bootstrap.servers", &config.kafka_addr);
        kafka_config.set("group.id", "materialize-testdrive");
        kafka_config.set("auto.offset.reset", "earliest");
        kafka_config.set("isolation.level", "read_committed");
        if let Some(cert_path) = &config.cert_path {
            kafka_config.set("security.protocol", "ssl");
            kafka_config.set("ssl.keystore.location", cert_path);
            if let Some(cert_password) = &config.cert_password {
                kafka_config.set("ssl.keystore.password", cert_password);
            }
        }
        kafka_config.set("message.max.bytes", "15728640");

        for (key, value) in &config.kafka_opts {
            kafka_config.set(key, value);
        }

        let admin: AdminClient<_> = kafka_config
            .create_with_context(MzClientContext::default())
            .with_context(|| format!("opening Kafka connection: {}", config.kafka_addr))?;

        let admin_opts = AdminOptions::new().operation_timeout(Some(config.default_timeout));

        let producer: FutureProducer<_> = kafka_config
            .create_with_context(MzClientContext::default())
            .with_context(|| format!("opening Kafka producer connection: {}", config.kafka_addr))?;

        let topics = BTreeMap::new();

        (
            config.kafka_addr.to_owned(),
            admin,
            admin_opts,
            producer,
            topics,
            kafka_config,
        )
    };

    let mut state = State {
        config: config.clone(),

        arg_vars: config.arg_vars.clone(),
        cmd_vars: BTreeMap::new(),
        seed,
        temp_path,
        _tempfile,
        default_timeout: config.default_timeout,
        timeout: config.default_timeout,
        max_tries: config.default_max_tries,
        initial_backoff: config.initial_backoff,
        backoff_factor: config.backoff_factor,
        consistency_checks: config.consistency_checks,
        check_statement_logging: config.check_statement_logging,
        consistency_checks_adhoc_skip: false,
        regex: None,
        regex_replacement: set::DEFAULT_REGEX_REPLACEMENT.into(),
        rewrite_results: config.rewrite_results,
        error_line_count: 0,
        error_string: "".to_string(),

        materialize: materialize_state,

        persist_consensus_url: config.persist_consensus_url.clone(),
        persist_blob_url: config.persist_blob_url.clone(),
        build_info: config.build_info,
        persist_clients: PersistClientCache::new(
            PersistConfig::new_default_configs(config.build_info, SYSTEM_TIME.clone()),
            &MetricsRegistry::new(),
            |_, _| PubSubClientConnection::noop(),
        ),

        schema_registry_url,
        ccsr_client,
        kafka_addr,
        kafka_admin,
        kafka_admin_opts,
        kafka_config,
        kafka_default_partitions: config.kafka_default_partitions,
        kafka_producer,
        kafka_topics,

        aws_account: config.aws_account.clone(),
        aws_config: config.aws_config.clone(),

        duckdb_clients: BTreeMap::new(),
        mysql_clients: BTreeMap::new(),
        postgres_clients: BTreeMap::new(),
        sql_server_clients: BTreeMap::new(),

        fivetran_destination_url: config.fivetran_destination_url.clone(),
        fivetran_destination_files_path: config.fivetran_destination_files_path.clone(),

        rewrites: Vec::new(),
        rewrite_pos_start: 0,
        rewrite_pos_end: 0,
    };
    state.initialize_cmd_vars().await?;
    Ok((state, pgconn_task))
}

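/// Builds the [`MaterializeState`] for a new [`State`]: applies the
/// configured session parameters, derives the various SQL and HTTP addresses,
/// and fetches the environment ID from the server.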
async fn create_materialize_state(
    config: &Config,
    materialize_catalog_config: Option<CatalogConfig>,
    pgclient: tokio_postgres::Client,
) -> Result<MaterializeState, anyhow::Error> {
    let materialize_url = util::postgres::config_url(&config.materialize_pgconfig)?;
    let materialize_internal_url =
        util::postgres::config_url(&config.materialize_internal_pgconfig)?;

    for (key, value) in &config.materialize_params {
        pgclient
            .batch_execute(&format!("SET {key} = {value}"))
            .await
            .context("setting session parameter")?;
    }

    let materialize_user = config
        .materialize_pgconfig
        .get_user()
        .expect("testdrive URL must contain user")
        .to_string();

    let materialize_sql_addr = format!(
        "{}:{}",
        materialize_url.host_str().unwrap(),
        materialize_url.port().unwrap()
    );
    let materialize_http_addr = format!(
        "{}:{}",
        materialize_url.host_str().unwrap(),
        config.materialize_http_port
    );
    let materialize_internal_sql_addr = format!(
        "{}:{}",
        materialize_internal_url.host_str().unwrap(),
        materialize_internal_url.port().unwrap()
    );
    let materialize_password_sql_addr = format!(
        "{}:{}",
        materialize_url.host_str().unwrap(),
        config.materialize_password_sql_port
    );
    let materialize_sasl_sql_addr = format!(
        "{}:{}",
        materialize_url.host_str().unwrap(),
        config.materialize_sasl_sql_port
    );
    let materialize_internal_http_addr = format!(
        "{}:{}",
        materialize_internal_url.host_str().unwrap(),
        config.materialize_internal_http_port
    );
    let environment_id = pgclient
        .query_one("SELECT mz_environment_id()", &[])
        .await?
        .get::<_, String>(0)
        .parse()
        .context("parsing environment ID")?;

    let bootstrap_args = BootstrapArgs {
        cluster_replica_size_map: config.materialize_cluster_replica_sizes.clone(),
        default_cluster_replica_size: "ABC".to_string(),
        default_cluster_replication_factor: 1,
        bootstrap_role: None,
    };

    let materialize_state = MaterializeState {
        catalog_config: materialize_catalog_config,
        sql_addr: materialize_sql_addr,
        use_https: config.materialize_use_https,
        http_addr: materialize_http_addr,
        internal_sql_addr: materialize_internal_sql_addr,
        internal_http_addr: materialize_internal_http_addr,
        password_sql_addr: materialize_password_sql_addr,
        sasl_sql_addr: materialize_sasl_sql_addr,
        user: materialize_user,
        pgclient,
        environment_id,
        bootstrap_args,
    };

    Ok(materialize_state)
}