Skip to main content

mz_testdrive/
action.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeMap;
11use std::future::Future;
12
13use std::path::PathBuf;
14use std::sync::LazyLock;
15use std::time::Duration;
16use std::{env, fs};
17
18use anyhow::{Context, anyhow, bail};
19use async_trait::async_trait;
20use aws_credential_types::provider::ProvideCredentials;
21use aws_types::SdkConfig;
22use futures::future::FutureExt;
23use itertools::Itertools;
24use mz_adapter::catalog::{Catalog, ConnCatalog};
25use mz_adapter::session::Session;
26use mz_build_info::BuildInfo;
27use mz_catalog::config::ClusterReplicaSizeMap;
28use mz_catalog::durable::BootstrapArgs;
29use mz_ccsr::SubjectVersion;
30use mz_kafka_util::client::{MzClientContext, create_new_client_config_simple};
31use mz_ore::error::ErrorExt;
32use mz_ore::metrics::MetricsRegistry;
33use mz_ore::now::SYSTEM_TIME;
34use mz_ore::retry::Retry;
35use mz_ore::task;
36use mz_ore::url::SensitiveUrl;
37use mz_persist_client::cache::PersistClientCache;
38use mz_persist_client::cfg::PersistConfig;
39use mz_persist_client::rpc::PubSubClientConnection;
40use mz_persist_client::{PersistClient, PersistLocation};
41use mz_postgres_util::{
42    Sql, batch_execute as pg_batch_execute, query as pg_query, query_one as pg_query_one,
43    sql as pg_sql,
44};
45use mz_sql::catalog::EnvironmentId;
46use mz_tls_util::make_tls;
47use rdkafka::ClientConfig;
48use rdkafka::producer::Producer;
49use regex::{Captures, Regex};
50use semver::Version;
51use tokio_postgres::error::{DbError, SqlState};
52use tracing::info;
53use url::Url;
54
55use crate::error::PosError;
56use crate::parser::{
57    Command, PosCommand, SqlExpectedError, SqlOutput, VersionConstraint, validate_ident,
58};
59use crate::util;
60use crate::util::postgres::postgres_client;
61
62pub mod consistency;
63
64mod duckdb;
65mod file;
66mod fivetran;
67mod http;
68mod kafka;
69mod mysql;
70mod nop;
71mod persist;
72mod postgres;
73mod protobuf;
74mod psql;
75mod s3;
76mod schema_registry;
77mod set;
78mod skip_end;
79mod skip_if;
80mod sleep;
81mod sql;
82mod sql_server;
83mod version_check;
84mod webhook;
85
86/// User-settable configuration parameters.
87#[derive(Debug, Clone)]
88pub struct Config {
89    // === Testdrive options. ===
90    /// Variables to make available to the testdrive script.
91    ///
92    /// The value of each entry will be made available to the script in a
93    /// variable named `arg.KEY`.
94    pub arg_vars: BTreeMap<String, String>,
95    /// A random number to distinguish each run of a testdrive script.
96    pub seed: Option<String>,
97    /// Whether to reset Materialize state before executing each script and
98    /// to clean up AWS state after each script.
99    pub reset: bool,
100    /// Force the use of the specified temporary directory to use.
101    ///
102    /// If unspecified, testdrive creates a temporary directory with a random
103    /// name.
104    pub temp_dir: Option<String>,
105    /// Source string to print out on errors.
106    pub source: Option<String>,
107    /// The default timeout for cancellable operations.
108    pub default_timeout: Duration,
109    /// The default number of tries for retriable operations.
110    pub default_max_tries: usize,
111    /// The initial backoff interval for retry operations.
112    ///
113    /// Set to 0 to retry immediately on failure.
114    pub initial_backoff: Duration,
115    /// Backoff factor to use for retry operations.
116    ///
117    /// Set to 1 to retry at a steady pace.
118    pub backoff_factor: f64,
119    /// Should we skip coordinator and catalog consistency checks.
120    pub consistency_checks: consistency::Level,
121    /// Whether to run statement logging consistency checks (adds a few seconds at the end of every
122    /// test file).
123    pub check_statement_logging: bool,
124    /// Whether to automatically rewrite wrong results instead of failing.
125    pub rewrite_results: bool,
126
127    // === Materialize options. ===
128    /// The pgwire connection parameters for the Materialize instance that
129    /// testdrive will connect to.
130    pub materialize_pgconfig: tokio_postgres::Config,
131    /// The internal pgwire connection parameters for the Materialize instance that
132    /// testdrive will connect to.
133    pub materialize_internal_pgconfig: tokio_postgres::Config,
134    /// Whether to use HTTPS instead of plain HTTP for the HTTP(S) connections.
135    pub materialize_use_https: bool,
136    /// The port for the public endpoints of the materialize instance that
137    /// testdrive will connect to via HTTP.
138    pub materialize_http_port: u16,
139    /// The port for the internal endpoints of the materialize instance that
140    /// testdrive will connect to via HTTP.
141    pub materialize_internal_http_port: u16,
142    /// The port for the password endpoints of the materialize instance that
143    /// testdrive will connect to via SQL.
144    pub materialize_password_sql_port: u16,
145    /// The port for the SASL endpoints of the materialize instance that
146    /// testdrive will connect to via SQL.
147    pub materialize_sasl_sql_port: u16,
148    /// Session parameters to set after connecting to materialize.
149    pub materialize_params: Vec<(String, String)>,
150    /// An optional catalog configuration.
151    pub materialize_catalog_config: Option<CatalogConfig>,
152    /// Build information
153    pub build_info: &'static BuildInfo,
154    /// Configured cluster replica sizes
155    pub materialize_cluster_replica_sizes: ClusterReplicaSizeMap,
156
157    // === Persist options. ===
158    /// Handle to the persist consensus system.
159    pub persist_consensus_url: Option<SensitiveUrl>,
160    /// Handle to the persist blob storage.
161    pub persist_blob_url: Option<SensitiveUrl>,
162
163    // === Confluent options. ===
164    /// The address of the Kafka broker that testdrive will interact with.
165    pub kafka_addr: String,
166    /// Default number of partitions to use for topics
167    pub kafka_default_partitions: usize,
168    /// Arbitrary rdkafka options for testdrive to use when connecting to the
169    /// Kafka broker.
170    pub kafka_opts: Vec<(String, String)>,
171    /// The URL of the schema registry that testdrive will connect to.
172    pub schema_registry_url: Url,
173    /// An optional path to a TLS certificate that testdrive will present when
174    /// performing client authentication.
175    ///
176    /// The keystore must be in the PKCS#12 format.
177    pub cert_path: Option<String>,
178    /// An optional password for the TLS certificate.
179    pub cert_password: Option<String>,
180    /// An optional username for basic authentication with the Confluent Schema
181    /// Registry.
182    pub ccsr_username: Option<String>,
183    /// An optional password for basic authentication with the Confluent Schema
184    /// Registry.
185    pub ccsr_password: Option<String>,
186
187    // === AWS options. ===
188    /// The configuration to use when connecting to AWS.
189    pub aws_config: SdkConfig,
190    /// The ID of the AWS account that `aws_config` configures.
191    pub aws_account: String,
192
193    // === Fivetran options. ===
194    /// Address of the Fivetran Destination that is currently running.
195    pub fivetran_destination_url: String,
196    /// Directory that is accessible to the Fivetran Destination.
197    pub fivetran_destination_files_path: String,
198}
199
200pub struct MaterializeState {
201    catalog_config: Option<CatalogConfig>,
202
203    sql_addr: String,
204    use_https: bool,
205    http_addr: String,
206    internal_sql_addr: String,
207    internal_http_addr: String,
208    password_sql_addr: String,
209    sasl_sql_addr: String,
210    user: String,
211    pgclient: tokio_postgres::Client,
212    environment_id: EnvironmentId,
213    bootstrap_args: BootstrapArgs,
214}
215
216pub struct State {
217    // The Config that this `State` was originally created from.
218    pub config: Config,
219
220    // === Testdrive state. ===
221    arg_vars: BTreeMap<String, String>,
222    cmd_vars: BTreeMap<String, String>,
223    seed: String,
224    temp_path: PathBuf,
225    _tempfile: Option<tempfile::TempDir>,
226    default_timeout: Duration,
227    timeout: Duration,
228    max_tries: usize,
229    initial_backoff: Duration,
230    backoff_factor: f64,
231    consistency_checks: consistency::Level,
232    check_statement_logging: bool,
233    consistency_checks_adhoc_skip: bool,
234    regex: Option<Regex>,
235    regex_replacement: String,
236    error_line_count: usize,
237    error_string: String,
238
239    // === Materialize state. ===
240    materialize: MaterializeState,
241
242    // === Persist state. ===
243    persist_consensus_url: Option<SensitiveUrl>,
244    persist_blob_url: Option<SensitiveUrl>,
245    build_info: &'static BuildInfo,
246    persist_clients: PersistClientCache,
247
248    // === Confluent state. ===
249    schema_registry_url: Url,
250    ccsr_client: mz_ccsr::Client,
251    kafka_addr: String,
252    kafka_admin: rdkafka::admin::AdminClient<MzClientContext>,
253    kafka_admin_opts: rdkafka::admin::AdminOptions,
254    kafka_config: ClientConfig,
255    kafka_default_partitions: usize,
256    kafka_producer: rdkafka::producer::FutureProducer<MzClientContext>,
257    kafka_topics: BTreeMap<String, usize>,
258
259    // === AWS state. ===
260    aws_account: String,
261    aws_config: SdkConfig,
262
263    // === Database driver state. ===
264    pub duckdb_clients: BTreeMap<String, std::sync::Arc<std::sync::Mutex<::duckdb::Connection>>>,
265    mysql_clients: BTreeMap<String, mysql_async::Conn>,
266    postgres_clients: BTreeMap<String, tokio_postgres::Client>,
267    sql_server_clients: BTreeMap<String, mz_sql_server_util::Client>,
268
269    // === Fivetran state. ===
270    fivetran_destination_url: String,
271    fivetran_destination_files_path: String,
272
273    // === Rewrite state. ===
274    rewrite_results: bool,
275    /// Current file, results are replaced inline
276    pub rewrites: Vec<Rewrite>,
277    /// Start position of currently expected result
278    pub rewrite_pos_start: usize,
279    /// End position of currently expected result
280    pub rewrite_pos_end: usize,
281}
282
/// A pending in-place replacement of an expected result in the current test
/// file, recorded when result rewriting is enabled.
pub struct Rewrite {
    /// The text that replaces the span `start..end`.
    pub content: String,
    /// Start position of the replaced span (presumably a byte offset into the
    /// file — TODO confirm).
    pub start: usize,
    /// End position of the replaced span.
    pub end: usize,
}
288
289impl State {
290    pub async fn initialize_cmd_vars(&mut self) -> Result<(), anyhow::Error> {
291        self.cmd_vars
292            .insert("testdrive.kafka-addr".into(), self.kafka_addr.clone());
293        self.cmd_vars.insert(
294            "testdrive.schema-registry-url".into(),
295            self.schema_registry_url.to_string(),
296        );
297        self.cmd_vars
298            .insert("testdrive.seed".into(), self.seed.clone());
299        self.cmd_vars.insert(
300            "testdrive.temp-dir".into(),
301            self.temp_path.display().to_string(),
302        );
303        self.cmd_vars
304            .insert("testdrive.aws-region".into(), self.aws_region().into());
305        self.cmd_vars
306            .insert("testdrive.aws-endpoint".into(), self.aws_endpoint().into());
307        self.cmd_vars
308            .insert("testdrive.aws-account".into(), self.aws_account.clone());
309        {
310            let aws_credentials = self
311                .aws_config
312                .credentials_provider()
313                .ok_or_else(|| anyhow!("no AWS credentials provider configured"))?
314                .provide_credentials()
315                .await
316                .context("fetching AWS credentials")?;
317            self.cmd_vars.insert(
318                "testdrive.aws-access-key-id".into(),
319                aws_credentials.access_key_id().to_owned(),
320            );
321            self.cmd_vars.insert(
322                "testdrive.aws-secret-access-key".into(),
323                aws_credentials.secret_access_key().to_owned(),
324            );
325            self.cmd_vars.insert(
326                "testdrive.aws-token".into(),
327                aws_credentials
328                    .session_token()
329                    .map(|token| token.to_owned())
330                    .unwrap_or_else(String::new),
331            );
332        }
333        self.cmd_vars.insert(
334            "testdrive.materialize-environment-id".into(),
335            self.materialize.environment_id.to_string(),
336        );
337        self.cmd_vars.insert(
338            "testdrive.materialize-sql-addr".into(),
339            self.materialize.sql_addr.clone(),
340        );
341        self.cmd_vars.insert(
342            "testdrive.materialize-internal-sql-addr".into(),
343            self.materialize.internal_sql_addr.clone(),
344        );
345        self.cmd_vars.insert(
346            "testdrive.materialize-password-sql-addr".into(),
347            self.materialize.password_sql_addr.clone(),
348        );
349        self.cmd_vars.insert(
350            "testdrive.materialize-sasl-sql-addr".into(),
351            self.materialize.sasl_sql_addr.clone(),
352        );
353        self.cmd_vars.insert(
354            "testdrive.materialize-user".into(),
355            self.materialize.user.clone(),
356        );
357        self.cmd_vars.insert(
358            "testdrive.fivetran-destination-url".into(),
359            self.fivetran_destination_url.clone(),
360        );
361        self.cmd_vars.insert(
362            "testdrive.fivetran-destination-files-path".into(),
363            self.fivetran_destination_files_path.clone(),
364        );
365
366        for (key, value) in env::vars() {
367            self.cmd_vars.insert(format!("env.{}", key), value);
368        }
369
370        for (key, value) in &self.arg_vars {
371            validate_ident(key)?;
372            self.cmd_vars
373                .insert(format!("arg.{}", key), value.to_string());
374        }
375
376        Ok(())
377    }
378    /// Makes of copy of the durable catalog and runs a function on its
379    /// state. Returns `None` if there's no catalog information in the State.
380    pub async fn with_catalog_copy<F, T>(
381        &self,
382        system_parameter_defaults: BTreeMap<String, String>,
383        build_info: &'static BuildInfo,
384        bootstrap_args: &BootstrapArgs,
385        enable_expression_cache_override: Option<bool>,
386        f: F,
387    ) -> Result<Option<T>, anyhow::Error>
388    where
389        F: FnOnce(ConnCatalog) -> T,
390    {
391        async fn persist_client(
392            persist_consensus_url: SensitiveUrl,
393            persist_blob_url: SensitiveUrl,
394            persist_clients: &PersistClientCache,
395        ) -> Result<PersistClient, anyhow::Error> {
396            let persist_location = PersistLocation {
397                blob_uri: persist_blob_url,
398                consensus_uri: persist_consensus_url,
399            };
400            Ok(persist_clients.open(persist_location).await?)
401        }
402
403        if let Some(CatalogConfig {
404            persist_consensus_url,
405            persist_blob_url,
406        }) = &self.materialize.catalog_config
407        {
408            let persist_client = persist_client(
409                persist_consensus_url.clone(),
410                persist_blob_url.clone(),
411                &self.persist_clients,
412            )
413            .await?;
414            let catalog = Catalog::open_debug_read_only_persist_catalog_config(
415                persist_client,
416                SYSTEM_TIME.clone(),
417                self.materialize.environment_id.clone(),
418                system_parameter_defaults,
419                build_info,
420                bootstrap_args,
421                enable_expression_cache_override,
422            )
423            .await?;
424            let res = f(catalog.for_session(&Session::dummy()));
425            catalog.expire().await;
426            Ok(Some(res))
427        } else {
428            Ok(None)
429        }
430    }
431
432    pub fn aws_endpoint(&self) -> &str {
433        self.aws_config.endpoint_url().unwrap_or("")
434    }
435
436    pub fn aws_region(&self) -> &str {
437        self.aws_config.region().map(|r| r.as_ref()).unwrap_or("")
438    }
439
440    /// Resets the adhoc skip consistency check that users can toggle per-file, and returns whether
441    /// the consistency checks should be skipped for this current run.
442    pub fn clear_skip_consistency_checks(&mut self) -> bool {
443        std::mem::replace(&mut self.consistency_checks_adhoc_skip, false)
444    }
445
446    pub async fn reset_materialize(&self) -> Result<(), anyhow::Error> {
447        let (inner_client, _) = postgres_client(
448            &format!(
449                "postgres://mz_system:materialize@{}",
450                self.materialize.internal_sql_addr
451            ),
452            self.default_timeout,
453        )
454        .await?;
455
456        let version = pg_query_one(&inner_client, pg_sql!("SELECT mz_version_num()"), &[])
457            .await
458            .context("getting version of materialize")
459            .map(|row| row.get::<_, i32>(0))?;
460
461        let semver = pg_query_one(
462            &inner_client,
463            pg_sql!("SELECT right(split_part(mz_version(), ' ', 1), -1)"),
464            &[],
465        )
466        .await
467        .context("getting semver of materialize")
468        .map(|row| row.get::<_, String>(0))?
469        .parse::<semver::Version>()
470        .context("parsing semver of materialize")?;
471
472        pg_batch_execute(&inner_client, pg_sql!("ALTER SYSTEM RESET ALL"))
473            .await
474            .context("resetting materialize state: ALTER SYSTEM RESET ALL")?;
475
476        // Dangerous functions are useful for tests so we enable it for all tests.
477        {
478            let rename_version = Version::parse("0.128.0-dev.1").expect("known to be valid");
479            let enable_unsafe_functions = if semver >= rename_version {
480                "unsafe_enable_unsafe_functions"
481            } else {
482                "enable_unsafe_functions"
483            };
484            let res = pg_batch_execute(
485                &inner_client,
486                pg_sql!(
487                    "ALTER SYSTEM SET {} = on",
488                    Sql::ident(enable_unsafe_functions)
489                ),
490            )
491            .await
492            .context("enabling dangerous functions");
493            if let Err(e) = res {
494                match e.root_cause().downcast_ref::<DbError>() {
495                    Some(e) if *e.code() == SqlState::CANT_CHANGE_RUNTIME_PARAM => {
496                        info!(
497                            "can't enable unsafe functions because the server is safe mode; \
498                             testdrive scripts will fail if they use unsafe functions",
499                        );
500                    }
501                    _ => return Err(e),
502                }
503            }
504        }
505
506        for row in pg_query(&inner_client, pg_sql!("SHOW DATABASES"), &[])
507            .await
508            .context("resetting materialize state: SHOW DATABASES")?
509        {
510            let db_name: String = row.get(0);
511            if db_name.starts_with("testdrive_no_reset_") {
512                continue;
513            }
514            let drop_database = pg_sql!("DROP DATABASE {}", Sql::ident(&db_name));
515            sql::print_query(drop_database.as_str(), None);
516            pg_batch_execute(&inner_client, drop_database)
517                .await
518                .context(format!(
519                    "resetting materialize state: DROP DATABASE {}",
520                    db_name,
521                ))?;
522        }
523
524        // Get all user clusters not running any objects owned by users
525        let inactive_user_clusters = "
526        WITH
527            active_user_clusters AS
528            (
529                SELECT DISTINCT cluster_id, object_id
530                FROM
531                    (
532                        SELECT cluster_id, id FROM mz_catalog.mz_sources
533                        UNION ALL SELECT cluster_id, id FROM mz_catalog.mz_sinks
534                        UNION ALL
535                            SELECT cluster_id, id
536                            FROM mz_catalog.mz_materialized_views
537                        UNION ALL
538                            SELECT cluster_id, id FROM mz_catalog.mz_indexes
539                        UNION ALL
540                            SELECT cluster_id, id
541                            FROM mz_internal.mz_subscriptions
542                    )
543                    AS t (cluster_id, object_id)
544                WHERE cluster_id IS NOT NULL AND object_id LIKE 'u%'
545            )
546        SELECT name
547        FROM mz_catalog.mz_clusters
548        WHERE
549            id NOT IN ( SELECT cluster_id FROM active_user_clusters ) AND id LIKE 'u%'
550                AND
551            owner_id LIKE 'u%';";
552
553        let inactive_clusters = pg_query(&inner_client, Sql::new(inactive_user_clusters), &[])
554            .await
555            .context("resetting materialize state: inactive_user_clusters")?;
556
557        if !inactive_clusters.is_empty() {
558            println!("cleaning up user clusters from previous tests...")
559        }
560
561        for cluster_name in inactive_clusters {
562            let cluster_name: String = cluster_name.get(0);
563            if cluster_name.starts_with("testdrive_no_reset_") {
564                continue;
565            }
566            let drop_cluster = pg_sql!("DROP CLUSTER {}", Sql::ident(&cluster_name));
567            sql::print_query(drop_cluster.as_str(), None);
568            pg_batch_execute(&inner_client, drop_cluster)
569                .await
570                .context(format!(
571                    "resetting materialize state: DROP CLUSTER {}",
572                    cluster_name,
573                ))?;
574        }
575
576        pg_batch_execute(&inner_client, pg_sql!("CREATE DATABASE materialize"))
577            .await
578            .context("resetting materialize state: CREATE DATABASE materialize")?;
579
580        // Attempt to remove all users but the current user. Old versions of
581        // Materialize did not support roles, so this degrades gracefully if
582        // mz_roles does not exist.
583        if let Ok(rows) = pg_query(&inner_client, pg_sql!("SELECT name FROM mz_roles"), &[]).await {
584            for row in rows {
585                let role_name: String = row.get(0);
586                if role_name == self.materialize.user || role_name.starts_with("mz_") {
587                    continue;
588                }
589                let drop_role = pg_sql!("DROP ROLE {}", Sql::ident(&role_name));
590                sql::print_query(drop_role.as_str(), None);
591                pg_batch_execute(&inner_client, drop_role)
592                    .await
593                    .context(format!(
594                        "resetting materialize state: DROP ROLE {}",
595                        role_name,
596                    ))?;
597            }
598        }
599
600        // Alter materialize user with all system privileges.
601        pg_batch_execute(
602            &inner_client,
603            pg_sql!(
604                "GRANT ALL PRIVILEGES ON SYSTEM TO {}",
605                Sql::ident(&self.materialize.user)
606            ),
607        )
608        .await?;
609
610        // Grant initial privileges.
611        pg_batch_execute(
612            &inner_client,
613            pg_sql!("GRANT USAGE ON DATABASE materialize TO PUBLIC"),
614        )
615        .await?;
616        pg_batch_execute(
617            &inner_client,
618            pg_sql!(
619                "GRANT ALL PRIVILEGES ON DATABASE materialize TO {}",
620                Sql::ident(&self.materialize.user)
621            ),
622        )
623        .await?;
624        pg_batch_execute(
625            &inner_client,
626            pg_sql!(
627                "GRANT ALL PRIVILEGES ON SCHEMA materialize.public TO {}",
628                Sql::ident(&self.materialize.user)
629            ),
630        )
631        .await?;
632
633        let cluster = match version {
634            ..=8199 => "default",
635            8200.. => "quickstart",
636        };
637        pg_batch_execute(
638            &inner_client,
639            pg_sql!("GRANT USAGE ON CLUSTER {} TO PUBLIC", Sql::ident(cluster)),
640        )
641        .await?;
642        pg_batch_execute(
643            &inner_client,
644            pg_sql!(
645                "GRANT ALL PRIVILEGES ON CLUSTER {} TO {}",
646                Sql::ident(cluster),
647                Sql::ident(&self.materialize.user)
648            ),
649        )
650        .await?;
651
652        Ok(())
653    }
654
655    /// Delete Kafka topics + CCSR subjects that were created in this run
656    pub async fn reset_kafka(&self) -> Result<(), anyhow::Error> {
657        use rdkafka::types::RDKafkaErrorCode;
658        let mut errors: Vec<anyhow::Error> = Vec::new();
659
660        let metadata = self.kafka_producer.client().fetch_metadata(
661            None,
662            Some(std::cmp::max(Duration::from_secs(1), self.default_timeout)),
663        )?;
664
665        let testdrive_topics: Vec<_> = metadata
666            .topics()
667            .iter()
668            .filter_map(|t| {
669                if t.name().starts_with("testdrive-") {
670                    Some(t.name())
671                } else {
672                    None
673                }
674            })
675            .collect();
676
677        if !testdrive_topics.is_empty() {
678            match self
679                .kafka_admin
680                .delete_topics(&testdrive_topics, &self.kafka_admin_opts)
681                .await
682            {
683                Ok(res) => {
684                    if res.len() != testdrive_topics.len() {
685                        errors.push(anyhow!(
686                            "kafka topic deletion returned {} results, but exactly {} expected",
687                            res.len(),
688                            testdrive_topics.len()
689                        ));
690                    }
691                    for (res, topic) in res.iter().zip_eq(testdrive_topics.iter()) {
692                        match res {
693                            Ok(_) | Err((_, RDKafkaErrorCode::UnknownTopicOrPartition)) => (),
694                            Err((_, err)) => {
695                                errors.push(anyhow!("unable to delete {}: {}", topic, err));
696                            }
697                        }
698                    }
699                }
700                Err(e) => {
701                    errors.push(e.into());
702                }
703            };
704        }
705
706        let schema_registry_errors = self.reset_schema_registry().await;
707
708        errors.extend(schema_registry_errors);
709        if errors.is_empty() {
710            Ok(())
711        } else {
712            bail!(
713                "deleting Kafka topics: {} errors: {}",
714                errors.len(),
715                errors
716                    .into_iter()
717                    .map(|e| e.to_string_with_causes())
718                    .join("\n")
719            );
720        }
721    }
722
723    #[allow(clippy::disallowed_types)]
724    async fn reset_schema_registry(&self) -> Vec<anyhow::Error> {
725        use std::collections::HashMap;
726
727        let mut errors = Vec::new();
728        match self
729            .ccsr_client
730            .list_subjects()
731            .await
732            .context("listing schema registry subjects")
733        {
734            Ok(subjects) => {
735                let testdrive_subjects: Vec<_> = subjects
736                    .into_iter()
737                    .filter(|s| s.starts_with("testdrive-"))
738                    .collect();
739
740                // Build the dependency graphs: subject -> list of subjects it references
741                let mut graphs: HashMap<SubjectVersion, Vec<SubjectVersion>> = HashMap::new();
742
743                for subject in &testdrive_subjects {
744                    match self.ccsr_client.get_subject_with_references(subject).await {
745                        Ok((subj, refs)) => {
746                            // Filter to only testdrive subjects
747                            let refs: Vec<_> = refs
748                                .into_iter()
749                                .filter(|r| r.subject.starts_with("testdrive-"))
750                                .collect();
751                            graphs.insert(
752                                SubjectVersion {
753                                    subject: subj.name,
754                                    version: subj.version,
755                                },
756                                refs,
757                            );
758                        }
759                        Err(mz_ccsr::GetBySubjectError::SubjectNotFound) => {
760                            // Subject was already deleted, skip it
761                        }
762                        Err(e) => {
763                            errors.push(anyhow::anyhow!(
764                                "failed to get references for subject {}: {}",
765                                subject,
766                                e
767                            ));
768                        }
769                    }
770                }
771
772                // Get topological ordering (0 = root/no dependencies, higher = more dependents)
773                // We need to delete in reverse order (highest first = subjects that depend on others)
774                let subjects_to_delete: Vec<_> = match mz_ccsr::topological_sort(&graphs) {
775                    Ok(ordered) => {
776                        let mut subjects: Vec<_> = ordered.into_iter().collect();
777                        subjects.sort_by(|a, b| a.1.cmp(&b.1));
778                        subjects.into_iter().map(|(s, _)| s.clone()).collect()
779                    }
780                    Err(_) => {
781                        tracing::info!("Cycle detected, attempting to delete anyway");
782                        // Cycle detected or other error - fall back to deleting in any order
783                        graphs.into_keys().collect()
784                    }
785                };
786
787                for subject in subjects_to_delete {
788                    match self.ccsr_client.delete_subject(&subject.subject).await {
789                        Ok(()) | Err(mz_ccsr::DeleteError::SubjectNotFound) => (),
790                        Err(e) => errors.push(e.into()),
791                    }
792                }
793            }
794            Err(e) => {
795                errors.push(e);
796            }
797        }
798        errors
799    }
800}
801
802/// Configuration for the Catalog.
803#[derive(Debug, Clone)]
804pub struct CatalogConfig {
805    /// Handle to the persist consensus system.
806    pub persist_consensus_url: SensitiveUrl,
807    /// Handle to the persist blob storage.
808    pub persist_blob_url: SensitiveUrl,
809}
810
/// How script execution should proceed after a command has run.
pub enum ControlFlow {
    /// Move on to the next command. Also returned when a command is skipped
    /// because its version constraint is not satisfied.
    Continue,
    /// Begin skipping subsequent commands (presumably until a matching
    /// `SkipEnd` — TODO confirm against the skip-if/skip-end actions).
    SkipBegin,
    /// Stop skipping commands.
    SkipEnd,
}
816
/// A testdrive command that can be executed against the shared [`State`].
///
/// Implementations consume the command and return a [`ControlFlow`] value
/// describing how execution should proceed afterwards.
#[async_trait]
pub(crate) trait Run {
    /// Executes the command, mutating `state` as needed.
    ///
    /// Failures are returned as a [`PosError`], which pairs the underlying
    /// error with a position so it can be reported against the command.
    async fn run(self, state: &mut State) -> Result<ControlFlow, PosError>;
}
821
/// Executes a single parsed testdrive command.
///
/// Before dispatching, `${...}` variables in the command's text are
/// substituted from `state.cmd_vars`. Errors from substitution, from the
/// version check and from the command handler are all tagged with the
/// command's position (`self.pos`).
#[async_trait]
impl Run for PosCommand {
    async fn run(self, state: &mut State) -> Result<ControlFlow, PosError> {
        // Evaluates an optional version constraint. If the check returns
        // `Ok(true)`, `run` returns `ControlFlow::Continue` immediately without
        // executing the command (presumably because the running version falls
        // outside the constraint — confirm in `version_check`). Check errors
        // are tagged with this command's position.
        macro_rules! handle_version {
            ($version_constraint:expr) => {
                match $version_constraint {
                    Some(VersionConstraint { min, max }) => {
                        match version_check::run_version_check(min, max, state).await {
                            Ok(true) => return Ok(ControlFlow::Continue),
                            Ok(false) => {}
                            Err(err) => return Err(PosError::new(err, self.pos)),
                        }
                    }
                    None => {}
                }
            };
        }

        // Attaches this command's position to an error.
        let wrap_err = |e| PosError::new(e, self.pos);
        // Substitute variables now, except for command-specific ones, which are
        // substituted at runtime. For a builtin, any variable whose name starts
        // with `<builtin-name>.` is left untouched.
        let ignore_prefix = match &self.command {
            Command::Builtin(builtin, _) => Some(builtin.name.clone()),
            _ => None,
        };
        // Plain substitution.
        let subst = |msg: &str, vars: &BTreeMap<String, String>| {
            substitute_vars(msg, vars, &ignore_prefix, false).map_err(wrap_err)
        };
        // Substitution that regex-escapes each value, for text that is later
        // interpreted as a regular expression.
        let subst_re = |msg: &str, vars: &BTreeMap<String, String>| {
            substitute_vars(msg, vars, &ignore_prefix, true).map_err(wrap_err)
        };

        let r = match self.command {
            Command::Builtin(mut builtin, version_constraint) => {
                handle_version!(version_constraint);
                // Substitute into every argument value and every input line,
                // then dispatch on the builtin's name.
                for val in builtin.args.values_mut() {
                    *val = subst(val, &state.cmd_vars)?;
                }
                for line in &mut builtin.input {
                    *line = subst(line, &state.cmd_vars)?;
                }
                match builtin.name.as_ref() {
                    "check-consistency" => consistency::run_consistency_checks(state).await,
                    "skip-consistency-checks" => {
                        consistency::skip_consistency_checks(builtin, state)
                    }
                    "check-shard-tombstone" => {
                        consistency::run_check_shard_tombstone(builtin, state).await
                    }
                    "duckdb-execute" => duckdb::run_execute(builtin, state).await,
                    "duckdb-query" => duckdb::run_query(builtin, state).await,
                    "fivetran-destination" => {
                        fivetran::run_destination_command(builtin, state).await
                    }
                    "file-append" => file::run_append(builtin, state).await,
                    "file-delete" => file::run_delete(builtin, state).await,
                    "http-request" => http::run_request(builtin, state).await,
                    "kafka-add-partitions" => kafka::run_add_partitions(builtin, state).await,
                    "kafka-create-topic" => kafka::run_create_topic(builtin, state).await,
                    "kafka-wait-topic" => kafka::run_wait_topic(builtin, state).await,
                    "kafka-delete-records" => kafka::run_delete_records(builtin, state).await,
                    "kafka-delete-topic-flaky" => kafka::run_delete_topic(builtin, state).await,
                    "kafka-ingest" => kafka::run_ingest(builtin, state).await,
                    "kafka-verify-data" => kafka::run_verify_data(builtin, state).await,
                    "kafka-verify-commit" => kafka::run_verify_commit(builtin, state).await,
                    "kafka-verify-topic" => kafka::run_verify_topic(builtin, state).await,
                    "mysql-connect" => mysql::run_connect(builtin, state).await,
                    "mysql-execute" => mysql::run_execute(builtin, state).await,
                    "nop" => nop::run_nop(),
                    "postgres-connect" => postgres::run_connect(builtin, state).await,
                    "postgres-execute" => postgres::run_execute(builtin, state).await,
                    "postgres-verify-slot" => postgres::run_verify_slot(builtin, state).await,
                    "protobuf-compile-descriptors" => {
                        protobuf::run_compile_descriptors(builtin, state).await
                    }
                    "psql-execute" => psql::run_execute(builtin, state).await,
                    "s3-verify-data" => s3::run_verify_data(builtin, state).await,
                    "s3-verify-keys" => s3::run_verify_keys(builtin, state).await,
                    "s3-file-upload" => s3::run_upload(builtin, state).await,
                    "s3-set-presigned-url" => s3::run_set_presigned_url(builtin, state).await,
                    "s3-upload-parquet-types" => s3::run_upload_parquet_types(builtin, state).await,
                    "s3-upload-parquet-unsorted-map" => {
                        s3::run_upload_parquet_unsorted_map(builtin, state).await
                    }
                    "schema-registry-publish" => schema_registry::run_publish(builtin, state).await,
                    "schema-registry-verify" => schema_registry::run_verify(builtin, state).await,
                    "schema-registry-wait" => schema_registry::run_wait(builtin, state).await,
                    "skip-if" => skip_if::run_skip_if(builtin, state).await,
                    "skip-end" => skip_end::run_skip_end(),
                    "sql-server-connect" => sql_server::run_connect(builtin, state).await,
                    "sql-server-execute" => sql_server::run_execute(builtin, state).await,
                    "sql-server-set-from-sql" => sql_server::run_set_from_sql(builtin, state).await,
                    "persist-force-compaction" => {
                        persist::run_force_compaction(builtin, state).await
                    }
                    "random-sleep" => sleep::run_random_sleep(builtin),
                    "set-regex" => set::run_regex_set(builtin, state),
                    "unset-regex" => set::run_regex_unset(builtin, state),
                    "set-sql-timeout" => set::run_sql_timeout(builtin, state),
                    "set-max-tries" => set::run_max_tries(builtin, state),
                    "sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment" => {
                        sleep::run_sleep(builtin)
                    }
                    "set" => set::set_vars(builtin, state),
                    "set-arg-default" => set::run_set_arg_default(builtin, state),
                    "set-from-sql" => set::run_set_from_sql(builtin, state).await,
                    "set-from-file" => set::run_set_from_file(builtin, state).await,
                    "webhook-append" => webhook::run_append(builtin, state).await,
                    // Unrecognized builtin names are an error at this position.
                    _ => {
                        return Err(PosError::new(
                            anyhow!("unknown built-in command {}", builtin.name),
                            self.pos,
                        ));
                    }
                }
            }
            Command::Sql(mut sql, version_constraint) => {
                handle_version!(version_constraint);
                sql.query = subst(&sql.query, &state.cmd_vars)?;
                // Only fully-spelled-out expected rows are substituted; other
                // `SqlOutput` variants are passed through unchanged.
                if let SqlOutput::Full { expected_rows, .. } = &mut sql.expected_output {
                    for row in expected_rows {
                        for col in row {
                            *col = subst(col, &state.cmd_vars)?;
                        }
                    }
                }
                sql::run_sql(sql, state).await
            }
            Command::FailSql(mut sql, version_constraint) => {
                handle_version!(version_constraint);
                sql.query = subst(&sql.query, &state.cmd_vars)?;
                // Regex expectations use `subst_re` so substituted values match
                // literally; `Timeout` carries no text to substitute.
                sql.expected_error = match &sql.expected_error {
                    SqlExpectedError::Contains(s) => {
                        SqlExpectedError::Contains(subst(s, &state.cmd_vars)?)
                    }
                    SqlExpectedError::Exact(s) => {
                        SqlExpectedError::Exact(subst(s, &state.cmd_vars)?)
                    }
                    SqlExpectedError::Regex(s) => {
                        SqlExpectedError::Regex(subst_re(s, &state.cmd_vars)?)
                    }
                    SqlExpectedError::Timeout => SqlExpectedError::Timeout,
                };
                sql::run_fail_sql(sql, state).await
            }
        };

        // Tag any handler error with this command's position.
        r.map_err(wrap_err)
    }
}
972
973/// Substituted `${}`-delimited variables from `vars` into `msg`
974fn substitute_vars(
975    msg: &str,
976    vars: &BTreeMap<String, String>,
977    ignore_prefix: &Option<String>,
978    regex_escape: bool,
979) -> Result<String, anyhow::Error> {
980    static RE: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"\$\{([^}]+)\}").unwrap());
981    let mut err = None;
982    let out = RE.replace_all(msg, |caps: &Captures| {
983        let name = &caps[1];
984        if let Some(ignore_prefix) = &ignore_prefix {
985            if name.starts_with(format!("{}.", ignore_prefix).as_str()) {
986                // Do not substitute, leave original variable name in place
987                return caps.get(0).unwrap().as_str().to_string();
988            }
989        }
990
991        if let Some(val) = vars.get(name) {
992            if regex_escape {
993                regex::escape(val)
994            } else {
995                val.to_string()
996            }
997        } else {
998            err = Some(anyhow!("unknown variable: {}", name));
999            "#VAR-MISSING#".to_string()
1000        }
1001    });
1002    match err {
1003        Some(err) => Err(err),
1004        None => Ok(out.into_owned()),
1005    }
1006}
1007
1008/// Initializes a [`State`] object by connecting to the various external
1009/// services specified in `config`.
1010///
1011/// Returns the initialized `State` and a cleanup future. The cleanup future
1012/// should be `await`ed only *after* dropping the `State` to check whether any
1013/// errors occured while dropping the `State`. This awkward API is a workaround
1014/// for the lack of `AsyncDrop` support in Rust.
1015pub async fn create_state(
1016    config: &Config,
1017) -> Result<(State, impl Future<Output = Result<(), anyhow::Error>>), anyhow::Error> {
1018    let seed = config
1019        .seed
1020        .clone()
1021        .unwrap_or_else(|| format!("{:010}", rand::random::<u32>()));
1022
1023    let (_tempfile, temp_path) = match &config.temp_dir {
1024        Some(temp_dir) => {
1025            fs::create_dir_all(temp_dir).context("creating temporary directory")?;
1026            (None, PathBuf::from(&temp_dir))
1027        }
1028        _ => {
1029            // Stash the tempfile object so that it does not go out of scope and delete
1030            // the tempdir prematurely
1031            let tempfile_handle = tempfile::tempdir().context("creating temporary directory")?;
1032            let temp_path = tempfile_handle.path().to_path_buf();
1033            (Some(tempfile_handle), temp_path)
1034        }
1035    };
1036
1037    let materialize_catalog_config = config.materialize_catalog_config.clone();
1038
1039    let materialize_url = util::postgres::config_url(&config.materialize_pgconfig)?;
1040    info!("Connecting to {}", materialize_url.as_str());
1041    let (pgclient, pgconn) = Retry::default()
1042        .max_duration(config.default_timeout)
1043        .retry_async_canceling(|_| async move {
1044            let mut pgconfig = config.materialize_pgconfig.clone();
1045            pgconfig.connect_timeout(config.default_timeout);
1046            let tls = make_tls(&pgconfig)?;
1047            pgconfig.connect(tls).await.map_err(|e| anyhow!(e))
1048        })
1049        .await?;
1050
1051    let pgconn_task =
1052        task::spawn(|| "pgconn_task", pgconn).map(|join| join.context("running SQL connection"));
1053
1054    let materialize_state =
1055        create_materialize_state(&config, materialize_catalog_config, pgclient).await?;
1056
1057    let schema_registry_url = config.schema_registry_url.to_owned();
1058
1059    let ccsr_client = {
1060        let mut ccsr_config = mz_ccsr::ClientConfig::new(schema_registry_url.clone());
1061
1062        if let Some(cert_path) = &config.cert_path {
1063            let cert = fs::read(cert_path).context("reading cert")?;
1064            let pass = config.cert_password.as_deref().unwrap_or("").to_owned();
1065            let ident = mz_ccsr::tls::Identity::from_pkcs12_der(cert, pass)
1066                .context("reading keystore file as pkcs12")?;
1067            ccsr_config = ccsr_config.identity(ident);
1068        }
1069
1070        if let Some(ccsr_username) = &config.ccsr_username {
1071            ccsr_config = ccsr_config.auth(ccsr_username.clone(), config.ccsr_password.clone());
1072        }
1073
1074        ccsr_config.build().context("Creating CCSR client")?
1075    };
1076
1077    let (kafka_addr, kafka_admin, kafka_admin_opts, kafka_producer, kafka_topics, kafka_config) = {
1078        use rdkafka::admin::{AdminClient, AdminOptions};
1079        use rdkafka::producer::FutureProducer;
1080
1081        let mut kafka_config = create_new_client_config_simple();
1082        kafka_config.set("bootstrap.servers", &config.kafka_addr);
1083        kafka_config.set("group.id", "materialize-testdrive");
1084        kafka_config.set("auto.offset.reset", "earliest");
1085        kafka_config.set("isolation.level", "read_committed");
1086        if let Some(cert_path) = &config.cert_path {
1087            kafka_config.set("security.protocol", "ssl");
1088            kafka_config.set("ssl.keystore.location", cert_path);
1089            if let Some(cert_password) = &config.cert_password {
1090                kafka_config.set("ssl.keystore.password", cert_password);
1091            }
1092        }
1093        kafka_config.set("message.max.bytes", "15728640");
1094
1095        for (key, value) in &config.kafka_opts {
1096            kafka_config.set(key, value);
1097        }
1098
1099        let admin: AdminClient<_> = kafka_config
1100            .create_with_context(MzClientContext::default())
1101            .with_context(|| format!("opening Kafka connection: {}", config.kafka_addr))?;
1102
1103        let admin_opts = AdminOptions::new().operation_timeout(Some(config.default_timeout));
1104
1105        let producer: FutureProducer<_> = kafka_config
1106            .create_with_context(MzClientContext::default())
1107            .with_context(|| format!("opening Kafka producer connection: {}", config.kafka_addr))?;
1108
1109        let topics = BTreeMap::new();
1110
1111        (
1112            config.kafka_addr.to_owned(),
1113            admin,
1114            admin_opts,
1115            producer,
1116            topics,
1117            kafka_config,
1118        )
1119    };
1120
1121    let mut state = State {
1122        config: config.clone(),
1123
1124        // === Testdrive state. ===
1125        arg_vars: config.arg_vars.clone(),
1126        cmd_vars: BTreeMap::new(),
1127        seed,
1128        temp_path,
1129        _tempfile,
1130        default_timeout: config.default_timeout,
1131        timeout: config.default_timeout,
1132        max_tries: config.default_max_tries,
1133        initial_backoff: config.initial_backoff,
1134        backoff_factor: config.backoff_factor,
1135        consistency_checks: config.consistency_checks,
1136        check_statement_logging: config.check_statement_logging,
1137        consistency_checks_adhoc_skip: false,
1138        regex: None,
1139        regex_replacement: set::DEFAULT_REGEX_REPLACEMENT.into(),
1140        rewrite_results: config.rewrite_results,
1141        error_line_count: 0,
1142        error_string: "".to_string(),
1143
1144        // === Materialize state. ===
1145        materialize: materialize_state,
1146
1147        // === Persist state. ===
1148        persist_consensus_url: config.persist_consensus_url.clone(),
1149        persist_blob_url: config.persist_blob_url.clone(),
1150        build_info: config.build_info,
1151        persist_clients: PersistClientCache::new(
1152            PersistConfig::new_default_configs(config.build_info, SYSTEM_TIME.clone()),
1153            &MetricsRegistry::new(),
1154            |_, _| PubSubClientConnection::noop(),
1155        ),
1156
1157        // === Confluent state. ===
1158        schema_registry_url,
1159        ccsr_client,
1160        kafka_addr,
1161        kafka_admin,
1162        kafka_admin_opts,
1163        kafka_config,
1164        kafka_default_partitions: config.kafka_default_partitions,
1165        kafka_producer,
1166        kafka_topics,
1167
1168        // === AWS state. ===
1169        aws_account: config.aws_account.clone(),
1170        aws_config: config.aws_config.clone(),
1171
1172        // === Database driver state. ===
1173        duckdb_clients: BTreeMap::new(),
1174        mysql_clients: BTreeMap::new(),
1175        postgres_clients: BTreeMap::new(),
1176        sql_server_clients: BTreeMap::new(),
1177
1178        // === Fivetran state. ===
1179        fivetran_destination_url: config.fivetran_destination_url.clone(),
1180        fivetran_destination_files_path: config.fivetran_destination_files_path.clone(),
1181
1182        rewrites: Vec::new(),
1183        rewrite_pos_start: 0,
1184        rewrite_pos_end: 0,
1185    };
1186    state.initialize_cmd_vars().await?;
1187    Ok((state, pgconn_task))
1188}
1189
1190async fn create_materialize_state(
1191    config: &&Config,
1192    materialize_catalog_config: Option<CatalogConfig>,
1193    pgclient: tokio_postgres::Client,
1194) -> Result<MaterializeState, anyhow::Error> {
1195    let materialize_url = util::postgres::config_url(&config.materialize_pgconfig)?;
1196    let materialize_internal_url =
1197        util::postgres::config_url(&config.materialize_internal_pgconfig)?;
1198
1199    for (key, value) in &config.materialize_params {
1200        // Session parameter values are raw SQL fragments from testdrive config.
1201        #[allow(clippy::disallowed_methods)]
1202        pgclient
1203            .batch_execute(&format!("SET {key} = {value}"))
1204            .await
1205            .context("setting session parameter")?;
1206    }
1207
1208    let materialize_user = config
1209        .materialize_pgconfig
1210        .get_user()
1211        .expect("testdrive URL must contain user")
1212        .to_string();
1213
1214    let materialize_sql_addr = format!(
1215        "{}:{}",
1216        materialize_url.host_str().unwrap(),
1217        materialize_url.port().unwrap()
1218    );
1219    let materialize_http_addr = format!(
1220        "{}:{}",
1221        materialize_url.host_str().unwrap(),
1222        config.materialize_http_port
1223    );
1224    let materialize_internal_sql_addr = format!(
1225        "{}:{}",
1226        materialize_internal_url.host_str().unwrap(),
1227        materialize_internal_url.port().unwrap()
1228    );
1229    let materialize_password_sql_addr = format!(
1230        "{}:{}",
1231        materialize_url.host_str().unwrap(),
1232        config.materialize_password_sql_port
1233    );
1234    let materialize_sasl_sql_addr = format!(
1235        "{}:{}",
1236        materialize_url.host_str().unwrap(),
1237        config.materialize_sasl_sql_port
1238    );
1239    let materialize_internal_http_addr = format!(
1240        "{}:{}",
1241        materialize_internal_url.host_str().unwrap(),
1242        config.materialize_internal_http_port
1243    );
1244    let environment_id = pg_query_one(&pgclient, pg_sql!("SELECT mz_environment_id()"), &[])
1245        .await?
1246        .get::<_, String>(0)
1247        .parse()
1248        .context("parsing environment ID")?;
1249
1250    let bootstrap_args = BootstrapArgs {
1251        cluster_replica_size_map: config.materialize_cluster_replica_sizes.clone(),
1252        default_cluster_replica_size: "ABC".to_string(),
1253        default_cluster_replication_factor: 1,
1254        bootstrap_role: None,
1255    };
1256
1257    let materialize_state = MaterializeState {
1258        catalog_config: materialize_catalog_config,
1259        sql_addr: materialize_sql_addr,
1260        use_https: config.materialize_use_https,
1261        http_addr: materialize_http_addr,
1262        internal_sql_addr: materialize_internal_sql_addr,
1263        internal_http_addr: materialize_internal_http_addr,
1264        password_sql_addr: materialize_password_sql_addr,
1265        sasl_sql_addr: materialize_sasl_sql_addr,
1266        user: materialize_user,
1267        pgclient,
1268        environment_id,
1269        bootstrap_args,
1270    };
1271
1272    Ok(materialize_state)
1273}