mz_sql/
pure.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! SQL purification.
//!
//! See the [crate-level documentation](crate) for details.

use std::collections::{BTreeMap, BTreeSet};
use std::fmt;
use std::iter;
use std::path::Path;
use std::sync::Arc;

use anyhow::anyhow;
use itertools::Itertools;
use mz_ccsr::{Client, GetByIdError, GetBySubjectError, Schema as CcsrSchema};
use mz_controller_types::ClusterId;
use mz_kafka_util::client::MzClientContext;
use mz_mysql_util::MySqlTableDesc;
use mz_ore::collections::CollectionExt;
use mz_ore::error::ErrorExt;
use mz_ore::future::InTask;
use mz_ore::iter::IteratorExt;
use mz_ore::str::StrExt;
use mz_ore::{assert_none, soft_panic_or_log};
use mz_postgres_util::desc::PostgresTableDesc;
use mz_proto::RustType;
use mz_repr::{CatalogItemId, RelationDesc, RelationVersionSelector, Timestamp, strconv};
use mz_sql_parser::ast::display::AstDisplay;
use mz_sql_parser::ast::visit::{Visit, visit_function};
use mz_sql_parser::ast::visit_mut::{VisitMut, visit_expr_mut};
use mz_sql_parser::ast::{
    AlterSourceAction, AlterSourceAddSubsourceOptionName, AlterSourceStatement, AvroDocOn,
    ColumnName, CreateMaterializedViewStatement, CreateSinkConnection, CreateSinkOptionName,
    CreateSinkStatement, CreateSourceOptionName, CreateSubsourceOption, CreateSubsourceOptionName,
    CreateTableFromSourceStatement, CsrConfigOption, CsrConfigOptionName, CsrConnection,
    CsrSeedAvro, CsrSeedProtobuf, CsrSeedProtobufSchema, DeferredItemName, DocOnIdentifier,
    DocOnSchema, Expr, Function, FunctionArgs, Ident, KafkaSourceConfigOption,
    KafkaSourceConfigOptionName, LoadGenerator, LoadGeneratorOption, LoadGeneratorOptionName,
    MaterializedViewOption, MaterializedViewOptionName, MySqlConfigOption, MySqlConfigOptionName,
    PgConfigOption, PgConfigOptionName, RawItemName, ReaderSchemaSelectionStrategy,
    RefreshAtOptionValue, RefreshEveryOptionValue, RefreshOptionValue, SourceEnvelope,
    SqlServerConfigOption, SqlServerConfigOptionName, Statement, TableFromSourceColumns,
    TableFromSourceOption, TableFromSourceOptionName, UnresolvedItemName,
};
use mz_sql_server_util::desc::SqlServerTableDesc;
use mz_storage_types::configuration::StorageConfiguration;
use mz_storage_types::connections::Connection;
use mz_storage_types::connections::inline::IntoInlineConnection;
use mz_storage_types::errors::ContextCreationError;
use mz_storage_types::sources::load_generator::LoadGeneratorOutput;
use mz_storage_types::sources::mysql::MySqlSourceDetails;
use mz_storage_types::sources::postgres::PostgresSourcePublicationDetails;
use mz_storage_types::sources::{
    GenericSourceConnection, SourceDesc, SourceExportStatementDetails, SqlServerSourceExtras,
};
use prost::Message;
use protobuf_native::MessageLite;
use protobuf_native::compiler::{SourceTreeDescriptorDatabase, VirtualSourceTree};
use rdkafka::admin::AdminClient;
use references::{RetrievedSourceReferences, SourceReferenceClient};
use uuid::Uuid;

use crate::ast::{
    AlterSourceAddSubsourceOption, AvroSchema, CreateSourceConnection, CreateSourceStatement,
    CreateSubsourceStatement, CsrConnectionAvro, CsrConnectionProtobuf, ExternalReferenceExport,
    ExternalReferences, Format, FormatSpecifier, ProtobufSchema, Value, WithOptionValue,
};
use crate::catalog::{CatalogItemType, SessionCatalog};
use crate::kafka_util::{KafkaSinkConfigOptionExtracted, KafkaSourceConfigOptionExtracted};
use crate::names::{
    Aug, FullItemName, PartialItemName, ResolvedColumnReference, ResolvedDataType, ResolvedIds,
    ResolvedItemName,
};
use crate::plan::error::PlanError;
use crate::plan::statement::ddl::load_generator_ast_to_generator;
use crate::plan::{SourceReferences, StatementContext};
use crate::pure::error::{IcebergSinkPurificationError, SqlServerSourcePurificationError};
use crate::session::vars::ENABLE_SQL_SERVER_SOURCE;
use crate::{kafka_util, normalize};

use self::error::{
    CsrPurificationError, KafkaSinkPurificationError, KafkaSourcePurificationError,
    LoadGeneratorSourcePurificationError, MySqlSourcePurificationError, PgSourcePurificationError,
};

pub(crate) mod error;
mod references;

pub mod mysql;
pub mod postgres;
pub mod sql_server;

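/// A source export requested by the user: the external (upstream) object it
/// references, the name it will be created under, and connection-specific
/// metadata.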
pub(crate) struct RequestedSourceExport<T> {
    external_reference: UnresolvedItemName,
    name: UnresolvedItemName,
    meta: T,
}

impl<T: fmt::Debug> fmt::Debug for RequestedSourceExport<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("RequestedSourceExport")
            .field("external_reference", &self.external_reference)
            .field("name", &self.name)
            .field("meta", &self.meta)
            .finish()
    }
}

impl<T> RequestedSourceExport<T> {
    fn change_meta<U>(self, new_meta: U) -> RequestedSourceExport<U> {
        RequestedSourceExport {
            external_reference: self.external_reference,
            name: self.name,
            meta: new_meta,
        }
    }
}

/// Generates a subsource name by qualifying it with the source's schema, if
/// present.
///
/// For example, if the source is `a.b`, a subsource named `t` becomes `a.t`,
/// so that the subsource is created in the same schema as the source.
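///
/// A sketch of the intended behavior (not a runnable doctest, since this
/// helper and its callers are crate-private):
///
/// ```ignore
/// // Source `a.b` plus the subsource name "t" yields `a.t`.
/// let source = UnresolvedItemName(vec![Ident::new("a")?, Ident::new("b")?]);
/// let subsource = source_export_name_gen(&source, "t")?;
/// assert_eq!(subsource.to_ast_string_simple(), "a.t");
/// ```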
fn source_export_name_gen(
    source_name: &UnresolvedItemName,
    subsource_name: &str,
) -> Result<UnresolvedItemName, PlanError> {
    let mut partial = normalize::unresolved_item_name(source_name.clone())?;
    partial.item = subsource_name.to_string();
    Ok(UnresolvedItemName::from(partial))
}

// TODO(database-issues#8620): Remove once subsources are removed
/// Validates that the requested subsources neither conflict with one another
/// by name nor reference the same upstream table more than once.
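///
/// A sketch of the name-conflict case (illustrative only; these helpers and
/// types are crate-private, so this is not a runnable doctest):
///
/// ```ignore
/// // Two requested exports that target the same name `a.t` but different
/// // upstream tables produce `PlanError::SubsourceNameConflict`.
/// let exports = vec![
///     RequestedSourceExport { external_reference: upstream_1, name: a_t.clone(), meta: () },
///     RequestedSourceExport { external_reference: upstream_2, name: a_t, meta: () },
/// ];
/// assert!(validate_source_export_names(&exports).is_err());
/// ```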
fn validate_source_export_names<T>(
    requested_source_exports: &[RequestedSourceExport<T>],
) -> Result<(), PlanError> {
    // This condition would get caught during the catalog transaction, but produces a
    // vague, non-contextual error. Instead, error here so we can suggest to the user
    // how to fix the problem.
    if let Some(name) = requested_source_exports
        .iter()
        .map(|subsource| &subsource.name)
        .duplicates()
        .next()
        .cloned()
    {
        let mut upstream_references: Vec<_> = requested_source_exports
            .iter()
            .filter_map(|subsource| {
                if subsource.name == name {
                    Some(subsource.external_reference.clone())
                } else {
                    None
                }
            })
            .collect();

        upstream_references.sort();

        Err(PlanError::SubsourceNameConflict {
            name,
            upstream_references,
        })?;
    }

    // We disallow subsource statements from referencing the same upstream table, but we allow
    // `CREATE TABLE .. FROM SOURCE` statements to do so. Since `CREATE TABLE .. FROM SOURCE`
    // purification will only provide 1 `requested_source_export`, we can leave this here without
    // needing to differentiate between the two types of statements.
    // TODO(roshan): Remove this when auto-generated subsources are deprecated.
    if let Some(name) = requested_source_exports
        .iter()
        .map(|export| &export.external_reference)
        .duplicates()
        .next()
        .cloned()
    {
        let mut target_names: Vec<_> = requested_source_exports
            .iter()
            .filter_map(|export| {
                if export.external_reference == name {
                    Some(export.name.clone())
                } else {
                    None
                }
            })
            .collect();

        target_names.sort();

        Err(PlanError::SubsourceDuplicateReference { name, target_names })?;
    }

    Ok(())
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PurifiedStatement {
    PurifiedCreateSource {
        /// The progress subsource, if we are offloading progress info to a
        /// separate relation.
        create_progress_subsource_stmt: Option<CreateSubsourceStatement<Aug>>,
        create_source_stmt: CreateSourceStatement<Aug>,
        /// Map of subsource names to external details.
        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
        /// All the available upstream references that can be added as tables
        /// to this primary source.
        available_source_references: SourceReferences,
    },
    PurifiedAlterSource {
        alter_source_stmt: AlterSourceStatement<Aug>,
    },
    PurifiedAlterSourceAddSubsources {
        /// This just saves us an annoying catalog lookup.
        source_name: ResolvedItemName,
        /// Options that we will need the values of to update the source's
        /// definition.
        options: Vec<AlterSourceAddSubsourceOption<Aug>>,
        /// Map of subsource names to external details.
        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
    },
    PurifiedAlterSourceRefreshReferences {
        source_name: ResolvedItemName,
        /// The updated available upstream references for the primary source.
        available_source_references: SourceReferences,
    },
    PurifiedCreateSink(CreateSinkStatement<Aug>),
    PurifiedCreateTableFromSource {
        stmt: CreateTableFromSourceStatement<Aug>,
    },
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PurifiedSourceExport {
    pub external_reference: UnresolvedItemName,
    pub details: PurifiedExportDetails,
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PurifiedExportDetails {
    MySql {
        table: MySqlTableDesc,
        text_columns: Option<Vec<Ident>>,
        exclude_columns: Option<Vec<Ident>>,
        initial_gtid_set: String,
    },
    Postgres {
        table: PostgresTableDesc,
        text_columns: Option<Vec<Ident>>,
        exclude_columns: Option<Vec<Ident>>,
    },
    SqlServer {
        table: SqlServerTableDesc,
        text_columns: Option<Vec<Ident>>,
        excl_columns: Option<Vec<Ident>>,
        capture_instance: Arc<str>,
        initial_lsn: mz_sql_server_util::cdc::Lsn,
    },
    Kafka {},
    LoadGenerator {
        table: Option<RelationDesc>,
        output: LoadGeneratorOutput,
    },
}

/// Purifies a statement, removing any dependencies on external state.
///
/// See the section on [purification](crate#purification) in the crate
/// documentation for details.
///
/// Note that this doesn't handle `CREATE MATERIALIZED VIEW`, which is
/// handled by [purify_create_materialized_view_options] instead.
/// This could be made more consistent by a refactoring discussed here:
/// <https://github.com/MaterializeInc/materialize/pull/23870#discussion_r1435922709>
pub async fn purify_statement(
    catalog: impl SessionCatalog,
    now: u64,
    stmt: Statement<Aug>,
    storage_configuration: &StorageConfiguration,
) -> (Result<PurifiedStatement, PlanError>, Option<ClusterId>) {
    match stmt {
        Statement::CreateSource(stmt) => {
            let cluster_id = stmt.in_cluster.as_ref().map(|cluster| cluster.id.clone());
            (
                purify_create_source(catalog, now, stmt, storage_configuration).await,
                cluster_id,
            )
        }
        Statement::AlterSource(stmt) => (
            purify_alter_source(catalog, stmt, storage_configuration).await,
            None,
        ),
        Statement::CreateSink(stmt) => {
            let cluster_id = stmt.in_cluster.as_ref().map(|cluster| cluster.id.clone());
            (
                purify_create_sink(catalog, stmt, storage_configuration).await,
                cluster_id,
            )
        }
        Statement::CreateTableFromSource(stmt) => (
            purify_create_table_from_source(catalog, stmt, storage_configuration).await,
            None,
        ),
        o => unreachable!("{:?} does not need to be purified", o),
    }
}

/// Injects `DOC ON` comments into all Avro formats that are using a schema
/// registry by finding all SQL `COMMENT`s that are attached to the sink's
/// underlying materialized view (as well as any types referenced by that
/// underlying materialized view).
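///
/// For example (names here are illustrative): given `COMMENT ON MATERIALIZED
/// VIEW mv IS 'docs'`, purifying a sink on `mv` whose Avro format uses a
/// schema registry injects the equivalent of the option
/// `DOC ON TYPE mv = 'docs'`, unless the user already supplied a `DOC ON`
/// option for `mv`.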
pub(crate) fn purify_create_sink_avro_doc_on_options(
    catalog: &dyn SessionCatalog,
    from_id: CatalogItemId,
    format: &mut Option<FormatSpecifier<Aug>>,
) -> Result<(), PlanError> {
    // Collect all objects referenced by the sink.
    let from = catalog.get_item(&from_id);
    let object_ids = from
        .references()
        .items()
        .copied()
        .chain_one(from.id())
        .collect::<Vec<_>>();

    // Collect all Avro formats that use a schema registry, as well as a set of
    // all identifiers named in user-provided `DOC ON` options.
    let mut avro_format_options = vec![];
    for_each_format(format, |doc_on_schema, fmt| match fmt {
        Format::Avro(AvroSchema::InlineSchema { .. })
        | Format::Bytes
        | Format::Csv { .. }
        | Format::Json { .. }
        | Format::Protobuf(..)
        | Format::Regex(..)
        | Format::Text => (),
        Format::Avro(AvroSchema::Csr {
            csr_connection: CsrConnectionAvro { connection, .. },
        }) => {
            avro_format_options.push((doc_on_schema, &mut connection.options));
        }
    });

    // For each Avro format in the sink, inject the appropriate `DOC ON` options
    // for each item referenced by the sink.
    for (for_schema, options) in avro_format_options {
        let user_provided_comments = options
            .iter()
            .filter_map(|CsrConfigOption { name, .. }| match name {
                CsrConfigOptionName::AvroDocOn(doc_on) => Some(doc_on.clone()),
                _ => None,
            })
            .collect::<BTreeSet<_>>();
        // Add existing catalog comments wherever the user has not already
        // provided an overriding `DOC ON` option.
        for object_id in &object_ids {
            // Always add comments to the latest version of the item.
            let item = catalog
                .get_item(object_id)
                .at_version(RelationVersionSelector::Latest);
            let full_resolved_name = ResolvedItemName::Item {
                id: *object_id,
                qualifiers: item.name().qualifiers.clone(),
                full_name: catalog.resolve_full_name(item.name()),
                print_id: !matches!(
                    item.item_type(),
                    CatalogItemType::Func | CatalogItemType::Type
                ),
                version: RelationVersionSelector::Latest,
            };

            if let Some(comments_map) = catalog.get_item_comments(object_id) {
                // Attach comment for the item itself, if the user has not
                // already provided an overriding `DOC ON` option for the item.
                let doc_on_item_key = AvroDocOn {
                    identifier: DocOnIdentifier::Type(full_resolved_name.clone()),
                    for_schema,
                };
                if !user_provided_comments.contains(&doc_on_item_key) {
                    if let Some(root_comment) = comments_map.get(&None) {
                        options.push(CsrConfigOption {
                            name: CsrConfigOptionName::AvroDocOn(doc_on_item_key),
                            value: Some(mz_sql_parser::ast::WithOptionValue::Value(Value::String(
                                root_comment.clone(),
                            ))),
                        });
                    }
                }

                // Attach comment for each column in the item, if the user has
                // not already provided an overriding `DOC ON` option for the
                // column.
                let column_descs = match item.type_details() {
                    Some(details) => details.typ.desc(catalog).unwrap_or_default(),
                    None => item.relation_desc().map(|d| d.into_owned()),
                };

                if let Some(desc) = column_descs {
                    for (pos, column_name) in desc.iter_names().enumerate() {
                        if let Some(comment_str) = comments_map.get(&Some(pos + 1)) {
                            let doc_on_column_key = AvroDocOn {
                                identifier: DocOnIdentifier::Column(ColumnName {
                                    relation: full_resolved_name.clone(),
                                    column: ResolvedColumnReference::Column {
                                        name: column_name.to_owned(),
                                        index: pos,
                                    },
                                }),
                                for_schema,
                            };
                            if !user_provided_comments.contains(&doc_on_column_key) {
                                options.push(CsrConfigOption {
                                    name: CsrConfigOptionName::AvroDocOn(doc_on_column_key),
                                    value: Some(mz_sql_parser::ast::WithOptionValue::Value(
                                        Value::String(comment_str.clone()),
                                    )),
                                });
                            }
                        }
                    }
                }
            }
        }
    }

    Ok(())
}

/// Checks that the sink described in the statement can connect to its external
/// resources.
async fn purify_create_sink(
    catalog: impl SessionCatalog,
    mut create_sink_stmt: CreateSinkStatement<Aug>,
    storage_configuration: &StorageConfiguration,
) -> Result<PurifiedStatement, PlanError> {
    // General purification
    let CreateSinkStatement {
        connection,
        format,
        with_options,
        name: _,
        in_cluster: _,
        if_not_exists: _,
        from,
        envelope: _,
    } = &mut create_sink_stmt;

    // The list of options that the user is allowed to specify.
    const USER_ALLOWED_WITH_OPTIONS: &[CreateSinkOptionName] = &[
        CreateSinkOptionName::Snapshot,
        CreateSinkOptionName::CommitInterval,
    ];

    if let Some(op) = with_options
        .iter()
        .find(|op| !USER_ALLOWED_WITH_OPTIONS.contains(&op.name))
    {
        sql_bail!(
            "CREATE SINK...WITH ({}..) is not allowed",
            op.name.to_ast_string_simple(),
        )
    }

    match &connection {
        CreateSinkConnection::Kafka {
            connection,
            options,
            key: _,
            headers: _,
        } => {
            // We must not leave any state behind in the Kafka broker, so just ensure that
            // we can connect. This means we don't verify that we can create the topic, which
            // introduces TOCTOU errors, but creating an inoperable sink is infinitely
            // preferable to leaking state in users' environments.
            let scx = StatementContext::new(None, &catalog);
            let connection = {
                let item = scx.get_item_by_resolved_name(connection)?;
                // Get Kafka connection
                match item.connection()? {
                    Connection::Kafka(connection) => {
                        connection.clone().into_inline_connection(scx.catalog)
                    }
                    _ => sql_bail!(
                        "{} is not a kafka connection",
                        scx.catalog.resolve_full_name(item.name())
                    ),
                }
            };

            let extracted_options: KafkaSinkConfigOptionExtracted = options.clone().try_into()?;

            if extracted_options.legacy_ids == Some(true) {
                sql_bail!("LEGACY IDs option is not supported");
            }

            let client: AdminClient<_> = connection
                .create_with_context(
                    storage_configuration,
                    MzClientContext::default(),
                    &BTreeMap::new(),
                    InTask::No,
                )
                .await
                .map_err(|e| {
                    // anyhow doesn't support Clone, so not trivial to move into PlanError
                    KafkaSinkPurificationError::AdminClientError(Arc::new(e))
                })?;

            let metadata = client
                .inner()
                .fetch_metadata(
                    None,
                    storage_configuration
                        .parameters
                        .kafka_timeout_config
                        .fetch_metadata_timeout,
                )
                .map_err(|e| {
                    KafkaSinkPurificationError::AdminClientError(Arc::new(
                        ContextCreationError::KafkaError(e),
                    ))
                })?;

            if metadata.brokers().is_empty() {
                Err(KafkaSinkPurificationError::ZeroBrokers)?;
            }
        }
        CreateSinkConnection::Iceberg {
            connection,
            aws_connection,
            ..
        } => {
            let scx = StatementContext::new(None, &catalog);
            let connection = {
                let item = scx.get_item_by_resolved_name(connection)?;
                // Get Iceberg connection
                match item.connection()? {
                    Connection::IcebergCatalog(connection) => {
                        connection.clone().into_inline_connection(scx.catalog)
                    }
                    _ => sql_bail!(
                        "{} is not an iceberg connection",
                        scx.catalog.resolve_full_name(item.name())
                    ),
                }
            };

            let aws_conn_id = aws_connection.item_id();

            let aws_connection = {
                let item = scx.get_item_by_resolved_name(aws_connection)?;
                // Get AWS connection
                match item.connection()? {
                    Connection::Aws(aws_connection) => aws_connection.clone(),
                    _ => sql_bail!(
                        "{} is not an aws connection",
                        scx.catalog.resolve_full_name(item.name())
                    ),
                }
            };

            let _catalog = connection
                .connect(storage_configuration, InTask::No)
                .await
                .map_err(|e| IcebergSinkPurificationError::CatalogError(Arc::new(e)))?;

            let sdk_config = aws_connection
                .load_sdk_config(
                    &storage_configuration.connection_context,
                    aws_conn_id.clone(),
                    InTask::No,
                )
                .await
                .map_err(|e| IcebergSinkPurificationError::AwsSdkContextError(Arc::new(e)))?;

            let sts_client = aws_sdk_sts::Client::new(&sdk_config);
            let _ = sts_client.get_caller_identity().send().await.map_err(|e| {
                IcebergSinkPurificationError::StsIdentityError(Arc::new(e.into_service_error()))
            })?;
        }
    }

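    // Collect the IDs of every Confluent Schema Registry connection referenced
    // by the sink's formats, so we can verify connectivity to each below.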
    let mut csr_connection_ids = BTreeSet::new();
    for_each_format(format, |_, fmt| match fmt {
        Format::Avro(AvroSchema::InlineSchema { .. })
        | Format::Bytes
        | Format::Csv { .. }
        | Format::Json { .. }
        | Format::Protobuf(ProtobufSchema::InlineSchema { .. })
        | Format::Regex(..)
        | Format::Text => (),
        Format::Avro(AvroSchema::Csr {
            csr_connection: CsrConnectionAvro { connection, .. },
        })
        | Format::Protobuf(ProtobufSchema::Csr {
            csr_connection: CsrConnectionProtobuf { connection, .. },
        }) => {
            csr_connection_ids.insert(*connection.connection.item_id());
        }
    });

    let scx = StatementContext::new(None, &catalog);
    for csr_connection_id in csr_connection_ids {
        let connection = {
            let item = scx.get_item(&csr_connection_id);
            // Get CSR connection
            match item.connection()? {
                Connection::Csr(connection) => connection.clone().into_inline_connection(&catalog),
                _ => Err(CsrPurificationError::NotCsrConnection(
                    scx.catalog.resolve_full_name(item.name()),
                ))?,
            }
        };

        let client = connection
            .connect(storage_configuration, InTask::No)
            .await
            .map_err(|e| CsrPurificationError::ClientError(Arc::new(e)))?;

        client
            .list_subjects()
            .await
            .map_err(|e| CsrPurificationError::ListSubjectsError(Arc::new(e)))?;
    }

    purify_create_sink_avro_doc_on_options(&catalog, *from.item_id(), format)?;

    Ok(PurifiedStatement::PurifiedCreateSink(create_sink_stmt))
}

/// Runs a function on each format within the format specifier.
///
/// The function is provided with a `DocOnSchema` that indicates whether the
/// format is for the key, value, or both.
///
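/// A sketch of the calling convention (illustrative, not a doctest):
///
/// ```ignore
/// // A bare `FORMAT ...` invokes `f` once with `DocOnSchema::All`, while a
/// // `KEY FORMAT ... VALUE FORMAT ...` specifier invokes it once with
/// // `DocOnSchema::KeyOnly` and once with `DocOnSchema::ValueOnly`.
/// for_each_format(&mut format, |schema, fmt| { /* inspect or mutate `fmt` */ });
/// ```
///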
// TODO(benesch): rename `DocOnSchema` to the more general `FormatRestriction`.
fn for_each_format<'a, F>(format: &'a mut Option<FormatSpecifier<Aug>>, mut f: F)
where
    F: FnMut(DocOnSchema, &'a mut Format<Aug>),
{
    match format {
        None => (),
        Some(FormatSpecifier::Bare(fmt)) => f(DocOnSchema::All, fmt),
        Some(FormatSpecifier::KeyValue { key, value }) => {
            f(DocOnSchema::KeyOnly, key);
            f(DocOnSchema::ValueOnly, value);
        }
    }
}

/// Defines whether purification should enforce that at least one valid source
/// reference is provided in the statement.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub(crate) enum SourceReferencePolicy {
    /// Don't allow source references to be provided. This is used for
    /// enforcing that `CREATE SOURCE` statements don't create subsources.
    NotAllowed,
    /// Allow empty references, such as when creating a source that
    /// will have tables added afterwards.
    Optional,
    /// Require that at least one reference is resolved to an upstream
    /// object.
    Required,
}

async fn purify_create_source(
    catalog: impl SessionCatalog,
    now: u64,
    mut create_source_stmt: CreateSourceStatement<Aug>,
    storage_configuration: &StorageConfiguration,
) -> Result<PurifiedStatement, PlanError> {
    let CreateSourceStatement {
        name: source_name,
        col_names,
        key_constraint,
        connection: source_connection,
        format,
        envelope,
        include_metadata,
        external_references,
        progress_subsource,
        with_options,
        ..
    } = &mut create_source_stmt;

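    // These clauses all belong to the pre-`CREATE TABLE .. FROM SOURCE`
    // syntax, which attaches formats, envelopes, and subsource references to
    // the source itself; their presence determines whether we must also
    // generate a progress subsource below.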
    let uses_old_syntax = !col_names.is_empty()
        || key_constraint.is_some()
        || format.is_some()
        || envelope.is_some()
        || !include_metadata.is_empty()
        || external_references.is_some()
        || progress_subsource.is_some();

    if let Some(DeferredItemName::Named(_)) = progress_subsource {
        sql_bail!("Cannot manually ID qualify progress subsource")
    }

    let mut requested_subsource_map = BTreeMap::new();

    let progress_desc = match &source_connection {
        CreateSourceConnection::Kafka { .. } => {
            &mz_storage_types::sources::kafka::KAFKA_PROGRESS_DESC
        }
        CreateSourceConnection::Postgres { .. } => {
            &mz_storage_types::sources::postgres::PG_PROGRESS_DESC
        }
        CreateSourceConnection::SqlServer { .. } => {
            &mz_storage_types::sources::sql_server::SQL_SERVER_PROGRESS_DESC
        }
        CreateSourceConnection::MySql { .. } => {
            &mz_storage_types::sources::mysql::MYSQL_PROGRESS_DESC
        }
        CreateSourceConnection::LoadGenerator { .. } => {
            &mz_storage_types::sources::load_generator::LOAD_GEN_PROGRESS_DESC
        }
    };
    let scx = StatementContext::new(None, &catalog);

    // Depending on whether the user must, or merely may, use `CREATE TABLE ..
    // FROM SOURCE` to add tables after this source is created, we may need to
    // enforce that this statement does or does not create auto-generated
    // subsources.
    let reference_policy = if scx.catalog.system_vars().enable_create_table_from_source()
        && scx.catalog.system_vars().force_source_table_syntax()
    {
        SourceReferencePolicy::NotAllowed
    } else if scx.catalog.system_vars().enable_create_table_from_source() {
        SourceReferencePolicy::Optional
    } else {
        SourceReferencePolicy::Required
    };

    let mut format_options = SourceFormatOptions::Default;

    let retrieved_source_references: RetrievedSourceReferences;

    match source_connection {
        CreateSourceConnection::Kafka {
            connection,
            options: base_with_options,
            ..
        } => {
            if let Some(external_references) = external_references {
                Err(KafkaSourcePurificationError::ReferencedSubsources(
                    external_references.clone(),
                ))?;
            }

            let connection = {
                let item = scx.get_item_by_resolved_name(connection)?;
                // Get Kafka connection
                match item.connection()? {
                    Connection::Kafka(connection) => {
                        connection.clone().into_inline_connection(&catalog)
                    }
                    _ => Err(KafkaSourcePurificationError::NotKafkaConnection(
                        scx.catalog.resolve_full_name(item.name()),
                    ))?,
                }
            };

            let extracted_options: KafkaSourceConfigOptionExtracted =
                base_with_options.clone().try_into()?;

            let topic = extracted_options
                .topic
                .ok_or(KafkaSourcePurificationError::ConnectionMissingTopic)?;

            let consumer = connection
                .create_with_context(
                    storage_configuration,
                    MzClientContext::default(),
                    &BTreeMap::new(),
                    InTask::No,
                )
                .await
                .map_err(|e| {
                    // anyhow doesn't support Clone, so not trivial to move into PlanError
                    KafkaSourcePurificationError::KafkaConsumerError(
                        e.display_with_causes().to_string(),
                    )
                })?;
            let consumer = Arc::new(consumer);

            match (
                extracted_options.start_offset,
                extracted_options.start_timestamp,
            ) {
                (None, None) => {
                    // Validate that the topic at least exists.
                    kafka_util::ensure_topic_exists(
                        Arc::clone(&consumer),
                        &topic,
                        storage_configuration
                            .parameters
                            .kafka_timeout_config
                            .fetch_metadata_timeout,
                    )
                    .await?;
                }
                (Some(_), Some(_)) => {
                    sql_bail!("cannot specify START TIMESTAMP and START OFFSET at the same time")
                }
                (Some(start_offsets), None) => {
                    // Validate the start offsets.
                    kafka_util::validate_start_offsets(
                        Arc::clone(&consumer),
                        &topic,
                        start_offsets,
                        storage_configuration
                            .parameters
                            .kafka_timeout_config
                            .fetch_metadata_timeout,
                    )
                    .await?;
                }
                (None, Some(time_offset)) => {
                    // Translate `START TIMESTAMP` to a start offset.
                    let start_offsets = kafka_util::lookup_start_offsets(
                        Arc::clone(&consumer),
                        &topic,
                        time_offset,
                        now,
                        storage_configuration
                            .parameters
                            .kafka_timeout_config
                            .fetch_metadata_timeout,
                    )
                    .await?;

                    base_with_options.retain(|val| {
                        !matches!(val.name, KafkaSourceConfigOptionName::StartTimestamp)
                    });
                    base_with_options.push(KafkaSourceConfigOption {
                        name: KafkaSourceConfigOptionName::StartOffset,
                        value: Some(WithOptionValue::Sequence(
                            start_offsets
                                .iter()
                                .map(|offset| {
                                    WithOptionValue::Value(Value::Number(offset.to_string()))
                                })
                                .collect(),
                        )),
                    });
                }
            }

            let reference_client = SourceReferenceClient::Kafka { topic: &topic };
            retrieved_source_references = reference_client.get_source_references().await?;

            format_options = SourceFormatOptions::Kafka { topic };
        }
        CreateSourceConnection::Postgres {
            connection,
            options,
        } => {
            let connection_item = scx.get_item_by_resolved_name(connection)?;
            let connection = match connection_item.connection().map_err(PlanError::from)? {
                Connection::Postgres(connection) => {
                    connection.clone().into_inline_connection(&catalog)
                }
                _ => Err(PgSourcePurificationError::NotPgConnection(
                    scx.catalog.resolve_full_name(connection_item.name()),
                ))?,
            };
            let crate::plan::statement::PgConfigOptionExtracted {
                publication,
                text_columns,
                exclude_columns,
                details,
                ..
            } = options.clone().try_into()?;
            let publication =
                publication.ok_or(PgSourcePurificationError::ConnectionMissingPublication)?;

            if details.is_some() {
                Err(PgSourcePurificationError::UserSpecifiedDetails)?;
            }

            let client = connection
                .validate(connection_item.id(), storage_configuration)
                .await?;

            let reference_client = SourceReferenceClient::Postgres {
                client: &client,
                publication: &publication,
                database: &connection.database,
            };
            retrieved_source_references = reference_client.get_source_references().await?;

            let postgres::PurifiedSourceExports {
                source_exports: subsources,
                normalized_text_columns,
            } = postgres::purify_source_exports(
                &client,
                &retrieved_source_references,
                external_references,
                text_columns,
                exclude_columns,
                source_name,
                &reference_policy,
            )
            .await?;

            if let Some(text_cols_option) = options
                .iter_mut()
                .find(|option| option.name == PgConfigOptionName::TextColumns)
            {
                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
            }

            requested_subsource_map.extend(subsources);

            // Record the active replication timeline_id to allow detection of a future upstream
            // point-in-time recovery that would put the source into an error state.
            let timeline_id = mz_postgres_util::get_timeline_id(&client).await?;

            // Remove any old detail references
            options.retain(|PgConfigOption { name, .. }| name != &PgConfigOptionName::Details);
            let details = PostgresSourcePublicationDetails {
                slot: format!(
                    "materialize_{}",
                    Uuid::new_v4().to_string().replace('-', "")
                ),
                timeline_id: Some(timeline_id),
                database: connection.database,
            };
            options.push(PgConfigOption {
                name: PgConfigOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    details.into_proto().encode_to_vec(),
                )))),
            })
        }
        CreateSourceConnection::SqlServer {
            connection,
            options,
        } => {
            scx.require_feature_flag(&ENABLE_SQL_SERVER_SOURCE)?;

            // Connect to the upstream SQL Server instance so we can validate
            // we're compatible with CDC.
            let connection_item = scx.get_item_by_resolved_name(connection)?;
            let connection = match connection_item.connection()? {
                Connection::SqlServer(connection) => {
                    connection.clone().into_inline_connection(&catalog)
                }
                _ => Err(SqlServerSourcePurificationError::NotSqlServerConnection(
                    scx.catalog.resolve_full_name(connection_item.name()),
                ))?,
            };
            let crate::plan::statement::ddl::SqlServerConfigOptionExtracted {
                details,
                text_columns,
                exclude_columns,
                seen: _,
            } = options.clone().try_into()?;

            if details.is_some() {
                Err(SqlServerSourcePurificationError::UserSpecifiedDetails)?;
            }

            let mut client = connection
                .validate(connection_item.id(), storage_configuration)
                .await?;

            let database: Arc<str> = connection.database.into();
            let reference_client = SourceReferenceClient::SqlServer {
                client: &mut client,
                database: Arc::clone(&database),
            };
            retrieved_source_references = reference_client.get_source_references().await?;
            tracing::debug!(?retrieved_source_references, "got source references");

            let timeout = mz_storage_types::sources::sql_server::MAX_LSN_WAIT
                .get(storage_configuration.config_set());

            let purified_source_exports = sql_server::purify_source_exports(
                &*database,
                &mut client,
                &retrieved_source_references,
                external_references,
                &text_columns,
                &exclude_columns,
                source_name,
                timeout,
                &reference_policy,
            )
            .await?;

            let sql_server::PurifiedSourceExports {
                source_exports,
                normalized_text_columns,
                normalized_excl_columns,
            } = purified_source_exports;

            // Update our set of requested source exports.
            requested_subsource_map.extend(source_exports);

            // Record the most recent restore_history_id, or none if the system has never been
            // restored.
            let restore_history_id =
                mz_sql_server_util::inspect::get_latest_restore_history_id(&mut client).await?;
            let details = SqlServerSourceExtras { restore_history_id };

            options.retain(|SqlServerConfigOption { name, .. }| {
                name != &SqlServerConfigOptionName::Details
            });
            options.push(SqlServerConfigOption {
                name: SqlServerConfigOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    details.into_proto().encode_to_vec(),
                )))),
            });

            // Update our 'TEXT' and 'EXCLUDE' column options with the purified and normalized set.
            if let Some(text_cols_option) = options
                .iter_mut()
                .find(|option| option.name == SqlServerConfigOptionName::TextColumns)
            {
                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
            }
            if let Some(excl_cols_option) = options
                .iter_mut()
                .find(|option| option.name == SqlServerConfigOptionName::ExcludeColumns)
            {
                excl_cols_option.value = Some(WithOptionValue::Sequence(normalized_excl_columns));
            }
        }
        CreateSourceConnection::MySql {
            connection,
            options,
        } => {
            let connection_item = scx.get_item_by_resolved_name(connection)?;
            let connection = match connection_item.connection()? {
                Connection::MySql(connection) => {
                    connection.clone().into_inline_connection(&catalog)
                }
                _ => Err(MySqlSourcePurificationError::NotMySqlConnection(
                    scx.catalog.resolve_full_name(connection_item.name()),
                ))?,
            };
            let crate::plan::statement::ddl::MySqlConfigOptionExtracted {
                details,
                text_columns,
                exclude_columns,
                seen: _,
            } = options.clone().try_into()?;

            if details.is_some() {
                Err(MySqlSourcePurificationError::UserSpecifiedDetails)?;
            }

            let mut conn = connection
                .validate(connection_item.id(), storage_configuration)
                .await
                .map_err(MySqlSourcePurificationError::InvalidConnection)?;

            // Retrieve the server's current @gtid_executed value to mark as the effective
            // initial snapshot point, so that we can ensure consistency even if the initial
            // source snapshot is broken up over multiple points in time.
            let initial_gtid_set =
                mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?;

            let reference_client = SourceReferenceClient::MySql {
                conn: &mut conn,
                include_system_schemas: mysql::references_system_schemas(external_references),
            };
            retrieved_source_references = reference_client.get_source_references().await?;

            let mysql::PurifiedSourceExports {
                source_exports: subsources,
                normalized_text_columns,
                normalized_exclude_columns,
            } = mysql::purify_source_exports(
                &mut conn,
                &retrieved_source_references,
                external_references,
                text_columns,
                exclude_columns,
                source_name,
                initial_gtid_set.clone(),
                &reference_policy,
            )
            .await?;
            requested_subsource_map.extend(subsources);

            // We don't have any fields in this details struct, but keep it around for
            // conformity with Postgres and in case we end up needing it again in the future.
            let details = MySqlSourceDetails {};
            // Update options with the purified details
            options
                .retain(|MySqlConfigOption { name, .. }| name != &MySqlConfigOptionName::Details);
            options.push(MySqlConfigOption {
                name: MySqlConfigOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    details.into_proto().encode_to_vec(),
                )))),
            });

            if let Some(text_cols_option) = options
                .iter_mut()
                .find(|option| option.name == MySqlConfigOptionName::TextColumns)
            {
                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
            }
            if let Some(exclude_cols_option) = options
                .iter_mut()
                .find(|option| option.name == MySqlConfigOptionName::ExcludeColumns)
            {
                exclude_cols_option.value =
                    Some(WithOptionValue::Sequence(normalized_exclude_columns));
            }
        }
        CreateSourceConnection::LoadGenerator { generator, options } => {
            let load_generator =
                load_generator_ast_to_generator(&scx, generator, options, include_metadata)?;

            let reference_client = SourceReferenceClient::LoadGenerator {
                generator: &load_generator,
            };
            retrieved_source_references = reference_client.get_source_references().await?;
            // Filter to the references that need to be created as 'subsources', which
            // doesn't include the default output for single-output sources.
            // TODO(database-issues#8620): Remove once subsources are removed
            let multi_output_sources =
                retrieved_source_references
                    .all_references()
                    .iter()
                    .any(|r| {
                        r.load_generator_output().expect("is loadgen")
                            != &LoadGeneratorOutput::Default
                    });

            match external_references {
                Some(requested)
                    if matches!(reference_policy, SourceReferencePolicy::NotAllowed) =>
                {
                    Err(PlanError::UseTablesForSources(requested.to_string()))?
                }
                Some(requested) if !multi_output_sources => match requested {
                    ExternalReferences::SubsetTables(_) => {
                        Err(LoadGeneratorSourcePurificationError::ForTables)?
                    }
                    ExternalReferences::SubsetSchemas(_) => {
                        Err(LoadGeneratorSourcePurificationError::ForSchemas)?
                    }
                    ExternalReferences::All => {
                        Err(LoadGeneratorSourcePurificationError::ForAllTables)?
                    }
                },
                Some(requested) => {
                    let requested_exports = retrieved_source_references
                        .requested_source_exports(Some(requested), source_name)?;
                    for export in requested_exports {
                        requested_subsource_map.insert(
                            export.name,
                            PurifiedSourceExport {
                                external_reference: export.external_reference,
                                details: PurifiedExportDetails::LoadGenerator {
                                    table: export
                                        .meta
                                        .load_generator_desc()
                                        .expect("is loadgen")
                                        .clone(),
                                    output: export
                                        .meta
                                        .load_generator_output()
                                        .expect("is loadgen")
                                        .clone(),
                                },
                            },
                        );
                    }
                }
                None => {
                    if multi_output_sources
                        && matches!(reference_policy, SourceReferencePolicy::Required)
                    {
                        Err(LoadGeneratorSourcePurificationError::MultiOutputRequiresForAllTables)?
                    }
                }
            }

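            // If the user did not specify an `AS OF` time for the Clock load
            // generator, default it to the current time, captured once here
            // during purification so the stored statement carries a concrete
            // value.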
            if let LoadGenerator::Clock = generator {
                if !options
                    .iter()
                    .any(|p| p.name == LoadGeneratorOptionName::AsOf)
                {
                    let now = catalog.now();
                    options.push(LoadGeneratorOption {
                        name: LoadGeneratorOptionName::AsOf,
                        value: Some(WithOptionValue::Value(Value::Number(now.to_string()))),
                    });
                }
            }
        }
    }

    // Now that we know which subsources to create alongside this statement,
    // remove the references so they are not canonicalized as part of the
    // `CREATE SOURCE` statement in the catalog.
    *external_references = None;

    // Generate a progress subsource for the old syntax.
    let create_progress_subsource_stmt = if uses_old_syntax {
        // Take the name from the input or generate one.
        let name = match progress_subsource {
            Some(name) => match name {
                DeferredItemName::Deferred(name) => name.clone(),
                DeferredItemName::Named(_) => unreachable!("already checked for this value"),
            },
            None => {
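                // No name was supplied: derive a `<source>_progress` name,
                // using `try_generate_name` to find a candidate that collides
                // with no existing item or type.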
                let (item, prefix) = source_name.0.split_last().unwrap();
                let item_name =
                    Ident::try_generate_name(item.to_string(), "_progress", |candidate| {
                        let mut suggested_name = prefix.to_vec();
                        suggested_name.push(candidate.clone());

                        let partial =
                            normalize::unresolved_item_name(UnresolvedItemName(suggested_name))?;
                        let qualified = scx.allocate_qualified_name(partial)?;
                        let item_exists = scx.catalog.get_item_by_name(&qualified).is_some();
                        let type_exists = scx.catalog.get_type_by_name(&qualified).is_some();
                        Ok::<_, PlanError>(!item_exists && !type_exists)
                    })?;

                let mut full_name = prefix.to_vec();
                full_name.push(item_name);
                let full_name = normalize::unresolved_item_name(UnresolvedItemName(full_name))?;
                let qualified_name = scx.allocate_qualified_name(full_name)?;
                let full_name = scx.catalog.resolve_full_name(&qualified_name);

                UnresolvedItemName::from(full_name.clone())
            }
        };

        let (columns, constraints) = scx.relation_desc_into_table_defs(progress_desc)?;

        // Create the subsource statement
        let mut progress_with_options: Vec<_> = with_options
            .iter()
            .filter_map(|opt| match opt.name {
                CreateSourceOptionName::TimestampInterval => None,
                CreateSourceOptionName::RetainHistory => Some(CreateSubsourceOption {
                    name: CreateSubsourceOptionName::RetainHistory,
                    value: opt.value.clone(),
                }),
            })
            .collect();
        progress_with_options.push(CreateSubsourceOption {
            name: CreateSubsourceOptionName::Progress,
            value: Some(WithOptionValue::Value(Value::Boolean(true))),
        });

        Some(CreateSubsourceStatement {
            name,
            columns,
            // Progress subsources do not refer to the source to which they belong.
            // Instead, the primary source depends on them (the opposite is true of
            // ingestion exports, which depend on the primary source).
            of_source: None,
            constraints,
            if_not_exists: false,
            with_options: progress_with_options,
        })
    } else {
        None
    };

    purify_source_format(
        &catalog,
        format,
        &format_options,
        envelope,
        storage_configuration,
    )
    .await?;

    Ok(PurifiedStatement::PurifiedCreateSource {
        create_progress_subsource_stmt,
        create_source_stmt,
        subsources: requested_subsource_map,
        available_source_references: retrieved_source_references.available_source_references(),
    })
}

/// On success, returns the details for new subsources and the updated
/// `options` that sequencing expects when handling `ALTER SOURCE` statements.
1297async fn purify_alter_source(
1298    catalog: impl SessionCatalog,
1299    stmt: AlterSourceStatement<Aug>,
1300    storage_configuration: &StorageConfiguration,
1301) -> Result<PurifiedStatement, PlanError> {
1302    let scx = StatementContext::new(None, &catalog);
1303    let AlterSourceStatement {
1304        source_name: unresolved_source_name,
1305        action,
1306        if_exists,
1307    } = stmt;
1308
1309    // Get name.
    let item = match scx.resolve_item(RawItemName::Name(unresolved_source_name.clone())) {
        Ok(item) => item,
        Err(_) if if_exists => {
            return Ok(PurifiedStatement::PurifiedAlterSource {
                alter_source_stmt: AlterSourceStatement {
                    source_name: unresolved_source_name,
                    action,
                    if_exists,
                },
            });
        }
        Err(e) => return Err(e),
    };

    // Ensure it's an ingestion-based and alterable source.
    let desc = match item.source_desc()? {
        Some(desc) => desc.clone().into_inline_connection(scx.catalog),
        None => {
            sql_bail!("cannot ALTER this type of source")
        }
    };

    let source_name = item.name();

    let resolved_source_name = ResolvedItemName::Item {
        id: item.id(),
        qualifiers: item.name().qualifiers.clone(),
        full_name: scx.catalog.resolve_full_name(source_name),
        print_id: true,
        version: RelationVersionSelector::Latest,
    };

    let partial_name = scx.catalog.minimal_qualification(source_name);

    match action {
        AlterSourceAction::AddSubsources {
            external_references,
            options,
        } => {
            if scx.catalog.system_vars().enable_create_table_from_source()
                && scx.catalog.system_vars().force_source_table_syntax()
            {
                Err(PlanError::UseTablesForSources(
                    "ALTER SOURCE .. ADD SUBSOURCES ..".to_string(),
                ))?;
            }

            purify_alter_source_add_subsources(
                external_references,
                options,
                desc,
                partial_name,
                unresolved_source_name,
                resolved_source_name,
                storage_configuration,
            )
            .await
        }
        AlterSourceAction::RefreshReferences => {
            purify_alter_source_refresh_references(
                desc,
                resolved_source_name,
                storage_configuration,
            )
            .await
        }
        _ => Ok(PurifiedStatement::PurifiedAlterSource {
            alter_source_stmt: AlterSourceStatement {
                source_name: unresolved_source_name,
                action,
                if_exists,
            },
        }),
    }
}

// TODO(database-issues#8620): Remove once subsources are removed
/// Equivalent to `purify_create_source` but for `AlterSourceStatement`.
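///
/// A sketch of the flow, with illustrative object names: purifying
///
/// ```sql
/// ALTER SOURCE pg_src ADD SUBSOURCE schema1.table_a;
/// ```
///
/// connects to the upstream system, resolves `schema1.table_a` against the
/// available references, and returns a purified subsource map from which
/// sequencing generates the corresponding `CREATE SUBSOURCE` statements.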
async fn purify_alter_source_add_subsources(
    external_references: Vec<ExternalReferenceExport>,
    mut options: Vec<AlterSourceAddSubsourceOption<Aug>>,
    desc: SourceDesc,
    partial_source_name: PartialItemName,
    unresolved_source_name: UnresolvedItemName,
    resolved_source_name: ResolvedItemName,
    storage_configuration: &StorageConfiguration,
) -> Result<PurifiedStatement, PlanError> {
    // Validate this is a source that can have subsources added.
    let connection_id = match &desc.connection {
        GenericSourceConnection::Postgres(c) => c.connection_id,
        GenericSourceConnection::MySql(c) => c.connection_id,
        GenericSourceConnection::SqlServer(c) => c.connection_id,
        _ => sql_bail!(
            "source {} does not support ALTER SOURCE.",
            partial_source_name
        ),
    };

    let crate::plan::statement::ddl::AlterSourceAddSubsourceOptionExtracted {
        text_columns,
        exclude_columns,
        details,
        seen: _,
    } = options.clone().try_into()?;
    assert_none!(details, "details cannot be explicitly set");

    let mut requested_subsource_map = BTreeMap::new();

    match desc.connection {
        GenericSourceConnection::Postgres(pg_source_connection) => {
            // Get PostgresConnection for generating subsources.
            let pg_connection = &pg_source_connection.connection;

            let client = pg_connection
                .validate(connection_id, storage_configuration)
                .await?;

            let reference_client = SourceReferenceClient::Postgres {
                client: &client,
                publication: &pg_source_connection.publication,
                database: &pg_connection.database,
            };
            let retrieved_source_references = reference_client.get_source_references().await?;

            let postgres::PurifiedSourceExports {
                source_exports: subsources,
                normalized_text_columns,
            } = postgres::purify_source_exports(
                &client,
                &retrieved_source_references,
                &Some(ExternalReferences::SubsetTables(external_references)),
                text_columns,
                exclude_columns,
                &unresolved_source_name,
                &SourceReferencePolicy::Required,
            )
            .await?;

            if let Some(text_cols_option) = options
                .iter_mut()
                .find(|option| option.name == AlterSourceAddSubsourceOptionName::TextColumns)
            {
                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
            }

            requested_subsource_map.extend(subsources);
        }
        GenericSourceConnection::MySql(mysql_source_connection) => {
            let mysql_connection = &mysql_source_connection.connection;
            let config = mysql_connection
                .config(
                    &storage_configuration.connection_context.secrets_reader,
                    storage_configuration,
                    InTask::No,
                )
                .await?;

            let mut conn = config
                .connect(
                    "mysql purification",
                    &storage_configuration.connection_context.ssh_tunnel_manager,
                )
                .await?;

            // Retrieve the server's current `gtid_executed` value to mark as the effective
            // initial snapshot point for these subsources.
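            // A GTID set is rendered like, e.g.,
            // `3E11FA47-71CA-11E1-9E33-C80AA9429562:1-23` (illustrative value).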
            let initial_gtid_set =
                mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?;

            let requested_references = Some(ExternalReferences::SubsetTables(external_references));

            let reference_client = SourceReferenceClient::MySql {
                conn: &mut conn,
                include_system_schemas: mysql::references_system_schemas(&requested_references),
            };
            let retrieved_source_references = reference_client.get_source_references().await?;

            let mysql::PurifiedSourceExports {
                source_exports: subsources,
                normalized_text_columns,
                normalized_exclude_columns,
            } = mysql::purify_source_exports(
                &mut conn,
                &retrieved_source_references,
                &requested_references,
                text_columns,
                exclude_columns,
                &unresolved_source_name,
                initial_gtid_set,
                &SourceReferencePolicy::Required,
            )
            .await?;
            requested_subsource_map.extend(subsources);

            // Update options with the purified details.
            if let Some(text_cols_option) = options
                .iter_mut()
                .find(|option| option.name == AlterSourceAddSubsourceOptionName::TextColumns)
            {
                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
            }
            if let Some(exclude_cols_option) = options
                .iter_mut()
                .find(|option| option.name == AlterSourceAddSubsourceOptionName::ExcludeColumns)
            {
                exclude_cols_option.value =
                    Some(WithOptionValue::Sequence(normalized_exclude_columns));
            }
        }
        GenericSourceConnection::SqlServer(sql_server_source) => {
            // Open a connection to the upstream SQL Server instance.
            let sql_server_connection = &sql_server_source.connection;
            let config = sql_server_connection
                .resolve_config(
                    &storage_configuration.connection_context.secrets_reader,
                    storage_configuration,
                    InTask::No,
                )
                .await?;
            let mut client = mz_sql_server_util::Client::connect(config).await?;

            // Query the upstream SQL Server instance for available tables to replicate.
            let database = sql_server_connection.database.clone().into();
            let source_references = SourceReferenceClient::SqlServer {
                client: &mut client,
                database: Arc::clone(&database),
            }
            .get_source_references()
            .await?;
            let requested_references = Some(ExternalReferences::SubsetTables(external_references));

            let timeout = mz_storage_types::sources::sql_server::MAX_LSN_WAIT
                .get(storage_configuration.config_set());

            let result = sql_server::purify_source_exports(
                &*database,
                &mut client,
                &source_references,
                &requested_references,
                &text_columns,
                &exclude_columns,
                &unresolved_source_name,
                timeout,
                &SourceReferencePolicy::Required,
            )
            .await;
            let sql_server::PurifiedSourceExports {
                source_exports,
                normalized_text_columns,
                normalized_excl_columns,
            } = result?;

            // Add the new exports to our subsource map.
            requested_subsource_map.extend(source_exports);

            // Update options on the CREATE SOURCE statement with the purified details.
            if let Some(text_cols_option) = options
                .iter_mut()
                .find(|option| option.name == AlterSourceAddSubsourceOptionName::TextColumns)
            {
                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
            }
            if let Some(exclude_cols_option) = options
                .iter_mut()
                .find(|option| option.name == AlterSourceAddSubsourceOptionName::ExcludeColumns)
            {
                exclude_cols_option.value =
                    Some(WithOptionValue::Sequence(normalized_excl_columns));
            }
        }
        _ => unreachable!(),
    };

    Ok(PurifiedStatement::PurifiedAlterSourceAddSubsources {
        source_name: resolved_source_name,
        options,
        subsources: requested_subsource_map,
    })
}

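/// Refreshes the set of available upstream references for an existing source.
///
/// This is purification-only work: it connects to the upstream system (or
/// inspects the load generator or Kafka topic), retrieves the currently
/// available references, and returns them for sequencing to record; the
/// source definition itself is left unchanged.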
async fn purify_alter_source_refresh_references(
    desc: SourceDesc,
    resolved_source_name: ResolvedItemName,
    storage_configuration: &StorageConfiguration,
) -> Result<PurifiedStatement, PlanError> {
    let retrieved_source_references = match desc.connection {
        GenericSourceConnection::Postgres(pg_source_connection) => {
            // Get PostgresConnection for generating subsources.
            let pg_connection = &pg_source_connection.connection;

            let config = pg_connection
                .config(
                    &storage_configuration.connection_context.secrets_reader,
                    storage_configuration,
                    InTask::No,
                )
                .await?;

            let client = config
                .connect(
                    "postgres_purification",
                    &storage_configuration.connection_context.ssh_tunnel_manager,
                )
                .await?;
            let reference_client = SourceReferenceClient::Postgres {
                client: &client,
                publication: &pg_source_connection.publication,
                database: &pg_connection.database,
            };
            reference_client.get_source_references().await?
        }
        GenericSourceConnection::MySql(mysql_source_connection) => {
            let mysql_connection = &mysql_source_connection.connection;
            let config = mysql_connection
                .config(
                    &storage_configuration.connection_context.secrets_reader,
                    storage_configuration,
                    InTask::No,
                )
                .await?;

            let mut conn = config
                .connect(
                    "mysql purification",
                    &storage_configuration.connection_context.ssh_tunnel_manager,
                )
                .await?;

            let reference_client = SourceReferenceClient::MySql {
                conn: &mut conn,
                include_system_schemas: false,
            };
            reference_client.get_source_references().await?
        }
        GenericSourceConnection::SqlServer(sql_server_source) => {
            // Open a connection to the upstream SQL Server instance.
            let sql_server_connection = &sql_server_source.connection;
            let config = sql_server_connection
                .resolve_config(
                    &storage_configuration.connection_context.secrets_reader,
                    storage_configuration,
                    InTask::No,
                )
                .await?;
            let mut client = mz_sql_server_util::Client::connect(config).await?;

            // Query the upstream SQL Server instance for available tables to replicate.
            let source_references = SourceReferenceClient::SqlServer {
                client: &mut client,
                database: sql_server_connection.database.clone().into(),
            }
            .get_source_references()
            .await?;
            source_references
        }
        GenericSourceConnection::LoadGenerator(load_gen_connection) => {
            let reference_client = SourceReferenceClient::LoadGenerator {
                generator: &load_gen_connection.load_generator,
            };
            reference_client.get_source_references().await?
        }
        GenericSourceConnection::Kafka(kafka_conn) => {
            let reference_client = SourceReferenceClient::Kafka {
                topic: &kafka_conn.topic,
            };
            reference_client.get_source_references().await?
        }
    };
    Ok(PurifiedStatement::PurifiedAlterSourceRefreshReferences {
        source_name: resolved_source_name,
        available_source_references: retrieved_source_references.available_source_references(),
    })
}

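/// Purifies a `CREATE TABLE .. FROM SOURCE` statement, resolving the external
/// reference and filling in the column, constraint, and `DETAILS` options that
/// planning expects.
///
/// For example (names illustrative), purifying
///
/// ```sql
/// CREATE TABLE t_a FROM SOURCE pg_src (REFERENCE schema1.table_a);
/// ```
///
/// resolves `schema1.table_a` to its fully qualified upstream name, rewrites
/// the reference in the statement, and attaches the purified details to the
/// statement's `WITH` options.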
async fn purify_create_table_from_source(
    catalog: impl SessionCatalog,
    mut stmt: CreateTableFromSourceStatement<Aug>,
    storage_configuration: &StorageConfiguration,
) -> Result<PurifiedStatement, PlanError> {
    let scx = StatementContext::new(None, &catalog);
    let CreateTableFromSourceStatement {
        name: _,
        columns,
        constraints,
        source: source_name,
        if_not_exists: _,
        external_reference,
        format,
        envelope,
        include_metadata: _,
        with_options,
    } = &mut stmt;

    // Columns and constraints cannot be specified by the user but will be populated below.
    if matches!(columns, TableFromSourceColumns::Defined(_)) {
        sql_bail!("CREATE TABLE .. FROM SOURCE column definitions cannot be specified directly");
    }
    if !constraints.is_empty() {
        sql_bail!(
            "CREATE TABLE .. FROM SOURCE constraint definitions cannot be specified directly"
        );
    }

    // Get the source item.
    let item = scx.get_item_by_resolved_name(source_name)?;

    // Ensure it's an ingestion-based and alterable source.
    let desc = match item.source_desc()? {
        Some(desc) => desc.clone().into_inline_connection(scx.catalog),
        None => {
            sql_bail!("cannot ALTER this type of source")
        }
    };
    let unresolved_source_name: UnresolvedItemName = source_name.full_item_name().clone().into();

    let crate::plan::statement::ddl::TableFromSourceOptionExtracted {
        text_columns,
        exclude_columns,
        retain_history: _,
        details,
        partition_by: _,
        seen: _,
    } = with_options.clone().try_into()?;
    assert_none!(details, "details cannot be explicitly set");

    // Our text column values are unqualified (just column names), but the purification methods
    // below expect to match fully qualified names against the full set of tables in the upstream
    // system, so we first qualify them using the external reference.
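    // For example (illustrative names): with an external reference of
    // `schema1.table_a`, a `TEXT COLUMNS` entry `col1` is qualified to
    // `schema1.table_a.col1`.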
    let qualified_text_columns = text_columns
        .iter()
        .map(|col| {
            UnresolvedItemName(
                external_reference
                    .as_ref()
                    .map(|er| er.0.iter().chain_one(col).map(|i| i.clone()).collect())
                    .unwrap_or_else(|| vec![col.clone()]),
            )
        })
        .collect_vec();
    let qualified_exclude_columns = exclude_columns
        .iter()
        .map(|col| {
            UnresolvedItemName(
                external_reference
                    .as_ref()
                    .map(|er| er.0.iter().chain_one(col).map(|i| i.clone()).collect())
                    .unwrap_or_else(|| vec![col.clone()]),
            )
        })
        .collect_vec();

    // Should be overridden below if a source-specific format is required.
    let mut format_options = SourceFormatOptions::Default;

    let retrieved_source_references: RetrievedSourceReferences;

    let requested_references = external_reference.as_ref().map(|ref_name| {
        ExternalReferences::SubsetTables(vec![ExternalReferenceExport {
            reference: ref_name.clone(),
            alias: None,
        }])
    });

    // Run purification work specific to each source type: resolve the external reference to
    // a fully qualified name and obtain the appropriate details for the source-export statement.
    let purified_export = match desc.connection {
        GenericSourceConnection::Postgres(pg_source_connection) => {
            // Get PostgresConnection for generating subsources.
            let pg_connection = &pg_source_connection.connection;

            let client = pg_connection
                .validate(pg_source_connection.connection_id, storage_configuration)
                .await?;

            let reference_client = SourceReferenceClient::Postgres {
                client: &client,
                publication: &pg_source_connection.publication,
                database: &pg_connection.database,
            };
            retrieved_source_references = reference_client.get_source_references().await?;

            let postgres::PurifiedSourceExports {
                source_exports,
                // TODO(database-issues#8620): Remove once subsources are removed
                // This `normalized_text_columns` is not relevant for us and is only returned for
                // `CREATE SOURCE` statements that automatically generate subsources
                normalized_text_columns: _,
            } = postgres::purify_source_exports(
                &client,
                &retrieved_source_references,
                &requested_references,
                qualified_text_columns,
                qualified_exclude_columns,
                &unresolved_source_name,
                &SourceReferencePolicy::Required,
            )
            .await?;
            // There should be exactly one source_export returned for this statement.
            let (_, purified_export) = source_exports.into_element();
            purified_export
        }
        GenericSourceConnection::MySql(mysql_source_connection) => {
            let mysql_connection = &mysql_source_connection.connection;
            let config = mysql_connection
                .config(
                    &storage_configuration.connection_context.secrets_reader,
                    storage_configuration,
                    InTask::No,
                )
                .await?;

            let mut conn = config
                .connect(
                    "mysql purification",
                    &storage_configuration.connection_context.ssh_tunnel_manager,
                )
                .await?;

            // Retrieve the server's current `gtid_executed` value to mark as the effective
            // initial snapshot point for this table.
            let initial_gtid_set =
                mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?;

            let reference_client = SourceReferenceClient::MySql {
                conn: &mut conn,
                include_system_schemas: mysql::references_system_schemas(&requested_references),
            };
            retrieved_source_references = reference_client.get_source_references().await?;

            let mysql::PurifiedSourceExports {
                source_exports,
                // TODO(database-issues#8620): Remove once subsources are removed
                // `normalized_text/exclude_columns` is not relevant for us and is only returned
                // for `CREATE SOURCE` statements that automatically generate subsources
                normalized_text_columns: _,
                normalized_exclude_columns: _,
            } = mysql::purify_source_exports(
                &mut conn,
                &retrieved_source_references,
                &requested_references,
                qualified_text_columns,
                qualified_exclude_columns,
                &unresolved_source_name,
                initial_gtid_set,
                &SourceReferencePolicy::Required,
            )
            .await?;
            // There should be exactly one source_export returned for this statement.
            let (_, purified_export) = source_exports.into_element();
            purified_export
        }
        GenericSourceConnection::SqlServer(sql_server_source) => {
            let connection = sql_server_source.connection;
            let config = connection
                .resolve_config(
                    &storage_configuration.connection_context.secrets_reader,
                    storage_configuration,
                    InTask::No,
                )
                .await?;
            let mut client = mz_sql_server_util::Client::connect(config).await?;

            let database: Arc<str> = connection.database.into();
            let reference_client = SourceReferenceClient::SqlServer {
                client: &mut client,
                database: Arc::clone(&database),
            };
            retrieved_source_references = reference_client.get_source_references().await?;
            tracing::debug!(?retrieved_source_references, "got source references");

            let timeout = mz_storage_types::sources::sql_server::MAX_LSN_WAIT
                .get(storage_configuration.config_set());

            let purified_source_exports = sql_server::purify_source_exports(
                &*database,
                &mut client,
                &retrieved_source_references,
                &requested_references,
                &qualified_text_columns,
                &qualified_exclude_columns,
                &unresolved_source_name,
                timeout,
                &SourceReferencePolicy::Required,
            )
            .await?;

            // There should be exactly one source_export returned for this statement.
            let (_, purified_export) = purified_source_exports.source_exports.into_element();
            purified_export
        }
        GenericSourceConnection::LoadGenerator(load_gen_connection) => {
            let reference_client = SourceReferenceClient::LoadGenerator {
                generator: &load_gen_connection.load_generator,
            };
            retrieved_source_references = reference_client.get_source_references().await?;

            let requested_exports = retrieved_source_references
                .requested_source_exports(requested_references.as_ref(), &unresolved_source_name)?;
            // There should be exactly one source_export returned.
            let export = requested_exports.into_element();
            PurifiedSourceExport {
                external_reference: export.external_reference,
                details: PurifiedExportDetails::LoadGenerator {
                    table: export
                        .meta
                        .load_generator_desc()
                        .expect("is loadgen")
                        .clone(),
                    output: export
                        .meta
                        .load_generator_output()
                        .expect("is loadgen")
                        .clone(),
                },
            }
        }
        GenericSourceConnection::Kafka(kafka_conn) => {
            let reference_client = SourceReferenceClient::Kafka {
                topic: &kafka_conn.topic,
            };
            retrieved_source_references = reference_client.get_source_references().await?;
            let requested_exports = retrieved_source_references
                .requested_source_exports(requested_references.as_ref(), &unresolved_source_name)?;
            // There should be exactly one source_export returned.
            let export = requested_exports.into_element();

            format_options = SourceFormatOptions::Kafka {
                topic: kafka_conn.topic.clone(),
            };
            PurifiedSourceExport {
                external_reference: export.external_reference,
                details: PurifiedExportDetails::Kafka {},
            }
        }
    };

    purify_source_format(
        &catalog,
        format,
        &format_options,
        envelope,
        storage_configuration,
    )
    .await?;

    // Update the external reference in the statement to the resolved, fully qualified
    // external reference.
    *external_reference = Some(purified_export.external_reference.clone());

    // Update options in the statement using the purified export details.
    match &purified_export.details {
        PurifiedExportDetails::Postgres { .. } => {
            let mut unsupported_cols = vec![];
            let postgres::PostgresExportStatementValues {
                columns: gen_columns,
                constraints: gen_constraints,
                text_columns: gen_text_columns,
                exclude_columns: gen_exclude_columns,
                details: gen_details,
                external_reference: _,
            } = postgres::generate_source_export_statement_values(
                &scx,
                purified_export,
                &mut unsupported_cols,
            )?;
            if !unsupported_cols.is_empty() {
                unsupported_cols.sort();
                Err(PgSourcePurificationError::UnrecognizedTypes {
                    cols: unsupported_cols,
                })?;
            }

            if let Some(text_cols_option) = with_options
                .iter_mut()
                .find(|option| option.name == TableFromSourceOptionName::TextColumns)
            {
                match gen_text_columns {
                    Some(gen_text_columns) => {
                        text_cols_option.value = Some(WithOptionValue::Sequence(gen_text_columns))
                    }
                    None => soft_panic_or_log!(
                        "text_columns should be Some if text_cols_option is present"
                    ),
                }
            }
            if let Some(exclude_cols_option) = with_options
                .iter_mut()
                .find(|option| option.name == TableFromSourceOptionName::ExcludeColumns)
            {
                match gen_exclude_columns {
                    Some(gen_exclude_columns) => {
                        exclude_cols_option.value =
                            Some(WithOptionValue::Sequence(gen_exclude_columns))
                    }
                    None => soft_panic_or_log!(
                        "exclude_columns should be Some if exclude_cols_option is present"
                    ),
                }
            }
            match columns {
                TableFromSourceColumns::Defined(_) => unreachable!(),
                TableFromSourceColumns::NotSpecified => {
                    *columns = TableFromSourceColumns::Defined(gen_columns);
                    *constraints = gen_constraints;
                }
                TableFromSourceColumns::Named(_) => {
                    sql_bail!("columns cannot be named for Postgres sources")
                }
            }
            with_options.push(TableFromSourceOption {
                name: TableFromSourceOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    gen_details.into_proto().encode_to_vec(),
                )))),
            })
        }
        PurifiedExportDetails::MySql { .. } => {
            let mysql::MySqlExportStatementValues {
                columns: gen_columns,
                constraints: gen_constraints,
                text_columns: gen_text_columns,
                exclude_columns: gen_exclude_columns,
                details: gen_details,
                external_reference: _,
            } = mysql::generate_source_export_statement_values(&scx, purified_export)?;

            if let Some(text_cols_option) = with_options
                .iter_mut()
                .find(|option| option.name == TableFromSourceOptionName::TextColumns)
            {
                match gen_text_columns {
                    Some(gen_text_columns) => {
                        text_cols_option.value = Some(WithOptionValue::Sequence(gen_text_columns))
                    }
                    None => soft_panic_or_log!(
                        "text_columns should be Some if text_cols_option is present"
                    ),
                }
            }
            if let Some(exclude_cols_option) = with_options
                .iter_mut()
                .find(|option| option.name == TableFromSourceOptionName::ExcludeColumns)
            {
                match gen_exclude_columns {
                    Some(gen_exclude_columns) => {
                        exclude_cols_option.value =
                            Some(WithOptionValue::Sequence(gen_exclude_columns))
                    }
                    None => soft_panic_or_log!(
                        "exclude_columns should be Some if exclude_cols_option is present"
                    ),
                }
            }
            match columns {
                TableFromSourceColumns::Defined(_) => unreachable!(),
                TableFromSourceColumns::NotSpecified => {
                    *columns = TableFromSourceColumns::Defined(gen_columns);
                    *constraints = gen_constraints;
                }
                TableFromSourceColumns::Named(_) => {
                    sql_bail!("columns cannot be named for MySQL sources")
                }
            }
            with_options.push(TableFromSourceOption {
                name: TableFromSourceOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    gen_details.into_proto().encode_to_vec(),
                )))),
            })
        }
        PurifiedExportDetails::SqlServer { .. } => {
            let sql_server::SqlServerExportStatementValues {
                columns: gen_columns,
                constraints: gen_constraints,
                text_columns: gen_text_columns,
                excl_columns: gen_excl_columns,
                details: gen_details,
                external_reference: _,
            } = sql_server::generate_source_export_statement_values(&scx, purified_export)?;

            if let Some(text_cols_option) = with_options
                .iter_mut()
                .find(|opt| opt.name == TableFromSourceOptionName::TextColumns)
            {
                match gen_text_columns {
                    Some(gen_text_columns) => {
                        text_cols_option.value = Some(WithOptionValue::Sequence(gen_text_columns))
                    }
                    None => soft_panic_or_log!(
                        "text_columns should be Some if text_cols_option is present"
                    ),
                }
            }
            if let Some(exclude_cols_option) = with_options
                .iter_mut()
                .find(|opt| opt.name == TableFromSourceOptionName::ExcludeColumns)
            {
                match gen_excl_columns {
                    Some(gen_excl_columns) => {
                        exclude_cols_option.value =
                            Some(WithOptionValue::Sequence(gen_excl_columns))
                    }
                    None => soft_panic_or_log!(
                        "excl_columns should be Some if exclude_cols_option is present"
                    ),
                }
            }

            match columns {
                TableFromSourceColumns::NotSpecified => {
                    *columns = TableFromSourceColumns::Defined(gen_columns);
                    *constraints = gen_constraints;
                }
                TableFromSourceColumns::Named(_) => {
                    sql_bail!("columns cannot be named for SQL Server sources")
                }
                TableFromSourceColumns::Defined(_) => unreachable!(),
            }

            with_options.push(TableFromSourceOption {
                name: TableFromSourceOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    gen_details.into_proto().encode_to_vec(),
                )))),
            })
        }
        PurifiedExportDetails::LoadGenerator { .. } => {
            let (desc, output) = match purified_export.details {
                PurifiedExportDetails::LoadGenerator { table, output } => (table, output),
                _ => unreachable!("purified export details must be load generator"),
            };
            // We only determine the table description for multi-output load generator sources
            // here; single-output load generators have their relation description determined
            // during statement planning, as envelope and format options may affect their schema.
            if let Some(desc) = desc {
                let (gen_columns, gen_constraints) = scx.relation_desc_into_table_defs(&desc)?;
                match columns {
                    TableFromSourceColumns::Defined(_) => unreachable!(),
                    TableFromSourceColumns::NotSpecified => {
                        *columns = TableFromSourceColumns::Defined(gen_columns);
                        *constraints = gen_constraints;
                    }
                    TableFromSourceColumns::Named(_) => {
                        sql_bail!("columns cannot be named for multi-output load generator sources")
                    }
                }
            }
            let details = SourceExportStatementDetails::LoadGenerator { output };
            with_options.push(TableFromSourceOption {
                name: TableFromSourceOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    details.into_proto().encode_to_vec(),
                )))),
            })
        }
        PurifiedExportDetails::Kafka {} => {
            // NOTE: Kafka tables have their 'schemas' purified into the statement inside the
            // format field, so we don't specify any columns or constraints to be stored
            // on the statement here. The RelationDesc will be determined during planning.
            let details = SourceExportStatementDetails::Kafka {};
            with_options.push(TableFromSourceOption {
                name: TableFromSourceOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    details.into_proto().encode_to_vec(),
                )))),
            })
        }
    };

    // TODO: We might as well use the retrieved available references to update the source
    // available references table in the catalog, so plumb this through.
    // available_source_references: retrieved_source_references.available_source_references(),
    Ok(PurifiedStatement::PurifiedCreateTableFromSource { stmt })
}

enum SourceFormatOptions {
    Default,
    Kafka { topic: String },
}

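/// Purifies the format specifier of a source or table statement, fetching and
/// seeding any schemas that must be resolved from a Confluent Schema Registry.
/// Only Kafka sources may use `KEY`/`VALUE` format specifiers.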
async fn purify_source_format(
    catalog: &dyn SessionCatalog,
    format: &mut Option<FormatSpecifier<Aug>>,
    options: &SourceFormatOptions,
    envelope: &Option<SourceEnvelope>,
    storage_configuration: &StorageConfiguration,
) -> Result<(), PlanError> {
    if matches!(format, Some(FormatSpecifier::KeyValue { .. }))
        && !matches!(options, SourceFormatOptions::Kafka { .. })
    {
        sql_bail!("Kafka sources are the only source type that can provide KEY/VALUE formats")
    }

    match format.as_mut() {
        None => {}
        Some(FormatSpecifier::Bare(format)) => {
            purify_source_format_single(catalog, format, options, envelope, storage_configuration)
                .await?;
        }

        Some(FormatSpecifier::KeyValue { key, value: val }) => {
            purify_source_format_single(catalog, key, options, envelope, storage_configuration)
                .await?;
            purify_source_format_single(catalog, val, options, envelope, storage_configuration)
                .await?;
        }
    }
    Ok(())
}

async fn purify_source_format_single(
    catalog: &dyn SessionCatalog,
    format: &mut Format<Aug>,
    options: &SourceFormatOptions,
    envelope: &Option<SourceEnvelope>,
    storage_configuration: &StorageConfiguration,
) -> Result<(), PlanError> {
    match format {
        Format::Avro(schema) => match schema {
            AvroSchema::Csr { csr_connection } => {
                purify_csr_connection_avro(
                    catalog,
                    options,
                    csr_connection,
                    envelope,
                    storage_configuration,
                )
                .await?
            }
            AvroSchema::InlineSchema { .. } => {}
        },
        Format::Protobuf(schema) => match schema {
            ProtobufSchema::Csr { csr_connection } => {
                purify_csr_connection_proto(
                    catalog,
                    options,
                    csr_connection,
                    envelope,
                    storage_configuration,
                )
                .await?;
            }
            ProtobufSchema::InlineSchema { .. } => {}
        },
        Format::Bytes
        | Format::Regex(_)
        | Format::Json { .. }
        | Format::Text
        | Format::Csv { .. } => (),
    }
    Ok(())
}

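/// Generates `CREATE SUBSOURCE` statements for the given purified source
/// exports, dispatching on the connection type of the first export.
///
/// A sketch of the shape of a generated statement, with illustrative names
/// and the hex-encoded details elided:
///
/// ```sql
/// CREATE SUBSOURCE t_a (col1 int4, col2 text)
/// OF SOURCE pg_src
/// WITH (EXTERNAL REFERENCE = schema1.table_a, DETAILS = '...');
/// ```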
pub fn generate_subsource_statements(
    scx: &StatementContext,
    source_name: ResolvedItemName,
    subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
) -> Result<Vec<CreateSubsourceStatement<Aug>>, PlanError> {
    // Get the first subsource to determine the connection type.
    if subsources.is_empty() {
        return Ok(vec![]);
    }
    let (_, purified_export) = subsources.iter().next().unwrap();

    let statements = match &purified_export.details {
        PurifiedExportDetails::Postgres { .. } => {
            crate::pure::postgres::generate_create_subsource_statements(
                scx,
                source_name,
                subsources,
            )?
        }
        PurifiedExportDetails::MySql { .. } => {
            crate::pure::mysql::generate_create_subsource_statements(scx, source_name, subsources)?
        }
        PurifiedExportDetails::SqlServer { .. } => {
            crate::pure::sql_server::generate_create_subsource_statements(
                scx,
                source_name,
                subsources,
            )?
        }
        PurifiedExportDetails::LoadGenerator { .. } => {
            let mut subsource_stmts = Vec::with_capacity(subsources.len());
            for (subsource_name, purified_export) in subsources {
                let (desc, output) = match purified_export.details {
                    PurifiedExportDetails::LoadGenerator { table, output } => (table, output),
                    _ => unreachable!("purified export details must be load generator"),
                };
                let desc =
                    desc.expect("subsources cannot be generated for single-output load generators");

                let (columns, table_constraints) = scx.relation_desc_into_table_defs(&desc)?;
                let details = SourceExportStatementDetails::LoadGenerator { output };
                // Create the subsource statement
                let subsource = CreateSubsourceStatement {
                    name: subsource_name,
                    columns,
                    of_source: Some(source_name.clone()),
                    // Unlike sources that come from an external upstream, we
                    // have more leniency to introduce different constraints
                    // every time the load generator is run; i.e. we are not as
                    // worried about introducing junk data.
                    constraints: table_constraints,
                    if_not_exists: false,
                    with_options: vec![
                        CreateSubsourceOption {
                            name: CreateSubsourceOptionName::ExternalReference,
                            value: Some(WithOptionValue::UnresolvedItemName(
                                purified_export.external_reference,
                            )),
                        },
                        CreateSubsourceOption {
                            name: CreateSubsourceOptionName::Details,
                            value: Some(WithOptionValue::Value(Value::String(hex::encode(
                                details.into_proto().encode_to_vec(),
                            )))),
                        },
                    ],
                };
                subsource_stmts.push(subsource);
            }

            subsource_stmts
        }
        PurifiedExportDetails::Kafka { .. } => {
            // TODO: as part of database-issues#8322, Kafka sources will begin
            // producing data; we'll need to understand the schema of the
            // output here.
            assert!(
                subsources.is_empty(),
                "Kafka sources do not produce data-bearing subsources"
            );
            vec![]
        }
    };
    Ok(statements)
}

async fn purify_csr_connection_proto(
    catalog: &dyn SessionCatalog,
    options: &SourceFormatOptions,
    csr_connection: &mut CsrConnectionProtobuf<Aug>,
    envelope: &Option<SourceEnvelope>,
    storage_configuration: &StorageConfiguration,
) -> Result<(), PlanError> {
    let SourceFormatOptions::Kafka { topic } = options else {
        sql_bail!("Confluent Schema Registry is only supported with Kafka sources")
    };

    let CsrConnectionProtobuf {
        seed,
        connection: CsrConnection {
            connection,
            options: _,
        },
    } = csr_connection;
    match seed {
        None => {
            let scx = StatementContext::new(None, &*catalog);

            let ccsr_connection = match scx.get_item_by_resolved_name(connection)?.connection()? {
                Connection::Csr(connection) => connection.clone().into_inline_connection(catalog),
                _ => sql_bail!("{} is not a schema registry connection", connection),
            };

            let ccsr_client = ccsr_connection
                .connect(storage_configuration, InTask::No)
                .await
                .map_err(|e| CsrPurificationError::ClientError(Arc::new(e)))?;

            let value = compile_proto(&format!("{}-value", topic), &ccsr_client).await?;
            let key = compile_proto(&format!("{}-key", topic), &ccsr_client)
                .await
                .ok();

            if matches!(envelope, Some(SourceEnvelope::Debezium)) && key.is_none() {
                sql_bail!("Key schema is required for ENVELOPE DEBEZIUM");
            }

            *seed = Some(CsrSeedProtobuf { value, key });
        }
        Some(_) => (),
    }

    Ok(())
}

async fn purify_csr_connection_avro(
    catalog: &dyn SessionCatalog,
    options: &SourceFormatOptions,
    csr_connection: &mut CsrConnectionAvro<Aug>,
    envelope: &Option<SourceEnvelope>,
    storage_configuration: &StorageConfiguration,
) -> Result<(), PlanError> {
    let SourceFormatOptions::Kafka { topic } = options else {
        sql_bail!("Confluent Schema Registry is only supported with Kafka sources")
    };

    let CsrConnectionAvro {
        connection: CsrConnection { connection, .. },
        seed,
        key_strategy,
        value_strategy,
    } = csr_connection;
    if seed.is_none() {
        let scx = StatementContext::new(None, &*catalog);
        let csr_connection = match scx.get_item_by_resolved_name(connection)?.connection()? {
            Connection::Csr(connection) => connection.clone().into_inline_connection(catalog),
            _ => sql_bail!("{} is not a schema registry connection", connection),
        };
        let ccsr_client = csr_connection
            .connect(storage_configuration, InTask::No)
            .await
            .map_err(|e| CsrPurificationError::ClientError(Arc::new(e)))?;

        let Schema {
            key_schema,
            value_schema,
        } = get_remote_csr_schema(
            &ccsr_client,
            key_strategy.clone().unwrap_or_default(),
            value_strategy.clone().unwrap_or_default(),
            topic,
        )
        .await?;
        if matches!(envelope, Some(SourceEnvelope::Debezium)) && key_schema.is_none() {
            sql_bail!("Key schema is required for ENVELOPE DEBEZIUM");
        }

        *seed = Some(CsrSeedAvro {
            key_schema,
            value_schema,
        })
    }

    Ok(())
}

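/// A key/value schema pair fetched from a Confluent Schema Registry. The key
/// schema is optional; the value schema is required.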
#[derive(Debug)]
pub struct Schema {
    pub key_schema: Option<String>,
    pub value_schema: String,
}

async fn get_schema_with_strategy(
    client: &Client,
    strategy: ReaderSchemaSelectionStrategy,
    subject: &str,
) -> Result<Option<String>, PlanError> {
    match strategy {
        ReaderSchemaSelectionStrategy::Latest => {
            match client.get_schema_by_subject(subject).await {
                Ok(CcsrSchema { raw, .. }) => Ok(Some(raw)),
                Err(GetBySubjectError::SubjectNotFound)
                | Err(GetBySubjectError::VersionNotFound(_)) => Ok(None),
                Err(e) => Err(PlanError::FetchingCsrSchemaFailed {
                    schema_lookup: format!("subject {}", subject.quoted()),
                    cause: Arc::new(e),
                }),
            }
        }
        ReaderSchemaSelectionStrategy::Inline(raw) => Ok(Some(raw)),
        ReaderSchemaSelectionStrategy::ById(id) => match client.get_schema_by_id(id).await {
            Ok(CcsrSchema { raw, .. }) => Ok(Some(raw)),
            Err(GetByIdError::SchemaNotFound) => Ok(None),
            Err(e) => Err(PlanError::FetchingCsrSchemaFailed {
                schema_lookup: format!("ID {}", id),
                cause: Arc::new(e),
            }),
        },
    }
}

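/// Fetches the key and value schemas for `topic` from the schema registry,
/// following the registry's default subject naming convention, in which a
/// topic's schemas are registered under the subjects `{topic}-key` and
/// `{topic}-value`. The value schema is required; the key schema is optional.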
async fn get_remote_csr_schema(
    ccsr_client: &mz_ccsr::Client,
    key_strategy: ReaderSchemaSelectionStrategy,
    value_strategy: ReaderSchemaSelectionStrategy,
    topic: &str,
) -> Result<Schema, PlanError> {
    let value_schema_name = format!("{}-value", topic);
    let value_schema =
        get_schema_with_strategy(ccsr_client, value_strategy, &value_schema_name).await?;
    let value_schema = value_schema.ok_or_else(|| anyhow!("No value schema found"))?;
    let subject = format!("{}-key", topic);
    let key_schema = get_schema_with_strategy(ccsr_client, key_strategy, &subject).await?;
    Ok(Schema {
        key_schema,
        value_schema,
    })
}

/// Collects the Protobuf schema for the given subject from the CSR and
/// compiles it into a message descriptor.
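///
/// The subject's schema and all of its references are compiled into a file
/// descriptor set, which must contain exactly one top-level message; the
/// serialized descriptor set (as a SQL byte string) and the message name are
/// returned as the seed.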
async fn compile_proto(
    subject_name: &String,
    ccsr_client: &Client,
) -> Result<CsrSeedProtobufSchema, PlanError> {
    let (primary_subject, dependency_subjects) = ccsr_client
        .get_subject_and_references(subject_name)
        .await
        .map_err(|e| PlanError::FetchingCsrSchemaFailed {
            schema_lookup: format!("subject {}", subject_name.quoted()),
            cause: Arc::new(e),
        })?;

    // Compile .proto files into a file descriptor set.
    let mut source_tree = VirtualSourceTree::new();
    for subject in iter::once(&primary_subject).chain(dependency_subjects.iter()) {
        source_tree.as_mut().add_file(
            Path::new(&subject.name),
            subject.schema.raw.as_bytes().to_vec(),
        );
    }
    let mut db = SourceTreeDescriptorDatabase::new(source_tree.as_mut());
    let fds = db
        .as_mut()
        .build_file_descriptor_set(&[Path::new(&primary_subject.name)])
        .map_err(|cause| PlanError::InvalidProtobufSchema { cause })?;

    // Ensure there is exactly one message in the file.
    let primary_fd = fds.file(0);
    let message_name = match primary_fd.message_type_size() {
        1 => String::from_utf8_lossy(primary_fd.message_type(0).name()).into_owned(),
        0 => bail_unsupported!(29603, "Protobuf schemas with no messages"),
        _ => bail_unsupported!(29603, "Protobuf schemas with multiple messages"),
    };

    // Encode the file descriptor set into a SQL byte string.
    let bytes = &fds
        .serialize()
        .map_err(|cause| PlanError::InvalidProtobufSchema { cause })?;
    let mut schema = String::new();
    strconv::format_bytes(&mut schema, bytes);

    Ok(CsrSeedProtobufSchema {
        schema,
        message_name,
    })
}

const MZ_NOW_NAME: &str = "mz_now";
const MZ_NOW_SCHEMA: &str = "mz_catalog";

/// Purifies a CREATE MATERIALIZED VIEW statement. Additionally, it adjusts `resolved_ids` if
/// references to ids appear or disappear during the purification.
///
/// Note that in contrast with [`purify_statement`], this doesn't need to be async, because
/// this function is not making any network calls.
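///
/// For example (timestamp value illustrative): with `mz_now` chosen as
/// `1700000000000`, the option `REFRESH AT CREATION` is first rewritten to
/// `REFRESH AT mz_now()`, and the `mz_now()` call is then replaced by the
/// chosen timestamp cast to `mz_timestamp`.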
2562pub fn purify_create_materialized_view_options(
2563    catalog: impl SessionCatalog,
2564    mz_now: Option<Timestamp>,
2565    cmvs: &mut CreateMaterializedViewStatement<Aug>,
2566    resolved_ids: &mut ResolvedIds,
2567) {
2568    // 0. Preparations:
2569    // Prepare an expression that calls `mz_now()`, which we can insert in various later steps.
2570    let (mz_now_id, mz_now_expr) = {
2571        let item = catalog
2572            .resolve_function(&PartialItemName {
2573                database: None,
2574                schema: Some(MZ_NOW_SCHEMA.to_string()),
2575                item: MZ_NOW_NAME.to_string(),
2576            })
2577            .expect("we should be able to resolve mz_now");
2578        (
2579            item.id(),
2580            Expr::Function(Function {
2581                name: ResolvedItemName::Item {
2582                    id: item.id(),
2583                    qualifiers: item.name().qualifiers.clone(),
2584                    full_name: catalog.resolve_full_name(item.name()),
2585                    print_id: false,
2586                    version: RelationVersionSelector::Latest,
2587                },
2588                args: FunctionArgs::Args {
2589                    args: Vec::new(),
2590                    order_by: Vec::new(),
2591                },
2592                filter: None,
2593                over: None,
2594                distinct: false,
2595            }),
2596        )
2597    };
    // Prepare the `mz_timestamp` type.
    let (mz_timestamp_id, mz_timestamp_type) = {
        let item = catalog.get_system_type("mz_timestamp");
        let full_name = catalog.resolve_full_name(item.name());
        (
            item.id(),
            ResolvedDataType::Named {
                id: item.id(),
                qualifiers: item.name().qualifiers.clone(),
                full_name,
                modifiers: vec![],
                print_id: true,
            },
        )
    };

    let mut introduced_mz_timestamp = false;

    for option in cmvs.with_options.iter_mut() {
        // 1. Purify `REFRESH AT CREATION` to `REFRESH AT mz_now()`.
        if matches!(
            option.value,
            Some(WithOptionValue::Refresh(RefreshOptionValue::AtCreation))
        ) {
            option.value = Some(WithOptionValue::Refresh(RefreshOptionValue::At(
                RefreshAtOptionValue {
                    time: mz_now_expr.clone(),
                },
            )));
        }

        // 2. If `REFRESH EVERY` doesn't have an `ALIGNED TO`, then add `ALIGNED TO mz_now()`.
        if let Some(WithOptionValue::Refresh(RefreshOptionValue::Every(
            RefreshEveryOptionValue { aligned_to, .. },
        ))) = &mut option.value
        {
            if aligned_to.is_none() {
                *aligned_to = Some(mz_now_expr.clone());
            }
        }

        // 3. Substitute `mz_now()` with the timestamp chosen for the CREATE MATERIALIZED VIEW
        // statement. (This has to happen after the above steps, which might introduce `mz_now()`.)
        match &mut option.value {
            Some(WithOptionValue::Refresh(RefreshOptionValue::At(RefreshAtOptionValue {
                time,
            }))) => {
                let mut visitor = MzNowPurifierVisitor::new(mz_now, mz_timestamp_type.clone());
                visitor.visit_expr_mut(time);
                introduced_mz_timestamp |= visitor.introduced_mz_timestamp;
            }
            Some(WithOptionValue::Refresh(RefreshOptionValue::Every(
                RefreshEveryOptionValue {
                    interval: _,
                    aligned_to: Some(aligned_to),
                },
            ))) => {
                let mut visitor = MzNowPurifierVisitor::new(mz_now, mz_timestamp_type.clone());
                visitor.visit_expr_mut(aligned_to);
                introduced_mz_timestamp |= visitor.introduced_mz_timestamp;
            }
            _ => {}
        }
    }

    // 4. If the user didn't give any REFRESH option, then default to ON COMMIT.
    if !cmvs.with_options.iter().any(|o| {
        matches!(
            o,
            MaterializedViewOption {
                value: Some(WithOptionValue::Refresh(..)),
                ..
            }
        )
    }) {
        cmvs.with_options.push(MaterializedViewOption {
            name: MaterializedViewOptionName::Refresh,
            value: Some(WithOptionValue::Refresh(RefreshOptionValue::OnCommit)),
        })
    }

    // 5. Update `resolved_ids`: the purification might have
    // - added references to `mz_timestamp`;
    // - removed references to `mz_now`.
    if introduced_mz_timestamp {
        resolved_ids.add_item(mz_timestamp_id);
    }
    // Even though we always remove `mz_now()` from `with_options`, the main query expression of
    // the MV might still contain `mz_now()`, so visit the entire statement to look for it
    // everywhere.
    let mut visitor = ExprContainsTemporalVisitor::new();
    visitor.visit_create_materialized_view_statement(cmvs);
    if !visitor.contains_temporal {
        resolved_ids.remove_item(&mz_now_id);
    }
}

/// Returns true if the [MaterializedViewOption] either already involves `mz_now()` or will
/// involve it after purification.
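///
/// For example (illustrative only), `REFRESH EVERY '1 day'` without an `ALIGNED TO` returns
/// true, because purification will default the alignment to `mz_now()`.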
pub fn materialized_view_option_contains_temporal(mvo: &MaterializedViewOption<Aug>) -> bool {
    match &mvo.value {
        Some(WithOptionValue::Refresh(RefreshOptionValue::At(RefreshAtOptionValue { time }))) => {
            let mut visitor = ExprContainsTemporalVisitor::new();
            visitor.visit_expr(time);
            visitor.contains_temporal
        }
        Some(WithOptionValue::Refresh(RefreshOptionValue::Every(RefreshEveryOptionValue {
            interval: _,
            aligned_to: Some(aligned_to),
        }))) => {
            let mut visitor = ExprContainsTemporalVisitor::new();
            visitor.visit_expr(aligned_to);
            visitor.contains_temporal
        }
        Some(WithOptionValue::Refresh(RefreshOptionValue::Every(RefreshEveryOptionValue {
            interval: _,
            aligned_to: None,
        }))) => {
            // For a `REFRESH EVERY` without an `ALIGNED TO`, purification will default the
            // `ALIGNED TO` to `mz_now()`.
            true
        }
        Some(WithOptionValue::Refresh(RefreshOptionValue::AtCreation)) => {
            // `REFRESH AT CREATION` will be purified to `REFRESH AT mz_now()`.
            true
        }
        _ => false,
    }
}

/// Determines whether the AST involves `mz_now()`.
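///
/// A minimal usage sketch, mirroring the call sites below:
/// ```ignore
/// let mut visitor = ExprContainsTemporalVisitor::new();
/// visitor.visit_expr(&expr);
/// let involves_mz_now = visitor.contains_temporal;
/// ```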
struct ExprContainsTemporalVisitor {
    pub contains_temporal: bool,
}

impl ExprContainsTemporalVisitor {
    pub fn new() -> ExprContainsTemporalVisitor {
        ExprContainsTemporalVisitor {
            contains_temporal: false,
        }
    }
}

impl Visit<'_, Aug> for ExprContainsTemporalVisitor {
    fn visit_function(&mut self, func: &Function<Aug>) {
        self.contains_temporal |= func.name.full_item_name().item == MZ_NOW_NAME;
        visit_function(self, func);
    }
}

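/// Replaces `mz_now()` calls with the timestamp chosen for the statement, cast to
/// `mz_timestamp`.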
struct MzNowPurifierVisitor {
    pub mz_now: Option<Timestamp>,
    pub mz_timestamp_type: ResolvedDataType,
    pub introduced_mz_timestamp: bool,
}

impl MzNowPurifierVisitor {
    pub fn new(
        mz_now: Option<Timestamp>,
        mz_timestamp_type: ResolvedDataType,
    ) -> MzNowPurifierVisitor {
        MzNowPurifierVisitor {
            mz_now,
            mz_timestamp_type,
            introduced_mz_timestamp: false,
        }
    }
}

impl VisitMut<'_, Aug> for MzNowPurifierVisitor {
    fn visit_expr_mut(&mut self, expr: &'_ mut Expr<Aug>) {
        match expr {
            Expr::Function(Function {
                name:
                    ResolvedItemName::Item {
                        full_name: FullItemName { item, .. },
                        ..
                    },
                ..
            }) if item.as_str() == MZ_NOW_NAME => {
                let mz_now = self.mz_now.expect(
                    "we should have chosen a timestamp if the expression contains mz_now()",
                );
                // We substitute `mz_now()` with a number literal cast to `mz_timestamp`. The
                // cast ensures the type of the expression is unchanged.
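                // Illustrative example: with a chosen timestamp of 1700000000000,
                // `mz_now()` is replaced by the literal 1700000000000 cast to
                // `mz_timestamp`.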
                *expr = Expr::Cast {
                    expr: Box::new(Expr::Value(Value::Number(mz_now.to_string()))),
                    data_type: self.mz_timestamp_type.clone(),
                };
                self.introduced_mz_timestamp = true;
            }
            _ => visit_expr_mut(self, expr),
        }
    }
}