use std::collections::BTreeMap;
use std::future::Future;
use std::net::ToSocketAddrs;
use std::path::PathBuf;
use std::sync::LazyLock;
use std::time::Duration;
use std::{env, fs};

use anyhow::{Context, anyhow, bail};
use async_trait::async_trait;
use aws_credential_types::provider::ProvideCredentials;
use aws_types::SdkConfig;
use futures::future::FutureExt;
use itertools::Itertools;
use mz_adapter::catalog::{Catalog, ConnCatalog};
use mz_adapter::session::Session;
use mz_build_info::BuildInfo;
use mz_catalog::config::ClusterReplicaSizeMap;
use mz_catalog::durable::BootstrapArgs;
use mz_kafka_util::client::{MzClientContext, create_new_client_config_simple};
use mz_ore::error::ErrorExt;
use mz_ore::metrics::MetricsRegistry;
use mz_ore::now::SYSTEM_TIME;
use mz_ore::retry::Retry;
use mz_ore::task;
use mz_ore::url::SensitiveUrl;
use mz_persist_client::cache::PersistClientCache;
use mz_persist_client::cfg::PersistConfig;
use mz_persist_client::rpc::PubSubClientConnection;
use mz_persist_client::{PersistClient, PersistLocation};
use mz_sql::catalog::EnvironmentId;
use mz_tls_util::make_tls;
use rand::Rng;
use rdkafka::ClientConfig;
use rdkafka::producer::Producer;
use regex::{Captures, Regex};
use semver::Version;
use tokio_postgres::error::{DbError, SqlState};
use tracing::info;
use url::Url;

use crate::error::PosError;
use crate::parser::{
    Command, PosCommand, SqlExpectedError, SqlOutput, VersionConstraint, validate_ident,
};
use crate::util;
use crate::util::postgres::postgres_client;

pub mod consistency;

mod file;
mod fivetran;
mod http;
mod kafka;
mod mysql;
mod nop;
mod persist;
mod postgres;
mod protobuf;
mod psql;
mod s3;
mod schema_registry;
mod set;
mod skip_end;
mod skip_if;
mod sleep;
mod sql;
mod sql_server;
mod version_check;
mod webhook;

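/// Configuration for a testdrive run.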
#[derive(Debug)]
pub struct Config {
    pub arg_vars: BTreeMap<String, String>,
    pub seed: Option<u32>,
    pub reset: bool,
    pub temp_dir: Option<String>,
    pub source: Option<String>,
    pub default_timeout: Duration,
    pub default_max_tries: usize,
    pub initial_backoff: Duration,
    pub backoff_factor: f64,
    pub consistency_checks: consistency::Level,
    pub rewrite_results: bool,

    pub materialize_pgconfig: tokio_postgres::Config,
    pub materialize_internal_pgconfig: tokio_postgres::Config,
    pub materialize_use_https: bool,
    pub materialize_http_port: u16,
    pub materialize_internal_http_port: u16,
    pub materialize_password_sql_port: u16,
    pub materialize_params: Vec<(String, String)>,
    pub materialize_catalog_config: Option<CatalogConfig>,
    pub build_info: &'static BuildInfo,
    pub materialize_cluster_replica_sizes: ClusterReplicaSizeMap,

    pub persist_consensus_url: Option<SensitiveUrl>,
    pub persist_blob_url: Option<SensitiveUrl>,

    pub kafka_addr: String,
    pub kafka_default_partitions: usize,
    pub kafka_opts: Vec<(String, String)>,
    pub schema_registry_url: Url,
    pub cert_path: Option<String>,
    pub cert_password: Option<String>,
    pub ccsr_username: Option<String>,
    pub ccsr_password: Option<String>,

    pub aws_config: SdkConfig,
    pub aws_account: String,

    pub fivetran_destination_url: String,
    pub fivetran_destination_files_path: String,
}

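/// Connection details and handles for the Materialize instance under test.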
pub struct MaterializeState {
    catalog_config: Option<CatalogConfig>,

    sql_addr: String,
    use_https: bool,
    http_addr: String,
    internal_sql_addr: String,
    internal_http_addr: String,
    password_sql_addr: String,
    user: String,
    pgclient: tokio_postgres::Client,
    environment_id: EnvironmentId,
    bootstrap_args: BootstrapArgs,
}

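/// The state threaded through (and mutated by) every testdrive action.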
pub struct State {
    arg_vars: BTreeMap<String, String>,
    cmd_vars: BTreeMap<String, String>,
    seed: u32,
    temp_path: PathBuf,
    _tempfile: Option<tempfile::TempDir>,
    default_timeout: Duration,
    timeout: Duration,
    max_tries: usize,
    initial_backoff: Duration,
    backoff_factor: f64,
    consistency_checks: consistency::Level,
    consistency_checks_adhoc_skip: bool,
    regex: Option<Regex>,
    regex_replacement: String,
    error_line_count: usize,
    error_string: String,

    materialize: MaterializeState,

    persist_consensus_url: Option<SensitiveUrl>,
    persist_blob_url: Option<SensitiveUrl>,
    build_info: &'static BuildInfo,
    persist_clients: PersistClientCache,

    schema_registry_url: Url,
    ccsr_client: mz_ccsr::Client,
    kafka_addr: String,
    kafka_admin: rdkafka::admin::AdminClient<MzClientContext>,
    kafka_admin_opts: rdkafka::admin::AdminOptions,
    kafka_config: ClientConfig,
    kafka_default_partitions: usize,
    kafka_producer: rdkafka::producer::FutureProducer<MzClientContext>,
    kafka_topics: BTreeMap<String, usize>,

    aws_account: String,
    aws_config: SdkConfig,

    mysql_clients: BTreeMap<String, mysql_async::Conn>,
    postgres_clients: BTreeMap<String, tokio_postgres::Client>,
    sql_server_clients: BTreeMap<String, mz_sql_server_util::Client>,

    fivetran_destination_url: String,
    fivetran_destination_files_path: String,

    rewrite_results: bool,
    pub rewrites: Vec<Rewrite>,
    pub rewrite_pos_start: usize,
    pub rewrite_pos_end: usize,
}

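/// A pending rewrite of a byte range of the testdrive source file, recorded
/// while running with `rewrite_results` enabled.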
pub struct Rewrite {
    pub content: String,
    pub start: usize,
    pub end: usize,
}

impl State {
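    /// Populates `cmd_vars` with the built-in variables that scripts can
    /// reference via `${...}` substitution, e.g. `${testdrive.seed}` and
    /// `${testdrive.kafka-addr}`, plus one `env.NAME` entry per environment
    /// variable and one `arg.NAME` entry per caller-supplied argument.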
    pub async fn initialize_cmd_vars(&mut self) -> Result<(), anyhow::Error> {
        self.cmd_vars
            .insert("testdrive.kafka-addr".into(), self.kafka_addr.clone());
        self.cmd_vars.insert(
            "testdrive.kafka-addr-resolved".into(),
            self.kafka_addr
                .to_socket_addrs()
                .ok()
                .and_then(|mut addrs| addrs.next())
                .map(|addr| addr.to_string())
                .unwrap_or_else(|| "#RESOLUTION-FAILURE#".into()),
        );
        self.cmd_vars.insert(
            "testdrive.schema-registry-url".into(),
            self.schema_registry_url.to_string(),
        );
        self.cmd_vars
            .insert("testdrive.seed".into(), self.seed.to_string());
        self.cmd_vars.insert(
            "testdrive.temp-dir".into(),
            self.temp_path.display().to_string(),
        );
        self.cmd_vars
            .insert("testdrive.aws-region".into(), self.aws_region().into());
        self.cmd_vars
            .insert("testdrive.aws-endpoint".into(), self.aws_endpoint().into());
        self.cmd_vars
            .insert("testdrive.aws-account".into(), self.aws_account.clone());
        {
            let aws_credentials = self
                .aws_config
                .credentials_provider()
                .ok_or_else(|| anyhow!("no AWS credentials provider configured"))?
                .provide_credentials()
                .await
                .context("fetching AWS credentials")?;
            self.cmd_vars.insert(
                "testdrive.aws-access-key-id".into(),
                aws_credentials.access_key_id().to_owned(),
            );
            self.cmd_vars.insert(
                "testdrive.aws-secret-access-key".into(),
                aws_credentials.secret_access_key().to_owned(),
            );
            self.cmd_vars.insert(
                "testdrive.aws-token".into(),
                aws_credentials
                    .session_token()
                    .map(|token| token.to_owned())
                    .unwrap_or_else(String::new),
            );
        }
        self.cmd_vars.insert(
            "testdrive.materialize-environment-id".into(),
            self.materialize.environment_id.to_string(),
        );
        self.cmd_vars.insert(
            "testdrive.materialize-sql-addr".into(),
            self.materialize.sql_addr.clone(),
        );
        self.cmd_vars.insert(
            "testdrive.materialize-internal-sql-addr".into(),
            self.materialize.internal_sql_addr.clone(),
        );
        self.cmd_vars.insert(
            "testdrive.materialize-password-sql-addr".into(),
            self.materialize.password_sql_addr.clone(),
        );
        self.cmd_vars.insert(
            "testdrive.materialize-user".into(),
            self.materialize.user.clone(),
        );
        self.cmd_vars.insert(
            "testdrive.fivetran-destination-url".into(),
            self.fivetran_destination_url.clone(),
        );
        self.cmd_vars.insert(
            "testdrive.fivetran-destination-files-path".into(),
            self.fivetran_destination_files_path.clone(),
        );

        for (key, value) in env::vars() {
            self.cmd_vars.insert(format!("env.{}", key), value);
        }

        for (key, value) in &self.arg_vars {
            validate_ident(key)?;
            self.cmd_vars
                .insert(format!("arg.{}", key), value.to_string());
        }

        Ok(())
    }
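
    /// Opens a read-only copy of the durable catalog and applies `f` to a
    /// session-scoped view of it. Returns `None` when no catalog
    /// configuration is available.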
    pub async fn with_catalog_copy<F, T>(
        &self,
        system_parameter_defaults: BTreeMap<String, String>,
        build_info: &'static BuildInfo,
        bootstrap_args: &BootstrapArgs,
        enable_expression_cache_override: Option<bool>,
        f: F,
    ) -> Result<Option<T>, anyhow::Error>
    where
        F: FnOnce(ConnCatalog) -> T,
    {
        async fn persist_client(
            persist_consensus_url: SensitiveUrl,
            persist_blob_url: SensitiveUrl,
            persist_clients: &PersistClientCache,
        ) -> Result<PersistClient, anyhow::Error> {
            let persist_location = PersistLocation {
                blob_uri: persist_blob_url,
                consensus_uri: persist_consensus_url,
            };
            Ok(persist_clients.open(persist_location).await?)
        }

        if let Some(CatalogConfig {
            persist_consensus_url,
            persist_blob_url,
        }) = &self.materialize.catalog_config
        {
            let persist_client = persist_client(
                persist_consensus_url.clone(),
                persist_blob_url.clone(),
                &self.persist_clients,
            )
            .await?;
            let catalog = Catalog::open_debug_read_only_persist_catalog_config(
                persist_client,
                SYSTEM_TIME.clone(),
                self.materialize.environment_id.clone(),
                system_parameter_defaults,
                build_info,
                bootstrap_args,
                enable_expression_cache_override,
            )
            .await?;
            let res = f(catalog.for_session(&Session::dummy()));
            catalog.expire().await;
            Ok(Some(res))
        } else {
            Ok(None)
        }
    }

    pub fn aws_endpoint(&self) -> &str {
        self.aws_config.endpoint_url().unwrap_or("")
    }

    pub fn aws_region(&self) -> &str {
        self.aws_config.region().map(|r| r.as_ref()).unwrap_or("")
    }

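    /// Resets the ad-hoc "skip consistency checks" flag, returning the value
    /// it had before the reset.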
    pub fn clear_skip_consistency_checks(&mut self) -> bool {
        std::mem::replace(&mut self.consistency_checks_adhoc_skip, false)
    }

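    /// Resets Materialize to (approximately) its initial state: drops the
    /// databases, clusters, and roles left behind by previous runs (sparing
    /// anything prefixed with `testdrive_no_reset_`) and restores the
    /// default privileges.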
    pub async fn reset_materialize(&self) -> Result<(), anyhow::Error> {
        let (inner_client, _) = postgres_client(
            &format!(
                "postgres://mz_system:materialize@{}",
                self.materialize.internal_sql_addr
            ),
            self.default_timeout,
        )
        .await?;

        let version = inner_client
            .query_one("SELECT mz_version_num()", &[])
            .await
            .context("getting version of materialize")
            .map(|row| row.get::<_, i32>(0))?;

        let semver = inner_client
            .query_one("SELECT right(split_part(mz_version(), ' ', 1), -1)", &[])
            .await
            .context("getting semver of materialize")
            .map(|row| row.get::<_, String>(0))?
            .parse::<semver::Version>()
            .context("parsing semver of materialize")?;

        inner_client
            .batch_execute("ALTER SYSTEM RESET ALL")
            .await
            .context("resetting materialize state: ALTER SYSTEM RESET ALL")?;

        {
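            // The `enable_unsafe_functions` parameter gained the `unsafe_`
            // prefix in v0.128, so pick whichever name this server
            // understands.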
            let rename_version = Version::parse("0.128.0-dev.1").expect("known to be valid");
            let enable_unsafe_functions = if semver >= rename_version {
                "unsafe_enable_unsafe_functions"
            } else {
                "enable_unsafe_functions"
            };
            let res = inner_client
                .batch_execute(&format!("ALTER SYSTEM SET {enable_unsafe_functions} = on"))
                .await
                .context("enabling unsafe functions");
            if let Err(e) = res {
                match e.root_cause().downcast_ref::<DbError>() {
                    Some(e) if *e.code() == SqlState::CANT_CHANGE_RUNTIME_PARAM => {
                        info!(
                            "can't enable unsafe functions because the server is in safe mode; \
                             testdrive scripts will fail if they use unsafe functions",
                        );
                    }
                    _ => return Err(e),
                }
            }
        }

        for row in inner_client
            .query("SHOW DATABASES", &[])
            .await
            .context("resetting materialize state: SHOW DATABASES")?
        {
            let db_name: String = row.get(0);
            if db_name.starts_with("testdrive_no_reset_") {
                continue;
            }
            let query = format!(
                "DROP DATABASE {}",
                postgres_protocol::escape::escape_identifier(&db_name)
            );
            sql::print_query(&query, None);
            inner_client.batch_execute(&query).await.context(format!(
                "resetting materialize state: DROP DATABASE {}",
                db_name,
            ))?;
        }

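        // Find user-owned clusters (cluster ID and owner ID both beginning
        // with 'u') that host no user objects; only those are safe to drop.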
        let inactive_user_clusters = "
        WITH
            active_user_clusters AS
            (
                SELECT DISTINCT cluster_id, object_id
                FROM
                    (
                        SELECT cluster_id, id FROM mz_catalog.mz_sources
                        UNION ALL SELECT cluster_id, id FROM mz_catalog.mz_sinks
                        UNION ALL
                            SELECT cluster_id, id
                            FROM mz_catalog.mz_materialized_views
                        UNION ALL
                            SELECT cluster_id, id FROM mz_catalog.mz_indexes
                        UNION ALL
                            SELECT cluster_id, id
                            FROM mz_internal.mz_subscriptions
                    )
                    AS t (cluster_id, object_id)
                WHERE cluster_id IS NOT NULL AND object_id LIKE 'u%'
            )
        SELECT name
        FROM mz_catalog.mz_clusters
        WHERE
            id NOT IN ( SELECT cluster_id FROM active_user_clusters ) AND id LIKE 'u%'
                AND
            owner_id LIKE 'u%';";

        let inactive_clusters = inner_client
            .query(inactive_user_clusters, &[])
            .await
            .context("resetting materialize state: inactive_user_clusters")?;

        if !inactive_clusters.is_empty() {
            println!("cleaning up user clusters from previous tests...")
        }

        for cluster_name in inactive_clusters {
            let cluster_name: String = cluster_name.get(0);
            if cluster_name.starts_with("testdrive_no_reset_") {
                continue;
            }
            let query = format!(
                "DROP CLUSTER {}",
                postgres_protocol::escape::escape_identifier(&cluster_name)
            );
            sql::print_query(&query, None);
            inner_client.batch_execute(&query).await.context(format!(
                "resetting materialize state: DROP CLUSTER {}",
                cluster_name,
            ))?;
        }

        inner_client
            .batch_execute("CREATE DATABASE materialize")
            .await
            .context("resetting materialize state: CREATE DATABASE materialize")?;

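        // Drop all non-system roles except the current user; errors (e.g.
        // from servers that do not expose mz_roles) are deliberately ignored.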
        if let Ok(rows) = inner_client.query("SELECT name FROM mz_roles", &[]).await {
            for row in rows {
                let role_name: String = row.get(0);
                if role_name == self.materialize.user || role_name.starts_with("mz_") {
                    continue;
                }
                let query = format!(
                    "DROP ROLE {}",
                    postgres_protocol::escape::escape_identifier(&role_name)
                );
                sql::print_query(&query, None);
                inner_client.batch_execute(&query).await.context(format!(
                    "resetting materialize state: DROP ROLE {}",
                    role_name,
                ))?;
            }
        }

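        // Restore the default privileges for the test user.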
        inner_client
            .batch_execute(&format!(
                "GRANT ALL PRIVILEGES ON SYSTEM TO {}",
                self.materialize.user
            ))
            .await?;

        inner_client
            .batch_execute("GRANT USAGE ON DATABASE materialize TO PUBLIC")
            .await?;
        inner_client
            .batch_execute(&format!(
                "GRANT ALL PRIVILEGES ON DATABASE materialize TO {}",
                self.materialize.user
            ))
            .await?;
        inner_client
            .batch_execute(&format!(
                "GRANT ALL PRIVILEGES ON SCHEMA materialize.public TO {}",
                self.materialize.user
            ))
            .await?;

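        // The default cluster is named "default" before catalog version 8200
        // and "quickstart" from 8200 onwards.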
        let cluster = match version {
            ..=8199 => "default",
            8200.. => "quickstart",
        };
        inner_client
            .batch_execute(&format!("GRANT USAGE ON CLUSTER {cluster} TO PUBLIC"))
            .await?;
        inner_client
            .batch_execute(&format!(
                "GRANT ALL PRIVILEGES ON CLUSTER {cluster} TO {}",
                self.materialize.user
            ))
            .await?;

        Ok(())
    }

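    /// Deletes all `testdrive-`-prefixed Kafka topics and schema registry
    /// subjects, accumulating errors so that cleanup proceeds as far as
    /// possible before reporting failure.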
    pub async fn reset_kafka(&self) -> Result<(), anyhow::Error> {
        let mut errors: Vec<anyhow::Error> = Vec::new();

        let metadata = self.kafka_producer.client().fetch_metadata(
            None,
            Some(std::cmp::max(Duration::from_secs(1), self.default_timeout)),
        )?;

        let testdrive_topics: Vec<_> = metadata
            .topics()
            .iter()
            .filter_map(|t| {
                if t.name().starts_with("testdrive-") {
                    Some(t.name())
                } else {
                    None
                }
            })
            .collect();

        if !testdrive_topics.is_empty() {
            match self
                .kafka_admin
                .delete_topics(&testdrive_topics, &self.kafka_admin_opts)
                .await
            {
                Ok(res) => {
                    if res.len() != testdrive_topics.len() {
                        errors.push(anyhow!(
                            "kafka topic deletion returned {} results, but exactly {} expected",
                            res.len(),
                            testdrive_topics.len()
                        ));
                    }
                    for (res, topic) in res.iter().zip_eq(testdrive_topics.iter()) {
                        match res {
                            Ok(_)
                            | Err((_, rdkafka::types::RDKafkaErrorCode::UnknownTopicOrPartition)) => {
                                ()
                            }
                            Err((_, err)) => {
                                errors.push(anyhow!("unable to delete {}: {}", topic, err));
                            }
                        }
                    }
                }
                Err(e) => {
                    errors.push(e.into());
                }
            };
        }

        match self
            .ccsr_client
            .list_subjects()
            .await
            .context("listing schema registry subjects")
        {
            Ok(subjects) => {
                let testdrive_subjects: Vec<_> = subjects
                    .iter()
                    .filter(|s| s.starts_with("testdrive-"))
                    .collect();

                for subject in testdrive_subjects {
                    match self.ccsr_client.delete_subject(subject).await {
                        Ok(()) | Err(mz_ccsr::DeleteError::SubjectNotFound) => (),
                        Err(e) => errors.push(e.into()),
                    }
                }
            }
            Err(e) => {
                errors.push(e);
            }
        }

        if errors.is_empty() {
            Ok(())
        } else {
            bail!(
                "deleting Kafka topics and schema registry subjects: {} errors: {}",
                errors.len(),
                errors
                    .into_iter()
                    .map(|e| e.to_string_with_causes())
                    .join("\n")
            );
        }
    }
}

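/// Where to find the persist-backed catalog of the Materialize instance
/// under test.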
#[derive(Debug, Clone)]
pub struct CatalogConfig {
    pub persist_consensus_url: SensitiveUrl,
    pub persist_blob_url: SensitiveUrl,
}

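/// How the driver should proceed after running a command.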
pub enum ControlFlow {
    Continue,
    SkipBegin,
    SkipEnd,
}

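/// A testdrive command that can be executed against the current [`State`].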
#[async_trait]
pub(crate) trait Run {
    async fn run(self, state: &mut State) -> Result<ControlFlow, PosError>;
}

#[async_trait]
impl Run for PosCommand {
    async fn run(self, state: &mut State) -> Result<ControlFlow, PosError> {
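        // Commands may carry a version constraint; when the server version
        // does not satisfy it, the command is skipped rather than run.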
        macro_rules! handle_version {
            ($version_constraint:expr) => {
                match $version_constraint {
                    Some(VersionConstraint { min, max }) => {
                        match version_check::run_version_check(min, max, state).await {
                            Ok(true) => return Ok(ControlFlow::Continue),
                            Ok(false) => {}
                            Err(err) => return Err(PosError::new(err, self.pos)),
                        }
                    }
                    None => {}
                }
            };
        }

        let wrap_err = |e| PosError::new(e, self.pos);
        let ignore_prefix = match &self.command {
            Command::Builtin(builtin, _) => Some(builtin.name.clone()),
            _ => None,
        };
        let subst = |msg: &str, vars: &BTreeMap<String, String>| {
            substitute_vars(msg, vars, &ignore_prefix, false).map_err(wrap_err)
        };
        let subst_re = |msg: &str, vars: &BTreeMap<String, String>| {
            substitute_vars(msg, vars, &ignore_prefix, true).map_err(wrap_err)
        };

        let r = match self.command {
            Command::Builtin(mut builtin, version_constraint) => {
                handle_version!(version_constraint);
                for val in builtin.args.values_mut() {
                    *val = subst(val, &state.cmd_vars)?;
                }
                for line in &mut builtin.input {
                    *line = subst(line, &state.cmd_vars)?;
                }
                match builtin.name.as_ref() {
                    "check-consistency" => consistency::run_consistency_checks(state).await,
                    "skip-consistency-checks" => {
                        consistency::skip_consistency_checks(builtin, state)
                    }
                    "check-shard-tombstone" => {
                        consistency::run_check_shard_tombstone(builtin, state).await
                    }
                    "fivetran-destination" => {
                        fivetran::run_destination_command(builtin, state).await
                    }
                    "file-append" => file::run_append(builtin, state).await,
                    "file-delete" => file::run_delete(builtin, state).await,
                    "http-request" => http::run_request(builtin, state).await,
                    "kafka-add-partitions" => kafka::run_add_partitions(builtin, state).await,
                    "kafka-create-topic" => kafka::run_create_topic(builtin, state).await,
                    "kafka-wait-topic" => kafka::run_wait_topic(builtin, state).await,
                    "kafka-delete-records" => kafka::run_delete_records(builtin, state).await,
                    "kafka-delete-topic-flaky" => kafka::run_delete_topic(builtin, state).await,
                    "kafka-ingest" => kafka::run_ingest(builtin, state).await,
                    "kafka-verify-data" => kafka::run_verify_data(builtin, state).await,
                    "kafka-verify-commit" => kafka::run_verify_commit(builtin, state).await,
                    "kafka-verify-topic" => kafka::run_verify_topic(builtin, state).await,
                    "mysql-connect" => mysql::run_connect(builtin, state).await,
                    "mysql-execute" => mysql::run_execute(builtin, state).await,
                    "nop" => nop::run_nop(),
                    "postgres-connect" => postgres::run_connect(builtin, state).await,
                    "postgres-execute" => postgres::run_execute(builtin, state).await,
                    "postgres-verify-slot" => postgres::run_verify_slot(builtin, state).await,
                    "protobuf-compile-descriptors" => {
                        protobuf::run_compile_descriptors(builtin, state).await
                    }
                    "psql-execute" => psql::run_execute(builtin, state).await,
                    "s3-verify-data" => s3::run_verify_data(builtin, state).await,
                    "s3-verify-keys" => s3::run_verify_keys(builtin, state).await,
                    "s3-file-upload" => s3::run_upload(builtin, state).await,
                    "s3-set-presigned-url" => s3::run_set_presigned_url(builtin, state).await,
                    "schema-registry-publish" => schema_registry::run_publish(builtin, state).await,
                    "schema-registry-verify" => schema_registry::run_verify(builtin, state).await,
                    "schema-registry-wait" => schema_registry::run_wait(builtin, state).await,
                    "skip-if" => skip_if::run_skip_if(builtin, state).await,
                    "skip-end" => skip_end::run_skip_end(),
                    "sql-server-connect" => sql_server::run_connect(builtin, state).await,
                    "sql-server-execute" => sql_server::run_execute(builtin, state).await,
                    "sql-server-set-from-sql" => sql_server::run_set_from_sql(builtin, state).await,
                    "persist-force-compaction" => {
                        persist::run_force_compaction(builtin, state).await
                    }
                    "random-sleep" => sleep::run_random_sleep(builtin),
                    "set-regex" => set::run_regex_set(builtin, state),
                    "unset-regex" => set::run_regex_unset(builtin, state),
                    "set-sql-timeout" => set::run_sql_timeout(builtin, state),
                    "set-max-tries" => set::run_max_tries(builtin, state),
                    "sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment" => {
                        sleep::run_sleep(builtin)
                    }
                    "set" => set::set_vars(builtin, state),
                    "set-arg-default" => set::run_set_arg_default(builtin, state),
                    "set-from-sql" => set::run_set_from_sql(builtin, state).await,
                    "set-from-file" => set::run_set_from_file(builtin, state).await,
                    "webhook-append" => webhook::run_append(builtin, state).await,
                    _ => {
                        return Err(PosError::new(
                            anyhow!("unknown built-in command {}", builtin.name),
                            self.pos,
                        ));
                    }
                }
            }
            Command::Sql(mut sql, version_constraint) => {
                handle_version!(version_constraint);
                sql.query = subst(&sql.query, &state.cmd_vars)?;
                if let SqlOutput::Full { expected_rows, .. } = &mut sql.expected_output {
                    for row in expected_rows {
                        for col in row {
                            *col = subst(col, &state.cmd_vars)?;
                        }
                    }
                }
                sql::run_sql(sql, state).await
            }
            Command::FailSql(mut sql, version_constraint) => {
                handle_version!(version_constraint);
                sql.query = subst(&sql.query, &state.cmd_vars)?;
                sql.expected_error = match &sql.expected_error {
                    SqlExpectedError::Contains(s) => {
                        SqlExpectedError::Contains(subst(s, &state.cmd_vars)?)
                    }
                    SqlExpectedError::Exact(s) => {
                        SqlExpectedError::Exact(subst(s, &state.cmd_vars)?)
                    }
                    SqlExpectedError::Regex(s) => {
                        SqlExpectedError::Regex(subst_re(s, &state.cmd_vars)?)
                    }
                    SqlExpectedError::Timeout => SqlExpectedError::Timeout,
                };
                sql::run_fail_sql(sql, state).await
            }
        };

        r.map_err(wrap_err)
    }
}

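/// Substitutes `${name}` occurrences in `msg` with their values from `vars`,
/// regex-escaping the substituted values when `regex_escape` is set.
/// References whose name begins with `ignore_prefix` plus a dot (i.e.
/// references belonging to the built-in command currently being processed)
/// are left untouched. For example, with `vars` mapping `testdrive.seed` to
/// `42`, `"topic-${testdrive.seed}"` becomes `"topic-42"`; an unknown name
/// yields an error.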
fn substitute_vars(
    msg: &str,
    vars: &BTreeMap<String, String>,
    ignore_prefix: &Option<String>,
    regex_escape: bool,
) -> Result<String, anyhow::Error> {
    static RE: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"\$\{([^}]+)\}").unwrap());
    let mut err = None;
    let out = RE.replace_all(msg, |caps: &Captures| {
        let name = &caps[1];
        if let Some(ignore_prefix) = &ignore_prefix {
            if name.starts_with(format!("{}.", ignore_prefix).as_str()) {
                return caps.get(0).unwrap().as_str().to_string();
            }
        }

        if let Some(val) = vars.get(name) {
            if regex_escape {
                regex::escape(val)
            } else {
                val.to_string()
            }
        } else {
            err = Some(anyhow!("unknown variable: {}", name));
            "#VAR-MISSING#".to_string()
        }
    });
    match err {
        Some(err) => Err(err),
        None => Ok(out.into_owned()),
    }
}

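/// Creates the initial [`State`] for a testdrive run, along with a future
/// that drives the underlying SQL connection. The caller must await (or
/// spawn) the returned future for the state's `pgclient` to make progress.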
pub async fn create_state(
    config: &Config,
) -> Result<(State, impl Future<Output = Result<(), anyhow::Error>>), anyhow::Error> {
    let seed = config.seed.unwrap_or_else(|| rand::thread_rng().r#gen());

    let (_tempfile, temp_path) = match &config.temp_dir {
        Some(temp_dir) => {
            fs::create_dir_all(temp_dir).context("creating temporary directory")?;
            (None, PathBuf::from(&temp_dir))
        }
        _ => {
            let tempfile_handle = tempfile::tempdir().context("creating temporary directory")?;
            let temp_path = tempfile_handle.path().to_path_buf();
            (Some(tempfile_handle), temp_path)
        }
    };

    let materialize_catalog_config = config.materialize_catalog_config.clone();

    let materialize_url = util::postgres::config_url(&config.materialize_pgconfig)?;
    info!("Connecting to {}", materialize_url.as_str());
    let (pgclient, pgconn) = Retry::default()
        .max_duration(config.default_timeout)
        .retry_async_canceling(|_| async move {
            let mut pgconfig = config.materialize_pgconfig.clone();
            pgconfig.connect_timeout(config.default_timeout);
            let tls = make_tls(&pgconfig)?;
            pgconfig.connect(tls).await.map_err(|e| anyhow!(e))
        })
        .await?;

    let pgconn_task = task::spawn(|| "pgconn_task", pgconn).map(|join| {
        join.expect("pgconn_task unexpectedly canceled")
            .context("running SQL connection")
    });

    let materialize_state =
        create_materialize_state(config, materialize_catalog_config, pgclient).await?;

    let schema_registry_url = config.schema_registry_url.to_owned();

    let ccsr_client = {
        let mut ccsr_config = mz_ccsr::ClientConfig::new(schema_registry_url.clone());

        if let Some(cert_path) = &config.cert_path {
            let cert = fs::read(cert_path).context("reading cert")?;
            let pass = config.cert_password.as_deref().unwrap_or("").to_owned();
            let ident = mz_ccsr::tls::Identity::from_pkcs12_der(cert, pass)
                .context("reading keystore file as pkcs12")?;
            ccsr_config = ccsr_config.identity(ident);
        }

        if let Some(ccsr_username) = &config.ccsr_username {
            ccsr_config = ccsr_config.auth(ccsr_username.clone(), config.ccsr_password.clone());
        }

        ccsr_config.build().context("creating CCSR client")?
    };

    let (kafka_addr, kafka_admin, kafka_admin_opts, kafka_producer, kafka_topics, kafka_config) = {
        use rdkafka::admin::{AdminClient, AdminOptions};
        use rdkafka::producer::FutureProducer;

        let mut kafka_config = create_new_client_config_simple();
        kafka_config.set("bootstrap.servers", &config.kafka_addr);
        kafka_config.set("group.id", "materialize-testdrive");
        kafka_config.set("auto.offset.reset", "earliest");
        kafka_config.set("isolation.level", "read_committed");
        if let Some(cert_path) = &config.cert_path {
            kafka_config.set("security.protocol", "ssl");
            kafka_config.set("ssl.keystore.location", cert_path);
            if let Some(cert_password) = &config.cert_password {
                kafka_config.set("ssl.keystore.password", cert_password);
            }
        }
        kafka_config.set("message.max.bytes", "15728640");

        for (key, value) in &config.kafka_opts {
            kafka_config.set(key, value);
        }

        let admin: AdminClient<_> = kafka_config
            .create_with_context(MzClientContext::default())
            .with_context(|| format!("opening Kafka connection: {}", config.kafka_addr))?;

        let admin_opts = AdminOptions::new().operation_timeout(Some(config.default_timeout));

        let producer: FutureProducer<_> = kafka_config
            .create_with_context(MzClientContext::default())
            .with_context(|| format!("opening Kafka producer connection: {}", config.kafka_addr))?;

        let topics = BTreeMap::new();

        (
            config.kafka_addr.to_owned(),
            admin,
            admin_opts,
            producer,
            topics,
            kafka_config,
        )
    };

    let mut state = State {
        arg_vars: config.arg_vars.clone(),
        cmd_vars: BTreeMap::new(),
        seed,
        temp_path,
        _tempfile,
        default_timeout: config.default_timeout,
        timeout: config.default_timeout,
        max_tries: config.default_max_tries,
        initial_backoff: config.initial_backoff,
        backoff_factor: config.backoff_factor,
        consistency_checks: config.consistency_checks,
        consistency_checks_adhoc_skip: false,
        regex: None,
        regex_replacement: set::DEFAULT_REGEX_REPLACEMENT.into(),
        rewrite_results: config.rewrite_results,
        error_line_count: 0,
        error_string: "".to_string(),

        materialize: materialize_state,

        persist_consensus_url: config.persist_consensus_url.clone(),
        persist_blob_url: config.persist_blob_url.clone(),
        build_info: config.build_info,
        persist_clients: PersistClientCache::new(
            PersistConfig::new_default_configs(config.build_info, SYSTEM_TIME.clone()),
            &MetricsRegistry::new(),
            |_, _| PubSubClientConnection::noop(),
        ),

        schema_registry_url,
        ccsr_client,
        kafka_addr,
        kafka_admin,
        kafka_admin_opts,
        kafka_config,
        kafka_default_partitions: config.kafka_default_partitions,
        kafka_producer,
        kafka_topics,

        aws_account: config.aws_account.clone(),
        aws_config: config.aws_config.clone(),

        mysql_clients: BTreeMap::new(),
        postgres_clients: BTreeMap::new(),
        sql_server_clients: BTreeMap::new(),

        fivetran_destination_url: config.fivetran_destination_url.clone(),
        fivetran_destination_files_path: config.fivetran_destination_files_path.clone(),

        rewrites: Vec::new(),
        rewrite_pos_start: 0,
        rewrite_pos_end: 0,
    };
    state.initialize_cmd_vars().await?;
    Ok((state, pgconn_task))
}

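/// Builds the Materialize-specific portion of the state: applies the
/// configured session parameters, derives the various SQL and HTTP addresses
/// from the connection configs, and fetches the environment ID from the
/// server.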
async fn create_materialize_state(
    config: &Config,
    materialize_catalog_config: Option<CatalogConfig>,
    pgclient: tokio_postgres::Client,
) -> Result<MaterializeState, anyhow::Error> {
    let materialize_url = util::postgres::config_url(&config.materialize_pgconfig)?;
    let materialize_internal_url =
        util::postgres::config_url(&config.materialize_internal_pgconfig)?;

    for (key, value) in &config.materialize_params {
        pgclient
            .batch_execute(&format!("SET {key} = {value}"))
            .await
            .context("setting session parameter")?;
    }

    let materialize_user = config
        .materialize_pgconfig
        .get_user()
        .expect("testdrive URL must contain user")
        .to_string();

    let materialize_sql_addr = format!(
        "{}:{}",
        materialize_url.host_str().unwrap(),
        materialize_url.port().unwrap()
    );
    let materialize_http_addr = format!(
        "{}:{}",
        materialize_url.host_str().unwrap(),
        config.materialize_http_port
    );
    let materialize_internal_sql_addr = format!(
        "{}:{}",
        materialize_internal_url.host_str().unwrap(),
        materialize_internal_url.port().unwrap()
    );
    let materialize_password_sql_addr = format!(
        "{}:{}",
        materialize_url.host_str().unwrap(),
        config.materialize_password_sql_port
    );
    let materialize_internal_http_addr = format!(
        "{}:{}",
        materialize_internal_url.host_str().unwrap(),
        config.materialize_internal_http_port
    );
    let environment_id = pgclient
        .query_one("SELECT mz_environment_id()", &[])
        .await?
        .get::<_, String>(0)
        .parse()
        .context("parsing environment ID")?;

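    // These arguments are only used to open read-only copies of the catalog
    // (see `State::with_catalog_copy`), so placeholder defaults suffice.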
    let bootstrap_args = BootstrapArgs {
        cluster_replica_size_map: config.materialize_cluster_replica_sizes.clone(),
        default_cluster_replica_size: "ABC".to_string(),
        default_cluster_replication_factor: 1,
        bootstrap_role: None,
    };

    let materialize_state = MaterializeState {
        catalog_config: materialize_catalog_config,
        sql_addr: materialize_sql_addr,
        use_https: config.materialize_use_https,
        http_addr: materialize_http_addr,
        internal_sql_addr: materialize_internal_sql_addr,
        internal_http_addr: materialize_internal_http_addr,
        password_sql_addr: materialize_password_sql_addr,
        user: materialize_user,
        pgclient,
        environment_id,
        bootstrap_args,
    };

    Ok(materialize_state)
}