1use std::collections::{BTreeMap, BTreeSet};
15use std::fmt;
16use std::iter;
17use std::path::Path;
18use std::sync::Arc;
19
20use anyhow::anyhow;
21use itertools::Itertools;
22use mz_adapter_types::dyncfgs::ENABLE_S3_TABLES_REGION_CHECK;
23use mz_ccsr::{Client, GetBySubjectError};
24use mz_cloud_provider::CloudProvider;
25use mz_controller_types::ClusterId;
26use mz_kafka_util::client::MzClientContext;
27use mz_mysql_util::MySqlTableDesc;
28use mz_ore::collections::CollectionExt;
29use mz_ore::error::ErrorExt;
30use mz_ore::future::InTask;
31use mz_ore::iter::IteratorExt;
32use mz_ore::str::StrExt;
33use mz_postgres_util::desc::PostgresTableDesc;
34use mz_proto::RustType;
35use mz_repr::{CatalogItemId, RelationDesc, RelationVersionSelector, Timestamp, strconv};
36use mz_sql_parser::ast::display::AstDisplay;
37use mz_sql_parser::ast::visit::{Visit, visit_function};
38use mz_sql_parser::ast::visit_mut::{VisitMut, visit_expr_mut};
39use mz_sql_parser::ast::{
40 AlterSourceAction, AlterSourceAddSubsourceOptionName, AlterSourceStatement, AvroDocOn,
41 ColumnName, CreateMaterializedViewStatement, CreateSinkConnection, CreateSinkOptionName,
42 CreateSinkStatement, CreateSourceOptionName, CreateSubsourceOption, CreateSubsourceOptionName,
43 CreateTableFromSourceStatement, CsrConfigOption, CsrConfigOptionName, CsrConnection,
44 CsrSeedAvro, CsrSeedProtobuf, CsrSeedProtobufSchema, DeferredItemName, DocOnIdentifier,
45 DocOnSchema, Expr, Function, FunctionArgs, Ident, KafkaSourceConfigOption,
46 KafkaSourceConfigOptionName, LoadGenerator, LoadGeneratorOption, LoadGeneratorOptionName,
47 MaterializedViewOption, MaterializedViewOptionName, MySqlConfigOption, MySqlConfigOptionName,
48 PgConfigOption, PgConfigOptionName, RawItemName, ReaderSchemaSelectionStrategy,
49 RefreshAtOptionValue, RefreshEveryOptionValue, RefreshOptionValue, SourceEnvelope,
50 SqlServerConfigOption, SqlServerConfigOptionName, Statement, TableFromSourceColumns,
51 TableFromSourceOption, TableFromSourceOptionName, UnresolvedItemName,
52};
53use mz_sql_server_util::desc::SqlServerTableDesc;
54use mz_storage_types::configuration::StorageConfiguration;
55use mz_storage_types::connections::Connection;
56use mz_storage_types::connections::inline::IntoInlineConnection;
57use mz_storage_types::errors::ContextCreationError;
58use mz_storage_types::sources::load_generator::LoadGeneratorOutput;
59use mz_storage_types::sources::mysql::MySqlSourceDetails;
60use mz_storage_types::sources::postgres::PostgresSourcePublicationDetails;
61use mz_storage_types::sources::{
62 GenericSourceConnection, SourceDesc, SourceExportStatementDetails, SqlServerSourceExtras,
63};
64use prost::Message;
65use protobuf_native::MessageLite;
66use protobuf_native::compiler::{SourceTreeDescriptorDatabase, VirtualSourceTree};
67use rdkafka::admin::AdminClient;
68use references::{RetrievedSourceReferences, SourceReferenceClient};
69use uuid::Uuid;
70
71use crate::ast::{
72 AlterSourceAddSubsourceOption, AvroSchema, CreateSourceConnection, CreateSourceStatement,
73 CreateSubsourceStatement, CsrConnectionAvro, CsrConnectionProtobuf, ExternalReferenceExport,
74 ExternalReferences, Format, FormatSpecifier, ProtobufSchema, Value, WithOptionValue,
75};
76use crate::catalog::{CatalogItemType, SessionCatalog};
77use crate::kafka_util::{KafkaSinkConfigOptionExtracted, KafkaSourceConfigOptionExtracted};
78use crate::names::{
79 Aug, FullItemName, PartialItemName, ResolvedColumnReference, ResolvedDataType, ResolvedIds,
80 ResolvedItemName,
81};
82use crate::plan::error::PlanError;
83use crate::plan::statement::ddl::load_generator_ast_to_generator;
84use crate::plan::{SourceReferences, StatementContext};
85use crate::pure::error::{IcebergSinkPurificationError, SqlServerSourcePurificationError};
86use crate::pure::mysql::{ensure_binlog_full_metadata, is_binlog_full_metadata};
87use crate::{kafka_util, normalize};
88
89use self::error::{
90 CsrPurificationError, KafkaSinkPurificationError, KafkaSourcePurificationError,
91 LoadGeneratorSourcePurificationError, MySqlSourcePurificationError, PgSourcePurificationError,
92};
93
94pub(crate) mod error;
95mod references;
96
97pub mod mysql;
98pub mod postgres;
99pub mod sql_server;
100
/// A source export requested by a statement, pairing an upstream object with
/// the name of the object that will be created for it.
///
/// `meta` carries caller-specific data whose type varies by purification
/// stage; [`RequestedSourceExport::change_meta`] swaps it for a new value.
pub(crate) struct RequestedSourceExport<T> {
    /// The upstream object this export reads from.
    external_reference: UnresolvedItemName,
    /// The target name for this export (checked for uniqueness by
    /// `validate_source_export_names`).
    name: UnresolvedItemName,
    /// Caller-specific metadata attached to this export.
    meta: T,
}
106
107impl<T: fmt::Debug> fmt::Debug for RequestedSourceExport<T> {
108 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
109 f.debug_struct("RequestedSourceExport")
110 .field("external_reference", &self.external_reference)
111 .field("name", &self.name)
112 .field("meta", &self.meta)
113 .finish()
114 }
115}
116
117impl<T> RequestedSourceExport<T> {
118 fn change_meta<F>(self, new_meta: F) -> RequestedSourceExport<F> {
119 RequestedSourceExport {
120 external_reference: self.external_reference,
121 name: self.name,
122 meta: new_meta,
123 }
124 }
125}
126
127fn source_export_name_gen(
132 source_name: &UnresolvedItemName,
133 subsource_name: &str,
134) -> Result<UnresolvedItemName, PlanError> {
135 let mut partial = normalize::unresolved_item_name(source_name.clone())?;
136 partial.item = subsource_name.to_string();
137 Ok(UnresolvedItemName::from(partial))
138}
139
140fn validate_source_export_names<T>(
144 requested_source_exports: &[RequestedSourceExport<T>],
145) -> Result<(), PlanError> {
146 if let Some(name) = requested_source_exports
150 .iter()
151 .map(|subsource| &subsource.name)
152 .duplicates()
153 .next()
154 .cloned()
155 {
156 let mut upstream_references: Vec<_> = requested_source_exports
157 .into_iter()
158 .filter_map(|subsource| {
159 if &subsource.name == &name {
160 Some(subsource.external_reference.clone())
161 } else {
162 None
163 }
164 })
165 .collect();
166
167 upstream_references.sort();
168
169 Err(PlanError::SubsourceNameConflict {
170 name,
171 upstream_references,
172 })?;
173 }
174
175 if let Some(name) = requested_source_exports
181 .iter()
182 .map(|export| &export.external_reference)
183 .duplicates()
184 .next()
185 .cloned()
186 {
187 let mut target_names: Vec<_> = requested_source_exports
188 .into_iter()
189 .filter_map(|export| {
190 if &export.external_reference == &name {
191 Some(export.name.clone())
192 } else {
193 None
194 }
195 })
196 .collect();
197
198 target_names.sort();
199
200 Err(PlanError::SubsourceDuplicateReference { name, target_names })?;
201 }
202
203 Ok(())
204}
205
/// The output of [`purify_statement`]: a rewritten statement plus, for some
/// variants, auxiliary data collected while purifying it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PurifiedStatement {
    /// A purified `CREATE SOURCE` statement.
    PurifiedCreateSource {
        /// NOTE(review): presumably the statement that creates the source's
        /// progress subsource, when one is needed — confirm against the producer.
        create_progress_subsource_stmt: Option<CreateSubsourceStatement<Aug>>,
        /// The rewritten `CREATE SOURCE` statement.
        create_source_stmt: CreateSourceStatement<Aug>,
        /// Purified exports; presumably keyed by the name of the object to be
        /// created for each export (the upstream object is stored in the value).
        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
        /// The upstream references available to this source.
        available_source_references: SourceReferences,
    },
    /// A purified `ALTER SOURCE` statement that carries no extra purified data.
    PurifiedAlterSource {
        alter_source_stmt: AlterSourceStatement<Aug>,
    },
    /// A purified `ALTER SOURCE ... ADD SUBSOURCE` request.
    PurifiedAlterSourceAddSubsources {
        /// The source being altered.
        source_name: ResolvedItemName,
        /// Options attached to the add-subsource action.
        options: Vec<AlterSourceAddSubsourceOption<Aug>>,
        /// Purified exports to add, keyed like `PurifiedCreateSource::subsources`.
        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
    },
    /// A request to refresh the upstream references known for a source.
    PurifiedAlterSourceRefreshReferences {
        /// The source whose references are refreshed.
        source_name: ResolvedItemName,
        /// The upstream references retrieved during purification.
        available_source_references: SourceReferences,
    },
    /// A purified `CREATE SINK` statement (see `purify_create_sink`).
    PurifiedCreateSink(CreateSinkStatement<Aug>),
    /// A purified `CREATE TABLE ... FROM SOURCE` statement.
    PurifiedCreateTableFromSource {
        stmt: CreateTableFromSourceStatement<Aug>,
    },
}
240
/// A single source export after purification: the upstream object it reads
/// from, plus source-type-specific details.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PurifiedSourceExport {
    /// The upstream object this export reads from.
    pub external_reference: UnresolvedItemName,
    /// Details gathered for this export, by source type.
    pub details: PurifiedExportDetails,
}
246
/// Source-type-specific details for a single purified export.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PurifiedExportDetails {
    /// An export of a MySQL table.
    MySql {
        /// Description of the upstream table.
        table: MySqlTableDesc,
        /// Columns selected by the source's `TextColumns` option for this table, if any.
        text_columns: Option<Vec<Ident>>,
        /// Columns selected by the source's `ExcludeColumns` option for this table, if any.
        exclude_columns: Option<Vec<Ident>>,
        /// The server's `global.gtid_executed` value, read during purification.
        initial_gtid_set: String,
        /// Whether the server was found to use full binlog metadata
        /// (as reported by `is_binlog_full_metadata`).
        binlog_full_metadata: bool,
    },
    /// An export of a PostgreSQL table.
    Postgres {
        /// Description of the upstream table.
        table: PostgresTableDesc,
        /// Columns selected by the source's `TextColumns` option for this table, if any.
        text_columns: Option<Vec<Ident>>,
        /// Columns selected by the source's `ExcludeColumns` option for this table, if any.
        exclude_columns: Option<Vec<Ident>>,
    },
    /// An export of a SQL Server table.
    SqlServer {
        /// Description of the upstream table.
        table: SqlServerTableDesc,
        /// Columns selected by the source's `TextColumns` option for this table, if any.
        text_columns: Option<Vec<Ident>>,
        /// Columns selected by the source's `ExcludeColumns` option for this table, if any.
        // NOTE(review): named `excl_columns` unlike the other variants'
        // `exclude_columns`; renaming would break external callers.
        excl_columns: Option<Vec<Ident>>,
        /// NOTE(review): presumably the CDC capture instance used to read
        /// `table` — confirm.
        capture_instance: Arc<str>,
        /// NOTE(review): presumably the LSN from which reading starts — confirm.
        initial_lsn: mz_sql_server_util::cdc::Lsn,
    },
    /// A Kafka export; carries no additional details.
    Kafka {},
    /// An export of one load generator output.
    LoadGenerator {
        /// The relation description of this output, if the reference provides one.
        table: Option<RelationDesc>,
        /// Which load generator output this export reads.
        output: LoadGeneratorOutput,
    },
}
274
275pub async fn purify_statement(
285 catalog: impl SessionCatalog,
286 now: u64,
287 stmt: Statement<Aug>,
288 storage_configuration: &StorageConfiguration,
289) -> (Result<PurifiedStatement, PlanError>, Option<ClusterId>) {
290 match stmt {
291 Statement::CreateSource(stmt) => {
292 let cluster_id = stmt.in_cluster.as_ref().map(|cluster| cluster.id.clone());
293 (
294 purify_create_source(catalog, now, stmt, storage_configuration).await,
295 cluster_id,
296 )
297 }
298 Statement::AlterSource(stmt) => (
299 purify_alter_source(catalog, stmt, storage_configuration).await,
300 None,
301 ),
302 Statement::CreateSink(stmt) => {
303 let cluster_id = stmt.in_cluster.as_ref().map(|cluster| cluster.id.clone());
304 (
305 purify_create_sink(catalog, stmt, storage_configuration).await,
306 cluster_id,
307 )
308 }
309 Statement::CreateTableFromSource(stmt) => (
310 purify_create_table_from_source(catalog, stmt, storage_configuration).await,
311 None,
312 ),
313 o => (
314 Err(internal_err!(
315 "unexpected statement type in purification: {:?}",
316 o
317 )),
318 None,
319 ),
320 }
321}
322
/// Adds Avro doc options derived from catalog comments to every
/// schema-registry-backed Avro format in `format`.
///
/// The set of objects considered is everything `from_id` references plus
/// `from_id` itself. For each such object that has comments:
/// - its root comment (the `None` key of its comments map) becomes a
///   `DocOnIdentifier::Type` option;
/// - each column comment (keyed by the column's 1-based position) becomes a
///   `DocOnIdentifier::Column` option, whose `index` is the 0-based position.
///
/// An option is only added if the user did not already supply one with the
/// same `AvroDocOn` key. Inline-schema Avro, Protobuf, and the other formats
/// are left untouched.
///
/// The visible body always returns `Ok(())`.
pub(crate) fn purify_create_sink_avro_doc_on_options(
    catalog: &dyn SessionCatalog,
    from_id: CatalogItemId,
    format: &mut Option<FormatSpecifier<Aug>>,
) -> Result<(), PlanError> {
    // The sink's input item, plus every item it references.
    let from = catalog.get_item(&from_id);
    let object_ids = from
        .references()
        .items()
        .copied()
        .chain_one(from.id())
        .collect::<Vec<_>>();

    // Collect mutable handles to the CSR option lists of each Avro-over-CSR
    // format, tagged with which schema (key, value, or both) they apply to.
    let mut avro_format_options = vec![];
    for_each_format(format, |doc_on_schema, fmt| match fmt {
        Format::Avro(AvroSchema::InlineSchema { .. })
        | Format::Bytes
        | Format::Csv { .. }
        | Format::Json { .. }
        | Format::Protobuf(..)
        | Format::Regex(..)
        | Format::Text => (),
        Format::Avro(AvroSchema::Csr {
            csr_connection: CsrConnectionAvro { connection, .. },
        }) => {
            avro_format_options.push((doc_on_schema, &mut connection.options));
        }
    });

    for (for_schema, options) in avro_format_options {
        // Doc options the user wrote explicitly; these take precedence over
        // catalog comments.
        let user_provided_comments = options
            .iter()
            .filter_map(|CsrConfigOption { name, .. }| match name {
                CsrConfigOptionName::AvroDocOn(doc_on) => Some(doc_on.clone()),
                _ => None,
            })
            .collect::<BTreeSet<_>>();

        for object_id in &object_ids {
            // Use the latest version of each item when resolving its name and columns.
            let item = catalog
                .get_item(object_id)
                .at_version(RelationVersionSelector::Latest);
            let full_resolved_name = ResolvedItemName::Item {
                id: *object_id,
                qualifiers: item.name().qualifiers.clone(),
                full_name: catalog.resolve_full_name(item.name()),
                print_id: !matches!(
                    item.item_type(),
                    CatalogItemType::Func | CatalogItemType::Type
                ),
                version: RelationVersionSelector::Latest,
            };

            if let Some(comments_map) = catalog.get_item_comments(object_id) {
                // Object-level comment is stored under the `None` key.
                let doc_on_item_key = AvroDocOn {
                    identifier: DocOnIdentifier::Type(full_resolved_name.clone()),
                    for_schema,
                };
                if !user_provided_comments.contains(&doc_on_item_key) {
                    if let Some(root_comment) = comments_map.get(&None) {
                        options.push(CsrConfigOption {
                            name: CsrConfigOptionName::AvroDocOn(doc_on_item_key),
                            value: Some(mz_sql_parser::ast::WithOptionValue::Value(Value::String(
                                root_comment.clone(),
                            ))),
                        });
                    }
                }

                // Column names come from the type's description for types,
                // otherwise from the item's relation description. If `desc`
                // yields nothing, `unwrap_or_default` gives `None` and no
                // column docs are added.
                let column_descs = match item.type_details() {
                    Some(details) => details.typ.desc(catalog).unwrap_or_default(),
                    None => item.relation_desc().map(|d| d.into_owned()),
                };

                if let Some(desc) = column_descs {
                    for (pos, column_name) in desc.iter_names().enumerate() {
                        // Column comments are keyed by 1-based position,
                        // while the column reference uses a 0-based index.
                        if let Some(comment_str) = comments_map.get(&Some(pos + 1)) {
                            let doc_on_column_key = AvroDocOn {
                                identifier: DocOnIdentifier::Column(ColumnName {
                                    relation: full_resolved_name.clone(),
                                    column: ResolvedColumnReference::Column {
                                        name: column_name.to_owned(),
                                        index: pos,
                                    },
                                }),
                                for_schema,
                            };
                            if !user_provided_comments.contains(&doc_on_column_key) {
                                options.push(CsrConfigOption {
                                    name: CsrConfigOptionName::AvroDocOn(doc_on_column_key),
                                    value: Some(mz_sql_parser::ast::WithOptionValue::Value(
                                        Value::String(comment_str.clone()),
                                    )),
                                });
                            }
                        }
                    }
                }
            }
        }
    }

    Ok(())
}
443
/// Purifies a `CREATE SINK` statement.
///
/// Steps, in order:
/// 1. Rejects any `WITH` option other than `CreateSinkOptionName::Snapshot`
///    and `CreateSinkOptionName::CommitInterval`.
/// 2. Validates the sink connection against the external system:
///    - Kafka: resolves the connection and rejects the legacy-IDs option.
///      It then builds an admin client and fails if the fetched cluster
///      metadata reports zero brokers.
///    - Iceberg: resolves the catalog connection. When the
///      `ENABLE_S3_TABLES_REGION_CHECK` dyncfg is on, the environment runs on
///      AWS, and the catalog is S3 Tables, it checks that the catalog's region
///      matches the environment's region. It then loads the SDK config of the
///      optional AWS connection and connects to the catalog.
/// 3. Connects to every schema registry connection used by the format and
///    lists its subjects.
/// 4. Fills in Avro doc options from catalog comments via
///    `purify_create_sink_avro_doc_on_options`.
///
/// Returns the (possibly modified) statement as
/// `PurifiedStatement::PurifiedCreateSink`.
async fn purify_create_sink(
    catalog: impl SessionCatalog,
    mut create_sink_stmt: CreateSinkStatement<Aug>,
    storage_configuration: &StorageConfiguration,
) -> Result<PurifiedStatement, PlanError> {
    // Borrow the fields purification may read or rewrite; the rest are
    // listed explicitly so adding a field forces this site to be revisited.
    let CreateSinkStatement {
        connection,
        format,
        with_options,
        name: _,
        in_cluster: _,
        if_not_exists: _,
        from,
        envelope: _,
        mode: _,
    } = &mut create_sink_stmt;

    // The only `WITH` options a user may set directly on a sink.
    const USER_ALLOWED_WITH_OPTIONS: &[CreateSinkOptionName] = &[
        CreateSinkOptionName::Snapshot,
        CreateSinkOptionName::CommitInterval,
    ];

    if let Some(op) = with_options
        .iter()
        .find(|op| !USER_ALLOWED_WITH_OPTIONS.contains(&op.name))
    {
        sql_bail!(
            "CREATE SINK...WITH ({}..) is not allowed",
            op.name.to_ast_string_simple(),
        )
    }

    match &connection {
        CreateSinkConnection::Kafka {
            connection,
            options,
            key: _,
            headers: _,
        } => {
            let scx = StatementContext::new(None, &catalog);
            // Resolve the named catalog item and require it to be a Kafka connection.
            let connection = {
                let item = scx.get_item_by_resolved_name(connection)?;
                match item.connection()? {
                    Connection::Kafka(connection) => {
                        connection.clone().into_inline_connection(scx.catalog)
                    }
                    _ => sql_bail!(
                        "{} is not a kafka connection",
                        scx.catalog.resolve_full_name(item.name())
                    ),
                }
            };

            let extracted_options: KafkaSinkConfigOptionExtracted = options.clone().try_into()?;

            if extracted_options.legacy_ids == Some(true) {
                sql_bail!("LEGACY IDs option is not supported");
            }

            // Validate connectivity: build an admin client and fetch metadata.
            let client: AdminClient<_> = connection
                .create_with_context(
                    storage_configuration,
                    MzClientContext::default(),
                    &BTreeMap::new(),
                    InTask::No,
                )
                .await
                .map_err(|e| {
                    KafkaSinkPurificationError::AdminClientError(Arc::new(e))
                })?;

            // NOTE(review): `fetch_metadata` appears to be a synchronous
            // rdkafka call made from this async fn — confirm blocking here is
            // acceptable.
            let metadata = client
                .inner()
                .fetch_metadata(
                    None,
                    storage_configuration
                        .parameters
                        .kafka_timeout_config
                        .fetch_metadata_timeout,
                )
                .map_err(|e| {
                    KafkaSinkPurificationError::AdminClientError(Arc::new(
                        ContextCreationError::KafkaError(e),
                    ))
                })?;

            // A cluster reporting no brokers cannot accept writes.
            if metadata.brokers().len() == 0 {
                Err(KafkaSinkPurificationError::ZeroBrokers)?;
            }
        }
        CreateSinkConnection::Iceberg {
            catalog_connection,
            aws_connection,
            ..
        } => {
            let scx = StatementContext::new(None, &catalog);
            // Resolve the named catalog item and require it to be an Iceberg catalog connection.
            let connection = {
                let item = scx.get_item_by_resolved_name(catalog_connection)?;
                match item.connection()? {
                    Connection::IcebergCatalog(connection) => {
                        connection.clone().into_inline_connection(scx.catalog)
                    }
                    _ => sql_bail!(
                        "{} is not an iceberg connection",
                        scx.catalog.resolve_full_name(item.name())
                    ),
                }
            };

            // For S3 Tables catalogs, optionally require the catalog's region
            // to match the environment's region (AWS environments only).
            if let Some(s3tables) = connection.s3tables_catalog() {
                let enable_region_check =
                    ENABLE_S3_TABLES_REGION_CHECK.get(scx.catalog.system_vars().dyncfgs());
                if enable_region_check {
                    let env_id = &catalog.config().environment_id;
                    if matches!(env_id.cloud_provider(), CloudProvider::Aws) {
                        let env_region = env_id.cloud_provider_region();
                        // A connection without an explicit region is treated as "us-east-1".
                        let s3_tables_region = s3tables
                            .aws_connection
                            .connection
                            .region
                            .clone()
                            .unwrap_or_else(|| "us-east-1".to_string());
                        if s3_tables_region != env_region {
                            Err(IcebergSinkPurificationError::S3TablesRegionMismatch {
                                s3_tables_region,
                                environment_region: env_region.to_string(),
                            })?;
                        }
                    }
                }
            }

            // If an AWS connection is given, check that it names an AWS
            // connection and that its SDK config loads; the config itself is
            // discarded.
            if let Some(aws_connection) = aws_connection {
                let aws_conn_id = aws_connection.item_id();
                let aws_connection = {
                    let item = scx.get_item_by_resolved_name(aws_connection)?;
                    match item.connection()? {
                        Connection::Aws(aws_connection) => aws_connection.clone(),
                        _ => sql_bail!(
                            "{} is not an aws connection",
                            scx.catalog.resolve_full_name(item.name())
                        ),
                    }
                };

                let _sdk_config = aws_connection
                    .load_sdk_config(
                        &storage_configuration.connection_context,
                        aws_conn_id.clone(),
                        InTask::No,
                        mz_storage_types::dyncfgs::ENFORCE_EXTERNAL_ADDRESSES
                            .get(storage_configuration.config_set()),
                    )
                    .await
                    .map_err(|e| IcebergSinkPurificationError::AwsSdkContextError(Arc::new(e)))?;
            }

            // Connect to the catalog to validate it; the handle is discarded.
            let _catalog = connection
                .connect(storage_configuration, InTask::No)
                .await
                .map_err(|e| IcebergSinkPurificationError::CatalogError(Arc::new(e)))?;
        }
    }

    // Gather the distinct schema registry connections used by the format.
    let mut csr_connection_ids = BTreeSet::new();
    for_each_format(format, |_, fmt| match fmt {
        Format::Avro(AvroSchema::InlineSchema { .. })
        | Format::Bytes
        | Format::Csv { .. }
        | Format::Json { .. }
        | Format::Protobuf(ProtobufSchema::InlineSchema { .. })
        | Format::Regex(..)
        | Format::Text => (),
        Format::Avro(AvroSchema::Csr {
            csr_connection: CsrConnectionAvro { connection, .. },
        })
        | Format::Protobuf(ProtobufSchema::Csr {
            csr_connection: CsrConnectionProtobuf { connection, .. },
        }) => {
            csr_connection_ids.insert(*connection.connection.item_id());
        }
    });

    // Validate each schema registry connection by connecting and listing subjects.
    let scx = StatementContext::new(None, &catalog);
    for csr_connection_id in csr_connection_ids {
        let connection = {
            let item = scx.get_item(&csr_connection_id);
            match item.connection()? {
                Connection::Csr(connection) => connection.clone().into_inline_connection(&catalog),
                _ => Err(CsrPurificationError::NotCsrConnection(
                    scx.catalog.resolve_full_name(item.name()),
                ))?,
            }
        };

        let client = connection
            .connect(storage_configuration, InTask::No)
            .await
            .map_err(|e| CsrPurificationError::ClientError(Arc::new(e)))?;

        client
            .list_subjects()
            .await
            .map_err(|e| CsrPurificationError::ListSubjectsError(Arc::new(e)))?;
    }

    purify_create_sink_avro_doc_on_options(&catalog, *from.item_id(), format)?;

    Ok(PurifiedStatement::PurifiedCreateSink(create_sink_stmt))
}
680
681fn for_each_format<'a, F>(format: &'a mut Option<FormatSpecifier<Aug>>, mut f: F)
688where
689 F: FnMut(DocOnSchema, &'a mut Format<Aug>),
690{
691 match format {
692 None => (),
693 Some(FormatSpecifier::Bare(fmt)) => f(DocOnSchema::All, fmt),
694 Some(FormatSpecifier::KeyValue { key, value }) => {
695 f(DocOnSchema::KeyOnly, key);
696 f(DocOnSchema::ValueOnly, value);
697 }
698 }
699}
700
/// Controls whether a `CREATE SOURCE` statement may or must name upstream
/// references: its `ExternalReferences`, which are a subset of tables, a
/// subset of schemas, or all.
///
/// `purify_create_source` picks the policy from system variables:
/// - `NotAllowed` when both `enable_create_table_from_source` and
///   `force_source_table_syntax` are set;
/// - `Optional` when only `enable_create_table_from_source` is set;
/// - `Required` otherwise.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub(crate) enum SourceReferencePolicy {
    /// The statement must not name upstream references. For load generators,
    /// naming any is rejected with `PlanError::UseTablesForSources`.
    NotAllowed,
    /// NOTE(review): presumably references may be named but are not
    /// mandatory — confirm against the per-source `purify_source_exports`.
    Optional,
    /// NOTE(review): presumably at least one reference must resolve to an
    /// upstream object — confirm against the per-source `purify_source_exports`.
    Required,
}
715
716async fn purify_create_source(
717 catalog: impl SessionCatalog,
718 now: u64,
719 mut create_source_stmt: CreateSourceStatement<Aug>,
720 storage_configuration: &StorageConfiguration,
721) -> Result<PurifiedStatement, PlanError> {
722 let CreateSourceStatement {
723 name: source_name,
724 col_names,
725 key_constraint,
726 connection: source_connection,
727 format,
728 envelope,
729 include_metadata,
730 external_references,
731 progress_subsource,
732 with_options,
733 ..
734 } = &mut create_source_stmt;
735
736 let uses_old_syntax = !col_names.is_empty()
737 || key_constraint.is_some()
738 || format.is_some()
739 || envelope.is_some()
740 || !include_metadata.is_empty()
741 || external_references.is_some()
742 || progress_subsource.is_some();
743
744 if let Some(DeferredItemName::Named(_)) = progress_subsource {
745 sql_bail!("Cannot manually ID qualify progress subsource")
746 }
747
748 let mut requested_subsource_map = BTreeMap::new();
749
750 let progress_desc = match &source_connection {
751 CreateSourceConnection::Kafka { .. } => {
752 &mz_storage_types::sources::kafka::KAFKA_PROGRESS_DESC
753 }
754 CreateSourceConnection::Postgres { .. } => {
755 &mz_storage_types::sources::postgres::PG_PROGRESS_DESC
756 }
757 CreateSourceConnection::SqlServer { .. } => {
758 &mz_storage_types::sources::sql_server::SQL_SERVER_PROGRESS_DESC
759 }
760 CreateSourceConnection::MySql { .. } => {
761 &mz_storage_types::sources::mysql::MYSQL_PROGRESS_DESC
762 }
763 CreateSourceConnection::LoadGenerator { .. } => {
764 &mz_storage_types::sources::load_generator::LOAD_GEN_PROGRESS_DESC
765 }
766 };
767 let scx = StatementContext::new(None, &catalog);
768
769 let reference_policy = if scx.catalog.system_vars().enable_create_table_from_source()
773 && scx.catalog.system_vars().force_source_table_syntax()
774 {
775 SourceReferencePolicy::NotAllowed
776 } else if scx.catalog.system_vars().enable_create_table_from_source() {
777 SourceReferencePolicy::Optional
778 } else {
779 SourceReferencePolicy::Required
780 };
781
782 let mut format_options = SourceFormatOptions::Default;
783
784 let retrieved_source_references: RetrievedSourceReferences;
785
786 match source_connection {
787 CreateSourceConnection::Kafka {
788 connection,
789 options: base_with_options,
790 ..
791 } => {
792 if let Some(external_references) = external_references {
793 Err(KafkaSourcePurificationError::ReferencedSubsources(
794 external_references.clone(),
795 ))?;
796 }
797
798 let connection = {
799 let item = scx.get_item_by_resolved_name(connection)?;
800 match item.connection()? {
802 Connection::Kafka(connection) => {
803 connection.clone().into_inline_connection(&catalog)
804 }
805 _ => Err(KafkaSourcePurificationError::NotKafkaConnection(
806 scx.catalog.resolve_full_name(item.name()),
807 ))?,
808 }
809 };
810
811 let extracted_options: KafkaSourceConfigOptionExtracted =
812 base_with_options.clone().try_into()?;
813
814 let topic = extracted_options
815 .topic
816 .ok_or(KafkaSourcePurificationError::ConnectionMissingTopic)?;
817
818 let consumer = connection
819 .create_with_context(
820 storage_configuration,
821 MzClientContext::default(),
822 &BTreeMap::new(),
823 InTask::No,
824 )
825 .await
826 .map_err(|e| {
827 KafkaSourcePurificationError::KafkaConsumerError(
829 e.display_with_causes().to_string(),
830 )
831 })?;
832 let consumer = Arc::new(consumer);
833
834 match (
835 extracted_options.start_offset,
836 extracted_options.start_timestamp,
837 ) {
838 (None, None) => {
839 kafka_util::ensure_topic_exists(
841 Arc::clone(&consumer),
842 &topic,
843 storage_configuration
844 .parameters
845 .kafka_timeout_config
846 .fetch_metadata_timeout,
847 )
848 .await?;
849 }
850 (Some(_), Some(_)) => {
851 sql_bail!("cannot specify START TIMESTAMP and START OFFSET at same time")
852 }
853 (Some(start_offsets), None) => {
854 kafka_util::validate_start_offsets(
856 Arc::clone(&consumer),
857 &topic,
858 start_offsets,
859 storage_configuration
860 .parameters
861 .kafka_timeout_config
862 .fetch_metadata_timeout,
863 )
864 .await?;
865 }
866 (None, Some(time_offset)) => {
867 let start_offsets = kafka_util::lookup_start_offsets(
869 Arc::clone(&consumer),
870 &topic,
871 time_offset,
872 now,
873 storage_configuration
874 .parameters
875 .kafka_timeout_config
876 .fetch_metadata_timeout,
877 )
878 .await?;
879
880 base_with_options.retain(|val| {
881 !matches!(val.name, KafkaSourceConfigOptionName::StartTimestamp)
882 });
883 base_with_options.push(KafkaSourceConfigOption {
884 name: KafkaSourceConfigOptionName::StartOffset,
885 value: Some(WithOptionValue::Sequence(
886 start_offsets
887 .iter()
888 .map(|offset| {
889 WithOptionValue::Value(Value::Number(offset.to_string()))
890 })
891 .collect(),
892 )),
893 });
894 }
895 }
896
897 let reference_client = SourceReferenceClient::Kafka { topic: &topic };
898 retrieved_source_references = reference_client.get_source_references().await?;
899
900 format_options = SourceFormatOptions::Kafka { topic };
901 }
902 CreateSourceConnection::Postgres {
903 connection,
904 options,
905 } => {
906 let connection_item = scx.get_item_by_resolved_name(connection)?;
907 let connection = match connection_item.connection().map_err(PlanError::from)? {
908 Connection::Postgres(connection) => {
909 connection.clone().into_inline_connection(&catalog)
910 }
911 _ => Err(PgSourcePurificationError::NotPgConnection(
912 scx.catalog.resolve_full_name(connection_item.name()),
913 ))?,
914 };
915 let crate::plan::statement::PgConfigOptionExtracted {
916 publication,
917 text_columns,
918 exclude_columns,
919 details,
920 ..
921 } = options.clone().try_into()?;
922 let publication =
923 publication.ok_or(PgSourcePurificationError::ConnectionMissingPublication)?;
924
925 if details.is_some() {
926 Err(PgSourcePurificationError::UserSpecifiedDetails)?;
927 }
928
929 let client = connection
930 .validate(connection_item.id(), storage_configuration)
931 .await?;
932
933 let reference_client = SourceReferenceClient::Postgres {
934 client: &client,
935 publication: &publication,
936 database: &connection.database,
937 };
938 retrieved_source_references = reference_client.get_source_references().await?;
939
940 let postgres::PurifiedSourceExports {
941 source_exports: subsources,
942 normalized_text_columns,
943 } = postgres::purify_source_exports(
944 &client,
945 &retrieved_source_references,
946 external_references,
947 text_columns,
948 exclude_columns,
949 source_name,
950 &reference_policy,
951 )
952 .await?;
953
954 if let Some(text_cols_option) = options
955 .iter_mut()
956 .find(|option| option.name == PgConfigOptionName::TextColumns)
957 {
958 text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
959 }
960
961 requested_subsource_map.extend(subsources);
962
963 let timeline_id = mz_postgres_util::get_timeline_id(&client).await?;
966
967 let is_physical_replica = Some(mz_postgres_util::get_is_in_recovery(&client).await?);
970
971 options.retain(|PgConfigOption { name, .. }| name != &PgConfigOptionName::Details);
973 let details = PostgresSourcePublicationDetails {
974 slot: format!(
975 "materialize_{}",
976 Uuid::new_v4().to_string().replace('-', "")
977 ),
978 timeline_id: Some(timeline_id),
979 database: connection.database,
980 is_physical_replica,
981 };
982 options.push(PgConfigOption {
983 name: PgConfigOptionName::Details,
984 value: Some(WithOptionValue::Value(Value::String(hex::encode(
985 details.into_proto().encode_to_vec(),
986 )))),
987 })
988 }
989 CreateSourceConnection::SqlServer {
990 connection,
991 options,
992 } => {
993 let connection_item = scx.get_item_by_resolved_name(connection)?;
996 let connection = match connection_item.connection()? {
997 Connection::SqlServer(connection) => {
998 connection.clone().into_inline_connection(&catalog)
999 }
1000 _ => Err(SqlServerSourcePurificationError::NotSqlServerConnection(
1001 scx.catalog.resolve_full_name(connection_item.name()),
1002 ))?,
1003 };
1004 let crate::plan::statement::ddl::SqlServerConfigOptionExtracted {
1005 details,
1006 text_columns,
1007 exclude_columns,
1008 seen: _,
1009 } = options.clone().try_into()?;
1010
1011 if details.is_some() {
1012 Err(SqlServerSourcePurificationError::UserSpecifiedDetails)?;
1013 }
1014
1015 let mut client = connection
1016 .validate(connection_item.id(), storage_configuration)
1017 .await?;
1018
1019 let database: Arc<str> = connection.database.into();
1020 let reference_client = SourceReferenceClient::SqlServer {
1021 client: &mut client,
1022 database: Arc::clone(&database),
1023 };
1024 retrieved_source_references = reference_client.get_source_references().await?;
1025 tracing::debug!(?retrieved_source_references, "got source references");
1026
1027 let timeout = mz_storage_types::sources::sql_server::MAX_LSN_WAIT
1028 .get(storage_configuration.config_set());
1029
1030 let purified_source_exports = sql_server::purify_source_exports(
1031 &*database,
1032 &mut client,
1033 &retrieved_source_references,
1034 external_references,
1035 &text_columns,
1036 &exclude_columns,
1037 source_name,
1038 timeout,
1039 &reference_policy,
1040 )
1041 .await?;
1042
1043 let sql_server::PurifiedSourceExports {
1044 source_exports,
1045 normalized_text_columns,
1046 normalized_excl_columns,
1047 } = purified_source_exports;
1048
1049 requested_subsource_map.extend(source_exports);
1051
1052 let restore_history_id =
1056 mz_sql_server_util::inspect::get_latest_restore_history_id(&mut client).await?;
1057 let details = SqlServerSourceExtras { restore_history_id };
1058
1059 options.retain(|SqlServerConfigOption { name, .. }| {
1060 name != &SqlServerConfigOptionName::Details
1061 });
1062 options.push(SqlServerConfigOption {
1063 name: SqlServerConfigOptionName::Details,
1064 value: Some(WithOptionValue::Value(Value::String(hex::encode(
1065 details.into_proto().encode_to_vec(),
1066 )))),
1067 });
1068
1069 if let Some(text_cols_option) = options
1071 .iter_mut()
1072 .find(|option| option.name == SqlServerConfigOptionName::TextColumns)
1073 {
1074 text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
1075 }
1076 if let Some(excl_cols_option) = options
1077 .iter_mut()
1078 .find(|option| option.name == SqlServerConfigOptionName::ExcludeColumns)
1079 {
1080 excl_cols_option.value = Some(WithOptionValue::Sequence(normalized_excl_columns));
1081 }
1082 }
1083 CreateSourceConnection::MySql {
1084 connection,
1085 options,
1086 } => {
1087 let connection_item = scx.get_item_by_resolved_name(connection)?;
1088 let connection = match connection_item.connection()? {
1089 Connection::MySql(connection) => {
1090 connection.clone().into_inline_connection(&catalog)
1091 }
1092 _ => Err(MySqlSourcePurificationError::NotMySqlConnection(
1093 scx.catalog.resolve_full_name(connection_item.name()),
1094 ))?,
1095 };
1096 let crate::plan::statement::ddl::MySqlConfigOptionExtracted {
1097 details,
1098 text_columns,
1099 exclude_columns,
1100 seen: _,
1101 } = options.clone().try_into()?;
1102
1103 if details.is_some() {
1104 Err(MySqlSourcePurificationError::UserSpecifiedDetails)?;
1105 }
1106
1107 let mut conn = connection
1108 .validate(connection_item.id(), storage_configuration)
1109 .await
1110 .map_err(MySqlSourcePurificationError::InvalidConnection)?;
1111
1112 let initial_gtid_set =
1116 mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?;
1117
1118 let binlog_full_metadata = is_binlog_full_metadata(&mut conn).await?;
1119
1120 let reference_client = SourceReferenceClient::MySql {
1121 conn: &mut conn,
1122 include_system_schemas: mysql::references_system_schemas(external_references),
1123 };
1124 retrieved_source_references = reference_client.get_source_references().await?;
1125
1126 let mysql::PurifiedSourceExports {
1127 source_exports: subsources,
1128 normalized_text_columns,
1129 normalized_exclude_columns,
1130 } = mysql::purify_source_exports(
1131 &mut conn,
1132 &retrieved_source_references,
1133 external_references,
1134 text_columns,
1135 exclude_columns,
1136 source_name,
1137 initial_gtid_set.clone(),
1138 &reference_policy,
1139 binlog_full_metadata,
1140 )
1141 .await?;
1142 requested_subsource_map.extend(subsources);
1143
1144 let details = MySqlSourceDetails {};
1147 options
1149 .retain(|MySqlConfigOption { name, .. }| name != &MySqlConfigOptionName::Details);
1150 options.push(MySqlConfigOption {
1151 name: MySqlConfigOptionName::Details,
1152 value: Some(WithOptionValue::Value(Value::String(hex::encode(
1153 details.into_proto().encode_to_vec(),
1154 )))),
1155 });
1156
1157 if let Some(text_cols_option) = options
1158 .iter_mut()
1159 .find(|option| option.name == MySqlConfigOptionName::TextColumns)
1160 {
1161 text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
1162 }
1163 if let Some(exclude_cols_option) = options
1164 .iter_mut()
1165 .find(|option| option.name == MySqlConfigOptionName::ExcludeColumns)
1166 {
1167 exclude_cols_option.value =
1168 Some(WithOptionValue::Sequence(normalized_exclude_columns));
1169 }
1170 }
1171 CreateSourceConnection::LoadGenerator { generator, options } => {
1172 let load_generator =
1173 load_generator_ast_to_generator(&scx, generator, options, include_metadata)?;
1174
1175 let reference_client = SourceReferenceClient::LoadGenerator {
1176 generator: &load_generator,
1177 };
1178 retrieved_source_references = reference_client.get_source_references().await?;
1179 let multi_output_sources =
1183 retrieved_source_references
1184 .all_references()
1185 .iter()
1186 .any(|r| {
1187 matches!(
1188 r.load_generator_output(),
1189 Some(output) if output != &LoadGeneratorOutput::Default
1190 )
1191 });
1192
1193 match external_references {
1194 Some(requested)
1195 if matches!(reference_policy, SourceReferencePolicy::NotAllowed) =>
1196 {
1197 Err(PlanError::UseTablesForSources(requested.to_string()))?
1198 }
1199 Some(requested) if !multi_output_sources => match requested {
1200 ExternalReferences::SubsetTables(_) => {
1201 Err(LoadGeneratorSourcePurificationError::ForTables)?
1202 }
1203 ExternalReferences::SubsetSchemas(_) => {
1204 Err(LoadGeneratorSourcePurificationError::ForSchemas)?
1205 }
1206 ExternalReferences::All => {
1207 Err(LoadGeneratorSourcePurificationError::ForAllTables)?
1208 }
1209 },
1210 Some(requested) => {
1211 let requested_exports = retrieved_source_references
1212 .requested_source_exports(Some(requested), source_name)?;
1213 for export in requested_exports {
1214 requested_subsource_map.insert(
1215 export.name,
1216 PurifiedSourceExport {
1217 external_reference: export.external_reference,
1218 details: PurifiedExportDetails::LoadGenerator {
1219 table: export
1220 .meta
1221 .load_generator_desc()
1222 .ok_or_else(|| {
1223 internal_err!(
1224 "expected load generator source reference"
1225 )
1226 })?
1227 .clone(),
1228 output: export
1229 .meta
1230 .load_generator_output()
1231 .ok_or_else(|| {
1232 internal_err!(
1233 "expected load generator source reference"
1234 )
1235 })?
1236 .clone(),
1237 },
1238 },
1239 );
1240 }
1241 }
1242 None => {
1243 if multi_output_sources
1244 && matches!(reference_policy, SourceReferencePolicy::Required)
1245 {
1246 Err(LoadGeneratorSourcePurificationError::MultiOutputRequiresForAllTables)?
1247 }
1248 }
1249 }
1250
1251 if let LoadGenerator::Clock = generator {
1252 if !options
1253 .iter()
1254 .any(|p| p.name == LoadGeneratorOptionName::AsOf)
1255 {
1256 let now = catalog.now();
1257 options.push(LoadGeneratorOption {
1258 name: LoadGeneratorOptionName::AsOf,
1259 value: Some(WithOptionValue::Value(Value::Number(now.to_string()))),
1260 });
1261 }
1262 }
1263 }
1264 }
1265
1266 *external_references = None;
1270
1271 let create_progress_subsource_stmt = if uses_old_syntax {
1273 let name = match progress_subsource {
1275 Some(name) => match name {
1276 DeferredItemName::Deferred(name) => name.clone(),
1277 DeferredItemName::Named(_) => {
1279 sql_bail!("progress subsource name cannot be a resolved name")
1280 }
1281 },
1282 None => {
1283 let (item, prefix) = source_name
1284 .0
1285 .split_last()
1286 .ok_or_else(|| sql_err!("source name must have at least one component"))?;
1287 let item_name =
1288 Ident::try_generate_name(item.to_string(), "_progress", |candidate| {
1289 let mut suggested_name = prefix.to_vec();
1290 suggested_name.push(candidate.clone());
1291
1292 let partial =
1293 normalize::unresolved_item_name(UnresolvedItemName(suggested_name))?;
1294 let qualified = scx.allocate_qualified_name(partial)?;
1295 let item_exists = scx.catalog.get_item_by_name(&qualified).is_some();
1296 let type_exists = scx.catalog.get_type_by_name(&qualified).is_some();
1297 Ok::<_, PlanError>(!item_exists && !type_exists)
1298 })?;
1299
1300 let mut full_name = prefix.to_vec();
1301 full_name.push(item_name);
1302 let full_name = normalize::unresolved_item_name(UnresolvedItemName(full_name))?;
1303 let qualified_name = scx.allocate_qualified_name(full_name)?;
1304 let full_name = scx.catalog.resolve_full_name(&qualified_name);
1305
1306 UnresolvedItemName::from(full_name.clone())
1307 }
1308 };
1309
1310 let (columns, constraints) = scx.relation_desc_into_table_defs(progress_desc)?;
1311
1312 let mut progress_with_options: Vec<_> = with_options
1314 .iter()
1315 .filter_map(|opt| match opt.name {
1316 CreateSourceOptionName::TimestampInterval => None,
1317 CreateSourceOptionName::RetainHistory => Some(CreateSubsourceOption {
1318 name: CreateSubsourceOptionName::RetainHistory,
1319 value: opt.value.clone(),
1320 }),
1321 })
1322 .collect();
1323 progress_with_options.push(CreateSubsourceOption {
1324 name: CreateSubsourceOptionName::Progress,
1325 value: Some(WithOptionValue::Value(Value::Boolean(true))),
1326 });
1327
1328 Some(CreateSubsourceStatement {
1329 name,
1330 columns,
1331 of_source: None,
1335 constraints,
1336 if_not_exists: false,
1337 with_options: progress_with_options,
1338 })
1339 } else {
1340 None
1341 };
1342
1343 purify_source_format(
1344 &catalog,
1345 format,
1346 &format_options,
1347 envelope,
1348 storage_configuration,
1349 )
1350 .await?;
1351
1352 Ok(PurifiedStatement::PurifiedCreateSource {
1353 create_progress_subsource_stmt,
1354 create_source_stmt,
1355 subsources: requested_subsource_map,
1356 available_source_references: retrieved_source_references.available_source_references(),
1357 })
1358}
1359
1360async fn purify_alter_source(
1363 catalog: impl SessionCatalog,
1364 stmt: AlterSourceStatement<Aug>,
1365 storage_configuration: &StorageConfiguration,
1366) -> Result<PurifiedStatement, PlanError> {
1367 let scx = StatementContext::new(None, &catalog);
1368 let AlterSourceStatement {
1369 source_name: unresolved_source_name,
1370 action,
1371 if_exists,
1372 } = stmt;
1373
1374 let item = match scx.resolve_item(RawItemName::Name(unresolved_source_name.clone())) {
1376 Ok(item) => item,
1377 Err(_) if if_exists => {
1378 return Ok(PurifiedStatement::PurifiedAlterSource {
1379 alter_source_stmt: AlterSourceStatement {
1380 source_name: unresolved_source_name,
1381 action,
1382 if_exists,
1383 },
1384 });
1385 }
1386 Err(e) => return Err(e),
1387 };
1388
1389 let desc = match item.source_desc()? {
1391 Some(desc) => desc.clone().into_inline_connection(scx.catalog),
1392 None => {
1393 sql_bail!("cannot ALTER this type of source")
1394 }
1395 };
1396
1397 let source_name = item.name();
1398
1399 let resolved_source_name = ResolvedItemName::Item {
1400 id: item.id(),
1401 qualifiers: item.name().qualifiers.clone(),
1402 full_name: scx.catalog.resolve_full_name(source_name),
1403 print_id: true,
1404 version: RelationVersionSelector::Latest,
1405 };
1406
1407 let partial_name = scx.catalog.minimal_qualification(source_name);
1408
1409 match action {
1410 AlterSourceAction::AddSubsources {
1411 external_references,
1412 options,
1413 } => {
1414 if scx.catalog.system_vars().enable_create_table_from_source()
1415 && scx.catalog.system_vars().force_source_table_syntax()
1416 {
1417 Err(PlanError::UseTablesForSources(
1418 "ALTER SOURCE .. ADD SUBSOURCES ..".to_string(),
1419 ))?;
1420 }
1421
1422 purify_alter_source_add_subsources(
1423 external_references,
1424 options,
1425 desc,
1426 partial_name,
1427 unresolved_source_name,
1428 resolved_source_name,
1429 storage_configuration,
1430 )
1431 .await
1432 }
1433 AlterSourceAction::RefreshReferences => {
1434 purify_alter_source_refresh_references(
1435 desc,
1436 resolved_source_name,
1437 storage_configuration,
1438 )
1439 .await
1440 }
1441 _ => Ok(PurifiedStatement::PurifiedAlterSource {
1442 alter_source_stmt: AlterSourceStatement {
1443 source_name: unresolved_source_name,
1444 action,
1445 if_exists,
1446 },
1447 }),
1448 }
1449}
1450
1451async fn purify_alter_source_add_subsources(
1454 external_references: Vec<ExternalReferenceExport>,
1455 mut options: Vec<AlterSourceAddSubsourceOption<Aug>>,
1456 desc: SourceDesc,
1457 partial_source_name: PartialItemName,
1458 unresolved_source_name: UnresolvedItemName,
1459 resolved_source_name: ResolvedItemName,
1460 storage_configuration: &StorageConfiguration,
1461) -> Result<PurifiedStatement, PlanError> {
1462 let connection_id = match &desc.connection {
1464 GenericSourceConnection::Postgres(c) => c.connection_id,
1465 GenericSourceConnection::MySql(c) => c.connection_id,
1466 GenericSourceConnection::SqlServer(c) => c.connection_id,
1467 _ => sql_bail!(
1468 "source {} does not support ALTER SOURCE.",
1469 partial_source_name
1470 ),
1471 };
1472
1473 let crate::plan::statement::ddl::AlterSourceAddSubsourceOptionExtracted {
1474 text_columns,
1475 exclude_columns,
1476 details,
1477 seen: _,
1478 } = options.clone().try_into()?;
1479 if details.is_some() {
1480 sql_bail!("DETAILS option cannot be explicitly set");
1481 }
1482
1483 let mut requested_subsource_map = BTreeMap::new();
1484
1485 match desc.connection {
1486 GenericSourceConnection::Postgres(pg_source_connection) => {
1487 let pg_connection = &pg_source_connection.connection;
1489
1490 let client = pg_connection
1491 .validate(connection_id, storage_configuration)
1492 .await?;
1493
1494 let reference_client = SourceReferenceClient::Postgres {
1495 client: &client,
1496 publication: &pg_source_connection.publication,
1497 database: &pg_connection.database,
1498 };
1499 let retrieved_source_references = reference_client.get_source_references().await?;
1500
1501 let postgres::PurifiedSourceExports {
1502 source_exports: subsources,
1503 normalized_text_columns,
1504 } = postgres::purify_source_exports(
1505 &client,
1506 &retrieved_source_references,
1507 &Some(ExternalReferences::SubsetTables(external_references)),
1508 text_columns,
1509 exclude_columns,
1510 &unresolved_source_name,
1511 &SourceReferencePolicy::Required,
1512 )
1513 .await?;
1514
1515 if let Some(text_cols_option) = options
1516 .iter_mut()
1517 .find(|option| option.name == AlterSourceAddSubsourceOptionName::TextColumns)
1518 {
1519 text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
1520 }
1521
1522 requested_subsource_map.extend(subsources);
1523 }
1524 GenericSourceConnection::MySql(mysql_source_connection) => {
1525 let mysql_connection = &mysql_source_connection.connection;
1526 let config = mysql_connection
1527 .config(
1528 &storage_configuration.connection_context.secrets_reader,
1529 storage_configuration,
1530 InTask::No,
1531 )
1532 .await?;
1533
1534 let mut conn = config
1535 .connect(
1536 "mysql purification",
1537 &storage_configuration.connection_context.ssh_tunnel_manager,
1538 )
1539 .await?;
1540
1541 let initial_gtid_set =
1544 mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?;
1545
1546 let binlog_full_metadata = is_binlog_full_metadata(&mut conn).await?;
1547
1548 let requested_references = Some(ExternalReferences::SubsetTables(external_references));
1549
1550 let reference_client = SourceReferenceClient::MySql {
1551 conn: &mut conn,
1552 include_system_schemas: mysql::references_system_schemas(&requested_references),
1553 };
1554 let retrieved_source_references = reference_client.get_source_references().await?;
1555
1556 let mysql::PurifiedSourceExports {
1557 source_exports: subsources,
1558 normalized_text_columns,
1559 normalized_exclude_columns,
1560 } = mysql::purify_source_exports(
1561 &mut conn,
1562 &retrieved_source_references,
1563 &requested_references,
1564 text_columns,
1565 exclude_columns,
1566 &unresolved_source_name,
1567 initial_gtid_set,
1568 &SourceReferencePolicy::Required,
1569 binlog_full_metadata,
1570 )
1571 .await?;
1572 requested_subsource_map.extend(subsources);
1573
1574 if let Some(text_cols_option) = options
1576 .iter_mut()
1577 .find(|option| option.name == AlterSourceAddSubsourceOptionName::TextColumns)
1578 {
1579 text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
1580 }
1581 if let Some(exclude_cols_option) = options
1582 .iter_mut()
1583 .find(|option| option.name == AlterSourceAddSubsourceOptionName::ExcludeColumns)
1584 {
1585 exclude_cols_option.value =
1586 Some(WithOptionValue::Sequence(normalized_exclude_columns));
1587 }
1588 }
1589 GenericSourceConnection::SqlServer(sql_server_source) => {
1590 let sql_server_connection = &sql_server_source.connection;
1592 let config = sql_server_connection
1593 .resolve_config(
1594 &storage_configuration.connection_context.secrets_reader,
1595 storage_configuration,
1596 InTask::No,
1597 )
1598 .await?;
1599 let mut client = mz_sql_server_util::Client::connect(config).await?;
1600
1601 let database = sql_server_connection.database.clone().into();
1603 let source_references = SourceReferenceClient::SqlServer {
1604 client: &mut client,
1605 database: Arc::clone(&database),
1606 }
1607 .get_source_references()
1608 .await?;
1609 let requested_references = Some(ExternalReferences::SubsetTables(external_references));
1610
1611 let timeout = mz_storage_types::sources::sql_server::MAX_LSN_WAIT
1612 .get(storage_configuration.config_set());
1613
1614 let result = sql_server::purify_source_exports(
1615 &*database,
1616 &mut client,
1617 &source_references,
1618 &requested_references,
1619 &text_columns,
1620 &exclude_columns,
1621 &unresolved_source_name,
1622 timeout,
1623 &SourceReferencePolicy::Required,
1624 )
1625 .await;
1626 let sql_server::PurifiedSourceExports {
1627 source_exports,
1628 normalized_text_columns,
1629 normalized_excl_columns,
1630 } = result?;
1631
1632 requested_subsource_map.extend(source_exports);
1634
1635 if let Some(text_cols_option) = options
1637 .iter_mut()
1638 .find(|option| option.name == AlterSourceAddSubsourceOptionName::TextColumns)
1639 {
1640 text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
1641 }
1642 if let Some(exclude_cols_option) = options
1643 .iter_mut()
1644 .find(|option| option.name == AlterSourceAddSubsourceOptionName::ExcludeColumns)
1645 {
1646 exclude_cols_option.value =
1647 Some(WithOptionValue::Sequence(normalized_excl_columns));
1648 }
1649 }
1650 _ => bail_internal!("source does not support ALTER SOURCE...ADD SUBSOURCE"),
1653 };
1654
1655 Ok(PurifiedStatement::PurifiedAlterSourceAddSubsources {
1656 source_name: resolved_source_name,
1657 options,
1658 subsources: requested_subsource_map,
1659 })
1660}
1661
1662async fn purify_alter_source_refresh_references(
1663 desc: SourceDesc,
1664 resolved_source_name: ResolvedItemName,
1665 storage_configuration: &StorageConfiguration,
1666) -> Result<PurifiedStatement, PlanError> {
1667 let retrieved_source_references = match desc.connection {
1668 GenericSourceConnection::Postgres(pg_source_connection) => {
1669 let pg_connection = &pg_source_connection.connection;
1671
1672 let config = pg_connection
1673 .config(
1674 &storage_configuration.connection_context.secrets_reader,
1675 storage_configuration,
1676 InTask::No,
1677 )
1678 .await?;
1679
1680 let client = config
1681 .connect(
1682 "postgres_purification",
1683 &storage_configuration.connection_context.ssh_tunnel_manager,
1684 )
1685 .await?;
1686 let reference_client = SourceReferenceClient::Postgres {
1687 client: &client,
1688 publication: &pg_source_connection.publication,
1689 database: &pg_connection.database,
1690 };
1691 reference_client.get_source_references().await?
1692 }
1693 GenericSourceConnection::MySql(mysql_source_connection) => {
1694 let mysql_connection = &mysql_source_connection.connection;
1695 let config = mysql_connection
1696 .config(
1697 &storage_configuration.connection_context.secrets_reader,
1698 storage_configuration,
1699 InTask::No,
1700 )
1701 .await?;
1702
1703 let mut conn = config
1704 .connect(
1705 "mysql purification",
1706 &storage_configuration.connection_context.ssh_tunnel_manager,
1707 )
1708 .await?;
1709
1710 let reference_client = SourceReferenceClient::MySql {
1711 conn: &mut conn,
1712 include_system_schemas: false,
1713 };
1714 reference_client.get_source_references().await?
1715 }
1716 GenericSourceConnection::SqlServer(sql_server_source) => {
1717 let sql_server_connection = &sql_server_source.connection;
1719 let config = sql_server_connection
1720 .resolve_config(
1721 &storage_configuration.connection_context.secrets_reader,
1722 storage_configuration,
1723 InTask::No,
1724 )
1725 .await?;
1726 let mut client = mz_sql_server_util::Client::connect(config).await?;
1727
1728 let source_references = SourceReferenceClient::SqlServer {
1730 client: &mut client,
1731 database: sql_server_connection.database.clone().into(),
1732 }
1733 .get_source_references()
1734 .await?;
1735 source_references
1736 }
1737 GenericSourceConnection::LoadGenerator(load_gen_connection) => {
1738 let reference_client = SourceReferenceClient::LoadGenerator {
1739 generator: &load_gen_connection.load_generator,
1740 };
1741 reference_client.get_source_references().await?
1742 }
1743 GenericSourceConnection::Kafka(kafka_conn) => {
1744 let reference_client = SourceReferenceClient::Kafka {
1745 topic: &kafka_conn.topic,
1746 };
1747 reference_client.get_source_references().await?
1748 }
1749 };
1750 Ok(PurifiedStatement::PurifiedAlterSourceRefreshReferences {
1751 source_name: resolved_source_name,
1752 available_source_references: retrieved_source_references.available_source_references(),
1753 })
1754}
1755
1756async fn purify_create_table_from_source(
1757 catalog: impl SessionCatalog,
1758 mut stmt: CreateTableFromSourceStatement<Aug>,
1759 storage_configuration: &StorageConfiguration,
1760) -> Result<PurifiedStatement, PlanError> {
1761 let scx = StatementContext::new(None, &catalog);
1762 let CreateTableFromSourceStatement {
1763 name: _,
1764 columns,
1765 constraints,
1766 source: source_name,
1767 if_not_exists: _,
1768 external_reference,
1769 format,
1770 envelope,
1771 include_metadata: _,
1772 with_options,
1773 } = &mut stmt;
1774
1775 if matches!(columns, TableFromSourceColumns::Defined(_)) {
1777 sql_bail!("CREATE TABLE .. FROM SOURCE column definitions cannot be specified directly");
1778 }
1779 if !constraints.is_empty() {
1780 sql_bail!(
1781 "CREATE TABLE .. FROM SOURCE constraint definitions cannot be specified directly"
1782 );
1783 }
1784
1785 let item = match scx.get_item_by_resolved_name(source_name) {
1787 Ok(item) => item,
1788 Err(e) => return Err(e),
1789 };
1790
1791 let desc = match item.source_desc()? {
1793 Some(desc) => desc.clone().into_inline_connection(scx.catalog),
1794 None => {
1795 sql_bail!("cannot ALTER this type of source")
1796 }
1797 };
1798 let unresolved_source_name: UnresolvedItemName = source_name.full_item_name().clone().into();
1799
1800 let crate::plan::statement::ddl::TableFromSourceOptionExtracted {
1801 text_columns,
1802 exclude_columns,
1803 retain_history: _,
1804 details,
1805 partition_by: _,
1806 seen: _,
1807 } = with_options.clone().try_into()?;
1808 if details.is_some() {
1809 sql_bail!("DETAILS option cannot be explicitly set");
1810 }
1811
1812 let qualified_text_columns = text_columns
1816 .iter()
1817 .map(|col| {
1818 UnresolvedItemName(
1819 external_reference
1820 .as_ref()
1821 .map(|er| er.0.iter().chain_one(col).map(|i| i.clone()).collect())
1822 .unwrap_or_else(|| vec![col.clone()]),
1823 )
1824 })
1825 .collect_vec();
1826 let qualified_exclude_columns = exclude_columns
1827 .iter()
1828 .map(|col| {
1829 UnresolvedItemName(
1830 external_reference
1831 .as_ref()
1832 .map(|er| er.0.iter().chain_one(col).map(|i| i.clone()).collect())
1833 .unwrap_or_else(|| vec![col.clone()]),
1834 )
1835 })
1836 .collect_vec();
1837
1838 let mut format_options = SourceFormatOptions::Default;
1840
1841 let retrieved_source_references: RetrievedSourceReferences;
1842
1843 let requested_references = external_reference.as_ref().map(|ref_name| {
1844 ExternalReferences::SubsetTables(vec![ExternalReferenceExport {
1845 reference: ref_name.clone(),
1846 alias: None,
1847 }])
1848 });
1849
1850 let purified_export = match desc.connection {
1853 GenericSourceConnection::Postgres(pg_source_connection) => {
1854 let pg_connection = &pg_source_connection.connection;
1856
1857 let client = pg_connection
1858 .validate(pg_source_connection.connection_id, storage_configuration)
1859 .await?;
1860
1861 let reference_client = SourceReferenceClient::Postgres {
1862 client: &client,
1863 publication: &pg_source_connection.publication,
1864 database: &pg_connection.database,
1865 };
1866 retrieved_source_references = reference_client.get_source_references().await?;
1867
1868 let postgres::PurifiedSourceExports {
1869 source_exports,
1870 normalized_text_columns: _,
1874 } = postgres::purify_source_exports(
1875 &client,
1876 &retrieved_source_references,
1877 &requested_references,
1878 qualified_text_columns,
1879 qualified_exclude_columns,
1880 &unresolved_source_name,
1881 &SourceReferencePolicy::Required,
1882 )
1883 .await?;
1884 let (_, purified_export) = source_exports.into_element();
1886 purified_export
1887 }
1888 GenericSourceConnection::MySql(mysql_source_connection) => {
1889 let mysql_connection = &mysql_source_connection.connection;
1890 let config = mysql_connection
1891 .config(
1892 &storage_configuration.connection_context.secrets_reader,
1893 storage_configuration,
1894 InTask::No,
1895 )
1896 .await?;
1897
1898 let mut conn = config
1899 .connect(
1900 "mysql purification",
1901 &storage_configuration.connection_context.ssh_tunnel_manager,
1902 )
1903 .await?;
1904
1905 ensure_binlog_full_metadata(&mut conn).await?;
1906 let binlog_full_metadata = true;
1907
1908 let initial_gtid_set =
1911 mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?;
1912
1913 let reference_client = SourceReferenceClient::MySql {
1914 conn: &mut conn,
1915 include_system_schemas: mysql::references_system_schemas(&requested_references),
1916 };
1917 retrieved_source_references = reference_client.get_source_references().await?;
1918
1919 let mysql::PurifiedSourceExports {
1920 source_exports,
1921 normalized_text_columns: _,
1925 normalized_exclude_columns: _,
1926 } = mysql::purify_source_exports(
1927 &mut conn,
1928 &retrieved_source_references,
1929 &requested_references,
1930 qualified_text_columns,
1931 qualified_exclude_columns,
1932 &unresolved_source_name,
1933 initial_gtid_set,
1934 &SourceReferencePolicy::Required,
1935 binlog_full_metadata,
1936 )
1937 .await?;
1938 let (_, purified_export) = source_exports.into_element();
1940 purified_export
1941 }
1942 GenericSourceConnection::SqlServer(sql_server_source) => {
1943 let connection = sql_server_source.connection;
1944 let config = connection
1945 .resolve_config(
1946 &storage_configuration.connection_context.secrets_reader,
1947 storage_configuration,
1948 InTask::No,
1949 )
1950 .await?;
1951 let mut client = mz_sql_server_util::Client::connect(config).await?;
1952
1953 let database: Arc<str> = connection.database.into();
1954 let reference_client = SourceReferenceClient::SqlServer {
1955 client: &mut client,
1956 database: Arc::clone(&database),
1957 };
1958 retrieved_source_references = reference_client.get_source_references().await?;
1959 tracing::debug!(?retrieved_source_references, "got source references");
1960
1961 let timeout = mz_storage_types::sources::sql_server::MAX_LSN_WAIT
1962 .get(storage_configuration.config_set());
1963
1964 let purified_source_exports = sql_server::purify_source_exports(
1965 &*database,
1966 &mut client,
1967 &retrieved_source_references,
1968 &requested_references,
1969 &qualified_text_columns,
1970 &qualified_exclude_columns,
1971 &unresolved_source_name,
1972 timeout,
1973 &SourceReferencePolicy::Required,
1974 )
1975 .await?;
1976
1977 let (_, purified_export) = purified_source_exports.source_exports.into_element();
1979 purified_export
1980 }
1981 GenericSourceConnection::LoadGenerator(load_gen_connection) => {
1982 let reference_client = SourceReferenceClient::LoadGenerator {
1983 generator: &load_gen_connection.load_generator,
1984 };
1985 retrieved_source_references = reference_client.get_source_references().await?;
1986
1987 let requested_exports = retrieved_source_references
1988 .requested_source_exports(requested_references.as_ref(), &unresolved_source_name)?;
1989 let export = requested_exports.into_element();
1991 PurifiedSourceExport {
1992 external_reference: export.external_reference,
1993 details: PurifiedExportDetails::LoadGenerator {
1994 table: export
1995 .meta
1996 .load_generator_desc()
1997 .ok_or_else(|| internal_err!("expected load generator source reference"))?
1998 .clone(),
1999 output: export
2000 .meta
2001 .load_generator_output()
2002 .ok_or_else(|| internal_err!("expected load generator source reference"))?
2003 .clone(),
2004 },
2005 }
2006 }
2007 GenericSourceConnection::Kafka(kafka_conn) => {
2008 let reference_client = SourceReferenceClient::Kafka {
2009 topic: &kafka_conn.topic,
2010 };
2011 retrieved_source_references = reference_client.get_source_references().await?;
2012 let requested_exports = retrieved_source_references
2013 .requested_source_exports(requested_references.as_ref(), &unresolved_source_name)?;
2014 let export = requested_exports.into_element();
2016
2017 format_options = SourceFormatOptions::Kafka {
2018 topic: kafka_conn.topic.clone(),
2019 };
2020 PurifiedSourceExport {
2021 external_reference: export.external_reference,
2022 details: PurifiedExportDetails::Kafka {},
2023 }
2024 }
2025 };
2026
2027 purify_source_format(
2028 &catalog,
2029 format,
2030 &format_options,
2031 envelope,
2032 storage_configuration,
2033 )
2034 .await?;
2035
2036 *external_reference = Some(purified_export.external_reference.clone());
2039
2040 match &purified_export.details {
2042 PurifiedExportDetails::Postgres { .. } => {
2043 let mut unsupported_cols = vec![];
2044 let postgres::PostgresExportStatementValues {
2045 columns: gen_columns,
2046 constraints: gen_constraints,
2047 text_columns: gen_text_columns,
2048 exclude_columns: gen_exclude_columns,
2049 details: gen_details,
2050 external_reference: _,
2051 } = postgres::generate_source_export_statement_values(
2052 &scx,
2053 purified_export,
2054 &mut unsupported_cols,
2055 )?;
2056 if !unsupported_cols.is_empty() {
2057 unsupported_cols.sort();
2058 Err(PgSourcePurificationError::UnrecognizedTypes {
2059 cols: unsupported_cols,
2060 })?;
2061 }
2062
2063 if let Some(text_cols_option) = with_options
2064 .iter_mut()
2065 .find(|option| option.name == TableFromSourceOptionName::TextColumns)
2066 {
2067 if let Some(gen_text_columns) = gen_text_columns {
2068 text_cols_option.value = Some(WithOptionValue::Sequence(gen_text_columns));
2069 }
2070 }
2071 if let Some(exclude_cols_option) = with_options
2072 .iter_mut()
2073 .find(|option| option.name == TableFromSourceOptionName::ExcludeColumns)
2074 {
2075 if let Some(gen_exclude_columns) = gen_exclude_columns {
2076 exclude_cols_option.value =
2077 Some(WithOptionValue::Sequence(gen_exclude_columns));
2078 }
2079 }
2080 match columns {
2081 TableFromSourceColumns::Defined(_) => {
2082 bail_internal!(
2085 "column definitions cannot be explicitly set for this source type"
2086 )
2087 }
2088 TableFromSourceColumns::NotSpecified => {
2089 *columns = TableFromSourceColumns::Defined(gen_columns);
2090 *constraints = gen_constraints;
2091 }
2092 TableFromSourceColumns::Named(_) => {
2093 sql_bail!("columns cannot be named for Postgres sources")
2094 }
2095 }
2096 with_options.push(TableFromSourceOption {
2097 name: TableFromSourceOptionName::Details,
2098 value: Some(WithOptionValue::Value(Value::String(hex::encode(
2099 gen_details.into_proto().encode_to_vec(),
2100 )))),
2101 })
2102 }
2103 PurifiedExportDetails::MySql { .. } => {
2104 let mysql::MySqlExportStatementValues {
2105 columns: gen_columns,
2106 constraints: gen_constraints,
2107 text_columns: gen_text_columns,
2108 exclude_columns: gen_exclude_columns,
2109 details: gen_details,
2110 external_reference: _,
2111 } = mysql::generate_source_export_statement_values(&scx, purified_export)?;
2112
2113 if let Some(text_cols_option) = with_options
2114 .iter_mut()
2115 .find(|option| option.name == TableFromSourceOptionName::TextColumns)
2116 {
2117 if let Some(gen_text_columns) = gen_text_columns {
2118 text_cols_option.value = Some(WithOptionValue::Sequence(gen_text_columns));
2119 }
2120 }
2121 if let Some(exclude_cols_option) = with_options
2122 .iter_mut()
2123 .find(|option| option.name == TableFromSourceOptionName::ExcludeColumns)
2124 {
2125 if let Some(gen_exclude_columns) = gen_exclude_columns {
2126 exclude_cols_option.value =
2127 Some(WithOptionValue::Sequence(gen_exclude_columns));
2128 }
2129 }
2130 match columns {
2131 TableFromSourceColumns::Defined(_) => {
2132 bail_internal!(
2135 "column definitions cannot be explicitly set for this source type"
2136 )
2137 }
2138 TableFromSourceColumns::NotSpecified => {
2139 *columns = TableFromSourceColumns::Defined(gen_columns);
2140 *constraints = gen_constraints;
2141 }
2142 TableFromSourceColumns::Named(_) => {
2143 sql_bail!("columns cannot be named for MySQL sources")
2144 }
2145 }
2146 with_options.push(TableFromSourceOption {
2147 name: TableFromSourceOptionName::Details,
2148 value: Some(WithOptionValue::Value(Value::String(hex::encode(
2149 gen_details.into_proto().encode_to_vec(),
2150 )))),
2151 })
2152 }
2153 PurifiedExportDetails::SqlServer { .. } => {
2154 let sql_server::SqlServerExportStatementValues {
2155 columns: gen_columns,
2156 constraints: gen_constraints,
2157 text_columns: gen_text_columns,
2158 excl_columns: gen_excl_columns,
2159 details: gen_details,
2160 external_reference: _,
2161 } = sql_server::generate_source_export_statement_values(&scx, purified_export)?;
2162
2163 if let Some(text_cols_option) = with_options
2164 .iter_mut()
2165 .find(|opt| opt.name == TableFromSourceOptionName::TextColumns)
2166 {
2167 if let Some(gen_text_columns) = gen_text_columns {
2168 text_cols_option.value = Some(WithOptionValue::Sequence(gen_text_columns));
2169 }
2170 }
2171 if let Some(exclude_cols_option) = with_options
2172 .iter_mut()
2173 .find(|opt| opt.name == TableFromSourceOptionName::ExcludeColumns)
2174 {
2175 if let Some(gen_excl_columns) = gen_excl_columns {
2176 exclude_cols_option.value = Some(WithOptionValue::Sequence(gen_excl_columns));
2177 }
2178 }
2179
2180 match columns {
2181 TableFromSourceColumns::NotSpecified => {
2182 *columns = TableFromSourceColumns::Defined(gen_columns);
2183 *constraints = gen_constraints;
2184 }
2185 TableFromSourceColumns::Named(_) => {
2186 sql_bail!("columns cannot be named for SQL Server sources")
2187 }
2188 TableFromSourceColumns::Defined(_) => {
2189 bail_internal!(
2192 "column definitions cannot be explicitly set for this source type"
2193 )
2194 }
2195 }
2196
2197 with_options.push(TableFromSourceOption {
2198 name: TableFromSourceOptionName::Details,
2199 value: Some(WithOptionValue::Value(Value::String(hex::encode(
2200 gen_details.into_proto().encode_to_vec(),
2201 )))),
2202 })
2203 }
2204 PurifiedExportDetails::LoadGenerator { .. } => {
2205 let (desc, output) = match purified_export.details {
2206 PurifiedExportDetails::LoadGenerator { table, output } => (table, output),
2207 _ => bail_internal!("purified export details must be load generator"),
2208 };
2209 if let Some(desc) = desc {
2214 let (gen_columns, gen_constraints) = scx.relation_desc_into_table_defs(&desc)?;
2215 match columns {
2216 TableFromSourceColumns::Defined(_) => bail_internal!(
2219 "column definitions cannot be explicitly set for this source type"
2220 ),
2221 TableFromSourceColumns::NotSpecified => {
2222 *columns = TableFromSourceColumns::Defined(gen_columns);
2223 *constraints = gen_constraints;
2224 }
2225 TableFromSourceColumns::Named(_) => {
2226 sql_bail!("columns cannot be named for multi-output load generator sources")
2227 }
2228 }
2229 }
2230 let details = SourceExportStatementDetails::LoadGenerator { output };
2231 with_options.push(TableFromSourceOption {
2232 name: TableFromSourceOptionName::Details,
2233 value: Some(WithOptionValue::Value(Value::String(hex::encode(
2234 details.into_proto().encode_to_vec(),
2235 )))),
2236 })
2237 }
2238 PurifiedExportDetails::Kafka {} => {
2239 let details = SourceExportStatementDetails::Kafka {};
2243 with_options.push(TableFromSourceOption {
2244 name: TableFromSourceOptionName::Details,
2245 value: Some(WithOptionValue::Value(Value::String(hex::encode(
2246 details.into_proto().encode_to_vec(),
2247 )))),
2248 })
2249 }
2250 };
2251
2252 Ok(PurifiedStatement::PurifiedCreateTableFromSource { stmt })
2256}
2257
/// Source-type-specific context needed when purifying a `FORMAT` clause.
enum SourceFormatOptions {
    /// A source that is not a Kafka source. Purification rejects both
    /// `KEY ... VALUE ...` formats and Confluent Schema Registry formats for it.
    Default,
    /// A Kafka source reading `topic`. The topic name is used to derive the
    /// `<topic>-key` / `<topic>-value` schema registry subjects.
    Kafka { topic: String },
}
2262
2263async fn purify_source_format(
2264 catalog: &dyn SessionCatalog,
2265 format: &mut Option<FormatSpecifier<Aug>>,
2266 options: &SourceFormatOptions,
2267 envelope: &Option<SourceEnvelope>,
2268 storage_configuration: &StorageConfiguration,
2269) -> Result<(), PlanError> {
2270 if matches!(format, Some(FormatSpecifier::KeyValue { .. }))
2271 && !matches!(options, SourceFormatOptions::Kafka { .. })
2272 {
2273 sql_bail!("Kafka sources are the only source type that can provide KEY/VALUE formats")
2274 }
2275
2276 match format.as_mut() {
2277 None => {}
2278 Some(FormatSpecifier::Bare(format)) => {
2279 purify_source_format_single(catalog, format, options, envelope, storage_configuration)
2280 .await?;
2281 }
2282
2283 Some(FormatSpecifier::KeyValue { key, value: val }) => {
2284 purify_source_format_single(catalog, key, options, envelope, storage_configuration)
2285 .await?;
2286 purify_source_format_single(catalog, val, options, envelope, storage_configuration)
2287 .await?;
2288 }
2289 }
2290 Ok(())
2291}
2292
2293async fn purify_source_format_single(
2294 catalog: &dyn SessionCatalog,
2295 format: &mut Format<Aug>,
2296 options: &SourceFormatOptions,
2297 envelope: &Option<SourceEnvelope>,
2298 storage_configuration: &StorageConfiguration,
2299) -> Result<(), PlanError> {
2300 match format {
2301 Format::Avro(schema) => match schema {
2302 AvroSchema::Csr { csr_connection } => {
2303 purify_csr_connection_avro(
2304 catalog,
2305 options,
2306 csr_connection,
2307 envelope,
2308 storage_configuration,
2309 )
2310 .await?
2311 }
2312 AvroSchema::InlineSchema { .. } => {}
2313 },
2314 Format::Protobuf(schema) => match schema {
2315 ProtobufSchema::Csr { csr_connection } => {
2316 purify_csr_connection_proto(
2317 catalog,
2318 options,
2319 csr_connection,
2320 envelope,
2321 storage_configuration,
2322 )
2323 .await?;
2324 }
2325 ProtobufSchema::InlineSchema { .. } => {}
2326 },
2327 Format::Bytes
2328 | Format::Regex(_)
2329 | Format::Json { .. }
2330 | Format::Text
2331 | Format::Csv { .. } => (),
2332 }
2333 Ok(())
2334}
2335
2336pub fn generate_subsource_statements(
2337 scx: &StatementContext,
2338 source_name: ResolvedItemName,
2339 subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
2340) -> Result<Vec<CreateSubsourceStatement<Aug>>, PlanError> {
2341 if subsources.is_empty() {
2343 return Ok(vec![]);
2344 }
2345 let (_, purified_export) = subsources
2346 .iter()
2347 .next()
2348 .ok_or_else(|| internal_err!("expected at least one subsource"))?;
2349
2350 let statements = match &purified_export.details {
2351 PurifiedExportDetails::Postgres { .. } => {
2352 crate::pure::postgres::generate_create_subsource_statements(
2353 scx,
2354 source_name,
2355 subsources,
2356 )?
2357 }
2358 PurifiedExportDetails::MySql { .. } => {
2359 crate::pure::mysql::generate_create_subsource_statements(scx, source_name, subsources)?
2360 }
2361 PurifiedExportDetails::SqlServer { .. } => {
2362 crate::pure::sql_server::generate_create_subsource_statements(
2363 scx,
2364 source_name,
2365 subsources,
2366 )?
2367 }
2368 PurifiedExportDetails::LoadGenerator { .. } => {
2369 let mut subsource_stmts = Vec::with_capacity(subsources.len());
2370 for (subsource_name, purified_export) in subsources {
2371 let (desc, output) = match purified_export.details {
2372 PurifiedExportDetails::LoadGenerator { table, output } => (table, output),
2373 _ => {
2374 bail_internal!("purified export details must be load generator")
2375 }
2376 };
2377 let desc = desc.ok_or_else(|| {
2378 internal_err!(
2379 "subsources cannot be generated for single-output load generators"
2380 )
2381 })?;
2382
2383 let (columns, table_constraints) = scx.relation_desc_into_table_defs(&desc)?;
2384 let details = SourceExportStatementDetails::LoadGenerator { output };
2385 let subsource = CreateSubsourceStatement {
2387 name: subsource_name,
2388 columns,
2389 of_source: Some(source_name.clone()),
2390 constraints: table_constraints,
2395 if_not_exists: false,
2396 with_options: vec![
2397 CreateSubsourceOption {
2398 name: CreateSubsourceOptionName::ExternalReference,
2399 value: Some(WithOptionValue::UnresolvedItemName(
2400 purified_export.external_reference,
2401 )),
2402 },
2403 CreateSubsourceOption {
2404 name: CreateSubsourceOptionName::Details,
2405 value: Some(WithOptionValue::Value(Value::String(hex::encode(
2406 details.into_proto().encode_to_vec(),
2407 )))),
2408 },
2409 ],
2410 };
2411 subsource_stmts.push(subsource);
2412 }
2413
2414 subsource_stmts
2415 }
2416 PurifiedExportDetails::Kafka { .. } => {
2417 if !subsources.is_empty() {
2421 bail_internal!("Kafka sources do not produce data-bearing subsources");
2422 }
2423 vec![]
2424 }
2425 };
2426 Ok(statements)
2427}
2428
2429async fn purify_csr_connection_proto(
2430 catalog: &dyn SessionCatalog,
2431 options: &SourceFormatOptions,
2432 csr_connection: &mut CsrConnectionProtobuf<Aug>,
2433 envelope: &Option<SourceEnvelope>,
2434 storage_configuration: &StorageConfiguration,
2435) -> Result<(), PlanError> {
2436 let SourceFormatOptions::Kafka { topic } = options else {
2437 sql_bail!("Confluent Schema Registry is only supported with Kafka sources")
2438 };
2439
2440 let CsrConnectionProtobuf {
2441 seed,
2442 connection: CsrConnection {
2443 connection,
2444 options: _,
2445 },
2446 } = csr_connection;
2447 match seed {
2448 None => {
2449 let scx = StatementContext::new(None, &*catalog);
2450
2451 let ccsr_connection = match scx.get_item_by_resolved_name(connection)?.connection()? {
2452 Connection::Csr(connection) => connection.clone().into_inline_connection(catalog),
2453 _ => sql_bail!("{} is not a schema registry connection", connection),
2454 };
2455
2456 let ccsr_client = ccsr_connection
2457 .connect(storage_configuration, InTask::No)
2458 .await
2459 .map_err(|e| CsrPurificationError::ClientError(Arc::new(e)))?;
2460
2461 let value = compile_proto(&format!("{}-value", topic), &ccsr_client).await?;
2462 let key = compile_proto(&format!("{}-key", topic), &ccsr_client)
2463 .await
2464 .ok();
2465
2466 if matches!(envelope, Some(SourceEnvelope::Debezium)) && key.is_none() {
2467 sql_bail!("Key schema is required for ENVELOPE DEBEZIUM");
2468 }
2469
2470 *seed = Some(CsrSeedProtobuf { value, key });
2471 }
2472 Some(_) => (),
2473 }
2474
2475 Ok(())
2476}
2477
2478async fn purify_csr_connection_avro(
2479 catalog: &dyn SessionCatalog,
2480 options: &SourceFormatOptions,
2481 csr_connection: &mut CsrConnectionAvro<Aug>,
2482 envelope: &Option<SourceEnvelope>,
2483 storage_configuration: &StorageConfiguration,
2484) -> Result<(), PlanError> {
2485 let SourceFormatOptions::Kafka { topic } = options else {
2486 sql_bail!("Confluent Schema Registry is only supported with Kafka sources")
2487 };
2488
2489 let CsrConnectionAvro {
2490 connection: CsrConnection { connection, .. },
2491 seed,
2492 key_strategy,
2493 value_strategy,
2494 } = csr_connection;
2495 if seed.is_none() {
2496 let scx = StatementContext::new(None, &*catalog);
2497 let csr_connection = match scx.get_item_by_resolved_name(connection)?.connection()? {
2498 Connection::Csr(connection) => connection.clone().into_inline_connection(catalog),
2499 _ => sql_bail!("{} is not a schema registry connection", connection),
2500 };
2501 let ccsr_client = csr_connection
2502 .connect(storage_configuration, InTask::No)
2503 .await
2504 .map_err(|e| CsrPurificationError::ClientError(Arc::new(e)))?;
2505
2506 let Schema {
2507 key_schema,
2508 value_schema,
2509 key_reference_schemas,
2510 value_reference_schemas,
2511 } = get_remote_csr_schema(
2512 &ccsr_client,
2513 key_strategy.clone().unwrap_or_default(),
2514 value_strategy.clone().unwrap_or_default(),
2515 topic,
2516 )
2517 .await?;
2518 if matches!(envelope, Some(SourceEnvelope::Debezium)) && key_schema.is_none() {
2519 sql_bail!("Key schema is required for ENVELOPE DEBEZIUM");
2520 }
2521
2522 *seed = Some(CsrSeedAvro {
2523 key_schema,
2524 value_schema,
2525 key_reference_schemas,
2526 value_reference_schemas,
2527 })
2528 }
2529
2530 Ok(())
2531}
2532
/// Raw key and value schema text resolved for a Kafka topic, together with the
/// raw text of any schemas each of them references.
#[derive(Debug)]
pub struct Schema {
    /// The key schema, or `None` if no key schema was found.
    pub key_schema: Option<String>,
    /// The value schema.
    pub value_schema: String,
    /// Raw schemas referenced by the key schema. `get_remote_csr_schema`
    /// leaves this empty when there is no key schema.
    pub key_reference_schemas: Vec<String>,
    /// Raw schemas referenced by the value schema.
    pub value_reference_schemas: Vec<String>,
}
2542
/// A raw schema together with the raw text of the schemas it references.
struct SchemaWithReferences {
    /// Raw text of the primary schema.
    schema: String,
    /// Raw text of the referenced schemas. Empty for inline schemas.
    references: Vec<String>,
}
2550
2551async fn get_schema_with_strategy(
2552 client: &Client,
2553 strategy: ReaderSchemaSelectionStrategy,
2554 subject: &str,
2555) -> Result<Option<SchemaWithReferences>, PlanError> {
2556 match strategy {
2557 ReaderSchemaSelectionStrategy::Latest => {
2558 match client.get_subject_and_references(subject).await {
2560 Ok((primary, dependencies)) => Ok(Some(SchemaWithReferences {
2561 schema: primary.schema.raw,
2562 references: dependencies.into_iter().map(|s| s.schema.raw).collect(),
2563 })),
2564 Err(GetBySubjectError::SubjectNotFound)
2565 | Err(GetBySubjectError::VersionNotFound(_)) => Ok(None),
2566 Err(e) => Err(PlanError::FetchingCsrSchemaFailed {
2567 schema_lookup: format!("subject {}", subject.quoted()),
2568 cause: Arc::new(e),
2569 }),
2570 }
2571 }
2572 ReaderSchemaSelectionStrategy::Inline(raw) => Ok(Some(SchemaWithReferences {
2576 schema: raw,
2577 references: vec![],
2578 })),
2579 ReaderSchemaSelectionStrategy::ById(id) => {
2580 match client.get_subject_and_references_by_id(id).await {
2581 Ok((primary, dependencies)) => Ok(Some(SchemaWithReferences {
2582 schema: primary.schema.raw,
2583 references: dependencies.into_iter().map(|s| s.schema.raw).collect(),
2584 })),
2585 Err(GetBySubjectError::SubjectNotFound)
2586 | Err(GetBySubjectError::VersionNotFound(_)) => Ok(None),
2587 Err(e) => Err(PlanError::FetchingCsrSchemaFailed {
2588 schema_lookup: format!("subject {}", subject.quoted()),
2589 cause: Arc::new(e),
2590 }),
2591 }
2592 }
2593 }
2594}
2595
2596async fn get_remote_csr_schema(
2597 ccsr_client: &mz_ccsr::Client,
2598 key_strategy: ReaderSchemaSelectionStrategy,
2599 value_strategy: ReaderSchemaSelectionStrategy,
2600 topic: &str,
2601) -> Result<Schema, PlanError> {
2602 let value_schema_name = format!("{}-value", topic);
2603 let value_result =
2604 get_schema_with_strategy(ccsr_client, value_strategy, &value_schema_name).await?;
2605 let value_result = value_result.ok_or_else(|| anyhow!("No value schema found"))?;
2606
2607 let key_subject = format!("{}-key", topic);
2608 let key_result = get_schema_with_strategy(ccsr_client, key_strategy, &key_subject).await?;
2609 Ok(Schema {
2610 key_schema: key_result.as_ref().map(|r| r.schema.clone()),
2611 value_schema: value_result.schema,
2612 key_reference_schemas: key_result.map(|r| r.references).unwrap_or_default(),
2613 value_reference_schemas: value_result.references,
2614 })
2615}
2616
2617async fn compile_proto(
2619 subject_name: &String,
2620 ccsr_client: &Client,
2621) -> Result<CsrSeedProtobufSchema, PlanError> {
2622 let (primary_subject, dependency_subjects) = ccsr_client
2623 .get_subject_and_references(subject_name)
2624 .await
2625 .map_err(|e| PlanError::FetchingCsrSchemaFailed {
2626 schema_lookup: format!("subject {}", subject_name.quoted()),
2627 cause: Arc::new(e),
2628 })?;
2629
2630 let mut source_tree = VirtualSourceTree::new();
2632
2633 source_tree.as_mut().map_well_known_types();
2637
2638 for subject in iter::once(&primary_subject).chain(dependency_subjects.iter()) {
2639 source_tree.as_mut().add_file(
2640 Path::new(&subject.name),
2641 subject.schema.raw.as_bytes().to_vec(),
2642 );
2643 }
2644 let mut db = SourceTreeDescriptorDatabase::new(source_tree.as_mut());
2645 let fds = db
2646 .as_mut()
2647 .build_file_descriptor_set(&[Path::new(&primary_subject.name)])
2648 .map_err(|cause| PlanError::InvalidProtobufSchema { cause })?;
2649
2650 let primary_fd = fds.file(0);
2652 let message_name = match primary_fd.message_type_size() {
2653 1 => String::from_utf8_lossy(primary_fd.message_type(0).name()).into_owned(),
2654 0 => bail_unsupported!(29603, "Protobuf schemas with no messages"),
2655 _ => bail_unsupported!(29603, "Protobuf schemas with multiple messages"),
2656 };
2657
2658 let bytes = &fds
2660 .serialize()
2661 .map_err(|cause| PlanError::InvalidProtobufSchema { cause })?;
2662 let mut schema = String::new();
2663 strconv::format_bytes(&mut schema, bytes);
2664
2665 Ok(CsrSeedProtobufSchema {
2666 schema,
2667 message_name,
2668 })
2669}
2670
/// Item name of the `mz_now()` function.
const MZ_NOW_NAME: &str = "mz_now";
/// Schema in which `mz_now()` is resolved during materialized view purification.
const MZ_NOW_SCHEMA: &str = "mz_catalog";
2673
/// Purifies the `WITH` options of a `CREATE MATERIALIZED VIEW` statement in
/// place, and keeps `resolved_ids` in sync with the rewritten statement.
///
/// The steps, in order:
/// 1. `REFRESH AT CREATION` becomes `REFRESH AT mz_now()`.
/// 2. `REFRESH EVERY` without `ALIGNED TO` gets `ALIGNED TO mz_now()`.
/// 3. Inside `REFRESH AT` and `REFRESH EVERY ... ALIGNED TO` expressions, every
///    call to a function named `mz_now` is replaced by the timestamp `mz_now`,
///    cast to `mz_timestamp`.
/// 4. If no `REFRESH` option is present at all, `REFRESH ON COMMIT` is appended.
/// 5. `mz_timestamp` is added to `resolved_ids` if step 3 introduced a reference
///    to it. `mz_now` is removed from `resolved_ids` if the whole statement no
///    longer contains a call to a function named `mz_now`.
///
/// # Panics
///
/// Panics if `mz_catalog.mz_now` cannot be resolved, or if step 3 encounters an
/// `mz_now` call while `mz_now` is `None`.
pub fn purify_create_materialized_view_options(
    catalog: impl SessionCatalog,
    mz_now: Option<Timestamp>,
    cmvs: &mut CreateMaterializedViewStatement<Aug>,
    resolved_ids: &mut ResolvedIds,
) {
    // Preparation: an `mz_catalog.mz_now()` call expression, inserted by steps 1 and 2.
    let (mz_now_id, mz_now_expr) = {
        let item = catalog
            .resolve_function(&PartialItemName {
                database: None,
                schema: Some(MZ_NOW_SCHEMA.to_string()),
                item: MZ_NOW_NAME.to_string(),
            })
            .expect("we should be able to resolve mz_now");
        (
            item.id(),
            Expr::Function(Function {
                name: ResolvedItemName::Item {
                    id: item.id(),
                    qualifiers: item.name().qualifiers.clone(),
                    full_name: catalog.resolve_full_name(item.name()),
                    print_id: false,
                    version: RelationVersionSelector::Latest,
                },
                args: FunctionArgs::Args {
                    args: Vec::new(),
                    order_by: Vec::new(),
                },
                filter: None,
                over: None,
                distinct: false,
            }),
        )
    };
    // Preparation: the `mz_timestamp` type, used by step 3 to cast substituted timestamps.
    let (mz_timestamp_id, mz_timestamp_type) = {
        let item = catalog.get_system_type("mz_timestamp");
        let full_name = catalog.resolve_full_name(item.name());
        (
            item.id(),
            ResolvedDataType::Named {
                id: item.id(),
                qualifiers: item.name().qualifiers.clone(),
                full_name,
                modifiers: vec![],
                print_id: true,
            },
        )
    };

    // Set when step 3 inserts at least one cast to `mz_timestamp`.
    let mut introduced_mz_timestamp = false;

    for option in cmvs.with_options.iter_mut() {
        // 1. `REFRESH AT CREATION` => `REFRESH AT mz_now()`.
        if matches!(
            option.value,
            Some(WithOptionValue::Refresh(RefreshOptionValue::AtCreation))
        ) {
            option.value = Some(WithOptionValue::Refresh(RefreshOptionValue::At(
                RefreshAtOptionValue {
                    time: mz_now_expr.clone(),
                },
            )));
        }

        // 2. `REFRESH EVERY` without `ALIGNED TO` => add `ALIGNED TO mz_now()`.
        if let Some(WithOptionValue::Refresh(RefreshOptionValue::Every(
            RefreshEveryOptionValue { aligned_to, .. },
        ))) = &mut option.value
        {
            if aligned_to.is_none() {
                *aligned_to = Some(mz_now_expr.clone());
            }
        }

        // 3. Substitute `mz_now()` with the chosen timestamp. This must run after
        // steps 1 and 2, which may have just introduced `mz_now()` calls.
        match &mut option.value {
            Some(WithOptionValue::Refresh(RefreshOptionValue::At(RefreshAtOptionValue {
                time,
            }))) => {
                let mut visitor = MzNowPurifierVisitor::new(mz_now, mz_timestamp_type.clone());
                visitor.visit_expr_mut(time);
                introduced_mz_timestamp |= visitor.introduced_mz_timestamp;
            }
            Some(WithOptionValue::Refresh(RefreshOptionValue::Every(
                RefreshEveryOptionValue {
                    interval: _,
                    aligned_to: Some(aligned_to),
                },
            ))) => {
                let mut visitor = MzNowPurifierVisitor::new(mz_now, mz_timestamp_type.clone());
                visitor.visit_expr_mut(aligned_to);
                introduced_mz_timestamp |= visitor.introduced_mz_timestamp;
            }
            _ => {}
        }
    }

    // 4. No REFRESH option given at all => default to `REFRESH ON COMMIT`.
    if !cmvs.with_options.iter().any(|o| {
        matches!(
            o,
            MaterializedViewOption {
                value: Some(WithOptionValue::Refresh(..)),
                ..
            }
        )
    }) {
        cmvs.with_options.push(MaterializedViewOption {
            name: MaterializedViewOptionName::Refresh,
            value: Some(WithOptionValue::Refresh(RefreshOptionValue::OnCommit)),
        })
    }

    // 5. Update `resolved_ids`: step 3 may have added references to `mz_timestamp`.
    if introduced_mz_timestamp {
        resolved_ids.add_item(mz_timestamp_id);
    }
    // `mz_now()` may still appear elsewhere in the statement (e.g. the view's query),
    // so scan the whole statement before dropping it from `resolved_ids`.
    let mut visitor = ExprContainsTemporalVisitor::new();
    visitor.visit_create_materialized_view_statement(cmvs);
    if !visitor.contains_temporal {
        resolved_ids.remove_item(&mz_now_id);
    }
}
2811
2812pub fn materialized_view_option_contains_temporal(mvo: &MaterializedViewOption<Aug>) -> bool {
2815 match &mvo.value {
2816 Some(WithOptionValue::Refresh(RefreshOptionValue::At(RefreshAtOptionValue { time }))) => {
2817 let mut visitor = ExprContainsTemporalVisitor::new();
2818 visitor.visit_expr(time);
2819 visitor.contains_temporal
2820 }
2821 Some(WithOptionValue::Refresh(RefreshOptionValue::Every(RefreshEveryOptionValue {
2822 interval: _,
2823 aligned_to: Some(aligned_to),
2824 }))) => {
2825 let mut visitor = ExprContainsTemporalVisitor::new();
2826 visitor.visit_expr(aligned_to);
2827 visitor.contains_temporal
2828 }
2829 Some(WithOptionValue::Refresh(RefreshOptionValue::Every(RefreshEveryOptionValue {
2830 interval: _,
2831 aligned_to: None,
2832 }))) => {
2833 true
2836 }
2837 Some(WithOptionValue::Refresh(RefreshOptionValue::AtCreation)) => {
2838 true
2840 }
2841 _ => false,
2842 }
2843}
2844
2845struct ExprContainsTemporalVisitor {
2847 pub contains_temporal: bool,
2848}
2849
2850impl ExprContainsTemporalVisitor {
2851 pub fn new() -> ExprContainsTemporalVisitor {
2852 ExprContainsTemporalVisitor {
2853 contains_temporal: false,
2854 }
2855 }
2856}
2857
2858impl Visit<'_, Aug> for ExprContainsTemporalVisitor {
2859 fn visit_function(&mut self, func: &Function<Aug>) {
2860 self.contains_temporal |= func.name.full_item_name().item == MZ_NOW_NAME;
2861 visit_function(self, func);
2862 }
2863}
2864
2865struct MzNowPurifierVisitor {
2866 pub mz_now: Option<Timestamp>,
2867 pub mz_timestamp_type: ResolvedDataType,
2868 pub introduced_mz_timestamp: bool,
2869}
2870
2871impl MzNowPurifierVisitor {
2872 pub fn new(
2873 mz_now: Option<Timestamp>,
2874 mz_timestamp_type: ResolvedDataType,
2875 ) -> MzNowPurifierVisitor {
2876 MzNowPurifierVisitor {
2877 mz_now,
2878 mz_timestamp_type,
2879 introduced_mz_timestamp: false,
2880 }
2881 }
2882}
2883
2884impl VisitMut<'_, Aug> for MzNowPurifierVisitor {
2885 fn visit_expr_mut(&mut self, expr: &'_ mut Expr<Aug>) {
2886 match expr {
2887 Expr::Function(Function {
2888 name:
2889 ResolvedItemName::Item {
2890 full_name: FullItemName { item, .. },
2891 ..
2892 },
2893 ..
2894 }) if item == &MZ_NOW_NAME.to_string() => {
2895 let mz_now = self.mz_now.expect(
2896 "we should have chosen a timestamp if the expression contains mz_now()",
2897 );
2898 *expr = Expr::Cast {
2901 expr: Box::new(Expr::Value(Value::Number(mz_now.to_string()))),
2902 data_type: self.mz_timestamp_type.clone(),
2903 };
2904 self.introduced_mz_timestamp = true;
2905 }
2906 _ => visit_expr_mut(self, expr),
2907 }
2908 }
2909}