// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! SQL purification.
//!
//! See the [crate-level documentation](crate) for details.

use std::collections::{BTreeMap, BTreeSet};
use std::fmt;
use std::iter;
use std::path::Path;
use std::sync::Arc;

use anyhow::anyhow;
use itertools::Itertools;
use mz_ccsr::{Client, GetByIdError, GetBySubjectError, Schema as CcsrSchema};
use mz_controller_types::ClusterId;
use mz_kafka_util::client::MzClientContext;
use mz_mysql_util::MySqlTableDesc;
use mz_ore::collections::CollectionExt;
use mz_ore::error::ErrorExt;
use mz_ore::future::InTask;
use mz_ore::iter::IteratorExt;
use mz_ore::str::StrExt;
use mz_ore::{assert_none, soft_panic_or_log};
use mz_postgres_util::desc::PostgresTableDesc;
use mz_proto::RustType;
use mz_repr::{CatalogItemId, RelationDesc, RelationVersionSelector, Timestamp, strconv};
use mz_sql_parser::ast::display::AstDisplay;
use mz_sql_parser::ast::visit::{Visit, visit_function};
use mz_sql_parser::ast::visit_mut::{VisitMut, visit_expr_mut};
use mz_sql_parser::ast::{
    AlterSourceAction, AlterSourceAddSubsourceOptionName, AlterSourceStatement, AvroDocOn,
    ColumnName, CreateMaterializedViewStatement, CreateSinkConnection, CreateSinkOptionName,
    CreateSinkStatement, CreateSourceOptionName, CreateSubsourceOption, CreateSubsourceOptionName,
    CreateTableFromSourceStatement, CsrConfigOption, CsrConfigOptionName, CsrConnection,
    CsrSeedAvro, CsrSeedProtobuf, CsrSeedProtobufSchema, DeferredItemName, DocOnIdentifier,
    DocOnSchema, Expr, Function, FunctionArgs, Ident, KafkaSourceConfigOption,
    KafkaSourceConfigOptionName, LoadGenerator, LoadGeneratorOption, LoadGeneratorOptionName,
    MaterializedViewOption, MaterializedViewOptionName, MySqlConfigOption, MySqlConfigOptionName,
    PgConfigOption, PgConfigOptionName, RawItemName, ReaderSchemaSelectionStrategy,
    RefreshAtOptionValue, RefreshEveryOptionValue, RefreshOptionValue, SourceEnvelope,
    SqlServerConfigOption, SqlServerConfigOptionName, Statement, TableFromSourceColumns,
    TableFromSourceOption, TableFromSourceOptionName, UnresolvedItemName,
};
use mz_sql_server_util::desc::SqlServerTableDesc;
use mz_storage_types::configuration::StorageConfiguration;
use mz_storage_types::connections::Connection;
use mz_storage_types::connections::inline::IntoInlineConnection;
use mz_storage_types::errors::ContextCreationError;
use mz_storage_types::sources::load_generator::LoadGeneratorOutput;
use mz_storage_types::sources::mysql::MySqlSourceDetails;
use mz_storage_types::sources::postgres::PostgresSourcePublicationDetails;
use mz_storage_types::sources::{
    GenericSourceConnection, SourceConnection, SourceDesc, SourceExportStatementDetails,
    SqlServerSourceExtras,
};
use prost::Message;
use protobuf_native::MessageLite;
use protobuf_native::compiler::{SourceTreeDescriptorDatabase, VirtualSourceTree};
use rdkafka::admin::AdminClient;
use references::{RetrievedSourceReferences, SourceReferenceClient};
use uuid::Uuid;

use crate::ast::{
    AlterSourceAddSubsourceOption, AvroSchema, CreateSourceConnection, CreateSourceStatement,
    CreateSubsourceStatement, CsrConnectionAvro, CsrConnectionProtobuf, ExternalReferenceExport,
    ExternalReferences, Format, FormatSpecifier, ProtobufSchema, Value, WithOptionValue,
};
use crate::catalog::{CatalogItemType, SessionCatalog};
use crate::kafka_util::{KafkaSinkConfigOptionExtracted, KafkaSourceConfigOptionExtracted};
use crate::names::{
    Aug, FullItemName, PartialItemName, ResolvedColumnReference, ResolvedDataType, ResolvedIds,
    ResolvedItemName,
};
use crate::plan::error::PlanError;
use crate::plan::statement::ddl::load_generator_ast_to_generator;
use crate::plan::{SourceReferences, StatementContext};
use crate::pure::error::SqlServerSourcePurificationError;
use crate::session::vars::ENABLE_SQL_SERVER_SOURCE;
use crate::{kafka_util, normalize};

use self::error::{
    CsrPurificationError, KafkaSinkPurificationError, KafkaSourcePurificationError,
    LoadGeneratorSourcePurificationError, MySqlSourcePurificationError, PgSourcePurificationError,
};

pub(crate) mod error;
mod references;

pub mod mysql;
pub mod postgres;
pub mod sql_server;

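/// A single source export (subsource or table) requested by the user, paired
/// with the connection-specific metadata (`meta`) needed to purify it.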
pub(crate) struct RequestedSourceExport<T> {
    external_reference: UnresolvedItemName,
    name: UnresolvedItemName,
    meta: T,
}

impl<T: fmt::Debug> fmt::Debug for RequestedSourceExport<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("RequestedSourceExport")
            .field("external_reference", &self.external_reference)
            .field("name", &self.name)
            .field("meta", &self.meta)
            .finish()
    }
}

impl<T> RequestedSourceExport<T> {
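    /// Replaces this export's `meta` value, preserving the external reference
    /// and name.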
    fn change_meta<F>(self, new_meta: F) -> RequestedSourceExport<F> {
        RequestedSourceExport {
            external_reference: self.external_reference,
            name: self.name,
            meta: new_meta,
        }
    }
}

/// Generates a subsource name by prepending the source's schema name, if
/// present.
///
/// For example, if the source is `a.b`, then `a` is prepended to the subsource
/// name so that it is generated in the same schema as the source.
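///
/// A minimal sketch of the intended behavior (names are illustrative):
///
/// ```text
/// // With a source named `a.b`, a subsource named "t" becomes `a.t`.
/// let subsource_name = source_export_name_gen(&source_name, "t")?;
/// ```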
fn source_export_name_gen(
    source_name: &UnresolvedItemName,
    subsource_name: &str,
) -> Result<UnresolvedItemName, PlanError> {
    let mut partial = normalize::unresolved_item_name(source_name.clone())?;
    partial.item = subsource_name.to_string();
    Ok(UnresolvedItemName::from(partial))
}

// TODO(database-issues#8620): Remove once subsources are removed
/// Validates that the requested subsources have no name conflicts with each
/// other and that no upstream table is referenced more than once.
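///
/// For example (illustrative), requesting both `a.t1 AS x` and `a.t2 AS x`
/// produces a `PlanError::SubsourceNameConflict` for `x`.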
fn validate_source_export_names<T>(
    requested_source_exports: &[RequestedSourceExport<T>],
) -> Result<(), PlanError> {
    // This condition would get caught during the catalog transaction, but produces a
    // vague, non-contextual error. Instead, error here so we can suggest to the user
    // how to fix the problem.
    if let Some(name) = requested_source_exports
        .iter()
        .map(|subsource| &subsource.name)
        .duplicates()
        .next()
        .cloned()
    {
        let mut upstream_references: Vec<_> = requested_source_exports
            .iter()
            .filter_map(|subsource| {
                if subsource.name == name {
                    Some(subsource.external_reference.clone())
                } else {
                    None
                }
            })
            .collect();

        upstream_references.sort();

        Err(PlanError::SubsourceNameConflict {
            name,
            upstream_references,
        })?;
    }

    // We disallow subsource statements from referencing the same upstream table, but we allow
    // `CREATE TABLE .. FROM SOURCE` statements to do so. Since `CREATE TABLE .. FROM SOURCE`
    // purification will only provide 1 `requested_source_export`, we can leave this here without
    // needing to differentiate between the two types of statements.
    // TODO(roshan): Remove this when auto-generated subsources are deprecated.
    if let Some(name) = requested_source_exports
        .iter()
        .map(|export| &export.external_reference)
        .duplicates()
        .next()
        .cloned()
    {
        let mut target_names: Vec<_> = requested_source_exports
            .iter()
            .filter_map(|export| {
                if export.external_reference == name {
                    Some(export.name.clone())
                } else {
                    None
                }
            })
            .collect();

        target_names.sort();

        Err(PlanError::SubsourceDuplicateReference { name, target_names })?;
    }

    Ok(())
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PurifiedStatement {
    PurifiedCreateSource {
        create_progress_subsource_stmt: CreateSubsourceStatement<Aug>,
        create_source_stmt: CreateSourceStatement<Aug>,
        /// Map of subsource names to external details.
        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
        /// All the available upstream references that can be added as tables
        /// to this primary source.
        available_source_references: SourceReferences,
    },
    PurifiedAlterSource {
        alter_source_stmt: AlterSourceStatement<Aug>,
    },
    PurifiedAlterSourceAddSubsources {
        /// The source's resolved name; carrying it here saves us an annoying
        /// catalog lookup during sequencing.
        source_name: ResolvedItemName,
        /// Options that we will need the values of to update the source's
        /// definition.
        options: Vec<AlterSourceAddSubsourceOption<Aug>>,
        /// Map of subsource names to external details.
        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
    },
    PurifiedAlterSourceRefreshReferences {
        source_name: ResolvedItemName,
        /// The updated available upstream references for the primary source.
        available_source_references: SourceReferences,
    },
    PurifiedCreateSink(CreateSinkStatement<Aug>),
    PurifiedCreateTableFromSource {
        stmt: CreateTableFromSourceStatement<Aug>,
    },
}

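/// A source export purified alongside its primary source, pairing the
/// upstream object's external reference with the details needed to plan the
/// export.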
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PurifiedSourceExport {
    pub external_reference: UnresolvedItemName,
    pub details: PurifiedExportDetails,
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PurifiedExportDetails {
    MySql {
        table: MySqlTableDesc,
        text_columns: Option<Vec<Ident>>,
        exclude_columns: Option<Vec<Ident>>,
        initial_gtid_set: String,
    },
    Postgres {
        table: PostgresTableDesc,
        text_columns: Option<Vec<Ident>>,
    },
    SqlServer {
        table: SqlServerTableDesc,
        text_columns: Option<Vec<Ident>>,
        excl_columns: Option<Vec<Ident>>,
        capture_instance: Arc<str>,
        initial_lsn: mz_sql_server_util::cdc::Lsn,
    },
    Kafka {},
    LoadGenerator {
        table: Option<RelationDesc>,
        output: LoadGeneratorOutput,
    },
}

/// Purifies a statement, removing any dependencies on external state.
///
/// See the section on [purification](crate#purification) in the crate
/// documentation for details.
///
/// Note that this doesn't handle CREATE MATERIALIZED VIEW, which is
/// handled by [purify_create_materialized_view_options] instead.
/// This could be made more consistent by a refactoring discussed here:
/// <https://github.com/MaterializeInc/materialize/pull/23870#discussion_r1435922709>
pub async fn purify_statement(
    catalog: impl SessionCatalog,
    now: u64,
    stmt: Statement<Aug>,
    storage_configuration: &StorageConfiguration,
) -> (Result<PurifiedStatement, PlanError>, Option<ClusterId>) {
    match stmt {
        Statement::CreateSource(stmt) => {
            let cluster_id = stmt.in_cluster.as_ref().map(|cluster| cluster.id.clone());
            (
                purify_create_source(catalog, now, stmt, storage_configuration).await,
                cluster_id,
            )
        }
        Statement::AlterSource(stmt) => (
            purify_alter_source(catalog, stmt, storage_configuration).await,
            None,
        ),
        Statement::CreateSink(stmt) => {
            let cluster_id = stmt.in_cluster.as_ref().map(|cluster| cluster.id.clone());
            (
                purify_create_sink(catalog, stmt, storage_configuration).await,
                cluster_id,
            )
        }
        Statement::CreateTableFromSource(stmt) => (
            purify_create_table_from_source(catalog, stmt, storage_configuration).await,
            None,
        ),
        o => unreachable!("{:?} does not need to be purified", o),
    }
}

/// Injects `DOC ON` comments into all Avro formats that are using a schema
/// registry by finding all SQL `COMMENT`s that are attached to the sink's
/// underlying materialized view (as well as any types referenced by that
/// underlying materialized view).
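///
/// For example (illustrative): a `COMMENT ON MATERIALIZED VIEW mv IS 'docs'`
/// surfaces in the generated Avro schema as if the user had written
/// `DOC ON TYPE mv = 'docs'`, unless the user already supplied an overriding
/// `DOC ON` option for that item.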
pub(crate) fn purify_create_sink_avro_doc_on_options(
    catalog: &dyn SessionCatalog,
    from_id: CatalogItemId,
    format: &mut Option<FormatSpecifier<Aug>>,
) -> Result<(), PlanError> {
    // Collect all objects referenced by the sink.
    let from = catalog.get_item(&from_id);
    let object_ids = from
        .references()
        .items()
        .copied()
        .chain_one(from.id())
        .collect::<Vec<_>>();

    // Collect all Avro formats that use a schema registry, as well as a set of
    // all identifiers named in user-provided `DOC ON` options.
    let mut avro_format_options = vec![];
    for_each_format(format, |doc_on_schema, fmt| match fmt {
        Format::Avro(AvroSchema::InlineSchema { .. })
        | Format::Bytes
        | Format::Csv { .. }
        | Format::Json { .. }
        | Format::Protobuf(..)
        | Format::Regex(..)
        | Format::Text => (),
        Format::Avro(AvroSchema::Csr {
            csr_connection: CsrConnectionAvro { connection, .. },
        }) => {
            avro_format_options.push((doc_on_schema, &mut connection.options));
        }
    });

    // For each Avro format in the sink, inject the appropriate `DOC ON` options
    // for each item referenced by the sink.
    for (for_schema, options) in avro_format_options {
        let user_provided_comments = options
            .iter()
            .filter_map(|CsrConfigOption { name, .. }| match name {
                CsrConfigOptionName::AvroDocOn(doc_on) => Some(doc_on.clone()),
                _ => None,
            })
            .collect::<BTreeSet<_>>();

        // Add existing comments if the user has not already provided them.
        for object_id in &object_ids {
            // Always add comments to the latest version of the item.
            let item = catalog
                .get_item(object_id)
                .at_version(RelationVersionSelector::Latest);
            let full_name = catalog.resolve_full_name(item.name());
            let full_resolved_name = ResolvedItemName::Item {
                id: *object_id,
                qualifiers: item.name().qualifiers.clone(),
                full_name: full_name.clone(),
                print_id: !matches!(
                    item.item_type(),
                    CatalogItemType::Func | CatalogItemType::Type
                ),
                version: RelationVersionSelector::Latest,
            };

            if let Some(comments_map) = catalog.get_item_comments(object_id) {
                // Attach comment for the item itself, if the user has not
                // already provided an overriding `DOC ON` option for the item.
                let doc_on_item_key = AvroDocOn {
                    identifier: DocOnIdentifier::Type(full_resolved_name.clone()),
                    for_schema,
                };
                if !user_provided_comments.contains(&doc_on_item_key) {
                    if let Some(root_comment) = comments_map.get(&None) {
                        options.push(CsrConfigOption {
                            name: CsrConfigOptionName::AvroDocOn(doc_on_item_key),
                            value: Some(mz_sql_parser::ast::WithOptionValue::Value(Value::String(
                                root_comment.clone(),
                            ))),
                        });
                    }
                }

                // Attach comment for each column in the item, if the user has
                // not already provided an overriding `DOC ON` option for the
                // column.
                if let Ok(desc) = item.desc(&full_name) {
                    for (pos, column_name) in desc.iter_names().enumerate() {
                        let comment = comments_map.get(&Some(pos + 1));
                        if let Some(comment_str) = comment {
                            let doc_on_column_key = AvroDocOn {
                                identifier: DocOnIdentifier::Column(ColumnName {
                                    relation: full_resolved_name.clone(),
                                    column: ResolvedColumnReference::Column {
                                        name: column_name.to_owned(),
                                        index: pos,
                                    },
                                }),
                                for_schema,
                            };
                            if !user_provided_comments.contains(&doc_on_column_key) {
                                options.push(CsrConfigOption {
                                    name: CsrConfigOptionName::AvroDocOn(doc_on_column_key),
                                    value: Some(mz_sql_parser::ast::WithOptionValue::Value(
                                        Value::String(comment_str.clone()),
                                    )),
                                });
                            }
                        }
                    }
                }
            }
        }
    }

    Ok(())
}

/// Checks that the sink described in the statement can connect to its external
/// resources.
///
/// We must not leave any state behind in the Kafka broker, so just ensure that
/// we can connect. This means we don't ensure that we can create the topic and
/// introduces TOCTOU errors, but creating an inoperable sink is infinitely
/// preferable to leaking state in users' environments.
async fn purify_create_sink(
    catalog: impl SessionCatalog,
    mut create_sink_stmt: CreateSinkStatement<Aug>,
    storage_configuration: &StorageConfiguration,
) -> Result<PurifiedStatement, PlanError> {
    // General purification
    let CreateSinkStatement {
        connection,
        format,
        with_options,
        name: _,
        in_cluster: _,
        if_not_exists: _,
        from,
        envelope: _,
    } = &mut create_sink_stmt;

    // The list of options that the user is allowed to specify.
    const USER_ALLOWED_WITH_OPTIONS: &[CreateSinkOptionName] = &[CreateSinkOptionName::Snapshot];

    if let Some(op) = with_options
        .iter()
        .find(|op| !USER_ALLOWED_WITH_OPTIONS.contains(&op.name))
    {
        sql_bail!(
            "CREATE SINK...WITH ({}..) is not allowed",
            op.name.to_ast_string_simple(),
        )
    }

    match &connection {
        CreateSinkConnection::Kafka {
            connection,
            options,
            key: _,
            headers: _,
        } => {
            let scx = StatementContext::new(None, &catalog);
            let connection = {
                let item = scx.get_item_by_resolved_name(connection)?;
                // Get Kafka connection
                match item.connection()? {
                    Connection::Kafka(connection) => {
                        connection.clone().into_inline_connection(scx.catalog)
                    }
                    _ => sql_bail!(
                        "{} is not a kafka connection",
                        scx.catalog.resolve_full_name(item.name())
                    ),
                }
            };

            let extracted_options: KafkaSinkConfigOptionExtracted = options.clone().try_into()?;

            if extracted_options.legacy_ids == Some(true) {
                sql_bail!("LEGACY IDs option is not supported");
            }

            let client: AdminClient<_> = connection
                .create_with_context(
                    storage_configuration,
                    MzClientContext::default(),
                    &BTreeMap::new(),
                    InTask::No,
                )
                .await
                .map_err(|e| {
                    // anyhow doesn't support Clone, so not trivial to move into PlanError
                    KafkaSinkPurificationError::AdminClientError(Arc::new(e))
                })?;

            let metadata = client
                .inner()
                .fetch_metadata(
                    None,
                    storage_configuration
                        .parameters
                        .kafka_timeout_config
                        .fetch_metadata_timeout,
                )
                .map_err(|e| {
                    KafkaSinkPurificationError::AdminClientError(Arc::new(
                        ContextCreationError::KafkaError(e),
                    ))
                })?;

            if metadata.brokers().is_empty() {
                Err(KafkaSinkPurificationError::ZeroBrokers)?;
            }
        }
    }

    let mut csr_connection_ids = BTreeSet::new();
    for_each_format(format, |_, fmt| match fmt {
        Format::Avro(AvroSchema::InlineSchema { .. })
        | Format::Bytes
        | Format::Csv { .. }
        | Format::Json { .. }
        | Format::Protobuf(ProtobufSchema::InlineSchema { .. })
        | Format::Regex(..)
        | Format::Text => (),
        Format::Avro(AvroSchema::Csr {
            csr_connection: CsrConnectionAvro { connection, .. },
        })
        | Format::Protobuf(ProtobufSchema::Csr {
            csr_connection: CsrConnectionProtobuf { connection, .. },
        }) => {
            csr_connection_ids.insert(*connection.connection.item_id());
        }
    });

    let scx = StatementContext::new(None, &catalog);
    for csr_connection_id in csr_connection_ids {
        let connection = {
            let item = scx.get_item(&csr_connection_id);
            // Get the Confluent Schema Registry connection.
            match item.connection()? {
                Connection::Csr(connection) => connection.clone().into_inline_connection(&catalog),
                _ => Err(CsrPurificationError::NotCsrConnection(
                    scx.catalog.resolve_full_name(item.name()),
                ))?,
            }
        };

        let client = connection
            .connect(storage_configuration, InTask::No)
            .await
            .map_err(|e| CsrPurificationError::ClientError(Arc::new(e)))?;

        client
            .list_subjects()
            .await
            .map_err(|e| CsrPurificationError::ListSubjectsError(Arc::new(e)))?;
    }

    purify_create_sink_avro_doc_on_options(&catalog, *from.item_id(), format)?;

    Ok(PurifiedStatement::PurifiedCreateSink(create_sink_stmt))
}

/// Runs a function on each format within the format specifier.
///
/// The function is provided with a `DocOnSchema` that indicates whether the
/// format is for the key, value, or both.
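///
/// A minimal usage sketch (illustrative):
///
/// ```text
/// let mut formats = vec![];
/// for_each_format(&mut format, |schema, fmt| formats.push((schema, fmt)));
/// ```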
///
// TODO(benesch): rename `DocOnSchema` to the more general `FormatRestriction`.
fn for_each_format<'a, F>(format: &'a mut Option<FormatSpecifier<Aug>>, mut f: F)
where
    F: FnMut(DocOnSchema, &'a mut Format<Aug>),
{
    match format {
        None => (),
        Some(FormatSpecifier::Bare(fmt)) => f(DocOnSchema::All, fmt),
        Some(FormatSpecifier::KeyValue { key, value }) => {
            f(DocOnSchema::KeyOnly, key);
            f(DocOnSchema::ValueOnly, value);
        }
    }
}

/// Defines whether purification should enforce that at least one valid source
/// reference is provided in the statement being purified.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub(crate) enum SourceReferencePolicy {
    /// Don't allow source references to be provided. This is used for
    /// enforcing that `CREATE SOURCE` statements don't create subsources.
    NotAllowed,
    /// Allow empty references, such as when creating a source that
    /// will have tables added afterwards.
    Optional,
    /// Require that at least one reference is resolved to an upstream
    /// object.
    Required,
}

async fn purify_create_source(
    catalog: impl SessionCatalog,
    now: u64,
    mut create_source_stmt: CreateSourceStatement<Aug>,
    storage_configuration: &StorageConfiguration,
) -> Result<PurifiedStatement, PlanError> {
    let CreateSourceStatement {
        name: source_name,
        connection: source_connection,
        format,
        envelope,
        include_metadata,
        external_references,
        progress_subsource,
        with_options,
        ..
    } = &mut create_source_stmt;

    if let Some(DeferredItemName::Named(_)) = progress_subsource {
        sql_bail!("Cannot manually ID qualify progress subsource")
    }

    let mut requested_subsource_map = BTreeMap::new();

    let progress_desc = match &source_connection {
        CreateSourceConnection::Kafka { .. } => {
            &mz_storage_types::sources::kafka::KAFKA_PROGRESS_DESC
        }
        CreateSourceConnection::Postgres { .. } => {
            &mz_storage_types::sources::postgres::PG_PROGRESS_DESC
        }
        CreateSourceConnection::SqlServer { .. } => {
            &mz_storage_types::sources::sql_server::SQL_SERVER_PROGRESS_DESC
        }
        CreateSourceConnection::MySql { .. } => {
            &mz_storage_types::sources::mysql::MYSQL_PROGRESS_DESC
        }
        CreateSourceConnection::LoadGenerator { .. } => {
            &mz_storage_types::sources::load_generator::LOAD_GEN_PROGRESS_DESC
        }
    };
    let scx = StatementContext::new(None, &catalog);

    // Depending on whether the user must, or merely may, use `CREATE TABLE ..
    // FROM SOURCE` to add tables after this source is created, we may need to
    // enforce that auto-generated subsources are or are not created by this
    // statement.
    let reference_policy = if scx.catalog.system_vars().enable_create_table_from_source()
        && scx.catalog.system_vars().force_source_table_syntax()
    {
        SourceReferencePolicy::NotAllowed
    } else if scx.catalog.system_vars().enable_create_table_from_source() {
        SourceReferencePolicy::Optional
    } else {
        SourceReferencePolicy::Required
    };

    let mut format_options = SourceFormatOptions::Default;

    let retrieved_source_references: RetrievedSourceReferences;

    match source_connection {
        CreateSourceConnection::Kafka {
            connection,
            options: base_with_options,
            ..
        } => {
            if let Some(external_references) = external_references {
                Err(KafkaSourcePurificationError::ReferencedSubsources(
                    external_references.clone(),
                ))?;
            }

            let connection = {
                let item = scx.get_item_by_resolved_name(connection)?;
                // Get Kafka connection
                match item.connection()? {
                    Connection::Kafka(connection) => {
                        connection.clone().into_inline_connection(&catalog)
                    }
                    _ => Err(KafkaSourcePurificationError::NotKafkaConnection(
                        scx.catalog.resolve_full_name(item.name()),
                    ))?,
                }
            };

            let extracted_options: KafkaSourceConfigOptionExtracted =
                base_with_options.clone().try_into()?;

            let topic = extracted_options
                .topic
                .ok_or(KafkaSourcePurificationError::ConnectionMissingTopic)?;

            let consumer = connection
                .create_with_context(
                    storage_configuration,
                    MzClientContext::default(),
                    &BTreeMap::new(),
                    InTask::No,
                )
                .await
                .map_err(|e| {
                    // anyhow doesn't support Clone, so not trivial to move into PlanError
                    KafkaSourcePurificationError::KafkaConsumerError(
                        e.display_with_causes().to_string(),
                    )
                })?;
            let consumer = Arc::new(consumer);

            match (
                extracted_options.start_offset,
                extracted_options.start_timestamp,
            ) {
                (None, None) => {
                    // Validate that the topic at least exists.
                    kafka_util::ensure_topic_exists(
                        Arc::clone(&consumer),
                        &topic,
                        storage_configuration
                            .parameters
                            .kafka_timeout_config
                            .fetch_metadata_timeout,
                    )
                    .await?;
                }
                (Some(_), Some(_)) => {
                    sql_bail!("cannot specify START TIMESTAMP and START OFFSET at the same time")
                }
                (Some(start_offsets), None) => {
                    // Validate the start offsets.
                    kafka_util::validate_start_offsets(
                        Arc::clone(&consumer),
                        &topic,
                        start_offsets,
                        storage_configuration
                            .parameters
                            .kafka_timeout_config
                            .fetch_metadata_timeout,
                    )
                    .await?;
                }
                (None, Some(time_offset)) => {
                    // Translate `START TIMESTAMP` to a start offset.
                    let start_offsets = kafka_util::lookup_start_offsets(
                        Arc::clone(&consumer),
                        &topic,
                        time_offset,
                        now,
                        storage_configuration
                            .parameters
                            .kafka_timeout_config
                            .fetch_metadata_timeout,
                    )
                    .await?;

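                    // Replace the `START TIMESTAMP` option with the resolved
                    // `START OFFSET`s, so the purified statement no longer
                    // depends on the current contents of the topic.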
                    base_with_options.retain(|val| {
                        !matches!(val.name, KafkaSourceConfigOptionName::StartTimestamp)
                    });
                    base_with_options.push(KafkaSourceConfigOption {
                        name: KafkaSourceConfigOptionName::StartOffset,
                        value: Some(WithOptionValue::Sequence(
                            start_offsets
                                .iter()
                                .map(|offset| {
                                    WithOptionValue::Value(Value::Number(offset.to_string()))
                                })
                                .collect(),
                        )),
                    });
                }
            }

            let reference_client = SourceReferenceClient::Kafka { topic: &topic };
            retrieved_source_references = reference_client.get_source_references().await?;

            format_options = SourceFormatOptions::Kafka { topic };
        }
        CreateSourceConnection::Postgres {
            connection,
            options,
        } => {
            let connection_item = scx.get_item_by_resolved_name(connection)?;
            let connection = match connection_item.connection().map_err(PlanError::from)? {
                Connection::Postgres(connection) => {
                    connection.clone().into_inline_connection(&catalog)
                }
                _ => Err(PgSourcePurificationError::NotPgConnection(
                    scx.catalog.resolve_full_name(connection_item.name()),
                ))?,
            };
            let crate::plan::statement::PgConfigOptionExtracted {
                publication,
                text_columns,
                details,
                ..
            } = options.clone().try_into()?;
            let publication =
                publication.ok_or(PgSourcePurificationError::ConnectionMissingPublication)?;

            if details.is_some() {
                Err(PgSourcePurificationError::UserSpecifiedDetails)?;
            }

            let client = connection
                .validate(connection_item.id(), storage_configuration)
                .await?;

            let reference_client = SourceReferenceClient::Postgres {
                client: &client,
                publication: &publication,
                database: &connection.database,
            };
            retrieved_source_references = reference_client.get_source_references().await?;

            let postgres::PurifiedSourceExports {
                source_exports: subsources,
                normalized_text_columns,
            } = postgres::purify_source_exports(
                &client,
                &retrieved_source_references,
                external_references,
                text_columns,
                source_name,
                &reference_policy,
            )
            .await?;

            if let Some(text_cols_option) = options
                .iter_mut()
                .find(|option| option.name == PgConfigOptionName::TextColumns)
            {
                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
            }

            requested_subsource_map.extend(subsources);

            // Record the active replication timeline_id to allow detection of a future upstream
            // point-in-time-recovery that will put the source into an error state.
            let timeline_id = mz_postgres_util::get_timeline_id(&client).await?;

            // Remove any old detail references
            options.retain(|PgConfigOption { name, .. }| name != &PgConfigOptionName::Details);
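            // Allocate a fresh, globally unique replication slot name; the
            // dashes are stripped from the UUID because Postgres slot names
            // may only contain lowercase letters, numbers, and underscores.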
            let details = PostgresSourcePublicationDetails {
                slot: format!(
                    "materialize_{}",
                    Uuid::new_v4().to_string().replace('-', "")
                ),
                timeline_id: Some(timeline_id),
                database: connection.database,
            };
            options.push(PgConfigOption {
                name: PgConfigOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    details.into_proto().encode_to_vec(),
                )))),
            })
        }
        CreateSourceConnection::SqlServer {
            connection,
            options,
        } => {
            scx.require_feature_flag(&ENABLE_SQL_SERVER_SOURCE)?;

            // Connect to the upstream SQL Server instance so we can validate
            // we're compatible with CDC.
            let connection_item = scx.get_item_by_resolved_name(connection)?;
            let connection = match connection_item.connection()? {
                Connection::SqlServer(connection) => {
                    connection.clone().into_inline_connection(&catalog)
                }
                _ => Err(SqlServerSourcePurificationError::NotSqlServerConnection(
                    scx.catalog.resolve_full_name(connection_item.name()),
                ))?,
            };
            let crate::plan::statement::ddl::SqlServerConfigOptionExtracted {
                details,
                text_columns,
                exclude_columns,
                seen: _,
            } = options.clone().try_into()?;

            if details.is_some() {
                Err(SqlServerSourcePurificationError::UserSpecifiedDetails)?;
            }

            let mut client = connection
                .validate(connection_item.id(), storage_configuration)
                .await?;

            let database: Arc<str> = connection.database.into();
            let reference_client = SourceReferenceClient::SqlServer {
                client: &mut client,
                database: Arc::clone(&database),
            };
            retrieved_source_references = reference_client.get_source_references().await?;
            tracing::debug!(?retrieved_source_references, "got source references");

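            // Bound how long purification waits for the upstream CDC instance
            // to report a valid maximum LSN before giving up.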
            let timeout = mz_storage_types::sources::sql_server::MAX_LSN_WAIT
                .get(storage_configuration.config_set());

            let purified_source_exports = sql_server::purify_source_exports(
                &*database,
                &mut client,
                &retrieved_source_references,
                external_references,
                &text_columns,
                &exclude_columns,
                source_name,
                timeout,
                &reference_policy,
            )
            .await?;

            let sql_server::PurifiedSourceExports {
                source_exports,
                normalized_text_columns,
                normalized_excl_columns,
            } = purified_source_exports;

            // Update our set of requested source exports.
            requested_subsource_map.extend(source_exports);

            // Record the most recent restore_history_id, or none if the system has never been
            // restored.
            let restore_history_id =
                mz_sql_server_util::inspect::get_latest_restore_history_id(&mut client).await?;
            let details = SqlServerSourceExtras { restore_history_id };

            options.retain(|SqlServerConfigOption { name, .. }| {
                name != &SqlServerConfigOptionName::Details
            });
            options.push(SqlServerConfigOption {
                name: SqlServerConfigOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    details.into_proto().encode_to_vec(),
                )))),
            });

            // Update our 'TEXT' and 'EXCLUDE' column options with the purified and normalized set.
            if let Some(text_cols_option) = options
                .iter_mut()
                .find(|option| option.name == SqlServerConfigOptionName::TextColumns)
            {
                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
            }
            if let Some(excl_cols_option) = options
                .iter_mut()
                .find(|option| option.name == SqlServerConfigOptionName::ExcludeColumns)
            {
                excl_cols_option.value = Some(WithOptionValue::Sequence(normalized_excl_columns));
            }
        }
        CreateSourceConnection::MySql {
            connection,
            options,
        } => {
            let connection_item = scx.get_item_by_resolved_name(connection)?;
            let connection = match connection_item.connection()? {
                Connection::MySql(connection) => {
                    connection.clone().into_inline_connection(&catalog)
                }
                _ => Err(MySqlSourcePurificationError::NotMySqlConnection(
                    scx.catalog.resolve_full_name(connection_item.name()),
                ))?,
            };
            let crate::plan::statement::ddl::MySqlConfigOptionExtracted {
                details,
                text_columns,
                exclude_columns,
                seen: _,
            } = options.clone().try_into()?;

            if details.is_some() {
                Err(MySqlSourcePurificationError::UserSpecifiedDetails)?;
            }

            let mut conn = connection
                .validate(connection_item.id(), storage_configuration)
                .await
                .map_err(MySqlSourcePurificationError::InvalidConnection)?;

            // Retrieve the current @gtid_executed value of the server to mark as the effective
            // initial snapshot point such that we can ensure consistency if the initial source
            // snapshot is broken up over multiple points in time.
            let initial_gtid_set =
                mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?;

            let reference_client = SourceReferenceClient::MySql {
                conn: &mut conn,
                include_system_schemas: mysql::references_system_schemas(external_references),
            };
            retrieved_source_references = reference_client.get_source_references().await?;

            let mysql::PurifiedSourceExports {
                source_exports: subsources,
                normalized_text_columns,
                normalized_exclude_columns,
            } = mysql::purify_source_exports(
                &mut conn,
                &retrieved_source_references,
                external_references,
                text_columns,
                exclude_columns,
                source_name,
                initial_gtid_set.clone(),
                &reference_policy,
            )
            .await?;
            requested_subsource_map.extend(subsources);

            // We don't have any fields in this details struct, but keep it around for
            // conformity with Postgres and in case we end up needing it again in the future.
            let details = MySqlSourceDetails {};
            // Update options with the purified details
            options
                .retain(|MySqlConfigOption { name, .. }| name != &MySqlConfigOptionName::Details);
            options.push(MySqlConfigOption {
                name: MySqlConfigOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    details.into_proto().encode_to_vec(),
                )))),
            });

            if let Some(text_cols_option) = options
                .iter_mut()
                .find(|option| option.name == MySqlConfigOptionName::TextColumns)
            {
                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
            }
            if let Some(ignore_cols_option) = options
                .iter_mut()
                .find(|option| option.name == MySqlConfigOptionName::ExcludeColumns)
            {
                ignore_cols_option.value =
                    Some(WithOptionValue::Sequence(normalized_exclude_columns));
            }
        }
        CreateSourceConnection::LoadGenerator { generator, options } => {
            let load_generator =
                load_generator_ast_to_generator(&scx, generator, options, include_metadata)?;

            let reference_client = SourceReferenceClient::LoadGenerator {
                generator: &load_generator,
            };
            retrieved_source_references = reference_client.get_source_references().await?;
            // Filter to the references that need to be created as 'subsources', which
            // doesn't include the default output for single-output sources.
            // TODO(database-issues#8620): Remove once subsources are removed
            let multi_output_sources =
                retrieved_source_references
                    .all_references()
                    .iter()
                    .any(|r| {
                        r.load_generator_output().expect("is loadgen")
                            != &LoadGeneratorOutput::Default
                    });

            match external_references {
                Some(requested)
                    if matches!(reference_policy, SourceReferencePolicy::NotAllowed) =>
                {
                    Err(PlanError::UseTablesForSources(requested.to_string()))?
                }
                Some(requested) if !multi_output_sources => match requested {
                    ExternalReferences::SubsetTables(_) => {
                        Err(LoadGeneratorSourcePurificationError::ForTables)?
                    }
                    ExternalReferences::SubsetSchemas(_) => {
                        Err(LoadGeneratorSourcePurificationError::ForSchemas)?
                    }
                    ExternalReferences::All => {
                        Err(LoadGeneratorSourcePurificationError::ForAllTables)?
                    }
                },
                Some(requested) => {
                    let requested_exports = retrieved_source_references
                        .requested_source_exports(Some(requested), source_name)?;
                    for export in requested_exports {
                        requested_subsource_map.insert(
                            export.name,
                            PurifiedSourceExport {
                                external_reference: export.external_reference,
                                details: PurifiedExportDetails::LoadGenerator {
                                    table: export
                                        .meta
                                        .load_generator_desc()
                                        .expect("is loadgen")
                                        .clone(),
                                    output: export
                                        .meta
                                        .load_generator_output()
                                        .expect("is loadgen")
                                        .clone(),
                                },
                            },
                        );
                    }
                }
                None => {
                    if multi_output_sources
                        && matches!(reference_policy, SourceReferencePolicy::Required)
                    {
                        Err(LoadGeneratorSourcePurificationError::MultiOutputRequiresForAllTables)?
                    }
                }
            }

            if let LoadGenerator::Clock = generator {
                if !options
                    .iter()
                    .any(|p| p.name == LoadGeneratorOptionName::AsOf)
                {
                    let now = catalog.now();
                    options.push(LoadGeneratorOption {
                        name: LoadGeneratorOptionName::AsOf,
                        value: Some(WithOptionValue::Value(Value::Number(now.to_string()))),
                    });
                }
            }
        }
    }

    // Now that we know which subsources to create alongside this statement,
    // remove the references so they are not canonicalized as part of the
    // `CREATE SOURCE` statement in the catalog.
    *external_references = None;

    // Generate the progress subsource.

    // Take the name from the input, or generate one.
    let name = match progress_subsource {
        Some(name) => match name {
            DeferredItemName::Deferred(name) => name.clone(),
            DeferredItemName::Named(_) => unreachable!("already checked for this value"),
        },
        None => {
            let (item, prefix) = source_name.0.split_last().unwrap();
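            // `try_generate_name` appends `_progress` (plus a numeric suffix
            // on collision) to the source's item name, accepting the first
            // candidate for which the closure reports no existing catalog
            // item or type with the same name.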
            let item_name = Ident::try_generate_name(item.to_string(), "_progress", |candidate| {
                let mut suggested_name = prefix.to_vec();
                suggested_name.push(candidate.clone());

                let partial = normalize::unresolved_item_name(UnresolvedItemName(suggested_name))?;
                let qualified = scx.allocate_qualified_name(partial)?;
                let item_exists = scx.catalog.get_item_by_name(&qualified).is_some();
                let type_exists = scx.catalog.get_type_by_name(&qualified).is_some();
                Ok::<_, PlanError>(!item_exists && !type_exists)
            })?;

            let mut full_name = prefix.to_vec();
            full_name.push(item_name);
            let full_name = normalize::unresolved_item_name(UnresolvedItemName(full_name))?;
            let qualified_name = scx.allocate_qualified_name(full_name)?;
            let full_name = scx.catalog.resolve_full_name(&qualified_name);

            UnresolvedItemName::from(full_name.clone())
        }
    };

    let (columns, constraints) = scx.relation_desc_into_table_defs(progress_desc)?;

    // Create the subsource statement
    let mut progress_with_options: Vec<_> = with_options
        .iter()
        .filter_map(|opt| match opt.name {
            CreateSourceOptionName::TimestampInterval => None,
            CreateSourceOptionName::RetainHistory => Some(CreateSubsourceOption {
                name: CreateSubsourceOptionName::RetainHistory,
                value: opt.value.clone(),
            }),
        })
        .collect();
    progress_with_options.push(CreateSubsourceOption {
        name: CreateSubsourceOptionName::Progress,
        value: Some(WithOptionValue::Value(Value::Boolean(true))),
    });
    let create_progress_subsource_stmt = CreateSubsourceStatement {
        name,
        columns,
        // Progress subsources do not refer to the source to which they belong.
        // Instead the primary source depends on it (the opposite is true of
        // ingestion exports, which depend on the primary source).
        of_source: None,
        constraints,
        if_not_exists: false,
        with_options: progress_with_options,
    };

    purify_source_format(
        &catalog,
        format,
        &format_options,
        envelope,
        storage_configuration,
    )
    .await?;

    Ok(PurifiedStatement::PurifiedCreateSource {
        create_progress_subsource_stmt,
        create_source_stmt,
        subsources: requested_subsource_map,
        available_source_references: retrieved_source_references.available_source_references(),
    })
}

/// On success, returns the details for any new subsources and the updated
/// `options` that sequencing expects when handling `ALTER SOURCE` statements.
1220async fn purify_alter_source(
1221    catalog: impl SessionCatalog,
1222    stmt: AlterSourceStatement<Aug>,
1223    storage_configuration: &StorageConfiguration,
1224) -> Result<PurifiedStatement, PlanError> {
1225    let scx = StatementContext::new(None, &catalog);
1226    let AlterSourceStatement {
1227        source_name: unresolved_source_name,
1228        action,
1229        if_exists,
1230    } = stmt;
1231
1232    // Get name.
1233    let item = match scx.resolve_item(RawItemName::Name(unresolved_source_name.clone())) {
1234        Ok(item) => item,
1235        Err(_) if if_exists => {
1236            return Ok(PurifiedStatement::PurifiedAlterSource {
1237                alter_source_stmt: AlterSourceStatement {
1238                    source_name: unresolved_source_name,
1239                    action,
1240                    if_exists,
1241                },
1242            });
1243        }
1244        Err(e) => return Err(e),
1245    };
1246
1247    // Ensure it's an ingestion-based and alterable source.
1248    let desc = match item.source_desc()? {
1249        Some(desc) => desc.clone().into_inline_connection(scx.catalog),
1250        None => {
1251            sql_bail!("cannot ALTER this type of source")
1252        }
1253    };
1254
1255    let source_name = item.name();
1256
1257    let resolved_source_name = ResolvedItemName::Item {
1258        id: item.id(),
1259        qualifiers: item.name().qualifiers.clone(),
1260        full_name: scx.catalog.resolve_full_name(source_name),
1261        print_id: true,
1262        version: RelationVersionSelector::Latest,
1263    };
1264
1265    let partial_name = scx.catalog.minimal_qualification(source_name);
1266
1267    match action {
1268        AlterSourceAction::AddSubsources {
1269            external_references,
1270            options,
1271        } => {
1272            if scx.catalog.system_vars().enable_create_table_from_source()
1273                && scx.catalog.system_vars().force_source_table_syntax()
1274            {
1275                Err(PlanError::UseTablesForSources(
1276                    "ALTER SOURCE .. ADD SUBSOURCES ..".to_string(),
1277                ))?;
1278            }
1279
1280            purify_alter_source_add_subsources(
1281                external_references,
1282                options,
1283                desc,
1284                partial_name,
1285                unresolved_source_name,
1286                resolved_source_name,
1287                storage_configuration,
1288            )
1289            .await
1290        }
1291        AlterSourceAction::RefreshReferences => {
1292            purify_alter_source_refresh_references(
1293                desc,
1294                resolved_source_name,
1295                storage_configuration,
1296            )
1297            .await
1298        }
1299        _ => Ok(PurifiedStatement::PurifiedAlterSource {
1300            alter_source_stmt: AlterSourceStatement {
1301                source_name: unresolved_source_name,
1302                action,
1303                if_exists,
1304            },
1305        }),
1306    }
1307}
1308
1309// TODO(database-issues#8620): Remove once subsources are removed
1310/// Equivalent to `purify_create_source` but for `AlterSourceStatement`.
1311async fn purify_alter_source_add_subsources(
1312    external_references: Vec<ExternalReferenceExport>,
1313    mut options: Vec<AlterSourceAddSubsourceOption<Aug>>,
1314    desc: SourceDesc,
1315    partial_source_name: PartialItemName,
1316    unresolved_source_name: UnresolvedItemName,
1317    resolved_source_name: ResolvedItemName,
1318    storage_configuration: &StorageConfiguration,
1319) -> Result<PurifiedStatement, PlanError> {
1320    // Validate this is a source that can have subsources added.
1321    let connection_id = match &desc.connection {
1322        GenericSourceConnection::Postgres(c) => c.connection_id,
1323        GenericSourceConnection::MySql(c) => c.connection_id,
1324        GenericSourceConnection::SqlServer(c) => c.connection_id,
1325        _ => sql_bail!(
1326            "source {} does not support ALTER SOURCE.",
1327            partial_source_name
1328        ),
1329    };
1330
1331    let connection_name = desc.connection.name();
1332
1333    let crate::plan::statement::ddl::AlterSourceAddSubsourceOptionExtracted {
1334        text_columns,
1335        exclude_columns,
1336        details,
1337        seen: _,
1338    } = options.clone().try_into()?;
1339    assert_none!(details, "details cannot be explicitly set");
1340
1341    let mut requested_subsource_map = BTreeMap::new();
1342
1343    match desc.connection {
1344        GenericSourceConnection::Postgres(pg_source_connection) => {
1345            // Get PostgresConnection for generating subsources.
1346            let pg_connection = &pg_source_connection.connection;
1347
1348            let client = pg_connection
1349                .validate(connection_id, storage_configuration)
1350                .await?;
1351
1352            if !exclude_columns.is_empty() {
1353                sql_bail!(
1354                    "{} is a {} source, which does not support EXCLUDE COLUMNS.",
1355                    partial_source_name,
1356                    connection_name
1357                )
1358            }
1359
1360            let reference_client = SourceReferenceClient::Postgres {
1361                client: &client,
1362                publication: &pg_source_connection.publication,
1363                database: &pg_connection.database,
1364            };
1365            let retrieved_source_references = reference_client.get_source_references().await?;
1366
1367            let postgres::PurifiedSourceExports {
1368                source_exports: subsources,
1369                normalized_text_columns,
1370            } = postgres::purify_source_exports(
1371                &client,
1372                &retrieved_source_references,
1373                &Some(ExternalReferences::SubsetTables(external_references)),
1374                text_columns,
1375                &unresolved_source_name,
1376                &SourceReferencePolicy::Required,
1377            )
1378            .await?;
1379
1380            if let Some(text_cols_option) = options
1381                .iter_mut()
1382                .find(|option| option.name == AlterSourceAddSubsourceOptionName::TextColumns)
1383            {
1384                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
1385            }
1386
1387            requested_subsource_map.extend(subsources);
1388        }
1389        GenericSourceConnection::MySql(mysql_source_connection) => {
1390            let mysql_connection = &mysql_source_connection.connection;
1391            let config = mysql_connection
1392                .config(
1393                    &storage_configuration.connection_context.secrets_reader,
1394                    storage_configuration,
1395                    InTask::No,
1396                )
1397                .await?;
1398
1399            let mut conn = config
1400                .connect(
1401                    "mysql purification",
1402                    &storage_configuration.connection_context.ssh_tunnel_manager,
1403                )
1404                .await?;
1405
1406            // Retrieve the server's current @@gtid_executed value to mark as the effective
1407            // initial snapshot point for these subsources.
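            // (For reference, an executed GTID set is a string like
            // `3e11fa47-71ca-11e1-9e33-c80aa9429562:1-77`: a server UUID plus
            // the intervals of transactions it has committed.)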
1408            let initial_gtid_set =
1409                mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?;
1410
1411            let requested_references = Some(ExternalReferences::SubsetTables(external_references));
1412
1413            let reference_client = SourceReferenceClient::MySql {
1414                conn: &mut conn,
1415                include_system_schemas: mysql::references_system_schemas(&requested_references),
1416            };
1417            let retrieved_source_references = reference_client.get_source_references().await?;
1418
1419            let mysql::PurifiedSourceExports {
1420                source_exports: subsources,
1421                normalized_text_columns,
1422                normalized_exclude_columns,
1423            } = mysql::purify_source_exports(
1424                &mut conn,
1425                &retrieved_source_references,
1426                &requested_references,
1427                text_columns,
1428                exclude_columns,
1429                &unresolved_source_name,
1430                initial_gtid_set,
1431                &SourceReferencePolicy::Required,
1432            )
1433            .await?;
1434            requested_subsource_map.extend(subsources);
1435
1436            // Update options with the purified details
1437            if let Some(text_cols_option) = options
1438                .iter_mut()
1439                .find(|option| option.name == AlterSourceAddSubsourceOptionName::TextColumns)
1440            {
1441                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
1442            }
1443            if let Some(ignore_cols_option) = options
1444                .iter_mut()
1445                .find(|option| option.name == AlterSourceAddSubsourceOptionName::ExcludeColumns)
1446            {
1447                ignore_cols_option.value =
1448                    Some(WithOptionValue::Sequence(normalized_exclude_columns));
1449            }
1450        }
1451        GenericSourceConnection::SqlServer(sql_server_source) => {
1452            // Open a connection to the upstream SQL Server instance.
1453            let sql_server_connection = &sql_server_source.connection;
1454            let config = sql_server_connection
1455                .resolve_config(
1456                    &storage_configuration.connection_context.secrets_reader,
1457                    storage_configuration,
1458                    InTask::No,
1459                )
1460                .await?;
1461            let mut client = mz_sql_server_util::Client::connect(config).await?;
1462
1463            // Query the upstream SQL Server instance for available tables to replicate.
1464            let database = sql_server_connection.database.clone().into();
1465            let source_references = SourceReferenceClient::SqlServer {
1466                client: &mut client,
1467                database: Arc::clone(&database),
1468            }
1469            .get_source_references()
1470            .await?;
1471            let requested_references = Some(ExternalReferences::SubsetTables(external_references));
1472
1473            let timeout = mz_storage_types::sources::sql_server::MAX_LSN_WAIT
1474                .get(storage_configuration.config_set());
1475
1476            let result = sql_server::purify_source_exports(
1477                &*database,
1478                &mut client,
1479                &source_references,
1480                &requested_references,
1481                &text_columns,
1482                &exclude_columns,
1483                &unresolved_source_name,
1484                timeout,
1485                &SourceReferencePolicy::Required,
1486            )
1487            .await;
1488            let sql_server::PurifiedSourceExports {
1489                source_exports,
1490                normalized_text_columns,
1491                normalized_excl_columns,
1492            } = result?;
1493
1494            // Add the new exports to our subsource map.
1495            requested_subsource_map.extend(source_exports);
1496
1497            // Update options on the ALTER SOURCE statement with the purified details.
1498            if let Some(text_cols_option) = options
1499                .iter_mut()
1500                .find(|option| option.name == AlterSourceAddSubsourceOptionName::TextColumns)
1501            {
1502                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
1503            }
1504            if let Some(ignore_cols_option) = options
1505                .iter_mut()
1506                .find(|option| option.name == AlterSourceAddSubsourceOptionName::ExcludeColumns)
1507            {
1508                ignore_cols_option.value = Some(WithOptionValue::Sequence(normalized_excl_columns));
1509            }
1510        }
1511        _ => unreachable!(),
1512    };
1513
1514    Ok(PurifiedStatement::PurifiedAlterSourceAddSubsources {
1515        source_name: resolved_source_name,
1516        options,
1517        subsources: requested_subsource_map,
1518    })
1519}
1520
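/// Handles `ALTER SOURCE .. REFRESH REFERENCES` by re-querying the upstream
/// system for its current set of available tables. An illustrative example
/// (the source name is hypothetical):
///
/// ```sql
/// ALTER SOURCE pg_src REFRESH REFERENCES;
/// ```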
1521async fn purify_alter_source_refresh_references(
1522    desc: SourceDesc,
1523    resolved_source_name: ResolvedItemName,
1524    storage_configuration: &StorageConfiguration,
1525) -> Result<PurifiedStatement, PlanError> {
1526    let retrieved_source_references = match desc.connection {
1527        GenericSourceConnection::Postgres(pg_source_connection) => {
1528            // Get PostgresConnection for generating subsources.
1529            let pg_connection = &pg_source_connection.connection;
1530
1531            let config = pg_connection
1532                .config(
1533                    &storage_configuration.connection_context.secrets_reader,
1534                    storage_configuration,
1535                    InTask::No,
1536                )
1537                .await?;
1538
1539            let client = config
1540                .connect(
1541                    "postgres_purification",
1542                    &storage_configuration.connection_context.ssh_tunnel_manager,
1543                )
1544                .await?;
1545            let reference_client = SourceReferenceClient::Postgres {
1546                client: &client,
1547                publication: &pg_source_connection.publication,
1548                database: &pg_connection.database,
1549            };
1550            reference_client.get_source_references().await?
1551        }
1552        GenericSourceConnection::MySql(mysql_source_connection) => {
1553            let mysql_connection = &mysql_source_connection.connection;
1554            let config = mysql_connection
1555                .config(
1556                    &storage_configuration.connection_context.secrets_reader,
1557                    storage_configuration,
1558                    InTask::No,
1559                )
1560                .await?;
1561
1562            let mut conn = config
1563                .connect(
1564                    "mysql purification",
1565                    &storage_configuration.connection_context.ssh_tunnel_manager,
1566                )
1567                .await?;
1568
1569            let reference_client = SourceReferenceClient::MySql {
1570                conn: &mut conn,
1571                include_system_schemas: false,
1572            };
1573            reference_client.get_source_references().await?
1574        }
1575        GenericSourceConnection::SqlServer(sql_server_source) => {
1576            // Open a connection to the upstream SQL Server instance.
1577            let sql_server_connection = &sql_server_source.connection;
1578            let config = sql_server_connection
1579                .resolve_config(
1580                    &storage_configuration.connection_context.secrets_reader,
1581                    storage_configuration,
1582                    InTask::No,
1583                )
1584                .await?;
1585            let mut client = mz_sql_server_util::Client::connect(config).await?;
1586
1587            // Query the upstream SQL Server instance for available tables to replicate.
1588            let source_references = SourceReferenceClient::SqlServer {
1589                client: &mut client,
1590                database: sql_server_connection.database.clone().into(),
1591            }
1592            .get_source_references()
1593            .await?;
1594            source_references
1595        }
1596        GenericSourceConnection::LoadGenerator(load_gen_connection) => {
1597            let reference_client = SourceReferenceClient::LoadGenerator {
1598                generator: &load_gen_connection.load_generator,
1599            };
1600            reference_client.get_source_references().await?
1601        }
1602        GenericSourceConnection::Kafka(kafka_conn) => {
1603            let reference_client = SourceReferenceClient::Kafka {
1604                topic: &kafka_conn.topic,
1605            };
1606            reference_client.get_source_references().await?
1607        }
1608    };
1609    Ok(PurifiedStatement::PurifiedAlterSourceRefreshReferences {
1610        source_name: resolved_source_name,
1611        available_source_references: retrieved_source_references.available_source_references(),
1612    })
1613}
1614
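/// Purifies `CREATE TABLE .. FROM SOURCE`, resolving the external reference to
/// a fully-qualified name and filling in columns, constraints, and options. An
/// illustrative example (the table and source names are hypothetical):
///
/// ```sql
/// CREATE TABLE t1 FROM SOURCE pg_src (REFERENCE public.t1);
/// ```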
1615async fn purify_create_table_from_source(
1616    catalog: impl SessionCatalog,
1617    mut stmt: CreateTableFromSourceStatement<Aug>,
1618    storage_configuration: &StorageConfiguration,
1619) -> Result<PurifiedStatement, PlanError> {
1620    let scx = StatementContext::new(None, &catalog);
1621    let CreateTableFromSourceStatement {
1622        name: _,
1623        columns,
1624        constraints,
1625        source: source_name,
1626        if_not_exists: _,
1627        external_reference,
1628        format,
1629        envelope,
1630        include_metadata: _,
1631        with_options,
1632    } = &mut stmt;
1633
1634    // Columns and constraints cannot be specified by the user but will be populated below.
1635    if matches!(columns, TableFromSourceColumns::Defined(_)) {
1636        sql_bail!("CREATE TABLE .. FROM SOURCE column definitions cannot be specified directly");
1637    }
1638    if !constraints.is_empty() {
1639        sql_bail!(
1640            "CREATE TABLE .. FROM SOURCE constraint definitions cannot be specified directly"
1641        );
1642    }
1643
1644    // Get the source item
1645    let item = match scx.get_item_by_resolved_name(source_name) {
1646        Ok(item) => item,
1647        Err(e) => return Err(e),
1648    };
1649
1650    // Ensure it's an ingestion-based source that supports tables.
1651    let desc = match item.source_desc()? {
1652        Some(desc) => desc.clone().into_inline_connection(scx.catalog),
1653        None => {
1654            sql_bail!("cannot create a table from this type of source")
1655        }
1656    };
1657    let unresolved_source_name: UnresolvedItemName = source_name.full_item_name().clone().into();
1658    let qualified_source_name = item.name();
1659    let connection_name = desc.connection.name();
1660
1661    let crate::plan::statement::ddl::TableFromSourceOptionExtracted {
1662        text_columns,
1663        exclude_columns,
1664        retain_history: _,
1665        details,
1666        partition_by: _,
1667        seen: _,
1668    } = with_options.clone().try_into()?;
1669    assert_none!(details, "details cannot be explicitly set");
1670
1671    // Our text column values are unqualified (just column names), but the purification methods
1672    // below match fully-qualified names against the full set of upstream tables, so we need to
1673    // qualify them using the external reference first.
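    // For example (hypothetical names): given an external reference of
    // `my_db.public.t1`, a bare text column `col_a` is qualified below to
    // `my_db.public.t1.col_a` before being matched against upstream tables.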
1674    let qualified_text_columns = text_columns
1675        .iter()
1676        .map(|col| {
1677            UnresolvedItemName(
1678                external_reference
1679                    .as_ref()
1680                    .map(|er| er.0.iter().chain_one(col).map(|i| i.clone()).collect())
1681                    .unwrap_or_else(|| vec![col.clone()]),
1682            )
1683        })
1684        .collect_vec();
1685    let qualified_exclude_columns = exclude_columns
1686        .iter()
1687        .map(|col| {
1688            UnresolvedItemName(
1689                external_reference
1690                    .as_ref()
1691                    .map(|er| er.0.iter().chain_one(col).map(|i| i.clone()).collect())
1692                    .unwrap_or_else(|| vec![col.clone()]),
1693            )
1694        })
1695        .collect_vec();
1696
1697    // This is overridden below if a source-specific format is required.
1698    let mut format_options = SourceFormatOptions::Default;
1699
1700    let retrieved_source_references: RetrievedSourceReferences;
1701
1702    let requested_references = external_reference.as_ref().map(|ref_name| {
1703        ExternalReferences::SubsetTables(vec![ExternalReferenceExport {
1704            reference: ref_name.clone(),
1705            alias: None,
1706        }])
1707    });
1708
1709    // Run purification work specific to each source type: resolve the external reference to a
1710    // fully-qualified name and obtain the appropriate details for the source-export statement.
1711    let purified_export = match desc.connection {
1712        GenericSourceConnection::Postgres(pg_source_connection) => {
1713            // Get PostgresConnection for generating subsources.
1714            let pg_connection = &pg_source_connection.connection;
1715
1716            let client = pg_connection
1717                .validate(pg_source_connection.connection_id, storage_configuration)
1718                .await?;
1719
1720            // TODO(roshan): Add support for PG sources to allow this
1721            if !exclude_columns.is_empty() {
1722                sql_bail!(
1723                    "{} is a {} source, which does not support EXCLUDE COLUMNS.",
1724                    scx.catalog.minimal_qualification(qualified_source_name),
1725                    connection_name
1726                )
1727            }
1728
1729            let reference_client = SourceReferenceClient::Postgres {
1730                client: &client,
1731                publication: &pg_source_connection.publication,
1732                database: &pg_connection.database,
1733            };
1734            retrieved_source_references = reference_client.get_source_references().await?;
1735
1736            let postgres::PurifiedSourceExports {
1737                source_exports,
1738                // TODO(database-issues#8620): Remove once subsources are removed
1739                // This `normalized_text_columns` is not relevant for us and is only returned for
1740                // `CREATE SOURCE` statements that automatically generate subsources
1741                normalized_text_columns: _,
1742            } = postgres::purify_source_exports(
1743                &client,
1744                &retrieved_source_references,
1745                &requested_references,
1746                qualified_text_columns,
1747                &unresolved_source_name,
1748                &SourceReferencePolicy::Required,
1749            )
1750            .await?;
1751            // There should be exactly one source_export returned for this statement
1752            let (_, purified_export) = source_exports.into_element();
1753            purified_export
1754        }
1755        GenericSourceConnection::MySql(mysql_source_connection) => {
1756            let mysql_connection = &mysql_source_connection.connection;
1757            let config = mysql_connection
1758                .config(
1759                    &storage_configuration.connection_context.secrets_reader,
1760                    storage_configuration,
1761                    InTask::No,
1762                )
1763                .await?;
1764
1765            let mut conn = config
1766                .connect(
1767                    "mysql purification",
1768                    &storage_configuration.connection_context.ssh_tunnel_manager,
1769                )
1770                .await?;
1771
1772            // Retrieve the server's current @@gtid_executed value to mark as the effective
1773            // initial snapshot point for this table.
1774            let initial_gtid_set =
1775                mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?;
1776
1777            let reference_client = SourceReferenceClient::MySql {
1778                conn: &mut conn,
1779                include_system_schemas: mysql::references_system_schemas(&requested_references),
1780            };
1781            retrieved_source_references = reference_client.get_source_references().await?;
1782
1783            let mysql::PurifiedSourceExports {
1784                source_exports,
1785                // TODO(database-issues#8620): Remove once subsources are removed
1786                // The normalized text/exclude columns are not relevant for us and are only
1787                // returned for `CREATE SOURCE` statements that automatically generate subsources.
1788                normalized_text_columns: _,
1789                normalized_exclude_columns: _,
1790            } = mysql::purify_source_exports(
1791                &mut conn,
1792                &retrieved_source_references,
1793                &requested_references,
1794                qualified_text_columns,
1795                qualified_exclude_columns,
1796                &unresolved_source_name,
1797                initial_gtid_set,
1798                &SourceReferencePolicy::Required,
1799            )
1800            .await?;
1801            // There should be exactly one source_export returned for this statement
1802            let (_, purified_export) = source_exports.into_element();
1803            purified_export
1804        }
1805        GenericSourceConnection::SqlServer(sql_server_source) => {
1806            let connection = sql_server_source.connection;
1807            let config = connection
1808                .resolve_config(
1809                    &storage_configuration.connection_context.secrets_reader,
1810                    storage_configuration,
1811                    InTask::No,
1812                )
1813                .await?;
1814            let mut client = mz_sql_server_util::Client::connect(config).await?;
1815
1816            let database: Arc<str> = connection.database.into();
1817            let reference_client = SourceReferenceClient::SqlServer {
1818                client: &mut client,
1819                database: Arc::clone(&database),
1820            };
1821            retrieved_source_references = reference_client.get_source_references().await?;
1822            tracing::debug!(?retrieved_source_references, "got source references");
1823
1824            let timeout = mz_storage_types::sources::sql_server::MAX_LSN_WAIT
1825                .get(storage_configuration.config_set());
1826
1827            let purified_source_exports = sql_server::purify_source_exports(
1828                &*database,
1829                &mut client,
1830                &retrieved_source_references,
1831                &requested_references,
1832                &qualified_text_columns,
1833                &qualified_exclude_columns,
1834                &unresolved_source_name,
1835                timeout,
1836                &SourceReferencePolicy::Required,
1837            )
1838            .await?;
1839
1840            // There should be exactly one source_export returned for this statement
1841            let (_, purified_export) = purified_source_exports.source_exports.into_element();
1842            purified_export
1843        }
1844        GenericSourceConnection::LoadGenerator(load_gen_connection) => {
1845            let reference_client = SourceReferenceClient::LoadGenerator {
1846                generator: &load_gen_connection.load_generator,
1847            };
1848            retrieved_source_references = reference_client.get_source_references().await?;
1849
1850            let requested_exports = retrieved_source_references
1851                .requested_source_exports(requested_references.as_ref(), &unresolved_source_name)?;
1852            // There should be exactly one source_export returned
1853            let export = requested_exports.into_element();
1854            PurifiedSourceExport {
1855                external_reference: export.external_reference,
1856                details: PurifiedExportDetails::LoadGenerator {
1857                    table: export
1858                        .meta
1859                        .load_generator_desc()
1860                        .expect("is loadgen")
1861                        .clone(),
1862                    output: export
1863                        .meta
1864                        .load_generator_output()
1865                        .expect("is loadgen")
1866                        .clone(),
1867                },
1868            }
1869        }
1870        GenericSourceConnection::Kafka(kafka_conn) => {
1871            let reference_client = SourceReferenceClient::Kafka {
1872                topic: &kafka_conn.topic,
1873            };
1874            retrieved_source_references = reference_client.get_source_references().await?;
1875            let requested_exports = retrieved_source_references
1876                .requested_source_exports(requested_references.as_ref(), &unresolved_source_name)?;
1877            // There should be exactly one source_export returned
1878            let export = requested_exports.into_element();
1879
1880            format_options = SourceFormatOptions::Kafka {
1881                topic: kafka_conn.topic.clone(),
1882            };
1883            PurifiedSourceExport {
1884                external_reference: export.external_reference,
1885                details: PurifiedExportDetails::Kafka {},
1886            }
1887        }
1888    };
1889
1890    purify_source_format(
1891        &catalog,
1892        format,
1893        &format_options,
1894        envelope,
1895        storage_configuration,
1896    )
1897    .await?;
1898
1899    // Update the external reference in the statement to the resolved fully-qualified
1900    // external reference
1901    *external_reference = Some(purified_export.external_reference.clone());
1902
1903    // Update options in the statement using the purified export details
1904    match &purified_export.details {
1905        PurifiedExportDetails::Postgres { .. } => {
1906            let mut unsupported_cols = vec![];
1907            let postgres::PostgresExportStatementValues {
1908                columns: gen_columns,
1909                constraints: gen_constraints,
1910                text_columns: gen_text_columns,
1911                details: gen_details,
1912                external_reference: _,
1913            } = postgres::generate_source_export_statement_values(
1914                &scx,
1915                purified_export,
1916                &mut unsupported_cols,
1917            )?;
1918            if !unsupported_cols.is_empty() {
1919                unsupported_cols.sort();
1920                Err(PgSourcePurificationError::UnrecognizedTypes {
1921                    cols: unsupported_cols,
1922                })?;
1923            }
1924
1925            if let Some(text_cols_option) = with_options
1926                .iter_mut()
1927                .find(|option| option.name == TableFromSourceOptionName::TextColumns)
1928            {
1929                match gen_text_columns {
1930                    Some(gen_text_columns) => {
1931                        text_cols_option.value = Some(WithOptionValue::Sequence(gen_text_columns))
1932                    }
1933                    None => soft_panic_or_log!(
1934                        "text_columns should be Some if text_cols_option is present"
1935                    ),
1936                }
1937            }
1938            match columns {
1939                TableFromSourceColumns::Defined(_) => unreachable!(),
1940                TableFromSourceColumns::NotSpecified => {
1941                    *columns = TableFromSourceColumns::Defined(gen_columns);
1942                    *constraints = gen_constraints;
1943                }
1944                TableFromSourceColumns::Named(_) => {
1945                    sql_bail!("columns cannot be named for Postgres sources")
1946                }
1947            }
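            // The purified details are serialized as protobuf and hex-encoded so
            // that they round-trip through the catalog as an opaque string option.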
1948            with_options.push(TableFromSourceOption {
1949                name: TableFromSourceOptionName::Details,
1950                value: Some(WithOptionValue::Value(Value::String(hex::encode(
1951                    gen_details.into_proto().encode_to_vec(),
1952                )))),
1953            })
1954        }
1955        PurifiedExportDetails::MySql { .. } => {
1956            let mysql::MySqlExportStatementValues {
1957                columns: gen_columns,
1958                constraints: gen_constraints,
1959                text_columns: gen_text_columns,
1960                exclude_columns: gen_exclude_columns,
1961                details: gen_details,
1962                external_reference: _,
1963            } = mysql::generate_source_export_statement_values(&scx, purified_export)?;
1964
1965            if let Some(text_cols_option) = with_options
1966                .iter_mut()
1967                .find(|option| option.name == TableFromSourceOptionName::TextColumns)
1968            {
1969                match gen_text_columns {
1970                    Some(gen_text_columns) => {
1971                        text_cols_option.value = Some(WithOptionValue::Sequence(gen_text_columns))
1972                    }
1973                    None => soft_panic_or_log!(
1974                        "text_columns should be Some if text_cols_option is present"
1975                    ),
1976                }
1977            }
1978            if let Some(ignore_cols_option) = with_options
1979                .iter_mut()
1980                .find(|option| option.name == TableFromSourceOptionName::ExcludeColumns)
1981            {
1982                match gen_exclude_columns {
1983                    Some(gen_exclude_columns) => {
1984                        ignore_cols_option.value =
1985                            Some(WithOptionValue::Sequence(gen_exclude_columns))
1986                    }
1987                    None => soft_panic_or_log!(
1988                        "exclude_columns should be Some if ignore_cols_option is present"
1989                    ),
1990                }
1991            }
1992            match columns {
1993                TableFromSourceColumns::Defined(_) => unreachable!(),
1994                TableFromSourceColumns::NotSpecified => {
1995                    *columns = TableFromSourceColumns::Defined(gen_columns);
1996                    *constraints = gen_constraints;
1997                }
1998                TableFromSourceColumns::Named(_) => {
1999                    sql_bail!("columns cannot be named for MySQL sources")
2000                }
2001            }
2002            with_options.push(TableFromSourceOption {
2003                name: TableFromSourceOptionName::Details,
2004                value: Some(WithOptionValue::Value(Value::String(hex::encode(
2005                    gen_details.into_proto().encode_to_vec(),
2006                )))),
2007            })
2008        }
2009        PurifiedExportDetails::SqlServer { .. } => {
2010            let sql_server::SqlServerExportStatementValues {
2011                columns: gen_columns,
2012                constraints: gen_constraints,
2013                text_columns: gen_text_columns,
2014                excl_columns: gen_excl_columns,
2015                details: gen_details,
2016                external_reference: _,
2017            } = sql_server::generate_source_export_statement_values(&scx, purified_export)?;
2018
2019            if let Some(text_cols_option) = with_options
2020                .iter_mut()
2021                .find(|opt| opt.name == TableFromSourceOptionName::TextColumns)
2022            {
2023                match gen_text_columns {
2024                    Some(gen_text_columns) => {
2025                        text_cols_option.value = Some(WithOptionValue::Sequence(gen_text_columns))
2026                    }
2027                    None => soft_panic_or_log!(
2028                        "text_columns should be Some if text_cols_option is present"
2029                    ),
2030                }
2031            }
2032            if let Some(exclude_cols_option) = with_options
2033                .iter_mut()
2034                .find(|opt| opt.name == TableFromSourceOptionName::ExcludeColumns)
2035            {
2036                match gen_excl_columns {
2037                    Some(gen_excl_columns) => {
2038                        exclude_cols_option.value =
2039                            Some(WithOptionValue::Sequence(gen_excl_columns))
2040                    }
2041                    None => soft_panic_or_log!(
2042                        "excl_columns should be Some if exclude_cols_option is present"
2043                    ),
2044                }
2045            }
2046
2047            match columns {
2048                TableFromSourceColumns::NotSpecified => {
2049                    *columns = TableFromSourceColumns::Defined(gen_columns);
2050                    *constraints = gen_constraints;
2051                }
2052                TableFromSourceColumns::Named(_) => {
2053                    sql_bail!("columns cannot be named for SQL Server sources")
2054                }
2055                TableFromSourceColumns::Defined(_) => unreachable!(),
2056            }
2057
2058            with_options.push(TableFromSourceOption {
2059                name: TableFromSourceOptionName::Details,
2060                value: Some(WithOptionValue::Value(Value::String(hex::encode(
2061                    gen_details.into_proto().encode_to_vec(),
2062                )))),
2063            })
2064        }
2065        PurifiedExportDetails::LoadGenerator { .. } => {
2066            let (desc, output) = match purified_export.details {
2067                PurifiedExportDetails::LoadGenerator { table, output } => (table, output),
2068                _ => unreachable!("purified export details must be load generator"),
2069            };
2070            // We only determine the table description for multi-output load generator sources
2071            // here, whereas single-output load generators have their relation description
2072            // determined during statement planning, as envelope and format options may affect
2073            // their schema.
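            // (For instance, the AUCTION load generator is multi-output, with
            // tables such as `bids` and `auctions`, while COUNTER is
            // single-output; the generator names here are only illustrative.)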
2074            if let Some(desc) = desc {
2075                let (gen_columns, gen_constraints) = scx.relation_desc_into_table_defs(&desc)?;
2076                match columns {
2077                    TableFromSourceColumns::Defined(_) => unreachable!(),
2078                    TableFromSourceColumns::NotSpecified => {
2079                        *columns = TableFromSourceColumns::Defined(gen_columns);
2080                        *constraints = gen_constraints;
2081                    }
2082                    TableFromSourceColumns::Named(_) => {
2083                        sql_bail!("columns cannot be named for multi-output load generator sources")
2084                    }
2085                }
2086            }
2087            let details = SourceExportStatementDetails::LoadGenerator { output };
2088            with_options.push(TableFromSourceOption {
2089                name: TableFromSourceOptionName::Details,
2090                value: Some(WithOptionValue::Value(Value::String(hex::encode(
2091                    details.into_proto().encode_to_vec(),
2092                )))),
2093            })
2094        }
2095        PurifiedExportDetails::Kafka {} => {
2096            // NOTE: Kafka tables have their 'schemas' purified into the statement inside the
2097            // format field, so we don't specify any columns or constraints to be stored
2098            // on the statement here. The RelationDesc will be determined during planning.
2099            let details = SourceExportStatementDetails::Kafka {};
2100            with_options.push(TableFromSourceOption {
2101                name: TableFromSourceOptionName::Details,
2102                value: Some(WithOptionValue::Value(Value::String(hex::encode(
2103                    details.into_proto().encode_to_vec(),
2104                )))),
2105            })
2106        }
2107    };
2108
2109    // TODO: We might as well use the retrieved available references to update the source
2110    // available references table in the catalog, so plumb this through.
2111    // available_source_references: retrieved_source_references.available_source_references(),
2112    Ok(PurifiedStatement::PurifiedCreateTableFromSource { stmt })
2113}
2114
2115enum SourceFormatOptions {
2116    Default,
2117    Kafka { topic: String },
2118}
2119
2120async fn purify_source_format(
2121    catalog: &dyn SessionCatalog,
2122    format: &mut Option<FormatSpecifier<Aug>>,
2123    options: &SourceFormatOptions,
2124    envelope: &Option<SourceEnvelope>,
2125    storage_configuration: &StorageConfiguration,
2126) -> Result<(), PlanError> {
2127    if matches!(format, Some(FormatSpecifier::KeyValue { .. }))
2128        && !matches!(options, SourceFormatOptions::Kafka { .. })
2129    {
2130        sql_bail!("Kafka sources are the only source type that can provide KEY/VALUE formats")
2131    }
2132
2133    match format.as_mut() {
2134        None => {}
2135        Some(FormatSpecifier::Bare(format)) => {
2136            purify_source_format_single(catalog, format, options, envelope, storage_configuration)
2137                .await?;
2138        }
2139
2140        Some(FormatSpecifier::KeyValue { key, value: val }) => {
2141            purify_source_format_single(catalog, key, options, envelope, storage_configuration)
2142                .await?;
2143            purify_source_format_single(catalog, val, options, envelope, storage_configuration)
2144                .await?;
2145        }
2146    }
2147    Ok(())
2148}
2149
2150async fn purify_source_format_single(
2151    catalog: &dyn SessionCatalog,
2152    format: &mut Format<Aug>,
2153    options: &SourceFormatOptions,
2154    envelope: &Option<SourceEnvelope>,
2155    storage_configuration: &StorageConfiguration,
2156) -> Result<(), PlanError> {
2157    match format {
2158        Format::Avro(schema) => match schema {
2159            AvroSchema::Csr { csr_connection } => {
2160                purify_csr_connection_avro(
2161                    catalog,
2162                    options,
2163                    csr_connection,
2164                    envelope,
2165                    storage_configuration,
2166                )
2167                .await?
2168            }
2169            AvroSchema::InlineSchema { .. } => {}
2170        },
2171        Format::Protobuf(schema) => match schema {
2172            ProtobufSchema::Csr { csr_connection } => {
2173                purify_csr_connection_proto(
2174                    catalog,
2175                    options,
2176                    csr_connection,
2177                    envelope,
2178                    storage_configuration,
2179                )
2180                .await?;
2181            }
2182            ProtobufSchema::InlineSchema { .. } => {}
2183        },
2184        Format::Bytes
2185        | Format::Regex(_)
2186        | Format::Json { .. }
2187        | Format::Text
2188        | Format::Csv { .. } => (),
2189    }
2190    Ok(())
2191}
2192
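/// Generates one `CREATE SUBSOURCE` statement per purified export. All exports
/// in `subsources` come from the same source, so the connection type of the
/// first export determines which generator handles the whole batch.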
2193pub fn generate_subsource_statements(
2194    scx: &StatementContext,
2195    source_name: ResolvedItemName,
2196    subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
2197) -> Result<Vec<CreateSubsourceStatement<Aug>>, PlanError> {
2198    // Get the first subsource to determine the connection type.
2199    if subsources.is_empty() {
2200        return Ok(vec![]);
2201    }
2202    let (_, purified_export) = subsources.iter().next().expect("checked non-empty above");
2203
2204    let statements = match &purified_export.details {
2205        PurifiedExportDetails::Postgres { .. } => {
2206            crate::pure::postgres::generate_create_subsource_statements(
2207                scx,
2208                source_name,
2209                subsources,
2210            )?
2211        }
2212        PurifiedExportDetails::MySql { .. } => {
2213            crate::pure::mysql::generate_create_subsource_statements(scx, source_name, subsources)?
2214        }
2215        PurifiedExportDetails::SqlServer { .. } => {
2216            crate::pure::sql_server::generate_create_subsource_statements(
2217                scx,
2218                source_name,
2219                subsources,
2220            )?
2221        }
2222        PurifiedExportDetails::LoadGenerator { .. } => {
2223            let mut subsource_stmts = Vec::with_capacity(subsources.len());
2224            for (subsource_name, purified_export) in subsources {
2225                let (desc, output) = match purified_export.details {
2226                    PurifiedExportDetails::LoadGenerator { table, output } => (table, output),
2227                    _ => unreachable!("purified export details must be load generator"),
2228                };
2229                let desc =
2230                    desc.expect("subsources cannot be generated for single-output load generators");
2231
2232                let (columns, table_constraints) = scx.relation_desc_into_table_defs(&desc)?;
2233                let details = SourceExportStatementDetails::LoadGenerator { output };
2234                // Create the subsource statement
2235                let subsource = CreateSubsourceStatement {
2236                    name: subsource_name,
2237                    columns,
2238                    of_source: Some(source_name.clone()),
2239                    // unlike sources that come from an external upstream, we
2240                    // have more leniency to introduce different constraints
2241                    // every time the load generator is run; i.e. we are not as
2242                    // worried about introducing junk data.
2243                    constraints: table_constraints,
2244                    if_not_exists: false,
2245                    with_options: vec![
2246                        CreateSubsourceOption {
2247                            name: CreateSubsourceOptionName::ExternalReference,
2248                            value: Some(WithOptionValue::UnresolvedItemName(
2249                                purified_export.external_reference,
2250                            )),
2251                        },
2252                        CreateSubsourceOption {
2253                            name: CreateSubsourceOptionName::Details,
2254                            value: Some(WithOptionValue::Value(Value::String(hex::encode(
2255                                details.into_proto().encode_to_vec(),
2256                            )))),
2257                        },
2258                    ],
2259                };
2260                subsource_stmts.push(subsource);
2261            }
2262
2263            subsource_stmts
2264        }
2265        PurifiedExportDetails::Kafka { .. } => {
2266            // TODO: as part of database-issues#8322, Kafka sources will begin
2267            // producing data, at which point we'll need to understand the
2268            // schema of the output here.
2269            assert!(
2270                subsources.is_empty(),
2271                "Kafka sources do not produce data-bearing subsources"
2272            );
2273            vec![]
2274        }
2275    };
2276    Ok(statements)
2277}
2278
2279async fn purify_csr_connection_proto(
2280    catalog: &dyn SessionCatalog,
2281    options: &SourceFormatOptions,
2282    csr_connection: &mut CsrConnectionProtobuf<Aug>,
2283    envelope: &Option<SourceEnvelope>,
2284    storage_configuration: &StorageConfiguration,
2285) -> Result<(), PlanError> {
2286    let SourceFormatOptions::Kafka { topic } = options else {
2287        sql_bail!("Confluent Schema Registry is only supported with Kafka sources")
2288    };
2289
2290    let CsrConnectionProtobuf {
2291        seed,
2292        connection: CsrConnection {
2293            connection,
2294            options: _,
2295        },
2296    } = csr_connection;
2297    match seed {
2298        None => {
2299            let scx = StatementContext::new(None, &*catalog);
2300
2301            let ccsr_connection = match scx.get_item_by_resolved_name(connection)?.connection()? {
2302                Connection::Csr(connection) => connection.clone().into_inline_connection(catalog),
2303                _ => sql_bail!("{} is not a schema registry connection", connection),
2304            };
2305
2306            let ccsr_client = ccsr_connection
2307                .connect(storage_configuration, InTask::No)
2308                .await
2309                .map_err(|e| CsrPurificationError::ClientError(Arc::new(e)))?;
2310
2311            let value = compile_proto(&format!("{}-value", topic), &ccsr_client).await?;
2312            let key = compile_proto(&format!("{}-key", topic), &ccsr_client)
2313                .await
2314                .ok();
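            // (These subjects follow Confluent's default TopicNameStrategy of
            // `<topic>-key` / `<topic>-value`. A missing key schema is tolerated
            // here unless the envelope requires one, which is checked below.)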
2315
2316            if matches!(envelope, Some(SourceEnvelope::Debezium)) && key.is_none() {
2317                sql_bail!("Key schema is required for ENVELOPE DEBEZIUM");
2318            }
2319
2320            *seed = Some(CsrSeedProtobuf { value, key });
2321        }
2322        Some(_) => (),
2323    }
2324
2325    Ok(())
2326}
2327
2328async fn purify_csr_connection_avro(
2329    catalog: &dyn SessionCatalog,
2330    options: &SourceFormatOptions,
2331    csr_connection: &mut CsrConnectionAvro<Aug>,
2332    envelope: &Option<SourceEnvelope>,
2333    storage_configuration: &StorageConfiguration,
2334) -> Result<(), PlanError> {
2335    let SourceFormatOptions::Kafka { topic } = options else {
2336        sql_bail!("Confluent Schema Registry is only supported with Kafka sources")
2337    };
2338
2339    let CsrConnectionAvro {
2340        connection: CsrConnection { connection, .. },
2341        seed,
2342        key_strategy,
2343        value_strategy,
2344    } = csr_connection;
2345    if seed.is_none() {
2346        let scx = StatementContext::new(None, &*catalog);
2347        let csr_connection = match scx.get_item_by_resolved_name(connection)?.connection()? {
2348            Connection::Csr(connection) => connection.clone().into_inline_connection(catalog),
2349            _ => sql_bail!("{} is not a schema registry connection", connection),
2350        };
2351        let ccsr_client = csr_connection
2352            .connect(storage_configuration, InTask::No)
2353            .await
2354            .map_err(|e| CsrPurificationError::ClientError(Arc::new(e)))?;
2355
2356        let Schema {
2357            key_schema,
2358            value_schema,
2359        } = get_remote_csr_schema(
2360            &ccsr_client,
2361            key_strategy.clone().unwrap_or_default(),
2362            value_strategy.clone().unwrap_or_default(),
2363            topic,
2364        )
2365        .await?;
2366        if matches!(envelope, Some(SourceEnvelope::Debezium)) && key_schema.is_none() {
2367            sql_bail!("Key schema is required for ENVELOPE DEBEZIUM");
2368        }
2369
2370        *seed = Some(CsrSeedAvro {
2371            key_schema,
2372            value_schema,
2373        })
2374    }
2375
2376    Ok(())
2377}
2378
2379#[derive(Debug)]
2380pub struct Schema {
2381    pub key_schema: Option<String>,
2382    pub value_schema: String,
2383}
2384
2385async fn get_schema_with_strategy(
2386    client: &Client,
2387    strategy: ReaderSchemaSelectionStrategy,
2388    subject: &str,
2389) -> Result<Option<String>, PlanError> {
2390    match strategy {
2391        ReaderSchemaSelectionStrategy::Latest => {
2392            match client.get_schema_by_subject(subject).await {
2393                Ok(CcsrSchema { raw, .. }) => Ok(Some(raw)),
2394                Err(GetBySubjectError::SubjectNotFound)
2395                | Err(GetBySubjectError::VersionNotFound(_)) => Ok(None),
2396                Err(e) => Err(PlanError::FetchingCsrSchemaFailed {
2397                    schema_lookup: format!("subject {}", subject.quoted()),
2398                    cause: Arc::new(e),
2399                }),
2400            }
2401        }
2402        ReaderSchemaSelectionStrategy::Inline(raw) => Ok(Some(raw)),
2403        ReaderSchemaSelectionStrategy::ById(id) => match client.get_schema_by_id(id).await {
2404            Ok(CcsrSchema { raw, .. }) => Ok(Some(raw)),
2405            Err(GetByIdError::SchemaNotFound) => Ok(None),
2406            Err(e) => Err(PlanError::FetchingCsrSchemaFailed {
2407                schema_lookup: format!("ID {}", id),
2408                cause: Arc::new(e),
2409            }),
2410        },
2411    }
2412}
2413
2414async fn get_remote_csr_schema(
2415    ccsr_client: &mz_ccsr::Client,
2416    key_strategy: ReaderSchemaSelectionStrategy,
2417    value_strategy: ReaderSchemaSelectionStrategy,
2418    topic: &str,
2419) -> Result<Schema, PlanError> {
2420    let value_schema_name = format!("{}-value", topic);
2421    let value_schema =
2422        get_schema_with_strategy(ccsr_client, value_strategy, &value_schema_name).await?;
2423    let value_schema = value_schema.ok_or_else(|| anyhow!("No value schema found"))?;
2424    let subject = format!("{}-key", topic);
2425    let key_schema = get_schema_with_strategy(ccsr_client, key_strategy, &subject).await?;
2426    Ok(Schema {
2427        key_schema,
2428        value_schema,
2429    })
2430}
2431
2432/// Fetch a Protobuf schema and its references from the CSR and compile a descriptor set.
2433async fn compile_proto(
2434    subject_name: &String,
2435    ccsr_client: &Client,
2436) -> Result<CsrSeedProtobufSchema, PlanError> {
2437    let (primary_subject, dependency_subjects) = ccsr_client
2438        .get_subject_and_references(subject_name)
2439        .await
2440        .map_err(|e| PlanError::FetchingCsrSchemaFailed {
2441            schema_lookup: format!("subject {}", subject_name.quoted()),
2442            cause: Arc::new(e),
2443        })?;
2444
2445    // Compile .proto files into a file descriptor set.
2446    let mut source_tree = VirtualSourceTree::new();
2447    for subject in iter::once(&primary_subject).chain(dependency_subjects.iter()) {
2448        source_tree.as_mut().add_file(
2449            Path::new(&subject.name),
2450            subject.schema.raw.as_bytes().to_vec(),
2451        );
2452    }
2453    let mut db = SourceTreeDescriptorDatabase::new(source_tree.as_mut());
2454    let fds = db
2455        .as_mut()
2456        .build_file_descriptor_set(&[Path::new(&primary_subject.name)])
2457        .map_err(|cause| PlanError::InvalidProtobufSchema { cause })?;
2458
2459    // Ensure there is exactly one message in the file.
2460    let primary_fd = fds.file(0);
2461    let message_name = match primary_fd.message_type_size() {
2462        1 => String::from_utf8_lossy(primary_fd.message_type(0).name()).into_owned(),
2463        0 => bail_unsupported!(29603, "Protobuf schemas with no messages"),
2464        _ => bail_unsupported!(29603, "Protobuf schemas with multiple messages"),
2465    };
2466
2467    // Encode the file descriptor set into a SQL byte string.
2468    let bytes = &fds
2469        .serialize()
2470        .map_err(|cause| PlanError::InvalidProtobufSchema { cause })?;
2471    let mut schema = String::new();
2472    strconv::format_bytes(&mut schema, bytes);
2473
2474    Ok(CsrSeedProtobufSchema {
2475        schema,
2476        message_name,
2477    })
2478}
2479
2480const MZ_NOW_NAME: &str = "mz_now";
2481const MZ_NOW_SCHEMA: &str = "mz_catalog";
2482
2483/// Purifies a CREATE MATERIALIZED VIEW statement. Additionally, it adjusts `resolved_ids` if
2484/// references to ids appear or disappear during the purification.
2485///
2486/// Note that in contrast with [`purify_statement`], this doesn't need to be async, because
2487/// this function is not making any network calls.
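///
/// An illustrative sketch (the chosen timestamp is hypothetical): with `mz_now`
/// = `1000`, an option written as
///
/// ```sql
/// WITH (REFRESH AT CREATION)
/// ```
///
/// is first rewritten to `REFRESH AT mz_now()`, and then `mz_now()` is replaced
/// by the constant `1000` cast to `mz_timestamp`, so the planned statement no
/// longer depends on the temporal function.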
2488pub fn purify_create_materialized_view_options(
2489    catalog: impl SessionCatalog,
2490    mz_now: Option<Timestamp>,
2491    cmvs: &mut CreateMaterializedViewStatement<Aug>,
2492    resolved_ids: &mut ResolvedIds,
2493) {
2494    // 0. Preparations:
2495    // Prepare an expression that calls `mz_now()`, which we can insert in various later steps.
2496    let (mz_now_id, mz_now_expr) = {
2497        let item = catalog
2498            .resolve_function(&PartialItemName {
2499                database: None,
2500                schema: Some(MZ_NOW_SCHEMA.to_string()),
2501                item: MZ_NOW_NAME.to_string(),
2502            })
2503            .expect("we should be able to resolve mz_now");
2504        (
2505            item.id(),
2506            Expr::Function(Function {
2507                name: ResolvedItemName::Item {
2508                    id: item.id(),
2509                    qualifiers: item.name().qualifiers.clone(),
2510                    full_name: catalog.resolve_full_name(item.name()),
2511                    print_id: false,
2512                    version: RelationVersionSelector::Latest,
2513                },
2514                args: FunctionArgs::Args {
2515                    args: Vec::new(),
2516                    order_by: Vec::new(),
2517                },
2518                filter: None,
2519                over: None,
2520                distinct: false,
2521            }),
2522        )
2523    };
2524    // Prepare the `mz_timestamp` type.
2525    let (mz_timestamp_id, mz_timestamp_type) = {
2526        let item = catalog.get_system_type("mz_timestamp");
2527        let full_name = catalog.resolve_full_name(item.name());
2528        (
2529            item.id(),
2530            ResolvedDataType::Named {
2531                id: item.id(),
2532                qualifiers: item.name().qualifiers.clone(),
2533                full_name,
2534                modifiers: vec![],
2535                print_id: true,
2536            },
2537        )
2538    };
2539
2540    let mut introduced_mz_timestamp = false;
2541
2542    for option in cmvs.with_options.iter_mut() {
2543        // 1. Purify `REFRESH AT CREATION` to `REFRESH AT mz_now()`.
2544        if matches!(
2545            option.value,
2546            Some(WithOptionValue::Refresh(RefreshOptionValue::AtCreation))
2547        ) {
2548            option.value = Some(WithOptionValue::Refresh(RefreshOptionValue::At(
2549                RefreshAtOptionValue {
2550                    time: mz_now_expr.clone(),
2551                },
2552            )));
2553        }
2554
2555        // 2. If `REFRESH EVERY` doesn't have an `ALIGNED TO`, then add `ALIGNED TO mz_now()`.
2556        if let Some(WithOptionValue::Refresh(RefreshOptionValue::Every(
2557            RefreshEveryOptionValue { aligned_to, .. },
2558        ))) = &mut option.value
2559        {
2560            if aligned_to.is_none() {
2561                *aligned_to = Some(mz_now_expr.clone());
2562            }
2563        }
2564
2565        // 3. Substitute `mz_now()` with the timestamp chosen for the CREATE MATERIALIZED VIEW
2566        // statement. (This has to happen after the above steps, which might introduce `mz_now()`.)
2567        match &mut option.value {
2568            Some(WithOptionValue::Refresh(RefreshOptionValue::At(RefreshAtOptionValue {
2569                time,
2570            }))) => {
2571                let mut visitor = MzNowPurifierVisitor::new(mz_now, mz_timestamp_type.clone());
2572                visitor.visit_expr_mut(time);
2573                introduced_mz_timestamp |= visitor.introduced_mz_timestamp;
2574            }
2575            Some(WithOptionValue::Refresh(RefreshOptionValue::Every(
2576                RefreshEveryOptionValue {
2577                    interval: _,
2578                    aligned_to: Some(aligned_to),
2579                },
2580            ))) => {
2581                let mut visitor = MzNowPurifierVisitor::new(mz_now, mz_timestamp_type.clone());
2582                visitor.visit_expr_mut(aligned_to);
2583                introduced_mz_timestamp |= visitor.introduced_mz_timestamp;
2584            }
2585            _ => {}
2586        }
2587    }
2588
2589    // 4. If the user didn't give any REFRESH option, then default to ON COMMIT.
2590    if !cmvs.with_options.iter().any(|o| {
2591        matches!(
2592            o,
2593            MaterializedViewOption {
2594                value: Some(WithOptionValue::Refresh(..)),
2595                ..
2596            }
2597        )
2598    }) {
2599        cmvs.with_options.push(MaterializedViewOption {
2600            name: MaterializedViewOptionName::Refresh,
2601            value: Some(WithOptionValue::Refresh(RefreshOptionValue::OnCommit)),
2602        })
2603    }
2604
2605    // 5. Attend to `resolved_ids`: The purification might have
2606    // - added references to `mz_timestamp`;
2607    // - removed references to `mz_now`.
2608    if introduced_mz_timestamp {
2609        resolved_ids.add_item(mz_timestamp_id);
2610    }
2611    // Even though we always purify `mz_now()` out of the `with_options`, it might still occur in
2612    // the main query expression of the MV, so visit the entire statement to look for `mz_now()`
2613    // everywhere.
2614    let mut visitor = ExprContainsTemporalVisitor::new();
2615    visitor.visit_create_materialized_view_statement(cmvs);
2616    if !visitor.contains_temporal {
2617        resolved_ids.remove_item(&mz_now_id);
2618    }
2619}
2620
/// Returns true if the [MaterializedViewOption] either already involves `mz_now()` or will involve
/// it after purification.
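///
/// For example, `REFRESH AT CREATION` and a `REFRESH EVERY` without an `ALIGNED TO` both return
/// true, because purification introduces `mz_now()` for them; `REFRESH ON COMMIT` returns false.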
pub fn materialized_view_option_contains_temporal(mvo: &MaterializedViewOption<Aug>) -> bool {
    match &mvo.value {
        Some(WithOptionValue::Refresh(RefreshOptionValue::At(RefreshAtOptionValue { time }))) => {
            let mut visitor = ExprContainsTemporalVisitor::new();
            visitor.visit_expr(time);
            visitor.contains_temporal
        }
        Some(WithOptionValue::Refresh(RefreshOptionValue::Every(RefreshEveryOptionValue {
            interval: _,
            aligned_to: Some(aligned_to),
        }))) => {
            let mut visitor = ExprContainsTemporalVisitor::new();
            visitor.visit_expr(aligned_to);
            visitor.contains_temporal
        }
        Some(WithOptionValue::Refresh(RefreshOptionValue::Every(RefreshEveryOptionValue {
            interval: _,
            aligned_to: None,
        }))) => {
            // For a `REFRESH EVERY` without an `ALIGNED TO`, purification will default the
            // `ALIGNED TO` to `mz_now()`.
            true
        }
        Some(WithOptionValue::Refresh(RefreshOptionValue::AtCreation)) => {
            // `REFRESH AT CREATION` will be purified to `REFRESH AT mz_now()`.
            true
        }
        _ => false,
    }
}

/// Determines whether the AST involves `mz_now()`.
struct ExprContainsTemporalVisitor {
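    /// Becomes true when a call to `mz_now()` is found anywhere in the visited AST.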
    pub contains_temporal: bool,
}

impl ExprContainsTemporalVisitor {
    pub fn new() -> ExprContainsTemporalVisitor {
        ExprContainsTemporalVisitor {
            contains_temporal: false,
        }
    }
}

impl Visit<'_, Aug> for ExprContainsTemporalVisitor {
    fn visit_function(&mut self, func: &Function<Aug>) {
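        // Any function whose resolved item name is `mz_now` marks the AST as temporal.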
        self.contains_temporal |= func.name.full_item_name().item == MZ_NOW_NAME;
        visit_function(self, func);
    }
}

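/// Replaces `mz_now()` calls with the chosen timestamp, cast to `mz_timestamp`, recording whether
/// any such substitution occurred.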
struct MzNowPurifierVisitor {
    pub mz_now: Option<Timestamp>,
    pub mz_timestamp_type: ResolvedDataType,
    pub introduced_mz_timestamp: bool,
}

impl MzNowPurifierVisitor {
    pub fn new(
        mz_now: Option<Timestamp>,
        mz_timestamp_type: ResolvedDataType,
    ) -> MzNowPurifierVisitor {
        MzNowPurifierVisitor {
            mz_now,
            mz_timestamp_type,
            introduced_mz_timestamp: false,
        }
    }
}

impl VisitMut<'_, Aug> for MzNowPurifierVisitor {
    fn visit_expr_mut(&mut self, expr: &'_ mut Expr<Aug>) {
        match expr {
            Expr::Function(Function {
                name:
                    ResolvedItemName::Item {
                        full_name: FullItemName { item, .. },
                        ..
                    },
                ..
            }) if item == &MZ_NOW_NAME.to_string() => {
                let mz_now = self.mz_now.expect(
                    "we should have chosen a timestamp if the expression contains mz_now()",
                );
                // We substitute `mz_now()` with a number literal plus a cast to `mz_timestamp`.
                // The cast ensures that the type of the expression is unchanged.
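                // For example, if the chosen timestamp were 1696000000000 (illustrative value),
                // the purified expression would be the number `1696000000000` cast to
                // `mz_timestamp`.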
                *expr = Expr::Cast {
                    expr: Box::new(Expr::Value(Value::Number(mz_now.to_string()))),
                    data_type: self.mz_timestamp_type.clone(),
                };
                self.introduced_mz_timestamp = true;
            }
            _ => visit_expr_mut(self, expr),
        }
    }
}