mz_testdrive/action.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::collections::BTreeMap;
use std::future::Future;
use std::net::ToSocketAddrs;
use std::path::PathBuf;
use std::sync::LazyLock;
use std::time::Duration;
use std::{env, fs};

use anyhow::{Context, anyhow, bail};
use async_trait::async_trait;
use aws_credential_types::provider::ProvideCredentials;
use aws_types::SdkConfig;
use futures::future::FutureExt;
use itertools::Itertools;
use mz_adapter::catalog::{Catalog, ConnCatalog};
use mz_adapter::session::Session;
use mz_build_info::BuildInfo;
use mz_catalog::config::ClusterReplicaSizeMap;
use mz_catalog::durable::BootstrapArgs;
use mz_kafka_util::client::{MzClientContext, create_new_client_config_simple};
use mz_ore::error::ErrorExt;
use mz_ore::metrics::MetricsRegistry;
use mz_ore::now::SYSTEM_TIME;
use mz_ore::retry::Retry;
use mz_ore::task;
use mz_ore::url::SensitiveUrl;
use mz_persist_client::cache::PersistClientCache;
use mz_persist_client::cfg::PersistConfig;
use mz_persist_client::rpc::PubSubClientConnection;
use mz_persist_client::{PersistClient, PersistLocation};
use mz_sql::catalog::EnvironmentId;
use mz_tls_util::make_tls;
use rand::Rng;
use rdkafka::ClientConfig;
use rdkafka::producer::Producer;
use regex::{Captures, Regex};
use semver::Version;
use tokio_postgres::error::{DbError, SqlState};
use tracing::info;
use url::Url;

use crate::error::PosError;
use crate::parser::{
    Command, PosCommand, SqlExpectedError, SqlOutput, VersionConstraint, validate_ident,
};
use crate::util;
use crate::util::postgres::postgres_client;

pub mod consistency;

mod file;
mod fivetran;
mod http;
mod kafka;
mod mysql;
mod nop;
mod persist;
mod postgres;
mod protobuf;
mod psql;
mod s3;
mod schema_registry;
mod set;
mod skip_end;
mod skip_if;
mod sleep;
mod sql;
mod sql_server;
mod version_check;
mod webhook;

/// User-settable configuration parameters.
#[derive(Debug)]
pub struct Config {
    // === Testdrive options. ===
    /// Variables to make available to the testdrive script.
    ///
    /// The value of each entry will be made available to the script in a
    /// variable named `arg.KEY`.
    pub arg_vars: BTreeMap<String, String>,
    /// A random number to distinguish each run of a testdrive script.
    pub seed: Option<u32>,
    /// Whether to reset Materialize state before executing each script and
    /// to clean up AWS state after each script.
    pub reset: bool,
    /// Force the use of the specified temporary directory.
96    ///
97    /// If unspecified, testdrive creates a temporary directory with a random
98    /// name.
99    pub temp_dir: Option<String>,
100    /// Source string to print out on errors.
101    pub source: Option<String>,
102    /// The default timeout for cancellable operations.
103    pub default_timeout: Duration,
104    /// The default number of tries for retriable operations.
105    pub default_max_tries: usize,
106    /// The initial backoff interval for retry operations.
107    ///
108    /// Set to 0 to retry immediately on failure.
109    pub initial_backoff: Duration,
110    /// Backoff factor to use for retry operations.
111    ///
112    /// Set to 1 to retry at a steady pace.
113    pub backoff_factor: f64,
    /// The level of coordinator and catalog consistency checks to run.
    pub consistency_checks: consistency::Level,
    /// Whether to automatically rewrite wrong results instead of failing.
    pub rewrite_results: bool,

    // === Materialize options. ===
    /// The pgwire connection parameters for the Materialize instance that
    /// testdrive will connect to.
    pub materialize_pgconfig: tokio_postgres::Config,
    /// The internal pgwire connection parameters for the Materialize instance that
    /// testdrive will connect to.
    pub materialize_internal_pgconfig: tokio_postgres::Config,
    /// Whether to use HTTPS instead of plain HTTP for the HTTP(S) connections.
    pub materialize_use_https: bool,
    /// The port for the public endpoints of the Materialize instance that
    /// testdrive will connect to via HTTP.
    pub materialize_http_port: u16,
    /// The port for the internal endpoints of the Materialize instance that
    /// testdrive will connect to via HTTP.
    pub materialize_internal_http_port: u16,
    /// The port for the password endpoints of the Materialize instance that
    /// testdrive will connect to via HTTP.
    pub materialize_password_sql_port: u16,
    /// Session parameters to set after connecting to Materialize.
    pub materialize_params: Vec<(String, String)>,
    /// An optional catalog configuration.
    pub materialize_catalog_config: Option<CatalogConfig>,
    /// Build information.
    pub build_info: &'static BuildInfo,
    /// Configured cluster replica sizes.
    pub materialize_cluster_replica_sizes: ClusterReplicaSizeMap,

    // === Persist options. ===
    /// Handle to the persist consensus system.
    pub persist_consensus_url: Option<SensitiveUrl>,
    /// Handle to the persist blob storage.
    pub persist_blob_url: Option<SensitiveUrl>,

    // === Confluent options. ===
    /// The address of the Kafka broker that testdrive will interact with.
    pub kafka_addr: String,
    /// Default number of partitions to use for topics.
    pub kafka_default_partitions: usize,
    /// Arbitrary rdkafka options for testdrive to use when connecting to the
    /// Kafka broker.
    pub kafka_opts: Vec<(String, String)>,
    /// The URL of the schema registry that testdrive will connect to.
    pub schema_registry_url: Url,
    /// An optional path to a TLS certificate that testdrive will present when
    /// performing client authentication.
    ///
    /// The keystore must be in the PKCS#12 format.
    pub cert_path: Option<String>,
    /// An optional password for the TLS certificate.
    pub cert_password: Option<String>,
    /// An optional username for basic authentication with the Confluent Schema
    /// Registry.
    pub ccsr_username: Option<String>,
    /// An optional password for basic authentication with the Confluent Schema
    /// Registry.
    pub ccsr_password: Option<String>,

    // === AWS options. ===
    /// The configuration to use when connecting to AWS.
    pub aws_config: SdkConfig,
    /// The ID of the AWS account that `aws_config` configures.
    pub aws_account: String,

    // === Fivetran options. ===
    /// Address of the Fivetran Destination that is currently running.
    pub fivetran_destination_url: String,
    /// Directory that is accessible to the Fivetran Destination.
    pub fivetran_destination_files_path: String,
}

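/// Connection addresses, credentials, and metadata for the Materialize
/// instance under test.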
pub struct MaterializeState {
    catalog_config: Option<CatalogConfig>,

    sql_addr: String,
    use_https: bool,
    http_addr: String,
    internal_sql_addr: String,
    internal_http_addr: String,
    password_sql_addr: String,
    user: String,
    pgclient: tokio_postgres::Client,
    environment_id: EnvironmentId,
    bootstrap_args: BootstrapArgs,
}

pub struct State {
    // === Testdrive state. ===
    arg_vars: BTreeMap<String, String>,
    cmd_vars: BTreeMap<String, String>,
    seed: u32,
    temp_path: PathBuf,
    _tempfile: Option<tempfile::TempDir>,
    default_timeout: Duration,
    timeout: Duration,
    max_tries: usize,
    initial_backoff: Duration,
    backoff_factor: f64,
    consistency_checks: consistency::Level,
    consistency_checks_adhoc_skip: bool,
    regex: Option<Regex>,
    regex_replacement: String,
    error_line_count: usize,
    error_string: String,

    // === Materialize state. ===
    materialize: MaterializeState,

    // === Persist state. ===
    persist_consensus_url: Option<SensitiveUrl>,
    persist_blob_url: Option<SensitiveUrl>,
    build_info: &'static BuildInfo,
    persist_clients: PersistClientCache,

    // === Confluent state. ===
    schema_registry_url: Url,
    ccsr_client: mz_ccsr::Client,
    kafka_addr: String,
    kafka_admin: rdkafka::admin::AdminClient<MzClientContext>,
    kafka_admin_opts: rdkafka::admin::AdminOptions,
    kafka_config: ClientConfig,
    kafka_default_partitions: usize,
    kafka_producer: rdkafka::producer::FutureProducer<MzClientContext>,
    kafka_topics: BTreeMap<String, usize>,

    // === AWS state. ===
    aws_account: String,
    aws_config: SdkConfig,

    // === Database driver state. ===
    mysql_clients: BTreeMap<String, mysql_async::Conn>,
    postgres_clients: BTreeMap<String, tokio_postgres::Client>,
    sql_server_clients: BTreeMap<String, mz_sql_server_util::Client>,

    // === Fivetran state. ===
    fivetran_destination_url: String,
    fivetran_destination_files_path: String,

    // === Rewrite state. ===
    rewrite_results: bool,
    /// Rewrites to apply to the current file; results are replaced inline.
    pub rewrites: Vec<Rewrite>,
    /// Start position of the currently expected result.
    pub rewrite_pos_start: usize,
    /// End position of the currently expected result.
    pub rewrite_pos_end: usize,
}

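/// A pending in-place rewrite of an expected result in the current testdrive
/// file, recorded when `rewrite_results` is enabled; `start` and `end`
/// delimit the span of the file to replace with `content`.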
pub struct Rewrite {
    pub content: String,
    pub start: usize,
    pub end: usize,
}

impl State {
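    /// Populates `cmd_vars` with the variables that testdrive scripts can
    /// reference via `${...}` substitution: the built-in `testdrive.*`
    /// values (seed, addresses, AWS credentials, and so on), `env.*` for
    /// process environment variables, and `arg.*` for user-supplied
    /// `arg_vars`.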
    pub async fn initialize_cmd_vars(&mut self) -> Result<(), anyhow::Error> {
        self.cmd_vars
            .insert("testdrive.kafka-addr".into(), self.kafka_addr.clone());
        self.cmd_vars.insert(
            "testdrive.kafka-addr-resolved".into(),
            self.kafka_addr
                .to_socket_addrs()
                .ok()
                .and_then(|mut addrs| addrs.next())
                .map(|addr| addr.to_string())
                .unwrap_or_else(|| "#RESOLUTION-FAILURE#".into()),
        );
        self.cmd_vars.insert(
            "testdrive.schema-registry-url".into(),
            self.schema_registry_url.to_string(),
        );
        self.cmd_vars
            .insert("testdrive.seed".into(), self.seed.to_string());
        self.cmd_vars.insert(
            "testdrive.temp-dir".into(),
            self.temp_path.display().to_string(),
        );
        self.cmd_vars
            .insert("testdrive.aws-region".into(), self.aws_region().into());
        self.cmd_vars
            .insert("testdrive.aws-endpoint".into(), self.aws_endpoint().into());
        self.cmd_vars
            .insert("testdrive.aws-account".into(), self.aws_account.clone());
        {
            let aws_credentials = self
                .aws_config
                .credentials_provider()
                .ok_or_else(|| anyhow!("no AWS credentials provider configured"))?
                .provide_credentials()
                .await
                .context("fetching AWS credentials")?;
            self.cmd_vars.insert(
                "testdrive.aws-access-key-id".into(),
                aws_credentials.access_key_id().to_owned(),
            );
            self.cmd_vars.insert(
                "testdrive.aws-secret-access-key".into(),
                aws_credentials.secret_access_key().to_owned(),
            );
            self.cmd_vars.insert(
                "testdrive.aws-token".into(),
                aws_credentials
                    .session_token()
                    .map(|token| token.to_owned())
                    .unwrap_or_else(String::new),
            );
        }
        self.cmd_vars.insert(
            "testdrive.materialize-environment-id".into(),
            self.materialize.environment_id.to_string(),
        );
        self.cmd_vars.insert(
            "testdrive.materialize-sql-addr".into(),
            self.materialize.sql_addr.clone(),
        );
        self.cmd_vars.insert(
            "testdrive.materialize-internal-sql-addr".into(),
            self.materialize.internal_sql_addr.clone(),
        );
        self.cmd_vars.insert(
            "testdrive.materialize-password-sql-addr".into(),
            self.materialize.password_sql_addr.clone(),
        );
        self.cmd_vars.insert(
            "testdrive.materialize-user".into(),
            self.materialize.user.clone(),
        );
        self.cmd_vars.insert(
            "testdrive.fivetran-destination-url".into(),
            self.fivetran_destination_url.clone(),
        );
        self.cmd_vars.insert(
            "testdrive.fivetran-destination-files-path".into(),
            self.fivetran_destination_files_path.clone(),
        );

        for (key, value) in env::vars() {
            self.cmd_vars.insert(format!("env.{}", key), value);
        }

        for (key, value) in &self.arg_vars {
            validate_ident(key)?;
            self.cmd_vars
                .insert(format!("arg.{}", key), value.to_string());
        }

        Ok(())
    }
    /// Makes a copy of the durable catalog and runs a function on its
    /// state. Returns `None` if there's no catalog information in the `State`.
    pub async fn with_catalog_copy<F, T>(
        &self,
        system_parameter_defaults: BTreeMap<String, String>,
        build_info: &'static BuildInfo,
        bootstrap_args: &BootstrapArgs,
        enable_expression_cache_override: Option<bool>,
        f: F,
    ) -> Result<Option<T>, anyhow::Error>
    where
        F: FnOnce(ConnCatalog) -> T,
    {
        async fn persist_client(
            persist_consensus_url: SensitiveUrl,
            persist_blob_url: SensitiveUrl,
            persist_clients: &PersistClientCache,
        ) -> Result<PersistClient, anyhow::Error> {
            let persist_location = PersistLocation {
                blob_uri: persist_blob_url,
                consensus_uri: persist_consensus_url,
            };
            Ok(persist_clients.open(persist_location).await?)
        }

        if let Some(CatalogConfig {
            persist_consensus_url,
            persist_blob_url,
        }) = &self.materialize.catalog_config
        {
            let persist_client = persist_client(
                persist_consensus_url.clone(),
                persist_blob_url.clone(),
                &self.persist_clients,
            )
            .await?;
            let catalog = Catalog::open_debug_read_only_persist_catalog_config(
                persist_client,
                SYSTEM_TIME.clone(),
                self.materialize.environment_id.clone(),
                system_parameter_defaults,
                build_info,
                bootstrap_args,
                enable_expression_cache_override,
            )
            .await?;
            let res = f(catalog.for_session(&Session::dummy()));
            catalog.expire().await;
            Ok(Some(res))
        } else {
            Ok(None)
        }
    }

    pub fn aws_endpoint(&self) -> &str {
        self.aws_config.endpoint_url().unwrap_or("")
    }

    pub fn aws_region(&self) -> &str {
        self.aws_config.region().map(|r| r.as_ref()).unwrap_or("")
    }

    /// Resets the adhoc skip consistency check that users can toggle per-file, and returns whether
    /// the consistency checks should be skipped for the current run.
    pub fn clear_skip_consistency_checks(&mut self) -> bool {
        std::mem::replace(&mut self.consistency_checks_adhoc_skip, false)
    }

    pub async fn reset_materialize(&self) -> Result<(), anyhow::Error> {
        let (inner_client, _) = postgres_client(
            &format!(
                "postgres://mz_system:materialize@{}",
                self.materialize.internal_sql_addr
            ),
            self.default_timeout,
        )
        .await?;

        let version = inner_client
            .query_one("SELECT mz_version_num()", &[])
            .await
            .context("getting version of materialize")
            .map(|row| row.get::<_, i32>(0))?;

        let semver = inner_client
            .query_one("SELECT right(split_part(mz_version(), ' ', 1), -1)", &[])
            .await
            .context("getting semver of materialize")
            .map(|row| row.get::<_, String>(0))?
            .parse::<semver::Version>()
            .context("parsing semver of materialize")?;

        inner_client
            .batch_execute("ALTER SYSTEM RESET ALL")
            .await
            .context("resetting materialize state: ALTER SYSTEM RESET ALL")?;

        // Dangerous functions are useful for tests, so we enable them for all tests.
        {
            let rename_version = Version::parse("0.128.0-dev.1").expect("known to be valid");
            let enable_unsafe_functions = if semver >= rename_version {
                "unsafe_enable_unsafe_functions"
            } else {
                "enable_unsafe_functions"
            };
            let res = inner_client
                .batch_execute(&format!("ALTER SYSTEM SET {enable_unsafe_functions} = on"))
                .await
                .context("enabling dangerous functions");
            if let Err(e) = res {
                match e.root_cause().downcast_ref::<DbError>() {
                    Some(e) if *e.code() == SqlState::CANT_CHANGE_RUNTIME_PARAM => {
                        info!(
479                            "can't enable unsafe functions because the server is safe mode; \
                             testdrive scripts will fail if they use unsafe functions",
                        );
                    }
                    _ => return Err(e),
                }
            }
        }

        for row in inner_client
            .query("SHOW DATABASES", &[])
            .await
            .context("resetting materialize state: SHOW DATABASES")?
        {
            let db_name: String = row.get(0);
            if db_name.starts_with("testdrive_no_reset_") {
                continue;
            }
            let query = format!(
                "DROP DATABASE {}",
                postgres_protocol::escape::escape_identifier(&db_name)
            );
            sql::print_query(&query, None);
            inner_client.batch_execute(&query).await.context(format!(
                "resetting materialize state: DROP DATABASE {}",
                db_name,
            ))?;
        }

        // Get all user clusters not running any objects owned by users.
        let inactive_user_clusters = "
        WITH
            active_user_clusters AS
            (
                SELECT DISTINCT cluster_id, object_id
                FROM
                    (
                        SELECT cluster_id, id FROM mz_catalog.mz_sources
                        UNION ALL SELECT cluster_id, id FROM mz_catalog.mz_sinks
                        UNION ALL
                            SELECT cluster_id, id
                            FROM mz_catalog.mz_materialized_views
                        UNION ALL
                            SELECT cluster_id, id FROM mz_catalog.mz_indexes
                        UNION ALL
                            SELECT cluster_id, id
                            FROM mz_internal.mz_subscriptions
                    )
                    AS t (cluster_id, object_id)
                WHERE cluster_id IS NOT NULL AND object_id LIKE 'u%'
            )
        SELECT name
        FROM mz_catalog.mz_clusters
        WHERE
            id NOT IN ( SELECT cluster_id FROM active_user_clusters ) AND id LIKE 'u%'
                AND
            owner_id LIKE 'u%';";

        let inactive_clusters = inner_client
            .query(inactive_user_clusters, &[])
            .await
            .context("resetting materialize state: inactive_user_clusters")?;

        if !inactive_clusters.is_empty() {
            println!("cleaning up user clusters from previous tests...")
        }

        for cluster_name in inactive_clusters {
            let cluster_name: String = cluster_name.get(0);
            if cluster_name.starts_with("testdrive_no_reset_") {
                continue;
            }
            let query = format!(
                "DROP CLUSTER {}",
                postgres_protocol::escape::escape_identifier(&cluster_name)
            );
            sql::print_query(&query, None);
            inner_client.batch_execute(&query).await.context(format!(
                "resetting materialize state: DROP CLUSTER {}",
                cluster_name,
            ))?;
        }

        inner_client
            .batch_execute("CREATE DATABASE materialize")
            .await
            .context("resetting materialize state: CREATE DATABASE materialize")?;

        // Attempt to remove all users but the current user. Old versions of
        // Materialize did not support roles, so this degrades gracefully if
        // mz_roles does not exist.
        if let Ok(rows) = inner_client.query("SELECT name FROM mz_roles", &[]).await {
            for row in rows {
                let role_name: String = row.get(0);
                if role_name == self.materialize.user || role_name.starts_with("mz_") {
                    continue;
                }
                let query = format!(
                    "DROP ROLE {}",
                    postgres_protocol::escape::escape_identifier(&role_name)
                );
                sql::print_query(&query, None);
                inner_client.batch_execute(&query).await.context(format!(
                    "resetting materialize state: DROP ROLE {}",
                    role_name,
                ))?;
            }
        }

        // Grant the materialize user all system privileges.
        inner_client
            .batch_execute(&format!(
                "GRANT ALL PRIVILEGES ON SYSTEM TO {}",
                self.materialize.user
            ))
            .await?;

        // Grant initial privileges.
        inner_client
            .batch_execute("GRANT USAGE ON DATABASE materialize TO PUBLIC")
            .await?;
        inner_client
            .batch_execute(&format!(
                "GRANT ALL PRIVILEGES ON DATABASE materialize TO {}",
                self.materialize.user
            ))
            .await?;
        inner_client
            .batch_execute(&format!(
                "GRANT ALL PRIVILEGES ON SCHEMA materialize.public TO {}",
                self.materialize.user
            ))
            .await?;

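        // Versions are compared against mz_version_num(): servers reporting
        // 8200 or newer ship the default cluster as "quickstart", older ones
        // as "default".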
        let cluster = match version {
            ..=8199 => "default",
            8200.. => "quickstart",
        };
        inner_client
            .batch_execute(&format!("GRANT USAGE ON CLUSTER {cluster} TO PUBLIC"))
            .await?;
        inner_client
            .batch_execute(&format!(
                "GRANT ALL PRIVILEGES ON CLUSTER {cluster} TO {}",
                self.materialize.user
            ))
            .await?;

        Ok(())
    }

    /// Deletes Kafka topics and CCSR subjects that were created in this run.
    pub async fn reset_kafka(&self) -> Result<(), anyhow::Error> {
        let mut errors: Vec<anyhow::Error> = Vec::new();

        let metadata = self.kafka_producer.client().fetch_metadata(
            None,
            Some(std::cmp::max(Duration::from_secs(1), self.default_timeout)),
        )?;

        let testdrive_topics: Vec<_> = metadata
            .topics()
            .iter()
            .filter_map(|t| {
                if t.name().starts_with("testdrive-") {
                    Some(t.name())
                } else {
                    None
                }
            })
            .collect();

        if !testdrive_topics.is_empty() {
            match self
                .kafka_admin
                .delete_topics(&testdrive_topics, &self.kafka_admin_opts)
                .await
            {
                Ok(res) => {
                    if res.len() != testdrive_topics.len() {
                        errors.push(anyhow!(
                            "kafka topic deletion returned {} results, but exactly {} expected",
                            res.len(),
                            testdrive_topics.len()
                        ));
                    }
                    for (res, topic) in res.iter().zip_eq(testdrive_topics.iter()) {
                        match res {
                            Ok(_)
                            | Err((_, rdkafka::types::RDKafkaErrorCode::UnknownTopicOrPartition)) => {
                                ()
                            }
                            Err((_, err)) => {
                                errors.push(anyhow!("unable to delete {}: {}", topic, err));
                            }
                        }
                    }
                }
                Err(e) => {
                    errors.push(e.into());
                }
            };
        }

        match self
            .ccsr_client
            .list_subjects()
            .await
            .context("listing schema registry subjects")
        {
            Ok(subjects) => {
                let testdrive_subjects: Vec<_> = subjects
                    .iter()
                    .filter(|s| s.starts_with("testdrive-"))
                    .collect();

                for subject in testdrive_subjects {
                    match self.ccsr_client.delete_subject(subject).await {
                        Ok(()) | Err(mz_ccsr::DeleteError::SubjectNotFound) => (),
                        Err(e) => errors.push(e.into()),
                    }
                }
            }
            Err(e) => {
                errors.push(e);
            }
        }

        if errors.is_empty() {
            Ok(())
        } else {
            bail!(
                "deleting Kafka topics: {} errors: {}",
                errors.len(),
                errors
                    .into_iter()
                    .map(|e| e.to_string_with_causes())
                    .join("\n")
            );
        }
    }
}

/// Configuration for the Catalog.
#[derive(Debug, Clone)]
pub struct CatalogConfig {
    /// Handle to the persist consensus system.
    pub persist_consensus_url: SensitiveUrl,
    /// Handle to the persist blob storage.
    pub persist_blob_url: SensitiveUrl,
}

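/// The outcome of running a command: continue with the next command, begin
/// skipping commands, or mark the end of a skipped region.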
pub enum ControlFlow {
    Continue,
    SkipBegin,
    SkipEnd,
}

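/// The ability to run a parsed testdrive command against the current
/// [`State`].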
#[async_trait]
pub(crate) trait Run {
    async fn run(self, state: &mut State) -> Result<ControlFlow, PosError>;
}

#[async_trait]
impl Run for PosCommand {
    async fn run(self, state: &mut State) -> Result<ControlFlow, PosError> {
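        // Commands may carry a version constraint; if the server version is
        // outside the constrained range, the command is skipped entirely.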
        macro_rules! handle_version {
            ($version_constraint:expr) => {
                match $version_constraint {
                    Some(VersionConstraint { min, max }) => {
                        match version_check::run_version_check(min, max, state).await {
                            Ok(true) => return Ok(ControlFlow::Continue),
                            Ok(false) => {}
                            Err(err) => return Err(PosError::new(err, self.pos)),
                        }
                    }
                    None => {}
                }
            };
        }

        let wrap_err = |e| PosError::new(e, self.pos);
        // Substitute variables up front, except for the command-specific
        // ones; those will be substituted at runtime.
        let ignore_prefix = match &self.command {
            Command::Builtin(builtin, _) => Some(builtin.name.clone()),
            _ => None,
        };
        let subst = |msg: &str, vars: &BTreeMap<String, String>| {
            substitute_vars(msg, vars, &ignore_prefix, false).map_err(wrap_err)
        };
        let subst_re = |msg: &str, vars: &BTreeMap<String, String>| {
            substitute_vars(msg, vars, &ignore_prefix, true).map_err(wrap_err)
        };

        let r = match self.command {
            Command::Builtin(mut builtin, version_constraint) => {
                handle_version!(version_constraint);
                for val in builtin.args.values_mut() {
                    *val = subst(val, &state.cmd_vars)?;
                }
                for line in &mut builtin.input {
                    *line = subst(line, &state.cmd_vars)?;
                }
                match builtin.name.as_ref() {
                    "check-consistency" => consistency::run_consistency_checks(state).await,
                    "skip-consistency-checks" => {
                        consistency::skip_consistency_checks(builtin, state)
                    }
                    "check-shard-tombstone" => {
                        consistency::run_check_shard_tombstone(builtin, state).await
                    }
                    "fivetran-destination" => {
                        fivetran::run_destination_command(builtin, state).await
                    }
                    "file-append" => file::run_append(builtin, state).await,
                    "file-delete" => file::run_delete(builtin, state).await,
                    "http-request" => http::run_request(builtin, state).await,
                    "kafka-add-partitions" => kafka::run_add_partitions(builtin, state).await,
                    "kafka-create-topic" => kafka::run_create_topic(builtin, state).await,
                    "kafka-wait-topic" => kafka::run_wait_topic(builtin, state).await,
                    "kafka-delete-records" => kafka::run_delete_records(builtin, state).await,
                    "kafka-delete-topic-flaky" => kafka::run_delete_topic(builtin, state).await,
                    "kafka-ingest" => kafka::run_ingest(builtin, state).await,
                    "kafka-verify-data" => kafka::run_verify_data(builtin, state).await,
                    "kafka-verify-commit" => kafka::run_verify_commit(builtin, state).await,
                    "kafka-verify-topic" => kafka::run_verify_topic(builtin, state).await,
                    "mysql-connect" => mysql::run_connect(builtin, state).await,
                    "mysql-execute" => mysql::run_execute(builtin, state).await,
                    "nop" => nop::run_nop(),
                    "postgres-connect" => postgres::run_connect(builtin, state).await,
                    "postgres-execute" => postgres::run_execute(builtin, state).await,
                    "postgres-verify-slot" => postgres::run_verify_slot(builtin, state).await,
                    "protobuf-compile-descriptors" => {
                        protobuf::run_compile_descriptors(builtin, state).await
                    }
                    "psql-execute" => psql::run_execute(builtin, state).await,
                    "s3-verify-data" => s3::run_verify_data(builtin, state).await,
                    "s3-verify-keys" => s3::run_verify_keys(builtin, state).await,
                    "s3-file-upload" => s3::run_upload(builtin, state).await,
                    "s3-set-presigned-url" => s3::run_set_presigned_url(builtin, state).await,
                    "schema-registry-publish" => schema_registry::run_publish(builtin, state).await,
                    "schema-registry-verify" => schema_registry::run_verify(builtin, state).await,
                    "schema-registry-wait" => schema_registry::run_wait(builtin, state).await,
                    "skip-if" => skip_if::run_skip_if(builtin, state).await,
                    "skip-end" => skip_end::run_skip_end(),
                    "sql-server-connect" => sql_server::run_connect(builtin, state).await,
                    "sql-server-execute" => sql_server::run_execute(builtin, state).await,
                    "sql-server-set-from-sql" => sql_server::run_set_from_sql(builtin, state).await,
                    "persist-force-compaction" => {
                        persist::run_force_compaction(builtin, state).await
                    }
                    "random-sleep" => sleep::run_random_sleep(builtin),
                    "set-regex" => set::run_regex_set(builtin, state),
                    "unset-regex" => set::run_regex_unset(builtin, state),
                    "set-sql-timeout" => set::run_sql_timeout(builtin, state),
                    "set-max-tries" => set::run_max_tries(builtin, state),
                    "sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment" => {
                        sleep::run_sleep(builtin)
                    }
                    "set" => set::set_vars(builtin, state),
                    "set-arg-default" => set::run_set_arg_default(builtin, state),
                    "set-from-sql" => set::run_set_from_sql(builtin, state).await,
                    "set-from-file" => set::run_set_from_file(builtin, state).await,
                    "webhook-append" => webhook::run_append(builtin, state).await,
                    _ => {
                        return Err(PosError::new(
                            anyhow!("unknown built-in command {}", builtin.name),
                            self.pos,
                        ));
                    }
                }
            }
            Command::Sql(mut sql, version_constraint) => {
                handle_version!(version_constraint);
                sql.query = subst(&sql.query, &state.cmd_vars)?;
                if let SqlOutput::Full { expected_rows, .. } = &mut sql.expected_output {
                    for row in expected_rows {
                        for col in row {
                            *col = subst(col, &state.cmd_vars)?;
                        }
                    }
                }
                sql::run_sql(sql, state).await
            }
            Command::FailSql(mut sql, version_constraint) => {
                handle_version!(version_constraint);
                sql.query = subst(&sql.query, &state.cmd_vars)?;
                sql.expected_error = match &sql.expected_error {
                    SqlExpectedError::Contains(s) => {
                        SqlExpectedError::Contains(subst(s, &state.cmd_vars)?)
                    }
                    SqlExpectedError::Exact(s) => {
                        SqlExpectedError::Exact(subst(s, &state.cmd_vars)?)
                    }
                    SqlExpectedError::Regex(s) => {
                        SqlExpectedError::Regex(subst_re(s, &state.cmd_vars)?)
                    }
                    SqlExpectedError::Timeout => SqlExpectedError::Timeout,
                };
                sql::run_fail_sql(sql, state).await
            }
        };

        r.map_err(wrap_err)
    }
}

/// Substitutes `${}`-delimited variables from `vars` into `msg`.
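///
/// For example, with a `vars` entry mapping `testdrive.seed` to `42`, the
/// input `topic-${testdrive.seed}` becomes `topic-42`. Referencing an
/// unknown variable yields an error, and with `regex_escape` set each
/// substituted value is passed through `regex::escape` first.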
fn substitute_vars(
    msg: &str,
    vars: &BTreeMap<String, String>,
    ignore_prefix: &Option<String>,
    regex_escape: bool,
) -> Result<String, anyhow::Error> {
    static RE: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"\$\{([^}]+)\}").unwrap());
    let mut err = None;
    let out = RE.replace_all(msg, |caps: &Captures| {
        let name = &caps[1];
        if let Some(ignore_prefix) = &ignore_prefix {
            if name.starts_with(format!("{}.", ignore_prefix).as_str()) {
                // Do not substitute; leave the original variable name in place.
                return caps.get(0).unwrap().as_str().to_string();
            }
        }

        if let Some(val) = vars.get(name) {
            if regex_escape {
                regex::escape(val)
            } else {
                val.to_string()
            }
        } else {
            err = Some(anyhow!("unknown variable: {}", name));
            "#VAR-MISSING#".to_string()
        }
    });
    match err {
        Some(err) => Err(err),
        None => Ok(out.into_owned()),
    }
}

/// Initializes a [`State`] object by connecting to the various external
/// services specified in `config`.
///
/// Returns the initialized `State` and a cleanup future. The cleanup future
/// should be `await`ed only *after* dropping the `State` to check whether any
/// errors occurred while dropping the `State`. This awkward API is a workaround
/// for the lack of `AsyncDrop` support in Rust.
pub async fn create_state(
    config: &Config,
) -> Result<(State, impl Future<Output = Result<(), anyhow::Error>>), anyhow::Error> {
    let seed = config.seed.unwrap_or_else(|| rand::thread_rng().r#gen());

    let (_tempfile, temp_path) = match &config.temp_dir {
        Some(temp_dir) => {
            fs::create_dir_all(temp_dir).context("creating temporary directory")?;
            (None, PathBuf::from(&temp_dir))
        }
        _ => {
            // Stash the tempfile object so that it does not go out of scope and delete
            // the tempdir prematurely.
            let tempfile_handle = tempfile::tempdir().context("creating temporary directory")?;
            let temp_path = tempfile_handle.path().to_path_buf();
            (Some(tempfile_handle), temp_path)
        }
    };

    let materialize_catalog_config = config.materialize_catalog_config.clone();

    let materialize_url = util::postgres::config_url(&config.materialize_pgconfig)?;
    info!("Connecting to {}", materialize_url.as_str());
    let (pgclient, pgconn) = Retry::default()
        .max_duration(config.default_timeout)
        .retry_async_canceling(|_| async move {
            let mut pgconfig = config.materialize_pgconfig.clone();
            pgconfig.connect_timeout(config.default_timeout);
            let tls = make_tls(&pgconfig)?;
            pgconfig.connect(tls).await.map_err(|e| anyhow!(e))
        })
        .await?;

    let pgconn_task = task::spawn(|| "pgconn_task", pgconn).map(|join| {
        join.expect("pgconn_task unexpectedly canceled")
            .context("running SQL connection")
    });

    let materialize_state =
        create_materialize_state(config, materialize_catalog_config, pgclient).await?;

    let schema_registry_url = config.schema_registry_url.to_owned();

    let ccsr_client = {
        let mut ccsr_config = mz_ccsr::ClientConfig::new(schema_registry_url.clone());

        if let Some(cert_path) = &config.cert_path {
            let cert = fs::read(cert_path).context("reading cert")?;
            let pass = config.cert_password.as_deref().unwrap_or("").to_owned();
            let ident = mz_ccsr::tls::Identity::from_pkcs12_der(cert, pass)
                .context("reading keystore file as pkcs12")?;
            ccsr_config = ccsr_config.identity(ident);
        }

        if let Some(ccsr_username) = &config.ccsr_username {
            ccsr_config = ccsr_config.auth(ccsr_username.clone(), config.ccsr_password.clone());
        }

        ccsr_config.build().context("Creating CCSR client")?
    };

    let (kafka_addr, kafka_admin, kafka_admin_opts, kafka_producer, kafka_topics, kafka_config) = {
        use rdkafka::admin::{AdminClient, AdminOptions};
        use rdkafka::producer::FutureProducer;

        let mut kafka_config = create_new_client_config_simple();
        kafka_config.set("bootstrap.servers", &config.kafka_addr);
        kafka_config.set("group.id", "materialize-testdrive");
        kafka_config.set("auto.offset.reset", "earliest");
        kafka_config.set("isolation.level", "read_committed");
        if let Some(cert_path) = &config.cert_path {
            kafka_config.set("security.protocol", "ssl");
            kafka_config.set("ssl.keystore.location", cert_path);
            if let Some(cert_password) = &config.cert_password {
                kafka_config.set("ssl.keystore.password", cert_password);
            }
        }
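        // Allow messages up to ~15 MiB, comfortably above librdkafka's
        // default limit of 1 MB.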
1006        kafka_config.set("message.max.bytes", "15728640");
1007
1008        for (key, value) in &config.kafka_opts {
1009            kafka_config.set(key, value);
1010        }
1011
1012        let admin: AdminClient<_> = kafka_config
1013            .create_with_context(MzClientContext::default())
1014            .with_context(|| format!("opening Kafka connection: {}", config.kafka_addr))?;
1015
1016        let admin_opts = AdminOptions::new().operation_timeout(Some(config.default_timeout));
1017
1018        let producer: FutureProducer<_> = kafka_config
1019            .create_with_context(MzClientContext::default())
1020            .with_context(|| format!("opening Kafka producer connection: {}", config.kafka_addr))?;
1021
1022        let topics = BTreeMap::new();
1023
1024        (
1025            config.kafka_addr.to_owned(),
1026            admin,
1027            admin_opts,
1028            producer,
1029            topics,
1030            kafka_config,
1031        )
1032    };
1033
1034    let mut state = State {
1035        // === Testdrive state. ===
1036        arg_vars: config.arg_vars.clone(),
1037        cmd_vars: BTreeMap::new(),
1038        seed,
1039        temp_path,
1040        _tempfile,
1041        default_timeout: config.default_timeout,
1042        timeout: config.default_timeout,
1043        max_tries: config.default_max_tries,
1044        initial_backoff: config.initial_backoff,
1045        backoff_factor: config.backoff_factor,
1046        consistency_checks: config.consistency_checks,
1047        consistency_checks_adhoc_skip: false,
1048        regex: None,
1049        regex_replacement: set::DEFAULT_REGEX_REPLACEMENT.into(),
1050        rewrite_results: config.rewrite_results,
1051        error_line_count: 0,
1052        error_string: "".to_string(),
1053
1054        // === Materialize state. ===
1055        materialize: materialize_state,
1056
1057        // === Persist state. ===
1058        persist_consensus_url: config.persist_consensus_url.clone(),
1059        persist_blob_url: config.persist_blob_url.clone(),
1060        build_info: config.build_info,
1061        persist_clients: PersistClientCache::new(
1062            PersistConfig::new_default_configs(config.build_info, SYSTEM_TIME.clone()),
1063            &MetricsRegistry::new(),
1064            |_, _| PubSubClientConnection::noop(),
1065        ),
1066
1067        // === Confluent state. ===
1068        schema_registry_url,
1069        ccsr_client,
1070        kafka_addr,
1071        kafka_admin,
1072        kafka_admin_opts,
1073        kafka_config,
1074        kafka_default_partitions: config.kafka_default_partitions,
1075        kafka_producer,
1076        kafka_topics,
1077
1078        // === AWS state. ===
1079        aws_account: config.aws_account.clone(),
1080        aws_config: config.aws_config.clone(),
1081
1082        // === Database driver state. ===
1083        mysql_clients: BTreeMap::new(),
1084        postgres_clients: BTreeMap::new(),
1085        sql_server_clients: BTreeMap::new(),
1086
1087        // === Fivetran state. ===
1088        fivetran_destination_url: config.fivetran_destination_url.clone(),
1089        fivetran_destination_files_path: config.fivetran_destination_files_path.clone(),
1090
1091        rewrites: Vec::new(),
1092        rewrite_pos_start: 0,
1093        rewrite_pos_end: 0,
1094    };
1095    state.initialize_cmd_vars().await?;
1096    Ok((state, pgconn_task))
1097}
1098
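/// Applies the configured session parameters over `pgclient` and gathers the
/// addresses and server metadata that make up a [`MaterializeState`].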
async fn create_materialize_state(
    config: &Config,
    materialize_catalog_config: Option<CatalogConfig>,
    pgclient: tokio_postgres::Client,
) -> Result<MaterializeState, anyhow::Error> {
    let materialize_url = util::postgres::config_url(&config.materialize_pgconfig)?;
    let materialize_internal_url =
        util::postgres::config_url(&config.materialize_internal_pgconfig)?;

    for (key, value) in &config.materialize_params {
        pgclient
            .batch_execute(&format!("SET {key} = {value}"))
            .await
            .context("setting session parameter")?;
    }

    let materialize_user = config
        .materialize_pgconfig
        .get_user()
        .expect("testdrive URL must contain user")
        .to_string();
    let materialize_sql_addr = format!(
        "{}:{}",
        materialize_url.host_str().unwrap(),
        materialize_url.port().unwrap()
    );
    let materialize_http_addr = format!(
        "{}:{}",
        materialize_url.host_str().unwrap(),
        config.materialize_http_port
    );
    let materialize_internal_sql_addr = format!(
        "{}:{}",
        materialize_internal_url.host_str().unwrap(),
        materialize_internal_url.port().unwrap()
    );
    let materialize_password_sql_addr = format!(
        "{}:{}",
        materialize_url.host_str().unwrap(),
        config.materialize_password_sql_port
    );
    let materialize_internal_http_addr = format!(
        "{}:{}",
        materialize_internal_url.host_str().unwrap(),
        config.materialize_internal_http_port
    );
    let environment_id = pgclient
        .query_one("SELECT mz_environment_id()", &[])
        .await?
        .get::<_, String>(0)
        .parse()
        .context("parsing environment ID")?;

    let bootstrap_args = BootstrapArgs {
        cluster_replica_size_map: config.materialize_cluster_replica_sizes.clone(),
        default_cluster_replica_size: "ABC".to_string(),
        default_cluster_replication_factor: 1,
        bootstrap_role: None,
    };

    let materialize_state = MaterializeState {
        catalog_config: materialize_catalog_config,
        sql_addr: materialize_sql_addr,
        use_https: config.materialize_use_https,
        http_addr: materialize_http_addr,
        internal_sql_addr: materialize_internal_sql_addr,
        internal_http_addr: materialize_internal_http_addr,
        password_sql_addr: materialize_password_sql_addr,
        user: materialize_user,
        pgclient,
        environment_id,
        bootstrap_args,
    };

    Ok(materialize_state)
}