use std::collections::BTreeMap;
use std::future::Future;
use std::net::ToSocketAddrs;
use std::path::PathBuf;
use std::sync::LazyLock;
use std::time::Duration;
use std::{env, fs};

use anyhow::{Context, anyhow, bail};
use async_trait::async_trait;
use aws_credential_types::provider::ProvideCredentials;
use aws_types::SdkConfig;
use futures::future::FutureExt;
use itertools::Itertools;
use mz_adapter::catalog::{Catalog, ConnCatalog};
use mz_adapter::session::Session;
use mz_build_info::BuildInfo;
use mz_catalog::config::ClusterReplicaSizeMap;
use mz_catalog::durable::BootstrapArgs;
use mz_kafka_util::client::{MzClientContext, create_new_client_config_simple};
use mz_ore::error::ErrorExt;
use mz_ore::metrics::MetricsRegistry;
use mz_ore::now::SYSTEM_TIME;
use mz_ore::retry::Retry;
use mz_ore::task;
use mz_ore::url::SensitiveUrl;
use mz_persist_client::cache::PersistClientCache;
use mz_persist_client::cfg::PersistConfig;
use mz_persist_client::rpc::PubSubClientConnection;
use mz_persist_client::{PersistClient, PersistLocation};
use mz_sql::catalog::EnvironmentId;
use mz_tls_util::make_tls;
use rand::Rng;
use rdkafka::ClientConfig;
use rdkafka::producer::Producer;
use regex::{Captures, Regex};
use semver::Version;
use tokio_postgres::error::{DbError, SqlState};
use tracing::info;
use url::Url;

use crate::error::PosError;
use crate::parser::{
    Command, PosCommand, SqlExpectedError, SqlOutput, VersionConstraint, validate_ident,
};
use crate::util;
use crate::util::postgres::postgres_client;

pub mod consistency;

mod file;
mod fivetran;
mod http;
mod kafka;
mod mysql;
mod nop;
mod persist;
mod postgres;
mod protobuf;
mod psql;
mod s3;
mod schema_registry;
mod set;
mod skip_end;
mod skip_if;
mod sleep;
mod sql;
mod sql_server;
mod version_check;
mod webhook;

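/// Configuration for the testdrive runner, as assembled from its
/// command-line options.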
#[derive(Debug)]
pub struct Config {
    /// Variables to make available to scripts as `arg.KEY`.
    pub arg_vars: BTreeMap<String, String>,
    /// A random number to distinguish each run of a testdrive script.
    pub seed: Option<u32>,
    /// Whether to reset Materialize and Kafka state before running a script.
    pub reset: bool,
    /// A specific temporary directory to use, rather than a randomly named one.
    pub temp_dir: Option<String>,
    /// A name for the source of the script, used in error messages.
    pub source: Option<String>,
    /// The default timeout for retriable operations.
    pub default_timeout: Duration,
    /// The default maximum number of tries for retriable operations.
    pub default_max_tries: usize,
    /// The initial backoff interval for retriable operations.
    pub initial_backoff: Duration,
    /// The backoff factor for retriable operations.
    pub backoff_factor: f64,
    /// Which consistency checks to run between commands.
    pub consistency_checks: consistency::Level,
    /// Whether to rewrite expected results in the script rather than failing.
    pub rewrite_results: bool,

    /// The pgwire connection parameters for the Materialize instance under test.
    pub materialize_pgconfig: tokio_postgres::Config,
    /// The internal pgwire connection parameters for the Materialize instance.
    pub materialize_internal_pgconfig: tokio_postgres::Config,
    /// Whether to connect to Materialize's HTTP endpoints over HTTPS.
    pub materialize_use_https: bool,
    /// The port of Materialize's external HTTP endpoint.
    pub materialize_http_port: u16,
    /// The port of Materialize's internal HTTP endpoint.
    pub materialize_internal_http_port: u16,
    /// Session parameters to set on connections to Materialize.
    pub materialize_params: Vec<(String, String)>,
    /// The configuration for reading Materialize's durable catalog, if available.
    pub materialize_catalog_config: Option<CatalogConfig>,
    /// The build information of the Materialize instance under test.
    pub build_info: &'static BuildInfo,
    /// The cluster replica sizes known to the Materialize instance.
    pub materialize_cluster_replica_sizes: ClusterReplicaSizeMap,

    /// The URL of the persist consensus store, if configured.
    pub persist_consensus_url: Option<SensitiveUrl>,
    /// The URL of the persist blob store, if configured.
    pub persist_blob_url: Option<SensitiveUrl>,

    /// The address of the Kafka broker.
    pub kafka_addr: String,
    /// The default number of partitions for newly created Kafka topics.
    pub kafka_default_partitions: usize,
    /// Additional rdkafka client options.
    pub kafka_opts: Vec<(String, String)>,
    /// The URL of the Confluent schema registry.
    pub schema_registry_url: Url,
    /// The path to a TLS certificate, if TLS is in use.
    pub cert_path: Option<String>,
    /// The password for the TLS certificate.
    pub cert_password: Option<String>,
    /// The username for the schema registry.
    pub ccsr_username: Option<String>,
    /// The password for the schema registry.
    pub ccsr_password: Option<String>,

    /// The AWS SDK configuration.
    pub aws_config: SdkConfig,
    /// The AWS account ID.
    pub aws_account: String,

    /// The URL of the Fivetran destination.
    pub fivetran_destination_url: String,
    /// The path where files are exchanged with the Fivetran destination.
    pub fivetran_destination_files_path: String,
}

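/// Connection details for the Materialize instance under test.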
pub struct MaterializeState {
    catalog_config: Option<CatalogConfig>,

    sql_addr: String,
    use_https: bool,
    http_addr: String,
    internal_sql_addr: String,
    internal_http_addr: String,
    user: String,
    pgclient: tokio_postgres::Client,
    environment_id: EnvironmentId,
    bootstrap_args: BootstrapArgs,
}

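/// The state that accumulates and evolves as a testdrive script runs.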
pub struct State {
    // Testdrive state.
    arg_vars: BTreeMap<String, String>,
    cmd_vars: BTreeMap<String, String>,
    seed: u32,
    temp_path: PathBuf,
    _tempfile: Option<tempfile::TempDir>,
    default_timeout: Duration,
    timeout: Duration,
    max_tries: usize,
    initial_backoff: Duration,
    backoff_factor: f64,
    consistency_checks: consistency::Level,
    consistency_checks_adhoc_skip: bool,
    regex: Option<Regex>,
    regex_replacement: String,
    error_line_count: usize,
    error_string: String,

    // Materialize state.
    materialize: MaterializeState,

    // Persist state.
    persist_consensus_url: Option<SensitiveUrl>,
    persist_blob_url: Option<SensitiveUrl>,
    build_info: &'static BuildInfo,
    persist_clients: PersistClientCache,

    // Kafka and schema registry state.
    schema_registry_url: Url,
    ccsr_client: mz_ccsr::Client,
    kafka_addr: String,
    kafka_admin: rdkafka::admin::AdminClient<MzClientContext>,
    kafka_admin_opts: rdkafka::admin::AdminOptions,
    kafka_config: ClientConfig,
    kafka_default_partitions: usize,
    kafka_producer: rdkafka::producer::FutureProducer<MzClientContext>,
    kafka_topics: BTreeMap<String, usize>,

    // AWS state.
    aws_account: String,
    aws_config: SdkConfig,

    // Database client state.
    mysql_clients: BTreeMap<String, mysql_async::Conn>,
    postgres_clients: BTreeMap<String, tokio_postgres::Client>,
    sql_server_clients: BTreeMap<String, mz_sql_server_util::Client>,

    // Fivetran state.
    fivetran_destination_url: String,
    fivetran_destination_files_path: String,

    // Result-rewriting state.
    rewrite_results: bool,
    /// Rewrites to apply to the source file.
    pub rewrites: Vec<Rewrite>,
    /// The start position of the current rewrite.
    pub rewrite_pos_start: usize,
    /// The end position of the current rewrite.
    pub rewrite_pos_end: usize,
}

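/// A pending rewrite of a span of the source file, recorded when result
/// rewriting is enabled.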
pub struct Rewrite {
    pub content: String,
    pub start: usize,
    pub end: usize,
}

impl State {
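    /// Populates the built-in `testdrive.*`, `env.*`, and `arg.*` variables
    /// that scripts can reference.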
    pub async fn initialize_cmd_vars(&mut self) -> Result<(), anyhow::Error> {
        self.cmd_vars
            .insert("testdrive.kafka-addr".into(), self.kafka_addr.clone());
        self.cmd_vars.insert(
            "testdrive.kafka-addr-resolved".into(),
            self.kafka_addr
                .to_socket_addrs()
                .ok()
                .and_then(|mut addrs| addrs.next())
                .map(|addr| addr.to_string())
                .unwrap_or_else(|| "#RESOLUTION-FAILURE#".into()),
        );
        self.cmd_vars.insert(
            "testdrive.schema-registry-url".into(),
            self.schema_registry_url.to_string(),
        );
        self.cmd_vars
            .insert("testdrive.seed".into(), self.seed.to_string());
        self.cmd_vars.insert(
            "testdrive.temp-dir".into(),
            self.temp_path.display().to_string(),
        );
        self.cmd_vars
            .insert("testdrive.aws-region".into(), self.aws_region().into());
        self.cmd_vars
            .insert("testdrive.aws-endpoint".into(), self.aws_endpoint().into());
        self.cmd_vars
            .insert("testdrive.aws-account".into(), self.aws_account.clone());
        {
            let aws_credentials = self
                .aws_config
                .credentials_provider()
                .ok_or_else(|| anyhow!("no AWS credentials provider configured"))?
                .provide_credentials()
                .await
                .context("fetching AWS credentials")?;
            self.cmd_vars.insert(
                "testdrive.aws-access-key-id".into(),
                aws_credentials.access_key_id().to_owned(),
            );
            self.cmd_vars.insert(
                "testdrive.aws-secret-access-key".into(),
                aws_credentials.secret_access_key().to_owned(),
            );
            self.cmd_vars.insert(
                "testdrive.aws-token".into(),
                aws_credentials
                    .session_token()
                    .map(|token| token.to_owned())
                    .unwrap_or_else(String::new),
            );
        }
        self.cmd_vars.insert(
            "testdrive.materialize-environment-id".into(),
            self.materialize.environment_id.to_string(),
        );
        self.cmd_vars.insert(
            "testdrive.materialize-sql-addr".into(),
            self.materialize.sql_addr.clone(),
        );
        self.cmd_vars.insert(
            "testdrive.materialize-internal-sql-addr".into(),
            self.materialize.internal_sql_addr.clone(),
        );
        self.cmd_vars.insert(
            "testdrive.materialize-user".into(),
            self.materialize.user.clone(),
        );
        self.cmd_vars.insert(
            "testdrive.fivetran-destination-url".into(),
            self.fivetran_destination_url.clone(),
        );
        self.cmd_vars.insert(
            "testdrive.fivetran-destination-files-path".into(),
            self.fivetran_destination_files_path.clone(),
        );

        for (key, value) in env::vars() {
            self.cmd_vars.insert(format!("env.{}", key), value);
        }

        for (key, value) in &self.arg_vars {
            validate_ident(key)?;
            self.cmd_vars
                .insert(format!("arg.{}", key), value.to_string());
        }

        Ok(())
    }
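
    /// Opens a read-only copy of the durable catalog and applies `f` to it.
    /// Returns `Ok(None)` if no catalog configuration is available.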
    pub async fn with_catalog_copy<F, T>(
        &self,
        system_parameter_defaults: BTreeMap<String, String>,
        build_info: &'static BuildInfo,
        bootstrap_args: &BootstrapArgs,
        enable_expression_cache_override: Option<bool>,
        f: F,
    ) -> Result<Option<T>, anyhow::Error>
    where
        F: FnOnce(ConnCatalog) -> T,
    {
        async fn persist_client(
            persist_consensus_url: SensitiveUrl,
            persist_blob_url: SensitiveUrl,
            persist_clients: &PersistClientCache,
        ) -> Result<PersistClient, anyhow::Error> {
            let persist_location = PersistLocation {
                blob_uri: persist_blob_url,
                consensus_uri: persist_consensus_url,
            };
            Ok(persist_clients.open(persist_location).await?)
        }

        if let Some(CatalogConfig {
            persist_consensus_url,
            persist_blob_url,
        }) = &self.materialize.catalog_config
        {
            let persist_client = persist_client(
                persist_consensus_url.clone(),
                persist_blob_url.clone(),
                &self.persist_clients,
            )
            .await?;
            let catalog = Catalog::open_debug_read_only_persist_catalog_config(
                persist_client,
                SYSTEM_TIME.clone(),
                self.materialize.environment_id.clone(),
                system_parameter_defaults,
                build_info,
                bootstrap_args,
                enable_expression_cache_override,
            )
            .await?;
            let res = f(catalog.for_session(&Session::dummy()));
            catalog.expire().await;
            Ok(Some(res))
        } else {
            Ok(None)
        }
    }

    pub fn aws_endpoint(&self) -> &str {
        self.aws_config.endpoint_url().unwrap_or("")
    }

    pub fn aws_region(&self) -> &str {
        self.aws_config.region().map(|r| r.as_ref()).unwrap_or("")
    }

    /// Clears the ad hoc skip flag and returns whether it was set.
    pub fn clear_skip_consistency_checks(&mut self) -> bool {
        std::mem::replace(&mut self.consistency_checks_adhoc_skip, false)
    }

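    /// Resets Materialize to (approximately) its initial state: drops test
    /// databases, inactive user clusters, and non-system roles, then restores
    /// default privileges for the test user.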
    pub async fn reset_materialize(&self) -> Result<(), anyhow::Error> {
        let (inner_client, _) = postgres_client(
            &format!(
                "postgres://mz_system:materialize@{}",
                self.materialize.internal_sql_addr
            ),
            self.default_timeout,
        )
        .await?;

        let version = inner_client
            .query_one("SELECT mz_version_num()", &[])
            .await
            .context("getting version of materialize")
            .map(|row| row.get::<_, i32>(0))?;

        let semver = inner_client
            .query_one("SELECT right(split_part(mz_version(), ' ', 1), -1)", &[])
            .await
            .context("getting semver of materialize")
            .map(|row| row.get::<_, String>(0))?
            .parse::<semver::Version>()
            .context("parsing semver of materialize")?;

        inner_client
            .batch_execute("ALTER SYSTEM RESET ALL")
            .await
            .context("resetting materialize state: ALTER SYSTEM RESET ALL")?;

        {
            // The system parameter was renamed in v0.128.0.
            let rename_version = Version::parse("0.128.0-dev.1").expect("known to be valid");
            let enable_unsafe_functions = if semver >= rename_version {
                "unsafe_enable_unsafe_functions"
            } else {
                "enable_unsafe_functions"
            };
            let res = inner_client
                .batch_execute(&format!("ALTER SYSTEM SET {enable_unsafe_functions} = on"))
                .await
                .context("enabling dangerous functions");
            if let Err(e) = res {
                match e.root_cause().downcast_ref::<DbError>() {
                    Some(e) if *e.code() == SqlState::CANT_CHANGE_RUNTIME_PARAM => {
                        info!(
                            "can't enable unsafe functions because the server is in safe mode; \
                             testdrive scripts will fail if they use unsafe functions",
                        );
                    }
                    _ => return Err(e),
                }
            }
        }

        for row in inner_client
            .query("SHOW DATABASES", &[])
            .await
            .context("resetting materialize state: SHOW DATABASES")?
        {
            let db_name: String = row.get(0);
            if db_name.starts_with("testdrive_no_reset_") {
                continue;
            }
            let query = format!(
                "DROP DATABASE {}",
                postgres_protocol::escape::escape_identifier(&db_name)
            );
            sql::print_query(&query, None);
            inner_client.batch_execute(&query).await.context(format!(
                "resetting materialize state: DROP DATABASE {}",
                db_name,
            ))?;
        }

        // Find user-owned user clusters that have no objects on them; these
        // are leftovers from previous tests.
        let inactive_user_clusters = "
        WITH
            active_user_clusters AS
            (
                SELECT DISTINCT cluster_id, object_id
                FROM
                    (
                        SELECT cluster_id, id FROM mz_catalog.mz_sources
                        UNION ALL
                            SELECT cluster_id, id FROM mz_catalog.mz_sinks
                        UNION ALL
                            SELECT cluster_id, id
                            FROM mz_catalog.mz_materialized_views
                        UNION ALL
                            SELECT cluster_id, id FROM mz_catalog.mz_indexes
                        UNION ALL
                            SELECT cluster_id, id
                            FROM mz_internal.mz_subscriptions
                    )
                    AS t (cluster_id, object_id)
                WHERE cluster_id IS NOT NULL AND object_id LIKE 'u%'
            )
        SELECT name
        FROM mz_catalog.mz_clusters
        WHERE
            id NOT IN ( SELECT cluster_id FROM active_user_clusters )
            AND id LIKE 'u%'
            AND owner_id LIKE 'u%';";

        let inactive_clusters = inner_client
            .query(inactive_user_clusters, &[])
            .await
            .context("resetting materialize state: inactive_user_clusters")?;

        if !inactive_clusters.is_empty() {
            println!("cleaning up user clusters from previous tests...");
        }

        for cluster_name in inactive_clusters {
            let cluster_name: String = cluster_name.get(0);
            if cluster_name.starts_with("testdrive_no_reset_") {
                continue;
            }
            let query = format!(
                "DROP CLUSTER {}",
                postgres_protocol::escape::escape_identifier(&cluster_name)
            );
            sql::print_query(&query, None);
            inner_client.batch_execute(&query).await.context(format!(
                "resetting materialize state: DROP CLUSTER {}",
                cluster_name,
            ))?;
        }

        inner_client
            .batch_execute("CREATE DATABASE materialize")
            .await
            .context("resetting materialize state: CREATE DATABASE materialize")?;

        // Drop all roles except the current user and system (`mz_`) roles.
        if let Ok(rows) = inner_client.query("SELECT name FROM mz_roles", &[]).await {
            for row in rows {
                let role_name: String = row.get(0);
                if role_name == self.materialize.user || role_name.starts_with("mz_") {
                    continue;
                }
                let query = format!(
                    "DROP ROLE {}",
                    postgres_protocol::escape::escape_identifier(&role_name)
                );
                sql::print_query(&query, None);
                inner_client.batch_execute(&query).await.context(format!(
                    "resetting materialize state: DROP ROLE {}",
                    role_name,
                ))?;
            }
        }

        // Restore the test user's privileges.
        inner_client
            .batch_execute(&format!(
                "GRANT ALL PRIVILEGES ON SYSTEM TO {}",
                self.materialize.user
            ))
            .await?;

        inner_client
            .batch_execute("GRANT USAGE ON DATABASE materialize TO PUBLIC")
            .await?;
        inner_client
            .batch_execute(&format!(
                "GRANT ALL PRIVILEGES ON DATABASE materialize TO {}",
                self.materialize.user
            ))
            .await?;
        inner_client
            .batch_execute(&format!(
                "GRANT ALL PRIVILEGES ON SCHEMA materialize.public TO {}",
                self.materialize.user
            ))
            .await?;

        // Versions below 8200 call the default cluster "default"; newer ones
        // call it "quickstart".
        let cluster = match version {
            ..=8199 => "default",
            8200.. => "quickstart",
        };
        inner_client
            .batch_execute(&format!("GRANT USAGE ON CLUSTER {cluster} TO PUBLIC"))
            .await?;
        inner_client
            .batch_execute(&format!(
                "GRANT ALL PRIVILEGES ON CLUSTER {cluster} TO {}",
                self.materialize.user
            ))
            .await?;

        Ok(())
    }

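    /// Deletes Kafka topics and schema registry subjects created by testdrive
    /// (i.e., those whose names start with `testdrive-`).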
    pub async fn reset_kafka(&self) -> Result<(), anyhow::Error> {
        let mut errors: Vec<anyhow::Error> = Vec::new();

        let metadata = self.kafka_producer.client().fetch_metadata(
            None,
            Some(std::cmp::max(Duration::from_secs(1), self.default_timeout)),
        )?;

        let testdrive_topics: Vec<_> = metadata
            .topics()
            .iter()
            .filter_map(|t| {
                if t.name().starts_with("testdrive-") {
                    Some(t.name())
                } else {
                    None
                }
            })
            .collect();

        if !testdrive_topics.is_empty() {
            match self
                .kafka_admin
                .delete_topics(&testdrive_topics, &self.kafka_admin_opts)
                .await
            {
                Ok(res) => {
                    if res.len() != testdrive_topics.len() {
                        errors.push(anyhow!(
                            "kafka topic deletion returned {} results, but exactly {} expected",
                            res.len(),
                            testdrive_topics.len()
                        ));
                    }
                    for (res, topic) in res.iter().zip(testdrive_topics.iter()) {
                        match res {
                            Ok(_)
                            | Err((_, rdkafka::types::RDKafkaErrorCode::UnknownTopicOrPartition)) => {
                                ()
                            }
                            Err((_, err)) => {
                                errors.push(anyhow!("unable to delete {}: {}", topic, err));
                            }
                        }
                    }
                }
                Err(e) => {
                    errors.push(e.into());
                }
            };
        }

        match self
            .ccsr_client
            .list_subjects()
            .await
            .context("listing schema registry subjects")
        {
            Ok(subjects) => {
                let testdrive_subjects: Vec<_> = subjects
                    .iter()
                    .filter(|s| s.starts_with("testdrive-"))
                    .collect();

                for subject in testdrive_subjects {
                    match self.ccsr_client.delete_subject(subject).await {
                        Ok(()) | Err(mz_ccsr::DeleteError::SubjectNotFound) => (),
                        Err(e) => errors.push(e.into()),
                    }
                }
            }
            Err(e) => {
                errors.push(e);
            }
        }

        if errors.is_empty() {
            Ok(())
        } else {
            bail!(
                "resetting Kafka and schema registry state: {} errors: {}",
                errors.len(),
                errors
                    .into_iter()
                    .map(|e| e.to_string_with_causes())
                    .join("\n")
            );
        }
    }
}

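/// The location of a Materialize instance's durable catalog in persist.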
#[derive(Debug, Clone)]
pub struct CatalogConfig {
    /// The URL of the persist consensus store.
    pub persist_consensus_url: SensitiveUrl,
    /// The URL of the persist blob store.
    pub persist_blob_url: SensitiveUrl,
}

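/// A flow-control directive returned by each command, used by `skip-if` and
/// `skip-end` to skip over sections of a script.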
pub enum ControlFlow {
    Continue,
    SkipBegin,
    SkipEnd,
}

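/// A command that can be run against the current testdrive state.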
#[async_trait]
pub(crate) trait Run {
    async fn run(self, state: &mut State) -> Result<ControlFlow, PosError>;
}

#[async_trait]
impl Run for PosCommand {
    async fn run(self, state: &mut State) -> Result<ControlFlow, PosError> {
        macro_rules! handle_version {
            ($version_constraint:expr) => {
                match $version_constraint {
                    Some(VersionConstraint { min, max }) => {
                        match version_check::run_version_check(min, max, state).await {
                            Ok(true) => return Ok(ControlFlow::Continue),
                            Ok(false) => {}
                            Err(err) => return Err(PosError::new(err, self.pos)),
                        }
                    }
                    None => {}
                }
            };
        }

        let wrap_err = |e| PosError::new(e, self.pos);
        // Variables prefixed with the name of the builtin command itself are
        // left unsubstituted, for the command to handle.
        let ignore_prefix = match &self.command {
            Command::Builtin(builtin, _) => Some(builtin.name.clone()),
            _ => None,
        };
        let subst = |msg: &str, vars: &BTreeMap<String, String>| {
            substitute_vars(msg, vars, &ignore_prefix, false).map_err(wrap_err)
        };
        let subst_re = |msg: &str, vars: &BTreeMap<String, String>| {
            substitute_vars(msg, vars, &ignore_prefix, true).map_err(wrap_err)
        };

        let r = match self.command {
            Command::Builtin(mut builtin, version_constraint) => {
                handle_version!(version_constraint);
                for val in builtin.args.values_mut() {
                    *val = subst(val, &state.cmd_vars)?;
                }
                for line in &mut builtin.input {
                    *line = subst(line, &state.cmd_vars)?;
                }
                match builtin.name.as_ref() {
                    "check-consistency" => consistency::run_consistency_checks(state).await,
                    "skip-consistency-checks" => {
                        consistency::skip_consistency_checks(builtin, state)
                    }
                    "check-shard-tombstone" => {
                        consistency::run_check_shard_tombstone(builtin, state).await
                    }
                    "fivetran-destination" => {
                        fivetran::run_destination_command(builtin, state).await
                    }
                    "file-append" => file::run_append(builtin, state).await,
                    "file-delete" => file::run_delete(builtin, state).await,
                    "http-request" => http::run_request(builtin, state).await,
                    "kafka-add-partitions" => kafka::run_add_partitions(builtin, state).await,
                    "kafka-create-topic" => kafka::run_create_topic(builtin, state).await,
                    "kafka-wait-topic" => kafka::run_wait_topic(builtin, state).await,
                    "kafka-delete-records" => kafka::run_delete_records(builtin, state).await,
                    "kafka-delete-topic-flaky" => kafka::run_delete_topic(builtin, state).await,
                    "kafka-ingest" => kafka::run_ingest(builtin, state).await,
                    "kafka-verify-data" => kafka::run_verify_data(builtin, state).await,
                    "kafka-verify-commit" => kafka::run_verify_commit(builtin, state).await,
                    "kafka-verify-topic" => kafka::run_verify_topic(builtin, state).await,
                    "mysql-connect" => mysql::run_connect(builtin, state).await,
                    "mysql-execute" => mysql::run_execute(builtin, state).await,
                    "nop" => nop::run_nop(),
                    "postgres-connect" => postgres::run_connect(builtin, state).await,
                    "postgres-execute" => postgres::run_execute(builtin, state).await,
                    "postgres-verify-slot" => postgres::run_verify_slot(builtin, state).await,
                    "protobuf-compile-descriptors" => {
                        protobuf::run_compile_descriptors(builtin, state).await
                    }
                    "psql-execute" => psql::run_execute(builtin, state).await,
                    "s3-verify-data" => s3::run_verify_data(builtin, state).await,
                    "s3-verify-keys" => s3::run_verify_keys(builtin, state).await,
                    "s3-file-upload" => s3::run_upload(builtin, state).await,
                    "s3-set-presigned-url" => s3::run_set_presigned_url(builtin, state).await,
                    "schema-registry-publish" => schema_registry::run_publish(builtin, state).await,
                    "schema-registry-verify" => schema_registry::run_verify(builtin, state).await,
                    "schema-registry-wait" => schema_registry::run_wait(builtin, state).await,
                    "skip-if" => skip_if::run_skip_if(builtin, state).await,
                    "skip-end" => skip_end::run_skip_end(),
                    "sql-server-connect" => sql_server::run_connect(builtin, state).await,
                    "sql-server-execute" => sql_server::run_execute(builtin, state).await,
                    "sql-server-set-from-sql" => sql_server::run_set_from_sql(builtin, state).await,
                    "persist-force-compaction" => {
                        persist::run_force_compaction(builtin, state).await
                    }
                    "random-sleep" => sleep::run_random_sleep(builtin),
                    "set-regex" => set::run_regex_set(builtin, state),
                    "unset-regex" => set::run_regex_unset(builtin, state),
                    "set-sql-timeout" => set::run_sql_timeout(builtin, state),
                    "set-max-tries" => set::run_max_tries(builtin, state),
                    "sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment" => {
                        sleep::run_sleep(builtin)
                    }
                    "set" => set::set_vars(builtin, state),
                    "set-arg-default" => set::run_set_arg_default(builtin, state),
                    "set-from-sql" => set::run_set_from_sql(builtin, state).await,
                    "set-from-file" => set::run_set_from_file(builtin, state).await,
                    "webhook-append" => webhook::run_append(builtin, state).await,
                    _ => {
                        return Err(PosError::new(
                            anyhow!("unknown built-in command {}", builtin.name),
                            self.pos,
                        ));
                    }
                }
            }
            Command::Sql(mut sql, version_constraint) => {
                handle_version!(version_constraint);
                sql.query = subst(&sql.query, &state.cmd_vars)?;
                if let SqlOutput::Full { expected_rows, .. } = &mut sql.expected_output {
                    for row in expected_rows {
                        for col in row {
                            *col = subst(col, &state.cmd_vars)?;
                        }
                    }
                }
                sql::run_sql(sql, state).await
            }
            Command::FailSql(mut sql, version_constraint) => {
                handle_version!(version_constraint);
                sql.query = subst(&sql.query, &state.cmd_vars)?;
                sql.expected_error = match &sql.expected_error {
                    SqlExpectedError::Contains(s) => {
                        SqlExpectedError::Contains(subst(s, &state.cmd_vars)?)
                    }
                    SqlExpectedError::Exact(s) => {
                        SqlExpectedError::Exact(subst(s, &state.cmd_vars)?)
                    }
                    SqlExpectedError::Regex(s) => {
                        SqlExpectedError::Regex(subst_re(s, &state.cmd_vars)?)
                    }
                    SqlExpectedError::Timeout => SqlExpectedError::Timeout,
                };
                sql::run_fail_sql(sql, state).await
            }
        };

        r.map_err(wrap_err)
    }
}

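/// Substitutes `${var}`-style references in `msg` with the corresponding
/// values from `vars`, optionally regex-escaping the values. Variables
/// prefixed with `ignore_prefix` are left untouched.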
fn substitute_vars(
    msg: &str,
    vars: &BTreeMap<String, String>,
    ignore_prefix: &Option<String>,
    regex_escape: bool,
) -> Result<String, anyhow::Error> {
    static RE: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"\$\{([^}]+)\}").unwrap());
    let mut err = None;
    let out = RE.replace_all(msg, |caps: &Captures| {
        let name = &caps[1];
        if let Some(ignore_prefix) = &ignore_prefix {
            if name.starts_with(format!("{}.", ignore_prefix).as_str()) {
                return caps.get(0).unwrap().as_str().to_string();
            }
        }

        if let Some(val) = vars.get(name) {
            if regex_escape {
                regex::escape(val)
            } else {
                val.to_string()
            }
        } else {
            err = Some(anyhow!("unknown variable: {}", name));
            "#VAR-MISSING#".to_string()
        }
    });
    match err {
        Some(err) => Err(err),
        None => Ok(out.into_owned()),
    }
}

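/// Creates the initial `State` for a testdrive run, returning it along with a
/// future that resolves when the underlying SQL connection task exits.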
pub async fn create_state(
    config: &Config,
) -> Result<(State, impl Future<Output = Result<(), anyhow::Error>>), anyhow::Error> {
    let seed = config.seed.unwrap_or_else(|| rand::thread_rng().r#gen());

    let (_tempfile, temp_path) = match &config.temp_dir {
        Some(temp_dir) => {
            fs::create_dir_all(temp_dir).context("creating temporary directory")?;
            (None, PathBuf::from(&temp_dir))
        }
        _ => {
            // Stash the tempfile handle so the temporary directory is not
            // cleaned up until the `State` is dropped.
            let tempfile_handle = tempfile::tempdir().context("creating temporary directory")?;
            let temp_path = tempfile_handle.path().to_path_buf();
            (Some(tempfile_handle), temp_path)
        }
    };

    let materialize_catalog_config = config.materialize_catalog_config.clone();

    let materialize_url = util::postgres::config_url(&config.materialize_pgconfig)?;
    info!("Connecting to {}", materialize_url.as_str());
    let (pgclient, pgconn) = Retry::default()
        .max_duration(config.default_timeout)
        .retry_async_canceling(|_| async move {
            let mut pgconfig = config.materialize_pgconfig.clone();
            pgconfig.connect_timeout(config.default_timeout);
            let tls = make_tls(&pgconfig)?;
            pgconfig.connect(tls).await.map_err(|e| anyhow!(e))
        })
        .await?;

    let pgconn_task = task::spawn(|| "pgconn_task", pgconn).map(|join| {
        join.expect("pgconn_task unexpectedly canceled")
            .context("running SQL connection")
    });

    let materialize_state =
        create_materialize_state(config, materialize_catalog_config, pgclient).await?;

    let schema_registry_url = config.schema_registry_url.to_owned();

    let ccsr_client = {
        let mut ccsr_config = mz_ccsr::ClientConfig::new(schema_registry_url.clone());

        if let Some(cert_path) = &config.cert_path {
            let cert = fs::read(cert_path).context("reading cert")?;
            let pass = config.cert_password.as_deref().unwrap_or("").to_owned();
            let ident = mz_ccsr::tls::Identity::from_pkcs12_der(cert, pass)
                .context("reading keystore file as pkcs12")?;
            ccsr_config = ccsr_config.identity(ident);
        }

        if let Some(ccsr_username) = &config.ccsr_username {
            ccsr_config = ccsr_config.auth(ccsr_username.clone(), config.ccsr_password.clone());
        }

        ccsr_config.build().context("creating CCSR client")?
    };

    let (kafka_addr, kafka_admin, kafka_admin_opts, kafka_producer, kafka_topics, kafka_config) = {
        use rdkafka::admin::{AdminClient, AdminOptions};
        use rdkafka::producer::FutureProducer;

        let mut kafka_config = create_new_client_config_simple();
        kafka_config.set("bootstrap.servers", &config.kafka_addr);
        kafka_config.set("group.id", "materialize-testdrive");
        kafka_config.set("auto.offset.reset", "earliest");
        kafka_config.set("isolation.level", "read_committed");
        if let Some(cert_path) = &config.cert_path {
            kafka_config.set("security.protocol", "ssl");
            kafka_config.set("ssl.keystore.location", cert_path);
            if let Some(cert_password) = &config.cert_password {
                kafka_config.set("ssl.keystore.password", cert_password);
            }
        }
        kafka_config.set("message.max.bytes", "15728640");

        for (key, value) in &config.kafka_opts {
            kafka_config.set(key, value);
        }

        let admin: AdminClient<_> = kafka_config
            .create_with_context(MzClientContext::default())
            .with_context(|| format!("opening Kafka connection: {}", config.kafka_addr))?;

        let admin_opts = AdminOptions::new().operation_timeout(Some(config.default_timeout));

        let producer: FutureProducer<_> = kafka_config
            .create_with_context(MzClientContext::default())
            .with_context(|| format!("opening Kafka producer connection: {}", config.kafka_addr))?;

        let topics = BTreeMap::new();

        (
            config.kafka_addr.to_owned(),
            admin,
            admin_opts,
            producer,
            topics,
            kafka_config,
        )
    };

    let mut state = State {
        arg_vars: config.arg_vars.clone(),
        cmd_vars: BTreeMap::new(),
        seed,
        temp_path,
        _tempfile,
        default_timeout: config.default_timeout,
        timeout: config.default_timeout,
        max_tries: config.default_max_tries,
        initial_backoff: config.initial_backoff,
        backoff_factor: config.backoff_factor,
        consistency_checks: config.consistency_checks,
        consistency_checks_adhoc_skip: false,
        regex: None,
        regex_replacement: set::DEFAULT_REGEX_REPLACEMENT.into(),
        rewrite_results: config.rewrite_results,
        error_line_count: 0,
        error_string: "".to_string(),

        materialize: materialize_state,

        persist_consensus_url: config.persist_consensus_url.clone(),
        persist_blob_url: config.persist_blob_url.clone(),
        build_info: config.build_info,
        persist_clients: PersistClientCache::new(
            PersistConfig::new_default_configs(config.build_info, SYSTEM_TIME.clone()),
            &MetricsRegistry::new(),
            |_, _| PubSubClientConnection::noop(),
        ),

        schema_registry_url,
        ccsr_client,
        kafka_addr,
        kafka_admin,
        kafka_admin_opts,
        kafka_config,
        kafka_default_partitions: config.kafka_default_partitions,
        kafka_producer,
        kafka_topics,

        aws_account: config.aws_account.clone(),
        aws_config: config.aws_config.clone(),

        mysql_clients: BTreeMap::new(),
        postgres_clients: BTreeMap::new(),
        sql_server_clients: BTreeMap::new(),

        fivetran_destination_url: config.fivetran_destination_url.clone(),
        fivetran_destination_files_path: config.fivetran_destination_files_path.clone(),

        rewrites: Vec::new(),
        rewrite_pos_start: 0,
        rewrite_pos_end: 0,
    };
    state.initialize_cmd_vars().await?;
    Ok((state, pgconn_task))
}

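/// Builds the `MaterializeState` for the instance under test: applies session
/// parameters, derives the SQL and HTTP addresses, and fetches the
/// environment ID.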
async fn create_materialize_state(
    config: &Config,
    materialize_catalog_config: Option<CatalogConfig>,
    pgclient: tokio_postgres::Client,
) -> Result<MaterializeState, anyhow::Error> {
    let materialize_url = util::postgres::config_url(&config.materialize_pgconfig)?;
    let materialize_internal_url =
        util::postgres::config_url(&config.materialize_internal_pgconfig)?;

    for (key, value) in &config.materialize_params {
        pgclient
            .batch_execute(&format!("SET {key} = {value}"))
            .await
            .context("setting session parameter")?;
    }

    let materialize_user = config
        .materialize_pgconfig
        .get_user()
        .expect("testdrive URL must contain user")
        .to_string();

    let materialize_sql_addr = format!(
        "{}:{}",
        materialize_url.host_str().unwrap(),
        materialize_url.port().unwrap()
    );
    let materialize_http_addr = format!(
        "{}:{}",
        materialize_url.host_str().unwrap(),
        config.materialize_http_port
    );
    let materialize_internal_sql_addr = format!(
        "{}:{}",
        materialize_internal_url.host_str().unwrap(),
        materialize_internal_url.port().unwrap()
    );
    let materialize_internal_http_addr = format!(
        "{}:{}",
        materialize_internal_url.host_str().unwrap(),
        config.materialize_internal_http_port
    );
    let environment_id = pgclient
        .query_one("SELECT mz_environment_id()", &[])
        .await?
        .get::<_, String>(0)
        .parse()
        .context("parsing environment ID")?;

    let bootstrap_args = BootstrapArgs {
        cluster_replica_size_map: config.materialize_cluster_replica_sizes.clone(),
        default_cluster_replica_size: "ABC".to_string(),
        default_cluster_replication_factor: 1,
        bootstrap_role: None,
    };

    let materialize_state = MaterializeState {
        catalog_config: materialize_catalog_config,
        sql_addr: materialize_sql_addr,
        use_https: config.materialize_use_https,
        http_addr: materialize_http_addr,
        internal_sql_addr: materialize_internal_sql_addr,
        internal_http_addr: materialize_internal_http_addr,
        user: materialize_user,
        pgclient,
        environment_id,
        bootstrap_args,
    };

    Ok(materialize_state)
}