use std::collections::{BTreeMap, BTreeSet};
use std::fmt;
use std::iter;
use std::path::Path;
use std::sync::Arc;

use anyhow::anyhow;
use itertools::Itertools;
use mz_adapter_types::dyncfgs::ENABLE_S3_TABLES_REGION_CHECK;
use mz_ccsr::{Client, GetBySubjectError};
use mz_cloud_provider::CloudProvider;
use mz_controller_types::ClusterId;
use mz_kafka_util::client::MzClientContext;
use mz_mysql_util::MySqlTableDesc;
use mz_ore::collections::CollectionExt;
use mz_ore::error::ErrorExt;
use mz_ore::future::InTask;
use mz_ore::iter::IteratorExt;
use mz_ore::soft_panic_or_log;
use mz_ore::str::StrExt;
use mz_postgres_util::desc::PostgresTableDesc;
use mz_proto::RustType;
use mz_repr::{CatalogItemId, RelationDesc, RelationVersionSelector, Timestamp, strconv};
use mz_sql_parser::ast::display::AstDisplay;
use mz_sql_parser::ast::visit::{Visit, visit_function};
use mz_sql_parser::ast::visit_mut::{VisitMut, visit_expr_mut};
use mz_sql_parser::ast::{
    AlterSourceAction, AlterSourceAddSubsourceOptionName, AlterSourceStatement, AvroDocOn,
    ColumnName, CreateMaterializedViewStatement, CreateSinkConnection, CreateSinkOptionName,
    CreateSinkStatement, CreateSourceOptionName, CreateSubsourceOption, CreateSubsourceOptionName,
    CreateTableFromSourceStatement, CsrConfigOption, CsrConfigOptionName, CsrConnection,
    CsrSeedAvro, CsrSeedProtobuf, CsrSeedProtobufSchema, DeferredItemName, DocOnIdentifier,
    DocOnSchema, Expr, Function, FunctionArgs, Ident, KafkaSourceConfigOption,
    KafkaSourceConfigOptionName, LoadGenerator, LoadGeneratorOption, LoadGeneratorOptionName,
    MaterializedViewOption, MaterializedViewOptionName, MySqlConfigOption, MySqlConfigOptionName,
    PgConfigOption, PgConfigOptionName, RawItemName, ReaderSchemaSelectionStrategy,
    RefreshAtOptionValue, RefreshEveryOptionValue, RefreshOptionValue, SourceEnvelope,
    SqlServerConfigOption, SqlServerConfigOptionName, Statement, TableFromSourceColumns,
    TableFromSourceOption, TableFromSourceOptionName, UnresolvedItemName,
};
use mz_sql_server_util::desc::SqlServerTableDesc;
use mz_storage_types::configuration::StorageConfiguration;
use mz_storage_types::connections::Connection;
use mz_storage_types::connections::inline::IntoInlineConnection;
use mz_storage_types::errors::ContextCreationError;
use mz_storage_types::sources::load_generator::LoadGeneratorOutput;
use mz_storage_types::sources::mysql::MySqlSourceDetails;
use mz_storage_types::sources::postgres::PostgresSourcePublicationDetails;
use mz_storage_types::sources::{
    GenericSourceConnection, SourceDesc, SourceExportStatementDetails, SqlServerSourceExtras,
};
use prost::Message;
use protobuf_native::MessageLite;
use protobuf_native::compiler::{SourceTreeDescriptorDatabase, VirtualSourceTree};
use rdkafka::admin::AdminClient;
use references::{RetrievedSourceReferences, SourceReferenceClient};
use uuid::Uuid;

use crate::ast::{
    AlterSourceAddSubsourceOption, AvroSchema, CreateSourceConnection, CreateSourceStatement,
    CreateSubsourceStatement, CsrConnectionAvro, CsrConnectionProtobuf, ExternalReferenceExport,
    ExternalReferences, Format, FormatSpecifier, ProtobufSchema, Value, WithOptionValue,
};
use crate::catalog::{CatalogItemType, SessionCatalog};
use crate::kafka_util::{KafkaSinkConfigOptionExtracted, KafkaSourceConfigOptionExtracted};
use crate::names::{
    Aug, FullItemName, PartialItemName, ResolvedColumnReference, ResolvedDataType, ResolvedIds,
    ResolvedItemName,
};
use crate::plan::error::PlanError;
use crate::plan::statement::ddl::load_generator_ast_to_generator;
use crate::plan::{SourceReferences, StatementContext};
use crate::pure::error::{IcebergSinkPurificationError, SqlServerSourcePurificationError};
use crate::pure::mysql::{ensure_binlog_full_metadata, is_binlog_full_metadata};
use crate::session::vars::ENABLE_SQL_SERVER_SOURCE;
use crate::{kafka_util, normalize};

use self::error::{
    CsrPurificationError, KafkaSinkPurificationError, KafkaSourcePurificationError,
    LoadGeneratorSourcePurificationError, MySqlSourcePurificationError, PgSourcePurificationError,
};

pub(crate) mod error;
mod references;

pub mod mysql;
pub mod postgres;
pub mod sql_server;

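/// A single source export (e.g. a subsource or a table) requested during
/// purification: the upstream object it references, the Materialize name it
/// will be created under, and connector-specific metadata.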
pub(crate) struct RequestedSourceExport<T> {
    external_reference: UnresolvedItemName,
    name: UnresolvedItemName,
    meta: T,
}

impl<T: fmt::Debug> fmt::Debug for RequestedSourceExport<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("RequestedSourceExport")
            .field("external_reference", &self.external_reference)
            .field("name", &self.name)
            .field("meta", &self.meta)
            .finish()
    }
}

impl<T> RequestedSourceExport<T> {
    fn change_meta<F>(self, new_meta: F) -> RequestedSourceExport<F> {
        RequestedSourceExport {
            external_reference: self.external_reference,
            name: self.name,
            meta: new_meta,
        }
    }
}

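/// Derives a name for a source export by reusing the database and schema of
/// `source_name` and replacing its item component with `subsource_name`.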
fn source_export_name_gen(
    source_name: &UnresolvedItemName,
    subsource_name: &str,
) -> Result<UnresolvedItemName, PlanError> {
    let mut partial = normalize::unresolved_item_name(source_name.clone())?;
    partial.item = subsource_name.to_string();
    Ok(UnresolvedItemName::from(partial))
}

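/// Returns an error if any two requested source exports share the same
/// Materialize name or reference the same upstream object.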
fn validate_source_export_names<T>(
    requested_source_exports: &[RequestedSourceExport<T>],
) -> Result<(), PlanError> {
    if let Some(name) = requested_source_exports
        .iter()
        .map(|subsource| &subsource.name)
        .duplicates()
        .next()
        .cloned()
    {
        let mut upstream_references: Vec<_> = requested_source_exports
            .iter()
            .filter_map(|subsource| {
                if subsource.name == name {
                    Some(subsource.external_reference.clone())
                } else {
                    None
                }
            })
            .collect();

        upstream_references.sort();

        Err(PlanError::SubsourceNameConflict {
            name,
            upstream_references,
        })?;
    }

    if let Some(name) = requested_source_exports
        .iter()
        .map(|export| &export.external_reference)
        .duplicates()
        .next()
        .cloned()
    {
        let mut target_names: Vec<_> = requested_source_exports
            .iter()
            .filter_map(|export| {
                if export.external_reference == name {
                    Some(export.name.clone())
                } else {
                    None
                }
            })
            .collect();

        target_names.sort();

        Err(PlanError::SubsourceDuplicateReference { name, target_names })?;
    }

    Ok(())
}

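/// The result of purification: a statement, plus any auxiliary statements and
/// metadata, whose external dependencies have already been fetched so that
/// planning can proceed without further network access.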
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PurifiedStatement {
    PurifiedCreateSource {
        create_progress_subsource_stmt: Option<CreateSubsourceStatement<Aug>>,
        create_source_stmt: CreateSourceStatement<Aug>,
        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
        available_source_references: SourceReferences,
    },
    PurifiedAlterSource {
        alter_source_stmt: AlterSourceStatement<Aug>,
    },
    PurifiedAlterSourceAddSubsources {
        source_name: ResolvedItemName,
        options: Vec<AlterSourceAddSubsourceOption<Aug>>,
        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
    },
    PurifiedAlterSourceRefreshReferences {
        source_name: ResolvedItemName,
        available_source_references: SourceReferences,
    },
    PurifiedCreateSink(CreateSinkStatement<Aug>),
    PurifiedCreateTableFromSource {
        stmt: CreateTableFromSourceStatement<Aug>,
    },
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PurifiedSourceExport {
    pub external_reference: UnresolvedItemName,
    pub details: PurifiedExportDetails,
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PurifiedExportDetails {
    MySql {
        table: MySqlTableDesc,
        text_columns: Option<Vec<Ident>>,
        exclude_columns: Option<Vec<Ident>>,
        initial_gtid_set: String,
        binlog_full_metadata: bool,
    },
    Postgres {
        table: PostgresTableDesc,
        text_columns: Option<Vec<Ident>>,
        exclude_columns: Option<Vec<Ident>>,
    },
    SqlServer {
        table: SqlServerTableDesc,
        text_columns: Option<Vec<Ident>>,
        excl_columns: Option<Vec<Ident>>,
        capture_instance: Arc<str>,
        initial_lsn: mz_sql_server_util::cdc::Lsn,
    },
    Kafka {},
    LoadGenerator {
        table: Option<RelationDesc>,
        output: LoadGeneratorOutput,
    },
}

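/// Purifies the given statement: all network-dependent and otherwise
/// asynchronous, fallible work is performed here so that planning itself can
/// remain synchronous. Also returns the cluster the statement targets, if any.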
pub async fn purify_statement(
    catalog: impl SessionCatalog,
    now: u64,
    stmt: Statement<Aug>,
    storage_configuration: &StorageConfiguration,
) -> (Result<PurifiedStatement, PlanError>, Option<ClusterId>) {
    match stmt {
        Statement::CreateSource(stmt) => {
            let cluster_id = stmt.in_cluster.as_ref().map(|cluster| cluster.id.clone());
            (
                purify_create_source(catalog, now, stmt, storage_configuration).await,
                cluster_id,
            )
        }
        Statement::AlterSource(stmt) => (
            purify_alter_source(catalog, stmt, storage_configuration).await,
            None,
        ),
        Statement::CreateSink(stmt) => {
            let cluster_id = stmt.in_cluster.as_ref().map(|cluster| cluster.id.clone());
            (
                purify_create_sink(catalog, stmt, storage_configuration).await,
                cluster_id,
            )
        }
        Statement::CreateTableFromSource(stmt) => (
            purify_create_table_from_source(catalog, stmt, storage_configuration).await,
            None,
        ),
        o => (
            Err(internal_err!(
                "unexpected statement type in purification: {:?}",
                o
            )),
            None,
        ),
    }
}

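/// For Avro formats that use the Confluent Schema Registry, adds `DOC ON`
/// options derived from catalog comments on the sink's upstream relation and
/// its columns, unless the user already supplied an explicit `DOC ON` option
/// for that identifier.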
pub(crate) fn purify_create_sink_avro_doc_on_options(
    catalog: &dyn SessionCatalog,
    from_id: CatalogItemId,
    format: &mut Option<FormatSpecifier<Aug>>,
) -> Result<(), PlanError> {
    let from = catalog.get_item(&from_id);
    let object_ids = from
        .references()
        .items()
        .copied()
        .chain_one(from.id())
        .collect::<Vec<_>>();

    let mut avro_format_options = vec![];
    for_each_format(format, |doc_on_schema, fmt| match fmt {
        Format::Avro(AvroSchema::InlineSchema { .. })
        | Format::Bytes
        | Format::Csv { .. }
        | Format::Json { .. }
        | Format::Protobuf(..)
        | Format::Regex(..)
        | Format::Text => (),
        Format::Avro(AvroSchema::Csr {
            csr_connection: CsrConnectionAvro { connection, .. },
        }) => {
            avro_format_options.push((doc_on_schema, &mut connection.options));
        }
    });

    for (for_schema, options) in avro_format_options {
        let user_provided_comments = options
            .iter()
            .filter_map(|CsrConfigOption { name, .. }| match name {
                CsrConfigOptionName::AvroDocOn(doc_on) => Some(doc_on.clone()),
                _ => None,
            })
            .collect::<BTreeSet<_>>();

        for object_id in &object_ids {
            let item = catalog
                .get_item(object_id)
                .at_version(RelationVersionSelector::Latest);
            let full_resolved_name = ResolvedItemName::Item {
                id: *object_id,
                qualifiers: item.name().qualifiers.clone(),
                full_name: catalog.resolve_full_name(item.name()),
                print_id: !matches!(
                    item.item_type(),
                    CatalogItemType::Func | CatalogItemType::Type
                ),
                version: RelationVersionSelector::Latest,
            };

            if let Some(comments_map) = catalog.get_item_comments(object_id) {
                let doc_on_item_key = AvroDocOn {
                    identifier: DocOnIdentifier::Type(full_resolved_name.clone()),
                    for_schema,
                };
                if !user_provided_comments.contains(&doc_on_item_key) {
                    if let Some(root_comment) = comments_map.get(&None) {
                        options.push(CsrConfigOption {
                            name: CsrConfigOptionName::AvroDocOn(doc_on_item_key),
                            value: Some(mz_sql_parser::ast::WithOptionValue::Value(Value::String(
                                root_comment.clone(),
                            ))),
                        });
                    }
                }

                let column_descs = match item.type_details() {
                    Some(details) => details.typ.desc(catalog).unwrap_or_default(),
                    None => item.relation_desc().map(|d| d.into_owned()),
                };

                if let Some(desc) = column_descs {
                    for (pos, column_name) in desc.iter_names().enumerate() {
                        if let Some(comment_str) = comments_map.get(&Some(pos + 1)) {
                            let doc_on_column_key = AvroDocOn {
                                identifier: DocOnIdentifier::Column(ColumnName {
                                    relation: full_resolved_name.clone(),
                                    column: ResolvedColumnReference::Column {
                                        name: column_name.to_owned(),
                                        index: pos,
                                    },
                                }),
                                for_schema,
                            };
                            if !user_provided_comments.contains(&doc_on_column_key) {
                                options.push(CsrConfigOption {
                                    name: CsrConfigOptionName::AvroDocOn(doc_on_column_key),
                                    value: Some(mz_sql_parser::ast::WithOptionValue::Value(
                                        Value::String(comment_str.clone()),
                                    )),
                                });
                            }
                        }
                    }
                }
            }
        }
    }

    Ok(())
}

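/// Purifies a `CREATE SINK` statement: checks that only supported `WITH`
/// options are used, verifies connectivity for the Kafka or Iceberg connection
/// and any Confluent Schema Registry connections referenced by the formats,
/// and injects Avro `DOC ON` options from catalog comments.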
async fn purify_create_sink(
    catalog: impl SessionCatalog,
    mut create_sink_stmt: CreateSinkStatement<Aug>,
    storage_configuration: &StorageConfiguration,
) -> Result<PurifiedStatement, PlanError> {
    let CreateSinkStatement {
        connection,
        format,
        with_options,
        name: _,
        in_cluster: _,
        if_not_exists: _,
        from,
        envelope: _,
        mode: _,
    } = &mut create_sink_stmt;

    const USER_ALLOWED_WITH_OPTIONS: &[CreateSinkOptionName] = &[
        CreateSinkOptionName::Snapshot,
        CreateSinkOptionName::CommitInterval,
    ];

    if let Some(op) = with_options
        .iter()
        .find(|op| !USER_ALLOWED_WITH_OPTIONS.contains(&op.name))
    {
        sql_bail!(
            "CREATE SINK...WITH ({}..) is not allowed",
            op.name.to_ast_string_simple(),
        )
    }

    match &connection {
        CreateSinkConnection::Kafka {
            connection,
            options,
            key: _,
            headers: _,
        } => {
            let scx = StatementContext::new(None, &catalog);
            let connection = {
                let item = scx.get_item_by_resolved_name(connection)?;
                match item.connection()? {
                    Connection::Kafka(connection) => {
                        connection.clone().into_inline_connection(scx.catalog)
                    }
                    _ => sql_bail!(
                        "{} is not a kafka connection",
                        scx.catalog.resolve_full_name(item.name())
                    ),
                }
            };

            let extracted_options: KafkaSinkConfigOptionExtracted = options.clone().try_into()?;

            if extracted_options.legacy_ids == Some(true) {
                sql_bail!("LEGACY IDs option is not supported");
            }

            let client: AdminClient<_> = connection
                .create_with_context(
                    storage_configuration,
                    MzClientContext::default(),
                    &BTreeMap::new(),
                    InTask::No,
                )
                .await
                .map_err(|e| KafkaSinkPurificationError::AdminClientError(Arc::new(e)))?;

            let metadata = client
                .inner()
                .fetch_metadata(
                    None,
                    storage_configuration
                        .parameters
                        .kafka_timeout_config
                        .fetch_metadata_timeout,
                )
                .map_err(|e| {
                    KafkaSinkPurificationError::AdminClientError(Arc::new(
                        ContextCreationError::KafkaError(e),
                    ))
                })?;

            if metadata.brokers().is_empty() {
                Err(KafkaSinkPurificationError::ZeroBrokers)?;
            }
        }
        CreateSinkConnection::Iceberg {
            connection,
            aws_connection,
            ..
        } => {
            let scx = StatementContext::new(None, &catalog);
            let connection = {
                let item = scx.get_item_by_resolved_name(connection)?;
                match item.connection()? {
                    Connection::IcebergCatalog(connection) => {
                        connection.clone().into_inline_connection(scx.catalog)
                    }
                    _ => sql_bail!(
                        "{} is not an iceberg connection",
                        scx.catalog.resolve_full_name(item.name())
                    ),
                }
            };

            let aws_conn_id = aws_connection.item_id();

            let aws_connection = {
                let item = scx.get_item_by_resolved_name(aws_connection)?;
                match item.connection()? {
                    Connection::Aws(aws_connection) => aws_connection.clone(),
                    _ => sql_bail!(
                        "{} is not an aws connection",
                        scx.catalog.resolve_full_name(item.name())
                    ),
                }
            };

            if let Some(s3tables) = connection.s3tables_catalog() {
                let enable_region_check =
                    ENABLE_S3_TABLES_REGION_CHECK.get(scx.catalog.system_vars().dyncfgs());
                if enable_region_check {
                    let env_id = &catalog.config().environment_id;
                    if matches!(env_id.cloud_provider(), CloudProvider::Aws) {
                        let env_region = env_id.cloud_provider_region();
                        let s3_tables_region = s3tables
                            .aws_connection
                            .connection
                            .region
                            .clone()
                            .unwrap_or_else(|| "us-east-1".to_string());
                        if s3_tables_region != env_region {
                            Err(IcebergSinkPurificationError::S3TablesRegionMismatch {
                                s3_tables_region,
                                environment_region: env_region.to_string(),
                            })?;
                        }
                    }
                }
            }

            let _catalog = connection
                .connect(storage_configuration, InTask::No)
                .await
                .map_err(|e| IcebergSinkPurificationError::CatalogError(Arc::new(e)))?;

            let _sdk_config = aws_connection
                .load_sdk_config(
                    &storage_configuration.connection_context,
                    aws_conn_id.clone(),
                    InTask::No,
                    mz_storage_types::dyncfgs::ENFORCE_EXTERNAL_ADDRESSES
                        .get(storage_configuration.config_set()),
                )
                .await
                .map_err(|e| IcebergSinkPurificationError::AwsSdkContextError(Arc::new(e)))?;
        }
    }

    let mut csr_connection_ids = BTreeSet::new();
    for_each_format(format, |_, fmt| match fmt {
        Format::Avro(AvroSchema::InlineSchema { .. })
        | Format::Bytes
        | Format::Csv { .. }
        | Format::Json { .. }
        | Format::Protobuf(ProtobufSchema::InlineSchema { .. })
        | Format::Regex(..)
        | Format::Text => (),
        Format::Avro(AvroSchema::Csr {
            csr_connection: CsrConnectionAvro { connection, .. },
        })
        | Format::Protobuf(ProtobufSchema::Csr {
            csr_connection: CsrConnectionProtobuf { connection, .. },
        }) => {
            csr_connection_ids.insert(*connection.connection.item_id());
        }
    });

    let scx = StatementContext::new(None, &catalog);
    for csr_connection_id in csr_connection_ids {
        let connection = {
            let item = scx.get_item(&csr_connection_id);
            match item.connection()? {
                Connection::Csr(connection) => connection.clone().into_inline_connection(&catalog),
                _ => Err(CsrPurificationError::NotCsrConnection(
                    scx.catalog.resolve_full_name(item.name()),
                ))?,
            }
        };

        let client = connection
            .connect(storage_configuration, InTask::No)
            .await
            .map_err(|e| CsrPurificationError::ClientError(Arc::new(e)))?;

        client
            .list_subjects()
            .await
            .map_err(|e| CsrPurificationError::ListSubjectsError(Arc::new(e)))?;
    }

    purify_create_sink_avro_doc_on_options(&catalog, *from.item_id(), format)?;

    Ok(PurifiedStatement::PurifiedCreateSink(create_sink_stmt))
}

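/// Invokes `f` on every `Format` present in `format`, passing along whether
/// that format applies to the key, the value, or the whole record.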
fn for_each_format<'a, F>(format: &'a mut Option<FormatSpecifier<Aug>>, mut f: F)
where
    F: FnMut(DocOnSchema, &'a mut Format<Aug>),
{
    match format {
        None => (),
        Some(FormatSpecifier::Bare(fmt)) => f(DocOnSchema::All, fmt),
        Some(FormatSpecifier::KeyValue { key, value }) => {
            f(DocOnSchema::KeyOnly, key);
            f(DocOnSchema::ValueOnly, value);
        }
    }
}

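/// How purification treats external references (subsources) requested
/// directly on a source: rejected (`NotAllowed`), accepted when present
/// (`Optional`), or expected (`Required`), depending on the source-versus-table
/// feature flags.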
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub(crate) enum SourceReferencePolicy {
    NotAllowed,
    Optional,
    Required,
}

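/// Purifies a `CREATE SOURCE` statement: validates connectivity to the
/// upstream system, resolves the available and requested source references,
/// rewrites connector-specific options (such as `DETAILS` and `TEXT COLUMNS`),
/// and, when the old subsource syntax is used, generates the progress
/// subsource statement.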
async fn purify_create_source(
    catalog: impl SessionCatalog,
    now: u64,
    mut create_source_stmt: CreateSourceStatement<Aug>,
    storage_configuration: &StorageConfiguration,
) -> Result<PurifiedStatement, PlanError> {
    let CreateSourceStatement {
        name: source_name,
        col_names,
        key_constraint,
        connection: source_connection,
        format,
        envelope,
        include_metadata,
        external_references,
        progress_subsource,
        with_options,
        ..
    } = &mut create_source_stmt;

    let uses_old_syntax = !col_names.is_empty()
        || key_constraint.is_some()
        || format.is_some()
        || envelope.is_some()
        || !include_metadata.is_empty()
        || external_references.is_some()
        || progress_subsource.is_some();

    if let Some(DeferredItemName::Named(_)) = progress_subsource {
        sql_bail!("Cannot manually ID qualify progress subsource")
    }

    let mut requested_subsource_map = BTreeMap::new();

    let progress_desc = match &source_connection {
        CreateSourceConnection::Kafka { .. } => {
            &mz_storage_types::sources::kafka::KAFKA_PROGRESS_DESC
        }
        CreateSourceConnection::Postgres { .. } => {
            &mz_storage_types::sources::postgres::PG_PROGRESS_DESC
        }
        CreateSourceConnection::SqlServer { .. } => {
            &mz_storage_types::sources::sql_server::SQL_SERVER_PROGRESS_DESC
        }
        CreateSourceConnection::MySql { .. } => {
            &mz_storage_types::sources::mysql::MYSQL_PROGRESS_DESC
        }
        CreateSourceConnection::LoadGenerator { .. } => {
            &mz_storage_types::sources::load_generator::LOAD_GEN_PROGRESS_DESC
        }
    };
    let scx = StatementContext::new(None, &catalog);

    let reference_policy = if scx.catalog.system_vars().enable_create_table_from_source()
        && scx.catalog.system_vars().force_source_table_syntax()
    {
        SourceReferencePolicy::NotAllowed
    } else if scx.catalog.system_vars().enable_create_table_from_source() {
        SourceReferencePolicy::Optional
    } else {
        SourceReferencePolicy::Required
    };

    let mut format_options = SourceFormatOptions::Default;

    let retrieved_source_references: RetrievedSourceReferences;

    match source_connection {
        CreateSourceConnection::Kafka {
            connection,
            options: base_with_options,
            ..
        } => {
            if let Some(external_references) = external_references {
                Err(KafkaSourcePurificationError::ReferencedSubsources(
                    external_references.clone(),
                ))?;
            }

            let connection = {
                let item = scx.get_item_by_resolved_name(connection)?;
                match item.connection()? {
                    Connection::Kafka(connection) => {
                        connection.clone().into_inline_connection(&catalog)
                    }
                    _ => Err(KafkaSourcePurificationError::NotKafkaConnection(
                        scx.catalog.resolve_full_name(item.name()),
                    ))?,
                }
            };

            let extracted_options: KafkaSourceConfigOptionExtracted =
                base_with_options.clone().try_into()?;

            let topic = extracted_options
                .topic
                .ok_or(KafkaSourcePurificationError::ConnectionMissingTopic)?;

            let consumer = connection
                .create_with_context(
                    storage_configuration,
                    MzClientContext::default(),
                    &BTreeMap::new(),
                    InTask::No,
                )
                .await
                .map_err(|e| {
                    KafkaSourcePurificationError::KafkaConsumerError(
                        e.display_with_causes().to_string(),
                    )
                })?;
            let consumer = Arc::new(consumer);

            match (
                extracted_options.start_offset,
                extracted_options.start_timestamp,
            ) {
                (None, None) => {
                    kafka_util::ensure_topic_exists(
                        Arc::clone(&consumer),
                        &topic,
                        storage_configuration
                            .parameters
                            .kafka_timeout_config
                            .fetch_metadata_timeout,
                    )
                    .await?;
                }
                (Some(_), Some(_)) => {
                    sql_bail!("cannot specify START TIMESTAMP and START OFFSET at same time")
                }
                (Some(start_offsets), None) => {
                    kafka_util::validate_start_offsets(
                        Arc::clone(&consumer),
                        &topic,
                        start_offsets,
                        storage_configuration
                            .parameters
                            .kafka_timeout_config
                            .fetch_metadata_timeout,
                    )
                    .await?;
                }
                (None, Some(time_offset)) => {
                    let start_offsets = kafka_util::lookup_start_offsets(
                        Arc::clone(&consumer),
                        &topic,
                        time_offset,
                        now,
                        storage_configuration
                            .parameters
                            .kafka_timeout_config
                            .fetch_metadata_timeout,
                    )
                    .await?;

                    base_with_options.retain(|val| {
                        !matches!(val.name, KafkaSourceConfigOptionName::StartTimestamp)
                    });
                    base_with_options.push(KafkaSourceConfigOption {
                        name: KafkaSourceConfigOptionName::StartOffset,
                        value: Some(WithOptionValue::Sequence(
                            start_offsets
                                .iter()
                                .map(|offset| {
                                    WithOptionValue::Value(Value::Number(offset.to_string()))
                                })
                                .collect(),
                        )),
                    });
                }
            }

            let reference_client = SourceReferenceClient::Kafka { topic: &topic };
            retrieved_source_references = reference_client.get_source_references().await?;

            format_options = SourceFormatOptions::Kafka { topic };
        }
        CreateSourceConnection::Postgres {
            connection,
            options,
        } => {
            let connection_item = scx.get_item_by_resolved_name(connection)?;
            let connection = match connection_item.connection().map_err(PlanError::from)? {
                Connection::Postgres(connection) => {
                    connection.clone().into_inline_connection(&catalog)
                }
                _ => Err(PgSourcePurificationError::NotPgConnection(
                    scx.catalog.resolve_full_name(connection_item.name()),
                ))?,
            };
            let crate::plan::statement::PgConfigOptionExtracted {
                publication,
                text_columns,
                exclude_columns,
                details,
                ..
            } = options.clone().try_into()?;
            let publication =
                publication.ok_or(PgSourcePurificationError::ConnectionMissingPublication)?;

            if details.is_some() {
                Err(PgSourcePurificationError::UserSpecifiedDetails)?;
            }

            let client = connection
                .validate(connection_item.id(), storage_configuration)
                .await?;

            let reference_client = SourceReferenceClient::Postgres {
                client: &client,
                publication: &publication,
                database: &connection.database,
            };
            retrieved_source_references = reference_client.get_source_references().await?;

            let postgres::PurifiedSourceExports {
                source_exports: subsources,
                normalized_text_columns,
            } = postgres::purify_source_exports(
                &client,
                &retrieved_source_references,
                external_references,
                text_columns,
                exclude_columns,
                source_name,
                &reference_policy,
            )
            .await?;

            if let Some(text_cols_option) = options
                .iter_mut()
                .find(|option| option.name == PgConfigOptionName::TextColumns)
            {
                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
            }

            requested_subsource_map.extend(subsources);

            let timeline_id = mz_postgres_util::get_timeline_id(&client).await?;

            options.retain(|PgConfigOption { name, .. }| name != &PgConfigOptionName::Details);
            let details = PostgresSourcePublicationDetails {
                slot: format!(
                    "materialize_{}",
                    Uuid::new_v4().to_string().replace('-', "")
                ),
                timeline_id: Some(timeline_id),
                database: connection.database,
            };
            options.push(PgConfigOption {
                name: PgConfigOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    details.into_proto().encode_to_vec(),
                )))),
            })
        }
        CreateSourceConnection::SqlServer {
            connection,
            options,
        } => {
            scx.require_feature_flag(&ENABLE_SQL_SERVER_SOURCE)?;

            let connection_item = scx.get_item_by_resolved_name(connection)?;
            let connection = match connection_item.connection()? {
                Connection::SqlServer(connection) => {
                    connection.clone().into_inline_connection(&catalog)
                }
                _ => Err(SqlServerSourcePurificationError::NotSqlServerConnection(
                    scx.catalog.resolve_full_name(connection_item.name()),
                ))?,
            };
            let crate::plan::statement::ddl::SqlServerConfigOptionExtracted {
                details,
                text_columns,
                exclude_columns,
                seen: _,
            } = options.clone().try_into()?;

            if details.is_some() {
                Err(SqlServerSourcePurificationError::UserSpecifiedDetails)?;
            }

            let mut client = connection
                .validate(connection_item.id(), storage_configuration)
                .await?;

            let database: Arc<str> = connection.database.into();
            let reference_client = SourceReferenceClient::SqlServer {
                client: &mut client,
                database: Arc::clone(&database),
            };
            retrieved_source_references = reference_client.get_source_references().await?;
            tracing::debug!(?retrieved_source_references, "got source references");

            let timeout = mz_storage_types::sources::sql_server::MAX_LSN_WAIT
                .get(storage_configuration.config_set());

            let purified_source_exports = sql_server::purify_source_exports(
                &*database,
                &mut client,
                &retrieved_source_references,
                external_references,
                &text_columns,
                &exclude_columns,
                source_name,
                timeout,
                &reference_policy,
            )
            .await?;

            let sql_server::PurifiedSourceExports {
                source_exports,
                normalized_text_columns,
                normalized_excl_columns,
            } = purified_source_exports;

            requested_subsource_map.extend(source_exports);

            let restore_history_id =
                mz_sql_server_util::inspect::get_latest_restore_history_id(&mut client).await?;
            let details = SqlServerSourceExtras { restore_history_id };

            options.retain(|SqlServerConfigOption { name, .. }| {
                name != &SqlServerConfigOptionName::Details
            });
            options.push(SqlServerConfigOption {
                name: SqlServerConfigOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    details.into_proto().encode_to_vec(),
                )))),
            });

            if let Some(text_cols_option) = options
                .iter_mut()
                .find(|option| option.name == SqlServerConfigOptionName::TextColumns)
            {
                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
            }
            if let Some(excl_cols_option) = options
                .iter_mut()
                .find(|option| option.name == SqlServerConfigOptionName::ExcludeColumns)
            {
                excl_cols_option.value = Some(WithOptionValue::Sequence(normalized_excl_columns));
            }
        }
        CreateSourceConnection::MySql {
            connection,
            options,
        } => {
            let connection_item = scx.get_item_by_resolved_name(connection)?;
            let connection = match connection_item.connection()? {
                Connection::MySql(connection) => {
                    connection.clone().into_inline_connection(&catalog)
                }
                _ => Err(MySqlSourcePurificationError::NotMySqlConnection(
                    scx.catalog.resolve_full_name(connection_item.name()),
                ))?,
            };
            let crate::plan::statement::ddl::MySqlConfigOptionExtracted {
                details,
                text_columns,
                exclude_columns,
                seen: _,
            } = options.clone().try_into()?;

            if details.is_some() {
                Err(MySqlSourcePurificationError::UserSpecifiedDetails)?;
            }

            let mut conn = connection
                .validate(connection_item.id(), storage_configuration)
                .await
                .map_err(MySqlSourcePurificationError::InvalidConnection)?;

            let initial_gtid_set =
                mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?;

            let binlog_full_metadata = is_binlog_full_metadata(&mut conn).await?;

            let reference_client = SourceReferenceClient::MySql {
                conn: &mut conn,
                include_system_schemas: mysql::references_system_schemas(external_references),
            };
            retrieved_source_references = reference_client.get_source_references().await?;

            let mysql::PurifiedSourceExports {
                source_exports: subsources,
                normalized_text_columns,
                normalized_exclude_columns,
            } = mysql::purify_source_exports(
                &mut conn,
                &retrieved_source_references,
                external_references,
                text_columns,
                exclude_columns,
                source_name,
                initial_gtid_set.clone(),
                &reference_policy,
                binlog_full_metadata,
            )
            .await?;
            requested_subsource_map.extend(subsources);

            let details = MySqlSourceDetails {};
            options
                .retain(|MySqlConfigOption { name, .. }| name != &MySqlConfigOptionName::Details);
            options.push(MySqlConfigOption {
                name: MySqlConfigOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    details.into_proto().encode_to_vec(),
                )))),
            });

            if let Some(text_cols_option) = options
                .iter_mut()
                .find(|option| option.name == MySqlConfigOptionName::TextColumns)
            {
                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
            }
            if let Some(exclude_cols_option) = options
                .iter_mut()
                .find(|option| option.name == MySqlConfigOptionName::ExcludeColumns)
            {
                exclude_cols_option.value =
                    Some(WithOptionValue::Sequence(normalized_exclude_columns));
            }
        }
        CreateSourceConnection::LoadGenerator { generator, options } => {
            let load_generator =
                load_generator_ast_to_generator(&scx, generator, options, include_metadata)?;

            let reference_client = SourceReferenceClient::LoadGenerator {
                generator: &load_generator,
            };
            retrieved_source_references = reference_client.get_source_references().await?;
            let multi_output_sources =
                retrieved_source_references
                    .all_references()
                    .iter()
                    .any(|r| {
                        matches!(
                            r.load_generator_output(),
                            Some(output) if output != &LoadGeneratorOutput::Default
                        )
                    });

            match external_references {
                Some(requested)
                    if matches!(reference_policy, SourceReferencePolicy::NotAllowed) =>
                {
                    Err(PlanError::UseTablesForSources(requested.to_string()))?
                }
                Some(requested) if !multi_output_sources => match requested {
                    ExternalReferences::SubsetTables(_) => {
                        Err(LoadGeneratorSourcePurificationError::ForTables)?
                    }
                    ExternalReferences::SubsetSchemas(_) => {
                        Err(LoadGeneratorSourcePurificationError::ForSchemas)?
                    }
                    ExternalReferences::All => {
                        Err(LoadGeneratorSourcePurificationError::ForAllTables)?
                    }
                },
                Some(requested) => {
                    let requested_exports = retrieved_source_references
                        .requested_source_exports(Some(requested), source_name)?;
                    for export in requested_exports {
                        requested_subsource_map.insert(
                            export.name,
                            PurifiedSourceExport {
                                external_reference: export.external_reference,
                                details: PurifiedExportDetails::LoadGenerator {
                                    table: export
                                        .meta
                                        .load_generator_desc()
                                        .ok_or_else(|| {
                                            internal_err!(
                                                "expected load generator source reference"
                                            )
                                        })?
                                        .clone(),
                                    output: export
                                        .meta
                                        .load_generator_output()
                                        .ok_or_else(|| {
                                            internal_err!(
                                                "expected load generator source reference"
                                            )
                                        })?
                                        .clone(),
                                },
                            },
                        );
                    }
                }
                None => {
                    if multi_output_sources
                        && matches!(reference_policy, SourceReferencePolicy::Required)
                    {
                        Err(LoadGeneratorSourcePurificationError::MultiOutputRequiresForAllTables)?
                    }
                }
            }

            if let LoadGenerator::Clock = generator {
                if !options
                    .iter()
                    .any(|p| p.name == LoadGeneratorOptionName::AsOf)
                {
                    let now = catalog.now();
                    options.push(LoadGeneratorOption {
                        name: LoadGeneratorOptionName::AsOf,
                        value: Some(WithOptionValue::Value(Value::Number(now.to_string()))),
                    });
                }
            }
        }
    }

    *external_references = None;

    let create_progress_subsource_stmt = if uses_old_syntax {
        let name = match progress_subsource {
            Some(name) => match name {
                DeferredItemName::Deferred(name) => name.clone(),
                DeferredItemName::Named(_) => {
                    sql_bail!("progress subsource name cannot be a resolved name")
                }
            },
            None => {
                let (item, prefix) = source_name
                    .0
                    .split_last()
                    .ok_or_else(|| sql_err!("source name must have at least one component"))?;
                let item_name =
                    Ident::try_generate_name(item.to_string(), "_progress", |candidate| {
                        let mut suggested_name = prefix.to_vec();
                        suggested_name.push(candidate.clone());

                        let partial =
                            normalize::unresolved_item_name(UnresolvedItemName(suggested_name))?;
                        let qualified = scx.allocate_qualified_name(partial)?;
                        let item_exists = scx.catalog.get_item_by_name(&qualified).is_some();
                        let type_exists = scx.catalog.get_type_by_name(&qualified).is_some();
                        Ok::<_, PlanError>(!item_exists && !type_exists)
                    })?;

                let mut full_name = prefix.to_vec();
                full_name.push(item_name);
                let full_name = normalize::unresolved_item_name(UnresolvedItemName(full_name))?;
                let qualified_name = scx.allocate_qualified_name(full_name)?;
                let full_name = scx.catalog.resolve_full_name(&qualified_name);

                UnresolvedItemName::from(full_name.clone())
            }
        };

        let (columns, constraints) = scx.relation_desc_into_table_defs(progress_desc)?;

        let mut progress_with_options: Vec<_> = with_options
            .iter()
            .filter_map(|opt| match opt.name {
                CreateSourceOptionName::TimestampInterval => None,
                CreateSourceOptionName::RetainHistory => Some(CreateSubsourceOption {
                    name: CreateSubsourceOptionName::RetainHistory,
                    value: opt.value.clone(),
                }),
            })
            .collect();
        progress_with_options.push(CreateSubsourceOption {
            name: CreateSubsourceOptionName::Progress,
            value: Some(WithOptionValue::Value(Value::Boolean(true))),
        });

        Some(CreateSubsourceStatement {
            name,
            columns,
            of_source: None,
            constraints,
            if_not_exists: false,
            with_options: progress_with_options,
        })
    } else {
        None
    };

    purify_source_format(
        &catalog,
        format,
        &format_options,
        envelope,
        storage_configuration,
    )
    .await?;

    Ok(PurifiedStatement::PurifiedCreateSource {
        create_progress_subsource_stmt,
        create_source_stmt,
        subsources: requested_subsource_map,
        available_source_references: retrieved_source_references.available_source_references(),
    })
}

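/// Purifies an `ALTER SOURCE` statement. Only `ADD SUBSOURCES` and `REFRESH
/// REFERENCES` require contacting the upstream system; all other actions are
/// returned unchanged.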
async fn purify_alter_source(
    catalog: impl SessionCatalog,
    stmt: AlterSourceStatement<Aug>,
    storage_configuration: &StorageConfiguration,
) -> Result<PurifiedStatement, PlanError> {
    let scx = StatementContext::new(None, &catalog);
    let AlterSourceStatement {
        source_name: unresolved_source_name,
        action,
        if_exists,
    } = stmt;

    let item = match scx.resolve_item(RawItemName::Name(unresolved_source_name.clone())) {
        Ok(item) => item,
        Err(_) if if_exists => {
            return Ok(PurifiedStatement::PurifiedAlterSource {
                alter_source_stmt: AlterSourceStatement {
                    source_name: unresolved_source_name,
                    action,
                    if_exists,
                },
            });
        }
        Err(e) => return Err(e),
    };

    let desc = match item.source_desc()? {
        Some(desc) => desc.clone().into_inline_connection(scx.catalog),
        None => {
            sql_bail!("cannot ALTER this type of source")
        }
    };

    let source_name = item.name();

    let resolved_source_name = ResolvedItemName::Item {
        id: item.id(),
        qualifiers: item.name().qualifiers.clone(),
        full_name: scx.catalog.resolve_full_name(source_name),
        print_id: true,
        version: RelationVersionSelector::Latest,
    };

    let partial_name = scx.catalog.minimal_qualification(source_name);

    match action {
        AlterSourceAction::AddSubsources {
            external_references,
            options,
        } => {
            if scx.catalog.system_vars().enable_create_table_from_source()
                && scx.catalog.system_vars().force_source_table_syntax()
            {
                Err(PlanError::UseTablesForSources(
                    "ALTER SOURCE .. ADD SUBSOURCES ..".to_string(),
                ))?;
            }

            purify_alter_source_add_subsources(
                external_references,
                options,
                desc,
                partial_name,
                unresolved_source_name,
                resolved_source_name,
                storage_configuration,
            )
            .await
        }
        AlterSourceAction::RefreshReferences => {
            purify_alter_source_refresh_references(
                desc,
                resolved_source_name,
                storage_configuration,
            )
            .await
        }
        _ => Ok(PurifiedStatement::PurifiedAlterSource {
            alter_source_stmt: AlterSourceStatement {
                source_name: unresolved_source_name,
                action,
                if_exists,
            },
        }),
    }
}

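/// Purifies `ALTER SOURCE .. ADD SUBSOURCE` for Postgres, MySQL, and SQL
/// Server sources: connects upstream, purifies the requested external
/// references into source exports, and normalizes the `TEXT COLUMNS` and
/// `EXCLUDE COLUMNS` options.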
async fn purify_alter_source_add_subsources(
    external_references: Vec<ExternalReferenceExport>,
    mut options: Vec<AlterSourceAddSubsourceOption<Aug>>,
    desc: SourceDesc,
    partial_source_name: PartialItemName,
    unresolved_source_name: UnresolvedItemName,
    resolved_source_name: ResolvedItemName,
    storage_configuration: &StorageConfiguration,
) -> Result<PurifiedStatement, PlanError> {
    let connection_id = match &desc.connection {
        GenericSourceConnection::Postgres(c) => c.connection_id,
        GenericSourceConnection::MySql(c) => c.connection_id,
        GenericSourceConnection::SqlServer(c) => c.connection_id,
        _ => sql_bail!(
            "source {} does not support ALTER SOURCE.",
            partial_source_name
        ),
    };

    let crate::plan::statement::ddl::AlterSourceAddSubsourceOptionExtracted {
        text_columns,
        exclude_columns,
        details,
        seen: _,
    } = options.clone().try_into()?;
    if details.is_some() {
        sql_bail!("DETAILS option cannot be explicitly set");
    }

    let mut requested_subsource_map = BTreeMap::new();

    match desc.connection {
        GenericSourceConnection::Postgres(pg_source_connection) => {
            let pg_connection = &pg_source_connection.connection;

            let client = pg_connection
                .validate(connection_id, storage_configuration)
                .await?;

            let reference_client = SourceReferenceClient::Postgres {
                client: &client,
                publication: &pg_source_connection.publication,
                database: &pg_connection.database,
            };
            let retrieved_source_references = reference_client.get_source_references().await?;

            let postgres::PurifiedSourceExports {
                source_exports: subsources,
                normalized_text_columns,
            } = postgres::purify_source_exports(
                &client,
                &retrieved_source_references,
                &Some(ExternalReferences::SubsetTables(external_references)),
                text_columns,
                exclude_columns,
                &unresolved_source_name,
                &SourceReferencePolicy::Required,
            )
            .await?;

            if let Some(text_cols_option) = options
                .iter_mut()
                .find(|option| option.name == AlterSourceAddSubsourceOptionName::TextColumns)
            {
                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
            }

            requested_subsource_map.extend(subsources);
        }
        GenericSourceConnection::MySql(mysql_source_connection) => {
            let mysql_connection = &mysql_source_connection.connection;
            let config = mysql_connection
                .config(
                    &storage_configuration.connection_context.secrets_reader,
                    storage_configuration,
                    InTask::No,
                )
                .await?;

            let mut conn = config
                .connect(
                    "mysql purification",
                    &storage_configuration.connection_context.ssh_tunnel_manager,
                )
                .await?;

            let initial_gtid_set =
                mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?;

            let binlog_full_metadata = is_binlog_full_metadata(&mut conn).await?;

            let requested_references = Some(ExternalReferences::SubsetTables(external_references));

            let reference_client = SourceReferenceClient::MySql {
                conn: &mut conn,
                include_system_schemas: mysql::references_system_schemas(&requested_references),
            };
            let retrieved_source_references = reference_client.get_source_references().await?;

            let mysql::PurifiedSourceExports {
                source_exports: subsources,
                normalized_text_columns,
                normalized_exclude_columns,
            } = mysql::purify_source_exports(
                &mut conn,
                &retrieved_source_references,
                &requested_references,
                text_columns,
                exclude_columns,
                &unresolved_source_name,
                initial_gtid_set,
                &SourceReferencePolicy::Required,
                binlog_full_metadata,
            )
            .await?;
            requested_subsource_map.extend(subsources);

            if let Some(text_cols_option) = options
                .iter_mut()
                .find(|option| option.name == AlterSourceAddSubsourceOptionName::TextColumns)
            {
                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
            }
            if let Some(exclude_cols_option) = options
                .iter_mut()
                .find(|option| option.name == AlterSourceAddSubsourceOptionName::ExcludeColumns)
            {
                exclude_cols_option.value =
                    Some(WithOptionValue::Sequence(normalized_exclude_columns));
            }
        }
        GenericSourceConnection::SqlServer(sql_server_source) => {
            let sql_server_connection = &sql_server_source.connection;
            let config = sql_server_connection
                .resolve_config(
                    &storage_configuration.connection_context.secrets_reader,
                    storage_configuration,
                    InTask::No,
                )
                .await?;
            let mut client = mz_sql_server_util::Client::connect(config).await?;

            let database = sql_server_connection.database.clone().into();
            let source_references = SourceReferenceClient::SqlServer {
                client: &mut client,
                database: Arc::clone(&database),
            }
            .get_source_references()
            .await?;
            let requested_references = Some(ExternalReferences::SubsetTables(external_references));

            let timeout = mz_storage_types::sources::sql_server::MAX_LSN_WAIT
                .get(storage_configuration.config_set());

            let result = sql_server::purify_source_exports(
                &*database,
                &mut client,
                &source_references,
                &requested_references,
                &text_columns,
                &exclude_columns,
                &unresolved_source_name,
                timeout,
                &SourceReferencePolicy::Required,
            )
            .await;
            let sql_server::PurifiedSourceExports {
                source_exports,
                normalized_text_columns,
                normalized_excl_columns,
            } = result?;

            requested_subsource_map.extend(source_exports);

            if let Some(text_cols_option) = options
                .iter_mut()
                .find(|option| option.name == AlterSourceAddSubsourceOptionName::TextColumns)
            {
                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
            }
            if let Some(exclude_cols_option) = options
                .iter_mut()
                .find(|option| option.name == AlterSourceAddSubsourceOptionName::ExcludeColumns)
            {
                exclude_cols_option.value =
                    Some(WithOptionValue::Sequence(normalized_excl_columns));
            }
        }
        _ => bail_internal!("source does not support ALTER SOURCE...ADD SUBSOURCE"),
    };

    Ok(PurifiedStatement::PurifiedAlterSourceAddSubsources {
        source_name: resolved_source_name,
        options,
        subsources: requested_subsource_map,
    })
}

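/// Purifies `ALTER SOURCE .. REFRESH REFERENCES` by re-fetching the set of
/// available upstream references for the source's connection.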
async fn purify_alter_source_refresh_references(
    desc: SourceDesc,
    resolved_source_name: ResolvedItemName,
    storage_configuration: &StorageConfiguration,
) -> Result<PurifiedStatement, PlanError> {
    let retrieved_source_references = match desc.connection {
        GenericSourceConnection::Postgres(pg_source_connection) => {
            let pg_connection = &pg_source_connection.connection;

            let config = pg_connection
                .config(
                    &storage_configuration.connection_context.secrets_reader,
                    storage_configuration,
                    InTask::No,
                )
                .await?;

            let client = config
                .connect(
                    "postgres_purification",
                    &storage_configuration.connection_context.ssh_tunnel_manager,
                )
                .await?;
            let reference_client = SourceReferenceClient::Postgres {
                client: &client,
                publication: &pg_source_connection.publication,
                database: &pg_connection.database,
            };
            reference_client.get_source_references().await?
        }
        GenericSourceConnection::MySql(mysql_source_connection) => {
            let mysql_connection = &mysql_source_connection.connection;
            let config = mysql_connection
                .config(
                    &storage_configuration.connection_context.secrets_reader,
                    storage_configuration,
                    InTask::No,
                )
                .await?;

            let mut conn = config
                .connect(
                    "mysql purification",
                    &storage_configuration.connection_context.ssh_tunnel_manager,
                )
                .await?;

            let reference_client = SourceReferenceClient::MySql {
                conn: &mut conn,
                include_system_schemas: false,
            };
            reference_client.get_source_references().await?
        }
        GenericSourceConnection::SqlServer(sql_server_source) => {
            let sql_server_connection = &sql_server_source.connection;
            let config = sql_server_connection
                .resolve_config(
                    &storage_configuration.connection_context.secrets_reader,
                    storage_configuration,
                    InTask::No,
                )
                .await?;
            let mut client = mz_sql_server_util::Client::connect(config).await?;

            let source_references = SourceReferenceClient::SqlServer {
                client: &mut client,
                database: sql_server_connection.database.clone().into(),
            }
            .get_source_references()
            .await?;
            source_references
        }
        GenericSourceConnection::LoadGenerator(load_gen_connection) => {
            let reference_client = SourceReferenceClient::LoadGenerator {
                generator: &load_gen_connection.load_generator,
            };
            reference_client.get_source_references().await?
        }
        GenericSourceConnection::Kafka(kafka_conn) => {
            let reference_client = SourceReferenceClient::Kafka {
                topic: &kafka_conn.topic,
            };
            reference_client.get_source_references().await?
        }
    };
    Ok(PurifiedStatement::PurifiedAlterSourceRefreshReferences {
        source_name: resolved_source_name,
        available_source_references: retrieved_source_references.available_source_references(),
    })
}

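/// Purifies a `CREATE TABLE .. FROM SOURCE` statement: resolves the requested
/// external reference against the source's upstream system, derives the
/// table's columns and constraints from the upstream schema where applicable,
/// and normalizes the `TEXT COLUMNS`, `EXCLUDE COLUMNS`, and `DETAILS`
/// options.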
1746async fn purify_create_table_from_source(
1747 catalog: impl SessionCatalog,
1748 mut stmt: CreateTableFromSourceStatement<Aug>,
1749 storage_configuration: &StorageConfiguration,
1750) -> Result<PurifiedStatement, PlanError> {
1751 let scx = StatementContext::new(None, &catalog);
1752 let CreateTableFromSourceStatement {
1753 name: _,
1754 columns,
1755 constraints,
1756 source: source_name,
1757 if_not_exists: _,
1758 external_reference,
1759 format,
1760 envelope,
1761 include_metadata: _,
1762 with_options,
1763 } = &mut stmt;
1764
1765 if matches!(columns, TableFromSourceColumns::Defined(_)) {
1767 sql_bail!("CREATE TABLE .. FROM SOURCE column definitions cannot be specified directly");
1768 }
1769 if !constraints.is_empty() {
1770 sql_bail!(
1771 "CREATE TABLE .. FROM SOURCE constraint definitions cannot be specified directly"
1772 );
1773 }
1774
1775 let item = match scx.get_item_by_resolved_name(source_name) {
1777 Ok(item) => item,
1778 Err(e) => return Err(e),
1779 };
1780
1781 let desc = match item.source_desc()? {
1783 Some(desc) => desc.clone().into_inline_connection(scx.catalog),
1784 None => {
1785 sql_bail!("cannot ALTER this type of source")
1786 }
1787 };
1788 let unresolved_source_name: UnresolvedItemName = source_name.full_item_name().clone().into();
1789
1790 let crate::plan::statement::ddl::TableFromSourceOptionExtracted {
1791 text_columns,
1792 exclude_columns,
1793 retain_history: _,
1794 details,
1795 partition_by: _,
1796 seen: _,
1797 } = with_options.clone().try_into()?;
1798 if details.is_some() {
1799 sql_bail!("DETAILS option cannot be explicitly set");
1800 }
1801
1802 let qualified_text_columns = text_columns
1806 .iter()
1807 .map(|col| {
1808 UnresolvedItemName(
1809 external_reference
1810 .as_ref()
1811 .map(|er| er.0.iter().chain_one(col).map(|i| i.clone()).collect())
1812 .unwrap_or_else(|| vec![col.clone()]),
1813 )
1814 })
1815 .collect_vec();
1816 let qualified_exclude_columns = exclude_columns
1817 .iter()
1818 .map(|col| {
1819 UnresolvedItemName(
1820 external_reference
1821 .as_ref()
1822 .map(|er| er.0.iter().chain_one(col).map(|i| i.clone()).collect())
1823 .unwrap_or_else(|| vec![col.clone()]),
1824 )
1825 })
1826 .collect_vec();
1827
1828 let mut format_options = SourceFormatOptions::Default;
1830
1831 let retrieved_source_references: RetrievedSourceReferences;
1832
1833 let requested_references = external_reference.as_ref().map(|ref_name| {
1834 ExternalReferences::SubsetTables(vec![ExternalReferenceExport {
1835 reference: ref_name.clone(),
1836 alias: None,
1837 }])
1838 });
1839
1840 let purified_export = match desc.connection {
1843 GenericSourceConnection::Postgres(pg_source_connection) => {
1844 let pg_connection = &pg_source_connection.connection;
1846
1847 let client = pg_connection
1848 .validate(pg_source_connection.connection_id, storage_configuration)
1849 .await?;
1850
1851 let reference_client = SourceReferenceClient::Postgres {
1852 client: &client,
1853 publication: &pg_source_connection.publication,
1854 database: &pg_connection.database,
1855 };
1856 retrieved_source_references = reference_client.get_source_references().await?;
1857
1858 let postgres::PurifiedSourceExports {
1859 source_exports,
1860 normalized_text_columns: _,
1864 } = postgres::purify_source_exports(
1865 &client,
1866 &retrieved_source_references,
1867 &requested_references,
1868 qualified_text_columns,
1869 qualified_exclude_columns,
1870 &unresolved_source_name,
1871 &SourceReferencePolicy::Required,
1872 )
1873 .await?;
1874 let (_, purified_export) = source_exports.into_element();
1876 purified_export
1877 }
1878 GenericSourceConnection::MySql(mysql_source_connection) => {
1879 let mysql_connection = &mysql_source_connection.connection;
1880 let config = mysql_connection
1881 .config(
1882 &storage_configuration.connection_context.secrets_reader,
1883 storage_configuration,
1884 InTask::No,
1885 )
1886 .await?;
1887
1888 let mut conn = config
1889 .connect(
1890 "mysql purification",
1891 &storage_configuration.connection_context.ssh_tunnel_manager,
1892 )
1893 .await?;
1894
1895 ensure_binlog_full_metadata(&mut conn).await?;
1896 let binlog_full_metadata = true;
1897
1898 let initial_gtid_set =
1901 mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?;
1902
1903 let reference_client = SourceReferenceClient::MySql {
1904 conn: &mut conn,
1905 include_system_schemas: mysql::references_system_schemas(&requested_references),
1906 };
1907 retrieved_source_references = reference_client.get_source_references().await?;
1908
1909 let mysql::PurifiedSourceExports {
1910 source_exports,
1911 normalized_text_columns: _,
1915 normalized_exclude_columns: _,
1916 } = mysql::purify_source_exports(
1917 &mut conn,
1918 &retrieved_source_references,
1919 &requested_references,
1920 qualified_text_columns,
1921 qualified_exclude_columns,
1922 &unresolved_source_name,
1923 initial_gtid_set,
1924 &SourceReferencePolicy::Required,
1925 binlog_full_metadata,
1926 )
1927 .await?;
1928 let (_, purified_export) = source_exports.into_element();
1930 purified_export
1931 }
1932 GenericSourceConnection::SqlServer(sql_server_source) => {
1933 let connection = sql_server_source.connection;
1934 let config = connection
1935 .resolve_config(
1936 &storage_configuration.connection_context.secrets_reader,
1937 storage_configuration,
1938 InTask::No,
1939 )
1940 .await?;
1941 let mut client = mz_sql_server_util::Client::connect(config).await?;
1942
1943 let database: Arc<str> = connection.database.into();
1944 let reference_client = SourceReferenceClient::SqlServer {
1945 client: &mut client,
1946 database: Arc::clone(&database),
1947 };
1948 retrieved_source_references = reference_client.get_source_references().await?;
1949 tracing::debug!(?retrieved_source_references, "got source references");
1950
1951 let timeout = mz_storage_types::sources::sql_server::MAX_LSN_WAIT
1952 .get(storage_configuration.config_set());
1953
1954 let purified_source_exports = sql_server::purify_source_exports(
1955 &*database,
1956 &mut client,
1957 &retrieved_source_references,
1958 &requested_references,
1959 &qualified_text_columns,
1960 &qualified_exclude_columns,
1961 &unresolved_source_name,
1962 timeout,
1963 &SourceReferencePolicy::Required,
1964 )
1965 .await?;
1966
1967 let (_, purified_export) = purified_source_exports.source_exports.into_element();
1969 purified_export
1970 }
1971 GenericSourceConnection::LoadGenerator(load_gen_connection) => {
1972 let reference_client = SourceReferenceClient::LoadGenerator {
1973 generator: &load_gen_connection.load_generator,
1974 };
1975 retrieved_source_references = reference_client.get_source_references().await?;
1976
1977 let requested_exports = retrieved_source_references
1978 .requested_source_exports(requested_references.as_ref(), &unresolved_source_name)?;
1979 let export = requested_exports.into_element();
1981 PurifiedSourceExport {
1982 external_reference: export.external_reference,
1983 details: PurifiedExportDetails::LoadGenerator {
1984 table: export
1985 .meta
1986 .load_generator_desc()
1987 .ok_or_else(|| internal_err!("expected load generator source reference"))?
1988 .clone(),
1989 output: export
1990 .meta
1991 .load_generator_output()
1992 .ok_or_else(|| internal_err!("expected load generator source reference"))?
1993 .clone(),
1994 },
1995 }
1996 }
1997 GenericSourceConnection::Kafka(kafka_conn) => {
1998 let reference_client = SourceReferenceClient::Kafka {
1999 topic: &kafka_conn.topic,
2000 };
2001 retrieved_source_references = reference_client.get_source_references().await?;
2002 let requested_exports = retrieved_source_references
2003 .requested_source_exports(requested_references.as_ref(), &unresolved_source_name)?;
2004 let export = requested_exports.into_element();
2006
2007 format_options = SourceFormatOptions::Kafka {
2008 topic: kafka_conn.topic.clone(),
2009 };
2010 PurifiedSourceExport {
2011 external_reference: export.external_reference,
2012 details: PurifiedExportDetails::Kafka {},
2013 }
2014 }
2015 };
2016
2017 purify_source_format(
2018 &catalog,
2019 format,
2020 &format_options,
2021 envelope,
2022 storage_configuration,
2023 )
2024 .await?;
2025
2026 *external_reference = Some(purified_export.external_reference.clone());
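2027     // Fill in the statement's columns, constraints, and WITH options from
2028     // the purified export.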
2029
2030 match &purified_export.details {
2032 PurifiedExportDetails::Postgres { .. } => {
2033 let mut unsupported_cols = vec![];
2034 let postgres::PostgresExportStatementValues {
2035 columns: gen_columns,
2036 constraints: gen_constraints,
2037 text_columns: gen_text_columns,
2038 exclude_columns: gen_exclude_columns,
2039 details: gen_details,
2040 external_reference: _,
2041 } = postgres::generate_source_export_statement_values(
2042 &scx,
2043 purified_export,
2044 &mut unsupported_cols,
2045 )?;
2046 if !unsupported_cols.is_empty() {
2047 unsupported_cols.sort();
2048 Err(PgSourcePurificationError::UnrecognizedTypes {
2049 cols: unsupported_cols,
2050 })?;
2051 }
2052
2053 if let Some(text_cols_option) = with_options
2054 .iter_mut()
2055 .find(|option| option.name == TableFromSourceOptionName::TextColumns)
2056 {
2057 match gen_text_columns {
2058 Some(gen_text_columns) => {
2059 text_cols_option.value = Some(WithOptionValue::Sequence(gen_text_columns))
2060 }
2061 None => soft_panic_or_log!(
2062 "text_columns should be Some if text_cols_option is present"
2063 ),
2064 }
2065 }
2066 if let Some(exclude_cols_option) = with_options
2067 .iter_mut()
2068 .find(|option| option.name == TableFromSourceOptionName::ExcludeColumns)
2069 {
2070 match gen_exclude_columns {
2071 Some(gen_exclude_columns) => {
2072 exclude_cols_option.value =
2073 Some(WithOptionValue::Sequence(gen_exclude_columns))
2074 }
2075 None => soft_panic_or_log!(
2076 "exclude_columns should be Some if exclude_cols_option is present"
2077 ),
2078 }
2079 }
2080 match columns {
2081 TableFromSourceColumns::Defined(_) => {
2082 bail_internal!(
2085 "column definitions cannot be explicitly set for this source type"
2086 )
2087 }
2088 TableFromSourceColumns::NotSpecified => {
2089 *columns = TableFromSourceColumns::Defined(gen_columns);
2090 *constraints = gen_constraints;
2091 }
2092 TableFromSourceColumns::Named(_) => {
2093 sql_bail!("columns cannot be named for Postgres sources")
2094 }
2095 }
2096 with_options.push(TableFromSourceOption {
2097 name: TableFromSourceOptionName::Details,
2098 value: Some(WithOptionValue::Value(Value::String(hex::encode(
2099 gen_details.into_proto().encode_to_vec(),
2100 )))),
2101 })
2102 }
2103 PurifiedExportDetails::MySql { .. } => {
2104 let mysql::MySqlExportStatementValues {
2105 columns: gen_columns,
2106 constraints: gen_constraints,
2107 text_columns: gen_text_columns,
2108 exclude_columns: gen_exclude_columns,
2109 details: gen_details,
2110 external_reference: _,
2111 } = mysql::generate_source_export_statement_values(&scx, purified_export)?;
2112
2113 if let Some(text_cols_option) = with_options
2114 .iter_mut()
2115 .find(|option| option.name == TableFromSourceOptionName::TextColumns)
2116 {
2117 match gen_text_columns {
2118 Some(gen_text_columns) => {
2119 text_cols_option.value = Some(WithOptionValue::Sequence(gen_text_columns))
2120 }
2121 None => soft_panic_or_log!(
2122 "text_columns should be Some if text_cols_option is present"
2123 ),
2124 }
2125 }
2126 if let Some(exclude_cols_option) = with_options
2127 .iter_mut()
2128 .find(|option| option.name == TableFromSourceOptionName::ExcludeColumns)
2129 {
2130 match gen_exclude_columns {
2131 Some(gen_exclude_columns) => {
2132 exclude_cols_option.value =
2133 Some(WithOptionValue::Sequence(gen_exclude_columns))
2134 }
2135 None => soft_panic_or_log!(
2136 "exclude_columns should be Some if exclude_cols_option is present"
2137 ),
2138 }
2139 }
2140 match columns {
2141 TableFromSourceColumns::Defined(_) => {
2142 bail_internal!(
2145 "column definitions cannot be explicitly set for this source type"
2146 )
2147 }
2148 TableFromSourceColumns::NotSpecified => {
2149 *columns = TableFromSourceColumns::Defined(gen_columns);
2150 *constraints = gen_constraints;
2151 }
2152 TableFromSourceColumns::Named(_) => {
2153 sql_bail!("columns cannot be named for MySQL sources")
2154 }
2155 }
2156 with_options.push(TableFromSourceOption {
2157 name: TableFromSourceOptionName::Details,
2158 value: Some(WithOptionValue::Value(Value::String(hex::encode(
2159 gen_details.into_proto().encode_to_vec(),
2160 )))),
2161 })
2162 }
2163 PurifiedExportDetails::SqlServer { .. } => {
2164 let sql_server::SqlServerExportStatementValues {
2165 columns: gen_columns,
2166 constraints: gen_constraints,
2167 text_columns: gen_text_columns,
2168 excl_columns: gen_excl_columns,
2169 details: gen_details,
2170 external_reference: _,
2171 } = sql_server::generate_source_export_statement_values(&scx, purified_export)?;
2172
2173 if let Some(text_cols_option) = with_options
2174 .iter_mut()
2175 .find(|opt| opt.name == TableFromSourceOptionName::TextColumns)
2176 {
2177 match gen_text_columns {
2178 Some(gen_text_columns) => {
2179 text_cols_option.value = Some(WithOptionValue::Sequence(gen_text_columns))
2180 }
2181 None => soft_panic_or_log!(
2182 "text_columns should be Some if text_cols_option is present"
2183 ),
2184 }
2185 }
2186 if let Some(exclude_cols_option) = with_options
2187 .iter_mut()
2188 .find(|opt| opt.name == TableFromSourceOptionName::ExcludeColumns)
2189 {
2190 match gen_excl_columns {
2191 Some(gen_excl_columns) => {
2192 exclude_cols_option.value =
2193 Some(WithOptionValue::Sequence(gen_excl_columns))
2194 }
2195 None => soft_panic_or_log!(
2196                     "excl_columns should be Some if exclude_cols_option is present"
2197 ),
2198 }
2199 }
2200
2201 match columns {
2202 TableFromSourceColumns::NotSpecified => {
2203 *columns = TableFromSourceColumns::Defined(gen_columns);
2204 *constraints = gen_constraints;
2205 }
2206 TableFromSourceColumns::Named(_) => {
2207 sql_bail!("columns cannot be named for SQL Server sources")
2208 }
2209 TableFromSourceColumns::Defined(_) => {
2210 bail_internal!(
2213 "column definitions cannot be explicitly set for this source type"
2214 )
2215 }
2216 }
2217
2218 with_options.push(TableFromSourceOption {
2219 name: TableFromSourceOptionName::Details,
2220 value: Some(WithOptionValue::Value(Value::String(hex::encode(
2221 gen_details.into_proto().encode_to_vec(),
2222 )))),
2223 })
2224 }
2225 PurifiedExportDetails::LoadGenerator { .. } => {
2226 let (desc, output) = match purified_export.details {
2227 PurifiedExportDetails::LoadGenerator { table, output } => (table, output),
2228 _ => bail_internal!("purified export details must be load generator"),
2229 };
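2230             // Multi-output load generators provide a relation description
2231             // from which columns and constraints are derived; single-output
2232             // generators keep whatever columns the statement already has.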
2230 if let Some(desc) = desc {
2235 let (gen_columns, gen_constraints) = scx.relation_desc_into_table_defs(&desc)?;
2236 match columns {
2237 TableFromSourceColumns::Defined(_) => bail_internal!(
2240 "column definitions cannot be explicitly set for this source type"
2241 ),
2242 TableFromSourceColumns::NotSpecified => {
2243 *columns = TableFromSourceColumns::Defined(gen_columns);
2244 *constraints = gen_constraints;
2245 }
2246 TableFromSourceColumns::Named(_) => {
2247 sql_bail!("columns cannot be named for multi-output load generator sources")
2248 }
2249 }
2250 }
2251 let details = SourceExportStatementDetails::LoadGenerator { output };
2252 with_options.push(TableFromSourceOption {
2253 name: TableFromSourceOptionName::Details,
2254 value: Some(WithOptionValue::Value(Value::String(hex::encode(
2255 details.into_proto().encode_to_vec(),
2256 )))),
2257 })
2258 }
2259 PurifiedExportDetails::Kafka {} => {
2260 let details = SourceExportStatementDetails::Kafka {};
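2261             // Kafka exports have no upstream schema; columns are determined
2262             // by the FORMAT and ENVELOPE during planning.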
2264 with_options.push(TableFromSourceOption {
2265 name: TableFromSourceOptionName::Details,
2266 value: Some(WithOptionValue::Value(Value::String(hex::encode(
2267 details.into_proto().encode_to_vec(),
2268 )))),
2269 })
2270 }
2271 };
2272
2273 Ok(PurifiedStatement::PurifiedCreateTableFromSource { stmt })
2277}
2278
2279enum SourceFormatOptions {
2280 Default,
2281 Kafka { topic: String },
2282}
2283
2284async fn purify_source_format(
2285 catalog: &dyn SessionCatalog,
2286 format: &mut Option<FormatSpecifier<Aug>>,
2287 options: &SourceFormatOptions,
2288 envelope: &Option<SourceEnvelope>,
2289 storage_configuration: &StorageConfiguration,
2290) -> Result<(), PlanError> {
2291 if matches!(format, Some(FormatSpecifier::KeyValue { .. }))
2292 && !matches!(options, SourceFormatOptions::Kafka { .. })
2293 {
2294 sql_bail!("Kafka sources are the only source type that can provide KEY/VALUE formats")
2295 }
2296
2297 match format.as_mut() {
2298 None => {}
2299 Some(FormatSpecifier::Bare(format)) => {
2300 purify_source_format_single(catalog, format, options, envelope, storage_configuration)
2301 .await?;
2302 }
2303
2304 Some(FormatSpecifier::KeyValue { key, value: val }) => {
2305 purify_source_format_single(catalog, key, options, envelope, storage_configuration)
2306 .await?;
2307 purify_source_format_single(catalog, val, options, envelope, storage_configuration)
2308 .await?;
2309 }
2310 }
2311 Ok(())
2312}
2313
2314async fn purify_source_format_single(
2315 catalog: &dyn SessionCatalog,
2316 format: &mut Format<Aug>,
2317 options: &SourceFormatOptions,
2318 envelope: &Option<SourceEnvelope>,
2319 storage_configuration: &StorageConfiguration,
2320) -> Result<(), PlanError> {
2321 match format {
2322 Format::Avro(schema) => match schema {
2323 AvroSchema::Csr { csr_connection } => {
2324 purify_csr_connection_avro(
2325 catalog,
2326 options,
2327 csr_connection,
2328 envelope,
2329 storage_configuration,
2330 )
2331 .await?
2332 }
2333 AvroSchema::InlineSchema { .. } => {}
2334 },
2335 Format::Protobuf(schema) => match schema {
2336 ProtobufSchema::Csr { csr_connection } => {
2337 purify_csr_connection_proto(
2338 catalog,
2339 options,
2340 csr_connection,
2341 envelope,
2342 storage_configuration,
2343 )
2344 .await?;
2345 }
2346 ProtobufSchema::InlineSchema { .. } => {}
2347 },
2348 Format::Bytes
2349 | Format::Regex(_)
2350 | Format::Json { .. }
2351 | Format::Text
2352 | Format::Csv { .. } => (),
2353 }
2354 Ok(())
2355}
2356
2357pub fn generate_subsource_statements(
2358 scx: &StatementContext,
2359 source_name: ResolvedItemName,
2360 subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
2361) -> Result<Vec<CreateSubsourceStatement<Aug>>, PlanError> {
2362 if subsources.is_empty() {
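2363         // Nothing to generate if no subsources were requested.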
2364 return Ok(vec![]);
2365 }
2366 let (_, purified_export) = subsources
2367 .iter()
2368 .next()
2369 .ok_or_else(|| internal_err!("expected at least one subsource"))?;
2370
2371 let statements = match &purified_export.details {
2372 PurifiedExportDetails::Postgres { .. } => {
2373 crate::pure::postgres::generate_create_subsource_statements(
2374 scx,
2375 source_name,
2376 subsources,
2377 )?
2378 }
2379 PurifiedExportDetails::MySql { .. } => {
2380 crate::pure::mysql::generate_create_subsource_statements(scx, source_name, subsources)?
2381 }
2382 PurifiedExportDetails::SqlServer { .. } => {
2383 crate::pure::sql_server::generate_create_subsource_statements(
2384 scx,
2385 source_name,
2386 subsources,
2387 )?
2388 }
2389 PurifiedExportDetails::LoadGenerator { .. } => {
2390 let mut subsource_stmts = Vec::with_capacity(subsources.len());
2391 for (subsource_name, purified_export) in subsources {
2392 let (desc, output) = match purified_export.details {
2393 PurifiedExportDetails::LoadGenerator { table, output } => (table, output),
2394 _ => {
2395 bail_internal!("purified export details must be load generator")
2396 }
2397 };
2398 let desc = desc.ok_or_else(|| {
2399 internal_err!(
2400 "subsources cannot be generated for single-output load generators"
2401 )
2402 })?;
2403
2404 let (columns, table_constraints) = scx.relation_desc_into_table_defs(&desc)?;
2405 let details = SourceExportStatementDetails::LoadGenerator { output };
2406 let subsource = CreateSubsourceStatement {
2408 name: subsource_name,
2409 columns,
2410 of_source: Some(source_name.clone()),
2411 constraints: table_constraints,
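2412                     // Subsource statements are generated during purification
2413                     // rather than written by users, so IF NOT EXISTS is never set.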
2416 if_not_exists: false,
2417 with_options: vec![
2418 CreateSubsourceOption {
2419 name: CreateSubsourceOptionName::ExternalReference,
2420 value: Some(WithOptionValue::UnresolvedItemName(
2421 purified_export.external_reference,
2422 )),
2423 },
2424 CreateSubsourceOption {
2425 name: CreateSubsourceOptionName::Details,
2426 value: Some(WithOptionValue::Value(Value::String(hex::encode(
2427 details.into_proto().encode_to_vec(),
2428 )))),
2429 },
2430 ],
2431 };
2432 subsource_stmts.push(subsource);
2433 }
2434
2435 subsource_stmts
2436 }
2437 PurifiedExportDetails::Kafka { .. } => {
2438 if !subsources.is_empty() {
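2439                 // We returned early above if `subsources` was empty, so
2440                 // reaching this arm with a Kafka export is always a bug.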
2442 bail_internal!("Kafka sources do not produce data-bearing subsources");
2443 }
2444 vec![]
2445 }
2446 };
2447 Ok(statements)
2448}
2449
2450async fn purify_csr_connection_proto(
2451 catalog: &dyn SessionCatalog,
2452 options: &SourceFormatOptions,
2453 csr_connection: &mut CsrConnectionProtobuf<Aug>,
2454 envelope: &Option<SourceEnvelope>,
2455 storage_configuration: &StorageConfiguration,
2456) -> Result<(), PlanError> {
2457 let SourceFormatOptions::Kafka { topic } = options else {
2458 sql_bail!("Confluent Schema Registry is only supported with Kafka sources")
2459 };
2460
2461 let CsrConnectionProtobuf {
2462 seed,
2463 connection: CsrConnection {
2464 connection,
2465 options: _,
2466 },
2467 } = csr_connection;
2468 match seed {
2469 None => {
2470 let scx = StatementContext::new(None, &*catalog);
2471
2472 let ccsr_connection = match scx.get_item_by_resolved_name(connection)?.connection()? {
2473 Connection::Csr(connection) => connection.clone().into_inline_connection(catalog),
2474 _ => sql_bail!("{} is not a schema registry connection", connection),
2475 };
2476
2477 let ccsr_client = ccsr_connection
2478 .connect(storage_configuration, InTask::No)
2479 .await
2480 .map_err(|e| CsrPurificationError::ClientError(Arc::new(e)))?;
2481
2482 let value = compile_proto(&format!("{}-value", topic), &ccsr_client).await?;
2483 let key = compile_proto(&format!("{}-key", topic), &ccsr_client)
2484 .await
2485 .ok();
2486
2487 if matches!(envelope, Some(SourceEnvelope::Debezium)) && key.is_none() {
2488 sql_bail!("Key schema is required for ENVELOPE DEBEZIUM");
2489 }
2490
2491 *seed = Some(CsrSeedProtobuf { value, key });
2492 }
2493 Some(_) => (),
2494 }
2495
2496 Ok(())
2497}
2498
2499async fn purify_csr_connection_avro(
2500 catalog: &dyn SessionCatalog,
2501 options: &SourceFormatOptions,
2502 csr_connection: &mut CsrConnectionAvro<Aug>,
2503 envelope: &Option<SourceEnvelope>,
2504 storage_configuration: &StorageConfiguration,
2505) -> Result<(), PlanError> {
2506 let SourceFormatOptions::Kafka { topic } = options else {
2507 sql_bail!("Confluent Schema Registry is only supported with Kafka sources")
2508 };
2509
2510 let CsrConnectionAvro {
2511 connection: CsrConnection { connection, .. },
2512 seed,
2513 key_strategy,
2514 value_strategy,
2515 } = csr_connection;
2516 if seed.is_none() {
2517 let scx = StatementContext::new(None, &*catalog);
2518 let csr_connection = match scx.get_item_by_resolved_name(connection)?.connection()? {
2519 Connection::Csr(connection) => connection.clone().into_inline_connection(catalog),
2520 _ => sql_bail!("{} is not a schema registry connection", connection),
2521 };
2522 let ccsr_client = csr_connection
2523 .connect(storage_configuration, InTask::No)
2524 .await
2525 .map_err(|e| CsrPurificationError::ClientError(Arc::new(e)))?;
2526
2527 let Schema {
2528 key_schema,
2529 value_schema,
2530 key_reference_schemas,
2531 value_reference_schemas,
2532 } = get_remote_csr_schema(
2533 &ccsr_client,
2534 key_strategy.clone().unwrap_or_default(),
2535 value_strategy.clone().unwrap_or_default(),
2536 topic,
2537 )
2538 .await?;
2539 if matches!(envelope, Some(SourceEnvelope::Debezium)) && key_schema.is_none() {
2540 sql_bail!("Key schema is required for ENVELOPE DEBEZIUM");
2541 }
2542
2543 *seed = Some(CsrSeedAvro {
2544 key_schema,
2545 value_schema,
2546 key_reference_schemas,
2547 value_reference_schemas,
2548 })
2549 }
2550
2551 Ok(())
2552}
2553
2554#[derive(Debug)]
2555pub struct Schema {
2556 pub key_schema: Option<String>,
2557 pub value_schema: String,
2558 pub key_reference_schemas: Vec<String>,
2560 pub value_reference_schemas: Vec<String>,
2562}
2563
2564struct SchemaWithReferences {
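2565     /// The raw text of the requested schema.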
2566 schema: String,
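2567     /// The raw text of any schemas referenced by `schema`.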
2568 references: Vec<String>,
2570}
2571
2572async fn get_schema_with_strategy(
2573 client: &Client,
2574 strategy: ReaderSchemaSelectionStrategy,
2575 subject: &str,
2576) -> Result<Option<SchemaWithReferences>, PlanError> {
2577 match strategy {
2578 ReaderSchemaSelectionStrategy::Latest => {
2579 match client.get_subject_and_references(subject).await {
2581 Ok((primary, dependencies)) => Ok(Some(SchemaWithReferences {
2582 schema: primary.schema.raw,
2583 references: dependencies.into_iter().map(|s| s.schema.raw).collect(),
2584 })),
2585 Err(GetBySubjectError::SubjectNotFound)
2586 | Err(GetBySubjectError::VersionNotFound(_)) => Ok(None),
2587 Err(e) => Err(PlanError::FetchingCsrSchemaFailed {
2588 schema_lookup: format!("subject {}", subject.quoted()),
2589 cause: Arc::new(e),
2590 }),
2591 }
2592 }
2593 ReaderSchemaSelectionStrategy::Inline(raw) => Ok(Some(SchemaWithReferences {
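2594             // An inline schema is used verbatim and cannot reference other
2595             // registry subjects.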
2597 schema: raw,
2598 references: vec![],
2599 })),
2600 ReaderSchemaSelectionStrategy::ById(id) => {
2601 match client.get_subject_and_references_by_id(id).await {
2602 Ok((primary, dependencies)) => Ok(Some(SchemaWithReferences {
2603 schema: primary.schema.raw,
2604 references: dependencies.into_iter().map(|s| s.schema.raw).collect(),
2605 })),
2606 Err(GetBySubjectError::SubjectNotFound)
2607 | Err(GetBySubjectError::VersionNotFound(_)) => Ok(None),
2608 Err(e) => Err(PlanError::FetchingCsrSchemaFailed {
2609                     schema_lookup: format!("ID {} (subject {})", id, subject.quoted()),
2610 cause: Arc::new(e),
2611 }),
2612 }
2613 }
2614 }
2615}
2616
2617async fn get_remote_csr_schema(
2618 ccsr_client: &mz_ccsr::Client,
2619 key_strategy: ReaderSchemaSelectionStrategy,
2620 value_strategy: ReaderSchemaSelectionStrategy,
2621 topic: &str,
2622) -> Result<Schema, PlanError> {
2623 let value_schema_name = format!("{}-value", topic);
2624 let value_result =
2625 get_schema_with_strategy(ccsr_client, value_strategy, &value_schema_name).await?;
2626 let value_result = value_result.ok_or_else(|| anyhow!("No value schema found"))?;
2627
2628 let key_subject = format!("{}-key", topic);
2629 let key_result = get_schema_with_strategy(ccsr_client, key_strategy, &key_subject).await?;
2630 Ok(Schema {
2631 key_schema: key_result.as_ref().map(|r| r.schema.clone()),
2632 value_schema: value_result.schema,
2633 key_reference_schemas: key_result.map(|r| r.references).unwrap_or_default(),
2634 value_reference_schemas: value_result.references,
2635 })
2636}
2637
2638async fn compile_proto(
2640     subject_name: &str,
2641 ccsr_client: &Client,
2642) -> Result<CsrSeedProtobufSchema, PlanError> {
2643 let (primary_subject, dependency_subjects) = ccsr_client
2644 .get_subject_and_references(subject_name)
2645 .await
2646 .map_err(|e| PlanError::FetchingCsrSchemaFailed {
2647 schema_lookup: format!("subject {}", subject_name.quoted()),
2648 cause: Arc::new(e),
2649 })?;
2650
2651 let mut source_tree = VirtualSourceTree::new();
2653
2654 source_tree.as_mut().map_well_known_types();
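2655     // Add the primary schema and all of its dependencies so that imports
2656     // resolve when the descriptor set is built below.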
2658
2659 for subject in iter::once(&primary_subject).chain(dependency_subjects.iter()) {
2660 source_tree.as_mut().add_file(
2661 Path::new(&subject.name),
2662 subject.schema.raw.as_bytes().to_vec(),
2663 );
2664 }
2665 let mut db = SourceTreeDescriptorDatabase::new(source_tree.as_mut());
2666 let fds = db
2667 .as_mut()
2668 .build_file_descriptor_set(&[Path::new(&primary_subject.name)])
2669 .map_err(|cause| PlanError::InvalidProtobufSchema { cause })?;
2670
2671 let primary_fd = fds.file(0);
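2672     // The primary subject's descriptor is the first file in the set.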
2673 let message_name = match primary_fd.message_type_size() {
2674 1 => String::from_utf8_lossy(primary_fd.message_type(0).name()).into_owned(),
2675 0 => bail_unsupported!(29603, "Protobuf schemas with no messages"),
2676 _ => bail_unsupported!(29603, "Protobuf schemas with multiple messages"),
2677 };
2678
2679 let bytes = &fds
2681 .serialize()
2682 .map_err(|cause| PlanError::InvalidProtobufSchema { cause })?;
2683 let mut schema = String::new();
2684 strconv::format_bytes(&mut schema, bytes);
2685
2686 Ok(CsrSeedProtobufSchema {
2687 schema,
2688 message_name,
2689 })
2690}
2691
2692const MZ_NOW_NAME: &str = "mz_now";
2693const MZ_NOW_SCHEMA: &str = "mz_catalog";
2694
2695pub fn purify_create_materialized_view_options(
2701 catalog: impl SessionCatalog,
2702 mz_now: Option<Timestamp>,
2703 cmvs: &mut CreateMaterializedViewStatement<Aug>,
2704 resolved_ids: &mut ResolvedIds,
2705) {
2706 let (mz_now_id, mz_now_expr) = {
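2707         // Resolve `mz_catalog.mz_now` so we can record a dependency on it and
2708         // splice calls to it into REFRESH options.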
2709 let item = catalog
2710 .resolve_function(&PartialItemName {
2711 database: None,
2712 schema: Some(MZ_NOW_SCHEMA.to_string()),
2713 item: MZ_NOW_NAME.to_string(),
2714 })
2715 .expect("we should be able to resolve mz_now");
2716 (
2717 item.id(),
2718 Expr::Function(Function {
2719 name: ResolvedItemName::Item {
2720 id: item.id(),
2721 qualifiers: item.name().qualifiers.clone(),
2722 full_name: catalog.resolve_full_name(item.name()),
2723 print_id: false,
2724 version: RelationVersionSelector::Latest,
2725 },
2726 args: FunctionArgs::Args {
2727 args: Vec::new(),
2728 order_by: Vec::new(),
2729 },
2730 filter: None,
2731 over: None,
2732 distinct: false,
2733 }),
2734 )
2735 };
2736 let (mz_timestamp_id, mz_timestamp_type) = {
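2737         // `mz_timestamp` is the type that purified `mz_now()` calls are cast to.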
2738 let item = catalog.get_system_type("mz_timestamp");
2739 let full_name = catalog.resolve_full_name(item.name());
2740 (
2741 item.id(),
2742 ResolvedDataType::Named {
2743 id: item.id(),
2744 qualifiers: item.name().qualifiers.clone(),
2745 full_name,
2746 modifiers: vec![],
2747 print_id: true,
2748 },
2749 )
2750 };
2751
2752 let mut introduced_mz_timestamp = false;
2753
2754 for option in cmvs.with_options.iter_mut() {
2755 if matches!(
2757 option.value,
2758 Some(WithOptionValue::Refresh(RefreshOptionValue::AtCreation))
2759 ) {
2760 option.value = Some(WithOptionValue::Refresh(RefreshOptionValue::At(
2761 RefreshAtOptionValue {
2762 time: mz_now_expr.clone(),
2763 },
2764 )));
2765 }
2766
2767 if let Some(WithOptionValue::Refresh(RefreshOptionValue::Every(
2769 RefreshEveryOptionValue { aligned_to, .. },
2770 ))) = &mut option.value
2771 {
2772 if aligned_to.is_none() {
2773 *aligned_to = Some(mz_now_expr.clone());
2774 }
2775 }
2776
2777 match &mut option.value {
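2778             // Replace `mz_now()` in REFRESH AT / REFRESH EVERY ... ALIGNED TO
2779             // expressions with the timestamp chosen for this purification.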
2780 Some(WithOptionValue::Refresh(RefreshOptionValue::At(RefreshAtOptionValue {
2781 time,
2782 }))) => {
2783 let mut visitor = MzNowPurifierVisitor::new(mz_now, mz_timestamp_type.clone());
2784 visitor.visit_expr_mut(time);
2785 introduced_mz_timestamp |= visitor.introduced_mz_timestamp;
2786 }
2787 Some(WithOptionValue::Refresh(RefreshOptionValue::Every(
2788 RefreshEveryOptionValue {
2789 interval: _,
2790 aligned_to: Some(aligned_to),
2791 },
2792 ))) => {
2793 let mut visitor = MzNowPurifierVisitor::new(mz_now, mz_timestamp_type.clone());
2794 visitor.visit_expr_mut(aligned_to);
2795 introduced_mz_timestamp |= visitor.introduced_mz_timestamp;
2796 }
2797 _ => {}
2798 }
2799 }
2800
2801 if !cmvs.with_options.iter().any(|o| {
2803 matches!(
2804 o,
2805 MaterializedViewOption {
2806 value: Some(WithOptionValue::Refresh(..)),
2807 ..
2808 }
2809 )
2810 }) {
2811 cmvs.with_options.push(MaterializedViewOption {
2812 name: MaterializedViewOptionName::Refresh,
2813 value: Some(WithOptionValue::Refresh(RefreshOptionValue::OnCommit)),
2814 })
2815 }
2816
2817 if introduced_mz_timestamp {
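2818         // The casts introduced above reference `mz_timestamp`, so record the
2819         // new dependency.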
2821 resolved_ids.add_item(mz_timestamp_id);
2822 }
2823 let mut visitor = ExprContainsTemporalVisitor::new();
2827 visitor.visit_create_materialized_view_statement(cmvs);
2828 if !visitor.contains_temporal {
2829 resolved_ids.remove_item(&mz_now_id);
2830 }
2831}
2832
2833pub fn materialized_view_option_contains_temporal(mvo: &MaterializedViewOption<Aug>) -> bool {
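2834     // Returns whether the REFRESH option references `mz_now()`, either
2835     // explicitly or via a default that purification fills in with `mz_now()`.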
2836 match &mvo.value {
2837 Some(WithOptionValue::Refresh(RefreshOptionValue::At(RefreshAtOptionValue { time }))) => {
2838 let mut visitor = ExprContainsTemporalVisitor::new();
2839 visitor.visit_expr(time);
2840 visitor.contains_temporal
2841 }
2842 Some(WithOptionValue::Refresh(RefreshOptionValue::Every(RefreshEveryOptionValue {
2843 interval: _,
2844 aligned_to: Some(aligned_to),
2845 }))) => {
2846 let mut visitor = ExprContainsTemporalVisitor::new();
2847 visitor.visit_expr(aligned_to);
2848 visitor.contains_temporal
2849 }
2850 Some(WithOptionValue::Refresh(RefreshOptionValue::Every(RefreshEveryOptionValue {
2851 interval: _,
2852 aligned_to: None,
2853 }))) => {
2854 true
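2855             // A missing ALIGNED TO is filled in with `mz_now()` during
2856             // purification, so treat it as temporal.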
2857 }
2858 Some(WithOptionValue::Refresh(RefreshOptionValue::AtCreation)) => {
2859 true
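2860             // AT CREATION is purified to `mz_now()`, so it is temporal.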
2861 }
2862 _ => false,
2863 }
2864}
2865
2866struct ExprContainsTemporalVisitor {
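2867     /// Whether any visited expression contains a call to `mz_now()`.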
2868 pub contains_temporal: bool,
2869}
2870
2871impl ExprContainsTemporalVisitor {
2872 pub fn new() -> ExprContainsTemporalVisitor {
2873 ExprContainsTemporalVisitor {
2874 contains_temporal: false,
2875 }
2876 }
2877}
2878
2879impl Visit<'_, Aug> for ExprContainsTemporalVisitor {
2880 fn visit_function(&mut self, func: &Function<Aug>) {
2881 self.contains_temporal |= func.name.full_item_name().item == MZ_NOW_NAME;
2882 visit_function(self, func);
2883 }
2884}
2885
2886struct MzNowPurifierVisitor {
2887 pub mz_now: Option<Timestamp>,
2888 pub mz_timestamp_type: ResolvedDataType,
2889 pub introduced_mz_timestamp: bool,
2890}
2891
2892impl MzNowPurifierVisitor {
2893 pub fn new(
2894 mz_now: Option<Timestamp>,
2895 mz_timestamp_type: ResolvedDataType,
2896 ) -> MzNowPurifierVisitor {
2897 MzNowPurifierVisitor {
2898 mz_now,
2899 mz_timestamp_type,
2900 introduced_mz_timestamp: false,
2901 }
2902 }
2903}
2904
2905impl VisitMut<'_, Aug> for MzNowPurifierVisitor {
2906 fn visit_expr_mut(&mut self, expr: &'_ mut Expr<Aug>) {
2907 match expr {
2908 Expr::Function(Function {
2909 name:
2910 ResolvedItemName::Item {
2911 full_name: FullItemName { item, .. },
2912 ..
2913 },
2914 ..
2915             }) if item.as_str() == MZ_NOW_NAME => {
2916 let mz_now = self.mz_now.expect(
2917 "we should have chosen a timestamp if the expression contains mz_now()",
2918 );
2919 *expr = Expr::Cast {
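2920                 // Cast the chosen timestamp to `mz_timestamp` so the purified
2921                 // expression has the same type as `mz_now()`.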
2922 expr: Box::new(Expr::Value(Value::Number(mz_now.to_string()))),
2923 data_type: self.mz_timestamp_type.clone(),
2924 };
2925 self.introduced_mz_timestamp = true;
2926 }
2927 _ => visit_expr_mut(self, expr),
2928 }
2929 }
2930}