use std::collections::{BTreeMap, BTreeSet};
use std::fmt;
use std::iter;
use std::path::Path;
use std::sync::Arc;

use anyhow::anyhow;
use itertools::Itertools;
use mz_ccsr::{Client, GetByIdError, GetBySubjectError, Schema as CcsrSchema};
use mz_controller_types::ClusterId;
use mz_kafka_util::client::MzClientContext;
use mz_mysql_util::MySqlTableDesc;
use mz_ore::collections::CollectionExt;
use mz_ore::error::ErrorExt;
use mz_ore::future::InTask;
use mz_ore::iter::IteratorExt;
use mz_ore::str::StrExt;
use mz_ore::{assert_none, soft_panic_or_log};
use mz_postgres_util::desc::PostgresTableDesc;
use mz_proto::RustType;
use mz_repr::{CatalogItemId, RelationDesc, RelationVersionSelector, Timestamp, strconv};
use mz_sql_parser::ast::display::AstDisplay;
use mz_sql_parser::ast::visit::{Visit, visit_function};
use mz_sql_parser::ast::visit_mut::{VisitMut, visit_expr_mut};
use mz_sql_parser::ast::{
    AlterSourceAction, AlterSourceAddSubsourceOptionName, AlterSourceStatement, AvroDocOn,
    ColumnName, CreateMaterializedViewStatement, CreateSinkConnection, CreateSinkOptionName,
    CreateSinkStatement, CreateSourceOptionName, CreateSubsourceOption, CreateSubsourceOptionName,
    CreateTableFromSourceStatement, CsrConfigOption, CsrConfigOptionName, CsrConnection,
    CsrSeedAvro, CsrSeedProtobuf, CsrSeedProtobufSchema, DeferredItemName, DocOnIdentifier,
    DocOnSchema, Expr, Function, FunctionArgs, Ident, KafkaSourceConfigOption,
    KafkaSourceConfigOptionName, LoadGenerator, LoadGeneratorOption, LoadGeneratorOptionName,
    MaterializedViewOption, MaterializedViewOptionName, MySqlConfigOption, MySqlConfigOptionName,
    PgConfigOption, PgConfigOptionName, RawItemName, ReaderSchemaSelectionStrategy,
    RefreshAtOptionValue, RefreshEveryOptionValue, RefreshOptionValue, SourceEnvelope,
    SqlServerConfigOption, SqlServerConfigOptionName, Statement, TableFromSourceColumns,
    TableFromSourceOption, TableFromSourceOptionName, UnresolvedItemName,
};
use mz_sql_server_util::desc::SqlServerTableDesc;
use mz_storage_types::configuration::StorageConfiguration;
use mz_storage_types::connections::Connection;
use mz_storage_types::connections::inline::IntoInlineConnection;
use mz_storage_types::errors::ContextCreationError;
use mz_storage_types::sources::load_generator::LoadGeneratorOutput;
use mz_storage_types::sources::mysql::MySqlSourceDetails;
use mz_storage_types::sources::postgres::PostgresSourcePublicationDetails;
use mz_storage_types::sources::{
    GenericSourceConnection, SourceDesc, SourceExportStatementDetails, SqlServerSourceExtras,
};
use prost::Message;
use protobuf_native::MessageLite;
use protobuf_native::compiler::{SourceTreeDescriptorDatabase, VirtualSourceTree};
use rdkafka::admin::AdminClient;
use references::{RetrievedSourceReferences, SourceReferenceClient};
use uuid::Uuid;

use crate::ast::{
    AlterSourceAddSubsourceOption, AvroSchema, CreateSourceConnection, CreateSourceStatement,
    CreateSubsourceStatement, CsrConnectionAvro, CsrConnectionProtobuf, ExternalReferenceExport,
    ExternalReferences, Format, FormatSpecifier, ProtobufSchema, Value, WithOptionValue,
};
use crate::catalog::{CatalogItemType, SessionCatalog};
use crate::kafka_util::{KafkaSinkConfigOptionExtracted, KafkaSourceConfigOptionExtracted};
use crate::names::{
    Aug, FullItemName, PartialItemName, ResolvedColumnReference, ResolvedDataType, ResolvedIds,
    ResolvedItemName,
};
use crate::plan::error::PlanError;
use crate::plan::statement::ddl::load_generator_ast_to_generator;
use crate::plan::{SourceReferences, StatementContext};
use crate::pure::error::{IcebergSinkPurificationError, SqlServerSourcePurificationError};
use crate::session::vars::ENABLE_SQL_SERVER_SOURCE;
use crate::{kafka_util, normalize};

use self::error::{
    CsrPurificationError, KafkaSinkPurificationError, KafkaSourcePurificationError,
    LoadGeneratorSourcePurificationError, MySqlSourcePurificationError, PgSourcePurificationError,
};

pub(crate) mod error;
mod references;

pub mod mysql;
pub mod postgres;
pub mod sql_server;

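/// A source export (e.g. a subsource or table) requested during purification:
/// the upstream object it references, the name it should be created under in
/// Materialize, and connection-specific metadata `T`.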
pub(crate) struct RequestedSourceExport<T> {
    external_reference: UnresolvedItemName,
    name: UnresolvedItemName,
    meta: T,
}

impl<T: fmt::Debug> fmt::Debug for RequestedSourceExport<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("RequestedSourceExport")
            .field("external_reference", &self.external_reference)
            .field("name", &self.name)
            .field("meta", &self.meta)
            .finish()
    }
}

impl<T> RequestedSourceExport<T> {
    fn change_meta<F>(self, new_meta: F) -> RequestedSourceExport<F> {
        RequestedSourceExport {
            external_reference: self.external_reference,
            name: self.name,
            meta: new_meta,
        }
    }
}

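/// Generates a name for a source export by replacing the item part of
/// `source_name` with `subsource_name`, keeping the source's database and
/// schema qualification.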
fn source_export_name_gen(
    source_name: &UnresolvedItemName,
    subsource_name: &str,
) -> Result<UnresolvedItemName, PlanError> {
    let mut partial = normalize::unresolved_item_name(source_name.clone())?;
    partial.item = subsource_name.to_string();
    Ok(UnresolvedItemName::from(partial))
}

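/// Validates that the requested source exports neither collide on their
/// Materialize-side names nor reference the same upstream object more than
/// once.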
fn validate_source_export_names<T>(
    requested_source_exports: &[RequestedSourceExport<T>],
) -> Result<(), PlanError> {
    if let Some(name) = requested_source_exports
        .iter()
        .map(|subsource| &subsource.name)
        .duplicates()
        .next()
        .cloned()
    {
        let mut upstream_references: Vec<_> = requested_source_exports
            .iter()
            .filter_map(|subsource| {
                if subsource.name == name {
                    Some(subsource.external_reference.clone())
                } else {
                    None
                }
            })
            .collect();

        upstream_references.sort();

        Err(PlanError::SubsourceNameConflict {
            name,
            upstream_references,
        })?;
    }

    if let Some(name) = requested_source_exports
        .iter()
        .map(|export| &export.external_reference)
        .duplicates()
        .next()
        .cloned()
    {
        let mut target_names: Vec<_> = requested_source_exports
            .iter()
            .filter_map(|export| {
                if export.external_reference == name {
                    Some(export.name.clone())
                } else {
                    None
                }
            })
            .collect();

        target_names.sort();

        Err(PlanError::SubsourceDuplicateReference { name, target_names })?;
    }

    Ok(())
}

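/// The output of statement purification: a statement rewritten to be free of
/// dependencies on external state, plus any additional statements or metadata
/// derived while contacting external systems.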
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PurifiedStatement {
    PurifiedCreateSource {
        create_progress_subsource_stmt: Option<CreateSubsourceStatement<Aug>>,
        create_source_stmt: CreateSourceStatement<Aug>,
        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
        available_source_references: SourceReferences,
    },
    PurifiedAlterSource {
        alter_source_stmt: AlterSourceStatement<Aug>,
    },
    PurifiedAlterSourceAddSubsources {
        source_name: ResolvedItemName,
        options: Vec<AlterSourceAddSubsourceOption<Aug>>,
        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
    },
    PurifiedAlterSourceRefreshReferences {
        source_name: ResolvedItemName,
        available_source_references: SourceReferences,
    },
    PurifiedCreateSink(CreateSinkStatement<Aug>),
    PurifiedCreateTableFromSource {
        stmt: CreateTableFromSourceStatement<Aug>,
    },
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PurifiedSourceExport {
    pub external_reference: UnresolvedItemName,
    pub details: PurifiedExportDetails,
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PurifiedExportDetails {
    MySql {
        table: MySqlTableDesc,
        text_columns: Option<Vec<Ident>>,
        exclude_columns: Option<Vec<Ident>>,
        initial_gtid_set: String,
    },
    Postgres {
        table: PostgresTableDesc,
        text_columns: Option<Vec<Ident>>,
        exclude_columns: Option<Vec<Ident>>,
    },
    SqlServer {
        table: SqlServerTableDesc,
        text_columns: Option<Vec<Ident>>,
        excl_columns: Option<Vec<Ident>>,
        capture_instance: Arc<str>,
        initial_lsn: mz_sql_server_util::cdc::Lsn,
    },
    Kafka {},
    LoadGenerator {
        table: Option<RelationDesc>,
        output: LoadGeneratorOutput,
    },
}

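/// Purifies a statement by fetching any external state it depends on up front
/// (e.g. schemas from a schema registry or table metadata from an upstream
/// database), so that planning itself requires no external access.
///
/// Returns the purification result together with the cluster the statement is
/// associated with, if any.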
pub async fn purify_statement(
    catalog: impl SessionCatalog,
    now: u64,
    stmt: Statement<Aug>,
    storage_configuration: &StorageConfiguration,
) -> (Result<PurifiedStatement, PlanError>, Option<ClusterId>) {
    match stmt {
        Statement::CreateSource(stmt) => {
            let cluster_id = stmt.in_cluster.as_ref().map(|cluster| cluster.id.clone());
            (
                purify_create_source(catalog, now, stmt, storage_configuration).await,
                cluster_id,
            )
        }
        Statement::AlterSource(stmt) => (
            purify_alter_source(catalog, stmt, storage_configuration).await,
            None,
        ),
        Statement::CreateSink(stmt) => {
            let cluster_id = stmt.in_cluster.as_ref().map(|cluster| cluster.id.clone());
            (
                purify_create_sink(catalog, stmt, storage_configuration).await,
                cluster_id,
            )
        }
        Statement::CreateTableFromSource(stmt) => (
            purify_create_table_from_source(catalog, stmt, storage_configuration).await,
            None,
        ),
        o => unreachable!("{:?} does not need to be purified", o),
    }
}

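/// Injects `DOC ON` options into the Avro formats of a `CREATE SINK`
/// statement, deriving them from catalog comments on the sink's upstream
/// objects and their columns. User-provided `DOC ON` options take precedence
/// over derived ones.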
pub(crate) fn purify_create_sink_avro_doc_on_options(
    catalog: &dyn SessionCatalog,
    from_id: CatalogItemId,
    format: &mut Option<FormatSpecifier<Aug>>,
) -> Result<(), PlanError> {
    let from = catalog.get_item(&from_id);
    let object_ids = from
        .references()
        .items()
        .copied()
        .chain_one(from.id())
        .collect::<Vec<_>>();

    let mut avro_format_options = vec![];
    for_each_format(format, |doc_on_schema, fmt| match fmt {
        Format::Avro(AvroSchema::InlineSchema { .. })
        | Format::Bytes
        | Format::Csv { .. }
        | Format::Json { .. }
        | Format::Protobuf(..)
        | Format::Regex(..)
        | Format::Text => (),
        Format::Avro(AvroSchema::Csr {
            csr_connection: CsrConnectionAvro { connection, .. },
        }) => {
            avro_format_options.push((doc_on_schema, &mut connection.options));
        }
    });

    for (for_schema, options) in avro_format_options {
        let user_provided_comments = options
            .iter()
            .filter_map(|CsrConfigOption { name, .. }| match name {
                CsrConfigOptionName::AvroDocOn(doc_on) => Some(doc_on.clone()),
                _ => None,
            })
            .collect::<BTreeSet<_>>();

        for object_id in &object_ids {
            let item = catalog
                .get_item(object_id)
                .at_version(RelationVersionSelector::Latest);
            let full_name = catalog.resolve_full_name(item.name());
            let full_resolved_name = ResolvedItemName::Item {
                id: *object_id,
                qualifiers: item.name().qualifiers.clone(),
                full_name: full_name.clone(),
                print_id: !matches!(
                    item.item_type(),
                    CatalogItemType::Func | CatalogItemType::Type
                ),
                version: RelationVersionSelector::Latest,
            };

            if let Some(comments_map) = catalog.get_item_comments(object_id) {
                let doc_on_item_key = AvroDocOn {
                    identifier: DocOnIdentifier::Type(full_resolved_name.clone()),
                    for_schema,
                };
                if !user_provided_comments.contains(&doc_on_item_key) {
                    if let Some(root_comment) = comments_map.get(&None) {
                        options.push(CsrConfigOption {
                            name: CsrConfigOptionName::AvroDocOn(doc_on_item_key),
                            value: Some(mz_sql_parser::ast::WithOptionValue::Value(Value::String(
                                root_comment.clone(),
                            ))),
                        });
                    }
                }

                let column_descs: Result<Option<RelationDesc>, PlanError> = match item.item_type() {
                    CatalogItemType::Type => item
                        .type_details()
                        .and_then(|details| details.typ.desc(catalog).transpose())
                        .transpose(),
                    _ => item
                        .desc(&full_name)
                        .map(|desc| Some(desc.into_owned()))
                        .map_err(Into::into),
                };

                if let Ok(Some(desc)) = column_descs {
                    for (pos, column_name) in desc.iter_names().enumerate() {
                        if let Some(comment_str) = comments_map.get(&Some(pos + 1)) {
                            let doc_on_column_key = AvroDocOn {
                                identifier: DocOnIdentifier::Column(ColumnName {
                                    relation: full_resolved_name.clone(),
                                    column: ResolvedColumnReference::Column {
                                        name: column_name.to_owned(),
                                        index: pos,
                                    },
                                }),
                                for_schema,
                            };
                            if !user_provided_comments.contains(&doc_on_column_key) {
                                options.push(CsrConfigOption {
                                    name: CsrConfigOptionName::AvroDocOn(doc_on_column_key),
                                    value: Some(mz_sql_parser::ast::WithOptionValue::Value(
                                        Value::String(comment_str.clone()),
                                    )),
                                });
                            }
                        }
                    }
                }
            }
        }
    }

    Ok(())
}

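/// Purifies a `CREATE SINK` statement: validates its `WITH` options, checks
/// that the referenced connections are usable (e.g. the Kafka cluster,
/// Iceberg catalog, or schema registry is reachable), and fills in derived
/// Avro `DOC ON` options.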
async fn purify_create_sink(
    catalog: impl SessionCatalog,
    mut create_sink_stmt: CreateSinkStatement<Aug>,
    storage_configuration: &StorageConfiguration,
) -> Result<PurifiedStatement, PlanError> {
    let CreateSinkStatement {
        connection,
        format,
        with_options,
        name: _,
        in_cluster: _,
        if_not_exists: _,
        from,
        envelope: _,
    } = &mut create_sink_stmt;

    const USER_ALLOWED_WITH_OPTIONS: &[CreateSinkOptionName] = &[CreateSinkOptionName::Snapshot];

    if let Some(op) = with_options
        .iter()
        .find(|op| !USER_ALLOWED_WITH_OPTIONS.contains(&op.name))
    {
        sql_bail!(
            "CREATE SINK...WITH ({}..) is not allowed",
            op.name.to_ast_string_simple(),
        )
    }

    match &connection {
        CreateSinkConnection::Kafka {
            connection,
            options,
            key: _,
            headers: _,
        } => {
            let scx = StatementContext::new(None, &catalog);
            let connection = {
                let item = scx.get_item_by_resolved_name(connection)?;
                match item.connection()? {
                    Connection::Kafka(connection) => {
                        connection.clone().into_inline_connection(scx.catalog)
                    }
                    _ => sql_bail!(
                        "{} is not a kafka connection",
                        scx.catalog.resolve_full_name(item.name())
                    ),
                }
            };

            let extracted_options: KafkaSinkConfigOptionExtracted = options.clone().try_into()?;

            if extracted_options.legacy_ids == Some(true) {
                sql_bail!("LEGACY IDs option is not supported");
            }

            let client: AdminClient<_> = connection
                .create_with_context(
                    storage_configuration,
                    MzClientContext::default(),
                    &BTreeMap::new(),
                    InTask::No,
                )
                .await
                .map_err(|e| KafkaSinkPurificationError::AdminClientError(Arc::new(e)))?;

            let metadata = client
                .inner()
                .fetch_metadata(
                    None,
                    storage_configuration
                        .parameters
                        .kafka_timeout_config
                        .fetch_metadata_timeout,
                )
                .map_err(|e| {
                    KafkaSinkPurificationError::AdminClientError(Arc::new(
                        ContextCreationError::KafkaError(e),
                    ))
                })?;

            if metadata.brokers().is_empty() {
                Err(KafkaSinkPurificationError::ZeroBrokers)?;
            }
        }
        CreateSinkConnection::Iceberg {
            connection,
            aws_connection,
            ..
        } => {
            let scx = StatementContext::new(None, &catalog);
            let connection = {
                let item = scx.get_item_by_resolved_name(connection)?;
                match item.connection()? {
                    Connection::IcebergCatalog(connection) => {
                        connection.clone().into_inline_connection(scx.catalog)
                    }
                    _ => sql_bail!(
                        "{} is not an iceberg connection",
                        scx.catalog.resolve_full_name(item.name())
                    ),
                }
            };

            let aws_conn_id = aws_connection.item_id();

            let aws_connection = {
                let item = scx.get_item_by_resolved_name(aws_connection)?;
                match item.connection()? {
                    Connection::Aws(aws_connection) => aws_connection.clone(),
                    _ => sql_bail!(
                        "{} is not an aws connection",
                        scx.catalog.resolve_full_name(item.name())
                    ),
                }
            };

            let _catalog = connection
                .connect(storage_configuration, InTask::No)
                .await
                .map_err(|e| IcebergSinkPurificationError::CatalogError(Arc::new(e)))?;

            let sdk_config = aws_connection
                .load_sdk_config(
                    &storage_configuration.connection_context,
                    aws_conn_id.clone(),
                    InTask::No,
                )
                .await
                .map_err(|e| IcebergSinkPurificationError::AwsSdkContextError(Arc::new(e)))?;

            let sts_client = aws_sdk_sts::Client::new(&sdk_config);
            let _ = sts_client.get_caller_identity().send().await.map_err(|e| {
                IcebergSinkPurificationError::StsIdentityError(Arc::new(e.into_service_error()))
            })?;
        }
    }

    let mut csr_connection_ids = BTreeSet::new();
    for_each_format(format, |_, fmt| match fmt {
        Format::Avro(AvroSchema::InlineSchema { .. })
        | Format::Bytes
        | Format::Csv { .. }
        | Format::Json { .. }
        | Format::Protobuf(ProtobufSchema::InlineSchema { .. })
        | Format::Regex(..)
        | Format::Text => (),
        Format::Avro(AvroSchema::Csr {
            csr_connection: CsrConnectionAvro { connection, .. },
        })
        | Format::Protobuf(ProtobufSchema::Csr {
            csr_connection: CsrConnectionProtobuf { connection, .. },
        }) => {
            csr_connection_ids.insert(*connection.connection.item_id());
        }
    });

    let scx = StatementContext::new(None, &catalog);
    for csr_connection_id in csr_connection_ids {
        let connection = {
            let item = scx.get_item(&csr_connection_id);
            match item.connection()? {
                Connection::Csr(connection) => connection.clone().into_inline_connection(&catalog),
                _ => Err(CsrPurificationError::NotCsrConnection(
                    scx.catalog.resolve_full_name(item.name()),
                ))?,
            }
        };

        let client = connection
            .connect(storage_configuration, InTask::No)
            .await
            .map_err(|e| CsrPurificationError::ClientError(Arc::new(e)))?;

        client
            .list_subjects()
            .await
            .map_err(|e| CsrPurificationError::ListSubjectsError(Arc::new(e)))?;
    }

    purify_create_sink_avro_doc_on_options(&catalog, *from.item_id(), format)?;

    Ok(PurifiedStatement::PurifiedCreateSink(create_sink_stmt))
}

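/// Applies `f` to each `Format` in the given `FormatSpecifier`, passing along
/// which part of the schema (key, value, or both) the format applies to.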
fn for_each_format<'a, F>(format: &'a mut Option<FormatSpecifier<Aug>>, mut f: F)
where
    F: FnMut(DocOnSchema, &'a mut Format<Aug>),
{
    match format {
        None => (),
        Some(FormatSpecifier::Bare(fmt)) => f(DocOnSchema::All, fmt),
        Some(FormatSpecifier::KeyValue { key, value }) => {
            f(DocOnSchema::KeyOnly, key);
            f(DocOnSchema::ValueOnly, value);
        }
    }
}

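/// Whether a source statement may (or must) carry external references that
/// are planned as subsources of the source itself.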
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub(crate) enum SourceReferencePolicy {
    /// External references may not be specified on the source statement;
    /// exports must instead be created as tables on the source.
    NotAllowed,
    /// External references may be specified, but exports can also be created
    /// as tables on the source afterwards.
    Optional,
    /// External references must be resolved during purification of the
    /// source statement itself.
    Required,
}

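/// Purifies a `CREATE SOURCE` statement: validates the upstream connection,
/// resolves the requested external references into concrete source exports,
/// replaces nondeterministic options (e.g. Kafka `START TIMESTAMP`) with
/// concrete values, encodes connection details into the statement's options,
/// and, for the old subsource-based syntax, generates the progress subsource
/// statement.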
async fn purify_create_source(
    catalog: impl SessionCatalog,
    now: u64,
    mut create_source_stmt: CreateSourceStatement<Aug>,
    storage_configuration: &StorageConfiguration,
) -> Result<PurifiedStatement, PlanError> {
    let CreateSourceStatement {
        name: source_name,
        col_names,
        key_constraint,
        connection: source_connection,
        format,
        envelope,
        include_metadata,
        external_references,
        progress_subsource,
        with_options,
        ..
    } = &mut create_source_stmt;

    let uses_old_syntax = !col_names.is_empty()
        || key_constraint.is_some()
        || format.is_some()
        || envelope.is_some()
        || !include_metadata.is_empty()
        || external_references.is_some()
        || progress_subsource.is_some();

    if let Some(DeferredItemName::Named(_)) = progress_subsource {
        sql_bail!("Cannot manually ID qualify progress subsource")
    }

    let mut requested_subsource_map = BTreeMap::new();

    let progress_desc = match &source_connection {
        CreateSourceConnection::Kafka { .. } => {
            &mz_storage_types::sources::kafka::KAFKA_PROGRESS_DESC
        }
        CreateSourceConnection::Postgres { .. } => {
            &mz_storage_types::sources::postgres::PG_PROGRESS_DESC
        }
        CreateSourceConnection::SqlServer { .. } => {
            &mz_storage_types::sources::sql_server::SQL_SERVER_PROGRESS_DESC
        }
        CreateSourceConnection::MySql { .. } => {
            &mz_storage_types::sources::mysql::MYSQL_PROGRESS_DESC
        }
        CreateSourceConnection::LoadGenerator { .. } => {
            &mz_storage_types::sources::load_generator::LOAD_GEN_PROGRESS_DESC
        }
    };
    let scx = StatementContext::new(None, &catalog);

    let reference_policy = if scx.catalog.system_vars().enable_create_table_from_source()
        && scx.catalog.system_vars().force_source_table_syntax()
    {
        SourceReferencePolicy::NotAllowed
    } else if scx.catalog.system_vars().enable_create_table_from_source() {
        SourceReferencePolicy::Optional
    } else {
        SourceReferencePolicy::Required
    };

    let mut format_options = SourceFormatOptions::Default;

    let retrieved_source_references: RetrievedSourceReferences;

    match source_connection {
        CreateSourceConnection::Kafka {
            connection,
            options: base_with_options,
            ..
        } => {
            if let Some(external_references) = external_references {
                Err(KafkaSourcePurificationError::ReferencedSubsources(
                    external_references.clone(),
                ))?;
            }

            let connection = {
                let item = scx.get_item_by_resolved_name(connection)?;
                match item.connection()? {
                    Connection::Kafka(connection) => {
                        connection.clone().into_inline_connection(&catalog)
                    }
                    _ => Err(KafkaSourcePurificationError::NotKafkaConnection(
                        scx.catalog.resolve_full_name(item.name()),
                    ))?,
                }
            };

            let extracted_options: KafkaSourceConfigOptionExtracted =
                base_with_options.clone().try_into()?;

            let topic = extracted_options
                .topic
                .ok_or(KafkaSourcePurificationError::ConnectionMissingTopic)?;

            let consumer = connection
                .create_with_context(
                    storage_configuration,
                    MzClientContext::default(),
                    &BTreeMap::new(),
                    InTask::No,
                )
                .await
                .map_err(|e| {
                    KafkaSourcePurificationError::KafkaConsumerError(
                        e.display_with_causes().to_string(),
                    )
                })?;
            let consumer = Arc::new(consumer);

            match (
                extracted_options.start_offset,
                extracted_options.start_timestamp,
            ) {
                (None, None) => {
                    kafka_util::ensure_topic_exists(
                        Arc::clone(&consumer),
                        &topic,
                        storage_configuration
                            .parameters
                            .kafka_timeout_config
                            .fetch_metadata_timeout,
                    )
                    .await?;
                }
                (Some(_), Some(_)) => {
                    sql_bail!("cannot specify START TIMESTAMP and START OFFSET at same time")
                }
                (Some(start_offsets), None) => {
                    kafka_util::validate_start_offsets(
                        Arc::clone(&consumer),
                        &topic,
                        start_offsets,
                        storage_configuration
                            .parameters
                            .kafka_timeout_config
                            .fetch_metadata_timeout,
                    )
                    .await?;
                }
                (None, Some(time_offset)) => {
                    let start_offsets = kafka_util::lookup_start_offsets(
                        Arc::clone(&consumer),
                        &topic,
                        time_offset,
                        now,
                        storage_configuration
                            .parameters
                            .kafka_timeout_config
                            .fetch_metadata_timeout,
                    )
                    .await?;

                    base_with_options.retain(|val| {
                        !matches!(val.name, KafkaSourceConfigOptionName::StartTimestamp)
                    });
                    base_with_options.push(KafkaSourceConfigOption {
                        name: KafkaSourceConfigOptionName::StartOffset,
                        value: Some(WithOptionValue::Sequence(
                            start_offsets
                                .iter()
                                .map(|offset| {
                                    WithOptionValue::Value(Value::Number(offset.to_string()))
                                })
                                .collect(),
                        )),
                    });
                }
            }

            let reference_client = SourceReferenceClient::Kafka { topic: &topic };
            retrieved_source_references = reference_client.get_source_references().await?;

            format_options = SourceFormatOptions::Kafka { topic };
        }
        CreateSourceConnection::Postgres {
            connection,
            options,
        } => {
            let connection_item = scx.get_item_by_resolved_name(connection)?;
            let connection = match connection_item.connection().map_err(PlanError::from)? {
                Connection::Postgres(connection) => {
                    connection.clone().into_inline_connection(&catalog)
                }
                _ => Err(PgSourcePurificationError::NotPgConnection(
                    scx.catalog.resolve_full_name(connection_item.name()),
                ))?,
            };
            let crate::plan::statement::PgConfigOptionExtracted {
                publication,
                text_columns,
                exclude_columns,
                details,
                ..
            } = options.clone().try_into()?;
            let publication =
                publication.ok_or(PgSourcePurificationError::ConnectionMissingPublication)?;

            if details.is_some() {
                Err(PgSourcePurificationError::UserSpecifiedDetails)?;
            }

            let client = connection
                .validate(connection_item.id(), storage_configuration)
                .await?;

            let reference_client = SourceReferenceClient::Postgres {
                client: &client,
                publication: &publication,
                database: &connection.database,
            };
            retrieved_source_references = reference_client.get_source_references().await?;

            let postgres::PurifiedSourceExports {
                source_exports: subsources,
                normalized_text_columns,
            } = postgres::purify_source_exports(
                &client,
                &retrieved_source_references,
                external_references,
                text_columns,
                exclude_columns,
                source_name,
                &reference_policy,
            )
            .await?;

            if let Some(text_cols_option) = options
                .iter_mut()
                .find(|option| option.name == PgConfigOptionName::TextColumns)
            {
                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
            }

            requested_subsource_map.extend(subsources);

            let timeline_id = mz_postgres_util::get_timeline_id(&client).await?;

            options.retain(|PgConfigOption { name, .. }| name != &PgConfigOptionName::Details);
            let details = PostgresSourcePublicationDetails {
                slot: format!(
                    "materialize_{}",
                    Uuid::new_v4().to_string().replace('-', "")
                ),
                timeline_id: Some(timeline_id),
                database: connection.database,
            };
            options.push(PgConfigOption {
                name: PgConfigOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    details.into_proto().encode_to_vec(),
                )))),
            })
        }
        CreateSourceConnection::SqlServer {
            connection,
            options,
        } => {
            scx.require_feature_flag(&ENABLE_SQL_SERVER_SOURCE)?;

            let connection_item = scx.get_item_by_resolved_name(connection)?;
            let connection = match connection_item.connection()? {
                Connection::SqlServer(connection) => {
                    connection.clone().into_inline_connection(&catalog)
                }
                _ => Err(SqlServerSourcePurificationError::NotSqlServerConnection(
                    scx.catalog.resolve_full_name(connection_item.name()),
                ))?,
            };
            let crate::plan::statement::ddl::SqlServerConfigOptionExtracted {
                details,
                text_columns,
                exclude_columns,
                seen: _,
            } = options.clone().try_into()?;

            if details.is_some() {
                Err(SqlServerSourcePurificationError::UserSpecifiedDetails)?;
            }

            let mut client = connection
                .validate(connection_item.id(), storage_configuration)
                .await?;

            let database: Arc<str> = connection.database.into();
            let reference_client = SourceReferenceClient::SqlServer {
                client: &mut client,
                database: Arc::clone(&database),
            };
            retrieved_source_references = reference_client.get_source_references().await?;
            tracing::debug!(?retrieved_source_references, "got source references");

            let timeout = mz_storage_types::sources::sql_server::MAX_LSN_WAIT
                .get(storage_configuration.config_set());

            let purified_source_exports = sql_server::purify_source_exports(
                &*database,
                &mut client,
                &retrieved_source_references,
                external_references,
                &text_columns,
                &exclude_columns,
                source_name,
                timeout,
                &reference_policy,
            )
            .await?;

            let sql_server::PurifiedSourceExports {
                source_exports,
                normalized_text_columns,
                normalized_excl_columns,
            } = purified_source_exports;

            requested_subsource_map.extend(source_exports);

            let restore_history_id =
                mz_sql_server_util::inspect::get_latest_restore_history_id(&mut client).await?;
            let details = SqlServerSourceExtras { restore_history_id };

            options.retain(|SqlServerConfigOption { name, .. }| {
                name != &SqlServerConfigOptionName::Details
            });
            options.push(SqlServerConfigOption {
                name: SqlServerConfigOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    details.into_proto().encode_to_vec(),
                )))),
            });

            if let Some(text_cols_option) = options
                .iter_mut()
                .find(|option| option.name == SqlServerConfigOptionName::TextColumns)
            {
                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
            }
            if let Some(excl_cols_option) = options
                .iter_mut()
                .find(|option| option.name == SqlServerConfigOptionName::ExcludeColumns)
            {
                excl_cols_option.value = Some(WithOptionValue::Sequence(normalized_excl_columns));
            }
        }
        CreateSourceConnection::MySql {
            connection,
            options,
        } => {
            let connection_item = scx.get_item_by_resolved_name(connection)?;
            let connection = match connection_item.connection()? {
                Connection::MySql(connection) => {
                    connection.clone().into_inline_connection(&catalog)
                }
                _ => Err(MySqlSourcePurificationError::NotMySqlConnection(
                    scx.catalog.resolve_full_name(connection_item.name()),
                ))?,
            };
            let crate::plan::statement::ddl::MySqlConfigOptionExtracted {
                details,
                text_columns,
                exclude_columns,
                seen: _,
            } = options.clone().try_into()?;

            if details.is_some() {
                Err(MySqlSourcePurificationError::UserSpecifiedDetails)?;
            }

            let mut conn = connection
                .validate(connection_item.id(), storage_configuration)
                .await
                .map_err(MySqlSourcePurificationError::InvalidConnection)?;

            let initial_gtid_set =
                mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?;

            let reference_client = SourceReferenceClient::MySql {
                conn: &mut conn,
                include_system_schemas: mysql::references_system_schemas(external_references),
            };
            retrieved_source_references = reference_client.get_source_references().await?;

            let mysql::PurifiedSourceExports {
                source_exports: subsources,
                normalized_text_columns,
                normalized_exclude_columns,
            } = mysql::purify_source_exports(
                &mut conn,
                &retrieved_source_references,
                external_references,
                text_columns,
                exclude_columns,
                source_name,
                initial_gtid_set.clone(),
                &reference_policy,
            )
            .await?;
            requested_subsource_map.extend(subsources);

            let details = MySqlSourceDetails {};
            options
                .retain(|MySqlConfigOption { name, .. }| name != &MySqlConfigOptionName::Details);
            options.push(MySqlConfigOption {
                name: MySqlConfigOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    details.into_proto().encode_to_vec(),
                )))),
            });

            if let Some(text_cols_option) = options
                .iter_mut()
                .find(|option| option.name == MySqlConfigOptionName::TextColumns)
            {
                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
            }
            if let Some(exclude_cols_option) = options
                .iter_mut()
                .find(|option| option.name == MySqlConfigOptionName::ExcludeColumns)
            {
                exclude_cols_option.value =
                    Some(WithOptionValue::Sequence(normalized_exclude_columns));
            }
        }
        CreateSourceConnection::LoadGenerator { generator, options } => {
            let load_generator =
                load_generator_ast_to_generator(&scx, generator, options, include_metadata)?;

            let reference_client = SourceReferenceClient::LoadGenerator {
                generator: &load_generator,
            };
            retrieved_source_references = reference_client.get_source_references().await?;
            let multi_output_sources = retrieved_source_references
                .all_references()
                .iter()
                .any(|r| {
                    r.load_generator_output().expect("is loadgen")
                        != &LoadGeneratorOutput::Default
                });

            match external_references {
                Some(requested)
                    if matches!(reference_policy, SourceReferencePolicy::NotAllowed) =>
                {
                    Err(PlanError::UseTablesForSources(requested.to_string()))?
                }
                Some(requested) if !multi_output_sources => match requested {
                    ExternalReferences::SubsetTables(_) => {
                        Err(LoadGeneratorSourcePurificationError::ForTables)?
                    }
                    ExternalReferences::SubsetSchemas(_) => {
                        Err(LoadGeneratorSourcePurificationError::ForSchemas)?
                    }
                    ExternalReferences::All => {
                        Err(LoadGeneratorSourcePurificationError::ForAllTables)?
                    }
                },
                Some(requested) => {
                    let requested_exports = retrieved_source_references
                        .requested_source_exports(Some(requested), source_name)?;
                    for export in requested_exports {
                        requested_subsource_map.insert(
                            export.name,
                            PurifiedSourceExport {
                                external_reference: export.external_reference,
                                details: PurifiedExportDetails::LoadGenerator {
                                    table: export
                                        .meta
                                        .load_generator_desc()
                                        .expect("is loadgen")
                                        .clone(),
                                    output: export
                                        .meta
                                        .load_generator_output()
                                        .expect("is loadgen")
                                        .clone(),
                                },
                            },
                        );
                    }
                }
                None => {
                    if multi_output_sources
                        && matches!(reference_policy, SourceReferencePolicy::Required)
                    {
                        Err(LoadGeneratorSourcePurificationError::MultiOutputRequiresForAllTables)?
                    }
                }
            }

            if let LoadGenerator::Clock = generator {
                if !options
                    .iter()
                    .any(|p| p.name == LoadGeneratorOptionName::AsOf)
                {
                    let now = catalog.now();
                    options.push(LoadGeneratorOption {
                        name: LoadGeneratorOptionName::AsOf,
                        value: Some(WithOptionValue::Value(Value::Number(now.to_string()))),
                    });
                }
            }
        }
    }

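    // The requested external references have been resolved into
    // `requested_subsource_map` above, so drop them from the statement that
    // will be stored in the catalog.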
    *external_references = None;

    let create_progress_subsource_stmt = if uses_old_syntax {
        let name = match progress_subsource {
            Some(name) => match name {
                DeferredItemName::Deferred(name) => name.clone(),
                DeferredItemName::Named(_) => unreachable!("already checked for this value"),
            },
            None => {
                let (item, prefix) = source_name.0.split_last().unwrap();
                let item_name =
                    Ident::try_generate_name(item.to_string(), "_progress", |candidate| {
                        let mut suggested_name = prefix.to_vec();
                        suggested_name.push(candidate.clone());

                        let partial =
                            normalize::unresolved_item_name(UnresolvedItemName(suggested_name))?;
                        let qualified = scx.allocate_qualified_name(partial)?;
                        let item_exists = scx.catalog.get_item_by_name(&qualified).is_some();
                        let type_exists = scx.catalog.get_type_by_name(&qualified).is_some();
                        Ok::<_, PlanError>(!item_exists && !type_exists)
                    })?;

                let mut full_name = prefix.to_vec();
                full_name.push(item_name);
                let full_name = normalize::unresolved_item_name(UnresolvedItemName(full_name))?;
                let qualified_name = scx.allocate_qualified_name(full_name)?;
                let full_name = scx.catalog.resolve_full_name(&qualified_name);

                UnresolvedItemName::from(full_name.clone())
            }
        };

        let (columns, constraints) = scx.relation_desc_into_table_defs(progress_desc)?;

        let mut progress_with_options: Vec<_> = with_options
            .iter()
            .filter_map(|opt| match opt.name {
                CreateSourceOptionName::TimestampInterval => None,
                CreateSourceOptionName::RetainHistory => Some(CreateSubsourceOption {
                    name: CreateSubsourceOptionName::RetainHistory,
                    value: opt.value.clone(),
                }),
            })
            .collect();
        progress_with_options.push(CreateSubsourceOption {
            name: CreateSubsourceOptionName::Progress,
            value: Some(WithOptionValue::Value(Value::Boolean(true))),
        });

        Some(CreateSubsourceStatement {
            name,
            columns,
            of_source: None,
            constraints,
            if_not_exists: false,
            with_options: progress_with_options,
        })
    } else {
        None
    };

    purify_source_format(
        &catalog,
        format,
        &format_options,
        envelope,
        storage_configuration,
    )
    .await?;

    Ok(PurifiedStatement::PurifiedCreateSource {
        create_progress_subsource_stmt,
        create_source_stmt,
        subsources: requested_subsource_map,
        available_source_references: retrieved_source_references.available_source_references(),
    })
}

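/// Purifies an `ALTER SOURCE` statement. Actions that add subsources or
/// refresh references require contacting the upstream system; all other
/// actions pass through unchanged.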
async fn purify_alter_source(
    catalog: impl SessionCatalog,
    stmt: AlterSourceStatement<Aug>,
    storage_configuration: &StorageConfiguration,
) -> Result<PurifiedStatement, PlanError> {
    let scx = StatementContext::new(None, &catalog);
    let AlterSourceStatement {
        source_name: unresolved_source_name,
        action,
        if_exists,
    } = stmt;

    let item = match scx.resolve_item(RawItemName::Name(unresolved_source_name.clone())) {
        Ok(item) => item,
        Err(_) if if_exists => {
            return Ok(PurifiedStatement::PurifiedAlterSource {
                alter_source_stmt: AlterSourceStatement {
                    source_name: unresolved_source_name,
                    action,
                    if_exists,
                },
            });
        }
        Err(e) => return Err(e),
    };

    let desc = match item.source_desc()? {
        Some(desc) => desc.clone().into_inline_connection(scx.catalog),
        None => {
            sql_bail!("cannot ALTER this type of source")
        }
    };

    let source_name = item.name();

    let resolved_source_name = ResolvedItemName::Item {
        id: item.id(),
        qualifiers: item.name().qualifiers.clone(),
        full_name: scx.catalog.resolve_full_name(source_name),
        print_id: true,
        version: RelationVersionSelector::Latest,
    };

    let partial_name = scx.catalog.minimal_qualification(source_name);

    match action {
        AlterSourceAction::AddSubsources {
            external_references,
            options,
        } => {
            if scx.catalog.system_vars().enable_create_table_from_source()
                && scx.catalog.system_vars().force_source_table_syntax()
            {
                Err(PlanError::UseTablesForSources(
                    "ALTER SOURCE .. ADD SUBSOURCES ..".to_string(),
                ))?;
            }

            purify_alter_source_add_subsources(
                external_references,
                options,
                desc,
                partial_name,
                unresolved_source_name,
                resolved_source_name,
                storage_configuration,
            )
            .await
        }
        AlterSourceAction::RefreshReferences => {
            purify_alter_source_refresh_references(
                desc,
                resolved_source_name,
                storage_configuration,
            )
            .await
        }
        _ => Ok(PurifiedStatement::PurifiedAlterSource {
            alter_source_stmt: AlterSourceStatement {
                source_name: unresolved_source_name,
                action,
                if_exists,
            },
        }),
    }
}

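/// Purifies `ALTER SOURCE .. ADD SUBSOURCE`: connects to the upstream system,
/// resolves the requested external references into purified source exports,
/// and normalizes the statement's `TEXT COLUMNS` and `EXCLUDE COLUMNS`
/// options.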
async fn purify_alter_source_add_subsources(
    external_references: Vec<ExternalReferenceExport>,
    mut options: Vec<AlterSourceAddSubsourceOption<Aug>>,
    desc: SourceDesc,
    partial_source_name: PartialItemName,
    unresolved_source_name: UnresolvedItemName,
    resolved_source_name: ResolvedItemName,
    storage_configuration: &StorageConfiguration,
) -> Result<PurifiedStatement, PlanError> {
    let connection_id = match &desc.connection {
        GenericSourceConnection::Postgres(c) => c.connection_id,
        GenericSourceConnection::MySql(c) => c.connection_id,
        GenericSourceConnection::SqlServer(c) => c.connection_id,
        _ => sql_bail!(
            "source {} does not support ALTER SOURCE.",
            partial_source_name
        ),
    };

    let crate::plan::statement::ddl::AlterSourceAddSubsourceOptionExtracted {
        text_columns,
        exclude_columns,
        details,
        seen: _,
    } = options.clone().try_into()?;
    assert_none!(details, "details cannot be explicitly set");

    let mut requested_subsource_map = BTreeMap::new();

    match desc.connection {
        GenericSourceConnection::Postgres(pg_source_connection) => {
            let pg_connection = &pg_source_connection.connection;

            let client = pg_connection
                .validate(connection_id, storage_configuration)
                .await?;

            let reference_client = SourceReferenceClient::Postgres {
                client: &client,
                publication: &pg_source_connection.publication,
                database: &pg_connection.database,
            };
            let retrieved_source_references = reference_client.get_source_references().await?;

            let postgres::PurifiedSourceExports {
                source_exports: subsources,
                normalized_text_columns,
            } = postgres::purify_source_exports(
                &client,
                &retrieved_source_references,
                &Some(ExternalReferences::SubsetTables(external_references)),
                text_columns,
                exclude_columns,
                &unresolved_source_name,
                &SourceReferencePolicy::Required,
            )
            .await?;

            if let Some(text_cols_option) = options
                .iter_mut()
                .find(|option| option.name == AlterSourceAddSubsourceOptionName::TextColumns)
            {
                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
            }

            requested_subsource_map.extend(subsources);
        }
        GenericSourceConnection::MySql(mysql_source_connection) => {
            let mysql_connection = &mysql_source_connection.connection;
            let config = mysql_connection
                .config(
                    &storage_configuration.connection_context.secrets_reader,
                    storage_configuration,
                    InTask::No,
                )
                .await?;

            let mut conn = config
                .connect(
                    "mysql purification",
                    &storage_configuration.connection_context.ssh_tunnel_manager,
                )
                .await?;

            let initial_gtid_set =
                mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?;

            let requested_references = Some(ExternalReferences::SubsetTables(external_references));

            let reference_client = SourceReferenceClient::MySql {
                conn: &mut conn,
                include_system_schemas: mysql::references_system_schemas(&requested_references),
            };
            let retrieved_source_references = reference_client.get_source_references().await?;

            let mysql::PurifiedSourceExports {
                source_exports: subsources,
                normalized_text_columns,
                normalized_exclude_columns,
            } = mysql::purify_source_exports(
                &mut conn,
                &retrieved_source_references,
                &requested_references,
                text_columns,
                exclude_columns,
                &unresolved_source_name,
                initial_gtid_set,
                &SourceReferencePolicy::Required,
            )
            .await?;
            requested_subsource_map.extend(subsources);

            if let Some(text_cols_option) = options
                .iter_mut()
                .find(|option| option.name == AlterSourceAddSubsourceOptionName::TextColumns)
            {
                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
            }
            if let Some(exclude_cols_option) = options
                .iter_mut()
                .find(|option| option.name == AlterSourceAddSubsourceOptionName::ExcludeColumns)
            {
                exclude_cols_option.value =
                    Some(WithOptionValue::Sequence(normalized_exclude_columns));
            }
        }
        GenericSourceConnection::SqlServer(sql_server_source) => {
            let sql_server_connection = &sql_server_source.connection;
            let config = sql_server_connection
                .resolve_config(
                    &storage_configuration.connection_context.secrets_reader,
                    storage_configuration,
                    InTask::No,
                )
                .await?;
            let mut client = mz_sql_server_util::Client::connect(config).await?;

            let database = sql_server_connection.database.clone().into();
            let source_references = SourceReferenceClient::SqlServer {
                client: &mut client,
                database: Arc::clone(&database),
            }
            .get_source_references()
            .await?;
            let requested_references = Some(ExternalReferences::SubsetTables(external_references));

            let timeout = mz_storage_types::sources::sql_server::MAX_LSN_WAIT
                .get(storage_configuration.config_set());

            let result = sql_server::purify_source_exports(
                &*database,
                &mut client,
                &source_references,
                &requested_references,
                &text_columns,
                &exclude_columns,
                &unresolved_source_name,
                timeout,
                &SourceReferencePolicy::Required,
            )
            .await;
            let sql_server::PurifiedSourceExports {
                source_exports,
                normalized_text_columns,
                normalized_excl_columns,
            } = result?;

            requested_subsource_map.extend(source_exports);

            if let Some(text_cols_option) = options
                .iter_mut()
                .find(|option| option.name == AlterSourceAddSubsourceOptionName::TextColumns)
            {
                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
            }
            if let Some(exclude_cols_option) = options
                .iter_mut()
                .find(|option| option.name == AlterSourceAddSubsourceOptionName::ExcludeColumns)
            {
                exclude_cols_option.value =
                    Some(WithOptionValue::Sequence(normalized_excl_columns));
            }
        }
        _ => unreachable!(),
    };

    Ok(PurifiedStatement::PurifiedAlterSourceAddSubsources {
        source_name: resolved_source_name,
        options,
        subsources: requested_subsource_map,
    })
}

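/// Purifies `ALTER SOURCE .. REFRESH REFERENCES` by fetching the current set
/// of available references from the upstream system.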
async fn purify_alter_source_refresh_references(
    desc: SourceDesc,
    resolved_source_name: ResolvedItemName,
    storage_configuration: &StorageConfiguration,
) -> Result<PurifiedStatement, PlanError> {
    let retrieved_source_references = match desc.connection {
        GenericSourceConnection::Postgres(pg_source_connection) => {
            let pg_connection = &pg_source_connection.connection;

            let config = pg_connection
                .config(
                    &storage_configuration.connection_context.secrets_reader,
                    storage_configuration,
                    InTask::No,
                )
                .await?;

            let client = config
                .connect(
                    "postgres_purification",
                    &storage_configuration.connection_context.ssh_tunnel_manager,
                )
                .await?;
            let reference_client = SourceReferenceClient::Postgres {
                client: &client,
                publication: &pg_source_connection.publication,
                database: &pg_connection.database,
            };
            reference_client.get_source_references().await?
        }
        GenericSourceConnection::MySql(mysql_source_connection) => {
            let mysql_connection = &mysql_source_connection.connection;
            let config = mysql_connection
                .config(
                    &storage_configuration.connection_context.secrets_reader,
                    storage_configuration,
                    InTask::No,
                )
                .await?;

            let mut conn = config
                .connect(
                    "mysql purification",
                    &storage_configuration.connection_context.ssh_tunnel_manager,
                )
                .await?;

            let reference_client = SourceReferenceClient::MySql {
                conn: &mut conn,
                include_system_schemas: false,
            };
            reference_client.get_source_references().await?
        }
        GenericSourceConnection::SqlServer(sql_server_source) => {
            let sql_server_connection = &sql_server_source.connection;
            let config = sql_server_connection
                .resolve_config(
                    &storage_configuration.connection_context.secrets_reader,
                    storage_configuration,
                    InTask::No,
                )
                .await?;
            let mut client = mz_sql_server_util::Client::connect(config).await?;

            let source_references = SourceReferenceClient::SqlServer {
                client: &mut client,
                database: sql_server_connection.database.clone().into(),
            }
            .get_source_references()
            .await?;
            source_references
        }
        GenericSourceConnection::LoadGenerator(load_gen_connection) => {
            let reference_client = SourceReferenceClient::LoadGenerator {
                generator: &load_gen_connection.load_generator,
            };
            reference_client.get_source_references().await?
        }
        GenericSourceConnection::Kafka(kafka_conn) => {
            let reference_client = SourceReferenceClient::Kafka {
                topic: &kafka_conn.topic,
            };
            reference_client.get_source_references().await?
        }
    };
    Ok(PurifiedStatement::PurifiedAlterSourceRefreshReferences {
        source_name: resolved_source_name,
        available_source_references: retrieved_source_references.available_source_references(),
    })
}

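/// Purifies a `CREATE TABLE .. FROM SOURCE` statement: resolves the single
/// requested external reference against the source's upstream system and
/// fills in the table's columns, constraints, and options from the purified
/// export.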
async fn purify_create_table_from_source(
    catalog: impl SessionCatalog,
    mut stmt: CreateTableFromSourceStatement<Aug>,
    storage_configuration: &StorageConfiguration,
) -> Result<PurifiedStatement, PlanError> {
    let scx = StatementContext::new(None, &catalog);
    let CreateTableFromSourceStatement {
        name: _,
        columns,
        constraints,
        source: source_name,
        if_not_exists: _,
        external_reference,
        format,
        envelope,
        include_metadata: _,
        with_options,
    } = &mut stmt;

    if matches!(columns, TableFromSourceColumns::Defined(_)) {
        sql_bail!("CREATE TABLE .. FROM SOURCE column definitions cannot be specified directly");
    }
    if !constraints.is_empty() {
        sql_bail!(
            "CREATE TABLE .. FROM SOURCE constraint definitions cannot be specified directly"
        );
    }

    let item = scx.get_item_by_resolved_name(source_name)?;

    let desc = match item.source_desc()? {
        Some(desc) => desc.clone().into_inline_connection(scx.catalog),
        None => {
            sql_bail!("cannot create a table from this type of source")
        }
    };
    let unresolved_source_name: UnresolvedItemName = source_name.full_item_name().clone().into();

    let crate::plan::statement::ddl::TableFromSourceOptionExtracted {
        text_columns,
        exclude_columns,
        retain_history: _,
        details,
        partition_by: _,
        seen: _,
    } = with_options.clone().try_into()?;
    assert_none!(details, "details cannot be explicitly set");

    let qualified_text_columns = text_columns
        .iter()
        .map(|col| {
            UnresolvedItemName(
                external_reference
                    .as_ref()
                    .map(|er| er.0.iter().chain_one(col).cloned().collect())
                    .unwrap_or_else(|| vec![col.clone()]),
            )
        })
        .collect_vec();
    let qualified_exclude_columns = exclude_columns
        .iter()
        .map(|col| {
            UnresolvedItemName(
                external_reference
                    .as_ref()
                    .map(|er| er.0.iter().chain_one(col).cloned().collect())
                    .unwrap_or_else(|| vec![col.clone()]),
            )
        })
        .collect_vec();

    let mut format_options = SourceFormatOptions::Default;

    let retrieved_source_references: RetrievedSourceReferences;

    let requested_references = external_reference.as_ref().map(|ref_name| {
        ExternalReferences::SubsetTables(vec![ExternalReferenceExport {
            reference: ref_name.clone(),
            alias: None,
        }])
    });

    let purified_export = match desc.connection {
        GenericSourceConnection::Postgres(pg_source_connection) => {
            let pg_connection = &pg_source_connection.connection;

            let client = pg_connection
                .validate(pg_source_connection.connection_id, storage_configuration)
                .await?;

            let reference_client = SourceReferenceClient::Postgres {
                client: &client,
                publication: &pg_source_connection.publication,
                database: &pg_connection.database,
            };
            retrieved_source_references = reference_client.get_source_references().await?;

            let postgres::PurifiedSourceExports {
                source_exports,
                normalized_text_columns: _,
            } = postgres::purify_source_exports(
                &client,
                &retrieved_source_references,
                &requested_references,
                qualified_text_columns,
                qualified_exclude_columns,
                &unresolved_source_name,
                &SourceReferencePolicy::Required,
            )
            .await?;
            let (_, purified_export) = source_exports.into_element();
            purified_export
        }
        GenericSourceConnection::MySql(mysql_source_connection) => {
            let mysql_connection = &mysql_source_connection.connection;
            let config = mysql_connection
                .config(
                    &storage_configuration.connection_context.secrets_reader,
                    storage_configuration,
                    InTask::No,
                )
                .await?;

            let mut conn = config
                .connect(
                    "mysql purification",
                    &storage_configuration.connection_context.ssh_tunnel_manager,
                )
                .await?;

            let initial_gtid_set =
                mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?;

            let reference_client = SourceReferenceClient::MySql {
                conn: &mut conn,
                include_system_schemas: mysql::references_system_schemas(&requested_references),
            };
            retrieved_source_references = reference_client.get_source_references().await?;

            let mysql::PurifiedSourceExports {
                source_exports,
                normalized_text_columns: _,
                normalized_exclude_columns: _,
            } = mysql::purify_source_exports(
                &mut conn,
                &retrieved_source_references,
                &requested_references,
                qualified_text_columns,
                qualified_exclude_columns,
                &unresolved_source_name,
                initial_gtid_set,
                &SourceReferencePolicy::Required,
            )
            .await?;
            let (_, purified_export) = source_exports.into_element();
            purified_export
        }
        GenericSourceConnection::SqlServer(sql_server_source) => {
            let connection = sql_server_source.connection;
            let config = connection
                .resolve_config(
                    &storage_configuration.connection_context.secrets_reader,
                    storage_configuration,
                    InTask::No,
                )
                .await?;
            let mut client = mz_sql_server_util::Client::connect(config).await?;

            let database: Arc<str> = connection.database.into();
            let reference_client = SourceReferenceClient::SqlServer {
                client: &mut client,
                database: Arc::clone(&database),
            };
            retrieved_source_references = reference_client.get_source_references().await?;
            tracing::debug!(?retrieved_source_references, "got source references");

            let timeout = mz_storage_types::sources::sql_server::MAX_LSN_WAIT
                .get(storage_configuration.config_set());

            let purified_source_exports = sql_server::purify_source_exports(
                &*database,
                &mut client,
                &retrieved_source_references,
                &requested_references,
                &qualified_text_columns,
                &qualified_exclude_columns,
                &unresolved_source_name,
                timeout,
                &SourceReferencePolicy::Required,
            )
            .await?;

            let (_, purified_export) = purified_source_exports.source_exports.into_element();
            purified_export
        }
        GenericSourceConnection::LoadGenerator(load_gen_connection) => {
            let reference_client = SourceReferenceClient::LoadGenerator {
                generator: &load_gen_connection.load_generator,
            };
            retrieved_source_references = reference_client.get_source_references().await?;

            let requested_exports = retrieved_source_references
                .requested_source_exports(requested_references.as_ref(), &unresolved_source_name)?;
            let export = requested_exports.into_element();
            PurifiedSourceExport {
                external_reference: export.external_reference,
                details: PurifiedExportDetails::LoadGenerator {
                    table: export
                        .meta
                        .load_generator_desc()
                        .expect("is loadgen")
                        .clone(),
                    output: export
                        .meta
                        .load_generator_output()
                        .expect("is loadgen")
                        .clone(),
                },
            }
        }
        GenericSourceConnection::Kafka(kafka_conn) => {
            let reference_client = SourceReferenceClient::Kafka {
                topic: &kafka_conn.topic,
            };
            retrieved_source_references = reference_client.get_source_references().await?;
            let requested_exports = retrieved_source_references
                .requested_source_exports(requested_references.as_ref(), &unresolved_source_name)?;
            let export = requested_exports.into_element();

            format_options = SourceFormatOptions::Kafka {
                topic: kafka_conn.topic.clone(),
            };
            PurifiedSourceExport {
                external_reference: export.external_reference,
                details: PurifiedExportDetails::Kafka {},
            }
        }
    };

    purify_source_format(
        &catalog,
        format,
        &format_options,
        envelope,
        storage_configuration,
    )
    .await?;

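    // Canonicalize the statement's external reference to the fully resolved
    // reference returned by purification.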
1962 *external_reference = Some(purified_export.external_reference.clone());
1965
1966 match &purified_export.details {
1968 PurifiedExportDetails::Postgres { .. } => {
1969 let mut unsupported_cols = vec![];
1970 let postgres::PostgresExportStatementValues {
1971 columns: gen_columns,
1972 constraints: gen_constraints,
1973 text_columns: gen_text_columns,
1974 exclude_columns: gen_exclude_columns,
1975 details: gen_details,
1976 external_reference: _,
1977 } = postgres::generate_source_export_statement_values(
1978 &scx,
1979 purified_export,
1980 &mut unsupported_cols,
1981 )?;
1982 if !unsupported_cols.is_empty() {
1983 unsupported_cols.sort();
1984 Err(PgSourcePurificationError::UnrecognizedTypes {
1985 cols: unsupported_cols,
1986 })?;
1987 }
1988
1989 if let Some(text_cols_option) = with_options
1990 .iter_mut()
1991 .find(|option| option.name == TableFromSourceOptionName::TextColumns)
1992 {
1993 match gen_text_columns {
1994 Some(gen_text_columns) => {
1995 text_cols_option.value = Some(WithOptionValue::Sequence(gen_text_columns))
1996 }
1997 None => soft_panic_or_log!(
1998 "text_columns should be Some if text_cols_option is present"
1999 ),
2000 }
2001 }
2002 if let Some(exclude_cols_option) = with_options
2003 .iter_mut()
2004 .find(|option| option.name == TableFromSourceOptionName::ExcludeColumns)
2005 {
2006 match gen_exclude_columns {
2007 Some(gen_exclude_columns) => {
2008 exclude_cols_option.value =
2009 Some(WithOptionValue::Sequence(gen_exclude_columns))
2010 }
2011 None => soft_panic_or_log!(
2012 "exclude_columns should be Some if exclude_cols_option is present"
2013 ),
2014 }
2015 }
2016 match columns {
2017 TableFromSourceColumns::Defined(_) => unreachable!(),
2018 TableFromSourceColumns::NotSpecified => {
2019 *columns = TableFromSourceColumns::Defined(gen_columns);
2020 *constraints = gen_constraints;
2021 }
2022 TableFromSourceColumns::Named(_) => {
2023 sql_bail!("columns cannot be named for Postgres sources")
2024 }
2025 }
2026 with_options.push(TableFromSourceOption {
2027 name: TableFromSourceOptionName::Details,
2028 value: Some(WithOptionValue::Value(Value::String(hex::encode(
2029 gen_details.into_proto().encode_to_vec(),
2030 )))),
2031 })
2032 }
        PurifiedExportDetails::MySql { .. } => {
            let mysql::MySqlExportStatementValues {
                columns: gen_columns,
                constraints: gen_constraints,
                text_columns: gen_text_columns,
                exclude_columns: gen_exclude_columns,
                details: gen_details,
                external_reference: _,
            } = mysql::generate_source_export_statement_values(&scx, purified_export)?;

            if let Some(text_cols_option) = with_options
                .iter_mut()
                .find(|option| option.name == TableFromSourceOptionName::TextColumns)
            {
                match gen_text_columns {
                    Some(gen_text_columns) => {
                        text_cols_option.value = Some(WithOptionValue::Sequence(gen_text_columns))
                    }
                    None => soft_panic_or_log!(
                        "text_columns should be Some if text_cols_option is present"
                    ),
                }
            }
            if let Some(exclude_cols_option) = with_options
                .iter_mut()
                .find(|option| option.name == TableFromSourceOptionName::ExcludeColumns)
            {
                match gen_exclude_columns {
                    Some(gen_exclude_columns) => {
                        exclude_cols_option.value =
                            Some(WithOptionValue::Sequence(gen_exclude_columns))
                    }
                    None => soft_panic_or_log!(
                        "exclude_columns should be Some if exclude_cols_option is present"
                    ),
                }
            }
            match columns {
                TableFromSourceColumns::Defined(_) => unreachable!(),
                TableFromSourceColumns::NotSpecified => {
                    *columns = TableFromSourceColumns::Defined(gen_columns);
                    *constraints = gen_constraints;
                }
                TableFromSourceColumns::Named(_) => {
                    sql_bail!("columns cannot be named for MySQL sources")
                }
            }
            with_options.push(TableFromSourceOption {
                name: TableFromSourceOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    gen_details.into_proto().encode_to_vec(),
                )))),
            })
        }
        PurifiedExportDetails::SqlServer { .. } => {
            let sql_server::SqlServerExportStatementValues {
                columns: gen_columns,
                constraints: gen_constraints,
                text_columns: gen_text_columns,
                excl_columns: gen_excl_columns,
                details: gen_details,
                external_reference: _,
            } = sql_server::generate_source_export_statement_values(&scx, purified_export)?;

            if let Some(text_cols_option) = with_options
                .iter_mut()
                .find(|opt| opt.name == TableFromSourceOptionName::TextColumns)
            {
                match gen_text_columns {
                    Some(gen_text_columns) => {
                        text_cols_option.value = Some(WithOptionValue::Sequence(gen_text_columns))
                    }
                    None => soft_panic_or_log!(
                        "text_columns should be Some if text_cols_option is present"
                    ),
                }
            }
            if let Some(exclude_cols_option) = with_options
                .iter_mut()
                .find(|opt| opt.name == TableFromSourceOptionName::ExcludeColumns)
            {
                match gen_excl_columns {
                    Some(gen_excl_columns) => {
                        exclude_cols_option.value =
                            Some(WithOptionValue::Sequence(gen_excl_columns))
                    }
                    None => soft_panic_or_log!(
                        "excl_columns should be Some if exclude_cols_option is present"
                    ),
                }
            }

            match columns {
                TableFromSourceColumns::NotSpecified => {
                    *columns = TableFromSourceColumns::Defined(gen_columns);
                    *constraints = gen_constraints;
                }
                TableFromSourceColumns::Named(_) => {
                    sql_bail!("columns cannot be named for SQL Server sources")
                }
                TableFromSourceColumns::Defined(_) => unreachable!(),
            }

            with_options.push(TableFromSourceOption {
                name: TableFromSourceOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    gen_details.into_proto().encode_to_vec(),
                )))),
            })
        }
        PurifiedExportDetails::LoadGenerator { .. } => {
            let (desc, output) = match purified_export.details {
                PurifiedExportDetails::LoadGenerator { table, output } => (table, output),
                _ => unreachable!("purified export details must be load generator"),
            };
            // A table description is only present for multi-output load
            // generators; single-output generators define their columns
            // elsewhere.
            if let Some(desc) = desc {
                let (gen_columns, gen_constraints) = scx.relation_desc_into_table_defs(&desc)?;
                match columns {
                    TableFromSourceColumns::Defined(_) => unreachable!(),
                    TableFromSourceColumns::NotSpecified => {
                        *columns = TableFromSourceColumns::Defined(gen_columns);
                        *constraints = gen_constraints;
                    }
                    TableFromSourceColumns::Named(_) => {
                        sql_bail!("columns cannot be named for multi-output load generator sources")
                    }
                }
            }
            let details = SourceExportStatementDetails::LoadGenerator { output };
            with_options.push(TableFromSourceOption {
                name: TableFromSourceOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    details.into_proto().encode_to_vec(),
                )))),
            })
        }
        PurifiedExportDetails::Kafka {} => {
            let details = SourceExportStatementDetails::Kafka {};
            with_options.push(TableFromSourceOption {
                name: TableFromSourceOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    details.into_proto().encode_to_vec(),
                )))),
            })
        }
    };

    Ok(PurifiedStatement::PurifiedCreateTableFromSource { stmt })
}

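/// Source-specific context used when purifying a `FORMAT` clause. Kafka
/// sources carry their topic, from which schema registry subjects (e.g.
/// `<topic>-value`) are derived.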
enum SourceFormatOptions {
    Default,
    Kafka { topic: String },
}

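/// Purifies a source's `FORMAT` specifier, resolving any schemas that must be
/// fetched from a Confluent Schema Registry. `KEY/VALUE` format specifiers
/// are rejected for all but Kafka sources.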
async fn purify_source_format(
    catalog: &dyn SessionCatalog,
    format: &mut Option<FormatSpecifier<Aug>>,
    options: &SourceFormatOptions,
    envelope: &Option<SourceEnvelope>,
    storage_configuration: &StorageConfiguration,
) -> Result<(), PlanError> {
    if matches!(format, Some(FormatSpecifier::KeyValue { .. }))
        && !matches!(options, SourceFormatOptions::Kafka { .. })
    {
        sql_bail!("Kafka sources are the only source type that can provide KEY/VALUE formats")
    }

    match format.as_mut() {
        None => {}
        Some(FormatSpecifier::Bare(format)) => {
            purify_source_format_single(catalog, format, options, envelope, storage_configuration)
                .await?;
        }

        Some(FormatSpecifier::KeyValue { key, value: val }) => {
            purify_source_format_single(catalog, key, options, envelope, storage_configuration)
                .await?;
            purify_source_format_single(catalog, val, options, envelope, storage_configuration)
                .await?;
        }
    }
    Ok(())
}

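/// Purifies a single bare format. Avro and Protobuf formats that reference a
/// schema registry have their schema seeds fetched and inlined; all other
/// formats are left untouched.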
async fn purify_source_format_single(
    catalog: &dyn SessionCatalog,
    format: &mut Format<Aug>,
    options: &SourceFormatOptions,
    envelope: &Option<SourceEnvelope>,
    storage_configuration: &StorageConfiguration,
) -> Result<(), PlanError> {
    match format {
        Format::Avro(schema) => match schema {
            AvroSchema::Csr { csr_connection } => {
                purify_csr_connection_avro(
                    catalog,
                    options,
                    csr_connection,
                    envelope,
                    storage_configuration,
                )
                .await?
            }
            AvroSchema::InlineSchema { .. } => {}
        },
        Format::Protobuf(schema) => match schema {
            ProtobufSchema::Csr { csr_connection } => {
                purify_csr_connection_proto(
                    catalog,
                    options,
                    csr_connection,
                    envelope,
                    storage_configuration,
                )
                .await?;
            }
            ProtobufSchema::InlineSchema { .. } => {}
        },
        Format::Bytes
        | Format::Regex(_)
        | Format::Json { .. }
        | Format::Text
        | Format::Csv { .. } => (),
    }
    Ok(())
}

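/// Generates `CREATE SUBSOURCE` statements for each purified source export,
/// dispatching to the source-specific generators where applicable.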
pub fn generate_subsource_statements(
    scx: &StatementContext,
    source_name: ResolvedItemName,
    subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
) -> Result<Vec<CreateSubsourceStatement<Aug>>, PlanError> {
    if subsources.is_empty() {
        return Ok(vec![]);
    }
    // All exports come from the same source, so the first export determines
    // which source-specific generator to dispatch to.
    let (_, purified_export) = subsources.iter().next().unwrap();

    let statements = match &purified_export.details {
        PurifiedExportDetails::Postgres { .. } => {
            crate::pure::postgres::generate_create_subsource_statements(
                scx,
                source_name,
                subsources,
            )?
        }
        PurifiedExportDetails::MySql { .. } => {
            crate::pure::mysql::generate_create_subsource_statements(scx, source_name, subsources)?
        }
        PurifiedExportDetails::SqlServer { .. } => {
            crate::pure::sql_server::generate_create_subsource_statements(
                scx,
                source_name,
                subsources,
            )?
        }
        PurifiedExportDetails::LoadGenerator { .. } => {
            let mut subsource_stmts = Vec::with_capacity(subsources.len());
            for (subsource_name, purified_export) in subsources {
                let (desc, output) = match purified_export.details {
                    PurifiedExportDetails::LoadGenerator { table, output } => (table, output),
                    _ => unreachable!("purified export details must be load generator"),
                };
                let desc =
                    desc.expect("subsources cannot be generated for single-output load generators");

                let (columns, table_constraints) = scx.relation_desc_into_table_defs(&desc)?;
                let details = SourceExportStatementDetails::LoadGenerator { output };
                let subsource = CreateSubsourceStatement {
                    name: subsource_name,
                    columns,
                    of_source: Some(source_name.clone()),
                    constraints: table_constraints,
                    if_not_exists: false,
                    with_options: vec![
                        CreateSubsourceOption {
                            name: CreateSubsourceOptionName::ExternalReference,
                            value: Some(WithOptionValue::UnresolvedItemName(
                                purified_export.external_reference,
                            )),
                        },
                        CreateSubsourceOption {
                            name: CreateSubsourceOptionName::Details,
                            value: Some(WithOptionValue::Value(Value::String(hex::encode(
                                details.into_proto().encode_to_vec(),
                            )))),
                        },
                    ],
                };
                subsource_stmts.push(subsource);
            }

            subsource_stmts
        }
        PurifiedExportDetails::Kafka { .. } => {
            // Kafka sources do not produce data-bearing subsources, so this
            // arm should only ever see an empty map.
            assert!(
                subsources.is_empty(),
                "Kafka sources do not produce data-bearing subsources"
            );
            vec![]
        }
    };
    Ok(statements)
}

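/// Fetches and inlines the Protobuf schema seed for a Confluent Schema
/// Registry connection, compiling the `<topic>-value` subject (and, if
/// present, the `<topic>-key` subject) into file descriptor sets.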
async fn purify_csr_connection_proto(
    catalog: &dyn SessionCatalog,
    options: &SourceFormatOptions,
    csr_connection: &mut CsrConnectionProtobuf<Aug>,
    envelope: &Option<SourceEnvelope>,
    storage_configuration: &StorageConfiguration,
) -> Result<(), PlanError> {
    let SourceFormatOptions::Kafka { topic } = options else {
        sql_bail!("Confluent Schema Registry is only supported with Kafka sources")
    };

    let CsrConnectionProtobuf {
        seed,
        connection: CsrConnection {
            connection,
            options: _,
        },
    } = csr_connection;
    match seed {
        None => {
            let scx = StatementContext::new(None, &*catalog);

            let ccsr_connection = match scx.get_item_by_resolved_name(connection)?.connection()? {
                Connection::Csr(connection) => connection.clone().into_inline_connection(catalog),
                _ => sql_bail!("{} is not a schema registry connection", connection),
            };

            let ccsr_client = ccsr_connection
                .connect(storage_configuration, InTask::No)
                .await
                .map_err(|e| CsrPurificationError::ClientError(Arc::new(e)))?;

            let value = compile_proto(&format!("{}-value", topic), &ccsr_client).await?;
            let key = compile_proto(&format!("{}-key", topic), &ccsr_client)
                .await
                .ok();

            if matches!(envelope, Some(SourceEnvelope::Debezium)) && key.is_none() {
                sql_bail!("Key schema is required for ENVELOPE DEBEZIUM");
            }

            *seed = Some(CsrSeedProtobuf { value, key });
        }
        Some(_) => (),
    }

    Ok(())
}

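/// Fetches and inlines the Avro schema seed for a Confluent Schema Registry
/// connection, resolving key and value schemas according to the configured
/// reader schema selection strategies.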
async fn purify_csr_connection_avro(
    catalog: &dyn SessionCatalog,
    options: &SourceFormatOptions,
    csr_connection: &mut CsrConnectionAvro<Aug>,
    envelope: &Option<SourceEnvelope>,
    storage_configuration: &StorageConfiguration,
) -> Result<(), PlanError> {
    let SourceFormatOptions::Kafka { topic } = options else {
        sql_bail!("Confluent Schema Registry is only supported with Kafka sources")
    };

    let CsrConnectionAvro {
        connection: CsrConnection { connection, .. },
        seed,
        key_strategy,
        value_strategy,
    } = csr_connection;
    if seed.is_none() {
        let scx = StatementContext::new(None, &*catalog);
        let csr_connection = match scx.get_item_by_resolved_name(connection)?.connection()? {
            Connection::Csr(connection) => connection.clone().into_inline_connection(catalog),
            _ => sql_bail!("{} is not a schema registry connection", connection),
        };
        let ccsr_client = csr_connection
            .connect(storage_configuration, InTask::No)
            .await
            .map_err(|e| CsrPurificationError::ClientError(Arc::new(e)))?;

        let Schema {
            key_schema,
            value_schema,
        } = get_remote_csr_schema(
            &ccsr_client,
            key_strategy.clone().unwrap_or_default(),
            value_strategy.clone().unwrap_or_default(),
            topic,
        )
        .await?;
        if matches!(envelope, Some(SourceEnvelope::Debezium)) && key_schema.is_none() {
            sql_bail!("Key schema is required for ENVELOPE DEBEZIUM");
        }

        *seed = Some(CsrSeedAvro {
            key_schema,
            value_schema,
        })
    }

    Ok(())
}

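/// An Avro key/value schema pair fetched from a schema registry; the key
/// schema is optional.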
#[derive(Debug)]
pub struct Schema {
    pub key_schema: Option<String>,
    pub value_schema: String,
}

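/// Resolves a schema for `subject` according to `strategy`, returning
/// `Ok(None)` when the subject (or the requested schema ID) does not exist.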
async fn get_schema_with_strategy(
    client: &Client,
    strategy: ReaderSchemaSelectionStrategy,
    subject: &str,
) -> Result<Option<String>, PlanError> {
    match strategy {
        ReaderSchemaSelectionStrategy::Latest => {
            match client.get_schema_by_subject(subject).await {
                Ok(CcsrSchema { raw, .. }) => Ok(Some(raw)),
                Err(GetBySubjectError::SubjectNotFound)
                | Err(GetBySubjectError::VersionNotFound(_)) => Ok(None),
                Err(e) => Err(PlanError::FetchingCsrSchemaFailed {
                    schema_lookup: format!("subject {}", subject.quoted()),
                    cause: Arc::new(e),
                }),
            }
        }
        ReaderSchemaSelectionStrategy::Inline(raw) => Ok(Some(raw)),
        ReaderSchemaSelectionStrategy::ById(id) => match client.get_schema_by_id(id).await {
            Ok(CcsrSchema { raw, .. }) => Ok(Some(raw)),
            Err(GetByIdError::SchemaNotFound) => Ok(None),
            Err(e) => Err(PlanError::FetchingCsrSchemaFailed {
                schema_lookup: format!("ID {}", id),
                cause: Arc::new(e),
            }),
        },
    }
}

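/// Fetches the value schema (required) and key schema (optional) for `topic`
/// from the schema registry, using the `<topic>-value` and `<topic>-key`
/// subjects.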
async fn get_remote_csr_schema(
    ccsr_client: &mz_ccsr::Client,
    key_strategy: ReaderSchemaSelectionStrategy,
    value_strategy: ReaderSchemaSelectionStrategy,
    topic: &str,
) -> Result<Schema, PlanError> {
    let value_schema_name = format!("{}-value", topic);
    let value_schema =
        get_schema_with_strategy(ccsr_client, value_strategy, &value_schema_name).await?;
    let value_schema = value_schema.ok_or_else(|| anyhow!("No value schema found"))?;
    let subject = format!("{}-key", topic);
    let key_schema = get_schema_with_strategy(ccsr_client, key_strategy, &subject).await?;
    Ok(Schema {
        key_schema,
        value_schema,
    })
}

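/// Compiles the Protobuf schema stored under `subject_name` (together with
/// its referenced subjects) into a serialized file descriptor set, requiring
/// the schema to contain exactly one top-level message.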
async fn compile_proto(
    subject_name: &str,
    ccsr_client: &Client,
) -> Result<CsrSeedProtobufSchema, PlanError> {
    let (primary_subject, dependency_subjects) = ccsr_client
        .get_subject_and_references(subject_name)
        .await
        .map_err(|e| PlanError::FetchingCsrSchemaFailed {
            schema_lookup: format!("subject {}", subject_name.quoted()),
            cause: Arc::new(e),
        })?;

    // Compile the .proto files into a file descriptor set.
    let mut source_tree = VirtualSourceTree::new();
    for subject in iter::once(&primary_subject).chain(dependency_subjects.iter()) {
        source_tree.as_mut().add_file(
            Path::new(&subject.name),
            subject.schema.raw.as_bytes().to_vec(),
        );
    }
    let mut db = SourceTreeDescriptorDatabase::new(source_tree.as_mut());
    let fds = db
        .as_mut()
        .build_file_descriptor_set(&[Path::new(&primary_subject.name)])
        .map_err(|cause| PlanError::InvalidProtobufSchema { cause })?;

    // Ensure the primary file contains exactly one message.
    let primary_fd = fds.file(0);
    let message_name = match primary_fd.message_type_size() {
        1 => String::from_utf8_lossy(primary_fd.message_type(0).name()).into_owned(),
        0 => bail_unsupported!(29603, "Protobuf schemas with no messages"),
        _ => bail_unsupported!(29603, "Protobuf schemas with multiple messages"),
    };

    // Encode the file descriptor set as a SQL byte string.
    let bytes = &fds
        .serialize()
        .map_err(|cause| PlanError::InvalidProtobufSchema { cause })?;
    let mut schema = String::new();
    strconv::format_bytes(&mut schema, bytes);

    Ok(CsrSeedProtobufSchema {
        schema,
        message_name,
    })
}

const MZ_NOW_NAME: &str = "mz_now";
const MZ_NOW_SCHEMA: &str = "mz_catalog";

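/// Purifies the WITH options of a `CREATE MATERIALIZED VIEW` statement:
/// - `REFRESH AT CREATION` becomes `REFRESH AT mz_now()`;
/// - `REFRESH EVERY` without `ALIGNED TO` gets `ALIGNED TO mz_now()`;
/// - any `mz_now()` call in a REFRESH option is replaced by the chosen
///   timestamp, cast to `mz_timestamp`;
/// - a default `REFRESH ON COMMIT` is added when no REFRESH option is given.
///
/// Also adjusts `resolved_ids` to reflect these rewrites.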
pub fn purify_create_materialized_view_options(
    catalog: impl SessionCatalog,
    mz_now: Option<Timestamp>,
    cmvs: &mut CreateMaterializedViewStatement<Aug>,
    resolved_ids: &mut ResolvedIds,
) {
    // Resolve `mz_catalog.mz_now` and build an `mz_now()` call expression.
    let (mz_now_id, mz_now_expr) = {
        let item = catalog
            .resolve_function(&PartialItemName {
                database: None,
                schema: Some(MZ_NOW_SCHEMA.to_string()),
                item: MZ_NOW_NAME.to_string(),
            })
            .expect("we should be able to resolve mz_now");
        (
            item.id(),
            Expr::Function(Function {
                name: ResolvedItemName::Item {
                    id: item.id(),
                    qualifiers: item.name().qualifiers.clone(),
                    full_name: catalog.resolve_full_name(item.name()),
                    print_id: false,
                    version: RelationVersionSelector::Latest,
                },
                args: FunctionArgs::Args {
                    args: Vec::new(),
                    order_by: Vec::new(),
                },
                filter: None,
                over: None,
                distinct: false,
            }),
        )
    };
    // Resolve the `mz_timestamp` type, to which chosen timestamps are cast.
    let (mz_timestamp_id, mz_timestamp_type) = {
        let item = catalog.get_system_type("mz_timestamp");
        let full_name = catalog.resolve_full_name(item.name());
        (
            item.id(),
            ResolvedDataType::Named {
                id: item.id(),
                qualifiers: item.name().qualifiers.clone(),
                full_name,
                modifiers: vec![],
                print_id: true,
            },
        )
    };

    let mut introduced_mz_timestamp = false;

    for option in cmvs.with_options.iter_mut() {
        // Purify `REFRESH AT CREATION` to `REFRESH AT mz_now()`.
        if matches!(
            option.value,
            Some(WithOptionValue::Refresh(RefreshOptionValue::AtCreation))
        ) {
            option.value = Some(WithOptionValue::Refresh(RefreshOptionValue::At(
                RefreshAtOptionValue {
                    time: mz_now_expr.clone(),
                },
            )));
        }

        // If `REFRESH EVERY` has no `ALIGNED TO`, default it to `mz_now()`.
        if let Some(WithOptionValue::Refresh(RefreshOptionValue::Every(
            RefreshEveryOptionValue { aligned_to, .. },
        ))) = &mut option.value
        {
            if aligned_to.is_none() {
                *aligned_to = Some(mz_now_expr.clone());
            }
        }

        // Substitute any `mz_now()` call in a REFRESH option with the chosen
        // timestamp.
        match &mut option.value {
            Some(WithOptionValue::Refresh(RefreshOptionValue::At(RefreshAtOptionValue {
                time,
            }))) => {
                let mut visitor = MzNowPurifierVisitor::new(mz_now, mz_timestamp_type.clone());
                visitor.visit_expr_mut(time);
                introduced_mz_timestamp |= visitor.introduced_mz_timestamp;
            }
            Some(WithOptionValue::Refresh(RefreshOptionValue::Every(
                RefreshEveryOptionValue {
                    interval: _,
                    aligned_to: Some(aligned_to),
                },
            ))) => {
                let mut visitor = MzNowPurifierVisitor::new(mz_now, mz_timestamp_type.clone());
                visitor.visit_expr_mut(aligned_to);
                introduced_mz_timestamp |= visitor.introduced_mz_timestamp;
            }
            _ => {}
        }
    }

    // If no REFRESH option is given, default to `REFRESH ON COMMIT`.
    if !cmvs.with_options.iter().any(|o| {
        matches!(
            o,
            MaterializedViewOption {
                value: Some(WithOptionValue::Refresh(..)),
                ..
            }
        )
    }) {
        cmvs.with_options.push(MaterializedViewOption {
            name: MaterializedViewOptionName::Refresh,
            value: Some(WithOptionValue::Refresh(RefreshOptionValue::OnCommit)),
        })
    }

    // Record the `mz_timestamp` type if the rewrites introduced casts to it.
    if introduced_mz_timestamp {
        resolved_ids.add_item(mz_timestamp_id);
    }
    // If the statement no longer references `mz_now`, drop it from the
    // resolved ids.
    let mut visitor = ExprContainsTemporalVisitor::new();
    visitor.visit_create_materialized_view_statement(cmvs);
    if !visitor.contains_temporal {
        resolved_ids.remove_item(&mz_now_id);
    }
}

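/// Returns whether a materialized view option will involve `mz_now()` after
/// purification, either because a REFRESH expression already mentions it or
/// because purification will introduce it.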
pub fn materialized_view_option_contains_temporal(mvo: &MaterializedViewOption<Aug>) -> bool {
    match &mvo.value {
        Some(WithOptionValue::Refresh(RefreshOptionValue::At(RefreshAtOptionValue { time }))) => {
            let mut visitor = ExprContainsTemporalVisitor::new();
            visitor.visit_expr(time);
            visitor.contains_temporal
        }
        Some(WithOptionValue::Refresh(RefreshOptionValue::Every(RefreshEveryOptionValue {
            interval: _,
            aligned_to: Some(aligned_to),
        }))) => {
            let mut visitor = ExprContainsTemporalVisitor::new();
            visitor.visit_expr(aligned_to);
            visitor.contains_temporal
        }
        Some(WithOptionValue::Refresh(RefreshOptionValue::Every(RefreshEveryOptionValue {
            interval: _,
            aligned_to: None,
        }))) => {
            // A missing `ALIGNED TO` is defaulted to `mz_now()` during
            // purification.
            true
        }
        Some(WithOptionValue::Refresh(RefreshOptionValue::AtCreation)) => {
            // `REFRESH AT CREATION` is purified to `REFRESH AT mz_now()`.
            true
        }
        _ => false,
    }
}

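/// AST visitor that determines whether an expression contains a call to
/// `mz_now`.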
struct ExprContainsTemporalVisitor {
    pub contains_temporal: bool,
}

impl ExprContainsTemporalVisitor {
    pub fn new() -> ExprContainsTemporalVisitor {
        ExprContainsTemporalVisitor {
            contains_temporal: false,
        }
    }
}

impl Visit<'_, Aug> for ExprContainsTemporalVisitor {
    fn visit_function(&mut self, func: &Function<Aug>) {
        self.contains_temporal |= func.name.full_item_name().item == MZ_NOW_NAME;
        visit_function(self, func);
    }
}

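/// AST visitor that replaces `mz_now()` calls with the chosen timestamp, cast
/// to `mz_timestamp`.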
struct MzNowPurifierVisitor {
    pub mz_now: Option<Timestamp>,
    pub mz_timestamp_type: ResolvedDataType,
    pub introduced_mz_timestamp: bool,
}

impl MzNowPurifierVisitor {
    pub fn new(
        mz_now: Option<Timestamp>,
        mz_timestamp_type: ResolvedDataType,
    ) -> MzNowPurifierVisitor {
        MzNowPurifierVisitor {
            mz_now,
            mz_timestamp_type,
            introduced_mz_timestamp: false,
        }
    }
}

impl VisitMut<'_, Aug> for MzNowPurifierVisitor {
    fn visit_expr_mut(&mut self, expr: &'_ mut Expr<Aug>) {
        match expr {
            Expr::Function(Function {
                name:
                    ResolvedItemName::Item {
                        full_name: FullItemName { item, .. },
                        ..
                    },
                ..
            }) if item.as_str() == MZ_NOW_NAME => {
                let mz_now = self.mz_now.expect(
                    "we should have chosen a timestamp if the expression contains mz_now()",
                );
                // Replace the `mz_now()` call with the chosen timestamp, cast
                // to `mz_timestamp`.
                *expr = Expr::Cast {
                    expr: Box::new(Expr::Value(Value::Number(mz_now.to_string()))),
                    data_type: self.mz_timestamp_type.clone(),
                };
                self.introduced_mz_timestamp = true;
            }
            _ => visit_expr_mut(self, expr),
        }
    }
}