// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! SQL purification.
//!
//! See the [crate-level documentation](crate) for details.
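//!
//! Briefly: purification contacts the external systems a statement depends on
//! (Kafka brokers, upstream databases, schema registries, and so on) once at
//! planning time and embeds what it learns in the statement itself, so that
//! the rest of planning can proceed without further access to external state.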

use std::collections::{BTreeMap, BTreeSet};
use std::fmt;
use std::iter;
use std::path::Path;
use std::sync::Arc;

use anyhow::anyhow;
use itertools::Itertools;
use mz_adapter_types::dyncfgs::ENABLE_S3_TABLES_REGION_CHECK;
use mz_ccsr::{Client, GetByIdError, GetBySubjectError, Schema as CcsrSchema};
use mz_cloud_provider::CloudProvider;
use mz_controller_types::ClusterId;
use mz_kafka_util::client::MzClientContext;
use mz_mysql_util::MySqlTableDesc;
use mz_ore::collections::CollectionExt;
use mz_ore::error::ErrorExt;
use mz_ore::future::InTask;
use mz_ore::iter::IteratorExt;
use mz_ore::str::StrExt;
use mz_ore::{assert_none, soft_panic_or_log};
use mz_postgres_util::desc::PostgresTableDesc;
use mz_proto::RustType;
use mz_repr::{CatalogItemId, RelationDesc, RelationVersionSelector, Timestamp, strconv};
use mz_sql_parser::ast::display::AstDisplay;
use mz_sql_parser::ast::visit::{Visit, visit_function};
use mz_sql_parser::ast::visit_mut::{VisitMut, visit_expr_mut};
use mz_sql_parser::ast::{
    AlterSourceAction, AlterSourceAddSubsourceOptionName, AlterSourceStatement, AvroDocOn,
    ColumnName, CreateMaterializedViewStatement, CreateSinkConnection, CreateSinkOptionName,
    CreateSinkStatement, CreateSourceOptionName, CreateSubsourceOption, CreateSubsourceOptionName,
    CreateTableFromSourceStatement, CsrConfigOption, CsrConfigOptionName, CsrConnection,
    CsrSeedAvro, CsrSeedProtobuf, CsrSeedProtobufSchema, DeferredItemName, DocOnIdentifier,
    DocOnSchema, Expr, Function, FunctionArgs, Ident, KafkaSourceConfigOption,
    KafkaSourceConfigOptionName, LoadGenerator, LoadGeneratorOption, LoadGeneratorOptionName,
    MaterializedViewOption, MaterializedViewOptionName, MySqlConfigOption, MySqlConfigOptionName,
    PgConfigOption, PgConfigOptionName, RawItemName, ReaderSchemaSelectionStrategy,
    RefreshAtOptionValue, RefreshEveryOptionValue, RefreshOptionValue, SourceEnvelope,
    SqlServerConfigOption, SqlServerConfigOptionName, Statement, TableFromSourceColumns,
    TableFromSourceOption, TableFromSourceOptionName, UnresolvedItemName,
};
use mz_sql_server_util::desc::SqlServerTableDesc;
use mz_storage_types::configuration::StorageConfiguration;
use mz_storage_types::connections::Connection;
use mz_storage_types::connections::inline::IntoInlineConnection;
use mz_storage_types::errors::ContextCreationError;
use mz_storage_types::sources::load_generator::LoadGeneratorOutput;
use mz_storage_types::sources::mysql::MySqlSourceDetails;
use mz_storage_types::sources::postgres::PostgresSourcePublicationDetails;
use mz_storage_types::sources::{
    GenericSourceConnection, SourceDesc, SourceExportStatementDetails, SqlServerSourceExtras,
};
use prost::Message;
use protobuf_native::MessageLite;
use protobuf_native::compiler::{SourceTreeDescriptorDatabase, VirtualSourceTree};
use rdkafka::admin::AdminClient;
use references::{RetrievedSourceReferences, SourceReferenceClient};
use uuid::Uuid;

use crate::ast::{
    AlterSourceAddSubsourceOption, AvroSchema, CreateSourceConnection, CreateSourceStatement,
    CreateSubsourceStatement, CsrConnectionAvro, CsrConnectionProtobuf, ExternalReferenceExport,
    ExternalReferences, Format, FormatSpecifier, ProtobufSchema, Value, WithOptionValue,
};
use crate::catalog::{CatalogItemType, SessionCatalog};
use crate::kafka_util::{KafkaSinkConfigOptionExtracted, KafkaSourceConfigOptionExtracted};
use crate::names::{
    Aug, FullItemName, PartialItemName, ResolvedColumnReference, ResolvedDataType, ResolvedIds,
    ResolvedItemName,
};
use crate::plan::error::PlanError;
use crate::plan::statement::ddl::load_generator_ast_to_generator;
use crate::plan::{SourceReferences, StatementContext};
use crate::pure::error::{IcebergSinkPurificationError, SqlServerSourcePurificationError};
use crate::session::vars::ENABLE_SQL_SERVER_SOURCE;
use crate::{kafka_util, normalize};

use self::error::{
    CsrPurificationError, KafkaSinkPurificationError, KafkaSourcePurificationError,
    LoadGeneratorSourcePurificationError, MySqlSourcePurificationError, PgSourcePurificationError,
};

pub(crate) mod error;
mod references;

pub mod mysql;
pub mod postgres;
pub mod sql_server;

pub(crate) struct RequestedSourceExport<T> {
    external_reference: UnresolvedItemName,
    name: UnresolvedItemName,
    meta: T,
}

impl<T: fmt::Debug> fmt::Debug for RequestedSourceExport<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("RequestedSourceExport")
            .field("external_reference", &self.external_reference)
            .field("name", &self.name)
            .field("meta", &self.meta)
            .finish()
    }
}

impl<T> RequestedSourceExport<T> {
    fn change_meta<F>(self, new_meta: F) -> RequestedSourceExport<F> {
        RequestedSourceExport {
            external_reference: self.external_reference,
            name: self.name,
            meta: new_meta,
        }
    }
}

/// Generates a subsource name by prepending the source's schema name, if
/// present.
///
/// For example, if the source is `a.b`, then `a` is prepended to the
/// subsource name so that the subsource is generated in the same schema as
/// the source.
fn source_export_name_gen(
    source_name: &UnresolvedItemName,
    subsource_name: &str,
) -> Result<UnresolvedItemName, PlanError> {
    let mut partial = normalize::unresolved_item_name(source_name.clone())?;
    partial.item = subsource_name.to_string();
    Ok(UnresolvedItemName::from(partial))
}

// TODO(database-issues#8620): Remove once subsources are removed
/// Validates that the requested subsources have no name conflicts with each
/// other and that the same upstream table is not referenced multiple times.
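///
/// For example (illustrative), requesting `FOR TABLES (t AS x, s AS x)` is
/// rejected here for a name conflict, while `FOR TABLES (t AS x, t AS y)` is
/// rejected for referencing the same upstream table twice.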
fn validate_source_export_names<T>(
    requested_source_exports: &[RequestedSourceExport<T>],
) -> Result<(), PlanError> {
    // This condition would get caught during the catalog transaction, but produces a
    // vague, non-contextual error. Instead, error here so we can suggest to the user
    // how to fix the problem.
    if let Some(name) = requested_source_exports
        .iter()
        .map(|subsource| &subsource.name)
        .duplicates()
        .next()
        .cloned()
    {
        let mut upstream_references: Vec<_> = requested_source_exports
            .iter()
            .filter_map(|subsource| {
                if subsource.name == name {
                    Some(subsource.external_reference.clone())
                } else {
                    None
                }
            })
            .collect();

        upstream_references.sort();

        Err(PlanError::SubsourceNameConflict {
            name,
            upstream_references,
        })?;
    }

    // We disallow subsource statements from referencing the same upstream table, but we allow
    // `CREATE TABLE .. FROM SOURCE` statements to do so. Since `CREATE TABLE .. FROM SOURCE`
    // purification only provides a single `requested_source_export`, we can leave this here
    // without needing to differentiate between the two types of statements.
    // TODO(roshan): Remove this when auto-generated subsources are deprecated.
    if let Some(name) = requested_source_exports
        .iter()
        .map(|export| &export.external_reference)
        .duplicates()
        .next()
        .cloned()
    {
        let mut target_names: Vec<_> = requested_source_exports
            .iter()
            .filter_map(|export| {
                if export.external_reference == name {
                    Some(export.name.clone())
                } else {
                    None
                }
            })
            .collect();

        target_names.sort();

        Err(PlanError::SubsourceDuplicateReference { name, target_names })?;
    }

    Ok(())
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PurifiedStatement {
    PurifiedCreateSource {
        /// The progress subsource, if we are offloading progress info to a
        /// separate relation.
        create_progress_subsource_stmt: Option<CreateSubsourceStatement<Aug>>,
        create_source_stmt: CreateSourceStatement<Aug>,
        /// Map of subsource names to external details.
        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
        /// All the available upstream references that can be added as tables
        /// to this primary source.
        available_source_references: SourceReferences,
    },
    PurifiedAlterSource {
        alter_source_stmt: AlterSourceStatement<Aug>,
    },
    PurifiedAlterSourceAddSubsources {
        /// This just saves us an annoying catalog lookup.
        source_name: ResolvedItemName,
        /// Options that we will need the values of to update the source's
        /// definition.
        options: Vec<AlterSourceAddSubsourceOption<Aug>>,
        /// Map of subsource names to external details.
        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
    },
    PurifiedAlterSourceRefreshReferences {
        source_name: ResolvedItemName,
        /// The updated available upstream references for the primary source.
        available_source_references: SourceReferences,
    },
    PurifiedCreateSink(CreateSinkStatement<Aug>),
    PurifiedCreateTableFromSource {
        stmt: CreateTableFromSourceStatement<Aug>,
    },
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PurifiedSourceExport {
    pub external_reference: UnresolvedItemName,
    pub details: PurifiedExportDetails,
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PurifiedExportDetails {
    MySql {
        table: MySqlTableDesc,
        text_columns: Option<Vec<Ident>>,
        exclude_columns: Option<Vec<Ident>>,
        initial_gtid_set: String,
    },
    Postgres {
        table: PostgresTableDesc,
        text_columns: Option<Vec<Ident>>,
        exclude_columns: Option<Vec<Ident>>,
    },
    SqlServer {
        table: SqlServerTableDesc,
        text_columns: Option<Vec<Ident>>,
        excl_columns: Option<Vec<Ident>>,
        capture_instance: Arc<str>,
        initial_lsn: mz_sql_server_util::cdc::Lsn,
    },
    Kafka {},
    LoadGenerator {
        table: Option<RelationDesc>,
        output: LoadGeneratorOutput,
    },
}

/// Purifies a statement, removing any dependencies on external state.
///
/// See the section on [purification](crate#purification) in the crate
/// documentation for details.
///
/// Note that this doesn't handle `CREATE MATERIALIZED VIEW`, which is
/// handled by [purify_create_materialized_view_options] instead.
/// This could be made more consistent by a refactoring discussed here:
/// <https://github.com/MaterializeInc/materialize/pull/23870#discussion_r1435922709>
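///
/// Returns the purification result along with the ID of the cluster the
/// statement will run on, for statement types that name one (sources and
/// sinks).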
pub async fn purify_statement(
    catalog: impl SessionCatalog,
    now: u64,
    stmt: Statement<Aug>,
    storage_configuration: &StorageConfiguration,
) -> (Result<PurifiedStatement, PlanError>, Option<ClusterId>) {
    match stmt {
        Statement::CreateSource(stmt) => {
            let cluster_id = stmt.in_cluster.as_ref().map(|cluster| cluster.id.clone());
            (
                purify_create_source(catalog, now, stmt, storage_configuration).await,
                cluster_id,
            )
        }
        Statement::AlterSource(stmt) => (
            purify_alter_source(catalog, stmt, storage_configuration).await,
            None,
        ),
        Statement::CreateSink(stmt) => {
            let cluster_id = stmt.in_cluster.as_ref().map(|cluster| cluster.id.clone());
            (
                purify_create_sink(catalog, stmt, storage_configuration).await,
                cluster_id,
            )
        }
        Statement::CreateTableFromSource(stmt) => (
            purify_create_table_from_source(catalog, stmt, storage_configuration).await,
            None,
        ),
        o => unreachable!("{:?} does not need to be purified", o),
    }
}

/// Injects `DOC ON` comments into all Avro formats that are using a schema
/// registry by finding all SQL `COMMENT`s that are attached to the sink's
/// underlying materialized view (as well as any types referenced by that
/// underlying materialized view).
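///
/// For example (illustrative), `COMMENT ON MATERIALIZED VIEW v IS 'docs'`
/// surfaces in the sink's Avro schema as if the user had written a
/// `DOC ON TYPE v = 'docs'` option, unless the user already supplied their
/// own `DOC ON` option for that item or column.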
pub(crate) fn purify_create_sink_avro_doc_on_options(
    catalog: &dyn SessionCatalog,
    from_id: CatalogItemId,
    format: &mut Option<FormatSpecifier<Aug>>,
) -> Result<(), PlanError> {
    // Collect all objects referenced by the sink.
    let from = catalog.get_item(&from_id);
    let object_ids = from
        .references()
        .items()
        .copied()
        .chain_one(from.id())
        .collect::<Vec<_>>();

    // Collect all Avro formats that use a schema registry, as well as a set of
    // all identifiers named in user-provided `DOC ON` options.
    let mut avro_format_options = vec![];
    for_each_format(format, |doc_on_schema, fmt| match fmt {
        Format::Avro(AvroSchema::InlineSchema { .. })
        | Format::Bytes
        | Format::Csv { .. }
        | Format::Json { .. }
        | Format::Protobuf(..)
        | Format::Regex(..)
        | Format::Text => (),
        Format::Avro(AvroSchema::Csr {
            csr_connection: CsrConnectionAvro { connection, .. },
        }) => {
            avro_format_options.push((doc_on_schema, &mut connection.options));
        }
    });

    // For each Avro format in the sink, inject the appropriate `DOC ON` options
    // for each item referenced by the sink.
    for (for_schema, options) in avro_format_options {
        let user_provided_comments = options
            .iter()
            .filter_map(|CsrConfigOption { name, .. }| match name {
                CsrConfigOptionName::AvroDocOn(doc_on) => Some(doc_on.clone()),
                _ => None,
            })
            .collect::<BTreeSet<_>>();

        // Add existing comments, if not already provided by the user.
        for object_id in &object_ids {
            // Always add comments to the latest version of the item.
            let item = catalog
                .get_item(object_id)
                .at_version(RelationVersionSelector::Latest);
            let full_resolved_name = ResolvedItemName::Item {
                id: *object_id,
                qualifiers: item.name().qualifiers.clone(),
                full_name: catalog.resolve_full_name(item.name()),
                print_id: !matches!(
                    item.item_type(),
                    CatalogItemType::Func | CatalogItemType::Type
                ),
                version: RelationVersionSelector::Latest,
            };

            if let Some(comments_map) = catalog.get_item_comments(object_id) {
                // Attach comment for the item itself, if the user has not
                // already provided an overriding `DOC ON` option for the item.
                let doc_on_item_key = AvroDocOn {
                    identifier: DocOnIdentifier::Type(full_resolved_name.clone()),
                    for_schema,
                };
                if !user_provided_comments.contains(&doc_on_item_key) {
                    if let Some(root_comment) = comments_map.get(&None) {
                        options.push(CsrConfigOption {
                            name: CsrConfigOptionName::AvroDocOn(doc_on_item_key),
                            value: Some(mz_sql_parser::ast::WithOptionValue::Value(Value::String(
                                root_comment.clone(),
                            ))),
                        });
                    }
                }

                // Attach comment for each column in the item, if the user has
                // not already provided an overriding `DOC ON` option for the
                // column.
                let column_descs = match item.type_details() {
                    Some(details) => details.typ.desc(catalog).unwrap_or_default(),
                    None => item.relation_desc().map(|d| d.into_owned()),
                };

                if let Some(desc) = column_descs {
                    for (pos, column_name) in desc.iter_names().enumerate() {
                        // Column comment positions are 1-based, while `enumerate` is 0-based.
                        if let Some(comment_str) = comments_map.get(&Some(pos + 1)) {
                            let doc_on_column_key = AvroDocOn {
                                identifier: DocOnIdentifier::Column(ColumnName {
                                    relation: full_resolved_name.clone(),
                                    column: ResolvedColumnReference::Column {
                                        name: column_name.to_owned(),
                                        index: pos,
                                    },
                                }),
                                for_schema,
                            };
                            if !user_provided_comments.contains(&doc_on_column_key) {
                                options.push(CsrConfigOption {
                                    name: CsrConfigOptionName::AvroDocOn(doc_on_column_key),
                                    value: Some(mz_sql_parser::ast::WithOptionValue::Value(
                                        Value::String(comment_str.clone()),
                                    )),
                                });
                            }
                        }
                    }
                }
            }
        }
    }

    Ok(())
}

/// Checks that the sink described in the statement can connect to its external
/// resources.
async fn purify_create_sink(
    catalog: impl SessionCatalog,
    mut create_sink_stmt: CreateSinkStatement<Aug>,
    storage_configuration: &StorageConfiguration,
) -> Result<PurifiedStatement, PlanError> {
    // General purification
    let CreateSinkStatement {
        connection,
        format,
        with_options,
        name: _,
        in_cluster: _,
        if_not_exists: _,
        from,
        envelope: _,
        mode: _,
    } = &mut create_sink_stmt;

    // The list of options that the user is allowed to specify.
    const USER_ALLOWED_WITH_OPTIONS: &[CreateSinkOptionName] = &[
        CreateSinkOptionName::Snapshot,
        CreateSinkOptionName::CommitInterval,
    ];

    if let Some(op) = with_options
        .iter()
        .find(|op| !USER_ALLOWED_WITH_OPTIONS.contains(&op.name))
    {
        sql_bail!(
            "CREATE SINK...WITH ({}..) is not allowed",
            op.name.to_ast_string_simple(),
        )
    }

    match &connection {
        CreateSinkConnection::Kafka {
            connection,
            options,
            key: _,
            headers: _,
        } => {
            // We must not leave any state behind in the Kafka broker, so just ensure that
            // we can connect. This means we don't ensure that we can create the topic, which
            // introduces TOCTOU errors, but creating an inoperable sink is infinitely
            // preferable to leaking state in users' environments.
            let scx = StatementContext::new(None, &catalog);
            let connection = {
                let item = scx.get_item_by_resolved_name(connection)?;
                // Get Kafka connection
                match item.connection()? {
                    Connection::Kafka(connection) => {
                        connection.clone().into_inline_connection(scx.catalog)
                    }
                    _ => sql_bail!(
                        "{} is not a kafka connection",
                        scx.catalog.resolve_full_name(item.name())
                    ),
                }
            };

            let extracted_options: KafkaSinkConfigOptionExtracted = options.clone().try_into()?;

            if extracted_options.legacy_ids == Some(true) {
                sql_bail!("LEGACY IDs option is not supported");
            }

            let client: AdminClient<_> = connection
                .create_with_context(
                    storage_configuration,
                    MzClientContext::default(),
                    &BTreeMap::new(),
                    InTask::No,
                )
                .await
                .map_err(|e| {
                    // anyhow doesn't support Clone, so it's not trivial to move into PlanError
                    KafkaSinkPurificationError::AdminClientError(Arc::new(e))
                })?;

            let metadata = client
                .inner()
                .fetch_metadata(
                    None,
                    storage_configuration
                        .parameters
                        .kafka_timeout_config
                        .fetch_metadata_timeout,
                )
                .map_err(|e| {
                    KafkaSinkPurificationError::AdminClientError(Arc::new(
                        ContextCreationError::KafkaError(e),
                    ))
                })?;

            if metadata.brokers().is_empty() {
                Err(KafkaSinkPurificationError::ZeroBrokers)?;
            }
        }
        CreateSinkConnection::Iceberg {
            connection,
            aws_connection,
            ..
        } => {
            let scx = StatementContext::new(None, &catalog);
            let connection = {
                let item = scx.get_item_by_resolved_name(connection)?;
                // Get Iceberg connection
                match item.connection()? {
                    Connection::IcebergCatalog(connection) => {
                        connection.clone().into_inline_connection(scx.catalog)
                    }
                    _ => sql_bail!(
                        "{} is not an iceberg connection",
                        scx.catalog.resolve_full_name(item.name())
                    ),
                }
            };

            let aws_conn_id = aws_connection.item_id();

            let aws_connection = {
                let item = scx.get_item_by_resolved_name(aws_connection)?;
                // Get AWS connection
                match item.connection()? {
                    Connection::Aws(aws_connection) => aws_connection.clone(),
                    _ => sql_bail!(
                        "{} is not an aws connection",
                        scx.catalog.resolve_full_name(item.name())
                    ),
                }
            };

            // For S3 Tables connections in the Materialize Cloud product, verify the
            // AWS region matches the environment's region. This check only applies when
            // the enable_s3_tables_region_check dyncfg is set.
            if let Some(s3tables) = connection.s3tables_catalog() {
                let enable_region_check =
                    ENABLE_S3_TABLES_REGION_CHECK.get(scx.catalog.system_vars().dyncfgs());
                if enable_region_check {
                    let env_id = &catalog.config().environment_id;
                    if matches!(env_id.cloud_provider(), CloudProvider::Aws) {
                        let env_region = env_id.cloud_provider_region();
                        // Later on we default to "us-east-1" if the region is not set on the S3 Tables
                        // connection, so we need to do the same check here.
                        let s3_tables_region = s3tables
                            .aws_connection
                            .connection
                            .region
                            .clone()
                            .unwrap_or_else(|| "us-east-1".to_string());
                        if s3_tables_region != env_region {
                            Err(IcebergSinkPurificationError::S3TablesRegionMismatch {
                                s3_tables_region,
                                environment_region: env_region.to_string(),
                            })?;
                        }
                    }
                }
            }

            let _catalog = connection
                .connect(storage_configuration, InTask::No)
                .await
                .map_err(|e| IcebergSinkPurificationError::CatalogError(Arc::new(e)))?;

            let _sdk_config = aws_connection
                .load_sdk_config(
                    &storage_configuration.connection_context,
                    aws_conn_id.clone(),
                    InTask::No,
                )
                .await
                .map_err(|e| IcebergSinkPurificationError::AwsSdkContextError(Arc::new(e)))?;
        }
    }

    let mut csr_connection_ids = BTreeSet::new();
    for_each_format(format, |_, fmt| match fmt {
        Format::Avro(AvroSchema::InlineSchema { .. })
        | Format::Bytes
        | Format::Csv { .. }
        | Format::Json { .. }
        | Format::Protobuf(ProtobufSchema::InlineSchema { .. })
        | Format::Regex(..)
        | Format::Text => (),
        Format::Avro(AvroSchema::Csr {
            csr_connection: CsrConnectionAvro { connection, .. },
        })
        | Format::Protobuf(ProtobufSchema::Csr {
            csr_connection: CsrConnectionProtobuf { connection, .. },
        }) => {
            csr_connection_ids.insert(*connection.connection.item_id());
        }
    });

    let scx = StatementContext::new(None, &catalog);
    for csr_connection_id in csr_connection_ids {
        let connection = {
            let item = scx.get_item(&csr_connection_id);
            // Get CSR connection
            match item.connection()? {
                Connection::Csr(connection) => connection.clone().into_inline_connection(&catalog),
                _ => Err(CsrPurificationError::NotCsrConnection(
                    scx.catalog.resolve_full_name(item.name()),
                ))?,
            }
        };

        let client = connection
            .connect(storage_configuration, InTask::No)
            .await
            .map_err(|e| CsrPurificationError::ClientError(Arc::new(e)))?;

        client
            .list_subjects()
            .await
            .map_err(|e| CsrPurificationError::ListSubjectsError(Arc::new(e)))?;
    }

    purify_create_sink_avro_doc_on_options(&catalog, *from.item_id(), format)?;

    Ok(PurifiedStatement::PurifiedCreateSink(create_sink_stmt))
}

/// Runs a function on each format within the format specifier.
///
/// The function is provided with a `DocOnSchema` that indicates whether the
/// format is for the key, value, or both.
///
// TODO(benesch): rename `DocOnSchema` to the more general `FormatRestriction`.
fn for_each_format<'a, F>(format: &'a mut Option<FormatSpecifier<Aug>>, mut f: F)
where
    F: FnMut(DocOnSchema, &'a mut Format<Aug>),
{
    match format {
        None => (),
        Some(FormatSpecifier::Bare(fmt)) => f(DocOnSchema::All, fmt),
        Some(FormatSpecifier::KeyValue { key, value }) => {
            f(DocOnSchema::KeyOnly, key);
            f(DocOnSchema::ValueOnly, value);
        }
    }
}

/// Defines whether purification should enforce that at least one valid source
/// reference is provided on the statement.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub(crate) enum SourceReferencePolicy {
    /// Don't allow source references to be provided. This is used for
    /// enforcing that `CREATE SOURCE` statements don't create subsources.
    NotAllowed,
    /// Allow empty references, such as when creating a source that
    /// will have tables added afterwards.
    Optional,
    /// Require that at least one reference is resolved to an upstream
    /// object.
    Required,
}

async fn purify_create_source(
    catalog: impl SessionCatalog,
    now: u64,
    mut create_source_stmt: CreateSourceStatement<Aug>,
    storage_configuration: &StorageConfiguration,
) -> Result<PurifiedStatement, PlanError> {
    let CreateSourceStatement {
        name: source_name,
        col_names,
        key_constraint,
        connection: source_connection,
        format,
        envelope,
        include_metadata,
        external_references,
        progress_subsource,
        with_options,
        ..
    } = &mut create_source_stmt;

    let uses_old_syntax = !col_names.is_empty()
        || key_constraint.is_some()
        || format.is_some()
        || envelope.is_some()
        || !include_metadata.is_empty()
        || external_references.is_some()
        || progress_subsource.is_some();

    if let Some(DeferredItemName::Named(_)) = progress_subsource {
        sql_bail!("Cannot manually ID qualify progress subsource")
    }

    let mut requested_subsource_map = BTreeMap::new();

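    // Each source connection type has its own progress relation schema,
    // describing how far ingestion has gotten upstream (e.g. Kafka offsets or
    // a Postgres WAL LSN).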
    let progress_desc = match &source_connection {
        CreateSourceConnection::Kafka { .. } => {
            &mz_storage_types::sources::kafka::KAFKA_PROGRESS_DESC
        }
        CreateSourceConnection::Postgres { .. } => {
            &mz_storage_types::sources::postgres::PG_PROGRESS_DESC
        }
        CreateSourceConnection::SqlServer { .. } => {
            &mz_storage_types::sources::sql_server::SQL_SERVER_PROGRESS_DESC
        }
        CreateSourceConnection::MySql { .. } => {
            &mz_storage_types::sources::mysql::MYSQL_PROGRESS_DESC
        }
        CreateSourceConnection::LoadGenerator { .. } => {
            &mz_storage_types::sources::load_generator::LOAD_GEN_PROGRESS_DESC
        }
    };
    let scx = StatementContext::new(None, &catalog);

    // Depending on whether the user must or may use the `CREATE TABLE .. FROM SOURCE`
    // statement to add tables after this source is created, we might need to enforce
    // that auto-generated subsources are or are not created by this source statement.
    let reference_policy = if scx.catalog.system_vars().enable_create_table_from_source()
        && scx.catalog.system_vars().force_source_table_syntax()
    {
        SourceReferencePolicy::NotAllowed
    } else if scx.catalog.system_vars().enable_create_table_from_source() {
        SourceReferencePolicy::Optional
    } else {
        SourceReferencePolicy::Required
    };

    let mut format_options = SourceFormatOptions::Default;

    let retrieved_source_references: RetrievedSourceReferences;

    match source_connection {
        CreateSourceConnection::Kafka {
            connection,
            options: base_with_options,
            ..
        } => {
            if let Some(external_references) = external_references {
                Err(KafkaSourcePurificationError::ReferencedSubsources(
                    external_references.clone(),
                ))?;
            }

            let connection = {
                let item = scx.get_item_by_resolved_name(connection)?;
                // Get Kafka connection
                match item.connection()? {
                    Connection::Kafka(connection) => {
                        connection.clone().into_inline_connection(&catalog)
                    }
                    _ => Err(KafkaSourcePurificationError::NotKafkaConnection(
                        scx.catalog.resolve_full_name(item.name()),
                    ))?,
                }
            };

            let extracted_options: KafkaSourceConfigOptionExtracted =
                base_with_options.clone().try_into()?;

            let topic = extracted_options
                .topic
                .ok_or(KafkaSourcePurificationError::ConnectionMissingTopic)?;

            let consumer = connection
                .create_with_context(
                    storage_configuration,
                    MzClientContext::default(),
                    &BTreeMap::new(),
                    InTask::No,
                )
                .await
                .map_err(|e| {
                    // anyhow doesn't support Clone, so it's not trivial to move into PlanError
                    KafkaSourcePurificationError::KafkaConsumerError(
                        e.display_with_causes().to_string(),
                    )
                })?;
            let consumer = Arc::new(consumer);

            match (
                extracted_options.start_offset,
                extracted_options.start_timestamp,
            ) {
                (None, None) => {
                    // Validate that the topic at least exists.
                    kafka_util::ensure_topic_exists(
                        Arc::clone(&consumer),
                        &topic,
                        storage_configuration
                            .parameters
                            .kafka_timeout_config
                            .fetch_metadata_timeout,
                    )
                    .await?;
                }
                (Some(_), Some(_)) => {
                    sql_bail!("cannot specify START TIMESTAMP and START OFFSET at the same time")
                }
                (Some(start_offsets), None) => {
                    // Validate the start offsets.
                    kafka_util::validate_start_offsets(
                        Arc::clone(&consumer),
                        &topic,
                        start_offsets,
                        storage_configuration
                            .parameters
                            .kafka_timeout_config
                            .fetch_metadata_timeout,
                    )
                    .await?;
                }
                (None, Some(time_offset)) => {
                    // Translate `START TIMESTAMP` to a start offset.
                    let start_offsets = kafka_util::lookup_start_offsets(
                        Arc::clone(&consumer),
                        &topic,
                        time_offset,
                        now,
                        storage_configuration
                            .parameters
                            .kafka_timeout_config
                            .fetch_metadata_timeout,
                    )
                    .await?;

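                    // Rewrite the statement in place: drop the `START TIMESTAMP`
                    // option and record the resolved `START OFFSET`s instead, so the
                    // purified statement no longer depends on wall-clock time.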
                    base_with_options.retain(|val| {
                        !matches!(val.name, KafkaSourceConfigOptionName::StartTimestamp)
                    });
                    base_with_options.push(KafkaSourceConfigOption {
                        name: KafkaSourceConfigOptionName::StartOffset,
                        value: Some(WithOptionValue::Sequence(
                            start_offsets
                                .iter()
                                .map(|offset| {
                                    WithOptionValue::Value(Value::Number(offset.to_string()))
                                })
                                .collect(),
                        )),
                    });
                }
            }

            let reference_client = SourceReferenceClient::Kafka { topic: &topic };
            retrieved_source_references = reference_client.get_source_references().await?;

            format_options = SourceFormatOptions::Kafka { topic };
        }
        CreateSourceConnection::Postgres {
            connection,
            options,
        } => {
            let connection_item = scx.get_item_by_resolved_name(connection)?;
            let connection = match connection_item.connection().map_err(PlanError::from)? {
                Connection::Postgres(connection) => {
                    connection.clone().into_inline_connection(&catalog)
                }
                _ => Err(PgSourcePurificationError::NotPgConnection(
                    scx.catalog.resolve_full_name(connection_item.name()),
                ))?,
            };
            let crate::plan::statement::PgConfigOptionExtracted {
                publication,
                text_columns,
                exclude_columns,
                details,
                ..
            } = options.clone().try_into()?;
            let publication =
                publication.ok_or(PgSourcePurificationError::ConnectionMissingPublication)?;

            if details.is_some() {
                Err(PgSourcePurificationError::UserSpecifiedDetails)?;
            }

            let client = connection
                .validate(connection_item.id(), storage_configuration)
                .await?;

            let reference_client = SourceReferenceClient::Postgres {
                client: &client,
                publication: &publication,
                database: &connection.database,
            };
            retrieved_source_references = reference_client.get_source_references().await?;

            let postgres::PurifiedSourceExports {
                source_exports: subsources,
                normalized_text_columns,
            } = postgres::purify_source_exports(
                &client,
                &retrieved_source_references,
                external_references,
                text_columns,
                exclude_columns,
                source_name,
                &reference_policy,
            )
            .await?;

            if let Some(text_cols_option) = options
                .iter_mut()
                .find(|option| option.name == PgConfigOptionName::TextColumns)
            {
                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
            }

            requested_subsource_map.extend(subsources);

            // Record the active replication timeline_id to allow detection of a future upstream
            // point-in-time-recovery that will put the source into an error state.
            let timeline_id = mz_postgres_util::get_timeline_id(&client).await?;

            // Remove any old detail references
            options.retain(|PgConfigOption { name, .. }| name != &PgConfigOptionName::Details);
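            // The replication slot name must be unique per upstream server; use
            // a fresh UUID with the hyphens stripped, e.g. `materialize_<uuid>`.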
            let details = PostgresSourcePublicationDetails {
                slot: format!(
                    "materialize_{}",
                    Uuid::new_v4().to_string().replace('-', "")
                ),
                timeline_id: Some(timeline_id),
                database: connection.database,
            };
            options.push(PgConfigOption {
                name: PgConfigOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    details.into_proto().encode_to_vec(),
                )))),
            })
        }
        CreateSourceConnection::SqlServer {
            connection,
            options,
        } => {
            scx.require_feature_flag(&ENABLE_SQL_SERVER_SOURCE)?;

            // Connect to the upstream SQL Server instance so we can validate
            // that we're compatible with CDC.
            let connection_item = scx.get_item_by_resolved_name(connection)?;
            let connection = match connection_item.connection()? {
                Connection::SqlServer(connection) => {
                    connection.clone().into_inline_connection(&catalog)
                }
                _ => Err(SqlServerSourcePurificationError::NotSqlServerConnection(
                    scx.catalog.resolve_full_name(connection_item.name()),
                ))?,
            };
            let crate::plan::statement::ddl::SqlServerConfigOptionExtracted {
                details,
                text_columns,
                exclude_columns,
                seen: _,
            } = options.clone().try_into()?;

            if details.is_some() {
                Err(SqlServerSourcePurificationError::UserSpecifiedDetails)?;
            }

            let mut client = connection
                .validate(connection_item.id(), storage_configuration)
                .await?;

            let database: Arc<str> = connection.database.into();
            let reference_client = SourceReferenceClient::SqlServer {
                client: &mut client,
                database: Arc::clone(&database),
            };
            retrieved_source_references = reference_client.get_source_references().await?;
            tracing::debug!(?retrieved_source_references, "got source references");

            let timeout = mz_storage_types::sources::sql_server::MAX_LSN_WAIT
                .get(storage_configuration.config_set());

            let purified_source_exports = sql_server::purify_source_exports(
                &*database,
                &mut client,
                &retrieved_source_references,
                external_references,
                &text_columns,
                &exclude_columns,
                source_name,
                timeout,
                &reference_policy,
            )
            .await?;

            let sql_server::PurifiedSourceExports {
                source_exports,
                normalized_text_columns,
                normalized_excl_columns,
            } = purified_source_exports;

            // Update our set of requested source exports.
            requested_subsource_map.extend(source_exports);

            // Record the most recent restore_history_id, or none if the system
            // has never been restored.
            let restore_history_id =
                mz_sql_server_util::inspect::get_latest_restore_history_id(&mut client).await?;
            let details = SqlServerSourceExtras { restore_history_id };

            options.retain(|SqlServerConfigOption { name, .. }| {
                name != &SqlServerConfigOptionName::Details
            });
            options.push(SqlServerConfigOption {
                name: SqlServerConfigOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    details.into_proto().encode_to_vec(),
                )))),
            });

            // Update our 'TEXT' and 'EXCLUDE' column options with the purified and normalized set.
            if let Some(text_cols_option) = options
                .iter_mut()
                .find(|option| option.name == SqlServerConfigOptionName::TextColumns)
            {
                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
            }
            if let Some(excl_cols_option) = options
                .iter_mut()
                .find(|option| option.name == SqlServerConfigOptionName::ExcludeColumns)
            {
                excl_cols_option.value = Some(WithOptionValue::Sequence(normalized_excl_columns));
            }
        }
        CreateSourceConnection::MySql {
            connection,
            options,
        } => {
            let connection_item = scx.get_item_by_resolved_name(connection)?;
            let connection = match connection_item.connection()? {
                Connection::MySql(connection) => {
                    connection.clone().into_inline_connection(&catalog)
                }
                _ => Err(MySqlSourcePurificationError::NotMySqlConnection(
                    scx.catalog.resolve_full_name(connection_item.name()),
                ))?,
            };
            let crate::plan::statement::ddl::MySqlConfigOptionExtracted {
                details,
                text_columns,
                exclude_columns,
                seen: _,
            } = options.clone().try_into()?;

            if details.is_some() {
                Err(MySqlSourcePurificationError::UserSpecifiedDetails)?;
            }

            let mut conn = connection
                .validate(connection_item.id(), storage_configuration)
                .await
                .map_err(MySqlSourcePurificationError::InvalidConnection)?;

            // Retrieve the current @gtid_executed value of the server to mark as the effective
            // initial snapshot point, such that we can ensure consistency if the initial source
            // snapshot is broken up over multiple points in time.
            let initial_gtid_set =
                mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?;

            let reference_client = SourceReferenceClient::MySql {
                conn: &mut conn,
                include_system_schemas: mysql::references_system_schemas(external_references),
            };
            retrieved_source_references = reference_client.get_source_references().await?;

            let mysql::PurifiedSourceExports {
                source_exports: subsources,
                normalized_text_columns,
                normalized_exclude_columns,
            } = mysql::purify_source_exports(
                &mut conn,
                &retrieved_source_references,
                external_references,
                text_columns,
                exclude_columns,
                source_name,
                initial_gtid_set.clone(),
                &reference_policy,
            )
            .await?;
            requested_subsource_map.extend(subsources);

            // We don't have any fields in this details struct, but keep it around for
            // conformity with postgres and in case we end up needing it again in the future.
            let details = MySqlSourceDetails {};
            // Update options with the purified details
            options
                .retain(|MySqlConfigOption { name, .. }| name != &MySqlConfigOptionName::Details);
            options.push(MySqlConfigOption {
                name: MySqlConfigOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    details.into_proto().encode_to_vec(),
                )))),
            });

            if let Some(text_cols_option) = options
                .iter_mut()
                .find(|option| option.name == MySqlConfigOptionName::TextColumns)
            {
                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
            }
            if let Some(exclude_cols_option) = options
                .iter_mut()
                .find(|option| option.name == MySqlConfigOptionName::ExcludeColumns)
            {
                exclude_cols_option.value =
                    Some(WithOptionValue::Sequence(normalized_exclude_columns));
            }
        }
        CreateSourceConnection::LoadGenerator { generator, options } => {
            let load_generator =
                load_generator_ast_to_generator(&scx, generator, options, include_metadata)?;

            let reference_client = SourceReferenceClient::LoadGenerator {
                generator: &load_generator,
            };
            retrieved_source_references = reference_client.get_source_references().await?;
            // Determine whether this generator has non-default outputs; only those
            // need to be created as 'subsources'. The default output of a
            // single-output source does not.
            // TODO(database-issues#8620): Remove once subsources are removed
            let multi_output_sources =
                retrieved_source_references
                    .all_references()
                    .iter()
                    .any(|r| {
                        r.load_generator_output().expect("is loadgen")
                            != &LoadGeneratorOutput::Default
                    });

            match external_references {
                Some(requested)
                    if matches!(reference_policy, SourceReferencePolicy::NotAllowed) =>
                {
                    Err(PlanError::UseTablesForSources(requested.to_string()))?
                }
                Some(requested) if !multi_output_sources => match requested {
                    ExternalReferences::SubsetTables(_) => {
                        Err(LoadGeneratorSourcePurificationError::ForTables)?
                    }
                    ExternalReferences::SubsetSchemas(_) => {
                        Err(LoadGeneratorSourcePurificationError::ForSchemas)?
                    }
                    ExternalReferences::All => {
                        Err(LoadGeneratorSourcePurificationError::ForAllTables)?
                    }
                },
                Some(requested) => {
                    let requested_exports = retrieved_source_references
                        .requested_source_exports(Some(requested), source_name)?;
                    for export in requested_exports {
                        requested_subsource_map.insert(
                            export.name,
                            PurifiedSourceExport {
                                external_reference: export.external_reference,
                                details: PurifiedExportDetails::LoadGenerator {
                                    table: export
                                        .meta
                                        .load_generator_desc()
                                        .expect("is loadgen")
                                        .clone(),
                                    output: export
                                        .meta
                                        .load_generator_output()
                                        .expect("is loadgen")
                                        .clone(),
                                },
                            },
                        );
                    }
                }
                None => {
                    if multi_output_sources
                        && matches!(reference_policy, SourceReferencePolicy::Required)
                    {
                        Err(LoadGeneratorSourcePurificationError::MultiOutputRequiresForAllTables)?
                    }
                }
            }

            if let LoadGenerator::Clock = generator {
                if !options
                    .iter()
                    .any(|p| p.name == LoadGeneratorOptionName::AsOf)
                {
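                    // Default the clock's AS OF to the current time, recording
                    // an explicit start time in the purified statement.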
                    let now = catalog.now();
                    options.push(LoadGeneratorOption {
                        name: LoadGeneratorOptionName::AsOf,
                        value: Some(WithOptionValue::Value(Value::Number(now.to_string()))),
                    });
                }
            }
        }
    }

    // Now that we know which subsources to create alongside this statement,
    // remove the references so they are not canonicalized as part of the
    // `CREATE SOURCE` statement in the catalog.
    *external_references = None;

    // Generate a progress subsource for the old syntax.
    let create_progress_subsource_stmt = if uses_old_syntax {
        // Take the name from the input, or generate one.
        let name = match progress_subsource {
            Some(name) => match name {
                DeferredItemName::Deferred(name) => name.clone(),
                DeferredItemName::Named(_) => unreachable!("already checked for this value"),
            },
            None => {
                let (item, prefix) = source_name.0.split_last().unwrap();
                let item_name =
                    Ident::try_generate_name(item.to_string(), "_progress", |candidate| {
                        let mut suggested_name = prefix.to_vec();
                        suggested_name.push(candidate.clone());

                        let partial =
                            normalize::unresolved_item_name(UnresolvedItemName(suggested_name))?;
                        let qualified = scx.allocate_qualified_name(partial)?;
                        let item_exists = scx.catalog.get_item_by_name(&qualified).is_some();
                        let type_exists = scx.catalog.get_type_by_name(&qualified).is_some();
                        Ok::<_, PlanError>(!item_exists && !type_exists)
                    })?;

                let mut full_name = prefix.to_vec();
                full_name.push(item_name);
                let full_name = normalize::unresolved_item_name(UnresolvedItemName(full_name))?;
                let qualified_name = scx.allocate_qualified_name(full_name)?;
                let full_name = scx.catalog.resolve_full_name(&qualified_name);

                UnresolvedItemName::from(full_name.clone())
            }
        };
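        // e.g. a source named `db.schema.src` gets a progress subsource named
        // `db.schema.src_progress`, falling back to a variant of that name if
        // an item or type with that name already exists.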

        let (columns, constraints) = scx.relation_desc_into_table_defs(progress_desc)?;

        // Create the subsource statement
        let mut progress_with_options: Vec<_> = with_options
            .iter()
            .filter_map(|opt| match opt.name {
                CreateSourceOptionName::TimestampInterval => None,
                CreateSourceOptionName::RetainHistory => Some(CreateSubsourceOption {
                    name: CreateSubsourceOptionName::RetainHistory,
                    value: opt.value.clone(),
                }),
            })
            .collect();
        progress_with_options.push(CreateSubsourceOption {
            name: CreateSubsourceOptionName::Progress,
            value: Some(WithOptionValue::Value(Value::Boolean(true))),
        });

        Some(CreateSubsourceStatement {
            name,
            columns,
            // Progress subsources do not refer to the source to which they belong.
            // Instead, the primary source depends on the progress subsource (the
            // opposite of ingestion exports, which depend on the primary source).
            of_source: None,
            constraints,
            if_not_exists: false,
            with_options: progress_with_options,
        })
    } else {
        None
    };
1303
1304    purify_source_format(
1305        &catalog,
1306        format,
1307        &format_options,
1308        envelope,
1309        storage_configuration,
1310    )
1311    .await?;
1312
1313    Ok(PurifiedStatement::PurifiedCreateSource {
1314        create_progress_subsource_stmt,
1315        create_source_stmt,
1316        subsources: requested_subsource_map,
1317        available_source_references: retrieved_source_references.available_source_references(),
1318    })
1319}
1320
1321/// On success, returns the details for any new subsources and the updated
1322/// `options` that sequencing expects when handling `ALTER SOURCE` statements.
1323async fn purify_alter_source(
1324    catalog: impl SessionCatalog,
1325    stmt: AlterSourceStatement<Aug>,
1326    storage_configuration: &StorageConfiguration,
1327) -> Result<PurifiedStatement, PlanError> {
1328    let scx = StatementContext::new(None, &catalog);
1329    let AlterSourceStatement {
1330        source_name: unresolved_source_name,
1331        action,
1332        if_exists,
1333    } = stmt;
1334
1335    // Resolve the source item, respecting IF EXISTS semantics.
1336    let item = match scx.resolve_item(RawItemName::Name(unresolved_source_name.clone())) {
1337        Ok(item) => item,
1338        Err(_) if if_exists => {
1339            return Ok(PurifiedStatement::PurifiedAlterSource {
1340                alter_source_stmt: AlterSourceStatement {
1341                    source_name: unresolved_source_name,
1342                    action,
1343                    if_exists,
1344                },
1345            });
1346        }
1347        Err(e) => return Err(e),
1348    };
1349
1350    // Ensure it's an ingestion-based and alterable source.
1351    let desc = match item.source_desc()? {
1352        Some(desc) => desc.clone().into_inline_connection(scx.catalog),
1353        None => {
1354            sql_bail!("cannot ALTER this type of source")
1355        }
1356    };
1357
1358    let source_name = item.name();
1359
1360    let resolved_source_name = ResolvedItemName::Item {
1361        id: item.id(),
1362        qualifiers: item.name().qualifiers.clone(),
1363        full_name: scx.catalog.resolve_full_name(source_name),
1364        print_id: true,
1365        version: RelationVersionSelector::Latest,
1366    };
1367
1368    let partial_name = scx.catalog.minimal_qualification(source_name);
1369
1370    match action {
1371        AlterSourceAction::AddSubsources {
1372            external_references,
1373            options,
1374        } => {
1375            if scx.catalog.system_vars().enable_create_table_from_source()
1376                && scx.catalog.system_vars().force_source_table_syntax()
1377            {
1378                Err(PlanError::UseTablesForSources(
1379                    "ALTER SOURCE .. ADD SUBSOURCES ..".to_string(),
1380                ))?;
1381            }
1382
1383            purify_alter_source_add_subsources(
1384                external_references,
1385                options,
1386                desc,
1387                partial_name,
1388                unresolved_source_name,
1389                resolved_source_name,
1390                storage_configuration,
1391            )
1392            .await
1393        }
1394        AlterSourceAction::RefreshReferences => {
1395            purify_alter_source_refresh_references(
1396                desc,
1397                resolved_source_name,
1398                storage_configuration,
1399            )
1400            .await
1401        }
1402        _ => Ok(PurifiedStatement::PurifiedAlterSource {
1403            alter_source_stmt: AlterSourceStatement {
1404                source_name: unresolved_source_name,
1405                action,
1406                if_exists,
1407            },
1408        }),
1409    }
1410}
1411
1412// TODO(database-issues#8620): Remove once subsources are removed
1413/// Equivalent to `purify_create_source` but for `AlterSourceStatement`.
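///
/// A sketch of the statement shape this purifies, with illustrative names
/// (the referenced tables must exist upstream):
///
/// ```sql
/// ALTER SOURCE pg_src ADD SUBSOURCE schema1.table_a, schema1.table_b;
/// ```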
1414async fn purify_alter_source_add_subsources(
1415    external_references: Vec<ExternalReferenceExport>,
1416    mut options: Vec<AlterSourceAddSubsourceOption<Aug>>,
1417    desc: SourceDesc,
1418    partial_source_name: PartialItemName,
1419    unresolved_source_name: UnresolvedItemName,
1420    resolved_source_name: ResolvedItemName,
1421    storage_configuration: &StorageConfiguration,
1422) -> Result<PurifiedStatement, PlanError> {
1423    // Validate this is a source that can have subsources added.
1424    let connection_id = match &desc.connection {
1425        GenericSourceConnection::Postgres(c) => c.connection_id,
1426        GenericSourceConnection::MySql(c) => c.connection_id,
1427        GenericSourceConnection::SqlServer(c) => c.connection_id,
1428        _ => sql_bail!(
1429            "source {} does not support ALTER SOURCE.",
1430            partial_source_name
1431        ),
1432    };
1433
1434    let crate::plan::statement::ddl::AlterSourceAddSubsourceOptionExtracted {
1435        text_columns,
1436        exclude_columns,
1437        details,
1438        seen: _,
1439    } = options.clone().try_into()?;
1440    assert_none!(details, "details cannot be explicitly set");
1441
1442    let mut requested_subsource_map = BTreeMap::new();
1443
1444    match desc.connection {
1445        GenericSourceConnection::Postgres(pg_source_connection) => {
1446            // Get PostgresConnection for generating subsources.
1447            let pg_connection = &pg_source_connection.connection;
1448
1449            let client = pg_connection
1450                .validate(connection_id, storage_configuration)
1451                .await?;
1452
1453            let reference_client = SourceReferenceClient::Postgres {
1454                client: &client,
1455                publication: &pg_source_connection.publication,
1456                database: &pg_connection.database,
1457            };
1458            let retrieved_source_references = reference_client.get_source_references().await?;
1459
1460            let postgres::PurifiedSourceExports {
1461                source_exports: subsources,
1462                normalized_text_columns,
1463            } = postgres::purify_source_exports(
1464                &client,
1465                &retrieved_source_references,
1466                &Some(ExternalReferences::SubsetTables(external_references)),
1467                text_columns,
1468                exclude_columns,
1469                &unresolved_source_name,
1470                &SourceReferencePolicy::Required,
1471            )
1472            .await?;
1473
1474            if let Some(text_cols_option) = options
1475                .iter_mut()
1476                .find(|option| option.name == AlterSourceAddSubsourceOptionName::TextColumns)
1477            {
1478                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
1479            }
1480
1481            requested_subsource_map.extend(subsources);
1482        }
1483        GenericSourceConnection::MySql(mysql_source_connection) => {
1484            let mysql_connection = &mysql_source_connection.connection;
1485            let config = mysql_connection
1486                .config(
1487                    &storage_configuration.connection_context.secrets_reader,
1488                    storage_configuration,
1489                    InTask::No,
1490                )
1491                .await?;
1492
1493            let mut conn = config
1494                .connect(
1495                    "mysql purification",
1496                    &storage_configuration.connection_context.ssh_tunnel_manager,
1497                )
1498                .await?;
1499
1500            // Retrieve the server's current @@global.gtid_executed value to mark as the
1501            // effective initial snapshot point for these subsources.
1502            let initial_gtid_set =
1503                mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?;
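            // (For illustration, a GTID set looks like
            // `3E11FA47-71CA-11E1-9E33-C80AA9429562:1-5`.)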
1504
1505            let requested_references = Some(ExternalReferences::SubsetTables(external_references));
1506
1507            let reference_client = SourceReferenceClient::MySql {
1508                conn: &mut conn,
1509                include_system_schemas: mysql::references_system_schemas(&requested_references),
1510            };
1511            let retrieved_source_references = reference_client.get_source_references().await?;
1512
1513            let mysql::PurifiedSourceExports {
1514                source_exports: subsources,
1515                normalized_text_columns,
1516                normalized_exclude_columns,
1517            } = mysql::purify_source_exports(
1518                &mut conn,
1519                &retrieved_source_references,
1520                &requested_references,
1521                text_columns,
1522                exclude_columns,
1523                &unresolved_source_name,
1524                initial_gtid_set,
1525                &SourceReferencePolicy::Required,
1526            )
1527            .await?;
1528            requested_subsource_map.extend(subsources);
1529
1530            // Update options on the `ALTER SOURCE` statement with the purified details.
1531            if let Some(text_cols_option) = options
1532                .iter_mut()
1533                .find(|option| option.name == AlterSourceAddSubsourceOptionName::TextColumns)
1534            {
1535                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
1536            }
1537            if let Some(exclude_cols_option) = options
1538                .iter_mut()
1539                .find(|option| option.name == AlterSourceAddSubsourceOptionName::ExcludeColumns)
1540            {
1541                exclude_cols_option.value =
1542                    Some(WithOptionValue::Sequence(normalized_exclude_columns));
1543            }
1544        }
1545        GenericSourceConnection::SqlServer(sql_server_source) => {
1546            // Open a connection to the upstream SQL Server instance.
1547            let sql_server_connection = &sql_server_source.connection;
1548            let config = sql_server_connection
1549                .resolve_config(
1550                    &storage_configuration.connection_context.secrets_reader,
1551                    storage_configuration,
1552                    InTask::No,
1553                )
1554                .await?;
1555            let mut client = mz_sql_server_util::Client::connect(config).await?;
1556
1557            // Query the upstream SQL Server instance for available tables to replicate.
1558            let database = sql_server_connection.database.clone().into();
1559            let source_references = SourceReferenceClient::SqlServer {
1560                client: &mut client,
1561                database: Arc::clone(&database),
1562            }
1563            .get_source_references()
1564            .await?;
1565            let requested_references = Some(ExternalReferences::SubsetTables(external_references));
1566
1567            let timeout = mz_storage_types::sources::sql_server::MAX_LSN_WAIT
1568                .get(storage_configuration.config_set());
1569
1570            let result = sql_server::purify_source_exports(
1571                &*database,
1572                &mut client,
1573                &source_references,
1574                &requested_references,
1575                &text_columns,
1576                &exclude_columns,
1577                &unresolved_source_name,
1578                timeout,
1579                &SourceReferencePolicy::Required,
1580            )
1581            .await;
1582            let sql_server::PurifiedSourceExports {
1583                source_exports,
1584                normalized_text_columns,
1585                normalized_excl_columns,
1586            } = result?;
1587
1588            // Add the new exports to our subsource map.
1589            requested_subsource_map.extend(source_exports);
1590
1591            // Update options on the `ALTER SOURCE` statement with the purified details.
1592            if let Some(text_cols_option) = options
1593                .iter_mut()
1594                .find(|option| option.name == AlterSourceAddSubsourceOptionName::TextColumns)
1595            {
1596                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
1597            }
1598            if let Some(exclude_cols_option) = options
1599                .iter_mut()
1600                .find(|option| option.name == AlterSourceAddSubsourceOptionName::ExcludeColumns)
1601            {
1602                exclude_cols_option.value =
1603                    Some(WithOptionValue::Sequence(normalized_excl_columns));
1604            }
1605        }
1606        _ => unreachable!("connection type validated to be Postgres, MySQL, or SQL Server above"),
1607    };
1608
1609    Ok(PurifiedStatement::PurifiedAlterSourceAddSubsources {
1610        source_name: resolved_source_name,
1611        options,
1612        subsources: requested_subsource_map,
1613    })
1614}
1615
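/// Purifies `ALTER SOURCE .. REFRESH REFERENCES` by re-fetching the set of
/// available upstream references for the source's connection.
///
/// A sketch of the statement shape, with an illustrative name:
///
/// ```sql
/// ALTER SOURCE pg_src REFRESH REFERENCES;
/// ```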
1616async fn purify_alter_source_refresh_references(
1617    desc: SourceDesc,
1618    resolved_source_name: ResolvedItemName,
1619    storage_configuration: &StorageConfiguration,
1620) -> Result<PurifiedStatement, PlanError> {
1621    let retrieved_source_references = match desc.connection {
1622        GenericSourceConnection::Postgres(pg_source_connection) => {
1623            // Get PostgresConnection for generating subsources.
1624            let pg_connection = &pg_source_connection.connection;
1625
1626            let config = pg_connection
1627                .config(
1628                    &storage_configuration.connection_context.secrets_reader,
1629                    storage_configuration,
1630                    InTask::No,
1631                )
1632                .await?;
1633
1634            let client = config
1635                .connect(
1636                    "postgres_purification",
1637                    &storage_configuration.connection_context.ssh_tunnel_manager,
1638                )
1639                .await?;
1640            let reference_client = SourceReferenceClient::Postgres {
1641                client: &client,
1642                publication: &pg_source_connection.publication,
1643                database: &pg_connection.database,
1644            };
1645            reference_client.get_source_references().await?
1646        }
1647        GenericSourceConnection::MySql(mysql_source_connection) => {
1648            let mysql_connection = &mysql_source_connection.connection;
1649            let config = mysql_connection
1650                .config(
1651                    &storage_configuration.connection_context.secrets_reader,
1652                    storage_configuration,
1653                    InTask::No,
1654                )
1655                .await?;
1656
1657            let mut conn = config
1658                .connect(
1659                    "mysql purification",
1660                    &storage_configuration.connection_context.ssh_tunnel_manager,
1661                )
1662                .await?;
1663
1664            let reference_client = SourceReferenceClient::MySql {
1665                conn: &mut conn,
1666                include_system_schemas: false,
1667            };
1668            reference_client.get_source_references().await?
1669        }
1670        GenericSourceConnection::SqlServer(sql_server_source) => {
1671            // Open a connection to the upstream SQL Server instance.
1672            let sql_server_connection = &sql_server_source.connection;
1673            let config = sql_server_connection
1674                .resolve_config(
1675                    &storage_configuration.connection_context.secrets_reader,
1676                    storage_configuration,
1677                    InTask::No,
1678                )
1679                .await?;
1680            let mut client = mz_sql_server_util::Client::connect(config).await?;
1681
1682            // Query the upstream SQL Server instance for available tables to replicate.
1683            let source_references = SourceReferenceClient::SqlServer {
1684                client: &mut client,
1685                database: sql_server_connection.database.clone().into(),
1686            }
1687            .get_source_references()
1688            .await?;
1689            source_references
1690        }
1691        GenericSourceConnection::LoadGenerator(load_gen_connection) => {
1692            let reference_client = SourceReferenceClient::LoadGenerator {
1693                generator: &load_gen_connection.load_generator,
1694            };
1695            reference_client.get_source_references().await?
1696        }
1697        GenericSourceConnection::Kafka(kafka_conn) => {
1698            let reference_client = SourceReferenceClient::Kafka {
1699                topic: &kafka_conn.topic,
1700            };
1701            reference_client.get_source_references().await?
1702        }
1703    };
1704    Ok(PurifiedStatement::PurifiedAlterSourceRefreshReferences {
1705        source_name: resolved_source_name,
1706        available_source_references: retrieved_source_references.available_source_references(),
1707    })
1708}
1709
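/// Purifies a `CREATE TABLE .. FROM SOURCE` statement by resolving its
/// external reference and populating its columns, constraints, and options.
///
/// A sketch of the statement shape this handles, with illustrative names:
///
/// ```sql
/// CREATE TABLE t FROM SOURCE pg_src (REFERENCE schema1.table_a);
/// ```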
1710async fn purify_create_table_from_source(
1711    catalog: impl SessionCatalog,
1712    mut stmt: CreateTableFromSourceStatement<Aug>,
1713    storage_configuration: &StorageConfiguration,
1714) -> Result<PurifiedStatement, PlanError> {
1715    let scx = StatementContext::new(None, &catalog);
1716    let CreateTableFromSourceStatement {
1717        name: _,
1718        columns,
1719        constraints,
1720        source: source_name,
1721        if_not_exists: _,
1722        external_reference,
1723        format,
1724        envelope,
1725        include_metadata: _,
1726        with_options,
1727    } = &mut stmt;
1728
1729    // Columns and constraints cannot be specified by the user but will be populated below.
1730    if matches!(columns, TableFromSourceColumns::Defined(_)) {
1731        sql_bail!("CREATE TABLE .. FROM SOURCE column definitions cannot be specified directly");
1732    }
1733    if !constraints.is_empty() {
1734        sql_bail!(
1735            "CREATE TABLE .. FROM SOURCE constraint definitions cannot be specified directly"
1736        );
1737    }
1738
1739    // Get the source item.
1740    let item = scx.get_item_by_resolved_name(source_name)?;
1744
1745    // Ensure it's an ingestion-based source, the only kind `CREATE TABLE .. FROM SOURCE` supports.
1746    let desc = match item.source_desc()? {
1747        Some(desc) => desc.clone().into_inline_connection(scx.catalog),
1748        None => {
1749            sql_bail!("cannot create a table from this type of source")
1750        }
1751    };
1752    let unresolved_source_name: UnresolvedItemName = source_name.full_item_name().clone().into();
1753
1754    let crate::plan::statement::ddl::TableFromSourceOptionExtracted {
1755        text_columns,
1756        exclude_columns,
1757        retain_history: _,
1758        details,
1759        partition_by: _,
1760        seen: _,
1761    } = with_options.clone().try_into()?;
1762    assert_none!(details, "details cannot be explicitly set");
1763
1764    // Our text column values are unqualified (just column names), but the purification methods
1765    // below expect to match fully-qualified names against the full set of upstream tables, so we
1766    // first qualify them using the external reference.
1767    let qualified_text_columns = text_columns
1768        .iter()
1769        .map(|col| {
1770            UnresolvedItemName(
1771                external_reference
1772                    .as_ref()
1773                    .map(|er| er.0.iter().chain_one(col).cloned().collect())
1774                    .unwrap_or_else(|| vec![col.clone()]),
1775            )
1776        })
1777        .collect_vec();
1778    let qualified_exclude_columns = exclude_columns
1779        .iter()
1780        .map(|col| {
1781            UnresolvedItemName(
1782                external_reference
1783                    .as_ref()
1784                    .map(|er| er.0.iter().chain_one(col).cloned().collect())
1785                    .unwrap_or_else(|| vec![col.clone()]),
1786            )
1787        })
1788        .collect_vec();
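    // For example (illustrative names): with an external reference of
    // `schema1.table_a`, an unqualified text column `col1` becomes
    // `schema1.table_a.col1`; with no external reference it stays `col1`.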
1789
1790    // Overridden below if a source-specific format is required.
1791    let mut format_options = SourceFormatOptions::Default;
1792
1793    let retrieved_source_references: RetrievedSourceReferences;
1794
1795    let requested_references = external_reference.as_ref().map(|ref_name| {
1796        ExternalReferences::SubsetTables(vec![ExternalReferenceExport {
1797            reference: ref_name.clone(),
1798            alias: None,
1799        }])
1800    });
1801
1802    // Run purification work specific to each source type: resolve the external reference to
1803    // a fully qualified name and obtain the appropriate details for the source-export statement
1804    let purified_export = match desc.connection {
1805        GenericSourceConnection::Postgres(pg_source_connection) => {
1806            // Get PostgresConnection for generating subsources.
1807            let pg_connection = &pg_source_connection.connection;
1808
1809            let client = pg_connection
1810                .validate(pg_source_connection.connection_id, storage_configuration)
1811                .await?;
1812
1813            let reference_client = SourceReferenceClient::Postgres {
1814                client: &client,
1815                publication: &pg_source_connection.publication,
1816                database: &pg_connection.database,
1817            };
1818            retrieved_source_references = reference_client.get_source_references().await?;
1819
1820            let postgres::PurifiedSourceExports {
1821                source_exports,
1822                // TODO(database-issues#8620): Remove once subsources are removed
1823                // This `normalized_text_columns` is not relevant for us and is only returned for
1824                // `CREATE SOURCE` statements that automatically generate subsources
1825                normalized_text_columns: _,
1826            } = postgres::purify_source_exports(
1827                &client,
1828                &retrieved_source_references,
1829                &requested_references,
1830                qualified_text_columns,
1831                qualified_exclude_columns,
1832                &unresolved_source_name,
1833                &SourceReferencePolicy::Required,
1834            )
1835            .await?;
1836            // There should be exactly one source_export returned for this statement
1837            let (_, purified_export) = source_exports.into_element();
1838            purified_export
1839        }
1840        GenericSourceConnection::MySql(mysql_source_connection) => {
1841            let mysql_connection = &mysql_source_connection.connection;
1842            let config = mysql_connection
1843                .config(
1844                    &storage_configuration.connection_context.secrets_reader,
1845                    storage_configuration,
1846                    InTask::No,
1847                )
1848                .await?;
1849
1850            let mut conn = config
1851                .connect(
1852                    "mysql purification",
1853                    &storage_configuration.connection_context.ssh_tunnel_manager,
1854                )
1855                .await?;
1856
1857            // Retrieve the server's current @@global.gtid_executed value to mark as the
1858            // effective initial snapshot point for this table.
1859            let initial_gtid_set =
1860                mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?;
1861
1862            let reference_client = SourceReferenceClient::MySql {
1863                conn: &mut conn,
1864                include_system_schemas: mysql::references_system_schemas(&requested_references),
1865            };
1866            retrieved_source_references = reference_client.get_source_references().await?;
1867
1868            let mysql::PurifiedSourceExports {
1869                source_exports,
1870                // TODO(database-issues#8620): Remove once subsources are removed
1871                // `normalized_text/exclude_columns` is not relevant for us and is only returned for
1872                // `CREATE SOURCE` statements that automatically generate subsources
1873                normalized_text_columns: _,
1874                normalized_exclude_columns: _,
1875            } = mysql::purify_source_exports(
1876                &mut conn,
1877                &retrieved_source_references,
1878                &requested_references,
1879                qualified_text_columns,
1880                qualified_exclude_columns,
1881                &unresolved_source_name,
1882                initial_gtid_set,
1883                &SourceReferencePolicy::Required,
1884            )
1885            .await?;
1886            // There should be exactly one source_export returned for this statement
1887            let (_, purified_export) = source_exports.into_element();
1888            purified_export
1889        }
1890        GenericSourceConnection::SqlServer(sql_server_source) => {
1891            let connection = sql_server_source.connection;
1892            let config = connection
1893                .resolve_config(
1894                    &storage_configuration.connection_context.secrets_reader,
1895                    storage_configuration,
1896                    InTask::No,
1897                )
1898                .await?;
1899            let mut client = mz_sql_server_util::Client::connect(config).await?;
1900
1901            let database: Arc<str> = connection.database.into();
1902            let reference_client = SourceReferenceClient::SqlServer {
1903                client: &mut client,
1904                database: Arc::clone(&database),
1905            };
1906            retrieved_source_references = reference_client.get_source_references().await?;
1907            tracing::debug!(?retrieved_source_references, "got source references");
1908
1909            let timeout = mz_storage_types::sources::sql_server::MAX_LSN_WAIT
1910                .get(storage_configuration.config_set());
1911
1912            let purified_source_exports = sql_server::purify_source_exports(
1913                &*database,
1914                &mut client,
1915                &retrieved_source_references,
1916                &requested_references,
1917                &qualified_text_columns,
1918                &qualified_exclude_columns,
1919                &unresolved_source_name,
1920                timeout,
1921                &SourceReferencePolicy::Required,
1922            )
1923            .await?;
1924
1925            // There should be exactly one source_export returned for this statement
1926            let (_, purified_export) = purified_source_exports.source_exports.into_element();
1927            purified_export
1928        }
1929        GenericSourceConnection::LoadGenerator(load_gen_connection) => {
1930            let reference_client = SourceReferenceClient::LoadGenerator {
1931                generator: &load_gen_connection.load_generator,
1932            };
1933            retrieved_source_references = reference_client.get_source_references().await?;
1934
1935            let requested_exports = retrieved_source_references
1936                .requested_source_exports(requested_references.as_ref(), &unresolved_source_name)?;
1937            // There should be exactly one source_export returned
1938            let export = requested_exports.into_element();
1939            PurifiedSourceExport {
1940                external_reference: export.external_reference,
1941                details: PurifiedExportDetails::LoadGenerator {
1942                    table: export
1943                        .meta
1944                        .load_generator_desc()
1945                        .expect("is loadgen")
1946                        .clone(),
1947                    output: export
1948                        .meta
1949                        .load_generator_output()
1950                        .expect("is loadgen")
1951                        .clone(),
1952                },
1953            }
1954        }
1955        GenericSourceConnection::Kafka(kafka_conn) => {
1956            let reference_client = SourceReferenceClient::Kafka {
1957                topic: &kafka_conn.topic,
1958            };
1959            retrieved_source_references = reference_client.get_source_references().await?;
1960            let requested_exports = retrieved_source_references
1961                .requested_source_exports(requested_references.as_ref(), &unresolved_source_name)?;
1962            // There should be exactly one source_export returned
1963            let export = requested_exports.into_element();
1964
1965            format_options = SourceFormatOptions::Kafka {
1966                topic: kafka_conn.topic.clone(),
1967            };
1968            PurifiedSourceExport {
1969                external_reference: export.external_reference,
1970                details: PurifiedExportDetails::Kafka {},
1971            }
1972        }
1973    };
1974
1975    purify_source_format(
1976        &catalog,
1977        format,
1978        &format_options,
1979        envelope,
1980        storage_configuration,
1981    )
1982    .await?;
1983
1984    // Update the external reference in the statement to the resolved fully-qualified
1985    // external reference
1986    *external_reference = Some(purified_export.external_reference.clone());
1987
1988    // Update options in the statement using the purified export details
1989    match &purified_export.details {
1990        PurifiedExportDetails::Postgres { .. } => {
1991            let mut unsupported_cols = vec![];
1992            let postgres::PostgresExportStatementValues {
1993                columns: gen_columns,
1994                constraints: gen_constraints,
1995                text_columns: gen_text_columns,
1996                exclude_columns: gen_exclude_columns,
1997                details: gen_details,
1998                external_reference: _,
1999            } = postgres::generate_source_export_statement_values(
2000                &scx,
2001                purified_export,
2002                &mut unsupported_cols,
2003            )?;
2004            if !unsupported_cols.is_empty() {
2005                unsupported_cols.sort();
2006                Err(PgSourcePurificationError::UnrecognizedTypes {
2007                    cols: unsupported_cols,
2008                })?;
2009            }
2010
2011            if let Some(text_cols_option) = with_options
2012                .iter_mut()
2013                .find(|option| option.name == TableFromSourceOptionName::TextColumns)
2014            {
2015                match gen_text_columns {
2016                    Some(gen_text_columns) => {
2017                        text_cols_option.value = Some(WithOptionValue::Sequence(gen_text_columns))
2018                    }
2019                    None => soft_panic_or_log!(
2020                        "text_columns should be Some if text_cols_option is present"
2021                    ),
2022                }
2023            }
2024            if let Some(exclude_cols_option) = with_options
2025                .iter_mut()
2026                .find(|option| option.name == TableFromSourceOptionName::ExcludeColumns)
2027            {
2028                match gen_exclude_columns {
2029                    Some(gen_exclude_columns) => {
2030                        exclude_cols_option.value =
2031                            Some(WithOptionValue::Sequence(gen_exclude_columns))
2032                    }
2033                    None => soft_panic_or_log!(
2034                        "exclude_columns should be Some if exclude_cols_option is present"
2035                    ),
2036                }
2037            }
2038            match columns {
2039                TableFromSourceColumns::Defined(_) => unreachable!(),
2040                TableFromSourceColumns::NotSpecified => {
2041                    *columns = TableFromSourceColumns::Defined(gen_columns);
2042                    *constraints = gen_constraints;
2043                }
2044                TableFromSourceColumns::Named(_) => {
2045                    sql_bail!("columns cannot be named for Postgres sources")
2046                }
2047            }
2048            with_options.push(TableFromSourceOption {
2049                name: TableFromSourceOptionName::Details,
2050                value: Some(WithOptionValue::Value(Value::String(hex::encode(
2051                    gen_details.into_proto().encode_to_vec(),
2052                )))),
2053            })
2054        }
2055        PurifiedExportDetails::MySql { .. } => {
2056            let mysql::MySqlExportStatementValues {
2057                columns: gen_columns,
2058                constraints: gen_constraints,
2059                text_columns: gen_text_columns,
2060                exclude_columns: gen_exclude_columns,
2061                details: gen_details,
2062                external_reference: _,
2063            } = mysql::generate_source_export_statement_values(&scx, purified_export)?;
2064
2065            if let Some(text_cols_option) = with_options
2066                .iter_mut()
2067                .find(|option| option.name == TableFromSourceOptionName::TextColumns)
2068            {
2069                match gen_text_columns {
2070                    Some(gen_text_columns) => {
2071                        text_cols_option.value = Some(WithOptionValue::Sequence(gen_text_columns))
2072                    }
2073                    None => soft_panic_or_log!(
2074                        "text_columns should be Some if text_cols_option is present"
2075                    ),
2076                }
2077            }
2078            if let Some(exclude_cols_option) = with_options
2079                .iter_mut()
2080                .find(|option| option.name == TableFromSourceOptionName::ExcludeColumns)
2081            {
2082                match gen_exclude_columns {
2083                    Some(gen_exclude_columns) => {
2084                        exclude_cols_option.value =
2085                            Some(WithOptionValue::Sequence(gen_exclude_columns))
2086                    }
2087                    None => soft_panic_or_log!(
2088                        "exclude_columns should be Some if exclude_cols_option is present"
2089                    ),
2090                }
2091            }
2092            match columns {
2093                TableFromSourceColumns::Defined(_) => unreachable!(),
2094                TableFromSourceColumns::NotSpecified => {
2095                    *columns = TableFromSourceColumns::Defined(gen_columns);
2096                    *constraints = gen_constraints;
2097                }
2098                TableFromSourceColumns::Named(_) => {
2099                    sql_bail!("columns cannot be named for MySQL sources")
2100                }
2101            }
2102            with_options.push(TableFromSourceOption {
2103                name: TableFromSourceOptionName::Details,
2104                value: Some(WithOptionValue::Value(Value::String(hex::encode(
2105                    gen_details.into_proto().encode_to_vec(),
2106                )))),
2107            })
2108        }
2109        PurifiedExportDetails::SqlServer { .. } => {
2110            let sql_server::SqlServerExportStatementValues {
2111                columns: gen_columns,
2112                constraints: gen_constraints,
2113                text_columns: gen_text_columns,
2114                excl_columns: gen_excl_columns,
2115                details: gen_details,
2116                external_reference: _,
2117            } = sql_server::generate_source_export_statement_values(&scx, purified_export)?;
2118
2119            if let Some(text_cols_option) = with_options
2120                .iter_mut()
2121                .find(|opt| opt.name == TableFromSourceOptionName::TextColumns)
2122            {
2123                match gen_text_columns {
2124                    Some(gen_text_columns) => {
2125                        text_cols_option.value = Some(WithOptionValue::Sequence(gen_text_columns))
2126                    }
2127                    None => soft_panic_or_log!(
2128                        "text_columns should be Some if text_cols_option is present"
2129                    ),
2130                }
2131            }
2132            if let Some(exclude_cols_option) = with_options
2133                .iter_mut()
2134                .find(|opt| opt.name == TableFromSourceOptionName::ExcludeColumns)
2135            {
2136                match gen_excl_columns {
2137                    Some(gen_excl_columns) => {
2138                        exclude_cols_option.value =
2139                            Some(WithOptionValue::Sequence(gen_excl_columns))
2140                    }
2141                    None => soft_panic_or_log!(
2142                        "excl_columns should be Some if excl_cols_option is present"
2143                    ),
2144                }
2145            }
2146
2147            match columns {
2148                TableFromSourceColumns::NotSpecified => {
2149                    *columns = TableFromSourceColumns::Defined(gen_columns);
2150                    *constraints = gen_constraints;
2151                }
2152                TableFromSourceColumns::Named(_) => {
2153                    sql_bail!("columns cannot be named for SQL Server sources")
2154                }
2155                TableFromSourceColumns::Defined(_) => unreachable!(),
2156            }
2157
2158            with_options.push(TableFromSourceOption {
2159                name: TableFromSourceOptionName::Details,
2160                value: Some(WithOptionValue::Value(Value::String(hex::encode(
2161                    gen_details.into_proto().encode_to_vec(),
2162                )))),
2163            })
2164        }
2165        PurifiedExportDetails::LoadGenerator { .. } => {
2166            let (desc, output) = match purified_export.details {
2167                PurifiedExportDetails::LoadGenerator { table, output } => (table, output),
2168                _ => unreachable!("purified export details must be load generator"),
2169            };
2170            // We only determine the table description here for multi-output load generator
2171            // sources; single-output load generators have their relation description
2172            // determined during statement planning, as envelope and format options may
2173            // affect their schema.
2174            if let Some(desc) = desc {
2175                let (gen_columns, gen_constraints) = scx.relation_desc_into_table_defs(&desc)?;
2176                match columns {
2177                    TableFromSourceColumns::Defined(_) => unreachable!(),
2178                    TableFromSourceColumns::NotSpecified => {
2179                        *columns = TableFromSourceColumns::Defined(gen_columns);
2180                        *constraints = gen_constraints;
2181                    }
2182                    TableFromSourceColumns::Named(_) => {
2183                        sql_bail!("columns cannot be named for multi-output load generator sources")
2184                    }
2185                }
2186            }
2187            let details = SourceExportStatementDetails::LoadGenerator { output };
2188            with_options.push(TableFromSourceOption {
2189                name: TableFromSourceOptionName::Details,
2190                value: Some(WithOptionValue::Value(Value::String(hex::encode(
2191                    details.into_proto().encode_to_vec(),
2192                )))),
2193            })
2194        }
2195        PurifiedExportDetails::Kafka {} => {
2196            // NOTE: Kafka tables have their 'schemas' purified into the statement inside the
2197            // format field, so we don't specify any columns or constraints to be stored
2198            // on the statement here. The RelationDesc will be determined during planning.
2199            let details = SourceExportStatementDetails::Kafka {};
2200            with_options.push(TableFromSourceOption {
2201                name: TableFromSourceOptionName::Details,
2202                value: Some(WithOptionValue::Value(Value::String(hex::encode(
2203                    details.into_proto().encode_to_vec(),
2204                )))),
2205            })
2206        }
2207    };
2208
2209    // TODO: We might as well use the retrieved available references to update the source
2210    // available references table in the catalog, so plumb this through.
2211    // available_source_references: retrieved_source_references.available_source_references(),
2212    Ok(PurifiedStatement::PurifiedCreateTableFromSource { stmt })
2213}
2214
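/// Source-specific context needed when purifying a format specifier; Kafka
/// sources carry the topic from which Schema Registry subject names are
/// derived.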
2215enum SourceFormatOptions {
2216    Default,
2217    Kafka { topic: String },
2218}
2219
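/// Purifies the format specifier of a source or table statement, resolving
/// Confluent Schema Registry schemas where needed.
///
/// A sketch of a format specifier this resolves, with an illustrative
/// connection name:
///
/// ```sql
/// FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
/// ```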
2220async fn purify_source_format(
2221    catalog: &dyn SessionCatalog,
2222    format: &mut Option<FormatSpecifier<Aug>>,
2223    options: &SourceFormatOptions,
2224    envelope: &Option<SourceEnvelope>,
2225    storage_configuration: &StorageConfiguration,
2226) -> Result<(), PlanError> {
2227    if matches!(format, Some(FormatSpecifier::KeyValue { .. }))
2228        && !matches!(options, SourceFormatOptions::Kafka { .. })
2229    {
2230        sql_bail!("Kafka sources are the only source type that can provide KEY/VALUE formats")
2231    }
2232
2233    match format.as_mut() {
2234        None => {}
2235        Some(FormatSpecifier::Bare(format)) => {
2236            purify_source_format_single(catalog, format, options, envelope, storage_configuration)
2237                .await?;
2238        }
2239
2240        Some(FormatSpecifier::KeyValue { key, value: val }) => {
2241            purify_source_format_single(catalog, key, options, envelope, storage_configuration)
2242                .await?;
2243            purify_source_format_single(catalog, val, options, envelope, storage_configuration)
2244                .await?;
2245        }
2246    }
2247    Ok(())
2248}
2249
2250async fn purify_source_format_single(
2251    catalog: &dyn SessionCatalog,
2252    format: &mut Format<Aug>,
2253    options: &SourceFormatOptions,
2254    envelope: &Option<SourceEnvelope>,
2255    storage_configuration: &StorageConfiguration,
2256) -> Result<(), PlanError> {
2257    match format {
2258        Format::Avro(schema) => match schema {
2259            AvroSchema::Csr { csr_connection } => {
2260                purify_csr_connection_avro(
2261                    catalog,
2262                    options,
2263                    csr_connection,
2264                    envelope,
2265                    storage_configuration,
2266                )
2267                .await?
2268            }
2269            AvroSchema::InlineSchema { .. } => {}
2270        },
2271        Format::Protobuf(schema) => match schema {
2272            ProtobufSchema::Csr { csr_connection } => {
2273                purify_csr_connection_proto(
2274                    catalog,
2275                    options,
2276                    csr_connection,
2277                    envelope,
2278                    storage_configuration,
2279                )
2280                .await?;
2281            }
2282            ProtobufSchema::InlineSchema { .. } => {}
2283        },
2284        Format::Bytes
2285        | Format::Regex(_)
2286        | Format::Json { .. }
2287        | Format::Text
2288        | Format::Csv { .. } => (),
2289    }
2290    Ok(())
2291}
2292
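/// Generates `CREATE SUBSOURCE` statements for the given purified source
/// exports.
///
/// A sketch of a generated statement, with illustrative names and types and
/// an elided `DETAILS` value:
///
/// ```sql
/// CREATE SUBSOURCE table_a (col1 int4, col2 text)
///     OF SOURCE pg_src
///     WITH (EXTERNAL REFERENCE = postgres.schema1.table_a, DETAILS = '...');
/// ```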
2293pub fn generate_subsource_statements(
2294    scx: &StatementContext,
2295    source_name: ResolvedItemName,
2296    subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
2297) -> Result<Vec<CreateSubsourceStatement<Aug>>, PlanError> {
2298    // Inspect the first subsource to determine the connection type.
2299    if subsources.is_empty() {
2300        return Ok(vec![]);
2301    }
2302    let (_, purified_export) = subsources.iter().next().expect("non-empty checked above");
2303
2304    let statements = match &purified_export.details {
2305        PurifiedExportDetails::Postgres { .. } => {
2306            crate::pure::postgres::generate_create_subsource_statements(
2307                scx,
2308                source_name,
2309                subsources,
2310            )?
2311        }
2312        PurifiedExportDetails::MySql { .. } => {
2313            crate::pure::mysql::generate_create_subsource_statements(scx, source_name, subsources)?
2314        }
2315        PurifiedExportDetails::SqlServer { .. } => {
2316            crate::pure::sql_server::generate_create_subsource_statements(
2317                scx,
2318                source_name,
2319                subsources,
2320            )?
2321        }
2322        PurifiedExportDetails::LoadGenerator { .. } => {
2323            let mut subsource_stmts = Vec::with_capacity(subsources.len());
2324            for (subsource_name, purified_export) in subsources {
2325                let (desc, output) = match purified_export.details {
2326                    PurifiedExportDetails::LoadGenerator { table, output } => (table, output),
2327                    _ => unreachable!("purified export details must be load generator"),
2328                };
2329                let desc =
2330                    desc.expect("subsources cannot be generated for single-output load generators");
2331
2332                let (columns, table_constraints) = scx.relation_desc_into_table_defs(&desc)?;
2333                let details = SourceExportStatementDetails::LoadGenerator { output };
2334                // Create the subsource statement
2335                let subsource = CreateSubsourceStatement {
2336                    name: subsource_name,
2337                    columns,
2338                    of_source: Some(source_name.clone()),
2339                    // Unlike sources that come from an external upstream, we
2340                    // have more leniency to introduce different constraints
2341                    // every time the load generator is run; i.e., we are not as
2342                    // worried about introducing junk data.
2343                    constraints: table_constraints,
2344                    if_not_exists: false,
2345                    with_options: vec![
2346                        CreateSubsourceOption {
2347                            name: CreateSubsourceOptionName::ExternalReference,
2348                            value: Some(WithOptionValue::UnresolvedItemName(
2349                                purified_export.external_reference,
2350                            )),
2351                        },
2352                        CreateSubsourceOption {
2353                            name: CreateSubsourceOptionName::Details,
2354                            value: Some(WithOptionValue::Value(Value::String(hex::encode(
2355                                details.into_proto().encode_to_vec(),
2356                            )))),
2357                        },
2358                    ],
2359                };
2360                subsource_stmts.push(subsource);
2361            }
2362
2363            subsource_stmts
2364        }
2365        PurifiedExportDetails::Kafka { .. } => {
2366            // TODO: as part of database-issues#8322, Kafka sources will begin
2367            // producing data; we'll need to understand the schema
2368            // of the output here.
2369            assert!(
2370                subsources.is_empty(),
2371                "Kafka sources do not produce data-bearing subsources"
2372            );
2373            vec![]
2374        }
2375    };
2376    Ok(statements)
2377}
2378
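/// Purifies a Protobuf Confluent Schema Registry connection by fetching and
/// compiling the key and value schemas for the topic into a `SEED`, unless a
/// seed is already present. Subjects follow the topic name strategy: for a
/// topic `t`, the value schema lives under `t-value` and the (optional) key
/// schema under `t-key`.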
2379async fn purify_csr_connection_proto(
2380    catalog: &dyn SessionCatalog,
2381    options: &SourceFormatOptions,
2382    csr_connection: &mut CsrConnectionProtobuf<Aug>,
2383    envelope: &Option<SourceEnvelope>,
2384    storage_configuration: &StorageConfiguration,
2385) -> Result<(), PlanError> {
2386    let SourceFormatOptions::Kafka { topic } = options else {
2387        sql_bail!("Confluent Schema Registry is only supported with Kafka sources")
2388    };
2389
2390    let CsrConnectionProtobuf {
2391        seed,
2392        connection: CsrConnection {
2393            connection,
2394            options: _,
2395        },
2396    } = csr_connection;
2397    match seed {
2398        None => {
2399            let scx = StatementContext::new(None, &*catalog);
2400
2401            let ccsr_connection = match scx.get_item_by_resolved_name(connection)?.connection()? {
2402                Connection::Csr(connection) => connection.clone().into_inline_connection(catalog),
2403                _ => sql_bail!("{} is not a schema registry connection", connection),
2404            };
2405
2406            let ccsr_client = ccsr_connection
2407                .connect(storage_configuration, InTask::No)
2408                .await
2409                .map_err(|e| CsrPurificationError::ClientError(Arc::new(e)))?;
2410
2411            let value = compile_proto(&format!("{}-value", topic), &ccsr_client).await?;
2412            let key = compile_proto(&format!("{}-key", topic), &ccsr_client)
2413                .await
2414                .ok();
2415
2416            if matches!(envelope, Some(SourceEnvelope::Debezium)) && key.is_none() {
2417                sql_bail!("Key schema is required for ENVELOPE DEBEZIUM");
2418            }
2419
2420            *seed = Some(CsrSeedProtobuf { value, key });
2421        }
2422        Some(_) => (),
2423    }
2424
2425    Ok(())
2426}
2427
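/// Purifies an Avro Confluent Schema Registry connection by fetching the key
/// and value schemas for the topic into a `SEED`, unless a seed is already
/// present; as with Protobuf, the subjects consulted are `<topic>-key` and
/// `<topic>-value`.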
2428async fn purify_csr_connection_avro(
2429    catalog: &dyn SessionCatalog,
2430    options: &SourceFormatOptions,
2431    csr_connection: &mut CsrConnectionAvro<Aug>,
2432    envelope: &Option<SourceEnvelope>,
2433    storage_configuration: &StorageConfiguration,
2434) -> Result<(), PlanError> {
2435    let SourceFormatOptions::Kafka { topic } = options else {
2436        sql_bail!("Confluent Schema Registry is only supported with Kafka sources")
2437    };
2438
2439    let CsrConnectionAvro {
2440        connection: CsrConnection { connection, .. },
2441        seed,
2442        key_strategy,
2443        value_strategy,
2444    } = csr_connection;
2445    if seed.is_none() {
2446        let scx = StatementContext::new(None, &*catalog);
2447        let csr_connection = match scx.get_item_by_resolved_name(connection)?.connection()? {
2448            Connection::Csr(connection) => connection.clone().into_inline_connection(catalog),
2449            _ => sql_bail!("{} is not a schema registry connection", connection),
2450        };
2451        let ccsr_client = csr_connection
2452            .connect(storage_configuration, InTask::No)
2453            .await
2454            .map_err(|e| CsrPurificationError::ClientError(Arc::new(e)))?;
2455
2456        let Schema {
2457            key_schema,
2458            value_schema,
2459        } = get_remote_csr_schema(
2460            &ccsr_client,
2461            key_strategy.clone().unwrap_or_default(),
2462            value_strategy.clone().unwrap_or_default(),
2463            topic,
2464        )
2465        .await?;
2466        if matches!(envelope, Some(SourceEnvelope::Debezium)) && key_schema.is_none() {
2467            sql_bail!("Key schema is required for ENVELOPE DEBEZIUM");
2468        }
2469
2470        *seed = Some(CsrSeedAvro {
2471            key_schema,
2472            value_schema,
2473        })
2474    }
2475
2476    Ok(())
2477}
2478
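/// A key-value schema pair fetched from the Confluent Schema Registry; only
/// the value schema is required.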
2479#[derive(Debug)]
2480pub struct Schema {
2481    pub key_schema: Option<String>,
2482    pub value_schema: String,
2483}
2484
2485async fn get_schema_with_strategy(
2486    client: &Client,
2487    strategy: ReaderSchemaSelectionStrategy,
2488    subject: &str,
2489) -> Result<Option<String>, PlanError> {
2490    match strategy {
2491        ReaderSchemaSelectionStrategy::Latest => {
2492            match client.get_schema_by_subject(subject).await {
2493                Ok(CcsrSchema { raw, .. }) => Ok(Some(raw)),
2494                Err(GetBySubjectError::SubjectNotFound)
2495                | Err(GetBySubjectError::VersionNotFound(_)) => Ok(None),
2496                Err(e) => Err(PlanError::FetchingCsrSchemaFailed {
2497                    schema_lookup: format!("subject {}", subject.quoted()),
2498                    cause: Arc::new(e),
2499                }),
2500            }
2501        }
2502        ReaderSchemaSelectionStrategy::Inline(raw) => Ok(Some(raw)),
2503        ReaderSchemaSelectionStrategy::ById(id) => match client.get_schema_by_id(id).await {
2504            Ok(CcsrSchema { raw, .. }) => Ok(Some(raw)),
2505            Err(GetByIdError::SchemaNotFound) => Ok(None),
2506            Err(e) => Err(PlanError::FetchingCsrSchemaFailed {
2507                schema_lookup: format!("ID {}", id),
2508                cause: Arc::new(e),
2509            }),
2510        },
2511    }
2512}
2513
2514async fn get_remote_csr_schema(
2515    ccsr_client: &mz_ccsr::Client,
2516    key_strategy: ReaderSchemaSelectionStrategy,
2517    value_strategy: ReaderSchemaSelectionStrategy,
2518    topic: &str,
2519) -> Result<Schema, PlanError> {
2520    let value_schema_name = format!("{}-value", topic);
2521    let value_schema =
2522        get_schema_with_strategy(ccsr_client, value_strategy, &value_schema_name).await?;
2523    let value_schema = value_schema.ok_or_else(|| anyhow!("No value schema found"))?;
2524    let subject = format!("{}-key", topic);
2525    let key_schema = get_schema_with_strategy(ccsr_client, key_strategy, &subject).await?;
2526    Ok(Schema {
2527        key_schema,
2528        value_schema,
2529    })
2530}
2531
2532/// Fetch a Protobuf schema and its references from the CSR and compile them into a file descriptor set.
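///
/// On success, returns the encoded file descriptor set as a SQL byte string,
/// together with the name of the schema's single message.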
2533async fn compile_proto(
2534    subject_name: &String,
2535    ccsr_client: &Client,
2536) -> Result<CsrSeedProtobufSchema, PlanError> {
2537    let (primary_subject, dependency_subjects) = ccsr_client
2538        .get_subject_and_references(subject_name)
2539        .await
2540        .map_err(|e| PlanError::FetchingCsrSchemaFailed {
2541            schema_lookup: format!("subject {}", subject_name.quoted()),
2542            cause: Arc::new(e),
2543        })?;
2544
2545    // Compile .proto files into a file descriptor set.
2546    let mut source_tree = VirtualSourceTree::new();
2547
2548    // Add well-known types (e.g., google/protobuf/timestamp.proto) to the source
2549    // tree. These are implicitly available to protoc but are typically not
2550    // registered in the schema registry.
2551    source_tree.as_mut().map_well_known_types();
2552
2553    for subject in iter::once(&primary_subject).chain(dependency_subjects.iter()) {
2554        source_tree.as_mut().add_file(
2555            Path::new(&subject.name),
2556            subject.schema.raw.as_bytes().to_vec(),
2557        );
2558    }
2559    let mut db = SourceTreeDescriptorDatabase::new(source_tree.as_mut());
2560    let fds = db
2561        .as_mut()
2562        .build_file_descriptor_set(&[Path::new(&primary_subject.name)])
2563        .map_err(|cause| PlanError::InvalidProtobufSchema { cause })?;
2564
2565    // Ensure there is exactly one message in the file.
2566    let primary_fd = fds.file(0);
2567    let message_name = match primary_fd.message_type_size() {
2568        1 => String::from_utf8_lossy(primary_fd.message_type(0).name()).into_owned(),
2569        0 => bail_unsupported!(29603, "Protobuf schemas with no messages"),
2570        _ => bail_unsupported!(29603, "Protobuf schemas with multiple messages"),
2571    };
2572
2573    // Encode the file descriptor set into a SQL byte string.
2574    let bytes = &fds
2575        .serialize()
2576        .map_err(|cause| PlanError::InvalidProtobufSchema { cause })?;
2577    let mut schema = String::new();
2578    strconv::format_bytes(&mut schema, bytes);
2579
2580    Ok(CsrSeedProtobufSchema {
2581        schema,
2582        message_name,
2583    })
2584}
2585
2586const MZ_NOW_NAME: &str = "mz_now";
2587const MZ_NOW_SCHEMA: &str = "mz_catalog";

/// Purifies a CREATE MATERIALIZED VIEW statement. Additionally, it adjusts `resolved_ids` if
/// references to ids appear or disappear during the purification.
///
/// Note that in contrast with [`purify_statement`], this doesn't need to be async, because it
/// makes no network calls.
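///
/// For example (an illustrative sketch; the exact rendering of the substituted
/// timestamp may differ), with a chosen timestamp of `1700000000000`:
///
/// ```text
/// -- before purification
/// CREATE MATERIALIZED VIEW mv WITH (REFRESH AT CREATION) AS SELECT ...;
/// -- after purification
/// CREATE MATERIALIZED VIEW mv WITH (REFRESH AT 1700000000000::mz_timestamp) AS SELECT ...;
/// ```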
pub fn purify_create_materialized_view_options(
    catalog: impl SessionCatalog,
    mz_now: Option<Timestamp>,
    cmvs: &mut CreateMaterializedViewStatement<Aug>,
    resolved_ids: &mut ResolvedIds,
) {
    // 0. Preparations:
    // Prepare an expression that calls `mz_now()`, which we can insert in various later steps.
    let (mz_now_id, mz_now_expr) = {
        let item = catalog
            .resolve_function(&PartialItemName {
                database: None,
                schema: Some(MZ_NOW_SCHEMA.to_string()),
                item: MZ_NOW_NAME.to_string(),
            })
            .expect("we should be able to resolve mz_now");
        (
            item.id(),
            Expr::Function(Function {
                name: ResolvedItemName::Item {
                    id: item.id(),
                    qualifiers: item.name().qualifiers.clone(),
                    full_name: catalog.resolve_full_name(item.name()),
                    print_id: false,
                    version: RelationVersionSelector::Latest,
                },
                args: FunctionArgs::Args {
                    args: Vec::new(),
                    order_by: Vec::new(),
                },
                filter: None,
                over: None,
                distinct: false,
            }),
        )
    };
    // Prepare the `mz_timestamp` type.
    let (mz_timestamp_id, mz_timestamp_type) = {
        let item = catalog.get_system_type("mz_timestamp");
        let full_name = catalog.resolve_full_name(item.name());
        (
            item.id(),
            ResolvedDataType::Named {
                id: item.id(),
                qualifiers: item.name().qualifiers.clone(),
                full_name,
                modifiers: vec![],
                print_id: true,
            },
        )
    };

    let mut introduced_mz_timestamp = false;

    for option in cmvs.with_options.iter_mut() {
        // 1. Purify `REFRESH AT CREATION` to `REFRESH AT mz_now()`.
        if matches!(
            option.value,
            Some(WithOptionValue::Refresh(RefreshOptionValue::AtCreation))
        ) {
            option.value = Some(WithOptionValue::Refresh(RefreshOptionValue::At(
                RefreshAtOptionValue {
                    time: mz_now_expr.clone(),
                },
            )));
        }

        // 2. If `REFRESH EVERY` doesn't have an `ALIGNED TO`, then add `ALIGNED TO mz_now()`.
        if let Some(WithOptionValue::Refresh(RefreshOptionValue::Every(
            RefreshEveryOptionValue { aligned_to, .. },
        ))) = &mut option.value
        {
            if aligned_to.is_none() {
                *aligned_to = Some(mz_now_expr.clone());
            }
        }

        // 3. Substitute `mz_now()` with the timestamp chosen for the CREATE MATERIALIZED VIEW
        // statement. (This has to happen after the above steps, which might introduce `mz_now()`.)
        match &mut option.value {
            Some(WithOptionValue::Refresh(RefreshOptionValue::At(RefreshAtOptionValue {
                time,
            }))) => {
                let mut visitor = MzNowPurifierVisitor::new(mz_now, mz_timestamp_type.clone());
                visitor.visit_expr_mut(time);
                introduced_mz_timestamp |= visitor.introduced_mz_timestamp;
            }
            Some(WithOptionValue::Refresh(RefreshOptionValue::Every(
                RefreshEveryOptionValue {
                    interval: _,
                    aligned_to: Some(aligned_to),
                },
            ))) => {
                let mut visitor = MzNowPurifierVisitor::new(mz_now, mz_timestamp_type.clone());
                visitor.visit_expr_mut(aligned_to);
                introduced_mz_timestamp |= visitor.introduced_mz_timestamp;
            }
            _ => {}
        }
    }

    // 4. If the user didn't give any REFRESH option, then default to ON COMMIT.
    if !cmvs.with_options.iter().any(|o| {
        matches!(
            o,
            MaterializedViewOption {
                value: Some(WithOptionValue::Refresh(..)),
                ..
            }
        )
    }) {
        cmvs.with_options.push(MaterializedViewOption {
            name: MaterializedViewOptionName::Refresh,
            value: Some(WithOptionValue::Refresh(RefreshOptionValue::OnCommit)),
        })
    }

    // 5. Attend to `resolved_ids`: The purification might have
    // - added references to `mz_timestamp`;
    // - removed references to `mz_now`.
    if introduced_mz_timestamp {
        resolved_ids.add_item(mz_timestamp_id);
    }
    // Even though we always remove `mz_now()` from the `with_options`, there might be `mz_now()`
    // remaining in the main query expression of the MV, so let's visit the entire statement to look
    // for `mz_now()` everywhere.
    let mut visitor = ExprContainsTemporalVisitor::new();
    visitor.visit_create_materialized_view_statement(cmvs);
    if !visitor.contains_temporal {
        resolved_ids.remove_item(&mz_now_id);
    }
}

/// Returns true if the [MaterializedViewOption] either already involves `mz_now()` or will involve
/// it after purification.
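///
/// For example (illustrative), `REFRESH AT CREATION` returns true, because it
/// purifies to `REFRESH AT mz_now()`, while `REFRESH ON COMMIT` returns false.
/// A caller might use it as follows (a hypothetical sketch):
///
/// ```ignore
/// let needs_timestamp = cmvs
///     .with_options
///     .iter()
///     .any(materialized_view_option_contains_temporal);
/// ```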
pub fn materialized_view_option_contains_temporal(mvo: &MaterializedViewOption<Aug>) -> bool {
    match &mvo.value {
        Some(WithOptionValue::Refresh(RefreshOptionValue::At(RefreshAtOptionValue { time }))) => {
            let mut visitor = ExprContainsTemporalVisitor::new();
            visitor.visit_expr(time);
            visitor.contains_temporal
        }
        Some(WithOptionValue::Refresh(RefreshOptionValue::Every(RefreshEveryOptionValue {
            interval: _,
            aligned_to: Some(aligned_to),
        }))) => {
            let mut visitor = ExprContainsTemporalVisitor::new();
            visitor.visit_expr(aligned_to);
            visitor.contains_temporal
        }
        Some(WithOptionValue::Refresh(RefreshOptionValue::Every(RefreshEveryOptionValue {
            interval: _,
            aligned_to: None,
        }))) => {
            // For a `REFRESH EVERY` without an `ALIGNED TO`, purification will default the
            // `ALIGNED TO` to `mz_now()`.
            true
        }
        Some(WithOptionValue::Refresh(RefreshOptionValue::AtCreation)) => {
            // `REFRESH AT CREATION` will be purified to `REFRESH AT mz_now()`.
            true
        }
        _ => false,
    }
}

/// Determines whether an AST fragment contains a call to `mz_now()`.
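///
/// A minimal usage sketch (mirroring how it is used above):
///
/// ```ignore
/// let mut visitor = ExprContainsTemporalVisitor::new();
/// visitor.visit_expr(&expr);
/// let contains_mz_now = visitor.contains_temporal;
/// ```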
struct ExprContainsTemporalVisitor {
    pub contains_temporal: bool,
}

impl ExprContainsTemporalVisitor {
    pub fn new() -> ExprContainsTemporalVisitor {
        ExprContainsTemporalVisitor {
            contains_temporal: false,
        }
    }
}

impl Visit<'_, Aug> for ExprContainsTemporalVisitor {
    fn visit_function(&mut self, func: &Function<Aug>) {
        self.contains_temporal |= func.name.full_item_name().item == MZ_NOW_NAME;
        visit_function(self, func);
    }
}

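/// Substitutes each `mz_now()` call with the chosen timestamp, cast to
/// `mz_timestamp`, and records whether any substitution occurred.
///
/// A minimal usage sketch (mirroring the purification steps above):
///
/// ```ignore
/// let mut visitor = MzNowPurifierVisitor::new(mz_now, mz_timestamp_type.clone());
/// visitor.visit_expr_mut(&mut time_expr);
/// if visitor.introduced_mz_timestamp {
///     resolved_ids.add_item(mz_timestamp_id);
/// }
/// ```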
struct MzNowPurifierVisitor {
    pub mz_now: Option<Timestamp>,
    pub mz_timestamp_type: ResolvedDataType,
    pub introduced_mz_timestamp: bool,
}

impl MzNowPurifierVisitor {
    pub fn new(
        mz_now: Option<Timestamp>,
        mz_timestamp_type: ResolvedDataType,
    ) -> MzNowPurifierVisitor {
        MzNowPurifierVisitor {
            mz_now,
            mz_timestamp_type,
            introduced_mz_timestamp: false,
        }
    }
}

impl VisitMut<'_, Aug> for MzNowPurifierVisitor {
    fn visit_expr_mut(&mut self, expr: &'_ mut Expr<Aug>) {
        match expr {
            Expr::Function(Function {
                name:
                    ResolvedItemName::Item {
                        full_name: FullItemName { item, .. },
                        ..
                    },
                ..
            }) if item.as_str() == MZ_NOW_NAME => {
                let mz_now = self.mz_now.expect(
                    "we should have chosen a timestamp if the expression contains mz_now()",
                );
                // Substitute `mz_now()` with the chosen timestamp as a number literal,
                // cast to `mz_timestamp` so that the expression's type is unchanged.
                *expr = Expr::Cast {
                    expr: Box::new(Expr::Value(Value::Number(mz_now.to_string()))),
                    data_type: self.mz_timestamp_type.clone(),
                };
                self.introduced_mz_timestamp = true;
            }
            _ => visit_expr_mut(self, expr),
        }
    }
}