Skip to main content

mz_sql/
pure.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
9
//! SQL purification.
//!
//! See the [crate-level documentation](crate) for details.
13
14use std::collections::{BTreeMap, BTreeSet};
15use std::fmt;
16use std::iter;
17use std::path::Path;
18use std::sync::Arc;
19
20use anyhow::anyhow;
21use itertools::Itertools;
22use mz_adapter_types::dyncfgs::ENABLE_S3_TABLES_REGION_CHECK;
23use mz_ccsr::{Client, GetBySubjectError};
24use mz_cloud_provider::CloudProvider;
25use mz_controller_types::ClusterId;
26use mz_kafka_util::client::MzClientContext;
27use mz_mysql_util::MySqlTableDesc;
28use mz_ore::collections::CollectionExt;
29use mz_ore::error::ErrorExt;
30use mz_ore::future::InTask;
31use mz_ore::iter::IteratorExt;
32use mz_ore::str::StrExt;
33use mz_postgres_util::desc::PostgresTableDesc;
34use mz_proto::RustType;
35use mz_repr::{CatalogItemId, RelationDesc, RelationVersionSelector, Timestamp, strconv};
36use mz_sql_parser::ast::display::AstDisplay;
37use mz_sql_parser::ast::visit::{Visit, visit_function};
38use mz_sql_parser::ast::visit_mut::{VisitMut, visit_expr_mut};
39use mz_sql_parser::ast::{
40    AlterSourceAction, AlterSourceAddSubsourceOptionName, AlterSourceStatement, AvroDocOn,
41    ColumnName, CreateMaterializedViewStatement, CreateSinkConnection, CreateSinkOptionName,
42    CreateSinkStatement, CreateSourceOptionName, CreateSubsourceOption, CreateSubsourceOptionName,
43    CreateTableFromSourceStatement, CsrConfigOption, CsrConfigOptionName, CsrConnection,
44    CsrSeedAvro, CsrSeedProtobuf, CsrSeedProtobufSchema, DeferredItemName, DocOnIdentifier,
45    DocOnSchema, Expr, Function, FunctionArgs, GlueAvroOption, GlueAvroSeed, Ident,
46    KafkaSourceConfigOption, KafkaSourceConfigOptionName, LoadGenerator, LoadGeneratorOption,
47    LoadGeneratorOptionName, MaterializedViewOption, MaterializedViewOptionName, MySqlConfigOption,
48    MySqlConfigOptionName, PgConfigOption, PgConfigOptionName, RawItemName,
49    ReaderSchemaSelectionStrategy, RefreshAtOptionValue, RefreshEveryOptionValue,
50    RefreshOptionValue, SourceEnvelope, SqlServerConfigOption, SqlServerConfigOptionName,
51    Statement, TableFromSourceColumns, TableFromSourceOption, TableFromSourceOptionName,
52    UnresolvedItemName,
53};
54use mz_sql_server_util::desc::SqlServerTableDesc;
55use mz_storage_types::configuration::StorageConfiguration;
56use mz_storage_types::connections::Connection;
57use mz_storage_types::connections::inline::IntoInlineConnection;
58use mz_storage_types::errors::ContextCreationError;
59use mz_storage_types::sources::load_generator::LoadGeneratorOutput;
60use mz_storage_types::sources::mysql::MySqlSourceDetails;
61use mz_storage_types::sources::postgres::PostgresSourcePublicationDetails;
62use mz_storage_types::sources::{
63    GenericSourceConnection, SourceDesc, SourceExportStatementDetails, SqlServerSourceExtras,
64};
65use prost::Message;
66use protobuf_native::MessageLite;
67use protobuf_native::compiler::{SourceTreeDescriptorDatabase, VirtualSourceTree};
68use rdkafka::admin::AdminClient;
69use references::{RetrievedSourceReferences, SourceReferenceClient};
70use uuid::Uuid;
71
72use crate::ast::{
73    AlterSourceAddSubsourceOption, AvroSchema, CreateSourceConnection, CreateSourceStatement,
74    CreateSubsourceStatement, CsrConnectionAvro, CsrConnectionProtobuf, ExternalReferenceExport,
75    ExternalReferences, Format, FormatSpecifier, ProtobufSchema, Value, WithOptionValue,
76};
77use crate::catalog::{CatalogItemType, SessionCatalog};
78use crate::kafka_util::{KafkaSinkConfigOptionExtracted, KafkaSourceConfigOptionExtracted};
79use crate::names::{
80    Aug, FullItemName, PartialItemName, ResolvedColumnReference, ResolvedDataType, ResolvedIds,
81    ResolvedItemName,
82};
83use crate::plan::error::PlanError;
84use crate::plan::statement::ddl::load_generator_ast_to_generator;
85use crate::plan::{SourceReferences, StatementContext};
86use crate::pure::error::{IcebergSinkPurificationError, SqlServerSourcePurificationError};
87use crate::pure::mysql::{ensure_binlog_full_metadata, is_binlog_full_metadata};
88use crate::{kafka_util, normalize};
89
90use self::error::{
91    CsrPurificationError, KafkaSinkPurificationError, KafkaSourcePurificationError,
92    LoadGeneratorSourcePurificationError, MySqlSourcePurificationError, PgSourcePurificationError,
93};
94
95pub(crate) mod error;
96mod references;
97
98pub mod mysql;
99pub mod postgres;
100pub mod sql_server;
101
/// An upstream object requested as a source export, paired with the name the
/// export should be created under.
///
/// `T` is caller-chosen metadata attached to the request; it can be replaced
/// with `change_meta`.
pub(crate) struct RequestedSourceExport<T> {
    /// Reference to the object in the upstream system.
    external_reference: UnresolvedItemName,
    /// Name of the export (e.g. subsource) to create.
    name: UnresolvedItemName,
    /// Caller-supplied metadata about this export.
    meta: T,
}
107
108impl<T: fmt::Debug> fmt::Debug for RequestedSourceExport<T> {
109    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
110        f.debug_struct("RequestedSourceExport")
111            .field("external_reference", &self.external_reference)
112            .field("name", &self.name)
113            .field("meta", &self.meta)
114            .finish()
115    }
116}
117
118impl<T> RequestedSourceExport<T> {
119    fn change_meta<F>(self, new_meta: F) -> RequestedSourceExport<F> {
120        RequestedSourceExport {
121            external_reference: self.external_reference,
122            name: self.name,
123            meta: new_meta,
124        }
125    }
126}
127
128/// Generates a subsource name by prepending source schema name if present
129///
130/// For eg. if source is `a.b`, then `a` will be prepended to the subsource name
131/// so that it's generated in the same schema as source
132fn source_export_name_gen(
133    source_name: &UnresolvedItemName,
134    subsource_name: &str,
135) -> Result<UnresolvedItemName, PlanError> {
136    let mut partial = normalize::unresolved_item_name(source_name.clone())?;
137    partial.item = subsource_name.to_string();
138    Ok(UnresolvedItemName::from(partial))
139}
140
141// TODO(database-issues#8620): Remove once subsources are removed
142/// Validates the requested subsources do not have name conflicts with each other
143/// and that the same upstream table is not referenced multiple times.
144fn validate_source_export_names<T>(
145    requested_source_exports: &[RequestedSourceExport<T>],
146) -> Result<(), PlanError> {
147    // This condition would get caught during the catalog transaction, but produces a
148    // vague, non-contextual error. Instead, error here so we can suggest to the user
149    // how to fix the problem.
150    if let Some(name) = requested_source_exports
151        .iter()
152        .map(|subsource| &subsource.name)
153        .duplicates()
154        .next()
155        .cloned()
156    {
157        let mut upstream_references: Vec<_> = requested_source_exports
158            .into_iter()
159            .filter_map(|subsource| {
160                if &subsource.name == &name {
161                    Some(subsource.external_reference.clone())
162                } else {
163                    None
164                }
165            })
166            .collect();
167
168        upstream_references.sort();
169
170        Err(PlanError::SubsourceNameConflict {
171            name,
172            upstream_references,
173        })?;
174    }
175
176    // We disallow subsource statements from referencing the same upstream table, but we allow
177    // `CREATE TABLE .. FROM SOURCE` statements to do so. Since `CREATE TABLE .. FROM SOURCE`
178    // purification will only provide 1 `requested_source_export`, we can leave this here without
179    // needing to differentiate between the two types of statements.
180    // TODO(roshan): Remove this when auto-generated subsources are deprecated.
181    if let Some(name) = requested_source_exports
182        .iter()
183        .map(|export| &export.external_reference)
184        .duplicates()
185        .next()
186        .cloned()
187    {
188        let mut target_names: Vec<_> = requested_source_exports
189            .into_iter()
190            .filter_map(|export| {
191                if &export.external_reference == &name {
192                    Some(export.name.clone())
193                } else {
194                    None
195                }
196            })
197            .collect();
198
199        target_names.sort();
200
201        Err(PlanError::SubsourceDuplicateReference { name, target_names })?;
202    }
203
204    Ok(())
205}
206
/// The output of [`purify_statement`]: a statement (or the information
/// extracted from it) whose dependencies on external state have been resolved.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PurifiedStatement {
    /// A purified `CREATE SOURCE` statement.
    PurifiedCreateSource {
        /// The progress subsource, if we are offloading progress info to a separate relation.
        create_progress_subsource_stmt: Option<CreateSubsourceStatement<Aug>>,
        /// The purified `CREATE SOURCE` statement itself.
        create_source_stmt: CreateSourceStatement<Aug>,
        /// Map of subsource names to external details.
        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
        /// All the available upstream references that can be added as tables
        /// to this primary source.
        available_source_references: SourceReferences,
    },
    /// A purified `ALTER SOURCE` statement.
    PurifiedAlterSource {
        alter_source_stmt: AlterSourceStatement<Aug>,
    },
    /// A request to add subsources to an existing source.
    PurifiedAlterSourceAddSubsources {
        /// The source being altered. This just saves us an annoying catalog lookup.
        source_name: ResolvedItemName,
        /// Options that we will need the values of to update the source's
        /// definition.
        options: Vec<AlterSourceAddSubsourceOption<Aug>>,
        /// Map of subsource names to external details.
        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
    },
    /// A request to refresh the set of upstream references known for a source.
    PurifiedAlterSourceRefreshReferences {
        /// The source whose references are refreshed.
        source_name: ResolvedItemName,
        /// The updated available upstream references for the primary source.
        available_source_references: SourceReferences,
    },
    /// A purified `CREATE SINK` statement (produced by `purify_create_sink`).
    PurifiedCreateSink(CreateSinkStatement<Aug>),
    /// A purified `CREATE TABLE ... FROM SOURCE` statement.
    PurifiedCreateTableFromSource {
        stmt: CreateTableFromSourceStatement<Aug>,
    },
}
241
/// A source export (e.g. a subsource) whose upstream details were resolved
/// during purification.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PurifiedSourceExport {
    /// Reference to the object in the upstream system.
    pub external_reference: UnresolvedItemName,
    /// Source-type-specific details about the export.
    pub details: PurifiedExportDetails,
}
247
/// Source-type-specific details of a [`PurifiedSourceExport`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PurifiedExportDetails {
    /// An export of a MySQL table.
    MySql {
        /// Description of the upstream table.
        table: MySqlTableDesc,
        /// Columns listed in the text-columns option, if any.
        text_columns: Option<Vec<Ident>>,
        /// Columns listed in the exclude-columns option, if any.
        exclude_columns: Option<Vec<Ident>>,
        // NOTE(review): presumably the GTID set observed upstream at
        // purification time; confirm against the MySQL purification code.
        initial_gtid_set: String,
        // NOTE(review): presumably whether the upstream binlog is configured
        // with full row metadata (cf. `is_binlog_full_metadata`); confirm.
        binlog_full_metadata: bool,
    },
    /// An export of a PostgreSQL table.
    Postgres {
        /// Description of the upstream table.
        table: PostgresTableDesc,
        /// Columns listed in the text-columns option, if any.
        text_columns: Option<Vec<Ident>>,
        /// Columns listed in the exclude-columns option, if any.
        exclude_columns: Option<Vec<Ident>>,
    },
    /// An export of a SQL Server table.
    SqlServer {
        /// Description of the upstream table.
        table: SqlServerTableDesc,
        /// Columns listed in the text-columns option, if any.
        text_columns: Option<Vec<Ident>>,
        /// Columns listed in the exclude-columns option, if any.
        ///
        /// NOTE: named `excl_columns`, unlike the `exclude_columns` field of
        /// the other variants; renaming it would break pattern matches on
        /// this public variant.
        excl_columns: Option<Vec<Ident>>,
        // NOTE(review): presumably the CDC capture instance used for this
        // table; confirm.
        capture_instance: Arc<str>,
        // NOTE(review): presumably the LSN replication starts from; confirm.
        initial_lsn: mz_sql_server_util::cdc::Lsn,
    },
    /// A Kafka export; no additional details are recorded.
    Kafka {},
    /// An export of a load generator output.
    LoadGenerator {
        /// Relation description of the output, if any.
        table: Option<RelationDesc>,
        /// Which load generator output this export corresponds to.
        output: LoadGeneratorOutput,
    },
}
275
276/// Purifies a statement, removing any dependencies on external state.
277///
278/// See the section on [purification](crate#purification) in the crate
279/// documentation for details.
280///
281/// Note that this doesn't handle CREATE MATERIALIZED VIEW, which is
282/// handled by [purify_create_materialized_view_options] instead.
283/// This could be made more consistent by a refactoring discussed here:
284/// <https://github.com/MaterializeInc/materialize/pull/23870#discussion_r1435922709>
285pub async fn purify_statement(
286    catalog: impl SessionCatalog,
287    now: u64,
288    stmt: Statement<Aug>,
289    storage_configuration: &StorageConfiguration,
290) -> (Result<PurifiedStatement, PlanError>, Option<ClusterId>) {
291    match stmt {
292        Statement::CreateSource(stmt) => {
293            let cluster_id = stmt.in_cluster.as_ref().map(|cluster| cluster.id.clone());
294            (
295                purify_create_source(catalog, now, stmt, storage_configuration).await,
296                cluster_id,
297            )
298        }
299        Statement::AlterSource(stmt) => (
300            purify_alter_source(catalog, stmt, storage_configuration).await,
301            None,
302        ),
303        Statement::CreateSink(stmt) => {
304            let cluster_id = stmt.in_cluster.as_ref().map(|cluster| cluster.id.clone());
305            (
306                purify_create_sink(catalog, stmt, storage_configuration).await,
307                cluster_id,
308            )
309        }
310        Statement::CreateTableFromSource(stmt) => (
311            purify_create_table_from_source(catalog, stmt, storage_configuration).await,
312            None,
313        ),
314        o => (
315            Err(internal_err!(
316                "unexpected statement type in purification: {:?}",
317                o
318            )),
319            None,
320        ),
321    }
322}
323
/// Injects `DOC ON` comments into all Avro formats that are using a schema
/// registry by finding all SQL `COMMENT`s that are attached to the sink's
/// underlying materialized view (as well as any types referenced by that
/// underlying materialized view).
///
/// The items considered are the `from_id` item plus every item it references.
/// Only Avro formats whose schema comes from a CSR connection are touched;
/// inline and Glue Avro schemas and all non-Avro formats are left as-is.
///
/// For each such format and each item with catalog comments:
/// - the item-level comment (keyed by `None`) becomes a
///   `DOC ON TYPE <item>` option, and
/// - each column comment (keyed by the 1-based column position) becomes a
///   `DOC ON COLUMN <item>.<column>` option whose column `index` is 0-based.
///
/// An option is only injected when the user has not already supplied a
/// `DOC ON` option with an identical key (same identifier and same
/// key/value/all target), so user-provided options always win. Comments are
/// always resolved against the latest version of each item.
///
/// The body contains no fallible operations, so this currently always
/// returns `Ok(())`.
pub(crate) fn purify_create_sink_avro_doc_on_options(
    catalog: &dyn SessionCatalog,
    from_id: CatalogItemId,
    format: &mut Option<FormatSpecifier<Aug>>,
) -> Result<(), PlanError> {
    // Collect all objects referenced by the sink.
    let from = catalog.get_item(&from_id);
    let object_ids = from
        .references()
        .items()
        .copied()
        .chain_one(from.id())
        .collect::<Vec<_>>();

    // Collect all Avro formats that use a schema registry, paired with whether
    // they apply to the key, the value, or both. The set of user-provided
    // `DOC ON` identifiers is gathered per format in the loop below.
    let mut avro_format_options = vec![];
    for_each_format(format, |doc_on_schema, fmt| match fmt {
        Format::Avro(AvroSchema::InlineSchema { .. })
        | Format::Avro(AvroSchema::Glue { .. })
        | Format::Bytes
        | Format::Csv { .. }
        | Format::Json { .. }
        | Format::Protobuf(..)
        | Format::Regex(..)
        | Format::Text => (),
        Format::Avro(AvroSchema::Csr {
            csr_connection: CsrConnectionAvro { connection, .. },
        }) => {
            avro_format_options.push((doc_on_schema, &mut connection.options));
        }
    });

    // For each Avro format in the sink, inject the appropriate `DOC ON` options
    // for each item referenced by the sink.
    for (for_schema, options) in avro_format_options {
        // Snapshot of the user's own `DOC ON` keys, taken before any injection,
        // so that injected options never shadow each other.
        let user_provided_comments = options
            .iter()
            .filter_map(|CsrConfigOption { name, .. }| match name {
                CsrConfigOptionName::AvroDocOn(doc_on) => Some(doc_on.clone()),
                _ => None,
            })
            .collect::<BTreeSet<_>>();

        // Add catalog comments that the user has not already overridden.
        for object_id in &object_ids {
            // Always add comments to the latest version of the item.
            let item = catalog
                .get_item(object_id)
                .at_version(RelationVersionSelector::Latest);
            let full_resolved_name = ResolvedItemName::Item {
                id: *object_id,
                qualifiers: item.name().qualifiers.clone(),
                full_name: catalog.resolve_full_name(item.name()),
                print_id: !matches!(
                    item.item_type(),
                    CatalogItemType::Func | CatalogItemType::Type
                ),
                version: RelationVersionSelector::Latest,
            };

            if let Some(comments_map) = catalog.get_item_comments(object_id) {
                // Attach comment for the item itself, if the user has not
                // already provided an overriding `DOC ON` option for the item.
                let doc_on_item_key = AvroDocOn {
                    identifier: DocOnIdentifier::Type(full_resolved_name.clone()),
                    for_schema,
                };
                if !user_provided_comments.contains(&doc_on_item_key) {
                    // The item-level comment is stored under the `None` key.
                    if let Some(root_comment) = comments_map.get(&None) {
                        options.push(CsrConfigOption {
                            name: CsrConfigOptionName::AvroDocOn(doc_on_item_key),
                            value: Some(mz_sql_parser::ast::WithOptionValue::Value(Value::String(
                                root_comment.clone(),
                            ))),
                        });
                    }
                }

                // Attach comment for each column in the item, if the user has
                // not already provided an overriding `DOC ON` option for the
                // column.
                // Types describe their columns via `desc`; an error there is
                // swallowed by `unwrap_or_default` and yields no columns.
                let column_descs = match item.type_details() {
                    Some(details) => details.typ.desc(catalog).unwrap_or_default(),
                    None => item.relation_desc().map(|d| d.into_owned()),
                };

                if let Some(desc) = column_descs {
                    for (pos, column_name) in desc.iter_names().enumerate() {
                        // Column comments are keyed by 1-based position, while
                        // the column reference below uses the 0-based index.
                        if let Some(comment_str) = comments_map.get(&Some(pos + 1)) {
                            let doc_on_column_key = AvroDocOn {
                                identifier: DocOnIdentifier::Column(ColumnName {
                                    relation: full_resolved_name.clone(),
                                    column: ResolvedColumnReference::Column {
                                        name: column_name.to_owned(),
                                        index: pos,
                                    },
                                }),
                                for_schema,
                            };
                            if !user_provided_comments.contains(&doc_on_column_key) {
                                options.push(CsrConfigOption {
                                    name: CsrConfigOptionName::AvroDocOn(doc_on_column_key),
                                    value: Some(mz_sql_parser::ast::WithOptionValue::Value(
                                        Value::String(comment_str.clone()),
                                    )),
                                });
                            }
                        }
                    }
                }
            }
        }
    }

    Ok(())
}
445
/// Checks that the sink described in the statement can connect to its external
/// resources, and fills in information derived from the catalog.
///
/// In order, this:
///
/// 1. Rejects any `WITH` option other than `CreateSinkOptionName::Snapshot`
///    and `CreateSinkOptionName::CommitInterval`.
/// 2. For Kafka sinks: rejects the legacy IDs option, builds an admin client
///    and fetches cluster metadata, failing if no brokers are reported. No
///    state (e.g. the topic) is created in the broker.
/// 3. For Iceberg sinks: when the `ENABLE_S3_TABLES_REGION_CHECK` dyncfg is
///    set and the environment runs on AWS, checks that an S3 Tables catalog's
///    region (defaulting to `us-east-1`) matches the environment's region;
///    validates the optional AWS connection by loading its SDK config; and
///    connects to the catalog.
/// 4. Connects to every schema registry (CSR) connection referenced by the
///    key/value formats and lists its subjects.
/// 5. Injects `DOC ON` options derived from catalog comments into CSR-backed
///    Avro formats (see [`purify_create_sink_avro_doc_on_options`]).
///
/// On success returns the (possibly modified) statement wrapped in
/// [`PurifiedStatement::PurifiedCreateSink`].
async fn purify_create_sink(
    catalog: impl SessionCatalog,
    mut create_sink_stmt: CreateSinkStatement<Aug>,
    storage_configuration: &StorageConfiguration,
) -> Result<PurifiedStatement, PlanError> {
    // General purification
    // Every field is named explicitly (no `..`), so adding a field to
    // `CreateSinkStatement` forces this site to be revisited.
    let CreateSinkStatement {
        connection,
        format,
        with_options,
        name: _,
        in_cluster: _,
        if_not_exists: _,
        from,
        envelope: _,
        mode: _,
    } = &mut create_sink_stmt;

    // The list of options that the user is allowed to specify.
    const USER_ALLOWED_WITH_OPTIONS: &[CreateSinkOptionName] = &[
        CreateSinkOptionName::Snapshot,
        CreateSinkOptionName::CommitInterval,
    ];

    if let Some(op) = with_options
        .iter()
        .find(|op| !USER_ALLOWED_WITH_OPTIONS.contains(&op.name))
    {
        sql_bail!(
            "CREATE SINK...WITH ({}..) is not allowed",
            op.name.to_ast_string_simple(),
        )
    }

    match &connection {
        CreateSinkConnection::Kafka {
            connection,
            options,
            key: _,
            headers: _,
        } => {
            // We must not leave any state behind in the Kafka broker, so just ensure that
            // we can connect. This means we don't ensure that we can create the topic and
            // introduces TOCTOU errors, but creating an inoperable sink is infinitely
            // preferable to leaking state in users' environments.
            let scx = StatementContext::new(None, &catalog);
            let connection = {
                let item = scx.get_item_by_resolved_name(connection)?;
                // Get Kafka connection
                match item.connection()? {
                    Connection::Kafka(connection) => {
                        connection.clone().into_inline_connection(scx.catalog)
                    }
                    _ => sql_bail!(
                        "{} is not a kafka connection",
                        scx.catalog.resolve_full_name(item.name())
                    ),
                }
            };

            let extracted_options: KafkaSinkConfigOptionExtracted = options.clone().try_into()?;

            if extracted_options.legacy_ids == Some(true) {
                sql_bail!("LEGACY IDs option is not supported");
            }

            let client: AdminClient<_> = connection
                .create_with_context(
                    storage_configuration,
                    MzClientContext::default(),
                    &BTreeMap::new(),
                    InTask::No,
                )
                .await
                .map_err(|e| {
                    // anyhow doesn't support Clone, so not trivial to move into PlanError
                    KafkaSinkPurificationError::AdminClientError(Arc::new(e))
                })?;

            // NOTE(review): `fetch_metadata` is called without `.await`, i.e.
            // synchronously inside this async fn, bounded by the configured
            // metadata timeout.
            let metadata = client
                .inner()
                .fetch_metadata(
                    None,
                    storage_configuration
                        .parameters
                        .kafka_timeout_config
                        .fetch_metadata_timeout,
                )
                .map_err(|e| {
                    KafkaSinkPurificationError::AdminClientError(Arc::new(
                        ContextCreationError::KafkaError(e),
                    ))
                })?;

            // Reaching a cluster that reports no brokers counts as failure.
            if metadata.brokers().len() == 0 {
                Err(KafkaSinkPurificationError::ZeroBrokers)?;
            }
        }
        CreateSinkConnection::Iceberg {
            catalog_connection,
            aws_connection,
            ..
        } => {
            let scx = StatementContext::new(None, &catalog);
            let connection = {
                let item = scx.get_item_by_resolved_name(catalog_connection)?;
                // Get Iceberg connection
                match item.connection()? {
                    Connection::IcebergCatalog(connection) => {
                        connection.clone().into_inline_connection(scx.catalog)
                    }
                    _ => sql_bail!(
                        "{} is not an iceberg connection",
                        scx.catalog.resolve_full_name(item.name())
                    ),
                }
            };

            // For S3 Tables connections in the Materialize Cloud product, verify the
            // AWS region matches the environment's region. This check only applies when
            // the enable_s3_tables_region_check dyncfg is set.
            if let Some(s3tables) = connection.s3tables_catalog() {
                let enable_region_check =
                    ENABLE_S3_TABLES_REGION_CHECK.get(scx.catalog.system_vars().dyncfgs());
                if enable_region_check {
                    let env_id = &catalog.config().environment_id;
                    if matches!(env_id.cloud_provider(), CloudProvider::Aws) {
                        let env_region = env_id.cloud_provider_region();
                        // Later on we default to "us-east-1" if the region is not set on the S3 Tables
                        // connection, so we need to do the same check here.
                        let s3_tables_region = s3tables
                            .aws_connection
                            .connection
                            .region
                            .clone()
                            .unwrap_or_else(|| "us-east-1".to_string());
                        if s3_tables_region != env_region {
                            Err(IcebergSinkPurificationError::S3TablesRegionMismatch {
                                s3_tables_region,
                                environment_region: env_region.to_string(),
                            })?;
                        }
                    }
                }
            }

            // Validate the sink's (optional) AWS connection even though we never use it.
            // TODO(kynan): If we do start using the sink's creds, check again that this validation
            //   accurately reflects what we need.
            //   Consider rolling the storage creds validation into the catalog connection's "connect" fn,
            //   which already validates the catalog creds (currently also used for the storage layer).
            if let Some(aws_connection) = aws_connection {
                let aws_conn_id = aws_connection.item_id();
                let aws_connection = {
                    let item = scx.get_item_by_resolved_name(aws_connection)?;
                    // Get AWS connection
                    match item.connection()? {
                        Connection::Aws(aws_connection) => aws_connection.clone(),
                        _ => sql_bail!(
                            "{} is not an aws connection",
                            scx.catalog.resolve_full_name(item.name())
                        ),
                    }
                };

                // Loading the SDK config is the validation; the config itself
                // is discarded.
                let _sdk_config = aws_connection
                    .load_sdk_config(
                        &storage_configuration.connection_context,
                        aws_conn_id.clone(),
                        InTask::No,
                        mz_storage_types::dyncfgs::ENFORCE_EXTERNAL_ADDRESSES
                            .get(storage_configuration.config_set()),
                    )
                    .await
                    .map_err(|e| IcebergSinkPurificationError::AwsSdkContextError(Arc::new(e)))?;
            }

            // Now that we've validated the sink's storage creds (if they exist)
            // we _could_ use them to build a complete Iceberg client (both catalog and storage).
            // TODO(kynan): Actually use those sink-specific creds here instead of ignoring them.
            let _catalog = connection
                .connect(storage_configuration, InTask::No)
                .await
                .map_err(|e| IcebergSinkPurificationError::CatalogError(Arc::new(e)))?;
        }
    }

    // Collect the distinct CSR connections used by the key and/or value formats.
    let mut csr_connection_ids = BTreeSet::new();
    for_each_format(format, |_, fmt| match fmt {
        Format::Avro(AvroSchema::InlineSchema { .. })
        | Format::Avro(AvroSchema::Glue { .. })
        | Format::Bytes
        | Format::Csv { .. }
        | Format::Json { .. }
        | Format::Protobuf(ProtobufSchema::InlineSchema { .. })
        | Format::Regex(..)
        | Format::Text => (),
        Format::Avro(AvroSchema::Csr {
            csr_connection: CsrConnectionAvro { connection, .. },
        })
        | Format::Protobuf(ProtobufSchema::Csr {
            csr_connection: CsrConnectionProtobuf { connection, .. },
        }) => {
            csr_connection_ids.insert(*connection.connection.item_id());
        }
    });

    // Verify each CSR connection is reachable by connecting and listing subjects.
    let scx = StatementContext::new(None, &catalog);
    for csr_connection_id in csr_connection_ids {
        let connection = {
            let item = scx.get_item(&csr_connection_id);
            // Get the CSR connection
            match item.connection()? {
                Connection::Csr(connection) => connection.clone().into_inline_connection(&catalog),
                _ => Err(CsrPurificationError::NotCsrConnection(
                    scx.catalog.resolve_full_name(item.name()),
                ))?,
            }
        };

        let client = connection
            .connect(storage_configuration, InTask::No)
            .await
            .map_err(|e| CsrPurificationError::ClientError(Arc::new(e)))?;

        client
            .list_subjects()
            .await
            .map_err(|e| CsrPurificationError::ListSubjectsError(Arc::new(e)))?;
    }

    purify_create_sink_avro_doc_on_options(&catalog, *from.item_id(), format)?;

    Ok(PurifiedStatement::PurifiedCreateSink(create_sink_stmt))
}
683
684/// Runs a function on each format within the format specifier.
685///
686/// The function is provided with a `DocOnSchema` that indicates whether the
687/// format is for the key, value, or both.
688///
689// TODO(benesch): rename `DocOnSchema` to the more general `FormatRestriction`.
690fn for_each_format<'a, F>(format: &'a mut Option<FormatSpecifier<Aug>>, mut f: F)
691where
692    F: FnMut(DocOnSchema, &'a mut Format<Aug>),
693{
694    match format {
695        None => (),
696        Some(FormatSpecifier::Bare(fmt)) => f(DocOnSchema::All, fmt),
697        Some(FormatSpecifier::KeyValue { key, value }) => {
698            f(DocOnSchema::KeyOnly, key);
699            f(DocOnSchema::ValueOnly, value);
700        }
701    }
702}
703
/// Defines whether purification should enforce that at least one valid source
/// reference is provided on the provided statement.
///
/// `purify_create_source` derives the policy from system variables:
/// [`NotAllowed`](Self::NotAllowed) when both `enable_create_table_from_source`
/// and `force_source_table_syntax` are set, [`Optional`](Self::Optional) when
/// only `enable_create_table_from_source` is set, and
/// [`Required`](Self::Required) otherwise.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub(crate) enum SourceReferencePolicy {
    /// Don't allow source references to be provided. This is used for
    /// enforcing that `CREATE SOURCE` statements don't create subsources.
    NotAllowed,
    /// Allow empty references, such as when creating a source that
    /// will have tables added afterwards.
    Optional,
    /// Require that at least one reference is resolved to an upstream
    /// object.
    Required,
}
718
719async fn purify_create_source(
720    catalog: impl SessionCatalog,
721    now: u64,
722    mut create_source_stmt: CreateSourceStatement<Aug>,
723    storage_configuration: &StorageConfiguration,
724) -> Result<PurifiedStatement, PlanError> {
725    let CreateSourceStatement {
726        name: source_name,
727        col_names,
728        key_constraint,
729        connection: source_connection,
730        format,
731        envelope,
732        include_metadata,
733        external_references,
734        progress_subsource,
735        with_options,
736        ..
737    } = &mut create_source_stmt;
738
739    let uses_old_syntax = !col_names.is_empty()
740        || key_constraint.is_some()
741        || format.is_some()
742        || envelope.is_some()
743        || !include_metadata.is_empty()
744        || external_references.is_some()
745        || progress_subsource.is_some();
746
747    if let Some(DeferredItemName::Named(_)) = progress_subsource {
748        sql_bail!("Cannot manually ID qualify progress subsource")
749    }
750
751    let mut requested_subsource_map = BTreeMap::new();
752
753    let progress_desc = match &source_connection {
754        CreateSourceConnection::Kafka { .. } => {
755            &mz_storage_types::sources::kafka::KAFKA_PROGRESS_DESC
756        }
757        CreateSourceConnection::Postgres { .. } => {
758            &mz_storage_types::sources::postgres::PG_PROGRESS_DESC
759        }
760        CreateSourceConnection::SqlServer { .. } => {
761            &mz_storage_types::sources::sql_server::SQL_SERVER_PROGRESS_DESC
762        }
763        CreateSourceConnection::MySql { .. } => {
764            &mz_storage_types::sources::mysql::MYSQL_PROGRESS_DESC
765        }
766        CreateSourceConnection::LoadGenerator { .. } => {
767            &mz_storage_types::sources::load_generator::LOAD_GEN_PROGRESS_DESC
768        }
769    };
770    let scx = StatementContext::new(None, &catalog);
771
772    // Depending on if the user must or can use the `CREATE TABLE .. FROM SOURCE` statement
773    // to add tables after this source is created we might need to enforce that
774    // auto-generated subsources are created or not created by this source statement.
775    let reference_policy = if scx.catalog.system_vars().enable_create_table_from_source()
776        && scx.catalog.system_vars().force_source_table_syntax()
777    {
778        SourceReferencePolicy::NotAllowed
779    } else if scx.catalog.system_vars().enable_create_table_from_source() {
780        SourceReferencePolicy::Optional
781    } else {
782        SourceReferencePolicy::Required
783    };
784
785    let mut format_options = SourceFormatOptions::Default;
786
787    let retrieved_source_references: RetrievedSourceReferences;
788
789    match source_connection {
790        CreateSourceConnection::Kafka {
791            connection,
792            options: base_with_options,
793            ..
794        } => {
795            if let Some(external_references) = external_references {
796                Err(KafkaSourcePurificationError::ReferencedSubsources(
797                    external_references.clone(),
798                ))?;
799            }
800
801            let connection = {
802                let item = scx.get_item_by_resolved_name(connection)?;
803                // Get Kafka connection
804                match item.connection()? {
805                    Connection::Kafka(connection) => {
806                        connection.clone().into_inline_connection(&catalog)
807                    }
808                    _ => Err(KafkaSourcePurificationError::NotKafkaConnection(
809                        scx.catalog.resolve_full_name(item.name()),
810                    ))?,
811                }
812            };
813
814            let extracted_options: KafkaSourceConfigOptionExtracted =
815                base_with_options.clone().try_into()?;
816
817            let topic = extracted_options
818                .topic
819                .ok_or(KafkaSourcePurificationError::ConnectionMissingTopic)?;
820
821            let consumer = connection
822                .create_with_context(
823                    storage_configuration,
824                    MzClientContext::default(),
825                    &BTreeMap::new(),
826                    InTask::No,
827                )
828                .await
829                .map_err(|e| {
830                    // anyhow doesn't support Clone, so not trivial to move into PlanError
831                    KafkaSourcePurificationError::KafkaConsumerError(
832                        e.display_with_causes().to_string(),
833                    )
834                })?;
835            let consumer = Arc::new(consumer);
836
837            match (
838                extracted_options.start_offset,
839                extracted_options.start_timestamp,
840            ) {
841                (None, None) => {
842                    // Validate that the topic at least exists.
843                    kafka_util::ensure_topic_exists(
844                        Arc::clone(&consumer),
845                        &topic,
846                        storage_configuration
847                            .parameters
848                            .kafka_timeout_config
849                            .fetch_metadata_timeout,
850                    )
851                    .await?;
852                }
853                (Some(_), Some(_)) => {
854                    sql_bail!("cannot specify START TIMESTAMP and START OFFSET at same time")
855                }
856                (Some(start_offsets), None) => {
857                    // Validate the start offsets.
858                    kafka_util::validate_start_offsets(
859                        Arc::clone(&consumer),
860                        &topic,
861                        start_offsets,
862                        storage_configuration
863                            .parameters
864                            .kafka_timeout_config
865                            .fetch_metadata_timeout,
866                    )
867                    .await?;
868                }
869                (None, Some(time_offset)) => {
870                    // Translate `START TIMESTAMP` to a start offset.
871                    let start_offsets = kafka_util::lookup_start_offsets(
872                        Arc::clone(&consumer),
873                        &topic,
874                        time_offset,
875                        now,
876                        storage_configuration
877                            .parameters
878                            .kafka_timeout_config
879                            .fetch_metadata_timeout,
880                    )
881                    .await?;
882
883                    base_with_options.retain(|val| {
884                        !matches!(val.name, KafkaSourceConfigOptionName::StartTimestamp)
885                    });
886                    base_with_options.push(KafkaSourceConfigOption {
887                        name: KafkaSourceConfigOptionName::StartOffset,
888                        value: Some(WithOptionValue::Sequence(
889                            start_offsets
890                                .iter()
891                                .map(|offset| {
892                                    WithOptionValue::Value(Value::Number(offset.to_string()))
893                                })
894                                .collect(),
895                        )),
896                    });
897                }
898            }
899
900            let reference_client = SourceReferenceClient::Kafka { topic: &topic };
901            retrieved_source_references = reference_client.get_source_references().await?;
902
903            format_options = SourceFormatOptions::Kafka { topic };
904        }
905        CreateSourceConnection::Postgres {
906            connection,
907            options,
908        } => {
909            let connection_item = scx.get_item_by_resolved_name(connection)?;
910            let connection = match connection_item.connection().map_err(PlanError::from)? {
911                Connection::Postgres(connection) => {
912                    connection.clone().into_inline_connection(&catalog)
913                }
914                _ => Err(PgSourcePurificationError::NotPgConnection(
915                    scx.catalog.resolve_full_name(connection_item.name()),
916                ))?,
917            };
918            let crate::plan::statement::PgConfigOptionExtracted {
919                publication,
920                text_columns,
921                exclude_columns,
922                details,
923                ..
924            } = options.clone().try_into()?;
925            let publication =
926                publication.ok_or(PgSourcePurificationError::ConnectionMissingPublication)?;
927
928            if details.is_some() {
929                Err(PgSourcePurificationError::UserSpecifiedDetails)?;
930            }
931
932            let client = connection
933                .validate(connection_item.id(), storage_configuration)
934                .await?;
935
936            let reference_client = SourceReferenceClient::Postgres {
937                client: &client,
938                publication: &publication,
939                database: &connection.database,
940            };
941            retrieved_source_references = reference_client.get_source_references().await?;
942
943            let postgres::PurifiedSourceExports {
944                source_exports: subsources,
945                normalized_text_columns,
946            } = postgres::purify_source_exports(
947                &client,
948                &retrieved_source_references,
949                external_references,
950                text_columns,
951                exclude_columns,
952                source_name,
953                &reference_policy,
954            )
955            .await?;
956
957            if let Some(text_cols_option) = options
958                .iter_mut()
959                .find(|option| option.name == PgConfigOptionName::TextColumns)
960            {
961                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
962            }
963
964            requested_subsource_map.extend(subsources);
965
966            // Record the active replication timeline_id to allow detection of a future upstream
967            // point-in-time-recovery that will put the source into an error state.
968            let timeline_id = mz_postgres_util::get_timeline_id(&client).await?;
969
970            // Record whether the upstream server is a physical replica, which changes how we can determine
971            // the latest LSN.
972            let is_physical_replica = Some(mz_postgres_util::get_is_in_recovery(&client).await?);
973
974            // Remove any old detail references
975            options.retain(|PgConfigOption { name, .. }| name != &PgConfigOptionName::Details);
976            let details = PostgresSourcePublicationDetails {
977                slot: format!(
978                    "materialize_{}",
979                    Uuid::new_v4().to_string().replace('-', "")
980                ),
981                timeline_id: Some(timeline_id),
982                database: connection.database,
983                is_physical_replica,
984            };
985            options.push(PgConfigOption {
986                name: PgConfigOptionName::Details,
987                value: Some(WithOptionValue::Value(Value::String(hex::encode(
988                    details.into_proto().encode_to_vec(),
989                )))),
990            })
991        }
992        CreateSourceConnection::SqlServer {
993            connection,
994            options,
995        } => {
996            // Connect to the upstream SQL Server instance so we can validate
997            // we're compatible with CDC.
998            let connection_item = scx.get_item_by_resolved_name(connection)?;
999            let connection = match connection_item.connection()? {
1000                Connection::SqlServer(connection) => {
1001                    connection.clone().into_inline_connection(&catalog)
1002                }
1003                _ => Err(SqlServerSourcePurificationError::NotSqlServerConnection(
1004                    scx.catalog.resolve_full_name(connection_item.name()),
1005                ))?,
1006            };
1007            let crate::plan::statement::ddl::SqlServerConfigOptionExtracted {
1008                details,
1009                text_columns,
1010                exclude_columns,
1011                seen: _,
1012            } = options.clone().try_into()?;
1013
1014            if details.is_some() {
1015                Err(SqlServerSourcePurificationError::UserSpecifiedDetails)?;
1016            }
1017
1018            let mut client = connection
1019                .validate(connection_item.id(), storage_configuration)
1020                .await?;
1021
1022            let database: Arc<str> = connection.database.into();
1023            let reference_client = SourceReferenceClient::SqlServer {
1024                client: &mut client,
1025                database: Arc::clone(&database),
1026            };
1027            retrieved_source_references = reference_client.get_source_references().await?;
1028            tracing::debug!(?retrieved_source_references, "got source references");
1029
1030            let timeout = mz_storage_types::sources::sql_server::MAX_LSN_WAIT
1031                .get(storage_configuration.config_set());
1032
1033            let purified_source_exports = sql_server::purify_source_exports(
1034                &*database,
1035                &mut client,
1036                &retrieved_source_references,
1037                external_references,
1038                &text_columns,
1039                &exclude_columns,
1040                source_name,
1041                timeout,
1042                &reference_policy,
1043            )
1044            .await?;
1045
1046            let sql_server::PurifiedSourceExports {
1047                source_exports,
1048                normalized_text_columns,
1049                normalized_excl_columns,
1050            } = purified_source_exports;
1051
1052            // Update our set of requested source exports.
1053            requested_subsource_map.extend(source_exports);
1054
1055            // Record the most recent restore_history_id, or none if the system has never been
1056            // restored.
1057
1058            let restore_history_id =
1059                mz_sql_server_util::inspect::get_latest_restore_history_id(&mut client).await?;
1060            let details = SqlServerSourceExtras { restore_history_id };
1061
1062            options.retain(|SqlServerConfigOption { name, .. }| {
1063                name != &SqlServerConfigOptionName::Details
1064            });
1065            options.push(SqlServerConfigOption {
1066                name: SqlServerConfigOptionName::Details,
1067                value: Some(WithOptionValue::Value(Value::String(hex::encode(
1068                    details.into_proto().encode_to_vec(),
1069                )))),
1070            });
1071
1072            // Update our 'TEXT' and 'EXCLUDE' column options with the purified and normalized set.
1073            if let Some(text_cols_option) = options
1074                .iter_mut()
1075                .find(|option| option.name == SqlServerConfigOptionName::TextColumns)
1076            {
1077                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
1078            }
1079            if let Some(excl_cols_option) = options
1080                .iter_mut()
1081                .find(|option| option.name == SqlServerConfigOptionName::ExcludeColumns)
1082            {
1083                excl_cols_option.value = Some(WithOptionValue::Sequence(normalized_excl_columns));
1084            }
1085        }
1086        CreateSourceConnection::MySql {
1087            connection,
1088            options,
1089        } => {
1090            let connection_item = scx.get_item_by_resolved_name(connection)?;
1091            let connection = match connection_item.connection()? {
1092                Connection::MySql(connection) => {
1093                    connection.clone().into_inline_connection(&catalog)
1094                }
1095                _ => Err(MySqlSourcePurificationError::NotMySqlConnection(
1096                    scx.catalog.resolve_full_name(connection_item.name()),
1097                ))?,
1098            };
1099            let crate::plan::statement::ddl::MySqlConfigOptionExtracted {
1100                details,
1101                text_columns,
1102                exclude_columns,
1103                seen: _,
1104            } = options.clone().try_into()?;
1105
1106            if details.is_some() {
1107                Err(MySqlSourcePurificationError::UserSpecifiedDetails)?;
1108            }
1109
1110            let mut conn = connection
1111                .validate(connection_item.id(), storage_configuration)
1112                .await
1113                .map_err(MySqlSourcePurificationError::InvalidConnection)?;
1114
1115            // Retrieve the current @gtid_executed value of the server to mark as the effective
1116            // initial snapshot point such that we can ensure consistency if the initial source
1117            // snapshot is broken up over multiple points in time.
1118            let initial_gtid_set =
1119                mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?;
1120
1121            let binlog_full_metadata = is_binlog_full_metadata(&mut conn).await?;
1122
1123            let reference_client = SourceReferenceClient::MySql {
1124                conn: &mut conn,
1125                include_system_schemas: mysql::references_system_schemas(external_references),
1126            };
1127            retrieved_source_references = reference_client.get_source_references().await?;
1128
1129            let mysql::PurifiedSourceExports {
1130                source_exports: subsources,
1131                normalized_text_columns,
1132                normalized_exclude_columns,
1133            } = mysql::purify_source_exports(
1134                &mut conn,
1135                &retrieved_source_references,
1136                external_references,
1137                text_columns,
1138                exclude_columns,
1139                source_name,
1140                initial_gtid_set.clone(),
1141                &reference_policy,
1142                binlog_full_metadata,
1143            )
1144            .await?;
1145            requested_subsource_map.extend(subsources);
1146
1147            // We don't have any fields in this details struct but keep this around for
1148            // conformity with postgres and in-case we end up needing it again in the future.
1149            let details = MySqlSourceDetails {};
1150            // Update options with the purified details
1151            options
1152                .retain(|MySqlConfigOption { name, .. }| name != &MySqlConfigOptionName::Details);
1153            options.push(MySqlConfigOption {
1154                name: MySqlConfigOptionName::Details,
1155                value: Some(WithOptionValue::Value(Value::String(hex::encode(
1156                    details.into_proto().encode_to_vec(),
1157                )))),
1158            });
1159
1160            if let Some(text_cols_option) = options
1161                .iter_mut()
1162                .find(|option| option.name == MySqlConfigOptionName::TextColumns)
1163            {
1164                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
1165            }
1166            if let Some(exclude_cols_option) = options
1167                .iter_mut()
1168                .find(|option| option.name == MySqlConfigOptionName::ExcludeColumns)
1169            {
1170                exclude_cols_option.value =
1171                    Some(WithOptionValue::Sequence(normalized_exclude_columns));
1172            }
1173        }
1174        CreateSourceConnection::LoadGenerator { generator, options } => {
1175            let load_generator =
1176                load_generator_ast_to_generator(&scx, generator, options, include_metadata)?;
1177
1178            let reference_client = SourceReferenceClient::LoadGenerator {
1179                generator: &load_generator,
1180            };
1181            retrieved_source_references = reference_client.get_source_references().await?;
1182            // Filter to the references that need to be created as 'subsources', which
1183            // doesn't include the default output for single-output sources.
1184            // TODO(database-issues#8620): Remove once subsources are removed
1185            let multi_output_sources =
1186                retrieved_source_references
1187                    .all_references()
1188                    .iter()
1189                    .any(|r| {
1190                        matches!(
1191                            r.load_generator_output(),
1192                            Some(output) if output != &LoadGeneratorOutput::Default
1193                        )
1194                    });
1195
1196            match external_references {
1197                Some(requested)
1198                    if matches!(reference_policy, SourceReferencePolicy::NotAllowed) =>
1199                {
1200                    Err(PlanError::UseTablesForSources(requested.to_string()))?
1201                }
1202                Some(requested) if !multi_output_sources => match requested {
1203                    ExternalReferences::SubsetTables(_) => {
1204                        Err(LoadGeneratorSourcePurificationError::ForTables)?
1205                    }
1206                    ExternalReferences::SubsetSchemas(_) => {
1207                        Err(LoadGeneratorSourcePurificationError::ForSchemas)?
1208                    }
1209                    ExternalReferences::All => {
1210                        Err(LoadGeneratorSourcePurificationError::ForAllTables)?
1211                    }
1212                },
1213                Some(requested) => {
1214                    let requested_exports = retrieved_source_references
1215                        .requested_source_exports(Some(requested), source_name)?;
1216                    for export in requested_exports {
1217                        requested_subsource_map.insert(
1218                            export.name,
1219                            PurifiedSourceExport {
1220                                external_reference: export.external_reference,
1221                                details: PurifiedExportDetails::LoadGenerator {
1222                                    table: export
1223                                        .meta
1224                                        .load_generator_desc()
1225                                        .ok_or_else(|| {
1226                                            internal_err!(
1227                                                "expected load generator source reference"
1228                                            )
1229                                        })?
1230                                        .clone(),
1231                                    output: export
1232                                        .meta
1233                                        .load_generator_output()
1234                                        .ok_or_else(|| {
1235                                            internal_err!(
1236                                                "expected load generator source reference"
1237                                            )
1238                                        })?
1239                                        .clone(),
1240                                },
1241                            },
1242                        );
1243                    }
1244                }
1245                None => {
1246                    if multi_output_sources
1247                        && matches!(reference_policy, SourceReferencePolicy::Required)
1248                    {
1249                        Err(LoadGeneratorSourcePurificationError::MultiOutputRequiresForAllTables)?
1250                    }
1251                }
1252            }
1253
1254            if let LoadGenerator::Clock = generator {
1255                if !options
1256                    .iter()
1257                    .any(|p| p.name == LoadGeneratorOptionName::AsOf)
1258                {
1259                    let now = catalog.now();
1260                    options.push(LoadGeneratorOption {
1261                        name: LoadGeneratorOptionName::AsOf,
1262                        value: Some(WithOptionValue::Value(Value::Number(now.to_string()))),
1263                    });
1264                }
1265            }
1266        }
1267    }
1268
1269    // Now that we know which subsources to create alongside this
1270    // statement, remove the references so it is not canonicalized as
1271    // part of the `CREATE SOURCE` statement in the catalog.
1272    *external_references = None;
1273
1274    // Generate progress subsource for old syntax
1275    let create_progress_subsource_stmt = if uses_old_syntax {
1276        // Take name from input or generate name
1277        let name = match progress_subsource {
1278            Some(name) => match name {
1279                DeferredItemName::Deferred(name) => name.clone(),
1280                // Already checked for this value above.
1281                DeferredItemName::Named(_) => {
1282                    sql_bail!("progress subsource name cannot be a resolved name")
1283                }
1284            },
1285            None => {
1286                let (item, prefix) = source_name
1287                    .0
1288                    .split_last()
1289                    .ok_or_else(|| sql_err!("source name must have at least one component"))?;
1290                let item_name =
1291                    Ident::try_generate_name(item.to_string(), "_progress", |candidate| {
1292                        let mut suggested_name = prefix.to_vec();
1293                        suggested_name.push(candidate.clone());
1294
1295                        let partial =
1296                            normalize::unresolved_item_name(UnresolvedItemName(suggested_name))?;
1297                        let qualified = scx.allocate_qualified_name(partial)?;
1298                        let item_exists = scx.catalog.get_item_by_name(&qualified).is_some();
1299                        let type_exists = scx.catalog.get_type_by_name(&qualified).is_some();
1300                        Ok::<_, PlanError>(!item_exists && !type_exists)
1301                    })?;
1302
1303                let mut full_name = prefix.to_vec();
1304                full_name.push(item_name);
1305                let full_name = normalize::unresolved_item_name(UnresolvedItemName(full_name))?;
1306                let qualified_name = scx.allocate_qualified_name(full_name)?;
1307                let full_name = scx.catalog.resolve_full_name(&qualified_name);
1308
1309                UnresolvedItemName::from(full_name.clone())
1310            }
1311        };
1312
1313        let (columns, constraints) = scx.relation_desc_into_table_defs(progress_desc)?;
1314
1315        // Create the subsource statement
1316        let mut progress_with_options: Vec<_> = with_options
1317            .iter()
1318            .filter_map(|opt| match opt.name {
1319                CreateSourceOptionName::TimestampInterval => None,
1320                CreateSourceOptionName::RetainHistory => Some(CreateSubsourceOption {
1321                    name: CreateSubsourceOptionName::RetainHistory,
1322                    value: opt.value.clone(),
1323                }),
1324            })
1325            .collect();
1326        progress_with_options.push(CreateSubsourceOption {
1327            name: CreateSubsourceOptionName::Progress,
1328            value: Some(WithOptionValue::Value(Value::Boolean(true))),
1329        });
1330
1331        Some(CreateSubsourceStatement {
1332            name,
1333            columns,
1334            // Progress subsources do not refer to the source to which they belong.
1335            // Instead the primary source depends on it (the opposite is true of
1336            // ingestion exports, which depend on the primary source).
1337            of_source: None,
1338            constraints,
1339            if_not_exists: false,
1340            with_options: progress_with_options,
1341        })
1342    } else {
1343        None
1344    };
1345
1346    purify_source_format(
1347        &catalog,
1348        format,
1349        &format_options,
1350        envelope,
1351        storage_configuration,
1352    )
1353    .await?;
1354
1355    Ok(PurifiedStatement::PurifiedCreateSource {
1356        create_progress_subsource_stmt,
1357        create_source_stmt,
1358        subsources: requested_subsource_map,
1359        available_source_references: retrieved_source_references.available_source_references(),
1360    })
1361}
1362
1363/// On success, returns the details on new subsources and updated
1364/// 'options' that sequencing expects for handling `ALTER SOURCE` statements.
1365async fn purify_alter_source(
1366    catalog: impl SessionCatalog,
1367    stmt: AlterSourceStatement<Aug>,
1368    storage_configuration: &StorageConfiguration,
1369) -> Result<PurifiedStatement, PlanError> {
1370    let scx = StatementContext::new(None, &catalog);
1371    let AlterSourceStatement {
1372        source_name: unresolved_source_name,
1373        action,
1374        if_exists,
1375    } = stmt;
1376
1377    // Get name.
1378    let item = match scx.resolve_item(RawItemName::Name(unresolved_source_name.clone())) {
1379        Ok(item) => item,
1380        Err(_) if if_exists => {
1381            return Ok(PurifiedStatement::PurifiedAlterSource {
1382                alter_source_stmt: AlterSourceStatement {
1383                    source_name: unresolved_source_name,
1384                    action,
1385                    if_exists,
1386                },
1387            });
1388        }
1389        Err(e) => return Err(e),
1390    };
1391
1392    // Ensure it's an ingestion-based and alterable source.
1393    let desc = match item.source_desc()? {
1394        Some(desc) => desc.clone().into_inline_connection(scx.catalog),
1395        None => {
1396            sql_bail!("cannot ALTER this type of source")
1397        }
1398    };
1399
1400    let source_name = item.name();
1401
1402    let resolved_source_name = ResolvedItemName::Item {
1403        id: item.id(),
1404        qualifiers: item.name().qualifiers.clone(),
1405        full_name: scx.catalog.resolve_full_name(source_name),
1406        print_id: true,
1407        version: RelationVersionSelector::Latest,
1408    };
1409
1410    let partial_name = scx.catalog.minimal_qualification(source_name);
1411
1412    match action {
1413        AlterSourceAction::AddSubsources {
1414            external_references,
1415            options,
1416        } => {
1417            if scx.catalog.system_vars().enable_create_table_from_source()
1418                && scx.catalog.system_vars().force_source_table_syntax()
1419            {
1420                Err(PlanError::UseTablesForSources(
1421                    "ALTER SOURCE .. ADD SUBSOURCES ..".to_string(),
1422                ))?;
1423            }
1424
1425            purify_alter_source_add_subsources(
1426                external_references,
1427                options,
1428                desc,
1429                partial_name,
1430                unresolved_source_name,
1431                resolved_source_name,
1432                storage_configuration,
1433            )
1434            .await
1435        }
1436        AlterSourceAction::RefreshReferences => {
1437            purify_alter_source_refresh_references(
1438                desc,
1439                resolved_source_name,
1440                storage_configuration,
1441            )
1442            .await
1443        }
1444        _ => Ok(PurifiedStatement::PurifiedAlterSource {
1445            alter_source_stmt: AlterSourceStatement {
1446                source_name: unresolved_source_name,
1447                action,
1448                if_exists,
1449            },
1450        }),
1451    }
1452}
1453
1454// TODO(database-issues#8620): Remove once subsources are removed
1455/// Equivalent to `purify_create_source` but for `AlterSourceStatement`.
1456async fn purify_alter_source_add_subsources(
1457    external_references: Vec<ExternalReferenceExport>,
1458    mut options: Vec<AlterSourceAddSubsourceOption<Aug>>,
1459    desc: SourceDesc,
1460    partial_source_name: PartialItemName,
1461    unresolved_source_name: UnresolvedItemName,
1462    resolved_source_name: ResolvedItemName,
1463    storage_configuration: &StorageConfiguration,
1464) -> Result<PurifiedStatement, PlanError> {
1465    // Validate this is a source that can have subsources added.
1466    let connection_id = match &desc.connection {
1467        GenericSourceConnection::Postgres(c) => c.connection_id,
1468        GenericSourceConnection::MySql(c) => c.connection_id,
1469        GenericSourceConnection::SqlServer(c) => c.connection_id,
1470        _ => sql_bail!(
1471            "source {} does not support ALTER SOURCE.",
1472            partial_source_name
1473        ),
1474    };
1475
1476    let crate::plan::statement::ddl::AlterSourceAddSubsourceOptionExtracted {
1477        text_columns,
1478        exclude_columns,
1479        details,
1480        seen: _,
1481    } = options.clone().try_into()?;
1482    if details.is_some() {
1483        sql_bail!("DETAILS option cannot be explicitly set");
1484    }
1485
1486    let mut requested_subsource_map = BTreeMap::new();
1487
1488    match desc.connection {
1489        GenericSourceConnection::Postgres(pg_source_connection) => {
1490            // Get PostgresConnection for generating subsources.
1491            let pg_connection = &pg_source_connection.connection;
1492
1493            let client = pg_connection
1494                .validate(connection_id, storage_configuration)
1495                .await?;
1496
1497            let reference_client = SourceReferenceClient::Postgres {
1498                client: &client,
1499                publication: &pg_source_connection.publication,
1500                database: &pg_connection.database,
1501            };
1502            let retrieved_source_references = reference_client.get_source_references().await?;
1503
1504            let postgres::PurifiedSourceExports {
1505                source_exports: subsources,
1506                normalized_text_columns,
1507            } = postgres::purify_source_exports(
1508                &client,
1509                &retrieved_source_references,
1510                &Some(ExternalReferences::SubsetTables(external_references)),
1511                text_columns,
1512                exclude_columns,
1513                &unresolved_source_name,
1514                &SourceReferencePolicy::Required,
1515            )
1516            .await?;
1517
1518            if let Some(text_cols_option) = options
1519                .iter_mut()
1520                .find(|option| option.name == AlterSourceAddSubsourceOptionName::TextColumns)
1521            {
1522                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
1523            }
1524
1525            requested_subsource_map.extend(subsources);
1526        }
1527        GenericSourceConnection::MySql(mysql_source_connection) => {
1528            let mysql_connection = &mysql_source_connection.connection;
1529            let config = mysql_connection
1530                .config(
1531                    &storage_configuration.connection_context.secrets_reader,
1532                    storage_configuration,
1533                    InTask::No,
1534                )
1535                .await?;
1536
1537            let mut conn = config
1538                .connect(
1539                    "mysql purification",
1540                    &storage_configuration.connection_context.ssh_tunnel_manager,
1541                )
1542                .await?;
1543
1544            // Retrieve the current @gtid_executed value of the server to mark as the effective
1545            // initial snapshot point for these subsources.
1546            let initial_gtid_set =
1547                mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?;
1548
1549            let binlog_full_metadata = is_binlog_full_metadata(&mut conn).await?;
1550
1551            let requested_references = Some(ExternalReferences::SubsetTables(external_references));
1552
1553            let reference_client = SourceReferenceClient::MySql {
1554                conn: &mut conn,
1555                include_system_schemas: mysql::references_system_schemas(&requested_references),
1556            };
1557            let retrieved_source_references = reference_client.get_source_references().await?;
1558
1559            let mysql::PurifiedSourceExports {
1560                source_exports: subsources,
1561                normalized_text_columns,
1562                normalized_exclude_columns,
1563            } = mysql::purify_source_exports(
1564                &mut conn,
1565                &retrieved_source_references,
1566                &requested_references,
1567                text_columns,
1568                exclude_columns,
1569                &unresolved_source_name,
1570                initial_gtid_set,
1571                &SourceReferencePolicy::Required,
1572                binlog_full_metadata,
1573            )
1574            .await?;
1575            requested_subsource_map.extend(subsources);
1576
1577            // Update options with the purified details
1578            if let Some(text_cols_option) = options
1579                .iter_mut()
1580                .find(|option| option.name == AlterSourceAddSubsourceOptionName::TextColumns)
1581            {
1582                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
1583            }
1584            if let Some(exclude_cols_option) = options
1585                .iter_mut()
1586                .find(|option| option.name == AlterSourceAddSubsourceOptionName::ExcludeColumns)
1587            {
1588                exclude_cols_option.value =
1589                    Some(WithOptionValue::Sequence(normalized_exclude_columns));
1590            }
1591        }
1592        GenericSourceConnection::SqlServer(sql_server_source) => {
1593            // Open a connection to the upstream SQL Server instance.
1594            let sql_server_connection = &sql_server_source.connection;
1595            let config = sql_server_connection
1596                .resolve_config(
1597                    &storage_configuration.connection_context.secrets_reader,
1598                    storage_configuration,
1599                    InTask::No,
1600                )
1601                .await?;
1602            let mut client = mz_sql_server_util::Client::connect(config).await?;
1603
1604            // Query the upstream SQL Server instance for available tables to replicate.
1605            let database = sql_server_connection.database.clone().into();
1606            let source_references = SourceReferenceClient::SqlServer {
1607                client: &mut client,
1608                database: Arc::clone(&database),
1609            }
1610            .get_source_references()
1611            .await?;
1612            let requested_references = Some(ExternalReferences::SubsetTables(external_references));
1613
1614            let timeout = mz_storage_types::sources::sql_server::MAX_LSN_WAIT
1615                .get(storage_configuration.config_set());
1616
1617            let result = sql_server::purify_source_exports(
1618                &*database,
1619                &mut client,
1620                &source_references,
1621                &requested_references,
1622                &text_columns,
1623                &exclude_columns,
1624                &unresolved_source_name,
1625                timeout,
1626                &SourceReferencePolicy::Required,
1627            )
1628            .await;
1629            let sql_server::PurifiedSourceExports {
1630                source_exports,
1631                normalized_text_columns,
1632                normalized_excl_columns,
1633            } = result?;
1634
1635            // Add the new exports to our subsource map.
1636            requested_subsource_map.extend(source_exports);
1637
1638            // Update options on the CREATE SOURCE statement with the purified details.
1639            if let Some(text_cols_option) = options
1640                .iter_mut()
1641                .find(|option| option.name == AlterSourceAddSubsourceOptionName::TextColumns)
1642            {
1643                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
1644            }
1645            if let Some(exclude_cols_option) = options
1646                .iter_mut()
1647                .find(|option| option.name == AlterSourceAddSubsourceOptionName::ExcludeColumns)
1648            {
1649                exclude_cols_option.value =
1650                    Some(WithOptionValue::Sequence(normalized_excl_columns));
1651            }
1652        }
1653        // The source kind was already established earlier in this function;
1654        // reaching this catch-all is an internal invariant violation.
1655        _ => bail_internal!("source does not support ALTER SOURCE...ADD SUBSOURCE"),
1656    };
1657
1658    Ok(PurifiedStatement::PurifiedAlterSourceAddSubsources {
1659        source_name: resolved_source_name,
1660        options,
1661        subsources: requested_subsource_map,
1662    })
1663}
1664
1665async fn purify_alter_source_refresh_references(
1666    desc: SourceDesc,
1667    resolved_source_name: ResolvedItemName,
1668    storage_configuration: &StorageConfiguration,
1669) -> Result<PurifiedStatement, PlanError> {
1670    let retrieved_source_references = match desc.connection {
1671        GenericSourceConnection::Postgres(pg_source_connection) => {
1672            // Get PostgresConnection for generating subsources.
1673            let pg_connection = &pg_source_connection.connection;
1674
1675            let config = pg_connection
1676                .config(
1677                    &storage_configuration.connection_context.secrets_reader,
1678                    storage_configuration,
1679                    InTask::No,
1680                )
1681                .await?;
1682
1683            let client = config
1684                .connect(
1685                    "postgres_purification",
1686                    &storage_configuration.connection_context.ssh_tunnel_manager,
1687                )
1688                .await?;
1689            let reference_client = SourceReferenceClient::Postgres {
1690                client: &client,
1691                publication: &pg_source_connection.publication,
1692                database: &pg_connection.database,
1693            };
1694            reference_client.get_source_references().await?
1695        }
1696        GenericSourceConnection::MySql(mysql_source_connection) => {
1697            let mysql_connection = &mysql_source_connection.connection;
1698            let config = mysql_connection
1699                .config(
1700                    &storage_configuration.connection_context.secrets_reader,
1701                    storage_configuration,
1702                    InTask::No,
1703                )
1704                .await?;
1705
1706            let mut conn = config
1707                .connect(
1708                    "mysql purification",
1709                    &storage_configuration.connection_context.ssh_tunnel_manager,
1710                )
1711                .await?;
1712
1713            let reference_client = SourceReferenceClient::MySql {
1714                conn: &mut conn,
1715                include_system_schemas: false,
1716            };
1717            reference_client.get_source_references().await?
1718        }
1719        GenericSourceConnection::SqlServer(sql_server_source) => {
1720            // Open a connection to the upstream SQL Server instance.
1721            let sql_server_connection = &sql_server_source.connection;
1722            let config = sql_server_connection
1723                .resolve_config(
1724                    &storage_configuration.connection_context.secrets_reader,
1725                    storage_configuration,
1726                    InTask::No,
1727                )
1728                .await?;
1729            let mut client = mz_sql_server_util::Client::connect(config).await?;
1730
1731            // Query the upstream SQL Server instance for available tables to replicate.
1732            let source_references = SourceReferenceClient::SqlServer {
1733                client: &mut client,
1734                database: sql_server_connection.database.clone().into(),
1735            }
1736            .get_source_references()
1737            .await?;
1738            source_references
1739        }
1740        GenericSourceConnection::LoadGenerator(load_gen_connection) => {
1741            let reference_client = SourceReferenceClient::LoadGenerator {
1742                generator: &load_gen_connection.load_generator,
1743            };
1744            reference_client.get_source_references().await?
1745        }
1746        GenericSourceConnection::Kafka(kafka_conn) => {
1747            let reference_client = SourceReferenceClient::Kafka {
1748                topic: &kafka_conn.topic,
1749            };
1750            reference_client.get_source_references().await?
1751        }
1752    };
1753    Ok(PurifiedStatement::PurifiedAlterSourceRefreshReferences {
1754        source_name: resolved_source_name,
1755        available_source_references: retrieved_source_references.available_source_references(),
1756    })
1757}
1758
1759async fn purify_create_table_from_source(
1760    catalog: impl SessionCatalog,
1761    mut stmt: CreateTableFromSourceStatement<Aug>,
1762    storage_configuration: &StorageConfiguration,
1763) -> Result<PurifiedStatement, PlanError> {
1764    let scx = StatementContext::new(None, &catalog);
1765    let CreateTableFromSourceStatement {
1766        name: _,
1767        columns,
1768        constraints,
1769        source: source_name,
1770        if_not_exists: _,
1771        external_reference,
1772        format,
1773        envelope,
1774        include_metadata: _,
1775        with_options,
1776    } = &mut stmt;
1777
1778    // Columns and constraints cannot be specified by the user but will be populated below.
1779    if matches!(columns, TableFromSourceColumns::Defined(_)) {
1780        sql_bail!("CREATE TABLE .. FROM SOURCE column definitions cannot be specified directly");
1781    }
1782    if !constraints.is_empty() {
1783        sql_bail!(
1784            "CREATE TABLE .. FROM SOURCE constraint definitions cannot be specified directly"
1785        );
1786    }
1787
1788    // Get the source item
1789    let item = match scx.get_item_by_resolved_name(source_name) {
1790        Ok(item) => item,
1791        Err(e) => return Err(e),
1792    };
1793
1794    // Ensure it's an ingestion-based and alterable source.
1795    let desc = match item.source_desc()? {
1796        Some(desc) => desc.clone().into_inline_connection(scx.catalog),
1797        None => {
1798            sql_bail!("cannot ALTER this type of source")
1799        }
1800    };
1801    let unresolved_source_name: UnresolvedItemName = source_name.full_item_name().clone().into();
1802
1803    let crate::plan::statement::ddl::TableFromSourceOptionExtracted {
1804        text_columns,
1805        exclude_columns,
1806        retain_history: _,
1807        details,
1808        partition_by: _,
1809        seen: _,
1810    } = with_options.clone().try_into()?;
1811    if details.is_some() {
1812        sql_bail!("DETAILS option cannot be explicitly set");
1813    }
1814
1815    // Our text column values are unqualified (just column names), but the purification methods below
1816    // expect to match the fully-qualified names against the full set of tables in upstream, so we
1817    // need to qualify them using the external reference first.
1818    let qualified_text_columns = text_columns
1819        .iter()
1820        .map(|col| {
1821            UnresolvedItemName(
1822                external_reference
1823                    .as_ref()
1824                    .map(|er| er.0.iter().chain_one(col).map(|i| i.clone()).collect())
1825                    .unwrap_or_else(|| vec![col.clone()]),
1826            )
1827        })
1828        .collect_vec();
1829    let qualified_exclude_columns = exclude_columns
1830        .iter()
1831        .map(|col| {
1832            UnresolvedItemName(
1833                external_reference
1834                    .as_ref()
1835                    .map(|er| er.0.iter().chain_one(col).map(|i| i.clone()).collect())
1836                    .unwrap_or_else(|| vec![col.clone()]),
1837            )
1838        })
1839        .collect_vec();
1840
1841    // Should be overriden below if a source-specific format is required.
1842    let mut format_options = SourceFormatOptions::Default;
1843
1844    let retrieved_source_references: RetrievedSourceReferences;
1845
1846    let requested_references = external_reference.as_ref().map(|ref_name| {
1847        ExternalReferences::SubsetTables(vec![ExternalReferenceExport {
1848            reference: ref_name.clone(),
1849            alias: None,
1850        }])
1851    });
1852
1853    // Run purification work specific to each source type: resolve the external reference to
1854    // a fully qualified name and obtain the appropriate details for the source-export statement
1855    let purified_export = match desc.connection {
1856        GenericSourceConnection::Postgres(pg_source_connection) => {
1857            // Get PostgresConnection for generating subsources.
1858            let pg_connection = &pg_source_connection.connection;
1859
1860            let client = pg_connection
1861                .validate(pg_source_connection.connection_id, storage_configuration)
1862                .await?;
1863
1864            let reference_client = SourceReferenceClient::Postgres {
1865                client: &client,
1866                publication: &pg_source_connection.publication,
1867                database: &pg_connection.database,
1868            };
1869            retrieved_source_references = reference_client.get_source_references().await?;
1870
1871            let postgres::PurifiedSourceExports {
1872                source_exports,
1873                // TODO(database-issues#8620): Remove once subsources are removed
1874                // This `normalized_text_columns` is not relevant for us and is only returned for
1875                // `CREATE SOURCE` statements that automatically generate subsources
1876                normalized_text_columns: _,
1877            } = postgres::purify_source_exports(
1878                &client,
1879                &retrieved_source_references,
1880                &requested_references,
1881                qualified_text_columns,
1882                qualified_exclude_columns,
1883                &unresolved_source_name,
1884                &SourceReferencePolicy::Required,
1885            )
1886            .await?;
1887            // There should be exactly one source_export returned for this statement
1888            let (_, purified_export) = source_exports.into_element();
1889            purified_export
1890        }
1891        GenericSourceConnection::MySql(mysql_source_connection) => {
1892            let mysql_connection = &mysql_source_connection.connection;
1893            let config = mysql_connection
1894                .config(
1895                    &storage_configuration.connection_context.secrets_reader,
1896                    storage_configuration,
1897                    InTask::No,
1898                )
1899                .await?;
1900
1901            let mut conn = config
1902                .connect(
1903                    "mysql purification",
1904                    &storage_configuration.connection_context.ssh_tunnel_manager,
1905                )
1906                .await?;
1907
1908            ensure_binlog_full_metadata(&mut conn).await?;
1909            let binlog_full_metadata = true;
1910
1911            // Retrieve the current @gtid_executed value of the server to mark as the effective
1912            // initial snapshot point for this table.
1913            let initial_gtid_set =
1914                mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?;
1915
1916            let reference_client = SourceReferenceClient::MySql {
1917                conn: &mut conn,
1918                include_system_schemas: mysql::references_system_schemas(&requested_references),
1919            };
1920            retrieved_source_references = reference_client.get_source_references().await?;
1921
1922            let mysql::PurifiedSourceExports {
1923                source_exports,
1924                // TODO(database-issues#8620): Remove once subsources are removed
1925                // `normalized_text/exclude_columns` is not relevant for us and is only returned for
1926                // `CREATE SOURCE` statements that automatically generate subsources
1927                normalized_text_columns: _,
1928                normalized_exclude_columns: _,
1929            } = mysql::purify_source_exports(
1930                &mut conn,
1931                &retrieved_source_references,
1932                &requested_references,
1933                qualified_text_columns,
1934                qualified_exclude_columns,
1935                &unresolved_source_name,
1936                initial_gtid_set,
1937                &SourceReferencePolicy::Required,
1938                binlog_full_metadata,
1939            )
1940            .await?;
1941            // There should be exactly one source_export returned for this statement
1942            let (_, purified_export) = source_exports.into_element();
1943            purified_export
1944        }
1945        GenericSourceConnection::SqlServer(sql_server_source) => {
1946            let connection = sql_server_source.connection;
1947            let config = connection
1948                .resolve_config(
1949                    &storage_configuration.connection_context.secrets_reader,
1950                    storage_configuration,
1951                    InTask::No,
1952                )
1953                .await?;
1954            let mut client = mz_sql_server_util::Client::connect(config).await?;
1955
1956            let database: Arc<str> = connection.database.into();
1957            let reference_client = SourceReferenceClient::SqlServer {
1958                client: &mut client,
1959                database: Arc::clone(&database),
1960            };
1961            retrieved_source_references = reference_client.get_source_references().await?;
1962            tracing::debug!(?retrieved_source_references, "got source references");
1963
1964            let timeout = mz_storage_types::sources::sql_server::MAX_LSN_WAIT
1965                .get(storage_configuration.config_set());
1966
1967            let purified_source_exports = sql_server::purify_source_exports(
1968                &*database,
1969                &mut client,
1970                &retrieved_source_references,
1971                &requested_references,
1972                &qualified_text_columns,
1973                &qualified_exclude_columns,
1974                &unresolved_source_name,
1975                timeout,
1976                &SourceReferencePolicy::Required,
1977            )
1978            .await?;
1979
1980            // There should be exactly one source_export returned for this statement
1981            let (_, purified_export) = purified_source_exports.source_exports.into_element();
1982            purified_export
1983        }
1984        GenericSourceConnection::LoadGenerator(load_gen_connection) => {
1985            let reference_client = SourceReferenceClient::LoadGenerator {
1986                generator: &load_gen_connection.load_generator,
1987            };
1988            retrieved_source_references = reference_client.get_source_references().await?;
1989
1990            let requested_exports = retrieved_source_references
1991                .requested_source_exports(requested_references.as_ref(), &unresolved_source_name)?;
1992            // There should be exactly one source_export returned
1993            let export = requested_exports.into_element();
1994            PurifiedSourceExport {
1995                external_reference: export.external_reference,
1996                details: PurifiedExportDetails::LoadGenerator {
1997                    table: export
1998                        .meta
1999                        .load_generator_desc()
2000                        .ok_or_else(|| internal_err!("expected load generator source reference"))?
2001                        .clone(),
2002                    output: export
2003                        .meta
2004                        .load_generator_output()
2005                        .ok_or_else(|| internal_err!("expected load generator source reference"))?
2006                        .clone(),
2007                },
2008            }
2009        }
2010        GenericSourceConnection::Kafka(kafka_conn) => {
2011            let reference_client = SourceReferenceClient::Kafka {
2012                topic: &kafka_conn.topic,
2013            };
2014            retrieved_source_references = reference_client.get_source_references().await?;
2015            let requested_exports = retrieved_source_references
2016                .requested_source_exports(requested_references.as_ref(), &unresolved_source_name)?;
2017            // There should be exactly one source_export returned
2018            let export = requested_exports.into_element();
2019
2020            format_options = SourceFormatOptions::Kafka {
2021                topic: kafka_conn.topic.clone(),
2022            };
2023            PurifiedSourceExport {
2024                external_reference: export.external_reference,
2025                details: PurifiedExportDetails::Kafka {},
2026            }
2027        }
2028    };
2029
2030    purify_source_format(
2031        &catalog,
2032        format,
2033        &format_options,
2034        envelope,
2035        storage_configuration,
2036    )
2037    .await?;
2038
2039    // Update the external reference in the statement to the resolved fully-qualified
2040    // external reference
2041    *external_reference = Some(purified_export.external_reference.clone());
2042
2043    // Update options in the statement using the purified export details
2044    match &purified_export.details {
2045        PurifiedExportDetails::Postgres { .. } => {
2046            let mut unsupported_cols = vec![];
2047            let postgres::PostgresExportStatementValues {
2048                columns: gen_columns,
2049                constraints: gen_constraints,
2050                text_columns: gen_text_columns,
2051                exclude_columns: gen_exclude_columns,
2052                details: gen_details,
2053                external_reference: _,
2054            } = postgres::generate_source_export_statement_values(
2055                &scx,
2056                purified_export,
2057                &mut unsupported_cols,
2058            )?;
2059            if !unsupported_cols.is_empty() {
2060                unsupported_cols.sort();
2061                Err(PgSourcePurificationError::UnrecognizedTypes {
2062                    cols: unsupported_cols,
2063                })?;
2064            }
2065
2066            if let Some(text_cols_option) = with_options
2067                .iter_mut()
2068                .find(|option| option.name == TableFromSourceOptionName::TextColumns)
2069            {
2070                if let Some(gen_text_columns) = gen_text_columns {
2071                    text_cols_option.value = Some(WithOptionValue::Sequence(gen_text_columns));
2072                }
2073            }
2074            if let Some(exclude_cols_option) = with_options
2075                .iter_mut()
2076                .find(|option| option.name == TableFromSourceOptionName::ExcludeColumns)
2077            {
2078                if let Some(gen_exclude_columns) = gen_exclude_columns {
2079                    exclude_cols_option.value =
2080                        Some(WithOptionValue::Sequence(gen_exclude_columns));
2081                }
2082            }
2083            match columns {
2084                TableFromSourceColumns::Defined(_) => {
2085                    // Purification is what populates the `Defined` variant;
2086                    // reaching this is an internal invariant violation.
2087                    bail_internal!(
2088                        "column definitions cannot be explicitly set for this source type"
2089                    )
2090                }
2091                TableFromSourceColumns::NotSpecified => {
2092                    *columns = TableFromSourceColumns::Defined(gen_columns);
2093                    *constraints = gen_constraints;
2094                }
2095                TableFromSourceColumns::Named(_) => {
2096                    sql_bail!("columns cannot be named for Postgres sources")
2097                }
2098            }
2099            with_options.push(TableFromSourceOption {
2100                name: TableFromSourceOptionName::Details,
2101                value: Some(WithOptionValue::Value(Value::String(hex::encode(
2102                    gen_details.into_proto().encode_to_vec(),
2103                )))),
2104            })
2105        }
2106        PurifiedExportDetails::MySql { .. } => {
2107            let mysql::MySqlExportStatementValues {
2108                columns: gen_columns,
2109                constraints: gen_constraints,
2110                text_columns: gen_text_columns,
2111                exclude_columns: gen_exclude_columns,
2112                details: gen_details,
2113                external_reference: _,
2114            } = mysql::generate_source_export_statement_values(&scx, purified_export)?;
2115
2116            if let Some(text_cols_option) = with_options
2117                .iter_mut()
2118                .find(|option| option.name == TableFromSourceOptionName::TextColumns)
2119            {
2120                if let Some(gen_text_columns) = gen_text_columns {
2121                    text_cols_option.value = Some(WithOptionValue::Sequence(gen_text_columns));
2122                }
2123            }
2124            if let Some(exclude_cols_option) = with_options
2125                .iter_mut()
2126                .find(|option| option.name == TableFromSourceOptionName::ExcludeColumns)
2127            {
2128                if let Some(gen_exclude_columns) = gen_exclude_columns {
2129                    exclude_cols_option.value =
2130                        Some(WithOptionValue::Sequence(gen_exclude_columns));
2131                }
2132            }
2133            match columns {
2134                TableFromSourceColumns::Defined(_) => {
2135                    // Purification is what populates the `Defined` variant;
2136                    // reaching this is an internal invariant violation.
2137                    bail_internal!(
2138                        "column definitions cannot be explicitly set for this source type"
2139                    )
2140                }
2141                TableFromSourceColumns::NotSpecified => {
2142                    *columns = TableFromSourceColumns::Defined(gen_columns);
2143                    *constraints = gen_constraints;
2144                }
2145                TableFromSourceColumns::Named(_) => {
2146                    sql_bail!("columns cannot be named for MySQL sources")
2147                }
2148            }
2149            with_options.push(TableFromSourceOption {
2150                name: TableFromSourceOptionName::Details,
2151                value: Some(WithOptionValue::Value(Value::String(hex::encode(
2152                    gen_details.into_proto().encode_to_vec(),
2153                )))),
2154            })
2155        }
2156        PurifiedExportDetails::SqlServer { .. } => {
2157            let sql_server::SqlServerExportStatementValues {
2158                columns: gen_columns,
2159                constraints: gen_constraints,
2160                text_columns: gen_text_columns,
2161                excl_columns: gen_excl_columns,
2162                details: gen_details,
2163                external_reference: _,
2164            } = sql_server::generate_source_export_statement_values(&scx, purified_export)?;
2165
2166            if let Some(text_cols_option) = with_options
2167                .iter_mut()
2168                .find(|opt| opt.name == TableFromSourceOptionName::TextColumns)
2169            {
2170                if let Some(gen_text_columns) = gen_text_columns {
2171                    text_cols_option.value = Some(WithOptionValue::Sequence(gen_text_columns));
2172                }
2173            }
2174            if let Some(exclude_cols_option) = with_options
2175                .iter_mut()
2176                .find(|opt| opt.name == TableFromSourceOptionName::ExcludeColumns)
2177            {
2178                if let Some(gen_excl_columns) = gen_excl_columns {
2179                    exclude_cols_option.value = Some(WithOptionValue::Sequence(gen_excl_columns));
2180                }
2181            }
2182
2183            match columns {
2184                TableFromSourceColumns::NotSpecified => {
2185                    *columns = TableFromSourceColumns::Defined(gen_columns);
2186                    *constraints = gen_constraints;
2187                }
2188                TableFromSourceColumns::Named(_) => {
2189                    sql_bail!("columns cannot be named for SQL Server sources")
2190                }
2191                TableFromSourceColumns::Defined(_) => {
2192                    // Purification is what populates the `Defined` variant;
2193                    // reaching this is an internal invariant violation.
2194                    bail_internal!(
2195                        "column definitions cannot be explicitly set for this source type"
2196                    )
2197                }
2198            }
2199
2200            with_options.push(TableFromSourceOption {
2201                name: TableFromSourceOptionName::Details,
2202                value: Some(WithOptionValue::Value(Value::String(hex::encode(
2203                    gen_details.into_proto().encode_to_vec(),
2204                )))),
2205            })
2206        }
2207        PurifiedExportDetails::LoadGenerator { .. } => {
2208            let (desc, output) = match purified_export.details {
2209                PurifiedExportDetails::LoadGenerator { table, output } => (table, output),
2210                _ => bail_internal!("purified export details must be load generator"),
2211            };
2212            // We only determine the table description for multi-output load generator sources here,
2213            // whereas single-output load generators will have their relation description
2214            // determined during statment planning as envelope and format options may affect their
2215            // schema.
2216            if let Some(desc) = desc {
2217                let (gen_columns, gen_constraints) = scx.relation_desc_into_table_defs(&desc)?;
2218                match columns {
2219                    // Purification is what populates the `Defined` variant;
2220                    // reaching this is an internal invariant violation.
2221                    TableFromSourceColumns::Defined(_) => bail_internal!(
2222                        "column definitions cannot be explicitly set for this source type"
2223                    ),
2224                    TableFromSourceColumns::NotSpecified => {
2225                        *columns = TableFromSourceColumns::Defined(gen_columns);
2226                        *constraints = gen_constraints;
2227                    }
2228                    TableFromSourceColumns::Named(_) => {
2229                        sql_bail!("columns cannot be named for multi-output load generator sources")
2230                    }
2231                }
2232            }
2233            let details = SourceExportStatementDetails::LoadGenerator { output };
2234            with_options.push(TableFromSourceOption {
2235                name: TableFromSourceOptionName::Details,
2236                value: Some(WithOptionValue::Value(Value::String(hex::encode(
2237                    details.into_proto().encode_to_vec(),
2238                )))),
2239            })
2240        }
2241        PurifiedExportDetails::Kafka {} => {
2242            // NOTE: Kafka tables have their 'schemas' purified into the statement inside the
2243            // format field, so we don't specify any columns or constraints to be stored
2244            // on the statement here. The RelationDesc will be determined during planning.
2245            let details = SourceExportStatementDetails::Kafka {};
2246            with_options.push(TableFromSourceOption {
2247                name: TableFromSourceOptionName::Details,
2248                value: Some(WithOptionValue::Value(Value::String(hex::encode(
2249                    details.into_proto().encode_to_vec(),
2250                )))),
2251            })
2252        }
2253    };
2254
2255    // TODO: We might as well use the retrieved available references to update the source
2256    // available references table in the catalog, so plumb this through.
2257    // available_source_references: retrieved_source_references.available_source_references(),
2258    Ok(PurifiedStatement::PurifiedCreateTableFromSource { stmt })
2259}
2260
/// Source-specific context needed to purify a source's `FORMAT` clause.
enum SourceFormatOptions {
    /// Any source without Kafka-specific context. Schema-registry-backed
    /// formats and `KEY ... VALUE ...` formats are rejected for it.
    Default,
    /// A Kafka source reading from `topic`. Confluent Schema Registry subjects
    /// are derived from the topic name (`<topic>-key` / `<topic>-value`).
    Kafka { topic: String },
}
2265
2266async fn purify_source_format(
2267    catalog: &dyn SessionCatalog,
2268    format: &mut Option<FormatSpecifier<Aug>>,
2269    options: &SourceFormatOptions,
2270    envelope: &Option<SourceEnvelope>,
2271    storage_configuration: &StorageConfiguration,
2272) -> Result<(), PlanError> {
2273    if matches!(format, Some(FormatSpecifier::KeyValue { .. }))
2274        && !matches!(options, SourceFormatOptions::Kafka { .. })
2275    {
2276        sql_bail!("Kafka sources are the only source type that can provide KEY/VALUE formats")
2277    }
2278
2279    match format.as_mut() {
2280        None => {}
2281        Some(FormatSpecifier::Bare(format)) => {
2282            purify_source_format_single(catalog, format, options, envelope, storage_configuration)
2283                .await?;
2284        }
2285
2286        Some(FormatSpecifier::KeyValue { key, value: val }) => {
2287            purify_source_format_single(catalog, key, options, envelope, storage_configuration)
2288                .await?;
2289            purify_source_format_single(catalog, val, options, envelope, storage_configuration)
2290                .await?;
2291        }
2292    }
2293    Ok(())
2294}
2295
2296async fn purify_source_format_single(
2297    catalog: &dyn SessionCatalog,
2298    format: &mut Format<Aug>,
2299    options: &SourceFormatOptions,
2300    envelope: &Option<SourceEnvelope>,
2301    storage_configuration: &StorageConfiguration,
2302) -> Result<(), PlanError> {
2303    match format {
2304        Format::Avro(schema) => match schema {
2305            AvroSchema::Csr { csr_connection } => {
2306                purify_csr_connection_avro(
2307                    catalog,
2308                    options,
2309                    csr_connection,
2310                    envelope,
2311                    storage_configuration,
2312                )
2313                .await?
2314            }
2315            AvroSchema::InlineSchema { .. } => {}
2316            AvroSchema::Glue {
2317                connection,
2318                with_options,
2319                seed,
2320            } => {
2321                purify_glue_connection_avro(
2322                    catalog,
2323                    options,
2324                    connection,
2325                    with_options,
2326                    seed,
2327                    storage_configuration,
2328                )
2329                .await?
2330            }
2331        },
2332        Format::Protobuf(schema) => match schema {
2333            ProtobufSchema::Csr { csr_connection } => {
2334                purify_csr_connection_proto(
2335                    catalog,
2336                    options,
2337                    csr_connection,
2338                    envelope,
2339                    storage_configuration,
2340                )
2341                .await?;
2342            }
2343            ProtobufSchema::InlineSchema { .. } => {}
2344        },
2345        Format::Bytes
2346        | Format::Regex(_)
2347        | Format::Json { .. }
2348        | Format::Text
2349        | Format::Csv { .. } => (),
2350    }
2351    Ok(())
2352}
2353
2354pub fn generate_subsource_statements(
2355    scx: &StatementContext,
2356    source_name: ResolvedItemName,
2357    subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
2358) -> Result<Vec<CreateSubsourceStatement<Aug>>, PlanError> {
2359    // get the first subsource to determine the connection type
2360    if subsources.is_empty() {
2361        return Ok(vec![]);
2362    }
2363    let (_, purified_export) = subsources
2364        .iter()
2365        .next()
2366        .ok_or_else(|| internal_err!("expected at least one subsource"))?;
2367
2368    let statements = match &purified_export.details {
2369        PurifiedExportDetails::Postgres { .. } => {
2370            crate::pure::postgres::generate_create_subsource_statements(
2371                scx,
2372                source_name,
2373                subsources,
2374            )?
2375        }
2376        PurifiedExportDetails::MySql { .. } => {
2377            crate::pure::mysql::generate_create_subsource_statements(scx, source_name, subsources)?
2378        }
2379        PurifiedExportDetails::SqlServer { .. } => {
2380            crate::pure::sql_server::generate_create_subsource_statements(
2381                scx,
2382                source_name,
2383                subsources,
2384            )?
2385        }
2386        PurifiedExportDetails::LoadGenerator { .. } => {
2387            let mut subsource_stmts = Vec::with_capacity(subsources.len());
2388            for (subsource_name, purified_export) in subsources {
2389                let (desc, output) = match purified_export.details {
2390                    PurifiedExportDetails::LoadGenerator { table, output } => (table, output),
2391                    _ => {
2392                        bail_internal!("purified export details must be load generator")
2393                    }
2394                };
2395                let desc = desc.ok_or_else(|| {
2396                    internal_err!(
2397                        "subsources cannot be generated for single-output load generators"
2398                    )
2399                })?;
2400
2401                let (columns, table_constraints) = scx.relation_desc_into_table_defs(&desc)?;
2402                let details = SourceExportStatementDetails::LoadGenerator { output };
2403                // Create the subsource statement
2404                let subsource = CreateSubsourceStatement {
2405                    name: subsource_name,
2406                    columns,
2407                    of_source: Some(source_name.clone()),
2408                    // unlike sources that come from an external upstream, we
2409                    // have more leniency to introduce different constraints
2410                    // every time the load generator is run; i.e. we are not as
2411                    // worried about introducing junk data.
2412                    constraints: table_constraints,
2413                    if_not_exists: false,
2414                    with_options: vec![
2415                        CreateSubsourceOption {
2416                            name: CreateSubsourceOptionName::ExternalReference,
2417                            value: Some(WithOptionValue::UnresolvedItemName(
2418                                purified_export.external_reference,
2419                            )),
2420                        },
2421                        CreateSubsourceOption {
2422                            name: CreateSubsourceOptionName::Details,
2423                            value: Some(WithOptionValue::Value(Value::String(hex::encode(
2424                                details.into_proto().encode_to_vec(),
2425                            )))),
2426                        },
2427                    ],
2428                };
2429                subsource_stmts.push(subsource);
2430            }
2431
2432            subsource_stmts
2433        }
2434        PurifiedExportDetails::Kafka { .. } => {
2435            // TODO: as part of database-issues#8322, Kafka sources will begin
2436            // producing data––we'll need to understand the schema
2437            // of the output here.
2438            if !subsources.is_empty() {
2439                bail_internal!("Kafka sources do not produce data-bearing subsources");
2440            }
2441            vec![]
2442        }
2443    };
2444    Ok(statements)
2445}
2446
2447async fn purify_csr_connection_proto(
2448    catalog: &dyn SessionCatalog,
2449    options: &SourceFormatOptions,
2450    csr_connection: &mut CsrConnectionProtobuf<Aug>,
2451    envelope: &Option<SourceEnvelope>,
2452    storage_configuration: &StorageConfiguration,
2453) -> Result<(), PlanError> {
2454    let SourceFormatOptions::Kafka { topic } = options else {
2455        sql_bail!("Confluent Schema Registry is only supported with Kafka sources")
2456    };
2457
2458    let CsrConnectionProtobuf {
2459        seed,
2460        connection: CsrConnection {
2461            connection,
2462            options: _,
2463        },
2464    } = csr_connection;
2465    match seed {
2466        None => {
2467            let scx = StatementContext::new(None, &*catalog);
2468
2469            let ccsr_connection = match scx.get_item_by_resolved_name(connection)?.connection()? {
2470                Connection::Csr(connection) => connection.clone().into_inline_connection(catalog),
2471                _ => sql_bail!("{} is not a schema registry connection", connection),
2472            };
2473
2474            let ccsr_client = ccsr_connection
2475                .connect(storage_configuration, InTask::No)
2476                .await
2477                .map_err(|e| CsrPurificationError::ClientError(Arc::new(e)))?;
2478
2479            let value = compile_proto(&format!("{}-value", topic), &ccsr_client).await?;
2480            let key = compile_proto(&format!("{}-key", topic), &ccsr_client)
2481                .await
2482                .ok();
2483
2484            if matches!(envelope, Some(SourceEnvelope::Debezium)) && key.is_none() {
2485                sql_bail!("Key schema is required for ENVELOPE DEBEZIUM");
2486            }
2487
2488            *seed = Some(CsrSeedProtobuf { value, key });
2489        }
2490        Some(_) => (),
2491    }
2492
2493    Ok(())
2494}
2495
2496async fn purify_csr_connection_avro(
2497    catalog: &dyn SessionCatalog,
2498    options: &SourceFormatOptions,
2499    csr_connection: &mut CsrConnectionAvro<Aug>,
2500    envelope: &Option<SourceEnvelope>,
2501    storage_configuration: &StorageConfiguration,
2502) -> Result<(), PlanError> {
2503    let SourceFormatOptions::Kafka { topic } = options else {
2504        sql_bail!("Confluent Schema Registry is only supported with Kafka sources")
2505    };
2506
2507    let CsrConnectionAvro {
2508        connection: CsrConnection { connection, .. },
2509        seed,
2510        key_strategy,
2511        value_strategy,
2512    } = csr_connection;
2513    if seed.is_none() {
2514        let scx = StatementContext::new(None, &*catalog);
2515        let csr_connection = match scx.get_item_by_resolved_name(connection)?.connection()? {
2516            Connection::Csr(connection) => connection.clone().into_inline_connection(catalog),
2517            _ => sql_bail!("{} is not a schema registry connection", connection),
2518        };
2519        let ccsr_client = csr_connection
2520            .connect(storage_configuration, InTask::No)
2521            .await
2522            .map_err(|e| CsrPurificationError::ClientError(Arc::new(e)))?;
2523
2524        let Schema {
2525            key_schema,
2526            value_schema,
2527            key_reference_schemas,
2528            value_reference_schemas,
2529        } = get_remote_csr_schema(
2530            &ccsr_client,
2531            key_strategy.clone().unwrap_or_default(),
2532            value_strategy.clone().unwrap_or_default(),
2533            topic,
2534        )
2535        .await?;
2536        if matches!(envelope, Some(SourceEnvelope::Debezium)) && key_schema.is_none() {
2537            sql_bail!("Key schema is required for ENVELOPE DEBEZIUM");
2538        }
2539
2540        *seed = Some(CsrSeedAvro {
2541            key_schema,
2542            value_schema,
2543            key_reference_schemas,
2544            value_reference_schemas,
2545        })
2546    }
2547
2548    Ok(())
2549}
2550
/// Resolves the Avro value schema for an AWS Glue Schema Registry format on a
/// Kafka source and pins it into `seed` as a [`GlueAvroSeed`].
///
/// The schema is looked up as the latest version of the schema named by the
/// `SCHEMA NAME` option in the connection's registry. The connection type and
/// the presence of `SCHEMA NAME` are always validated, even when `seed` is
/// already set; only the remote lookup is skipped in that case.
///
/// # Errors
///
/// Fails if the source is not Kafka, if `connection` is not a Glue Schema
/// Registry connection, if `SCHEMA NAME` is missing or invalid, if the AWS SDK
/// config cannot be loaded, if the schema lookup fails, if the schema's data
/// format is not Avro, or if the returned version has no definition.
async fn purify_glue_connection_avro(
    catalog: &dyn SessionCatalog,
    options: &SourceFormatOptions,
    connection: &ResolvedItemName,
    with_options: &[GlueAvroOption<Aug>],
    seed: &mut Option<GlueAvroSeed>,
    storage_configuration: &StorageConfiguration,
) -> Result<(), PlanError> {
    use crate::pure::error::GluePurificationError;
    // The topic itself is not needed: the Glue lookup below is keyed by
    // `SCHEMA NAME`, not by a topic-derived subject.
    let SourceFormatOptions::Kafka { .. } = options else {
        sql_bail!("AWS Glue Schema Registry is only supported with Kafka sources")
    };

    let scx = StatementContext::new(None, &*catalog);
    let item = scx.get_item_by_resolved_name(connection)?;
    // Only used to name the item in the `NotGlueConnection` error.
    let full_name = scx.catalog.resolve_full_name(item.name());
    // Structural validation below runs unconditionally — even when a seed is
    // already present. The seed is not proof the rest of the statement is
    // well-formed: the grammar accepts a user-written `SEED VALUE SCHEMA`, so a
    // seeded statement may have arrived straight from a user rather than from
    // our own re-parsed `create_sql`. Only the Glue *fetch* is safe to skip
    // when seeded; the connection-type and `SCHEMA NAME` checks are not.
    let Connection::GlueSchemaRegistry(gsr_connection) = item.connection()? else {
        return Err(GluePurificationError::NotGlueConnection(full_name).into());
    };

    // Pull `SCHEMA NAME` out of the option bag. Required. Use the shared
    // extractor (rather than a hand-rolled match) so non-string values get a
    // proper "invalid value" error instead of being silently dropped.
    let crate::plan::statement::ddl::GlueAvroOptionExtracted { schema_name, .. } =
        with_options.to_vec().try_into()?;
    let schema_name = schema_name.ok_or(GluePurificationError::MissingSchemaName)?;

    if seed.is_some() {
        // The schema is already pinned — either we purified this statement
        // before and re-parsed it from persisted create_sql, or a user supplied
        // the seed directly. Either way the reader schema is fixed, so skip the
        // Glue lookup. (The structural checks above still ran.)
        return Ok(());
    }
    let gsr_connection = gsr_connection.into_inline_connection(catalog);

    // Build the SDK config the same way the storage decoder does at runtime
    // — same auth, region, and endpoint override resolution.
    let enforce_external_addresses = mz_storage_types::dyncfgs::ENFORCE_EXTERNAL_ADDRESSES
        .get(storage_configuration.config_set());
    let sdk_config = gsr_connection
        .aws_connection
        .connection
        .load_sdk_config(
            &storage_configuration.connection_context,
            gsr_connection.aws_connection.connection_id,
            // We are in a normal tokio context during purification.
            InTask::No,
            enforce_external_addresses,
        )
        .await
        .map_err(|e| GluePurificationError::LoadSdkConfigError(Arc::new(e)))?;
    let glue_client = mz_aws_glue_schema_registry::ClientConfig::new(sdk_config).build();

    let version = glue_client
        .get_schema_version_latest_by_name(&gsr_connection.registry_name, &schema_name)
        .await
        .map_err(|e| GluePurificationError::SchemaLookupError {
            registry: gsr_connection.registry_name.clone(),
            schema: schema_name.clone(),
            cause: Arc::new(e),
        })?;
    // The runtime decode path only handles Avro; reject other formats at
    // planning time so the failure mode is a clear SQL error, not a
    // permanent decode error on every record.
    match &version.data_format {
        Some(mz_aws_glue_schema_registry::DataFormat::Avro) => {}
        other => {
            return Err(GluePurificationError::UnsupportedDataFormat {
                registry: gsr_connection.registry_name.clone(),
                schema: schema_name.clone(),
                format: other
                    .as_ref()
                    .map(|f| f.as_str().to_string())
                    .unwrap_or_else(|| "<unspecified>".to_string()),
            }
            .into());
        }
    }
    // A version with no definition would leave nothing to decode with.
    let value_schema =
        version
            .definition
            .ok_or_else(|| GluePurificationError::EmptyDefinition {
                registry: gsr_connection.registry_name.clone(),
                schema: schema_name.clone(),
            })?;

    // Pin the fetched schema so later re-purification skips the lookup.
    *seed = Some(GlueAvroSeed { value_schema });
    Ok(())
}
2647
/// Key and value reader schemas (raw schema text) fetched from a Confluent
/// Schema Registry, as produced by `get_remote_csr_schema`.
#[derive(Debug)]
pub struct Schema {
    /// The key schema, or `None` if no key schema was found.
    pub key_schema: Option<String>,
    /// The value schema.
    pub value_schema: String,
    /// Reference schemas for the key schema, in dependency order.
    pub key_reference_schemas: Vec<String>,
    /// Reference schemas for the value schema, in dependency order.
    pub value_reference_schemas: Vec<String>,
}
2657
/// Result of fetching a schema, including any referenced schemas.
struct SchemaWithReferences {
    /// The primary schema, as raw schema text.
    schema: String,
    /// Reference schemas in dependency order, as raw schema text. Empty when
    /// the schema was supplied inline rather than fetched from the registry.
    references: Vec<String>,
}
2665
2666async fn get_schema_with_strategy(
2667    client: &Client,
2668    strategy: ReaderSchemaSelectionStrategy,
2669    subject: &str,
2670) -> Result<Option<SchemaWithReferences>, PlanError> {
2671    match strategy {
2672        ReaderSchemaSelectionStrategy::Latest => {
2673            // Use get_subject_and_references to also fetch referenced schemas
2674            match client.get_subject_and_references(subject).await {
2675                Ok((primary, dependencies)) => Ok(Some(SchemaWithReferences {
2676                    schema: primary.schema.raw,
2677                    references: dependencies.into_iter().map(|s| s.schema.raw).collect(),
2678                })),
2679                Err(GetBySubjectError::SubjectNotFound)
2680                | Err(GetBySubjectError::VersionNotFound(_)) => Ok(None),
2681                Err(e) => Err(PlanError::FetchingCsrSchemaFailed {
2682                    schema_lookup: format!("subject {}", subject.quoted()),
2683                    cause: Arc::new(e),
2684                }),
2685            }
2686        }
2687        // It's possible that CSR was provided with an inline strategy, but without a subject
2688        // or schema id to look up, there isn't a clean way to lookup references for the schema.
2689        // This could be done with extra work, but at this point, it isn't clear this is needed.
2690        ReaderSchemaSelectionStrategy::Inline(raw) => Ok(Some(SchemaWithReferences {
2691            schema: raw,
2692            references: vec![],
2693        })),
2694        ReaderSchemaSelectionStrategy::ById(id) => {
2695            match client.get_subject_and_references_by_id(id).await {
2696                Ok((primary, dependencies)) => Ok(Some(SchemaWithReferences {
2697                    schema: primary.schema.raw,
2698                    references: dependencies.into_iter().map(|s| s.schema.raw).collect(),
2699                })),
2700                Err(GetBySubjectError::SubjectNotFound)
2701                | Err(GetBySubjectError::VersionNotFound(_)) => Ok(None),
2702                Err(e) => Err(PlanError::FetchingCsrSchemaFailed {
2703                    schema_lookup: format!("subject {}", subject.quoted()),
2704                    cause: Arc::new(e),
2705                }),
2706            }
2707        }
2708    }
2709}
2710
2711async fn get_remote_csr_schema(
2712    ccsr_client: &mz_ccsr::Client,
2713    key_strategy: ReaderSchemaSelectionStrategy,
2714    value_strategy: ReaderSchemaSelectionStrategy,
2715    topic: &str,
2716) -> Result<Schema, PlanError> {
2717    let value_schema_name = format!("{}-value", topic);
2718    let value_result =
2719        get_schema_with_strategy(ccsr_client, value_strategy, &value_schema_name).await?;
2720    let value_result = value_result.ok_or_else(|| anyhow!("No value schema found"))?;
2721
2722    let key_subject = format!("{}-key", topic);
2723    let key_result = get_schema_with_strategy(ccsr_client, key_strategy, &key_subject).await?;
2724    Ok(Schema {
2725        key_schema: key_result.as_ref().map(|r| r.schema.clone()),
2726        value_schema: value_result.schema,
2727        key_reference_schemas: key_result.map(|r| r.references).unwrap_or_default(),
2728        value_reference_schemas: value_result.references,
2729    })
2730}
2731
2732/// Collect protobuf message descriptor from CSR and compile the descriptor.
2733async fn compile_proto(
2734    subject_name: &String,
2735    ccsr_client: &Client,
2736) -> Result<CsrSeedProtobufSchema, PlanError> {
2737    let (primary_subject, dependency_subjects) = ccsr_client
2738        .get_subject_and_references(subject_name)
2739        .await
2740        .map_err(|e| PlanError::FetchingCsrSchemaFailed {
2741            schema_lookup: format!("subject {}", subject_name.quoted()),
2742            cause: Arc::new(e),
2743        })?;
2744
2745    // Compile .proto files into a file descriptor set.
2746    let mut source_tree = VirtualSourceTree::new();
2747
2748    // Add well-known types (e.g., google/protobuf/timestamp.proto) to the source
2749    // tree. These are implicitly available to protoc but are typically not
2750    // registered in the schema registry.
2751    source_tree.as_mut().map_well_known_types();
2752
2753    for subject in iter::once(&primary_subject).chain(dependency_subjects.iter()) {
2754        source_tree.as_mut().add_file(
2755            Path::new(&subject.name),
2756            subject.schema.raw.as_bytes().to_vec(),
2757        );
2758    }
2759    let mut db = SourceTreeDescriptorDatabase::new(source_tree.as_mut());
2760    let fds = db
2761        .as_mut()
2762        .build_file_descriptor_set(&[Path::new(&primary_subject.name)])
2763        .map_err(|cause| PlanError::InvalidProtobufSchema { cause })?;
2764
2765    // Ensure there is exactly one message in the file.
2766    let primary_fd = fds.file(0);
2767    let message_name = match primary_fd.message_type_size() {
2768        1 => String::from_utf8_lossy(primary_fd.message_type(0).name()).into_owned(),
2769        0 => bail_unsupported!(29603, "Protobuf schemas with no messages"),
2770        _ => bail_unsupported!(29603, "Protobuf schemas with multiple messages"),
2771    };
2772
2773    // Encode the file descriptor set into a SQL byte string.
2774    let bytes = &fds
2775        .serialize()
2776        .map_err(|cause| PlanError::InvalidProtobufSchema { cause })?;
2777    let mut schema = String::new();
2778    strconv::format_bytes(&mut schema, bytes);
2779
2780    Ok(CsrSeedProtobufSchema {
2781        schema,
2782        message_name,
2783    })
2784}
2785
// Together these identify the `mz_catalog.mz_now` function, which
// `purify_create_materialized_view_options` resolves so it can insert and
// substitute `mz_now()` calls in `REFRESH` options.
/// Name of the `mz_now()` function.
const MZ_NOW_NAME: &str = "mz_now";
/// Schema that contains the `mz_now()` function.
const MZ_NOW_SCHEMA: &str = "mz_catalog";
2788
2789/// Purifies a CREATE MATERIALIZED VIEW statement. Additionally, it adjusts `resolved_ids` if
2790/// references to ids appear or disappear during the purification.
2791///
2792/// Note that in contrast with [`purify_statement`], this doesn't need to be async, because
2793/// this function is not making any network calls.
2794pub fn purify_create_materialized_view_options(
2795    catalog: impl SessionCatalog,
2796    mz_now: Option<Timestamp>,
2797    cmvs: &mut CreateMaterializedViewStatement<Aug>,
2798    resolved_ids: &mut ResolvedIds,
2799) {
2800    // 0. Preparations:
2801    // Prepare an expression that calls `mz_now()`, which we can insert in various later steps.
2802    let (mz_now_id, mz_now_expr) = {
2803        let item = catalog
2804            .resolve_function(&PartialItemName {
2805                database: None,
2806                schema: Some(MZ_NOW_SCHEMA.to_string()),
2807                item: MZ_NOW_NAME.to_string(),
2808            })
2809            .expect("we should be able to resolve mz_now");
2810        (
2811            item.id(),
2812            Expr::Function(Function {
2813                name: ResolvedItemName::Item {
2814                    id: item.id(),
2815                    qualifiers: item.name().qualifiers.clone(),
2816                    full_name: catalog.resolve_full_name(item.name()),
2817                    print_id: false,
2818                    version: RelationVersionSelector::Latest,
2819                },
2820                args: FunctionArgs::Args {
2821                    args: Vec::new(),
2822                    order_by: Vec::new(),
2823                },
2824                filter: None,
2825                over: None,
2826                distinct: false,
2827            }),
2828        )
2829    };
2830    // Prepare the `mz_timestamp` type.
2831    let (mz_timestamp_id, mz_timestamp_type) = {
2832        let item = catalog.get_system_type("mz_timestamp");
2833        let full_name = catalog.resolve_full_name(item.name());
2834        (
2835            item.id(),
2836            ResolvedDataType::Named {
2837                id: item.id(),
2838                qualifiers: item.name().qualifiers.clone(),
2839                full_name,
2840                modifiers: vec![],
2841                print_id: true,
2842            },
2843        )
2844    };
2845
2846    let mut introduced_mz_timestamp = false;
2847
2848    for option in cmvs.with_options.iter_mut() {
2849        // 1. Purify `REFRESH AT CREATION` to `REFRESH AT mz_now()`.
2850        if matches!(
2851            option.value,
2852            Some(WithOptionValue::Refresh(RefreshOptionValue::AtCreation))
2853        ) {
2854            option.value = Some(WithOptionValue::Refresh(RefreshOptionValue::At(
2855                RefreshAtOptionValue {
2856                    time: mz_now_expr.clone(),
2857                },
2858            )));
2859        }
2860
2861        // 2. If `REFRESH EVERY` doesn't have an `ALIGNED TO`, then add `ALIGNED TO mz_now()`.
2862        if let Some(WithOptionValue::Refresh(RefreshOptionValue::Every(
2863            RefreshEveryOptionValue { aligned_to, .. },
2864        ))) = &mut option.value
2865        {
2866            if aligned_to.is_none() {
2867                *aligned_to = Some(mz_now_expr.clone());
2868            }
2869        }
2870
2871        // 3. Substitute `mz_now()` with the timestamp chosen for the CREATE MATERIALIZED VIEW
2872        // statement. (This has to happen after the above steps, which might introduce `mz_now()`.)
2873        match &mut option.value {
2874            Some(WithOptionValue::Refresh(RefreshOptionValue::At(RefreshAtOptionValue {
2875                time,
2876            }))) => {
2877                let mut visitor = MzNowPurifierVisitor::new(mz_now, mz_timestamp_type.clone());
2878                visitor.visit_expr_mut(time);
2879                introduced_mz_timestamp |= visitor.introduced_mz_timestamp;
2880            }
2881            Some(WithOptionValue::Refresh(RefreshOptionValue::Every(
2882                RefreshEveryOptionValue {
2883                    interval: _,
2884                    aligned_to: Some(aligned_to),
2885                },
2886            ))) => {
2887                let mut visitor = MzNowPurifierVisitor::new(mz_now, mz_timestamp_type.clone());
2888                visitor.visit_expr_mut(aligned_to);
2889                introduced_mz_timestamp |= visitor.introduced_mz_timestamp;
2890            }
2891            _ => {}
2892        }
2893    }
2894
2895    // 4. If the user didn't give any REFRESH option, then default to ON COMMIT.
2896    if !cmvs.with_options.iter().any(|o| {
2897        matches!(
2898            o,
2899            MaterializedViewOption {
2900                value: Some(WithOptionValue::Refresh(..)),
2901                ..
2902            }
2903        )
2904    }) {
2905        cmvs.with_options.push(MaterializedViewOption {
2906            name: MaterializedViewOptionName::Refresh,
2907            value: Some(WithOptionValue::Refresh(RefreshOptionValue::OnCommit)),
2908        })
2909    }
2910
2911    // 5. Attend to `resolved_ids`: The purification might have
2912    // - added references to `mz_timestamp`;
2913    // - removed references to `mz_now`.
2914    if introduced_mz_timestamp {
2915        resolved_ids.add_item(mz_timestamp_id);
2916    }
2917    // Even though we always remove `mz_now()` from the `with_options`, there might be `mz_now()`
2918    // remaining in the main query expression of the MV, so let's visit the entire statement to look
2919    // for `mz_now()` everywhere.
2920    let mut visitor = ExprContainsTemporalVisitor::new();
2921    visitor.visit_create_materialized_view_statement(cmvs);
2922    if !visitor.contains_temporal {
2923        resolved_ids.remove_item(&mz_now_id);
2924    }
2925}
2926
2927/// Returns true if the [MaterializedViewOption] either already involves `mz_now()` or will involve
2928/// after purification.
2929pub fn materialized_view_option_contains_temporal(mvo: &MaterializedViewOption<Aug>) -> bool {
2930    match &mvo.value {
2931        Some(WithOptionValue::Refresh(RefreshOptionValue::At(RefreshAtOptionValue { time }))) => {
2932            let mut visitor = ExprContainsTemporalVisitor::new();
2933            visitor.visit_expr(time);
2934            visitor.contains_temporal
2935        }
2936        Some(WithOptionValue::Refresh(RefreshOptionValue::Every(RefreshEveryOptionValue {
2937            interval: _,
2938            aligned_to: Some(aligned_to),
2939        }))) => {
2940            let mut visitor = ExprContainsTemporalVisitor::new();
2941            visitor.visit_expr(aligned_to);
2942            visitor.contains_temporal
2943        }
2944        Some(WithOptionValue::Refresh(RefreshOptionValue::Every(RefreshEveryOptionValue {
2945            interval: _,
2946            aligned_to: None,
2947        }))) => {
2948            // For a `REFRESH EVERY` without an `ALIGNED TO`, purification will default the
2949            // `ALIGNED TO` to `mz_now()`.
2950            true
2951        }
2952        Some(WithOptionValue::Refresh(RefreshOptionValue::AtCreation)) => {
2953            // `REFRESH AT CREATION` will be purified to `REFRESH AT mz_now()`.
2954            true
2955        }
2956        _ => false,
2957    }
2958}
2959
2960/// Determines whether the AST involves `mz_now()`.
2961struct ExprContainsTemporalVisitor {
2962    pub contains_temporal: bool,
2963}
2964
2965impl ExprContainsTemporalVisitor {
2966    pub fn new() -> ExprContainsTemporalVisitor {
2967        ExprContainsTemporalVisitor {
2968            contains_temporal: false,
2969        }
2970    }
2971}
2972
2973impl Visit<'_, Aug> for ExprContainsTemporalVisitor {
2974    fn visit_function(&mut self, func: &Function<Aug>) {
2975        self.contains_temporal |= func.name.full_item_name().item == MZ_NOW_NAME;
2976        visit_function(self, func);
2977    }
2978}
2979
2980struct MzNowPurifierVisitor {
2981    pub mz_now: Option<Timestamp>,
2982    pub mz_timestamp_type: ResolvedDataType,
2983    pub introduced_mz_timestamp: bool,
2984}
2985
2986impl MzNowPurifierVisitor {
2987    pub fn new(
2988        mz_now: Option<Timestamp>,
2989        mz_timestamp_type: ResolvedDataType,
2990    ) -> MzNowPurifierVisitor {
2991        MzNowPurifierVisitor {
2992            mz_now,
2993            mz_timestamp_type,
2994            introduced_mz_timestamp: false,
2995        }
2996    }
2997}
2998
2999impl VisitMut<'_, Aug> for MzNowPurifierVisitor {
3000    fn visit_expr_mut(&mut self, expr: &'_ mut Expr<Aug>) {
3001        match expr {
3002            Expr::Function(Function {
3003                name:
3004                    ResolvedItemName::Item {
3005                        full_name: FullItemName { item, .. },
3006                        ..
3007                    },
3008                ..
3009            }) if item == &MZ_NOW_NAME.to_string() => {
3010                let mz_now = self.mz_now.expect(
3011                    "we should have chosen a timestamp if the expression contains mz_now()",
3012                );
3013                // We substitute `mz_now()` with number + a cast to `mz_timestamp`. The cast is to
3014                // not alter the type of the expression.
3015                *expr = Expr::Cast {
3016                    expr: Box::new(Expr::Value(Value::Number(mz_now.to_string()))),
3017                    data_type: self.mz_timestamp_type.clone(),
3018                };
3019                self.introduced_mz_timestamp = true;
3020            }
3021            _ => visit_expr_mut(self, expr),
3022        }
3023    }
3024}