mz_sql/pure.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! SQL purification.
//!
//! See the [crate-level documentation](crate) for details.

use std::collections::{BTreeMap, BTreeSet};
use std::fmt;
use std::iter;
use std::path::Path;
use std::sync::Arc;

use anyhow::anyhow;
use itertools::Itertools;
use mz_adapter_types::dyncfgs::ENABLE_S3_TABLES_REGION_CHECK;
use mz_ccsr::{Client, GetBySubjectError};
use mz_cloud_provider::CloudProvider;
use mz_controller_types::ClusterId;
use mz_kafka_util::client::MzClientContext;
use mz_mysql_util::MySqlTableDesc;
use mz_ore::collections::CollectionExt;
use mz_ore::error::ErrorExt;
use mz_ore::future::InTask;
use mz_ore::iter::IteratorExt;
use mz_ore::soft_panic_or_log;
use mz_ore::str::StrExt;
use mz_postgres_util::desc::PostgresTableDesc;
use mz_proto::RustType;
use mz_repr::{CatalogItemId, RelationDesc, RelationVersionSelector, Timestamp, strconv};
use mz_sql_parser::ast::display::AstDisplay;
use mz_sql_parser::ast::visit::{Visit, visit_function};
use mz_sql_parser::ast::visit_mut::{VisitMut, visit_expr_mut};
use mz_sql_parser::ast::{
    AlterSourceAction, AlterSourceAddSubsourceOptionName, AlterSourceStatement, AvroDocOn,
    ColumnName, CreateMaterializedViewStatement, CreateSinkConnection, CreateSinkOptionName,
    CreateSinkStatement, CreateSourceOptionName, CreateSubsourceOption, CreateSubsourceOptionName,
    CreateTableFromSourceStatement, CsrConfigOption, CsrConfigOptionName, CsrConnection,
    CsrSeedAvro, CsrSeedProtobuf, CsrSeedProtobufSchema, DeferredItemName, DocOnIdentifier,
    DocOnSchema, Expr, Function, FunctionArgs, Ident, KafkaSourceConfigOption,
    KafkaSourceConfigOptionName, LoadGenerator, LoadGeneratorOption, LoadGeneratorOptionName,
    MaterializedViewOption, MaterializedViewOptionName, MySqlConfigOption, MySqlConfigOptionName,
    PgConfigOption, PgConfigOptionName, RawItemName, ReaderSchemaSelectionStrategy,
    RefreshAtOptionValue, RefreshEveryOptionValue, RefreshOptionValue, SourceEnvelope,
    SqlServerConfigOption, SqlServerConfigOptionName, Statement, TableFromSourceColumns,
    TableFromSourceOption, TableFromSourceOptionName, UnresolvedItemName,
};
use mz_sql_server_util::desc::SqlServerTableDesc;
use mz_storage_types::configuration::StorageConfiguration;
use mz_storage_types::connections::Connection;
use mz_storage_types::connections::inline::IntoInlineConnection;
use mz_storage_types::errors::ContextCreationError;
use mz_storage_types::sources::load_generator::LoadGeneratorOutput;
use mz_storage_types::sources::mysql::MySqlSourceDetails;
use mz_storage_types::sources::postgres::PostgresSourcePublicationDetails;
use mz_storage_types::sources::{
    GenericSourceConnection, SourceDesc, SourceExportStatementDetails, SqlServerSourceExtras,
};
use prost::Message;
use protobuf_native::MessageLite;
use protobuf_native::compiler::{SourceTreeDescriptorDatabase, VirtualSourceTree};
use rdkafka::admin::AdminClient;
use references::{RetrievedSourceReferences, SourceReferenceClient};
use uuid::Uuid;

use crate::ast::{
    AlterSourceAddSubsourceOption, AvroSchema, CreateSourceConnection, CreateSourceStatement,
    CreateSubsourceStatement, CsrConnectionAvro, CsrConnectionProtobuf, ExternalReferenceExport,
    ExternalReferences, Format, FormatSpecifier, ProtobufSchema, Value, WithOptionValue,
};
use crate::catalog::{CatalogItemType, SessionCatalog};
use crate::kafka_util::{KafkaSinkConfigOptionExtracted, KafkaSourceConfigOptionExtracted};
use crate::names::{
    Aug, FullItemName, PartialItemName, ResolvedColumnReference, ResolvedDataType, ResolvedIds,
    ResolvedItemName,
};
use crate::plan::error::PlanError;
use crate::plan::statement::ddl::load_generator_ast_to_generator;
use crate::plan::{SourceReferences, StatementContext};
use crate::pure::error::{IcebergSinkPurificationError, SqlServerSourcePurificationError};
use crate::session::vars::ENABLE_SQL_SERVER_SOURCE;
use crate::{kafka_util, normalize};

use self::error::{
    CsrPurificationError, KafkaSinkPurificationError, KafkaSourcePurificationError,
    LoadGeneratorSourcePurificationError, MySqlSourcePurificationError, PgSourcePurificationError,
};

pub(crate) mod error;
mod references;

pub mod mysql;
pub mod postgres;
pub mod sql_server;

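/// A single export (e.g. an upstream table) requested from a source, pairing
/// the external reference with the name it will have in Materialize and
/// connection-specific metadata.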
pub(crate) struct RequestedSourceExport<T> {
    external_reference: UnresolvedItemName,
    name: UnresolvedItemName,
    meta: T,
}

impl<T: fmt::Debug> fmt::Debug for RequestedSourceExport<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("RequestedSourceExport")
            .field("external_reference", &self.external_reference)
            .field("name", &self.name)
            .field("meta", &self.meta)
            .finish()
    }
}

impl<T> RequestedSourceExport<T> {
    fn change_meta<F>(self, new_meta: F) -> RequestedSourceExport<F> {
        RequestedSourceExport {
            external_reference: self.external_reference,
            name: self.name,
            meta: new_meta,
        }
    }
}

/// Generates a subsource name by prepending the source's schema name, if present.
///
/// For example, if the source is `a.b`, then `a` is prepended to the subsource
/// name so that the subsource is generated in the same schema as the source.
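///
/// ```text
/// source name:    a.b
/// subsource name: c
/// generated name: a.c
/// ```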
fn source_export_name_gen(
    source_name: &UnresolvedItemName,
    subsource_name: &str,
) -> Result<UnresolvedItemName, PlanError> {
    let mut partial = normalize::unresolved_item_name(source_name.clone())?;
    partial.item = subsource_name.to_string();
    Ok(UnresolvedItemName::from(partial))
}

// TODO(database-issues#8620): Remove once subsources are removed
/// Validates that the requested subsources do not have name conflicts with each
/// other and that the same upstream table is not referenced multiple times.
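///
/// For example, two upstream tables named `t` in different schemas would, by
/// default, both generate a subsource named `t`; that conflict is reported
/// here rather than surfacing as an opaque catalog error.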
fn validate_source_export_names<T>(
    requested_source_exports: &[RequestedSourceExport<T>],
) -> Result<(), PlanError> {
    // This condition would get caught during the catalog transaction, but produces a
    // vague, non-contextual error. Instead, error here so we can suggest to the user
    // how to fix the problem.
    if let Some(name) = requested_source_exports
        .iter()
        .map(|subsource| &subsource.name)
        .duplicates()
        .next()
        .cloned()
    {
        let mut upstream_references: Vec<_> = requested_source_exports
            .into_iter()
            .filter_map(|subsource| {
                if &subsource.name == &name {
                    Some(subsource.external_reference.clone())
                } else {
                    None
                }
            })
            .collect();

        upstream_references.sort();

        Err(PlanError::SubsourceNameConflict {
            name,
            upstream_references,
        })?;
    }

    // We disallow subsource statements from referencing the same upstream table, but we allow
    // `CREATE TABLE .. FROM SOURCE` statements to do so. Since `CREATE TABLE .. FROM SOURCE`
    // purification will only provide 1 `requested_source_export`, we can leave this here without
    // needing to differentiate between the two types of statements.
    // TODO(roshan): Remove this when auto-generated subsources are deprecated.
    if let Some(name) = requested_source_exports
        .iter()
        .map(|export| &export.external_reference)
        .duplicates()
        .next()
        .cloned()
    {
        let mut target_names: Vec<_> = requested_source_exports
            .into_iter()
            .filter_map(|export| {
                if &export.external_reference == &name {
                    Some(export.name.clone())
                } else {
                    None
                }
            })
            .collect();

        target_names.sort();

        Err(PlanError::SubsourceDuplicateReference { name, target_names })?;
    }

    Ok(())
}

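/// A statement that has been purified of its dependencies on external state,
/// along with any additional objects (such as subsources) that must be
/// created alongside it.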
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PurifiedStatement {
    PurifiedCreateSource {
        // The progress subsource, if we are offloading progress info to a separate relation
        create_progress_subsource_stmt: Option<CreateSubsourceStatement<Aug>>,
        create_source_stmt: CreateSourceStatement<Aug>,
        // Map of subsource names to external details
        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
        /// All the available upstream references that can be added as tables
        /// to this primary source.
        available_source_references: SourceReferences,
    },
    PurifiedAlterSource {
        alter_source_stmt: AlterSourceStatement<Aug>,
    },
    PurifiedAlterSourceAddSubsources {
        // This just saves us an annoying catalog lookup
        source_name: ResolvedItemName,
        /// Options that we will need the values of to update the source's
        /// definition.
        options: Vec<AlterSourceAddSubsourceOption<Aug>>,
        // Map of subsource names to external details
        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
    },
    PurifiedAlterSourceRefreshReferences {
        source_name: ResolvedItemName,
        /// The updated available upstream references for the primary source.
        available_source_references: SourceReferences,
    },
    PurifiedCreateSink(CreateSinkStatement<Aug>),
    PurifiedCreateTableFromSource {
        stmt: CreateTableFromSourceStatement<Aug>,
    },
}

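/// A single purified source export (subsource or table), pairing the upstream
/// reference with the connection-specific details needed to plan it.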
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PurifiedSourceExport {
    pub external_reference: UnresolvedItemName,
    pub details: PurifiedExportDetails,
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PurifiedExportDetails {
    MySql {
        table: MySqlTableDesc,
        text_columns: Option<Vec<Ident>>,
        exclude_columns: Option<Vec<Ident>>,
        initial_gtid_set: String,
    },
    Postgres {
        table: PostgresTableDesc,
        text_columns: Option<Vec<Ident>>,
        exclude_columns: Option<Vec<Ident>>,
    },
    SqlServer {
        table: SqlServerTableDesc,
        text_columns: Option<Vec<Ident>>,
        excl_columns: Option<Vec<Ident>>,
        capture_instance: Arc<str>,
        initial_lsn: mz_sql_server_util::cdc::Lsn,
    },
    Kafka {},
    LoadGenerator {
        table: Option<RelationDesc>,
        output: LoadGeneratorOutput,
    },
}

/// Purifies a statement, removing any dependencies on external state.
///
/// See the section on [purification](crate#purification) in the crate
/// documentation for details.
///
/// Note that this doesn't handle CREATE MATERIALIZED VIEW, which is
/// handled by [purify_create_materialized_view_options] instead.
/// This could be made more consistent by a refactoring discussed here:
/// <https://github.com/MaterializeInc/materialize/pull/23870#discussion_r1435922709>
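///
/// Along with the purification result, this returns the `ClusterId` named in
/// the statement's `IN CLUSTER` clause, if any.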
pub async fn purify_statement(
    catalog: impl SessionCatalog,
    now: u64,
    stmt: Statement<Aug>,
    storage_configuration: &StorageConfiguration,
) -> (Result<PurifiedStatement, PlanError>, Option<ClusterId>) {
    match stmt {
        Statement::CreateSource(stmt) => {
            let cluster_id = stmt.in_cluster.as_ref().map(|cluster| cluster.id.clone());
            (
                purify_create_source(catalog, now, stmt, storage_configuration).await,
                cluster_id,
            )
        }
        Statement::AlterSource(stmt) => (
            purify_alter_source(catalog, stmt, storage_configuration).await,
            None,
        ),
        Statement::CreateSink(stmt) => {
            let cluster_id = stmt.in_cluster.as_ref().map(|cluster| cluster.id.clone());
            (
                purify_create_sink(catalog, stmt, storage_configuration).await,
                cluster_id,
            )
        }
        Statement::CreateTableFromSource(stmt) => (
            purify_create_table_from_source(catalog, stmt, storage_configuration).await,
            None,
        ),
        o => (
            Err(internal_err!(
                "unexpected statement type in purification: {:?}",
                o
            )),
            None,
        ),
    }
}

/// Injects `DOC ON` comments into all Avro formats that are using a schema
/// registry by finding all SQL `COMMENT`s that are attached to the sink's
/// underlying materialized view (as well as any types referenced by that
/// underlying materialized view).
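///
/// User-provided `DOC ON` options take precedence: a comment is only injected
/// when no matching `DOC ON` option already exists for the item or column.
/// For example, a `COMMENT ON TABLE t IS 'docs'` is translated into an option
/// roughly equivalent to `DOC ON TYPE t = 'docs'` on the Avro format's schema
/// registry connection.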
pub(crate) fn purify_create_sink_avro_doc_on_options(
    catalog: &dyn SessionCatalog,
    from_id: CatalogItemId,
    format: &mut Option<FormatSpecifier<Aug>>,
) -> Result<(), PlanError> {
    // Collect all objects referenced by the sink.
    let from = catalog.get_item(&from_id);
    let object_ids = from
        .references()
        .items()
        .copied()
        .chain_one(from.id())
        .collect::<Vec<_>>();

    // Collect all Avro formats that use a schema registry, as well as a set of
    // all identifiers named in user-provided `DOC ON` options.
    let mut avro_format_options = vec![];
    for_each_format(format, |doc_on_schema, fmt| match fmt {
        Format::Avro(AvroSchema::InlineSchema { .. })
        | Format::Bytes
        | Format::Csv { .. }
        | Format::Json { .. }
        | Format::Protobuf(..)
        | Format::Regex(..)
        | Format::Text => (),
        Format::Avro(AvroSchema::Csr {
            csr_connection: CsrConnectionAvro { connection, .. },
        }) => {
            avro_format_options.push((doc_on_schema, &mut connection.options));
        }
    });

    // For each Avro format in the sink, inject the appropriate `DOC ON` options
    // for each item referenced by the sink.
    for (for_schema, options) in avro_format_options {
        let user_provided_comments = options
            .iter()
            .filter_map(|CsrConfigOption { name, .. }| match name {
                CsrConfigOptionName::AvroDocOn(doc_on) => Some(doc_on.clone()),
                _ => None,
            })
            .collect::<BTreeSet<_>>();

        // Add existing catalog comments, unless the user has already provided
        // an overriding `DOC ON` option.
        for object_id in &object_ids {
            // Always add comments to the latest version of the item.
            let item = catalog
                .get_item(object_id)
                .at_version(RelationVersionSelector::Latest);
            let full_resolved_name = ResolvedItemName::Item {
                id: *object_id,
                qualifiers: item.name().qualifiers.clone(),
                full_name: catalog.resolve_full_name(item.name()),
                print_id: !matches!(
                    item.item_type(),
                    CatalogItemType::Func | CatalogItemType::Type
                ),
                version: RelationVersionSelector::Latest,
            };

            if let Some(comments_map) = catalog.get_item_comments(object_id) {
                // Attach comment for the item itself, if the user has not
                // already provided an overriding `DOC ON` option for the item.
                let doc_on_item_key = AvroDocOn {
                    identifier: DocOnIdentifier::Type(full_resolved_name.clone()),
                    for_schema,
                };
                if !user_provided_comments.contains(&doc_on_item_key) {
                    if let Some(root_comment) = comments_map.get(&None) {
                        options.push(CsrConfigOption {
                            name: CsrConfigOptionName::AvroDocOn(doc_on_item_key),
                            value: Some(mz_sql_parser::ast::WithOptionValue::Value(Value::String(
                                root_comment.clone(),
                            ))),
                        });
                    }
                }

                // Attach comment for each column in the item, if the user has
                // not already provided an overriding `DOC ON` option for the
                // column.
                let column_descs = match item.type_details() {
                    Some(details) => details.typ.desc(catalog).unwrap_or_default(),
                    None => item.relation_desc().map(|d| d.into_owned()),
                };

                if let Some(desc) = column_descs {
                    for (pos, column_name) in desc.iter_names().enumerate() {
                        if let Some(comment_str) = comments_map.get(&Some(pos + 1)) {
                            let doc_on_column_key = AvroDocOn {
                                identifier: DocOnIdentifier::Column(ColumnName {
                                    relation: full_resolved_name.clone(),
                                    column: ResolvedColumnReference::Column {
                                        name: column_name.to_owned(),
                                        index: pos,
                                    },
                                }),
                                for_schema,
                            };
                            if !user_provided_comments.contains(&doc_on_column_key) {
                                options.push(CsrConfigOption {
                                    name: CsrConfigOptionName::AvroDocOn(doc_on_column_key),
                                    value: Some(mz_sql_parser::ast::WithOptionValue::Value(
                                        Value::String(comment_str.clone()),
                                    )),
                                });
                            }
                        }
                    }
                }
            }
        }
    }

    Ok(())
}

/// Checks that the sink described in the statement can connect to its external
/// resources.
async fn purify_create_sink(
    catalog: impl SessionCatalog,
    mut create_sink_stmt: CreateSinkStatement<Aug>,
    storage_configuration: &StorageConfiguration,
) -> Result<PurifiedStatement, PlanError> {
    // General purification
    let CreateSinkStatement {
        connection,
        format,
        with_options,
        name: _,
        in_cluster: _,
        if_not_exists: _,
        from,
        envelope: _,
        mode: _,
    } = &mut create_sink_stmt;

    // The list of options that the user is allowed to specify.
    const USER_ALLOWED_WITH_OPTIONS: &[CreateSinkOptionName] = &[
        CreateSinkOptionName::Snapshot,
        CreateSinkOptionName::CommitInterval,
    ];

    if let Some(op) = with_options
        .iter()
        .find(|op| !USER_ALLOWED_WITH_OPTIONS.contains(&op.name))
    {
        sql_bail!(
            "CREATE SINK...WITH ({}..) is not allowed",
            op.name.to_ast_string_simple(),
        )
    }

    match &connection {
        CreateSinkConnection::Kafka {
            connection,
            options,
            key: _,
            headers: _,
        } => {
            // We must not leave any state behind in the Kafka broker, so just ensure that
            // we can connect. This means we don't ensure that we can create the topic,
            // which introduces TOCTOU errors, but creating an inoperable sink is infinitely
            // preferable to leaking state in users' environments.
            let scx = StatementContext::new(None, &catalog);
            let connection = {
                let item = scx.get_item_by_resolved_name(connection)?;
                // Get Kafka connection
                match item.connection()? {
                    Connection::Kafka(connection) => {
                        connection.clone().into_inline_connection(scx.catalog)
                    }
                    _ => sql_bail!(
                        "{} is not a kafka connection",
                        scx.catalog.resolve_full_name(item.name())
                    ),
                }
            };

            let extracted_options: KafkaSinkConfigOptionExtracted = options.clone().try_into()?;

            if extracted_options.legacy_ids == Some(true) {
                sql_bail!("LEGACY IDs option is not supported");
            }

            let client: AdminClient<_> = connection
                .create_with_context(
                    storage_configuration,
                    MzClientContext::default(),
                    &BTreeMap::new(),
                    InTask::No,
                )
                .await
                .map_err(|e| {
                    // anyhow doesn't support Clone, so not trivial to move into PlanError
                    KafkaSinkPurificationError::AdminClientError(Arc::new(e))
                })?;

            let metadata = client
                .inner()
                .fetch_metadata(
                    None,
                    storage_configuration
                        .parameters
                        .kafka_timeout_config
                        .fetch_metadata_timeout,
                )
                .map_err(|e| {
                    KafkaSinkPurificationError::AdminClientError(Arc::new(
                        ContextCreationError::KafkaError(e),
                    ))
                })?;

            if metadata.brokers().len() == 0 {
                Err(KafkaSinkPurificationError::ZeroBrokers)?;
            }
        }
        CreateSinkConnection::Iceberg {
            connection,
            aws_connection,
            ..
        } => {
            let scx = StatementContext::new(None, &catalog);
            let connection = {
                let item = scx.get_item_by_resolved_name(connection)?;
                // Get Iceberg connection
                match item.connection()? {
                    Connection::IcebergCatalog(connection) => {
                        connection.clone().into_inline_connection(scx.catalog)
                    }
                    _ => sql_bail!(
                        "{} is not an iceberg connection",
                        scx.catalog.resolve_full_name(item.name())
                    ),
                }
            };

            let aws_conn_id = aws_connection.item_id();

            let aws_connection = {
                let item = scx.get_item_by_resolved_name(aws_connection)?;
                // Get AWS connection
                match item.connection()? {
                    Connection::Aws(aws_connection) => aws_connection.clone(),
                    _ => sql_bail!(
                        "{} is not an aws connection",
                        scx.catalog.resolve_full_name(item.name())
                    ),
                }
            };

            // For S3 Tables connections in the Materialize Cloud product, verify the
            // AWS region matches the environment's region. This check only applies when
            // the enable_s3_tables_region_check dyncfg is set.
            if let Some(s3tables) = connection.s3tables_catalog() {
                let enable_region_check =
                    ENABLE_S3_TABLES_REGION_CHECK.get(scx.catalog.system_vars().dyncfgs());
                if enable_region_check {
                    let env_id = &catalog.config().environment_id;
                    if matches!(env_id.cloud_provider(), CloudProvider::Aws) {
                        let env_region = env_id.cloud_provider_region();
                        // Later on we default to "us-east-1" if the region is not set on the S3 Tables
                        // connection, so we need to do the same check here.
                        let s3_tables_region = s3tables
                            .aws_connection
                            .connection
                            .region
                            .clone()
                            .unwrap_or_else(|| "us-east-1".to_string());
                        if s3_tables_region != env_region {
                            Err(IcebergSinkPurificationError::S3TablesRegionMismatch {
                                s3_tables_region,
                                environment_region: env_region.to_string(),
                            })?;
                        }
                    }
                }
            }

            let _catalog = connection
                .connect(storage_configuration, InTask::No)
                .await
                .map_err(|e| IcebergSinkPurificationError::CatalogError(Arc::new(e)))?;

            let _sdk_config = aws_connection
                .load_sdk_config(
                    &storage_configuration.connection_context,
                    aws_conn_id.clone(),
                    InTask::No,
                )
                .await
                .map_err(|e| IcebergSinkPurificationError::AwsSdkContextError(Arc::new(e)))?;
        }
    }

    let mut csr_connection_ids = BTreeSet::new();
    for_each_format(format, |_, fmt| match fmt {
        Format::Avro(AvroSchema::InlineSchema { .. })
        | Format::Bytes
        | Format::Csv { .. }
        | Format::Json { .. }
        | Format::Protobuf(ProtobufSchema::InlineSchema { .. })
        | Format::Regex(..)
        | Format::Text => (),
        Format::Avro(AvroSchema::Csr {
            csr_connection: CsrConnectionAvro { connection, .. },
        })
        | Format::Protobuf(ProtobufSchema::Csr {
            csr_connection: CsrConnectionProtobuf { connection, .. },
        }) => {
            csr_connection_ids.insert(*connection.connection.item_id());
        }
    });

    let scx = StatementContext::new(None, &catalog);
    for csr_connection_id in csr_connection_ids {
        let connection = {
            let item = scx.get_item(&csr_connection_id);
            // Get CSR connection
            match item.connection()? {
                Connection::Csr(connection) => connection.clone().into_inline_connection(&catalog),
                _ => Err(CsrPurificationError::NotCsrConnection(
                    scx.catalog.resolve_full_name(item.name()),
                ))?,
            }
        };

        let client = connection
            .connect(storage_configuration, InTask::No)
            .await
            .map_err(|e| CsrPurificationError::ClientError(Arc::new(e)))?;

        client
            .list_subjects()
            .await
            .map_err(|e| CsrPurificationError::ListSubjectsError(Arc::new(e)))?;
    }

    purify_create_sink_avro_doc_on_options(&catalog, *from.item_id(), format)?;

    Ok(PurifiedStatement::PurifiedCreateSink(create_sink_stmt))
}

/// Runs a function on each format within the format specifier.
///
/// The function is provided with a `DocOnSchema` that indicates whether the
/// format is for the key, value, or both.
///
// TODO(benesch): rename `DocOnSchema` to the more general `FormatRestriction`.
fn for_each_format<'a, F>(format: &'a mut Option<FormatSpecifier<Aug>>, mut f: F)
where
    F: FnMut(DocOnSchema, &'a mut Format<Aug>),
{
    match format {
        None => (),
        Some(FormatSpecifier::Bare(fmt)) => f(DocOnSchema::All, fmt),
        Some(FormatSpecifier::KeyValue { key, value }) => {
            f(DocOnSchema::KeyOnly, key);
            f(DocOnSchema::ValueOnly, value);
        }
    }
}

/// Defines whether purification should enforce that at least one valid source
/// reference is provided on the statement being purified.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub(crate) enum SourceReferencePolicy {
    /// Don't allow source references to be provided. This is used for
    /// enforcing that `CREATE SOURCE` statements don't create subsources.
    NotAllowed,
    /// Allow empty references, such as when creating a source that
    /// will have tables added afterwards.
    Optional,
    /// Require that at least one reference is resolved to an upstream
    /// object.
    Required,
}

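/// Purifies a `CREATE SOURCE` statement: connects to the upstream system named
/// by the source's connection, validates the requested configuration, and
/// collects the external details needed to plan the source and any requested
/// subsources.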
async fn purify_create_source(
    catalog: impl SessionCatalog,
    now: u64,
    mut create_source_stmt: CreateSourceStatement<Aug>,
    storage_configuration: &StorageConfiguration,
) -> Result<PurifiedStatement, PlanError> {
    let CreateSourceStatement {
        name: source_name,
        col_names,
        key_constraint,
        connection: source_connection,
        format,
        envelope,
        include_metadata,
        external_references,
        progress_subsource,
        with_options,
        ..
    } = &mut create_source_stmt;

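    // Any of these clauses indicates the older source syntax, for which we
    // also generate a progress subsource below.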
    let uses_old_syntax = !col_names.is_empty()
        || key_constraint.is_some()
        || format.is_some()
        || envelope.is_some()
        || !include_metadata.is_empty()
        || external_references.is_some()
        || progress_subsource.is_some();

    if let Some(DeferredItemName::Named(_)) = progress_subsource {
        sql_bail!("Cannot manually ID qualify progress subsource")
    }

    let mut requested_subsource_map = BTreeMap::new();

    let progress_desc = match &source_connection {
        CreateSourceConnection::Kafka { .. } => {
            &mz_storage_types::sources::kafka::KAFKA_PROGRESS_DESC
        }
        CreateSourceConnection::Postgres { .. } => {
            &mz_storage_types::sources::postgres::PG_PROGRESS_DESC
        }
        CreateSourceConnection::SqlServer { .. } => {
            &mz_storage_types::sources::sql_server::SQL_SERVER_PROGRESS_DESC
        }
        CreateSourceConnection::MySql { .. } => {
            &mz_storage_types::sources::mysql::MYSQL_PROGRESS_DESC
        }
        CreateSourceConnection::LoadGenerator { .. } => {
            &mz_storage_types::sources::load_generator::LOAD_GEN_PROGRESS_DESC
        }
    };
    let scx = StatementContext::new(None, &catalog);

    // Depending on whether the user must or may use the `CREATE TABLE .. FROM SOURCE`
    // statement to add tables after this source is created, we might need to enforce
    // that auto-generated subsources are or are not created by this source statement.
    let reference_policy = if scx.catalog.system_vars().enable_create_table_from_source()
        && scx.catalog.system_vars().force_source_table_syntax()
    {
        SourceReferencePolicy::NotAllowed
    } else if scx.catalog.system_vars().enable_create_table_from_source() {
        SourceReferencePolicy::Optional
    } else {
        SourceReferencePolicy::Required
    };

    let mut format_options = SourceFormatOptions::Default;

    let retrieved_source_references: RetrievedSourceReferences;

    match source_connection {
        CreateSourceConnection::Kafka {
            connection,
            options: base_with_options,
            ..
        } => {
            if let Some(external_references) = external_references {
                Err(KafkaSourcePurificationError::ReferencedSubsources(
                    external_references.clone(),
                ))?;
            }

            let connection = {
                let item = scx.get_item_by_resolved_name(connection)?;
                // Get Kafka connection
                match item.connection()? {
                    Connection::Kafka(connection) => {
                        connection.clone().into_inline_connection(&catalog)
                    }
                    _ => Err(KafkaSourcePurificationError::NotKafkaConnection(
                        scx.catalog.resolve_full_name(item.name()),
                    ))?,
                }
            };

            let extracted_options: KafkaSourceConfigOptionExtracted =
                base_with_options.clone().try_into()?;

            let topic = extracted_options
                .topic
                .ok_or(KafkaSourcePurificationError::ConnectionMissingTopic)?;

            let consumer = connection
                .create_with_context(
                    storage_configuration,
                    MzClientContext::default(),
                    &BTreeMap::new(),
                    InTask::No,
                )
                .await
                .map_err(|e| {
                    // anyhow doesn't support Clone, so not trivial to move into PlanError
                    KafkaSourcePurificationError::KafkaConsumerError(
                        e.display_with_causes().to_string(),
                    )
                })?;
            let consumer = Arc::new(consumer);

            match (
                extracted_options.start_offset,
                extracted_options.start_timestamp,
            ) {
                (None, None) => {
                    // Validate that the topic at least exists.
                    kafka_util::ensure_topic_exists(
                        Arc::clone(&consumer),
                        &topic,
                        storage_configuration
                            .parameters
                            .kafka_timeout_config
                            .fetch_metadata_timeout,
                    )
                    .await?;
                }
                (Some(_), Some(_)) => {
                    sql_bail!("cannot specify START TIMESTAMP and START OFFSET at the same time")
                }
                (Some(start_offsets), None) => {
                    // Validate the start offsets.
                    kafka_util::validate_start_offsets(
                        Arc::clone(&consumer),
                        &topic,
                        start_offsets,
                        storage_configuration
                            .parameters
                            .kafka_timeout_config
                            .fetch_metadata_timeout,
                    )
                    .await?;
                }
                (None, Some(time_offset)) => {
                    // Translate `START TIMESTAMP` to a start offset.
                    let start_offsets = kafka_util::lookup_start_offsets(
                        Arc::clone(&consumer),
                        &topic,
                        time_offset,
                        now,
                        storage_configuration
                            .parameters
                            .kafka_timeout_config
                            .fetch_metadata_timeout,
                    )
                    .await?;

                    base_with_options.retain(|val| {
                        !matches!(val.name, KafkaSourceConfigOptionName::StartTimestamp)
                    });
                    base_with_options.push(KafkaSourceConfigOption {
                        name: KafkaSourceConfigOptionName::StartOffset,
                        value: Some(WithOptionValue::Sequence(
                            start_offsets
                                .iter()
                                .map(|offset| {
                                    WithOptionValue::Value(Value::Number(offset.to_string()))
                                })
                                .collect(),
                        )),
                    });
                }
            }

            let reference_client = SourceReferenceClient::Kafka { topic: &topic };
            retrieved_source_references = reference_client.get_source_references().await?;

            format_options = SourceFormatOptions::Kafka { topic };
        }
        CreateSourceConnection::Postgres {
            connection,
            options,
        } => {
            let connection_item = scx.get_item_by_resolved_name(connection)?;
            let connection = match connection_item.connection().map_err(PlanError::from)? {
                Connection::Postgres(connection) => {
                    connection.clone().into_inline_connection(&catalog)
                }
                _ => Err(PgSourcePurificationError::NotPgConnection(
                    scx.catalog.resolve_full_name(connection_item.name()),
                ))?,
            };
            let crate::plan::statement::PgConfigOptionExtracted {
                publication,
                text_columns,
                exclude_columns,
                details,
                ..
            } = options.clone().try_into()?;
            let publication =
                publication.ok_or(PgSourcePurificationError::ConnectionMissingPublication)?;

            if details.is_some() {
                Err(PgSourcePurificationError::UserSpecifiedDetails)?;
            }

            let client = connection
                .validate(connection_item.id(), storage_configuration)
                .await?;

            let reference_client = SourceReferenceClient::Postgres {
                client: &client,
                publication: &publication,
                database: &connection.database,
            };
            retrieved_source_references = reference_client.get_source_references().await?;

            let postgres::PurifiedSourceExports {
                source_exports: subsources,
                normalized_text_columns,
            } = postgres::purify_source_exports(
                &client,
                &retrieved_source_references,
                external_references,
                text_columns,
                exclude_columns,
                source_name,
                &reference_policy,
            )
            .await?;

            if let Some(text_cols_option) = options
                .iter_mut()
                .find(|option| option.name == PgConfigOptionName::TextColumns)
            {
                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
            }

            requested_subsource_map.extend(subsources);

            // Record the active replication timeline_id to allow detection of a future upstream
            // point-in-time-recovery that will put the source into an error state.
            let timeline_id = mz_postgres_util::get_timeline_id(&client).await?;

            // Remove any old detail references
            options.retain(|PgConfigOption { name, .. }| name != &PgConfigOptionName::Details);
            let details = PostgresSourcePublicationDetails {
                slot: format!(
                    "materialize_{}",
                    Uuid::new_v4().to_string().replace('-', "")
                ),
                timeline_id: Some(timeline_id),
                database: connection.database,
            };
            options.push(PgConfigOption {
                name: PgConfigOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    details.into_proto().encode_to_vec(),
                )))),
            })
        }
        CreateSourceConnection::SqlServer {
            connection,
            options,
        } => {
            scx.require_feature_flag(&ENABLE_SQL_SERVER_SOURCE)?;

            // Connect to the upstream SQL Server instance so we can validate
            // we're compatible with CDC.
            let connection_item = scx.get_item_by_resolved_name(connection)?;
            let connection = match connection_item.connection()? {
                Connection::SqlServer(connection) => {
                    connection.clone().into_inline_connection(&catalog)
                }
                _ => Err(SqlServerSourcePurificationError::NotSqlServerConnection(
                    scx.catalog.resolve_full_name(connection_item.name()),
                ))?,
            };
            let crate::plan::statement::ddl::SqlServerConfigOptionExtracted {
                details,
                text_columns,
                exclude_columns,
                seen: _,
            } = options.clone().try_into()?;

            if details.is_some() {
                Err(SqlServerSourcePurificationError::UserSpecifiedDetails)?;
            }

            let mut client = connection
                .validate(connection_item.id(), storage_configuration)
                .await?;

            let database: Arc<str> = connection.database.into();
            let reference_client = SourceReferenceClient::SqlServer {
                client: &mut client,
                database: Arc::clone(&database),
            };
            retrieved_source_references = reference_client.get_source_references().await?;
            tracing::debug!(?retrieved_source_references, "got source references");

            let timeout = mz_storage_types::sources::sql_server::MAX_LSN_WAIT
                .get(storage_configuration.config_set());

            let purified_source_exports = sql_server::purify_source_exports(
                &*database,
                &mut client,
                &retrieved_source_references,
                external_references,
                &text_columns,
                &exclude_columns,
                source_name,
                timeout,
                &reference_policy,
            )
            .await?;

            let sql_server::PurifiedSourceExports {
                source_exports,
                normalized_text_columns,
                normalized_excl_columns,
            } = purified_source_exports;

            // Update our set of requested source exports.
            requested_subsource_map.extend(source_exports);

            // Record the most recent restore_history_id, or none if the system has never been
            // restored.
            let restore_history_id =
                mz_sql_server_util::inspect::get_latest_restore_history_id(&mut client).await?;
            let details = SqlServerSourceExtras { restore_history_id };

            options.retain(|SqlServerConfigOption { name, .. }| {
                name != &SqlServerConfigOptionName::Details
            });
            options.push(SqlServerConfigOption {
                name: SqlServerConfigOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    details.into_proto().encode_to_vec(),
                )))),
            });

            // Update our 'TEXT' and 'EXCLUDE' column options with the purified and normalized set.
            if let Some(text_cols_option) = options
                .iter_mut()
                .find(|option| option.name == SqlServerConfigOptionName::TextColumns)
            {
                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
            }
            if let Some(excl_cols_option) = options
                .iter_mut()
                .find(|option| option.name == SqlServerConfigOptionName::ExcludeColumns)
            {
                excl_cols_option.value = Some(WithOptionValue::Sequence(normalized_excl_columns));
            }
        }
        CreateSourceConnection::MySql {
            connection,
            options,
        } => {
            let connection_item = scx.get_item_by_resolved_name(connection)?;
            let connection = match connection_item.connection()? {
                Connection::MySql(connection) => {
                    connection.clone().into_inline_connection(&catalog)
                }
                _ => Err(MySqlSourcePurificationError::NotMySqlConnection(
                    scx.catalog.resolve_full_name(connection_item.name()),
                ))?,
            };
            let crate::plan::statement::ddl::MySqlConfigOptionExtracted {
                details,
                text_columns,
                exclude_columns,
                seen: _,
            } = options.clone().try_into()?;

            if details.is_some() {
                Err(MySqlSourcePurificationError::UserSpecifiedDetails)?;
            }

            let mut conn = connection
                .validate(connection_item.id(), storage_configuration)
                .await
                .map_err(MySqlSourcePurificationError::InvalidConnection)?;

            // Retrieve the current @gtid_executed value of the server to mark as the effective
            // initial snapshot point such that we can ensure consistency if the initial source
            // snapshot is broken up over multiple points in time.
            let initial_gtid_set =
                mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?;

            let reference_client = SourceReferenceClient::MySql {
                conn: &mut conn,
                include_system_schemas: mysql::references_system_schemas(external_references),
            };
            retrieved_source_references = reference_client.get_source_references().await?;

            let mysql::PurifiedSourceExports {
                source_exports: subsources,
                normalized_text_columns,
                normalized_exclude_columns,
            } = mysql::purify_source_exports(
                &mut conn,
                &retrieved_source_references,
                external_references,
                text_columns,
                exclude_columns,
                source_name,
                initial_gtid_set.clone(),
                &reference_policy,
            )
            .await?;
            requested_subsource_map.extend(subsources);

            // We don't have any fields in this details struct, but keep it around for
            // conformity with postgres and in case we end up needing it again in the future.
            let details = MySqlSourceDetails {};
            // Update options with the purified details
            options
                .retain(|MySqlConfigOption { name, .. }| name != &MySqlConfigOptionName::Details);
            options.push(MySqlConfigOption {
                name: MySqlConfigOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    details.into_proto().encode_to_vec(),
                )))),
            });

            if let Some(text_cols_option) = options
                .iter_mut()
                .find(|option| option.name == MySqlConfigOptionName::TextColumns)
            {
                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
            }
            if let Some(exclude_cols_option) = options
                .iter_mut()
                .find(|option| option.name == MySqlConfigOptionName::ExcludeColumns)
            {
                exclude_cols_option.value =
                    Some(WithOptionValue::Sequence(normalized_exclude_columns));
            }
        }
        CreateSourceConnection::LoadGenerator { generator, options } => {
            let load_generator =
                load_generator_ast_to_generator(&scx, generator, options, include_metadata)?;

            let reference_client = SourceReferenceClient::LoadGenerator {
                generator: &load_generator,
            };
            retrieved_source_references = reference_client.get_source_references().await?;
            // Filter to the references that need to be created as 'subsources', which
            // doesn't include the default output for single-output sources.
            // TODO(database-issues#8620): Remove once subsources are removed
            let multi_output_sources =
                retrieved_source_references
                    .all_references()
                    .iter()
                    .any(|r| {
                        matches!(
                            r.load_generator_output(),
                            Some(output) if output != &LoadGeneratorOutput::Default
                        )
                    });

            match external_references {
                Some(requested)
                    if matches!(reference_policy, SourceReferencePolicy::NotAllowed) =>
                {
                    Err(PlanError::UseTablesForSources(requested.to_string()))?
                }
                Some(requested) if !multi_output_sources => match requested {
                    ExternalReferences::SubsetTables(_) => {
                        Err(LoadGeneratorSourcePurificationError::ForTables)?
                    }
                    ExternalReferences::SubsetSchemas(_) => {
                        Err(LoadGeneratorSourcePurificationError::ForSchemas)?
                    }
                    ExternalReferences::All => {
                        Err(LoadGeneratorSourcePurificationError::ForAllTables)?
                    }
                },
                Some(requested) => {
                    let requested_exports = retrieved_source_references
                        .requested_source_exports(Some(requested), source_name)?;
                    for export in requested_exports {
                        requested_subsource_map.insert(
                            export.name,
                            PurifiedSourceExport {
                                external_reference: export.external_reference,
                                details: PurifiedExportDetails::LoadGenerator {
                                    table: export
                                        .meta
                                        .load_generator_desc()
                                        .ok_or_else(|| {
                                            internal_err!(
                                                "expected load generator source reference"
                                            )
                                        })?
                                        .clone(),
                                    output: export
                                        .meta
                                        .load_generator_output()
                                        .ok_or_else(|| {
                                            internal_err!(
                                                "expected load generator source reference"
                                            )
                                        })?
                                        .clone(),
                                },
                            },
                        );
                    }
                }
                None => {
                    if multi_output_sources
                        && matches!(reference_policy, SourceReferencePolicy::Required)
                    {
                        Err(LoadGeneratorSourcePurificationError::MultiOutputRequiresForAllTables)?
                    }
                }
            }

            if let LoadGenerator::Clock = generator {
                if !options
                    .iter()
                    .any(|p| p.name == LoadGeneratorOptionName::AsOf)
                {
                    let now = catalog.now();
                    options.push(LoadGeneratorOption {
                        name: LoadGeneratorOptionName::AsOf,
                        value: Some(WithOptionValue::Value(Value::Number(now.to_string()))),
                    });
                }
            }
        }
    }

    // Now that we know which subsources to create alongside this statement,
    // remove the references so they are not canonicalized as part of the
    // `CREATE SOURCE` statement in the catalog.
    *external_references = None;

    // Generate progress subsource for old syntax
    let create_progress_subsource_stmt = if uses_old_syntax {
        // Take name from input or generate name
        let name = match progress_subsource {
            Some(name) => match name {
                DeferredItemName::Deferred(name) => name.clone(),
                // Already checked for this value above.
                DeferredItemName::Named(_) => {
                    sql_bail!("progress subsource name cannot be a resolved name")
                }
            },
            None => {
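                // No name was provided, so derive one from the source name by
                // appending a `_progress`-style suffix that does not collide
                // with any existing item or type.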
                let (item, prefix) = source_name
                    .0
                    .split_last()
                    .ok_or_else(|| sql_err!("source name must have at least one component"))?;
                let item_name =
                    Ident::try_generate_name(item.to_string(), "_progress", |candidate| {
                        let mut suggested_name = prefix.to_vec();
                        suggested_name.push(candidate.clone());

                        let partial =
                            normalize::unresolved_item_name(UnresolvedItemName(suggested_name))?;
                        let qualified = scx.allocate_qualified_name(partial)?;
                        let item_exists = scx.catalog.get_item_by_name(&qualified).is_some();
                        let type_exists = scx.catalog.get_type_by_name(&qualified).is_some();
                        Ok::<_, PlanError>(!item_exists && !type_exists)
                    })?;

                let mut full_name = prefix.to_vec();
                full_name.push(item_name);
                let full_name = normalize::unresolved_item_name(UnresolvedItemName(full_name))?;
                let qualified_name = scx.allocate_qualified_name(full_name)?;
                let full_name = scx.catalog.resolve_full_name(&qualified_name);

                UnresolvedItemName::from(full_name.clone())
            }
        };

        let (columns, constraints) = scx.relation_desc_into_table_defs(progress_desc)?;

        // Create the subsource statement
        let mut progress_with_options: Vec<_> = with_options
            .iter()
            .filter_map(|opt| match opt.name {
                CreateSourceOptionName::TimestampInterval => None,
                CreateSourceOptionName::RetainHistory => Some(CreateSubsourceOption {
                    name: CreateSubsourceOptionName::RetainHistory,
                    value: opt.value.clone(),
                }),
            })
            .collect();
        progress_with_options.push(CreateSubsourceOption {
            name: CreateSubsourceOptionName::Progress,
            value: Some(WithOptionValue::Value(Value::Boolean(true))),
        });

        Some(CreateSubsourceStatement {
            name,
            columns,
1314            // Progress subsources do not refer to the source to which they belong.
1315            // Instead, the primary source depends on them (the opposite is true of
1316            // ingestion exports, which depend on the primary source).
1317            of_source: None,
1318            constraints,
1319            if_not_exists: false,
1320            with_options: progress_with_options,
1321        })
1322    } else {
1323        None
1324    };
1325
1326    purify_source_format(
1327        &catalog,
1328        format,
1329        &format_options,
1330        envelope,
1331        storage_configuration,
1332    )
1333    .await?;
1334
1335    Ok(PurifiedStatement::PurifiedCreateSource {
1336        create_progress_subsource_stmt,
1337        create_source_stmt,
1338        subsources: requested_subsource_map,
1339        available_source_references: retrieved_source_references.available_source_references(),
1340    })
1341}
1342
1343/// On success, returns the details for new subsources and the updated
1344/// `options` that sequencing expects when handling `ALTER SOURCE` statements.
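///
/// For example (illustrative object names), `ALTER SOURCE pg_src ADD SUBSOURCE
/// schema1.table_a` is handled by `purify_alter_source_add_subsources`, and
/// `ALTER SOURCE pg_src REFRESH REFERENCES` by
/// `purify_alter_source_refresh_references`; other actions pass through unchanged.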
1345async fn purify_alter_source(
1346    catalog: impl SessionCatalog,
1347    stmt: AlterSourceStatement<Aug>,
1348    storage_configuration: &StorageConfiguration,
1349) -> Result<PurifiedStatement, PlanError> {
1350    let scx = StatementContext::new(None, &catalog);
1351    let AlterSourceStatement {
1352        source_name: unresolved_source_name,
1353        action,
1354        if_exists,
1355    } = stmt;
1356
1357    // Get name.
1358    let item = match scx.resolve_item(RawItemName::Name(unresolved_source_name.clone())) {
1359        Ok(item) => item,
1360        Err(_) if if_exists => {
1361            return Ok(PurifiedStatement::PurifiedAlterSource {
1362                alter_source_stmt: AlterSourceStatement {
1363                    source_name: unresolved_source_name,
1364                    action,
1365                    if_exists,
1366                },
1367            });
1368        }
1369        Err(e) => return Err(e),
1370    };
1371
1372    // Ensure it's an ingestion-based and alterable source.
1373    let desc = match item.source_desc()? {
1374        Some(desc) => desc.clone().into_inline_connection(scx.catalog),
1375        None => {
1376            sql_bail!("cannot ALTER this type of source")
1377        }
1378    };
1379
1380    let source_name = item.name();
1381
1382    let resolved_source_name = ResolvedItemName::Item {
1383        id: item.id(),
1384        qualifiers: item.name().qualifiers.clone(),
1385        full_name: scx.catalog.resolve_full_name(source_name),
1386        print_id: true,
1387        version: RelationVersionSelector::Latest,
1388    };
1389
1390    let partial_name = scx.catalog.minimal_qualification(source_name);
1391
1392    match action {
1393        AlterSourceAction::AddSubsources {
1394            external_references,
1395            options,
1396        } => {
1397            if scx.catalog.system_vars().enable_create_table_from_source()
1398                && scx.catalog.system_vars().force_source_table_syntax()
1399            {
1400                Err(PlanError::UseTablesForSources(
1401                    "ALTER SOURCE .. ADD SUBSOURCES ..".to_string(),
1402                ))?;
1403            }
1404
1405            purify_alter_source_add_subsources(
1406                external_references,
1407                options,
1408                desc,
1409                partial_name,
1410                unresolved_source_name,
1411                resolved_source_name,
1412                storage_configuration,
1413            )
1414            .await
1415        }
1416        AlterSourceAction::RefreshReferences => {
1417            purify_alter_source_refresh_references(
1418                desc,
1419                resolved_source_name,
1420                storage_configuration,
1421            )
1422            .await
1423        }
1424        _ => Ok(PurifiedStatement::PurifiedAlterSource {
1425            alter_source_stmt: AlterSourceStatement {
1426                source_name: unresolved_source_name,
1427                action,
1428                if_exists,
1429            },
1430        }),
1431    }
1432}
1433
1434// TODO(database-issues#8620): Remove once subsources are removed
1435/// Equivalent to `purify_create_source` but for `AlterSourceStatement`.
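///
/// A sketch of the flow, with illustrative names: for `ALTER SOURCE pg_src ADD
/// SUBSOURCE schema1.table_a`, we connect to the upstream system, resolve
/// `schema1.table_a` against the retrieved references, and return the purified
/// subsource details plus normalized `TEXT COLUMNS`/`EXCLUDE COLUMNS` options for
/// sequencing to turn into `CREATE SUBSOURCE` statements.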
1436async fn purify_alter_source_add_subsources(
1437    external_references: Vec<ExternalReferenceExport>,
1438    mut options: Vec<AlterSourceAddSubsourceOption<Aug>>,
1439    desc: SourceDesc,
1440    partial_source_name: PartialItemName,
1441    unresolved_source_name: UnresolvedItemName,
1442    resolved_source_name: ResolvedItemName,
1443    storage_configuration: &StorageConfiguration,
1444) -> Result<PurifiedStatement, PlanError> {
1445    // Validate this is a source that can have subsources added.
1446    let connection_id = match &desc.connection {
1447        GenericSourceConnection::Postgres(c) => c.connection_id,
1448        GenericSourceConnection::MySql(c) => c.connection_id,
1449        GenericSourceConnection::SqlServer(c) => c.connection_id,
1450        _ => sql_bail!(
1451            "source {} does not support ALTER SOURCE.",
1452            partial_source_name
1453        ),
1454    };
1455
1456    let crate::plan::statement::ddl::AlterSourceAddSubsourceOptionExtracted {
1457        text_columns,
1458        exclude_columns,
1459        details,
1460        seen: _,
1461    } = options.clone().try_into()?;
1462    if details.is_some() {
1463        sql_bail!("DETAILS option cannot be explicitly set");
1464    }
1465
1466    let mut requested_subsource_map = BTreeMap::new();
1467
1468    match desc.connection {
1469        GenericSourceConnection::Postgres(pg_source_connection) => {
1470            // Get PostgresConnection for generating subsources.
1471            let pg_connection = &pg_source_connection.connection;
1472
1473            let client = pg_connection
1474                .validate(connection_id, storage_configuration)
1475                .await?;
1476
1477            let reference_client = SourceReferenceClient::Postgres {
1478                client: &client,
1479                publication: &pg_source_connection.publication,
1480                database: &pg_connection.database,
1481            };
1482            let retrieved_source_references = reference_client.get_source_references().await?;
1483
1484            let postgres::PurifiedSourceExports {
1485                source_exports: subsources,
1486                normalized_text_columns,
1487            } = postgres::purify_source_exports(
1488                &client,
1489                &retrieved_source_references,
1490                &Some(ExternalReferences::SubsetTables(external_references)),
1491                text_columns,
1492                exclude_columns,
1493                &unresolved_source_name,
1494                &SourceReferencePolicy::Required,
1495            )
1496            .await?;
1497
1498            if let Some(text_cols_option) = options
1499                .iter_mut()
1500                .find(|option| option.name == AlterSourceAddSubsourceOptionName::TextColumns)
1501            {
1502                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
1503            }
1504
1505            requested_subsource_map.extend(subsources);
1506        }
1507        GenericSourceConnection::MySql(mysql_source_connection) => {
1508            let mysql_connection = &mysql_source_connection.connection;
1509            let config = mysql_connection
1510                .config(
1511                    &storage_configuration.connection_context.secrets_reader,
1512                    storage_configuration,
1513                    InTask::No,
1514                )
1515                .await?;
1516
1517            let mut conn = config
1518                .connect(
1519                    "mysql purification",
1520                    &storage_configuration.connection_context.ssh_tunnel_manager,
1521                )
1522                .await?;
1523
1524            // Retrieve the current @gtid_executed value of the server to mark as the effective
1525            // initial snapshot point for these subsources.
1526            let initial_gtid_set =
1527                mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?;
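            // `gtid_executed` is a GTID set such as
            // `3E11FA47-71CA-11E1-9E33-C80AA9429562:1-23` (illustrative value).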
1528
1529            let requested_references = Some(ExternalReferences::SubsetTables(external_references));
1530
1531            let reference_client = SourceReferenceClient::MySql {
1532                conn: &mut conn,
1533                include_system_schemas: mysql::references_system_schemas(&requested_references),
1534            };
1535            let retrieved_source_references = reference_client.get_source_references().await?;
1536
1537            let mysql::PurifiedSourceExports {
1538                source_exports: subsources,
1539                normalized_text_columns,
1540                normalized_exclude_columns,
1541            } = mysql::purify_source_exports(
1542                &mut conn,
1543                &retrieved_source_references,
1544                &requested_references,
1545                text_columns,
1546                exclude_columns,
1547                &unresolved_source_name,
1548                initial_gtid_set,
1549                &SourceReferencePolicy::Required,
1550            )
1551            .await?;
1552            requested_subsource_map.extend(subsources);
1553
1554            // Update options with the purified details
1555            if let Some(text_cols_option) = options
1556                .iter_mut()
1557                .find(|option| option.name == AlterSourceAddSubsourceOptionName::TextColumns)
1558            {
1559                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
1560            }
1561            if let Some(exclude_cols_option) = options
1562                .iter_mut()
1563                .find(|option| option.name == AlterSourceAddSubsourceOptionName::ExcludeColumns)
1564            {
1565                exclude_cols_option.value =
1566                    Some(WithOptionValue::Sequence(normalized_exclude_columns));
1567            }
1568        }
1569        GenericSourceConnection::SqlServer(sql_server_source) => {
1570            // Open a connection to the upstream SQL Server instance.
1571            let sql_server_connection = &sql_server_source.connection;
1572            let config = sql_server_connection
1573                .resolve_config(
1574                    &storage_configuration.connection_context.secrets_reader,
1575                    storage_configuration,
1576                    InTask::No,
1577                )
1578                .await?;
1579            let mut client = mz_sql_server_util::Client::connect(config).await?;
1580
1581            // Query the upstream SQL Server instance for available tables to replicate.
1582            let database = sql_server_connection.database.clone().into();
1583            let source_references = SourceReferenceClient::SqlServer {
1584                client: &mut client,
1585                database: Arc::clone(&database),
1586            }
1587            .get_source_references()
1588            .await?;
1589            let requested_references = Some(ExternalReferences::SubsetTables(external_references));
1590
1591            let timeout = mz_storage_types::sources::sql_server::MAX_LSN_WAIT
1592                .get(storage_configuration.config_set());
1593
1594            let result = sql_server::purify_source_exports(
1595                &*database,
1596                &mut client,
1597                &source_references,
1598                &requested_references,
1599                &text_columns,
1600                &exclude_columns,
1601                &unresolved_source_name,
1602                timeout,
1603                &SourceReferencePolicy::Required,
1604            )
1605            .await;
1606            let sql_server::PurifiedSourceExports {
1607                source_exports,
1608                normalized_text_columns,
1609                normalized_excl_columns,
1610            } = result?;
1611
1612            // Add the new exports to our subsource map.
1613            requested_subsource_map.extend(source_exports);
1614
1615            // Update options on the CREATE SOURCE statement with the purified details.
1616            if let Some(text_cols_option) = options
1617                .iter_mut()
1618                .find(|option| option.name == AlterSourceAddSubsourceOptionName::TextColumns)
1619            {
1620                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
1621            }
1622            if let Some(exclude_cols_option) = options
1623                .iter_mut()
1624                .find(|option| option.name == AlterSourceAddSubsourceOptionName::ExcludeColumns)
1625            {
1626                exclude_cols_option.value =
1627                    Some(WithOptionValue::Sequence(normalized_excl_columns));
1628            }
1629        }
1630        // The source kind was already established earlier in this function;
1631        // reaching this catch-all is an internal invariant violation.
1632        _ => bail_internal!("source does not support ALTER SOURCE...ADD SUBSOURCE"),
1633    };
1634
1635    Ok(PurifiedStatement::PurifiedAlterSourceAddSubsources {
1636        source_name: resolved_source_name,
1637        options,
1638        subsources: requested_subsource_map,
1639    })
1640}
1641
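/// Purifies `ALTER SOURCE ... REFRESH REFERENCES` by re-querying the upstream
/// system (or, for load generator and Kafka sources, recomputing locally) for the
/// currently available source references.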
1642async fn purify_alter_source_refresh_references(
1643    desc: SourceDesc,
1644    resolved_source_name: ResolvedItemName,
1645    storage_configuration: &StorageConfiguration,
1646) -> Result<PurifiedStatement, PlanError> {
1647    let retrieved_source_references = match desc.connection {
1648        GenericSourceConnection::Postgres(pg_source_connection) => {
1649            // Get PostgresConnection for generating subsources.
1650            let pg_connection = &pg_source_connection.connection;
1651
1652            let config = pg_connection
1653                .config(
1654                    &storage_configuration.connection_context.secrets_reader,
1655                    storage_configuration,
1656                    InTask::No,
1657                )
1658                .await?;
1659
1660            let client = config
1661                .connect(
1662                    "postgres_purification",
1663                    &storage_configuration.connection_context.ssh_tunnel_manager,
1664                )
1665                .await?;
1666            let reference_client = SourceReferenceClient::Postgres {
1667                client: &client,
1668                publication: &pg_source_connection.publication,
1669                database: &pg_connection.database,
1670            };
1671            reference_client.get_source_references().await?
1672        }
1673        GenericSourceConnection::MySql(mysql_source_connection) => {
1674            let mysql_connection = &mysql_source_connection.connection;
1675            let config = mysql_connection
1676                .config(
1677                    &storage_configuration.connection_context.secrets_reader,
1678                    storage_configuration,
1679                    InTask::No,
1680                )
1681                .await?;
1682
1683            let mut conn = config
1684                .connect(
1685                    "mysql purification",
1686                    &storage_configuration.connection_context.ssh_tunnel_manager,
1687                )
1688                .await?;
1689
1690            let reference_client = SourceReferenceClient::MySql {
1691                conn: &mut conn,
1692                include_system_schemas: false,
1693            };
1694            reference_client.get_source_references().await?
1695        }
1696        GenericSourceConnection::SqlServer(sql_server_source) => {
1697            // Open a connection to the upstream SQL Server instance.
1698            let sql_server_connection = &sql_server_source.connection;
1699            let config = sql_server_connection
1700                .resolve_config(
1701                    &storage_configuration.connection_context.secrets_reader,
1702                    storage_configuration,
1703                    InTask::No,
1704                )
1705                .await?;
1706            let mut client = mz_sql_server_util::Client::connect(config).await?;
1707
1708            // Query the upstream SQL Server instance for available tables to replicate.
1709            let source_references = SourceReferenceClient::SqlServer {
1710                client: &mut client,
1711                database: sql_server_connection.database.clone().into(),
1712            }
1713            .get_source_references()
1714            .await?;
1715            source_references
1716        }
1717        GenericSourceConnection::LoadGenerator(load_gen_connection) => {
1718            let reference_client = SourceReferenceClient::LoadGenerator {
1719                generator: &load_gen_connection.load_generator,
1720            };
1721            reference_client.get_source_references().await?
1722        }
1723        GenericSourceConnection::Kafka(kafka_conn) => {
1724            let reference_client = SourceReferenceClient::Kafka {
1725                topic: &kafka_conn.topic,
1726            };
1727            reference_client.get_source_references().await?
1728        }
1729    };
1730    Ok(PurifiedStatement::PurifiedAlterSourceRefreshReferences {
1731        source_name: resolved_source_name,
1732        available_source_references: retrieved_source_references.available_source_references(),
1733    })
1734}
1735
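/// Purifies a `CREATE TABLE ... FROM SOURCE` statement by resolving its external
/// reference, fetching upstream schema details, and filling in the statement's
/// columns, constraints, and `DETAILS` option.
///
/// For example (illustrative names), `CREATE TABLE t FROM SOURCE pg_src
/// (REFERENCE schema1.table_a)` is purified so that `t`'s columns and constraints
/// mirror the upstream definition of `schema1.table_a`.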
1736async fn purify_create_table_from_source(
1737    catalog: impl SessionCatalog,
1738    mut stmt: CreateTableFromSourceStatement<Aug>,
1739    storage_configuration: &StorageConfiguration,
1740) -> Result<PurifiedStatement, PlanError> {
1741    let scx = StatementContext::new(None, &catalog);
1742    let CreateTableFromSourceStatement {
1743        name: _,
1744        columns,
1745        constraints,
1746        source: source_name,
1747        if_not_exists: _,
1748        external_reference,
1749        format,
1750        envelope,
1751        include_metadata: _,
1752        with_options,
1753    } = &mut stmt;
1754
1755    // Columns and constraints cannot be specified by the user but will be populated below.
1756    if matches!(columns, TableFromSourceColumns::Defined(_)) {
1757        sql_bail!("CREATE TABLE .. FROM SOURCE column definitions cannot be specified directly");
1758    }
1759    if !constraints.is_empty() {
1760        sql_bail!(
1761            "CREATE TABLE .. FROM SOURCE constraint definitions cannot be specified directly"
1762        );
1763    }
1764
1765    // Get the source item
1766    let item = match scx.get_item_by_resolved_name(source_name) {
1767        Ok(item) => item,
1768        Err(e) => return Err(e),
1769    };
1770
1771    // Ensure it's an ingestion-based and alterable source.
1772    let desc = match item.source_desc()? {
1773        Some(desc) => desc.clone().into_inline_connection(scx.catalog),
1774        None => {
1775            sql_bail!("cannot ALTER this type of source")
1776        }
1777    };
1778    let unresolved_source_name: UnresolvedItemName = source_name.full_item_name().clone().into();
1779
1780    let crate::plan::statement::ddl::TableFromSourceOptionExtracted {
1781        text_columns,
1782        exclude_columns,
1783        retain_history: _,
1784        details,
1785        partition_by: _,
1786        seen: _,
1787    } = with_options.clone().try_into()?;
1788    if details.is_some() {
1789        sql_bail!("DETAILS option cannot be explicitly set");
1790    }
1791
1792    // Our text column values are unqualified (just column names), but the purification methods below
1793    // expect to match fully-qualified names against the full set of upstream tables, so we
1794    // need to qualify them using the external reference first.
1795    let qualified_text_columns = text_columns
1796        .iter()
1797        .map(|col| {
1798            UnresolvedItemName(
1799                external_reference
1800                    .as_ref()
1801                    .map(|er| er.0.iter().chain_one(col).map(|i| i.clone()).collect())
1802                    .unwrap_or_else(|| vec![col.clone()]),
1803            )
1804        })
1805        .collect_vec();
1806    let qualified_exclude_columns = exclude_columns
1807        .iter()
1808        .map(|col| {
1809            UnresolvedItemName(
1810                external_reference
1811                    .as_ref()
1812                    .map(|er| er.0.iter().chain_one(col).map(|i| i.clone()).collect())
1813                    .unwrap_or_else(|| vec![col.clone()]),
1814            )
1815        })
1816        .collect_vec();
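    // For example (illustrative names): with an external reference of
    // `schema1.table_a`, a `TEXT COLUMNS (col1)` option is qualified here to
    // `schema1.table_a.col1` before being matched against the upstream tables.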
1817
1818    // This is overridden below if a source-specific format is required.
1819    let mut format_options = SourceFormatOptions::Default;
1820
1821    let retrieved_source_references: RetrievedSourceReferences;
1822
1823    let requested_references = external_reference.as_ref().map(|ref_name| {
1824        ExternalReferences::SubsetTables(vec![ExternalReferenceExport {
1825            reference: ref_name.clone(),
1826            alias: None,
1827        }])
1828    });
1829
1830    // Run purification work specific to each source type: resolve the external reference to
1831    // a fully qualified name and obtain the appropriate details for the source-export statement
1832    let purified_export = match desc.connection {
1833        GenericSourceConnection::Postgres(pg_source_connection) => {
1834            // Get PostgresConnection for generating subsources.
1835            let pg_connection = &pg_source_connection.connection;
1836
1837            let client = pg_connection
1838                .validate(pg_source_connection.connection_id, storage_configuration)
1839                .await?;
1840
1841            let reference_client = SourceReferenceClient::Postgres {
1842                client: &client,
1843                publication: &pg_source_connection.publication,
1844                database: &pg_connection.database,
1845            };
1846            retrieved_source_references = reference_client.get_source_references().await?;
1847
1848            let postgres::PurifiedSourceExports {
1849                source_exports,
1850                // TODO(database-issues#8620): Remove once subsources are removed
1851                // This `normalized_text_columns` is not relevant for us and is only returned for
1852                // `CREATE SOURCE` statements that automatically generate subsources
1853                normalized_text_columns: _,
1854            } = postgres::purify_source_exports(
1855                &client,
1856                &retrieved_source_references,
1857                &requested_references,
1858                qualified_text_columns,
1859                qualified_exclude_columns,
1860                &unresolved_source_name,
1861                &SourceReferencePolicy::Required,
1862            )
1863            .await?;
1864            // There should be exactly one source_export returned for this statement
1865            let (_, purified_export) = source_exports.into_element();
1866            purified_export
1867        }
1868        GenericSourceConnection::MySql(mysql_source_connection) => {
1869            let mysql_connection = &mysql_source_connection.connection;
1870            let config = mysql_connection
1871                .config(
1872                    &storage_configuration.connection_context.secrets_reader,
1873                    storage_configuration,
1874                    InTask::No,
1875                )
1876                .await?;
1877
1878            let mut conn = config
1879                .connect(
1880                    "mysql purification",
1881                    &storage_configuration.connection_context.ssh_tunnel_manager,
1882                )
1883                .await?;
1884
1885            // Retrieve the current @gtid_executed value of the server to mark as the effective
1886            // initial snapshot point for this table.
1887            let initial_gtid_set =
1888                mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?;
1889
1890            let reference_client = SourceReferenceClient::MySql {
1891                conn: &mut conn,
1892                include_system_schemas: mysql::references_system_schemas(&requested_references),
1893            };
1894            retrieved_source_references = reference_client.get_source_references().await?;
1895
1896            let mysql::PurifiedSourceExports {
1897                source_exports,
1898                // TODO(database-issues#8620): Remove once subsources are removed
1899                // `normalized_text/exclude_columns` are not relevant for us and are only returned for
1900                // `CREATE SOURCE` statements that automatically generate subsources
1901                normalized_text_columns: _,
1902                normalized_exclude_columns: _,
1903            } = mysql::purify_source_exports(
1904                &mut conn,
1905                &retrieved_source_references,
1906                &requested_references,
1907                qualified_text_columns,
1908                qualified_exclude_columns,
1909                &unresolved_source_name,
1910                initial_gtid_set,
1911                &SourceReferencePolicy::Required,
1912            )
1913            .await?;
1914            // There should be exactly one source_export returned for this statement
1915            let (_, purified_export) = source_exports.into_element();
1916            purified_export
1917        }
1918        GenericSourceConnection::SqlServer(sql_server_source) => {
1919            let connection = sql_server_source.connection;
1920            let config = connection
1921                .resolve_config(
1922                    &storage_configuration.connection_context.secrets_reader,
1923                    storage_configuration,
1924                    InTask::No,
1925                )
1926                .await?;
1927            let mut client = mz_sql_server_util::Client::connect(config).await?;
1928
1929            let database: Arc<str> = connection.database.into();
1930            let reference_client = SourceReferenceClient::SqlServer {
1931                client: &mut client,
1932                database: Arc::clone(&database),
1933            };
1934            retrieved_source_references = reference_client.get_source_references().await?;
1935            tracing::debug!(?retrieved_source_references, "got source references");
1936
1937            let timeout = mz_storage_types::sources::sql_server::MAX_LSN_WAIT
1938                .get(storage_configuration.config_set());
1939
1940            let purified_source_exports = sql_server::purify_source_exports(
1941                &*database,
1942                &mut client,
1943                &retrieved_source_references,
1944                &requested_references,
1945                &qualified_text_columns,
1946                &qualified_exclude_columns,
1947                &unresolved_source_name,
1948                timeout,
1949                &SourceReferencePolicy::Required,
1950            )
1951            .await?;
1952
1953            // There should be exactly one source_export returned for this statement
1954            let (_, purified_export) = purified_source_exports.source_exports.into_element();
1955            purified_export
1956        }
1957        GenericSourceConnection::LoadGenerator(load_gen_connection) => {
1958            let reference_client = SourceReferenceClient::LoadGenerator {
1959                generator: &load_gen_connection.load_generator,
1960            };
1961            retrieved_source_references = reference_client.get_source_references().await?;
1962
1963            let requested_exports = retrieved_source_references
1964                .requested_source_exports(requested_references.as_ref(), &unresolved_source_name)?;
1965            // There should be exactly one source_export returned
1966            let export = requested_exports.into_element();
1967            PurifiedSourceExport {
1968                external_reference: export.external_reference,
1969                details: PurifiedExportDetails::LoadGenerator {
1970                    table: export
1971                        .meta
1972                        .load_generator_desc()
1973                        .ok_or_else(|| internal_err!("expected load generator source reference"))?
1974                        .clone(),
1975                    output: export
1976                        .meta
1977                        .load_generator_output()
1978                        .ok_or_else(|| internal_err!("expected load generator source reference"))?
1979                        .clone(),
1980                },
1981            }
1982        }
1983        GenericSourceConnection::Kafka(kafka_conn) => {
1984            let reference_client = SourceReferenceClient::Kafka {
1985                topic: &kafka_conn.topic,
1986            };
1987            retrieved_source_references = reference_client.get_source_references().await?;
1988            let requested_exports = retrieved_source_references
1989                .requested_source_exports(requested_references.as_ref(), &unresolved_source_name)?;
1990            // There should be exactly one source_export returned
1991            let export = requested_exports.into_element();
1992
1993            format_options = SourceFormatOptions::Kafka {
1994                topic: kafka_conn.topic.clone(),
1995            };
1996            PurifiedSourceExport {
1997                external_reference: export.external_reference,
1998                details: PurifiedExportDetails::Kafka {},
1999            }
2000        }
2001    };
2002
2003    purify_source_format(
2004        &catalog,
2005        format,
2006        &format_options,
2007        envelope,
2008        storage_configuration,
2009    )
2010    .await?;
2011
2012    // Update the external reference in the statement to the resolved fully-qualified
2013    // external reference
2014    *external_reference = Some(purified_export.external_reference.clone());
2015
2016    // Update options in the statement using the purified export details
2017    match &purified_export.details {
2018        PurifiedExportDetails::Postgres { .. } => {
2019            let mut unsupported_cols = vec![];
2020            let postgres::PostgresExportStatementValues {
2021                columns: gen_columns,
2022                constraints: gen_constraints,
2023                text_columns: gen_text_columns,
2024                exclude_columns: gen_exclude_columns,
2025                details: gen_details,
2026                external_reference: _,
2027            } = postgres::generate_source_export_statement_values(
2028                &scx,
2029                purified_export,
2030                &mut unsupported_cols,
2031            )?;
2032            if !unsupported_cols.is_empty() {
2033                unsupported_cols.sort();
2034                Err(PgSourcePurificationError::UnrecognizedTypes {
2035                    cols: unsupported_cols,
2036                })?;
2037            }
2038
2039            if let Some(text_cols_option) = with_options
2040                .iter_mut()
2041                .find(|option| option.name == TableFromSourceOptionName::TextColumns)
2042            {
2043                match gen_text_columns {
2044                    Some(gen_text_columns) => {
2045                        text_cols_option.value = Some(WithOptionValue::Sequence(gen_text_columns))
2046                    }
2047                    None => soft_panic_or_log!(
2048                        "text_columns should be Some if text_cols_option is present"
2049                    ),
2050                }
2051            }
2052            if let Some(exclude_cols_option) = with_options
2053                .iter_mut()
2054                .find(|option| option.name == TableFromSourceOptionName::ExcludeColumns)
2055            {
2056                match gen_exclude_columns {
2057                    Some(gen_exclude_columns) => {
2058                        exclude_cols_option.value =
2059                            Some(WithOptionValue::Sequence(gen_exclude_columns))
2060                    }
2061                    None => soft_panic_or_log!(
2062                        "exclude_columns should be Some if exclude_cols_option is present"
2063                    ),
2064                }
2065            }
2066            match columns {
2067                TableFromSourceColumns::Defined(_) => {
2068                    // Purification is what populates the `Defined` variant;
2069                    // reaching this is an internal invariant violation.
2070                    bail_internal!(
2071                        "column definitions cannot be explicitly set for this source type"
2072                    )
2073                }
2074                TableFromSourceColumns::NotSpecified => {
2075                    *columns = TableFromSourceColumns::Defined(gen_columns);
2076                    *constraints = gen_constraints;
2077                }
2078                TableFromSourceColumns::Named(_) => {
2079                    sql_bail!("columns cannot be named for Postgres sources")
2080                }
2081            }
2082            with_options.push(TableFromSourceOption {
2083                name: TableFromSourceOptionName::Details,
2084                value: Some(WithOptionValue::Value(Value::String(hex::encode(
2085                    gen_details.into_proto().encode_to_vec(),
2086                )))),
2087            })
2088        }
2089        PurifiedExportDetails::MySql { .. } => {
2090            let mysql::MySqlExportStatementValues {
2091                columns: gen_columns,
2092                constraints: gen_constraints,
2093                text_columns: gen_text_columns,
2094                exclude_columns: gen_exclude_columns,
2095                details: gen_details,
2096                external_reference: _,
2097            } = mysql::generate_source_export_statement_values(&scx, purified_export)?;
2098
2099            if let Some(text_cols_option) = with_options
2100                .iter_mut()
2101                .find(|option| option.name == TableFromSourceOptionName::TextColumns)
2102            {
2103                match gen_text_columns {
2104                    Some(gen_text_columns) => {
2105                        text_cols_option.value = Some(WithOptionValue::Sequence(gen_text_columns))
2106                    }
2107                    None => soft_panic_or_log!(
2108                        "text_columns should be Some if text_cols_option is present"
2109                    ),
2110                }
2111            }
2112            if let Some(exclude_cols_option) = with_options
2113                .iter_mut()
2114                .find(|option| option.name == TableFromSourceOptionName::ExcludeColumns)
2115            {
2116                match gen_exclude_columns {
2117                    Some(gen_exclude_columns) => {
2118                        exclude_cols_option.value =
2119                            Some(WithOptionValue::Sequence(gen_exclude_columns))
2120                    }
2121                    None => soft_panic_or_log!(
2122                        "exclude_columns should be Some if exclude_cols_option is present"
2123                    ),
2124                }
2125            }
2126            match columns {
2127                TableFromSourceColumns::Defined(_) => {
2128                    // Purification is what populates the `Defined` variant;
2129                    // reaching this is an internal invariant violation.
2130                    bail_internal!(
2131                        "column definitions cannot be explicitly set for this source type"
2132                    )
2133                }
2134                TableFromSourceColumns::NotSpecified => {
2135                    *columns = TableFromSourceColumns::Defined(gen_columns);
2136                    *constraints = gen_constraints;
2137                }
2138                TableFromSourceColumns::Named(_) => {
2139                    sql_bail!("columns cannot be named for MySQL sources")
2140                }
2141            }
2142            with_options.push(TableFromSourceOption {
2143                name: TableFromSourceOptionName::Details,
2144                value: Some(WithOptionValue::Value(Value::String(hex::encode(
2145                    gen_details.into_proto().encode_to_vec(),
2146                )))),
2147            })
2148        }
2149        PurifiedExportDetails::SqlServer { .. } => {
2150            let sql_server::SqlServerExportStatementValues {
2151                columns: gen_columns,
2152                constraints: gen_constraints,
2153                text_columns: gen_text_columns,
2154                excl_columns: gen_excl_columns,
2155                details: gen_details,
2156                external_reference: _,
2157            } = sql_server::generate_source_export_statement_values(&scx, purified_export)?;
2158
2159            if let Some(text_cols_option) = with_options
2160                .iter_mut()
2161                .find(|opt| opt.name == TableFromSourceOptionName::TextColumns)
2162            {
2163                match gen_text_columns {
2164                    Some(gen_text_columns) => {
2165                        text_cols_option.value = Some(WithOptionValue::Sequence(gen_text_columns))
2166                    }
2167                    None => soft_panic_or_log!(
2168                        "text_columns should be Some if text_cols_option is present"
2169                    ),
2170                }
2171            }
2172            if let Some(exclude_cols_option) = with_options
2173                .iter_mut()
2174                .find(|opt| opt.name == TableFromSourceOptionName::ExcludeColumns)
2175            {
2176                match gen_excl_columns {
2177                    Some(gen_excl_columns) => {
2178                        exclude_cols_option.value =
2179                            Some(WithOptionValue::Sequence(gen_excl_columns))
2180                    }
2181                    None => soft_panic_or_log!(
2182                        "excl_columns should be Some if exclude_cols_option is present"
2183                    ),
2184                }
2185            }
2186
2187            match columns {
2188                TableFromSourceColumns::NotSpecified => {
2189                    *columns = TableFromSourceColumns::Defined(gen_columns);
2190                    *constraints = gen_constraints;
2191                }
2192                TableFromSourceColumns::Named(_) => {
2193                    sql_bail!("columns cannot be named for SQL Server sources")
2194                }
2195                TableFromSourceColumns::Defined(_) => {
2196                    // Purification is what populates the `Defined` variant;
2197                    // reaching this is an internal invariant violation.
2198                    bail_internal!(
2199                        "column definitions cannot be explicitly set for this source type"
2200                    )
2201                }
2202            }
2203
2204            with_options.push(TableFromSourceOption {
2205                name: TableFromSourceOptionName::Details,
2206                value: Some(WithOptionValue::Value(Value::String(hex::encode(
2207                    gen_details.into_proto().encode_to_vec(),
2208                )))),
2209            })
2210        }
2211        PurifiedExportDetails::LoadGenerator { .. } => {
2212            let (desc, output) = match purified_export.details {
2213                PurifiedExportDetails::LoadGenerator { table, output } => (table, output),
2214                _ => bail_internal!("purified export details must be load generator"),
2215            };
2216            // We only determine the table description for multi-output load generator sources here,
2217            // whereas single-output load generators will have their relation description
2218            // determined during statement planning, as envelope and format options may affect their
2219            // schema.
2220            if let Some(desc) = desc {
2221                let (gen_columns, gen_constraints) = scx.relation_desc_into_table_defs(&desc)?;
2222                match columns {
2223                    // Purification is what populates the `Defined` variant;
2224                    // reaching this is an internal invariant violation.
2225                    TableFromSourceColumns::Defined(_) => bail_internal!(
2226                        "column definitions cannot be explicitly set for this source type"
2227                    ),
2228                    TableFromSourceColumns::NotSpecified => {
2229                        *columns = TableFromSourceColumns::Defined(gen_columns);
2230                        *constraints = gen_constraints;
2231                    }
2232                    TableFromSourceColumns::Named(_) => {
2233                        sql_bail!("columns cannot be named for multi-output load generator sources")
2234                    }
2235                }
2236            }
2237            let details = SourceExportStatementDetails::LoadGenerator { output };
2238            with_options.push(TableFromSourceOption {
2239                name: TableFromSourceOptionName::Details,
2240                value: Some(WithOptionValue::Value(Value::String(hex::encode(
2241                    details.into_proto().encode_to_vec(),
2242                )))),
2243            })
2244        }
2245        PurifiedExportDetails::Kafka {} => {
2246            // NOTE: Kafka tables have their 'schemas' purified into the statement inside the
2247            // format field, so we don't specify any columns or constraints to be stored
2248            // on the statement here. The RelationDesc will be determined during planning.
2249            let details = SourceExportStatementDetails::Kafka {};
2250            with_options.push(TableFromSourceOption {
2251                name: TableFromSourceOptionName::Details,
2252                value: Some(WithOptionValue::Value(Value::String(hex::encode(
2253                    details.into_proto().encode_to_vec(),
2254                )))),
2255            })
2256        }
2257    };
2258
2259    // TODO: We might as well use the retrieved available references to update the source
2260    // available references table in the catalog, so plumb this through.
2261    // available_source_references: retrieved_source_references.available_source_references(),
2262    Ok(PurifiedStatement::PurifiedCreateTableFromSource { stmt })
2263}
2264
2265enum SourceFormatOptions {
2266    Default,
2267    Kafka { topic: String },
2268}
2269
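/// Purifies the format specifier of a source or table statement, fetching any
/// Confluent Schema Registry schemas so that the statement carries inlined schema
/// seeds rather than requiring network access at plan time. `KEY/VALUE` format
/// specifiers are only accepted for Kafka sources.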
2270async fn purify_source_format(
2271    catalog: &dyn SessionCatalog,
2272    format: &mut Option<FormatSpecifier<Aug>>,
2273    options: &SourceFormatOptions,
2274    envelope: &Option<SourceEnvelope>,
2275    storage_configuration: &StorageConfiguration,
2276) -> Result<(), PlanError> {
2277    if matches!(format, Some(FormatSpecifier::KeyValue { .. }))
2278        && !matches!(options, SourceFormatOptions::Kafka { .. })
2279    {
2280        sql_bail!("Kafka sources are the only source type that can provide KEY/VALUE formats")
2281    }
2282
2283    match format.as_mut() {
2284        None => {}
2285        Some(FormatSpecifier::Bare(format)) => {
2286            purify_source_format_single(catalog, format, options, envelope, storage_configuration)
2287                .await?;
2288        }
2289
2290        Some(FormatSpecifier::KeyValue { key, value: val }) => {
2291            purify_source_format_single(catalog, key, options, envelope, storage_configuration)
2292                .await?;
2293            purify_source_format_single(catalog, val, options, envelope, storage_configuration)
2294                .await?;
2295        }
2296    }
2297    Ok(())
2298}
2299
2300async fn purify_source_format_single(
2301    catalog: &dyn SessionCatalog,
2302    format: &mut Format<Aug>,
2303    options: &SourceFormatOptions,
2304    envelope: &Option<SourceEnvelope>,
2305    storage_configuration: &StorageConfiguration,
2306) -> Result<(), PlanError> {
2307    match format {
2308        Format::Avro(schema) => match schema {
2309            AvroSchema::Csr { csr_connection } => {
2310                purify_csr_connection_avro(
2311                    catalog,
2312                    options,
2313                    csr_connection,
2314                    envelope,
2315                    storage_configuration,
2316                )
2317                .await?
2318            }
2319            AvroSchema::InlineSchema { .. } => {}
2320        },
2321        Format::Protobuf(schema) => match schema {
2322            ProtobufSchema::Csr { csr_connection } => {
2323                purify_csr_connection_proto(
2324                    catalog,
2325                    options,
2326                    csr_connection,
2327                    envelope,
2328                    storage_configuration,
2329                )
2330                .await?;
2331            }
2332            ProtobufSchema::InlineSchema { .. } => {}
2333        },
2334        Format::Bytes
2335        | Format::Regex(_)
2336        | Format::Json { .. }
2337        | Format::Text
2338        | Format::Csv { .. } => (),
2339    }
2340    Ok(())
2341}
2342
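/// Generates `CREATE SUBSOURCE` statements for the given purified exports,
/// dispatching on the connection type implied by the exports' details.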
2343pub fn generate_subsource_statements(
2344    scx: &StatementContext,
2345    source_name: ResolvedItemName,
2346    subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
2347) -> Result<Vec<CreateSubsourceStatement<Aug>>, PlanError> {
2348    // Get the first subsource to determine the connection type.
2349    if subsources.is_empty() {
2350        return Ok(vec![]);
2351    }
2352    let (_, purified_export) = subsources
2353        .iter()
2354        .next()
2355        .ok_or_else(|| internal_err!("expected at least one subsource"))?;
2356
2357    let statements = match &purified_export.details {
2358        PurifiedExportDetails::Postgres { .. } => {
2359            crate::pure::postgres::generate_create_subsource_statements(
2360                scx,
2361                source_name,
2362                subsources,
2363            )?
2364        }
2365        PurifiedExportDetails::MySql { .. } => {
2366            crate::pure::mysql::generate_create_subsource_statements(scx, source_name, subsources)?
2367        }
2368        PurifiedExportDetails::SqlServer { .. } => {
2369            crate::pure::sql_server::generate_create_subsource_statements(
2370                scx,
2371                source_name,
2372                subsources,
2373            )?
2374        }
2375        PurifiedExportDetails::LoadGenerator { .. } => {
2376            let mut subsource_stmts = Vec::with_capacity(subsources.len());
2377            for (subsource_name, purified_export) in subsources {
2378                let (desc, output) = match purified_export.details {
2379                    PurifiedExportDetails::LoadGenerator { table, output } => (table, output),
2380                    _ => {
2381                        bail_internal!("purified export details must be load generator")
2382                    }
2383                };
2384                let desc = desc.ok_or_else(|| {
2385                    internal_err!(
2386                        "subsources cannot be generated for single-output load generators"
2387                    )
2388                })?;
2389
2390                let (columns, table_constraints) = scx.relation_desc_into_table_defs(&desc)?;
2391                let details = SourceExportStatementDetails::LoadGenerator { output };
2392                // Create the subsource statement
2393                let subsource = CreateSubsourceStatement {
2394                    name: subsource_name,
2395                    columns,
2396                    of_source: Some(source_name.clone()),
2397                    // Unlike sources that come from an external upstream, we
2398                    // have more leniency to introduce different constraints
2399                    // every time the load generator is run; i.e. we are not as
2400                    // worried about introducing junk data.
2401                    constraints: table_constraints,
2402                    if_not_exists: false,
2403                    with_options: vec![
2404                        CreateSubsourceOption {
2405                            name: CreateSubsourceOptionName::ExternalReference,
2406                            value: Some(WithOptionValue::UnresolvedItemName(
2407                                purified_export.external_reference,
2408                            )),
2409                        },
2410                        CreateSubsourceOption {
2411                            name: CreateSubsourceOptionName::Details,
2412                            value: Some(WithOptionValue::Value(Value::String(hex::encode(
2413                                details.into_proto().encode_to_vec(),
2414                            )))),
2415                        },
2416                    ],
2417                };
2418                subsource_stmts.push(subsource);
2419            }
2420
2421            subsource_stmts
2422        }
2423        PurifiedExportDetails::Kafka { .. } => {
2424            // TODO: as part of database-issues#8322, Kafka sources will begin
2425            // producing data; we'll need to understand the schema
2426            // of the output here.
2427            if !subsources.is_empty() {
2428                bail_internal!("Kafka sources do not produce data-bearing subsources");
2429            }
2430            vec![]
2431        }
2432    };
2433    Ok(statements)
2434}
2435
2436async fn purify_csr_connection_proto(
2437    catalog: &dyn SessionCatalog,
2438    options: &SourceFormatOptions,
2439    csr_connection: &mut CsrConnectionProtobuf<Aug>,
2440    envelope: &Option<SourceEnvelope>,
2441    storage_configuration: &StorageConfiguration,
2442) -> Result<(), PlanError> {
2443    let SourceFormatOptions::Kafka { topic } = options else {
2444        sql_bail!("Confluent Schema Registry is only supported with Kafka sources")
2445    };
2446
2447    let CsrConnectionProtobuf {
2448        seed,
2449        connection: CsrConnection {
2450            connection,
2451            options: _,
2452        },
2453    } = csr_connection;
2454    match seed {
2455        None => {
2456            let scx = StatementContext::new(None, &*catalog);
2457
2458            let ccsr_connection = match scx.get_item_by_resolved_name(connection)?.connection()? {
2459                Connection::Csr(connection) => connection.clone().into_inline_connection(catalog),
2460                _ => sql_bail!("{} is not a schema registry connection", connection),
2461            };
2462
2463            let ccsr_client = ccsr_connection
2464                .connect(storage_configuration, InTask::No)
2465                .await
2466                .map_err(|e| CsrPurificationError::ClientError(Arc::new(e)))?;
2467
2468            let value = compile_proto(&format!("{}-value", topic), &ccsr_client).await?;
2469            let key = compile_proto(&format!("{}-key", topic), &ccsr_client)
2470                .await
2471                .ok();
2472
2473            if matches!(envelope, Some(SourceEnvelope::Debezium)) && key.is_none() {
2474                sql_bail!("Key schema is required for ENVELOPE DEBEZIUM");
2475            }
2476
2477            *seed = Some(CsrSeedProtobuf { value, key });
2478        }
2479        Some(_) => (),
2480    }
2481
2482    Ok(())
2483}
2484
2485async fn purify_csr_connection_avro(
2486    catalog: &dyn SessionCatalog,
2487    options: &SourceFormatOptions,
2488    csr_connection: &mut CsrConnectionAvro<Aug>,
2489    envelope: &Option<SourceEnvelope>,
2490    storage_configuration: &StorageConfiguration,
2491) -> Result<(), PlanError> {
2492    let SourceFormatOptions::Kafka { topic } = options else {
2493        sql_bail!("Confluent Schema Registry is only supported with Kafka sources")
2494    };
2495
2496    let CsrConnectionAvro {
2497        connection: CsrConnection { connection, .. },
2498        seed,
2499        key_strategy,
2500        value_strategy,
2501    } = csr_connection;
2502    if seed.is_none() {
2503        let scx = StatementContext::new(None, &*catalog);
2504        let csr_connection = match scx.get_item_by_resolved_name(connection)?.connection()? {
2505            Connection::Csr(connection) => connection.clone().into_inline_connection(catalog),
2506            _ => sql_bail!("{} is not a schema registry connection", connection),
2507        };
2508        let ccsr_client = csr_connection
2509            .connect(storage_configuration, InTask::No)
2510            .await
2511            .map_err(|e| CsrPurificationError::ClientError(Arc::new(e)))?;
2512
2513        let Schema {
2514            key_schema,
2515            value_schema,
2516            key_reference_schemas,
2517            value_reference_schemas,
2518        } = get_remote_csr_schema(
2519            &ccsr_client,
2520            key_strategy.clone().unwrap_or_default(),
2521            value_strategy.clone().unwrap_or_default(),
2522            topic,
2523        )
2524        .await?;
2525        if matches!(envelope, Some(SourceEnvelope::Debezium)) && key_schema.is_none() {
2526            sql_bail!("Key schema is required for ENVELOPE DEBEZIUM");
2527        }
2528
2529        *seed = Some(CsrSeedAvro {
2530            key_schema,
2531            value_schema,
2532            key_reference_schemas,
2533            value_reference_schemas,
2534        })
2535    }
2536
2537    Ok(())
2538}
2539
2540#[derive(Debug)]
2541pub struct Schema {
2542    pub key_schema: Option<String>,
2543    pub value_schema: String,
2544    /// Reference schemas for the key schema, in dependency order.
2545    pub key_reference_schemas: Vec<String>,
2546    /// Reference schemas for the value schema, in dependency order.
2547    pub value_reference_schemas: Vec<String>,
2548}
2549
2550/// Result of fetching a schema, including any referenced schemas.
2551struct SchemaWithReferences {
2552    /// The primary schema.
2553    schema: String,
2554    /// Reference schemas in dependency order.
2555    references: Vec<String>,
2556}
2557
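/// Fetches a schema (and its references) for `subject` according to the reader
/// schema selection strategy: `Latest` and `ById` query the registry, while
/// `Inline` uses the provided schema verbatim with no references.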
2558async fn get_schema_with_strategy(
2559    client: &Client,
2560    strategy: ReaderSchemaSelectionStrategy,
2561    subject: &str,
2562) -> Result<Option<SchemaWithReferences>, PlanError> {
2563    match strategy {
2564        ReaderSchemaSelectionStrategy::Latest => {
2565            // Use `get_subject_and_references` to also fetch referenced schemas.
2566            match client.get_subject_and_references(subject).await {
2567                Ok((primary, dependencies)) => Ok(Some(SchemaWithReferences {
2568                    schema: primary.schema.raw,
2569                    references: dependencies.into_iter().map(|s| s.schema.raw).collect(),
2570                })),
2571                Err(GetBySubjectError::SubjectNotFound)
2572                | Err(GetBySubjectError::VersionNotFound(_)) => Ok(None),
2573                Err(e) => Err(PlanError::FetchingCsrSchemaFailed {
2574                    schema_lookup: format!("subject {}", subject.quoted()),
2575                    cause: Arc::new(e),
2576                }),
2577            }
2578        }
2579        // With an inline schema there is no subject or schema id to look up, so there is no
2580        // clean way to fetch the schemas it references. This could be done with extra work,
2581        // but at this point it isn't clear that it's needed.
2582        ReaderSchemaSelectionStrategy::Inline(raw) => Ok(Some(SchemaWithReferences {
2583            schema: raw,
2584            references: vec![],
2585        })),
2586        ReaderSchemaSelectionStrategy::ById(id) => {
2587            match client.get_subject_and_references_by_id(id).await {
2588                Ok((primary, dependencies)) => Ok(Some(SchemaWithReferences {
2589                    schema: primary.schema.raw,
2590                    references: dependencies.into_iter().map(|s| s.schema.raw).collect(),
2591                })),
2592                Err(GetBySubjectError::SubjectNotFound)
2593                | Err(GetBySubjectError::VersionNotFound(_)) => Ok(None),
2594                Err(e) => Err(PlanError::FetchingCsrSchemaFailed {
2595                    schema_lookup: format!("ID {}", id),
2596                    cause: Arc::new(e),
2597                }),
2598            }
2599        }
2600    }
2601}
2602
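/// Fetches the value and (optional) key schemas for `topic` from the schema registry, along
/// with any schemas they reference. A missing value schema is an error; a missing key schema
/// is not.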
2603async fn get_remote_csr_schema(
2604    ccsr_client: &mz_ccsr::Client,
2605    key_strategy: ReaderSchemaSelectionStrategy,
2606    value_strategy: ReaderSchemaSelectionStrategy,
2607    topic: &str,
2608) -> Result<Schema, PlanError> {
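    // Subject names follow Confluent's default TopicNameStrategy: the value schema lives under
    // `<topic>-value` and the key schema (if any) under `<topic>-key`.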
2609    let value_subject = format!("{}-value", topic);
2610    let value_result =
2611        get_schema_with_strategy(ccsr_client, value_strategy, &value_subject).await?;
2612    let value_result = value_result.ok_or_else(|| anyhow!("No value schema found"))?;
2613
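    // Keyless topics simply have no `<topic>-key` subject, so a missing key schema is not an
    // error here; we just record no key schema and no key references.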
2614    let key_subject = format!("{}-key", topic);
2615    let key_result = get_schema_with_strategy(ccsr_client, key_strategy, &key_subject).await?;
2616    Ok(Schema {
2617        key_schema: key_result.as_ref().map(|r| r.schema.clone()),
2618        value_schema: value_result.schema,
2619        key_reference_schemas: key_result.map(|r| r.references).unwrap_or_default(),
2620        value_reference_schemas: value_result.references,
2621    })
2622}
2623
2624/// Fetches a Protobuf schema (and any schemas it references) from the CSR and compiles it into a file descriptor set.
2625async fn compile_proto(
2626    subject_name: &String,
2627    ccsr_client: &Client,
2628) -> Result<CsrSeedProtobufSchema, PlanError> {
2629    let (primary_subject, dependency_subjects) = ccsr_client
2630        .get_subject_and_references(subject_name)
2631        .await
2632        .map_err(|e| PlanError::FetchingCsrSchemaFailed {
2633            schema_lookup: format!("subject {}", subject_name.quoted()),
2634            cause: Arc::new(e),
2635        })?;
2636
2637    // Compile .proto files into a file descriptor set.
2638    let mut source_tree = VirtualSourceTree::new();
2639
2640    // Add well-known types (e.g., google/protobuf/timestamp.proto) to the source
2641    // tree. These are implicitly available to protoc but are typically not
2642    // registered in the schema registry.
2643    source_tree.as_mut().map_well_known_types();
2644
2645    for subject in iter::once(&primary_subject).chain(dependency_subjects.iter()) {
2646        source_tree.as_mut().add_file(
2647            Path::new(&subject.name),
2648            subject.schema.raw.as_bytes().to_vec(),
2649        );
2650    }
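    // Resolve imports across the files registered above and build a file descriptor set
    // rooted at the primary subject's file.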
2651    let mut db = SourceTreeDescriptorDatabase::new(source_tree.as_mut());
2652    let fds = db
2653        .as_mut()
2654        .build_file_descriptor_set(&[Path::new(&primary_subject.name)])
2655        .map_err(|cause| PlanError::InvalidProtobufSchema { cause })?;
2656
2657    // Ensure there is exactly one message in the file.
2658    let primary_fd = fds.file(0);
2659    let message_name = match primary_fd.message_type_size() {
2660        1 => String::from_utf8_lossy(primary_fd.message_type(0).name()).into_owned(),
2661        0 => bail_unsupported!(29603, "Protobuf schemas with no messages"),
2662        _ => bail_unsupported!(29603, "Protobuf schemas with multiple messages"),
2663    };
2664
2665    // Encode the file descriptor set into a SQL byte string.
2666    let bytes = &fds
2667        .serialize()
2668        .map_err(|cause| PlanError::InvalidProtobufSchema { cause })?;
2669    let mut schema = String::new();
2670    strconv::format_bytes(&mut schema, bytes);
2671
2672    Ok(CsrSeedProtobufSchema {
2673        schema,
2674        message_name,
2675    })
2676}
2677
2678const MZ_NOW_NAME: &str = "mz_now";
2679const MZ_NOW_SCHEMA: &str = "mz_catalog";
2680
2681/// Purifies a CREATE MATERIALIZED VIEW statement. Additionally, adjusts `resolved_ids` if
2682/// references to ids appear or disappear during purification.
2683///
2684/// Note that in contrast with [`purify_statement`], this doesn't need to be async, because
2685/// it makes no network calls.
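///
/// Roughly (a sketch of the rewrites below; the exact rendering may differ), if the chosen
/// timestamp is, say, 1700000000000:
/// - `REFRESH AT CREATION` becomes `REFRESH AT 1700000000000::mz_timestamp`;
/// - `REFRESH EVERY '1 day'` gains `ALIGNED TO 1700000000000::mz_timestamp`;
/// - a statement with no `REFRESH` option at all gets `REFRESH ON COMMIT`.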
2686pub fn purify_create_materialized_view_options(
2687    catalog: impl SessionCatalog,
2688    mz_now: Option<Timestamp>,
2689    cmvs: &mut CreateMaterializedViewStatement<Aug>,
2690    resolved_ids: &mut ResolvedIds,
2691) {
2692    // 0. Preparations:
2693    // Prepare an expression that calls `mz_now()`, which we can insert in various later steps.
2694    let (mz_now_id, mz_now_expr) = {
2695        let item = catalog
2696            .resolve_function(&PartialItemName {
2697                database: None,
2698                schema: Some(MZ_NOW_SCHEMA.to_string()),
2699                item: MZ_NOW_NAME.to_string(),
2700            })
2701            .expect("we should be able to resolve mz_now");
2702        (
2703            item.id(),
2704            Expr::Function(Function {
2705                name: ResolvedItemName::Item {
2706                    id: item.id(),
2707                    qualifiers: item.name().qualifiers.clone(),
2708                    full_name: catalog.resolve_full_name(item.name()),
2709                    print_id: false,
2710                    version: RelationVersionSelector::Latest,
2711                },
2712                args: FunctionArgs::Args {
2713                    args: Vec::new(),
2714                    order_by: Vec::new(),
2715                },
2716                filter: None,
2717                over: None,
2718                distinct: false,
2719            }),
2720        )
2721    };
2722    // Prepare the `mz_timestamp` type.
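    // Used below to cast the substituted timestamp so the expression's type is unchanged.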
2723    let (mz_timestamp_id, mz_timestamp_type) = {
2724        let item = catalog.get_system_type("mz_timestamp");
2725        let full_name = catalog.resolve_full_name(item.name());
2726        (
2727            item.id(),
2728            ResolvedDataType::Named {
2729                id: item.id(),
2730                qualifiers: item.name().qualifiers.clone(),
2731                full_name,
2732                modifiers: vec![],
2733                print_id: true,
2734            },
2735        )
2736    };
2737
2738    let mut introduced_mz_timestamp = false;
2739
2740    for option in cmvs.with_options.iter_mut() {
2741        // 1. Purify `REFRESH AT CREATION` to `REFRESH AT mz_now()`.
2742        if matches!(
2743            option.value,
2744            Some(WithOptionValue::Refresh(RefreshOptionValue::AtCreation))
2745        ) {
2746            option.value = Some(WithOptionValue::Refresh(RefreshOptionValue::At(
2747                RefreshAtOptionValue {
2748                    time: mz_now_expr.clone(),
2749                },
2750            )));
2751        }
2752
2753        // 2. If `REFRESH EVERY` doesn't have an `ALIGNED TO`, then add `ALIGNED TO mz_now()`.
2754        if let Some(WithOptionValue::Refresh(RefreshOptionValue::Every(
2755            RefreshEveryOptionValue { aligned_to, .. },
2756        ))) = &mut option.value
2757        {
2758            if aligned_to.is_none() {
2759                *aligned_to = Some(mz_now_expr.clone());
2760            }
2761        }
2762
2763        // 3. Substitute `mz_now()` with the timestamp chosen for the CREATE MATERIALIZED VIEW
2764        // statement. (This has to happen after the above steps, which might introduce `mz_now()`.)
2765        match &mut option.value {
2766            Some(WithOptionValue::Refresh(RefreshOptionValue::At(RefreshAtOptionValue {
2767                time,
2768            }))) => {
2769                let mut visitor = MzNowPurifierVisitor::new(mz_now, mz_timestamp_type.clone());
2770                visitor.visit_expr_mut(time);
2771                introduced_mz_timestamp |= visitor.introduced_mz_timestamp;
2772            }
2773            Some(WithOptionValue::Refresh(RefreshOptionValue::Every(
2774                RefreshEveryOptionValue {
2775                    interval: _,
2776                    aligned_to: Some(aligned_to),
2777                },
2778            ))) => {
2779                let mut visitor = MzNowPurifierVisitor::new(mz_now, mz_timestamp_type.clone());
2780                visitor.visit_expr_mut(aligned_to);
2781                introduced_mz_timestamp |= visitor.introduced_mz_timestamp;
2782            }
2783            _ => {}
2784        }
2785    }
2786
2787    // 4. If the user didn't give any REFRESH option, then default to ON COMMIT.
2788    if !cmvs.with_options.iter().any(|o| {
2789        matches!(
2790            o,
2791            MaterializedViewOption {
2792                value: Some(WithOptionValue::Refresh(..)),
2793                ..
2794            }
2795        )
2796    }) {
2797        cmvs.with_options.push(MaterializedViewOption {
2798            name: MaterializedViewOptionName::Refresh,
2799            value: Some(WithOptionValue::Refresh(RefreshOptionValue::OnCommit)),
2800        })
2801    }
2802
2803    // 5. Attend to `resolved_ids`: The purification might have
2804    // - added references to `mz_timestamp`;
2805    // - removed references to `mz_now`.
2806    if introduced_mz_timestamp {
2807        resolved_ids.add_item(mz_timestamp_id);
2808    }
2809    // Even though we always remove `mz_now()` from the `with_options`, there might be `mz_now()`
2810    // remaining in the main query expression of the MV, so let's visit the entire statement to look
2811    // for `mz_now()` everywhere.
2812    let mut visitor = ExprContainsTemporalVisitor::new();
2813    visitor.visit_create_materialized_view_statement(cmvs);
2814    if !visitor.contains_temporal {
2815        resolved_ids.remove_item(&mz_now_id);
2816    }
2817}
2818
2819/// Returns true if the [MaterializedViewOption] either already involves `mz_now()` or will
2820/// involve it after purification.
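///
/// For example, `REFRESH EVERY '1 day'` with no `ALIGNED TO` counts as temporal, because
/// purification will default the alignment to `mz_now()`.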
2821pub fn materialized_view_option_contains_temporal(mvo: &MaterializedViewOption<Aug>) -> bool {
2822    match &mvo.value {
2823        Some(WithOptionValue::Refresh(RefreshOptionValue::At(RefreshAtOptionValue { time }))) => {
2824            let mut visitor = ExprContainsTemporalVisitor::new();
2825            visitor.visit_expr(time);
2826            visitor.contains_temporal
2827        }
2828        Some(WithOptionValue::Refresh(RefreshOptionValue::Every(RefreshEveryOptionValue {
2829            interval: _,
2830            aligned_to: Some(aligned_to),
2831        }))) => {
2832            let mut visitor = ExprContainsTemporalVisitor::new();
2833            visitor.visit_expr(aligned_to);
2834            visitor.contains_temporal
2835        }
2836        Some(WithOptionValue::Refresh(RefreshOptionValue::Every(RefreshEveryOptionValue {
2837            interval: _,
2838            aligned_to: None,
2839        }))) => {
2840            // For a `REFRESH EVERY` without an `ALIGNED TO`, purification will default the
2841            // `ALIGNED TO` to `mz_now()`.
2842            true
2843        }
2844        Some(WithOptionValue::Refresh(RefreshOptionValue::AtCreation)) => {
2845            // `REFRESH AT CREATION` will be purified to `REFRESH AT mz_now()`.
2846            true
2847        }
2848        _ => false,
2849    }
2850}
2851
2852/// Determines whether the AST involves `mz_now()`.
2853struct ExprContainsTemporalVisitor {
2854    pub contains_temporal: bool,
2855}
2856
2857impl ExprContainsTemporalVisitor {
2858    pub fn new() -> ExprContainsTemporalVisitor {
2859        ExprContainsTemporalVisitor {
2860            contains_temporal: false,
2861        }
2862    }
2863}
2864
2865impl Visit<'_, Aug> for ExprContainsTemporalVisitor {
2866    fn visit_function(&mut self, func: &Function<Aug>) {
2867        self.contains_temporal |= func.name.full_item_name().item == MZ_NOW_NAME;
2868        visit_function(self, func);
2869    }
2870}
2871
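/// Rewrites `mz_now()` calls into the concrete timestamp chosen for the statement (cast to
/// `mz_timestamp`), recording whether such a rewrite happened.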
2872struct MzNowPurifierVisitor {
2873    pub mz_now: Option<Timestamp>,
2874    pub mz_timestamp_type: ResolvedDataType,
2875    pub introduced_mz_timestamp: bool,
2876}
2877
2878impl MzNowPurifierVisitor {
2879    pub fn new(
2880        mz_now: Option<Timestamp>,
2881        mz_timestamp_type: ResolvedDataType,
2882    ) -> MzNowPurifierVisitor {
2883        MzNowPurifierVisitor {
2884            mz_now,
2885            mz_timestamp_type,
2886            introduced_mz_timestamp: false,
2887        }
2888    }
2889}
2890
2891impl VisitMut<'_, Aug> for MzNowPurifierVisitor {
2892    fn visit_expr_mut(&mut self, expr: &'_ mut Expr<Aug>) {
2893        match expr {
2894            Expr::Function(Function {
2895                name:
2896                    ResolvedItemName::Item {
2897                        full_name: FullItemName { item, .. },
2898                        ..
2899                    },
2900                ..
2901            }) if item.as_str() == MZ_NOW_NAME => {
2902                let mz_now = self.mz_now.expect(
2903                    "we should have chosen a timestamp if the expression contains mz_now()",
2904                );
2905                // We substitute `mz_now()` with the chosen timestamp as a number literal, cast
2906                // to `mz_timestamp` so that the type of the expression is unchanged.
2907                *expr = Expr::Cast {
2908                    expr: Box::new(Expr::Value(Value::Number(mz_now.to_string()))),
2909                    data_type: self.mz_timestamp_type.clone(),
2910                };
2911                self.introduced_mz_timestamp = true;
2912            }
2913            _ => visit_expr_mut(self, expr),
2914        }
2915    }
2916}