mz_sql/pure.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! SQL purification.
//!
//! See the [crate-level documentation](crate) for details.

use std::collections::{BTreeMap, BTreeSet};
use std::fmt;
use std::iter;
use std::path::Path;
use std::sync::Arc;

use anyhow::anyhow;
use itertools::Itertools;
use mz_ccsr::{Client, GetByIdError, GetBySubjectError, Schema as CcsrSchema};
use mz_controller_types::ClusterId;
use mz_kafka_util::client::MzClientContext;
use mz_mysql_util::MySqlTableDesc;
use mz_ore::collections::CollectionExt;
use mz_ore::error::ErrorExt;
use mz_ore::future::InTask;
use mz_ore::iter::IteratorExt;
use mz_ore::str::StrExt;
use mz_ore::{assert_none, soft_panic_or_log};
use mz_postgres_util::desc::PostgresTableDesc;
use mz_proto::RustType;
use mz_repr::{CatalogItemId, RelationDesc, RelationVersionSelector, Timestamp, strconv};
use mz_sql_parser::ast::display::AstDisplay;
use mz_sql_parser::ast::visit::{Visit, visit_function};
use mz_sql_parser::ast::visit_mut::{VisitMut, visit_expr_mut};
use mz_sql_parser::ast::{
    AlterSourceAction, AlterSourceAddSubsourceOptionName, AlterSourceStatement, AvroDocOn,
    ColumnName, CreateMaterializedViewStatement, CreateSinkConnection, CreateSinkOptionName,
    CreateSinkStatement, CreateSourceOptionName, CreateSubsourceOption, CreateSubsourceOptionName,
    CreateTableFromSourceStatement, CsrConfigOption, CsrConfigOptionName, CsrConnection,
    CsrSeedAvro, CsrSeedProtobuf, CsrSeedProtobufSchema, DeferredItemName, DocOnIdentifier,
    DocOnSchema, Expr, Function, FunctionArgs, Ident, KafkaSourceConfigOption,
    KafkaSourceConfigOptionName, LoadGenerator, LoadGeneratorOption, LoadGeneratorOptionName,
    MaterializedViewOption, MaterializedViewOptionName, MySqlConfigOption, MySqlConfigOptionName,
    PgConfigOption, PgConfigOptionName, RawItemName, ReaderSchemaSelectionStrategy,
    RefreshAtOptionValue, RefreshEveryOptionValue, RefreshOptionValue, SourceEnvelope,
    SqlServerConfigOption, SqlServerConfigOptionName, Statement, TableFromSourceColumns,
    TableFromSourceOption, TableFromSourceOptionName, UnresolvedItemName,
};
use mz_sql_server_util::desc::SqlServerTableDesc;
use mz_storage_types::configuration::StorageConfiguration;
use mz_storage_types::connections::Connection;
use mz_storage_types::connections::inline::IntoInlineConnection;
use mz_storage_types::errors::ContextCreationError;
use mz_storage_types::sources::load_generator::LoadGeneratorOutput;
use mz_storage_types::sources::mysql::MySqlSourceDetails;
use mz_storage_types::sources::postgres::PostgresSourcePublicationDetails;
use mz_storage_types::sources::{
    GenericSourceConnection, SourceDesc, SourceExportStatementDetails, SqlServerSourceExtras,
};
use prost::Message;
use protobuf_native::MessageLite;
use protobuf_native::compiler::{SourceTreeDescriptorDatabase, VirtualSourceTree};
use rdkafka::admin::AdminClient;
use references::{RetrievedSourceReferences, SourceReferenceClient};
use uuid::Uuid;

use crate::ast::{
    AlterSourceAddSubsourceOption, AvroSchema, CreateSourceConnection, CreateSourceStatement,
    CreateSubsourceStatement, CsrConnectionAvro, CsrConnectionProtobuf, ExternalReferenceExport,
    ExternalReferences, Format, FormatSpecifier, ProtobufSchema, Value, WithOptionValue,
};
use crate::catalog::{CatalogItemType, SessionCatalog};
use crate::kafka_util::{KafkaSinkConfigOptionExtracted, KafkaSourceConfigOptionExtracted};
use crate::names::{
    Aug, FullItemName, PartialItemName, ResolvedColumnReference, ResolvedDataType, ResolvedIds,
    ResolvedItemName,
};
use crate::plan::error::PlanError;
use crate::plan::statement::ddl::load_generator_ast_to_generator;
use crate::plan::{SourceReferences, StatementContext};
use crate::pure::error::{IcebergSinkPurificationError, SqlServerSourcePurificationError};
use crate::session::vars::ENABLE_SQL_SERVER_SOURCE;
use crate::{kafka_util, normalize};

use self::error::{
    CsrPurificationError, KafkaSinkPurificationError, KafkaSourcePurificationError,
    LoadGeneratorSourcePurificationError, MySqlSourcePurificationError, PgSourcePurificationError,
};

pub(crate) mod error;
mod references;

pub mod mysql;
pub mod postgres;
pub mod sql_server;

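/// A single source export (subsource or table) requested during purification,
/// pairing an upstream reference with its Materialize-side name and
/// connector-specific metadata.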
pub(crate) struct RequestedSourceExport<T> {
    external_reference: UnresolvedItemName,
    name: UnresolvedItemName,
    meta: T,
}

impl<T: fmt::Debug> fmt::Debug for RequestedSourceExport<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("RequestedSourceExport")
            .field("external_reference", &self.external_reference)
            .field("name", &self.name)
            .field("meta", &self.meta)
            .finish()
    }
}

impl<T> RequestedSourceExport<T> {
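    /// Replaces this export's metadata while preserving its external
    /// reference and name.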
    fn change_meta<F>(self, new_meta: F) -> RequestedSourceExport<F> {
        RequestedSourceExport {
            external_reference: self.external_reference,
            name: self.name,
            meta: new_meta,
        }
    }
}

/// Generates a subsource name by prepending the source's schema name, if
/// present.
///
/// For example, if the source is `a.b`, then `a` is prepended to the
/// subsource name so that it is generated in the same schema as the source.
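///
/// # Example
///
/// A minimal sketch of the behavior (identifiers are illustrative, and this
/// assumes `Ident::new` is the fallible identifier constructor):
///
/// ```ignore
/// let source = UnresolvedItemName(vec![Ident::new("a")?, Ident::new("b")?]);
/// let subsource = source_export_name_gen(&source, "t")?;
/// assert_eq!(subsource.to_string(), "a.t");
/// ```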
fn source_export_name_gen(
    source_name: &UnresolvedItemName,
    subsource_name: &str,
) -> Result<UnresolvedItemName, PlanError> {
    let mut partial = normalize::unresolved_item_name(source_name.clone())?;
    partial.item = subsource_name.to_string();
    Ok(UnresolvedItemName::from(partial))
}

// TODO(database-issues#8620): Remove once subsources are removed
/// Validates the requested subsources do not have name conflicts with each other
/// and that the same upstream table is not referenced multiple times.
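///
/// # Example
///
/// A hedged sketch (names are illustrative; `meta` is unit only for the
/// example): two requested exports that target the same subsource name
/// conflict:
///
/// ```ignore
/// let exports = vec![
///     RequestedSourceExport { external_reference: upstream_a, name: dup.clone(), meta: () },
///     RequestedSourceExport { external_reference: upstream_b, name: dup, meta: () },
/// ];
/// assert!(validate_source_export_names(&exports).is_err());
/// ```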
fn validate_source_export_names<T>(
    requested_source_exports: &[RequestedSourceExport<T>],
) -> Result<(), PlanError> {
    // This condition would get caught during the catalog transaction, but produces a
    // vague, non-contextual error. Instead, error here so we can suggest to the user
    // how to fix the problem.
    if let Some(name) = requested_source_exports
        .iter()
        .map(|subsource| &subsource.name)
        .duplicates()
        .next()
        .cloned()
    {
        let mut upstream_references: Vec<_> = requested_source_exports
            .iter()
            .filter_map(|subsource| {
                if subsource.name == name {
                    Some(subsource.external_reference.clone())
                } else {
                    None
                }
            })
            .collect();

        upstream_references.sort();

        Err(PlanError::SubsourceNameConflict {
            name,
            upstream_references,
        })?;
    }

    // We disallow subsource statements from referencing the same upstream table, but we allow
    // `CREATE TABLE .. FROM SOURCE` statements to do so. Since `CREATE TABLE .. FROM SOURCE`
    // purification will only provide 1 `requested_source_export`, we can leave this here without
    // needing to differentiate between the two types of statements.
    // TODO(roshan): Remove this when auto-generated subsources are deprecated.
    if let Some(name) = requested_source_exports
        .iter()
        .map(|export| &export.external_reference)
        .duplicates()
        .next()
        .cloned()
    {
        let mut target_names: Vec<_> = requested_source_exports
            .iter()
            .filter_map(|export| {
                if export.external_reference == name {
                    Some(export.name.clone())
                } else {
                    None
                }
            })
            .collect();

        target_names.sort();

        Err(PlanError::SubsourceDuplicateReference { name, target_names })?;
    }

    Ok(())
}

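/// A statement that has been purified: its dependencies on external state
/// have been resolved, so it can be planned without further network access.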
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PurifiedStatement {
    PurifiedCreateSource {
        /// The progress subsource, if we are offloading progress info to a
        /// separate relation.
        create_progress_subsource_stmt: Option<CreateSubsourceStatement<Aug>>,
        create_source_stmt: CreateSourceStatement<Aug>,
        /// Map of subsource names to external details.
        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
        /// All the available upstream references that can be added as tables
        /// to this primary source.
        available_source_references: SourceReferences,
    },
    PurifiedAlterSource {
        alter_source_stmt: AlterSourceStatement<Aug>,
    },
    PurifiedAlterSourceAddSubsources {
        /// This just saves us an annoying catalog lookup.
        source_name: ResolvedItemName,
        /// Options that we will need the values of to update the source's
        /// definition.
        options: Vec<AlterSourceAddSubsourceOption<Aug>>,
        /// Map of subsource names to external details.
        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
    },
    PurifiedAlterSourceRefreshReferences {
        source_name: ResolvedItemName,
        /// The updated available upstream references for the primary source.
        available_source_references: SourceReferences,
    },
    PurifiedCreateSink(CreateSinkStatement<Aug>),
    PurifiedCreateTableFromSource {
        stmt: CreateTableFromSourceStatement<Aug>,
    },
}

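/// A single source export whose upstream details were resolved during
/// purification.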
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PurifiedSourceExport {
    pub external_reference: UnresolvedItemName,
    pub details: PurifiedExportDetails,
}

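/// Connector-specific details for a purified source export.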
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PurifiedExportDetails {
    MySql {
        table: MySqlTableDesc,
        text_columns: Option<Vec<Ident>>,
        exclude_columns: Option<Vec<Ident>>,
        initial_gtid_set: String,
    },
    Postgres {
        table: PostgresTableDesc,
        text_columns: Option<Vec<Ident>>,
        exclude_columns: Option<Vec<Ident>>,
    },
    SqlServer {
        table: SqlServerTableDesc,
        text_columns: Option<Vec<Ident>>,
        excl_columns: Option<Vec<Ident>>,
        capture_instance: Arc<str>,
        initial_lsn: mz_sql_server_util::cdc::Lsn,
    },
    Kafka {},
    LoadGenerator {
        table: Option<RelationDesc>,
        output: LoadGeneratorOutput,
    },
}

/// Purifies a statement, removing any dependencies on external state.
///
/// See the section on [purification](crate#purification) in the crate
/// documentation for details.
///
/// Note that this doesn't handle CREATE MATERIALIZED VIEW, which is
/// handled by [purify_create_materialized_view_options] instead.
/// This could be made more consistent by a refactoring discussed here:
/// <https://github.com/MaterializeInc/materialize/pull/23870#discussion_r1435922709>
pub async fn purify_statement(
    catalog: impl SessionCatalog,
    now: u64,
    stmt: Statement<Aug>,
    storage_configuration: &StorageConfiguration,
) -> (Result<PurifiedStatement, PlanError>, Option<ClusterId>) {
    match stmt {
        Statement::CreateSource(stmt) => {
            let cluster_id = stmt.in_cluster.as_ref().map(|cluster| cluster.id.clone());
            (
                purify_create_source(catalog, now, stmt, storage_configuration).await,
                cluster_id,
            )
        }
        Statement::AlterSource(stmt) => (
            purify_alter_source(catalog, stmt, storage_configuration).await,
            None,
        ),
        Statement::CreateSink(stmt) => {
            let cluster_id = stmt.in_cluster.as_ref().map(|cluster| cluster.id.clone());
            (
                purify_create_sink(catalog, stmt, storage_configuration).await,
                cluster_id,
            )
        }
        Statement::CreateTableFromSource(stmt) => (
            purify_create_table_from_source(catalog, stmt, storage_configuration).await,
            None,
        ),
        o => unreachable!("{:?} does not need to be purified", o),
    }
}

/// Injects `DOC ON` comments into all Avro formats that are using a schema
/// registry by finding all SQL `COMMENT`s that are attached to the sink's
/// underlying materialized view (as well as any types referenced by that
/// underlying materialized view).
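///
/// For example (hypothetical names): if the sink's underlying view `v` has
/// `COMMENT ON COLUMN v.c IS 'docs'`, this injects the equivalent of a
/// `DOC ON COLUMN v.c = 'docs'` option into the sink's Avro CSR format,
/// unless the user already supplied a `DOC ON` option for that column.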
pub(crate) fn purify_create_sink_avro_doc_on_options(
    catalog: &dyn SessionCatalog,
    from_id: CatalogItemId,
    format: &mut Option<FormatSpecifier<Aug>>,
) -> Result<(), PlanError> {
    // Collect all objects referenced by the sink.
    let from = catalog.get_item(&from_id);
    let object_ids = from
        .references()
        .items()
        .copied()
        .chain_one(from.id())
        .collect::<Vec<_>>();

    // Collect all Avro formats that use a schema registry, as well as a set of
    // all identifiers named in user-provided `DOC ON` options.
    let mut avro_format_options = vec![];
    for_each_format(format, |doc_on_schema, fmt| match fmt {
        Format::Avro(AvroSchema::InlineSchema { .. })
        | Format::Bytes
        | Format::Csv { .. }
        | Format::Json { .. }
        | Format::Protobuf(..)
        | Format::Regex(..)
        | Format::Text => (),
        Format::Avro(AvroSchema::Csr {
            csr_connection: CsrConnectionAvro { connection, .. },
        }) => {
            avro_format_options.push((doc_on_schema, &mut connection.options));
        }
    });

    // For each Avro format in the sink, inject the appropriate `DOC ON` options
    // for each item referenced by the sink.
    for (for_schema, options) in avro_format_options {
        let user_provided_comments = options
            .iter()
            .filter_map(|CsrConfigOption { name, .. }| match name {
                CsrConfigOptionName::AvroDocOn(doc_on) => Some(doc_on.clone()),
                _ => None,
            })
            .collect::<BTreeSet<_>>();
        // Add existing comments if the user has not already provided them.
        for object_id in &object_ids {
            // Always add comments to the latest version of the item.
            let item = catalog
                .get_item(object_id)
                .at_version(RelationVersionSelector::Latest);
            let full_name = catalog.resolve_full_name(item.name());
            let full_resolved_name = ResolvedItemName::Item {
                id: *object_id,
                qualifiers: item.name().qualifiers.clone(),
                full_name: full_name.clone(),
                print_id: !matches!(
                    item.item_type(),
                    CatalogItemType::Func | CatalogItemType::Type
                ),
                version: RelationVersionSelector::Latest,
            };

            if let Some(comments_map) = catalog.get_item_comments(object_id) {
                // Attach comment for the item itself, if the user has not
                // already provided an overriding `DOC ON` option for the item.
                let doc_on_item_key = AvroDocOn {
                    identifier: DocOnIdentifier::Type(full_resolved_name.clone()),
                    for_schema,
                };
                if !user_provided_comments.contains(&doc_on_item_key) {
                    if let Some(root_comment) = comments_map.get(&None) {
                        options.push(CsrConfigOption {
                            name: CsrConfigOptionName::AvroDocOn(doc_on_item_key),
                            value: Some(mz_sql_parser::ast::WithOptionValue::Value(Value::String(
                                root_comment.clone(),
                            ))),
                        });
                    }
                }

                // Attach comment for each column in the item, if the user has
                // not already provided an overriding `DOC ON` option for the
                // column.
                if let Ok(desc) = item.desc(&full_name) {
                    for (pos, column_name) in desc.iter_names().enumerate() {
                        let comment = comments_map.get(&Some(pos + 1));
                        if let Some(comment_str) = comment {
                            let doc_on_column_key = AvroDocOn {
                                identifier: DocOnIdentifier::Column(ColumnName {
                                    relation: full_resolved_name.clone(),
                                    column: ResolvedColumnReference::Column {
                                        name: column_name.to_owned(),
                                        index: pos,
                                    },
                                }),
                                for_schema,
                            };
                            if !user_provided_comments.contains(&doc_on_column_key) {
                                options.push(CsrConfigOption {
                                    name: CsrConfigOptionName::AvroDocOn(doc_on_column_key),
                                    value: Some(mz_sql_parser::ast::WithOptionValue::Value(
                                        Value::String(comment_str.clone()),
                                    )),
                                });
                            }
                        }
                    }
                }
            }
        }
    }

    Ok(())
}
432
433/// Checks that the sink described in the statement can connect to its external
434/// resources.
435async fn purify_create_sink(
436    catalog: impl SessionCatalog,
437    mut create_sink_stmt: CreateSinkStatement<Aug>,
438    storage_configuration: &StorageConfiguration,
439) -> Result<PurifiedStatement, PlanError> {
440    // General purification
441    let CreateSinkStatement {
442        connection,
443        format,
444        with_options,
445        name: _,
446        in_cluster: _,
447        if_not_exists: _,
448        from,
449        envelope: _,
450    } = &mut create_sink_stmt;
451
452    // The list of options that the user is allowed to specify.
453    const USER_ALLOWED_WITH_OPTIONS: &[CreateSinkOptionName] = &[CreateSinkOptionName::Snapshot];
454
455    if let Some(op) = with_options
456        .iter()
457        .find(|op| !USER_ALLOWED_WITH_OPTIONS.contains(&op.name))
458    {
459        sql_bail!(
460            "CREATE SINK...WITH ({}..) is not allowed",
461            op.name.to_ast_string_simple(),
462        )
463    }
464
465    match &connection {
466        CreateSinkConnection::Kafka {
467            connection,
468            options,
469            key: _,
470            headers: _,
471        } => {
472            // We must not leave any state behind in the Kafka broker, so just ensure that
473            // we can connect. This means we don't ensure that we can create the topic and
474            // introduces TOCTOU errors, but creating an inoperable sink is infinitely
475            // preferable to leaking state in users' environments.
476            let scx = StatementContext::new(None, &catalog);
477            let connection = {
478                let item = scx.get_item_by_resolved_name(connection)?;
479                // Get Kafka connection
480                match item.connection()? {
481                    Connection::Kafka(connection) => {
482                        connection.clone().into_inline_connection(scx.catalog)
483                    }
484                    _ => sql_bail!(
485                        "{} is not a kafka connection",
486                        scx.catalog.resolve_full_name(item.name())
487                    ),
488                }
489            };
490
491            let extracted_options: KafkaSinkConfigOptionExtracted = options.clone().try_into()?;
492
493            if extracted_options.legacy_ids == Some(true) {
494                sql_bail!("LEGACY IDs option is not supported");
495            }
496
497            let client: AdminClient<_> = connection
498                .create_with_context(
499                    storage_configuration,
500                    MzClientContext::default(),
501                    &BTreeMap::new(),
502                    InTask::No,
503                )
504                .await
505                .map_err(|e| {
506                    // anyhow doesn't support Clone, so not trivial to move into PlanError
507                    KafkaSinkPurificationError::AdminClientError(Arc::new(e))
508                })?;
509
510            let metadata = client
511                .inner()
512                .fetch_metadata(
513                    None,
514                    storage_configuration
515                        .parameters
516                        .kafka_timeout_config
517                        .fetch_metadata_timeout,
518                )
519                .map_err(|e| {
520                    KafkaSinkPurificationError::AdminClientError(Arc::new(
521                        ContextCreationError::KafkaError(e),
522                    ))
523                })?;
524
525            if metadata.brokers().len() == 0 {
526                Err(KafkaSinkPurificationError::ZeroBrokers)?;
527            }
528        }
529        CreateSinkConnection::Iceberg {
530            connection,
531            aws_connection,
532            ..
533        } => {
534            let scx = StatementContext::new(None, &catalog);
535            let connection = {
536                let item = scx.get_item_by_resolved_name(connection)?;
537                // Get Iceberg connection
538                match item.connection()? {
539                    Connection::IcebergCatalog(connection) => {
540                        connection.clone().into_inline_connection(scx.catalog)
541                    }
542                    _ => sql_bail!(
543                        "{} is not an iceberg connection",
544                        scx.catalog.resolve_full_name(item.name())
545                    ),
546                }
547            };
548
549            let aws_conn_id = aws_connection.item_id();
550
551            let aws_connection = {
552                let item = scx.get_item_by_resolved_name(aws_connection)?;
553                // Get AWS connection
554                match item.connection()? {
555                    Connection::Aws(aws_connection) => aws_connection.clone(),
556                    _ => sql_bail!(
557                        "{} is not an aws connection",
558                        scx.catalog.resolve_full_name(item.name())
559                    ),
560                }
561            };
562
563            let _catalog = connection
564                .connect(storage_configuration, InTask::No)
565                .await
566                .map_err(|e| IcebergSinkPurificationError::CatalogError(Arc::new(e)))?;
567
568            let sdk_config = aws_connection
569                .load_sdk_config(
570                    &storage_configuration.connection_context,
571                    aws_conn_id.clone(),
572                    InTask::No,
573                )
574                .await
575                .map_err(|e| IcebergSinkPurificationError::AwsSdkContextError(Arc::new(e)))?;
576
577            let sts_client = aws_sdk_sts::Client::new(&sdk_config);
578            let _ = sts_client.get_caller_identity().send().await.map_err(|e| {
579                IcebergSinkPurificationError::StsIdentityError(Arc::new(e.into_service_error()))
580            })?;
581        }
582    }
583
584    let mut csr_connection_ids = BTreeSet::new();
585    for_each_format(format, |_, fmt| match fmt {
586        Format::Avro(AvroSchema::InlineSchema { .. })
587        | Format::Bytes
588        | Format::Csv { .. }
589        | Format::Json { .. }
590        | Format::Protobuf(ProtobufSchema::InlineSchema { .. })
591        | Format::Regex(..)
592        | Format::Text => (),
593        Format::Avro(AvroSchema::Csr {
594            csr_connection: CsrConnectionAvro { connection, .. },
595        })
596        | Format::Protobuf(ProtobufSchema::Csr {
597            csr_connection: CsrConnectionProtobuf { connection, .. },
598        }) => {
599            csr_connection_ids.insert(*connection.connection.item_id());
600        }
601    });
602
603    let scx = StatementContext::new(None, &catalog);
604    for csr_connection_id in csr_connection_ids {
605        let connection = {
606            let item = scx.get_item(&csr_connection_id);
            // Get CSR connection
            match item.connection()? {
                Connection::Csr(connection) => connection.clone().into_inline_connection(&catalog),
                _ => Err(CsrPurificationError::NotCsrConnection(
                    scx.catalog.resolve_full_name(item.name()),
                ))?,
            }
        };

        let client = connection
            .connect(storage_configuration, InTask::No)
            .await
            .map_err(|e| CsrPurificationError::ClientError(Arc::new(e)))?;

        client
            .list_subjects()
            .await
            .map_err(|e| CsrPurificationError::ListSubjectsError(Arc::new(e)))?;
    }

    purify_create_sink_avro_doc_on_options(&catalog, *from.item_id(), format)?;

    Ok(PurifiedStatement::PurifiedCreateSink(create_sink_stmt))
}

/// Runs a function on each format within the format specifier.
///
/// The function is provided with a `DocOnSchema` that indicates whether the
/// format is for the key, value, or both.
///
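/// # Example
///
/// A hedged sketch (the `format` value is hypothetical): visiting a
/// key/value specifier invokes the callback twice, once per format.
///
/// ```ignore
/// let mut formats_seen = 0;
/// for_each_format(&mut format, |_schema, _fmt| formats_seen += 1);
/// ```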
// TODO(benesch): rename `DocOnSchema` to the more general `FormatRestriction`.
fn for_each_format<'a, F>(format: &'a mut Option<FormatSpecifier<Aug>>, mut f: F)
where
    F: FnMut(DocOnSchema, &'a mut Format<Aug>),
{
    match format {
        None => (),
        Some(FormatSpecifier::Bare(fmt)) => f(DocOnSchema::All, fmt),
        Some(FormatSpecifier::KeyValue { key, value }) => {
            f(DocOnSchema::KeyOnly, key);
            f(DocOnSchema::ValueOnly, value);
        }
    }
}

/// Defines whether purification should enforce that at least one valid source
/// reference is provided on the statement being purified.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub(crate) enum SourceReferencePolicy {
    /// Don't allow source references to be provided. This is used for
    /// enforcing that `CREATE SOURCE` statements don't create subsources.
    NotAllowed,
    /// Allow empty references, such as when creating a source that
    /// will have tables added afterwards.
    Optional,
    /// Require that at least one reference is resolved to an upstream
    /// object.
    Required,
}

async fn purify_create_source(
    catalog: impl SessionCatalog,
    now: u64,
    mut create_source_stmt: CreateSourceStatement<Aug>,
    storage_configuration: &StorageConfiguration,
) -> Result<PurifiedStatement, PlanError> {
    let CreateSourceStatement {
        name: source_name,
        col_names,
        key_constraint,
        connection: source_connection,
        format,
        envelope,
        include_metadata,
        external_references,
        progress_subsource,
        with_options,
        ..
    } = &mut create_source_stmt;

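    // Any of these clauses indicates the old (pre-source-table) syntax, which
    // requires a progress subsource to be created alongside the source.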
    let uses_old_syntax = !col_names.is_empty()
        || key_constraint.is_some()
        || format.is_some()
        || envelope.is_some()
        || !include_metadata.is_empty()
        || external_references.is_some()
        || progress_subsource.is_some();

    if let Some(DeferredItemName::Named(_)) = progress_subsource {
        sql_bail!("Cannot manually ID qualify progress subsource")
    }

    let mut requested_subsource_map = BTreeMap::new();

    let progress_desc = match &source_connection {
        CreateSourceConnection::Kafka { .. } => {
            &mz_storage_types::sources::kafka::KAFKA_PROGRESS_DESC
        }
        CreateSourceConnection::Postgres { .. } => {
            &mz_storage_types::sources::postgres::PG_PROGRESS_DESC
        }
        CreateSourceConnection::SqlServer { .. } => {
            &mz_storage_types::sources::sql_server::SQL_SERVER_PROGRESS_DESC
        }
        CreateSourceConnection::MySql { .. } => {
            &mz_storage_types::sources::mysql::MYSQL_PROGRESS_DESC
        }
        CreateSourceConnection::LoadGenerator { .. } => {
            &mz_storage_types::sources::load_generator::LOAD_GEN_PROGRESS_DESC
        }
    };
    let scx = StatementContext::new(None, &catalog);

    // Depending on whether the user must (or may) use `CREATE TABLE .. FROM SOURCE`
    // to add tables after this source is created, we may need to require or forbid
    // auto-generated subsources on this source statement.
    let reference_policy = if scx.catalog.system_vars().enable_create_table_from_source()
        && scx.catalog.system_vars().force_source_table_syntax()
    {
        SourceReferencePolicy::NotAllowed
    } else if scx.catalog.system_vars().enable_create_table_from_source() {
        SourceReferencePolicy::Optional
    } else {
        SourceReferencePolicy::Required
    };

    let mut format_options = SourceFormatOptions::Default;

    let retrieved_source_references: RetrievedSourceReferences;

    match source_connection {
        CreateSourceConnection::Kafka {
            connection,
            options: base_with_options,
            ..
        } => {
            if let Some(external_references) = external_references {
                Err(KafkaSourcePurificationError::ReferencedSubsources(
                    external_references.clone(),
                ))?;
            }

            let connection = {
                let item = scx.get_item_by_resolved_name(connection)?;
                // Get Kafka connection
                match item.connection()? {
                    Connection::Kafka(connection) => {
                        connection.clone().into_inline_connection(&catalog)
                    }
                    _ => Err(KafkaSourcePurificationError::NotKafkaConnection(
                        scx.catalog.resolve_full_name(item.name()),
                    ))?,
                }
            };

            let extracted_options: KafkaSourceConfigOptionExtracted =
                base_with_options.clone().try_into()?;

            let topic = extracted_options
                .topic
                .ok_or(KafkaSourcePurificationError::ConnectionMissingTopic)?;

            let consumer = connection
                .create_with_context(
                    storage_configuration,
                    MzClientContext::default(),
                    &BTreeMap::new(),
                    InTask::No,
                )
                .await
                .map_err(|e| {
                    // anyhow doesn't support Clone, so not trivial to move into PlanError
                    KafkaSourcePurificationError::KafkaConsumerError(
                        e.display_with_causes().to_string(),
                    )
                })?;
            let consumer = Arc::new(consumer);

            match (
                extracted_options.start_offset,
                extracted_options.start_timestamp,
            ) {
                (None, None) => {
                    // Validate that the topic at least exists.
                    kafka_util::ensure_topic_exists(
                        Arc::clone(&consumer),
                        &topic,
                        storage_configuration
                            .parameters
                            .kafka_timeout_config
                            .fetch_metadata_timeout,
                    )
                    .await?;
                }
                (Some(_), Some(_)) => {
                    sql_bail!("cannot specify START TIMESTAMP and START OFFSET at same time")
                }
                (Some(start_offsets), None) => {
                    // Validate the start offsets.
                    kafka_util::validate_start_offsets(
                        Arc::clone(&consumer),
                        &topic,
                        start_offsets,
                        storage_configuration
                            .parameters
                            .kafka_timeout_config
                            .fetch_metadata_timeout,
                    )
                    .await?;
                }
                (None, Some(time_offset)) => {
                    // Translate `START TIMESTAMP` to a start offset.
                    let start_offsets = kafka_util::lookup_start_offsets(
                        Arc::clone(&consumer),
                        &topic,
                        time_offset,
                        now,
                        storage_configuration
                            .parameters
                            .kafka_timeout_config
                            .fetch_metadata_timeout,
                    )
                    .await?;

                    base_with_options.retain(|val| {
                        !matches!(val.name, KafkaSourceConfigOptionName::StartTimestamp)
                    });
                    base_with_options.push(KafkaSourceConfigOption {
                        name: KafkaSourceConfigOptionName::StartOffset,
                        value: Some(WithOptionValue::Sequence(
                            start_offsets
                                .iter()
                                .map(|offset| {
                                    WithOptionValue::Value(Value::Number(offset.to_string()))
                                })
                                .collect(),
                        )),
                    });
                }
            }

            let reference_client = SourceReferenceClient::Kafka { topic: &topic };
            retrieved_source_references = reference_client.get_source_references().await?;

            format_options = SourceFormatOptions::Kafka { topic };
        }
        CreateSourceConnection::Postgres {
            connection,
            options,
        } => {
            let connection_item = scx.get_item_by_resolved_name(connection)?;
            let connection = match connection_item.connection().map_err(PlanError::from)? {
                Connection::Postgres(connection) => {
                    connection.clone().into_inline_connection(&catalog)
                }
                _ => Err(PgSourcePurificationError::NotPgConnection(
                    scx.catalog.resolve_full_name(connection_item.name()),
                ))?,
            };
            let crate::plan::statement::PgConfigOptionExtracted {
                publication,
                text_columns,
                exclude_columns,
                details,
                ..
            } = options.clone().try_into()?;
            let publication =
                publication.ok_or(PgSourcePurificationError::ConnectionMissingPublication)?;

            if details.is_some() {
                Err(PgSourcePurificationError::UserSpecifiedDetails)?;
            }

            let client = connection
                .validate(connection_item.id(), storage_configuration)
                .await?;

            let reference_client = SourceReferenceClient::Postgres {
                client: &client,
                publication: &publication,
                database: &connection.database,
            };
            retrieved_source_references = reference_client.get_source_references().await?;

            let postgres::PurifiedSourceExports {
                source_exports: subsources,
                normalized_text_columns,
            } = postgres::purify_source_exports(
                &client,
                &retrieved_source_references,
                external_references,
                text_columns,
                exclude_columns,
                source_name,
                &reference_policy,
            )
            .await?;

            if let Some(text_cols_option) = options
                .iter_mut()
                .find(|option| option.name == PgConfigOptionName::TextColumns)
            {
                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
            }

            requested_subsource_map.extend(subsources);

            // Record the active replication timeline_id to allow detection of a future upstream
            // point-in-time-recovery that will put the source into an error state.
            let timeline_id = mz_postgres_util::get_timeline_id(&client).await?;

            // Remove any old detail references
            options.retain(|PgConfigOption { name, .. }| name != &PgConfigOptionName::Details);
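            // The purified details are proto-encoded and hex-armored so they
            // can round-trip through the SQL text of the statement.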
            let details = PostgresSourcePublicationDetails {
                slot: format!(
                    "materialize_{}",
                    Uuid::new_v4().to_string().replace('-', "")
                ),
                timeline_id: Some(timeline_id),
                database: connection.database,
            };
            options.push(PgConfigOption {
                name: PgConfigOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    details.into_proto().encode_to_vec(),
                )))),
            })
        }
        CreateSourceConnection::SqlServer {
            connection,
            options,
        } => {
            scx.require_feature_flag(&ENABLE_SQL_SERVER_SOURCE)?;

            // Connect to the upstream SQL Server instance so we can validate
            // we're compatible with CDC.
            let connection_item = scx.get_item_by_resolved_name(connection)?;
            let connection = match connection_item.connection()? {
                Connection::SqlServer(connection) => {
                    connection.clone().into_inline_connection(&catalog)
                }
                _ => Err(SqlServerSourcePurificationError::NotSqlServerConnection(
                    scx.catalog.resolve_full_name(connection_item.name()),
                ))?,
            };
            let crate::plan::statement::ddl::SqlServerConfigOptionExtracted {
                details,
                text_columns,
                exclude_columns,
                seen: _,
            } = options.clone().try_into()?;

            if details.is_some() {
                Err(SqlServerSourcePurificationError::UserSpecifiedDetails)?;
            }

            let mut client = connection
                .validate(connection_item.id(), storage_configuration)
                .await?;

            let database: Arc<str> = connection.database.into();
            let reference_client = SourceReferenceClient::SqlServer {
                client: &mut client,
                database: Arc::clone(&database),
            };
            retrieved_source_references = reference_client.get_source_references().await?;
            tracing::debug!(?retrieved_source_references, "got source references");

            let timeout = mz_storage_types::sources::sql_server::MAX_LSN_WAIT
                .get(storage_configuration.config_set());

            let purified_source_exports = sql_server::purify_source_exports(
                &*database,
                &mut client,
                &retrieved_source_references,
                external_references,
                &text_columns,
                &exclude_columns,
                source_name,
                timeout,
                &reference_policy,
            )
            .await?;

            let sql_server::PurifiedSourceExports {
                source_exports,
                normalized_text_columns,
                normalized_excl_columns,
            } = purified_source_exports;

            // Update our set of requested source exports.
            requested_subsource_map.extend(source_exports);

            // Record the most recent restore_history_id, or none if the system
            // has never been restored.
            let restore_history_id =
                mz_sql_server_util::inspect::get_latest_restore_history_id(&mut client).await?;
            let details = SqlServerSourceExtras { restore_history_id };

            options.retain(|SqlServerConfigOption { name, .. }| {
                name != &SqlServerConfigOptionName::Details
            });
            options.push(SqlServerConfigOption {
                name: SqlServerConfigOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    details.into_proto().encode_to_vec(),
                )))),
            });

            // Update our 'TEXT' and 'EXCLUDE' column options with the purified and normalized set.
            if let Some(text_cols_option) = options
                .iter_mut()
                .find(|option| option.name == SqlServerConfigOptionName::TextColumns)
            {
                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
            }
            if let Some(excl_cols_option) = options
                .iter_mut()
                .find(|option| option.name == SqlServerConfigOptionName::ExcludeColumns)
            {
                excl_cols_option.value = Some(WithOptionValue::Sequence(normalized_excl_columns));
            }
        }
        CreateSourceConnection::MySql {
            connection,
            options,
        } => {
            let connection_item = scx.get_item_by_resolved_name(connection)?;
            let connection = match connection_item.connection()? {
                Connection::MySql(connection) => {
                    connection.clone().into_inline_connection(&catalog)
                }
                _ => Err(MySqlSourcePurificationError::NotMySqlConnection(
                    scx.catalog.resolve_full_name(connection_item.name()),
                ))?,
            };
            let crate::plan::statement::ddl::MySqlConfigOptionExtracted {
                details,
                text_columns,
                exclude_columns,
                seen: _,
            } = options.clone().try_into()?;

            if details.is_some() {
                Err(MySqlSourcePurificationError::UserSpecifiedDetails)?;
            }

            let mut conn = connection
                .validate(connection_item.id(), storage_configuration)
                .await
                .map_err(MySqlSourcePurificationError::InvalidConnection)?;

            // Retrieve the server's current @gtid_executed value and record it
            // as the effective initial snapshot point, so that we can maintain
            // consistency if the initial snapshot is split across multiple
            // points in time.
            let initial_gtid_set =
                mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?;

            let reference_client = SourceReferenceClient::MySql {
                conn: &mut conn,
                include_system_schemas: mysql::references_system_schemas(external_references),
            };
            retrieved_source_references = reference_client.get_source_references().await?;

            let mysql::PurifiedSourceExports {
                source_exports: subsources,
                normalized_text_columns,
                normalized_exclude_columns,
            } = mysql::purify_source_exports(
                &mut conn,
                &retrieved_source_references,
                external_references,
                text_columns,
                exclude_columns,
                source_name,
                initial_gtid_set.clone(),
                &reference_policy,
            )
            .await?;
            requested_subsource_map.extend(subsources);

            // This details struct has no fields at the moment, but we keep it
            // around for conformity with Postgres and in case we end up
            // needing it again in the future.
            let details = MySqlSourceDetails {};
            // Update options with the purified details
            options
                .retain(|MySqlConfigOption { name, .. }| name != &MySqlConfigOptionName::Details);
            options.push(MySqlConfigOption {
                name: MySqlConfigOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    details.into_proto().encode_to_vec(),
                )))),
            });

            if let Some(text_cols_option) = options
                .iter_mut()
                .find(|option| option.name == MySqlConfigOptionName::TextColumns)
            {
                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
            }
            if let Some(exclude_cols_option) = options
                .iter_mut()
                .find(|option| option.name == MySqlConfigOptionName::ExcludeColumns)
            {
                exclude_cols_option.value =
                    Some(WithOptionValue::Sequence(normalized_exclude_columns));
            }
        }
        CreateSourceConnection::LoadGenerator { generator, options } => {
            let load_generator =
                load_generator_ast_to_generator(&scx, generator, options, include_metadata)?;

            let reference_client = SourceReferenceClient::LoadGenerator {
                generator: &load_generator,
            };
            retrieved_source_references = reference_client.get_source_references().await?;
            // Determine whether this generator has outputs beyond the default
            // one, which must be created as subsources; the default output of
            // a single-output source does not need one.
            // TODO(database-issues#8620): Remove once subsources are removed
            let multi_output_sources = retrieved_source_references
                .all_references()
                .iter()
                .any(|r| {
                    r.load_generator_output().expect("is loadgen")
                        != &LoadGeneratorOutput::Default
                });

            match external_references {
                Some(requested)
                    if matches!(reference_policy, SourceReferencePolicy::NotAllowed) =>
                {
                    Err(PlanError::UseTablesForSources(requested.to_string()))?
                }
                Some(requested) if !multi_output_sources => match requested {
                    ExternalReferences::SubsetTables(_) => {
                        Err(LoadGeneratorSourcePurificationError::ForTables)?
                    }
                    ExternalReferences::SubsetSchemas(_) => {
                        Err(LoadGeneratorSourcePurificationError::ForSchemas)?
                    }
                    ExternalReferences::All => {
                        Err(LoadGeneratorSourcePurificationError::ForAllTables)?
                    }
                },
                Some(requested) => {
                    let requested_exports = retrieved_source_references
                        .requested_source_exports(Some(requested), source_name)?;
                    for export in requested_exports {
                        requested_subsource_map.insert(
                            export.name,
                            PurifiedSourceExport {
                                external_reference: export.external_reference,
                                details: PurifiedExportDetails::LoadGenerator {
                                    table: export
                                        .meta
                                        .load_generator_desc()
                                        .expect("is loadgen")
                                        .clone(),
                                    output: export
                                        .meta
                                        .load_generator_output()
                                        .expect("is loadgen")
                                        .clone(),
                                },
                            },
                        );
                    }
                }
                None => {
                    if multi_output_sources
                        && matches!(reference_policy, SourceReferencePolicy::Required)
                    {
                        Err(LoadGeneratorSourcePurificationError::MultiOutputRequiresForAllTables)?
                    }
                }
            }

            if let LoadGenerator::Clock = generator {
                if !options
                    .iter()
                    .any(|p| p.name == LoadGeneratorOptionName::AsOf)
                {
                    let now = catalog.now();
                    options.push(LoadGeneratorOption {
                        name: LoadGeneratorOptionName::AsOf,
                        value: Some(WithOptionValue::Value(Value::Number(now.to_string()))),
                    });
                }
            }
        }
    }

    // Now that we know which subsources to create alongside this statement,
    // remove the references so they are not canonicalized as part of the
    // `CREATE SOURCE` statement in the catalog.
    *external_references = None;

    // Generate progress subsource for old syntax
    let create_progress_subsource_stmt = if uses_old_syntax {
        // Take name from input or generate name
        let name = match progress_subsource {
            Some(name) => match name {
                DeferredItemName::Deferred(name) => name.clone(),
                DeferredItemName::Named(_) => unreachable!("already checked for this value"),
            },
            None => {
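                // Derive the progress relation's name by appending "_progress"
                // to the source name, adjusting the candidate until the name
                // is free in the target schema.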
1215                let (item, prefix) = source_name.0.split_last().unwrap();
1216                let item_name =
1217                    Ident::try_generate_name(item.to_string(), "_progress", |candidate| {
1218                        let mut suggested_name = prefix.to_vec();
1219                        suggested_name.push(candidate.clone());
1220
1221                        let partial =
1222                            normalize::unresolved_item_name(UnresolvedItemName(suggested_name))?;
1223                        let qualified = scx.allocate_qualified_name(partial)?;
1224                        let item_exists = scx.catalog.get_item_by_name(&qualified).is_some();
1225                        let type_exists = scx.catalog.get_type_by_name(&qualified).is_some();
1226                        Ok::<_, PlanError>(!item_exists && !type_exists)
1227                    })?;
1228
1229                let mut full_name = prefix.to_vec();
1230                full_name.push(item_name);
1231                let full_name = normalize::unresolved_item_name(UnresolvedItemName(full_name))?;
1232                let qualified_name = scx.allocate_qualified_name(full_name)?;
1233                let full_name = scx.catalog.resolve_full_name(&qualified_name);
1234
1235                UnresolvedItemName::from(full_name.clone())
1236            }
1237        };
1238
1239        let (columns, constraints) = scx.relation_desc_into_table_defs(progress_desc)?;
1240
1241        // Create the subsource statement
1242        let mut progress_with_options: Vec<_> = with_options
1243            .iter()
1244            .filter_map(|opt| match opt.name {
1245                CreateSourceOptionName::TimestampInterval => None,
1246                CreateSourceOptionName::RetainHistory => Some(CreateSubsourceOption {
1247                    name: CreateSubsourceOptionName::RetainHistory,
1248                    value: opt.value.clone(),
1249                }),
1250            })
1251            .collect();
1252        progress_with_options.push(CreateSubsourceOption {
1253            name: CreateSubsourceOptionName::Progress,
1254            value: Some(WithOptionValue::Value(Value::Boolean(true))),
1255        });
1256
1257        Some(CreateSubsourceStatement {
1258            name,
1259            columns,
1260            // Progress subsources do not refer to the source to which they belong.
1261            // Instead the primary source depends on it (the opposite is true of
1262            // ingestion exports, which depend on the primary source).
1263            of_source: None,
1264            constraints,
1265            if_not_exists: false,
1266            with_options: progress_with_options,
1267        })
1268    } else {
1269        None
1270    };
1271
1272    purify_source_format(
1273        &catalog,
1274        format,
1275        &format_options,
1276        envelope,
1277        storage_configuration,
1278    )
1279    .await?;
1280
1281    Ok(PurifiedStatement::PurifiedCreateSource {
1282        create_progress_subsource_stmt,
1283        create_source_stmt,
1284        subsources: requested_subsource_map,
1285        available_source_references: retrieved_source_references.available_source_references(),
1286    })
1287}
1288
1289/// On success, returns the details on new subsources and updated
1290/// 'options' that sequencing expects for handling `ALTER SOURCE` statements.
async fn purify_alter_source(
    catalog: impl SessionCatalog,
    stmt: AlterSourceStatement<Aug>,
    storage_configuration: &StorageConfiguration,
) -> Result<PurifiedStatement, PlanError> {
    let scx = StatementContext::new(None, &catalog);
    let AlterSourceStatement {
        source_name: unresolved_source_name,
        action,
        if_exists,
    } = stmt;

    // Resolve the source name to a catalog item.
    let item = match scx.resolve_item(RawItemName::Name(unresolved_source_name.clone())) {
        Ok(item) => item,
        Err(_) if if_exists => {
            return Ok(PurifiedStatement::PurifiedAlterSource {
                alter_source_stmt: AlterSourceStatement {
                    source_name: unresolved_source_name,
                    action,
                    if_exists,
                },
            });
        }
        Err(e) => return Err(e),
    };

    // Ensure it's an ingestion-based and alterable source.
    let desc = match item.source_desc()? {
        Some(desc) => desc.clone().into_inline_connection(scx.catalog),
        None => {
            sql_bail!("cannot ALTER this type of source")
        }
    };

    let source_name = item.name();

    let resolved_source_name = ResolvedItemName::Item {
        id: item.id(),
        qualifiers: item.name().qualifiers.clone(),
        full_name: scx.catalog.resolve_full_name(source_name),
        print_id: true,
        version: RelationVersionSelector::Latest,
    };

    let partial_name = scx.catalog.minimal_qualification(source_name);

    match action {
        AlterSourceAction::AddSubsources {
            external_references,
            options,
        } => {
            if scx.catalog.system_vars().enable_create_table_from_source()
                && scx.catalog.system_vars().force_source_table_syntax()
            {
                Err(PlanError::UseTablesForSources(
                    "ALTER SOURCE .. ADD SUBSOURCES ..".to_string(),
                ))?;
            }

            purify_alter_source_add_subsources(
                external_references,
                options,
                desc,
                partial_name,
                unresolved_source_name,
                resolved_source_name,
                storage_configuration,
            )
            .await
        }
        AlterSourceAction::RefreshReferences => {
            purify_alter_source_refresh_references(
                desc,
                resolved_source_name,
                storage_configuration,
            )
            .await
        }
        _ => Ok(PurifiedStatement::PurifiedAlterSource {
            alter_source_stmt: AlterSourceStatement {
                source_name: unresolved_source_name,
                action,
                if_exists,
            },
        }),
    }
}

// TODO(database-issues#8620): Remove once subsources are removed
/// Equivalent to `purify_create_source` but for `AlterSourceStatement`.
async fn purify_alter_source_add_subsources(
    external_references: Vec<ExternalReferenceExport>,
    mut options: Vec<AlterSourceAddSubsourceOption<Aug>>,
    desc: SourceDesc,
    partial_source_name: PartialItemName,
    unresolved_source_name: UnresolvedItemName,
    resolved_source_name: ResolvedItemName,
    storage_configuration: &StorageConfiguration,
) -> Result<PurifiedStatement, PlanError> {
    // Validate that this is a source that can have subsources added.
    let connection_id = match &desc.connection {
        GenericSourceConnection::Postgres(c) => c.connection_id,
        GenericSourceConnection::MySql(c) => c.connection_id,
        GenericSourceConnection::SqlServer(c) => c.connection_id,
        _ => sql_bail!(
            "source {} does not support ALTER SOURCE",
            partial_source_name
        ),
    };

    let crate::plan::statement::ddl::AlterSourceAddSubsourceOptionExtracted {
        text_columns,
        exclude_columns,
        details,
        seen: _,
    } = options.clone().try_into()?;
    assert_none!(details, "details cannot be explicitly set");

    let mut requested_subsource_map = BTreeMap::new();

    match desc.connection {
        GenericSourceConnection::Postgres(pg_source_connection) => {
            // Get PostgresConnection for generating subsources.
            let pg_connection = &pg_source_connection.connection;

            let client = pg_connection
                .validate(connection_id, storage_configuration)
                .await?;

            let reference_client = SourceReferenceClient::Postgres {
                client: &client,
                publication: &pg_source_connection.publication,
                database: &pg_connection.database,
            };
            let retrieved_source_references = reference_client.get_source_references().await?;

            let postgres::PurifiedSourceExports {
                source_exports: subsources,
                normalized_text_columns,
            } = postgres::purify_source_exports(
                &client,
                &retrieved_source_references,
                &Some(ExternalReferences::SubsetTables(external_references)),
                text_columns,
                exclude_columns,
                &unresolved_source_name,
                &SourceReferencePolicy::Required,
            )
            .await?;

            if let Some(text_cols_option) = options
                .iter_mut()
                .find(|option| option.name == AlterSourceAddSubsourceOptionName::TextColumns)
            {
                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
            }

            requested_subsource_map.extend(subsources);
        }
        GenericSourceConnection::MySql(mysql_source_connection) => {
            let mysql_connection = &mysql_source_connection.connection;
            let config = mysql_connection
                .config(
                    &storage_configuration.connection_context.secrets_reader,
                    storage_configuration,
                    InTask::No,
                )
                .await?;

            let mut conn = config
                .connect(
                    "mysql purification",
                    &storage_configuration.connection_context.ssh_tunnel_manager,
                )
                .await?;

            // Retrieve the current @gtid_executed value of the server to mark as the effective
            // initial snapshot point for these subsources.
            let initial_gtid_set =
                mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?;

            let requested_references = Some(ExternalReferences::SubsetTables(external_references));

            let reference_client = SourceReferenceClient::MySql {
                conn: &mut conn,
                include_system_schemas: mysql::references_system_schemas(&requested_references),
            };
            let retrieved_source_references = reference_client.get_source_references().await?;

            let mysql::PurifiedSourceExports {
                source_exports: subsources,
                normalized_text_columns,
                normalized_exclude_columns,
            } = mysql::purify_source_exports(
                &mut conn,
                &retrieved_source_references,
                &requested_references,
                text_columns,
                exclude_columns,
                &unresolved_source_name,
                initial_gtid_set,
                &SourceReferencePolicy::Required,
            )
            .await?;
            requested_subsource_map.extend(subsources);

            // Update options with the purified details.
            if let Some(text_cols_option) = options
                .iter_mut()
                .find(|option| option.name == AlterSourceAddSubsourceOptionName::TextColumns)
            {
                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
            }
            if let Some(exclude_cols_option) = options
                .iter_mut()
                .find(|option| option.name == AlterSourceAddSubsourceOptionName::ExcludeColumns)
            {
                exclude_cols_option.value =
                    Some(WithOptionValue::Sequence(normalized_exclude_columns));
            }
        }
        GenericSourceConnection::SqlServer(sql_server_source) => {
            // Open a connection to the upstream SQL Server instance.
            let sql_server_connection = &sql_server_source.connection;
            let config = sql_server_connection
                .resolve_config(
                    &storage_configuration.connection_context.secrets_reader,
                    storage_configuration,
                    InTask::No,
                )
                .await?;
            let mut client = mz_sql_server_util::Client::connect(config).await?;

            // Query the upstream SQL Server instance for available tables to replicate.
            let database = sql_server_connection.database.clone().into();
            let source_references = SourceReferenceClient::SqlServer {
                client: &mut client,
                database: Arc::clone(&database),
            }
            .get_source_references()
            .await?;
            let requested_references = Some(ExternalReferences::SubsetTables(external_references));

            let timeout = mz_storage_types::sources::sql_server::MAX_LSN_WAIT
                .get(storage_configuration.config_set());

            let result = sql_server::purify_source_exports(
                &*database,
                &mut client,
                &source_references,
                &requested_references,
                &text_columns,
                &exclude_columns,
                &unresolved_source_name,
                timeout,
                &SourceReferencePolicy::Required,
            )
            .await;
            let sql_server::PurifiedSourceExports {
                source_exports,
                normalized_text_columns,
                normalized_excl_columns,
            } = result?;

            // Add the new exports to our subsource map.
            requested_subsource_map.extend(source_exports);

            // Update options on the CREATE SOURCE statement with the purified details.
            if let Some(text_cols_option) = options
                .iter_mut()
                .find(|option| option.name == AlterSourceAddSubsourceOptionName::TextColumns)
            {
                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
            }
            if let Some(exclude_cols_option) = options
                .iter_mut()
                .find(|option| option.name == AlterSourceAddSubsourceOptionName::ExcludeColumns)
            {
                exclude_cols_option.value =
                    Some(WithOptionValue::Sequence(normalized_excl_columns));
            }
        }
        _ => unreachable!(),
    };

    Ok(PurifiedStatement::PurifiedAlterSourceAddSubsources {
        source_name: resolved_source_name,
        options,
        subsources: requested_subsource_map,
    })
}

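/// Contacts the upstream system behind a source to re-fetch the references
/// (tables, topics, or load generator outputs) that are currently available,
/// for `AlterSourceAction::RefreshReferences`.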
async fn purify_alter_source_refresh_references(
    desc: SourceDesc,
    resolved_source_name: ResolvedItemName,
    storage_configuration: &StorageConfiguration,
) -> Result<PurifiedStatement, PlanError> {
    let retrieved_source_references = match desc.connection {
        GenericSourceConnection::Postgres(pg_source_connection) => {
            // Get the PostgresConnection for listing available references.
            let pg_connection = &pg_source_connection.connection;

            let config = pg_connection
                .config(
                    &storage_configuration.connection_context.secrets_reader,
                    storage_configuration,
                    InTask::No,
                )
                .await?;

            let client = config
                .connect(
                    "postgres_purification",
                    &storage_configuration.connection_context.ssh_tunnel_manager,
                )
                .await?;
            let reference_client = SourceReferenceClient::Postgres {
                client: &client,
                publication: &pg_source_connection.publication,
                database: &pg_connection.database,
            };
            reference_client.get_source_references().await?
        }
        GenericSourceConnection::MySql(mysql_source_connection) => {
            let mysql_connection = &mysql_source_connection.connection;
            let config = mysql_connection
                .config(
                    &storage_configuration.connection_context.secrets_reader,
                    storage_configuration,
                    InTask::No,
                )
                .await?;

            let mut conn = config
                .connect(
                    "mysql purification",
                    &storage_configuration.connection_context.ssh_tunnel_manager,
                )
                .await?;

            let reference_client = SourceReferenceClient::MySql {
                conn: &mut conn,
                include_system_schemas: false,
            };
            reference_client.get_source_references().await?
        }
        GenericSourceConnection::SqlServer(sql_server_source) => {
            // Open a connection to the upstream SQL Server instance.
            let sql_server_connection = &sql_server_source.connection;
            let config = sql_server_connection
                .resolve_config(
                    &storage_configuration.connection_context.secrets_reader,
                    storage_configuration,
                    InTask::No,
                )
                .await?;
            let mut client = mz_sql_server_util::Client::connect(config).await?;

            // Query the upstream SQL Server instance for available tables to replicate.
            let source_references = SourceReferenceClient::SqlServer {
                client: &mut client,
                database: sql_server_connection.database.clone().into(),
            }
            .get_source_references()
            .await?;
            source_references
        }
        GenericSourceConnection::LoadGenerator(load_gen_connection) => {
            let reference_client = SourceReferenceClient::LoadGenerator {
                generator: &load_gen_connection.load_generator,
            };
            reference_client.get_source_references().await?
        }
        GenericSourceConnection::Kafka(kafka_conn) => {
            let reference_client = SourceReferenceClient::Kafka {
                topic: &kafka_conn.topic,
            };
            reference_client.get_source_references().await?
        }
    };
    Ok(PurifiedStatement::PurifiedAlterSourceRefreshReferences {
        source_name: resolved_source_name,
        available_source_references: retrieved_source_references.available_source_references(),
    })
}

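/// Purifies a `CREATE TABLE .. FROM SOURCE` statement by resolving its external
/// reference against the upstream system and populating the column, constraint,
/// and detail options that planning expects.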
async fn purify_create_table_from_source(
    catalog: impl SessionCatalog,
    mut stmt: CreateTableFromSourceStatement<Aug>,
    storage_configuration: &StorageConfiguration,
) -> Result<PurifiedStatement, PlanError> {
    let scx = StatementContext::new(None, &catalog);
    let CreateTableFromSourceStatement {
        name: _,
        columns,
        constraints,
        source: source_name,
        if_not_exists: _,
        external_reference,
        format,
        envelope,
        include_metadata: _,
        with_options,
    } = &mut stmt;

    // Columns and constraints cannot be specified by the user but will be populated below.
    if matches!(columns, TableFromSourceColumns::Defined(_)) {
        sql_bail!("CREATE TABLE .. FROM SOURCE column definitions cannot be specified directly");
    }
    if !constraints.is_empty() {
        sql_bail!(
            "CREATE TABLE .. FROM SOURCE constraint definitions cannot be specified directly"
        );
    }

    // Get the source item
    let item = match scx.get_item_by_resolved_name(source_name) {
        Ok(item) => item,
        Err(e) => return Err(e),
    };

    // Ensure it's an ingestion-based and alterable source.
    let desc = match item.source_desc()? {
        Some(desc) => desc.clone().into_inline_connection(scx.catalog),
        None => {
            sql_bail!("cannot ALTER this type of source")
        }
    };
    let unresolved_source_name: UnresolvedItemName = source_name.full_item_name().clone().into();

    let crate::plan::statement::ddl::TableFromSourceOptionExtracted {
        text_columns,
        exclude_columns,
        retain_history: _,
        details,
        partition_by: _,
        seen: _,
    } = with_options.clone().try_into()?;
    assert_none!(details, "details cannot be explicitly set");

    // Our text column values are unqualified (just column names), but the purification methods below
    // expect to match the fully-qualified names against the full set of upstream tables, so we
    // need to qualify them using the external reference first.
    let qualified_text_columns = text_columns
        .iter()
        .map(|col| {
            UnresolvedItemName(
                external_reference
                    .as_ref()
                    .map(|er| er.0.iter().chain_one(col).map(|i| i.clone()).collect())
                    .unwrap_or_else(|| vec![col.clone()]),
            )
        })
        .collect_vec();
    let qualified_exclude_columns = exclude_columns
        .iter()
        .map(|col| {
            UnresolvedItemName(
                external_reference
                    .as_ref()
                    .map(|er| er.0.iter().chain_one(col).map(|i| i.clone()).collect())
                    .unwrap_or_else(|| vec![col.clone()]),
            )
        })
        .collect_vec();

    // Should be overridden below if a source-specific format is required.
    let mut format_options = SourceFormatOptions::Default;

    let retrieved_source_references: RetrievedSourceReferences;

    let requested_references = external_reference.as_ref().map(|ref_name| {
        ExternalReferences::SubsetTables(vec![ExternalReferenceExport {
            reference: ref_name.clone(),
            alias: None,
        }])
    });

    // Run purification work specific to each source type: resolve the external reference to
    // a fully qualified name and obtain the appropriate details for the source-export statement
    let purified_export = match desc.connection {
        GenericSourceConnection::Postgres(pg_source_connection) => {
            // Get PostgresConnection for generating subsources.
            let pg_connection = &pg_source_connection.connection;

            let client = pg_connection
                .validate(pg_source_connection.connection_id, storage_configuration)
                .await?;

            let reference_client = SourceReferenceClient::Postgres {
                client: &client,
                publication: &pg_source_connection.publication,
                database: &pg_connection.database,
            };
            retrieved_source_references = reference_client.get_source_references().await?;

            let postgres::PurifiedSourceExports {
                source_exports,
                // TODO(database-issues#8620): Remove once subsources are removed
                // This `normalized_text_columns` is not relevant for us and is only returned for
                // `CREATE SOURCE` statements that automatically generate subsources
                normalized_text_columns: _,
            } = postgres::purify_source_exports(
                &client,
                &retrieved_source_references,
                &requested_references,
                qualified_text_columns,
                qualified_exclude_columns,
                &unresolved_source_name,
                &SourceReferencePolicy::Required,
            )
            .await?;
            // There should be exactly one source_export returned for this statement
            let (_, purified_export) = source_exports.into_element();
            purified_export
        }
        GenericSourceConnection::MySql(mysql_source_connection) => {
            let mysql_connection = &mysql_source_connection.connection;
            let config = mysql_connection
                .config(
                    &storage_configuration.connection_context.secrets_reader,
                    storage_configuration,
                    InTask::No,
                )
                .await?;

            let mut conn = config
                .connect(
                    "mysql purification",
                    &storage_configuration.connection_context.ssh_tunnel_manager,
                )
                .await?;

            // Retrieve the current @gtid_executed value of the server to mark as the effective
            // initial snapshot point for this table.
            let initial_gtid_set =
                mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?;

            let reference_client = SourceReferenceClient::MySql {
                conn: &mut conn,
                include_system_schemas: mysql::references_system_schemas(&requested_references),
            };
            retrieved_source_references = reference_client.get_source_references().await?;

            let mysql::PurifiedSourceExports {
                source_exports,
                // TODO(database-issues#8620): Remove once subsources are removed
                // `normalized_text/exclude_columns` is not relevant for us and is only returned for
                // `CREATE SOURCE` statements that automatically generate subsources
                normalized_text_columns: _,
                normalized_exclude_columns: _,
            } = mysql::purify_source_exports(
                &mut conn,
                &retrieved_source_references,
                &requested_references,
                qualified_text_columns,
                qualified_exclude_columns,
                &unresolved_source_name,
                initial_gtid_set,
                &SourceReferencePolicy::Required,
            )
            .await?;
            // There should be exactly one source_export returned for this statement
            let (_, purified_export) = source_exports.into_element();
            purified_export
        }
        GenericSourceConnection::SqlServer(sql_server_source) => {
            let connection = sql_server_source.connection;
            let config = connection
                .resolve_config(
                    &storage_configuration.connection_context.secrets_reader,
                    storage_configuration,
                    InTask::No,
                )
                .await?;
            let mut client = mz_sql_server_util::Client::connect(config).await?;

            let database: Arc<str> = connection.database.into();
            let reference_client = SourceReferenceClient::SqlServer {
                client: &mut client,
                database: Arc::clone(&database),
            };
            retrieved_source_references = reference_client.get_source_references().await?;
            tracing::debug!(?retrieved_source_references, "got source references");

            let timeout = mz_storage_types::sources::sql_server::MAX_LSN_WAIT
                .get(storage_configuration.config_set());

            let purified_source_exports = sql_server::purify_source_exports(
                &*database,
                &mut client,
                &retrieved_source_references,
                &requested_references,
                &qualified_text_columns,
                &qualified_exclude_columns,
                &unresolved_source_name,
                timeout,
                &SourceReferencePolicy::Required,
            )
            .await?;

            // There should be exactly one source_export returned for this statement
            let (_, purified_export) = purified_source_exports.source_exports.into_element();
            purified_export
        }
        GenericSourceConnection::LoadGenerator(load_gen_connection) => {
            let reference_client = SourceReferenceClient::LoadGenerator {
                generator: &load_gen_connection.load_generator,
            };
            retrieved_source_references = reference_client.get_source_references().await?;

            let requested_exports = retrieved_source_references
                .requested_source_exports(requested_references.as_ref(), &unresolved_source_name)?;
            // There should be exactly one source_export returned
            let export = requested_exports.into_element();
            PurifiedSourceExport {
                external_reference: export.external_reference,
                details: PurifiedExportDetails::LoadGenerator {
                    table: export
                        .meta
                        .load_generator_desc()
                        .expect("is loadgen")
                        .clone(),
                    output: export
                        .meta
                        .load_generator_output()
                        .expect("is loadgen")
                        .clone(),
                },
            }
        }
        GenericSourceConnection::Kafka(kafka_conn) => {
            let reference_client = SourceReferenceClient::Kafka {
                topic: &kafka_conn.topic,
            };
            retrieved_source_references = reference_client.get_source_references().await?;
            let requested_exports = retrieved_source_references
                .requested_source_exports(requested_references.as_ref(), &unresolved_source_name)?;
            // There should be exactly one source_export returned
            let export = requested_exports.into_element();

            format_options = SourceFormatOptions::Kafka {
                topic: kafka_conn.topic.clone(),
            };
            PurifiedSourceExport {
                external_reference: export.external_reference,
                details: PurifiedExportDetails::Kafka {},
            }
        }
    };

    purify_source_format(
        &catalog,
        format,
        &format_options,
        envelope,
        storage_configuration,
    )
    .await?;

    // Update the external reference in the statement to the resolved fully-qualified
    // external reference
    *external_reference = Some(purified_export.external_reference.clone());

    // Update options in the statement using the purified export details
    match &purified_export.details {
        PurifiedExportDetails::Postgres { .. } => {
            let mut unsupported_cols = vec![];
            let postgres::PostgresExportStatementValues {
                columns: gen_columns,
                constraints: gen_constraints,
                text_columns: gen_text_columns,
                exclude_columns: gen_exclude_columns,
                details: gen_details,
                external_reference: _,
            } = postgres::generate_source_export_statement_values(
                &scx,
                purified_export,
                &mut unsupported_cols,
            )?;
            if !unsupported_cols.is_empty() {
                unsupported_cols.sort();
                Err(PgSourcePurificationError::UnrecognizedTypes {
                    cols: unsupported_cols,
                })?;
            }

            if let Some(text_cols_option) = with_options
                .iter_mut()
                .find(|option| option.name == TableFromSourceOptionName::TextColumns)
            {
                match gen_text_columns {
                    Some(gen_text_columns) => {
                        text_cols_option.value = Some(WithOptionValue::Sequence(gen_text_columns))
                    }
                    None => soft_panic_or_log!(
                        "text_columns should be Some if text_cols_option is present"
                    ),
                }
            }
            if let Some(exclude_cols_option) = with_options
                .iter_mut()
                .find(|option| option.name == TableFromSourceOptionName::ExcludeColumns)
            {
                match gen_exclude_columns {
                    Some(gen_exclude_columns) => {
                        exclude_cols_option.value =
                            Some(WithOptionValue::Sequence(gen_exclude_columns))
                    }
                    None => soft_panic_or_log!(
                        "exclude_columns should be Some if exclude_cols_option is present"
                    ),
                }
            }
            match columns {
                TableFromSourceColumns::Defined(_) => unreachable!(),
                TableFromSourceColumns::NotSpecified => {
                    *columns = TableFromSourceColumns::Defined(gen_columns);
                    *constraints = gen_constraints;
                }
                TableFromSourceColumns::Named(_) => {
                    sql_bail!("columns cannot be named for Postgres sources")
                }
            }
            with_options.push(TableFromSourceOption {
                name: TableFromSourceOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    gen_details.into_proto().encode_to_vec(),
                )))),
            })
        }
        PurifiedExportDetails::MySql { .. } => {
            let mysql::MySqlExportStatementValues {
                columns: gen_columns,
                constraints: gen_constraints,
                text_columns: gen_text_columns,
                exclude_columns: gen_exclude_columns,
                details: gen_details,
                external_reference: _,
            } = mysql::generate_source_export_statement_values(&scx, purified_export)?;

            if let Some(text_cols_option) = with_options
                .iter_mut()
                .find(|option| option.name == TableFromSourceOptionName::TextColumns)
            {
                match gen_text_columns {
                    Some(gen_text_columns) => {
                        text_cols_option.value = Some(WithOptionValue::Sequence(gen_text_columns))
                    }
                    None => soft_panic_or_log!(
                        "text_columns should be Some if text_cols_option is present"
                    ),
                }
            }
            if let Some(exclude_cols_option) = with_options
                .iter_mut()
                .find(|option| option.name == TableFromSourceOptionName::ExcludeColumns)
            {
                match gen_exclude_columns {
                    Some(gen_exclude_columns) => {
                        exclude_cols_option.value =
                            Some(WithOptionValue::Sequence(gen_exclude_columns))
                    }
                    None => soft_panic_or_log!(
                        "exclude_columns should be Some if exclude_cols_option is present"
                    ),
                }
            }
            match columns {
                TableFromSourceColumns::Defined(_) => unreachable!(),
                TableFromSourceColumns::NotSpecified => {
                    *columns = TableFromSourceColumns::Defined(gen_columns);
                    *constraints = gen_constraints;
                }
                TableFromSourceColumns::Named(_) => {
                    sql_bail!("columns cannot be named for MySQL sources")
                }
            }
            with_options.push(TableFromSourceOption {
                name: TableFromSourceOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    gen_details.into_proto().encode_to_vec(),
                )))),
            })
        }
        PurifiedExportDetails::SqlServer { .. } => {
            let sql_server::SqlServerExportStatementValues {
                columns: gen_columns,
                constraints: gen_constraints,
                text_columns: gen_text_columns,
                excl_columns: gen_excl_columns,
                details: gen_details,
                external_reference: _,
            } = sql_server::generate_source_export_statement_values(&scx, purified_export)?;

            if let Some(text_cols_option) = with_options
                .iter_mut()
                .find(|opt| opt.name == TableFromSourceOptionName::TextColumns)
            {
                match gen_text_columns {
                    Some(gen_text_columns) => {
                        text_cols_option.value = Some(WithOptionValue::Sequence(gen_text_columns))
                    }
                    None => soft_panic_or_log!(
                        "text_columns should be Some if text_cols_option is present"
                    ),
                }
            }
            if let Some(exclude_cols_option) = with_options
                .iter_mut()
                .find(|opt| opt.name == TableFromSourceOptionName::ExcludeColumns)
            {
                match gen_excl_columns {
                    Some(gen_excl_columns) => {
                        exclude_cols_option.value =
                            Some(WithOptionValue::Sequence(gen_excl_columns))
                    }
                    None => soft_panic_or_log!(
                        "excl_columns should be Some if exclude_cols_option is present"
                    ),
                }
            }

            match columns {
                TableFromSourceColumns::NotSpecified => {
                    *columns = TableFromSourceColumns::Defined(gen_columns);
                    *constraints = gen_constraints;
                }
                TableFromSourceColumns::Named(_) => {
                    sql_bail!("columns cannot be named for SQL Server sources")
                }
                TableFromSourceColumns::Defined(_) => unreachable!(),
            }

            with_options.push(TableFromSourceOption {
                name: TableFromSourceOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    gen_details.into_proto().encode_to_vec(),
                )))),
            })
        }
        PurifiedExportDetails::LoadGenerator { .. } => {
            let (desc, output) = match purified_export.details {
                PurifiedExportDetails::LoadGenerator { table, output } => (table, output),
                _ => unreachable!("purified export details must be load generator"),
            };
            // We only determine the table description for multi-output load generator sources here,
            // whereas single-output load generators will have their relation description
            // determined during statement planning, as envelope and format options may affect their
            // schema.
            if let Some(desc) = desc {
                let (gen_columns, gen_constraints) = scx.relation_desc_into_table_defs(&desc)?;
                match columns {
                    TableFromSourceColumns::Defined(_) => unreachable!(),
                    TableFromSourceColumns::NotSpecified => {
                        *columns = TableFromSourceColumns::Defined(gen_columns);
                        *constraints = gen_constraints;
                    }
                    TableFromSourceColumns::Named(_) => {
                        sql_bail!("columns cannot be named for multi-output load generator sources")
                    }
                }
            }
            let details = SourceExportStatementDetails::LoadGenerator { output };
            with_options.push(TableFromSourceOption {
                name: TableFromSourceOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    details.into_proto().encode_to_vec(),
                )))),
            })
        }
        PurifiedExportDetails::Kafka {} => {
            // NOTE: Kafka tables have their 'schemas' purified into the statement inside the
            // format field, so we don't specify any columns or constraints to be stored
            // on the statement here. The RelationDesc will be determined during planning.
            let details = SourceExportStatementDetails::Kafka {};
            with_options.push(TableFromSourceOption {
                name: TableFromSourceOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    details.into_proto().encode_to_vec(),
                )))),
            })
        }
    };

    // TODO: We might as well use the retrieved available references to update the source
    // available references table in the catalog, so plumb this through.
    // available_source_references: retrieved_source_references.available_source_references(),
    Ok(PurifiedStatement::PurifiedCreateTableFromSource { stmt })
}

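/// Source-specific context needed when purifying a format specifier. Only
/// Kafka sources carry extra information: the topic, from which the Confluent
/// Schema Registry subject names are derived.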
enum SourceFormatOptions {
    Default,
    Kafka { topic: String },
}

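/// Purifies the format specifier of a statement, fetching any remote schemas
/// it references. `KEY`/`VALUE` format specifiers are only supported for Kafka
/// sources.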
async fn purify_source_format(
    catalog: &dyn SessionCatalog,
    format: &mut Option<FormatSpecifier<Aug>>,
    options: &SourceFormatOptions,
    envelope: &Option<SourceEnvelope>,
    storage_configuration: &StorageConfiguration,
) -> Result<(), PlanError> {
    if matches!(format, Some(FormatSpecifier::KeyValue { .. }))
        && !matches!(options, SourceFormatOptions::Kafka { .. })
    {
        sql_bail!("Kafka sources are the only source type that can provide KEY/VALUE formats")
    }

    match format.as_mut() {
        None => {}
        Some(FormatSpecifier::Bare(format)) => {
            purify_source_format_single(catalog, format, options, envelope, storage_configuration)
                .await?;
        }

        Some(FormatSpecifier::KeyValue { key, value: val }) => {
            purify_source_format_single(catalog, key, options, envelope, storage_configuration)
                .await?;
            purify_source_format_single(catalog, val, options, envelope, storage_configuration)
                .await?;
        }
    }
    Ok(())
}

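/// Purifies a single (bare, key, or value) format, resolving Confluent Schema
/// Registry references into inline schema seeds for Avro and Protobuf formats.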
async fn purify_source_format_single(
    catalog: &dyn SessionCatalog,
    format: &mut Format<Aug>,
    options: &SourceFormatOptions,
    envelope: &Option<SourceEnvelope>,
    storage_configuration: &StorageConfiguration,
) -> Result<(), PlanError> {
    match format {
        Format::Avro(schema) => match schema {
            AvroSchema::Csr { csr_connection } => {
                purify_csr_connection_avro(
                    catalog,
                    options,
                    csr_connection,
                    envelope,
                    storage_configuration,
                )
                .await?
            }
            AvroSchema::InlineSchema { .. } => {}
        },
        Format::Protobuf(schema) => match schema {
            ProtobufSchema::Csr { csr_connection } => {
                purify_csr_connection_proto(
                    catalog,
                    options,
                    csr_connection,
                    envelope,
                    storage_configuration,
                )
                .await?;
            }
            ProtobufSchema::InlineSchema { .. } => {}
        },
        Format::Bytes
        | Format::Regex(_)
        | Format::Json { .. }
        | Format::Text
        | Format::Csv { .. } => (),
    }
    Ok(())
}

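/// Generates the `CREATE SUBSOURCE` statements for a set of purified source
/// exports, dispatching to the source-specific generator implied by the
/// export details.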
pub fn generate_subsource_statements(
    scx: &StatementContext,
    source_name: ResolvedItemName,
    subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
) -> Result<Vec<CreateSubsourceStatement<Aug>>, PlanError> {
    // Get the first subsource to determine the connection type.
    if subsources.is_empty() {
        return Ok(vec![]);
    }
    let (_, purified_export) = subsources.iter().next().unwrap();

    let statements = match &purified_export.details {
        PurifiedExportDetails::Postgres { .. } => {
            crate::pure::postgres::generate_create_subsource_statements(
                scx,
                source_name,
                subsources,
            )?
        }
        PurifiedExportDetails::MySql { .. } => {
            crate::pure::mysql::generate_create_subsource_statements(scx, source_name, subsources)?
        }
        PurifiedExportDetails::SqlServer { .. } => {
            crate::pure::sql_server::generate_create_subsource_statements(
                scx,
                source_name,
                subsources,
            )?
        }
        PurifiedExportDetails::LoadGenerator { .. } => {
            let mut subsource_stmts = Vec::with_capacity(subsources.len());
            for (subsource_name, purified_export) in subsources {
                let (desc, output) = match purified_export.details {
                    PurifiedExportDetails::LoadGenerator { table, output } => (table, output),
                    _ => unreachable!("purified export details must be load generator"),
                };
                let desc =
                    desc.expect("subsources cannot be generated for single-output load generators");

                let (columns, table_constraints) = scx.relation_desc_into_table_defs(&desc)?;
                let details = SourceExportStatementDetails::LoadGenerator { output };
                // Create the subsource statement.
                let subsource = CreateSubsourceStatement {
                    name: subsource_name,
                    columns,
                    of_source: Some(source_name.clone()),
                    // Unlike sources that come from an external upstream, we
                    // have more leniency to introduce different constraints
                    // every time the load generator is run; i.e. we are not as
                    // worried about introducing junk data.
                    constraints: table_constraints,
                    if_not_exists: false,
                    with_options: vec![
                        CreateSubsourceOption {
                            name: CreateSubsourceOptionName::ExternalReference,
                            value: Some(WithOptionValue::UnresolvedItemName(
                                purified_export.external_reference,
                            )),
                        },
                        CreateSubsourceOption {
                            name: CreateSubsourceOptionName::Details,
                            value: Some(WithOptionValue::Value(Value::String(hex::encode(
                                details.into_proto().encode_to_vec(),
                            )))),
                        },
                    ],
                };
                subsource_stmts.push(subsource);
            }

            subsource_stmts
        }
        PurifiedExportDetails::Kafka { .. } => {
            // TODO: as part of database-issues#8322, Kafka sources will begin
            // producing data, so we'll need to understand the schema
            // of the output here.
            assert!(
                subsources.is_empty(),
                "Kafka sources do not produce data-bearing subsources"
            );
            vec![]
        }
    };
    Ok(statements)
}

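/// Purifies a Protobuf Confluent Schema Registry connection: unless a seed is
/// already present, fetches and compiles the value schema (and the key schema,
/// if one exists) for the Kafka topic into a seed on the statement.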
async fn purify_csr_connection_proto(
    catalog: &dyn SessionCatalog,
    options: &SourceFormatOptions,
    csr_connection: &mut CsrConnectionProtobuf<Aug>,
    envelope: &Option<SourceEnvelope>,
    storage_configuration: &StorageConfiguration,
) -> Result<(), PlanError> {
    let SourceFormatOptions::Kafka { topic } = options else {
        sql_bail!("Confluent Schema Registry is only supported with Kafka sources")
    };

    let CsrConnectionProtobuf {
        seed,
        connection: CsrConnection {
            connection,
            options: _,
        },
    } = csr_connection;
    match seed {
        None => {
            let scx = StatementContext::new(None, &*catalog);

            let ccsr_connection = match scx.get_item_by_resolved_name(connection)?.connection()? {
                Connection::Csr(connection) => connection.clone().into_inline_connection(catalog),
                _ => sql_bail!("{} is not a schema registry connection", connection),
            };

            let ccsr_client = ccsr_connection
                .connect(storage_configuration, InTask::No)
                .await
                .map_err(|e| CsrPurificationError::ClientError(Arc::new(e)))?;

            let value = compile_proto(&format!("{}-value", topic), &ccsr_client).await?;
            let key = compile_proto(&format!("{}-key", topic), &ccsr_client)
                .await
                .ok();

            if matches!(envelope, Some(SourceEnvelope::Debezium)) && key.is_none() {
                sql_bail!("Key schema is required for ENVELOPE DEBEZIUM");
            }

            *seed = Some(CsrSeedProtobuf { value, key });
        }
        Some(_) => (),
    }

    Ok(())
}

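/// Purifies an Avro Confluent Schema Registry connection: unless a seed is
/// already present, fetches the key and value schemas for the Kafka topic into
/// a seed on the statement.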
async fn purify_csr_connection_avro(
    catalog: &dyn SessionCatalog,
    options: &SourceFormatOptions,
    csr_connection: &mut CsrConnectionAvro<Aug>,
    envelope: &Option<SourceEnvelope>,
    storage_configuration: &StorageConfiguration,
) -> Result<(), PlanError> {
    let SourceFormatOptions::Kafka { topic } = options else {
        sql_bail!("Confluent Schema Registry is only supported with Kafka sources")
    };

    let CsrConnectionAvro {
        connection: CsrConnection { connection, .. },
        seed,
        key_strategy,
        value_strategy,
    } = csr_connection;
    if seed.is_none() {
        let scx = StatementContext::new(None, &*catalog);
        let csr_connection = match scx.get_item_by_resolved_name(connection)?.connection()? {
            Connection::Csr(connection) => connection.clone().into_inline_connection(catalog),
            _ => sql_bail!("{} is not a schema registry connection", connection),
        };
        let ccsr_client = csr_connection
            .connect(storage_configuration, InTask::No)
            .await
            .map_err(|e| CsrPurificationError::ClientError(Arc::new(e)))?;

        let Schema {
            key_schema,
            value_schema,
        } = get_remote_csr_schema(
            &ccsr_client,
            key_strategy.clone().unwrap_or_default(),
            value_strategy.clone().unwrap_or_default(),
            topic,
        )
        .await?;
        if matches!(envelope, Some(SourceEnvelope::Debezium)) && key_schema.is_none() {
            sql_bail!("Key schema is required for ENVELOPE DEBEZIUM");
        }

        *seed = Some(CsrSeedAvro {
            key_schema,
            value_schema,
        })
    }

    Ok(())
}

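/// The key and value schemas fetched from a Confluent Schema Registry for a
/// Kafka topic.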
#[derive(Debug)]
pub struct Schema {
    pub key_schema: Option<String>,
    pub value_schema: String,
}

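/// Fetches a schema for the given subject according to the reader schema
/// selection strategy, returning `None` if the requested subject, version, or
/// schema ID does not exist.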
async fn get_schema_with_strategy(
    client: &Client,
    strategy: ReaderSchemaSelectionStrategy,
    subject: &str,
) -> Result<Option<String>, PlanError> {
    match strategy {
        ReaderSchemaSelectionStrategy::Latest => {
            match client.get_schema_by_subject(subject).await {
                Ok(CcsrSchema { raw, .. }) => Ok(Some(raw)),
                Err(GetBySubjectError::SubjectNotFound)
                | Err(GetBySubjectError::VersionNotFound(_)) => Ok(None),
                Err(e) => Err(PlanError::FetchingCsrSchemaFailed {
                    schema_lookup: format!("subject {}", subject.quoted()),
                    cause: Arc::new(e),
                }),
            }
        }
        ReaderSchemaSelectionStrategy::Inline(raw) => Ok(Some(raw)),
        ReaderSchemaSelectionStrategy::ById(id) => match client.get_schema_by_id(id).await {
            Ok(CcsrSchema { raw, .. }) => Ok(Some(raw)),
            Err(GetByIdError::SchemaNotFound) => Ok(None),
            Err(e) => Err(PlanError::FetchingCsrSchemaFailed {
                schema_lookup: format!("ID {}", id),
                cause: Arc::new(e),
            }),
        },
    }
}

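/// Fetches the value schema (required) and key schema (optional) for a topic
/// from the Confluent Schema Registry, using the `{topic}-value` and
/// `{topic}-key` subjects.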
async fn get_remote_csr_schema(
    ccsr_client: &mz_ccsr::Client,
    key_strategy: ReaderSchemaSelectionStrategy,
    value_strategy: ReaderSchemaSelectionStrategy,
    topic: &str,
) -> Result<Schema, PlanError> {
    let value_schema_name = format!("{}-value", topic);
    let value_schema =
        get_schema_with_strategy(ccsr_client, value_strategy, &value_schema_name).await?;
    let value_schema = value_schema.ok_or_else(|| anyhow!("No value schema found"))?;
    let subject = format!("{}-key", topic);
    let key_schema = get_schema_with_strategy(ccsr_client, key_strategy, &subject).await?;
    Ok(Schema {
        key_schema,
        value_schema,
    })
}

/// Fetches a Protobuf schema and its references from the CSR and compiles them
/// into a seed for the statement.
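///
/// The subject is expected to follow the Schema Registry's topic name strategy
/// (e.g. `{topic}-value` or `{topic}-key`), as constructed by the callers
/// above.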
async fn compile_proto(
    subject_name: &str,
    ccsr_client: &Client,
) -> Result<CsrSeedProtobufSchema, PlanError> {
    let (primary_subject, dependency_subjects) = ccsr_client
        .get_subject_and_references(subject_name)
        .await
        .map_err(|e| PlanError::FetchingCsrSchemaFailed {
            schema_lookup: format!("subject {}", subject_name.quoted()),
            cause: Arc::new(e),
        })?;

    // Compile .proto files into a file descriptor set.
    let mut source_tree = VirtualSourceTree::new();
    for subject in iter::once(&primary_subject).chain(dependency_subjects.iter()) {
        source_tree.as_mut().add_file(
            Path::new(&subject.name),
            subject.schema.raw.as_bytes().to_vec(),
        );
    }
    let mut db = SourceTreeDescriptorDatabase::new(source_tree.as_mut());
    let fds = db
        .as_mut()
        .build_file_descriptor_set(&[Path::new(&primary_subject.name)])
        .map_err(|cause| PlanError::InvalidProtobufSchema { cause })?;

    // Ensure there is exactly one message in the file.
    let primary_fd = fds.file(0);
    let message_name = match primary_fd.message_type_size() {
        1 => String::from_utf8_lossy(primary_fd.message_type(0).name()).into_owned(),
        0 => bail_unsupported!(29603, "Protobuf schemas with no messages"),
        _ => bail_unsupported!(29603, "Protobuf schemas with multiple messages"),
    };

    // Encode the file descriptor set into a SQL byte string.
    let bytes = &fds
        .serialize()
        .map_err(|cause| PlanError::InvalidProtobufSchema { cause })?;
    let mut schema = String::new();
    strconv::format_bytes(&mut schema, bytes);

    Ok(CsrSeedProtobufSchema {
        schema,
        message_name,
    })
}

const MZ_NOW_NAME: &str = "mz_now";
const MZ_NOW_SCHEMA: &str = "mz_catalog";

/// Purifies a `CREATE MATERIALIZED VIEW` statement. Additionally, it adjusts `resolved_ids` if
/// references to ids appear or disappear during the purification.
///
/// Note that in contrast with [`purify_statement`], this doesn't need to be async, because
/// this function is not making any network calls.
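///
/// For example, `REFRESH AT CREATION` is purified to `REFRESH AT mz_now()`,
/// after which any `mz_now()` call in a refresh option is replaced by the
/// `mz_now` timestamp chosen for the statement, when one is provided.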
pub fn purify_create_materialized_view_options(
    catalog: impl SessionCatalog,
    mz_now: Option<Timestamp>,
    cmvs: &mut CreateMaterializedViewStatement<Aug>,
    resolved_ids: &mut ResolvedIds,
) {
    // 0. Preparations:
    // Prepare an expression that calls `mz_now()`, which we can insert in various later steps.
    let (mz_now_id, mz_now_expr) = {
        let item = catalog
            .resolve_function(&PartialItemName {
                database: None,
                schema: Some(MZ_NOW_SCHEMA.to_string()),
                item: MZ_NOW_NAME.to_string(),
            })
            .expect("we should be able to resolve mz_now");
        (
            item.id(),
            Expr::Function(Function {
                name: ResolvedItemName::Item {
                    id: item.id(),
                    qualifiers: item.name().qualifiers.clone(),
                    full_name: catalog.resolve_full_name(item.name()),
                    print_id: false,
                    version: RelationVersionSelector::Latest,
                },
                args: FunctionArgs::Args {
                    args: Vec::new(),
                    order_by: Vec::new(),
                },
                filter: None,
                over: None,
                distinct: false,
            }),
        )
    };
    // Prepare the `mz_timestamp` type.
    let (mz_timestamp_id, mz_timestamp_type) = {
        let item = catalog.get_system_type("mz_timestamp");
        let full_name = catalog.resolve_full_name(item.name());
        (
            item.id(),
            ResolvedDataType::Named {
                id: item.id(),
                qualifiers: item.name().qualifiers.clone(),
                full_name,
                modifiers: vec![],
                print_id: true,
            },
        )
    };

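    // Tracks whether purification introduced a cast to `mz_timestamp`, so that step 5 can add
    // the type to `resolved_ids`.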
    let mut introduced_mz_timestamp = false;

    for option in cmvs.with_options.iter_mut() {
        // 1. Purify `REFRESH AT CREATION` to `REFRESH AT mz_now()`.
        if matches!(
            option.value,
            Some(WithOptionValue::Refresh(RefreshOptionValue::AtCreation))
        ) {
            option.value = Some(WithOptionValue::Refresh(RefreshOptionValue::At(
                RefreshAtOptionValue {
                    time: mz_now_expr.clone(),
                },
            )));
        }

        // 2. If `REFRESH EVERY` doesn't have an `ALIGNED TO`, then add `ALIGNED TO mz_now()`.
        if let Some(WithOptionValue::Refresh(RefreshOptionValue::Every(
            RefreshEveryOptionValue { aligned_to, .. },
        ))) = &mut option.value
        {
            if aligned_to.is_none() {
                *aligned_to = Some(mz_now_expr.clone());
            }
        }

        // 3. Substitute `mz_now()` with the timestamp chosen for the CREATE MATERIALIZED VIEW
        // statement. (This has to happen after the above steps, which might introduce `mz_now()`.)
        match &mut option.value {
            Some(WithOptionValue::Refresh(RefreshOptionValue::At(RefreshAtOptionValue {
                time,
            }))) => {
                let mut visitor = MzNowPurifierVisitor::new(mz_now, mz_timestamp_type.clone());
                visitor.visit_expr_mut(time);
                introduced_mz_timestamp |= visitor.introduced_mz_timestamp;
            }
            Some(WithOptionValue::Refresh(RefreshOptionValue::Every(
                RefreshEveryOptionValue {
                    interval: _,
                    aligned_to: Some(aligned_to),
                },
            ))) => {
                let mut visitor = MzNowPurifierVisitor::new(mz_now, mz_timestamp_type.clone());
                visitor.visit_expr_mut(aligned_to);
                introduced_mz_timestamp |= visitor.introduced_mz_timestamp;
            }
            _ => {}
        }
    }

    // 4. If the user didn't give any REFRESH option, then default to ON COMMIT.
    if !cmvs.with_options.iter().any(|o| {
        matches!(
            o,
            MaterializedViewOption {
                value: Some(WithOptionValue::Refresh(..)),
                ..
            }
        )
    }) {
        cmvs.with_options.push(MaterializedViewOption {
            name: MaterializedViewOptionName::Refresh,
            value: Some(WithOptionValue::Refresh(RefreshOptionValue::OnCommit)),
        })
    }

    // 5. Attend to `resolved_ids`: The purification might have
    // - added references to `mz_timestamp`;
    // - removed references to `mz_now`.
    if introduced_mz_timestamp {
        resolved_ids.add_item(mz_timestamp_id);
    }
    // Even though we always remove `mz_now()` from the `with_options`, there might be `mz_now()`
    // remaining in the main query expression of the MV, so let's visit the entire statement to look
    // for `mz_now()` everywhere.
    let mut visitor = ExprContainsTemporalVisitor::new();
    visitor.visit_create_materialized_view_statement(cmvs);
    if !visitor.contains_temporal {
        resolved_ids.remove_item(&mz_now_id);
    }
}

/// Returns true if the [MaterializedViewOption] either already involves `mz_now()` or will
/// involve it after purification.
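///
/// For example, a `REFRESH EVERY` without an `ALIGNED TO` returns true, because purification
/// will default its `ALIGNED TO` to `mz_now()`.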
pub fn materialized_view_option_contains_temporal(mvo: &MaterializedViewOption<Aug>) -> bool {
    match &mvo.value {
        Some(WithOptionValue::Refresh(RefreshOptionValue::At(RefreshAtOptionValue { time }))) => {
            let mut visitor = ExprContainsTemporalVisitor::new();
            visitor.visit_expr(time);
            visitor.contains_temporal
        }
        Some(WithOptionValue::Refresh(RefreshOptionValue::Every(RefreshEveryOptionValue {
            interval: _,
            aligned_to: Some(aligned_to),
        }))) => {
            let mut visitor = ExprContainsTemporalVisitor::new();
            visitor.visit_expr(aligned_to);
            visitor.contains_temporal
        }
        Some(WithOptionValue::Refresh(RefreshOptionValue::Every(RefreshEveryOptionValue {
            interval: _,
            aligned_to: None,
        }))) => {
            // For a `REFRESH EVERY` without an `ALIGNED TO`, purification will default the
            // `ALIGNED TO` to `mz_now()`.
            true
        }
        Some(WithOptionValue::Refresh(RefreshOptionValue::AtCreation)) => {
            // `REFRESH AT CREATION` will be purified to `REFRESH AT mz_now()`.
            true
        }
        _ => false,
    }
}

/// Determines whether the AST involves `mz_now()`.
struct ExprContainsTemporalVisitor {
    pub contains_temporal: bool,
}

impl ExprContainsTemporalVisitor {
    pub fn new() -> ExprContainsTemporalVisitor {
        ExprContainsTemporalVisitor {
            contains_temporal: false,
        }
    }
}

impl Visit<'_, Aug> for ExprContainsTemporalVisitor {
    fn visit_function(&mut self, func: &Function<Aug>) {
        self.contains_temporal |= func.name.full_item_name().item == MZ_NOW_NAME;
        visit_function(self, func);
    }
}

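/// Rewrites `mz_now()` calls to the concrete timestamp chosen for the statement, cast to
/// `mz_timestamp`, and records whether any such rewrite happened.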
struct MzNowPurifierVisitor {
    pub mz_now: Option<Timestamp>,
    pub mz_timestamp_type: ResolvedDataType,
    pub introduced_mz_timestamp: bool,
}

impl MzNowPurifierVisitor {
    pub fn new(
        mz_now: Option<Timestamp>,
        mz_timestamp_type: ResolvedDataType,
    ) -> MzNowPurifierVisitor {
        MzNowPurifierVisitor {
            mz_now,
            mz_timestamp_type,
            introduced_mz_timestamp: false,
        }
    }
}

impl VisitMut<'_, Aug> for MzNowPurifierVisitor {
    fn visit_expr_mut(&mut self, expr: &'_ mut Expr<Aug>) {
        match expr {
            Expr::Function(Function {
                name:
                    ResolvedItemName::Item {
                        full_name: FullItemName { item, .. },
                        ..
                    },
                ..
            }) if item.as_str() == MZ_NOW_NAME => {
                let mz_now = self.mz_now.expect(
                    "we should have chosen a timestamp if the expression contains mz_now()",
                );
                // We substitute `mz_now()` with the chosen timestamp as a number literal, plus a
                // cast to `mz_timestamp`. The cast ensures the substitution doesn't alter the
                // type of the expression.
                *expr = Expr::Cast {
                    expr: Box::new(Expr::Value(Value::Number(mz_now.to_string()))),
                    data_type: self.mz_timestamp_type.clone(),
                };
                self.introduced_mz_timestamp = true;
            }
            _ => visit_expr_mut(self, expr),
        }
    }
}