Skip to main content

mz_sql/
pure.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! SQL purification.
11//!
12//! See the [crate-level documentation](crate) for details.
13
14use std::collections::{BTreeMap, BTreeSet};
15use std::fmt;
16use std::iter;
17use std::path::Path;
18use std::sync::Arc;
19
20use anyhow::anyhow;
21use itertools::Itertools;
22use mz_adapter_types::dyncfgs::ENABLE_S3_TABLES_REGION_CHECK;
23use mz_ccsr::{Client, GetBySubjectError};
24use mz_cloud_provider::CloudProvider;
25use mz_controller_types::ClusterId;
26use mz_kafka_util::client::MzClientContext;
27use mz_mysql_util::MySqlTableDesc;
28use mz_ore::collections::CollectionExt;
29use mz_ore::error::ErrorExt;
30use mz_ore::future::InTask;
31use mz_ore::iter::IteratorExt;
32use mz_ore::soft_panic_or_log;
33use mz_ore::str::StrExt;
34use mz_postgres_util::desc::PostgresTableDesc;
35use mz_proto::RustType;
36use mz_repr::{CatalogItemId, RelationDesc, RelationVersionSelector, Timestamp, strconv};
37use mz_sql_parser::ast::display::AstDisplay;
38use mz_sql_parser::ast::visit::{Visit, visit_function};
39use mz_sql_parser::ast::visit_mut::{VisitMut, visit_expr_mut};
40use mz_sql_parser::ast::{
41    AlterSourceAction, AlterSourceAddSubsourceOptionName, AlterSourceStatement, AvroDocOn,
42    ColumnName, CreateMaterializedViewStatement, CreateSinkConnection, CreateSinkOptionName,
43    CreateSinkStatement, CreateSourceOptionName, CreateSubsourceOption, CreateSubsourceOptionName,
44    CreateTableFromSourceStatement, CsrConfigOption, CsrConfigOptionName, CsrConnection,
45    CsrSeedAvro, CsrSeedProtobuf, CsrSeedProtobufSchema, DeferredItemName, DocOnIdentifier,
46    DocOnSchema, Expr, Function, FunctionArgs, Ident, KafkaSourceConfigOption,
47    KafkaSourceConfigOptionName, LoadGenerator, LoadGeneratorOption, LoadGeneratorOptionName,
48    MaterializedViewOption, MaterializedViewOptionName, MySqlConfigOption, MySqlConfigOptionName,
49    PgConfigOption, PgConfigOptionName, RawItemName, ReaderSchemaSelectionStrategy,
50    RefreshAtOptionValue, RefreshEveryOptionValue, RefreshOptionValue, SourceEnvelope,
51    SqlServerConfigOption, SqlServerConfigOptionName, Statement, TableFromSourceColumns,
52    TableFromSourceOption, TableFromSourceOptionName, UnresolvedItemName,
53};
54use mz_sql_server_util::desc::SqlServerTableDesc;
55use mz_storage_types::configuration::StorageConfiguration;
56use mz_storage_types::connections::Connection;
57use mz_storage_types::connections::inline::IntoInlineConnection;
58use mz_storage_types::errors::ContextCreationError;
59use mz_storage_types::sources::load_generator::LoadGeneratorOutput;
60use mz_storage_types::sources::mysql::MySqlSourceDetails;
61use mz_storage_types::sources::postgres::PostgresSourcePublicationDetails;
62use mz_storage_types::sources::{
63    GenericSourceConnection, SourceDesc, SourceExportStatementDetails, SqlServerSourceExtras,
64};
65use prost::Message;
66use protobuf_native::MessageLite;
67use protobuf_native::compiler::{SourceTreeDescriptorDatabase, VirtualSourceTree};
68use rdkafka::admin::AdminClient;
69use references::{RetrievedSourceReferences, SourceReferenceClient};
70use uuid::Uuid;
71
72use crate::ast::{
73    AlterSourceAddSubsourceOption, AvroSchema, CreateSourceConnection, CreateSourceStatement,
74    CreateSubsourceStatement, CsrConnectionAvro, CsrConnectionProtobuf, ExternalReferenceExport,
75    ExternalReferences, Format, FormatSpecifier, ProtobufSchema, Value, WithOptionValue,
76};
77use crate::catalog::{CatalogItemType, SessionCatalog};
78use crate::kafka_util::{KafkaSinkConfigOptionExtracted, KafkaSourceConfigOptionExtracted};
79use crate::names::{
80    Aug, FullItemName, PartialItemName, ResolvedColumnReference, ResolvedDataType, ResolvedIds,
81    ResolvedItemName,
82};
83use crate::plan::error::PlanError;
84use crate::plan::statement::ddl::load_generator_ast_to_generator;
85use crate::plan::{SourceReferences, StatementContext};
86use crate::pure::error::{IcebergSinkPurificationError, SqlServerSourcePurificationError};
87use crate::pure::mysql::{ensure_binlog_full_metadata, is_binlog_full_metadata};
88use crate::{kafka_util, normalize};
89
90use self::error::{
91    CsrPurificationError, KafkaSinkPurificationError, KafkaSourcePurificationError,
92    LoadGeneratorSourcePurificationError, MySqlSourcePurificationError, PgSourcePurificationError,
93};
94
95pub(crate) mod error;
96mod references;
97
98pub mod mysql;
99pub mod postgres;
100pub mod sql_server;
101
102pub(crate) struct RequestedSourceExport<T> {
103    external_reference: UnresolvedItemName,
104    name: UnresolvedItemName,
105    meta: T,
106}
107
108impl<T: fmt::Debug> fmt::Debug for RequestedSourceExport<T> {
109    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
110        f.debug_struct("RequestedSourceExport")
111            .field("external_reference", &self.external_reference)
112            .field("name", &self.name)
113            .field("meta", &self.meta)
114            .finish()
115    }
116}
117
118impl<T> RequestedSourceExport<T> {
119    fn change_meta<F>(self, new_meta: F) -> RequestedSourceExport<F> {
120        RequestedSourceExport {
121            external_reference: self.external_reference,
122            name: self.name,
123            meta: new_meta,
124        }
125    }
126}
127
128/// Generates a subsource name by prepending source schema name if present
129///
130/// For eg. if source is `a.b`, then `a` will be prepended to the subsource name
131/// so that it's generated in the same schema as source
132fn source_export_name_gen(
133    source_name: &UnresolvedItemName,
134    subsource_name: &str,
135) -> Result<UnresolvedItemName, PlanError> {
136    let mut partial = normalize::unresolved_item_name(source_name.clone())?;
137    partial.item = subsource_name.to_string();
138    Ok(UnresolvedItemName::from(partial))
139}
140
141// TODO(database-issues#8620): Remove once subsources are removed
142/// Validates the requested subsources do not have name conflicts with each other
143/// and that the same upstream table is not referenced multiple times.
144fn validate_source_export_names<T>(
145    requested_source_exports: &[RequestedSourceExport<T>],
146) -> Result<(), PlanError> {
147    // This condition would get caught during the catalog transaction, but produces a
148    // vague, non-contextual error. Instead, error here so we can suggest to the user
149    // how to fix the problem.
150    if let Some(name) = requested_source_exports
151        .iter()
152        .map(|subsource| &subsource.name)
153        .duplicates()
154        .next()
155        .cloned()
156    {
157        let mut upstream_references: Vec<_> = requested_source_exports
158            .into_iter()
159            .filter_map(|subsource| {
160                if &subsource.name == &name {
161                    Some(subsource.external_reference.clone())
162                } else {
163                    None
164                }
165            })
166            .collect();
167
168        upstream_references.sort();
169
170        Err(PlanError::SubsourceNameConflict {
171            name,
172            upstream_references,
173        })?;
174    }
175
176    // We disallow subsource statements from referencing the same upstream table, but we allow
177    // `CREATE TABLE .. FROM SOURCE` statements to do so. Since `CREATE TABLE .. FROM SOURCE`
178    // purification will only provide 1 `requested_source_export`, we can leave this here without
179    // needing to differentiate between the two types of statements.
180    // TODO(roshan): Remove this when auto-generated subsources are deprecated.
181    if let Some(name) = requested_source_exports
182        .iter()
183        .map(|export| &export.external_reference)
184        .duplicates()
185        .next()
186        .cloned()
187    {
188        let mut target_names: Vec<_> = requested_source_exports
189            .into_iter()
190            .filter_map(|export| {
191                if &export.external_reference == &name {
192                    Some(export.name.clone())
193                } else {
194                    None
195                }
196            })
197            .collect();
198
199        target_names.sort();
200
201        Err(PlanError::SubsourceDuplicateReference { name, target_names })?;
202    }
203
204    Ok(())
205}
206
207#[derive(Debug, Clone, PartialEq, Eq)]
208pub enum PurifiedStatement {
209    PurifiedCreateSource {
210        // The progress subsource, if we are offloading progress info to a separate relation
211        create_progress_subsource_stmt: Option<CreateSubsourceStatement<Aug>>,
212        create_source_stmt: CreateSourceStatement<Aug>,
213        // Map of subsource names to external details
214        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
215        /// All the available upstream references that can be added as tables
216        /// to this primary source.
217        available_source_references: SourceReferences,
218    },
219    PurifiedAlterSource {
220        alter_source_stmt: AlterSourceStatement<Aug>,
221    },
222    PurifiedAlterSourceAddSubsources {
223        // This just saves us an annoying catalog lookup
224        source_name: ResolvedItemName,
225        /// Options that we will need the values of to update the source's
226        /// definition.
227        options: Vec<AlterSourceAddSubsourceOption<Aug>>,
228        // Map of subsource names to external details
229        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
230    },
231    PurifiedAlterSourceRefreshReferences {
232        source_name: ResolvedItemName,
233        /// The updated available upstream references for the primary source.
234        available_source_references: SourceReferences,
235    },
236    PurifiedCreateSink(CreateSinkStatement<Aug>),
237    PurifiedCreateTableFromSource {
238        stmt: CreateTableFromSourceStatement<Aug>,
239    },
240}
241
242#[derive(Debug, Clone, PartialEq, Eq)]
243pub struct PurifiedSourceExport {
244    pub external_reference: UnresolvedItemName,
245    pub details: PurifiedExportDetails,
246}
247
248#[derive(Debug, Clone, PartialEq, Eq)]
249pub enum PurifiedExportDetails {
250    MySql {
251        table: MySqlTableDesc,
252        text_columns: Option<Vec<Ident>>,
253        exclude_columns: Option<Vec<Ident>>,
254        initial_gtid_set: String,
255        binlog_full_metadata: bool,
256    },
257    Postgres {
258        table: PostgresTableDesc,
259        text_columns: Option<Vec<Ident>>,
260        exclude_columns: Option<Vec<Ident>>,
261    },
262    SqlServer {
263        table: SqlServerTableDesc,
264        text_columns: Option<Vec<Ident>>,
265        excl_columns: Option<Vec<Ident>>,
266        capture_instance: Arc<str>,
267        initial_lsn: mz_sql_server_util::cdc::Lsn,
268    },
269    Kafka {},
270    LoadGenerator {
271        table: Option<RelationDesc>,
272        output: LoadGeneratorOutput,
273    },
274}
275
276/// Purifies a statement, removing any dependencies on external state.
277///
278/// See the section on [purification](crate#purification) in the crate
279/// documentation for details.
280///
281/// Note that this doesn't handle CREATE MATERIALIZED VIEW, which is
282/// handled by [purify_create_materialized_view_options] instead.
283/// This could be made more consistent by a refactoring discussed here:
284/// <https://github.com/MaterializeInc/materialize/pull/23870#discussion_r1435922709>
285pub async fn purify_statement(
286    catalog: impl SessionCatalog,
287    now: u64,
288    stmt: Statement<Aug>,
289    storage_configuration: &StorageConfiguration,
290) -> (Result<PurifiedStatement, PlanError>, Option<ClusterId>) {
291    match stmt {
292        Statement::CreateSource(stmt) => {
293            let cluster_id = stmt.in_cluster.as_ref().map(|cluster| cluster.id.clone());
294            (
295                purify_create_source(catalog, now, stmt, storage_configuration).await,
296                cluster_id,
297            )
298        }
299        Statement::AlterSource(stmt) => (
300            purify_alter_source(catalog, stmt, storage_configuration).await,
301            None,
302        ),
303        Statement::CreateSink(stmt) => {
304            let cluster_id = stmt.in_cluster.as_ref().map(|cluster| cluster.id.clone());
305            (
306                purify_create_sink(catalog, stmt, storage_configuration).await,
307                cluster_id,
308            )
309        }
310        Statement::CreateTableFromSource(stmt) => (
311            purify_create_table_from_source(catalog, stmt, storage_configuration).await,
312            None,
313        ),
314        o => (
315            Err(internal_err!(
316                "unexpected statement type in purification: {:?}",
317                o
318            )),
319            None,
320        ),
321    }
322}
323
324/// Injects `DOC ON` comments into all Avro formats that are using a schema
325/// registry by finding all SQL `COMMENT`s that are attached to the sink's
326/// underlying materialized view (as well as any types referenced by that
327/// underlying materialized view).
328pub(crate) fn purify_create_sink_avro_doc_on_options(
329    catalog: &dyn SessionCatalog,
330    from_id: CatalogItemId,
331    format: &mut Option<FormatSpecifier<Aug>>,
332) -> Result<(), PlanError> {
333    // Collect all objects referenced by the sink.
334    let from = catalog.get_item(&from_id);
335    let object_ids = from
336        .references()
337        .items()
338        .copied()
339        .chain_one(from.id())
340        .collect::<Vec<_>>();
341
342    // Collect all Avro formats that use a schema registry, as well as a set of
343    // all identifiers named in user-provided `DOC ON` options.
344    let mut avro_format_options = vec![];
345    for_each_format(format, |doc_on_schema, fmt| match fmt {
346        Format::Avro(AvroSchema::InlineSchema { .. })
347        | Format::Bytes
348        | Format::Csv { .. }
349        | Format::Json { .. }
350        | Format::Protobuf(..)
351        | Format::Regex(..)
352        | Format::Text => (),
353        Format::Avro(AvroSchema::Csr {
354            csr_connection: CsrConnectionAvro { connection, .. },
355        }) => {
356            avro_format_options.push((doc_on_schema, &mut connection.options));
357        }
358    });
359
360    // For each Avro format in the sink, inject the appropriate `DOC ON` options
361    // for each item referenced by the sink.
362    for (for_schema, options) in avro_format_options {
363        let user_provided_comments = options
364            .iter()
365            .filter_map(|CsrConfigOption { name, .. }| match name {
366                CsrConfigOptionName::AvroDocOn(doc_on) => Some(doc_on.clone()),
367                _ => None,
368            })
369            .collect::<BTreeSet<_>>();
370
371        // Adding existing comments if not already provided by user
372        for object_id in &object_ids {
373            // Always add comments to the latest version of the item.
374            let item = catalog
375                .get_item(object_id)
376                .at_version(RelationVersionSelector::Latest);
377            let full_resolved_name = ResolvedItemName::Item {
378                id: *object_id,
379                qualifiers: item.name().qualifiers.clone(),
380                full_name: catalog.resolve_full_name(item.name()),
381                print_id: !matches!(
382                    item.item_type(),
383                    CatalogItemType::Func | CatalogItemType::Type
384                ),
385                version: RelationVersionSelector::Latest,
386            };
387
388            if let Some(comments_map) = catalog.get_item_comments(object_id) {
389                // Attach comment for the item itself, if the user has not
390                // already provided an overriding `DOC ON` option for the item.
391                let doc_on_item_key = AvroDocOn {
392                    identifier: DocOnIdentifier::Type(full_resolved_name.clone()),
393                    for_schema,
394                };
395                if !user_provided_comments.contains(&doc_on_item_key) {
396                    if let Some(root_comment) = comments_map.get(&None) {
397                        options.push(CsrConfigOption {
398                            name: CsrConfigOptionName::AvroDocOn(doc_on_item_key),
399                            value: Some(mz_sql_parser::ast::WithOptionValue::Value(Value::String(
400                                root_comment.clone(),
401                            ))),
402                        });
403                    }
404                }
405
406                // Attach comment for each column in the item, if the user has
407                // not already provided an overriding `DOC ON` option for the
408                // column.
409                let column_descs = match item.type_details() {
410                    Some(details) => details.typ.desc(catalog).unwrap_or_default(),
411                    None => item.relation_desc().map(|d| d.into_owned()),
412                };
413
414                if let Some(desc) = column_descs {
415                    for (pos, column_name) in desc.iter_names().enumerate() {
416                        if let Some(comment_str) = comments_map.get(&Some(pos + 1)) {
417                            let doc_on_column_key = AvroDocOn {
418                                identifier: DocOnIdentifier::Column(ColumnName {
419                                    relation: full_resolved_name.clone(),
420                                    column: ResolvedColumnReference::Column {
421                                        name: column_name.to_owned(),
422                                        index: pos,
423                                    },
424                                }),
425                                for_schema,
426                            };
427                            if !user_provided_comments.contains(&doc_on_column_key) {
428                                options.push(CsrConfigOption {
429                                    name: CsrConfigOptionName::AvroDocOn(doc_on_column_key),
430                                    value: Some(mz_sql_parser::ast::WithOptionValue::Value(
431                                        Value::String(comment_str.clone()),
432                                    )),
433                                });
434                            }
435                        }
436                    }
437                }
438            }
439        }
440    }
441
442    Ok(())
443}
444
445/// Checks that the sink described in the statement can connect to its external
446/// resources.
447async fn purify_create_sink(
448    catalog: impl SessionCatalog,
449    mut create_sink_stmt: CreateSinkStatement<Aug>,
450    storage_configuration: &StorageConfiguration,
451) -> Result<PurifiedStatement, PlanError> {
452    // General purification
453    let CreateSinkStatement {
454        connection,
455        format,
456        with_options,
457        name: _,
458        in_cluster: _,
459        if_not_exists: _,
460        from,
461        envelope: _,
462        mode: _,
463    } = &mut create_sink_stmt;
464
465    // The list of options that the user is allowed to specify.
466    const USER_ALLOWED_WITH_OPTIONS: &[CreateSinkOptionName] = &[
467        CreateSinkOptionName::Snapshot,
468        CreateSinkOptionName::CommitInterval,
469    ];
470
471    if let Some(op) = with_options
472        .iter()
473        .find(|op| !USER_ALLOWED_WITH_OPTIONS.contains(&op.name))
474    {
475        sql_bail!(
476            "CREATE SINK...WITH ({}..) is not allowed",
477            op.name.to_ast_string_simple(),
478        )
479    }
480
481    match &connection {
482        CreateSinkConnection::Kafka {
483            connection,
484            options,
485            key: _,
486            headers: _,
487        } => {
488            // We must not leave any state behind in the Kafka broker, so just ensure that
489            // we can connect. This means we don't ensure that we can create the topic and
490            // introduces TOCTOU errors, but creating an inoperable sink is infinitely
491            // preferable to leaking state in users' environments.
492            let scx = StatementContext::new(None, &catalog);
493            let connection = {
494                let item = scx.get_item_by_resolved_name(connection)?;
495                // Get Kafka connection
496                match item.connection()? {
497                    Connection::Kafka(connection) => {
498                        connection.clone().into_inline_connection(scx.catalog)
499                    }
500                    _ => sql_bail!(
501                        "{} is not a kafka connection",
502                        scx.catalog.resolve_full_name(item.name())
503                    ),
504                }
505            };
506
507            let extracted_options: KafkaSinkConfigOptionExtracted = options.clone().try_into()?;
508
509            if extracted_options.legacy_ids == Some(true) {
510                sql_bail!("LEGACY IDs option is not supported");
511            }
512
513            let client: AdminClient<_> = connection
514                .create_with_context(
515                    storage_configuration,
516                    MzClientContext::default(),
517                    &BTreeMap::new(),
518                    InTask::No,
519                )
520                .await
521                .map_err(|e| {
522                    // anyhow doesn't support Clone, so not trivial to move into PlanError
523                    KafkaSinkPurificationError::AdminClientError(Arc::new(e))
524                })?;
525
526            let metadata = client
527                .inner()
528                .fetch_metadata(
529                    None,
530                    storage_configuration
531                        .parameters
532                        .kafka_timeout_config
533                        .fetch_metadata_timeout,
534                )
535                .map_err(|e| {
536                    KafkaSinkPurificationError::AdminClientError(Arc::new(
537                        ContextCreationError::KafkaError(e),
538                    ))
539                })?;
540
541            if metadata.brokers().len() == 0 {
542                Err(KafkaSinkPurificationError::ZeroBrokers)?;
543            }
544        }
545        CreateSinkConnection::Iceberg {
546            connection,
547            aws_connection,
548            ..
549        } => {
550            let scx = StatementContext::new(None, &catalog);
551            let connection = {
552                let item = scx.get_item_by_resolved_name(connection)?;
553                // Get Iceberg connection
554                match item.connection()? {
555                    Connection::IcebergCatalog(connection) => {
556                        connection.clone().into_inline_connection(scx.catalog)
557                    }
558                    _ => sql_bail!(
559                        "{} is not an iceberg connection",
560                        scx.catalog.resolve_full_name(item.name())
561                    ),
562                }
563            };
564
565            let aws_conn_id = aws_connection.item_id();
566
567            let aws_connection = {
568                let item = scx.get_item_by_resolved_name(aws_connection)?;
569                // Get AWS connection
570                match item.connection()? {
571                    Connection::Aws(aws_connection) => aws_connection.clone(),
572                    _ => sql_bail!(
573                        "{} is not an aws connection",
574                        scx.catalog.resolve_full_name(item.name())
575                    ),
576                }
577            };
578
579            // For S3 Tables connections in the Materialize Cloud product, verify the
580            // AWS region matches the environment's region. This check only applies when
581            // the enable_s3_tables_region_check dyncfg is set.
582            if let Some(s3tables) = connection.s3tables_catalog() {
583                let enable_region_check =
584                    ENABLE_S3_TABLES_REGION_CHECK.get(scx.catalog.system_vars().dyncfgs());
585                if enable_region_check {
586                    let env_id = &catalog.config().environment_id;
587                    if matches!(env_id.cloud_provider(), CloudProvider::Aws) {
588                        let env_region = env_id.cloud_provider_region();
589                        // Later on we default to "us-east-1" if the region is not set on the S3 Tables
590                        // connection, so we need to do the same check here.
591                        let s3_tables_region = s3tables
592                            .aws_connection
593                            .connection
594                            .region
595                            .clone()
596                            .unwrap_or_else(|| "us-east-1".to_string());
597                        if s3_tables_region != env_region {
598                            Err(IcebergSinkPurificationError::S3TablesRegionMismatch {
599                                s3_tables_region,
600                                environment_region: env_region.to_string(),
601                            })?;
602                        }
603                    }
604                }
605            }
606
607            let _catalog = connection
608                .connect(storage_configuration, InTask::No)
609                .await
610                .map_err(|e| IcebergSinkPurificationError::CatalogError(Arc::new(e)))?;
611
612            let _sdk_config = aws_connection
613                .load_sdk_config(
614                    &storage_configuration.connection_context,
615                    aws_conn_id.clone(),
616                    InTask::No,
617                    mz_storage_types::dyncfgs::ENFORCE_EXTERNAL_ADDRESSES
618                        .get(storage_configuration.config_set()),
619                )
620                .await
621                .map_err(|e| IcebergSinkPurificationError::AwsSdkContextError(Arc::new(e)))?;
622        }
623    }
624
625    let mut csr_connection_ids = BTreeSet::new();
626    for_each_format(format, |_, fmt| match fmt {
627        Format::Avro(AvroSchema::InlineSchema { .. })
628        | Format::Bytes
629        | Format::Csv { .. }
630        | Format::Json { .. }
631        | Format::Protobuf(ProtobufSchema::InlineSchema { .. })
632        | Format::Regex(..)
633        | Format::Text => (),
634        Format::Avro(AvroSchema::Csr {
635            csr_connection: CsrConnectionAvro { connection, .. },
636        })
637        | Format::Protobuf(ProtobufSchema::Csr {
638            csr_connection: CsrConnectionProtobuf { connection, .. },
639        }) => {
640            csr_connection_ids.insert(*connection.connection.item_id());
641        }
642    });
643
644    let scx = StatementContext::new(None, &catalog);
645    for csr_connection_id in csr_connection_ids {
646        let connection = {
647            let item = scx.get_item(&csr_connection_id);
648            // Get Kafka connection
649            match item.connection()? {
650                Connection::Csr(connection) => connection.clone().into_inline_connection(&catalog),
651                _ => Err(CsrPurificationError::NotCsrConnection(
652                    scx.catalog.resolve_full_name(item.name()),
653                ))?,
654            }
655        };
656
657        let client = connection
658            .connect(storage_configuration, InTask::No)
659            .await
660            .map_err(|e| CsrPurificationError::ClientError(Arc::new(e)))?;
661
662        client
663            .list_subjects()
664            .await
665            .map_err(|e| CsrPurificationError::ListSubjectsError(Arc::new(e)))?;
666    }
667
668    purify_create_sink_avro_doc_on_options(&catalog, *from.item_id(), format)?;
669
670    Ok(PurifiedStatement::PurifiedCreateSink(create_sink_stmt))
671}
672
673/// Runs a function on each format within the format specifier.
674///
675/// The function is provided with a `DocOnSchema` that indicates whether the
676/// format is for the key, value, or both.
677///
678// TODO(benesch): rename `DocOnSchema` to the more general `FormatRestriction`.
679fn for_each_format<'a, F>(format: &'a mut Option<FormatSpecifier<Aug>>, mut f: F)
680where
681    F: FnMut(DocOnSchema, &'a mut Format<Aug>),
682{
683    match format {
684        None => (),
685        Some(FormatSpecifier::Bare(fmt)) => f(DocOnSchema::All, fmt),
686        Some(FormatSpecifier::KeyValue { key, value }) => {
687            f(DocOnSchema::KeyOnly, key);
688            f(DocOnSchema::ValueOnly, value);
689        }
690    }
691}
692
/// Controls how purification treats the source references (upstream objects to
/// export) named on a statement.
///
/// For `CREATE SOURCE`, the policy is chosen from the
/// `enable_create_table_from_source` and `force_source_table_syntax` system
/// variables.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum SourceReferencePolicy {
    /// Source references must not be provided. Used to enforce that
    /// `CREATE SOURCE` statements don't create subsources.
    NotAllowed,
    /// Source references may be omitted, e.g. for a source whose tables will
    /// be added afterwards.
    Optional,
    /// At least one reference must resolve to an upstream object.
    Required,
}
707
708async fn purify_create_source(
709    catalog: impl SessionCatalog,
710    now: u64,
711    mut create_source_stmt: CreateSourceStatement<Aug>,
712    storage_configuration: &StorageConfiguration,
713) -> Result<PurifiedStatement, PlanError> {
714    let CreateSourceStatement {
715        name: source_name,
716        col_names,
717        key_constraint,
718        connection: source_connection,
719        format,
720        envelope,
721        include_metadata,
722        external_references,
723        progress_subsource,
724        with_options,
725        ..
726    } = &mut create_source_stmt;
727
728    let uses_old_syntax = !col_names.is_empty()
729        || key_constraint.is_some()
730        || format.is_some()
731        || envelope.is_some()
732        || !include_metadata.is_empty()
733        || external_references.is_some()
734        || progress_subsource.is_some();
735
736    if let Some(DeferredItemName::Named(_)) = progress_subsource {
737        sql_bail!("Cannot manually ID qualify progress subsource")
738    }
739
740    let mut requested_subsource_map = BTreeMap::new();
741
742    let progress_desc = match &source_connection {
743        CreateSourceConnection::Kafka { .. } => {
744            &mz_storage_types::sources::kafka::KAFKA_PROGRESS_DESC
745        }
746        CreateSourceConnection::Postgres { .. } => {
747            &mz_storage_types::sources::postgres::PG_PROGRESS_DESC
748        }
749        CreateSourceConnection::SqlServer { .. } => {
750            &mz_storage_types::sources::sql_server::SQL_SERVER_PROGRESS_DESC
751        }
752        CreateSourceConnection::MySql { .. } => {
753            &mz_storage_types::sources::mysql::MYSQL_PROGRESS_DESC
754        }
755        CreateSourceConnection::LoadGenerator { .. } => {
756            &mz_storage_types::sources::load_generator::LOAD_GEN_PROGRESS_DESC
757        }
758    };
759    let scx = StatementContext::new(None, &catalog);
760
761    // Depending on if the user must or can use the `CREATE TABLE .. FROM SOURCE` statement
762    // to add tables after this source is created we might need to enforce that
763    // auto-generated subsources are created or not created by this source statement.
764    let reference_policy = if scx.catalog.system_vars().enable_create_table_from_source()
765        && scx.catalog.system_vars().force_source_table_syntax()
766    {
767        SourceReferencePolicy::NotAllowed
768    } else if scx.catalog.system_vars().enable_create_table_from_source() {
769        SourceReferencePolicy::Optional
770    } else {
771        SourceReferencePolicy::Required
772    };
773
774    let mut format_options = SourceFormatOptions::Default;
775
776    let retrieved_source_references: RetrievedSourceReferences;
777
778    match source_connection {
779        CreateSourceConnection::Kafka {
780            connection,
781            options: base_with_options,
782            ..
783        } => {
784            if let Some(external_references) = external_references {
785                Err(KafkaSourcePurificationError::ReferencedSubsources(
786                    external_references.clone(),
787                ))?;
788            }
789
790            let connection = {
791                let item = scx.get_item_by_resolved_name(connection)?;
792                // Get Kafka connection
793                match item.connection()? {
794                    Connection::Kafka(connection) => {
795                        connection.clone().into_inline_connection(&catalog)
796                    }
797                    _ => Err(KafkaSourcePurificationError::NotKafkaConnection(
798                        scx.catalog.resolve_full_name(item.name()),
799                    ))?,
800                }
801            };
802
803            let extracted_options: KafkaSourceConfigOptionExtracted =
804                base_with_options.clone().try_into()?;
805
806            let topic = extracted_options
807                .topic
808                .ok_or(KafkaSourcePurificationError::ConnectionMissingTopic)?;
809
810            let consumer = connection
811                .create_with_context(
812                    storage_configuration,
813                    MzClientContext::default(),
814                    &BTreeMap::new(),
815                    InTask::No,
816                )
817                .await
818                .map_err(|e| {
819                    // anyhow doesn't support Clone, so not trivial to move into PlanError
820                    KafkaSourcePurificationError::KafkaConsumerError(
821                        e.display_with_causes().to_string(),
822                    )
823                })?;
824            let consumer = Arc::new(consumer);
825
826            match (
827                extracted_options.start_offset,
828                extracted_options.start_timestamp,
829            ) {
830                (None, None) => {
831                    // Validate that the topic at least exists.
832                    kafka_util::ensure_topic_exists(
833                        Arc::clone(&consumer),
834                        &topic,
835                        storage_configuration
836                            .parameters
837                            .kafka_timeout_config
838                            .fetch_metadata_timeout,
839                    )
840                    .await?;
841                }
842                (Some(_), Some(_)) => {
843                    sql_bail!("cannot specify START TIMESTAMP and START OFFSET at same time")
844                }
845                (Some(start_offsets), None) => {
846                    // Validate the start offsets.
847                    kafka_util::validate_start_offsets(
848                        Arc::clone(&consumer),
849                        &topic,
850                        start_offsets,
851                        storage_configuration
852                            .parameters
853                            .kafka_timeout_config
854                            .fetch_metadata_timeout,
855                    )
856                    .await?;
857                }
858                (None, Some(time_offset)) => {
859                    // Translate `START TIMESTAMP` to a start offset.
860                    let start_offsets = kafka_util::lookup_start_offsets(
861                        Arc::clone(&consumer),
862                        &topic,
863                        time_offset,
864                        now,
865                        storage_configuration
866                            .parameters
867                            .kafka_timeout_config
868                            .fetch_metadata_timeout,
869                    )
870                    .await?;
871
872                    base_with_options.retain(|val| {
873                        !matches!(val.name, KafkaSourceConfigOptionName::StartTimestamp)
874                    });
875                    base_with_options.push(KafkaSourceConfigOption {
876                        name: KafkaSourceConfigOptionName::StartOffset,
877                        value: Some(WithOptionValue::Sequence(
878                            start_offsets
879                                .iter()
880                                .map(|offset| {
881                                    WithOptionValue::Value(Value::Number(offset.to_string()))
882                                })
883                                .collect(),
884                        )),
885                    });
886                }
887            }
888
889            let reference_client = SourceReferenceClient::Kafka { topic: &topic };
890            retrieved_source_references = reference_client.get_source_references().await?;
891
892            format_options = SourceFormatOptions::Kafka { topic };
893        }
894        CreateSourceConnection::Postgres {
895            connection,
896            options,
897        } => {
898            let connection_item = scx.get_item_by_resolved_name(connection)?;
899            let connection = match connection_item.connection().map_err(PlanError::from)? {
900                Connection::Postgres(connection) => {
901                    connection.clone().into_inline_connection(&catalog)
902                }
903                _ => Err(PgSourcePurificationError::NotPgConnection(
904                    scx.catalog.resolve_full_name(connection_item.name()),
905                ))?,
906            };
907            let crate::plan::statement::PgConfigOptionExtracted {
908                publication,
909                text_columns,
910                exclude_columns,
911                details,
912                ..
913            } = options.clone().try_into()?;
914            let publication =
915                publication.ok_or(PgSourcePurificationError::ConnectionMissingPublication)?;
916
917            if details.is_some() {
918                Err(PgSourcePurificationError::UserSpecifiedDetails)?;
919            }
920
921            let client = connection
922                .validate(connection_item.id(), storage_configuration)
923                .await?;
924
925            let reference_client = SourceReferenceClient::Postgres {
926                client: &client,
927                publication: &publication,
928                database: &connection.database,
929            };
930            retrieved_source_references = reference_client.get_source_references().await?;
931
932            let postgres::PurifiedSourceExports {
933                source_exports: subsources,
934                normalized_text_columns,
935            } = postgres::purify_source_exports(
936                &client,
937                &retrieved_source_references,
938                external_references,
939                text_columns,
940                exclude_columns,
941                source_name,
942                &reference_policy,
943            )
944            .await?;
945
946            if let Some(text_cols_option) = options
947                .iter_mut()
948                .find(|option| option.name == PgConfigOptionName::TextColumns)
949            {
950                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
951            }
952
953            requested_subsource_map.extend(subsources);
954
955            // Record the active replication timeline_id to allow detection of a future upstream
956            // point-in-time-recovery that will put the source into an error state.
957            let timeline_id = mz_postgres_util::get_timeline_id(&client).await?;
958
959            // Remove any old detail references
960            options.retain(|PgConfigOption { name, .. }| name != &PgConfigOptionName::Details);
961            let details = PostgresSourcePublicationDetails {
962                slot: format!(
963                    "materialize_{}",
964                    Uuid::new_v4().to_string().replace('-', "")
965                ),
966                timeline_id: Some(timeline_id),
967                database: connection.database,
968            };
969            options.push(PgConfigOption {
970                name: PgConfigOptionName::Details,
971                value: Some(WithOptionValue::Value(Value::String(hex::encode(
972                    details.into_proto().encode_to_vec(),
973                )))),
974            })
975        }
976        CreateSourceConnection::SqlServer {
977            connection,
978            options,
979        } => {
980            // Connect to the upstream SQL Server instance so we can validate
981            // we're compatible with CDC.
982            let connection_item = scx.get_item_by_resolved_name(connection)?;
983            let connection = match connection_item.connection()? {
984                Connection::SqlServer(connection) => {
985                    connection.clone().into_inline_connection(&catalog)
986                }
987                _ => Err(SqlServerSourcePurificationError::NotSqlServerConnection(
988                    scx.catalog.resolve_full_name(connection_item.name()),
989                ))?,
990            };
991            let crate::plan::statement::ddl::SqlServerConfigOptionExtracted {
992                details,
993                text_columns,
994                exclude_columns,
995                seen: _,
996            } = options.clone().try_into()?;
997
998            if details.is_some() {
999                Err(SqlServerSourcePurificationError::UserSpecifiedDetails)?;
1000            }
1001
1002            let mut client = connection
1003                .validate(connection_item.id(), storage_configuration)
1004                .await?;
1005
1006            let database: Arc<str> = connection.database.into();
1007            let reference_client = SourceReferenceClient::SqlServer {
1008                client: &mut client,
1009                database: Arc::clone(&database),
1010            };
1011            retrieved_source_references = reference_client.get_source_references().await?;
1012            tracing::debug!(?retrieved_source_references, "got source references");
1013
1014            let timeout = mz_storage_types::sources::sql_server::MAX_LSN_WAIT
1015                .get(storage_configuration.config_set());
1016
1017            let purified_source_exports = sql_server::purify_source_exports(
1018                &*database,
1019                &mut client,
1020                &retrieved_source_references,
1021                external_references,
1022                &text_columns,
1023                &exclude_columns,
1024                source_name,
1025                timeout,
1026                &reference_policy,
1027            )
1028            .await?;
1029
1030            let sql_server::PurifiedSourceExports {
1031                source_exports,
1032                normalized_text_columns,
1033                normalized_excl_columns,
1034            } = purified_source_exports;
1035
1036            // Update our set of requested source exports.
1037            requested_subsource_map.extend(source_exports);
1038
1039            // Record the most recent restore_history_id, or none if the system has never been
1040            // restored.
1041
1042            let restore_history_id =
1043                mz_sql_server_util::inspect::get_latest_restore_history_id(&mut client).await?;
1044            let details = SqlServerSourceExtras { restore_history_id };
1045
1046            options.retain(|SqlServerConfigOption { name, .. }| {
1047                name != &SqlServerConfigOptionName::Details
1048            });
1049            options.push(SqlServerConfigOption {
1050                name: SqlServerConfigOptionName::Details,
1051                value: Some(WithOptionValue::Value(Value::String(hex::encode(
1052                    details.into_proto().encode_to_vec(),
1053                )))),
1054            });
1055
1056            // Update our 'TEXT' and 'EXCLUDE' column options with the purified and normalized set.
1057            if let Some(text_cols_option) = options
1058                .iter_mut()
1059                .find(|option| option.name == SqlServerConfigOptionName::TextColumns)
1060            {
1061                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
1062            }
1063            if let Some(excl_cols_option) = options
1064                .iter_mut()
1065                .find(|option| option.name == SqlServerConfigOptionName::ExcludeColumns)
1066            {
1067                excl_cols_option.value = Some(WithOptionValue::Sequence(normalized_excl_columns));
1068            }
1069        }
1070        CreateSourceConnection::MySql {
1071            connection,
1072            options,
1073        } => {
1074            let connection_item = scx.get_item_by_resolved_name(connection)?;
1075            let connection = match connection_item.connection()? {
1076                Connection::MySql(connection) => {
1077                    connection.clone().into_inline_connection(&catalog)
1078                }
1079                _ => Err(MySqlSourcePurificationError::NotMySqlConnection(
1080                    scx.catalog.resolve_full_name(connection_item.name()),
1081                ))?,
1082            };
1083            let crate::plan::statement::ddl::MySqlConfigOptionExtracted {
1084                details,
1085                text_columns,
1086                exclude_columns,
1087                seen: _,
1088            } = options.clone().try_into()?;
1089
1090            if details.is_some() {
1091                Err(MySqlSourcePurificationError::UserSpecifiedDetails)?;
1092            }
1093
1094            let mut conn = connection
1095                .validate(connection_item.id(), storage_configuration)
1096                .await
1097                .map_err(MySqlSourcePurificationError::InvalidConnection)?;
1098
1099            // Retrieve the current @gtid_executed value of the server to mark as the effective
1100            // initial snapshot point such that we can ensure consistency if the initial source
1101            // snapshot is broken up over multiple points in time.
1102            let initial_gtid_set =
1103                mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?;
1104
1105            let binlog_full_metadata = is_binlog_full_metadata(&mut conn).await?;
1106
1107            let reference_client = SourceReferenceClient::MySql {
1108                conn: &mut conn,
1109                include_system_schemas: mysql::references_system_schemas(external_references),
1110            };
1111            retrieved_source_references = reference_client.get_source_references().await?;
1112
1113            let mysql::PurifiedSourceExports {
1114                source_exports: subsources,
1115                normalized_text_columns,
1116                normalized_exclude_columns,
1117            } = mysql::purify_source_exports(
1118                &mut conn,
1119                &retrieved_source_references,
1120                external_references,
1121                text_columns,
1122                exclude_columns,
1123                source_name,
1124                initial_gtid_set.clone(),
1125                &reference_policy,
1126                binlog_full_metadata,
1127            )
1128            .await?;
1129            requested_subsource_map.extend(subsources);
1130
1131            // We don't have any fields in this details struct but keep this around for
1132            // conformity with postgres and in-case we end up needing it again in the future.
1133            let details = MySqlSourceDetails {};
1134            // Update options with the purified details
1135            options
1136                .retain(|MySqlConfigOption { name, .. }| name != &MySqlConfigOptionName::Details);
1137            options.push(MySqlConfigOption {
1138                name: MySqlConfigOptionName::Details,
1139                value: Some(WithOptionValue::Value(Value::String(hex::encode(
1140                    details.into_proto().encode_to_vec(),
1141                )))),
1142            });
1143
1144            if let Some(text_cols_option) = options
1145                .iter_mut()
1146                .find(|option| option.name == MySqlConfigOptionName::TextColumns)
1147            {
1148                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
1149            }
1150            if let Some(exclude_cols_option) = options
1151                .iter_mut()
1152                .find(|option| option.name == MySqlConfigOptionName::ExcludeColumns)
1153            {
1154                exclude_cols_option.value =
1155                    Some(WithOptionValue::Sequence(normalized_exclude_columns));
1156            }
1157        }
1158        CreateSourceConnection::LoadGenerator { generator, options } => {
1159            let load_generator =
1160                load_generator_ast_to_generator(&scx, generator, options, include_metadata)?;
1161
1162            let reference_client = SourceReferenceClient::LoadGenerator {
1163                generator: &load_generator,
1164            };
1165            retrieved_source_references = reference_client.get_source_references().await?;
1166            // Filter to the references that need to be created as 'subsources', which
1167            // doesn't include the default output for single-output sources.
1168            // TODO(database-issues#8620): Remove once subsources are removed
1169            let multi_output_sources =
1170                retrieved_source_references
1171                    .all_references()
1172                    .iter()
1173                    .any(|r| {
1174                        matches!(
1175                            r.load_generator_output(),
1176                            Some(output) if output != &LoadGeneratorOutput::Default
1177                        )
1178                    });
1179
1180            match external_references {
1181                Some(requested)
1182                    if matches!(reference_policy, SourceReferencePolicy::NotAllowed) =>
1183                {
1184                    Err(PlanError::UseTablesForSources(requested.to_string()))?
1185                }
1186                Some(requested) if !multi_output_sources => match requested {
1187                    ExternalReferences::SubsetTables(_) => {
1188                        Err(LoadGeneratorSourcePurificationError::ForTables)?
1189                    }
1190                    ExternalReferences::SubsetSchemas(_) => {
1191                        Err(LoadGeneratorSourcePurificationError::ForSchemas)?
1192                    }
1193                    ExternalReferences::All => {
1194                        Err(LoadGeneratorSourcePurificationError::ForAllTables)?
1195                    }
1196                },
1197                Some(requested) => {
1198                    let requested_exports = retrieved_source_references
1199                        .requested_source_exports(Some(requested), source_name)?;
1200                    for export in requested_exports {
1201                        requested_subsource_map.insert(
1202                            export.name,
1203                            PurifiedSourceExport {
1204                                external_reference: export.external_reference,
1205                                details: PurifiedExportDetails::LoadGenerator {
1206                                    table: export
1207                                        .meta
1208                                        .load_generator_desc()
1209                                        .ok_or_else(|| {
1210                                            internal_err!(
1211                                                "expected load generator source reference"
1212                                            )
1213                                        })?
1214                                        .clone(),
1215                                    output: export
1216                                        .meta
1217                                        .load_generator_output()
1218                                        .ok_or_else(|| {
1219                                            internal_err!(
1220                                                "expected load generator source reference"
1221                                            )
1222                                        })?
1223                                        .clone(),
1224                                },
1225                            },
1226                        );
1227                    }
1228                }
1229                None => {
1230                    if multi_output_sources
1231                        && matches!(reference_policy, SourceReferencePolicy::Required)
1232                    {
1233                        Err(LoadGeneratorSourcePurificationError::MultiOutputRequiresForAllTables)?
1234                    }
1235                }
1236            }
1237
1238            if let LoadGenerator::Clock = generator {
1239                if !options
1240                    .iter()
1241                    .any(|p| p.name == LoadGeneratorOptionName::AsOf)
1242                {
1243                    let now = catalog.now();
1244                    options.push(LoadGeneratorOption {
1245                        name: LoadGeneratorOptionName::AsOf,
1246                        value: Some(WithOptionValue::Value(Value::Number(now.to_string()))),
1247                    });
1248                }
1249            }
1250        }
1251    }
1252
1253    // Now that we know which subsources to create alongside this
1254    // statement, remove the references so it is not canonicalized as
1255    // part of the `CREATE SOURCE` statement in the catalog.
1256    *external_references = None;
1257
1258    // Generate progress subsource for old syntax
1259    let create_progress_subsource_stmt = if uses_old_syntax {
1260        // Take name from input or generate name
1261        let name = match progress_subsource {
1262            Some(name) => match name {
1263                DeferredItemName::Deferred(name) => name.clone(),
1264                // Already checked for this value above.
1265                DeferredItemName::Named(_) => {
1266                    sql_bail!("progress subsource name cannot be a resolved name")
1267                }
1268            },
1269            None => {
1270                let (item, prefix) = source_name
1271                    .0
1272                    .split_last()
1273                    .ok_or_else(|| sql_err!("source name must have at least one component"))?;
1274                let item_name =
1275                    Ident::try_generate_name(item.to_string(), "_progress", |candidate| {
1276                        let mut suggested_name = prefix.to_vec();
1277                        suggested_name.push(candidate.clone());
1278
1279                        let partial =
1280                            normalize::unresolved_item_name(UnresolvedItemName(suggested_name))?;
1281                        let qualified = scx.allocate_qualified_name(partial)?;
1282                        let item_exists = scx.catalog.get_item_by_name(&qualified).is_some();
1283                        let type_exists = scx.catalog.get_type_by_name(&qualified).is_some();
1284                        Ok::<_, PlanError>(!item_exists && !type_exists)
1285                    })?;
1286
1287                let mut full_name = prefix.to_vec();
1288                full_name.push(item_name);
1289                let full_name = normalize::unresolved_item_name(UnresolvedItemName(full_name))?;
1290                let qualified_name = scx.allocate_qualified_name(full_name)?;
1291                let full_name = scx.catalog.resolve_full_name(&qualified_name);
1292
1293                UnresolvedItemName::from(full_name.clone())
1294            }
1295        };
1296
1297        let (columns, constraints) = scx.relation_desc_into_table_defs(progress_desc)?;
1298
1299        // Create the subsource statement
1300        let mut progress_with_options: Vec<_> = with_options
1301            .iter()
1302            .filter_map(|opt| match opt.name {
1303                CreateSourceOptionName::TimestampInterval => None,
1304                CreateSourceOptionName::RetainHistory => Some(CreateSubsourceOption {
1305                    name: CreateSubsourceOptionName::RetainHistory,
1306                    value: opt.value.clone(),
1307                }),
1308            })
1309            .collect();
1310        progress_with_options.push(CreateSubsourceOption {
1311            name: CreateSubsourceOptionName::Progress,
1312            value: Some(WithOptionValue::Value(Value::Boolean(true))),
1313        });
1314
1315        Some(CreateSubsourceStatement {
1316            name,
1317            columns,
1318            // Progress subsources do not refer to the source to which they belong.
1319            // Instead the primary source depends on it (the opposite is true of
1320            // ingestion exports, which depend on the primary source).
1321            of_source: None,
1322            constraints,
1323            if_not_exists: false,
1324            with_options: progress_with_options,
1325        })
1326    } else {
1327        None
1328    };
1329
1330    purify_source_format(
1331        &catalog,
1332        format,
1333        &format_options,
1334        envelope,
1335        storage_configuration,
1336    )
1337    .await?;
1338
1339    Ok(PurifiedStatement::PurifiedCreateSource {
1340        create_progress_subsource_stmt,
1341        create_source_stmt,
1342        subsources: requested_subsource_map,
1343        available_source_references: retrieved_source_references.available_source_references(),
1344    })
1345}
1346
1347/// On success, returns the details on new subsources and updated
1348/// 'options' that sequencing expects for handling `ALTER SOURCE` statements.
1349async fn purify_alter_source(
1350    catalog: impl SessionCatalog,
1351    stmt: AlterSourceStatement<Aug>,
1352    storage_configuration: &StorageConfiguration,
1353) -> Result<PurifiedStatement, PlanError> {
1354    let scx = StatementContext::new(None, &catalog);
1355    let AlterSourceStatement {
1356        source_name: unresolved_source_name,
1357        action,
1358        if_exists,
1359    } = stmt;
1360
1361    // Get name.
1362    let item = match scx.resolve_item(RawItemName::Name(unresolved_source_name.clone())) {
1363        Ok(item) => item,
1364        Err(_) if if_exists => {
1365            return Ok(PurifiedStatement::PurifiedAlterSource {
1366                alter_source_stmt: AlterSourceStatement {
1367                    source_name: unresolved_source_name,
1368                    action,
1369                    if_exists,
1370                },
1371            });
1372        }
1373        Err(e) => return Err(e),
1374    };
1375
1376    // Ensure it's an ingestion-based and alterable source.
1377    let desc = match item.source_desc()? {
1378        Some(desc) => desc.clone().into_inline_connection(scx.catalog),
1379        None => {
1380            sql_bail!("cannot ALTER this type of source")
1381        }
1382    };
1383
1384    let source_name = item.name();
1385
1386    let resolved_source_name = ResolvedItemName::Item {
1387        id: item.id(),
1388        qualifiers: item.name().qualifiers.clone(),
1389        full_name: scx.catalog.resolve_full_name(source_name),
1390        print_id: true,
1391        version: RelationVersionSelector::Latest,
1392    };
1393
1394    let partial_name = scx.catalog.minimal_qualification(source_name);
1395
1396    match action {
1397        AlterSourceAction::AddSubsources {
1398            external_references,
1399            options,
1400        } => {
1401            if scx.catalog.system_vars().enable_create_table_from_source()
1402                && scx.catalog.system_vars().force_source_table_syntax()
1403            {
1404                Err(PlanError::UseTablesForSources(
1405                    "ALTER SOURCE .. ADD SUBSOURCES ..".to_string(),
1406                ))?;
1407            }
1408
1409            purify_alter_source_add_subsources(
1410                external_references,
1411                options,
1412                desc,
1413                partial_name,
1414                unresolved_source_name,
1415                resolved_source_name,
1416                storage_configuration,
1417            )
1418            .await
1419        }
1420        AlterSourceAction::RefreshReferences => {
1421            purify_alter_source_refresh_references(
1422                desc,
1423                resolved_source_name,
1424                storage_configuration,
1425            )
1426            .await
1427        }
1428        _ => Ok(PurifiedStatement::PurifiedAlterSource {
1429            alter_source_stmt: AlterSourceStatement {
1430                source_name: unresolved_source_name,
1431                action,
1432                if_exists,
1433            },
1434        }),
1435    }
1436}
1437
1438// TODO(database-issues#8620): Remove once subsources are removed
1439/// Equivalent to `purify_create_source` but for `AlterSourceStatement`.
1440async fn purify_alter_source_add_subsources(
1441    external_references: Vec<ExternalReferenceExport>,
1442    mut options: Vec<AlterSourceAddSubsourceOption<Aug>>,
1443    desc: SourceDesc,
1444    partial_source_name: PartialItemName,
1445    unresolved_source_name: UnresolvedItemName,
1446    resolved_source_name: ResolvedItemName,
1447    storage_configuration: &StorageConfiguration,
1448) -> Result<PurifiedStatement, PlanError> {
1449    // Validate this is a source that can have subsources added.
1450    let connection_id = match &desc.connection {
1451        GenericSourceConnection::Postgres(c) => c.connection_id,
1452        GenericSourceConnection::MySql(c) => c.connection_id,
1453        GenericSourceConnection::SqlServer(c) => c.connection_id,
1454        _ => sql_bail!(
1455            "source {} does not support ALTER SOURCE.",
1456            partial_source_name
1457        ),
1458    };
1459
1460    let crate::plan::statement::ddl::AlterSourceAddSubsourceOptionExtracted {
1461        text_columns,
1462        exclude_columns,
1463        details,
1464        seen: _,
1465    } = options.clone().try_into()?;
1466    if details.is_some() {
1467        sql_bail!("DETAILS option cannot be explicitly set");
1468    }
1469
1470    let mut requested_subsource_map = BTreeMap::new();
1471
1472    match desc.connection {
1473        GenericSourceConnection::Postgres(pg_source_connection) => {
1474            // Get PostgresConnection for generating subsources.
1475            let pg_connection = &pg_source_connection.connection;
1476
1477            let client = pg_connection
1478                .validate(connection_id, storage_configuration)
1479                .await?;
1480
1481            let reference_client = SourceReferenceClient::Postgres {
1482                client: &client,
1483                publication: &pg_source_connection.publication,
1484                database: &pg_connection.database,
1485            };
1486            let retrieved_source_references = reference_client.get_source_references().await?;
1487
1488            let postgres::PurifiedSourceExports {
1489                source_exports: subsources,
1490                normalized_text_columns,
1491            } = postgres::purify_source_exports(
1492                &client,
1493                &retrieved_source_references,
1494                &Some(ExternalReferences::SubsetTables(external_references)),
1495                text_columns,
1496                exclude_columns,
1497                &unresolved_source_name,
1498                &SourceReferencePolicy::Required,
1499            )
1500            .await?;
1501
1502            if let Some(text_cols_option) = options
1503                .iter_mut()
1504                .find(|option| option.name == AlterSourceAddSubsourceOptionName::TextColumns)
1505            {
1506                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
1507            }
1508
1509            requested_subsource_map.extend(subsources);
1510        }
1511        GenericSourceConnection::MySql(mysql_source_connection) => {
1512            let mysql_connection = &mysql_source_connection.connection;
1513            let config = mysql_connection
1514                .config(
1515                    &storage_configuration.connection_context.secrets_reader,
1516                    storage_configuration,
1517                    InTask::No,
1518                )
1519                .await?;
1520
1521            let mut conn = config
1522                .connect(
1523                    "mysql purification",
1524                    &storage_configuration.connection_context.ssh_tunnel_manager,
1525                )
1526                .await?;
1527
1528            // Retrieve the current @gtid_executed value of the server to mark as the effective
1529            // initial snapshot point for these subsources.
1530            let initial_gtid_set =
1531                mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?;
1532
1533            let binlog_full_metadata = is_binlog_full_metadata(&mut conn).await?;
1534
1535            let requested_references = Some(ExternalReferences::SubsetTables(external_references));
1536
1537            let reference_client = SourceReferenceClient::MySql {
1538                conn: &mut conn,
1539                include_system_schemas: mysql::references_system_schemas(&requested_references),
1540            };
1541            let retrieved_source_references = reference_client.get_source_references().await?;
1542
1543            let mysql::PurifiedSourceExports {
1544                source_exports: subsources,
1545                normalized_text_columns,
1546                normalized_exclude_columns,
1547            } = mysql::purify_source_exports(
1548                &mut conn,
1549                &retrieved_source_references,
1550                &requested_references,
1551                text_columns,
1552                exclude_columns,
1553                &unresolved_source_name,
1554                initial_gtid_set,
1555                &SourceReferencePolicy::Required,
1556                binlog_full_metadata,
1557            )
1558            .await?;
1559            requested_subsource_map.extend(subsources);
1560
1561            // Update options with the purified details
1562            if let Some(text_cols_option) = options
1563                .iter_mut()
1564                .find(|option| option.name == AlterSourceAddSubsourceOptionName::TextColumns)
1565            {
1566                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
1567            }
1568            if let Some(exclude_cols_option) = options
1569                .iter_mut()
1570                .find(|option| option.name == AlterSourceAddSubsourceOptionName::ExcludeColumns)
1571            {
1572                exclude_cols_option.value =
1573                    Some(WithOptionValue::Sequence(normalized_exclude_columns));
1574            }
1575        }
1576        GenericSourceConnection::SqlServer(sql_server_source) => {
1577            // Open a connection to the upstream SQL Server instance.
1578            let sql_server_connection = &sql_server_source.connection;
1579            let config = sql_server_connection
1580                .resolve_config(
1581                    &storage_configuration.connection_context.secrets_reader,
1582                    storage_configuration,
1583                    InTask::No,
1584                )
1585                .await?;
1586            let mut client = mz_sql_server_util::Client::connect(config).await?;
1587
1588            // Query the upstream SQL Server instance for available tables to replicate.
1589            let database = sql_server_connection.database.clone().into();
1590            let source_references = SourceReferenceClient::SqlServer {
1591                client: &mut client,
1592                database: Arc::clone(&database),
1593            }
1594            .get_source_references()
1595            .await?;
1596            let requested_references = Some(ExternalReferences::SubsetTables(external_references));
1597
1598            let timeout = mz_storage_types::sources::sql_server::MAX_LSN_WAIT
1599                .get(storage_configuration.config_set());
1600
1601            let result = sql_server::purify_source_exports(
1602                &*database,
1603                &mut client,
1604                &source_references,
1605                &requested_references,
1606                &text_columns,
1607                &exclude_columns,
1608                &unresolved_source_name,
1609                timeout,
1610                &SourceReferencePolicy::Required,
1611            )
1612            .await;
1613            let sql_server::PurifiedSourceExports {
1614                source_exports,
1615                normalized_text_columns,
1616                normalized_excl_columns,
1617            } = result?;
1618
1619            // Add the new exports to our subsource map.
1620            requested_subsource_map.extend(source_exports);
1621
1622            // Update options on the CREATE SOURCE statement with the purified details.
1623            if let Some(text_cols_option) = options
1624                .iter_mut()
1625                .find(|option| option.name == AlterSourceAddSubsourceOptionName::TextColumns)
1626            {
1627                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
1628            }
1629            if let Some(exclude_cols_option) = options
1630                .iter_mut()
1631                .find(|option| option.name == AlterSourceAddSubsourceOptionName::ExcludeColumns)
1632            {
1633                exclude_cols_option.value =
1634                    Some(WithOptionValue::Sequence(normalized_excl_columns));
1635            }
1636        }
1637        // The source kind was already established earlier in this function;
1638        // reaching this catch-all is an internal invariant violation.
1639        _ => bail_internal!("source does not support ALTER SOURCE...ADD SUBSOURCE"),
1640    };
1641
1642    Ok(PurifiedStatement::PurifiedAlterSourceAddSubsources {
1643        source_name: resolved_source_name,
1644        options,
1645        subsources: requested_subsource_map,
1646    })
1647}
1648
1649async fn purify_alter_source_refresh_references(
1650    desc: SourceDesc,
1651    resolved_source_name: ResolvedItemName,
1652    storage_configuration: &StorageConfiguration,
1653) -> Result<PurifiedStatement, PlanError> {
1654    let retrieved_source_references = match desc.connection {
1655        GenericSourceConnection::Postgres(pg_source_connection) => {
1656            // Get PostgresConnection for generating subsources.
1657            let pg_connection = &pg_source_connection.connection;
1658
1659            let config = pg_connection
1660                .config(
1661                    &storage_configuration.connection_context.secrets_reader,
1662                    storage_configuration,
1663                    InTask::No,
1664                )
1665                .await?;
1666
1667            let client = config
1668                .connect(
1669                    "postgres_purification",
1670                    &storage_configuration.connection_context.ssh_tunnel_manager,
1671                )
1672                .await?;
1673            let reference_client = SourceReferenceClient::Postgres {
1674                client: &client,
1675                publication: &pg_source_connection.publication,
1676                database: &pg_connection.database,
1677            };
1678            reference_client.get_source_references().await?
1679        }
1680        GenericSourceConnection::MySql(mysql_source_connection) => {
1681            let mysql_connection = &mysql_source_connection.connection;
1682            let config = mysql_connection
1683                .config(
1684                    &storage_configuration.connection_context.secrets_reader,
1685                    storage_configuration,
1686                    InTask::No,
1687                )
1688                .await?;
1689
1690            let mut conn = config
1691                .connect(
1692                    "mysql purification",
1693                    &storage_configuration.connection_context.ssh_tunnel_manager,
1694                )
1695                .await?;
1696
1697            let reference_client = SourceReferenceClient::MySql {
1698                conn: &mut conn,
1699                include_system_schemas: false,
1700            };
1701            reference_client.get_source_references().await?
1702        }
1703        GenericSourceConnection::SqlServer(sql_server_source) => {
1704            // Open a connection to the upstream SQL Server instance.
1705            let sql_server_connection = &sql_server_source.connection;
1706            let config = sql_server_connection
1707                .resolve_config(
1708                    &storage_configuration.connection_context.secrets_reader,
1709                    storage_configuration,
1710                    InTask::No,
1711                )
1712                .await?;
1713            let mut client = mz_sql_server_util::Client::connect(config).await?;
1714
1715            // Query the upstream SQL Server instance for available tables to replicate.
1716            let source_references = SourceReferenceClient::SqlServer {
1717                client: &mut client,
1718                database: sql_server_connection.database.clone().into(),
1719            }
1720            .get_source_references()
1721            .await?;
1722            source_references
1723        }
1724        GenericSourceConnection::LoadGenerator(load_gen_connection) => {
1725            let reference_client = SourceReferenceClient::LoadGenerator {
1726                generator: &load_gen_connection.load_generator,
1727            };
1728            reference_client.get_source_references().await?
1729        }
1730        GenericSourceConnection::Kafka(kafka_conn) => {
1731            let reference_client = SourceReferenceClient::Kafka {
1732                topic: &kafka_conn.topic,
1733            };
1734            reference_client.get_source_references().await?
1735        }
1736    };
1737    Ok(PurifiedStatement::PurifiedAlterSourceRefreshReferences {
1738        source_name: resolved_source_name,
1739        available_source_references: retrieved_source_references.available_source_references(),
1740    })
1741}
1742
1743async fn purify_create_table_from_source(
1744    catalog: impl SessionCatalog,
1745    mut stmt: CreateTableFromSourceStatement<Aug>,
1746    storage_configuration: &StorageConfiguration,
1747) -> Result<PurifiedStatement, PlanError> {
1748    let scx = StatementContext::new(None, &catalog);
1749    let CreateTableFromSourceStatement {
1750        name: _,
1751        columns,
1752        constraints,
1753        source: source_name,
1754        if_not_exists: _,
1755        external_reference,
1756        format,
1757        envelope,
1758        include_metadata: _,
1759        with_options,
1760    } = &mut stmt;
1761
1762    // Columns and constraints cannot be specified by the user but will be populated below.
1763    if matches!(columns, TableFromSourceColumns::Defined(_)) {
1764        sql_bail!("CREATE TABLE .. FROM SOURCE column definitions cannot be specified directly");
1765    }
1766    if !constraints.is_empty() {
1767        sql_bail!(
1768            "CREATE TABLE .. FROM SOURCE constraint definitions cannot be specified directly"
1769        );
1770    }
1771
1772    // Get the source item
1773    let item = match scx.get_item_by_resolved_name(source_name) {
1774        Ok(item) => item,
1775        Err(e) => return Err(e),
1776    };
1777
1778    // Ensure it's an ingestion-based and alterable source.
1779    let desc = match item.source_desc()? {
1780        Some(desc) => desc.clone().into_inline_connection(scx.catalog),
1781        None => {
1782            sql_bail!("cannot ALTER this type of source")
1783        }
1784    };
1785    let unresolved_source_name: UnresolvedItemName = source_name.full_item_name().clone().into();
1786
1787    let crate::plan::statement::ddl::TableFromSourceOptionExtracted {
1788        text_columns,
1789        exclude_columns,
1790        retain_history: _,
1791        details,
1792        partition_by: _,
1793        seen: _,
1794    } = with_options.clone().try_into()?;
1795    if details.is_some() {
1796        sql_bail!("DETAILS option cannot be explicitly set");
1797    }
1798
1799    // Our text column values are unqualified (just column names), but the purification methods below
1800    // expect to match the fully-qualified names against the full set of tables in upstream, so we
1801    // need to qualify them using the external reference first.
1802    let qualified_text_columns = text_columns
1803        .iter()
1804        .map(|col| {
1805            UnresolvedItemName(
1806                external_reference
1807                    .as_ref()
1808                    .map(|er| er.0.iter().chain_one(col).map(|i| i.clone()).collect())
1809                    .unwrap_or_else(|| vec![col.clone()]),
1810            )
1811        })
1812        .collect_vec();
1813    let qualified_exclude_columns = exclude_columns
1814        .iter()
1815        .map(|col| {
1816            UnresolvedItemName(
1817                external_reference
1818                    .as_ref()
1819                    .map(|er| er.0.iter().chain_one(col).map(|i| i.clone()).collect())
1820                    .unwrap_or_else(|| vec![col.clone()]),
1821            )
1822        })
1823        .collect_vec();
1824
1825    // Should be overriden below if a source-specific format is required.
1826    let mut format_options = SourceFormatOptions::Default;
1827
1828    let retrieved_source_references: RetrievedSourceReferences;
1829
1830    let requested_references = external_reference.as_ref().map(|ref_name| {
1831        ExternalReferences::SubsetTables(vec![ExternalReferenceExport {
1832            reference: ref_name.clone(),
1833            alias: None,
1834        }])
1835    });
1836
1837    // Run purification work specific to each source type: resolve the external reference to
1838    // a fully qualified name and obtain the appropriate details for the source-export statement
1839    let purified_export = match desc.connection {
1840        GenericSourceConnection::Postgres(pg_source_connection) => {
1841            // Get PostgresConnection for generating subsources.
1842            let pg_connection = &pg_source_connection.connection;
1843
1844            let client = pg_connection
1845                .validate(pg_source_connection.connection_id, storage_configuration)
1846                .await?;
1847
1848            let reference_client = SourceReferenceClient::Postgres {
1849                client: &client,
1850                publication: &pg_source_connection.publication,
1851                database: &pg_connection.database,
1852            };
1853            retrieved_source_references = reference_client.get_source_references().await?;
1854
1855            let postgres::PurifiedSourceExports {
1856                source_exports,
1857                // TODO(database-issues#8620): Remove once subsources are removed
1858                // This `normalized_text_columns` is not relevant for us and is only returned for
1859                // `CREATE SOURCE` statements that automatically generate subsources
1860                normalized_text_columns: _,
1861            } = postgres::purify_source_exports(
1862                &client,
1863                &retrieved_source_references,
1864                &requested_references,
1865                qualified_text_columns,
1866                qualified_exclude_columns,
1867                &unresolved_source_name,
1868                &SourceReferencePolicy::Required,
1869            )
1870            .await?;
1871            // There should be exactly one source_export returned for this statement
1872            let (_, purified_export) = source_exports.into_element();
1873            purified_export
1874        }
1875        GenericSourceConnection::MySql(mysql_source_connection) => {
1876            let mysql_connection = &mysql_source_connection.connection;
1877            let config = mysql_connection
1878                .config(
1879                    &storage_configuration.connection_context.secrets_reader,
1880                    storage_configuration,
1881                    InTask::No,
1882                )
1883                .await?;
1884
1885            let mut conn = config
1886                .connect(
1887                    "mysql purification",
1888                    &storage_configuration.connection_context.ssh_tunnel_manager,
1889                )
1890                .await?;
1891
1892            ensure_binlog_full_metadata(&mut conn).await?;
1893            let binlog_full_metadata = true;
1894
1895            // Retrieve the current @gtid_executed value of the server to mark as the effective
1896            // initial snapshot point for this table.
1897            let initial_gtid_set =
1898                mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?;
1899
1900            let reference_client = SourceReferenceClient::MySql {
1901                conn: &mut conn,
1902                include_system_schemas: mysql::references_system_schemas(&requested_references),
1903            };
1904            retrieved_source_references = reference_client.get_source_references().await?;
1905
1906            let mysql::PurifiedSourceExports {
1907                source_exports,
1908                // TODO(database-issues#8620): Remove once subsources are removed
1909                // `normalized_text/exclude_columns` is not relevant for us and is only returned for
1910                // `CREATE SOURCE` statements that automatically generate subsources
1911                normalized_text_columns: _,
1912                normalized_exclude_columns: _,
1913            } = mysql::purify_source_exports(
1914                &mut conn,
1915                &retrieved_source_references,
1916                &requested_references,
1917                qualified_text_columns,
1918                qualified_exclude_columns,
1919                &unresolved_source_name,
1920                initial_gtid_set,
1921                &SourceReferencePolicy::Required,
1922                binlog_full_metadata,
1923            )
1924            .await?;
1925            // There should be exactly one source_export returned for this statement
1926            let (_, purified_export) = source_exports.into_element();
1927            purified_export
1928        }
1929        GenericSourceConnection::SqlServer(sql_server_source) => {
1930            let connection = sql_server_source.connection;
1931            let config = connection
1932                .resolve_config(
1933                    &storage_configuration.connection_context.secrets_reader,
1934                    storage_configuration,
1935                    InTask::No,
1936                )
1937                .await?;
1938            let mut client = mz_sql_server_util::Client::connect(config).await?;
1939
1940            let database: Arc<str> = connection.database.into();
1941            let reference_client = SourceReferenceClient::SqlServer {
1942                client: &mut client,
1943                database: Arc::clone(&database),
1944            };
1945            retrieved_source_references = reference_client.get_source_references().await?;
1946            tracing::debug!(?retrieved_source_references, "got source references");
1947
1948            let timeout = mz_storage_types::sources::sql_server::MAX_LSN_WAIT
1949                .get(storage_configuration.config_set());
1950
1951            let purified_source_exports = sql_server::purify_source_exports(
1952                &*database,
1953                &mut client,
1954                &retrieved_source_references,
1955                &requested_references,
1956                &qualified_text_columns,
1957                &qualified_exclude_columns,
1958                &unresolved_source_name,
1959                timeout,
1960                &SourceReferencePolicy::Required,
1961            )
1962            .await?;
1963
1964            // There should be exactly one source_export returned for this statement
1965            let (_, purified_export) = purified_source_exports.source_exports.into_element();
1966            purified_export
1967        }
1968        GenericSourceConnection::LoadGenerator(load_gen_connection) => {
1969            let reference_client = SourceReferenceClient::LoadGenerator {
1970                generator: &load_gen_connection.load_generator,
1971            };
1972            retrieved_source_references = reference_client.get_source_references().await?;
1973
1974            let requested_exports = retrieved_source_references
1975                .requested_source_exports(requested_references.as_ref(), &unresolved_source_name)?;
1976            // There should be exactly one source_export returned
1977            let export = requested_exports.into_element();
1978            PurifiedSourceExport {
1979                external_reference: export.external_reference,
1980                details: PurifiedExportDetails::LoadGenerator {
1981                    table: export
1982                        .meta
1983                        .load_generator_desc()
1984                        .ok_or_else(|| internal_err!("expected load generator source reference"))?
1985                        .clone(),
1986                    output: export
1987                        .meta
1988                        .load_generator_output()
1989                        .ok_or_else(|| internal_err!("expected load generator source reference"))?
1990                        .clone(),
1991                },
1992            }
1993        }
1994        GenericSourceConnection::Kafka(kafka_conn) => {
1995            let reference_client = SourceReferenceClient::Kafka {
1996                topic: &kafka_conn.topic,
1997            };
1998            retrieved_source_references = reference_client.get_source_references().await?;
1999            let requested_exports = retrieved_source_references
2000                .requested_source_exports(requested_references.as_ref(), &unresolved_source_name)?;
2001            // There should be exactly one source_export returned
2002            let export = requested_exports.into_element();
2003
2004            format_options = SourceFormatOptions::Kafka {
2005                topic: kafka_conn.topic.clone(),
2006            };
2007            PurifiedSourceExport {
2008                external_reference: export.external_reference,
2009                details: PurifiedExportDetails::Kafka {},
2010            }
2011        }
2012    };
2013
2014    purify_source_format(
2015        &catalog,
2016        format,
2017        &format_options,
2018        envelope,
2019        storage_configuration,
2020    )
2021    .await?;
2022
2023    // Update the external reference in the statement to the resolved fully-qualified
2024    // external reference
2025    *external_reference = Some(purified_export.external_reference.clone());
2026
2027    // Update options in the statement using the purified export details
2028    match &purified_export.details {
2029        PurifiedExportDetails::Postgres { .. } => {
2030            let mut unsupported_cols = vec![];
2031            let postgres::PostgresExportStatementValues {
2032                columns: gen_columns,
2033                constraints: gen_constraints,
2034                text_columns: gen_text_columns,
2035                exclude_columns: gen_exclude_columns,
2036                details: gen_details,
2037                external_reference: _,
2038            } = postgres::generate_source_export_statement_values(
2039                &scx,
2040                purified_export,
2041                &mut unsupported_cols,
2042            )?;
2043            if !unsupported_cols.is_empty() {
2044                unsupported_cols.sort();
2045                Err(PgSourcePurificationError::UnrecognizedTypes {
2046                    cols: unsupported_cols,
2047                })?;
2048            }
2049
2050            if let Some(text_cols_option) = with_options
2051                .iter_mut()
2052                .find(|option| option.name == TableFromSourceOptionName::TextColumns)
2053            {
2054                match gen_text_columns {
2055                    Some(gen_text_columns) => {
2056                        text_cols_option.value = Some(WithOptionValue::Sequence(gen_text_columns))
2057                    }
2058                    None => soft_panic_or_log!(
2059                        "text_columns should be Some if text_cols_option is present"
2060                    ),
2061                }
2062            }
2063            if let Some(exclude_cols_option) = with_options
2064                .iter_mut()
2065                .find(|option| option.name == TableFromSourceOptionName::ExcludeColumns)
2066            {
2067                match gen_exclude_columns {
2068                    Some(gen_exclude_columns) => {
2069                        exclude_cols_option.value =
2070                            Some(WithOptionValue::Sequence(gen_exclude_columns))
2071                    }
2072                    None => soft_panic_or_log!(
2073                        "exclude_columns should be Some if exclude_cols_option is present"
2074                    ),
2075                }
2076            }
2077            match columns {
2078                TableFromSourceColumns::Defined(_) => {
2079                    // Purification is what populates the `Defined` variant;
2080                    // reaching this is an internal invariant violation.
2081                    bail_internal!(
2082                        "column definitions cannot be explicitly set for this source type"
2083                    )
2084                }
2085                TableFromSourceColumns::NotSpecified => {
2086                    *columns = TableFromSourceColumns::Defined(gen_columns);
2087                    *constraints = gen_constraints;
2088                }
2089                TableFromSourceColumns::Named(_) => {
2090                    sql_bail!("columns cannot be named for Postgres sources")
2091                }
2092            }
2093            with_options.push(TableFromSourceOption {
2094                name: TableFromSourceOptionName::Details,
2095                value: Some(WithOptionValue::Value(Value::String(hex::encode(
2096                    gen_details.into_proto().encode_to_vec(),
2097                )))),
2098            })
2099        }
2100        PurifiedExportDetails::MySql { .. } => {
2101            let mysql::MySqlExportStatementValues {
2102                columns: gen_columns,
2103                constraints: gen_constraints,
2104                text_columns: gen_text_columns,
2105                exclude_columns: gen_exclude_columns,
2106                details: gen_details,
2107                external_reference: _,
2108            } = mysql::generate_source_export_statement_values(&scx, purified_export)?;
2109
2110            if let Some(text_cols_option) = with_options
2111                .iter_mut()
2112                .find(|option| option.name == TableFromSourceOptionName::TextColumns)
2113            {
2114                match gen_text_columns {
2115                    Some(gen_text_columns) => {
2116                        text_cols_option.value = Some(WithOptionValue::Sequence(gen_text_columns))
2117                    }
2118                    None => soft_panic_or_log!(
2119                        "text_columns should be Some if text_cols_option is present"
2120                    ),
2121                }
2122            }
2123            if let Some(exclude_cols_option) = with_options
2124                .iter_mut()
2125                .find(|option| option.name == TableFromSourceOptionName::ExcludeColumns)
2126            {
2127                match gen_exclude_columns {
2128                    Some(gen_exclude_columns) => {
2129                        exclude_cols_option.value =
2130                            Some(WithOptionValue::Sequence(gen_exclude_columns))
2131                    }
2132                    None => soft_panic_or_log!(
2133                        "exclude_columns should be Some if exclude_cols_option is present"
2134                    ),
2135                }
2136            }
2137            match columns {
2138                TableFromSourceColumns::Defined(_) => {
2139                    // Purification is what populates the `Defined` variant;
2140                    // reaching this is an internal invariant violation.
2141                    bail_internal!(
2142                        "column definitions cannot be explicitly set for this source type"
2143                    )
2144                }
2145                TableFromSourceColumns::NotSpecified => {
2146                    *columns = TableFromSourceColumns::Defined(gen_columns);
2147                    *constraints = gen_constraints;
2148                }
2149                TableFromSourceColumns::Named(_) => {
2150                    sql_bail!("columns cannot be named for MySQL sources")
2151                }
2152            }
2153            with_options.push(TableFromSourceOption {
2154                name: TableFromSourceOptionName::Details,
2155                value: Some(WithOptionValue::Value(Value::String(hex::encode(
2156                    gen_details.into_proto().encode_to_vec(),
2157                )))),
2158            })
2159        }
2160        PurifiedExportDetails::SqlServer { .. } => {
2161            let sql_server::SqlServerExportStatementValues {
2162                columns: gen_columns,
2163                constraints: gen_constraints,
2164                text_columns: gen_text_columns,
2165                excl_columns: gen_excl_columns,
2166                details: gen_details,
2167                external_reference: _,
2168            } = sql_server::generate_source_export_statement_values(&scx, purified_export)?;
2169
2170            if let Some(text_cols_option) = with_options
2171                .iter_mut()
2172                .find(|opt| opt.name == TableFromSourceOptionName::TextColumns)
2173            {
2174                match gen_text_columns {
2175                    Some(gen_text_columns) => {
2176                        text_cols_option.value = Some(WithOptionValue::Sequence(gen_text_columns))
2177                    }
2178                    None => soft_panic_or_log!(
2179                        "text_columns should be Some if text_cols_option is present"
2180                    ),
2181                }
2182            }
2183            if let Some(exclude_cols_option) = with_options
2184                .iter_mut()
2185                .find(|opt| opt.name == TableFromSourceOptionName::ExcludeColumns)
2186            {
2187                match gen_excl_columns {
2188                    Some(gen_excl_columns) => {
2189                        exclude_cols_option.value =
2190                            Some(WithOptionValue::Sequence(gen_excl_columns))
2191                    }
2192                    None => soft_panic_or_log!(
2193                        "excl_columns should be Some if excl_cols_option is present"
2194                    ),
2195                }
2196            }
2197
2198            match columns {
2199                TableFromSourceColumns::NotSpecified => {
2200                    *columns = TableFromSourceColumns::Defined(gen_columns);
2201                    *constraints = gen_constraints;
2202                }
2203                TableFromSourceColumns::Named(_) => {
2204                    sql_bail!("columns cannot be named for SQL Server sources")
2205                }
2206                TableFromSourceColumns::Defined(_) => {
2207                    // Purification is what populates the `Defined` variant;
2208                    // reaching this is an internal invariant violation.
2209                    bail_internal!(
2210                        "column definitions cannot be explicitly set for this source type"
2211                    )
2212                }
2213            }
2214
2215            with_options.push(TableFromSourceOption {
2216                name: TableFromSourceOptionName::Details,
2217                value: Some(WithOptionValue::Value(Value::String(hex::encode(
2218                    gen_details.into_proto().encode_to_vec(),
2219                )))),
2220            })
2221        }
2222        PurifiedExportDetails::LoadGenerator { .. } => {
2223            let (desc, output) = match purified_export.details {
2224                PurifiedExportDetails::LoadGenerator { table, output } => (table, output),
2225                _ => bail_internal!("purified export details must be load generator"),
2226            };
2227            // We only determine the table description for multi-output load generator sources here,
2228            // whereas single-output load generators will have their relation description
2229            // determined during statment planning as envelope and format options may affect their
2230            // schema.
2231            if let Some(desc) = desc {
2232                let (gen_columns, gen_constraints) = scx.relation_desc_into_table_defs(&desc)?;
2233                match columns {
2234                    // Purification is what populates the `Defined` variant;
2235                    // reaching this is an internal invariant violation.
2236                    TableFromSourceColumns::Defined(_) => bail_internal!(
2237                        "column definitions cannot be explicitly set for this source type"
2238                    ),
2239                    TableFromSourceColumns::NotSpecified => {
2240                        *columns = TableFromSourceColumns::Defined(gen_columns);
2241                        *constraints = gen_constraints;
2242                    }
2243                    TableFromSourceColumns::Named(_) => {
2244                        sql_bail!("columns cannot be named for multi-output load generator sources")
2245                    }
2246                }
2247            }
2248            let details = SourceExportStatementDetails::LoadGenerator { output };
2249            with_options.push(TableFromSourceOption {
2250                name: TableFromSourceOptionName::Details,
2251                value: Some(WithOptionValue::Value(Value::String(hex::encode(
2252                    details.into_proto().encode_to_vec(),
2253                )))),
2254            })
2255        }
2256        PurifiedExportDetails::Kafka {} => {
2257            // NOTE: Kafka tables have their 'schemas' purified into the statement inside the
2258            // format field, so we don't specify any columns or constraints to be stored
2259            // on the statement here. The RelationDesc will be determined during planning.
2260            let details = SourceExportStatementDetails::Kafka {};
2261            with_options.push(TableFromSourceOption {
2262                name: TableFromSourceOptionName::Details,
2263                value: Some(WithOptionValue::Value(Value::String(hex::encode(
2264                    details.into_proto().encode_to_vec(),
2265                )))),
2266            })
2267        }
2268    };
2269
2270    // TODO: We might as well use the retrieved available references to update the source
2271    // available references table in the catalog, so plumb this through.
2272    // available_source_references: retrieved_source_references.available_source_references(),
2273    Ok(PurifiedStatement::PurifiedCreateTableFromSource { stmt })
2274}
2275
/// Source-type-specific context needed to purify a source's `FORMAT` clause.
enum SourceFormatOptions {
    /// Any non-Kafka source. `KEY/VALUE` formats and Confluent Schema Registry formats are
    /// rejected for this variant during purification.
    Default,
    /// A Kafka source reading from `topic`. The topic name is used to derive the schema
    /// registry subjects `<topic>-key` and `<topic>-value`.
    Kafka { topic: String },
}
2280
2281async fn purify_source_format(
2282    catalog: &dyn SessionCatalog,
2283    format: &mut Option<FormatSpecifier<Aug>>,
2284    options: &SourceFormatOptions,
2285    envelope: &Option<SourceEnvelope>,
2286    storage_configuration: &StorageConfiguration,
2287) -> Result<(), PlanError> {
2288    if matches!(format, Some(FormatSpecifier::KeyValue { .. }))
2289        && !matches!(options, SourceFormatOptions::Kafka { .. })
2290    {
2291        sql_bail!("Kafka sources are the only source type that can provide KEY/VALUE formats")
2292    }
2293
2294    match format.as_mut() {
2295        None => {}
2296        Some(FormatSpecifier::Bare(format)) => {
2297            purify_source_format_single(catalog, format, options, envelope, storage_configuration)
2298                .await?;
2299        }
2300
2301        Some(FormatSpecifier::KeyValue { key, value: val }) => {
2302            purify_source_format_single(catalog, key, options, envelope, storage_configuration)
2303                .await?;
2304            purify_source_format_single(catalog, val, options, envelope, storage_configuration)
2305                .await?;
2306        }
2307    }
2308    Ok(())
2309}
2310
2311async fn purify_source_format_single(
2312    catalog: &dyn SessionCatalog,
2313    format: &mut Format<Aug>,
2314    options: &SourceFormatOptions,
2315    envelope: &Option<SourceEnvelope>,
2316    storage_configuration: &StorageConfiguration,
2317) -> Result<(), PlanError> {
2318    match format {
2319        Format::Avro(schema) => match schema {
2320            AvroSchema::Csr { csr_connection } => {
2321                purify_csr_connection_avro(
2322                    catalog,
2323                    options,
2324                    csr_connection,
2325                    envelope,
2326                    storage_configuration,
2327                )
2328                .await?
2329            }
2330            AvroSchema::InlineSchema { .. } => {}
2331        },
2332        Format::Protobuf(schema) => match schema {
2333            ProtobufSchema::Csr { csr_connection } => {
2334                purify_csr_connection_proto(
2335                    catalog,
2336                    options,
2337                    csr_connection,
2338                    envelope,
2339                    storage_configuration,
2340                )
2341                .await?;
2342            }
2343            ProtobufSchema::InlineSchema { .. } => {}
2344        },
2345        Format::Bytes
2346        | Format::Regex(_)
2347        | Format::Json { .. }
2348        | Format::Text
2349        | Format::Csv { .. } => (),
2350    }
2351    Ok(())
2352}
2353
2354pub fn generate_subsource_statements(
2355    scx: &StatementContext,
2356    source_name: ResolvedItemName,
2357    subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
2358) -> Result<Vec<CreateSubsourceStatement<Aug>>, PlanError> {
2359    // get the first subsource to determine the connection type
2360    if subsources.is_empty() {
2361        return Ok(vec![]);
2362    }
2363    let (_, purified_export) = subsources
2364        .iter()
2365        .next()
2366        .ok_or_else(|| internal_err!("expected at least one subsource"))?;
2367
2368    let statements = match &purified_export.details {
2369        PurifiedExportDetails::Postgres { .. } => {
2370            crate::pure::postgres::generate_create_subsource_statements(
2371                scx,
2372                source_name,
2373                subsources,
2374            )?
2375        }
2376        PurifiedExportDetails::MySql { .. } => {
2377            crate::pure::mysql::generate_create_subsource_statements(scx, source_name, subsources)?
2378        }
2379        PurifiedExportDetails::SqlServer { .. } => {
2380            crate::pure::sql_server::generate_create_subsource_statements(
2381                scx,
2382                source_name,
2383                subsources,
2384            )?
2385        }
2386        PurifiedExportDetails::LoadGenerator { .. } => {
2387            let mut subsource_stmts = Vec::with_capacity(subsources.len());
2388            for (subsource_name, purified_export) in subsources {
2389                let (desc, output) = match purified_export.details {
2390                    PurifiedExportDetails::LoadGenerator { table, output } => (table, output),
2391                    _ => {
2392                        bail_internal!("purified export details must be load generator")
2393                    }
2394                };
2395                let desc = desc.ok_or_else(|| {
2396                    internal_err!(
2397                        "subsources cannot be generated for single-output load generators"
2398                    )
2399                })?;
2400
2401                let (columns, table_constraints) = scx.relation_desc_into_table_defs(&desc)?;
2402                let details = SourceExportStatementDetails::LoadGenerator { output };
2403                // Create the subsource statement
2404                let subsource = CreateSubsourceStatement {
2405                    name: subsource_name,
2406                    columns,
2407                    of_source: Some(source_name.clone()),
2408                    // unlike sources that come from an external upstream, we
2409                    // have more leniency to introduce different constraints
2410                    // every time the load generator is run; i.e. we are not as
2411                    // worried about introducing junk data.
2412                    constraints: table_constraints,
2413                    if_not_exists: false,
2414                    with_options: vec![
2415                        CreateSubsourceOption {
2416                            name: CreateSubsourceOptionName::ExternalReference,
2417                            value: Some(WithOptionValue::UnresolvedItemName(
2418                                purified_export.external_reference,
2419                            )),
2420                        },
2421                        CreateSubsourceOption {
2422                            name: CreateSubsourceOptionName::Details,
2423                            value: Some(WithOptionValue::Value(Value::String(hex::encode(
2424                                details.into_proto().encode_to_vec(),
2425                            )))),
2426                        },
2427                    ],
2428                };
2429                subsource_stmts.push(subsource);
2430            }
2431
2432            subsource_stmts
2433        }
2434        PurifiedExportDetails::Kafka { .. } => {
2435            // TODO: as part of database-issues#8322, Kafka sources will begin
2436            // producing data––we'll need to understand the schema
2437            // of the output here.
2438            if !subsources.is_empty() {
2439                bail_internal!("Kafka sources do not produce data-bearing subsources");
2440            }
2441            vec![]
2442        }
2443    };
2444    Ok(statements)
2445}
2446
2447async fn purify_csr_connection_proto(
2448    catalog: &dyn SessionCatalog,
2449    options: &SourceFormatOptions,
2450    csr_connection: &mut CsrConnectionProtobuf<Aug>,
2451    envelope: &Option<SourceEnvelope>,
2452    storage_configuration: &StorageConfiguration,
2453) -> Result<(), PlanError> {
2454    let SourceFormatOptions::Kafka { topic } = options else {
2455        sql_bail!("Confluent Schema Registry is only supported with Kafka sources")
2456    };
2457
2458    let CsrConnectionProtobuf {
2459        seed,
2460        connection: CsrConnection {
2461            connection,
2462            options: _,
2463        },
2464    } = csr_connection;
2465    match seed {
2466        None => {
2467            let scx = StatementContext::new(None, &*catalog);
2468
2469            let ccsr_connection = match scx.get_item_by_resolved_name(connection)?.connection()? {
2470                Connection::Csr(connection) => connection.clone().into_inline_connection(catalog),
2471                _ => sql_bail!("{} is not a schema registry connection", connection),
2472            };
2473
2474            let ccsr_client = ccsr_connection
2475                .connect(storage_configuration, InTask::No)
2476                .await
2477                .map_err(|e| CsrPurificationError::ClientError(Arc::new(e)))?;
2478
2479            let value = compile_proto(&format!("{}-value", topic), &ccsr_client).await?;
2480            let key = compile_proto(&format!("{}-key", topic), &ccsr_client)
2481                .await
2482                .ok();
2483
2484            if matches!(envelope, Some(SourceEnvelope::Debezium)) && key.is_none() {
2485                sql_bail!("Key schema is required for ENVELOPE DEBEZIUM");
2486            }
2487
2488            *seed = Some(CsrSeedProtobuf { value, key });
2489        }
2490        Some(_) => (),
2491    }
2492
2493    Ok(())
2494}
2495
2496async fn purify_csr_connection_avro(
2497    catalog: &dyn SessionCatalog,
2498    options: &SourceFormatOptions,
2499    csr_connection: &mut CsrConnectionAvro<Aug>,
2500    envelope: &Option<SourceEnvelope>,
2501    storage_configuration: &StorageConfiguration,
2502) -> Result<(), PlanError> {
2503    let SourceFormatOptions::Kafka { topic } = options else {
2504        sql_bail!("Confluent Schema Registry is only supported with Kafka sources")
2505    };
2506
2507    let CsrConnectionAvro {
2508        connection: CsrConnection { connection, .. },
2509        seed,
2510        key_strategy,
2511        value_strategy,
2512    } = csr_connection;
2513    if seed.is_none() {
2514        let scx = StatementContext::new(None, &*catalog);
2515        let csr_connection = match scx.get_item_by_resolved_name(connection)?.connection()? {
2516            Connection::Csr(connection) => connection.clone().into_inline_connection(catalog),
2517            _ => sql_bail!("{} is not a schema registry connection", connection),
2518        };
2519        let ccsr_client = csr_connection
2520            .connect(storage_configuration, InTask::No)
2521            .await
2522            .map_err(|e| CsrPurificationError::ClientError(Arc::new(e)))?;
2523
2524        let Schema {
2525            key_schema,
2526            value_schema,
2527            key_reference_schemas,
2528            value_reference_schemas,
2529        } = get_remote_csr_schema(
2530            &ccsr_client,
2531            key_strategy.clone().unwrap_or_default(),
2532            value_strategy.clone().unwrap_or_default(),
2533            topic,
2534        )
2535        .await?;
2536        if matches!(envelope, Some(SourceEnvelope::Debezium)) && key_schema.is_none() {
2537            sql_bail!("Key schema is required for ENVELOPE DEBEZIUM");
2538        }
2539
2540        *seed = Some(CsrSeedAvro {
2541            key_schema,
2542            value_schema,
2543            key_reference_schemas,
2544            value_reference_schemas,
2545        })
2546    }
2547
2548    Ok(())
2549}
2550
/// Raw key and value schemas fetched from the Confluent Schema Registry for a topic,
/// together with the schemas each of them references.
#[derive(Debug)]
pub struct Schema {
    /// The raw key schema, or `None` if no key schema was found.
    pub key_schema: Option<String>,
    /// The raw value schema.
    pub value_schema: String,
    /// Reference schemas for the key schema, in dependency order.
    pub key_reference_schemas: Vec<String>,
    /// Reference schemas for the value schema, in dependency order.
    pub value_reference_schemas: Vec<String>,
}
2560
/// Result of fetching a schema, including any referenced schemas.
///
/// For inline schemas (not looked up in the registry), `references` is always empty.
struct SchemaWithReferences {
    /// The primary schema, as raw text.
    schema: String,
    /// Reference schemas (raw text) in dependency order.
    references: Vec<String>,
}
2568
2569async fn get_schema_with_strategy(
2570    client: &Client,
2571    strategy: ReaderSchemaSelectionStrategy,
2572    subject: &str,
2573) -> Result<Option<SchemaWithReferences>, PlanError> {
2574    match strategy {
2575        ReaderSchemaSelectionStrategy::Latest => {
2576            // Use get_subject_and_references to also fetch referenced schemas
2577            match client.get_subject_and_references(subject).await {
2578                Ok((primary, dependencies)) => Ok(Some(SchemaWithReferences {
2579                    schema: primary.schema.raw,
2580                    references: dependencies.into_iter().map(|s| s.schema.raw).collect(),
2581                })),
2582                Err(GetBySubjectError::SubjectNotFound)
2583                | Err(GetBySubjectError::VersionNotFound(_)) => Ok(None),
2584                Err(e) => Err(PlanError::FetchingCsrSchemaFailed {
2585                    schema_lookup: format!("subject {}", subject.quoted()),
2586                    cause: Arc::new(e),
2587                }),
2588            }
2589        }
2590        // It's possible that CSR was provided with an inline strategy, but without a subject
2591        // or schema id to look up, there isn't a clean way to lookup references for the schema.
2592        // This could be done with extra work, but at this point, it isn't clear this is needed.
2593        ReaderSchemaSelectionStrategy::Inline(raw) => Ok(Some(SchemaWithReferences {
2594            schema: raw,
2595            references: vec![],
2596        })),
2597        ReaderSchemaSelectionStrategy::ById(id) => {
2598            match client.get_subject_and_references_by_id(id).await {
2599                Ok((primary, dependencies)) => Ok(Some(SchemaWithReferences {
2600                    schema: primary.schema.raw,
2601                    references: dependencies.into_iter().map(|s| s.schema.raw).collect(),
2602                })),
2603                Err(GetBySubjectError::SubjectNotFound)
2604                | Err(GetBySubjectError::VersionNotFound(_)) => Ok(None),
2605                Err(e) => Err(PlanError::FetchingCsrSchemaFailed {
2606                    schema_lookup: format!("subject {}", subject.quoted()),
2607                    cause: Arc::new(e),
2608                }),
2609            }
2610        }
2611    }
2612}
2613
2614async fn get_remote_csr_schema(
2615    ccsr_client: &mz_ccsr::Client,
2616    key_strategy: ReaderSchemaSelectionStrategy,
2617    value_strategy: ReaderSchemaSelectionStrategy,
2618    topic: &str,
2619) -> Result<Schema, PlanError> {
2620    let value_schema_name = format!("{}-value", topic);
2621    let value_result =
2622        get_schema_with_strategy(ccsr_client, value_strategy, &value_schema_name).await?;
2623    let value_result = value_result.ok_or_else(|| anyhow!("No value schema found"))?;
2624
2625    let key_subject = format!("{}-key", topic);
2626    let key_result = get_schema_with_strategy(ccsr_client, key_strategy, &key_subject).await?;
2627    Ok(Schema {
2628        key_schema: key_result.as_ref().map(|r| r.schema.clone()),
2629        value_schema: value_result.schema,
2630        key_reference_schemas: key_result.map(|r| r.references).unwrap_or_default(),
2631        value_reference_schemas: value_result.references,
2632    })
2633}
2634
2635/// Collect protobuf message descriptor from CSR and compile the descriptor.
2636async fn compile_proto(
2637    subject_name: &String,
2638    ccsr_client: &Client,
2639) -> Result<CsrSeedProtobufSchema, PlanError> {
2640    let (primary_subject, dependency_subjects) = ccsr_client
2641        .get_subject_and_references(subject_name)
2642        .await
2643        .map_err(|e| PlanError::FetchingCsrSchemaFailed {
2644            schema_lookup: format!("subject {}", subject_name.quoted()),
2645            cause: Arc::new(e),
2646        })?;
2647
2648    // Compile .proto files into a file descriptor set.
2649    let mut source_tree = VirtualSourceTree::new();
2650
2651    // Add well-known types (e.g., google/protobuf/timestamp.proto) to the source
2652    // tree. These are implicitly available to protoc but are typically not
2653    // registered in the schema registry.
2654    source_tree.as_mut().map_well_known_types();
2655
2656    for subject in iter::once(&primary_subject).chain(dependency_subjects.iter()) {
2657        source_tree.as_mut().add_file(
2658            Path::new(&subject.name),
2659            subject.schema.raw.as_bytes().to_vec(),
2660        );
2661    }
2662    let mut db = SourceTreeDescriptorDatabase::new(source_tree.as_mut());
2663    let fds = db
2664        .as_mut()
2665        .build_file_descriptor_set(&[Path::new(&primary_subject.name)])
2666        .map_err(|cause| PlanError::InvalidProtobufSchema { cause })?;
2667
2668    // Ensure there is exactly one message in the file.
2669    let primary_fd = fds.file(0);
2670    let message_name = match primary_fd.message_type_size() {
2671        1 => String::from_utf8_lossy(primary_fd.message_type(0).name()).into_owned(),
2672        0 => bail_unsupported!(29603, "Protobuf schemas with no messages"),
2673        _ => bail_unsupported!(29603, "Protobuf schemas with multiple messages"),
2674    };
2675
2676    // Encode the file descriptor set into a SQL byte string.
2677    let bytes = &fds
2678        .serialize()
2679        .map_err(|cause| PlanError::InvalidProtobufSchema { cause })?;
2680    let mut schema = String::new();
2681    strconv::format_bytes(&mut schema, bytes);
2682
2683    Ok(CsrSeedProtobufSchema {
2684        schema,
2685        message_name,
2686    })
2687}
2688
/// Name of the `mz_now()` function. It is used both to resolve the function and to recognize
/// calls to it in ASTs during materialized view purification.
const MZ_NOW_NAME: &str = "mz_now";
/// Schema used when resolving `mz_now()` to synthesize calls to it during purification.
const MZ_NOW_SCHEMA: &str = "mz_catalog";
2691
/// Purifies a CREATE MATERIALIZED VIEW statement. Additionally, it adjusts `resolved_ids` if
/// references to ids appear or disappear during the purification.
///
/// Only the statement's `WITH` options are rewritten. For each option:
/// 1. `REFRESH AT CREATION` becomes `REFRESH AT mz_now()`.
/// 2. `REFRESH EVERY` without `ALIGNED TO` gains `ALIGNED TO mz_now()`.
/// 3. Every `mz_now()` call inside a `REFRESH AT` time or a `REFRESH EVERY ... ALIGNED TO`
///    expression is replaced by the `mz_now` timestamp, cast to `mz_timestamp`.
///
/// If no `REFRESH` option is present at all, `REFRESH ON COMMIT` is appended. Afterwards,
/// `resolved_ids` is updated:
/// - `mz_timestamp` is added if a cast to it was introduced;
/// - `mz_now` is removed if the statement (including the view's query) no longer mentions it.
///
/// Note that in contrast with [`purify_statement`], this doesn't need to be async, because
/// this function is not making any network calls.
///
/// # Panics
///
/// Panics in either of these cases:
/// - `mz_now` cannot be resolved in the `MZ_NOW_SCHEMA` schema;
/// - a `REFRESH` option contains `mz_now()` (including one introduced by steps 1 or 2)
///   while the `mz_now` argument is `None`.
pub fn purify_create_materialized_view_options(
    catalog: impl SessionCatalog,
    mz_now: Option<Timestamp>,
    cmvs: &mut CreateMaterializedViewStatement<Aug>,
    resolved_ids: &mut ResolvedIds,
) {
    // 0. Preparations:
    // Prepare an expression that calls `mz_now()`, which we can insert in various later steps.
    let (mz_now_id, mz_now_expr) = {
        let item = catalog
            .resolve_function(&PartialItemName {
                database: None,
                schema: Some(MZ_NOW_SCHEMA.to_string()),
                item: MZ_NOW_NAME.to_string(),
            })
            .expect("we should be able to resolve mz_now");
        (
            item.id(),
            // A zero-argument, non-distinct, unfiltered, non-windowed call to `mz_now`.
            Expr::Function(Function {
                name: ResolvedItemName::Item {
                    id: item.id(),
                    qualifiers: item.name().qualifiers.clone(),
                    full_name: catalog.resolve_full_name(item.name()),
                    print_id: false,
                    version: RelationVersionSelector::Latest,
                },
                args: FunctionArgs::Args {
                    args: Vec::new(),
                    order_by: Vec::new(),
                },
                filter: None,
                over: None,
                distinct: false,
            }),
        )
    };
    // Prepare the `mz_timestamp` type.
    // This is the target type of the casts that replace `mz_now()` in step 3.
    let (mz_timestamp_id, mz_timestamp_type) = {
        let item = catalog.get_system_type("mz_timestamp");
        let full_name = catalog.resolve_full_name(item.name());
        (
            item.id(),
            ResolvedDataType::Named {
                id: item.id(),
                qualifiers: item.name().qualifiers.clone(),
                full_name,
                modifiers: vec![],
                print_id: true,
            },
        )
    };

    // Tracks whether step 3 inserted any cast to `mz_timestamp` (see step 5).
    let mut introduced_mz_timestamp = false;

    for option in cmvs.with_options.iter_mut() {
        // 1. Purify `REFRESH AT CREATION` to `REFRESH AT mz_now()`.
        if matches!(
            option.value,
            Some(WithOptionValue::Refresh(RefreshOptionValue::AtCreation))
        ) {
            option.value = Some(WithOptionValue::Refresh(RefreshOptionValue::At(
                RefreshAtOptionValue {
                    time: mz_now_expr.clone(),
                },
            )));
        }

        // 2. If `REFRESH EVERY` doesn't have an `ALIGNED TO`, then add `ALIGNED TO mz_now()`.
        if let Some(WithOptionValue::Refresh(RefreshOptionValue::Every(
            RefreshEveryOptionValue { aligned_to, .. },
        ))) = &mut option.value
        {
            if aligned_to.is_none() {
                *aligned_to = Some(mz_now_expr.clone());
            }
        }

        // 3. Substitute `mz_now()` with the timestamp chosen for the CREATE MATERIALIZED VIEW
        // statement. (This has to happen after the above steps, which might introduce `mz_now()`.)
        match &mut option.value {
            Some(WithOptionValue::Refresh(RefreshOptionValue::At(RefreshAtOptionValue {
                time,
            }))) => {
                let mut visitor = MzNowPurifierVisitor::new(mz_now, mz_timestamp_type.clone());
                visitor.visit_expr_mut(time);
                introduced_mz_timestamp |= visitor.introduced_mz_timestamp;
            }
            Some(WithOptionValue::Refresh(RefreshOptionValue::Every(
                RefreshEveryOptionValue {
                    interval: _,
                    aligned_to: Some(aligned_to),
                },
            ))) => {
                let mut visitor = MzNowPurifierVisitor::new(mz_now, mz_timestamp_type.clone());
                visitor.visit_expr_mut(aligned_to);
                introduced_mz_timestamp |= visitor.introduced_mz_timestamp;
            }
            _ => {}
        }
    }

    // 4. If the user didn't give any REFRESH option, then default to ON COMMIT.
    if !cmvs.with_options.iter().any(|o| {
        matches!(
            o,
            MaterializedViewOption {
                value: Some(WithOptionValue::Refresh(..)),
                ..
            }
        )
    }) {
        cmvs.with_options.push(MaterializedViewOption {
            name: MaterializedViewOptionName::Refresh,
            value: Some(WithOptionValue::Refresh(RefreshOptionValue::OnCommit)),
        })
    }

    // 5. Attend to `resolved_ids`: The purification might have
    // - added references to `mz_timestamp`;
    // - removed references to `mz_now`.
    if introduced_mz_timestamp {
        resolved_ids.add_item(mz_timestamp_id);
    }
    // Even though we always remove `mz_now()` from the `with_options`, there might be `mz_now()`
    // remaining in the main query expression of the MV, so let's visit the entire statement to look
    // for `mz_now()` everywhere.
    let mut visitor = ExprContainsTemporalVisitor::new();
    visitor.visit_create_materialized_view_statement(cmvs);
    if !visitor.contains_temporal {
        resolved_ids.remove_item(&mz_now_id);
    }
}
2829
2830/// Returns true if the [MaterializedViewOption] either already involves `mz_now()` or will involve
2831/// after purification.
2832pub fn materialized_view_option_contains_temporal(mvo: &MaterializedViewOption<Aug>) -> bool {
2833    match &mvo.value {
2834        Some(WithOptionValue::Refresh(RefreshOptionValue::At(RefreshAtOptionValue { time }))) => {
2835            let mut visitor = ExprContainsTemporalVisitor::new();
2836            visitor.visit_expr(time);
2837            visitor.contains_temporal
2838        }
2839        Some(WithOptionValue::Refresh(RefreshOptionValue::Every(RefreshEveryOptionValue {
2840            interval: _,
2841            aligned_to: Some(aligned_to),
2842        }))) => {
2843            let mut visitor = ExprContainsTemporalVisitor::new();
2844            visitor.visit_expr(aligned_to);
2845            visitor.contains_temporal
2846        }
2847        Some(WithOptionValue::Refresh(RefreshOptionValue::Every(RefreshEveryOptionValue {
2848            interval: _,
2849            aligned_to: None,
2850        }))) => {
2851            // For a `REFRESH EVERY` without an `ALIGNED TO`, purification will default the
2852            // `ALIGNED TO` to `mz_now()`.
2853            true
2854        }
2855        Some(WithOptionValue::Refresh(RefreshOptionValue::AtCreation)) => {
2856            // `REFRESH AT CREATION` will be purified to `REFRESH AT mz_now()`.
2857            true
2858        }
2859        _ => false,
2860    }
2861}
2862
2863/// Determines whether the AST involves `mz_now()`.
2864struct ExprContainsTemporalVisitor {
2865    pub contains_temporal: bool,
2866}
2867
2868impl ExprContainsTemporalVisitor {
2869    pub fn new() -> ExprContainsTemporalVisitor {
2870        ExprContainsTemporalVisitor {
2871            contains_temporal: false,
2872        }
2873    }
2874}
2875
2876impl Visit<'_, Aug> for ExprContainsTemporalVisitor {
2877    fn visit_function(&mut self, func: &Function<Aug>) {
2878        self.contains_temporal |= func.name.full_item_name().item == MZ_NOW_NAME;
2879        visit_function(self, func);
2880    }
2881}
2882
2883struct MzNowPurifierVisitor {
2884    pub mz_now: Option<Timestamp>,
2885    pub mz_timestamp_type: ResolvedDataType,
2886    pub introduced_mz_timestamp: bool,
2887}
2888
2889impl MzNowPurifierVisitor {
2890    pub fn new(
2891        mz_now: Option<Timestamp>,
2892        mz_timestamp_type: ResolvedDataType,
2893    ) -> MzNowPurifierVisitor {
2894        MzNowPurifierVisitor {
2895            mz_now,
2896            mz_timestamp_type,
2897            introduced_mz_timestamp: false,
2898        }
2899    }
2900}
2901
2902impl VisitMut<'_, Aug> for MzNowPurifierVisitor {
2903    fn visit_expr_mut(&mut self, expr: &'_ mut Expr<Aug>) {
2904        match expr {
2905            Expr::Function(Function {
2906                name:
2907                    ResolvedItemName::Item {
2908                        full_name: FullItemName { item, .. },
2909                        ..
2910                    },
2911                ..
2912            }) if item == &MZ_NOW_NAME.to_string() => {
2913                let mz_now = self.mz_now.expect(
2914                    "we should have chosen a timestamp if the expression contains mz_now()",
2915                );
2916                // We substitute `mz_now()` with number + a cast to `mz_timestamp`. The cast is to
2917                // not alter the type of the expression.
2918                *expr = Expr::Cast {
2919                    expr: Box::new(Expr::Value(Value::Number(mz_now.to_string()))),
2920                    data_type: self.mz_timestamp_type.clone(),
2921                };
2922                self.introduced_mz_timestamp = true;
2923            }
2924            _ => visit_expr_mut(self, expr),
2925        }
2926    }
2927}