mz_sql/pure.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! SQL purification.
//!
//! See the [crate-level documentation](crate) for details.

use std::collections::{BTreeMap, BTreeSet};
use std::fmt;
use std::iter;
use std::path::Path;
use std::sync::Arc;

use anyhow::anyhow;
use itertools::Itertools;
use mz_ccsr::{Client, GetByIdError, GetBySubjectError, Schema as CcsrSchema};
use mz_controller_types::ClusterId;
use mz_kafka_util::client::MzClientContext;
use mz_mysql_util::MySqlTableDesc;
use mz_ore::collections::CollectionExt;
use mz_ore::error::ErrorExt;
use mz_ore::future::InTask;
use mz_ore::iter::IteratorExt;
use mz_ore::str::StrExt;
use mz_ore::{assert_none, soft_panic_or_log};
use mz_postgres_util::desc::PostgresTableDesc;
use mz_proto::RustType;
use mz_repr::{CatalogItemId, RelationDesc, RelationVersionSelector, Timestamp, strconv};
use mz_sql_parser::ast::display::AstDisplay;
use mz_sql_parser::ast::visit::{Visit, visit_function};
use mz_sql_parser::ast::visit_mut::{VisitMut, visit_expr_mut};
use mz_sql_parser::ast::{
    AlterSourceAction, AlterSourceAddSubsourceOptionName, AlterSourceStatement, AvroDocOn,
    ColumnName, CreateMaterializedViewStatement, CreateSinkConnection, CreateSinkOptionName,
    CreateSinkStatement, CreateSourceOptionName, CreateSubsourceOption, CreateSubsourceOptionName,
    CreateTableFromSourceStatement, CsrConfigOption, CsrConfigOptionName, CsrConnection,
    CsrSeedAvro, CsrSeedProtobuf, CsrSeedProtobufSchema, DeferredItemName, DocOnIdentifier,
    DocOnSchema, Expr, Function, FunctionArgs, Ident, KafkaSourceConfigOption,
    KafkaSourceConfigOptionName, LoadGenerator, LoadGeneratorOption, LoadGeneratorOptionName,
    MaterializedViewOption, MaterializedViewOptionName, MySqlConfigOption, MySqlConfigOptionName,
    PgConfigOption, PgConfigOptionName, RawItemName, ReaderSchemaSelectionStrategy,
    RefreshAtOptionValue, RefreshEveryOptionValue, RefreshOptionValue, SourceEnvelope,
    SqlServerConfigOption, SqlServerConfigOptionName, Statement, TableFromSourceColumns,
    TableFromSourceOption, TableFromSourceOptionName, UnresolvedItemName,
};
use mz_sql_server_util::desc::SqlServerTableDesc;
use mz_storage_types::configuration::StorageConfiguration;
use mz_storage_types::connections::Connection;
use mz_storage_types::connections::inline::IntoInlineConnection;
use mz_storage_types::errors::ContextCreationError;
use mz_storage_types::sources::load_generator::LoadGeneratorOutput;
use mz_storage_types::sources::mysql::MySqlSourceDetails;
use mz_storage_types::sources::postgres::PostgresSourcePublicationDetails;
use mz_storage_types::sources::{
    GenericSourceConnection, SourceDesc, SourceExportStatementDetails, SqlServerSourceExtras,
};
use prost::Message;
use protobuf_native::MessageLite;
use protobuf_native::compiler::{SourceTreeDescriptorDatabase, VirtualSourceTree};
use rdkafka::admin::AdminClient;
use references::{RetrievedSourceReferences, SourceReferenceClient};
use uuid::Uuid;

use crate::ast::{
    AlterSourceAddSubsourceOption, AvroSchema, CreateSourceConnection, CreateSourceStatement,
    CreateSubsourceStatement, CsrConnectionAvro, CsrConnectionProtobuf, ExternalReferenceExport,
    ExternalReferences, Format, FormatSpecifier, ProtobufSchema, Value, WithOptionValue,
};
use crate::catalog::{CatalogItemType, SessionCatalog};
use crate::kafka_util::{KafkaSinkConfigOptionExtracted, KafkaSourceConfigOptionExtracted};
use crate::names::{
    Aug, FullItemName, PartialItemName, ResolvedColumnReference, ResolvedDataType, ResolvedIds,
    ResolvedItemName,
};
use crate::plan::error::PlanError;
use crate::plan::statement::ddl::load_generator_ast_to_generator;
use crate::plan::{SourceReferences, StatementContext};
use crate::pure::error::{IcebergSinkPurificationError, SqlServerSourcePurificationError};
use crate::session::vars::ENABLE_SQL_SERVER_SOURCE;
use crate::{kafka_util, normalize};

use self::error::{
    CsrPurificationError, KafkaSinkPurificationError, KafkaSourcePurificationError,
    LoadGeneratorSourcePurificationError, MySqlSourcePurificationError, PgSourcePurificationError,
};

pub(crate) mod error;
mod references;

pub mod mysql;
pub mod postgres;
pub mod sql_server;

pub(crate) struct RequestedSourceExport<T> {
    external_reference: UnresolvedItemName,
    name: UnresolvedItemName,
    meta: T,
}

impl<T: fmt::Debug> fmt::Debug for RequestedSourceExport<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("RequestedSourceExport")
            .field("external_reference", &self.external_reference)
            .field("name", &self.name)
            .field("meta", &self.meta)
            .finish()
    }
}
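
// `change_meta` converts the metadata attached to a requested export from one
// type to another (e.g. from a raw upstream reference to a fully resolved
// table description) while leaving the external reference and target name
// untouched.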

impl<T> RequestedSourceExport<T> {
    fn change_meta<F>(self, new_meta: F) -> RequestedSourceExport<F> {
        RequestedSourceExport {
            external_reference: self.external_reference,
            name: self.name,
            meta: new_meta,
        }
    }
}

/// Generates a subsource name by prepending the source's schema name, if
/// present.
///
/// For example, if the source is `a.b`, then `a` is prepended to the
/// subsource name so that the subsource is created in the same schema as the
/// source.
fn source_export_name_gen(
    source_name: &UnresolvedItemName,
    subsource_name: &str,
) -> Result<UnresolvedItemName, PlanError> {
    let mut partial = normalize::unresolved_item_name(source_name.clone())?;
    partial.item = subsource_name.to_string();
    Ok(UnresolvedItemName::from(partial))
}
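
// A minimal sketch (not part of the original file) of the naming contract
// above: the generated subsource name inherits the source's schema
// qualification. It assumes the fallible `Ident::new` constructor from
// `mz_sql_parser`; adjust the construction if the `Ident` API differs.
#[cfg(test)]
mod source_export_name_gen_sketch {
    use super::*;

    #[test]
    fn subsource_inherits_source_schema() {
        // Source `a.b`: schema `a`, item `b`.
        let source = UnresolvedItemName(vec![
            Ident::new("a").expect("valid ident"),
            Ident::new("b").expect("valid ident"),
        ]);
        let name = source_export_name_gen(&source, "t").expect("valid subsource name");
        // The subsource lands in schema `a`.
        assert_eq!(name.to_ast_string_simple(), "a.t");
    }
}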

// TODO(database-issues#8620): Remove once subsources are removed
/// Validates that the requested subsources do not have name conflicts with
/// each other and that the same upstream table is not referenced multiple
/// times.
fn validate_source_export_names<T>(
    requested_source_exports: &[RequestedSourceExport<T>],
) -> Result<(), PlanError> {
    // This condition would get caught during the catalog transaction, but produces a
    // vague, non-contextual error. Instead, error here so we can suggest to the user
    // how to fix the problem.
    if let Some(name) = requested_source_exports
        .iter()
        .map(|subsource| &subsource.name)
        .duplicates()
        .next()
        .cloned()
    {
        let mut upstream_references: Vec<_> = requested_source_exports
            .iter()
            .filter_map(|subsource| {
                if subsource.name == name {
                    Some(subsource.external_reference.clone())
                } else {
                    None
                }
            })
            .collect();

        upstream_references.sort();

        Err(PlanError::SubsourceNameConflict {
            name,
            upstream_references,
        })?;
    }

    // We disallow subsource statements from referencing the same upstream table, but we allow
    // `CREATE TABLE .. FROM SOURCE` statements to do so. Since `CREATE TABLE .. FROM SOURCE`
    // purification will only provide 1 `requested_source_export`, we can leave this here without
    // needing to differentiate between the two types of statements.
    // TODO(roshan): Remove this when auto-generated subsources are deprecated.
    if let Some(name) = requested_source_exports
        .iter()
        .map(|export| &export.external_reference)
        .duplicates()
        .next()
        .cloned()
    {
        let mut target_names: Vec<_> = requested_source_exports
            .iter()
            .filter_map(|export| {
                if export.external_reference == name {
                    Some(export.name.clone())
                } else {
                    None
                }
            })
            .collect();

        target_names.sort();

        Err(PlanError::SubsourceDuplicateReference { name, target_names })?;
    }

    Ok(())
}
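
// An illustrative sketch (not part of the original file) of the conflict
// detection above: two exports that target the same subsource name are
// rejected with `SubsourceNameConflict`. As above, the fallible `Ident::new`
// constructor is assumed.
#[cfg(test)]
mod validate_source_export_names_sketch {
    use super::*;

    fn export(reference: &str, name: &str) -> RequestedSourceExport<()> {
        RequestedSourceExport {
            external_reference: UnresolvedItemName(vec![
                Ident::new(reference).expect("valid ident"),
            ]),
            name: UnresolvedItemName(vec![Ident::new(name).expect("valid ident")]),
            meta: (),
        }
    }

    #[test]
    fn duplicate_target_names_are_rejected() {
        // Two different upstream tables both requested as `dup`.
        let exports = vec![export("t1", "dup"), export("t2", "dup")];
        assert!(matches!(
            validate_source_export_names(&exports),
            Err(PlanError::SubsourceNameConflict { .. })
        ));
    }
}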

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PurifiedStatement {
    PurifiedCreateSource {
        /// The progress subsource, if we are offloading progress info to a
        /// separate relation.
        create_progress_subsource_stmt: Option<CreateSubsourceStatement<Aug>>,
        create_source_stmt: CreateSourceStatement<Aug>,
        /// Map of subsource names to external details.
        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
        /// All the available upstream references that can be added as tables
        /// to this primary source.
        available_source_references: SourceReferences,
    },
    PurifiedAlterSource {
        alter_source_stmt: AlterSourceStatement<Aug>,
    },
    PurifiedAlterSourceAddSubsources {
        /// This just saves us an annoying catalog lookup.
        source_name: ResolvedItemName,
        /// Options that we will need the values of to update the source's
        /// definition.
        options: Vec<AlterSourceAddSubsourceOption<Aug>>,
        /// Map of subsource names to external details.
        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
    },
    PurifiedAlterSourceRefreshReferences {
        source_name: ResolvedItemName,
        /// The updated available upstream references for the primary source.
        available_source_references: SourceReferences,
    },
    PurifiedCreateSink(CreateSinkStatement<Aug>),
    PurifiedCreateTableFromSource {
        stmt: CreateTableFromSourceStatement<Aug>,
    },
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PurifiedSourceExport {
    pub external_reference: UnresolvedItemName,
    pub details: PurifiedExportDetails,
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PurifiedExportDetails {
    MySql {
        table: MySqlTableDesc,
        text_columns: Option<Vec<Ident>>,
        exclude_columns: Option<Vec<Ident>>,
        initial_gtid_set: String,
    },
    Postgres {
        table: PostgresTableDesc,
        text_columns: Option<Vec<Ident>>,
        exclude_columns: Option<Vec<Ident>>,
    },
    SqlServer {
        table: SqlServerTableDesc,
        text_columns: Option<Vec<Ident>>,
        excl_columns: Option<Vec<Ident>>,
        capture_instance: Arc<str>,
        initial_lsn: mz_sql_server_util::cdc::Lsn,
    },
    Kafka {},
    LoadGenerator {
        table: Option<RelationDesc>,
        output: LoadGeneratorOutput,
    },
}
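
// Each variant of `PurifiedExportDetails` carries the connection-specific
// state (table descriptions, normalized column options, and snapshot or
// replication markers) that later stages of planning consume when turning a
// purified export into a subsource or table definition.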

/// Purifies a statement, removing any dependencies on external state.
///
/// See the section on [purification](crate#purification) in the crate
/// documentation for details.
///
/// Note that this doesn't handle `CREATE MATERIALIZED VIEW`, which is
/// handled by [purify_create_materialized_view_options] instead.
/// This could be made more consistent by a refactoring discussed here:
/// <https://github.com/MaterializeInc/materialize/pull/23870#discussion_r1435922709>
pub async fn purify_statement(
    catalog: impl SessionCatalog,
    now: u64,
    stmt: Statement<Aug>,
    storage_configuration: &StorageConfiguration,
) -> (Result<PurifiedStatement, PlanError>, Option<ClusterId>) {
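    // Alongside the purified statement, return the ID of the cluster named in
    // the statement's `IN CLUSTER` clause, when one is present.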
    match stmt {
        Statement::CreateSource(stmt) => {
            let cluster_id = stmt.in_cluster.as_ref().map(|cluster| cluster.id.clone());
            (
                purify_create_source(catalog, now, stmt, storage_configuration).await,
                cluster_id,
            )
        }
        Statement::AlterSource(stmt) => (
            purify_alter_source(catalog, stmt, storage_configuration).await,
            None,
        ),
        Statement::CreateSink(stmt) => {
            let cluster_id = stmt.in_cluster.as_ref().map(|cluster| cluster.id.clone());
            (
                purify_create_sink(catalog, stmt, storage_configuration).await,
                cluster_id,
            )
        }
        Statement::CreateTableFromSource(stmt) => (
            purify_create_table_from_source(catalog, stmt, storage_configuration).await,
            None,
        ),
        o => unreachable!("{:?} does not need to be purified", o),
    }
}

/// Injects `DOC ON` comments into all Avro formats that are using a schema
/// registry by finding all SQL `COMMENT`s that are attached to the sink's
/// underlying materialized view (as well as any types referenced by that
/// underlying materialized view).
pub(crate) fn purify_create_sink_avro_doc_on_options(
    catalog: &dyn SessionCatalog,
    from_id: CatalogItemId,
    format: &mut Option<FormatSpecifier<Aug>>,
) -> Result<(), PlanError> {
    // Collect all objects referenced by the sink.
    let from = catalog.get_item(&from_id);
    let object_ids = from
        .references()
        .items()
        .copied()
        .chain_one(from.id())
        .collect::<Vec<_>>();

    // Collect all Avro formats that use a schema registry, as well as a set of
    // all identifiers named in user-provided `DOC ON` options.
    let mut avro_format_options = vec![];
    for_each_format(format, |doc_on_schema, fmt| match fmt {
        Format::Avro(AvroSchema::InlineSchema { .. })
        | Format::Bytes
        | Format::Csv { .. }
        | Format::Json { .. }
        | Format::Protobuf(..)
        | Format::Regex(..)
        | Format::Text => (),
        Format::Avro(AvroSchema::Csr {
            csr_connection: CsrConnectionAvro { connection, .. },
        }) => {
            avro_format_options.push((doc_on_schema, &mut connection.options));
        }
    });

    // For each Avro format in the sink, inject the appropriate `DOC ON` options
    // for each item referenced by the sink.
    for (for_schema, options) in avro_format_options {
        let user_provided_comments = options
            .iter()
            .filter_map(|CsrConfigOption { name, .. }| match name {
                CsrConfigOptionName::AvroDocOn(doc_on) => Some(doc_on.clone()),
                _ => None,
            })
            .collect::<BTreeSet<_>>();

        // Add existing comments, if not already provided by the user.
        for object_id in &object_ids {
            // Always add comments to the latest version of the item.
            let item = catalog
                .get_item(object_id)
                .at_version(RelationVersionSelector::Latest);
            let full_name = catalog.resolve_full_name(item.name());
            let full_resolved_name = ResolvedItemName::Item {
                id: *object_id,
                qualifiers: item.name().qualifiers.clone(),
                full_name: full_name.clone(),
                print_id: !matches!(
                    item.item_type(),
                    CatalogItemType::Func | CatalogItemType::Type
                ),
                version: RelationVersionSelector::Latest,
            };

            if let Some(comments_map) = catalog.get_item_comments(object_id) {
                // Attach comment for the item itself, if the user has not
                // already provided an overriding `DOC ON` option for the item.
                let doc_on_item_key = AvroDocOn {
                    identifier: DocOnIdentifier::Type(full_resolved_name.clone()),
                    for_schema,
                };
                if !user_provided_comments.contains(&doc_on_item_key) {
                    if let Some(root_comment) = comments_map.get(&None) {
                        options.push(CsrConfigOption {
                            name: CsrConfigOptionName::AvroDocOn(doc_on_item_key),
                            value: Some(mz_sql_parser::ast::WithOptionValue::Value(Value::String(
                                root_comment.clone(),
                            ))),
                        });
                    }
                }

                // Attach comment for each column in the item, if the user has
                // not already provided an overriding `DOC ON` option for the
                // column.
                let column_descs: Result<Option<RelationDesc>, PlanError> = match item.item_type() {
                    CatalogItemType::Type => item
                        .type_details()
                        .and_then(|details| details.typ.desc(catalog).transpose())
                        .transpose(),
                    _ => item
                        .desc(&full_name)
                        .map(|desc| Some(desc.into_owned()))
                        .map_err(Into::into),
                };

                if let Ok(Some(desc)) = column_descs {
                    for (pos, column_name) in desc.iter_names().enumerate() {
                        if let Some(comment_str) = comments_map.get(&Some(pos + 1)) {
                            let doc_on_column_key = AvroDocOn {
                                identifier: DocOnIdentifier::Column(ColumnName {
                                    relation: full_resolved_name.clone(),
                                    column: ResolvedColumnReference::Column {
                                        name: column_name.to_owned(),
                                        index: pos,
                                    },
                                }),
                                for_schema,
                            };
                            if !user_provided_comments.contains(&doc_on_column_key) {
                                options.push(CsrConfigOption {
                                    name: CsrConfigOptionName::AvroDocOn(doc_on_column_key),
                                    value: Some(mz_sql_parser::ast::WithOptionValue::Value(
                                        Value::String(comment_str.clone()),
                                    )),
                                });
                            }
                        }
                    }
                }
            }
        }
    }

    Ok(())
}
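
// An illustrative example of the effect (names are hypothetical): given
//
//     COMMENT ON MATERIALIZED VIEW my_view IS 'my view';
//     COMMENT ON COLUMN my_view.a IS 'column a';
//
// a sink on `my_view` whose Avro format uses the schema registry is purified
// as though the user had written
//
//     ... (DOC ON TYPE my_view = 'my view', DOC ON COLUMN my_view.a = 'column a')
//
// with user-supplied `DOC ON` options always taking precedence over the
// injected ones.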

/// Checks that the sink described in the statement can connect to its external
/// resources.
async fn purify_create_sink(
    catalog: impl SessionCatalog,
    mut create_sink_stmt: CreateSinkStatement<Aug>,
    storage_configuration: &StorageConfiguration,
) -> Result<PurifiedStatement, PlanError> {
    // General purification
    let CreateSinkStatement {
        connection,
        format,
        with_options,
        name: _,
        in_cluster: _,
        if_not_exists: _,
        from,
        envelope: _,
    } = &mut create_sink_stmt;

    // The list of options that the user is allowed to specify.
    const USER_ALLOWED_WITH_OPTIONS: &[CreateSinkOptionName] = &[CreateSinkOptionName::Snapshot];

    if let Some(op) = with_options
        .iter()
        .find(|op| !USER_ALLOWED_WITH_OPTIONS.contains(&op.name))
    {
        sql_bail!(
            "CREATE SINK...WITH ({}..) is not allowed",
            op.name.to_ast_string_simple(),
        )
    }

    match &connection {
        CreateSinkConnection::Kafka {
            connection,
            options,
            key: _,
            headers: _,
        } => {
            // We must not leave any state behind in the Kafka broker, so just ensure that
            // we can connect. This means we don't ensure that we can create the topic and
            // introduces TOCTOU errors, but creating an inoperable sink is infinitely
            // preferable to leaking state in users' environments.
            let scx = StatementContext::new(None, &catalog);
            let connection = {
                let item = scx.get_item_by_resolved_name(connection)?;
                // Get Kafka connection.
                match item.connection()? {
                    Connection::Kafka(connection) => {
                        connection.clone().into_inline_connection(scx.catalog)
                    }
                    _ => sql_bail!(
                        "{} is not a kafka connection",
                        scx.catalog.resolve_full_name(item.name())
                    ),
                }
            };

            let extracted_options: KafkaSinkConfigOptionExtracted = options.clone().try_into()?;

            if extracted_options.legacy_ids == Some(true) {
                sql_bail!("LEGACY IDs option is not supported");
            }

            let client: AdminClient<_> = connection
                .create_with_context(
                    storage_configuration,
                    MzClientContext::default(),
                    &BTreeMap::new(),
                    InTask::No,
                )
                .await
                .map_err(|e| {
                    // anyhow doesn't support Clone, so it is not trivial to move into PlanError.
                    KafkaSinkPurificationError::AdminClientError(Arc::new(e))
                })?;

            let metadata = client
                .inner()
                .fetch_metadata(
                    None,
                    storage_configuration
                        .parameters
                        .kafka_timeout_config
                        .fetch_metadata_timeout,
                )
                .map_err(|e| {
                    KafkaSinkPurificationError::AdminClientError(Arc::new(
                        ContextCreationError::KafkaError(e),
                    ))
                })?;

            if metadata.brokers().is_empty() {
                Err(KafkaSinkPurificationError::ZeroBrokers)?;
            }
        }
        CreateSinkConnection::Iceberg {
            connection,
            aws_connection,
            ..
        } => {
            let scx = StatementContext::new(None, &catalog);
            let connection = {
                let item = scx.get_item_by_resolved_name(connection)?;
                // Get Iceberg connection.
                match item.connection()? {
                    Connection::IcebergCatalog(connection) => {
                        connection.clone().into_inline_connection(scx.catalog)
                    }
                    _ => sql_bail!(
                        "{} is not an iceberg connection",
                        scx.catalog.resolve_full_name(item.name())
                    ),
                }
            };

            let aws_conn_id = aws_connection.item_id();

            let aws_connection = {
                let item = scx.get_item_by_resolved_name(aws_connection)?;
                // Get AWS connection.
                match item.connection()? {
                    Connection::Aws(aws_connection) => aws_connection.clone(),
                    _ => sql_bail!(
                        "{} is not an aws connection",
                        scx.catalog.resolve_full_name(item.name())
                    ),
                }
            };

            let _catalog = connection
                .connect(storage_configuration, InTask::No)
                .await
                .map_err(|e| IcebergSinkPurificationError::CatalogError(Arc::new(e)))?;

            let sdk_config = aws_connection
                .load_sdk_config(
                    &storage_configuration.connection_context,
                    aws_conn_id.clone(),
                    InTask::No,
                )
                .await
                .map_err(|e| IcebergSinkPurificationError::AwsSdkContextError(Arc::new(e)))?;

            let sts_client = aws_sdk_sts::Client::new(&sdk_config);
            let _ = sts_client.get_caller_identity().send().await.map_err(|e| {
                IcebergSinkPurificationError::StsIdentityError(Arc::new(e.into_service_error()))
            })?;
        }
    }

    let mut csr_connection_ids = BTreeSet::new();
    for_each_format(format, |_, fmt| match fmt {
        Format::Avro(AvroSchema::InlineSchema { .. })
        | Format::Bytes
        | Format::Csv { .. }
        | Format::Json { .. }
        | Format::Protobuf(ProtobufSchema::InlineSchema { .. })
        | Format::Regex(..)
        | Format::Text => (),
        Format::Avro(AvroSchema::Csr {
            csr_connection: CsrConnectionAvro { connection, .. },
        })
        | Format::Protobuf(ProtobufSchema::Csr {
            csr_connection: CsrConnectionProtobuf { connection, .. },
        }) => {
            csr_connection_ids.insert(*connection.connection.item_id());
        }
    });

    let scx = StatementContext::new(None, &catalog);
    for csr_connection_id in csr_connection_ids {
        let connection = {
            let item = scx.get_item(&csr_connection_id);
            // Get CSR connection.
            match item.connection()? {
                Connection::Csr(connection) => connection.clone().into_inline_connection(&catalog),
                _ => Err(CsrPurificationError::NotCsrConnection(
                    scx.catalog.resolve_full_name(item.name()),
                ))?,
            }
        };

        let client = connection
            .connect(storage_configuration, InTask::No)
            .await
            .map_err(|e| CsrPurificationError::ClientError(Arc::new(e)))?;

        client
            .list_subjects()
            .await
            .map_err(|e| CsrPurificationError::ListSubjectsError(Arc::new(e)))?;
    }

    purify_create_sink_avro_doc_on_options(&catalog, *from.item_id(), format)?;

    Ok(PurifiedStatement::PurifiedCreateSink(create_sink_stmt))
}

/// Runs a function on each format within the format specifier.
///
/// The function is provided with a `DocOnSchema` that indicates whether the
/// format is for the key, value, or both.
///
// TODO(benesch): rename `DocOnSchema` to the more general `FormatRestriction`.
fn for_each_format<'a, F>(format: &'a mut Option<FormatSpecifier<Aug>>, mut f: F)
where
    F: FnMut(DocOnSchema, &'a mut Format<Aug>),
{
    match format {
        None => (),
        Some(FormatSpecifier::Bare(fmt)) => f(DocOnSchema::All, fmt),
        Some(FormatSpecifier::KeyValue { key, value }) => {
            f(DocOnSchema::KeyOnly, key);
            f(DocOnSchema::ValueOnly, value);
        }
    }
}
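
// Minimal usage sketch (illustrative only): visit both halves of a key/value
// specifier.
//
//     for_each_format(&mut format, |schema, _fmt| match schema {
//         DocOnSchema::KeyOnly => { /* key-side format */ }
//         DocOnSchema::ValueOnly => { /* value-side format */ }
//         DocOnSchema::All => { /* a bare format covers both */ }
//     });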

/// Defines whether purification should enforce that at least one valid source
/// reference is provided on the provided statement.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub(crate) enum SourceReferencePolicy {
    /// Don't allow source references to be provided. This is used for
    /// enforcing that `CREATE SOURCE` statements don't create subsources.
    NotAllowed,
    /// Allow empty references, such as when creating a source that
    /// will have tables added afterwards.
    Optional,
    /// Require that at least one reference is resolved to an upstream
    /// object.
    Required,
}
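
// The policy is derived from the `enable_create_table_from_source` and
// `force_source_table_syntax` system variables; see the selection logic near
// the top of `purify_create_source`.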

async fn purify_create_source(
    catalog: impl SessionCatalog,
    now: u64,
    mut create_source_stmt: CreateSourceStatement<Aug>,
    storage_configuration: &StorageConfiguration,
) -> Result<PurifiedStatement, PlanError> {
    let CreateSourceStatement {
        name: source_name,
        col_names,
        key_constraint,
        connection: source_connection,
        format,
        envelope,
        include_metadata,
        external_references,
        progress_subsource,
        with_options,
        ..
    } = &mut create_source_stmt;
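
    // The presence of any of these clauses marks the pre-`CREATE TABLE ..
    // FROM SOURCE` syntax, in which the source itself carries columns,
    // formats, and subsource references; such sources also get a progress
    // subsource, generated further below.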
    let uses_old_syntax = !col_names.is_empty()
        || key_constraint.is_some()
        || format.is_some()
        || envelope.is_some()
        || !include_metadata.is_empty()
        || external_references.is_some()
        || progress_subsource.is_some();

    if let Some(DeferredItemName::Named(_)) = progress_subsource {
        sql_bail!("Cannot manually ID qualify progress subsource")
    }

    let mut requested_subsource_map = BTreeMap::new();

    let progress_desc = match &source_connection {
        CreateSourceConnection::Kafka { .. } => {
            &mz_storage_types::sources::kafka::KAFKA_PROGRESS_DESC
        }
        CreateSourceConnection::Postgres { .. } => {
            &mz_storage_types::sources::postgres::PG_PROGRESS_DESC
        }
        CreateSourceConnection::SqlServer { .. } => {
            &mz_storage_types::sources::sql_server::SQL_SERVER_PROGRESS_DESC
        }
        CreateSourceConnection::MySql { .. } => {
            &mz_storage_types::sources::mysql::MYSQL_PROGRESS_DESC
        }
        CreateSourceConnection::LoadGenerator { .. } => {
            &mz_storage_types::sources::load_generator::LOAD_GEN_PROGRESS_DESC
        }
    };
    let scx = StatementContext::new(None, &catalog);

    // Depending on whether the user must, or merely may, use the
    // `CREATE TABLE .. FROM SOURCE` statement to add tables after this source
    // is created, we might need to enforce that auto-generated subsources are
    // or are not created by this source statement.
    let reference_policy = if scx.catalog.system_vars().enable_create_table_from_source()
        && scx.catalog.system_vars().force_source_table_syntax()
    {
        SourceReferencePolicy::NotAllowed
    } else if scx.catalog.system_vars().enable_create_table_from_source() {
        SourceReferencePolicy::Optional
    } else {
        SourceReferencePolicy::Required
    };

    let mut format_options = SourceFormatOptions::Default;

    let retrieved_source_references: RetrievedSourceReferences;

    match source_connection {
        CreateSourceConnection::Kafka {
            connection,
            options: base_with_options,
            ..
        } => {
            if let Some(external_references) = external_references {
                Err(KafkaSourcePurificationError::ReferencedSubsources(
                    external_references.clone(),
                ))?;
            }

            let connection = {
                let item = scx.get_item_by_resolved_name(connection)?;
                // Get Kafka connection.
                match item.connection()? {
                    Connection::Kafka(connection) => {
                        connection.clone().into_inline_connection(&catalog)
                    }
                    _ => Err(KafkaSourcePurificationError::NotKafkaConnection(
                        scx.catalog.resolve_full_name(item.name()),
                    ))?,
                }
            };

            let extracted_options: KafkaSourceConfigOptionExtracted =
                base_with_options.clone().try_into()?;

            let topic = extracted_options
                .topic
                .ok_or(KafkaSourcePurificationError::ConnectionMissingTopic)?;

            let consumer = connection
                .create_with_context(
                    storage_configuration,
                    MzClientContext::default(),
                    &BTreeMap::new(),
                    InTask::No,
                )
                .await
                .map_err(|e| {
                    // anyhow doesn't support Clone, so it is not trivial to move into PlanError.
                    KafkaSourcePurificationError::KafkaConsumerError(
                        e.display_with_causes().to_string(),
                    )
                })?;
            let consumer = Arc::new(consumer);

            match (
                extracted_options.start_offset,
                extracted_options.start_timestamp,
            ) {
                (None, None) => {
                    // Validate that the topic at least exists.
                    kafka_util::ensure_topic_exists(
                        Arc::clone(&consumer),
                        &topic,
                        storage_configuration
                            .parameters
                            .kafka_timeout_config
                            .fetch_metadata_timeout,
                    )
                    .await?;
                }
                (Some(_), Some(_)) => {
                    sql_bail!("cannot specify START TIMESTAMP and START OFFSET at same time")
                }
                (Some(start_offsets), None) => {
                    // Validate the start offsets.
                    kafka_util::validate_start_offsets(
                        Arc::clone(&consumer),
                        &topic,
                        start_offsets,
                        storage_configuration
                            .parameters
                            .kafka_timeout_config
                            .fetch_metadata_timeout,
                    )
                    .await?;
                }
                (None, Some(time_offset)) => {
                    // Translate `START TIMESTAMP` to a start offset.
                    let start_offsets = kafka_util::lookup_start_offsets(
                        Arc::clone(&consumer),
                        &topic,
                        time_offset,
                        now,
                        storage_configuration
                            .parameters
                            .kafka_timeout_config
                            .fetch_metadata_timeout,
                    )
                    .await?;

                    base_with_options.retain(|val| {
                        !matches!(val.name, KafkaSourceConfigOptionName::StartTimestamp)
                    });
                    base_with_options.push(KafkaSourceConfigOption {
                        name: KafkaSourceConfigOptionName::StartOffset,
                        value: Some(WithOptionValue::Sequence(
                            start_offsets
                                .iter()
                                .map(|offset| {
                                    WithOptionValue::Value(Value::Number(offset.to_string()))
                                })
                                .collect(),
                        )),
                    });
                }
            }
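
            // At this point any `START TIMESTAMP` has been rewritten into
            // concrete `START OFFSET`s, so the stored statement no longer
            // depends on external state and replanning it is deterministic.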

            let reference_client = SourceReferenceClient::Kafka { topic: &topic };
            retrieved_source_references = reference_client.get_source_references().await?;

            format_options = SourceFormatOptions::Kafka { topic };
        }
        CreateSourceConnection::Postgres {
            connection,
            options,
        } => {
            let connection_item = scx.get_item_by_resolved_name(connection)?;
            let connection = match connection_item.connection().map_err(PlanError::from)? {
                Connection::Postgres(connection) => {
                    connection.clone().into_inline_connection(&catalog)
                }
                _ => Err(PgSourcePurificationError::NotPgConnection(
                    scx.catalog.resolve_full_name(connection_item.name()),
                ))?,
            };
            let crate::plan::statement::PgConfigOptionExtracted {
                publication,
                text_columns,
                exclude_columns,
                details,
                ..
            } = options.clone().try_into()?;
            let publication =
                publication.ok_or(PgSourcePurificationError::ConnectionMissingPublication)?;

            if details.is_some() {
                Err(PgSourcePurificationError::UserSpecifiedDetails)?;
            }

            let client = connection
                .validate(connection_item.id(), storage_configuration)
                .await?;

            let reference_client = SourceReferenceClient::Postgres {
                client: &client,
                publication: &publication,
                database: &connection.database,
            };
            retrieved_source_references = reference_client.get_source_references().await?;

            let postgres::PurifiedSourceExports {
                source_exports: subsources,
                normalized_text_columns,
            } = postgres::purify_source_exports(
                &client,
                &retrieved_source_references,
                external_references,
                text_columns,
                exclude_columns,
                source_name,
                &reference_policy,
            )
            .await?;

            if let Some(text_cols_option) = options
                .iter_mut()
                .find(|option| option.name == PgConfigOptionName::TextColumns)
            {
                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
            }

            requested_subsource_map.extend(subsources);

            // Record the active replication timeline_id to allow detection of a future upstream
            // point-in-time-recovery that will put the source into an error state.
            let timeline_id = mz_postgres_util::get_timeline_id(&client).await?;

            // Remove any old detail references.
            options.retain(|PgConfigOption { name, .. }| name != &PgConfigOptionName::Details);
            let details = PostgresSourcePublicationDetails {
                slot: format!(
                    "materialize_{}",
                    Uuid::new_v4().to_string().replace('-', "")
                ),
                timeline_id: Some(timeline_id),
                database: connection.database,
            };
            options.push(PgConfigOption {
                name: PgConfigOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    details.into_proto().encode_to_vec(),
                )))),
            })
        }
        CreateSourceConnection::SqlServer {
            connection,
            options,
        } => {
            scx.require_feature_flag(&ENABLE_SQL_SERVER_SOURCE)?;

            // Connect to the upstream SQL Server instance so we can validate
            // we're compatible with CDC.
            let connection_item = scx.get_item_by_resolved_name(connection)?;
            let connection = match connection_item.connection()? {
                Connection::SqlServer(connection) => {
                    connection.clone().into_inline_connection(&catalog)
                }
                _ => Err(SqlServerSourcePurificationError::NotSqlServerConnection(
                    scx.catalog.resolve_full_name(connection_item.name()),
                ))?,
            };
            let crate::plan::statement::ddl::SqlServerConfigOptionExtracted {
                details,
                text_columns,
                exclude_columns,
                seen: _,
            } = options.clone().try_into()?;

            if details.is_some() {
                Err(SqlServerSourcePurificationError::UserSpecifiedDetails)?;
            }

            let mut client = connection
                .validate(connection_item.id(), storage_configuration)
                .await?;

            let database: Arc<str> = connection.database.into();
            let reference_client = SourceReferenceClient::SqlServer {
                client: &mut client,
                database: Arc::clone(&database),
            };
            retrieved_source_references = reference_client.get_source_references().await?;
            tracing::debug!(?retrieved_source_references, "got source references");

            let timeout = mz_storage_types::sources::sql_server::MAX_LSN_WAIT
                .get(storage_configuration.config_set());

            let purified_source_exports = sql_server::purify_source_exports(
                &*database,
                &mut client,
                &retrieved_source_references,
                external_references,
                &text_columns,
                &exclude_columns,
                source_name,
                timeout,
                &reference_policy,
            )
            .await?;

            let sql_server::PurifiedSourceExports {
                source_exports,
                normalized_text_columns,
                normalized_excl_columns,
            } = purified_source_exports;

            // Update our set of requested source exports.
            requested_subsource_map.extend(source_exports);

            // Record the most recent restore_history_id, or none if the system has never been
            // restored.
            let restore_history_id =
                mz_sql_server_util::inspect::get_latest_restore_history_id(&mut client).await?;
            let details = SqlServerSourceExtras { restore_history_id };

            options.retain(|SqlServerConfigOption { name, .. }| {
                name != &SqlServerConfigOptionName::Details
            });
            options.push(SqlServerConfigOption {
                name: SqlServerConfigOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    details.into_proto().encode_to_vec(),
                )))),
            });

            // Update our 'TEXT' and 'EXCLUDE' column options with the purified and normalized set.
            if let Some(text_cols_option) = options
                .iter_mut()
                .find(|option| option.name == SqlServerConfigOptionName::TextColumns)
            {
                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
            }
            if let Some(excl_cols_option) = options
                .iter_mut()
                .find(|option| option.name == SqlServerConfigOptionName::ExcludeColumns)
            {
                excl_cols_option.value = Some(WithOptionValue::Sequence(normalized_excl_columns));
            }
        }
        CreateSourceConnection::MySql {
            connection,
            options,
        } => {
            let connection_item = scx.get_item_by_resolved_name(connection)?;
            let connection = match connection_item.connection()? {
                Connection::MySql(connection) => {
                    connection.clone().into_inline_connection(&catalog)
                }
                _ => Err(MySqlSourcePurificationError::NotMySqlConnection(
                    scx.catalog.resolve_full_name(connection_item.name()),
                ))?,
            };
            let crate::plan::statement::ddl::MySqlConfigOptionExtracted {
                details,
                text_columns,
                exclude_columns,
                seen: _,
            } = options.clone().try_into()?;

            if details.is_some() {
                Err(MySqlSourcePurificationError::UserSpecifiedDetails)?;
            }

            let mut conn = connection
                .validate(connection_item.id(), storage_configuration)
                .await
                .map_err(MySqlSourcePurificationError::InvalidConnection)?;

            // Retrieve the current @gtid_executed value of the server to mark as the effective
            // initial snapshot point, so that we can ensure consistency even if the initial
            // source snapshot is spread over multiple points in time.
            let initial_gtid_set =
                mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?;

            let reference_client = SourceReferenceClient::MySql {
                conn: &mut conn,
                include_system_schemas: mysql::references_system_schemas(external_references),
            };
            retrieved_source_references = reference_client.get_source_references().await?;

            let mysql::PurifiedSourceExports {
                source_exports: subsources,
                normalized_text_columns,
                normalized_exclude_columns,
            } = mysql::purify_source_exports(
                &mut conn,
                &retrieved_source_references,
                external_references,
                text_columns,
                exclude_columns,
                source_name,
                initial_gtid_set.clone(),
                &reference_policy,
            )
            .await?;
            requested_subsource_map.extend(subsources);

            // We don't have any fields in this details struct, but keep it around for
            // conformity with Postgres and in case we end up needing it again in the future.
            let details = MySqlSourceDetails {};
            // Update options with the purified details.
            options
                .retain(|MySqlConfigOption { name, .. }| name != &MySqlConfigOptionName::Details);
            options.push(MySqlConfigOption {
                name: MySqlConfigOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    details.into_proto().encode_to_vec(),
                )))),
            });

            if let Some(text_cols_option) = options
                .iter_mut()
                .find(|option| option.name == MySqlConfigOptionName::TextColumns)
            {
                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
            }
            if let Some(exclude_cols_option) = options
                .iter_mut()
                .find(|option| option.name == MySqlConfigOptionName::ExcludeColumns)
            {
                exclude_cols_option.value =
                    Some(WithOptionValue::Sequence(normalized_exclude_columns));
            }
        }
        CreateSourceConnection::LoadGenerator { generator, options } => {
            let load_generator =
                load_generator_ast_to_generator(&scx, generator, options, include_metadata)?;

            let reference_client = SourceReferenceClient::LoadGenerator {
                generator: &load_generator,
            };
            retrieved_source_references = reference_client.get_source_references().await?;
            // Determine whether this is a multi-output load generator, i.e. whether any
            // reference is something other than the default output; only multi-output
            // sources have references that need to be created as subsources.
            // TODO(database-issues#8620): Remove once subsources are removed
            let multi_output_sources =
                retrieved_source_references
                    .all_references()
                    .iter()
                    .any(|r| {
                        r.load_generator_output().expect("is loadgen")
                            != &LoadGeneratorOutput::Default
                    });

            match external_references {
                Some(requested)
                    if matches!(reference_policy, SourceReferencePolicy::NotAllowed) =>
                {
                    Err(PlanError::UseTablesForSources(requested.to_string()))?
                }
                Some(requested) if !multi_output_sources => match requested {
                    ExternalReferences::SubsetTables(_) => {
                        Err(LoadGeneratorSourcePurificationError::ForTables)?
                    }
                    ExternalReferences::SubsetSchemas(_) => {
                        Err(LoadGeneratorSourcePurificationError::ForSchemas)?
                    }
                    ExternalReferences::All => {
                        Err(LoadGeneratorSourcePurificationError::ForAllTables)?
                    }
                },
                Some(requested) => {
                    let requested_exports = retrieved_source_references
                        .requested_source_exports(Some(requested), source_name)?;
                    for export in requested_exports {
                        requested_subsource_map.insert(
                            export.name,
                            PurifiedSourceExport {
                                external_reference: export.external_reference,
                                details: PurifiedExportDetails::LoadGenerator {
                                    table: export
                                        .meta
                                        .load_generator_desc()
                                        .expect("is loadgen")
                                        .clone(),
                                    output: export
                                        .meta
                                        .load_generator_output()
                                        .expect("is loadgen")
                                        .clone(),
                                },
                            },
                        );
                    }
                }
                None => {
                    if multi_output_sources
                        && matches!(reference_policy, SourceReferencePolicy::Required)
                    {
                        Err(LoadGeneratorSourcePurificationError::MultiOutputRequiresForAllTables)?
                    }
                }
            }

            if let LoadGenerator::Clock = generator {
                if !options
                    .iter()
                    .any(|p| p.name == LoadGeneratorOptionName::AsOf)
                {
                    let now = catalog.now();
                    options.push(LoadGeneratorOption {
                        name: LoadGeneratorOptionName::AsOf,
                        value: Some(WithOptionValue::Value(Value::Number(now.to_string()))),
                    });
                }
            }
        }
    }

    // Now that we know which subsources to create alongside this statement,
    // remove the references so they are not canonicalized as part of the
    // `CREATE SOURCE` statement in the catalog.
    *external_references = None;

    // Generate a progress subsource for the old syntax.
    let create_progress_subsource_stmt = if uses_old_syntax {
        // Take the name from the input or generate one.
        let name = match progress_subsource {
            Some(name) => match name {
                DeferredItemName::Deferred(name) => name.clone(),
                DeferredItemName::Named(_) => unreachable!("already checked for this value"),
            },
            None => {
                let (item, prefix) = source_name.0.split_last().unwrap();
                let item_name =
                    Ident::try_generate_name(item.to_string(), "_progress", |candidate| {
                        let mut suggested_name = prefix.to_vec();
                        suggested_name.push(candidate.clone());

                        let partial =
                            normalize::unresolved_item_name(UnresolvedItemName(suggested_name))?;
                        let qualified = scx.allocate_qualified_name(partial)?;
                        let item_exists = scx.catalog.get_item_by_name(&qualified).is_some();
                        let type_exists = scx.catalog.get_type_by_name(&qualified).is_some();
                        Ok::<_, PlanError>(!item_exists && !type_exists)
                    })?;

                let mut full_name = prefix.to_vec();
                full_name.push(item_name);
                let full_name = normalize::unresolved_item_name(UnresolvedItemName(full_name))?;
                let qualified_name = scx.allocate_qualified_name(full_name)?;
                let full_name = scx.catalog.resolve_full_name(&qualified_name);

                UnresolvedItemName::from(full_name.clone())
            }
        };

        let (columns, constraints) = scx.relation_desc_into_table_defs(progress_desc)?;

        // Create the subsource statement.
        let mut progress_with_options: Vec<_> = with_options
            .iter()
            .filter_map(|opt| match opt.name {
                CreateSourceOptionName::TimestampInterval => None,
                CreateSourceOptionName::RetainHistory => Some(CreateSubsourceOption {
                    name: CreateSubsourceOptionName::RetainHistory,
                    value: opt.value.clone(),
                }),
            })
            .collect();
        progress_with_options.push(CreateSubsourceOption {
            name: CreateSubsourceOptionName::Progress,
            value: Some(WithOptionValue::Value(Value::Boolean(true))),
        });

        Some(CreateSubsourceStatement {
            name,
            columns,
            // Progress subsources do not refer to the source to which they belong.
            // Instead the primary source depends on it (the opposite is true of
            // ingestion exports, which depend on the primary source).
            of_source: None,
            constraints,
            if_not_exists: false,
            with_options: progress_with_options,
        })
    } else {
        None
    };

    purify_source_format(
        &catalog,
        format,
        &format_options,
        envelope,
        storage_configuration,
    )
    .await?;

    Ok(PurifiedStatement::PurifiedCreateSource {
        create_progress_subsource_stmt,
        create_source_stmt,
        subsources: requested_subsource_map,
        available_source_references: retrieved_source_references.available_source_references(),
    })
}
1298
1299/// On success, returns the details on new subsources and updated
1300/// 'options' that sequencing expects for handling `ALTER SOURCE` statements.
1301async fn purify_alter_source(
1302    catalog: impl SessionCatalog,
1303    stmt: AlterSourceStatement<Aug>,
1304    storage_configuration: &StorageConfiguration,
1305) -> Result<PurifiedStatement, PlanError> {
1306    let scx = StatementContext::new(None, &catalog);
1307    let AlterSourceStatement {
1308        source_name: unresolved_source_name,
1309        action,
1310        if_exists,
1311    } = stmt;
1312
1313    // Resolve the source item, tolerating its absence if IF EXISTS is set.
1314    let item = match scx.resolve_item(RawItemName::Name(unresolved_source_name.clone())) {
1315        Ok(item) => item,
1316        Err(_) if if_exists => {
1317            return Ok(PurifiedStatement::PurifiedAlterSource {
1318                alter_source_stmt: AlterSourceStatement {
1319                    source_name: unresolved_source_name,
1320                    action,
1321                    if_exists,
1322                },
1323            });
1324        }
1325        Err(e) => return Err(e),
1326    };
1327
1328    // Ensure it's an ingestion-based and alterable source.
1329    let desc = match item.source_desc()? {
1330        Some(desc) => desc.clone().into_inline_connection(scx.catalog),
1331        None => {
1332            sql_bail!("cannot ALTER this type of source")
1333        }
1334    };
1335
1336    let source_name = item.name();
1337
1338    let resolved_source_name = ResolvedItemName::Item {
1339        id: item.id(),
1340        qualifiers: item.name().qualifiers.clone(),
1341        full_name: scx.catalog.resolve_full_name(source_name),
1342        print_id: true,
1343        version: RelationVersionSelector::Latest,
1344    };
1345
1346    let partial_name = scx.catalog.minimal_qualification(source_name);
1347
1348    match action {
1349        AlterSourceAction::AddSubsources {
1350            external_references,
1351            options,
1352        } => {
1353            if scx.catalog.system_vars().enable_create_table_from_source()
1354                && scx.catalog.system_vars().force_source_table_syntax()
1355            {
1356                Err(PlanError::UseTablesForSources(
1357                    "ALTER SOURCE .. ADD SUBSOURCES ..".to_string(),
1358                ))?;
1359            }
1360
1361            purify_alter_source_add_subsources(
1362                external_references,
1363                options,
1364                desc,
1365                partial_name,
1366                unresolved_source_name,
1367                resolved_source_name,
1368                storage_configuration,
1369            )
1370            .await
1371        }
1372        AlterSourceAction::RefreshReferences => {
1373            purify_alter_source_refresh_references(
1374                desc,
1375                resolved_source_name,
1376                storage_configuration,
1377            )
1378            .await
1379        }
1380        _ => Ok(PurifiedStatement::PurifiedAlterSource {
1381            alter_source_stmt: AlterSourceStatement {
1382                source_name: unresolved_source_name,
1383                action,
1384                if_exists,
1385            },
1386        }),
1387    }
1388}
1389
1390// TODO(database-issues#8620): Remove once subsources are removed
1391/// Equivalent to `purify_create_source` but for `AlterSourceStatement`.
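///
/// Connects to the upstream system named by the source's connection, resolves
/// the requested external references against the tables that actually exist
/// upstream, and rewrites the `TEXT COLUMNS`/`EXCLUDE COLUMNS` options into
/// their normalized forms.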
1392async fn purify_alter_source_add_subsources(
1393    external_references: Vec<ExternalReferenceExport>,
1394    mut options: Vec<AlterSourceAddSubsourceOption<Aug>>,
1395    desc: SourceDesc,
1396    partial_source_name: PartialItemName,
1397    unresolved_source_name: UnresolvedItemName,
1398    resolved_source_name: ResolvedItemName,
1399    storage_configuration: &StorageConfiguration,
1400) -> Result<PurifiedStatement, PlanError> {
1401    // Validate this is a source that can have subsources added.
1402    let connection_id = match &desc.connection {
1403        GenericSourceConnection::Postgres(c) => c.connection_id,
1404        GenericSourceConnection::MySql(c) => c.connection_id,
1405        GenericSourceConnection::SqlServer(c) => c.connection_id,
1406        _ => sql_bail!(
1407            "source {} does not support ALTER SOURCE.",
1408            partial_source_name
1409        ),
1410    };
1411
1412    let crate::plan::statement::ddl::AlterSourceAddSubsourceOptionExtracted {
1413        text_columns,
1414        exclude_columns,
1415        details,
1416        seen: _,
1417    } = options.clone().try_into()?;
1418    assert_none!(details, "details cannot be explicitly set");
1419
1420    let mut requested_subsource_map = BTreeMap::new();
1421
1422    match desc.connection {
1423        GenericSourceConnection::Postgres(pg_source_connection) => {
1424            // Get PostgresConnection for generating subsources.
1425            let pg_connection = &pg_source_connection.connection;
1426
1427            let client = pg_connection
1428                .validate(connection_id, storage_configuration)
1429                .await?;
1430
1431            let reference_client = SourceReferenceClient::Postgres {
1432                client: &client,
1433                publication: &pg_source_connection.publication,
1434                database: &pg_connection.database,
1435            };
1436            let retrieved_source_references = reference_client.get_source_references().await?;
1437
1438            let postgres::PurifiedSourceExports {
1439                source_exports: subsources,
1440                normalized_text_columns,
1441            } = postgres::purify_source_exports(
1442                &client,
1443                &retrieved_source_references,
1444                &Some(ExternalReferences::SubsetTables(external_references)),
1445                text_columns,
1446                exclude_columns,
1447                &unresolved_source_name,
1448                &SourceReferencePolicy::Required,
1449            )
1450            .await?;
1451
1452            if let Some(text_cols_option) = options
1453                .iter_mut()
1454                .find(|option| option.name == AlterSourceAddSubsourceOptionName::TextColumns)
1455            {
1456                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
1457            }
1458
1459            requested_subsource_map.extend(subsources);
1460        }
1461        GenericSourceConnection::MySql(mysql_source_connection) => {
1462            let mysql_connection = &mysql_source_connection.connection;
1463            let config = mysql_connection
1464                .config(
1465                    &storage_configuration.connection_context.secrets_reader,
1466                    storage_configuration,
1467                    InTask::No,
1468                )
1469                .await?;
1470
1471            let mut conn = config
1472                .connect(
1473                    "mysql purification",
1474                    &storage_configuration.connection_context.ssh_tunnel_manager,
1475                )
1476                .await?;
1477
1478            // Retrieve the current @gtid_executed value of the server to mark as the effective
1479            // initial snapshot point for these subsources.
1480            let initial_gtid_set =
1481                mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?;
1482
1483            let requested_references = Some(ExternalReferences::SubsetTables(external_references));
1484
1485            let reference_client = SourceReferenceClient::MySql {
1486                conn: &mut conn,
1487                include_system_schemas: mysql::references_system_schemas(&requested_references),
1488            };
1489            let retrieved_source_references = reference_client.get_source_references().await?;
1490
1491            let mysql::PurifiedSourceExports {
1492                source_exports: subsources,
1493                normalized_text_columns,
1494                normalized_exclude_columns,
1495            } = mysql::purify_source_exports(
1496                &mut conn,
1497                &retrieved_source_references,
1498                &requested_references,
1499                text_columns,
1500                exclude_columns,
1501                &unresolved_source_name,
1502                initial_gtid_set,
1503                &SourceReferencePolicy::Required,
1504            )
1505            .await?;
1506            requested_subsource_map.extend(subsources);
1507
1508            // Update options with the purified details
1509            if let Some(text_cols_option) = options
1510                .iter_mut()
1511                .find(|option| option.name == AlterSourceAddSubsourceOptionName::TextColumns)
1512            {
1513                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
1514            }
1515            if let Some(exclude_cols_option) = options
1516                .iter_mut()
1517                .find(|option| option.name == AlterSourceAddSubsourceOptionName::ExcludeColumns)
1518            {
1519                exclude_cols_option.value =
1520                    Some(WithOptionValue::Sequence(normalized_exclude_columns));
1521            }
1522        }
1523        GenericSourceConnection::SqlServer(sql_server_source) => {
1524            // Open a connection to the upstream SQL Server instance.
1525            let sql_server_connection = &sql_server_source.connection;
1526            let config = sql_server_connection
1527                .resolve_config(
1528                    &storage_configuration.connection_context.secrets_reader,
1529                    storage_configuration,
1530                    InTask::No,
1531                )
1532                .await?;
1533            let mut client = mz_sql_server_util::Client::connect(config).await?;
1534
1535            // Query the upstream SQL Server instance for available tables to replicate.
1536            let database: Arc<str> = sql_server_connection.database.clone().into();
1537            let source_references = SourceReferenceClient::SqlServer {
1538                client: &mut client,
1539                database: Arc::clone(&database),
1540            }
1541            .get_source_references()
1542            .await?;
1543            let requested_references = Some(ExternalReferences::SubsetTables(external_references));
1544
1545            let timeout = mz_storage_types::sources::sql_server::MAX_LSN_WAIT
1546                .get(storage_configuration.config_set());
1547
1548            let result = sql_server::purify_source_exports(
1549                &*database,
1550                &mut client,
1551                &source_references,
1552                &requested_references,
1553                &text_columns,
1554                &exclude_columns,
1555                &unresolved_source_name,
1556                timeout,
1557                &SourceReferencePolicy::Required,
1558            )
1559            .await;
1560            let sql_server::PurifiedSourceExports {
1561                source_exports,
1562                normalized_text_columns,
1563                normalized_excl_columns,
1564            } = result?;
1565
1566            // Add the new exports to our subsource map.
1567            requested_subsource_map.extend(source_exports);
1568
1569            // Update options on the CREATE SOURCE statement with the purified details.
1570            if let Some(text_cols_option) = options
1571                .iter_mut()
1572                .find(|option| option.name == AlterSourceAddSubsourceOptionName::TextColumns)
1573            {
1574                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
1575            }
1576            if let Some(exclude_cols_option) = options
1577                .iter_mut()
1578                .find(|option| option.name == AlterSourceAddSubsourceOptionName::ExcludeColumns)
1579            {
1580                exclude_cols_option.value =
1581                    Some(WithOptionValue::Sequence(normalized_excl_columns));
1582            }
1583        }
1584        _ => unreachable!(),
1585    };
1586
1587    Ok(PurifiedStatement::PurifiedAlterSourceAddSubsources {
1588        source_name: resolved_source_name,
1589        options,
1590        subsources: requested_subsource_map,
1591    })
1592}
1593
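/// Handles `ALTER SOURCE .. REFRESH REFERENCES` by re-fetching the set of
/// available upstream references for the source (reconnecting to the upstream
/// system where one exists; load generator and Kafka references are derived
/// locally).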
1594async fn purify_alter_source_refresh_references(
1595    desc: SourceDesc,
1596    resolved_source_name: ResolvedItemName,
1597    storage_configuration: &StorageConfiguration,
1598) -> Result<PurifiedStatement, PlanError> {
1599    let retrieved_source_references = match desc.connection {
1600        GenericSourceConnection::Postgres(pg_source_connection) => {
1601            // Get PostgresConnection for generating subsources.
1602            let pg_connection = &pg_source_connection.connection;
1603
1604            let config = pg_connection
1605                .config(
1606                    &storage_configuration.connection_context.secrets_reader,
1607                    storage_configuration,
1608                    InTask::No,
1609                )
1610                .await?;
1611
1612            let client = config
1613                .connect(
1614                    "postgres_purification",
1615                    &storage_configuration.connection_context.ssh_tunnel_manager,
1616                )
1617                .await?;
1618            let reference_client = SourceReferenceClient::Postgres {
1619                client: &client,
1620                publication: &pg_source_connection.publication,
1621                database: &pg_connection.database,
1622            };
1623            reference_client.get_source_references().await?
1624        }
1625        GenericSourceConnection::MySql(mysql_source_connection) => {
1626            let mysql_connection = &mysql_source_connection.connection;
1627            let config = mysql_connection
1628                .config(
1629                    &storage_configuration.connection_context.secrets_reader,
1630                    storage_configuration,
1631                    InTask::No,
1632                )
1633                .await?;
1634
1635            let mut conn = config
1636                .connect(
1637                    "mysql purification",
1638                    &storage_configuration.connection_context.ssh_tunnel_manager,
1639                )
1640                .await?;
1641
1642            let reference_client = SourceReferenceClient::MySql {
1643                conn: &mut conn,
1644                include_system_schemas: false,
1645            };
1646            reference_client.get_source_references().await?
1647        }
1648        GenericSourceConnection::SqlServer(sql_server_source) => {
1649            // Open a connection to the upstream SQL Server instance.
1650            let sql_server_connection = &sql_server_source.connection;
1651            let config = sql_server_connection
1652                .resolve_config(
1653                    &storage_configuration.connection_context.secrets_reader,
1654                    storage_configuration,
1655                    InTask::No,
1656                )
1657                .await?;
1658            let mut client = mz_sql_server_util::Client::connect(config).await?;
1659
1660            // Query the upstream SQL Server instance for available tables to replicate.
1661            let source_references = SourceReferenceClient::SqlServer {
1662                client: &mut client,
1663                database: sql_server_connection.database.clone().into(),
1664            }
1665            .get_source_references()
1666            .await?;
1667            source_references
1668        }
1669        GenericSourceConnection::LoadGenerator(load_gen_connection) => {
1670            let reference_client = SourceReferenceClient::LoadGenerator {
1671                generator: &load_gen_connection.load_generator,
1672            };
1673            reference_client.get_source_references().await?
1674        }
1675        GenericSourceConnection::Kafka(kafka_conn) => {
1676            let reference_client = SourceReferenceClient::Kafka {
1677                topic: &kafka_conn.topic,
1678            };
1679            reference_client.get_source_references().await?
1680        }
1681    };
1682    Ok(PurifiedStatement::PurifiedAlterSourceRefreshReferences {
1683        source_name: resolved_source_name,
1684        available_source_references: retrieved_source_references.available_source_references(),
1685    })
1686}
1687
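/// Purifies a `CREATE TABLE .. FROM SOURCE ..` statement: resolves the
/// external reference against the upstream system and fills in the column
/// definitions, constraints, and `DETAILS` option that planning expects.
///
/// For example (illustrative syntax):
///
/// ```sql
/// CREATE TABLE t FROM SOURCE pg_src (REFERENCE public.t1);
/// ```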
1688async fn purify_create_table_from_source(
1689    catalog: impl SessionCatalog,
1690    mut stmt: CreateTableFromSourceStatement<Aug>,
1691    storage_configuration: &StorageConfiguration,
1692) -> Result<PurifiedStatement, PlanError> {
1693    let scx = StatementContext::new(None, &catalog);
1694    let CreateTableFromSourceStatement {
1695        name: _,
1696        columns,
1697        constraints,
1698        source: source_name,
1699        if_not_exists: _,
1700        external_reference,
1701        format,
1702        envelope,
1703        include_metadata: _,
1704        with_options,
1705    } = &mut stmt;
1706
1707    // Columns and constraints cannot be specified by the user but will be populated below.
1708    if matches!(columns, TableFromSourceColumns::Defined(_)) {
1709        sql_bail!("CREATE TABLE .. FROM SOURCE column definitions cannot be specified directly");
1710    }
1711    if !constraints.is_empty() {
1712        sql_bail!(
1713            "CREATE TABLE .. FROM SOURCE constraint definitions cannot be specified directly"
1714        );
1715    }
1716
1717    // Get the source item.
1718    let item = scx.get_item_by_resolved_name(source_name)?;
1722
1723    // Ensure it's an ingestion-based source; only such sources support tables.
1724    let desc = match item.source_desc()? {
1725        Some(desc) => desc.clone().into_inline_connection(scx.catalog),
1726        None => {
1727            sql_bail!("cannot create a table from this type of source")
1728        }
1729    };
1730    let unresolved_source_name: UnresolvedItemName = source_name.full_item_name().clone().into();
1731
1732    let crate::plan::statement::ddl::TableFromSourceOptionExtracted {
1733        text_columns,
1734        exclude_columns,
1735        retain_history: _,
1736        details,
1737        partition_by: _,
1738        seen: _,
1739    } = with_options.clone().try_into()?;
1740    assert_none!(details, "details cannot be explicitly set");
1741
1742    // Our text column values are unqualified (just column names), but the purification methods below
1743    // expect to match fully-qualified names against the full set of upstream tables, so we
1744    // need to qualify them using the external reference first.
1745    let qualified_text_columns = text_columns
1746        .iter()
1747        .map(|col| {
1748            UnresolvedItemName(
1749                external_reference
1750                    .as_ref()
1751                    .map(|er| er.0.iter().chain_one(col).cloned().collect())
1752                    .unwrap_or_else(|| vec![col.clone()]),
1753            )
1754        })
1755        .collect_vec();
1756    let qualified_exclude_columns = exclude_columns
1757        .iter()
1758        .map(|col| {
1759            UnresolvedItemName(
1760                external_reference
1761                    .as_ref()
1762                    .map(|er| er.0.iter().chain_one(col).cloned().collect())
1763                    .unwrap_or_else(|| vec![col.clone()]),
1764            )
1765        })
1766        .collect_vec();
1767
1768    // Overridden below if a source-specific format is required.
1769    let mut format_options = SourceFormatOptions::Default;
1770
1771    let retrieved_source_references: RetrievedSourceReferences;
1772
1773    let requested_references = external_reference.as_ref().map(|ref_name| {
1774        ExternalReferences::SubsetTables(vec![ExternalReferenceExport {
1775            reference: ref_name.clone(),
1776            alias: None,
1777        }])
1778    });
1779
1780    // Run purification work specific to each source type: resolve the external reference to
1781    // a fully qualified name and obtain the appropriate details for the source-export statement.
1782    let purified_export = match desc.connection {
1783        GenericSourceConnection::Postgres(pg_source_connection) => {
1784            // Get PostgresConnection for generating subsources.
1785            let pg_connection = &pg_source_connection.connection;
1786
1787            let client = pg_connection
1788                .validate(pg_source_connection.connection_id, storage_configuration)
1789                .await?;
1790
1791            let reference_client = SourceReferenceClient::Postgres {
1792                client: &client,
1793                publication: &pg_source_connection.publication,
1794                database: &pg_connection.database,
1795            };
1796            retrieved_source_references = reference_client.get_source_references().await?;
1797
1798            let postgres::PurifiedSourceExports {
1799                source_exports,
1800                // TODO(database-issues#8620): Remove once subsources are removed
1801                // This `normalized_text_columns` is not relevant for us and is only returned for
1802                // `CREATE SOURCE` statements that automatically generate subsources
1803                normalized_text_columns: _,
1804            } = postgres::purify_source_exports(
1805                &client,
1806                &retrieved_source_references,
1807                &requested_references,
1808                qualified_text_columns,
1809                qualified_exclude_columns,
1810                &unresolved_source_name,
1811                &SourceReferencePolicy::Required,
1812            )
1813            .await?;
1814            // There should be exactly one source_export returned for this statement
1815            let (_, purified_export) = source_exports.into_element();
1816            purified_export
1817        }
1818        GenericSourceConnection::MySql(mysql_source_connection) => {
1819            let mysql_connection = &mysql_source_connection.connection;
1820            let config = mysql_connection
1821                .config(
1822                    &storage_configuration.connection_context.secrets_reader,
1823                    storage_configuration,
1824                    InTask::No,
1825                )
1826                .await?;
1827
1828            let mut conn = config
1829                .connect(
1830                    "mysql purification",
1831                    &storage_configuration.connection_context.ssh_tunnel_manager,
1832                )
1833                .await?;
1834
1835            // Retrieve the current @gtid_executed value of the server to mark as the effective
1836            // initial snapshot point for this table.
1837            let initial_gtid_set =
1838                mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?;
1839
1840            let reference_client = SourceReferenceClient::MySql {
1841                conn: &mut conn,
1842                include_system_schemas: mysql::references_system_schemas(&requested_references),
1843            };
1844            retrieved_source_references = reference_client.get_source_references().await?;
1845
1846            let mysql::PurifiedSourceExports {
1847                source_exports,
1848                // TODO(database-issues#8620): Remove once subsources are removed
1849                // `normalized_text/exclude_columns` is not relevant for us and is only returned for
1850                // `CREATE SOURCE` statements that automatically generate subsources
1851                normalized_text_columns: _,
1852                normalized_exclude_columns: _,
1853            } = mysql::purify_source_exports(
1854                &mut conn,
1855                &retrieved_source_references,
1856                &requested_references,
1857                qualified_text_columns,
1858                qualified_exclude_columns,
1859                &unresolved_source_name,
1860                initial_gtid_set,
1861                &SourceReferencePolicy::Required,
1862            )
1863            .await?;
1864            // There should be exactly one source_export returned for this statement
1865            let (_, purified_export) = source_exports.into_element();
1866            purified_export
1867        }
1868        GenericSourceConnection::SqlServer(sql_server_source) => {
1869            let connection = sql_server_source.connection;
1870            let config = connection
1871                .resolve_config(
1872                    &storage_configuration.connection_context.secrets_reader,
1873                    storage_configuration,
1874                    InTask::No,
1875                )
1876                .await?;
1877            let mut client = mz_sql_server_util::Client::connect(config).await?;
1878
1879            let database: Arc<str> = connection.database.into();
1880            let reference_client = SourceReferenceClient::SqlServer {
1881                client: &mut client,
1882                database: Arc::clone(&database),
1883            };
1884            retrieved_source_references = reference_client.get_source_references().await?;
1885            tracing::debug!(?retrieved_source_references, "got source references");
1886
1887            let timeout = mz_storage_types::sources::sql_server::MAX_LSN_WAIT
1888                .get(storage_configuration.config_set());
1889
1890            let purified_source_exports = sql_server::purify_source_exports(
1891                &*database,
1892                &mut client,
1893                &retrieved_source_references,
1894                &requested_references,
1895                &qualified_text_columns,
1896                &qualified_exclude_columns,
1897                &unresolved_source_name,
1898                timeout,
1899                &SourceReferencePolicy::Required,
1900            )
1901            .await?;
1902
1903            // There should be exactly one source_export returned for this statement
1904            let (_, purified_export) = purified_source_exports.source_exports.into_element();
1905            purified_export
1906        }
1907        GenericSourceConnection::LoadGenerator(load_gen_connection) => {
1908            let reference_client = SourceReferenceClient::LoadGenerator {
1909                generator: &load_gen_connection.load_generator,
1910            };
1911            retrieved_source_references = reference_client.get_source_references().await?;
1912
1913            let requested_exports = retrieved_source_references
1914                .requested_source_exports(requested_references.as_ref(), &unresolved_source_name)?;
1915            // There should be exactly one source_export returned
1916            let export = requested_exports.into_element();
1917            PurifiedSourceExport {
1918                external_reference: export.external_reference,
1919                details: PurifiedExportDetails::LoadGenerator {
1920                    table: export
1921                        .meta
1922                        .load_generator_desc()
1923                        .expect("is loadgen")
1924                        .clone(),
1925                    output: export
1926                        .meta
1927                        .load_generator_output()
1928                        .expect("is loadgen")
1929                        .clone(),
1930                },
1931            }
1932        }
1933        GenericSourceConnection::Kafka(kafka_conn) => {
1934            let reference_client = SourceReferenceClient::Kafka {
1935                topic: &kafka_conn.topic,
1936            };
1937            retrieved_source_references = reference_client.get_source_references().await?;
1938            let requested_exports = retrieved_source_references
1939                .requested_source_exports(requested_references.as_ref(), &unresolved_source_name)?;
1940            // There should be exactly one source_export returned
1941            let export = requested_exports.into_element();
1942
1943            format_options = SourceFormatOptions::Kafka {
1944                topic: kafka_conn.topic.clone(),
1945            };
1946            PurifiedSourceExport {
1947                external_reference: export.external_reference,
1948                details: PurifiedExportDetails::Kafka {},
1949            }
1950        }
1951    };
1952
1953    purify_source_format(
1954        &catalog,
1955        format,
1956        &format_options,
1957        envelope,
1958        storage_configuration,
1959    )
1960    .await?;
1961
1962    // Update the external reference in the statement to the resolved fully-qualified
1963    // external reference
1964    *external_reference = Some(purified_export.external_reference.clone());
1965
1966    // Update options in the statement using the purified export details
1967    match &purified_export.details {
1968        PurifiedExportDetails::Postgres { .. } => {
1969            let mut unsupported_cols = vec![];
1970            let postgres::PostgresExportStatementValues {
1971                columns: gen_columns,
1972                constraints: gen_constraints,
1973                text_columns: gen_text_columns,
1974                exclude_columns: gen_exclude_columns,
1975                details: gen_details,
1976                external_reference: _,
1977            } = postgres::generate_source_export_statement_values(
1978                &scx,
1979                purified_export,
1980                &mut unsupported_cols,
1981            )?;
1982            if !unsupported_cols.is_empty() {
1983                unsupported_cols.sort();
1984                Err(PgSourcePurificationError::UnrecognizedTypes {
1985                    cols: unsupported_cols,
1986                })?;
1987            }
1988
1989            if let Some(text_cols_option) = with_options
1990                .iter_mut()
1991                .find(|option| option.name == TableFromSourceOptionName::TextColumns)
1992            {
1993                match gen_text_columns {
1994                    Some(gen_text_columns) => {
1995                        text_cols_option.value = Some(WithOptionValue::Sequence(gen_text_columns))
1996                    }
1997                    None => soft_panic_or_log!(
1998                        "text_columns should be Some if text_cols_option is present"
1999                    ),
2000                }
2001            }
2002            if let Some(exclude_cols_option) = with_options
2003                .iter_mut()
2004                .find(|option| option.name == TableFromSourceOptionName::ExcludeColumns)
2005            {
2006                match gen_exclude_columns {
2007                    Some(gen_exclude_columns) => {
2008                        exclude_cols_option.value =
2009                            Some(WithOptionValue::Sequence(gen_exclude_columns))
2010                    }
2011                    None => soft_panic_or_log!(
2012                        "exclude_columns should be Some if exclude_cols_option is present"
2013                    ),
2014                }
2015            }
2016            match columns {
2017                TableFromSourceColumns::Defined(_) => unreachable!(),
2018                TableFromSourceColumns::NotSpecified => {
2019                    *columns = TableFromSourceColumns::Defined(gen_columns);
2020                    *constraints = gen_constraints;
2021                }
2022                TableFromSourceColumns::Named(_) => {
2023                    sql_bail!("columns cannot be named for Postgres sources")
2024                }
2025            }
2026            with_options.push(TableFromSourceOption {
2027                name: TableFromSourceOptionName::Details,
2028                value: Some(WithOptionValue::Value(Value::String(hex::encode(
2029                    gen_details.into_proto().encode_to_vec(),
2030                )))),
2031            })
2032        }
2033        PurifiedExportDetails::MySql { .. } => {
2034            let mysql::MySqlExportStatementValues {
2035                columns: gen_columns,
2036                constraints: gen_constraints,
2037                text_columns: gen_text_columns,
2038                exclude_columns: gen_exclude_columns,
2039                details: gen_details,
2040                external_reference: _,
2041            } = mysql::generate_source_export_statement_values(&scx, purified_export)?;
2042
2043            if let Some(text_cols_option) = with_options
2044                .iter_mut()
2045                .find(|option| option.name == TableFromSourceOptionName::TextColumns)
2046            {
2047                match gen_text_columns {
2048                    Some(gen_text_columns) => {
2049                        text_cols_option.value = Some(WithOptionValue::Sequence(gen_text_columns))
2050                    }
2051                    None => soft_panic_or_log!(
2052                        "text_columns should be Some if text_cols_option is present"
2053                    ),
2054                }
2055            }
2056            if let Some(exclude_cols_option) = with_options
2057                .iter_mut()
2058                .find(|option| option.name == TableFromSourceOptionName::ExcludeColumns)
2059            {
2060                match gen_exclude_columns {
2061                    Some(gen_exclude_columns) => {
2062                        exclude_cols_option.value =
2063                            Some(WithOptionValue::Sequence(gen_exclude_columns))
2064                    }
2065                    None => soft_panic_or_log!(
2066                        "exclude_columns should be Some if exclude_cols_option is present"
2067                    ),
2068                }
2069            }
2070            match columns {
2071                TableFromSourceColumns::Defined(_) => unreachable!(),
2072                TableFromSourceColumns::NotSpecified => {
2073                    *columns = TableFromSourceColumns::Defined(gen_columns);
2074                    *constraints = gen_constraints;
2075                }
2076                TableFromSourceColumns::Named(_) => {
2077                    sql_bail!("columns cannot be named for MySQL sources")
2078                }
2079            }
2080            with_options.push(TableFromSourceOption {
2081                name: TableFromSourceOptionName::Details,
2082                value: Some(WithOptionValue::Value(Value::String(hex::encode(
2083                    gen_details.into_proto().encode_to_vec(),
2084                )))),
2085            })
2086        }
2087        PurifiedExportDetails::SqlServer { .. } => {
2088            let sql_server::SqlServerExportStatementValues {
2089                columns: gen_columns,
2090                constraints: gen_constraints,
2091                text_columns: gen_text_columns,
2092                excl_columns: gen_excl_columns,
2093                details: gen_details,
2094                external_reference: _,
2095            } = sql_server::generate_source_export_statement_values(&scx, purified_export)?;
2096
2097            if let Some(text_cols_option) = with_options
2098                .iter_mut()
2099                .find(|opt| opt.name == TableFromSourceOptionName::TextColumns)
2100            {
2101                match gen_text_columns {
2102                    Some(gen_text_columns) => {
2103                        text_cols_option.value = Some(WithOptionValue::Sequence(gen_text_columns))
2104                    }
2105                    None => soft_panic_or_log!(
2106                        "text_columns should be Some if text_cols_option is present"
2107                    ),
2108                }
2109            }
2110            if let Some(exclude_cols_option) = with_options
2111                .iter_mut()
2112                .find(|opt| opt.name == TableFromSourceOptionName::ExcludeColumns)
2113            {
2114                match gen_excl_columns {
2115                    Some(gen_excl_columns) => {
2116                        exclude_cols_option.value =
2117                            Some(WithOptionValue::Sequence(gen_excl_columns))
2118                    }
2119                    None => soft_panic_or_log!(
2120                        "excl_columns should be Some if exclude_cols_option is present"
2121                    ),
2122                }
2123            }
2124
2125            match columns {
2126                TableFromSourceColumns::NotSpecified => {
2127                    *columns = TableFromSourceColumns::Defined(gen_columns);
2128                    *constraints = gen_constraints;
2129                }
2130                TableFromSourceColumns::Named(_) => {
2131                    sql_bail!("columns cannot be named for SQL Server sources")
2132                }
2133                TableFromSourceColumns::Defined(_) => unreachable!(),
2134            }
2135
2136            with_options.push(TableFromSourceOption {
2137                name: TableFromSourceOptionName::Details,
2138                value: Some(WithOptionValue::Value(Value::String(hex::encode(
2139                    gen_details.into_proto().encode_to_vec(),
2140                )))),
2141            })
2142        }
2143        PurifiedExportDetails::LoadGenerator { .. } => {
2144            let (desc, output) = match purified_export.details {
2145                PurifiedExportDetails::LoadGenerator { table, output } => (table, output),
2146                _ => unreachable!("purified export details must be load generator"),
2147            };
2148            // We only determine the table description for multi-output load generator sources here,
2149            // whereas single-output load generators have their relation description
2150            // determined during statement planning, as envelope and format options may
2151            // affect their schema.
2152            if let Some(desc) = desc {
2153                let (gen_columns, gen_constraints) = scx.relation_desc_into_table_defs(&desc)?;
2154                match columns {
2155                    TableFromSourceColumns::Defined(_) => unreachable!(),
2156                    TableFromSourceColumns::NotSpecified => {
2157                        *columns = TableFromSourceColumns::Defined(gen_columns);
2158                        *constraints = gen_constraints;
2159                    }
2160                    TableFromSourceColumns::Named(_) => {
2161                        sql_bail!("columns cannot be named for multi-output load generator sources")
2162                    }
2163                }
2164            }
2165            let details = SourceExportStatementDetails::LoadGenerator { output };
2166            with_options.push(TableFromSourceOption {
2167                name: TableFromSourceOptionName::Details,
2168                value: Some(WithOptionValue::Value(Value::String(hex::encode(
2169                    details.into_proto().encode_to_vec(),
2170                )))),
2171            })
2172        }
2173        PurifiedExportDetails::Kafka {} => {
2174            // NOTE: Kafka tables have their 'schemas' purified into the statement inside the
2175            // format field, so we don't specify any columns or constraints to be stored
2176            // on the statement here. The RelationDesc will be determined during planning.
2177            let details = SourceExportStatementDetails::Kafka {};
2178            with_options.push(TableFromSourceOption {
2179                name: TableFromSourceOptionName::Details,
2180                value: Some(WithOptionValue::Value(Value::String(hex::encode(
2181                    details.into_proto().encode_to_vec(),
2182                )))),
2183            })
2184        }
2185    };
2186
2187    // TODO: We might as well use the retrieved available references to update the source
2188    // available references table in the catalog, so plumb this through.
2189    // available_source_references: retrieved_source_references.available_source_references(),
2190    Ok(PurifiedStatement::PurifiedCreateTableFromSource { stmt })
2191}
2192
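/// Source-type-specific context for format purification. Kafka sources carry
/// their topic so that Confluent Schema Registry subjects (`<topic>-key`,
/// `<topic>-value`) can be derived from it.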
2193enum SourceFormatOptions {
2194    Default,
2195    Kafka { topic: String },
2196}
2197
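/// Purifies the format specifier of a source or table, fetching any Confluent
/// Schema Registry schemas it needs so that planning can proceed without
/// network access.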
2198async fn purify_source_format(
2199    catalog: &dyn SessionCatalog,
2200    format: &mut Option<FormatSpecifier<Aug>>,
2201    options: &SourceFormatOptions,
2202    envelope: &Option<SourceEnvelope>,
2203    storage_configuration: &StorageConfiguration,
2204) -> Result<(), PlanError> {
2205    if matches!(format, Some(FormatSpecifier::KeyValue { .. }))
2206        && !matches!(options, SourceFormatOptions::Kafka { .. })
2207    {
2208        sql_bail!("Kafka sources are the only source type that can provide KEY/VALUE formats")
2209    }
2210
2211    match format.as_mut() {
2212        None => {}
2213        Some(FormatSpecifier::Bare(format)) => {
2214            purify_source_format_single(catalog, format, options, envelope, storage_configuration)
2215                .await?;
2216        }
2217
2218        Some(FormatSpecifier::KeyValue { key, value: val }) => {
2219            purify_source_format_single(catalog, key, options, envelope, storage_configuration)
2220                .await?;
2221            purify_source_format_single(catalog, val, options, envelope, storage_configuration)
2222                .await?;
2223        }
2224    }
2225    Ok(())
2226}
2227
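/// Purifies a single (non-key/value) format, dispatching on the format type;
/// only Avro and Protobuf schemas sourced from the schema registry require
/// purification.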
2228async fn purify_source_format_single(
2229    catalog: &dyn SessionCatalog,
2230    format: &mut Format<Aug>,
2231    options: &SourceFormatOptions,
2232    envelope: &Option<SourceEnvelope>,
2233    storage_configuration: &StorageConfiguration,
2234) -> Result<(), PlanError> {
2235    match format {
2236        Format::Avro(schema) => match schema {
2237            AvroSchema::Csr { csr_connection } => {
2238                purify_csr_connection_avro(
2239                    catalog,
2240                    options,
2241                    csr_connection,
2242                    envelope,
2243                    storage_configuration,
2244                )
2245                .await?
2246            }
2247            AvroSchema::InlineSchema { .. } => {}
2248        },
2249        Format::Protobuf(schema) => match schema {
2250            ProtobufSchema::Csr { csr_connection } => {
2251                purify_csr_connection_proto(
2252                    catalog,
2253                    options,
2254                    csr_connection,
2255                    envelope,
2256                    storage_configuration,
2257                )
2258                .await?;
2259            }
2260            ProtobufSchema::InlineSchema { .. } => {}
2261        },
2262        Format::Bytes
2263        | Format::Regex(_)
2264        | Format::Json { .. }
2265        | Format::Text
2266        | Format::Csv { .. } => (),
2267    }
2268    Ok(())
2269}
2270
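/// Generates `CREATE SUBSOURCE` statements for the given purified source
/// exports, dispatching on the connection type recorded in the export details.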
2271pub fn generate_subsource_statements(
2272    scx: &StatementContext,
2273    source_name: ResolvedItemName,
2274    subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
2275) -> Result<Vec<CreateSubsourceStatement<Aug>>, PlanError> {
2276    // Get the first subsource to determine the connection type.
2277    if subsources.is_empty() {
2278        return Ok(vec![]);
2279    }
2280    let (_, purified_export) = subsources.iter().next().unwrap();
2281
2282    let statements = match &purified_export.details {
2283        PurifiedExportDetails::Postgres { .. } => {
2284            crate::pure::postgres::generate_create_subsource_statements(
2285                scx,
2286                source_name,
2287                subsources,
2288            )?
2289        }
2290        PurifiedExportDetails::MySql { .. } => {
2291            crate::pure::mysql::generate_create_subsource_statements(scx, source_name, subsources)?
2292        }
2293        PurifiedExportDetails::SqlServer { .. } => {
2294            crate::pure::sql_server::generate_create_subsource_statements(
2295                scx,
2296                source_name,
2297                subsources,
2298            )?
2299        }
2300        PurifiedExportDetails::LoadGenerator { .. } => {
2301            let mut subsource_stmts = Vec::with_capacity(subsources.len());
2302            for (subsource_name, purified_export) in subsources {
2303                let (desc, output) = match purified_export.details {
2304                    PurifiedExportDetails::LoadGenerator { table, output } => (table, output),
2305                    _ => unreachable!("purified export details must be load generator"),
2306                };
2307                let desc =
2308                    desc.expect("subsources cannot be generated for single-output load generators");
2309
2310                let (columns, table_constraints) = scx.relation_desc_into_table_defs(&desc)?;
2311                let details = SourceExportStatementDetails::LoadGenerator { output };
2312                // Create the subsource statement
2313                let subsource = CreateSubsourceStatement {
2314                    name: subsource_name,
2315                    columns,
2316                    of_source: Some(source_name.clone()),
2317                    // Unlike sources that come from an external upstream, we
2318                    // have more leniency to introduce different constraints
2319                    // every time the load generator is run; i.e., we are not
2320                    // as worried about introducing junk data.
2321                    constraints: table_constraints,
2322                    if_not_exists: false,
2323                    with_options: vec![
2324                        CreateSubsourceOption {
2325                            name: CreateSubsourceOptionName::ExternalReference,
2326                            value: Some(WithOptionValue::UnresolvedItemName(
2327                                purified_export.external_reference,
2328                            )),
2329                        },
2330                        CreateSubsourceOption {
2331                            name: CreateSubsourceOptionName::Details,
2332                            value: Some(WithOptionValue::Value(Value::String(hex::encode(
2333                                details.into_proto().encode_to_vec(),
2334                            )))),
2335                        },
2336                    ],
2337                };
2338                subsource_stmts.push(subsource);
2339            }
2340
2341            subsource_stmts
2342        }
2343        PurifiedExportDetails::Kafka { .. } => {
2344            // TODO: as part of database-issues#8322, Kafka sources will begin
2345            // producing data; we'll need to understand the schema
2346            // of the output here.
2347            assert!(
2348                subsources.is_empty(),
2349                "Kafka sources do not produce data-bearing subsources"
2350            );
2351            vec![]
2352        }
2353    };
2354    Ok(statements)
2355}
2356
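/// Fetches and compiles the Protobuf schemas for a Kafka topic from the
/// Confluent Schema Registry, seeding the statement with the result so the
/// schemas need not be fetched again.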
2357async fn purify_csr_connection_proto(
2358    catalog: &dyn SessionCatalog,
2359    options: &SourceFormatOptions,
2360    csr_connection: &mut CsrConnectionProtobuf<Aug>,
2361    envelope: &Option<SourceEnvelope>,
2362    storage_configuration: &StorageConfiguration,
2363) -> Result<(), PlanError> {
2364    let SourceFormatOptions::Kafka { topic } = options else {
2365        sql_bail!("Confluent Schema Registry is only supported with Kafka sources")
2366    };
2367
2368    let CsrConnectionProtobuf {
2369        seed,
2370        connection: CsrConnection {
2371            connection,
2372            options: _,
2373        },
2374    } = csr_connection;
2375    match seed {
2376        None => {
2377            let scx = StatementContext::new(None, &*catalog);
2378
2379            let ccsr_connection = match scx.get_item_by_resolved_name(connection)?.connection()? {
2380                Connection::Csr(connection) => connection.clone().into_inline_connection(catalog),
2381                _ => sql_bail!("{} is not a schema registry connection", connection),
2382            };
2383
2384            let ccsr_client = ccsr_connection
2385                .connect(storage_configuration, InTask::No)
2386                .await
2387                .map_err(|e| CsrPurificationError::ClientError(Arc::new(e)))?;
2388
2389            let value = compile_proto(&format!("{}-value", topic), &ccsr_client).await?;
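            // The key schema is optional for most envelopes, so any error
            // fetching it is swallowed here; ENVELOPE DEBEZIUM asserts its
            // presence just below.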
2390            let key = compile_proto(&format!("{}-key", topic), &ccsr_client)
2391                .await
2392                .ok();
2393
2394            if matches!(envelope, Some(SourceEnvelope::Debezium)) && key.is_none() {
2395                sql_bail!("Key schema is required for ENVELOPE DEBEZIUM");
2396            }
2397
2398            *seed = Some(CsrSeedProtobuf { value, key });
2399        }
2400        Some(_) => (),
2401    }
2402
2403    Ok(())
2404}
2405
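/// Fetches the Avro key and value schemas for a Kafka topic from the
/// Confluent Schema Registry and seeds the statement with them.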
2406async fn purify_csr_connection_avro(
2407    catalog: &dyn SessionCatalog,
2408    options: &SourceFormatOptions,
2409    csr_connection: &mut CsrConnectionAvro<Aug>,
2410    envelope: &Option<SourceEnvelope>,
2411    storage_configuration: &StorageConfiguration,
2412) -> Result<(), PlanError> {
2413    let SourceFormatOptions::Kafka { topic } = options else {
2414        sql_bail!("Confluent Schema Registry is only supported with Kafka sources")
2415    };
2416
2417    let CsrConnectionAvro {
2418        connection: CsrConnection { connection, .. },
2419        seed,
2420        key_strategy,
2421        value_strategy,
2422    } = csr_connection;
2423    if seed.is_none() {
2424        let scx = StatementContext::new(None, &*catalog);
2425        let csr_connection = match scx.get_item_by_resolved_name(connection)?.connection()? {
2426            Connection::Csr(connection) => connection.clone().into_inline_connection(catalog),
2427            _ => sql_bail!("{} is not a schema registry connection", connection),
2428        };
2429        let ccsr_client = csr_connection
2430            .connect(storage_configuration, InTask::No)
2431            .await
2432            .map_err(|e| CsrPurificationError::ClientError(Arc::new(e)))?;
2433
2434        let Schema {
2435            key_schema,
2436            value_schema,
2437        } = get_remote_csr_schema(
2438            &ccsr_client,
2439            key_strategy.clone().unwrap_or_default(),
2440            value_strategy.clone().unwrap_or_default(),
2441            topic,
2442        )
2443        .await?;
2444        if matches!(envelope, Some(SourceEnvelope::Debezium)) && key_schema.is_none() {
2445            sql_bail!("Key schema is required for ENVELOPE DEBEZIUM");
2446        }
2447
2448        *seed = Some(CsrSeedAvro {
2449            key_schema,
2450            value_schema,
2451        })
2452    }
2453
2454    Ok(())
2455}
2456
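/// A key/value schema pair fetched from a Confluent Schema Registry; the key
/// schema is optional.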
2457#[derive(Debug)]
2458pub struct Schema {
2459    pub key_schema: Option<String>,
2460    pub value_schema: String,
2461}
2462
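/// Resolves a schema for `subject` according to `strategy`, returning
/// `Ok(None)` when the registry has no schema for it.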
2463async fn get_schema_with_strategy(
2464    client: &Client,
2465    strategy: ReaderSchemaSelectionStrategy,
2466    subject: &str,
2467) -> Result<Option<String>, PlanError> {
2468    match strategy {
2469        ReaderSchemaSelectionStrategy::Latest => {
2470            match client.get_schema_by_subject(subject).await {
2471                Ok(CcsrSchema { raw, .. }) => Ok(Some(raw)),
2472                Err(GetBySubjectError::SubjectNotFound)
2473                | Err(GetBySubjectError::VersionNotFound(_)) => Ok(None),
2474                Err(e) => Err(PlanError::FetchingCsrSchemaFailed {
2475                    schema_lookup: format!("subject {}", subject.quoted()),
2476                    cause: Arc::new(e),
2477                }),
2478            }
2479        }
2480        ReaderSchemaSelectionStrategy::Inline(raw) => Ok(Some(raw)),
2481        ReaderSchemaSelectionStrategy::ById(id) => match client.get_schema_by_id(id).await {
2482            Ok(CcsrSchema { raw, .. }) => Ok(Some(raw)),
2483            Err(GetByIdError::SchemaNotFound) => Ok(None),
2484            Err(e) => Err(PlanError::FetchingCsrSchemaFailed {
2485                schema_lookup: format!("ID {}", id),
2486                cause: Arc::new(e),
2487            }),
2488        },
2489    }
2490}
2491
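/// Fetches the Avro value schema (required) and key schema (optional) for
/// `topic` from the registry, using the `<topic>-value` and `<topic>-key`
/// subjects.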
2492async fn get_remote_csr_schema(
2493    ccsr_client: &mz_ccsr::Client,
2494    key_strategy: ReaderSchemaSelectionStrategy,
2495    value_strategy: ReaderSchemaSelectionStrategy,
2496    topic: &str,
2497) -> Result<Schema, PlanError> {
2498    let value_schema_name = format!("{}-value", topic);
2499    let value_schema =
2500        get_schema_with_strategy(ccsr_client, value_strategy, &value_schema_name).await?;
2501    let value_schema = value_schema.ok_or_else(|| anyhow!("No value schema found"))?;
2502    let subject = format!("{}-key", topic);
2503    let key_schema = get_schema_with_strategy(ccsr_client, key_strategy, &subject).await?;
2504    Ok(Schema {
2505        key_schema,
2506        value_schema,
2507    })
2508}
2509
2510/// Collect protobuf message descriptor from CSR and compile the descriptor.
2511async fn compile_proto(
2512    subject_name: &str,
2513    ccsr_client: &Client,
2514) -> Result<CsrSeedProtobufSchema, PlanError> {
2515    let (primary_subject, dependency_subjects) = ccsr_client
2516        .get_subject_and_references(subject_name)
2517        .await
2518        .map_err(|e| PlanError::FetchingCsrSchemaFailed {
2519            schema_lookup: format!("subject {}", subject_name.quoted()),
2520            cause: Arc::new(e),
2521        })?;
2522
2523    // Compile .proto files into a file descriptor set.
2524    let mut source_tree = VirtualSourceTree::new();
2525    for subject in iter::once(&primary_subject).chain(dependency_subjects.iter()) {
2526        source_tree.as_mut().add_file(
2527            Path::new(&subject.name),
2528            subject.schema.raw.as_bytes().to_vec(),
2529        );
2530    }
2531    let mut db = SourceTreeDescriptorDatabase::new(source_tree.as_mut());
2532    let fds = db
2533        .as_mut()
2534        .build_file_descriptor_set(&[Path::new(&primary_subject.name)])
2535        .map_err(|cause| PlanError::InvalidProtobufSchema { cause })?;
2536
2537    // Ensure there is exactly one message in the file.
2538    let primary_fd = fds.file(0);
2539    let message_name = match primary_fd.message_type_size() {
2540        1 => String::from_utf8_lossy(primary_fd.message_type(0).name()).into_owned(),
2541        0 => bail_unsupported!(29603, "Protobuf schemas with no messages"),
2542        _ => bail_unsupported!(29603, "Protobuf schemas with multiple messages"),
2543    };
2544
2545    // Encode the file descriptor set into a SQL byte string.
2546    let bytes = &fds
2547        .serialize()
2548        .map_err(|cause| PlanError::InvalidProtobufSchema { cause })?;
2549    let mut schema = String::new();
2550    strconv::format_bytes(&mut schema, bytes);
2551
2552    Ok(CsrSeedProtobufSchema {
2553        schema,
2554        message_name,
2555    })
2556}
2557
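// The well-known name and schema of `mz_now()`, which `REFRESH` option
// purification resolves and calls below.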
2558const MZ_NOW_NAME: &str = "mz_now";
2559const MZ_NOW_SCHEMA: &str = "mz_catalog";
2560
2561/// Purifies a CREATE MATERIALIZED VIEW statement. Additionally, it adjusts `resolved_ids` if
2562/// references to IDs appear or disappear during purification.
2563///
2564/// Note that in contrast with [`purify_statement`], this doesn't need to be async, because
2565/// this function is not making any network calls.
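///
/// For example, `REFRESH AT CREATION` is purified to `REFRESH AT mz_now()`,
/// and any `mz_now()` call in a `REFRESH` option is then replaced with the
/// timestamp chosen for the statement (when `mz_now` is provided).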
pub fn purify_create_materialized_view_options(
    catalog: impl SessionCatalog,
    mz_now: Option<Timestamp>,
    cmvs: &mut CreateMaterializedViewStatement<Aug>,
    resolved_ids: &mut ResolvedIds,
) {
    // 0. Preparations:
    // Prepare an expression that calls `mz_now()`, which we can insert in various later steps.
    let (mz_now_id, mz_now_expr) = {
        let item = catalog
            .resolve_function(&PartialItemName {
                database: None,
                schema: Some(MZ_NOW_SCHEMA.to_string()),
                item: MZ_NOW_NAME.to_string(),
            })
            .expect("we should be able to resolve mz_now");
        (
            item.id(),
            Expr::Function(Function {
                name: ResolvedItemName::Item {
                    id: item.id(),
                    qualifiers: item.name().qualifiers.clone(),
                    full_name: catalog.resolve_full_name(item.name()),
                    print_id: false,
                    version: RelationVersionSelector::Latest,
                },
                args: FunctionArgs::Args {
                    args: Vec::new(),
                    order_by: Vec::new(),
                },
                filter: None,
                over: None,
                distinct: false,
            }),
        )
    };
    // Prepare the `mz_timestamp` type.
    let (mz_timestamp_id, mz_timestamp_type) = {
        let item = catalog.get_system_type("mz_timestamp");
        let full_name = catalog.resolve_full_name(item.name());
        (
            item.id(),
            ResolvedDataType::Named {
                id: item.id(),
                qualifiers: item.name().qualifiers.clone(),
                full_name,
                modifiers: vec![],
                print_id: true,
            },
        )
    };

    let mut introduced_mz_timestamp = false;

    for option in cmvs.with_options.iter_mut() {
        // 1. Purify `REFRESH AT CREATION` to `REFRESH AT mz_now()`.
        if matches!(
            option.value,
            Some(WithOptionValue::Refresh(RefreshOptionValue::AtCreation))
        ) {
            option.value = Some(WithOptionValue::Refresh(RefreshOptionValue::At(
                RefreshAtOptionValue {
                    time: mz_now_expr.clone(),
                },
            )));
        }

        // 2. If `REFRESH EVERY` doesn't have an `ALIGNED TO`, then add `ALIGNED TO mz_now()`.
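        // (For example, `REFRESH EVERY '1 day'` effectively becomes
        // `REFRESH EVERY '1 day' ALIGNED TO mz_now()`, and the `mz_now()` is
        // then substituted by step 3 below.)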
        if let Some(WithOptionValue::Refresh(RefreshOptionValue::Every(
            RefreshEveryOptionValue { aligned_to, .. },
        ))) = &mut option.value
        {
            if aligned_to.is_none() {
                *aligned_to = Some(mz_now_expr.clone());
            }
        }

        // 3. Substitute `mz_now()` with the timestamp chosen for the CREATE MATERIALIZED VIEW
        // statement. (This has to happen after the above steps, which might introduce `mz_now()`.)
        match &mut option.value {
            Some(WithOptionValue::Refresh(RefreshOptionValue::At(RefreshAtOptionValue {
                time,
            }))) => {
                let mut visitor = MzNowPurifierVisitor::new(mz_now, mz_timestamp_type.clone());
                visitor.visit_expr_mut(time);
                introduced_mz_timestamp |= visitor.introduced_mz_timestamp;
            }
            Some(WithOptionValue::Refresh(RefreshOptionValue::Every(
                RefreshEveryOptionValue {
                    interval: _,
                    aligned_to: Some(aligned_to),
                },
            ))) => {
                let mut visitor = MzNowPurifierVisitor::new(mz_now, mz_timestamp_type.clone());
                visitor.visit_expr_mut(aligned_to);
                introduced_mz_timestamp |= visitor.introduced_mz_timestamp;
            }
            _ => {}
        }
    }

    // 4. If the user didn't give any REFRESH option, then default to ON COMMIT.
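    // (In other words, the omitted option is made explicit: the statement is
    // treated as if the user had written `REFRESH ON COMMIT`.)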
    if !cmvs.with_options.iter().any(|o| {
        matches!(
            o,
            MaterializedViewOption {
                value: Some(WithOptionValue::Refresh(..)),
                ..
            }
        )
    }) {
        cmvs.with_options.push(MaterializedViewOption {
            name: MaterializedViewOptionName::Refresh,
            value: Some(WithOptionValue::Refresh(RefreshOptionValue::OnCommit)),
        })
    }

    // 5. Attend to `resolved_ids`: The purification might have
    // - added references to `mz_timestamp`;
    // - removed references to `mz_now`.
    if introduced_mz_timestamp {
        resolved_ids.add_item(mz_timestamp_id);
    }
    // Even though we always remove `mz_now()` from the `with_options`, there might be `mz_now()`
    // remaining in the main query expression of the MV, so let's visit the entire statement to look
    // for `mz_now()` everywhere.
    let mut visitor = ExprContainsTemporalVisitor::new();
    visitor.visit_create_materialized_view_statement(cmvs);
    if !visitor.contains_temporal {
        resolved_ids.remove_item(&mz_now_id);
    }
}

/// Returns true if the [MaterializedViewOption] either already involves `mz_now()` or will involve
/// it after purification (e.g., `REFRESH AT CREATION`, which purifies to `REFRESH AT mz_now()`).
pub fn materialized_view_option_contains_temporal(mvo: &MaterializedViewOption<Aug>) -> bool {
    match &mvo.value {
        Some(WithOptionValue::Refresh(RefreshOptionValue::At(RefreshAtOptionValue { time }))) => {
            let mut visitor = ExprContainsTemporalVisitor::new();
            visitor.visit_expr(time);
            visitor.contains_temporal
        }
        Some(WithOptionValue::Refresh(RefreshOptionValue::Every(RefreshEveryOptionValue {
            interval: _,
            aligned_to: Some(aligned_to),
        }))) => {
            let mut visitor = ExprContainsTemporalVisitor::new();
            visitor.visit_expr(aligned_to);
            visitor.contains_temporal
        }
        Some(WithOptionValue::Refresh(RefreshOptionValue::Every(RefreshEveryOptionValue {
            interval: _,
            aligned_to: None,
        }))) => {
            // For a `REFRESH EVERY` without an `ALIGNED TO`, purification will default the
            // `ALIGNED TO` to `mz_now()`.
            true
        }
        Some(WithOptionValue::Refresh(RefreshOptionValue::AtCreation)) => {
            // `REFRESH AT CREATION` will be purified to `REFRESH AT mz_now()`.
            true
        }
        _ => false,
    }
}

/// Determines whether the AST involves `mz_now()`.
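///
/// A minimal usage sketch (mirroring the call sites in
/// `materialized_view_option_contains_temporal` above):
///
/// ```ignore
/// let mut visitor = ExprContainsTemporalVisitor::new();
/// visitor.visit_expr(&expr);
/// let uses_mz_now = visitor.contains_temporal;
/// ```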
struct ExprContainsTemporalVisitor {
    pub contains_temporal: bool,
}

impl ExprContainsTemporalVisitor {
    pub fn new() -> ExprContainsTemporalVisitor {
        ExprContainsTemporalVisitor {
            contains_temporal: false,
        }
    }
}

impl Visit<'_, Aug> for ExprContainsTemporalVisitor {
    fn visit_function(&mut self, func: &Function<Aug>) {
        self.contains_temporal |= func.name.full_item_name().item == MZ_NOW_NAME;
        visit_function(self, func);
    }
}

/// Replaces `mz_now()` calls with the timestamp chosen for the statement, cast
/// to `mz_timestamp`. Records in `introduced_mz_timestamp` whether any such
/// substitution happened.
struct MzNowPurifierVisitor {
    pub mz_now: Option<Timestamp>,
    pub mz_timestamp_type: ResolvedDataType,
    pub introduced_mz_timestamp: bool,
}

impl MzNowPurifierVisitor {
    pub fn new(
        mz_now: Option<Timestamp>,
        mz_timestamp_type: ResolvedDataType,
    ) -> MzNowPurifierVisitor {
        MzNowPurifierVisitor {
            mz_now,
            mz_timestamp_type,
            introduced_mz_timestamp: false,
        }
    }
}

impl VisitMut<'_, Aug> for MzNowPurifierVisitor {
    fn visit_expr_mut(&mut self, expr: &'_ mut Expr<Aug>) {
        match expr {
            Expr::Function(Function {
                name:
                    ResolvedItemName::Item {
                        full_name: FullItemName { item, .. },
                        ..
                    },
                ..
            }) if item.as_str() == MZ_NOW_NAME => {
                let mz_now = self.mz_now.expect(
                    "we should have chosen a timestamp if the expression contains mz_now()",
                );
                // We substitute `mz_now()` with a number literal plus a cast to
                // `mz_timestamp`, so that the type of the expression is unchanged.
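                // (For example, `mz_now()` might render as something like
                // `1696550400000::mz_timestamp` in the purified statement.)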
                *expr = Expr::Cast {
                    expr: Box::new(Expr::Value(Value::Number(mz_now.to_string()))),
                    data_type: self.mz_timestamp_type.clone(),
                };
                self.introduced_mz_timestamp = true;
            }
            _ => visit_expr_mut(self, expr),
        }
    }
}