
mz_sql/pure.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! SQL purification.
//!
//! See the [crate-level documentation](crate) for details.

use std::collections::{BTreeMap, BTreeSet};
use std::fmt;
use std::iter;
use std::path::Path;
use std::sync::Arc;

use anyhow::anyhow;
use itertools::Itertools;
use mz_adapter_types::dyncfgs::ENABLE_S3_TABLES_REGION_CHECK;
use mz_ccsr::{Client, GetBySubjectError};
use mz_cloud_provider::CloudProvider;
use mz_controller_types::ClusterId;
use mz_kafka_util::client::MzClientContext;
use mz_mysql_util::MySqlTableDesc;
use mz_ore::collections::CollectionExt;
use mz_ore::error::ErrorExt;
use mz_ore::future::InTask;
use mz_ore::iter::IteratorExt;
use mz_ore::soft_panic_or_log;
use mz_ore::str::StrExt;
use mz_postgres_util::desc::PostgresTableDesc;
use mz_proto::RustType;
use mz_repr::{CatalogItemId, RelationDesc, RelationVersionSelector, Timestamp, strconv};
use mz_sql_parser::ast::display::AstDisplay;
use mz_sql_parser::ast::visit::{Visit, visit_function};
use mz_sql_parser::ast::visit_mut::{VisitMut, visit_expr_mut};
use mz_sql_parser::ast::{
    AlterSourceAction, AlterSourceAddSubsourceOptionName, AlterSourceStatement, AvroDocOn,
    ColumnName, CreateMaterializedViewStatement, CreateSinkConnection, CreateSinkOptionName,
    CreateSinkStatement, CreateSourceOptionName, CreateSubsourceOption, CreateSubsourceOptionName,
    CreateTableFromSourceStatement, CsrConfigOption, CsrConfigOptionName, CsrConnection,
    CsrSeedAvro, CsrSeedProtobuf, CsrSeedProtobufSchema, DeferredItemName, DocOnIdentifier,
    DocOnSchema, Expr, Function, FunctionArgs, Ident, KafkaSourceConfigOption,
    KafkaSourceConfigOptionName, LoadGenerator, LoadGeneratorOption, LoadGeneratorOptionName,
    MaterializedViewOption, MaterializedViewOptionName, MySqlConfigOption, MySqlConfigOptionName,
    PgConfigOption, PgConfigOptionName, RawItemName, ReaderSchemaSelectionStrategy,
    RefreshAtOptionValue, RefreshEveryOptionValue, RefreshOptionValue, SourceEnvelope,
    SqlServerConfigOption, SqlServerConfigOptionName, Statement, TableFromSourceColumns,
    TableFromSourceOption, TableFromSourceOptionName, UnresolvedItemName,
};
use mz_sql_server_util::desc::SqlServerTableDesc;
use mz_storage_types::configuration::StorageConfiguration;
use mz_storage_types::connections::Connection;
use mz_storage_types::connections::inline::IntoInlineConnection;
use mz_storage_types::errors::ContextCreationError;
use mz_storage_types::sources::load_generator::LoadGeneratorOutput;
use mz_storage_types::sources::mysql::MySqlSourceDetails;
use mz_storage_types::sources::postgres::PostgresSourcePublicationDetails;
use mz_storage_types::sources::{
    GenericSourceConnection, SourceDesc, SourceExportStatementDetails, SqlServerSourceExtras,
};
use prost::Message;
use protobuf_native::MessageLite;
use protobuf_native::compiler::{SourceTreeDescriptorDatabase, VirtualSourceTree};
use rdkafka::admin::AdminClient;
use references::{RetrievedSourceReferences, SourceReferenceClient};
use uuid::Uuid;

use crate::ast::{
    AlterSourceAddSubsourceOption, AvroSchema, CreateSourceConnection, CreateSourceStatement,
    CreateSubsourceStatement, CsrConnectionAvro, CsrConnectionProtobuf, ExternalReferenceExport,
    ExternalReferences, Format, FormatSpecifier, ProtobufSchema, Value, WithOptionValue,
};
use crate::catalog::{CatalogItemType, SessionCatalog};
use crate::kafka_util::{KafkaSinkConfigOptionExtracted, KafkaSourceConfigOptionExtracted};
use crate::names::{
    Aug, FullItemName, PartialItemName, ResolvedColumnReference, ResolvedDataType, ResolvedIds,
    ResolvedItemName,
};
use crate::plan::error::PlanError;
use crate::plan::statement::ddl::load_generator_ast_to_generator;
use crate::plan::{SourceReferences, StatementContext};
use crate::pure::error::{IcebergSinkPurificationError, SqlServerSourcePurificationError};
use crate::pure::mysql::{ensure_binlog_full_metadata, is_binlog_full_metadata};
use crate::session::vars::ENABLE_SQL_SERVER_SOURCE;
use crate::{kafka_util, normalize};

use self::error::{
    CsrPurificationError, KafkaSinkPurificationError, KafkaSourcePurificationError,
    LoadGeneratorSourcePurificationError, MySqlSourcePurificationError, PgSourcePurificationError,
};

pub(crate) mod error;
mod references;

pub mod mysql;
pub mod postgres;
pub mod sql_server;

pub(crate) struct RequestedSourceExport<T> {
    external_reference: UnresolvedItemName,
    name: UnresolvedItemName,
    meta: T,
}

impl<T: fmt::Debug> fmt::Debug for RequestedSourceExport<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("RequestedSourceExport")
            .field("external_reference", &self.external_reference)
            .field("name", &self.name)
            .field("meta", &self.meta)
            .finish()
    }
}

impl<T> RequestedSourceExport<T> {
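    /// Replaces this export's metadata with `new_meta`, keeping the external
    /// reference and name unchanged.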
    fn change_meta<F>(self, new_meta: F) -> RequestedSourceExport<F> {
        RequestedSourceExport {
            external_reference: self.external_reference,
            name: self.name,
            meta: new_meta,
        }
    }
}

/// Generates a subsource name by prepending the source's schema name, if present.
///
/// For example, if the source is `a.b`, then `a` is prepended to the subsource name
/// so that the subsource is generated in the same schema as the source.
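///
/// A minimal sketch (not compiled; `src_name` is a hypothetical
/// `UnresolvedItemName` referring to `a.b`):
///
/// ```ignore
/// let subsource = source_export_name_gen(&src_name, "t1")?;
/// // `subsource` now names `a.t1`, i.e. it lives in the same schema as the source.
/// ```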
fn source_export_name_gen(
    source_name: &UnresolvedItemName,
    subsource_name: &str,
) -> Result<UnresolvedItemName, PlanError> {
    let mut partial = normalize::unresolved_item_name(source_name.clone())?;
    partial.item = subsource_name.to_string();
    Ok(UnresolvedItemName::from(partial))
}

// TODO(database-issues#8620): Remove once subsources are removed
/// Validates that the requested subsources do not have name conflicts with each
/// other and that the same upstream table is not referenced multiple times.
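///
/// For example (hypothetical names), two requested exports that are both named
/// `a.t1` but reference different upstream tables produce
/// `PlanError::SubsourceNameConflict` here rather than a vaguer error later in
/// the catalog transaction.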
fn validate_source_export_names<T>(
    requested_source_exports: &[RequestedSourceExport<T>],
) -> Result<(), PlanError> {
    // This condition would get caught during the catalog transaction, but produces a
    // vague, non-contextual error. Instead, error here so we can suggest to the user
    // how to fix the problem.
    if let Some(name) = requested_source_exports
        .iter()
        .map(|subsource| &subsource.name)
        .duplicates()
        .next()
        .cloned()
    {
        let mut upstream_references: Vec<_> = requested_source_exports
            .into_iter()
            .filter_map(|subsource| {
                if &subsource.name == &name {
                    Some(subsource.external_reference.clone())
                } else {
                    None
                }
            })
            .collect();

        upstream_references.sort();

        Err(PlanError::SubsourceNameConflict {
            name,
            upstream_references,
        })?;
    }

    // We disallow subsource statements from referencing the same upstream table, but we allow
    // `CREATE TABLE .. FROM SOURCE` statements to do so. Since `CREATE TABLE .. FROM SOURCE`
    // purification will only provide 1 `requested_source_export`, we can leave this here without
    // needing to differentiate between the two types of statements.
    // TODO(roshan): Remove this when auto-generated subsources are deprecated.
    if let Some(name) = requested_source_exports
        .iter()
        .map(|export| &export.external_reference)
        .duplicates()
        .next()
        .cloned()
    {
        let mut target_names: Vec<_> = requested_source_exports
            .into_iter()
            .filter_map(|export| {
                if &export.external_reference == &name {
                    Some(export.name.clone())
                } else {
                    None
                }
            })
            .collect();

        target_names.sort();

        Err(PlanError::SubsourceDuplicateReference { name, target_names })?;
    }

    Ok(())
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PurifiedStatement {
    PurifiedCreateSource {
        // The progress subsource, if we are offloading progress info to a separate relation
        create_progress_subsource_stmt: Option<CreateSubsourceStatement<Aug>>,
        create_source_stmt: CreateSourceStatement<Aug>,
        // Map of subsource names to external details
        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
        /// All the available upstream references that can be added as tables
        /// to this primary source.
        available_source_references: SourceReferences,
    },
    PurifiedAlterSource {
        alter_source_stmt: AlterSourceStatement<Aug>,
    },
    PurifiedAlterSourceAddSubsources {
        // This just saves us an annoying catalog lookup
        source_name: ResolvedItemName,
        /// Options that we will need the values of to update the source's
        /// definition.
        options: Vec<AlterSourceAddSubsourceOption<Aug>>,
        // Map of subsource names to external details
        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
    },
    PurifiedAlterSourceRefreshReferences {
        source_name: ResolvedItemName,
        /// The updated available upstream references for the primary source.
        available_source_references: SourceReferences,
    },
    PurifiedCreateSink(CreateSinkStatement<Aug>),
    PurifiedCreateTableFromSource {
        stmt: CreateTableFromSourceStatement<Aug>,
    },
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PurifiedSourceExport {
    pub external_reference: UnresolvedItemName,
    pub details: PurifiedExportDetails,
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PurifiedExportDetails {
    MySql {
        table: MySqlTableDesc,
        text_columns: Option<Vec<Ident>>,
        exclude_columns: Option<Vec<Ident>>,
        initial_gtid_set: String,
        binlog_full_metadata: bool,
    },
    Postgres {
        table: PostgresTableDesc,
        text_columns: Option<Vec<Ident>>,
        exclude_columns: Option<Vec<Ident>>,
    },
    SqlServer {
        table: SqlServerTableDesc,
        text_columns: Option<Vec<Ident>>,
        excl_columns: Option<Vec<Ident>>,
        capture_instance: Arc<str>,
        initial_lsn: mz_sql_server_util::cdc::Lsn,
    },
    Kafka {},
    LoadGenerator {
        table: Option<RelationDesc>,
        output: LoadGeneratorOutput,
    },
}

/// Purifies a statement, removing any dependencies on external state.
///
/// See the section on [purification](crate#purification) in the crate
/// documentation for details.
///
/// Note that this doesn't handle CREATE MATERIALIZED VIEW, which is
/// handled by [purify_create_materialized_view_options] instead.
/// This could be made more consistent by a refactoring discussed here:
/// <https://github.com/MaterializeInc/materialize/pull/23870#discussion_r1435922709>
pub async fn purify_statement(
    catalog: impl SessionCatalog,
    now: u64,
    stmt: Statement<Aug>,
    storage_configuration: &StorageConfiguration,
) -> (Result<PurifiedStatement, PlanError>, Option<ClusterId>) {
    match stmt {
        Statement::CreateSource(stmt) => {
            let cluster_id = stmt.in_cluster.as_ref().map(|cluster| cluster.id.clone());
            (
                purify_create_source(catalog, now, stmt, storage_configuration).await,
                cluster_id,
            )
        }
        Statement::AlterSource(stmt) => (
            purify_alter_source(catalog, stmt, storage_configuration).await,
            None,
        ),
        Statement::CreateSink(stmt) => {
            let cluster_id = stmt.in_cluster.as_ref().map(|cluster| cluster.id.clone());
            (
                purify_create_sink(catalog, stmt, storage_configuration).await,
                cluster_id,
            )
        }
        Statement::CreateTableFromSource(stmt) => (
            purify_create_table_from_source(catalog, stmt, storage_configuration).await,
            None,
        ),
        o => (
            Err(internal_err!(
                "unexpected statement type in purification: {:?}",
                o
            )),
            None,
        ),
    }
}

/// Injects `DOC ON` options into all Avro formats that use a schema
/// registry by finding all SQL `COMMENT`s that are attached to the sink's
/// underlying materialized view (as well as any types referenced by that
/// underlying materialized view).
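///
/// For example (hypothetical object names), a catalog comment such as
/// `COMMENT ON MATERIALIZED VIEW v IS 'docs'` is surfaced as roughly a
/// `DOC ON TYPE v = 'docs'` option on the sink's schema-registry Avro format,
/// unless the user already supplied their own `DOC ON` option for `v`.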
pub(crate) fn purify_create_sink_avro_doc_on_options(
    catalog: &dyn SessionCatalog,
    from_id: CatalogItemId,
    format: &mut Option<FormatSpecifier<Aug>>,
) -> Result<(), PlanError> {
    // Collect all objects referenced by the sink.
    let from = catalog.get_item(&from_id);
    let object_ids = from
        .references()
        .items()
        .copied()
        .chain_one(from.id())
        .collect::<Vec<_>>();

    // Collect all Avro formats that use a schema registry, as well as a set of
    // all identifiers named in user-provided `DOC ON` options.
    let mut avro_format_options = vec![];
    for_each_format(format, |doc_on_schema, fmt| match fmt {
        Format::Avro(AvroSchema::InlineSchema { .. })
        | Format::Bytes
        | Format::Csv { .. }
        | Format::Json { .. }
        | Format::Protobuf(..)
        | Format::Regex(..)
        | Format::Text => (),
        Format::Avro(AvroSchema::Csr {
            csr_connection: CsrConnectionAvro { connection, .. },
        }) => {
            avro_format_options.push((doc_on_schema, &mut connection.options));
        }
    });

    // For each Avro format in the sink, inject the appropriate `DOC ON` options
    // for each item referenced by the sink.
    for (for_schema, options) in avro_format_options {
        let user_provided_comments = options
            .iter()
            .filter_map(|CsrConfigOption { name, .. }| match name {
                CsrConfigOptionName::AvroDocOn(doc_on) => Some(doc_on.clone()),
                _ => None,
            })
            .collect::<BTreeSet<_>>();

        // Add existing comments if the user has not already provided them.
        for object_id in &object_ids {
            // Always add comments to the latest version of the item.
            let item = catalog
                .get_item(object_id)
                .at_version(RelationVersionSelector::Latest);
            let full_resolved_name = ResolvedItemName::Item {
                id: *object_id,
                qualifiers: item.name().qualifiers.clone(),
                full_name: catalog.resolve_full_name(item.name()),
                print_id: !matches!(
                    item.item_type(),
                    CatalogItemType::Func | CatalogItemType::Type
                ),
                version: RelationVersionSelector::Latest,
            };

            if let Some(comments_map) = catalog.get_item_comments(object_id) {
                // Attach comment for the item itself, if the user has not
                // already provided an overriding `DOC ON` option for the item.
                let doc_on_item_key = AvroDocOn {
                    identifier: DocOnIdentifier::Type(full_resolved_name.clone()),
                    for_schema,
                };
                if !user_provided_comments.contains(&doc_on_item_key) {
                    if let Some(root_comment) = comments_map.get(&None) {
                        options.push(CsrConfigOption {
                            name: CsrConfigOptionName::AvroDocOn(doc_on_item_key),
                            value: Some(mz_sql_parser::ast::WithOptionValue::Value(Value::String(
                                root_comment.clone(),
                            ))),
                        });
                    }
                }

                // Attach comment for each column in the item, if the user has
                // not already provided an overriding `DOC ON` option for the
                // column.
                let column_descs = match item.type_details() {
                    Some(details) => details.typ.desc(catalog).unwrap_or_default(),
                    None => item.relation_desc().map(|d| d.into_owned()),
                };

                if let Some(desc) = column_descs {
                    for (pos, column_name) in desc.iter_names().enumerate() {
                        if let Some(comment_str) = comments_map.get(&Some(pos + 1)) {
                            let doc_on_column_key = AvroDocOn {
                                identifier: DocOnIdentifier::Column(ColumnName {
                                    relation: full_resolved_name.clone(),
                                    column: ResolvedColumnReference::Column {
                                        name: column_name.to_owned(),
                                        index: pos,
                                    },
                                }),
                                for_schema,
                            };
                            if !user_provided_comments.contains(&doc_on_column_key) {
                                options.push(CsrConfigOption {
                                    name: CsrConfigOptionName::AvroDocOn(doc_on_column_key),
                                    value: Some(mz_sql_parser::ast::WithOptionValue::Value(
                                        Value::String(comment_str.clone()),
                                    )),
                                });
                            }
                        }
                    }
                }
            }
        }
    }

    Ok(())
}

/// Checks that the sink described in the statement can connect to its external
/// resources.
async fn purify_create_sink(
    catalog: impl SessionCatalog,
    mut create_sink_stmt: CreateSinkStatement<Aug>,
    storage_configuration: &StorageConfiguration,
) -> Result<PurifiedStatement, PlanError> {
    // General purification
    let CreateSinkStatement {
        connection,
        format,
        with_options,
        name: _,
        in_cluster: _,
        if_not_exists: _,
        from,
        envelope: _,
        mode: _,
    } = &mut create_sink_stmt;

    // The list of options that the user is allowed to specify.
    const USER_ALLOWED_WITH_OPTIONS: &[CreateSinkOptionName] = &[
        CreateSinkOptionName::Snapshot,
        CreateSinkOptionName::CommitInterval,
    ];

    if let Some(op) = with_options
        .iter()
        .find(|op| !USER_ALLOWED_WITH_OPTIONS.contains(&op.name))
    {
        sql_bail!(
            "CREATE SINK...WITH ({}..) is not allowed",
            op.name.to_ast_string_simple(),
        )
    }

    match &connection {
        CreateSinkConnection::Kafka {
            connection,
            options,
            key: _,
            headers: _,
        } => {
            // We must not leave any state behind in the Kafka broker, so just ensure that
            // we can connect. This means we don't ensure that we can create the topic and
            // introduces TOCTOU errors, but creating an inoperable sink is infinitely
            // preferable to leaking state in users' environments.
            let scx = StatementContext::new(None, &catalog);
            let connection = {
                let item = scx.get_item_by_resolved_name(connection)?;
                // Get Kafka connection
                match item.connection()? {
                    Connection::Kafka(connection) => {
                        connection.clone().into_inline_connection(scx.catalog)
                    }
                    _ => sql_bail!(
                        "{} is not a kafka connection",
                        scx.catalog.resolve_full_name(item.name())
                    ),
                }
            };

            let extracted_options: KafkaSinkConfigOptionExtracted = options.clone().try_into()?;

            if extracted_options.legacy_ids == Some(true) {
                sql_bail!("LEGACY IDs option is not supported");
            }

            let client: AdminClient<_> = connection
                .create_with_context(
                    storage_configuration,
                    MzClientContext::default(),
                    &BTreeMap::new(),
                    InTask::No,
                )
                .await
                .map_err(|e| {
                    // anyhow doesn't support Clone, so not trivial to move into PlanError
                    KafkaSinkPurificationError::AdminClientError(Arc::new(e))
                })?;

            let metadata = client
                .inner()
                .fetch_metadata(
                    None,
                    storage_configuration
                        .parameters
                        .kafka_timeout_config
                        .fetch_metadata_timeout,
                )
                .map_err(|e| {
                    KafkaSinkPurificationError::AdminClientError(Arc::new(
                        ContextCreationError::KafkaError(e),
                    ))
                })?;

            if metadata.brokers().len() == 0 {
                Err(KafkaSinkPurificationError::ZeroBrokers)?;
            }
        }
        CreateSinkConnection::Iceberg {
            connection,
            aws_connection,
            ..
        } => {
            let scx = StatementContext::new(None, &catalog);
            let connection = {
                let item = scx.get_item_by_resolved_name(connection)?;
                // Get Iceberg connection
                match item.connection()? {
                    Connection::IcebergCatalog(connection) => {
                        connection.clone().into_inline_connection(scx.catalog)
                    }
                    _ => sql_bail!(
                        "{} is not an iceberg connection",
                        scx.catalog.resolve_full_name(item.name())
                    ),
                }
            };

            let aws_conn_id = aws_connection.item_id();

            let aws_connection = {
                let item = scx.get_item_by_resolved_name(aws_connection)?;
                // Get AWS connection
                match item.connection()? {
                    Connection::Aws(aws_connection) => aws_connection.clone(),
                    _ => sql_bail!(
                        "{} is not an aws connection",
                        scx.catalog.resolve_full_name(item.name())
                    ),
                }
            };

            // For S3 Tables connections in the Materialize Cloud product, verify the
            // AWS region matches the environment's region. This check only applies when
            // the enable_s3_tables_region_check dyncfg is set.
            if let Some(s3tables) = connection.s3tables_catalog() {
                let enable_region_check =
                    ENABLE_S3_TABLES_REGION_CHECK.get(scx.catalog.system_vars().dyncfgs());
                if enable_region_check {
                    let env_id = &catalog.config().environment_id;
                    if matches!(env_id.cloud_provider(), CloudProvider::Aws) {
                        let env_region = env_id.cloud_provider_region();
                        // Later on we default to "us-east-1" if the region is not set on the S3 Tables
                        // connection, so we need to do the same check here.
                        let s3_tables_region = s3tables
                            .aws_connection
                            .connection
                            .region
                            .clone()
                            .unwrap_or_else(|| "us-east-1".to_string());
                        if s3_tables_region != env_region {
                            Err(IcebergSinkPurificationError::S3TablesRegionMismatch {
                                s3_tables_region,
                                environment_region: env_region.to_string(),
                            })?;
                        }
                    }
                }
            }

            let _catalog = connection
                .connect(storage_configuration, InTask::No)
                .await
                .map_err(|e| IcebergSinkPurificationError::CatalogError(Arc::new(e)))?;

            let _sdk_config = aws_connection
                .load_sdk_config(
                    &storage_configuration.connection_context,
                    aws_conn_id.clone(),
                    InTask::No,
                    mz_storage_types::dyncfgs::ENFORCE_EXTERNAL_ADDRESSES
                        .get(storage_configuration.config_set()),
                )
                .await
                .map_err(|e| IcebergSinkPurificationError::AwsSdkContextError(Arc::new(e)))?;
        }
    }

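    // Validate that we can reach the Confluent Schema Registry for every
    // CSR-backed format used by the sink before declaring the sink purified.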
    let mut csr_connection_ids = BTreeSet::new();
    for_each_format(format, |_, fmt| match fmt {
        Format::Avro(AvroSchema::InlineSchema { .. })
        | Format::Bytes
        | Format::Csv { .. }
        | Format::Json { .. }
        | Format::Protobuf(ProtobufSchema::InlineSchema { .. })
        | Format::Regex(..)
        | Format::Text => (),
        Format::Avro(AvroSchema::Csr {
            csr_connection: CsrConnectionAvro { connection, .. },
        })
        | Format::Protobuf(ProtobufSchema::Csr {
            csr_connection: CsrConnectionProtobuf { connection, .. },
        }) => {
            csr_connection_ids.insert(*connection.connection.item_id());
        }
    });

    let scx = StatementContext::new(None, &catalog);
    for csr_connection_id in csr_connection_ids {
        let connection = {
            let item = scx.get_item(&csr_connection_id);
            // Get CSR connection
            match item.connection()? {
                Connection::Csr(connection) => connection.clone().into_inline_connection(&catalog),
                _ => Err(CsrPurificationError::NotCsrConnection(
                    scx.catalog.resolve_full_name(item.name()),
                ))?,
            }
        };

        let client = connection
            .connect(storage_configuration, InTask::No)
            .await
            .map_err(|e| CsrPurificationError::ClientError(Arc::new(e)))?;

        client
            .list_subjects()
            .await
            .map_err(|e| CsrPurificationError::ListSubjectsError(Arc::new(e)))?;
    }

    purify_create_sink_avro_doc_on_options(&catalog, *from.item_id(), format)?;

    Ok(PurifiedStatement::PurifiedCreateSink(create_sink_stmt))
}

/// Runs a function on each format within the format specifier.
///
/// The function is provided with a `DocOnSchema` that indicates whether the
/// format is for the key, value, or both.
///
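/// A minimal sketch (not compiled; `format` is assumed to be an
/// `Option<FormatSpecifier<Aug>>`) that counts how many formats use the
/// Confluent Schema Registry:
///
/// ```ignore
/// let mut csr_formats = 0;
/// for_each_format(&mut format, |_schema, fmt| {
///     if matches!(fmt, Format::Avro(AvroSchema::Csr { .. })) {
///         csr_formats += 1;
///     }
/// });
/// ```
///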
// TODO(benesch): rename `DocOnSchema` to the more general `FormatRestriction`.
fn for_each_format<'a, F>(format: &'a mut Option<FormatSpecifier<Aug>>, mut f: F)
where
    F: FnMut(DocOnSchema, &'a mut Format<Aug>),
{
    match format {
        None => (),
        Some(FormatSpecifier::Bare(fmt)) => f(DocOnSchema::All, fmt),
        Some(FormatSpecifier::KeyValue { key, value }) => {
            f(DocOnSchema::KeyOnly, key);
            f(DocOnSchema::ValueOnly, value);
        }
    }
}

/// Defines whether purification should enforce that at least one valid source
/// reference is provided on the statement.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub(crate) enum SourceReferencePolicy {
    /// Don't allow source references to be provided. This is used for
    /// enforcing that `CREATE SOURCE` statements don't create subsources.
    NotAllowed,
    /// Allow empty references, such as when creating a source that
    /// will have tables added afterwards.
    Optional,
    /// Require that at least one reference is resolved to an upstream
    /// object.
    Required,
}

async fn purify_create_source(
    catalog: impl SessionCatalog,
    now: u64,
    mut create_source_stmt: CreateSourceStatement<Aug>,
    storage_configuration: &StorageConfiguration,
) -> Result<PurifiedStatement, PlanError> {
    let CreateSourceStatement {
        name: source_name,
        col_names,
        key_constraint,
        connection: source_connection,
        format,
        envelope,
        include_metadata,
        external_references,
        progress_subsource,
        with_options,
        ..
    } = &mut create_source_stmt;

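    // Any option from the pre-`CREATE TABLE .. FROM SOURCE` syntax marks this
    // statement as using the old syntax, which determines below whether a
    // progress subsource is generated.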
    let uses_old_syntax = !col_names.is_empty()
        || key_constraint.is_some()
        || format.is_some()
        || envelope.is_some()
        || !include_metadata.is_empty()
        || external_references.is_some()
        || progress_subsource.is_some();

    if let Some(DeferredItemName::Named(_)) = progress_subsource {
        sql_bail!("Cannot manually ID qualify progress subsource")
    }

    let mut requested_subsource_map = BTreeMap::new();

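    // Pick the progress relation schema for this connection type; it is used
    // below when generating the progress subsource for the old syntax.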
    let progress_desc = match &source_connection {
        CreateSourceConnection::Kafka { .. } => {
            &mz_storage_types::sources::kafka::KAFKA_PROGRESS_DESC
        }
        CreateSourceConnection::Postgres { .. } => {
            &mz_storage_types::sources::postgres::PG_PROGRESS_DESC
        }
        CreateSourceConnection::SqlServer { .. } => {
            &mz_storage_types::sources::sql_server::SQL_SERVER_PROGRESS_DESC
        }
        CreateSourceConnection::MySql { .. } => {
            &mz_storage_types::sources::mysql::MYSQL_PROGRESS_DESC
        }
        CreateSourceConnection::LoadGenerator { .. } => {
            &mz_storage_types::sources::load_generator::LOAD_GEN_PROGRESS_DESC
        }
    };
    let scx = StatementContext::new(None, &catalog);

    // Depending on whether the user must or may use the `CREATE TABLE .. FROM SOURCE`
    // statement to add tables after this source is created, we might need to enforce
    // that auto-generated subsources are or are not created by this source statement.
    let reference_policy = if scx.catalog.system_vars().enable_create_table_from_source()
        && scx.catalog.system_vars().force_source_table_syntax()
    {
        SourceReferencePolicy::NotAllowed
    } else if scx.catalog.system_vars().enable_create_table_from_source() {
        SourceReferencePolicy::Optional
    } else {
        SourceReferencePolicy::Required
    };

    let mut format_options = SourceFormatOptions::Default;

    let retrieved_source_references: RetrievedSourceReferences;

    match source_connection {
        CreateSourceConnection::Kafka {
            connection,
            options: base_with_options,
            ..
        } => {
            if let Some(external_references) = external_references {
                Err(KafkaSourcePurificationError::ReferencedSubsources(
                    external_references.clone(),
                ))?;
            }

            let connection = {
                let item = scx.get_item_by_resolved_name(connection)?;
                // Get Kafka connection
                match item.connection()? {
                    Connection::Kafka(connection) => {
                        connection.clone().into_inline_connection(&catalog)
                    }
                    _ => Err(KafkaSourcePurificationError::NotKafkaConnection(
                        scx.catalog.resolve_full_name(item.name()),
                    ))?,
                }
            };

            let extracted_options: KafkaSourceConfigOptionExtracted =
                base_with_options.clone().try_into()?;

            let topic = extracted_options
                .topic
                .ok_or(KafkaSourcePurificationError::ConnectionMissingTopic)?;

            let consumer = connection
                .create_with_context(
                    storage_configuration,
                    MzClientContext::default(),
                    &BTreeMap::new(),
                    InTask::No,
                )
                .await
                .map_err(|e| {
                    // anyhow doesn't support Clone, so not trivial to move into PlanError
                    KafkaSourcePurificationError::KafkaConsumerError(
                        e.display_with_causes().to_string(),
                    )
                })?;
            let consumer = Arc::new(consumer);

            match (
                extracted_options.start_offset,
                extracted_options.start_timestamp,
            ) {
                (None, None) => {
                    // Validate that the topic at least exists.
                    kafka_util::ensure_topic_exists(
                        Arc::clone(&consumer),
                        &topic,
                        storage_configuration
                            .parameters
                            .kafka_timeout_config
                            .fetch_metadata_timeout,
                    )
                    .await?;
                }
                (Some(_), Some(_)) => {
                    sql_bail!("cannot specify START TIMESTAMP and START OFFSET at same time")
                }
                (Some(start_offsets), None) => {
                    // Validate the start offsets.
                    kafka_util::validate_start_offsets(
                        Arc::clone(&consumer),
                        &topic,
                        start_offsets,
                        storage_configuration
                            .parameters
                            .kafka_timeout_config
                            .fetch_metadata_timeout,
                    )
                    .await?;
                }
                (None, Some(time_offset)) => {
                    // Translate `START TIMESTAMP` to a start offset.
                    let start_offsets = kafka_util::lookup_start_offsets(
                        Arc::clone(&consumer),
                        &topic,
                        time_offset,
                        now,
                        storage_configuration
                            .parameters
                            .kafka_timeout_config
                            .fetch_metadata_timeout,
                    )
                    .await?;

                    base_with_options.retain(|val| {
                        !matches!(val.name, KafkaSourceConfigOptionName::StartTimestamp)
                    });
                    base_with_options.push(KafkaSourceConfigOption {
                        name: KafkaSourceConfigOptionName::StartOffset,
                        value: Some(WithOptionValue::Sequence(
                            start_offsets
                                .iter()
                                .map(|offset| {
                                    WithOptionValue::Value(Value::Number(offset.to_string()))
                                })
                                .collect(),
                        )),
                    });
                }
            }

            let reference_client = SourceReferenceClient::Kafka { topic: &topic };
            retrieved_source_references = reference_client.get_source_references().await?;

            format_options = SourceFormatOptions::Kafka { topic };
        }
        CreateSourceConnection::Postgres {
            connection,
            options,
        } => {
            let connection_item = scx.get_item_by_resolved_name(connection)?;
            let connection = match connection_item.connection().map_err(PlanError::from)? {
                Connection::Postgres(connection) => {
                    connection.clone().into_inline_connection(&catalog)
                }
                _ => Err(PgSourcePurificationError::NotPgConnection(
                    scx.catalog.resolve_full_name(connection_item.name()),
                ))?,
            };
            let crate::plan::statement::PgConfigOptionExtracted {
                publication,
                text_columns,
                exclude_columns,
                details,
                ..
            } = options.clone().try_into()?;
            let publication =
                publication.ok_or(PgSourcePurificationError::ConnectionMissingPublication)?;

            if details.is_some() {
                Err(PgSourcePurificationError::UserSpecifiedDetails)?;
            }

            let client = connection
                .validate(connection_item.id(), storage_configuration)
                .await?;

            let reference_client = SourceReferenceClient::Postgres {
                client: &client,
                publication: &publication,
                database: &connection.database,
            };
            retrieved_source_references = reference_client.get_source_references().await?;

            let postgres::PurifiedSourceExports {
                source_exports: subsources,
                normalized_text_columns,
            } = postgres::purify_source_exports(
                &client,
                &retrieved_source_references,
                external_references,
                text_columns,
                exclude_columns,
                source_name,
                &reference_policy,
            )
            .await?;

            if let Some(text_cols_option) = options
                .iter_mut()
                .find(|option| option.name == PgConfigOptionName::TextColumns)
            {
                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
            }

            requested_subsource_map.extend(subsources);

            // Record the active replication timeline_id to allow detection of a future upstream
            // point-in-time-recovery that will put the source into an error state.
            let timeline_id = mz_postgres_util::get_timeline_id(&client).await?;

            // Remove any old detail references
            options.retain(|PgConfigOption { name, .. }| name != &PgConfigOptionName::Details);
            let details = PostgresSourcePublicationDetails {
                slot: format!(
                    "materialize_{}",
                    Uuid::new_v4().to_string().replace('-', "")
                ),
                timeline_id: Some(timeline_id),
                database: connection.database,
            };
            options.push(PgConfigOption {
                name: PgConfigOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    details.into_proto().encode_to_vec(),
                )))),
            })
        }
        CreateSourceConnection::SqlServer {
            connection,
            options,
        } => {
            scx.require_feature_flag(&ENABLE_SQL_SERVER_SOURCE)?;

            // Connect to the upstream SQL Server instance so we can validate
            // we're compatible with CDC.
            let connection_item = scx.get_item_by_resolved_name(connection)?;
            let connection = match connection_item.connection()? {
                Connection::SqlServer(connection) => {
                    connection.clone().into_inline_connection(&catalog)
                }
                _ => Err(SqlServerSourcePurificationError::NotSqlServerConnection(
                    scx.catalog.resolve_full_name(connection_item.name()),
                ))?,
            };
            let crate::plan::statement::ddl::SqlServerConfigOptionExtracted {
                details,
                text_columns,
                exclude_columns,
                seen: _,
            } = options.clone().try_into()?;

            if details.is_some() {
                Err(SqlServerSourcePurificationError::UserSpecifiedDetails)?;
            }

            let mut client = connection
                .validate(connection_item.id(), storage_configuration)
                .await?;

            let database: Arc<str> = connection.database.into();
            let reference_client = SourceReferenceClient::SqlServer {
                client: &mut client,
                database: Arc::clone(&database),
            };
            retrieved_source_references = reference_client.get_source_references().await?;
            tracing::debug!(?retrieved_source_references, "got source references");

            let timeout = mz_storage_types::sources::sql_server::MAX_LSN_WAIT
                .get(storage_configuration.config_set());

            let purified_source_exports = sql_server::purify_source_exports(
                &*database,
                &mut client,
                &retrieved_source_references,
                external_references,
                &text_columns,
                &exclude_columns,
                source_name,
                timeout,
                &reference_policy,
            )
            .await?;

            let sql_server::PurifiedSourceExports {
                source_exports,
                normalized_text_columns,
                normalized_excl_columns,
            } = purified_source_exports;

            // Update our set of requested source exports.
            requested_subsource_map.extend(source_exports);

            // Record the most recent restore_history_id, or none if the system has never been
            // restored.
            let restore_history_id =
                mz_sql_server_util::inspect::get_latest_restore_history_id(&mut client).await?;
            let details = SqlServerSourceExtras { restore_history_id };

            options.retain(|SqlServerConfigOption { name, .. }| {
                name != &SqlServerConfigOptionName::Details
            });
            options.push(SqlServerConfigOption {
                name: SqlServerConfigOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    details.into_proto().encode_to_vec(),
                )))),
            });

            // Update our 'TEXT' and 'EXCLUDE' column options with the purified and normalized set.
            if let Some(text_cols_option) = options
                .iter_mut()
                .find(|option| option.name == SqlServerConfigOptionName::TextColumns)
            {
                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
            }
            if let Some(excl_cols_option) = options
                .iter_mut()
                .find(|option| option.name == SqlServerConfigOptionName::ExcludeColumns)
            {
                excl_cols_option.value = Some(WithOptionValue::Sequence(normalized_excl_columns));
            }
        }
        CreateSourceConnection::MySql {
            connection,
            options,
        } => {
            let connection_item = scx.get_item_by_resolved_name(connection)?;
            let connection = match connection_item.connection()? {
                Connection::MySql(connection) => {
                    connection.clone().into_inline_connection(&catalog)
                }
                _ => Err(MySqlSourcePurificationError::NotMySqlConnection(
                    scx.catalog.resolve_full_name(connection_item.name()),
                ))?,
            };
            let crate::plan::statement::ddl::MySqlConfigOptionExtracted {
                details,
                text_columns,
                exclude_columns,
                seen: _,
            } = options.clone().try_into()?;

            if details.is_some() {
                Err(MySqlSourcePurificationError::UserSpecifiedDetails)?;
            }

            let mut conn = connection
                .validate(connection_item.id(), storage_configuration)
                .await
                .map_err(MySqlSourcePurificationError::InvalidConnection)?;

            // Retrieve the current @gtid_executed value of the server to mark as the effective
            // initial snapshot point such that we can ensure consistency if the initial source
            // snapshot is broken up over multiple points in time.
            let initial_gtid_set =
                mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?;

            let binlog_full_metadata = is_binlog_full_metadata(&mut conn).await?;

            let reference_client = SourceReferenceClient::MySql {
                conn: &mut conn,
                include_system_schemas: mysql::references_system_schemas(external_references),
            };
            retrieved_source_references = reference_client.get_source_references().await?;

            let mysql::PurifiedSourceExports {
                source_exports: subsources,
                normalized_text_columns,
                normalized_exclude_columns,
            } = mysql::purify_source_exports(
                &mut conn,
                &retrieved_source_references,
                external_references,
                text_columns,
                exclude_columns,
                source_name,
                initial_gtid_set.clone(),
                &reference_policy,
                binlog_full_metadata,
            )
            .await?;
            requested_subsource_map.extend(subsources);

            // We don't have any fields in this details struct, but keep it around for
            // conformity with Postgres and in case we end up needing it again in the future.
            let details = MySqlSourceDetails {};
            // Update options with the purified details
            options
                .retain(|MySqlConfigOption { name, .. }| name != &MySqlConfigOptionName::Details);
            options.push(MySqlConfigOption {
                name: MySqlConfigOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    details.into_proto().encode_to_vec(),
                )))),
            });

            if let Some(text_cols_option) = options
                .iter_mut()
                .find(|option| option.name == MySqlConfigOptionName::TextColumns)
            {
                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
            }
            if let Some(exclude_cols_option) = options
                .iter_mut()
                .find(|option| option.name == MySqlConfigOptionName::ExcludeColumns)
            {
                exclude_cols_option.value =
                    Some(WithOptionValue::Sequence(normalized_exclude_columns));
            }
        }
        CreateSourceConnection::LoadGenerator { generator, options } => {
            let load_generator =
                load_generator_ast_to_generator(&scx, generator, options, include_metadata)?;

            let reference_client = SourceReferenceClient::LoadGenerator {
                generator: &load_generator,
            };
            retrieved_source_references = reference_client.get_source_references().await?;
            // Filter to the references that need to be created as 'subsources', which
            // doesn't include the default output for single-output sources.
            // TODO(database-issues#8620): Remove once subsources are removed
            let multi_output_sources =
                retrieved_source_references
                    .all_references()
                    .iter()
                    .any(|r| {
                        matches!(
                            r.load_generator_output(),
                            Some(output) if output != &LoadGeneratorOutput::Default
                        )
                    });

            match external_references {
                Some(requested)
                    if matches!(reference_policy, SourceReferencePolicy::NotAllowed) =>
                {
                    Err(PlanError::UseTablesForSources(requested.to_string()))?
                }
                Some(requested) if !multi_output_sources => match requested {
                    ExternalReferences::SubsetTables(_) => {
                        Err(LoadGeneratorSourcePurificationError::ForTables)?
                    }
                    ExternalReferences::SubsetSchemas(_) => {
                        Err(LoadGeneratorSourcePurificationError::ForSchemas)?
                    }
                    ExternalReferences::All => {
                        Err(LoadGeneratorSourcePurificationError::ForAllTables)?
                    }
                },
                Some(requested) => {
                    let requested_exports = retrieved_source_references
                        .requested_source_exports(Some(requested), source_name)?;
                    for export in requested_exports {
                        requested_subsource_map.insert(
                            export.name,
                            PurifiedSourceExport {
                                external_reference: export.external_reference,
                                details: PurifiedExportDetails::LoadGenerator {
                                    table: export
                                        .meta
                                        .load_generator_desc()
                                        .ok_or_else(|| {
                                            internal_err!(
                                                "expected load generator source reference"
                                            )
                                        })?
                                        .clone(),
                                    output: export
                                        .meta
                                        .load_generator_output()
                                        .ok_or_else(|| {
                                            internal_err!(
                                                "expected load generator source reference"
                                            )
                                        })?
                                        .clone(),
                                },
                            },
                        );
                    }
                }
                None => {
                    if multi_output_sources
                        && matches!(reference_policy, SourceReferencePolicy::Required)
                    {
                        Err(LoadGeneratorSourcePurificationError::MultiOutputRequiresForAllTables)?
                    }
                }
            }

            if let LoadGenerator::Clock = generator {
                if !options
                    .iter()
                    .any(|p| p.name == LoadGeneratorOptionName::AsOf)
                {
                    let now = catalog.now();
                    options.push(LoadGeneratorOption {
                        name: LoadGeneratorOptionName::AsOf,
                        value: Some(WithOptionValue::Value(Value::Number(now.to_string()))),
                    });
                }
            }
        }
    }
1255
1256    // Now that we know which subsources to create alongside this
1257    // statement, remove the references so it is not canonicalized as
1258    // part of the `CREATE SOURCE` statement in the catalog.
1259    *external_references = None;
1260
1261    // Generate progress subsource for old syntax
1262    let create_progress_subsource_stmt = if uses_old_syntax {
1263        // Take name from input or generate name
1264        let name = match progress_subsource {
1265            Some(name) => match name {
1266                DeferredItemName::Deferred(name) => name.clone(),
1267                // Already checked for this value above.
1268                DeferredItemName::Named(_) => {
1269                    sql_bail!("progress subsource name cannot be a resolved name")
1270                }
1271            },
1272            None => {
1273                let (item, prefix) = source_name
1274                    .0
1275                    .split_last()
1276                    .ok_or_else(|| sql_err!("source name must have at least one component"))?;
1277                let item_name =
1278                    Ident::try_generate_name(item.to_string(), "_progress", |candidate| {
1279                        let mut suggested_name = prefix.to_vec();
1280                        suggested_name.push(candidate.clone());
1281
1282                        let partial =
1283                            normalize::unresolved_item_name(UnresolvedItemName(suggested_name))?;
1284                        let qualified = scx.allocate_qualified_name(partial)?;
1285                        let item_exists = scx.catalog.get_item_by_name(&qualified).is_some();
1286                        let type_exists = scx.catalog.get_type_by_name(&qualified).is_some();
1287                        Ok::<_, PlanError>(!item_exists && !type_exists)
1288                    })?;
1289
1290                let mut full_name = prefix.to_vec();
1291                full_name.push(item_name);
1292                let full_name = normalize::unresolved_item_name(UnresolvedItemName(full_name))?;
1293                let qualified_name = scx.allocate_qualified_name(full_name)?;
1294                let full_name = scx.catalog.resolve_full_name(&qualified_name);
1295
1296                UnresolvedItemName::from(full_name.clone())
1297            }
1298        };
1299
1300        let (columns, constraints) = scx.relation_desc_into_table_defs(progress_desc)?;
1301
1302        // Create the subsource statement
1303        let mut progress_with_options: Vec<_> = with_options
1304            .iter()
1305            .filter_map(|opt| match opt.name {
1306                CreateSourceOptionName::TimestampInterval => None,
1307                CreateSourceOptionName::RetainHistory => Some(CreateSubsourceOption {
1308                    name: CreateSubsourceOptionName::RetainHistory,
1309                    value: opt.value.clone(),
1310                }),
1311            })
1312            .collect();
1313        progress_with_options.push(CreateSubsourceOption {
1314            name: CreateSubsourceOptionName::Progress,
1315            value: Some(WithOptionValue::Value(Value::Boolean(true))),
1316        });
1317
1318        Some(CreateSubsourceStatement {
1319            name,
1320            columns,
1321            // A progress subsource does not refer to the source to which it belongs.
1322            // Instead, the primary source depends on it (the opposite is true of
1323            // ingestion exports, which depend on the primary source).
1324            of_source: None,
1325            constraints,
1326            if_not_exists: false,
1327            with_options: progress_with_options,
1328        })
1329    } else {
1330        None
1331    };
1332
1333    purify_source_format(
1334        &catalog,
1335        format,
1336        &format_options,
1337        envelope,
1338        storage_configuration,
1339    )
1340    .await?;
1341
1342    Ok(PurifiedStatement::PurifiedCreateSource {
1343        create_progress_subsource_stmt,
1344        create_source_stmt,
1345        subsources: requested_subsource_map,
1346        available_source_references: retrieved_source_references.available_source_references(),
1347    })
1348}
1349
1350/// On success, returns the details on new subsources and the updated
1351/// `options` that sequencing expects when handling `ALTER SOURCE` statements.
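///
/// As an illustrative example (the object names here are hypothetical), a
/// statement such as `ALTER SOURCE pg_src ADD SUBSOURCE schema1.table_a;` is
/// handled by `purify_alter_source_add_subsources`, `ALTER SOURCE pg_src
/// REFRESH REFERENCES;` is handled by `purify_alter_source_refresh_references`,
/// and all other actions are passed through unchanged.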
1352async fn purify_alter_source(
1353    catalog: impl SessionCatalog,
1354    stmt: AlterSourceStatement<Aug>,
1355    storage_configuration: &StorageConfiguration,
1356) -> Result<PurifiedStatement, PlanError> {
1357    let scx = StatementContext::new(None, &catalog);
1358    let AlterSourceStatement {
1359        source_name: unresolved_source_name,
1360        action,
1361        if_exists,
1362    } = stmt;
1363
1364    // Get name.
1365    let item = match scx.resolve_item(RawItemName::Name(unresolved_source_name.clone())) {
1366        Ok(item) => item,
1367        Err(_) if if_exists => {
1368            return Ok(PurifiedStatement::PurifiedAlterSource {
1369                alter_source_stmt: AlterSourceStatement {
1370                    source_name: unresolved_source_name,
1371                    action,
1372                    if_exists,
1373                },
1374            });
1375        }
1376        Err(e) => return Err(e),
1377    };
1378
1379    // Ensure it's an ingestion-based and alterable source.
1380    let desc = match item.source_desc()? {
1381        Some(desc) => desc.clone().into_inline_connection(scx.catalog),
1382        None => {
1383            sql_bail!("cannot ALTER this type of source")
1384        }
1385    };
1386
1387    let source_name = item.name();
1388
1389    let resolved_source_name = ResolvedItemName::Item {
1390        id: item.id(),
1391        qualifiers: item.name().qualifiers.clone(),
1392        full_name: scx.catalog.resolve_full_name(source_name),
1393        print_id: true,
1394        version: RelationVersionSelector::Latest,
1395    };
1396
1397    let partial_name = scx.catalog.minimal_qualification(source_name);
1398
1399    match action {
1400        AlterSourceAction::AddSubsources {
1401            external_references,
1402            options,
1403        } => {
1404            if scx.catalog.system_vars().enable_create_table_from_source()
1405                && scx.catalog.system_vars().force_source_table_syntax()
1406            {
1407                Err(PlanError::UseTablesForSources(
1408                    "ALTER SOURCE .. ADD SUBSOURCES ..".to_string(),
1409                ))?;
1410            }
1411
1412            purify_alter_source_add_subsources(
1413                external_references,
1414                options,
1415                desc,
1416                partial_name,
1417                unresolved_source_name,
1418                resolved_source_name,
1419                storage_configuration,
1420            )
1421            .await
1422        }
1423        AlterSourceAction::RefreshReferences => {
1424            purify_alter_source_refresh_references(
1425                desc,
1426                resolved_source_name,
1427                storage_configuration,
1428            )
1429            .await
1430        }
1431        _ => Ok(PurifiedStatement::PurifiedAlterSource {
1432            alter_source_stmt: AlterSourceStatement {
1433                source_name: unresolved_source_name,
1434                action,
1435                if_exists,
1436            },
1437        }),
1438    }
1439}
1440
1441// TODO(database-issues#8620): Remove once subsources are removed
1442/// Equivalent to `purify_create_source` but for `AlterSourceStatement`.
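///
/// Validates that the source's connection supports subsources, connects to the
/// upstream system to retrieve the available references, purifies the requested
/// exports, and rewrites the `TEXT COLUMNS`/`EXCLUDE COLUMNS` options with their
/// normalized values.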
1443async fn purify_alter_source_add_subsources(
1444    external_references: Vec<ExternalReferenceExport>,
1445    mut options: Vec<AlterSourceAddSubsourceOption<Aug>>,
1446    desc: SourceDesc,
1447    partial_source_name: PartialItemName,
1448    unresolved_source_name: UnresolvedItemName,
1449    resolved_source_name: ResolvedItemName,
1450    storage_configuration: &StorageConfiguration,
1451) -> Result<PurifiedStatement, PlanError> {
1452    // Validate this is a source that can have subsources added.
1453    let connection_id = match &desc.connection {
1454        GenericSourceConnection::Postgres(c) => c.connection_id,
1455        GenericSourceConnection::MySql(c) => c.connection_id,
1456        GenericSourceConnection::SqlServer(c) => c.connection_id,
1457        _ => sql_bail!(
1458            "source {} does not support ALTER SOURCE.",
1459            partial_source_name
1460        ),
1461    };
1462
1463    let crate::plan::statement::ddl::AlterSourceAddSubsourceOptionExtracted {
1464        text_columns,
1465        exclude_columns,
1466        details,
1467        seen: _,
1468    } = options.clone().try_into()?;
1469    if details.is_some() {
1470        sql_bail!("DETAILS option cannot be explicitly set");
1471    }
1472
1473    let mut requested_subsource_map = BTreeMap::new();
1474
1475    match desc.connection {
1476        GenericSourceConnection::Postgres(pg_source_connection) => {
1477            // Get PostgresConnection for generating subsources.
1478            let pg_connection = &pg_source_connection.connection;
1479
1480            let client = pg_connection
1481                .validate(connection_id, storage_configuration)
1482                .await?;
1483
1484            let reference_client = SourceReferenceClient::Postgres {
1485                client: &client,
1486                publication: &pg_source_connection.publication,
1487                database: &pg_connection.database,
1488            };
1489            let retrieved_source_references = reference_client.get_source_references().await?;
1490
1491            let postgres::PurifiedSourceExports {
1492                source_exports: subsources,
1493                normalized_text_columns,
1494            } = postgres::purify_source_exports(
1495                &client,
1496                &retrieved_source_references,
1497                &Some(ExternalReferences::SubsetTables(external_references)),
1498                text_columns,
1499                exclude_columns,
1500                &unresolved_source_name,
1501                &SourceReferencePolicy::Required,
1502            )
1503            .await?;
1504
1505            if let Some(text_cols_option) = options
1506                .iter_mut()
1507                .find(|option| option.name == AlterSourceAddSubsourceOptionName::TextColumns)
1508            {
1509                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
1510            }
1511
1512            requested_subsource_map.extend(subsources);
1513        }
1514        GenericSourceConnection::MySql(mysql_source_connection) => {
1515            let mysql_connection = &mysql_source_connection.connection;
1516            let config = mysql_connection
1517                .config(
1518                    &storage_configuration.connection_context.secrets_reader,
1519                    storage_configuration,
1520                    InTask::No,
1521                )
1522                .await?;
1523
1524            let mut conn = config
1525                .connect(
1526                    "mysql purification",
1527                    &storage_configuration.connection_context.ssh_tunnel_manager,
1528                )
1529                .await?;
1530
1531            // Retrieve the current @gtid_executed value of the server to mark as the effective
1532            // initial snapshot point for these subsources.
1533            let initial_gtid_set =
1534                mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?;
1535
1536            let binlog_full_metadata = is_binlog_full_metadata(&mut conn).await?;
1537
1538            let requested_references = Some(ExternalReferences::SubsetTables(external_references));
1539
1540            let reference_client = SourceReferenceClient::MySql {
1541                conn: &mut conn,
1542                include_system_schemas: mysql::references_system_schemas(&requested_references),
1543            };
1544            let retrieved_source_references = reference_client.get_source_references().await?;
1545
1546            let mysql::PurifiedSourceExports {
1547                source_exports: subsources,
1548                normalized_text_columns,
1549                normalized_exclude_columns,
1550            } = mysql::purify_source_exports(
1551                &mut conn,
1552                &retrieved_source_references,
1553                &requested_references,
1554                text_columns,
1555                exclude_columns,
1556                &unresolved_source_name,
1557                initial_gtid_set,
1558                &SourceReferencePolicy::Required,
1559                binlog_full_metadata,
1560            )
1561            .await?;
1562            requested_subsource_map.extend(subsources);
1563
1564            // Update options with the purified details
1565            if let Some(text_cols_option) = options
1566                .iter_mut()
1567                .find(|option| option.name == AlterSourceAddSubsourceOptionName::TextColumns)
1568            {
1569                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
1570            }
1571            if let Some(exclude_cols_option) = options
1572                .iter_mut()
1573                .find(|option| option.name == AlterSourceAddSubsourceOptionName::ExcludeColumns)
1574            {
1575                exclude_cols_option.value =
1576                    Some(WithOptionValue::Sequence(normalized_exclude_columns));
1577            }
1578        }
1579        GenericSourceConnection::SqlServer(sql_server_source) => {
1580            // Open a connection to the upstream SQL Server instance.
1581            let sql_server_connection = &sql_server_source.connection;
1582            let config = sql_server_connection
1583                .resolve_config(
1584                    &storage_configuration.connection_context.secrets_reader,
1585                    storage_configuration,
1586                    InTask::No,
1587                )
1588                .await?;
1589            let mut client = mz_sql_server_util::Client::connect(config).await?;
1590
1591            // Query the upstream SQL Server instance for available tables to replicate.
1592            let database = sql_server_connection.database.clone().into();
1593            let source_references = SourceReferenceClient::SqlServer {
1594                client: &mut client,
1595                database: Arc::clone(&database),
1596            }
1597            .get_source_references()
1598            .await?;
1599            let requested_references = Some(ExternalReferences::SubsetTables(external_references));
1600
1601            let timeout = mz_storage_types::sources::sql_server::MAX_LSN_WAIT
1602                .get(storage_configuration.config_set());
1603
1604            let result = sql_server::purify_source_exports(
1605                &*database,
1606                &mut client,
1607                &source_references,
1608                &requested_references,
1609                &text_columns,
1610                &exclude_columns,
1611                &unresolved_source_name,
1612                timeout,
1613                &SourceReferencePolicy::Required,
1614            )
1615            .await;
1616            let sql_server::PurifiedSourceExports {
1617                source_exports,
1618                normalized_text_columns,
1619                normalized_excl_columns,
1620            } = result?;
1621
1622            // Add the new exports to our subsource map.
1623            requested_subsource_map.extend(source_exports);
1624
1625            // Update options on the CREATE SOURCE statement with the purified details.
1626            if let Some(text_cols_option) = options
1627                .iter_mut()
1628                .find(|option| option.name == AlterSourceAddSubsourceOptionName::TextColumns)
1629            {
1630                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
1631            }
1632            if let Some(exclude_cols_option) = options
1633                .iter_mut()
1634                .find(|option| option.name == AlterSourceAddSubsourceOptionName::ExcludeColumns)
1635            {
1636                exclude_cols_option.value =
1637                    Some(WithOptionValue::Sequence(normalized_excl_columns));
1638            }
1639        }
1640        // The source kind was already established earlier in this function;
1641        // reaching this catch-all is an internal invariant violation.
1642        _ => bail_internal!("source does not support ALTER SOURCE...ADD SUBSOURCE"),
1643    };
1644
1645    Ok(PurifiedStatement::PurifiedAlterSourceAddSubsources {
1646        source_name: resolved_source_name,
1647        options,
1648        subsources: requested_subsource_map,
1649    })
1650}
1651
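/// Purifies `ALTER SOURCE ... REFRESH REFERENCES` by connecting to the upstream
/// system (or, for load generator and Kafka sources, inspecting the generator or
/// topic) and returning the source references that are currently available.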
1652async fn purify_alter_source_refresh_references(
1653    desc: SourceDesc,
1654    resolved_source_name: ResolvedItemName,
1655    storage_configuration: &StorageConfiguration,
1656) -> Result<PurifiedStatement, PlanError> {
1657    let retrieved_source_references = match desc.connection {
1658        GenericSourceConnection::Postgres(pg_source_connection) => {
1659            // Get PostgresConnection for generating subsources.
1660            let pg_connection = &pg_source_connection.connection;
1661
1662            let config = pg_connection
1663                .config(
1664                    &storage_configuration.connection_context.secrets_reader,
1665                    storage_configuration,
1666                    InTask::No,
1667                )
1668                .await?;
1669
1670            let client = config
1671                .connect(
1672                    "postgres_purification",
1673                    &storage_configuration.connection_context.ssh_tunnel_manager,
1674                )
1675                .await?;
1676            let reference_client = SourceReferenceClient::Postgres {
1677                client: &client,
1678                publication: &pg_source_connection.publication,
1679                database: &pg_connection.database,
1680            };
1681            reference_client.get_source_references().await?
1682        }
1683        GenericSourceConnection::MySql(mysql_source_connection) => {
1684            let mysql_connection = &mysql_source_connection.connection;
1685            let config = mysql_connection
1686                .config(
1687                    &storage_configuration.connection_context.secrets_reader,
1688                    storage_configuration,
1689                    InTask::No,
1690                )
1691                .await?;
1692
1693            let mut conn = config
1694                .connect(
1695                    "mysql purification",
1696                    &storage_configuration.connection_context.ssh_tunnel_manager,
1697                )
1698                .await?;
1699
1700            let reference_client = SourceReferenceClient::MySql {
1701                conn: &mut conn,
1702                include_system_schemas: false,
1703            };
1704            reference_client.get_source_references().await?
1705        }
1706        GenericSourceConnection::SqlServer(sql_server_source) => {
1707            // Open a connection to the upstream SQL Server instance.
1708            let sql_server_connection = &sql_server_source.connection;
1709            let config = sql_server_connection
1710                .resolve_config(
1711                    &storage_configuration.connection_context.secrets_reader,
1712                    storage_configuration,
1713                    InTask::No,
1714                )
1715                .await?;
1716            let mut client = mz_sql_server_util::Client::connect(config).await?;
1717
1718            // Query the upstream SQL Server instance for available tables to replicate.
1719            let source_references = SourceReferenceClient::SqlServer {
1720                client: &mut client,
1721                database: sql_server_connection.database.clone().into(),
1722            }
1723            .get_source_references()
1724            .await?;
1725            source_references
1726        }
1727        GenericSourceConnection::LoadGenerator(load_gen_connection) => {
1728            let reference_client = SourceReferenceClient::LoadGenerator {
1729                generator: &load_gen_connection.load_generator,
1730            };
1731            reference_client.get_source_references().await?
1732        }
1733        GenericSourceConnection::Kafka(kafka_conn) => {
1734            let reference_client = SourceReferenceClient::Kafka {
1735                topic: &kafka_conn.topic,
1736            };
1737            reference_client.get_source_references().await?
1738        }
1739    };
1740    Ok(PurifiedStatement::PurifiedAlterSourceRefreshReferences {
1741        source_name: resolved_source_name,
1742        available_source_references: retrieved_source_references.available_source_references(),
1743    })
1744}
1745
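/// Purifies a `CREATE TABLE ... FROM SOURCE` statement: resolves the external
/// reference against the upstream system, fills in the table's columns and
/// constraints where purification can determine them, purifies the format, and
/// records source-specific details in the statement's options.
///
/// As an illustrative example with hypothetical names, this handles statements
/// along the lines of `CREATE TABLE t1 FROM SOURCE pg_src (REFERENCE public.t1);`.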
1746async fn purify_create_table_from_source(
1747    catalog: impl SessionCatalog,
1748    mut stmt: CreateTableFromSourceStatement<Aug>,
1749    storage_configuration: &StorageConfiguration,
1750) -> Result<PurifiedStatement, PlanError> {
1751    let scx = StatementContext::new(None, &catalog);
1752    let CreateTableFromSourceStatement {
1753        name: _,
1754        columns,
1755        constraints,
1756        source: source_name,
1757        if_not_exists: _,
1758        external_reference,
1759        format,
1760        envelope,
1761        include_metadata: _,
1762        with_options,
1763    } = &mut stmt;
1764
1765    // Columns and constraints cannot be specified by the user but will be populated below.
1766    if matches!(columns, TableFromSourceColumns::Defined(_)) {
1767        sql_bail!("CREATE TABLE .. FROM SOURCE column definitions cannot be specified directly");
1768    }
1769    if !constraints.is_empty() {
1770        sql_bail!(
1771            "CREATE TABLE .. FROM SOURCE constraint definitions cannot be specified directly"
1772        );
1773    }
1774
1775    // Get the source item.
1776    let item = scx.get_item_by_resolved_name(source_name)?;
1780
1781    // Ensure the source is ingestion-based; only such sources can have tables created from them.
1782    let desc = match item.source_desc()? {
1783        Some(desc) => desc.clone().into_inline_connection(scx.catalog),
1784        None => {
1785            sql_bail!("cannot create a table from this type of source")
1786        }
1787    };
1788    let unresolved_source_name: UnresolvedItemName = source_name.full_item_name().clone().into();
1789
1790    let crate::plan::statement::ddl::TableFromSourceOptionExtracted {
1791        text_columns,
1792        exclude_columns,
1793        retain_history: _,
1794        details,
1795        partition_by: _,
1796        seen: _,
1797    } = with_options.clone().try_into()?;
1798    if details.is_some() {
1799        sql_bail!("DETAILS option cannot be explicitly set");
1800    }
1801
1802    // Our text column values are unqualified (just column names), but the purification methods below
1803    // expect to match fully-qualified names against the full set of upstream tables, so we
1804    // need to qualify them using the external reference first.
1805    let qualified_text_columns = text_columns
1806        .iter()
1807        .map(|col| {
1808            UnresolvedItemName(
1809                external_reference
1810                    .as_ref()
1811                    .map(|er| er.0.iter().chain_one(col).cloned().collect())
1812                    .unwrap_or_else(|| vec![col.clone()]),
1813            )
1814        })
1815        .collect_vec();
1816    let qualified_exclude_columns = exclude_columns
1817        .iter()
1818        .map(|col| {
1819            UnresolvedItemName(
1820                external_reference
1821                    .as_ref()
1822                    .map(|er| er.0.iter().chain_one(col).cloned().collect())
1823                    .unwrap_or_else(|| vec![col.clone()]),
1824            )
1825        })
1826        .collect_vec();
1827
1828    // Should be overridden below if a source-specific format is required.
1829    let mut format_options = SourceFormatOptions::Default;
1830
1831    let retrieved_source_references: RetrievedSourceReferences;
1832
1833    let requested_references = external_reference.as_ref().map(|ref_name| {
1834        ExternalReferences::SubsetTables(vec![ExternalReferenceExport {
1835            reference: ref_name.clone(),
1836            alias: None,
1837        }])
1838    });
1839
1840    // Run purification work specific to each source type: resolve the external reference to
1841    // a fully qualified name and obtain the appropriate details for the source-export statement
1842    let purified_export = match desc.connection {
1843        GenericSourceConnection::Postgres(pg_source_connection) => {
1844            // Get PostgresConnection for generating subsources.
1845            let pg_connection = &pg_source_connection.connection;
1846
1847            let client = pg_connection
1848                .validate(pg_source_connection.connection_id, storage_configuration)
1849                .await?;
1850
1851            let reference_client = SourceReferenceClient::Postgres {
1852                client: &client,
1853                publication: &pg_source_connection.publication,
1854                database: &pg_connection.database,
1855            };
1856            retrieved_source_references = reference_client.get_source_references().await?;
1857
1858            let postgres::PurifiedSourceExports {
1859                source_exports,
1860                // TODO(database-issues#8620): Remove once subsources are removed
1861                // This `normalized_text_columns` is not relevant for us and is only returned for
1862                // `CREATE SOURCE` statements that automatically generate subsources
1863                normalized_text_columns: _,
1864            } = postgres::purify_source_exports(
1865                &client,
1866                &retrieved_source_references,
1867                &requested_references,
1868                qualified_text_columns,
1869                qualified_exclude_columns,
1870                &unresolved_source_name,
1871                &SourceReferencePolicy::Required,
1872            )
1873            .await?;
1874            // There should be exactly one source_export returned for this statement
1875            let (_, purified_export) = source_exports.into_element();
1876            purified_export
1877        }
1878        GenericSourceConnection::MySql(mysql_source_connection) => {
1879            let mysql_connection = &mysql_source_connection.connection;
1880            let config = mysql_connection
1881                .config(
1882                    &storage_configuration.connection_context.secrets_reader,
1883                    storage_configuration,
1884                    InTask::No,
1885                )
1886                .await?;
1887
1888            let mut conn = config
1889                .connect(
1890                    "mysql purification",
1891                    &storage_configuration.connection_context.ssh_tunnel_manager,
1892                )
1893                .await?;
1894
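            // `CREATE TABLE .. FROM SOURCE` requires the upstream MySQL server to
            // provide full binlog row metadata, so verify that before proceeding.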
1895            ensure_binlog_full_metadata(&mut conn).await?;
1896            let binlog_full_metadata = true;
1897
1898            // Retrieve the current @gtid_executed value of the server to mark as the effective
1899            // initial snapshot point for this table.
1900            let initial_gtid_set =
1901                mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?;
1902
1903            let reference_client = SourceReferenceClient::MySql {
1904                conn: &mut conn,
1905                include_system_schemas: mysql::references_system_schemas(&requested_references),
1906            };
1907            retrieved_source_references = reference_client.get_source_references().await?;
1908
1909            let mysql::PurifiedSourceExports {
1910                source_exports,
1911                // TODO(database-issues#8620): Remove once subsources are removed
1912                // `normalized_text/exclude_columns` is not relevant for us and is only returned for
1913                // `CREATE SOURCE` statements that automatically generate subsources
1914                normalized_text_columns: _,
1915                normalized_exclude_columns: _,
1916            } = mysql::purify_source_exports(
1917                &mut conn,
1918                &retrieved_source_references,
1919                &requested_references,
1920                qualified_text_columns,
1921                qualified_exclude_columns,
1922                &unresolved_source_name,
1923                initial_gtid_set,
1924                &SourceReferencePolicy::Required,
1925                binlog_full_metadata,
1926            )
1927            .await?;
1928            // There should be exactly one source_export returned for this statement
1929            let (_, purified_export) = source_exports.into_element();
1930            purified_export
1931        }
1932        GenericSourceConnection::SqlServer(sql_server_source) => {
1933            let connection = sql_server_source.connection;
1934            let config = connection
1935                .resolve_config(
1936                    &storage_configuration.connection_context.secrets_reader,
1937                    storage_configuration,
1938                    InTask::No,
1939                )
1940                .await?;
1941            let mut client = mz_sql_server_util::Client::connect(config).await?;
1942
1943            let database: Arc<str> = connection.database.into();
1944            let reference_client = SourceReferenceClient::SqlServer {
1945                client: &mut client,
1946                database: Arc::clone(&database),
1947            };
1948            retrieved_source_references = reference_client.get_source_references().await?;
1949            tracing::debug!(?retrieved_source_references, "got source references");
1950
1951            let timeout = mz_storage_types::sources::sql_server::MAX_LSN_WAIT
1952                .get(storage_configuration.config_set());
1953
1954            let purified_source_exports = sql_server::purify_source_exports(
1955                &*database,
1956                &mut client,
1957                &retrieved_source_references,
1958                &requested_references,
1959                &qualified_text_columns,
1960                &qualified_exclude_columns,
1961                &unresolved_source_name,
1962                timeout,
1963                &SourceReferencePolicy::Required,
1964            )
1965            .await?;
1966
1967            // There should be exactly one source_export returned for this statement
1968            let (_, purified_export) = purified_source_exports.source_exports.into_element();
1969            purified_export
1970        }
1971        GenericSourceConnection::LoadGenerator(load_gen_connection) => {
1972            let reference_client = SourceReferenceClient::LoadGenerator {
1973                generator: &load_gen_connection.load_generator,
1974            };
1975            retrieved_source_references = reference_client.get_source_references().await?;
1976
1977            let requested_exports = retrieved_source_references
1978                .requested_source_exports(requested_references.as_ref(), &unresolved_source_name)?;
1979            // There should be exactly one source_export returned
1980            let export = requested_exports.into_element();
1981            PurifiedSourceExport {
1982                external_reference: export.external_reference,
1983                details: PurifiedExportDetails::LoadGenerator {
1984                    table: export
1985                        .meta
1986                        .load_generator_desc()
1987                        .ok_or_else(|| internal_err!("expected load generator source reference"))?
1988                        .clone(),
1989                    output: export
1990                        .meta
1991                        .load_generator_output()
1992                        .ok_or_else(|| internal_err!("expected load generator source reference"))?
1993                        .clone(),
1994                },
1995            }
1996        }
1997        GenericSourceConnection::Kafka(kafka_conn) => {
1998            let reference_client = SourceReferenceClient::Kafka {
1999                topic: &kafka_conn.topic,
2000            };
2001            retrieved_source_references = reference_client.get_source_references().await?;
2002            let requested_exports = retrieved_source_references
2003                .requested_source_exports(requested_references.as_ref(), &unresolved_source_name)?;
2004            // There should be exactly one source_export returned
2005            let export = requested_exports.into_element();
2006
2007            format_options = SourceFormatOptions::Kafka {
2008                topic: kafka_conn.topic.clone(),
2009            };
2010            PurifiedSourceExport {
2011                external_reference: export.external_reference,
2012                details: PurifiedExportDetails::Kafka {},
2013            }
2014        }
2015    };
2016
2017    purify_source_format(
2018        &catalog,
2019        format,
2020        &format_options,
2021        envelope,
2022        storage_configuration,
2023    )
2024    .await?;
2025
2026    // Update the external reference in the statement to the resolved fully-qualified
2027    // external reference
2028    *external_reference = Some(purified_export.external_reference.clone());
2029
2030    // Update options in the statement using the purified export details
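    // Each branch below also serializes the source-specific details into a
    // hex-encoded protobuf stored in the `DETAILS` option so that planning can
    // decode them later.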
2031    match &purified_export.details {
2032        PurifiedExportDetails::Postgres { .. } => {
2033            let mut unsupported_cols = vec![];
2034            let postgres::PostgresExportStatementValues {
2035                columns: gen_columns,
2036                constraints: gen_constraints,
2037                text_columns: gen_text_columns,
2038                exclude_columns: gen_exclude_columns,
2039                details: gen_details,
2040                external_reference: _,
2041            } = postgres::generate_source_export_statement_values(
2042                &scx,
2043                purified_export,
2044                &mut unsupported_cols,
2045            )?;
2046            if !unsupported_cols.is_empty() {
2047                unsupported_cols.sort();
2048                Err(PgSourcePurificationError::UnrecognizedTypes {
2049                    cols: unsupported_cols,
2050                })?;
2051            }
2052
2053            if let Some(text_cols_option) = with_options
2054                .iter_mut()
2055                .find(|option| option.name == TableFromSourceOptionName::TextColumns)
2056            {
2057                match gen_text_columns {
2058                    Some(gen_text_columns) => {
2059                        text_cols_option.value = Some(WithOptionValue::Sequence(gen_text_columns))
2060                    }
2061                    None => soft_panic_or_log!(
2062                        "text_columns should be Some if text_cols_option is present"
2063                    ),
2064                }
2065            }
2066            if let Some(exclude_cols_option) = with_options
2067                .iter_mut()
2068                .find(|option| option.name == TableFromSourceOptionName::ExcludeColumns)
2069            {
2070                match gen_exclude_columns {
2071                    Some(gen_exclude_columns) => {
2072                        exclude_cols_option.value =
2073                            Some(WithOptionValue::Sequence(gen_exclude_columns))
2074                    }
2075                    None => soft_panic_or_log!(
2076                        "exclude_columns should be Some if exclude_cols_option is present"
2077                    ),
2078                }
2079            }
2080            match columns {
2081                TableFromSourceColumns::Defined(_) => {
2082                    // Purification is what populates the `Defined` variant;
2083                    // reaching this is an internal invariant violation.
2084                    bail_internal!(
2085                        "column definitions cannot be explicitly set for this source type"
2086                    )
2087                }
2088                TableFromSourceColumns::NotSpecified => {
2089                    *columns = TableFromSourceColumns::Defined(gen_columns);
2090                    *constraints = gen_constraints;
2091                }
2092                TableFromSourceColumns::Named(_) => {
2093                    sql_bail!("columns cannot be named for Postgres sources")
2094                }
2095            }
2096            with_options.push(TableFromSourceOption {
2097                name: TableFromSourceOptionName::Details,
2098                value: Some(WithOptionValue::Value(Value::String(hex::encode(
2099                    gen_details.into_proto().encode_to_vec(),
2100                )))),
2101            })
2102        }
2103        PurifiedExportDetails::MySql { .. } => {
2104            let mysql::MySqlExportStatementValues {
2105                columns: gen_columns,
2106                constraints: gen_constraints,
2107                text_columns: gen_text_columns,
2108                exclude_columns: gen_exclude_columns,
2109                details: gen_details,
2110                external_reference: _,
2111            } = mysql::generate_source_export_statement_values(&scx, purified_export)?;
2112
2113            if let Some(text_cols_option) = with_options
2114                .iter_mut()
2115                .find(|option| option.name == TableFromSourceOptionName::TextColumns)
2116            {
2117                match gen_text_columns {
2118                    Some(gen_text_columns) => {
2119                        text_cols_option.value = Some(WithOptionValue::Sequence(gen_text_columns))
2120                    }
2121                    None => soft_panic_or_log!(
2122                        "text_columns should be Some if text_cols_option is present"
2123                    ),
2124                }
2125            }
2126            if let Some(exclude_cols_option) = with_options
2127                .iter_mut()
2128                .find(|option| option.name == TableFromSourceOptionName::ExcludeColumns)
2129            {
2130                match gen_exclude_columns {
2131                    Some(gen_exclude_columns) => {
2132                        exclude_cols_option.value =
2133                            Some(WithOptionValue::Sequence(gen_exclude_columns))
2134                    }
2135                    None => soft_panic_or_log!(
2136                        "exclude_columns should be Some if exclude_cols_option is present"
2137                    ),
2138                }
2139            }
2140            match columns {
2141                TableFromSourceColumns::Defined(_) => {
2142                    // Purification is what populates the `Defined` variant;
2143                    // reaching this is an internal invariant violation.
2144                    bail_internal!(
2145                        "column definitions cannot be explicitly set for this source type"
2146                    )
2147                }
2148                TableFromSourceColumns::NotSpecified => {
2149                    *columns = TableFromSourceColumns::Defined(gen_columns);
2150                    *constraints = gen_constraints;
2151                }
2152                TableFromSourceColumns::Named(_) => {
2153                    sql_bail!("columns cannot be named for MySQL sources")
2154                }
2155            }
2156            with_options.push(TableFromSourceOption {
2157                name: TableFromSourceOptionName::Details,
2158                value: Some(WithOptionValue::Value(Value::String(hex::encode(
2159                    gen_details.into_proto().encode_to_vec(),
2160                )))),
2161            })
2162        }
2163        PurifiedExportDetails::SqlServer { .. } => {
2164            let sql_server::SqlServerExportStatementValues {
2165                columns: gen_columns,
2166                constraints: gen_constraints,
2167                text_columns: gen_text_columns,
2168                excl_columns: gen_excl_columns,
2169                details: gen_details,
2170                external_reference: _,
2171            } = sql_server::generate_source_export_statement_values(&scx, purified_export)?;
2172
2173            if let Some(text_cols_option) = with_options
2174                .iter_mut()
2175                .find(|opt| opt.name == TableFromSourceOptionName::TextColumns)
2176            {
2177                match gen_text_columns {
2178                    Some(gen_text_columns) => {
2179                        text_cols_option.value = Some(WithOptionValue::Sequence(gen_text_columns))
2180                    }
2181                    None => soft_panic_or_log!(
2182                        "text_columns should be Some if text_cols_option is present"
2183                    ),
2184                }
2185            }
2186            if let Some(exclude_cols_option) = with_options
2187                .iter_mut()
2188                .find(|opt| opt.name == TableFromSourceOptionName::ExcludeColumns)
2189            {
2190                match gen_excl_columns {
2191                    Some(gen_excl_columns) => {
2192                        exclude_cols_option.value =
2193                            Some(WithOptionValue::Sequence(gen_excl_columns))
2194                    }
2195                    None => soft_panic_or_log!(
2196                        "excl_columns should be Some if excl_cols_option is present"
2197                    ),
2198                }
2199            }
2200
2201            match columns {
2202                TableFromSourceColumns::NotSpecified => {
2203                    *columns = TableFromSourceColumns::Defined(gen_columns);
2204                    *constraints = gen_constraints;
2205                }
2206                TableFromSourceColumns::Named(_) => {
2207                    sql_bail!("columns cannot be named for SQL Server sources")
2208                }
2209                TableFromSourceColumns::Defined(_) => {
2210                    // Purification is what populates the `Defined` variant;
2211                    // reaching this is an internal invariant violation.
2212                    bail_internal!(
2213                        "column definitions cannot be explicitly set for this source type"
2214                    )
2215                }
2216            }
2217
2218            with_options.push(TableFromSourceOption {
2219                name: TableFromSourceOptionName::Details,
2220                value: Some(WithOptionValue::Value(Value::String(hex::encode(
2221                    gen_details.into_proto().encode_to_vec(),
2222                )))),
2223            })
2224        }
2225        PurifiedExportDetails::LoadGenerator { .. } => {
2226            let (desc, output) = match purified_export.details {
2227                PurifiedExportDetails::LoadGenerator { table, output } => (table, output),
2228                _ => bail_internal!("purified export details must be load generator"),
2229            };
2230            // We only determine the table description for multi-output load generator sources here,
2231            // whereas single-output load generators will have their relation description
2232            // determined during statment planning as envelope and format options may affect their
2233            // schema.
2234            if let Some(desc) = desc {
2235                let (gen_columns, gen_constraints) = scx.relation_desc_into_table_defs(&desc)?;
2236                match columns {
2237                    // Purification is what populates the `Defined` variant;
2238                    // reaching this is an internal invariant violation.
2239                    TableFromSourceColumns::Defined(_) => bail_internal!(
2240                        "column definitions cannot be explicitly set for this source type"
2241                    ),
2242                    TableFromSourceColumns::NotSpecified => {
2243                        *columns = TableFromSourceColumns::Defined(gen_columns);
2244                        *constraints = gen_constraints;
2245                    }
2246                    TableFromSourceColumns::Named(_) => {
2247                        sql_bail!("columns cannot be named for multi-output load generator sources")
2248                    }
2249                }
2250            }
2251            let details = SourceExportStatementDetails::LoadGenerator { output };
2252            with_options.push(TableFromSourceOption {
2253                name: TableFromSourceOptionName::Details,
2254                value: Some(WithOptionValue::Value(Value::String(hex::encode(
2255                    details.into_proto().encode_to_vec(),
2256                )))),
2257            })
2258        }
2259        PurifiedExportDetails::Kafka {} => {
2260            // NOTE: Kafka tables have their 'schemas' purified into the statement inside the
2261            // format field, so we don't specify any columns or constraints to be stored
2262            // on the statement here. The RelationDesc will be determined during planning.
2263            let details = SourceExportStatementDetails::Kafka {};
2264            with_options.push(TableFromSourceOption {
2265                name: TableFromSourceOptionName::Details,
2266                value: Some(WithOptionValue::Value(Value::String(hex::encode(
2267                    details.into_proto().encode_to_vec(),
2268                )))),
2269            })
2270        }
2271    };
2272
2273    // TODO: We might as well use the retrieved available references to update the source
2274    // available references table in the catalog, so plumb this through.
2275    // available_source_references: retrieved_source_references.available_source_references(),
2276    Ok(PurifiedStatement::PurifiedCreateTableFromSource { stmt })
2277}
2278
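/// Source-specific context needed while purifying a format specifier.
///
/// For Kafka sources, the topic name is used to derive the Confluent Schema
/// Registry subject names (`<topic>-key` and `<topic>-value`); all other source
/// types use `Default`.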
2279enum SourceFormatOptions {
2280    Default,
2281    Kafka { topic: String },
2282}
2283
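/// Purifies the format specifier of a `CREATE SOURCE` or `CREATE TABLE ... FROM
/// SOURCE` statement, rejecting `KEY/VALUE` formats for non-Kafka sources and
/// purifying the bare or key/value formats individually.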
2284async fn purify_source_format(
2285    catalog: &dyn SessionCatalog,
2286    format: &mut Option<FormatSpecifier<Aug>>,
2287    options: &SourceFormatOptions,
2288    envelope: &Option<SourceEnvelope>,
2289    storage_configuration: &StorageConfiguration,
2290) -> Result<(), PlanError> {
2291    if matches!(format, Some(FormatSpecifier::KeyValue { .. }))
2292        && !matches!(options, SourceFormatOptions::Kafka { .. })
2293    {
2294        sql_bail!("Kafka sources are the only source type that can provide KEY/VALUE formats")
2295    }
2296
2297    match format.as_mut() {
2298        None => {}
2299        Some(FormatSpecifier::Bare(format)) => {
2300            purify_source_format_single(catalog, format, options, envelope, storage_configuration)
2301                .await?;
2302        }
2303
2304        Some(FormatSpecifier::KeyValue { key, value: val }) => {
2305            purify_source_format_single(catalog, key, options, envelope, storage_configuration)
2306                .await?;
2307            purify_source_format_single(catalog, val, options, envelope, storage_configuration)
2308                .await?;
2309        }
2310    }
2311    Ok(())
2312}
2313
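/// Purifies a single format: Avro and Protobuf formats backed by the Confluent
/// Schema Registry have their schemas fetched and seeded into the statement;
/// inline schemas and all other formats are left untouched.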
2314async fn purify_source_format_single(
2315    catalog: &dyn SessionCatalog,
2316    format: &mut Format<Aug>,
2317    options: &SourceFormatOptions,
2318    envelope: &Option<SourceEnvelope>,
2319    storage_configuration: &StorageConfiguration,
2320) -> Result<(), PlanError> {
2321    match format {
2322        Format::Avro(schema) => match schema {
2323            AvroSchema::Csr { csr_connection } => {
2324                purify_csr_connection_avro(
2325                    catalog,
2326                    options,
2327                    csr_connection,
2328                    envelope,
2329                    storage_configuration,
2330                )
2331                .await?
2332            }
2333            AvroSchema::InlineSchema { .. } => {}
2334        },
2335        Format::Protobuf(schema) => match schema {
2336            ProtobufSchema::Csr { csr_connection } => {
2337                purify_csr_connection_proto(
2338                    catalog,
2339                    options,
2340                    csr_connection,
2341                    envelope,
2342                    storage_configuration,
2343                )
2344                .await?;
2345            }
2346            ProtobufSchema::InlineSchema { .. } => {}
2347        },
2348        Format::Bytes
2349        | Format::Regex(_)
2350        | Format::Json { .. }
2351        | Format::Text
2352        | Format::Csv { .. } => (),
2353    }
2354    Ok(())
2355}
2356
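/// Generates `CREATE SUBSOURCE` statements for the purified source exports,
/// dispatching on the export details: Postgres, MySQL, and SQL Server exports
/// delegate to their respective modules, load generator exports are built here,
/// and Kafka sources produce no data-bearing subsources.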
2357pub fn generate_subsource_statements(
2358    scx: &StatementContext,
2359    source_name: ResolvedItemName,
2360    subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
2361) -> Result<Vec<CreateSubsourceStatement<Aug>>, PlanError> {
2362    // get the first subsource to determine the connection type
2363    if subsources.is_empty() {
2364        return Ok(vec![]);
2365    }
2366    let (_, purified_export) = subsources
2367        .iter()
2368        .next()
2369        .ok_or_else(|| internal_err!("expected at least one subsource"))?;
2370
2371    let statements = match &purified_export.details {
2372        PurifiedExportDetails::Postgres { .. } => {
2373            crate::pure::postgres::generate_create_subsource_statements(
2374                scx,
2375                source_name,
2376                subsources,
2377            )?
2378        }
2379        PurifiedExportDetails::MySql { .. } => {
2380            crate::pure::mysql::generate_create_subsource_statements(scx, source_name, subsources)?
2381        }
2382        PurifiedExportDetails::SqlServer { .. } => {
2383            crate::pure::sql_server::generate_create_subsource_statements(
2384                scx,
2385                source_name,
2386                subsources,
2387            )?
2388        }
2389        PurifiedExportDetails::LoadGenerator { .. } => {
2390            let mut subsource_stmts = Vec::with_capacity(subsources.len());
2391            for (subsource_name, purified_export) in subsources {
2392                let (desc, output) = match purified_export.details {
2393                    PurifiedExportDetails::LoadGenerator { table, output } => (table, output),
2394                    _ => {
2395                        bail_internal!("purified export details must be load generator")
2396                    }
2397                };
2398                let desc = desc.ok_or_else(|| {
2399                    internal_err!(
2400                        "subsources cannot be generated for single-output load generators"
2401                    )
2402                })?;
2403
2404                let (columns, table_constraints) = scx.relation_desc_into_table_defs(&desc)?;
2405                let details = SourceExportStatementDetails::LoadGenerator { output };
2406                // Create the subsource statement
2407                let subsource = CreateSubsourceStatement {
2408                    name: subsource_name,
2409                    columns,
2410                    of_source: Some(source_name.clone()),
2411                    // unlike sources that come from an external upstream, we
2412                    // have more leniency to introduce different constraints
2413                    // every time the load generator is run; i.e. we are not as
2414                    // worried about introducing junk data.
2415                    constraints: table_constraints,
2416                    if_not_exists: false,
2417                    with_options: vec![
2418                        CreateSubsourceOption {
2419                            name: CreateSubsourceOptionName::ExternalReference,
2420                            value: Some(WithOptionValue::UnresolvedItemName(
2421                                purified_export.external_reference,
2422                            )),
2423                        },
2424                        CreateSubsourceOption {
2425                            name: CreateSubsourceOptionName::Details,
2426                            value: Some(WithOptionValue::Value(Value::String(hex::encode(
2427                                details.into_proto().encode_to_vec(),
2428                            )))),
2429                        },
2430                    ],
2431                };
2432                subsource_stmts.push(subsource);
2433            }
2434
2435            subsource_stmts
2436        }
2437        PurifiedExportDetails::Kafka { .. } => {
2438            // TODO: as part of database-issues#8322, Kafka sources will begin
2439            // producing data; we'll need to understand the schema
2440            // of the output here.
2441            if !subsources.is_empty() {
2442                bail_internal!("Kafka sources do not produce data-bearing subsources");
2443            }
2444            vec![]
2445        }
2446    };
2447    Ok(statements)
2448}
2449
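/// Purifies a Protobuf Confluent Schema Registry connection: if no seed is
/// present, compiles the `<topic>-value` schema (required) and `<topic>-key`
/// schema (optional) into the seed, requiring a key schema for
/// `ENVELOPE DEBEZIUM`.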
2450async fn purify_csr_connection_proto(
2451    catalog: &dyn SessionCatalog,
2452    options: &SourceFormatOptions,
2453    csr_connection: &mut CsrConnectionProtobuf<Aug>,
2454    envelope: &Option<SourceEnvelope>,
2455    storage_configuration: &StorageConfiguration,
2456) -> Result<(), PlanError> {
2457    let SourceFormatOptions::Kafka { topic } = options else {
2458        sql_bail!("Confluent Schema Registry is only supported with Kafka sources")
2459    };
2460
2461    let CsrConnectionProtobuf {
2462        seed,
2463        connection: CsrConnection {
2464            connection,
2465            options: _,
2466        },
2467    } = csr_connection;
2468    match seed {
2469        None => {
2470            let scx = StatementContext::new(None, &*catalog);
2471
2472            let ccsr_connection = match scx.get_item_by_resolved_name(connection)?.connection()? {
2473                Connection::Csr(connection) => connection.clone().into_inline_connection(catalog),
2474                _ => sql_bail!("{} is not a schema registry connection", connection),
2475            };
2476
2477            let ccsr_client = ccsr_connection
2478                .connect(storage_configuration, InTask::No)
2479                .await
2480                .map_err(|e| CsrPurificationError::ClientError(Arc::new(e)))?;
2481
2482            let value = compile_proto(&format!("{}-value", topic), &ccsr_client).await?;
2483            let key = compile_proto(&format!("{}-key", topic), &ccsr_client)
2484                .await
2485                .ok();
2486
2487            if matches!(envelope, Some(SourceEnvelope::Debezium)) && key.is_none() {
2488                sql_bail!("Key schema is required for ENVELOPE DEBEZIUM");
2489            }
2490
2491            *seed = Some(CsrSeedProtobuf { value, key });
2492        }
2493        Some(_) => (),
2494    }
2495
2496    Ok(())
2497}
2498
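/// Purifies an Avro Confluent Schema Registry connection: if no seed is present,
/// fetches the key and value schemas (and any schemas they reference) according
/// to the configured reader schema selection strategies and stores them as the
/// seed, requiring a key schema for `ENVELOPE DEBEZIUM`.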
2499async fn purify_csr_connection_avro(
2500    catalog: &dyn SessionCatalog,
2501    options: &SourceFormatOptions,
2502    csr_connection: &mut CsrConnectionAvro<Aug>,
2503    envelope: &Option<SourceEnvelope>,
2504    storage_configuration: &StorageConfiguration,
2505) -> Result<(), PlanError> {
2506    let SourceFormatOptions::Kafka { topic } = options else {
2507        sql_bail!("Confluent Schema Registry is only supported with Kafka sources")
2508    };
2509
2510    let CsrConnectionAvro {
2511        connection: CsrConnection { connection, .. },
2512        seed,
2513        key_strategy,
2514        value_strategy,
2515    } = csr_connection;
2516    if seed.is_none() {
2517        let scx = StatementContext::new(None, &*catalog);
2518        let csr_connection = match scx.get_item_by_resolved_name(connection)?.connection()? {
2519            Connection::Csr(connection) => connection.clone().into_inline_connection(catalog),
2520            _ => sql_bail!("{} is not a schema registry connection", connection),
2521        };
2522        let ccsr_client = csr_connection
2523            .connect(storage_configuration, InTask::No)
2524            .await
2525            .map_err(|e| CsrPurificationError::ClientError(Arc::new(e)))?;
2526
2527        let Schema {
2528            key_schema,
2529            value_schema,
2530            key_reference_schemas,
2531            value_reference_schemas,
2532        } = get_remote_csr_schema(
2533            &ccsr_client,
2534            key_strategy.clone().unwrap_or_default(),
2535            value_strategy.clone().unwrap_or_default(),
2536            topic,
2537        )
2538        .await?;
2539        if matches!(envelope, Some(SourceEnvelope::Debezium)) && key_schema.is_none() {
2540            sql_bail!("Key schema is required for ENVELOPE DEBEZIUM");
2541        }
2542
2543        *seed = Some(CsrSeedAvro {
2544            key_schema,
2545            value_schema,
2546            key_reference_schemas,
2547            value_reference_schemas,
2548        })
2549    }
2550
2551    Ok(())
2552}
2553
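/// Key and value schemas, plus any schemas they reference, fetched from the
/// Confluent Schema Registry for a topic.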
2554#[derive(Debug)]
2555pub struct Schema {
2556    pub key_schema: Option<String>,
2557    pub value_schema: String,
2558    /// Reference schemas for the key schema, in dependency order.
2559    pub key_reference_schemas: Vec<String>,
2560    /// Reference schemas for the value schema, in dependency order.
2561    pub value_reference_schemas: Vec<String>,
2562}
2563
2564/// Result of fetching a schema, including any referenced schemas.
2565struct SchemaWithReferences {
2566    /// The primary schema.
2567    schema: String,
2568    /// Reference schemas in dependency order.
2569    references: Vec<String>,
2570}
2571
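/// Fetches a schema (and its referenced schemas) from the registry according to
/// the reader schema selection strategy, returning `None` if the subject or
/// version does not exist.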
2572async fn get_schema_with_strategy(
2573    client: &Client,
2574    strategy: ReaderSchemaSelectionStrategy,
2575    subject: &str,
2576) -> Result<Option<SchemaWithReferences>, PlanError> {
2577    match strategy {
2578        ReaderSchemaSelectionStrategy::Latest => {
2579            // Use get_subject_and_references to also fetch referenced schemas
2580            match client.get_subject_and_references(subject).await {
2581                Ok((primary, dependencies)) => Ok(Some(SchemaWithReferences {
2582                    schema: primary.schema.raw,
2583                    references: dependencies.into_iter().map(|s| s.schema.raw).collect(),
2584                })),
2585                Err(GetBySubjectError::SubjectNotFound)
2586                | Err(GetBySubjectError::VersionNotFound(_)) => Ok(None),
2587                Err(e) => Err(PlanError::FetchingCsrSchemaFailed {
2588                    schema_lookup: format!("subject {}", subject.quoted()),
2589                    cause: Arc::new(e),
2590                }),
2591            }
2592        }
2593        // It's possible that the CSR connection was given an inline schema, but without a
2594        // subject or schema id to look up, there isn't a clean way to look up references for
2595        // the schema. This could be done with extra work, but it isn't yet clear it's needed.
2596        ReaderSchemaSelectionStrategy::Inline(raw) => Ok(Some(SchemaWithReferences {
2597            schema: raw,
2598            references: vec![],
2599        })),
2600        ReaderSchemaSelectionStrategy::ById(id) => {
2601            match client.get_subject_and_references_by_id(id).await {
2602                Ok((primary, dependencies)) => Ok(Some(SchemaWithReferences {
2603                    schema: primary.schema.raw,
2604                    references: dependencies.into_iter().map(|s| s.schema.raw).collect(),
2605                })),
2606                Err(GetBySubjectError::SubjectNotFound)
2607                | Err(GetBySubjectError::VersionNotFound(_)) => Ok(None),
2608                Err(e) => Err(PlanError::FetchingCsrSchemaFailed {
2609                    schema_lookup: format!("ID {}", id),
2610                    cause: Arc::new(e),
2611                }),
2612            }
2613        }
2614    }
2615}
2616
2617async fn get_remote_csr_schema(
2618    ccsr_client: &mz_ccsr::Client,
2619    key_strategy: ReaderSchemaSelectionStrategy,
2620    value_strategy: ReaderSchemaSelectionStrategy,
2621    topic: &str,
2622) -> Result<Schema, PlanError> {
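    // Subjects follow Confluent's default `TopicNameStrategy`: the value schema is registered
    // under `<topic>-value` and the (optional) key schema under `<topic>-key`.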
2623    let value_schema_name = format!("{}-value", topic);
2624    let value_result =
2625        get_schema_with_strategy(ccsr_client, value_strategy, &value_schema_name).await?;
2626    let value_result = value_result.ok_or_else(|| anyhow!("No value schema found"))?;
2627
2628    let key_subject = format!("{}-key", topic);
2629    let key_result = get_schema_with_strategy(ccsr_client, key_strategy, &key_subject).await?;
2630    Ok(Schema {
2631        key_schema: key_result.as_ref().map(|r| r.schema.clone()),
2632        value_schema: value_result.schema,
2633        key_reference_schemas: key_result.map(|r| r.references).unwrap_or_default(),
2634        value_reference_schemas: value_result.references,
2635    })
2636}
2637
2638/// Fetches the protobuf schema for the given subject from the CSR and compiles it into a file descriptor set.
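/// The registry may return the schema split across multiple subjects (the primary `.proto` plus
/// any imports it references); all of them are added to a virtual source tree so that imports
/// resolve during compilation.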
2639async fn compile_proto(
2640    subject_name: &str,
2641    ccsr_client: &Client,
2642) -> Result<CsrSeedProtobufSchema, PlanError> {
2643    let (primary_subject, dependency_subjects) = ccsr_client
2644        .get_subject_and_references(subject_name)
2645        .await
2646        .map_err(|e| PlanError::FetchingCsrSchemaFailed {
2647            schema_lookup: format!("subject {}", subject_name.quoted()),
2648            cause: Arc::new(e),
2649        })?;
2650
2651    // Compile .proto files into a file descriptor set.
2652    let mut source_tree = VirtualSourceTree::new();
2653
2654    // Add well-known types (e.g., google/protobuf/timestamp.proto) to the source
2655    // tree. These are implicitly available to protoc but are typically not
2656    // registered in the schema registry.
2657    source_tree.as_mut().map_well_known_types();
2658
2659    for subject in iter::once(&primary_subject).chain(dependency_subjects.iter()) {
2660        source_tree.as_mut().add_file(
2661            Path::new(&subject.name),
2662            subject.schema.raw.as_bytes().to_vec(),
2663        );
2664    }
2665    let mut db = SourceTreeDescriptorDatabase::new(source_tree.as_mut());
2666    let fds = db
2667        .as_mut()
2668        .build_file_descriptor_set(&[Path::new(&primary_subject.name)])
2669        .map_err(|cause| PlanError::InvalidProtobufSchema { cause })?;
2670
2671    // Ensure there is exactly one message in the file.
2672    let primary_fd = fds.file(0);
2673    let message_name = match primary_fd.message_type_size() {
2674        1 => String::from_utf8_lossy(primary_fd.message_type(0).name()).into_owned(),
2675        0 => bail_unsupported!(29603, "Protobuf schemas with no messages"),
2676        _ => bail_unsupported!(29603, "Protobuf schemas with multiple messages"),
2677    };
2678
2679    // Encode the file descriptor set into a SQL byte string.
2680    let bytes = &fds
2681        .serialize()
2682        .map_err(|cause| PlanError::InvalidProtobufSchema { cause })?;
2683    let mut schema = String::new();
2684    strconv::format_bytes(&mut schema, bytes);
2685
2686    Ok(CsrSeedProtobufSchema {
2687        schema,
2688        message_name,
2689    })
2690}
2691
2692const MZ_NOW_NAME: &str = "mz_now";
2693const MZ_NOW_SCHEMA: &str = "mz_catalog";
2694
2695/// Purifies a CREATE MATERIALIZED VIEW statement. Additionally, it adjusts `resolved_ids` if
2696/// references to ids appear or disappear during the purification.
2697///
2698/// Note that in contrast with [`purify_statement`], this doesn't need to be async, because
2699/// this function makes no network calls.
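///
/// For example (illustrative; the concrete value is whatever timestamp was chosen for this
/// statement): `REFRESH AT CREATION` is first rewritten to `REFRESH AT mz_now()`, and every
/// `mz_now()` in a refresh option is then replaced with a literal such as
/// `1700000000000::mz_timestamp`.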
2700pub fn purify_create_materialized_view_options(
2701    catalog: impl SessionCatalog,
2702    mz_now: Option<Timestamp>,
2703    cmvs: &mut CreateMaterializedViewStatement<Aug>,
2704    resolved_ids: &mut ResolvedIds,
2705) {
2706    // 0. Preparations:
2707    // Prepare an expression that calls `mz_now()`, which we can insert in various later steps.
2708    let (mz_now_id, mz_now_expr) = {
2709        let item = catalog
2710            .resolve_function(&PartialItemName {
2711                database: None,
2712                schema: Some(MZ_NOW_SCHEMA.to_string()),
2713                item: MZ_NOW_NAME.to_string(),
2714            })
2715            .expect("we should be able to resolve mz_now");
2716        (
2717            item.id(),
2718            Expr::Function(Function {
2719                name: ResolvedItemName::Item {
2720                    id: item.id(),
2721                    qualifiers: item.name().qualifiers.clone(),
2722                    full_name: catalog.resolve_full_name(item.name()),
2723                    print_id: false,
2724                    version: RelationVersionSelector::Latest,
2725                },
2726                args: FunctionArgs::Args {
2727                    args: Vec::new(),
2728                    order_by: Vec::new(),
2729                },
2730                filter: None,
2731                over: None,
2732                distinct: false,
2733            }),
2734        )
2735    };
2736    // Prepare the `mz_timestamp` type.
2737    let (mz_timestamp_id, mz_timestamp_type) = {
2738        let item = catalog.get_system_type("mz_timestamp");
2739        let full_name = catalog.resolve_full_name(item.name());
2740        (
2741            item.id(),
2742            ResolvedDataType::Named {
2743                id: item.id(),
2744                qualifiers: item.name().qualifiers.clone(),
2745                full_name,
2746                modifiers: vec![],
2747                print_id: true,
2748            },
2749        )
2750    };
2751
2752    let mut introduced_mz_timestamp = false;
2753
2754    for option in cmvs.with_options.iter_mut() {
2755        // 1. Purify `REFRESH AT CREATION` to `REFRESH AT mz_now()`.
2756        if matches!(
2757            option.value,
2758            Some(WithOptionValue::Refresh(RefreshOptionValue::AtCreation))
2759        ) {
2760            option.value = Some(WithOptionValue::Refresh(RefreshOptionValue::At(
2761                RefreshAtOptionValue {
2762                    time: mz_now_expr.clone(),
2763                },
2764            )));
2765        }
2766
2767        // 2. If `REFRESH EVERY` doesn't have an `ALIGNED TO`, then add `ALIGNED TO mz_now()`.
2768        if let Some(WithOptionValue::Refresh(RefreshOptionValue::Every(
2769            RefreshEveryOptionValue { aligned_to, .. },
2770        ))) = &mut option.value
2771        {
2772            if aligned_to.is_none() {
2773                *aligned_to = Some(mz_now_expr.clone());
2774            }
2775        }
2776
2777        // 3. Substitute `mz_now()` with the timestamp chosen for the CREATE MATERIALIZED VIEW
2778        // statement. (This has to happen after the above steps, which might introduce `mz_now()`.)
2779        match &mut option.value {
2780            Some(WithOptionValue::Refresh(RefreshOptionValue::At(RefreshAtOptionValue {
2781                time,
2782            }))) => {
2783                let mut visitor = MzNowPurifierVisitor::new(mz_now, mz_timestamp_type.clone());
2784                visitor.visit_expr_mut(time);
2785                introduced_mz_timestamp |= visitor.introduced_mz_timestamp;
2786            }
2787            Some(WithOptionValue::Refresh(RefreshOptionValue::Every(
2788                RefreshEveryOptionValue {
2789                    interval: _,
2790                    aligned_to: Some(aligned_to),
2791                },
2792            ))) => {
2793                let mut visitor = MzNowPurifierVisitor::new(mz_now, mz_timestamp_type.clone());
2794                visitor.visit_expr_mut(aligned_to);
2795                introduced_mz_timestamp |= visitor.introduced_mz_timestamp;
2796            }
2797            _ => {}
2798        }
2799    }
2800
2801    // 4. If the user didn't give any REFRESH option, then default to ON COMMIT.
2802    if !cmvs.with_options.iter().any(|o| {
2803        matches!(
2804            o,
2805            MaterializedViewOption {
2806                value: Some(WithOptionValue::Refresh(..)),
2807                ..
2808            }
2809        )
2810    }) {
2811        cmvs.with_options.push(MaterializedViewOption {
2812            name: MaterializedViewOptionName::Refresh,
2813            value: Some(WithOptionValue::Refresh(RefreshOptionValue::OnCommit)),
2814        })
2815    }
2816
2817    // 5. Attend to `resolved_ids`: The purification might have
2818    // - added references to `mz_timestamp`;
2819    // - removed references to `mz_now`.
2820    if introduced_mz_timestamp {
2821        resolved_ids.add_item(mz_timestamp_id);
2822    }
2823    // Even though we always remove `mz_now()` from the `with_options`, there might be `mz_now()`
2824    // remaining in the main query expression of the MV, so let's visit the entire statement to look
2825    // for `mz_now()` everywhere.
2826    let mut visitor = ExprContainsTemporalVisitor::new();
2827    visitor.visit_create_materialized_view_statement(cmvs);
2828    if !visitor.contains_temporal {
2829        resolved_ids.remove_item(&mz_now_id);
2830    }
2831}
2832
2833/// Returns true if the [MaterializedViewOption] either already involves `mz_now()` or will
2834/// involve it after purification.
2835pub fn materialized_view_option_contains_temporal(mvo: &MaterializedViewOption<Aug>) -> bool {
2836    match &mvo.value {
2837        Some(WithOptionValue::Refresh(RefreshOptionValue::At(RefreshAtOptionValue { time }))) => {
2838            let mut visitor = ExprContainsTemporalVisitor::new();
2839            visitor.visit_expr(time);
2840            visitor.contains_temporal
2841        }
2842        Some(WithOptionValue::Refresh(RefreshOptionValue::Every(RefreshEveryOptionValue {
2843            interval: _,
2844            aligned_to: Some(aligned_to),
2845        }))) => {
2846            let mut visitor = ExprContainsTemporalVisitor::new();
2847            visitor.visit_expr(aligned_to);
2848            visitor.contains_temporal
2849        }
2850        Some(WithOptionValue::Refresh(RefreshOptionValue::Every(RefreshEveryOptionValue {
2851            interval: _,
2852            aligned_to: None,
2853        }))) => {
2854            // For a `REFRESH EVERY` without an `ALIGNED TO`, purification will default the
2855            // `ALIGNED TO` to `mz_now()`.
2856            true
2857        }
2858        Some(WithOptionValue::Refresh(RefreshOptionValue::AtCreation)) => {
2859            // `REFRESH AT CREATION` will be purified to `REFRESH AT mz_now()`.
2860            true
2861        }
2862        _ => false,
2863    }
2864}
2865
2866/// Determines whether the AST involves `mz_now()`.
2867struct ExprContainsTemporalVisitor {
2868    pub contains_temporal: bool,
2869}
2870
2871impl ExprContainsTemporalVisitor {
2872    pub fn new() -> ExprContainsTemporalVisitor {
2873        ExprContainsTemporalVisitor {
2874            contains_temporal: false,
2875        }
2876    }
2877}
2878
2879impl Visit<'_, Aug> for ExprContainsTemporalVisitor {
2880    fn visit_function(&mut self, func: &Function<Aug>) {
2881        self.contains_temporal |= func.name.full_item_name().item == MZ_NOW_NAME;
2882        visit_function(self, func);
2883    }
2884}
2885
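/// Replaces `mz_now()` calls with the timestamp chosen for the statement, cast to
/// `mz_timestamp`, and records whether doing so introduced a reference to `mz_timestamp`.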
2886struct MzNowPurifierVisitor {
2887    pub mz_now: Option<Timestamp>,
2888    pub mz_timestamp_type: ResolvedDataType,
2889    pub introduced_mz_timestamp: bool,
2890}
2891
2892impl MzNowPurifierVisitor {
2893    pub fn new(
2894        mz_now: Option<Timestamp>,
2895        mz_timestamp_type: ResolvedDataType,
2896    ) -> MzNowPurifierVisitor {
2897        MzNowPurifierVisitor {
2898            mz_now,
2899            mz_timestamp_type,
2900            introduced_mz_timestamp: false,
2901        }
2902    }
2903}
2904
2905impl VisitMut<'_, Aug> for MzNowPurifierVisitor {
2906    fn visit_expr_mut(&mut self, expr: &'_ mut Expr<Aug>) {
2907        match expr {
2908            Expr::Function(Function {
2909                name:
2910                    ResolvedItemName::Item {
2911                        full_name: FullItemName { item, .. },
2912                        ..
2913                    },
2914                ..
2915            }) if item.as_str() == MZ_NOW_NAME => {
2916                let mz_now = self.mz_now.expect(
2917                    "we should have chosen a timestamp if the expression contains mz_now()",
2918                );
2919                // We substitute `mz_now()` with the chosen timestamp as a numeric literal, cast
2920                // to `mz_timestamp` so that the type of the expression is unchanged.
2921                *expr = Expr::Cast {
2922                    expr: Box::new(Expr::Value(Value::Number(mz_now.to_string()))),
2923                    data_type: self.mz_timestamp_type.clone(),
2924                };
2925                self.introduced_mz_timestamp = true;
2926            }
2927            _ => visit_expr_mut(self, expr),
2928        }
2929    }
2930}