// mz_testdrive/action.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeMap;
11use std::future::Future;
12
13use std::path::PathBuf;
14use std::sync::LazyLock;
15use std::time::Duration;
16use std::{env, fs};
17
18use anyhow::{Context, anyhow, bail};
19use async_trait::async_trait;
20use aws_credential_types::provider::ProvideCredentials;
21use aws_types::SdkConfig;
22use futures::future::FutureExt;
23use itertools::Itertools;
24use mz_adapter::catalog::{Catalog, ConnCatalog};
25use mz_adapter::session::Session;
26use mz_build_info::BuildInfo;
27use mz_catalog::config::ClusterReplicaSizeMap;
28use mz_catalog::durable::BootstrapArgs;
29use mz_ccsr::SubjectVersion;
30use mz_kafka_util::client::{MzClientContext, create_new_client_config_simple};
31use mz_ore::error::ErrorExt;
32use mz_ore::metrics::MetricsRegistry;
33use mz_ore::now::SYSTEM_TIME;
34use mz_ore::retry::Retry;
35use mz_ore::task;
36use mz_ore::url::SensitiveUrl;
37use mz_persist_client::cache::PersistClientCache;
38use mz_persist_client::cfg::PersistConfig;
39use mz_persist_client::rpc::PubSubClientConnection;
40use mz_persist_client::{PersistClient, PersistLocation};
41use mz_sql::catalog::EnvironmentId;
42use mz_tls_util::make_tls;
43use rdkafka::ClientConfig;
44use rdkafka::producer::Producer;
45use regex::{Captures, Regex};
46use semver::Version;
47use tokio_postgres::error::{DbError, SqlState};
48use tracing::info;
49use url::Url;
50
51use crate::error::PosError;
52use crate::parser::{
53    Command, PosCommand, SqlExpectedError, SqlOutput, VersionConstraint, validate_ident,
54};
55use crate::util;
56use crate::util::postgres::postgres_client;
57
58pub mod consistency;
59
60mod duckdb;
61mod file;
62mod fivetran;
63mod http;
64mod kafka;
65mod mysql;
66mod nop;
67mod persist;
68mod postgres;
69mod protobuf;
70mod psql;
71mod s3;
72mod schema_registry;
73mod set;
74mod skip_end;
75mod skip_if;
76mod sleep;
77mod sql;
78mod sql_server;
79mod version_check;
80mod webhook;
81
/// User-settable configuration parameters.
#[derive(Debug, Clone)]
pub struct Config {
    // === Testdrive options. ===
    /// Variables to make available to the testdrive script.
    ///
    /// The value of each entry will be made available to the script in a
    /// variable named `arg.KEY`.
    pub arg_vars: BTreeMap<String, String>,
    /// A random number to distinguish each run of a testdrive script.
    pub seed: Option<String>,
    /// Whether to reset Materialize state before executing each script and
    /// to clean up AWS state after each script.
    pub reset: bool,
    /// Force the use of the specified temporary directory.
    ///
    /// If unspecified, testdrive creates a temporary directory with a random
    /// name.
    pub temp_dir: Option<String>,
    /// Source string to print out on errors.
    pub source: Option<String>,
    /// The default timeout for cancellable operations.
    pub default_timeout: Duration,
    /// The default number of tries for retriable operations.
    pub default_max_tries: usize,
    /// The initial backoff interval for retry operations.
    ///
    /// Set to 0 to retry immediately on failure.
    pub initial_backoff: Duration,
    /// Backoff factor to use for retry operations.
    ///
    /// Set to 1 to retry at a steady pace.
    pub backoff_factor: f64,
    /// Should we skip coordinator and catalog consistency checks.
    pub consistency_checks: consistency::Level,
    /// Whether to run statement logging consistency checks (adds a few seconds at the end of every
    /// test file).
    pub check_statement_logging: bool,
    /// Whether to automatically rewrite wrong results instead of failing.
    pub rewrite_results: bool,

    // === Materialize options. ===
    /// The pgwire connection parameters for the Materialize instance that
    /// testdrive will connect to.
    pub materialize_pgconfig: tokio_postgres::Config,
    /// The internal pgwire connection parameters for the Materialize instance that
    /// testdrive will connect to.
    pub materialize_internal_pgconfig: tokio_postgres::Config,
    /// Whether to use HTTPS instead of plain HTTP for the HTTP(S) connections.
    pub materialize_use_https: bool,
    /// The port for the public endpoints of the materialize instance that
    /// testdrive will connect to via HTTP.
    pub materialize_http_port: u16,
    /// The port for the internal endpoints of the materialize instance that
    /// testdrive will connect to via HTTP.
    pub materialize_internal_http_port: u16,
    /// The port for the password endpoints of the materialize instance that
    /// testdrive will connect to via SQL.
    pub materialize_password_sql_port: u16,
    /// The port for the SASL endpoints of the materialize instance that
    /// testdrive will connect to via SQL.
    pub materialize_sasl_sql_port: u16,
    /// Session parameters to set after connecting to materialize.
    pub materialize_params: Vec<(String, String)>,
    /// An optional catalog configuration.
    pub materialize_catalog_config: Option<CatalogConfig>,
    /// Build information
    pub build_info: &'static BuildInfo,
    /// Configured cluster replica sizes
    pub materialize_cluster_replica_sizes: ClusterReplicaSizeMap,

    // === Persist options. ===
    /// Handle to the persist consensus system.
    pub persist_consensus_url: Option<SensitiveUrl>,
    /// Handle to the persist blob storage.
    pub persist_blob_url: Option<SensitiveUrl>,

    // === Confluent options. ===
    /// The address of the Kafka broker that testdrive will interact with.
    pub kafka_addr: String,
    /// Default number of partitions to use for topics
    pub kafka_default_partitions: usize,
    /// Arbitrary rdkafka options for testdrive to use when connecting to the
    /// Kafka broker.
    pub kafka_opts: Vec<(String, String)>,
    /// The URL of the schema registry that testdrive will connect to.
    pub schema_registry_url: Url,
    /// An optional path to a TLS certificate that testdrive will present when
    /// performing client authentication.
    ///
    /// The keystore must be in the PKCS#12 format.
    pub cert_path: Option<String>,
    /// An optional password for the TLS certificate.
    pub cert_password: Option<String>,
    /// An optional username for basic authentication with the Confluent Schema
    /// Registry.
    pub ccsr_username: Option<String>,
    /// An optional password for basic authentication with the Confluent Schema
    /// Registry.
    pub ccsr_password: Option<String>,

    // === AWS options. ===
    /// The configuration to use when connecting to AWS.
    pub aws_config: SdkConfig,
    /// The ID of the AWS account that `aws_config` configures.
    pub aws_account: String,

    // === Fivetran options. ===
    /// Address of the Fivetran Destination that is currently running.
    pub fivetran_destination_url: String,
    /// Directory that is accessible to the Fivetran Destination.
    pub fivetran_destination_files_path: String,
}
195
/// Connection endpoints and session information for the Materialize instance
/// under test.
pub struct MaterializeState {
    /// Persist-backed catalog configuration, if one was supplied.
    catalog_config: Option<CatalogConfig>,

    /// External SQL (pgwire) address.
    sql_addr: String,
    /// Whether HTTP(S) connections use HTTPS.
    use_https: bool,
    /// External HTTP address.
    http_addr: String,
    /// Internal SQL address (used e.g. by `reset_materialize` to connect as
    /// `mz_system`).
    internal_sql_addr: String,
    /// Internal HTTP address.
    internal_http_addr: String,
    /// SQL address of the password-authenticated endpoint.
    password_sql_addr: String,
    /// SQL address of the SASL-authenticated endpoint.
    sasl_sql_addr: String,
    /// Name of the user testdrive connects as (exposed as
    /// `testdrive.materialize-user`).
    user: String,
    /// Open pgwire client. NOTE(review): which address this client targets is
    /// not visible in this file — confirm against the constructor.
    pgclient: tokio_postgres::Client,
    /// Environment ID of the Materialize instance.
    environment_id: EnvironmentId,
    /// Catalog bootstrap arguments.
    bootstrap_args: BootstrapArgs,
}
211
/// All state carried through the execution of a testdrive script: variable
/// bindings, connection handles to every external system, and bookkeeping for
/// retries, consistency checks, and result rewriting.
pub struct State {
    // The Config that this `State` was originally created from.
    pub config: Config,

    // === Testdrive state. ===
    /// User-supplied variables, exposed to scripts as `arg.KEY`.
    arg_vars: BTreeMap<String, String>,
    /// Fully-resolved variables available for substitution in commands
    /// (populated by `initialize_cmd_vars`).
    cmd_vars: BTreeMap<String, String>,
    /// Random per-run seed (exposed as `testdrive.seed`).
    seed: String,
    /// Temporary directory path (exposed as `testdrive.temp-dir`).
    temp_path: PathBuf,
    /// Keeps the temporary directory alive; the directory is removed when
    /// this is dropped.
    _tempfile: Option<tempfile::TempDir>,
    /// The default timeout for cancellable operations.
    default_timeout: Duration,
    /// The currently active timeout. NOTE(review): presumably adjustable
    /// per-script; confirm against the `set` action.
    timeout: Duration,
    /// Maximum number of tries for retriable operations.
    max_tries: usize,
    /// Initial backoff interval for retries.
    initial_backoff: Duration,
    /// Multiplier applied to the backoff between retries.
    backoff_factor: f64,
    /// Configured level of consistency checking.
    consistency_checks: consistency::Level,
    /// Whether to run statement-logging consistency checks.
    check_statement_logging: bool,
    /// Per-file toggle to skip consistency checks; reset by
    /// `clear_skip_consistency_checks`.
    consistency_checks_adhoc_skip: bool,
    /// Optional regex applied to results. NOTE(review): exact application
    /// site is outside this view — confirm.
    regex: Option<Regex>,
    /// Replacement text used with `regex`.
    regex_replacement: String,
    /// NOTE(review): error formatting state; semantics not visible here.
    error_line_count: usize,
    error_string: String,

    // === Materialize state. ===
    materialize: MaterializeState,

    // === Persist state. ===
    /// Handle to the persist consensus system.
    persist_consensus_url: Option<SensitiveUrl>,
    /// Handle to the persist blob storage.
    persist_blob_url: Option<SensitiveUrl>,
    /// Build information.
    build_info: &'static BuildInfo,
    /// Cache of persist clients, used by `with_catalog_copy`.
    persist_clients: PersistClientCache,

    // === Confluent state. ===
    /// URL of the schema registry (exposed as `testdrive.schema-registry-url`).
    schema_registry_url: Url,
    /// Client for the Confluent Schema Registry.
    ccsr_client: mz_ccsr::Client,
    /// Kafka broker address (exposed as `testdrive.kafka-addr`).
    kafka_addr: String,
    /// Admin client used e.g. to delete topics in `reset_kafka`.
    kafka_admin: rdkafka::admin::AdminClient<MzClientContext>,
    kafka_admin_opts: rdkafka::admin::AdminOptions,
    kafka_config: ClientConfig,
    /// Default partition count for newly created topics.
    kafka_default_partitions: usize,
    kafka_producer: rdkafka::producer::FutureProducer<MzClientContext>,
    /// NOTE(review): presumably topic name -> partition count; confirm.
    kafka_topics: BTreeMap<String, usize>,

    // === AWS state. ===
    /// AWS account ID (exposed as `testdrive.aws-account`).
    aws_account: String,
    aws_config: SdkConfig,

    // === Database driver state. ===
    /// Named DuckDB connections, shared behind a mutex.
    pub duckdb_clients: BTreeMap<String, std::sync::Arc<std::sync::Mutex<::duckdb::Connection>>>,
    mysql_clients: BTreeMap<String, mysql_async::Conn>,
    postgres_clients: BTreeMap<String, tokio_postgres::Client>,
    sql_server_clients: BTreeMap<String, mz_sql_server_util::Client>,

    // === Fivetran state. ===
    fivetran_destination_url: String,
    fivetran_destination_files_path: String,

    // === Rewrite state. ===
    /// Whether wrong results are rewritten in place instead of failing.
    rewrite_results: bool,
    /// Current file, results are replaced inline
    pub rewrites: Vec<Rewrite>,
    /// Start position of currently expected result
    pub rewrite_pos_start: usize,
    /// End position of currently expected result
    pub rewrite_pos_end: usize,
}
278
/// A pending in-place replacement of an expected result in the current file.
pub struct Rewrite {
    /// Replacement text.
    pub content: String,
    /// Start position of the span to replace. NOTE(review): presumably a byte
    /// offset into the file, matching `State::rewrite_pos_start` — confirm.
    pub start: usize,
    /// End position of the span to replace.
    pub end: usize,
}
284
285impl State {
286    pub async fn initialize_cmd_vars(&mut self) -> Result<(), anyhow::Error> {
287        self.cmd_vars
288            .insert("testdrive.kafka-addr".into(), self.kafka_addr.clone());
289        self.cmd_vars.insert(
290            "testdrive.schema-registry-url".into(),
291            self.schema_registry_url.to_string(),
292        );
293        self.cmd_vars
294            .insert("testdrive.seed".into(), self.seed.clone());
295        self.cmd_vars.insert(
296            "testdrive.temp-dir".into(),
297            self.temp_path.display().to_string(),
298        );
299        self.cmd_vars
300            .insert("testdrive.aws-region".into(), self.aws_region().into());
301        self.cmd_vars
302            .insert("testdrive.aws-endpoint".into(), self.aws_endpoint().into());
303        self.cmd_vars
304            .insert("testdrive.aws-account".into(), self.aws_account.clone());
305        {
306            let aws_credentials = self
307                .aws_config
308                .credentials_provider()
309                .ok_or_else(|| anyhow!("no AWS credentials provider configured"))?
310                .provide_credentials()
311                .await
312                .context("fetching AWS credentials")?;
313            self.cmd_vars.insert(
314                "testdrive.aws-access-key-id".into(),
315                aws_credentials.access_key_id().to_owned(),
316            );
317            self.cmd_vars.insert(
318                "testdrive.aws-secret-access-key".into(),
319                aws_credentials.secret_access_key().to_owned(),
320            );
321            self.cmd_vars.insert(
322                "testdrive.aws-token".into(),
323                aws_credentials
324                    .session_token()
325                    .map(|token| token.to_owned())
326                    .unwrap_or_else(String::new),
327            );
328        }
329        self.cmd_vars.insert(
330            "testdrive.materialize-environment-id".into(),
331            self.materialize.environment_id.to_string(),
332        );
333        self.cmd_vars.insert(
334            "testdrive.materialize-sql-addr".into(),
335            self.materialize.sql_addr.clone(),
336        );
337        self.cmd_vars.insert(
338            "testdrive.materialize-internal-sql-addr".into(),
339            self.materialize.internal_sql_addr.clone(),
340        );
341        self.cmd_vars.insert(
342            "testdrive.materialize-password-sql-addr".into(),
343            self.materialize.password_sql_addr.clone(),
344        );
345        self.cmd_vars.insert(
346            "testdrive.materialize-sasl-sql-addr".into(),
347            self.materialize.sasl_sql_addr.clone(),
348        );
349        self.cmd_vars.insert(
350            "testdrive.materialize-user".into(),
351            self.materialize.user.clone(),
352        );
353        self.cmd_vars.insert(
354            "testdrive.fivetran-destination-url".into(),
355            self.fivetran_destination_url.clone(),
356        );
357        self.cmd_vars.insert(
358            "testdrive.fivetran-destination-files-path".into(),
359            self.fivetran_destination_files_path.clone(),
360        );
361
362        for (key, value) in env::vars() {
363            self.cmd_vars.insert(format!("env.{}", key), value);
364        }
365
366        for (key, value) in &self.arg_vars {
367            validate_ident(key)?;
368            self.cmd_vars
369                .insert(format!("arg.{}", key), value.to_string());
370        }
371
372        Ok(())
373    }
374    /// Makes of copy of the durable catalog and runs a function on its
375    /// state. Returns `None` if there's no catalog information in the State.
376    pub async fn with_catalog_copy<F, T>(
377        &self,
378        system_parameter_defaults: BTreeMap<String, String>,
379        build_info: &'static BuildInfo,
380        bootstrap_args: &BootstrapArgs,
381        enable_expression_cache_override: Option<bool>,
382        f: F,
383    ) -> Result<Option<T>, anyhow::Error>
384    where
385        F: FnOnce(ConnCatalog) -> T,
386    {
387        async fn persist_client(
388            persist_consensus_url: SensitiveUrl,
389            persist_blob_url: SensitiveUrl,
390            persist_clients: &PersistClientCache,
391        ) -> Result<PersistClient, anyhow::Error> {
392            let persist_location = PersistLocation {
393                blob_uri: persist_blob_url,
394                consensus_uri: persist_consensus_url,
395            };
396            Ok(persist_clients.open(persist_location).await?)
397        }
398
399        if let Some(CatalogConfig {
400            persist_consensus_url,
401            persist_blob_url,
402        }) = &self.materialize.catalog_config
403        {
404            let persist_client = persist_client(
405                persist_consensus_url.clone(),
406                persist_blob_url.clone(),
407                &self.persist_clients,
408            )
409            .await?;
410            let catalog = Catalog::open_debug_read_only_persist_catalog_config(
411                persist_client,
412                SYSTEM_TIME.clone(),
413                self.materialize.environment_id.clone(),
414                system_parameter_defaults,
415                build_info,
416                bootstrap_args,
417                enable_expression_cache_override,
418            )
419            .await?;
420            let res = f(catalog.for_session(&Session::dummy()));
421            catalog.expire().await;
422            Ok(Some(res))
423        } else {
424            Ok(None)
425        }
426    }
427
428    pub fn aws_endpoint(&self) -> &str {
429        self.aws_config.endpoint_url().unwrap_or("")
430    }
431
432    pub fn aws_region(&self) -> &str {
433        self.aws_config.region().map(|r| r.as_ref()).unwrap_or("")
434    }
435
436    /// Resets the adhoc skip consistency check that users can toggle per-file, and returns whether
437    /// the consistency checks should be skipped for this current run.
438    pub fn clear_skip_consistency_checks(&mut self) -> bool {
439        std::mem::replace(&mut self.consistency_checks_adhoc_skip, false)
440    }
441
    /// Drops all user-created state (databases, clusters, roles) so the next
    /// testdrive script starts from a clean slate, then re-creates the
    /// default `materialize` database and re-issues the initial grants.
    ///
    /// Objects whose names start with `testdrive_no_reset_` are preserved.
    pub async fn reset_materialize(&self) -> Result<(), anyhow::Error> {
        // Connect as `mz_system` over the internal SQL address so that
        // `ALTER SYSTEM` and cross-owner drops are permitted.
        let (inner_client, _) = postgres_client(
            &format!(
                "postgres://mz_system:materialize@{}",
                self.materialize.internal_sql_addr
            ),
            self.default_timeout,
        )
        .await?;

        // Numeric version; used below to pick the default cluster name.
        let version = inner_client
            .query_one("SELECT mz_version_num()", &[])
            .await
            .context("getting version of materialize")
            .map(|row| row.get::<_, i32>(0))?;

        // Semantic version; used below to pick the name of the
        // unsafe-functions system parameter.
        let semver = inner_client
            .query_one("SELECT right(split_part(mz_version(), ' ', 1), -1)", &[])
            .await
            .context("getting semver of materialize")
            .map(|row| row.get::<_, String>(0))?
            .parse::<semver::Version>()
            .context("parsing semver of materialize")?;

        inner_client
            .batch_execute("ALTER SYSTEM RESET ALL")
            .await
            .context("resetting materialize state: ALTER SYSTEM RESET ALL")?;

        // Dangerous functions are useful for tests so we enable it for all tests.
        {
            // The system parameter was renamed at v0.128.0; pick the name
            // matching the server we are talking to.
            let rename_version = Version::parse("0.128.0-dev.1").expect("known to be valid");
            let enable_unsafe_functions = if semver >= rename_version {
                "unsafe_enable_unsafe_functions"
            } else {
                "enable_unsafe_functions"
            };
            let res = inner_client
                .batch_execute(&format!("ALTER SYSTEM SET {enable_unsafe_functions} = on"))
                .await
                .context("enabling dangerous functions");
            if let Err(e) = res {
                match e.root_cause().downcast_ref::<DbError>() {
                    // A server in safe mode refuses the change; log a warning
                    // instead of failing the whole reset.
                    Some(e) if *e.code() == SqlState::CANT_CHANGE_RUNTIME_PARAM => {
                        info!(
                            "can't enable unsafe functions because the server is safe mode; \
                             testdrive scripts will fail if they use unsafe functions",
                        );
                    }
                    _ => return Err(e),
                }
            }
        }

        // Drop every database except those explicitly opted out of reset.
        for row in inner_client
            .query("SHOW DATABASES", &[])
            .await
            .context("resetting materialize state: SHOW DATABASES")?
        {
            let db_name: String = row.get(0);
            if db_name.starts_with("testdrive_no_reset_") {
                continue;
            }
            let query = format!(
                "DROP DATABASE {}",
                postgres_protocol::escape::escape_identifier(&db_name)
            );
            sql::print_query(&query, None);
            inner_client.batch_execute(&query).await.context(format!(
                "resetting materialize state: DROP DATABASE {}",
                db_name,
            ))?;
        }

        // Get all user clusters not running any objects owned by users
        let inactive_user_clusters = "
        WITH
            active_user_clusters AS
            (
                SELECT DISTINCT cluster_id, object_id
                FROM
                    (
                        SELECT cluster_id, id FROM mz_catalog.mz_sources
                        UNION ALL SELECT cluster_id, id FROM mz_catalog.mz_sinks
                        UNION ALL
                            SELECT cluster_id, id
                            FROM mz_catalog.mz_materialized_views
                        UNION ALL
                            SELECT cluster_id, id FROM mz_catalog.mz_indexes
                        UNION ALL
                            SELECT cluster_id, id
                            FROM mz_internal.mz_subscriptions
                    )
                    AS t (cluster_id, object_id)
                WHERE cluster_id IS NOT NULL AND object_id LIKE 'u%'
            )
        SELECT name
        FROM mz_catalog.mz_clusters
        WHERE
            id NOT IN ( SELECT cluster_id FROM active_user_clusters ) AND id LIKE 'u%'
                AND
            owner_id LIKE 'u%';";

        let inactive_clusters = inner_client
            .query(inactive_user_clusters, &[])
            .await
            .context("resetting materialize state: inactive_user_clusters")?;

        if !inactive_clusters.is_empty() {
            println!("cleaning up user clusters from previous tests...")
        }

        // Drop each inactive user cluster, again honoring the no-reset prefix.
        for cluster_name in inactive_clusters {
            let cluster_name: String = cluster_name.get(0);
            if cluster_name.starts_with("testdrive_no_reset_") {
                continue;
            }
            let query = format!(
                "DROP CLUSTER {}",
                postgres_protocol::escape::escape_identifier(&cluster_name)
            );
            sql::print_query(&query, None);
            inner_client.batch_execute(&query).await.context(format!(
                "resetting materialize state: DROP CLUSTER {}",
                cluster_name,
            ))?;
        }

        inner_client
            .batch_execute("CREATE DATABASE materialize")
            .await
            .context("resetting materialize state: CREATE DATABASE materialize")?;

        // Attempt to remove all users but the current user. Old versions of
        // Materialize did not support roles, so this degrades gracefully if
        // mz_roles does not exist.
        if let Ok(rows) = inner_client.query("SELECT name FROM mz_roles", &[]).await {
            for row in rows {
                let role_name: String = row.get(0);
                if role_name == self.materialize.user || role_name.starts_with("mz_") {
                    continue;
                }
                let query = format!(
                    "DROP ROLE {}",
                    postgres_protocol::escape::escape_identifier(&role_name)
                );
                sql::print_query(&query, None);
                inner_client.batch_execute(&query).await.context(format!(
                    "resetting materialize state: DROP ROLE {}",
                    role_name,
                ))?;
            }
        }

        // Alter materialize user with all system privileges.
        inner_client
            .batch_execute(&format!(
                "GRANT ALL PRIVILEGES ON SYSTEM TO {}",
                self.materialize.user
            ))
            .await?;

        // Grant initial privileges.
        inner_client
            .batch_execute("GRANT USAGE ON DATABASE materialize TO PUBLIC")
            .await?;
        inner_client
            .batch_execute(&format!(
                "GRANT ALL PRIVILEGES ON DATABASE materialize TO {}",
                self.materialize.user
            ))
            .await?;
        inner_client
            .batch_execute(&format!(
                "GRANT ALL PRIVILEGES ON SCHEMA materialize.public TO {}",
                self.materialize.user
            ))
            .await?;

        // The default cluster was renamed from `default` to `quickstart` at
        // version number 8200.
        let cluster = match version {
            ..=8199 => "default",
            8200.. => "quickstart",
        };
        inner_client
            .batch_execute(&format!("GRANT USAGE ON CLUSTER {cluster} TO PUBLIC"))
            .await?;
        inner_client
            .batch_execute(&format!(
                "GRANT ALL PRIVILEGES ON CLUSTER {cluster} TO {}",
                self.materialize.user
            ))
            .await?;

        Ok(())
    }
637
    /// Delete Kafka topics + CCSR subjects that were created in this run
    ///
    /// Only objects prefixed with `testdrive-` are deleted. Failures are
    /// accumulated and reported together rather than aborting at the first
    /// error.
    pub async fn reset_kafka(&self) -> Result<(), anyhow::Error> {
        use rdkafka::types::RDKafkaErrorCode;
        // Collect all failures so one bad topic does not mask the rest.
        let mut errors: Vec<anyhow::Error> = Vec::new();

        // Fetch cluster metadata, waiting at least one second.
        let metadata = self.kafka_producer.client().fetch_metadata(
            None,
            Some(std::cmp::max(Duration::from_secs(1), self.default_timeout)),
        )?;

        // Only topics created by testdrive are candidates for deletion.
        let testdrive_topics: Vec<_> = metadata
            .topics()
            .iter()
            .filter_map(|t| {
                if t.name().starts_with("testdrive-") {
                    Some(t.name())
                } else {
                    None
                }
            })
            .collect();

        if !testdrive_topics.is_empty() {
            match self
                .kafka_admin
                .delete_topics(&testdrive_topics, &self.kafka_admin_opts)
                .await
            {
                Ok(res) => {
                    // The admin API should return exactly one result per
                    // requested topic.
                    if res.len() != testdrive_topics.len() {
                        errors.push(anyhow!(
                            "kafka topic deletion returned {} results, but exactly {} expected",
                            res.len(),
                            testdrive_topics.len()
                        ));
                    }
                    for (res, topic) in res.iter().zip_eq(testdrive_topics.iter()) {
                        match res {
                            // A topic that is already gone is not an error.
                            Ok(_) | Err((_, RDKafkaErrorCode::UnknownTopicOrPartition)) => (),
                            Err((_, err)) => {
                                errors.push(anyhow!("unable to delete {}: {}", topic, err));
                            }
                        }
                    }
                }
                Err(e) => {
                    errors.push(e.into());
                }
            };
        }

        // Also clean up schema registry subjects and fold their errors in.
        let schema_registry_errors = self.reset_schema_registry().await;

        errors.extend(schema_registry_errors);
        if errors.is_empty() {
            Ok(())
        } else {
            bail!(
                "deleting Kafka topics: {} errors: {}",
                errors.len(),
                errors
                    .into_iter()
                    .map(|e| e.to_string_with_causes())
                    .join("\n")
            );
        }
    }
705
    /// Deletes all `testdrive-` prefixed subjects from the schema registry,
    /// ordering deletions by the subjects' schema-reference dependencies.
    /// Returns every error encountered rather than stopping at the first.
    #[allow(clippy::disallowed_types)]
    async fn reset_schema_registry(&self) -> Vec<anyhow::Error> {
        use std::collections::HashMap;

        let mut errors = Vec::new();
        match self
            .ccsr_client
            .list_subjects()
            .await
            .context("listing schema registry subjects")
        {
            Ok(subjects) => {
                let testdrive_subjects: Vec<_> = subjects
                    .into_iter()
                    .filter(|s| s.starts_with("testdrive-"))
                    .collect();

                // Build the dependency graphs: subject -> list of subjects it references
                let mut graphs: HashMap<SubjectVersion, Vec<SubjectVersion>> = HashMap::new();

                for subject in &testdrive_subjects {
                    match self.ccsr_client.get_subject_with_references(subject).await {
                        Ok((subj, refs)) => {
                            // Filter to only testdrive subjects
                            let refs: Vec<_> = refs
                                .into_iter()
                                .filter(|r| r.subject.starts_with("testdrive-"))
                                .collect();
                            graphs.insert(
                                SubjectVersion {
                                    subject: subj.name,
                                    version: subj.version,
                                },
                                refs,
                            );
                        }
                        Err(mz_ccsr::GetBySubjectError::SubjectNotFound) => {
                            // Subject was already deleted, skip it
                        }
                        Err(e) => {
                            errors.push(anyhow::anyhow!(
                                "failed to get references for subject {}: {}",
                                subject,
                                e
                            ));
                        }
                    }
                }

                // Get topological ordering (0 = root/no dependencies, higher = more dependents)
                // We need to delete in reverse order (highest first = subjects that depend on others)
                let subjects_to_delete: Vec<_> = match mz_ccsr::topological_sort(&graphs) {
                    Ok(ordered) => {
                        let mut subjects: Vec<_> = ordered.into_iter().collect();
                        // NOTE(review): this sorts ascending by order value,
                        // while the comment above says highest-first deletion
                        // is needed — confirm the ordering convention of
                        // `topological_sort` (it may already return reversed
                        // order values).
                        subjects.sort_by(|a, b| a.1.cmp(&b.1));
                        subjects.into_iter().map(|(s, _)| s.clone()).collect()
                    }
                    Err(_) => {
                        tracing::info!("Cycle detected, attempting to delete anyway");
                        // Cycle detected or other error - fall back to deleting in any order
                        graphs.into_keys().collect()
                    }
                };

                for subject in subjects_to_delete {
                    match self.ccsr_client.delete_subject(&subject.subject).await {
                        // A subject that is already gone is not an error.
                        Ok(()) | Err(mz_ccsr::DeleteError::SubjectNotFound) => (),
                        Err(e) => errors.push(e.into()),
                    }
                }
            }
            Err(e) => {
                errors.push(e);
            }
        }
        errors
    }
783}
784
/// Configuration for the Catalog.
///
/// Both URLs point at the persist deployment backing the environment's
/// durable catalog; see [`State::with_catalog_copy`].
#[derive(Debug, Clone)]
pub struct CatalogConfig {
    /// Handle to the persist consensus system.
    pub persist_consensus_url: SensitiveUrl,
    /// Handle to the persist blob storage.
    pub persist_blob_url: SensitiveUrl,
}
793
/// How script execution should proceed after running a command.
pub enum ControlFlow {
    /// Continue with the next command.
    Continue,
    /// Start skipping commands. NOTE(review): variant semantics inferred from
    /// the name and the `skip_if`/`skip_end` actions — confirm at the use
    /// site.
    SkipBegin,
    /// Stop skipping commands.
    SkipEnd,
}
799
/// A command that can be executed against the current [`State`].
#[async_trait]
pub(crate) trait Run {
    /// Executes the command, returning how script execution should proceed,
    /// or a position-annotated error on failure.
    async fn run(self, state: &mut State) -> Result<ControlFlow, PosError>;
}
804
#[async_trait]
impl Run for PosCommand {
    async fn run(self, state: &mut State) -> Result<ControlFlow, PosError> {
        // If the command carries a version constraint that excludes the
        // current server version, treat the command as a no-op and move on;
        // see `version_check::run_version_check` for the check itself.
        macro_rules! handle_version {
            ($version_constraint:expr) => {
                match $version_constraint {
                    Some(VersionConstraint { min, max }) => {
                        match version_check::run_version_check(min, max, state).await {
                            Ok(true) => return Ok(ControlFlow::Continue),
                            Ok(false) => {}
                            Err(err) => return Err(PosError::new(err, self.pos)),
                        }
                    }
                    None => {}
                }
            };
        }

        // Attach this command's source position to any error it produces.
        let wrap_err = |e| PosError::new(e, self.pos);
        // Substitute variables at startup except for the command-specific ones
        // Those will be substituted at runtime
        let ignore_prefix = match &self.command {
            Command::Builtin(builtin, _) => Some(builtin.name.clone()),
            _ => None,
        };
        // Plain substitution for ordinary text.
        let subst = |msg: &str, vars: &BTreeMap<String, String>| {
            substitute_vars(msg, vars, &ignore_prefix, false).map_err(wrap_err)
        };
        // Regex-escaping substitution for text that will later be compiled
        // into a regular expression.
        let subst_re = |msg: &str, vars: &BTreeMap<String, String>| {
            substitute_vars(msg, vars, &ignore_prefix, true).map_err(wrap_err)
        };

        let r = match self.command {
            // Builtin actions: substitute variables into every argument and
            // input line, then dispatch on the builtin's name.
            Command::Builtin(mut builtin, version_constraint) => {
                handle_version!(version_constraint);
                for val in builtin.args.values_mut() {
                    *val = subst(val, &state.cmd_vars)?;
                }
                for line in &mut builtin.input {
                    *line = subst(line, &state.cmd_vars)?;
                }
                match builtin.name.as_ref() {
                    "check-consistency" => consistency::run_consistency_checks(state).await,
                    "skip-consistency-checks" => {
                        consistency::skip_consistency_checks(builtin, state)
                    }
                    "check-shard-tombstone" => {
                        consistency::run_check_shard_tombstone(builtin, state).await
                    }
                    "duckdb-execute" => duckdb::run_execute(builtin, state).await,
                    "duckdb-query" => duckdb::run_query(builtin, state).await,
                    "fivetran-destination" => {
                        fivetran::run_destination_command(builtin, state).await
                    }
                    "file-append" => file::run_append(builtin, state).await,
                    "file-delete" => file::run_delete(builtin, state).await,
                    "http-request" => http::run_request(builtin, state).await,
                    "kafka-add-partitions" => kafka::run_add_partitions(builtin, state).await,
                    "kafka-create-topic" => kafka::run_create_topic(builtin, state).await,
                    "kafka-wait-topic" => kafka::run_wait_topic(builtin, state).await,
                    "kafka-delete-records" => kafka::run_delete_records(builtin, state).await,
                    "kafka-delete-topic-flaky" => kafka::run_delete_topic(builtin, state).await,
                    "kafka-ingest" => kafka::run_ingest(builtin, state).await,
                    "kafka-verify-data" => kafka::run_verify_data(builtin, state).await,
                    "kafka-verify-commit" => kafka::run_verify_commit(builtin, state).await,
                    "kafka-verify-topic" => kafka::run_verify_topic(builtin, state).await,
                    "mysql-connect" => mysql::run_connect(builtin, state).await,
                    "mysql-execute" => mysql::run_execute(builtin, state).await,
                    "nop" => nop::run_nop(),
                    "postgres-connect" => postgres::run_connect(builtin, state).await,
                    "postgres-execute" => postgres::run_execute(builtin, state).await,
                    "postgres-verify-slot" => postgres::run_verify_slot(builtin, state).await,
                    "protobuf-compile-descriptors" => {
                        protobuf::run_compile_descriptors(builtin, state).await
                    }
                    "psql-execute" => psql::run_execute(builtin, state).await,
                    "s3-verify-data" => s3::run_verify_data(builtin, state).await,
                    "s3-verify-keys" => s3::run_verify_keys(builtin, state).await,
                    "s3-file-upload" => s3::run_upload(builtin, state).await,
                    "s3-set-presigned-url" => s3::run_set_presigned_url(builtin, state).await,
                    "s3-upload-parquet-types" => s3::run_upload_parquet_types(builtin, state).await,
                    "schema-registry-publish" => schema_registry::run_publish(builtin, state).await,
                    "schema-registry-verify" => schema_registry::run_verify(builtin, state).await,
                    "schema-registry-wait" => schema_registry::run_wait(builtin, state).await,
                    "skip-if" => skip_if::run_skip_if(builtin, state).await,
                    "skip-end" => skip_end::run_skip_end(),
                    "sql-server-connect" => sql_server::run_connect(builtin, state).await,
                    "sql-server-execute" => sql_server::run_execute(builtin, state).await,
                    "sql-server-set-from-sql" => sql_server::run_set_from_sql(builtin, state).await,
                    "persist-force-compaction" => {
                        persist::run_force_compaction(builtin, state).await
                    }
                    "random-sleep" => sleep::run_random_sleep(builtin),
                    "set-regex" => set::run_regex_set(builtin, state),
                    "unset-regex" => set::run_regex_unset(builtin, state),
                    "set-sql-timeout" => set::run_sql_timeout(builtin, state),
                    "set-max-tries" => set::run_max_tries(builtin, state),
                    "sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment" => {
                        sleep::run_sleep(builtin)
                    }
                    "set" => set::set_vars(builtin, state),
                    "set-arg-default" => set::run_set_arg_default(builtin, state),
                    "set-from-sql" => set::run_set_from_sql(builtin, state).await,
                    "set-from-file" => set::run_set_from_file(builtin, state).await,
                    "webhook-append" => webhook::run_append(builtin, state).await,
                    _ => {
                        return Err(PosError::new(
                            anyhow!("unknown built-in command {}", builtin.name),
                            self.pos,
                        ));
                    }
                }
            }
            // SQL commands expected to succeed: substitute variables into the
            // query and into every expected-output cell, then run and verify.
            Command::Sql(mut sql, version_constraint) => {
                handle_version!(version_constraint);
                sql.query = subst(&sql.query, &state.cmd_vars)?;
                if let SqlOutput::Full { expected_rows, .. } = &mut sql.expected_output {
                    for row in expected_rows {
                        for col in row {
                            *col = subst(col, &state.cmd_vars)?;
                        }
                    }
                }
                sql::run_sql(sql, state).await
            }
            // SQL commands expected to fail: substitute variables into the
            // query and the expected error, regex-escaping substituted values
            // when the expectation is itself a regex.
            Command::FailSql(mut sql, version_constraint) => {
                handle_version!(version_constraint);
                sql.query = subst(&sql.query, &state.cmd_vars)?;
                sql.expected_error = match &sql.expected_error {
                    SqlExpectedError::Contains(s) => {
                        SqlExpectedError::Contains(subst(s, &state.cmd_vars)?)
                    }
                    SqlExpectedError::Exact(s) => {
                        SqlExpectedError::Exact(subst(s, &state.cmd_vars)?)
                    }
                    SqlExpectedError::Regex(s) => {
                        SqlExpectedError::Regex(subst_re(s, &state.cmd_vars)?)
                    }
                    SqlExpectedError::Timeout => SqlExpectedError::Timeout,
                };
                sql::run_fail_sql(sql, state).await
            }
        };

        r.map_err(wrap_err)
    }
}
952
953/// Substituted `${}`-delimited variables from `vars` into `msg`
954fn substitute_vars(
955    msg: &str,
956    vars: &BTreeMap<String, String>,
957    ignore_prefix: &Option<String>,
958    regex_escape: bool,
959) -> Result<String, anyhow::Error> {
960    static RE: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"\$\{([^}]+)\}").unwrap());
961    let mut err = None;
962    let out = RE.replace_all(msg, |caps: &Captures| {
963        let name = &caps[1];
964        if let Some(ignore_prefix) = &ignore_prefix {
965            if name.starts_with(format!("{}.", ignore_prefix).as_str()) {
966                // Do not substitute, leave original variable name in place
967                return caps.get(0).unwrap().as_str().to_string();
968            }
969        }
970
971        if let Some(val) = vars.get(name) {
972            if regex_escape {
973                regex::escape(val)
974            } else {
975                val.to_string()
976            }
977        } else {
978            err = Some(anyhow!("unknown variable: {}", name));
979            "#VAR-MISSING#".to_string()
980        }
981    });
982    match err {
983        Some(err) => Err(err),
984        None => Ok(out.into_owned()),
985    }
986}
987
/// Initializes a [`State`] object by connecting to the various external
/// services specified in `config`.
///
/// Returns the initialized `State` and a cleanup future. The cleanup future
/// should be `await`ed only *after* dropping the `State` to check whether any
/// errors occurred while dropping the `State`. This awkward API is a
/// workaround for the lack of `AsyncDrop` support in Rust.
pub async fn create_state(
    config: &Config,
) -> Result<(State, impl Future<Output = Result<(), anyhow::Error>>), anyhow::Error> {
    // Use the configured seed if present; otherwise generate a random,
    // zero-padded 10-digit seed so concurrent runs get distinct names.
    let seed = config
        .seed
        .clone()
        .unwrap_or_else(|| format!("{:010}", rand::random::<u32>()));

    // Use the caller-provided temp dir when set; otherwise create a
    // self-deleting tempdir whose handle must be kept alive (stored in
    // `State` as `_tempfile`).
    let (_tempfile, temp_path) = match &config.temp_dir {
        Some(temp_dir) => {
            fs::create_dir_all(temp_dir).context("creating temporary directory")?;
            (None, PathBuf::from(&temp_dir))
        }
        _ => {
            // Stash the tempfile object so that it does not go out of scope and delete
            // the tempdir prematurely
            let tempfile_handle = tempfile::tempdir().context("creating temporary directory")?;
            let temp_path = tempfile_handle.path().to_path_buf();
            (Some(tempfile_handle), temp_path)
        }
    };

    let materialize_catalog_config = config.materialize_catalog_config.clone();

    let materialize_url = util::postgres::config_url(&config.materialize_pgconfig)?;
    info!("Connecting to {}", materialize_url.as_str());
    // Retry the initial SQL connection until `default_timeout` elapses; the
    // server may still be starting up when testdrive launches.
    let (pgclient, pgconn) = Retry::default()
        .max_duration(config.default_timeout)
        .retry_async_canceling(|_| async move {
            let mut pgconfig = config.materialize_pgconfig.clone();
            pgconfig.connect_timeout(config.default_timeout);
            let tls = make_tls(&pgconfig)?;
            pgconfig.connect(tls).await.map_err(|e| anyhow!(e))
        })
        .await?;

    // Drive the connection on a background task; awaiting the returned future
    // (after `State` is dropped) surfaces any connection error.
    let pgconn_task =
        task::spawn(|| "pgconn_task", pgconn).map(|join| join.context("running SQL connection"));

    let materialize_state =
        create_materialize_state(&config, materialize_catalog_config, pgclient).await?;

    let schema_registry_url = config.schema_registry_url.to_owned();

    // Build the Confluent Schema Registry client, optionally with TLS client
    // identity and basic-auth credentials.
    let ccsr_client = {
        let mut ccsr_config = mz_ccsr::ClientConfig::new(schema_registry_url.clone());

        if let Some(cert_path) = &config.cert_path {
            let cert = fs::read(cert_path).context("reading cert")?;
            let pass = config.cert_password.as_deref().unwrap_or("").to_owned();
            let ident = mz_ccsr::tls::Identity::from_pkcs12_der(cert, pass)
                .context("reading keystore file as pkcs12")?;
            ccsr_config = ccsr_config.identity(ident);
        }

        if let Some(ccsr_username) = &config.ccsr_username {
            ccsr_config = ccsr_config.auth(ccsr_username.clone(), config.ccsr_password.clone());
        }

        ccsr_config.build().context("Creating CCSR client")?
    };

    // Build the shared Kafka client configuration plus an admin client and a
    // producer from it. User-supplied `kafka_opts` are applied last so they
    // can override the defaults set here.
    let (kafka_addr, kafka_admin, kafka_admin_opts, kafka_producer, kafka_topics, kafka_config) = {
        use rdkafka::admin::{AdminClient, AdminOptions};
        use rdkafka::producer::FutureProducer;

        let mut kafka_config = create_new_client_config_simple();
        kafka_config.set("bootstrap.servers", &config.kafka_addr);
        kafka_config.set("group.id", "materialize-testdrive");
        kafka_config.set("auto.offset.reset", "earliest");
        kafka_config.set("isolation.level", "read_committed");
        if let Some(cert_path) = &config.cert_path {
            kafka_config.set("security.protocol", "ssl");
            kafka_config.set("ssl.keystore.location", cert_path);
            if let Some(cert_password) = &config.cert_password {
                kafka_config.set("ssl.keystore.password", cert_password);
            }
        }
        kafka_config.set("message.max.bytes", "15728640");

        for (key, value) in &config.kafka_opts {
            kafka_config.set(key, value);
        }

        let admin: AdminClient<_> = kafka_config
            .create_with_context(MzClientContext::default())
            .with_context(|| format!("opening Kafka connection: {}", config.kafka_addr))?;

        let admin_opts = AdminOptions::new().operation_timeout(Some(config.default_timeout));

        let producer: FutureProducer<_> = kafka_config
            .create_with_context(MzClientContext::default())
            .with_context(|| format!("opening Kafka producer connection: {}", config.kafka_addr))?;

        let topics = BTreeMap::new();

        (
            config.kafka_addr.to_owned(),
            admin,
            admin_opts,
            producer,
            topics,
            kafka_config,
        )
    };

    // Assemble the full runner state from the pieces above.
    let mut state = State {
        config: config.clone(),

        // === Testdrive state. ===
        arg_vars: config.arg_vars.clone(),
        cmd_vars: BTreeMap::new(),
        seed,
        temp_path,
        _tempfile,
        default_timeout: config.default_timeout,
        timeout: config.default_timeout,
        max_tries: config.default_max_tries,
        initial_backoff: config.initial_backoff,
        backoff_factor: config.backoff_factor,
        consistency_checks: config.consistency_checks,
        check_statement_logging: config.check_statement_logging,
        consistency_checks_adhoc_skip: false,
        regex: None,
        regex_replacement: set::DEFAULT_REGEX_REPLACEMENT.into(),
        rewrite_results: config.rewrite_results,
        error_line_count: 0,
        error_string: "".to_string(),

        // === Materialize state. ===
        materialize: materialize_state,

        // === Persist state. ===
        persist_consensus_url: config.persist_consensus_url.clone(),
        persist_blob_url: config.persist_blob_url.clone(),
        build_info: config.build_info,
        persist_clients: PersistClientCache::new(
            PersistConfig::new_default_configs(config.build_info, SYSTEM_TIME.clone()),
            &MetricsRegistry::new(),
            |_, _| PubSubClientConnection::noop(),
        ),

        // === Confluent state. ===
        schema_registry_url,
        ccsr_client,
        kafka_addr,
        kafka_admin,
        kafka_admin_opts,
        kafka_config,
        kafka_default_partitions: config.kafka_default_partitions,
        kafka_producer,
        kafka_topics,

        // === AWS state. ===
        aws_account: config.aws_account.clone(),
        aws_config: config.aws_config.clone(),

        // === Database driver state. ===
        duckdb_clients: BTreeMap::new(),
        mysql_clients: BTreeMap::new(),
        postgres_clients: BTreeMap::new(),
        sql_server_clients: BTreeMap::new(),

        // === Fivetran state. ===
        fivetran_destination_url: config.fivetran_destination_url.clone(),
        fivetran_destination_files_path: config.fivetran_destination_files_path.clone(),

        rewrites: Vec::new(),
        rewrite_pos_start: 0,
        rewrite_pos_end: 0,
    };
    state.initialize_cmd_vars().await?;
    Ok((state, pgconn_task))
}
1169
1170async fn create_materialize_state(
1171    config: &&Config,
1172    materialize_catalog_config: Option<CatalogConfig>,
1173    pgclient: tokio_postgres::Client,
1174) -> Result<MaterializeState, anyhow::Error> {
1175    let materialize_url = util::postgres::config_url(&config.materialize_pgconfig)?;
1176    let materialize_internal_url =
1177        util::postgres::config_url(&config.materialize_internal_pgconfig)?;
1178
1179    for (key, value) in &config.materialize_params {
1180        pgclient
1181            .batch_execute(&format!("SET {key} = {value}"))
1182            .await
1183            .context("setting session parameter")?;
1184    }
1185
1186    let materialize_user = config
1187        .materialize_pgconfig
1188        .get_user()
1189        .expect("testdrive URL must contain user")
1190        .to_string();
1191
1192    let materialize_sql_addr = format!(
1193        "{}:{}",
1194        materialize_url.host_str().unwrap(),
1195        materialize_url.port().unwrap()
1196    );
1197    let materialize_http_addr = format!(
1198        "{}:{}",
1199        materialize_url.host_str().unwrap(),
1200        config.materialize_http_port
1201    );
1202    let materialize_internal_sql_addr = format!(
1203        "{}:{}",
1204        materialize_internal_url.host_str().unwrap(),
1205        materialize_internal_url.port().unwrap()
1206    );
1207    let materialize_password_sql_addr = format!(
1208        "{}:{}",
1209        materialize_url.host_str().unwrap(),
1210        config.materialize_password_sql_port
1211    );
1212    let materialize_sasl_sql_addr = format!(
1213        "{}:{}",
1214        materialize_url.host_str().unwrap(),
1215        config.materialize_sasl_sql_port
1216    );
1217    let materialize_internal_http_addr = format!(
1218        "{}:{}",
1219        materialize_internal_url.host_str().unwrap(),
1220        config.materialize_internal_http_port
1221    );
1222    let environment_id = pgclient
1223        .query_one("SELECT mz_environment_id()", &[])
1224        .await?
1225        .get::<_, String>(0)
1226        .parse()
1227        .context("parsing environment ID")?;
1228
1229    let bootstrap_args = BootstrapArgs {
1230        cluster_replica_size_map: config.materialize_cluster_replica_sizes.clone(),
1231        default_cluster_replica_size: "ABC".to_string(),
1232        default_cluster_replication_factor: 1,
1233        bootstrap_role: None,
1234    };
1235
1236    let materialize_state = MaterializeState {
1237        catalog_config: materialize_catalog_config,
1238        sql_addr: materialize_sql_addr,
1239        use_https: config.materialize_use_https,
1240        http_addr: materialize_http_addr,
1241        internal_sql_addr: materialize_internal_sql_addr,
1242        internal_http_addr: materialize_internal_http_addr,
1243        password_sql_addr: materialize_password_sql_addr,
1244        sasl_sql_addr: materialize_sasl_sql_addr,
1245        user: materialize_user,
1246        pgclient,
1247        environment_id,
1248        bootstrap_args,
1249    };
1250
1251    Ok(materialize_state)
1252}