mz_testdrive/
action.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeMap;
11use std::future::Future;
12use std::net::ToSocketAddrs;
13use std::path::PathBuf;
14use std::sync::LazyLock;
15use std::time::Duration;
16use std::{env, fs};
17
18use anyhow::{Context, anyhow, bail};
19use async_trait::async_trait;
20use aws_credential_types::provider::ProvideCredentials;
21use aws_types::SdkConfig;
22use futures::future::FutureExt;
23use itertools::Itertools;
24use mz_adapter::catalog::{Catalog, ConnCatalog};
25use mz_adapter::session::Session;
26use mz_build_info::BuildInfo;
27use mz_catalog::config::ClusterReplicaSizeMap;
28use mz_catalog::durable::BootstrapArgs;
29use mz_kafka_util::client::{MzClientContext, create_new_client_config_simple};
30use mz_ore::error::ErrorExt;
31use mz_ore::metrics::MetricsRegistry;
32use mz_ore::now::SYSTEM_TIME;
33use mz_ore::retry::Retry;
34use mz_ore::task;
35use mz_ore::url::SensitiveUrl;
36use mz_persist_client::cache::PersistClientCache;
37use mz_persist_client::cfg::PersistConfig;
38use mz_persist_client::rpc::PubSubClientConnection;
39use mz_persist_client::{PersistClient, PersistLocation};
40use mz_sql::catalog::EnvironmentId;
41use mz_tls_util::make_tls;
42use rdkafka::ClientConfig;
43use rdkafka::producer::Producer;
44use regex::{Captures, Regex};
45use semver::Version;
46use tokio_postgres::error::{DbError, SqlState};
47use tracing::info;
48use url::Url;
49
50use crate::error::PosError;
51use crate::parser::{
52    Command, PosCommand, SqlExpectedError, SqlOutput, VersionConstraint, validate_ident,
53};
54use crate::util;
55use crate::util::postgres::postgres_client;
56
57pub mod consistency;
58
59mod file;
60mod fivetran;
61mod http;
62mod kafka;
63mod mysql;
64mod nop;
65mod persist;
66mod postgres;
67mod protobuf;
68mod psql;
69mod s3;
70mod schema_registry;
71mod set;
72mod skip_end;
73mod skip_if;
74mod sleep;
75mod sql;
76mod sql_server;
77mod version_check;
78mod webhook;
79
80/// User-settable configuration parameters.
81#[derive(Debug, Clone)]
82pub struct Config {
83    // === Testdrive options. ===
84    /// Variables to make available to the testdrive script.
85    ///
86    /// The value of each entry will be made available to the script in a
87    /// variable named `arg.KEY`.
88    pub arg_vars: BTreeMap<String, String>,
89    /// A random number to distinguish each run of a testdrive script.
90    pub seed: Option<u32>,
91    /// Whether to reset Materialize state before executing each script and
92    /// to clean up AWS state after each script.
93    pub reset: bool,
94    /// Force the use of the specified temporary directory to use.
95    ///
96    /// If unspecified, testdrive creates a temporary directory with a random
97    /// name.
98    pub temp_dir: Option<String>,
99    /// Source string to print out on errors.
100    pub source: Option<String>,
101    /// The default timeout for cancellable operations.
102    pub default_timeout: Duration,
103    /// The default number of tries for retriable operations.
104    pub default_max_tries: usize,
105    /// The initial backoff interval for retry operations.
106    ///
107    /// Set to 0 to retry immediately on failure.
108    pub initial_backoff: Duration,
109    /// Backoff factor to use for retry operations.
110    ///
111    /// Set to 1 to retry at a steady pace.
112    pub backoff_factor: f64,
113    /// Should we skip coordinator and catalog consistency checks.
114    pub consistency_checks: consistency::Level,
115    /// Whether to run statement logging consistency checks (adds a few seconds at the end of every
116    /// test file).
117    pub check_statement_logging: bool,
118    /// Whether to automatically rewrite wrong results instead of failing.
119    pub rewrite_results: bool,
120
121    // === Materialize options. ===
122    /// The pgwire connection parameters for the Materialize instance that
123    /// testdrive will connect to.
124    pub materialize_pgconfig: tokio_postgres::Config,
125    /// The internal pgwire connection parameters for the Materialize instance that
126    /// testdrive will connect to.
127    pub materialize_internal_pgconfig: tokio_postgres::Config,
128    /// Whether to use HTTPS instead of plain HTTP for the HTTP(S) connections.
129    pub materialize_use_https: bool,
130    /// The port for the public endpoints of the materialize instance that
131    /// testdrive will connect to via HTTP.
132    pub materialize_http_port: u16,
133    /// The port for the internal endpoints of the materialize instance that
134    /// testdrive will connect to via HTTP.
135    pub materialize_internal_http_port: u16,
136    /// The port for the password endpoints of the materialize instance that
137    /// testdrive will connect to via SQL.
138    pub materialize_password_sql_port: u16,
139    /// The port for the SASL endpoints of the materialize instance that
140    /// testdrive will connect to via SQL.
141    pub materialize_sasl_sql_port: u16,
142    /// Session parameters to set after connecting to materialize.
143    pub materialize_params: Vec<(String, String)>,
144    /// An optional catalog configuration.
145    pub materialize_catalog_config: Option<CatalogConfig>,
146    /// Build information
147    pub build_info: &'static BuildInfo,
148    /// Configured cluster replica sizes
149    pub materialize_cluster_replica_sizes: ClusterReplicaSizeMap,
150
151    // === Persist options. ===
152    /// Handle to the persist consensus system.
153    pub persist_consensus_url: Option<SensitiveUrl>,
154    /// Handle to the persist blob storage.
155    pub persist_blob_url: Option<SensitiveUrl>,
156
157    // === Confluent options. ===
158    /// The address of the Kafka broker that testdrive will interact with.
159    pub kafka_addr: String,
160    /// Default number of partitions to use for topics
161    pub kafka_default_partitions: usize,
162    /// Arbitrary rdkafka options for testdrive to use when connecting to the
163    /// Kafka broker.
164    pub kafka_opts: Vec<(String, String)>,
165    /// The URL of the schema registry that testdrive will connect to.
166    pub schema_registry_url: Url,
167    /// An optional path to a TLS certificate that testdrive will present when
168    /// performing client authentication.
169    ///
170    /// The keystore must be in the PKCS#12 format.
171    pub cert_path: Option<String>,
172    /// An optional password for the TLS certificate.
173    pub cert_password: Option<String>,
174    /// An optional username for basic authentication with the Confluent Schema
175    /// Registry.
176    pub ccsr_username: Option<String>,
177    /// An optional password for basic authentication with the Confluent Schema
178    /// Registry.
179    pub ccsr_password: Option<String>,
180
181    // === AWS options. ===
182    /// The configuration to use when connecting to AWS.
183    pub aws_config: SdkConfig,
184    /// The ID of the AWS account that `aws_config` configures.
185    pub aws_account: String,
186
187    // === Fivetran options. ===
188    /// Address of the Fivetran Destination that is currently running.
189    pub fivetran_destination_url: String,
190    /// Directory that is accessible to the Fivetran Destination.
191    pub fivetran_destination_files_path: String,
192}
193
194pub struct MaterializeState {
195    catalog_config: Option<CatalogConfig>,
196
197    sql_addr: String,
198    use_https: bool,
199    http_addr: String,
200    internal_sql_addr: String,
201    internal_http_addr: String,
202    password_sql_addr: String,
203    sasl_sql_addr: String,
204    user: String,
205    pgclient: tokio_postgres::Client,
206    environment_id: EnvironmentId,
207    bootstrap_args: BootstrapArgs,
208}
209
210pub struct State {
211    // The Config that this `State` was originally created from.
212    pub config: Config,
213
214    // === Testdrive state. ===
215    arg_vars: BTreeMap<String, String>,
216    cmd_vars: BTreeMap<String, String>,
217    seed: u32,
218    temp_path: PathBuf,
219    _tempfile: Option<tempfile::TempDir>,
220    default_timeout: Duration,
221    timeout: Duration,
222    max_tries: usize,
223    initial_backoff: Duration,
224    backoff_factor: f64,
225    consistency_checks: consistency::Level,
226    check_statement_logging: bool,
227    consistency_checks_adhoc_skip: bool,
228    regex: Option<Regex>,
229    regex_replacement: String,
230    error_line_count: usize,
231    error_string: String,
232
233    // === Materialize state. ===
234    materialize: MaterializeState,
235
236    // === Persist state. ===
237    persist_consensus_url: Option<SensitiveUrl>,
238    persist_blob_url: Option<SensitiveUrl>,
239    build_info: &'static BuildInfo,
240    persist_clients: PersistClientCache,
241
242    // === Confluent state. ===
243    schema_registry_url: Url,
244    ccsr_client: mz_ccsr::Client,
245    kafka_addr: String,
246    kafka_admin: rdkafka::admin::AdminClient<MzClientContext>,
247    kafka_admin_opts: rdkafka::admin::AdminOptions,
248    kafka_config: ClientConfig,
249    kafka_default_partitions: usize,
250    kafka_producer: rdkafka::producer::FutureProducer<MzClientContext>,
251    kafka_topics: BTreeMap<String, usize>,
252
253    // === AWS state. ===
254    aws_account: String,
255    aws_config: SdkConfig,
256
257    // === Database driver state. ===
258    mysql_clients: BTreeMap<String, mysql_async::Conn>,
259    postgres_clients: BTreeMap<String, tokio_postgres::Client>,
260    sql_server_clients: BTreeMap<String, mz_sql_server_util::Client>,
261
262    // === Fivetran state. ===
263    fivetran_destination_url: String,
264    fivetran_destination_files_path: String,
265
266    // === Rewrite state. ===
267    rewrite_results: bool,
268    /// Current file, results are replaced inline
269    pub rewrites: Vec<Rewrite>,
270    /// Start position of currently expected result
271    pub rewrite_pos_start: usize,
272    /// End position of currently expected result
273    pub rewrite_pos_end: usize,
274}
275
/// A single pending replacement of an expected result in a script file.
pub struct Rewrite {
    /// The replacement text.
    pub content: String,
    /// Start position of the span being replaced.
    // NOTE(review): presumably a byte offset into the script — confirm
    // against the code that applies rewrites.
    pub start: usize,
    /// End position of the span being replaced.
    pub end: usize,
}
281
282impl State {
283    pub async fn initialize_cmd_vars(&mut self) -> Result<(), anyhow::Error> {
284        self.cmd_vars
285            .insert("testdrive.kafka-addr".into(), self.kafka_addr.clone());
286        self.cmd_vars.insert(
287            "testdrive.kafka-addr-resolved".into(),
288            self.kafka_addr
289                .to_socket_addrs()
290                .ok()
291                .and_then(|mut addrs| addrs.next())
292                .map(|addr| addr.to_string())
293                .unwrap_or_else(|| "#RESOLUTION-FAILURE#".into()),
294        );
295        self.cmd_vars.insert(
296            "testdrive.schema-registry-url".into(),
297            self.schema_registry_url.to_string(),
298        );
299        self.cmd_vars
300            .insert("testdrive.seed".into(), self.seed.to_string());
301        self.cmd_vars.insert(
302            "testdrive.temp-dir".into(),
303            self.temp_path.display().to_string(),
304        );
305        self.cmd_vars
306            .insert("testdrive.aws-region".into(), self.aws_region().into());
307        self.cmd_vars
308            .insert("testdrive.aws-endpoint".into(), self.aws_endpoint().into());
309        self.cmd_vars
310            .insert("testdrive.aws-account".into(), self.aws_account.clone());
311        {
312            let aws_credentials = self
313                .aws_config
314                .credentials_provider()
315                .ok_or_else(|| anyhow!("no AWS credentials provider configured"))?
316                .provide_credentials()
317                .await
318                .context("fetching AWS credentials")?;
319            self.cmd_vars.insert(
320                "testdrive.aws-access-key-id".into(),
321                aws_credentials.access_key_id().to_owned(),
322            );
323            self.cmd_vars.insert(
324                "testdrive.aws-secret-access-key".into(),
325                aws_credentials.secret_access_key().to_owned(),
326            );
327            self.cmd_vars.insert(
328                "testdrive.aws-token".into(),
329                aws_credentials
330                    .session_token()
331                    .map(|token| token.to_owned())
332                    .unwrap_or_else(String::new),
333            );
334        }
335        self.cmd_vars.insert(
336            "testdrive.materialize-environment-id".into(),
337            self.materialize.environment_id.to_string(),
338        );
339        self.cmd_vars.insert(
340            "testdrive.materialize-sql-addr".into(),
341            self.materialize.sql_addr.clone(),
342        );
343        self.cmd_vars.insert(
344            "testdrive.materialize-internal-sql-addr".into(),
345            self.materialize.internal_sql_addr.clone(),
346        );
347        self.cmd_vars.insert(
348            "testdrive.materialize-password-sql-addr".into(),
349            self.materialize.password_sql_addr.clone(),
350        );
351        self.cmd_vars.insert(
352            "testdrive.materialize-sasl-sql-addr".into(),
353            self.materialize.sasl_sql_addr.clone(),
354        );
355        self.cmd_vars.insert(
356            "testdrive.materialize-user".into(),
357            self.materialize.user.clone(),
358        );
359        self.cmd_vars.insert(
360            "testdrive.fivetran-destination-url".into(),
361            self.fivetran_destination_url.clone(),
362        );
363        self.cmd_vars.insert(
364            "testdrive.fivetran-destination-files-path".into(),
365            self.fivetran_destination_files_path.clone(),
366        );
367
368        for (key, value) in env::vars() {
369            self.cmd_vars.insert(format!("env.{}", key), value);
370        }
371
372        for (key, value) in &self.arg_vars {
373            validate_ident(key)?;
374            self.cmd_vars
375                .insert(format!("arg.{}", key), value.to_string());
376        }
377
378        Ok(())
379    }
380    /// Makes of copy of the durable catalog and runs a function on its
381    /// state. Returns `None` if there's no catalog information in the State.
382    pub async fn with_catalog_copy<F, T>(
383        &self,
384        system_parameter_defaults: BTreeMap<String, String>,
385        build_info: &'static BuildInfo,
386        bootstrap_args: &BootstrapArgs,
387        enable_expression_cache_override: Option<bool>,
388        f: F,
389    ) -> Result<Option<T>, anyhow::Error>
390    where
391        F: FnOnce(ConnCatalog) -> T,
392    {
393        async fn persist_client(
394            persist_consensus_url: SensitiveUrl,
395            persist_blob_url: SensitiveUrl,
396            persist_clients: &PersistClientCache,
397        ) -> Result<PersistClient, anyhow::Error> {
398            let persist_location = PersistLocation {
399                blob_uri: persist_blob_url,
400                consensus_uri: persist_consensus_url,
401            };
402            Ok(persist_clients.open(persist_location).await?)
403        }
404
405        if let Some(CatalogConfig {
406            persist_consensus_url,
407            persist_blob_url,
408        }) = &self.materialize.catalog_config
409        {
410            let persist_client = persist_client(
411                persist_consensus_url.clone(),
412                persist_blob_url.clone(),
413                &self.persist_clients,
414            )
415            .await?;
416            let catalog = Catalog::open_debug_read_only_persist_catalog_config(
417                persist_client,
418                SYSTEM_TIME.clone(),
419                self.materialize.environment_id.clone(),
420                system_parameter_defaults,
421                build_info,
422                bootstrap_args,
423                enable_expression_cache_override,
424            )
425            .await?;
426            let res = f(catalog.for_session(&Session::dummy()));
427            catalog.expire().await;
428            Ok(Some(res))
429        } else {
430            Ok(None)
431        }
432    }
433
434    pub fn aws_endpoint(&self) -> &str {
435        self.aws_config.endpoint_url().unwrap_or("")
436    }
437
438    pub fn aws_region(&self) -> &str {
439        self.aws_config.region().map(|r| r.as_ref()).unwrap_or("")
440    }
441
442    /// Resets the adhoc skip consistency check that users can toggle per-file, and returns whether
443    /// the consistency checks should be skipped for this current run.
444    pub fn clear_skip_consistency_checks(&mut self) -> bool {
445        std::mem::replace(&mut self.consistency_checks_adhoc_skip, false)
446    }
447
448    pub async fn reset_materialize(&self) -> Result<(), anyhow::Error> {
449        let (inner_client, _) = postgres_client(
450            &format!(
451                "postgres://mz_system:materialize@{}",
452                self.materialize.internal_sql_addr
453            ),
454            self.default_timeout,
455        )
456        .await?;
457
458        let version = inner_client
459            .query_one("SELECT mz_version_num()", &[])
460            .await
461            .context("getting version of materialize")
462            .map(|row| row.get::<_, i32>(0))?;
463
464        let semver = inner_client
465            .query_one("SELECT right(split_part(mz_version(), ' ', 1), -1)", &[])
466            .await
467            .context("getting semver of materialize")
468            .map(|row| row.get::<_, String>(0))?
469            .parse::<semver::Version>()
470            .context("parsing semver of materialize")?;
471
472        inner_client
473            .batch_execute("ALTER SYSTEM RESET ALL")
474            .await
475            .context("resetting materialize state: ALTER SYSTEM RESET ALL")?;
476
477        // Dangerous functions are useful for tests so we enable it for all tests.
478        {
479            let rename_version = Version::parse("0.128.0-dev.1").expect("known to be valid");
480            let enable_unsafe_functions = if semver >= rename_version {
481                "unsafe_enable_unsafe_functions"
482            } else {
483                "enable_unsafe_functions"
484            };
485            let res = inner_client
486                .batch_execute(&format!("ALTER SYSTEM SET {enable_unsafe_functions} = on"))
487                .await
488                .context("enabling dangerous functions");
489            if let Err(e) = res {
490                match e.root_cause().downcast_ref::<DbError>() {
491                    Some(e) if *e.code() == SqlState::CANT_CHANGE_RUNTIME_PARAM => {
492                        info!(
493                            "can't enable unsafe functions because the server is safe mode; \
494                             testdrive scripts will fail if they use unsafe functions",
495                        );
496                    }
497                    _ => return Err(e),
498                }
499            }
500        }
501
502        for row in inner_client
503            .query("SHOW DATABASES", &[])
504            .await
505            .context("resetting materialize state: SHOW DATABASES")?
506        {
507            let db_name: String = row.get(0);
508            if db_name.starts_with("testdrive_no_reset_") {
509                continue;
510            }
511            let query = format!(
512                "DROP DATABASE {}",
513                postgres_protocol::escape::escape_identifier(&db_name)
514            );
515            sql::print_query(&query, None);
516            inner_client.batch_execute(&query).await.context(format!(
517                "resetting materialize state: DROP DATABASE {}",
518                db_name,
519            ))?;
520        }
521
522        // Get all user clusters not running any objects owned by users
523        let inactive_user_clusters = "
524        WITH
525            active_user_clusters AS
526            (
527                SELECT DISTINCT cluster_id, object_id
528                FROM
529                    (
530                        SELECT cluster_id, id FROM mz_catalog.mz_sources
531                        UNION ALL SELECT cluster_id, id FROM mz_catalog.mz_sinks
532                        UNION ALL
533                            SELECT cluster_id, id
534                            FROM mz_catalog.mz_materialized_views
535                        UNION ALL
536                            SELECT cluster_id, id FROM mz_catalog.mz_indexes
537                        UNION ALL
538                            SELECT cluster_id, id
539                            FROM mz_internal.mz_subscriptions
540                    )
541                    AS t (cluster_id, object_id)
542                WHERE cluster_id IS NOT NULL AND object_id LIKE 'u%'
543            )
544        SELECT name
545        FROM mz_catalog.mz_clusters
546        WHERE
547            id NOT IN ( SELECT cluster_id FROM active_user_clusters ) AND id LIKE 'u%'
548                AND
549            owner_id LIKE 'u%';";
550
551        let inactive_clusters = inner_client
552            .query(inactive_user_clusters, &[])
553            .await
554            .context("resetting materialize state: inactive_user_clusters")?;
555
556        if !inactive_clusters.is_empty() {
557            println!("cleaning up user clusters from previous tests...")
558        }
559
560        for cluster_name in inactive_clusters {
561            let cluster_name: String = cluster_name.get(0);
562            if cluster_name.starts_with("testdrive_no_reset_") {
563                continue;
564            }
565            let query = format!(
566                "DROP CLUSTER {}",
567                postgres_protocol::escape::escape_identifier(&cluster_name)
568            );
569            sql::print_query(&query, None);
570            inner_client.batch_execute(&query).await.context(format!(
571                "resetting materialize state: DROP CLUSTER {}",
572                cluster_name,
573            ))?;
574        }
575
576        inner_client
577            .batch_execute("CREATE DATABASE materialize")
578            .await
579            .context("resetting materialize state: CREATE DATABASE materialize")?;
580
581        // Attempt to remove all users but the current user. Old versions of
582        // Materialize did not support roles, so this degrades gracefully if
583        // mz_roles does not exist.
584        if let Ok(rows) = inner_client.query("SELECT name FROM mz_roles", &[]).await {
585            for row in rows {
586                let role_name: String = row.get(0);
587                if role_name == self.materialize.user || role_name.starts_with("mz_") {
588                    continue;
589                }
590                let query = format!(
591                    "DROP ROLE {}",
592                    postgres_protocol::escape::escape_identifier(&role_name)
593                );
594                sql::print_query(&query, None);
595                inner_client.batch_execute(&query).await.context(format!(
596                    "resetting materialize state: DROP ROLE {}",
597                    role_name,
598                ))?;
599            }
600        }
601
602        // Alter materialize user with all system privileges.
603        inner_client
604            .batch_execute(&format!(
605                "GRANT ALL PRIVILEGES ON SYSTEM TO {}",
606                self.materialize.user
607            ))
608            .await?;
609
610        // Grant initial privileges.
611        inner_client
612            .batch_execute("GRANT USAGE ON DATABASE materialize TO PUBLIC")
613            .await?;
614        inner_client
615            .batch_execute(&format!(
616                "GRANT ALL PRIVILEGES ON DATABASE materialize TO {}",
617                self.materialize.user
618            ))
619            .await?;
620        inner_client
621            .batch_execute(&format!(
622                "GRANT ALL PRIVILEGES ON SCHEMA materialize.public TO {}",
623                self.materialize.user
624            ))
625            .await?;
626
627        let cluster = match version {
628            ..=8199 => "default",
629            8200.. => "quickstart",
630        };
631        inner_client
632            .batch_execute(&format!("GRANT USAGE ON CLUSTER {cluster} TO PUBLIC"))
633            .await?;
634        inner_client
635            .batch_execute(&format!(
636                "GRANT ALL PRIVILEGES ON CLUSTER {cluster} TO {}",
637                self.materialize.user
638            ))
639            .await?;
640
641        Ok(())
642    }
643
644    /// Delete Kafka topics + CCSR subjects that were created in this run
645    pub async fn reset_kafka(&self) -> Result<(), anyhow::Error> {
646        let mut errors: Vec<anyhow::Error> = Vec::new();
647
648        let metadata = self.kafka_producer.client().fetch_metadata(
649            None,
650            Some(std::cmp::max(Duration::from_secs(1), self.default_timeout)),
651        )?;
652
653        let testdrive_topics: Vec<_> = metadata
654            .topics()
655            .iter()
656            .filter_map(|t| {
657                if t.name().starts_with("testdrive-") {
658                    Some(t.name())
659                } else {
660                    None
661                }
662            })
663            .collect();
664
665        if !testdrive_topics.is_empty() {
666            match self
667                .kafka_admin
668                .delete_topics(&testdrive_topics, &self.kafka_admin_opts)
669                .await
670            {
671                Ok(res) => {
672                    if res.len() != testdrive_topics.len() {
673                        errors.push(anyhow!(
674                            "kafka topic deletion returned {} results, but exactly {} expected",
675                            res.len(),
676                            testdrive_topics.len()
677                        ));
678                    }
679                    for (res, topic) in res.iter().zip_eq(testdrive_topics.iter()) {
680                        match res {
681                            Ok(_)
682                            | Err((_, rdkafka::types::RDKafkaErrorCode::UnknownTopicOrPartition)) => {
683                                ()
684                            }
685                            Err((_, err)) => {
686                                errors.push(anyhow!("unable to delete {}: {}", topic, err));
687                            }
688                        }
689                    }
690                }
691                Err(e) => {
692                    errors.push(e.into());
693                }
694            };
695        }
696
697        match self
698            .ccsr_client
699            .list_subjects()
700            .await
701            .context("listing schema registry subjects")
702        {
703            Ok(subjects) => {
704                let testdrive_subjects: Vec<_> = subjects
705                    .iter()
706                    .filter(|s| s.starts_with("testdrive-"))
707                    .collect();
708
709                for subject in testdrive_subjects {
710                    match self.ccsr_client.delete_subject(subject).await {
711                        Ok(()) | Err(mz_ccsr::DeleteError::SubjectNotFound) => (),
712                        Err(e) => errors.push(e.into()),
713                    }
714                }
715            }
716            Err(e) => {
717                errors.push(e);
718            }
719        }
720
721        if errors.is_empty() {
722            Ok(())
723        } else {
724            bail!(
725                "deleting Kafka topics: {} errors: {}",
726                errors.len(),
727                errors
728                    .into_iter()
729                    .map(|e| e.to_string_with_causes())
730                    .join("\n")
731            );
732        }
733    }
734}
735
736/// Configuration for the Catalog.
737#[derive(Debug, Clone)]
738pub struct CatalogConfig {
739    /// Handle to the persist consensus system.
740    pub persist_consensus_url: SensitiveUrl,
741    /// Handle to the persist blob storage.
742    pub persist_blob_url: SensitiveUrl,
743}
744
/// What the script runner should do after a command has run.
pub enum ControlFlow {
    /// Carry on with the next command.
    Continue,
    // NOTE(review): presumably opens a skipped region (cf. the `skip-if`
    // builtin) — confirm against the runner.
    SkipBegin,
    // NOTE(review): presumably closes a skipped region (cf. the `skip-end`
    // builtin) — confirm against the runner.
    SkipEnd,
}
750
751#[async_trait]
752pub(crate) trait Run {
753    async fn run(self, state: &mut State) -> Result<ControlFlow, PosError>;
754}
755
756#[async_trait]
757impl Run for PosCommand {
758    async fn run(self, state: &mut State) -> Result<ControlFlow, PosError> {
759        macro_rules! handle_version {
760            ($version_constraint:expr) => {
761                match $version_constraint {
762                    Some(VersionConstraint { min, max }) => {
763                        match version_check::run_version_check(min, max, state).await {
764                            Ok(true) => return Ok(ControlFlow::Continue),
765                            Ok(false) => {}
766                            Err(err) => return Err(PosError::new(err, self.pos)),
767                        }
768                    }
769                    None => {}
770                }
771            };
772        }
773
774        let wrap_err = |e| PosError::new(e, self.pos);
775        // Substitute variables at startup except for the command-specific ones
776        // Those will be substituted at runtime
777        let ignore_prefix = match &self.command {
778            Command::Builtin(builtin, _) => Some(builtin.name.clone()),
779            _ => None,
780        };
781        let subst = |msg: &str, vars: &BTreeMap<String, String>| {
782            substitute_vars(msg, vars, &ignore_prefix, false).map_err(wrap_err)
783        };
784        let subst_re = |msg: &str, vars: &BTreeMap<String, String>| {
785            substitute_vars(msg, vars, &ignore_prefix, true).map_err(wrap_err)
786        };
787
788        let r = match self.command {
789            Command::Builtin(mut builtin, version_constraint) => {
790                handle_version!(version_constraint);
791                for val in builtin.args.values_mut() {
792                    *val = subst(val, &state.cmd_vars)?;
793                }
794                for line in &mut builtin.input {
795                    *line = subst(line, &state.cmd_vars)?;
796                }
797                match builtin.name.as_ref() {
798                    "check-consistency" => consistency::run_consistency_checks(state).await,
799                    "skip-consistency-checks" => {
800                        consistency::skip_consistency_checks(builtin, state)
801                    }
802                    "check-shard-tombstone" => {
803                        consistency::run_check_shard_tombstone(builtin, state).await
804                    }
805                    "fivetran-destination" => {
806                        fivetran::run_destination_command(builtin, state).await
807                    }
808                    "file-append" => file::run_append(builtin, state).await,
809                    "file-delete" => file::run_delete(builtin, state).await,
810                    "http-request" => http::run_request(builtin, state).await,
811                    "kafka-add-partitions" => kafka::run_add_partitions(builtin, state).await,
812                    "kafka-create-topic" => kafka::run_create_topic(builtin, state).await,
813                    "kafka-wait-topic" => kafka::run_wait_topic(builtin, state).await,
814                    "kafka-delete-records" => kafka::run_delete_records(builtin, state).await,
815                    "kafka-delete-topic-flaky" => kafka::run_delete_topic(builtin, state).await,
816                    "kafka-ingest" => kafka::run_ingest(builtin, state).await,
817                    "kafka-verify-data" => kafka::run_verify_data(builtin, state).await,
818                    "kafka-verify-commit" => kafka::run_verify_commit(builtin, state).await,
819                    "kafka-verify-topic" => kafka::run_verify_topic(builtin, state).await,
820                    "mysql-connect" => mysql::run_connect(builtin, state).await,
821                    "mysql-execute" => mysql::run_execute(builtin, state).await,
822                    "nop" => nop::run_nop(),
823                    "postgres-connect" => postgres::run_connect(builtin, state).await,
824                    "postgres-execute" => postgres::run_execute(builtin, state).await,
825                    "postgres-verify-slot" => postgres::run_verify_slot(builtin, state).await,
826                    "protobuf-compile-descriptors" => {
827                        protobuf::run_compile_descriptors(builtin, state).await
828                    }
829                    "psql-execute" => psql::run_execute(builtin, state).await,
830                    "s3-verify-data" => s3::run_verify_data(builtin, state).await,
831                    "s3-verify-keys" => s3::run_verify_keys(builtin, state).await,
832                    "s3-file-upload" => s3::run_upload(builtin, state).await,
833                    "s3-set-presigned-url" => s3::run_set_presigned_url(builtin, state).await,
834                    "schema-registry-publish" => schema_registry::run_publish(builtin, state).await,
835                    "schema-registry-verify" => schema_registry::run_verify(builtin, state).await,
836                    "schema-registry-wait" => schema_registry::run_wait(builtin, state).await,
837                    "skip-if" => skip_if::run_skip_if(builtin, state).await,
838                    "skip-end" => skip_end::run_skip_end(),
839                    "sql-server-connect" => sql_server::run_connect(builtin, state).await,
840                    "sql-server-execute" => sql_server::run_execute(builtin, state).await,
841                    "sql-server-set-from-sql" => sql_server::run_set_from_sql(builtin, state).await,
842                    "persist-force-compaction" => {
843                        persist::run_force_compaction(builtin, state).await
844                    }
845                    "random-sleep" => sleep::run_random_sleep(builtin),
846                    "set-regex" => set::run_regex_set(builtin, state),
847                    "unset-regex" => set::run_regex_unset(builtin, state),
848                    "set-sql-timeout" => set::run_sql_timeout(builtin, state),
849                    "set-max-tries" => set::run_max_tries(builtin, state),
850                    "sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment" => {
851                        sleep::run_sleep(builtin)
852                    }
853                    "set" => set::set_vars(builtin, state),
854                    "set-arg-default" => set::run_set_arg_default(builtin, state),
855                    "set-from-sql" => set::run_set_from_sql(builtin, state).await,
856                    "set-from-file" => set::run_set_from_file(builtin, state).await,
857                    "webhook-append" => webhook::run_append(builtin, state).await,
858                    _ => {
859                        return Err(PosError::new(
860                            anyhow!("unknown built-in command {}", builtin.name),
861                            self.pos,
862                        ));
863                    }
864                }
865            }
866            Command::Sql(mut sql, version_constraint) => {
867                handle_version!(version_constraint);
868                sql.query = subst(&sql.query, &state.cmd_vars)?;
869                if let SqlOutput::Full { expected_rows, .. } = &mut sql.expected_output {
870                    for row in expected_rows {
871                        for col in row {
872                            *col = subst(col, &state.cmd_vars)?;
873                        }
874                    }
875                }
876                sql::run_sql(sql, state).await
877            }
878            Command::FailSql(mut sql, version_constraint) => {
879                handle_version!(version_constraint);
880                sql.query = subst(&sql.query, &state.cmd_vars)?;
881                sql.expected_error = match &sql.expected_error {
882                    SqlExpectedError::Contains(s) => {
883                        SqlExpectedError::Contains(subst(s, &state.cmd_vars)?)
884                    }
885                    SqlExpectedError::Exact(s) => {
886                        SqlExpectedError::Exact(subst(s, &state.cmd_vars)?)
887                    }
888                    SqlExpectedError::Regex(s) => {
889                        SqlExpectedError::Regex(subst_re(s, &state.cmd_vars)?)
890                    }
891                    SqlExpectedError::Timeout => SqlExpectedError::Timeout,
892                };
893                sql::run_fail_sql(sql, state).await
894            }
895        };
896
897        r.map_err(wrap_err)
898    }
899}
900
901/// Substituted `${}`-delimited variables from `vars` into `msg`
902fn substitute_vars(
903    msg: &str,
904    vars: &BTreeMap<String, String>,
905    ignore_prefix: &Option<String>,
906    regex_escape: bool,
907) -> Result<String, anyhow::Error> {
908    static RE: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"\$\{([^}]+)\}").unwrap());
909    let mut err = None;
910    let out = RE.replace_all(msg, |caps: &Captures| {
911        let name = &caps[1];
912        if let Some(ignore_prefix) = &ignore_prefix {
913            if name.starts_with(format!("{}.", ignore_prefix).as_str()) {
914                // Do not substitute, leave original variable name in place
915                return caps.get(0).unwrap().as_str().to_string();
916            }
917        }
918
919        if let Some(val) = vars.get(name) {
920            if regex_escape {
921                regex::escape(val)
922            } else {
923                val.to_string()
924            }
925        } else {
926            err = Some(anyhow!("unknown variable: {}", name));
927            "#VAR-MISSING#".to_string()
928        }
929    });
930    match err {
931        Some(err) => Err(err),
932        None => Ok(out.into_owned()),
933    }
934}
935
936/// Initializes a [`State`] object by connecting to the various external
937/// services specified in `config`.
938///
939/// Returns the initialized `State` and a cleanup future. The cleanup future
940/// should be `await`ed only *after* dropping the `State` to check whether any
941/// errors occured while dropping the `State`. This awkward API is a workaround
942/// for the lack of `AsyncDrop` support in Rust.
943pub async fn create_state(
944    config: &Config,
945) -> Result<(State, impl Future<Output = Result<(), anyhow::Error>>), anyhow::Error> {
946    let seed = config.seed.unwrap_or_else(rand::random);
947
948    let (_tempfile, temp_path) = match &config.temp_dir {
949        Some(temp_dir) => {
950            fs::create_dir_all(temp_dir).context("creating temporary directory")?;
951            (None, PathBuf::from(&temp_dir))
952        }
953        _ => {
954            // Stash the tempfile object so that it does not go out of scope and delete
955            // the tempdir prematurely
956            let tempfile_handle = tempfile::tempdir().context("creating temporary directory")?;
957            let temp_path = tempfile_handle.path().to_path_buf();
958            (Some(tempfile_handle), temp_path)
959        }
960    };
961
962    let materialize_catalog_config = config.materialize_catalog_config.clone();
963
964    let materialize_url = util::postgres::config_url(&config.materialize_pgconfig)?;
965    info!("Connecting to {}", materialize_url.as_str());
966    let (pgclient, pgconn) = Retry::default()
967        .max_duration(config.default_timeout)
968        .retry_async_canceling(|_| async move {
969            let mut pgconfig = config.materialize_pgconfig.clone();
970            pgconfig.connect_timeout(config.default_timeout);
971            let tls = make_tls(&pgconfig)?;
972            pgconfig.connect(tls).await.map_err(|e| anyhow!(e))
973        })
974        .await?;
975
976    let pgconn_task =
977        task::spawn(|| "pgconn_task", pgconn).map(|join| join.context("running SQL connection"));
978
979    let materialize_state =
980        create_materialize_state(&config, materialize_catalog_config, pgclient).await?;
981
982    let schema_registry_url = config.schema_registry_url.to_owned();
983
984    let ccsr_client = {
985        let mut ccsr_config = mz_ccsr::ClientConfig::new(schema_registry_url.clone());
986
987        if let Some(cert_path) = &config.cert_path {
988            let cert = fs::read(cert_path).context("reading cert")?;
989            let pass = config.cert_password.as_deref().unwrap_or("").to_owned();
990            let ident = mz_ccsr::tls::Identity::from_pkcs12_der(cert, pass)
991                .context("reading keystore file as pkcs12")?;
992            ccsr_config = ccsr_config.identity(ident);
993        }
994
995        if let Some(ccsr_username) = &config.ccsr_username {
996            ccsr_config = ccsr_config.auth(ccsr_username.clone(), config.ccsr_password.clone());
997        }
998
999        ccsr_config.build().context("Creating CCSR client")?
1000    };
1001
1002    let (kafka_addr, kafka_admin, kafka_admin_opts, kafka_producer, kafka_topics, kafka_config) = {
1003        use rdkafka::admin::{AdminClient, AdminOptions};
1004        use rdkafka::producer::FutureProducer;
1005
1006        let mut kafka_config = create_new_client_config_simple();
1007        kafka_config.set("bootstrap.servers", &config.kafka_addr);
1008        kafka_config.set("group.id", "materialize-testdrive");
1009        kafka_config.set("auto.offset.reset", "earliest");
1010        kafka_config.set("isolation.level", "read_committed");
1011        if let Some(cert_path) = &config.cert_path {
1012            kafka_config.set("security.protocol", "ssl");
1013            kafka_config.set("ssl.keystore.location", cert_path);
1014            if let Some(cert_password) = &config.cert_password {
1015                kafka_config.set("ssl.keystore.password", cert_password);
1016            }
1017        }
1018        kafka_config.set("message.max.bytes", "15728640");
1019
1020        for (key, value) in &config.kafka_opts {
1021            kafka_config.set(key, value);
1022        }
1023
1024        let admin: AdminClient<_> = kafka_config
1025            .create_with_context(MzClientContext::default())
1026            .with_context(|| format!("opening Kafka connection: {}", config.kafka_addr))?;
1027
1028        let admin_opts = AdminOptions::new().operation_timeout(Some(config.default_timeout));
1029
1030        let producer: FutureProducer<_> = kafka_config
1031            .create_with_context(MzClientContext::default())
1032            .with_context(|| format!("opening Kafka producer connection: {}", config.kafka_addr))?;
1033
1034        let topics = BTreeMap::new();
1035
1036        (
1037            config.kafka_addr.to_owned(),
1038            admin,
1039            admin_opts,
1040            producer,
1041            topics,
1042            kafka_config,
1043        )
1044    };
1045
1046    let mut state = State {
1047        config: config.clone(),
1048
1049        // === Testdrive state. ===
1050        arg_vars: config.arg_vars.clone(),
1051        cmd_vars: BTreeMap::new(),
1052        seed,
1053        temp_path,
1054        _tempfile,
1055        default_timeout: config.default_timeout,
1056        timeout: config.default_timeout,
1057        max_tries: config.default_max_tries,
1058        initial_backoff: config.initial_backoff,
1059        backoff_factor: config.backoff_factor,
1060        consistency_checks: config.consistency_checks,
1061        check_statement_logging: config.check_statement_logging,
1062        consistency_checks_adhoc_skip: false,
1063        regex: None,
1064        regex_replacement: set::DEFAULT_REGEX_REPLACEMENT.into(),
1065        rewrite_results: config.rewrite_results,
1066        error_line_count: 0,
1067        error_string: "".to_string(),
1068
1069        // === Materialize state. ===
1070        materialize: materialize_state,
1071
1072        // === Persist state. ===
1073        persist_consensus_url: config.persist_consensus_url.clone(),
1074        persist_blob_url: config.persist_blob_url.clone(),
1075        build_info: config.build_info,
1076        persist_clients: PersistClientCache::new(
1077            PersistConfig::new_default_configs(config.build_info, SYSTEM_TIME.clone()),
1078            &MetricsRegistry::new(),
1079            |_, _| PubSubClientConnection::noop(),
1080        ),
1081
1082        // === Confluent state. ===
1083        schema_registry_url,
1084        ccsr_client,
1085        kafka_addr,
1086        kafka_admin,
1087        kafka_admin_opts,
1088        kafka_config,
1089        kafka_default_partitions: config.kafka_default_partitions,
1090        kafka_producer,
1091        kafka_topics,
1092
1093        // === AWS state. ===
1094        aws_account: config.aws_account.clone(),
1095        aws_config: config.aws_config.clone(),
1096
1097        // === Database driver state. ===
1098        mysql_clients: BTreeMap::new(),
1099        postgres_clients: BTreeMap::new(),
1100        sql_server_clients: BTreeMap::new(),
1101
1102        // === Fivetran state. ===
1103        fivetran_destination_url: config.fivetran_destination_url.clone(),
1104        fivetran_destination_files_path: config.fivetran_destination_files_path.clone(),
1105
1106        rewrites: Vec::new(),
1107        rewrite_pos_start: 0,
1108        rewrite_pos_end: 0,
1109    };
1110    state.initialize_cmd_vars().await?;
1111    Ok((state, pgconn_task))
1112}
1113
1114async fn create_materialize_state(
1115    config: &&Config,
1116    materialize_catalog_config: Option<CatalogConfig>,
1117    pgclient: tokio_postgres::Client,
1118) -> Result<MaterializeState, anyhow::Error> {
1119    let materialize_url = util::postgres::config_url(&config.materialize_pgconfig)?;
1120    let materialize_internal_url =
1121        util::postgres::config_url(&config.materialize_internal_pgconfig)?;
1122
1123    for (key, value) in &config.materialize_params {
1124        pgclient
1125            .batch_execute(&format!("SET {key} = {value}"))
1126            .await
1127            .context("setting session parameter")?;
1128    }
1129
1130    let materialize_user = config
1131        .materialize_pgconfig
1132        .get_user()
1133        .expect("testdrive URL must contain user")
1134        .to_string();
1135
1136    let materialize_sql_addr = format!(
1137        "{}:{}",
1138        materialize_url.host_str().unwrap(),
1139        materialize_url.port().unwrap()
1140    );
1141    let materialize_http_addr = format!(
1142        "{}:{}",
1143        materialize_url.host_str().unwrap(),
1144        config.materialize_http_port
1145    );
1146    let materialize_internal_sql_addr = format!(
1147        "{}:{}",
1148        materialize_internal_url.host_str().unwrap(),
1149        materialize_internal_url.port().unwrap()
1150    );
1151    let materialize_password_sql_addr = format!(
1152        "{}:{}",
1153        materialize_url.host_str().unwrap(),
1154        config.materialize_password_sql_port
1155    );
1156    let materialize_sasl_sql_addr = format!(
1157        "{}:{}",
1158        materialize_url.host_str().unwrap(),
1159        config.materialize_sasl_sql_port
1160    );
1161    let materialize_internal_http_addr = format!(
1162        "{}:{}",
1163        materialize_internal_url.host_str().unwrap(),
1164        config.materialize_internal_http_port
1165    );
1166    let environment_id = pgclient
1167        .query_one("SELECT mz_environment_id()", &[])
1168        .await?
1169        .get::<_, String>(0)
1170        .parse()
1171        .context("parsing environment ID")?;
1172
1173    let bootstrap_args = BootstrapArgs {
1174        cluster_replica_size_map: config.materialize_cluster_replica_sizes.clone(),
1175        default_cluster_replica_size: "ABC".to_string(),
1176        default_cluster_replication_factor: 1,
1177        bootstrap_role: None,
1178    };
1179
1180    let materialize_state = MaterializeState {
1181        catalog_config: materialize_catalog_config,
1182        sql_addr: materialize_sql_addr,
1183        use_https: config.materialize_use_https,
1184        http_addr: materialize_http_addr,
1185        internal_sql_addr: materialize_internal_sql_addr,
1186        internal_http_addr: materialize_internal_http_addr,
1187        password_sql_addr: materialize_password_sql_addr,
1188        sasl_sql_addr: materialize_sasl_sql_addr,
1189        user: materialize_user,
1190        pgclient,
1191        environment_id,
1192        bootstrap_args,
1193    };
1194
1195    Ok(materialize_state)
1196}