mz_testdrive/action.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::collections::BTreeMap;
use std::future::Future;
use std::net::ToSocketAddrs;
use std::path::PathBuf;
use std::sync::LazyLock;
use std::time::Duration;
use std::{env, fs};

use anyhow::{Context, anyhow, bail};
use async_trait::async_trait;
use aws_credential_types::provider::ProvideCredentials;
use aws_types::SdkConfig;
use futures::future::FutureExt;
use itertools::Itertools;
use mz_adapter::catalog::{Catalog, ConnCatalog};
use mz_adapter::session::Session;
use mz_build_info::BuildInfo;
use mz_catalog::config::ClusterReplicaSizeMap;
use mz_catalog::durable::BootstrapArgs;
use mz_kafka_util::client::{MzClientContext, create_new_client_config_simple};
use mz_ore::error::ErrorExt;
use mz_ore::metrics::MetricsRegistry;
use mz_ore::now::SYSTEM_TIME;
use mz_ore::retry::Retry;
use mz_ore::task;
use mz_ore::url::SensitiveUrl;
use mz_persist_client::cache::PersistClientCache;
use mz_persist_client::cfg::PersistConfig;
use mz_persist_client::rpc::PubSubClientConnection;
use mz_persist_client::{PersistClient, PersistLocation};
use mz_sql::catalog::EnvironmentId;
use mz_tls_util::make_tls;
use rdkafka::ClientConfig;
use rdkafka::producer::Producer;
use regex::{Captures, Regex};
use semver::Version;
use tokio_postgres::error::{DbError, SqlState};
use tracing::info;
use url::Url;

use crate::error::PosError;
use crate::parser::{
    Command, PosCommand, SqlExpectedError, SqlOutput, VersionConstraint, validate_ident,
};
use crate::util;
use crate::util::postgres::postgres_client;

pub mod consistency;

mod file;
mod fivetran;
mod http;
mod kafka;
mod mysql;
mod nop;
mod persist;
mod postgres;
mod protobuf;
mod psql;
mod s3;
mod schema_registry;
mod set;
mod skip_end;
mod skip_if;
mod sleep;
mod sql;
mod sql_server;
mod version_check;
mod webhook;

/// User-settable configuration parameters.
#[derive(Debug)]
pub struct Config {
    // === Testdrive options. ===
    /// Variables to make available to the testdrive script.
    ///
    /// The value of each entry will be made available to the script in a
    /// variable named `arg.KEY`.
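    ///
    /// For example, a (hypothetical) entry mapping `foo` to `bar` makes
    /// `${arg.foo}` expand to `bar` inside the script.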
    pub arg_vars: BTreeMap<String, String>,
    /// A random number to distinguish each run of a testdrive script.
    pub seed: Option<u32>,
    /// Whether to reset Materialize state before executing each script and
    /// to clean up AWS state after each script.
    pub reset: bool,
    /// Force the use of the specified temporary directory.
    ///
    /// If unspecified, testdrive creates a temporary directory with a random
    /// name.
    pub temp_dir: Option<String>,
    /// Source string to print out on errors.
    pub source: Option<String>,
    /// The default timeout for cancellable operations.
    pub default_timeout: Duration,
    /// The default number of tries for retriable operations.
    pub default_max_tries: usize,
    /// The initial backoff interval for retry operations.
    ///
    /// Set to 0 to retry immediately on failure.
    pub initial_backoff: Duration,
    /// Backoff factor to use for retry operations.
    ///
    /// Set to 1 to retry at a steady pace.
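    ///
    /// As a sketch of the arithmetic: with an `initial_backoff` of 50ms and a
    /// `backoff_factor` of 2.0, successive retries back off roughly 50ms,
    /// 100ms, 200ms, and so on (the exact schedule is up to the retry loop
    /// that consumes these settings).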
    pub backoff_factor: f64,
    /// The level of consistency checks to run against the coordinator and
    /// catalog, if any.
    pub consistency_checks: consistency::Level,
    /// Whether to automatically rewrite wrong results instead of failing.
    pub rewrite_results: bool,

    // === Materialize options. ===
    /// The pgwire connection parameters for the Materialize instance that
    /// testdrive will connect to.
    pub materialize_pgconfig: tokio_postgres::Config,
    /// The internal pgwire connection parameters for the Materialize instance that
    /// testdrive will connect to.
    pub materialize_internal_pgconfig: tokio_postgres::Config,
    /// Whether to use HTTPS instead of plain HTTP for the HTTP(S) connections.
    pub materialize_use_https: bool,
    /// The port for the public endpoints of the materialize instance that
    /// testdrive will connect to via HTTP.
    pub materialize_http_port: u16,
    /// The port for the internal endpoints of the materialize instance that
    /// testdrive will connect to via HTTP.
    pub materialize_internal_http_port: u16,
    /// The port for the password endpoints of the materialize instance that
    /// testdrive will connect to via SQL.
    pub materialize_password_sql_port: u16,
    /// The port for the SASL endpoints of the materialize instance that
    /// testdrive will connect to via SQL.
    pub materialize_sasl_sql_port: u16,
    /// Session parameters to set after connecting to materialize.
    pub materialize_params: Vec<(String, String)>,
    /// An optional catalog configuration.
    pub materialize_catalog_config: Option<CatalogConfig>,
    /// Build information.
    pub build_info: &'static BuildInfo,
    /// Configured cluster replica sizes.
    pub materialize_cluster_replica_sizes: ClusterReplicaSizeMap,

    // === Persist options. ===
    /// Handle to the persist consensus system.
    pub persist_consensus_url: Option<SensitiveUrl>,
    /// Handle to the persist blob storage.
    pub persist_blob_url: Option<SensitiveUrl>,

    // === Confluent options. ===
    /// The address of the Kafka broker that testdrive will interact with.
    pub kafka_addr: String,
    /// Default number of partitions to use for topics.
    pub kafka_default_partitions: usize,
    /// Arbitrary rdkafka options for testdrive to use when connecting to the
    /// Kafka broker.
    pub kafka_opts: Vec<(String, String)>,
    /// The URL of the schema registry that testdrive will connect to.
    pub schema_registry_url: Url,
    /// An optional path to a TLS certificate that testdrive will present when
    /// performing client authentication.
    ///
    /// The keystore must be in the PKCS#12 format.
    pub cert_path: Option<String>,
    /// An optional password for the TLS certificate.
    pub cert_password: Option<String>,
    /// An optional username for basic authentication with the Confluent Schema
    /// Registry.
    pub ccsr_username: Option<String>,
    /// An optional password for basic authentication with the Confluent Schema
    /// Registry.
    pub ccsr_password: Option<String>,

    // === AWS options. ===
    /// The configuration to use when connecting to AWS.
    pub aws_config: SdkConfig,
    /// The ID of the AWS account that `aws_config` configures.
    pub aws_account: String,

    // === Fivetran options. ===
    /// Address of the Fivetran Destination that is currently running.
    pub fivetran_destination_url: String,
    /// Directory that is accessible to the Fivetran Destination.
    pub fivetran_destination_files_path: String,
}

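/// Connection details and handles for the Materialize instance under test.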
pub struct MaterializeState {
    catalog_config: Option<CatalogConfig>,

    sql_addr: String,
    use_https: bool,
    http_addr: String,
    internal_sql_addr: String,
    internal_http_addr: String,
    password_sql_addr: String,
    sasl_sql_addr: String,
    user: String,
    pgclient: tokio_postgres::Client,
    environment_id: EnvironmentId,
    bootstrap_args: BootstrapArgs,
}

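/// All state accumulated over the course of a single testdrive run.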
pub struct State {
    // === Testdrive state. ===
    arg_vars: BTreeMap<String, String>,
    cmd_vars: BTreeMap<String, String>,
    seed: u32,
    temp_path: PathBuf,
    _tempfile: Option<tempfile::TempDir>,
    default_timeout: Duration,
    timeout: Duration,
    max_tries: usize,
    initial_backoff: Duration,
    backoff_factor: f64,
    consistency_checks: consistency::Level,
    consistency_checks_adhoc_skip: bool,
    regex: Option<Regex>,
    regex_replacement: String,
    error_line_count: usize,
    error_string: String,

    // === Materialize state. ===
    materialize: MaterializeState,

    // === Persist state. ===
    persist_consensus_url: Option<SensitiveUrl>,
    persist_blob_url: Option<SensitiveUrl>,
    build_info: &'static BuildInfo,
    persist_clients: PersistClientCache,

    // === Confluent state. ===
    schema_registry_url: Url,
    ccsr_client: mz_ccsr::Client,
    kafka_addr: String,
    kafka_admin: rdkafka::admin::AdminClient<MzClientContext>,
    kafka_admin_opts: rdkafka::admin::AdminOptions,
    kafka_config: ClientConfig,
    kafka_default_partitions: usize,
    kafka_producer: rdkafka::producer::FutureProducer<MzClientContext>,
    kafka_topics: BTreeMap<String, usize>,

    // === AWS state. ===
    aws_account: String,
    aws_config: SdkConfig,

    // === Database driver state. ===
    mysql_clients: BTreeMap<String, mysql_async::Conn>,
    postgres_clients: BTreeMap<String, tokio_postgres::Client>,
    sql_server_clients: BTreeMap<String, mz_sql_server_util::Client>,

    // === Fivetran state. ===
    fivetran_destination_url: String,
    fivetran_destination_files_path: String,

    // === Rewrite state. ===
    rewrite_results: bool,
    /// Rewrites to apply to the current file; expected results are replaced
    /// inline.
    pub rewrites: Vec<Rewrite>,
    /// Start position of the currently expected result.
    pub rewrite_pos_start: usize,
    /// End position of the currently expected result.
    pub rewrite_pos_end: usize,
}

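/// A single inline rewrite of an expected result: `content` replaces the
/// range `start..end` of the current file.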
pub struct Rewrite {
    pub content: String,
    pub start: usize,
    pub end: usize,
}

impl State {
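    /// Populates `cmd_vars` with the built-in `testdrive.*` variables, plus
    /// `env.*` and `arg.*` passthroughs, for use in variable substitution.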
    pub async fn initialize_cmd_vars(&mut self) -> Result<(), anyhow::Error> {
        self.cmd_vars
            .insert("testdrive.kafka-addr".into(), self.kafka_addr.clone());
        self.cmd_vars.insert(
            "testdrive.kafka-addr-resolved".into(),
            self.kafka_addr
                .to_socket_addrs()
                .ok()
                .and_then(|mut addrs| addrs.next())
                .map(|addr| addr.to_string())
                .unwrap_or_else(|| "#RESOLUTION-FAILURE#".into()),
        );
        self.cmd_vars.insert(
            "testdrive.schema-registry-url".into(),
            self.schema_registry_url.to_string(),
        );
        self.cmd_vars
            .insert("testdrive.seed".into(), self.seed.to_string());
        self.cmd_vars.insert(
            "testdrive.temp-dir".into(),
            self.temp_path.display().to_string(),
        );
        self.cmd_vars
            .insert("testdrive.aws-region".into(), self.aws_region().into());
        self.cmd_vars
            .insert("testdrive.aws-endpoint".into(), self.aws_endpoint().into());
        self.cmd_vars
            .insert("testdrive.aws-account".into(), self.aws_account.clone());
        {
            let aws_credentials = self
                .aws_config
                .credentials_provider()
                .ok_or_else(|| anyhow!("no AWS credentials provider configured"))?
                .provide_credentials()
                .await
                .context("fetching AWS credentials")?;
            self.cmd_vars.insert(
                "testdrive.aws-access-key-id".into(),
                aws_credentials.access_key_id().to_owned(),
            );
            self.cmd_vars.insert(
                "testdrive.aws-secret-access-key".into(),
                aws_credentials.secret_access_key().to_owned(),
            );
            self.cmd_vars.insert(
                "testdrive.aws-token".into(),
                aws_credentials
                    .session_token()
                    .map(|token| token.to_owned())
                    .unwrap_or_else(String::new),
            );
        }
        self.cmd_vars.insert(
            "testdrive.materialize-environment-id".into(),
            self.materialize.environment_id.to_string(),
        );
        self.cmd_vars.insert(
            "testdrive.materialize-sql-addr".into(),
            self.materialize.sql_addr.clone(),
        );
        self.cmd_vars.insert(
            "testdrive.materialize-internal-sql-addr".into(),
            self.materialize.internal_sql_addr.clone(),
        );
        self.cmd_vars.insert(
            "testdrive.materialize-password-sql-addr".into(),
            self.materialize.password_sql_addr.clone(),
        );
        self.cmd_vars.insert(
            "testdrive.materialize-sasl-sql-addr".into(),
            self.materialize.sasl_sql_addr.clone(),
        );
        self.cmd_vars.insert(
            "testdrive.materialize-user".into(),
            self.materialize.user.clone(),
        );
        self.cmd_vars.insert(
            "testdrive.fivetran-destination-url".into(),
            self.fivetran_destination_url.clone(),
        );
        self.cmd_vars.insert(
            "testdrive.fivetran-destination-files-path".into(),
            self.fivetran_destination_files_path.clone(),
        );

        for (key, value) in env::vars() {
            self.cmd_vars.insert(format!("env.{}", key), value);
        }

        for (key, value) in &self.arg_vars {
            validate_ident(key)?;
            self.cmd_vars
                .insert(format!("arg.{}", key), value.to_string());
        }

        Ok(())
    }

    /// Makes a copy of the durable catalog and runs a function on its
    /// state. Returns `None` if there's no catalog information in the `State`.
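    ///
    /// A minimal sketch of a call site (arguments elided; the no-op closure
    /// merely proves the catalog can be opened):
    ///
    /// ```ignore
    /// let opened = state
    ///     .with_catalog_copy(defaults, build_info, &bootstrap_args, None, |_catalog| ())
    ///     .await?
    ///     .is_some();
    /// ```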
    pub async fn with_catalog_copy<F, T>(
        &self,
        system_parameter_defaults: BTreeMap<String, String>,
        build_info: &'static BuildInfo,
        bootstrap_args: &BootstrapArgs,
        enable_expression_cache_override: Option<bool>,
        f: F,
    ) -> Result<Option<T>, anyhow::Error>
    where
        F: FnOnce(ConnCatalog) -> T,
    {
        async fn persist_client(
            persist_consensus_url: SensitiveUrl,
            persist_blob_url: SensitiveUrl,
            persist_clients: &PersistClientCache,
        ) -> Result<PersistClient, anyhow::Error> {
            let persist_location = PersistLocation {
                blob_uri: persist_blob_url,
                consensus_uri: persist_consensus_url,
            };
            Ok(persist_clients.open(persist_location).await?)
        }

        if let Some(CatalogConfig {
            persist_consensus_url,
            persist_blob_url,
        }) = &self.materialize.catalog_config
        {
            let persist_client = persist_client(
                persist_consensus_url.clone(),
                persist_blob_url.clone(),
                &self.persist_clients,
            )
            .await?;
            let catalog = Catalog::open_debug_read_only_persist_catalog_config(
                persist_client,
                SYSTEM_TIME.clone(),
                self.materialize.environment_id.clone(),
                system_parameter_defaults,
                build_info,
                bootstrap_args,
                enable_expression_cache_override,
            )
            .await?;
            let res = f(catalog.for_session(&Session::dummy()));
            catalog.expire().await;
            Ok(Some(res))
        } else {
            Ok(None)
        }
    }

    pub fn aws_endpoint(&self) -> &str {
        self.aws_config.endpoint_url().unwrap_or("")
    }

    pub fn aws_region(&self) -> &str {
        self.aws_config.region().map(|r| r.as_ref()).unwrap_or("")
    }

    /// Resets the adhoc skip consistency check that users can toggle
    /// per-file, and returns whether the consistency checks should be skipped
    /// for the current run.
    pub fn clear_skip_consistency_checks(&mut self) -> bool {
        std::mem::replace(&mut self.consistency_checks_adhoc_skip, false)
    }

    pub async fn reset_materialize(&self) -> Result<(), anyhow::Error> {
        let (inner_client, _) = postgres_client(
            &format!(
                "postgres://mz_system:materialize@{}",
                self.materialize.internal_sql_addr
            ),
            self.default_timeout,
        )
        .await?;

        let version = inner_client
            .query_one("SELECT mz_version_num()", &[])
            .await
            .context("getting version of materialize")
            .map(|row| row.get::<_, i32>(0))?;

        let semver = inner_client
            .query_one("SELECT right(split_part(mz_version(), ' ', 1), -1)", &[])
            .await
            .context("getting semver of materialize")
            .map(|row| row.get::<_, String>(0))?
            .parse::<semver::Version>()
            .context("parsing semver of materialize")?;

        inner_client
            .batch_execute("ALTER SYSTEM RESET ALL")
            .await
            .context("resetting materialize state: ALTER SYSTEM RESET ALL")?;

        // Dangerous functions are useful for tests, so we enable them for all
        // tests.
        {
            let rename_version = Version::parse("0.128.0-dev.1").expect("known to be valid");
            let enable_unsafe_functions = if semver >= rename_version {
                "unsafe_enable_unsafe_functions"
            } else {
                "enable_unsafe_functions"
            };
            let res = inner_client
                .batch_execute(&format!("ALTER SYSTEM SET {enable_unsafe_functions} = on"))
                .await
                .context("enabling dangerous functions");
            if let Err(e) = res {
                match e.root_cause().downcast_ref::<DbError>() {
                    Some(e) if *e.code() == SqlState::CANT_CHANGE_RUNTIME_PARAM => {
                        info!(
                            "can't enable unsafe functions because the server is in safe mode; \
                             testdrive scripts will fail if they use unsafe functions",
                        );
                    }
                    _ => return Err(e),
                }
            }
        }

        for row in inner_client
            .query("SHOW DATABASES", &[])
            .await
            .context("resetting materialize state: SHOW DATABASES")?
        {
            let db_name: String = row.get(0);
            if db_name.starts_with("testdrive_no_reset_") {
                continue;
            }
            let query = format!(
                "DROP DATABASE {}",
                postgres_protocol::escape::escape_identifier(&db_name)
            );
            sql::print_query(&query, None);
            inner_client.batch_execute(&query).await.context(format!(
                "resetting materialize state: DROP DATABASE {}",
                db_name,
            ))?;
        }

        // Get all user clusters that are not running any objects owned by
        // users.
        let inactive_user_clusters = "
        WITH
            active_user_clusters AS
            (
                SELECT DISTINCT cluster_id, object_id
                FROM
                    (
                        SELECT cluster_id, id FROM mz_catalog.mz_sources
                        UNION ALL SELECT cluster_id, id FROM mz_catalog.mz_sinks
                        UNION ALL
                            SELECT cluster_id, id
                            FROM mz_catalog.mz_materialized_views
                        UNION ALL
                            SELECT cluster_id, id FROM mz_catalog.mz_indexes
                        UNION ALL
                            SELECT cluster_id, id
                            FROM mz_internal.mz_subscriptions
                    )
                    AS t (cluster_id, object_id)
                WHERE cluster_id IS NOT NULL AND object_id LIKE 'u%'
            )
        SELECT name
        FROM mz_catalog.mz_clusters
        WHERE
            id NOT IN ( SELECT cluster_id FROM active_user_clusters ) AND id LIKE 'u%'
                AND
            owner_id LIKE 'u%';";

        let inactive_clusters = inner_client
            .query(inactive_user_clusters, &[])
            .await
            .context("resetting materialize state: inactive_user_clusters")?;

        if !inactive_clusters.is_empty() {
            println!("cleaning up user clusters from previous tests...")
        }

        for cluster_name in inactive_clusters {
            let cluster_name: String = cluster_name.get(0);
            if cluster_name.starts_with("testdrive_no_reset_") {
                continue;
            }
            let query = format!(
                "DROP CLUSTER {}",
                postgres_protocol::escape::escape_identifier(&cluster_name)
            );
            sql::print_query(&query, None);
            inner_client.batch_execute(&query).await.context(format!(
                "resetting materialize state: DROP CLUSTER {}",
                cluster_name,
            ))?;
        }

        inner_client
            .batch_execute("CREATE DATABASE materialize")
            .await
            .context("resetting materialize state: CREATE DATABASE materialize")?;

        // Attempt to remove all users but the current user. Old versions of
        // Materialize did not support roles, so this degrades gracefully if
        // mz_roles does not exist.
        if let Ok(rows) = inner_client.query("SELECT name FROM mz_roles", &[]).await {
            for row in rows {
                let role_name: String = row.get(0);
                if role_name == self.materialize.user || role_name.starts_with("mz_") {
                    continue;
                }
                let query = format!(
                    "DROP ROLE {}",
                    postgres_protocol::escape::escape_identifier(&role_name)
                );
                sql::print_query(&query, None);
                inner_client.batch_execute(&query).await.context(format!(
                    "resetting materialize state: DROP ROLE {}",
                    role_name,
                ))?;
            }
        }

        // Grant the materialize user all system privileges.
        inner_client
            .batch_execute(&format!(
                "GRANT ALL PRIVILEGES ON SYSTEM TO {}",
                self.materialize.user
            ))
            .await?;

        // Grant initial privileges.
        inner_client
            .batch_execute("GRANT USAGE ON DATABASE materialize TO PUBLIC")
            .await?;
        inner_client
            .batch_execute(&format!(
                "GRANT ALL PRIVILEGES ON DATABASE materialize TO {}",
                self.materialize.user
            ))
            .await?;
        inner_client
            .batch_execute(&format!(
                "GRANT ALL PRIVILEGES ON SCHEMA materialize.public TO {}",
                self.materialize.user
            ))
            .await?;

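        // Versions that report a version number of 8200 or above name the
        // default cluster "quickstart"; older versions name it "default".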
        let cluster = match version {
            ..=8199 => "default",
            8200.. => "quickstart",
        };
        inner_client
            .batch_execute(&format!("GRANT USAGE ON CLUSTER {cluster} TO PUBLIC"))
            .await?;
        inner_client
            .batch_execute(&format!(
                "GRANT ALL PRIVILEGES ON CLUSTER {cluster} TO {}",
                self.materialize.user
            ))
            .await?;

        Ok(())
    }

    /// Deletes the Kafka topics and CCSR subjects that were created in this
    /// run.
    pub async fn reset_kafka(&self) -> Result<(), anyhow::Error> {
        let mut errors: Vec<anyhow::Error> = Vec::new();

        let metadata = self.kafka_producer.client().fetch_metadata(
            None,
            Some(std::cmp::max(Duration::from_secs(1), self.default_timeout)),
        )?;

        let testdrive_topics: Vec<_> = metadata
            .topics()
            .iter()
            .filter_map(|t| {
                if t.name().starts_with("testdrive-") {
                    Some(t.name())
                } else {
                    None
                }
            })
            .collect();

        if !testdrive_topics.is_empty() {
            match self
                .kafka_admin
                .delete_topics(&testdrive_topics, &self.kafka_admin_opts)
                .await
            {
                Ok(res) => {
                    if res.len() != testdrive_topics.len() {
                        errors.push(anyhow!(
                            "kafka topic deletion returned {} results, but exactly {} were expected",
                            res.len(),
                            testdrive_topics.len()
                        ));
                    }
                    for (res, topic) in res.iter().zip_eq(testdrive_topics.iter()) {
                        match res {
                            Ok(_)
                            | Err((_, rdkafka::types::RDKafkaErrorCode::UnknownTopicOrPartition)) => {
                                ()
                            }
                            Err((_, err)) => {
                                errors.push(anyhow!("unable to delete {}: {}", topic, err));
                            }
                        }
                    }
                }
                Err(e) => {
                    errors.push(e.into());
                }
            };
        }

        match self
            .ccsr_client
            .list_subjects()
            .await
            .context("listing schema registry subjects")
        {
            Ok(subjects) => {
                let testdrive_subjects: Vec<_> = subjects
                    .iter()
                    .filter(|s| s.starts_with("testdrive-"))
                    .collect();

                for subject in testdrive_subjects {
                    match self.ccsr_client.delete_subject(subject).await {
                        Ok(()) | Err(mz_ccsr::DeleteError::SubjectNotFound) => (),
                        Err(e) => errors.push(e.into()),
                    }
                }
            }
            Err(e) => {
                errors.push(e);
            }
        }

        if errors.is_empty() {
            Ok(())
        } else {
            bail!(
                "deleting Kafka topics: {} errors: {}",
                errors.len(),
                errors
                    .into_iter()
                    .map(|e| e.to_string_with_causes())
                    .join("\n")
            );
        }
    }
}

/// Configuration for the Catalog.
#[derive(Debug, Clone)]
pub struct CatalogConfig {
    /// Handle to the persist consensus system.
    pub persist_consensus_url: SensitiveUrl,
    /// Handle to the persist blob storage.
    pub persist_blob_url: SensitiveUrl,
}

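/// How script execution should proceed after a command runs.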
pub enum ControlFlow {
    Continue,
    SkipBegin,
    SkipEnd,
}

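/// The ability to run a testdrive command against the current [`State`].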
#[async_trait]
pub(crate) trait Run {
    async fn run(self, state: &mut State) -> Result<ControlFlow, PosError>;
}

#[async_trait]
impl Run for PosCommand {
    async fn run(self, state: &mut State) -> Result<ControlFlow, PosError> {
        macro_rules! handle_version {
            ($version_constraint:expr) => {
                match $version_constraint {
                    Some(VersionConstraint { min, max }) => {
                        match version_check::run_version_check(min, max, state).await {
                            Ok(true) => return Ok(ControlFlow::Continue),
                            Ok(false) => {}
                            Err(err) => return Err(PosError::new(err, self.pos)),
                        }
                    }
                    None => {}
                }
            };
        }

        let wrap_err = |e| PosError::new(e, self.pos);
        // Substitute variables at startup, except for the command-specific
        // ones; those are substituted at runtime.
        let ignore_prefix = match &self.command {
            Command::Builtin(builtin, _) => Some(builtin.name.clone()),
            _ => None,
        };
        let subst = |msg: &str, vars: &BTreeMap<String, String>| {
            substitute_vars(msg, vars, &ignore_prefix, false).map_err(wrap_err)
        };
        let subst_re = |msg: &str, vars: &BTreeMap<String, String>| {
            substitute_vars(msg, vars, &ignore_prefix, true).map_err(wrap_err)
        };

        let r = match self.command {
            Command::Builtin(mut builtin, version_constraint) => {
                handle_version!(version_constraint);
                for val in builtin.args.values_mut() {
                    *val = subst(val, &state.cmd_vars)?;
                }
                for line in &mut builtin.input {
                    *line = subst(line, &state.cmd_vars)?;
                }
                match builtin.name.as_ref() {
                    "check-consistency" => consistency::run_consistency_checks(state).await,
                    "skip-consistency-checks" => {
                        consistency::skip_consistency_checks(builtin, state)
                    }
                    "check-shard-tombstone" => {
                        consistency::run_check_shard_tombstone(builtin, state).await
                    }
                    "fivetran-destination" => {
                        fivetran::run_destination_command(builtin, state).await
                    }
                    "file-append" => file::run_append(builtin, state).await,
                    "file-delete" => file::run_delete(builtin, state).await,
                    "http-request" => http::run_request(builtin, state).await,
                    "kafka-add-partitions" => kafka::run_add_partitions(builtin, state).await,
                    "kafka-create-topic" => kafka::run_create_topic(builtin, state).await,
                    "kafka-wait-topic" => kafka::run_wait_topic(builtin, state).await,
                    "kafka-delete-records" => kafka::run_delete_records(builtin, state).await,
                    "kafka-delete-topic-flaky" => kafka::run_delete_topic(builtin, state).await,
                    "kafka-ingest" => kafka::run_ingest(builtin, state).await,
                    "kafka-verify-data" => kafka::run_verify_data(builtin, state).await,
                    "kafka-verify-commit" => kafka::run_verify_commit(builtin, state).await,
                    "kafka-verify-topic" => kafka::run_verify_topic(builtin, state).await,
                    "mysql-connect" => mysql::run_connect(builtin, state).await,
                    "mysql-execute" => mysql::run_execute(builtin, state).await,
                    "nop" => nop::run_nop(),
                    "postgres-connect" => postgres::run_connect(builtin, state).await,
                    "postgres-execute" => postgres::run_execute(builtin, state).await,
                    "postgres-verify-slot" => postgres::run_verify_slot(builtin, state).await,
                    "protobuf-compile-descriptors" => {
                        protobuf::run_compile_descriptors(builtin, state).await
                    }
                    "psql-execute" => psql::run_execute(builtin, state).await,
                    "s3-verify-data" => s3::run_verify_data(builtin, state).await,
                    "s3-verify-keys" => s3::run_verify_keys(builtin, state).await,
                    "s3-file-upload" => s3::run_upload(builtin, state).await,
                    "s3-set-presigned-url" => s3::run_set_presigned_url(builtin, state).await,
                    "schema-registry-publish" => schema_registry::run_publish(builtin, state).await,
                    "schema-registry-verify" => schema_registry::run_verify(builtin, state).await,
                    "schema-registry-wait" => schema_registry::run_wait(builtin, state).await,
                    "skip-if" => skip_if::run_skip_if(builtin, state).await,
                    "skip-end" => skip_end::run_skip_end(),
                    "sql-server-connect" => sql_server::run_connect(builtin, state).await,
                    "sql-server-execute" => sql_server::run_execute(builtin, state).await,
                    "sql-server-set-from-sql" => sql_server::run_set_from_sql(builtin, state).await,
                    "persist-force-compaction" => {
                        persist::run_force_compaction(builtin, state).await
                    }
                    "random-sleep" => sleep::run_random_sleep(builtin),
                    "set-regex" => set::run_regex_set(builtin, state),
                    "unset-regex" => set::run_regex_unset(builtin, state),
                    "set-sql-timeout" => set::run_sql_timeout(builtin, state),
                    "set-max-tries" => set::run_max_tries(builtin, state),
                    "sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment" => {
                        sleep::run_sleep(builtin)
                    }
                    "set" => set::set_vars(builtin, state),
                    "set-arg-default" => set::run_set_arg_default(builtin, state),
                    "set-from-sql" => set::run_set_from_sql(builtin, state).await,
                    "set-from-file" => set::run_set_from_file(builtin, state).await,
                    "webhook-append" => webhook::run_append(builtin, state).await,
                    _ => {
                        return Err(PosError::new(
                            anyhow!("unknown built-in command {}", builtin.name),
                            self.pos,
                        ));
                    }
                }
            }
            Command::Sql(mut sql, version_constraint) => {
                handle_version!(version_constraint);
                sql.query = subst(&sql.query, &state.cmd_vars)?;
                if let SqlOutput::Full { expected_rows, .. } = &mut sql.expected_output {
                    for row in expected_rows {
                        for col in row {
                            *col = subst(col, &state.cmd_vars)?;
                        }
                    }
                }
                sql::run_sql(sql, state).await
            }
            Command::FailSql(mut sql, version_constraint) => {
                handle_version!(version_constraint);
                sql.query = subst(&sql.query, &state.cmd_vars)?;
                sql.expected_error = match &sql.expected_error {
                    SqlExpectedError::Contains(s) => {
                        SqlExpectedError::Contains(subst(s, &state.cmd_vars)?)
                    }
                    SqlExpectedError::Exact(s) => {
                        SqlExpectedError::Exact(subst(s, &state.cmd_vars)?)
                    }
                    SqlExpectedError::Regex(s) => {
                        SqlExpectedError::Regex(subst_re(s, &state.cmd_vars)?)
                    }
                    SqlExpectedError::Timeout => SqlExpectedError::Timeout,
                };
                sql::run_fail_sql(sql, state).await
            }
        };

        r.map_err(wrap_err)
    }
}

/// Substitutes `${}`-delimited variables from `vars` into `msg`.
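///
/// A minimal sketch of the substitution behavior (the variable value is
/// hypothetical):
///
/// ```ignore
/// let mut vars = BTreeMap::new();
/// vars.insert("testdrive.seed".to_string(), "42".to_string());
/// assert_eq!(
///     substitute_vars("seed is ${testdrive.seed}", &vars, &None, false)?,
///     "seed is 42",
/// );
/// ```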
fn substitute_vars(
    msg: &str,
    vars: &BTreeMap<String, String>,
    ignore_prefix: &Option<String>,
    regex_escape: bool,
) -> Result<String, anyhow::Error> {
    static RE: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"\$\{([^}]+)\}").unwrap());
    let mut err = None;
    let out = RE.replace_all(msg, |caps: &Captures| {
        let name = &caps[1];
        if let Some(ignore_prefix) = &ignore_prefix {
            if name.starts_with(format!("{}.", ignore_prefix).as_str()) {
                // Do not substitute; leave the original variable name in place.
                return caps.get(0).unwrap().as_str().to_string();
            }
        }

        if let Some(val) = vars.get(name) {
            if regex_escape {
                regex::escape(val)
            } else {
                val.to_string()
            }
        } else {
            err = Some(anyhow!("unknown variable: {}", name));
            "#VAR-MISSING#".to_string()
        }
    });
    match err {
        Some(err) => Err(err),
        None => Ok(out.into_owned()),
    }
}

/// Initializes a [`State`] object by connecting to the various external
/// services specified in `config`.
///
/// Returns the initialized `State` and a cleanup future. The cleanup future
/// should be `await`ed only *after* dropping the `State` to check whether any
/// errors occurred while dropping the `State`. This awkward API is a
/// workaround for the lack of `AsyncDrop` support in Rust.
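///
/// A sketch of the intended call pattern (`run_script` is a hypothetical
/// driver that consumes the `State`):
///
/// ```ignore
/// let (state, cleanup) = create_state(&config).await?;
/// run_script(state).await?; // `state` is dropped by the time this returns
/// cleanup.await?; // surfaces any errors from the SQL connection task
/// ```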
pub async fn create_state(
    config: &Config,
) -> Result<(State, impl Future<Output = Result<(), anyhow::Error>>), anyhow::Error> {
    let seed = config.seed.unwrap_or_else(rand::random);

    let (_tempfile, temp_path) = match &config.temp_dir {
        Some(temp_dir) => {
            fs::create_dir_all(temp_dir).context("creating temporary directory")?;
            (None, PathBuf::from(&temp_dir))
        }
        _ => {
            // Stash the tempfile object so that it does not go out of scope
            // and delete the tempdir prematurely.
            let tempfile_handle = tempfile::tempdir().context("creating temporary directory")?;
            let temp_path = tempfile_handle.path().to_path_buf();
            (Some(tempfile_handle), temp_path)
        }
    };

    let materialize_catalog_config = config.materialize_catalog_config.clone();

    let materialize_url = util::postgres::config_url(&config.materialize_pgconfig)?;
    info!("Connecting to {}", materialize_url.as_str());
    let (pgclient, pgconn) = Retry::default()
        .max_duration(config.default_timeout)
        .retry_async_canceling(|_| async move {
            let mut pgconfig = config.materialize_pgconfig.clone();
            pgconfig.connect_timeout(config.default_timeout);
            let tls = make_tls(&pgconfig)?;
            pgconfig.connect(tls).await.map_err(|e| anyhow!(e))
        })
        .await?;

    let pgconn_task =
        task::spawn(|| "pgconn_task", pgconn).map(|join| join.context("running SQL connection"));

    let materialize_state =
        create_materialize_state(config, materialize_catalog_config, pgclient).await?;

    let schema_registry_url = config.schema_registry_url.to_owned();

    let ccsr_client = {
        let mut ccsr_config = mz_ccsr::ClientConfig::new(schema_registry_url.clone());

        if let Some(cert_path) = &config.cert_path {
            let cert = fs::read(cert_path).context("reading cert")?;
            let pass = config.cert_password.as_deref().unwrap_or("").to_owned();
            let ident = mz_ccsr::tls::Identity::from_pkcs12_der(cert, pass)
                .context("reading keystore file as pkcs12")?;
            ccsr_config = ccsr_config.identity(ident);
        }

        if let Some(ccsr_username) = &config.ccsr_username {
            ccsr_config = ccsr_config.auth(ccsr_username.clone(), config.ccsr_password.clone());
        }

        ccsr_config.build().context("Creating CCSR client")?
    };

    let (kafka_addr, kafka_admin, kafka_admin_opts, kafka_producer, kafka_topics, kafka_config) = {
        use rdkafka::admin::{AdminClient, AdminOptions};
        use rdkafka::producer::FutureProducer;

        let mut kafka_config = create_new_client_config_simple();
        kafka_config.set("bootstrap.servers", &config.kafka_addr);
        kafka_config.set("group.id", "materialize-testdrive");
        kafka_config.set("auto.offset.reset", "earliest");
        kafka_config.set("isolation.level", "read_committed");
        if let Some(cert_path) = &config.cert_path {
            kafka_config.set("security.protocol", "ssl");
            kafka_config.set("ssl.keystore.location", cert_path);
            if let Some(cert_password) = &config.cert_password {
                kafka_config.set("ssl.keystore.password", cert_password);
            }
        }
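        // Allow large messages: 15728640 bytes = 15 * 1024 * 1024 (15 MiB).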
        kafka_config.set("message.max.bytes", "15728640");

        for (key, value) in &config.kafka_opts {
            kafka_config.set(key, value);
        }

        let admin: AdminClient<_> = kafka_config
            .create_with_context(MzClientContext::default())
            .with_context(|| format!("opening Kafka connection: {}", config.kafka_addr))?;

        let admin_opts = AdminOptions::new().operation_timeout(Some(config.default_timeout));

        let producer: FutureProducer<_> = kafka_config
            .create_with_context(MzClientContext::default())
            .with_context(|| format!("opening Kafka producer connection: {}", config.kafka_addr))?;

        let topics = BTreeMap::new();

        (
            config.kafka_addr.to_owned(),
            admin,
            admin_opts,
            producer,
            topics,
            kafka_config,
        )
    };

    let mut state = State {
        // === Testdrive state. ===
        arg_vars: config.arg_vars.clone(),
        cmd_vars: BTreeMap::new(),
        seed,
        temp_path,
        _tempfile,
        default_timeout: config.default_timeout,
        timeout: config.default_timeout,
        max_tries: config.default_max_tries,
        initial_backoff: config.initial_backoff,
        backoff_factor: config.backoff_factor,
        consistency_checks: config.consistency_checks,
        consistency_checks_adhoc_skip: false,
        regex: None,
        regex_replacement: set::DEFAULT_REGEX_REPLACEMENT.into(),
        rewrite_results: config.rewrite_results,
        error_line_count: 0,
        error_string: "".to_string(),

        // === Materialize state. ===
        materialize: materialize_state,

        // === Persist state. ===
        persist_consensus_url: config.persist_consensus_url.clone(),
        persist_blob_url: config.persist_blob_url.clone(),
        build_info: config.build_info,
        persist_clients: PersistClientCache::new(
            PersistConfig::new_default_configs(config.build_info, SYSTEM_TIME.clone()),
            &MetricsRegistry::new(),
            |_, _| PubSubClientConnection::noop(),
        ),

        // === Confluent state. ===
        schema_registry_url,
        ccsr_client,
        kafka_addr,
        kafka_admin,
        kafka_admin_opts,
        kafka_config,
        kafka_default_partitions: config.kafka_default_partitions,
        kafka_producer,
        kafka_topics,

        // === AWS state. ===
        aws_account: config.aws_account.clone(),
        aws_config: config.aws_config.clone(),

        // === Database driver state. ===
        mysql_clients: BTreeMap::new(),
        postgres_clients: BTreeMap::new(),
        sql_server_clients: BTreeMap::new(),

        // === Fivetran state. ===
        fivetran_destination_url: config.fivetran_destination_url.clone(),
        fivetran_destination_files_path: config.fivetran_destination_files_path.clone(),

        rewrites: Vec::new(),
        rewrite_pos_start: 0,
        rewrite_pos_end: 0,
    };
    state.initialize_cmd_vars().await?;
    Ok((state, pgconn_task))
}

async fn create_materialize_state(
    config: &Config,
    materialize_catalog_config: Option<CatalogConfig>,
    pgclient: tokio_postgres::Client,
) -> Result<MaterializeState, anyhow::Error> {
    let materialize_url = util::postgres::config_url(&config.materialize_pgconfig)?;
    let materialize_internal_url =
        util::postgres::config_url(&config.materialize_internal_pgconfig)?;

    for (key, value) in &config.materialize_params {
        pgclient
            .batch_execute(&format!("SET {key} = {value}"))
            .await
            .context("setting session parameter")?;
    }

    let materialize_user = config
        .materialize_pgconfig
        .get_user()
        .expect("testdrive URL must contain user")
        .to_string();

    let materialize_sql_addr = format!(
        "{}:{}",
        materialize_url.host_str().unwrap(),
        materialize_url.port().unwrap()
    );
    let materialize_http_addr = format!(
        "{}:{}",
        materialize_url.host_str().unwrap(),
        config.materialize_http_port
    );
    let materialize_internal_sql_addr = format!(
        "{}:{}",
        materialize_internal_url.host_str().unwrap(),
        materialize_internal_url.port().unwrap()
    );
    let materialize_password_sql_addr = format!(
        "{}:{}",
        materialize_url.host_str().unwrap(),
        config.materialize_password_sql_port
    );
    let materialize_sasl_sql_addr = format!(
        "{}:{}",
        materialize_url.host_str().unwrap(),
        config.materialize_sasl_sql_port
    );
    let materialize_internal_http_addr = format!(
        "{}:{}",
        materialize_internal_url.host_str().unwrap(),
        config.materialize_internal_http_port
    );
    let environment_id = pgclient
        .query_one("SELECT mz_environment_id()", &[])
        .await?
        .get::<_, String>(0)
        .parse()
        .context("parsing environment ID")?;

    let bootstrap_args = BootstrapArgs {
        cluster_replica_size_map: config.materialize_cluster_replica_sizes.clone(),
        default_cluster_replica_size: "ABC".to_string(),
        default_cluster_replication_factor: 1,
        bootstrap_role: None,
    };

    let materialize_state = MaterializeState {
        catalog_config: materialize_catalog_config,
        sql_addr: materialize_sql_addr,
        use_https: config.materialize_use_https,
        http_addr: materialize_http_addr,
        internal_sql_addr: materialize_internal_sql_addr,
        internal_http_addr: materialize_internal_http_addr,
        password_sql_addr: materialize_password_sql_addr,
        sasl_sql_addr: materialize_sasl_sql_addr,
        user: materialize_user,
        pgclient,
        environment_id,
        bootstrap_args,
    };

    Ok(materialize_state)
}