mz_sql/
pure.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! SQL purification.
11//!
12//! See the [crate-level documentation](crate) for details.
13
14use std::collections::{BTreeMap, BTreeSet};
15use std::fmt;
16use std::iter;
17use std::path::Path;
18use std::sync::Arc;
19
20use anyhow::anyhow;
21use itertools::Itertools;
22use mz_ccsr::{Client, GetByIdError, GetBySubjectError, Schema as CcsrSchema};
23use mz_controller_types::ClusterId;
24use mz_kafka_util::client::MzClientContext;
25use mz_mysql_util::MySqlTableDesc;
26use mz_ore::collections::CollectionExt;
27use mz_ore::error::ErrorExt;
28use mz_ore::future::InTask;
29use mz_ore::iter::IteratorExt;
30use mz_ore::str::StrExt;
31use mz_ore::{assert_none, soft_panic_or_log};
32use mz_postgres_util::desc::PostgresTableDesc;
33use mz_proto::RustType;
34use mz_repr::{CatalogItemId, RelationDesc, RelationVersionSelector, Timestamp, strconv};
35use mz_sql_parser::ast::display::AstDisplay;
36use mz_sql_parser::ast::visit::{Visit, visit_function};
37use mz_sql_parser::ast::visit_mut::{VisitMut, visit_expr_mut};
38use mz_sql_parser::ast::{
39    AlterSourceAction, AlterSourceAddSubsourceOptionName, AlterSourceStatement, AvroDocOn,
40    ColumnName, CreateMaterializedViewStatement, CreateSinkConnection, CreateSinkOptionName,
41    CreateSinkStatement, CreateSourceOptionName, CreateSubsourceOption, CreateSubsourceOptionName,
42    CreateTableFromSourceStatement, CsrConfigOption, CsrConfigOptionName, CsrConnection,
43    CsrSeedAvro, CsrSeedProtobuf, CsrSeedProtobufSchema, DeferredItemName, DocOnIdentifier,
44    DocOnSchema, Expr, Function, FunctionArgs, Ident, KafkaSourceConfigOption,
45    KafkaSourceConfigOptionName, LoadGenerator, LoadGeneratorOption, LoadGeneratorOptionName,
46    MaterializedViewOption, MaterializedViewOptionName, MySqlConfigOption, MySqlConfigOptionName,
47    PgConfigOption, PgConfigOptionName, RawItemName, ReaderSchemaSelectionStrategy,
48    RefreshAtOptionValue, RefreshEveryOptionValue, RefreshOptionValue, SourceEnvelope,
49    SqlServerConfigOption, SqlServerConfigOptionName, Statement, TableFromSourceColumns,
50    TableFromSourceOption, TableFromSourceOptionName, UnresolvedItemName,
51};
52use mz_sql_server_util::desc::SqlServerTableDesc;
53use mz_storage_types::configuration::StorageConfiguration;
54use mz_storage_types::connections::Connection;
55use mz_storage_types::connections::inline::IntoInlineConnection;
56use mz_storage_types::errors::ContextCreationError;
57use mz_storage_types::sources::load_generator::LoadGeneratorOutput;
58use mz_storage_types::sources::mysql::MySqlSourceDetails;
59use mz_storage_types::sources::postgres::PostgresSourcePublicationDetails;
60use mz_storage_types::sources::{
61    GenericSourceConnection, SourceDesc, SourceExportStatementDetails, SqlServerSourceExtras,
62};
63use prost::Message;
64use protobuf_native::MessageLite;
65use protobuf_native::compiler::{SourceTreeDescriptorDatabase, VirtualSourceTree};
66use rdkafka::admin::AdminClient;
67use references::{RetrievedSourceReferences, SourceReferenceClient};
68use uuid::Uuid;
69
70use crate::ast::{
71    AlterSourceAddSubsourceOption, AvroSchema, CreateSourceConnection, CreateSourceStatement,
72    CreateSubsourceStatement, CsrConnectionAvro, CsrConnectionProtobuf, ExternalReferenceExport,
73    ExternalReferences, Format, FormatSpecifier, ProtobufSchema, Value, WithOptionValue,
74};
75use crate::catalog::{CatalogItemType, SessionCatalog};
76use crate::kafka_util::{KafkaSinkConfigOptionExtracted, KafkaSourceConfigOptionExtracted};
77use crate::names::{
78    Aug, FullItemName, PartialItemName, ResolvedColumnReference, ResolvedDataType, ResolvedIds,
79    ResolvedItemName,
80};
81use crate::plan::error::PlanError;
82use crate::plan::statement::ddl::load_generator_ast_to_generator;
83use crate::plan::{SourceReferences, StatementContext};
84use crate::pure::error::{IcebergSinkPurificationError, SqlServerSourcePurificationError};
85use crate::session::vars::ENABLE_SQL_SERVER_SOURCE;
86use crate::{kafka_util, normalize};
87
88use self::error::{
89    CsrPurificationError, KafkaSinkPurificationError, KafkaSourcePurificationError,
90    LoadGeneratorSourcePurificationError, MySqlSourcePurificationError, PgSourcePurificationError,
91};
92
93pub(crate) mod error;
94mod references;
95
96pub mod mysql;
97pub mod postgres;
98pub mod sql_server;
99
100pub(crate) struct RequestedSourceExport<T> {
101    external_reference: UnresolvedItemName,
102    name: UnresolvedItemName,
103    meta: T,
104}
105
106impl<T: fmt::Debug> fmt::Debug for RequestedSourceExport<T> {
107    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
108        f.debug_struct("RequestedSourceExport")
109            .field("external_reference", &self.external_reference)
110            .field("name", &self.name)
111            .field("meta", &self.meta)
112            .finish()
113    }
114}
115
116impl<T> RequestedSourceExport<T> {
117    fn change_meta<F>(self, new_meta: F) -> RequestedSourceExport<F> {
118        RequestedSourceExport {
119            external_reference: self.external_reference,
120            name: self.name,
121            meta: new_meta,
122        }
123    }
124}
125
126/// Generates a subsource name by prepending source schema name if present
127///
128/// For eg. if source is `a.b`, then `a` will be prepended to the subsource name
129/// so that it's generated in the same schema as source
130fn source_export_name_gen(
131    source_name: &UnresolvedItemName,
132    subsource_name: &str,
133) -> Result<UnresolvedItemName, PlanError> {
134    let mut partial = normalize::unresolved_item_name(source_name.clone())?;
135    partial.item = subsource_name.to_string();
136    Ok(UnresolvedItemName::from(partial))
137}
138
139// TODO(database-issues#8620): Remove once subsources are removed
140/// Validates the requested subsources do not have name conflicts with each other
141/// and that the same upstream table is not referenced multiple times.
142fn validate_source_export_names<T>(
143    requested_source_exports: &[RequestedSourceExport<T>],
144) -> Result<(), PlanError> {
145    // This condition would get caught during the catalog transaction, but produces a
146    // vague, non-contextual error. Instead, error here so we can suggest to the user
147    // how to fix the problem.
148    if let Some(name) = requested_source_exports
149        .iter()
150        .map(|subsource| &subsource.name)
151        .duplicates()
152        .next()
153        .cloned()
154    {
155        let mut upstream_references: Vec<_> = requested_source_exports
156            .into_iter()
157            .filter_map(|subsource| {
158                if &subsource.name == &name {
159                    Some(subsource.external_reference.clone())
160                } else {
161                    None
162                }
163            })
164            .collect();
165
166        upstream_references.sort();
167
168        Err(PlanError::SubsourceNameConflict {
169            name,
170            upstream_references,
171        })?;
172    }
173
174    // We disallow subsource statements from referencing the same upstream table, but we allow
175    // `CREATE TABLE .. FROM SOURCE` statements to do so. Since `CREATE TABLE .. FROM SOURCE`
176    // purification will only provide 1 `requested_source_export`, we can leave this here without
177    // needing to differentiate between the two types of statements.
178    // TODO(roshan): Remove this when auto-generated subsources are deprecated.
179    if let Some(name) = requested_source_exports
180        .iter()
181        .map(|export| &export.external_reference)
182        .duplicates()
183        .next()
184        .cloned()
185    {
186        let mut target_names: Vec<_> = requested_source_exports
187            .into_iter()
188            .filter_map(|export| {
189                if &export.external_reference == &name {
190                    Some(export.name.clone())
191                } else {
192                    None
193                }
194            })
195            .collect();
196
197        target_names.sort();
198
199        Err(PlanError::SubsourceDuplicateReference { name, target_names })?;
200    }
201
202    Ok(())
203}
204
/// The output of [`purify_statement`]: a statement, or the information derived
/// from it, with its dependencies on external state resolved.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PurifiedStatement {
    /// A purified `CREATE SOURCE` statement.
    PurifiedCreateSource {
        /// The progress subsource, if we are offloading progress info to a
        /// separate relation.
        create_progress_subsource_stmt: Option<CreateSubsourceStatement<Aug>>,
        /// The purified `CREATE SOURCE` statement itself.
        create_source_stmt: CreateSourceStatement<Aug>,
        /// Map of subsource names to external details.
        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
        /// All the available upstream references that can be added as tables
        /// to this primary source.
        available_source_references: SourceReferences,
    },
    /// A purified `ALTER SOURCE` statement.
    PurifiedAlterSource {
        /// The purified statement.
        alter_source_stmt: AlterSourceStatement<Aug>,
    },
    /// A purified request to add subsources to an existing source.
    PurifiedAlterSourceAddSubsources {
        /// The source being altered. Carrying it here just saves us an
        /// annoying catalog lookup.
        source_name: ResolvedItemName,
        /// Options that we will need the values of to update the source's
        /// definition.
        options: Vec<AlterSourceAddSubsourceOption<Aug>>,
        /// Map of subsource names to external details.
        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
    },
    /// A purified request to refresh the upstream references available to a
    /// source.
    PurifiedAlterSourceRefreshReferences {
        /// The source whose references are refreshed.
        source_name: ResolvedItemName,
        /// The updated available upstream references for the primary source.
        available_source_references: SourceReferences,
    },
    /// A purified `CREATE SINK` statement.
    PurifiedCreateSink(CreateSinkStatement<Aug>),
    /// A purified `CREATE TABLE .. FROM SOURCE` statement.
    PurifiedCreateTableFromSource {
        /// The purified statement.
        stmt: CreateTableFromSourceStatement<Aug>,
    },
}
239
/// A source export (for example, a subsource) after purification, paired
/// with the upstream object it reads from.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PurifiedSourceExport {
    /// The upstream object this export reads from.
    pub external_reference: UnresolvedItemName,
    /// Connection-specific details gathered during purification.
    pub details: PurifiedExportDetails,
}
245
/// Connection-specific details about a single source export, gathered during
/// purification. There is one variant per kind of source connection.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PurifiedExportDetails {
    /// Details for an export of a MySQL source.
    MySql {
        /// Description of the upstream MySQL table.
        table: MySqlTableDesc,
        /// Columns to ingest as text, if any were requested (presumably from a
        /// `TEXT COLUMNS` option; confirm against the MySQL purification).
        text_columns: Option<Vec<Ident>>,
        /// Columns to exclude, if any were requested (presumably from an
        /// `EXCLUDE COLUMNS` option).
        exclude_columns: Option<Vec<Ident>>,
        /// The initial GTID set recorded for this export.
        initial_gtid_set: String,
    },
    /// Details for an export of a PostgreSQL source.
    Postgres {
        /// Description of the upstream PostgreSQL table.
        table: PostgresTableDesc,
        /// Columns to ingest as text, if any were requested.
        text_columns: Option<Vec<Ident>>,
        /// Columns to exclude, if any were requested.
        exclude_columns: Option<Vec<Ident>>,
    },
    /// Details for an export of a SQL Server source.
    SqlServer {
        /// Description of the upstream SQL Server table.
        table: SqlServerTableDesc,
        /// Columns to ingest as text, if any were requested.
        text_columns: Option<Vec<Ident>>,
        /// Columns to exclude, if any were requested.
        // NOTE(review): this field is named `excl_columns`, while the `MySql`
        // and `Postgres` variants use `exclude_columns`. Renaming it would
        // break existing callers.
        excl_columns: Option<Vec<Ident>>,
        /// The CDC capture instance this export reads from (presumably the
        /// SQL Server capture instance for `table`; confirm).
        capture_instance: Arc<str>,
        /// The initial CDC LSN for this export (assumed to be the position
        /// observed during purification; TODO confirm).
        initial_lsn: mz_sql_server_util::cdc::Lsn,
    },
    /// Kafka exports carry no additional details.
    Kafka {},
    /// Details for an export of a load generator source.
    LoadGenerator {
        /// The relation description of the exported table, if any.
        table: Option<RelationDesc>,
        /// The load generator output this export corresponds to.
        output: LoadGeneratorOutput,
    },
}
272
273/// Purifies a statement, removing any dependencies on external state.
274///
275/// See the section on [purification](crate#purification) in the crate
276/// documentation for details.
277///
278/// Note that this doesn't handle CREATE MATERIALIZED VIEW, which is
279/// handled by [purify_create_materialized_view_options] instead.
280/// This could be made more consistent by a refactoring discussed here:
281/// <https://github.com/MaterializeInc/materialize/pull/23870#discussion_r1435922709>
282pub async fn purify_statement(
283    catalog: impl SessionCatalog,
284    now: u64,
285    stmt: Statement<Aug>,
286    storage_configuration: &StorageConfiguration,
287) -> (Result<PurifiedStatement, PlanError>, Option<ClusterId>) {
288    match stmt {
289        Statement::CreateSource(stmt) => {
290            let cluster_id = stmt.in_cluster.as_ref().map(|cluster| cluster.id.clone());
291            (
292                purify_create_source(catalog, now, stmt, storage_configuration).await,
293                cluster_id,
294            )
295        }
296        Statement::AlterSource(stmt) => (
297            purify_alter_source(catalog, stmt, storage_configuration).await,
298            None,
299        ),
300        Statement::CreateSink(stmt) => {
301            let cluster_id = stmt.in_cluster.as_ref().map(|cluster| cluster.id.clone());
302            (
303                purify_create_sink(catalog, stmt, storage_configuration).await,
304                cluster_id,
305            )
306        }
307        Statement::CreateTableFromSource(stmt) => (
308            purify_create_table_from_source(catalog, stmt, storage_configuration).await,
309            None,
310        ),
311        o => unreachable!("{:?} does not need to be purified", o),
312    }
313}
314
315/// Injects `DOC ON` comments into all Avro formats that are using a schema
316/// registry by finding all SQL `COMMENT`s that are attached to the sink's
317/// underlying materialized view (as well as any types referenced by that
318/// underlying materialized view).
319pub(crate) fn purify_create_sink_avro_doc_on_options(
320    catalog: &dyn SessionCatalog,
321    from_id: CatalogItemId,
322    format: &mut Option<FormatSpecifier<Aug>>,
323) -> Result<(), PlanError> {
324    // Collect all objects referenced by the sink.
325    let from = catalog.get_item(&from_id);
326    let object_ids = from
327        .references()
328        .items()
329        .copied()
330        .chain_one(from.id())
331        .collect::<Vec<_>>();
332
333    // Collect all Avro formats that use a schema registry, as well as a set of
334    // all identifiers named in user-provided `DOC ON` options.
335    let mut avro_format_options = vec![];
336    for_each_format(format, |doc_on_schema, fmt| match fmt {
337        Format::Avro(AvroSchema::InlineSchema { .. })
338        | Format::Bytes
339        | Format::Csv { .. }
340        | Format::Json { .. }
341        | Format::Protobuf(..)
342        | Format::Regex(..)
343        | Format::Text => (),
344        Format::Avro(AvroSchema::Csr {
345            csr_connection: CsrConnectionAvro { connection, .. },
346        }) => {
347            avro_format_options.push((doc_on_schema, &mut connection.options));
348        }
349    });
350
351    // For each Avro format in the sink, inject the appropriate `DOC ON` options
352    // for each item referenced by the sink.
353    for (for_schema, options) in avro_format_options {
354        let user_provided_comments = options
355            .iter()
356            .filter_map(|CsrConfigOption { name, .. }| match name {
357                CsrConfigOptionName::AvroDocOn(doc_on) => Some(doc_on.clone()),
358                _ => None,
359            })
360            .collect::<BTreeSet<_>>();
361
362        // Adding existing comments if not already provided by user
363        for object_id in &object_ids {
364            // Always add comments to the latest version of the item.
365            let item = catalog
366                .get_item(object_id)
367                .at_version(RelationVersionSelector::Latest);
368            let full_resolved_name = ResolvedItemName::Item {
369                id: *object_id,
370                qualifiers: item.name().qualifiers.clone(),
371                full_name: catalog.resolve_full_name(item.name()),
372                print_id: !matches!(
373                    item.item_type(),
374                    CatalogItemType::Func | CatalogItemType::Type
375                ),
376                version: RelationVersionSelector::Latest,
377            };
378
379            if let Some(comments_map) = catalog.get_item_comments(object_id) {
380                // Attach comment for the item itself, if the user has not
381                // already provided an overriding `DOC ON` option for the item.
382                let doc_on_item_key = AvroDocOn {
383                    identifier: DocOnIdentifier::Type(full_resolved_name.clone()),
384                    for_schema,
385                };
386                if !user_provided_comments.contains(&doc_on_item_key) {
387                    if let Some(root_comment) = comments_map.get(&None) {
388                        options.push(CsrConfigOption {
389                            name: CsrConfigOptionName::AvroDocOn(doc_on_item_key),
390                            value: Some(mz_sql_parser::ast::WithOptionValue::Value(Value::String(
391                                root_comment.clone(),
392                            ))),
393                        });
394                    }
395                }
396
397                // Attach comment for each column in the item, if the user has
398                // not already provided an overriding `DOC ON` option for the
399                // column.
400                let column_descs = match item.type_details() {
401                    Some(details) => details.typ.desc(catalog).unwrap_or_default(),
402                    None => item.relation_desc().map(|d| d.into_owned()),
403                };
404
405                if let Some(desc) = column_descs {
406                    for (pos, column_name) in desc.iter_names().enumerate() {
407                        if let Some(comment_str) = comments_map.get(&Some(pos + 1)) {
408                            let doc_on_column_key = AvroDocOn {
409                                identifier: DocOnIdentifier::Column(ColumnName {
410                                    relation: full_resolved_name.clone(),
411                                    column: ResolvedColumnReference::Column {
412                                        name: column_name.to_owned(),
413                                        index: pos,
414                                    },
415                                }),
416                                for_schema,
417                            };
418                            if !user_provided_comments.contains(&doc_on_column_key) {
419                                options.push(CsrConfigOption {
420                                    name: CsrConfigOptionName::AvroDocOn(doc_on_column_key),
421                                    value: Some(mz_sql_parser::ast::WithOptionValue::Value(
422                                        Value::String(comment_str.clone()),
423                                    )),
424                                });
425                            }
426                        }
427                    }
428                }
429            }
430        }
431    }
432
433    Ok(())
434}
435
436/// Checks that the sink described in the statement can connect to its external
437/// resources.
438async fn purify_create_sink(
439    catalog: impl SessionCatalog,
440    mut create_sink_stmt: CreateSinkStatement<Aug>,
441    storage_configuration: &StorageConfiguration,
442) -> Result<PurifiedStatement, PlanError> {
443    // General purification
444    let CreateSinkStatement {
445        connection,
446        format,
447        with_options,
448        name: _,
449        in_cluster: _,
450        if_not_exists: _,
451        from,
452        envelope: _,
453    } = &mut create_sink_stmt;
454
455    // The list of options that the user is allowed to specify.
456    const USER_ALLOWED_WITH_OPTIONS: &[CreateSinkOptionName] = &[
457        CreateSinkOptionName::Snapshot,
458        CreateSinkOptionName::CommitInterval,
459    ];
460
461    if let Some(op) = with_options
462        .iter()
463        .find(|op| !USER_ALLOWED_WITH_OPTIONS.contains(&op.name))
464    {
465        sql_bail!(
466            "CREATE SINK...WITH ({}..) is not allowed",
467            op.name.to_ast_string_simple(),
468        )
469    }
470
471    match &connection {
472        CreateSinkConnection::Kafka {
473            connection,
474            options,
475            key: _,
476            headers: _,
477        } => {
478            // We must not leave any state behind in the Kafka broker, so just ensure that
479            // we can connect. This means we don't ensure that we can create the topic and
480            // introduces TOCTOU errors, but creating an inoperable sink is infinitely
481            // preferable to leaking state in users' environments.
482            let scx = StatementContext::new(None, &catalog);
483            let connection = {
484                let item = scx.get_item_by_resolved_name(connection)?;
485                // Get Kafka connection
486                match item.connection()? {
487                    Connection::Kafka(connection) => {
488                        connection.clone().into_inline_connection(scx.catalog)
489                    }
490                    _ => sql_bail!(
491                        "{} is not a kafka connection",
492                        scx.catalog.resolve_full_name(item.name())
493                    ),
494                }
495            };
496
497            let extracted_options: KafkaSinkConfigOptionExtracted = options.clone().try_into()?;
498
499            if extracted_options.legacy_ids == Some(true) {
500                sql_bail!("LEGACY IDs option is not supported");
501            }
502
503            let client: AdminClient<_> = connection
504                .create_with_context(
505                    storage_configuration,
506                    MzClientContext::default(),
507                    &BTreeMap::new(),
508                    InTask::No,
509                )
510                .await
511                .map_err(|e| {
512                    // anyhow doesn't support Clone, so not trivial to move into PlanError
513                    KafkaSinkPurificationError::AdminClientError(Arc::new(e))
514                })?;
515
516            let metadata = client
517                .inner()
518                .fetch_metadata(
519                    None,
520                    storage_configuration
521                        .parameters
522                        .kafka_timeout_config
523                        .fetch_metadata_timeout,
524                )
525                .map_err(|e| {
526                    KafkaSinkPurificationError::AdminClientError(Arc::new(
527                        ContextCreationError::KafkaError(e),
528                    ))
529                })?;
530
531            if metadata.brokers().len() == 0 {
532                Err(KafkaSinkPurificationError::ZeroBrokers)?;
533            }
534        }
535        CreateSinkConnection::Iceberg {
536            connection,
537            aws_connection,
538            ..
539        } => {
540            let scx = StatementContext::new(None, &catalog);
541            let connection = {
542                let item = scx.get_item_by_resolved_name(connection)?;
543                // Get Iceberg connection
544                match item.connection()? {
545                    Connection::IcebergCatalog(connection) => {
546                        connection.clone().into_inline_connection(scx.catalog)
547                    }
548                    _ => sql_bail!(
549                        "{} is not an iceberg connection",
550                        scx.catalog.resolve_full_name(item.name())
551                    ),
552                }
553            };
554
555            let aws_conn_id = aws_connection.item_id();
556
557            let aws_connection = {
558                let item = scx.get_item_by_resolved_name(aws_connection)?;
559                // Get AWS connection
560                match item.connection()? {
561                    Connection::Aws(aws_connection) => aws_connection.clone(),
562                    _ => sql_bail!(
563                        "{} is not an aws connection",
564                        scx.catalog.resolve_full_name(item.name())
565                    ),
566                }
567            };
568
569            let _catalog = connection
570                .connect(storage_configuration, InTask::No)
571                .await
572                .map_err(|e| IcebergSinkPurificationError::CatalogError(Arc::new(e)))?;
573
574            let _sdk_config = aws_connection
575                .load_sdk_config(
576                    &storage_configuration.connection_context,
577                    aws_conn_id.clone(),
578                    InTask::No,
579                )
580                .await
581                .map_err(|e| IcebergSinkPurificationError::AwsSdkContextError(Arc::new(e)))?;
582        }
583    }
584
585    let mut csr_connection_ids = BTreeSet::new();
586    for_each_format(format, |_, fmt| match fmt {
587        Format::Avro(AvroSchema::InlineSchema { .. })
588        | Format::Bytes
589        | Format::Csv { .. }
590        | Format::Json { .. }
591        | Format::Protobuf(ProtobufSchema::InlineSchema { .. })
592        | Format::Regex(..)
593        | Format::Text => (),
594        Format::Avro(AvroSchema::Csr {
595            csr_connection: CsrConnectionAvro { connection, .. },
596        })
597        | Format::Protobuf(ProtobufSchema::Csr {
598            csr_connection: CsrConnectionProtobuf { connection, .. },
599        }) => {
600            csr_connection_ids.insert(*connection.connection.item_id());
601        }
602    });
603
604    let scx = StatementContext::new(None, &catalog);
605    for csr_connection_id in csr_connection_ids {
606        let connection = {
607            let item = scx.get_item(&csr_connection_id);
608            // Get Kafka connection
609            match item.connection()? {
610                Connection::Csr(connection) => connection.clone().into_inline_connection(&catalog),
611                _ => Err(CsrPurificationError::NotCsrConnection(
612                    scx.catalog.resolve_full_name(item.name()),
613                ))?,
614            }
615        };
616
617        let client = connection
618            .connect(storage_configuration, InTask::No)
619            .await
620            .map_err(|e| CsrPurificationError::ClientError(Arc::new(e)))?;
621
622        client
623            .list_subjects()
624            .await
625            .map_err(|e| CsrPurificationError::ListSubjectsError(Arc::new(e)))?;
626    }
627
628    purify_create_sink_avro_doc_on_options(&catalog, *from.item_id(), format)?;
629
630    Ok(PurifiedStatement::PurifiedCreateSink(create_sink_stmt))
631}
632
633/// Runs a function on each format within the format specifier.
634///
635/// The function is provided with a `DocOnSchema` that indicates whether the
636/// format is for the key, value, or both.
637///
638// TODO(benesch): rename `DocOnSchema` to the more general `FormatRestriction`.
639fn for_each_format<'a, F>(format: &'a mut Option<FormatSpecifier<Aug>>, mut f: F)
640where
641    F: FnMut(DocOnSchema, &'a mut Format<Aug>),
642{
643    match format {
644        None => (),
645        Some(FormatSpecifier::Bare(fmt)) => f(DocOnSchema::All, fmt),
646        Some(FormatSpecifier::KeyValue { key, value }) => {
647            f(DocOnSchema::KeyOnly, key);
648            f(DocOnSchema::ValueOnly, value);
649        }
650    }
651}
652
/// Defines whether purification should enforce that at least one valid source
/// reference is provided on the provided statement.
///
/// When purifying `CREATE SOURCE`, the policy is derived from two system
/// variables:
///
/// - [`SourceReferencePolicy::NotAllowed`] when both
///   `enable_create_table_from_source` and `force_source_table_syntax` are set;
/// - [`SourceReferencePolicy::Optional`] when only
///   `enable_create_table_from_source` is set;
/// - [`SourceReferencePolicy::Required`] otherwise.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub(crate) enum SourceReferencePolicy {
    /// Don't allow source references to be provided. This is used for
    /// enforcing that `CREATE SOURCE` statements don't create subsources.
    NotAllowed,
    /// Allow empty references, such as when creating a source that
    /// will have tables added afterwards.
    Optional,
    /// Require that at least one reference is resolved to an upstream
    /// object.
    Required,
}
667
668async fn purify_create_source(
669    catalog: impl SessionCatalog,
670    now: u64,
671    mut create_source_stmt: CreateSourceStatement<Aug>,
672    storage_configuration: &StorageConfiguration,
673) -> Result<PurifiedStatement, PlanError> {
674    let CreateSourceStatement {
675        name: source_name,
676        col_names,
677        key_constraint,
678        connection: source_connection,
679        format,
680        envelope,
681        include_metadata,
682        external_references,
683        progress_subsource,
684        with_options,
685        ..
686    } = &mut create_source_stmt;
687
688    let uses_old_syntax = !col_names.is_empty()
689        || key_constraint.is_some()
690        || format.is_some()
691        || envelope.is_some()
692        || !include_metadata.is_empty()
693        || external_references.is_some()
694        || progress_subsource.is_some();
695
696    if let Some(DeferredItemName::Named(_)) = progress_subsource {
697        sql_bail!("Cannot manually ID qualify progress subsource")
698    }
699
700    let mut requested_subsource_map = BTreeMap::new();
701
702    let progress_desc = match &source_connection {
703        CreateSourceConnection::Kafka { .. } => {
704            &mz_storage_types::sources::kafka::KAFKA_PROGRESS_DESC
705        }
706        CreateSourceConnection::Postgres { .. } => {
707            &mz_storage_types::sources::postgres::PG_PROGRESS_DESC
708        }
709        CreateSourceConnection::SqlServer { .. } => {
710            &mz_storage_types::sources::sql_server::SQL_SERVER_PROGRESS_DESC
711        }
712        CreateSourceConnection::MySql { .. } => {
713            &mz_storage_types::sources::mysql::MYSQL_PROGRESS_DESC
714        }
715        CreateSourceConnection::LoadGenerator { .. } => {
716            &mz_storage_types::sources::load_generator::LOAD_GEN_PROGRESS_DESC
717        }
718    };
719    let scx = StatementContext::new(None, &catalog);
720
721    // Depending on if the user must or can use the `CREATE TABLE .. FROM SOURCE` statement
722    // to add tables after this source is created we might need to enforce that
723    // auto-generated subsources are created or not created by this source statement.
724    let reference_policy = if scx.catalog.system_vars().enable_create_table_from_source()
725        && scx.catalog.system_vars().force_source_table_syntax()
726    {
727        SourceReferencePolicy::NotAllowed
728    } else if scx.catalog.system_vars().enable_create_table_from_source() {
729        SourceReferencePolicy::Optional
730    } else {
731        SourceReferencePolicy::Required
732    };
733
734    let mut format_options = SourceFormatOptions::Default;
735
736    let retrieved_source_references: RetrievedSourceReferences;
737
738    match source_connection {
739        CreateSourceConnection::Kafka {
740            connection,
741            options: base_with_options,
742            ..
743        } => {
744            if let Some(external_references) = external_references {
745                Err(KafkaSourcePurificationError::ReferencedSubsources(
746                    external_references.clone(),
747                ))?;
748            }
749
750            let connection = {
751                let item = scx.get_item_by_resolved_name(connection)?;
752                // Get Kafka connection
753                match item.connection()? {
754                    Connection::Kafka(connection) => {
755                        connection.clone().into_inline_connection(&catalog)
756                    }
757                    _ => Err(KafkaSourcePurificationError::NotKafkaConnection(
758                        scx.catalog.resolve_full_name(item.name()),
759                    ))?,
760                }
761            };
762
763            let extracted_options: KafkaSourceConfigOptionExtracted =
764                base_with_options.clone().try_into()?;
765
766            let topic = extracted_options
767                .topic
768                .ok_or(KafkaSourcePurificationError::ConnectionMissingTopic)?;
769
770            let consumer = connection
771                .create_with_context(
772                    storage_configuration,
773                    MzClientContext::default(),
774                    &BTreeMap::new(),
775                    InTask::No,
776                )
777                .await
778                .map_err(|e| {
779                    // anyhow doesn't support Clone, so not trivial to move into PlanError
780                    KafkaSourcePurificationError::KafkaConsumerError(
781                        e.display_with_causes().to_string(),
782                    )
783                })?;
784            let consumer = Arc::new(consumer);
785
786            match (
787                extracted_options.start_offset,
788                extracted_options.start_timestamp,
789            ) {
790                (None, None) => {
791                    // Validate that the topic at least exists.
792                    kafka_util::ensure_topic_exists(
793                        Arc::clone(&consumer),
794                        &topic,
795                        storage_configuration
796                            .parameters
797                            .kafka_timeout_config
798                            .fetch_metadata_timeout,
799                    )
800                    .await?;
801                }
802                (Some(_), Some(_)) => {
803                    sql_bail!("cannot specify START TIMESTAMP and START OFFSET at same time")
804                }
805                (Some(start_offsets), None) => {
806                    // Validate the start offsets.
807                    kafka_util::validate_start_offsets(
808                        Arc::clone(&consumer),
809                        &topic,
810                        start_offsets,
811                        storage_configuration
812                            .parameters
813                            .kafka_timeout_config
814                            .fetch_metadata_timeout,
815                    )
816                    .await?;
817                }
818                (None, Some(time_offset)) => {
819                    // Translate `START TIMESTAMP` to a start offset.
820                    let start_offsets = kafka_util::lookup_start_offsets(
821                        Arc::clone(&consumer),
822                        &topic,
823                        time_offset,
824                        now,
825                        storage_configuration
826                            .parameters
827                            .kafka_timeout_config
828                            .fetch_metadata_timeout,
829                    )
830                    .await?;
831
832                    base_with_options.retain(|val| {
833                        !matches!(val.name, KafkaSourceConfigOptionName::StartTimestamp)
834                    });
835                    base_with_options.push(KafkaSourceConfigOption {
836                        name: KafkaSourceConfigOptionName::StartOffset,
837                        value: Some(WithOptionValue::Sequence(
838                            start_offsets
839                                .iter()
840                                .map(|offset| {
841                                    WithOptionValue::Value(Value::Number(offset.to_string()))
842                                })
843                                .collect(),
844                        )),
845                    });
846                }
847            }
848
849            let reference_client = SourceReferenceClient::Kafka { topic: &topic };
850            retrieved_source_references = reference_client.get_source_references().await?;
851
852            format_options = SourceFormatOptions::Kafka { topic };
853        }
854        CreateSourceConnection::Postgres {
855            connection,
856            options,
857        } => {
858            let connection_item = scx.get_item_by_resolved_name(connection)?;
859            let connection = match connection_item.connection().map_err(PlanError::from)? {
860                Connection::Postgres(connection) => {
861                    connection.clone().into_inline_connection(&catalog)
862                }
863                _ => Err(PgSourcePurificationError::NotPgConnection(
864                    scx.catalog.resolve_full_name(connection_item.name()),
865                ))?,
866            };
867            let crate::plan::statement::PgConfigOptionExtracted {
868                publication,
869                text_columns,
870                exclude_columns,
871                details,
872                ..
873            } = options.clone().try_into()?;
874            let publication =
875                publication.ok_or(PgSourcePurificationError::ConnectionMissingPublication)?;
876
877            if details.is_some() {
878                Err(PgSourcePurificationError::UserSpecifiedDetails)?;
879            }
880
881            let client = connection
882                .validate(connection_item.id(), storage_configuration)
883                .await?;
884
885            let reference_client = SourceReferenceClient::Postgres {
886                client: &client,
887                publication: &publication,
888                database: &connection.database,
889            };
890            retrieved_source_references = reference_client.get_source_references().await?;
891
892            let postgres::PurifiedSourceExports {
893                source_exports: subsources,
894                normalized_text_columns,
895            } = postgres::purify_source_exports(
896                &client,
897                &retrieved_source_references,
898                external_references,
899                text_columns,
900                exclude_columns,
901                source_name,
902                &reference_policy,
903            )
904            .await?;
905
906            if let Some(text_cols_option) = options
907                .iter_mut()
908                .find(|option| option.name == PgConfigOptionName::TextColumns)
909            {
910                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
911            }
912
913            requested_subsource_map.extend(subsources);
914
915            // Record the active replication timeline_id to allow detection of a future upstream
916            // point-in-time-recovery that will put the source into an error state.
917            let timeline_id = mz_postgres_util::get_timeline_id(&client).await?;
918
919            // Remove any old detail references
920            options.retain(|PgConfigOption { name, .. }| name != &PgConfigOptionName::Details);
921            let details = PostgresSourcePublicationDetails {
922                slot: format!(
923                    "materialize_{}",
924                    Uuid::new_v4().to_string().replace('-', "")
925                ),
926                timeline_id: Some(timeline_id),
927                database: connection.database,
928            };
929            options.push(PgConfigOption {
930                name: PgConfigOptionName::Details,
931                value: Some(WithOptionValue::Value(Value::String(hex::encode(
932                    details.into_proto().encode_to_vec(),
933                )))),
934            })
935        }
936        CreateSourceConnection::SqlServer {
937            connection,
938            options,
939        } => {
940            scx.require_feature_flag(&ENABLE_SQL_SERVER_SOURCE)?;
941
942            // Connect to the upstream SQL Server instance so we can validate
943            // we're compatible with CDC.
944            let connection_item = scx.get_item_by_resolved_name(connection)?;
945            let connection = match connection_item.connection()? {
946                Connection::SqlServer(connection) => {
947                    connection.clone().into_inline_connection(&catalog)
948                }
949                _ => Err(SqlServerSourcePurificationError::NotSqlServerConnection(
950                    scx.catalog.resolve_full_name(connection_item.name()),
951                ))?,
952            };
953            let crate::plan::statement::ddl::SqlServerConfigOptionExtracted {
954                details,
955                text_columns,
956                exclude_columns,
957                seen: _,
958            } = options.clone().try_into()?;
959
960            if details.is_some() {
961                Err(SqlServerSourcePurificationError::UserSpecifiedDetails)?;
962            }
963
964            let mut client = connection
965                .validate(connection_item.id(), storage_configuration)
966                .await?;
967
968            let database: Arc<str> = connection.database.into();
969            let reference_client = SourceReferenceClient::SqlServer {
970                client: &mut client,
971                database: Arc::clone(&database),
972            };
973            retrieved_source_references = reference_client.get_source_references().await?;
974            tracing::debug!(?retrieved_source_references, "got source references");
975
976            let timeout = mz_storage_types::sources::sql_server::MAX_LSN_WAIT
977                .get(storage_configuration.config_set());
978
979            let purified_source_exports = sql_server::purify_source_exports(
980                &*database,
981                &mut client,
982                &retrieved_source_references,
983                external_references,
984                &text_columns,
985                &exclude_columns,
986                source_name,
987                timeout,
988                &reference_policy,
989            )
990            .await?;
991
992            let sql_server::PurifiedSourceExports {
993                source_exports,
994                normalized_text_columns,
995                normalized_excl_columns,
996            } = purified_source_exports;
997
998            // Update our set of requested source exports.
999            requested_subsource_map.extend(source_exports);
1000
1001            // Record the most recent restore_history_id, or none if the system has never been
1002            // restored.
1003
1004            let restore_history_id =
1005                mz_sql_server_util::inspect::get_latest_restore_history_id(&mut client).await?;
1006            let details = SqlServerSourceExtras { restore_history_id };
1007
1008            options.retain(|SqlServerConfigOption { name, .. }| {
1009                name != &SqlServerConfigOptionName::Details
1010            });
1011            options.push(SqlServerConfigOption {
1012                name: SqlServerConfigOptionName::Details,
1013                value: Some(WithOptionValue::Value(Value::String(hex::encode(
1014                    details.into_proto().encode_to_vec(),
1015                )))),
1016            });
1017
1018            // Update our 'TEXT' and 'EXCLUDE' column options with the purified and normalized set.
1019            if let Some(text_cols_option) = options
1020                .iter_mut()
1021                .find(|option| option.name == SqlServerConfigOptionName::TextColumns)
1022            {
1023                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
1024            }
1025            if let Some(excl_cols_option) = options
1026                .iter_mut()
1027                .find(|option| option.name == SqlServerConfigOptionName::ExcludeColumns)
1028            {
1029                excl_cols_option.value = Some(WithOptionValue::Sequence(normalized_excl_columns));
1030            }
1031        }
1032        CreateSourceConnection::MySql {
1033            connection,
1034            options,
1035        } => {
1036            let connection_item = scx.get_item_by_resolved_name(connection)?;
1037            let connection = match connection_item.connection()? {
1038                Connection::MySql(connection) => {
1039                    connection.clone().into_inline_connection(&catalog)
1040                }
1041                _ => Err(MySqlSourcePurificationError::NotMySqlConnection(
1042                    scx.catalog.resolve_full_name(connection_item.name()),
1043                ))?,
1044            };
1045            let crate::plan::statement::ddl::MySqlConfigOptionExtracted {
1046                details,
1047                text_columns,
1048                exclude_columns,
1049                seen: _,
1050            } = options.clone().try_into()?;
1051
1052            if details.is_some() {
1053                Err(MySqlSourcePurificationError::UserSpecifiedDetails)?;
1054            }
1055
1056            let mut conn = connection
1057                .validate(connection_item.id(), storage_configuration)
1058                .await
1059                .map_err(MySqlSourcePurificationError::InvalidConnection)?;
1060
1061            // Retrieve the current @gtid_executed value of the server to mark as the effective
1062            // initial snapshot point such that we can ensure consistency if the initial source
1063            // snapshot is broken up over multiple points in time.
1064            let initial_gtid_set =
1065                mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?;
1066
1067            let reference_client = SourceReferenceClient::MySql {
1068                conn: &mut conn,
1069                include_system_schemas: mysql::references_system_schemas(external_references),
1070            };
1071            retrieved_source_references = reference_client.get_source_references().await?;
1072
1073            let mysql::PurifiedSourceExports {
1074                source_exports: subsources,
1075                normalized_text_columns,
1076                normalized_exclude_columns,
1077            } = mysql::purify_source_exports(
1078                &mut conn,
1079                &retrieved_source_references,
1080                external_references,
1081                text_columns,
1082                exclude_columns,
1083                source_name,
1084                initial_gtid_set.clone(),
1085                &reference_policy,
1086            )
1087            .await?;
1088            requested_subsource_map.extend(subsources);
1089
1090            // We don't have any fields in this details struct but keep this around for
1091            // conformity with postgres and in-case we end up needing it again in the future.
1092            let details = MySqlSourceDetails {};
1093            // Update options with the purified details
1094            options
1095                .retain(|MySqlConfigOption { name, .. }| name != &MySqlConfigOptionName::Details);
1096            options.push(MySqlConfigOption {
1097                name: MySqlConfigOptionName::Details,
1098                value: Some(WithOptionValue::Value(Value::String(hex::encode(
1099                    details.into_proto().encode_to_vec(),
1100                )))),
1101            });
1102
1103            if let Some(text_cols_option) = options
1104                .iter_mut()
1105                .find(|option| option.name == MySqlConfigOptionName::TextColumns)
1106            {
1107                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
1108            }
1109            if let Some(exclude_cols_option) = options
1110                .iter_mut()
1111                .find(|option| option.name == MySqlConfigOptionName::ExcludeColumns)
1112            {
1113                exclude_cols_option.value =
1114                    Some(WithOptionValue::Sequence(normalized_exclude_columns));
1115            }
1116        }
1117        CreateSourceConnection::LoadGenerator { generator, options } => {
1118            let load_generator =
1119                load_generator_ast_to_generator(&scx, generator, options, include_metadata)?;
1120
1121            let reference_client = SourceReferenceClient::LoadGenerator {
1122                generator: &load_generator,
1123            };
1124            retrieved_source_references = reference_client.get_source_references().await?;
1125            // Filter to the references that need to be created as 'subsources', which
1126            // doesn't include the default output for single-output sources.
1127            // TODO(database-issues#8620): Remove once subsources are removed
1128            let multi_output_sources =
1129                retrieved_source_references
1130                    .all_references()
1131                    .iter()
1132                    .any(|r| {
1133                        r.load_generator_output().expect("is loadgen")
1134                            != &LoadGeneratorOutput::Default
1135                    });
1136
1137            match external_references {
1138                Some(requested)
1139                    if matches!(reference_policy, SourceReferencePolicy::NotAllowed) =>
1140                {
1141                    Err(PlanError::UseTablesForSources(requested.to_string()))?
1142                }
1143                Some(requested) if !multi_output_sources => match requested {
1144                    ExternalReferences::SubsetTables(_) => {
1145                        Err(LoadGeneratorSourcePurificationError::ForTables)?
1146                    }
1147                    ExternalReferences::SubsetSchemas(_) => {
1148                        Err(LoadGeneratorSourcePurificationError::ForSchemas)?
1149                    }
1150                    ExternalReferences::All => {
1151                        Err(LoadGeneratorSourcePurificationError::ForAllTables)?
1152                    }
1153                },
1154                Some(requested) => {
1155                    let requested_exports = retrieved_source_references
1156                        .requested_source_exports(Some(requested), source_name)?;
1157                    for export in requested_exports {
1158                        requested_subsource_map.insert(
1159                            export.name,
1160                            PurifiedSourceExport {
1161                                external_reference: export.external_reference,
1162                                details: PurifiedExportDetails::LoadGenerator {
1163                                    table: export
1164                                        .meta
1165                                        .load_generator_desc()
1166                                        .expect("is loadgen")
1167                                        .clone(),
1168                                    output: export
1169                                        .meta
1170                                        .load_generator_output()
1171                                        .expect("is loadgen")
1172                                        .clone(),
1173                                },
1174                            },
1175                        );
1176                    }
1177                }
1178                None => {
1179                    if multi_output_sources
1180                        && matches!(reference_policy, SourceReferencePolicy::Required)
1181                    {
1182                        Err(LoadGeneratorSourcePurificationError::MultiOutputRequiresForAllTables)?
1183                    }
1184                }
1185            }
1186
1187            if let LoadGenerator::Clock = generator {
1188                if !options
1189                    .iter()
1190                    .any(|p| p.name == LoadGeneratorOptionName::AsOf)
1191                {
1192                    let now = catalog.now();
1193                    options.push(LoadGeneratorOption {
1194                        name: LoadGeneratorOptionName::AsOf,
1195                        value: Some(WithOptionValue::Value(Value::Number(now.to_string()))),
1196                    });
1197                }
1198            }
1199        }
1200    }
1201
1202    // Now that we know which subsources to create alongside this
1203    // statement, remove the references so it is not canonicalized as
1204    // part of the `CREATE SOURCE` statement in the catalog.
1205    *external_references = None;
1206
1207    // Generate progress subsource for old syntax
1208    let create_progress_subsource_stmt = if uses_old_syntax {
1209        // Take name from input or generate name
1210        let name = match progress_subsource {
1211            Some(name) => match name {
1212                DeferredItemName::Deferred(name) => name.clone(),
1213                DeferredItemName::Named(_) => unreachable!("already checked for this value"),
1214            },
1215            None => {
1216                let (item, prefix) = source_name.0.split_last().unwrap();
1217                let item_name =
1218                    Ident::try_generate_name(item.to_string(), "_progress", |candidate| {
1219                        let mut suggested_name = prefix.to_vec();
1220                        suggested_name.push(candidate.clone());
1221
1222                        let partial =
1223                            normalize::unresolved_item_name(UnresolvedItemName(suggested_name))?;
1224                        let qualified = scx.allocate_qualified_name(partial)?;
1225                        let item_exists = scx.catalog.get_item_by_name(&qualified).is_some();
1226                        let type_exists = scx.catalog.get_type_by_name(&qualified).is_some();
1227                        Ok::<_, PlanError>(!item_exists && !type_exists)
1228                    })?;
1229
1230                let mut full_name = prefix.to_vec();
1231                full_name.push(item_name);
1232                let full_name = normalize::unresolved_item_name(UnresolvedItemName(full_name))?;
1233                let qualified_name = scx.allocate_qualified_name(full_name)?;
1234                let full_name = scx.catalog.resolve_full_name(&qualified_name);
1235
1236                UnresolvedItemName::from(full_name.clone())
1237            }
1238        };
1239
1240        let (columns, constraints) = scx.relation_desc_into_table_defs(progress_desc)?;
1241
1242        // Create the subsource statement
1243        let mut progress_with_options: Vec<_> = with_options
1244            .iter()
1245            .filter_map(|opt| match opt.name {
1246                CreateSourceOptionName::TimestampInterval => None,
1247                CreateSourceOptionName::RetainHistory => Some(CreateSubsourceOption {
1248                    name: CreateSubsourceOptionName::RetainHistory,
1249                    value: opt.value.clone(),
1250                }),
1251            })
1252            .collect();
1253        progress_with_options.push(CreateSubsourceOption {
1254            name: CreateSubsourceOptionName::Progress,
1255            value: Some(WithOptionValue::Value(Value::Boolean(true))),
1256        });
1257
1258        Some(CreateSubsourceStatement {
1259            name,
1260            columns,
1261            // Progress subsources do not refer to the source to which they belong.
1262            // Instead the primary source depends on it (the opposite is true of
1263            // ingestion exports, which depend on the primary source).
1264            of_source: None,
1265            constraints,
1266            if_not_exists: false,
1267            with_options: progress_with_options,
1268        })
1269    } else {
1270        None
1271    };
1272
1273    purify_source_format(
1274        &catalog,
1275        format,
1276        &format_options,
1277        envelope,
1278        storage_configuration,
1279    )
1280    .await?;
1281
1282    Ok(PurifiedStatement::PurifiedCreateSource {
1283        create_progress_subsource_stmt,
1284        create_source_stmt,
1285        subsources: requested_subsource_map,
1286        available_source_references: retrieved_source_references.available_source_references(),
1287    })
1288}
1289
1290/// On success, returns the details on new subsources and updated
1291/// 'options' that sequencing expects for handling `ALTER SOURCE` statements.
1292async fn purify_alter_source(
1293    catalog: impl SessionCatalog,
1294    stmt: AlterSourceStatement<Aug>,
1295    storage_configuration: &StorageConfiguration,
1296) -> Result<PurifiedStatement, PlanError> {
1297    let scx = StatementContext::new(None, &catalog);
1298    let AlterSourceStatement {
1299        source_name: unresolved_source_name,
1300        action,
1301        if_exists,
1302    } = stmt;
1303
1304    // Get name.
1305    let item = match scx.resolve_item(RawItemName::Name(unresolved_source_name.clone())) {
1306        Ok(item) => item,
1307        Err(_) if if_exists => {
1308            return Ok(PurifiedStatement::PurifiedAlterSource {
1309                alter_source_stmt: AlterSourceStatement {
1310                    source_name: unresolved_source_name,
1311                    action,
1312                    if_exists,
1313                },
1314            });
1315        }
1316        Err(e) => return Err(e),
1317    };
1318
1319    // Ensure it's an ingestion-based and alterable source.
1320    let desc = match item.source_desc()? {
1321        Some(desc) => desc.clone().into_inline_connection(scx.catalog),
1322        None => {
1323            sql_bail!("cannot ALTER this type of source")
1324        }
1325    };
1326
1327    let source_name = item.name();
1328
1329    let resolved_source_name = ResolvedItemName::Item {
1330        id: item.id(),
1331        qualifiers: item.name().qualifiers.clone(),
1332        full_name: scx.catalog.resolve_full_name(source_name),
1333        print_id: true,
1334        version: RelationVersionSelector::Latest,
1335    };
1336
1337    let partial_name = scx.catalog.minimal_qualification(source_name);
1338
1339    match action {
1340        AlterSourceAction::AddSubsources {
1341            external_references,
1342            options,
1343        } => {
1344            if scx.catalog.system_vars().enable_create_table_from_source()
1345                && scx.catalog.system_vars().force_source_table_syntax()
1346            {
1347                Err(PlanError::UseTablesForSources(
1348                    "ALTER SOURCE .. ADD SUBSOURCES ..".to_string(),
1349                ))?;
1350            }
1351
1352            purify_alter_source_add_subsources(
1353                external_references,
1354                options,
1355                desc,
1356                partial_name,
1357                unresolved_source_name,
1358                resolved_source_name,
1359                storage_configuration,
1360            )
1361            .await
1362        }
1363        AlterSourceAction::RefreshReferences => {
1364            purify_alter_source_refresh_references(
1365                desc,
1366                resolved_source_name,
1367                storage_configuration,
1368            )
1369            .await
1370        }
1371        _ => Ok(PurifiedStatement::PurifiedAlterSource {
1372            alter_source_stmt: AlterSourceStatement {
1373                source_name: unresolved_source_name,
1374                action,
1375                if_exists,
1376            },
1377        }),
1378    }
1379}
1380
1381// TODO(database-issues#8620): Remove once subsources are removed
1382/// Equivalent to `purify_create_source` but for `AlterSourceStatement`.
1383async fn purify_alter_source_add_subsources(
1384    external_references: Vec<ExternalReferenceExport>,
1385    mut options: Vec<AlterSourceAddSubsourceOption<Aug>>,
1386    desc: SourceDesc,
1387    partial_source_name: PartialItemName,
1388    unresolved_source_name: UnresolvedItemName,
1389    resolved_source_name: ResolvedItemName,
1390    storage_configuration: &StorageConfiguration,
1391) -> Result<PurifiedStatement, PlanError> {
1392    // Validate this is a source that can have subsources added.
1393    let connection_id = match &desc.connection {
1394        GenericSourceConnection::Postgres(c) => c.connection_id,
1395        GenericSourceConnection::MySql(c) => c.connection_id,
1396        GenericSourceConnection::SqlServer(c) => c.connection_id,
1397        _ => sql_bail!(
1398            "source {} does not support ALTER SOURCE.",
1399            partial_source_name
1400        ),
1401    };
1402
1403    let crate::plan::statement::ddl::AlterSourceAddSubsourceOptionExtracted {
1404        text_columns,
1405        exclude_columns,
1406        details,
1407        seen: _,
1408    } = options.clone().try_into()?;
1409    assert_none!(details, "details cannot be explicitly set");
1410
1411    let mut requested_subsource_map = BTreeMap::new();
1412
1413    match desc.connection {
1414        GenericSourceConnection::Postgres(pg_source_connection) => {
1415            // Get PostgresConnection for generating subsources.
1416            let pg_connection = &pg_source_connection.connection;
1417
1418            let client = pg_connection
1419                .validate(connection_id, storage_configuration)
1420                .await?;
1421
1422            let reference_client = SourceReferenceClient::Postgres {
1423                client: &client,
1424                publication: &pg_source_connection.publication,
1425                database: &pg_connection.database,
1426            };
1427            let retrieved_source_references = reference_client.get_source_references().await?;
1428
1429            let postgres::PurifiedSourceExports {
1430                source_exports: subsources,
1431                normalized_text_columns,
1432            } = postgres::purify_source_exports(
1433                &client,
1434                &retrieved_source_references,
1435                &Some(ExternalReferences::SubsetTables(external_references)),
1436                text_columns,
1437                exclude_columns,
1438                &unresolved_source_name,
1439                &SourceReferencePolicy::Required,
1440            )
1441            .await?;
1442
1443            if let Some(text_cols_option) = options
1444                .iter_mut()
1445                .find(|option| option.name == AlterSourceAddSubsourceOptionName::TextColumns)
1446            {
1447                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
1448            }
1449
1450            requested_subsource_map.extend(subsources);
1451        }
1452        GenericSourceConnection::MySql(mysql_source_connection) => {
1453            let mysql_connection = &mysql_source_connection.connection;
1454            let config = mysql_connection
1455                .config(
1456                    &storage_configuration.connection_context.secrets_reader,
1457                    storage_configuration,
1458                    InTask::No,
1459                )
1460                .await?;
1461
1462            let mut conn = config
1463                .connect(
1464                    "mysql purification",
1465                    &storage_configuration.connection_context.ssh_tunnel_manager,
1466                )
1467                .await?;
1468
1469            // Retrieve the current @gtid_executed value of the server to mark as the effective
1470            // initial snapshot point for these subsources.
1471            let initial_gtid_set =
1472                mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?;
1473
1474            let requested_references = Some(ExternalReferences::SubsetTables(external_references));
1475
1476            let reference_client = SourceReferenceClient::MySql {
1477                conn: &mut conn,
1478                include_system_schemas: mysql::references_system_schemas(&requested_references),
1479            };
1480            let retrieved_source_references = reference_client.get_source_references().await?;
1481
1482            let mysql::PurifiedSourceExports {
1483                source_exports: subsources,
1484                normalized_text_columns,
1485                normalized_exclude_columns,
1486            } = mysql::purify_source_exports(
1487                &mut conn,
1488                &retrieved_source_references,
1489                &requested_references,
1490                text_columns,
1491                exclude_columns,
1492                &unresolved_source_name,
1493                initial_gtid_set,
1494                &SourceReferencePolicy::Required,
1495            )
1496            .await?;
1497            requested_subsource_map.extend(subsources);
1498
1499            // Update options with the purified details
1500            if let Some(text_cols_option) = options
1501                .iter_mut()
1502                .find(|option| option.name == AlterSourceAddSubsourceOptionName::TextColumns)
1503            {
1504                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
1505            }
1506            if let Some(exclude_cols_option) = options
1507                .iter_mut()
1508                .find(|option| option.name == AlterSourceAddSubsourceOptionName::ExcludeColumns)
1509            {
1510                exclude_cols_option.value =
1511                    Some(WithOptionValue::Sequence(normalized_exclude_columns));
1512            }
1513        }
1514        GenericSourceConnection::SqlServer(sql_server_source) => {
1515            // Open a connection to the upstream SQL Server instance.
1516            let sql_server_connection = &sql_server_source.connection;
1517            let config = sql_server_connection
1518                .resolve_config(
1519                    &storage_configuration.connection_context.secrets_reader,
1520                    storage_configuration,
1521                    InTask::No,
1522                )
1523                .await?;
1524            let mut client = mz_sql_server_util::Client::connect(config).await?;
1525
1526            // Query the upstream SQL Server instance for available tables to replicate.
1527            let database = sql_server_connection.database.clone().into();
1528            let source_references = SourceReferenceClient::SqlServer {
1529                client: &mut client,
1530                database: Arc::clone(&database),
1531            }
1532            .get_source_references()
1533            .await?;
1534            let requested_references = Some(ExternalReferences::SubsetTables(external_references));
1535
1536            let timeout = mz_storage_types::sources::sql_server::MAX_LSN_WAIT
1537                .get(storage_configuration.config_set());
1538
1539            let result = sql_server::purify_source_exports(
1540                &*database,
1541                &mut client,
1542                &source_references,
1543                &requested_references,
1544                &text_columns,
1545                &exclude_columns,
1546                &unresolved_source_name,
1547                timeout,
1548                &SourceReferencePolicy::Required,
1549            )
1550            .await;
1551            let sql_server::PurifiedSourceExports {
1552                source_exports,
1553                normalized_text_columns,
1554                normalized_excl_columns,
1555            } = result?;
1556
1557            // Add the new exports to our subsource map.
1558            requested_subsource_map.extend(source_exports);
1559
1560            // Update options on the CREATE SOURCE statement with the purified details.
1561            if let Some(text_cols_option) = options
1562                .iter_mut()
1563                .find(|option| option.name == AlterSourceAddSubsourceOptionName::TextColumns)
1564            {
1565                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
1566            }
1567            if let Some(exclude_cols_option) = options
1568                .iter_mut()
1569                .find(|option| option.name == AlterSourceAddSubsourceOptionName::ExcludeColumns)
1570            {
1571                exclude_cols_option.value =
1572                    Some(WithOptionValue::Sequence(normalized_excl_columns));
1573            }
1574        }
1575        _ => unreachable!(),
1576    };
1577
1578    Ok(PurifiedStatement::PurifiedAlterSourceAddSubsources {
1579        source_name: resolved_source_name,
1580        options,
1581        subsources: requested_subsource_map,
1582    })
1583}
1584
1585async fn purify_alter_source_refresh_references(
1586    desc: SourceDesc,
1587    resolved_source_name: ResolvedItemName,
1588    storage_configuration: &StorageConfiguration,
1589) -> Result<PurifiedStatement, PlanError> {
1590    let retrieved_source_references = match desc.connection {
1591        GenericSourceConnection::Postgres(pg_source_connection) => {
1592            // Get PostgresConnection for generating subsources.
1593            let pg_connection = &pg_source_connection.connection;
1594
1595            let config = pg_connection
1596                .config(
1597                    &storage_configuration.connection_context.secrets_reader,
1598                    storage_configuration,
1599                    InTask::No,
1600                )
1601                .await?;
1602
1603            let client = config
1604                .connect(
1605                    "postgres_purification",
1606                    &storage_configuration.connection_context.ssh_tunnel_manager,
1607                )
1608                .await?;
1609            let reference_client = SourceReferenceClient::Postgres {
1610                client: &client,
1611                publication: &pg_source_connection.publication,
1612                database: &pg_connection.database,
1613            };
1614            reference_client.get_source_references().await?
1615        }
1616        GenericSourceConnection::MySql(mysql_source_connection) => {
1617            let mysql_connection = &mysql_source_connection.connection;
1618            let config = mysql_connection
1619                .config(
1620                    &storage_configuration.connection_context.secrets_reader,
1621                    storage_configuration,
1622                    InTask::No,
1623                )
1624                .await?;
1625
1626            let mut conn = config
1627                .connect(
1628                    "mysql purification",
1629                    &storage_configuration.connection_context.ssh_tunnel_manager,
1630                )
1631                .await?;
1632
1633            let reference_client = SourceReferenceClient::MySql {
1634                conn: &mut conn,
1635                include_system_schemas: false,
1636            };
1637            reference_client.get_source_references().await?
1638        }
1639        GenericSourceConnection::SqlServer(sql_server_source) => {
1640            // Open a connection to the upstream SQL Server instance.
1641            let sql_server_connection = &sql_server_source.connection;
1642            let config = sql_server_connection
1643                .resolve_config(
1644                    &storage_configuration.connection_context.secrets_reader,
1645                    storage_configuration,
1646                    InTask::No,
1647                )
1648                .await?;
1649            let mut client = mz_sql_server_util::Client::connect(config).await?;
1650
1651            // Query the upstream SQL Server instance for available tables to replicate.
1652            let source_references = SourceReferenceClient::SqlServer {
1653                client: &mut client,
1654                database: sql_server_connection.database.clone().into(),
1655            }
1656            .get_source_references()
1657            .await?;
1658            source_references
1659        }
1660        GenericSourceConnection::LoadGenerator(load_gen_connection) => {
1661            let reference_client = SourceReferenceClient::LoadGenerator {
1662                generator: &load_gen_connection.load_generator,
1663            };
1664            reference_client.get_source_references().await?
1665        }
1666        GenericSourceConnection::Kafka(kafka_conn) => {
1667            let reference_client = SourceReferenceClient::Kafka {
1668                topic: &kafka_conn.topic,
1669            };
1670            reference_client.get_source_references().await?
1671        }
1672    };
1673    Ok(PurifiedStatement::PurifiedAlterSourceRefreshReferences {
1674        source_name: resolved_source_name,
1675        available_source_references: retrieved_source_references.available_source_references(),
1676    })
1677}
1678
1679async fn purify_create_table_from_source(
1680    catalog: impl SessionCatalog,
1681    mut stmt: CreateTableFromSourceStatement<Aug>,
1682    storage_configuration: &StorageConfiguration,
1683) -> Result<PurifiedStatement, PlanError> {
1684    let scx = StatementContext::new(None, &catalog);
1685    let CreateTableFromSourceStatement {
1686        name: _,
1687        columns,
1688        constraints,
1689        source: source_name,
1690        if_not_exists: _,
1691        external_reference,
1692        format,
1693        envelope,
1694        include_metadata: _,
1695        with_options,
1696    } = &mut stmt;
1697
1698    // Columns and constraints cannot be specified by the user but will be populated below.
1699    if matches!(columns, TableFromSourceColumns::Defined(_)) {
1700        sql_bail!("CREATE TABLE .. FROM SOURCE column definitions cannot be specified directly");
1701    }
1702    if !constraints.is_empty() {
1703        sql_bail!(
1704            "CREATE TABLE .. FROM SOURCE constraint definitions cannot be specified directly"
1705        );
1706    }
1707
1708    // Get the source item
1709    let item = match scx.get_item_by_resolved_name(source_name) {
1710        Ok(item) => item,
1711        Err(e) => return Err(e),
1712    };
1713
1714    // Ensure it's an ingestion-based and alterable source.
1715    let desc = match item.source_desc()? {
1716        Some(desc) => desc.clone().into_inline_connection(scx.catalog),
1717        None => {
1718            sql_bail!("cannot ALTER this type of source")
1719        }
1720    };
1721    let unresolved_source_name: UnresolvedItemName = source_name.full_item_name().clone().into();
1722
1723    let crate::plan::statement::ddl::TableFromSourceOptionExtracted {
1724        text_columns,
1725        exclude_columns,
1726        retain_history: _,
1727        details,
1728        partition_by: _,
1729        seen: _,
1730    } = with_options.clone().try_into()?;
1731    assert_none!(details, "details cannot be explicitly set");
1732
1733    // Our text column values are unqualified (just column names), but the purification methods below
1734    // expect to match the fully-qualified names against the full set of tables in upstream, so we
1735    // need to qualify them using the external reference first.
1736    let qualified_text_columns = text_columns
1737        .iter()
1738        .map(|col| {
1739            UnresolvedItemName(
1740                external_reference
1741                    .as_ref()
1742                    .map(|er| er.0.iter().chain_one(col).map(|i| i.clone()).collect())
1743                    .unwrap_or_else(|| vec![col.clone()]),
1744            )
1745        })
1746        .collect_vec();
1747    let qualified_exclude_columns = exclude_columns
1748        .iter()
1749        .map(|col| {
1750            UnresolvedItemName(
1751                external_reference
1752                    .as_ref()
1753                    .map(|er| er.0.iter().chain_one(col).map(|i| i.clone()).collect())
1754                    .unwrap_or_else(|| vec![col.clone()]),
1755            )
1756        })
1757        .collect_vec();
1758
1759    // Should be overriden below if a source-specific format is required.
1760    let mut format_options = SourceFormatOptions::Default;
1761
1762    let retrieved_source_references: RetrievedSourceReferences;
1763
1764    let requested_references = external_reference.as_ref().map(|ref_name| {
1765        ExternalReferences::SubsetTables(vec![ExternalReferenceExport {
1766            reference: ref_name.clone(),
1767            alias: None,
1768        }])
1769    });
1770
1771    // Run purification work specific to each source type: resolve the external reference to
1772    // a fully qualified name and obtain the appropriate details for the source-export statement
1773    let purified_export = match desc.connection {
1774        GenericSourceConnection::Postgres(pg_source_connection) => {
1775            // Get PostgresConnection for generating subsources.
1776            let pg_connection = &pg_source_connection.connection;
1777
1778            let client = pg_connection
1779                .validate(pg_source_connection.connection_id, storage_configuration)
1780                .await?;
1781
1782            let reference_client = SourceReferenceClient::Postgres {
1783                client: &client,
1784                publication: &pg_source_connection.publication,
1785                database: &pg_connection.database,
1786            };
1787            retrieved_source_references = reference_client.get_source_references().await?;
1788
1789            let postgres::PurifiedSourceExports {
1790                source_exports,
1791                // TODO(database-issues#8620): Remove once subsources are removed
1792                // This `normalized_text_columns` is not relevant for us and is only returned for
1793                // `CREATE SOURCE` statements that automatically generate subsources
1794                normalized_text_columns: _,
1795            } = postgres::purify_source_exports(
1796                &client,
1797                &retrieved_source_references,
1798                &requested_references,
1799                qualified_text_columns,
1800                qualified_exclude_columns,
1801                &unresolved_source_name,
1802                &SourceReferencePolicy::Required,
1803            )
1804            .await?;
1805            // There should be exactly one source_export returned for this statement
1806            let (_, purified_export) = source_exports.into_element();
1807            purified_export
1808        }
1809        GenericSourceConnection::MySql(mysql_source_connection) => {
1810            let mysql_connection = &mysql_source_connection.connection;
1811            let config = mysql_connection
1812                .config(
1813                    &storage_configuration.connection_context.secrets_reader,
1814                    storage_configuration,
1815                    InTask::No,
1816                )
1817                .await?;
1818
1819            let mut conn = config
1820                .connect(
1821                    "mysql purification",
1822                    &storage_configuration.connection_context.ssh_tunnel_manager,
1823                )
1824                .await?;
1825
1826            // Retrieve the current @gtid_executed value of the server to mark as the effective
1827            // initial snapshot point for this table.
1828            let initial_gtid_set =
1829                mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?;
1830
1831            let reference_client = SourceReferenceClient::MySql {
1832                conn: &mut conn,
1833                include_system_schemas: mysql::references_system_schemas(&requested_references),
1834            };
1835            retrieved_source_references = reference_client.get_source_references().await?;
1836
1837            let mysql::PurifiedSourceExports {
1838                source_exports,
1839                // TODO(database-issues#8620): Remove once subsources are removed
1840                // `normalized_text/exclude_columns` is not relevant for us and is only returned for
1841                // `CREATE SOURCE` statements that automatically generate subsources
1842                normalized_text_columns: _,
1843                normalized_exclude_columns: _,
1844            } = mysql::purify_source_exports(
1845                &mut conn,
1846                &retrieved_source_references,
1847                &requested_references,
1848                qualified_text_columns,
1849                qualified_exclude_columns,
1850                &unresolved_source_name,
1851                initial_gtid_set,
1852                &SourceReferencePolicy::Required,
1853            )
1854            .await?;
1855            // There should be exactly one source_export returned for this statement
1856            let (_, purified_export) = source_exports.into_element();
1857            purified_export
1858        }
1859        GenericSourceConnection::SqlServer(sql_server_source) => {
1860            let connection = sql_server_source.connection;
1861            let config = connection
1862                .resolve_config(
1863                    &storage_configuration.connection_context.secrets_reader,
1864                    storage_configuration,
1865                    InTask::No,
1866                )
1867                .await?;
1868            let mut client = mz_sql_server_util::Client::connect(config).await?;
1869
1870            let database: Arc<str> = connection.database.into();
1871            let reference_client = SourceReferenceClient::SqlServer {
1872                client: &mut client,
1873                database: Arc::clone(&database),
1874            };
1875            retrieved_source_references = reference_client.get_source_references().await?;
1876            tracing::debug!(?retrieved_source_references, "got source references");
1877
1878            let timeout = mz_storage_types::sources::sql_server::MAX_LSN_WAIT
1879                .get(storage_configuration.config_set());
1880
1881            let purified_source_exports = sql_server::purify_source_exports(
1882                &*database,
1883                &mut client,
1884                &retrieved_source_references,
1885                &requested_references,
1886                &qualified_text_columns,
1887                &qualified_exclude_columns,
1888                &unresolved_source_name,
1889                timeout,
1890                &SourceReferencePolicy::Required,
1891            )
1892            .await?;
1893
1894            // There should be exactly one source_export returned for this statement
1895            let (_, purified_export) = purified_source_exports.source_exports.into_element();
1896            purified_export
1897        }
1898        GenericSourceConnection::LoadGenerator(load_gen_connection) => {
1899            let reference_client = SourceReferenceClient::LoadGenerator {
1900                generator: &load_gen_connection.load_generator,
1901            };
1902            retrieved_source_references = reference_client.get_source_references().await?;
1903
1904            let requested_exports = retrieved_source_references
1905                .requested_source_exports(requested_references.as_ref(), &unresolved_source_name)?;
1906            // There should be exactly one source_export returned
1907            let export = requested_exports.into_element();
1908            PurifiedSourceExport {
1909                external_reference: export.external_reference,
1910                details: PurifiedExportDetails::LoadGenerator {
1911                    table: export
1912                        .meta
1913                        .load_generator_desc()
1914                        .expect("is loadgen")
1915                        .clone(),
1916                    output: export
1917                        .meta
1918                        .load_generator_output()
1919                        .expect("is loadgen")
1920                        .clone(),
1921                },
1922            }
1923        }
1924        GenericSourceConnection::Kafka(kafka_conn) => {
1925            let reference_client = SourceReferenceClient::Kafka {
1926                topic: &kafka_conn.topic,
1927            };
1928            retrieved_source_references = reference_client.get_source_references().await?;
1929            let requested_exports = retrieved_source_references
1930                .requested_source_exports(requested_references.as_ref(), &unresolved_source_name)?;
1931            // There should be exactly one source_export returned
1932            let export = requested_exports.into_element();
1933
1934            format_options = SourceFormatOptions::Kafka {
1935                topic: kafka_conn.topic.clone(),
1936            };
1937            PurifiedSourceExport {
1938                external_reference: export.external_reference,
1939                details: PurifiedExportDetails::Kafka {},
1940            }
1941        }
1942    };
1943
1944    purify_source_format(
1945        &catalog,
1946        format,
1947        &format_options,
1948        envelope,
1949        storage_configuration,
1950    )
1951    .await?;
1952
1953    // Update the external reference in the statement to the resolved fully-qualified
1954    // external reference
1955    *external_reference = Some(purified_export.external_reference.clone());
1956
1957    // Update options in the statement using the purified export details
1958    match &purified_export.details {
1959        PurifiedExportDetails::Postgres { .. } => {
1960            let mut unsupported_cols = vec![];
1961            let postgres::PostgresExportStatementValues {
1962                columns: gen_columns,
1963                constraints: gen_constraints,
1964                text_columns: gen_text_columns,
1965                exclude_columns: gen_exclude_columns,
1966                details: gen_details,
1967                external_reference: _,
1968            } = postgres::generate_source_export_statement_values(
1969                &scx,
1970                purified_export,
1971                &mut unsupported_cols,
1972            )?;
1973            if !unsupported_cols.is_empty() {
1974                unsupported_cols.sort();
1975                Err(PgSourcePurificationError::UnrecognizedTypes {
1976                    cols: unsupported_cols,
1977                })?;
1978            }
1979
1980            if let Some(text_cols_option) = with_options
1981                .iter_mut()
1982                .find(|option| option.name == TableFromSourceOptionName::TextColumns)
1983            {
1984                match gen_text_columns {
1985                    Some(gen_text_columns) => {
1986                        text_cols_option.value = Some(WithOptionValue::Sequence(gen_text_columns))
1987                    }
1988                    None => soft_panic_or_log!(
1989                        "text_columns should be Some if text_cols_option is present"
1990                    ),
1991                }
1992            }
1993            if let Some(exclude_cols_option) = with_options
1994                .iter_mut()
1995                .find(|option| option.name == TableFromSourceOptionName::ExcludeColumns)
1996            {
1997                match gen_exclude_columns {
1998                    Some(gen_exclude_columns) => {
1999                        exclude_cols_option.value =
2000                            Some(WithOptionValue::Sequence(gen_exclude_columns))
2001                    }
2002                    None => soft_panic_or_log!(
2003                        "exclude_columns should be Some if exclude_cols_option is present"
2004                    ),
2005                }
2006            }
2007            match columns {
2008                TableFromSourceColumns::Defined(_) => unreachable!(),
2009                TableFromSourceColumns::NotSpecified => {
2010                    *columns = TableFromSourceColumns::Defined(gen_columns);
2011                    *constraints = gen_constraints;
2012                }
2013                TableFromSourceColumns::Named(_) => {
2014                    sql_bail!("columns cannot be named for Postgres sources")
2015                }
2016            }
2017            with_options.push(TableFromSourceOption {
2018                name: TableFromSourceOptionName::Details,
2019                value: Some(WithOptionValue::Value(Value::String(hex::encode(
2020                    gen_details.into_proto().encode_to_vec(),
2021                )))),
2022            })
2023        }
2024        PurifiedExportDetails::MySql { .. } => {
2025            let mysql::MySqlExportStatementValues {
2026                columns: gen_columns,
2027                constraints: gen_constraints,
2028                text_columns: gen_text_columns,
2029                exclude_columns: gen_exclude_columns,
2030                details: gen_details,
2031                external_reference: _,
2032            } = mysql::generate_source_export_statement_values(&scx, purified_export)?;
2033
2034            if let Some(text_cols_option) = with_options
2035                .iter_mut()
2036                .find(|option| option.name == TableFromSourceOptionName::TextColumns)
2037            {
2038                match gen_text_columns {
2039                    Some(gen_text_columns) => {
2040                        text_cols_option.value = Some(WithOptionValue::Sequence(gen_text_columns))
2041                    }
2042                    None => soft_panic_or_log!(
2043                        "text_columns should be Some if text_cols_option is present"
2044                    ),
2045                }
2046            }
2047            if let Some(exclude_cols_option) = with_options
2048                .iter_mut()
2049                .find(|option| option.name == TableFromSourceOptionName::ExcludeColumns)
2050            {
2051                match gen_exclude_columns {
2052                    Some(gen_exclude_columns) => {
2053                        exclude_cols_option.value =
2054                            Some(WithOptionValue::Sequence(gen_exclude_columns))
2055                    }
2056                    None => soft_panic_or_log!(
2057                        "exclude_columns should be Some if exclude_cols_option is present"
2058                    ),
2059                }
2060            }
2061            match columns {
2062                TableFromSourceColumns::Defined(_) => unreachable!(),
2063                TableFromSourceColumns::NotSpecified => {
2064                    *columns = TableFromSourceColumns::Defined(gen_columns);
2065                    *constraints = gen_constraints;
2066                }
2067                TableFromSourceColumns::Named(_) => {
2068                    sql_bail!("columns cannot be named for MySQL sources")
2069                }
2070            }
2071            with_options.push(TableFromSourceOption {
2072                name: TableFromSourceOptionName::Details,
2073                value: Some(WithOptionValue::Value(Value::String(hex::encode(
2074                    gen_details.into_proto().encode_to_vec(),
2075                )))),
2076            })
2077        }
2078        PurifiedExportDetails::SqlServer { .. } => {
2079            let sql_server::SqlServerExportStatementValues {
2080                columns: gen_columns,
2081                constraints: gen_constraints,
2082                text_columns: gen_text_columns,
2083                excl_columns: gen_excl_columns,
2084                details: gen_details,
2085                external_reference: _,
2086            } = sql_server::generate_source_export_statement_values(&scx, purified_export)?;
2087
2088            if let Some(text_cols_option) = with_options
2089                .iter_mut()
2090                .find(|opt| opt.name == TableFromSourceOptionName::TextColumns)
2091            {
2092                match gen_text_columns {
2093                    Some(gen_text_columns) => {
2094                        text_cols_option.value = Some(WithOptionValue::Sequence(gen_text_columns))
2095                    }
2096                    None => soft_panic_or_log!(
2097                        "text_columns should be Some if text_cols_option is present"
2098                    ),
2099                }
2100            }
2101            if let Some(exclude_cols_option) = with_options
2102                .iter_mut()
2103                .find(|opt| opt.name == TableFromSourceOptionName::ExcludeColumns)
2104            {
2105                match gen_excl_columns {
2106                    Some(gen_excl_columns) => {
2107                        exclude_cols_option.value =
2108                            Some(WithOptionValue::Sequence(gen_excl_columns))
2109                    }
2110                    None => soft_panic_or_log!(
2111                        "excl_columns should be Some if excl_cols_option is present"
2112                    ),
2113                }
2114            }
2115
2116            match columns {
2117                TableFromSourceColumns::NotSpecified => {
2118                    *columns = TableFromSourceColumns::Defined(gen_columns);
2119                    *constraints = gen_constraints;
2120                }
2121                TableFromSourceColumns::Named(_) => {
2122                    sql_bail!("columns cannot be named for SQL Server sources")
2123                }
2124                TableFromSourceColumns::Defined(_) => unreachable!(),
2125            }
2126
2127            with_options.push(TableFromSourceOption {
2128                name: TableFromSourceOptionName::Details,
2129                value: Some(WithOptionValue::Value(Value::String(hex::encode(
2130                    gen_details.into_proto().encode_to_vec(),
2131                )))),
2132            })
2133        }
2134        PurifiedExportDetails::LoadGenerator { .. } => {
2135            let (desc, output) = match purified_export.details {
2136                PurifiedExportDetails::LoadGenerator { table, output } => (table, output),
2137                _ => unreachable!("purified export details must be load generator"),
2138            };
2139            // We only determine the table description for multi-output load generator sources here,
2140            // whereas single-output load generators will have their relation description
2141            // determined during statment planning as envelope and format options may affect their
2142            // schema.
2143            if let Some(desc) = desc {
2144                let (gen_columns, gen_constraints) = scx.relation_desc_into_table_defs(&desc)?;
2145                match columns {
2146                    TableFromSourceColumns::Defined(_) => unreachable!(),
2147                    TableFromSourceColumns::NotSpecified => {
2148                        *columns = TableFromSourceColumns::Defined(gen_columns);
2149                        *constraints = gen_constraints;
2150                    }
2151                    TableFromSourceColumns::Named(_) => {
2152                        sql_bail!("columns cannot be named for multi-output load generator sources")
2153                    }
2154                }
2155            }
2156            let details = SourceExportStatementDetails::LoadGenerator { output };
2157            with_options.push(TableFromSourceOption {
2158                name: TableFromSourceOptionName::Details,
2159                value: Some(WithOptionValue::Value(Value::String(hex::encode(
2160                    details.into_proto().encode_to_vec(),
2161                )))),
2162            })
2163        }
2164        PurifiedExportDetails::Kafka {} => {
2165            // NOTE: Kafka tables have their 'schemas' purified into the statement inside the
2166            // format field, so we don't specify any columns or constraints to be stored
2167            // on the statement here. The RelationDesc will be determined during planning.
2168            let details = SourceExportStatementDetails::Kafka {};
2169            with_options.push(TableFromSourceOption {
2170                name: TableFromSourceOptionName::Details,
2171                value: Some(WithOptionValue::Value(Value::String(hex::encode(
2172                    details.into_proto().encode_to_vec(),
2173                )))),
2174            })
2175        }
2176    };
2177
2178    // TODO: We might as well use the retrieved available references to update the source
2179    // available references table in the catalog, so plumb this through.
2180    // available_source_references: retrieved_source_references.available_source_references(),
2181    Ok(PurifiedStatement::PurifiedCreateTableFromSource { stmt })
2182}
2183
/// Source-type-specific context needed when purifying a source's `FORMAT` clause.
enum SourceFormatOptions {
    /// No Kafka-specific context. With this variant, `KEY`/`VALUE` formats and
    /// Confluent Schema Registry formats are rejected during purification.
    Default,
    /// A Kafka source. `topic` is used to derive the `<topic>-key` and
    /// `<topic>-value` schema registry subject names.
    Kafka { topic: String },
}
2188
2189async fn purify_source_format(
2190    catalog: &dyn SessionCatalog,
2191    format: &mut Option<FormatSpecifier<Aug>>,
2192    options: &SourceFormatOptions,
2193    envelope: &Option<SourceEnvelope>,
2194    storage_configuration: &StorageConfiguration,
2195) -> Result<(), PlanError> {
2196    if matches!(format, Some(FormatSpecifier::KeyValue { .. }))
2197        && !matches!(options, SourceFormatOptions::Kafka { .. })
2198    {
2199        sql_bail!("Kafka sources are the only source type that can provide KEY/VALUE formats")
2200    }
2201
2202    match format.as_mut() {
2203        None => {}
2204        Some(FormatSpecifier::Bare(format)) => {
2205            purify_source_format_single(catalog, format, options, envelope, storage_configuration)
2206                .await?;
2207        }
2208
2209        Some(FormatSpecifier::KeyValue { key, value: val }) => {
2210            purify_source_format_single(catalog, key, options, envelope, storage_configuration)
2211                .await?;
2212            purify_source_format_single(catalog, val, options, envelope, storage_configuration)
2213                .await?;
2214        }
2215    }
2216    Ok(())
2217}
2218
2219async fn purify_source_format_single(
2220    catalog: &dyn SessionCatalog,
2221    format: &mut Format<Aug>,
2222    options: &SourceFormatOptions,
2223    envelope: &Option<SourceEnvelope>,
2224    storage_configuration: &StorageConfiguration,
2225) -> Result<(), PlanError> {
2226    match format {
2227        Format::Avro(schema) => match schema {
2228            AvroSchema::Csr { csr_connection } => {
2229                purify_csr_connection_avro(
2230                    catalog,
2231                    options,
2232                    csr_connection,
2233                    envelope,
2234                    storage_configuration,
2235                )
2236                .await?
2237            }
2238            AvroSchema::InlineSchema { .. } => {}
2239        },
2240        Format::Protobuf(schema) => match schema {
2241            ProtobufSchema::Csr { csr_connection } => {
2242                purify_csr_connection_proto(
2243                    catalog,
2244                    options,
2245                    csr_connection,
2246                    envelope,
2247                    storage_configuration,
2248                )
2249                .await?;
2250            }
2251            ProtobufSchema::InlineSchema { .. } => {}
2252        },
2253        Format::Bytes
2254        | Format::Regex(_)
2255        | Format::Json { .. }
2256        | Format::Text
2257        | Format::Csv { .. } => (),
2258    }
2259    Ok(())
2260}
2261
2262pub fn generate_subsource_statements(
2263    scx: &StatementContext,
2264    source_name: ResolvedItemName,
2265    subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
2266) -> Result<Vec<CreateSubsourceStatement<Aug>>, PlanError> {
2267    // get the first subsource to determine the connection type
2268    if subsources.is_empty() {
2269        return Ok(vec![]);
2270    }
2271    let (_, purified_export) = subsources.iter().next().unwrap();
2272
2273    let statements = match &purified_export.details {
2274        PurifiedExportDetails::Postgres { .. } => {
2275            crate::pure::postgres::generate_create_subsource_statements(
2276                scx,
2277                source_name,
2278                subsources,
2279            )?
2280        }
2281        PurifiedExportDetails::MySql { .. } => {
2282            crate::pure::mysql::generate_create_subsource_statements(scx, source_name, subsources)?
2283        }
2284        PurifiedExportDetails::SqlServer { .. } => {
2285            crate::pure::sql_server::generate_create_subsource_statements(
2286                scx,
2287                source_name,
2288                subsources,
2289            )?
2290        }
2291        PurifiedExportDetails::LoadGenerator { .. } => {
2292            let mut subsource_stmts = Vec::with_capacity(subsources.len());
2293            for (subsource_name, purified_export) in subsources {
2294                let (desc, output) = match purified_export.details {
2295                    PurifiedExportDetails::LoadGenerator { table, output } => (table, output),
2296                    _ => unreachable!("purified export details must be load generator"),
2297                };
2298                let desc =
2299                    desc.expect("subsources cannot be generated for single-output load generators");
2300
2301                let (columns, table_constraints) = scx.relation_desc_into_table_defs(&desc)?;
2302                let details = SourceExportStatementDetails::LoadGenerator { output };
2303                // Create the subsource statement
2304                let subsource = CreateSubsourceStatement {
2305                    name: subsource_name,
2306                    columns,
2307                    of_source: Some(source_name.clone()),
2308                    // unlike sources that come from an external upstream, we
2309                    // have more leniency to introduce different constraints
2310                    // every time the load generator is run; i.e. we are not as
2311                    // worried about introducing junk data.
2312                    constraints: table_constraints,
2313                    if_not_exists: false,
2314                    with_options: vec![
2315                        CreateSubsourceOption {
2316                            name: CreateSubsourceOptionName::ExternalReference,
2317                            value: Some(WithOptionValue::UnresolvedItemName(
2318                                purified_export.external_reference,
2319                            )),
2320                        },
2321                        CreateSubsourceOption {
2322                            name: CreateSubsourceOptionName::Details,
2323                            value: Some(WithOptionValue::Value(Value::String(hex::encode(
2324                                details.into_proto().encode_to_vec(),
2325                            )))),
2326                        },
2327                    ],
2328                };
2329                subsource_stmts.push(subsource);
2330            }
2331
2332            subsource_stmts
2333        }
2334        PurifiedExportDetails::Kafka { .. } => {
2335            // TODO: as part of database-issues#8322, Kafka sources will begin
2336            // producing data––we'll need to understand the schema
2337            // of the output here.
2338            assert!(
2339                subsources.is_empty(),
2340                "Kafka sources do not produce data-bearing subsources"
2341            );
2342            vec![]
2343        }
2344    };
2345    Ok(statements)
2346}
2347
2348async fn purify_csr_connection_proto(
2349    catalog: &dyn SessionCatalog,
2350    options: &SourceFormatOptions,
2351    csr_connection: &mut CsrConnectionProtobuf<Aug>,
2352    envelope: &Option<SourceEnvelope>,
2353    storage_configuration: &StorageConfiguration,
2354) -> Result<(), PlanError> {
2355    let SourceFormatOptions::Kafka { topic } = options else {
2356        sql_bail!("Confluent Schema Registry is only supported with Kafka sources")
2357    };
2358
2359    let CsrConnectionProtobuf {
2360        seed,
2361        connection: CsrConnection {
2362            connection,
2363            options: _,
2364        },
2365    } = csr_connection;
2366    match seed {
2367        None => {
2368            let scx = StatementContext::new(None, &*catalog);
2369
2370            let ccsr_connection = match scx.get_item_by_resolved_name(connection)?.connection()? {
2371                Connection::Csr(connection) => connection.clone().into_inline_connection(catalog),
2372                _ => sql_bail!("{} is not a schema registry connection", connection),
2373            };
2374
2375            let ccsr_client = ccsr_connection
2376                .connect(storage_configuration, InTask::No)
2377                .await
2378                .map_err(|e| CsrPurificationError::ClientError(Arc::new(e)))?;
2379
2380            let value = compile_proto(&format!("{}-value", topic), &ccsr_client).await?;
2381            let key = compile_proto(&format!("{}-key", topic), &ccsr_client)
2382                .await
2383                .ok();
2384
2385            if matches!(envelope, Some(SourceEnvelope::Debezium)) && key.is_none() {
2386                sql_bail!("Key schema is required for ENVELOPE DEBEZIUM");
2387            }
2388
2389            *seed = Some(CsrSeedProtobuf { value, key });
2390        }
2391        Some(_) => (),
2392    }
2393
2394    Ok(())
2395}
2396
2397async fn purify_csr_connection_avro(
2398    catalog: &dyn SessionCatalog,
2399    options: &SourceFormatOptions,
2400    csr_connection: &mut CsrConnectionAvro<Aug>,
2401    envelope: &Option<SourceEnvelope>,
2402    storage_configuration: &StorageConfiguration,
2403) -> Result<(), PlanError> {
2404    let SourceFormatOptions::Kafka { topic } = options else {
2405        sql_bail!("Confluent Schema Registry is only supported with Kafka sources")
2406    };
2407
2408    let CsrConnectionAvro {
2409        connection: CsrConnection { connection, .. },
2410        seed,
2411        key_strategy,
2412        value_strategy,
2413    } = csr_connection;
2414    if seed.is_none() {
2415        let scx = StatementContext::new(None, &*catalog);
2416        let csr_connection = match scx.get_item_by_resolved_name(connection)?.connection()? {
2417            Connection::Csr(connection) => connection.clone().into_inline_connection(catalog),
2418            _ => sql_bail!("{} is not a schema registry connection", connection),
2419        };
2420        let ccsr_client = csr_connection
2421            .connect(storage_configuration, InTask::No)
2422            .await
2423            .map_err(|e| CsrPurificationError::ClientError(Arc::new(e)))?;
2424
2425        let Schema {
2426            key_schema,
2427            value_schema,
2428        } = get_remote_csr_schema(
2429            &ccsr_client,
2430            key_strategy.clone().unwrap_or_default(),
2431            value_strategy.clone().unwrap_or_default(),
2432            topic,
2433        )
2434        .await?;
2435        if matches!(envelope, Some(SourceEnvelope::Debezium)) && key_schema.is_none() {
2436            sql_bail!("Key schema is required for ENVELOPE DEBEZIUM");
2437        }
2438
2439        *seed = Some(CsrSeedAvro {
2440            key_schema,
2441            value_schema,
2442        })
2443    }
2444
2445    Ok(())
2446}
2447
/// Raw key and value schema text resolved for a topic by `get_remote_csr_schema`.
#[derive(Debug)]
pub struct Schema {
    /// The key schema, or `None` if none was found for the key subject.
    pub key_schema: Option<String>,
    /// The value schema. A value schema is mandatory, so this is always present.
    pub value_schema: String,
}
2453
2454async fn get_schema_with_strategy(
2455    client: &Client,
2456    strategy: ReaderSchemaSelectionStrategy,
2457    subject: &str,
2458) -> Result<Option<String>, PlanError> {
2459    match strategy {
2460        ReaderSchemaSelectionStrategy::Latest => {
2461            match client.get_schema_by_subject(subject).await {
2462                Ok(CcsrSchema { raw, .. }) => Ok(Some(raw)),
2463                Err(GetBySubjectError::SubjectNotFound)
2464                | Err(GetBySubjectError::VersionNotFound(_)) => Ok(None),
2465                Err(e) => Err(PlanError::FetchingCsrSchemaFailed {
2466                    schema_lookup: format!("subject {}", subject.quoted()),
2467                    cause: Arc::new(e),
2468                }),
2469            }
2470        }
2471        ReaderSchemaSelectionStrategy::Inline(raw) => Ok(Some(raw)),
2472        ReaderSchemaSelectionStrategy::ById(id) => match client.get_schema_by_id(id).await {
2473            Ok(CcsrSchema { raw, .. }) => Ok(Some(raw)),
2474            Err(GetByIdError::SchemaNotFound) => Ok(None),
2475            Err(e) => Err(PlanError::FetchingCsrSchemaFailed {
2476                schema_lookup: format!("ID {}", id),
2477                cause: Arc::new(e),
2478            }),
2479        },
2480    }
2481}
2482
2483async fn get_remote_csr_schema(
2484    ccsr_client: &mz_ccsr::Client,
2485    key_strategy: ReaderSchemaSelectionStrategy,
2486    value_strategy: ReaderSchemaSelectionStrategy,
2487    topic: &str,
2488) -> Result<Schema, PlanError> {
2489    let value_schema_name = format!("{}-value", topic);
2490    let value_schema =
2491        get_schema_with_strategy(ccsr_client, value_strategy, &value_schema_name).await?;
2492    let value_schema = value_schema.ok_or_else(|| anyhow!("No value schema found"))?;
2493    let subject = format!("{}-key", topic);
2494    let key_schema = get_schema_with_strategy(ccsr_client, key_strategy, &subject).await?;
2495    Ok(Schema {
2496        key_schema,
2497        value_schema,
2498    })
2499}
2500
2501/// Collect protobuf message descriptor from CSR and compile the descriptor.
2502async fn compile_proto(
2503    subject_name: &String,
2504    ccsr_client: &Client,
2505) -> Result<CsrSeedProtobufSchema, PlanError> {
2506    let (primary_subject, dependency_subjects) = ccsr_client
2507        .get_subject_and_references(subject_name)
2508        .await
2509        .map_err(|e| PlanError::FetchingCsrSchemaFailed {
2510            schema_lookup: format!("subject {}", subject_name.quoted()),
2511            cause: Arc::new(e),
2512        })?;
2513
2514    // Compile .proto files into a file descriptor set.
2515    let mut source_tree = VirtualSourceTree::new();
2516    for subject in iter::once(&primary_subject).chain(dependency_subjects.iter()) {
2517        source_tree.as_mut().add_file(
2518            Path::new(&subject.name),
2519            subject.schema.raw.as_bytes().to_vec(),
2520        );
2521    }
2522    let mut db = SourceTreeDescriptorDatabase::new(source_tree.as_mut());
2523    let fds = db
2524        .as_mut()
2525        .build_file_descriptor_set(&[Path::new(&primary_subject.name)])
2526        .map_err(|cause| PlanError::InvalidProtobufSchema { cause })?;
2527
2528    // Ensure there is exactly one message in the file.
2529    let primary_fd = fds.file(0);
2530    let message_name = match primary_fd.message_type_size() {
2531        1 => String::from_utf8_lossy(primary_fd.message_type(0).name()).into_owned(),
2532        0 => bail_unsupported!(29603, "Protobuf schemas with no messages"),
2533        _ => bail_unsupported!(29603, "Protobuf schemas with multiple messages"),
2534    };
2535
2536    // Encode the file descriptor set into a SQL byte string.
2537    let bytes = &fds
2538        .serialize()
2539        .map_err(|cause| PlanError::InvalidProtobufSchema { cause })?;
2540    let mut schema = String::new();
2541    strconv::format_bytes(&mut schema, bytes);
2542
2543    Ok(CsrSeedProtobufSchema {
2544        schema,
2545        message_name,
2546    })
2547}
2548
/// Unqualified name of the `mz_now()` function. Calls to it are recognized in the AST by
/// this item name alone.
const MZ_NOW_NAME: &str = "mz_now";
/// Schema used to resolve `mz_now()` when purification needs to insert a call to it.
const MZ_NOW_SCHEMA: &str = "mz_catalog";
2551
/// Purifies the `WITH` options of a `CREATE MATERIALIZED VIEW` statement in place. Additionally,
/// it adjusts `resolved_ids` if references to ids appear or disappear during the purification.
///
/// The steps, in order (numbered to match the comments in the body):
/// 1. `REFRESH AT CREATION` is rewritten to `REFRESH AT mz_now()`.
/// 2. A `REFRESH EVERY` without `ALIGNED TO` gets `ALIGNED TO mz_now()`.
/// 3. Calls to `mz_now()` inside `REFRESH AT` and `REFRESH EVERY ... ALIGNED TO` expressions
///    are replaced by the `mz_now` timestamp, cast to `mz_timestamp`.
/// 4. If no `REFRESH` option is present at all, `REFRESH ON COMMIT` is added.
/// 5. `resolved_ids` gains `mz_timestamp` if step 3 introduced a cast. It loses `mz_now` if
///    no call to a function named `mz_now` remains anywhere in the statement.
///
/// Note that in contrast with [`purify_statement`], this doesn't need to be async, because
/// this function is not making any network calls.
///
/// # Panics
///
/// Panics if `mz_now` cannot be resolved through `catalog`, or if step 3 encounters a call to
/// `mz_now()` while `mz_now` is `None`.
pub fn purify_create_materialized_view_options(
    catalog: impl SessionCatalog,
    mz_now: Option<Timestamp>,
    cmvs: &mut CreateMaterializedViewStatement<Aug>,
    resolved_ids: &mut ResolvedIds,
) {
    // 0. Preparations:
    // Prepare an expression that calls `mz_now()`, which we can insert in various later steps.
    // Its id is kept as well, so that step 5 can drop it from `resolved_ids` when unused.
    let (mz_now_id, mz_now_expr) = {
        let item = catalog
            .resolve_function(&PartialItemName {
                database: None,
                schema: Some(MZ_NOW_SCHEMA.to_string()),
                item: MZ_NOW_NAME.to_string(),
            })
            .expect("we should be able to resolve mz_now");
        (
            item.id(),
            Expr::Function(Function {
                name: ResolvedItemName::Item {
                    id: item.id(),
                    qualifiers: item.name().qualifiers.clone(),
                    full_name: catalog.resolve_full_name(item.name()),
                    print_id: false,
                    version: RelationVersionSelector::Latest,
                },
                args: FunctionArgs::Args {
                    args: Vec::new(),
                    order_by: Vec::new(),
                },
                filter: None,
                over: None,
                distinct: false,
            }),
        )
    };
    // Prepare the `mz_timestamp` type.
    // It is the cast target used when step 3 replaces `mz_now()` with a literal timestamp.
    let (mz_timestamp_id, mz_timestamp_type) = {
        let item = catalog.get_system_type("mz_timestamp");
        let full_name = catalog.resolve_full_name(item.name());
        (
            item.id(),
            ResolvedDataType::Named {
                id: item.id(),
                qualifiers: item.name().qualifiers.clone(),
                full_name,
                modifiers: vec![],
                print_id: true,
            },
        )
    };

    // Set when step 3 performs at least one substitution; read by step 5.
    let mut introduced_mz_timestamp = false;

    for option in cmvs.with_options.iter_mut() {
        // 1. Purify `REFRESH AT CREATION` to `REFRESH AT mz_now()`.
        if matches!(
            option.value,
            Some(WithOptionValue::Refresh(RefreshOptionValue::AtCreation))
        ) {
            option.value = Some(WithOptionValue::Refresh(RefreshOptionValue::At(
                RefreshAtOptionValue {
                    time: mz_now_expr.clone(),
                },
            )));
        }

        // 2. If `REFRESH EVERY` doesn't have an `ALIGNED TO`, then add `ALIGNED TO mz_now()`.
        if let Some(WithOptionValue::Refresh(RefreshOptionValue::Every(
            RefreshEveryOptionValue { aligned_to, .. },
        ))) = &mut option.value
        {
            if aligned_to.is_none() {
                *aligned_to = Some(mz_now_expr.clone());
            }
        }

        // 3. Substitute `mz_now()` with the timestamp chosen for the CREATE MATERIALIZED VIEW
        // statement. (This has to happen after the above steps, which might introduce `mz_now()`.)
        match &mut option.value {
            Some(WithOptionValue::Refresh(RefreshOptionValue::At(RefreshAtOptionValue {
                time,
            }))) => {
                let mut visitor = MzNowPurifierVisitor::new(mz_now, mz_timestamp_type.clone());
                visitor.visit_expr_mut(time);
                introduced_mz_timestamp |= visitor.introduced_mz_timestamp;
            }
            Some(WithOptionValue::Refresh(RefreshOptionValue::Every(
                RefreshEveryOptionValue {
                    interval: _,
                    aligned_to: Some(aligned_to),
                },
            ))) => {
                let mut visitor = MzNowPurifierVisitor::new(mz_now, mz_timestamp_type.clone());
                visitor.visit_expr_mut(aligned_to);
                introduced_mz_timestamp |= visitor.introduced_mz_timestamp;
            }
            _ => {}
        }
    }

    // 4. If the user didn't give any REFRESH option, then default to ON COMMIT.
    if !cmvs.with_options.iter().any(|o| {
        matches!(
            o,
            MaterializedViewOption {
                value: Some(WithOptionValue::Refresh(..)),
                ..
            }
        )
    }) {
        cmvs.with_options.push(MaterializedViewOption {
            name: MaterializedViewOptionName::Refresh,
            value: Some(WithOptionValue::Refresh(RefreshOptionValue::OnCommit)),
        })
    }

    // 5. Attend to `resolved_ids`: The purification might have
    // - added references to `mz_timestamp`;
    // - removed references to `mz_now`.
    if introduced_mz_timestamp {
        resolved_ids.add_item(mz_timestamp_id);
    }
    // Even though we always remove `mz_now()` from the `with_options`, there might be `mz_now()`
    // remaining in the main query expression of the MV, so let's visit the entire statement to look
    // for `mz_now()` everywhere.
    let mut visitor = ExprContainsTemporalVisitor::new();
    visitor.visit_create_materialized_view_statement(cmvs);
    if !visitor.contains_temporal {
        resolved_ids.remove_item(&mz_now_id);
    }
}
2689
2690/// Returns true if the [MaterializedViewOption] either already involves `mz_now()` or will involve
2691/// after purification.
2692pub fn materialized_view_option_contains_temporal(mvo: &MaterializedViewOption<Aug>) -> bool {
2693    match &mvo.value {
2694        Some(WithOptionValue::Refresh(RefreshOptionValue::At(RefreshAtOptionValue { time }))) => {
2695            let mut visitor = ExprContainsTemporalVisitor::new();
2696            visitor.visit_expr(time);
2697            visitor.contains_temporal
2698        }
2699        Some(WithOptionValue::Refresh(RefreshOptionValue::Every(RefreshEveryOptionValue {
2700            interval: _,
2701            aligned_to: Some(aligned_to),
2702        }))) => {
2703            let mut visitor = ExprContainsTemporalVisitor::new();
2704            visitor.visit_expr(aligned_to);
2705            visitor.contains_temporal
2706        }
2707        Some(WithOptionValue::Refresh(RefreshOptionValue::Every(RefreshEveryOptionValue {
2708            interval: _,
2709            aligned_to: None,
2710        }))) => {
2711            // For a `REFRESH EVERY` without an `ALIGNED TO`, purification will default the
2712            // `ALIGNED TO` to `mz_now()`.
2713            true
2714        }
2715        Some(WithOptionValue::Refresh(RefreshOptionValue::AtCreation)) => {
2716            // `REFRESH AT CREATION` will be purified to `REFRESH AT mz_now()`.
2717            true
2718        }
2719        _ => false,
2720    }
2721}
2722
/// Determines whether the AST involves `mz_now()`.
///
/// Calls are recognized by their unqualified item name only; the schema is not checked.
struct ExprContainsTemporalVisitor {
    /// Becomes `true` once a call to a function named `mz_now` has been visited. It is
    /// never reset.
    pub contains_temporal: bool,
}
2727
2728impl ExprContainsTemporalVisitor {
2729    pub fn new() -> ExprContainsTemporalVisitor {
2730        ExprContainsTemporalVisitor {
2731            contains_temporal: false,
2732        }
2733    }
2734}
2735
2736impl Visit<'_, Aug> for ExprContainsTemporalVisitor {
2737    fn visit_function(&mut self, func: &Function<Aug>) {
2738        self.contains_temporal |= func.name.full_item_name().item == MZ_NOW_NAME;
2739        visit_function(self, func);
2740    }
2741}
2742
/// Mutable AST visitor that replaces calls to `mz_now()` with a fixed timestamp literal
/// cast to `mz_timestamp`.
struct MzNowPurifierVisitor {
    /// The timestamp to substitute. It must be `Some` if a call to `mz_now()` is encountered.
    pub mz_now: Option<Timestamp>,
    /// The resolved `mz_timestamp` type used as the target of the inserted cast.
    pub mz_timestamp_type: ResolvedDataType,
    /// Becomes `true` once at least one substitution has been made.
    pub introduced_mz_timestamp: bool,
}
2748
2749impl MzNowPurifierVisitor {
2750    pub fn new(
2751        mz_now: Option<Timestamp>,
2752        mz_timestamp_type: ResolvedDataType,
2753    ) -> MzNowPurifierVisitor {
2754        MzNowPurifierVisitor {
2755            mz_now,
2756            mz_timestamp_type,
2757            introduced_mz_timestamp: false,
2758        }
2759    }
2760}
2761
2762impl VisitMut<'_, Aug> for MzNowPurifierVisitor {
2763    fn visit_expr_mut(&mut self, expr: &'_ mut Expr<Aug>) {
2764        match expr {
2765            Expr::Function(Function {
2766                name:
2767                    ResolvedItemName::Item {
2768                        full_name: FullItemName { item, .. },
2769                        ..
2770                    },
2771                ..
2772            }) if item == &MZ_NOW_NAME.to_string() => {
2773                let mz_now = self.mz_now.expect(
2774                    "we should have chosen a timestamp if the expression contains mz_now()",
2775                );
2776                // We substitute `mz_now()` with number + a cast to `mz_timestamp`. The cast is to
2777                // not alter the type of the expression.
2778                *expr = Expr::Cast {
2779                    expr: Box::new(Expr::Value(Value::Number(mz_now.to_string()))),
2780                    data_type: self.mz_timestamp_type.clone(),
2781                };
2782                self.introduced_mz_timestamp = true;
2783            }
2784            _ => visit_expr_mut(self, expr),
2785        }
2786    }
2787}