mz_testdrive/action.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::collections::BTreeMap;
use std::future::Future;
use std::net::ToSocketAddrs;
use std::path::PathBuf;
use std::sync::LazyLock;
use std::time::Duration;
use std::{env, fs};

use anyhow::{Context, anyhow, bail};
use async_trait::async_trait;
use aws_credential_types::provider::ProvideCredentials;
use aws_types::SdkConfig;
use futures::future::FutureExt;
use itertools::Itertools;
use mz_adapter::catalog::{Catalog, ConnCatalog};
use mz_adapter::session::Session;
use mz_build_info::BuildInfo;
use mz_catalog::config::ClusterReplicaSizeMap;
use mz_catalog::durable::BootstrapArgs;
use mz_kafka_util::client::{MzClientContext, create_new_client_config_simple};
use mz_ore::error::ErrorExt;
use mz_ore::metrics::MetricsRegistry;
use mz_ore::now::SYSTEM_TIME;
use mz_ore::retry::Retry;
use mz_ore::task;
use mz_ore::url::SensitiveUrl;
use mz_persist_client::cache::PersistClientCache;
use mz_persist_client::cfg::PersistConfig;
use mz_persist_client::rpc::PubSubClientConnection;
use mz_persist_client::{PersistClient, PersistLocation};
use mz_sql::catalog::EnvironmentId;
use mz_tls_util::make_tls;
use rdkafka::ClientConfig;
use rdkafka::producer::Producer;
use regex::{Captures, Regex};
use semver::Version;
use tokio_postgres::error::{DbError, SqlState};
use tracing::info;
use url::Url;
use crate::error::PosError;
use crate::parser::{
    Command, PosCommand, SqlExpectedError, SqlOutput, VersionConstraint, validate_ident,
};
use crate::util;
use crate::util::postgres::postgres_client;

pub mod consistency;

mod duckdb;
mod file;
mod fivetran;
mod http;
mod kafka;
mod mysql;
mod nop;
mod persist;
mod postgres;
mod protobuf;
mod psql;
mod s3;
mod schema_registry;
mod set;
mod skip_end;
mod skip_if;
mod sleep;
mod sql;
mod sql_server;
mod version_check;
mod webhook;

/// User-settable configuration parameters.
#[derive(Debug, Clone)]
pub struct Config {
    // === Testdrive options. ===
    /// Variables to make available to the testdrive script.
    ///
    /// The value of each entry will be made available to the script in a
    /// variable named `arg.KEY`.
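    ///
    /// A hypothetical sketch of the mapping (names are illustrative):
    ///
    /// ```ignore
    /// let mut arg_vars = BTreeMap::new();
    /// arg_vars.insert("foo".to_string(), "bar".to_string());
    /// // A script can now reference `${arg.foo}`, which expands to "bar".
    /// ```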
    pub arg_vars: BTreeMap<String, String>,
    /// A random number to distinguish each run of a testdrive script.
    pub seed: Option<u32>,
    /// Whether to reset Materialize state before executing each script and
    /// to clean up AWS state after each script.
    pub reset: bool,
    /// Force the use of the specified temporary directory.
    ///
    /// If unspecified, testdrive creates a temporary directory with a random
    /// name.
    pub temp_dir: Option<String>,
    /// Source string to print out on errors.
    pub source: Option<String>,
    /// The default timeout for cancellable operations.
    pub default_timeout: Duration,
    /// The default number of tries for retriable operations.
    pub default_max_tries: usize,
    /// The initial backoff interval for retry operations.
    ///
    /// Set to 0 to retry immediately on failure.
    pub initial_backoff: Duration,
    /// Backoff factor to use for retry operations.
    ///
    /// Set to 1 to retry at a steady pace.
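    ///
    /// For example, with an initial backoff of 50ms and a backoff factor of
    /// 2, successive retries wait 50ms, 100ms, 200ms, and so on.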
    pub backoff_factor: f64,
    /// The level of consistency checks to run against the coordinator and
    /// catalog.
    pub consistency_checks: consistency::Level,
    /// Whether to run statement logging consistency checks (adds a few
    /// seconds at the end of every test file).
    pub check_statement_logging: bool,
    /// Whether to automatically rewrite wrong results instead of failing.
    pub rewrite_results: bool,

    // === Materialize options. ===
    /// The pgwire connection parameters for the Materialize instance that
    /// testdrive will connect to.
    pub materialize_pgconfig: tokio_postgres::Config,
    /// The internal pgwire connection parameters for the Materialize instance
    /// that testdrive will connect to.
    pub materialize_internal_pgconfig: tokio_postgres::Config,
    /// Whether to use HTTPS instead of plain HTTP for the HTTP(S) connections.
    pub materialize_use_https: bool,
    /// The port for the public endpoints of the Materialize instance that
    /// testdrive will connect to via HTTP.
    pub materialize_http_port: u16,
    /// The port for the internal endpoints of the Materialize instance that
    /// testdrive will connect to via HTTP.
    pub materialize_internal_http_port: u16,
    /// The port for the password endpoints of the Materialize instance that
    /// testdrive will connect to via SQL.
    pub materialize_password_sql_port: u16,
    /// The port for the SASL endpoints of the Materialize instance that
    /// testdrive will connect to via SQL.
    pub materialize_sasl_sql_port: u16,
    /// Session parameters to set after connecting to Materialize.
    pub materialize_params: Vec<(String, String)>,
    /// An optional catalog configuration.
    pub materialize_catalog_config: Option<CatalogConfig>,
    /// Build information.
    pub build_info: &'static BuildInfo,
    /// Configured cluster replica sizes.
    pub materialize_cluster_replica_sizes: ClusterReplicaSizeMap,

    // === Persist options. ===
    /// Handle to the persist consensus system.
    pub persist_consensus_url: Option<SensitiveUrl>,
    /// Handle to the persist blob storage.
    pub persist_blob_url: Option<SensitiveUrl>,

    // === Confluent options. ===
    /// The address of the Kafka broker that testdrive will interact with.
    pub kafka_addr: String,
    /// Default number of partitions to use for topics.
    pub kafka_default_partitions: usize,
    /// Arbitrary rdkafka options for testdrive to use when connecting to the
    /// Kafka broker.
    pub kafka_opts: Vec<(String, String)>,
    /// The URL of the schema registry that testdrive will connect to.
    pub schema_registry_url: Url,
    /// An optional path to a TLS certificate that testdrive will present when
    /// performing client authentication.
    ///
    /// The keystore must be in the PKCS#12 format.
    pub cert_path: Option<String>,
    /// An optional password for the TLS certificate.
    pub cert_password: Option<String>,
    /// An optional username for basic authentication with the Confluent Schema
    /// Registry.
    pub ccsr_username: Option<String>,
    /// An optional password for basic authentication with the Confluent Schema
    /// Registry.
    pub ccsr_password: Option<String>,

    // === AWS options. ===
    /// The configuration to use when connecting to AWS.
    pub aws_config: SdkConfig,
    /// The ID of the AWS account that `aws_config` configures.
    pub aws_account: String,

    // === Fivetran options. ===
    /// Address of the Fivetran Destination that is currently running.
    pub fivetran_destination_url: String,
    /// Directory that is accessible to the Fivetran Destination.
    pub fivetran_destination_files_path: String,
}

pub struct MaterializeState {
    catalog_config: Option<CatalogConfig>,

    sql_addr: String,
    use_https: bool,
    http_addr: String,
    internal_sql_addr: String,
    internal_http_addr: String,
    password_sql_addr: String,
    sasl_sql_addr: String,
    user: String,
    pgclient: tokio_postgres::Client,
    environment_id: EnvironmentId,
    bootstrap_args: BootstrapArgs,
}

pub struct State {
    // The Config that this `State` was originally created from.
    pub config: Config,

    // === Testdrive state. ===
    arg_vars: BTreeMap<String, String>,
    cmd_vars: BTreeMap<String, String>,
    seed: u32,
    temp_path: PathBuf,
    _tempfile: Option<tempfile::TempDir>,
    default_timeout: Duration,
    timeout: Duration,
    max_tries: usize,
    initial_backoff: Duration,
    backoff_factor: f64,
    consistency_checks: consistency::Level,
    check_statement_logging: bool,
    consistency_checks_adhoc_skip: bool,
    regex: Option<Regex>,
    regex_replacement: String,
    error_line_count: usize,
    error_string: String,

    // === Materialize state. ===
    materialize: MaterializeState,

    // === Persist state. ===
    persist_consensus_url: Option<SensitiveUrl>,
    persist_blob_url: Option<SensitiveUrl>,
    build_info: &'static BuildInfo,
    persist_clients: PersistClientCache,

    // === Confluent state. ===
    schema_registry_url: Url,
    ccsr_client: mz_ccsr::Client,
    kafka_addr: String,
    kafka_admin: rdkafka::admin::AdminClient<MzClientContext>,
    kafka_admin_opts: rdkafka::admin::AdminOptions,
    kafka_config: ClientConfig,
    kafka_default_partitions: usize,
    kafka_producer: rdkafka::producer::FutureProducer<MzClientContext>,
    kafka_topics: BTreeMap<String, usize>,

    // === AWS state. ===
    aws_account: String,
    aws_config: SdkConfig,

    // === Database driver state. ===
    pub duckdb_clients: BTreeMap<String, std::sync::Arc<std::sync::Mutex<::duckdb::Connection>>>,
    mysql_clients: BTreeMap<String, mysql_async::Conn>,
    postgres_clients: BTreeMap<String, tokio_postgres::Client>,
    sql_server_clients: BTreeMap<String, mz_sql_server_util::Client>,

    // === Fivetran state. ===
    fivetran_destination_url: String,
    fivetran_destination_files_path: String,

    // === Rewrite state. ===
    rewrite_results: bool,
    /// Rewrites to apply to the current file; expected results are replaced
    /// inline.
    pub rewrites: Vec<Rewrite>,
    /// Start position of the currently expected result.
    pub rewrite_pos_start: usize,
    /// End position of the currently expected result.
    pub rewrite_pos_end: usize,
}

pub struct Rewrite {
    pub content: String,
    pub start: usize,
    pub end: usize,
}

impl State {
    pub async fn initialize_cmd_vars(&mut self) -> Result<(), anyhow::Error> {
        self.cmd_vars
            .insert("testdrive.kafka-addr".into(), self.kafka_addr.clone());
        self.cmd_vars.insert(
            "testdrive.kafka-addr-resolved".into(),
            self.kafka_addr
                .to_socket_addrs()
                .ok()
                .and_then(|mut addrs| addrs.next())
                .map(|addr| addr.to_string())
                .unwrap_or_else(|| "#RESOLUTION-FAILURE#".into()),
        );
        self.cmd_vars.insert(
            "testdrive.schema-registry-url".into(),
            self.schema_registry_url.to_string(),
        );
        self.cmd_vars
            .insert("testdrive.seed".into(), self.seed.to_string());
        self.cmd_vars.insert(
            "testdrive.temp-dir".into(),
            self.temp_path.display().to_string(),
        );
        self.cmd_vars
            .insert("testdrive.aws-region".into(), self.aws_region().into());
        self.cmd_vars
            .insert("testdrive.aws-endpoint".into(), self.aws_endpoint().into());
        self.cmd_vars
            .insert("testdrive.aws-account".into(), self.aws_account.clone());
        {
            let aws_credentials = self
                .aws_config
                .credentials_provider()
                .ok_or_else(|| anyhow!("no AWS credentials provider configured"))?
                .provide_credentials()
                .await
                .context("fetching AWS credentials")?;
            self.cmd_vars.insert(
                "testdrive.aws-access-key-id".into(),
                aws_credentials.access_key_id().to_owned(),
            );
            self.cmd_vars.insert(
                "testdrive.aws-secret-access-key".into(),
                aws_credentials.secret_access_key().to_owned(),
            );
            self.cmd_vars.insert(
                "testdrive.aws-token".into(),
                aws_credentials
                    .session_token()
                    .map(|token| token.to_owned())
                    .unwrap_or_else(String::new),
            );
        }
        self.cmd_vars.insert(
            "testdrive.materialize-environment-id".into(),
            self.materialize.environment_id.to_string(),
        );
        self.cmd_vars.insert(
            "testdrive.materialize-sql-addr".into(),
            self.materialize.sql_addr.clone(),
        );
        self.cmd_vars.insert(
            "testdrive.materialize-internal-sql-addr".into(),
            self.materialize.internal_sql_addr.clone(),
        );
        self.cmd_vars.insert(
            "testdrive.materialize-password-sql-addr".into(),
            self.materialize.password_sql_addr.clone(),
        );
        self.cmd_vars.insert(
            "testdrive.materialize-sasl-sql-addr".into(),
            self.materialize.sasl_sql_addr.clone(),
        );
        self.cmd_vars.insert(
            "testdrive.materialize-user".into(),
            self.materialize.user.clone(),
        );
        self.cmd_vars.insert(
            "testdrive.fivetran-destination-url".into(),
            self.fivetran_destination_url.clone(),
        );
        self.cmd_vars.insert(
            "testdrive.fivetran-destination-files-path".into(),
            self.fivetran_destination_files_path.clone(),
        );

        for (key, value) in env::vars() {
            self.cmd_vars.insert(format!("env.{}", key), value);
        }

        for (key, value) in &self.arg_vars {
            validate_ident(key)?;
            self.cmd_vars
                .insert(format!("arg.{}", key), value.to_string());
        }

        Ok(())
    }

    /// Makes a copy of the durable catalog and runs a function on its state.
    /// Returns `None` if there's no catalog information in the `State`.
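    ///
    /// A hypothetical usage sketch (`defaults` and `bootstrap_args` are
    /// illustrative):
    ///
    /// ```ignore
    /// let inspected = state
    ///     .with_catalog_copy(defaults, build_info, &bootstrap_args, None, |catalog| {
    ///         // Inspect the read-only `ConnCatalog` snapshot and return any
    ///         // value; it comes back as `Some(value)`.
    ///         catalog.active_database().cloned()
    ///     })
    ///     .await?;
    /// ```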
    pub async fn with_catalog_copy<F, T>(
        &self,
        system_parameter_defaults: BTreeMap<String, String>,
        build_info: &'static BuildInfo,
        bootstrap_args: &BootstrapArgs,
        enable_expression_cache_override: Option<bool>,
        f: F,
    ) -> Result<Option<T>, anyhow::Error>
    where
        F: FnOnce(ConnCatalog) -> T,
    {
        async fn persist_client(
            persist_consensus_url: SensitiveUrl,
            persist_blob_url: SensitiveUrl,
            persist_clients: &PersistClientCache,
        ) -> Result<PersistClient, anyhow::Error> {
            let persist_location = PersistLocation {
                blob_uri: persist_blob_url,
                consensus_uri: persist_consensus_url,
            };
            Ok(persist_clients.open(persist_location).await?)
        }

        if let Some(CatalogConfig {
            persist_consensus_url,
            persist_blob_url,
        }) = &self.materialize.catalog_config
        {
            let persist_client = persist_client(
                persist_consensus_url.clone(),
                persist_blob_url.clone(),
                &self.persist_clients,
            )
            .await?;
            let catalog = Catalog::open_debug_read_only_persist_catalog_config(
                persist_client,
                SYSTEM_TIME.clone(),
                self.materialize.environment_id.clone(),
                system_parameter_defaults,
                build_info,
                bootstrap_args,
                enable_expression_cache_override,
            )
            .await?;
            let res = f(catalog.for_session(&Session::dummy()));
            catalog.expire().await;
            Ok(Some(res))
        } else {
            Ok(None)
        }
    }

    pub fn aws_endpoint(&self) -> &str {
        self.aws_config.endpoint_url().unwrap_or("")
    }

    pub fn aws_region(&self) -> &str {
        self.aws_config.region().map(|r| r.as_ref()).unwrap_or("")
    }

    /// Resets the ad hoc skip-consistency-checks flag that users can toggle
    /// per file, and returns whether the consistency checks should be skipped
    /// for the current run.
    pub fn clear_skip_consistency_checks(&mut self) -> bool {
        std::mem::replace(&mut self.consistency_checks_adhoc_skip, false)
    }

    pub async fn reset_materialize(&self) -> Result<(), anyhow::Error> {
        let (inner_client, _) = postgres_client(
            &format!(
                "postgres://mz_system:materialize@{}",
                self.materialize.internal_sql_addr
            ),
            self.default_timeout,
        )
        .await?;

        let version = inner_client
            .query_one("SELECT mz_version_num()", &[])
            .await
            .context("getting version of materialize")
            .map(|row| row.get::<_, i32>(0))?;

        let semver = inner_client
            .query_one("SELECT right(split_part(mz_version(), ' ', 1), -1)", &[])
            .await
            .context("getting semver of materialize")
            .map(|row| row.get::<_, String>(0))?
            .parse::<semver::Version>()
            .context("parsing semver of materialize")?;

        inner_client
            .batch_execute("ALTER SYSTEM RESET ALL")
            .await
            .context("resetting materialize state: ALTER SYSTEM RESET ALL")?;

        // Dangerous functions are useful for tests, so we enable them for all
        // tests.
        {
            let rename_version = Version::parse("0.128.0-dev.1").expect("known to be valid");
            let enable_unsafe_functions = if semver >= rename_version {
                "unsafe_enable_unsafe_functions"
            } else {
                "enable_unsafe_functions"
            };
            let res = inner_client
                .batch_execute(&format!("ALTER SYSTEM SET {enable_unsafe_functions} = on"))
                .await
                .context("enabling dangerous functions");
            if let Err(e) = res {
                match e.root_cause().downcast_ref::<DbError>() {
                    Some(e) if *e.code() == SqlState::CANT_CHANGE_RUNTIME_PARAM => {
                        info!(
                            "can't enable unsafe functions because the server is in safe mode; \
                             testdrive scripts will fail if they use unsafe functions",
                        );
                    }
                    _ => return Err(e),
                }
            }
        }

        for row in inner_client
            .query("SHOW DATABASES", &[])
            .await
            .context("resetting materialize state: SHOW DATABASES")?
        {
            let db_name: String = row.get(0);
            if db_name.starts_with("testdrive_no_reset_") {
                continue;
            }
            let query = format!(
                "DROP DATABASE {}",
                postgres_protocol::escape::escape_identifier(&db_name)
            );
            sql::print_query(&query, None);
            inner_client.batch_execute(&query).await.context(format!(
                "resetting materialize state: DROP DATABASE {}",
                db_name,
            ))?;
        }

        // Get all user clusters not running any objects owned by users.
        let inactive_user_clusters = "
        WITH
            active_user_clusters AS
            (
                SELECT DISTINCT cluster_id, object_id
                FROM
                    (
                        SELECT cluster_id, id FROM mz_catalog.mz_sources
                        UNION ALL SELECT cluster_id, id FROM mz_catalog.mz_sinks
                        UNION ALL
                            SELECT cluster_id, id
                            FROM mz_catalog.mz_materialized_views
                        UNION ALL
                            SELECT cluster_id, id FROM mz_catalog.mz_indexes
                        UNION ALL
                            SELECT cluster_id, id
                            FROM mz_internal.mz_subscriptions
                    )
                    AS t (cluster_id, object_id)
                WHERE cluster_id IS NOT NULL AND object_id LIKE 'u%'
            )
        SELECT name
        FROM mz_catalog.mz_clusters
        WHERE
            id NOT IN ( SELECT cluster_id FROM active_user_clusters ) AND id LIKE 'u%'
                AND
            owner_id LIKE 'u%';";

        let inactive_clusters = inner_client
            .query(inactive_user_clusters, &[])
            .await
            .context("resetting materialize state: inactive_user_clusters")?;

        if !inactive_clusters.is_empty() {
            println!("cleaning up user clusters from previous tests...")
        }

        for cluster_name in inactive_clusters {
            let cluster_name: String = cluster_name.get(0);
            if cluster_name.starts_with("testdrive_no_reset_") {
                continue;
            }
            let query = format!(
                "DROP CLUSTER {}",
                postgres_protocol::escape::escape_identifier(&cluster_name)
            );
            sql::print_query(&query, None);
            inner_client.batch_execute(&query).await.context(format!(
                "resetting materialize state: DROP CLUSTER {}",
                cluster_name,
            ))?;
        }

        inner_client
            .batch_execute("CREATE DATABASE materialize")
            .await
            .context("resetting materialize state: CREATE DATABASE materialize")?;

        // Attempt to remove all users but the current user. Old versions of
        // Materialize did not support roles, so this degrades gracefully if
        // mz_roles does not exist.
        if let Ok(rows) = inner_client.query("SELECT name FROM mz_roles", &[]).await {
            for row in rows {
                let role_name: String = row.get(0);
                if role_name == self.materialize.user || role_name.starts_with("mz_") {
                    continue;
                }
                let query = format!(
                    "DROP ROLE {}",
                    postgres_protocol::escape::escape_identifier(&role_name)
                );
                sql::print_query(&query, None);
                inner_client.batch_execute(&query).await.context(format!(
                    "resetting materialize state: DROP ROLE {}",
                    role_name,
                ))?;
            }
        }

        // Grant the materialize user all system privileges.
        inner_client
            .batch_execute(&format!(
                "GRANT ALL PRIVILEGES ON SYSTEM TO {}",
                self.materialize.user
            ))
            .await?;

        // Grant initial privileges.
        inner_client
            .batch_execute("GRANT USAGE ON DATABASE materialize TO PUBLIC")
            .await?;
        inner_client
            .batch_execute(&format!(
                "GRANT ALL PRIVILEGES ON DATABASE materialize TO {}",
                self.materialize.user
            ))
            .await?;
        inner_client
            .batch_execute(&format!(
                "GRANT ALL PRIVILEGES ON SCHEMA materialize.public TO {}",
                self.materialize.user
            ))
            .await?;

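        // Materialize renamed the built-in "default" cluster to "quickstart";
        // numeric version 8200 is presumably the first release with the new
        // name, so pick whichever name matches the server being reset.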
        let cluster = match version {
            ..=8199 => "default",
            8200.. => "quickstart",
        };
        inner_client
            .batch_execute(&format!("GRANT USAGE ON CLUSTER {cluster} TO PUBLIC"))
            .await?;
        inner_client
            .batch_execute(&format!(
                "GRANT ALL PRIVILEGES ON CLUSTER {cluster} TO {}",
                self.materialize.user
            ))
            .await?;

        Ok(())
    }

    /// Deletes the Kafka topics and CCSR subjects that were created during
    /// this run.
    pub async fn reset_kafka(&self) -> Result<(), anyhow::Error> {
        let mut errors: Vec<anyhow::Error> = Vec::new();

        let metadata = self.kafka_producer.client().fetch_metadata(
            None,
            Some(std::cmp::max(Duration::from_secs(1), self.default_timeout)),
        )?;

        let testdrive_topics: Vec<_> = metadata
            .topics()
            .iter()
            .filter_map(|t| {
                if t.name().starts_with("testdrive-") {
                    Some(t.name())
                } else {
                    None
                }
            })
            .collect();

        if !testdrive_topics.is_empty() {
            match self
                .kafka_admin
                .delete_topics(&testdrive_topics, &self.kafka_admin_opts)
                .await
            {
                Ok(res) => {
                    if res.len() != testdrive_topics.len() {
                        errors.push(anyhow!(
                            "kafka topic deletion returned {} results, but exactly {} were expected",
                            res.len(),
                            testdrive_topics.len()
                        ));
                    }
                    for (res, topic) in res.iter().zip_eq(testdrive_topics.iter()) {
                        match res {
                            Ok(_)
                            | Err((_, rdkafka::types::RDKafkaErrorCode::UnknownTopicOrPartition)) => {
                                ()
                            }
                            Err((_, err)) => {
                                errors.push(anyhow!("unable to delete {}: {}", topic, err));
                            }
                        }
                    }
                }
                Err(e) => {
                    errors.push(e.into());
                }
            };
        }

        match self
            .ccsr_client
            .list_subjects()
            .await
            .context("listing schema registry subjects")
        {
            Ok(subjects) => {
                let testdrive_subjects: Vec<_> = subjects
                    .iter()
                    .filter(|s| s.starts_with("testdrive-"))
                    .collect();

                for subject in testdrive_subjects {
                    match self.ccsr_client.delete_subject(subject).await {
                        Ok(()) | Err(mz_ccsr::DeleteError::SubjectNotFound) => (),
                        Err(e) => errors.push(e.into()),
                    }
                }
            }
            Err(e) => {
                errors.push(e);
            }
        }

        if errors.is_empty() {
            Ok(())
        } else {
            bail!(
                "deleting Kafka topics: {} errors: {}",
                errors.len(),
                errors
                    .into_iter()
                    .map(|e| e.to_string_with_causes())
                    .join("\n")
            );
        }
    }
}

/// Configuration for the Catalog.
#[derive(Debug, Clone)]
pub struct CatalogConfig {
    /// Handle to the persist consensus system.
    pub persist_consensus_url: SensitiveUrl,
    /// Handle to the persist blob storage.
    pub persist_blob_url: SensitiveUrl,
}

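/// Flow control for script execution: whether to continue with the next
/// command, or to begin/end skipping a region of the script.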
pub enum ControlFlow {
    Continue,
    SkipBegin,
    SkipEnd,
}

#[async_trait]
pub(crate) trait Run {
    async fn run(self, state: &mut State) -> Result<ControlFlow, PosError>;
}

#[async_trait]
impl Run for PosCommand {
    async fn run(self, state: &mut State) -> Result<ControlFlow, PosError> {
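        // A command may carry a version constraint; if the server's version
        // falls outside the constrained range, skip the command entirely.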
        macro_rules! handle_version {
            ($version_constraint:expr) => {
                match $version_constraint {
                    Some(VersionConstraint { min, max }) => {
                        match version_check::run_version_check(min, max, state).await {
                            Ok(true) => return Ok(ControlFlow::Continue),
                            Ok(false) => {}
                            Err(err) => return Err(PosError::new(err, self.pos)),
                        }
                    }
                    None => {}
                }
            };
        }

        let wrap_err = |e| PosError::new(e, self.pos);
        // Substitute variables at startup, except for the command-specific
        // ones; those will be substituted at runtime.
        let ignore_prefix = match &self.command {
            Command::Builtin(builtin, _) => Some(builtin.name.clone()),
            _ => None,
        };
        let subst = |msg: &str, vars: &BTreeMap<String, String>| {
            substitute_vars(msg, vars, &ignore_prefix, false).map_err(wrap_err)
        };
        let subst_re = |msg: &str, vars: &BTreeMap<String, String>| {
            substitute_vars(msg, vars, &ignore_prefix, true).map_err(wrap_err)
        };

        let r = match self.command {
            Command::Builtin(mut builtin, version_constraint) => {
                handle_version!(version_constraint);
                for val in builtin.args.values_mut() {
                    *val = subst(val, &state.cmd_vars)?;
                }
                for line in &mut builtin.input {
                    *line = subst(line, &state.cmd_vars)?;
                }
                match builtin.name.as_ref() {
                    "check-consistency" => consistency::run_consistency_checks(state).await,
                    "skip-consistency-checks" => {
                        consistency::skip_consistency_checks(builtin, state)
                    }
                    "check-shard-tombstone" => {
                        consistency::run_check_shard_tombstone(builtin, state).await
                    }
                    "duckdb-execute" => duckdb::run_execute(builtin, state).await,
                    "duckdb-query" => duckdb::run_query(builtin, state).await,
                    "fivetran-destination" => {
                        fivetran::run_destination_command(builtin, state).await
                    }
                    "file-append" => file::run_append(builtin, state).await,
                    "file-delete" => file::run_delete(builtin, state).await,
                    "http-request" => http::run_request(builtin, state).await,
                    "kafka-add-partitions" => kafka::run_add_partitions(builtin, state).await,
                    "kafka-create-topic" => kafka::run_create_topic(builtin, state).await,
                    "kafka-wait-topic" => kafka::run_wait_topic(builtin, state).await,
                    "kafka-delete-records" => kafka::run_delete_records(builtin, state).await,
                    "kafka-delete-topic-flaky" => kafka::run_delete_topic(builtin, state).await,
                    "kafka-ingest" => kafka::run_ingest(builtin, state).await,
                    "kafka-verify-data" => kafka::run_verify_data(builtin, state).await,
                    "kafka-verify-commit" => kafka::run_verify_commit(builtin, state).await,
                    "kafka-verify-topic" => kafka::run_verify_topic(builtin, state).await,
                    "mysql-connect" => mysql::run_connect(builtin, state).await,
                    "mysql-execute" => mysql::run_execute(builtin, state).await,
                    "nop" => nop::run_nop(),
                    "postgres-connect" => postgres::run_connect(builtin, state).await,
                    "postgres-execute" => postgres::run_execute(builtin, state).await,
                    "postgres-verify-slot" => postgres::run_verify_slot(builtin, state).await,
                    "protobuf-compile-descriptors" => {
                        protobuf::run_compile_descriptors(builtin, state).await
                    }
                    "psql-execute" => psql::run_execute(builtin, state).await,
                    "s3-verify-data" => s3::run_verify_data(builtin, state).await,
                    "s3-verify-keys" => s3::run_verify_keys(builtin, state).await,
                    "s3-file-upload" => s3::run_upload(builtin, state).await,
                    "s3-set-presigned-url" => s3::run_set_presigned_url(builtin, state).await,
                    "schema-registry-publish" => schema_registry::run_publish(builtin, state).await,
                    "schema-registry-verify" => schema_registry::run_verify(builtin, state).await,
                    "schema-registry-wait" => schema_registry::run_wait(builtin, state).await,
                    "skip-if" => skip_if::run_skip_if(builtin, state).await,
                    "skip-end" => skip_end::run_skip_end(),
                    "sql-server-connect" => sql_server::run_connect(builtin, state).await,
                    "sql-server-execute" => sql_server::run_execute(builtin, state).await,
                    "sql-server-set-from-sql" => sql_server::run_set_from_sql(builtin, state).await,
                    "persist-force-compaction" => {
                        persist::run_force_compaction(builtin, state).await
                    }
                    "random-sleep" => sleep::run_random_sleep(builtin),
                    "set-regex" => set::run_regex_set(builtin, state),
                    "unset-regex" => set::run_regex_unset(builtin, state),
                    "set-sql-timeout" => set::run_sql_timeout(builtin, state),
                    "set-max-tries" => set::run_max_tries(builtin, state),
                    "sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment" => {
                        sleep::run_sleep(builtin)
                    }
                    "set" => set::set_vars(builtin, state),
                    "set-arg-default" => set::run_set_arg_default(builtin, state),
                    "set-from-sql" => set::run_set_from_sql(builtin, state).await,
                    "set-from-file" => set::run_set_from_file(builtin, state).await,
                    "webhook-append" => webhook::run_append(builtin, state).await,
                    _ => {
                        return Err(PosError::new(
                            anyhow!("unknown built-in command {}", builtin.name),
                            self.pos,
                        ));
                    }
                }
            }
            Command::Sql(mut sql, version_constraint) => {
                handle_version!(version_constraint);
                sql.query = subst(&sql.query, &state.cmd_vars)?;
                if let SqlOutput::Full { expected_rows, .. } = &mut sql.expected_output {
                    for row in expected_rows {
                        for col in row {
                            *col = subst(col, &state.cmd_vars)?;
                        }
                    }
                }
                sql::run_sql(sql, state).await
            }
            Command::FailSql(mut sql, version_constraint) => {
                handle_version!(version_constraint);
                sql.query = subst(&sql.query, &state.cmd_vars)?;
                sql.expected_error = match &sql.expected_error {
                    SqlExpectedError::Contains(s) => {
                        SqlExpectedError::Contains(subst(s, &state.cmd_vars)?)
                    }
                    SqlExpectedError::Exact(s) => {
                        SqlExpectedError::Exact(subst(s, &state.cmd_vars)?)
                    }
                    SqlExpectedError::Regex(s) => {
                        SqlExpectedError::Regex(subst_re(s, &state.cmd_vars)?)
                    }
                    SqlExpectedError::Timeout => SqlExpectedError::Timeout,
                };
                sql::run_fail_sql(sql, state).await
            }
        };

        r.map_err(wrap_err)
    }
}

/// Substitutes `${}`-delimited variables from `vars` into `msg`.
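///
/// A minimal sketch of the behavior (hypothetical values):
///
/// ```ignore
/// let mut vars = BTreeMap::new();
/// vars.insert("name".to_string(), "world".to_string());
/// let out = substitute_vars("hello ${name}", &vars, &None, false)?;
/// assert_eq!(out, "hello world");
/// // Unknown variables produce an error rather than silent passthrough.
/// ```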
fn substitute_vars(
    msg: &str,
    vars: &BTreeMap<String, String>,
    ignore_prefix: &Option<String>,
    regex_escape: bool,
) -> Result<String, anyhow::Error> {
    static RE: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"\$\{([^}]+)\}").unwrap());
    let mut err = None;
    let out = RE.replace_all(msg, |caps: &Captures| {
        let name = &caps[1];
        if let Some(ignore_prefix) = &ignore_prefix {
            if name.starts_with(format!("{}.", ignore_prefix).as_str()) {
                // Do not substitute; leave the original variable name in place.
                return caps.get(0).unwrap().as_str().to_string();
            }
        }

        if let Some(val) = vars.get(name) {
            if regex_escape {
                regex::escape(val)
            } else {
                val.to_string()
            }
        } else {
            err = Some(anyhow!("unknown variable: {}", name));
            "#VAR-MISSING#".to_string()
        }
    });
    match err {
        Some(err) => Err(err),
        None => Ok(out.into_owned()),
    }
}

/// Initializes a [`State`] object by connecting to the various external
/// services specified in `config`.
///
/// Returns the initialized `State` and a cleanup future. The cleanup future
/// should be `await`ed only *after* dropping the `State` to check whether any
/// errors occurred while dropping the `State`. This awkward API is a
/// workaround for the lack of `AsyncDrop` support in Rust.
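///
/// A hypothetical usage sketch:
///
/// ```ignore
/// let (state, cleanup) = create_state(&config).await?;
/// // ... run the testdrive script against `state` ...
/// drop(state);
/// cleanup.await?; // surfaces errors from the background SQL connection
/// ```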
pub async fn create_state(
    config: &Config,
) -> Result<(State, impl Future<Output = Result<(), anyhow::Error>>), anyhow::Error> {
    let seed = config.seed.unwrap_or_else(rand::random);

    let (_tempfile, temp_path) = match &config.temp_dir {
        Some(temp_dir) => {
            fs::create_dir_all(temp_dir).context("creating temporary directory")?;
            (None, PathBuf::from(&temp_dir))
        }
        _ => {
            // Stash the tempfile object so that it does not go out of scope
            // and delete the tempdir prematurely.
            let tempfile_handle = tempfile::tempdir().context("creating temporary directory")?;
            let temp_path = tempfile_handle.path().to_path_buf();
            (Some(tempfile_handle), temp_path)
        }
    };

    let materialize_catalog_config = config.materialize_catalog_config.clone();

    let materialize_url = util::postgres::config_url(&config.materialize_pgconfig)?;
    info!("Connecting to {}", materialize_url.as_str());
    let (pgclient, pgconn) = Retry::default()
        .max_duration(config.default_timeout)
        .retry_async_canceling(|_| async move {
            let mut pgconfig = config.materialize_pgconfig.clone();
            pgconfig.connect_timeout(config.default_timeout);
            let tls = make_tls(&pgconfig)?;
            pgconfig.connect(tls).await.map_err(|e| anyhow!(e))
        })
        .await?;

    let pgconn_task =
        task::spawn(|| "pgconn_task", pgconn).map(|join| join.context("running SQL connection"));

    let materialize_state =
        create_materialize_state(config, materialize_catalog_config, pgclient).await?;

    let schema_registry_url = config.schema_registry_url.to_owned();

    let ccsr_client = {
        let mut ccsr_config = mz_ccsr::ClientConfig::new(schema_registry_url.clone());

        if let Some(cert_path) = &config.cert_path {
            let cert = fs::read(cert_path).context("reading cert")?;
            let pass = config.cert_password.as_deref().unwrap_or("").to_owned();
            let ident = mz_ccsr::tls::Identity::from_pkcs12_der(cert, pass)
                .context("reading keystore file as pkcs12")?;
            ccsr_config = ccsr_config.identity(ident);
        }

        if let Some(ccsr_username) = &config.ccsr_username {
            ccsr_config = ccsr_config.auth(ccsr_username.clone(), config.ccsr_password.clone());
        }

        ccsr_config.build().context("creating CCSR client")?
    };

    let (kafka_addr, kafka_admin, kafka_admin_opts, kafka_producer, kafka_topics, kafka_config) = {
        use rdkafka::admin::{AdminClient, AdminOptions};
        use rdkafka::producer::FutureProducer;

        let mut kafka_config = create_new_client_config_simple();
        kafka_config.set("bootstrap.servers", &config.kafka_addr);
        kafka_config.set("group.id", "materialize-testdrive");
        kafka_config.set("auto.offset.reset", "earliest");
        kafka_config.set("isolation.level", "read_committed");
        if let Some(cert_path) = &config.cert_path {
            kafka_config.set("security.protocol", "ssl");
            kafka_config.set("ssl.keystore.location", cert_path);
            if let Some(cert_password) = &config.cert_password {
                kafka_config.set("ssl.keystore.password", cert_password);
            }
        }
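        // Allow messages up to 15728640 bytes (15 MiB), larger than the
        // librdkafka default of 1 MB.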
        kafka_config.set("message.max.bytes", "15728640");

        for (key, value) in &config.kafka_opts {
            kafka_config.set(key, value);
        }

        let admin: AdminClient<_> = kafka_config
            .create_with_context(MzClientContext::default())
            .with_context(|| format!("opening Kafka connection: {}", config.kafka_addr))?;

        let admin_opts = AdminOptions::new().operation_timeout(Some(config.default_timeout));

        let producer: FutureProducer<_> = kafka_config
            .create_with_context(MzClientContext::default())
            .with_context(|| format!("opening Kafka producer connection: {}", config.kafka_addr))?;

        let topics = BTreeMap::new();

        (
            config.kafka_addr.to_owned(),
            admin,
            admin_opts,
            producer,
            topics,
            kafka_config,
        )
    };

    let mut state = State {
        config: config.clone(),

        // === Testdrive state. ===
        arg_vars: config.arg_vars.clone(),
        cmd_vars: BTreeMap::new(),
        seed,
        temp_path,
        _tempfile,
        default_timeout: config.default_timeout,
        timeout: config.default_timeout,
        max_tries: config.default_max_tries,
        initial_backoff: config.initial_backoff,
        backoff_factor: config.backoff_factor,
        consistency_checks: config.consistency_checks,
        check_statement_logging: config.check_statement_logging,
        consistency_checks_adhoc_skip: false,
        regex: None,
        regex_replacement: set::DEFAULT_REGEX_REPLACEMENT.into(),
        rewrite_results: config.rewrite_results,
        error_line_count: 0,
        error_string: "".to_string(),

        // === Materialize state. ===
        materialize: materialize_state,

        // === Persist state. ===
        persist_consensus_url: config.persist_consensus_url.clone(),
        persist_blob_url: config.persist_blob_url.clone(),
        build_info: config.build_info,
        persist_clients: PersistClientCache::new(
            PersistConfig::new_default_configs(config.build_info, SYSTEM_TIME.clone()),
            &MetricsRegistry::new(),
            |_, _| PubSubClientConnection::noop(),
        ),

        // === Confluent state. ===
        schema_registry_url,
        ccsr_client,
        kafka_addr,
        kafka_admin,
        kafka_admin_opts,
        kafka_config,
        kafka_default_partitions: config.kafka_default_partitions,
        kafka_producer,
        kafka_topics,

        // === AWS state. ===
        aws_account: config.aws_account.clone(),
        aws_config: config.aws_config.clone(),

        // === Database driver state. ===
        duckdb_clients: BTreeMap::new(),
        mysql_clients: BTreeMap::new(),
        postgres_clients: BTreeMap::new(),
        sql_server_clients: BTreeMap::new(),

        // === Fivetran state. ===
        fivetran_destination_url: config.fivetran_destination_url.clone(),
        fivetran_destination_files_path: config.fivetran_destination_files_path.clone(),

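        // === Rewrite state. ===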
        rewrites: Vec::new(),
        rewrite_pos_start: 0,
        rewrite_pos_end: 0,
    };
    state.initialize_cmd_vars().await?;
    Ok((state, pgconn_task))
}

async fn create_materialize_state(
    config: &Config,
    materialize_catalog_config: Option<CatalogConfig>,
    pgclient: tokio_postgres::Client,
) -> Result<MaterializeState, anyhow::Error> {
    let materialize_url = util::postgres::config_url(&config.materialize_pgconfig)?;
    let materialize_internal_url =
        util::postgres::config_url(&config.materialize_internal_pgconfig)?;

    for (key, value) in &config.materialize_params {
        pgclient
            .batch_execute(&format!("SET {key} = {value}"))
            .await
            .context("setting session parameter")?;
    }

    let materialize_user = config
        .materialize_pgconfig
        .get_user()
        .expect("testdrive URL must contain user")
        .to_string();

    let materialize_sql_addr = format!(
        "{}:{}",
        materialize_url.host_str().unwrap(),
        materialize_url.port().unwrap()
    );
    let materialize_http_addr = format!(
        "{}:{}",
        materialize_url.host_str().unwrap(),
        config.materialize_http_port
    );
    let materialize_internal_sql_addr = format!(
        "{}:{}",
        materialize_internal_url.host_str().unwrap(),
        materialize_internal_url.port().unwrap()
    );
    let materialize_password_sql_addr = format!(
        "{}:{}",
        materialize_url.host_str().unwrap(),
        config.materialize_password_sql_port
    );
    let materialize_sasl_sql_addr = format!(
        "{}:{}",
        materialize_url.host_str().unwrap(),
        config.materialize_sasl_sql_port
    );
    let materialize_internal_http_addr = format!(
        "{}:{}",
        materialize_internal_url.host_str().unwrap(),
        config.materialize_internal_http_port
    );
    let environment_id = pgclient
        .query_one("SELECT mz_environment_id()", &[])
        .await?
        .get::<_, String>(0)
        .parse()
        .context("parsing environment ID")?;

    let bootstrap_args = BootstrapArgs {
        cluster_replica_size_map: config.materialize_cluster_replica_sizes.clone(),
        default_cluster_replica_size: "ABC".to_string(),
        default_cluster_replication_factor: 1,
        bootstrap_role: None,
    };

    let materialize_state = MaterializeState {
        catalog_config: materialize_catalog_config,
        sql_addr: materialize_sql_addr,
        use_https: config.materialize_use_https,
        http_addr: materialize_http_addr,
        internal_sql_addr: materialize_internal_sql_addr,
        internal_http_addr: materialize_internal_http_addr,
        password_sql_addr: materialize_password_sql_addr,
        sasl_sql_addr: materialize_sasl_sql_addr,
        user: materialize_user,
        pgclient,
        environment_id,
        bootstrap_args,
    };

    Ok(materialize_state)
}