mz_sql/pure.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! SQL purification.
//!
//! See the [crate-level documentation](crate) for details.

use std::collections::{BTreeMap, BTreeSet};
use std::fmt;
use std::iter;
use std::path::Path;
use std::sync::Arc;

use anyhow::anyhow;
use itertools::Itertools;
use mz_ccsr::{Client, GetByIdError, GetBySubjectError, Schema as CcsrSchema};
use mz_controller_types::ClusterId;
use mz_kafka_util::client::MzClientContext;
use mz_mysql_util::MySqlTableDesc;
use mz_ore::error::ErrorExt;
use mz_ore::future::InTask;
use mz_ore::iter::IteratorExt;
use mz_ore::str::StrExt;
use mz_ore::{assert_none, soft_panic_or_log};
use mz_postgres_util::desc::PostgresTableDesc;
use mz_postgres_util::replication::WalLevel;
use mz_postgres_util::tunnel::PostgresFlavor;
use mz_proto::RustType;
use mz_repr::{CatalogItemId, RelationDesc, RelationVersionSelector, Timestamp, strconv};
use mz_sql_parser::ast::display::AstDisplay;
use mz_sql_parser::ast::visit::{Visit, visit_function};
use mz_sql_parser::ast::visit_mut::{VisitMut, visit_expr_mut};
use mz_sql_parser::ast::{
    AlterSourceAction, AlterSourceAddSubsourceOptionName, AlterSourceStatement, AvroDocOn,
    ColumnName, CreateMaterializedViewStatement, CreateSinkConnection, CreateSinkOptionName,
    CreateSinkStatement, CreateSubsourceOption, CreateSubsourceOptionName,
    CreateTableFromSourceStatement, CsrConfigOption, CsrConfigOptionName, CsrConnection,
    CsrSeedAvro, CsrSeedProtobuf, CsrSeedProtobufSchema, DeferredItemName, DocOnIdentifier,
    DocOnSchema, Expr, Function, FunctionArgs, Ident, KafkaSourceConfigOption,
    KafkaSourceConfigOptionName, LoadGenerator, LoadGeneratorOption, LoadGeneratorOptionName,
    MaterializedViewOption, MaterializedViewOptionName, MySqlConfigOption, MySqlConfigOptionName,
    PgConfigOption, PgConfigOptionName, RawItemName, ReaderSchemaSelectionStrategy,
    RefreshAtOptionValue, RefreshEveryOptionValue, RefreshOptionValue, SourceEnvelope,
    SqlServerConfigOption, SqlServerConfigOptionName, Statement, TableFromSourceColumns,
    TableFromSourceOption, TableFromSourceOptionName, UnresolvedItemName,
};
use mz_sql_server_util::desc::SqlServerTableDesc;
use mz_storage_types::configuration::StorageConfiguration;
use mz_storage_types::connections::inline::IntoInlineConnection;
use mz_storage_types::connections::{Connection, PostgresConnection};
use mz_storage_types::errors::ContextCreationError;
use mz_storage_types::sources::load_generator::LoadGeneratorOutput;
use mz_storage_types::sources::mysql::MySqlSourceDetails;
use mz_storage_types::sources::postgres::PostgresSourcePublicationDetails;
use mz_storage_types::sources::{
    GenericSourceConnection, PostgresSourceConnection, SourceConnection, SourceDesc,
    SourceExportStatementDetails, SqlServerSourceExtras,
};
use prost::Message;
use protobuf_native::MessageLite;
use protobuf_native::compiler::{SourceTreeDescriptorDatabase, VirtualSourceTree};
use rdkafka::admin::AdminClient;
use references::{RetrievedSourceReferences, SourceReferenceClient};
use uuid::Uuid;

use crate::ast::{
    AlterSourceAddSubsourceOption, AvroSchema, CreateSourceConnection, CreateSourceStatement,
    CreateSubsourceStatement, CsrConnectionAvro, CsrConnectionProtobuf, ExternalReferenceExport,
    ExternalReferences, Format, FormatSpecifier, ProtobufSchema, Value, WithOptionValue,
};
use crate::catalog::{CatalogItemType, SessionCatalog};
use crate::kafka_util::{KafkaSinkConfigOptionExtracted, KafkaSourceConfigOptionExtracted};
use crate::names::{
    Aug, FullItemName, PartialItemName, ResolvedColumnReference, ResolvedDataType, ResolvedIds,
    ResolvedItemName,
};
use crate::plan::error::PlanError;
use crate::plan::statement::ddl::load_generator_ast_to_generator;
use crate::plan::{SourceReferences, StatementContext};
use crate::pure::error::SqlServerSourcePurificationError;
use crate::session::vars::ENABLE_SQL_SERVER_SOURCE;
use crate::{kafka_util, normalize};

use self::error::{
    CsrPurificationError, KafkaSinkPurificationError, KafkaSourcePurificationError,
    LoadGeneratorSourcePurificationError, MySqlSourcePurificationError, PgSourcePurificationError,
};

pub(crate) mod error;
mod references;

pub mod mysql;
pub mod postgres;
pub mod sql_server;

pub(crate) struct RequestedSourceExport<T> {
    external_reference: UnresolvedItemName,
    name: UnresolvedItemName,
    meta: T,
}

impl<T: fmt::Debug> fmt::Debug for RequestedSourceExport<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("RequestedSourceExport")
            .field("external_reference", &self.external_reference)
            .field("name", &self.name)
            .field("meta", &self.meta)
            .finish()
    }
}

impl<T> RequestedSourceExport<T> {
    fn change_meta<F>(self, new_meta: F) -> RequestedSourceExport<F> {
        RequestedSourceExport {
            external_reference: self.external_reference,
            name: self.name,
            meta: new_meta,
        }
    }
}

/// Generates a subsource name by prepending the source's schema name, if
/// present.
///
/// For example, if the source is `a.b`, then `a` is prepended to the
/// subsource name so that the subsource is created in the same schema as the
/// source.
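///
/// A sketch of the expected behavior:
///
/// ```text
/// source name:    a.b
/// subsource name: c
/// result:         a.c
/// ```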
fn source_export_name_gen(
    source_name: &UnresolvedItemName,
    subsource_name: &str,
) -> Result<UnresolvedItemName, PlanError> {
    let mut partial = normalize::unresolved_item_name(source_name.clone())?;
    partial.item = subsource_name.to_string();
    Ok(UnresolvedItemName::from(partial))
}

// TODO(database-issues#8620): Remove once subsources are removed
/// Validates that the requested subsources do not have name conflicts with
/// each other and that the same upstream table is not referenced multiple
/// times.
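///
/// For example, requesting both `a.t AS x` and `b.t AS x` would be rejected
/// because two subsources would share the name `x`:
///
/// ```text
/// -- illustrative only; rejected with a name-conflict error
/// FOR TABLES (a.t AS x, b.t AS x)
/// ```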
fn validate_source_export_names<T>(
    requested_source_exports: &[RequestedSourceExport<T>],
) -> Result<(), PlanError> {
    // This condition would get caught during the catalog transaction, but produces a
    // vague, non-contextual error. Instead, error here so we can suggest to the user
    // how to fix the problem.
    if let Some(name) = requested_source_exports
        .iter()
        .map(|subsource| &subsource.name)
        .duplicates()
        .next()
        .cloned()
    {
        let mut upstream_references: Vec<_> = requested_source_exports
            .iter()
            .filter_map(|subsource| {
                if subsource.name == name {
                    Some(subsource.external_reference.clone())
                } else {
                    None
                }
            })
            .collect();

        upstream_references.sort();

        Err(PlanError::SubsourceNameConflict {
            name,
            upstream_references,
        })?;
    }

    // We disallow subsource statements from referencing the same upstream table, but we allow
    // `CREATE TABLE .. FROM SOURCE` statements to do so. Since `CREATE TABLE .. FROM SOURCE`
    // purification only provides a single `requested_source_export`, we can leave this here
    // without needing to differentiate between the two types of statements.
    // TODO(roshan): Remove this when auto-generated subsources are deprecated.
    if let Some(name) = requested_source_exports
        .iter()
        .map(|export| &export.external_reference)
        .duplicates()
        .next()
        .cloned()
    {
        let mut target_names: Vec<_> = requested_source_exports
            .iter()
            .filter_map(|export| {
                if export.external_reference == name {
                    Some(export.name.clone())
                } else {
                    None
                }
            })
            .collect();

        target_names.sort();

        Err(PlanError::SubsourceDuplicateReference { name, target_names })?;
    }

    Ok(())
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PurifiedStatement {
    PurifiedCreateSource {
        create_progress_subsource_stmt: CreateSubsourceStatement<Aug>,
        create_source_stmt: CreateSourceStatement<Aug>,
        /// Map of subsource names to external details.
        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
        /// All the available upstream references that can be added as tables
        /// to this primary source.
        available_source_references: SourceReferences,
    },
    PurifiedAlterSource {
        alter_source_stmt: AlterSourceStatement<Aug>,
    },
    PurifiedAlterSourceAddSubsources {
        /// This just saves us an annoying catalog lookup.
        source_name: ResolvedItemName,
        /// Options that we will need the values of to update the source's
        /// definition.
        options: Vec<AlterSourceAddSubsourceOption<Aug>>,
        /// Map of subsource names to external details.
        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
    },
    PurifiedAlterSourceRefreshReferences {
        source_name: ResolvedItemName,
        /// The updated available upstream references for the primary source.
        available_source_references: SourceReferences,
    },
    PurifiedCreateSink(CreateSinkStatement<Aug>),
    PurifiedCreateTableFromSource {
        stmt: CreateTableFromSourceStatement<Aug>,
    },
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PurifiedSourceExport {
    pub external_reference: UnresolvedItemName,
    pub details: PurifiedExportDetails,
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PurifiedExportDetails {
    MySql {
        table: MySqlTableDesc,
        text_columns: Option<Vec<Ident>>,
        exclude_columns: Option<Vec<Ident>>,
        initial_gtid_set: String,
    },
    Postgres {
        table: PostgresTableDesc,
        text_columns: Option<Vec<Ident>>,
    },
    SqlServer {
        table: SqlServerTableDesc,
        text_columns: Option<Vec<Ident>>,
        excl_columns: Option<Vec<Ident>>,
        capture_instance: Arc<str>,
    },
    Kafka {},
    LoadGenerator {
        table: Option<RelationDesc>,
        output: LoadGeneratorOutput,
    },
}

/// Purifies a statement, removing any dependencies on external state.
///
/// See the section on [purification](crate#purification) in the crate
/// documentation for details.
///
/// Note that this doesn't handle `CREATE MATERIALIZED VIEW`, which is
/// handled by [purify_create_materialized_view_options] instead.
/// This could be made more consistent by a refactoring discussed here:
/// <https://github.com/MaterializeInc/materialize/pull/23870#discussion_r1435922709>
pub async fn purify_statement(
    catalog: impl SessionCatalog,
    now: u64,
    stmt: Statement<Aug>,
    storage_configuration: &StorageConfiguration,
) -> (Result<PurifiedStatement, PlanError>, Option<ClusterId>) {
    match stmt {
        Statement::CreateSource(stmt) => {
            let cluster_id = stmt.in_cluster.as_ref().map(|cluster| cluster.id.clone());
            (
                purify_create_source(catalog, now, stmt, storage_configuration).await,
                cluster_id,
            )
        }
        Statement::AlterSource(stmt) => (
            purify_alter_source(catalog, stmt, storage_configuration).await,
            None,
        ),
        Statement::CreateSink(stmt) => {
            let cluster_id = stmt.in_cluster.as_ref().map(|cluster| cluster.id.clone());
            (
                purify_create_sink(catalog, stmt, storage_configuration).await,
                cluster_id,
            )
        }
        Statement::CreateTableFromSource(stmt) => (
            purify_create_table_from_source(catalog, stmt, storage_configuration).await,
            None,
        ),
        o => unreachable!("{:?} does not need to be purified", o),
    }
}

/// Injects `DOC ON` comments into all Avro formats that use a schema
/// registry by finding all SQL `COMMENT`s attached to the sink's
/// underlying materialized view (as well as any types referenced by that
/// underlying materialized view).
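///
/// A sketch of the effect (illustrative object names):
///
/// ```text
/// COMMENT ON MATERIALIZED VIEW mv IS 'my view';
/// -- is injected into the sink's Avro format options as:
/// DOC ON TYPE mv = 'my view'
/// ```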
pub(crate) fn purify_create_sink_avro_doc_on_options(
    catalog: &dyn SessionCatalog,
    from_id: CatalogItemId,
    format: &mut Option<FormatSpecifier<Aug>>,
) -> Result<(), PlanError> {
    // Collect all objects referenced by the sink.
    let from = catalog.get_item(&from_id);
    let object_ids = from
        .references()
        .items()
        .copied()
        .chain_one(from.id())
        .collect::<Vec<_>>();

    // Collect all Avro formats that use a schema registry, as well as a set of
    // all identifiers named in user-provided `DOC ON` options.
    let mut avro_format_options = vec![];
    for_each_format(format, |doc_on_schema, fmt| match fmt {
        Format::Avro(AvroSchema::InlineSchema { .. })
        | Format::Bytes
        | Format::Csv { .. }
        | Format::Json { .. }
        | Format::Protobuf(..)
        | Format::Regex(..)
        | Format::Text => (),
        Format::Avro(AvroSchema::Csr {
            csr_connection: CsrConnectionAvro { connection, .. },
        }) => {
            avro_format_options.push((doc_on_schema, &mut connection.options));
        }
    });

    // For each Avro format in the sink, inject the appropriate `DOC ON` options
    // for each item referenced by the sink.
    for (for_schema, options) in avro_format_options {
        let user_provided_comments = options
            .iter()
            .filter_map(|CsrConfigOption { name, .. }| match name {
                CsrConfigOptionName::AvroDocOn(doc_on) => Some(doc_on.clone()),
                _ => None,
            })
            .collect::<BTreeSet<_>>();

        // Add existing comments if the user has not already provided them.
        for object_id in &object_ids {
            // Always add comments to the latest version of the item.
            let item = catalog
                .get_item(object_id)
                .at_version(RelationVersionSelector::Latest);
            let full_name = catalog.resolve_full_name(item.name());
            let full_resolved_name = ResolvedItemName::Item {
                id: *object_id,
                qualifiers: item.name().qualifiers.clone(),
                full_name: full_name.clone(),
                print_id: !matches!(
                    item.item_type(),
                    CatalogItemType::Func | CatalogItemType::Type
                ),
                version: RelationVersionSelector::Latest,
            };

            if let Some(comments_map) = catalog.get_item_comments(object_id) {
                // Attach comment for the item itself, if the user has not
                // already provided an overriding `DOC ON` option for the item.
                let doc_on_item_key = AvroDocOn {
                    identifier: DocOnIdentifier::Type(full_resolved_name.clone()),
                    for_schema,
                };
                if !user_provided_comments.contains(&doc_on_item_key) {
                    if let Some(root_comment) = comments_map.get(&None) {
                        options.push(CsrConfigOption {
                            name: CsrConfigOptionName::AvroDocOn(doc_on_item_key),
                            value: Some(mz_sql_parser::ast::WithOptionValue::Value(Value::String(
                                root_comment.clone(),
                            ))),
                        });
                    }
                }

                // Attach comment for each column in the item, if the user has
                // not already provided an overriding `DOC ON` option for the
                // column.
                if let Ok(desc) = item.desc(&full_name) {
                    for (pos, column_name) in desc.iter_names().enumerate() {
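                        // Column comment positions in the comments map are
                        // 1-based, while `enumerate` is 0-based; hence the
                        // `pos + 1` lookup below.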
                        let comment = comments_map.get(&Some(pos + 1));
                        if let Some(comment_str) = comment {
                            let doc_on_column_key = AvroDocOn {
                                identifier: DocOnIdentifier::Column(ColumnName {
                                    relation: full_resolved_name.clone(),
                                    column: ResolvedColumnReference::Column {
                                        name: column_name.to_owned(),
                                        index: pos,
                                    },
                                }),
                                for_schema,
                            };
                            if !user_provided_comments.contains(&doc_on_column_key) {
                                options.push(CsrConfigOption {
                                    name: CsrConfigOptionName::AvroDocOn(doc_on_column_key),
                                    value: Some(mz_sql_parser::ast::WithOptionValue::Value(
                                        Value::String(comment_str.clone()),
                                    )),
                                });
                            }
                        }
                    }
                }
            }
        }
    }

    Ok(())
}

/// Checks that the sink described in the statement can connect to its external
/// resources.
///
/// We must not leave any state behind in the Kafka broker, so we only ensure
/// that we can connect. This means we don't verify that we can create the
/// topic, which introduces TOCTOU errors, but creating an inoperable sink is
/// infinitely preferable to leaking state in users' environments.
async fn purify_create_sink(
    catalog: impl SessionCatalog,
    mut create_sink_stmt: CreateSinkStatement<Aug>,
    storage_configuration: &StorageConfiguration,
) -> Result<PurifiedStatement, PlanError> {
    // General purification
    let CreateSinkStatement {
        connection,
        format,
        with_options,
        name: _,
        in_cluster: _,
        if_not_exists: _,
        from,
        envelope: _,
    } = &mut create_sink_stmt;

    // The list of options that the user is allowed to specify.
    const USER_ALLOWED_WITH_OPTIONS: &[CreateSinkOptionName] = &[CreateSinkOptionName::Snapshot];

    if let Some(op) = with_options
        .iter()
        .find(|op| !USER_ALLOWED_WITH_OPTIONS.contains(&op.name))
    {
        sql_bail!(
            "CREATE SINK...WITH ({}..) is not allowed",
            op.name.to_ast_string_simple(),
        )
    }

    match &connection {
        CreateSinkConnection::Kafka {
            connection,
            options,
            key: _,
            headers: _,
        } => {
            let scx = StatementContext::new(None, &catalog);
            let connection = {
                let item = scx.get_item_by_resolved_name(connection)?;
                // Get Kafka connection
                match item.connection()? {
                    Connection::Kafka(connection) => {
                        connection.clone().into_inline_connection(scx.catalog)
                    }
                    _ => sql_bail!(
                        "{} is not a kafka connection",
                        scx.catalog.resolve_full_name(item.name())
                    ),
                }
            };

            let extracted_options: KafkaSinkConfigOptionExtracted = options.clone().try_into()?;

            if extracted_options.legacy_ids == Some(true) {
                sql_bail!("LEGACY IDs option is not supported");
            }

            let client: AdminClient<_> = connection
                .create_with_context(
                    storage_configuration,
                    MzClientContext::default(),
                    &BTreeMap::new(),
                    InTask::No,
                )
                .await
                .map_err(|e| {
                    // anyhow doesn't support Clone, so not trivial to move into PlanError
                    KafkaSinkPurificationError::AdminClientError(Arc::new(e))
                })?;

            let metadata = client
                .inner()
                .fetch_metadata(
                    None,
                    storage_configuration
                        .parameters
                        .kafka_timeout_config
                        .fetch_metadata_timeout,
                )
                .map_err(|e| {
                    KafkaSinkPurificationError::AdminClientError(Arc::new(
                        ContextCreationError::KafkaError(e),
                    ))
                })?;

            if metadata.brokers().is_empty() {
                Err(KafkaSinkPurificationError::ZeroBrokers)?;
            }
        }
    }

    let mut csr_connection_ids = BTreeSet::new();
    for_each_format(format, |_, fmt| match fmt {
        Format::Avro(AvroSchema::InlineSchema { .. })
        | Format::Bytes
        | Format::Csv { .. }
        | Format::Json { .. }
        | Format::Protobuf(ProtobufSchema::InlineSchema { .. })
        | Format::Regex(..)
        | Format::Text => (),
        Format::Avro(AvroSchema::Csr {
            csr_connection: CsrConnectionAvro { connection, .. },
        })
        | Format::Protobuf(ProtobufSchema::Csr {
            csr_connection: CsrConnectionProtobuf { connection, .. },
        }) => {
            csr_connection_ids.insert(*connection.connection.item_id());
        }
    });

    let scx = StatementContext::new(None, &catalog);
    for csr_connection_id in csr_connection_ids {
        let connection = {
            let item = scx.get_item(&csr_connection_id);
            // Get CSR connection
            match item.connection()? {
                Connection::Csr(connection) => connection.clone().into_inline_connection(&catalog),
                _ => Err(CsrPurificationError::NotCsrConnection(
                    scx.catalog.resolve_full_name(item.name()),
                ))?,
            }
        };

        let client = connection
            .connect(storage_configuration, InTask::No)
            .await
            .map_err(|e| CsrPurificationError::ClientError(Arc::new(e)))?;

        client
            .list_subjects()
            .await
            .map_err(|e| CsrPurificationError::ListSubjectsError(Arc::new(e)))?;
    }

    purify_create_sink_avro_doc_on_options(&catalog, *from.item_id(), format)?;

    Ok(PurifiedStatement::PurifiedCreateSink(create_sink_stmt))
}

/// Runs a function on each format within the format specifier.
///
/// The function is provided with a `DocOnSchema` that indicates whether the
/// format is for the key, value, or both.
///
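/// A minimal usage sketch (counts how many formats are present):
///
/// ```ignore
/// let mut n = 0;
/// for_each_format(&mut format, |_schema, _fmt| n += 1);
/// assert!(n <= 2); // at most one key format and one value format
/// ```
///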
// TODO(benesch): rename `DocOnSchema` to the more general `FormatRestriction`.
fn for_each_format<'a, F>(format: &'a mut Option<FormatSpecifier<Aug>>, mut f: F)
where
    F: FnMut(DocOnSchema, &'a mut Format<Aug>),
{
    match format {
        None => (),
        Some(FormatSpecifier::Bare(fmt)) => f(DocOnSchema::All, fmt),
        Some(FormatSpecifier::KeyValue { key, value }) => {
            f(DocOnSchema::KeyOnly, key);
            f(DocOnSchema::ValueOnly, value);
        }
    }
}

/// Defines whether purification should enforce that at least one valid source
/// reference is provided on the statement.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub(crate) enum SourceReferencePolicy {
    /// Don't allow source references to be provided. This is used for
    /// enforcing that `CREATE SOURCE` statements don't create subsources.
    NotAllowed,
    /// Allow empty references, such as when creating a source that
    /// will have tables added afterwards.
    Optional,
    /// Require that at least one reference is resolved to an upstream
    /// object.
    Required,
}

async fn purify_create_source(
    catalog: impl SessionCatalog,
    now: u64,
    mut create_source_stmt: CreateSourceStatement<Aug>,
    storage_configuration: &StorageConfiguration,
) -> Result<PurifiedStatement, PlanError> {
    let CreateSourceStatement {
        name: source_name,
        connection: source_connection,
        format,
        envelope,
        include_metadata,
        external_references,
        progress_subsource,
        ..
    } = &mut create_source_stmt;

    if let Some(DeferredItemName::Named(_)) = progress_subsource {
        sql_bail!("Cannot manually ID qualify progress subsource")
    }

    let mut requested_subsource_map = BTreeMap::new();

    let progress_desc = match &source_connection {
        CreateSourceConnection::Kafka { .. } => {
            &mz_storage_types::sources::kafka::KAFKA_PROGRESS_DESC
        }
        CreateSourceConnection::Postgres { .. } | CreateSourceConnection::Yugabyte { .. } => {
            &mz_storage_types::sources::postgres::PG_PROGRESS_DESC
        }
        CreateSourceConnection::SqlServer { .. } => {
            &mz_storage_types::sources::sql_server::SQL_SERVER_PROGRESS_DESC
        }
        CreateSourceConnection::MySql { .. } => {
            &mz_storage_types::sources::mysql::MYSQL_PROGRESS_DESC
        }
        CreateSourceConnection::LoadGenerator { .. } => {
            &mz_storage_types::sources::load_generator::LOAD_GEN_PROGRESS_DESC
        }
    };
    let scx = StatementContext::new(None, &catalog);

    // Depending on whether the user must or may use the `CREATE TABLE .. FROM SOURCE`
    // statement to add tables after this source is created, we may need to enforce
    // that this source statement does or does not create auto-generated subsources.
    let reference_policy = if scx.catalog.system_vars().enable_create_table_from_source()
        && scx.catalog.system_vars().force_source_table_syntax()
    {
        SourceReferencePolicy::NotAllowed
    } else if scx.catalog.system_vars().enable_create_table_from_source() {
        SourceReferencePolicy::Optional
    } else {
        SourceReferencePolicy::Required
    };
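    // In summary:
    //   enable_create_table_from_source | force_source_table_syntax | policy
    //   ------------------------------- | ------------------------- | ----------
    //   true                            | true                      | NotAllowed
    //   true                            | false                     | Optional
    //   false                           | (any)                     | Required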

    let mut format_options = SourceFormatOptions::Default;

    let retrieved_source_references: RetrievedSourceReferences;

    match source_connection {
        CreateSourceConnection::Kafka {
            connection,
            options: base_with_options,
            ..
        } => {
            if let Some(external_references) = external_references {
                Err(KafkaSourcePurificationError::ReferencedSubsources(
                    external_references.clone(),
                ))?;
            }

            let connection = {
                let item = scx.get_item_by_resolved_name(connection)?;
                // Get Kafka connection
                match item.connection()? {
                    Connection::Kafka(connection) => {
                        connection.clone().into_inline_connection(&catalog)
                    }
                    _ => Err(KafkaSourcePurificationError::NotKafkaConnection(
                        scx.catalog.resolve_full_name(item.name()),
                    ))?,
                }
            };

            let extracted_options: KafkaSourceConfigOptionExtracted =
                base_with_options.clone().try_into()?;

            let topic = extracted_options
                .topic
                .ok_or(KafkaSourcePurificationError::ConnectionMissingTopic)?;

            let consumer = connection
                .create_with_context(
                    storage_configuration,
                    MzClientContext::default(),
                    &BTreeMap::new(),
                    InTask::No,
                )
                .await
                .map_err(|e| {
                    // anyhow doesn't support Clone, so not trivial to move into PlanError
                    KafkaSourcePurificationError::KafkaConsumerError(
                        e.display_with_causes().to_string(),
                    )
                })?;
            let consumer = Arc::new(consumer);

            match (
                extracted_options.start_offset,
                extracted_options.start_timestamp,
            ) {
                (None, None) => {
                    // Validate that the topic at least exists.
                    kafka_util::ensure_topic_exists(
                        Arc::clone(&consumer),
                        &topic,
                        storage_configuration
                            .parameters
                            .kafka_timeout_config
                            .fetch_metadata_timeout,
                    )
                    .await?;
                }
                (Some(_), Some(_)) => {
                    sql_bail!("cannot specify START TIMESTAMP and START OFFSET at the same time")
                }
                (Some(start_offsets), None) => {
                    // Validate the start offsets.
                    kafka_util::validate_start_offsets(
                        Arc::clone(&consumer),
                        &topic,
                        start_offsets,
                        storage_configuration
                            .parameters
                            .kafka_timeout_config
                            .fetch_metadata_timeout,
                    )
                    .await?;
                }
                (None, Some(time_offset)) => {
                    // Translate `START TIMESTAMP` to a start offset.
                    let start_offsets = kafka_util::lookup_start_offsets(
                        Arc::clone(&consumer),
                        &topic,
                        time_offset,
                        now,
                        storage_configuration
                            .parameters
                            .kafka_timeout_config
                            .fetch_metadata_timeout,
                    )
                    .await?;

                    base_with_options.retain(|val| {
                        !matches!(val.name, KafkaSourceConfigOptionName::StartTimestamp)
                    });
                    base_with_options.push(KafkaSourceConfigOption {
                        name: KafkaSourceConfigOptionName::StartOffset,
                        value: Some(WithOptionValue::Sequence(
                            start_offsets
                                .iter()
                                .map(|offset| {
                                    WithOptionValue::Value(Value::Number(offset.to_string()))
                                })
                                .collect(),
                        )),
                    });
                }
            }

            let reference_client = SourceReferenceClient::Kafka { topic: &topic };
            retrieved_source_references = reference_client.get_source_references().await?;

            format_options = SourceFormatOptions::Kafka { topic };
        }
        source_connection @ CreateSourceConnection::Postgres { .. }
        | source_connection @ CreateSourceConnection::Yugabyte { .. } => {
            let (source_flavor, connection, options) = match source_connection {
                CreateSourceConnection::Postgres {
                    connection,
                    options,
                } => (PostgresFlavor::Vanilla, connection, options),
                CreateSourceConnection::Yugabyte {
                    connection,
                    options,
                } => (PostgresFlavor::Yugabyte, connection, options),
                _ => unreachable!(),
            };
            let connection = {
                let item = scx.get_item_by_resolved_name(connection)?;
                match item.connection().map_err(PlanError::from)? {
                    Connection::Postgres(connection) => {
                        let connection = connection.clone().into_inline_connection(&catalog);
                        if connection.flavor != source_flavor {
                            match source_flavor {
                                PostgresFlavor::Vanilla => {
                                    Err(PgSourcePurificationError::NotPgConnection(
                                        scx.catalog.resolve_full_name(item.name()),
                                    ))
                                }
                                PostgresFlavor::Yugabyte => {
                                    Err(PgSourcePurificationError::NotYugabyteConnection(
                                        scx.catalog.resolve_full_name(item.name()),
                                    ))
                                }
                            }
                        } else {
                            Ok(connection)
                        }
                    }
                    _ => match source_flavor {
                        PostgresFlavor::Vanilla => Err(PgSourcePurificationError::NotPgConnection(
                            scx.catalog.resolve_full_name(item.name()),
                        )),
                        PostgresFlavor::Yugabyte => {
                            Err(PgSourcePurificationError::NotYugabyteConnection(
                                scx.catalog.resolve_full_name(item.name()),
                            ))
                        }
                    },
                }
            }?;
            let crate::plan::statement::PgConfigOptionExtracted {
                publication,
                text_columns,
                details,
                ..
            } = options.clone().try_into()?;
            let publication =
                publication.ok_or(PgSourcePurificationError::ConnectionMissingPublication)?;

            if details.is_some() {
                Err(PgSourcePurificationError::UserSpecifiedDetails)?;
            }

            // Verify that we can connect to the upstream server and snapshot
            // the publication metadata.
            let config = connection
                .config(
                    &storage_configuration.connection_context.secrets_reader,
                    storage_configuration,
                    InTask::No,
                )
                .await?;

            let client = config
                .connect(
                    "postgres_purification",
                    &storage_configuration.connection_context.ssh_tunnel_manager,
                )
                .await?;

            let wal_level = mz_postgres_util::get_wal_level(&client).await?;

            if wal_level < WalLevel::Logical {
                Err(PgSourcePurificationError::InsufficientWalLevel { wal_level })?;
            }

            let max_wal_senders = mz_postgres_util::get_max_wal_senders(&client).await?;

            if max_wal_senders < 1 {
                Err(PgSourcePurificationError::ReplicationDisabled)?;
            }

            let available_replication_slots =
                mz_postgres_util::available_replication_slots(&client).await?;

            // We need one replication slot for the snapshots and one for the
            // ongoing replication.
            if available_replication_slots < 2 {
                Err(PgSourcePurificationError::InsufficientReplicationSlotsAvailable { count: 2 })?;
            }

            let reference_client = SourceReferenceClient::Postgres {
                client: &client,
                publication: &publication,
                database: &connection.database,
            };
            retrieved_source_references = reference_client.get_source_references().await?;

            let postgres::PurifiedSourceExports {
                source_exports: subsources,
                normalized_text_columns,
            } = postgres::purify_source_exports(
                &client,
                &config,
                &retrieved_source_references,
                external_references,
                text_columns,
                source_name,
                &reference_policy,
            )
            .await?;

            if let Some(text_cols_option) = options
                .iter_mut()
                .find(|option| option.name == PgConfigOptionName::TextColumns)
            {
                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
            }

            requested_subsource_map.extend(subsources);

            // Record the active replication timeline_id to allow detection of a future upstream
            // point-in-time-recovery that will put the source into an error state.
            let replication_client = config
                .connect_replication(&storage_configuration.connection_context.ssh_tunnel_manager)
                .await?;
            let timeline_id = mz_postgres_util::get_timeline_id(&replication_client).await?;

            // Remove any old detail references
            options.retain(|PgConfigOption { name, .. }| name != &PgConfigOptionName::Details);
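            // The generated slot name looks like, e.g.,
            // `materialize_0123456789abcdef0123456789abcdef`: a `materialize_`
            // prefix followed by a hyphenless UUID.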
            let details = PostgresSourcePublicationDetails {
                slot: format!(
                    "materialize_{}",
                    Uuid::new_v4().to_string().replace('-', "")
                ),
                timeline_id: Some(timeline_id),
                database: connection.database,
            };
            options.push(PgConfigOption {
                name: PgConfigOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    details.into_proto().encode_to_vec(),
                )))),
            })
        }
        CreateSourceConnection::SqlServer {
            connection,
            options,
        } => {
            scx.require_feature_flag(&ENABLE_SQL_SERVER_SOURCE)?;

            // Connect to the upstream SQL Server instance so we can validate
            // we're compatible with CDC.
            let connection_item = scx.get_item_by_resolved_name(connection)?;
            let connection = match connection_item.connection()? {
                Connection::SqlServer(connection) => {
                    connection.clone().into_inline_connection(&catalog)
                }
                _ => Err(SqlServerSourcePurificationError::NotSqlServerConnection(
                    scx.catalog.resolve_full_name(connection_item.name()),
                ))?,
            };
            let crate::plan::statement::ddl::SqlServerConfigOptionExtracted {
                details,
                text_columns,
                exclude_columns,
                seen: _,
            } = options.clone().try_into()?;

            if details.is_some() {
                Err(SqlServerSourcePurificationError::UserSpecifiedDetails)?;
            }

            let config = connection
                .resolve_config(
                    &storage_configuration.connection_context.secrets_reader,
                    storage_configuration,
                    InTask::No,
                )
                .await?;
            let (mut client, conn) = mz_sql_server_util::Client::connect(config).await?;
            // TODO(sql_server1): Create a version of the client where we don't need to separately
            // spawn this task.
            mz_ore::task::spawn(|| "sql-server-conn", async move { conn.await });

            // Ensure the upstream SQL Server instance is configured to allow CDC.
            //
            // Run all of the checks necessary and collect the errors to provide the best
            // guidance as to which system settings need to be enabled.
            let mut replication_errors = vec![];
            for error in [
                mz_sql_server_util::inspect::ensure_database_cdc_enabled(&mut client).await,
                mz_sql_server_util::inspect::ensure_snapshot_isolation_enabled(&mut client).await,
            ] {
                match error {
                    Err(mz_sql_server_util::SqlServerError::InvalidSystemSetting {
                        name,
                        expected,
                        actual,
                    }) => replication_errors.push((name, expected, actual)),
                    Err(other) => Err(other)?,
                    Ok(()) => (),
                }
            }
            if !replication_errors.is_empty() {
                Err(SqlServerSourcePurificationError::ReplicationSettingsError(
                    replication_errors,
                ))?;
            }

            // We've validated that CDC is configured for the system, now let's
            // purify the individual exports (i.e. subsources).
            let database: Arc<str> = connection.database.into();
            let reference_client = SourceReferenceClient::SqlServer {
                client: &mut client,
                database: Arc::clone(&database),
            };
            retrieved_source_references = reference_client.get_source_references().await?;
            tracing::debug!(?retrieved_source_references, "got source references");

            let purified_source_exports = sql_server::purify_source_exports(
                &*database,
                &mut client,
                &retrieved_source_references,
                external_references,
                &text_columns,
                &exclude_columns,
                source_name,
                &reference_policy,
            )
            .await?;

            let sql_server::PurifiedSourceExports {
                source_exports,
                normalized_text_columns,
                normalized_excl_columns,
            } = purified_source_exports;

            // Update our set of requested source exports.
            requested_subsource_map.extend(source_exports);

            // Reset 'DETAILS' to ensure it's empty; we don't use it in the
            // SQL Server source.
            let details = SqlServerSourceExtras {};
            options.retain(|SqlServerConfigOption { name, .. }| {
                name != &SqlServerConfigOptionName::Details
            });
            options.push(SqlServerConfigOption {
                name: SqlServerConfigOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    details.into_proto().encode_to_vec(),
                )))),
            });

            // Update our 'TEXT' and 'EXCLUDE' column options with the purified and normalized set.
            if let Some(text_cols_option) = options
                .iter_mut()
                .find(|option| option.name == SqlServerConfigOptionName::TextColumns)
            {
                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
            }
            if let Some(excl_cols_option) = options
                .iter_mut()
                .find(|option| option.name == SqlServerConfigOptionName::ExcludeColumns)
            {
                excl_cols_option.value = Some(WithOptionValue::Sequence(normalized_excl_columns));
            }
        }
        CreateSourceConnection::MySql {
            connection,
            options,
        } => {
            let connection_item = scx.get_item_by_resolved_name(connection)?;
            let connection = match connection_item.connection()? {
                Connection::MySql(connection) => {
                    connection.clone().into_inline_connection(&catalog)
                }
                _ => Err(MySqlSourcePurificationError::NotMySqlConnection(
                    scx.catalog.resolve_full_name(connection_item.name()),
                ))?,
            };
            let crate::plan::statement::ddl::MySqlConfigOptionExtracted {
                details,
                text_columns,
                exclude_columns,
                seen: _,
            } = options.clone().try_into()?;

            if details.is_some() {
                Err(MySqlSourcePurificationError::UserSpecifiedDetails)?;
            }

            let config = connection
                .config(
                    &storage_configuration.connection_context.secrets_reader,
                    storage_configuration,
                    InTask::No,
                )
                .await?;

            let mut conn = config
                .connect(
                    "mysql purification",
                    &storage_configuration.connection_context.ssh_tunnel_manager,
                )
                .await?;

            // Check if the MySQL database is configured to allow row-based
            // consistent GTID replication.
            let mut replication_errors = vec![];
            for error in [
                mz_mysql_util::ensure_gtid_consistency(&mut conn)
                    .await
                    .err(),
                mz_mysql_util::ensure_full_row_binlog_format(&mut conn)
                    .await
                    .err(),
                mz_mysql_util::ensure_replication_commit_order(&mut conn)
                    .await
                    .err(),
            ] {
                match error {
                    Some(mz_mysql_util::MySqlError::InvalidSystemSetting {
                        setting,
                        expected,
                        actual,
                    }) => {
                        replication_errors.push((setting, expected, actual));
                    }
                    Some(err) => Err(err)?,
                    None => (),
                }
            }
            if !replication_errors.is_empty() {
                Err(MySqlSourcePurificationError::ReplicationSettingsError(
                    replication_errors,
                ))?;
            }

            // Retrieve the current @gtid_executed value of the server to mark as the effective
            // initial snapshot point such that we can ensure consistency if the initial source
            // snapshot is broken up over multiple points in time.
            let initial_gtid_set =
                mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?;

            let reference_client = SourceReferenceClient::MySql {
                conn: &mut conn,
                include_system_schemas: mysql::references_system_schemas(external_references),
            };
            retrieved_source_references = reference_client.get_source_references().await?;

            let mysql::PurifiedSourceExports {
                source_exports: subsources,
                normalized_text_columns,
                normalized_exclude_columns,
            } = mysql::purify_source_exports(
                &mut conn,
                &retrieved_source_references,
                external_references,
                text_columns,
                exclude_columns,
                source_name,
                initial_gtid_set.clone(),
                &reference_policy,
            )
            .await?;
            requested_subsource_map.extend(subsources);

            // We don't have any fields in this details struct, but keep it around for
            // conformity with Postgres and in case we end up needing it again in the future.
            let details = MySqlSourceDetails {};
            // Update the options with the purified details.
            options
                .retain(|MySqlConfigOption { name, .. }| name != &MySqlConfigOptionName::Details);
            options.push(MySqlConfigOption {
                name: MySqlConfigOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    details.into_proto().encode_to_vec(),
                )))),
            });

            if let Some(text_cols_option) = options
                .iter_mut()
                .find(|option| option.name == MySqlConfigOptionName::TextColumns)
            {
                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
            }
            if let Some(ignore_cols_option) = options
                .iter_mut()
                .find(|option| option.name == MySqlConfigOptionName::ExcludeColumns)
            {
                ignore_cols_option.value =
                    Some(WithOptionValue::Sequence(normalized_exclude_columns));
            }
        }
        CreateSourceConnection::LoadGenerator { generator, options } => {
            let load_generator =
                load_generator_ast_to_generator(&scx, generator, options, include_metadata)?;

            let reference_client = SourceReferenceClient::LoadGenerator {
                generator: &load_generator,
            };
            retrieved_source_references = reference_client.get_source_references().await?;
            // Filter to the references that need to be created as 'subsources', which
            // doesn't include the default output for single-output sources.
            // TODO(database-issues#8620): Remove once subsources are removed
            let subsource_references = retrieved_source_references
                .all_references()
                .iter()
                .filter(|r| {
                    r.load_generator_output().expect("is loadgen") != &LoadGeneratorOutput::Default
                })
                .collect::<Vec<_>>();

            match external_references {
                Some(requested)
                    if matches!(reference_policy, SourceReferencePolicy::NotAllowed) =>
                {
                    Err(PlanError::UseTablesForSources(requested.to_string()))?
                }
                Some(requested) => {
                    let requested_exports = retrieved_source_references
                        .requested_source_exports(Some(requested), source_name)?;
                    for export in requested_exports {
                        requested_subsource_map.insert(
                            export.name,
                            PurifiedSourceExport {
                                external_reference: export.external_reference,
                                details: PurifiedExportDetails::LoadGenerator {
                                    table: export
                                        .meta
                                        .load_generator_desc()
                                        .expect("is loadgen")
                                        .clone(),
                                    output: export
                                        .meta
                                        .load_generator_output()
                                        .expect("is loadgen")
                                        .clone(),
                                },
                            },
                        );
                    }
                }
                None => {
                    if matches!(reference_policy, SourceReferencePolicy::Required)
                        && !subsource_references.is_empty()
                    {
                        Err(LoadGeneratorSourcePurificationError::MultiOutputRequiresForAllTables)?
                    }
                }
            }

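            // Default the Clock generator's `AS OF` option to the catalog's
            // current time when the user didn't provide one, pinning the
            // start time at source creation.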
            if let LoadGenerator::Clock = generator {
                if !options
                    .iter()
                    .any(|p| p.name == LoadGeneratorOptionName::AsOf)
                {
                    let now = catalog.now();
                    options.push(LoadGeneratorOption {
                        name: LoadGeneratorOptionName::AsOf,
                        value: Some(WithOptionValue::Value(Value::Number(now.to_string()))),
                    });
                }
            }
        }
    }

    // Now that we know which subsources to create alongside this
    // statement, remove the references so they are not canonicalized as
    // part of the `CREATE SOURCE` statement in the catalog.
    *external_references = None;

    // Generate the progress subsource.

    // Take the name from the input or generate one.
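    // E.g., a source named `db.schema.src` with no explicit progress
    // subsource gets a progress subsource named `db.schema.src_progress`,
    // disambiguated further if that name is already taken.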
1269    let name = match progress_subsource {
1270        Some(name) => match name {
1271            DeferredItemName::Deferred(name) => name.clone(),
1272            DeferredItemName::Named(_) => unreachable!("already checked for this value"),
1273        },
1274        None => {
1275            let (item, prefix) = source_name.0.split_last().unwrap();
1276            let item_name = Ident::try_generate_name(item.to_string(), "_progress", |candidate| {
1277                let mut suggested_name = prefix.to_vec();
1278                suggested_name.push(candidate.clone());
1279
1280                let partial = normalize::unresolved_item_name(UnresolvedItemName(suggested_name))?;
1281                let qualified = scx.allocate_qualified_name(partial)?;
1282                let item_exists = scx.catalog.get_item_by_name(&qualified).is_some();
1283                let type_exists = scx.catalog.get_type_by_name(&qualified).is_some();
1284                Ok::<_, PlanError>(!item_exists && !type_exists)
1285            })?;
1286
1287            let mut full_name = prefix.to_vec();
1288            full_name.push(item_name);
1289            let full_name = normalize::unresolved_item_name(UnresolvedItemName(full_name))?;
1290            let qualified_name = scx.allocate_qualified_name(full_name)?;
1291            let full_name = scx.catalog.resolve_full_name(&qualified_name);
1292
1293            UnresolvedItemName::from(full_name.clone())
1294        }
1295    };
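    // E.g., a source named `materialize.public.src` yields a progress subsource
    // named `materialize.public.src_progress`, with a disambiguating suffix if
    // that name is already taken (example names illustrative).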
1296
1297    let (columns, constraints) = scx.relation_desc_into_table_defs(progress_desc)?;
1298
1299    // Create the subsource statement
1300    let create_progress_subsource_stmt = CreateSubsourceStatement {
1301        name,
1302        columns,
1303        // Progress subsources do not refer to the source to which they belong.
1304        // Instead, the primary source depends on them (the opposite is true of
1305        // ingestion exports, which depend on the primary source).
1306        of_source: None,
1307        constraints,
1308        if_not_exists: false,
1309        with_options: vec![CreateSubsourceOption {
1310            name: CreateSubsourceOptionName::Progress,
1311            value: Some(WithOptionValue::Value(Value::Boolean(true))),
1312        }],
1313    };
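    // Rendered back to SQL, the statement above is roughly (illustrative):
    //   CREATE SUBSOURCE <name> (<columns>, <constraints>) WITH (PROGRESS = true)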
1314
1315    purify_source_format(
1316        &catalog,
1317        format,
1318        &format_options,
1319        envelope,
1320        storage_configuration,
1321    )
1322    .await?;
1323
1324    Ok(PurifiedStatement::PurifiedCreateSource {
1325        create_progress_subsource_stmt,
1326        create_source_stmt,
1327        subsources: requested_subsource_map,
1328        available_source_references: retrieved_source_references.available_source_references(),
1329    })
1330}
1331
1332/// On success, returns the details for the new subsources and the updated
1333/// `options` that sequencing expects when handling `ALTER SOURCE` statements.
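///
/// A minimal usage sketch (`catalog`, `stmt`, and `storage_config` are assumed
/// to be in scope; not a runnable doctest):
/// ```ignore
/// let purified = purify_alter_source(catalog, stmt, &storage_config).await?;
/// ```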
1334async fn purify_alter_source(
1335    catalog: impl SessionCatalog,
1336    stmt: AlterSourceStatement<Aug>,
1337    storage_configuration: &StorageConfiguration,
1338) -> Result<PurifiedStatement, PlanError> {
1339    let scx = StatementContext::new(None, &catalog);
1340    let AlterSourceStatement {
1341        source_name: unresolved_source_name,
1342        action,
1343        if_exists,
1344    } = stmt;
1345
1346    // Get name.
1347    let item = match scx.resolve_item(RawItemName::Name(unresolved_source_name.clone())) {
1348        Ok(item) => item,
1349        Err(_) if if_exists => {
1350            return Ok(PurifiedStatement::PurifiedAlterSource {
1351                alter_source_stmt: AlterSourceStatement {
1352                    source_name: unresolved_source_name,
1353                    action,
1354                    if_exists,
1355                },
1356            });
1357        }
1358        Err(e) => return Err(e),
1359    };
1360
1361    // Ensure it's an ingestion-based source that can be altered.
1362    let desc = match item.source_desc()? {
1363        Some(desc) => desc.clone().into_inline_connection(scx.catalog),
1364        None => {
1365            sql_bail!("cannot ALTER this type of source")
1366        }
1367    };
1368
1369    let source_name = item.name();
1370
1371    let resolved_source_name = ResolvedItemName::Item {
1372        id: item.id(),
1373        qualifiers: item.name().qualifiers.clone(),
1374        full_name: scx.catalog.resolve_full_name(source_name),
1375        print_id: true,
1376        version: RelationVersionSelector::Latest,
1377    };
1378
1379    let partial_name = scx.catalog.minimal_qualification(source_name);
1380
1381    match action {
1382        AlterSourceAction::AddSubsources {
1383            external_references,
1384            options,
1385        } => {
1386            if scx.catalog.system_vars().enable_create_table_from_source()
1387                && scx.catalog.system_vars().force_source_table_syntax()
1388            {
1389                Err(PlanError::UseTablesForSources(
1390                    "ALTER SOURCE .. ADD SUBSOURCES ..".to_string(),
1391                ))?;
1392            }
1393
1394            purify_alter_source_add_subsources(
1395                external_references,
1396                options,
1397                desc,
1398                partial_name,
1399                unresolved_source_name,
1400                resolved_source_name,
1401                storage_configuration,
1402            )
1403            .await
1404        }
1405        AlterSourceAction::RefreshReferences => {
1406            purify_alter_source_refresh_references(
1407                desc,
1408                resolved_source_name,
1409                storage_configuration,
1410            )
1411            .await
1412        }
1413        _ => Ok(PurifiedStatement::PurifiedAlterSource {
1414            alter_source_stmt: AlterSourceStatement {
1415                source_name: unresolved_source_name,
1416                action,
1417                if_exists,
1418            },
1419        }),
1420    }
1421}
1422
1423// TODO(database-issues#8620): Remove once subsources are removed
1424/// Equivalent to `purify_create_source` but for `AlterSourceStatement`.
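/// Illustrative form of the statement being purified (names assumed):
/// `ALTER SOURCE src ADD SUBSOURCE schema1.tbl AS t WITH (TEXT COLUMNS (...))`.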
1425async fn purify_alter_source_add_subsources(
1426    external_references: Vec<ExternalReferenceExport>,
1427    mut options: Vec<AlterSourceAddSubsourceOption<Aug>>,
1428    desc: SourceDesc,
1429    partial_source_name: PartialItemName,
1430    unresolved_source_name: UnresolvedItemName,
1431    resolved_source_name: ResolvedItemName,
1432    storage_configuration: &StorageConfiguration,
1433) -> Result<PurifiedStatement, PlanError> {
1434    // Validate this is a source that can have subsources added.
1435    match desc.connection {
1436        GenericSourceConnection::Postgres(PostgresSourceConnection {
1437            connection:
1438                PostgresConnection {
1439                    flavor: PostgresFlavor::Vanilla,
1440                    ..
1441                },
1442            ..
1443        }) => {}
1444        GenericSourceConnection::MySql(_) => {}
1445        GenericSourceConnection::SqlServer(_) => {}
1446        _ => sql_bail!(
1447            "source {} does not support ALTER SOURCE.",
1448            partial_source_name
1449        ),
1450    };
1451
1452    let connection_name = desc.connection.name();
1453
1454    let crate::plan::statement::ddl::AlterSourceAddSubsourceOptionExtracted {
1455        text_columns,
1456        exclude_columns,
1457        details,
1458        seen: _,
1459    } = options.clone().try_into()?;
1460    assert_none!(details, "details cannot be explicitly set");
1461
1462    let mut requested_subsource_map = BTreeMap::new();
1463
1464    match desc.connection {
1465        GenericSourceConnection::Postgres(pg_source_connection) => {
1466            // Get PostgresConnection for generating subsources.
1467            let pg_connection = &pg_source_connection.connection;
1468
1469            let config = pg_connection
1470                .config(
1471                    &storage_configuration.connection_context.secrets_reader,
1472                    storage_configuration,
1473                    InTask::No,
1474                )
1475                .await?;
1476
1477            let client = config
1478                .connect(
1479                    "postgres_purification",
1480                    &storage_configuration.connection_context.ssh_tunnel_manager,
1481                )
1482                .await?;
1483
1484            let available_replication_slots =
1485                mz_postgres_util::available_replication_slots(&client).await?;
1486
1487            // We need 1 additional replication slot for the snapshots
1488            if available_replication_slots < 1 {
1489                Err(PgSourcePurificationError::InsufficientReplicationSlotsAvailable { count: 1 })?;
1490            }
1491
1492            if !exclude_columns.is_empty() {
1493                sql_bail!(
1494                    "{} is a {} source, which does not support EXCLUDE COLUMNS.",
1495                    partial_source_name,
1496                    connection_name
1497                )
1498            }
1499
1500            let reference_client = SourceReferenceClient::Postgres {
1501                client: &client,
1502                publication: &pg_source_connection.publication,
1503                database: &pg_connection.database,
1504            };
1505            let retrieved_source_references = reference_client.get_source_references().await?;
1506
1507            let postgres::PurifiedSourceExports {
1508                source_exports: subsources,
1509                normalized_text_columns,
1510            } = postgres::purify_source_exports(
1511                &client,
1512                &config,
1513                &retrieved_source_references,
1514                &Some(ExternalReferences::SubsetTables(external_references)),
1515                text_columns,
1516                &unresolved_source_name,
1517                &SourceReferencePolicy::Required,
1518            )
1519            .await?;
1520
1521            if let Some(text_cols_option) = options
1522                .iter_mut()
1523                .find(|option| option.name == AlterSourceAddSubsourceOptionName::TextColumns)
1524            {
1525                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
1526            }
1527
1528            requested_subsource_map.extend(subsources);
1529        }
1530        GenericSourceConnection::MySql(mysql_source_connection) => {
1531            let mysql_connection = &mysql_source_connection.connection;
1532            let config = mysql_connection
1533                .config(
1534                    &storage_configuration.connection_context.secrets_reader,
1535                    storage_configuration,
1536                    InTask::No,
1537                )
1538                .await?;
1539
1540            let mut conn = config
1541                .connect(
1542                    "mysql purification",
1543                    &storage_configuration.connection_context.ssh_tunnel_manager,
1544                )
1545                .await?;
1546
1547            // Retrieve the current @gtid_executed value of the server to mark as the effective
1548            // initial snapshot point for these subsources.
1549            let initial_gtid_set =
1550                mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?;
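            // This is roughly `SELECT @@global.gtid_executed` upstream; the
            // returned GTID set (e.g. `3e11fa47-71ca-11e1-9e33-c80aa9429562:1-23`,
            // value illustrative) separates what must be snapshotted from what
            // will arrive via replication.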
1551
1552            let requested_references = Some(ExternalReferences::SubsetTables(external_references));
1553
1554            let reference_client = SourceReferenceClient::MySql {
1555                conn: &mut conn,
1556                include_system_schemas: mysql::references_system_schemas(&requested_references),
1557            };
1558            let retrieved_source_references = reference_client.get_source_references().await?;
1559
1560            let mysql::PurifiedSourceExports {
1561                source_exports: subsources,
1562                normalized_text_columns,
1563                normalized_exclude_columns,
1564            } = mysql::purify_source_exports(
1565                &mut conn,
1566                &retrieved_source_references,
1567                &requested_references,
1568                text_columns,
1569                exclude_columns,
1570                &unresolved_source_name,
1571                initial_gtid_set,
1572                &SourceReferencePolicy::Required,
1573            )
1574            .await?;
1575            requested_subsource_map.extend(subsources);
1576
1577            // Update options with the purified details
1578            if let Some(text_cols_option) = options
1579                .iter_mut()
1580                .find(|option| option.name == AlterSourceAddSubsourceOptionName::TextColumns)
1581            {
1582                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
1583            }
1584            if let Some(ignore_cols_option) = options
1585                .iter_mut()
1586                .find(|option| option.name == AlterSourceAddSubsourceOptionName::ExcludeColumns)
1587            {
1588                ignore_cols_option.value =
1589                    Some(WithOptionValue::Sequence(normalized_exclude_columns));
1590            }
1591        }
1592        GenericSourceConnection::SqlServer(sql_server_source) => {
1593            // Open a connection to the upstream SQL Server instance.
1594            let sql_server_connection = &sql_server_source.connection;
1595            let config = sql_server_connection
1596                .resolve_config(
1597                    &storage_configuration.connection_context.secrets_reader,
1598                    storage_configuration,
1599                    InTask::No,
1600                )
1601                .await?;
1602            let (mut client, conn) = mz_sql_server_util::Client::connect(config).await?;
1603            // TODO(sql_server1): Move the spawning of this task into the SQL Server client.
1604            mz_ore::task::spawn(|| "sql-server-connection", async move {
1605                conn.await;
1606            });
1607
1608            // Query the upstream SQL Server instance for available tables to replicate.
1609            let database = sql_server_connection.database.clone().into();
1610            let source_references = SourceReferenceClient::SqlServer {
1611                client: &mut client,
1612                database: Arc::clone(&database),
1613            }
1614            .get_source_references()
1615            .await?;
1616            let requested_references = Some(ExternalReferences::SubsetTables(external_references));
1617
1618            let result = sql_server::purify_source_exports(
1619                &*database,
1620                &mut client,
1621                &source_references,
1622                &requested_references,
1623                &text_columns,
1624                &exclude_columns,
1625                &unresolved_source_name,
1626                &SourceReferencePolicy::Required,
1627            )
1628            .await;
1629            let sql_server::PurifiedSourceExports {
1630                source_exports,
1631                normalized_text_columns,
1632                normalized_excl_columns,
1633            } = result?;
1634
1635            // Add the new exports to our subsource map.
1636            requested_subsource_map.extend(source_exports);
1637
1638            // Update options on the CREATE SOURCE statement with the purified details.
1639            if let Some(text_cols_option) = options
1640                .iter_mut()
1641                .find(|option| option.name == AlterSourceAddSubsourceOptionName::TextColumns)
1642            {
1643                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
1644            }
1645            if let Some(ignore_cols_option) = options
1646                .iter_mut()
1647                .find(|option| option.name == AlterSourceAddSubsourceOptionName::ExcludeColumns)
1648            {
1649                ignore_cols_option.value = Some(WithOptionValue::Sequence(normalized_excl_columns));
1650            }
1651        }
1652        _ => unreachable!(),
1653    };
1654
1655    Ok(PurifiedStatement::PurifiedAlterSourceAddSubsources {
1656        source_name: resolved_source_name,
1657        options,
1658        subsources: requested_subsource_map,
1659    })
1660}
1661
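/// Purifies `ALTER SOURCE .. REFRESH REFERENCES` by connecting to the upstream
/// system and retrieving its currently available source references (tables, or
/// the topic for Kafka sources).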
1662async fn purify_alter_source_refresh_references(
1663    desc: SourceDesc,
1664    resolved_source_name: ResolvedItemName,
1665    storage_configuration: &StorageConfiguration,
1666) -> Result<PurifiedStatement, PlanError> {
1667    let retrieved_source_references = match desc.connection {
1668        GenericSourceConnection::Postgres(pg_source_connection) => {
1669            // Get PostgresConnection for generating subsources.
1670            let pg_connection = &pg_source_connection.connection;
1671
1672            let config = pg_connection
1673                .config(
1674                    &storage_configuration.connection_context.secrets_reader,
1675                    storage_configuration,
1676                    InTask::No,
1677                )
1678                .await?;
1679
1680            let client = config
1681                .connect(
1682                    "postgres_purification",
1683                    &storage_configuration.connection_context.ssh_tunnel_manager,
1684                )
1685                .await?;
1686            let reference_client = SourceReferenceClient::Postgres {
1687                client: &client,
1688                publication: &pg_source_connection.publication,
1689                database: &pg_connection.database,
1690            };
1691            reference_client.get_source_references().await?
1692        }
1693        GenericSourceConnection::MySql(mysql_source_connection) => {
1694            let mysql_connection = &mysql_source_connection.connection;
1695            let config = mysql_connection
1696                .config(
1697                    &storage_configuration.connection_context.secrets_reader,
1698                    storage_configuration,
1699                    InTask::No,
1700                )
1701                .await?;
1702
1703            let mut conn = config
1704                .connect(
1705                    "mysql purification",
1706                    &storage_configuration.connection_context.ssh_tunnel_manager,
1707                )
1708                .await?;
1709
1710            let reference_client = SourceReferenceClient::MySql {
1711                conn: &mut conn,
1712                include_system_schemas: false,
1713            };
1714            reference_client.get_source_references().await?
1715        }
1716        GenericSourceConnection::SqlServer(sql_server_source) => {
1717            // Open a connection to the upstream SQL Server instance.
1718            let sql_server_connection = &sql_server_source.connection;
1719            let config = sql_server_connection
1720                .resolve_config(
1721                    &storage_configuration.connection_context.secrets_reader,
1722                    storage_configuration,
1723                    InTask::No,
1724                )
1725                .await?;
1726            let (mut client, conn) = mz_sql_server_util::Client::connect(config).await?;
1727            // TODO(sql_server1): Move the spawning of this task into the SQL Server client.
1728            mz_ore::task::spawn(|| "sql-server-connection", async move {
1729                conn.await;
1730            });
1731
1732            // Query the upstream SQL Server instance for available tables to replicate.
1733            let source_references = SourceReferenceClient::SqlServer {
1734                client: &mut client,
1735                database: sql_server_connection.database.clone().into(),
1736            }
1737            .get_source_references()
1738            .await?;
1739            source_references
1740        }
1741        GenericSourceConnection::LoadGenerator(load_gen_connection) => {
1742            let reference_client = SourceReferenceClient::LoadGenerator {
1743                generator: &load_gen_connection.load_generator,
1744            };
1745            reference_client.get_source_references().await?
1746        }
1747        GenericSourceConnection::Kafka(kafka_conn) => {
1748            let reference_client = SourceReferenceClient::Kafka {
1749                topic: &kafka_conn.topic,
1750            };
1751            reference_client.get_source_references().await?
1752        }
1753    };
1754    Ok(PurifiedStatement::PurifiedAlterSourceRefreshReferences {
1755        source_name: resolved_source_name,
1756        available_source_references: retrieved_source_references.available_source_references(),
1757    })
1758}
1759
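/// Purifies a `CREATE TABLE .. FROM SOURCE` statement by resolving its external
/// reference against the upstream system and filling in the statement's columns,
/// constraints, and `DETAILS` option. Illustrative form of the input (names
/// assumed): `CREATE TABLE t FROM SOURCE pg_src (REFERENCE public.tbl)`.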
1760async fn purify_create_table_from_source(
1761    catalog: impl SessionCatalog,
1762    mut stmt: CreateTableFromSourceStatement<Aug>,
1763    storage_configuration: &StorageConfiguration,
1764) -> Result<PurifiedStatement, PlanError> {
1765    let scx = StatementContext::new(None, &catalog);
1766    let CreateTableFromSourceStatement {
1767        name: _,
1768        columns,
1769        constraints,
1770        source: source_name,
1771        if_not_exists: _,
1772        external_reference,
1773        format,
1774        envelope,
1775        include_metadata: _,
1776        with_options,
1777    } = &mut stmt;
1778
1779    // Columns and constraints cannot be specified by the user but will be populated below.
1780    if matches!(columns, TableFromSourceColumns::Defined(_)) {
1781        sql_bail!("CREATE TABLE .. FROM SOURCE column definitions cannot be specified directly");
1782    }
1783    if !constraints.is_empty() {
1784        sql_bail!(
1785            "CREATE TABLE .. FROM SOURCE constraint definitions cannot be specified directly"
1786        );
1787    }
1788
1789    // Get the source item
1790    let item = match scx.get_item_by_resolved_name(source_name) {
1791        Ok(item) => item,
1792        Err(e) => return Err(e),
1793    };
1794
1795    // Ensure it's an ingestion-based source that a table can be created from.
1796    let desc = match item.source_desc()? {
1797        Some(desc) => desc.clone().into_inline_connection(scx.catalog),
1798        None => {
1799            sql_bail!("cannot create a table from this type of source")
1800        }
1801    };
1802    let unresolved_source_name: UnresolvedItemName = source_name.full_item_name().clone().into();
1803    let qualified_source_name = item.name();
1804    let connection_name = desc.connection.name();
1805
1806    let crate::plan::statement::ddl::TableFromSourceOptionExtracted {
1807        text_columns,
1808        exclude_columns,
1809        details,
1810        partition_by: _,
1811        seen: _,
1812    } = with_options.clone().try_into()?;
1813    assert_none!(details, "details cannot be explicitly set");
1814
1815    // Our text column values are unqualified (just column names), but the purification methods below
1816    // expect to match the fully-qualified names against the full set of upstream tables, so we
1817    // need to qualify them using the external reference first.
1818    let qualified_text_columns = text_columns
1819        .iter()
1820        .map(|col| {
1821            UnresolvedItemName(
1822                external_reference
1823                    .as_ref()
1824                    .map(|er| er.0.iter().chain_one(col).map(|i| i.clone()).collect())
1825                    .unwrap_or_else(|| vec![col.clone()]),
1826            )
1827        })
1828        .collect_vec();
1829    let qualified_exclude_columns = exclude_columns
1830        .iter()
1831        .map(|col| {
1832            UnresolvedItemName(
1833                external_reference
1834                    .as_ref()
1835                    .map(|er| er.0.iter().chain_one(col).map(|i| i.clone()).collect())
1836                    .unwrap_or_else(|| vec![col.clone()]),
1837            )
1838        })
1839        .collect_vec();
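    // E.g., with `external_reference` = `public.tbl`, a text column `col` is
    // qualified to `public.tbl.col` before matching below (names illustrative).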
1840
1841    // Should be overridden below if a source-specific format is required.
1842    let mut format_options = SourceFormatOptions::Default;
1843
1844    let retrieved_source_references: RetrievedSourceReferences;
1845
1846    let requested_references = external_reference.as_ref().map(|ref_name| {
1847        ExternalReferences::SubsetTables(vec![ExternalReferenceExport {
1848            reference: ref_name.clone(),
1849            alias: None,
1850        }])
1851    });
1852
1853    // Run purification work specific to each source type: resolve the external reference to
1854    // a fully qualified name and obtain the appropriate details for the source-export statement
1855    let purified_export = match desc.connection {
1856        GenericSourceConnection::Postgres(pg_source_connection) => {
1857            // Get PostgresConnection for generating subsources.
1858            let pg_connection = &pg_source_connection.connection;
1859
1860            let config = pg_connection
1861                .config(
1862                    &storage_configuration.connection_context.secrets_reader,
1863                    storage_configuration,
1864                    InTask::No,
1865                )
1866                .await?;
1867
1868            let client = config
1869                .connect(
1870                    "postgres_purification",
1871                    &storage_configuration.connection_context.ssh_tunnel_manager,
1872                )
1873                .await?;
1874
1875            let available_replication_slots =
1876                mz_postgres_util::available_replication_slots(&client).await?;
1877
1878            // We need 1 additional replication slot for the snapshots
1879            if available_replication_slots < 1 {
1880                Err(PgSourcePurificationError::InsufficientReplicationSlotsAvailable { count: 1 })?;
1881            }
1882
1883            // TODO(roshan): Add support for PG sources to allow this
1884            if !exclude_columns.is_empty() {
1885                sql_bail!(
1886                    "{} is a {} source, which does not support EXCLUDE COLUMNS.",
1887                    scx.catalog.minimal_qualification(qualified_source_name),
1888                    connection_name
1889                )
1890            }
1891
1892            let reference_client = SourceReferenceClient::Postgres {
1893                client: &client,
1894                publication: &pg_source_connection.publication,
1895                database: &pg_connection.database,
1896            };
1897            retrieved_source_references = reference_client.get_source_references().await?;
1898
1899            let postgres::PurifiedSourceExports {
1900                source_exports,
1901                // TODO(database-issues#8620): Remove once subsources are removed
1902                // This `normalized_text_columns` is not relevant for us and is only returned for
1903                // `CREATE SOURCE` statements that automatically generate subsources
1904                normalized_text_columns: _,
1905            } = postgres::purify_source_exports(
1906                &client,
1907                &config,
1908                &retrieved_source_references,
1909                &requested_references,
1910                qualified_text_columns,
1911                &unresolved_source_name,
1912                &SourceReferencePolicy::Required,
1913            )
1914            .await?;
1915            // There should be exactly one source_export returned for this statement
1916            let (_, purified_export) = source_exports.into_iter().next().unwrap();
1917            purified_export
1918        }
1919        GenericSourceConnection::MySql(mysql_source_connection) => {
1920            let mysql_connection = &mysql_source_connection.connection;
1921            let config = mysql_connection
1922                .config(
1923                    &storage_configuration.connection_context.secrets_reader,
1924                    storage_configuration,
1925                    InTask::No,
1926                )
1927                .await?;
1928
1929            let mut conn = config
1930                .connect(
1931                    "mysql purification",
1932                    &storage_configuration.connection_context.ssh_tunnel_manager,
1933                )
1934                .await?;
1935
1936            // Retrieve the current @gtid_executed value of the server to mark as the effective
1937            // initial snapshot point for this table.
1938            let initial_gtid_set =
1939                mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?;
1940
1941            let reference_client = SourceReferenceClient::MySql {
1942                conn: &mut conn,
1943                include_system_schemas: mysql::references_system_schemas(&requested_references),
1944            };
1945            retrieved_source_references = reference_client.get_source_references().await?;
1946
1947            let mysql::PurifiedSourceExports {
1948                source_exports,
1949                // TODO(database-issues#8620): Remove once subsources are removed
1950                // `normalized_text/exclude_columns` are not relevant for us and are only returned for
1951                // `CREATE SOURCE` statements that automatically generate subsources
1952                normalized_text_columns: _,
1953                normalized_exclude_columns: _,
1954            } = mysql::purify_source_exports(
1955                &mut conn,
1956                &retrieved_source_references,
1957                &requested_references,
1958                qualified_text_columns,
1959                qualified_exclude_columns,
1960                &unresolved_source_name,
1961                initial_gtid_set,
1962                &SourceReferencePolicy::Required,
1963            )
1964            .await?;
1965            // There should be exactly one source_export returned for this statement
1966            let (_, purified_export) = source_exports.into_iter().next().unwrap();
1967            purified_export
1968        }
1969        GenericSourceConnection::SqlServer(_sql_server_source) => {
1970            // TODO(sql_server1): Support CREATE TABLE ... FROM SOURCE.
1971            return Err(PlanError::Unsupported {
1972                feature: "CREATE TABLE ... FROM SQL SERVER SOURCE".to_string(),
1973                discussion_no: None,
1974            });
1975        }
1976        GenericSourceConnection::LoadGenerator(load_gen_connection) => {
1977            let reference_client = SourceReferenceClient::LoadGenerator {
1978                generator: &load_gen_connection.load_generator,
1979            };
1980            retrieved_source_references = reference_client.get_source_references().await?;
1981
1982            let requested_exports = retrieved_source_references
1983                .requested_source_exports(requested_references.as_ref(), &unresolved_source_name)?;
1984            // There should be exactly one source_export returned
1985            let export = requested_exports.into_iter().next().unwrap();
1986            PurifiedSourceExport {
1987                external_reference: export.external_reference,
1988                details: PurifiedExportDetails::LoadGenerator {
1989                    table: export
1990                        .meta
1991                        .load_generator_desc()
1992                        .expect("is loadgen")
1993                        .clone(),
1994                    output: export
1995                        .meta
1996                        .load_generator_output()
1997                        .expect("is loadgen")
1998                        .clone(),
1999                },
2000            }
2001        }
2002        GenericSourceConnection::Kafka(kafka_conn) => {
2003            let reference_client = SourceReferenceClient::Kafka {
2004                topic: &kafka_conn.topic,
2005            };
2006            retrieved_source_references = reference_client.get_source_references().await?;
2007            let requested_exports = retrieved_source_references
2008                .requested_source_exports(requested_references.as_ref(), &unresolved_source_name)?;
2009            // There should be exactly one source_export returned
2010            let export = requested_exports.into_iter().next().unwrap();
2011
2012            format_options = SourceFormatOptions::Kafka {
2013                topic: kafka_conn.topic.clone(),
2014            };
2015            PurifiedSourceExport {
2016                external_reference: export.external_reference,
2017                details: PurifiedExportDetails::Kafka {},
2018            }
2019        }
2020    };
2021
2022    purify_source_format(
2023        &catalog,
2024        format,
2025        &format_options,
2026        envelope,
2027        storage_configuration,
2028    )
2029    .await?;
2030
2031    // Update the external reference in the statement to the resolved fully-qualified
2032    // external reference
2033    *external_reference = Some(purified_export.external_reference.clone());
2034
2035    // Update options in the statement using the purified export details
2036    match &purified_export.details {
2037        PurifiedExportDetails::Postgres { .. } => {
2038            let mut unsupported_cols = vec![];
2039            let postgres::PostgresExportStatementValues {
2040                columns: gen_columns,
2041                constraints: gen_constraints,
2042                text_columns: gen_text_columns,
2043                details: gen_details,
2044                external_reference: _,
2045            } = postgres::generate_source_export_statement_values(
2046                &scx,
2047                purified_export,
2048                &mut unsupported_cols,
2049            )?;
2050            if !unsupported_cols.is_empty() {
2051                unsupported_cols.sort();
2052                Err(PgSourcePurificationError::UnrecognizedTypes {
2053                    cols: unsupported_cols,
2054                })?;
2055            }
2056
2057            if let Some(text_cols_option) = with_options
2058                .iter_mut()
2059                .find(|option| option.name == TableFromSourceOptionName::TextColumns)
2060            {
2061                if let Some(gen_text_columns) = gen_text_columns {
2062                    text_cols_option.value = Some(WithOptionValue::Sequence(gen_text_columns));
2063                } else {
2064                    soft_panic_or_log!(
2065                        "text_columns should be Some if text_cols_option is present"
2066                    );
2067                }
2068            }
2069            match columns {
2070                TableFromSourceColumns::Defined(_) => unreachable!(),
2071                TableFromSourceColumns::NotSpecified => {
2072                    *columns = TableFromSourceColumns::Defined(gen_columns);
2073                    *constraints = gen_constraints;
2074                }
2075                TableFromSourceColumns::Named(_) => {
2076                    sql_bail!("columns cannot be named for Postgres sources")
2077                }
2078            }
2079            with_options.push(TableFromSourceOption {
2080                name: TableFromSourceOptionName::Details,
2081                value: Some(WithOptionValue::Value(Value::String(hex::encode(
2082                    gen_details.into_proto().encode_to_vec(),
2083                )))),
2084            })
2085        }
2086        PurifiedExportDetails::MySql { .. } => {
2087            let mysql::MySqlExportStatementValues {
2088                columns: gen_columns,
2089                constraints: gen_constraints,
2090                text_columns: gen_text_columns,
2091                exclude_columns: gen_exclude_columns,
2092                details: gen_details,
2093                external_reference: _,
2094            } = mysql::generate_source_export_statement_values(&scx, purified_export)?;
2095
2096            if let Some(text_cols_option) = with_options
2097                .iter_mut()
2098                .find(|option| option.name == TableFromSourceOptionName::TextColumns)
2099            {
2100                if let Some(gen_text_columns) = gen_text_columns {
2101                    text_cols_option.value = Some(WithOptionValue::Sequence(gen_text_columns));
2102                } else {
2103                    soft_panic_or_log!(
2104                        "text_columns should be Some if text_cols_option is present"
2105                    );
2106                }
2107            }
2108            if let Some(ignore_cols_option) = with_options
2109                .iter_mut()
2110                .find(|option| option.name == TableFromSourceOptionName::ExcludeColumns)
2111            {
2112                if let Some(gen_exclude_columns) = gen_exclude_columns {
2113                    ignore_cols_option.value = Some(WithOptionValue::Sequence(gen_exclude_columns));
2114                } else {
2115                    soft_panic_or_log!(
2116                        "exclude_columns should be Some if ignore_cols_option is present"
2117                    );
2118                }
2119            }
2120            match columns {
2121                TableFromSourceColumns::Defined(_) => unreachable!(),
2122                TableFromSourceColumns::NotSpecified => {
2123                    *columns = TableFromSourceColumns::Defined(gen_columns);
2124                    *constraints = gen_constraints;
2125                }
2126                TableFromSourceColumns::Named(_) => {
2127                    sql_bail!("columns cannot be named for MySQL sources")
2128                }
2129            }
2130            with_options.push(TableFromSourceOption {
2131                name: TableFromSourceOptionName::Details,
2132                value: Some(WithOptionValue::Value(Value::String(hex::encode(
2133                    gen_details.into_proto().encode_to_vec(),
2134                )))),
2135            })
2136        }
2137        PurifiedExportDetails::SqlServer { .. } => {
2138            // TODO(sql_server1): Support CREATE TABLE ... FROM SOURCE.
2139            return Err(PlanError::Unsupported {
2140                feature: "CREATE TABLE ... FROM SQL SERVER SOURCE".to_string(),
2141                discussion_no: None,
2142            });
2143        }
2144        PurifiedExportDetails::LoadGenerator { .. } => {
2145            let (desc, output) = match purified_export.details {
2146                PurifiedExportDetails::LoadGenerator { table, output } => (table, output),
2147                _ => unreachable!("purified export details must be load generator"),
2148            };
2149            // We only determine the table description for multi-output load generator sources here,
2150            // whereas single-output load generators will have their relation description
2151            // determined during statement planning, as envelope and format options may affect their
2152            // schema.
2153            if let Some(desc) = desc {
2154                let (gen_columns, gen_constraints) = scx.relation_desc_into_table_defs(&desc)?;
2155                match columns {
2156                    TableFromSourceColumns::Defined(_) => unreachable!(),
2157                    TableFromSourceColumns::NotSpecified => {
2158                        *columns = TableFromSourceColumns::Defined(gen_columns);
2159                        *constraints = gen_constraints;
2160                    }
2161                    TableFromSourceColumns::Named(_) => {
2162                        sql_bail!("columns cannot be named for multi-output load generator sources")
2163                    }
2164                }
2165            }
2166            let details = SourceExportStatementDetails::LoadGenerator { output };
2167            with_options.push(TableFromSourceOption {
2168                name: TableFromSourceOptionName::Details,
2169                value: Some(WithOptionValue::Value(Value::String(hex::encode(
2170                    details.into_proto().encode_to_vec(),
2171                )))),
2172            })
2173        }
2174        PurifiedExportDetails::Kafka {} => {
2175            // NOTE: Kafka tables have their 'schemas' purified into the statement inside the
2176            // format field, so we don't specify any columns or constraints to be stored
2177            // on the statement here. The RelationDesc will be determined during planning.
2178            let details = SourceExportStatementDetails::Kafka {};
2179            with_options.push(TableFromSourceOption {
2180                name: TableFromSourceOptionName::Details,
2181                value: Some(WithOptionValue::Value(Value::String(hex::encode(
2182                    details.into_proto().encode_to_vec(),
2183                )))),
2184            })
2185        }
2186    };
2187
2188    // TODO: We might as well use the retrieved available references to update the source
2189    // available references table in the catalog, so plumb this through.
2190    // available_source_references: retrieved_source_references.available_source_references(),
2191    Ok(PurifiedStatement::PurifiedCreateTableFromSource { stmt })
2192}
2193
2194enum SourceFormatOptions {
2195    Default,
2196    Kafka { topic: String },
2197}
2198
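/// Purifies the `FORMAT` clause of a source or table-from-source statement,
/// rejecting `KEY/VALUE` format specifiers for non-Kafka sources and purifying
/// the bare format, or each of the key and value formats, in turn.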
2199async fn purify_source_format(
2200    catalog: &dyn SessionCatalog,
2201    format: &mut Option<FormatSpecifier<Aug>>,
2202    options: &SourceFormatOptions,
2203    envelope: &Option<SourceEnvelope>,
2204    storage_configuration: &StorageConfiguration,
2205) -> Result<(), PlanError> {
2206    if matches!(format, Some(FormatSpecifier::KeyValue { .. }))
2207        && !matches!(options, SourceFormatOptions::Kafka { .. })
2208    {
2209        sql_bail!("Kafka sources are the only source type that can provide KEY/VALUE formats")
2210    }
2211
2212    match format.as_mut() {
2213        None => {}
2214        Some(FormatSpecifier::Bare(format)) => {
2215            purify_source_format_single(catalog, format, options, envelope, storage_configuration)
2216                .await?;
2217        }
2218
2219        Some(FormatSpecifier::KeyValue { key, value: val }) => {
2220            purify_source_format_single(catalog, key, options, envelope, storage_configuration)
2221                .await?;
2222            purify_source_format_single(catalog, val, options, envelope, storage_configuration)
2223                .await?;
2224        }
2225    }
2226    Ok(())
2227}
2228
2229async fn purify_source_format_single(
2230    catalog: &dyn SessionCatalog,
2231    format: &mut Format<Aug>,
2232    options: &SourceFormatOptions,
2233    envelope: &Option<SourceEnvelope>,
2234    storage_configuration: &StorageConfiguration,
2235) -> Result<(), PlanError> {
2236    match format {
2237        Format::Avro(schema) => match schema {
2238            AvroSchema::Csr { csr_connection } => {
2239                purify_csr_connection_avro(
2240                    catalog,
2241                    options,
2242                    csr_connection,
2243                    envelope,
2244                    storage_configuration,
2245                )
2246                .await?
2247            }
2248            AvroSchema::InlineSchema { .. } => {}
2249        },
2250        Format::Protobuf(schema) => match schema {
2251            ProtobufSchema::Csr { csr_connection } => {
2252                purify_csr_connection_proto(
2253                    catalog,
2254                    options,
2255                    csr_connection,
2256                    envelope,
2257                    storage_configuration,
2258                )
2259                .await?;
2260            }
2261            ProtobufSchema::InlineSchema { .. } => {}
2262        },
2263        Format::Bytes
2264        | Format::Regex(_)
2265        | Format::Json { .. }
2266        | Format::Text
2267        | Format::Csv { .. } => (),
2268    }
2269    Ok(())
2270}
2271
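/// Generates `CREATE SUBSOURCE` statements for the given purified source
/// exports, dispatching on the connection type of the first export (all
/// entries in `subsources` are expected to share one connection type).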
2272pub fn generate_subsource_statements(
2273    scx: &StatementContext,
2274    source_name: ResolvedItemName,
2275    subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
2276) -> Result<Vec<CreateSubsourceStatement<Aug>>, PlanError> {
2277    // Get the first subsource to determine the connection type.
2278    if subsources.is_empty() {
2279        return Ok(vec![]);
2280    }
2281    let (_, purified_export) = subsources.iter().next().unwrap();
2282
2283    let statements = match &purified_export.details {
2284        PurifiedExportDetails::Postgres { .. } => {
2285            crate::pure::postgres::generate_create_subsource_statements(
2286                scx,
2287                source_name,
2288                subsources,
2289            )?
2290        }
2291        PurifiedExportDetails::MySql { .. } => {
2292            crate::pure::mysql::generate_create_subsource_statements(scx, source_name, subsources)?
2293        }
2294        PurifiedExportDetails::SqlServer { .. } => {
2295            crate::pure::sql_server::generate_create_subsource_statements(
2296                scx,
2297                source_name,
2298                subsources,
2299            )?
2300        }
2301        PurifiedExportDetails::LoadGenerator { .. } => {
2302            let mut subsource_stmts = Vec::with_capacity(subsources.len());
2303            for (subsource_name, purified_export) in subsources {
2304                let (desc, output) = match purified_export.details {
2305                    PurifiedExportDetails::LoadGenerator { table, output } => (table, output),
2306                    _ => unreachable!("purified export details must be load generator"),
2307                };
2308                let desc =
2309                    desc.expect("subsources cannot be generated for single-output load generators");
2310
2311                let (columns, table_constraints) = scx.relation_desc_into_table_defs(&desc)?;
2312                let details = SourceExportStatementDetails::LoadGenerator { output };
2313                // Create the subsource statement
2314                let subsource = CreateSubsourceStatement {
2315                    name: subsource_name,
2316                    columns,
2317                    of_source: Some(source_name.clone()),
2318                    // Unlike sources that come from an external upstream, we
2319                    // have more leniency to introduce different constraints
2320                    // every time the load generator is run; i.e., we are not as
2321                    // worried about introducing junk data.
2322                    constraints: table_constraints,
2323                    if_not_exists: false,
2324                    with_options: vec![
2325                        CreateSubsourceOption {
2326                            name: CreateSubsourceOptionName::ExternalReference,
2327                            value: Some(WithOptionValue::UnresolvedItemName(
2328                                purified_export.external_reference,
2329                            )),
2330                        },
2331                        CreateSubsourceOption {
2332                            name: CreateSubsourceOptionName::Details,
2333                            value: Some(WithOptionValue::Value(Value::String(hex::encode(
2334                                details.into_proto().encode_to_vec(),
2335                            )))),
2336                        },
2337                    ],
2338                };
2339                subsource_stmts.push(subsource);
2340            }
2341
2342            subsource_stmts
2343        }
2344        PurifiedExportDetails::Kafka { .. } => {
2345            // TODO: as part of database-issues#8322, Kafka sources will begin
2346            // producing data, and we'll need to understand the schema
2347            // of the output here.
2348            assert!(
2349                subsources.is_empty(),
2350                "Kafka sources do not produce data-bearing subsources"
2351            );
2352            vec![]
2353        }
2354    };
2355    Ok(statements)
2356}
2357
2358async fn purify_csr_connection_proto(
2359    catalog: &dyn SessionCatalog,
2360    options: &SourceFormatOptions,
2361    csr_connection: &mut CsrConnectionProtobuf<Aug>,
2362    envelope: &Option<SourceEnvelope>,
2363    storage_configuration: &StorageConfiguration,
2364) -> Result<(), PlanError> {
2365    let SourceFormatOptions::Kafka { topic } = options else {
2366        sql_bail!("Confluent Schema Registry is only supported with Kafka sources")
2367    };
2368
2369    let CsrConnectionProtobuf {
2370        seed,
2371        connection: CsrConnection {
2372            connection,
2373            options: _,
2374        },
2375    } = csr_connection;
2376    match seed {
2377        None => {
2378            let scx = StatementContext::new(None, &*catalog);
2379
2380            let ccsr_connection = match scx.get_item_by_resolved_name(connection)?.connection()? {
2381                Connection::Csr(connection) => connection.clone().into_inline_connection(catalog),
2382                _ => sql_bail!("{} is not a schema registry connection", connection),
2383            };
2384
2385            let ccsr_client = ccsr_connection
2386                .connect(storage_configuration, InTask::No)
2387                .await
2388                .map_err(|e| CsrPurificationError::ClientError(Arc::new(e)))?;
2389
2390            let value = compile_proto(&format!("{}-value", topic), &ccsr_client).await?;
2391            let key = compile_proto(&format!("{}-key", topic), &ccsr_client)
2392                .await
2393                .ok();
2394
2395            if matches!(envelope, Some(SourceEnvelope::Debezium)) && key.is_none() {
2396                sql_bail!("Key schema is required for ENVELOPE DEBEZIUM");
2397            }
2398
2399            *seed = Some(CsrSeedProtobuf { value, key });
2400        }
2401        Some(_) => (),
2402    }
2403
2404    Ok(())
2405}
2406
2407async fn purify_csr_connection_avro(
2408    catalog: &dyn SessionCatalog,
2409    options: &SourceFormatOptions,
2410    csr_connection: &mut CsrConnectionAvro<Aug>,
2411    envelope: &Option<SourceEnvelope>,
2412    storage_configuration: &StorageConfiguration,
2413) -> Result<(), PlanError> {
2414    let SourceFormatOptions::Kafka { topic } = options else {
2415        sql_bail!("Confluent Schema Registry is only supported with Kafka sources")
2416    };
2417
2418    let CsrConnectionAvro {
2419        connection: CsrConnection { connection, .. },
2420        seed,
2421        key_strategy,
2422        value_strategy,
2423    } = csr_connection;
2424    if seed.is_none() {
2425        let scx = StatementContext::new(None, &*catalog);
2426        let csr_connection = match scx.get_item_by_resolved_name(connection)?.connection()? {
2427            Connection::Csr(connection) => connection.clone().into_inline_connection(catalog),
2428            _ => sql_bail!("{} is not a schema registry connection", connection),
2429        };
2430        let ccsr_client = csr_connection
2431            .connect(storage_configuration, InTask::No)
2432            .await
2433            .map_err(|e| CsrPurificationError::ClientError(Arc::new(e)))?;
2434
2435        let Schema {
2436            key_schema,
2437            value_schema,
2438        } = get_remote_csr_schema(
2439            &ccsr_client,
2440            key_strategy.clone().unwrap_or_default(),
2441            value_strategy.clone().unwrap_or_default(),
2442            topic,
2443        )
2444        .await?;
2445        if matches!(envelope, Some(SourceEnvelope::Debezium)) && key_schema.is_none() {
2446            sql_bail!("Key schema is required for ENVELOPE DEBEZIUM");
2447        }
2448
2449        *seed = Some(CsrSeedAvro {
2450            key_schema,
2451            value_schema,
2452        })
2453    }
2454
2455    Ok(())
2456}
2457
2458#[derive(Debug)]
2459pub struct Schema {
2460    pub key_schema: Option<String>,
2461    pub value_schema: String,
2462}
2463
2464async fn get_schema_with_strategy(
2465    client: &Client,
2466    strategy: ReaderSchemaSelectionStrategy,
2467    subject: &str,
2468) -> Result<Option<String>, PlanError> {
2469    match strategy {
2470        ReaderSchemaSelectionStrategy::Latest => {
2471            match client.get_schema_by_subject(subject).await {
2472                Ok(CcsrSchema { raw, .. }) => Ok(Some(raw)),
2473                Err(GetBySubjectError::SubjectNotFound)
2474                | Err(GetBySubjectError::VersionNotFound(_)) => Ok(None),
2475                Err(e) => Err(PlanError::FetchingCsrSchemaFailed {
2476                    schema_lookup: format!("subject {}", subject.quoted()),
2477                    cause: Arc::new(e),
2478                }),
2479            }
2480        }
2481        ReaderSchemaSelectionStrategy::Inline(raw) => Ok(Some(raw)),
2482        ReaderSchemaSelectionStrategy::ById(id) => match client.get_schema_by_id(id).await {
2483            Ok(CcsrSchema { raw, .. }) => Ok(Some(raw)),
2484            Err(GetByIdError::SchemaNotFound) => Ok(None),
2485            Err(e) => Err(PlanError::FetchingCsrSchemaFailed {
2486                schema_lookup: format!("ID {}", id),
2487                cause: Arc::new(e),
2488            }),
2489        },
2490    }
2491}
2492
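/// Fetches the value schema (required) and the key schema (optional) for
/// `topic` from the schema registry, using the `{topic}-value` and
/// `{topic}-key` subjects per the topic name strategy.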
2493async fn get_remote_csr_schema(
2494    ccsr_client: &mz_ccsr::Client,
2495    key_strategy: ReaderSchemaSelectionStrategy,
2496    value_strategy: ReaderSchemaSelectionStrategy,
2497    topic: &str,
2498) -> Result<Schema, PlanError> {
2499    let value_schema_name = format!("{}-value", topic);
2500    let value_schema =
2501        get_schema_with_strategy(ccsr_client, value_strategy, &value_schema_name).await?;
2502    let value_schema = value_schema.ok_or_else(|| anyhow!("No value schema found"))?;
2503    let subject = format!("{}-key", topic);
2504    let key_schema = get_schema_with_strategy(ccsr_client, key_strategy, &subject).await?;
2505    Ok(Schema {
2506        key_schema,
2507        value_schema,
2508    })
2509}
2510
2511/// Collects the Protobuf schema and its references from the CSR and compiles
2511/// them into a file descriptor set.
2512async fn compile_proto(
2513    subject_name: &str,
2514    ccsr_client: &Client,
2515) -> Result<CsrSeedProtobufSchema, PlanError> {
2516    let (primary_subject, dependency_subjects) = ccsr_client
2517        .get_subject_and_references(subject_name)
2518        .await
2519        .map_err(|e| PlanError::FetchingCsrSchemaFailed {
2520            schema_lookup: format!("subject {}", subject_name.quoted()),
2521            cause: Arc::new(e),
2522        })?;
2523
2524    // Compile .proto files into a file descriptor set.
2525    let mut source_tree = VirtualSourceTree::new();
2526    for subject in iter::once(&primary_subject).chain(dependency_subjects.iter()) {
2527        source_tree.as_mut().add_file(
2528            Path::new(&subject.name),
2529            subject.schema.raw.as_bytes().to_vec(),
2530        );
2531    }
2532    let mut db = SourceTreeDescriptorDatabase::new(source_tree.as_mut());
2533    let fds = db
2534        .as_mut()
2535        .build_file_descriptor_set(&[Path::new(&primary_subject.name)])
2536        .map_err(|cause| PlanError::InvalidProtobufSchema { cause })?;
2537
2538    // Ensure there is exactly one message in the file.
2539    let primary_fd = fds.file(0);
2540    let message_name = match primary_fd.message_type_size() {
2541        1 => String::from_utf8_lossy(primary_fd.message_type(0).name()).into_owned(),
2542        0 => bail_unsupported!(29603, "Protobuf schemas with no messages"),
2543        _ => bail_unsupported!(29603, "Protobuf schemas with multiple messages"),
2544    };
2545
2546    // Encode the file descriptor set into a SQL byte string.
2547    let bytes = &fds
2548        .serialize()
2549        .map_err(|cause| PlanError::InvalidProtobufSchema { cause })?;
2550    let mut schema = String::new();
2551    strconv::format_bytes(&mut schema, bytes);
2552
2553    Ok(CsrSeedProtobufSchema {
2554        schema,
2555        message_name,
2556    })
2557}
2558
2559const MZ_NOW_NAME: &str = "mz_now";
2560const MZ_NOW_SCHEMA: &str = "mz_catalog";
2561
2562/// Purifies a CREATE MATERIALIZED VIEW statement. Additionally, it adjusts `resolved_ids` if
2563/// references to IDs appear or disappear during the purification.
2564///
2565/// Note that in contrast with [`purify_statement`], this doesn't need to be async, because
2566/// this function is not making any network calls.
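///
/// A sketch of the rewrites performed (steps 1-4 below; the timestamp is
/// illustrative):
/// - `REFRESH AT CREATION` becomes `REFRESH AT mz_now()`, and `mz_now()` is then
///   replaced with the chosen timestamp, e.g. `1700000000000::mz_timestamp`.
/// - `REFRESH EVERY` without an `ALIGNED TO` gains `ALIGNED TO mz_now()`, which
///   is substituted the same way.
/// - If no `REFRESH` option is present at all, `REFRESH ON COMMIT` is added.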
pub fn purify_create_materialized_view_options(
    catalog: impl SessionCatalog,
    mz_now: Option<Timestamp>,
    cmvs: &mut CreateMaterializedViewStatement<Aug>,
    resolved_ids: &mut ResolvedIds,
) {
    // 0. Preparations:
    // Prepare an expression that calls `mz_now()`, which we can insert in various later steps.
    let (mz_now_id, mz_now_expr) = {
        let item = catalog
            .resolve_function(&PartialItemName {
                database: None,
                schema: Some(MZ_NOW_SCHEMA.to_string()),
                item: MZ_NOW_NAME.to_string(),
            })
            .expect("we should be able to resolve mz_now");
        (
            item.id(),
            Expr::Function(Function {
                name: ResolvedItemName::Item {
                    id: item.id(),
                    qualifiers: item.name().qualifiers.clone(),
                    full_name: catalog.resolve_full_name(item.name()),
                    print_id: false,
                    version: RelationVersionSelector::Latest,
                },
                args: FunctionArgs::Args {
                    args: Vec::new(),
                    order_by: Vec::new(),
                },
                filter: None,
                over: None,
                distinct: false,
            }),
        )
    };
    // Prepare the `mz_timestamp` type.
    let (mz_timestamp_id, mz_timestamp_type) = {
        let item = catalog.get_system_type("mz_timestamp");
        let full_name = catalog.resolve_full_name(item.name());
        (
            item.id(),
            ResolvedDataType::Named {
                id: item.id(),
                qualifiers: item.name().qualifiers.clone(),
                full_name,
                modifiers: vec![],
                print_id: true,
            },
        )
    };

    let mut introduced_mz_timestamp = false;

    for option in cmvs.with_options.iter_mut() {
        // 1. Purify `REFRESH AT CREATION` to `REFRESH AT mz_now()`.
        if matches!(
            option.value,
            Some(WithOptionValue::Refresh(RefreshOptionValue::AtCreation))
        ) {
            option.value = Some(WithOptionValue::Refresh(RefreshOptionValue::At(
                RefreshAtOptionValue {
                    time: mz_now_expr.clone(),
                },
            )));
        }

        // 2. If `REFRESH EVERY` doesn't have an `ALIGNED TO`, then add `ALIGNED TO mz_now()`.
        if let Some(WithOptionValue::Refresh(RefreshOptionValue::Every(
            RefreshEveryOptionValue { aligned_to, .. },
        ))) = &mut option.value
        {
            if aligned_to.is_none() {
                *aligned_to = Some(mz_now_expr.clone());
            }
        }

        // 3. Substitute `mz_now()` with the timestamp chosen for the CREATE MATERIALIZED VIEW
        // statement. (This has to happen after the above steps, which might introduce `mz_now()`.)
        match &mut option.value {
            Some(WithOptionValue::Refresh(RefreshOptionValue::At(RefreshAtOptionValue {
                time,
            }))) => {
                let mut visitor = MzNowPurifierVisitor::new(mz_now, mz_timestamp_type.clone());
                visitor.visit_expr_mut(time);
                introduced_mz_timestamp |= visitor.introduced_mz_timestamp;
            }
            Some(WithOptionValue::Refresh(RefreshOptionValue::Every(
                RefreshEveryOptionValue {
                    interval: _,
                    aligned_to: Some(aligned_to),
                },
            ))) => {
                let mut visitor = MzNowPurifierVisitor::new(mz_now, mz_timestamp_type.clone());
                visitor.visit_expr_mut(aligned_to);
                introduced_mz_timestamp |= visitor.introduced_mz_timestamp;
            }
            _ => {}
        }
    }

    // 4. If the user didn't give any REFRESH option, then default to ON COMMIT.
    if !cmvs.with_options.iter().any(|o| {
        matches!(
            o,
            MaterializedViewOption {
                value: Some(WithOptionValue::Refresh(..)),
                ..
            }
        )
    }) {
        cmvs.with_options.push(MaterializedViewOption {
            name: MaterializedViewOptionName::Refresh,
            value: Some(WithOptionValue::Refresh(RefreshOptionValue::OnCommit)),
        })
    }

    // 5. Attend to `resolved_ids`: the purification might have
    // - added references to `mz_timestamp`;
    // - removed references to `mz_now`.
    if introduced_mz_timestamp {
        resolved_ids.add_item(mz_timestamp_id);
    }
    // Even though we always remove `mz_now()` from the `with_options`, there might be `mz_now()`
    // remaining in the main query expression of the MV, so let's visit the entire statement to look
    // for `mz_now()` everywhere.
    let mut visitor = ExprContainsTemporalVisitor::new();
    visitor.visit_create_materialized_view_statement(cmvs);
    if !visitor.contains_temporal {
        resolved_ids.remove_item(&mz_now_id);
    }
}

/// Returns true if the [MaterializedViewOption] either already involves `mz_now()` or will
/// involve it after purification.
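///
/// For example, a `REFRESH EVERY` option without an `ALIGNED TO` returns true, because
/// purification will default the `ALIGNED TO` to `mz_now()`, while `REFRESH ON COMMIT`
/// returns false.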
pub fn materialized_view_option_contains_temporal(mvo: &MaterializedViewOption<Aug>) -> bool {
    match &mvo.value {
        Some(WithOptionValue::Refresh(RefreshOptionValue::At(RefreshAtOptionValue { time }))) => {
            let mut visitor = ExprContainsTemporalVisitor::new();
            visitor.visit_expr(time);
            visitor.contains_temporal
        }
        Some(WithOptionValue::Refresh(RefreshOptionValue::Every(RefreshEveryOptionValue {
            interval: _,
            aligned_to: Some(aligned_to),
        }))) => {
            let mut visitor = ExprContainsTemporalVisitor::new();
            visitor.visit_expr(aligned_to);
            visitor.contains_temporal
        }
        Some(WithOptionValue::Refresh(RefreshOptionValue::Every(RefreshEveryOptionValue {
            interval: _,
            aligned_to: None,
        }))) => {
            // For a `REFRESH EVERY` without an `ALIGNED TO`, purification will default the
            // `ALIGNED TO` to `mz_now()`.
            true
        }
        Some(WithOptionValue::Refresh(RefreshOptionValue::AtCreation)) => {
            // `REFRESH AT CREATION` will be purified to `REFRESH AT mz_now()`.
            true
        }
        _ => false,
    }
}

/// Determines whether the AST involves `mz_now()`.
struct ExprContainsTemporalVisitor {
    pub contains_temporal: bool,
}

impl ExprContainsTemporalVisitor {
    pub fn new() -> ExprContainsTemporalVisitor {
        ExprContainsTemporalVisitor {
            contains_temporal: false,
        }
    }
}

impl Visit<'_, Aug> for ExprContainsTemporalVisitor {
    fn visit_function(&mut self, func: &Function<Aug>) {
        self.contains_temporal |= func.name.full_item_name().item == MZ_NOW_NAME;
        visit_function(self, func);
    }
}

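// Usage sketch (mirroring the call sites above): the visitor is created fresh, walked over
// an expression or statement, and then inspected.
//
//     let mut visitor = ExprContainsTemporalVisitor::new();
//     visitor.visit_expr(&expr);
//     let is_temporal = visitor.contains_temporal;

/// Replaces `mz_now()` calls with the timestamp chosen for the statement, recording whether
/// any replacement occurred.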
struct MzNowPurifierVisitor {
    pub mz_now: Option<Timestamp>,
    pub mz_timestamp_type: ResolvedDataType,
    pub introduced_mz_timestamp: bool,
}

impl MzNowPurifierVisitor {
    pub fn new(
        mz_now: Option<Timestamp>,
        mz_timestamp_type: ResolvedDataType,
    ) -> MzNowPurifierVisitor {
        MzNowPurifierVisitor {
            mz_now,
            mz_timestamp_type,
            introduced_mz_timestamp: false,
        }
    }
}

impl VisitMut<'_, Aug> for MzNowPurifierVisitor {
    fn visit_expr_mut(&mut self, expr: &'_ mut Expr<Aug>) {
        match expr {
            Expr::Function(Function {
                name:
                    ResolvedItemName::Item {
                        full_name: FullItemName { item, .. },
                        ..
                    },
                ..
            }) if item.as_str() == MZ_NOW_NAME => {
                let mz_now = self.mz_now.expect(
                    "we should have chosen a timestamp if the expression contains mz_now()",
                );
                // Substitute `mz_now()` with the chosen timestamp as a number literal, cast to
                // `mz_timestamp` so that the type of the expression is unchanged.
                *expr = Expr::Cast {
                    expr: Box::new(Expr::Value(Value::Number(mz_now.to_string()))),
                    data_type: self.mz_timestamp_type.clone(),
                };
                self.introduced_mz_timestamp = true;
            }
            _ => visit_expr_mut(self, expr),
        }
    }
}
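
// Illustrative (hypothetical timestamp): with `mz_now = Some(1700000000000.into())`, visiting
// `mz_now() <= some_column` replaces the `mz_now()` call with a numeric literal cast to
// `mz_timestamp`, conceptually `1700000000000::mz_timestamp <= some_column`, and sets
// `introduced_mz_timestamp` so that the caller can add `mz_timestamp` to `resolved_ids`.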