mz_sql/pure.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! SQL purification.
//!
//! See the [crate-level documentation](crate) for details.

use std::collections::{BTreeMap, BTreeSet};
use std::fmt;
use std::iter;
use std::path::Path;
use std::sync::Arc;

use anyhow::anyhow;
use itertools::Itertools;
use mz_ccsr::{Client, GetByIdError, GetBySubjectError, Schema as CcsrSchema};
use mz_controller_types::ClusterId;
use mz_kafka_util::client::MzClientContext;
use mz_mysql_util::MySqlTableDesc;
use mz_ore::error::ErrorExt;
use mz_ore::future::InTask;
use mz_ore::iter::IteratorExt;
use mz_ore::str::StrExt;
use mz_ore::{assert_none, soft_panic_or_log};
use mz_postgres_util::desc::PostgresTableDesc;
use mz_postgres_util::replication::WalLevel;
use mz_postgres_util::tunnel::PostgresFlavor;
use mz_proto::RustType;
use mz_repr::{CatalogItemId, RelationDesc, RelationVersionSelector, Timestamp, strconv};
use mz_sql_parser::ast::display::AstDisplay;
use mz_sql_parser::ast::visit::{Visit, visit_function};
use mz_sql_parser::ast::visit_mut::{VisitMut, visit_expr_mut};
use mz_sql_parser::ast::{
    AlterSourceAction, AlterSourceAddSubsourceOptionName, AlterSourceStatement, AvroDocOn,
    ColumnName, CreateMaterializedViewStatement, CreateSinkConnection, CreateSinkOptionName,
    CreateSinkStatement, CreateSubsourceOption, CreateSubsourceOptionName,
    CreateTableFromSourceStatement, CsrConfigOption, CsrConfigOptionName, CsrConnection,
    CsrSeedAvro, CsrSeedProtobuf, CsrSeedProtobufSchema, DeferredItemName, DocOnIdentifier,
    DocOnSchema, Expr, Function, FunctionArgs, Ident, KafkaSourceConfigOption,
    KafkaSourceConfigOptionName, LoadGenerator, LoadGeneratorOption, LoadGeneratorOptionName,
    MaterializedViewOption, MaterializedViewOptionName, MySqlConfigOption, MySqlConfigOptionName,
    PgConfigOption, PgConfigOptionName, RawItemName, ReaderSchemaSelectionStrategy,
    RefreshAtOptionValue, RefreshEveryOptionValue, RefreshOptionValue, SourceEnvelope,
    SqlServerConfigOption, SqlServerConfigOptionName, Statement, TableFromSourceColumns,
    TableFromSourceOption, TableFromSourceOptionName, UnresolvedItemName,
};
use mz_sql_server_util::desc::SqlServerTableDesc;
use mz_storage_types::configuration::StorageConfiguration;
use mz_storage_types::connections::inline::IntoInlineConnection;
use mz_storage_types::connections::{Connection, PostgresConnection};
use mz_storage_types::errors::ContextCreationError;
use mz_storage_types::sources::load_generator::LoadGeneratorOutput;
use mz_storage_types::sources::mysql::MySqlSourceDetails;
use mz_storage_types::sources::postgres::PostgresSourcePublicationDetails;
use mz_storage_types::sources::{
    GenericSourceConnection, PostgresSourceConnection, SourceConnection, SourceDesc,
    SourceExportStatementDetails, SqlServerSourceExtras,
};
use prost::Message;
use protobuf_native::MessageLite;
use protobuf_native::compiler::{SourceTreeDescriptorDatabase, VirtualSourceTree};
use rdkafka::admin::AdminClient;
use references::{RetrievedSourceReferences, SourceReferenceClient};
use uuid::Uuid;

use crate::ast::{
    AlterSourceAddSubsourceOption, AvroSchema, CreateSourceConnection, CreateSourceStatement,
    CreateSubsourceStatement, CsrConnectionAvro, CsrConnectionProtobuf, ExternalReferenceExport,
    ExternalReferences, Format, FormatSpecifier, ProtobufSchema, Value, WithOptionValue,
};
use crate::catalog::{CatalogItemType, SessionCatalog};
use crate::kafka_util::{KafkaSinkConfigOptionExtracted, KafkaSourceConfigOptionExtracted};
use crate::names::{
    Aug, FullItemName, PartialItemName, ResolvedColumnReference, ResolvedDataType, ResolvedIds,
    ResolvedItemName,
};
use crate::plan::error::PlanError;
use crate::plan::statement::ddl::load_generator_ast_to_generator;
use crate::plan::{SourceReferences, StatementContext};
use crate::pure::error::SqlServerSourcePurificationError;
use crate::session::vars::ENABLE_SQL_SERVER_SOURCE;
use crate::{kafka_util, normalize};

use self::error::{
    CsrPurificationError, KafkaSinkPurificationError, KafkaSourcePurificationError,
    LoadGeneratorSourcePurificationError, MySqlSourcePurificationError, PgSourcePurificationError,
};

pub(crate) mod error;
mod references;

pub mod mysql;
pub mod postgres;
pub mod sql_server;

pub(crate) struct RequestedSourceExport<T> {
    external_reference: UnresolvedItemName,
    name: UnresolvedItemName,
    meta: T,
}

impl<T: fmt::Debug> fmt::Debug for RequestedSourceExport<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("RequestedSourceExport")
            .field("external_reference", &self.external_reference)
            .field("name", &self.name)
            .field("meta", &self.meta)
            .finish()
    }
}

impl<T> RequestedSourceExport<T> {
    fn change_meta<F>(self, new_meta: F) -> RequestedSourceExport<F> {
        RequestedSourceExport {
            external_reference: self.external_reference,
            name: self.name,
            meta: new_meta,
        }
    }
}

/// Generates a subsource name by prepending the source's schema name, if
/// present.
///
/// For example, if the source is `a.b`, then `a` is prepended to the subsource
/// name so that the subsource is generated in the same schema as the source.
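///
/// A minimal sketch of the resulting names (illustrative, not real catalog
/// lookups):
///
/// ```text
/// source_export_name_gen("a.b", "t1")  // => a.t1
/// source_export_name_gen("b",   "t1")  // => t1
/// ```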
fn source_export_name_gen(
    source_name: &UnresolvedItemName,
    subsource_name: &str,
) -> Result<UnresolvedItemName, PlanError> {
    let mut partial = normalize::unresolved_item_name(source_name.clone())?;
    partial.item = subsource_name.to_string();
    Ok(UnresolvedItemName::from(partial))
}

// TODO(database-issues#8620): Remove once subsources are removed
/// Validates the requested subsources do not have name conflicts with each other
/// and that the same upstream table is not referenced multiple times.
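///
/// For example (hypothetical references), requesting both `db1.sch.t` and
/// `db2.sch.t` as subsources of a source `mz.public.src` would generate the
/// target name `mz.public.t` twice; we reject that here, pointing at the
/// conflicting upstream references, rather than letting the catalog
/// transaction fail with a vaguer error.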
fn validate_source_export_names<T>(
    requested_source_exports: &[RequestedSourceExport<T>],
) -> Result<(), PlanError> {
    // This condition would get caught during the catalog transaction, but produces a
    // vague, non-contextual error. Instead, error here so we can suggest to the user
    // how to fix the problem.
    if let Some(name) = requested_source_exports
        .iter()
        .map(|subsource| &subsource.name)
        .duplicates()
        .next()
        .cloned()
    {
        let mut upstream_references: Vec<_> = requested_source_exports
            .into_iter()
            .filter_map(|subsource| {
                if &subsource.name == &name {
                    Some(subsource.external_reference.clone())
                } else {
                    None
                }
            })
            .collect();

        upstream_references.sort();

        Err(PlanError::SubsourceNameConflict {
            name,
            upstream_references,
        })?;
    }

    // We disallow subsource statements from referencing the same upstream table, but we allow
    // `CREATE TABLE .. FROM SOURCE` statements to do so. Since `CREATE TABLE .. FROM SOURCE`
    // purification will only provide 1 `requested_source_export`, we can leave this here without
    // needing to differentiate between the two types of statements.
    // TODO(roshan): Remove this when auto-generated subsources are deprecated.
    if let Some(name) = requested_source_exports
        .iter()
        .map(|export| &export.external_reference)
        .duplicates()
        .next()
        .cloned()
    {
        let mut target_names: Vec<_> = requested_source_exports
            .into_iter()
            .filter_map(|export| {
                if &export.external_reference == &name {
                    Some(export.name.clone())
                } else {
                    None
                }
            })
            .collect();

        target_names.sort();

        Err(PlanError::SubsourceDuplicateReference { name, target_names })?;
    }

    Ok(())
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PurifiedStatement {
    PurifiedCreateSource {
        create_progress_subsource_stmt: CreateSubsourceStatement<Aug>,
        create_source_stmt: CreateSourceStatement<Aug>,
        // Map of subsource names to external details
        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
        /// All the available upstream references that can be added as tables
        /// to this primary source.
        available_source_references: SourceReferences,
    },
    PurifiedAlterSource {
        alter_source_stmt: AlterSourceStatement<Aug>,
    },
    PurifiedAlterSourceAddSubsources {
        // This just saves us an annoying catalog lookup
        source_name: ResolvedItemName,
        /// Options that we will need the values of to update the source's
        /// definition.
        options: Vec<AlterSourceAddSubsourceOption<Aug>>,
        // Map of subsource names to external details
        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
    },
    PurifiedAlterSourceRefreshReferences {
        source_name: ResolvedItemName,
        /// The updated available upstream references for the primary source.
        available_source_references: SourceReferences,
    },
    PurifiedCreateSink(CreateSinkStatement<Aug>),
    PurifiedCreateTableFromSource {
        stmt: CreateTableFromSourceStatement<Aug>,
    },
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PurifiedSourceExport {
    pub external_reference: UnresolvedItemName,
    pub details: PurifiedExportDetails,
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PurifiedExportDetails {
    MySql {
        table: MySqlTableDesc,
        text_columns: Option<Vec<Ident>>,
        exclude_columns: Option<Vec<Ident>>,
        initial_gtid_set: String,
    },
    Postgres {
        table: PostgresTableDesc,
        text_columns: Option<Vec<Ident>>,
    },
    SqlServer {
        table: SqlServerTableDesc,
        text_columns: Option<Vec<Ident>>,
        excl_columns: Option<Vec<Ident>>,
        capture_instance: Arc<str>,
    },
    Kafka {},
    LoadGenerator {
        table: Option<RelationDesc>,
        output: LoadGeneratorOutput,
    },
}

/// Purifies a statement, removing any dependencies on external state.
///
/// See the section on [purification](crate#purification) in the crate
/// documentation for details.
///
/// Note that this doesn't handle CREATE MATERIALIZED VIEW, which is
/// handled by [purify_create_materialized_view_options] instead.
/// This could be made more consistent by a refactoring discussed here:
/// <https://github.com/MaterializeInc/materialize/pull/23870#discussion_r1435922709>
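///
/// For example, purifying a Kafka `CREATE SOURCE` contacts the broker to
/// validate that the referenced topic exists and to translate any `START
/// TIMESTAMP` option into concrete `START OFFSET`s, so the stored statement no
/// longer depends on external state.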
pub async fn purify_statement(
    catalog: impl SessionCatalog,
    now: u64,
    stmt: Statement<Aug>,
    storage_configuration: &StorageConfiguration,
) -> (Result<PurifiedStatement, PlanError>, Option<ClusterId>) {
    match stmt {
        Statement::CreateSource(stmt) => {
            let cluster_id = stmt.in_cluster.as_ref().map(|cluster| cluster.id.clone());
            (
                purify_create_source(catalog, now, stmt, storage_configuration).await,
                cluster_id,
            )
        }
        Statement::AlterSource(stmt) => (
            purify_alter_source(catalog, stmt, storage_configuration).await,
            None,
        ),
        Statement::CreateSink(stmt) => {
            let cluster_id = stmt.in_cluster.as_ref().map(|cluster| cluster.id.clone());
            (
                purify_create_sink(catalog, stmt, storage_configuration).await,
                cluster_id,
            )
        }
        Statement::CreateTableFromSource(stmt) => (
            purify_create_table_from_source(catalog, stmt, storage_configuration).await,
            None,
        ),
        o => unreachable!("{:?} does not need to be purified", o),
    }
}

/// Injects `DOC ON` comments into all Avro formats that are using a schema
/// registry by finding all SQL `COMMENT`s that are attached to the sink's
/// underlying materialized view (as well as any types referenced by that
/// underlying materialized view).
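///
/// For example (hypothetical objects): given
///
/// ```sql
/// COMMENT ON MATERIALIZED VIEW mv IS 'my view';
/// ```
///
/// a sink on `mv` whose format is Avro via a schema registry picks up an
/// option equivalent to `DOC ON TYPE mv = 'my view'`, unless the user already
/// supplied their own `DOC ON` option for `mv`.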
pub(crate) fn purify_create_sink_avro_doc_on_options(
    catalog: &dyn SessionCatalog,
    from_id: CatalogItemId,
    format: &mut Option<FormatSpecifier<Aug>>,
) -> Result<(), PlanError> {
    // Collect all objects referenced by the sink.
    let from = catalog.get_item(&from_id);
    let object_ids = from
        .references()
        .items()
        .copied()
        .chain_one(from.id())
        .collect::<Vec<_>>();

    // Collect all Avro formats that use a schema registry, as well as a set of
    // all identifiers named in user-provided `DOC ON` options.
    let mut avro_format_options = vec![];
    for_each_format(format, |doc_on_schema, fmt| match fmt {
        Format::Avro(AvroSchema::InlineSchema { .. })
        | Format::Bytes
        | Format::Csv { .. }
        | Format::Json { .. }
        | Format::Protobuf(..)
        | Format::Regex(..)
        | Format::Text => (),
        Format::Avro(AvroSchema::Csr {
            csr_connection: CsrConnectionAvro { connection, .. },
        }) => {
            avro_format_options.push((doc_on_schema, &mut connection.options));
        }
    });

    // For each Avro format in the sink, inject the appropriate `DOC ON` options
    // for each item referenced by the sink.
    for (for_schema, options) in avro_format_options {
        let user_provided_comments = options
            .iter()
            .filter_map(|CsrConfigOption { name, .. }| match name {
                CsrConfigOptionName::AvroDocOn(doc_on) => Some(doc_on.clone()),
                _ => None,
            })
            .collect::<BTreeSet<_>>();

        // Add existing comments if the user has not already provided them.
        for object_id in &object_ids {
            // Always add comments to the latest version of the item.
            let item = catalog
                .get_item(object_id)
                .at_version(RelationVersionSelector::Latest);
            let full_name = catalog.resolve_full_name(item.name());
            let full_resolved_name = ResolvedItemName::Item {
                id: *object_id,
                qualifiers: item.name().qualifiers.clone(),
                full_name: full_name.clone(),
                print_id: !matches!(
                    item.item_type(),
                    CatalogItemType::Func | CatalogItemType::Type
                ),
                version: RelationVersionSelector::Latest,
            };

            if let Some(comments_map) = catalog.get_item_comments(object_id) {
                // Attach comment for the item itself, if the user has not
                // already provided an overriding `DOC ON` option for the item.
                let doc_on_item_key = AvroDocOn {
                    identifier: DocOnIdentifier::Type(full_resolved_name.clone()),
                    for_schema,
                };
                if !user_provided_comments.contains(&doc_on_item_key) {
                    if let Some(root_comment) = comments_map.get(&None) {
                        options.push(CsrConfigOption {
                            name: CsrConfigOptionName::AvroDocOn(doc_on_item_key),
                            value: Some(mz_sql_parser::ast::WithOptionValue::Value(Value::String(
                                root_comment.clone(),
                            ))),
                        });
                    }
                }

                // Attach comment for each column in the item, if the user has
                // not already provided an overriding `DOC ON` option for the
                // column.
                if let Ok(desc) = item.desc(&full_name) {
                    for (pos, column_name) in desc.iter_names().enumerate() {
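                        // Column comments are keyed by 1-based column
                        // position, while `enumerate` is 0-based; hence the
                        // `pos + 1` lookup below.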
                        let comment = comments_map.get(&Some(pos + 1));
                        if let Some(comment_str) = comment {
                            let doc_on_column_key = AvroDocOn {
                                identifier: DocOnIdentifier::Column(ColumnName {
                                    relation: full_resolved_name.clone(),
                                    column: ResolvedColumnReference::Column {
                                        name: column_name.to_owned(),
                                        index: pos,
                                    },
                                }),
                                for_schema,
                            };
                            if !user_provided_comments.contains(&doc_on_column_key) {
                                options.push(CsrConfigOption {
                                    name: CsrConfigOptionName::AvroDocOn(doc_on_column_key),
                                    value: Some(mz_sql_parser::ast::WithOptionValue::Value(
                                        Value::String(comment_str.clone()),
                                    )),
                                });
                            }
                        }
                    }
                }
            }
        }
    }

    Ok(())
}

/// Checks that the sink described in the statement can connect to its external
/// resources.
///
/// We must not leave any state behind in the Kafka broker, so just ensure that
/// we can connect. This means we don't ensure that we can create the topic and
/// introduces TOCTOU errors, but creating an inoperable sink is infinitely
/// preferable to leaking state in users' environments.
async fn purify_create_sink(
    catalog: impl SessionCatalog,
    mut create_sink_stmt: CreateSinkStatement<Aug>,
    storage_configuration: &StorageConfiguration,
) -> Result<PurifiedStatement, PlanError> {
    // General purification
    let CreateSinkStatement {
        connection,
        format,
        with_options,
        name: _,
        in_cluster: _,
        if_not_exists: _,
        from,
        envelope: _,
    } = &mut create_sink_stmt;

    // The list of options that the user is allowed to specify.
    const USER_ALLOWED_WITH_OPTIONS: &[CreateSinkOptionName] = &[CreateSinkOptionName::Snapshot];

    if let Some(op) = with_options
        .iter()
        .find(|op| !USER_ALLOWED_WITH_OPTIONS.contains(&op.name))
    {
        sql_bail!(
            "CREATE SINK...WITH ({}..) is not allowed",
            op.name.to_ast_string_simple(),
        )
    }

    match &connection {
        CreateSinkConnection::Kafka {
            connection,
            options,
            key: _,
            headers: _,
        } => {
            let scx = StatementContext::new(None, &catalog);
            let connection = {
                let item = scx.get_item_by_resolved_name(connection)?;
                // Get Kafka connection
                match item.connection()? {
                    Connection::Kafka(connection) => {
                        connection.clone().into_inline_connection(scx.catalog)
                    }
                    _ => sql_bail!(
                        "{} is not a kafka connection",
                        scx.catalog.resolve_full_name(item.name())
                    ),
                }
            };

            let extracted_options: KafkaSinkConfigOptionExtracted = options.clone().try_into()?;

            if extracted_options.legacy_ids == Some(true) {
                sql_bail!("LEGACY IDs option is not supported");
            }

            let client: AdminClient<_> = connection
                .create_with_context(
                    storage_configuration,
                    MzClientContext::default(),
                    &BTreeMap::new(),
                    InTask::No,
                )
                .await
                .map_err(|e| {
                    // anyhow doesn't support Clone, so not trivial to move into PlanError
                    KafkaSinkPurificationError::AdminClientError(Arc::new(e))
                })?;

            let metadata = client
                .inner()
                .fetch_metadata(
                    None,
                    storage_configuration
                        .parameters
                        .kafka_timeout_config
                        .fetch_metadata_timeout,
                )
                .map_err(|e| {
                    KafkaSinkPurificationError::AdminClientError(Arc::new(
                        ContextCreationError::KafkaError(e),
                    ))
                })?;

            if metadata.brokers().is_empty() {
                Err(KafkaSinkPurificationError::ZeroBrokers)?;
            }
        }
    }

    let mut csr_connection_ids = BTreeSet::new();
    for_each_format(format, |_, fmt| match fmt {
        Format::Avro(AvroSchema::InlineSchema { .. })
        | Format::Bytes
        | Format::Csv { .. }
        | Format::Json { .. }
        | Format::Protobuf(ProtobufSchema::InlineSchema { .. })
        | Format::Regex(..)
        | Format::Text => (),
        Format::Avro(AvroSchema::Csr {
            csr_connection: CsrConnectionAvro { connection, .. },
        })
        | Format::Protobuf(ProtobufSchema::Csr {
            csr_connection: CsrConnectionProtobuf { connection, .. },
        }) => {
            csr_connection_ids.insert(*connection.connection.item_id());
        }
    });

    let scx = StatementContext::new(None, &catalog);
    for csr_connection_id in csr_connection_ids {
        let connection = {
            let item = scx.get_item(&csr_connection_id);
            // Get the schema registry connection
            match item.connection()? {
                Connection::Csr(connection) => connection.clone().into_inline_connection(&catalog),
                _ => Err(CsrPurificationError::NotCsrConnection(
                    scx.catalog.resolve_full_name(item.name()),
                ))?,
            }
        };

        let client = connection
            .connect(storage_configuration, InTask::No)
            .await
            .map_err(|e| CsrPurificationError::ClientError(Arc::new(e)))?;

        client
            .list_subjects()
            .await
            .map_err(|e| CsrPurificationError::ListSubjectsError(Arc::new(e)))?;
    }

    purify_create_sink_avro_doc_on_options(&catalog, *from.item_id(), format)?;

    Ok(PurifiedStatement::PurifiedCreateSink(create_sink_stmt))
}

/// Runs a function on each format within the format specifier.
///
/// The function is provided with a `DocOnSchema` that indicates whether the
/// format is for the key, value, or both.
///
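/// A minimal usage sketch (assuming a `format` value already in scope):
///
/// ```text
/// let mut n = 0;
/// for_each_format(&mut format, |_, _| n += 1);
/// // A bare format invokes the callback once; a KEY/VALUE specifier, twice.
/// ```
///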
// TODO(benesch): rename `DocOnSchema` to the more general `FormatRestriction`.
fn for_each_format<'a, F>(format: &'a mut Option<FormatSpecifier<Aug>>, mut f: F)
where
    F: FnMut(DocOnSchema, &'a mut Format<Aug>),
{
    match format {
        None => (),
        Some(FormatSpecifier::Bare(fmt)) => f(DocOnSchema::All, fmt),
        Some(FormatSpecifier::KeyValue { key, value }) => {
            f(DocOnSchema::KeyOnly, key);
            f(DocOnSchema::ValueOnly, value);
        }
    }
}

/// Defines whether purification should enforce that at least one valid source
/// reference is provided on the statement.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub(crate) enum SourceReferencePolicy {
    /// Don't allow source references to be provided. This is used for
    /// enforcing that `CREATE SOURCE` statements don't create subsources.
    NotAllowed,
    /// Allow empty references, such as when creating a source that
    /// will have tables added afterwards.
    Optional,
    /// Require that at least one reference is resolved to an upstream
    /// object.
    Required,
}

async fn purify_create_source(
    catalog: impl SessionCatalog,
    now: u64,
    mut create_source_stmt: CreateSourceStatement<Aug>,
    storage_configuration: &StorageConfiguration,
) -> Result<PurifiedStatement, PlanError> {
    let CreateSourceStatement {
        name: source_name,
        connection: source_connection,
        format,
        envelope,
        include_metadata,
        external_references,
        progress_subsource,
        ..
    } = &mut create_source_stmt;

    if let Some(DeferredItemName::Named(_)) = progress_subsource {
        sql_bail!("Cannot manually ID qualify progress subsource")
    }

    let mut requested_subsource_map = BTreeMap::new();

    let progress_desc = match &source_connection {
        CreateSourceConnection::Kafka { .. } => {
            &mz_storage_types::sources::kafka::KAFKA_PROGRESS_DESC
        }
        CreateSourceConnection::Postgres { .. } | CreateSourceConnection::Yugabyte { .. } => {
            &mz_storage_types::sources::postgres::PG_PROGRESS_DESC
        }
        CreateSourceConnection::SqlServer { .. } => {
            &mz_storage_types::sources::sql_server::SQL_SERVER_PROGRESS_DESC
        }
        CreateSourceConnection::MySql { .. } => {
            &mz_storage_types::sources::mysql::MYSQL_PROGRESS_DESC
        }
        CreateSourceConnection::LoadGenerator { .. } => {
            &mz_storage_types::sources::load_generator::LOAD_GEN_PROGRESS_DESC
        }
    };
    let scx = StatementContext::new(None, &catalog);

    // Depending on whether the user must or may use `CREATE TABLE .. FROM SOURCE`
    // to add tables after this source is created, we may need to enforce that
    // auto-generated subsources are or are not created by this source statement.
    let reference_policy = if scx.catalog.system_vars().enable_create_table_from_source()
        && scx.catalog.system_vars().force_source_table_syntax()
    {
        SourceReferencePolicy::NotAllowed
    } else if scx.catalog.system_vars().enable_create_table_from_source() {
        SourceReferencePolicy::Optional
    } else {
        SourceReferencePolicy::Required
    };

    let mut format_options = SourceFormatOptions::Default;

    let retrieved_source_references: RetrievedSourceReferences;

    match source_connection {
        CreateSourceConnection::Kafka {
            connection,
            options: base_with_options,
            ..
        } => {
            if let Some(external_references) = external_references {
                Err(KafkaSourcePurificationError::ReferencedSubsources(
                    external_references.clone(),
                ))?;
            }

            let connection = {
                let item = scx.get_item_by_resolved_name(connection)?;
                // Get Kafka connection
                match item.connection()? {
                    Connection::Kafka(connection) => {
                        connection.clone().into_inline_connection(&catalog)
                    }
                    _ => Err(KafkaSourcePurificationError::NotKafkaConnection(
                        scx.catalog.resolve_full_name(item.name()),
                    ))?,
                }
            };

            let extracted_options: KafkaSourceConfigOptionExtracted =
                base_with_options.clone().try_into()?;

            let topic = extracted_options
                .topic
                .ok_or(KafkaSourcePurificationError::ConnectionMissingTopic)?;

            let consumer = connection
                .create_with_context(
                    storage_configuration,
                    MzClientContext::default(),
                    &BTreeMap::new(),
                    InTask::No,
                )
                .await
                .map_err(|e| {
                    // anyhow doesn't support Clone, so not trivial to move into PlanError
                    KafkaSourcePurificationError::KafkaConsumerError(
                        e.display_with_causes().to_string(),
                    )
                })?;
            let consumer = Arc::new(consumer);

            match (
                extracted_options.start_offset,
                extracted_options.start_timestamp,
            ) {
                (None, None) => {
                    // Validate that the topic at least exists.
                    kafka_util::ensure_topic_exists(
                        Arc::clone(&consumer),
                        &topic,
                        storage_configuration
                            .parameters
                            .kafka_timeout_config
                            .fetch_metadata_timeout,
                    )
                    .await?;
                }
                (Some(_), Some(_)) => {
                    sql_bail!("cannot specify START TIMESTAMP and START OFFSET at same time")
                }
                (Some(start_offsets), None) => {
                    // Validate the start offsets.
                    kafka_util::validate_start_offsets(
                        Arc::clone(&consumer),
                        &topic,
                        start_offsets,
                        storage_configuration
                            .parameters
                            .kafka_timeout_config
                            .fetch_metadata_timeout,
                    )
                    .await?;
                }
                (None, Some(time_offset)) => {
                    // Translate `START TIMESTAMP` to a start offset.
                    let start_offsets = kafka_util::lookup_start_offsets(
                        Arc::clone(&consumer),
                        &topic,
                        time_offset,
                        now,
                        storage_configuration
                            .parameters
                            .kafka_timeout_config
                            .fetch_metadata_timeout,
                    )
                    .await?;

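                    // Rewrite the statement so that replanning it later will
                    // not contact Kafka again: e.g. `START TIMESTAMP -1000`
                    // might become `START OFFSET [42, 17]` for a two-partition
                    // topic (offsets illustrative).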
                    base_with_options.retain(|val| {
                        !matches!(val.name, KafkaSourceConfigOptionName::StartTimestamp)
                    });
                    base_with_options.push(KafkaSourceConfigOption {
                        name: KafkaSourceConfigOptionName::StartOffset,
                        value: Some(WithOptionValue::Sequence(
                            start_offsets
                                .iter()
                                .map(|offset| {
                                    WithOptionValue::Value(Value::Number(offset.to_string()))
                                })
                                .collect(),
                        )),
                    });
                }
            }

            let reference_client = SourceReferenceClient::Kafka { topic: &topic };
            retrieved_source_references = reference_client.get_source_references().await?;

            format_options = SourceFormatOptions::Kafka { topic };
        }
        source_connection @ CreateSourceConnection::Postgres { .. }
        | source_connection @ CreateSourceConnection::Yugabyte { .. } => {
            let (source_flavor, connection, options) = match source_connection {
                CreateSourceConnection::Postgres {
                    connection,
                    options,
                } => (PostgresFlavor::Vanilla, connection, options),
                CreateSourceConnection::Yugabyte {
                    connection,
                    options,
                } => (PostgresFlavor::Yugabyte, connection, options),
                _ => unreachable!(),
            };
            let connection = {
                let item = scx.get_item_by_resolved_name(connection)?;
                match item.connection().map_err(PlanError::from)? {
                    Connection::Postgres(connection) => {
                        let connection = connection.clone().into_inline_connection(&catalog);
                        if connection.flavor != source_flavor {
                            match source_flavor {
                                PostgresFlavor::Vanilla => {
                                    Err(PgSourcePurificationError::NotPgConnection(
                                        scx.catalog.resolve_full_name(item.name()),
                                    ))
                                }
                                PostgresFlavor::Yugabyte => {
                                    Err(PgSourcePurificationError::NotYugabyteConnection(
                                        scx.catalog.resolve_full_name(item.name()),
                                    ))
                                }
                            }
                        } else {
                            Ok(connection)
                        }
                    }
                    _ => match source_flavor {
                        PostgresFlavor::Vanilla => Err(PgSourcePurificationError::NotPgConnection(
                            scx.catalog.resolve_full_name(item.name()),
                        )),
                        PostgresFlavor::Yugabyte => {
                            Err(PgSourcePurificationError::NotYugabyteConnection(
                                scx.catalog.resolve_full_name(item.name()),
                            ))
                        }
                    },
                }
            }?;
            let crate::plan::statement::PgConfigOptionExtracted {
                publication,
                text_columns,
                details,
                ..
            } = options.clone().try_into()?;
            let publication =
                publication.ok_or(PgSourcePurificationError::ConnectionMissingPublication)?;

            if details.is_some() {
                Err(PgSourcePurificationError::UserSpecifiedDetails)?;
            }

            // Verify that we can connect upstream and snapshot the publication metadata.
            let config = connection
                .config(
                    &storage_configuration.connection_context.secrets_reader,
                    storage_configuration,
                    InTask::No,
                )
                .await?;

            let client = config
                .connect(
                    "postgres_purification",
                    &storage_configuration.connection_context.ssh_tunnel_manager,
                )
                .await?;

            let wal_level = mz_postgres_util::get_wal_level(&client).await?;

            if wal_level < WalLevel::Logical {
                Err(PgSourcePurificationError::InsufficientWalLevel { wal_level })?;
            }

            let max_wal_senders = mz_postgres_util::get_max_wal_senders(&client).await?;

            if max_wal_senders < 1 {
                Err(PgSourcePurificationError::ReplicationDisabled)?;
            }

            let available_replication_slots =
                mz_postgres_util::available_replication_slots(&client).await?;

            // We need one replication slot for the snapshots and one for the ongoing replication.
            if available_replication_slots < 2 {
                Err(PgSourcePurificationError::InsufficientReplicationSlotsAvailable { count: 2 })?;
            }

            let reference_client = SourceReferenceClient::Postgres {
                client: &client,
                publication: &publication,
                database: &connection.database,
            };
            retrieved_source_references = reference_client.get_source_references().await?;

            let postgres::PurifiedSourceExports {
                source_exports: subsources,
                normalized_text_columns,
            } = postgres::purify_source_exports(
                &client,
                &config,
                &retrieved_source_references,
                external_references,
                text_columns,
                source_name,
                &reference_policy,
            )
            .await?;

            if let Some(text_cols_option) = options
                .iter_mut()
                .find(|option| option.name == PgConfigOptionName::TextColumns)
            {
                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
            }

            requested_subsource_map.extend(subsources);

            // Record the active replication timeline_id to allow detection of a future upstream
            // point-in-time-recovery that will put the source into an error state.
            let replication_client = config
                .connect_replication(&storage_configuration.connection_context.ssh_tunnel_manager)
                .await?;
            let timeline_id = mz_postgres_util::get_timeline_id(&replication_client).await?;

            // Remove any old detail references
            options.retain(|PgConfigOption { name, .. }| name != &PgConfigOptionName::Details);
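            // The replication slot name embeds a fresh UUID with hyphens
            // stripped, e.g. `materialize_6c0c27bfa2014a2bbcabaaf4cbbbbf37`
            // (example value).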
            let details = PostgresSourcePublicationDetails {
                slot: format!(
                    "materialize_{}",
                    Uuid::new_v4().to_string().replace('-', "")
                ),
                timeline_id: Some(timeline_id),
                database: connection.database,
            };
            options.push(PgConfigOption {
                name: PgConfigOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    details.into_proto().encode_to_vec(),
                )))),
            })
        }
        CreateSourceConnection::SqlServer {
            connection,
            options,
        } => {
            scx.require_feature_flag(&ENABLE_SQL_SERVER_SOURCE)?;

            // Connect to the upstream SQL Server instance so we can validate
            // we're compatible with CDC.
            let connection_item = scx.get_item_by_resolved_name(connection)?;
            let connection = match connection_item.connection()? {
                Connection::SqlServer(connection) => {
                    connection.clone().into_inline_connection(&catalog)
                }
                _ => Err(SqlServerSourcePurificationError::NotSqlServerConnection(
                    scx.catalog.resolve_full_name(connection_item.name()),
                ))?,
            };
            let crate::plan::statement::ddl::SqlServerConfigOptionExtracted {
                details,
                text_columns,
                exclude_columns,
                seen: _,
            } = options.clone().try_into()?;

            if details.is_some() {
                Err(SqlServerSourcePurificationError::UserSpecifiedDetails)?;
            }

            let config = connection
                .resolve_config(
                    &storage_configuration.connection_context.secrets_reader,
                    storage_configuration,
                    InTask::No,
                )
                .await?;
            let mut client = mz_sql_server_util::Client::connect(config).await?;

            // Ensure the upstream SQL Server instance is configured to allow CDC.
            //
            // Run all of the checks necessary and collect the errors to provide the best
            // guidance as to which system settings need to be enabled.
            let mut replication_errors = vec![];
            for error in [
                mz_sql_server_util::inspect::ensure_database_cdc_enabled(&mut client).await,
                mz_sql_server_util::inspect::ensure_snapshot_isolation_enabled(&mut client).await,
            ] {
                match error {
                    Err(mz_sql_server_util::SqlServerError::InvalidSystemSetting {
                        name,
                        expected,
                        actual,
                    }) => replication_errors.push((name, expected, actual)),
                    Err(other) => Err(other)?,
                    Ok(()) => (),
                }
            }
            if !replication_errors.is_empty() {
                Err(SqlServerSourcePurificationError::ReplicationSettingsError(
                    replication_errors,
                ))?;
            }

            // We've validated that CDC is configured for the system, now let's
            // purify the individual exports (i.e. subsources).
            let database: Arc<str> = connection.database.into();
            let reference_client = SourceReferenceClient::SqlServer {
                client: &mut client,
                database: Arc::clone(&database),
            };
            retrieved_source_references = reference_client.get_source_references().await?;
            tracing::debug!(?retrieved_source_references, "got source references");

            let purified_source_exports = sql_server::purify_source_exports(
                &*database,
                &mut client,
                &retrieved_source_references,
                external_references,
                &text_columns,
                &exclude_columns,
                source_name,
                &reference_policy,
            )
            .await?;

            let sql_server::PurifiedSourceExports {
                source_exports,
                normalized_text_columns,
                normalized_excl_columns,
            } = purified_source_exports;

            // Update our set of requested source exports.
            requested_subsource_map.extend(source_exports);

            // Reset the 'DETAILS' to ensure they're empty; we don't use them
            // in the SQL Server source.
            let details = SqlServerSourceExtras {};
            options.retain(|SqlServerConfigOption { name, .. }| {
                name != &SqlServerConfigOptionName::Details
            });
            options.push(SqlServerConfigOption {
                name: SqlServerConfigOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    details.into_proto().encode_to_vec(),
                )))),
            });

            // Update our 'TEXT' and 'EXCLUDE' column options with the purified and normalized set.
            if let Some(text_cols_option) = options
                .iter_mut()
                .find(|option| option.name == SqlServerConfigOptionName::TextColumns)
            {
                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
            }
            if let Some(excl_cols_option) = options
                .iter_mut()
                .find(|option| option.name == SqlServerConfigOptionName::ExcludeColumns)
            {
                excl_cols_option.value = Some(WithOptionValue::Sequence(normalized_excl_columns));
            }
        }
        CreateSourceConnection::MySql {
            connection,
            options,
        } => {
            let connection_item = scx.get_item_by_resolved_name(connection)?;
            let connection = match connection_item.connection()? {
                Connection::MySql(connection) => {
                    connection.clone().into_inline_connection(&catalog)
                }
                _ => Err(MySqlSourcePurificationError::NotMySqlConnection(
                    scx.catalog.resolve_full_name(connection_item.name()),
                ))?,
            };
            let crate::plan::statement::ddl::MySqlConfigOptionExtracted {
                details,
                text_columns,
                exclude_columns,
                seen: _,
            } = options.clone().try_into()?;

            if details.is_some() {
                Err(MySqlSourcePurificationError::UserSpecifiedDetails)?;
            }

            let config = connection
                .config(
                    &storage_configuration.connection_context.secrets_reader,
                    storage_configuration,
                    InTask::No,
                )
                .await?;

            let mut conn = config
                .connect(
                    "mysql purification",
                    &storage_configuration.connection_context.ssh_tunnel_manager,
                )
                .await?;

            // Check if the MySQL database is configured to allow row-based consistent GTID replication
            let mut replication_errors = vec![];
            for error in [
                mz_mysql_util::ensure_gtid_consistency(&mut conn)
                    .await
                    .err(),
                mz_mysql_util::ensure_full_row_binlog_format(&mut conn)
                    .await
                    .err(),
                mz_mysql_util::ensure_replication_commit_order(&mut conn)
                    .await
                    .err(),
            ] {
                match error {
                    Some(mz_mysql_util::MySqlError::InvalidSystemSetting {
                        setting,
                        expected,
                        actual,
                    }) => {
                        replication_errors.push((setting, expected, actual));
                    }
                    Some(err) => Err(err)?,
                    None => (),
                }
            }
            if !replication_errors.is_empty() {
                Err(MySqlSourcePurificationError::ReplicationSettingsError(
                    replication_errors,
                ))?;
            }

            // Retrieve the current @gtid_executed value of the server to mark as the effective
            // initial snapshot point such that we can ensure consistency if the initial source
            // snapshot is broken up over multiple points in time.
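            // `gtid_executed` is a GTID set string, e.g.
            // `3E11FA47-71CA-11E1-9E33-C80AA9429562:1-5` (example value).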
            let initial_gtid_set =
                mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?;

            let reference_client = SourceReferenceClient::MySql {
                conn: &mut conn,
                include_system_schemas: mysql::references_system_schemas(external_references),
            };
            retrieved_source_references = reference_client.get_source_references().await?;

            let mysql::PurifiedSourceExports {
                source_exports: subsources,
                normalized_text_columns,
                normalized_exclude_columns,
            } = mysql::purify_source_exports(
                &mut conn,
                &retrieved_source_references,
                external_references,
                text_columns,
                exclude_columns,
                source_name,
                initial_gtid_set.clone(),
                &reference_policy,
            )
            .await?;
            requested_subsource_map.extend(subsources);

            // We don't have any fields in this details struct, but keep it around for
            // conformity with Postgres and in case we end up needing it again in the future.
            let details = MySqlSourceDetails {};
            // Update options with the purified details
            options
                .retain(|MySqlConfigOption { name, .. }| name != &MySqlConfigOptionName::Details);
            options.push(MySqlConfigOption {
                name: MySqlConfigOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    details.into_proto().encode_to_vec(),
                )))),
            });

            if let Some(text_cols_option) = options
                .iter_mut()
                .find(|option| option.name == MySqlConfigOptionName::TextColumns)
            {
                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
            }
            if let Some(ignore_cols_option) = options
                .iter_mut()
                .find(|option| option.name == MySqlConfigOptionName::ExcludeColumns)
            {
                ignore_cols_option.value =
                    Some(WithOptionValue::Sequence(normalized_exclude_columns));
            }
        }
        CreateSourceConnection::LoadGenerator { generator, options } => {
            let load_generator =
                load_generator_ast_to_generator(&scx, generator, options, include_metadata)?;

            let reference_client = SourceReferenceClient::LoadGenerator {
                generator: &load_generator,
            };
            retrieved_source_references = reference_client.get_source_references().await?;
            // Filter to the references that need to be created as 'subsources', which
            // doesn't include the default output for single-output sources.
            // TODO(database-issues#8620): Remove once subsources are removed
            let multi_output_sources =
                retrieved_source_references
                    .all_references()
                    .iter()
                    .any(|r| {
                        r.load_generator_output().expect("is loadgen")
                            != &LoadGeneratorOutput::Default
                    });

            match external_references {
                Some(requested)
                    if matches!(reference_policy, SourceReferencePolicy::NotAllowed) =>
                {
                    Err(PlanError::UseTablesForSources(requested.to_string()))?
                }
                Some(requested) if !multi_output_sources => match requested {
                    ExternalReferences::SubsetTables(_) => {
                        Err(LoadGeneratorSourcePurificationError::ForTables)?
                    }
                    ExternalReferences::SubsetSchemas(_) => {
                        Err(LoadGeneratorSourcePurificationError::ForSchemas)?
                    }
                    ExternalReferences::All => {
                        Err(LoadGeneratorSourcePurificationError::ForAllTables)?
                    }
                },
                Some(requested) => {
                    let requested_exports = retrieved_source_references
                        .requested_source_exports(Some(requested), source_name)?;
                    for export in requested_exports {
                        requested_subsource_map.insert(
                            export.name,
                            PurifiedSourceExport {
                                external_reference: export.external_reference,
                                details: PurifiedExportDetails::LoadGenerator {
                                    table: export
                                        .meta
                                        .load_generator_desc()
                                        .expect("is loadgen")
                                        .clone(),
                                    output: export
                                        .meta
                                        .load_generator_output()
                                        .expect("is loadgen")
                                        .clone(),
                                },
                            },
                        );
                    }
                }
                None => {
                    if multi_output_sources
                        && matches!(reference_policy, SourceReferencePolicy::Required)
                    {
                        Err(LoadGeneratorSourcePurificationError::MultiOutputRequiresForAllTables)?
                    }
                }
            }

            if let LoadGenerator::Clock = generator {
                if !options
                    .iter()
                    .any(|p| p.name == LoadGeneratorOptionName::AsOf)
                {
                    let now = catalog.now();
                    options.push(LoadGeneratorOption {
                        name: LoadGeneratorOptionName::AsOf,
                        value: Some(WithOptionValue::Value(Value::Number(now.to_string()))),
                    });
                }
            }
        }
    }

    // Now that we know which subsources to create alongside this statement,
    // remove the references so they are not canonicalized as part of the
    // `CREATE SOURCE` statement in the catalog.
    *external_references = None;

    // Generate the progress subsource.

    // Take the name from the input or generate one.
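    // E.g. a source named `db.sch.src` with no explicit progress subsource
    // typically gets `db.sch.src_progress` (with a numeric suffix appended on
    // collision).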
1278    let name = match progress_subsource {
1279        Some(name) => match name {
1280            DeferredItemName::Deferred(name) => name.clone(),
1281            DeferredItemName::Named(_) => unreachable!("already checked for this value"),
1282        },
1283        None => {
1284            let (item, prefix) = source_name.0.split_last().unwrap();
1285            let item_name = Ident::try_generate_name(item.to_string(), "_progress", |candidate| {
1286                let mut suggested_name = prefix.to_vec();
1287                suggested_name.push(candidate.clone());
1288
1289                let partial = normalize::unresolved_item_name(UnresolvedItemName(suggested_name))?;
1290                let qualified = scx.allocate_qualified_name(partial)?;
1291                let item_exists = scx.catalog.get_item_by_name(&qualified).is_some();
1292                let type_exists = scx.catalog.get_type_by_name(&qualified).is_some();
1293                Ok::<_, PlanError>(!item_exists && !type_exists)
1294            })?;
1295
1296            let mut full_name = prefix.to_vec();
1297            full_name.push(item_name);
1298            let full_name = normalize::unresolved_item_name(UnresolvedItemName(full_name))?;
1299            let qualified_name = scx.allocate_qualified_name(full_name)?;
1300            let full_name = scx.catalog.resolve_full_name(&qualified_name);
1301
1302            UnresolvedItemName::from(full_name.clone())
1303        }
1304    };
1305
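    // Derive the progress subsource's column and constraint definitions from
    // the source's progress relation description.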
1306    let (columns, constraints) = scx.relation_desc_into_table_defs(progress_desc)?;
1307
1308    // Create the subsource statement
1309    let create_progress_subsource_stmt = CreateSubsourceStatement {
1310        name,
1311        columns,
1312        // Progress subsources do not refer to the source to which they belong.
1313        // Instead, the primary source depends on them (the opposite is true of
1314        // ingestion exports, which depend on the primary source).
1315        of_source: None,
1316        constraints,
1317        if_not_exists: false,
1318        with_options: vec![CreateSubsourceOption {
1319            name: CreateSubsourceOptionName::Progress,
1320            value: Some(WithOptionValue::Value(Value::Boolean(true))),
1321        }],
1322    };
1323
1324    purify_source_format(
1325        &catalog,
1326        format,
1327        &format_options,
1328        envelope,
1329        storage_configuration,
1330    )
1331    .await?;
1332
1333    Ok(PurifiedStatement::PurifiedCreateSource {
1334        create_progress_subsource_stmt,
1335        create_source_stmt,
1336        subsources: requested_subsource_map,
1337        available_source_references: retrieved_source_references.available_source_references(),
1338    })
1339}
1340
1341/// On success, returns the details for new subsources and the updated
1342/// `options` that sequencing expects when handling `ALTER SOURCE` statements.
1343async fn purify_alter_source(
1344    catalog: impl SessionCatalog,
1345    stmt: AlterSourceStatement<Aug>,
1346    storage_configuration: &StorageConfiguration,
1347) -> Result<PurifiedStatement, PlanError> {
1348    let scx = StatementContext::new(None, &catalog);
1349    let AlterSourceStatement {
1350        source_name: unresolved_source_name,
1351        action,
1352        if_exists,
1353    } = stmt;
1354
1355    // Get name.
1356    let item = match scx.resolve_item(RawItemName::Name(unresolved_source_name.clone())) {
1357        Ok(item) => item,
1358        Err(_) if if_exists => {
1359            return Ok(PurifiedStatement::PurifiedAlterSource {
1360                alter_source_stmt: AlterSourceStatement {
1361                    source_name: unresolved_source_name,
1362                    action,
1363                    if_exists,
1364                },
1365            });
1366        }
1367        Err(e) => return Err(e),
1368    };
1369
1370    // Ensure it's an ingestion-based and alterable source.
1371    let desc = match item.source_desc()? {
1372        Some(desc) => desc.clone().into_inline_connection(scx.catalog),
1373        None => {
1374            sql_bail!("cannot ALTER this type of source")
1375        }
1376    };
1377
1378    let source_name = item.name();
1379
1380    let resolved_source_name = ResolvedItemName::Item {
1381        id: item.id(),
1382        qualifiers: item.name().qualifiers.clone(),
1383        full_name: scx.catalog.resolve_full_name(source_name),
1384        print_id: true,
1385        version: RelationVersionSelector::Latest,
1386    };
1387
1388    let partial_name = scx.catalog.minimal_qualification(source_name);
1389
1390    match action {
1391        AlterSourceAction::AddSubsources {
1392            external_references,
1393            options,
1394        } => {
1395            if scx.catalog.system_vars().enable_create_table_from_source()
1396                && scx.catalog.system_vars().force_source_table_syntax()
1397            {
1398                Err(PlanError::UseTablesForSources(
1399                    "ALTER SOURCE .. ADD SUBSOURCES ..".to_string(),
1400                ))?;
1401            }
1402
1403            purify_alter_source_add_subsources(
1404                external_references,
1405                options,
1406                desc,
1407                partial_name,
1408                unresolved_source_name,
1409                resolved_source_name,
1410                storage_configuration,
1411            )
1412            .await
1413        }
1414        AlterSourceAction::RefreshReferences => {
1415            purify_alter_source_refresh_references(
1416                desc,
1417                resolved_source_name,
1418                storage_configuration,
1419            )
1420            .await
1421        }
1422        _ => Ok(PurifiedStatement::PurifiedAlterSource {
1423            alter_source_stmt: AlterSourceStatement {
1424                source_name: unresolved_source_name,
1425                action,
1426                if_exists,
1427            },
1428        }),
1429    }
1430}
1431
1432// TODO(database-issues#8620): Remove once subsources are removed
1433/// The analogue of `purify_create_source` for `ALTER SOURCE .. ADD SUBSOURCE`.
1434async fn purify_alter_source_add_subsources(
1435    external_references: Vec<ExternalReferenceExport>,
1436    mut options: Vec<AlterSourceAddSubsourceOption<Aug>>,
1437    desc: SourceDesc,
1438    partial_source_name: PartialItemName,
1439    unresolved_source_name: UnresolvedItemName,
1440    resolved_source_name: ResolvedItemName,
1441    storage_configuration: &StorageConfiguration,
1442) -> Result<PurifiedStatement, PlanError> {
1443    // Validate this is a source that can have subsources added.
1444    match desc.connection {
1445        GenericSourceConnection::Postgres(PostgresSourceConnection {
1446            connection:
1447                PostgresConnection {
1448                    flavor: PostgresFlavor::Vanilla,
1449                    ..
1450                },
1451            ..
1452        }) => {}
1453        GenericSourceConnection::MySql(_) => {}
1454        GenericSourceConnection::SqlServer(_) => {}
1455        _ => sql_bail!(
1456            "source {} does not support ALTER SOURCE.",
1457            partial_source_name
1458        ),
1459    };
1460
1461    let connection_name = desc.connection.name();
1462
1463    let crate::plan::statement::ddl::AlterSourceAddSubsourceOptionExtracted {
1464        text_columns,
1465        exclude_columns,
1466        details,
1467        seen: _,
1468    } = options.clone().try_into()?;
1469    assert_none!(details, "details cannot be explicitly set");
1470
1471    let mut requested_subsource_map = BTreeMap::new();
1472
1473    match desc.connection {
1474        GenericSourceConnection::Postgres(pg_source_connection) => {
1475            // Get PostgresConnection for generating subsources.
1476            let pg_connection = &pg_source_connection.connection;
1477
1478            let config = pg_connection
1479                .config(
1480                    &storage_configuration.connection_context.secrets_reader,
1481                    storage_configuration,
1482                    InTask::No,
1483                )
1484                .await?;
1485
1486            let client = config
1487                .connect(
1488                    "postgres_purification",
1489                    &storage_configuration.connection_context.ssh_tunnel_manager,
1490                )
1491                .await?;
1492
1493            let available_replication_slots =
1494                mz_postgres_util::available_replication_slots(&client).await?;
1495
1496            // We need 1 additional replication slot for the snapshots
1497            if available_replication_slots < 1 {
1498                Err(PgSourcePurificationError::InsufficientReplicationSlotsAvailable { count: 1 })?;
1499            }
1500
1501            if !exclude_columns.is_empty() {
1502                sql_bail!(
1503                    "{} is a {} source, which does not support EXCLUDE COLUMNS.",
1504                    partial_source_name,
1505                    connection_name
1506                )
1507            }
1508
1509            let reference_client = SourceReferenceClient::Postgres {
1510                client: &client,
1511                publication: &pg_source_connection.publication,
1512                database: &pg_connection.database,
1513            };
1514            let retrieved_source_references = reference_client.get_source_references().await?;
1515
1516            let postgres::PurifiedSourceExports {
1517                source_exports: subsources,
1518                normalized_text_columns,
1519            } = postgres::purify_source_exports(
1520                &client,
1521                &config,
1522                &retrieved_source_references,
1523                &Some(ExternalReferences::SubsetTables(external_references)),
1524                text_columns,
1525                &unresolved_source_name,
1526                &SourceReferencePolicy::Required,
1527            )
1528            .await?;
1529
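            // Rewrite the TEXT COLUMNS option to hold the purified,
            // fully-qualified column names.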
1530            if let Some(text_cols_option) = options
1531                .iter_mut()
1532                .find(|option| option.name == AlterSourceAddSubsourceOptionName::TextColumns)
1533            {
1534                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
1535            }
1536
1537            requested_subsource_map.extend(subsources);
1538        }
1539        GenericSourceConnection::MySql(mysql_source_connection) => {
1540            let mysql_connection = &mysql_source_connection.connection;
1541            let config = mysql_connection
1542                .config(
1543                    &storage_configuration.connection_context.secrets_reader,
1544                    storage_configuration,
1545                    InTask::No,
1546                )
1547                .await?;
1548
1549            let mut conn = config
1550                .connect(
1551                    "mysql purification",
1552                    &storage_configuration.connection_context.ssh_tunnel_manager,
1553                )
1554                .await?;
1555
1556            // Retrieve the current @gtid_executed value of the server to mark as the effective
1557            // initial snapshot point for these subsources.
1558            let initial_gtid_set =
1559                mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?;
1560
1561            let requested_references = Some(ExternalReferences::SubsetTables(external_references));
1562
1563            let reference_client = SourceReferenceClient::MySql {
1564                conn: &mut conn,
1565                include_system_schemas: mysql::references_system_schemas(&requested_references),
1566            };
1567            let retrieved_source_references = reference_client.get_source_references().await?;
1568
1569            let mysql::PurifiedSourceExports {
1570                source_exports: subsources,
1571                normalized_text_columns,
1572                normalized_exclude_columns,
1573            } = mysql::purify_source_exports(
1574                &mut conn,
1575                &retrieved_source_references,
1576                &requested_references,
1577                text_columns,
1578                exclude_columns,
1579                &unresolved_source_name,
1580                initial_gtid_set,
1581                &SourceReferencePolicy::Required,
1582            )
1583            .await?;
1584            requested_subsource_map.extend(subsources);
1585
1586            // Update options with the purified details
1587            if let Some(text_cols_option) = options
1588                .iter_mut()
1589                .find(|option| option.name == AlterSourceAddSubsourceOptionName::TextColumns)
1590            {
1591                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
1592            }
1593            if let Some(ignore_cols_option) = options
1594                .iter_mut()
1595                .find(|option| option.name == AlterSourceAddSubsourceOptionName::ExcludeColumns)
1596            {
1597                ignore_cols_option.value =
1598                    Some(WithOptionValue::Sequence(normalized_exclude_columns));
1599            }
1600        }
1601        GenericSourceConnection::SqlServer(sql_server_source) => {
1602            // Open a connection to the upstream SQL Server instance.
1603            let sql_server_connection = &sql_server_source.connection;
1604            let config = sql_server_connection
1605                .resolve_config(
1606                    &storage_configuration.connection_context.secrets_reader,
1607                    storage_configuration,
1608                    InTask::No,
1609                )
1610                .await?;
1611            let mut client = mz_sql_server_util::Client::connect(config).await?;
1612
1613            // Query the upstream SQL Server instance for available tables to replicate.
1614            let database = sql_server_connection.database.clone().into();
1615            let source_references = SourceReferenceClient::SqlServer {
1616                client: &mut client,
1617                database: Arc::clone(&database),
1618            }
1619            .get_source_references()
1620            .await?;
1621            let requested_references = Some(ExternalReferences::SubsetTables(external_references));
1622
1623            let result = sql_server::purify_source_exports(
1624                &*database,
1625                &mut client,
1626                &source_references,
1627                &requested_references,
1628                &text_columns,
1629                &exclude_columns,
1630                &unresolved_source_name,
1631                &SourceReferencePolicy::Required,
1632            )
1633            .await;
1634            let sql_server::PurifiedSourceExports {
1635                source_exports,
1636                normalized_text_columns,
1637                normalized_excl_columns,
1638            } = result?;
1639
1640            // Add the new exports to our subsource map.
1641            requested_subsource_map.extend(source_exports);
1642
1643            // Update options on the CREATE SOURCE statement with the purified details.
1644            if let Some(text_cols_option) = options
1645                .iter_mut()
1646                .find(|option| option.name == AlterSourceAddSubsourceOptionName::TextColumns)
1647            {
1648                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
1649            }
1650            if let Some(ignore_cols_option) = options
1651                .iter_mut()
1652                .find(|option| option.name == AlterSourceAddSubsourceOptionName::ExcludeColumns)
1653            {
1654                ignore_cols_option.value = Some(WithOptionValue::Sequence(normalized_excl_columns));
1655            }
1656        }
1657        _ => unreachable!(),
1658    };
1659
1660    Ok(PurifiedStatement::PurifiedAlterSourceAddSubsources {
1661        source_name: resolved_source_name,
1662        options,
1663        subsources: requested_subsource_map,
1664    })
1665}
1666
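/// Connects to the upstream system backing the source and retrieves its
/// current set of available source references, so that sequencing can refresh
/// the catalog's record of what the source can export.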
1667async fn purify_alter_source_refresh_references(
1668    desc: SourceDesc,
1669    resolved_source_name: ResolvedItemName,
1670    storage_configuration: &StorageConfiguration,
1671) -> Result<PurifiedStatement, PlanError> {
1672    let retrieved_source_references = match desc.connection {
1673        GenericSourceConnection::Postgres(pg_source_connection) => {
1674            // Get PostgresConnection for generating subsources.
1675            let pg_connection = &pg_source_connection.connection;
1676
1677            let config = pg_connection
1678                .config(
1679                    &storage_configuration.connection_context.secrets_reader,
1680                    storage_configuration,
1681                    InTask::No,
1682                )
1683                .await?;
1684
1685            let client = config
1686                .connect(
1687                    "postgres_purification",
1688                    &storage_configuration.connection_context.ssh_tunnel_manager,
1689                )
1690                .await?;
1691            let reference_client = SourceReferenceClient::Postgres {
1692                client: &client,
1693                publication: &pg_source_connection.publication,
1694                database: &pg_connection.database,
1695            };
1696            reference_client.get_source_references().await?
1697        }
1698        GenericSourceConnection::MySql(mysql_source_connection) => {
1699            let mysql_connection = &mysql_source_connection.connection;
1700            let config = mysql_connection
1701                .config(
1702                    &storage_configuration.connection_context.secrets_reader,
1703                    storage_configuration,
1704                    InTask::No,
1705                )
1706                .await?;
1707
1708            let mut conn = config
1709                .connect(
1710                    "mysql purification",
1711                    &storage_configuration.connection_context.ssh_tunnel_manager,
1712                )
1713                .await?;
1714
1715            let reference_client = SourceReferenceClient::MySql {
1716                conn: &mut conn,
1717                include_system_schemas: false,
1718            };
1719            reference_client.get_source_references().await?
1720        }
1721        GenericSourceConnection::SqlServer(sql_server_source) => {
1722            // Open a connection to the upstream SQL Server instance.
1723            let sql_server_connection = &sql_server_source.connection;
1724            let config = sql_server_connection
1725                .resolve_config(
1726                    &storage_configuration.connection_context.secrets_reader,
1727                    storage_configuration,
1728                    InTask::No,
1729                )
1730                .await?;
1731            let mut client = mz_sql_server_util::Client::connect(config).await?;
1732
1733            // Query the upstream SQL Server instance for available tables to replicate.
1734            let source_references = SourceReferenceClient::SqlServer {
1735                client: &mut client,
1736                database: sql_server_connection.database.clone().into(),
1737            }
1738            .get_source_references()
1739            .await?;
1740            source_references
1741        }
1742        GenericSourceConnection::LoadGenerator(load_gen_connection) => {
1743            let reference_client = SourceReferenceClient::LoadGenerator {
1744                generator: &load_gen_connection.load_generator,
1745            };
1746            reference_client.get_source_references().await?
1747        }
1748        GenericSourceConnection::Kafka(kafka_conn) => {
1749            let reference_client = SourceReferenceClient::Kafka {
1750                topic: &kafka_conn.topic,
1751            };
1752            reference_client.get_source_references().await?
1753        }
1754    };
1755    Ok(PurifiedStatement::PurifiedAlterSourceRefreshReferences {
1756        source_name: resolved_source_name,
1757        available_source_references: retrieved_source_references.available_source_references(),
1758    })
1759}
1760
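/// Purifies a `CREATE TABLE .. FROM SOURCE` statement: resolves the external
/// reference against the upstream system, populates the statement's columns,
/// constraints, and `DETAILS` option as appropriate for the source type, and
/// purifies its format specifier.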
1761async fn purify_create_table_from_source(
1762    catalog: impl SessionCatalog,
1763    mut stmt: CreateTableFromSourceStatement<Aug>,
1764    storage_configuration: &StorageConfiguration,
1765) -> Result<PurifiedStatement, PlanError> {
1766    let scx = StatementContext::new(None, &catalog);
1767    let CreateTableFromSourceStatement {
1768        name: _,
1769        columns,
1770        constraints,
1771        source: source_name,
1772        if_not_exists: _,
1773        external_reference,
1774        format,
1775        envelope,
1776        include_metadata: _,
1777        with_options,
1778    } = &mut stmt;
1779
1780    // Columns and constraints cannot be specified by the user but will be populated below.
1781    if matches!(columns, TableFromSourceColumns::Defined(_)) {
1782        sql_bail!("CREATE TABLE .. FROM SOURCE column definitions cannot be specified directly");
1783    }
1784    if !constraints.is_empty() {
1785        sql_bail!(
1786            "CREATE TABLE .. FROM SOURCE constraint definitions cannot be specified directly"
1787        );
1788    }
1789
1790    // Get the source item
1791    let item = match scx.get_item_by_resolved_name(source_name) {
1792        Ok(item) => item,
1793        Err(e) => return Err(e),
1794    };
1795
1796    // Ensure it's an ingestion-based source, the only kind that can back a table.
1797    let desc = match item.source_desc()? {
1798        Some(desc) => desc.clone().into_inline_connection(scx.catalog),
1799        None => {
1800            sql_bail!("cannot create a table from this type of source")
1801        }
1802    };
1803    let unresolved_source_name: UnresolvedItemName = source_name.full_item_name().clone().into();
1804    let qualified_source_name = item.name();
1805    let connection_name = desc.connection.name();
1806
1807    let crate::plan::statement::ddl::TableFromSourceOptionExtracted {
1808        text_columns,
1809        exclude_columns,
1810        details,
1811        partition_by: _,
1812        seen: _,
1813    } = with_options.clone().try_into()?;
1814    assert_none!(details, "details cannot be explicitly set");
1815
1816    // Our text column values are unqualified (just column names), but the purification methods below
1817    // expect to match fully-qualified names against the full set of upstream tables, so we
1818    // need to qualify them using the external reference first.
1819    let qualified_text_columns = text_columns
1820        .iter()
1821        .map(|col| {
1822            UnresolvedItemName(
1823                external_reference
1824                    .as_ref()
1825                    .map(|er| er.0.iter().chain_one(col).cloned().collect())
1826                    .unwrap_or_else(|| vec![col.clone()]),
1827            )
1828        })
1829        .collect_vec();
1830    let qualified_exclude_columns = exclude_columns
1831        .iter()
1832        .map(|col| {
1833            UnresolvedItemName(
1834                external_reference
1835                    .as_ref()
1836                    .map(|er| er.0.iter().chain_one(col).cloned().collect())
1837                    .unwrap_or_else(|| vec![col.clone()]),
1838            )
1839        })
1840        .collect_vec();
1841
1842    // Should be overridden below if a source-specific format is required.
1843    let mut format_options = SourceFormatOptions::Default;
1844
1845    let retrieved_source_references: RetrievedSourceReferences;
1846
1847    let requested_references = external_reference.as_ref().map(|ref_name| {
1848        ExternalReferences::SubsetTables(vec![ExternalReferenceExport {
1849            reference: ref_name.clone(),
1850            alias: None,
1851        }])
1852    });
1853
1854    // Run purification work specific to each source type: resolve the external reference to
1855    // a fully qualified name and obtain the appropriate details for the source-export statement
1856    let purified_export = match desc.connection {
1857        GenericSourceConnection::Postgres(pg_source_connection) => {
1858            // Get PostgresConnection for generating subsources.
1859            let pg_connection = &pg_source_connection.connection;
1860
1861            let config = pg_connection
1862                .config(
1863                    &storage_configuration.connection_context.secrets_reader,
1864                    storage_configuration,
1865                    InTask::No,
1866                )
1867                .await?;
1868
1869            let client = config
1870                .connect(
1871                    "postgres_purification",
1872                    &storage_configuration.connection_context.ssh_tunnel_manager,
1873                )
1874                .await?;
1875
1876            let available_replication_slots =
1877                mz_postgres_util::available_replication_slots(&client).await?;
1878
1879            // We need 1 additional replication slot for the snapshots
1880            if available_replication_slots < 1 {
1881                Err(PgSourcePurificationError::InsufficientReplicationSlotsAvailable { count: 1 })?;
1882            }
1883
1884            // TODO(roshan): Add support for PG sources to allow this
1885            if !exclude_columns.is_empty() {
1886                sql_bail!(
1887                    "{} is a {} source, which does not support EXCLUDE COLUMNS.",
1888                    scx.catalog.minimal_qualification(qualified_source_name),
1889                    connection_name
1890                )
1891            }
1892
1893            let reference_client = SourceReferenceClient::Postgres {
1894                client: &client,
1895                publication: &pg_source_connection.publication,
1896                database: &pg_connection.database,
1897            };
1898            retrieved_source_references = reference_client.get_source_references().await?;
1899
1900            let postgres::PurifiedSourceExports {
1901                source_exports,
1902                // TODO(database-issues#8620): Remove once subsources are removed
1903                // This `normalized_text_columns` is not relevant for us and is only returned for
1904                // `CREATE SOURCE` statements that automatically generate subsources
1905                normalized_text_columns: _,
1906            } = postgres::purify_source_exports(
1907                &client,
1908                &config,
1909                &retrieved_source_references,
1910                &requested_references,
1911                qualified_text_columns,
1912                &unresolved_source_name,
1913                &SourceReferencePolicy::Required,
1914            )
1915            .await?;
1916            // There should be exactly one source_export returned for this statement
1917            let (_, purified_export) = source_exports.into_iter().next().unwrap();
1918            purified_export
1919        }
1920        GenericSourceConnection::MySql(mysql_source_connection) => {
1921            let mysql_connection = &mysql_source_connection.connection;
1922            let config = mysql_connection
1923                .config(
1924                    &storage_configuration.connection_context.secrets_reader,
1925                    storage_configuration,
1926                    InTask::No,
1927                )
1928                .await?;
1929
1930            let mut conn = config
1931                .connect(
1932                    "mysql purification",
1933                    &storage_configuration.connection_context.ssh_tunnel_manager,
1934                )
1935                .await?;
1936
1937            // Retrieve the current @gtid_executed value of the server to mark as the effective
1938            // initial snapshot point for this table.
1939            let initial_gtid_set =
1940                mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?;
1941
1942            let reference_client = SourceReferenceClient::MySql {
1943                conn: &mut conn,
1944                include_system_schemas: mysql::references_system_schemas(&requested_references),
1945            };
1946            retrieved_source_references = reference_client.get_source_references().await?;
1947
1948            let mysql::PurifiedSourceExports {
1949                source_exports,
1950                // TODO(database-issues#8620): Remove once subsources are removed
1951                // `normalized_text/exclude_columns` are not relevant for us and are only returned for
1952                // `CREATE SOURCE` statements that automatically generate subsources
1953                normalized_text_columns: _,
1954                normalized_exclude_columns: _,
1955            } = mysql::purify_source_exports(
1956                &mut conn,
1957                &retrieved_source_references,
1958                &requested_references,
1959                qualified_text_columns,
1960                qualified_exclude_columns,
1961                &unresolved_source_name,
1962                initial_gtid_set,
1963                &SourceReferencePolicy::Required,
1964            )
1965            .await?;
1966            // There should be exactly one source_export returned for this statement
1967            let (_, purified_export) = source_exports.into_iter().next().unwrap();
1968            purified_export
1969        }
1970        GenericSourceConnection::SqlServer(_sql_server_source) => {
1971            // TODO(sql_server2): Support CREATE TABLE ... FROM SOURCE.
1972            return Err(PlanError::Unsupported {
1973                feature: "CREATE TABLE ... FROM SQL SERVER SOURCE".to_string(),
1974                discussion_no: None,
1975            });
1976        }
1977        GenericSourceConnection::LoadGenerator(load_gen_connection) => {
1978            let reference_client = SourceReferenceClient::LoadGenerator {
1979                generator: &load_gen_connection.load_generator,
1980            };
1981            retrieved_source_references = reference_client.get_source_references().await?;
1982
1983            let requested_exports = retrieved_source_references
1984                .requested_source_exports(requested_references.as_ref(), &unresolved_source_name)?;
1985            // There should be exactly one source_export returned
1986            let export = requested_exports.into_iter().next().unwrap();
1987            PurifiedSourceExport {
1988                external_reference: export.external_reference,
1989                details: PurifiedExportDetails::LoadGenerator {
1990                    table: export
1991                        .meta
1992                        .load_generator_desc()
1993                        .expect("is loadgen")
1994                        .clone(),
1995                    output: export
1996                        .meta
1997                        .load_generator_output()
1998                        .expect("is loadgen")
1999                        .clone(),
2000                },
2001            }
2002        }
2003        GenericSourceConnection::Kafka(kafka_conn) => {
2004            let reference_client = SourceReferenceClient::Kafka {
2005                topic: &kafka_conn.topic,
2006            };
2007            retrieved_source_references = reference_client.get_source_references().await?;
2008            let requested_exports = retrieved_source_references
2009                .requested_source_exports(requested_references.as_ref(), &unresolved_source_name)?;
2010            // There should be exactly one source_export returned
2011            let export = requested_exports.into_iter().next().unwrap();
2012
2013            format_options = SourceFormatOptions::Kafka {
2014                topic: kafka_conn.topic.clone(),
2015            };
2016            PurifiedSourceExport {
2017                external_reference: export.external_reference,
2018                details: PurifiedExportDetails::Kafka {},
2019            }
2020        }
2021    };
2022
2023    purify_source_format(
2024        &catalog,
2025        format,
2026        &format_options,
2027        envelope,
2028        storage_configuration,
2029    )
2030    .await?;
2031
2032    // Update the external reference in the statement to the resolved fully-qualified
2033    // external reference
2034    *external_reference = Some(purified_export.external_reference.clone());
2035
2036    // Update options in the statement using the purified export details
2037    match &purified_export.details {
2038        PurifiedExportDetails::Postgres { .. } => {
2039            let mut unsupported_cols = vec![];
2040            let postgres::PostgresExportStatementValues {
2041                columns: gen_columns,
2042                constraints: gen_constraints,
2043                text_columns: gen_text_columns,
2044                details: gen_details,
2045                external_reference: _,
2046            } = postgres::generate_source_export_statement_values(
2047                &scx,
2048                purified_export,
2049                &mut unsupported_cols,
2050            )?;
2051            if !unsupported_cols.is_empty() {
2052                unsupported_cols.sort();
2053                Err(PgSourcePurificationError::UnrecognizedTypes {
2054                    cols: unsupported_cols,
2055                })?;
2056            }
2057
2058            if let Some(text_cols_option) = with_options
2059                .iter_mut()
2060                .find(|option| option.name == TableFromSourceOptionName::TextColumns)
2061            {
2062                if let Some(gen_text_columns) = gen_text_columns {
2063                    text_cols_option.value = Some(WithOptionValue::Sequence(gen_text_columns));
2064                } else {
2065                    soft_panic_or_log!(
2066                        "text_columns should be Some if text_cols_option is present"
2067                    );
2068                }
2069            }
2070            match columns {
2071                TableFromSourceColumns::Defined(_) => unreachable!(),
2072                TableFromSourceColumns::NotSpecified => {
2073                    *columns = TableFromSourceColumns::Defined(gen_columns);
2074                    *constraints = gen_constraints;
2075                }
2076                TableFromSourceColumns::Named(_) => {
2077                    sql_bail!("columns cannot be named for Postgres sources")
2078                }
2079            }
2080            with_options.push(TableFromSourceOption {
2081                name: TableFromSourceOptionName::Details,
2082                value: Some(WithOptionValue::Value(Value::String(hex::encode(
2083                    gen_details.into_proto().encode_to_vec(),
2084                )))),
2085            })
2086        }
2087        PurifiedExportDetails::MySql { .. } => {
2088            let mysql::MySqlExportStatementValues {
2089                columns: gen_columns,
2090                constraints: gen_constraints,
2091                text_columns: gen_text_columns,
2092                exclude_columns: gen_exclude_columns,
2093                details: gen_details,
2094                external_reference: _,
2095            } = mysql::generate_source_export_statement_values(&scx, purified_export)?;
2096
2097            if let Some(text_cols_option) = with_options
2098                .iter_mut()
2099                .find(|option| option.name == TableFromSourceOptionName::TextColumns)
2100            {
2101                if let Some(gen_text_columns) = gen_text_columns {
2102                    text_cols_option.value = Some(WithOptionValue::Sequence(gen_text_columns));
2103                } else {
2104                    soft_panic_or_log!(
2105                        "text_columns should be Some if text_cols_option is present"
2106                    );
2107                }
2108            }
2109            if let Some(ignore_cols_option) = with_options
2110                .iter_mut()
2111                .find(|option| option.name == TableFromSourceOptionName::ExcludeColumns)
2112            {
2113                if let Some(gen_exclude_columns) = gen_exclude_columns {
2114                    ignore_cols_option.value = Some(WithOptionValue::Sequence(gen_exclude_columns));
2115                } else {
2116                    soft_panic_or_log!(
2117                        "exclude_columns should be Some if ignore_cols_option is present"
2118                    );
2119                }
2120            }
2121            match columns {
2122                TableFromSourceColumns::Defined(_) => unreachable!(),
2123                TableFromSourceColumns::NotSpecified => {
2124                    *columns = TableFromSourceColumns::Defined(gen_columns);
2125                    *constraints = gen_constraints;
2126                }
2127                TableFromSourceColumns::Named(_) => {
2128                    sql_bail!("columns cannot be named for MySQL sources")
2129                }
2130            }
2131            with_options.push(TableFromSourceOption {
2132                name: TableFromSourceOptionName::Details,
2133                value: Some(WithOptionValue::Value(Value::String(hex::encode(
2134                    gen_details.into_proto().encode_to_vec(),
2135                )))),
2136            })
2137        }
2138        PurifiedExportDetails::SqlServer { .. } => {
2139            // TODO(sql_server2): Support CREATE TABLE ... FROM SOURCE.
2140            return Err(PlanError::Unsupported {
2141                feature: "CREATE TABLE ... FROM SQL SERVER SOURCE".to_string(),
2142                discussion_no: None,
2143            });
2144        }
2145        PurifiedExportDetails::LoadGenerator { .. } => {
2146            let (desc, output) = match purified_export.details {
2147                PurifiedExportDetails::LoadGenerator { table, output } => (table, output),
2148                _ => unreachable!("purified export details must be load generator"),
2149            };
2150            // We only determine the table description for multi-output load generator sources here,
2151            // whereas single-output load generators will have their relation description
2152            // determined during statement planning, as envelope and format options may affect their
2153            // schema.
2154            if let Some(desc) = desc {
2155                let (gen_columns, gen_constraints) = scx.relation_desc_into_table_defs(&desc)?;
2156                match columns {
2157                    TableFromSourceColumns::Defined(_) => unreachable!(),
2158                    TableFromSourceColumns::NotSpecified => {
2159                        *columns = TableFromSourceColumns::Defined(gen_columns);
2160                        *constraints = gen_constraints;
2161                    }
2162                    TableFromSourceColumns::Named(_) => {
2163                        sql_bail!("columns cannot be named for multi-output load generator sources")
2164                    }
2165                }
2166            }
2167            let details = SourceExportStatementDetails::LoadGenerator { output };
2168            with_options.push(TableFromSourceOption {
2169                name: TableFromSourceOptionName::Details,
2170                value: Some(WithOptionValue::Value(Value::String(hex::encode(
2171                    details.into_proto().encode_to_vec(),
2172                )))),
2173            })
2174        }
2175        PurifiedExportDetails::Kafka {} => {
2176            // NOTE: Kafka tables have their 'schemas' purified into the statement inside the
2177            // format field, so we don't specify any columns or constraints to be stored
2178            // on the statement here. The RelationDesc will be determined during planning.
2179            let details = SourceExportStatementDetails::Kafka {};
2180            with_options.push(TableFromSourceOption {
2181                name: TableFromSourceOptionName::Details,
2182                value: Some(WithOptionValue::Value(Value::String(hex::encode(
2183                    details.into_proto().encode_to_vec(),
2184                )))),
2185            })
2186        }
2187    };
2188
2189    // TODO: We might as well use the retrieved available references to update the source
2190    // available references table in the catalog, so plumb this through.
2191    // available_source_references: retrieved_source_references.available_source_references(),
2192    Ok(PurifiedStatement::PurifiedCreateTableFromSource { stmt })
2193}
2194
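/// Source-specific context needed when purifying a format specifier.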
2195enum SourceFormatOptions {
2196    Default,
2197    Kafka { topic: String },
2198}
2199
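/// Purifies the format specifier of a statement, fetching and inlining remote
/// schemas (e.g. from the Confluent Schema Registry) where necessary.
/// KEY/VALUE format specifiers are only supported for Kafka sources.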
2200async fn purify_source_format(
2201    catalog: &dyn SessionCatalog,
2202    format: &mut Option<FormatSpecifier<Aug>>,
2203    options: &SourceFormatOptions,
2204    envelope: &Option<SourceEnvelope>,
2205    storage_configuration: &StorageConfiguration,
2206) -> Result<(), PlanError> {
2207    if matches!(format, Some(FormatSpecifier::KeyValue { .. }))
2208        && !matches!(options, SourceFormatOptions::Kafka { .. })
2209    {
2210        sql_bail!("Kafka sources are the only source type that can provide KEY/VALUE formats")
2211    }
2212
2213    match format.as_mut() {
2214        None => {}
2215        Some(FormatSpecifier::Bare(format)) => {
2216            purify_source_format_single(catalog, format, options, envelope, storage_configuration)
2217                .await?;
2218        }
2219
2220        Some(FormatSpecifier::KeyValue { key, value: val }) => {
2221            purify_source_format_single(catalog, key, options, envelope, storage_configuration)
2222                .await?;
2223            purify_source_format_single(catalog, val, options, envelope, storage_configuration)
2224                .await?;
2225        }
2226    }
2227    Ok(())
2228}
2229
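/// Purifies a single format: either a bare format or one half of a KEY/VALUE
/// pair. Only Avro and Protobuf formats backed by a schema registry require
/// any purification; all other formats pass through unchanged.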
2230async fn purify_source_format_single(
2231    catalog: &dyn SessionCatalog,
2232    format: &mut Format<Aug>,
2233    options: &SourceFormatOptions,
2234    envelope: &Option<SourceEnvelope>,
2235    storage_configuration: &StorageConfiguration,
2236) -> Result<(), PlanError> {
2237    match format {
2238        Format::Avro(schema) => match schema {
2239            AvroSchema::Csr { csr_connection } => {
2240                purify_csr_connection_avro(
2241                    catalog,
2242                    options,
2243                    csr_connection,
2244                    envelope,
2245                    storage_configuration,
2246                )
2247                .await?
2248            }
2249            AvroSchema::InlineSchema { .. } => {}
2250        },
2251        Format::Protobuf(schema) => match schema {
2252            ProtobufSchema::Csr { csr_connection } => {
2253                purify_csr_connection_proto(
2254                    catalog,
2255                    options,
2256                    csr_connection,
2257                    envelope,
2258                    storage_configuration,
2259                )
2260                .await?;
2261            }
2262            ProtobufSchema::InlineSchema { .. } => {}
2263        },
2264        Format::Bytes
2265        | Format::Regex(_)
2266        | Format::Json { .. }
2267        | Format::Text
2268        | Format::Csv { .. } => (),
2269    }
2270    Ok(())
2271}
2272
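/// Generates `CREATE SUBSOURCE` statements for the given purified source
/// exports, dispatching on the details of the first export; all exports in
/// the map are assumed to share the same connection type.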
2273pub fn generate_subsource_statements(
2274    scx: &StatementContext,
2275    source_name: ResolvedItemName,
2276    subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
2277) -> Result<Vec<CreateSubsourceStatement<Aug>>, PlanError> {
2278    // Get the first subsource to determine the connection type.
2279    if subsources.is_empty() {
2280        return Ok(vec![]);
2281    }
2282    let (_, purified_export) = subsources.iter().next().unwrap();
2283
2284    let statements = match &purified_export.details {
2285        PurifiedExportDetails::Postgres { .. } => {
2286            crate::pure::postgres::generate_create_subsource_statements(
2287                scx,
2288                source_name,
2289                subsources,
2290            )?
2291        }
2292        PurifiedExportDetails::MySql { .. } => {
2293            crate::pure::mysql::generate_create_subsource_statements(scx, source_name, subsources)?
2294        }
2295        PurifiedExportDetails::SqlServer { .. } => {
2296            crate::pure::sql_server::generate_create_subsource_statements(
2297                scx,
2298                source_name,
2299                subsources,
2300            )?
2301        }
2302        PurifiedExportDetails::LoadGenerator { .. } => {
2303            let mut subsource_stmts = Vec::with_capacity(subsources.len());
2304            for (subsource_name, purified_export) in subsources {
2305                let (desc, output) = match purified_export.details {
2306                    PurifiedExportDetails::LoadGenerator { table, output } => (table, output),
2307                    _ => unreachable!("purified export details must be load generator"),
2308                };
2309                let desc =
2310                    desc.expect("subsources cannot be generated for single-output load generators");
2311
2312                let (columns, table_constraints) = scx.relation_desc_into_table_defs(&desc)?;
2313                let details = SourceExportStatementDetails::LoadGenerator { output };
2314                // Create the subsource statement
2315                let subsource = CreateSubsourceStatement {
2316                    name: subsource_name,
2317                    columns,
2318                    of_source: Some(source_name.clone()),
2319                    // unlike sources that come from an external upstream, we
2320                    // have more leniency to introduce different constraints
2321                    // every time the load generator is run; i.e. we are not as
2322                    // worried about introducing junk data.
2323                    constraints: table_constraints,
2324                    if_not_exists: false,
2325                    with_options: vec![
2326                        CreateSubsourceOption {
2327                            name: CreateSubsourceOptionName::ExternalReference,
2328                            value: Some(WithOptionValue::UnresolvedItemName(
2329                                purified_export.external_reference,
2330                            )),
2331                        },
2332                        CreateSubsourceOption {
2333                            name: CreateSubsourceOptionName::Details,
2334                            value: Some(WithOptionValue::Value(Value::String(hex::encode(
2335                                details.into_proto().encode_to_vec(),
2336                            )))),
2337                        },
2338                    ],
2339                };
2340                subsource_stmts.push(subsource);
2341            }
2342
2343            subsource_stmts
2344        }
2345        PurifiedExportDetails::Kafka { .. } => {
2346            // TODO: as part of database-issues#8322, Kafka sources will begin
2347            // producing data; we'll need to understand the schema
2348            // of the output here.
2349            assert!(
2350                subsources.is_empty(),
2351                "Kafka sources do not produce data-bearing subsources"
2352            );
2353            vec![]
2354        }
2355    };
2356    Ok(statements)
2357}
2358
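/// Fetches and compiles the Protobuf schemas for the `{topic}-value` subject
/// (required) and the `{topic}-key` subject (optional) from the schema
/// registry, seeding the statement with the result unless a seed is already
/// present.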
2359async fn purify_csr_connection_proto(
2360    catalog: &dyn SessionCatalog,
2361    options: &SourceFormatOptions,
2362    csr_connection: &mut CsrConnectionProtobuf<Aug>,
2363    envelope: &Option<SourceEnvelope>,
2364    storage_configuration: &StorageConfiguration,
2365) -> Result<(), PlanError> {
2366    let SourceFormatOptions::Kafka { topic } = options else {
2367        sql_bail!("Confluent Schema Registry is only supported with Kafka sources")
2368    };
2369
2370    let CsrConnectionProtobuf {
2371        seed,
2372        connection: CsrConnection {
2373            connection,
2374            options: _,
2375        },
2376    } = csr_connection;
2377    match seed {
2378        None => {
2379            let scx = StatementContext::new(None, &*catalog);
2380
2381            let ccsr_connection = match scx.get_item_by_resolved_name(connection)?.connection()? {
2382                Connection::Csr(connection) => connection.clone().into_inline_connection(catalog),
2383                _ => sql_bail!("{} is not a schema registry connection", connection),
2384            };
2385
2386            let ccsr_client = ccsr_connection
2387                .connect(storage_configuration, InTask::No)
2388                .await
2389                .map_err(|e| CsrPurificationError::ClientError(Arc::new(e)))?;
2390
2391            let value = compile_proto(&format!("{}-value", topic), &ccsr_client).await?;
2392            let key = compile_proto(&format!("{}-key", topic), &ccsr_client)
2393                .await
2394                .ok();
2395
2396            if matches!(envelope, Some(SourceEnvelope::Debezium)) && key.is_none() {
2397                sql_bail!("Key schema is required for ENVELOPE DEBEZIUM");
2398            }
2399
2400            *seed = Some(CsrSeedProtobuf { value, key });
2401        }
2402        Some(_) => (),
2403    }
2404
2405    Ok(())
2406}
2407
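/// Fetches the Avro key and value schemas for the topic from the schema
/// registry and seeds the statement with them unless a seed is already
/// present. `ENVELOPE DEBEZIUM` requires that a key schema exists.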
2408async fn purify_csr_connection_avro(
2409    catalog: &dyn SessionCatalog,
2410    options: &SourceFormatOptions,
2411    csr_connection: &mut CsrConnectionAvro<Aug>,
2412    envelope: &Option<SourceEnvelope>,
2413    storage_configuration: &StorageConfiguration,
2414) -> Result<(), PlanError> {
2415    let SourceFormatOptions::Kafka { topic } = options else {
2416        sql_bail!("Confluent Schema Registry is only supported with Kafka sources")
2417    };
2418
2419    let CsrConnectionAvro {
2420        connection: CsrConnection { connection, .. },
2421        seed,
2422        key_strategy,
2423        value_strategy,
2424    } = csr_connection;
2425    if seed.is_none() {
2426        let scx = StatementContext::new(None, &*catalog);
2427        let csr_connection = match scx.get_item_by_resolved_name(connection)?.connection()? {
2428            Connection::Csr(connection) => connection.clone().into_inline_connection(catalog),
2429            _ => sql_bail!("{} is not a schema registry connection", connection),
2430        };
2431        let ccsr_client = csr_connection
2432            .connect(storage_configuration, InTask::No)
2433            .await
2434            .map_err(|e| CsrPurificationError::ClientError(Arc::new(e)))?;
2435
2436        let Schema {
2437            key_schema,
2438            value_schema,
2439        } = get_remote_csr_schema(
2440            &ccsr_client,
2441            key_strategy.clone().unwrap_or_default(),
2442            value_strategy.clone().unwrap_or_default(),
2443            topic,
2444        )
2445        .await?;
2446        if matches!(envelope, Some(SourceEnvelope::Debezium)) && key_schema.is_none() {
2447            sql_bail!("Key schema is required for ENVELOPE DEBEZIUM");
2448        }
2449
2450        *seed = Some(CsrSeedAvro {
2451            key_schema,
2452            value_schema,
2453        })
2454    }
2455
2456    Ok(())
2457}
2458
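/// A key/value schema pair fetched from a schema registry; only the value
/// schema is required to exist.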
2459#[derive(Debug)]
2460pub struct Schema {
2461    pub key_schema: Option<String>,
2462    pub value_schema: String,
2463}
2464
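/// Resolves a schema according to the given reader schema selection strategy:
/// the latest version registered for the subject, an inline schema provided in
/// the statement, or a lookup by schema ID. Returns `Ok(None)` if the subject
/// or schema does not exist.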
2465async fn get_schema_with_strategy(
2466    client: &Client,
2467    strategy: ReaderSchemaSelectionStrategy,
2468    subject: &str,
2469) -> Result<Option<String>, PlanError> {
2470    match strategy {
2471        ReaderSchemaSelectionStrategy::Latest => {
2472            match client.get_schema_by_subject(subject).await {
2473                Ok(CcsrSchema { raw, .. }) => Ok(Some(raw)),
2474                Err(GetBySubjectError::SubjectNotFound)
2475                | Err(GetBySubjectError::VersionNotFound(_)) => Ok(None),
2476                Err(e) => Err(PlanError::FetchingCsrSchemaFailed {
2477                    schema_lookup: format!("subject {}", subject.quoted()),
2478                    cause: Arc::new(e),
2479                }),
2480            }
2481        }
2482        ReaderSchemaSelectionStrategy::Inline(raw) => Ok(Some(raw)),
2483        ReaderSchemaSelectionStrategy::ById(id) => match client.get_schema_by_id(id).await {
2484            Ok(CcsrSchema { raw, .. }) => Ok(Some(raw)),
2485            Err(GetByIdError::SchemaNotFound) => Ok(None),
2486            Err(e) => Err(PlanError::FetchingCsrSchemaFailed {
2487                schema_lookup: format!("ID {}", id),
2488                cause: Arc::new(e),
2489            }),
2490        },
2491    }
2492}
2493
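/// Fetches the value schema (required) and key schema (optional) for the given
/// topic, using the conventional `{topic}-value` and `{topic}-key` subject
/// names.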
2494async fn get_remote_csr_schema(
2495    ccsr_client: &mz_ccsr::Client,
2496    key_strategy: ReaderSchemaSelectionStrategy,
2497    value_strategy: ReaderSchemaSelectionStrategy,
2498    topic: &str,
2499) -> Result<Schema, PlanError> {
2500    let value_schema_name = format!("{}-value", topic);
2501    let value_schema =
2502        get_schema_with_strategy(ccsr_client, value_strategy, &value_schema_name).await?;
2503    let value_schema = value_schema.ok_or_else(|| anyhow!("No value schema found"))?;
2504    let key_schema_name = format!("{}-key", topic);
2505    let key_schema = get_schema_with_strategy(ccsr_client, key_strategy, &key_schema_name).await?;
2506    Ok(Schema {
2507        key_schema,
2508        value_schema,
2509    })
2510}
2511
2512/// Collects the Protobuf schema and its references from the CSR and compiles them.
2513async fn compile_proto(
2514    subject_name: &str,
2515    ccsr_client: &Client,
2516) -> Result<CsrSeedProtobufSchema, PlanError> {
2517    let (primary_subject, dependency_subjects) = ccsr_client
2518        .get_subject_and_references(subject_name)
2519        .await
2520        .map_err(|e| PlanError::FetchingCsrSchemaFailed {
2521            schema_lookup: format!("subject {}", subject_name.quoted()),
2522            cause: Arc::new(e),
2523        })?;
2524
2525    // Compile .proto files into a file descriptor set.
2526    let mut source_tree = VirtualSourceTree::new();
2527    for subject in iter::once(&primary_subject).chain(dependency_subjects.iter()) {
2528        source_tree.as_mut().add_file(
2529            Path::new(&subject.name),
2530            subject.schema.raw.as_bytes().to_vec(),
2531        );
2532    }
2533    let mut db = SourceTreeDescriptorDatabase::new(source_tree.as_mut());
2534    let fds = db
2535        .as_mut()
2536        .build_file_descriptor_set(&[Path::new(&primary_subject.name)])
2537        .map_err(|cause| PlanError::InvalidProtobufSchema { cause })?;
2538
2539    // Ensure there is exactly one message in the file.
2540    let primary_fd = fds.file(0);
2541    let message_name = match primary_fd.message_type_size() {
2542        1 => String::from_utf8_lossy(primary_fd.message_type(0).name()).into_owned(),
2543        0 => bail_unsupported!(29603, "Protobuf schemas with no messages"),
2544        _ => bail_unsupported!(29603, "Protobuf schemas with multiple messages"),
2545    };
2546
2547    // Encode the file descriptor set into a SQL byte string.
2548    let bytes = &fds
2549        .serialize()
2550        .map_err(|cause| PlanError::InvalidProtobufSchema { cause })?;
2551    let mut schema = String::new();
2552    strconv::format_bytes(&mut schema, bytes);
2553
2554    Ok(CsrSeedProtobufSchema {
2555        schema,
2556        message_name,
2557    })
2558}
2559
2560const MZ_NOW_NAME: &str = "mz_now";
2561const MZ_NOW_SCHEMA: &str = "mz_catalog";
2562
2563/// Purifies a CREATE MATERIALIZED VIEW statement. Additionally, it adjusts `resolved_ids` if
2564/// references to ids appear or disappear during the purification.
2565///
2566/// Note that in contrast with [`purify_statement`], this doesn't need to be async, because
2567/// this function is not making any network calls.
pub fn purify_create_materialized_view_options(
    catalog: impl SessionCatalog,
    mz_now: Option<Timestamp>,
    cmvs: &mut CreateMaterializedViewStatement<Aug>,
    resolved_ids: &mut ResolvedIds,
) {
    // 0. Preparations:
    // Prepare an expression that calls `mz_now()`, which we can insert in various later steps.
    let (mz_now_id, mz_now_expr) = {
        let item = catalog
            .resolve_function(&PartialItemName {
                database: None,
                schema: Some(MZ_NOW_SCHEMA.to_string()),
                item: MZ_NOW_NAME.to_string(),
            })
            .expect("we should be able to resolve mz_now");
        (
            item.id(),
            Expr::Function(Function {
                name: ResolvedItemName::Item {
                    id: item.id(),
                    qualifiers: item.name().qualifiers.clone(),
                    full_name: catalog.resolve_full_name(item.name()),
                    print_id: false,
                    version: RelationVersionSelector::Latest,
                },
                args: FunctionArgs::Args {
                    args: Vec::new(),
                    order_by: Vec::new(),
                },
                filter: None,
                over: None,
                distinct: false,
            }),
        )
    };
    // Prepare the `mz_timestamp` type.
    let (mz_timestamp_id, mz_timestamp_type) = {
        let item = catalog.get_system_type("mz_timestamp");
        let full_name = catalog.resolve_full_name(item.name());
        (
            item.id(),
            ResolvedDataType::Named {
                id: item.id(),
                qualifiers: item.name().qualifiers.clone(),
                full_name,
                modifiers: vec![],
                print_id: true,
            },
        )
    };

    let mut introduced_mz_timestamp = false;

    for option in cmvs.with_options.iter_mut() {
        // 1. Purify `REFRESH AT CREATION` to `REFRESH AT mz_now()`.
        if matches!(
            option.value,
            Some(WithOptionValue::Refresh(RefreshOptionValue::AtCreation))
        ) {
            option.value = Some(WithOptionValue::Refresh(RefreshOptionValue::At(
                RefreshAtOptionValue {
                    time: mz_now_expr.clone(),
                },
            )));
        }

        // 2. If `REFRESH EVERY` doesn't have an `ALIGNED TO`, then add `ALIGNED TO mz_now()`.
        if let Some(WithOptionValue::Refresh(RefreshOptionValue::Every(
            RefreshEveryOptionValue { aligned_to, .. },
        ))) = &mut option.value
        {
            if aligned_to.is_none() {
                *aligned_to = Some(mz_now_expr.clone());
            }
        }

        // 3. Substitute `mz_now()` with the timestamp chosen for the CREATE MATERIALIZED VIEW
        // statement. (This has to happen after the above steps, which might introduce `mz_now()`.)
        match &mut option.value {
            Some(WithOptionValue::Refresh(RefreshOptionValue::At(RefreshAtOptionValue {
                time,
            }))) => {
                let mut visitor = MzNowPurifierVisitor::new(mz_now, mz_timestamp_type.clone());
                visitor.visit_expr_mut(time);
                introduced_mz_timestamp |= visitor.introduced_mz_timestamp;
            }
            Some(WithOptionValue::Refresh(RefreshOptionValue::Every(
                RefreshEveryOptionValue {
                    interval: _,
                    aligned_to: Some(aligned_to),
                },
            ))) => {
                let mut visitor = MzNowPurifierVisitor::new(mz_now, mz_timestamp_type.clone());
                visitor.visit_expr_mut(aligned_to);
                introduced_mz_timestamp |= visitor.introduced_mz_timestamp;
            }
            _ => {}
        }
    }

    // 4. If the user didn't give any REFRESH option, then default to ON COMMIT.
    if !cmvs.with_options.iter().any(|o| {
        matches!(
            o,
            MaterializedViewOption {
                value: Some(WithOptionValue::Refresh(..)),
                ..
            }
        )
    }) {
        cmvs.with_options.push(MaterializedViewOption {
            name: MaterializedViewOptionName::Refresh,
            value: Some(WithOptionValue::Refresh(RefreshOptionValue::OnCommit)),
        })
    }

    // 5. Attend to `resolved_ids`: The purification might have
    // - added references to `mz_timestamp`;
    // - removed references to `mz_now`.
    if introduced_mz_timestamp {
        resolved_ids.add_item(mz_timestamp_id);
    }
    // Even though we always remove `mz_now()` from the `with_options`, there might be `mz_now()`
    // remaining in the main query expression of the MV, so let's visit the entire statement to look
    // for `mz_now()` everywhere.
    let mut visitor = ExprContainsTemporalVisitor::new();
    visitor.visit_create_materialized_view_statement(cmvs);
    if !visitor.contains_temporal {
        resolved_ids.remove_item(&mz_now_id);
    }
}

/// Returns true if the [`MaterializedViewOption`] either already involves `mz_now()` or will
/// involve it after purification.
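///
/// For example, a bare `REFRESH EVERY '1 day'` (no `ALIGNED TO`) already counts as
/// temporal, because purification will default its `ALIGNED TO` to `mz_now()`.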
pub fn materialized_view_option_contains_temporal(mvo: &MaterializedViewOption<Aug>) -> bool {
    match &mvo.value {
        Some(WithOptionValue::Refresh(RefreshOptionValue::At(RefreshAtOptionValue { time }))) => {
            let mut visitor = ExprContainsTemporalVisitor::new();
            visitor.visit_expr(time);
            visitor.contains_temporal
        }
        Some(WithOptionValue::Refresh(RefreshOptionValue::Every(RefreshEveryOptionValue {
            interval: _,
            aligned_to: Some(aligned_to),
        }))) => {
            let mut visitor = ExprContainsTemporalVisitor::new();
            visitor.visit_expr(aligned_to);
            visitor.contains_temporal
        }
        Some(WithOptionValue::Refresh(RefreshOptionValue::Every(RefreshEveryOptionValue {
            interval: _,
            aligned_to: None,
        }))) => {
            // For a `REFRESH EVERY` without an `ALIGNED TO`, purification will default the
            // `ALIGNED TO` to `mz_now()`.
            true
        }
        Some(WithOptionValue::Refresh(RefreshOptionValue::AtCreation)) => {
            // `REFRESH AT CREATION` will be purified to `REFRESH AT mz_now()`.
            true
        }
        _ => false,
    }
}

/// Determines whether the AST involves `mz_now()`.
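///
/// The `contains_temporal` flag is sticky: it is set as soon as any visited function
/// call is named `mz_now`, and is never cleared.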
struct ExprContainsTemporalVisitor {
    pub contains_temporal: bool,
}

impl ExprContainsTemporalVisitor {
    pub fn new() -> ExprContainsTemporalVisitor {
        ExprContainsTemporalVisitor {
            contains_temporal: false,
        }
    }
}

impl Visit<'_, Aug> for ExprContainsTemporalVisitor {
    fn visit_function(&mut self, func: &Function<Aug>) {
        self.contains_temporal |= func.name.full_item_name().item == MZ_NOW_NAME;
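        // Continue recursing regardless, to also inspect the function's arguments.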
        visit_function(self, func);
    }
}

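/// Replaces `mz_now()` calls with the timestamp chosen for the statement, and records
/// whether a cast to `mz_timestamp` was introduced in the process.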
struct MzNowPurifierVisitor {
    pub mz_now: Option<Timestamp>,
    pub mz_timestamp_type: ResolvedDataType,
    pub introduced_mz_timestamp: bool,
}

impl MzNowPurifierVisitor {
    pub fn new(
        mz_now: Option<Timestamp>,
        mz_timestamp_type: ResolvedDataType,
    ) -> MzNowPurifierVisitor {
        MzNowPurifierVisitor {
            mz_now,
            mz_timestamp_type,
            introduced_mz_timestamp: false,
        }
    }
}

impl VisitMut<'_, Aug> for MzNowPurifierVisitor {
    fn visit_expr_mut(&mut self, expr: &'_ mut Expr<Aug>) {
        match expr {
            Expr::Function(Function {
                name:
                    ResolvedItemName::Item {
                        full_name: FullItemName { item, .. },
                        ..
                    },
                ..
            }) if item.as_str() == MZ_NOW_NAME => {
                let mz_now = self.mz_now.expect(
                    "we should have chosen a timestamp if the expression contains mz_now()",
                );
                // Substitute `mz_now()` with the chosen timestamp as a number literal,
                // cast to `mz_timestamp` so that the type of the expression is unchanged.
                *expr = Expr::Cast {
                    expr: Box::new(Expr::Value(Value::Number(mz_now.to_string()))),
                    data_type: self.mz_timestamp_type.clone(),
                };
                self.introduced_mz_timestamp = true;
            }
            _ => visit_expr_mut(self, expr),
        }
    }
}