// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! SQL purification.
//!
//! See the [crate-level documentation](crate) for details.

use std::collections::{BTreeMap, BTreeSet};
use std::fmt;
use std::iter;
use std::path::Path;
use std::sync::Arc;

use anyhow::anyhow;
use itertools::Itertools;
use mz_adapter_types::dyncfgs::ENABLE_S3_TABLES_REGION_CHECK;
use mz_ccsr::{Client, GetBySubjectError};
use mz_cloud_provider::CloudProvider;
use mz_controller_types::ClusterId;
use mz_kafka_util::client::MzClientContext;
use mz_mysql_util::MySqlTableDesc;
use mz_ore::collections::CollectionExt;
use mz_ore::error::ErrorExt;
use mz_ore::future::InTask;
use mz_ore::iter::IteratorExt;
use mz_ore::soft_panic_or_log;
use mz_ore::str::StrExt;
use mz_postgres_util::desc::PostgresTableDesc;
use mz_proto::RustType;
use mz_repr::{CatalogItemId, RelationDesc, RelationVersionSelector, Timestamp, strconv};
use mz_sql_parser::ast::display::AstDisplay;
use mz_sql_parser::ast::visit::{Visit, visit_function};
use mz_sql_parser::ast::visit_mut::{VisitMut, visit_expr_mut};
use mz_sql_parser::ast::{
    AlterSourceAction, AlterSourceAddSubsourceOptionName, AlterSourceStatement, AvroDocOn,
    ColumnName, CreateMaterializedViewStatement, CreateSinkConnection, CreateSinkOptionName,
    CreateSinkStatement, CreateSourceOptionName, CreateSubsourceOption, CreateSubsourceOptionName,
    CreateTableFromSourceStatement, CsrConfigOption, CsrConfigOptionName, CsrConnection,
    CsrSeedAvro, CsrSeedProtobuf, CsrSeedProtobufSchema, DeferredItemName, DocOnIdentifier,
    DocOnSchema, Expr, Function, FunctionArgs, Ident, KafkaSourceConfigOption,
    KafkaSourceConfigOptionName, LoadGenerator, LoadGeneratorOption, LoadGeneratorOptionName,
    MaterializedViewOption, MaterializedViewOptionName, MySqlConfigOption, MySqlConfigOptionName,
    PgConfigOption, PgConfigOptionName, RawItemName, ReaderSchemaSelectionStrategy,
    RefreshAtOptionValue, RefreshEveryOptionValue, RefreshOptionValue, SourceEnvelope,
    SqlServerConfigOption, SqlServerConfigOptionName, Statement, TableFromSourceColumns,
    TableFromSourceOption, TableFromSourceOptionName, UnresolvedItemName,
};
use mz_sql_server_util::desc::SqlServerTableDesc;
use mz_storage_types::configuration::StorageConfiguration;
use mz_storage_types::connections::Connection;
use mz_storage_types::connections::inline::IntoInlineConnection;
use mz_storage_types::errors::ContextCreationError;
use mz_storage_types::sources::load_generator::LoadGeneratorOutput;
use mz_storage_types::sources::mysql::MySqlSourceDetails;
use mz_storage_types::sources::postgres::PostgresSourcePublicationDetails;
use mz_storage_types::sources::{
    GenericSourceConnection, SourceDesc, SourceExportStatementDetails, SqlServerSourceExtras,
};
use prost::Message;
use protobuf_native::MessageLite;
use protobuf_native::compiler::{SourceTreeDescriptorDatabase, VirtualSourceTree};
use rdkafka::admin::AdminClient;
use references::{RetrievedSourceReferences, SourceReferenceClient};
use uuid::Uuid;

use crate::ast::{
    AlterSourceAddSubsourceOption, AvroSchema, CreateSourceConnection, CreateSourceStatement,
    CreateSubsourceStatement, CsrConnectionAvro, CsrConnectionProtobuf, ExternalReferenceExport,
    ExternalReferences, Format, FormatSpecifier, ProtobufSchema, Value, WithOptionValue,
};
use crate::catalog::{CatalogItemType, SessionCatalog};
use crate::kafka_util::{KafkaSinkConfigOptionExtracted, KafkaSourceConfigOptionExtracted};
use crate::names::{
    Aug, FullItemName, PartialItemName, ResolvedColumnReference, ResolvedDataType, ResolvedIds,
    ResolvedItemName,
};
use crate::plan::error::PlanError;
use crate::plan::statement::ddl::load_generator_ast_to_generator;
use crate::plan::{SourceReferences, StatementContext};
use crate::pure::error::{IcebergSinkPurificationError, SqlServerSourcePurificationError};
use crate::session::vars::ENABLE_SQL_SERVER_SOURCE;
use crate::{kafka_util, normalize};

use self::error::{
    CsrPurificationError, KafkaSinkPurificationError, KafkaSourcePurificationError,
    LoadGeneratorSourcePurificationError, MySqlSourcePurificationError, PgSourcePurificationError,
};

pub(crate) mod error;
mod references;

pub mod mysql;
pub mod postgres;
pub mod sql_server;

pub(crate) struct RequestedSourceExport<T> {
    external_reference: UnresolvedItemName,
    name: UnresolvedItemName,
    meta: T,
}

impl<T: fmt::Debug> fmt::Debug for RequestedSourceExport<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("RequestedSourceExport")
            .field("external_reference", &self.external_reference)
            .field("name", &self.name)
            .field("meta", &self.meta)
            .finish()
    }
}

impl<T> RequestedSourceExport<T> {
    fn change_meta<F>(self, new_meta: F) -> RequestedSourceExport<F> {
        RequestedSourceExport {
            external_reference: self.external_reference,
            name: self.name,
            meta: new_meta,
        }
    }
}

/// Generates a subsource name by prepending the source's schema name, if present.
///
/// For example, if the source is `a.b`, then `a` is prepended to the subsource
/// name so that the subsource is generated in the same schema as the source.
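///
/// Illustrative call (hypothetical names): given a source named `a.b` and the
/// subsource name `"t1"`, this returns the name `a.t1`.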
fn source_export_name_gen(
    source_name: &UnresolvedItemName,
    subsource_name: &str,
) -> Result<UnresolvedItemName, PlanError> {
    let mut partial = normalize::unresolved_item_name(source_name.clone())?;
    partial.item = subsource_name.to_string();
    Ok(UnresolvedItemName::from(partial))
}

// TODO(database-issues#8620): Remove once subsources are removed
/// Validates that the requested subsources do not have name conflicts with each
/// other and that the same upstream table is not referenced multiple times.
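///
/// For example, two requested exports that are both named `public.t1` but
/// reference different upstream tables produce a `SubsourceNameConflict` error.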
fn validate_source_export_names<T>(
    requested_source_exports: &[RequestedSourceExport<T>],
) -> Result<(), PlanError> {
    // This condition would get caught during the catalog transaction, but produces a
    // vague, non-contextual error. Instead, error here so we can suggest to the user
    // how to fix the problem.
    if let Some(name) = requested_source_exports
        .iter()
        .map(|subsource| &subsource.name)
        .duplicates()
        .next()
        .cloned()
    {
        let mut upstream_references: Vec<_> = requested_source_exports
            .iter()
            .filter_map(|subsource| {
                if subsource.name == name {
                    Some(subsource.external_reference.clone())
                } else {
                    None
                }
            })
            .collect();

        upstream_references.sort();

        Err(PlanError::SubsourceNameConflict {
            name,
            upstream_references,
        })?;
    }

    // We disallow subsource statements from referencing the same upstream table, but we allow
    // `CREATE TABLE .. FROM SOURCE` statements to do so. Since `CREATE TABLE .. FROM SOURCE`
    // purification will only provide 1 `requested_source_export`, we can leave this here without
    // needing to differentiate between the two types of statements.
    // TODO(roshan): Remove this when auto-generated subsources are deprecated.
    if let Some(name) = requested_source_exports
        .iter()
        .map(|export| &export.external_reference)
        .duplicates()
        .next()
        .cloned()
    {
        let mut target_names: Vec<_> = requested_source_exports
            .iter()
            .filter_map(|export| {
                if export.external_reference == name {
                    Some(export.name.clone())
                } else {
                    None
                }
            })
            .collect();

        target_names.sort();

        Err(PlanError::SubsourceDuplicateReference { name, target_names })?;
    }

    Ok(())
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PurifiedStatement {
    PurifiedCreateSource {
        /// The progress subsource, if we are offloading progress info to a
        /// separate relation.
        create_progress_subsource_stmt: Option<CreateSubsourceStatement<Aug>>,
        create_source_stmt: CreateSourceStatement<Aug>,
        /// Map of subsource names to external details.
        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
        /// All the available upstream references that can be added as tables
        /// to this primary source.
        available_source_references: SourceReferences,
    },
    PurifiedAlterSource {
        alter_source_stmt: AlterSourceStatement<Aug>,
    },
    PurifiedAlterSourceAddSubsources {
        /// This just saves us an annoying catalog lookup.
        source_name: ResolvedItemName,
        /// Options that we will need the values of to update the source's
        /// definition.
        options: Vec<AlterSourceAddSubsourceOption<Aug>>,
        /// Map of subsource names to external details.
        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
    },
    PurifiedAlterSourceRefreshReferences {
        source_name: ResolvedItemName,
        /// The updated available upstream references for the primary source.
        available_source_references: SourceReferences,
    },
    PurifiedCreateSink(CreateSinkStatement<Aug>),
    PurifiedCreateTableFromSource {
        stmt: CreateTableFromSourceStatement<Aug>,
    },
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PurifiedSourceExport {
    pub external_reference: UnresolvedItemName,
    pub details: PurifiedExportDetails,
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PurifiedExportDetails {
    MySql {
        table: MySqlTableDesc,
        text_columns: Option<Vec<Ident>>,
        exclude_columns: Option<Vec<Ident>>,
        initial_gtid_set: String,
    },
    Postgres {
        table: PostgresTableDesc,
        text_columns: Option<Vec<Ident>>,
        exclude_columns: Option<Vec<Ident>>,
    },
    SqlServer {
        table: SqlServerTableDesc,
        text_columns: Option<Vec<Ident>>,
        excl_columns: Option<Vec<Ident>>,
        capture_instance: Arc<str>,
        initial_lsn: mz_sql_server_util::cdc::Lsn,
    },
    Kafka {},
    LoadGenerator {
        table: Option<RelationDesc>,
        output: LoadGeneratorOutput,
    },
}

/// Purifies a statement, removing any dependencies on external state.
///
/// See the section on [purification](crate#purification) in the crate
/// documentation for details.
///
/// Note that this doesn't handle CREATE MATERIALIZED VIEW, which is
/// handled by [purify_create_materialized_view_options] instead.
/// This could be made more consistent by a refactoring discussed here:
/// <https://github.com/MaterializeInc/materialize/pull/23870#discussion_r1435922709>
pub async fn purify_statement(
    catalog: impl SessionCatalog,
    now: u64,
    stmt: Statement<Aug>,
    storage_configuration: &StorageConfiguration,
) -> (Result<PurifiedStatement, PlanError>, Option<ClusterId>) {
    match stmt {
        Statement::CreateSource(stmt) => {
            let cluster_id = stmt.in_cluster.as_ref().map(|cluster| cluster.id.clone());
            (
                purify_create_source(catalog, now, stmt, storage_configuration).await,
                cluster_id,
            )
        }
        Statement::AlterSource(stmt) => (
            purify_alter_source(catalog, stmt, storage_configuration).await,
            None,
        ),
        Statement::CreateSink(stmt) => {
            let cluster_id = stmt.in_cluster.as_ref().map(|cluster| cluster.id.clone());
            (
                purify_create_sink(catalog, stmt, storage_configuration).await,
                cluster_id,
            )
        }
        Statement::CreateTableFromSource(stmt) => (
            purify_create_table_from_source(catalog, stmt, storage_configuration).await,
            None,
        ),
        o => unreachable!("{:?} does not need to be purified", o),
    }
}

/// Injects `DOC ON` comments into all Avro formats that are using a schema
/// registry by finding all SQL `COMMENT`s that are attached to the sink's
/// underlying materialized view (as well as any types referenced by that
/// underlying materialized view).
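///
/// Illustrative example (hypothetical names): given `COMMENT ON MATERIALIZED
/// VIEW v IS 'my view'`, purifying a sink on `v` whose Avro format uses a
/// schema registry behaves as if the user had written the CSR option
/// `DOC ON TYPE v = 'my view'`, unless the user already provided their own
/// `DOC ON` option for `v`.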
pub(crate) fn purify_create_sink_avro_doc_on_options(
    catalog: &dyn SessionCatalog,
    from_id: CatalogItemId,
    format: &mut Option<FormatSpecifier<Aug>>,
) -> Result<(), PlanError> {
    // Collect all objects referenced by the sink.
    let from = catalog.get_item(&from_id);
    let object_ids = from
        .references()
        .items()
        .copied()
        .chain_one(from.id())
        .collect::<Vec<_>>();

    // Collect all Avro formats that use a schema registry, as well as a set of
    // all identifiers named in user-provided `DOC ON` options.
    let mut avro_format_options = vec![];
    for_each_format(format, |doc_on_schema, fmt| match fmt {
        Format::Avro(AvroSchema::InlineSchema { .. })
        | Format::Bytes
        | Format::Csv { .. }
        | Format::Json { .. }
        | Format::Protobuf(..)
        | Format::Regex(..)
        | Format::Text => (),
        Format::Avro(AvroSchema::Csr {
            csr_connection: CsrConnectionAvro { connection, .. },
        }) => {
            avro_format_options.push((doc_on_schema, &mut connection.options));
        }
    });

    // For each Avro format in the sink, inject the appropriate `DOC ON` options
    // for each item referenced by the sink.
    for (for_schema, options) in avro_format_options {
        let user_provided_comments = options
            .iter()
            .filter_map(|CsrConfigOption { name, .. }| match name {
                CsrConfigOptionName::AvroDocOn(doc_on) => Some(doc_on.clone()),
                _ => None,
            })
            .collect::<BTreeSet<_>>();

        // Add existing comments if the user has not already provided them.
        for object_id in &object_ids {
            // Always add comments to the latest version of the item.
            let item = catalog
                .get_item(object_id)
                .at_version(RelationVersionSelector::Latest);
            let full_resolved_name = ResolvedItemName::Item {
                id: *object_id,
                qualifiers: item.name().qualifiers.clone(),
                full_name: catalog.resolve_full_name(item.name()),
                print_id: !matches!(
                    item.item_type(),
                    CatalogItemType::Func | CatalogItemType::Type
                ),
                version: RelationVersionSelector::Latest,
            };

            if let Some(comments_map) = catalog.get_item_comments(object_id) {
                // Attach comment for the item itself, if the user has not
                // already provided an overriding `DOC ON` option for the item.
                let doc_on_item_key = AvroDocOn {
                    identifier: DocOnIdentifier::Type(full_resolved_name.clone()),
                    for_schema,
                };
                if !user_provided_comments.contains(&doc_on_item_key) {
                    if let Some(root_comment) = comments_map.get(&None) {
                        options.push(CsrConfigOption {
                            name: CsrConfigOptionName::AvroDocOn(doc_on_item_key),
                            value: Some(mz_sql_parser::ast::WithOptionValue::Value(Value::String(
                                root_comment.clone(),
                            ))),
                        });
                    }
                }

                // Attach comment for each column in the item, if the user has
                // not already provided an overriding `DOC ON` option for the
                // column.
                let column_descs = match item.type_details() {
                    Some(details) => details.typ.desc(catalog).unwrap_or_default(),
                    None => item.relation_desc().map(|d| d.into_owned()),
                };

                if let Some(desc) = column_descs {
                    for (pos, column_name) in desc.iter_names().enumerate() {
                        if let Some(comment_str) = comments_map.get(&Some(pos + 1)) {
                            let doc_on_column_key = AvroDocOn {
                                identifier: DocOnIdentifier::Column(ColumnName {
                                    relation: full_resolved_name.clone(),
                                    column: ResolvedColumnReference::Column {
                                        name: column_name.to_owned(),
                                        index: pos,
                                    },
                                }),
                                for_schema,
                            };
                            if !user_provided_comments.contains(&doc_on_column_key) {
                                options.push(CsrConfigOption {
                                    name: CsrConfigOptionName::AvroDocOn(doc_on_column_key),
                                    value: Some(mz_sql_parser::ast::WithOptionValue::Value(
                                        Value::String(comment_str.clone()),
                                    )),
                                });
                            }
                        }
                    }
                }
            }
        }
    }

    Ok(())
}

/// Checks that the sink described in the statement can connect to its external
/// resources.
async fn purify_create_sink(
    catalog: impl SessionCatalog,
    mut create_sink_stmt: CreateSinkStatement<Aug>,
    storage_configuration: &StorageConfiguration,
) -> Result<PurifiedStatement, PlanError> {
    // General purification
    let CreateSinkStatement {
        connection,
        format,
        with_options,
        name: _,
        in_cluster: _,
        if_not_exists: _,
        from,
        envelope: _,
        mode: _,
    } = &mut create_sink_stmt;

    // The list of options that the user is allowed to specify.
    const USER_ALLOWED_WITH_OPTIONS: &[CreateSinkOptionName] = &[
        CreateSinkOptionName::Snapshot,
        CreateSinkOptionName::CommitInterval,
    ];

    if let Some(op) = with_options
        .iter()
        .find(|op| !USER_ALLOWED_WITH_OPTIONS.contains(&op.name))
    {
        sql_bail!(
            "CREATE SINK...WITH ({}..) is not allowed",
            op.name.to_ast_string_simple(),
        )
    }

    match &connection {
        CreateSinkConnection::Kafka {
            connection,
            options,
            key: _,
            headers: _,
        } => {
            // We must not leave any state behind in the Kafka broker, so just ensure that
            // we can connect. This means we don't ensure that we can create the topic and
            // introduces TOCTOU errors, but creating an inoperable sink is infinitely
            // preferable to leaking state in users' environments.
            let scx = StatementContext::new(None, &catalog);
            let connection = {
                let item = scx.get_item_by_resolved_name(connection)?;
                // Get Kafka connection
                match item.connection()? {
                    Connection::Kafka(connection) => {
                        connection.clone().into_inline_connection(scx.catalog)
                    }
                    _ => sql_bail!(
                        "{} is not a kafka connection",
                        scx.catalog.resolve_full_name(item.name())
                    ),
                }
            };

            let extracted_options: KafkaSinkConfigOptionExtracted = options.clone().try_into()?;

            if extracted_options.legacy_ids == Some(true) {
                sql_bail!("LEGACY IDs option is not supported");
            }

            let client: AdminClient<_> = connection
                .create_with_context(
                    storage_configuration,
                    MzClientContext::default(),
                    &BTreeMap::new(),
                    InTask::No,
                )
                .await
                .map_err(|e| {
                    // anyhow doesn't support Clone, so not trivial to move into PlanError
                    KafkaSinkPurificationError::AdminClientError(Arc::new(e))
                })?;

            let metadata = client
                .inner()
                .fetch_metadata(
                    None,
                    storage_configuration
                        .parameters
                        .kafka_timeout_config
                        .fetch_metadata_timeout,
                )
                .map_err(|e| {
                    KafkaSinkPurificationError::AdminClientError(Arc::new(
                        ContextCreationError::KafkaError(e),
                    ))
                })?;

            if metadata.brokers().is_empty() {
                Err(KafkaSinkPurificationError::ZeroBrokers)?;
            }
        }
        CreateSinkConnection::Iceberg {
            connection,
            aws_connection,
            ..
        } => {
            let scx = StatementContext::new(None, &catalog);
            let connection = {
                let item = scx.get_item_by_resolved_name(connection)?;
                // Get Iceberg connection
                match item.connection()? {
                    Connection::IcebergCatalog(connection) => {
                        connection.clone().into_inline_connection(scx.catalog)
                    }
                    _ => sql_bail!(
                        "{} is not an iceberg connection",
                        scx.catalog.resolve_full_name(item.name())
                    ),
                }
            };

            let aws_conn_id = aws_connection.item_id();

            let aws_connection = {
                let item = scx.get_item_by_resolved_name(aws_connection)?;
                // Get AWS connection
                match item.connection()? {
                    Connection::Aws(aws_connection) => aws_connection.clone(),
                    _ => sql_bail!(
                        "{} is not an aws connection",
                        scx.catalog.resolve_full_name(item.name())
                    ),
                }
            };

            // For S3 Tables connections in the Materialize Cloud product, verify the
            // AWS region matches the environment's region. This check only applies when
            // the enable_s3_tables_region_check dyncfg is set.
            if let Some(s3tables) = connection.s3tables_catalog() {
                let enable_region_check =
                    ENABLE_S3_TABLES_REGION_CHECK.get(scx.catalog.system_vars().dyncfgs());
                if enable_region_check {
                    let env_id = &catalog.config().environment_id;
                    if matches!(env_id.cloud_provider(), CloudProvider::Aws) {
                        let env_region = env_id.cloud_provider_region();
                        // Later on we default to "us-east-1" if the region is not set on the S3 Tables
                        // connection, so we need to do the same check here.
                        let s3_tables_region = s3tables
                            .aws_connection
                            .connection
                            .region
                            .clone()
                            .unwrap_or_else(|| "us-east-1".to_string());
                        if s3_tables_region != env_region {
                            Err(IcebergSinkPurificationError::S3TablesRegionMismatch {
                                s3_tables_region,
                                environment_region: env_region.to_string(),
                            })?;
                        }
                    }
                }
            }

            let _catalog = connection
                .connect(storage_configuration, InTask::No)
                .await
                .map_err(|e| IcebergSinkPurificationError::CatalogError(Arc::new(e)))?;

            let _sdk_config = aws_connection
                .load_sdk_config(
                    &storage_configuration.connection_context,
                    aws_conn_id.clone(),
                    InTask::No,
                )
                .await
                .map_err(|e| IcebergSinkPurificationError::AwsSdkContextError(Arc::new(e)))?;
        }
    }

    let mut csr_connection_ids = BTreeSet::new();
    for_each_format(format, |_, fmt| match fmt {
        Format::Avro(AvroSchema::InlineSchema { .. })
        | Format::Bytes
        | Format::Csv { .. }
        | Format::Json { .. }
        | Format::Protobuf(ProtobufSchema::InlineSchema { .. })
        | Format::Regex(..)
        | Format::Text => (),
        Format::Avro(AvroSchema::Csr {
            csr_connection: CsrConnectionAvro { connection, .. },
        })
        | Format::Protobuf(ProtobufSchema::Csr {
            csr_connection: CsrConnectionProtobuf { connection, .. },
        }) => {
            csr_connection_ids.insert(*connection.connection.item_id());
        }
    });

    let scx = StatementContext::new(None, &catalog);
    for csr_connection_id in csr_connection_ids {
        let connection = {
            let item = scx.get_item(&csr_connection_id);
            // Get CSR connection
            match item.connection()? {
                Connection::Csr(connection) => connection.clone().into_inline_connection(&catalog),
                _ => Err(CsrPurificationError::NotCsrConnection(
                    scx.catalog.resolve_full_name(item.name()),
                ))?,
            }
        };

        let client = connection
            .connect(storage_configuration, InTask::No)
            .await
            .map_err(|e| CsrPurificationError::ClientError(Arc::new(e)))?;

        client
            .list_subjects()
            .await
            .map_err(|e| CsrPurificationError::ListSubjectsError(Arc::new(e)))?;
    }

    purify_create_sink_avro_doc_on_options(&catalog, *from.item_id(), format)?;

    Ok(PurifiedStatement::PurifiedCreateSink(create_sink_stmt))
}

/// Runs a function on each format within the format specifier.
///
/// The function is provided with a `DocOnSchema` that indicates whether the
/// format is for the key, value, or both.
///
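/// For example, a bare `FORMAT AVRO ...` specifier invokes `f` once with
/// `DocOnSchema::All`, while `KEY FORMAT ... VALUE FORMAT ...` invokes `f` once
/// with `DocOnSchema::KeyOnly` for the key format and once with
/// `DocOnSchema::ValueOnly` for the value format.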
// TODO(benesch): rename `DocOnSchema` to the more general `FormatRestriction`.
fn for_each_format<'a, F>(format: &'a mut Option<FormatSpecifier<Aug>>, mut f: F)
where
    F: FnMut(DocOnSchema, &'a mut Format<Aug>),
{
    match format {
        None => (),
        Some(FormatSpecifier::Bare(fmt)) => f(DocOnSchema::All, fmt),
        Some(FormatSpecifier::KeyValue { key, value }) => {
            f(DocOnSchema::KeyOnly, key);
            f(DocOnSchema::ValueOnly, value);
        }
    }
}

/// Defines whether purification should enforce that at least one valid source
/// reference is provided on the statement.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub(crate) enum SourceReferencePolicy {
    /// Don't allow source references to be provided. This is used for
    /// enforcing that `CREATE SOURCE` statements don't create subsources.
    NotAllowed,
    /// Allow empty references, such as when creating a source that
    /// will have tables added afterwards.
    Optional,
    /// Require that at least one reference is resolved to an upstream
    /// object.
    Required,
}

async fn purify_create_source(
    catalog: impl SessionCatalog,
    now: u64,
    mut create_source_stmt: CreateSourceStatement<Aug>,
    storage_configuration: &StorageConfiguration,
) -> Result<PurifiedStatement, PlanError> {
    let CreateSourceStatement {
        name: source_name,
        col_names,
        key_constraint,
        connection: source_connection,
        format,
        envelope,
        include_metadata,
        external_references,
        progress_subsource,
        with_options,
        ..
    } = &mut create_source_stmt;

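    // Detect the pre-`CREATE TABLE .. FROM SOURCE` syntax, which configures columns,
    // key constraints, formats, envelopes, metadata, external references, and the
    // progress subsource directly on the `CREATE SOURCE` statement.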
    let uses_old_syntax = !col_names.is_empty()
        || key_constraint.is_some()
        || format.is_some()
        || envelope.is_some()
        || !include_metadata.is_empty()
        || external_references.is_some()
        || progress_subsource.is_some();

    if let Some(DeferredItemName::Named(_)) = progress_subsource {
        sql_bail!("Cannot manually ID qualify progress subsource")
    }

    let mut requested_subsource_map = BTreeMap::new();

    let progress_desc = match &source_connection {
        CreateSourceConnection::Kafka { .. } => {
            &mz_storage_types::sources::kafka::KAFKA_PROGRESS_DESC
        }
        CreateSourceConnection::Postgres { .. } => {
            &mz_storage_types::sources::postgres::PG_PROGRESS_DESC
        }
        CreateSourceConnection::SqlServer { .. } => {
            &mz_storage_types::sources::sql_server::SQL_SERVER_PROGRESS_DESC
        }
        CreateSourceConnection::MySql { .. } => {
            &mz_storage_types::sources::mysql::MYSQL_PROGRESS_DESC
        }
        CreateSourceConnection::LoadGenerator { .. } => {
            &mz_storage_types::sources::load_generator::LOAD_GEN_PROGRESS_DESC
        }
    };
    let scx = StatementContext::new(None, &catalog);

    // Depending on whether the user must or may use the `CREATE TABLE .. FROM SOURCE`
    // statement to add tables after this source is created, we might need to enforce
    // that auto-generated subsources are or are not created by this source statement.
    let reference_policy = if scx.catalog.system_vars().enable_create_table_from_source()
        && scx.catalog.system_vars().force_source_table_syntax()
    {
        SourceReferencePolicy::NotAllowed
    } else if scx.catalog.system_vars().enable_create_table_from_source() {
        SourceReferencePolicy::Optional
    } else {
        SourceReferencePolicy::Required
    };

    let mut format_options = SourceFormatOptions::Default;

    let retrieved_source_references: RetrievedSourceReferences;

    match source_connection {
        CreateSourceConnection::Kafka {
            connection,
            options: base_with_options,
            ..
        } => {
            if let Some(external_references) = external_references {
                Err(KafkaSourcePurificationError::ReferencedSubsources(
                    external_references.clone(),
                ))?;
            }

            let connection = {
                let item = scx.get_item_by_resolved_name(connection)?;
                // Get Kafka connection
                match item.connection()? {
                    Connection::Kafka(connection) => {
                        connection.clone().into_inline_connection(&catalog)
                    }
                    _ => Err(KafkaSourcePurificationError::NotKafkaConnection(
                        scx.catalog.resolve_full_name(item.name()),
                    ))?,
                }
            };

            let extracted_options: KafkaSourceConfigOptionExtracted =
                base_with_options.clone().try_into()?;

            let topic = extracted_options
                .topic
                .ok_or(KafkaSourcePurificationError::ConnectionMissingTopic)?;

            let consumer = connection
                .create_with_context(
                    storage_configuration,
                    MzClientContext::default(),
                    &BTreeMap::new(),
                    InTask::No,
                )
                .await
                .map_err(|e| {
                    // anyhow doesn't support Clone, so not trivial to move into PlanError
                    KafkaSourcePurificationError::KafkaConsumerError(
                        e.display_with_causes().to_string(),
                    )
                })?;
            let consumer = Arc::new(consumer);

            match (
                extracted_options.start_offset,
                extracted_options.start_timestamp,
            ) {
                (None, None) => {
                    // Validate that the topic at least exists.
                    kafka_util::ensure_topic_exists(
                        Arc::clone(&consumer),
                        &topic,
                        storage_configuration
                            .parameters
                            .kafka_timeout_config
                            .fetch_metadata_timeout,
                    )
                    .await?;
                }
                (Some(_), Some(_)) => {
                    sql_bail!("cannot specify START TIMESTAMP and START OFFSET at same time")
                }
                (Some(start_offsets), None) => {
                    // Validate the start offsets.
                    kafka_util::validate_start_offsets(
                        Arc::clone(&consumer),
                        &topic,
                        start_offsets,
                        storage_configuration
                            .parameters
                            .kafka_timeout_config
                            .fetch_metadata_timeout,
                    )
                    .await?;
                }
                (None, Some(time_offset)) => {
                    // Translate `START TIMESTAMP` to a start offset.
                    let start_offsets = kafka_util::lookup_start_offsets(
                        Arc::clone(&consumer),
                        &topic,
                        time_offset,
                        now,
                        storage_configuration
                            .parameters
                            .kafka_timeout_config
                            .fetch_metadata_timeout,
                    )
                    .await?;

                    base_with_options.retain(|val| {
                        !matches!(val.name, KafkaSourceConfigOptionName::StartTimestamp)
                    });
                    base_with_options.push(KafkaSourceConfigOption {
                        name: KafkaSourceConfigOptionName::StartOffset,
                        value: Some(WithOptionValue::Sequence(
                            start_offsets
                                .iter()
                                .map(|offset| {
                                    WithOptionValue::Value(Value::Number(offset.to_string()))
                                })
                                .collect(),
                        )),
                    });
                }
            }

            let reference_client = SourceReferenceClient::Kafka { topic: &topic };
            retrieved_source_references = reference_client.get_source_references().await?;

            format_options = SourceFormatOptions::Kafka { topic };
        }
        CreateSourceConnection::Postgres {
            connection,
            options,
        } => {
            let connection_item = scx.get_item_by_resolved_name(connection)?;
            let connection = match connection_item.connection().map_err(PlanError::from)? {
                Connection::Postgres(connection) => {
                    connection.clone().into_inline_connection(&catalog)
                }
                _ => Err(PgSourcePurificationError::NotPgConnection(
                    scx.catalog.resolve_full_name(connection_item.name()),
                ))?,
            };
            let crate::plan::statement::PgConfigOptionExtracted {
                publication,
                text_columns,
                exclude_columns,
                details,
                ..
            } = options.clone().try_into()?;
            let publication =
                publication.ok_or(PgSourcePurificationError::ConnectionMissingPublication)?;

            if details.is_some() {
                Err(PgSourcePurificationError::UserSpecifiedDetails)?;
            }

            let client = connection
                .validate(connection_item.id(), storage_configuration)
                .await?;

            let reference_client = SourceReferenceClient::Postgres {
                client: &client,
                publication: &publication,
                database: &connection.database,
            };
            retrieved_source_references = reference_client.get_source_references().await?;

            let postgres::PurifiedSourceExports {
                source_exports: subsources,
                normalized_text_columns,
            } = postgres::purify_source_exports(
                &client,
                &retrieved_source_references,
                external_references,
                text_columns,
                exclude_columns,
                source_name,
                &reference_policy,
            )
            .await?;

            if let Some(text_cols_option) = options
                .iter_mut()
                .find(|option| option.name == PgConfigOptionName::TextColumns)
            {
                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
            }

            requested_subsource_map.extend(subsources);

            // Record the active replication timeline_id to allow detection of a future upstream
            // point-in-time-recovery that will put the source into an error state.
            let timeline_id = mz_postgres_util::get_timeline_id(&client).await?;

            // Remove any old detail references
            options.retain(|PgConfigOption { name, .. }| name != &PgConfigOptionName::Details);
            let details = PostgresSourcePublicationDetails {
                slot: format!(
                    "materialize_{}",
                    Uuid::new_v4().to_string().replace('-', "")
                ),
                timeline_id: Some(timeline_id),
                database: connection.database,
            };
            options.push(PgConfigOption {
                name: PgConfigOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    details.into_proto().encode_to_vec(),
                )))),
            })
        }
        CreateSourceConnection::SqlServer {
            connection,
            options,
        } => {
            scx.require_feature_flag(&ENABLE_SQL_SERVER_SOURCE)?;

            // Connect to the upstream SQL Server instance so we can validate
            // we're compatible with CDC.
            let connection_item = scx.get_item_by_resolved_name(connection)?;
            let connection = match connection_item.connection()? {
                Connection::SqlServer(connection) => {
                    connection.clone().into_inline_connection(&catalog)
                }
                _ => Err(SqlServerSourcePurificationError::NotSqlServerConnection(
                    scx.catalog.resolve_full_name(connection_item.name()),
                ))?,
            };
            let crate::plan::statement::ddl::SqlServerConfigOptionExtracted {
                details,
                text_columns,
                exclude_columns,
                seen: _,
            } = options.clone().try_into()?;

            if details.is_some() {
                Err(SqlServerSourcePurificationError::UserSpecifiedDetails)?;
            }

            let mut client = connection
                .validate(connection_item.id(), storage_configuration)
                .await?;

            let database: Arc<str> = connection.database.into();
            let reference_client = SourceReferenceClient::SqlServer {
                client: &mut client,
                database: Arc::clone(&database),
            };
            retrieved_source_references = reference_client.get_source_references().await?;
            tracing::debug!(?retrieved_source_references, "got source references");

            let timeout = mz_storage_types::sources::sql_server::MAX_LSN_WAIT
                .get(storage_configuration.config_set());

            let purified_source_exports = sql_server::purify_source_exports(
                &*database,
                &mut client,
                &retrieved_source_references,
                external_references,
                &text_columns,
                &exclude_columns,
                source_name,
                timeout,
                &reference_policy,
            )
            .await?;

            let sql_server::PurifiedSourceExports {
                source_exports,
                normalized_text_columns,
                normalized_excl_columns,
            } = purified_source_exports;

            // Update our set of requested source exports.
            requested_subsource_map.extend(source_exports);

            // Record the most recent restore_history_id, or none if the system has never been
            // restored.
            let restore_history_id =
                mz_sql_server_util::inspect::get_latest_restore_history_id(&mut client).await?;
            let details = SqlServerSourceExtras { restore_history_id };

            options.retain(|SqlServerConfigOption { name, .. }| {
                name != &SqlServerConfigOptionName::Details
            });
            options.push(SqlServerConfigOption {
                name: SqlServerConfigOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    details.into_proto().encode_to_vec(),
                )))),
            });

            // Update our 'TEXT' and 'EXCLUDE' column options with the purified and normalized set.
            if let Some(text_cols_option) = options
                .iter_mut()
                .find(|option| option.name == SqlServerConfigOptionName::TextColumns)
            {
                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
            }
            if let Some(excl_cols_option) = options
                .iter_mut()
                .find(|option| option.name == SqlServerConfigOptionName::ExcludeColumns)
            {
                excl_cols_option.value = Some(WithOptionValue::Sequence(normalized_excl_columns));
            }
        }
        CreateSourceConnection::MySql {
            connection,
            options,
        } => {
            let connection_item = scx.get_item_by_resolved_name(connection)?;
            let connection = match connection_item.connection()? {
                Connection::MySql(connection) => {
                    connection.clone().into_inline_connection(&catalog)
                }
                _ => Err(MySqlSourcePurificationError::NotMySqlConnection(
                    scx.catalog.resolve_full_name(connection_item.name()),
                ))?,
            };
            let crate::plan::statement::ddl::MySqlConfigOptionExtracted {
                details,
                text_columns,
                exclude_columns,
                seen: _,
            } = options.clone().try_into()?;

            if details.is_some() {
                Err(MySqlSourcePurificationError::UserSpecifiedDetails)?;
            }

            let mut conn = connection
                .validate(connection_item.id(), storage_configuration)
                .await
                .map_err(MySqlSourcePurificationError::InvalidConnection)?;

            // Retrieve the current @gtid_executed value of the server to mark as the effective
            // initial snapshot point such that we can ensure consistency if the initial source
            // snapshot is broken up over multiple points in time.
            let initial_gtid_set =
                mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?;

            let reference_client = SourceReferenceClient::MySql {
                conn: &mut conn,
                include_system_schemas: mysql::references_system_schemas(external_references),
            };
            retrieved_source_references = reference_client.get_source_references().await?;

            let mysql::PurifiedSourceExports {
                source_exports: subsources,
                normalized_text_columns,
                normalized_exclude_columns,
            } = mysql::purify_source_exports(
                &mut conn,
                &retrieved_source_references,
                external_references,
                text_columns,
                exclude_columns,
                source_name,
                initial_gtid_set.clone(),
                &reference_policy,
            )
            .await?;
            requested_subsource_map.extend(subsources);

            // We don't have any fields in this details struct but keep this around for
            // conformity with Postgres and in case we end up needing it again in the future.
            let details = MySqlSourceDetails {};
            // Update options with the purified details
            options
                .retain(|MySqlConfigOption { name, .. }| name != &MySqlConfigOptionName::Details);
            options.push(MySqlConfigOption {
                name: MySqlConfigOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    details.into_proto().encode_to_vec(),
                )))),
            });

            if let Some(text_cols_option) = options
                .iter_mut()
                .find(|option| option.name == MySqlConfigOptionName::TextColumns)
            {
                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
            }
            if let Some(exclude_cols_option) = options
                .iter_mut()
                .find(|option| option.name == MySqlConfigOptionName::ExcludeColumns)
            {
                exclude_cols_option.value =
                    Some(WithOptionValue::Sequence(normalized_exclude_columns));
            }
        }
        CreateSourceConnection::LoadGenerator { generator, options } => {
            let load_generator =
                load_generator_ast_to_generator(&scx, generator, options, include_metadata)?;

            let reference_client = SourceReferenceClient::LoadGenerator {
                generator: &load_generator,
            };
            retrieved_source_references = reference_client.get_source_references().await?;
            // Determine whether this is a multi-output source, i.e. whether any
            // reference is something other than the default output; only such
            // references need to be created as 'subsources', which excludes the
            // default output of single-output sources.
            // TODO(database-issues#8620): Remove once subsources are removed
            let multi_output_sources =
                retrieved_source_references
                    .all_references()
                    .iter()
                    .any(|r| {
                        r.load_generator_output().expect("is loadgen")
                            != &LoadGeneratorOutput::Default
                    });

            match external_references {
                Some(requested)
                    if matches!(reference_policy, SourceReferencePolicy::NotAllowed) =>
                {
                    Err(PlanError::UseTablesForSources(requested.to_string()))?
                }
                Some(requested) if !multi_output_sources => match requested {
                    ExternalReferences::SubsetTables(_) => {
                        Err(LoadGeneratorSourcePurificationError::ForTables)?
                    }
                    ExternalReferences::SubsetSchemas(_) => {
                        Err(LoadGeneratorSourcePurificationError::ForSchemas)?
                    }
                    ExternalReferences::All => {
                        Err(LoadGeneratorSourcePurificationError::ForAllTables)?
                    }
                },
                Some(requested) => {
                    let requested_exports = retrieved_source_references
                        .requested_source_exports(Some(requested), source_name)?;
                    for export in requested_exports {
                        requested_subsource_map.insert(
                            export.name,
                            PurifiedSourceExport {
                                external_reference: export.external_reference,
                                details: PurifiedExportDetails::LoadGenerator {
                                    table: export
                                        .meta
                                        .load_generator_desc()
                                        .expect("is loadgen")
                                        .clone(),
                                    output: export
                                        .meta
                                        .load_generator_output()
                                        .expect("is loadgen")
                                        .clone(),
                                },
                            },
                        );
                    }
                }
                None => {
                    if multi_output_sources
                        && matches!(reference_policy, SourceReferencePolicy::Required)
                    {
                        Err(LoadGeneratorSourcePurificationError::MultiOutputRequiresForAllTables)?
                    }
                }
            }

            if let LoadGenerator::Clock = generator {
                if !options
                    .iter()
                    .any(|p| p.name == LoadGeneratorOptionName::AsOf)
                {
                    let now = catalog.now();
                    options.push(LoadGeneratorOption {
                        name: LoadGeneratorOptionName::AsOf,
                        value: Some(WithOptionValue::Value(Value::Number(now.to_string()))),
                    });
                }
            }
        }
    }

    // Now that we know which subsources to create alongside this statement,
    // remove the references so they are not canonicalized as part of the
    // `CREATE SOURCE` statement in the catalog.
    *external_references = None;

    // Generate the progress subsource for the old syntax.
    let create_progress_subsource_stmt = if uses_old_syntax {
        // Take the name from the input or generate one.
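        // A generated name appends `_progress` to the source's item name: e.g., a
        // source `db.schema.src` gets a progress subsource named
        // `db.schema.src_progress` (disambiguated further if that name is taken).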
        let name = match progress_subsource {
            Some(name) => match name {
                DeferredItemName::Deferred(name) => name.clone(),
                DeferredItemName::Named(_) => unreachable!("already checked for this value"),
            },
            None => {
                let (item, prefix) = source_name.0.split_last().unwrap();
                let item_name =
                    Ident::try_generate_name(item.to_string(), "_progress", |candidate| {
                        let mut suggested_name = prefix.to_vec();
                        suggested_name.push(candidate.clone());

                        let partial =
                            normalize::unresolved_item_name(UnresolvedItemName(suggested_name))?;
                        let qualified = scx.allocate_qualified_name(partial)?;
                        let item_exists = scx.catalog.get_item_by_name(&qualified).is_some();
                        let type_exists = scx.catalog.get_type_by_name(&qualified).is_some();
                        Ok::<_, PlanError>(!item_exists && !type_exists)
                    })?;

                let mut full_name = prefix.to_vec();
                full_name.push(item_name);
                let full_name = normalize::unresolved_item_name(UnresolvedItemName(full_name))?;
                let qualified_name = scx.allocate_qualified_name(full_name)?;
                let full_name = scx.catalog.resolve_full_name(&qualified_name);

                UnresolvedItemName::from(full_name.clone())
            }
        };

        let (columns, constraints) = scx.relation_desc_into_table_defs(progress_desc)?;

        // Create the subsource statement
        let mut progress_with_options: Vec<_> = with_options
            .iter()
            .filter_map(|opt| match opt.name {
                CreateSourceOptionName::TimestampInterval => None,
                CreateSourceOptionName::RetainHistory => Some(CreateSubsourceOption {
                    name: CreateSubsourceOptionName::RetainHistory,
                    value: opt.value.clone(),
                }),
            })
            .collect();
        progress_with_options.push(CreateSubsourceOption {
            name: CreateSubsourceOptionName::Progress,
            value: Some(WithOptionValue::Value(Value::Boolean(true))),
        });

        Some(CreateSubsourceStatement {
            name,
            columns,
            // Progress subsources do not refer to the source to which they belong.
            // Instead the primary source depends on it (the opposite is true of
            // ingestion exports, which depend on the primary source).
            of_source: None,
            constraints,
            if_not_exists: false,
            with_options: progress_with_options,
        })
    } else {
        None
    };

    purify_source_format(
        &catalog,
        format,
        &format_options,
        envelope,
        storage_configuration,
    )
    .await?;

    Ok(PurifiedStatement::PurifiedCreateSource {
        create_progress_subsource_stmt,
        create_source_stmt,
        subsources: requested_subsource_map,
        available_source_references: retrieved_source_references.available_source_references(),
    })
}

1321/// On success, returns the details for any new subsources and the updated
1322/// `options` that sequencing expects when handling `ALTER SOURCE` statements.
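/// For example (illustrative statement): purifying
/// `ALTER SOURCE pg_src ADD SUBSOURCE other_schema.tbl` connects to the
/// upstream system to fetch the table's description before sequencing.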
1323async fn purify_alter_source(
1324    catalog: impl SessionCatalog,
1325    stmt: AlterSourceStatement<Aug>,
1326    storage_configuration: &StorageConfiguration,
1327) -> Result<PurifiedStatement, PlanError> {
1328    let scx = StatementContext::new(None, &catalog);
1329    let AlterSourceStatement {
1330        source_name: unresolved_source_name,
1331        action,
1332        if_exists,
1333    } = stmt;
1334
1335    // Resolve the source item; if it's missing and IF EXISTS was specified, pass the statement through unchanged.
1336    let item = match scx.resolve_item(RawItemName::Name(unresolved_source_name.clone())) {
1337        Ok(item) => item,
1338        Err(_) if if_exists => {
1339            return Ok(PurifiedStatement::PurifiedAlterSource {
1340                alter_source_stmt: AlterSourceStatement {
1341                    source_name: unresolved_source_name,
1342                    action,
1343                    if_exists,
1344                },
1345            });
1346        }
1347        Err(e) => return Err(e),
1348    };
1349
1350    // Ensure it's an ingestion-based, alterable source.
1351    let desc = match item.source_desc()? {
1352        Some(desc) => desc.clone().into_inline_connection(scx.catalog),
1353        None => {
1354            sql_bail!("cannot ALTER this type of source")
1355        }
1356    };
1357
1358    let source_name = item.name();
1359
1360    let resolved_source_name = ResolvedItemName::Item {
1361        id: item.id(),
1362        qualifiers: item.name().qualifiers.clone(),
1363        full_name: scx.catalog.resolve_full_name(source_name),
1364        print_id: true,
1365        version: RelationVersionSelector::Latest,
1366    };
1367
1368    let partial_name = scx.catalog.minimal_qualification(source_name);
1369
1370    match action {
1371        AlterSourceAction::AddSubsources {
1372            external_references,
1373            options,
1374        } => {
1375            if scx.catalog.system_vars().enable_create_table_from_source()
1376                && scx.catalog.system_vars().force_source_table_syntax()
1377            {
1378                Err(PlanError::UseTablesForSources(
1379                    "ALTER SOURCE .. ADD SUBSOURCES ..".to_string(),
1380                ))?;
1381            }
1382
1383            purify_alter_source_add_subsources(
1384                external_references,
1385                options,
1386                desc,
1387                partial_name,
1388                unresolved_source_name,
1389                resolved_source_name,
1390                storage_configuration,
1391            )
1392            .await
1393        }
1394        AlterSourceAction::RefreshReferences => {
1395            purify_alter_source_refresh_references(
1396                desc,
1397                resolved_source_name,
1398                storage_configuration,
1399            )
1400            .await
1401        }
1402        _ => Ok(PurifiedStatement::PurifiedAlterSource {
1403            alter_source_stmt: AlterSourceStatement {
1404                source_name: unresolved_source_name,
1405                action,
1406                if_exists,
1407            },
1408        }),
1409    }
1410}
1411
1412// TODO(database-issues#8620): Remove once subsources are removed
1413/// Equivalent to `purify_create_source` but for `AlterSourceStatement`.
1414async fn purify_alter_source_add_subsources(
1415    external_references: Vec<ExternalReferenceExport>,
1416    mut options: Vec<AlterSourceAddSubsourceOption<Aug>>,
1417    desc: SourceDesc,
1418    partial_source_name: PartialItemName,
1419    unresolved_source_name: UnresolvedItemName,
1420    resolved_source_name: ResolvedItemName,
1421    storage_configuration: &StorageConfiguration,
1422) -> Result<PurifiedStatement, PlanError> {
1423    // Validate this is a source that can have subsources added.
1424    let connection_id = match &desc.connection {
1425        GenericSourceConnection::Postgres(c) => c.connection_id,
1426        GenericSourceConnection::MySql(c) => c.connection_id,
1427        GenericSourceConnection::SqlServer(c) => c.connection_id,
1428        _ => sql_bail!(
1429            "source {} does not support ALTER SOURCE.",
1430            partial_source_name
1431        ),
1432    };
1433
1434    let crate::plan::statement::ddl::AlterSourceAddSubsourceOptionExtracted {
1435        text_columns,
1436        exclude_columns,
1437        details,
1438        seen: _,
1439    } = options.clone().try_into()?;
1440    if details.is_some() {
1441        sql_bail!("DETAILS option cannot be explicitly set");
1442    }
1443
1444    let mut requested_subsource_map = BTreeMap::new();
1445
1446    match desc.connection {
1447        GenericSourceConnection::Postgres(pg_source_connection) => {
1448            // Get PostgresConnection for generating subsources.
1449            let pg_connection = &pg_source_connection.connection;
1450
1451            let client = pg_connection
1452                .validate(connection_id, storage_configuration)
1453                .await?;
1454
1455            let reference_client = SourceReferenceClient::Postgres {
1456                client: &client,
1457                publication: &pg_source_connection.publication,
1458                database: &pg_connection.database,
1459            };
1460            let retrieved_source_references = reference_client.get_source_references().await?;
1461
1462            let postgres::PurifiedSourceExports {
1463                source_exports: subsources,
1464                normalized_text_columns,
1465            } = postgres::purify_source_exports(
1466                &client,
1467                &retrieved_source_references,
1468                &Some(ExternalReferences::SubsetTables(external_references)),
1469                text_columns,
1470                exclude_columns,
1471                &unresolved_source_name,
1472                &SourceReferencePolicy::Required,
1473            )
1474            .await?;
1475
1476            if let Some(text_cols_option) = options
1477                .iter_mut()
1478                .find(|option| option.name == AlterSourceAddSubsourceOptionName::TextColumns)
1479            {
1480                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
1481            }
1482
1483            requested_subsource_map.extend(subsources);
1484        }
1485        GenericSourceConnection::MySql(mysql_source_connection) => {
1486            let mysql_connection = &mysql_source_connection.connection;
1487            let config = mysql_connection
1488                .config(
1489                    &storage_configuration.connection_context.secrets_reader,
1490                    storage_configuration,
1491                    InTask::No,
1492                )
1493                .await?;
1494
1495            let mut conn = config
1496                .connect(
1497                    "mysql purification",
1498                    &storage_configuration.connection_context.ssh_tunnel_manager,
1499                )
1500                .await?;
1501
1502            // Retrieve the current @gtid_executed value of the server to mark as the effective
1503            // initial snapshot point for these subsources.
1504            let initial_gtid_set =
1505                mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?;
1506
1507            let requested_references = Some(ExternalReferences::SubsetTables(external_references));
1508
1509            let reference_client = SourceReferenceClient::MySql {
1510                conn: &mut conn,
1511                include_system_schemas: mysql::references_system_schemas(&requested_references),
1512            };
1513            let retrieved_source_references = reference_client.get_source_references().await?;
1514
1515            let mysql::PurifiedSourceExports {
1516                source_exports: subsources,
1517                normalized_text_columns,
1518                normalized_exclude_columns,
1519            } = mysql::purify_source_exports(
1520                &mut conn,
1521                &retrieved_source_references,
1522                &requested_references,
1523                text_columns,
1524                exclude_columns,
1525                &unresolved_source_name,
1526                initial_gtid_set,
1527                &SourceReferencePolicy::Required,
1528            )
1529            .await?;
1530            requested_subsource_map.extend(subsources);
1531
1532            // Update options on the ALTER SOURCE statement with the purified details.
1533            if let Some(text_cols_option) = options
1534                .iter_mut()
1535                .find(|option| option.name == AlterSourceAddSubsourceOptionName::TextColumns)
1536            {
1537                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
1538            }
1539            if let Some(exclude_cols_option) = options
1540                .iter_mut()
1541                .find(|option| option.name == AlterSourceAddSubsourceOptionName::ExcludeColumns)
1542            {
1543                exclude_cols_option.value =
1544                    Some(WithOptionValue::Sequence(normalized_exclude_columns));
1545            }
1546        }
1547        GenericSourceConnection::SqlServer(sql_server_source) => {
1548            // Open a connection to the upstream SQL Server instance.
1549            let sql_server_connection = &sql_server_source.connection;
1550            let config = sql_server_connection
1551                .resolve_config(
1552                    &storage_configuration.connection_context.secrets_reader,
1553                    storage_configuration,
1554                    InTask::No,
1555                )
1556                .await?;
1557            let mut client = mz_sql_server_util::Client::connect(config).await?;
1558
1559            // Query the upstream SQL Server instance for available tables to replicate.
1560            let database = sql_server_connection.database.clone().into();
1561            let source_references = SourceReferenceClient::SqlServer {
1562                client: &mut client,
1563                database: Arc::clone(&database),
1564            }
1565            .get_source_references()
1566            .await?;
1567            let requested_references = Some(ExternalReferences::SubsetTables(external_references));
1568
1569            let timeout = mz_storage_types::sources::sql_server::MAX_LSN_WAIT
1570                .get(storage_configuration.config_set());
1571
1572            let sql_server::PurifiedSourceExports {
1573                source_exports,
1574                normalized_text_columns,
1575                normalized_excl_columns,
1576            } = sql_server::purify_source_exports(
1577                &*database,
1578                &mut client,
1579                &source_references,
1580                &requested_references,
1581                &text_columns,
1582                &exclude_columns,
1583                &unresolved_source_name,
1584                timeout,
1585                &SourceReferencePolicy::Required,
1586            )
1587            .await?;
1589
1590            // Add the new exports to our subsource map.
1591            requested_subsource_map.extend(source_exports);
1592
1593            // Update options on the ALTER SOURCE statement with the purified details.
1594            if let Some(text_cols_option) = options
1595                .iter_mut()
1596                .find(|option| option.name == AlterSourceAddSubsourceOptionName::TextColumns)
1597            {
1598                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
1599            }
1600            if let Some(exclude_cols_option) = options
1601                .iter_mut()
1602                .find(|option| option.name == AlterSourceAddSubsourceOptionName::ExcludeColumns)
1603            {
1604                exclude_cols_option.value =
1605                    Some(WithOptionValue::Sequence(normalized_excl_columns));
1606            }
1607        }
1608        _ => unreachable!(),
1609    };
1610
1611    Ok(PurifiedStatement::PurifiedAlterSourceAddSubsources {
1612        source_name: resolved_source_name,
1613        options,
1614        subsources: requested_subsource_map,
1615    })
1616}
1617
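/// Purifies `ALTER SOURCE .. REFRESH REFERENCES` by retrieving the currently
/// available source references from the upstream system (or from the local
/// definition, for load generator and Kafka sources).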
1618async fn purify_alter_source_refresh_references(
1619    desc: SourceDesc,
1620    resolved_source_name: ResolvedItemName,
1621    storage_configuration: &StorageConfiguration,
1622) -> Result<PurifiedStatement, PlanError> {
1623    let retrieved_source_references = match desc.connection {
1624        GenericSourceConnection::Postgres(pg_source_connection) => {
1625            // Get PostgresConnection for generating subsources.
1626            let pg_connection = &pg_source_connection.connection;
1627
1628            let config = pg_connection
1629                .config(
1630                    &storage_configuration.connection_context.secrets_reader,
1631                    storage_configuration,
1632                    InTask::No,
1633                )
1634                .await?;
1635
1636            let client = config
1637                .connect(
1638                    "postgres_purification",
1639                    &storage_configuration.connection_context.ssh_tunnel_manager,
1640                )
1641                .await?;
1642            let reference_client = SourceReferenceClient::Postgres {
1643                client: &client,
1644                publication: &pg_source_connection.publication,
1645                database: &pg_connection.database,
1646            };
1647            reference_client.get_source_references().await?
1648        }
1649        GenericSourceConnection::MySql(mysql_source_connection) => {
1650            let mysql_connection = &mysql_source_connection.connection;
1651            let config = mysql_connection
1652                .config(
1653                    &storage_configuration.connection_context.secrets_reader,
1654                    storage_configuration,
1655                    InTask::No,
1656                )
1657                .await?;
1658
1659            let mut conn = config
1660                .connect(
1661                    "mysql purification",
1662                    &storage_configuration.connection_context.ssh_tunnel_manager,
1663                )
1664                .await?;
1665
1666            let reference_client = SourceReferenceClient::MySql {
1667                conn: &mut conn,
1668                include_system_schemas: false,
1669            };
1670            reference_client.get_source_references().await?
1671        }
1672        GenericSourceConnection::SqlServer(sql_server_source) => {
1673            // Open a connection to the upstream SQL Server instance.
1674            let sql_server_connection = &sql_server_source.connection;
1675            let config = sql_server_connection
1676                .resolve_config(
1677                    &storage_configuration.connection_context.secrets_reader,
1678                    storage_configuration,
1679                    InTask::No,
1680                )
1681                .await?;
1682            let mut client = mz_sql_server_util::Client::connect(config).await?;
1683
1684            // Query the upstream SQL Server instance for available tables to replicate.
1685            let source_references = SourceReferenceClient::SqlServer {
1686                client: &mut client,
1687                database: sql_server_connection.database.clone().into(),
1688            }
1689            .get_source_references()
1690            .await?;
1691            source_references
1692        }
1693        GenericSourceConnection::LoadGenerator(load_gen_connection) => {
1694            let reference_client = SourceReferenceClient::LoadGenerator {
1695                generator: &load_gen_connection.load_generator,
1696            };
1697            reference_client.get_source_references().await?
1698        }
1699        GenericSourceConnection::Kafka(kafka_conn) => {
1700            let reference_client = SourceReferenceClient::Kafka {
1701                topic: &kafka_conn.topic,
1702            };
1703            reference_client.get_source_references().await?
1704        }
1705    };
1706    Ok(PurifiedStatement::PurifiedAlterSourceRefreshReferences {
1707        source_name: resolved_source_name,
1708        available_source_references: retrieved_source_references.available_source_references(),
1709    })
1710}
1711
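/// Purifies a `CREATE TABLE .. FROM SOURCE` statement: resolves the external
/// reference against the upstream system, fills in the column and constraint
/// definitions, and stashes source-specific details in the statement's WITH
/// options.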
1712async fn purify_create_table_from_source(
1713    catalog: impl SessionCatalog,
1714    mut stmt: CreateTableFromSourceStatement<Aug>,
1715    storage_configuration: &StorageConfiguration,
1716) -> Result<PurifiedStatement, PlanError> {
1717    let scx = StatementContext::new(None, &catalog);
1718    let CreateTableFromSourceStatement {
1719        name: _,
1720        columns,
1721        constraints,
1722        source: source_name,
1723        if_not_exists: _,
1724        external_reference,
1725        format,
1726        envelope,
1727        include_metadata: _,
1728        with_options,
1729    } = &mut stmt;
1730
1731    // Columns and constraints cannot be specified by the user but will be populated below.
1732    if matches!(columns, TableFromSourceColumns::Defined(_)) {
1733        sql_bail!("CREATE TABLE .. FROM SOURCE column definitions cannot be specified directly");
1734    }
1735    if !constraints.is_empty() {
1736        sql_bail!(
1737            "CREATE TABLE .. FROM SOURCE constraint definitions cannot be specified directly"
1738        );
1739    }
1740
1741    // Get the source item.
1742    let item = scx.get_item_by_resolved_name(source_name)?;
1746
1747    // Ensure it's an ingestion-based source that a table can be created from.
1748    let desc = match item.source_desc()? {
1749        Some(desc) => desc.clone().into_inline_connection(scx.catalog),
1750        None => {
1751            sql_bail!("cannot create a table from this type of source")
1752        }
1753    };
1754    let unresolved_source_name: UnresolvedItemName = source_name.full_item_name().clone().into();
1755
1756    let crate::plan::statement::ddl::TableFromSourceOptionExtracted {
1757        text_columns,
1758        exclude_columns,
1759        retain_history: _,
1760        details,
1761        partition_by: _,
1762        seen: _,
1763    } = with_options.clone().try_into()?;
1764    if details.is_some() {
1765        sql_bail!("DETAILS option cannot be explicitly set");
1766    }
1767
1768    // Our text column values are unqualified (just column names), but the purification methods below
1769    // expect to match fully-qualified names against the full set of tables in the upstream
1770    // system, so we need to qualify them using the external reference first.
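    // E.g. (illustrative names): with external reference `db.schema.tbl`, a
    // text column `col` is qualified to `db.schema.tbl.col` before matching.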
1771    let qualified_text_columns = text_columns
1772        .iter()
1773        .map(|col| {
1774            UnresolvedItemName(
1775                external_reference
1776                    .as_ref()
1777                    .map(|er| er.0.iter().chain_one(col).cloned().collect())
1778                    .unwrap_or_else(|| vec![col.clone()]),
1779            )
1780        })
1781        .collect_vec();
1782    let qualified_exclude_columns = exclude_columns
1783        .iter()
1784        .map(|col| {
1785            UnresolvedItemName(
1786                external_reference
1787                    .as_ref()
1788                    .map(|er| er.0.iter().chain_one(col).cloned().collect())
1789                    .unwrap_or_else(|| vec![col.clone()]),
1790            )
1791        })
1792        .collect_vec();
1793
1794    // Should be overridden below if a source-specific format is required.
1795    let mut format_options = SourceFormatOptions::Default;
1796
1797    let retrieved_source_references: RetrievedSourceReferences;
1798
1799    let requested_references = external_reference.as_ref().map(|ref_name| {
1800        ExternalReferences::SubsetTables(vec![ExternalReferenceExport {
1801            reference: ref_name.clone(),
1802            alias: None,
1803        }])
1804    });
1805
1806    // Run purification work specific to each source type: resolve the external reference to
1807    // a fully qualified name and obtain the appropriate details for the source-export statement
1808    let purified_export = match desc.connection {
1809        GenericSourceConnection::Postgres(pg_source_connection) => {
1810            // Get PostgresConnection for generating subsources.
1811            let pg_connection = &pg_source_connection.connection;
1812
1813            let client = pg_connection
1814                .validate(pg_source_connection.connection_id, storage_configuration)
1815                .await?;
1816
1817            let reference_client = SourceReferenceClient::Postgres {
1818                client: &client,
1819                publication: &pg_source_connection.publication,
1820                database: &pg_connection.database,
1821            };
1822            retrieved_source_references = reference_client.get_source_references().await?;
1823
1824            let postgres::PurifiedSourceExports {
1825                source_exports,
1826                // TODO(database-issues#8620): Remove once subsources are removed
1827                // This `normalized_text_columns` is not relevant for us and is only returned for
1828                // `CREATE SOURCE` statements that automatically generate subsources
1829                normalized_text_columns: _,
1830            } = postgres::purify_source_exports(
1831                &client,
1832                &retrieved_source_references,
1833                &requested_references,
1834                qualified_text_columns,
1835                qualified_exclude_columns,
1836                &unresolved_source_name,
1837                &SourceReferencePolicy::Required,
1838            )
1839            .await?;
1840            // There should be exactly one source_export returned for this statement
1841            let (_, purified_export) = source_exports.into_element();
1842            purified_export
1843        }
1844        GenericSourceConnection::MySql(mysql_source_connection) => {
1845            let mysql_connection = &mysql_source_connection.connection;
1846            let config = mysql_connection
1847                .config(
1848                    &storage_configuration.connection_context.secrets_reader,
1849                    storage_configuration,
1850                    InTask::No,
1851                )
1852                .await?;
1853
1854            let mut conn = config
1855                .connect(
1856                    "mysql purification",
1857                    &storage_configuration.connection_context.ssh_tunnel_manager,
1858                )
1859                .await?;
1860
1861            // Retrieve the current @gtid_executed value of the server to mark as the effective
1862            // initial snapshot point for this table.
1863            let initial_gtid_set =
1864                mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?;
1865
1866            let reference_client = SourceReferenceClient::MySql {
1867                conn: &mut conn,
1868                include_system_schemas: mysql::references_system_schemas(&requested_references),
1869            };
1870            retrieved_source_references = reference_client.get_source_references().await?;
1871
1872            let mysql::PurifiedSourceExports {
1873                source_exports,
1874                // TODO(database-issues#8620): Remove once subsources are removed
1875                // `normalized_text/exclude_columns` is not relevant for us and is only returned for
1876                // `CREATE SOURCE` statements that automatically generate subsources
1877                normalized_text_columns: _,
1878                normalized_exclude_columns: _,
1879            } = mysql::purify_source_exports(
1880                &mut conn,
1881                &retrieved_source_references,
1882                &requested_references,
1883                qualified_text_columns,
1884                qualified_exclude_columns,
1885                &unresolved_source_name,
1886                initial_gtid_set,
1887                &SourceReferencePolicy::Required,
1888            )
1889            .await?;
1890            // There should be exactly one source_export returned for this statement
1891            let (_, purified_export) = source_exports.into_element();
1892            purified_export
1893        }
1894        GenericSourceConnection::SqlServer(sql_server_source) => {
1895            let connection = sql_server_source.connection;
1896            let config = connection
1897                .resolve_config(
1898                    &storage_configuration.connection_context.secrets_reader,
1899                    storage_configuration,
1900                    InTask::No,
1901                )
1902                .await?;
1903            let mut client = mz_sql_server_util::Client::connect(config).await?;
1904
1905            let database: Arc<str> = connection.database.into();
1906            let reference_client = SourceReferenceClient::SqlServer {
1907                client: &mut client,
1908                database: Arc::clone(&database),
1909            };
1910            retrieved_source_references = reference_client.get_source_references().await?;
1911            tracing::debug!(?retrieved_source_references, "got source references");
1912
1913            let timeout = mz_storage_types::sources::sql_server::MAX_LSN_WAIT
1914                .get(storage_configuration.config_set());
1915
1916            let purified_source_exports = sql_server::purify_source_exports(
1917                &*database,
1918                &mut client,
1919                &retrieved_source_references,
1920                &requested_references,
1921                &qualified_text_columns,
1922                &qualified_exclude_columns,
1923                &unresolved_source_name,
1924                timeout,
1925                &SourceReferencePolicy::Required,
1926            )
1927            .await?;
1928
1929            // There should be exactly one source_export returned for this statement
1930            let (_, purified_export) = purified_source_exports.source_exports.into_element();
1931            purified_export
1932        }
1933        GenericSourceConnection::LoadGenerator(load_gen_connection) => {
1934            let reference_client = SourceReferenceClient::LoadGenerator {
1935                generator: &load_gen_connection.load_generator,
1936            };
1937            retrieved_source_references = reference_client.get_source_references().await?;
1938
1939            let requested_exports = retrieved_source_references
1940                .requested_source_exports(requested_references.as_ref(), &unresolved_source_name)?;
1941            // There should be exactly one source_export returned
1942            let export = requested_exports.into_element();
1943            PurifiedSourceExport {
1944                external_reference: export.external_reference,
1945                details: PurifiedExportDetails::LoadGenerator {
1946                    table: export
1947                        .meta
1948                        .load_generator_desc()
1949                        .expect("is loadgen")
1950                        .clone(),
1951                    output: export
1952                        .meta
1953                        .load_generator_output()
1954                        .expect("is loadgen")
1955                        .clone(),
1956                },
1957            }
1958        }
1959        GenericSourceConnection::Kafka(kafka_conn) => {
1960            let reference_client = SourceReferenceClient::Kafka {
1961                topic: &kafka_conn.topic,
1962            };
1963            retrieved_source_references = reference_client.get_source_references().await?;
1964            let requested_exports = retrieved_source_references
1965                .requested_source_exports(requested_references.as_ref(), &unresolved_source_name)?;
1966            // There should be exactly one source_export returned
1967            let export = requested_exports.into_element();
1968
1969            format_options = SourceFormatOptions::Kafka {
1970                topic: kafka_conn.topic.clone(),
1971            };
1972            PurifiedSourceExport {
1973                external_reference: export.external_reference,
1974                details: PurifiedExportDetails::Kafka {},
1975            }
1976        }
1977    };
1978
1979    purify_source_format(
1980        &catalog,
1981        format,
1982        &format_options,
1983        envelope,
1984        storage_configuration,
1985    )
1986    .await?;
1987
1988    // Update the external reference in the statement to the resolved fully-qualified
1989    // external reference
1990    *external_reference = Some(purified_export.external_reference.clone());
1991
1992    // Update options in the statement using the purified export details
1993    match &purified_export.details {
1994        PurifiedExportDetails::Postgres { .. } => {
1995            let mut unsupported_cols = vec![];
1996            let postgres::PostgresExportStatementValues {
1997                columns: gen_columns,
1998                constraints: gen_constraints,
1999                text_columns: gen_text_columns,
2000                exclude_columns: gen_exclude_columns,
2001                details: gen_details,
2002                external_reference: _,
2003            } = postgres::generate_source_export_statement_values(
2004                &scx,
2005                purified_export,
2006                &mut unsupported_cols,
2007            )?;
2008            if !unsupported_cols.is_empty() {
2009                unsupported_cols.sort();
2010                Err(PgSourcePurificationError::UnrecognizedTypes {
2011                    cols: unsupported_cols,
2012                })?;
2013            }
2014
2015            if let Some(text_cols_option) = with_options
2016                .iter_mut()
2017                .find(|option| option.name == TableFromSourceOptionName::TextColumns)
2018            {
2019                match gen_text_columns {
2020                    Some(gen_text_columns) => {
2021                        text_cols_option.value = Some(WithOptionValue::Sequence(gen_text_columns))
2022                    }
2023                    None => soft_panic_or_log!(
2024                        "text_columns should be Some if text_cols_option is present"
2025                    ),
2026                }
2027            }
2028            if let Some(exclude_cols_option) = with_options
2029                .iter_mut()
2030                .find(|option| option.name == TableFromSourceOptionName::ExcludeColumns)
2031            {
2032                match gen_exclude_columns {
2033                    Some(gen_exclude_columns) => {
2034                        exclude_cols_option.value =
2035                            Some(WithOptionValue::Sequence(gen_exclude_columns))
2036                    }
2037                    None => soft_panic_or_log!(
2038                        "exclude_columns should be Some if exclude_cols_option is present"
2039                    ),
2040                }
2041            }
2042            match columns {
2043                TableFromSourceColumns::Defined(_) => unreachable!(),
2044                TableFromSourceColumns::NotSpecified => {
2045                    *columns = TableFromSourceColumns::Defined(gen_columns);
2046                    *constraints = gen_constraints;
2047                }
2048                TableFromSourceColumns::Named(_) => {
2049                    sql_bail!("columns cannot be named for Postgres sources")
2050                }
2051            }
2052            with_options.push(TableFromSourceOption {
2053                name: TableFromSourceOptionName::Details,
2054                value: Some(WithOptionValue::Value(Value::String(hex::encode(
2055                    gen_details.into_proto().encode_to_vec(),
2056                )))),
2057            })
2058        }
2059        PurifiedExportDetails::MySql { .. } => {
2060            let mysql::MySqlExportStatementValues {
2061                columns: gen_columns,
2062                constraints: gen_constraints,
2063                text_columns: gen_text_columns,
2064                exclude_columns: gen_exclude_columns,
2065                details: gen_details,
2066                external_reference: _,
2067            } = mysql::generate_source_export_statement_values(&scx, purified_export)?;
2068
2069            if let Some(text_cols_option) = with_options
2070                .iter_mut()
2071                .find(|option| option.name == TableFromSourceOptionName::TextColumns)
2072            {
2073                match gen_text_columns {
2074                    Some(gen_text_columns) => {
2075                        text_cols_option.value = Some(WithOptionValue::Sequence(gen_text_columns))
2076                    }
2077                    None => soft_panic_or_log!(
2078                        "text_columns should be Some if text_cols_option is present"
2079                    ),
2080                }
2081            }
2082            if let Some(exclude_cols_option) = with_options
2083                .iter_mut()
2084                .find(|option| option.name == TableFromSourceOptionName::ExcludeColumns)
2085            {
2086                match gen_exclude_columns {
2087                    Some(gen_exclude_columns) => {
2088                        exclude_cols_option.value =
2089                            Some(WithOptionValue::Sequence(gen_exclude_columns))
2090                    }
2091                    None => soft_panic_or_log!(
2092                        "exclude_columns should be Some if exclude_cols_option is present"
2093                    ),
2094                }
2095            }
2096            match columns {
2097                TableFromSourceColumns::Defined(_) => unreachable!(),
2098                TableFromSourceColumns::NotSpecified => {
2099                    *columns = TableFromSourceColumns::Defined(gen_columns);
2100                    *constraints = gen_constraints;
2101                }
2102                TableFromSourceColumns::Named(_) => {
2103                    sql_bail!("columns cannot be named for MySQL sources")
2104                }
2105            }
2106            with_options.push(TableFromSourceOption {
2107                name: TableFromSourceOptionName::Details,
2108                value: Some(WithOptionValue::Value(Value::String(hex::encode(
2109                    gen_details.into_proto().encode_to_vec(),
2110                )))),
2111            })
2112        }
2113        PurifiedExportDetails::SqlServer { .. } => {
2114            let sql_server::SqlServerExportStatementValues {
2115                columns: gen_columns,
2116                constraints: gen_constraints,
2117                text_columns: gen_text_columns,
2118                excl_columns: gen_excl_columns,
2119                details: gen_details,
2120                external_reference: _,
2121            } = sql_server::generate_source_export_statement_values(&scx, purified_export)?;
2122
2123            if let Some(text_cols_option) = with_options
2124                .iter_mut()
2125                .find(|opt| opt.name == TableFromSourceOptionName::TextColumns)
2126            {
2127                match gen_text_columns {
2128                    Some(gen_text_columns) => {
2129                        text_cols_option.value = Some(WithOptionValue::Sequence(gen_text_columns))
2130                    }
2131                    None => soft_panic_or_log!(
2132                        "text_columns should be Some if text_cols_option is present"
2133                    ),
2134                }
2135            }
2136            if let Some(exclude_cols_option) = with_options
2137                .iter_mut()
2138                .find(|opt| opt.name == TableFromSourceOptionName::ExcludeColumns)
2139            {
2140                match gen_excl_columns {
2141                    Some(gen_excl_columns) => {
2142                        exclude_cols_option.value =
2143                            Some(WithOptionValue::Sequence(gen_excl_columns))
2144                    }
2145                    None => soft_panic_or_log!(
2146                        "excl_columns should be Some if excl_cols_option is present"
2147                    ),
2148                }
2149            }
2150
2151            match columns {
2152                TableFromSourceColumns::NotSpecified => {
2153                    *columns = TableFromSourceColumns::Defined(gen_columns);
2154                    *constraints = gen_constraints;
2155                }
2156                TableFromSourceColumns::Named(_) => {
2157                    sql_bail!("columns cannot be named for SQL Server sources")
2158                }
2159                TableFromSourceColumns::Defined(_) => unreachable!(),
2160            }
2161
2162            with_options.push(TableFromSourceOption {
2163                name: TableFromSourceOptionName::Details,
2164                value: Some(WithOptionValue::Value(Value::String(hex::encode(
2165                    gen_details.into_proto().encode_to_vec(),
2166                )))),
2167            })
2168        }
2169        PurifiedExportDetails::LoadGenerator { .. } => {
2170            let (desc, output) = match purified_export.details {
2171                PurifiedExportDetails::LoadGenerator { table, output } => (table, output),
2172                _ => unreachable!("purified export details must be load generator"),
2173            };
2174            // We only determine the table description for multi-output load generator sources here,
2175            // whereas single-output load generators will have their relation description
2176            // determined during statement planning, as envelope and format options may affect their
2177            // schema.
2178            if let Some(desc) = desc {
2179                let (gen_columns, gen_constraints) = scx.relation_desc_into_table_defs(&desc)?;
2180                match columns {
2181                    TableFromSourceColumns::Defined(_) => unreachable!(),
2182                    TableFromSourceColumns::NotSpecified => {
2183                        *columns = TableFromSourceColumns::Defined(gen_columns);
2184                        *constraints = gen_constraints;
2185                    }
2186                    TableFromSourceColumns::Named(_) => {
2187                        sql_bail!("columns cannot be named for multi-output load generator sources")
2188                    }
2189                }
2190            }
2191            let details = SourceExportStatementDetails::LoadGenerator { output };
2192            with_options.push(TableFromSourceOption {
2193                name: TableFromSourceOptionName::Details,
2194                value: Some(WithOptionValue::Value(Value::String(hex::encode(
2195                    details.into_proto().encode_to_vec(),
2196                )))),
2197            })
2198        }
2199        PurifiedExportDetails::Kafka {} => {
2200            // NOTE: Kafka tables have their 'schemas' purified into the statement inside the
2201            // format field, so we don't specify any columns or constraints to be stored
2202            // on the statement here. The RelationDesc will be determined during planning.
2203            let details = SourceExportStatementDetails::Kafka {};
2204            with_options.push(TableFromSourceOption {
2205                name: TableFromSourceOptionName::Details,
2206                value: Some(WithOptionValue::Value(Value::String(hex::encode(
2207                    details.into_proto().encode_to_vec(),
2208                )))),
2209            })
2210        }
2211    };
2212
2213    // TODO: We might as well use the retrieved available references to update the source
2214    // available references table in the catalog, so plumb this through.
2215    // available_source_references: retrieved_source_references.available_source_references(),
2216    Ok(PurifiedStatement::PurifiedCreateTableFromSource { stmt })
2217}
2218
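/// Source-specific context needed when purifying format specifiers.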
2219enum SourceFormatOptions {
2220    Default,
2221    Kafka { topic: String },
2222}
2223
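/// Purifies the `FORMAT ..` specifier of a source or table statement,
/// resolving Confluent Schema Registry references into inline schema seeds.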
2224async fn purify_source_format(
2225    catalog: &dyn SessionCatalog,
2226    format: &mut Option<FormatSpecifier<Aug>>,
2227    options: &SourceFormatOptions,
2228    envelope: &Option<SourceEnvelope>,
2229    storage_configuration: &StorageConfiguration,
2230) -> Result<(), PlanError> {
2231    if matches!(format, Some(FormatSpecifier::KeyValue { .. }))
2232        && !matches!(options, SourceFormatOptions::Kafka { .. })
2233    {
2234        sql_bail!("Kafka sources are the only source type that can provide KEY/VALUE formats")
2235    }
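    // E.g. (illustrative): `KEY FORMAT TEXT VALUE FORMAT TEXT` on a Postgres
    // source is rejected above, while the same clause on a Kafka source
    // proceeds to per-format purification below.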
2236
2237    match format.as_mut() {
2238        None => {}
2239        Some(FormatSpecifier::Bare(format)) => {
2240            purify_source_format_single(catalog, format, options, envelope, storage_configuration)
2241                .await?;
2242        }
2243
2244        Some(FormatSpecifier::KeyValue { key, value: val }) => {
2245            purify_source_format_single(catalog, key, options, envelope, storage_configuration)
2246                .await?;
2247            purify_source_format_single(catalog, val, options, envelope, storage_configuration)
2248                .await?;
2249        }
2250    }
2251    Ok(())
2252}
2253
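/// Purifies a single (key or value) format, dispatching on the format type;
/// only CSR-backed Avro and Protobuf formats require network access.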
2254async fn purify_source_format_single(
2255    catalog: &dyn SessionCatalog,
2256    format: &mut Format<Aug>,
2257    options: &SourceFormatOptions,
2258    envelope: &Option<SourceEnvelope>,
2259    storage_configuration: &StorageConfiguration,
2260) -> Result<(), PlanError> {
2261    match format {
2262        Format::Avro(schema) => match schema {
2263            AvroSchema::Csr { csr_connection } => {
2264                purify_csr_connection_avro(
2265                    catalog,
2266                    options,
2267                    csr_connection,
2268                    envelope,
2269                    storage_configuration,
2270                )
2271                .await?
2272            }
2273            AvroSchema::InlineSchema { .. } => {}
2274        },
2275        Format::Protobuf(schema) => match schema {
2276            ProtobufSchema::Csr { csr_connection } => {
2277                purify_csr_connection_proto(
2278                    catalog,
2279                    options,
2280                    csr_connection,
2281                    envelope,
2282                    storage_configuration,
2283                )
2284                .await?;
2285            }
2286            ProtobufSchema::InlineSchema { .. } => {}
2287        },
2288        Format::Bytes
2289        | Format::Regex(_)
2290        | Format::Json { .. }
2291        | Format::Text
2292        | Format::Csv { .. } => (),
2293    }
2294    Ok(())
2295}
2296
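/// Generates `CREATE SUBSOURCE` statements for the given purified source
/// exports, dispatching on the source type recorded in each export's details.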
2297pub fn generate_subsource_statements(
2298    scx: &StatementContext,
2299    source_name: ResolvedItemName,
2300    subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
2301) -> Result<Vec<CreateSubsourceStatement<Aug>>, PlanError> {
2302    if subsources.is_empty() {
2303        return Ok(vec![]);
2304    }
2305    // Get the first subsource to determine the connection type.
2306    let (_, purified_export) = subsources.iter().next().unwrap();
2307
2308    let statements = match &purified_export.details {
2309        PurifiedExportDetails::Postgres { .. } => {
2310            crate::pure::postgres::generate_create_subsource_statements(
2311                scx,
2312                source_name,
2313                subsources,
2314            )?
2315        }
2316        PurifiedExportDetails::MySql { .. } => {
2317            crate::pure::mysql::generate_create_subsource_statements(scx, source_name, subsources)?
2318        }
2319        PurifiedExportDetails::SqlServer { .. } => {
2320            crate::pure::sql_server::generate_create_subsource_statements(
2321                scx,
2322                source_name,
2323                subsources,
2324            )?
2325        }
2326        PurifiedExportDetails::LoadGenerator { .. } => {
2327            let mut subsource_stmts = Vec::with_capacity(subsources.len());
2328            for (subsource_name, purified_export) in subsources {
2329                let (desc, output) = match purified_export.details {
2330                    PurifiedExportDetails::LoadGenerator { table, output } => (table, output),
2331                    _ => unreachable!("purified export details must be load generator"),
2332                };
2333                let desc =
2334                    desc.expect("subsources cannot be generated for single-output load generators");
2335
2336                let (columns, table_constraints) = scx.relation_desc_into_table_defs(&desc)?;
2337                let details = SourceExportStatementDetails::LoadGenerator { output };
2338                // Create the subsource statement
2339                let subsource = CreateSubsourceStatement {
2340                    name: subsource_name,
2341                    columns,
2342                    of_source: Some(source_name.clone()),
2343                    // Unlike sources that come from an external upstream, we
2344                    // have more leniency to introduce different constraints
2345                    // every time the load generator is run; i.e., we are not as
2346                    // worried about introducing junk data.
2347                    constraints: table_constraints,
2348                    if_not_exists: false,
2349                    with_options: vec![
2350                        CreateSubsourceOption {
2351                            name: CreateSubsourceOptionName::ExternalReference,
2352                            value: Some(WithOptionValue::UnresolvedItemName(
2353                                purified_export.external_reference,
2354                            )),
2355                        },
2356                        CreateSubsourceOption {
2357                            name: CreateSubsourceOptionName::Details,
2358                            value: Some(WithOptionValue::Value(Value::String(hex::encode(
2359                                details.into_proto().encode_to_vec(),
2360                            )))),
2361                        },
2362                    ],
2363                };
2364                subsource_stmts.push(subsource);
2365            }
2366
2367            subsource_stmts
2368        }
2369        PurifiedExportDetails::Kafka { .. } => {
2370            // TODO: as part of database-issues#8322, Kafka sources will begin
2371            // producing data; we'll need to understand the schema
2372            // of the output here.
2373            assert!(
2374                subsources.is_empty(),
2375                "Kafka sources do not produce data-bearing subsources"
2376            );
2377            vec![]
2378        }
2379    };
2380    Ok(statements)
2381}
2382
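/// Purifies a Protobuf CSR connection by compiling the topic's `-value` (and,
/// if present, `-key`) subjects into schema seeds.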
2383async fn purify_csr_connection_proto(
2384    catalog: &dyn SessionCatalog,
2385    options: &SourceFormatOptions,
2386    csr_connection: &mut CsrConnectionProtobuf<Aug>,
2387    envelope: &Option<SourceEnvelope>,
2388    storage_configuration: &StorageConfiguration,
2389) -> Result<(), PlanError> {
2390    let SourceFormatOptions::Kafka { topic } = options else {
2391        sql_bail!("Confluent Schema Registry is only supported with Kafka sources")
2392    };
2393
2394    let CsrConnectionProtobuf {
2395        seed,
2396        connection: CsrConnection {
2397            connection,
2398            options: _,
2399        },
2400    } = csr_connection;
2401    match seed {
2402        None => {
2403            let scx = StatementContext::new(None, &*catalog);
2404
2405            let ccsr_connection = match scx.get_item_by_resolved_name(connection)?.connection()? {
2406                Connection::Csr(connection) => connection.clone().into_inline_connection(catalog),
2407                _ => sql_bail!("{} is not a schema registry connection", connection),
2408            };
2409
2410            let ccsr_client = ccsr_connection
2411                .connect(storage_configuration, InTask::No)
2412                .await
2413                .map_err(|e| CsrPurificationError::ClientError(Arc::new(e)))?;
2414
2415            let value = compile_proto(&format!("{}-value", topic), &ccsr_client).await?;
2416            let key = compile_proto(&format!("{}-key", topic), &ccsr_client)
2417                .await
2418                .ok();
2419
2420            if matches!(envelope, Some(SourceEnvelope::Debezium)) && key.is_none() {
2421                sql_bail!("Key schema is required for ENVELOPE DEBEZIUM");
2422            }
2423
2424            *seed = Some(CsrSeedProtobuf { value, key });
2425        }
2426        Some(_) => (),
2427    }
2428
2429    Ok(())
2430}
2431
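/// Purifies an Avro CSR connection by fetching the topic's key and value
/// schemas, along with any schemas they reference.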
2432async fn purify_csr_connection_avro(
2433    catalog: &dyn SessionCatalog,
2434    options: &SourceFormatOptions,
2435    csr_connection: &mut CsrConnectionAvro<Aug>,
2436    envelope: &Option<SourceEnvelope>,
2437    storage_configuration: &StorageConfiguration,
2438) -> Result<(), PlanError> {
2439    let SourceFormatOptions::Kafka { topic } = options else {
2440        sql_bail!("Confluent Schema Registry is only supported with Kafka sources")
2441    };
2442
2443    let CsrConnectionAvro {
2444        connection: CsrConnection { connection, .. },
2445        seed,
2446        key_strategy,
2447        value_strategy,
2448    } = csr_connection;
2449    if seed.is_none() {
2450        let scx = StatementContext::new(None, &*catalog);
2451        let csr_connection = match scx.get_item_by_resolved_name(connection)?.connection()? {
2452            Connection::Csr(connection) => connection.clone().into_inline_connection(catalog),
2453            _ => sql_bail!("{} is not a schema registry connection", connection),
2454        };
2455        let ccsr_client = csr_connection
2456            .connect(storage_configuration, InTask::No)
2457            .await
2458            .map_err(|e| CsrPurificationError::ClientError(Arc::new(e)))?;
2459
2460        let Schema {
2461            key_schema,
2462            value_schema,
2463            key_reference_schemas,
2464            value_reference_schemas,
2465        } = get_remote_csr_schema(
2466            &ccsr_client,
2467            key_strategy.clone().unwrap_or_default(),
2468            value_strategy.clone().unwrap_or_default(),
2469            topic,
2470        )
2471        .await?;
2472        if matches!(envelope, Some(SourceEnvelope::Debezium)) && key_schema.is_none() {
2473            sql_bail!("Key schema is required for ENVELOPE DEBEZIUM");
2474        }
2475
2476        *seed = Some(CsrSeedAvro {
2477            key_schema,
2478            value_schema,
2479            key_reference_schemas,
2480            value_reference_schemas,
2481        })
2482    }
2483
2484    Ok(())
2485}
2486
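/// The key and value schemas fetched from a schema registry for a topic.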
2487#[derive(Debug)]
2488pub struct Schema {
    /// The key schema, if the key subject exists.
2489    pub key_schema: Option<String>,
    /// The value schema.
2490    pub value_schema: String,
2491    /// Reference schemas for the key schema, in dependency order.
2492    pub key_reference_schemas: Vec<String>,
2493    /// Reference schemas for the value schema, in dependency order.
2494    pub value_reference_schemas: Vec<String>,
2495}
2496
2497/// Result of fetching a schema, including any referenced schemas.
2498struct SchemaWithReferences {
2499    /// The primary schema.
2500    schema: String,
2501    /// Reference schemas in dependency order.
2502    references: Vec<String>,
2503}
2504
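/// Fetches a schema (and its reference schemas) according to the reader schema
/// selection strategy, returning `None` if the subject or version is absent.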
2505async fn get_schema_with_strategy(
2506    client: &Client,
2507    strategy: ReaderSchemaSelectionStrategy,
2508    subject: &str,
2509) -> Result<Option<SchemaWithReferences>, PlanError> {
2510    match strategy {
2511        ReaderSchemaSelectionStrategy::Latest => {
2512            // Use get_subject_and_references to also fetch referenced schemas
2513            match client.get_subject_and_references(subject).await {
2514                Ok((primary, dependencies)) => Ok(Some(SchemaWithReferences {
2515                    schema: primary.schema.raw,
2516                    references: dependencies.into_iter().map(|s| s.schema.raw).collect(),
2517                })),
2518                Err(GetBySubjectError::SubjectNotFound)
2519                | Err(GetBySubjectError::VersionNotFound(_)) => Ok(None),
2520                Err(e) => Err(PlanError::FetchingCsrSchemaFailed {
2521                    schema_lookup: format!("subject {}", subject.quoted()),
2522                    cause: Arc::new(e),
2523                }),
2524            }
2525        }
2526        // It's possible that CSR was provided with an inline strategy, but without a subject
2527        // or schema id to look up, there isn't a clean way to look up references for the schema.
2528        // This could be done with extra work, but at this point, it isn't clear this is needed.
2529        ReaderSchemaSelectionStrategy::Inline(raw) => Ok(Some(SchemaWithReferences {
2530            schema: raw,
2531            references: vec![],
2532        })),
2533        ReaderSchemaSelectionStrategy::ById(id) => {
2534            match client.get_subject_and_references_by_id(id).await {
2535                Ok((primary, dependencies)) => Ok(Some(SchemaWithReferences {
2536                    schema: primary.schema.raw,
2537                    references: dependencies.into_iter().map(|s| s.schema.raw).collect(),
2538                })),
2539                Err(GetBySubjectError::SubjectNotFound)
2540                | Err(GetBySubjectError::VersionNotFound(_)) => Ok(None),
2541                Err(e) => Err(PlanError::FetchingCsrSchemaFailed {
2542                    schema_lookup: format!("schema with ID {}", id),
2543                    cause: Arc::new(e),
2544                }),
2545            }
2546        }
2547    }
2548}
2549
2550async fn get_remote_csr_schema(
2551    ccsr_client: &mz_ccsr::Client,
2552    key_strategy: ReaderSchemaSelectionStrategy,
2553    value_strategy: ReaderSchemaSelectionStrategy,
2554    topic: &str,
2555) -> Result<Schema, PlanError> {
2556    let value_schema_name = format!("{}-value", topic);
2557    let value_result = get_schema_with_strategy(ccsr_client, value_strategy, &value_schema_name)
2558        .await?
2559        .ok_or_else(|| anyhow!("No value schema found"))?;
2560
2561    let key_subject = format!("{}-key", topic);
2562    let key_result = get_schema_with_strategy(ccsr_client, key_strategy, &key_subject).await?;
2563    Ok(Schema {
2564        key_schema: key_result.as_ref().map(|r| r.schema.clone()),
2565        value_schema: value_result.schema,
2566        key_reference_schemas: key_result.map(|r| r.references).unwrap_or_default(),
2567        value_reference_schemas: value_result.references,
2568    })
2569}
2570
2571/// Collects a subject's Protobuf schema (and its references) from the CSR and compiles it into a file descriptor set.
2572async fn compile_proto(
2573    subject_name: &str,
2574    ccsr_client: &Client,
2575) -> Result<CsrSeedProtobufSchema, PlanError> {
2576    let (primary_subject, dependency_subjects) = ccsr_client
2577        .get_subject_and_references(subject_name)
2578        .await
2579        .map_err(|e| PlanError::FetchingCsrSchemaFailed {
2580            schema_lookup: format!("subject {}", subject_name.quoted()),
2581            cause: Arc::new(e),
2582        })?;
2583
2584    // Compile .proto files into a file descriptor set.
2585    let mut source_tree = VirtualSourceTree::new();
2586
2587    // Add well-known types (e.g., google/protobuf/timestamp.proto) to the source
2588    // tree. These are implicitly available to protoc but are typically not
2589    // registered in the schema registry.
2590    source_tree.as_mut().map_well_known_types();
2591
2592    for subject in iter::once(&primary_subject).chain(dependency_subjects.iter()) {
2593        source_tree.as_mut().add_file(
2594            Path::new(&subject.name),
2595            subject.schema.raw.as_bytes().to_vec(),
2596        );
2597    }
2598    let mut db = SourceTreeDescriptorDatabase::new(source_tree.as_mut());
2599    let fds = db
2600        .as_mut()
2601        .build_file_descriptor_set(&[Path::new(&primary_subject.name)])
2602        .map_err(|cause| PlanError::InvalidProtobufSchema { cause })?;
2603
2604    // Ensure there is exactly one message in the file.
2605    let primary_fd = fds.file(0);
2606    let message_name = match primary_fd.message_type_size() {
2607        1 => String::from_utf8_lossy(primary_fd.message_type(0).name()).into_owned(),
2608        0 => bail_unsupported!(29603, "Protobuf schemas with no messages"),
2609        _ => bail_unsupported!(29603, "Protobuf schemas with multiple messages"),
2610    };
2611
2612    // Encode the file descriptor set into a SQL byte string.
2613    let bytes = &fds
2614        .serialize()
2615        .map_err(|cause| PlanError::InvalidProtobufSchema { cause })?;
2616    let mut schema = String::new();
2617    strconv::format_bytes(&mut schema, bytes);
2618
2619    Ok(CsrSeedProtobufSchema {
2620        schema,
2621        message_name,
2622    })
2623}
2624
2625const MZ_NOW_NAME: &str = "mz_now";
2626const MZ_NOW_SCHEMA: &str = "mz_catalog";
2627
2628/// Purifies a CREATE MATERIALIZED VIEW statement. Additionally, it adjusts `resolved_ids` if
2629/// references to ids appear or disappear during the purification.
2630///
2631/// Note that in contrast with [`purify_statement`], this doesn't need to be async, because
2632/// this function makes no network calls.
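///
/// In SQL terms, the rewrites are:
/// - `REFRESH AT CREATION` becomes `REFRESH AT <ts>`, where `<ts>` is the
///   chosen timestamp cast to `mz_timestamp`;
/// - `REFRESH EVERY ...` without an `ALIGNED TO` gains `ALIGNED TO <ts>`;
/// - a statement with no `REFRESH` option at all defaults to
///   `REFRESH ON COMMIT`.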
2633pub fn purify_create_materialized_view_options(
2634    catalog: impl SessionCatalog,
2635    mz_now: Option<Timestamp>,
2636    cmvs: &mut CreateMaterializedViewStatement<Aug>,
2637    resolved_ids: &mut ResolvedIds,
2638) {
2639    // 0. Preparations:
2640    // Prepare an expression that calls `mz_now()`, which we can insert in various later steps.
2641    let (mz_now_id, mz_now_expr) = {
2642        let item = catalog
2643            .resolve_function(&PartialItemName {
2644                database: None,
2645                schema: Some(MZ_NOW_SCHEMA.to_string()),
2646                item: MZ_NOW_NAME.to_string(),
2647            })
2648            .expect("we should be able to resolve mz_now");
2649        (
2650            item.id(),
2651            Expr::Function(Function {
2652                name: ResolvedItemName::Item {
2653                    id: item.id(),
2654                    qualifiers: item.name().qualifiers.clone(),
2655                    full_name: catalog.resolve_full_name(item.name()),
2656                    print_id: false,
2657                    version: RelationVersionSelector::Latest,
2658                },
2659                args: FunctionArgs::Args {
2660                    args: Vec::new(),
2661                    order_by: Vec::new(),
2662                },
2663                filter: None,
2664                over: None,
2665                distinct: false,
2666            }),
2667        )
2668    };
2669    // Prepare the `mz_timestamp` type.
2670    let (mz_timestamp_id, mz_timestamp_type) = {
2671        let item = catalog.get_system_type("mz_timestamp");
2672        let full_name = catalog.resolve_full_name(item.name());
2673        (
2674            item.id(),
2675            ResolvedDataType::Named {
2676                id: item.id(),
2677                qualifiers: item.name().qualifiers.clone(),
2678                full_name,
2679                modifiers: vec![],
2680                print_id: true,
2681            },
2682        )
2683    };
2684
2685    let mut introduced_mz_timestamp = false;
2686
2687    for option in cmvs.with_options.iter_mut() {
2688        // 1. Purify `REFRESH AT CREATION` to `REFRESH AT mz_now()`.
2689        if matches!(
2690            option.value,
2691            Some(WithOptionValue::Refresh(RefreshOptionValue::AtCreation))
2692        ) {
2693            option.value = Some(WithOptionValue::Refresh(RefreshOptionValue::At(
2694                RefreshAtOptionValue {
2695                    time: mz_now_expr.clone(),
2696                },
2697            )));
2698        }
2699
2700        // 2. If `REFRESH EVERY` doesn't have an `ALIGNED TO`, then add `ALIGNED TO mz_now()`.
2701        if let Some(WithOptionValue::Refresh(RefreshOptionValue::Every(
2702            RefreshEveryOptionValue { aligned_to, .. },
2703        ))) = &mut option.value
2704        {
2705            if aligned_to.is_none() {
2706                *aligned_to = Some(mz_now_expr.clone());
2707            }
2708        }
2709
2710        // 3. Substitute `mz_now()` with the timestamp chosen for the CREATE MATERIALIZED VIEW
2711        // statement. (This has to happen after the above steps, which might introduce `mz_now()`.)
2712        match &mut option.value {
2713            Some(WithOptionValue::Refresh(RefreshOptionValue::At(RefreshAtOptionValue {
2714                time,
2715            }))) => {
2716                let mut visitor = MzNowPurifierVisitor::new(mz_now, mz_timestamp_type.clone());
2717                visitor.visit_expr_mut(time);
2718                introduced_mz_timestamp |= visitor.introduced_mz_timestamp;
2719            }
2720            Some(WithOptionValue::Refresh(RefreshOptionValue::Every(
2721                RefreshEveryOptionValue {
2722                    interval: _,
2723                    aligned_to: Some(aligned_to),
2724                },
2725            ))) => {
2726                let mut visitor = MzNowPurifierVisitor::new(mz_now, mz_timestamp_type.clone());
2727                visitor.visit_expr_mut(aligned_to);
2728                introduced_mz_timestamp |= visitor.introduced_mz_timestamp;
2729            }
2730            _ => {}
2731        }
2732    }
2733
2734    // 4. If the user didn't give any REFRESH option, then default to ON COMMIT.
2735    if !cmvs.with_options.iter().any(|o| {
2736        matches!(
2737            o,
2738            MaterializedViewOption {
2739                value: Some(WithOptionValue::Refresh(..)),
2740                ..
2741            }
2742        )
2743    }) {
2744        cmvs.with_options.push(MaterializedViewOption {
2745            name: MaterializedViewOptionName::Refresh,
2746            value: Some(WithOptionValue::Refresh(RefreshOptionValue::OnCommit)),
2747        })
2748    }
2749
2750    // 5. Attend to `resolved_ids`: The purification might have
2751    // - added references to `mz_timestamp`;
2752    // - removed references to `mz_now`.
2753    if introduced_mz_timestamp {
2754        resolved_ids.add_item(mz_timestamp_id);
2755    }
2756    // Even though we always remove `mz_now()` from the `with_options`, there might be `mz_now()`
2757    // remaining in the main query expression of the MV, so let's visit the entire statement to look
2758    // for `mz_now()` everywhere.
2759    let mut visitor = ExprContainsTemporalVisitor::new();
2760    visitor.visit_create_materialized_view_statement(cmvs);
2761    if !visitor.contains_temporal {
2762        resolved_ids.remove_item(&mz_now_id);
2763    }
2764}
2765
2766/// Returns true if the [MaterializedViewOption] either already involves `mz_now()` or will
2767/// involve it after purification.
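///
/// For example, `REFRESH AT CREATION` and `REFRESH EVERY ...` without an
/// `ALIGNED TO` both count as temporal even though they don't mention
/// `mz_now()` yet, because purification will introduce it.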
2768pub fn materialized_view_option_contains_temporal(mvo: &MaterializedViewOption<Aug>) -> bool {
2769    match &mvo.value {
2770        Some(WithOptionValue::Refresh(RefreshOptionValue::At(RefreshAtOptionValue { time }))) => {
2771            let mut visitor = ExprContainsTemporalVisitor::new();
2772            visitor.visit_expr(time);
2773            visitor.contains_temporal
2774        }
2775        Some(WithOptionValue::Refresh(RefreshOptionValue::Every(RefreshEveryOptionValue {
2776            interval: _,
2777            aligned_to: Some(aligned_to),
2778        }))) => {
2779            let mut visitor = ExprContainsTemporalVisitor::new();
2780            visitor.visit_expr(aligned_to);
2781            visitor.contains_temporal
2782        }
2783        Some(WithOptionValue::Refresh(RefreshOptionValue::Every(RefreshEveryOptionValue {
2784            interval: _,
2785            aligned_to: None,
2786        }))) => {
2787            // For a `REFRESH EVERY` without an `ALIGNED TO`, purification will default the
2788            // `ALIGNED TO` to `mz_now()`.
2789            true
2790        }
2791        Some(WithOptionValue::Refresh(RefreshOptionValue::AtCreation)) => {
2792            // `REFRESH AT CREATION` will be purified to `REFRESH AT mz_now()`.
2793            true
2794        }
2795        _ => false,
2796    }
2797}
2798
2799/// Determines whether the AST involves `mz_now()`.
2800struct ExprContainsTemporalVisitor {
2801    pub contains_temporal: bool,
2802}
2803
2804impl ExprContainsTemporalVisitor {
2805    pub fn new() -> ExprContainsTemporalVisitor {
2806        ExprContainsTemporalVisitor {
2807            contains_temporal: false,
2808        }
2809    }
2810}
2811
2812impl Visit<'_, Aug> for ExprContainsTemporalVisitor {
2813    fn visit_function(&mut self, func: &Function<Aug>) {
2814        self.contains_temporal |= func.name.full_item_name().item == MZ_NOW_NAME;
2815        visit_function(self, func);
2816    }
2817}
2818
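/// Replaces every `mz_now()` call in an expression with the chosen timestamp,
/// cast to `mz_timestamp`, and records whether any replacement happened so
/// that the caller can add `mz_timestamp` to `resolved_ids`.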
2819struct MzNowPurifierVisitor {
2820    pub mz_now: Option<Timestamp>,
2821    pub mz_timestamp_type: ResolvedDataType,
2822    pub introduced_mz_timestamp: bool,
2823}
2824
2825impl MzNowPurifierVisitor {
2826    pub fn new(
2827        mz_now: Option<Timestamp>,
2828        mz_timestamp_type: ResolvedDataType,
2829    ) -> MzNowPurifierVisitor {
2830        MzNowPurifierVisitor {
2831            mz_now,
2832            mz_timestamp_type,
2833            introduced_mz_timestamp: false,
2834        }
2835    }
2836}
2837
2838impl VisitMut<'_, Aug> for MzNowPurifierVisitor {
2839    fn visit_expr_mut(&mut self, expr: &'_ mut Expr<Aug>) {
2840        match expr {
2841            Expr::Function(Function {
2842                name:
2843                    ResolvedItemName::Item {
2844                        full_name: FullItemName { item, .. },
2845                        ..
2846                    },
2847                ..
2848            }) if item.as_str() == MZ_NOW_NAME => {
2849                let mz_now = self.mz_now.expect(
2850                    "we should have chosen a timestamp if the expression contains mz_now()",
2851                );
2852                // Substitute `mz_now()` with the chosen timestamp as a number literal,
2853                // plus a cast to `mz_timestamp` so the expression's type is unchanged.
2854                *expr = Expr::Cast {
2855                    expr: Box::new(Expr::Value(Value::Number(mz_now.to_string()))),
2856                    data_type: self.mz_timestamp_type.clone(),
2857                };
2858                self.introduced_mz_timestamp = true;
2859            }
2860            _ => visit_expr_mut(self, expr),
2861        }
2862    }
2863}