mz_testdrive/
action.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::collections::BTreeMap;
use std::future::Future;
use std::net::ToSocketAddrs;
use std::path::PathBuf;
use std::sync::LazyLock;
use std::time::Duration;
use std::{env, fs};

use anyhow::{Context, anyhow, bail};
use async_trait::async_trait;
use aws_credential_types::provider::ProvideCredentials;
use aws_types::SdkConfig;
use futures::future::FutureExt;
use itertools::Itertools;
use mz_adapter::catalog::{Catalog, ConnCatalog};
use mz_adapter::session::Session;
use mz_build_info::BuildInfo;
use mz_catalog::config::ClusterReplicaSizeMap;
use mz_catalog::durable::BootstrapArgs;
use mz_kafka_util::client::{MzClientContext, create_new_client_config_simple};
use mz_ore::error::ErrorExt;
use mz_ore::metrics::MetricsRegistry;
use mz_ore::now::SYSTEM_TIME;
use mz_ore::retry::Retry;
use mz_ore::task;
use mz_ore::url::SensitiveUrl;
use mz_persist_client::cache::PersistClientCache;
use mz_persist_client::cfg::PersistConfig;
use mz_persist_client::rpc::PubSubClientConnection;
use mz_persist_client::{PersistClient, PersistLocation};
use mz_sql::catalog::EnvironmentId;
use mz_tls_util::make_tls;
use rand::Rng;
use rdkafka::ClientConfig;
use rdkafka::producer::Producer;
use regex::{Captures, Regex};
use semver::Version;
use tokio_postgres::error::{DbError, SqlState};
use tracing::info;
use url::Url;

use crate::error::PosError;
use crate::parser::{
    Command, PosCommand, SqlExpectedError, SqlOutput, VersionConstraint, validate_ident,
};
use crate::util;
use crate::util::postgres::postgres_client;

pub mod consistency;

mod file;
mod fivetran;
mod http;
mod kafka;
mod mysql;
mod nop;
mod persist;
mod postgres;
mod protobuf;
mod psql;
mod s3;
mod schema_registry;
mod set;
mod skip_end;
mod skip_if;
mod sleep;
mod sql;
mod sql_server;
mod version_check;
mod webhook;

/// User-settable configuration parameters.
#[derive(Debug)]
pub struct Config {
    // === Testdrive options. ===
    /// Variables to make available to the testdrive script.
    ///
    /// The value of each entry will be made available to the script in a
    /// variable named `arg.KEY`.
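    ///
    /// For example, an entry of `("foo", "bar")` can be referenced from a
    /// script as `${arg.foo}`.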
    pub arg_vars: BTreeMap<String, String>,
    /// A random number to distinguish each run of a testdrive script.
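    ///
    /// The seed is exposed to scripts as `${testdrive.seed}`, which is
    /// commonly used to give external resources (e.g. Kafka topics) names
    /// that are unique to each run.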
    pub seed: Option<u32>,
    /// Whether to reset Materialize state before executing each script and
    /// to clean up AWS state after each script.
    pub reset: bool,
    /// Force the use of the specified temporary directory.
    ///
    /// If unspecified, testdrive creates a temporary directory with a random
    /// name.
    pub temp_dir: Option<String>,
    /// Source string to print out on errors.
    pub source: Option<String>,
    /// The default timeout for cancellable operations.
    pub default_timeout: Duration,
    /// The default number of tries for retriable operations.
    pub default_max_tries: usize,
    /// The initial backoff interval for retry operations.
    ///
    /// Set to 0 to retry immediately on failure.
    pub initial_backoff: Duration,
    /// Backoff factor to use for retry operations.
    ///
    /// Set to 1 to retry at a steady pace.
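    ///
    /// For example, with an initial backoff of 1s and a backoff factor of 2,
    /// successive retries wait 1s, 2s, 4s, and so on.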
    pub backoff_factor: f64,
    /// The level of consistency checks to run against the coordinator and
    /// the catalog.
    pub consistency_checks: consistency::Level,
    /// Whether to automatically rewrite wrong results instead of failing.
    pub rewrite_results: bool,

    // === Materialize options. ===
    /// The pgwire connection parameters for the Materialize instance that
    /// testdrive will connect to.
    pub materialize_pgconfig: tokio_postgres::Config,
    /// The internal pgwire connection parameters for the Materialize instance
    /// that testdrive will connect to.
    pub materialize_internal_pgconfig: tokio_postgres::Config,
    /// Whether to use HTTPS instead of plain HTTP for the HTTP(S) connections.
    pub materialize_use_https: bool,
    /// The port for the public endpoints of the Materialize instance that
    /// testdrive will connect to via HTTP.
    pub materialize_http_port: u16,
    /// The port for the internal endpoints of the Materialize instance that
    /// testdrive will connect to via HTTP.
    pub materialize_internal_http_port: u16,
    /// Session parameters to set after connecting to Materialize.
    pub materialize_params: Vec<(String, String)>,
    /// An optional catalog configuration.
    pub materialize_catalog_config: Option<CatalogConfig>,
    /// Build information.
    pub build_info: &'static BuildInfo,
    /// Configured cluster replica sizes.
    pub materialize_cluster_replica_sizes: ClusterReplicaSizeMap,

    // === Persist options. ===
    /// Handle to the persist consensus system.
    pub persist_consensus_url: Option<SensitiveUrl>,
    /// Handle to the persist blob storage.
    pub persist_blob_url: Option<SensitiveUrl>,

    // === Confluent options. ===
    /// The address of the Kafka broker that testdrive will interact with.
    pub kafka_addr: String,
    /// The default number of partitions to use for topics.
    pub kafka_default_partitions: usize,
    /// Arbitrary rdkafka options for testdrive to use when connecting to the
    /// Kafka broker.
    pub kafka_opts: Vec<(String, String)>,
    /// The URL of the schema registry that testdrive will connect to.
    pub schema_registry_url: Url,
    /// An optional path to a TLS certificate that testdrive will present when
    /// performing client authentication.
    ///
    /// The keystore must be in the PKCS#12 format.
    pub cert_path: Option<String>,
    /// An optional password for the TLS certificate.
    pub cert_password: Option<String>,
    /// An optional username for basic authentication with the Confluent Schema
    /// Registry.
    pub ccsr_username: Option<String>,
    /// An optional password for basic authentication with the Confluent Schema
    /// Registry.
    pub ccsr_password: Option<String>,

    // === AWS options. ===
    /// The configuration to use when connecting to AWS.
    pub aws_config: SdkConfig,
    /// The ID of the AWS account that `aws_config` configures.
    pub aws_account: String,

    // === Fivetran options. ===
    /// The address of the Fivetran Destination that is currently running.
    pub fivetran_destination_url: String,
    /// A directory that is accessible to the Fivetran Destination.
    pub fivetran_destination_files_path: String,
}

/// Connection state for the Materialize instance under test.
pub struct MaterializeState {
    catalog_config: Option<CatalogConfig>,

    sql_addr: String,
    use_https: bool,
    http_addr: String,
    internal_sql_addr: String,
    internal_http_addr: String,
    user: String,
    pgclient: tokio_postgres::Client,
    environment_id: EnvironmentId,
    bootstrap_args: BootstrapArgs,
}

/// Mutable state maintained while a testdrive script executes.
pub struct State {
    // === Testdrive state. ===
    arg_vars: BTreeMap<String, String>,
    cmd_vars: BTreeMap<String, String>,
    seed: u32,
    temp_path: PathBuf,
    _tempfile: Option<tempfile::TempDir>,
    default_timeout: Duration,
    timeout: Duration,
    max_tries: usize,
    initial_backoff: Duration,
    backoff_factor: f64,
    consistency_checks: consistency::Level,
    consistency_checks_adhoc_skip: bool,
    regex: Option<Regex>,
    regex_replacement: String,

    // === Materialize state. ===
    materialize: MaterializeState,

    // === Persist state. ===
    persist_consensus_url: Option<SensitiveUrl>,
    persist_blob_url: Option<SensitiveUrl>,
    build_info: &'static BuildInfo,
    persist_clients: PersistClientCache,

    // === Confluent state. ===
    schema_registry_url: Url,
    ccsr_client: mz_ccsr::Client,
    kafka_addr: String,
    kafka_admin: rdkafka::admin::AdminClient<MzClientContext>,
    kafka_admin_opts: rdkafka::admin::AdminOptions,
    kafka_config: ClientConfig,
    kafka_default_partitions: usize,
    kafka_producer: rdkafka::producer::FutureProducer<MzClientContext>,
    kafka_topics: BTreeMap<String, usize>,

    // === AWS state. ===
    aws_account: String,
    aws_config: SdkConfig,

    // === Database driver state. ===
    mysql_clients: BTreeMap<String, mysql_async::Conn>,
    postgres_clients: BTreeMap<String, tokio_postgres::Client>,
    sql_server_clients: BTreeMap<String, mz_sql_server_util::Client>,

    // === Fivetran state. ===
    fivetran_destination_url: String,
    fivetran_destination_files_path: String,

    // === Rewrite state. ===
    rewrite_results: bool,
    /// Rewrites to apply to the current file; results are replaced inline.
    pub rewrites: Vec<Rewrite>,
    /// Start position of the currently expected result.
    pub rewrite_pos_start: usize,
    /// End position of the currently expected result.
    pub rewrite_pos_end: usize,
}

/// A pending inline replacement of an expected result in the current file.
pub struct Rewrite {
    pub content: String,
    pub start: usize,
    pub end: usize,
}

impl State {
    /// Populates the built-in `testdrive.*`, `env.*`, and `arg.*` command
    /// variables.
    pub async fn initialize_cmd_vars(&mut self) -> Result<(), anyhow::Error> {
        self.cmd_vars
            .insert("testdrive.kafka-addr".into(), self.kafka_addr.clone());
        self.cmd_vars.insert(
            "testdrive.kafka-addr-resolved".into(),
            self.kafka_addr
                .to_socket_addrs()
                .ok()
                .and_then(|mut addrs| addrs.next())
                .map(|addr| addr.to_string())
                .unwrap_or_else(|| "#RESOLUTION-FAILURE#".into()),
        );
        self.cmd_vars.insert(
            "testdrive.schema-registry-url".into(),
            self.schema_registry_url.to_string(),
        );
        self.cmd_vars
            .insert("testdrive.seed".into(), self.seed.to_string());
        self.cmd_vars.insert(
            "testdrive.temp-dir".into(),
            self.temp_path.display().to_string(),
        );
        self.cmd_vars
            .insert("testdrive.aws-region".into(), self.aws_region().into());
        self.cmd_vars
            .insert("testdrive.aws-endpoint".into(), self.aws_endpoint().into());
        self.cmd_vars
            .insert("testdrive.aws-account".into(), self.aws_account.clone());
        {
            let aws_credentials = self
                .aws_config
                .credentials_provider()
                .ok_or_else(|| anyhow!("no AWS credentials provider configured"))?
                .provide_credentials()
                .await
                .context("fetching AWS credentials")?;
            self.cmd_vars.insert(
                "testdrive.aws-access-key-id".into(),
                aws_credentials.access_key_id().to_owned(),
            );
            self.cmd_vars.insert(
                "testdrive.aws-secret-access-key".into(),
                aws_credentials.secret_access_key().to_owned(),
            );
            self.cmd_vars.insert(
                "testdrive.aws-token".into(),
                aws_credentials
                    .session_token()
                    .map(|token| token.to_owned())
                    .unwrap_or_else(String::new),
            );
        }
        self.cmd_vars.insert(
            "testdrive.materialize-environment-id".into(),
            self.materialize.environment_id.to_string(),
        );
        self.cmd_vars.insert(
            "testdrive.materialize-sql-addr".into(),
            self.materialize.sql_addr.clone(),
        );
        self.cmd_vars.insert(
            "testdrive.materialize-internal-sql-addr".into(),
            self.materialize.internal_sql_addr.clone(),
        );
        self.cmd_vars.insert(
            "testdrive.materialize-user".into(),
            self.materialize.user.clone(),
        );
        self.cmd_vars.insert(
            "testdrive.fivetran-destination-url".into(),
            self.fivetran_destination_url.clone(),
        );
        self.cmd_vars.insert(
            "testdrive.fivetran-destination-files-path".into(),
            self.fivetran_destination_files_path.clone(),
        );

        for (key, value) in env::vars() {
            self.cmd_vars.insert(format!("env.{}", key), value);
        }

        for (key, value) in &self.arg_vars {
            validate_ident(key)?;
            self.cmd_vars
                .insert(format!("arg.{}", key), value.to_string());
        }

        Ok(())
    }

    /// Makes a copy of the durable catalog and runs a function on its
    /// state. Returns `None` if there's no catalog information in the `State`.
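    ///
    /// A minimal sketch of the call shape, assuming `defaults`, `build_info`,
    /// and `bootstrap_args` are in scope and `inspect` is a hypothetical
    /// helper that reads whatever catalog state the caller is interested in:
    ///
    /// ```ignore
    /// let res = state
    ///     .with_catalog_copy(defaults, build_info, &bootstrap_args, None, |catalog| {
    ///         inspect(&catalog)
    ///     })
    ///     .await?;
    /// ```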
    pub async fn with_catalog_copy<F, T>(
        &self,
        system_parameter_defaults: BTreeMap<String, String>,
        build_info: &'static BuildInfo,
        bootstrap_args: &BootstrapArgs,
        enable_expression_cache_override: Option<bool>,
        f: F,
    ) -> Result<Option<T>, anyhow::Error>
    where
        F: FnOnce(ConnCatalog) -> T,
    {
        async fn persist_client(
            persist_consensus_url: SensitiveUrl,
            persist_blob_url: SensitiveUrl,
            persist_clients: &PersistClientCache,
        ) -> Result<PersistClient, anyhow::Error> {
            let persist_location = PersistLocation {
                blob_uri: persist_blob_url,
                consensus_uri: persist_consensus_url,
            };
            Ok(persist_clients.open(persist_location).await?)
        }

        if let Some(CatalogConfig {
            persist_consensus_url,
            persist_blob_url,
        }) = &self.materialize.catalog_config
        {
            let persist_client = persist_client(
                persist_consensus_url.clone(),
                persist_blob_url.clone(),
                &self.persist_clients,
            )
            .await?;
            let catalog = Catalog::open_debug_read_only_persist_catalog_config(
                persist_client,
                SYSTEM_TIME.clone(),
                self.materialize.environment_id.clone(),
                system_parameter_defaults,
                build_info,
                bootstrap_args,
                enable_expression_cache_override,
            )
            .await?;
            let res = f(catalog.for_session(&Session::dummy()));
            catalog.expire().await;
            Ok(Some(res))
        } else {
            Ok(None)
        }
    }

    pub fn aws_endpoint(&self) -> &str {
        self.aws_config.endpoint_url().unwrap_or("")
    }

    pub fn aws_region(&self) -> &str {
        self.aws_config.region().map(|r| r.as_ref()).unwrap_or("")
    }

    /// Resets the per-file ad hoc skip of consistency checks that scripts can
    /// toggle, and returns whether the consistency checks should be skipped
    /// for the current run.
    pub fn clear_skip_consistency_checks(&mut self) -> bool {
        std::mem::replace(&mut self.consistency_checks_adhoc_skip, false)
    }

    /// Resets the Materialize instance to a clean state: drops databases and
    /// inactive user clusters (except those prefixed with
    /// `testdrive_no_reset_`), drops non-system roles, recreates the
    /// `materialize` database, and restores default privileges.
    pub async fn reset_materialize(&self) -> Result<(), anyhow::Error> {
        let (inner_client, _) = postgres_client(
            &format!(
                "postgres://mz_system:materialize@{}",
                self.materialize.internal_sql_addr
            ),
            self.default_timeout,
        )
        .await?;

        let version = inner_client
            .query_one("SELECT mz_version_num()", &[])
            .await
            .context("getting version of materialize")
            .map(|row| row.get::<_, i32>(0))?;

        let semver = inner_client
            .query_one("SELECT right(split_part(mz_version(), ' ', 1), -1)", &[])
            .await
            .context("getting semver of materialize")
            .map(|row| row.get::<_, String>(0))?
            .parse::<semver::Version>()
            .context("parsing semver of materialize")?;

        inner_client
            .batch_execute("ALTER SYSTEM RESET ALL")
            .await
            .context("resetting materialize state: ALTER SYSTEM RESET ALL")?;

        // Dangerous functions are useful for tests, so we enable them for all
        // tests.
        {
            let rename_version = Version::parse("0.128.0-dev.1").expect("known to be valid");
            let enable_unsafe_functions = if semver >= rename_version {
                "unsafe_enable_unsafe_functions"
            } else {
                "enable_unsafe_functions"
            };
            let res = inner_client
                .batch_execute(&format!("ALTER SYSTEM SET {enable_unsafe_functions} = on"))
                .await
                .context("enabling dangerous functions");
            if let Err(e) = res {
                match e.root_cause().downcast_ref::<DbError>() {
                    Some(e) if *e.code() == SqlState::CANT_CHANGE_RUNTIME_PARAM => {
                        info!(
                            "can't enable unsafe functions because the server is in safe mode; \
                             testdrive scripts will fail if they use unsafe functions",
                        );
                    }
                    _ => return Err(e),
                }
            }
        }

        for row in inner_client
            .query("SHOW DATABASES", &[])
            .await
            .context("resetting materialize state: SHOW DATABASES")?
        {
            let db_name: String = row.get(0);
            if db_name.starts_with("testdrive_no_reset_") {
                continue;
            }
            let query = format!(
                "DROP DATABASE {}",
                postgres_protocol::escape::escape_identifier(&db_name)
            );
            sql::print_query(&query, None);
            inner_client.batch_execute(&query).await.context(format!(
                "resetting materialize state: DROP DATABASE {}",
                db_name,
            ))?;
        }

        // Get all user clusters that are not running any user-owned objects
        // (sources, sinks, materialized views, indexes, or subscriptions).
        let inactive_user_clusters = "
        WITH
            active_user_clusters AS
            (
                SELECT DISTINCT cluster_id, object_id
                FROM
                    (
                        SELECT cluster_id, id FROM mz_catalog.mz_sources
                        UNION ALL SELECT cluster_id, id FROM mz_catalog.mz_sinks
                        UNION ALL
                            SELECT cluster_id, id
                            FROM mz_catalog.mz_materialized_views
                        UNION ALL
                            SELECT cluster_id, id FROM mz_catalog.mz_indexes
                        UNION ALL
                            SELECT cluster_id, id
                            FROM mz_internal.mz_subscriptions
                    )
                    AS t (cluster_id, object_id)
                WHERE cluster_id IS NOT NULL AND object_id LIKE 'u%'
            )
        SELECT name
        FROM mz_catalog.mz_clusters
        WHERE
            id NOT IN ( SELECT cluster_id FROM active_user_clusters ) AND id LIKE 'u%'
                AND
            owner_id LIKE 'u%';";

        let inactive_clusters = inner_client
            .query(inactive_user_clusters, &[])
            .await
            .context("resetting materialize state: inactive_user_clusters")?;

        if !inactive_clusters.is_empty() {
            println!("cleaning up user clusters from previous tests...")
        }

        for cluster_name in inactive_clusters {
            let cluster_name: String = cluster_name.get(0);
            if cluster_name.starts_with("testdrive_no_reset_") {
                continue;
            }
            let query = format!(
                "DROP CLUSTER {}",
                postgres_protocol::escape::escape_identifier(&cluster_name)
            );
            sql::print_query(&query, None);
            inner_client.batch_execute(&query).await.context(format!(
                "resetting materialize state: DROP CLUSTER {}",
                cluster_name,
            ))?;
        }

        inner_client
            .batch_execute("CREATE DATABASE materialize")
            .await
            .context("resetting materialize state: CREATE DATABASE materialize")?;

        // Attempt to remove all users but the current user. Old versions of
        // Materialize did not support roles, so this degrades gracefully if
        // mz_roles does not exist.
        if let Ok(rows) = inner_client.query("SELECT name FROM mz_roles", &[]).await {
            for row in rows {
                let role_name: String = row.get(0);
                if role_name == self.materialize.user || role_name.starts_with("mz_") {
                    continue;
                }
                let query = format!(
                    "DROP ROLE {}",
                    postgres_protocol::escape::escape_identifier(&role_name)
                );
                sql::print_query(&query, None);
                inner_client.batch_execute(&query).await.context(format!(
                    "resetting materialize state: DROP ROLE {}",
                    role_name,
                ))?;
            }
        }

        // Grant the Materialize user all system privileges.
        inner_client
            .batch_execute(&format!(
                "GRANT ALL PRIVILEGES ON SYSTEM TO {}",
                self.materialize.user
            ))
            .await?;

        // Grant initial privileges.
        inner_client
            .batch_execute("GRANT USAGE ON DATABASE materialize TO PUBLIC")
            .await?;
        inner_client
            .batch_execute(&format!(
                "GRANT ALL PRIVILEGES ON DATABASE materialize TO {}",
                self.materialize.user
            ))
            .await?;
        inner_client
            .batch_execute(&format!(
                "GRANT ALL PRIVILEGES ON SCHEMA materialize.public TO {}",
                self.materialize.user
            ))
            .await?;

        // The default cluster was renamed to "quickstart" in v0.82 (version
        // number 8200).
        let cluster = match version {
            ..=8199 => "default",
            8200.. => "quickstart",
        };
        inner_client
            .batch_execute(&format!("GRANT USAGE ON CLUSTER {cluster} TO PUBLIC"))
            .await?;
        inner_client
            .batch_execute(&format!(
                "GRANT ALL PRIVILEGES ON CLUSTER {cluster} TO {}",
                self.materialize.user
            ))
            .await?;

        Ok(())
    }

    /// Deletes the Kafka topics and schema registry subjects (those prefixed
    /// with `testdrive-`) that were created during this run.
    pub async fn reset_kafka(&self) -> Result<(), anyhow::Error> {
        let mut errors: Vec<anyhow::Error> = Vec::new();

        let metadata = self.kafka_producer.client().fetch_metadata(
            None,
            Some(std::cmp::max(Duration::from_secs(1), self.default_timeout)),
        )?;

        let testdrive_topics: Vec<_> = metadata
            .topics()
            .iter()
            .filter_map(|t| {
                if t.name().starts_with("testdrive-") {
                    Some(t.name())
                } else {
                    None
                }
            })
            .collect();

        if !testdrive_topics.is_empty() {
            match self
                .kafka_admin
                .delete_topics(&testdrive_topics, &self.kafka_admin_opts)
                .await
            {
                Ok(res) => {
                    if res.len() != testdrive_topics.len() {
                        errors.push(anyhow!(
                            "kafka topic deletion returned {} results, but exactly {} were expected",
                            res.len(),
                            testdrive_topics.len()
                        ));
                    }
                    for (res, topic) in res.iter().zip(testdrive_topics.iter()) {
                        match res {
                            Ok(_)
                            | Err((_, rdkafka::types::RDKafkaErrorCode::UnknownTopicOrPartition)) => {
                                ()
                            }
                            Err((_, err)) => {
                                errors.push(anyhow!("unable to delete {}: {}", topic, err));
                            }
                        }
                    }
                }
                Err(e) => {
                    errors.push(e.into());
                }
            };
        }

        match self
            .ccsr_client
            .list_subjects()
            .await
            .context("listing schema registry subjects")
        {
            Ok(subjects) => {
                let testdrive_subjects: Vec<_> = subjects
                    .iter()
                    .filter(|s| s.starts_with("testdrive-"))
                    .collect();

                for subject in testdrive_subjects {
                    match self.ccsr_client.delete_subject(subject).await {
                        Ok(()) | Err(mz_ccsr::DeleteError::SubjectNotFound) => (),
                        Err(e) => errors.push(e.into()),
                    }
                }
            }
            Err(e) => {
                errors.push(e);
            }
        }

        if errors.is_empty() {
            Ok(())
        } else {
            bail!(
                "deleting Kafka topics: {} errors: {}",
                errors.len(),
                errors
                    .into_iter()
                    .map(|e| e.to_string_with_causes())
                    .join("\n")
            );
        }
    }
}

/// Configuration for the Catalog.
#[derive(Debug, Clone)]
pub struct CatalogConfig {
    /// Handle to the persist consensus system.
    pub persist_consensus_url: SensitiveUrl,
    /// Handle to the persist blob storage.
    pub persist_blob_url: SensitiveUrl,
}

/// The control flow that the rest of the script should follow after a
/// command runs.
pub enum ControlFlow {
    /// Proceed to the next command.
    Continue,
    /// Begin skipping commands until a `SkipEnd` is reached.
    SkipBegin,
    /// Stop skipping commands.
    SkipEnd,
}

/// A command that can be executed against the current [`State`].
#[async_trait]
pub(crate) trait Run {
    async fn run(self, state: &mut State) -> Result<ControlFlow, PosError>;
}

#[async_trait]
impl Run for PosCommand {
    async fn run(self, state: &mut State) -> Result<ControlFlow, PosError> {
        // Skips the command (by returning `Continue` early) when its version
        // constraint excludes the version of the server under test.
        macro_rules! handle_version {
            ($version_constraint:expr) => {
                match $version_constraint {
                    Some(VersionConstraint { min, max }) => {
                        match version_check::run_version_check(min, max, state).await {
                            Ok(true) => return Ok(ControlFlow::Continue),
                            Ok(false) => {}
                            Err(err) => return Err(PosError::new(err, self.pos)),
                        }
                    }
                    None => {}
                }
            };
        }

        let wrap_err = |e| PosError::new(e, self.pos);
        // Substitute variables at startup, except for the command-specific
        // ones (those prefixed with the builtin's name, e.g.
        // `${kafka-ingest.iteration}`); those are substituted at runtime by
        // the command itself.
        let ignore_prefix = match &self.command {
            Command::Builtin(builtin, _) => Some(builtin.name.clone()),
            _ => None,
        };
        let subst = |msg: &str, vars: &BTreeMap<String, String>| {
            substitute_vars(msg, vars, &ignore_prefix, false).map_err(wrap_err)
        };
        let subst_re = |msg: &str, vars: &BTreeMap<String, String>| {
            substitute_vars(msg, vars, &ignore_prefix, true).map_err(wrap_err)
        };

        let r = match self.command {
            Command::Builtin(mut builtin, version_constraint) => {
                handle_version!(version_constraint);
                for val in builtin.args.values_mut() {
                    *val = subst(val, &state.cmd_vars)?;
                }
                for line in &mut builtin.input {
                    *line = subst(line, &state.cmd_vars)?;
                }
                match builtin.name.as_ref() {
                    "check-consistency" => consistency::run_consistency_checks(state).await,
                    "skip-consistency-checks" => {
                        consistency::skip_consistency_checks(builtin, state)
                    }
                    "check-shard-tombstone" => {
                        consistency::run_check_shard_tombstoned(builtin, state).await
                    }
                    "fivetran-destination" => {
                        fivetran::run_destination_command(builtin, state).await
                    }
                    "file-append" => file::run_append(builtin, state).await,
                    "file-delete" => file::run_delete(builtin, state).await,
                    "http-request" => http::run_request(builtin, state).await,
                    "kafka-add-partitions" => kafka::run_add_partitions(builtin, state).await,
                    "kafka-create-topic" => kafka::run_create_topic(builtin, state).await,
                    "kafka-wait-topic" => kafka::run_wait_topic(builtin, state).await,
                    "kafka-delete-records" => kafka::run_delete_records(builtin, state).await,
                    "kafka-delete-topic-flaky" => kafka::run_delete_topic(builtin, state).await,
                    "kafka-ingest" => kafka::run_ingest(builtin, state).await,
                    "kafka-verify-data" => kafka::run_verify_data(builtin, state).await,
                    "kafka-verify-commit" => kafka::run_verify_commit(builtin, state).await,
                    "kafka-verify-topic" => kafka::run_verify_topic(builtin, state).await,
                    "mysql-connect" => mysql::run_connect(builtin, state).await,
                    "mysql-execute" => mysql::run_execute(builtin, state).await,
                    "nop" => nop::run_nop(),
                    "postgres-connect" => postgres::run_connect(builtin, state).await,
                    "postgres-execute" => postgres::run_execute(builtin, state).await,
                    "postgres-verify-slot" => postgres::run_verify_slot(builtin, state).await,
                    "protobuf-compile-descriptors" => {
                        protobuf::run_compile_descriptors(builtin, state).await
                    }
                    "psql-execute" => psql::run_execute(builtin, state).await,
                    "s3-verify-data" => s3::run_verify_data(builtin, state).await,
                    "s3-verify-keys" => s3::run_verify_keys(builtin, state).await,
                    "s3-file-upload" => s3::run_upload(builtin, state).await,
                    "s3-set-presigned-url" => s3::run_set_presigned_url(builtin, state).await,
                    "schema-registry-publish" => schema_registry::run_publish(builtin, state).await,
                    "schema-registry-verify" => schema_registry::run_verify(builtin, state).await,
                    "schema-registry-wait" => schema_registry::run_wait(builtin, state).await,
                    "skip-if" => skip_if::run_skip_if(builtin, state).await,
                    "skip-end" => skip_end::run_skip_end(),
                    "sql-server-connect" => sql_server::run_connect(builtin, state).await,
                    "sql-server-execute" => sql_server::run_execute(builtin, state).await,
                    "persist-force-compaction" => {
                        persist::run_force_compaction(builtin, state).await
                    }
                    "random-sleep" => sleep::run_random_sleep(builtin),
                    "set-regex" => set::run_regex_set(builtin, state),
                    "unset-regex" => set::run_regex_unset(builtin, state),
                    "set-sql-timeout" => set::run_sql_timeout(builtin, state),
                    "set-max-tries" => set::run_max_tries(builtin, state),
                    "sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment" => {
                        sleep::run_sleep(builtin)
                    }
                    "set" => set::set_vars(builtin, state),
                    "set-arg-default" => set::run_set_arg_default(builtin, state),
                    "set-from-sql" => set::run_set_from_sql(builtin, state).await,
                    "set-from-file" => set::run_set_from_file(builtin, state).await,
                    "webhook-append" => webhook::run_append(builtin, state).await,
                    _ => {
                        return Err(PosError::new(
                            anyhow!("unknown built-in command {}", builtin.name),
                            self.pos,
                        ));
                    }
                }
            }
            Command::Sql(mut sql, version_constraint) => {
                handle_version!(version_constraint);
                sql.query = subst(&sql.query, &state.cmd_vars)?;
                if let SqlOutput::Full { expected_rows, .. } = &mut sql.expected_output {
                    for row in expected_rows {
                        for col in row {
                            *col = subst(col, &state.cmd_vars)?;
                        }
                    }
                }
                sql::run_sql(sql, state).await
            }
            Command::FailSql(mut sql, version_constraint) => {
                handle_version!(version_constraint);
                sql.query = subst(&sql.query, &state.cmd_vars)?;
                sql.expected_error = match &sql.expected_error {
                    SqlExpectedError::Contains(s) => {
                        SqlExpectedError::Contains(subst(s, &state.cmd_vars)?)
                    }
                    SqlExpectedError::Exact(s) => {
                        SqlExpectedError::Exact(subst(s, &state.cmd_vars)?)
                    }
                    SqlExpectedError::Regex(s) => {
                        SqlExpectedError::Regex(subst_re(s, &state.cmd_vars)?)
                    }
                    SqlExpectedError::Timeout => SqlExpectedError::Timeout,
                };
                sql::run_fail_sql(sql, state).await
            }
        };

        r.map_err(wrap_err)
    }
}

/// Substitutes `${}`-delimited variables from `vars` into `msg`.
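///
/// An illustrative example of the expansion behavior:
///
/// ```ignore
/// let mut vars = BTreeMap::new();
/// vars.insert("testdrive.seed".to_string(), "42".to_string());
/// let out = substitute_vars("seed is ${testdrive.seed}", &vars, &None, false)?;
/// assert_eq!(out, "seed is 42");
/// ```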
fn substitute_vars(
    msg: &str,
    vars: &BTreeMap<String, String>,
    ignore_prefix: &Option<String>,
    regex_escape: bool,
) -> Result<String, anyhow::Error> {
    static RE: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"\$\{([^}]+)\}").unwrap());
    let mut err = None;
    let out = RE.replace_all(msg, |caps: &Captures| {
        let name = &caps[1];
        if let Some(ignore_prefix) = &ignore_prefix {
            if name.starts_with(format!("{}.", ignore_prefix).as_str()) {
                // Do not substitute; leave the original variable name in place.
                return caps.get(0).unwrap().as_str().to_string();
            }
        }

        if let Some(val) = vars.get(name) {
            if regex_escape {
                regex::escape(val)
            } else {
                val.to_string()
            }
        } else {
            err = Some(anyhow!("unknown variable: {}", name));
            "#VAR-MISSING#".to_string()
        }
    });
    match err {
        Some(err) => Err(err),
        None => Ok(out.into_owned()),
    }
}

/// Initializes a [`State`] object by connecting to the various external
/// services specified in `config`.
///
/// Returns the initialized `State` and a cleanup future. The cleanup future
/// should be `await`ed only *after* dropping the `State` to check whether any
/// errors occurred while dropping the `State`. This awkward API is a
/// workaround for the lack of `AsyncDrop` support in Rust.
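///
/// A minimal sketch of the intended call pattern, assuming a fully populated
/// `Config` is in scope:
///
/// ```ignore
/// let (state, cleanup) = create_state(&config).await?;
/// // ... execute testdrive commands against `state` ...
/// drop(state);
/// cleanup.await?;
/// ```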
pub async fn create_state(
    config: &Config,
) -> Result<(State, impl Future<Output = Result<(), anyhow::Error>>), anyhow::Error> {
    let seed = config.seed.unwrap_or_else(|| rand::thread_rng().r#gen());

    let (_tempfile, temp_path) = match &config.temp_dir {
        Some(temp_dir) => {
            fs::create_dir_all(temp_dir).context("creating temporary directory")?;
            (None, PathBuf::from(&temp_dir))
        }
        _ => {
            // Stash the tempfile object so that it does not go out of scope
            // and delete the tempdir prematurely.
            let tempfile_handle = tempfile::tempdir().context("creating temporary directory")?;
            let temp_path = tempfile_handle.path().to_path_buf();
            (Some(tempfile_handle), temp_path)
        }
    };

    let materialize_catalog_config = config.materialize_catalog_config.clone();

    let materialize_url = util::postgres::config_url(&config.materialize_pgconfig)?;
    info!("Connecting to {}", materialize_url.as_str());
    let (pgclient, pgconn) = Retry::default()
        .max_duration(config.default_timeout)
        .retry_async_canceling(|_| async move {
            let mut pgconfig = config.materialize_pgconfig.clone();
            pgconfig.connect_timeout(config.default_timeout);
            let tls = make_tls(&pgconfig)?;
            pgconfig.connect(tls).await.map_err(|e| anyhow!(e))
        })
        .await?;

    let pgconn_task = task::spawn(|| "pgconn_task", pgconn).map(|join| {
        join.expect("pgconn_task unexpectedly canceled")
            .context("running SQL connection")
    });

    let materialize_state =
        create_materialize_state(config, materialize_catalog_config, pgclient).await?;

    let schema_registry_url = config.schema_registry_url.to_owned();

    let ccsr_client = {
        let mut ccsr_config = mz_ccsr::ClientConfig::new(schema_registry_url.clone());

        if let Some(cert_path) = &config.cert_path {
            let cert = fs::read(cert_path).context("reading cert")?;
            let pass = config.cert_password.as_deref().unwrap_or("").to_owned();
            let ident = mz_ccsr::tls::Identity::from_pkcs12_der(cert, pass)
                .context("reading keystore file as pkcs12")?;
            ccsr_config = ccsr_config.identity(ident);
        }

        if let Some(ccsr_username) = &config.ccsr_username {
            ccsr_config = ccsr_config.auth(ccsr_username.clone(), config.ccsr_password.clone());
        }

        ccsr_config.build().context("Creating CCSR client")?
    };

    let (kafka_addr, kafka_admin, kafka_admin_opts, kafka_producer, kafka_topics, kafka_config) = {
        use rdkafka::admin::{AdminClient, AdminOptions};
        use rdkafka::producer::FutureProducer;

        let mut kafka_config = create_new_client_config_simple();
        kafka_config.set("bootstrap.servers", &config.kafka_addr);
        kafka_config.set("group.id", "materialize-testdrive");
        kafka_config.set("auto.offset.reset", "earliest");
        kafka_config.set("isolation.level", "read_committed");
        if let Some(cert_path) = &config.cert_path {
            kafka_config.set("security.protocol", "ssl");
            kafka_config.set("ssl.keystore.location", cert_path);
            if let Some(cert_password) = &config.cert_password {
                kafka_config.set("ssl.keystore.password", cert_password);
            }
        }
        kafka_config.set("message.max.bytes", "15728640");

        for (key, value) in &config.kafka_opts {
            kafka_config.set(key, value);
        }

        let admin: AdminClient<_> = kafka_config
            .create_with_context(MzClientContext::default())
            .with_context(|| format!("opening Kafka connection: {}", config.kafka_addr))?;

        let admin_opts = AdminOptions::new().operation_timeout(Some(config.default_timeout));

        let producer: FutureProducer<_> = kafka_config
            .create_with_context(MzClientContext::default())
            .with_context(|| format!("opening Kafka producer connection: {}", config.kafka_addr))?;

        let topics = BTreeMap::new();

        (
            config.kafka_addr.to_owned(),
            admin,
            admin_opts,
            producer,
            topics,
            kafka_config,
        )
    };

    let mut state = State {
        // === Testdrive state. ===
        arg_vars: config.arg_vars.clone(),
        cmd_vars: BTreeMap::new(),
        seed,
        temp_path,
        _tempfile,
        default_timeout: config.default_timeout,
        timeout: config.default_timeout,
        max_tries: config.default_max_tries,
        initial_backoff: config.initial_backoff,
        backoff_factor: config.backoff_factor,
        consistency_checks: config.consistency_checks,
        consistency_checks_adhoc_skip: false,
        regex: None,
        regex_replacement: set::DEFAULT_REGEX_REPLACEMENT.into(),
        rewrite_results: config.rewrite_results,

        // === Materialize state. ===
        materialize: materialize_state,

        // === Persist state. ===
        persist_consensus_url: config.persist_consensus_url.clone(),
        persist_blob_url: config.persist_blob_url.clone(),
        build_info: config.build_info,
        persist_clients: PersistClientCache::new(
            PersistConfig::new_default_configs(config.build_info, SYSTEM_TIME.clone()),
            &MetricsRegistry::new(),
            |_, _| PubSubClientConnection::noop(),
        ),

        // === Confluent state. ===
        schema_registry_url,
        ccsr_client,
        kafka_addr,
        kafka_admin,
        kafka_admin_opts,
        kafka_config,
        kafka_default_partitions: config.kafka_default_partitions,
        kafka_producer,
        kafka_topics,

        // === AWS state. ===
        aws_account: config.aws_account.clone(),
        aws_config: config.aws_config.clone(),

        // === Database driver state. ===
        mysql_clients: BTreeMap::new(),
        postgres_clients: BTreeMap::new(),
        sql_server_clients: BTreeMap::new(),

        // === Fivetran state. ===
        fivetran_destination_url: config.fivetran_destination_url.clone(),
        fivetran_destination_files_path: config.fivetran_destination_files_path.clone(),

        // === Rewrite state. ===
        rewrites: Vec::new(),
        rewrite_pos_start: 0,
        rewrite_pos_end: 0,
    };
    state.initialize_cmd_vars().await?;
    Ok((state, pgconn_task))
}

async fn create_materialize_state(
    config: &Config,
    materialize_catalog_config: Option<CatalogConfig>,
    pgclient: tokio_postgres::Client,
) -> Result<MaterializeState, anyhow::Error> {
    let materialize_url = util::postgres::config_url(&config.materialize_pgconfig)?;
    let materialize_internal_url =
        util::postgres::config_url(&config.materialize_internal_pgconfig)?;

    for (key, value) in &config.materialize_params {
        pgclient
            .batch_execute(&format!("SET {key} = {value}"))
            .await
            .context("setting session parameter")?;
    }

    let materialize_user = config
        .materialize_pgconfig
        .get_user()
        .expect("testdrive URL must contain user")
        .to_string();

    let materialize_sql_addr = format!(
        "{}:{}",
        materialize_url.host_str().unwrap(),
        materialize_url.port().unwrap()
    );
    let materialize_http_addr = format!(
        "{}:{}",
        materialize_url.host_str().unwrap(),
        config.materialize_http_port
    );
    let materialize_internal_sql_addr = format!(
        "{}:{}",
        materialize_internal_url.host_str().unwrap(),
        materialize_internal_url.port().unwrap()
    );
    let materialize_internal_http_addr = format!(
        "{}:{}",
        materialize_internal_url.host_str().unwrap(),
        config.materialize_internal_http_port
    );
    let environment_id = pgclient
        .query_one("SELECT mz_environment_id()", &[])
        .await?
        .get::<_, String>(0)
        .parse()
        .context("parsing environment ID")?;

    let bootstrap_args = BootstrapArgs {
        cluster_replica_size_map: config.materialize_cluster_replica_sizes.clone(),
        default_cluster_replica_size: "ABC".to_string(),
        default_cluster_replication_factor: 1,
        bootstrap_role: None,
    };

    let materialize_state = MaterializeState {
        catalog_config: materialize_catalog_config,
        sql_addr: materialize_sql_addr,
        use_https: config.materialize_use_https,
        http_addr: materialize_http_addr,
        internal_sql_addr: materialize_internal_sql_addr,
        internal_http_addr: materialize_internal_http_addr,
        user: materialize_user,
        pgclient,
        environment_id,
        bootstrap_args,
    };

    Ok(materialize_state)
}