
mz_testdrive/action.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::collections::BTreeMap;
use std::future::Future;
use std::net::ToSocketAddrs;
use std::path::PathBuf;
use std::sync::LazyLock;
use std::time::Duration;
use std::{env, fs};

use anyhow::{Context, anyhow, bail};
use async_trait::async_trait;
use aws_credential_types::provider::ProvideCredentials;
use aws_types::SdkConfig;
use futures::future::FutureExt;
use itertools::Itertools;
use mz_adapter::catalog::{Catalog, ConnCatalog};
use mz_adapter::session::Session;
use mz_build_info::BuildInfo;
use mz_catalog::config::ClusterReplicaSizeMap;
use mz_catalog::durable::BootstrapArgs;
use mz_ccsr::SubjectVersion;
use mz_kafka_util::client::{MzClientContext, create_new_client_config_simple};
use mz_ore::error::ErrorExt;
use mz_ore::metrics::MetricsRegistry;
use mz_ore::now::SYSTEM_TIME;
use mz_ore::retry::Retry;
use mz_ore::task;
use mz_ore::url::SensitiveUrl;
use mz_persist_client::cache::PersistClientCache;
use mz_persist_client::cfg::PersistConfig;
use mz_persist_client::rpc::PubSubClientConnection;
use mz_persist_client::{PersistClient, PersistLocation};
use mz_sql::catalog::EnvironmentId;
use mz_tls_util::make_tls;
use rdkafka::ClientConfig;
use rdkafka::producer::Producer;
use regex::{Captures, Regex};
use semver::Version;
use tokio_postgres::error::{DbError, SqlState};
use tracing::info;
use url::Url;

use crate::error::PosError;
use crate::parser::{
    Command, PosCommand, SqlExpectedError, SqlOutput, VersionConstraint, validate_ident,
};
use crate::util;
use crate::util::postgres::postgres_client;

pub mod consistency;

mod duckdb;
mod file;
mod fivetran;
mod http;
mod kafka;
mod mysql;
mod nop;
mod persist;
mod postgres;
mod protobuf;
mod psql;
mod s3;
mod schema_registry;
mod set;
mod skip_end;
mod skip_if;
mod sleep;
mod sql;
mod sql_server;
mod version_check;
mod webhook;

/// User-settable configuration parameters.
#[derive(Debug, Clone)]
pub struct Config {
    // === Testdrive options. ===
    /// Variables to make available to the testdrive script.
    ///
    /// The value of each entry will be made available to the script in a
    /// variable named `arg.KEY`.
    pub arg_vars: BTreeMap<String, String>,
    /// A random number to distinguish each run of a testdrive script.
    pub seed: Option<String>,
    /// Whether to reset Materialize state before executing each script and
    /// to clean up AWS state after each script.
    pub reset: bool,
    /// Force the use of the specified temporary directory.
    ///
    /// If unspecified, testdrive creates a temporary directory with a random
    /// name.
    pub temp_dir: Option<String>,
    /// Source string to print out on errors.
    pub source: Option<String>,
    /// The default timeout for cancellable operations.
    pub default_timeout: Duration,
    /// The default number of tries for retriable operations.
    pub default_max_tries: usize,
    /// The initial backoff interval for retry operations.
    ///
    /// Set to 0 to retry immediately on failure.
    pub initial_backoff: Duration,
    /// Backoff factor to use for retry operations.
    ///
    /// Set to 1 to retry at a steady pace.
    pub backoff_factor: f64,
    /// The level of coordinator and catalog consistency checks to run.
    pub consistency_checks: consistency::Level,
    /// Whether to run statement logging consistency checks (adds a few seconds at the end of every
    /// test file).
    pub check_statement_logging: bool,
    /// Whether to automatically rewrite wrong results instead of failing.
    pub rewrite_results: bool,

    // === Materialize options. ===
    /// The pgwire connection parameters for the Materialize instance that
    /// testdrive will connect to.
    pub materialize_pgconfig: tokio_postgres::Config,
    /// The internal pgwire connection parameters for the Materialize instance that
    /// testdrive will connect to.
    pub materialize_internal_pgconfig: tokio_postgres::Config,
    /// Whether to use HTTPS instead of plain HTTP for the HTTP(S) connections.
    pub materialize_use_https: bool,
    /// The port for the public endpoints of the materialize instance that
    /// testdrive will connect to via HTTP.
    pub materialize_http_port: u16,
    /// The port for the internal endpoints of the materialize instance that
    /// testdrive will connect to via HTTP.
    pub materialize_internal_http_port: u16,
    /// The port for the password endpoints of the materialize instance that
    /// testdrive will connect to via SQL.
    pub materialize_password_sql_port: u16,
    /// The port for the SASL endpoints of the materialize instance that
    /// testdrive will connect to via SQL.
    pub materialize_sasl_sql_port: u16,
    /// Session parameters to set after connecting to materialize.
    pub materialize_params: Vec<(String, String)>,
    /// An optional catalog configuration.
    pub materialize_catalog_config: Option<CatalogConfig>,
    /// Build information
    pub build_info: &'static BuildInfo,
    /// Configured cluster replica sizes
    pub materialize_cluster_replica_sizes: ClusterReplicaSizeMap,

    // === Persist options. ===
    /// Handle to the persist consensus system.
    pub persist_consensus_url: Option<SensitiveUrl>,
    /// Handle to the persist blob storage.
    pub persist_blob_url: Option<SensitiveUrl>,

    // === Confluent options. ===
    /// The address of the Kafka broker that testdrive will interact with.
    pub kafka_addr: String,
    /// Default number of partitions to use for topics
    pub kafka_default_partitions: usize,
    /// Arbitrary rdkafka options for testdrive to use when connecting to the
    /// Kafka broker.
    pub kafka_opts: Vec<(String, String)>,
    /// The URL of the schema registry that testdrive will connect to.
    pub schema_registry_url: Url,
    /// An optional path to a TLS certificate that testdrive will present when
    /// performing client authentication.
    ///
    /// The keystore must be in the PKCS#12 format.
    pub cert_path: Option<String>,
    /// An optional password for the TLS certificate.
    pub cert_password: Option<String>,
    /// An optional username for basic authentication with the Confluent Schema
    /// Registry.
    pub ccsr_username: Option<String>,
    /// An optional password for basic authentication with the Confluent Schema
    /// Registry.
    pub ccsr_password: Option<String>,

    // === AWS options. ===
    /// The configuration to use when connecting to AWS.
    pub aws_config: SdkConfig,
    /// The ID of the AWS account that `aws_config` configures.
    pub aws_account: String,

    // === Fivetran options. ===
    /// Address of the Fivetran Destination that is currently running.
    pub fivetran_destination_url: String,
    /// Directory that is accessible to the Fivetran Destination.
    pub fivetran_destination_files_path: String,
}

pub struct MaterializeState {
    catalog_config: Option<CatalogConfig>,

    sql_addr: String,
    use_https: bool,
    http_addr: String,
    internal_sql_addr: String,
    internal_http_addr: String,
    password_sql_addr: String,
    sasl_sql_addr: String,
    user: String,
    pgclient: tokio_postgres::Client,
    environment_id: EnvironmentId,
    bootstrap_args: BootstrapArgs,
}

pub struct State {
    // The Config that this `State` was originally created from.
    pub config: Config,

    // === Testdrive state. ===
    arg_vars: BTreeMap<String, String>,
    cmd_vars: BTreeMap<String, String>,
    seed: String,
    temp_path: PathBuf,
    _tempfile: Option<tempfile::TempDir>,
    default_timeout: Duration,
    timeout: Duration,
    max_tries: usize,
    initial_backoff: Duration,
    backoff_factor: f64,
    consistency_checks: consistency::Level,
    check_statement_logging: bool,
    consistency_checks_adhoc_skip: bool,
    regex: Option<Regex>,
    regex_replacement: String,
    error_line_count: usize,
    error_string: String,

    // === Materialize state. ===
    materialize: MaterializeState,

    // === Persist state. ===
    persist_consensus_url: Option<SensitiveUrl>,
    persist_blob_url: Option<SensitiveUrl>,
    build_info: &'static BuildInfo,
    persist_clients: PersistClientCache,

    // === Confluent state. ===
    schema_registry_url: Url,
    ccsr_client: mz_ccsr::Client,
    kafka_addr: String,
    kafka_admin: rdkafka::admin::AdminClient<MzClientContext>,
    kafka_admin_opts: rdkafka::admin::AdminOptions,
    kafka_config: ClientConfig,
    kafka_default_partitions: usize,
    kafka_producer: rdkafka::producer::FutureProducer<MzClientContext>,
    kafka_topics: BTreeMap<String, usize>,

    // === AWS state. ===
    aws_account: String,
    aws_config: SdkConfig,

    // === Database driver state. ===
    pub duckdb_clients: BTreeMap<String, std::sync::Arc<std::sync::Mutex<::duckdb::Connection>>>,
    mysql_clients: BTreeMap<String, mysql_async::Conn>,
    postgres_clients: BTreeMap<String, tokio_postgres::Client>,
    sql_server_clients: BTreeMap<String, mz_sql_server_util::Client>,

    // === Fivetran state. ===
    fivetran_destination_url: String,
    fivetran_destination_files_path: String,

    // === Rewrite state. ===
    rewrite_results: bool,
    /// Rewrites to apply to the current file; expected results are replaced
    /// inline.
    pub rewrites: Vec<Rewrite>,
    /// Start position of the currently expected result.
    pub rewrite_pos_start: usize,
    /// End position of the currently expected result.
    pub rewrite_pos_end: usize,
}

/// A single inline rewrite of an expected result in the current file.
pub struct Rewrite {
    pub content: String,
    pub start: usize,
    pub end: usize,
}
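
/// A minimal sketch (not used by testdrive itself) of how `Rewrite`s could be
/// applied: splice each `content` over the `start..end` range of the original
/// file text, back to front so that earlier offsets stay valid. Treating
/// `start`/`end` as byte offsets is an assumption made for illustration.
#[allow(dead_code)]
fn apply_rewrites_sketch(mut file: String, mut rewrites: Vec<Rewrite>) -> String {
    // Apply the highest-offset rewrites first so that the offsets of the
    // remaining rewrites are unaffected by earlier splices.
    rewrites.sort_by_key(|r| r.start);
    for r in rewrites.into_iter().rev() {
        file.replace_range(r.start..r.end, &r.content);
    }
    file
}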

impl State {
    /// Populates `cmd_vars` with the built-in `testdrive.*`, `env.*`, and
    /// `arg.*` variables.
    pub async fn initialize_cmd_vars(&mut self) -> Result<(), anyhow::Error> {
        self.cmd_vars
            .insert("testdrive.kafka-addr".into(), self.kafka_addr.clone());
        self.cmd_vars.insert(
            "testdrive.kafka-addr-resolved".into(),
            self.kafka_addr
                .to_socket_addrs()
                .ok()
                .and_then(|mut addrs| addrs.next())
                .map(|addr| addr.to_string())
                .unwrap_or_else(|| "#RESOLUTION-FAILURE#".into()),
        );
        self.cmd_vars.insert(
            "testdrive.schema-registry-url".into(),
            self.schema_registry_url.to_string(),
        );
        self.cmd_vars
            .insert("testdrive.seed".into(), self.seed.clone());
        self.cmd_vars.insert(
            "testdrive.temp-dir".into(),
            self.temp_path.display().to_string(),
        );
        self.cmd_vars
            .insert("testdrive.aws-region".into(), self.aws_region().into());
        self.cmd_vars
            .insert("testdrive.aws-endpoint".into(), self.aws_endpoint().into());
        self.cmd_vars
            .insert("testdrive.aws-account".into(), self.aws_account.clone());
        {
            let aws_credentials = self
                .aws_config
                .credentials_provider()
                .ok_or_else(|| anyhow!("no AWS credentials provider configured"))?
                .provide_credentials()
                .await
                .context("fetching AWS credentials")?;
            self.cmd_vars.insert(
                "testdrive.aws-access-key-id".into(),
                aws_credentials.access_key_id().to_owned(),
            );
            self.cmd_vars.insert(
                "testdrive.aws-secret-access-key".into(),
                aws_credentials.secret_access_key().to_owned(),
            );
            self.cmd_vars.insert(
                "testdrive.aws-token".into(),
                aws_credentials
                    .session_token()
                    .map(|token| token.to_owned())
                    .unwrap_or_else(String::new),
            );
        }
        self.cmd_vars.insert(
            "testdrive.materialize-environment-id".into(),
            self.materialize.environment_id.to_string(),
        );
        self.cmd_vars.insert(
            "testdrive.materialize-sql-addr".into(),
            self.materialize.sql_addr.clone(),
        );
        self.cmd_vars.insert(
            "testdrive.materialize-internal-sql-addr".into(),
            self.materialize.internal_sql_addr.clone(),
        );
        self.cmd_vars.insert(
            "testdrive.materialize-password-sql-addr".into(),
            self.materialize.password_sql_addr.clone(),
        );
        self.cmd_vars.insert(
            "testdrive.materialize-sasl-sql-addr".into(),
            self.materialize.sasl_sql_addr.clone(),
        );
        self.cmd_vars.insert(
            "testdrive.materialize-user".into(),
            self.materialize.user.clone(),
        );
        self.cmd_vars.insert(
            "testdrive.fivetran-destination-url".into(),
            self.fivetran_destination_url.clone(),
        );
        self.cmd_vars.insert(
            "testdrive.fivetran-destination-files-path".into(),
            self.fivetran_destination_files_path.clone(),
        );

        for (key, value) in env::vars() {
            self.cmd_vars.insert(format!("env.{}", key), value);
        }

        for (key, value) in &self.arg_vars {
            validate_ident(key)?;
            self.cmd_vars
                .insert(format!("arg.{}", key), value.to_string());
        }

        Ok(())
    }

    /// Makes a copy of the durable catalog and runs a function on its
    /// state. Returns `None` if there's no catalog information in the `State`.
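    ///
    /// # Usage sketch
    ///
    /// Illustrative only; the closure body and surrounding bindings are
    /// hypothetical:
    ///
    /// ```ignore
    /// let out = state
    ///     .with_catalog_copy(
    ///         BTreeMap::new(), // system parameter defaults
    ///         state.build_info,
    ///         &bootstrap_args,
    ///         None, // enable_expression_cache_override
    ///         |catalog| { /* inspect the read-only ConnCatalog here */ },
    ///     )
    ///     .await?;
    /// ```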
    pub async fn with_catalog_copy<F, T>(
        &self,
        system_parameter_defaults: BTreeMap<String, String>,
        build_info: &'static BuildInfo,
        bootstrap_args: &BootstrapArgs,
        enable_expression_cache_override: Option<bool>,
        f: F,
    ) -> Result<Option<T>, anyhow::Error>
    where
        F: FnOnce(ConnCatalog) -> T,
    {
        async fn persist_client(
            persist_consensus_url: SensitiveUrl,
            persist_blob_url: SensitiveUrl,
            persist_clients: &PersistClientCache,
        ) -> Result<PersistClient, anyhow::Error> {
            let persist_location = PersistLocation {
                blob_uri: persist_blob_url,
                consensus_uri: persist_consensus_url,
            };
            Ok(persist_clients.open(persist_location).await?)
        }

        if let Some(CatalogConfig {
            persist_consensus_url,
            persist_blob_url,
        }) = &self.materialize.catalog_config
        {
            let persist_client = persist_client(
                persist_consensus_url.clone(),
                persist_blob_url.clone(),
                &self.persist_clients,
            )
            .await?;
            let catalog = Catalog::open_debug_read_only_persist_catalog_config(
                persist_client,
                SYSTEM_TIME.clone(),
                self.materialize.environment_id.clone(),
                system_parameter_defaults,
                build_info,
                bootstrap_args,
                enable_expression_cache_override,
            )
            .await?;
            let res = f(catalog.for_session(&Session::dummy()));
            catalog.expire().await;
            Ok(Some(res))
        } else {
            Ok(None)
        }
    }

    pub fn aws_endpoint(&self) -> &str {
        self.aws_config.endpoint_url().unwrap_or("")
    }

    pub fn aws_region(&self) -> &str {
        self.aws_config.region().map(|r| r.as_ref()).unwrap_or("")
    }

    /// Resets the adhoc skip consistency check that users can toggle per-file,
    /// and returns whether the consistency checks should be skipped for the
    /// current run.
    pub fn clear_skip_consistency_checks(&mut self) -> bool {
        std::mem::replace(&mut self.consistency_checks_adhoc_skip, false)
    }

    pub async fn reset_materialize(&self) -> Result<(), anyhow::Error> {
        let (inner_client, _) = postgres_client(
            &format!(
                "postgres://mz_system:materialize@{}",
                self.materialize.internal_sql_addr
            ),
            self.default_timeout,
        )
        .await?;

        let version = inner_client
            .query_one("SELECT mz_version_num()", &[])
            .await
            .context("getting version of materialize")
            .map(|row| row.get::<_, i32>(0))?;

        let semver = inner_client
            .query_one("SELECT right(split_part(mz_version(), ' ', 1), -1)", &[])
            .await
            .context("getting semver of materialize")
            .map(|row| row.get::<_, String>(0))?
            .parse::<semver::Version>()
            .context("parsing semver of materialize")?;

        inner_client
            .batch_execute("ALTER SYSTEM RESET ALL")
            .await
            .context("resetting materialize state: ALTER SYSTEM RESET ALL")?;

        // Dangerous functions are useful for tests, so we enable them for all tests.
        {
            let rename_version = Version::parse("0.128.0-dev.1").expect("known to be valid");
            let enable_unsafe_functions = if semver >= rename_version {
                "unsafe_enable_unsafe_functions"
            } else {
                "enable_unsafe_functions"
            };
            let res = inner_client
                .batch_execute(&format!("ALTER SYSTEM SET {enable_unsafe_functions} = on"))
                .await
                .context("enabling dangerous functions");
            if let Err(e) = res {
                match e.root_cause().downcast_ref::<DbError>() {
                    Some(e) if *e.code() == SqlState::CANT_CHANGE_RUNTIME_PARAM => {
                        info!(
                            "can't enable unsafe functions because the server is in safe mode; \
                             testdrive scripts will fail if they use unsafe functions",
                        );
                    }
                    _ => return Err(e),
                }
            }
        }

        for row in inner_client
            .query("SHOW DATABASES", &[])
            .await
            .context("resetting materialize state: SHOW DATABASES")?
        {
            let db_name: String = row.get(0);
            if db_name.starts_with("testdrive_no_reset_") {
                continue;
            }
            let query = format!(
                "DROP DATABASE {}",
                postgres_protocol::escape::escape_identifier(&db_name)
            );
            sql::print_query(&query, None);
            inner_client.batch_execute(&query).await.context(format!(
                "resetting materialize state: DROP DATABASE {}",
                db_name,
            ))?;
        }

        // Get all user clusters not running any objects owned by users
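        // (IDs beginning with `u` denote user-created clusters and objects;
        // system-created ones use `s`.)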
        let inactive_user_clusters = "
        WITH
            active_user_clusters AS
            (
                SELECT DISTINCT cluster_id, object_id
                FROM
                    (
                        SELECT cluster_id, id FROM mz_catalog.mz_sources
                        UNION ALL SELECT cluster_id, id FROM mz_catalog.mz_sinks
                        UNION ALL
                            SELECT cluster_id, id
                            FROM mz_catalog.mz_materialized_views
                        UNION ALL
                            SELECT cluster_id, id FROM mz_catalog.mz_indexes
                        UNION ALL
                            SELECT cluster_id, id
                            FROM mz_internal.mz_subscriptions
                    )
                    AS t (cluster_id, object_id)
                WHERE cluster_id IS NOT NULL AND object_id LIKE 'u%'
            )
        SELECT name
        FROM mz_catalog.mz_clusters
        WHERE
            id NOT IN ( SELECT cluster_id FROM active_user_clusters ) AND id LIKE 'u%'
                AND
            owner_id LIKE 'u%';";

        let inactive_clusters = inner_client
            .query(inactive_user_clusters, &[])
            .await
            .context("resetting materialize state: inactive_user_clusters")?;

        if !inactive_clusters.is_empty() {
            println!("cleaning up user clusters from previous tests...")
        }

        for cluster_name in inactive_clusters {
            let cluster_name: String = cluster_name.get(0);
            if cluster_name.starts_with("testdrive_no_reset_") {
                continue;
            }
            let query = format!(
                "DROP CLUSTER {}",
                postgres_protocol::escape::escape_identifier(&cluster_name)
            );
            sql::print_query(&query, None);
            inner_client.batch_execute(&query).await.context(format!(
                "resetting materialize state: DROP CLUSTER {}",
                cluster_name,
            ))?;
        }

        inner_client
            .batch_execute("CREATE DATABASE materialize")
            .await
            .context("resetting materialize state: CREATE DATABASE materialize")?;

        // Attempt to remove all users but the current user. Old versions of
        // Materialize did not support roles, so this degrades gracefully if
        // mz_roles does not exist.
        if let Ok(rows) = inner_client.query("SELECT name FROM mz_roles", &[]).await {
            for row in rows {
                let role_name: String = row.get(0);
                if role_name == self.materialize.user || role_name.starts_with("mz_") {
                    continue;
                }
                let query = format!(
                    "DROP ROLE {}",
                    postgres_protocol::escape::escape_identifier(&role_name)
                );
                sql::print_query(&query, None);
                inner_client.batch_execute(&query).await.context(format!(
                    "resetting materialize state: DROP ROLE {}",
                    role_name,
                ))?;
            }
        }

        // Alter materialize user with all system privileges.
        inner_client
            .batch_execute(&format!(
                "GRANT ALL PRIVILEGES ON SYSTEM TO {}",
                self.materialize.user
            ))
            .await?;

        // Grant initial privileges.
        inner_client
            .batch_execute("GRANT USAGE ON DATABASE materialize TO PUBLIC")
            .await?;
        inner_client
            .batch_execute(&format!(
                "GRANT ALL PRIVILEGES ON DATABASE materialize TO {}",
                self.materialize.user
            ))
            .await?;
        inner_client
            .batch_execute(&format!(
                "GRANT ALL PRIVILEGES ON SCHEMA materialize.public TO {}",
                self.materialize.user
            ))
            .await?;

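        // The default cluster was renamed from `default` to `quickstart` in
        // v0.82 (`mz_version_num()` 8200), so pick the name that matches the
        // server version.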
        let cluster = match version {
            ..=8199 => "default",
            8200.. => "quickstart",
        };
        inner_client
            .batch_execute(&format!("GRANT USAGE ON CLUSTER {cluster} TO PUBLIC"))
            .await?;
        inner_client
            .batch_execute(&format!(
                "GRANT ALL PRIVILEGES ON CLUSTER {cluster} TO {}",
                self.materialize.user
            ))
            .await?;

        Ok(())
    }

    /// Deletes Kafka topics and CCSR subjects that were created in this run.
    pub async fn reset_kafka(&self) -> Result<(), anyhow::Error> {
        use rdkafka::types::RDKafkaErrorCode;
        let mut errors: Vec<anyhow::Error> = Vec::new();

        let metadata = self.kafka_producer.client().fetch_metadata(
            None,
            Some(std::cmp::max(Duration::from_secs(1), self.default_timeout)),
        )?;

        let testdrive_topics: Vec<_> = metadata
            .topics()
            .iter()
            .filter_map(|t| {
                if t.name().starts_with("testdrive-") {
                    Some(t.name())
                } else {
                    None
                }
            })
            .collect();

        if !testdrive_topics.is_empty() {
            match self
                .kafka_admin
                .delete_topics(&testdrive_topics, &self.kafka_admin_opts)
                .await
            {
                Ok(res) => {
                    if res.len() != testdrive_topics.len() {
                        errors.push(anyhow!(
                            "kafka topic deletion returned {} results, but exactly {} expected",
                            res.len(),
                            testdrive_topics.len()
                        ));
                    }
                    for (res, topic) in res.iter().zip_eq(testdrive_topics.iter()) {
                        match res {
                            Ok(_) | Err((_, RDKafkaErrorCode::UnknownTopicOrPartition)) => (),
                            Err((_, err)) => {
                                errors.push(anyhow!("unable to delete {}: {}", topic, err));
                            }
                        }
                    }
                }
                Err(e) => {
                    errors.push(e.into());
                }
            };
        }

        let schema_registry_errors = self.reset_schema_registry().await;

        errors.extend(schema_registry_errors);
        if errors.is_empty() {
            Ok(())
        } else {
            bail!(
                "deleting Kafka topics: {} errors: {}",
                errors.len(),
                errors
                    .into_iter()
                    .map(|e| e.to_string_with_causes())
                    .join("\n")
            );
        }
    }

    #[allow(clippy::disallowed_types)]
    async fn reset_schema_registry(&self) -> Vec<anyhow::Error> {
        use std::collections::HashMap;

        let mut errors = Vec::new();
        match self
            .ccsr_client
            .list_subjects()
            .await
            .context("listing schema registry subjects")
        {
            Ok(subjects) => {
                let testdrive_subjects: Vec<_> = subjects
                    .into_iter()
                    .filter(|s| s.starts_with("testdrive-"))
                    .collect();

                // Build the dependency graphs: subject -> list of subjects it references
                let mut graphs: HashMap<SubjectVersion, Vec<SubjectVersion>> = HashMap::new();

                for subject in &testdrive_subjects {
                    match self.ccsr_client.get_subject_with_references(subject).await {
                        Ok((subj, refs)) => {
                            // Filter to only testdrive subjects
                            let refs: Vec<_> = refs
                                .into_iter()
                                .filter(|r| r.subject.starts_with("testdrive-"))
                                .collect();
                            graphs.insert(
                                SubjectVersion {
                                    subject: subj.name,
                                    version: subj.version,
                                },
                                refs,
                            );
                        }
                        Err(mz_ccsr::GetBySubjectError::SubjectNotFound) => {
                            // Subject was already deleted, skip it
                        }
                        Err(e) => {
                            errors.push(anyhow::anyhow!(
                                "failed to get references for subject {}: {}",
                                subject,
                                e
                            ));
                        }
                    }
                }

                // Get topological ordering (0 = root/no dependencies, higher = more dependents)
                // We need to delete in reverse order (highest first = subjects that depend on others)
                let subjects_to_delete: Vec<_> = match mz_ccsr::topological_sort(&graphs) {
                    Ok(ordered) => {
                        let mut subjects: Vec<_> = ordered.into_iter().collect();
                        subjects.sort_by(|a, b| a.1.cmp(&b.1));
                        subjects.into_iter().map(|(s, _)| s.clone()).collect()
                    }
                    Err(_) => {
                        tracing::info!("Cycle detected, attempting to delete anyway");
                        // Cycle detected or other error - fall back to deleting in any order
                        graphs.into_keys().collect()
                    }
                };

                for subject in subjects_to_delete {
                    match self.ccsr_client.delete_subject(&subject.subject).await {
                        Ok(()) | Err(mz_ccsr::DeleteError::SubjectNotFound) => (),
                        Err(e) => errors.push(e.into()),
                    }
                }
            }
            Err(e) => {
                errors.push(e);
            }
        }
        errors
    }
}

/// Configuration for the Catalog.
#[derive(Debug, Clone)]
pub struct CatalogConfig {
    /// Handle to the persist consensus system.
    pub persist_consensus_url: SensitiveUrl,
    /// Handle to the persist blob storage.
    pub persist_blob_url: SensitiveUrl,
}

/// The control flow outcome of running a single testdrive command.
pub enum ControlFlow {
    Continue,
    SkipBegin,
    SkipEnd,
}
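
// A minimal sketch (not testdrive's actual runner) of how `ControlFlow`
// values could drive command skipping; `is_skip_end` is a hypothetical
// helper used only for illustration:
//
//     let mut skipping = false;
//     for cmd in commands {
//         if skipping && !cmd.is_skip_end() {
//             continue;
//         }
//         match cmd.run(&mut state).await? {
//             ControlFlow::Continue => {}
//             ControlFlow::SkipBegin => skipping = true,
//             ControlFlow::SkipEnd => skipping = false,
//         }
//     }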

/// A parsed testdrive command that can be executed against the current
/// [`State`].
#[async_trait]
pub(crate) trait Run {
    async fn run(self, state: &mut State) -> Result<ControlFlow, PosError>;
}

#[async_trait]
impl Run for PosCommand {
    async fn run(self, state: &mut State) -> Result<ControlFlow, PosError> {
        macro_rules! handle_version {
            ($version_constraint:expr) => {
                match $version_constraint {
                    Some(VersionConstraint { min, max }) => {
                        match version_check::run_version_check(min, max, state).await {
                            Ok(true) => return Ok(ControlFlow::Continue),
                            Ok(false) => {}
                            Err(err) => return Err(PosError::new(err, self.pos)),
                        }
                    }
                    None => {}
                }
            };
        }

        let wrap_err = |e| PosError::new(e, self.pos);
        // Substitute variables up front, except for the command-specific ones;
        // those will be substituted at runtime.
        let ignore_prefix = match &self.command {
            Command::Builtin(builtin, _) => Some(builtin.name.clone()),
            _ => None,
        };
        let subst = |msg: &str, vars: &BTreeMap<String, String>| {
            substitute_vars(msg, vars, &ignore_prefix, false).map_err(wrap_err)
        };
        let subst_re = |msg: &str, vars: &BTreeMap<String, String>| {
            substitute_vars(msg, vars, &ignore_prefix, true).map_err(wrap_err)
        };

        let r = match self.command {
            Command::Builtin(mut builtin, version_constraint) => {
                handle_version!(version_constraint);
                for val in builtin.args.values_mut() {
                    *val = subst(val, &state.cmd_vars)?;
                }
                for line in &mut builtin.input {
                    *line = subst(line, &state.cmd_vars)?;
                }
                match builtin.name.as_ref() {
                    "check-consistency" => consistency::run_consistency_checks(state).await,
                    "skip-consistency-checks" => {
                        consistency::skip_consistency_checks(builtin, state)
                    }
                    "check-shard-tombstone" => {
                        consistency::run_check_shard_tombstone(builtin, state).await
                    }
                    "duckdb-execute" => duckdb::run_execute(builtin, state).await,
                    "duckdb-query" => duckdb::run_query(builtin, state).await,
                    "fivetran-destination" => {
                        fivetran::run_destination_command(builtin, state).await
                    }
                    "file-append" => file::run_append(builtin, state).await,
                    "file-delete" => file::run_delete(builtin, state).await,
                    "http-request" => http::run_request(builtin, state).await,
                    "kafka-add-partitions" => kafka::run_add_partitions(builtin, state).await,
                    "kafka-create-topic" => kafka::run_create_topic(builtin, state).await,
                    "kafka-wait-topic" => kafka::run_wait_topic(builtin, state).await,
                    "kafka-delete-records" => kafka::run_delete_records(builtin, state).await,
                    "kafka-delete-topic-flaky" => kafka::run_delete_topic(builtin, state).await,
                    "kafka-ingest" => kafka::run_ingest(builtin, state).await,
                    "kafka-verify-data" => kafka::run_verify_data(builtin, state).await,
                    "kafka-verify-commit" => kafka::run_verify_commit(builtin, state).await,
                    "kafka-verify-topic" => kafka::run_verify_topic(builtin, state).await,
                    "mysql-connect" => mysql::run_connect(builtin, state).await,
                    "mysql-execute" => mysql::run_execute(builtin, state).await,
                    "nop" => nop::run_nop(),
                    "postgres-connect" => postgres::run_connect(builtin, state).await,
                    "postgres-execute" => postgres::run_execute(builtin, state).await,
                    "postgres-verify-slot" => postgres::run_verify_slot(builtin, state).await,
                    "protobuf-compile-descriptors" => {
                        protobuf::run_compile_descriptors(builtin, state).await
                    }
                    "psql-execute" => psql::run_execute(builtin, state).await,
                    "s3-verify-data" => s3::run_verify_data(builtin, state).await,
                    "s3-verify-keys" => s3::run_verify_keys(builtin, state).await,
                    "s3-file-upload" => s3::run_upload(builtin, state).await,
                    "s3-set-presigned-url" => s3::run_set_presigned_url(builtin, state).await,
                    "schema-registry-publish" => schema_registry::run_publish(builtin, state).await,
                    "schema-registry-verify" => schema_registry::run_verify(builtin, state).await,
                    "schema-registry-wait" => schema_registry::run_wait(builtin, state).await,
                    "skip-if" => skip_if::run_skip_if(builtin, state).await,
                    "skip-end" => skip_end::run_skip_end(),
                    "sql-server-connect" => sql_server::run_connect(builtin, state).await,
                    "sql-server-execute" => sql_server::run_execute(builtin, state).await,
                    "sql-server-set-from-sql" => sql_server::run_set_from_sql(builtin, state).await,
                    "persist-force-compaction" => {
                        persist::run_force_compaction(builtin, state).await
                    }
                    "random-sleep" => sleep::run_random_sleep(builtin),
                    "set-regex" => set::run_regex_set(builtin, state),
                    "unset-regex" => set::run_regex_unset(builtin, state),
                    "set-sql-timeout" => set::run_sql_timeout(builtin, state),
                    "set-max-tries" => set::run_max_tries(builtin, state),
                    "sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment" => {
                        sleep::run_sleep(builtin)
                    }
                    "set" => set::set_vars(builtin, state),
                    "set-arg-default" => set::run_set_arg_default(builtin, state),
                    "set-from-sql" => set::run_set_from_sql(builtin, state).await,
                    "set-from-file" => set::run_set_from_file(builtin, state).await,
                    "webhook-append" => webhook::run_append(builtin, state).await,
                    _ => {
                        return Err(PosError::new(
                            anyhow!("unknown built-in command {}", builtin.name),
                            self.pos,
                        ));
                    }
                }
            }
            Command::Sql(mut sql, version_constraint) => {
                handle_version!(version_constraint);
                sql.query = subst(&sql.query, &state.cmd_vars)?;
                if let SqlOutput::Full { expected_rows, .. } = &mut sql.expected_output {
                    for row in expected_rows {
                        for col in row {
                            *col = subst(col, &state.cmd_vars)?;
                        }
                    }
                }
                sql::run_sql(sql, state).await
            }
            Command::FailSql(mut sql, version_constraint) => {
                handle_version!(version_constraint);
                sql.query = subst(&sql.query, &state.cmd_vars)?;
                sql.expected_error = match &sql.expected_error {
                    SqlExpectedError::Contains(s) => {
                        SqlExpectedError::Contains(subst(s, &state.cmd_vars)?)
                    }
                    SqlExpectedError::Exact(s) => {
                        SqlExpectedError::Exact(subst(s, &state.cmd_vars)?)
                    }
                    SqlExpectedError::Regex(s) => {
                        SqlExpectedError::Regex(subst_re(s, &state.cmd_vars)?)
                    }
                    SqlExpectedError::Timeout => SqlExpectedError::Timeout,
                };
                sql::run_fail_sql(sql, state).await
            }
        };

        r.map_err(wrap_err)
    }
}

/// Substitutes `${}`-delimited variables from `vars` into `msg`.
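///
/// # Example
///
/// Illustrative only (`ignore`d by rustdoc; the function is private to this
/// crate):
///
/// ```ignore
/// let mut vars = BTreeMap::new();
/// vars.insert("testdrive.seed".to_string(), "12345".to_string());
/// let out = substitute_vars("seed is ${testdrive.seed}", &vars, &None, false)?;
/// assert_eq!(out, "seed is 12345");
/// ```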
fn substitute_vars(
    msg: &str,
    vars: &BTreeMap<String, String>,
    ignore_prefix: &Option<String>,
    regex_escape: bool,
) -> Result<String, anyhow::Error> {
    static RE: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"\$\{([^}]+)\}").unwrap());
    let mut err = None;
    let out = RE.replace_all(msg, |caps: &Captures| {
        let name = &caps[1];
        if let Some(ignore_prefix) = &ignore_prefix {
            if name.starts_with(format!("{}.", ignore_prefix).as_str()) {
                // Do not substitute, leave original variable name in place
                return caps.get(0).unwrap().as_str().to_string();
            }
        }

        if let Some(val) = vars.get(name) {
            if regex_escape {
                regex::escape(val)
            } else {
                val.to_string()
            }
        } else {
            err = Some(anyhow!("unknown variable: {}", name));
            "#VAR-MISSING#".to_string()
        }
    });
    match err {
        Some(err) => Err(err),
        None => Ok(out.into_owned()),
    }
}

/// Initializes a [`State`] object by connecting to the various external
/// services specified in `config`.
///
/// Returns the initialized `State` and a cleanup future. The cleanup future
/// should be `await`ed only *after* dropping the `State` to check whether any
/// errors occurred while dropping the `State`. This awkward API is a workaround
/// for the lack of `AsyncDrop` support in Rust.
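///
/// # Usage sketch
///
/// Illustrative only; the `cleanup` binding is hypothetical:
///
/// ```ignore
/// let (state, cleanup) = create_state(&config).await?;
/// // ... run the testdrive script against `state` ...
/// drop(state);
/// cleanup.await?; // surface any errors from the SQL connection task
/// ```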
pub async fn create_state(
    config: &Config,
) -> Result<(State, impl Future<Output = Result<(), anyhow::Error>>), anyhow::Error> {
    let seed = config
        .seed
        .clone()
        .unwrap_or_else(|| format!("{:010}", rand::random::<u32>()));

    let (_tempfile, temp_path) = match &config.temp_dir {
        Some(temp_dir) => {
            fs::create_dir_all(temp_dir).context("creating temporary directory")?;
            (None, PathBuf::from(&temp_dir))
        }
        _ => {
            // Stash the tempfile object so that it does not go out of scope and delete
            // the tempdir prematurely
            let tempfile_handle = tempfile::tempdir().context("creating temporary directory")?;
            let temp_path = tempfile_handle.path().to_path_buf();
            (Some(tempfile_handle), temp_path)
        }
    };

    let materialize_catalog_config = config.materialize_catalog_config.clone();

    let materialize_url = util::postgres::config_url(&config.materialize_pgconfig)?;
    info!("Connecting to {}", materialize_url.as_str());
    let (pgclient, pgconn) = Retry::default()
        .max_duration(config.default_timeout)
        .retry_async_canceling(|_| async move {
            let mut pgconfig = config.materialize_pgconfig.clone();
            pgconfig.connect_timeout(config.default_timeout);
            let tls = make_tls(&pgconfig)?;
            pgconfig.connect(tls).await.map_err(|e| anyhow!(e))
        })
        .await?;

    let pgconn_task =
        task::spawn(|| "pgconn_task", pgconn).map(|join| join.context("running SQL connection"));

    let materialize_state =
        create_materialize_state(config, materialize_catalog_config, pgclient).await?;

    let schema_registry_url = config.schema_registry_url.to_owned();

    let ccsr_client = {
        let mut ccsr_config = mz_ccsr::ClientConfig::new(schema_registry_url.clone());

        if let Some(cert_path) = &config.cert_path {
            let cert = fs::read(cert_path).context("reading cert")?;
            let pass = config.cert_password.as_deref().unwrap_or("").to_owned();
            let ident = mz_ccsr::tls::Identity::from_pkcs12_der(cert, pass)
                .context("reading keystore file as pkcs12")?;
            ccsr_config = ccsr_config.identity(ident);
        }

        if let Some(ccsr_username) = &config.ccsr_username {
            ccsr_config = ccsr_config.auth(ccsr_username.clone(), config.ccsr_password.clone());
        }

        ccsr_config.build().context("Creating CCSR client")?
    };

    let (kafka_addr, kafka_admin, kafka_admin_opts, kafka_producer, kafka_topics, kafka_config) = {
        use rdkafka::admin::{AdminClient, AdminOptions};
        use rdkafka::producer::FutureProducer;

        let mut kafka_config = create_new_client_config_simple();
        kafka_config.set("bootstrap.servers", &config.kafka_addr);
        kafka_config.set("group.id", "materialize-testdrive");
        kafka_config.set("auto.offset.reset", "earliest");
        kafka_config.set("isolation.level", "read_committed");
        if let Some(cert_path) = &config.cert_path {
            kafka_config.set("security.protocol", "ssl");
            kafka_config.set("ssl.keystore.location", cert_path);
            if let Some(cert_password) = &config.cert_password {
                kafka_config.set("ssl.keystore.password", cert_password);
            }
        }
        kafka_config.set("message.max.bytes", "15728640");

        for (key, value) in &config.kafka_opts {
            kafka_config.set(key, value);
        }

        let admin: AdminClient<_> = kafka_config
            .create_with_context(MzClientContext::default())
            .with_context(|| format!("opening Kafka connection: {}", config.kafka_addr))?;

        let admin_opts = AdminOptions::new().operation_timeout(Some(config.default_timeout));

        let producer: FutureProducer<_> = kafka_config
            .create_with_context(MzClientContext::default())
            .with_context(|| format!("opening Kafka producer connection: {}", config.kafka_addr))?;

        let topics = BTreeMap::new();

        (
            config.kafka_addr.to_owned(),
            admin,
            admin_opts,
            producer,
            topics,
            kafka_config,
        )
    };

    let mut state = State {
        config: config.clone(),

        // === Testdrive state. ===
        arg_vars: config.arg_vars.clone(),
        cmd_vars: BTreeMap::new(),
        seed,
        temp_path,
        _tempfile,
        default_timeout: config.default_timeout,
        timeout: config.default_timeout,
        max_tries: config.default_max_tries,
        initial_backoff: config.initial_backoff,
        backoff_factor: config.backoff_factor,
        consistency_checks: config.consistency_checks,
        check_statement_logging: config.check_statement_logging,
        consistency_checks_adhoc_skip: false,
        regex: None,
        regex_replacement: set::DEFAULT_REGEX_REPLACEMENT.into(),
        rewrite_results: config.rewrite_results,
        error_line_count: 0,
        error_string: "".to_string(),

        // === Materialize state. ===
        materialize: materialize_state,

        // === Persist state. ===
        persist_consensus_url: config.persist_consensus_url.clone(),
        persist_blob_url: config.persist_blob_url.clone(),
        build_info: config.build_info,
        persist_clients: PersistClientCache::new(
            PersistConfig::new_default_configs(config.build_info, SYSTEM_TIME.clone()),
            &MetricsRegistry::new(),
            |_, _| PubSubClientConnection::noop(),
        ),

        // === Confluent state. ===
        schema_registry_url,
        ccsr_client,
        kafka_addr,
        kafka_admin,
        kafka_admin_opts,
        kafka_config,
        kafka_default_partitions: config.kafka_default_partitions,
        kafka_producer,
        kafka_topics,

        // === AWS state. ===
        aws_account: config.aws_account.clone(),
        aws_config: config.aws_config.clone(),

        // === Database driver state. ===
        duckdb_clients: BTreeMap::new(),
        mysql_clients: BTreeMap::new(),
        postgres_clients: BTreeMap::new(),
        sql_server_clients: BTreeMap::new(),

        // === Fivetran state. ===
        fivetran_destination_url: config.fivetran_destination_url.clone(),
        fivetran_destination_files_path: config.fivetran_destination_files_path.clone(),

        rewrites: Vec::new(),
        rewrite_pos_start: 0,
        rewrite_pos_end: 0,
    };
    state.initialize_cmd_vars().await?;
    Ok((state, pgconn_task))
}

async fn create_materialize_state(
    config: &Config,
    materialize_catalog_config: Option<CatalogConfig>,
    pgclient: tokio_postgres::Client,
) -> Result<MaterializeState, anyhow::Error> {
    let materialize_url = util::postgres::config_url(&config.materialize_pgconfig)?;
    let materialize_internal_url =
        util::postgres::config_url(&config.materialize_internal_pgconfig)?;

    for (key, value) in &config.materialize_params {
        pgclient
            .batch_execute(&format!("SET {key} = {value}"))
            .await
            .context("setting session parameter")?;
    }

    let materialize_user = config
        .materialize_pgconfig
        .get_user()
        .expect("testdrive URL must contain user")
        .to_string();

    let materialize_sql_addr = format!(
        "{}:{}",
        materialize_url.host_str().unwrap(),
        materialize_url.port().unwrap()
    );
    let materialize_http_addr = format!(
        "{}:{}",
        materialize_url.host_str().unwrap(),
        config.materialize_http_port
    );
    let materialize_internal_sql_addr = format!(
        "{}:{}",
        materialize_internal_url.host_str().unwrap(),
        materialize_internal_url.port().unwrap()
    );
    let materialize_password_sql_addr = format!(
        "{}:{}",
        materialize_url.host_str().unwrap(),
        config.materialize_password_sql_port
    );
    let materialize_sasl_sql_addr = format!(
        "{}:{}",
        materialize_url.host_str().unwrap(),
        config.materialize_sasl_sql_port
    );
    let materialize_internal_http_addr = format!(
        "{}:{}",
        materialize_internal_url.host_str().unwrap(),
        config.materialize_internal_http_port
    );
    let environment_id = pgclient
        .query_one("SELECT mz_environment_id()", &[])
        .await?
        .get::<_, String>(0)
        .parse()
        .context("parsing environment ID")?;

    let bootstrap_args = BootstrapArgs {
        cluster_replica_size_map: config.materialize_cluster_replica_sizes.clone(),
        default_cluster_replica_size: "ABC".to_string(),
        default_cluster_replication_factor: 1,
        bootstrap_role: None,
    };

    let materialize_state = MaterializeState {
        catalog_config: materialize_catalog_config,
        sql_addr: materialize_sql_addr,
        use_https: config.materialize_use_https,
        http_addr: materialize_http_addr,
        internal_sql_addr: materialize_internal_sql_addr,
        internal_http_addr: materialize_internal_http_addr,
        password_sql_addr: materialize_password_sql_addr,
        sasl_sql_addr: materialize_sasl_sql_addr,
        user: materialize_user,
        pgclient,
        environment_id,
        bootstrap_args,
    };

    Ok(materialize_state)
}