use std::collections::{BTreeMap, BTreeSet};
use std::fmt;
use std::iter;
use std::path::Path;
use std::sync::Arc;

use anyhow::anyhow;
use itertools::Itertools;
use mz_adapter_types::dyncfgs::ENABLE_S3_TABLES_REGION_CHECK;
use mz_ccsr::{Client, GetBySubjectError};
use mz_cloud_provider::CloudProvider;
use mz_controller_types::ClusterId;
use mz_kafka_util::client::MzClientContext;
use mz_mysql_util::MySqlTableDesc;
use mz_ore::collections::CollectionExt;
use mz_ore::error::ErrorExt;
use mz_ore::future::InTask;
use mz_ore::iter::IteratorExt;
use mz_ore::soft_panic_or_log;
use mz_ore::str::StrExt;
use mz_postgres_util::desc::PostgresTableDesc;
use mz_proto::RustType;
use mz_repr::{CatalogItemId, RelationDesc, RelationVersionSelector, Timestamp, strconv};
use mz_sql_parser::ast::display::AstDisplay;
use mz_sql_parser::ast::visit::{Visit, visit_function};
use mz_sql_parser::ast::visit_mut::{VisitMut, visit_expr_mut};
use mz_sql_parser::ast::{
    AlterSourceAction, AlterSourceAddSubsourceOptionName, AlterSourceStatement, AvroDocOn,
    ColumnName, CreateMaterializedViewStatement, CreateSinkConnection, CreateSinkOptionName,
    CreateSinkStatement, CreateSourceOptionName, CreateSubsourceOption, CreateSubsourceOptionName,
    CreateTableFromSourceStatement, CsrConfigOption, CsrConfigOptionName, CsrConnection,
    CsrSeedAvro, CsrSeedProtobuf, CsrSeedProtobufSchema, DeferredItemName, DocOnIdentifier,
    DocOnSchema, Expr, Function, FunctionArgs, Ident, KafkaSourceConfigOption,
    KafkaSourceConfigOptionName, LoadGenerator, LoadGeneratorOption, LoadGeneratorOptionName,
    MaterializedViewOption, MaterializedViewOptionName, MySqlConfigOption, MySqlConfigOptionName,
    PgConfigOption, PgConfigOptionName, RawItemName, ReaderSchemaSelectionStrategy,
    RefreshAtOptionValue, RefreshEveryOptionValue, RefreshOptionValue, SourceEnvelope,
    SqlServerConfigOption, SqlServerConfigOptionName, Statement, TableFromSourceColumns,
    TableFromSourceOption, TableFromSourceOptionName, UnresolvedItemName,
};
use mz_sql_server_util::desc::SqlServerTableDesc;
use mz_storage_types::configuration::StorageConfiguration;
use mz_storage_types::connections::Connection;
use mz_storage_types::connections::inline::IntoInlineConnection;
use mz_storage_types::errors::ContextCreationError;
use mz_storage_types::sources::load_generator::LoadGeneratorOutput;
use mz_storage_types::sources::mysql::MySqlSourceDetails;
use mz_storage_types::sources::postgres::PostgresSourcePublicationDetails;
use mz_storage_types::sources::{
    GenericSourceConnection, SourceDesc, SourceExportStatementDetails, SqlServerSourceExtras,
};
use prost::Message;
use protobuf_native::MessageLite;
use protobuf_native::compiler::{SourceTreeDescriptorDatabase, VirtualSourceTree};
use rdkafka::admin::AdminClient;
use references::{RetrievedSourceReferences, SourceReferenceClient};
use uuid::Uuid;

use crate::ast::{
    AlterSourceAddSubsourceOption, AvroSchema, CreateSourceConnection, CreateSourceStatement,
    CreateSubsourceStatement, CsrConnectionAvro, CsrConnectionProtobuf, ExternalReferenceExport,
    ExternalReferences, Format, FormatSpecifier, ProtobufSchema, Value, WithOptionValue,
};
use crate::catalog::{CatalogItemType, SessionCatalog};
use crate::kafka_util::{KafkaSinkConfigOptionExtracted, KafkaSourceConfigOptionExtracted};
use crate::names::{
    Aug, FullItemName, PartialItemName, ResolvedColumnReference, ResolvedDataType, ResolvedIds,
    ResolvedItemName,
};
use crate::plan::error::PlanError;
use crate::plan::statement::ddl::load_generator_ast_to_generator;
use crate::plan::{SourceReferences, StatementContext};
use crate::pure::error::{IcebergSinkPurificationError, SqlServerSourcePurificationError};
use crate::session::vars::ENABLE_SQL_SERVER_SOURCE;
use crate::{kafka_util, normalize};

use self::error::{
    CsrPurificationError, KafkaSinkPurificationError, KafkaSourcePurificationError,
    LoadGeneratorSourcePurificationError, MySqlSourcePurificationError, PgSourcePurificationError,
};

pub(crate) mod error;
mod references;

pub mod mysql;
pub mod postgres;
pub mod sql_server;

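/// An export requested from an external system during purification, pairing
/// the upstream reference with the name of the Materialize object that will
/// ingest it.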
pub(crate) struct RequestedSourceExport<T> {
    external_reference: UnresolvedItemName,
    name: UnresolvedItemName,
    meta: T,
}

impl<T: fmt::Debug> fmt::Debug for RequestedSourceExport<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("RequestedSourceExport")
            .field("external_reference", &self.external_reference)
            .field("name", &self.name)
            .field("meta", &self.meta)
            .finish()
    }
}

impl<T> RequestedSourceExport<T> {
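    /// Replaces this export's metadata with `new_meta`, preserving its
    /// external reference and name.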
    fn change_meta<F>(self, new_meta: F) -> RequestedSourceExport<F> {
        RequestedSourceExport {
            external_reference: self.external_reference,
            name: self.name,
            meta: new_meta,
        }
    }
}

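/// Generates a name for a source export by replacing the item part of
/// `source_name` with `subsource_name`, so the export is created in the same
/// schema as its source.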
fn source_export_name_gen(
    source_name: &UnresolvedItemName,
    subsource_name: &str,
) -> Result<UnresolvedItemName, PlanError> {
    let mut partial = normalize::unresolved_item_name(source_name.clone())?;
    partial.item = subsource_name.to_string();
    Ok(UnresolvedItemName::from(partial))
}

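/// Validates that the requested source exports have no duplicate names and no
/// duplicate upstream references.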
fn validate_source_export_names<T>(
    requested_source_exports: &[RequestedSourceExport<T>],
) -> Result<(), PlanError> {
    if let Some(name) = requested_source_exports
        .iter()
        .map(|subsource| &subsource.name)
        .duplicates()
        .next()
        .cloned()
    {
        let mut upstream_references: Vec<_> = requested_source_exports
            .iter()
            .filter_map(|subsource| {
                if subsource.name == name {
                    Some(subsource.external_reference.clone())
                } else {
                    None
                }
            })
            .collect();

        upstream_references.sort();

        Err(PlanError::SubsourceNameConflict {
            name,
            upstream_references,
        })?;
    }

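    // Error if any upstream reference is duplicated: a single upstream object
    // may be ingested by at most one export of this source.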
    if let Some(name) = requested_source_exports
        .iter()
        .map(|export| &export.external_reference)
        .duplicates()
        .next()
        .cloned()
    {
        let mut target_names: Vec<_> = requested_source_exports
            .iter()
            .filter_map(|export| {
                if export.external_reference == name {
                    Some(export.name.clone())
                } else {
                    None
                }
            })
            .collect();

        target_names.sort();

        Err(PlanError::SubsourceDuplicateReference { name, target_names })?;
    }

    Ok(())
}

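/// The output of purification: a statement (plus any statements derived from
/// it) with all references to external state resolved.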
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PurifiedStatement {
    PurifiedCreateSource {
        create_progress_subsource_stmt: Option<CreateSubsourceStatement<Aug>>,
        create_source_stmt: CreateSourceStatement<Aug>,
        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
        available_source_references: SourceReferences,
    },
    PurifiedAlterSource {
        alter_source_stmt: AlterSourceStatement<Aug>,
    },
    PurifiedAlterSourceAddSubsources {
        source_name: ResolvedItemName,
        options: Vec<AlterSourceAddSubsourceOption<Aug>>,
        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
    },
    PurifiedAlterSourceRefreshReferences {
        source_name: ResolvedItemName,
        available_source_references: SourceReferences,
    },
    PurifiedCreateSink(CreateSinkStatement<Aug>),
    PurifiedCreateTableFromSource {
        stmt: CreateTableFromSourceStatement<Aug>,
    },
}

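/// A source export that purification resolved against the upstream system.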
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PurifiedSourceExport {
    pub external_reference: UnresolvedItemName,
    pub details: PurifiedExportDetails,
}

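/// Connection-specific details for a purified source export.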
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PurifiedExportDetails {
    MySql {
        table: MySqlTableDesc,
        text_columns: Option<Vec<Ident>>,
        exclude_columns: Option<Vec<Ident>>,
        initial_gtid_set: String,
    },
    Postgres {
        table: PostgresTableDesc,
        text_columns: Option<Vec<Ident>>,
        exclude_columns: Option<Vec<Ident>>,
    },
    SqlServer {
        table: SqlServerTableDesc,
        text_columns: Option<Vec<Ident>>,
        excl_columns: Option<Vec<Ident>>,
        capture_instance: Arc<str>,
        initial_lsn: mz_sql_server_util::cdc::Lsn,
    },
    Kafka {},
    LoadGenerator {
        table: Option<RelationDesc>,
        output: LoadGeneratorOutput,
    },
}

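/// Purifies a statement, removing any dependencies on external state by
/// contacting the relevant upstream systems. Returns the purification result
/// along with the ID of the cluster the statement runs in, if any.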
pub async fn purify_statement(
    catalog: impl SessionCatalog,
    now: u64,
    stmt: Statement<Aug>,
    storage_configuration: &StorageConfiguration,
) -> (Result<PurifiedStatement, PlanError>, Option<ClusterId>) {
    match stmt {
        Statement::CreateSource(stmt) => {
            let cluster_id = stmt.in_cluster.as_ref().map(|cluster| cluster.id.clone());
            (
                purify_create_source(catalog, now, stmt, storage_configuration).await,
                cluster_id,
            )
        }
        Statement::AlterSource(stmt) => (
            purify_alter_source(catalog, stmt, storage_configuration).await,
            None,
        ),
        Statement::CreateSink(stmt) => {
            let cluster_id = stmt.in_cluster.as_ref().map(|cluster| cluster.id.clone());
            (
                purify_create_sink(catalog, stmt, storage_configuration).await,
                cluster_id,
            )
        }
        Statement::CreateTableFromSource(stmt) => (
            purify_create_table_from_source(catalog, stmt, storage_configuration).await,
            None,
        ),
        o => unreachable!("{:?} does not need to be purified", o),
    }
}

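/// Adds `DOC ON` options to the Avro formats of a `CREATE SINK` statement,
/// derived from catalog comments on the objects the sink references, unless
/// the user already provided an explicit option.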
pub(crate) fn purify_create_sink_avro_doc_on_options(
    catalog: &dyn SessionCatalog,
    from_id: CatalogItemId,
    format: &mut Option<FormatSpecifier<Aug>>,
) -> Result<(), PlanError> {
    let from = catalog.get_item(&from_id);
    let object_ids = from
        .references()
        .items()
        .copied()
        .chain_one(from.id())
        .collect::<Vec<_>>();

    let mut avro_format_options = vec![];
    for_each_format(format, |doc_on_schema, fmt| match fmt {
        Format::Avro(AvroSchema::InlineSchema { .. })
        | Format::Bytes
        | Format::Csv { .. }
        | Format::Json { .. }
        | Format::Protobuf(..)
        | Format::Regex(..)
        | Format::Text => (),
        Format::Avro(AvroSchema::Csr {
            csr_connection: CsrConnectionAvro { connection, .. },
        }) => {
            avro_format_options.push((doc_on_schema, &mut connection.options));
        }
    });

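    // For each Avro format, add a `DOC ON` option for every catalog comment on
    // a referenced object or column that the user has not already documented.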
    for (for_schema, options) in avro_format_options {
        let user_provided_comments = options
            .iter()
            .filter_map(|CsrConfigOption { name, .. }| match name {
                CsrConfigOptionName::AvroDocOn(doc_on) => Some(doc_on.clone()),
                _ => None,
            })
            .collect::<BTreeSet<_>>();

        for object_id in &object_ids {
            let item = catalog
                .get_item(object_id)
                .at_version(RelationVersionSelector::Latest);
            let full_resolved_name = ResolvedItemName::Item {
                id: *object_id,
                qualifiers: item.name().qualifiers.clone(),
                full_name: catalog.resolve_full_name(item.name()),
                print_id: !matches!(
                    item.item_type(),
                    CatalogItemType::Func | CatalogItemType::Type
                ),
                version: RelationVersionSelector::Latest,
            };

            if let Some(comments_map) = catalog.get_item_comments(object_id) {
                let doc_on_item_key = AvroDocOn {
                    identifier: DocOnIdentifier::Type(full_resolved_name.clone()),
                    for_schema,
                };
                if !user_provided_comments.contains(&doc_on_item_key) {
                    if let Some(root_comment) = comments_map.get(&None) {
                        options.push(CsrConfigOption {
                            name: CsrConfigOptionName::AvroDocOn(doc_on_item_key),
                            value: Some(mz_sql_parser::ast::WithOptionValue::Value(Value::String(
                                root_comment.clone(),
                            ))),
                        });
                    }
                }

                let column_descs = match item.type_details() {
                    Some(details) => details.typ.desc(catalog).unwrap_or_default(),
                    None => item.relation_desc().map(|d| d.into_owned()),
                };

                if let Some(desc) = column_descs {
                    for (pos, column_name) in desc.iter_names().enumerate() {
                        if let Some(comment_str) = comments_map.get(&Some(pos + 1)) {
                            let doc_on_column_key = AvroDocOn {
                                identifier: DocOnIdentifier::Column(ColumnName {
                                    relation: full_resolved_name.clone(),
                                    column: ResolvedColumnReference::Column {
                                        name: column_name.to_owned(),
                                        index: pos,
                                    },
                                }),
                                for_schema,
                            };
                            if !user_provided_comments.contains(&doc_on_column_key) {
                                options.push(CsrConfigOption {
                                    name: CsrConfigOptionName::AvroDocOn(doc_on_column_key),
                                    value: Some(mz_sql_parser::ast::WithOptionValue::Value(
                                        Value::String(comment_str.clone()),
                                    )),
                                });
                            }
                        }
                    }
                }
            }
        }
    }

    Ok(())
}

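/// Purifies a `CREATE SINK` statement, validating its connections against the
/// external systems and injecting schema documentation derived from catalog
/// comments.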
async fn purify_create_sink(
    catalog: impl SessionCatalog,
    mut create_sink_stmt: CreateSinkStatement<Aug>,
    storage_configuration: &StorageConfiguration,
) -> Result<PurifiedStatement, PlanError> {
    let CreateSinkStatement {
        connection,
        format,
        with_options,
        name: _,
        in_cluster: _,
        if_not_exists: _,
        from,
        envelope: _,
        mode: _,
    } = &mut create_sink_stmt;

    const USER_ALLOWED_WITH_OPTIONS: &[CreateSinkOptionName] = &[
        CreateSinkOptionName::Snapshot,
        CreateSinkOptionName::CommitInterval,
    ];

    if let Some(op) = with_options
        .iter()
        .find(|op| !USER_ALLOWED_WITH_OPTIONS.contains(&op.name))
    {
        sql_bail!(
            "CREATE SINK...WITH ({}..) is not allowed",
            op.name.to_ast_string_simple(),
        )
    }

    match &connection {
        CreateSinkConnection::Kafka {
            connection,
            options,
            key: _,
            headers: _,
        } => {
            let scx = StatementContext::new(None, &catalog);
            let connection = {
                let item = scx.get_item_by_resolved_name(connection)?;
                match item.connection()? {
                    Connection::Kafka(connection) => {
                        connection.clone().into_inline_connection(scx.catalog)
                    }
                    _ => sql_bail!(
                        "{} is not a kafka connection",
                        scx.catalog.resolve_full_name(item.name())
                    ),
                }
            };

            let extracted_options: KafkaSinkConfigOptionExtracted = options.clone().try_into()?;

            if extracted_options.legacy_ids == Some(true) {
                sql_bail!("LEGACY IDs option is not supported");
            }

            let client: AdminClient<_> = connection
                .create_with_context(
                    storage_configuration,
                    MzClientContext::default(),
                    &BTreeMap::new(),
                    InTask::No,
                )
                .await
                .map_err(|e| {
                    KafkaSinkPurificationError::AdminClientError(Arc::new(e))
                })?;

            let metadata = client
                .inner()
                .fetch_metadata(
                    None,
                    storage_configuration
                        .parameters
                        .kafka_timeout_config
                        .fetch_metadata_timeout,
                )
                .map_err(|e| {
                    KafkaSinkPurificationError::AdminClientError(Arc::new(
                        ContextCreationError::KafkaError(e),
                    ))
                })?;

            if metadata.brokers().is_empty() {
                Err(KafkaSinkPurificationError::ZeroBrokers)?;
            }
        }
        CreateSinkConnection::Iceberg {
            connection,
            aws_connection,
            ..
        } => {
            let scx = StatementContext::new(None, &catalog);
            let connection = {
                let item = scx.get_item_by_resolved_name(connection)?;
                match item.connection()? {
                    Connection::IcebergCatalog(connection) => {
                        connection.clone().into_inline_connection(scx.catalog)
                    }
                    _ => sql_bail!(
                        "{} is not an iceberg connection",
                        scx.catalog.resolve_full_name(item.name())
                    ),
                }
            };

            let aws_conn_id = aws_connection.item_id();

            let aws_connection = {
                let item = scx.get_item_by_resolved_name(aws_connection)?;
                match item.connection()? {
                    Connection::Aws(aws_connection) => aws_connection.clone(),
                    _ => sql_bail!(
                        "{} is not an aws connection",
                        scx.catalog.resolve_full_name(item.name())
                    ),
                }
            };

            if let Some(s3tables) = connection.s3tables_catalog() {
                let enable_region_check =
                    ENABLE_S3_TABLES_REGION_CHECK.get(scx.catalog.system_vars().dyncfgs());
                if enable_region_check {
                    let env_id = &catalog.config().environment_id;
                    if matches!(env_id.cloud_provider(), CloudProvider::Aws) {
                        let env_region = env_id.cloud_provider_region();
                        let s3_tables_region = s3tables
                            .aws_connection
                            .connection
                            .region
                            .clone()
                            .unwrap_or_else(|| "us-east-1".to_string());
                        if s3_tables_region != env_region {
                            Err(IcebergSinkPurificationError::S3TablesRegionMismatch {
                                s3_tables_region,
                                environment_region: env_region.to_string(),
                            })?;
                        }
                    }
                }
            }

            let _catalog = connection
                .connect(storage_configuration, InTask::No)
                .await
                .map_err(|e| IcebergSinkPurificationError::CatalogError(Arc::new(e)))?;

            let _sdk_config = aws_connection
                .load_sdk_config(
                    &storage_configuration.connection_context,
                    aws_conn_id.clone(),
                    InTask::No,
                )
                .await
                .map_err(|e| IcebergSinkPurificationError::AwsSdkContextError(Arc::new(e)))?;
        }
    }

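    // Validate that any Confluent Schema Registry connections referenced by
    // the sink's formats are reachable.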
    let mut csr_connection_ids = BTreeSet::new();
    for_each_format(format, |_, fmt| match fmt {
        Format::Avro(AvroSchema::InlineSchema { .. })
        | Format::Bytes
        | Format::Csv { .. }
        | Format::Json { .. }
        | Format::Protobuf(ProtobufSchema::InlineSchema { .. })
        | Format::Regex(..)
        | Format::Text => (),
        Format::Avro(AvroSchema::Csr {
            csr_connection: CsrConnectionAvro { connection, .. },
        })
        | Format::Protobuf(ProtobufSchema::Csr {
            csr_connection: CsrConnectionProtobuf { connection, .. },
        }) => {
            csr_connection_ids.insert(*connection.connection.item_id());
        }
    });

    let scx = StatementContext::new(None, &catalog);
    for csr_connection_id in csr_connection_ids {
        let connection = {
            let item = scx.get_item(&csr_connection_id);
            match item.connection()? {
                Connection::Csr(connection) => connection.clone().into_inline_connection(&catalog),
                _ => Err(CsrPurificationError::NotCsrConnection(
                    scx.catalog.resolve_full_name(item.name()),
                ))?,
            }
        };

        let client = connection
            .connect(storage_configuration, InTask::No)
            .await
            .map_err(|e| CsrPurificationError::ClientError(Arc::new(e)))?;

        client
            .list_subjects()
            .await
            .map_err(|e| CsrPurificationError::ListSubjectsError(Arc::new(e)))?;
    }

    purify_create_sink_avro_doc_on_options(&catalog, *from.item_id(), format)?;

    Ok(PurifiedStatement::PurifiedCreateSink(create_sink_stmt))
}

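/// Applies `f` to each individual format in `format`, along with whether that
/// format applies to the key, the value, or both.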
fn for_each_format<'a, F>(format: &'a mut Option<FormatSpecifier<Aug>>, mut f: F)
where
    F: FnMut(DocOnSchema, &'a mut Format<Aug>),
{
    match format {
        None => (),
        Some(FormatSpecifier::Bare(fmt)) => f(DocOnSchema::All, fmt),
        Some(FormatSpecifier::KeyValue { key, value }) => {
            f(DocOnSchema::KeyOnly, key);
            f(DocOnSchema::ValueOnly, value);
        }
    }
}

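/// Defines whether a statement may (or must) request external references to
/// ingest from the upstream system.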
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub(crate) enum SourceReferencePolicy {
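    /// External references are not allowed; users should instead create
    /// tables from the source.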
    NotAllowed,
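    /// External references may be provided, but are not required.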
    Optional,
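    /// External references must be provided.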
    Required,
}

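/// Purifies a `CREATE SOURCE` statement, validating its connection and
/// resolving any requested external references against the upstream system.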
async fn purify_create_source(
    catalog: impl SessionCatalog,
    now: u64,
    mut create_source_stmt: CreateSourceStatement<Aug>,
    storage_configuration: &StorageConfiguration,
) -> Result<PurifiedStatement, PlanError> {
    let CreateSourceStatement {
        name: source_name,
        col_names,
        key_constraint,
        connection: source_connection,
        format,
        envelope,
        include_metadata,
        external_references,
        progress_subsource,
        with_options,
        ..
    } = &mut create_source_stmt;

    let uses_old_syntax = !col_names.is_empty()
        || key_constraint.is_some()
        || format.is_some()
        || envelope.is_some()
        || !include_metadata.is_empty()
        || external_references.is_some()
        || progress_subsource.is_some();

    if let Some(DeferredItemName::Named(_)) = progress_subsource {
        sql_bail!("Cannot manually ID qualify progress subsource")
    }

    let mut requested_subsource_map = BTreeMap::new();

    let progress_desc = match &source_connection {
        CreateSourceConnection::Kafka { .. } => {
            &mz_storage_types::sources::kafka::KAFKA_PROGRESS_DESC
        }
        CreateSourceConnection::Postgres { .. } => {
            &mz_storage_types::sources::postgres::PG_PROGRESS_DESC
        }
        CreateSourceConnection::SqlServer { .. } => {
            &mz_storage_types::sources::sql_server::SQL_SERVER_PROGRESS_DESC
        }
        CreateSourceConnection::MySql { .. } => {
            &mz_storage_types::sources::mysql::MYSQL_PROGRESS_DESC
        }
        CreateSourceConnection::LoadGenerator { .. } => {
            &mz_storage_types::sources::load_generator::LOAD_GEN_PROGRESS_DESC
        }
    };
    let scx = StatementContext::new(None, &catalog);

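    // When the source table syntax is forced, sources may not have any
    // external references; when it is merely enabled, references are optional.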
    let reference_policy = if scx.catalog.system_vars().enable_create_table_from_source()
        && scx.catalog.system_vars().force_source_table_syntax()
    {
        SourceReferencePolicy::NotAllowed
    } else if scx.catalog.system_vars().enable_create_table_from_source() {
        SourceReferencePolicy::Optional
    } else {
        SourceReferencePolicy::Required
    };

    let mut format_options = SourceFormatOptions::Default;

    let retrieved_source_references: RetrievedSourceReferences;

    match source_connection {
        CreateSourceConnection::Kafka {
            connection,
            options: base_with_options,
            ..
        } => {
            if let Some(external_references) = external_references {
                Err(KafkaSourcePurificationError::ReferencedSubsources(
                    external_references.clone(),
                ))?;
            }

            let connection = {
                let item = scx.get_item_by_resolved_name(connection)?;
                match item.connection()? {
                    Connection::Kafka(connection) => {
                        connection.clone().into_inline_connection(&catalog)
                    }
                    _ => Err(KafkaSourcePurificationError::NotKafkaConnection(
                        scx.catalog.resolve_full_name(item.name()),
                    ))?,
                }
            };

            let extracted_options: KafkaSourceConfigOptionExtracted =
                base_with_options.clone().try_into()?;

            let topic = extracted_options
                .topic
                .ok_or(KafkaSourcePurificationError::ConnectionMissingTopic)?;

            let consumer = connection
                .create_with_context(
                    storage_configuration,
                    MzClientContext::default(),
                    &BTreeMap::new(),
                    InTask::No,
                )
                .await
                .map_err(|e| {
                    KafkaSourcePurificationError::KafkaConsumerError(
                        e.display_with_causes().to_string(),
                    )
                })?;
            let consumer = Arc::new(consumer);

            match (
                extracted_options.start_offset,
                extracted_options.start_timestamp,
            ) {
                (None, None) => {
                    kafka_util::ensure_topic_exists(
                        Arc::clone(&consumer),
                        &topic,
                        storage_configuration
                            .parameters
                            .kafka_timeout_config
                            .fetch_metadata_timeout,
                    )
                    .await?;
                }
                (Some(_), Some(_)) => {
                    sql_bail!("cannot specify START TIMESTAMP and START OFFSET at same time")
                }
                (Some(start_offsets), None) => {
                    kafka_util::validate_start_offsets(
                        Arc::clone(&consumer),
                        &topic,
                        start_offsets,
                        storage_configuration
                            .parameters
                            .kafka_timeout_config
                            .fetch_metadata_timeout,
                    )
                    .await?;
                }
                (None, Some(time_offset)) => {
                    let start_offsets = kafka_util::lookup_start_offsets(
                        Arc::clone(&consumer),
                        &topic,
                        time_offset,
                        now,
                        storage_configuration
                            .parameters
                            .kafka_timeout_config
                            .fetch_metadata_timeout,
                    )
                    .await?;

                    base_with_options.retain(|val| {
                        !matches!(val.name, KafkaSourceConfigOptionName::StartTimestamp)
                    });
                    base_with_options.push(KafkaSourceConfigOption {
                        name: KafkaSourceConfigOptionName::StartOffset,
                        value: Some(WithOptionValue::Sequence(
                            start_offsets
                                .iter()
                                .map(|offset| {
                                    WithOptionValue::Value(Value::Number(offset.to_string()))
                                })
                                .collect(),
                        )),
                    });
                }
            }

            let reference_client = SourceReferenceClient::Kafka { topic: &topic };
            retrieved_source_references = reference_client.get_source_references().await?;

            format_options = SourceFormatOptions::Kafka { topic };
        }
        CreateSourceConnection::Postgres {
            connection,
            options,
        } => {
            let connection_item = scx.get_item_by_resolved_name(connection)?;
            let connection = match connection_item.connection().map_err(PlanError::from)? {
                Connection::Postgres(connection) => {
                    connection.clone().into_inline_connection(&catalog)
                }
                _ => Err(PgSourcePurificationError::NotPgConnection(
                    scx.catalog.resolve_full_name(connection_item.name()),
                ))?,
            };
            let crate::plan::statement::PgConfigOptionExtracted {
                publication,
                text_columns,
                exclude_columns,
                details,
                ..
            } = options.clone().try_into()?;
            let publication =
                publication.ok_or(PgSourcePurificationError::ConnectionMissingPublication)?;

            if details.is_some() {
                Err(PgSourcePurificationError::UserSpecifiedDetails)?;
            }

            let client = connection
                .validate(connection_item.id(), storage_configuration)
                .await?;

            let reference_client = SourceReferenceClient::Postgres {
                client: &client,
                publication: &publication,
                database: &connection.database,
            };
            retrieved_source_references = reference_client.get_source_references().await?;

            let postgres::PurifiedSourceExports {
                source_exports: subsources,
                normalized_text_columns,
            } = postgres::purify_source_exports(
                &client,
                &retrieved_source_references,
                external_references,
                text_columns,
                exclude_columns,
                source_name,
                &reference_policy,
            )
            .await?;

            if let Some(text_cols_option) = options
                .iter_mut()
                .find(|option| option.name == PgConfigOptionName::TextColumns)
            {
                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
            }

            requested_subsource_map.extend(subsources);

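            // Fetch the upstream timeline ID and regenerate the `DETAILS`
            // option, including a freshly named replication slot, from the
            // purified publication details.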
            let timeline_id = mz_postgres_util::get_timeline_id(&client).await?;

            options.retain(|PgConfigOption { name, .. }| name != &PgConfigOptionName::Details);
            let details = PostgresSourcePublicationDetails {
                slot: format!(
                    "materialize_{}",
                    Uuid::new_v4().to_string().replace('-', "")
                ),
                timeline_id: Some(timeline_id),
                database: connection.database,
            };
            options.push(PgConfigOption {
                name: PgConfigOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    details.into_proto().encode_to_vec(),
                )))),
            })
        }
        CreateSourceConnection::SqlServer {
            connection,
            options,
        } => {
            scx.require_feature_flag(&ENABLE_SQL_SERVER_SOURCE)?;

            let connection_item = scx.get_item_by_resolved_name(connection)?;
            let connection = match connection_item.connection()? {
                Connection::SqlServer(connection) => {
                    connection.clone().into_inline_connection(&catalog)
                }
                _ => Err(SqlServerSourcePurificationError::NotSqlServerConnection(
                    scx.catalog.resolve_full_name(connection_item.name()),
                ))?,
            };
            let crate::plan::statement::ddl::SqlServerConfigOptionExtracted {
                details,
                text_columns,
                exclude_columns,
                seen: _,
            } = options.clone().try_into()?;

            if details.is_some() {
                Err(SqlServerSourcePurificationError::UserSpecifiedDetails)?;
            }

            let mut client = connection
                .validate(connection_item.id(), storage_configuration)
                .await?;

            let database: Arc<str> = connection.database.into();
            let reference_client = SourceReferenceClient::SqlServer {
                client: &mut client,
                database: Arc::clone(&database),
            };
            retrieved_source_references = reference_client.get_source_references().await?;
            tracing::debug!(?retrieved_source_references, "got source references");

            let timeout = mz_storage_types::sources::sql_server::MAX_LSN_WAIT
                .get(storage_configuration.config_set());

            let purified_source_exports = sql_server::purify_source_exports(
                &*database,
                &mut client,
                &retrieved_source_references,
                external_references,
                &text_columns,
                &exclude_columns,
                source_name,
                timeout,
                &reference_policy,
            )
            .await?;

            let sql_server::PurifiedSourceExports {
                source_exports,
                normalized_text_columns,
                normalized_excl_columns,
            } = purified_source_exports;

            requested_subsource_map.extend(source_exports);

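            // Record the latest restore history ID in the source details, so
            // the source can detect if the upstream database is later restored
            // from a backup.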
            let restore_history_id =
                mz_sql_server_util::inspect::get_latest_restore_history_id(&mut client).await?;
            let details = SqlServerSourceExtras { restore_history_id };

            options.retain(|SqlServerConfigOption { name, .. }| {
                name != &SqlServerConfigOptionName::Details
            });
            options.push(SqlServerConfigOption {
                name: SqlServerConfigOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    details.into_proto().encode_to_vec(),
                )))),
            });

            if let Some(text_cols_option) = options
                .iter_mut()
                .find(|option| option.name == SqlServerConfigOptionName::TextColumns)
            {
                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
            }
            if let Some(excl_cols_option) = options
                .iter_mut()
                .find(|option| option.name == SqlServerConfigOptionName::ExcludeColumns)
            {
                excl_cols_option.value = Some(WithOptionValue::Sequence(normalized_excl_columns));
            }
        }
        CreateSourceConnection::MySql {
            connection,
            options,
        } => {
            let connection_item = scx.get_item_by_resolved_name(connection)?;
            let connection = match connection_item.connection()? {
                Connection::MySql(connection) => {
                    connection.clone().into_inline_connection(&catalog)
                }
                _ => Err(MySqlSourcePurificationError::NotMySqlConnection(
                    scx.catalog.resolve_full_name(connection_item.name()),
                ))?,
            };
            let crate::plan::statement::ddl::MySqlConfigOptionExtracted {
                details,
                text_columns,
                exclude_columns,
                seen: _,
            } = options.clone().try_into()?;

            if details.is_some() {
                Err(MySqlSourcePurificationError::UserSpecifiedDetails)?;
            }

            let mut conn = connection
                .validate(connection_item.id(), storage_configuration)
                .await
                .map_err(MySqlSourcePurificationError::InvalidConnection)?;

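            // Capture the server's current `gtid_executed` value, which marks
            // the consistent starting point for this source's exports.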
            let initial_gtid_set =
                mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?;

            let reference_client = SourceReferenceClient::MySql {
                conn: &mut conn,
                include_system_schemas: mysql::references_system_schemas(external_references),
            };
            retrieved_source_references = reference_client.get_source_references().await?;

            let mysql::PurifiedSourceExports {
                source_exports: subsources,
                normalized_text_columns,
                normalized_exclude_columns,
            } = mysql::purify_source_exports(
                &mut conn,
                &retrieved_source_references,
                external_references,
                text_columns,
                exclude_columns,
                source_name,
                initial_gtid_set.clone(),
                &reference_policy,
            )
            .await?;
            requested_subsource_map.extend(subsources);

            let details = MySqlSourceDetails {};
            options
                .retain(|MySqlConfigOption { name, .. }| name != &MySqlConfigOptionName::Details);
            options.push(MySqlConfigOption {
                name: MySqlConfigOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    details.into_proto().encode_to_vec(),
                )))),
            });

            if let Some(text_cols_option) = options
                .iter_mut()
                .find(|option| option.name == MySqlConfigOptionName::TextColumns)
            {
                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
            }
            if let Some(exclude_cols_option) = options
                .iter_mut()
                .find(|option| option.name == MySqlConfigOptionName::ExcludeColumns)
            {
                exclude_cols_option.value =
                    Some(WithOptionValue::Sequence(normalized_exclude_columns));
            }
        }
        CreateSourceConnection::LoadGenerator { generator, options } => {
            let load_generator =
                load_generator_ast_to_generator(&scx, generator, options, include_metadata)?;

            let reference_client = SourceReferenceClient::LoadGenerator {
                generator: &load_generator,
            };
            retrieved_source_references = reference_client.get_source_references().await?;
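            // Determine whether this load generator produces data on multiple
            // outputs rather than directly on the source relation.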
            let multi_output_sources = retrieved_source_references
                .all_references()
                .iter()
                .any(|r| {
                    r.load_generator_output().expect("is loadgen")
                        != &LoadGeneratorOutput::Default
                });

            match external_references {
                Some(requested)
                    if matches!(reference_policy, SourceReferencePolicy::NotAllowed) =>
                {
                    Err(PlanError::UseTablesForSources(requested.to_string()))?
                }
                Some(requested) if !multi_output_sources => match requested {
                    ExternalReferences::SubsetTables(_) => {
                        Err(LoadGeneratorSourcePurificationError::ForTables)?
                    }
                    ExternalReferences::SubsetSchemas(_) => {
                        Err(LoadGeneratorSourcePurificationError::ForSchemas)?
                    }
                    ExternalReferences::All => {
                        Err(LoadGeneratorSourcePurificationError::ForAllTables)?
                    }
                },
                Some(requested) => {
                    let requested_exports = retrieved_source_references
                        .requested_source_exports(Some(requested), source_name)?;
                    for export in requested_exports {
                        requested_subsource_map.insert(
                            export.name,
                            PurifiedSourceExport {
                                external_reference: export.external_reference,
                                details: PurifiedExportDetails::LoadGenerator {
                                    table: export
                                        .meta
                                        .load_generator_desc()
                                        .expect("is loadgen")
                                        .clone(),
                                    output: export
                                        .meta
                                        .load_generator_output()
                                        .expect("is loadgen")
                                        .clone(),
                                },
                            },
                        );
                    }
                }
                None => {
                    if multi_output_sources
                        && matches!(reference_policy, SourceReferencePolicy::Required)
                    {
                        Err(LoadGeneratorSourcePurificationError::MultiOutputRequiresForAllTables)?
                    }
                }
            }

            if let LoadGenerator::Clock = generator {
                if !options
                    .iter()
                    .any(|p| p.name == LoadGeneratorOptionName::AsOf)
                {
                    let now = catalog.now();
                    options.push(LoadGeneratorOption {
                        name: LoadGeneratorOptionName::AsOf,
                        value: Some(WithOptionValue::Value(Value::Number(now.to_string()))),
                    });
                }
            }
        }
    }

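    // All requested external references have been resolved into purified
    // exports above, so remove them from the `CREATE SOURCE` statement itself.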
    *external_references = None;

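    // With the old syntax, the source's progress collection is exposed via a
    // dedicated progress subsource, which is generated here.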
    let create_progress_subsource_stmt = if uses_old_syntax {
        let name = match progress_subsource {
            Some(name) => match name {
                DeferredItemName::Deferred(name) => name.clone(),
                DeferredItemName::Named(_) => unreachable!("already checked for this value"),
            },
            None => {
                let (item, prefix) = source_name.0.split_last().unwrap();
                let item_name =
                    Ident::try_generate_name(item.to_string(), "_progress", |candidate| {
                        let mut suggested_name = prefix.to_vec();
                        suggested_name.push(candidate.clone());

                        let partial =
                            normalize::unresolved_item_name(UnresolvedItemName(suggested_name))?;
                        let qualified = scx.allocate_qualified_name(partial)?;
                        let item_exists = scx.catalog.get_item_by_name(&qualified).is_some();
                        let type_exists = scx.catalog.get_type_by_name(&qualified).is_some();
                        Ok::<_, PlanError>(!item_exists && !type_exists)
                    })?;

                let mut full_name = prefix.to_vec();
                full_name.push(item_name);
                let full_name = normalize::unresolved_item_name(UnresolvedItemName(full_name))?;
                let qualified_name = scx.allocate_qualified_name(full_name)?;
                let full_name = scx.catalog.resolve_full_name(&qualified_name);

                UnresolvedItemName::from(full_name.clone())
            }
        };

        let (columns, constraints) = scx.relation_desc_into_table_defs(progress_desc)?;

        let mut progress_with_options: Vec<_> = with_options
            .iter()
            .filter_map(|opt| match opt.name {
                CreateSourceOptionName::TimestampInterval => None,
                CreateSourceOptionName::RetainHistory => Some(CreateSubsourceOption {
                    name: CreateSubsourceOptionName::RetainHistory,
                    value: opt.value.clone(),
                }),
            })
            .collect();
        progress_with_options.push(CreateSubsourceOption {
            name: CreateSubsourceOptionName::Progress,
            value: Some(WithOptionValue::Value(Value::Boolean(true))),
        });

        Some(CreateSubsourceStatement {
            name,
            columns,
            of_source: None,
            constraints,
            if_not_exists: false,
            with_options: progress_with_options,
        })
    } else {
        None
    };

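    // Resolve any format details that require contacting external services,
    // e.g. fetching schemas from a schema registry.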
    purify_source_format(
        &catalog,
        format,
        &format_options,
        envelope,
        storage_configuration,
    )
    .await?;

    Ok(PurifiedStatement::PurifiedCreateSource {
        create_progress_subsource_stmt,
        create_source_stmt,
        subsources: requested_subsource_map,
        available_source_references: retrieved_source_references.available_source_references(),
    })
}

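/// Purifies an `ALTER SOURCE` statement, contacting the upstream system when
/// subsources are added or references are refreshed.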
async fn purify_alter_source(
    catalog: impl SessionCatalog,
    stmt: AlterSourceStatement<Aug>,
    storage_configuration: &StorageConfiguration,
) -> Result<PurifiedStatement, PlanError> {
    let scx = StatementContext::new(None, &catalog);
    let AlterSourceStatement {
        source_name: unresolved_source_name,
        action,
        if_exists,
    } = stmt;

    let item = match scx.resolve_item(RawItemName::Name(unresolved_source_name.clone())) {
        Ok(item) => item,
        Err(_) if if_exists => {
            return Ok(PurifiedStatement::PurifiedAlterSource {
                alter_source_stmt: AlterSourceStatement {
                    source_name: unresolved_source_name,
                    action,
                    if_exists,
                },
            });
        }
        Err(e) => return Err(e),
    };

    let desc = match item.source_desc()? {
        Some(desc) => desc.clone().into_inline_connection(scx.catalog),
        None => {
            sql_bail!("cannot ALTER this type of source")
        }
    };

    let source_name = item.name();

    let resolved_source_name = ResolvedItemName::Item {
        id: item.id(),
        qualifiers: item.name().qualifiers.clone(),
        full_name: scx.catalog.resolve_full_name(source_name),
        print_id: true,
        version: RelationVersionSelector::Latest,
    };

    let partial_name = scx.catalog.minimal_qualification(source_name);

    match action {
        AlterSourceAction::AddSubsources {
            external_references,
            options,
        } => {
            if scx.catalog.system_vars().enable_create_table_from_source()
                && scx.catalog.system_vars().force_source_table_syntax()
            {
                Err(PlanError::UseTablesForSources(
                    "ALTER SOURCE .. ADD SUBSOURCES ..".to_string(),
                ))?;
            }

            purify_alter_source_add_subsources(
                external_references,
                options,
                desc,
                partial_name,
                unresolved_source_name,
                resolved_source_name,
                storage_configuration,
            )
            .await
        }
        AlterSourceAction::RefreshReferences => {
            purify_alter_source_refresh_references(
                desc,
                resolved_source_name,
                storage_configuration,
            )
            .await
        }
        _ => Ok(PurifiedStatement::PurifiedAlterSource {
            alter_source_stmt: AlterSourceStatement {
                source_name: unresolved_source_name,
                action,
                if_exists,
            },
        }),
    }
}

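/// Purifies `ALTER SOURCE .. ADD SUBSOURCE`, resolving the requested external
/// references against the source's upstream system.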
async fn purify_alter_source_add_subsources(
    external_references: Vec<ExternalReferenceExport>,
    mut options: Vec<AlterSourceAddSubsourceOption<Aug>>,
    desc: SourceDesc,
    partial_source_name: PartialItemName,
    unresolved_source_name: UnresolvedItemName,
    resolved_source_name: ResolvedItemName,
    storage_configuration: &StorageConfiguration,
) -> Result<PurifiedStatement, PlanError> {
    let connection_id = match &desc.connection {
        GenericSourceConnection::Postgres(c) => c.connection_id,
        GenericSourceConnection::MySql(c) => c.connection_id,
        GenericSourceConnection::SqlServer(c) => c.connection_id,
        _ => sql_bail!(
            "source {} does not support ALTER SOURCE.",
            partial_source_name
        ),
    };

    let crate::plan::statement::ddl::AlterSourceAddSubsourceOptionExtracted {
        text_columns,
        exclude_columns,
        details,
        seen: _,
    } = options.clone().try_into()?;
    if details.is_some() {
        sql_bail!("DETAILS option cannot be explicitly set");
    }

    let mut requested_subsource_map = BTreeMap::new();

    match desc.connection {
        GenericSourceConnection::Postgres(pg_source_connection) => {
            let pg_connection = &pg_source_connection.connection;

            let client = pg_connection
                .validate(connection_id, storage_configuration)
                .await?;

            let reference_client = SourceReferenceClient::Postgres {
                client: &client,
                publication: &pg_source_connection.publication,
                database: &pg_connection.database,
            };
            let retrieved_source_references = reference_client.get_source_references().await?;

            let postgres::PurifiedSourceExports {
                source_exports: subsources,
                normalized_text_columns,
            } = postgres::purify_source_exports(
                &client,
                &retrieved_source_references,
                &Some(ExternalReferences::SubsetTables(external_references)),
                text_columns,
                exclude_columns,
                &unresolved_source_name,
                &SourceReferencePolicy::Required,
            )
            .await?;

            if let Some(text_cols_option) = options
                .iter_mut()
                .find(|option| option.name == AlterSourceAddSubsourceOptionName::TextColumns)
            {
                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
            }

            requested_subsource_map.extend(subsources);
        }
        GenericSourceConnection::MySql(mysql_source_connection) => {
            let mysql_connection = &mysql_source_connection.connection;
            let config = mysql_connection
                .config(
                    &storage_configuration.connection_context.secrets_reader,
                    storage_configuration,
                    InTask::No,
                )
                .await?;

            let mut conn = config
                .connect(
                    "mysql purification",
                    &storage_configuration.connection_context.ssh_tunnel_manager,
                )
                .await?;

            let initial_gtid_set =
                mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?;

            let requested_references = Some(ExternalReferences::SubsetTables(external_references));

            let reference_client = SourceReferenceClient::MySql {
                conn: &mut conn,
                include_system_schemas: mysql::references_system_schemas(&requested_references),
            };
            let retrieved_source_references = reference_client.get_source_references().await?;

            let mysql::PurifiedSourceExports {
                source_exports: subsources,
                normalized_text_columns,
                normalized_exclude_columns,
            } = mysql::purify_source_exports(
                &mut conn,
                &retrieved_source_references,
                &requested_references,
                text_columns,
                exclude_columns,
                &unresolved_source_name,
                initial_gtid_set,
                &SourceReferencePolicy::Required,
            )
            .await?;
            requested_subsource_map.extend(subsources);

            if let Some(text_cols_option) = options
                .iter_mut()
                .find(|option| option.name == AlterSourceAddSubsourceOptionName::TextColumns)
            {
                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
            }
            if let Some(exclude_cols_option) = options
                .iter_mut()
                .find(|option| option.name == AlterSourceAddSubsourceOptionName::ExcludeColumns)
            {
                exclude_cols_option.value =
                    Some(WithOptionValue::Sequence(normalized_exclude_columns));
            }
        }
        GenericSourceConnection::SqlServer(sql_server_source) => {
            let sql_server_connection = &sql_server_source.connection;
            let config = sql_server_connection
                .resolve_config(
                    &storage_configuration.connection_context.secrets_reader,
                    storage_configuration,
                    InTask::No,
                )
                .await?;
            let mut client = mz_sql_server_util::Client::connect(config).await?;

            let database = sql_server_connection.database.clone().into();
            let source_references = SourceReferenceClient::SqlServer {
                client: &mut client,
                database: Arc::clone(&database),
            }
            .get_source_references()
            .await?;
            let requested_references = Some(ExternalReferences::SubsetTables(external_references));

            let timeout = mz_storage_types::sources::sql_server::MAX_LSN_WAIT
                .get(storage_configuration.config_set());

            let result = sql_server::purify_source_exports(
                &*database,
                &mut client,
                &source_references,
                &requested_references,
                &text_columns,
                &exclude_columns,
                &unresolved_source_name,
                timeout,
                &SourceReferencePolicy::Required,
            )
            .await;
            let sql_server::PurifiedSourceExports {
                source_exports,
                normalized_text_columns,
                normalized_excl_columns,
            } = result?;

            requested_subsource_map.extend(source_exports);

            if let Some(text_cols_option) = options
                .iter_mut()
                .find(|option| option.name == AlterSourceAddSubsourceOptionName::TextColumns)
            {
                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
            }
            if let Some(exclude_cols_option) = options
                .iter_mut()
                .find(|option| option.name == AlterSourceAddSubsourceOptionName::ExcludeColumns)
            {
                exclude_cols_option.value =
                    Some(WithOptionValue::Sequence(normalized_excl_columns));
            }
        }
        _ => unreachable!(),
    };

    Ok(PurifiedStatement::PurifiedAlterSourceAddSubsources {
        source_name: resolved_source_name,
        options,
        subsources: requested_subsource_map,
    })
}

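/// Purifies `ALTER SOURCE .. REFRESH REFERENCES`, fetching the current set of
/// available references from the upstream system.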
async fn purify_alter_source_refresh_references(
    desc: SourceDesc,
    resolved_source_name: ResolvedItemName,
    storage_configuration: &StorageConfiguration,
) -> Result<PurifiedStatement, PlanError> {
    let retrieved_source_references = match desc.connection {
        GenericSourceConnection::Postgres(pg_source_connection) => {
            let pg_connection = &pg_source_connection.connection;

            let config = pg_connection
                .config(
                    &storage_configuration.connection_context.secrets_reader,
                    storage_configuration,
                    InTask::No,
                )
                .await?;

            let client = config
                .connect(
                    "postgres_purification",
                    &storage_configuration.connection_context.ssh_tunnel_manager,
                )
                .await?;
            let reference_client = SourceReferenceClient::Postgres {
                client: &client,
                publication: &pg_source_connection.publication,
                database: &pg_connection.database,
            };
            reference_client.get_source_references().await?
        }
        GenericSourceConnection::MySql(mysql_source_connection) => {
            let mysql_connection = &mysql_source_connection.connection;
            let config = mysql_connection
                .config(
                    &storage_configuration.connection_context.secrets_reader,
                    storage_configuration,
                    InTask::No,
                )
                .await?;

            let mut conn = config
                .connect(
                    "mysql purification",
                    &storage_configuration.connection_context.ssh_tunnel_manager,
                )
                .await?;

            let reference_client = SourceReferenceClient::MySql {
                conn: &mut conn,
                include_system_schemas: false,
            };
            reference_client.get_source_references().await?
        }
        GenericSourceConnection::SqlServer(sql_server_source) => {
            let sql_server_connection = &sql_server_source.connection;
            let config = sql_server_connection
                .resolve_config(
                    &storage_configuration.connection_context.secrets_reader,
                    storage_configuration,
                    InTask::No,
                )
                .await?;
            let mut client = mz_sql_server_util::Client::connect(config).await?;

            let source_references = SourceReferenceClient::SqlServer {
                client: &mut client,
                database: sql_server_connection.database.clone().into(),
            }
            .get_source_references()
            .await?;
            source_references
        }
        GenericSourceConnection::LoadGenerator(load_gen_connection) => {
            let reference_client = SourceReferenceClient::LoadGenerator {
                generator: &load_gen_connection.load_generator,
            };
            reference_client.get_source_references().await?
        }
        GenericSourceConnection::Kafka(kafka_conn) => {
            let reference_client = SourceReferenceClient::Kafka {
                topic: &kafka_conn.topic,
            };
            reference_client.get_source_references().await?
        }
    };
    Ok(PurifiedStatement::PurifiedAlterSourceRefreshReferences {
        source_name: resolved_source_name,
        available_source_references: retrieved_source_references.available_source_references(),
    })
}

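/// Purifies a `CREATE TABLE .. FROM SOURCE` statement, resolving the single
/// requested external reference against the source's upstream system.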
async fn purify_create_table_from_source(
    catalog: impl SessionCatalog,
    mut stmt: CreateTableFromSourceStatement<Aug>,
    storage_configuration: &StorageConfiguration,
) -> Result<PurifiedStatement, PlanError> {
    let scx = StatementContext::new(None, &catalog);
    let CreateTableFromSourceStatement {
        name: _,
        columns,
        constraints,
        source: source_name,
        if_not_exists: _,
        external_reference,
        format,
        envelope,
        include_metadata: _,
        with_options,
    } = &mut stmt;

    if matches!(columns, TableFromSourceColumns::Defined(_)) {
        sql_bail!("CREATE TABLE .. FROM SOURCE column definitions cannot be specified directly");
    }
    if !constraints.is_empty() {
        sql_bail!(
            "CREATE TABLE .. FROM SOURCE constraint definitions cannot be specified directly"
        );
    }

    let item = match scx.get_item_by_resolved_name(source_name) {
        Ok(item) => item,
        Err(e) => return Err(e),
    };

    let desc = match item.source_desc()? {
        Some(desc) => desc.clone().into_inline_connection(scx.catalog),
        None => {
            sql_bail!("cannot ALTER this type of source")
        }
    };
    let unresolved_source_name: UnresolvedItemName = source_name.full_item_name().clone().into();

    let crate::plan::statement::ddl::TableFromSourceOptionExtracted {
        text_columns,
        exclude_columns,
        retain_history: _,
        details,
        partition_by: _,
        seen: _,
    } = with_options.clone().try_into()?;
    if details.is_some() {
        sql_bail!("DETAILS option cannot be explicitly set");
    }

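    // Qualify the text/exclude columns with the external reference so they
    // match the fully qualified column names expected by export purification.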
    let qualified_text_columns = text_columns
        .iter()
        .map(|col| {
            UnresolvedItemName(
                external_reference
                    .as_ref()
                    .map(|er| er.0.iter().chain_one(col).map(|i| i.clone()).collect())
                    .unwrap_or_else(|| vec![col.clone()]),
            )
        })
        .collect_vec();
    let qualified_exclude_columns = exclude_columns
        .iter()
        .map(|col| {
            UnresolvedItemName(
                external_reference
                    .as_ref()
                    .map(|er| er.0.iter().chain_one(col).map(|i| i.clone()).collect())
                    .unwrap_or_else(|| vec![col.clone()]),
            )
        })
        .collect_vec();

    let mut format_options = SourceFormatOptions::Default;

    let retrieved_source_references: RetrievedSourceReferences;

    let requested_references = external_reference.as_ref().map(|ref_name| {
        ExternalReferences::SubsetTables(vec![ExternalReferenceExport {
            reference: ref_name.clone(),
            alias: None,
        }])
    });

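    // Purify the requested export against the source's upstream system, just
    // as if it had been requested as a subsource of the source itself.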
    let purified_export = match desc.connection {
        GenericSourceConnection::Postgres(pg_source_connection) => {
            let pg_connection = &pg_source_connection.connection;

            let client = pg_connection
                .validate(pg_source_connection.connection_id, storage_configuration)
                .await?;

            let reference_client = SourceReferenceClient::Postgres {
                client: &client,
                publication: &pg_source_connection.publication,
                database: &pg_connection.database,
            };
            retrieved_source_references = reference_client.get_source_references().await?;

            let postgres::PurifiedSourceExports {
                source_exports,
                normalized_text_columns: _,
            } = postgres::purify_source_exports(
                &client,
                &retrieved_source_references,
                &requested_references,
                qualified_text_columns,
                qualified_exclude_columns,
                &unresolved_source_name,
                &SourceReferencePolicy::Required,
            )
            .await?;
            let (_, purified_export) = source_exports.into_element();
            purified_export
        }
        GenericSourceConnection::MySql(mysql_source_connection) => {
            let mysql_connection = &mysql_source_connection.connection;
            let config = mysql_connection
                .config(
                    &storage_configuration.connection_context.secrets_reader,
                    storage_configuration,
                    InTask::No,
                )
                .await?;

            let mut conn = config
                .connect(
                    "mysql purification",
                    &storage_configuration.connection_context.ssh_tunnel_manager,
                )
                .await?;

            let initial_gtid_set =
                mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?;

            let reference_client = SourceReferenceClient::MySql {
                conn: &mut conn,
                include_system_schemas: mysql::references_system_schemas(&requested_references),
            };
            retrieved_source_references = reference_client.get_source_references().await?;

            let mysql::PurifiedSourceExports {
                source_exports,
                normalized_text_columns: _,
                normalized_exclude_columns: _,
            } = mysql::purify_source_exports(
                &mut conn,
                &retrieved_source_references,
                &requested_references,
                qualified_text_columns,
                qualified_exclude_columns,
                &unresolved_source_name,
                initial_gtid_set,
                &SourceReferencePolicy::Required,
            )
            .await?;
            let (_, purified_export) = source_exports.into_element();
            purified_export
        }
        GenericSourceConnection::SqlServer(sql_server_source) => {
            let connection = sql_server_source.connection;
            let config = connection
                .resolve_config(
                    &storage_configuration.connection_context.secrets_reader,
                    storage_configuration,
                    InTask::No,
                )
                .await?;
            let mut client = mz_sql_server_util::Client::connect(config).await?;

            let database: Arc<str> = connection.database.into();
            let reference_client = SourceReferenceClient::SqlServer {
                client: &mut client,
                database: Arc::clone(&database),
            };
            retrieved_source_references = reference_client.get_source_references().await?;
            tracing::debug!(?retrieved_source_references, "got source references");

            let timeout = mz_storage_types::sources::sql_server::MAX_LSN_WAIT
                .get(storage_configuration.config_set());

            let purified_source_exports = sql_server::purify_source_exports(
                &*database,
                &mut client,
                &retrieved_source_references,
                &requested_references,
                &qualified_text_columns,
                &qualified_exclude_columns,
                &unresolved_source_name,
                timeout,
                &SourceReferencePolicy::Required,
            )
            .await?;

            let (_, purified_export) = purified_source_exports.source_exports.into_element();
            purified_export
        }
        GenericSourceConnection::LoadGenerator(load_gen_connection) => {
            let reference_client = SourceReferenceClient::LoadGenerator {
                generator: &load_gen_connection.load_generator,
            };
            retrieved_source_references = reference_client.get_source_references().await?;

            let requested_exports = retrieved_source_references
                .requested_source_exports(requested_references.as_ref(), &unresolved_source_name)?;
            let export = requested_exports.into_element();
            PurifiedSourceExport {
                external_reference: export.external_reference,
                details: PurifiedExportDetails::LoadGenerator {
                    table: export
                        .meta
                        .load_generator_desc()
                        .expect("is loadgen")
                        .clone(),
                    output: export
                        .meta
                        .load_generator_output()
                        .expect("is loadgen")
                        .clone(),
                },
            }
        }
        GenericSourceConnection::Kafka(kafka_conn) => {
            let reference_client = SourceReferenceClient::Kafka {
                topic: &kafka_conn.topic,
            };
            retrieved_source_references = reference_client.get_source_references().await?;
            let requested_exports = retrieved_source_references
                .requested_source_exports(requested_references.as_ref(), &unresolved_source_name)?;
            let export = requested_exports.into_element();

            format_options = SourceFormatOptions::Kafka {
                topic: kafka_conn.topic.clone(),
            };
            PurifiedSourceExport {
                external_reference: export.external_reference,
                details: PurifiedExportDetails::Kafka {},
            }
        }
    };

    purify_source_format(
        &catalog,
        format,
        &format_options,
        envelope,
        storage_configuration,
    )
    .await?;

    *external_reference = Some(purified_export.external_reference.clone());

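    // Rewrite the statement's columns, constraints, and options using the
    // purified export's details.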
    match &purified_export.details {
        PurifiedExportDetails::Postgres { .. } => {
            let mut unsupported_cols = vec![];
            let postgres::PostgresExportStatementValues {
                columns: gen_columns,
                constraints: gen_constraints,
                text_columns: gen_text_columns,
                exclude_columns: gen_exclude_columns,
                details: gen_details,
                external_reference: _,
            } = postgres::generate_source_export_statement_values(
                &scx,
                purified_export,
                &mut unsupported_cols,
            )?;
            if !unsupported_cols.is_empty() {
                unsupported_cols.sort();
                Err(PgSourcePurificationError::UnrecognizedTypes {
                    cols: unsupported_cols,
                })?;
            }

            if let Some(text_cols_option) = with_options
                .iter_mut()
                .find(|option| option.name == TableFromSourceOptionName::TextColumns)
            {
                match gen_text_columns {
                    Some(gen_text_columns) => {
                        text_cols_option.value = Some(WithOptionValue::Sequence(gen_text_columns))
                    }
                    None => soft_panic_or_log!(
                        "text_columns should be Some if text_cols_option is present"
                    ),
                }
            }
            if let Some(exclude_cols_option) = with_options
                .iter_mut()
                .find(|option| option.name == TableFromSourceOptionName::ExcludeColumns)
            {
                match gen_exclude_columns {
                    Some(gen_exclude_columns) => {
                        exclude_cols_option.value =
                            Some(WithOptionValue::Sequence(gen_exclude_columns))
                    }
                    None => soft_panic_or_log!(
                        "exclude_columns should be Some if exclude_cols_option is present"
                    ),
                }
            }
            match columns {
                TableFromSourceColumns::Defined(_) => unreachable!(),
                TableFromSourceColumns::NotSpecified => {
                    *columns = TableFromSourceColumns::Defined(gen_columns);
                    *constraints = gen_constraints;
                }
                TableFromSourceColumns::Named(_) => {
                    sql_bail!("columns cannot be named for Postgres sources")
                }
            }
            with_options.push(TableFromSourceOption {
                name: TableFromSourceOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    gen_details.into_proto().encode_to_vec(),
                )))),
            })
        }
        PurifiedExportDetails::MySql { .. } => {
            let mysql::MySqlExportStatementValues {
                columns: gen_columns,
                constraints: gen_constraints,
                text_columns: gen_text_columns,
                exclude_columns: gen_exclude_columns,
                details: gen_details,
                external_reference: _,
            } = mysql::generate_source_export_statement_values(&scx, purified_export)?;

            if let Some(text_cols_option) = with_options
                .iter_mut()
                .find(|option| option.name == TableFromSourceOptionName::TextColumns)
            {
                match gen_text_columns {
                    Some(gen_text_columns) => {
                        text_cols_option.value = Some(WithOptionValue::Sequence(gen_text_columns))
                    }
                    None => soft_panic_or_log!(
                        "text_columns should be Some if text_cols_option is present"
                    ),
                }
            }
            if let Some(exclude_cols_option) = with_options
                .iter_mut()
                .find(|option| option.name == TableFromSourceOptionName::ExcludeColumns)
            {
                match gen_exclude_columns {
                    Some(gen_exclude_columns) => {
                        exclude_cols_option.value =
                            Some(WithOptionValue::Sequence(gen_exclude_columns))
                    }
                    None => soft_panic_or_log!(
                        "exclude_columns should be Some if exclude_cols_option is present"
                    ),
                }
            }
            match columns {
                TableFromSourceColumns::Defined(_) => unreachable!(),
                TableFromSourceColumns::NotSpecified => {
                    *columns = TableFromSourceColumns::Defined(gen_columns);
                    *constraints = gen_constraints;
                }
                TableFromSourceColumns::Named(_) => {
                    sql_bail!("columns cannot be named for MySQL sources")
                }
            }
            with_options.push(TableFromSourceOption {
                name: TableFromSourceOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    gen_details.into_proto().encode_to_vec(),
                )))),
            })
        }
        PurifiedExportDetails::SqlServer { .. } => {
            let sql_server::SqlServerExportStatementValues {
                columns: gen_columns,
                constraints: gen_constraints,
                text_columns: gen_text_columns,
                excl_columns: gen_excl_columns,
                details: gen_details,
                external_reference: _,
            } = sql_server::generate_source_export_statement_values(&scx, purified_export)?;

            if let Some(text_cols_option) = with_options
                .iter_mut()
                .find(|opt| opt.name == TableFromSourceOptionName::TextColumns)
            {
                match gen_text_columns {
                    Some(gen_text_columns) => {
                        text_cols_option.value = Some(WithOptionValue::Sequence(gen_text_columns))
                    }
                    None => soft_panic_or_log!(
                        "text_columns should be Some if text_cols_option is present"
                    ),
                }
            }
            if let Some(exclude_cols_option) = with_options
                .iter_mut()
                .find(|opt| opt.name == TableFromSourceOptionName::ExcludeColumns)
            {
                match gen_excl_columns {
                    Some(gen_excl_columns) => {
                        exclude_cols_option.value =
                            Some(WithOptionValue::Sequence(gen_excl_columns))
                    }
                    None => soft_panic_or_log!(
                        "excl_columns should be Some if exclude_cols_option is present"
                    ),
                }
            }

            match columns {
                TableFromSourceColumns::NotSpecified => {
                    *columns = TableFromSourceColumns::Defined(gen_columns);
                    *constraints = gen_constraints;
                }
                TableFromSourceColumns::Named(_) => {
                    sql_bail!("columns cannot be named for SQL Server sources")
                }
                TableFromSourceColumns::Defined(_) => unreachable!(),
            }

            with_options.push(TableFromSourceOption {
                name: TableFromSourceOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    gen_details.into_proto().encode_to_vec(),
                )))),
            })
        }
        PurifiedExportDetails::LoadGenerator { .. } => {
            let (desc, output) = match purified_export.details {
                PurifiedExportDetails::LoadGenerator { table, output } => (table, output),
                _ => unreachable!("purified export details must be load generator"),
            };
            if let Some(desc) = desc {
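                // Multi-output load generators provide a relation description
                // for each output; use it to fill in the table's columns and
                // constraints.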
                let (gen_columns, gen_constraints) = scx.relation_desc_into_table_defs(&desc)?;
                match columns {
                    TableFromSourceColumns::Defined(_) => unreachable!(),
                    TableFromSourceColumns::NotSpecified => {
                        *columns = TableFromSourceColumns::Defined(gen_columns);
                        *constraints = gen_constraints;
                    }
                    TableFromSourceColumns::Named(_) => {
                        sql_bail!("columns cannot be named for multi-output load generator sources")
                    }
                }
            }
            let details = SourceExportStatementDetails::LoadGenerator { output };
            with_options.push(TableFromSourceOption {
                name: TableFromSourceOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    details.into_proto().encode_to_vec(),
                )))),
            })
        }
        PurifiedExportDetails::Kafka {} => {
            let details = SourceExportStatementDetails::Kafka {};
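            // Kafka exports currently carry no extra metadata, but we still
            // record a Details option so the statement round-trips like the
            // other source types.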
            with_options.push(TableFromSourceOption {
                name: TableFromSourceOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    details.into_proto().encode_to_vec(),
                )))),
            })
        }
    };

    Ok(PurifiedStatement::PurifiedCreateTableFromSource { stmt })
}

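/// Source-type-specific context needed when purifying a `FORMAT` clause.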
enum SourceFormatOptions {
    Default,
    Kafka { topic: String },
}

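/// Purifies the format specifier of a source statement, resolving any schemas
/// that must be fetched from a Confluent Schema Registry. `KEY`/`VALUE`
/// format specifiers are rejected for non-Kafka sources.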
async fn purify_source_format(
    catalog: &dyn SessionCatalog,
    format: &mut Option<FormatSpecifier<Aug>>,
    options: &SourceFormatOptions,
    envelope: &Option<SourceEnvelope>,
    storage_configuration: &StorageConfiguration,
) -> Result<(), PlanError> {
    if matches!(format, Some(FormatSpecifier::KeyValue { .. }))
        && !matches!(options, SourceFormatOptions::Kafka { .. })
    {
        sql_bail!("Kafka sources are the only source type that can provide KEY/VALUE formats")
    }

    match format.as_mut() {
        None => {}
        Some(FormatSpecifier::Bare(format)) => {
            purify_source_format_single(catalog, format, options, envelope, storage_configuration)
                .await?;
        }

        Some(FormatSpecifier::KeyValue { key, value: val }) => {
            purify_source_format_single(catalog, key, options, envelope, storage_configuration)
                .await?;
            purify_source_format_single(catalog, val, options, envelope, storage_configuration)
                .await?;
        }
    }
    Ok(())
}

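/// Purifies a single bare format. Only Avro and Protobuf formats backed by a
/// Confluent Schema Registry require work; all other formats pass through
/// unchanged.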
async fn purify_source_format_single(
    catalog: &dyn SessionCatalog,
    format: &mut Format<Aug>,
    options: &SourceFormatOptions,
    envelope: &Option<SourceEnvelope>,
    storage_configuration: &StorageConfiguration,
) -> Result<(), PlanError> {
    match format {
        Format::Avro(schema) => match schema {
            AvroSchema::Csr { csr_connection } => {
                purify_csr_connection_avro(
                    catalog,
                    options,
                    csr_connection,
                    envelope,
                    storage_configuration,
                )
                .await?
            }
            AvroSchema::InlineSchema { .. } => {}
        },
        Format::Protobuf(schema) => match schema {
            ProtobufSchema::Csr { csr_connection } => {
                purify_csr_connection_proto(
                    catalog,
                    options,
                    csr_connection,
                    envelope,
                    storage_configuration,
                )
                .await?;
            }
            ProtobufSchema::InlineSchema { .. } => {}
        },
        Format::Bytes
        | Format::Regex(_)
        | Format::Json { .. }
        | Format::Text
        | Format::Csv { .. } => (),
    }
    Ok(())
}

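/// Generates `CREATE SUBSOURCE` statements for the given purified source
/// exports of `source_name`, dispatching on the source type.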
pub fn generate_subsource_statements(
    scx: &StatementContext,
    source_name: ResolvedItemName,
    subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
) -> Result<Vec<CreateSubsourceStatement<Aug>>, PlanError> {
    if subsources.is_empty() {
        return Ok(vec![]);
    }
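    // Every export in the map belongs to the same source, so we can dispatch
    // on the details of the first one.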
    let (_, purified_export) = subsources.iter().next().unwrap();

    let statements = match &purified_export.details {
        PurifiedExportDetails::Postgres { .. } => {
            crate::pure::postgres::generate_create_subsource_statements(
                scx,
                source_name,
                subsources,
            )?
        }
        PurifiedExportDetails::MySql { .. } => {
            crate::pure::mysql::generate_create_subsource_statements(scx, source_name, subsources)?
        }
        PurifiedExportDetails::SqlServer { .. } => {
            crate::pure::sql_server::generate_create_subsource_statements(
                scx,
                source_name,
                subsources,
            )?
        }
        PurifiedExportDetails::LoadGenerator { .. } => {
            let mut subsource_stmts = Vec::with_capacity(subsources.len());
            for (subsource_name, purified_export) in subsources {
                let (desc, output) = match purified_export.details {
                    PurifiedExportDetails::LoadGenerator { table, output } => (table, output),
                    _ => unreachable!("purified export details must be load generator"),
                };
                let desc =
                    desc.expect("subsources cannot be generated for single-output load generators");

                let (columns, table_constraints) = scx.relation_desc_into_table_defs(&desc)?;
                let details = SourceExportStatementDetails::LoadGenerator { output };
                let subsource = CreateSubsourceStatement {
                    name: subsource_name,
                    columns,
                    of_source: Some(source_name.clone()),
                    constraints: table_constraints,
                    if_not_exists: false,
                    with_options: vec![
                        CreateSubsourceOption {
                            name: CreateSubsourceOptionName::ExternalReference,
                            value: Some(WithOptionValue::UnresolvedItemName(
                                purified_export.external_reference,
                            )),
                        },
                        CreateSubsourceOption {
                            name: CreateSubsourceOptionName::Details,
                            value: Some(WithOptionValue::Value(Value::String(hex::encode(
                                details.into_proto().encode_to_vec(),
                            )))),
                        },
                    ],
                };
                subsource_stmts.push(subsource);
            }

            subsource_stmts
        }
        PurifiedExportDetails::Kafka { .. } => {
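            // Kafka sources do not produce data-bearing subsources, and the
            // early return above handles the empty map, so reaching this arm
            // indicates a bug elsewhere in purification.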
            assert!(
                subsources.is_empty(),
                "Kafka sources do not produce data-bearing subsources"
            );
            vec![]
        }
    };
    Ok(statements)
}

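/// Fetches and compiles the Protobuf value (and, if present, key) schemas for
/// the Kafka topic from the Confluent Schema Registry, recording them in the
/// statement's seed if one is not already present.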
async fn purify_csr_connection_proto(
    catalog: &dyn SessionCatalog,
    options: &SourceFormatOptions,
    csr_connection: &mut CsrConnectionProtobuf<Aug>,
    envelope: &Option<SourceEnvelope>,
    storage_configuration: &StorageConfiguration,
) -> Result<(), PlanError> {
    let SourceFormatOptions::Kafka { topic } = options else {
        sql_bail!("Confluent Schema Registry is only supported with Kafka sources")
    };

    let CsrConnectionProtobuf {
        seed,
        connection: CsrConnection {
            connection,
            options: _,
        },
    } = csr_connection;
    match seed {
        None => {
            let scx = StatementContext::new(None, &*catalog);

            let ccsr_connection = match scx.get_item_by_resolved_name(connection)?.connection()? {
                Connection::Csr(connection) => connection.clone().into_inline_connection(catalog),
                _ => sql_bail!("{} is not a schema registry connection", connection),
            };

            let ccsr_client = ccsr_connection
                .connect(storage_configuration, InTask::No)
                .await
                .map_err(|e| CsrPurificationError::ClientError(Arc::new(e)))?;

            let value = compile_proto(&format!("{}-value", topic), &ccsr_client).await?;
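            // The key schema is optional for most envelopes, so failures to
            // fetch one are not fatal on their own; ENVELOPE DEBEZIUM enforces
            // its presence below.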
            let key = compile_proto(&format!("{}-key", topic), &ccsr_client)
                .await
                .ok();

            if matches!(envelope, Some(SourceEnvelope::Debezium)) && key.is_none() {
                sql_bail!("Key schema is required for ENVELOPE DEBEZIUM");
            }

            *seed = Some(CsrSeedProtobuf { value, key });
        }
        Some(_) => (),
    }

    Ok(())
}

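/// Fetches the Avro key and value schemas for the Kafka topic from the
/// Confluent Schema Registry, recording them in the statement's seed if one is
/// not already present.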
async fn purify_csr_connection_avro(
    catalog: &dyn SessionCatalog,
    options: &SourceFormatOptions,
    csr_connection: &mut CsrConnectionAvro<Aug>,
    envelope: &Option<SourceEnvelope>,
    storage_configuration: &StorageConfiguration,
) -> Result<(), PlanError> {
    let SourceFormatOptions::Kafka { topic } = options else {
        sql_bail!("Confluent Schema Registry is only supported with Kafka sources")
    };

    let CsrConnectionAvro {
        connection: CsrConnection { connection, .. },
        seed,
        key_strategy,
        value_strategy,
    } = csr_connection;
    if seed.is_none() {
        let scx = StatementContext::new(None, &*catalog);
        let csr_connection = match scx.get_item_by_resolved_name(connection)?.connection()? {
            Connection::Csr(connection) => connection.clone().into_inline_connection(catalog),
            _ => sql_bail!("{} is not a schema registry connection", connection),
        };
        let ccsr_client = csr_connection
            .connect(storage_configuration, InTask::No)
            .await
            .map_err(|e| CsrPurificationError::ClientError(Arc::new(e)))?;

        let Schema {
            key_schema,
            value_schema,
            key_reference_schemas,
            value_reference_schemas,
        } = get_remote_csr_schema(
            &ccsr_client,
            key_strategy.clone().unwrap_or_default(),
            value_strategy.clone().unwrap_or_default(),
            topic,
        )
        .await?;
        if matches!(envelope, Some(SourceEnvelope::Debezium)) && key_schema.is_none() {
            sql_bail!("Key schema is required for ENVELOPE DEBEZIUM");
        }

        *seed = Some(CsrSeedAvro {
            key_schema,
            value_schema,
            key_reference_schemas,
            value_reference_schemas,
        })
    }

    Ok(())
}

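/// An Avro schema fetched from a Confluent Schema Registry, along with the
/// schemas it references.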
#[derive(Debug)]
pub struct Schema {
    pub key_schema: Option<String>,
    pub value_schema: String,
    pub key_reference_schemas: Vec<String>,
    pub value_reference_schemas: Vec<String>,
}

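/// A raw schema and the raw schemas it references.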
struct SchemaWithReferences {
    schema: String,
    references: Vec<String>,
}

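/// Resolves a schema for `subject` according to `strategy`, returning `None`
/// if the subject or requested version does not exist.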
async fn get_schema_with_strategy(
    client: &Client,
    strategy: ReaderSchemaSelectionStrategy,
    subject: &str,
) -> Result<Option<SchemaWithReferences>, PlanError> {
    match strategy {
        ReaderSchemaSelectionStrategy::Latest => {
            match client.get_subject_and_references(subject).await {
                Ok((primary, dependencies)) => Ok(Some(SchemaWithReferences {
                    schema: primary.schema.raw,
                    references: dependencies.into_iter().map(|s| s.schema.raw).collect(),
                })),
                Err(GetBySubjectError::SubjectNotFound)
                | Err(GetBySubjectError::VersionNotFound(_)) => Ok(None),
                Err(e) => Err(PlanError::FetchingCsrSchemaFailed {
                    schema_lookup: format!("subject {}", subject.quoted()),
                    cause: Arc::new(e),
                }),
            }
        }
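        // An inline schema was supplied directly in the statement; it has no
        // registry-resolved references.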
        ReaderSchemaSelectionStrategy::Inline(raw) => Ok(Some(SchemaWithReferences {
            schema: raw,
            references: vec![],
        })),
        ReaderSchemaSelectionStrategy::ById(id) => {
            match client.get_subject_and_references_by_id(id).await {
                Ok((primary, dependencies)) => Ok(Some(SchemaWithReferences {
                    schema: primary.schema.raw,
                    references: dependencies.into_iter().map(|s| s.schema.raw).collect(),
                })),
                Err(GetBySubjectError::SubjectNotFound)
                | Err(GetBySubjectError::VersionNotFound(_)) => Ok(None),
                Err(e) => Err(PlanError::FetchingCsrSchemaFailed {
                    schema_lookup: format!("subject {}", subject.quoted()),
                    cause: Arc::new(e),
                }),
            }
        }
    }
}

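/// Fetches the key and value schemas for `topic` from the schema registry.
/// The value schema is required; the key schema is optional.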
async fn get_remote_csr_schema(
    ccsr_client: &mz_ccsr::Client,
    key_strategy: ReaderSchemaSelectionStrategy,
    value_strategy: ReaderSchemaSelectionStrategy,
    topic: &str,
) -> Result<Schema, PlanError> {
    let value_schema_name = format!("{}-value", topic);
    let value_result =
        get_schema_with_strategy(ccsr_client, value_strategy, &value_schema_name).await?;
    let value_result = value_result.ok_or_else(|| anyhow!("No value schema found"))?;

    let key_subject = format!("{}-key", topic);
    let key_result = get_schema_with_strategy(ccsr_client, key_strategy, &key_subject).await?;
    Ok(Schema {
        key_schema: key_result.as_ref().map(|r| r.schema.clone()),
        value_schema: value_result.schema,
        key_reference_schemas: key_result.map(|r| r.references).unwrap_or_default(),
        value_reference_schemas: value_result.references,
    })
}

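/// Fetches the Protobuf schema for `subject_name` (and its dependencies) from
/// the schema registry and compiles it into a serialized file descriptor set,
/// along with the name of the schema's single message type.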
async fn compile_proto(
    subject_name: &str,
    ccsr_client: &Client,
) -> Result<CsrSeedProtobufSchema, PlanError> {
    let (primary_subject, dependency_subjects) = ccsr_client
        .get_subject_and_references(subject_name)
        .await
        .map_err(|e| PlanError::FetchingCsrSchemaFailed {
            schema_lookup: format!("subject {}", subject_name.quoted()),
            cause: Arc::new(e),
        })?;

    let mut source_tree = VirtualSourceTree::new();

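    // Make the protobuf well-known types available to the schemas we are
    // about to add.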
    source_tree.as_mut().map_well_known_types();

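    // Register the primary schema and all of its dependencies as files in the
    // virtual source tree so that imports between them resolve.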
    for subject in iter::once(&primary_subject).chain(dependency_subjects.iter()) {
        source_tree.as_mut().add_file(
            Path::new(&subject.name),
            subject.schema.raw.as_bytes().to_vec(),
        );
    }
    let mut db = SourceTreeDescriptorDatabase::new(source_tree.as_mut());
    let fds = db
        .as_mut()
        .build_file_descriptor_set(&[Path::new(&primary_subject.name)])
        .map_err(|cause| PlanError::InvalidProtobufSchema { cause })?;

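    // We only support schemas that define exactly one message type, so that
    // the choice of message is unambiguous.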
    let primary_fd = fds.file(0);
    let message_name = match primary_fd.message_type_size() {
        1 => String::from_utf8_lossy(primary_fd.message_type(0).name()).into_owned(),
        0 => bail_unsupported!(29603, "Protobuf schemas with no messages"),
        _ => bail_unsupported!(29603, "Protobuf schemas with multiple messages"),
    };

    let bytes = &fds
        .serialize()
        .map_err(|cause| PlanError::InvalidProtobufSchema { cause })?;
    let mut schema = String::new();
    strconv::format_bytes(&mut schema, bytes);

    Ok(CsrSeedProtobufSchema {
        schema,
        message_name,
    })
}

const MZ_NOW_NAME: &str = "mz_now";
const MZ_NOW_SCHEMA: &str = "mz_catalog";

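/// Purifies the `REFRESH` options of a `CREATE MATERIALIZED VIEW` statement:
/// - `REFRESH AT CREATION` becomes `REFRESH AT mz_now()`;
/// - a `REFRESH EVERY` without `ALIGNED TO` is aligned to `mz_now()`;
/// - every `mz_now()` call in a refresh option is then replaced by `mz_now`,
///   the timestamp chosen for this purification, cast to `mz_timestamp`;
/// - if no `REFRESH` option is present, the default `REFRESH ON COMMIT` is
///   written out explicitly.
///
/// `resolved_ids` is adjusted to reflect any introduced references to the
/// `mz_timestamp` type and any eliminated references to `mz_now`.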
pub fn purify_create_materialized_view_options(
    catalog: impl SessionCatalog,
    mz_now: Option<Timestamp>,
    cmvs: &mut CreateMaterializedViewStatement<Aug>,
    resolved_ids: &mut ResolvedIds,
) {
    let (mz_now_id, mz_now_expr) = {
        let item = catalog
            .resolve_function(&PartialItemName {
                database: None,
                schema: Some(MZ_NOW_SCHEMA.to_string()),
                item: MZ_NOW_NAME.to_string(),
            })
            .expect("we should be able to resolve mz_now");
        (
            item.id(),
            Expr::Function(Function {
                name: ResolvedItemName::Item {
                    id: item.id(),
                    qualifiers: item.name().qualifiers.clone(),
                    full_name: catalog.resolve_full_name(item.name()),
                    print_id: false,
                    version: RelationVersionSelector::Latest,
                },
                args: FunctionArgs::Args {
                    args: Vec::new(),
                    order_by: Vec::new(),
                },
                filter: None,
                over: None,
                distinct: false,
            }),
        )
    };
    let (mz_timestamp_id, mz_timestamp_type) = {
        let item = catalog.get_system_type("mz_timestamp");
        let full_name = catalog.resolve_full_name(item.name());
        (
            item.id(),
            ResolvedDataType::Named {
                id: item.id(),
                qualifiers: item.name().qualifiers.clone(),
                full_name,
                modifiers: vec![],
                print_id: true,
            },
        )
    };

    let mut introduced_mz_timestamp = false;

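    // Desugar AT CREATION and missing ALIGNED TO into mz_now(), then purify
    // each refresh option in place.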
    for option in cmvs.with_options.iter_mut() {
        if matches!(
            option.value,
            Some(WithOptionValue::Refresh(RefreshOptionValue::AtCreation))
        ) {
            option.value = Some(WithOptionValue::Refresh(RefreshOptionValue::At(
                RefreshAtOptionValue {
                    time: mz_now_expr.clone(),
                },
            )));
        }

        if let Some(WithOptionValue::Refresh(RefreshOptionValue::Every(
            RefreshEveryOptionValue { aligned_to, .. },
        ))) = &mut option.value
        {
            if aligned_to.is_none() {
                *aligned_to = Some(mz_now_expr.clone());
            }
        }

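        // Replace any mz_now() calls in refresh expressions with the chosen
        // timestamp.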
        match &mut option.value {
            Some(WithOptionValue::Refresh(RefreshOptionValue::At(RefreshAtOptionValue {
                time,
            }))) => {
                let mut visitor = MzNowPurifierVisitor::new(mz_now, mz_timestamp_type.clone());
                visitor.visit_expr_mut(time);
                introduced_mz_timestamp |= visitor.introduced_mz_timestamp;
            }
            Some(WithOptionValue::Refresh(RefreshOptionValue::Every(
                RefreshEveryOptionValue {
                    interval: _,
                    aligned_to: Some(aligned_to),
                },
            ))) => {
                let mut visitor = MzNowPurifierVisitor::new(mz_now, mz_timestamp_type.clone());
                visitor.visit_expr_mut(aligned_to);
                introduced_mz_timestamp |= visitor.introduced_mz_timestamp;
            }
            _ => {}
        }
    }

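    // If the statement has no explicit REFRESH option, write out the default
    // REFRESH ON COMMIT.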
    if !cmvs.with_options.iter().any(|o| {
        matches!(
            o,
            MaterializedViewOption {
                value: Some(WithOptionValue::Refresh(..)),
                ..
            }
        )
    }) {
        cmvs.with_options.push(MaterializedViewOption {
            name: MaterializedViewOptionName::Refresh,
            value: Some(WithOptionValue::Refresh(RefreshOptionValue::OnCommit)),
        })
    }

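    // Any introduced cast added a reference to the mz_timestamp type, which
    // must be reflected in the statement's resolved ids.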
    if introduced_mz_timestamp {
        resolved_ids.add_item(mz_timestamp_id);
    }
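    // If the rewrites above eliminated every mz_now() call, the statement no
    // longer depends on mz_now.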
    let mut visitor = ExprContainsTemporalVisitor::new();
    visitor.visit_create_materialized_view_statement(cmvs);
    if !visitor.contains_temporal {
        resolved_ids.remove_item(&mz_now_id);
    }
}

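/// Returns whether the given materialized view option involves `mz_now()`,
/// either directly or after the desugaring performed by
/// `purify_create_materialized_view_options`.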
pub fn materialized_view_option_contains_temporal(mvo: &MaterializedViewOption<Aug>) -> bool {
    match &mvo.value {
        Some(WithOptionValue::Refresh(RefreshOptionValue::At(RefreshAtOptionValue { time }))) => {
            let mut visitor = ExprContainsTemporalVisitor::new();
            visitor.visit_expr(time);
            visitor.contains_temporal
        }
        Some(WithOptionValue::Refresh(RefreshOptionValue::Every(RefreshEveryOptionValue {
            interval: _,
            aligned_to: Some(aligned_to),
        }))) => {
            let mut visitor = ExprContainsTemporalVisitor::new();
            visitor.visit_expr(aligned_to);
            visitor.contains_temporal
        }
        Some(WithOptionValue::Refresh(RefreshOptionValue::Every(RefreshEveryOptionValue {
            interval: _,
            aligned_to: None,
        }))) => {
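            // A missing ALIGNED TO will be desugared to mz_now() during
            // purification.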
            true
        }
        Some(WithOptionValue::Refresh(RefreshOptionValue::AtCreation)) => {
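            // REFRESH AT CREATION will be desugared to mz_now() during
            // purification.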
            true
        }
        _ => false,
    }
}

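/// Reports whether a visited AST fragment contains a call to `mz_now()`.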
struct ExprContainsTemporalVisitor {
    pub contains_temporal: bool,
}

impl ExprContainsTemporalVisitor {
    pub fn new() -> ExprContainsTemporalVisitor {
        ExprContainsTemporalVisitor {
            contains_temporal: false,
        }
    }
}

impl Visit<'_, Aug> for ExprContainsTemporalVisitor {
    fn visit_function(&mut self, func: &Function<Aug>) {
        self.contains_temporal |= func.name.full_item_name().item == MZ_NOW_NAME;
        visit_function(self, func);
    }
}

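/// Replaces `mz_now()` calls with the timestamp chosen during purification
/// (`mz_now`), cast to `mz_timestamp`, recording whether any such cast was
/// introduced.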
struct MzNowPurifierVisitor {
    pub mz_now: Option<Timestamp>,
    pub mz_timestamp_type: ResolvedDataType,
    pub introduced_mz_timestamp: bool,
}

impl MzNowPurifierVisitor {
    pub fn new(
        mz_now: Option<Timestamp>,
        mz_timestamp_type: ResolvedDataType,
    ) -> MzNowPurifierVisitor {
        MzNowPurifierVisitor {
            mz_now,
            mz_timestamp_type,
            introduced_mz_timestamp: false,
        }
    }
}

impl VisitMut<'_, Aug> for MzNowPurifierVisitor {
    fn visit_expr_mut(&mut self, expr: &'_ mut Expr<Aug>) {
        match expr {
            Expr::Function(Function {
                name:
                    ResolvedItemName::Item {
                        full_name: FullItemName { item, .. },
                        ..
                    },
                ..
            }) if item.as_str() == MZ_NOW_NAME => {
                let mz_now = self.mz_now.expect(
                    "we should have chosen a timestamp if the expression contains mz_now()",
                );
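                // Replace the mz_now() call with the chosen timestamp, cast to
                // mz_timestamp (mz_now()'s return type) so the enclosing
                // expression's type is unchanged.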
                *expr = Expr::Cast {
                    expr: Box::new(Expr::Value(Value::Number(mz_now.to_string()))),
                    data_type: self.mz_timestamp_type.clone(),
                };
                self.introduced_mz_timestamp = true;
            }
            _ => visit_expr_mut(self, expr),
        }
    }
}