use std::collections::{BTreeMap, BTreeSet};
use std::fmt;
use std::iter;
use std::path::Path;
use std::sync::Arc;

use anyhow::anyhow;
use itertools::Itertools;
use mz_ccsr::{Client, GetByIdError, GetBySubjectError, Schema as CcsrSchema};
use mz_controller_types::ClusterId;
use mz_kafka_util::client::MzClientContext;
use mz_mysql_util::MySqlTableDesc;
use mz_ore::collections::CollectionExt;
use mz_ore::error::ErrorExt;
use mz_ore::future::InTask;
use mz_ore::iter::IteratorExt;
use mz_ore::str::StrExt;
use mz_ore::{assert_none, soft_panic_or_log};
use mz_postgres_util::desc::PostgresTableDesc;
use mz_proto::RustType;
use mz_repr::{CatalogItemId, RelationDesc, RelationVersionSelector, Timestamp, strconv};
use mz_sql_parser::ast::display::AstDisplay;
use mz_sql_parser::ast::visit::{Visit, visit_function};
use mz_sql_parser::ast::visit_mut::{VisitMut, visit_expr_mut};
use mz_sql_parser::ast::{
    AlterSourceAction, AlterSourceAddSubsourceOptionName, AlterSourceStatement, AvroDocOn,
    ColumnName, CreateMaterializedViewStatement, CreateSinkConnection, CreateSinkOptionName,
    CreateSinkStatement, CreateSourceOptionName, CreateSubsourceOption, CreateSubsourceOptionName,
    CreateTableFromSourceStatement, CsrConfigOption, CsrConfigOptionName, CsrConnection,
    CsrSeedAvro, CsrSeedProtobuf, CsrSeedProtobufSchema, DeferredItemName, DocOnIdentifier,
    DocOnSchema, Expr, Function, FunctionArgs, Ident, KafkaSourceConfigOption,
    KafkaSourceConfigOptionName, LoadGenerator, LoadGeneratorOption, LoadGeneratorOptionName,
    MaterializedViewOption, MaterializedViewOptionName, MySqlConfigOption, MySqlConfigOptionName,
    PgConfigOption, PgConfigOptionName, RawItemName, ReaderSchemaSelectionStrategy,
    RefreshAtOptionValue, RefreshEveryOptionValue, RefreshOptionValue, SourceEnvelope,
    SqlServerConfigOption, SqlServerConfigOptionName, Statement, TableFromSourceColumns,
    TableFromSourceOption, TableFromSourceOptionName, UnresolvedItemName,
};
use mz_sql_server_util::desc::SqlServerTableDesc;
use mz_storage_types::configuration::StorageConfiguration;
use mz_storage_types::connections::Connection;
use mz_storage_types::connections::inline::IntoInlineConnection;
use mz_storage_types::errors::ContextCreationError;
use mz_storage_types::sources::load_generator::LoadGeneratorOutput;
use mz_storage_types::sources::mysql::MySqlSourceDetails;
use mz_storage_types::sources::postgres::PostgresSourcePublicationDetails;
use mz_storage_types::sources::{
    GenericSourceConnection, SourceDesc, SourceExportStatementDetails, SqlServerSourceExtras,
};
use prost::Message;
use protobuf_native::MessageLite;
use protobuf_native::compiler::{SourceTreeDescriptorDatabase, VirtualSourceTree};
use rdkafka::admin::AdminClient;
use references::{RetrievedSourceReferences, SourceReferenceClient};
use uuid::Uuid;

use crate::ast::{
    AlterSourceAddSubsourceOption, AvroSchema, CreateSourceConnection, CreateSourceStatement,
    CreateSubsourceStatement, CsrConnectionAvro, CsrConnectionProtobuf, ExternalReferenceExport,
    ExternalReferences, Format, FormatSpecifier, ProtobufSchema, Value, WithOptionValue,
};
use crate::catalog::{CatalogItemType, SessionCatalog};
use crate::kafka_util::{KafkaSinkConfigOptionExtracted, KafkaSourceConfigOptionExtracted};
use crate::names::{
    Aug, FullItemName, PartialItemName, ResolvedColumnReference, ResolvedDataType, ResolvedIds,
    ResolvedItemName,
};
use crate::plan::error::PlanError;
use crate::plan::statement::ddl::load_generator_ast_to_generator;
use crate::plan::{SourceReferences, StatementContext};
use crate::pure::error::{IcebergSinkPurificationError, SqlServerSourcePurificationError};
use crate::session::vars::ENABLE_SQL_SERVER_SOURCE;
use crate::{kafka_util, normalize};

use self::error::{
    CsrPurificationError, KafkaSinkPurificationError, KafkaSourcePurificationError,
    LoadGeneratorSourcePurificationError, MySqlSourcePurificationError, PgSourcePurificationError,
};

pub(crate) mod error;
mod references;

pub mod mysql;
pub mod postgres;
pub mod sql_server;

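/// A requested export of an object from an upstream source, pairing the
/// upstream (external) reference with the name the export will have in
/// Materialize, plus connector-specific metadata.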
pub(crate) struct RequestedSourceExport<T> {
    external_reference: UnresolvedItemName,
    name: UnresolvedItemName,
    meta: T,
}

impl<T: fmt::Debug> fmt::Debug for RequestedSourceExport<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("RequestedSourceExport")
            .field("external_reference", &self.external_reference)
            .field("name", &self.name)
            .field("meta", &self.meta)
            .finish()
    }
}

impl<T> RequestedSourceExport<T> {
    fn change_meta<F>(self, new_meta: F) -> RequestedSourceExport<F> {
        RequestedSourceExport {
            external_reference: self.external_reference,
            name: self.name,
            meta: new_meta,
        }
    }
}

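/// Derives the name for a source export (e.g. a subsource) by replacing the
/// item portion of `source_name` with `subsource_name`, so the export lands
/// in the same database and schema as its source.
///
/// A sketch of the intent, with illustrative names:
///
/// ```ignore
/// // "materialize"."public"."my_source" + "t1" => "materialize"."public"."t1"
/// let subsource = source_export_name_gen(&source_name, "t1")?;
/// ```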
fn source_export_name_gen(
    source_name: &UnresolvedItemName,
    subsource_name: &str,
) -> Result<UnresolvedItemName, PlanError> {
    let mut partial = normalize::unresolved_item_name(source_name.clone())?;
    partial.item = subsource_name.to_string();
    Ok(UnresolvedItemName::from(partial))
}

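/// Validates that the requested source exports neither reuse the same
/// Materialize name nor reference the same upstream object more than once.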
fn validate_source_export_names<T>(
    requested_source_exports: &[RequestedSourceExport<T>],
) -> Result<(), PlanError> {
    if let Some(name) = requested_source_exports
        .iter()
        .map(|subsource| &subsource.name)
        .duplicates()
        .next()
        .cloned()
    {
        let mut upstream_references: Vec<_> = requested_source_exports
            .iter()
            .filter_map(|subsource| {
                if subsource.name == name {
                    Some(subsource.external_reference.clone())
                } else {
                    None
                }
            })
            .collect();

        upstream_references.sort();

        Err(PlanError::SubsourceNameConflict {
            name,
            upstream_references,
        })?;
    }

    if let Some(name) = requested_source_exports
        .iter()
        .map(|export| &export.external_reference)
        .duplicates()
        .next()
        .cloned()
    {
        let mut target_names: Vec<_> = requested_source_exports
            .iter()
            .filter_map(|export| {
                if export.external_reference == name {
                    Some(export.name.clone())
                } else {
                    None
                }
            })
            .collect();

        target_names.sort();

        Err(PlanError::SubsourceDuplicateReference { name, target_names })?;
    }

    Ok(())
}

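/// The result of statement purification: a statement whose dependencies on
/// external systems have been resolved, so it can be planned without further
/// network access.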
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PurifiedStatement {
    PurifiedCreateSource {
        create_progress_subsource_stmt: Option<CreateSubsourceStatement<Aug>>,
        create_source_stmt: CreateSourceStatement<Aug>,
        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
        available_source_references: SourceReferences,
    },
    PurifiedAlterSource {
        alter_source_stmt: AlterSourceStatement<Aug>,
    },
    PurifiedAlterSourceAddSubsources {
        source_name: ResolvedItemName,
        options: Vec<AlterSourceAddSubsourceOption<Aug>>,
        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
    },
    PurifiedAlterSourceRefreshReferences {
        source_name: ResolvedItemName,
        available_source_references: SourceReferences,
    },
    PurifiedCreateSink(CreateSinkStatement<Aug>),
    PurifiedCreateTableFromSource {
        stmt: CreateTableFromSourceStatement<Aug>,
    },
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PurifiedSourceExport {
    pub external_reference: UnresolvedItemName,
    pub details: PurifiedExportDetails,
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PurifiedExportDetails {
    MySql {
        table: MySqlTableDesc,
        text_columns: Option<Vec<Ident>>,
        exclude_columns: Option<Vec<Ident>>,
        initial_gtid_set: String,
    },
    Postgres {
        table: PostgresTableDesc,
        text_columns: Option<Vec<Ident>>,
        exclude_columns: Option<Vec<Ident>>,
    },
    SqlServer {
        table: SqlServerTableDesc,
        text_columns: Option<Vec<Ident>>,
        excl_columns: Option<Vec<Ident>>,
        capture_instance: Arc<str>,
        initial_lsn: mz_sql_server_util::cdc::Lsn,
    },
    Kafka {},
    LoadGenerator {
        table: Option<RelationDesc>,
        output: LoadGeneratorOutput,
    },
}

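/// Purifies a statement, removing its dependencies on external state by
/// fetching that state (upstream schemas, Kafka metadata, etc.) up front.
/// Returns the purification result along with the ID of the cluster the
/// statement runs in, if any. Purification is the phase of planning that is
/// allowed to perform network requests, which is why this function is async.
///
/// A sketch of the intended call pattern (`plan_purified` is a hypothetical
/// follow-on planning step, not part of this module):
///
/// ```ignore
/// let (purified, cluster_id) = purify_statement(catalog, now, stmt, &storage_config).await;
/// let plan = plan_purified(purified?)?;
/// ```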
pub async fn purify_statement(
    catalog: impl SessionCatalog,
    now: u64,
    stmt: Statement<Aug>,
    storage_configuration: &StorageConfiguration,
) -> (Result<PurifiedStatement, PlanError>, Option<ClusterId>) {
    match stmt {
        Statement::CreateSource(stmt) => {
            let cluster_id = stmt.in_cluster.as_ref().map(|cluster| cluster.id.clone());
            (
                purify_create_source(catalog, now, stmt, storage_configuration).await,
                cluster_id,
            )
        }
        Statement::AlterSource(stmt) => (
            purify_alter_source(catalog, stmt, storage_configuration).await,
            None,
        ),
        Statement::CreateSink(stmt) => {
            let cluster_id = stmt.in_cluster.as_ref().map(|cluster| cluster.id.clone());
            (
                purify_create_sink(catalog, stmt, storage_configuration).await,
                cluster_id,
            )
        }
        Statement::CreateTableFromSource(stmt) => (
            purify_create_table_from_source(catalog, stmt, storage_configuration).await,
            None,
        ),
        o => unreachable!("{:?} does not need to be purified", o),
    }
}

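/// Fills in `DOC ON` options for the sink's Avro formats from catalog
/// comments on the objects the sink depends on, skipping any `DOC ON`
/// options the user supplied explicitly.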
pub(crate) fn purify_create_sink_avro_doc_on_options(
    catalog: &dyn SessionCatalog,
    from_id: CatalogItemId,
    format: &mut Option<FormatSpecifier<Aug>>,
) -> Result<(), PlanError> {
    let from = catalog.get_item(&from_id);
    let object_ids = from
        .references()
        .items()
        .copied()
        .chain_one(from.id())
        .collect::<Vec<_>>();

    let mut avro_format_options = vec![];
    for_each_format(format, |doc_on_schema, fmt| match fmt {
        Format::Avro(AvroSchema::InlineSchema { .. })
        | Format::Bytes
        | Format::Csv { .. }
        | Format::Json { .. }
        | Format::Protobuf(..)
        | Format::Regex(..)
        | Format::Text => (),
        Format::Avro(AvroSchema::Csr {
            csr_connection: CsrConnectionAvro { connection, .. },
        }) => {
            avro_format_options.push((doc_on_schema, &mut connection.options));
        }
    });

    for (for_schema, options) in avro_format_options {
        let user_provided_comments = options
            .iter()
            .filter_map(|CsrConfigOption { name, .. }| match name {
                CsrConfigOptionName::AvroDocOn(doc_on) => Some(doc_on.clone()),
                _ => None,
            })
            .collect::<BTreeSet<_>>();

        for object_id in &object_ids {
            let item = catalog
                .get_item(object_id)
                .at_version(RelationVersionSelector::Latest);
            let full_name = catalog.resolve_full_name(item.name());
            let full_resolved_name = ResolvedItemName::Item {
                id: *object_id,
                qualifiers: item.name().qualifiers.clone(),
                full_name: full_name.clone(),
                print_id: !matches!(
                    item.item_type(),
                    CatalogItemType::Func | CatalogItemType::Type
                ),
                version: RelationVersionSelector::Latest,
            };

            if let Some(comments_map) = catalog.get_item_comments(object_id) {
                let doc_on_item_key = AvroDocOn {
                    identifier: DocOnIdentifier::Type(full_resolved_name.clone()),
                    for_schema,
                };
                if !user_provided_comments.contains(&doc_on_item_key) {
                    if let Some(root_comment) = comments_map.get(&None) {
                        options.push(CsrConfigOption {
                            name: CsrConfigOptionName::AvroDocOn(doc_on_item_key),
                            value: Some(mz_sql_parser::ast::WithOptionValue::Value(Value::String(
                                root_comment.clone(),
                            ))),
                        });
                    }
                }

                if let Ok(desc) = item.desc(&full_name) {
                    for (pos, column_name) in desc.iter_names().enumerate() {
                        let comment = comments_map.get(&Some(pos + 1));
                        if let Some(comment_str) = comment {
                            let doc_on_column_key = AvroDocOn {
                                identifier: DocOnIdentifier::Column(ColumnName {
                                    relation: full_resolved_name.clone(),
                                    column: ResolvedColumnReference::Column {
                                        name: column_name.to_owned(),
                                        index: pos,
                                    },
                                }),
                                for_schema,
                            };
                            if !user_provided_comments.contains(&doc_on_column_key) {
                                options.push(CsrConfigOption {
                                    name: CsrConfigOptionName::AvroDocOn(doc_on_column_key),
                                    value: Some(mz_sql_parser::ast::WithOptionValue::Value(
                                        Value::String(comment_str.clone()),
                                    )),
                                });
                            }
                        }
                    }
                }
            }
        }
    }

    Ok(())
}

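/// Purifies a `CREATE SINK` statement: validates the WITH options, verifies
/// that the referenced connections are usable (Kafka brokers reachable,
/// Iceberg catalog and AWS credentials valid, schema registry listable), and
/// populates Avro `DOC ON` options from catalog comments.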
async fn purify_create_sink(
    catalog: impl SessionCatalog,
    mut create_sink_stmt: CreateSinkStatement<Aug>,
    storage_configuration: &StorageConfiguration,
) -> Result<PurifiedStatement, PlanError> {
    let CreateSinkStatement {
        connection,
        format,
        with_options,
        name: _,
        in_cluster: _,
        if_not_exists: _,
        from,
        envelope: _,
    } = &mut create_sink_stmt;

    const USER_ALLOWED_WITH_OPTIONS: &[CreateSinkOptionName] = &[CreateSinkOptionName::Snapshot];

    if let Some(op) = with_options
        .iter()
        .find(|op| !USER_ALLOWED_WITH_OPTIONS.contains(&op.name))
    {
        sql_bail!(
            "CREATE SINK...WITH ({}..) is not allowed",
            op.name.to_ast_string_simple(),
        )
    }

    match &connection {
        CreateSinkConnection::Kafka {
            connection,
            options,
            key: _,
            headers: _,
        } => {
            let scx = StatementContext::new(None, &catalog);
            let connection = {
                let item = scx.get_item_by_resolved_name(connection)?;
                match item.connection()? {
                    Connection::Kafka(connection) => {
                        connection.clone().into_inline_connection(scx.catalog)
                    }
                    _ => sql_bail!(
                        "{} is not a kafka connection",
                        scx.catalog.resolve_full_name(item.name())
                    ),
                }
            };

            let extracted_options: KafkaSinkConfigOptionExtracted = options.clone().try_into()?;

            if extracted_options.legacy_ids == Some(true) {
                sql_bail!("LEGACY IDs option is not supported");
            }

            let client: AdminClient<_> = connection
                .create_with_context(
                    storage_configuration,
                    MzClientContext::default(),
                    &BTreeMap::new(),
                    InTask::No,
                )
                .await
                .map_err(|e| {
                    KafkaSinkPurificationError::AdminClientError(Arc::new(e))
                })?;

            let metadata = client
                .inner()
                .fetch_metadata(
                    None,
                    storage_configuration
                        .parameters
                        .kafka_timeout_config
                        .fetch_metadata_timeout,
                )
                .map_err(|e| {
                    KafkaSinkPurificationError::AdminClientError(Arc::new(
                        ContextCreationError::KafkaError(e),
                    ))
                })?;

            if metadata.brokers().is_empty() {
                Err(KafkaSinkPurificationError::ZeroBrokers)?;
            }
        }
        CreateSinkConnection::Iceberg {
            connection,
            aws_connection,
            ..
        } => {
            let scx = StatementContext::new(None, &catalog);
            let connection = {
                let item = scx.get_item_by_resolved_name(connection)?;
                match item.connection()? {
                    Connection::IcebergCatalog(connection) => {
                        connection.clone().into_inline_connection(scx.catalog)
                    }
                    _ => sql_bail!(
                        "{} is not an iceberg connection",
                        scx.catalog.resolve_full_name(item.name())
                    ),
                }
            };

            let aws_conn_id = aws_connection.item_id();

            let aws_connection = {
                let item = scx.get_item_by_resolved_name(aws_connection)?;
                match item.connection()? {
                    Connection::Aws(aws_connection) => aws_connection.clone(),
                    _ => sql_bail!(
                        "{} is not an aws connection",
                        scx.catalog.resolve_full_name(item.name())
                    ),
                }
            };

            let _catalog = connection
                .connect(storage_configuration, InTask::No)
                .await
                .map_err(|e| IcebergSinkPurificationError::CatalogError(Arc::new(e)))?;

            let sdk_config = aws_connection
                .load_sdk_config(
                    &storage_configuration.connection_context,
                    aws_conn_id.clone(),
                    InTask::No,
                )
                .await
                .map_err(|e| IcebergSinkPurificationError::AwsSdkContextError(Arc::new(e)))?;

            let sts_client = aws_sdk_sts::Client::new(&sdk_config);
            let _ = sts_client.get_caller_identity().send().await.map_err(|e| {
                IcebergSinkPurificationError::StsIdentityError(Arc::new(e.into_service_error()))
            })?;
        }
    }

    let mut csr_connection_ids = BTreeSet::new();
    for_each_format(format, |_, fmt| match fmt {
        Format::Avro(AvroSchema::InlineSchema { .. })
        | Format::Bytes
        | Format::Csv { .. }
        | Format::Json { .. }
        | Format::Protobuf(ProtobufSchema::InlineSchema { .. })
        | Format::Regex(..)
        | Format::Text => (),
        Format::Avro(AvroSchema::Csr {
            csr_connection: CsrConnectionAvro { connection, .. },
        })
        | Format::Protobuf(ProtobufSchema::Csr {
            csr_connection: CsrConnectionProtobuf { connection, .. },
        }) => {
            csr_connection_ids.insert(*connection.connection.item_id());
        }
    });

    let scx = StatementContext::new(None, &catalog);
    for csr_connection_id in csr_connection_ids {
        let connection = {
            let item = scx.get_item(&csr_connection_id);
            match item.connection()? {
                Connection::Csr(connection) => connection.clone().into_inline_connection(&catalog),
                _ => Err(CsrPurificationError::NotCsrConnection(
                    scx.catalog.resolve_full_name(item.name()),
                ))?,
            }
        };

        let client = connection
            .connect(storage_configuration, InTask::No)
            .await
            .map_err(|e| CsrPurificationError::ClientError(Arc::new(e)))?;

        client
            .list_subjects()
            .await
            .map_err(|e| CsrPurificationError::ListSubjectsError(Arc::new(e)))?;
    }

    purify_create_sink_avro_doc_on_options(&catalog, *from.item_id(), format)?;

    Ok(PurifiedStatement::PurifiedCreateSink(create_sink_stmt))
}

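/// Applies `f` to each `Format` in `format`, tagging it with whether it
/// applies to the key, the value, or both.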
fn for_each_format<'a, F>(format: &'a mut Option<FormatSpecifier<Aug>>, mut f: F)
where
    F: FnMut(DocOnSchema, &'a mut Format<Aug>),
{
    match format {
        None => (),
        Some(FormatSpecifier::Bare(fmt)) => f(DocOnSchema::All, fmt),
        Some(FormatSpecifier::KeyValue { key, value }) => {
            f(DocOnSchema::KeyOnly, key);
            f(DocOnSchema::ValueOnly, value);
        }
    }
}

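/// Controls whether a source may (or must) reference upstream objects
/// directly: `NotAllowed` means references must instead be created as tables
/// on the source, `Optional` accepts either style, and `Required` means the
/// source itself must name its references.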
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub(crate) enum SourceReferencePolicy {
    NotAllowed,
    Optional,
    Required,
}

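/// Purifies a `CREATE SOURCE` statement by contacting the upstream system:
/// validates the connection, resolves the requested external references into
/// concrete source exports, encodes the connector `DETAILS` option, and, for
/// the old subsource-based syntax, synthesizes a progress subsource
/// statement.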
async fn purify_create_source(
    catalog: impl SessionCatalog,
    now: u64,
    mut create_source_stmt: CreateSourceStatement<Aug>,
    storage_configuration: &StorageConfiguration,
) -> Result<PurifiedStatement, PlanError> {
    let CreateSourceStatement {
        name: source_name,
        col_names,
        key_constraint,
        connection: source_connection,
        format,
        envelope,
        include_metadata,
        external_references,
        progress_subsource,
        with_options,
        ..
    } = &mut create_source_stmt;

    let uses_old_syntax = !col_names.is_empty()
        || key_constraint.is_some()
        || format.is_some()
        || envelope.is_some()
        || !include_metadata.is_empty()
        || external_references.is_some()
        || progress_subsource.is_some();

    if let Some(DeferredItemName::Named(_)) = progress_subsource {
        sql_bail!("Cannot manually ID qualify progress subsource")
    }

    let mut requested_subsource_map = BTreeMap::new();

    let progress_desc = match &source_connection {
        CreateSourceConnection::Kafka { .. } => {
            &mz_storage_types::sources::kafka::KAFKA_PROGRESS_DESC
        }
        CreateSourceConnection::Postgres { .. } => {
            &mz_storage_types::sources::postgres::PG_PROGRESS_DESC
        }
        CreateSourceConnection::SqlServer { .. } => {
            &mz_storage_types::sources::sql_server::SQL_SERVER_PROGRESS_DESC
        }
        CreateSourceConnection::MySql { .. } => {
            &mz_storage_types::sources::mysql::MYSQL_PROGRESS_DESC
        }
        CreateSourceConnection::LoadGenerator { .. } => {
            &mz_storage_types::sources::load_generator::LOAD_GEN_PROGRESS_DESC
        }
    };
    let scx = StatementContext::new(None, &catalog);

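    // Decide whether external references may be resolved on the source itself
    // or must instead be created as `CREATE TABLE .. FROM SOURCE` statements,
    // based on the `enable_create_table_from_source` and
    // `force_source_table_syntax` system variables.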
    let reference_policy = if scx.catalog.system_vars().enable_create_table_from_source()
        && scx.catalog.system_vars().force_source_table_syntax()
    {
        SourceReferencePolicy::NotAllowed
    } else if scx.catalog.system_vars().enable_create_table_from_source() {
        SourceReferencePolicy::Optional
    } else {
        SourceReferencePolicy::Required
    };

    let mut format_options = SourceFormatOptions::Default;

    let retrieved_source_references: RetrievedSourceReferences;

    match source_connection {
        CreateSourceConnection::Kafka {
            connection,
            options: base_with_options,
            ..
        } => {
            if let Some(external_references) = external_references {
                Err(KafkaSourcePurificationError::ReferencedSubsources(
                    external_references.clone(),
                ))?;
            }

            let connection = {
                let item = scx.get_item_by_resolved_name(connection)?;
                match item.connection()? {
                    Connection::Kafka(connection) => {
                        connection.clone().into_inline_connection(&catalog)
                    }
                    _ => Err(KafkaSourcePurificationError::NotKafkaConnection(
                        scx.catalog.resolve_full_name(item.name()),
                    ))?,
                }
            };

            let extracted_options: KafkaSourceConfigOptionExtracted =
                base_with_options.clone().try_into()?;

            let topic = extracted_options
                .topic
                .ok_or(KafkaSourcePurificationError::ConnectionMissingTopic)?;

            let consumer = connection
                .create_with_context(
                    storage_configuration,
                    MzClientContext::default(),
                    &BTreeMap::new(),
                    InTask::No,
                )
                .await
                .map_err(|e| {
                    KafkaSourcePurificationError::KafkaConsumerError(
                        e.display_with_causes().to_string(),
                    )
                })?;
            let consumer = Arc::new(consumer);

            match (
                extracted_options.start_offset,
                extracted_options.start_timestamp,
            ) {
                (None, None) => {
                    kafka_util::ensure_topic_exists(
                        Arc::clone(&consumer),
                        &topic,
                        storage_configuration
                            .parameters
                            .kafka_timeout_config
                            .fetch_metadata_timeout,
                    )
                    .await?;
                }
                (Some(_), Some(_)) => {
                    sql_bail!("cannot specify START TIMESTAMP and START OFFSET at the same time")
                }
                (Some(start_offsets), None) => {
                    kafka_util::validate_start_offsets(
                        Arc::clone(&consumer),
                        &topic,
                        start_offsets,
                        storage_configuration
                            .parameters
                            .kafka_timeout_config
                            .fetch_metadata_timeout,
                    )
                    .await?;
                }
                (None, Some(time_offset)) => {
                    let start_offsets = kafka_util::lookup_start_offsets(
                        Arc::clone(&consumer),
                        &topic,
                        time_offset,
                        now,
                        storage_configuration
                            .parameters
                            .kafka_timeout_config
                            .fetch_metadata_timeout,
                    )
                    .await?;

                    base_with_options.retain(|val| {
                        !matches!(val.name, KafkaSourceConfigOptionName::StartTimestamp)
                    });
                    base_with_options.push(KafkaSourceConfigOption {
                        name: KafkaSourceConfigOptionName::StartOffset,
                        value: Some(WithOptionValue::Sequence(
                            start_offsets
                                .iter()
                                .map(|offset| {
                                    WithOptionValue::Value(Value::Number(offset.to_string()))
                                })
                                .collect(),
                        )),
                    });
                }
            }

            let reference_client = SourceReferenceClient::Kafka { topic: &topic };
            retrieved_source_references = reference_client.get_source_references().await?;

            format_options = SourceFormatOptions::Kafka { topic };
        }
        CreateSourceConnection::Postgres {
            connection,
            options,
        } => {
            let connection_item = scx.get_item_by_resolved_name(connection)?;
            let connection = match connection_item.connection().map_err(PlanError::from)? {
                Connection::Postgres(connection) => {
                    connection.clone().into_inline_connection(&catalog)
                }
                _ => Err(PgSourcePurificationError::NotPgConnection(
                    scx.catalog.resolve_full_name(connection_item.name()),
                ))?,
            };
            let crate::plan::statement::PgConfigOptionExtracted {
                publication,
                text_columns,
                exclude_columns,
                details,
                ..
            } = options.clone().try_into()?;
            let publication =
                publication.ok_or(PgSourcePurificationError::ConnectionMissingPublication)?;

            if details.is_some() {
                Err(PgSourcePurificationError::UserSpecifiedDetails)?;
            }

            let client = connection
                .validate(connection_item.id(), storage_configuration)
                .await?;

            let reference_client = SourceReferenceClient::Postgres {
                client: &client,
                publication: &publication,
                database: &connection.database,
            };
            retrieved_source_references = reference_client.get_source_references().await?;

            let postgres::PurifiedSourceExports {
                source_exports: subsources,
                normalized_text_columns,
            } = postgres::purify_source_exports(
                &client,
                &retrieved_source_references,
                external_references,
                text_columns,
                exclude_columns,
                source_name,
                &reference_policy,
            )
            .await?;

            if let Some(text_cols_option) = options
                .iter_mut()
                .find(|option| option.name == PgConfigOptionName::TextColumns)
            {
                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
            }

            requested_subsource_map.extend(subsources);

            let timeline_id = mz_postgres_util::get_timeline_id(&client).await?;

            options.retain(|PgConfigOption { name, .. }| name != &PgConfigOptionName::Details);
            let details = PostgresSourcePublicationDetails {
                slot: format!(
                    "materialize_{}",
                    Uuid::new_v4().to_string().replace('-', "")
                ),
                timeline_id: Some(timeline_id),
                database: connection.database,
            };
            options.push(PgConfigOption {
                name: PgConfigOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    details.into_proto().encode_to_vec(),
                )))),
            })
        }
        CreateSourceConnection::SqlServer {
            connection,
            options,
        } => {
            scx.require_feature_flag(&ENABLE_SQL_SERVER_SOURCE)?;

            let connection_item = scx.get_item_by_resolved_name(connection)?;
            let connection = match connection_item.connection()? {
                Connection::SqlServer(connection) => {
                    connection.clone().into_inline_connection(&catalog)
                }
                _ => Err(SqlServerSourcePurificationError::NotSqlServerConnection(
                    scx.catalog.resolve_full_name(connection_item.name()),
                ))?,
            };
            let crate::plan::statement::ddl::SqlServerConfigOptionExtracted {
                details,
                text_columns,
                exclude_columns,
                seen: _,
            } = options.clone().try_into()?;

            if details.is_some() {
                Err(SqlServerSourcePurificationError::UserSpecifiedDetails)?;
            }

            let mut client = connection
                .validate(connection_item.id(), storage_configuration)
                .await?;

            let database: Arc<str> = connection.database.into();
            let reference_client = SourceReferenceClient::SqlServer {
                client: &mut client,
                database: Arc::clone(&database),
            };
            retrieved_source_references = reference_client.get_source_references().await?;
            tracing::debug!(?retrieved_source_references, "got source references");

            let timeout = mz_storage_types::sources::sql_server::MAX_LSN_WAIT
                .get(storage_configuration.config_set());

            let purified_source_exports = sql_server::purify_source_exports(
                &*database,
                &mut client,
                &retrieved_source_references,
                external_references,
                &text_columns,
                &exclude_columns,
                source_name,
                timeout,
                &reference_policy,
            )
            .await?;

            let sql_server::PurifiedSourceExports {
                source_exports,
                normalized_text_columns,
                normalized_excl_columns,
            } = purified_source_exports;

            requested_subsource_map.extend(source_exports);

            let restore_history_id =
                mz_sql_server_util::inspect::get_latest_restore_history_id(&mut client).await?;
            let details = SqlServerSourceExtras { restore_history_id };

            options.retain(|SqlServerConfigOption { name, .. }| {
                name != &SqlServerConfigOptionName::Details
            });
            options.push(SqlServerConfigOption {
                name: SqlServerConfigOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    details.into_proto().encode_to_vec(),
                )))),
            });

            if let Some(text_cols_option) = options
                .iter_mut()
                .find(|option| option.name == SqlServerConfigOptionName::TextColumns)
            {
                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
            }
            if let Some(excl_cols_option) = options
                .iter_mut()
                .find(|option| option.name == SqlServerConfigOptionName::ExcludeColumns)
            {
                excl_cols_option.value = Some(WithOptionValue::Sequence(normalized_excl_columns));
            }
        }
        CreateSourceConnection::MySql {
            connection,
            options,
        } => {
            let connection_item = scx.get_item_by_resolved_name(connection)?;
            let connection = match connection_item.connection()? {
                Connection::MySql(connection) => {
                    connection.clone().into_inline_connection(&catalog)
                }
                _ => Err(MySqlSourcePurificationError::NotMySqlConnection(
                    scx.catalog.resolve_full_name(connection_item.name()),
                ))?,
            };
            let crate::plan::statement::ddl::MySqlConfigOptionExtracted {
                details,
                text_columns,
                exclude_columns,
                seen: _,
            } = options.clone().try_into()?;

            if details.is_some() {
                Err(MySqlSourcePurificationError::UserSpecifiedDetails)?;
            }

            let mut conn = connection
                .validate(connection_item.id(), storage_configuration)
                .await
                .map_err(MySqlSourcePurificationError::InvalidConnection)?;

            let initial_gtid_set =
                mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?;

            let reference_client = SourceReferenceClient::MySql {
                conn: &mut conn,
                include_system_schemas: mysql::references_system_schemas(external_references),
            };
            retrieved_source_references = reference_client.get_source_references().await?;

            let mysql::PurifiedSourceExports {
                source_exports: subsources,
                normalized_text_columns,
                normalized_exclude_columns,
            } = mysql::purify_source_exports(
                &mut conn,
                &retrieved_source_references,
                external_references,
                text_columns,
                exclude_columns,
                source_name,
                initial_gtid_set.clone(),
                &reference_policy,
            )
            .await?;
            requested_subsource_map.extend(subsources);

            let details = MySqlSourceDetails {};
            options
                .retain(|MySqlConfigOption { name, .. }| name != &MySqlConfigOptionName::Details);
            options.push(MySqlConfigOption {
                name: MySqlConfigOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    details.into_proto().encode_to_vec(),
                )))),
            });

            if let Some(text_cols_option) = options
                .iter_mut()
                .find(|option| option.name == MySqlConfigOptionName::TextColumns)
            {
                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
            }
            if let Some(exclude_cols_option) = options
                .iter_mut()
                .find(|option| option.name == MySqlConfigOptionName::ExcludeColumns)
            {
                exclude_cols_option.value =
                    Some(WithOptionValue::Sequence(normalized_exclude_columns));
            }
        }
        CreateSourceConnection::LoadGenerator { generator, options } => {
            let load_generator =
                load_generator_ast_to_generator(&scx, generator, options, include_metadata)?;

            let reference_client = SourceReferenceClient::LoadGenerator {
                generator: &load_generator,
            };
            retrieved_source_references = reference_client.get_source_references().await?;
            let multi_output_sources = retrieved_source_references
                .all_references()
                .iter()
                .any(|r| {
                    r.load_generator_output().expect("is loadgen")
                        != &LoadGeneratorOutput::Default
                });

            match external_references {
                Some(requested)
                    if matches!(reference_policy, SourceReferencePolicy::NotAllowed) =>
                {
                    Err(PlanError::UseTablesForSources(requested.to_string()))?
                }
                Some(requested) if !multi_output_sources => match requested {
                    ExternalReferences::SubsetTables(_) => {
                        Err(LoadGeneratorSourcePurificationError::ForTables)?
                    }
                    ExternalReferences::SubsetSchemas(_) => {
                        Err(LoadGeneratorSourcePurificationError::ForSchemas)?
                    }
                    ExternalReferences::All => {
                        Err(LoadGeneratorSourcePurificationError::ForAllTables)?
                    }
                },
                Some(requested) => {
                    let requested_exports = retrieved_source_references
                        .requested_source_exports(Some(requested), source_name)?;
                    for export in requested_exports {
                        requested_subsource_map.insert(
                            export.name,
                            PurifiedSourceExport {
                                external_reference: export.external_reference,
                                details: PurifiedExportDetails::LoadGenerator {
                                    table: export
                                        .meta
                                        .load_generator_desc()
                                        .expect("is loadgen")
                                        .clone(),
                                    output: export
                                        .meta
                                        .load_generator_output()
                                        .expect("is loadgen")
                                        .clone(),
                                },
                            },
                        );
                    }
                }
                None => {
                    if multi_output_sources
                        && matches!(reference_policy, SourceReferencePolicy::Required)
                    {
                        Err(LoadGeneratorSourcePurificationError::MultiOutputRequiresForAllTables)?
                    }
                }
            }

            if let LoadGenerator::Clock = generator {
                if !options
                    .iter()
                    .any(|p| p.name == LoadGeneratorOptionName::AsOf)
                {
                    let now = catalog.now();
                    options.push(LoadGeneratorOption {
                        name: LoadGeneratorOptionName::AsOf,
                        value: Some(WithOptionValue::Value(Value::Number(now.to_string()))),
                    });
                }
            }
        }
    }

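    // The requested external references were resolved into concrete source
    // exports above, so clear them from the statement; the purified
    // subsources carry this information from here on.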
    *external_references = None;

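    // The old (subsource-based) syntax implies a progress subsource; derive a
    // name for it that does not conflict with any existing catalog item,
    // unless the user named one explicitly.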
    let create_progress_subsource_stmt = if uses_old_syntax {
        let name = match progress_subsource {
            Some(name) => match name {
                DeferredItemName::Deferred(name) => name.clone(),
                DeferredItemName::Named(_) => unreachable!("already checked for this value"),
            },
            None => {
                let (item, prefix) = source_name.0.split_last().unwrap();
                let item_name =
                    Ident::try_generate_name(item.to_string(), "_progress", |candidate| {
                        let mut suggested_name = prefix.to_vec();
                        suggested_name.push(candidate.clone());

                        let partial =
                            normalize::unresolved_item_name(UnresolvedItemName(suggested_name))?;
                        let qualified = scx.allocate_qualified_name(partial)?;
                        let item_exists = scx.catalog.get_item_by_name(&qualified).is_some();
                        let type_exists = scx.catalog.get_type_by_name(&qualified).is_some();
                        Ok::<_, PlanError>(!item_exists && !type_exists)
                    })?;

                let mut full_name = prefix.to_vec();
                full_name.push(item_name);
                let full_name = normalize::unresolved_item_name(UnresolvedItemName(full_name))?;
                let qualified_name = scx.allocate_qualified_name(full_name)?;
                let full_name = scx.catalog.resolve_full_name(&qualified_name);

                UnresolvedItemName::from(full_name.clone())
            }
        };

        let (columns, constraints) = scx.relation_desc_into_table_defs(progress_desc)?;

        let mut progress_with_options: Vec<_> = with_options
            .iter()
            .filter_map(|opt| match opt.name {
                CreateSourceOptionName::TimestampInterval => None,
                CreateSourceOptionName::RetainHistory => Some(CreateSubsourceOption {
                    name: CreateSubsourceOptionName::RetainHistory,
                    value: opt.value.clone(),
                }),
            })
            .collect();
        progress_with_options.push(CreateSubsourceOption {
            name: CreateSubsourceOptionName::Progress,
            value: Some(WithOptionValue::Value(Value::Boolean(true))),
        });

        Some(CreateSubsourceStatement {
            name,
            columns,
            of_source: None,
            constraints,
            if_not_exists: false,
            with_options: progress_with_options,
        })
    } else {
        None
    };

    purify_source_format(
        &catalog,
        format,
        &format_options,
        envelope,
        storage_configuration,
    )
    .await?;

    Ok(PurifiedStatement::PurifiedCreateSource {
        create_progress_subsource_stmt,
        create_source_stmt,
        subsources: requested_subsource_map,
        available_source_references: retrieved_source_references.available_source_references(),
    })
}

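/// Purifies an `ALTER SOURCE` statement. Only `ADD SUBSOURCES` and
/// `REFRESH REFERENCES` require contacting the upstream system; every other
/// action is passed through unchanged.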
async fn purify_alter_source(
    catalog: impl SessionCatalog,
    stmt: AlterSourceStatement<Aug>,
    storage_configuration: &StorageConfiguration,
) -> Result<PurifiedStatement, PlanError> {
    let scx = StatementContext::new(None, &catalog);
    let AlterSourceStatement {
        source_name: unresolved_source_name,
        action,
        if_exists,
    } = stmt;

    let item = match scx.resolve_item(RawItemName::Name(unresolved_source_name.clone())) {
        Ok(item) => item,
        Err(_) if if_exists => {
            return Ok(PurifiedStatement::PurifiedAlterSource {
                alter_source_stmt: AlterSourceStatement {
                    source_name: unresolved_source_name,
                    action,
                    if_exists,
                },
            });
        }
        Err(e) => return Err(e),
    };

    let desc = match item.source_desc()? {
        Some(desc) => desc.clone().into_inline_connection(scx.catalog),
        None => {
            sql_bail!("cannot ALTER this type of source")
        }
    };

    let source_name = item.name();

    let resolved_source_name = ResolvedItemName::Item {
        id: item.id(),
        qualifiers: item.name().qualifiers.clone(),
        full_name: scx.catalog.resolve_full_name(source_name),
        print_id: true,
        version: RelationVersionSelector::Latest,
    };

    let partial_name = scx.catalog.minimal_qualification(source_name);

    match action {
        AlterSourceAction::AddSubsources {
            external_references,
            options,
        } => {
            if scx.catalog.system_vars().enable_create_table_from_source()
                && scx.catalog.system_vars().force_source_table_syntax()
            {
                Err(PlanError::UseTablesForSources(
                    "ALTER SOURCE .. ADD SUBSOURCES ..".to_string(),
                ))?;
            }

            purify_alter_source_add_subsources(
                external_references,
                options,
                desc,
                partial_name,
                unresolved_source_name,
                resolved_source_name,
                storage_configuration,
            )
            .await
        }
        AlterSourceAction::RefreshReferences => {
            purify_alter_source_refresh_references(
                desc,
                resolved_source_name,
                storage_configuration,
            )
            .await
        }
        _ => Ok(PurifiedStatement::PurifiedAlterSource {
            alter_source_stmt: AlterSourceStatement {
                source_name: unresolved_source_name,
                action,
                if_exists,
            },
        }),
    }
}

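/// Purifies `ALTER SOURCE .. ADD SUBSOURCES` by resolving the requested
/// external references against the upstream system (PostgreSQL, MySQL, or
/// SQL Server) and normalizing the statement's options.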
async fn purify_alter_source_add_subsources(
    external_references: Vec<ExternalReferenceExport>,
    mut options: Vec<AlterSourceAddSubsourceOption<Aug>>,
    desc: SourceDesc,
    partial_source_name: PartialItemName,
    unresolved_source_name: UnresolvedItemName,
    resolved_source_name: ResolvedItemName,
    storage_configuration: &StorageConfiguration,
) -> Result<PurifiedStatement, PlanError> {
    let connection_id = match &desc.connection {
        GenericSourceConnection::Postgres(c) => c.connection_id,
        GenericSourceConnection::MySql(c) => c.connection_id,
        GenericSourceConnection::SqlServer(c) => c.connection_id,
        _ => sql_bail!(
            "source {} does not support ALTER SOURCE.",
            partial_source_name
        ),
    };

    let crate::plan::statement::ddl::AlterSourceAddSubsourceOptionExtracted {
        text_columns,
        exclude_columns,
        details,
        seen: _,
    } = options.clone().try_into()?;
    assert_none!(details, "details cannot be explicitly set");

    let mut requested_subsource_map = BTreeMap::new();

    match desc.connection {
        GenericSourceConnection::Postgres(pg_source_connection) => {
            let pg_connection = &pg_source_connection.connection;

            let client = pg_connection
                .validate(connection_id, storage_configuration)
                .await?;

            let reference_client = SourceReferenceClient::Postgres {
                client: &client,
                publication: &pg_source_connection.publication,
                database: &pg_connection.database,
            };
            let retrieved_source_references = reference_client.get_source_references().await?;

            let postgres::PurifiedSourceExports {
                source_exports: subsources,
                normalized_text_columns,
            } = postgres::purify_source_exports(
                &client,
                &retrieved_source_references,
                &Some(ExternalReferences::SubsetTables(external_references)),
                text_columns,
                exclude_columns,
                &unresolved_source_name,
                &SourceReferencePolicy::Required,
            )
            .await?;

            if let Some(text_cols_option) = options
                .iter_mut()
                .find(|option| option.name == AlterSourceAddSubsourceOptionName::TextColumns)
            {
                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
            }

            requested_subsource_map.extend(subsources);
        }
        GenericSourceConnection::MySql(mysql_source_connection) => {
            let mysql_connection = &mysql_source_connection.connection;
            let config = mysql_connection
                .config(
                    &storage_configuration.connection_context.secrets_reader,
                    storage_configuration,
                    InTask::No,
                )
                .await?;

            let mut conn = config
                .connect(
                    "mysql purification",
                    &storage_configuration.connection_context.ssh_tunnel_manager,
                )
                .await?;

            let initial_gtid_set =
                mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?;

            let requested_references = Some(ExternalReferences::SubsetTables(external_references));

            let reference_client = SourceReferenceClient::MySql {
                conn: &mut conn,
                include_system_schemas: mysql::references_system_schemas(&requested_references),
            };
            let retrieved_source_references = reference_client.get_source_references().await?;

            let mysql::PurifiedSourceExports {
                source_exports: subsources,
                normalized_text_columns,
                normalized_exclude_columns,
            } = mysql::purify_source_exports(
                &mut conn,
                &retrieved_source_references,
                &requested_references,
                text_columns,
                exclude_columns,
                &unresolved_source_name,
                initial_gtid_set,
                &SourceReferencePolicy::Required,
            )
            .await?;
            requested_subsource_map.extend(subsources);

            if let Some(text_cols_option) = options
                .iter_mut()
                .find(|option| option.name == AlterSourceAddSubsourceOptionName::TextColumns)
            {
                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
            }
            if let Some(exclude_cols_option) = options
                .iter_mut()
                .find(|option| option.name == AlterSourceAddSubsourceOptionName::ExcludeColumns)
            {
                exclude_cols_option.value =
                    Some(WithOptionValue::Sequence(normalized_exclude_columns));
            }
        }
        GenericSourceConnection::SqlServer(sql_server_source) => {
            let sql_server_connection = &sql_server_source.connection;
            let config = sql_server_connection
                .resolve_config(
                    &storage_configuration.connection_context.secrets_reader,
                    storage_configuration,
                    InTask::No,
                )
                .await?;
            let mut client = mz_sql_server_util::Client::connect(config).await?;

            let database = sql_server_connection.database.clone().into();
            let source_references = SourceReferenceClient::SqlServer {
                client: &mut client,
                database: Arc::clone(&database),
            }
            .get_source_references()
            .await?;
            let requested_references = Some(ExternalReferences::SubsetTables(external_references));

            let timeout = mz_storage_types::sources::sql_server::MAX_LSN_WAIT
                .get(storage_configuration.config_set());

            let result = sql_server::purify_source_exports(
                &*database,
                &mut client,
                &source_references,
                &requested_references,
                &text_columns,
                &exclude_columns,
                &unresolved_source_name,
                timeout,
                &SourceReferencePolicy::Required,
            )
            .await;
            let sql_server::PurifiedSourceExports {
                source_exports,
                normalized_text_columns,
                normalized_excl_columns,
            } = result?;

            requested_subsource_map.extend(source_exports);

            if let Some(text_cols_option) = options
                .iter_mut()
                .find(|option| option.name == AlterSourceAddSubsourceOptionName::TextColumns)
            {
                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
            }
            if let Some(exclude_cols_option) = options
                .iter_mut()
                .find(|option| option.name == AlterSourceAddSubsourceOptionName::ExcludeColumns)
            {
                exclude_cols_option.value =
                    Some(WithOptionValue::Sequence(normalized_excl_columns));
            }
        }
        _ => unreachable!(),
    };

    Ok(PurifiedStatement::PurifiedAlterSourceAddSubsources {
        source_name: resolved_source_name,
        options,
        subsources: requested_subsource_map,
    })
}

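/// Purifies `ALTER SOURCE .. REFRESH REFERENCES` by re-fetching the set of
/// references that are available upstream.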
async fn purify_alter_source_refresh_references(
    desc: SourceDesc,
    resolved_source_name: ResolvedItemName,
    storage_configuration: &StorageConfiguration,
) -> Result<PurifiedStatement, PlanError> {
    let retrieved_source_references = match desc.connection {
        GenericSourceConnection::Postgres(pg_source_connection) => {
            let pg_connection = &pg_source_connection.connection;

            let config = pg_connection
                .config(
                    &storage_configuration.connection_context.secrets_reader,
                    storage_configuration,
                    InTask::No,
                )
                .await?;

            let client = config
                .connect(
                    "postgres_purification",
                    &storage_configuration.connection_context.ssh_tunnel_manager,
                )
                .await?;
            let reference_client = SourceReferenceClient::Postgres {
                client: &client,
                publication: &pg_source_connection.publication,
                database: &pg_connection.database,
            };
            reference_client.get_source_references().await?
        }
        GenericSourceConnection::MySql(mysql_source_connection) => {
            let mysql_connection = &mysql_source_connection.connection;
            let config = mysql_connection
                .config(
                    &storage_configuration.connection_context.secrets_reader,
                    storage_configuration,
                    InTask::No,
                )
                .await?;

            let mut conn = config
                .connect(
                    "mysql purification",
                    &storage_configuration.connection_context.ssh_tunnel_manager,
                )
                .await?;

            let reference_client = SourceReferenceClient::MySql {
                conn: &mut conn,
                include_system_schemas: false,
            };
            reference_client.get_source_references().await?
        }
        GenericSourceConnection::SqlServer(sql_server_source) => {
            let sql_server_connection = &sql_server_source.connection;
            let config = sql_server_connection
                .resolve_config(
                    &storage_configuration.connection_context.secrets_reader,
                    storage_configuration,
                    InTask::No,
                )
                .await?;
            let mut client = mz_sql_server_util::Client::connect(config).await?;

            let source_references = SourceReferenceClient::SqlServer {
                client: &mut client,
                database: sql_server_connection.database.clone().into(),
            }
            .get_source_references()
            .await?;
            source_references
        }
        GenericSourceConnection::LoadGenerator(load_gen_connection) => {
            let reference_client = SourceReferenceClient::LoadGenerator {
                generator: &load_gen_connection.load_generator,
            };
            reference_client.get_source_references().await?
        }
        GenericSourceConnection::Kafka(kafka_conn) => {
            let reference_client = SourceReferenceClient::Kafka {
                topic: &kafka_conn.topic,
            };
            reference_client.get_source_references().await?
        }
    };
    Ok(PurifiedStatement::PurifiedAlterSourceRefreshReferences {
        source_name: resolved_source_name,
        available_source_references: retrieved_source_references.available_source_references(),
    })
}

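/// Purifies a `CREATE TABLE .. FROM SOURCE` statement, resolving the single
/// requested external reference and filling in the table's columns,
/// constraints, and `DETAILS` option from the upstream schema.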
async fn purify_create_table_from_source(
    catalog: impl SessionCatalog,
    mut stmt: CreateTableFromSourceStatement<Aug>,
    storage_configuration: &StorageConfiguration,
) -> Result<PurifiedStatement, PlanError> {
    let scx = StatementContext::new(None, &catalog);
    let CreateTableFromSourceStatement {
        name: _,
        columns,
        constraints,
        source: source_name,
        if_not_exists: _,
        external_reference,
        format,
        envelope,
        include_metadata: _,
        with_options,
    } = &mut stmt;

    if matches!(columns, TableFromSourceColumns::Defined(_)) {
        sql_bail!("CREATE TABLE .. FROM SOURCE column definitions cannot be specified directly");
    }
    if !constraints.is_empty() {
        sql_bail!(
            "CREATE TABLE .. FROM SOURCE constraint definitions cannot be specified directly"
        );
    }

    let item = scx.get_item_by_resolved_name(source_name)?;

    let desc = match item.source_desc()? {
        Some(desc) => desc.clone().into_inline_connection(scx.catalog),
        None => {
            sql_bail!("cannot create a table from this type of source")
        }
    };
    let unresolved_source_name: UnresolvedItemName = source_name.full_item_name().clone().into();

1722 let crate::plan::statement::ddl::TableFromSourceOptionExtracted {
1723 text_columns,
1724 exclude_columns,
1725 retain_history: _,
1726 details,
1727 partition_by: _,
1728 seen: _,
1729 } = with_options.clone().try_into()?;
1730 assert_none!(details, "details cannot be explicitly set");
1731
1732 let qualified_text_columns = text_columns
1736 .iter()
1737 .map(|col| {
1738 UnresolvedItemName(
1739 external_reference
1740 .as_ref()
1741 .map(|er| er.0.iter().chain_one(col).map(|i| i.clone()).collect())
1742 .unwrap_or_else(|| vec![col.clone()]),
1743 )
1744 })
1745 .collect_vec();
1746 let qualified_exclude_columns = exclude_columns
1747 .iter()
1748 .map(|col| {
1749 UnresolvedItemName(
1750 external_reference
1751 .as_ref()
1752 .map(|er| er.0.iter().chain_one(col).map(|i| i.clone()).collect())
1753 .unwrap_or_else(|| vec![col.clone()]),
1754 )
1755 })
1756 .collect_vec();
1757
1758 let mut format_options = SourceFormatOptions::Default;
1760
1761 let retrieved_source_references: RetrievedSourceReferences;
1762
1763 let requested_references = external_reference.as_ref().map(|ref_name| {
1764 ExternalReferences::SubsetTables(vec![ExternalReferenceExport {
1765 reference: ref_name.clone(),
1766 alias: None,
1767 }])
1768 });
1769
1770 let purified_export = match desc.connection {
1773 GenericSourceConnection::Postgres(pg_source_connection) => {
1774 let pg_connection = &pg_source_connection.connection;
1776
1777 let client = pg_connection
1778 .validate(pg_source_connection.connection_id, storage_configuration)
1779 .await?;
1780
1781 let reference_client = SourceReferenceClient::Postgres {
1782 client: &client,
1783 publication: &pg_source_connection.publication,
1784 database: &pg_connection.database,
1785 };
1786 retrieved_source_references = reference_client.get_source_references().await?;
1787
1788 let postgres::PurifiedSourceExports {
1789 source_exports,
1790 normalized_text_columns: _,
1794 } = postgres::purify_source_exports(
1795 &client,
1796 &retrieved_source_references,
1797 &requested_references,
1798 qualified_text_columns,
1799 qualified_exclude_columns,
1800 &unresolved_source_name,
1801 &SourceReferencePolicy::Required,
1802 )
1803 .await?;
1804 let (_, purified_export) = source_exports.into_element();
1806 purified_export
1807 }
1808 GenericSourceConnection::MySql(mysql_source_connection) => {
1809 let mysql_connection = &mysql_source_connection.connection;
1810 let config = mysql_connection
1811 .config(
1812 &storage_configuration.connection_context.secrets_reader,
1813 storage_configuration,
1814 InTask::No,
1815 )
1816 .await?;
1817
1818 let mut conn = config
1819 .connect(
1820 "mysql purification",
1821 &storage_configuration.connection_context.ssh_tunnel_manager,
1822 )
1823 .await?;
1824
1825 let initial_gtid_set =
1828 mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?;
1829
1830 let reference_client = SourceReferenceClient::MySql {
1831 conn: &mut conn,
1832 include_system_schemas: mysql::references_system_schemas(&requested_references),
1833 };
1834 retrieved_source_references = reference_client.get_source_references().await?;
1835
1836 let mysql::PurifiedSourceExports {
1837 source_exports,
1838 normalized_text_columns: _,
1842 normalized_exclude_columns: _,
1843 } = mysql::purify_source_exports(
1844 &mut conn,
1845 &retrieved_source_references,
1846 &requested_references,
1847 qualified_text_columns,
1848 qualified_exclude_columns,
1849 &unresolved_source_name,
1850 initial_gtid_set,
1851 &SourceReferencePolicy::Required,
1852 )
1853 .await?;
1854 let (_, purified_export) = source_exports.into_element();
1856 purified_export
1857 }
1858 GenericSourceConnection::SqlServer(sql_server_source) => {
1859 let connection = sql_server_source.connection;
1860 let config = connection
1861 .resolve_config(
1862 &storage_configuration.connection_context.secrets_reader,
1863 storage_configuration,
1864 InTask::No,
1865 )
1866 .await?;
1867 let mut client = mz_sql_server_util::Client::connect(config).await?;
1868
1869 let database: Arc<str> = connection.database.into();
1870 let reference_client = SourceReferenceClient::SqlServer {
1871 client: &mut client,
1872 database: Arc::clone(&database),
1873 };
1874 retrieved_source_references = reference_client.get_source_references().await?;
1875 tracing::debug!(?retrieved_source_references, "got source references");
1876
1877 let timeout = mz_storage_types::sources::sql_server::MAX_LSN_WAIT
1878 .get(storage_configuration.config_set());
1879
1880 let purified_source_exports = sql_server::purify_source_exports(
1881 &*database,
1882 &mut client,
1883 &retrieved_source_references,
1884 &requested_references,
1885 &qualified_text_columns,
1886 &qualified_exclude_columns,
1887 &unresolved_source_name,
1888 timeout,
1889 &SourceReferencePolicy::Required,
1890 )
1891 .await?;
1892
1893 let (_, purified_export) = purified_source_exports.source_exports.into_element();
1895 purified_export
1896 }
1897 GenericSourceConnection::LoadGenerator(load_gen_connection) => {
1898 let reference_client = SourceReferenceClient::LoadGenerator {
1899 generator: &load_gen_connection.load_generator,
1900 };
1901 retrieved_source_references = reference_client.get_source_references().await?;
1902
1903 let requested_exports = retrieved_source_references
1904 .requested_source_exports(requested_references.as_ref(), &unresolved_source_name)?;
1905 let export = requested_exports.into_element();
1907 PurifiedSourceExport {
1908 external_reference: export.external_reference,
1909 details: PurifiedExportDetails::LoadGenerator {
1910 table: export
1911 .meta
1912 .load_generator_desc()
1913 .expect("is loadgen")
1914 .clone(),
1915 output: export
1916 .meta
1917 .load_generator_output()
1918 .expect("is loadgen")
1919 .clone(),
1920 },
1921 }
1922 }
1923 GenericSourceConnection::Kafka(kafka_conn) => {
1924 let reference_client = SourceReferenceClient::Kafka {
1925 topic: &kafka_conn.topic,
1926 };
1927 retrieved_source_references = reference_client.get_source_references().await?;
1928 let requested_exports = retrieved_source_references
1929 .requested_source_exports(requested_references.as_ref(), &unresolved_source_name)?;
1930 let export = requested_exports.into_element();
1932
1933 format_options = SourceFormatOptions::Kafka {
1934 topic: kafka_conn.topic.clone(),
1935 };
1936 PurifiedSourceExport {
1937 external_reference: export.external_reference,
1938 details: PurifiedExportDetails::Kafka {},
1939 }
1940 }
1941 };
1942
1943 purify_source_format(
1944 &catalog,
1945 format,
1946 &format_options,
1947 envelope,
1948 storage_configuration,
1949 )
1950 .await?;
1951
1952 *external_reference = Some(purified_export.external_reference.clone());
1955
1956 match &purified_export.details {
        PurifiedExportDetails::Postgres { .. } => {
            let mut unsupported_cols = vec![];
            let postgres::PostgresExportStatementValues {
                columns: gen_columns,
                constraints: gen_constraints,
                text_columns: gen_text_columns,
                exclude_columns: gen_exclude_columns,
                details: gen_details,
                external_reference: _,
            } = postgres::generate_source_export_statement_values(
                &scx,
                purified_export,
                &mut unsupported_cols,
            )?;
            if !unsupported_cols.is_empty() {
                unsupported_cols.sort();
                Err(PgSourcePurificationError::UnrecognizedTypes {
                    cols: unsupported_cols,
                })?;
            }

            if let Some(text_cols_option) = with_options
                .iter_mut()
                .find(|option| option.name == TableFromSourceOptionName::TextColumns)
            {
                match gen_text_columns {
                    Some(gen_text_columns) => {
                        text_cols_option.value = Some(WithOptionValue::Sequence(gen_text_columns))
                    }
                    None => soft_panic_or_log!(
                        "text_columns should be Some if text_cols_option is present"
                    ),
                }
            }
            if let Some(exclude_cols_option) = with_options
                .iter_mut()
                .find(|option| option.name == TableFromSourceOptionName::ExcludeColumns)
            {
                match gen_exclude_columns {
                    Some(gen_exclude_columns) => {
                        exclude_cols_option.value =
                            Some(WithOptionValue::Sequence(gen_exclude_columns))
                    }
                    None => soft_panic_or_log!(
                        "exclude_columns should be Some if exclude_cols_option is present"
                    ),
                }
            }
            match columns {
                TableFromSourceColumns::Defined(_) => unreachable!(),
                TableFromSourceColumns::NotSpecified => {
                    *columns = TableFromSourceColumns::Defined(gen_columns);
                    *constraints = gen_constraints;
                }
                TableFromSourceColumns::Named(_) => {
                    sql_bail!("columns cannot be named for Postgres sources")
                }
            }
            with_options.push(TableFromSourceOption {
                name: TableFromSourceOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    gen_details.into_proto().encode_to_vec(),
                )))),
            })
        }
        PurifiedExportDetails::MySql { .. } => {
            let mysql::MySqlExportStatementValues {
                columns: gen_columns,
                constraints: gen_constraints,
                text_columns: gen_text_columns,
                exclude_columns: gen_exclude_columns,
                details: gen_details,
                external_reference: _,
            } = mysql::generate_source_export_statement_values(&scx, purified_export)?;

            if let Some(text_cols_option) = with_options
                .iter_mut()
                .find(|option| option.name == TableFromSourceOptionName::TextColumns)
            {
                match gen_text_columns {
                    Some(gen_text_columns) => {
                        text_cols_option.value = Some(WithOptionValue::Sequence(gen_text_columns))
                    }
                    None => soft_panic_or_log!(
                        "text_columns should be Some if text_cols_option is present"
                    ),
                }
            }
            if let Some(exclude_cols_option) = with_options
                .iter_mut()
                .find(|option| option.name == TableFromSourceOptionName::ExcludeColumns)
            {
                match gen_exclude_columns {
                    Some(gen_exclude_columns) => {
                        exclude_cols_option.value =
                            Some(WithOptionValue::Sequence(gen_exclude_columns))
                    }
                    None => soft_panic_or_log!(
                        "exclude_columns should be Some if exclude_cols_option is present"
                    ),
                }
            }
            match columns {
                TableFromSourceColumns::Defined(_) => unreachable!(),
                TableFromSourceColumns::NotSpecified => {
                    *columns = TableFromSourceColumns::Defined(gen_columns);
                    *constraints = gen_constraints;
                }
                TableFromSourceColumns::Named(_) => {
                    sql_bail!("columns cannot be named for MySQL sources")
                }
            }
            with_options.push(TableFromSourceOption {
                name: TableFromSourceOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    gen_details.into_proto().encode_to_vec(),
                )))),
            })
        }
        PurifiedExportDetails::SqlServer { .. } => {
            let sql_server::SqlServerExportStatementValues {
                columns: gen_columns,
                constraints: gen_constraints,
                text_columns: gen_text_columns,
                excl_columns: gen_excl_columns,
                details: gen_details,
                external_reference: _,
            } = sql_server::generate_source_export_statement_values(&scx, purified_export)?;

            if let Some(text_cols_option) = with_options
                .iter_mut()
                .find(|opt| opt.name == TableFromSourceOptionName::TextColumns)
            {
                match gen_text_columns {
                    Some(gen_text_columns) => {
                        text_cols_option.value = Some(WithOptionValue::Sequence(gen_text_columns))
                    }
                    None => soft_panic_or_log!(
                        "text_columns should be Some if text_cols_option is present"
                    ),
                }
            }
            if let Some(exclude_cols_option) = with_options
                .iter_mut()
                .find(|opt| opt.name == TableFromSourceOptionName::ExcludeColumns)
            {
                match gen_excl_columns {
                    Some(gen_excl_columns) => {
                        exclude_cols_option.value =
                            Some(WithOptionValue::Sequence(gen_excl_columns))
                    }
                    None => soft_panic_or_log!(
                        "excl_columns should be Some if exclude_cols_option is present"
                    ),
                }
            }

            match columns {
                TableFromSourceColumns::NotSpecified => {
                    *columns = TableFromSourceColumns::Defined(gen_columns);
                    *constraints = gen_constraints;
                }
                TableFromSourceColumns::Named(_) => {
                    sql_bail!("columns cannot be named for SQL Server sources")
                }
                TableFromSourceColumns::Defined(_) => unreachable!(),
            }

            with_options.push(TableFromSourceOption {
                name: TableFromSourceOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    gen_details.into_proto().encode_to_vec(),
                )))),
            })
        }
        PurifiedExportDetails::LoadGenerator { .. } => {
            let (desc, output) = match purified_export.details {
                PurifiedExportDetails::LoadGenerator { table, output } => (table, output),
                _ => unreachable!("purified export details must be load generator"),
            };
            // `desc` is `None` for single-output load generators, whose schema
            // is defined by the source itself rather than per export.
            if let Some(desc) = desc {
                let (gen_columns, gen_constraints) = scx.relation_desc_into_table_defs(&desc)?;
                match columns {
                    TableFromSourceColumns::Defined(_) => unreachable!(),
                    TableFromSourceColumns::NotSpecified => {
                        *columns = TableFromSourceColumns::Defined(gen_columns);
                        *constraints = gen_constraints;
                    }
                    TableFromSourceColumns::Named(_) => {
                        sql_bail!("columns cannot be named for multi-output load generator sources")
                    }
                }
            }
            let details = SourceExportStatementDetails::LoadGenerator { output };
            with_options.push(TableFromSourceOption {
                name: TableFromSourceOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    details.into_proto().encode_to_vec(),
                )))),
            })
        }
        PurifiedExportDetails::Kafka {} => {
            let details = SourceExportStatementDetails::Kafka {};
            with_options.push(TableFromSourceOption {
                name: TableFromSourceOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    details.into_proto().encode_to_vec(),
                )))),
            })
        }
    };

    Ok(PurifiedStatement::PurifiedCreateTableFromSource { stmt })
}

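/// Source-type-specific context required when purifying a FORMAT specifier.
/// Only Kafka currently needs extra context: the topic, from which the
/// Confluent Schema Registry subject names are derived.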
enum SourceFormatOptions {
    Default,
    Kafka { topic: String },
}

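/// Purifies the FORMAT specifier of a source, resolving Confluent Schema
/// Registry references into inline seeds. KEY/VALUE specifiers are rejected
/// for every source type other than Kafka.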
async fn purify_source_format(
    catalog: &dyn SessionCatalog,
    format: &mut Option<FormatSpecifier<Aug>>,
    options: &SourceFormatOptions,
    envelope: &Option<SourceEnvelope>,
    storage_configuration: &StorageConfiguration,
) -> Result<(), PlanError> {
    if matches!(format, Some(FormatSpecifier::KeyValue { .. }))
        && !matches!(options, SourceFormatOptions::Kafka { .. })
    {
        sql_bail!("Kafka sources are the only source type that can provide KEY/VALUE formats")
    }

    match format.as_mut() {
        None => {}
        Some(FormatSpecifier::Bare(format)) => {
            purify_source_format_single(catalog, format, options, envelope, storage_configuration)
                .await?;
        }

        Some(FormatSpecifier::KeyValue { key, value: val }) => {
            purify_source_format_single(catalog, key, options, envelope, storage_configuration)
                .await?;
            purify_source_format_single(catalog, val, options, envelope, storage_configuration)
                .await?;
        }
    }
    Ok(())
}

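/// Purifies one bare format: Avro and Protobuf formats that reference the
/// schema registry have their schemas fetched and seeded; self-describing
/// formats pass through unchanged.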
async fn purify_source_format_single(
    catalog: &dyn SessionCatalog,
    format: &mut Format<Aug>,
    options: &SourceFormatOptions,
    envelope: &Option<SourceEnvelope>,
    storage_configuration: &StorageConfiguration,
) -> Result<(), PlanError> {
    match format {
        Format::Avro(schema) => match schema {
            AvroSchema::Csr { csr_connection } => {
                purify_csr_connection_avro(
                    catalog,
                    options,
                    csr_connection,
                    envelope,
                    storage_configuration,
                )
                .await?
            }
            AvroSchema::InlineSchema { .. } => {}
        },
        Format::Protobuf(schema) => match schema {
            ProtobufSchema::Csr { csr_connection } => {
                purify_csr_connection_proto(
                    catalog,
                    options,
                    csr_connection,
                    envelope,
                    storage_configuration,
                )
                .await?;
            }
            ProtobufSchema::InlineSchema { .. } => {}
        },
        Format::Bytes
        | Format::Regex(_)
        | Format::Json { .. }
        | Format::Text
        | Format::Csv { .. } => (),
    }
    Ok(())
}

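/// Generates `CREATE SUBSOURCE` statements for the given purified source
/// exports. All exports in `subsources` are expected to share one source type;
/// the first entry's details determine which generator is dispatched to.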
pub fn generate_subsource_statements(
    scx: &StatementContext,
    source_name: ResolvedItemName,
    subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
) -> Result<Vec<CreateSubsourceStatement<Aug>>, PlanError> {
    if subsources.is_empty() {
        return Ok(vec![]);
    }
    // All exports come from the same source type, so dispatch on the details
    // of an arbitrary one.
    let (_, purified_export) = subsources.iter().next().unwrap();

    let statements = match &purified_export.details {
        PurifiedExportDetails::Postgres { .. } => {
            crate::pure::postgres::generate_create_subsource_statements(
                scx,
                source_name,
                subsources,
            )?
        }
        PurifiedExportDetails::MySql { .. } => {
            crate::pure::mysql::generate_create_subsource_statements(scx, source_name, subsources)?
        }
        PurifiedExportDetails::SqlServer { .. } => {
            crate::pure::sql_server::generate_create_subsource_statements(
                scx,
                source_name,
                subsources,
            )?
        }
        PurifiedExportDetails::LoadGenerator { .. } => {
            let mut subsource_stmts = Vec::with_capacity(subsources.len());
            for (subsource_name, purified_export) in subsources {
                let (desc, output) = match purified_export.details {
                    PurifiedExportDetails::LoadGenerator { table, output } => (table, output),
                    _ => unreachable!("purified export details must be load generator"),
                };
                let desc =
                    desc.expect("subsources cannot be generated for single-output load generators");

                let (columns, table_constraints) = scx.relation_desc_into_table_defs(&desc)?;
                let details = SourceExportStatementDetails::LoadGenerator { output };
                let subsource = CreateSubsourceStatement {
                    name: subsource_name,
                    columns,
                    of_source: Some(source_name.clone()),
                    constraints: table_constraints,
                    if_not_exists: false,
                    with_options: vec![
                        CreateSubsourceOption {
                            name: CreateSubsourceOptionName::ExternalReference,
                            value: Some(WithOptionValue::UnresolvedItemName(
                                purified_export.external_reference,
                            )),
                        },
                        CreateSubsourceOption {
                            name: CreateSubsourceOptionName::Details,
                            value: Some(WithOptionValue::Value(Value::String(hex::encode(
                                details.into_proto().encode_to_vec(),
                            )))),
                        },
                    ],
                };
                subsource_stmts.push(subsource);
            }

            subsource_stmts
        }
        PurifiedExportDetails::Kafka { .. } => {
            // Kafka sources do not produce data-bearing subsources, so this
            // arm should be unreachable; the assert documents that invariant.
            assert!(
                subsources.is_empty(),
                "Kafka sources do not produce data-bearing subsources"
            );
            vec![]
        }
    };
    Ok(statements)
}

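/// Purifies a Protobuf schema registry connection by seeding it with compiled
/// schemas: the `<topic>-value` subject is required, the `<topic>-key` subject
/// is optional. Connections that already carry a seed are left untouched.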
async fn purify_csr_connection_proto(
    catalog: &dyn SessionCatalog,
    options: &SourceFormatOptions,
    csr_connection: &mut CsrConnectionProtobuf<Aug>,
    envelope: &Option<SourceEnvelope>,
    storage_configuration: &StorageConfiguration,
) -> Result<(), PlanError> {
    let SourceFormatOptions::Kafka { topic } = options else {
        sql_bail!("Confluent Schema Registry is only supported with Kafka sources")
    };

    let CsrConnectionProtobuf {
        seed,
        connection: CsrConnection {
            connection,
            options: _,
        },
    } = csr_connection;
    match seed {
        None => {
            let scx = StatementContext::new(None, &*catalog);

            let ccsr_connection = match scx.get_item_by_resolved_name(connection)?.connection()? {
                Connection::Csr(connection) => connection.clone().into_inline_connection(catalog),
                _ => sql_bail!("{} is not a schema registry connection", connection),
            };

            let ccsr_client = ccsr_connection
                .connect(storage_configuration, InTask::No)
                .await
                .map_err(|e| CsrPurificationError::ClientError(Arc::new(e)))?;

            let value = compile_proto(&format!("{}-value", topic), &ccsr_client).await?;
            // The key schema is optional for most envelopes, so a failure to
            // fetch it is swallowed here.
            let key = compile_proto(&format!("{}-key", topic), &ccsr_client)
                .await
                .ok();

            if matches!(envelope, Some(SourceEnvelope::Debezium)) && key.is_none() {
                sql_bail!("Key schema is required for ENVELOPE DEBEZIUM");
            }

            *seed = Some(CsrSeedProtobuf { value, key });
        }
        Some(_) => (),
    }

    Ok(())
}

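/// Purifies an Avro schema registry connection by seeding it with the key and
/// value schemas for the topic, resolved according to the configured reader
/// schema selection strategies. Connections that already carry a seed are
/// left untouched.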
async fn purify_csr_connection_avro(
    catalog: &dyn SessionCatalog,
    options: &SourceFormatOptions,
    csr_connection: &mut CsrConnectionAvro<Aug>,
    envelope: &Option<SourceEnvelope>,
    storage_configuration: &StorageConfiguration,
) -> Result<(), PlanError> {
    let SourceFormatOptions::Kafka { topic } = options else {
        sql_bail!("Confluent Schema Registry is only supported with Kafka sources")
    };

    let CsrConnectionAvro {
        connection: CsrConnection { connection, .. },
        seed,
        key_strategy,
        value_strategy,
    } = csr_connection;
    if seed.is_none() {
        let scx = StatementContext::new(None, &*catalog);
        let csr_connection = match scx.get_item_by_resolved_name(connection)?.connection()? {
            Connection::Csr(connection) => connection.clone().into_inline_connection(catalog),
            _ => sql_bail!("{} is not a schema registry connection", connection),
        };
        let ccsr_client = csr_connection
            .connect(storage_configuration, InTask::No)
            .await
            .map_err(|e| CsrPurificationError::ClientError(Arc::new(e)))?;

        let Schema {
            key_schema,
            value_schema,
        } = get_remote_csr_schema(
            &ccsr_client,
            key_strategy.clone().unwrap_or_default(),
            value_strategy.clone().unwrap_or_default(),
            topic,
        )
        .await?;
        if matches!(envelope, Some(SourceEnvelope::Debezium)) && key_schema.is_none() {
            sql_bail!("Key schema is required for ENVELOPE DEBEZIUM");
        }

        *seed = Some(CsrSeedAvro {
            key_schema,
            value_schema,
        })
    }

    Ok(())
}

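/// An Avro key/value schema pair fetched from a Confluent Schema Registry; the
/// key schema is absent when the topic has no keyed subject.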
#[derive(Debug)]
pub struct Schema {
    pub key_schema: Option<String>,
    pub value_schema: String,
}

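/// Resolves a schema for `subject` according to `strategy`: `Latest` fetches
/// the subject's latest version, `Inline` uses the provided schema text, and
/// `ById` fetches a specific schema id. Returns `Ok(None)` when the subject,
/// version, or id does not exist.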
async fn get_schema_with_strategy(
    client: &Client,
    strategy: ReaderSchemaSelectionStrategy,
    subject: &str,
) -> Result<Option<String>, PlanError> {
    match strategy {
        ReaderSchemaSelectionStrategy::Latest => {
            match client.get_schema_by_subject(subject).await {
                Ok(CcsrSchema { raw, .. }) => Ok(Some(raw)),
                Err(GetBySubjectError::SubjectNotFound)
                | Err(GetBySubjectError::VersionNotFound(_)) => Ok(None),
                Err(e) => Err(PlanError::FetchingCsrSchemaFailed {
                    schema_lookup: format!("subject {}", subject.quoted()),
                    cause: Arc::new(e),
                }),
            }
        }
        ReaderSchemaSelectionStrategy::Inline(raw) => Ok(Some(raw)),
        ReaderSchemaSelectionStrategy::ById(id) => match client.get_schema_by_id(id).await {
            Ok(CcsrSchema { raw, .. }) => Ok(Some(raw)),
            Err(GetByIdError::SchemaNotFound) => Ok(None),
            Err(e) => Err(PlanError::FetchingCsrSchemaFailed {
                schema_lookup: format!("ID {}", id),
                cause: Arc::new(e),
            }),
        },
    }
}

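/// Fetches the value schema (required) and the key schema (optional) for
/// `topic`, using the conventional `<topic>-value` and `<topic>-key` subjects.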
async fn get_remote_csr_schema(
    ccsr_client: &mz_ccsr::Client,
    key_strategy: ReaderSchemaSelectionStrategy,
    value_strategy: ReaderSchemaSelectionStrategy,
    topic: &str,
) -> Result<Schema, PlanError> {
    let value_schema_name = format!("{}-value", topic);
    let value_schema =
        get_schema_with_strategy(ccsr_client, value_strategy, &value_schema_name).await?;
    let value_schema = value_schema.ok_or_else(|| anyhow!("No value schema found"))?;
    let key_schema_name = format!("{}-key", topic);
    let key_schema = get_schema_with_strategy(ccsr_client, key_strategy, &key_schema_name).await?;
    Ok(Schema {
        key_schema,
        value_schema,
    })
}

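/// Fetches `subject_name` and its schema references from the registry,
/// compiles them into a Protobuf file descriptor set, and returns the encoded
/// descriptors together with the name of the schema's single message type.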
async fn compile_proto(
    subject_name: &str,
    ccsr_client: &Client,
) -> Result<CsrSeedProtobufSchema, PlanError> {
    let (primary_subject, dependency_subjects) = ccsr_client
        .get_subject_and_references(subject_name)
        .await
        .map_err(|e| PlanError::FetchingCsrSchemaFailed {
            schema_lookup: format!("subject {}", subject_name.quoted()),
            cause: Arc::new(e),
        })?;

    // Compile the .proto files into a file descriptor set.
    let mut source_tree = VirtualSourceTree::new();
    for subject in iter::once(&primary_subject).chain(dependency_subjects.iter()) {
        source_tree.as_mut().add_file(
            Path::new(&subject.name),
            subject.schema.raw.as_bytes().to_vec(),
        );
    }
    let mut db = SourceTreeDescriptorDatabase::new(source_tree.as_mut());
    let fds = db
        .as_mut()
        .build_file_descriptor_set(&[Path::new(&primary_subject.name)])
        .map_err(|cause| PlanError::InvalidProtobufSchema { cause })?;

    // Ensure the primary file contains exactly one message to use.
    let primary_fd = fds.file(0);
    let message_name = match primary_fd.message_type_size() {
        1 => String::from_utf8_lossy(primary_fd.message_type(0).name()).into_owned(),
        0 => bail_unsupported!(29603, "Protobuf schemas with no messages"),
        _ => bail_unsupported!(29603, "Protobuf schemas with multiple messages"),
    };

    // Encode the file descriptor set into a SQL byte string.
    let bytes = &fds
        .serialize()
        .map_err(|cause| PlanError::InvalidProtobufSchema { cause })?;
    let mut schema = String::new();
    strconv::format_bytes(&mut schema, bytes);

    Ok(CsrSeedProtobufSchema {
        schema,
        message_name,
    })
}

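// The well-known name and schema of the `mz_now()` function.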
const MZ_NOW_NAME: &str = "mz_now";
const MZ_NOW_SCHEMA: &str = "mz_catalog";

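/// Purifies the WITH options of a CREATE MATERIALIZED VIEW statement:
/// - `REFRESH AT CREATION` becomes `REFRESH AT mz_now()`;
/// - a `REFRESH EVERY` without `ALIGNED TO` is aligned to `mz_now()`;
/// - `mz_now()` calls inside REFRESH options are replaced by the chosen
///   `mz_now` timestamp, cast to `mz_timestamp`;
/// - if no REFRESH option is present, a default `REFRESH ON COMMIT` is added.
///
/// `resolved_ids` is adjusted for the ids that appear (`mz_timestamp`) or
/// disappear (`mz_now`) during the rewrite. As an illustrative example, with a
/// chosen timestamp of `1700000000000`, `REFRESH EVERY '1 day'` becomes
/// roughly `REFRESH EVERY '1 day' ALIGNED TO 1700000000000::mz_timestamp`.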
pub fn purify_create_materialized_view_options(
    catalog: impl SessionCatalog,
    mz_now: Option<Timestamp>,
    cmvs: &mut CreateMaterializedViewStatement<Aug>,
    resolved_ids: &mut ResolvedIds,
) {
    // Resolve `mz_now()` once, so we can both construct calls to it and track
    // its id in `resolved_ids`.
    let (mz_now_id, mz_now_expr) = {
        let item = catalog
            .resolve_function(&PartialItemName {
                database: None,
                schema: Some(MZ_NOW_SCHEMA.to_string()),
                item: MZ_NOW_NAME.to_string(),
            })
            .expect("we should be able to resolve mz_now");
        (
            item.id(),
            Expr::Function(Function {
                name: ResolvedItemName::Item {
                    id: item.id(),
                    qualifiers: item.name().qualifiers.clone(),
                    full_name: catalog.resolve_full_name(item.name()),
                    print_id: false,
                    version: RelationVersionSelector::Latest,
                },
                args: FunctionArgs::Args {
                    args: Vec::new(),
                    order_by: Vec::new(),
                },
                filter: None,
                over: None,
                distinct: false,
            }),
        )
    };
    // Resolve the `mz_timestamp` type, which purified `mz_now()` calls are
    // cast to.
    let (mz_timestamp_id, mz_timestamp_type) = {
        let item = catalog.get_system_type("mz_timestamp");
        let full_name = catalog.resolve_full_name(item.name());
        (
            item.id(),
            ResolvedDataType::Named {
                id: item.id(),
                qualifiers: item.name().qualifiers.clone(),
                full_name,
                modifiers: vec![],
                print_id: true,
            },
        )
    };

    let mut introduced_mz_timestamp = false;

    for option in cmvs.with_options.iter_mut() {
        // Purify `REFRESH AT CREATION` into `REFRESH AT mz_now()`.
        if matches!(
            option.value,
            Some(WithOptionValue::Refresh(RefreshOptionValue::AtCreation))
        ) {
            option.value = Some(WithOptionValue::Refresh(RefreshOptionValue::At(
                RefreshAtOptionValue {
                    time: mz_now_expr.clone(),
                },
            )));
        }

        // Default a `REFRESH EVERY` without an `ALIGNED TO` to be aligned to
        // `mz_now()`.
        if let Some(WithOptionValue::Refresh(RefreshOptionValue::Every(
            RefreshEveryOptionValue { aligned_to, .. },
        ))) = &mut option.value
        {
            if aligned_to.is_none() {
                *aligned_to = Some(mz_now_expr.clone());
            }
        }

        // Substitute `mz_now()` with the timestamp chosen for this statement.
        match &mut option.value {
            Some(WithOptionValue::Refresh(RefreshOptionValue::At(RefreshAtOptionValue {
                time,
            }))) => {
                let mut visitor = MzNowPurifierVisitor::new(mz_now, mz_timestamp_type.clone());
                visitor.visit_expr_mut(time);
                introduced_mz_timestamp |= visitor.introduced_mz_timestamp;
            }
            Some(WithOptionValue::Refresh(RefreshOptionValue::Every(
                RefreshEveryOptionValue {
                    interval: _,
                    aligned_to: Some(aligned_to),
                },
            ))) => {
                let mut visitor = MzNowPurifierVisitor::new(mz_now, mz_timestamp_type.clone());
                visitor.visit_expr_mut(aligned_to);
                introduced_mz_timestamp |= visitor.introduced_mz_timestamp;
            }
            _ => {}
        }
    }

    // If no REFRESH option is given, default to `REFRESH ON COMMIT`.
    if !cmvs.with_options.iter().any(|o| {
        matches!(
            o,
            MaterializedViewOption {
                value: Some(WithOptionValue::Refresh(..)),
                ..
            }
        )
    }) {
        cmvs.with_options.push(MaterializedViewOption {
            name: MaterializedViewOptionName::Refresh,
            value: Some(WithOptionValue::Refresh(RefreshOptionValue::OnCommit)),
        })
    }

    // The rewrites above can introduce casts to `mz_timestamp` and can remove
    // the last reference to `mz_now`, so keep `resolved_ids` consistent.
    if introduced_mz_timestamp {
        resolved_ids.add_item(mz_timestamp_id);
    }
    let mut visitor = ExprContainsTemporalVisitor::new();
    visitor.visit_create_materialized_view_statement(cmvs);
    if !visitor.contains_temporal {
        resolved_ids.remove_item(&mz_now_id);
    }
}

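/// Reports whether the given materialized view option contains a temporal
/// expression such as `mz_now()`, or will be purified to contain one.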
pub fn materialized_view_option_contains_temporal(mvo: &MaterializedViewOption<Aug>) -> bool {
    match &mvo.value {
        Some(WithOptionValue::Refresh(RefreshOptionValue::At(RefreshAtOptionValue { time }))) => {
            let mut visitor = ExprContainsTemporalVisitor::new();
            visitor.visit_expr(time);
            visitor.contains_temporal
        }
        Some(WithOptionValue::Refresh(RefreshOptionValue::Every(RefreshEveryOptionValue {
            interval: _,
            aligned_to: Some(aligned_to),
        }))) => {
            let mut visitor = ExprContainsTemporalVisitor::new();
            visitor.visit_expr(aligned_to);
            visitor.contains_temporal
        }
        Some(WithOptionValue::Refresh(RefreshOptionValue::Every(RefreshEveryOptionValue {
            interval: _,
            aligned_to: None,
        }))) => {
            // A missing `ALIGNED TO` is defaulted to `mz_now()` during
            // purification, so it counts as temporal.
            true
        }
        Some(WithOptionValue::Refresh(RefreshOptionValue::AtCreation)) => {
            // `REFRESH AT CREATION` is purified to `REFRESH AT mz_now()`.
            true
        }
        _ => false,
    }
}

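/// Determines whether an AST fragment involves `mz_now()`.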
struct ExprContainsTemporalVisitor {
    /// Whether any visited expression contains `mz_now()`.
    pub contains_temporal: bool,
}

impl ExprContainsTemporalVisitor {
    pub fn new() -> ExprContainsTemporalVisitor {
        ExprContainsTemporalVisitor {
            contains_temporal: false,
        }
    }
}

impl Visit<'_, Aug> for ExprContainsTemporalVisitor {
    fn visit_function(&mut self, func: &Function<Aug>) {
        self.contains_temporal |= func.name.full_item_name().item == MZ_NOW_NAME;
        visit_function(self, func);
    }
}

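/// Replaces `mz_now()` calls with the timestamp chosen for the statement, cast
/// to `mz_timestamp`, recording whether any such cast was introduced.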
struct MzNowPurifierVisitor {
    pub mz_now: Option<Timestamp>,
    pub mz_timestamp_type: ResolvedDataType,
    pub introduced_mz_timestamp: bool,
}

impl MzNowPurifierVisitor {
    pub fn new(
        mz_now: Option<Timestamp>,
        mz_timestamp_type: ResolvedDataType,
    ) -> MzNowPurifierVisitor {
        MzNowPurifierVisitor {
            mz_now,
            mz_timestamp_type,
            introduced_mz_timestamp: false,
        }
    }
}

impl VisitMut<'_, Aug> for MzNowPurifierVisitor {
    fn visit_expr_mut(&mut self, expr: &'_ mut Expr<Aug>) {
        match expr {
            Expr::Function(Function {
                name:
                    ResolvedItemName::Item {
                        full_name: FullItemName { item, .. },
                        ..
                    },
                ..
            }) if item.as_str() == MZ_NOW_NAME => {
                let mz_now = self.mz_now.expect(
                    "we should have chosen a timestamp if the expression contains mz_now()",
                );
                // Substitute the chosen timestamp, cast to `mz_timestamp` so
                // the expression keeps the type that `mz_now()` had.
                *expr = Expr::Cast {
                    expr: Box::new(Expr::Value(Value::Number(mz_now.to_string()))),
                    data_type: self.mz_timestamp_type.clone(),
                };
                self.introduced_mz_timestamp = true;
            }
            _ => visit_expr_mut(self, expr),
        }
    }
}