Skip to main content

mz_sql/
pure.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! SQL purification.
11//!
12//! See the [crate-level documentation](crate) for details.
13
14use std::collections::{BTreeMap, BTreeSet};
15use std::fmt;
16use std::iter;
17use std::path::Path;
18use std::sync::Arc;
19
20use anyhow::anyhow;
21use itertools::Itertools;
22use mz_adapter_types::dyncfgs::ENABLE_S3_TABLES_REGION_CHECK;
23use mz_ccsr::{Client, GetBySubjectError};
24use mz_cloud_provider::CloudProvider;
25use mz_controller_types::ClusterId;
26use mz_kafka_util::client::MzClientContext;
27use mz_mysql_util::MySqlTableDesc;
28use mz_ore::collections::CollectionExt;
29use mz_ore::error::ErrorExt;
30use mz_ore::future::InTask;
31use mz_ore::iter::IteratorExt;
32use mz_ore::str::StrExt;
33use mz_postgres_util::desc::PostgresTableDesc;
34use mz_proto::RustType;
35use mz_repr::{CatalogItemId, RelationDesc, RelationVersionSelector, Timestamp, strconv};
36use mz_sql_parser::ast::display::AstDisplay;
37use mz_sql_parser::ast::visit::{Visit, visit_function};
38use mz_sql_parser::ast::visit_mut::{VisitMut, visit_expr_mut};
39use mz_sql_parser::ast::{
40    AlterSourceAction, AlterSourceAddSubsourceOptionName, AlterSourceStatement, AvroDocOn,
41    ColumnName, CreateMaterializedViewStatement, CreateSinkConnection, CreateSinkOptionName,
42    CreateSinkStatement, CreateSourceOptionName, CreateSubsourceOption, CreateSubsourceOptionName,
43    CreateTableFromSourceStatement, CsrConfigOption, CsrConfigOptionName, CsrConnection,
44    CsrSeedAvro, CsrSeedProtobuf, CsrSeedProtobufSchema, DeferredItemName, DocOnIdentifier,
45    DocOnSchema, Expr, Function, FunctionArgs, Ident, KafkaSourceConfigOption,
46    KafkaSourceConfigOptionName, LoadGenerator, LoadGeneratorOption, LoadGeneratorOptionName,
47    MaterializedViewOption, MaterializedViewOptionName, MySqlConfigOption, MySqlConfigOptionName,
48    PgConfigOption, PgConfigOptionName, RawItemName, ReaderSchemaSelectionStrategy,
49    RefreshAtOptionValue, RefreshEveryOptionValue, RefreshOptionValue, SourceEnvelope,
50    SqlServerConfigOption, SqlServerConfigOptionName, Statement, TableFromSourceColumns,
51    TableFromSourceOption, TableFromSourceOptionName, UnresolvedItemName,
52};
53use mz_sql_server_util::desc::SqlServerTableDesc;
54use mz_storage_types::configuration::StorageConfiguration;
55use mz_storage_types::connections::Connection;
56use mz_storage_types::connections::inline::IntoInlineConnection;
57use mz_storage_types::errors::ContextCreationError;
58use mz_storage_types::sources::load_generator::LoadGeneratorOutput;
59use mz_storage_types::sources::mysql::MySqlSourceDetails;
60use mz_storage_types::sources::postgres::PostgresSourcePublicationDetails;
61use mz_storage_types::sources::{
62    GenericSourceConnection, SourceDesc, SourceExportStatementDetails, SqlServerSourceExtras,
63};
64use prost::Message;
65use protobuf_native::MessageLite;
66use protobuf_native::compiler::{SourceTreeDescriptorDatabase, VirtualSourceTree};
67use rdkafka::admin::AdminClient;
68use references::{RetrievedSourceReferences, SourceReferenceClient};
69use uuid::Uuid;
70
71use crate::ast::{
72    AlterSourceAddSubsourceOption, AvroSchema, CreateSourceConnection, CreateSourceStatement,
73    CreateSubsourceStatement, CsrConnectionAvro, CsrConnectionProtobuf, ExternalReferenceExport,
74    ExternalReferences, Format, FormatSpecifier, ProtobufSchema, Value, WithOptionValue,
75};
76use crate::catalog::{CatalogItemType, SessionCatalog};
77use crate::kafka_util::{KafkaSinkConfigOptionExtracted, KafkaSourceConfigOptionExtracted};
78use crate::names::{
79    Aug, FullItemName, PartialItemName, ResolvedColumnReference, ResolvedDataType, ResolvedIds,
80    ResolvedItemName,
81};
82use crate::plan::error::PlanError;
83use crate::plan::statement::ddl::load_generator_ast_to_generator;
84use crate::plan::{SourceReferences, StatementContext};
85use crate::pure::error::{IcebergSinkPurificationError, SqlServerSourcePurificationError};
86use crate::pure::mysql::{ensure_binlog_full_metadata, is_binlog_full_metadata};
87use crate::{kafka_util, normalize};
88
89use self::error::{
90    CsrPurificationError, KafkaSinkPurificationError, KafkaSourcePurificationError,
91    LoadGeneratorSourcePurificationError, MySqlSourcePurificationError, PgSourcePurificationError,
92};
93
94pub(crate) mod error;
95mod references;
96
97pub mod mysql;
98pub mod postgres;
99pub mod sql_server;
100
/// A single export requested from a source, pairing a reference to an
/// upstream object with the name the export should be created under.
///
/// `T` carries additional per-export data; its meaning is decided by the code
/// that constructs the value.
pub(crate) struct RequestedSourceExport<T> {
    /// Reference to the upstream object that this export reads from.
    external_reference: UnresolvedItemName,
    /// Name of the item that will be created for this export.
    name: UnresolvedItemName,
    /// Extra data attached to the export. It can be swapped for a value of a
    /// different type with `change_meta`.
    meta: T,
}
106
107impl<T: fmt::Debug> fmt::Debug for RequestedSourceExport<T> {
108    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
109        f.debug_struct("RequestedSourceExport")
110            .field("external_reference", &self.external_reference)
111            .field("name", &self.name)
112            .field("meta", &self.meta)
113            .finish()
114    }
115}
116
117impl<T> RequestedSourceExport<T> {
118    fn change_meta<F>(self, new_meta: F) -> RequestedSourceExport<F> {
119        RequestedSourceExport {
120            external_reference: self.external_reference,
121            name: self.name,
122            meta: new_meta,
123        }
124    }
125}
126
127/// Generates a subsource name by prepending source schema name if present
128///
129/// For eg. if source is `a.b`, then `a` will be prepended to the subsource name
130/// so that it's generated in the same schema as source
131fn source_export_name_gen(
132    source_name: &UnresolvedItemName,
133    subsource_name: &str,
134) -> Result<UnresolvedItemName, PlanError> {
135    let mut partial = normalize::unresolved_item_name(source_name.clone())?;
136    partial.item = subsource_name.to_string();
137    Ok(UnresolvedItemName::from(partial))
138}
139
140// TODO(database-issues#8620): Remove once subsources are removed
141/// Validates the requested subsources do not have name conflicts with each other
142/// and that the same upstream table is not referenced multiple times.
143fn validate_source_export_names<T>(
144    requested_source_exports: &[RequestedSourceExport<T>],
145) -> Result<(), PlanError> {
146    // This condition would get caught during the catalog transaction, but produces a
147    // vague, non-contextual error. Instead, error here so we can suggest to the user
148    // how to fix the problem.
149    if let Some(name) = requested_source_exports
150        .iter()
151        .map(|subsource| &subsource.name)
152        .duplicates()
153        .next()
154        .cloned()
155    {
156        let mut upstream_references: Vec<_> = requested_source_exports
157            .into_iter()
158            .filter_map(|subsource| {
159                if &subsource.name == &name {
160                    Some(subsource.external_reference.clone())
161                } else {
162                    None
163                }
164            })
165            .collect();
166
167        upstream_references.sort();
168
169        Err(PlanError::SubsourceNameConflict {
170            name,
171            upstream_references,
172        })?;
173    }
174
175    // We disallow subsource statements from referencing the same upstream table, but we allow
176    // `CREATE TABLE .. FROM SOURCE` statements to do so. Since `CREATE TABLE .. FROM SOURCE`
177    // purification will only provide 1 `requested_source_export`, we can leave this here without
178    // needing to differentiate between the two types of statements.
179    // TODO(roshan): Remove this when auto-generated subsources are deprecated.
180    if let Some(name) = requested_source_exports
181        .iter()
182        .map(|export| &export.external_reference)
183        .duplicates()
184        .next()
185        .cloned()
186    {
187        let mut target_names: Vec<_> = requested_source_exports
188            .into_iter()
189            .filter_map(|export| {
190                if &export.external_reference == &name {
191                    Some(export.name.clone())
192                } else {
193                    None
194                }
195            })
196            .collect();
197
198        target_names.sort();
199
200        Err(PlanError::SubsourceDuplicateReference { name, target_names })?;
201    }
202
203    Ok(())
204}
205
/// The result of purifying a statement: the statement itself, or the pieces
/// needed to act on it, produced by the purification functions in this
/// module.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PurifiedStatement {
    /// A purified `CREATE SOURCE` statement together with the subsources
    /// that accompany it.
    PurifiedCreateSource {
        /// The progress subsource, if we are offloading progress info to a
        /// separate relation.
        create_progress_subsource_stmt: Option<CreateSubsourceStatement<Aug>>,
        /// The `CREATE SOURCE` statement itself.
        create_source_stmt: CreateSourceStatement<Aug>,
        /// Map of subsource names to external details.
        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
        /// All the available upstream references that can be added as tables
        /// to this primary source.
        available_source_references: SourceReferences,
    },
    /// An `ALTER SOURCE` statement.
    PurifiedAlterSource {
        /// The `ALTER SOURCE` statement itself.
        alter_source_stmt: AlterSourceStatement<Aug>,
    },
    /// The subsources to add to an existing source.
    // NOTE(review): presumably produced for `ALTER SOURCE ... ADD SUBSOURCE`;
    // the code that builds this variant is not visible here -- confirm.
    PurifiedAlterSourceAddSubsources {
        /// Name of the source being altered. This just saves us an annoying
        /// catalog lookup.
        source_name: ResolvedItemName,
        /// Options that we will need the values of to update the source's
        /// definition.
        options: Vec<AlterSourceAddSubsourceOption<Aug>>,
        /// Map of subsource names to external details.
        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
    },
    /// Refreshed upstream references for an existing source.
    PurifiedAlterSourceRefreshReferences {
        /// Name of the source whose references were refreshed.
        source_name: ResolvedItemName,
        /// The updated available upstream references for the primary source.
        available_source_references: SourceReferences,
    },
    /// A purified `CREATE SINK` statement.
    PurifiedCreateSink(CreateSinkStatement<Aug>),
    /// A `CREATE TABLE .. FROM SOURCE` statement.
    PurifiedCreateTableFromSource {
        /// The `CREATE TABLE .. FROM SOURCE` statement itself.
        stmt: CreateTableFromSourceStatement<Aug>,
    },
}
240
/// A single source export after purification: the upstream object it refers
/// to plus the connection-specific details gathered for it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PurifiedSourceExport {
    /// Reference to the upstream object that this export reads from.
    pub external_reference: UnresolvedItemName,
    /// Details specific to the kind of source the export belongs to.
    pub details: PurifiedExportDetails,
}
246
/// Details gathered for a source export during purification, with one
/// variant per kind of source.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PurifiedExportDetails {
    /// Details for an export of a MySQL source.
    MySql {
        /// Description of the upstream table.
        table: MySqlTableDesc,
        // NOTE(review): presumably the columns requested to be treated as
        // text, if any -- confirm against the MySQL purification code.
        text_columns: Option<Vec<Ident>>,
        // NOTE(review): presumably the columns requested to be excluded, if
        // any -- confirm against the MySQL purification code.
        exclude_columns: Option<Vec<Ident>>,
        // NOTE(review): presumably the upstream GTID set observed during
        // purification -- confirm.
        initial_gtid_set: String,
        // NOTE(review): presumably whether the upstream has full binlog
        // metadata enabled -- confirm.
        binlog_full_metadata: bool,
    },
    /// Details for an export of a PostgreSQL source.
    Postgres {
        /// Description of the upstream table.
        table: PostgresTableDesc,
        // NOTE(review): presumably the columns requested to be treated as
        // text, if any -- confirm against the Postgres purification code.
        text_columns: Option<Vec<Ident>>,
        // NOTE(review): presumably the columns requested to be excluded, if
        // any -- confirm against the Postgres purification code.
        exclude_columns: Option<Vec<Ident>>,
    },
    /// Details for an export of a SQL Server source.
    SqlServer {
        /// Description of the upstream table.
        table: SqlServerTableDesc,
        // NOTE(review): presumably the columns requested to be treated as
        // text, if any -- confirm against the SQL Server purification code.
        text_columns: Option<Vec<Ident>>,
        // NOTE(review): named `excl_columns` here but `exclude_columns` in the
        // `MySql` and `Postgres` variants; presumably the same concept --
        // confirm before relying on it.
        excl_columns: Option<Vec<Ident>>,
        // NOTE(review): presumably the name of the upstream CDC capture
        // instance for the table -- confirm.
        capture_instance: Arc<str>,
        // NOTE(review): presumably the upstream LSN observed during
        // purification -- confirm.
        initial_lsn: mz_sql_server_util::cdc::Lsn,
    },
    /// Details for an export of a Kafka source. Carries no data.
    Kafka {},
    /// Details for an export of a load generator source.
    LoadGenerator {
        /// Relation description of the export, when one is available.
        table: Option<RelationDesc>,
        /// Which load generator output this export corresponds to.
        output: LoadGeneratorOutput,
    },
}
274
275/// Purifies a statement, removing any dependencies on external state.
276///
277/// See the section on [purification](crate#purification) in the crate
278/// documentation for details.
279///
280/// Note that this doesn't handle CREATE MATERIALIZED VIEW, which is
281/// handled by [purify_create_materialized_view_options] instead.
282/// This could be made more consistent by a refactoring discussed here:
283/// <https://github.com/MaterializeInc/materialize/pull/23870#discussion_r1435922709>
284pub async fn purify_statement(
285    catalog: impl SessionCatalog,
286    now: u64,
287    stmt: Statement<Aug>,
288    storage_configuration: &StorageConfiguration,
289) -> (Result<PurifiedStatement, PlanError>, Option<ClusterId>) {
290    match stmt {
291        Statement::CreateSource(stmt) => {
292            let cluster_id = stmt.in_cluster.as_ref().map(|cluster| cluster.id.clone());
293            (
294                purify_create_source(catalog, now, stmt, storage_configuration).await,
295                cluster_id,
296            )
297        }
298        Statement::AlterSource(stmt) => (
299            purify_alter_source(catalog, stmt, storage_configuration).await,
300            None,
301        ),
302        Statement::CreateSink(stmt) => {
303            let cluster_id = stmt.in_cluster.as_ref().map(|cluster| cluster.id.clone());
304            (
305                purify_create_sink(catalog, stmt, storage_configuration).await,
306                cluster_id,
307            )
308        }
309        Statement::CreateTableFromSource(stmt) => (
310            purify_create_table_from_source(catalog, stmt, storage_configuration).await,
311            None,
312        ),
313        o => (
314            Err(internal_err!(
315                "unexpected statement type in purification: {:?}",
316                o
317            )),
318            None,
319        ),
320    }
321}
322
323/// Injects `DOC ON` comments into all Avro formats that are using a schema
324/// registry by finding all SQL `COMMENT`s that are attached to the sink's
325/// underlying materialized view (as well as any types referenced by that
326/// underlying materialized view).
327pub(crate) fn purify_create_sink_avro_doc_on_options(
328    catalog: &dyn SessionCatalog,
329    from_id: CatalogItemId,
330    format: &mut Option<FormatSpecifier<Aug>>,
331) -> Result<(), PlanError> {
332    // Collect all objects referenced by the sink.
333    let from = catalog.get_item(&from_id);
334    let object_ids = from
335        .references()
336        .items()
337        .copied()
338        .chain_one(from.id())
339        .collect::<Vec<_>>();
340
341    // Collect all Avro formats that use a schema registry, as well as a set of
342    // all identifiers named in user-provided `DOC ON` options.
343    let mut avro_format_options = vec![];
344    for_each_format(format, |doc_on_schema, fmt| match fmt {
345        Format::Avro(AvroSchema::InlineSchema { .. })
346        | Format::Bytes
347        | Format::Csv { .. }
348        | Format::Json { .. }
349        | Format::Protobuf(..)
350        | Format::Regex(..)
351        | Format::Text => (),
352        Format::Avro(AvroSchema::Csr {
353            csr_connection: CsrConnectionAvro { connection, .. },
354        }) => {
355            avro_format_options.push((doc_on_schema, &mut connection.options));
356        }
357    });
358
359    // For each Avro format in the sink, inject the appropriate `DOC ON` options
360    // for each item referenced by the sink.
361    for (for_schema, options) in avro_format_options {
362        let user_provided_comments = options
363            .iter()
364            .filter_map(|CsrConfigOption { name, .. }| match name {
365                CsrConfigOptionName::AvroDocOn(doc_on) => Some(doc_on.clone()),
366                _ => None,
367            })
368            .collect::<BTreeSet<_>>();
369
370        // Adding existing comments if not already provided by user
371        for object_id in &object_ids {
372            // Always add comments to the latest version of the item.
373            let item = catalog
374                .get_item(object_id)
375                .at_version(RelationVersionSelector::Latest);
376            let full_resolved_name = ResolvedItemName::Item {
377                id: *object_id,
378                qualifiers: item.name().qualifiers.clone(),
379                full_name: catalog.resolve_full_name(item.name()),
380                print_id: !matches!(
381                    item.item_type(),
382                    CatalogItemType::Func | CatalogItemType::Type
383                ),
384                version: RelationVersionSelector::Latest,
385            };
386
387            if let Some(comments_map) = catalog.get_item_comments(object_id) {
388                // Attach comment for the item itself, if the user has not
389                // already provided an overriding `DOC ON` option for the item.
390                let doc_on_item_key = AvroDocOn {
391                    identifier: DocOnIdentifier::Type(full_resolved_name.clone()),
392                    for_schema,
393                };
394                if !user_provided_comments.contains(&doc_on_item_key) {
395                    if let Some(root_comment) = comments_map.get(&None) {
396                        options.push(CsrConfigOption {
397                            name: CsrConfigOptionName::AvroDocOn(doc_on_item_key),
398                            value: Some(mz_sql_parser::ast::WithOptionValue::Value(Value::String(
399                                root_comment.clone(),
400                            ))),
401                        });
402                    }
403                }
404
405                // Attach comment for each column in the item, if the user has
406                // not already provided an overriding `DOC ON` option for the
407                // column.
408                let column_descs = match item.type_details() {
409                    Some(details) => details.typ.desc(catalog).unwrap_or_default(),
410                    None => item.relation_desc().map(|d| d.into_owned()),
411                };
412
413                if let Some(desc) = column_descs {
414                    for (pos, column_name) in desc.iter_names().enumerate() {
415                        if let Some(comment_str) = comments_map.get(&Some(pos + 1)) {
416                            let doc_on_column_key = AvroDocOn {
417                                identifier: DocOnIdentifier::Column(ColumnName {
418                                    relation: full_resolved_name.clone(),
419                                    column: ResolvedColumnReference::Column {
420                                        name: column_name.to_owned(),
421                                        index: pos,
422                                    },
423                                }),
424                                for_schema,
425                            };
426                            if !user_provided_comments.contains(&doc_on_column_key) {
427                                options.push(CsrConfigOption {
428                                    name: CsrConfigOptionName::AvroDocOn(doc_on_column_key),
429                                    value: Some(mz_sql_parser::ast::WithOptionValue::Value(
430                                        Value::String(comment_str.clone()),
431                                    )),
432                                });
433                            }
434                        }
435                    }
436                }
437            }
438        }
439    }
440
441    Ok(())
442}
443
/// Checks that the sink described in the statement can connect to its external
/// resources.
///
/// The checks run in this order:
///
/// 1. Reject any `WITH` option outside the user-allowed list.
/// 2. Validate the sink connection: for Kafka, build an admin client and
///    fetch metadata, requiring at least one broker; for Iceberg, optionally
///    check the S3 Tables region, load the AWS SDK config of the optional AWS
///    connection, and connect to the catalog.
/// 3. For every schema registry connection referenced by the sink's formats,
///    connect and list subjects.
/// 4. Inject `DOC ON` options for Avro formats via
///    [`purify_create_sink_avro_doc_on_options`].
///
/// On success the (possibly modified) statement is returned wrapped in
/// [`PurifiedStatement::PurifiedCreateSink`].
async fn purify_create_sink(
    catalog: impl SessionCatalog,
    mut create_sink_stmt: CreateSinkStatement<Aug>,
    storage_configuration: &StorageConfiguration,
) -> Result<PurifiedStatement, PlanError> {
    // General purification
    // Every field is listed explicitly (no `..`), so adding a field to
    // `CreateSinkStatement` forces a decision here.
    let CreateSinkStatement {
        connection,
        format,
        with_options,
        name: _,
        in_cluster: _,
        if_not_exists: _,
        from,
        envelope: _,
        mode: _,
    } = &mut create_sink_stmt;

    // The list of options that the user is allowed to specify.
    const USER_ALLOWED_WITH_OPTIONS: &[CreateSinkOptionName] = &[
        CreateSinkOptionName::Snapshot,
        CreateSinkOptionName::CommitInterval,
    ];

    // Bail on the first option that is not in the allow-list.
    if let Some(op) = with_options
        .iter()
        .find(|op| !USER_ALLOWED_WITH_OPTIONS.contains(&op.name))
    {
        sql_bail!(
            "CREATE SINK...WITH ({}..) is not allowed",
            op.name.to_ast_string_simple(),
        )
    }

    match &connection {
        CreateSinkConnection::Kafka {
            connection,
            options,
            key: _,
            headers: _,
        } => {
            // We must not leave any state behind in the Kafka broker, so just ensure that
            // we can connect. This means we don't ensure that we can create the topic and
            // introduces TOCTOU errors, but creating an inoperable sink is infinitely
            // preferable to leaking state in users' environments.
            let scx = StatementContext::new(None, &catalog);
            let connection = {
                let item = scx.get_item_by_resolved_name(connection)?;
                // Get Kafka connection
                match item.connection()? {
                    Connection::Kafka(connection) => {
                        connection.clone().into_inline_connection(scx.catalog)
                    }
                    _ => sql_bail!(
                        "{} is not a kafka connection",
                        scx.catalog.resolve_full_name(item.name())
                    ),
                }
            };

            let extracted_options: KafkaSinkConfigOptionExtracted = options.clone().try_into()?;

            if extracted_options.legacy_ids == Some(true) {
                sql_bail!("LEGACY IDs option is not supported");
            }

            let client: AdminClient<_> = connection
                .create_with_context(
                    storage_configuration,
                    MzClientContext::default(),
                    &BTreeMap::new(),
                    InTask::No,
                )
                .await
                .map_err(|e| {
                    // anyhow doesn't support Clone, so not trivial to move into PlanError
                    KafkaSinkPurificationError::AdminClientError(Arc::new(e))
                })?;

            // NOTE(review): this call is not awaited; if it blocks on the
            // network it will stall the async task for up to the configured
            // metadata timeout -- confirm that this is acceptable here.
            let metadata = client
                .inner()
                .fetch_metadata(
                    None,
                    storage_configuration
                        .parameters
                        .kafka_timeout_config
                        .fetch_metadata_timeout,
                )
                .map_err(|e| {
                    KafkaSinkPurificationError::AdminClientError(Arc::new(
                        ContextCreationError::KafkaError(e),
                    ))
                })?;

            // A cluster that reports no brokers cannot host the sink.
            if metadata.brokers().len() == 0 {
                Err(KafkaSinkPurificationError::ZeroBrokers)?;
            }
        }
        CreateSinkConnection::Iceberg {
            catalog_connection,
            aws_connection,
            ..
        } => {
            let scx = StatementContext::new(None, &catalog);
            let connection = {
                let item = scx.get_item_by_resolved_name(catalog_connection)?;
                // Get Iceberg connection
                match item.connection()? {
                    Connection::IcebergCatalog(connection) => {
                        connection.clone().into_inline_connection(scx.catalog)
                    }
                    _ => sql_bail!(
                        "{} is not an iceberg connection",
                        scx.catalog.resolve_full_name(item.name())
                    ),
                }
            };

            // For S3 Tables connections in the Materialize Cloud product, verify the
            // AWS region matches the environment's region. This check only applies when
            // the enable_s3_tables_region_check dyncfg is set.
            if let Some(s3tables) = connection.s3tables_catalog() {
                let enable_region_check =
                    ENABLE_S3_TABLES_REGION_CHECK.get(scx.catalog.system_vars().dyncfgs());
                if enable_region_check {
                    let env_id = &catalog.config().environment_id;
                    // Environments on other cloud providers skip the check.
                    if matches!(env_id.cloud_provider(), CloudProvider::Aws) {
                        let env_region = env_id.cloud_provider_region();
                        // Later on we default to "us-east-1" if the region is not set on the S3 Tables
                        // connection, so we need to do the same check here.
                        let s3_tables_region = s3tables
                            .aws_connection
                            .connection
                            .region
                            .clone()
                            .unwrap_or_else(|| "us-east-1".to_string());
                        if s3_tables_region != env_region {
                            Err(IcebergSinkPurificationError::S3TablesRegionMismatch {
                                s3_tables_region,
                                environment_region: env_region.to_string(),
                            })?;
                        }
                    }
                }
            }

            // Validate the sink's (optional) AWS connection even though we never use it.
            // TODO(kynan): If we do start using the sink's creds, check again that this validation
            //   accurately reflects what we need.
            //   Consider rolling the storage creds validation into the catalog connection's "connect" fn,
            //   which already validates the catalog creds (currently also used for the storage layer).
            if let Some(aws_connection) = aws_connection {
                let aws_conn_id = aws_connection.item_id();
                let aws_connection = {
                    let item = scx.get_item_by_resolved_name(aws_connection)?;
                    // Get AWS connection
                    match item.connection()? {
                        Connection::Aws(aws_connection) => aws_connection.clone(),
                        _ => sql_bail!(
                            "{} is not an aws connection",
                            scx.catalog.resolve_full_name(item.name())
                        ),
                    }
                };

                // Only success or failure matters; the loaded config is discarded.
                let _sdk_config = aws_connection
                    .load_sdk_config(
                        &storage_configuration.connection_context,
                        aws_conn_id.clone(),
                        InTask::No,
                        mz_storage_types::dyncfgs::ENFORCE_EXTERNAL_ADDRESSES
                            .get(storage_configuration.config_set()),
                    )
                    .await
                    .map_err(|e| IcebergSinkPurificationError::AwsSdkContextError(Arc::new(e)))?;
            }

            // Now that we've validated the sink's storage creds (if they exist)
            // we _could_ use them to build a complete Iceberg client (both catalog and storage).
            // TODO(kynan): Actually use those sink-specific creds here instead of ignoring them.
            let _catalog = connection
                .connect(storage_configuration, InTask::No)
                .await
                .map_err(|e| IcebergSinkPurificationError::CatalogError(Arc::new(e)))?;
        }
    }

    // Collect the distinct schema registry connections used by the sink's
    // formats, so that each one is checked only once.
    let mut csr_connection_ids = BTreeSet::new();
    for_each_format(format, |_, fmt| match fmt {
        Format::Avro(AvroSchema::InlineSchema { .. })
        | Format::Bytes
        | Format::Csv { .. }
        | Format::Json { .. }
        | Format::Protobuf(ProtobufSchema::InlineSchema { .. })
        | Format::Regex(..)
        | Format::Text => (),
        Format::Avro(AvroSchema::Csr {
            csr_connection: CsrConnectionAvro { connection, .. },
        })
        | Format::Protobuf(ProtobufSchema::Csr {
            csr_connection: CsrConnectionProtobuf { connection, .. },
        }) => {
            csr_connection_ids.insert(*connection.connection.item_id());
        }
    });

    let scx = StatementContext::new(None, &catalog);
    for csr_connection_id in csr_connection_ids {
        let connection = {
            let item = scx.get_item(&csr_connection_id);
            // Get CSR connection
            match item.connection()? {
                Connection::Csr(connection) => connection.clone().into_inline_connection(&catalog),
                _ => Err(CsrPurificationError::NotCsrConnection(
                    scx.catalog.resolve_full_name(item.name()),
                ))?,
            }
        };

        let client = connection
            .connect(storage_configuration, InTask::No)
            .await
            .map_err(|e| CsrPurificationError::ClientError(Arc::new(e)))?;

        // The listing is only used to prove that the registry answers
        // requests; the subjects themselves are discarded.
        client
            .list_subjects()
            .await
            .map_err(|e| CsrPurificationError::ListSubjectsError(Arc::new(e)))?;
    }

    purify_create_sink_avro_doc_on_options(&catalog, *from.item_id(), format)?;

    Ok(PurifiedStatement::PurifiedCreateSink(create_sink_stmt))
}
680
681/// Runs a function on each format within the format specifier.
682///
683/// The function is provided with a `DocOnSchema` that indicates whether the
684/// format is for the key, value, or both.
685///
686// TODO(benesch): rename `DocOnSchema` to the more general `FormatRestriction`.
687fn for_each_format<'a, F>(format: &'a mut Option<FormatSpecifier<Aug>>, mut f: F)
688where
689    F: FnMut(DocOnSchema, &'a mut Format<Aug>),
690{
691    match format {
692        None => (),
693        Some(FormatSpecifier::Bare(fmt)) => f(DocOnSchema::All, fmt),
694        Some(FormatSpecifier::KeyValue { key, value }) => {
695            f(DocOnSchema::KeyOnly, key);
696            f(DocOnSchema::ValueOnly, value);
697        }
698    }
699}
700
/// Defines whether purification should enforce that at least one valid source
/// reference is provided on the provided statement.
///
/// `purify_create_source` picks the policy from the
/// `enable_create_table_from_source` and `force_source_table_syntax` system
/// variables.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub(crate) enum SourceReferencePolicy {
    /// Don't allow source references to be provided. This is used for
    /// enforcing that `CREATE SOURCE` statements don't create subsources.
    NotAllowed,
    /// Allow empty references, such as when creating a source that
    /// will have tables added afterwards.
    Optional,
    /// Require that at least one reference is resolved to an upstream
    /// object.
    Required,
}
715
/// Purifies a `CREATE SOURCE` statement.
///
/// The statement is rewritten in place (its fields are borrowed mutably
/// below) and then returned inside
/// [`PurifiedStatement::PurifiedCreateSource`]. Depending on the connection
/// type this function:
///
/// * **Kafka**: rejects external references, builds a consumer from the
///   referenced connection, checks the topic / `START OFFSET` values, and
///   replaces a `START TIMESTAMP` option with the looked-up `START OFFSET`.
/// * **Postgres / SQL Server / MySQL**: rejects a user-supplied `DETAILS`
///   option, validates the connection, retrieves the available upstream
///   references, purifies the requested source exports, writes back the
///   normalized `TEXT COLUMNS` (and, for SQL Server and MySQL, `EXCLUDE
///   COLUMNS`) options, and appends a generated, hex-encoded `DETAILS` option.
/// * **Load generator**: validates the requested references against the
///   generator's outputs and, for `Clock`, adds an `AS OF` option when the
///   user did not provide one.
///
/// Afterwards the statement's external references are cleared, a progress
/// subsource statement is generated if the statement used the old syntax, and
/// the format/envelope are purified via `purify_source_format`.
///
/// `now` is only forwarded to the Kafka `START TIMESTAMP` lookup.
///
/// Returns the (optional) progress subsource statement, the rewritten
/// `CREATE SOURCE` statement, the map of requested subsources, and the
/// references that were found to be available upstream. Any failure along the
/// way is returned as a `PlanError`.
async fn purify_create_source(
    catalog: impl SessionCatalog,
    now: u64,
    mut create_source_stmt: CreateSourceStatement<Aug>,
    storage_configuration: &StorageConfiguration,
) -> Result<PurifiedStatement, PlanError> {
    // Borrow the individual fields mutably: every rewrite below lands
    // directly in `create_source_stmt`, which is returned at the end.
    let CreateSourceStatement {
        name: source_name,
        col_names,
        key_constraint,
        connection: source_connection,
        format,
        envelope,
        include_metadata,
        external_references,
        progress_subsource,
        with_options,
        ..
    } = &mut create_source_stmt;

    // The presence of any of these clauses marks the statement as using the
    // old syntax. This must be computed here, before `external_references` is
    // reset to `None` further down.
    let uses_old_syntax = !col_names.is_empty()
        || key_constraint.is_some()
        || format.is_some()
        || envelope.is_some()
        || !include_metadata.is_empty()
        || external_references.is_some()
        || progress_subsource.is_some();

    if let Some(DeferredItemName::Named(_)) = progress_subsource {
        sql_bail!("Cannot manually ID qualify progress subsource")
    }

    // Subsources requested by this statement, filled in by the
    // connection-specific arms below.
    let mut requested_subsource_map = BTreeMap::new();

    // Relation description of the progress collection for this source type;
    // used to build the progress subsource statement for the old syntax.
    let progress_desc = match &source_connection {
        CreateSourceConnection::Kafka { .. } => {
            &mz_storage_types::sources::kafka::KAFKA_PROGRESS_DESC
        }
        CreateSourceConnection::Postgres { .. } => {
            &mz_storage_types::sources::postgres::PG_PROGRESS_DESC
        }
        CreateSourceConnection::SqlServer { .. } => {
            &mz_storage_types::sources::sql_server::SQL_SERVER_PROGRESS_DESC
        }
        CreateSourceConnection::MySql { .. } => {
            &mz_storage_types::sources::mysql::MYSQL_PROGRESS_DESC
        }
        CreateSourceConnection::LoadGenerator { .. } => {
            &mz_storage_types::sources::load_generator::LOAD_GEN_PROGRESS_DESC
        }
    };
    let scx = StatementContext::new(None, &catalog);

    // Depending on whether the user must, or merely can, use the
    // `CREATE TABLE .. FROM SOURCE` statement to add tables after this source
    // is created, we might need to enforce that auto-generated subsources are
    // created or not created by this source statement.
    let reference_policy = if scx.catalog.system_vars().enable_create_table_from_source()
        && scx.catalog.system_vars().force_source_table_syntax()
    {
        SourceReferencePolicy::NotAllowed
    } else if scx.catalog.system_vars().enable_create_table_from_source() {
        SourceReferencePolicy::Optional
    } else {
        SourceReferencePolicy::Required
    };

    // Only the Kafka arm replaces this default.
    let mut format_options = SourceFormatOptions::Default;

    // Assigned exactly once in every arm of the `match` below.
    let retrieved_source_references: RetrievedSourceReferences;

    match source_connection {
        CreateSourceConnection::Kafka {
            connection,
            options: base_with_options,
            ..
        } => {
            // Kafka sources do not accept external references at all.
            if let Some(external_references) = external_references {
                Err(KafkaSourcePurificationError::ReferencedSubsources(
                    external_references.clone(),
                ))?;
            }

            let connection = {
                let item = scx.get_item_by_resolved_name(connection)?;
                // Get Kafka connection
                match item.connection()? {
                    Connection::Kafka(connection) => {
                        connection.clone().into_inline_connection(&catalog)
                    }
                    _ => Err(KafkaSourcePurificationError::NotKafkaConnection(
                        scx.catalog.resolve_full_name(item.name()),
                    ))?,
                }
            };

            let extracted_options: KafkaSourceConfigOptionExtracted =
                base_with_options.clone().try_into()?;

            let topic = extracted_options
                .topic
                .ok_or(KafkaSourcePurificationError::ConnectionMissingTopic)?;

            // The consumer is handed to the topic/offset helpers below.
            let consumer = connection
                .create_with_context(
                    storage_configuration,
                    MzClientContext::default(),
                    &BTreeMap::new(),
                    InTask::No,
                )
                .await
                .map_err(|e| {
                    // anyhow doesn't support Clone, so not trivial to move into PlanError
                    KafkaSourcePurificationError::KafkaConsumerError(
                        e.display_with_causes().to_string(),
                    )
                })?;
            let consumer = Arc::new(consumer);

            // `START OFFSET` and `START TIMESTAMP` are mutually exclusive;
            // each remaining combination gets its own upstream check.
            match (
                extracted_options.start_offset,
                extracted_options.start_timestamp,
            ) {
                (None, None) => {
                    // Validate that the topic at least exists.
                    kafka_util::ensure_topic_exists(
                        Arc::clone(&consumer),
                        &topic,
                        storage_configuration
                            .parameters
                            .kafka_timeout_config
                            .fetch_metadata_timeout,
                    )
                    .await?;
                }
                (Some(_), Some(_)) => {
                    sql_bail!("cannot specify START TIMESTAMP and START OFFSET at same time")
                }
                (Some(start_offsets), None) => {
                    // Validate the start offsets.
                    kafka_util::validate_start_offsets(
                        Arc::clone(&consumer),
                        &topic,
                        start_offsets,
                        storage_configuration
                            .parameters
                            .kafka_timeout_config
                            .fetch_metadata_timeout,
                    )
                    .await?;
                }
                (None, Some(time_offset)) => {
                    // Translate `START TIMESTAMP` to a start offset.
                    let start_offsets = kafka_util::lookup_start_offsets(
                        Arc::clone(&consumer),
                        &topic,
                        time_offset,
                        now,
                        storage_configuration
                            .parameters
                            .kafka_timeout_config
                            .fetch_metadata_timeout,
                    )
                    .await?;

                    // Swap the `START TIMESTAMP` option for a `START OFFSET`
                    // option holding the offsets that were just looked up.
                    base_with_options.retain(|val| {
                        !matches!(val.name, KafkaSourceConfigOptionName::StartTimestamp)
                    });
                    base_with_options.push(KafkaSourceConfigOption {
                        name: KafkaSourceConfigOptionName::StartOffset,
                        value: Some(WithOptionValue::Sequence(
                            start_offsets
                                .iter()
                                .map(|offset| {
                                    WithOptionValue::Value(Value::Number(offset.to_string()))
                                })
                                .collect(),
                        )),
                    });
                }
            }

            let reference_client = SourceReferenceClient::Kafka { topic: &topic };
            retrieved_source_references = reference_client.get_source_references().await?;

            format_options = SourceFormatOptions::Kafka { topic };
        }
        CreateSourceConnection::Postgres {
            connection,
            options,
        } => {
            let connection_item = scx.get_item_by_resolved_name(connection)?;
            let connection = match connection_item.connection().map_err(PlanError::from)? {
                Connection::Postgres(connection) => {
                    connection.clone().into_inline_connection(&catalog)
                }
                _ => Err(PgSourcePurificationError::NotPgConnection(
                    scx.catalog.resolve_full_name(connection_item.name()),
                ))?,
            };
            let crate::plan::statement::PgConfigOptionExtracted {
                publication,
                text_columns,
                exclude_columns,
                details,
                ..
            } = options.clone().try_into()?;
            let publication =
                publication.ok_or(PgSourcePurificationError::ConnectionMissingPublication)?;

            // `DETAILS` is generated below; the user may not supply it.
            if details.is_some() {
                Err(PgSourcePurificationError::UserSpecifiedDetails)?;
            }

            let client = connection
                .validate(connection_item.id(), storage_configuration)
                .await?;

            let reference_client = SourceReferenceClient::Postgres {
                client: &client,
                publication: &publication,
                database: &connection.database,
            };
            retrieved_source_references = reference_client.get_source_references().await?;

            let postgres::PurifiedSourceExports {
                source_exports: subsources,
                normalized_text_columns,
            } = postgres::purify_source_exports(
                &client,
                &retrieved_source_references,
                external_references,
                text_columns,
                exclude_columns,
                source_name,
                &reference_policy,
            )
            .await?;

            // Overwrite an existing `TEXT COLUMNS` option with its normalized
            // form; no option is added if the user did not specify one.
            if let Some(text_cols_option) = options
                .iter_mut()
                .find(|option| option.name == PgConfigOptionName::TextColumns)
            {
                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
            }

            requested_subsource_map.extend(subsources);

            // Record the active replication timeline_id to allow detection of a future upstream
            // point-in-time-recovery that will put the source into an error state.
            let timeline_id = mz_postgres_util::get_timeline_id(&client).await?;

            // Record whether the upstream server is a physical replica, which changes how we can determine
            // the latest LSN.
            let is_physical_replica = Some(mz_postgres_util::get_is_in_recovery(&client).await?);

            // Remove any old detail references
            options.retain(|PgConfigOption { name, .. }| name != &PgConfigOptionName::Details);
            let details = PostgresSourcePublicationDetails {
                // Slot name: `materialize_` followed by a fresh UUIDv4 with
                // its dashes removed.
                slot: format!(
                    "materialize_{}",
                    Uuid::new_v4().to_string().replace('-', "")
                ),
                timeline_id: Some(timeline_id),
                database: connection.database,
                is_physical_replica,
            };
            // Store the details as a hex-encoded protobuf string option.
            options.push(PgConfigOption {
                name: PgConfigOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    details.into_proto().encode_to_vec(),
                )))),
            })
        }
        CreateSourceConnection::SqlServer {
            connection,
            options,
        } => {
            // Connect to the upstream SQL Server instance so we can validate
            // we're compatible with CDC.
            let connection_item = scx.get_item_by_resolved_name(connection)?;
            let connection = match connection_item.connection()? {
                Connection::SqlServer(connection) => {
                    connection.clone().into_inline_connection(&catalog)
                }
                _ => Err(SqlServerSourcePurificationError::NotSqlServerConnection(
                    scx.catalog.resolve_full_name(connection_item.name()),
                ))?,
            };
            let crate::plan::statement::ddl::SqlServerConfigOptionExtracted {
                details,
                text_columns,
                exclude_columns,
                seen: _,
            } = options.clone().try_into()?;

            // `DETAILS` is generated below; the user may not supply it.
            if details.is_some() {
                Err(SqlServerSourcePurificationError::UserSpecifiedDetails)?;
            }

            let mut client = connection
                .validate(connection_item.id(), storage_configuration)
                .await?;

            let database: Arc<str> = connection.database.into();
            let reference_client = SourceReferenceClient::SqlServer {
                client: &mut client,
                database: Arc::clone(&database),
            };
            retrieved_source_references = reference_client.get_source_references().await?;
            tracing::debug!(?retrieved_source_references, "got source references");

            // Timeout read from the `MAX_LSN_WAIT` dynamic config and passed
            // on to the export purification below.
            let timeout = mz_storage_types::sources::sql_server::MAX_LSN_WAIT
                .get(storage_configuration.config_set());

            let purified_source_exports = sql_server::purify_source_exports(
                &*database,
                &mut client,
                &retrieved_source_references,
                external_references,
                &text_columns,
                &exclude_columns,
                source_name,
                timeout,
                &reference_policy,
            )
            .await?;

            let sql_server::PurifiedSourceExports {
                source_exports,
                normalized_text_columns,
                normalized_excl_columns,
            } = purified_source_exports;

            // Update our set of requested source exports.
            requested_subsource_map.extend(source_exports);

            // Record the most recent restore_history_id, or none if the system has never been
            // restored.

            let restore_history_id =
                mz_sql_server_util::inspect::get_latest_restore_history_id(&mut client).await?;
            let details = SqlServerSourceExtras { restore_history_id };

            // Replace any existing `DETAILS` option with the generated one,
            // stored as a hex-encoded protobuf string.
            options.retain(|SqlServerConfigOption { name, .. }| {
                name != &SqlServerConfigOptionName::Details
            });
            options.push(SqlServerConfigOption {
                name: SqlServerConfigOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    details.into_proto().encode_to_vec(),
                )))),
            });

            // Update our 'TEXT' and 'EXCLUDE' column options with the purified and normalized set.
            if let Some(text_cols_option) = options
                .iter_mut()
                .find(|option| option.name == SqlServerConfigOptionName::TextColumns)
            {
                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
            }
            if let Some(excl_cols_option) = options
                .iter_mut()
                .find(|option| option.name == SqlServerConfigOptionName::ExcludeColumns)
            {
                excl_cols_option.value = Some(WithOptionValue::Sequence(normalized_excl_columns));
            }
        }
        CreateSourceConnection::MySql {
            connection,
            options,
        } => {
            let connection_item = scx.get_item_by_resolved_name(connection)?;
            let connection = match connection_item.connection()? {
                Connection::MySql(connection) => {
                    connection.clone().into_inline_connection(&catalog)
                }
                _ => Err(MySqlSourcePurificationError::NotMySqlConnection(
                    scx.catalog.resolve_full_name(connection_item.name()),
                ))?,
            };
            let crate::plan::statement::ddl::MySqlConfigOptionExtracted {
                details,
                text_columns,
                exclude_columns,
                seen: _,
            } = options.clone().try_into()?;

            // `DETAILS` is generated below; the user may not supply it.
            if details.is_some() {
                Err(MySqlSourcePurificationError::UserSpecifiedDetails)?;
            }

            let mut conn = connection
                .validate(connection_item.id(), storage_configuration)
                .await
                .map_err(MySqlSourcePurificationError::InvalidConnection)?;

            // Retrieve the current @gtid_executed value of the server to mark as the effective
            // initial snapshot point such that we can ensure consistency if the initial source
            // snapshot is broken up over multiple points in time.
            let initial_gtid_set =
                mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?;

            let binlog_full_metadata = is_binlog_full_metadata(&mut conn).await?;

            let reference_client = SourceReferenceClient::MySql {
                conn: &mut conn,
                include_system_schemas: mysql::references_system_schemas(external_references),
            };
            retrieved_source_references = reference_client.get_source_references().await?;

            let mysql::PurifiedSourceExports {
                source_exports: subsources,
                normalized_text_columns,
                normalized_exclude_columns,
            } = mysql::purify_source_exports(
                &mut conn,
                &retrieved_source_references,
                external_references,
                text_columns,
                exclude_columns,
                source_name,
                initial_gtid_set.clone(),
                &reference_policy,
                binlog_full_metadata,
            )
            .await?;
            requested_subsource_map.extend(subsources);

            // We don't have any fields in this details struct but keep this around for
            // conformity with postgres and in-case we end up needing it again in the future.
            let details = MySqlSourceDetails {};
            // Update options with the purified details
            options
                .retain(|MySqlConfigOption { name, .. }| name != &MySqlConfigOptionName::Details);
            options.push(MySqlConfigOption {
                name: MySqlConfigOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    details.into_proto().encode_to_vec(),
                )))),
            });

            // Overwrite existing `TEXT COLUMNS` / `EXCLUDE COLUMNS` options
            // with their normalized forms; absent options stay absent.
            if let Some(text_cols_option) = options
                .iter_mut()
                .find(|option| option.name == MySqlConfigOptionName::TextColumns)
            {
                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
            }
            if let Some(exclude_cols_option) = options
                .iter_mut()
                .find(|option| option.name == MySqlConfigOptionName::ExcludeColumns)
            {
                exclude_cols_option.value =
                    Some(WithOptionValue::Sequence(normalized_exclude_columns));
            }
        }
        CreateSourceConnection::LoadGenerator { generator, options } => {
            let load_generator =
                load_generator_ast_to_generator(&scx, generator, options, include_metadata)?;

            let reference_client = SourceReferenceClient::LoadGenerator {
                generator: &load_generator,
            };
            retrieved_source_references = reference_client.get_source_references().await?;
            // Filter to the references that need to be created as 'subsources', which
            // doesn't include the default output for single-output sources.
            // TODO(database-issues#8620): Remove once subsources are removed
            let multi_output_sources =
                retrieved_source_references
                    .all_references()
                    .iter()
                    .any(|r| {
                        matches!(
                            r.load_generator_output(),
                            Some(output) if output != &LoadGeneratorOutput::Default
                        )
                    });

            // Arm order matters: the policy check comes first, then the
            // single-output rejection, then the actual export resolution.
            match external_references {
                Some(requested)
                    if matches!(reference_policy, SourceReferencePolicy::NotAllowed) =>
                {
                    Err(PlanError::UseTablesForSources(requested.to_string()))?
                }
                // A generator with only the default output has nothing to
                // reference, so every kind of reference clause is an error.
                Some(requested) if !multi_output_sources => match requested {
                    ExternalReferences::SubsetTables(_) => {
                        Err(LoadGeneratorSourcePurificationError::ForTables)?
                    }
                    ExternalReferences::SubsetSchemas(_) => {
                        Err(LoadGeneratorSourcePurificationError::ForSchemas)?
                    }
                    ExternalReferences::All => {
                        Err(LoadGeneratorSourcePurificationError::ForAllTables)?
                    }
                },
                Some(requested) => {
                    let requested_exports = retrieved_source_references
                        .requested_source_exports(Some(requested), source_name)?;
                    for export in requested_exports {
                        requested_subsource_map.insert(
                            export.name,
                            PurifiedSourceExport {
                                external_reference: export.external_reference,
                                details: PurifiedExportDetails::LoadGenerator {
                                    table: export
                                        .meta
                                        .load_generator_desc()
                                        .ok_or_else(|| {
                                            internal_err!(
                                                "expected load generator source reference"
                                            )
                                        })?
                                        .clone(),
                                    output: export
                                        .meta
                                        .load_generator_output()
                                        .ok_or_else(|| {
                                            internal_err!(
                                                "expected load generator source reference"
                                            )
                                        })?
                                        .clone(),
                                },
                            },
                        );
                    }
                }
                None => {
                    // Omitting references on a multi-output generator is only
                    // an error when the policy requires references.
                    if multi_output_sources
                        && matches!(reference_policy, SourceReferencePolicy::Required)
                    {
                        Err(LoadGeneratorSourcePurificationError::MultiOutputRequiresForAllTables)?
                    }
                }
            }

            // For the `Clock` generator, add an `AS OF` option if the user
            // did not provide one.
            if let LoadGenerator::Clock = generator {
                if !options
                    .iter()
                    .any(|p| p.name == LoadGeneratorOptionName::AsOf)
                {
                    // NOTE: this `now` shadows the function's `now` parameter
                    // and is taken from `catalog.now()` instead.
                    let now = catalog.now();
                    options.push(LoadGeneratorOption {
                        name: LoadGeneratorOptionName::AsOf,
                        value: Some(WithOptionValue::Value(Value::Number(now.to_string()))),
                    });
                }
            }
        }
    }

    // Now that we know which subsources to create alongside this
    // statement, remove the references so it is not canonicalized as
    // part of the `CREATE SOURCE` statement in the catalog.
    *external_references = None;

    // Generate progress subsource for old syntax
    let create_progress_subsource_stmt = if uses_old_syntax {
        // Take name from input or generate name
        let name = match progress_subsource {
            Some(name) => match name {
                DeferredItemName::Deferred(name) => name.clone(),
                // Already checked for this value above.
                DeferredItemName::Named(_) => {
                    sql_bail!("progress subsource name cannot be a resolved name")
                }
            },
            None => {
                // Derive `<source item name>_progress` (or a variant of it
                // accepted by the closure below) under the source's own
                // qualifiers.
                let (item, prefix) = source_name
                    .0
                    .split_last()
                    .ok_or_else(|| sql_err!("source name must have at least one component"))?;
                let item_name =
                    Ident::try_generate_name(item.to_string(), "_progress", |candidate| {
                        let mut suggested_name = prefix.to_vec();
                        suggested_name.push(candidate.clone());

                        let partial =
                            normalize::unresolved_item_name(UnresolvedItemName(suggested_name))?;
                        let qualified = scx.allocate_qualified_name(partial)?;
                        // A candidate is acceptable only if neither an item
                        // nor a type with that name already exists.
                        let item_exists = scx.catalog.get_item_by_name(&qualified).is_some();
                        let type_exists = scx.catalog.get_type_by_name(&qualified).is_some();
                        Ok::<_, PlanError>(!item_exists && !type_exists)
                    })?;

                let mut full_name = prefix.to_vec();
                full_name.push(item_name);
                let full_name = normalize::unresolved_item_name(UnresolvedItemName(full_name))?;
                let qualified_name = scx.allocate_qualified_name(full_name)?;
                let full_name = scx.catalog.resolve_full_name(&qualified_name);

                UnresolvedItemName::from(full_name.clone())
            }
        };

        let (columns, constraints) = scx.relation_desc_into_table_defs(progress_desc)?;

        // Create the subsource statement. Of the source's options only
        // `RETAIN HISTORY` is carried over; `TIMESTAMP INTERVAL` is dropped.
        let mut progress_with_options: Vec<_> = with_options
            .iter()
            .filter_map(|opt| match opt.name {
                CreateSourceOptionName::TimestampInterval => None,
                CreateSourceOptionName::RetainHistory => Some(CreateSubsourceOption {
                    name: CreateSubsourceOptionName::RetainHistory,
                    value: opt.value.clone(),
                }),
            })
            .collect();
        // Mark the subsource as a progress subsource.
        progress_with_options.push(CreateSubsourceOption {
            name: CreateSubsourceOptionName::Progress,
            value: Some(WithOptionValue::Value(Value::Boolean(true))),
        });

        Some(CreateSubsourceStatement {
            name,
            columns,
            // Progress subsources do not refer to the source to which they belong.
            // Instead the primary source depends on it (the opposite is true of
            // ingestion exports, which depend on the primary source).
            of_source: None,
            constraints,
            if_not_exists: false,
            with_options: progress_with_options,
        })
    } else {
        None
    };

    // Purify the statement's format and envelope in place; `format_options`
    // carries the Kafka topic when the source is a Kafka source.
    purify_source_format(
        &catalog,
        format,
        &format_options,
        envelope,
        storage_configuration,
    )
    .await?;

    Ok(PurifiedStatement::PurifiedCreateSource {
        create_progress_subsource_stmt,
        create_source_stmt,
        subsources: requested_subsource_map,
        available_source_references: retrieved_source_references.available_source_references(),
    })
}
1359
1360/// On success, returns the details on new subsources and updated
1361/// 'options' that sequencing expects for handling `ALTER SOURCE` statements.
1362async fn purify_alter_source(
1363    catalog: impl SessionCatalog,
1364    stmt: AlterSourceStatement<Aug>,
1365    storage_configuration: &StorageConfiguration,
1366) -> Result<PurifiedStatement, PlanError> {
1367    let scx = StatementContext::new(None, &catalog);
1368    let AlterSourceStatement {
1369        source_name: unresolved_source_name,
1370        action,
1371        if_exists,
1372    } = stmt;
1373
1374    // Get name.
1375    let item = match scx.resolve_item(RawItemName::Name(unresolved_source_name.clone())) {
1376        Ok(item) => item,
1377        Err(_) if if_exists => {
1378            return Ok(PurifiedStatement::PurifiedAlterSource {
1379                alter_source_stmt: AlterSourceStatement {
1380                    source_name: unresolved_source_name,
1381                    action,
1382                    if_exists,
1383                },
1384            });
1385        }
1386        Err(e) => return Err(e),
1387    };
1388
1389    // Ensure it's an ingestion-based and alterable source.
1390    let desc = match item.source_desc()? {
1391        Some(desc) => desc.clone().into_inline_connection(scx.catalog),
1392        None => {
1393            sql_bail!("cannot ALTER this type of source")
1394        }
1395    };
1396
1397    let source_name = item.name();
1398
1399    let resolved_source_name = ResolvedItemName::Item {
1400        id: item.id(),
1401        qualifiers: item.name().qualifiers.clone(),
1402        full_name: scx.catalog.resolve_full_name(source_name),
1403        print_id: true,
1404        version: RelationVersionSelector::Latest,
1405    };
1406
1407    let partial_name = scx.catalog.minimal_qualification(source_name);
1408
1409    match action {
1410        AlterSourceAction::AddSubsources {
1411            external_references,
1412            options,
1413        } => {
1414            if scx.catalog.system_vars().enable_create_table_from_source()
1415                && scx.catalog.system_vars().force_source_table_syntax()
1416            {
1417                Err(PlanError::UseTablesForSources(
1418                    "ALTER SOURCE .. ADD SUBSOURCES ..".to_string(),
1419                ))?;
1420            }
1421
1422            purify_alter_source_add_subsources(
1423                external_references,
1424                options,
1425                desc,
1426                partial_name,
1427                unresolved_source_name,
1428                resolved_source_name,
1429                storage_configuration,
1430            )
1431            .await
1432        }
1433        AlterSourceAction::RefreshReferences => {
1434            purify_alter_source_refresh_references(
1435                desc,
1436                resolved_source_name,
1437                storage_configuration,
1438            )
1439            .await
1440        }
1441        _ => Ok(PurifiedStatement::PurifiedAlterSource {
1442            alter_source_stmt: AlterSourceStatement {
1443                source_name: unresolved_source_name,
1444                action,
1445                if_exists,
1446            },
1447        }),
1448    }
1449}
1450
1451// TODO(database-issues#8620): Remove once subsources are removed
1452/// Equivalent to `purify_create_source` but for `AlterSourceStatement`.
1453async fn purify_alter_source_add_subsources(
1454    external_references: Vec<ExternalReferenceExport>,
1455    mut options: Vec<AlterSourceAddSubsourceOption<Aug>>,
1456    desc: SourceDesc,
1457    partial_source_name: PartialItemName,
1458    unresolved_source_name: UnresolvedItemName,
1459    resolved_source_name: ResolvedItemName,
1460    storage_configuration: &StorageConfiguration,
1461) -> Result<PurifiedStatement, PlanError> {
1462    // Validate this is a source that can have subsources added.
1463    let connection_id = match &desc.connection {
1464        GenericSourceConnection::Postgres(c) => c.connection_id,
1465        GenericSourceConnection::MySql(c) => c.connection_id,
1466        GenericSourceConnection::SqlServer(c) => c.connection_id,
1467        _ => sql_bail!(
1468            "source {} does not support ALTER SOURCE.",
1469            partial_source_name
1470        ),
1471    };
1472
1473    let crate::plan::statement::ddl::AlterSourceAddSubsourceOptionExtracted {
1474        text_columns,
1475        exclude_columns,
1476        details,
1477        seen: _,
1478    } = options.clone().try_into()?;
1479    if details.is_some() {
1480        sql_bail!("DETAILS option cannot be explicitly set");
1481    }
1482
1483    let mut requested_subsource_map = BTreeMap::new();
1484
1485    match desc.connection {
1486        GenericSourceConnection::Postgres(pg_source_connection) => {
1487            // Get PostgresConnection for generating subsources.
1488            let pg_connection = &pg_source_connection.connection;
1489
1490            let client = pg_connection
1491                .validate(connection_id, storage_configuration)
1492                .await?;
1493
1494            let reference_client = SourceReferenceClient::Postgres {
1495                client: &client,
1496                publication: &pg_source_connection.publication,
1497                database: &pg_connection.database,
1498            };
1499            let retrieved_source_references = reference_client.get_source_references().await?;
1500
1501            let postgres::PurifiedSourceExports {
1502                source_exports: subsources,
1503                normalized_text_columns,
1504            } = postgres::purify_source_exports(
1505                &client,
1506                &retrieved_source_references,
1507                &Some(ExternalReferences::SubsetTables(external_references)),
1508                text_columns,
1509                exclude_columns,
1510                &unresolved_source_name,
1511                &SourceReferencePolicy::Required,
1512            )
1513            .await?;
1514
1515            if let Some(text_cols_option) = options
1516                .iter_mut()
1517                .find(|option| option.name == AlterSourceAddSubsourceOptionName::TextColumns)
1518            {
1519                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
1520            }
1521
1522            requested_subsource_map.extend(subsources);
1523        }
1524        GenericSourceConnection::MySql(mysql_source_connection) => {
1525            let mysql_connection = &mysql_source_connection.connection;
1526            let config = mysql_connection
1527                .config(
1528                    &storage_configuration.connection_context.secrets_reader,
1529                    storage_configuration,
1530                    InTask::No,
1531                )
1532                .await?;
1533
1534            let mut conn = config
1535                .connect(
1536                    "mysql purification",
1537                    &storage_configuration.connection_context.ssh_tunnel_manager,
1538                )
1539                .await?;
1540
1541            // Retrieve the current @gtid_executed value of the server to mark as the effective
1542            // initial snapshot point for these subsources.
1543            let initial_gtid_set =
1544                mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?;
1545
1546            let binlog_full_metadata = is_binlog_full_metadata(&mut conn).await?;
1547
1548            let requested_references = Some(ExternalReferences::SubsetTables(external_references));
1549
1550            let reference_client = SourceReferenceClient::MySql {
1551                conn: &mut conn,
1552                include_system_schemas: mysql::references_system_schemas(&requested_references),
1553            };
1554            let retrieved_source_references = reference_client.get_source_references().await?;
1555
1556            let mysql::PurifiedSourceExports {
1557                source_exports: subsources,
1558                normalized_text_columns,
1559                normalized_exclude_columns,
1560            } = mysql::purify_source_exports(
1561                &mut conn,
1562                &retrieved_source_references,
1563                &requested_references,
1564                text_columns,
1565                exclude_columns,
1566                &unresolved_source_name,
1567                initial_gtid_set,
1568                &SourceReferencePolicy::Required,
1569                binlog_full_metadata,
1570            )
1571            .await?;
1572            requested_subsource_map.extend(subsources);
1573
1574            // Update options with the purified details
1575            if let Some(text_cols_option) = options
1576                .iter_mut()
1577                .find(|option| option.name == AlterSourceAddSubsourceOptionName::TextColumns)
1578            {
1579                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
1580            }
1581            if let Some(exclude_cols_option) = options
1582                .iter_mut()
1583                .find(|option| option.name == AlterSourceAddSubsourceOptionName::ExcludeColumns)
1584            {
1585                exclude_cols_option.value =
1586                    Some(WithOptionValue::Sequence(normalized_exclude_columns));
1587            }
1588        }
1589        GenericSourceConnection::SqlServer(sql_server_source) => {
1590            // Open a connection to the upstream SQL Server instance.
1591            let sql_server_connection = &sql_server_source.connection;
1592            let config = sql_server_connection
1593                .resolve_config(
1594                    &storage_configuration.connection_context.secrets_reader,
1595                    storage_configuration,
1596                    InTask::No,
1597                )
1598                .await?;
1599            let mut client = mz_sql_server_util::Client::connect(config).await?;
1600
1601            // Query the upstream SQL Server instance for available tables to replicate.
1602            let database = sql_server_connection.database.clone().into();
1603            let source_references = SourceReferenceClient::SqlServer {
1604                client: &mut client,
1605                database: Arc::clone(&database),
1606            }
1607            .get_source_references()
1608            .await?;
1609            let requested_references = Some(ExternalReferences::SubsetTables(external_references));
1610
1611            let timeout = mz_storage_types::sources::sql_server::MAX_LSN_WAIT
1612                .get(storage_configuration.config_set());
1613
1614            let result = sql_server::purify_source_exports(
1615                &*database,
1616                &mut client,
1617                &source_references,
1618                &requested_references,
1619                &text_columns,
1620                &exclude_columns,
1621                &unresolved_source_name,
1622                timeout,
1623                &SourceReferencePolicy::Required,
1624            )
1625            .await;
1626            let sql_server::PurifiedSourceExports {
1627                source_exports,
1628                normalized_text_columns,
1629                normalized_excl_columns,
1630            } = result?;
1631
1632            // Add the new exports to our subsource map.
1633            requested_subsource_map.extend(source_exports);
1634
1635            // Update options on the CREATE SOURCE statement with the purified details.
1636            if let Some(text_cols_option) = options
1637                .iter_mut()
1638                .find(|option| option.name == AlterSourceAddSubsourceOptionName::TextColumns)
1639            {
1640                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
1641            }
1642            if let Some(exclude_cols_option) = options
1643                .iter_mut()
1644                .find(|option| option.name == AlterSourceAddSubsourceOptionName::ExcludeColumns)
1645            {
1646                exclude_cols_option.value =
1647                    Some(WithOptionValue::Sequence(normalized_excl_columns));
1648            }
1649        }
1650        // The source kind was already established earlier in this function;
1651        // reaching this catch-all is an internal invariant violation.
1652        _ => bail_internal!("source does not support ALTER SOURCE...ADD SUBSOURCE"),
1653    };
1654
1655    Ok(PurifiedStatement::PurifiedAlterSourceAddSubsources {
1656        source_name: resolved_source_name,
1657        options,
1658        subsources: requested_subsource_map,
1659    })
1660}
1661
1662async fn purify_alter_source_refresh_references(
1663    desc: SourceDesc,
1664    resolved_source_name: ResolvedItemName,
1665    storage_configuration: &StorageConfiguration,
1666) -> Result<PurifiedStatement, PlanError> {
1667    let retrieved_source_references = match desc.connection {
1668        GenericSourceConnection::Postgres(pg_source_connection) => {
1669            // Get PostgresConnection for generating subsources.
1670            let pg_connection = &pg_source_connection.connection;
1671
1672            let config = pg_connection
1673                .config(
1674                    &storage_configuration.connection_context.secrets_reader,
1675                    storage_configuration,
1676                    InTask::No,
1677                )
1678                .await?;
1679
1680            let client = config
1681                .connect(
1682                    "postgres_purification",
1683                    &storage_configuration.connection_context.ssh_tunnel_manager,
1684                )
1685                .await?;
1686            let reference_client = SourceReferenceClient::Postgres {
1687                client: &client,
1688                publication: &pg_source_connection.publication,
1689                database: &pg_connection.database,
1690            };
1691            reference_client.get_source_references().await?
1692        }
1693        GenericSourceConnection::MySql(mysql_source_connection) => {
1694            let mysql_connection = &mysql_source_connection.connection;
1695            let config = mysql_connection
1696                .config(
1697                    &storage_configuration.connection_context.secrets_reader,
1698                    storage_configuration,
1699                    InTask::No,
1700                )
1701                .await?;
1702
1703            let mut conn = config
1704                .connect(
1705                    "mysql purification",
1706                    &storage_configuration.connection_context.ssh_tunnel_manager,
1707                )
1708                .await?;
1709
1710            let reference_client = SourceReferenceClient::MySql {
1711                conn: &mut conn,
1712                include_system_schemas: false,
1713            };
1714            reference_client.get_source_references().await?
1715        }
1716        GenericSourceConnection::SqlServer(sql_server_source) => {
1717            // Open a connection to the upstream SQL Server instance.
1718            let sql_server_connection = &sql_server_source.connection;
1719            let config = sql_server_connection
1720                .resolve_config(
1721                    &storage_configuration.connection_context.secrets_reader,
1722                    storage_configuration,
1723                    InTask::No,
1724                )
1725                .await?;
1726            let mut client = mz_sql_server_util::Client::connect(config).await?;
1727
1728            // Query the upstream SQL Server instance for available tables to replicate.
1729            let source_references = SourceReferenceClient::SqlServer {
1730                client: &mut client,
1731                database: sql_server_connection.database.clone().into(),
1732            }
1733            .get_source_references()
1734            .await?;
1735            source_references
1736        }
1737        GenericSourceConnection::LoadGenerator(load_gen_connection) => {
1738            let reference_client = SourceReferenceClient::LoadGenerator {
1739                generator: &load_gen_connection.load_generator,
1740            };
1741            reference_client.get_source_references().await?
1742        }
1743        GenericSourceConnection::Kafka(kafka_conn) => {
1744            let reference_client = SourceReferenceClient::Kafka {
1745                topic: &kafka_conn.topic,
1746            };
1747            reference_client.get_source_references().await?
1748        }
1749    };
1750    Ok(PurifiedStatement::PurifiedAlterSourceRefreshReferences {
1751        source_name: resolved_source_name,
1752        available_source_references: retrieved_source_references.available_source_references(),
1753    })
1754}
1755
1756async fn purify_create_table_from_source(
1757    catalog: impl SessionCatalog,
1758    mut stmt: CreateTableFromSourceStatement<Aug>,
1759    storage_configuration: &StorageConfiguration,
1760) -> Result<PurifiedStatement, PlanError> {
1761    let scx = StatementContext::new(None, &catalog);
1762    let CreateTableFromSourceStatement {
1763        name: _,
1764        columns,
1765        constraints,
1766        source: source_name,
1767        if_not_exists: _,
1768        external_reference,
1769        format,
1770        envelope,
1771        include_metadata: _,
1772        with_options,
1773    } = &mut stmt;
1774
1775    // Columns and constraints cannot be specified by the user but will be populated below.
1776    if matches!(columns, TableFromSourceColumns::Defined(_)) {
1777        sql_bail!("CREATE TABLE .. FROM SOURCE column definitions cannot be specified directly");
1778    }
1779    if !constraints.is_empty() {
1780        sql_bail!(
1781            "CREATE TABLE .. FROM SOURCE constraint definitions cannot be specified directly"
1782        );
1783    }
1784
1785    // Get the source item
1786    let item = match scx.get_item_by_resolved_name(source_name) {
1787        Ok(item) => item,
1788        Err(e) => return Err(e),
1789    };
1790
1791    // Ensure it's an ingestion-based and alterable source.
1792    let desc = match item.source_desc()? {
1793        Some(desc) => desc.clone().into_inline_connection(scx.catalog),
1794        None => {
1795            sql_bail!("cannot ALTER this type of source")
1796        }
1797    };
1798    let unresolved_source_name: UnresolvedItemName = source_name.full_item_name().clone().into();
1799
1800    let crate::plan::statement::ddl::TableFromSourceOptionExtracted {
1801        text_columns,
1802        exclude_columns,
1803        retain_history: _,
1804        details,
1805        partition_by: _,
1806        seen: _,
1807    } = with_options.clone().try_into()?;
1808    if details.is_some() {
1809        sql_bail!("DETAILS option cannot be explicitly set");
1810    }
1811
1812    // Our text column values are unqualified (just column names), but the purification methods below
1813    // expect to match the fully-qualified names against the full set of tables in upstream, so we
1814    // need to qualify them using the external reference first.
1815    let qualified_text_columns = text_columns
1816        .iter()
1817        .map(|col| {
1818            UnresolvedItemName(
1819                external_reference
1820                    .as_ref()
1821                    .map(|er| er.0.iter().chain_one(col).map(|i| i.clone()).collect())
1822                    .unwrap_or_else(|| vec![col.clone()]),
1823            )
1824        })
1825        .collect_vec();
1826    let qualified_exclude_columns = exclude_columns
1827        .iter()
1828        .map(|col| {
1829            UnresolvedItemName(
1830                external_reference
1831                    .as_ref()
1832                    .map(|er| er.0.iter().chain_one(col).map(|i| i.clone()).collect())
1833                    .unwrap_or_else(|| vec![col.clone()]),
1834            )
1835        })
1836        .collect_vec();
1837
1838    // Should be overriden below if a source-specific format is required.
1839    let mut format_options = SourceFormatOptions::Default;
1840
1841    let retrieved_source_references: RetrievedSourceReferences;
1842
1843    let requested_references = external_reference.as_ref().map(|ref_name| {
1844        ExternalReferences::SubsetTables(vec![ExternalReferenceExport {
1845            reference: ref_name.clone(),
1846            alias: None,
1847        }])
1848    });
1849
1850    // Run purification work specific to each source type: resolve the external reference to
1851    // a fully qualified name and obtain the appropriate details for the source-export statement
1852    let purified_export = match desc.connection {
1853        GenericSourceConnection::Postgres(pg_source_connection) => {
1854            // Get PostgresConnection for generating subsources.
1855            let pg_connection = &pg_source_connection.connection;
1856
1857            let client = pg_connection
1858                .validate(pg_source_connection.connection_id, storage_configuration)
1859                .await?;
1860
1861            let reference_client = SourceReferenceClient::Postgres {
1862                client: &client,
1863                publication: &pg_source_connection.publication,
1864                database: &pg_connection.database,
1865            };
1866            retrieved_source_references = reference_client.get_source_references().await?;
1867
1868            let postgres::PurifiedSourceExports {
1869                source_exports,
1870                // TODO(database-issues#8620): Remove once subsources are removed
1871                // This `normalized_text_columns` is not relevant for us and is only returned for
1872                // `CREATE SOURCE` statements that automatically generate subsources
1873                normalized_text_columns: _,
1874            } = postgres::purify_source_exports(
1875                &client,
1876                &retrieved_source_references,
1877                &requested_references,
1878                qualified_text_columns,
1879                qualified_exclude_columns,
1880                &unresolved_source_name,
1881                &SourceReferencePolicy::Required,
1882            )
1883            .await?;
1884            // There should be exactly one source_export returned for this statement
1885            let (_, purified_export) = source_exports.into_element();
1886            purified_export
1887        }
1888        GenericSourceConnection::MySql(mysql_source_connection) => {
1889            let mysql_connection = &mysql_source_connection.connection;
1890            let config = mysql_connection
1891                .config(
1892                    &storage_configuration.connection_context.secrets_reader,
1893                    storage_configuration,
1894                    InTask::No,
1895                )
1896                .await?;
1897
1898            let mut conn = config
1899                .connect(
1900                    "mysql purification",
1901                    &storage_configuration.connection_context.ssh_tunnel_manager,
1902                )
1903                .await?;
1904
1905            ensure_binlog_full_metadata(&mut conn).await?;
1906            let binlog_full_metadata = true;
1907
1908            // Retrieve the current @gtid_executed value of the server to mark as the effective
1909            // initial snapshot point for this table.
1910            let initial_gtid_set =
1911                mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?;
1912
1913            let reference_client = SourceReferenceClient::MySql {
1914                conn: &mut conn,
1915                include_system_schemas: mysql::references_system_schemas(&requested_references),
1916            };
1917            retrieved_source_references = reference_client.get_source_references().await?;
1918
1919            let mysql::PurifiedSourceExports {
1920                source_exports,
1921                // TODO(database-issues#8620): Remove once subsources are removed
1922                // `normalized_text/exclude_columns` is not relevant for us and is only returned for
1923                // `CREATE SOURCE` statements that automatically generate subsources
1924                normalized_text_columns: _,
1925                normalized_exclude_columns: _,
1926            } = mysql::purify_source_exports(
1927                &mut conn,
1928                &retrieved_source_references,
1929                &requested_references,
1930                qualified_text_columns,
1931                qualified_exclude_columns,
1932                &unresolved_source_name,
1933                initial_gtid_set,
1934                &SourceReferencePolicy::Required,
1935                binlog_full_metadata,
1936            )
1937            .await?;
1938            // There should be exactly one source_export returned for this statement
1939            let (_, purified_export) = source_exports.into_element();
1940            purified_export
1941        }
1942        GenericSourceConnection::SqlServer(sql_server_source) => {
1943            let connection = sql_server_source.connection;
1944            let config = connection
1945                .resolve_config(
1946                    &storage_configuration.connection_context.secrets_reader,
1947                    storage_configuration,
1948                    InTask::No,
1949                )
1950                .await?;
1951            let mut client = mz_sql_server_util::Client::connect(config).await?;
1952
1953            let database: Arc<str> = connection.database.into();
1954            let reference_client = SourceReferenceClient::SqlServer {
1955                client: &mut client,
1956                database: Arc::clone(&database),
1957            };
1958            retrieved_source_references = reference_client.get_source_references().await?;
1959            tracing::debug!(?retrieved_source_references, "got source references");
1960
1961            let timeout = mz_storage_types::sources::sql_server::MAX_LSN_WAIT
1962                .get(storage_configuration.config_set());
1963
1964            let purified_source_exports = sql_server::purify_source_exports(
1965                &*database,
1966                &mut client,
1967                &retrieved_source_references,
1968                &requested_references,
1969                &qualified_text_columns,
1970                &qualified_exclude_columns,
1971                &unresolved_source_name,
1972                timeout,
1973                &SourceReferencePolicy::Required,
1974            )
1975            .await?;
1976
1977            // There should be exactly one source_export returned for this statement
1978            let (_, purified_export) = purified_source_exports.source_exports.into_element();
1979            purified_export
1980        }
1981        GenericSourceConnection::LoadGenerator(load_gen_connection) => {
1982            let reference_client = SourceReferenceClient::LoadGenerator {
1983                generator: &load_gen_connection.load_generator,
1984            };
1985            retrieved_source_references = reference_client.get_source_references().await?;
1986
1987            let requested_exports = retrieved_source_references
1988                .requested_source_exports(requested_references.as_ref(), &unresolved_source_name)?;
1989            // There should be exactly one source_export returned
1990            let export = requested_exports.into_element();
1991            PurifiedSourceExport {
1992                external_reference: export.external_reference,
1993                details: PurifiedExportDetails::LoadGenerator {
1994                    table: export
1995                        .meta
1996                        .load_generator_desc()
1997                        .ok_or_else(|| internal_err!("expected load generator source reference"))?
1998                        .clone(),
1999                    output: export
2000                        .meta
2001                        .load_generator_output()
2002                        .ok_or_else(|| internal_err!("expected load generator source reference"))?
2003                        .clone(),
2004                },
2005            }
2006        }
2007        GenericSourceConnection::Kafka(kafka_conn) => {
2008            let reference_client = SourceReferenceClient::Kafka {
2009                topic: &kafka_conn.topic,
2010            };
2011            retrieved_source_references = reference_client.get_source_references().await?;
2012            let requested_exports = retrieved_source_references
2013                .requested_source_exports(requested_references.as_ref(), &unresolved_source_name)?;
2014            // There should be exactly one source_export returned
2015            let export = requested_exports.into_element();
2016
2017            format_options = SourceFormatOptions::Kafka {
2018                topic: kafka_conn.topic.clone(),
2019            };
2020            PurifiedSourceExport {
2021                external_reference: export.external_reference,
2022                details: PurifiedExportDetails::Kafka {},
2023            }
2024        }
2025    };
2026
2027    purify_source_format(
2028        &catalog,
2029        format,
2030        &format_options,
2031        envelope,
2032        storage_configuration,
2033    )
2034    .await?;
2035
2036    // Update the external reference in the statement to the resolved fully-qualified
2037    // external reference
2038    *external_reference = Some(purified_export.external_reference.clone());
2039
2040    // Update options in the statement using the purified export details
2041    match &purified_export.details {
2042        PurifiedExportDetails::Postgres { .. } => {
2043            let mut unsupported_cols = vec![];
2044            let postgres::PostgresExportStatementValues {
2045                columns: gen_columns,
2046                constraints: gen_constraints,
2047                text_columns: gen_text_columns,
2048                exclude_columns: gen_exclude_columns,
2049                details: gen_details,
2050                external_reference: _,
2051            } = postgres::generate_source_export_statement_values(
2052                &scx,
2053                purified_export,
2054                &mut unsupported_cols,
2055            )?;
2056            if !unsupported_cols.is_empty() {
2057                unsupported_cols.sort();
2058                Err(PgSourcePurificationError::UnrecognizedTypes {
2059                    cols: unsupported_cols,
2060                })?;
2061            }
2062
2063            if let Some(text_cols_option) = with_options
2064                .iter_mut()
2065                .find(|option| option.name == TableFromSourceOptionName::TextColumns)
2066            {
2067                if let Some(gen_text_columns) = gen_text_columns {
2068                    text_cols_option.value = Some(WithOptionValue::Sequence(gen_text_columns));
2069                }
2070            }
2071            if let Some(exclude_cols_option) = with_options
2072                .iter_mut()
2073                .find(|option| option.name == TableFromSourceOptionName::ExcludeColumns)
2074            {
2075                if let Some(gen_exclude_columns) = gen_exclude_columns {
2076                    exclude_cols_option.value =
2077                        Some(WithOptionValue::Sequence(gen_exclude_columns));
2078                }
2079            }
2080            match columns {
2081                TableFromSourceColumns::Defined(_) => {
2082                    // Purification is what populates the `Defined` variant;
2083                    // reaching this is an internal invariant violation.
2084                    bail_internal!(
2085                        "column definitions cannot be explicitly set for this source type"
2086                    )
2087                }
2088                TableFromSourceColumns::NotSpecified => {
2089                    *columns = TableFromSourceColumns::Defined(gen_columns);
2090                    *constraints = gen_constraints;
2091                }
2092                TableFromSourceColumns::Named(_) => {
2093                    sql_bail!("columns cannot be named for Postgres sources")
2094                }
2095            }
2096            with_options.push(TableFromSourceOption {
2097                name: TableFromSourceOptionName::Details,
2098                value: Some(WithOptionValue::Value(Value::String(hex::encode(
2099                    gen_details.into_proto().encode_to_vec(),
2100                )))),
2101            })
2102        }
2103        PurifiedExportDetails::MySql { .. } => {
2104            let mysql::MySqlExportStatementValues {
2105                columns: gen_columns,
2106                constraints: gen_constraints,
2107                text_columns: gen_text_columns,
2108                exclude_columns: gen_exclude_columns,
2109                details: gen_details,
2110                external_reference: _,
2111            } = mysql::generate_source_export_statement_values(&scx, purified_export)?;
2112
2113            if let Some(text_cols_option) = with_options
2114                .iter_mut()
2115                .find(|option| option.name == TableFromSourceOptionName::TextColumns)
2116            {
2117                if let Some(gen_text_columns) = gen_text_columns {
2118                    text_cols_option.value = Some(WithOptionValue::Sequence(gen_text_columns));
2119                }
2120            }
2121            if let Some(exclude_cols_option) = with_options
2122                .iter_mut()
2123                .find(|option| option.name == TableFromSourceOptionName::ExcludeColumns)
2124            {
2125                if let Some(gen_exclude_columns) = gen_exclude_columns {
2126                    exclude_cols_option.value =
2127                        Some(WithOptionValue::Sequence(gen_exclude_columns));
2128                }
2129            }
2130            match columns {
2131                TableFromSourceColumns::Defined(_) => {
2132                    // Purification is what populates the `Defined` variant;
2133                    // reaching this is an internal invariant violation.
2134                    bail_internal!(
2135                        "column definitions cannot be explicitly set for this source type"
2136                    )
2137                }
2138                TableFromSourceColumns::NotSpecified => {
2139                    *columns = TableFromSourceColumns::Defined(gen_columns);
2140                    *constraints = gen_constraints;
2141                }
2142                TableFromSourceColumns::Named(_) => {
2143                    sql_bail!("columns cannot be named for MySQL sources")
2144                }
2145            }
2146            with_options.push(TableFromSourceOption {
2147                name: TableFromSourceOptionName::Details,
2148                value: Some(WithOptionValue::Value(Value::String(hex::encode(
2149                    gen_details.into_proto().encode_to_vec(),
2150                )))),
2151            })
2152        }
2153        PurifiedExportDetails::SqlServer { .. } => {
2154            let sql_server::SqlServerExportStatementValues {
2155                columns: gen_columns,
2156                constraints: gen_constraints,
2157                text_columns: gen_text_columns,
2158                excl_columns: gen_excl_columns,
2159                details: gen_details,
2160                external_reference: _,
2161            } = sql_server::generate_source_export_statement_values(&scx, purified_export)?;
2162
2163            if let Some(text_cols_option) = with_options
2164                .iter_mut()
2165                .find(|opt| opt.name == TableFromSourceOptionName::TextColumns)
2166            {
2167                if let Some(gen_text_columns) = gen_text_columns {
2168                    text_cols_option.value = Some(WithOptionValue::Sequence(gen_text_columns));
2169                }
2170            }
2171            if let Some(exclude_cols_option) = with_options
2172                .iter_mut()
2173                .find(|opt| opt.name == TableFromSourceOptionName::ExcludeColumns)
2174            {
2175                if let Some(gen_excl_columns) = gen_excl_columns {
2176                    exclude_cols_option.value = Some(WithOptionValue::Sequence(gen_excl_columns));
2177                }
2178            }
2179
2180            match columns {
2181                TableFromSourceColumns::NotSpecified => {
2182                    *columns = TableFromSourceColumns::Defined(gen_columns);
2183                    *constraints = gen_constraints;
2184                }
2185                TableFromSourceColumns::Named(_) => {
2186                    sql_bail!("columns cannot be named for SQL Server sources")
2187                }
2188                TableFromSourceColumns::Defined(_) => {
2189                    // Purification is what populates the `Defined` variant;
2190                    // reaching this is an internal invariant violation.
2191                    bail_internal!(
2192                        "column definitions cannot be explicitly set for this source type"
2193                    )
2194                }
2195            }
2196
2197            with_options.push(TableFromSourceOption {
2198                name: TableFromSourceOptionName::Details,
2199                value: Some(WithOptionValue::Value(Value::String(hex::encode(
2200                    gen_details.into_proto().encode_to_vec(),
2201                )))),
2202            })
2203        }
2204        PurifiedExportDetails::LoadGenerator { .. } => {
2205            let (desc, output) = match purified_export.details {
2206                PurifiedExportDetails::LoadGenerator { table, output } => (table, output),
2207                _ => bail_internal!("purified export details must be load generator"),
2208            };
2209            // We only determine the table description for multi-output load generator sources here,
2210            // whereas single-output load generators will have their relation description
2211            // determined during statment planning as envelope and format options may affect their
2212            // schema.
2213            if let Some(desc) = desc {
2214                let (gen_columns, gen_constraints) = scx.relation_desc_into_table_defs(&desc)?;
2215                match columns {
2216                    // Purification is what populates the `Defined` variant;
2217                    // reaching this is an internal invariant violation.
2218                    TableFromSourceColumns::Defined(_) => bail_internal!(
2219                        "column definitions cannot be explicitly set for this source type"
2220                    ),
2221                    TableFromSourceColumns::NotSpecified => {
2222                        *columns = TableFromSourceColumns::Defined(gen_columns);
2223                        *constraints = gen_constraints;
2224                    }
2225                    TableFromSourceColumns::Named(_) => {
2226                        sql_bail!("columns cannot be named for multi-output load generator sources")
2227                    }
2228                }
2229            }
2230            let details = SourceExportStatementDetails::LoadGenerator { output };
2231            with_options.push(TableFromSourceOption {
2232                name: TableFromSourceOptionName::Details,
2233                value: Some(WithOptionValue::Value(Value::String(hex::encode(
2234                    details.into_proto().encode_to_vec(),
2235                )))),
2236            })
2237        }
2238        PurifiedExportDetails::Kafka {} => {
2239            // NOTE: Kafka tables have their 'schemas' purified into the statement inside the
2240            // format field, so we don't specify any columns or constraints to be stored
2241            // on the statement here. The RelationDesc will be determined during planning.
2242            let details = SourceExportStatementDetails::Kafka {};
2243            with_options.push(TableFromSourceOption {
2244                name: TableFromSourceOptionName::Details,
2245                value: Some(WithOptionValue::Value(Value::String(hex::encode(
2246                    details.into_proto().encode_to_vec(),
2247                )))),
2248            })
2249        }
2250    };
2251
2252    // TODO: We might as well use the retrieved available references to update the source
2253    // available references table in the catalog, so plumb this through.
2254    // available_source_references: retrieved_source_references.available_source_references(),
2255    Ok(PurifiedStatement::PurifiedCreateTableFromSource { stmt })
2256}
2257
/// Source-specific context consulted while purifying a source's format.
///
/// `KEY`/`VALUE` format pairs and Confluent Schema Registry lookups are only
/// accepted when this is [`SourceFormatOptions::Kafka`].
enum SourceFormatOptions {
    /// No Kafka topic is available; `KEY`/`VALUE` formats and schema registry
    /// lookups are rejected.
    Default,
    /// A Kafka source. `topic` is used to build the `<topic>-key` and
    /// `<topic>-value` schema registry subject names.
    Kafka { topic: String },
}
2262
2263async fn purify_source_format(
2264    catalog: &dyn SessionCatalog,
2265    format: &mut Option<FormatSpecifier<Aug>>,
2266    options: &SourceFormatOptions,
2267    envelope: &Option<SourceEnvelope>,
2268    storage_configuration: &StorageConfiguration,
2269) -> Result<(), PlanError> {
2270    if matches!(format, Some(FormatSpecifier::KeyValue { .. }))
2271        && !matches!(options, SourceFormatOptions::Kafka { .. })
2272    {
2273        sql_bail!("Kafka sources are the only source type that can provide KEY/VALUE formats")
2274    }
2275
2276    match format.as_mut() {
2277        None => {}
2278        Some(FormatSpecifier::Bare(format)) => {
2279            purify_source_format_single(catalog, format, options, envelope, storage_configuration)
2280                .await?;
2281        }
2282
2283        Some(FormatSpecifier::KeyValue { key, value: val }) => {
2284            purify_source_format_single(catalog, key, options, envelope, storage_configuration)
2285                .await?;
2286            purify_source_format_single(catalog, val, options, envelope, storage_configuration)
2287                .await?;
2288        }
2289    }
2290    Ok(())
2291}
2292
2293async fn purify_source_format_single(
2294    catalog: &dyn SessionCatalog,
2295    format: &mut Format<Aug>,
2296    options: &SourceFormatOptions,
2297    envelope: &Option<SourceEnvelope>,
2298    storage_configuration: &StorageConfiguration,
2299) -> Result<(), PlanError> {
2300    match format {
2301        Format::Avro(schema) => match schema {
2302            AvroSchema::Csr { csr_connection } => {
2303                purify_csr_connection_avro(
2304                    catalog,
2305                    options,
2306                    csr_connection,
2307                    envelope,
2308                    storage_configuration,
2309                )
2310                .await?
2311            }
2312            AvroSchema::InlineSchema { .. } => {}
2313        },
2314        Format::Protobuf(schema) => match schema {
2315            ProtobufSchema::Csr { csr_connection } => {
2316                purify_csr_connection_proto(
2317                    catalog,
2318                    options,
2319                    csr_connection,
2320                    envelope,
2321                    storage_configuration,
2322                )
2323                .await?;
2324            }
2325            ProtobufSchema::InlineSchema { .. } => {}
2326        },
2327        Format::Bytes
2328        | Format::Regex(_)
2329        | Format::Json { .. }
2330        | Format::Text
2331        | Format::Csv { .. } => (),
2332    }
2333    Ok(())
2334}
2335
2336pub fn generate_subsource_statements(
2337    scx: &StatementContext,
2338    source_name: ResolvedItemName,
2339    subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
2340) -> Result<Vec<CreateSubsourceStatement<Aug>>, PlanError> {
2341    // get the first subsource to determine the connection type
2342    if subsources.is_empty() {
2343        return Ok(vec![]);
2344    }
2345    let (_, purified_export) = subsources
2346        .iter()
2347        .next()
2348        .ok_or_else(|| internal_err!("expected at least one subsource"))?;
2349
2350    let statements = match &purified_export.details {
2351        PurifiedExportDetails::Postgres { .. } => {
2352            crate::pure::postgres::generate_create_subsource_statements(
2353                scx,
2354                source_name,
2355                subsources,
2356            )?
2357        }
2358        PurifiedExportDetails::MySql { .. } => {
2359            crate::pure::mysql::generate_create_subsource_statements(scx, source_name, subsources)?
2360        }
2361        PurifiedExportDetails::SqlServer { .. } => {
2362            crate::pure::sql_server::generate_create_subsource_statements(
2363                scx,
2364                source_name,
2365                subsources,
2366            )?
2367        }
2368        PurifiedExportDetails::LoadGenerator { .. } => {
2369            let mut subsource_stmts = Vec::with_capacity(subsources.len());
2370            for (subsource_name, purified_export) in subsources {
2371                let (desc, output) = match purified_export.details {
2372                    PurifiedExportDetails::LoadGenerator { table, output } => (table, output),
2373                    _ => {
2374                        bail_internal!("purified export details must be load generator")
2375                    }
2376                };
2377                let desc = desc.ok_or_else(|| {
2378                    internal_err!(
2379                        "subsources cannot be generated for single-output load generators"
2380                    )
2381                })?;
2382
2383                let (columns, table_constraints) = scx.relation_desc_into_table_defs(&desc)?;
2384                let details = SourceExportStatementDetails::LoadGenerator { output };
2385                // Create the subsource statement
2386                let subsource = CreateSubsourceStatement {
2387                    name: subsource_name,
2388                    columns,
2389                    of_source: Some(source_name.clone()),
2390                    // unlike sources that come from an external upstream, we
2391                    // have more leniency to introduce different constraints
2392                    // every time the load generator is run; i.e. we are not as
2393                    // worried about introducing junk data.
2394                    constraints: table_constraints,
2395                    if_not_exists: false,
2396                    with_options: vec![
2397                        CreateSubsourceOption {
2398                            name: CreateSubsourceOptionName::ExternalReference,
2399                            value: Some(WithOptionValue::UnresolvedItemName(
2400                                purified_export.external_reference,
2401                            )),
2402                        },
2403                        CreateSubsourceOption {
2404                            name: CreateSubsourceOptionName::Details,
2405                            value: Some(WithOptionValue::Value(Value::String(hex::encode(
2406                                details.into_proto().encode_to_vec(),
2407                            )))),
2408                        },
2409                    ],
2410                };
2411                subsource_stmts.push(subsource);
2412            }
2413
2414            subsource_stmts
2415        }
2416        PurifiedExportDetails::Kafka { .. } => {
2417            // TODO: as part of database-issues#8322, Kafka sources will begin
2418            // producing data––we'll need to understand the schema
2419            // of the output here.
2420            if !subsources.is_empty() {
2421                bail_internal!("Kafka sources do not produce data-bearing subsources");
2422            }
2423            vec![]
2424        }
2425    };
2426    Ok(statements)
2427}
2428
2429async fn purify_csr_connection_proto(
2430    catalog: &dyn SessionCatalog,
2431    options: &SourceFormatOptions,
2432    csr_connection: &mut CsrConnectionProtobuf<Aug>,
2433    envelope: &Option<SourceEnvelope>,
2434    storage_configuration: &StorageConfiguration,
2435) -> Result<(), PlanError> {
2436    let SourceFormatOptions::Kafka { topic } = options else {
2437        sql_bail!("Confluent Schema Registry is only supported with Kafka sources")
2438    };
2439
2440    let CsrConnectionProtobuf {
2441        seed,
2442        connection: CsrConnection {
2443            connection,
2444            options: _,
2445        },
2446    } = csr_connection;
2447    match seed {
2448        None => {
2449            let scx = StatementContext::new(None, &*catalog);
2450
2451            let ccsr_connection = match scx.get_item_by_resolved_name(connection)?.connection()? {
2452                Connection::Csr(connection) => connection.clone().into_inline_connection(catalog),
2453                _ => sql_bail!("{} is not a schema registry connection", connection),
2454            };
2455
2456            let ccsr_client = ccsr_connection
2457                .connect(storage_configuration, InTask::No)
2458                .await
2459                .map_err(|e| CsrPurificationError::ClientError(Arc::new(e)))?;
2460
2461            let value = compile_proto(&format!("{}-value", topic), &ccsr_client).await?;
2462            let key = compile_proto(&format!("{}-key", topic), &ccsr_client)
2463                .await
2464                .ok();
2465
2466            if matches!(envelope, Some(SourceEnvelope::Debezium)) && key.is_none() {
2467                sql_bail!("Key schema is required for ENVELOPE DEBEZIUM");
2468            }
2469
2470            *seed = Some(CsrSeedProtobuf { value, key });
2471        }
2472        Some(_) => (),
2473    }
2474
2475    Ok(())
2476}
2477
2478async fn purify_csr_connection_avro(
2479    catalog: &dyn SessionCatalog,
2480    options: &SourceFormatOptions,
2481    csr_connection: &mut CsrConnectionAvro<Aug>,
2482    envelope: &Option<SourceEnvelope>,
2483    storage_configuration: &StorageConfiguration,
2484) -> Result<(), PlanError> {
2485    let SourceFormatOptions::Kafka { topic } = options else {
2486        sql_bail!("Confluent Schema Registry is only supported with Kafka sources")
2487    };
2488
2489    let CsrConnectionAvro {
2490        connection: CsrConnection { connection, .. },
2491        seed,
2492        key_strategy,
2493        value_strategy,
2494    } = csr_connection;
2495    if seed.is_none() {
2496        let scx = StatementContext::new(None, &*catalog);
2497        let csr_connection = match scx.get_item_by_resolved_name(connection)?.connection()? {
2498            Connection::Csr(connection) => connection.clone().into_inline_connection(catalog),
2499            _ => sql_bail!("{} is not a schema registry connection", connection),
2500        };
2501        let ccsr_client = csr_connection
2502            .connect(storage_configuration, InTask::No)
2503            .await
2504            .map_err(|e| CsrPurificationError::ClientError(Arc::new(e)))?;
2505
2506        let Schema {
2507            key_schema,
2508            value_schema,
2509            key_reference_schemas,
2510            value_reference_schemas,
2511        } = get_remote_csr_schema(
2512            &ccsr_client,
2513            key_strategy.clone().unwrap_or_default(),
2514            value_strategy.clone().unwrap_or_default(),
2515            topic,
2516        )
2517        .await?;
2518        if matches!(envelope, Some(SourceEnvelope::Debezium)) && key_schema.is_none() {
2519            sql_bail!("Key schema is required for ENVELOPE DEBEZIUM");
2520        }
2521
2522        *seed = Some(CsrSeedAvro {
2523            key_schema,
2524            value_schema,
2525            key_reference_schemas,
2526            value_reference_schemas,
2527        })
2528    }
2529
2530    Ok(())
2531}
2532
/// Key and value schemas selected for a topic by `get_remote_csr_schema`,
/// together with the schemas each of them references.
#[derive(Debug)]
pub struct Schema {
    /// Raw key schema; `None` when no key schema was found.
    pub key_schema: Option<String>,
    /// Raw value schema.
    pub value_schema: String,
    /// Reference schemas for the key schema, in dependency order.
    pub key_reference_schemas: Vec<String>,
    /// Reference schemas for the value schema, in dependency order.
    pub value_reference_schemas: Vec<String>,
}
2542
/// Result of fetching a schema, including any referenced schemas.
///
/// Produced by `get_schema_with_strategy`; for inline schemas `references` is
/// always empty.
struct SchemaWithReferences {
    /// The primary schema, as raw schema text.
    schema: String,
    /// Reference schemas in dependency order.
    references: Vec<String>,
}
2550
2551async fn get_schema_with_strategy(
2552    client: &Client,
2553    strategy: ReaderSchemaSelectionStrategy,
2554    subject: &str,
2555) -> Result<Option<SchemaWithReferences>, PlanError> {
2556    match strategy {
2557        ReaderSchemaSelectionStrategy::Latest => {
2558            // Use get_subject_and_references to also fetch referenced schemas
2559            match client.get_subject_and_references(subject).await {
2560                Ok((primary, dependencies)) => Ok(Some(SchemaWithReferences {
2561                    schema: primary.schema.raw,
2562                    references: dependencies.into_iter().map(|s| s.schema.raw).collect(),
2563                })),
2564                Err(GetBySubjectError::SubjectNotFound)
2565                | Err(GetBySubjectError::VersionNotFound(_)) => Ok(None),
2566                Err(e) => Err(PlanError::FetchingCsrSchemaFailed {
2567                    schema_lookup: format!("subject {}", subject.quoted()),
2568                    cause: Arc::new(e),
2569                }),
2570            }
2571        }
2572        // It's possible that CSR was provided with an inline strategy, but without a subject
2573        // or schema id to look up, there isn't a clean way to lookup references for the schema.
2574        // This could be done with extra work, but at this point, it isn't clear this is needed.
2575        ReaderSchemaSelectionStrategy::Inline(raw) => Ok(Some(SchemaWithReferences {
2576            schema: raw,
2577            references: vec![],
2578        })),
2579        ReaderSchemaSelectionStrategy::ById(id) => {
2580            match client.get_subject_and_references_by_id(id).await {
2581                Ok((primary, dependencies)) => Ok(Some(SchemaWithReferences {
2582                    schema: primary.schema.raw,
2583                    references: dependencies.into_iter().map(|s| s.schema.raw).collect(),
2584                })),
2585                Err(GetBySubjectError::SubjectNotFound)
2586                | Err(GetBySubjectError::VersionNotFound(_)) => Ok(None),
2587                Err(e) => Err(PlanError::FetchingCsrSchemaFailed {
2588                    schema_lookup: format!("subject {}", subject.quoted()),
2589                    cause: Arc::new(e),
2590                }),
2591            }
2592        }
2593    }
2594}
2595
2596async fn get_remote_csr_schema(
2597    ccsr_client: &mz_ccsr::Client,
2598    key_strategy: ReaderSchemaSelectionStrategy,
2599    value_strategy: ReaderSchemaSelectionStrategy,
2600    topic: &str,
2601) -> Result<Schema, PlanError> {
2602    let value_schema_name = format!("{}-value", topic);
2603    let value_result =
2604        get_schema_with_strategy(ccsr_client, value_strategy, &value_schema_name).await?;
2605    let value_result = value_result.ok_or_else(|| anyhow!("No value schema found"))?;
2606
2607    let key_subject = format!("{}-key", topic);
2608    let key_result = get_schema_with_strategy(ccsr_client, key_strategy, &key_subject).await?;
2609    Ok(Schema {
2610        key_schema: key_result.as_ref().map(|r| r.schema.clone()),
2611        value_schema: value_result.schema,
2612        key_reference_schemas: key_result.map(|r| r.references).unwrap_or_default(),
2613        value_reference_schemas: value_result.references,
2614    })
2615}
2616
2617/// Collect protobuf message descriptor from CSR and compile the descriptor.
2618async fn compile_proto(
2619    subject_name: &String,
2620    ccsr_client: &Client,
2621) -> Result<CsrSeedProtobufSchema, PlanError> {
2622    let (primary_subject, dependency_subjects) = ccsr_client
2623        .get_subject_and_references(subject_name)
2624        .await
2625        .map_err(|e| PlanError::FetchingCsrSchemaFailed {
2626            schema_lookup: format!("subject {}", subject_name.quoted()),
2627            cause: Arc::new(e),
2628        })?;
2629
2630    // Compile .proto files into a file descriptor set.
2631    let mut source_tree = VirtualSourceTree::new();
2632
2633    // Add well-known types (e.g., google/protobuf/timestamp.proto) to the source
2634    // tree. These are implicitly available to protoc but are typically not
2635    // registered in the schema registry.
2636    source_tree.as_mut().map_well_known_types();
2637
2638    for subject in iter::once(&primary_subject).chain(dependency_subjects.iter()) {
2639        source_tree.as_mut().add_file(
2640            Path::new(&subject.name),
2641            subject.schema.raw.as_bytes().to_vec(),
2642        );
2643    }
2644    let mut db = SourceTreeDescriptorDatabase::new(source_tree.as_mut());
2645    let fds = db
2646        .as_mut()
2647        .build_file_descriptor_set(&[Path::new(&primary_subject.name)])
2648        .map_err(|cause| PlanError::InvalidProtobufSchema { cause })?;
2649
2650    // Ensure there is exactly one message in the file.
2651    let primary_fd = fds.file(0);
2652    let message_name = match primary_fd.message_type_size() {
2653        1 => String::from_utf8_lossy(primary_fd.message_type(0).name()).into_owned(),
2654        0 => bail_unsupported!(29603, "Protobuf schemas with no messages"),
2655        _ => bail_unsupported!(29603, "Protobuf schemas with multiple messages"),
2656    };
2657
2658    // Encode the file descriptor set into a SQL byte string.
2659    let bytes = &fds
2660        .serialize()
2661        .map_err(|cause| PlanError::InvalidProtobufSchema { cause })?;
2662    let mut schema = String::new();
2663    strconv::format_bytes(&mut schema, bytes);
2664
2665    Ok(CsrSeedProtobufSchema {
2666        schema,
2667        message_name,
2668    })
2669}
2670
/// Item name of the `mz_now()` function, used both to resolve the function in
/// the catalog and to recognize calls to it in the AST.
const MZ_NOW_NAME: &str = "mz_now";
/// Schema in which `mz_now()` is resolved.
const MZ_NOW_SCHEMA: &str = "mz_catalog";
2673
2674/// Purifies a CREATE MATERIALIZED VIEW statement. Additionally, it adjusts `resolved_ids` if
2675/// references to ids appear or disappear during the purification.
2676///
2677/// Note that in contrast with [`purify_statement`], this doesn't need to be async, because
2678/// this function is not making any network calls.
2679pub fn purify_create_materialized_view_options(
2680    catalog: impl SessionCatalog,
2681    mz_now: Option<Timestamp>,
2682    cmvs: &mut CreateMaterializedViewStatement<Aug>,
2683    resolved_ids: &mut ResolvedIds,
2684) {
2685    // 0. Preparations:
2686    // Prepare an expression that calls `mz_now()`, which we can insert in various later steps.
2687    let (mz_now_id, mz_now_expr) = {
2688        let item = catalog
2689            .resolve_function(&PartialItemName {
2690                database: None,
2691                schema: Some(MZ_NOW_SCHEMA.to_string()),
2692                item: MZ_NOW_NAME.to_string(),
2693            })
2694            .expect("we should be able to resolve mz_now");
2695        (
2696            item.id(),
2697            Expr::Function(Function {
2698                name: ResolvedItemName::Item {
2699                    id: item.id(),
2700                    qualifiers: item.name().qualifiers.clone(),
2701                    full_name: catalog.resolve_full_name(item.name()),
2702                    print_id: false,
2703                    version: RelationVersionSelector::Latest,
2704                },
2705                args: FunctionArgs::Args {
2706                    args: Vec::new(),
2707                    order_by: Vec::new(),
2708                },
2709                filter: None,
2710                over: None,
2711                distinct: false,
2712            }),
2713        )
2714    };
2715    // Prepare the `mz_timestamp` type.
2716    let (mz_timestamp_id, mz_timestamp_type) = {
2717        let item = catalog.get_system_type("mz_timestamp");
2718        let full_name = catalog.resolve_full_name(item.name());
2719        (
2720            item.id(),
2721            ResolvedDataType::Named {
2722                id: item.id(),
2723                qualifiers: item.name().qualifiers.clone(),
2724                full_name,
2725                modifiers: vec![],
2726                print_id: true,
2727            },
2728        )
2729    };
2730
2731    let mut introduced_mz_timestamp = false;
2732
2733    for option in cmvs.with_options.iter_mut() {
2734        // 1. Purify `REFRESH AT CREATION` to `REFRESH AT mz_now()`.
2735        if matches!(
2736            option.value,
2737            Some(WithOptionValue::Refresh(RefreshOptionValue::AtCreation))
2738        ) {
2739            option.value = Some(WithOptionValue::Refresh(RefreshOptionValue::At(
2740                RefreshAtOptionValue {
2741                    time: mz_now_expr.clone(),
2742                },
2743            )));
2744        }
2745
2746        // 2. If `REFRESH EVERY` doesn't have an `ALIGNED TO`, then add `ALIGNED TO mz_now()`.
2747        if let Some(WithOptionValue::Refresh(RefreshOptionValue::Every(
2748            RefreshEveryOptionValue { aligned_to, .. },
2749        ))) = &mut option.value
2750        {
2751            if aligned_to.is_none() {
2752                *aligned_to = Some(mz_now_expr.clone());
2753            }
2754        }
2755
2756        // 3. Substitute `mz_now()` with the timestamp chosen for the CREATE MATERIALIZED VIEW
2757        // statement. (This has to happen after the above steps, which might introduce `mz_now()`.)
2758        match &mut option.value {
2759            Some(WithOptionValue::Refresh(RefreshOptionValue::At(RefreshAtOptionValue {
2760                time,
2761            }))) => {
2762                let mut visitor = MzNowPurifierVisitor::new(mz_now, mz_timestamp_type.clone());
2763                visitor.visit_expr_mut(time);
2764                introduced_mz_timestamp |= visitor.introduced_mz_timestamp;
2765            }
2766            Some(WithOptionValue::Refresh(RefreshOptionValue::Every(
2767                RefreshEveryOptionValue {
2768                    interval: _,
2769                    aligned_to: Some(aligned_to),
2770                },
2771            ))) => {
2772                let mut visitor = MzNowPurifierVisitor::new(mz_now, mz_timestamp_type.clone());
2773                visitor.visit_expr_mut(aligned_to);
2774                introduced_mz_timestamp |= visitor.introduced_mz_timestamp;
2775            }
2776            _ => {}
2777        }
2778    }
2779
2780    // 4. If the user didn't give any REFRESH option, then default to ON COMMIT.
2781    if !cmvs.with_options.iter().any(|o| {
2782        matches!(
2783            o,
2784            MaterializedViewOption {
2785                value: Some(WithOptionValue::Refresh(..)),
2786                ..
2787            }
2788        )
2789    }) {
2790        cmvs.with_options.push(MaterializedViewOption {
2791            name: MaterializedViewOptionName::Refresh,
2792            value: Some(WithOptionValue::Refresh(RefreshOptionValue::OnCommit)),
2793        })
2794    }
2795
2796    // 5. Attend to `resolved_ids`: The purification might have
2797    // - added references to `mz_timestamp`;
2798    // - removed references to `mz_now`.
2799    if introduced_mz_timestamp {
2800        resolved_ids.add_item(mz_timestamp_id);
2801    }
2802    // Even though we always remove `mz_now()` from the `with_options`, there might be `mz_now()`
2803    // remaining in the main query expression of the MV, so let's visit the entire statement to look
2804    // for `mz_now()` everywhere.
2805    let mut visitor = ExprContainsTemporalVisitor::new();
2806    visitor.visit_create_materialized_view_statement(cmvs);
2807    if !visitor.contains_temporal {
2808        resolved_ids.remove_item(&mz_now_id);
2809    }
2810}
2811
2812/// Returns true if the [MaterializedViewOption] either already involves `mz_now()` or will involve
2813/// after purification.
2814pub fn materialized_view_option_contains_temporal(mvo: &MaterializedViewOption<Aug>) -> bool {
2815    match &mvo.value {
2816        Some(WithOptionValue::Refresh(RefreshOptionValue::At(RefreshAtOptionValue { time }))) => {
2817            let mut visitor = ExprContainsTemporalVisitor::new();
2818            visitor.visit_expr(time);
2819            visitor.contains_temporal
2820        }
2821        Some(WithOptionValue::Refresh(RefreshOptionValue::Every(RefreshEveryOptionValue {
2822            interval: _,
2823            aligned_to: Some(aligned_to),
2824        }))) => {
2825            let mut visitor = ExprContainsTemporalVisitor::new();
2826            visitor.visit_expr(aligned_to);
2827            visitor.contains_temporal
2828        }
2829        Some(WithOptionValue::Refresh(RefreshOptionValue::Every(RefreshEveryOptionValue {
2830            interval: _,
2831            aligned_to: None,
2832        }))) => {
2833            // For a `REFRESH EVERY` without an `ALIGNED TO`, purification will default the
2834            // `ALIGNED TO` to `mz_now()`.
2835            true
2836        }
2837        Some(WithOptionValue::Refresh(RefreshOptionValue::AtCreation)) => {
2838            // `REFRESH AT CREATION` will be purified to `REFRESH AT mz_now()`.
2839            true
2840        }
2841        _ => false,
2842    }
2843}
2844
/// Determines whether the AST involves `mz_now()`.
struct ExprContainsTemporalVisitor {
    /// Whether a call to `mz_now()` has been seen so far.
    pub contains_temporal: bool,
}

impl ExprContainsTemporalVisitor {
    /// Creates a visitor that has not yet seen any `mz_now()` call.
    pub fn new() -> Self {
        Self {
            contains_temporal: false,
        }
    }
}
2857
2858impl Visit<'_, Aug> for ExprContainsTemporalVisitor {
2859    fn visit_function(&mut self, func: &Function<Aug>) {
2860        self.contains_temporal |= func.name.full_item_name().item == MZ_NOW_NAME;
2861        visit_function(self, func);
2862    }
2863}
2864
2865struct MzNowPurifierVisitor {
2866    pub mz_now: Option<Timestamp>,
2867    pub mz_timestamp_type: ResolvedDataType,
2868    pub introduced_mz_timestamp: bool,
2869}
2870
2871impl MzNowPurifierVisitor {
2872    pub fn new(
2873        mz_now: Option<Timestamp>,
2874        mz_timestamp_type: ResolvedDataType,
2875    ) -> MzNowPurifierVisitor {
2876        MzNowPurifierVisitor {
2877            mz_now,
2878            mz_timestamp_type,
2879            introduced_mz_timestamp: false,
2880        }
2881    }
2882}
2883
2884impl VisitMut<'_, Aug> for MzNowPurifierVisitor {
2885    fn visit_expr_mut(&mut self, expr: &'_ mut Expr<Aug>) {
2886        match expr {
2887            Expr::Function(Function {
2888                name:
2889                    ResolvedItemName::Item {
2890                        full_name: FullItemName { item, .. },
2891                        ..
2892                    },
2893                ..
2894            }) if item == &MZ_NOW_NAME.to_string() => {
2895                let mz_now = self.mz_now.expect(
2896                    "we should have chosen a timestamp if the expression contains mz_now()",
2897                );
2898                // We substitute `mz_now()` with number + a cast to `mz_timestamp`. The cast is to
2899                // not alter the type of the expression.
2900                *expr = Expr::Cast {
2901                    expr: Box::new(Expr::Value(Value::Number(mz_now.to_string()))),
2902                    data_type: self.mz_timestamp_type.clone(),
2903                };
2904                self.introduced_mz_timestamp = true;
2905            }
2906            _ => visit_expr_mut(self, expr),
2907        }
2908    }
2909}