Skip to main content

mz_testdrive/
action.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeMap;
11use std::future::Future;
12
13use std::path::PathBuf;
14use std::sync::LazyLock;
15use std::time::Duration;
16use std::{env, fs};
17
18use anyhow::{Context, anyhow, bail};
19use async_trait::async_trait;
20use aws_credential_types::provider::ProvideCredentials;
21use aws_types::SdkConfig;
22use futures::future::FutureExt;
23use itertools::Itertools;
24use mz_adapter::catalog::{Catalog, ConnCatalog};
25use mz_adapter::session::Session;
26use mz_build_info::BuildInfo;
27use mz_catalog::config::ClusterReplicaSizeMap;
28use mz_catalog::durable::BootstrapArgs;
29use mz_ccsr::SubjectVersion;
30use mz_kafka_util::client::{MzClientContext, create_new_client_config_simple};
31use mz_ore::error::ErrorExt;
32use mz_ore::metrics::MetricsRegistry;
33use mz_ore::now::SYSTEM_TIME;
34use mz_ore::retry::Retry;
35use mz_ore::task;
36use mz_ore::url::SensitiveUrl;
37use mz_persist_client::cache::PersistClientCache;
38use mz_persist_client::cfg::PersistConfig;
39use mz_persist_client::rpc::PubSubClientConnection;
40use mz_persist_client::{PersistClient, PersistLocation};
41use mz_postgres_util::{
42    Sql, batch_execute as pg_batch_execute, query as pg_query, query_one as pg_query_one,
43    sql as pg_sql,
44};
45use mz_sql::catalog::EnvironmentId;
46use mz_tls_util::make_tls;
47use rdkafka::ClientConfig;
48use rdkafka::producer::Producer;
49use regex::{Captures, Regex};
50use semver::Version;
51use tokio_postgres::error::{DbError, SqlState};
52use tracing::info;
53use url::Url;
54
55use crate::error::PosError;
56use crate::parser::{
57    Command, PosCommand, SqlExpectedError, SqlOutput, VersionConstraint, validate_ident,
58};
59use crate::util;
60use crate::util::postgres::postgres_client;
61
62pub mod consistency;
63
64mod duckdb;
65mod file;
66mod fivetran;
67mod glue;
68mod http;
69mod kafka;
70mod mysql;
71mod nop;
72mod persist;
73mod postgres;
74mod protobuf;
75mod psql;
76mod s3;
77mod schema_registry;
78mod set;
79mod skip_end;
80mod skip_if;
81mod sleep;
82mod sql;
83mod sql_server;
84mod version_check;
85mod webhook;
86
/// User-settable configuration parameters.
///
/// Options are grouped by the system they configure. A [`State`] keeps the
/// `Config` it was created from in its `config` field.
#[derive(Debug, Clone)]
pub struct Config {
    // === Testdrive options. ===
    /// Variables to make available to the testdrive script.
    ///
    /// The value of each entry will be made available to the script in a
    /// variable named `arg.KEY`.
    pub arg_vars: BTreeMap<String, String>,
    /// A random number to distinguish each run of a testdrive script.
    pub seed: Option<String>,
    /// Whether to reset Materialize state before executing each script and
    /// to clean up AWS state after each script.
    pub reset: bool,
    /// Force the use of the specified temporary directory.
    ///
    /// If unspecified, testdrive creates a temporary directory with a random
    /// name.
    pub temp_dir: Option<String>,
    /// Source string to print out on errors.
    pub source: Option<String>,
    /// The default timeout for cancellable operations.
    pub default_timeout: Duration,
    /// The default number of tries for retriable operations.
    pub default_max_tries: usize,
    /// The initial backoff interval for retry operations.
    ///
    /// Set to 0 to retry immediately on failure.
    pub initial_backoff: Duration,
    /// Backoff factor to use for retry operations.
    ///
    /// Set to 1 to retry at a steady pace.
    pub backoff_factor: f64,
    /// The level at which coordinator and catalog consistency checks are
    /// performed (see [`consistency::Level`]).
    pub consistency_checks: consistency::Level,
    /// Whether to run statement logging consistency checks (adds a few seconds at the end of every
    /// test file).
    pub check_statement_logging: bool,
    /// Whether to automatically rewrite wrong results instead of failing.
    pub rewrite_results: bool,

    // === Materialize options. ===
    /// The pgwire connection parameters for the Materialize instance that
    /// testdrive will connect to.
    pub materialize_pgconfig: tokio_postgres::Config,
    /// The internal pgwire connection parameters for the Materialize instance that
    /// testdrive will connect to.
    pub materialize_internal_pgconfig: tokio_postgres::Config,
    /// Whether to use HTTPS instead of plain HTTP for the HTTP(S) connections.
    pub materialize_use_https: bool,
    /// The port for the public endpoints of the materialize instance that
    /// testdrive will connect to via HTTP.
    pub materialize_http_port: u16,
    /// The port for the internal endpoints of the materialize instance that
    /// testdrive will connect to via HTTP.
    pub materialize_internal_http_port: u16,
    /// The port for the password endpoints of the materialize instance that
    /// testdrive will connect to via SQL.
    pub materialize_password_sql_port: u16,
    /// The port for the SASL endpoints of the materialize instance that
    /// testdrive will connect to via SQL.
    pub materialize_sasl_sql_port: u16,
    /// Session parameters to set after connecting to materialize.
    pub materialize_params: Vec<(String, String)>,
    /// An optional catalog configuration.
    pub materialize_catalog_config: Option<CatalogConfig>,
    /// Build information.
    pub build_info: &'static BuildInfo,
    /// Configured cluster replica sizes.
    pub materialize_cluster_replica_sizes: ClusterReplicaSizeMap,

    // === Persist options. ===
    /// Handle to the persist consensus system.
    pub persist_consensus_url: Option<SensitiveUrl>,
    /// Handle to the persist blob storage.
    pub persist_blob_url: Option<SensitiveUrl>,

    // === Confluent options. ===
    /// The address of the Kafka broker that testdrive will interact with.
    pub kafka_addr: String,
    /// Default number of partitions to use for topics.
    pub kafka_default_partitions: usize,
    /// Arbitrary rdkafka options for testdrive to use when connecting to the
    /// Kafka broker.
    pub kafka_opts: Vec<(String, String)>,
    /// The URL of the schema registry that testdrive will connect to.
    pub schema_registry_url: Url,
    /// An optional path to a TLS certificate that testdrive will present when
    /// performing client authentication.
    ///
    /// The keystore must be in the PKCS#12 format.
    pub cert_path: Option<String>,
    /// An optional password for the TLS certificate.
    pub cert_password: Option<String>,
    /// An optional username for basic authentication with the Confluent Schema
    /// Registry.
    pub ccsr_username: Option<String>,
    /// An optional password for basic authentication with the Confluent Schema
    /// Registry.
    pub ccsr_password: Option<String>,

    // === AWS options. ===
    /// The configuration to use when connecting to AWS.
    pub aws_config: SdkConfig,
    /// The ID of the AWS account that `aws_config` configures.
    pub aws_account: String,

    // === Fivetran options. ===
    /// Address of the Fivetran Destination that is currently running.
    pub fivetran_destination_url: String,
    /// Directory that is accessible to the Fivetran Destination.
    pub fivetran_destination_files_path: String,
}
200
/// Addresses, identity and handles for the Materialize instance under test.
pub struct MaterializeState {
    /// Persist locations of the catalog, if configured. When `None`,
    /// `State::with_catalog_copy` returns `Ok(None)`.
    catalog_config: Option<CatalogConfig>,

    /// Exported to scripts as `testdrive.materialize-sql-addr`.
    sql_addr: String,
    // NOTE(review): presumably mirrors `Config::materialize_use_https` — confirm.
    use_https: bool,
    http_addr: String,
    /// Address `State::reset_materialize` connects to as `mz_system`; also
    /// exported to scripts as `testdrive.materialize-internal-sql-addr`.
    internal_sql_addr: String,
    internal_http_addr: String,
    /// Exported to scripts as `testdrive.materialize-password-sql-addr`.
    password_sql_addr: String,
    /// Exported to scripts as `testdrive.materialize-sasl-sql-addr`.
    sasl_sql_addr: String,
    /// The role that `State::reset_materialize` never drops and to which it
    /// grants all privileges; exported as `testdrive.materialize-user`.
    user: String,
    pgclient: tokio_postgres::Client,
    /// Exported to scripts as `testdrive.materialize-environment-id` and used
    /// when opening a catalog copy.
    environment_id: EnvironmentId,
    bootstrap_args: BootstrapArgs,
}
216
/// Mutable state handed to every command through [`Run::run`].
pub struct State {
    // The Config that this `State` was originally created from.
    pub config: Config,

    // === Testdrive state. ===
    /// User-supplied variables; `initialize_cmd_vars` exposes each one as
    /// `arg.KEY`.
    arg_vars: BTreeMap<String, String>,
    /// Variables substituted into commands before they run; filled in by
    /// `initialize_cmd_vars`.
    cmd_vars: BTreeMap<String, String>,
    seed: String,
    /// Exported to scripts as `testdrive.temp-dir`.
    temp_path: PathBuf,
    // NOTE(review): presumably held only to keep the temporary directory
    // alive for the lifetime of the `State` — confirm.
    _tempfile: Option<tempfile::TempDir>,
    default_timeout: Duration,
    timeout: Duration,
    max_tries: usize,
    initial_backoff: Duration,
    backoff_factor: f64,
    consistency_checks: consistency::Level,
    check_statement_logging: bool,
    /// Per-file adhoc skip flag; read and reset by
    /// `clear_skip_consistency_checks`.
    consistency_checks_adhoc_skip: bool,
    regex: Option<Regex>,
    regex_replacement: String,
    error_line_count: usize,
    error_string: String,

    // === Materialize state. ===
    materialize: MaterializeState,

    // === Persist state. ===
    persist_consensus_url: Option<SensitiveUrl>,
    persist_blob_url: Option<SensitiveUrl>,
    build_info: &'static BuildInfo,
    /// Cache from which `with_catalog_copy` opens its persist client.
    persist_clients: PersistClientCache,

    // === Confluent state. ===
    schema_registry_url: Url,
    ccsr_client: mz_ccsr::Client,
    kafka_addr: String,
    kafka_admin: rdkafka::admin::AdminClient<MzClientContext>,
    kafka_admin_opts: rdkafka::admin::AdminOptions,
    kafka_config: ClientConfig,
    kafka_default_partitions: usize,
    kafka_producer: rdkafka::producer::FutureProducer<MzClientContext>,
    // NOTE(review): presumably maps topic name to partition count — confirm.
    kafka_topics: BTreeMap<String, usize>,

    // === AWS state. ===
    aws_account: String,
    aws_config: SdkConfig,

    // === Database driver state. ===
    // The client maps below are keyed by `String`.
    // NOTE(review): presumably a script-chosen connection name — confirm.
    pub duckdb_clients: BTreeMap<String, std::sync::Arc<std::sync::Mutex<::duckdb::Connection>>>,
    mysql_clients: BTreeMap<String, mysql_async::Conn>,
    postgres_clients: BTreeMap<String, tokio_postgres::Client>,
    sql_server_clients: BTreeMap<String, mz_sql_server_util::Client>,

    // === Fivetran state. ===
    fivetran_destination_url: String,
    fivetran_destination_files_path: String,

    // === Rewrite state. ===
    rewrite_results: bool,
    /// Current file, results are replaced inline
    pub rewrites: Vec<Rewrite>,
    /// Start position of currently expected result
    pub rewrite_pos_start: usize,
    /// End position of currently expected result
    pub rewrite_pos_end: usize,
}
283
/// A single entry of `State::rewrites`.
// NOTE(review): presumably a pending replacement of the span `start..end` of
// the current script file with `content`; units (bytes vs. chars) and whether
// `end` is exclusive are not visible here — confirm against the consumer.
pub struct Rewrite {
    /// The replacement text.
    pub content: String,
    /// Start position of the replaced span.
    pub start: usize,
    /// End position of the replaced span.
    pub end: usize,
}
289
290impl State {
291    pub async fn initialize_cmd_vars(&mut self) -> Result<(), anyhow::Error> {
292        self.cmd_vars
293            .insert("testdrive.kafka-addr".into(), self.kafka_addr.clone());
294        self.cmd_vars.insert(
295            "testdrive.schema-registry-url".into(),
296            self.schema_registry_url.to_string(),
297        );
298        self.cmd_vars
299            .insert("testdrive.seed".into(), self.seed.clone());
300        self.cmd_vars.insert(
301            "testdrive.temp-dir".into(),
302            self.temp_path.display().to_string(),
303        );
304        self.cmd_vars
305            .insert("testdrive.aws-region".into(), self.aws_region().into());
306        self.cmd_vars
307            .insert("testdrive.aws-endpoint".into(), self.aws_endpoint().into());
308        self.cmd_vars
309            .insert("testdrive.aws-account".into(), self.aws_account.clone());
310        {
311            let aws_credentials = self
312                .aws_config
313                .credentials_provider()
314                .ok_or_else(|| anyhow!("no AWS credentials provider configured"))?
315                .provide_credentials()
316                .await
317                .context("fetching AWS credentials")?;
318            self.cmd_vars.insert(
319                "testdrive.aws-access-key-id".into(),
320                aws_credentials.access_key_id().to_owned(),
321            );
322            self.cmd_vars.insert(
323                "testdrive.aws-secret-access-key".into(),
324                aws_credentials.secret_access_key().to_owned(),
325            );
326            self.cmd_vars.insert(
327                "testdrive.aws-token".into(),
328                aws_credentials
329                    .session_token()
330                    .map(|token| token.to_owned())
331                    .unwrap_or_else(String::new),
332            );
333        }
334        self.cmd_vars.insert(
335            "testdrive.materialize-environment-id".into(),
336            self.materialize.environment_id.to_string(),
337        );
338        self.cmd_vars.insert(
339            "testdrive.materialize-sql-addr".into(),
340            self.materialize.sql_addr.clone(),
341        );
342        self.cmd_vars.insert(
343            "testdrive.materialize-internal-sql-addr".into(),
344            self.materialize.internal_sql_addr.clone(),
345        );
346        self.cmd_vars.insert(
347            "testdrive.materialize-password-sql-addr".into(),
348            self.materialize.password_sql_addr.clone(),
349        );
350        self.cmd_vars.insert(
351            "testdrive.materialize-sasl-sql-addr".into(),
352            self.materialize.sasl_sql_addr.clone(),
353        );
354        self.cmd_vars.insert(
355            "testdrive.materialize-user".into(),
356            self.materialize.user.clone(),
357        );
358        self.cmd_vars.insert(
359            "testdrive.fivetran-destination-url".into(),
360            self.fivetran_destination_url.clone(),
361        );
362        self.cmd_vars.insert(
363            "testdrive.fivetran-destination-files-path".into(),
364            self.fivetran_destination_files_path.clone(),
365        );
366
367        for (key, value) in env::vars() {
368            self.cmd_vars.insert(format!("env.{}", key), value);
369        }
370
371        for (key, value) in &self.arg_vars {
372            validate_ident(key)?;
373            self.cmd_vars
374                .insert(format!("arg.{}", key), value.to_string());
375        }
376
377        Ok(())
378    }
379    /// Makes of copy of the durable catalog and runs a function on its
380    /// state. Returns `None` if there's no catalog information in the State.
381    pub async fn with_catalog_copy<F, T>(
382        &self,
383        system_parameter_defaults: BTreeMap<String, String>,
384        build_info: &'static BuildInfo,
385        bootstrap_args: &BootstrapArgs,
386        enable_expression_cache_override: Option<bool>,
387        f: F,
388    ) -> Result<Option<T>, anyhow::Error>
389    where
390        F: FnOnce(ConnCatalog) -> T,
391    {
392        async fn persist_client(
393            persist_consensus_url: SensitiveUrl,
394            persist_blob_url: SensitiveUrl,
395            persist_clients: &PersistClientCache,
396        ) -> Result<PersistClient, anyhow::Error> {
397            let persist_location = PersistLocation {
398                blob_uri: persist_blob_url,
399                consensus_uri: persist_consensus_url,
400            };
401            Ok(persist_clients.open(persist_location).await?)
402        }
403
404        if let Some(CatalogConfig {
405            persist_consensus_url,
406            persist_blob_url,
407        }) = &self.materialize.catalog_config
408        {
409            let persist_client = persist_client(
410                persist_consensus_url.clone(),
411                persist_blob_url.clone(),
412                &self.persist_clients,
413            )
414            .await?;
415            let catalog = Catalog::open_debug_read_only_persist_catalog_config(
416                persist_client,
417                SYSTEM_TIME.clone(),
418                self.materialize.environment_id.clone(),
419                system_parameter_defaults,
420                build_info,
421                bootstrap_args,
422                enable_expression_cache_override,
423            )
424            .await?;
425            let res = f(catalog.for_session(&Session::dummy()));
426            catalog.expire().await;
427            Ok(Some(res))
428        } else {
429            Ok(None)
430        }
431    }
432
433    pub fn aws_endpoint(&self) -> &str {
434        self.aws_config.endpoint_url().unwrap_or("")
435    }
436
437    pub fn aws_region(&self) -> &str {
438        self.aws_config.region().map(|r| r.as_ref()).unwrap_or("")
439    }
440
441    /// Resets the adhoc skip consistency check that users can toggle per-file, and returns whether
442    /// the consistency checks should be skipped for this current run.
443    pub fn clear_skip_consistency_checks(&mut self) -> bool {
444        std::mem::replace(&mut self.consistency_checks_adhoc_skip, false)
445    }
446
447    pub async fn reset_materialize(&self) -> Result<(), anyhow::Error> {
448        let (inner_client, _) = postgres_client(
449            &format!(
450                "postgres://mz_system:materialize@{}",
451                self.materialize.internal_sql_addr
452            ),
453            self.default_timeout,
454        )
455        .await?;
456
457        let version = pg_query_one(&inner_client, pg_sql!("SELECT mz_version_num()"), &[])
458            .await
459            .context("getting version of materialize")
460            .map(|row| row.get::<_, i32>(0))?;
461
462        let semver = pg_query_one(
463            &inner_client,
464            pg_sql!("SELECT right(split_part(mz_version(), ' ', 1), -1)"),
465            &[],
466        )
467        .await
468        .context("getting semver of materialize")
469        .map(|row| row.get::<_, String>(0))?
470        .parse::<semver::Version>()
471        .context("parsing semver of materialize")?;
472
473        pg_batch_execute(&inner_client, pg_sql!("ALTER SYSTEM RESET ALL"))
474            .await
475            .context("resetting materialize state: ALTER SYSTEM RESET ALL")?;
476
477        // Dangerous functions are useful for tests so we enable it for all tests.
478        {
479            let rename_version = Version::parse("0.128.0-dev.1").expect("known to be valid");
480            let enable_unsafe_functions = if semver >= rename_version {
481                "unsafe_enable_unsafe_functions"
482            } else {
483                "enable_unsafe_functions"
484            };
485            let res = pg_batch_execute(
486                &inner_client,
487                pg_sql!(
488                    "ALTER SYSTEM SET {} = on",
489                    Sql::ident(enable_unsafe_functions)
490                ),
491            )
492            .await
493            .context("enabling dangerous functions");
494            if let Err(e) = res {
495                match e.root_cause().downcast_ref::<DbError>() {
496                    Some(e) if *e.code() == SqlState::CANT_CHANGE_RUNTIME_PARAM => {
497                        info!(
498                            "can't enable unsafe functions because the server is safe mode; \
499                             testdrive scripts will fail if they use unsafe functions",
500                        );
501                    }
502                    _ => return Err(e),
503                }
504            }
505        }
506
507        for row in pg_query(&inner_client, pg_sql!("SHOW DATABASES"), &[])
508            .await
509            .context("resetting materialize state: SHOW DATABASES")?
510        {
511            let db_name: String = row.get(0);
512            if db_name.starts_with("testdrive_no_reset_") {
513                continue;
514            }
515            let drop_database = pg_sql!("DROP DATABASE {}", Sql::ident(&db_name));
516            sql::print_query(drop_database.as_str(), None);
517            pg_batch_execute(&inner_client, drop_database)
518                .await
519                .context(format!(
520                    "resetting materialize state: DROP DATABASE {}",
521                    db_name,
522                ))?;
523        }
524
525        // Get all user clusters not running any objects owned by users
526        let inactive_user_clusters = "
527        WITH
528            active_user_clusters AS
529            (
530                SELECT DISTINCT cluster_id, object_id
531                FROM
532                    (
533                        SELECT cluster_id, id FROM mz_catalog.mz_sources
534                        UNION ALL SELECT cluster_id, id FROM mz_catalog.mz_sinks
535                        UNION ALL
536                            SELECT cluster_id, id
537                            FROM mz_catalog.mz_materialized_views
538                        UNION ALL
539                            SELECT cluster_id, id FROM mz_catalog.mz_indexes
540                        UNION ALL
541                            SELECT cluster_id, id
542                            FROM mz_internal.mz_subscriptions
543                    )
544                    AS t (cluster_id, object_id)
545                WHERE cluster_id IS NOT NULL AND object_id LIKE 'u%'
546            )
547        SELECT name
548        FROM mz_catalog.mz_clusters
549        WHERE
550            id NOT IN ( SELECT cluster_id FROM active_user_clusters ) AND id LIKE 'u%'
551                AND
552            owner_id LIKE 'u%';";
553
554        let inactive_clusters = pg_query(&inner_client, Sql::new(inactive_user_clusters), &[])
555            .await
556            .context("resetting materialize state: inactive_user_clusters")?;
557
558        if !inactive_clusters.is_empty() {
559            println!("cleaning up user clusters from previous tests...")
560        }
561
562        for cluster_name in inactive_clusters {
563            let cluster_name: String = cluster_name.get(0);
564            if cluster_name.starts_with("testdrive_no_reset_") {
565                continue;
566            }
567            let drop_cluster = pg_sql!("DROP CLUSTER {}", Sql::ident(&cluster_name));
568            sql::print_query(drop_cluster.as_str(), None);
569            pg_batch_execute(&inner_client, drop_cluster)
570                .await
571                .context(format!(
572                    "resetting materialize state: DROP CLUSTER {}",
573                    cluster_name,
574                ))?;
575        }
576
577        pg_batch_execute(&inner_client, pg_sql!("CREATE DATABASE materialize"))
578            .await
579            .context("resetting materialize state: CREATE DATABASE materialize")?;
580
581        // Attempt to remove all users but the current user. Old versions of
582        // Materialize did not support roles, so this degrades gracefully if
583        // mz_roles does not exist.
584        if let Ok(rows) = pg_query(&inner_client, pg_sql!("SELECT name FROM mz_roles"), &[]).await {
585            for row in rows {
586                let role_name: String = row.get(0);
587                if role_name == self.materialize.user || role_name.starts_with("mz_") {
588                    continue;
589                }
590                let drop_role = pg_sql!("DROP ROLE {}", Sql::ident(&role_name));
591                sql::print_query(drop_role.as_str(), None);
592                pg_batch_execute(&inner_client, drop_role)
593                    .await
594                    .context(format!(
595                        "resetting materialize state: DROP ROLE {}",
596                        role_name,
597                    ))?;
598            }
599        }
600
601        // Alter materialize user with all system privileges.
602        pg_batch_execute(
603            &inner_client,
604            pg_sql!(
605                "GRANT ALL PRIVILEGES ON SYSTEM TO {}",
606                Sql::ident(&self.materialize.user)
607            ),
608        )
609        .await?;
610
611        // Grant initial privileges.
612        pg_batch_execute(
613            &inner_client,
614            pg_sql!("GRANT USAGE ON DATABASE materialize TO PUBLIC"),
615        )
616        .await?;
617        pg_batch_execute(
618            &inner_client,
619            pg_sql!(
620                "GRANT ALL PRIVILEGES ON DATABASE materialize TO {}",
621                Sql::ident(&self.materialize.user)
622            ),
623        )
624        .await?;
625        pg_batch_execute(
626            &inner_client,
627            pg_sql!(
628                "GRANT ALL PRIVILEGES ON SCHEMA materialize.public TO {}",
629                Sql::ident(&self.materialize.user)
630            ),
631        )
632        .await?;
633
634        let cluster = match version {
635            ..=8199 => "default",
636            8200.. => "quickstart",
637        };
638        pg_batch_execute(
639            &inner_client,
640            pg_sql!("GRANT USAGE ON CLUSTER {} TO PUBLIC", Sql::ident(cluster)),
641        )
642        .await?;
643        pg_batch_execute(
644            &inner_client,
645            pg_sql!(
646                "GRANT ALL PRIVILEGES ON CLUSTER {} TO {}",
647                Sql::ident(cluster),
648                Sql::ident(&self.materialize.user)
649            ),
650        )
651        .await?;
652
653        Ok(())
654    }
655
656    /// Delete Kafka topics + CCSR subjects that were created in this run
657    pub async fn reset_kafka(&self) -> Result<(), anyhow::Error> {
658        use rdkafka::types::RDKafkaErrorCode;
659        let mut errors: Vec<anyhow::Error> = Vec::new();
660
661        let metadata = self.kafka_producer.client().fetch_metadata(
662            None,
663            Some(std::cmp::max(Duration::from_secs(1), self.default_timeout)),
664        )?;
665
666        let testdrive_topics: Vec<_> = metadata
667            .topics()
668            .iter()
669            .filter_map(|t| {
670                if t.name().starts_with("testdrive-") {
671                    Some(t.name())
672                } else {
673                    None
674                }
675            })
676            .collect();
677
678        if !testdrive_topics.is_empty() {
679            match self
680                .kafka_admin
681                .delete_topics(&testdrive_topics, &self.kafka_admin_opts)
682                .await
683            {
684                Ok(res) => {
685                    if res.len() != testdrive_topics.len() {
686                        errors.push(anyhow!(
687                            "kafka topic deletion returned {} results, but exactly {} expected",
688                            res.len(),
689                            testdrive_topics.len()
690                        ));
691                    }
692                    for (res, topic) in res.iter().zip_eq(testdrive_topics.iter()) {
693                        match res {
694                            Ok(_) | Err((_, RDKafkaErrorCode::UnknownTopicOrPartition)) => (),
695                            Err((_, err)) => {
696                                errors.push(anyhow!("unable to delete {}: {}", topic, err));
697                            }
698                        }
699                    }
700                }
701                Err(e) => {
702                    errors.push(e.into());
703                }
704            };
705        }
706
707        let schema_registry_errors = self.reset_schema_registry().await;
708
709        errors.extend(schema_registry_errors);
710        if errors.is_empty() {
711            Ok(())
712        } else {
713            bail!(
714                "deleting Kafka topics: {} errors: {}",
715                errors.len(),
716                errors
717                    .into_iter()
718                    .map(|e| e.to_string_with_causes())
719                    .join("\n")
720            );
721        }
722    }
723
724    #[allow(clippy::disallowed_types)]
725    async fn reset_schema_registry(&self) -> Vec<anyhow::Error> {
726        use std::collections::HashMap;
727
728        let mut errors = Vec::new();
729        match self
730            .ccsr_client
731            .list_subjects()
732            .await
733            .context("listing schema registry subjects")
734        {
735            Ok(subjects) => {
736                let testdrive_subjects: Vec<_> = subjects
737                    .into_iter()
738                    .filter(|s| s.starts_with("testdrive-"))
739                    .collect();
740
741                // Build the dependency graphs: subject -> list of subjects it references
742                let mut graphs: HashMap<SubjectVersion, Vec<SubjectVersion>> = HashMap::new();
743
744                for subject in &testdrive_subjects {
745                    match self.ccsr_client.get_subject_with_references(subject).await {
746                        Ok((subj, refs)) => {
747                            // Filter to only testdrive subjects
748                            let refs: Vec<_> = refs
749                                .into_iter()
750                                .filter(|r| r.subject.starts_with("testdrive-"))
751                                .collect();
752                            graphs.insert(
753                                SubjectVersion {
754                                    subject: subj.name,
755                                    version: subj.version,
756                                },
757                                refs,
758                            );
759                        }
760                        Err(mz_ccsr::GetBySubjectError::SubjectNotFound) => {
761                            // Subject was already deleted, skip it
762                        }
763                        Err(e) => {
764                            errors.push(anyhow::anyhow!(
765                                "failed to get references for subject {}: {}",
766                                subject,
767                                e
768                            ));
769                        }
770                    }
771                }
772
773                // Get topological ordering (0 = root/no dependencies, higher = more dependents)
774                // We need to delete in reverse order (highest first = subjects that depend on others)
775                let subjects_to_delete: Vec<_> = match mz_ccsr::topological_sort(&graphs) {
776                    Ok(ordered) => {
777                        let mut subjects: Vec<_> = ordered.into_iter().collect();
778                        subjects.sort_by(|a, b| a.1.cmp(&b.1));
779                        subjects.into_iter().map(|(s, _)| s.clone()).collect()
780                    }
781                    Err(_) => {
782                        tracing::info!("Cycle detected, attempting to delete anyway");
783                        // Cycle detected or other error - fall back to deleting in any order
784                        graphs.into_keys().collect()
785                    }
786                };
787
788                for subject in subjects_to_delete {
789                    match self.ccsr_client.delete_subject(&subject.subject).await {
790                        Ok(()) | Err(mz_ccsr::DeleteError::SubjectNotFound) => (),
791                        Err(e) => errors.push(e.into()),
792                    }
793                }
794            }
795            Err(e) => {
796                errors.push(e);
797            }
798        }
799        errors
800    }
801}
802
/// Configuration for the Catalog.
///
/// Holds the two persist locations from which `State::with_catalog_copy`
/// opens its read-only copy of the catalog.
#[derive(Debug, Clone)]
pub struct CatalogConfig {
    /// Handle to the persist consensus system.
    pub persist_consensus_url: SensitiveUrl,
    /// Handle to the persist blob storage.
    pub persist_blob_url: SensitiveUrl,
}
811
/// What the caller should do after a command has run; returned by
/// [`Run::run`].
// NOTE(review): the variant descriptions below are inferred from the variant
// names and the `skip_if` / `skip_end` modules — confirm against the code
// that consumes `ControlFlow`.
pub enum ControlFlow {
    /// Carry on with the next command. Also returned when a command is
    /// bypassed by its version constraint.
    Continue,
    /// Presumably: start skipping the commands that follow.
    SkipBegin,
    /// Presumably: stop skipping and resume normal execution.
    SkipEnd,
}
817
/// Something that can be executed against the testdrive [`State`].
#[async_trait]
pub(crate) trait Run {
    /// Consumes `self` and runs it with mutable access to `state`.
    ///
    /// Returns the [`ControlFlow`] the caller should act on, or a
    /// [`PosError`] if the command failed.
    async fn run(self, state: &mut State) -> Result<ControlFlow, PosError>;
}
822
823#[async_trait]
824impl Run for PosCommand {
825    async fn run(self, state: &mut State) -> Result<ControlFlow, PosError> {
826        macro_rules! handle_version {
827            ($version_constraint:expr) => {
828                match $version_constraint {
829                    Some(VersionConstraint { min, max }) => {
830                        match version_check::run_version_check(min, max, state).await {
831                            Ok(true) => return Ok(ControlFlow::Continue),
832                            Ok(false) => {}
833                            Err(err) => return Err(PosError::new(err, self.pos)),
834                        }
835                    }
836                    None => {}
837                }
838            };
839        }
840
841        let wrap_err = |e| PosError::new(e, self.pos);
842        // Substitute variables at startup except for the command-specific ones
843        // Those will be substituted at runtime
844        let ignore_prefix = match &self.command {
845            Command::Builtin(builtin, _) => Some(builtin.name.clone()),
846            _ => None,
847        };
848        let subst = |msg: &str, vars: &BTreeMap<String, String>| {
849            substitute_vars(msg, vars, &ignore_prefix, false).map_err(wrap_err)
850        };
851        let subst_re = |msg: &str, vars: &BTreeMap<String, String>| {
852            substitute_vars(msg, vars, &ignore_prefix, true).map_err(wrap_err)
853        };
854
855        let r = match self.command {
856            Command::Builtin(mut builtin, version_constraint) => {
857                handle_version!(version_constraint);
858                for val in builtin.args.values_mut() {
859                    *val = subst(val, &state.cmd_vars)?;
860                }
861                for line in &mut builtin.input {
862                    *line = subst(line, &state.cmd_vars)?;
863                }
864                match builtin.name.as_ref() {
865                    "check-consistency" => consistency::run_consistency_checks(state).await,
866                    "skip-consistency-checks" => {
867                        consistency::skip_consistency_checks(builtin, state)
868                    }
869                    "check-shard-tombstone" => {
870                        consistency::run_check_shard_tombstone(builtin, state).await
871                    }
872                    "duckdb-execute" => duckdb::run_execute(builtin, state).await,
873                    "duckdb-query" => duckdb::run_query(builtin, state).await,
874                    "fivetran-destination" => {
875                        fivetran::run_destination_command(builtin, state).await
876                    }
877                    "file-append" => file::run_append(builtin, state).await,
878                    "file-delete" => file::run_delete(builtin, state).await,
879                    "glue-create-schema" => glue::run_create_schema(builtin, state).await,
880                    "http-request" => http::run_request(builtin, state).await,
881                    "kafka-add-partitions" => kafka::run_add_partitions(builtin, state).await,
882                    "kafka-create-topic" => kafka::run_create_topic(builtin, state).await,
883                    "kafka-wait-topic" => kafka::run_wait_topic(builtin, state).await,
884                    "kafka-delete-records" => kafka::run_delete_records(builtin, state).await,
885                    "kafka-delete-topic-flaky" => kafka::run_delete_topic(builtin, state).await,
886                    "kafka-ingest" => kafka::run_ingest(builtin, state).await,
887                    "kafka-verify-data" => kafka::run_verify_data(builtin, state).await,
888                    "kafka-verify-commit" => kafka::run_verify_commit(builtin, state).await,
889                    "kafka-verify-topic" => kafka::run_verify_topic(builtin, state).await,
890                    "mysql-connect" => mysql::run_connect(builtin, state).await,
891                    "mysql-execute" => mysql::run_execute(builtin, state).await,
892                    "nop" => nop::run_nop(),
893                    "postgres-connect" => postgres::run_connect(builtin, state).await,
894                    "postgres-execute" => postgres::run_execute(builtin, state).await,
895                    "postgres-verify-slot" => postgres::run_verify_slot(builtin, state).await,
896                    "protobuf-compile-descriptors" => {
897                        protobuf::run_compile_descriptors(builtin, state).await
898                    }
899                    "psql-execute" => psql::run_execute(builtin, state).await,
900                    "s3-verify-data" => s3::run_verify_data(builtin, state).await,
901                    "s3-verify-keys" => s3::run_verify_keys(builtin, state).await,
902                    "s3-file-upload" => s3::run_upload(builtin, state).await,
903                    "s3-set-presigned-url" => s3::run_set_presigned_url(builtin, state).await,
904                    "s3-upload-parquet-types" => s3::run_upload_parquet_types(builtin, state).await,
905                    "s3-upload-parquet-unsorted-map" => {
906                        s3::run_upload_parquet_unsorted_map(builtin, state).await
907                    }
908                    "schema-registry-publish" => schema_registry::run_publish(builtin, state).await,
909                    "schema-registry-verify" => schema_registry::run_verify(builtin, state).await,
910                    "schema-registry-wait" => schema_registry::run_wait(builtin, state).await,
911                    "skip-if" => skip_if::run_skip_if(builtin, state).await,
912                    "skip-end" => skip_end::run_skip_end(),
913                    "sql-server-connect" => sql_server::run_connect(builtin, state).await,
914                    "sql-server-execute" => sql_server::run_execute(builtin, state).await,
915                    "sql-server-set-from-sql" => sql_server::run_set_from_sql(builtin, state).await,
916                    "persist-force-compaction" => {
917                        persist::run_force_compaction(builtin, state).await
918                    }
919                    "random-sleep" => sleep::run_random_sleep(builtin),
920                    "set-regex" => set::run_regex_set(builtin, state),
921                    "unset-regex" => set::run_regex_unset(builtin, state),
922                    "set-sql-timeout" => set::run_sql_timeout(builtin, state),
923                    "set-max-tries" => set::run_max_tries(builtin, state),
924                    "sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment" => {
925                        sleep::run_sleep(builtin)
926                    }
927                    "set" => set::set_vars(builtin, state),
928                    "set-arg-default" => set::run_set_arg_default(builtin, state),
929                    "set-from-sql" => set::run_set_from_sql(builtin, state).await,
930                    "set-from-file" => set::run_set_from_file(builtin, state).await,
931                    "webhook-append" => webhook::run_append(builtin, state).await,
932                    _ => {
933                        return Err(PosError::new(
934                            anyhow!("unknown built-in command {}", builtin.name),
935                            self.pos,
936                        ));
937                    }
938                }
939            }
940            Command::Sql(mut sql, version_constraint) => {
941                handle_version!(version_constraint);
942                sql.query = subst(&sql.query, &state.cmd_vars)?;
943                if let SqlOutput::Full { expected_rows, .. } = &mut sql.expected_output {
944                    for row in expected_rows {
945                        for col in row {
946                            *col = subst(col, &state.cmd_vars)?;
947                        }
948                    }
949                }
950                sql::run_sql(sql, state).await
951            }
952            Command::FailSql(mut sql, version_constraint) => {
953                handle_version!(version_constraint);
954                sql.query = subst(&sql.query, &state.cmd_vars)?;
955                sql.expected_error = match &sql.expected_error {
956                    SqlExpectedError::Contains(s) => {
957                        SqlExpectedError::Contains(subst(s, &state.cmd_vars)?)
958                    }
959                    SqlExpectedError::Exact(s) => {
960                        SqlExpectedError::Exact(subst(s, &state.cmd_vars)?)
961                    }
962                    SqlExpectedError::Regex(s) => {
963                        SqlExpectedError::Regex(subst_re(s, &state.cmd_vars)?)
964                    }
965                    SqlExpectedError::Timeout => SqlExpectedError::Timeout,
966                };
967                sql::run_fail_sql(sql, state).await
968            }
969        };
970
971        r.map_err(wrap_err)
972    }
973}
974
975/// Substituted `${}`-delimited variables from `vars` into `msg`
976fn substitute_vars(
977    msg: &str,
978    vars: &BTreeMap<String, String>,
979    ignore_prefix: &Option<String>,
980    regex_escape: bool,
981) -> Result<String, anyhow::Error> {
982    static RE: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"\$\{([^}]+)\}").unwrap());
983    let mut err = None;
984    let out = RE.replace_all(msg, |caps: &Captures| {
985        let name = &caps[1];
986        if let Some(ignore_prefix) = &ignore_prefix {
987            if name.starts_with(format!("{}.", ignore_prefix).as_str()) {
988                // Do not substitute, leave original variable name in place
989                return caps.get(0).unwrap().as_str().to_string();
990            }
991        }
992
993        if let Some(val) = vars.get(name) {
994            if regex_escape {
995                regex::escape(val)
996            } else {
997                val.to_string()
998            }
999        } else {
1000            err = Some(anyhow!("unknown variable: {}", name));
1001            "#VAR-MISSING#".to_string()
1002        }
1003    });
1004    match err {
1005        Some(err) => Err(err),
1006        None => Ok(out.into_owned()),
1007    }
1008}
1009
1010/// Initializes a [`State`] object by connecting to the various external
1011/// services specified in `config`.
1012///
1013/// Returns the initialized `State` and a cleanup future. The cleanup future
1014/// should be `await`ed only *after* dropping the `State` to check whether any
1015/// errors occured while dropping the `State`. This awkward API is a workaround
1016/// for the lack of `AsyncDrop` support in Rust.
1017pub async fn create_state(
1018    config: &Config,
1019) -> Result<(State, impl Future<Output = Result<(), anyhow::Error>>), anyhow::Error> {
1020    let seed = config
1021        .seed
1022        .clone()
1023        .unwrap_or_else(|| format!("{:010}", rand::random::<u32>()));
1024
1025    let (_tempfile, temp_path) = match &config.temp_dir {
1026        Some(temp_dir) => {
1027            fs::create_dir_all(temp_dir).context("creating temporary directory")?;
1028            (None, PathBuf::from(&temp_dir))
1029        }
1030        _ => {
1031            // Stash the tempfile object so that it does not go out of scope and delete
1032            // the tempdir prematurely
1033            let tempfile_handle = tempfile::tempdir().context("creating temporary directory")?;
1034            let temp_path = tempfile_handle.path().to_path_buf();
1035            (Some(tempfile_handle), temp_path)
1036        }
1037    };
1038
1039    let materialize_catalog_config = config.materialize_catalog_config.clone();
1040
1041    let materialize_url = util::postgres::config_url(&config.materialize_pgconfig)?;
1042    info!("Connecting to {}", materialize_url.as_str());
1043    let (pgclient, pgconn) = Retry::default()
1044        .max_duration(config.default_timeout)
1045        .retry_async_canceling(|_| async move {
1046            let mut pgconfig = config.materialize_pgconfig.clone();
1047            pgconfig.connect_timeout(config.default_timeout);
1048            let tls = make_tls(&pgconfig)?;
1049            pgconfig.connect(tls).await.map_err(|e| anyhow!(e))
1050        })
1051        .await?;
1052
1053    let pgconn_task =
1054        task::spawn(|| "pgconn_task", pgconn).map(|join| join.context("running SQL connection"));
1055
1056    let materialize_state =
1057        create_materialize_state(&config, materialize_catalog_config, pgclient).await?;
1058
1059    let schema_registry_url = config.schema_registry_url.to_owned();
1060
1061    let ccsr_client = {
1062        let mut ccsr_config = mz_ccsr::ClientConfig::new(schema_registry_url.clone());
1063
1064        if let Some(cert_path) = &config.cert_path {
1065            let cert = fs::read(cert_path).context("reading cert")?;
1066            let pass = config.cert_password.as_deref().unwrap_or("").to_owned();
1067            let ident = mz_ccsr::tls::Identity::from_pkcs12_der(cert, pass)
1068                .context("reading keystore file as pkcs12")?;
1069            ccsr_config = ccsr_config.identity(ident);
1070        }
1071
1072        if let Some(ccsr_username) = &config.ccsr_username {
1073            ccsr_config = ccsr_config.auth(ccsr_username.clone(), config.ccsr_password.clone());
1074        }
1075
1076        ccsr_config.build().context("Creating CCSR client")?
1077    };
1078
1079    let (kafka_addr, kafka_admin, kafka_admin_opts, kafka_producer, kafka_topics, kafka_config) = {
1080        use rdkafka::admin::{AdminClient, AdminOptions};
1081        use rdkafka::producer::FutureProducer;
1082
1083        let mut kafka_config = create_new_client_config_simple();
1084        kafka_config.set("bootstrap.servers", &config.kafka_addr);
1085        kafka_config.set("group.id", "materialize-testdrive");
1086        kafka_config.set("auto.offset.reset", "earliest");
1087        kafka_config.set("isolation.level", "read_committed");
1088        if let Some(cert_path) = &config.cert_path {
1089            kafka_config.set("security.protocol", "ssl");
1090            kafka_config.set("ssl.keystore.location", cert_path);
1091            if let Some(cert_password) = &config.cert_password {
1092                kafka_config.set("ssl.keystore.password", cert_password);
1093            }
1094        }
1095        kafka_config.set("message.max.bytes", "15728640");
1096
1097        for (key, value) in &config.kafka_opts {
1098            kafka_config.set(key, value);
1099        }
1100
1101        let admin: AdminClient<_> = kafka_config
1102            .create_with_context(MzClientContext::default())
1103            .with_context(|| format!("opening Kafka connection: {}", config.kafka_addr))?;
1104
1105        let admin_opts = AdminOptions::new().operation_timeout(Some(config.default_timeout));
1106
1107        let producer: FutureProducer<_> = kafka_config
1108            .create_with_context(MzClientContext::default())
1109            .with_context(|| format!("opening Kafka producer connection: {}", config.kafka_addr))?;
1110
1111        let topics = BTreeMap::new();
1112
1113        (
1114            config.kafka_addr.to_owned(),
1115            admin,
1116            admin_opts,
1117            producer,
1118            topics,
1119            kafka_config,
1120        )
1121    };
1122
1123    let mut state = State {
1124        config: config.clone(),
1125
1126        // === Testdrive state. ===
1127        arg_vars: config.arg_vars.clone(),
1128        cmd_vars: BTreeMap::new(),
1129        seed,
1130        temp_path,
1131        _tempfile,
1132        default_timeout: config.default_timeout,
1133        timeout: config.default_timeout,
1134        max_tries: config.default_max_tries,
1135        initial_backoff: config.initial_backoff,
1136        backoff_factor: config.backoff_factor,
1137        consistency_checks: config.consistency_checks,
1138        check_statement_logging: config.check_statement_logging,
1139        consistency_checks_adhoc_skip: false,
1140        regex: None,
1141        regex_replacement: set::DEFAULT_REGEX_REPLACEMENT.into(),
1142        rewrite_results: config.rewrite_results,
1143        error_line_count: 0,
1144        error_string: "".to_string(),
1145
1146        // === Materialize state. ===
1147        materialize: materialize_state,
1148
1149        // === Persist state. ===
1150        persist_consensus_url: config.persist_consensus_url.clone(),
1151        persist_blob_url: config.persist_blob_url.clone(),
1152        build_info: config.build_info,
1153        persist_clients: PersistClientCache::new(
1154            PersistConfig::new_default_configs(config.build_info, SYSTEM_TIME.clone()),
1155            &MetricsRegistry::new(),
1156            |_, _| PubSubClientConnection::noop(),
1157        ),
1158
1159        // === Confluent state. ===
1160        schema_registry_url,
1161        ccsr_client,
1162        kafka_addr,
1163        kafka_admin,
1164        kafka_admin_opts,
1165        kafka_config,
1166        kafka_default_partitions: config.kafka_default_partitions,
1167        kafka_producer,
1168        kafka_topics,
1169
1170        // === AWS state. ===
1171        aws_account: config.aws_account.clone(),
1172        aws_config: config.aws_config.clone(),
1173
1174        // === Database driver state. ===
1175        duckdb_clients: BTreeMap::new(),
1176        mysql_clients: BTreeMap::new(),
1177        postgres_clients: BTreeMap::new(),
1178        sql_server_clients: BTreeMap::new(),
1179
1180        // === Fivetran state. ===
1181        fivetran_destination_url: config.fivetran_destination_url.clone(),
1182        fivetran_destination_files_path: config.fivetran_destination_files_path.clone(),
1183
1184        rewrites: Vec::new(),
1185        rewrite_pos_start: 0,
1186        rewrite_pos_end: 0,
1187    };
1188    state.initialize_cmd_vars().await?;
1189    Ok((state, pgconn_task))
1190}
1191
1192async fn create_materialize_state(
1193    config: &&Config,
1194    materialize_catalog_config: Option<CatalogConfig>,
1195    pgclient: tokio_postgres::Client,
1196) -> Result<MaterializeState, anyhow::Error> {
1197    let materialize_url = util::postgres::config_url(&config.materialize_pgconfig)?;
1198    let materialize_internal_url =
1199        util::postgres::config_url(&config.materialize_internal_pgconfig)?;
1200
1201    for (key, value) in &config.materialize_params {
1202        // Session parameter values are raw SQL fragments from testdrive config.
1203        #[allow(clippy::disallowed_methods)]
1204        pgclient
1205            .batch_execute(&format!("SET {key} = {value}"))
1206            .await
1207            .context("setting session parameter")?;
1208    }
1209
1210    let materialize_user = config
1211        .materialize_pgconfig
1212        .get_user()
1213        .expect("testdrive URL must contain user")
1214        .to_string();
1215
1216    let materialize_sql_addr = format!(
1217        "{}:{}",
1218        materialize_url.host_str().unwrap(),
1219        materialize_url.port().unwrap()
1220    );
1221    let materialize_http_addr = format!(
1222        "{}:{}",
1223        materialize_url.host_str().unwrap(),
1224        config.materialize_http_port
1225    );
1226    let materialize_internal_sql_addr = format!(
1227        "{}:{}",
1228        materialize_internal_url.host_str().unwrap(),
1229        materialize_internal_url.port().unwrap()
1230    );
1231    let materialize_password_sql_addr = format!(
1232        "{}:{}",
1233        materialize_url.host_str().unwrap(),
1234        config.materialize_password_sql_port
1235    );
1236    let materialize_sasl_sql_addr = format!(
1237        "{}:{}",
1238        materialize_url.host_str().unwrap(),
1239        config.materialize_sasl_sql_port
1240    );
1241    let materialize_internal_http_addr = format!(
1242        "{}:{}",
1243        materialize_internal_url.host_str().unwrap(),
1244        config.materialize_internal_http_port
1245    );
1246    let environment_id = pg_query_one(&pgclient, pg_sql!("SELECT mz_environment_id()"), &[])
1247        .await?
1248        .get::<_, String>(0)
1249        .parse()
1250        .context("parsing environment ID")?;
1251
1252    let bootstrap_args = BootstrapArgs {
1253        cluster_replica_size_map: config.materialize_cluster_replica_sizes.clone(),
1254        default_cluster_replica_size: "ABC".to_string(),
1255        default_cluster_replication_factor: 1,
1256        bootstrap_role: None,
1257    };
1258
1259    let materialize_state = MaterializeState {
1260        catalog_config: materialize_catalog_config,
1261        sql_addr: materialize_sql_addr,
1262        use_https: config.materialize_use_https,
1263        http_addr: materialize_http_addr,
1264        internal_sql_addr: materialize_internal_sql_addr,
1265        internal_http_addr: materialize_internal_http_addr,
1266        password_sql_addr: materialize_password_sql_addr,
1267        sasl_sql_addr: materialize_sasl_sql_addr,
1268        user: materialize_user,
1269        pgclient,
1270        environment_id,
1271        bootstrap_args,
1272    };
1273
1274    Ok(materialize_state)
1275}