use std::collections::{BTreeMap, BTreeSet};
use std::fmt;
use std::iter;
use std::path::Path;
use std::sync::Arc;

use anyhow::anyhow;
use itertools::Itertools;
use mz_ccsr::{Client, GetByIdError, GetBySubjectError, Schema as CcsrSchema};
use mz_controller_types::ClusterId;
use mz_kafka_util::client::MzClientContext;
use mz_mysql_util::MySqlTableDesc;
use mz_ore::collections::CollectionExt;
use mz_ore::error::ErrorExt;
use mz_ore::future::InTask;
use mz_ore::iter::IteratorExt;
use mz_ore::str::StrExt;
use mz_ore::{assert_none, soft_panic_or_log};
use mz_postgres_util::desc::PostgresTableDesc;
use mz_proto::RustType;
use mz_repr::{CatalogItemId, RelationDesc, RelationVersionSelector, Timestamp, strconv};
use mz_sql_parser::ast::display::AstDisplay;
use mz_sql_parser::ast::visit::{Visit, visit_function};
use mz_sql_parser::ast::visit_mut::{VisitMut, visit_expr_mut};
use mz_sql_parser::ast::{
    AlterSourceAction, AlterSourceAddSubsourceOptionName, AlterSourceStatement, AvroDocOn,
    ColumnName, CreateMaterializedViewStatement, CreateSinkConnection, CreateSinkOptionName,
    CreateSinkStatement, CreateSourceOptionName, CreateSubsourceOption, CreateSubsourceOptionName,
    CreateTableFromSourceStatement, CsrConfigOption, CsrConfigOptionName, CsrConnection,
    CsrSeedAvro, CsrSeedProtobuf, CsrSeedProtobufSchema, DeferredItemName, DocOnIdentifier,
    DocOnSchema, Expr, Function, FunctionArgs, Ident, KafkaSourceConfigOption,
    KafkaSourceConfigOptionName, LoadGenerator, LoadGeneratorOption, LoadGeneratorOptionName,
    MaterializedViewOption, MaterializedViewOptionName, MySqlConfigOption, MySqlConfigOptionName,
    PgConfigOption, PgConfigOptionName, RawItemName, ReaderSchemaSelectionStrategy,
    RefreshAtOptionValue, RefreshEveryOptionValue, RefreshOptionValue, SourceEnvelope,
    SqlServerConfigOption, SqlServerConfigOptionName, Statement, TableFromSourceColumns,
    TableFromSourceOption, TableFromSourceOptionName, UnresolvedItemName,
};
use mz_sql_server_util::desc::SqlServerTableDesc;
use mz_storage_types::configuration::StorageConfiguration;
use mz_storage_types::connections::Connection;
use mz_storage_types::connections::inline::IntoInlineConnection;
use mz_storage_types::errors::ContextCreationError;
use mz_storage_types::sources::load_generator::LoadGeneratorOutput;
use mz_storage_types::sources::mysql::MySqlSourceDetails;
use mz_storage_types::sources::postgres::PostgresSourcePublicationDetails;
use mz_storage_types::sources::{
    GenericSourceConnection, SourceDesc, SourceExportStatementDetails, SqlServerSourceExtras,
};
use prost::Message;
use protobuf_native::MessageLite;
use protobuf_native::compiler::{SourceTreeDescriptorDatabase, VirtualSourceTree};
use rdkafka::admin::AdminClient;
use references::{RetrievedSourceReferences, SourceReferenceClient};
use uuid::Uuid;

use crate::ast::{
    AlterSourceAddSubsourceOption, AvroSchema, CreateSourceConnection, CreateSourceStatement,
    CreateSubsourceStatement, CsrConnectionAvro, CsrConnectionProtobuf, ExternalReferenceExport,
    ExternalReferences, Format, FormatSpecifier, ProtobufSchema, Value, WithOptionValue,
};
use crate::catalog::{CatalogItemType, SessionCatalog};
use crate::kafka_util::{KafkaSinkConfigOptionExtracted, KafkaSourceConfigOptionExtracted};
use crate::names::{
    Aug, FullItemName, PartialItemName, ResolvedColumnReference, ResolvedDataType, ResolvedIds,
    ResolvedItemName,
};
use crate::plan::error::PlanError;
use crate::plan::statement::ddl::load_generator_ast_to_generator;
use crate::plan::{SourceReferences, StatementContext};
use crate::pure::error::{IcebergSinkPurificationError, SqlServerSourcePurificationError};
use crate::session::vars::ENABLE_SQL_SERVER_SOURCE;
use crate::{kafka_util, normalize};

use self::error::{
    CsrPurificationError, KafkaSinkPurificationError, KafkaSourcePurificationError,
    LoadGeneratorSourcePurificationError, MySqlSourcePurificationError, PgSourcePurificationError,
};

pub(crate) mod error;
mod references;

pub mod mysql;
pub mod postgres;
pub mod sql_server;

pub(crate) struct RequestedSourceExport<T> {
    external_reference: UnresolvedItemName,
    name: UnresolvedItemName,
    meta: T,
}

impl<T: fmt::Debug> fmt::Debug for RequestedSourceExport<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("RequestedSourceExport")
            .field("external_reference", &self.external_reference)
            .field("name", &self.name)
            .field("meta", &self.meta)
            .finish()
    }
}

impl<T> RequestedSourceExport<T> {
    fn change_meta<F>(self, new_meta: F) -> RequestedSourceExport<F> {
        RequestedSourceExport {
            external_reference: self.external_reference,
            name: self.name,
            meta: new_meta,
        }
    }
}

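/// Generates a name for a source export (e.g. a subsource) from the name of
/// its parent source: the export keeps the source's database and schema
/// qualification, with `subsource_name` as the item name. For example, a
/// source named `a.b.src` and a `subsource_name` of `"t"` yield `a.b.t`.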
fn source_export_name_gen(
    source_name: &UnresolvedItemName,
    subsource_name: &str,
) -> Result<UnresolvedItemName, PlanError> {
    let mut partial = normalize::unresolved_item_name(source_name.clone())?;
    partial.item = subsource_name.to_string();
    Ok(UnresolvedItemName::from(partial))
}

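/// Validates that the requested source exports neither duplicate an export
/// name nor reference the same upstream object more than once.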
fn validate_source_export_names<T>(
    requested_source_exports: &[RequestedSourceExport<T>],
) -> Result<(), PlanError> {
    if let Some(name) = requested_source_exports
        .iter()
        .map(|subsource| &subsource.name)
        .duplicates()
        .next()
        .cloned()
    {
        let mut upstream_references: Vec<_> = requested_source_exports
            .iter()
            .filter_map(|subsource| {
                if subsource.name == name {
                    Some(subsource.external_reference.clone())
                } else {
                    None
                }
            })
            .collect();

        upstream_references.sort();

        Err(PlanError::SubsourceNameConflict {
            name,
            upstream_references,
        })?;
    }

    if let Some(name) = requested_source_exports
        .iter()
        .map(|export| &export.external_reference)
        .duplicates()
        .next()
        .cloned()
    {
        let mut target_names: Vec<_> = requested_source_exports
            .iter()
            .filter_map(|export| {
                if export.external_reference == name {
                    Some(export.name.clone())
                } else {
                    None
                }
            })
            .collect();

        target_names.sort();

        Err(PlanError::SubsourceDuplicateReference { name, target_names })?;
    }

    Ok(())
}

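/// The result of statement purification: the statement itself (plus any
/// statements derived from it) with its dependencies on external state
/// resolved, ready for planning.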
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PurifiedStatement {
    PurifiedCreateSource {
        create_progress_subsource_stmt: Option<CreateSubsourceStatement<Aug>>,
        create_source_stmt: CreateSourceStatement<Aug>,
        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
        available_source_references: SourceReferences,
    },
    PurifiedAlterSource {
        alter_source_stmt: AlterSourceStatement<Aug>,
    },
    PurifiedAlterSourceAddSubsources {
        source_name: ResolvedItemName,
        options: Vec<AlterSourceAddSubsourceOption<Aug>>,
        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
    },
    PurifiedAlterSourceRefreshReferences {
        source_name: ResolvedItemName,
        available_source_references: SourceReferences,
    },
    PurifiedCreateSink(CreateSinkStatement<Aug>),
    PurifiedCreateTableFromSource {
        stmt: CreateTableFromSourceStatement<Aug>,
    },
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PurifiedSourceExport {
    pub external_reference: UnresolvedItemName,
    pub details: PurifiedExportDetails,
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PurifiedExportDetails {
    MySql {
        table: MySqlTableDesc,
        text_columns: Option<Vec<Ident>>,
        exclude_columns: Option<Vec<Ident>>,
        initial_gtid_set: String,
    },
    Postgres {
        table: PostgresTableDesc,
        text_columns: Option<Vec<Ident>>,
        exclude_columns: Option<Vec<Ident>>,
    },
    SqlServer {
        table: SqlServerTableDesc,
        text_columns: Option<Vec<Ident>>,
        excl_columns: Option<Vec<Ident>>,
        capture_instance: Arc<str>,
        initial_lsn: mz_sql_server_util::cdc::Lsn,
    },
    Kafka {},
    LoadGenerator {
        table: Option<RelationDesc>,
        output: LoadGeneratorOutput,
    },
}

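/// Purifies a statement, resolving any dependencies on external state (e.g.
/// by contacting the upstream system a source or sink refers to) so that
/// planning can proceed without further network access.
///
/// Returns the purification result alongside the ID of the cluster the
/// statement is associated with, if any.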
pub async fn purify_statement(
    catalog: impl SessionCatalog,
    now: u64,
    stmt: Statement<Aug>,
    storage_configuration: &StorageConfiguration,
) -> (Result<PurifiedStatement, PlanError>, Option<ClusterId>) {
    match stmt {
        Statement::CreateSource(stmt) => {
            let cluster_id = stmt.in_cluster.as_ref().map(|cluster| cluster.id.clone());
            (
                purify_create_source(catalog, now, stmt, storage_configuration).await,
                cluster_id,
            )
        }
        Statement::AlterSource(stmt) => (
            purify_alter_source(catalog, stmt, storage_configuration).await,
            None,
        ),
        Statement::CreateSink(stmt) => {
            let cluster_id = stmt.in_cluster.as_ref().map(|cluster| cluster.id.clone());
            (
                purify_create_sink(catalog, stmt, storage_configuration).await,
                cluster_id,
            )
        }
        Statement::CreateTableFromSource(stmt) => (
            purify_create_table_from_source(catalog, stmt, storage_configuration).await,
            None,
        ),
        o => unreachable!("{:?} does not need to be purified", o),
    }
}

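/// Injects `DOC ON` options into the Avro-with-CSR formats of a `CREATE SINK`
/// statement, one for each catalog comment on the sink's depended-upon items
/// and their columns, unless the user already provided an explicit `DOC ON`
/// option for that identifier.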
pub(crate) fn purify_create_sink_avro_doc_on_options(
    catalog: &dyn SessionCatalog,
    from_id: CatalogItemId,
    format: &mut Option<FormatSpecifier<Aug>>,
) -> Result<(), PlanError> {
    let from = catalog.get_item(&from_id);
    let object_ids = from
        .references()
        .items()
        .copied()
        .chain_one(from.id())
        .collect::<Vec<_>>();

    let mut avro_format_options = vec![];
    for_each_format(format, |doc_on_schema, fmt| match fmt {
        Format::Avro(AvroSchema::InlineSchema { .. })
        | Format::Bytes
        | Format::Csv { .. }
        | Format::Json { .. }
        | Format::Protobuf(..)
        | Format::Regex(..)
        | Format::Text => (),
        Format::Avro(AvroSchema::Csr {
            csr_connection: CsrConnectionAvro { connection, .. },
        }) => {
            avro_format_options.push((doc_on_schema, &mut connection.options));
        }
    });

    for (for_schema, options) in avro_format_options {
        let user_provided_comments = options
            .iter()
            .filter_map(|CsrConfigOption { name, .. }| match name {
                CsrConfigOptionName::AvroDocOn(doc_on) => Some(doc_on.clone()),
                _ => None,
            })
            .collect::<BTreeSet<_>>();

        for object_id in &object_ids {
            let item = catalog
                .get_item(object_id)
                .at_version(RelationVersionSelector::Latest);
            let full_resolved_name = ResolvedItemName::Item {
                id: *object_id,
                qualifiers: item.name().qualifiers.clone(),
                full_name: catalog.resolve_full_name(item.name()),
                print_id: !matches!(
                    item.item_type(),
                    CatalogItemType::Func | CatalogItemType::Type
                ),
                version: RelationVersionSelector::Latest,
            };

            if let Some(comments_map) = catalog.get_item_comments(object_id) {
                let doc_on_item_key = AvroDocOn {
                    identifier: DocOnIdentifier::Type(full_resolved_name.clone()),
                    for_schema,
                };
                if !user_provided_comments.contains(&doc_on_item_key) {
                    if let Some(root_comment) = comments_map.get(&None) {
                        options.push(CsrConfigOption {
                            name: CsrConfigOptionName::AvroDocOn(doc_on_item_key),
                            value: Some(mz_sql_parser::ast::WithOptionValue::Value(Value::String(
                                root_comment.clone(),
                            ))),
                        });
                    }
                }

                let column_descs = match item.type_details() {
                    Some(details) => details.typ.desc(catalog).unwrap_or_default(),
                    None => item.relation_desc().map(|d| d.into_owned()),
                };

                if let Some(desc) = column_descs {
                    for (pos, column_name) in desc.iter_names().enumerate() {
                        if let Some(comment_str) = comments_map.get(&Some(pos + 1)) {
                            let doc_on_column_key = AvroDocOn {
                                identifier: DocOnIdentifier::Column(ColumnName {
                                    relation: full_resolved_name.clone(),
                                    column: ResolvedColumnReference::Column {
                                        name: column_name.to_owned(),
                                        index: pos,
                                    },
                                }),
                                for_schema,
                            };
                            if !user_provided_comments.contains(&doc_on_column_key) {
                                options.push(CsrConfigOption {
                                    name: CsrConfigOptionName::AvroDocOn(doc_on_column_key),
                                    value: Some(mz_sql_parser::ast::WithOptionValue::Value(
                                        Value::String(comment_str.clone()),
                                    )),
                                });
                            }
                        }
                    }
                }
            }
        }
    }

    Ok(())
}

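/// Purifies a `CREATE SINK` statement: validates its `WITH` options, verifies
/// that the referenced connections are usable (e.g. that Kafka brokers and
/// the schema registry are reachable), and injects `DOC ON` options derived
/// from catalog comments.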
async fn purify_create_sink(
    catalog: impl SessionCatalog,
    mut create_sink_stmt: CreateSinkStatement<Aug>,
    storage_configuration: &StorageConfiguration,
) -> Result<PurifiedStatement, PlanError> {
    let CreateSinkStatement {
        connection,
        format,
        with_options,
        name: _,
        in_cluster: _,
        if_not_exists: _,
        from,
        envelope: _,
    } = &mut create_sink_stmt;

    const USER_ALLOWED_WITH_OPTIONS: &[CreateSinkOptionName] = &[
        CreateSinkOptionName::Snapshot,
        CreateSinkOptionName::CommitInterval,
    ];

    if let Some(op) = with_options
        .iter()
        .find(|op| !USER_ALLOWED_WITH_OPTIONS.contains(&op.name))
    {
        sql_bail!(
            "CREATE SINK...WITH ({}..) is not allowed",
            op.name.to_ast_string_simple(),
        )
    }

    match &connection {
        CreateSinkConnection::Kafka {
            connection,
            options,
            key: _,
            headers: _,
        } => {
            let scx = StatementContext::new(None, &catalog);
            let connection = {
                let item = scx.get_item_by_resolved_name(connection)?;
                match item.connection()? {
                    Connection::Kafka(connection) => {
                        connection.clone().into_inline_connection(scx.catalog)
                    }
                    _ => sql_bail!(
                        "{} is not a kafka connection",
                        scx.catalog.resolve_full_name(item.name())
                    ),
                }
            };

            let extracted_options: KafkaSinkConfigOptionExtracted = options.clone().try_into()?;

            if extracted_options.legacy_ids == Some(true) {
                sql_bail!("LEGACY IDs option is not supported");
            }

            let client: AdminClient<_> = connection
                .create_with_context(
                    storage_configuration,
                    MzClientContext::default(),
                    &BTreeMap::new(),
                    InTask::No,
                )
                .await
                .map_err(|e| KafkaSinkPurificationError::AdminClientError(Arc::new(e)))?;

            let metadata = client
                .inner()
                .fetch_metadata(
                    None,
                    storage_configuration
                        .parameters
                        .kafka_timeout_config
                        .fetch_metadata_timeout,
                )
                .map_err(|e| {
                    KafkaSinkPurificationError::AdminClientError(Arc::new(
                        ContextCreationError::KafkaError(e),
                    ))
                })?;

            if metadata.brokers().is_empty() {
                Err(KafkaSinkPurificationError::ZeroBrokers)?;
            }
        }
        CreateSinkConnection::Iceberg {
            connection,
            aws_connection,
            ..
        } => {
            let scx = StatementContext::new(None, &catalog);
            let connection = {
                let item = scx.get_item_by_resolved_name(connection)?;
                match item.connection()? {
                    Connection::IcebergCatalog(connection) => {
                        connection.clone().into_inline_connection(scx.catalog)
                    }
                    _ => sql_bail!(
                        "{} is not an iceberg connection",
                        scx.catalog.resolve_full_name(item.name())
                    ),
                }
            };

            let aws_conn_id = aws_connection.item_id();

            let aws_connection = {
                let item = scx.get_item_by_resolved_name(aws_connection)?;
                match item.connection()? {
                    Connection::Aws(aws_connection) => aws_connection.clone(),
                    _ => sql_bail!(
                        "{} is not an aws connection",
                        scx.catalog.resolve_full_name(item.name())
                    ),
                }
            };

            let _catalog = connection
                .connect(storage_configuration, InTask::No)
                .await
                .map_err(|e| IcebergSinkPurificationError::CatalogError(Arc::new(e)))?;

            let sdk_config = aws_connection
                .load_sdk_config(
                    &storage_configuration.connection_context,
                    aws_conn_id.clone(),
                    InTask::No,
                )
                .await
                .map_err(|e| IcebergSinkPurificationError::AwsSdkContextError(Arc::new(e)))?;

            let sts_client = aws_sdk_sts::Client::new(&sdk_config);
            let _ = sts_client.get_caller_identity().send().await.map_err(|e| {
                IcebergSinkPurificationError::StsIdentityError(Arc::new(e.into_service_error()))
            })?;
        }
    }

    let mut csr_connection_ids = BTreeSet::new();
    for_each_format(format, |_, fmt| match fmt {
        Format::Avro(AvroSchema::InlineSchema { .. })
        | Format::Bytes
        | Format::Csv { .. }
        | Format::Json { .. }
        | Format::Protobuf(ProtobufSchema::InlineSchema { .. })
        | Format::Regex(..)
        | Format::Text => (),
        Format::Avro(AvroSchema::Csr {
            csr_connection: CsrConnectionAvro { connection, .. },
        })
        | Format::Protobuf(ProtobufSchema::Csr {
            csr_connection: CsrConnectionProtobuf { connection, .. },
        }) => {
            csr_connection_ids.insert(*connection.connection.item_id());
        }
    });

    let scx = StatementContext::new(None, &catalog);
    for csr_connection_id in csr_connection_ids {
        let connection = {
            let item = scx.get_item(&csr_connection_id);
            match item.connection()? {
                Connection::Csr(connection) => connection.clone().into_inline_connection(&catalog),
                _ => Err(CsrPurificationError::NotCsrConnection(
                    scx.catalog.resolve_full_name(item.name()),
                ))?,
            }
        };

        let client = connection
            .connect(storage_configuration, InTask::No)
            .await
            .map_err(|e| CsrPurificationError::ClientError(Arc::new(e)))?;

        client
            .list_subjects()
            .await
            .map_err(|e| CsrPurificationError::ListSubjectsError(Arc::new(e)))?;
    }

    purify_create_sink_avro_doc_on_options(&catalog, *from.item_id(), format)?;

    Ok(PurifiedStatement::PurifiedCreateSink(create_sink_stmt))
}

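/// Applies `f` to each format in the given format specifier, along with the
/// part of the schema (key, value, or both) that the format applies to.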
fn for_each_format<'a, F>(format: &'a mut Option<FormatSpecifier<Aug>>, mut f: F)
where
    F: FnMut(DocOnSchema, &'a mut Format<Aug>),
{
    match format {
        None => (),
        Some(FormatSpecifier::Bare(fmt)) => f(DocOnSchema::All, fmt),
        Some(FormatSpecifier::KeyValue { key, value }) => {
            f(DocOnSchema::KeyOnly, key);
            f(DocOnSchema::ValueOnly, value);
        }
    }
}

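/// Whether a source may, must, or must not refer to its external upstream
/// objects (e.g. via subsources) when it is created:
///
/// - `NotAllowed`: external references are rejected.
/// - `Optional`: external references are permitted but not required.
/// - `Required`: external references must be provided.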
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub(crate) enum SourceReferencePolicy {
    NotAllowed,
    Optional,
    Required,
}

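/// Purifies a `CREATE SOURCE` statement: validates the referenced connection,
/// retrieves the set of available upstream references, purifies any requested
/// subsources, and, for the old source syntax, generates the accompanying
/// progress subsource statement.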
async fn purify_create_source(
    catalog: impl SessionCatalog,
    now: u64,
    mut create_source_stmt: CreateSourceStatement<Aug>,
    storage_configuration: &StorageConfiguration,
) -> Result<PurifiedStatement, PlanError> {
    let CreateSourceStatement {
        name: source_name,
        col_names,
        key_constraint,
        connection: source_connection,
        format,
        envelope,
        include_metadata,
        external_references,
        progress_subsource,
        with_options,
        ..
    } = &mut create_source_stmt;

    let uses_old_syntax = !col_names.is_empty()
        || key_constraint.is_some()
        || format.is_some()
        || envelope.is_some()
        || !include_metadata.is_empty()
        || external_references.is_some()
        || progress_subsource.is_some();

    if let Some(DeferredItemName::Named(_)) = progress_subsource {
        sql_bail!("Cannot manually ID qualify progress subsource")
    }

    let mut requested_subsource_map = BTreeMap::new();

    let progress_desc = match &source_connection {
        CreateSourceConnection::Kafka { .. } => {
            &mz_storage_types::sources::kafka::KAFKA_PROGRESS_DESC
        }
        CreateSourceConnection::Postgres { .. } => {
            &mz_storage_types::sources::postgres::PG_PROGRESS_DESC
        }
        CreateSourceConnection::SqlServer { .. } => {
            &mz_storage_types::sources::sql_server::SQL_SERVER_PROGRESS_DESC
        }
        CreateSourceConnection::MySql { .. } => {
            &mz_storage_types::sources::mysql::MYSQL_PROGRESS_DESC
        }
        CreateSourceConnection::LoadGenerator { .. } => {
            &mz_storage_types::sources::load_generator::LOAD_GEN_PROGRESS_DESC
        }
    };
    let scx = StatementContext::new(None, &catalog);

    let reference_policy = if scx.catalog.system_vars().enable_create_table_from_source()
        && scx.catalog.system_vars().force_source_table_syntax()
    {
        SourceReferencePolicy::NotAllowed
    } else if scx.catalog.system_vars().enable_create_table_from_source() {
        SourceReferencePolicy::Optional
    } else {
        SourceReferencePolicy::Required
    };

    let mut format_options = SourceFormatOptions::Default;

    let retrieved_source_references: RetrievedSourceReferences;

    match source_connection {
        CreateSourceConnection::Kafka {
            connection,
            options: base_with_options,
            ..
        } => {
            if let Some(external_references) = external_references {
                Err(KafkaSourcePurificationError::ReferencedSubsources(
                    external_references.clone(),
                ))?;
            }

            let connection = {
                let item = scx.get_item_by_resolved_name(connection)?;
                match item.connection()? {
                    Connection::Kafka(connection) => {
                        connection.clone().into_inline_connection(&catalog)
                    }
                    _ => Err(KafkaSourcePurificationError::NotKafkaConnection(
                        scx.catalog.resolve_full_name(item.name()),
                    ))?,
                }
            };

            let extracted_options: KafkaSourceConfigOptionExtracted =
                base_with_options.clone().try_into()?;

            let topic = extracted_options
                .topic
                .ok_or(KafkaSourcePurificationError::ConnectionMissingTopic)?;

            let consumer = connection
                .create_with_context(
                    storage_configuration,
                    MzClientContext::default(),
                    &BTreeMap::new(),
                    InTask::No,
                )
                .await
                .map_err(|e| {
                    KafkaSourcePurificationError::KafkaConsumerError(
                        e.display_with_causes().to_string(),
                    )
                })?;
            let consumer = Arc::new(consumer);

            match (
                extracted_options.start_offset,
                extracted_options.start_timestamp,
            ) {
                (None, None) => {
                    kafka_util::ensure_topic_exists(
                        Arc::clone(&consumer),
                        &topic,
                        storage_configuration
                            .parameters
                            .kafka_timeout_config
                            .fetch_metadata_timeout,
                    )
                    .await?;
                }
                (Some(_), Some(_)) => {
                    sql_bail!("cannot specify START TIMESTAMP and START OFFSET at same time")
                }
                (Some(start_offsets), None) => {
                    kafka_util::validate_start_offsets(
                        Arc::clone(&consumer),
                        &topic,
                        start_offsets,
                        storage_configuration
                            .parameters
                            .kafka_timeout_config
                            .fetch_metadata_timeout,
                    )
                    .await?;
                }
                (None, Some(time_offset)) => {
                    let start_offsets = kafka_util::lookup_start_offsets(
                        Arc::clone(&consumer),
                        &topic,
                        time_offset,
                        now,
                        storage_configuration
                            .parameters
                            .kafka_timeout_config
                            .fetch_metadata_timeout,
                    )
                    .await?;

                    base_with_options.retain(|val| {
                        !matches!(val.name, KafkaSourceConfigOptionName::StartTimestamp)
                    });
                    base_with_options.push(KafkaSourceConfigOption {
                        name: KafkaSourceConfigOptionName::StartOffset,
                        value: Some(WithOptionValue::Sequence(
                            start_offsets
                                .iter()
                                .map(|offset| {
                                    WithOptionValue::Value(Value::Number(offset.to_string()))
                                })
                                .collect(),
                        )),
                    });
                }
            }

            let reference_client = SourceReferenceClient::Kafka { topic: &topic };
            retrieved_source_references = reference_client.get_source_references().await?;

            format_options = SourceFormatOptions::Kafka { topic };
        }
        CreateSourceConnection::Postgres {
            connection,
            options,
        } => {
            let connection_item = scx.get_item_by_resolved_name(connection)?;
            let connection = match connection_item.connection().map_err(PlanError::from)? {
                Connection::Postgres(connection) => {
                    connection.clone().into_inline_connection(&catalog)
                }
                _ => Err(PgSourcePurificationError::NotPgConnection(
                    scx.catalog.resolve_full_name(connection_item.name()),
                ))?,
            };
            let crate::plan::statement::PgConfigOptionExtracted {
                publication,
                text_columns,
                exclude_columns,
                details,
                ..
            } = options.clone().try_into()?;
            let publication =
                publication.ok_or(PgSourcePurificationError::ConnectionMissingPublication)?;

            if details.is_some() {
                Err(PgSourcePurificationError::UserSpecifiedDetails)?;
            }

            let client = connection
                .validate(connection_item.id(), storage_configuration)
                .await?;

            let reference_client = SourceReferenceClient::Postgres {
                client: &client,
                publication: &publication,
                database: &connection.database,
            };
            retrieved_source_references = reference_client.get_source_references().await?;

            let postgres::PurifiedSourceExports {
                source_exports: subsources,
                normalized_text_columns,
            } = postgres::purify_source_exports(
                &client,
                &retrieved_source_references,
                external_references,
                text_columns,
                exclude_columns,
                source_name,
                &reference_policy,
            )
            .await?;

            if let Some(text_cols_option) = options
                .iter_mut()
                .find(|option| option.name == PgConfigOptionName::TextColumns)
            {
                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
            }

            requested_subsource_map.extend(subsources);

            let timeline_id = mz_postgres_util::get_timeline_id(&client).await?;

            options.retain(|PgConfigOption { name, .. }| name != &PgConfigOptionName::Details);
            let details = PostgresSourcePublicationDetails {
                slot: format!(
                    "materialize_{}",
                    Uuid::new_v4().to_string().replace('-', "")
                ),
                timeline_id: Some(timeline_id),
                database: connection.database,
            };
            options.push(PgConfigOption {
                name: PgConfigOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    details.into_proto().encode_to_vec(),
                )))),
            })
        }
        CreateSourceConnection::SqlServer {
            connection,
            options,
        } => {
            scx.require_feature_flag(&ENABLE_SQL_SERVER_SOURCE)?;

            let connection_item = scx.get_item_by_resolved_name(connection)?;
            let connection = match connection_item.connection()? {
                Connection::SqlServer(connection) => {
                    connection.clone().into_inline_connection(&catalog)
                }
                _ => Err(SqlServerSourcePurificationError::NotSqlServerConnection(
                    scx.catalog.resolve_full_name(connection_item.name()),
                ))?,
            };
            let crate::plan::statement::ddl::SqlServerConfigOptionExtracted {
                details,
                text_columns,
                exclude_columns,
                seen: _,
            } = options.clone().try_into()?;

            if details.is_some() {
                Err(SqlServerSourcePurificationError::UserSpecifiedDetails)?;
            }

            let mut client = connection
                .validate(connection_item.id(), storage_configuration)
                .await?;

            let database: Arc<str> = connection.database.into();
            let reference_client = SourceReferenceClient::SqlServer {
                client: &mut client,
                database: Arc::clone(&database),
            };
            retrieved_source_references = reference_client.get_source_references().await?;
            tracing::debug!(?retrieved_source_references, "got source references");

            let timeout = mz_storage_types::sources::sql_server::MAX_LSN_WAIT
                .get(storage_configuration.config_set());

            let purified_source_exports = sql_server::purify_source_exports(
                &*database,
                &mut client,
                &retrieved_source_references,
                external_references,
                &text_columns,
                &exclude_columns,
                source_name,
                timeout,
                &reference_policy,
            )
            .await?;

            let sql_server::PurifiedSourceExports {
                source_exports,
                normalized_text_columns,
                normalized_excl_columns,
            } = purified_source_exports;

            requested_subsource_map.extend(source_exports);

            let restore_history_id =
                mz_sql_server_util::inspect::get_latest_restore_history_id(&mut client).await?;
            let details = SqlServerSourceExtras { restore_history_id };

            options.retain(|SqlServerConfigOption { name, .. }| {
                name != &SqlServerConfigOptionName::Details
            });
            options.push(SqlServerConfigOption {
                name: SqlServerConfigOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    details.into_proto().encode_to_vec(),
                )))),
            });

            if let Some(text_cols_option) = options
                .iter_mut()
                .find(|option| option.name == SqlServerConfigOptionName::TextColumns)
            {
                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
            }
            if let Some(excl_cols_option) = options
                .iter_mut()
                .find(|option| option.name == SqlServerConfigOptionName::ExcludeColumns)
            {
                excl_cols_option.value = Some(WithOptionValue::Sequence(normalized_excl_columns));
            }
        }
        CreateSourceConnection::MySql {
            connection,
            options,
        } => {
            let connection_item = scx.get_item_by_resolved_name(connection)?;
            let connection = match connection_item.connection()? {
                Connection::MySql(connection) => {
                    connection.clone().into_inline_connection(&catalog)
                }
                _ => Err(MySqlSourcePurificationError::NotMySqlConnection(
                    scx.catalog.resolve_full_name(connection_item.name()),
                ))?,
            };
            let crate::plan::statement::ddl::MySqlConfigOptionExtracted {
                details,
                text_columns,
                exclude_columns,
                seen: _,
            } = options.clone().try_into()?;

            if details.is_some() {
                Err(MySqlSourcePurificationError::UserSpecifiedDetails)?;
            }

            let mut conn = connection
                .validate(connection_item.id(), storage_configuration)
                .await
                .map_err(MySqlSourcePurificationError::InvalidConnection)?;

            let initial_gtid_set =
                mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?;

            let reference_client = SourceReferenceClient::MySql {
                conn: &mut conn,
                include_system_schemas: mysql::references_system_schemas(external_references),
            };
            retrieved_source_references = reference_client.get_source_references().await?;

            let mysql::PurifiedSourceExports {
                source_exports: subsources,
                normalized_text_columns,
                normalized_exclude_columns,
            } = mysql::purify_source_exports(
                &mut conn,
                &retrieved_source_references,
                external_references,
                text_columns,
                exclude_columns,
                source_name,
                initial_gtid_set.clone(),
                &reference_policy,
            )
            .await?;
            requested_subsource_map.extend(subsources);

            let details = MySqlSourceDetails {};
            options
                .retain(|MySqlConfigOption { name, .. }| name != &MySqlConfigOptionName::Details);
            options.push(MySqlConfigOption {
                name: MySqlConfigOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    details.into_proto().encode_to_vec(),
                )))),
            });

            if let Some(text_cols_option) = options
                .iter_mut()
                .find(|option| option.name == MySqlConfigOptionName::TextColumns)
            {
                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
            }
            if let Some(exclude_cols_option) = options
                .iter_mut()
                .find(|option| option.name == MySqlConfigOptionName::ExcludeColumns)
            {
                exclude_cols_option.value =
                    Some(WithOptionValue::Sequence(normalized_exclude_columns));
            }
        }
        CreateSourceConnection::LoadGenerator { generator, options } => {
            let load_generator =
                load_generator_ast_to_generator(&scx, generator, options, include_metadata)?;

            let reference_client = SourceReferenceClient::LoadGenerator {
                generator: &load_generator,
            };
            retrieved_source_references = reference_client.get_source_references().await?;
            let multi_output_sources = retrieved_source_references
                .all_references()
                .iter()
                .any(|r| {
                    r.load_generator_output().expect("is loadgen")
                        != &LoadGeneratorOutput::Default
                });

            match external_references {
                Some(requested)
                    if matches!(reference_policy, SourceReferencePolicy::NotAllowed) =>
                {
                    Err(PlanError::UseTablesForSources(requested.to_string()))?
                }
                Some(requested) if !multi_output_sources => match requested {
                    ExternalReferences::SubsetTables(_) => {
                        Err(LoadGeneratorSourcePurificationError::ForTables)?
                    }
                    ExternalReferences::SubsetSchemas(_) => {
                        Err(LoadGeneratorSourcePurificationError::ForSchemas)?
                    }
                    ExternalReferences::All => {
                        Err(LoadGeneratorSourcePurificationError::ForAllTables)?
                    }
                },
                Some(requested) => {
                    let requested_exports = retrieved_source_references
                        .requested_source_exports(Some(requested), source_name)?;
                    for export in requested_exports {
                        requested_subsource_map.insert(
                            export.name,
                            PurifiedSourceExport {
                                external_reference: export.external_reference,
                                details: PurifiedExportDetails::LoadGenerator {
                                    table: export
                                        .meta
                                        .load_generator_desc()
                                        .expect("is loadgen")
                                        .clone(),
                                    output: export
                                        .meta
                                        .load_generator_output()
                                        .expect("is loadgen")
                                        .clone(),
                                },
                            },
                        );
                    }
                }
                None => {
                    if multi_output_sources
                        && matches!(reference_policy, SourceReferencePolicy::Required)
                    {
                        Err(LoadGeneratorSourcePurificationError::MultiOutputRequiresForAllTables)?
                    }
                }
            }

            if let LoadGenerator::Clock = generator {
                if !options
                    .iter()
                    .any(|p| p.name == LoadGeneratorOptionName::AsOf)
                {
                    let now = catalog.now();
                    options.push(LoadGeneratorOption {
                        name: LoadGeneratorOptionName::AsOf,
                        value: Some(WithOptionValue::Value(Value::Number(now.to_string()))),
                    });
                }
            }
        }
    }

    *external_references = None;

    let create_progress_subsource_stmt = if uses_old_syntax {
        let name = match progress_subsource {
            Some(name) => match name {
                DeferredItemName::Deferred(name) => name.clone(),
                DeferredItemName::Named(_) => unreachable!("already checked for this value"),
            },
            None => {
                let (item, prefix) = source_name.0.split_last().unwrap();
                let item_name =
                    Ident::try_generate_name(item.to_string(), "_progress", |candidate| {
                        let mut suggested_name = prefix.to_vec();
                        suggested_name.push(candidate.clone());

                        let partial =
                            normalize::unresolved_item_name(UnresolvedItemName(suggested_name))?;
                        let qualified = scx.allocate_qualified_name(partial)?;
                        let item_exists = scx.catalog.get_item_by_name(&qualified).is_some();
                        let type_exists = scx.catalog.get_type_by_name(&qualified).is_some();
                        Ok::<_, PlanError>(!item_exists && !type_exists)
                    })?;

                let mut full_name = prefix.to_vec();
                full_name.push(item_name);
                let full_name = normalize::unresolved_item_name(UnresolvedItemName(full_name))?;
                let qualified_name = scx.allocate_qualified_name(full_name)?;
                let full_name = scx.catalog.resolve_full_name(&qualified_name);

                UnresolvedItemName::from(full_name.clone())
            }
        };

        let (columns, constraints) = scx.relation_desc_into_table_defs(progress_desc)?;

        let mut progress_with_options: Vec<_> = with_options
            .iter()
            .filter_map(|opt| match opt.name {
                CreateSourceOptionName::TimestampInterval => None,
                CreateSourceOptionName::RetainHistory => Some(CreateSubsourceOption {
                    name: CreateSubsourceOptionName::RetainHistory,
                    value: opt.value.clone(),
                }),
            })
            .collect();
        progress_with_options.push(CreateSubsourceOption {
            name: CreateSubsourceOptionName::Progress,
            value: Some(WithOptionValue::Value(Value::Boolean(true))),
        });

        Some(CreateSubsourceStatement {
            name,
            columns,
            of_source: None,
            constraints,
            if_not_exists: false,
            with_options: progress_with_options,
        })
    } else {
        None
    };

    purify_source_format(
        &catalog,
        format,
        &format_options,
        envelope,
        storage_configuration,
    )
    .await?;

    Ok(PurifiedStatement::PurifiedCreateSource {
        create_progress_subsource_stmt,
        create_source_stmt,
        subsources: requested_subsource_map,
        available_source_references: retrieved_source_references.available_source_references(),
    })
}

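/// Purifies an `ALTER SOURCE` statement, dispatching to the purification
/// routine appropriate for the requested action.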
async fn purify_alter_source(
    catalog: impl SessionCatalog,
    stmt: AlterSourceStatement<Aug>,
    storage_configuration: &StorageConfiguration,
) -> Result<PurifiedStatement, PlanError> {
    let scx = StatementContext::new(None, &catalog);
    let AlterSourceStatement {
        source_name: unresolved_source_name,
        action,
        if_exists,
    } = stmt;

    let item = match scx.resolve_item(RawItemName::Name(unresolved_source_name.clone())) {
        Ok(item) => item,
        Err(_) if if_exists => {
            return Ok(PurifiedStatement::PurifiedAlterSource {
                alter_source_stmt: AlterSourceStatement {
                    source_name: unresolved_source_name,
                    action,
                    if_exists,
                },
            });
        }
        Err(e) => return Err(e),
    };

    let desc = match item.source_desc()? {
        Some(desc) => desc.clone().into_inline_connection(scx.catalog),
        None => {
            sql_bail!("cannot ALTER this type of source")
        }
    };

    let source_name = item.name();

    let resolved_source_name = ResolvedItemName::Item {
        id: item.id(),
        qualifiers: item.name().qualifiers.clone(),
        full_name: scx.catalog.resolve_full_name(source_name),
        print_id: true,
        version: RelationVersionSelector::Latest,
    };

    let partial_name = scx.catalog.minimal_qualification(source_name);

    match action {
        AlterSourceAction::AddSubsources {
            external_references,
            options,
        } => {
            if scx.catalog.system_vars().enable_create_table_from_source()
                && scx.catalog.system_vars().force_source_table_syntax()
            {
                Err(PlanError::UseTablesForSources(
                    "ALTER SOURCE .. ADD SUBSOURCES ..".to_string(),
                ))?;
            }

            purify_alter_source_add_subsources(
                external_references,
                options,
                desc,
                partial_name,
                unresolved_source_name,
                resolved_source_name,
                storage_configuration,
            )
            .await
        }
        AlterSourceAction::RefreshReferences => {
            purify_alter_source_refresh_references(
                desc,
                resolved_source_name,
                storage_configuration,
            )
            .await
        }
        _ => Ok(PurifiedStatement::PurifiedAlterSource {
            alter_source_stmt: AlterSourceStatement {
                source_name: unresolved_source_name,
                action,
                if_exists,
            },
        }),
    }
}

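/// Purifies an `ALTER SOURCE .. ADD SUBSOURCE` statement by retrieving the
/// requested references from the upstream system and purifying each of the
/// subsources they name.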
async fn purify_alter_source_add_subsources(
    external_references: Vec<ExternalReferenceExport>,
    mut options: Vec<AlterSourceAddSubsourceOption<Aug>>,
    desc: SourceDesc,
    partial_source_name: PartialItemName,
    unresolved_source_name: UnresolvedItemName,
    resolved_source_name: ResolvedItemName,
    storage_configuration: &StorageConfiguration,
) -> Result<PurifiedStatement, PlanError> {
    let connection_id = match &desc.connection {
        GenericSourceConnection::Postgres(c) => c.connection_id,
        GenericSourceConnection::MySql(c) => c.connection_id,
        GenericSourceConnection::SqlServer(c) => c.connection_id,
        _ => sql_bail!(
            "source {} does not support ALTER SOURCE.",
            partial_source_name
        ),
    };

    let crate::plan::statement::ddl::AlterSourceAddSubsourceOptionExtracted {
        text_columns,
        exclude_columns,
        details,
        seen: _,
    } = options.clone().try_into()?;
    assert_none!(details, "details cannot be explicitly set");

    let mut requested_subsource_map = BTreeMap::new();

    match desc.connection {
        GenericSourceConnection::Postgres(pg_source_connection) => {
            let pg_connection = &pg_source_connection.connection;

            let client = pg_connection
                .validate(connection_id, storage_configuration)
                .await?;

            let reference_client = SourceReferenceClient::Postgres {
                client: &client,
                publication: &pg_source_connection.publication,
                database: &pg_connection.database,
            };
            let retrieved_source_references = reference_client.get_source_references().await?;

            let postgres::PurifiedSourceExports {
                source_exports: subsources,
                normalized_text_columns,
            } = postgres::purify_source_exports(
                &client,
                &retrieved_source_references,
                &Some(ExternalReferences::SubsetTables(external_references)),
                text_columns,
                exclude_columns,
                &unresolved_source_name,
                &SourceReferencePolicy::Required,
            )
            .await?;

            if let Some(text_cols_option) = options
                .iter_mut()
                .find(|option| option.name == AlterSourceAddSubsourceOptionName::TextColumns)
            {
                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
            }

            requested_subsource_map.extend(subsources);
        }
        GenericSourceConnection::MySql(mysql_source_connection) => {
            let mysql_connection = &mysql_source_connection.connection;
            let config = mysql_connection
                .config(
                    &storage_configuration.connection_context.secrets_reader,
                    storage_configuration,
                    InTask::No,
                )
                .await?;

            let mut conn = config
                .connect(
                    "mysql purification",
                    &storage_configuration.connection_context.ssh_tunnel_manager,
                )
                .await?;

            let initial_gtid_set =
                mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?;

            let requested_references = Some(ExternalReferences::SubsetTables(external_references));

            let reference_client = SourceReferenceClient::MySql {
                conn: &mut conn,
                include_system_schemas: mysql::references_system_schemas(&requested_references),
            };
            let retrieved_source_references = reference_client.get_source_references().await?;

            let mysql::PurifiedSourceExports {
                source_exports: subsources,
                normalized_text_columns,
                normalized_exclude_columns,
            } = mysql::purify_source_exports(
                &mut conn,
                &retrieved_source_references,
                &requested_references,
                text_columns,
                exclude_columns,
                &unresolved_source_name,
                initial_gtid_set,
                &SourceReferencePolicy::Required,
            )
            .await?;
            requested_subsource_map.extend(subsources);

            if let Some(text_cols_option) = options
                .iter_mut()
                .find(|option| option.name == AlterSourceAddSubsourceOptionName::TextColumns)
            {
                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
            }
            if let Some(exclude_cols_option) = options
                .iter_mut()
                .find(|option| option.name == AlterSourceAddSubsourceOptionName::ExcludeColumns)
            {
                exclude_cols_option.value =
                    Some(WithOptionValue::Sequence(normalized_exclude_columns));
            }
        }
        GenericSourceConnection::SqlServer(sql_server_source) => {
            let sql_server_connection = &sql_server_source.connection;
            let config = sql_server_connection
                .resolve_config(
                    &storage_configuration.connection_context.secrets_reader,
                    storage_configuration,
                    InTask::No,
                )
                .await?;
            let mut client = mz_sql_server_util::Client::connect(config).await?;

            let database = sql_server_connection.database.clone().into();
            let source_references = SourceReferenceClient::SqlServer {
                client: &mut client,
                database: Arc::clone(&database),
            }
            .get_source_references()
            .await?;
            let requested_references = Some(ExternalReferences::SubsetTables(external_references));

            let timeout = mz_storage_types::sources::sql_server::MAX_LSN_WAIT
                .get(storage_configuration.config_set());

            let result = sql_server::purify_source_exports(
                &*database,
                &mut client,
                &source_references,
                &requested_references,
                &text_columns,
                &exclude_columns,
                &unresolved_source_name,
                timeout,
                &SourceReferencePolicy::Required,
            )
            .await;
            let sql_server::PurifiedSourceExports {
                source_exports,
                normalized_text_columns,
                normalized_excl_columns,
            } = result?;

            requested_subsource_map.extend(source_exports);

            if let Some(text_cols_option) = options
                .iter_mut()
                .find(|option| option.name == AlterSourceAddSubsourceOptionName::TextColumns)
            {
                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
            }
            if let Some(exclude_cols_option) = options
                .iter_mut()
                .find(|option| option.name == AlterSourceAddSubsourceOptionName::ExcludeColumns)
            {
                exclude_cols_option.value =
                    Some(WithOptionValue::Sequence(normalized_excl_columns));
            }
        }
        _ => unreachable!(),
    };

    Ok(PurifiedStatement::PurifiedAlterSourceAddSubsources {
        source_name: resolved_source_name,
        options,
        subsources: requested_subsource_map,
    })
}

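/// Purifies an `ALTER SOURCE .. REFRESH REFERENCES` statement by fetching the
/// set of references currently available in the upstream system.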
async fn purify_alter_source_refresh_references(
    desc: SourceDesc,
    resolved_source_name: ResolvedItemName,
    storage_configuration: &StorageConfiguration,
) -> Result<PurifiedStatement, PlanError> {
    let retrieved_source_references = match desc.connection {
        GenericSourceConnection::Postgres(pg_source_connection) => {
            let pg_connection = &pg_source_connection.connection;

            let config = pg_connection
                .config(
                    &storage_configuration.connection_context.secrets_reader,
                    storage_configuration,
                    InTask::No,
                )
                .await?;

            let client = config
                .connect(
                    "postgres_purification",
                    &storage_configuration.connection_context.ssh_tunnel_manager,
                )
                .await?;
            let reference_client = SourceReferenceClient::Postgres {
                client: &client,
                publication: &pg_source_connection.publication,
                database: &pg_connection.database,
            };
            reference_client.get_source_references().await?
        }
        GenericSourceConnection::MySql(mysql_source_connection) => {
            let mysql_connection = &mysql_source_connection.connection;
            let config = mysql_connection
                .config(
                    &storage_configuration.connection_context.secrets_reader,
                    storage_configuration,
                    InTask::No,
                )
                .await?;

            let mut conn = config
                .connect(
                    "mysql purification",
                    &storage_configuration.connection_context.ssh_tunnel_manager,
                )
                .await?;

            let reference_client = SourceReferenceClient::MySql {
                conn: &mut conn,
                include_system_schemas: false,
            };
            reference_client.get_source_references().await?
        }
        GenericSourceConnection::SqlServer(sql_server_source) => {
            let sql_server_connection = &sql_server_source.connection;
            let config = sql_server_connection
                .resolve_config(
                    &storage_configuration.connection_context.secrets_reader,
                    storage_configuration,
                    InTask::No,
                )
                .await?;
            let mut client = mz_sql_server_util::Client::connect(config).await?;

            let source_references = SourceReferenceClient::SqlServer {
                client: &mut client,
                database: sql_server_connection.database.clone().into(),
            }
            .get_source_references()
            .await?;
            source_references
        }
        GenericSourceConnection::LoadGenerator(load_gen_connection) => {
            let reference_client = SourceReferenceClient::LoadGenerator {
                generator: &load_gen_connection.load_generator,
            };
            reference_client.get_source_references().await?
        }
        GenericSourceConnection::Kafka(kafka_conn) => {
            let reference_client = SourceReferenceClient::Kafka {
                topic: &kafka_conn.topic,
            };
            reference_client.get_source_references().await?
        }
    };
    Ok(PurifiedStatement::PurifiedAlterSourceRefreshReferences {
        source_name: resolved_source_name,
        available_source_references: retrieved_source_references.available_source_references(),
    })
}

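/// Purifies a `CREATE TABLE .. FROM SOURCE` statement: resolves the source,
/// purifies the single requested source export, and fills in the table's
/// columns, constraints, and `WITH` options from the purified export.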
async fn purify_create_table_from_source(
    catalog: impl SessionCatalog,
    mut stmt: CreateTableFromSourceStatement<Aug>,
    storage_configuration: &StorageConfiguration,
) -> Result<PurifiedStatement, PlanError> {
    let scx = StatementContext::new(None, &catalog);
    let CreateTableFromSourceStatement {
        name: _,
        columns,
        constraints,
        source: source_name,
        if_not_exists: _,
        external_reference,
        format,
        envelope,
        include_metadata: _,
        with_options,
    } = &mut stmt;

    if matches!(columns, TableFromSourceColumns::Defined(_)) {
        sql_bail!("CREATE TABLE .. FROM SOURCE column definitions cannot be specified directly");
    }
    if !constraints.is_empty() {
        sql_bail!(
            "CREATE TABLE .. FROM SOURCE constraint definitions cannot be specified directly"
        );
    }

    let item = scx.get_item_by_resolved_name(source_name)?;

    let desc = match item.source_desc()? {
        Some(desc) => desc.clone().into_inline_connection(scx.catalog),
        None => {
            sql_bail!("cannot create a table from this type of source")
        }
    };
1726 let unresolved_source_name: UnresolvedItemName = source_name.full_item_name().clone().into();
1727
1728 let crate::plan::statement::ddl::TableFromSourceOptionExtracted {
1729 text_columns,
1730 exclude_columns,
1731 retain_history: _,
1732 details,
1733 partition_by: _,
1734 seen: _,
1735 } = with_options.clone().try_into()?;
1736 assert_none!(details, "details cannot be explicitly set");
1737
1738 let qualified_text_columns = text_columns
1742 .iter()
1743 .map(|col| {
1744 UnresolvedItemName(
1745 external_reference
1746 .as_ref()
1747 .map(|er| er.0.iter().chain_one(col).map(|i| i.clone()).collect())
1748 .unwrap_or_else(|| vec![col.clone()]),
1749 )
1750 })
1751 .collect_vec();
1752 let qualified_exclude_columns = exclude_columns
1753 .iter()
1754 .map(|col| {
1755 UnresolvedItemName(
1756 external_reference
1757 .as_ref()
1758 .map(|er| er.0.iter().chain_one(col).map(|i| i.clone()).collect())
1759 .unwrap_or_else(|| vec![col.clone()]),
1760 )
1761 })
1762 .collect_vec();
1763
1764 let mut format_options = SourceFormatOptions::Default;
1766
1767 let retrieved_source_references: RetrievedSourceReferences;
1768
1769 let requested_references = external_reference.as_ref().map(|ref_name| {
1770 ExternalReferences::SubsetTables(vec![ExternalReferenceExport {
1771 reference: ref_name.clone(),
1772 alias: None,
1773 }])
1774 });
1775
1776 let purified_export = match desc.connection {
1779 GenericSourceConnection::Postgres(pg_source_connection) => {
1780 let pg_connection = &pg_source_connection.connection;
1782
1783 let client = pg_connection
1784 .validate(pg_source_connection.connection_id, storage_configuration)
1785 .await?;
1786
1787 let reference_client = SourceReferenceClient::Postgres {
1788 client: &client,
1789 publication: &pg_source_connection.publication,
1790 database: &pg_connection.database,
1791 };
1792 retrieved_source_references = reference_client.get_source_references().await?;
1793
1794 let postgres::PurifiedSourceExports {
1795 source_exports,
1796 normalized_text_columns: _,
1800 } = postgres::purify_source_exports(
1801 &client,
1802 &retrieved_source_references,
1803 &requested_references,
1804 qualified_text_columns,
1805 qualified_exclude_columns,
1806 &unresolved_source_name,
1807 &SourceReferencePolicy::Required,
1808 )
1809 .await?;
1810 let (_, purified_export) = source_exports.into_element();
1812 purified_export
1813 }
1814 GenericSourceConnection::MySql(mysql_source_connection) => {
1815 let mysql_connection = &mysql_source_connection.connection;
1816 let config = mysql_connection
1817 .config(
1818 &storage_configuration.connection_context.secrets_reader,
1819 storage_configuration,
1820 InTask::No,
1821 )
1822 .await?;
1823
1824 let mut conn = config
1825 .connect(
1826 "mysql purification",
1827 &storage_configuration.connection_context.ssh_tunnel_manager,
1828 )
1829 .await?;
1830
1831 let initial_gtid_set =
1834 mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?;
1835
1836 let reference_client = SourceReferenceClient::MySql {
1837 conn: &mut conn,
1838 include_system_schemas: mysql::references_system_schemas(&requested_references),
1839 };
1840 retrieved_source_references = reference_client.get_source_references().await?;
1841
1842 let mysql::PurifiedSourceExports {
1843 source_exports,
1844 normalized_text_columns: _,
1848 normalized_exclude_columns: _,
1849 } = mysql::purify_source_exports(
1850 &mut conn,
1851 &retrieved_source_references,
1852 &requested_references,
1853 qualified_text_columns,
1854 qualified_exclude_columns,
1855 &unresolved_source_name,
1856 initial_gtid_set,
1857 &SourceReferencePolicy::Required,
1858 )
1859 .await?;
1860 let (_, purified_export) = source_exports.into_element();
1862 purified_export
1863 }
1864 GenericSourceConnection::SqlServer(sql_server_source) => {
1865 let connection = sql_server_source.connection;
1866 let config = connection
1867 .resolve_config(
1868 &storage_configuration.connection_context.secrets_reader,
1869 storage_configuration,
1870 InTask::No,
1871 )
1872 .await?;
1873 let mut client = mz_sql_server_util::Client::connect(config).await?;
1874
1875 let database: Arc<str> = connection.database.into();
1876 let reference_client = SourceReferenceClient::SqlServer {
1877 client: &mut client,
1878 database: Arc::clone(&database),
1879 };
1880 retrieved_source_references = reference_client.get_source_references().await?;
1881 tracing::debug!(?retrieved_source_references, "got source references");
1882
1883 let timeout = mz_storage_types::sources::sql_server::MAX_LSN_WAIT
1884 .get(storage_configuration.config_set());
1885
1886 let purified_source_exports = sql_server::purify_source_exports(
1887 &*database,
1888 &mut client,
1889 &retrieved_source_references,
1890 &requested_references,
1891 &qualified_text_columns,
1892 &qualified_exclude_columns,
1893 &unresolved_source_name,
1894 timeout,
1895 &SourceReferencePolicy::Required,
1896 )
1897 .await?;
1898
1899 let (_, purified_export) = purified_source_exports.source_exports.into_element();
1901 purified_export
1902 }
1903 GenericSourceConnection::LoadGenerator(load_gen_connection) => {
1904 let reference_client = SourceReferenceClient::LoadGenerator {
1905 generator: &load_gen_connection.load_generator,
1906 };
1907 retrieved_source_references = reference_client.get_source_references().await?;
1908
1909 let requested_exports = retrieved_source_references
1910 .requested_source_exports(requested_references.as_ref(), &unresolved_source_name)?;
1911 let export = requested_exports.into_element();
1913 PurifiedSourceExport {
1914 external_reference: export.external_reference,
1915 details: PurifiedExportDetails::LoadGenerator {
1916 table: export
1917 .meta
1918 .load_generator_desc()
1919 .expect("is loadgen")
1920 .clone(),
1921 output: export
1922 .meta
1923 .load_generator_output()
1924 .expect("is loadgen")
1925 .clone(),
1926 },
1927 }
1928 }
1929 GenericSourceConnection::Kafka(kafka_conn) => {
1930 let reference_client = SourceReferenceClient::Kafka {
1931 topic: &kafka_conn.topic,
1932 };
1933 retrieved_source_references = reference_client.get_source_references().await?;
1934 let requested_exports = retrieved_source_references
1935 .requested_source_exports(requested_references.as_ref(), &unresolved_source_name)?;
1936 let export = requested_exports.into_element();
1938
1939 format_options = SourceFormatOptions::Kafka {
1940 topic: kafka_conn.topic.clone(),
1941 };
1942 PurifiedSourceExport {
1943 external_reference: export.external_reference,
1944 details: PurifiedExportDetails::Kafka {},
1945 }
1946 }
1947 };
1948
1949 purify_source_format(
1950 &catalog,
1951 format,
1952 &format_options,
1953 envelope,
1954 storage_configuration,
1955 )
1956 .await?;
1957
1958 *external_reference = Some(purified_export.external_reference.clone());
1961
    match &purified_export.details {
        PurifiedExportDetails::Postgres { .. } => {
            let mut unsupported_cols = vec![];
            let postgres::PostgresExportStatementValues {
                columns: gen_columns,
                constraints: gen_constraints,
                text_columns: gen_text_columns,
                exclude_columns: gen_exclude_columns,
                details: gen_details,
                external_reference: _,
            } = postgres::generate_source_export_statement_values(
                &scx,
                purified_export,
                &mut unsupported_cols,
            )?;
            if !unsupported_cols.is_empty() {
                unsupported_cols.sort();
                Err(PgSourcePurificationError::UnrecognizedTypes {
                    cols: unsupported_cols,
                })?;
            }

            if let Some(text_cols_option) = with_options
                .iter_mut()
                .find(|option| option.name == TableFromSourceOptionName::TextColumns)
            {
                match gen_text_columns {
                    Some(gen_text_columns) => {
                        text_cols_option.value = Some(WithOptionValue::Sequence(gen_text_columns))
                    }
                    None => soft_panic_or_log!(
                        "text_columns should be Some if text_cols_option is present"
                    ),
                }
            }
            if let Some(exclude_cols_option) = with_options
                .iter_mut()
                .find(|option| option.name == TableFromSourceOptionName::ExcludeColumns)
            {
                match gen_exclude_columns {
                    Some(gen_exclude_columns) => {
                        exclude_cols_option.value =
                            Some(WithOptionValue::Sequence(gen_exclude_columns))
                    }
                    None => soft_panic_or_log!(
                        "exclude_columns should be Some if exclude_cols_option is present"
                    ),
                }
            }
            match columns {
                TableFromSourceColumns::Defined(_) => unreachable!(),
                TableFromSourceColumns::NotSpecified => {
                    *columns = TableFromSourceColumns::Defined(gen_columns);
                    *constraints = gen_constraints;
                }
                TableFromSourceColumns::Named(_) => {
                    sql_bail!("columns cannot be named for Postgres sources")
                }
            }
            with_options.push(TableFromSourceOption {
                name: TableFromSourceOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    gen_details.into_proto().encode_to_vec(),
                )))),
            })
        }
        PurifiedExportDetails::MySql { .. } => {
            let mysql::MySqlExportStatementValues {
                columns: gen_columns,
                constraints: gen_constraints,
                text_columns: gen_text_columns,
                exclude_columns: gen_exclude_columns,
                details: gen_details,
                external_reference: _,
            } = mysql::generate_source_export_statement_values(&scx, purified_export)?;

            if let Some(text_cols_option) = with_options
                .iter_mut()
                .find(|option| option.name == TableFromSourceOptionName::TextColumns)
            {
                match gen_text_columns {
                    Some(gen_text_columns) => {
                        text_cols_option.value = Some(WithOptionValue::Sequence(gen_text_columns))
                    }
                    None => soft_panic_or_log!(
                        "text_columns should be Some if text_cols_option is present"
                    ),
                }
            }
            if let Some(exclude_cols_option) = with_options
                .iter_mut()
                .find(|option| option.name == TableFromSourceOptionName::ExcludeColumns)
            {
                match gen_exclude_columns {
                    Some(gen_exclude_columns) => {
                        exclude_cols_option.value =
                            Some(WithOptionValue::Sequence(gen_exclude_columns))
                    }
                    None => soft_panic_or_log!(
                        "exclude_columns should be Some if exclude_cols_option is present"
                    ),
                }
            }
            match columns {
                TableFromSourceColumns::Defined(_) => unreachable!(),
                TableFromSourceColumns::NotSpecified => {
                    *columns = TableFromSourceColumns::Defined(gen_columns);
                    *constraints = gen_constraints;
                }
                TableFromSourceColumns::Named(_) => {
                    sql_bail!("columns cannot be named for MySQL sources")
                }
            }
            with_options.push(TableFromSourceOption {
                name: TableFromSourceOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    gen_details.into_proto().encode_to_vec(),
                )))),
            })
        }
        PurifiedExportDetails::SqlServer { .. } => {
            let sql_server::SqlServerExportStatementValues {
                columns: gen_columns,
                constraints: gen_constraints,
                text_columns: gen_text_columns,
                excl_columns: gen_excl_columns,
                details: gen_details,
                external_reference: _,
            } = sql_server::generate_source_export_statement_values(&scx, purified_export)?;

            if let Some(text_cols_option) = with_options
                .iter_mut()
                .find(|opt| opt.name == TableFromSourceOptionName::TextColumns)
            {
                match gen_text_columns {
                    Some(gen_text_columns) => {
                        text_cols_option.value = Some(WithOptionValue::Sequence(gen_text_columns))
                    }
                    None => soft_panic_or_log!(
                        "text_columns should be Some if text_cols_option is present"
                    ),
                }
            }
            if let Some(exclude_cols_option) = with_options
                .iter_mut()
                .find(|opt| opt.name == TableFromSourceOptionName::ExcludeColumns)
            {
                match gen_excl_columns {
                    Some(gen_excl_columns) => {
                        exclude_cols_option.value =
                            Some(WithOptionValue::Sequence(gen_excl_columns))
                    }
                    None => soft_panic_or_log!(
                        "excl_columns should be Some if exclude_cols_option is present"
                    ),
                }
            }

            match columns {
                TableFromSourceColumns::NotSpecified => {
                    *columns = TableFromSourceColumns::Defined(gen_columns);
                    *constraints = gen_constraints;
                }
                TableFromSourceColumns::Named(_) => {
                    sql_bail!("columns cannot be named for SQL Server sources")
                }
                TableFromSourceColumns::Defined(_) => unreachable!(),
            }

            with_options.push(TableFromSourceOption {
                name: TableFromSourceOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    gen_details.into_proto().encode_to_vec(),
                )))),
            })
        }
        PurifiedExportDetails::LoadGenerator { .. } => {
            let (desc, output) = match purified_export.details {
                PurifiedExportDetails::LoadGenerator { table, output } => (table, output),
                _ => unreachable!("purified export details must be load generator"),
            };
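            // Single-output load generators don't provide a table description
            // here (`desc` is `None`); in that case the statement's columns
            // are left untouched.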
            if let Some(desc) = desc {
                let (gen_columns, gen_constraints) = scx.relation_desc_into_table_defs(&desc)?;
                match columns {
                    TableFromSourceColumns::Defined(_) => unreachable!(),
                    TableFromSourceColumns::NotSpecified => {
                        *columns = TableFromSourceColumns::Defined(gen_columns);
                        *constraints = gen_constraints;
                    }
                    TableFromSourceColumns::Named(_) => {
                        sql_bail!("columns cannot be named for multi-output load generator sources")
                    }
                }
            }
            let details = SourceExportStatementDetails::LoadGenerator { output };
            with_options.push(TableFromSourceOption {
                name: TableFromSourceOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    details.into_proto().encode_to_vec(),
                )))),
            })
        }
        PurifiedExportDetails::Kafka {} => {
            let details = SourceExportStatementDetails::Kafka {};
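            // `SourceExportStatementDetails::Kafka` is currently an empty
            // struct, so the encoded details carry no information.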
            with_options.push(TableFromSourceOption {
                name: TableFromSourceOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    details.into_proto().encode_to_vec(),
                )))),
            })
        }
    };

    Ok(PurifiedStatement::PurifiedCreateTableFromSource { stmt })
}

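/// Source-type-specific context needed to purify a `FORMAT` clause. Only
/// Kafka sources carry extra context: the topic, which is used to derive
/// schema registry subject names.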
enum SourceFormatOptions {
    Default,
    Kafka { topic: String },
}

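/// Purifies the `FORMAT` specifier of a source statement, e.g. by fetching
/// and inlining schemas from a Confluent Schema Registry. `KEY`/`VALUE`
/// format specifiers are rejected for anything other than Kafka sources.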
async fn purify_source_format(
    catalog: &dyn SessionCatalog,
    format: &mut Option<FormatSpecifier<Aug>>,
    options: &SourceFormatOptions,
    envelope: &Option<SourceEnvelope>,
    storage_configuration: &StorageConfiguration,
) -> Result<(), PlanError> {
    if matches!(format, Some(FormatSpecifier::KeyValue { .. }))
        && !matches!(options, SourceFormatOptions::Kafka { .. })
    {
        sql_bail!("Kafka sources are the only source type that can provide KEY/VALUE formats")
    }

    match format.as_mut() {
        None => {}
        Some(FormatSpecifier::Bare(format)) => {
            purify_source_format_single(catalog, format, options, envelope, storage_configuration)
                .await?;
        }

        Some(FormatSpecifier::KeyValue { key, value: val }) => {
            purify_source_format_single(catalog, key, options, envelope, storage_configuration)
                .await?;
            purify_source_format_single(catalog, val, options, envelope, storage_configuration)
                .await?;
        }
    }
    Ok(())
}

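/// Purifies a single `Format`: Avro and Protobuf formats that reference a
/// schema registry have their schemas fetched and recorded as seeds; all
/// other formats pass through unchanged.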
async fn purify_source_format_single(
    catalog: &dyn SessionCatalog,
    format: &mut Format<Aug>,
    options: &SourceFormatOptions,
    envelope: &Option<SourceEnvelope>,
    storage_configuration: &StorageConfiguration,
) -> Result<(), PlanError> {
    match format {
        Format::Avro(schema) => match schema {
            AvroSchema::Csr { csr_connection } => {
                purify_csr_connection_avro(
                    catalog,
                    options,
                    csr_connection,
                    envelope,
                    storage_configuration,
                )
                .await?
            }
            AvroSchema::InlineSchema { .. } => {}
        },
        Format::Protobuf(schema) => match schema {
            ProtobufSchema::Csr { csr_connection } => {
                purify_csr_connection_proto(
                    catalog,
                    options,
                    csr_connection,
                    envelope,
                    storage_configuration,
                )
                .await?;
            }
            ProtobufSchema::InlineSchema { .. } => {}
        },
        Format::Bytes
        | Format::Regex(_)
        | Format::Json { .. }
        | Format::Text
        | Format::Csv { .. } => (),
    }
    Ok(())
}

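/// Generates `CREATE SUBSOURCE` statements for the given purified source
/// exports. All exports in the map are expected to share the same connection
/// type, so the first entry determines which generator runs.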
pub fn generate_subsource_statements(
    scx: &StatementContext,
    source_name: ResolvedItemName,
    subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
) -> Result<Vec<CreateSubsourceStatement<Aug>>, PlanError> {
    if subsources.is_empty() {
        return Ok(vec![]);
    }
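    // Inspect the first export to determine the source type; all exports in
    // the map carry the same kind of details.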
    let (_, purified_export) = subsources.iter().next().unwrap();

    let statements = match &purified_export.details {
        PurifiedExportDetails::Postgres { .. } => {
            crate::pure::postgres::generate_create_subsource_statements(
                scx,
                source_name,
                subsources,
            )?
        }
        PurifiedExportDetails::MySql { .. } => {
            crate::pure::mysql::generate_create_subsource_statements(scx, source_name, subsources)?
        }
        PurifiedExportDetails::SqlServer { .. } => {
            crate::pure::sql_server::generate_create_subsource_statements(
                scx,
                source_name,
                subsources,
            )?
        }
        PurifiedExportDetails::LoadGenerator { .. } => {
            let mut subsource_stmts = Vec::with_capacity(subsources.len());
            for (subsource_name, purified_export) in subsources {
                let (desc, output) = match purified_export.details {
                    PurifiedExportDetails::LoadGenerator { table, output } => (table, output),
                    _ => unreachable!("purified export details must be load generator"),
                };
                let desc =
                    desc.expect("subsources cannot be generated for single-output load generators");

                let (columns, table_constraints) = scx.relation_desc_into_table_defs(&desc)?;
                let details = SourceExportStatementDetails::LoadGenerator { output };
                let subsource = CreateSubsourceStatement {
                    name: subsource_name,
                    columns,
                    of_source: Some(source_name.clone()),
                    constraints: table_constraints,
                    if_not_exists: false,
                    with_options: vec![
                        CreateSubsourceOption {
                            name: CreateSubsourceOptionName::ExternalReference,
                            value: Some(WithOptionValue::UnresolvedItemName(
                                purified_export.external_reference,
                            )),
                        },
                        CreateSubsourceOption {
                            name: CreateSubsourceOptionName::Details,
                            value: Some(WithOptionValue::Value(Value::String(hex::encode(
                                details.into_proto().encode_to_vec(),
                            )))),
                        },
                    ],
                };
                subsource_stmts.push(subsource);
            }

            subsource_stmts
        }
        PurifiedExportDetails::Kafka { .. } => {
            // Kafka sources do not currently produce data-bearing subsources,
            // so this arm is unreachable after the emptiness check above; the
            // assert documents that invariant.
            assert!(
                subsources.is_empty(),
                "Kafka sources do not produce data-bearing subsources"
            );
            vec![]
        }
    };
    Ok(statements)
}

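/// Purifies a Protobuf schema-registry format by fetching and compiling the
/// schemas for the topic's value (and, if present, key) subjects and
/// recording them as the statement's seed.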
async fn purify_csr_connection_proto(
    catalog: &dyn SessionCatalog,
    options: &SourceFormatOptions,
    csr_connection: &mut CsrConnectionProtobuf<Aug>,
    envelope: &Option<SourceEnvelope>,
    storage_configuration: &StorageConfiguration,
) -> Result<(), PlanError> {
    let SourceFormatOptions::Kafka { topic } = options else {
        sql_bail!("Confluent Schema Registry is only supported with Kafka sources")
    };

    let CsrConnectionProtobuf {
        seed,
        connection: CsrConnection {
            connection,
            options: _,
        },
    } = csr_connection;
    match seed {
        None => {
            let scx = StatementContext::new(None, &*catalog);

            let ccsr_connection = match scx.get_item_by_resolved_name(connection)?.connection()? {
                Connection::Csr(connection) => connection.clone().into_inline_connection(catalog),
                _ => sql_bail!("{} is not a schema registry connection", connection),
            };

            let ccsr_client = ccsr_connection
                .connect(storage_configuration, InTask::No)
                .await
                .map_err(|e| CsrPurificationError::ClientError(Arc::new(e)))?;

            let value = compile_proto(&format!("{}-value", topic), &ccsr_client).await?;
            let key = compile_proto(&format!("{}-key", topic), &ccsr_client)
                .await
                .ok();

            if matches!(envelope, Some(SourceEnvelope::Debezium)) && key.is_none() {
                sql_bail!("Key schema is required for ENVELOPE DEBEZIUM");
            }

            *seed = Some(CsrSeedProtobuf { value, key });
        }
        Some(_) => (),
    }

    Ok(())
}

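/// Purifies an Avro schema-registry format by fetching the key and value
/// schemas for the topic's subjects and recording them as the statement's
/// seed. A missing key schema is only an error under `ENVELOPE DEBEZIUM`.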
async fn purify_csr_connection_avro(
    catalog: &dyn SessionCatalog,
    options: &SourceFormatOptions,
    csr_connection: &mut CsrConnectionAvro<Aug>,
    envelope: &Option<SourceEnvelope>,
    storage_configuration: &StorageConfiguration,
) -> Result<(), PlanError> {
    let SourceFormatOptions::Kafka { topic } = options else {
        sql_bail!("Confluent Schema Registry is only supported with Kafka sources")
    };

    let CsrConnectionAvro {
        connection: CsrConnection { connection, .. },
        seed,
        key_strategy,
        value_strategy,
    } = csr_connection;
    if seed.is_none() {
        let scx = StatementContext::new(None, &*catalog);
        let csr_connection = match scx.get_item_by_resolved_name(connection)?.connection()? {
            Connection::Csr(connection) => connection.clone().into_inline_connection(catalog),
            _ => sql_bail!("{} is not a schema registry connection", connection),
        };
        let ccsr_client = csr_connection
            .connect(storage_configuration, InTask::No)
            .await
            .map_err(|e| CsrPurificationError::ClientError(Arc::new(e)))?;

        let Schema {
            key_schema,
            value_schema,
        } = get_remote_csr_schema(
            &ccsr_client,
            key_strategy.clone().unwrap_or_default(),
            value_strategy.clone().unwrap_or_default(),
            topic,
        )
        .await?;
        if matches!(envelope, Some(SourceEnvelope::Debezium)) && key_schema.is_none() {
            sql_bail!("Key schema is required for ENVELOPE DEBEZIUM");
        }

        *seed = Some(CsrSeedAvro {
            key_schema,
            value_schema,
        })
    }

    Ok(())
}

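/// A key/value schema pair fetched from a Confluent Schema Registry. The key
/// schema is optional because many topics register only a value schema.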
#[derive(Debug)]
pub struct Schema {
    pub key_schema: Option<String>,
    pub value_schema: String,
}

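/// Fetches the schema for `subject` according to the requested selection
/// strategy. Returns `Ok(None)` if the subject (or the requested version or
/// ID) does not exist, so callers can treat missing key schemas as optional.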
async fn get_schema_with_strategy(
    client: &Client,
    strategy: ReaderSchemaSelectionStrategy,
    subject: &str,
) -> Result<Option<String>, PlanError> {
    match strategy {
        ReaderSchemaSelectionStrategy::Latest => {
            match client.get_schema_by_subject(subject).await {
                Ok(CcsrSchema { raw, .. }) => Ok(Some(raw)),
                Err(GetBySubjectError::SubjectNotFound)
                | Err(GetBySubjectError::VersionNotFound(_)) => Ok(None),
                Err(e) => Err(PlanError::FetchingCsrSchemaFailed {
                    schema_lookup: format!("subject {}", subject.quoted()),
                    cause: Arc::new(e),
                }),
            }
        }
        ReaderSchemaSelectionStrategy::Inline(raw) => Ok(Some(raw)),
        ReaderSchemaSelectionStrategy::ById(id) => match client.get_schema_by_id(id).await {
            Ok(CcsrSchema { raw, .. }) => Ok(Some(raw)),
            Err(GetByIdError::SchemaNotFound) => Ok(None),
            Err(e) => Err(PlanError::FetchingCsrSchemaFailed {
                schema_lookup: format!("ID {}", id),
                cause: Arc::new(e),
            }),
        },
    }
}

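/// Fetches the value schema (required) and key schema (optional) for a Kafka
/// topic using the standard `<topic>-value`/`<topic>-key` subject naming
/// convention.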
async fn get_remote_csr_schema(
    ccsr_client: &mz_ccsr::Client,
    key_strategy: ReaderSchemaSelectionStrategy,
    value_strategy: ReaderSchemaSelectionStrategy,
    topic: &str,
) -> Result<Schema, PlanError> {
    let value_schema_name = format!("{}-value", topic);
    let value_schema =
        get_schema_with_strategy(ccsr_client, value_strategy, &value_schema_name).await?;
    let value_schema = value_schema.ok_or_else(|| anyhow!("No value schema found"))?;
    let subject = format!("{}-key", topic);
    let key_schema = get_schema_with_strategy(ccsr_client, key_strategy, &subject).await?;
    Ok(Schema {
        key_schema,
        value_schema,
    })
}

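/// Fetches the Protobuf schema and its references for `subject_name`,
/// compiles them into a file descriptor set, and returns the serialized
/// descriptors along with the schema's single message name.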
async fn compile_proto(
    subject_name: &str,
    ccsr_client: &Client,
) -> Result<CsrSeedProtobufSchema, PlanError> {
    let (primary_subject, dependency_subjects) = ccsr_client
        .get_subject_and_references(subject_name)
        .await
        .map_err(|e| PlanError::FetchingCsrSchemaFailed {
            schema_lookup: format!("subject {}", subject_name.quoted()),
            cause: Arc::new(e),
        })?;

    // Compile the primary subject and its dependencies into a file
    // descriptor set.
    let mut source_tree = VirtualSourceTree::new();
    for subject in iter::once(&primary_subject).chain(dependency_subjects.iter()) {
        source_tree.as_mut().add_file(
            Path::new(&subject.name),
            subject.schema.raw.as_bytes().to_vec(),
        );
    }
    let mut db = SourceTreeDescriptorDatabase::new(source_tree.as_mut());
    let fds = db
        .as_mut()
        .build_file_descriptor_set(&[Path::new(&primary_subject.name)])
        .map_err(|cause| PlanError::InvalidProtobufSchema { cause })?;

    // The schema must contain exactly one message.
    let primary_fd = fds.file(0);
    let message_name = match primary_fd.message_type_size() {
        1 => String::from_utf8_lossy(primary_fd.message_type(0).name()).into_owned(),
        0 => bail_unsupported!(29603, "Protobuf schemas with no messages"),
        _ => bail_unsupported!(29603, "Protobuf schemas with multiple messages"),
    };

    // Encode the file descriptor set as a SQL byte string.
    let bytes = &fds
        .serialize()
        .map_err(|cause| PlanError::InvalidProtobufSchema { cause })?;
    let mut schema = String::new();
    strconv::format_bytes(&mut schema, bytes);

    Ok(CsrSeedProtobufSchema {
        schema,
        message_name,
    })
}

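// The name and schema of the `mz_now` function, resolved while purifying the
// `REFRESH` options of materialized views below.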
const MZ_NOW_NAME: &str = "mz_now";
const MZ_NOW_SCHEMA: &str = "mz_catalog";

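/// Purifies the `REFRESH` options of a `CREATE MATERIALIZED VIEW` statement:
/// rewrites `REFRESH AT CREATION` to `REFRESH AT mz_now()`, defaults a
/// missing `ALIGNED TO` to `mz_now()`, replaces each `mz_now()` call with
/// the chosen timestamp cast to `mz_timestamp`, and adds a default
/// `REFRESH ON COMMIT` when no `REFRESH` option is present. Also updates
/// `resolved_ids` to reflect the introduced or eliminated references.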
pub fn purify_create_materialized_view_options(
    catalog: impl SessionCatalog,
    mz_now: Option<Timestamp>,
    cmvs: &mut CreateMaterializedViewStatement<Aug>,
    resolved_ids: &mut ResolvedIds,
) {
    // Resolve `mz_now()` and build a call expression for it.
    let (mz_now_id, mz_now_expr) = {
        let item = catalog
            .resolve_function(&PartialItemName {
                database: None,
                schema: Some(MZ_NOW_SCHEMA.to_string()),
                item: MZ_NOW_NAME.to_string(),
            })
            .expect("we should be able to resolve mz_now");
        (
            item.id(),
            Expr::Function(Function {
                name: ResolvedItemName::Item {
                    id: item.id(),
                    qualifiers: item.name().qualifiers.clone(),
                    full_name: catalog.resolve_full_name(item.name()),
                    print_id: false,
                    version: RelationVersionSelector::Latest,
                },
                args: FunctionArgs::Args {
                    args: Vec::new(),
                    order_by: Vec::new(),
                },
                filter: None,
                over: None,
                distinct: false,
            }),
        )
    };
    // Resolve the `mz_timestamp` type, to which chosen timestamps are cast.
    let (mz_timestamp_id, mz_timestamp_type) = {
        let item = catalog.get_system_type("mz_timestamp");
        let full_name = catalog.resolve_full_name(item.name());
        (
            item.id(),
            ResolvedDataType::Named {
                id: item.id(),
                qualifiers: item.name().qualifiers.clone(),
                full_name,
                modifiers: vec![],
                print_id: true,
            },
        )
    };

    let mut introduced_mz_timestamp = false;

    for option in cmvs.with_options.iter_mut() {
        // Rewrite `REFRESH AT CREATION` to `REFRESH AT mz_now()`.
        if matches!(
            option.value,
            Some(WithOptionValue::Refresh(RefreshOptionValue::AtCreation))
        ) {
            option.value = Some(WithOptionValue::Refresh(RefreshOptionValue::At(
                RefreshAtOptionValue {
                    time: mz_now_expr.clone(),
                },
            )));
        }

        // Default a missing `ALIGNED TO` to `mz_now()`.
        if let Some(WithOptionValue::Refresh(RefreshOptionValue::Every(
            RefreshEveryOptionValue { aligned_to, .. },
        ))) = &mut option.value
        {
            if aligned_to.is_none() {
                *aligned_to = Some(mz_now_expr.clone());
            }
        }

        // Substitute any `mz_now()` calls with the chosen timestamp.
        match &mut option.value {
            Some(WithOptionValue::Refresh(RefreshOptionValue::At(RefreshAtOptionValue {
                time,
            }))) => {
                let mut visitor = MzNowPurifierVisitor::new(mz_now, mz_timestamp_type.clone());
                visitor.visit_expr_mut(time);
                introduced_mz_timestamp |= visitor.introduced_mz_timestamp;
            }
            Some(WithOptionValue::Refresh(RefreshOptionValue::Every(
                RefreshEveryOptionValue {
                    interval: _,
                    aligned_to: Some(aligned_to),
                },
            ))) => {
                let mut visitor = MzNowPurifierVisitor::new(mz_now, mz_timestamp_type.clone());
                visitor.visit_expr_mut(aligned_to);
                introduced_mz_timestamp |= visitor.introduced_mz_timestamp;
            }
            _ => {}
        }
    }

    // If no `REFRESH` option was given, default to `REFRESH ON COMMIT`.
    if !cmvs.with_options.iter().any(|o| {
        matches!(
            o,
            MaterializedViewOption {
                value: Some(WithOptionValue::Refresh(..)),
                ..
            }
        )
    }) {
        cmvs.with_options.push(MaterializedViewOption {
            name: MaterializedViewOptionName::Refresh,
            value: Some(WithOptionValue::Refresh(RefreshOptionValue::OnCommit)),
        })
    }

    // Record any `mz_timestamp` references introduced by the casts above, and
    // drop the `mz_now` dependency if no `mz_now()` calls remain.
    if introduced_mz_timestamp {
        resolved_ids.add_item(mz_timestamp_id);
    }
    let mut visitor = ExprContainsTemporalVisitor::new();
    visitor.visit_create_materialized_view_statement(cmvs);
    if !visitor.contains_temporal {
        resolved_ids.remove_item(&mz_now_id);
    }
}

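/// Returns whether the given `REFRESH` option contains (or will contain
/// after purification) a temporal expression such as `mz_now()`.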
pub fn materialized_view_option_contains_temporal(mvo: &MaterializedViewOption<Aug>) -> bool {
    match &mvo.value {
        Some(WithOptionValue::Refresh(RefreshOptionValue::At(RefreshAtOptionValue { time }))) => {
            let mut visitor = ExprContainsTemporalVisitor::new();
            visitor.visit_expr(time);
            visitor.contains_temporal
        }
        Some(WithOptionValue::Refresh(RefreshOptionValue::Every(RefreshEveryOptionValue {
            interval: _,
            aligned_to: Some(aligned_to),
        }))) => {
            let mut visitor = ExprContainsTemporalVisitor::new();
            visitor.visit_expr(aligned_to);
            visitor.contains_temporal
        }
        Some(WithOptionValue::Refresh(RefreshOptionValue::Every(RefreshEveryOptionValue {
            interval: _,
            aligned_to: None,
        }))) => {
            // A missing `ALIGNED TO` is defaulted to `mz_now()` during
            // purification.
            true
        }
        Some(WithOptionValue::Refresh(RefreshOptionValue::AtCreation)) => {
            // `REFRESH AT CREATION` is purified to `REFRESH AT mz_now()`.
            true
        }
        _ => false,
    }
}

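/// An AST visitor that records whether an expression contains a call to the
/// temporal function `mz_now()`.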
struct ExprContainsTemporalVisitor {
    pub contains_temporal: bool,
}

impl ExprContainsTemporalVisitor {
    pub fn new() -> ExprContainsTemporalVisitor {
        ExprContainsTemporalVisitor {
            contains_temporal: false,
        }
    }
}

impl Visit<'_, Aug> for ExprContainsTemporalVisitor {
    fn visit_function(&mut self, func: &Function<Aug>) {
        self.contains_temporal |= func.name.full_item_name().item == MZ_NOW_NAME;
        visit_function(self, func);
    }
}

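/// An AST visitor that replaces every `mz_now()` call with the chosen
/// timestamp cast to `mz_timestamp`, recording whether any such cast was
/// introduced.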
struct MzNowPurifierVisitor {
    pub mz_now: Option<Timestamp>,
    pub mz_timestamp_type: ResolvedDataType,
    pub introduced_mz_timestamp: bool,
}

impl MzNowPurifierVisitor {
    pub fn new(
        mz_now: Option<Timestamp>,
        mz_timestamp_type: ResolvedDataType,
    ) -> MzNowPurifierVisitor {
        MzNowPurifierVisitor {
            mz_now,
            mz_timestamp_type,
            introduced_mz_timestamp: false,
        }
    }
}

impl VisitMut<'_, Aug> for MzNowPurifierVisitor {
    fn visit_expr_mut(&mut self, expr: &'_ mut Expr<Aug>) {
        match expr {
            Expr::Function(Function {
                name:
                    ResolvedItemName::Item {
                        full_name: FullItemName { item, .. },
                        ..
                    },
                ..
            }) if item.as_str() == MZ_NOW_NAME => {
                let mz_now = self.mz_now.expect(
                    "we should have chosen a timestamp if the expression contains mz_now()",
                );
                // Replace the call with the chosen timestamp, cast to
                // `mz_timestamp`.
                *expr = Expr::Cast {
                    expr: Box::new(Expr::Value(Value::Number(mz_now.to_string()))),
                    data_type: self.mz_timestamp_type.clone(),
                };
                self.introduced_mz_timestamp = true;
            }
            _ => visit_expr_mut(self, expr),
        }
    }
}