1use std::collections::{BTreeMap, BTreeSet};
15use std::fmt;
16use std::iter;
17use std::path::Path;
18use std::sync::Arc;
19
20use anyhow::anyhow;
21use itertools::Itertools;
22use mz_adapter_types::dyncfgs::ENABLE_S3_TABLES_REGION_CHECK;
23use mz_ccsr::{Client, GetBySubjectError};
24use mz_cloud_provider::CloudProvider;
25use mz_controller_types::ClusterId;
26use mz_kafka_util::client::MzClientContext;
27use mz_mysql_util::MySqlTableDesc;
28use mz_ore::collections::CollectionExt;
29use mz_ore::error::ErrorExt;
30use mz_ore::future::InTask;
31use mz_ore::iter::IteratorExt;
32use mz_ore::soft_panic_or_log;
33use mz_ore::str::StrExt;
34use mz_postgres_util::desc::PostgresTableDesc;
35use mz_proto::RustType;
36use mz_repr::{CatalogItemId, RelationDesc, RelationVersionSelector, Timestamp, strconv};
37use mz_sql_parser::ast::display::AstDisplay;
38use mz_sql_parser::ast::visit::{Visit, visit_function};
39use mz_sql_parser::ast::visit_mut::{VisitMut, visit_expr_mut};
40use mz_sql_parser::ast::{
41 AlterSourceAction, AlterSourceAddSubsourceOptionName, AlterSourceStatement, AvroDocOn,
42 ColumnName, CreateMaterializedViewStatement, CreateSinkConnection, CreateSinkOptionName,
43 CreateSinkStatement, CreateSourceOptionName, CreateSubsourceOption, CreateSubsourceOptionName,
44 CreateTableFromSourceStatement, CsrConfigOption, CsrConfigOptionName, CsrConnection,
45 CsrSeedAvro, CsrSeedProtobuf, CsrSeedProtobufSchema, DeferredItemName, DocOnIdentifier,
46 DocOnSchema, Expr, Function, FunctionArgs, Ident, KafkaSourceConfigOption,
47 KafkaSourceConfigOptionName, LoadGenerator, LoadGeneratorOption, LoadGeneratorOptionName,
48 MaterializedViewOption, MaterializedViewOptionName, MySqlConfigOption, MySqlConfigOptionName,
49 PgConfigOption, PgConfigOptionName, RawItemName, ReaderSchemaSelectionStrategy,
50 RefreshAtOptionValue, RefreshEveryOptionValue, RefreshOptionValue, SourceEnvelope,
51 SqlServerConfigOption, SqlServerConfigOptionName, Statement, TableFromSourceColumns,
52 TableFromSourceOption, TableFromSourceOptionName, UnresolvedItemName,
53};
54use mz_sql_server_util::desc::SqlServerTableDesc;
55use mz_storage_types::configuration::StorageConfiguration;
56use mz_storage_types::connections::Connection;
57use mz_storage_types::connections::inline::IntoInlineConnection;
58use mz_storage_types::errors::ContextCreationError;
59use mz_storage_types::sources::load_generator::LoadGeneratorOutput;
60use mz_storage_types::sources::mysql::MySqlSourceDetails;
61use mz_storage_types::sources::postgres::PostgresSourcePublicationDetails;
62use mz_storage_types::sources::{
63 GenericSourceConnection, SourceDesc, SourceExportStatementDetails, SqlServerSourceExtras,
64};
65use prost::Message;
66use protobuf_native::MessageLite;
67use protobuf_native::compiler::{SourceTreeDescriptorDatabase, VirtualSourceTree};
68use rdkafka::admin::AdminClient;
69use references::{RetrievedSourceReferences, SourceReferenceClient};
70use uuid::Uuid;
71
72use crate::ast::{
73 AlterSourceAddSubsourceOption, AvroSchema, CreateSourceConnection, CreateSourceStatement,
74 CreateSubsourceStatement, CsrConnectionAvro, CsrConnectionProtobuf, ExternalReferenceExport,
75 ExternalReferences, Format, FormatSpecifier, ProtobufSchema, Value, WithOptionValue,
76};
77use crate::catalog::{CatalogItemType, SessionCatalog};
78use crate::kafka_util::{KafkaSinkConfigOptionExtracted, KafkaSourceConfigOptionExtracted};
79use crate::names::{
80 Aug, FullItemName, PartialItemName, ResolvedColumnReference, ResolvedDataType, ResolvedIds,
81 ResolvedItemName,
82};
83use crate::plan::error::PlanError;
84use crate::plan::statement::ddl::load_generator_ast_to_generator;
85use crate::plan::{SourceReferences, StatementContext};
86use crate::pure::error::{IcebergSinkPurificationError, SqlServerSourcePurificationError};
87use crate::pure::mysql::{ensure_binlog_full_metadata, is_binlog_full_metadata};
88use crate::{kafka_util, normalize};
89
90use self::error::{
91 CsrPurificationError, KafkaSinkPurificationError, KafkaSourcePurificationError,
92 LoadGeneratorSourcePurificationError, MySqlSourcePurificationError, PgSourcePurificationError,
93};
94
95pub(crate) mod error;
96mod references;
97
98pub mod mysql;
99pub mod postgres;
100pub mod sql_server;
101
102pub(crate) struct RequestedSourceExport<T> {
103 external_reference: UnresolvedItemName,
104 name: UnresolvedItemName,
105 meta: T,
106}
107
108impl<T: fmt::Debug> fmt::Debug for RequestedSourceExport<T> {
109 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
110 f.debug_struct("RequestedSourceExport")
111 .field("external_reference", &self.external_reference)
112 .field("name", &self.name)
113 .field("meta", &self.meta)
114 .finish()
115 }
116}
117
118impl<T> RequestedSourceExport<T> {
119 fn change_meta<F>(self, new_meta: F) -> RequestedSourceExport<F> {
120 RequestedSourceExport {
121 external_reference: self.external_reference,
122 name: self.name,
123 meta: new_meta,
124 }
125 }
126}
127
128fn source_export_name_gen(
133 source_name: &UnresolvedItemName,
134 subsource_name: &str,
135) -> Result<UnresolvedItemName, PlanError> {
136 let mut partial = normalize::unresolved_item_name(source_name.clone())?;
137 partial.item = subsource_name.to_string();
138 Ok(UnresolvedItemName::from(partial))
139}
140
141fn validate_source_export_names<T>(
145 requested_source_exports: &[RequestedSourceExport<T>],
146) -> Result<(), PlanError> {
147 if let Some(name) = requested_source_exports
151 .iter()
152 .map(|subsource| &subsource.name)
153 .duplicates()
154 .next()
155 .cloned()
156 {
157 let mut upstream_references: Vec<_> = requested_source_exports
158 .into_iter()
159 .filter_map(|subsource| {
160 if &subsource.name == &name {
161 Some(subsource.external_reference.clone())
162 } else {
163 None
164 }
165 })
166 .collect();
167
168 upstream_references.sort();
169
170 Err(PlanError::SubsourceNameConflict {
171 name,
172 upstream_references,
173 })?;
174 }
175
176 if let Some(name) = requested_source_exports
182 .iter()
183 .map(|export| &export.external_reference)
184 .duplicates()
185 .next()
186 .cloned()
187 {
188 let mut target_names: Vec<_> = requested_source_exports
189 .into_iter()
190 .filter_map(|export| {
191 if &export.external_reference == &name {
192 Some(export.name.clone())
193 } else {
194 None
195 }
196 })
197 .collect();
198
199 target_names.sort();
200
201 Err(PlanError::SubsourceDuplicateReference { name, target_names })?;
202 }
203
204 Ok(())
205}
206
207#[derive(Debug, Clone, PartialEq, Eq)]
208pub enum PurifiedStatement {
209 PurifiedCreateSource {
210 create_progress_subsource_stmt: Option<CreateSubsourceStatement<Aug>>,
212 create_source_stmt: CreateSourceStatement<Aug>,
213 subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
215 available_source_references: SourceReferences,
218 },
219 PurifiedAlterSource {
220 alter_source_stmt: AlterSourceStatement<Aug>,
221 },
222 PurifiedAlterSourceAddSubsources {
223 source_name: ResolvedItemName,
225 options: Vec<AlterSourceAddSubsourceOption<Aug>>,
228 subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
230 },
231 PurifiedAlterSourceRefreshReferences {
232 source_name: ResolvedItemName,
233 available_source_references: SourceReferences,
235 },
236 PurifiedCreateSink(CreateSinkStatement<Aug>),
237 PurifiedCreateTableFromSource {
238 stmt: CreateTableFromSourceStatement<Aug>,
239 },
240}
241
242#[derive(Debug, Clone, PartialEq, Eq)]
243pub struct PurifiedSourceExport {
244 pub external_reference: UnresolvedItemName,
245 pub details: PurifiedExportDetails,
246}
247
248#[derive(Debug, Clone, PartialEq, Eq)]
249pub enum PurifiedExportDetails {
250 MySql {
251 table: MySqlTableDesc,
252 text_columns: Option<Vec<Ident>>,
253 exclude_columns: Option<Vec<Ident>>,
254 initial_gtid_set: String,
255 binlog_full_metadata: bool,
256 },
257 Postgres {
258 table: PostgresTableDesc,
259 text_columns: Option<Vec<Ident>>,
260 exclude_columns: Option<Vec<Ident>>,
261 },
262 SqlServer {
263 table: SqlServerTableDesc,
264 text_columns: Option<Vec<Ident>>,
265 excl_columns: Option<Vec<Ident>>,
266 capture_instance: Arc<str>,
267 initial_lsn: mz_sql_server_util::cdc::Lsn,
268 },
269 Kafka {},
270 LoadGenerator {
271 table: Option<RelationDesc>,
272 output: LoadGeneratorOutput,
273 },
274}
275
276pub async fn purify_statement(
286 catalog: impl SessionCatalog,
287 now: u64,
288 stmt: Statement<Aug>,
289 storage_configuration: &StorageConfiguration,
290) -> (Result<PurifiedStatement, PlanError>, Option<ClusterId>) {
291 match stmt {
292 Statement::CreateSource(stmt) => {
293 let cluster_id = stmt.in_cluster.as_ref().map(|cluster| cluster.id.clone());
294 (
295 purify_create_source(catalog, now, stmt, storage_configuration).await,
296 cluster_id,
297 )
298 }
299 Statement::AlterSource(stmt) => (
300 purify_alter_source(catalog, stmt, storage_configuration).await,
301 None,
302 ),
303 Statement::CreateSink(stmt) => {
304 let cluster_id = stmt.in_cluster.as_ref().map(|cluster| cluster.id.clone());
305 (
306 purify_create_sink(catalog, stmt, storage_configuration).await,
307 cluster_id,
308 )
309 }
310 Statement::CreateTableFromSource(stmt) => (
311 purify_create_table_from_source(catalog, stmt, storage_configuration).await,
312 None,
313 ),
314 o => (
315 Err(internal_err!(
316 "unexpected statement type in purification: {:?}",
317 o
318 )),
319 None,
320 ),
321 }
322}
323
324pub(crate) fn purify_create_sink_avro_doc_on_options(
329 catalog: &dyn SessionCatalog,
330 from_id: CatalogItemId,
331 format: &mut Option<FormatSpecifier<Aug>>,
332) -> Result<(), PlanError> {
333 let from = catalog.get_item(&from_id);
335 let object_ids = from
336 .references()
337 .items()
338 .copied()
339 .chain_one(from.id())
340 .collect::<Vec<_>>();
341
342 let mut avro_format_options = vec![];
345 for_each_format(format, |doc_on_schema, fmt| match fmt {
346 Format::Avro(AvroSchema::InlineSchema { .. })
347 | Format::Bytes
348 | Format::Csv { .. }
349 | Format::Json { .. }
350 | Format::Protobuf(..)
351 | Format::Regex(..)
352 | Format::Text => (),
353 Format::Avro(AvroSchema::Csr {
354 csr_connection: CsrConnectionAvro { connection, .. },
355 }) => {
356 avro_format_options.push((doc_on_schema, &mut connection.options));
357 }
358 });
359
360 for (for_schema, options) in avro_format_options {
363 let user_provided_comments = options
364 .iter()
365 .filter_map(|CsrConfigOption { name, .. }| match name {
366 CsrConfigOptionName::AvroDocOn(doc_on) => Some(doc_on.clone()),
367 _ => None,
368 })
369 .collect::<BTreeSet<_>>();
370
371 for object_id in &object_ids {
373 let item = catalog
375 .get_item(object_id)
376 .at_version(RelationVersionSelector::Latest);
377 let full_resolved_name = ResolvedItemName::Item {
378 id: *object_id,
379 qualifiers: item.name().qualifiers.clone(),
380 full_name: catalog.resolve_full_name(item.name()),
381 print_id: !matches!(
382 item.item_type(),
383 CatalogItemType::Func | CatalogItemType::Type
384 ),
385 version: RelationVersionSelector::Latest,
386 };
387
388 if let Some(comments_map) = catalog.get_item_comments(object_id) {
389 let doc_on_item_key = AvroDocOn {
392 identifier: DocOnIdentifier::Type(full_resolved_name.clone()),
393 for_schema,
394 };
395 if !user_provided_comments.contains(&doc_on_item_key) {
396 if let Some(root_comment) = comments_map.get(&None) {
397 options.push(CsrConfigOption {
398 name: CsrConfigOptionName::AvroDocOn(doc_on_item_key),
399 value: Some(mz_sql_parser::ast::WithOptionValue::Value(Value::String(
400 root_comment.clone(),
401 ))),
402 });
403 }
404 }
405
406 let column_descs = match item.type_details() {
410 Some(details) => details.typ.desc(catalog).unwrap_or_default(),
411 None => item.relation_desc().map(|d| d.into_owned()),
412 };
413
414 if let Some(desc) = column_descs {
415 for (pos, column_name) in desc.iter_names().enumerate() {
416 if let Some(comment_str) = comments_map.get(&Some(pos + 1)) {
417 let doc_on_column_key = AvroDocOn {
418 identifier: DocOnIdentifier::Column(ColumnName {
419 relation: full_resolved_name.clone(),
420 column: ResolvedColumnReference::Column {
421 name: column_name.to_owned(),
422 index: pos,
423 },
424 }),
425 for_schema,
426 };
427 if !user_provided_comments.contains(&doc_on_column_key) {
428 options.push(CsrConfigOption {
429 name: CsrConfigOptionName::AvroDocOn(doc_on_column_key),
430 value: Some(mz_sql_parser::ast::WithOptionValue::Value(
431 Value::String(comment_str.clone()),
432 )),
433 });
434 }
435 }
436 }
437 }
438 }
439 }
440 }
441
442 Ok(())
443}
444
445async fn purify_create_sink(
448 catalog: impl SessionCatalog,
449 mut create_sink_stmt: CreateSinkStatement<Aug>,
450 storage_configuration: &StorageConfiguration,
451) -> Result<PurifiedStatement, PlanError> {
452 let CreateSinkStatement {
454 connection,
455 format,
456 with_options,
457 name: _,
458 in_cluster: _,
459 if_not_exists: _,
460 from,
461 envelope: _,
462 mode: _,
463 } = &mut create_sink_stmt;
464
465 const USER_ALLOWED_WITH_OPTIONS: &[CreateSinkOptionName] = &[
467 CreateSinkOptionName::Snapshot,
468 CreateSinkOptionName::CommitInterval,
469 ];
470
471 if let Some(op) = with_options
472 .iter()
473 .find(|op| !USER_ALLOWED_WITH_OPTIONS.contains(&op.name))
474 {
475 sql_bail!(
476 "CREATE SINK...WITH ({}..) is not allowed",
477 op.name.to_ast_string_simple(),
478 )
479 }
480
481 match &connection {
482 CreateSinkConnection::Kafka {
483 connection,
484 options,
485 key: _,
486 headers: _,
487 } => {
488 let scx = StatementContext::new(None, &catalog);
493 let connection = {
494 let item = scx.get_item_by_resolved_name(connection)?;
495 match item.connection()? {
497 Connection::Kafka(connection) => {
498 connection.clone().into_inline_connection(scx.catalog)
499 }
500 _ => sql_bail!(
501 "{} is not a kafka connection",
502 scx.catalog.resolve_full_name(item.name())
503 ),
504 }
505 };
506
507 let extracted_options: KafkaSinkConfigOptionExtracted = options.clone().try_into()?;
508
509 if extracted_options.legacy_ids == Some(true) {
510 sql_bail!("LEGACY IDs option is not supported");
511 }
512
513 let client: AdminClient<_> = connection
514 .create_with_context(
515 storage_configuration,
516 MzClientContext::default(),
517 &BTreeMap::new(),
518 InTask::No,
519 )
520 .await
521 .map_err(|e| {
522 KafkaSinkPurificationError::AdminClientError(Arc::new(e))
524 })?;
525
526 let metadata = client
527 .inner()
528 .fetch_metadata(
529 None,
530 storage_configuration
531 .parameters
532 .kafka_timeout_config
533 .fetch_metadata_timeout,
534 )
535 .map_err(|e| {
536 KafkaSinkPurificationError::AdminClientError(Arc::new(
537 ContextCreationError::KafkaError(e),
538 ))
539 })?;
540
541 if metadata.brokers().len() == 0 {
542 Err(KafkaSinkPurificationError::ZeroBrokers)?;
543 }
544 }
545 CreateSinkConnection::Iceberg {
546 connection,
547 aws_connection,
548 ..
549 } => {
550 let scx = StatementContext::new(None, &catalog);
551 let connection = {
552 let item = scx.get_item_by_resolved_name(connection)?;
553 match item.connection()? {
555 Connection::IcebergCatalog(connection) => {
556 connection.clone().into_inline_connection(scx.catalog)
557 }
558 _ => sql_bail!(
559 "{} is not an iceberg connection",
560 scx.catalog.resolve_full_name(item.name())
561 ),
562 }
563 };
564
565 let aws_conn_id = aws_connection.item_id();
566
567 let aws_connection = {
568 let item = scx.get_item_by_resolved_name(aws_connection)?;
569 match item.connection()? {
571 Connection::Aws(aws_connection) => aws_connection.clone(),
572 _ => sql_bail!(
573 "{} is not an aws connection",
574 scx.catalog.resolve_full_name(item.name())
575 ),
576 }
577 };
578
579 if let Some(s3tables) = connection.s3tables_catalog() {
583 let enable_region_check =
584 ENABLE_S3_TABLES_REGION_CHECK.get(scx.catalog.system_vars().dyncfgs());
585 if enable_region_check {
586 let env_id = &catalog.config().environment_id;
587 if matches!(env_id.cloud_provider(), CloudProvider::Aws) {
588 let env_region = env_id.cloud_provider_region();
589 let s3_tables_region = s3tables
592 .aws_connection
593 .connection
594 .region
595 .clone()
596 .unwrap_or_else(|| "us-east-1".to_string());
597 if s3_tables_region != env_region {
598 Err(IcebergSinkPurificationError::S3TablesRegionMismatch {
599 s3_tables_region,
600 environment_region: env_region.to_string(),
601 })?;
602 }
603 }
604 }
605 }
606
607 let _catalog = connection
608 .connect(storage_configuration, InTask::No)
609 .await
610 .map_err(|e| IcebergSinkPurificationError::CatalogError(Arc::new(e)))?;
611
612 let _sdk_config = aws_connection
613 .load_sdk_config(
614 &storage_configuration.connection_context,
615 aws_conn_id.clone(),
616 InTask::No,
617 mz_storage_types::dyncfgs::ENFORCE_EXTERNAL_ADDRESSES
618 .get(storage_configuration.config_set()),
619 )
620 .await
621 .map_err(|e| IcebergSinkPurificationError::AwsSdkContextError(Arc::new(e)))?;
622 }
623 }
624
625 let mut csr_connection_ids = BTreeSet::new();
626 for_each_format(format, |_, fmt| match fmt {
627 Format::Avro(AvroSchema::InlineSchema { .. })
628 | Format::Bytes
629 | Format::Csv { .. }
630 | Format::Json { .. }
631 | Format::Protobuf(ProtobufSchema::InlineSchema { .. })
632 | Format::Regex(..)
633 | Format::Text => (),
634 Format::Avro(AvroSchema::Csr {
635 csr_connection: CsrConnectionAvro { connection, .. },
636 })
637 | Format::Protobuf(ProtobufSchema::Csr {
638 csr_connection: CsrConnectionProtobuf { connection, .. },
639 }) => {
640 csr_connection_ids.insert(*connection.connection.item_id());
641 }
642 });
643
644 let scx = StatementContext::new(None, &catalog);
645 for csr_connection_id in csr_connection_ids {
646 let connection = {
647 let item = scx.get_item(&csr_connection_id);
648 match item.connection()? {
650 Connection::Csr(connection) => connection.clone().into_inline_connection(&catalog),
651 _ => Err(CsrPurificationError::NotCsrConnection(
652 scx.catalog.resolve_full_name(item.name()),
653 ))?,
654 }
655 };
656
657 let client = connection
658 .connect(storage_configuration, InTask::No)
659 .await
660 .map_err(|e| CsrPurificationError::ClientError(Arc::new(e)))?;
661
662 client
663 .list_subjects()
664 .await
665 .map_err(|e| CsrPurificationError::ListSubjectsError(Arc::new(e)))?;
666 }
667
668 purify_create_sink_avro_doc_on_options(&catalog, *from.item_id(), format)?;
669
670 Ok(PurifiedStatement::PurifiedCreateSink(create_sink_stmt))
671}
672
673fn for_each_format<'a, F>(format: &'a mut Option<FormatSpecifier<Aug>>, mut f: F)
680where
681 F: FnMut(DocOnSchema, &'a mut Format<Aug>),
682{
683 match format {
684 None => (),
685 Some(FormatSpecifier::Bare(fmt)) => f(DocOnSchema::All, fmt),
686 Some(FormatSpecifier::KeyValue { key, value }) => {
687 f(DocOnSchema::KeyOnly, key);
688 f(DocOnSchema::ValueOnly, value);
689 }
690 }
691}
692
/// Whether a source statement may, or must, name the upstream references
/// (`FOR TABLES`, `FOR SCHEMAS`, `FOR ALL TABLES`) that it exports.
///
/// `CREATE SOURCE` purification picks the policy from system variables:
/// `NotAllowed` when both `enable_create_table_from_source` and
/// `force_source_table_syntax` are set, `Optional` when only the former is
/// set, and `Required` otherwise.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum SourceReferencePolicy {
    /// Naming references is an error; exports must be created as tables from
    /// the source instead.
    NotAllowed,
    /// References may be named but need not be.
    Optional,
    /// References must be named where the source has several outputs (e.g. a
    /// multi-output load generator with no references is rejected).
    Required,
}
707
708async fn purify_create_source(
709 catalog: impl SessionCatalog,
710 now: u64,
711 mut create_source_stmt: CreateSourceStatement<Aug>,
712 storage_configuration: &StorageConfiguration,
713) -> Result<PurifiedStatement, PlanError> {
714 let CreateSourceStatement {
715 name: source_name,
716 col_names,
717 key_constraint,
718 connection: source_connection,
719 format,
720 envelope,
721 include_metadata,
722 external_references,
723 progress_subsource,
724 with_options,
725 ..
726 } = &mut create_source_stmt;
727
728 let uses_old_syntax = !col_names.is_empty()
729 || key_constraint.is_some()
730 || format.is_some()
731 || envelope.is_some()
732 || !include_metadata.is_empty()
733 || external_references.is_some()
734 || progress_subsource.is_some();
735
736 if let Some(DeferredItemName::Named(_)) = progress_subsource {
737 sql_bail!("Cannot manually ID qualify progress subsource")
738 }
739
740 let mut requested_subsource_map = BTreeMap::new();
741
742 let progress_desc = match &source_connection {
743 CreateSourceConnection::Kafka { .. } => {
744 &mz_storage_types::sources::kafka::KAFKA_PROGRESS_DESC
745 }
746 CreateSourceConnection::Postgres { .. } => {
747 &mz_storage_types::sources::postgres::PG_PROGRESS_DESC
748 }
749 CreateSourceConnection::SqlServer { .. } => {
750 &mz_storage_types::sources::sql_server::SQL_SERVER_PROGRESS_DESC
751 }
752 CreateSourceConnection::MySql { .. } => {
753 &mz_storage_types::sources::mysql::MYSQL_PROGRESS_DESC
754 }
755 CreateSourceConnection::LoadGenerator { .. } => {
756 &mz_storage_types::sources::load_generator::LOAD_GEN_PROGRESS_DESC
757 }
758 };
759 let scx = StatementContext::new(None, &catalog);
760
761 let reference_policy = if scx.catalog.system_vars().enable_create_table_from_source()
765 && scx.catalog.system_vars().force_source_table_syntax()
766 {
767 SourceReferencePolicy::NotAllowed
768 } else if scx.catalog.system_vars().enable_create_table_from_source() {
769 SourceReferencePolicy::Optional
770 } else {
771 SourceReferencePolicy::Required
772 };
773
774 let mut format_options = SourceFormatOptions::Default;
775
776 let retrieved_source_references: RetrievedSourceReferences;
777
778 match source_connection {
779 CreateSourceConnection::Kafka {
780 connection,
781 options: base_with_options,
782 ..
783 } => {
784 if let Some(external_references) = external_references {
785 Err(KafkaSourcePurificationError::ReferencedSubsources(
786 external_references.clone(),
787 ))?;
788 }
789
790 let connection = {
791 let item = scx.get_item_by_resolved_name(connection)?;
792 match item.connection()? {
794 Connection::Kafka(connection) => {
795 connection.clone().into_inline_connection(&catalog)
796 }
797 _ => Err(KafkaSourcePurificationError::NotKafkaConnection(
798 scx.catalog.resolve_full_name(item.name()),
799 ))?,
800 }
801 };
802
803 let extracted_options: KafkaSourceConfigOptionExtracted =
804 base_with_options.clone().try_into()?;
805
806 let topic = extracted_options
807 .topic
808 .ok_or(KafkaSourcePurificationError::ConnectionMissingTopic)?;
809
810 let consumer = connection
811 .create_with_context(
812 storage_configuration,
813 MzClientContext::default(),
814 &BTreeMap::new(),
815 InTask::No,
816 )
817 .await
818 .map_err(|e| {
819 KafkaSourcePurificationError::KafkaConsumerError(
821 e.display_with_causes().to_string(),
822 )
823 })?;
824 let consumer = Arc::new(consumer);
825
826 match (
827 extracted_options.start_offset,
828 extracted_options.start_timestamp,
829 ) {
830 (None, None) => {
831 kafka_util::ensure_topic_exists(
833 Arc::clone(&consumer),
834 &topic,
835 storage_configuration
836 .parameters
837 .kafka_timeout_config
838 .fetch_metadata_timeout,
839 )
840 .await?;
841 }
842 (Some(_), Some(_)) => {
843 sql_bail!("cannot specify START TIMESTAMP and START OFFSET at same time")
844 }
845 (Some(start_offsets), None) => {
846 kafka_util::validate_start_offsets(
848 Arc::clone(&consumer),
849 &topic,
850 start_offsets,
851 storage_configuration
852 .parameters
853 .kafka_timeout_config
854 .fetch_metadata_timeout,
855 )
856 .await?;
857 }
858 (None, Some(time_offset)) => {
859 let start_offsets = kafka_util::lookup_start_offsets(
861 Arc::clone(&consumer),
862 &topic,
863 time_offset,
864 now,
865 storage_configuration
866 .parameters
867 .kafka_timeout_config
868 .fetch_metadata_timeout,
869 )
870 .await?;
871
872 base_with_options.retain(|val| {
873 !matches!(val.name, KafkaSourceConfigOptionName::StartTimestamp)
874 });
875 base_with_options.push(KafkaSourceConfigOption {
876 name: KafkaSourceConfigOptionName::StartOffset,
877 value: Some(WithOptionValue::Sequence(
878 start_offsets
879 .iter()
880 .map(|offset| {
881 WithOptionValue::Value(Value::Number(offset.to_string()))
882 })
883 .collect(),
884 )),
885 });
886 }
887 }
888
889 let reference_client = SourceReferenceClient::Kafka { topic: &topic };
890 retrieved_source_references = reference_client.get_source_references().await?;
891
892 format_options = SourceFormatOptions::Kafka { topic };
893 }
894 CreateSourceConnection::Postgres {
895 connection,
896 options,
897 } => {
898 let connection_item = scx.get_item_by_resolved_name(connection)?;
899 let connection = match connection_item.connection().map_err(PlanError::from)? {
900 Connection::Postgres(connection) => {
901 connection.clone().into_inline_connection(&catalog)
902 }
903 _ => Err(PgSourcePurificationError::NotPgConnection(
904 scx.catalog.resolve_full_name(connection_item.name()),
905 ))?,
906 };
907 let crate::plan::statement::PgConfigOptionExtracted {
908 publication,
909 text_columns,
910 exclude_columns,
911 details,
912 ..
913 } = options.clone().try_into()?;
914 let publication =
915 publication.ok_or(PgSourcePurificationError::ConnectionMissingPublication)?;
916
917 if details.is_some() {
918 Err(PgSourcePurificationError::UserSpecifiedDetails)?;
919 }
920
921 let client = connection
922 .validate(connection_item.id(), storage_configuration)
923 .await?;
924
925 let reference_client = SourceReferenceClient::Postgres {
926 client: &client,
927 publication: &publication,
928 database: &connection.database,
929 };
930 retrieved_source_references = reference_client.get_source_references().await?;
931
932 let postgres::PurifiedSourceExports {
933 source_exports: subsources,
934 normalized_text_columns,
935 } = postgres::purify_source_exports(
936 &client,
937 &retrieved_source_references,
938 external_references,
939 text_columns,
940 exclude_columns,
941 source_name,
942 &reference_policy,
943 )
944 .await?;
945
946 if let Some(text_cols_option) = options
947 .iter_mut()
948 .find(|option| option.name == PgConfigOptionName::TextColumns)
949 {
950 text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
951 }
952
953 requested_subsource_map.extend(subsources);
954
955 let timeline_id = mz_postgres_util::get_timeline_id(&client).await?;
958
959 options.retain(|PgConfigOption { name, .. }| name != &PgConfigOptionName::Details);
961 let details = PostgresSourcePublicationDetails {
962 slot: format!(
963 "materialize_{}",
964 Uuid::new_v4().to_string().replace('-', "")
965 ),
966 timeline_id: Some(timeline_id),
967 database: connection.database,
968 };
969 options.push(PgConfigOption {
970 name: PgConfigOptionName::Details,
971 value: Some(WithOptionValue::Value(Value::String(hex::encode(
972 details.into_proto().encode_to_vec(),
973 )))),
974 })
975 }
976 CreateSourceConnection::SqlServer {
977 connection,
978 options,
979 } => {
980 let connection_item = scx.get_item_by_resolved_name(connection)?;
983 let connection = match connection_item.connection()? {
984 Connection::SqlServer(connection) => {
985 connection.clone().into_inline_connection(&catalog)
986 }
987 _ => Err(SqlServerSourcePurificationError::NotSqlServerConnection(
988 scx.catalog.resolve_full_name(connection_item.name()),
989 ))?,
990 };
991 let crate::plan::statement::ddl::SqlServerConfigOptionExtracted {
992 details,
993 text_columns,
994 exclude_columns,
995 seen: _,
996 } = options.clone().try_into()?;
997
998 if details.is_some() {
999 Err(SqlServerSourcePurificationError::UserSpecifiedDetails)?;
1000 }
1001
1002 let mut client = connection
1003 .validate(connection_item.id(), storage_configuration)
1004 .await?;
1005
1006 let database: Arc<str> = connection.database.into();
1007 let reference_client = SourceReferenceClient::SqlServer {
1008 client: &mut client,
1009 database: Arc::clone(&database),
1010 };
1011 retrieved_source_references = reference_client.get_source_references().await?;
1012 tracing::debug!(?retrieved_source_references, "got source references");
1013
1014 let timeout = mz_storage_types::sources::sql_server::MAX_LSN_WAIT
1015 .get(storage_configuration.config_set());
1016
1017 let purified_source_exports = sql_server::purify_source_exports(
1018 &*database,
1019 &mut client,
1020 &retrieved_source_references,
1021 external_references,
1022 &text_columns,
1023 &exclude_columns,
1024 source_name,
1025 timeout,
1026 &reference_policy,
1027 )
1028 .await?;
1029
1030 let sql_server::PurifiedSourceExports {
1031 source_exports,
1032 normalized_text_columns,
1033 normalized_excl_columns,
1034 } = purified_source_exports;
1035
1036 requested_subsource_map.extend(source_exports);
1038
1039 let restore_history_id =
1043 mz_sql_server_util::inspect::get_latest_restore_history_id(&mut client).await?;
1044 let details = SqlServerSourceExtras { restore_history_id };
1045
1046 options.retain(|SqlServerConfigOption { name, .. }| {
1047 name != &SqlServerConfigOptionName::Details
1048 });
1049 options.push(SqlServerConfigOption {
1050 name: SqlServerConfigOptionName::Details,
1051 value: Some(WithOptionValue::Value(Value::String(hex::encode(
1052 details.into_proto().encode_to_vec(),
1053 )))),
1054 });
1055
1056 if let Some(text_cols_option) = options
1058 .iter_mut()
1059 .find(|option| option.name == SqlServerConfigOptionName::TextColumns)
1060 {
1061 text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
1062 }
1063 if let Some(excl_cols_option) = options
1064 .iter_mut()
1065 .find(|option| option.name == SqlServerConfigOptionName::ExcludeColumns)
1066 {
1067 excl_cols_option.value = Some(WithOptionValue::Sequence(normalized_excl_columns));
1068 }
1069 }
1070 CreateSourceConnection::MySql {
1071 connection,
1072 options,
1073 } => {
1074 let connection_item = scx.get_item_by_resolved_name(connection)?;
1075 let connection = match connection_item.connection()? {
1076 Connection::MySql(connection) => {
1077 connection.clone().into_inline_connection(&catalog)
1078 }
1079 _ => Err(MySqlSourcePurificationError::NotMySqlConnection(
1080 scx.catalog.resolve_full_name(connection_item.name()),
1081 ))?,
1082 };
1083 let crate::plan::statement::ddl::MySqlConfigOptionExtracted {
1084 details,
1085 text_columns,
1086 exclude_columns,
1087 seen: _,
1088 } = options.clone().try_into()?;
1089
1090 if details.is_some() {
1091 Err(MySqlSourcePurificationError::UserSpecifiedDetails)?;
1092 }
1093
1094 let mut conn = connection
1095 .validate(connection_item.id(), storage_configuration)
1096 .await
1097 .map_err(MySqlSourcePurificationError::InvalidConnection)?;
1098
1099 let initial_gtid_set =
1103 mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?;
1104
1105 let binlog_full_metadata = is_binlog_full_metadata(&mut conn).await?;
1106
1107 let reference_client = SourceReferenceClient::MySql {
1108 conn: &mut conn,
1109 include_system_schemas: mysql::references_system_schemas(external_references),
1110 };
1111 retrieved_source_references = reference_client.get_source_references().await?;
1112
1113 let mysql::PurifiedSourceExports {
1114 source_exports: subsources,
1115 normalized_text_columns,
1116 normalized_exclude_columns,
1117 } = mysql::purify_source_exports(
1118 &mut conn,
1119 &retrieved_source_references,
1120 external_references,
1121 text_columns,
1122 exclude_columns,
1123 source_name,
1124 initial_gtid_set.clone(),
1125 &reference_policy,
1126 binlog_full_metadata,
1127 )
1128 .await?;
1129 requested_subsource_map.extend(subsources);
1130
1131 let details = MySqlSourceDetails {};
1134 options
1136 .retain(|MySqlConfigOption { name, .. }| name != &MySqlConfigOptionName::Details);
1137 options.push(MySqlConfigOption {
1138 name: MySqlConfigOptionName::Details,
1139 value: Some(WithOptionValue::Value(Value::String(hex::encode(
1140 details.into_proto().encode_to_vec(),
1141 )))),
1142 });
1143
1144 if let Some(text_cols_option) = options
1145 .iter_mut()
1146 .find(|option| option.name == MySqlConfigOptionName::TextColumns)
1147 {
1148 text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
1149 }
1150 if let Some(exclude_cols_option) = options
1151 .iter_mut()
1152 .find(|option| option.name == MySqlConfigOptionName::ExcludeColumns)
1153 {
1154 exclude_cols_option.value =
1155 Some(WithOptionValue::Sequence(normalized_exclude_columns));
1156 }
1157 }
1158 CreateSourceConnection::LoadGenerator { generator, options } => {
1159 let load_generator =
1160 load_generator_ast_to_generator(&scx, generator, options, include_metadata)?;
1161
1162 let reference_client = SourceReferenceClient::LoadGenerator {
1163 generator: &load_generator,
1164 };
1165 retrieved_source_references = reference_client.get_source_references().await?;
1166 let multi_output_sources =
1170 retrieved_source_references
1171 .all_references()
1172 .iter()
1173 .any(|r| {
1174 matches!(
1175 r.load_generator_output(),
1176 Some(output) if output != &LoadGeneratorOutput::Default
1177 )
1178 });
1179
1180 match external_references {
1181 Some(requested)
1182 if matches!(reference_policy, SourceReferencePolicy::NotAllowed) =>
1183 {
1184 Err(PlanError::UseTablesForSources(requested.to_string()))?
1185 }
1186 Some(requested) if !multi_output_sources => match requested {
1187 ExternalReferences::SubsetTables(_) => {
1188 Err(LoadGeneratorSourcePurificationError::ForTables)?
1189 }
1190 ExternalReferences::SubsetSchemas(_) => {
1191 Err(LoadGeneratorSourcePurificationError::ForSchemas)?
1192 }
1193 ExternalReferences::All => {
1194 Err(LoadGeneratorSourcePurificationError::ForAllTables)?
1195 }
1196 },
1197 Some(requested) => {
1198 let requested_exports = retrieved_source_references
1199 .requested_source_exports(Some(requested), source_name)?;
1200 for export in requested_exports {
1201 requested_subsource_map.insert(
1202 export.name,
1203 PurifiedSourceExport {
1204 external_reference: export.external_reference,
1205 details: PurifiedExportDetails::LoadGenerator {
1206 table: export
1207 .meta
1208 .load_generator_desc()
1209 .ok_or_else(|| {
1210 internal_err!(
1211 "expected load generator source reference"
1212 )
1213 })?
1214 .clone(),
1215 output: export
1216 .meta
1217 .load_generator_output()
1218 .ok_or_else(|| {
1219 internal_err!(
1220 "expected load generator source reference"
1221 )
1222 })?
1223 .clone(),
1224 },
1225 },
1226 );
1227 }
1228 }
1229 None => {
1230 if multi_output_sources
1231 && matches!(reference_policy, SourceReferencePolicy::Required)
1232 {
1233 Err(LoadGeneratorSourcePurificationError::MultiOutputRequiresForAllTables)?
1234 }
1235 }
1236 }
1237
1238 if let LoadGenerator::Clock = generator {
1239 if !options
1240 .iter()
1241 .any(|p| p.name == LoadGeneratorOptionName::AsOf)
1242 {
1243 let now = catalog.now();
1244 options.push(LoadGeneratorOption {
1245 name: LoadGeneratorOptionName::AsOf,
1246 value: Some(WithOptionValue::Value(Value::Number(now.to_string()))),
1247 });
1248 }
1249 }
1250 }
1251 }
1252
1253 *external_references = None;
1257
1258 let create_progress_subsource_stmt = if uses_old_syntax {
1260 let name = match progress_subsource {
1262 Some(name) => match name {
1263 DeferredItemName::Deferred(name) => name.clone(),
1264 DeferredItemName::Named(_) => {
1266 sql_bail!("progress subsource name cannot be a resolved name")
1267 }
1268 },
1269 None => {
1270 let (item, prefix) = source_name
1271 .0
1272 .split_last()
1273 .ok_or_else(|| sql_err!("source name must have at least one component"))?;
1274 let item_name =
1275 Ident::try_generate_name(item.to_string(), "_progress", |candidate| {
1276 let mut suggested_name = prefix.to_vec();
1277 suggested_name.push(candidate.clone());
1278
1279 let partial =
1280 normalize::unresolved_item_name(UnresolvedItemName(suggested_name))?;
1281 let qualified = scx.allocate_qualified_name(partial)?;
1282 let item_exists = scx.catalog.get_item_by_name(&qualified).is_some();
1283 let type_exists = scx.catalog.get_type_by_name(&qualified).is_some();
1284 Ok::<_, PlanError>(!item_exists && !type_exists)
1285 })?;
1286
1287 let mut full_name = prefix.to_vec();
1288 full_name.push(item_name);
1289 let full_name = normalize::unresolved_item_name(UnresolvedItemName(full_name))?;
1290 let qualified_name = scx.allocate_qualified_name(full_name)?;
1291 let full_name = scx.catalog.resolve_full_name(&qualified_name);
1292
1293 UnresolvedItemName::from(full_name.clone())
1294 }
1295 };
1296
1297 let (columns, constraints) = scx.relation_desc_into_table_defs(progress_desc)?;
1298
1299 let mut progress_with_options: Vec<_> = with_options
1301 .iter()
1302 .filter_map(|opt| match opt.name {
1303 CreateSourceOptionName::TimestampInterval => None,
1304 CreateSourceOptionName::RetainHistory => Some(CreateSubsourceOption {
1305 name: CreateSubsourceOptionName::RetainHistory,
1306 value: opt.value.clone(),
1307 }),
1308 })
1309 .collect();
1310 progress_with_options.push(CreateSubsourceOption {
1311 name: CreateSubsourceOptionName::Progress,
1312 value: Some(WithOptionValue::Value(Value::Boolean(true))),
1313 });
1314
1315 Some(CreateSubsourceStatement {
1316 name,
1317 columns,
1318 of_source: None,
1322 constraints,
1323 if_not_exists: false,
1324 with_options: progress_with_options,
1325 })
1326 } else {
1327 None
1328 };
1329
1330 purify_source_format(
1331 &catalog,
1332 format,
1333 &format_options,
1334 envelope,
1335 storage_configuration,
1336 )
1337 .await?;
1338
1339 Ok(PurifiedStatement::PurifiedCreateSource {
1340 create_progress_subsource_stmt,
1341 create_source_stmt,
1342 subsources: requested_subsource_map,
1343 available_source_references: retrieved_source_references.available_source_references(),
1344 })
1345}
1346
1347async fn purify_alter_source(
1350 catalog: impl SessionCatalog,
1351 stmt: AlterSourceStatement<Aug>,
1352 storage_configuration: &StorageConfiguration,
1353) -> Result<PurifiedStatement, PlanError> {
1354 let scx = StatementContext::new(None, &catalog);
1355 let AlterSourceStatement {
1356 source_name: unresolved_source_name,
1357 action,
1358 if_exists,
1359 } = stmt;
1360
1361 let item = match scx.resolve_item(RawItemName::Name(unresolved_source_name.clone())) {
1363 Ok(item) => item,
1364 Err(_) if if_exists => {
1365 return Ok(PurifiedStatement::PurifiedAlterSource {
1366 alter_source_stmt: AlterSourceStatement {
1367 source_name: unresolved_source_name,
1368 action,
1369 if_exists,
1370 },
1371 });
1372 }
1373 Err(e) => return Err(e),
1374 };
1375
1376 let desc = match item.source_desc()? {
1378 Some(desc) => desc.clone().into_inline_connection(scx.catalog),
1379 None => {
1380 sql_bail!("cannot ALTER this type of source")
1381 }
1382 };
1383
1384 let source_name = item.name();
1385
1386 let resolved_source_name = ResolvedItemName::Item {
1387 id: item.id(),
1388 qualifiers: item.name().qualifiers.clone(),
1389 full_name: scx.catalog.resolve_full_name(source_name),
1390 print_id: true,
1391 version: RelationVersionSelector::Latest,
1392 };
1393
1394 let partial_name = scx.catalog.minimal_qualification(source_name);
1395
1396 match action {
1397 AlterSourceAction::AddSubsources {
1398 external_references,
1399 options,
1400 } => {
1401 if scx.catalog.system_vars().enable_create_table_from_source()
1402 && scx.catalog.system_vars().force_source_table_syntax()
1403 {
1404 Err(PlanError::UseTablesForSources(
1405 "ALTER SOURCE .. ADD SUBSOURCES ..".to_string(),
1406 ))?;
1407 }
1408
1409 purify_alter_source_add_subsources(
1410 external_references,
1411 options,
1412 desc,
1413 partial_name,
1414 unresolved_source_name,
1415 resolved_source_name,
1416 storage_configuration,
1417 )
1418 .await
1419 }
1420 AlterSourceAction::RefreshReferences => {
1421 purify_alter_source_refresh_references(
1422 desc,
1423 resolved_source_name,
1424 storage_configuration,
1425 )
1426 .await
1427 }
1428 _ => Ok(PurifiedStatement::PurifiedAlterSource {
1429 alter_source_stmt: AlterSourceStatement {
1430 source_name: unresolved_source_name,
1431 action,
1432 if_exists,
1433 },
1434 }),
1435 }
1436}
1437
1438async fn purify_alter_source_add_subsources(
1441 external_references: Vec<ExternalReferenceExport>,
1442 mut options: Vec<AlterSourceAddSubsourceOption<Aug>>,
1443 desc: SourceDesc,
1444 partial_source_name: PartialItemName,
1445 unresolved_source_name: UnresolvedItemName,
1446 resolved_source_name: ResolvedItemName,
1447 storage_configuration: &StorageConfiguration,
1448) -> Result<PurifiedStatement, PlanError> {
1449 let connection_id = match &desc.connection {
1451 GenericSourceConnection::Postgres(c) => c.connection_id,
1452 GenericSourceConnection::MySql(c) => c.connection_id,
1453 GenericSourceConnection::SqlServer(c) => c.connection_id,
1454 _ => sql_bail!(
1455 "source {} does not support ALTER SOURCE.",
1456 partial_source_name
1457 ),
1458 };
1459
1460 let crate::plan::statement::ddl::AlterSourceAddSubsourceOptionExtracted {
1461 text_columns,
1462 exclude_columns,
1463 details,
1464 seen: _,
1465 } = options.clone().try_into()?;
1466 if details.is_some() {
1467 sql_bail!("DETAILS option cannot be explicitly set");
1468 }
1469
1470 let mut requested_subsource_map = BTreeMap::new();
1471
1472 match desc.connection {
1473 GenericSourceConnection::Postgres(pg_source_connection) => {
1474 let pg_connection = &pg_source_connection.connection;
1476
1477 let client = pg_connection
1478 .validate(connection_id, storage_configuration)
1479 .await?;
1480
1481 let reference_client = SourceReferenceClient::Postgres {
1482 client: &client,
1483 publication: &pg_source_connection.publication,
1484 database: &pg_connection.database,
1485 };
1486 let retrieved_source_references = reference_client.get_source_references().await?;
1487
1488 let postgres::PurifiedSourceExports {
1489 source_exports: subsources,
1490 normalized_text_columns,
1491 } = postgres::purify_source_exports(
1492 &client,
1493 &retrieved_source_references,
1494 &Some(ExternalReferences::SubsetTables(external_references)),
1495 text_columns,
1496 exclude_columns,
1497 &unresolved_source_name,
1498 &SourceReferencePolicy::Required,
1499 )
1500 .await?;
1501
1502 if let Some(text_cols_option) = options
1503 .iter_mut()
1504 .find(|option| option.name == AlterSourceAddSubsourceOptionName::TextColumns)
1505 {
1506 text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
1507 }
1508
1509 requested_subsource_map.extend(subsources);
1510 }
1511 GenericSourceConnection::MySql(mysql_source_connection) => {
1512 let mysql_connection = &mysql_source_connection.connection;
1513 let config = mysql_connection
1514 .config(
1515 &storage_configuration.connection_context.secrets_reader,
1516 storage_configuration,
1517 InTask::No,
1518 )
1519 .await?;
1520
1521 let mut conn = config
1522 .connect(
1523 "mysql purification",
1524 &storage_configuration.connection_context.ssh_tunnel_manager,
1525 )
1526 .await?;
1527
1528 let initial_gtid_set =
1531 mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?;
1532
1533 let binlog_full_metadata = is_binlog_full_metadata(&mut conn).await?;
1534
1535 let requested_references = Some(ExternalReferences::SubsetTables(external_references));
1536
1537 let reference_client = SourceReferenceClient::MySql {
1538 conn: &mut conn,
1539 include_system_schemas: mysql::references_system_schemas(&requested_references),
1540 };
1541 let retrieved_source_references = reference_client.get_source_references().await?;
1542
1543 let mysql::PurifiedSourceExports {
1544 source_exports: subsources,
1545 normalized_text_columns,
1546 normalized_exclude_columns,
1547 } = mysql::purify_source_exports(
1548 &mut conn,
1549 &retrieved_source_references,
1550 &requested_references,
1551 text_columns,
1552 exclude_columns,
1553 &unresolved_source_name,
1554 initial_gtid_set,
1555 &SourceReferencePolicy::Required,
1556 binlog_full_metadata,
1557 )
1558 .await?;
1559 requested_subsource_map.extend(subsources);
1560
1561 if let Some(text_cols_option) = options
1563 .iter_mut()
1564 .find(|option| option.name == AlterSourceAddSubsourceOptionName::TextColumns)
1565 {
1566 text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
1567 }
1568 if let Some(exclude_cols_option) = options
1569 .iter_mut()
1570 .find(|option| option.name == AlterSourceAddSubsourceOptionName::ExcludeColumns)
1571 {
1572 exclude_cols_option.value =
1573 Some(WithOptionValue::Sequence(normalized_exclude_columns));
1574 }
1575 }
1576 GenericSourceConnection::SqlServer(sql_server_source) => {
1577 let sql_server_connection = &sql_server_source.connection;
1579 let config = sql_server_connection
1580 .resolve_config(
1581 &storage_configuration.connection_context.secrets_reader,
1582 storage_configuration,
1583 InTask::No,
1584 )
1585 .await?;
1586 let mut client = mz_sql_server_util::Client::connect(config).await?;
1587
1588 let database = sql_server_connection.database.clone().into();
1590 let source_references = SourceReferenceClient::SqlServer {
1591 client: &mut client,
1592 database: Arc::clone(&database),
1593 }
1594 .get_source_references()
1595 .await?;
1596 let requested_references = Some(ExternalReferences::SubsetTables(external_references));
1597
1598 let timeout = mz_storage_types::sources::sql_server::MAX_LSN_WAIT
1599 .get(storage_configuration.config_set());
1600
1601 let result = sql_server::purify_source_exports(
1602 &*database,
1603 &mut client,
1604 &source_references,
1605 &requested_references,
1606 &text_columns,
1607 &exclude_columns,
1608 &unresolved_source_name,
1609 timeout,
1610 &SourceReferencePolicy::Required,
1611 )
1612 .await;
1613 let sql_server::PurifiedSourceExports {
1614 source_exports,
1615 normalized_text_columns,
1616 normalized_excl_columns,
1617 } = result?;
1618
1619 requested_subsource_map.extend(source_exports);
1621
1622 if let Some(text_cols_option) = options
1624 .iter_mut()
1625 .find(|option| option.name == AlterSourceAddSubsourceOptionName::TextColumns)
1626 {
1627 text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
1628 }
1629 if let Some(exclude_cols_option) = options
1630 .iter_mut()
1631 .find(|option| option.name == AlterSourceAddSubsourceOptionName::ExcludeColumns)
1632 {
1633 exclude_cols_option.value =
1634 Some(WithOptionValue::Sequence(normalized_excl_columns));
1635 }
1636 }
1637 _ => bail_internal!("source does not support ALTER SOURCE...ADD SUBSOURCE"),
1640 };
1641
1642 Ok(PurifiedStatement::PurifiedAlterSourceAddSubsources {
1643 source_name: resolved_source_name,
1644 options,
1645 subsources: requested_subsource_map,
1646 })
1647}
1648
1649async fn purify_alter_source_refresh_references(
1650 desc: SourceDesc,
1651 resolved_source_name: ResolvedItemName,
1652 storage_configuration: &StorageConfiguration,
1653) -> Result<PurifiedStatement, PlanError> {
1654 let retrieved_source_references = match desc.connection {
1655 GenericSourceConnection::Postgres(pg_source_connection) => {
1656 let pg_connection = &pg_source_connection.connection;
1658
1659 let config = pg_connection
1660 .config(
1661 &storage_configuration.connection_context.secrets_reader,
1662 storage_configuration,
1663 InTask::No,
1664 )
1665 .await?;
1666
1667 let client = config
1668 .connect(
1669 "postgres_purification",
1670 &storage_configuration.connection_context.ssh_tunnel_manager,
1671 )
1672 .await?;
1673 let reference_client = SourceReferenceClient::Postgres {
1674 client: &client,
1675 publication: &pg_source_connection.publication,
1676 database: &pg_connection.database,
1677 };
1678 reference_client.get_source_references().await?
1679 }
1680 GenericSourceConnection::MySql(mysql_source_connection) => {
1681 let mysql_connection = &mysql_source_connection.connection;
1682 let config = mysql_connection
1683 .config(
1684 &storage_configuration.connection_context.secrets_reader,
1685 storage_configuration,
1686 InTask::No,
1687 )
1688 .await?;
1689
1690 let mut conn = config
1691 .connect(
1692 "mysql purification",
1693 &storage_configuration.connection_context.ssh_tunnel_manager,
1694 )
1695 .await?;
1696
1697 let reference_client = SourceReferenceClient::MySql {
1698 conn: &mut conn,
1699 include_system_schemas: false,
1700 };
1701 reference_client.get_source_references().await?
1702 }
1703 GenericSourceConnection::SqlServer(sql_server_source) => {
1704 let sql_server_connection = &sql_server_source.connection;
1706 let config = sql_server_connection
1707 .resolve_config(
1708 &storage_configuration.connection_context.secrets_reader,
1709 storage_configuration,
1710 InTask::No,
1711 )
1712 .await?;
1713 let mut client = mz_sql_server_util::Client::connect(config).await?;
1714
1715 let source_references = SourceReferenceClient::SqlServer {
1717 client: &mut client,
1718 database: sql_server_connection.database.clone().into(),
1719 }
1720 .get_source_references()
1721 .await?;
1722 source_references
1723 }
1724 GenericSourceConnection::LoadGenerator(load_gen_connection) => {
1725 let reference_client = SourceReferenceClient::LoadGenerator {
1726 generator: &load_gen_connection.load_generator,
1727 };
1728 reference_client.get_source_references().await?
1729 }
1730 GenericSourceConnection::Kafka(kafka_conn) => {
1731 let reference_client = SourceReferenceClient::Kafka {
1732 topic: &kafka_conn.topic,
1733 };
1734 reference_client.get_source_references().await?
1735 }
1736 };
1737 Ok(PurifiedStatement::PurifiedAlterSourceRefreshReferences {
1738 source_name: resolved_source_name,
1739 available_source_references: retrieved_source_references.available_source_references(),
1740 })
1741}
1742
1743async fn purify_create_table_from_source(
1744 catalog: impl SessionCatalog,
1745 mut stmt: CreateTableFromSourceStatement<Aug>,
1746 storage_configuration: &StorageConfiguration,
1747) -> Result<PurifiedStatement, PlanError> {
1748 let scx = StatementContext::new(None, &catalog);
1749 let CreateTableFromSourceStatement {
1750 name: _,
1751 columns,
1752 constraints,
1753 source: source_name,
1754 if_not_exists: _,
1755 external_reference,
1756 format,
1757 envelope,
1758 include_metadata: _,
1759 with_options,
1760 } = &mut stmt;
1761
1762 if matches!(columns, TableFromSourceColumns::Defined(_)) {
1764 sql_bail!("CREATE TABLE .. FROM SOURCE column definitions cannot be specified directly");
1765 }
1766 if !constraints.is_empty() {
1767 sql_bail!(
1768 "CREATE TABLE .. FROM SOURCE constraint definitions cannot be specified directly"
1769 );
1770 }
1771
1772 let item = match scx.get_item_by_resolved_name(source_name) {
1774 Ok(item) => item,
1775 Err(e) => return Err(e),
1776 };
1777
1778 let desc = match item.source_desc()? {
1780 Some(desc) => desc.clone().into_inline_connection(scx.catalog),
1781 None => {
1782 sql_bail!("cannot ALTER this type of source")
1783 }
1784 };
1785 let unresolved_source_name: UnresolvedItemName = source_name.full_item_name().clone().into();
1786
1787 let crate::plan::statement::ddl::TableFromSourceOptionExtracted {
1788 text_columns,
1789 exclude_columns,
1790 retain_history: _,
1791 details,
1792 partition_by: _,
1793 seen: _,
1794 } = with_options.clone().try_into()?;
1795 if details.is_some() {
1796 sql_bail!("DETAILS option cannot be explicitly set");
1797 }
1798
1799 let qualified_text_columns = text_columns
1803 .iter()
1804 .map(|col| {
1805 UnresolvedItemName(
1806 external_reference
1807 .as_ref()
1808 .map(|er| er.0.iter().chain_one(col).map(|i| i.clone()).collect())
1809 .unwrap_or_else(|| vec![col.clone()]),
1810 )
1811 })
1812 .collect_vec();
1813 let qualified_exclude_columns = exclude_columns
1814 .iter()
1815 .map(|col| {
1816 UnresolvedItemName(
1817 external_reference
1818 .as_ref()
1819 .map(|er| er.0.iter().chain_one(col).map(|i| i.clone()).collect())
1820 .unwrap_or_else(|| vec![col.clone()]),
1821 )
1822 })
1823 .collect_vec();
1824
1825 let mut format_options = SourceFormatOptions::Default;
1827
1828 let retrieved_source_references: RetrievedSourceReferences;
1829
1830 let requested_references = external_reference.as_ref().map(|ref_name| {
1831 ExternalReferences::SubsetTables(vec![ExternalReferenceExport {
1832 reference: ref_name.clone(),
1833 alias: None,
1834 }])
1835 });
1836
1837 let purified_export = match desc.connection {
1840 GenericSourceConnection::Postgres(pg_source_connection) => {
1841 let pg_connection = &pg_source_connection.connection;
1843
1844 let client = pg_connection
1845 .validate(pg_source_connection.connection_id, storage_configuration)
1846 .await?;
1847
1848 let reference_client = SourceReferenceClient::Postgres {
1849 client: &client,
1850 publication: &pg_source_connection.publication,
1851 database: &pg_connection.database,
1852 };
1853 retrieved_source_references = reference_client.get_source_references().await?;
1854
1855 let postgres::PurifiedSourceExports {
1856 source_exports,
1857 normalized_text_columns: _,
1861 } = postgres::purify_source_exports(
1862 &client,
1863 &retrieved_source_references,
1864 &requested_references,
1865 qualified_text_columns,
1866 qualified_exclude_columns,
1867 &unresolved_source_name,
1868 &SourceReferencePolicy::Required,
1869 )
1870 .await?;
1871 let (_, purified_export) = source_exports.into_element();
1873 purified_export
1874 }
1875 GenericSourceConnection::MySql(mysql_source_connection) => {
1876 let mysql_connection = &mysql_source_connection.connection;
1877 let config = mysql_connection
1878 .config(
1879 &storage_configuration.connection_context.secrets_reader,
1880 storage_configuration,
1881 InTask::No,
1882 )
1883 .await?;
1884
1885 let mut conn = config
1886 .connect(
1887 "mysql purification",
1888 &storage_configuration.connection_context.ssh_tunnel_manager,
1889 )
1890 .await?;
1891
1892 ensure_binlog_full_metadata(&mut conn).await?;
1893 let binlog_full_metadata = true;
1894
1895 let initial_gtid_set =
1898 mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?;
1899
1900 let reference_client = SourceReferenceClient::MySql {
1901 conn: &mut conn,
1902 include_system_schemas: mysql::references_system_schemas(&requested_references),
1903 };
1904 retrieved_source_references = reference_client.get_source_references().await?;
1905
1906 let mysql::PurifiedSourceExports {
1907 source_exports,
1908 normalized_text_columns: _,
1912 normalized_exclude_columns: _,
1913 } = mysql::purify_source_exports(
1914 &mut conn,
1915 &retrieved_source_references,
1916 &requested_references,
1917 qualified_text_columns,
1918 qualified_exclude_columns,
1919 &unresolved_source_name,
1920 initial_gtid_set,
1921 &SourceReferencePolicy::Required,
1922 binlog_full_metadata,
1923 )
1924 .await?;
1925 let (_, purified_export) = source_exports.into_element();
1927 purified_export
1928 }
1929 GenericSourceConnection::SqlServer(sql_server_source) => {
1930 let connection = sql_server_source.connection;
1931 let config = connection
1932 .resolve_config(
1933 &storage_configuration.connection_context.secrets_reader,
1934 storage_configuration,
1935 InTask::No,
1936 )
1937 .await?;
1938 let mut client = mz_sql_server_util::Client::connect(config).await?;
1939
1940 let database: Arc<str> = connection.database.into();
1941 let reference_client = SourceReferenceClient::SqlServer {
1942 client: &mut client,
1943 database: Arc::clone(&database),
1944 };
1945 retrieved_source_references = reference_client.get_source_references().await?;
1946 tracing::debug!(?retrieved_source_references, "got source references");
1947
1948 let timeout = mz_storage_types::sources::sql_server::MAX_LSN_WAIT
1949 .get(storage_configuration.config_set());
1950
1951 let purified_source_exports = sql_server::purify_source_exports(
1952 &*database,
1953 &mut client,
1954 &retrieved_source_references,
1955 &requested_references,
1956 &qualified_text_columns,
1957 &qualified_exclude_columns,
1958 &unresolved_source_name,
1959 timeout,
1960 &SourceReferencePolicy::Required,
1961 )
1962 .await?;
1963
1964 let (_, purified_export) = purified_source_exports.source_exports.into_element();
1966 purified_export
1967 }
1968 GenericSourceConnection::LoadGenerator(load_gen_connection) => {
1969 let reference_client = SourceReferenceClient::LoadGenerator {
1970 generator: &load_gen_connection.load_generator,
1971 };
1972 retrieved_source_references = reference_client.get_source_references().await?;
1973
1974 let requested_exports = retrieved_source_references
1975 .requested_source_exports(requested_references.as_ref(), &unresolved_source_name)?;
1976 let export = requested_exports.into_element();
1978 PurifiedSourceExport {
1979 external_reference: export.external_reference,
1980 details: PurifiedExportDetails::LoadGenerator {
1981 table: export
1982 .meta
1983 .load_generator_desc()
1984 .ok_or_else(|| internal_err!("expected load generator source reference"))?
1985 .clone(),
1986 output: export
1987 .meta
1988 .load_generator_output()
1989 .ok_or_else(|| internal_err!("expected load generator source reference"))?
1990 .clone(),
1991 },
1992 }
1993 }
1994 GenericSourceConnection::Kafka(kafka_conn) => {
1995 let reference_client = SourceReferenceClient::Kafka {
1996 topic: &kafka_conn.topic,
1997 };
1998 retrieved_source_references = reference_client.get_source_references().await?;
1999 let requested_exports = retrieved_source_references
2000 .requested_source_exports(requested_references.as_ref(), &unresolved_source_name)?;
2001 let export = requested_exports.into_element();
2003
2004 format_options = SourceFormatOptions::Kafka {
2005 topic: kafka_conn.topic.clone(),
2006 };
2007 PurifiedSourceExport {
2008 external_reference: export.external_reference,
2009 details: PurifiedExportDetails::Kafka {},
2010 }
2011 }
2012 };
2013
2014 purify_source_format(
2015 &catalog,
2016 format,
2017 &format_options,
2018 envelope,
2019 storage_configuration,
2020 )
2021 .await?;
2022
2023 *external_reference = Some(purified_export.external_reference.clone());
2026
2027 match &purified_export.details {
2029 PurifiedExportDetails::Postgres { .. } => {
2030 let mut unsupported_cols = vec![];
2031 let postgres::PostgresExportStatementValues {
2032 columns: gen_columns,
2033 constraints: gen_constraints,
2034 text_columns: gen_text_columns,
2035 exclude_columns: gen_exclude_columns,
2036 details: gen_details,
2037 external_reference: _,
2038 } = postgres::generate_source_export_statement_values(
2039 &scx,
2040 purified_export,
2041 &mut unsupported_cols,
2042 )?;
2043 if !unsupported_cols.is_empty() {
2044 unsupported_cols.sort();
2045 Err(PgSourcePurificationError::UnrecognizedTypes {
2046 cols: unsupported_cols,
2047 })?;
2048 }
2049
2050 if let Some(text_cols_option) = with_options
2051 .iter_mut()
2052 .find(|option| option.name == TableFromSourceOptionName::TextColumns)
2053 {
2054 match gen_text_columns {
2055 Some(gen_text_columns) => {
2056 text_cols_option.value = Some(WithOptionValue::Sequence(gen_text_columns))
2057 }
2058 None => soft_panic_or_log!(
2059 "text_columns should be Some if text_cols_option is present"
2060 ),
2061 }
2062 }
2063 if let Some(exclude_cols_option) = with_options
2064 .iter_mut()
2065 .find(|option| option.name == TableFromSourceOptionName::ExcludeColumns)
2066 {
2067 match gen_exclude_columns {
2068 Some(gen_exclude_columns) => {
2069 exclude_cols_option.value =
2070 Some(WithOptionValue::Sequence(gen_exclude_columns))
2071 }
2072 None => soft_panic_or_log!(
2073 "exclude_columns should be Some if exclude_cols_option is present"
2074 ),
2075 }
2076 }
2077 match columns {
2078 TableFromSourceColumns::Defined(_) => {
2079 bail_internal!(
2082 "column definitions cannot be explicitly set for this source type"
2083 )
2084 }
2085 TableFromSourceColumns::NotSpecified => {
2086 *columns = TableFromSourceColumns::Defined(gen_columns);
2087 *constraints = gen_constraints;
2088 }
2089 TableFromSourceColumns::Named(_) => {
2090 sql_bail!("columns cannot be named for Postgres sources")
2091 }
2092 }
2093 with_options.push(TableFromSourceOption {
2094 name: TableFromSourceOptionName::Details,
2095 value: Some(WithOptionValue::Value(Value::String(hex::encode(
2096 gen_details.into_proto().encode_to_vec(),
2097 )))),
2098 })
2099 }
2100 PurifiedExportDetails::MySql { .. } => {
2101 let mysql::MySqlExportStatementValues {
2102 columns: gen_columns,
2103 constraints: gen_constraints,
2104 text_columns: gen_text_columns,
2105 exclude_columns: gen_exclude_columns,
2106 details: gen_details,
2107 external_reference: _,
2108 } = mysql::generate_source_export_statement_values(&scx, purified_export)?;
2109
2110 if let Some(text_cols_option) = with_options
2111 .iter_mut()
2112 .find(|option| option.name == TableFromSourceOptionName::TextColumns)
2113 {
2114 match gen_text_columns {
2115 Some(gen_text_columns) => {
2116 text_cols_option.value = Some(WithOptionValue::Sequence(gen_text_columns))
2117 }
2118 None => soft_panic_or_log!(
2119 "text_columns should be Some if text_cols_option is present"
2120 ),
2121 }
2122 }
2123 if let Some(exclude_cols_option) = with_options
2124 .iter_mut()
2125 .find(|option| option.name == TableFromSourceOptionName::ExcludeColumns)
2126 {
2127 match gen_exclude_columns {
2128 Some(gen_exclude_columns) => {
2129 exclude_cols_option.value =
2130 Some(WithOptionValue::Sequence(gen_exclude_columns))
2131 }
2132 None => soft_panic_or_log!(
2133 "exclude_columns should be Some if exclude_cols_option is present"
2134 ),
2135 }
2136 }
2137 match columns {
2138 TableFromSourceColumns::Defined(_) => {
2139 bail_internal!(
2142 "column definitions cannot be explicitly set for this source type"
2143 )
2144 }
2145 TableFromSourceColumns::NotSpecified => {
2146 *columns = TableFromSourceColumns::Defined(gen_columns);
2147 *constraints = gen_constraints;
2148 }
2149 TableFromSourceColumns::Named(_) => {
2150 sql_bail!("columns cannot be named for MySQL sources")
2151 }
2152 }
2153 with_options.push(TableFromSourceOption {
2154 name: TableFromSourceOptionName::Details,
2155 value: Some(WithOptionValue::Value(Value::String(hex::encode(
2156 gen_details.into_proto().encode_to_vec(),
2157 )))),
2158 })
2159 }
2160 PurifiedExportDetails::SqlServer { .. } => {
2161 let sql_server::SqlServerExportStatementValues {
2162 columns: gen_columns,
2163 constraints: gen_constraints,
2164 text_columns: gen_text_columns,
2165 excl_columns: gen_excl_columns,
2166 details: gen_details,
2167 external_reference: _,
2168 } = sql_server::generate_source_export_statement_values(&scx, purified_export)?;
2169
2170 if let Some(text_cols_option) = with_options
2171 .iter_mut()
2172 .find(|opt| opt.name == TableFromSourceOptionName::TextColumns)
2173 {
2174 match gen_text_columns {
2175 Some(gen_text_columns) => {
2176 text_cols_option.value = Some(WithOptionValue::Sequence(gen_text_columns))
2177 }
2178 None => soft_panic_or_log!(
2179 "text_columns should be Some if text_cols_option is present"
2180 ),
2181 }
2182 }
2183 if let Some(exclude_cols_option) = with_options
2184 .iter_mut()
2185 .find(|opt| opt.name == TableFromSourceOptionName::ExcludeColumns)
2186 {
2187 match gen_excl_columns {
2188 Some(gen_excl_columns) => {
2189 exclude_cols_option.value =
2190 Some(WithOptionValue::Sequence(gen_excl_columns))
2191 }
2192 None => soft_panic_or_log!(
2193 "excl_columns should be Some if excl_cols_option is present"
2194 ),
2195 }
2196 }
2197
2198 match columns {
2199 TableFromSourceColumns::NotSpecified => {
2200 *columns = TableFromSourceColumns::Defined(gen_columns);
2201 *constraints = gen_constraints;
2202 }
2203 TableFromSourceColumns::Named(_) => {
2204 sql_bail!("columns cannot be named for SQL Server sources")
2205 }
2206 TableFromSourceColumns::Defined(_) => {
2207 bail_internal!(
2210 "column definitions cannot be explicitly set for this source type"
2211 )
2212 }
2213 }
2214
2215 with_options.push(TableFromSourceOption {
2216 name: TableFromSourceOptionName::Details,
2217 value: Some(WithOptionValue::Value(Value::String(hex::encode(
2218 gen_details.into_proto().encode_to_vec(),
2219 )))),
2220 })
2221 }
2222 PurifiedExportDetails::LoadGenerator { .. } => {
2223 let (desc, output) = match purified_export.details {
2224 PurifiedExportDetails::LoadGenerator { table, output } => (table, output),
2225 _ => bail_internal!("purified export details must be load generator"),
2226 };
2227 if let Some(desc) = desc {
2232 let (gen_columns, gen_constraints) = scx.relation_desc_into_table_defs(&desc)?;
2233 match columns {
2234 TableFromSourceColumns::Defined(_) => bail_internal!(
2237 "column definitions cannot be explicitly set for this source type"
2238 ),
2239 TableFromSourceColumns::NotSpecified => {
2240 *columns = TableFromSourceColumns::Defined(gen_columns);
2241 *constraints = gen_constraints;
2242 }
2243 TableFromSourceColumns::Named(_) => {
2244 sql_bail!("columns cannot be named for multi-output load generator sources")
2245 }
2246 }
2247 }
2248 let details = SourceExportStatementDetails::LoadGenerator { output };
2249 with_options.push(TableFromSourceOption {
2250 name: TableFromSourceOptionName::Details,
2251 value: Some(WithOptionValue::Value(Value::String(hex::encode(
2252 details.into_proto().encode_to_vec(),
2253 )))),
2254 })
2255 }
2256 PurifiedExportDetails::Kafka {} => {
2257 let details = SourceExportStatementDetails::Kafka {};
2261 with_options.push(TableFromSourceOption {
2262 name: TableFromSourceOptionName::Details,
2263 value: Some(WithOptionValue::Value(Value::String(hex::encode(
2264 details.into_proto().encode_to_vec(),
2265 )))),
2266 })
2267 }
2268 };
2269
2270 Ok(PurifiedStatement::PurifiedCreateTableFromSource { stmt })
2274}
2275
/// Source-type-specific context needed while purifying a source's `FORMAT` clause.
enum SourceFormatOptions {
    /// No source-specific context. Confluent Schema Registry formats and `KEY ... VALUE ...`
    /// formats are rejected during purification when this variant is supplied.
    Default,
    /// A Kafka source; `topic` is used to derive the schema registry subjects
    /// `<topic>-key` and `<topic>-value`.
    Kafka { topic: String },
}
2280
2281async fn purify_source_format(
2282 catalog: &dyn SessionCatalog,
2283 format: &mut Option<FormatSpecifier<Aug>>,
2284 options: &SourceFormatOptions,
2285 envelope: &Option<SourceEnvelope>,
2286 storage_configuration: &StorageConfiguration,
2287) -> Result<(), PlanError> {
2288 if matches!(format, Some(FormatSpecifier::KeyValue { .. }))
2289 && !matches!(options, SourceFormatOptions::Kafka { .. })
2290 {
2291 sql_bail!("Kafka sources are the only source type that can provide KEY/VALUE formats")
2292 }
2293
2294 match format.as_mut() {
2295 None => {}
2296 Some(FormatSpecifier::Bare(format)) => {
2297 purify_source_format_single(catalog, format, options, envelope, storage_configuration)
2298 .await?;
2299 }
2300
2301 Some(FormatSpecifier::KeyValue { key, value: val }) => {
2302 purify_source_format_single(catalog, key, options, envelope, storage_configuration)
2303 .await?;
2304 purify_source_format_single(catalog, val, options, envelope, storage_configuration)
2305 .await?;
2306 }
2307 }
2308 Ok(())
2309}
2310
2311async fn purify_source_format_single(
2312 catalog: &dyn SessionCatalog,
2313 format: &mut Format<Aug>,
2314 options: &SourceFormatOptions,
2315 envelope: &Option<SourceEnvelope>,
2316 storage_configuration: &StorageConfiguration,
2317) -> Result<(), PlanError> {
2318 match format {
2319 Format::Avro(schema) => match schema {
2320 AvroSchema::Csr { csr_connection } => {
2321 purify_csr_connection_avro(
2322 catalog,
2323 options,
2324 csr_connection,
2325 envelope,
2326 storage_configuration,
2327 )
2328 .await?
2329 }
2330 AvroSchema::InlineSchema { .. } => {}
2331 },
2332 Format::Protobuf(schema) => match schema {
2333 ProtobufSchema::Csr { csr_connection } => {
2334 purify_csr_connection_proto(
2335 catalog,
2336 options,
2337 csr_connection,
2338 envelope,
2339 storage_configuration,
2340 )
2341 .await?;
2342 }
2343 ProtobufSchema::InlineSchema { .. } => {}
2344 },
2345 Format::Bytes
2346 | Format::Regex(_)
2347 | Format::Json { .. }
2348 | Format::Text
2349 | Format::Csv { .. } => (),
2350 }
2351 Ok(())
2352}
2353
2354pub fn generate_subsource_statements(
2355 scx: &StatementContext,
2356 source_name: ResolvedItemName,
2357 subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
2358) -> Result<Vec<CreateSubsourceStatement<Aug>>, PlanError> {
2359 if subsources.is_empty() {
2361 return Ok(vec![]);
2362 }
2363 let (_, purified_export) = subsources
2364 .iter()
2365 .next()
2366 .ok_or_else(|| internal_err!("expected at least one subsource"))?;
2367
2368 let statements = match &purified_export.details {
2369 PurifiedExportDetails::Postgres { .. } => {
2370 crate::pure::postgres::generate_create_subsource_statements(
2371 scx,
2372 source_name,
2373 subsources,
2374 )?
2375 }
2376 PurifiedExportDetails::MySql { .. } => {
2377 crate::pure::mysql::generate_create_subsource_statements(scx, source_name, subsources)?
2378 }
2379 PurifiedExportDetails::SqlServer { .. } => {
2380 crate::pure::sql_server::generate_create_subsource_statements(
2381 scx,
2382 source_name,
2383 subsources,
2384 )?
2385 }
2386 PurifiedExportDetails::LoadGenerator { .. } => {
2387 let mut subsource_stmts = Vec::with_capacity(subsources.len());
2388 for (subsource_name, purified_export) in subsources {
2389 let (desc, output) = match purified_export.details {
2390 PurifiedExportDetails::LoadGenerator { table, output } => (table, output),
2391 _ => {
2392 bail_internal!("purified export details must be load generator")
2393 }
2394 };
2395 let desc = desc.ok_or_else(|| {
2396 internal_err!(
2397 "subsources cannot be generated for single-output load generators"
2398 )
2399 })?;
2400
2401 let (columns, table_constraints) = scx.relation_desc_into_table_defs(&desc)?;
2402 let details = SourceExportStatementDetails::LoadGenerator { output };
2403 let subsource = CreateSubsourceStatement {
2405 name: subsource_name,
2406 columns,
2407 of_source: Some(source_name.clone()),
2408 constraints: table_constraints,
2413 if_not_exists: false,
2414 with_options: vec![
2415 CreateSubsourceOption {
2416 name: CreateSubsourceOptionName::ExternalReference,
2417 value: Some(WithOptionValue::UnresolvedItemName(
2418 purified_export.external_reference,
2419 )),
2420 },
2421 CreateSubsourceOption {
2422 name: CreateSubsourceOptionName::Details,
2423 value: Some(WithOptionValue::Value(Value::String(hex::encode(
2424 details.into_proto().encode_to_vec(),
2425 )))),
2426 },
2427 ],
2428 };
2429 subsource_stmts.push(subsource);
2430 }
2431
2432 subsource_stmts
2433 }
2434 PurifiedExportDetails::Kafka { .. } => {
2435 if !subsources.is_empty() {
2439 bail_internal!("Kafka sources do not produce data-bearing subsources");
2440 }
2441 vec![]
2442 }
2443 };
2444 Ok(statements)
2445}
2446
2447async fn purify_csr_connection_proto(
2448 catalog: &dyn SessionCatalog,
2449 options: &SourceFormatOptions,
2450 csr_connection: &mut CsrConnectionProtobuf<Aug>,
2451 envelope: &Option<SourceEnvelope>,
2452 storage_configuration: &StorageConfiguration,
2453) -> Result<(), PlanError> {
2454 let SourceFormatOptions::Kafka { topic } = options else {
2455 sql_bail!("Confluent Schema Registry is only supported with Kafka sources")
2456 };
2457
2458 let CsrConnectionProtobuf {
2459 seed,
2460 connection: CsrConnection {
2461 connection,
2462 options: _,
2463 },
2464 } = csr_connection;
2465 match seed {
2466 None => {
2467 let scx = StatementContext::new(None, &*catalog);
2468
2469 let ccsr_connection = match scx.get_item_by_resolved_name(connection)?.connection()? {
2470 Connection::Csr(connection) => connection.clone().into_inline_connection(catalog),
2471 _ => sql_bail!("{} is not a schema registry connection", connection),
2472 };
2473
2474 let ccsr_client = ccsr_connection
2475 .connect(storage_configuration, InTask::No)
2476 .await
2477 .map_err(|e| CsrPurificationError::ClientError(Arc::new(e)))?;
2478
2479 let value = compile_proto(&format!("{}-value", topic), &ccsr_client).await?;
2480 let key = compile_proto(&format!("{}-key", topic), &ccsr_client)
2481 .await
2482 .ok();
2483
2484 if matches!(envelope, Some(SourceEnvelope::Debezium)) && key.is_none() {
2485 sql_bail!("Key schema is required for ENVELOPE DEBEZIUM");
2486 }
2487
2488 *seed = Some(CsrSeedProtobuf { value, key });
2489 }
2490 Some(_) => (),
2491 }
2492
2493 Ok(())
2494}
2495
2496async fn purify_csr_connection_avro(
2497 catalog: &dyn SessionCatalog,
2498 options: &SourceFormatOptions,
2499 csr_connection: &mut CsrConnectionAvro<Aug>,
2500 envelope: &Option<SourceEnvelope>,
2501 storage_configuration: &StorageConfiguration,
2502) -> Result<(), PlanError> {
2503 let SourceFormatOptions::Kafka { topic } = options else {
2504 sql_bail!("Confluent Schema Registry is only supported with Kafka sources")
2505 };
2506
2507 let CsrConnectionAvro {
2508 connection: CsrConnection { connection, .. },
2509 seed,
2510 key_strategy,
2511 value_strategy,
2512 } = csr_connection;
2513 if seed.is_none() {
2514 let scx = StatementContext::new(None, &*catalog);
2515 let csr_connection = match scx.get_item_by_resolved_name(connection)?.connection()? {
2516 Connection::Csr(connection) => connection.clone().into_inline_connection(catalog),
2517 _ => sql_bail!("{} is not a schema registry connection", connection),
2518 };
2519 let ccsr_client = csr_connection
2520 .connect(storage_configuration, InTask::No)
2521 .await
2522 .map_err(|e| CsrPurificationError::ClientError(Arc::new(e)))?;
2523
2524 let Schema {
2525 key_schema,
2526 value_schema,
2527 key_reference_schemas,
2528 value_reference_schemas,
2529 } = get_remote_csr_schema(
2530 &ccsr_client,
2531 key_strategy.clone().unwrap_or_default(),
2532 value_strategy.clone().unwrap_or_default(),
2533 topic,
2534 )
2535 .await?;
2536 if matches!(envelope, Some(SourceEnvelope::Debezium)) && key_schema.is_none() {
2537 sql_bail!("Key schema is required for ENVELOPE DEBEZIUM");
2538 }
2539
2540 *seed = Some(CsrSeedAvro {
2541 key_schema,
2542 value_schema,
2543 key_reference_schemas,
2544 value_reference_schemas,
2545 })
2546 }
2547
2548 Ok(())
2549}
2550
/// Raw schema texts fetched from a Confluent Schema Registry for one topic's key and value
/// (as assembled by `get_remote_csr_schema`).
#[derive(Debug)]
pub struct Schema {
    /// Key schema text, if a key schema was found.
    pub key_schema: Option<String>,
    /// Value schema text.
    pub value_schema: String,
    /// Raw texts of the schemas referenced by the key schema (empty when there is no key
    /// schema).
    pub key_reference_schemas: Vec<String>,
    /// Raw texts of the schemas referenced by the value schema.
    pub value_reference_schemas: Vec<String>,
}
2560
/// A schema's raw text together with the raw texts of the schemas it references.
struct SchemaWithReferences {
    /// Raw text of the schema itself.
    schema: String,
    /// Raw texts of the referenced schemas; empty for inline schemas.
    references: Vec<String>,
}
2568
2569async fn get_schema_with_strategy(
2570 client: &Client,
2571 strategy: ReaderSchemaSelectionStrategy,
2572 subject: &str,
2573) -> Result<Option<SchemaWithReferences>, PlanError> {
2574 match strategy {
2575 ReaderSchemaSelectionStrategy::Latest => {
2576 match client.get_subject_and_references(subject).await {
2578 Ok((primary, dependencies)) => Ok(Some(SchemaWithReferences {
2579 schema: primary.schema.raw,
2580 references: dependencies.into_iter().map(|s| s.schema.raw).collect(),
2581 })),
2582 Err(GetBySubjectError::SubjectNotFound)
2583 | Err(GetBySubjectError::VersionNotFound(_)) => Ok(None),
2584 Err(e) => Err(PlanError::FetchingCsrSchemaFailed {
2585 schema_lookup: format!("subject {}", subject.quoted()),
2586 cause: Arc::new(e),
2587 }),
2588 }
2589 }
2590 ReaderSchemaSelectionStrategy::Inline(raw) => Ok(Some(SchemaWithReferences {
2594 schema: raw,
2595 references: vec![],
2596 })),
2597 ReaderSchemaSelectionStrategy::ById(id) => {
2598 match client.get_subject_and_references_by_id(id).await {
2599 Ok((primary, dependencies)) => Ok(Some(SchemaWithReferences {
2600 schema: primary.schema.raw,
2601 references: dependencies.into_iter().map(|s| s.schema.raw).collect(),
2602 })),
2603 Err(GetBySubjectError::SubjectNotFound)
2604 | Err(GetBySubjectError::VersionNotFound(_)) => Ok(None),
2605 Err(e) => Err(PlanError::FetchingCsrSchemaFailed {
2606 schema_lookup: format!("subject {}", subject.quoted()),
2607 cause: Arc::new(e),
2608 }),
2609 }
2610 }
2611 }
2612}
2613
2614async fn get_remote_csr_schema(
2615 ccsr_client: &mz_ccsr::Client,
2616 key_strategy: ReaderSchemaSelectionStrategy,
2617 value_strategy: ReaderSchemaSelectionStrategy,
2618 topic: &str,
2619) -> Result<Schema, PlanError> {
2620 let value_schema_name = format!("{}-value", topic);
2621 let value_result =
2622 get_schema_with_strategy(ccsr_client, value_strategy, &value_schema_name).await?;
2623 let value_result = value_result.ok_or_else(|| anyhow!("No value schema found"))?;
2624
2625 let key_subject = format!("{}-key", topic);
2626 let key_result = get_schema_with_strategy(ccsr_client, key_strategy, &key_subject).await?;
2627 Ok(Schema {
2628 key_schema: key_result.as_ref().map(|r| r.schema.clone()),
2629 value_schema: value_result.schema,
2630 key_reference_schemas: key_result.map(|r| r.references).unwrap_or_default(),
2631 value_reference_schemas: value_result.references,
2632 })
2633}
2634
2635async fn compile_proto(
2637 subject_name: &String,
2638 ccsr_client: &Client,
2639) -> Result<CsrSeedProtobufSchema, PlanError> {
2640 let (primary_subject, dependency_subjects) = ccsr_client
2641 .get_subject_and_references(subject_name)
2642 .await
2643 .map_err(|e| PlanError::FetchingCsrSchemaFailed {
2644 schema_lookup: format!("subject {}", subject_name.quoted()),
2645 cause: Arc::new(e),
2646 })?;
2647
2648 let mut source_tree = VirtualSourceTree::new();
2650
2651 source_tree.as_mut().map_well_known_types();
2655
2656 for subject in iter::once(&primary_subject).chain(dependency_subjects.iter()) {
2657 source_tree.as_mut().add_file(
2658 Path::new(&subject.name),
2659 subject.schema.raw.as_bytes().to_vec(),
2660 );
2661 }
2662 let mut db = SourceTreeDescriptorDatabase::new(source_tree.as_mut());
2663 let fds = db
2664 .as_mut()
2665 .build_file_descriptor_set(&[Path::new(&primary_subject.name)])
2666 .map_err(|cause| PlanError::InvalidProtobufSchema { cause })?;
2667
2668 let primary_fd = fds.file(0);
2670 let message_name = match primary_fd.message_type_size() {
2671 1 => String::from_utf8_lossy(primary_fd.message_type(0).name()).into_owned(),
2672 0 => bail_unsupported!(29603, "Protobuf schemas with no messages"),
2673 _ => bail_unsupported!(29603, "Protobuf schemas with multiple messages"),
2674 };
2675
2676 let bytes = &fds
2678 .serialize()
2679 .map_err(|cause| PlanError::InvalidProtobufSchema { cause })?;
2680 let mut schema = String::new();
2681 strconv::format_bytes(&mut schema, bytes);
2682
2683 Ok(CsrSeedProtobufSchema {
2684 schema,
2685 message_name,
2686 })
2687}
2688
/// Item name of the `mz_now()` function; also used to recognize calls to it in expressions.
const MZ_NOW_NAME: &str = "mz_now";
/// Schema in which `mz_now()` is resolved when purifying materialized view options.
const MZ_NOW_SCHEMA: &str = "mz_catalog";
2691
/// Purifies the `WITH` options of a `CREATE MATERIALIZED VIEW` statement in place, and keeps
/// `resolved_ids` consistent with the rewritten statement.
///
/// `mz_now` is the timestamp chosen for the statement. It replaces every `mz_now()` call
/// inside `REFRESH AT` and `REFRESH EVERY ... ALIGNED TO` expressions.
///
/// # Panics
///
/// Panics if `mz_catalog.mz_now` cannot be resolved in `catalog`. Also panics if a `REFRESH`
/// expression contains `mz_now()` (including one inserted by this function) while `mz_now`
/// is `None`.
pub fn purify_create_materialized_view_options(
    catalog: impl SessionCatalog,
    mz_now: Option<Timestamp>,
    cmvs: &mut CreateMaterializedViewStatement<Aug>,
    resolved_ids: &mut ResolvedIds,
) {
    // 0. Preparations.
    // Build a call expression for `mz_now()` that the rewrites below can insert. Also remember
    // the function's id, so it can be dropped from `resolved_ids` if it ends up unused.
    let (mz_now_id, mz_now_expr) = {
        let item = catalog
            .resolve_function(&PartialItemName {
                database: None,
                schema: Some(MZ_NOW_SCHEMA.to_string()),
                item: MZ_NOW_NAME.to_string(),
            })
            .expect("we should be able to resolve mz_now");
        (
            item.id(),
            Expr::Function(Function {
                name: ResolvedItemName::Item {
                    id: item.id(),
                    qualifiers: item.name().qualifiers.clone(),
                    full_name: catalog.resolve_full_name(item.name()),
                    print_id: false,
                    version: RelationVersionSelector::Latest,
                },
                args: FunctionArgs::Args {
                    args: Vec::new(),
                    order_by: Vec::new(),
                },
                filter: None,
                over: None,
                distinct: false,
            }),
        )
    };
    // Resolve the `mz_timestamp` type. It is the cast target when `mz_now()` is replaced by a
    // concrete timestamp literal.
    let (mz_timestamp_id, mz_timestamp_type) = {
        let item = catalog.get_system_type("mz_timestamp");
        let full_name = catalog.resolve_full_name(item.name());
        (
            item.id(),
            ResolvedDataType::Named {
                id: item.id(),
                qualifiers: item.name().qualifiers.clone(),
                full_name,
                modifiers: vec![],
                print_id: true,
            },
        )
    };

    // Becomes true if any substitution below introduced a reference to `mz_timestamp`.
    let mut introduced_mz_timestamp = false;

    for option in cmvs.with_options.iter_mut() {
        // 1. `REFRESH AT CREATION` becomes `REFRESH AT mz_now()`.
        if matches!(
            option.value,
            Some(WithOptionValue::Refresh(RefreshOptionValue::AtCreation))
        ) {
            option.value = Some(WithOptionValue::Refresh(RefreshOptionValue::At(
                RefreshAtOptionValue {
                    time: mz_now_expr.clone(),
                },
            )));
        }

        // 2. A `REFRESH EVERY` without `ALIGNED TO` gets `ALIGNED TO mz_now()`.
        if let Some(WithOptionValue::Refresh(RefreshOptionValue::Every(
            RefreshEveryOptionValue { aligned_to, .. },
        ))) = &mut option.value
        {
            if aligned_to.is_none() {
                *aligned_to = Some(mz_now_expr.clone());
            }
        }

        // 3. Replace `mz_now()` in the refresh expressions with the chosen timestamp. This must
        //    run after steps 1 and 2, which may have just inserted `mz_now()`.
        match &mut option.value {
            Some(WithOptionValue::Refresh(RefreshOptionValue::At(RefreshAtOptionValue {
                time,
            }))) => {
                let mut visitor = MzNowPurifierVisitor::new(mz_now, mz_timestamp_type.clone());
                visitor.visit_expr_mut(time);
                introduced_mz_timestamp |= visitor.introduced_mz_timestamp;
            }
            Some(WithOptionValue::Refresh(RefreshOptionValue::Every(
                RefreshEveryOptionValue {
                    interval: _,
                    aligned_to: Some(aligned_to),
                },
            ))) => {
                let mut visitor = MzNowPurifierVisitor::new(mz_now, mz_timestamp_type.clone());
                visitor.visit_expr_mut(aligned_to);
                introduced_mz_timestamp |= visitor.introduced_mz_timestamp;
            }
            _ => {}
        }
    }

    // 4. With no REFRESH option present at all, default to `REFRESH ON COMMIT`.
    if !cmvs.with_options.iter().any(|o| {
        matches!(
            o,
            MaterializedViewOption {
                value: Some(WithOptionValue::Refresh(..)),
                ..
            }
        )
    }) {
        cmvs.with_options.push(MaterializedViewOption {
            name: MaterializedViewOptionName::Refresh,
            value: Some(WithOptionValue::Refresh(RefreshOptionValue::OnCommit)),
        })
    }

    // 5. Keep `resolved_ids` accurate. Step 3 may have added references to `mz_timestamp`
    //    and removed references to `mz_now`.
    if introduced_mz_timestamp {
        resolved_ids.add_item(mz_timestamp_id);
    }
    // `mz_now()` may still appear elsewhere in the statement, outside the rewritten options.
    // Scan the whole statement before dropping `mz_now` from `resolved_ids`.
    let mut visitor = ExprContainsTemporalVisitor::new();
    visitor.visit_create_materialized_view_statement(cmvs);
    if !visitor.contains_temporal {
        resolved_ids.remove_item(&mz_now_id);
    }
}
2829
2830pub fn materialized_view_option_contains_temporal(mvo: &MaterializedViewOption<Aug>) -> bool {
2833 match &mvo.value {
2834 Some(WithOptionValue::Refresh(RefreshOptionValue::At(RefreshAtOptionValue { time }))) => {
2835 let mut visitor = ExprContainsTemporalVisitor::new();
2836 visitor.visit_expr(time);
2837 visitor.contains_temporal
2838 }
2839 Some(WithOptionValue::Refresh(RefreshOptionValue::Every(RefreshEveryOptionValue {
2840 interval: _,
2841 aligned_to: Some(aligned_to),
2842 }))) => {
2843 let mut visitor = ExprContainsTemporalVisitor::new();
2844 visitor.visit_expr(aligned_to);
2845 visitor.contains_temporal
2846 }
2847 Some(WithOptionValue::Refresh(RefreshOptionValue::Every(RefreshEveryOptionValue {
2848 interval: _,
2849 aligned_to: None,
2850 }))) => {
2851 true
2854 }
2855 Some(WithOptionValue::Refresh(RefreshOptionValue::AtCreation)) => {
2856 true
2858 }
2859 _ => false,
2860 }
2861}
2862
2863struct ExprContainsTemporalVisitor {
2865 pub contains_temporal: bool,
2866}
2867
2868impl ExprContainsTemporalVisitor {
2869 pub fn new() -> ExprContainsTemporalVisitor {
2870 ExprContainsTemporalVisitor {
2871 contains_temporal: false,
2872 }
2873 }
2874}
2875
2876impl Visit<'_, Aug> for ExprContainsTemporalVisitor {
2877 fn visit_function(&mut self, func: &Function<Aug>) {
2878 self.contains_temporal |= func.name.full_item_name().item == MZ_NOW_NAME;
2879 visit_function(self, func);
2880 }
2881}
2882
2883struct MzNowPurifierVisitor {
2884 pub mz_now: Option<Timestamp>,
2885 pub mz_timestamp_type: ResolvedDataType,
2886 pub introduced_mz_timestamp: bool,
2887}
2888
2889impl MzNowPurifierVisitor {
2890 pub fn new(
2891 mz_now: Option<Timestamp>,
2892 mz_timestamp_type: ResolvedDataType,
2893 ) -> MzNowPurifierVisitor {
2894 MzNowPurifierVisitor {
2895 mz_now,
2896 mz_timestamp_type,
2897 introduced_mz_timestamp: false,
2898 }
2899 }
2900}
2901
2902impl VisitMut<'_, Aug> for MzNowPurifierVisitor {
2903 fn visit_expr_mut(&mut self, expr: &'_ mut Expr<Aug>) {
2904 match expr {
2905 Expr::Function(Function {
2906 name:
2907 ResolvedItemName::Item {
2908 full_name: FullItemName { item, .. },
2909 ..
2910 },
2911 ..
2912 }) if item == &MZ_NOW_NAME.to_string() => {
2913 let mz_now = self.mz_now.expect(
2914 "we should have chosen a timestamp if the expression contains mz_now()",
2915 );
2916 *expr = Expr::Cast {
2919 expr: Box::new(Expr::Value(Value::Number(mz_now.to_string()))),
2920 data_type: self.mz_timestamp_type.clone(),
2921 };
2922 self.introduced_mz_timestamp = true;
2923 }
2924 _ => visit_expr_mut(self, expr),
2925 }
2926 }
2927}