1use std::collections::{BTreeMap, BTreeSet};
15use std::fmt;
16use std::iter;
17use std::path::Path;
18use std::sync::Arc;
19
20use anyhow::anyhow;
21use itertools::Itertools;
22use mz_adapter_types::dyncfgs::ENABLE_S3_TABLES_REGION_CHECK;
23use mz_ccsr::{Client, GetBySubjectError};
24use mz_cloud_provider::CloudProvider;
25use mz_controller_types::ClusterId;
26use mz_kafka_util::client::MzClientContext;
27use mz_mysql_util::MySqlTableDesc;
28use mz_ore::collections::CollectionExt;
29use mz_ore::error::ErrorExt;
30use mz_ore::future::InTask;
31use mz_ore::iter::IteratorExt;
32use mz_ore::str::StrExt;
33use mz_postgres_util::desc::PostgresTableDesc;
34use mz_proto::RustType;
35use mz_repr::{CatalogItemId, RelationDesc, RelationVersionSelector, Timestamp, strconv};
36use mz_sql_parser::ast::display::AstDisplay;
37use mz_sql_parser::ast::visit::{Visit, visit_function};
38use mz_sql_parser::ast::visit_mut::{VisitMut, visit_expr_mut};
39use mz_sql_parser::ast::{
40 AlterSourceAction, AlterSourceAddSubsourceOptionName, AlterSourceStatement, AvroDocOn,
41 ColumnName, CreateMaterializedViewStatement, CreateSinkConnection, CreateSinkOptionName,
42 CreateSinkStatement, CreateSourceOptionName, CreateSubsourceOption, CreateSubsourceOptionName,
43 CreateTableFromSourceStatement, CsrConfigOption, CsrConfigOptionName, CsrConnection,
44 CsrSeedAvro, CsrSeedProtobuf, CsrSeedProtobufSchema, DeferredItemName, DocOnIdentifier,
45 DocOnSchema, Expr, Function, FunctionArgs, GlueAvroOption, GlueAvroSeed, Ident,
46 KafkaSourceConfigOption, KafkaSourceConfigOptionName, LoadGenerator, LoadGeneratorOption,
47 LoadGeneratorOptionName, MaterializedViewOption, MaterializedViewOptionName, MySqlConfigOption,
48 MySqlConfigOptionName, PgConfigOption, PgConfigOptionName, RawItemName,
49 ReaderSchemaSelectionStrategy, RefreshAtOptionValue, RefreshEveryOptionValue,
50 RefreshOptionValue, SourceEnvelope, SqlServerConfigOption, SqlServerConfigOptionName,
51 Statement, TableFromSourceColumns, TableFromSourceOption, TableFromSourceOptionName,
52 UnresolvedItemName,
53};
54use mz_sql_server_util::desc::SqlServerTableDesc;
55use mz_storage_types::configuration::StorageConfiguration;
56use mz_storage_types::connections::Connection;
57use mz_storage_types::connections::inline::IntoInlineConnection;
58use mz_storage_types::errors::ContextCreationError;
59use mz_storage_types::sources::load_generator::LoadGeneratorOutput;
60use mz_storage_types::sources::mysql::MySqlSourceDetails;
61use mz_storage_types::sources::postgres::PostgresSourcePublicationDetails;
62use mz_storage_types::sources::{
63 GenericSourceConnection, SourceDesc, SourceExportStatementDetails, SqlServerSourceExtras,
64};
65use prost::Message;
66use protobuf_native::MessageLite;
67use protobuf_native::compiler::{SourceTreeDescriptorDatabase, VirtualSourceTree};
68use rdkafka::admin::AdminClient;
69use references::{RetrievedSourceReferences, SourceReferenceClient};
70use uuid::Uuid;
71
72use crate::ast::{
73 AlterSourceAddSubsourceOption, AvroSchema, CreateSourceConnection, CreateSourceStatement,
74 CreateSubsourceStatement, CsrConnectionAvro, CsrConnectionProtobuf, ExternalReferenceExport,
75 ExternalReferences, Format, FormatSpecifier, ProtobufSchema, Value, WithOptionValue,
76};
77use crate::catalog::{CatalogItemType, SessionCatalog};
78use crate::kafka_util::{KafkaSinkConfigOptionExtracted, KafkaSourceConfigOptionExtracted};
79use crate::names::{
80 Aug, FullItemName, PartialItemName, ResolvedColumnReference, ResolvedDataType, ResolvedIds,
81 ResolvedItemName,
82};
83use crate::plan::error::PlanError;
84use crate::plan::statement::ddl::load_generator_ast_to_generator;
85use crate::plan::{SourceReferences, StatementContext};
86use crate::pure::error::{IcebergSinkPurificationError, SqlServerSourcePurificationError};
87use crate::pure::mysql::{ensure_binlog_full_metadata, is_binlog_full_metadata};
88use crate::{kafka_util, normalize};
89
90use self::error::{
91 CsrPurificationError, KafkaSinkPurificationError, KafkaSourcePurificationError,
92 LoadGeneratorSourcePurificationError, MySqlSourcePurificationError, PgSourcePurificationError,
93};
94
95pub(crate) mod error;
96mod references;
97
98pub mod mysql;
99pub mod postgres;
100pub mod sql_server;
101
/// A single source export requested during purification, generic over a
/// piece of per-export metadata `T`.
pub(crate) struct RequestedSourceExport<T> {
    /// Reference to the object this export reads from. When export names
    /// conflict, `validate_source_export_names` reports these values as the
    /// "upstream references" of the conflicting exports.
    external_reference: UnresolvedItemName,
    /// Name associated with the export; `validate_source_export_names`
    /// requires it to be unique among the requested exports.
    /// NOTE(review): presumably the name of the item that gets created in
    /// the catalog — confirm against the callers.
    name: UnresolvedItemName,
    /// Caller-defined metadata; can be swapped for a different type with
    /// `change_meta`.
    meta: T,
}
107
108impl<T: fmt::Debug> fmt::Debug for RequestedSourceExport<T> {
109 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
110 f.debug_struct("RequestedSourceExport")
111 .field("external_reference", &self.external_reference)
112 .field("name", &self.name)
113 .field("meta", &self.meta)
114 .finish()
115 }
116}
117
118impl<T> RequestedSourceExport<T> {
119 fn change_meta<F>(self, new_meta: F) -> RequestedSourceExport<F> {
120 RequestedSourceExport {
121 external_reference: self.external_reference,
122 name: self.name,
123 meta: new_meta,
124 }
125 }
126}
127
128fn source_export_name_gen(
133 source_name: &UnresolvedItemName,
134 subsource_name: &str,
135) -> Result<UnresolvedItemName, PlanError> {
136 let mut partial = normalize::unresolved_item_name(source_name.clone())?;
137 partial.item = subsource_name.to_string();
138 Ok(UnresolvedItemName::from(partial))
139}
140
141fn validate_source_export_names<T>(
145 requested_source_exports: &[RequestedSourceExport<T>],
146) -> Result<(), PlanError> {
147 if let Some(name) = requested_source_exports
151 .iter()
152 .map(|subsource| &subsource.name)
153 .duplicates()
154 .next()
155 .cloned()
156 {
157 let mut upstream_references: Vec<_> = requested_source_exports
158 .into_iter()
159 .filter_map(|subsource| {
160 if &subsource.name == &name {
161 Some(subsource.external_reference.clone())
162 } else {
163 None
164 }
165 })
166 .collect();
167
168 upstream_references.sort();
169
170 Err(PlanError::SubsourceNameConflict {
171 name,
172 upstream_references,
173 })?;
174 }
175
176 if let Some(name) = requested_source_exports
182 .iter()
183 .map(|export| &export.external_reference)
184 .duplicates()
185 .next()
186 .cloned()
187 {
188 let mut target_names: Vec<_> = requested_source_exports
189 .into_iter()
190 .filter_map(|export| {
191 if &export.external_reference == &name {
192 Some(export.name.clone())
193 } else {
194 None
195 }
196 })
197 .collect();
198
199 target_names.sort();
200
201 Err(PlanError::SubsourceDuplicateReference { name, target_names })?;
202 }
203
204 Ok(())
205}
206
/// The successful output of [`purify_statement`]: one variant per kind of
/// purified statement.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PurifiedStatement {
    /// A purified `CREATE SOURCE` statement together with the data gathered
    /// while purifying it.
    PurifiedCreateSource {
        /// Optional `CREATE SUBSOURCE` statement.
        /// NOTE(review): presumably for the source's progress collection —
        /// confirm against the code that constructs this variant.
        create_progress_subsource_stmt: Option<CreateSubsourceStatement<Aug>>,
        /// The purified `CREATE SOURCE` statement.
        create_source_stmt: CreateSourceStatement<Aug>,
        /// Purified exports, keyed by an `UnresolvedItemName`.
        /// NOTE(review): presumably keyed by the export's own name — confirm.
        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
        /// Source references accompanying the statement.
        /// NOTE(review): presumably the references available upstream at
        /// purification time — confirm.
        available_source_references: SourceReferences,
    },
    /// A purified `ALTER SOURCE` statement.
    PurifiedAlterSource {
        /// The purified `ALTER SOURCE` statement.
        alter_source_stmt: AlterSourceStatement<Aug>,
    },
    /// An `ALTER SOURCE` that adds subsources.
    PurifiedAlterSourceAddSubsources {
        /// Resolved name of the source being altered.
        source_name: ResolvedItemName,
        /// Options attached to the add-subsource action.
        options: Vec<AlterSourceAddSubsourceOption<Aug>>,
        /// Purified exports to add, keyed by an `UnresolvedItemName`.
        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
    },
    /// An `ALTER SOURCE` that refreshes the source's references.
    PurifiedAlterSourceRefreshReferences {
        /// Resolved name of the source being altered.
        source_name: ResolvedItemName,
        /// Source references accompanying the statement.
        available_source_references: SourceReferences,
    },
    /// A purified `CREATE SINK` statement, as returned by
    /// `purify_create_sink`.
    PurifiedCreateSink(CreateSinkStatement<Aug>),
    /// A purified `CREATE TABLE .. FROM SOURCE` statement.
    PurifiedCreateTableFromSource {
        /// The purified statement.
        stmt: CreateTableFromSourceStatement<Aug>,
    },
}
241
/// A source export after purification: the external reference it refers to
/// plus the source-type-specific details gathered for it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PurifiedSourceExport {
    /// Reference to the exported object in the external system.
    pub external_reference: UnresolvedItemName,
    /// Details specific to the kind of source this export belongs to.
    pub details: PurifiedExportDetails,
}
247
/// Source-type-specific details of a [`PurifiedSourceExport`], one variant
/// per supported kind of source.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PurifiedExportDetails {
    /// Details for an export of a MySQL source.
    MySql {
        /// Description of the exported table.
        table: MySqlTableDesc,
        /// Optional list of column identifiers.
        /// NOTE(review): presumably the columns requested via the
        /// `TEXT COLUMNS` option — confirm.
        text_columns: Option<Vec<Ident>>,
        /// Optional list of column identifiers.
        /// NOTE(review): presumably the columns requested via the
        /// `EXCLUDE COLUMNS` option — confirm.
        exclude_columns: Option<Vec<Ident>>,
        /// GTID set recorded for the export.
        /// NOTE(review): `purify_create_source` reads `global.gtid_executed`
        /// and hands it to the MySQL export purification; presumably that is
        /// the value stored here — confirm.
        initial_gtid_set: String,
        /// NOTE(review): presumably the result of `is_binlog_full_metadata`
        /// as queried in `purify_create_source` — confirm.
        binlog_full_metadata: bool,
    },
    /// Details for an export of a PostgreSQL source.
    Postgres {
        /// Description of the exported table.
        table: PostgresTableDesc,
        /// Optional list of column identifiers (see the `MySql` variant).
        text_columns: Option<Vec<Ident>>,
        /// Optional list of column identifiers (see the `MySql` variant).
        exclude_columns: Option<Vec<Ident>>,
    },
    /// Details for an export of a SQL Server source.
    SqlServer {
        /// Description of the exported table.
        table: SqlServerTableDesc,
        /// Optional list of column identifiers (see the `MySql` variant).
        text_columns: Option<Vec<Ident>>,
        /// Optional list of column identifiers. Note the spelling differs
        /// from `exclude_columns` in the other variants.
        excl_columns: Option<Vec<Ident>>,
        /// Name of the capture instance associated with the table.
        capture_instance: Arc<str>,
        /// LSN recorded for the export.
        /// NOTE(review): presumably the LSN from which replication of this
        /// table starts — confirm.
        initial_lsn: mz_sql_server_util::cdc::Lsn,
    },
    /// Details for an export of a Kafka source; carries no data.
    Kafka {},
    /// Details for an export of a load generator source.
    LoadGenerator {
        /// Relation description of the export, if any.
        table: Option<RelationDesc>,
        /// Which load generator output this export corresponds to.
        output: LoadGeneratorOutput,
    },
}
275
276pub async fn purify_statement(
286 catalog: impl SessionCatalog,
287 now: u64,
288 stmt: Statement<Aug>,
289 storage_configuration: &StorageConfiguration,
290) -> (Result<PurifiedStatement, PlanError>, Option<ClusterId>) {
291 match stmt {
292 Statement::CreateSource(stmt) => {
293 let cluster_id = stmt.in_cluster.as_ref().map(|cluster| cluster.id.clone());
294 (
295 purify_create_source(catalog, now, stmt, storage_configuration).await,
296 cluster_id,
297 )
298 }
299 Statement::AlterSource(stmt) => (
300 purify_alter_source(catalog, stmt, storage_configuration).await,
301 None,
302 ),
303 Statement::CreateSink(stmt) => {
304 let cluster_id = stmt.in_cluster.as_ref().map(|cluster| cluster.id.clone());
305 (
306 purify_create_sink(catalog, stmt, storage_configuration).await,
307 cluster_id,
308 )
309 }
310 Statement::CreateTableFromSource(stmt) => (
311 purify_create_table_from_source(catalog, stmt, storage_configuration).await,
312 None,
313 ),
314 o => (
315 Err(internal_err!(
316 "unexpected statement type in purification: {:?}",
317 o
318 )),
319 None,
320 ),
321 }
322}
323
324pub(crate) fn purify_create_sink_avro_doc_on_options(
329 catalog: &dyn SessionCatalog,
330 from_id: CatalogItemId,
331 format: &mut Option<FormatSpecifier<Aug>>,
332) -> Result<(), PlanError> {
333 let from = catalog.get_item(&from_id);
335 let object_ids = from
336 .references()
337 .items()
338 .copied()
339 .chain_one(from.id())
340 .collect::<Vec<_>>();
341
342 let mut avro_format_options = vec![];
345 for_each_format(format, |doc_on_schema, fmt| match fmt {
346 Format::Avro(AvroSchema::InlineSchema { .. })
347 | Format::Avro(AvroSchema::Glue { .. })
348 | Format::Bytes
349 | Format::Csv { .. }
350 | Format::Json { .. }
351 | Format::Protobuf(..)
352 | Format::Regex(..)
353 | Format::Text => (),
354 Format::Avro(AvroSchema::Csr {
355 csr_connection: CsrConnectionAvro { connection, .. },
356 }) => {
357 avro_format_options.push((doc_on_schema, &mut connection.options));
358 }
359 });
360
361 for (for_schema, options) in avro_format_options {
364 let user_provided_comments = options
365 .iter()
366 .filter_map(|CsrConfigOption { name, .. }| match name {
367 CsrConfigOptionName::AvroDocOn(doc_on) => Some(doc_on.clone()),
368 _ => None,
369 })
370 .collect::<BTreeSet<_>>();
371
372 for object_id in &object_ids {
374 let item = catalog
376 .get_item(object_id)
377 .at_version(RelationVersionSelector::Latest);
378 let full_resolved_name = ResolvedItemName::Item {
379 id: *object_id,
380 qualifiers: item.name().qualifiers.clone(),
381 full_name: catalog.resolve_full_name(item.name()),
382 print_id: !matches!(
383 item.item_type(),
384 CatalogItemType::Func | CatalogItemType::Type
385 ),
386 version: RelationVersionSelector::Latest,
387 };
388
389 if let Some(comments_map) = catalog.get_item_comments(object_id) {
390 let doc_on_item_key = AvroDocOn {
393 identifier: DocOnIdentifier::Type(full_resolved_name.clone()),
394 for_schema,
395 };
396 if !user_provided_comments.contains(&doc_on_item_key) {
397 if let Some(root_comment) = comments_map.get(&None) {
398 options.push(CsrConfigOption {
399 name: CsrConfigOptionName::AvroDocOn(doc_on_item_key),
400 value: Some(mz_sql_parser::ast::WithOptionValue::Value(Value::String(
401 root_comment.clone(),
402 ))),
403 });
404 }
405 }
406
407 let column_descs = match item.type_details() {
411 Some(details) => details.typ.desc(catalog).unwrap_or_default(),
412 None => item.relation_desc().map(|d| d.into_owned()),
413 };
414
415 if let Some(desc) = column_descs {
416 for (pos, column_name) in desc.iter_names().enumerate() {
417 if let Some(comment_str) = comments_map.get(&Some(pos + 1)) {
418 let doc_on_column_key = AvroDocOn {
419 identifier: DocOnIdentifier::Column(ColumnName {
420 relation: full_resolved_name.clone(),
421 column: ResolvedColumnReference::Column {
422 name: column_name.to_owned(),
423 index: pos,
424 },
425 }),
426 for_schema,
427 };
428 if !user_provided_comments.contains(&doc_on_column_key) {
429 options.push(CsrConfigOption {
430 name: CsrConfigOptionName::AvroDocOn(doc_on_column_key),
431 value: Some(mz_sql_parser::ast::WithOptionValue::Value(
432 Value::String(comment_str.clone()),
433 )),
434 });
435 }
436 }
437 }
438 }
439 }
440 }
441 }
442
443 Ok(())
444}
445
446async fn purify_create_sink(
449 catalog: impl SessionCatalog,
450 mut create_sink_stmt: CreateSinkStatement<Aug>,
451 storage_configuration: &StorageConfiguration,
452) -> Result<PurifiedStatement, PlanError> {
453 let CreateSinkStatement {
455 connection,
456 format,
457 with_options,
458 name: _,
459 in_cluster: _,
460 if_not_exists: _,
461 from,
462 envelope: _,
463 mode: _,
464 } = &mut create_sink_stmt;
465
466 const USER_ALLOWED_WITH_OPTIONS: &[CreateSinkOptionName] = &[
468 CreateSinkOptionName::Snapshot,
469 CreateSinkOptionName::CommitInterval,
470 ];
471
472 if let Some(op) = with_options
473 .iter()
474 .find(|op| !USER_ALLOWED_WITH_OPTIONS.contains(&op.name))
475 {
476 sql_bail!(
477 "CREATE SINK...WITH ({}..) is not allowed",
478 op.name.to_ast_string_simple(),
479 )
480 }
481
482 match &connection {
483 CreateSinkConnection::Kafka {
484 connection,
485 options,
486 key: _,
487 headers: _,
488 } => {
489 let scx = StatementContext::new(None, &catalog);
494 let connection = {
495 let item = scx.get_item_by_resolved_name(connection)?;
496 match item.connection()? {
498 Connection::Kafka(connection) => {
499 connection.clone().into_inline_connection(scx.catalog)
500 }
501 _ => sql_bail!(
502 "{} is not a kafka connection",
503 scx.catalog.resolve_full_name(item.name())
504 ),
505 }
506 };
507
508 let extracted_options: KafkaSinkConfigOptionExtracted = options.clone().try_into()?;
509
510 if extracted_options.legacy_ids == Some(true) {
511 sql_bail!("LEGACY IDs option is not supported");
512 }
513
514 let client: AdminClient<_> = connection
515 .create_with_context(
516 storage_configuration,
517 MzClientContext::default(),
518 &BTreeMap::new(),
519 InTask::No,
520 )
521 .await
522 .map_err(|e| {
523 KafkaSinkPurificationError::AdminClientError(Arc::new(e))
525 })?;
526
527 let metadata = client
528 .inner()
529 .fetch_metadata(
530 None,
531 storage_configuration
532 .parameters
533 .kafka_timeout_config
534 .fetch_metadata_timeout,
535 )
536 .map_err(|e| {
537 KafkaSinkPurificationError::AdminClientError(Arc::new(
538 ContextCreationError::KafkaError(e),
539 ))
540 })?;
541
542 if metadata.brokers().len() == 0 {
543 Err(KafkaSinkPurificationError::ZeroBrokers)?;
544 }
545 }
546 CreateSinkConnection::Iceberg {
547 catalog_connection,
548 aws_connection,
549 ..
550 } => {
551 let scx = StatementContext::new(None, &catalog);
552 let connection = {
553 let item = scx.get_item_by_resolved_name(catalog_connection)?;
554 match item.connection()? {
556 Connection::IcebergCatalog(connection) => {
557 connection.clone().into_inline_connection(scx.catalog)
558 }
559 _ => sql_bail!(
560 "{} is not an iceberg connection",
561 scx.catalog.resolve_full_name(item.name())
562 ),
563 }
564 };
565
566 if let Some(s3tables) = connection.s3tables_catalog() {
570 let enable_region_check =
571 ENABLE_S3_TABLES_REGION_CHECK.get(scx.catalog.system_vars().dyncfgs());
572 if enable_region_check {
573 let env_id = &catalog.config().environment_id;
574 if matches!(env_id.cloud_provider(), CloudProvider::Aws) {
575 let env_region = env_id.cloud_provider_region();
576 let s3_tables_region = s3tables
579 .aws_connection
580 .connection
581 .region
582 .clone()
583 .unwrap_or_else(|| "us-east-1".to_string());
584 if s3_tables_region != env_region {
585 Err(IcebergSinkPurificationError::S3TablesRegionMismatch {
586 s3_tables_region,
587 environment_region: env_region.to_string(),
588 })?;
589 }
590 }
591 }
592 }
593
594 if let Some(aws_connection) = aws_connection {
600 let aws_conn_id = aws_connection.item_id();
601 let aws_connection = {
602 let item = scx.get_item_by_resolved_name(aws_connection)?;
603 match item.connection()? {
605 Connection::Aws(aws_connection) => aws_connection.clone(),
606 _ => sql_bail!(
607 "{} is not an aws connection",
608 scx.catalog.resolve_full_name(item.name())
609 ),
610 }
611 };
612
613 let _sdk_config = aws_connection
614 .load_sdk_config(
615 &storage_configuration.connection_context,
616 aws_conn_id.clone(),
617 InTask::No,
618 mz_storage_types::dyncfgs::ENFORCE_EXTERNAL_ADDRESSES
619 .get(storage_configuration.config_set()),
620 )
621 .await
622 .map_err(|e| IcebergSinkPurificationError::AwsSdkContextError(Arc::new(e)))?;
623 }
624
625 let _catalog = connection
629 .connect(storage_configuration, InTask::No)
630 .await
631 .map_err(|e| IcebergSinkPurificationError::CatalogError(Arc::new(e)))?;
632 }
633 }
634
635 let mut csr_connection_ids = BTreeSet::new();
636 for_each_format(format, |_, fmt| match fmt {
637 Format::Avro(AvroSchema::InlineSchema { .. })
638 | Format::Avro(AvroSchema::Glue { .. })
639 | Format::Bytes
640 | Format::Csv { .. }
641 | Format::Json { .. }
642 | Format::Protobuf(ProtobufSchema::InlineSchema { .. })
643 | Format::Regex(..)
644 | Format::Text => (),
645 Format::Avro(AvroSchema::Csr {
646 csr_connection: CsrConnectionAvro { connection, .. },
647 })
648 | Format::Protobuf(ProtobufSchema::Csr {
649 csr_connection: CsrConnectionProtobuf { connection, .. },
650 }) => {
651 csr_connection_ids.insert(*connection.connection.item_id());
652 }
653 });
654
655 let scx = StatementContext::new(None, &catalog);
656 for csr_connection_id in csr_connection_ids {
657 let connection = {
658 let item = scx.get_item(&csr_connection_id);
659 match item.connection()? {
661 Connection::Csr(connection) => connection.clone().into_inline_connection(&catalog),
662 _ => Err(CsrPurificationError::NotCsrConnection(
663 scx.catalog.resolve_full_name(item.name()),
664 ))?,
665 }
666 };
667
668 let client = connection
669 .connect(storage_configuration, InTask::No)
670 .await
671 .map_err(|e| CsrPurificationError::ClientError(Arc::new(e)))?;
672
673 client
674 .list_subjects()
675 .await
676 .map_err(|e| CsrPurificationError::ListSubjectsError(Arc::new(e)))?;
677 }
678
679 purify_create_sink_avro_doc_on_options(&catalog, *from.item_id(), format)?;
680
681 Ok(PurifiedStatement::PurifiedCreateSink(create_sink_stmt))
682}
683
684fn for_each_format<'a, F>(format: &'a mut Option<FormatSpecifier<Aug>>, mut f: F)
691where
692 F: FnMut(DocOnSchema, &'a mut Format<Aug>),
693{
694 match format {
695 None => (),
696 Some(FormatSpecifier::Bare(fmt)) => f(DocOnSchema::All, fmt),
697 Some(FormatSpecifier::KeyValue { key, value }) => {
698 f(DocOnSchema::KeyOnly, key);
699 f(DocOnSchema::ValueOnly, value);
700 }
701 }
702}
703
/// Policy governing whether a `CREATE SOURCE` statement may, or must, name
/// external references.
///
/// `purify_create_source` picks the policy from the
/// `enable_create_table_from_source` and `force_source_table_syntax` system
/// variables.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub(crate) enum SourceReferencePolicy {
    /// Selected when both `enable_create_table_from_source` and
    /// `force_source_table_syntax` are set. Load generator purification
    /// rejects requested references under this policy with
    /// `PlanError::UseTablesForSources`.
    NotAllowed,
    /// Selected when `enable_create_table_from_source` is set but
    /// `force_source_table_syntax` is not.
    Optional,
    /// Selected when `enable_create_table_from_source` is not set.
    Required,
}
718
719async fn purify_create_source(
720 catalog: impl SessionCatalog,
721 now: u64,
722 mut create_source_stmt: CreateSourceStatement<Aug>,
723 storage_configuration: &StorageConfiguration,
724) -> Result<PurifiedStatement, PlanError> {
725 let CreateSourceStatement {
726 name: source_name,
727 col_names,
728 key_constraint,
729 connection: source_connection,
730 format,
731 envelope,
732 include_metadata,
733 external_references,
734 progress_subsource,
735 with_options,
736 ..
737 } = &mut create_source_stmt;
738
739 let uses_old_syntax = !col_names.is_empty()
740 || key_constraint.is_some()
741 || format.is_some()
742 || envelope.is_some()
743 || !include_metadata.is_empty()
744 || external_references.is_some()
745 || progress_subsource.is_some();
746
747 if let Some(DeferredItemName::Named(_)) = progress_subsource {
748 sql_bail!("Cannot manually ID qualify progress subsource")
749 }
750
751 let mut requested_subsource_map = BTreeMap::new();
752
753 let progress_desc = match &source_connection {
754 CreateSourceConnection::Kafka { .. } => {
755 &mz_storage_types::sources::kafka::KAFKA_PROGRESS_DESC
756 }
757 CreateSourceConnection::Postgres { .. } => {
758 &mz_storage_types::sources::postgres::PG_PROGRESS_DESC
759 }
760 CreateSourceConnection::SqlServer { .. } => {
761 &mz_storage_types::sources::sql_server::SQL_SERVER_PROGRESS_DESC
762 }
763 CreateSourceConnection::MySql { .. } => {
764 &mz_storage_types::sources::mysql::MYSQL_PROGRESS_DESC
765 }
766 CreateSourceConnection::LoadGenerator { .. } => {
767 &mz_storage_types::sources::load_generator::LOAD_GEN_PROGRESS_DESC
768 }
769 };
770 let scx = StatementContext::new(None, &catalog);
771
772 let reference_policy = if scx.catalog.system_vars().enable_create_table_from_source()
776 && scx.catalog.system_vars().force_source_table_syntax()
777 {
778 SourceReferencePolicy::NotAllowed
779 } else if scx.catalog.system_vars().enable_create_table_from_source() {
780 SourceReferencePolicy::Optional
781 } else {
782 SourceReferencePolicy::Required
783 };
784
785 let mut format_options = SourceFormatOptions::Default;
786
787 let retrieved_source_references: RetrievedSourceReferences;
788
789 match source_connection {
790 CreateSourceConnection::Kafka {
791 connection,
792 options: base_with_options,
793 ..
794 } => {
795 if let Some(external_references) = external_references {
796 Err(KafkaSourcePurificationError::ReferencedSubsources(
797 external_references.clone(),
798 ))?;
799 }
800
801 let connection = {
802 let item = scx.get_item_by_resolved_name(connection)?;
803 match item.connection()? {
805 Connection::Kafka(connection) => {
806 connection.clone().into_inline_connection(&catalog)
807 }
808 _ => Err(KafkaSourcePurificationError::NotKafkaConnection(
809 scx.catalog.resolve_full_name(item.name()),
810 ))?,
811 }
812 };
813
814 let extracted_options: KafkaSourceConfigOptionExtracted =
815 base_with_options.clone().try_into()?;
816
817 let topic = extracted_options
818 .topic
819 .ok_or(KafkaSourcePurificationError::ConnectionMissingTopic)?;
820
821 let consumer = connection
822 .create_with_context(
823 storage_configuration,
824 MzClientContext::default(),
825 &BTreeMap::new(),
826 InTask::No,
827 )
828 .await
829 .map_err(|e| {
830 KafkaSourcePurificationError::KafkaConsumerError(
832 e.display_with_causes().to_string(),
833 )
834 })?;
835 let consumer = Arc::new(consumer);
836
837 match (
838 extracted_options.start_offset,
839 extracted_options.start_timestamp,
840 ) {
841 (None, None) => {
842 kafka_util::ensure_topic_exists(
844 Arc::clone(&consumer),
845 &topic,
846 storage_configuration
847 .parameters
848 .kafka_timeout_config
849 .fetch_metadata_timeout,
850 )
851 .await?;
852 }
853 (Some(_), Some(_)) => {
854 sql_bail!("cannot specify START TIMESTAMP and START OFFSET at same time")
855 }
856 (Some(start_offsets), None) => {
857 kafka_util::validate_start_offsets(
859 Arc::clone(&consumer),
860 &topic,
861 start_offsets,
862 storage_configuration
863 .parameters
864 .kafka_timeout_config
865 .fetch_metadata_timeout,
866 )
867 .await?;
868 }
869 (None, Some(time_offset)) => {
870 let start_offsets = kafka_util::lookup_start_offsets(
872 Arc::clone(&consumer),
873 &topic,
874 time_offset,
875 now,
876 storage_configuration
877 .parameters
878 .kafka_timeout_config
879 .fetch_metadata_timeout,
880 )
881 .await?;
882
883 base_with_options.retain(|val| {
884 !matches!(val.name, KafkaSourceConfigOptionName::StartTimestamp)
885 });
886 base_with_options.push(KafkaSourceConfigOption {
887 name: KafkaSourceConfigOptionName::StartOffset,
888 value: Some(WithOptionValue::Sequence(
889 start_offsets
890 .iter()
891 .map(|offset| {
892 WithOptionValue::Value(Value::Number(offset.to_string()))
893 })
894 .collect(),
895 )),
896 });
897 }
898 }
899
900 let reference_client = SourceReferenceClient::Kafka { topic: &topic };
901 retrieved_source_references = reference_client.get_source_references().await?;
902
903 format_options = SourceFormatOptions::Kafka { topic };
904 }
905 CreateSourceConnection::Postgres {
906 connection,
907 options,
908 } => {
909 let connection_item = scx.get_item_by_resolved_name(connection)?;
910 let connection = match connection_item.connection().map_err(PlanError::from)? {
911 Connection::Postgres(connection) => {
912 connection.clone().into_inline_connection(&catalog)
913 }
914 _ => Err(PgSourcePurificationError::NotPgConnection(
915 scx.catalog.resolve_full_name(connection_item.name()),
916 ))?,
917 };
918 let crate::plan::statement::PgConfigOptionExtracted {
919 publication,
920 text_columns,
921 exclude_columns,
922 details,
923 ..
924 } = options.clone().try_into()?;
925 let publication =
926 publication.ok_or(PgSourcePurificationError::ConnectionMissingPublication)?;
927
928 if details.is_some() {
929 Err(PgSourcePurificationError::UserSpecifiedDetails)?;
930 }
931
932 let client = connection
933 .validate(connection_item.id(), storage_configuration)
934 .await?;
935
936 let reference_client = SourceReferenceClient::Postgres {
937 client: &client,
938 publication: &publication,
939 database: &connection.database,
940 };
941 retrieved_source_references = reference_client.get_source_references().await?;
942
943 let postgres::PurifiedSourceExports {
944 source_exports: subsources,
945 normalized_text_columns,
946 } = postgres::purify_source_exports(
947 &client,
948 &retrieved_source_references,
949 external_references,
950 text_columns,
951 exclude_columns,
952 source_name,
953 &reference_policy,
954 )
955 .await?;
956
957 if let Some(text_cols_option) = options
958 .iter_mut()
959 .find(|option| option.name == PgConfigOptionName::TextColumns)
960 {
961 text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
962 }
963
964 requested_subsource_map.extend(subsources);
965
966 let timeline_id = mz_postgres_util::get_timeline_id(&client).await?;
969
970 let is_physical_replica = Some(mz_postgres_util::get_is_in_recovery(&client).await?);
973
974 options.retain(|PgConfigOption { name, .. }| name != &PgConfigOptionName::Details);
976 let details = PostgresSourcePublicationDetails {
977 slot: format!(
978 "materialize_{}",
979 Uuid::new_v4().to_string().replace('-', "")
980 ),
981 timeline_id: Some(timeline_id),
982 database: connection.database,
983 is_physical_replica,
984 };
985 options.push(PgConfigOption {
986 name: PgConfigOptionName::Details,
987 value: Some(WithOptionValue::Value(Value::String(hex::encode(
988 details.into_proto().encode_to_vec(),
989 )))),
990 })
991 }
992 CreateSourceConnection::SqlServer {
993 connection,
994 options,
995 } => {
996 let connection_item = scx.get_item_by_resolved_name(connection)?;
999 let connection = match connection_item.connection()? {
1000 Connection::SqlServer(connection) => {
1001 connection.clone().into_inline_connection(&catalog)
1002 }
1003 _ => Err(SqlServerSourcePurificationError::NotSqlServerConnection(
1004 scx.catalog.resolve_full_name(connection_item.name()),
1005 ))?,
1006 };
1007 let crate::plan::statement::ddl::SqlServerConfigOptionExtracted {
1008 details,
1009 text_columns,
1010 exclude_columns,
1011 seen: _,
1012 } = options.clone().try_into()?;
1013
1014 if details.is_some() {
1015 Err(SqlServerSourcePurificationError::UserSpecifiedDetails)?;
1016 }
1017
1018 let mut client = connection
1019 .validate(connection_item.id(), storage_configuration)
1020 .await?;
1021
1022 let database: Arc<str> = connection.database.into();
1023 let reference_client = SourceReferenceClient::SqlServer {
1024 client: &mut client,
1025 database: Arc::clone(&database),
1026 };
1027 retrieved_source_references = reference_client.get_source_references().await?;
1028 tracing::debug!(?retrieved_source_references, "got source references");
1029
1030 let timeout = mz_storage_types::sources::sql_server::MAX_LSN_WAIT
1031 .get(storage_configuration.config_set());
1032
1033 let purified_source_exports = sql_server::purify_source_exports(
1034 &*database,
1035 &mut client,
1036 &retrieved_source_references,
1037 external_references,
1038 &text_columns,
1039 &exclude_columns,
1040 source_name,
1041 timeout,
1042 &reference_policy,
1043 )
1044 .await?;
1045
1046 let sql_server::PurifiedSourceExports {
1047 source_exports,
1048 normalized_text_columns,
1049 normalized_excl_columns,
1050 } = purified_source_exports;
1051
1052 requested_subsource_map.extend(source_exports);
1054
1055 let restore_history_id =
1059 mz_sql_server_util::inspect::get_latest_restore_history_id(&mut client).await?;
1060 let details = SqlServerSourceExtras { restore_history_id };
1061
1062 options.retain(|SqlServerConfigOption { name, .. }| {
1063 name != &SqlServerConfigOptionName::Details
1064 });
1065 options.push(SqlServerConfigOption {
1066 name: SqlServerConfigOptionName::Details,
1067 value: Some(WithOptionValue::Value(Value::String(hex::encode(
1068 details.into_proto().encode_to_vec(),
1069 )))),
1070 });
1071
1072 if let Some(text_cols_option) = options
1074 .iter_mut()
1075 .find(|option| option.name == SqlServerConfigOptionName::TextColumns)
1076 {
1077 text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
1078 }
1079 if let Some(excl_cols_option) = options
1080 .iter_mut()
1081 .find(|option| option.name == SqlServerConfigOptionName::ExcludeColumns)
1082 {
1083 excl_cols_option.value = Some(WithOptionValue::Sequence(normalized_excl_columns));
1084 }
1085 }
1086 CreateSourceConnection::MySql {
1087 connection,
1088 options,
1089 } => {
1090 let connection_item = scx.get_item_by_resolved_name(connection)?;
1091 let connection = match connection_item.connection()? {
1092 Connection::MySql(connection) => {
1093 connection.clone().into_inline_connection(&catalog)
1094 }
1095 _ => Err(MySqlSourcePurificationError::NotMySqlConnection(
1096 scx.catalog.resolve_full_name(connection_item.name()),
1097 ))?,
1098 };
1099 let crate::plan::statement::ddl::MySqlConfigOptionExtracted {
1100 details,
1101 text_columns,
1102 exclude_columns,
1103 seen: _,
1104 } = options.clone().try_into()?;
1105
1106 if details.is_some() {
1107 Err(MySqlSourcePurificationError::UserSpecifiedDetails)?;
1108 }
1109
1110 let mut conn = connection
1111 .validate(connection_item.id(), storage_configuration)
1112 .await
1113 .map_err(MySqlSourcePurificationError::InvalidConnection)?;
1114
1115 let initial_gtid_set =
1119 mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?;
1120
1121 let binlog_full_metadata = is_binlog_full_metadata(&mut conn).await?;
1122
1123 let reference_client = SourceReferenceClient::MySql {
1124 conn: &mut conn,
1125 include_system_schemas: mysql::references_system_schemas(external_references),
1126 };
1127 retrieved_source_references = reference_client.get_source_references().await?;
1128
1129 let mysql::PurifiedSourceExports {
1130 source_exports: subsources,
1131 normalized_text_columns,
1132 normalized_exclude_columns,
1133 } = mysql::purify_source_exports(
1134 &mut conn,
1135 &retrieved_source_references,
1136 external_references,
1137 text_columns,
1138 exclude_columns,
1139 source_name,
1140 initial_gtid_set.clone(),
1141 &reference_policy,
1142 binlog_full_metadata,
1143 )
1144 .await?;
1145 requested_subsource_map.extend(subsources);
1146
1147 let details = MySqlSourceDetails {};
1150 options
1152 .retain(|MySqlConfigOption { name, .. }| name != &MySqlConfigOptionName::Details);
1153 options.push(MySqlConfigOption {
1154 name: MySqlConfigOptionName::Details,
1155 value: Some(WithOptionValue::Value(Value::String(hex::encode(
1156 details.into_proto().encode_to_vec(),
1157 )))),
1158 });
1159
1160 if let Some(text_cols_option) = options
1161 .iter_mut()
1162 .find(|option| option.name == MySqlConfigOptionName::TextColumns)
1163 {
1164 text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
1165 }
1166 if let Some(exclude_cols_option) = options
1167 .iter_mut()
1168 .find(|option| option.name == MySqlConfigOptionName::ExcludeColumns)
1169 {
1170 exclude_cols_option.value =
1171 Some(WithOptionValue::Sequence(normalized_exclude_columns));
1172 }
1173 }
1174 CreateSourceConnection::LoadGenerator { generator, options } => {
1175 let load_generator =
1176 load_generator_ast_to_generator(&scx, generator, options, include_metadata)?;
1177
1178 let reference_client = SourceReferenceClient::LoadGenerator {
1179 generator: &load_generator,
1180 };
1181 retrieved_source_references = reference_client.get_source_references().await?;
1182 let multi_output_sources =
1186 retrieved_source_references
1187 .all_references()
1188 .iter()
1189 .any(|r| {
1190 matches!(
1191 r.load_generator_output(),
1192 Some(output) if output != &LoadGeneratorOutput::Default
1193 )
1194 });
1195
1196 match external_references {
1197 Some(requested)
1198 if matches!(reference_policy, SourceReferencePolicy::NotAllowed) =>
1199 {
1200 Err(PlanError::UseTablesForSources(requested.to_string()))?
1201 }
1202 Some(requested) if !multi_output_sources => match requested {
1203 ExternalReferences::SubsetTables(_) => {
1204 Err(LoadGeneratorSourcePurificationError::ForTables)?
1205 }
1206 ExternalReferences::SubsetSchemas(_) => {
1207 Err(LoadGeneratorSourcePurificationError::ForSchemas)?
1208 }
1209 ExternalReferences::All => {
1210 Err(LoadGeneratorSourcePurificationError::ForAllTables)?
1211 }
1212 },
1213 Some(requested) => {
1214 let requested_exports = retrieved_source_references
1215 .requested_source_exports(Some(requested), source_name)?;
1216 for export in requested_exports {
1217 requested_subsource_map.insert(
1218 export.name,
1219 PurifiedSourceExport {
1220 external_reference: export.external_reference,
1221 details: PurifiedExportDetails::LoadGenerator {
1222 table: export
1223 .meta
1224 .load_generator_desc()
1225 .ok_or_else(|| {
1226 internal_err!(
1227 "expected load generator source reference"
1228 )
1229 })?
1230 .clone(),
1231 output: export
1232 .meta
1233 .load_generator_output()
1234 .ok_or_else(|| {
1235 internal_err!(
1236 "expected load generator source reference"
1237 )
1238 })?
1239 .clone(),
1240 },
1241 },
1242 );
1243 }
1244 }
1245 None => {
1246 if multi_output_sources
1247 && matches!(reference_policy, SourceReferencePolicy::Required)
1248 {
1249 Err(LoadGeneratorSourcePurificationError::MultiOutputRequiresForAllTables)?
1250 }
1251 }
1252 }
1253
1254 if let LoadGenerator::Clock = generator {
1255 if !options
1256 .iter()
1257 .any(|p| p.name == LoadGeneratorOptionName::AsOf)
1258 {
1259 let now = catalog.now();
1260 options.push(LoadGeneratorOption {
1261 name: LoadGeneratorOptionName::AsOf,
1262 value: Some(WithOptionValue::Value(Value::Number(now.to_string()))),
1263 });
1264 }
1265 }
1266 }
1267 }
1268
1269 *external_references = None;
1273
1274 let create_progress_subsource_stmt = if uses_old_syntax {
1276 let name = match progress_subsource {
1278 Some(name) => match name {
1279 DeferredItemName::Deferred(name) => name.clone(),
1280 DeferredItemName::Named(_) => {
1282 sql_bail!("progress subsource name cannot be a resolved name")
1283 }
1284 },
1285 None => {
1286 let (item, prefix) = source_name
1287 .0
1288 .split_last()
1289 .ok_or_else(|| sql_err!("source name must have at least one component"))?;
1290 let item_name =
1291 Ident::try_generate_name(item.to_string(), "_progress", |candidate| {
1292 let mut suggested_name = prefix.to_vec();
1293 suggested_name.push(candidate.clone());
1294
1295 let partial =
1296 normalize::unresolved_item_name(UnresolvedItemName(suggested_name))?;
1297 let qualified = scx.allocate_qualified_name(partial)?;
1298 let item_exists = scx.catalog.get_item_by_name(&qualified).is_some();
1299 let type_exists = scx.catalog.get_type_by_name(&qualified).is_some();
1300 Ok::<_, PlanError>(!item_exists && !type_exists)
1301 })?;
1302
1303 let mut full_name = prefix.to_vec();
1304 full_name.push(item_name);
1305 let full_name = normalize::unresolved_item_name(UnresolvedItemName(full_name))?;
1306 let qualified_name = scx.allocate_qualified_name(full_name)?;
1307 let full_name = scx.catalog.resolve_full_name(&qualified_name);
1308
1309 UnresolvedItemName::from(full_name.clone())
1310 }
1311 };
1312
1313 let (columns, constraints) = scx.relation_desc_into_table_defs(progress_desc)?;
1314
1315 let mut progress_with_options: Vec<_> = with_options
1317 .iter()
1318 .filter_map(|opt| match opt.name {
1319 CreateSourceOptionName::TimestampInterval => None,
1320 CreateSourceOptionName::RetainHistory => Some(CreateSubsourceOption {
1321 name: CreateSubsourceOptionName::RetainHistory,
1322 value: opt.value.clone(),
1323 }),
1324 })
1325 .collect();
1326 progress_with_options.push(CreateSubsourceOption {
1327 name: CreateSubsourceOptionName::Progress,
1328 value: Some(WithOptionValue::Value(Value::Boolean(true))),
1329 });
1330
1331 Some(CreateSubsourceStatement {
1332 name,
1333 columns,
1334 of_source: None,
1338 constraints,
1339 if_not_exists: false,
1340 with_options: progress_with_options,
1341 })
1342 } else {
1343 None
1344 };
1345
1346 purify_source_format(
1347 &catalog,
1348 format,
1349 &format_options,
1350 envelope,
1351 storage_configuration,
1352 )
1353 .await?;
1354
1355 Ok(PurifiedStatement::PurifiedCreateSource {
1356 create_progress_subsource_stmt,
1357 create_source_stmt,
1358 subsources: requested_subsource_map,
1359 available_source_references: retrieved_source_references.available_source_references(),
1360 })
1361}
1362
1363async fn purify_alter_source(
1366 catalog: impl SessionCatalog,
1367 stmt: AlterSourceStatement<Aug>,
1368 storage_configuration: &StorageConfiguration,
1369) -> Result<PurifiedStatement, PlanError> {
1370 let scx = StatementContext::new(None, &catalog);
1371 let AlterSourceStatement {
1372 source_name: unresolved_source_name,
1373 action,
1374 if_exists,
1375 } = stmt;
1376
1377 let item = match scx.resolve_item(RawItemName::Name(unresolved_source_name.clone())) {
1379 Ok(item) => item,
1380 Err(_) if if_exists => {
1381 return Ok(PurifiedStatement::PurifiedAlterSource {
1382 alter_source_stmt: AlterSourceStatement {
1383 source_name: unresolved_source_name,
1384 action,
1385 if_exists,
1386 },
1387 });
1388 }
1389 Err(e) => return Err(e),
1390 };
1391
1392 let desc = match item.source_desc()? {
1394 Some(desc) => desc.clone().into_inline_connection(scx.catalog),
1395 None => {
1396 sql_bail!("cannot ALTER this type of source")
1397 }
1398 };
1399
1400 let source_name = item.name();
1401
1402 let resolved_source_name = ResolvedItemName::Item {
1403 id: item.id(),
1404 qualifiers: item.name().qualifiers.clone(),
1405 full_name: scx.catalog.resolve_full_name(source_name),
1406 print_id: true,
1407 version: RelationVersionSelector::Latest,
1408 };
1409
1410 let partial_name = scx.catalog.minimal_qualification(source_name);
1411
1412 match action {
1413 AlterSourceAction::AddSubsources {
1414 external_references,
1415 options,
1416 } => {
1417 if scx.catalog.system_vars().enable_create_table_from_source()
1418 && scx.catalog.system_vars().force_source_table_syntax()
1419 {
1420 Err(PlanError::UseTablesForSources(
1421 "ALTER SOURCE .. ADD SUBSOURCES ..".to_string(),
1422 ))?;
1423 }
1424
1425 purify_alter_source_add_subsources(
1426 external_references,
1427 options,
1428 desc,
1429 partial_name,
1430 unresolved_source_name,
1431 resolved_source_name,
1432 storage_configuration,
1433 )
1434 .await
1435 }
1436 AlterSourceAction::RefreshReferences => {
1437 purify_alter_source_refresh_references(
1438 desc,
1439 resolved_source_name,
1440 storage_configuration,
1441 )
1442 .await
1443 }
1444 _ => Ok(PurifiedStatement::PurifiedAlterSource {
1445 alter_source_stmt: AlterSourceStatement {
1446 source_name: unresolved_source_name,
1447 action,
1448 if_exists,
1449 },
1450 }),
1451 }
1452}
1453
1454async fn purify_alter_source_add_subsources(
1457 external_references: Vec<ExternalReferenceExport>,
1458 mut options: Vec<AlterSourceAddSubsourceOption<Aug>>,
1459 desc: SourceDesc,
1460 partial_source_name: PartialItemName,
1461 unresolved_source_name: UnresolvedItemName,
1462 resolved_source_name: ResolvedItemName,
1463 storage_configuration: &StorageConfiguration,
1464) -> Result<PurifiedStatement, PlanError> {
1465 let connection_id = match &desc.connection {
1467 GenericSourceConnection::Postgres(c) => c.connection_id,
1468 GenericSourceConnection::MySql(c) => c.connection_id,
1469 GenericSourceConnection::SqlServer(c) => c.connection_id,
1470 _ => sql_bail!(
1471 "source {} does not support ALTER SOURCE.",
1472 partial_source_name
1473 ),
1474 };
1475
1476 let crate::plan::statement::ddl::AlterSourceAddSubsourceOptionExtracted {
1477 text_columns,
1478 exclude_columns,
1479 details,
1480 seen: _,
1481 } = options.clone().try_into()?;
1482 if details.is_some() {
1483 sql_bail!("DETAILS option cannot be explicitly set");
1484 }
1485
1486 let mut requested_subsource_map = BTreeMap::new();
1487
1488 match desc.connection {
1489 GenericSourceConnection::Postgres(pg_source_connection) => {
1490 let pg_connection = &pg_source_connection.connection;
1492
1493 let client = pg_connection
1494 .validate(connection_id, storage_configuration)
1495 .await?;
1496
1497 let reference_client = SourceReferenceClient::Postgres {
1498 client: &client,
1499 publication: &pg_source_connection.publication,
1500 database: &pg_connection.database,
1501 };
1502 let retrieved_source_references = reference_client.get_source_references().await?;
1503
1504 let postgres::PurifiedSourceExports {
1505 source_exports: subsources,
1506 normalized_text_columns,
1507 } = postgres::purify_source_exports(
1508 &client,
1509 &retrieved_source_references,
1510 &Some(ExternalReferences::SubsetTables(external_references)),
1511 text_columns,
1512 exclude_columns,
1513 &unresolved_source_name,
1514 &SourceReferencePolicy::Required,
1515 )
1516 .await?;
1517
1518 if let Some(text_cols_option) = options
1519 .iter_mut()
1520 .find(|option| option.name == AlterSourceAddSubsourceOptionName::TextColumns)
1521 {
1522 text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
1523 }
1524
1525 requested_subsource_map.extend(subsources);
1526 }
1527 GenericSourceConnection::MySql(mysql_source_connection) => {
1528 let mysql_connection = &mysql_source_connection.connection;
1529 let config = mysql_connection
1530 .config(
1531 &storage_configuration.connection_context.secrets_reader,
1532 storage_configuration,
1533 InTask::No,
1534 )
1535 .await?;
1536
1537 let mut conn = config
1538 .connect(
1539 "mysql purification",
1540 &storage_configuration.connection_context.ssh_tunnel_manager,
1541 )
1542 .await?;
1543
1544 let initial_gtid_set =
1547 mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?;
1548
1549 let binlog_full_metadata = is_binlog_full_metadata(&mut conn).await?;
1550
1551 let requested_references = Some(ExternalReferences::SubsetTables(external_references));
1552
1553 let reference_client = SourceReferenceClient::MySql {
1554 conn: &mut conn,
1555 include_system_schemas: mysql::references_system_schemas(&requested_references),
1556 };
1557 let retrieved_source_references = reference_client.get_source_references().await?;
1558
1559 let mysql::PurifiedSourceExports {
1560 source_exports: subsources,
1561 normalized_text_columns,
1562 normalized_exclude_columns,
1563 } = mysql::purify_source_exports(
1564 &mut conn,
1565 &retrieved_source_references,
1566 &requested_references,
1567 text_columns,
1568 exclude_columns,
1569 &unresolved_source_name,
1570 initial_gtid_set,
1571 &SourceReferencePolicy::Required,
1572 binlog_full_metadata,
1573 )
1574 .await?;
1575 requested_subsource_map.extend(subsources);
1576
1577 if let Some(text_cols_option) = options
1579 .iter_mut()
1580 .find(|option| option.name == AlterSourceAddSubsourceOptionName::TextColumns)
1581 {
1582 text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
1583 }
1584 if let Some(exclude_cols_option) = options
1585 .iter_mut()
1586 .find(|option| option.name == AlterSourceAddSubsourceOptionName::ExcludeColumns)
1587 {
1588 exclude_cols_option.value =
1589 Some(WithOptionValue::Sequence(normalized_exclude_columns));
1590 }
1591 }
1592 GenericSourceConnection::SqlServer(sql_server_source) => {
1593 let sql_server_connection = &sql_server_source.connection;
1595 let config = sql_server_connection
1596 .resolve_config(
1597 &storage_configuration.connection_context.secrets_reader,
1598 storage_configuration,
1599 InTask::No,
1600 )
1601 .await?;
1602 let mut client = mz_sql_server_util::Client::connect(config).await?;
1603
1604 let database = sql_server_connection.database.clone().into();
1606 let source_references = SourceReferenceClient::SqlServer {
1607 client: &mut client,
1608 database: Arc::clone(&database),
1609 }
1610 .get_source_references()
1611 .await?;
1612 let requested_references = Some(ExternalReferences::SubsetTables(external_references));
1613
1614 let timeout = mz_storage_types::sources::sql_server::MAX_LSN_WAIT
1615 .get(storage_configuration.config_set());
1616
1617 let result = sql_server::purify_source_exports(
1618 &*database,
1619 &mut client,
1620 &source_references,
1621 &requested_references,
1622 &text_columns,
1623 &exclude_columns,
1624 &unresolved_source_name,
1625 timeout,
1626 &SourceReferencePolicy::Required,
1627 )
1628 .await;
1629 let sql_server::PurifiedSourceExports {
1630 source_exports,
1631 normalized_text_columns,
1632 normalized_excl_columns,
1633 } = result?;
1634
1635 requested_subsource_map.extend(source_exports);
1637
1638 if let Some(text_cols_option) = options
1640 .iter_mut()
1641 .find(|option| option.name == AlterSourceAddSubsourceOptionName::TextColumns)
1642 {
1643 text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
1644 }
1645 if let Some(exclude_cols_option) = options
1646 .iter_mut()
1647 .find(|option| option.name == AlterSourceAddSubsourceOptionName::ExcludeColumns)
1648 {
1649 exclude_cols_option.value =
1650 Some(WithOptionValue::Sequence(normalized_excl_columns));
1651 }
1652 }
1653 _ => bail_internal!("source does not support ALTER SOURCE...ADD SUBSOURCE"),
1656 };
1657
1658 Ok(PurifiedStatement::PurifiedAlterSourceAddSubsources {
1659 source_name: resolved_source_name,
1660 options,
1661 subsources: requested_subsource_map,
1662 })
1663}
1664
1665async fn purify_alter_source_refresh_references(
1666 desc: SourceDesc,
1667 resolved_source_name: ResolvedItemName,
1668 storage_configuration: &StorageConfiguration,
1669) -> Result<PurifiedStatement, PlanError> {
1670 let retrieved_source_references = match desc.connection {
1671 GenericSourceConnection::Postgres(pg_source_connection) => {
1672 let pg_connection = &pg_source_connection.connection;
1674
1675 let config = pg_connection
1676 .config(
1677 &storage_configuration.connection_context.secrets_reader,
1678 storage_configuration,
1679 InTask::No,
1680 )
1681 .await?;
1682
1683 let client = config
1684 .connect(
1685 "postgres_purification",
1686 &storage_configuration.connection_context.ssh_tunnel_manager,
1687 )
1688 .await?;
1689 let reference_client = SourceReferenceClient::Postgres {
1690 client: &client,
1691 publication: &pg_source_connection.publication,
1692 database: &pg_connection.database,
1693 };
1694 reference_client.get_source_references().await?
1695 }
1696 GenericSourceConnection::MySql(mysql_source_connection) => {
1697 let mysql_connection = &mysql_source_connection.connection;
1698 let config = mysql_connection
1699 .config(
1700 &storage_configuration.connection_context.secrets_reader,
1701 storage_configuration,
1702 InTask::No,
1703 )
1704 .await?;
1705
1706 let mut conn = config
1707 .connect(
1708 "mysql purification",
1709 &storage_configuration.connection_context.ssh_tunnel_manager,
1710 )
1711 .await?;
1712
1713 let reference_client = SourceReferenceClient::MySql {
1714 conn: &mut conn,
1715 include_system_schemas: false,
1716 };
1717 reference_client.get_source_references().await?
1718 }
1719 GenericSourceConnection::SqlServer(sql_server_source) => {
1720 let sql_server_connection = &sql_server_source.connection;
1722 let config = sql_server_connection
1723 .resolve_config(
1724 &storage_configuration.connection_context.secrets_reader,
1725 storage_configuration,
1726 InTask::No,
1727 )
1728 .await?;
1729 let mut client = mz_sql_server_util::Client::connect(config).await?;
1730
1731 let source_references = SourceReferenceClient::SqlServer {
1733 client: &mut client,
1734 database: sql_server_connection.database.clone().into(),
1735 }
1736 .get_source_references()
1737 .await?;
1738 source_references
1739 }
1740 GenericSourceConnection::LoadGenerator(load_gen_connection) => {
1741 let reference_client = SourceReferenceClient::LoadGenerator {
1742 generator: &load_gen_connection.load_generator,
1743 };
1744 reference_client.get_source_references().await?
1745 }
1746 GenericSourceConnection::Kafka(kafka_conn) => {
1747 let reference_client = SourceReferenceClient::Kafka {
1748 topic: &kafka_conn.topic,
1749 };
1750 reference_client.get_source_references().await?
1751 }
1752 };
1753 Ok(PurifiedStatement::PurifiedAlterSourceRefreshReferences {
1754 source_name: resolved_source_name,
1755 available_source_references: retrieved_source_references.available_source_references(),
1756 })
1757}
1758
1759async fn purify_create_table_from_source(
1760 catalog: impl SessionCatalog,
1761 mut stmt: CreateTableFromSourceStatement<Aug>,
1762 storage_configuration: &StorageConfiguration,
1763) -> Result<PurifiedStatement, PlanError> {
1764 let scx = StatementContext::new(None, &catalog);
1765 let CreateTableFromSourceStatement {
1766 name: _,
1767 columns,
1768 constraints,
1769 source: source_name,
1770 if_not_exists: _,
1771 external_reference,
1772 format,
1773 envelope,
1774 include_metadata: _,
1775 with_options,
1776 } = &mut stmt;
1777
1778 if matches!(columns, TableFromSourceColumns::Defined(_)) {
1780 sql_bail!("CREATE TABLE .. FROM SOURCE column definitions cannot be specified directly");
1781 }
1782 if !constraints.is_empty() {
1783 sql_bail!(
1784 "CREATE TABLE .. FROM SOURCE constraint definitions cannot be specified directly"
1785 );
1786 }
1787
1788 let item = match scx.get_item_by_resolved_name(source_name) {
1790 Ok(item) => item,
1791 Err(e) => return Err(e),
1792 };
1793
1794 let desc = match item.source_desc()? {
1796 Some(desc) => desc.clone().into_inline_connection(scx.catalog),
1797 None => {
1798 sql_bail!("cannot ALTER this type of source")
1799 }
1800 };
1801 let unresolved_source_name: UnresolvedItemName = source_name.full_item_name().clone().into();
1802
1803 let crate::plan::statement::ddl::TableFromSourceOptionExtracted {
1804 text_columns,
1805 exclude_columns,
1806 retain_history: _,
1807 details,
1808 partition_by: _,
1809 seen: _,
1810 } = with_options.clone().try_into()?;
1811 if details.is_some() {
1812 sql_bail!("DETAILS option cannot be explicitly set");
1813 }
1814
1815 let qualified_text_columns = text_columns
1819 .iter()
1820 .map(|col| {
1821 UnresolvedItemName(
1822 external_reference
1823 .as_ref()
1824 .map(|er| er.0.iter().chain_one(col).map(|i| i.clone()).collect())
1825 .unwrap_or_else(|| vec![col.clone()]),
1826 )
1827 })
1828 .collect_vec();
1829 let qualified_exclude_columns = exclude_columns
1830 .iter()
1831 .map(|col| {
1832 UnresolvedItemName(
1833 external_reference
1834 .as_ref()
1835 .map(|er| er.0.iter().chain_one(col).map(|i| i.clone()).collect())
1836 .unwrap_or_else(|| vec![col.clone()]),
1837 )
1838 })
1839 .collect_vec();
1840
1841 let mut format_options = SourceFormatOptions::Default;
1843
1844 let retrieved_source_references: RetrievedSourceReferences;
1845
1846 let requested_references = external_reference.as_ref().map(|ref_name| {
1847 ExternalReferences::SubsetTables(vec![ExternalReferenceExport {
1848 reference: ref_name.clone(),
1849 alias: None,
1850 }])
1851 });
1852
1853 let purified_export = match desc.connection {
1856 GenericSourceConnection::Postgres(pg_source_connection) => {
1857 let pg_connection = &pg_source_connection.connection;
1859
1860 let client = pg_connection
1861 .validate(pg_source_connection.connection_id, storage_configuration)
1862 .await?;
1863
1864 let reference_client = SourceReferenceClient::Postgres {
1865 client: &client,
1866 publication: &pg_source_connection.publication,
1867 database: &pg_connection.database,
1868 };
1869 retrieved_source_references = reference_client.get_source_references().await?;
1870
1871 let postgres::PurifiedSourceExports {
1872 source_exports,
1873 normalized_text_columns: _,
1877 } = postgres::purify_source_exports(
1878 &client,
1879 &retrieved_source_references,
1880 &requested_references,
1881 qualified_text_columns,
1882 qualified_exclude_columns,
1883 &unresolved_source_name,
1884 &SourceReferencePolicy::Required,
1885 )
1886 .await?;
1887 let (_, purified_export) = source_exports.into_element();
1889 purified_export
1890 }
1891 GenericSourceConnection::MySql(mysql_source_connection) => {
1892 let mysql_connection = &mysql_source_connection.connection;
1893 let config = mysql_connection
1894 .config(
1895 &storage_configuration.connection_context.secrets_reader,
1896 storage_configuration,
1897 InTask::No,
1898 )
1899 .await?;
1900
1901 let mut conn = config
1902 .connect(
1903 "mysql purification",
1904 &storage_configuration.connection_context.ssh_tunnel_manager,
1905 )
1906 .await?;
1907
1908 ensure_binlog_full_metadata(&mut conn).await?;
1909 let binlog_full_metadata = true;
1910
1911 let initial_gtid_set =
1914 mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?;
1915
1916 let reference_client = SourceReferenceClient::MySql {
1917 conn: &mut conn,
1918 include_system_schemas: mysql::references_system_schemas(&requested_references),
1919 };
1920 retrieved_source_references = reference_client.get_source_references().await?;
1921
1922 let mysql::PurifiedSourceExports {
1923 source_exports,
1924 normalized_text_columns: _,
1928 normalized_exclude_columns: _,
1929 } = mysql::purify_source_exports(
1930 &mut conn,
1931 &retrieved_source_references,
1932 &requested_references,
1933 qualified_text_columns,
1934 qualified_exclude_columns,
1935 &unresolved_source_name,
1936 initial_gtid_set,
1937 &SourceReferencePolicy::Required,
1938 binlog_full_metadata,
1939 )
1940 .await?;
1941 let (_, purified_export) = source_exports.into_element();
1943 purified_export
1944 }
1945 GenericSourceConnection::SqlServer(sql_server_source) => {
1946 let connection = sql_server_source.connection;
1947 let config = connection
1948 .resolve_config(
1949 &storage_configuration.connection_context.secrets_reader,
1950 storage_configuration,
1951 InTask::No,
1952 )
1953 .await?;
1954 let mut client = mz_sql_server_util::Client::connect(config).await?;
1955
1956 let database: Arc<str> = connection.database.into();
1957 let reference_client = SourceReferenceClient::SqlServer {
1958 client: &mut client,
1959 database: Arc::clone(&database),
1960 };
1961 retrieved_source_references = reference_client.get_source_references().await?;
1962 tracing::debug!(?retrieved_source_references, "got source references");
1963
1964 let timeout = mz_storage_types::sources::sql_server::MAX_LSN_WAIT
1965 .get(storage_configuration.config_set());
1966
1967 let purified_source_exports = sql_server::purify_source_exports(
1968 &*database,
1969 &mut client,
1970 &retrieved_source_references,
1971 &requested_references,
1972 &qualified_text_columns,
1973 &qualified_exclude_columns,
1974 &unresolved_source_name,
1975 timeout,
1976 &SourceReferencePolicy::Required,
1977 )
1978 .await?;
1979
1980 let (_, purified_export) = purified_source_exports.source_exports.into_element();
1982 purified_export
1983 }
1984 GenericSourceConnection::LoadGenerator(load_gen_connection) => {
1985 let reference_client = SourceReferenceClient::LoadGenerator {
1986 generator: &load_gen_connection.load_generator,
1987 };
1988 retrieved_source_references = reference_client.get_source_references().await?;
1989
1990 let requested_exports = retrieved_source_references
1991 .requested_source_exports(requested_references.as_ref(), &unresolved_source_name)?;
1992 let export = requested_exports.into_element();
1994 PurifiedSourceExport {
1995 external_reference: export.external_reference,
1996 details: PurifiedExportDetails::LoadGenerator {
1997 table: export
1998 .meta
1999 .load_generator_desc()
2000 .ok_or_else(|| internal_err!("expected load generator source reference"))?
2001 .clone(),
2002 output: export
2003 .meta
2004 .load_generator_output()
2005 .ok_or_else(|| internal_err!("expected load generator source reference"))?
2006 .clone(),
2007 },
2008 }
2009 }
2010 GenericSourceConnection::Kafka(kafka_conn) => {
2011 let reference_client = SourceReferenceClient::Kafka {
2012 topic: &kafka_conn.topic,
2013 };
2014 retrieved_source_references = reference_client.get_source_references().await?;
2015 let requested_exports = retrieved_source_references
2016 .requested_source_exports(requested_references.as_ref(), &unresolved_source_name)?;
2017 let export = requested_exports.into_element();
2019
2020 format_options = SourceFormatOptions::Kafka {
2021 topic: kafka_conn.topic.clone(),
2022 };
2023 PurifiedSourceExport {
2024 external_reference: export.external_reference,
2025 details: PurifiedExportDetails::Kafka {},
2026 }
2027 }
2028 };
2029
2030 purify_source_format(
2031 &catalog,
2032 format,
2033 &format_options,
2034 envelope,
2035 storage_configuration,
2036 )
2037 .await?;
2038
2039 *external_reference = Some(purified_export.external_reference.clone());
2042
2043 match &purified_export.details {
2045 PurifiedExportDetails::Postgres { .. } => {
2046 let mut unsupported_cols = vec![];
2047 let postgres::PostgresExportStatementValues {
2048 columns: gen_columns,
2049 constraints: gen_constraints,
2050 text_columns: gen_text_columns,
2051 exclude_columns: gen_exclude_columns,
2052 details: gen_details,
2053 external_reference: _,
2054 } = postgres::generate_source_export_statement_values(
2055 &scx,
2056 purified_export,
2057 &mut unsupported_cols,
2058 )?;
2059 if !unsupported_cols.is_empty() {
2060 unsupported_cols.sort();
2061 Err(PgSourcePurificationError::UnrecognizedTypes {
2062 cols: unsupported_cols,
2063 })?;
2064 }
2065
2066 if let Some(text_cols_option) = with_options
2067 .iter_mut()
2068 .find(|option| option.name == TableFromSourceOptionName::TextColumns)
2069 {
2070 if let Some(gen_text_columns) = gen_text_columns {
2071 text_cols_option.value = Some(WithOptionValue::Sequence(gen_text_columns));
2072 }
2073 }
2074 if let Some(exclude_cols_option) = with_options
2075 .iter_mut()
2076 .find(|option| option.name == TableFromSourceOptionName::ExcludeColumns)
2077 {
2078 if let Some(gen_exclude_columns) = gen_exclude_columns {
2079 exclude_cols_option.value =
2080 Some(WithOptionValue::Sequence(gen_exclude_columns));
2081 }
2082 }
2083 match columns {
2084 TableFromSourceColumns::Defined(_) => {
2085 bail_internal!(
2088 "column definitions cannot be explicitly set for this source type"
2089 )
2090 }
2091 TableFromSourceColumns::NotSpecified => {
2092 *columns = TableFromSourceColumns::Defined(gen_columns);
2093 *constraints = gen_constraints;
2094 }
2095 TableFromSourceColumns::Named(_) => {
2096 sql_bail!("columns cannot be named for Postgres sources")
2097 }
2098 }
2099 with_options.push(TableFromSourceOption {
2100 name: TableFromSourceOptionName::Details,
2101 value: Some(WithOptionValue::Value(Value::String(hex::encode(
2102 gen_details.into_proto().encode_to_vec(),
2103 )))),
2104 })
2105 }
2106 PurifiedExportDetails::MySql { .. } => {
2107 let mysql::MySqlExportStatementValues {
2108 columns: gen_columns,
2109 constraints: gen_constraints,
2110 text_columns: gen_text_columns,
2111 exclude_columns: gen_exclude_columns,
2112 details: gen_details,
2113 external_reference: _,
2114 } = mysql::generate_source_export_statement_values(&scx, purified_export)?;
2115
2116 if let Some(text_cols_option) = with_options
2117 .iter_mut()
2118 .find(|option| option.name == TableFromSourceOptionName::TextColumns)
2119 {
2120 if let Some(gen_text_columns) = gen_text_columns {
2121 text_cols_option.value = Some(WithOptionValue::Sequence(gen_text_columns));
2122 }
2123 }
2124 if let Some(exclude_cols_option) = with_options
2125 .iter_mut()
2126 .find(|option| option.name == TableFromSourceOptionName::ExcludeColumns)
2127 {
2128 if let Some(gen_exclude_columns) = gen_exclude_columns {
2129 exclude_cols_option.value =
2130 Some(WithOptionValue::Sequence(gen_exclude_columns));
2131 }
2132 }
2133 match columns {
2134 TableFromSourceColumns::Defined(_) => {
2135 bail_internal!(
2138 "column definitions cannot be explicitly set for this source type"
2139 )
2140 }
2141 TableFromSourceColumns::NotSpecified => {
2142 *columns = TableFromSourceColumns::Defined(gen_columns);
2143 *constraints = gen_constraints;
2144 }
2145 TableFromSourceColumns::Named(_) => {
2146 sql_bail!("columns cannot be named for MySQL sources")
2147 }
2148 }
2149 with_options.push(TableFromSourceOption {
2150 name: TableFromSourceOptionName::Details,
2151 value: Some(WithOptionValue::Value(Value::String(hex::encode(
2152 gen_details.into_proto().encode_to_vec(),
2153 )))),
2154 })
2155 }
2156 PurifiedExportDetails::SqlServer { .. } => {
2157 let sql_server::SqlServerExportStatementValues {
2158 columns: gen_columns,
2159 constraints: gen_constraints,
2160 text_columns: gen_text_columns,
2161 excl_columns: gen_excl_columns,
2162 details: gen_details,
2163 external_reference: _,
2164 } = sql_server::generate_source_export_statement_values(&scx, purified_export)?;
2165
2166 if let Some(text_cols_option) = with_options
2167 .iter_mut()
2168 .find(|opt| opt.name == TableFromSourceOptionName::TextColumns)
2169 {
2170 if let Some(gen_text_columns) = gen_text_columns {
2171 text_cols_option.value = Some(WithOptionValue::Sequence(gen_text_columns));
2172 }
2173 }
2174 if let Some(exclude_cols_option) = with_options
2175 .iter_mut()
2176 .find(|opt| opt.name == TableFromSourceOptionName::ExcludeColumns)
2177 {
2178 if let Some(gen_excl_columns) = gen_excl_columns {
2179 exclude_cols_option.value = Some(WithOptionValue::Sequence(gen_excl_columns));
2180 }
2181 }
2182
2183 match columns {
2184 TableFromSourceColumns::NotSpecified => {
2185 *columns = TableFromSourceColumns::Defined(gen_columns);
2186 *constraints = gen_constraints;
2187 }
2188 TableFromSourceColumns::Named(_) => {
2189 sql_bail!("columns cannot be named for SQL Server sources")
2190 }
2191 TableFromSourceColumns::Defined(_) => {
2192 bail_internal!(
2195 "column definitions cannot be explicitly set for this source type"
2196 )
2197 }
2198 }
2199
2200 with_options.push(TableFromSourceOption {
2201 name: TableFromSourceOptionName::Details,
2202 value: Some(WithOptionValue::Value(Value::String(hex::encode(
2203 gen_details.into_proto().encode_to_vec(),
2204 )))),
2205 })
2206 }
2207 PurifiedExportDetails::LoadGenerator { .. } => {
2208 let (desc, output) = match purified_export.details {
2209 PurifiedExportDetails::LoadGenerator { table, output } => (table, output),
2210 _ => bail_internal!("purified export details must be load generator"),
2211 };
2212 if let Some(desc) = desc {
2217 let (gen_columns, gen_constraints) = scx.relation_desc_into_table_defs(&desc)?;
2218 match columns {
2219 TableFromSourceColumns::Defined(_) => bail_internal!(
2222 "column definitions cannot be explicitly set for this source type"
2223 ),
2224 TableFromSourceColumns::NotSpecified => {
2225 *columns = TableFromSourceColumns::Defined(gen_columns);
2226 *constraints = gen_constraints;
2227 }
2228 TableFromSourceColumns::Named(_) => {
2229 sql_bail!("columns cannot be named for multi-output load generator sources")
2230 }
2231 }
2232 }
2233 let details = SourceExportStatementDetails::LoadGenerator { output };
2234 with_options.push(TableFromSourceOption {
2235 name: TableFromSourceOptionName::Details,
2236 value: Some(WithOptionValue::Value(Value::String(hex::encode(
2237 details.into_proto().encode_to_vec(),
2238 )))),
2239 })
2240 }
2241 PurifiedExportDetails::Kafka {} => {
2242 let details = SourceExportStatementDetails::Kafka {};
2246 with_options.push(TableFromSourceOption {
2247 name: TableFromSourceOptionName::Details,
2248 value: Some(WithOptionValue::Value(Value::String(hex::encode(
2249 details.into_proto().encode_to_vec(),
2250 )))),
2251 })
2252 }
2253 };
2254
2255 Ok(PurifiedStatement::PurifiedCreateTableFromSource { stmt })
2259}
2260
/// Source-type-specific context consulted while purifying a source's format.
enum SourceFormatOptions {
    /// No extra context is available; format purification treats this as
    /// "not a Kafka source".
    Default,
    /// The source reads from Kafka. `topic` is the basis for the
    /// `{topic}-key` / `{topic}-value` schema registry subject names.
    Kafka { topic: String },
}
2265
2266async fn purify_source_format(
2267 catalog: &dyn SessionCatalog,
2268 format: &mut Option<FormatSpecifier<Aug>>,
2269 options: &SourceFormatOptions,
2270 envelope: &Option<SourceEnvelope>,
2271 storage_configuration: &StorageConfiguration,
2272) -> Result<(), PlanError> {
2273 if matches!(format, Some(FormatSpecifier::KeyValue { .. }))
2274 && !matches!(options, SourceFormatOptions::Kafka { .. })
2275 {
2276 sql_bail!("Kafka sources are the only source type that can provide KEY/VALUE formats")
2277 }
2278
2279 match format.as_mut() {
2280 None => {}
2281 Some(FormatSpecifier::Bare(format)) => {
2282 purify_source_format_single(catalog, format, options, envelope, storage_configuration)
2283 .await?;
2284 }
2285
2286 Some(FormatSpecifier::KeyValue { key, value: val }) => {
2287 purify_source_format_single(catalog, key, options, envelope, storage_configuration)
2288 .await?;
2289 purify_source_format_single(catalog, val, options, envelope, storage_configuration)
2290 .await?;
2291 }
2292 }
2293 Ok(())
2294}
2295
2296async fn purify_source_format_single(
2297 catalog: &dyn SessionCatalog,
2298 format: &mut Format<Aug>,
2299 options: &SourceFormatOptions,
2300 envelope: &Option<SourceEnvelope>,
2301 storage_configuration: &StorageConfiguration,
2302) -> Result<(), PlanError> {
2303 match format {
2304 Format::Avro(schema) => match schema {
2305 AvroSchema::Csr { csr_connection } => {
2306 purify_csr_connection_avro(
2307 catalog,
2308 options,
2309 csr_connection,
2310 envelope,
2311 storage_configuration,
2312 )
2313 .await?
2314 }
2315 AvroSchema::InlineSchema { .. } => {}
2316 AvroSchema::Glue {
2317 connection,
2318 with_options,
2319 seed,
2320 } => {
2321 purify_glue_connection_avro(
2322 catalog,
2323 options,
2324 connection,
2325 with_options,
2326 seed,
2327 storage_configuration,
2328 )
2329 .await?
2330 }
2331 },
2332 Format::Protobuf(schema) => match schema {
2333 ProtobufSchema::Csr { csr_connection } => {
2334 purify_csr_connection_proto(
2335 catalog,
2336 options,
2337 csr_connection,
2338 envelope,
2339 storage_configuration,
2340 )
2341 .await?;
2342 }
2343 ProtobufSchema::InlineSchema { .. } => {}
2344 },
2345 Format::Bytes
2346 | Format::Regex(_)
2347 | Format::Json { .. }
2348 | Format::Text
2349 | Format::Csv { .. } => (),
2350 }
2351 Ok(())
2352}
2353
2354pub fn generate_subsource_statements(
2355 scx: &StatementContext,
2356 source_name: ResolvedItemName,
2357 subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
2358) -> Result<Vec<CreateSubsourceStatement<Aug>>, PlanError> {
2359 if subsources.is_empty() {
2361 return Ok(vec![]);
2362 }
2363 let (_, purified_export) = subsources
2364 .iter()
2365 .next()
2366 .ok_or_else(|| internal_err!("expected at least one subsource"))?;
2367
2368 let statements = match &purified_export.details {
2369 PurifiedExportDetails::Postgres { .. } => {
2370 crate::pure::postgres::generate_create_subsource_statements(
2371 scx,
2372 source_name,
2373 subsources,
2374 )?
2375 }
2376 PurifiedExportDetails::MySql { .. } => {
2377 crate::pure::mysql::generate_create_subsource_statements(scx, source_name, subsources)?
2378 }
2379 PurifiedExportDetails::SqlServer { .. } => {
2380 crate::pure::sql_server::generate_create_subsource_statements(
2381 scx,
2382 source_name,
2383 subsources,
2384 )?
2385 }
2386 PurifiedExportDetails::LoadGenerator { .. } => {
2387 let mut subsource_stmts = Vec::with_capacity(subsources.len());
2388 for (subsource_name, purified_export) in subsources {
2389 let (desc, output) = match purified_export.details {
2390 PurifiedExportDetails::LoadGenerator { table, output } => (table, output),
2391 _ => {
2392 bail_internal!("purified export details must be load generator")
2393 }
2394 };
2395 let desc = desc.ok_or_else(|| {
2396 internal_err!(
2397 "subsources cannot be generated for single-output load generators"
2398 )
2399 })?;
2400
2401 let (columns, table_constraints) = scx.relation_desc_into_table_defs(&desc)?;
2402 let details = SourceExportStatementDetails::LoadGenerator { output };
2403 let subsource = CreateSubsourceStatement {
2405 name: subsource_name,
2406 columns,
2407 of_source: Some(source_name.clone()),
2408 constraints: table_constraints,
2413 if_not_exists: false,
2414 with_options: vec![
2415 CreateSubsourceOption {
2416 name: CreateSubsourceOptionName::ExternalReference,
2417 value: Some(WithOptionValue::UnresolvedItemName(
2418 purified_export.external_reference,
2419 )),
2420 },
2421 CreateSubsourceOption {
2422 name: CreateSubsourceOptionName::Details,
2423 value: Some(WithOptionValue::Value(Value::String(hex::encode(
2424 details.into_proto().encode_to_vec(),
2425 )))),
2426 },
2427 ],
2428 };
2429 subsource_stmts.push(subsource);
2430 }
2431
2432 subsource_stmts
2433 }
2434 PurifiedExportDetails::Kafka { .. } => {
2435 if !subsources.is_empty() {
2439 bail_internal!("Kafka sources do not produce data-bearing subsources");
2440 }
2441 vec![]
2442 }
2443 };
2444 Ok(statements)
2445}
2446
2447async fn purify_csr_connection_proto(
2448 catalog: &dyn SessionCatalog,
2449 options: &SourceFormatOptions,
2450 csr_connection: &mut CsrConnectionProtobuf<Aug>,
2451 envelope: &Option<SourceEnvelope>,
2452 storage_configuration: &StorageConfiguration,
2453) -> Result<(), PlanError> {
2454 let SourceFormatOptions::Kafka { topic } = options else {
2455 sql_bail!("Confluent Schema Registry is only supported with Kafka sources")
2456 };
2457
2458 let CsrConnectionProtobuf {
2459 seed,
2460 connection: CsrConnection {
2461 connection,
2462 options: _,
2463 },
2464 } = csr_connection;
2465 match seed {
2466 None => {
2467 let scx = StatementContext::new(None, &*catalog);
2468
2469 let ccsr_connection = match scx.get_item_by_resolved_name(connection)?.connection()? {
2470 Connection::Csr(connection) => connection.clone().into_inline_connection(catalog),
2471 _ => sql_bail!("{} is not a schema registry connection", connection),
2472 };
2473
2474 let ccsr_client = ccsr_connection
2475 .connect(storage_configuration, InTask::No)
2476 .await
2477 .map_err(|e| CsrPurificationError::ClientError(Arc::new(e)))?;
2478
2479 let value = compile_proto(&format!("{}-value", topic), &ccsr_client).await?;
2480 let key = compile_proto(&format!("{}-key", topic), &ccsr_client)
2481 .await
2482 .ok();
2483
2484 if matches!(envelope, Some(SourceEnvelope::Debezium)) && key.is_none() {
2485 sql_bail!("Key schema is required for ENVELOPE DEBEZIUM");
2486 }
2487
2488 *seed = Some(CsrSeedProtobuf { value, key });
2489 }
2490 Some(_) => (),
2491 }
2492
2493 Ok(())
2494}
2495
2496async fn purify_csr_connection_avro(
2497 catalog: &dyn SessionCatalog,
2498 options: &SourceFormatOptions,
2499 csr_connection: &mut CsrConnectionAvro<Aug>,
2500 envelope: &Option<SourceEnvelope>,
2501 storage_configuration: &StorageConfiguration,
2502) -> Result<(), PlanError> {
2503 let SourceFormatOptions::Kafka { topic } = options else {
2504 sql_bail!("Confluent Schema Registry is only supported with Kafka sources")
2505 };
2506
2507 let CsrConnectionAvro {
2508 connection: CsrConnection { connection, .. },
2509 seed,
2510 key_strategy,
2511 value_strategy,
2512 } = csr_connection;
2513 if seed.is_none() {
2514 let scx = StatementContext::new(None, &*catalog);
2515 let csr_connection = match scx.get_item_by_resolved_name(connection)?.connection()? {
2516 Connection::Csr(connection) => connection.clone().into_inline_connection(catalog),
2517 _ => sql_bail!("{} is not a schema registry connection", connection),
2518 };
2519 let ccsr_client = csr_connection
2520 .connect(storage_configuration, InTask::No)
2521 .await
2522 .map_err(|e| CsrPurificationError::ClientError(Arc::new(e)))?;
2523
2524 let Schema {
2525 key_schema,
2526 value_schema,
2527 key_reference_schemas,
2528 value_reference_schemas,
2529 } = get_remote_csr_schema(
2530 &ccsr_client,
2531 key_strategy.clone().unwrap_or_default(),
2532 value_strategy.clone().unwrap_or_default(),
2533 topic,
2534 )
2535 .await?;
2536 if matches!(envelope, Some(SourceEnvelope::Debezium)) && key_schema.is_none() {
2537 sql_bail!("Key schema is required for ENVELOPE DEBEZIUM");
2538 }
2539
2540 *seed = Some(CsrSeedAvro {
2541 key_schema,
2542 value_schema,
2543 key_reference_schemas,
2544 value_reference_schemas,
2545 })
2546 }
2547
2548 Ok(())
2549}
2550
2551async fn purify_glue_connection_avro(
2552 catalog: &dyn SessionCatalog,
2553 options: &SourceFormatOptions,
2554 connection: &ResolvedItemName,
2555 with_options: &[GlueAvroOption<Aug>],
2556 seed: &mut Option<GlueAvroSeed>,
2557 storage_configuration: &StorageConfiguration,
2558) -> Result<(), PlanError> {
2559 use crate::pure::error::GluePurificationError;
2560 let SourceFormatOptions::Kafka { .. } = options else {
2561 sql_bail!("AWS Glue Schema Registry is only supported with Kafka sources")
2562 };
2563
2564 let scx = StatementContext::new(None, &*catalog);
2565 let item = scx.get_item_by_resolved_name(connection)?;
2566 let full_name = scx.catalog.resolve_full_name(item.name());
2567 let Connection::GlueSchemaRegistry(gsr_connection) = item.connection()? else {
2574 return Err(GluePurificationError::NotGlueConnection(full_name).into());
2575 };
2576
2577 let crate::plan::statement::ddl::GlueAvroOptionExtracted { schema_name, .. } =
2581 with_options.to_vec().try_into()?;
2582 let schema_name = schema_name.ok_or(GluePurificationError::MissingSchemaName)?;
2583
2584 if seed.is_some() {
2585 return Ok(());
2590 }
2591 let gsr_connection = gsr_connection.into_inline_connection(catalog);
2592
2593 let enforce_external_addresses = mz_storage_types::dyncfgs::ENFORCE_EXTERNAL_ADDRESSES
2596 .get(storage_configuration.config_set());
2597 let sdk_config = gsr_connection
2598 .aws_connection
2599 .connection
2600 .load_sdk_config(
2601 &storage_configuration.connection_context,
2602 gsr_connection.aws_connection.connection_id,
2603 InTask::No,
2605 enforce_external_addresses,
2606 )
2607 .await
2608 .map_err(|e| GluePurificationError::LoadSdkConfigError(Arc::new(e)))?;
2609 let glue_client = mz_aws_glue_schema_registry::ClientConfig::new(sdk_config).build();
2610
2611 let version = glue_client
2612 .get_schema_version_latest_by_name(&gsr_connection.registry_name, &schema_name)
2613 .await
2614 .map_err(|e| GluePurificationError::SchemaLookupError {
2615 registry: gsr_connection.registry_name.clone(),
2616 schema: schema_name.clone(),
2617 cause: Arc::new(e),
2618 })?;
2619 match &version.data_format {
2623 Some(mz_aws_glue_schema_registry::DataFormat::Avro) => {}
2624 other => {
2625 return Err(GluePurificationError::UnsupportedDataFormat {
2626 registry: gsr_connection.registry_name.clone(),
2627 schema: schema_name.clone(),
2628 format: other
2629 .as_ref()
2630 .map(|f| f.as_str().to_string())
2631 .unwrap_or_else(|| "<unspecified>".to_string()),
2632 }
2633 .into());
2634 }
2635 }
2636 let value_schema =
2637 version
2638 .definition
2639 .ok_or_else(|| GluePurificationError::EmptyDefinition {
2640 registry: gsr_connection.registry_name.clone(),
2641 schema: schema_name.clone(),
2642 })?;
2643
2644 *seed = Some(GlueAvroSeed { value_schema });
2645 Ok(())
2646}
2647
/// The key and value schemas fetched for a topic, together with the raw
/// schemas each of them references.
#[derive(Debug)]
pub struct Schema {
    /// Raw key schema, or `None` when no key schema was found.
    pub key_schema: Option<String>,
    /// Raw value schema.
    pub value_schema: String,
    /// Raw schemas referenced by the key schema; empty when there is no key
    /// schema.
    pub key_reference_schemas: Vec<String>,
    /// Raw schemas referenced by the value schema.
    pub value_reference_schemas: Vec<String>,
}
2657
/// A single raw schema together with the raw schemas it references.
struct SchemaWithReferences {
    /// The raw text of the primary schema.
    schema: String,
    /// The raw text of each referenced schema.
    references: Vec<String>,
}
2665
2666async fn get_schema_with_strategy(
2667 client: &Client,
2668 strategy: ReaderSchemaSelectionStrategy,
2669 subject: &str,
2670) -> Result<Option<SchemaWithReferences>, PlanError> {
2671 match strategy {
2672 ReaderSchemaSelectionStrategy::Latest => {
2673 match client.get_subject_and_references(subject).await {
2675 Ok((primary, dependencies)) => Ok(Some(SchemaWithReferences {
2676 schema: primary.schema.raw,
2677 references: dependencies.into_iter().map(|s| s.schema.raw).collect(),
2678 })),
2679 Err(GetBySubjectError::SubjectNotFound)
2680 | Err(GetBySubjectError::VersionNotFound(_)) => Ok(None),
2681 Err(e) => Err(PlanError::FetchingCsrSchemaFailed {
2682 schema_lookup: format!("subject {}", subject.quoted()),
2683 cause: Arc::new(e),
2684 }),
2685 }
2686 }
2687 ReaderSchemaSelectionStrategy::Inline(raw) => Ok(Some(SchemaWithReferences {
2691 schema: raw,
2692 references: vec![],
2693 })),
2694 ReaderSchemaSelectionStrategy::ById(id) => {
2695 match client.get_subject_and_references_by_id(id).await {
2696 Ok((primary, dependencies)) => Ok(Some(SchemaWithReferences {
2697 schema: primary.schema.raw,
2698 references: dependencies.into_iter().map(|s| s.schema.raw).collect(),
2699 })),
2700 Err(GetBySubjectError::SubjectNotFound)
2701 | Err(GetBySubjectError::VersionNotFound(_)) => Ok(None),
2702 Err(e) => Err(PlanError::FetchingCsrSchemaFailed {
2703 schema_lookup: format!("subject {}", subject.quoted()),
2704 cause: Arc::new(e),
2705 }),
2706 }
2707 }
2708 }
2709}
2710
2711async fn get_remote_csr_schema(
2712 ccsr_client: &mz_ccsr::Client,
2713 key_strategy: ReaderSchemaSelectionStrategy,
2714 value_strategy: ReaderSchemaSelectionStrategy,
2715 topic: &str,
2716) -> Result<Schema, PlanError> {
2717 let value_schema_name = format!("{}-value", topic);
2718 let value_result =
2719 get_schema_with_strategy(ccsr_client, value_strategy, &value_schema_name).await?;
2720 let value_result = value_result.ok_or_else(|| anyhow!("No value schema found"))?;
2721
2722 let key_subject = format!("{}-key", topic);
2723 let key_result = get_schema_with_strategy(ccsr_client, key_strategy, &key_subject).await?;
2724 Ok(Schema {
2725 key_schema: key_result.as_ref().map(|r| r.schema.clone()),
2726 value_schema: value_result.schema,
2727 key_reference_schemas: key_result.map(|r| r.references).unwrap_or_default(),
2728 value_reference_schemas: value_result.references,
2729 })
2730}
2731
2732async fn compile_proto(
2734 subject_name: &String,
2735 ccsr_client: &Client,
2736) -> Result<CsrSeedProtobufSchema, PlanError> {
2737 let (primary_subject, dependency_subjects) = ccsr_client
2738 .get_subject_and_references(subject_name)
2739 .await
2740 .map_err(|e| PlanError::FetchingCsrSchemaFailed {
2741 schema_lookup: format!("subject {}", subject_name.quoted()),
2742 cause: Arc::new(e),
2743 })?;
2744
2745 let mut source_tree = VirtualSourceTree::new();
2747
2748 source_tree.as_mut().map_well_known_types();
2752
2753 for subject in iter::once(&primary_subject).chain(dependency_subjects.iter()) {
2754 source_tree.as_mut().add_file(
2755 Path::new(&subject.name),
2756 subject.schema.raw.as_bytes().to_vec(),
2757 );
2758 }
2759 let mut db = SourceTreeDescriptorDatabase::new(source_tree.as_mut());
2760 let fds = db
2761 .as_mut()
2762 .build_file_descriptor_set(&[Path::new(&primary_subject.name)])
2763 .map_err(|cause| PlanError::InvalidProtobufSchema { cause })?;
2764
2765 let primary_fd = fds.file(0);
2767 let message_name = match primary_fd.message_type_size() {
2768 1 => String::from_utf8_lossy(primary_fd.message_type(0).name()).into_owned(),
2769 0 => bail_unsupported!(29603, "Protobuf schemas with no messages"),
2770 _ => bail_unsupported!(29603, "Protobuf schemas with multiple messages"),
2771 };
2772
2773 let bytes = &fds
2775 .serialize()
2776 .map_err(|cause| PlanError::InvalidProtobufSchema { cause })?;
2777 let mut schema = String::new();
2778 strconv::format_bytes(&mut schema, bytes);
2779
2780 Ok(CsrSeedProtobufSchema {
2781 schema,
2782 message_name,
2783 })
2784}
2785
/// Item name of the `mz_now` function.
const MZ_NOW_NAME: &str = "mz_now";
/// Schema in which the `mz_now` function is looked up.
const MZ_NOW_SCHEMA: &str = "mz_catalog";
2788
2789pub fn purify_create_materialized_view_options(
2795 catalog: impl SessionCatalog,
2796 mz_now: Option<Timestamp>,
2797 cmvs: &mut CreateMaterializedViewStatement<Aug>,
2798 resolved_ids: &mut ResolvedIds,
2799) {
2800 let (mz_now_id, mz_now_expr) = {
2803 let item = catalog
2804 .resolve_function(&PartialItemName {
2805 database: None,
2806 schema: Some(MZ_NOW_SCHEMA.to_string()),
2807 item: MZ_NOW_NAME.to_string(),
2808 })
2809 .expect("we should be able to resolve mz_now");
2810 (
2811 item.id(),
2812 Expr::Function(Function {
2813 name: ResolvedItemName::Item {
2814 id: item.id(),
2815 qualifiers: item.name().qualifiers.clone(),
2816 full_name: catalog.resolve_full_name(item.name()),
2817 print_id: false,
2818 version: RelationVersionSelector::Latest,
2819 },
2820 args: FunctionArgs::Args {
2821 args: Vec::new(),
2822 order_by: Vec::new(),
2823 },
2824 filter: None,
2825 over: None,
2826 distinct: false,
2827 }),
2828 )
2829 };
2830 let (mz_timestamp_id, mz_timestamp_type) = {
2832 let item = catalog.get_system_type("mz_timestamp");
2833 let full_name = catalog.resolve_full_name(item.name());
2834 (
2835 item.id(),
2836 ResolvedDataType::Named {
2837 id: item.id(),
2838 qualifiers: item.name().qualifiers.clone(),
2839 full_name,
2840 modifiers: vec![],
2841 print_id: true,
2842 },
2843 )
2844 };
2845
2846 let mut introduced_mz_timestamp = false;
2847
2848 for option in cmvs.with_options.iter_mut() {
2849 if matches!(
2851 option.value,
2852 Some(WithOptionValue::Refresh(RefreshOptionValue::AtCreation))
2853 ) {
2854 option.value = Some(WithOptionValue::Refresh(RefreshOptionValue::At(
2855 RefreshAtOptionValue {
2856 time: mz_now_expr.clone(),
2857 },
2858 )));
2859 }
2860
2861 if let Some(WithOptionValue::Refresh(RefreshOptionValue::Every(
2863 RefreshEveryOptionValue { aligned_to, .. },
2864 ))) = &mut option.value
2865 {
2866 if aligned_to.is_none() {
2867 *aligned_to = Some(mz_now_expr.clone());
2868 }
2869 }
2870
2871 match &mut option.value {
2874 Some(WithOptionValue::Refresh(RefreshOptionValue::At(RefreshAtOptionValue {
2875 time,
2876 }))) => {
2877 let mut visitor = MzNowPurifierVisitor::new(mz_now, mz_timestamp_type.clone());
2878 visitor.visit_expr_mut(time);
2879 introduced_mz_timestamp |= visitor.introduced_mz_timestamp;
2880 }
2881 Some(WithOptionValue::Refresh(RefreshOptionValue::Every(
2882 RefreshEveryOptionValue {
2883 interval: _,
2884 aligned_to: Some(aligned_to),
2885 },
2886 ))) => {
2887 let mut visitor = MzNowPurifierVisitor::new(mz_now, mz_timestamp_type.clone());
2888 visitor.visit_expr_mut(aligned_to);
2889 introduced_mz_timestamp |= visitor.introduced_mz_timestamp;
2890 }
2891 _ => {}
2892 }
2893 }
2894
2895 if !cmvs.with_options.iter().any(|o| {
2897 matches!(
2898 o,
2899 MaterializedViewOption {
2900 value: Some(WithOptionValue::Refresh(..)),
2901 ..
2902 }
2903 )
2904 }) {
2905 cmvs.with_options.push(MaterializedViewOption {
2906 name: MaterializedViewOptionName::Refresh,
2907 value: Some(WithOptionValue::Refresh(RefreshOptionValue::OnCommit)),
2908 })
2909 }
2910
2911 if introduced_mz_timestamp {
2915 resolved_ids.add_item(mz_timestamp_id);
2916 }
2917 let mut visitor = ExprContainsTemporalVisitor::new();
2921 visitor.visit_create_materialized_view_statement(cmvs);
2922 if !visitor.contains_temporal {
2923 resolved_ids.remove_item(&mz_now_id);
2924 }
2925}
2926
2927pub fn materialized_view_option_contains_temporal(mvo: &MaterializedViewOption<Aug>) -> bool {
2930 match &mvo.value {
2931 Some(WithOptionValue::Refresh(RefreshOptionValue::At(RefreshAtOptionValue { time }))) => {
2932 let mut visitor = ExprContainsTemporalVisitor::new();
2933 visitor.visit_expr(time);
2934 visitor.contains_temporal
2935 }
2936 Some(WithOptionValue::Refresh(RefreshOptionValue::Every(RefreshEveryOptionValue {
2937 interval: _,
2938 aligned_to: Some(aligned_to),
2939 }))) => {
2940 let mut visitor = ExprContainsTemporalVisitor::new();
2941 visitor.visit_expr(aligned_to);
2942 visitor.contains_temporal
2943 }
2944 Some(WithOptionValue::Refresh(RefreshOptionValue::Every(RefreshEveryOptionValue {
2945 interval: _,
2946 aligned_to: None,
2947 }))) => {
2948 true
2951 }
2952 Some(WithOptionValue::Refresh(RefreshOptionValue::AtCreation)) => {
2953 true
2955 }
2956 _ => false,
2957 }
2958}
2959
/// AST visitor state recording whether a call to `mz_now` has been seen.
struct ExprContainsTemporalVisitor {
    /// `true` once a function named `mz_now` has been visited.
    pub contains_temporal: bool,
}

impl ExprContainsTemporalVisitor {
    /// Creates a visitor that has not yet seen any `mz_now` call.
    pub fn new() -> Self {
        Self {
            contains_temporal: false,
        }
    }
}
2972
2973impl Visit<'_, Aug> for ExprContainsTemporalVisitor {
2974 fn visit_function(&mut self, func: &Function<Aug>) {
2975 self.contains_temporal |= func.name.full_item_name().item == MZ_NOW_NAME;
2976 visit_function(self, func);
2977 }
2978}
2979
2980struct MzNowPurifierVisitor {
2981 pub mz_now: Option<Timestamp>,
2982 pub mz_timestamp_type: ResolvedDataType,
2983 pub introduced_mz_timestamp: bool,
2984}
2985
2986impl MzNowPurifierVisitor {
2987 pub fn new(
2988 mz_now: Option<Timestamp>,
2989 mz_timestamp_type: ResolvedDataType,
2990 ) -> MzNowPurifierVisitor {
2991 MzNowPurifierVisitor {
2992 mz_now,
2993 mz_timestamp_type,
2994 introduced_mz_timestamp: false,
2995 }
2996 }
2997}
2998
2999impl VisitMut<'_, Aug> for MzNowPurifierVisitor {
3000 fn visit_expr_mut(&mut self, expr: &'_ mut Expr<Aug>) {
3001 match expr {
3002 Expr::Function(Function {
3003 name:
3004 ResolvedItemName::Item {
3005 full_name: FullItemName { item, .. },
3006 ..
3007 },
3008 ..
3009 }) if item == &MZ_NOW_NAME.to_string() => {
3010 let mz_now = self.mz_now.expect(
3011 "we should have chosen a timestamp if the expression contains mz_now()",
3012 );
3013 *expr = Expr::Cast {
3016 expr: Box::new(Expr::Value(Value::Number(mz_now.to_string()))),
3017 data_type: self.mz_timestamp_type.clone(),
3018 };
3019 self.introduced_mz_timestamp = true;
3020 }
3021 _ => visit_expr_mut(self, expr),
3022 }
3023 }
3024}