1use std::collections::{BTreeMap, BTreeSet};
15use std::fmt;
16use std::iter;
17use std::path::Path;
18use std::sync::Arc;
19
20use anyhow::anyhow;
21use itertools::Itertools;
22use mz_adapter_types::dyncfgs::ENABLE_S3_TABLES_REGION_CHECK;
23use mz_ccsr::{Client, GetByIdError, GetBySubjectError, Schema as CcsrSchema};
24use mz_cloud_provider::CloudProvider;
25use mz_controller_types::ClusterId;
26use mz_kafka_util::client::MzClientContext;
27use mz_mysql_util::MySqlTableDesc;
28use mz_ore::collections::CollectionExt;
29use mz_ore::error::ErrorExt;
30use mz_ore::future::InTask;
31use mz_ore::iter::IteratorExt;
32use mz_ore::str::StrExt;
33use mz_ore::{assert_none, soft_panic_or_log};
34use mz_postgres_util::desc::PostgresTableDesc;
35use mz_proto::RustType;
36use mz_repr::{CatalogItemId, RelationDesc, RelationVersionSelector, Timestamp, strconv};
37use mz_sql_parser::ast::display::AstDisplay;
38use mz_sql_parser::ast::visit::{Visit, visit_function};
39use mz_sql_parser::ast::visit_mut::{VisitMut, visit_expr_mut};
40use mz_sql_parser::ast::{
41 AlterSourceAction, AlterSourceAddSubsourceOptionName, AlterSourceStatement, AvroDocOn,
42 ColumnName, CreateMaterializedViewStatement, CreateSinkConnection, CreateSinkOptionName,
43 CreateSinkStatement, CreateSourceOptionName, CreateSubsourceOption, CreateSubsourceOptionName,
44 CreateTableFromSourceStatement, CsrConfigOption, CsrConfigOptionName, CsrConnection,
45 CsrSeedAvro, CsrSeedProtobuf, CsrSeedProtobufSchema, DeferredItemName, DocOnIdentifier,
46 DocOnSchema, Expr, Function, FunctionArgs, Ident, KafkaSourceConfigOption,
47 KafkaSourceConfigOptionName, LoadGenerator, LoadGeneratorOption, LoadGeneratorOptionName,
48 MaterializedViewOption, MaterializedViewOptionName, MySqlConfigOption, MySqlConfigOptionName,
49 PgConfigOption, PgConfigOptionName, RawItemName, ReaderSchemaSelectionStrategy,
50 RefreshAtOptionValue, RefreshEveryOptionValue, RefreshOptionValue, SourceEnvelope,
51 SqlServerConfigOption, SqlServerConfigOptionName, Statement, TableFromSourceColumns,
52 TableFromSourceOption, TableFromSourceOptionName, UnresolvedItemName,
53};
54use mz_sql_server_util::desc::SqlServerTableDesc;
55use mz_storage_types::configuration::StorageConfiguration;
56use mz_storage_types::connections::Connection;
57use mz_storage_types::connections::inline::IntoInlineConnection;
58use mz_storage_types::errors::ContextCreationError;
59use mz_storage_types::sources::load_generator::LoadGeneratorOutput;
60use mz_storage_types::sources::mysql::MySqlSourceDetails;
61use mz_storage_types::sources::postgres::PostgresSourcePublicationDetails;
62use mz_storage_types::sources::{
63 GenericSourceConnection, SourceDesc, SourceExportStatementDetails, SqlServerSourceExtras,
64};
65use prost::Message;
66use protobuf_native::MessageLite;
67use protobuf_native::compiler::{SourceTreeDescriptorDatabase, VirtualSourceTree};
68use rdkafka::admin::AdminClient;
69use references::{RetrievedSourceReferences, SourceReferenceClient};
70use uuid::Uuid;
71
72use crate::ast::{
73 AlterSourceAddSubsourceOption, AvroSchema, CreateSourceConnection, CreateSourceStatement,
74 CreateSubsourceStatement, CsrConnectionAvro, CsrConnectionProtobuf, ExternalReferenceExport,
75 ExternalReferences, Format, FormatSpecifier, ProtobufSchema, Value, WithOptionValue,
76};
77use crate::catalog::{CatalogItemType, SessionCatalog};
78use crate::kafka_util::{KafkaSinkConfigOptionExtracted, KafkaSourceConfigOptionExtracted};
79use crate::names::{
80 Aug, FullItemName, PartialItemName, ResolvedColumnReference, ResolvedDataType, ResolvedIds,
81 ResolvedItemName,
82};
83use crate::plan::error::PlanError;
84use crate::plan::statement::ddl::load_generator_ast_to_generator;
85use crate::plan::{SourceReferences, StatementContext};
86use crate::pure::error::{IcebergSinkPurificationError, SqlServerSourcePurificationError};
87use crate::session::vars::ENABLE_SQL_SERVER_SOURCE;
88use crate::{kafka_util, normalize};
89
90use self::error::{
91 CsrPurificationError, KafkaSinkPurificationError, KafkaSourcePurificationError,
92 LoadGeneratorSourcePurificationError, MySqlSourcePurificationError, PgSourcePurificationError,
93};
94
// Error types raised during purification (used by the submodules below).
pub(crate) mod error;
// Discovery of upstream references (tables, topics, outputs) for sources.
mod references;

// Per-upstream-system purification logic.
pub mod mysql;
pub mod postgres;
pub mod sql_server;
101
/// A single export requested from a source during purification, pairing the
/// upstream object's external reference with the Materialize-side name it
/// will be created under, plus connection-specific metadata `T`.
pub(crate) struct RequestedSourceExport<T> {
    /// Fully qualified reference to the object in the upstream system.
    external_reference: UnresolvedItemName,
    /// The name the export will have within Materialize.
    name: UnresolvedItemName,
    /// Connection-specific metadata describing the upstream object.
    meta: T,
}
107
// Manual `Debug` impl; this mirrors what `#[derive(Debug)]` would generate
// (including the `T: fmt::Debug` bound), kept explicit here.
impl<T: fmt::Debug> fmt::Debug for RequestedSourceExport<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("RequestedSourceExport")
            .field("external_reference", &self.external_reference)
            .field("name", &self.name)
            .field("meta", &self.meta)
            .finish()
    }
}
117
118impl<T> RequestedSourceExport<T> {
119 fn change_meta<F>(self, new_meta: F) -> RequestedSourceExport<F> {
120 RequestedSourceExport {
121 external_reference: self.external_reference,
122 name: self.name,
123 meta: new_meta,
124 }
125 }
126}
127
128fn source_export_name_gen(
133 source_name: &UnresolvedItemName,
134 subsource_name: &str,
135) -> Result<UnresolvedItemName, PlanError> {
136 let mut partial = normalize::unresolved_item_name(source_name.clone())?;
137 partial.item = subsource_name.to_string();
138 Ok(UnresolvedItemName::from(partial))
139}
140
141fn validate_source_export_names<T>(
145 requested_source_exports: &[RequestedSourceExport<T>],
146) -> Result<(), PlanError> {
147 if let Some(name) = requested_source_exports
151 .iter()
152 .map(|subsource| &subsource.name)
153 .duplicates()
154 .next()
155 .cloned()
156 {
157 let mut upstream_references: Vec<_> = requested_source_exports
158 .into_iter()
159 .filter_map(|subsource| {
160 if &subsource.name == &name {
161 Some(subsource.external_reference.clone())
162 } else {
163 None
164 }
165 })
166 .collect();
167
168 upstream_references.sort();
169
170 Err(PlanError::SubsourceNameConflict {
171 name,
172 upstream_references,
173 })?;
174 }
175
176 if let Some(name) = requested_source_exports
182 .iter()
183 .map(|export| &export.external_reference)
184 .duplicates()
185 .next()
186 .cloned()
187 {
188 let mut target_names: Vec<_> = requested_source_exports
189 .into_iter()
190 .filter_map(|export| {
191 if &export.external_reference == &name {
192 Some(export.name.clone())
193 } else {
194 None
195 }
196 })
197 .collect();
198
199 target_names.sort();
200
201 Err(PlanError::SubsourceDuplicateReference { name, target_names })?;
202 }
203
204 Ok(())
205}
206
/// The result of successful purification: a statement with its
/// external-system-dependent details resolved, plus any auxiliary statements
/// or metadata needed to plan it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PurifiedStatement {
    /// A purified `CREATE SOURCE`.
    PurifiedCreateSource {
        /// Statement creating the source's progress subsource, if one is
        /// needed.
        create_progress_subsource_stmt: Option<CreateSubsourceStatement<Aug>>,
        /// The purified `CREATE SOURCE` statement itself.
        create_source_stmt: CreateSourceStatement<Aug>,
        /// Purified details for each requested export, keyed by its name.
        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
        /// The set of upstream references available from this source.
        available_source_references: SourceReferences,
    },
    /// A purified `ALTER SOURCE`.
    PurifiedAlterSource {
        alter_source_stmt: AlterSourceStatement<Aug>,
    },
    /// A purified `ALTER SOURCE ... ADD SUBSOURCE`.
    PurifiedAlterSourceAddSubsources {
        /// The source the subsources are being added to.
        source_name: ResolvedItemName,
        /// Options accompanying the add-subsource action.
        options: Vec<AlterSourceAddSubsourceOption<Aug>>,
        /// Purified details for each added export, keyed by its name.
        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
    },
    /// A purified statement refreshing a source's known upstream references.
    PurifiedAlterSourceRefreshReferences {
        source_name: ResolvedItemName,
        /// The freshly retrieved set of upstream references.
        available_source_references: SourceReferences,
    },
    /// A purified `CREATE SINK`.
    PurifiedCreateSink(CreateSinkStatement<Aug>),
    /// A purified `CREATE TABLE ... FROM SOURCE`.
    PurifiedCreateTableFromSource {
        stmt: CreateTableFromSourceStatement<Aug>,
    },
}
241
/// A source export whose upstream details have been retrieved and validated
/// during purification.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PurifiedSourceExport {
    /// The upstream object this export ingests.
    pub external_reference: UnresolvedItemName,
    /// Connection-specific details for the export.
    pub details: PurifiedExportDetails,
}
247
/// Connection-specific details for a [`PurifiedSourceExport`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PurifiedExportDetails {
    MySql {
        /// Description of the upstream MySQL table.
        table: MySqlTableDesc,
        /// Columns to decode as text, if any were requested.
        text_columns: Option<Vec<Ident>>,
        /// Columns excluded from ingestion, if any were requested.
        exclude_columns: Option<Vec<Ident>>,
        /// The GTID set captured at purification time (read from
        /// `global.gtid_executed`).
        initial_gtid_set: String,
    },
    Postgres {
        /// Description of the upstream Postgres table.
        table: PostgresTableDesc,
        /// Columns to decode as text, if any were requested.
        text_columns: Option<Vec<Ident>>,
        /// Columns excluded from ingestion, if any were requested.
        exclude_columns: Option<Vec<Ident>>,
    },
    SqlServer {
        /// Description of the upstream SQL Server table.
        table: SqlServerTableDesc,
        /// Columns to decode as text, if any were requested.
        text_columns: Option<Vec<Ident>>,
        /// Columns excluded from ingestion, if any were requested.
        excl_columns: Option<Vec<Ident>>,
        // NOTE(review): presumably the CDC capture instance for the table,
        // as populated by `sql_server::purify_source_exports` — confirm.
        capture_instance: Arc<str>,
        // NOTE(review): appears to be the LSN captured at purification time;
        // confirm exact semantics in the sql_server module.
        initial_lsn: mz_sql_server_util::cdc::Lsn,
    },
    /// Kafka exports carry no additional details.
    Kafka {},
    LoadGenerator {
        /// Relation description for this output, if the generator provides
        /// one.
        table: Option<RelationDesc>,
        /// Which of the generator's outputs this export reads.
        output: LoadGeneratorOutput,
    },
}
274
275pub async fn purify_statement(
285 catalog: impl SessionCatalog,
286 now: u64,
287 stmt: Statement<Aug>,
288 storage_configuration: &StorageConfiguration,
289) -> (Result<PurifiedStatement, PlanError>, Option<ClusterId>) {
290 match stmt {
291 Statement::CreateSource(stmt) => {
292 let cluster_id = stmt.in_cluster.as_ref().map(|cluster| cluster.id.clone());
293 (
294 purify_create_source(catalog, now, stmt, storage_configuration).await,
295 cluster_id,
296 )
297 }
298 Statement::AlterSource(stmt) => (
299 purify_alter_source(catalog, stmt, storage_configuration).await,
300 None,
301 ),
302 Statement::CreateSink(stmt) => {
303 let cluster_id = stmt.in_cluster.as_ref().map(|cluster| cluster.id.clone());
304 (
305 purify_create_sink(catalog, stmt, storage_configuration).await,
306 cluster_id,
307 )
308 }
309 Statement::CreateTableFromSource(stmt) => (
310 purify_create_table_from_source(catalog, stmt, storage_configuration).await,
311 None,
312 ),
313 o => unreachable!("{:?} does not need to be purified", o),
314 }
315}
316
/// Adds `DOC ON` options to the Avro CSR formats of a `CREATE SINK`
/// statement, derived from catalog comments on the sink's `FROM` item and
/// everything it references.
///
/// User-provided `DOC ON` options take precedence: a comment-derived option
/// is only added when the user did not already supply one for the same
/// identifier and schema part.
pub(crate) fn purify_create_sink_avro_doc_on_options(
    catalog: &dyn SessionCatalog,
    from_id: CatalogItemId,
    format: &mut Option<FormatSpecifier<Aug>>,
) -> Result<(), PlanError> {
    // The objects whose comments can contribute: everything the sink's
    // `FROM` item references, plus the item itself.
    let from = catalog.get_item(&from_id);
    let object_ids = from
        .references()
        .items()
        .copied()
        .chain_one(from.id())
        .collect::<Vec<_>>();

    // Collect mutable references to the option lists of every Avro CSR
    // format (key and/or value); other formats carry no doc options.
    let mut avro_format_options = vec![];
    for_each_format(format, |doc_on_schema, fmt| match fmt {
        Format::Avro(AvroSchema::InlineSchema { .. })
        | Format::Bytes
        | Format::Csv { .. }
        | Format::Json { .. }
        | Format::Protobuf(..)
        | Format::Regex(..)
        | Format::Text => (),
        Format::Avro(AvroSchema::Csr {
            csr_connection: CsrConnectionAvro { connection, .. },
        }) => {
            avro_format_options.push((doc_on_schema, &mut connection.options));
        }
    });

    for (for_schema, options) in avro_format_options {
        // `DOC ON` options the user wrote explicitly; these suppress any
        // comment-derived option for the same key.
        let user_provided_comments = options
            .iter()
            .filter_map(|CsrConfigOption { name, .. }| match name {
                CsrConfigOptionName::AvroDocOn(doc_on) => Some(doc_on.clone()),
                _ => None,
            })
            .collect::<BTreeSet<_>>();

        for object_id in &object_ids {
            let item = catalog
                .get_item(object_id)
                .at_version(RelationVersionSelector::Latest);
            // Build the resolved name used in the generated `DOC ON`
            // identifiers; functions and types print without their ID.
            let full_resolved_name = ResolvedItemName::Item {
                id: *object_id,
                qualifiers: item.name().qualifiers.clone(),
                full_name: catalog.resolve_full_name(item.name()),
                print_id: !matches!(
                    item.item_type(),
                    CatalogItemType::Func | CatalogItemType::Type
                ),
                version: RelationVersionSelector::Latest,
            };

            if let Some(comments_map) = catalog.get_item_comments(object_id) {
                // The object-level comment (map key `None`) becomes a
                // type-level `DOC ON` option, unless user-provided.
                let doc_on_item_key = AvroDocOn {
                    identifier: DocOnIdentifier::Type(full_resolved_name.clone()),
                    for_schema,
                };
                if !user_provided_comments.contains(&doc_on_item_key) {
                    if let Some(root_comment) = comments_map.get(&None) {
                        options.push(CsrConfigOption {
                            name: CsrConfigOptionName::AvroDocOn(doc_on_item_key),
                            value: Some(mz_sql_parser::ast::WithOptionValue::Value(Value::String(
                                root_comment.clone(),
                            ))),
                        });
                    }
                }

                // Column names come from the type's details for type
                // objects, or from the item's relation description otherwise.
                let column_descs = match item.type_details() {
                    Some(details) => details.typ.desc(catalog).unwrap_or_default(),
                    None => item.relation_desc().map(|d| d.into_owned()),
                };

                if let Some(desc) = column_descs {
                    for (pos, column_name) in desc.iter_names().enumerate() {
                        // Column comments are keyed by 1-based position.
                        if let Some(comment_str) = comments_map.get(&Some(pos + 1)) {
                            let doc_on_column_key = AvroDocOn {
                                identifier: DocOnIdentifier::Column(ColumnName {
                                    relation: full_resolved_name.clone(),
                                    column: ResolvedColumnReference::Column {
                                        name: column_name.to_owned(),
                                        index: pos,
                                    },
                                }),
                                for_schema,
                            };
                            if !user_provided_comments.contains(&doc_on_column_key) {
                                options.push(CsrConfigOption {
                                    name: CsrConfigOptionName::AvroDocOn(doc_on_column_key),
                                    value: Some(mz_sql_parser::ast::WithOptionValue::Value(
                                        Value::String(comment_str.clone()),
                                    )),
                                });
                            }
                        }
                    }
                }
            }
        }
    }

    Ok(())
}
437
/// Purifies a `CREATE SINK` statement: validates its `WITH` options, eagerly
/// connects to the external systems it depends on (Kafka broker, or Iceberg
/// catalog plus AWS, and any Confluent Schema Registry connections) so that
/// misconfigurations surface at purification time, and enriches Avro formats
/// with `DOC ON` options derived from catalog comments.
async fn purify_create_sink(
    catalog: impl SessionCatalog,
    mut create_sink_stmt: CreateSinkStatement<Aug>,
    storage_configuration: &StorageConfiguration,
) -> Result<PurifiedStatement, PlanError> {
    let CreateSinkStatement {
        connection,
        format,
        with_options,
        name: _,
        in_cluster: _,
        if_not_exists: _,
        from,
        envelope: _,
        mode: _,
    } = &mut create_sink_stmt;

    // The only WITH options users are allowed to specify themselves.
    const USER_ALLOWED_WITH_OPTIONS: &[CreateSinkOptionName] = &[
        CreateSinkOptionName::Snapshot,
        CreateSinkOptionName::CommitInterval,
    ];

    if let Some(op) = with_options
        .iter()
        .find(|op| !USER_ALLOWED_WITH_OPTIONS.contains(&op.name))
    {
        sql_bail!(
            "CREATE SINK...WITH ({}..) is not allowed",
            op.name.to_ast_string_simple(),
        )
    }

    match &connection {
        CreateSinkConnection::Kafka {
            connection,
            options,
            key: _,
            headers: _,
        } => {
            let scx = StatementContext::new(None, &catalog);
            // Resolve the referenced connection and inline its details,
            // rejecting anything that isn't a Kafka connection.
            let connection = {
                let item = scx.get_item_by_resolved_name(connection)?;
                match item.connection()? {
                    Connection::Kafka(connection) => {
                        connection.clone().into_inline_connection(scx.catalog)
                    }
                    _ => sql_bail!(
                        "{} is not a kafka connection",
                        scx.catalog.resolve_full_name(item.name())
                    ),
                }
            };

            let extracted_options: KafkaSinkConfigOptionExtracted = options.clone().try_into()?;

            if extracted_options.legacy_ids == Some(true) {
                sql_bail!("LEGACY IDs option is not supported");
            }

            // Create an admin client to verify the broker is reachable.
            let client: AdminClient<_> = connection
                .create_with_context(
                    storage_configuration,
                    MzClientContext::default(),
                    &BTreeMap::new(),
                    InTask::No,
                )
                .await
                .map_err(|e| {
                    KafkaSinkPurificationError::AdminClientError(Arc::new(e))
                })?;

            // Fetch cluster metadata to confirm the connection is usable.
            let metadata = client
                .inner()
                .fetch_metadata(
                    None,
                    storage_configuration
                        .parameters
                        .kafka_timeout_config
                        .fetch_metadata_timeout,
                )
                .map_err(|e| {
                    KafkaSinkPurificationError::AdminClientError(Arc::new(
                        ContextCreationError::KafkaError(e),
                    ))
                })?;

            if metadata.brokers().len() == 0 {
                Err(KafkaSinkPurificationError::ZeroBrokers)?;
            }
        }
        CreateSinkConnection::Iceberg {
            connection,
            aws_connection,
            ..
        } => {
            let scx = StatementContext::new(None, &catalog);
            // Resolve the referenced connection and inline its details,
            // rejecting anything that isn't an Iceberg catalog connection.
            let connection = {
                let item = scx.get_item_by_resolved_name(connection)?;
                match item.connection()? {
                    Connection::IcebergCatalog(connection) => {
                        connection.clone().into_inline_connection(scx.catalog)
                    }
                    _ => sql_bail!(
                        "{} is not an iceberg connection",
                        scx.catalog.resolve_full_name(item.name())
                    ),
                }
            };

            let aws_conn_id = aws_connection.item_id();

            // Resolve the accompanying AWS connection.
            let aws_connection = {
                let item = scx.get_item_by_resolved_name(aws_connection)?;
                match item.connection()? {
                    Connection::Aws(aws_connection) => aws_connection.clone(),
                    _ => sql_bail!(
                        "{} is not an aws connection",
                        scx.catalog.resolve_full_name(item.name())
                    ),
                }
            };

            // For S3 Tables catalogs, optionally enforce that the catalog's
            // region matches the environment's AWS region (gated by dyncfg).
            if let Some(s3tables) = connection.s3tables_catalog() {
                let enable_region_check =
                    ENABLE_S3_TABLES_REGION_CHECK.get(scx.catalog.system_vars().dyncfgs());
                if enable_region_check {
                    let env_id = &catalog.config().environment_id;
                    if matches!(env_id.cloud_provider(), CloudProvider::Aws) {
                        let env_region = env_id.cloud_provider_region();
                        // Default to us-east-1 when the connection does not
                        // specify a region.
                        let s3_tables_region = s3tables
                            .aws_connection
                            .connection
                            .region
                            .clone()
                            .unwrap_or_else(|| "us-east-1".to_string());
                        if s3_tables_region != env_region {
                            Err(IcebergSinkPurificationError::S3TablesRegionMismatch {
                                s3_tables_region,
                                environment_region: env_region.to_string(),
                            })?;
                        }
                    }
                }
            }

            // Eagerly connect to validate the Iceberg catalog...
            let _catalog = connection
                .connect(storage_configuration, InTask::No)
                .await
                .map_err(|e| IcebergSinkPurificationError::CatalogError(Arc::new(e)))?;

            // ...and load the AWS SDK config to validate the AWS connection.
            let _sdk_config = aws_connection
                .load_sdk_config(
                    &storage_configuration.connection_context,
                    aws_conn_id.clone(),
                    InTask::No,
                )
                .await
                .map_err(|e| IcebergSinkPurificationError::AwsSdkContextError(Arc::new(e)))?;
        }
    }

    // Collect every CSR connection referenced by the sink's formats.
    let mut csr_connection_ids = BTreeSet::new();
    for_each_format(format, |_, fmt| match fmt {
        Format::Avro(AvroSchema::InlineSchema { .. })
        | Format::Bytes
        | Format::Csv { .. }
        | Format::Json { .. }
        | Format::Protobuf(ProtobufSchema::InlineSchema { .. })
        | Format::Regex(..)
        | Format::Text => (),
        Format::Avro(AvroSchema::Csr {
            csr_connection: CsrConnectionAvro { connection, .. },
        })
        | Format::Protobuf(ProtobufSchema::Csr {
            csr_connection: CsrConnectionProtobuf { connection, .. },
        }) => {
            csr_connection_ids.insert(*connection.connection.item_id());
        }
    });

    // Verify each CSR connection is reachable by listing its subjects.
    let scx = StatementContext::new(None, &catalog);
    for csr_connection_id in csr_connection_ids {
        let connection = {
            let item = scx.get_item(&csr_connection_id);
            match item.connection()? {
                Connection::Csr(connection) => connection.clone().into_inline_connection(&catalog),
                _ => Err(CsrPurificationError::NotCsrConnection(
                    scx.catalog.resolve_full_name(item.name()),
                ))?,
            }
        };

        let client = connection
            .connect(storage_configuration, InTask::No)
            .await
            .map_err(|e| CsrPurificationError::ClientError(Arc::new(e)))?;

        client
            .list_subjects()
            .await
            .map_err(|e| CsrPurificationError::ListSubjectsError(Arc::new(e)))?;
    }

    // Inject `DOC ON` options derived from catalog comments.
    purify_create_sink_avro_doc_on_options(&catalog, *from.item_id(), format)?;

    Ok(PurifiedStatement::PurifiedCreateSink(create_sink_stmt))
}
663
664fn for_each_format<'a, F>(format: &'a mut Option<FormatSpecifier<Aug>>, mut f: F)
671where
672 F: FnMut(DocOnSchema, &'a mut Format<Aug>),
673{
674 match format {
675 None => (),
676 Some(FormatSpecifier::Bare(fmt)) => f(DocOnSchema::All, fmt),
677 Some(FormatSpecifier::KeyValue { key, value }) => {
678 f(DocOnSchema::KeyOnly, key);
679 f(DocOnSchema::ValueOnly, value);
680 }
681 }
682}
683
/// Controls whether a source may (or must) directly reference upstream
/// objects, i.e. create subsources, during purification.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub(crate) enum SourceReferencePolicy {
    /// External references are rejected; tables must be used instead.
    NotAllowed,
    /// External references may be used but are not required.
    Optional,
    /// External references are required.
    Required,
}
698
699async fn purify_create_source(
700 catalog: impl SessionCatalog,
701 now: u64,
702 mut create_source_stmt: CreateSourceStatement<Aug>,
703 storage_configuration: &StorageConfiguration,
704) -> Result<PurifiedStatement, PlanError> {
705 let CreateSourceStatement {
706 name: source_name,
707 col_names,
708 key_constraint,
709 connection: source_connection,
710 format,
711 envelope,
712 include_metadata,
713 external_references,
714 progress_subsource,
715 with_options,
716 ..
717 } = &mut create_source_stmt;
718
719 let uses_old_syntax = !col_names.is_empty()
720 || key_constraint.is_some()
721 || format.is_some()
722 || envelope.is_some()
723 || !include_metadata.is_empty()
724 || external_references.is_some()
725 || progress_subsource.is_some();
726
727 if let Some(DeferredItemName::Named(_)) = progress_subsource {
728 sql_bail!("Cannot manually ID qualify progress subsource")
729 }
730
731 let mut requested_subsource_map = BTreeMap::new();
732
733 let progress_desc = match &source_connection {
734 CreateSourceConnection::Kafka { .. } => {
735 &mz_storage_types::sources::kafka::KAFKA_PROGRESS_DESC
736 }
737 CreateSourceConnection::Postgres { .. } => {
738 &mz_storage_types::sources::postgres::PG_PROGRESS_DESC
739 }
740 CreateSourceConnection::SqlServer { .. } => {
741 &mz_storage_types::sources::sql_server::SQL_SERVER_PROGRESS_DESC
742 }
743 CreateSourceConnection::MySql { .. } => {
744 &mz_storage_types::sources::mysql::MYSQL_PROGRESS_DESC
745 }
746 CreateSourceConnection::LoadGenerator { .. } => {
747 &mz_storage_types::sources::load_generator::LOAD_GEN_PROGRESS_DESC
748 }
749 };
750 let scx = StatementContext::new(None, &catalog);
751
752 let reference_policy = if scx.catalog.system_vars().enable_create_table_from_source()
756 && scx.catalog.system_vars().force_source_table_syntax()
757 {
758 SourceReferencePolicy::NotAllowed
759 } else if scx.catalog.system_vars().enable_create_table_from_source() {
760 SourceReferencePolicy::Optional
761 } else {
762 SourceReferencePolicy::Required
763 };
764
765 let mut format_options = SourceFormatOptions::Default;
766
767 let retrieved_source_references: RetrievedSourceReferences;
768
769 match source_connection {
770 CreateSourceConnection::Kafka {
771 connection,
772 options: base_with_options,
773 ..
774 } => {
775 if let Some(external_references) = external_references {
776 Err(KafkaSourcePurificationError::ReferencedSubsources(
777 external_references.clone(),
778 ))?;
779 }
780
781 let connection = {
782 let item = scx.get_item_by_resolved_name(connection)?;
783 match item.connection()? {
785 Connection::Kafka(connection) => {
786 connection.clone().into_inline_connection(&catalog)
787 }
788 _ => Err(KafkaSourcePurificationError::NotKafkaConnection(
789 scx.catalog.resolve_full_name(item.name()),
790 ))?,
791 }
792 };
793
794 let extracted_options: KafkaSourceConfigOptionExtracted =
795 base_with_options.clone().try_into()?;
796
797 let topic = extracted_options
798 .topic
799 .ok_or(KafkaSourcePurificationError::ConnectionMissingTopic)?;
800
801 let consumer = connection
802 .create_with_context(
803 storage_configuration,
804 MzClientContext::default(),
805 &BTreeMap::new(),
806 InTask::No,
807 )
808 .await
809 .map_err(|e| {
810 KafkaSourcePurificationError::KafkaConsumerError(
812 e.display_with_causes().to_string(),
813 )
814 })?;
815 let consumer = Arc::new(consumer);
816
817 match (
818 extracted_options.start_offset,
819 extracted_options.start_timestamp,
820 ) {
821 (None, None) => {
822 kafka_util::ensure_topic_exists(
824 Arc::clone(&consumer),
825 &topic,
826 storage_configuration
827 .parameters
828 .kafka_timeout_config
829 .fetch_metadata_timeout,
830 )
831 .await?;
832 }
833 (Some(_), Some(_)) => {
834 sql_bail!("cannot specify START TIMESTAMP and START OFFSET at same time")
835 }
836 (Some(start_offsets), None) => {
837 kafka_util::validate_start_offsets(
839 Arc::clone(&consumer),
840 &topic,
841 start_offsets,
842 storage_configuration
843 .parameters
844 .kafka_timeout_config
845 .fetch_metadata_timeout,
846 )
847 .await?;
848 }
849 (None, Some(time_offset)) => {
850 let start_offsets = kafka_util::lookup_start_offsets(
852 Arc::clone(&consumer),
853 &topic,
854 time_offset,
855 now,
856 storage_configuration
857 .parameters
858 .kafka_timeout_config
859 .fetch_metadata_timeout,
860 )
861 .await?;
862
863 base_with_options.retain(|val| {
864 !matches!(val.name, KafkaSourceConfigOptionName::StartTimestamp)
865 });
866 base_with_options.push(KafkaSourceConfigOption {
867 name: KafkaSourceConfigOptionName::StartOffset,
868 value: Some(WithOptionValue::Sequence(
869 start_offsets
870 .iter()
871 .map(|offset| {
872 WithOptionValue::Value(Value::Number(offset.to_string()))
873 })
874 .collect(),
875 )),
876 });
877 }
878 }
879
880 let reference_client = SourceReferenceClient::Kafka { topic: &topic };
881 retrieved_source_references = reference_client.get_source_references().await?;
882
883 format_options = SourceFormatOptions::Kafka { topic };
884 }
885 CreateSourceConnection::Postgres {
886 connection,
887 options,
888 } => {
889 let connection_item = scx.get_item_by_resolved_name(connection)?;
890 let connection = match connection_item.connection().map_err(PlanError::from)? {
891 Connection::Postgres(connection) => {
892 connection.clone().into_inline_connection(&catalog)
893 }
894 _ => Err(PgSourcePurificationError::NotPgConnection(
895 scx.catalog.resolve_full_name(connection_item.name()),
896 ))?,
897 };
898 let crate::plan::statement::PgConfigOptionExtracted {
899 publication,
900 text_columns,
901 exclude_columns,
902 details,
903 ..
904 } = options.clone().try_into()?;
905 let publication =
906 publication.ok_or(PgSourcePurificationError::ConnectionMissingPublication)?;
907
908 if details.is_some() {
909 Err(PgSourcePurificationError::UserSpecifiedDetails)?;
910 }
911
912 let client = connection
913 .validate(connection_item.id(), storage_configuration)
914 .await?;
915
916 let reference_client = SourceReferenceClient::Postgres {
917 client: &client,
918 publication: &publication,
919 database: &connection.database,
920 };
921 retrieved_source_references = reference_client.get_source_references().await?;
922
923 let postgres::PurifiedSourceExports {
924 source_exports: subsources,
925 normalized_text_columns,
926 } = postgres::purify_source_exports(
927 &client,
928 &retrieved_source_references,
929 external_references,
930 text_columns,
931 exclude_columns,
932 source_name,
933 &reference_policy,
934 )
935 .await?;
936
937 if let Some(text_cols_option) = options
938 .iter_mut()
939 .find(|option| option.name == PgConfigOptionName::TextColumns)
940 {
941 text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
942 }
943
944 requested_subsource_map.extend(subsources);
945
946 let timeline_id = mz_postgres_util::get_timeline_id(&client).await?;
949
950 options.retain(|PgConfigOption { name, .. }| name != &PgConfigOptionName::Details);
952 let details = PostgresSourcePublicationDetails {
953 slot: format!(
954 "materialize_{}",
955 Uuid::new_v4().to_string().replace('-', "")
956 ),
957 timeline_id: Some(timeline_id),
958 database: connection.database,
959 };
960 options.push(PgConfigOption {
961 name: PgConfigOptionName::Details,
962 value: Some(WithOptionValue::Value(Value::String(hex::encode(
963 details.into_proto().encode_to_vec(),
964 )))),
965 })
966 }
967 CreateSourceConnection::SqlServer {
968 connection,
969 options,
970 } => {
971 scx.require_feature_flag(&ENABLE_SQL_SERVER_SOURCE)?;
972
973 let connection_item = scx.get_item_by_resolved_name(connection)?;
976 let connection = match connection_item.connection()? {
977 Connection::SqlServer(connection) => {
978 connection.clone().into_inline_connection(&catalog)
979 }
980 _ => Err(SqlServerSourcePurificationError::NotSqlServerConnection(
981 scx.catalog.resolve_full_name(connection_item.name()),
982 ))?,
983 };
984 let crate::plan::statement::ddl::SqlServerConfigOptionExtracted {
985 details,
986 text_columns,
987 exclude_columns,
988 seen: _,
989 } = options.clone().try_into()?;
990
991 if details.is_some() {
992 Err(SqlServerSourcePurificationError::UserSpecifiedDetails)?;
993 }
994
995 let mut client = connection
996 .validate(connection_item.id(), storage_configuration)
997 .await?;
998
999 let database: Arc<str> = connection.database.into();
1000 let reference_client = SourceReferenceClient::SqlServer {
1001 client: &mut client,
1002 database: Arc::clone(&database),
1003 };
1004 retrieved_source_references = reference_client.get_source_references().await?;
1005 tracing::debug!(?retrieved_source_references, "got source references");
1006
1007 let timeout = mz_storage_types::sources::sql_server::MAX_LSN_WAIT
1008 .get(storage_configuration.config_set());
1009
1010 let purified_source_exports = sql_server::purify_source_exports(
1011 &*database,
1012 &mut client,
1013 &retrieved_source_references,
1014 external_references,
1015 &text_columns,
1016 &exclude_columns,
1017 source_name,
1018 timeout,
1019 &reference_policy,
1020 )
1021 .await?;
1022
1023 let sql_server::PurifiedSourceExports {
1024 source_exports,
1025 normalized_text_columns,
1026 normalized_excl_columns,
1027 } = purified_source_exports;
1028
1029 requested_subsource_map.extend(source_exports);
1031
1032 let restore_history_id =
1036 mz_sql_server_util::inspect::get_latest_restore_history_id(&mut client).await?;
1037 let details = SqlServerSourceExtras { restore_history_id };
1038
1039 options.retain(|SqlServerConfigOption { name, .. }| {
1040 name != &SqlServerConfigOptionName::Details
1041 });
1042 options.push(SqlServerConfigOption {
1043 name: SqlServerConfigOptionName::Details,
1044 value: Some(WithOptionValue::Value(Value::String(hex::encode(
1045 details.into_proto().encode_to_vec(),
1046 )))),
1047 });
1048
1049 if let Some(text_cols_option) = options
1051 .iter_mut()
1052 .find(|option| option.name == SqlServerConfigOptionName::TextColumns)
1053 {
1054 text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
1055 }
1056 if let Some(excl_cols_option) = options
1057 .iter_mut()
1058 .find(|option| option.name == SqlServerConfigOptionName::ExcludeColumns)
1059 {
1060 excl_cols_option.value = Some(WithOptionValue::Sequence(normalized_excl_columns));
1061 }
1062 }
1063 CreateSourceConnection::MySql {
1064 connection,
1065 options,
1066 } => {
1067 let connection_item = scx.get_item_by_resolved_name(connection)?;
1068 let connection = match connection_item.connection()? {
1069 Connection::MySql(connection) => {
1070 connection.clone().into_inline_connection(&catalog)
1071 }
1072 _ => Err(MySqlSourcePurificationError::NotMySqlConnection(
1073 scx.catalog.resolve_full_name(connection_item.name()),
1074 ))?,
1075 };
1076 let crate::plan::statement::ddl::MySqlConfigOptionExtracted {
1077 details,
1078 text_columns,
1079 exclude_columns,
1080 seen: _,
1081 } = options.clone().try_into()?;
1082
1083 if details.is_some() {
1084 Err(MySqlSourcePurificationError::UserSpecifiedDetails)?;
1085 }
1086
1087 let mut conn = connection
1088 .validate(connection_item.id(), storage_configuration)
1089 .await
1090 .map_err(MySqlSourcePurificationError::InvalidConnection)?;
1091
1092 let initial_gtid_set =
1096 mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?;
1097
1098 let reference_client = SourceReferenceClient::MySql {
1099 conn: &mut conn,
1100 include_system_schemas: mysql::references_system_schemas(external_references),
1101 };
1102 retrieved_source_references = reference_client.get_source_references().await?;
1103
1104 let mysql::PurifiedSourceExports {
1105 source_exports: subsources,
1106 normalized_text_columns,
1107 normalized_exclude_columns,
1108 } = mysql::purify_source_exports(
1109 &mut conn,
1110 &retrieved_source_references,
1111 external_references,
1112 text_columns,
1113 exclude_columns,
1114 source_name,
1115 initial_gtid_set.clone(),
1116 &reference_policy,
1117 )
1118 .await?;
1119 requested_subsource_map.extend(subsources);
1120
1121 let details = MySqlSourceDetails {};
1124 options
1126 .retain(|MySqlConfigOption { name, .. }| name != &MySqlConfigOptionName::Details);
1127 options.push(MySqlConfigOption {
1128 name: MySqlConfigOptionName::Details,
1129 value: Some(WithOptionValue::Value(Value::String(hex::encode(
1130 details.into_proto().encode_to_vec(),
1131 )))),
1132 });
1133
1134 if let Some(text_cols_option) = options
1135 .iter_mut()
1136 .find(|option| option.name == MySqlConfigOptionName::TextColumns)
1137 {
1138 text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
1139 }
1140 if let Some(exclude_cols_option) = options
1141 .iter_mut()
1142 .find(|option| option.name == MySqlConfigOptionName::ExcludeColumns)
1143 {
1144 exclude_cols_option.value =
1145 Some(WithOptionValue::Sequence(normalized_exclude_columns));
1146 }
1147 }
1148 CreateSourceConnection::LoadGenerator { generator, options } => {
1149 let load_generator =
1150 load_generator_ast_to_generator(&scx, generator, options, include_metadata)?;
1151
1152 let reference_client = SourceReferenceClient::LoadGenerator {
1153 generator: &load_generator,
1154 };
1155 retrieved_source_references = reference_client.get_source_references().await?;
1156 let multi_output_sources =
1160 retrieved_source_references
1161 .all_references()
1162 .iter()
1163 .any(|r| {
1164 r.load_generator_output().expect("is loadgen")
1165 != &LoadGeneratorOutput::Default
1166 });
1167
1168 match external_references {
1169 Some(requested)
1170 if matches!(reference_policy, SourceReferencePolicy::NotAllowed) =>
1171 {
1172 Err(PlanError::UseTablesForSources(requested.to_string()))?
1173 }
1174 Some(requested) if !multi_output_sources => match requested {
1175 ExternalReferences::SubsetTables(_) => {
1176 Err(LoadGeneratorSourcePurificationError::ForTables)?
1177 }
1178 ExternalReferences::SubsetSchemas(_) => {
1179 Err(LoadGeneratorSourcePurificationError::ForSchemas)?
1180 }
1181 ExternalReferences::All => {
1182 Err(LoadGeneratorSourcePurificationError::ForAllTables)?
1183 }
1184 },
1185 Some(requested) => {
1186 let requested_exports = retrieved_source_references
1187 .requested_source_exports(Some(requested), source_name)?;
1188 for export in requested_exports {
1189 requested_subsource_map.insert(
1190 export.name,
1191 PurifiedSourceExport {
1192 external_reference: export.external_reference,
1193 details: PurifiedExportDetails::LoadGenerator {
1194 table: export
1195 .meta
1196 .load_generator_desc()
1197 .expect("is loadgen")
1198 .clone(),
1199 output: export
1200 .meta
1201 .load_generator_output()
1202 .expect("is loadgen")
1203 .clone(),
1204 },
1205 },
1206 );
1207 }
1208 }
1209 None => {
1210 if multi_output_sources
1211 && matches!(reference_policy, SourceReferencePolicy::Required)
1212 {
1213 Err(LoadGeneratorSourcePurificationError::MultiOutputRequiresForAllTables)?
1214 }
1215 }
1216 }
1217
1218 if let LoadGenerator::Clock = generator {
1219 if !options
1220 .iter()
1221 .any(|p| p.name == LoadGeneratorOptionName::AsOf)
1222 {
1223 let now = catalog.now();
1224 options.push(LoadGeneratorOption {
1225 name: LoadGeneratorOptionName::AsOf,
1226 value: Some(WithOptionValue::Value(Value::Number(now.to_string()))),
1227 });
1228 }
1229 }
1230 }
1231 }
1232
1233 *external_references = None;
1237
1238 let create_progress_subsource_stmt = if uses_old_syntax {
1240 let name = match progress_subsource {
1242 Some(name) => match name {
1243 DeferredItemName::Deferred(name) => name.clone(),
1244 DeferredItemName::Named(_) => unreachable!("already checked for this value"),
1245 },
1246 None => {
1247 let (item, prefix) = source_name.0.split_last().unwrap();
1248 let item_name =
1249 Ident::try_generate_name(item.to_string(), "_progress", |candidate| {
1250 let mut suggested_name = prefix.to_vec();
1251 suggested_name.push(candidate.clone());
1252
1253 let partial =
1254 normalize::unresolved_item_name(UnresolvedItemName(suggested_name))?;
1255 let qualified = scx.allocate_qualified_name(partial)?;
1256 let item_exists = scx.catalog.get_item_by_name(&qualified).is_some();
1257 let type_exists = scx.catalog.get_type_by_name(&qualified).is_some();
1258 Ok::<_, PlanError>(!item_exists && !type_exists)
1259 })?;
1260
1261 let mut full_name = prefix.to_vec();
1262 full_name.push(item_name);
1263 let full_name = normalize::unresolved_item_name(UnresolvedItemName(full_name))?;
1264 let qualified_name = scx.allocate_qualified_name(full_name)?;
1265 let full_name = scx.catalog.resolve_full_name(&qualified_name);
1266
1267 UnresolvedItemName::from(full_name.clone())
1268 }
1269 };
1270
1271 let (columns, constraints) = scx.relation_desc_into_table_defs(progress_desc)?;
1272
1273 let mut progress_with_options: Vec<_> = with_options
1275 .iter()
1276 .filter_map(|opt| match opt.name {
1277 CreateSourceOptionName::TimestampInterval => None,
1278 CreateSourceOptionName::RetainHistory => Some(CreateSubsourceOption {
1279 name: CreateSubsourceOptionName::RetainHistory,
1280 value: opt.value.clone(),
1281 }),
1282 })
1283 .collect();
1284 progress_with_options.push(CreateSubsourceOption {
1285 name: CreateSubsourceOptionName::Progress,
1286 value: Some(WithOptionValue::Value(Value::Boolean(true))),
1287 });
1288
1289 Some(CreateSubsourceStatement {
1290 name,
1291 columns,
1292 of_source: None,
1296 constraints,
1297 if_not_exists: false,
1298 with_options: progress_with_options,
1299 })
1300 } else {
1301 None
1302 };
1303
1304 purify_source_format(
1305 &catalog,
1306 format,
1307 &format_options,
1308 envelope,
1309 storage_configuration,
1310 )
1311 .await?;
1312
1313 Ok(PurifiedStatement::PurifiedCreateSource {
1314 create_progress_subsource_stmt,
1315 create_source_stmt,
1316 subsources: requested_subsource_map,
1317 available_source_references: retrieved_source_references.available_source_references(),
1318 })
1319}
1320
/// Purifies an `ALTER SOURCE` statement, performing any asynchronous work
/// (e.g. contacting the upstream system) required before planning.
///
/// - If the source cannot be resolved and `IF EXISTS` was given, the
///   statement is passed through unchanged for the planner to handle.
/// - `ADD SUBSOURCES` and `REFRESH REFERENCES` are delegated to dedicated
///   purification helpers; all other actions need no purification and are
///   passed through as-is.
async fn purify_alter_source(
    catalog: impl SessionCatalog,
    stmt: AlterSourceStatement<Aug>,
    storage_configuration: &StorageConfiguration,
) -> Result<PurifiedStatement, PlanError> {
    let scx = StatementContext::new(None, &catalog);
    let AlterSourceStatement {
        source_name: unresolved_source_name,
        action,
        if_exists,
    } = stmt;

    // Resolve the named item; with IF EXISTS, resolution failure means we
    // hand the statement back unmodified rather than erroring.
    let item = match scx.resolve_item(RawItemName::Name(unresolved_source_name.clone())) {
        Ok(item) => item,
        Err(_) if if_exists => {
            return Ok(PurifiedStatement::PurifiedAlterSource {
                alter_source_stmt: AlterSourceStatement {
                    source_name: unresolved_source_name,
                    action,
                    if_exists,
                },
            });
        }
        Err(e) => return Err(e),
    };

    // Only items that expose a source description can be altered here; the
    // connection is inlined so downstream helpers can contact the upstream.
    let desc = match item.source_desc()? {
        Some(desc) => desc.clone().into_inline_connection(scx.catalog),
        None => {
            sql_bail!("cannot ALTER this type of source")
        }
    };

    let source_name = item.name();

    let resolved_source_name = ResolvedItemName::Item {
        id: item.id(),
        qualifiers: item.name().qualifiers.clone(),
        full_name: scx.catalog.resolve_full_name(source_name),
        print_id: true,
        version: RelationVersionSelector::Latest,
    };

    // Shortest unambiguous name, used for error messages.
    let partial_name = scx.catalog.minimal_qualification(source_name);

    match action {
        AlterSourceAction::AddSubsources {
            external_references,
            options,
        } => {
            // When the new source-table syntax is forced on, adding
            // subsources is disallowed in favor of CREATE TABLE .. FROM SOURCE.
            if scx.catalog.system_vars().enable_create_table_from_source()
                && scx.catalog.system_vars().force_source_table_syntax()
            {
                Err(PlanError::UseTablesForSources(
                    "ALTER SOURCE .. ADD SUBSOURCES ..".to_string(),
                ))?;
            }

            purify_alter_source_add_subsources(
                external_references,
                options,
                desc,
                partial_name,
                unresolved_source_name,
                resolved_source_name,
                storage_configuration,
            )
            .await
        }
        AlterSourceAction::RefreshReferences => {
            purify_alter_source_refresh_references(
                desc,
                resolved_source_name,
                storage_configuration,
            )
            .await
        }
        // Remaining actions require no asynchronous purification.
        _ => Ok(PurifiedStatement::PurifiedAlterSource {
            alter_source_stmt: AlterSourceStatement {
                source_name: unresolved_source_name,
                action,
                if_exists,
            },
        }),
    }
}
1411
/// Purifies `ALTER SOURCE .. ADD SUBSOURCES`, contacting the upstream system
/// to validate the requested references and produce per-subsource details.
///
/// On success, returns `PurifiedStatement::PurifiedAlterSourceAddSubsources`
/// carrying the purified subsource map, with the statement's `TEXT COLUMNS`
/// and `EXCLUDE COLUMNS` options rewritten to their normalized forms.
async fn purify_alter_source_add_subsources(
    external_references: Vec<ExternalReferenceExport>,
    mut options: Vec<AlterSourceAddSubsourceOption<Aug>>,
    desc: SourceDesc,
    partial_source_name: PartialItemName,
    unresolved_source_name: UnresolvedItemName,
    resolved_source_name: ResolvedItemName,
    storage_configuration: &StorageConfiguration,
) -> Result<PurifiedStatement, PlanError> {
    // Only Postgres, MySQL, and SQL Server sources support ADD SUBSOURCES.
    let connection_id = match &desc.connection {
        GenericSourceConnection::Postgres(c) => c.connection_id,
        GenericSourceConnection::MySql(c) => c.connection_id,
        GenericSourceConnection::SqlServer(c) => c.connection_id,
        _ => sql_bail!(
            "source {} does not support ALTER SOURCE.",
            partial_source_name
        ),
    };

    // Extract TEXT COLUMNS / EXCLUDE COLUMNS from the statement options;
    // DETAILS is purification output and must never be user-supplied.
    let crate::plan::statement::ddl::AlterSourceAddSubsourceOptionExtracted {
        text_columns,
        exclude_columns,
        details,
        seen: _,
    } = options.clone().try_into()?;
    assert_none!(details, "details cannot be explicitly set");

    let mut requested_subsource_map = BTreeMap::new();

    match desc.connection {
        GenericSourceConnection::Postgres(pg_source_connection) => {
            let pg_connection = &pg_source_connection.connection;

            // Validates connectivity and returns a live client.
            let client = pg_connection
                .validate(connection_id, storage_configuration)
                .await?;

            let reference_client = SourceReferenceClient::Postgres {
                client: &client,
                publication: &pg_source_connection.publication,
                database: &pg_connection.database,
            };
            let retrieved_source_references = reference_client.get_source_references().await?;

            let postgres::PurifiedSourceExports {
                source_exports: subsources,
                normalized_text_columns,
            } = postgres::purify_source_exports(
                &client,
                &retrieved_source_references,
                &Some(ExternalReferences::SubsetTables(external_references)),
                text_columns,
                exclude_columns,
                &unresolved_source_name,
                &SourceReferencePolicy::Required,
            )
            .await?;

            // Rewrite TEXT COLUMNS in place with the normalized names.
            if let Some(text_cols_option) = options
                .iter_mut()
                .find(|option| option.name == AlterSourceAddSubsourceOptionName::TextColumns)
            {
                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
            }

            requested_subsource_map.extend(subsources);
        }
        GenericSourceConnection::MySql(mysql_source_connection) => {
            let mysql_connection = &mysql_source_connection.connection;
            let config = mysql_connection
                .config(
                    &storage_configuration.connection_context.secrets_reader,
                    storage_configuration,
                    InTask::No,
                )
                .await?;

            let mut conn = config
                .connect(
                    "mysql purification",
                    &storage_configuration.connection_context.ssh_tunnel_manager,
                )
                .await?;

            // Snapshot the server's executed GTID set; used by export
            // purification below.
            let initial_gtid_set =
                mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?;

            let requested_references = Some(ExternalReferences::SubsetTables(external_references));

            let reference_client = SourceReferenceClient::MySql {
                conn: &mut conn,
                include_system_schemas: mysql::references_system_schemas(&requested_references),
            };
            let retrieved_source_references = reference_client.get_source_references().await?;

            let mysql::PurifiedSourceExports {
                source_exports: subsources,
                normalized_text_columns,
                normalized_exclude_columns,
            } = mysql::purify_source_exports(
                &mut conn,
                &retrieved_source_references,
                &requested_references,
                text_columns,
                exclude_columns,
                &unresolved_source_name,
                initial_gtid_set,
                &SourceReferencePolicy::Required,
            )
            .await?;
            requested_subsource_map.extend(subsources);

            // Rewrite TEXT/EXCLUDE COLUMNS in place with normalized names.
            if let Some(text_cols_option) = options
                .iter_mut()
                .find(|option| option.name == AlterSourceAddSubsourceOptionName::TextColumns)
            {
                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
            }
            if let Some(exclude_cols_option) = options
                .iter_mut()
                .find(|option| option.name == AlterSourceAddSubsourceOptionName::ExcludeColumns)
            {
                exclude_cols_option.value =
                    Some(WithOptionValue::Sequence(normalized_exclude_columns));
            }
        }
        GenericSourceConnection::SqlServer(sql_server_source) => {
            let sql_server_connection = &sql_server_source.connection;
            let config = sql_server_connection
                .resolve_config(
                    &storage_configuration.connection_context.secrets_reader,
                    storage_configuration,
                    InTask::No,
                )
                .await?;
            let mut client = mz_sql_server_util::Client::connect(config).await?;

            let database = sql_server_connection.database.clone().into();
            let source_references = SourceReferenceClient::SqlServer {
                client: &mut client,
                database: Arc::clone(&database),
            }
            .get_source_references()
            .await?;
            let requested_references = Some(ExternalReferences::SubsetTables(external_references));

            // Upper bound on how long to wait for the upstream LSN.
            let timeout = mz_storage_types::sources::sql_server::MAX_LSN_WAIT
                .get(storage_configuration.config_set());

            let result = sql_server::purify_source_exports(
                &*database,
                &mut client,
                &source_references,
                &requested_references,
                &text_columns,
                &exclude_columns,
                &unresolved_source_name,
                timeout,
                &SourceReferencePolicy::Required,
            )
            .await;
            let sql_server::PurifiedSourceExports {
                source_exports,
                normalized_text_columns,
                normalized_excl_columns,
            } = result?;

            requested_subsource_map.extend(source_exports);

            // Rewrite TEXT/EXCLUDE COLUMNS in place with normalized names.
            if let Some(text_cols_option) = options
                .iter_mut()
                .find(|option| option.name == AlterSourceAddSubsourceOptionName::TextColumns)
            {
                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
            }
            if let Some(exclude_cols_option) = options
                .iter_mut()
                .find(|option| option.name == AlterSourceAddSubsourceOptionName::ExcludeColumns)
            {
                exclude_cols_option.value =
                    Some(WithOptionValue::Sequence(normalized_excl_columns));
            }
        }
        // Unreachable: the connection_id match above already rejected every
        // other connection type.
        _ => unreachable!(),
    };

    Ok(PurifiedStatement::PurifiedAlterSourceAddSubsources {
        source_name: resolved_source_name,
        options,
        subsources: requested_subsource_map,
    })
}
1615
/// Purifies `ALTER SOURCE .. REFRESH REFERENCES` by contacting the upstream
/// system for the source's connection type and fetching the current set of
/// available external references (tables/topics/outputs).
async fn purify_alter_source_refresh_references(
    desc: SourceDesc,
    resolved_source_name: ResolvedItemName,
    storage_configuration: &StorageConfiguration,
) -> Result<PurifiedStatement, PlanError> {
    let retrieved_source_references = match desc.connection {
        GenericSourceConnection::Postgres(pg_source_connection) => {
            let pg_connection = &pg_source_connection.connection;

            let config = pg_connection
                .config(
                    &storage_configuration.connection_context.secrets_reader,
                    storage_configuration,
                    InTask::No,
                )
                .await?;

            let client = config
                .connect(
                    "postgres_purification",
                    &storage_configuration.connection_context.ssh_tunnel_manager,
                )
                .await?;
            let reference_client = SourceReferenceClient::Postgres {
                client: &client,
                publication: &pg_source_connection.publication,
                database: &pg_connection.database,
            };
            reference_client.get_source_references().await?
        }
        GenericSourceConnection::MySql(mysql_source_connection) => {
            let mysql_connection = &mysql_source_connection.connection;
            let config = mysql_connection
                .config(
                    &storage_configuration.connection_context.secrets_reader,
                    storage_configuration,
                    InTask::No,
                )
                .await?;

            let mut conn = config
                .connect(
                    "mysql purification",
                    &storage_configuration.connection_context.ssh_tunnel_manager,
                )
                .await?;

            // System schemas are never listed when refreshing references.
            let reference_client = SourceReferenceClient::MySql {
                conn: &mut conn,
                include_system_schemas: false,
            };
            reference_client.get_source_references().await?
        }
        GenericSourceConnection::SqlServer(sql_server_source) => {
            let sql_server_connection = &sql_server_source.connection;
            let config = sql_server_connection
                .resolve_config(
                    &storage_configuration.connection_context.secrets_reader,
                    storage_configuration,
                    InTask::No,
                )
                .await?;
            let mut client = mz_sql_server_util::Client::connect(config).await?;

            let source_references = SourceReferenceClient::SqlServer {
                client: &mut client,
                database: sql_server_connection.database.clone().into(),
            }
            .get_source_references()
            .await?;
            source_references
        }
        GenericSourceConnection::LoadGenerator(load_gen_connection) => {
            // Load generators enumerate their references locally; no
            // network access required.
            let reference_client = SourceReferenceClient::LoadGenerator {
                generator: &load_gen_connection.load_generator,
            };
            reference_client.get_source_references().await?
        }
        GenericSourceConnection::Kafka(kafka_conn) => {
            // A Kafka source's only reference is its topic.
            let reference_client = SourceReferenceClient::Kafka {
                topic: &kafka_conn.topic,
            };
            reference_client.get_source_references().await?
        }
    };
    Ok(PurifiedStatement::PurifiedAlterSourceRefreshReferences {
        source_name: resolved_source_name,
        available_source_references: retrieved_source_references.available_source_references(),
    })
}
1709
1710async fn purify_create_table_from_source(
1711 catalog: impl SessionCatalog,
1712 mut stmt: CreateTableFromSourceStatement<Aug>,
1713 storage_configuration: &StorageConfiguration,
1714) -> Result<PurifiedStatement, PlanError> {
1715 let scx = StatementContext::new(None, &catalog);
1716 let CreateTableFromSourceStatement {
1717 name: _,
1718 columns,
1719 constraints,
1720 source: source_name,
1721 if_not_exists: _,
1722 external_reference,
1723 format,
1724 envelope,
1725 include_metadata: _,
1726 with_options,
1727 } = &mut stmt;
1728
1729 if matches!(columns, TableFromSourceColumns::Defined(_)) {
1731 sql_bail!("CREATE TABLE .. FROM SOURCE column definitions cannot be specified directly");
1732 }
1733 if !constraints.is_empty() {
1734 sql_bail!(
1735 "CREATE TABLE .. FROM SOURCE constraint definitions cannot be specified directly"
1736 );
1737 }
1738
1739 let item = match scx.get_item_by_resolved_name(source_name) {
1741 Ok(item) => item,
1742 Err(e) => return Err(e),
1743 };
1744
1745 let desc = match item.source_desc()? {
1747 Some(desc) => desc.clone().into_inline_connection(scx.catalog),
1748 None => {
1749 sql_bail!("cannot ALTER this type of source")
1750 }
1751 };
1752 let unresolved_source_name: UnresolvedItemName = source_name.full_item_name().clone().into();
1753
1754 let crate::plan::statement::ddl::TableFromSourceOptionExtracted {
1755 text_columns,
1756 exclude_columns,
1757 retain_history: _,
1758 details,
1759 partition_by: _,
1760 seen: _,
1761 } = with_options.clone().try_into()?;
1762 assert_none!(details, "details cannot be explicitly set");
1763
1764 let qualified_text_columns = text_columns
1768 .iter()
1769 .map(|col| {
1770 UnresolvedItemName(
1771 external_reference
1772 .as_ref()
1773 .map(|er| er.0.iter().chain_one(col).map(|i| i.clone()).collect())
1774 .unwrap_or_else(|| vec![col.clone()]),
1775 )
1776 })
1777 .collect_vec();
1778 let qualified_exclude_columns = exclude_columns
1779 .iter()
1780 .map(|col| {
1781 UnresolvedItemName(
1782 external_reference
1783 .as_ref()
1784 .map(|er| er.0.iter().chain_one(col).map(|i| i.clone()).collect())
1785 .unwrap_or_else(|| vec![col.clone()]),
1786 )
1787 })
1788 .collect_vec();
1789
1790 let mut format_options = SourceFormatOptions::Default;
1792
1793 let retrieved_source_references: RetrievedSourceReferences;
1794
1795 let requested_references = external_reference.as_ref().map(|ref_name| {
1796 ExternalReferences::SubsetTables(vec![ExternalReferenceExport {
1797 reference: ref_name.clone(),
1798 alias: None,
1799 }])
1800 });
1801
1802 let purified_export = match desc.connection {
1805 GenericSourceConnection::Postgres(pg_source_connection) => {
1806 let pg_connection = &pg_source_connection.connection;
1808
1809 let client = pg_connection
1810 .validate(pg_source_connection.connection_id, storage_configuration)
1811 .await?;
1812
1813 let reference_client = SourceReferenceClient::Postgres {
1814 client: &client,
1815 publication: &pg_source_connection.publication,
1816 database: &pg_connection.database,
1817 };
1818 retrieved_source_references = reference_client.get_source_references().await?;
1819
1820 let postgres::PurifiedSourceExports {
1821 source_exports,
1822 normalized_text_columns: _,
1826 } = postgres::purify_source_exports(
1827 &client,
1828 &retrieved_source_references,
1829 &requested_references,
1830 qualified_text_columns,
1831 qualified_exclude_columns,
1832 &unresolved_source_name,
1833 &SourceReferencePolicy::Required,
1834 )
1835 .await?;
1836 let (_, purified_export) = source_exports.into_element();
1838 purified_export
1839 }
1840 GenericSourceConnection::MySql(mysql_source_connection) => {
1841 let mysql_connection = &mysql_source_connection.connection;
1842 let config = mysql_connection
1843 .config(
1844 &storage_configuration.connection_context.secrets_reader,
1845 storage_configuration,
1846 InTask::No,
1847 )
1848 .await?;
1849
1850 let mut conn = config
1851 .connect(
1852 "mysql purification",
1853 &storage_configuration.connection_context.ssh_tunnel_manager,
1854 )
1855 .await?;
1856
1857 let initial_gtid_set =
1860 mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?;
1861
1862 let reference_client = SourceReferenceClient::MySql {
1863 conn: &mut conn,
1864 include_system_schemas: mysql::references_system_schemas(&requested_references),
1865 };
1866 retrieved_source_references = reference_client.get_source_references().await?;
1867
1868 let mysql::PurifiedSourceExports {
1869 source_exports,
1870 normalized_text_columns: _,
1874 normalized_exclude_columns: _,
1875 } = mysql::purify_source_exports(
1876 &mut conn,
1877 &retrieved_source_references,
1878 &requested_references,
1879 qualified_text_columns,
1880 qualified_exclude_columns,
1881 &unresolved_source_name,
1882 initial_gtid_set,
1883 &SourceReferencePolicy::Required,
1884 )
1885 .await?;
1886 let (_, purified_export) = source_exports.into_element();
1888 purified_export
1889 }
1890 GenericSourceConnection::SqlServer(sql_server_source) => {
1891 let connection = sql_server_source.connection;
1892 let config = connection
1893 .resolve_config(
1894 &storage_configuration.connection_context.secrets_reader,
1895 storage_configuration,
1896 InTask::No,
1897 )
1898 .await?;
1899 let mut client = mz_sql_server_util::Client::connect(config).await?;
1900
1901 let database: Arc<str> = connection.database.into();
1902 let reference_client = SourceReferenceClient::SqlServer {
1903 client: &mut client,
1904 database: Arc::clone(&database),
1905 };
1906 retrieved_source_references = reference_client.get_source_references().await?;
1907 tracing::debug!(?retrieved_source_references, "got source references");
1908
1909 let timeout = mz_storage_types::sources::sql_server::MAX_LSN_WAIT
1910 .get(storage_configuration.config_set());
1911
1912 let purified_source_exports = sql_server::purify_source_exports(
1913 &*database,
1914 &mut client,
1915 &retrieved_source_references,
1916 &requested_references,
1917 &qualified_text_columns,
1918 &qualified_exclude_columns,
1919 &unresolved_source_name,
1920 timeout,
1921 &SourceReferencePolicy::Required,
1922 )
1923 .await?;
1924
1925 let (_, purified_export) = purified_source_exports.source_exports.into_element();
1927 purified_export
1928 }
1929 GenericSourceConnection::LoadGenerator(load_gen_connection) => {
1930 let reference_client = SourceReferenceClient::LoadGenerator {
1931 generator: &load_gen_connection.load_generator,
1932 };
1933 retrieved_source_references = reference_client.get_source_references().await?;
1934
1935 let requested_exports = retrieved_source_references
1936 .requested_source_exports(requested_references.as_ref(), &unresolved_source_name)?;
1937 let export = requested_exports.into_element();
1939 PurifiedSourceExport {
1940 external_reference: export.external_reference,
1941 details: PurifiedExportDetails::LoadGenerator {
1942 table: export
1943 .meta
1944 .load_generator_desc()
1945 .expect("is loadgen")
1946 .clone(),
1947 output: export
1948 .meta
1949 .load_generator_output()
1950 .expect("is loadgen")
1951 .clone(),
1952 },
1953 }
1954 }
1955 GenericSourceConnection::Kafka(kafka_conn) => {
1956 let reference_client = SourceReferenceClient::Kafka {
1957 topic: &kafka_conn.topic,
1958 };
1959 retrieved_source_references = reference_client.get_source_references().await?;
1960 let requested_exports = retrieved_source_references
1961 .requested_source_exports(requested_references.as_ref(), &unresolved_source_name)?;
1962 let export = requested_exports.into_element();
1964
1965 format_options = SourceFormatOptions::Kafka {
1966 topic: kafka_conn.topic.clone(),
1967 };
1968 PurifiedSourceExport {
1969 external_reference: export.external_reference,
1970 details: PurifiedExportDetails::Kafka {},
1971 }
1972 }
1973 };
1974
1975 purify_source_format(
1976 &catalog,
1977 format,
1978 &format_options,
1979 envelope,
1980 storage_configuration,
1981 )
1982 .await?;
1983
1984 *external_reference = Some(purified_export.external_reference.clone());
1987
1988 match &purified_export.details {
1990 PurifiedExportDetails::Postgres { .. } => {
1991 let mut unsupported_cols = vec![];
1992 let postgres::PostgresExportStatementValues {
1993 columns: gen_columns,
1994 constraints: gen_constraints,
1995 text_columns: gen_text_columns,
1996 exclude_columns: gen_exclude_columns,
1997 details: gen_details,
1998 external_reference: _,
1999 } = postgres::generate_source_export_statement_values(
2000 &scx,
2001 purified_export,
2002 &mut unsupported_cols,
2003 )?;
2004 if !unsupported_cols.is_empty() {
2005 unsupported_cols.sort();
2006 Err(PgSourcePurificationError::UnrecognizedTypes {
2007 cols: unsupported_cols,
2008 })?;
2009 }
2010
2011 if let Some(text_cols_option) = with_options
2012 .iter_mut()
2013 .find(|option| option.name == TableFromSourceOptionName::TextColumns)
2014 {
2015 match gen_text_columns {
2016 Some(gen_text_columns) => {
2017 text_cols_option.value = Some(WithOptionValue::Sequence(gen_text_columns))
2018 }
2019 None => soft_panic_or_log!(
2020 "text_columns should be Some if text_cols_option is present"
2021 ),
2022 }
2023 }
2024 if let Some(exclude_cols_option) = with_options
2025 .iter_mut()
2026 .find(|option| option.name == TableFromSourceOptionName::ExcludeColumns)
2027 {
2028 match gen_exclude_columns {
2029 Some(gen_exclude_columns) => {
2030 exclude_cols_option.value =
2031 Some(WithOptionValue::Sequence(gen_exclude_columns))
2032 }
2033 None => soft_panic_or_log!(
2034 "exclude_columns should be Some if exclude_cols_option is present"
2035 ),
2036 }
2037 }
2038 match columns {
2039 TableFromSourceColumns::Defined(_) => unreachable!(),
2040 TableFromSourceColumns::NotSpecified => {
2041 *columns = TableFromSourceColumns::Defined(gen_columns);
2042 *constraints = gen_constraints;
2043 }
2044 TableFromSourceColumns::Named(_) => {
2045 sql_bail!("columns cannot be named for Postgres sources")
2046 }
2047 }
2048 with_options.push(TableFromSourceOption {
2049 name: TableFromSourceOptionName::Details,
2050 value: Some(WithOptionValue::Value(Value::String(hex::encode(
2051 gen_details.into_proto().encode_to_vec(),
2052 )))),
2053 })
2054 }
2055 PurifiedExportDetails::MySql { .. } => {
2056 let mysql::MySqlExportStatementValues {
2057 columns: gen_columns,
2058 constraints: gen_constraints,
2059 text_columns: gen_text_columns,
2060 exclude_columns: gen_exclude_columns,
2061 details: gen_details,
2062 external_reference: _,
2063 } = mysql::generate_source_export_statement_values(&scx, purified_export)?;
2064
2065 if let Some(text_cols_option) = with_options
2066 .iter_mut()
2067 .find(|option| option.name == TableFromSourceOptionName::TextColumns)
2068 {
2069 match gen_text_columns {
2070 Some(gen_text_columns) => {
2071 text_cols_option.value = Some(WithOptionValue::Sequence(gen_text_columns))
2072 }
2073 None => soft_panic_or_log!(
2074 "text_columns should be Some if text_cols_option is present"
2075 ),
2076 }
2077 }
2078 if let Some(exclude_cols_option) = with_options
2079 .iter_mut()
2080 .find(|option| option.name == TableFromSourceOptionName::ExcludeColumns)
2081 {
2082 match gen_exclude_columns {
2083 Some(gen_exclude_columns) => {
2084 exclude_cols_option.value =
2085 Some(WithOptionValue::Sequence(gen_exclude_columns))
2086 }
2087 None => soft_panic_or_log!(
2088 "exclude_columns should be Some if exclude_cols_option is present"
2089 ),
2090 }
2091 }
2092 match columns {
2093 TableFromSourceColumns::Defined(_) => unreachable!(),
2094 TableFromSourceColumns::NotSpecified => {
2095 *columns = TableFromSourceColumns::Defined(gen_columns);
2096 *constraints = gen_constraints;
2097 }
2098 TableFromSourceColumns::Named(_) => {
2099 sql_bail!("columns cannot be named for MySQL sources")
2100 }
2101 }
2102 with_options.push(TableFromSourceOption {
2103 name: TableFromSourceOptionName::Details,
2104 value: Some(WithOptionValue::Value(Value::String(hex::encode(
2105 gen_details.into_proto().encode_to_vec(),
2106 )))),
2107 })
2108 }
2109 PurifiedExportDetails::SqlServer { .. } => {
2110 let sql_server::SqlServerExportStatementValues {
2111 columns: gen_columns,
2112 constraints: gen_constraints,
2113 text_columns: gen_text_columns,
2114 excl_columns: gen_excl_columns,
2115 details: gen_details,
2116 external_reference: _,
2117 } = sql_server::generate_source_export_statement_values(&scx, purified_export)?;
2118
2119 if let Some(text_cols_option) = with_options
2120 .iter_mut()
2121 .find(|opt| opt.name == TableFromSourceOptionName::TextColumns)
2122 {
2123 match gen_text_columns {
2124 Some(gen_text_columns) => {
2125 text_cols_option.value = Some(WithOptionValue::Sequence(gen_text_columns))
2126 }
2127 None => soft_panic_or_log!(
2128 "text_columns should be Some if text_cols_option is present"
2129 ),
2130 }
2131 }
2132 if let Some(exclude_cols_option) = with_options
2133 .iter_mut()
2134 .find(|opt| opt.name == TableFromSourceOptionName::ExcludeColumns)
2135 {
2136 match gen_excl_columns {
2137 Some(gen_excl_columns) => {
2138 exclude_cols_option.value =
2139 Some(WithOptionValue::Sequence(gen_excl_columns))
2140 }
2141 None => soft_panic_or_log!(
2142 "excl_columns should be Some if excl_cols_option is present"
2143 ),
2144 }
2145 }
2146
2147 match columns {
2148 TableFromSourceColumns::NotSpecified => {
2149 *columns = TableFromSourceColumns::Defined(gen_columns);
2150 *constraints = gen_constraints;
2151 }
2152 TableFromSourceColumns::Named(_) => {
2153 sql_bail!("columns cannot be named for SQL Server sources")
2154 }
2155 TableFromSourceColumns::Defined(_) => unreachable!(),
2156 }
2157
2158 with_options.push(TableFromSourceOption {
2159 name: TableFromSourceOptionName::Details,
2160 value: Some(WithOptionValue::Value(Value::String(hex::encode(
2161 gen_details.into_proto().encode_to_vec(),
2162 )))),
2163 })
2164 }
2165 PurifiedExportDetails::LoadGenerator { .. } => {
2166 let (desc, output) = match purified_export.details {
2167 PurifiedExportDetails::LoadGenerator { table, output } => (table, output),
2168 _ => unreachable!("purified export details must be load generator"),
2169 };
2170 if let Some(desc) = desc {
2175 let (gen_columns, gen_constraints) = scx.relation_desc_into_table_defs(&desc)?;
2176 match columns {
2177 TableFromSourceColumns::Defined(_) => unreachable!(),
2178 TableFromSourceColumns::NotSpecified => {
2179 *columns = TableFromSourceColumns::Defined(gen_columns);
2180 *constraints = gen_constraints;
2181 }
2182 TableFromSourceColumns::Named(_) => {
2183 sql_bail!("columns cannot be named for multi-output load generator sources")
2184 }
2185 }
2186 }
2187 let details = SourceExportStatementDetails::LoadGenerator { output };
2188 with_options.push(TableFromSourceOption {
2189 name: TableFromSourceOptionName::Details,
2190 value: Some(WithOptionValue::Value(Value::String(hex::encode(
2191 details.into_proto().encode_to_vec(),
2192 )))),
2193 })
2194 }
2195 PurifiedExportDetails::Kafka {} => {
2196 let details = SourceExportStatementDetails::Kafka {};
2200 with_options.push(TableFromSourceOption {
2201 name: TableFromSourceOptionName::Details,
2202 value: Some(WithOptionValue::Value(Value::String(hex::encode(
2203 details.into_proto().encode_to_vec(),
2204 )))),
2205 })
2206 }
2207 };
2208
2209 Ok(PurifiedStatement::PurifiedCreateTableFromSource { stmt })
2213}
2214
/// Per-source-type context needed when purifying a format specifier.
enum SourceFormatOptions {
    // The source type provides no extra format context.
    Default,
    // Kafka sources carry their topic, used by format purification (only
    // Kafka sources may use KEY/VALUE format specifiers).
    Kafka { topic: String },
}
2219
2220async fn purify_source_format(
2221 catalog: &dyn SessionCatalog,
2222 format: &mut Option<FormatSpecifier<Aug>>,
2223 options: &SourceFormatOptions,
2224 envelope: &Option<SourceEnvelope>,
2225 storage_configuration: &StorageConfiguration,
2226) -> Result<(), PlanError> {
2227 if matches!(format, Some(FormatSpecifier::KeyValue { .. }))
2228 && !matches!(options, SourceFormatOptions::Kafka { .. })
2229 {
2230 sql_bail!("Kafka sources are the only source type that can provide KEY/VALUE formats")
2231 }
2232
2233 match format.as_mut() {
2234 None => {}
2235 Some(FormatSpecifier::Bare(format)) => {
2236 purify_source_format_single(catalog, format, options, envelope, storage_configuration)
2237 .await?;
2238 }
2239
2240 Some(FormatSpecifier::KeyValue { key, value: val }) => {
2241 purify_source_format_single(catalog, key, options, envelope, storage_configuration)
2242 .await?;
2243 purify_source_format_single(catalog, val, options, envelope, storage_configuration)
2244 .await?;
2245 }
2246 }
2247 Ok(())
2248}
2249
2250async fn purify_source_format_single(
2251 catalog: &dyn SessionCatalog,
2252 format: &mut Format<Aug>,
2253 options: &SourceFormatOptions,
2254 envelope: &Option<SourceEnvelope>,
2255 storage_configuration: &StorageConfiguration,
2256) -> Result<(), PlanError> {
2257 match format {
2258 Format::Avro(schema) => match schema {
2259 AvroSchema::Csr { csr_connection } => {
2260 purify_csr_connection_avro(
2261 catalog,
2262 options,
2263 csr_connection,
2264 envelope,
2265 storage_configuration,
2266 )
2267 .await?
2268 }
2269 AvroSchema::InlineSchema { .. } => {}
2270 },
2271 Format::Protobuf(schema) => match schema {
2272 ProtobufSchema::Csr { csr_connection } => {
2273 purify_csr_connection_proto(
2274 catalog,
2275 options,
2276 csr_connection,
2277 envelope,
2278 storage_configuration,
2279 )
2280 .await?;
2281 }
2282 ProtobufSchema::InlineSchema { .. } => {}
2283 },
2284 Format::Bytes
2285 | Format::Regex(_)
2286 | Format::Json { .. }
2287 | Format::Text
2288 | Format::Csv { .. } => (),
2289 }
2290 Ok(())
2291}
2292
/// Generates `CREATE SUBSOURCE` statements for each purified source export.
///
/// Dispatches on the details of an arbitrary export (all exports of one
/// source share the same variant): Postgres, MySQL, and SQL Server exports
/// are delegated to their respective modules, while multi-output load
/// generator exports are materialized inline here.
pub fn generate_subsource_statements(
    scx: &StatementContext,
    source_name: ResolvedItemName,
    subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
) -> Result<Vec<CreateSubsourceStatement<Aug>>, PlanError> {
    // Nothing to generate when there are no exports.
    if subsources.is_empty() {
        return Ok(vec![]);
    }
    // Peek at one export to learn which source type we're dealing with.
    let (_, purified_export) = subsources.iter().next().unwrap();

    let statements = match &purified_export.details {
        PurifiedExportDetails::Postgres { .. } => {
            crate::pure::postgres::generate_create_subsource_statements(
                scx,
                source_name,
                subsources,
            )?
        }
        PurifiedExportDetails::MySql { .. } => {
            crate::pure::mysql::generate_create_subsource_statements(scx, source_name, subsources)?
        }
        PurifiedExportDetails::SqlServer { .. } => {
            crate::pure::sql_server::generate_create_subsource_statements(
                scx,
                source_name,
                subsources,
            )?
        }
        PurifiedExportDetails::LoadGenerator { .. } => {
            let mut subsource_stmts = Vec::with_capacity(subsources.len());
            for (subsource_name, purified_export) in subsources {
                let (desc, output) = match purified_export.details {
                    PurifiedExportDetails::LoadGenerator { table, output } => (table, output),
                    _ => unreachable!("purified export details must be load generator"),
                };
                // Single-output load generators carry no per-export desc;
                // they must not reach subsource generation.
                let desc =
                    desc.expect("subsources cannot be generated for single-output load generators");

                // Convert the relation description into column definitions
                // and table constraints for the subsource statement.
                let (columns, table_constraints) = scx.relation_desc_into_table_defs(&desc)?;
                let details = SourceExportStatementDetails::LoadGenerator { output };
                let subsource = CreateSubsourceStatement {
                    name: subsource_name,
                    columns,
                    of_source: Some(source_name.clone()),
                    constraints: table_constraints,
                    if_not_exists: false,
                    with_options: vec![
                        CreateSubsourceOption {
                            name: CreateSubsourceOptionName::ExternalReference,
                            value: Some(WithOptionValue::UnresolvedItemName(
                                purified_export.external_reference,
                            )),
                        },
                        // Details are proto-encoded and hex-wrapped so they
                        // round-trip through SQL as an opaque string.
                        CreateSubsourceOption {
                            name: CreateSubsourceOptionName::Details,
                            value: Some(WithOptionValue::Value(Value::String(hex::encode(
                                details.into_proto().encode_to_vec(),
                            )))),
                        },
                    ],
                };
                subsource_stmts.push(subsource);
            }

            subsource_stmts
        }
        PurifiedExportDetails::Kafka { .. } => {
            // NOTE(review): this branch is only reachable when `subsources`
            // is non-empty (the empty case returned above), so this assert
            // appears to fire unconditionally when reached — it acts as a
            // loud guard that Kafka exports are never routed here. Confirm
            // intent before relying on the `vec![]` below.
            assert!(
                subsources.is_empty(),
                "Kafka sources do not produce data-bearing subsources"
            );
            vec![]
        }
    };
    Ok(statements)
}
2378
2379async fn purify_csr_connection_proto(
2380 catalog: &dyn SessionCatalog,
2381 options: &SourceFormatOptions,
2382 csr_connection: &mut CsrConnectionProtobuf<Aug>,
2383 envelope: &Option<SourceEnvelope>,
2384 storage_configuration: &StorageConfiguration,
2385) -> Result<(), PlanError> {
2386 let SourceFormatOptions::Kafka { topic } = options else {
2387 sql_bail!("Confluent Schema Registry is only supported with Kafka sources")
2388 };
2389
2390 let CsrConnectionProtobuf {
2391 seed,
2392 connection: CsrConnection {
2393 connection,
2394 options: _,
2395 },
2396 } = csr_connection;
2397 match seed {
2398 None => {
2399 let scx = StatementContext::new(None, &*catalog);
2400
2401 let ccsr_connection = match scx.get_item_by_resolved_name(connection)?.connection()? {
2402 Connection::Csr(connection) => connection.clone().into_inline_connection(catalog),
2403 _ => sql_bail!("{} is not a schema registry connection", connection),
2404 };
2405
2406 let ccsr_client = ccsr_connection
2407 .connect(storage_configuration, InTask::No)
2408 .await
2409 .map_err(|e| CsrPurificationError::ClientError(Arc::new(e)))?;
2410
2411 let value = compile_proto(&format!("{}-value", topic), &ccsr_client).await?;
2412 let key = compile_proto(&format!("{}-key", topic), &ccsr_client)
2413 .await
2414 .ok();
2415
2416 if matches!(envelope, Some(SourceEnvelope::Debezium)) && key.is_none() {
2417 sql_bail!("Key schema is required for ENVELOPE DEBEZIUM");
2418 }
2419
2420 *seed = Some(CsrSeedProtobuf { value, key });
2421 }
2422 Some(_) => (),
2423 }
2424
2425 Ok(())
2426}
2427
2428async fn purify_csr_connection_avro(
2429 catalog: &dyn SessionCatalog,
2430 options: &SourceFormatOptions,
2431 csr_connection: &mut CsrConnectionAvro<Aug>,
2432 envelope: &Option<SourceEnvelope>,
2433 storage_configuration: &StorageConfiguration,
2434) -> Result<(), PlanError> {
2435 let SourceFormatOptions::Kafka { topic } = options else {
2436 sql_bail!("Confluent Schema Registry is only supported with Kafka sources")
2437 };
2438
2439 let CsrConnectionAvro {
2440 connection: CsrConnection { connection, .. },
2441 seed,
2442 key_strategy,
2443 value_strategy,
2444 } = csr_connection;
2445 if seed.is_none() {
2446 let scx = StatementContext::new(None, &*catalog);
2447 let csr_connection = match scx.get_item_by_resolved_name(connection)?.connection()? {
2448 Connection::Csr(connection) => connection.clone().into_inline_connection(catalog),
2449 _ => sql_bail!("{} is not a schema registry connection", connection),
2450 };
2451 let ccsr_client = csr_connection
2452 .connect(storage_configuration, InTask::No)
2453 .await
2454 .map_err(|e| CsrPurificationError::ClientError(Arc::new(e)))?;
2455
2456 let Schema {
2457 key_schema,
2458 value_schema,
2459 } = get_remote_csr_schema(
2460 &ccsr_client,
2461 key_strategy.clone().unwrap_or_default(),
2462 value_strategy.clone().unwrap_or_default(),
2463 topic,
2464 )
2465 .await?;
2466 if matches!(envelope, Some(SourceEnvelope::Debezium)) && key_schema.is_none() {
2467 sql_bail!("Key schema is required for ENVELOPE DEBEZIUM");
2468 }
2469
2470 *seed = Some(CsrSeedAvro {
2471 key_schema,
2472 value_schema,
2473 })
2474 }
2475
2476 Ok(())
2477}
2478
/// Key and value schemas fetched from a Confluent Schema Registry for a
/// Kafka topic.
#[derive(Debug)]
pub struct Schema {
    /// Schema for the `<topic>-key` subject, if one exists.
    pub key_schema: Option<String>,
    /// Schema for the `<topic>-value` subject; always required.
    pub value_schema: String,
}
2484
2485async fn get_schema_with_strategy(
2486 client: &Client,
2487 strategy: ReaderSchemaSelectionStrategy,
2488 subject: &str,
2489) -> Result<Option<String>, PlanError> {
2490 match strategy {
2491 ReaderSchemaSelectionStrategy::Latest => {
2492 match client.get_schema_by_subject(subject).await {
2493 Ok(CcsrSchema { raw, .. }) => Ok(Some(raw)),
2494 Err(GetBySubjectError::SubjectNotFound)
2495 | Err(GetBySubjectError::VersionNotFound(_)) => Ok(None),
2496 Err(e) => Err(PlanError::FetchingCsrSchemaFailed {
2497 schema_lookup: format!("subject {}", subject.quoted()),
2498 cause: Arc::new(e),
2499 }),
2500 }
2501 }
2502 ReaderSchemaSelectionStrategy::Inline(raw) => Ok(Some(raw)),
2503 ReaderSchemaSelectionStrategy::ById(id) => match client.get_schema_by_id(id).await {
2504 Ok(CcsrSchema { raw, .. }) => Ok(Some(raw)),
2505 Err(GetByIdError::SchemaNotFound) => Ok(None),
2506 Err(e) => Err(PlanError::FetchingCsrSchemaFailed {
2507 schema_lookup: format!("ID {}", id),
2508 cause: Arc::new(e),
2509 }),
2510 },
2511 }
2512}
2513
2514async fn get_remote_csr_schema(
2515 ccsr_client: &mz_ccsr::Client,
2516 key_strategy: ReaderSchemaSelectionStrategy,
2517 value_strategy: ReaderSchemaSelectionStrategy,
2518 topic: &str,
2519) -> Result<Schema, PlanError> {
2520 let value_schema_name = format!("{}-value", topic);
2521 let value_schema =
2522 get_schema_with_strategy(ccsr_client, value_strategy, &value_schema_name).await?;
2523 let value_schema = value_schema.ok_or_else(|| anyhow!("No value schema found"))?;
2524 let subject = format!("{}-key", topic);
2525 let key_schema = get_schema_with_strategy(ccsr_client, key_strategy, &subject).await?;
2526 Ok(Schema {
2527 key_schema,
2528 value_schema,
2529 })
2530}
2531
2532async fn compile_proto(
2534 subject_name: &String,
2535 ccsr_client: &Client,
2536) -> Result<CsrSeedProtobufSchema, PlanError> {
2537 let (primary_subject, dependency_subjects) = ccsr_client
2538 .get_subject_and_references(subject_name)
2539 .await
2540 .map_err(|e| PlanError::FetchingCsrSchemaFailed {
2541 schema_lookup: format!("subject {}", subject_name.quoted()),
2542 cause: Arc::new(e),
2543 })?;
2544
2545 let mut source_tree = VirtualSourceTree::new();
2547
2548 source_tree.as_mut().map_well_known_types();
2552
2553 for subject in iter::once(&primary_subject).chain(dependency_subjects.iter()) {
2554 source_tree.as_mut().add_file(
2555 Path::new(&subject.name),
2556 subject.schema.raw.as_bytes().to_vec(),
2557 );
2558 }
2559 let mut db = SourceTreeDescriptorDatabase::new(source_tree.as_mut());
2560 let fds = db
2561 .as_mut()
2562 .build_file_descriptor_set(&[Path::new(&primary_subject.name)])
2563 .map_err(|cause| PlanError::InvalidProtobufSchema { cause })?;
2564
2565 let primary_fd = fds.file(0);
2567 let message_name = match primary_fd.message_type_size() {
2568 1 => String::from_utf8_lossy(primary_fd.message_type(0).name()).into_owned(),
2569 0 => bail_unsupported!(29603, "Protobuf schemas with no messages"),
2570 _ => bail_unsupported!(29603, "Protobuf schemas with multiple messages"),
2571 };
2572
2573 let bytes = &fds
2575 .serialize()
2576 .map_err(|cause| PlanError::InvalidProtobufSchema { cause })?;
2577 let mut schema = String::new();
2578 strconv::format_bytes(&mut schema, bytes);
2579
2580 Ok(CsrSeedProtobufSchema {
2581 schema,
2582 message_name,
2583 })
2584}
2585
/// Name of the `mz_now` function, whose calls are purified into constant
/// timestamps by the visitors below.
const MZ_NOW_NAME: &str = "mz_now";
/// Schema in which `mz_now` is resolved.
const MZ_NOW_SCHEMA: &str = "mz_catalog";
2588
/// Purifies the `WITH` options of a `CREATE MATERIALIZED VIEW` statement in
/// place.
///
/// - `REFRESH AT CREATION` is rewritten to `REFRESH AT mz_now()`.
/// - `REFRESH EVERY` without `ALIGNED TO` gets `ALIGNED TO mz_now()`.
/// - Every `mz_now()` inside a refresh option is then replaced by `mz_now`
///   (the chosen timestamp) cast to `mz_timestamp`.
/// - If no refresh option remains, a default `REFRESH ON COMMIT` is added.
/// - `resolved_ids` is updated: `mz_timestamp` is added if a cast was
///   introduced, and `mz_now` is removed if no temporal call survives.
pub fn purify_create_materialized_view_options(
    catalog: impl SessionCatalog,
    // Timestamp to substitute for `mz_now()`; must be `Some` if any refresh
    // option actually contains `mz_now()` (the visitor panics otherwise).
    mz_now: Option<Timestamp>,
    cmvs: &mut CreateMaterializedViewStatement<Aug>,
    resolved_ids: &mut ResolvedIds,
) {
    // Resolve `mz_catalog.mz_now` once and build a reusable `mz_now()` call
    // expression from it.
    let (mz_now_id, mz_now_expr) = {
        let item = catalog
            .resolve_function(&PartialItemName {
                database: None,
                schema: Some(MZ_NOW_SCHEMA.to_string()),
                item: MZ_NOW_NAME.to_string(),
            })
            .expect("we should be able to resolve mz_now");
        (
            item.id(),
            Expr::Function(Function {
                name: ResolvedItemName::Item {
                    id: item.id(),
                    qualifiers: item.name().qualifiers.clone(),
                    full_name: catalog.resolve_full_name(item.name()),
                    print_id: false,
                    version: RelationVersionSelector::Latest,
                },
                args: FunctionArgs::Args {
                    args: Vec::new(),
                    order_by: Vec::new(),
                },
                filter: None,
                over: None,
                distinct: false,
            }),
        )
    };
    // Resolve the `mz_timestamp` type, used as the cast target when
    // replacing `mz_now()` with a constant.
    let (mz_timestamp_id, mz_timestamp_type) = {
        let item = catalog.get_system_type("mz_timestamp");
        let full_name = catalog.resolve_full_name(item.name());
        (
            item.id(),
            ResolvedDataType::Named {
                id: item.id(),
                qualifiers: item.name().qualifiers.clone(),
                full_name,
                modifiers: vec![],
                print_id: true,
            },
        )
    };

    // Tracks whether any replacement below introduced an `mz_timestamp` cast.
    let mut introduced_mz_timestamp = false;

    for option in cmvs.with_options.iter_mut() {
        // `REFRESH AT CREATION` becomes `REFRESH AT mz_now()`.
        if matches!(
            option.value,
            Some(WithOptionValue::Refresh(RefreshOptionValue::AtCreation))
        ) {
            option.value = Some(WithOptionValue::Refresh(RefreshOptionValue::At(
                RefreshAtOptionValue {
                    time: mz_now_expr.clone(),
                },
            )));
        }

        // `REFRESH EVERY` without an alignment defaults to `ALIGNED TO
        // mz_now()`.
        if let Some(WithOptionValue::Refresh(RefreshOptionValue::Every(
            RefreshEveryOptionValue { aligned_to, .. },
        ))) = &mut option.value
        {
            if aligned_to.is_none() {
                *aligned_to = Some(mz_now_expr.clone());
            }
        }

        // Substitute `mz_now()` (including the ones inserted just above)
        // with the chosen constant timestamp.
        match &mut option.value {
            Some(WithOptionValue::Refresh(RefreshOptionValue::At(RefreshAtOptionValue {
                time,
            }))) => {
                let mut visitor = MzNowPurifierVisitor::new(mz_now, mz_timestamp_type.clone());
                visitor.visit_expr_mut(time);
                introduced_mz_timestamp |= visitor.introduced_mz_timestamp;
            }
            Some(WithOptionValue::Refresh(RefreshOptionValue::Every(
                RefreshEveryOptionValue {
                    interval: _,
                    aligned_to: Some(aligned_to),
                },
            ))) => {
                let mut visitor = MzNowPurifierVisitor::new(mz_now, mz_timestamp_type.clone());
                visitor.visit_expr_mut(aligned_to);
                introduced_mz_timestamp |= visitor.introduced_mz_timestamp;
            }
            _ => {}
        }
    }

    // If the statement ended up with no refresh option at all, add the
    // default `REFRESH ON COMMIT`.
    if !cmvs.with_options.iter().any(|o| {
        matches!(
            o,
            MaterializedViewOption {
                value: Some(WithOptionValue::Refresh(..)),
                ..
            }
        )
    }) {
        cmvs.with_options.push(MaterializedViewOption {
            name: MaterializedViewOptionName::Refresh,
            value: Some(WithOptionValue::Refresh(RefreshOptionValue::OnCommit)),
        })
    }

    // Keep `resolved_ids` consistent with the rewritten AST: record the
    // newly referenced `mz_timestamp` type, and drop `mz_now` if no
    // temporal call remains anywhere in the statement.
    if introduced_mz_timestamp {
        resolved_ids.add_item(mz_timestamp_id);
    }
    let mut visitor = ExprContainsTemporalVisitor::new();
    visitor.visit_create_materialized_view_statement(cmvs);
    if !visitor.contains_temporal {
        resolved_ids.remove_item(&mz_now_id);
    }
}
2726
/// Reports whether a materialized view option contains — or will be purified
/// into — a temporal expression such as `mz_now()`.
pub fn materialized_view_option_contains_temporal(mvo: &MaterializedViewOption<Aug>) -> bool {
    match &mvo.value {
        Some(WithOptionValue::Refresh(RefreshOptionValue::At(RefreshAtOptionValue { time }))) => {
            // Scan the explicit refresh time for `mz_now` calls.
            let mut visitor = ExprContainsTemporalVisitor::new();
            visitor.visit_expr(time);
            visitor.contains_temporal
        }
        Some(WithOptionValue::Refresh(RefreshOptionValue::Every(RefreshEveryOptionValue {
            interval: _,
            aligned_to: Some(aligned_to),
        }))) => {
            // Scan the explicit alignment expression for `mz_now` calls.
            let mut visitor = ExprContainsTemporalVisitor::new();
            visitor.visit_expr(aligned_to);
            visitor.contains_temporal
        }
        Some(WithOptionValue::Refresh(RefreshOptionValue::Every(RefreshEveryOptionValue {
            interval: _,
            aligned_to: None,
        }))) => {
            // A missing ALIGNED TO is purified into `mz_now()` (see
            // `purify_create_materialized_view_options`), so it counts as
            // temporal.
            true
        }
        Some(WithOptionValue::Refresh(RefreshOptionValue::AtCreation)) => {
            // REFRESH AT CREATION is likewise purified into `mz_now()`.
            true
        }
        _ => false,
    }
}
2759
/// AST visitor that records whether an expression tree contains a call to
/// the temporal function `mz_now`.
struct ExprContainsTemporalVisitor {
    // Set to true once an `mz_now` call is seen; never reset.
    pub contains_temporal: bool,
}
2764
2765impl ExprContainsTemporalVisitor {
2766 pub fn new() -> ExprContainsTemporalVisitor {
2767 ExprContainsTemporalVisitor {
2768 contains_temporal: false,
2769 }
2770 }
2771}
2772
2773impl Visit<'_, Aug> for ExprContainsTemporalVisitor {
2774 fn visit_function(&mut self, func: &Function<Aug>) {
2775 self.contains_temporal |= func.name.full_item_name().item == MZ_NOW_NAME;
2776 visit_function(self, func);
2777 }
2778}
2779
/// AST visitor that replaces `mz_now()` calls with a constant timestamp cast
/// to `mz_timestamp`.
struct MzNowPurifierVisitor {
    // The chosen timestamp. Must be `Some` whenever the visited expression
    // actually contains `mz_now()`; the visitor panics otherwise.
    pub mz_now: Option<Timestamp>,
    // Resolved `mz_timestamp` type used as the cast target.
    pub mz_timestamp_type: ResolvedDataType,
    // Whether any replacement occurred, i.e. whether the rewritten AST now
    // references `mz_timestamp`.
    pub introduced_mz_timestamp: bool,
}
2785
2786impl MzNowPurifierVisitor {
2787 pub fn new(
2788 mz_now: Option<Timestamp>,
2789 mz_timestamp_type: ResolvedDataType,
2790 ) -> MzNowPurifierVisitor {
2791 MzNowPurifierVisitor {
2792 mz_now,
2793 mz_timestamp_type,
2794 introduced_mz_timestamp: false,
2795 }
2796 }
2797}
2798
2799impl VisitMut<'_, Aug> for MzNowPurifierVisitor {
2800 fn visit_expr_mut(&mut self, expr: &'_ mut Expr<Aug>) {
2801 match expr {
2802 Expr::Function(Function {
2803 name:
2804 ResolvedItemName::Item {
2805 full_name: FullItemName { item, .. },
2806 ..
2807 },
2808 ..
2809 }) if item == &MZ_NOW_NAME.to_string() => {
2810 let mz_now = self.mz_now.expect(
2811 "we should have chosen a timestamp if the expression contains mz_now()",
2812 );
2813 *expr = Expr::Cast {
2816 expr: Box::new(Expr::Value(Value::Number(mz_now.to_string()))),
2817 data_type: self.mz_timestamp_type.clone(),
2818 };
2819 self.introduced_mz_timestamp = true;
2820 }
2821 _ => visit_expr_mut(self, expr),
2822 }
2823 }
2824}