1use std::collections::{BTreeMap, BTreeSet};
15use std::fmt;
16use std::iter;
17use std::path::Path;
18use std::sync::Arc;
19
20use anyhow::anyhow;
21use itertools::Itertools;
22use mz_adapter_types::dyncfgs::ENABLE_S3_TABLES_REGION_CHECK;
23use mz_ccsr::{Client, GetBySubjectError};
24use mz_cloud_provider::CloudProvider;
25use mz_controller_types::ClusterId;
26use mz_kafka_util::client::MzClientContext;
27use mz_mysql_util::MySqlTableDesc;
28use mz_ore::collections::CollectionExt;
29use mz_ore::error::ErrorExt;
30use mz_ore::future::InTask;
31use mz_ore::iter::IteratorExt;
32use mz_ore::soft_panic_or_log;
33use mz_ore::str::StrExt;
34use mz_postgres_util::desc::PostgresTableDesc;
35use mz_proto::RustType;
36use mz_repr::{CatalogItemId, RelationDesc, RelationVersionSelector, Timestamp, strconv};
37use mz_sql_parser::ast::display::AstDisplay;
38use mz_sql_parser::ast::visit::{Visit, visit_function};
39use mz_sql_parser::ast::visit_mut::{VisitMut, visit_expr_mut};
40use mz_sql_parser::ast::{
41 AlterSourceAction, AlterSourceAddSubsourceOptionName, AlterSourceStatement, AvroDocOn,
42 ColumnName, CreateMaterializedViewStatement, CreateSinkConnection, CreateSinkOptionName,
43 CreateSinkStatement, CreateSourceOptionName, CreateSubsourceOption, CreateSubsourceOptionName,
44 CreateTableFromSourceStatement, CsrConfigOption, CsrConfigOptionName, CsrConnection,
45 CsrSeedAvro, CsrSeedProtobuf, CsrSeedProtobufSchema, DeferredItemName, DocOnIdentifier,
46 DocOnSchema, Expr, Function, FunctionArgs, Ident, KafkaSourceConfigOption,
47 KafkaSourceConfigOptionName, LoadGenerator, LoadGeneratorOption, LoadGeneratorOptionName,
48 MaterializedViewOption, MaterializedViewOptionName, MySqlConfigOption, MySqlConfigOptionName,
49 PgConfigOption, PgConfigOptionName, RawItemName, ReaderSchemaSelectionStrategy,
50 RefreshAtOptionValue, RefreshEveryOptionValue, RefreshOptionValue, SourceEnvelope,
51 SqlServerConfigOption, SqlServerConfigOptionName, Statement, TableFromSourceColumns,
52 TableFromSourceOption, TableFromSourceOptionName, UnresolvedItemName,
53};
54use mz_sql_server_util::desc::SqlServerTableDesc;
55use mz_storage_types::configuration::StorageConfiguration;
56use mz_storage_types::connections::Connection;
57use mz_storage_types::connections::inline::IntoInlineConnection;
58use mz_storage_types::errors::ContextCreationError;
59use mz_storage_types::sources::load_generator::LoadGeneratorOutput;
60use mz_storage_types::sources::mysql::MySqlSourceDetails;
61use mz_storage_types::sources::postgres::PostgresSourcePublicationDetails;
62use mz_storage_types::sources::{
63 GenericSourceConnection, SourceDesc, SourceExportStatementDetails, SqlServerSourceExtras,
64};
65use prost::Message;
66use protobuf_native::MessageLite;
67use protobuf_native::compiler::{SourceTreeDescriptorDatabase, VirtualSourceTree};
68use rdkafka::admin::AdminClient;
69use references::{RetrievedSourceReferences, SourceReferenceClient};
70use uuid::Uuid;
71
72use crate::ast::{
73 AlterSourceAddSubsourceOption, AvroSchema, CreateSourceConnection, CreateSourceStatement,
74 CreateSubsourceStatement, CsrConnectionAvro, CsrConnectionProtobuf, ExternalReferenceExport,
75 ExternalReferences, Format, FormatSpecifier, ProtobufSchema, Value, WithOptionValue,
76};
77use crate::catalog::{CatalogItemType, SessionCatalog};
78use crate::kafka_util::{KafkaSinkConfigOptionExtracted, KafkaSourceConfigOptionExtracted};
79use crate::names::{
80 Aug, FullItemName, PartialItemName, ResolvedColumnReference, ResolvedDataType, ResolvedIds,
81 ResolvedItemName,
82};
83use crate::plan::error::PlanError;
84use crate::plan::statement::ddl::load_generator_ast_to_generator;
85use crate::plan::{SourceReferences, StatementContext};
86use crate::pure::error::{IcebergSinkPurificationError, SqlServerSourcePurificationError};
87use crate::session::vars::ENABLE_SQL_SERVER_SOURCE;
88use crate::{kafka_util, normalize};
89
90use self::error::{
91 CsrPurificationError, KafkaSinkPurificationError, KafkaSourcePurificationError,
92 LoadGeneratorSourcePurificationError, MySqlSourcePurificationError, PgSourcePurificationError,
93};
94
95pub(crate) mod error;
96mod references;
97
98pub mod mysql;
99pub mod postgres;
100pub mod sql_server;
101
/// A single source export (subsource or table) requested during purification,
/// pairing the upstream object it reads from with the Materialize-side name it
/// will be created under.
pub(crate) struct RequestedSourceExport<T> {
    // Fully-qualified name of the upstream object (e.g. an upstream table).
    external_reference: UnresolvedItemName,
    // Name the export will be created under in Materialize.
    name: UnresolvedItemName,
    // Source-type-specific metadata for this export (varies by connection type).
    meta: T,
}
107
108impl<T: fmt::Debug> fmt::Debug for RequestedSourceExport<T> {
109 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
110 f.debug_struct("RequestedSourceExport")
111 .field("external_reference", &self.external_reference)
112 .field("name", &self.name)
113 .field("meta", &self.meta)
114 .finish()
115 }
116}
117
118impl<T> RequestedSourceExport<T> {
119 fn change_meta<F>(self, new_meta: F) -> RequestedSourceExport<F> {
120 RequestedSourceExport {
121 external_reference: self.external_reference,
122 name: self.name,
123 meta: new_meta,
124 }
125 }
126}
127
128fn source_export_name_gen(
133 source_name: &UnresolvedItemName,
134 subsource_name: &str,
135) -> Result<UnresolvedItemName, PlanError> {
136 let mut partial = normalize::unresolved_item_name(source_name.clone())?;
137 partial.item = subsource_name.to_string();
138 Ok(UnresolvedItemName::from(partial))
139}
140
141fn validate_source_export_names<T>(
145 requested_source_exports: &[RequestedSourceExport<T>],
146) -> Result<(), PlanError> {
147 if let Some(name) = requested_source_exports
151 .iter()
152 .map(|subsource| &subsource.name)
153 .duplicates()
154 .next()
155 .cloned()
156 {
157 let mut upstream_references: Vec<_> = requested_source_exports
158 .into_iter()
159 .filter_map(|subsource| {
160 if &subsource.name == &name {
161 Some(subsource.external_reference.clone())
162 } else {
163 None
164 }
165 })
166 .collect();
167
168 upstream_references.sort();
169
170 Err(PlanError::SubsourceNameConflict {
171 name,
172 upstream_references,
173 })?;
174 }
175
176 if let Some(name) = requested_source_exports
182 .iter()
183 .map(|export| &export.external_reference)
184 .duplicates()
185 .next()
186 .cloned()
187 {
188 let mut target_names: Vec<_> = requested_source_exports
189 .into_iter()
190 .filter_map(|export| {
191 if &export.external_reference == &name {
192 Some(export.name.clone())
193 } else {
194 None
195 }
196 })
197 .collect();
198
199 target_names.sort();
200
201 Err(PlanError::SubsourceDuplicateReference { name, target_names })?;
202 }
203
204 Ok(())
205}
206
/// The output of statement purification: the statement rewritten with all
/// external information (schemas, upstream references, connection details)
/// resolved, plus any auxiliary objects that must be created alongside it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PurifiedStatement {
    PurifiedCreateSource {
        // Statement to create the progress subsource, if one is needed.
        create_progress_subsource_stmt: Option<CreateSubsourceStatement<Aug>>,
        // The `CREATE SOURCE` statement as rewritten during purification.
        create_source_stmt: CreateSourceStatement<Aug>,
        // Map of subsource names to their purified export details.
        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
        // All upstream references available from this source, as discovered
        // during purification.
        available_source_references: SourceReferences,
    },
    PurifiedAlterSource {
        // The `ALTER SOURCE` statement as rewritten during purification.
        alter_source_stmt: AlterSourceStatement<Aug>,
    },
    PurifiedAlterSourceAddSubsources {
        // The source to which subsources are being added.
        source_name: ResolvedItemName,
        // Options accompanying the `ADD SUBSOURCE` action.
        options: Vec<AlterSourceAddSubsourceOption<Aug>>,
        // Map of new subsource names to their purified export details.
        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
    },
    PurifiedAlterSourceRefreshReferences {
        // The source whose available references are being refreshed.
        source_name: ResolvedItemName,
        // Freshly-retrieved upstream references for the source.
        available_source_references: SourceReferences,
    },
    PurifiedCreateSink(CreateSinkStatement<Aug>),
    PurifiedCreateTableFromSource {
        stmt: CreateTableFromSourceStatement<Aug>,
    },
}
241
/// A purified source export: the upstream object it references together with
/// the connection-type-specific details gathered during purification.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PurifiedSourceExport {
    // Fully-qualified name of the upstream object this export reads from.
    pub external_reference: UnresolvedItemName,
    // Per-connection-type details (table descriptions, column options, etc.).
    pub details: PurifiedExportDetails,
}
247
/// Connection-type-specific details for a purified source export.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PurifiedExportDetails {
    MySql {
        // Description of the upstream MySQL table.
        table: MySqlTableDesc,
        // Columns to decode as text, if any were requested.
        text_columns: Option<Vec<Ident>>,
        // Columns to exclude from ingestion, if any were requested.
        exclude_columns: Option<Vec<Ident>>,
        // GTID set captured at purification time (see `purify_create_source`,
        // which reads `global.gtid_executed`).
        initial_gtid_set: String,
    },
    Postgres {
        // Description of the upstream Postgres table.
        table: PostgresTableDesc,
        text_columns: Option<Vec<Ident>>,
        exclude_columns: Option<Vec<Ident>>,
    },
    SqlServer {
        // Description of the upstream SQL Server table.
        table: SqlServerTableDesc,
        text_columns: Option<Vec<Ident>>,
        excl_columns: Option<Vec<Ident>>,
        // CDC capture instance for the table.
        capture_instance: Arc<str>,
        // LSN captured at purification time — presumably the starting point
        // for CDC replication; confirm against the SQL Server source planner.
        initial_lsn: mz_sql_server_util::cdc::Lsn,
    },
    Kafka {},
    LoadGenerator {
        // Relation description for this output, when one exists.
        table: Option<RelationDesc>,
        // Which load-generator output this export corresponds to.
        output: LoadGeneratorOutput,
    },
}
274
275pub async fn purify_statement(
285 catalog: impl SessionCatalog,
286 now: u64,
287 stmt: Statement<Aug>,
288 storage_configuration: &StorageConfiguration,
289) -> (Result<PurifiedStatement, PlanError>, Option<ClusterId>) {
290 match stmt {
291 Statement::CreateSource(stmt) => {
292 let cluster_id = stmt.in_cluster.as_ref().map(|cluster| cluster.id.clone());
293 (
294 purify_create_source(catalog, now, stmt, storage_configuration).await,
295 cluster_id,
296 )
297 }
298 Statement::AlterSource(stmt) => (
299 purify_alter_source(catalog, stmt, storage_configuration).await,
300 None,
301 ),
302 Statement::CreateSink(stmt) => {
303 let cluster_id = stmt.in_cluster.as_ref().map(|cluster| cluster.id.clone());
304 (
305 purify_create_sink(catalog, stmt, storage_configuration).await,
306 cluster_id,
307 )
308 }
309 Statement::CreateTableFromSource(stmt) => (
310 purify_create_table_from_source(catalog, stmt, storage_configuration).await,
311 None,
312 ),
313 o => (
314 Err(internal_err!(
315 "unexpected statement type in purification: {:?}",
316 o
317 )),
318 None,
319 ),
320 }
321}
322
/// Injects `DOC ON` options into the Avro CSR format options of a `CREATE
/// SINK` statement, derived from catalog comments on the sink's `FROM` item
/// and every item it references.
///
/// User-provided `DOC ON` options always win: a generated option is only
/// pushed when the user did not already specify one for the same identifier
/// and schema (key/value/all).
pub(crate) fn purify_create_sink_avro_doc_on_options(
    catalog: &dyn SessionCatalog,
    from_id: CatalogItemId,
    format: &mut Option<FormatSpecifier<Aug>>,
) -> Result<(), PlanError> {
    // Collect the `FROM` item plus everything it references; comments on any
    // of these items may contribute doc strings to the Avro schema.
    let from = catalog.get_item(&from_id);
    let object_ids = from
        .references()
        .items()
        .copied()
        .chain_one(from.id())
        .collect::<Vec<_>>();

    // Gather mutable references to the CSR options of every Avro-over-CSR
    // format in the statement (key and/or value); other formats have no
    // doc-on support and are ignored.
    let mut avro_format_options = vec![];
    for_each_format(format, |doc_on_schema, fmt| match fmt {
        Format::Avro(AvroSchema::InlineSchema { .. })
        | Format::Bytes
        | Format::Csv { .. }
        | Format::Json { .. }
        | Format::Protobuf(..)
        | Format::Regex(..)
        | Format::Text => (),
        Format::Avro(AvroSchema::Csr {
            csr_connection: CsrConnectionAvro { connection, .. },
        }) => {
            avro_format_options.push((doc_on_schema, &mut connection.options));
        }
    });

    for (for_schema, options) in avro_format_options {
        // Remember which `DOC ON` options the user wrote explicitly so we
        // never override them below.
        let user_provided_comments = options
            .iter()
            .filter_map(|CsrConfigOption { name, .. }| match name {
                CsrConfigOptionName::AvroDocOn(doc_on) => Some(doc_on.clone()),
                _ => None,
            })
            .collect::<BTreeSet<_>>();

        for object_id in &object_ids {
            let item = catalog
                .get_item(object_id)
                .at_version(RelationVersionSelector::Latest);
            // Build the resolved name used as the `DOC ON` identifier.
            // `print_id` is suppressed for functions and types, matching how
            // they are displayed elsewhere.
            let full_resolved_name = ResolvedItemName::Item {
                id: *object_id,
                qualifiers: item.name().qualifiers.clone(),
                full_name: catalog.resolve_full_name(item.name()),
                print_id: !matches!(
                    item.item_type(),
                    CatalogItemType::Func | CatalogItemType::Type
                ),
                version: RelationVersionSelector::Latest,
            };

            if let Some(comments_map) = catalog.get_item_comments(object_id) {
                // Item-level comment (keyed by `None` in the comments map)
                // becomes a type-level `DOC ON`, unless the user already set one.
                let doc_on_item_key = AvroDocOn {
                    identifier: DocOnIdentifier::Type(full_resolved_name.clone()),
                    for_schema,
                };
                if !user_provided_comments.contains(&doc_on_item_key) {
                    if let Some(root_comment) = comments_map.get(&None) {
                        options.push(CsrConfigOption {
                            name: CsrConfigOptionName::AvroDocOn(doc_on_item_key),
                            value: Some(mz_sql_parser::ast::WithOptionValue::Value(Value::String(
                                root_comment.clone(),
                            ))),
                        });
                    }
                }

                // Determine the item's column layout: types use their type
                // details' descriptor, relations use their relation desc.
                let column_descs = match item.type_details() {
                    Some(details) => details.typ.desc(catalog).unwrap_or_default(),
                    None => item.relation_desc().map(|d| d.into_owned()),
                };

                if let Some(desc) = column_descs {
                    for (pos, column_name) in desc.iter_names().enumerate() {
                        // Column comments are keyed by 1-based position;
                        // `pos` from `enumerate` is 0-based, hence `pos + 1`.
                        if let Some(comment_str) = comments_map.get(&Some(pos + 1)) {
                            let doc_on_column_key = AvroDocOn {
                                identifier: DocOnIdentifier::Column(ColumnName {
                                    relation: full_resolved_name.clone(),
                                    column: ResolvedColumnReference::Column {
                                        name: column_name.to_owned(),
                                        index: pos,
                                    },
                                }),
                                for_schema,
                            };
                            if !user_provided_comments.contains(&doc_on_column_key) {
                                options.push(CsrConfigOption {
                                    name: CsrConfigOptionName::AvroDocOn(doc_on_column_key),
                                    value: Some(mz_sql_parser::ast::WithOptionValue::Value(
                                        Value::String(comment_str.clone()),
                                    )),
                                });
                            }
                        }
                    }
                }
            }
        }
    }

    Ok(())
}
443
/// Purifies a `CREATE SINK` statement by validating its connection against the
/// external system and injecting Avro `DOC ON` options derived from catalog
/// comments.
///
/// Performs network I/O: for Kafka sinks it creates an admin client and
/// fetches cluster metadata; for Iceberg sinks it connects to the catalog and
/// loads the AWS SDK config; CSR connections are validated by listing
/// subjects.
async fn purify_create_sink(
    catalog: impl SessionCatalog,
    mut create_sink_stmt: CreateSinkStatement<Aug>,
    storage_configuration: &StorageConfiguration,
) -> Result<PurifiedStatement, PlanError> {
    // Destructure mutably so the format options can be rewritten in place.
    let CreateSinkStatement {
        connection,
        format,
        with_options,
        name: _,
        in_cluster: _,
        if_not_exists: _,
        from,
        envelope: _,
        mode: _,
    } = &mut create_sink_stmt;

    // The only WITH options users may specify on CREATE SINK; anything else
    // is rejected below.
    const USER_ALLOWED_WITH_OPTIONS: &[CreateSinkOptionName] = &[
        CreateSinkOptionName::Snapshot,
        CreateSinkOptionName::CommitInterval,
    ];

    if let Some(op) = with_options
        .iter()
        .find(|op| !USER_ALLOWED_WITH_OPTIONS.contains(&op.name))
    {
        sql_bail!(
            "CREATE SINK...WITH ({}..) is not allowed",
            op.name.to_ast_string_simple(),
        )
    }

    match &connection {
        CreateSinkConnection::Kafka {
            connection,
            options,
            key: _,
            headers: _,
        } => {
            let scx = StatementContext::new(None, &catalog);
            // Resolve and inline the Kafka connection from the catalog.
            let connection = {
                let item = scx.get_item_by_resolved_name(connection)?;
                match item.connection()? {
                    Connection::Kafka(connection) => {
                        connection.clone().into_inline_connection(scx.catalog)
                    }
                    _ => sql_bail!(
                        "{} is not a kafka connection",
                        scx.catalog.resolve_full_name(item.name())
                    ),
                }
            };

            let extracted_options: KafkaSinkConfigOptionExtracted = options.clone().try_into()?;

            if extracted_options.legacy_ids == Some(true) {
                sql_bail!("LEGACY IDs option is not supported");
            }

            // Create an admin client to verify we can reach the cluster.
            let client: AdminClient<_> = connection
                .create_with_context(
                    storage_configuration,
                    MzClientContext::default(),
                    &BTreeMap::new(),
                    InTask::No,
                )
                .await
                .map_err(|e| {
                    KafkaSinkPurificationError::AdminClientError(Arc::new(e))
                })?;

            // Fetch metadata as a connectivity check, bounded by the
            // configured metadata timeout.
            let metadata = client
                .inner()
                .fetch_metadata(
                    None,
                    storage_configuration
                        .parameters
                        .kafka_timeout_config
                        .fetch_metadata_timeout,
                )
                .map_err(|e| {
                    KafkaSinkPurificationError::AdminClientError(Arc::new(
                        ContextCreationError::KafkaError(e),
                    ))
                })?;

            // A cluster with no brokers cannot accept writes.
            if metadata.brokers().len() == 0 {
                Err(KafkaSinkPurificationError::ZeroBrokers)?;
            }
        }
        CreateSinkConnection::Iceberg {
            connection,
            aws_connection,
            ..
        } => {
            let scx = StatementContext::new(None, &catalog);
            // Resolve and inline the Iceberg catalog connection.
            let connection = {
                let item = scx.get_item_by_resolved_name(connection)?;
                match item.connection()? {
                    Connection::IcebergCatalog(connection) => {
                        connection.clone().into_inline_connection(scx.catalog)
                    }
                    _ => sql_bail!(
                        "{} is not an iceberg connection",
                        scx.catalog.resolve_full_name(item.name())
                    ),
                }
            };

            let aws_conn_id = aws_connection.item_id();

            // Resolve the accompanying AWS connection.
            let aws_connection = {
                let item = scx.get_item_by_resolved_name(aws_connection)?;
                match item.connection()? {
                    Connection::Aws(aws_connection) => aws_connection.clone(),
                    _ => sql_bail!(
                        "{} is not an aws connection",
                        scx.catalog.resolve_full_name(item.name())
                    ),
                }
            };

            // For S3 Tables catalogs, optionally (behind a dyncfg flag)
            // require the catalog's region to match the environment's region
            // when running on AWS.
            if let Some(s3tables) = connection.s3tables_catalog() {
                let enable_region_check =
                    ENABLE_S3_TABLES_REGION_CHECK.get(scx.catalog.system_vars().dyncfgs());
                if enable_region_check {
                    let env_id = &catalog.config().environment_id;
                    if matches!(env_id.cloud_provider(), CloudProvider::Aws) {
                        let env_region = env_id.cloud_provider_region();
                        // Unset region falls back to "us-east-1" — presumably
                        // mirroring the AWS SDK default; confirm if changed.
                        let s3_tables_region = s3tables
                            .aws_connection
                            .connection
                            .region
                            .clone()
                            .unwrap_or_else(|| "us-east-1".to_string());
                        if s3_tables_region != env_region {
                            Err(IcebergSinkPurificationError::S3TablesRegionMismatch {
                                s3_tables_region,
                                environment_region: env_region.to_string(),
                            })?;
                        }
                    }
                }
            }

            // Connect to the Iceberg catalog purely as a validation step.
            let _catalog = connection
                .connect(storage_configuration, InTask::No)
                .await
                .map_err(|e| IcebergSinkPurificationError::CatalogError(Arc::new(e)))?;

            // Load the AWS SDK config purely as a validation step.
            let _sdk_config = aws_connection
                .load_sdk_config(
                    &storage_configuration.connection_context,
                    aws_conn_id.clone(),
                    InTask::No,
                )
                .await
                .map_err(|e| IcebergSinkPurificationError::AwsSdkContextError(Arc::new(e)))?;
        }
    }

    // Collect every CSR connection referenced by the sink's format(s) so each
    // is validated exactly once.
    let mut csr_connection_ids = BTreeSet::new();
    for_each_format(format, |_, fmt| match fmt {
        Format::Avro(AvroSchema::InlineSchema { .. })
        | Format::Bytes
        | Format::Csv { .. }
        | Format::Json { .. }
        | Format::Protobuf(ProtobufSchema::InlineSchema { .. })
        | Format::Regex(..)
        | Format::Text => (),
        Format::Avro(AvroSchema::Csr {
            csr_connection: CsrConnectionAvro { connection, .. },
        })
        | Format::Protobuf(ProtobufSchema::Csr {
            csr_connection: CsrConnectionProtobuf { connection, .. },
        }) => {
            csr_connection_ids.insert(*connection.connection.item_id());
        }
    });

    let scx = StatementContext::new(None, &catalog);
    for csr_connection_id in csr_connection_ids {
        let connection = {
            let item = scx.get_item(&csr_connection_id);
            match item.connection()? {
                Connection::Csr(connection) => connection.clone().into_inline_connection(&catalog),
                _ => Err(CsrPurificationError::NotCsrConnection(
                    scx.catalog.resolve_full_name(item.name()),
                ))?,
            }
        };

        let client = connection
            .connect(storage_configuration, InTask::No)
            .await
            .map_err(|e| CsrPurificationError::ClientError(Arc::new(e)))?;

        // Listing subjects serves as a liveness/permissions check on the
        // schema registry.
        client
            .list_subjects()
            .await
            .map_err(|e| CsrPurificationError::ListSubjectsError(Arc::new(e)))?;
    }

    // Finally, enrich the Avro format options with DOC ON entries derived
    // from catalog comments.
    purify_create_sink_avro_doc_on_options(&catalog, *from.item_id(), format)?;

    Ok(PurifiedStatement::PurifiedCreateSink(create_sink_stmt))
}
669
670fn for_each_format<'a, F>(format: &'a mut Option<FormatSpecifier<Aug>>, mut f: F)
677where
678 F: FnMut(DocOnSchema, &'a mut Format<Aug>),
679{
680 match format {
681 None => (),
682 Some(FormatSpecifier::Bare(fmt)) => f(DocOnSchema::All, fmt),
683 Some(FormatSpecifier::KeyValue { key, value }) => {
684 f(DocOnSchema::KeyOnly, key);
685 f(DocOnSchema::ValueOnly, value);
686 }
687 }
688}
689
/// Policy governing whether a `CREATE SOURCE` statement may (or must) name
/// external references (subsources) directly, as derived from the
/// `enable_create_table_from_source` / `force_source_table_syntax` system
/// vars (see `purify_create_source`).
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub(crate) enum SourceReferencePolicy {
    // References on the source are rejected; tables must be used instead.
    NotAllowed,
    // References may be specified but are not required.
    Optional,
    // References must be specified on the source statement.
    Required,
}
704
705async fn purify_create_source(
706 catalog: impl SessionCatalog,
707 now: u64,
708 mut create_source_stmt: CreateSourceStatement<Aug>,
709 storage_configuration: &StorageConfiguration,
710) -> Result<PurifiedStatement, PlanError> {
711 let CreateSourceStatement {
712 name: source_name,
713 col_names,
714 key_constraint,
715 connection: source_connection,
716 format,
717 envelope,
718 include_metadata,
719 external_references,
720 progress_subsource,
721 with_options,
722 ..
723 } = &mut create_source_stmt;
724
725 let uses_old_syntax = !col_names.is_empty()
726 || key_constraint.is_some()
727 || format.is_some()
728 || envelope.is_some()
729 || !include_metadata.is_empty()
730 || external_references.is_some()
731 || progress_subsource.is_some();
732
733 if let Some(DeferredItemName::Named(_)) = progress_subsource {
734 sql_bail!("Cannot manually ID qualify progress subsource")
735 }
736
737 let mut requested_subsource_map = BTreeMap::new();
738
739 let progress_desc = match &source_connection {
740 CreateSourceConnection::Kafka { .. } => {
741 &mz_storage_types::sources::kafka::KAFKA_PROGRESS_DESC
742 }
743 CreateSourceConnection::Postgres { .. } => {
744 &mz_storage_types::sources::postgres::PG_PROGRESS_DESC
745 }
746 CreateSourceConnection::SqlServer { .. } => {
747 &mz_storage_types::sources::sql_server::SQL_SERVER_PROGRESS_DESC
748 }
749 CreateSourceConnection::MySql { .. } => {
750 &mz_storage_types::sources::mysql::MYSQL_PROGRESS_DESC
751 }
752 CreateSourceConnection::LoadGenerator { .. } => {
753 &mz_storage_types::sources::load_generator::LOAD_GEN_PROGRESS_DESC
754 }
755 };
756 let scx = StatementContext::new(None, &catalog);
757
758 let reference_policy = if scx.catalog.system_vars().enable_create_table_from_source()
762 && scx.catalog.system_vars().force_source_table_syntax()
763 {
764 SourceReferencePolicy::NotAllowed
765 } else if scx.catalog.system_vars().enable_create_table_from_source() {
766 SourceReferencePolicy::Optional
767 } else {
768 SourceReferencePolicy::Required
769 };
770
771 let mut format_options = SourceFormatOptions::Default;
772
773 let retrieved_source_references: RetrievedSourceReferences;
774
775 match source_connection {
776 CreateSourceConnection::Kafka {
777 connection,
778 options: base_with_options,
779 ..
780 } => {
781 if let Some(external_references) = external_references {
782 Err(KafkaSourcePurificationError::ReferencedSubsources(
783 external_references.clone(),
784 ))?;
785 }
786
787 let connection = {
788 let item = scx.get_item_by_resolved_name(connection)?;
789 match item.connection()? {
791 Connection::Kafka(connection) => {
792 connection.clone().into_inline_connection(&catalog)
793 }
794 _ => Err(KafkaSourcePurificationError::NotKafkaConnection(
795 scx.catalog.resolve_full_name(item.name()),
796 ))?,
797 }
798 };
799
800 let extracted_options: KafkaSourceConfigOptionExtracted =
801 base_with_options.clone().try_into()?;
802
803 let topic = extracted_options
804 .topic
805 .ok_or(KafkaSourcePurificationError::ConnectionMissingTopic)?;
806
807 let consumer = connection
808 .create_with_context(
809 storage_configuration,
810 MzClientContext::default(),
811 &BTreeMap::new(),
812 InTask::No,
813 )
814 .await
815 .map_err(|e| {
816 KafkaSourcePurificationError::KafkaConsumerError(
818 e.display_with_causes().to_string(),
819 )
820 })?;
821 let consumer = Arc::new(consumer);
822
823 match (
824 extracted_options.start_offset,
825 extracted_options.start_timestamp,
826 ) {
827 (None, None) => {
828 kafka_util::ensure_topic_exists(
830 Arc::clone(&consumer),
831 &topic,
832 storage_configuration
833 .parameters
834 .kafka_timeout_config
835 .fetch_metadata_timeout,
836 )
837 .await?;
838 }
839 (Some(_), Some(_)) => {
840 sql_bail!("cannot specify START TIMESTAMP and START OFFSET at same time")
841 }
842 (Some(start_offsets), None) => {
843 kafka_util::validate_start_offsets(
845 Arc::clone(&consumer),
846 &topic,
847 start_offsets,
848 storage_configuration
849 .parameters
850 .kafka_timeout_config
851 .fetch_metadata_timeout,
852 )
853 .await?;
854 }
855 (None, Some(time_offset)) => {
856 let start_offsets = kafka_util::lookup_start_offsets(
858 Arc::clone(&consumer),
859 &topic,
860 time_offset,
861 now,
862 storage_configuration
863 .parameters
864 .kafka_timeout_config
865 .fetch_metadata_timeout,
866 )
867 .await?;
868
869 base_with_options.retain(|val| {
870 !matches!(val.name, KafkaSourceConfigOptionName::StartTimestamp)
871 });
872 base_with_options.push(KafkaSourceConfigOption {
873 name: KafkaSourceConfigOptionName::StartOffset,
874 value: Some(WithOptionValue::Sequence(
875 start_offsets
876 .iter()
877 .map(|offset| {
878 WithOptionValue::Value(Value::Number(offset.to_string()))
879 })
880 .collect(),
881 )),
882 });
883 }
884 }
885
886 let reference_client = SourceReferenceClient::Kafka { topic: &topic };
887 retrieved_source_references = reference_client.get_source_references().await?;
888
889 format_options = SourceFormatOptions::Kafka { topic };
890 }
891 CreateSourceConnection::Postgres {
892 connection,
893 options,
894 } => {
895 let connection_item = scx.get_item_by_resolved_name(connection)?;
896 let connection = match connection_item.connection().map_err(PlanError::from)? {
897 Connection::Postgres(connection) => {
898 connection.clone().into_inline_connection(&catalog)
899 }
900 _ => Err(PgSourcePurificationError::NotPgConnection(
901 scx.catalog.resolve_full_name(connection_item.name()),
902 ))?,
903 };
904 let crate::plan::statement::PgConfigOptionExtracted {
905 publication,
906 text_columns,
907 exclude_columns,
908 details,
909 ..
910 } = options.clone().try_into()?;
911 let publication =
912 publication.ok_or(PgSourcePurificationError::ConnectionMissingPublication)?;
913
914 if details.is_some() {
915 Err(PgSourcePurificationError::UserSpecifiedDetails)?;
916 }
917
918 let client = connection
919 .validate(connection_item.id(), storage_configuration)
920 .await?;
921
922 let reference_client = SourceReferenceClient::Postgres {
923 client: &client,
924 publication: &publication,
925 database: &connection.database,
926 };
927 retrieved_source_references = reference_client.get_source_references().await?;
928
929 let postgres::PurifiedSourceExports {
930 source_exports: subsources,
931 normalized_text_columns,
932 } = postgres::purify_source_exports(
933 &client,
934 &retrieved_source_references,
935 external_references,
936 text_columns,
937 exclude_columns,
938 source_name,
939 &reference_policy,
940 )
941 .await?;
942
943 if let Some(text_cols_option) = options
944 .iter_mut()
945 .find(|option| option.name == PgConfigOptionName::TextColumns)
946 {
947 text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
948 }
949
950 requested_subsource_map.extend(subsources);
951
952 let timeline_id = mz_postgres_util::get_timeline_id(&client).await?;
955
956 options.retain(|PgConfigOption { name, .. }| name != &PgConfigOptionName::Details);
958 let details = PostgresSourcePublicationDetails {
959 slot: format!(
960 "materialize_{}",
961 Uuid::new_v4().to_string().replace('-', "")
962 ),
963 timeline_id: Some(timeline_id),
964 database: connection.database,
965 };
966 options.push(PgConfigOption {
967 name: PgConfigOptionName::Details,
968 value: Some(WithOptionValue::Value(Value::String(hex::encode(
969 details.into_proto().encode_to_vec(),
970 )))),
971 })
972 }
973 CreateSourceConnection::SqlServer {
974 connection,
975 options,
976 } => {
977 scx.require_feature_flag(&ENABLE_SQL_SERVER_SOURCE)?;
978
979 let connection_item = scx.get_item_by_resolved_name(connection)?;
982 let connection = match connection_item.connection()? {
983 Connection::SqlServer(connection) => {
984 connection.clone().into_inline_connection(&catalog)
985 }
986 _ => Err(SqlServerSourcePurificationError::NotSqlServerConnection(
987 scx.catalog.resolve_full_name(connection_item.name()),
988 ))?,
989 };
990 let crate::plan::statement::ddl::SqlServerConfigOptionExtracted {
991 details,
992 text_columns,
993 exclude_columns,
994 seen: _,
995 } = options.clone().try_into()?;
996
997 if details.is_some() {
998 Err(SqlServerSourcePurificationError::UserSpecifiedDetails)?;
999 }
1000
1001 let mut client = connection
1002 .validate(connection_item.id(), storage_configuration)
1003 .await?;
1004
1005 let database: Arc<str> = connection.database.into();
1006 let reference_client = SourceReferenceClient::SqlServer {
1007 client: &mut client,
1008 database: Arc::clone(&database),
1009 };
1010 retrieved_source_references = reference_client.get_source_references().await?;
1011 tracing::debug!(?retrieved_source_references, "got source references");
1012
1013 let timeout = mz_storage_types::sources::sql_server::MAX_LSN_WAIT
1014 .get(storage_configuration.config_set());
1015
1016 let purified_source_exports = sql_server::purify_source_exports(
1017 &*database,
1018 &mut client,
1019 &retrieved_source_references,
1020 external_references,
1021 &text_columns,
1022 &exclude_columns,
1023 source_name,
1024 timeout,
1025 &reference_policy,
1026 )
1027 .await?;
1028
1029 let sql_server::PurifiedSourceExports {
1030 source_exports,
1031 normalized_text_columns,
1032 normalized_excl_columns,
1033 } = purified_source_exports;
1034
1035 requested_subsource_map.extend(source_exports);
1037
1038 let restore_history_id =
1042 mz_sql_server_util::inspect::get_latest_restore_history_id(&mut client).await?;
1043 let details = SqlServerSourceExtras { restore_history_id };
1044
1045 options.retain(|SqlServerConfigOption { name, .. }| {
1046 name != &SqlServerConfigOptionName::Details
1047 });
1048 options.push(SqlServerConfigOption {
1049 name: SqlServerConfigOptionName::Details,
1050 value: Some(WithOptionValue::Value(Value::String(hex::encode(
1051 details.into_proto().encode_to_vec(),
1052 )))),
1053 });
1054
1055 if let Some(text_cols_option) = options
1057 .iter_mut()
1058 .find(|option| option.name == SqlServerConfigOptionName::TextColumns)
1059 {
1060 text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
1061 }
1062 if let Some(excl_cols_option) = options
1063 .iter_mut()
1064 .find(|option| option.name == SqlServerConfigOptionName::ExcludeColumns)
1065 {
1066 excl_cols_option.value = Some(WithOptionValue::Sequence(normalized_excl_columns));
1067 }
1068 }
1069 CreateSourceConnection::MySql {
1070 connection,
1071 options,
1072 } => {
1073 let connection_item = scx.get_item_by_resolved_name(connection)?;
1074 let connection = match connection_item.connection()? {
1075 Connection::MySql(connection) => {
1076 connection.clone().into_inline_connection(&catalog)
1077 }
1078 _ => Err(MySqlSourcePurificationError::NotMySqlConnection(
1079 scx.catalog.resolve_full_name(connection_item.name()),
1080 ))?,
1081 };
1082 let crate::plan::statement::ddl::MySqlConfigOptionExtracted {
1083 details,
1084 text_columns,
1085 exclude_columns,
1086 seen: _,
1087 } = options.clone().try_into()?;
1088
1089 if details.is_some() {
1090 Err(MySqlSourcePurificationError::UserSpecifiedDetails)?;
1091 }
1092
1093 let mut conn = connection
1094 .validate(connection_item.id(), storage_configuration)
1095 .await
1096 .map_err(MySqlSourcePurificationError::InvalidConnection)?;
1097
1098 let initial_gtid_set =
1102 mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?;
1103
1104 let reference_client = SourceReferenceClient::MySql {
1105 conn: &mut conn,
1106 include_system_schemas: mysql::references_system_schemas(external_references),
1107 };
1108 retrieved_source_references = reference_client.get_source_references().await?;
1109
1110 let mysql::PurifiedSourceExports {
1111 source_exports: subsources,
1112 normalized_text_columns,
1113 normalized_exclude_columns,
1114 } = mysql::purify_source_exports(
1115 &mut conn,
1116 &retrieved_source_references,
1117 external_references,
1118 text_columns,
1119 exclude_columns,
1120 source_name,
1121 initial_gtid_set.clone(),
1122 &reference_policy,
1123 )
1124 .await?;
1125 requested_subsource_map.extend(subsources);
1126
1127 let details = MySqlSourceDetails {};
1130 options
1132 .retain(|MySqlConfigOption { name, .. }| name != &MySqlConfigOptionName::Details);
1133 options.push(MySqlConfigOption {
1134 name: MySqlConfigOptionName::Details,
1135 value: Some(WithOptionValue::Value(Value::String(hex::encode(
1136 details.into_proto().encode_to_vec(),
1137 )))),
1138 });
1139
1140 if let Some(text_cols_option) = options
1141 .iter_mut()
1142 .find(|option| option.name == MySqlConfigOptionName::TextColumns)
1143 {
1144 text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
1145 }
1146 if let Some(exclude_cols_option) = options
1147 .iter_mut()
1148 .find(|option| option.name == MySqlConfigOptionName::ExcludeColumns)
1149 {
1150 exclude_cols_option.value =
1151 Some(WithOptionValue::Sequence(normalized_exclude_columns));
1152 }
1153 }
1154 CreateSourceConnection::LoadGenerator { generator, options } => {
1155 let load_generator =
1156 load_generator_ast_to_generator(&scx, generator, options, include_metadata)?;
1157
1158 let reference_client = SourceReferenceClient::LoadGenerator {
1159 generator: &load_generator,
1160 };
1161 retrieved_source_references = reference_client.get_source_references().await?;
1162 let multi_output_sources =
1166 retrieved_source_references
1167 .all_references()
1168 .iter()
1169 .any(|r| {
1170 matches!(
1171 r.load_generator_output(),
1172 Some(output) if output != &LoadGeneratorOutput::Default
1173 )
1174 });
1175
1176 match external_references {
1177 Some(requested)
1178 if matches!(reference_policy, SourceReferencePolicy::NotAllowed) =>
1179 {
1180 Err(PlanError::UseTablesForSources(requested.to_string()))?
1181 }
1182 Some(requested) if !multi_output_sources => match requested {
1183 ExternalReferences::SubsetTables(_) => {
1184 Err(LoadGeneratorSourcePurificationError::ForTables)?
1185 }
1186 ExternalReferences::SubsetSchemas(_) => {
1187 Err(LoadGeneratorSourcePurificationError::ForSchemas)?
1188 }
1189 ExternalReferences::All => {
1190 Err(LoadGeneratorSourcePurificationError::ForAllTables)?
1191 }
1192 },
1193 Some(requested) => {
1194 let requested_exports = retrieved_source_references
1195 .requested_source_exports(Some(requested), source_name)?;
1196 for export in requested_exports {
1197 requested_subsource_map.insert(
1198 export.name,
1199 PurifiedSourceExport {
1200 external_reference: export.external_reference,
1201 details: PurifiedExportDetails::LoadGenerator {
1202 table: export
1203 .meta
1204 .load_generator_desc()
1205 .ok_or_else(|| {
1206 internal_err!(
1207 "expected load generator source reference"
1208 )
1209 })?
1210 .clone(),
1211 output: export
1212 .meta
1213 .load_generator_output()
1214 .ok_or_else(|| {
1215 internal_err!(
1216 "expected load generator source reference"
1217 )
1218 })?
1219 .clone(),
1220 },
1221 },
1222 );
1223 }
1224 }
1225 None => {
1226 if multi_output_sources
1227 && matches!(reference_policy, SourceReferencePolicy::Required)
1228 {
1229 Err(LoadGeneratorSourcePurificationError::MultiOutputRequiresForAllTables)?
1230 }
1231 }
1232 }
1233
1234 if let LoadGenerator::Clock = generator {
1235 if !options
1236 .iter()
1237 .any(|p| p.name == LoadGeneratorOptionName::AsOf)
1238 {
1239 let now = catalog.now();
1240 options.push(LoadGeneratorOption {
1241 name: LoadGeneratorOptionName::AsOf,
1242 value: Some(WithOptionValue::Value(Value::Number(now.to_string()))),
1243 });
1244 }
1245 }
1246 }
1247 }
1248
1249 *external_references = None;
1253
1254 let create_progress_subsource_stmt = if uses_old_syntax {
1256 let name = match progress_subsource {
1258 Some(name) => match name {
1259 DeferredItemName::Deferred(name) => name.clone(),
1260 DeferredItemName::Named(_) => {
1262 sql_bail!("progress subsource name cannot be a resolved name")
1263 }
1264 },
1265 None => {
1266 let (item, prefix) = source_name
1267 .0
1268 .split_last()
1269 .ok_or_else(|| sql_err!("source name must have at least one component"))?;
1270 let item_name =
1271 Ident::try_generate_name(item.to_string(), "_progress", |candidate| {
1272 let mut suggested_name = prefix.to_vec();
1273 suggested_name.push(candidate.clone());
1274
1275 let partial =
1276 normalize::unresolved_item_name(UnresolvedItemName(suggested_name))?;
1277 let qualified = scx.allocate_qualified_name(partial)?;
1278 let item_exists = scx.catalog.get_item_by_name(&qualified).is_some();
1279 let type_exists = scx.catalog.get_type_by_name(&qualified).is_some();
1280 Ok::<_, PlanError>(!item_exists && !type_exists)
1281 })?;
1282
1283 let mut full_name = prefix.to_vec();
1284 full_name.push(item_name);
1285 let full_name = normalize::unresolved_item_name(UnresolvedItemName(full_name))?;
1286 let qualified_name = scx.allocate_qualified_name(full_name)?;
1287 let full_name = scx.catalog.resolve_full_name(&qualified_name);
1288
1289 UnresolvedItemName::from(full_name.clone())
1290 }
1291 };
1292
1293 let (columns, constraints) = scx.relation_desc_into_table_defs(progress_desc)?;
1294
1295 let mut progress_with_options: Vec<_> = with_options
1297 .iter()
1298 .filter_map(|opt| match opt.name {
1299 CreateSourceOptionName::TimestampInterval => None,
1300 CreateSourceOptionName::RetainHistory => Some(CreateSubsourceOption {
1301 name: CreateSubsourceOptionName::RetainHistory,
1302 value: opt.value.clone(),
1303 }),
1304 })
1305 .collect();
1306 progress_with_options.push(CreateSubsourceOption {
1307 name: CreateSubsourceOptionName::Progress,
1308 value: Some(WithOptionValue::Value(Value::Boolean(true))),
1309 });
1310
1311 Some(CreateSubsourceStatement {
1312 name,
1313 columns,
1314 of_source: None,
1318 constraints,
1319 if_not_exists: false,
1320 with_options: progress_with_options,
1321 })
1322 } else {
1323 None
1324 };
1325
1326 purify_source_format(
1327 &catalog,
1328 format,
1329 &format_options,
1330 envelope,
1331 storage_configuration,
1332 )
1333 .await?;
1334
1335 Ok(PurifiedStatement::PurifiedCreateSource {
1336 create_progress_subsource_stmt,
1337 create_source_stmt,
1338 subsources: requested_subsource_map,
1339 available_source_references: retrieved_source_references.available_source_references(),
1340 })
1341}
1342
1343async fn purify_alter_source(
1346 catalog: impl SessionCatalog,
1347 stmt: AlterSourceStatement<Aug>,
1348 storage_configuration: &StorageConfiguration,
1349) -> Result<PurifiedStatement, PlanError> {
1350 let scx = StatementContext::new(None, &catalog);
1351 let AlterSourceStatement {
1352 source_name: unresolved_source_name,
1353 action,
1354 if_exists,
1355 } = stmt;
1356
1357 let item = match scx.resolve_item(RawItemName::Name(unresolved_source_name.clone())) {
1359 Ok(item) => item,
1360 Err(_) if if_exists => {
1361 return Ok(PurifiedStatement::PurifiedAlterSource {
1362 alter_source_stmt: AlterSourceStatement {
1363 source_name: unresolved_source_name,
1364 action,
1365 if_exists,
1366 },
1367 });
1368 }
1369 Err(e) => return Err(e),
1370 };
1371
1372 let desc = match item.source_desc()? {
1374 Some(desc) => desc.clone().into_inline_connection(scx.catalog),
1375 None => {
1376 sql_bail!("cannot ALTER this type of source")
1377 }
1378 };
1379
1380 let source_name = item.name();
1381
1382 let resolved_source_name = ResolvedItemName::Item {
1383 id: item.id(),
1384 qualifiers: item.name().qualifiers.clone(),
1385 full_name: scx.catalog.resolve_full_name(source_name),
1386 print_id: true,
1387 version: RelationVersionSelector::Latest,
1388 };
1389
1390 let partial_name = scx.catalog.minimal_qualification(source_name);
1391
1392 match action {
1393 AlterSourceAction::AddSubsources {
1394 external_references,
1395 options,
1396 } => {
1397 if scx.catalog.system_vars().enable_create_table_from_source()
1398 && scx.catalog.system_vars().force_source_table_syntax()
1399 {
1400 Err(PlanError::UseTablesForSources(
1401 "ALTER SOURCE .. ADD SUBSOURCES ..".to_string(),
1402 ))?;
1403 }
1404
1405 purify_alter_source_add_subsources(
1406 external_references,
1407 options,
1408 desc,
1409 partial_name,
1410 unresolved_source_name,
1411 resolved_source_name,
1412 storage_configuration,
1413 )
1414 .await
1415 }
1416 AlterSourceAction::RefreshReferences => {
1417 purify_alter_source_refresh_references(
1418 desc,
1419 resolved_source_name,
1420 storage_configuration,
1421 )
1422 .await
1423 }
1424 _ => Ok(PurifiedStatement::PurifiedAlterSource {
1425 alter_source_stmt: AlterSourceStatement {
1426 source_name: unresolved_source_name,
1427 action,
1428 if_exists,
1429 },
1430 }),
1431 }
1432}
1433
/// Purifies `ALTER SOURCE .. ADD SUBSOURCES ..` by contacting the upstream
/// system to validate and normalize the requested external references.
///
/// Only Postgres, MySQL, and SQL Server sources support adding subsources.
/// The `TEXT COLUMNS` / `EXCLUDE COLUMNS` options are rewritten in place with
/// their normalized forms, and the purified subsource map is returned inside a
/// `PurifiedAlterSourceAddSubsources` statement.
///
/// Errors if the source's connection type does not support this action, or if
/// the `DETAILS` option was set explicitly (it is always derived internally).
async fn purify_alter_source_add_subsources(
    external_references: Vec<ExternalReferenceExport>,
    mut options: Vec<AlterSourceAddSubsourceOption<Aug>>,
    desc: SourceDesc,
    partial_source_name: PartialItemName,
    unresolved_source_name: UnresolvedItemName,
    resolved_source_name: ResolvedItemName,
    storage_configuration: &StorageConfiguration,
) -> Result<PurifiedStatement, PlanError> {
    // Reject unsupported connection kinds up front; the connection id is
    // needed below for connection validation (Postgres).
    let connection_id = match &desc.connection {
        GenericSourceConnection::Postgres(c) => c.connection_id,
        GenericSourceConnection::MySql(c) => c.connection_id,
        GenericSourceConnection::SqlServer(c) => c.connection_id,
        _ => sql_bail!(
            "source {} does not support ALTER SOURCE.",
            partial_source_name
        ),
    };

    let crate::plan::statement::ddl::AlterSourceAddSubsourceOptionExtracted {
        text_columns,
        exclude_columns,
        details,
        seen: _,
    } = options.clone().try_into()?;
    // DETAILS is always computed during purification; a user-supplied value
    // would be overwritten (or wrong), so forbid it.
    if details.is_some() {
        sql_bail!("DETAILS option cannot be explicitly set");
    }

    let mut requested_subsource_map = BTreeMap::new();

    match desc.connection {
        GenericSourceConnection::Postgres(pg_source_connection) => {
            let pg_connection = &pg_source_connection.connection;

            let client = pg_connection
                .validate(connection_id, storage_configuration)
                .await?;

            // Fetch the set of tables available in the source's publication.
            let reference_client = SourceReferenceClient::Postgres {
                client: &client,
                publication: &pg_source_connection.publication,
                database: &pg_connection.database,
            };
            let retrieved_source_references = reference_client.get_source_references().await?;

            let postgres::PurifiedSourceExports {
                source_exports: subsources,
                normalized_text_columns,
            } = postgres::purify_source_exports(
                &client,
                &retrieved_source_references,
                &Some(ExternalReferences::SubsetTables(external_references)),
                text_columns,
                exclude_columns,
                &unresolved_source_name,
                &SourceReferencePolicy::Required,
            )
            .await?;

            // Rewrite TEXT COLUMNS with the fully-qualified, validated names.
            if let Some(text_cols_option) = options
                .iter_mut()
                .find(|option| option.name == AlterSourceAddSubsourceOptionName::TextColumns)
            {
                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
            }

            requested_subsource_map.extend(subsources);
        }
        GenericSourceConnection::MySql(mysql_source_connection) => {
            let mysql_connection = &mysql_source_connection.connection;
            let config = mysql_connection
                .config(
                    &storage_configuration.connection_context.secrets_reader,
                    storage_configuration,
                    InTask::No,
                )
                .await?;

            let mut conn = config
                .connect(
                    "mysql purification",
                    &storage_configuration.connection_context.ssh_tunnel_manager,
                )
                .await?;

            // Snapshot the current GTID set so the new subsources know where
            // replication must begin.
            let initial_gtid_set =
                mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?;

            let requested_references = Some(ExternalReferences::SubsetTables(external_references));

            let reference_client = SourceReferenceClient::MySql {
                conn: &mut conn,
                include_system_schemas: mysql::references_system_schemas(&requested_references),
            };
            let retrieved_source_references = reference_client.get_source_references().await?;

            let mysql::PurifiedSourceExports {
                source_exports: subsources,
                normalized_text_columns,
                normalized_exclude_columns,
            } = mysql::purify_source_exports(
                &mut conn,
                &retrieved_source_references,
                &requested_references,
                text_columns,
                exclude_columns,
                &unresolved_source_name,
                initial_gtid_set,
                &SourceReferencePolicy::Required,
            )
            .await?;
            requested_subsource_map.extend(subsources);

            // Rewrite TEXT COLUMNS / EXCLUDE COLUMNS with normalized names.
            if let Some(text_cols_option) = options
                .iter_mut()
                .find(|option| option.name == AlterSourceAddSubsourceOptionName::TextColumns)
            {
                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
            }
            if let Some(exclude_cols_option) = options
                .iter_mut()
                .find(|option| option.name == AlterSourceAddSubsourceOptionName::ExcludeColumns)
            {
                exclude_cols_option.value =
                    Some(WithOptionValue::Sequence(normalized_exclude_columns));
            }
        }
        GenericSourceConnection::SqlServer(sql_server_source) => {
            let sql_server_connection = &sql_server_source.connection;
            let config = sql_server_connection
                .resolve_config(
                    &storage_configuration.connection_context.secrets_reader,
                    storage_configuration,
                    InTask::No,
                )
                .await?;
            let mut client = mz_sql_server_util::Client::connect(config).await?;

            let database = sql_server_connection.database.clone().into();
            let source_references = SourceReferenceClient::SqlServer {
                client: &mut client,
                database: Arc::clone(&database),
            }
            .get_source_references()
            .await?;
            let requested_references = Some(ExternalReferences::SubsetTables(external_references));

            // Bound how long we wait for an LSN to become available upstream.
            let timeout = mz_storage_types::sources::sql_server::MAX_LSN_WAIT
                .get(storage_configuration.config_set());

            let result = sql_server::purify_source_exports(
                &*database,
                &mut client,
                &source_references,
                &requested_references,
                &text_columns,
                &exclude_columns,
                &unresolved_source_name,
                timeout,
                &SourceReferencePolicy::Required,
            )
            .await;
            let sql_server::PurifiedSourceExports {
                source_exports,
                normalized_text_columns,
                normalized_excl_columns,
            } = result?;

            requested_subsource_map.extend(source_exports);

            // Rewrite TEXT COLUMNS / EXCLUDE COLUMNS with normalized names.
            if let Some(text_cols_option) = options
                .iter_mut()
                .find(|option| option.name == AlterSourceAddSubsourceOptionName::TextColumns)
            {
                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
            }
            if let Some(exclude_cols_option) = options
                .iter_mut()
                .find(|option| option.name == AlterSourceAddSubsourceOptionName::ExcludeColumns)
            {
                exclude_cols_option.value =
                    Some(WithOptionValue::Sequence(normalized_excl_columns));
            }
        }
        // Unreachable in practice: the connection_id match above already
        // rejected every other connection kind.
        _ => bail_internal!("source does not support ALTER SOURCE...ADD SUBSOURCE"),
    };

    Ok(PurifiedStatement::PurifiedAlterSourceAddSubsources {
        source_name: resolved_source_name,
        options,
        subsources: requested_subsource_map,
    })
}
1641
/// Purifies `ALTER SOURCE .. REFRESH REFERENCES` by connecting to the
/// upstream system and fetching the currently available external references
/// for the source's connection type.
///
/// Returns a `PurifiedAlterSourceRefreshReferences` statement carrying the
/// freshly retrieved reference set; no catalog state is mutated here.
async fn purify_alter_source_refresh_references(
    desc: SourceDesc,
    resolved_source_name: ResolvedItemName,
    storage_configuration: &StorageConfiguration,
) -> Result<PurifiedStatement, PlanError> {
    let retrieved_source_references = match desc.connection {
        GenericSourceConnection::Postgres(pg_source_connection) => {
            let pg_connection = &pg_source_connection.connection;

            let config = pg_connection
                .config(
                    &storage_configuration.connection_context.secrets_reader,
                    storage_configuration,
                    InTask::No,
                )
                .await?;

            let client = config
                .connect(
                    "postgres_purification",
                    &storage_configuration.connection_context.ssh_tunnel_manager,
                )
                .await?;
            // List the tables visible through the source's publication.
            let reference_client = SourceReferenceClient::Postgres {
                client: &client,
                publication: &pg_source_connection.publication,
                database: &pg_connection.database,
            };
            reference_client.get_source_references().await?
        }
        GenericSourceConnection::MySql(mysql_source_connection) => {
            let mysql_connection = &mysql_source_connection.connection;
            let config = mysql_connection
                .config(
                    &storage_configuration.connection_context.secrets_reader,
                    storage_configuration,
                    InTask::No,
                )
                .await?;

            let mut conn = config
                .connect(
                    "mysql purification",
                    &storage_configuration.connection_context.ssh_tunnel_manager,
                )
                .await?;

            // System schemas are never offered when refreshing references.
            let reference_client = SourceReferenceClient::MySql {
                conn: &mut conn,
                include_system_schemas: false,
            };
            reference_client.get_source_references().await?
        }
        GenericSourceConnection::SqlServer(sql_server_source) => {
            let sql_server_connection = &sql_server_source.connection;
            let config = sql_server_connection
                .resolve_config(
                    &storage_configuration.connection_context.secrets_reader,
                    storage_configuration,
                    InTask::No,
                )
                .await?;
            let mut client = mz_sql_server_util::Client::connect(config).await?;

            let source_references = SourceReferenceClient::SqlServer {
                client: &mut client,
                database: sql_server_connection.database.clone().into(),
            }
            .get_source_references()
            .await?;
            source_references
        }
        // Load generators and Kafka need no upstream connection: their
        // reference sets are derived locally.
        GenericSourceConnection::LoadGenerator(load_gen_connection) => {
            let reference_client = SourceReferenceClient::LoadGenerator {
                generator: &load_gen_connection.load_generator,
            };
            reference_client.get_source_references().await?
        }
        GenericSourceConnection::Kafka(kafka_conn) => {
            let reference_client = SourceReferenceClient::Kafka {
                topic: &kafka_conn.topic,
            };
            reference_client.get_source_references().await?
        }
    };
    Ok(PurifiedStatement::PurifiedAlterSourceRefreshReferences {
        source_name: resolved_source_name,
        available_source_references: retrieved_source_references.available_source_references(),
    })
}
1735
1736async fn purify_create_table_from_source(
1737 catalog: impl SessionCatalog,
1738 mut stmt: CreateTableFromSourceStatement<Aug>,
1739 storage_configuration: &StorageConfiguration,
1740) -> Result<PurifiedStatement, PlanError> {
1741 let scx = StatementContext::new(None, &catalog);
1742 let CreateTableFromSourceStatement {
1743 name: _,
1744 columns,
1745 constraints,
1746 source: source_name,
1747 if_not_exists: _,
1748 external_reference,
1749 format,
1750 envelope,
1751 include_metadata: _,
1752 with_options,
1753 } = &mut stmt;
1754
1755 if matches!(columns, TableFromSourceColumns::Defined(_)) {
1757 sql_bail!("CREATE TABLE .. FROM SOURCE column definitions cannot be specified directly");
1758 }
1759 if !constraints.is_empty() {
1760 sql_bail!(
1761 "CREATE TABLE .. FROM SOURCE constraint definitions cannot be specified directly"
1762 );
1763 }
1764
1765 let item = match scx.get_item_by_resolved_name(source_name) {
1767 Ok(item) => item,
1768 Err(e) => return Err(e),
1769 };
1770
1771 let desc = match item.source_desc()? {
1773 Some(desc) => desc.clone().into_inline_connection(scx.catalog),
1774 None => {
1775 sql_bail!("cannot ALTER this type of source")
1776 }
1777 };
1778 let unresolved_source_name: UnresolvedItemName = source_name.full_item_name().clone().into();
1779
1780 let crate::plan::statement::ddl::TableFromSourceOptionExtracted {
1781 text_columns,
1782 exclude_columns,
1783 retain_history: _,
1784 details,
1785 partition_by: _,
1786 seen: _,
1787 } = with_options.clone().try_into()?;
1788 if details.is_some() {
1789 sql_bail!("DETAILS option cannot be explicitly set");
1790 }
1791
1792 let qualified_text_columns = text_columns
1796 .iter()
1797 .map(|col| {
1798 UnresolvedItemName(
1799 external_reference
1800 .as_ref()
1801 .map(|er| er.0.iter().chain_one(col).map(|i| i.clone()).collect())
1802 .unwrap_or_else(|| vec![col.clone()]),
1803 )
1804 })
1805 .collect_vec();
1806 let qualified_exclude_columns = exclude_columns
1807 .iter()
1808 .map(|col| {
1809 UnresolvedItemName(
1810 external_reference
1811 .as_ref()
1812 .map(|er| er.0.iter().chain_one(col).map(|i| i.clone()).collect())
1813 .unwrap_or_else(|| vec![col.clone()]),
1814 )
1815 })
1816 .collect_vec();
1817
1818 let mut format_options = SourceFormatOptions::Default;
1820
1821 let retrieved_source_references: RetrievedSourceReferences;
1822
1823 let requested_references = external_reference.as_ref().map(|ref_name| {
1824 ExternalReferences::SubsetTables(vec![ExternalReferenceExport {
1825 reference: ref_name.clone(),
1826 alias: None,
1827 }])
1828 });
1829
1830 let purified_export = match desc.connection {
1833 GenericSourceConnection::Postgres(pg_source_connection) => {
1834 let pg_connection = &pg_source_connection.connection;
1836
1837 let client = pg_connection
1838 .validate(pg_source_connection.connection_id, storage_configuration)
1839 .await?;
1840
1841 let reference_client = SourceReferenceClient::Postgres {
1842 client: &client,
1843 publication: &pg_source_connection.publication,
1844 database: &pg_connection.database,
1845 };
1846 retrieved_source_references = reference_client.get_source_references().await?;
1847
1848 let postgres::PurifiedSourceExports {
1849 source_exports,
1850 normalized_text_columns: _,
1854 } = postgres::purify_source_exports(
1855 &client,
1856 &retrieved_source_references,
1857 &requested_references,
1858 qualified_text_columns,
1859 qualified_exclude_columns,
1860 &unresolved_source_name,
1861 &SourceReferencePolicy::Required,
1862 )
1863 .await?;
1864 let (_, purified_export) = source_exports.into_element();
1866 purified_export
1867 }
1868 GenericSourceConnection::MySql(mysql_source_connection) => {
1869 let mysql_connection = &mysql_source_connection.connection;
1870 let config = mysql_connection
1871 .config(
1872 &storage_configuration.connection_context.secrets_reader,
1873 storage_configuration,
1874 InTask::No,
1875 )
1876 .await?;
1877
1878 let mut conn = config
1879 .connect(
1880 "mysql purification",
1881 &storage_configuration.connection_context.ssh_tunnel_manager,
1882 )
1883 .await?;
1884
1885 let initial_gtid_set =
1888 mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?;
1889
1890 let reference_client = SourceReferenceClient::MySql {
1891 conn: &mut conn,
1892 include_system_schemas: mysql::references_system_schemas(&requested_references),
1893 };
1894 retrieved_source_references = reference_client.get_source_references().await?;
1895
1896 let mysql::PurifiedSourceExports {
1897 source_exports,
1898 normalized_text_columns: _,
1902 normalized_exclude_columns: _,
1903 } = mysql::purify_source_exports(
1904 &mut conn,
1905 &retrieved_source_references,
1906 &requested_references,
1907 qualified_text_columns,
1908 qualified_exclude_columns,
1909 &unresolved_source_name,
1910 initial_gtid_set,
1911 &SourceReferencePolicy::Required,
1912 )
1913 .await?;
1914 let (_, purified_export) = source_exports.into_element();
1916 purified_export
1917 }
1918 GenericSourceConnection::SqlServer(sql_server_source) => {
1919 let connection = sql_server_source.connection;
1920 let config = connection
1921 .resolve_config(
1922 &storage_configuration.connection_context.secrets_reader,
1923 storage_configuration,
1924 InTask::No,
1925 )
1926 .await?;
1927 let mut client = mz_sql_server_util::Client::connect(config).await?;
1928
1929 let database: Arc<str> = connection.database.into();
1930 let reference_client = SourceReferenceClient::SqlServer {
1931 client: &mut client,
1932 database: Arc::clone(&database),
1933 };
1934 retrieved_source_references = reference_client.get_source_references().await?;
1935 tracing::debug!(?retrieved_source_references, "got source references");
1936
1937 let timeout = mz_storage_types::sources::sql_server::MAX_LSN_WAIT
1938 .get(storage_configuration.config_set());
1939
1940 let purified_source_exports = sql_server::purify_source_exports(
1941 &*database,
1942 &mut client,
1943 &retrieved_source_references,
1944 &requested_references,
1945 &qualified_text_columns,
1946 &qualified_exclude_columns,
1947 &unresolved_source_name,
1948 timeout,
1949 &SourceReferencePolicy::Required,
1950 )
1951 .await?;
1952
1953 let (_, purified_export) = purified_source_exports.source_exports.into_element();
1955 purified_export
1956 }
1957 GenericSourceConnection::LoadGenerator(load_gen_connection) => {
1958 let reference_client = SourceReferenceClient::LoadGenerator {
1959 generator: &load_gen_connection.load_generator,
1960 };
1961 retrieved_source_references = reference_client.get_source_references().await?;
1962
1963 let requested_exports = retrieved_source_references
1964 .requested_source_exports(requested_references.as_ref(), &unresolved_source_name)?;
1965 let export = requested_exports.into_element();
1967 PurifiedSourceExport {
1968 external_reference: export.external_reference,
1969 details: PurifiedExportDetails::LoadGenerator {
1970 table: export
1971 .meta
1972 .load_generator_desc()
1973 .ok_or_else(|| internal_err!("expected load generator source reference"))?
1974 .clone(),
1975 output: export
1976 .meta
1977 .load_generator_output()
1978 .ok_or_else(|| internal_err!("expected load generator source reference"))?
1979 .clone(),
1980 },
1981 }
1982 }
1983 GenericSourceConnection::Kafka(kafka_conn) => {
1984 let reference_client = SourceReferenceClient::Kafka {
1985 topic: &kafka_conn.topic,
1986 };
1987 retrieved_source_references = reference_client.get_source_references().await?;
1988 let requested_exports = retrieved_source_references
1989 .requested_source_exports(requested_references.as_ref(), &unresolved_source_name)?;
1990 let export = requested_exports.into_element();
1992
1993 format_options = SourceFormatOptions::Kafka {
1994 topic: kafka_conn.topic.clone(),
1995 };
1996 PurifiedSourceExport {
1997 external_reference: export.external_reference,
1998 details: PurifiedExportDetails::Kafka {},
1999 }
2000 }
2001 };
2002
2003 purify_source_format(
2004 &catalog,
2005 format,
2006 &format_options,
2007 envelope,
2008 storage_configuration,
2009 )
2010 .await?;
2011
2012 *external_reference = Some(purified_export.external_reference.clone());
2015
2016 match &purified_export.details {
2018 PurifiedExportDetails::Postgres { .. } => {
2019 let mut unsupported_cols = vec![];
2020 let postgres::PostgresExportStatementValues {
2021 columns: gen_columns,
2022 constraints: gen_constraints,
2023 text_columns: gen_text_columns,
2024 exclude_columns: gen_exclude_columns,
2025 details: gen_details,
2026 external_reference: _,
2027 } = postgres::generate_source_export_statement_values(
2028 &scx,
2029 purified_export,
2030 &mut unsupported_cols,
2031 )?;
2032 if !unsupported_cols.is_empty() {
2033 unsupported_cols.sort();
2034 Err(PgSourcePurificationError::UnrecognizedTypes {
2035 cols: unsupported_cols,
2036 })?;
2037 }
2038
2039 if let Some(text_cols_option) = with_options
2040 .iter_mut()
2041 .find(|option| option.name == TableFromSourceOptionName::TextColumns)
2042 {
2043 match gen_text_columns {
2044 Some(gen_text_columns) => {
2045 text_cols_option.value = Some(WithOptionValue::Sequence(gen_text_columns))
2046 }
2047 None => soft_panic_or_log!(
2048 "text_columns should be Some if text_cols_option is present"
2049 ),
2050 }
2051 }
2052 if let Some(exclude_cols_option) = with_options
2053 .iter_mut()
2054 .find(|option| option.name == TableFromSourceOptionName::ExcludeColumns)
2055 {
2056 match gen_exclude_columns {
2057 Some(gen_exclude_columns) => {
2058 exclude_cols_option.value =
2059 Some(WithOptionValue::Sequence(gen_exclude_columns))
2060 }
2061 None => soft_panic_or_log!(
2062 "exclude_columns should be Some if exclude_cols_option is present"
2063 ),
2064 }
2065 }
2066 match columns {
2067 TableFromSourceColumns::Defined(_) => {
2068 bail_internal!(
2071 "column definitions cannot be explicitly set for this source type"
2072 )
2073 }
2074 TableFromSourceColumns::NotSpecified => {
2075 *columns = TableFromSourceColumns::Defined(gen_columns);
2076 *constraints = gen_constraints;
2077 }
2078 TableFromSourceColumns::Named(_) => {
2079 sql_bail!("columns cannot be named for Postgres sources")
2080 }
2081 }
2082 with_options.push(TableFromSourceOption {
2083 name: TableFromSourceOptionName::Details,
2084 value: Some(WithOptionValue::Value(Value::String(hex::encode(
2085 gen_details.into_proto().encode_to_vec(),
2086 )))),
2087 })
2088 }
2089 PurifiedExportDetails::MySql { .. } => {
2090 let mysql::MySqlExportStatementValues {
2091 columns: gen_columns,
2092 constraints: gen_constraints,
2093 text_columns: gen_text_columns,
2094 exclude_columns: gen_exclude_columns,
2095 details: gen_details,
2096 external_reference: _,
2097 } = mysql::generate_source_export_statement_values(&scx, purified_export)?;
2098
2099 if let Some(text_cols_option) = with_options
2100 .iter_mut()
2101 .find(|option| option.name == TableFromSourceOptionName::TextColumns)
2102 {
2103 match gen_text_columns {
2104 Some(gen_text_columns) => {
2105 text_cols_option.value = Some(WithOptionValue::Sequence(gen_text_columns))
2106 }
2107 None => soft_panic_or_log!(
2108 "text_columns should be Some if text_cols_option is present"
2109 ),
2110 }
2111 }
2112 if let Some(exclude_cols_option) = with_options
2113 .iter_mut()
2114 .find(|option| option.name == TableFromSourceOptionName::ExcludeColumns)
2115 {
2116 match gen_exclude_columns {
2117 Some(gen_exclude_columns) => {
2118 exclude_cols_option.value =
2119 Some(WithOptionValue::Sequence(gen_exclude_columns))
2120 }
2121 None => soft_panic_or_log!(
2122 "exclude_columns should be Some if exclude_cols_option is present"
2123 ),
2124 }
2125 }
2126 match columns {
2127 TableFromSourceColumns::Defined(_) => {
2128 bail_internal!(
2131 "column definitions cannot be explicitly set for this source type"
2132 )
2133 }
2134 TableFromSourceColumns::NotSpecified => {
2135 *columns = TableFromSourceColumns::Defined(gen_columns);
2136 *constraints = gen_constraints;
2137 }
2138 TableFromSourceColumns::Named(_) => {
2139 sql_bail!("columns cannot be named for MySQL sources")
2140 }
2141 }
2142 with_options.push(TableFromSourceOption {
2143 name: TableFromSourceOptionName::Details,
2144 value: Some(WithOptionValue::Value(Value::String(hex::encode(
2145 gen_details.into_proto().encode_to_vec(),
2146 )))),
2147 })
2148 }
2149 PurifiedExportDetails::SqlServer { .. } => {
2150 let sql_server::SqlServerExportStatementValues {
2151 columns: gen_columns,
2152 constraints: gen_constraints,
2153 text_columns: gen_text_columns,
2154 excl_columns: gen_excl_columns,
2155 details: gen_details,
2156 external_reference: _,
2157 } = sql_server::generate_source_export_statement_values(&scx, purified_export)?;
2158
2159 if let Some(text_cols_option) = with_options
2160 .iter_mut()
2161 .find(|opt| opt.name == TableFromSourceOptionName::TextColumns)
2162 {
2163 match gen_text_columns {
2164 Some(gen_text_columns) => {
2165 text_cols_option.value = Some(WithOptionValue::Sequence(gen_text_columns))
2166 }
2167 None => soft_panic_or_log!(
2168 "text_columns should be Some if text_cols_option is present"
2169 ),
2170 }
2171 }
2172 if let Some(exclude_cols_option) = with_options
2173 .iter_mut()
2174 .find(|opt| opt.name == TableFromSourceOptionName::ExcludeColumns)
2175 {
2176 match gen_excl_columns {
2177 Some(gen_excl_columns) => {
2178 exclude_cols_option.value =
2179 Some(WithOptionValue::Sequence(gen_excl_columns))
2180 }
2181 None => soft_panic_or_log!(
2182 "excl_columns should be Some if excl_cols_option is present"
2183 ),
2184 }
2185 }
2186
2187 match columns {
2188 TableFromSourceColumns::NotSpecified => {
2189 *columns = TableFromSourceColumns::Defined(gen_columns);
2190 *constraints = gen_constraints;
2191 }
2192 TableFromSourceColumns::Named(_) => {
2193 sql_bail!("columns cannot be named for SQL Server sources")
2194 }
2195 TableFromSourceColumns::Defined(_) => {
2196 bail_internal!(
2199 "column definitions cannot be explicitly set for this source type"
2200 )
2201 }
2202 }
2203
2204 with_options.push(TableFromSourceOption {
2205 name: TableFromSourceOptionName::Details,
2206 value: Some(WithOptionValue::Value(Value::String(hex::encode(
2207 gen_details.into_proto().encode_to_vec(),
2208 )))),
2209 })
2210 }
2211 PurifiedExportDetails::LoadGenerator { .. } => {
2212 let (desc, output) = match purified_export.details {
2213 PurifiedExportDetails::LoadGenerator { table, output } => (table, output),
2214 _ => bail_internal!("purified export details must be load generator"),
2215 };
2216 if let Some(desc) = desc {
2221 let (gen_columns, gen_constraints) = scx.relation_desc_into_table_defs(&desc)?;
2222 match columns {
2223 TableFromSourceColumns::Defined(_) => bail_internal!(
2226 "column definitions cannot be explicitly set for this source type"
2227 ),
2228 TableFromSourceColumns::NotSpecified => {
2229 *columns = TableFromSourceColumns::Defined(gen_columns);
2230 *constraints = gen_constraints;
2231 }
2232 TableFromSourceColumns::Named(_) => {
2233 sql_bail!("columns cannot be named for multi-output load generator sources")
2234 }
2235 }
2236 }
2237 let details = SourceExportStatementDetails::LoadGenerator { output };
2238 with_options.push(TableFromSourceOption {
2239 name: TableFromSourceOptionName::Details,
2240 value: Some(WithOptionValue::Value(Value::String(hex::encode(
2241 details.into_proto().encode_to_vec(),
2242 )))),
2243 })
2244 }
2245 PurifiedExportDetails::Kafka {} => {
2246 let details = SourceExportStatementDetails::Kafka {};
2250 with_options.push(TableFromSourceOption {
2251 name: TableFromSourceOptionName::Details,
2252 value: Some(WithOptionValue::Value(Value::String(hex::encode(
2253 details.into_proto().encode_to_vec(),
2254 )))),
2255 })
2256 }
2257 };
2258
2259 Ok(PurifiedStatement::PurifiedCreateTableFromSource { stmt })
2263}
2264
/// Context needed to purify a source's `FORMAT` clause: either nothing
/// (`Default`), or the Kafka topic the source reads from, which determines
/// the schema registry subject names (`<topic>-key` / `<topic>-value`).
enum SourceFormatOptions {
    Default,
    Kafka { topic: String },
}
2269
2270async fn purify_source_format(
2271 catalog: &dyn SessionCatalog,
2272 format: &mut Option<FormatSpecifier<Aug>>,
2273 options: &SourceFormatOptions,
2274 envelope: &Option<SourceEnvelope>,
2275 storage_configuration: &StorageConfiguration,
2276) -> Result<(), PlanError> {
2277 if matches!(format, Some(FormatSpecifier::KeyValue { .. }))
2278 && !matches!(options, SourceFormatOptions::Kafka { .. })
2279 {
2280 sql_bail!("Kafka sources are the only source type that can provide KEY/VALUE formats")
2281 }
2282
2283 match format.as_mut() {
2284 None => {}
2285 Some(FormatSpecifier::Bare(format)) => {
2286 purify_source_format_single(catalog, format, options, envelope, storage_configuration)
2287 .await?;
2288 }
2289
2290 Some(FormatSpecifier::KeyValue { key, value: val }) => {
2291 purify_source_format_single(catalog, key, options, envelope, storage_configuration)
2292 .await?;
2293 purify_source_format_single(catalog, val, options, envelope, storage_configuration)
2294 .await?;
2295 }
2296 }
2297 Ok(())
2298}
2299
2300async fn purify_source_format_single(
2301 catalog: &dyn SessionCatalog,
2302 format: &mut Format<Aug>,
2303 options: &SourceFormatOptions,
2304 envelope: &Option<SourceEnvelope>,
2305 storage_configuration: &StorageConfiguration,
2306) -> Result<(), PlanError> {
2307 match format {
2308 Format::Avro(schema) => match schema {
2309 AvroSchema::Csr { csr_connection } => {
2310 purify_csr_connection_avro(
2311 catalog,
2312 options,
2313 csr_connection,
2314 envelope,
2315 storage_configuration,
2316 )
2317 .await?
2318 }
2319 AvroSchema::InlineSchema { .. } => {}
2320 },
2321 Format::Protobuf(schema) => match schema {
2322 ProtobufSchema::Csr { csr_connection } => {
2323 purify_csr_connection_proto(
2324 catalog,
2325 options,
2326 csr_connection,
2327 envelope,
2328 storage_configuration,
2329 )
2330 .await?;
2331 }
2332 ProtobufSchema::InlineSchema { .. } => {}
2333 },
2334 Format::Bytes
2335 | Format::Regex(_)
2336 | Format::Json { .. }
2337 | Format::Text
2338 | Format::Csv { .. } => (),
2339 }
2340 Ok(())
2341}
2342
2343pub fn generate_subsource_statements(
2344 scx: &StatementContext,
2345 source_name: ResolvedItemName,
2346 subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
2347) -> Result<Vec<CreateSubsourceStatement<Aug>>, PlanError> {
2348 if subsources.is_empty() {
2350 return Ok(vec![]);
2351 }
2352 let (_, purified_export) = subsources
2353 .iter()
2354 .next()
2355 .ok_or_else(|| internal_err!("expected at least one subsource"))?;
2356
2357 let statements = match &purified_export.details {
2358 PurifiedExportDetails::Postgres { .. } => {
2359 crate::pure::postgres::generate_create_subsource_statements(
2360 scx,
2361 source_name,
2362 subsources,
2363 )?
2364 }
2365 PurifiedExportDetails::MySql { .. } => {
2366 crate::pure::mysql::generate_create_subsource_statements(scx, source_name, subsources)?
2367 }
2368 PurifiedExportDetails::SqlServer { .. } => {
2369 crate::pure::sql_server::generate_create_subsource_statements(
2370 scx,
2371 source_name,
2372 subsources,
2373 )?
2374 }
2375 PurifiedExportDetails::LoadGenerator { .. } => {
2376 let mut subsource_stmts = Vec::with_capacity(subsources.len());
2377 for (subsource_name, purified_export) in subsources {
2378 let (desc, output) = match purified_export.details {
2379 PurifiedExportDetails::LoadGenerator { table, output } => (table, output),
2380 _ => {
2381 bail_internal!("purified export details must be load generator")
2382 }
2383 };
2384 let desc = desc.ok_or_else(|| {
2385 internal_err!(
2386 "subsources cannot be generated for single-output load generators"
2387 )
2388 })?;
2389
2390 let (columns, table_constraints) = scx.relation_desc_into_table_defs(&desc)?;
2391 let details = SourceExportStatementDetails::LoadGenerator { output };
2392 let subsource = CreateSubsourceStatement {
2394 name: subsource_name,
2395 columns,
2396 of_source: Some(source_name.clone()),
2397 constraints: table_constraints,
2402 if_not_exists: false,
2403 with_options: vec![
2404 CreateSubsourceOption {
2405 name: CreateSubsourceOptionName::ExternalReference,
2406 value: Some(WithOptionValue::UnresolvedItemName(
2407 purified_export.external_reference,
2408 )),
2409 },
2410 CreateSubsourceOption {
2411 name: CreateSubsourceOptionName::Details,
2412 value: Some(WithOptionValue::Value(Value::String(hex::encode(
2413 details.into_proto().encode_to_vec(),
2414 )))),
2415 },
2416 ],
2417 };
2418 subsource_stmts.push(subsource);
2419 }
2420
2421 subsource_stmts
2422 }
2423 PurifiedExportDetails::Kafka { .. } => {
2424 if !subsources.is_empty() {
2428 bail_internal!("Kafka sources do not produce data-bearing subsources");
2429 }
2430 vec![]
2431 }
2432 };
2433 Ok(statements)
2434}
2435
2436async fn purify_csr_connection_proto(
2437 catalog: &dyn SessionCatalog,
2438 options: &SourceFormatOptions,
2439 csr_connection: &mut CsrConnectionProtobuf<Aug>,
2440 envelope: &Option<SourceEnvelope>,
2441 storage_configuration: &StorageConfiguration,
2442) -> Result<(), PlanError> {
2443 let SourceFormatOptions::Kafka { topic } = options else {
2444 sql_bail!("Confluent Schema Registry is only supported with Kafka sources")
2445 };
2446
2447 let CsrConnectionProtobuf {
2448 seed,
2449 connection: CsrConnection {
2450 connection,
2451 options: _,
2452 },
2453 } = csr_connection;
2454 match seed {
2455 None => {
2456 let scx = StatementContext::new(None, &*catalog);
2457
2458 let ccsr_connection = match scx.get_item_by_resolved_name(connection)?.connection()? {
2459 Connection::Csr(connection) => connection.clone().into_inline_connection(catalog),
2460 _ => sql_bail!("{} is not a schema registry connection", connection),
2461 };
2462
2463 let ccsr_client = ccsr_connection
2464 .connect(storage_configuration, InTask::No)
2465 .await
2466 .map_err(|e| CsrPurificationError::ClientError(Arc::new(e)))?;
2467
2468 let value = compile_proto(&format!("{}-value", topic), &ccsr_client).await?;
2469 let key = compile_proto(&format!("{}-key", topic), &ccsr_client)
2470 .await
2471 .ok();
2472
2473 if matches!(envelope, Some(SourceEnvelope::Debezium)) && key.is_none() {
2474 sql_bail!("Key schema is required for ENVELOPE DEBEZIUM");
2475 }
2476
2477 *seed = Some(CsrSeedProtobuf { value, key });
2478 }
2479 Some(_) => (),
2480 }
2481
2482 Ok(())
2483}
2484
2485async fn purify_csr_connection_avro(
2486 catalog: &dyn SessionCatalog,
2487 options: &SourceFormatOptions,
2488 csr_connection: &mut CsrConnectionAvro<Aug>,
2489 envelope: &Option<SourceEnvelope>,
2490 storage_configuration: &StorageConfiguration,
2491) -> Result<(), PlanError> {
2492 let SourceFormatOptions::Kafka { topic } = options else {
2493 sql_bail!("Confluent Schema Registry is only supported with Kafka sources")
2494 };
2495
2496 let CsrConnectionAvro {
2497 connection: CsrConnection { connection, .. },
2498 seed,
2499 key_strategy,
2500 value_strategy,
2501 } = csr_connection;
2502 if seed.is_none() {
2503 let scx = StatementContext::new(None, &*catalog);
2504 let csr_connection = match scx.get_item_by_resolved_name(connection)?.connection()? {
2505 Connection::Csr(connection) => connection.clone().into_inline_connection(catalog),
2506 _ => sql_bail!("{} is not a schema registry connection", connection),
2507 };
2508 let ccsr_client = csr_connection
2509 .connect(storage_configuration, InTask::No)
2510 .await
2511 .map_err(|e| CsrPurificationError::ClientError(Arc::new(e)))?;
2512
2513 let Schema {
2514 key_schema,
2515 value_schema,
2516 key_reference_schemas,
2517 value_reference_schemas,
2518 } = get_remote_csr_schema(
2519 &ccsr_client,
2520 key_strategy.clone().unwrap_or_default(),
2521 value_strategy.clone().unwrap_or_default(),
2522 topic,
2523 )
2524 .await?;
2525 if matches!(envelope, Some(SourceEnvelope::Debezium)) && key_schema.is_none() {
2526 sql_bail!("Key schema is required for ENVELOPE DEBEZIUM");
2527 }
2528
2529 *seed = Some(CsrSeedAvro {
2530 key_schema,
2531 value_schema,
2532 key_reference_schemas,
2533 value_reference_schemas,
2534 })
2535 }
2536
2537 Ok(())
2538}
2539
/// Key and value schemas fetched from a Confluent Schema Registry for a
/// Kafka topic.
#[derive(Debug)]
pub struct Schema {
    /// The key schema, if one exists for the `<topic>-key` subject.
    pub key_schema: Option<String>,
    /// The value schema; always present (its absence is an error).
    pub value_schema: String,
    /// Raw text of schemas referenced by the key schema.
    pub key_reference_schemas: Vec<String>,
    /// Raw text of schemas referenced by the value schema.
    pub value_reference_schemas: Vec<String>,
}
2549
/// A single raw schema together with the raw schemas it references.
struct SchemaWithReferences {
    /// Raw text of the primary schema.
    schema: String,
    /// Raw text of each schema the primary schema depends on.
    references: Vec<String>,
}
2557
2558async fn get_schema_with_strategy(
2559 client: &Client,
2560 strategy: ReaderSchemaSelectionStrategy,
2561 subject: &str,
2562) -> Result<Option<SchemaWithReferences>, PlanError> {
2563 match strategy {
2564 ReaderSchemaSelectionStrategy::Latest => {
2565 match client.get_subject_and_references(subject).await {
2567 Ok((primary, dependencies)) => Ok(Some(SchemaWithReferences {
2568 schema: primary.schema.raw,
2569 references: dependencies.into_iter().map(|s| s.schema.raw).collect(),
2570 })),
2571 Err(GetBySubjectError::SubjectNotFound)
2572 | Err(GetBySubjectError::VersionNotFound(_)) => Ok(None),
2573 Err(e) => Err(PlanError::FetchingCsrSchemaFailed {
2574 schema_lookup: format!("subject {}", subject.quoted()),
2575 cause: Arc::new(e),
2576 }),
2577 }
2578 }
2579 ReaderSchemaSelectionStrategy::Inline(raw) => Ok(Some(SchemaWithReferences {
2583 schema: raw,
2584 references: vec![],
2585 })),
2586 ReaderSchemaSelectionStrategy::ById(id) => {
2587 match client.get_subject_and_references_by_id(id).await {
2588 Ok((primary, dependencies)) => Ok(Some(SchemaWithReferences {
2589 schema: primary.schema.raw,
2590 references: dependencies.into_iter().map(|s| s.schema.raw).collect(),
2591 })),
2592 Err(GetBySubjectError::SubjectNotFound)
2593 | Err(GetBySubjectError::VersionNotFound(_)) => Ok(None),
2594 Err(e) => Err(PlanError::FetchingCsrSchemaFailed {
2595 schema_lookup: format!("subject {}", subject.quoted()),
2596 cause: Arc::new(e),
2597 }),
2598 }
2599 }
2600 }
2601}
2602
2603async fn get_remote_csr_schema(
2604 ccsr_client: &mz_ccsr::Client,
2605 key_strategy: ReaderSchemaSelectionStrategy,
2606 value_strategy: ReaderSchemaSelectionStrategy,
2607 topic: &str,
2608) -> Result<Schema, PlanError> {
2609 let value_schema_name = format!("{}-value", topic);
2610 let value_result =
2611 get_schema_with_strategy(ccsr_client, value_strategy, &value_schema_name).await?;
2612 let value_result = value_result.ok_or_else(|| anyhow!("No value schema found"))?;
2613
2614 let key_subject = format!("{}-key", topic);
2615 let key_result = get_schema_with_strategy(ccsr_client, key_strategy, &key_subject).await?;
2616 Ok(Schema {
2617 key_schema: key_result.as_ref().map(|r| r.schema.clone()),
2618 value_schema: value_result.schema,
2619 key_reference_schemas: key_result.map(|r| r.references).unwrap_or_default(),
2620 value_reference_schemas: value_result.references,
2621 })
2622}
2623
2624async fn compile_proto(
2626 subject_name: &String,
2627 ccsr_client: &Client,
2628) -> Result<CsrSeedProtobufSchema, PlanError> {
2629 let (primary_subject, dependency_subjects) = ccsr_client
2630 .get_subject_and_references(subject_name)
2631 .await
2632 .map_err(|e| PlanError::FetchingCsrSchemaFailed {
2633 schema_lookup: format!("subject {}", subject_name.quoted()),
2634 cause: Arc::new(e),
2635 })?;
2636
2637 let mut source_tree = VirtualSourceTree::new();
2639
2640 source_tree.as_mut().map_well_known_types();
2644
2645 for subject in iter::once(&primary_subject).chain(dependency_subjects.iter()) {
2646 source_tree.as_mut().add_file(
2647 Path::new(&subject.name),
2648 subject.schema.raw.as_bytes().to_vec(),
2649 );
2650 }
2651 let mut db = SourceTreeDescriptorDatabase::new(source_tree.as_mut());
2652 let fds = db
2653 .as_mut()
2654 .build_file_descriptor_set(&[Path::new(&primary_subject.name)])
2655 .map_err(|cause| PlanError::InvalidProtobufSchema { cause })?;
2656
2657 let primary_fd = fds.file(0);
2659 let message_name = match primary_fd.message_type_size() {
2660 1 => String::from_utf8_lossy(primary_fd.message_type(0).name()).into_owned(),
2661 0 => bail_unsupported!(29603, "Protobuf schemas with no messages"),
2662 _ => bail_unsupported!(29603, "Protobuf schemas with multiple messages"),
2663 };
2664
2665 let bytes = &fds
2667 .serialize()
2668 .map_err(|cause| PlanError::InvalidProtobufSchema { cause })?;
2669 let mut schema = String::new();
2670 strconv::format_bytes(&mut schema, bytes);
2671
2672 Ok(CsrSeedProtobufSchema {
2673 schema,
2674 message_name,
2675 })
2676}
2677
/// Name of the `mz_now` function, whose calls purification replaces with a
/// concrete timestamp.
const MZ_NOW_NAME: &str = "mz_now";
/// Schema in which `mz_now` is resolved.
const MZ_NOW_SCHEMA: &str = "mz_catalog";
2680
/// Purifies the `WITH` options of a `CREATE MATERIALIZED VIEW` statement.
///
/// Specifically:
/// - rewrites `REFRESH AT CREATION` to `REFRESH AT mz_now()`;
/// - defaults a missing `ALIGNED TO` of `REFRESH EVERY` to `mz_now()`;
/// - replaces `mz_now()` calls inside refresh option expressions with `mz_now`
///   (the chosen timestamp), cast to `mz_timestamp`;
/// - adds an explicit `REFRESH ON COMMIT` when no refresh option is present;
/// - updates `resolved_ids`: adds `mz_timestamp` if a cast to it was
///   introduced, and removes `mz_now` if no temporal call remains anywhere in
///   the statement.
pub fn purify_create_materialized_view_options(
    catalog: impl SessionCatalog,
    mz_now: Option<Timestamp>,
    cmvs: &mut CreateMaterializedViewStatement<Aug>,
    resolved_ids: &mut ResolvedIds,
) {
    // Resolve `mz_catalog.mz_now` and build a zero-argument call expression
    // for it, used when desugaring the refresh options below.
    let (mz_now_id, mz_now_expr) = {
        let item = catalog
            .resolve_function(&PartialItemName {
                database: None,
                schema: Some(MZ_NOW_SCHEMA.to_string()),
                item: MZ_NOW_NAME.to_string(),
            })
            .expect("we should be able to resolve mz_now");
        (
            item.id(),
            Expr::Function(Function {
                name: ResolvedItemName::Item {
                    id: item.id(),
                    qualifiers: item.name().qualifiers.clone(),
                    full_name: catalog.resolve_full_name(item.name()),
                    print_id: false,
                    version: RelationVersionSelector::Latest,
                },
                args: FunctionArgs::Args {
                    args: Vec::new(),
                    order_by: Vec::new(),
                },
                filter: None,
                over: None,
                distinct: false,
            }),
        )
    };
    // Resolve the `mz_timestamp` type, the target of any casts we introduce.
    let (mz_timestamp_id, mz_timestamp_type) = {
        let item = catalog.get_system_type("mz_timestamp");
        let full_name = catalog.resolve_full_name(item.name());
        (
            item.id(),
            ResolvedDataType::Named {
                id: item.id(),
                qualifiers: item.name().qualifiers.clone(),
                full_name,
                modifiers: vec![],
                print_id: true,
            },
        )
    };

    // Tracks whether any `mz_now()` was replaced by a cast to `mz_timestamp`.
    let mut introduced_mz_timestamp = false;

    for option in cmvs.with_options.iter_mut() {
        // Desugar `REFRESH AT CREATION` into `REFRESH AT mz_now()`.
        if matches!(
            option.value,
            Some(WithOptionValue::Refresh(RefreshOptionValue::AtCreation))
        ) {
            option.value = Some(WithOptionValue::Refresh(RefreshOptionValue::At(
                RefreshAtOptionValue {
                    time: mz_now_expr.clone(),
                },
            )));
        }

        // Default a missing `ALIGNED TO` of `REFRESH EVERY` to `mz_now()`.
        if let Some(WithOptionValue::Refresh(RefreshOptionValue::Every(
            RefreshEveryOptionValue { aligned_to, .. },
        ))) = &mut option.value
        {
            if aligned_to.is_none() {
                *aligned_to = Some(mz_now_expr.clone());
            }
        }

        // Substitute the chosen timestamp for `mz_now()` in refresh option
        // expressions — including the ones inserted just above.
        match &mut option.value {
            Some(WithOptionValue::Refresh(RefreshOptionValue::At(RefreshAtOptionValue {
                time,
            }))) => {
                let mut visitor = MzNowPurifierVisitor::new(mz_now, mz_timestamp_type.clone());
                visitor.visit_expr_mut(time);
                introduced_mz_timestamp |= visitor.introduced_mz_timestamp;
            }
            Some(WithOptionValue::Refresh(RefreshOptionValue::Every(
                RefreshEveryOptionValue {
                    interval: _,
                    aligned_to: Some(aligned_to),
                },
            ))) => {
                let mut visitor = MzNowPurifierVisitor::new(mz_now, mz_timestamp_type.clone());
                visitor.visit_expr_mut(aligned_to);
                introduced_mz_timestamp |= visitor.introduced_mz_timestamp;
            }
            _ => {}
        }
    }

    // If no refresh option was given, make the default explicit.
    if !cmvs.with_options.iter().any(|o| {
        matches!(
            o,
            MaterializedViewOption {
                value: Some(WithOptionValue::Refresh(..)),
                ..
            }
        )
    }) {
        cmvs.with_options.push(MaterializedViewOption {
            name: MaterializedViewOptionName::Refresh,
            value: Some(WithOptionValue::Refresh(RefreshOptionValue::OnCommit)),
        })
    }

    // Record the `mz_timestamp` dependency introduced by any casts above.
    if introduced_mz_timestamp {
        resolved_ids.add_item(mz_timestamp_id);
    }
    // If purification removed every `mz_now()` call and none exists in the
    // rest of the statement, drop the `mz_now` dependency as well.
    let mut visitor = ExprContainsTemporalVisitor::new();
    visitor.visit_create_materialized_view_statement(cmvs);
    if !visitor.contains_temporal {
        resolved_ids.remove_item(&mz_now_id);
    }
}
2818
2819pub fn materialized_view_option_contains_temporal(mvo: &MaterializedViewOption<Aug>) -> bool {
2822 match &mvo.value {
2823 Some(WithOptionValue::Refresh(RefreshOptionValue::At(RefreshAtOptionValue { time }))) => {
2824 let mut visitor = ExprContainsTemporalVisitor::new();
2825 visitor.visit_expr(time);
2826 visitor.contains_temporal
2827 }
2828 Some(WithOptionValue::Refresh(RefreshOptionValue::Every(RefreshEveryOptionValue {
2829 interval: _,
2830 aligned_to: Some(aligned_to),
2831 }))) => {
2832 let mut visitor = ExprContainsTemporalVisitor::new();
2833 visitor.visit_expr(aligned_to);
2834 visitor.contains_temporal
2835 }
2836 Some(WithOptionValue::Refresh(RefreshOptionValue::Every(RefreshEveryOptionValue {
2837 interval: _,
2838 aligned_to: None,
2839 }))) => {
2840 true
2843 }
2844 Some(WithOptionValue::Refresh(RefreshOptionValue::AtCreation)) => {
2845 true
2847 }
2848 _ => false,
2849 }
2850}
2851
/// AST visitor that records whether an expression tree contains a call that
/// resolves to `mz_now`.
struct ExprContainsTemporalVisitor {
    // Latched to true once any visited function call resolves to `mz_now`.
    pub contains_temporal: bool,
}
2856
2857impl ExprContainsTemporalVisitor {
2858 pub fn new() -> ExprContainsTemporalVisitor {
2859 ExprContainsTemporalVisitor {
2860 contains_temporal: false,
2861 }
2862 }
2863}
2864
2865impl Visit<'_, Aug> for ExprContainsTemporalVisitor {
2866 fn visit_function(&mut self, func: &Function<Aug>) {
2867 self.contains_temporal |= func.name.full_item_name().item == MZ_NOW_NAME;
2868 visit_function(self, func);
2869 }
2870}
2871
/// AST visitor that replaces `mz_now()` calls with a concrete timestamp cast
/// to `mz_timestamp`.
struct MzNowPurifierVisitor {
    // The chosen timestamp; must be Some if the visited expression actually
    // contains an `mz_now()` call, or the visit panics.
    pub mz_now: Option<Timestamp>,
    // Resolved `mz_timestamp` type used as the cast target.
    pub mz_timestamp_type: ResolvedDataType,
    // Set to true once at least one cast to `mz_timestamp` was introduced.
    pub introduced_mz_timestamp: bool,
}
2877
2878impl MzNowPurifierVisitor {
2879 pub fn new(
2880 mz_now: Option<Timestamp>,
2881 mz_timestamp_type: ResolvedDataType,
2882 ) -> MzNowPurifierVisitor {
2883 MzNowPurifierVisitor {
2884 mz_now,
2885 mz_timestamp_type,
2886 introduced_mz_timestamp: false,
2887 }
2888 }
2889}
2890
2891impl VisitMut<'_, Aug> for MzNowPurifierVisitor {
2892 fn visit_expr_mut(&mut self, expr: &'_ mut Expr<Aug>) {
2893 match expr {
2894 Expr::Function(Function {
2895 name:
2896 ResolvedItemName::Item {
2897 full_name: FullItemName { item, .. },
2898 ..
2899 },
2900 ..
2901 }) if item == &MZ_NOW_NAME.to_string() => {
2902 let mz_now = self.mz_now.expect(
2903 "we should have chosen a timestamp if the expression contains mz_now()",
2904 );
2905 *expr = Expr::Cast {
2908 expr: Box::new(Expr::Value(Value::Number(mz_now.to_string()))),
2909 data_type: self.mz_timestamp_type.clone(),
2910 };
2911 self.introduced_mz_timestamp = true;
2912 }
2913 _ => visit_expr_mut(self, expr),
2914 }
2915 }
2916}