use std::collections::{BTreeMap, BTreeSet};
use std::fmt;
use std::iter;
use std::path::Path;
use std::sync::Arc;

use anyhow::anyhow;
use itertools::Itertools;
use mz_ccsr::{Client, GetByIdError, GetBySubjectError, Schema as CcsrSchema};
use mz_controller_types::ClusterId;
use mz_kafka_util::client::MzClientContext;
use mz_mysql_util::MySqlTableDesc;
use mz_ore::collections::CollectionExt;
use mz_ore::error::ErrorExt;
use mz_ore::future::InTask;
use mz_ore::iter::IteratorExt;
use mz_ore::str::StrExt;
use mz_ore::{assert_none, soft_panic_or_log};
use mz_postgres_util::desc::PostgresTableDesc;
use mz_proto::RustType;
use mz_repr::{CatalogItemId, RelationDesc, RelationVersionSelector, Timestamp, strconv};
use mz_sql_parser::ast::display::AstDisplay;
use mz_sql_parser::ast::visit::{Visit, visit_function};
use mz_sql_parser::ast::visit_mut::{VisitMut, visit_expr_mut};
use mz_sql_parser::ast::{
    AlterSourceAction, AlterSourceAddSubsourceOptionName, AlterSourceStatement, AvroDocOn,
    ColumnName, CreateMaterializedViewStatement, CreateSinkConnection, CreateSinkOptionName,
    CreateSinkStatement, CreateSourceOptionName, CreateSubsourceOption, CreateSubsourceOptionName,
    CreateTableFromSourceStatement, CsrConfigOption, CsrConfigOptionName, CsrConnection,
    CsrSeedAvro, CsrSeedProtobuf, CsrSeedProtobufSchema, DeferredItemName, DocOnIdentifier,
    DocOnSchema, Expr, Function, FunctionArgs, Ident, KafkaSourceConfigOption,
    KafkaSourceConfigOptionName, LoadGenerator, LoadGeneratorOption, LoadGeneratorOptionName,
    MaterializedViewOption, MaterializedViewOptionName, MySqlConfigOption, MySqlConfigOptionName,
    PgConfigOption, PgConfigOptionName, RawItemName, ReaderSchemaSelectionStrategy,
    RefreshAtOptionValue, RefreshEveryOptionValue, RefreshOptionValue, SourceEnvelope,
    SqlServerConfigOption, SqlServerConfigOptionName, Statement, TableFromSourceColumns,
    TableFromSourceOption, TableFromSourceOptionName, UnresolvedItemName,
};
use mz_sql_server_util::desc::SqlServerTableDesc;
use mz_storage_types::configuration::StorageConfiguration;
use mz_storage_types::connections::Connection;
use mz_storage_types::connections::inline::IntoInlineConnection;
use mz_storage_types::errors::ContextCreationError;
use mz_storage_types::sources::load_generator::LoadGeneratorOutput;
use mz_storage_types::sources::mysql::MySqlSourceDetails;
use mz_storage_types::sources::postgres::PostgresSourcePublicationDetails;
use mz_storage_types::sources::{
    GenericSourceConnection, SourceDesc, SourceExportStatementDetails, SqlServerSourceExtras,
};
use prost::Message;
use protobuf_native::MessageLite;
use protobuf_native::compiler::{SourceTreeDescriptorDatabase, VirtualSourceTree};
use rdkafka::admin::AdminClient;
use references::{RetrievedSourceReferences, SourceReferenceClient};
use uuid::Uuid;

use crate::ast::{
    AlterSourceAddSubsourceOption, AvroSchema, CreateSourceConnection, CreateSourceStatement,
    CreateSubsourceStatement, CsrConnectionAvro, CsrConnectionProtobuf, ExternalReferenceExport,
    ExternalReferences, Format, FormatSpecifier, ProtobufSchema, Value, WithOptionValue,
};
use crate::catalog::{CatalogItemType, SessionCatalog};
use crate::kafka_util::{KafkaSinkConfigOptionExtracted, KafkaSourceConfigOptionExtracted};
use crate::names::{
    Aug, FullItemName, PartialItemName, ResolvedColumnReference, ResolvedDataType, ResolvedIds,
    ResolvedItemName,
};
use crate::plan::error::PlanError;
use crate::plan::statement::ddl::load_generator_ast_to_generator;
use crate::plan::{SourceReferences, StatementContext};
use crate::pure::error::{IcebergSinkPurificationError, SqlServerSourcePurificationError};
use crate::session::vars::ENABLE_SQL_SERVER_SOURCE;
use crate::{kafka_util, normalize};

use self::error::{
    CsrPurificationError, KafkaSinkPurificationError, KafkaSourcePurificationError,
    LoadGeneratorSourcePurificationError, MySqlSourcePurificationError, PgSourcePurificationError,
};

pub(crate) mod error;
mod references;

pub mod mysql;
pub mod postgres;
pub mod sql_server;

pub(crate) struct RequestedSourceExport<T> {
    external_reference: UnresolvedItemName,
    name: UnresolvedItemName,
    meta: T,
}

impl<T: fmt::Debug> fmt::Debug for RequestedSourceExport<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("RequestedSourceExport")
            .field("external_reference", &self.external_reference)
            .field("name", &self.name)
            .field("meta", &self.meta)
            .finish()
    }
}

impl<T> RequestedSourceExport<T> {
    fn change_meta<F>(self, new_meta: F) -> RequestedSourceExport<F> {
        RequestedSourceExport {
            external_reference: self.external_reference,
            name: self.name,
            meta: new_meta,
        }
    }
}

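/// Generates the name for a source export (e.g. a subsource) by replacing the
/// item portion of `source_name` with `subsource_name`, so the export is
/// created with the same database and schema qualification as its source.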
fn source_export_name_gen(
    source_name: &UnresolvedItemName,
    subsource_name: &str,
) -> Result<UnresolvedItemName, PlanError> {
    let mut partial = normalize::unresolved_item_name(source_name.clone())?;
    partial.item = subsource_name.to_string();
    Ok(UnresolvedItemName::from(partial))
}

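/// Validates that the requested source exports neither conflict with each
/// other's names nor reference the same upstream object more than once.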
fn validate_source_export_names<T>(
    requested_source_exports: &[RequestedSourceExport<T>],
) -> Result<(), PlanError> {
    // Check that no two source exports would be created with the same name.
    if let Some(name) = requested_source_exports
        .iter()
        .map(|subsource| &subsource.name)
        .duplicates()
        .next()
        .cloned()
    {
        let mut upstream_references: Vec<_> = requested_source_exports
            .iter()
            .filter_map(|subsource| {
                if subsource.name == name {
                    Some(subsource.external_reference.clone())
                } else {
                    None
                }
            })
            .collect();

        upstream_references.sort();

        Err(PlanError::SubsourceNameConflict {
            name,
            upstream_references,
        })?;
    }

    // Check that no two source exports reference the same upstream object.
    if let Some(name) = requested_source_exports
        .iter()
        .map(|export| &export.external_reference)
        .duplicates()
        .next()
        .cloned()
    {
        let mut target_names: Vec<_> = requested_source_exports
            .iter()
            .filter_map(|export| {
                if export.external_reference == name {
                    Some(export.name.clone())
                } else {
                    None
                }
            })
            .collect();

        target_names.sort();

        Err(PlanError::SubsourceDuplicateReference { name, target_names })?;
    }

    Ok(())
}

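/// The result of purifying a statement: all information that had to be
/// fetched from external systems has been resolved and embedded, so the
/// statement(s) can be planned without further network access.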
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PurifiedStatement {
    PurifiedCreateSource {
        create_progress_subsource_stmt: Option<CreateSubsourceStatement<Aug>>,
        create_source_stmt: CreateSourceStatement<Aug>,
        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
        available_source_references: SourceReferences,
    },
    PurifiedAlterSource {
        alter_source_stmt: AlterSourceStatement<Aug>,
    },
    PurifiedAlterSourceAddSubsources {
        source_name: ResolvedItemName,
        options: Vec<AlterSourceAddSubsourceOption<Aug>>,
        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
    },
    PurifiedAlterSourceRefreshReferences {
        source_name: ResolvedItemName,
        available_source_references: SourceReferences,
    },
    PurifiedCreateSink(CreateSinkStatement<Aug>),
    PurifiedCreateTableFromSource {
        stmt: CreateTableFromSourceStatement<Aug>,
    },
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PurifiedSourceExport {
    pub external_reference: UnresolvedItemName,
    pub details: PurifiedExportDetails,
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PurifiedExportDetails {
    MySql {
        table: MySqlTableDesc,
        text_columns: Option<Vec<Ident>>,
        exclude_columns: Option<Vec<Ident>>,
        initial_gtid_set: String,
    },
    Postgres {
        table: PostgresTableDesc,
        text_columns: Option<Vec<Ident>>,
        exclude_columns: Option<Vec<Ident>>,
    },
    SqlServer {
        table: SqlServerTableDesc,
        text_columns: Option<Vec<Ident>>,
        excl_columns: Option<Vec<Ident>>,
        capture_instance: Arc<str>,
        initial_lsn: mz_sql_server_util::cdc::Lsn,
    },
    Kafka {},
    LoadGenerator {
        table: Option<RelationDesc>,
        output: LoadGeneratorOutput,
    },
}

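/// Purifies `stmt` by fetching whatever information it needs from external
/// systems (e.g. Kafka brokers, upstream databases, schema registries).
///
/// Returns the purification result along with the `ClusterId` the statement
/// targets, if any.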
pub async fn purify_statement(
    catalog: impl SessionCatalog,
    now: u64,
    stmt: Statement<Aug>,
    storage_configuration: &StorageConfiguration,
) -> (Result<PurifiedStatement, PlanError>, Option<ClusterId>) {
    match stmt {
        Statement::CreateSource(stmt) => {
            let cluster_id = stmt.in_cluster.as_ref().map(|cluster| cluster.id.clone());
            (
                purify_create_source(catalog, now, stmt, storage_configuration).await,
                cluster_id,
            )
        }
        Statement::AlterSource(stmt) => (
            purify_alter_source(catalog, stmt, storage_configuration).await,
            None,
        ),
        Statement::CreateSink(stmt) => {
            let cluster_id = stmt.in_cluster.as_ref().map(|cluster| cluster.id.clone());
            (
                purify_create_sink(catalog, stmt, storage_configuration).await,
                cluster_id,
            )
        }
        Statement::CreateTableFromSource(stmt) => (
            purify_create_table_from_source(catalog, stmt, storage_configuration).await,
            None,
        ),
        o => unreachable!("{:?} does not need to be purified", o),
    }
}

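/// Injects `DOC ON` options for all Avro formats in `format`, based on the
/// comments stored in the catalog for the sink's underlying relation and the
/// objects it references, unless the user already provided an explicit
/// `DOC ON` option for the same identifier.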
pub(crate) fn purify_create_sink_avro_doc_on_options(
    catalog: &dyn SessionCatalog,
    from_id: CatalogItemId,
    format: &mut Option<FormatSpecifier<Aug>>,
) -> Result<(), PlanError> {
    // Collect all objects referenced by the sink's underlying relation,
    // including the relation itself.
    let from = catalog.get_item(&from_id);
    let object_ids = from
        .references()
        .items()
        .copied()
        .chain_one(from.id())
        .collect::<Vec<_>>();

    // Collect all Avro formats that use a Confluent Schema Registry
    // connection; only those can carry `DOC ON` options.
    let mut avro_format_options = vec![];
    for_each_format(format, |doc_on_schema, fmt| match fmt {
        Format::Avro(AvroSchema::InlineSchema { .. })
        | Format::Bytes
        | Format::Csv { .. }
        | Format::Json { .. }
        | Format::Protobuf(..)
        | Format::Regex(..)
        | Format::Text => (),
        Format::Avro(AvroSchema::Csr {
            csr_connection: CsrConnectionAvro { connection, .. },
        }) => {
            avro_format_options.push((doc_on_schema, &mut connection.options));
        }
    });

    for (for_schema, options) in avro_format_options {
        // Remember the `DOC ON` options the user provided explicitly, so we
        // don't overwrite them with catalog comments.
        let user_provided_comments = options
            .iter()
            .filter_map(|CsrConfigOption { name, .. }| match name {
                CsrConfigOptionName::AvroDocOn(doc_on) => Some(doc_on.clone()),
                _ => None,
            })
            .collect::<BTreeSet<_>>();

        // For each referenced object, push a `DOC ON` option for any catalog
        // comment on the object itself or on one of its columns.
        for object_id in &object_ids {
            let item = catalog
                .get_item(object_id)
                .at_version(RelationVersionSelector::Latest);
            let full_resolved_name = ResolvedItemName::Item {
                id: *object_id,
                qualifiers: item.name().qualifiers.clone(),
                full_name: catalog.resolve_full_name(item.name()),
                print_id: !matches!(
                    item.item_type(),
                    CatalogItemType::Func | CatalogItemType::Type
                ),
                version: RelationVersionSelector::Latest,
            };

            if let Some(comments_map) = catalog.get_item_comments(object_id) {
                let doc_on_item_key = AvroDocOn {
                    identifier: DocOnIdentifier::Type(full_resolved_name.clone()),
                    for_schema,
                };
                if !user_provided_comments.contains(&doc_on_item_key) {
                    if let Some(root_comment) = comments_map.get(&None) {
                        options.push(CsrConfigOption {
                            name: CsrConfigOptionName::AvroDocOn(doc_on_item_key),
                            value: Some(mz_sql_parser::ast::WithOptionValue::Value(Value::String(
                                root_comment.clone(),
                            ))),
                        });
                    }
                }

                let column_descs = match item.type_details() {
                    Some(details) => details.typ.desc(catalog).unwrap_or_default(),
                    None => item.relation_desc().map(|d| d.into_owned()),
                };

                if let Some(desc) = column_descs {
                    for (pos, column_name) in desc.iter_names().enumerate() {
                        // Comment positions are 1-based.
                        if let Some(comment_str) = comments_map.get(&Some(pos + 1)) {
                            let doc_on_column_key = AvroDocOn {
                                identifier: DocOnIdentifier::Column(ColumnName {
                                    relation: full_resolved_name.clone(),
                                    column: ResolvedColumnReference::Column {
                                        name: column_name.to_owned(),
                                        index: pos,
                                    },
                                }),
                                for_schema,
                            };
                            if !user_provided_comments.contains(&doc_on_column_key) {
                                options.push(CsrConfigOption {
                                    name: CsrConfigOptionName::AvroDocOn(doc_on_column_key),
                                    value: Some(mz_sql_parser::ast::WithOptionValue::Value(
                                        Value::String(comment_str.clone()),
                                    )),
                                });
                            }
                        }
                    }
                }
            }
        }
    }

    Ok(())
}

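/// Purifies a `CREATE SINK` statement: validates the `WITH` options, verifies
/// that the referenced connections work (Kafka, Iceberg, schema registry),
/// and fills in Avro `DOC ON` comments.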
async fn purify_create_sink(
    catalog: impl SessionCatalog,
    mut create_sink_stmt: CreateSinkStatement<Aug>,
    storage_configuration: &StorageConfiguration,
) -> Result<PurifiedStatement, PlanError> {
    let CreateSinkStatement {
        connection,
        format,
        with_options,
        name: _,
        in_cluster: _,
        if_not_exists: _,
        from,
        envelope: _,
    } = &mut create_sink_stmt;

    const USER_ALLOWED_WITH_OPTIONS: &[CreateSinkOptionName] = &[
        CreateSinkOptionName::Snapshot,
        CreateSinkOptionName::CommitInterval,
    ];

    if let Some(op) = with_options
        .iter()
        .find(|op| !USER_ALLOWED_WITH_OPTIONS.contains(&op.name))
    {
        sql_bail!(
            "CREATE SINK...WITH ({}..) is not allowed",
            op.name.to_ast_string_simple(),
        )
    }

    match &connection {
        CreateSinkConnection::Kafka {
            connection,
            options,
            key: _,
            headers: _,
        } => {
            let scx = StatementContext::new(None, &catalog);
            let connection = {
                let item = scx.get_item_by_resolved_name(connection)?;
                // Verify the referenced item is a Kafka connection.
                match item.connection()? {
                    Connection::Kafka(connection) => {
                        connection.clone().into_inline_connection(scx.catalog)
                    }
                    _ => sql_bail!(
                        "{} is not a kafka connection",
                        scx.catalog.resolve_full_name(item.name())
                    ),
                }
            };

            let extracted_options: KafkaSinkConfigOptionExtracted = options.clone().try_into()?;

            if extracted_options.legacy_ids == Some(true) {
                sql_bail!("LEGACY IDs option is not supported");
            }

            let client: AdminClient<_> = connection
                .create_with_context(
                    storage_configuration,
                    MzClientContext::default(),
                    &BTreeMap::new(),
                    InTask::No,
                )
                .await
                .map_err(|e| {
                    KafkaSinkPurificationError::AdminClientError(Arc::new(e))
                })?;

            let metadata = client
                .inner()
                .fetch_metadata(
                    None,
                    storage_configuration
                        .parameters
                        .kafka_timeout_config
                        .fetch_metadata_timeout,
                )
                .map_err(|e| {
                    KafkaSinkPurificationError::AdminClientError(Arc::new(
                        ContextCreationError::KafkaError(e),
                    ))
                })?;

            if metadata.brokers().is_empty() {
                Err(KafkaSinkPurificationError::ZeroBrokers)?;
            }
        }
        CreateSinkConnection::Iceberg {
            connection,
            aws_connection,
            ..
        } => {
            let scx = StatementContext::new(None, &catalog);
            let connection = {
                let item = scx.get_item_by_resolved_name(connection)?;
                // Verify the referenced item is an Iceberg catalog connection.
                match item.connection()? {
                    Connection::IcebergCatalog(connection) => {
                        connection.clone().into_inline_connection(scx.catalog)
                    }
                    _ => sql_bail!(
                        "{} is not an iceberg connection",
                        scx.catalog.resolve_full_name(item.name())
                    ),
                }
            };

            let aws_conn_id = aws_connection.item_id();

            let aws_connection = {
                let item = scx.get_item_by_resolved_name(aws_connection)?;
                // Verify the referenced item is an AWS connection.
                match item.connection()? {
                    Connection::Aws(aws_connection) => aws_connection.clone(),
                    _ => sql_bail!(
                        "{} is not an aws connection",
                        scx.catalog.resolve_full_name(item.name())
                    ),
                }
            };

            // Ensure we can reach the Iceberg catalog and build an AWS SDK
            // configuration from the AWS connection.
            let _catalog = connection
                .connect(storage_configuration, InTask::No)
                .await
                .map_err(|e| IcebergSinkPurificationError::CatalogError(Arc::new(e)))?;

            let _sdk_config = aws_connection
                .load_sdk_config(
                    &storage_configuration.connection_context,
                    aws_conn_id.clone(),
                    InTask::No,
                )
                .await
                .map_err(|e| IcebergSinkPurificationError::AwsSdkContextError(Arc::new(e)))?;
        }
    }

    let mut csr_connection_ids = BTreeSet::new();
    for_each_format(format, |_, fmt| match fmt {
        Format::Avro(AvroSchema::InlineSchema { .. })
        | Format::Bytes
        | Format::Csv { .. }
        | Format::Json { .. }
        | Format::Protobuf(ProtobufSchema::InlineSchema { .. })
        | Format::Regex(..)
        | Format::Text => (),
        Format::Avro(AvroSchema::Csr {
            csr_connection: CsrConnectionAvro { connection, .. },
        })
        | Format::Protobuf(ProtobufSchema::Csr {
            csr_connection: CsrConnectionProtobuf { connection, .. },
        }) => {
            csr_connection_ids.insert(*connection.connection.item_id());
        }
    });

    let scx = StatementContext::new(None, &catalog);
    for csr_connection_id in csr_connection_ids {
        let connection = {
            let item = scx.get_item(&csr_connection_id);
            // Verify the referenced item is a schema registry connection.
            match item.connection()? {
                Connection::Csr(connection) => connection.clone().into_inline_connection(&catalog),
                _ => Err(CsrPurificationError::NotCsrConnection(
                    scx.catalog.resolve_full_name(item.name()),
                ))?,
            }
        };

        let client = connection
            .connect(storage_configuration, InTask::No)
            .await
            .map_err(|e| CsrPurificationError::ClientError(Arc::new(e)))?;

        client
            .list_subjects()
            .await
            .map_err(|e| CsrPurificationError::ListSubjectsError(Arc::new(e)))?;
    }

    purify_create_sink_avro_doc_on_options(&catalog, *from.item_id(), format)?;

    Ok(PurifiedStatement::PurifiedCreateSink(create_sink_stmt))
}

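/// Calls `f` on each individual format within `format`, passing a
/// [`DocOnSchema`] that indicates whether the format applies to the key, the
/// value, or both.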
fn for_each_format<'a, F>(format: &'a mut Option<FormatSpecifier<Aug>>, mut f: F)
where
    F: FnMut(DocOnSchema, &'a mut Format<Aug>),
{
    match format {
        None => (),
        Some(FormatSpecifier::Bare(fmt)) => f(DocOnSchema::All, fmt),
        Some(FormatSpecifier::KeyValue { key, value }) => {
            f(DocOnSchema::KeyOnly, key);
            f(DocOnSchema::ValueOnly, value);
        }
    }
}

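/// Controls whether a statement being purified may, must, or must not specify
/// references to upstream objects (e.g. subsources).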
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub(crate) enum SourceReferencePolicy {
    /// References to external objects must not be specified; tables must be
    /// used instead.
    NotAllowed,
    /// References may be specified, but are not required.
    Optional,
    /// References must be specified on the source statement.
    Required,
}

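/// Purifies a `CREATE SOURCE` statement: validates the source connection,
/// retrieves the set of available upstream references, resolves the requested
/// subsources, and synthesizes the progress subsource statement when the old
/// syntax is used.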
async fn purify_create_source(
    catalog: impl SessionCatalog,
    now: u64,
    mut create_source_stmt: CreateSourceStatement<Aug>,
    storage_configuration: &StorageConfiguration,
) -> Result<PurifiedStatement, PlanError> {
    let CreateSourceStatement {
        name: source_name,
        col_names,
        key_constraint,
        connection: source_connection,
        format,
        envelope,
        include_metadata,
        external_references,
        progress_subsource,
        with_options,
        ..
    } = &mut create_source_stmt;

    // The presence of any of these clauses indicates the old source syntax,
    // which creates a dedicated progress subsource.
    let uses_old_syntax = !col_names.is_empty()
        || key_constraint.is_some()
        || format.is_some()
        || envelope.is_some()
        || !include_metadata.is_empty()
        || external_references.is_some()
        || progress_subsource.is_some();

    if let Some(DeferredItemName::Named(_)) = progress_subsource {
        sql_bail!("Cannot manually ID qualify progress subsource")
    }

    let mut requested_subsource_map = BTreeMap::new();

    let progress_desc = match &source_connection {
        CreateSourceConnection::Kafka { .. } => {
            &mz_storage_types::sources::kafka::KAFKA_PROGRESS_DESC
        }
        CreateSourceConnection::Postgres { .. } => {
            &mz_storage_types::sources::postgres::PG_PROGRESS_DESC
        }
        CreateSourceConnection::SqlServer { .. } => {
            &mz_storage_types::sources::sql_server::SQL_SERVER_PROGRESS_DESC
        }
        CreateSourceConnection::MySql { .. } => {
            &mz_storage_types::sources::mysql::MYSQL_PROGRESS_DESC
        }
        CreateSourceConnection::LoadGenerator { .. } => {
            &mz_storage_types::sources::load_generator::LOAD_GEN_PROGRESS_DESC
        }
    };
    let scx = StatementContext::new(None, &catalog);

    // Determine whether this source may, must, or must not define its external
    // references (subsources) directly on the statement.
    let reference_policy = if scx.catalog.system_vars().enable_create_table_from_source()
        && scx.catalog.system_vars().force_source_table_syntax()
    {
        SourceReferencePolicy::NotAllowed
    } else if scx.catalog.system_vars().enable_create_table_from_source() {
        SourceReferencePolicy::Optional
    } else {
        SourceReferencePolicy::Required
    };

    let mut format_options = SourceFormatOptions::Default;

    let retrieved_source_references: RetrievedSourceReferences;

    match source_connection {
        CreateSourceConnection::Kafka {
            connection,
            options: base_with_options,
            ..
        } => {
            if let Some(external_references) = external_references {
                Err(KafkaSourcePurificationError::ReferencedSubsources(
                    external_references.clone(),
                ))?;
            }

            let connection = {
                let item = scx.get_item_by_resolved_name(connection)?;
                // Verify the referenced item is a Kafka connection.
                match item.connection()? {
                    Connection::Kafka(connection) => {
                        connection.clone().into_inline_connection(&catalog)
                    }
                    _ => Err(KafkaSourcePurificationError::NotKafkaConnection(
                        scx.catalog.resolve_full_name(item.name()),
                    ))?,
                }
            };

            let extracted_options: KafkaSourceConfigOptionExtracted =
                base_with_options.clone().try_into()?;

            let topic = extracted_options
                .topic
                .ok_or(KafkaSourcePurificationError::ConnectionMissingTopic)?;

            let consumer = connection
                .create_with_context(
                    storage_configuration,
                    MzClientContext::default(),
                    &BTreeMap::new(),
                    InTask::No,
                )
                .await
                .map_err(|e| {
                    KafkaSourcePurificationError::KafkaConsumerError(
                        e.display_with_causes().to_string(),
                    )
                })?;
            let consumer = Arc::new(consumer);

            match (
                extracted_options.start_offset,
                extracted_options.start_timestamp,
            ) {
                (None, None) => {
                    // Validate that the topic at least exists.
                    kafka_util::ensure_topic_exists(
                        Arc::clone(&consumer),
                        &topic,
                        storage_configuration
                            .parameters
                            .kafka_timeout_config
                            .fetch_metadata_timeout,
                    )
                    .await?;
                }
                (Some(_), Some(_)) => {
                    sql_bail!("cannot specify START TIMESTAMP and START OFFSET at same time")
                }
                (Some(start_offsets), None) => {
                    // Validate the user-provided start offsets.
                    kafka_util::validate_start_offsets(
                        Arc::clone(&consumer),
                        &topic,
                        start_offsets,
                        storage_configuration
                            .parameters
                            .kafka_timeout_config
                            .fetch_metadata_timeout,
                    )
                    .await?;
                }
                (None, Some(time_offset)) => {
                    // Translate the START TIMESTAMP option into per-partition
                    // start offsets and rewrite the statement to use those.
                    let start_offsets = kafka_util::lookup_start_offsets(
                        Arc::clone(&consumer),
                        &topic,
                        time_offset,
                        now,
                        storage_configuration
                            .parameters
                            .kafka_timeout_config
                            .fetch_metadata_timeout,
                    )
                    .await?;

                    base_with_options.retain(|val| {
                        !matches!(val.name, KafkaSourceConfigOptionName::StartTimestamp)
                    });
                    base_with_options.push(KafkaSourceConfigOption {
                        name: KafkaSourceConfigOptionName::StartOffset,
                        value: Some(WithOptionValue::Sequence(
                            start_offsets
                                .iter()
                                .map(|offset| {
                                    WithOptionValue::Value(Value::Number(offset.to_string()))
                                })
                                .collect(),
                        )),
                    });
                }
            }

            let reference_client = SourceReferenceClient::Kafka { topic: &topic };
            retrieved_source_references = reference_client.get_source_references().await?;

            format_options = SourceFormatOptions::Kafka { topic };
        }
        CreateSourceConnection::Postgres {
            connection,
            options,
        } => {
            let connection_item = scx.get_item_by_resolved_name(connection)?;
            let connection = match connection_item.connection().map_err(PlanError::from)? {
                Connection::Postgres(connection) => {
                    connection.clone().into_inline_connection(&catalog)
                }
                _ => Err(PgSourcePurificationError::NotPgConnection(
                    scx.catalog.resolve_full_name(connection_item.name()),
                ))?,
            };
            let crate::plan::statement::PgConfigOptionExtracted {
                publication,
                text_columns,
                exclude_columns,
                details,
                ..
            } = options.clone().try_into()?;
            let publication =
                publication.ok_or(PgSourcePurificationError::ConnectionMissingPublication)?;

            if details.is_some() {
                Err(PgSourcePurificationError::UserSpecifiedDetails)?;
            }

            let client = connection
                .validate(connection_item.id(), storage_configuration)
                .await?;

            let reference_client = SourceReferenceClient::Postgres {
                client: &client,
                publication: &publication,
                database: &connection.database,
            };
            retrieved_source_references = reference_client.get_source_references().await?;

            let postgres::PurifiedSourceExports {
                source_exports: subsources,
                normalized_text_columns,
            } = postgres::purify_source_exports(
                &client,
                &retrieved_source_references,
                external_references,
                text_columns,
                exclude_columns,
                source_name,
                &reference_policy,
            )
            .await?;

            if let Some(text_cols_option) = options
                .iter_mut()
                .find(|option| option.name == PgConfigOptionName::TextColumns)
            {
                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
            }

            requested_subsource_map.extend(subsources);

            // Record the current timeline ID of the upstream Postgres server
            // in the source details.
            let timeline_id = mz_postgres_util::get_timeline_id(&client).await?;

            // Remove any existing `DETAILS` option and install the newly
            // computed publication details.
            options.retain(|PgConfigOption { name, .. }| name != &PgConfigOptionName::Details);
            let details = PostgresSourcePublicationDetails {
                slot: format!(
                    "materialize_{}",
                    Uuid::new_v4().to_string().replace('-', "")
                ),
                timeline_id: Some(timeline_id),
                database: connection.database,
            };
            options.push(PgConfigOption {
                name: PgConfigOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    details.into_proto().encode_to_vec(),
                )))),
            })
        }
        CreateSourceConnection::SqlServer {
            connection,
            options,
        } => {
            scx.require_feature_flag(&ENABLE_SQL_SERVER_SOURCE)?;

            // Verify the referenced item is a SQL Server connection.
            let connection_item = scx.get_item_by_resolved_name(connection)?;
            let connection = match connection_item.connection()? {
                Connection::SqlServer(connection) => {
                    connection.clone().into_inline_connection(&catalog)
                }
                _ => Err(SqlServerSourcePurificationError::NotSqlServerConnection(
                    scx.catalog.resolve_full_name(connection_item.name()),
                ))?,
            };
            let crate::plan::statement::ddl::SqlServerConfigOptionExtracted {
                details,
                text_columns,
                exclude_columns,
                seen: _,
            } = options.clone().try_into()?;

            if details.is_some() {
                Err(SqlServerSourcePurificationError::UserSpecifiedDetails)?;
            }

            let mut client = connection
                .validate(connection_item.id(), storage_configuration)
                .await?;

            let database: Arc<str> = connection.database.into();
            let reference_client = SourceReferenceClient::SqlServer {
                client: &mut client,
                database: Arc::clone(&database),
            };
            retrieved_source_references = reference_client.get_source_references().await?;
            tracing::debug!(?retrieved_source_references, "got source references");

            let timeout = mz_storage_types::sources::sql_server::MAX_LSN_WAIT
                .get(storage_configuration.config_set());

            let purified_source_exports = sql_server::purify_source_exports(
                &*database,
                &mut client,
                &retrieved_source_references,
                external_references,
                &text_columns,
                &exclude_columns,
                source_name,
                timeout,
                &reference_policy,
            )
            .await?;

            let sql_server::PurifiedSourceExports {
                source_exports,
                normalized_text_columns,
                normalized_excl_columns,
            } = purified_source_exports;

            requested_subsource_map.extend(source_exports);

            // Capture the latest restore history ID in the source details.
            let restore_history_id =
                mz_sql_server_util::inspect::get_latest_restore_history_id(&mut client).await?;
            let details = SqlServerSourceExtras { restore_history_id };

            options.retain(|SqlServerConfigOption { name, .. }| {
                name != &SqlServerConfigOptionName::Details
            });
            options.push(SqlServerConfigOption {
                name: SqlServerConfigOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    details.into_proto().encode_to_vec(),
                )))),
            });

            // Normalize the TEXT COLUMNS and EXCLUDE COLUMNS options.
            if let Some(text_cols_option) = options
                .iter_mut()
                .find(|option| option.name == SqlServerConfigOptionName::TextColumns)
            {
                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
            }
            if let Some(excl_cols_option) = options
                .iter_mut()
                .find(|option| option.name == SqlServerConfigOptionName::ExcludeColumns)
            {
                excl_cols_option.value = Some(WithOptionValue::Sequence(normalized_excl_columns));
            }
        }
        CreateSourceConnection::MySql {
            connection,
            options,
        } => {
            let connection_item = scx.get_item_by_resolved_name(connection)?;
            let connection = match connection_item.connection()? {
                Connection::MySql(connection) => {
                    connection.clone().into_inline_connection(&catalog)
                }
                _ => Err(MySqlSourcePurificationError::NotMySqlConnection(
                    scx.catalog.resolve_full_name(connection_item.name()),
                ))?,
            };
            let crate::plan::statement::ddl::MySqlConfigOptionExtracted {
                details,
                text_columns,
                exclude_columns,
                seen: _,
            } = options.clone().try_into()?;

            if details.is_some() {
                Err(MySqlSourcePurificationError::UserSpecifiedDetails)?;
            }

            let mut conn = connection
                .validate(connection_item.id(), storage_configuration)
                .await
                .map_err(MySqlSourcePurificationError::InvalidConnection)?;

            // Retrieve the current GTID set of the upstream MySQL server,
            // which the source exports use as their starting point.
            let initial_gtid_set =
                mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?;

            let reference_client = SourceReferenceClient::MySql {
                conn: &mut conn,
                include_system_schemas: mysql::references_system_schemas(external_references),
            };
            retrieved_source_references = reference_client.get_source_references().await?;

            let mysql::PurifiedSourceExports {
                source_exports: subsources,
                normalized_text_columns,
                normalized_exclude_columns,
            } = mysql::purify_source_exports(
                &mut conn,
                &retrieved_source_references,
                external_references,
                text_columns,
                exclude_columns,
                source_name,
                initial_gtid_set.clone(),
                &reference_policy,
            )
            .await?;
            requested_subsource_map.extend(subsources);

            // Remove any existing `DETAILS` option and install the newly
            // computed details.
            let details = MySqlSourceDetails {};
            options
                .retain(|MySqlConfigOption { name, .. }| name != &MySqlConfigOptionName::Details);
            options.push(MySqlConfigOption {
                name: MySqlConfigOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    details.into_proto().encode_to_vec(),
                )))),
            });

            if let Some(text_cols_option) = options
                .iter_mut()
                .find(|option| option.name == MySqlConfigOptionName::TextColumns)
            {
                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
            }
            if let Some(exclude_cols_option) = options
                .iter_mut()
                .find(|option| option.name == MySqlConfigOptionName::ExcludeColumns)
            {
                exclude_cols_option.value =
                    Some(WithOptionValue::Sequence(normalized_exclude_columns));
            }
        }
        CreateSourceConnection::LoadGenerator { generator, options } => {
            let load_generator =
                load_generator_ast_to_generator(&scx, generator, options, include_metadata)?;

            let reference_client = SourceReferenceClient::LoadGenerator {
                generator: &load_generator,
            };
            retrieved_source_references = reference_client.get_source_references().await?;
            // Determine whether this load generator produces data on multiple
            // outputs, which determines which external-reference clauses make
            // sense.
            let multi_output_sources =
                retrieved_source_references
                    .all_references()
                    .iter()
                    .any(|r| {
                        r.load_generator_output().expect("is loadgen")
                            != &LoadGeneratorOutput::Default
                    });

            match external_references {
                Some(requested)
                    if matches!(reference_policy, SourceReferencePolicy::NotAllowed) =>
                {
                    Err(PlanError::UseTablesForSources(requested.to_string()))?
                }
                Some(requested) if !multi_output_sources => match requested {
                    ExternalReferences::SubsetTables(_) => {
                        Err(LoadGeneratorSourcePurificationError::ForTables)?
                    }
                    ExternalReferences::SubsetSchemas(_) => {
                        Err(LoadGeneratorSourcePurificationError::ForSchemas)?
                    }
                    ExternalReferences::All => {
                        Err(LoadGeneratorSourcePurificationError::ForAllTables)?
                    }
                },
                Some(requested) => {
                    let requested_exports = retrieved_source_references
                        .requested_source_exports(Some(requested), source_name)?;
                    for export in requested_exports {
                        requested_subsource_map.insert(
                            export.name,
                            PurifiedSourceExport {
                                external_reference: export.external_reference,
                                details: PurifiedExportDetails::LoadGenerator {
                                    table: export
                                        .meta
                                        .load_generator_desc()
                                        .expect("is loadgen")
                                        .clone(),
                                    output: export
                                        .meta
                                        .load_generator_output()
                                        .expect("is loadgen")
                                        .clone(),
                                },
                            },
                        );
                    }
                }
                None => {
                    if multi_output_sources
                        && matches!(reference_policy, SourceReferencePolicy::Required)
                    {
                        Err(LoadGeneratorSourcePurificationError::MultiOutputRequiresForAllTables)?
                    }
                }
            }

            // Pin the CLOCK load generator to the current time unless the
            // user provided an explicit AS OF.
            if let LoadGenerator::Clock = generator {
                if !options
                    .iter()
                    .any(|p| p.name == LoadGeneratorOptionName::AsOf)
                {
                    let now = catalog.now();
                    options.push(LoadGeneratorOption {
                        name: LoadGeneratorOptionName::AsOf,
                        value: Some(WithOptionValue::Value(Value::Number(now.to_string()))),
                    });
                }
            }
        }
    }

    // The requested external references have been resolved into
    // `requested_subsource_map`, so they no longer need to be stored on the
    // statement itself.
    *external_references = None;

    // With the old syntax, a dedicated progress subsource is created; the new
    // syntax does not use one.
    let create_progress_subsource_stmt = if uses_old_syntax {
        // Use the user-provided name, or generate one from the source name
        // that does not conflict with any existing catalog item.
        let name = match progress_subsource {
            Some(name) => match name {
                DeferredItemName::Deferred(name) => name.clone(),
                DeferredItemName::Named(_) => unreachable!("already checked for this value"),
            },
            None => {
                let (item, prefix) = source_name.0.split_last().unwrap();
                let item_name =
                    Ident::try_generate_name(item.to_string(), "_progress", |candidate| {
                        let mut suggested_name = prefix.to_vec();
                        suggested_name.push(candidate.clone());

                        let partial =
                            normalize::unresolved_item_name(UnresolvedItemName(suggested_name))?;
                        let qualified = scx.allocate_qualified_name(partial)?;
                        let item_exists = scx.catalog.get_item_by_name(&qualified).is_some();
                        let type_exists = scx.catalog.get_type_by_name(&qualified).is_some();
                        Ok::<_, PlanError>(!item_exists && !type_exists)
                    })?;

                let mut full_name = prefix.to_vec();
                full_name.push(item_name);
                let full_name = normalize::unresolved_item_name(UnresolvedItemName(full_name))?;
                let qualified_name = scx.allocate_qualified_name(full_name)?;
                let full_name = scx.catalog.resolve_full_name(&qualified_name);

                UnresolvedItemName::from(full_name.clone())
            }
        };

        let (columns, constraints) = scx.relation_desc_into_table_defs(progress_desc)?;

        // Carry over the applicable `WITH` options from the source statement
        // and mark the subsource as a progress collection.
        let mut progress_with_options: Vec<_> = with_options
            .iter()
            .filter_map(|opt| match opt.name {
                CreateSourceOptionName::TimestampInterval => None,
                CreateSourceOptionName::RetainHistory => Some(CreateSubsourceOption {
                    name: CreateSubsourceOptionName::RetainHistory,
                    value: opt.value.clone(),
                }),
            })
            .collect();
        progress_with_options.push(CreateSubsourceOption {
            name: CreateSubsourceOptionName::Progress,
            value: Some(WithOptionValue::Value(Value::Boolean(true))),
        });

        Some(CreateSubsourceStatement {
            name,
            columns,
            of_source: None,
            constraints,
            if_not_exists: false,
            with_options: progress_with_options,
        })
    } else {
        None
    };

    purify_source_format(
        &catalog,
        format,
        &format_options,
        envelope,
        storage_configuration,
    )
    .await?;

    Ok(PurifiedStatement::PurifiedCreateSource {
        create_progress_subsource_stmt,
        create_source_stmt,
        subsources: requested_subsource_map,
        available_source_references: retrieved_source_references.available_source_references(),
    })
}

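/// Purifies an `ALTER SOURCE` statement. Only the `ADD SUBSOURCE` and
/// `REFRESH REFERENCES` actions require access to external systems; all other
/// actions pass through unchanged.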
async fn purify_alter_source(
    catalog: impl SessionCatalog,
    stmt: AlterSourceStatement<Aug>,
    storage_configuration: &StorageConfiguration,
) -> Result<PurifiedStatement, PlanError> {
    let scx = StatementContext::new(None, &catalog);
    let AlterSourceStatement {
        source_name: unresolved_source_name,
        action,
        if_exists,
    } = stmt;

    // Resolve the source being altered. If it does not exist and `IF EXISTS`
    // was specified, pass the statement through unchanged.
    let item = match scx.resolve_item(RawItemName::Name(unresolved_source_name.clone())) {
        Ok(item) => item,
        Err(_) if if_exists => {
            return Ok(PurifiedStatement::PurifiedAlterSource {
                alter_source_stmt: AlterSourceStatement {
                    source_name: unresolved_source_name,
                    action,
                    if_exists,
                },
            });
        }
        Err(e) => return Err(e),
    };

    let desc = match item.source_desc()? {
        Some(desc) => desc.clone().into_inline_connection(scx.catalog),
        None => {
            sql_bail!("cannot ALTER this type of source")
        }
    };

    let source_name = item.name();

    let resolved_source_name = ResolvedItemName::Item {
        id: item.id(),
        qualifiers: item.name().qualifiers.clone(),
        full_name: scx.catalog.resolve_full_name(source_name),
        print_id: true,
        version: RelationVersionSelector::Latest,
    };

    let partial_name = scx.catalog.minimal_qualification(source_name);

    match action {
        AlterSourceAction::AddSubsources {
            external_references,
            options,
        } => {
            if scx.catalog.system_vars().enable_create_table_from_source()
                && scx.catalog.system_vars().force_source_table_syntax()
            {
                Err(PlanError::UseTablesForSources(
                    "ALTER SOURCE .. ADD SUBSOURCES ..".to_string(),
                ))?;
            }

            purify_alter_source_add_subsources(
                external_references,
                options,
                desc,
                partial_name,
                unresolved_source_name,
                resolved_source_name,
                storage_configuration,
            )
            .await
        }
        AlterSourceAction::RefreshReferences => {
            purify_alter_source_refresh_references(
                desc,
                resolved_source_name,
                storage_configuration,
            )
            .await
        }
        _ => Ok(PurifiedStatement::PurifiedAlterSource {
            alter_source_stmt: AlterSourceStatement {
                source_name: unresolved_source_name,
                action,
                if_exists,
            },
        }),
    }
}

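/// Purifies the `ALTER SOURCE .. ADD SUBSOURCE` action by connecting to the
/// upstream system, resolving the requested external references into purified
/// source exports, and normalizing the statement's options.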
async fn purify_alter_source_add_subsources(
    external_references: Vec<ExternalReferenceExport>,
    mut options: Vec<AlterSourceAddSubsourceOption<Aug>>,
    desc: SourceDesc,
    partial_source_name: PartialItemName,
    unresolved_source_name: UnresolvedItemName,
    resolved_source_name: ResolvedItemName,
    storage_configuration: &StorageConfiguration,
) -> Result<PurifiedStatement, PlanError> {
    // Adding subsources is only supported for Postgres, MySQL, and SQL Server
    // sources.
    let connection_id = match &desc.connection {
        GenericSourceConnection::Postgres(c) => c.connection_id,
        GenericSourceConnection::MySql(c) => c.connection_id,
        GenericSourceConnection::SqlServer(c) => c.connection_id,
        _ => sql_bail!(
            "source {} does not support ALTER SOURCE.",
            partial_source_name
        ),
    };

    let crate::plan::statement::ddl::AlterSourceAddSubsourceOptionExtracted {
        text_columns,
        exclude_columns,
        details,
        seen: _,
    } = options.clone().try_into()?;
    assert_none!(details, "details cannot be explicitly set");

    let mut requested_subsource_map = BTreeMap::new();

    match desc.connection {
        GenericSourceConnection::Postgres(pg_source_connection) => {
            let pg_connection = &pg_source_connection.connection;

            let client = pg_connection
                .validate(connection_id, storage_configuration)
                .await?;

            let reference_client = SourceReferenceClient::Postgres {
                client: &client,
                publication: &pg_source_connection.publication,
                database: &pg_connection.database,
            };
            let retrieved_source_references = reference_client.get_source_references().await?;

            let postgres::PurifiedSourceExports {
                source_exports: subsources,
                normalized_text_columns,
            } = postgres::purify_source_exports(
                &client,
                &retrieved_source_references,
                &Some(ExternalReferences::SubsetTables(external_references)),
                text_columns,
                exclude_columns,
                &unresolved_source_name,
                &SourceReferencePolicy::Required,
            )
            .await?;

            if let Some(text_cols_option) = options
                .iter_mut()
                .find(|option| option.name == AlterSourceAddSubsourceOptionName::TextColumns)
            {
                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
            }

            requested_subsource_map.extend(subsources);
        }
        GenericSourceConnection::MySql(mysql_source_connection) => {
            let mysql_connection = &mysql_source_connection.connection;
            let config = mysql_connection
                .config(
                    &storage_configuration.connection_context.secrets_reader,
                    storage_configuration,
                    InTask::No,
                )
                .await?;

            let mut conn = config
                .connect(
                    "mysql purification",
                    &storage_configuration.connection_context.ssh_tunnel_manager,
                )
                .await?;

            // Retrieve the current GTID set of the upstream MySQL server,
            // which the new subsources use as their starting point.
            let initial_gtid_set =
                mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?;

            let requested_references = Some(ExternalReferences::SubsetTables(external_references));

            let reference_client = SourceReferenceClient::MySql {
                conn: &mut conn,
                include_system_schemas: mysql::references_system_schemas(&requested_references),
            };
            let retrieved_source_references = reference_client.get_source_references().await?;

            let mysql::PurifiedSourceExports {
                source_exports: subsources,
                normalized_text_columns,
                normalized_exclude_columns,
            } = mysql::purify_source_exports(
                &mut conn,
                &retrieved_source_references,
                &requested_references,
                text_columns,
                exclude_columns,
                &unresolved_source_name,
                initial_gtid_set,
                &SourceReferencePolicy::Required,
            )
            .await?;
            requested_subsource_map.extend(subsources);

            // Normalize the TEXT COLUMNS and EXCLUDE COLUMNS options.
            if let Some(text_cols_option) = options
                .iter_mut()
                .find(|option| option.name == AlterSourceAddSubsourceOptionName::TextColumns)
            {
                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
            }
            if let Some(exclude_cols_option) = options
                .iter_mut()
                .find(|option| option.name == AlterSourceAddSubsourceOptionName::ExcludeColumns)
            {
                exclude_cols_option.value =
                    Some(WithOptionValue::Sequence(normalized_exclude_columns));
            }
        }
        GenericSourceConnection::SqlServer(sql_server_source) => {
            // Open a connection to the upstream SQL Server instance.
            let sql_server_connection = &sql_server_source.connection;
            let config = sql_server_connection
                .resolve_config(
                    &storage_configuration.connection_context.secrets_reader,
                    storage_configuration,
                    InTask::No,
                )
                .await?;
            let mut client = mz_sql_server_util::Client::connect(config).await?;

            // Discover the references available in the upstream database.
            let database = sql_server_connection.database.clone().into();
            let source_references = SourceReferenceClient::SqlServer {
                client: &mut client,
                database: Arc::clone(&database),
            }
            .get_source_references()
            .await?;
            let requested_references = Some(ExternalReferences::SubsetTables(external_references));

            let timeout = mz_storage_types::sources::sql_server::MAX_LSN_WAIT
                .get(storage_configuration.config_set());

            let result = sql_server::purify_source_exports(
                &*database,
                &mut client,
                &source_references,
                &requested_references,
                &text_columns,
                &exclude_columns,
                &unresolved_source_name,
                timeout,
                &SourceReferencePolicy::Required,
            )
            .await;
            let sql_server::PurifiedSourceExports {
                source_exports,
                normalized_text_columns,
                normalized_excl_columns,
            } = result?;

            requested_subsource_map.extend(source_exports);

            // Normalize the TEXT COLUMNS and EXCLUDE COLUMNS options.
            if let Some(text_cols_option) = options
                .iter_mut()
                .find(|option| option.name == AlterSourceAddSubsourceOptionName::TextColumns)
            {
                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
            }
            if let Some(exclude_cols_option) = options
                .iter_mut()
                .find(|option| option.name == AlterSourceAddSubsourceOptionName::ExcludeColumns)
            {
                exclude_cols_option.value =
                    Some(WithOptionValue::Sequence(normalized_excl_columns));
            }
        }
        _ => unreachable!(),
    };

    Ok(PurifiedStatement::PurifiedAlterSourceAddSubsources {
        source_name: resolved_source_name,
        options,
        subsources: requested_subsource_map,
    })
}

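/// Purifies the `ALTER SOURCE .. REFRESH REFERENCES` action by re-fetching
/// the set of available upstream references for the source.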
async fn purify_alter_source_refresh_references(
    desc: SourceDesc,
    resolved_source_name: ResolvedItemName,
    storage_configuration: &StorageConfiguration,
) -> Result<PurifiedStatement, PlanError> {
    let retrieved_source_references = match desc.connection {
        GenericSourceConnection::Postgres(pg_source_connection) => {
            // Open a connection to the upstream Postgres server.
            let pg_connection = &pg_source_connection.connection;

            let config = pg_connection
                .config(
                    &storage_configuration.connection_context.secrets_reader,
                    storage_configuration,
                    InTask::No,
                )
                .await?;

            let client = config
                .connect(
                    "postgres_purification",
                    &storage_configuration.connection_context.ssh_tunnel_manager,
                )
                .await?;
            let reference_client = SourceReferenceClient::Postgres {
                client: &client,
                publication: &pg_source_connection.publication,
                database: &pg_connection.database,
            };
            reference_client.get_source_references().await?
        }
        GenericSourceConnection::MySql(mysql_source_connection) => {
            let mysql_connection = &mysql_source_connection.connection;
            let config = mysql_connection
                .config(
                    &storage_configuration.connection_context.secrets_reader,
                    storage_configuration,
                    InTask::No,
                )
                .await?;

            let mut conn = config
                .connect(
                    "mysql purification",
                    &storage_configuration.connection_context.ssh_tunnel_manager,
                )
                .await?;

            let reference_client = SourceReferenceClient::MySql {
                conn: &mut conn,
                include_system_schemas: false,
            };
            reference_client.get_source_references().await?
        }
        GenericSourceConnection::SqlServer(sql_server_source) => {
            // Open a connection to the upstream SQL Server instance.
            let sql_server_connection = &sql_server_source.connection;
            let config = sql_server_connection
                .resolve_config(
                    &storage_configuration.connection_context.secrets_reader,
                    storage_configuration,
                    InTask::No,
                )
                .await?;
            let mut client = mz_sql_server_util::Client::connect(config).await?;

            let source_references = SourceReferenceClient::SqlServer {
                client: &mut client,
                database: sql_server_connection.database.clone().into(),
            }
            .get_source_references()
            .await?;
            source_references
        }
        GenericSourceConnection::LoadGenerator(load_gen_connection) => {
            let reference_client = SourceReferenceClient::LoadGenerator {
                generator: &load_gen_connection.load_generator,
            };
            reference_client.get_source_references().await?
        }
        GenericSourceConnection::Kafka(kafka_conn) => {
            let reference_client = SourceReferenceClient::Kafka {
                topic: &kafka_conn.topic,
            };
            reference_client.get_source_references().await?
        }
    };
    Ok(PurifiedStatement::PurifiedAlterSourceRefreshReferences {
        source_name: resolved_source_name,
        available_source_references: retrieved_source_references.available_source_references(),
    })
}

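/// Purifies a `CREATE TABLE .. FROM SOURCE` statement by resolving the
/// requested external reference against the source's upstream system and
/// filling in the table's columns, constraints, and options.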
1679async fn purify_create_table_from_source(
1680 catalog: impl SessionCatalog,
1681 mut stmt: CreateTableFromSourceStatement<Aug>,
1682 storage_configuration: &StorageConfiguration,
1683) -> Result<PurifiedStatement, PlanError> {
1684 let scx = StatementContext::new(None, &catalog);
1685 let CreateTableFromSourceStatement {
1686 name: _,
1687 columns,
1688 constraints,
1689 source: source_name,
1690 if_not_exists: _,
1691 external_reference,
1692 format,
1693 envelope,
1694 include_metadata: _,
1695 with_options,
1696 } = &mut stmt;
1697
1698 if matches!(columns, TableFromSourceColumns::Defined(_)) {
1700 sql_bail!("CREATE TABLE .. FROM SOURCE column definitions cannot be specified directly");
1701 }
1702 if !constraints.is_empty() {
1703 sql_bail!(
1704 "CREATE TABLE .. FROM SOURCE constraint definitions cannot be specified directly"
1705 );
1706 }
1707
1708 let item = match scx.get_item_by_resolved_name(source_name) {
1710 Ok(item) => item,
1711 Err(e) => return Err(e),
1712 };
1713
1714 let desc = match item.source_desc()? {
1716 Some(desc) => desc.clone().into_inline_connection(scx.catalog),
1717 None => {
1718 sql_bail!("cannot ALTER this type of source")
1719 }
1720 };
1721 let unresolved_source_name: UnresolvedItemName = source_name.full_item_name().clone().into();
1722
1723 let crate::plan::statement::ddl::TableFromSourceOptionExtracted {
1724 text_columns,
1725 exclude_columns,
1726 retain_history: _,
1727 details,
1728 partition_by: _,
1729 seen: _,
1730 } = with_options.clone().try_into()?;
1731 assert_none!(details, "details cannot be explicitly set");
1732
1733 let qualified_text_columns = text_columns
1737 .iter()
1738 .map(|col| {
1739 UnresolvedItemName(
1740 external_reference
1741 .as_ref()
1742 .map(|er| er.0.iter().chain_one(col).map(|i| i.clone()).collect())
1743 .unwrap_or_else(|| vec![col.clone()]),
1744 )
1745 })
1746 .collect_vec();
1747 let qualified_exclude_columns = exclude_columns
1748 .iter()
1749 .map(|col| {
1750 UnresolvedItemName(
1751 external_reference
1752 .as_ref()
1753 .map(|er| er.0.iter().chain_one(col).map(|i| i.clone()).collect())
1754 .unwrap_or_else(|| vec![col.clone()]),
1755 )
1756 })
1757 .collect_vec();
1758
1759 let mut format_options = SourceFormatOptions::Default;
1761
1762 let retrieved_source_references: RetrievedSourceReferences;
1763
1764 let requested_references = external_reference.as_ref().map(|ref_name| {
1765 ExternalReferences::SubsetTables(vec![ExternalReferenceExport {
1766 reference: ref_name.clone(),
1767 alias: None,
1768 }])
1769 });
1770
1771 let purified_export = match desc.connection {
1774 GenericSourceConnection::Postgres(pg_source_connection) => {
1775 let pg_connection = &pg_source_connection.connection;
1777
1778 let client = pg_connection
1779 .validate(pg_source_connection.connection_id, storage_configuration)
1780 .await?;
1781
1782 let reference_client = SourceReferenceClient::Postgres {
1783 client: &client,
1784 publication: &pg_source_connection.publication,
1785 database: &pg_connection.database,
1786 };
1787 retrieved_source_references = reference_client.get_source_references().await?;
1788
1789 let postgres::PurifiedSourceExports {
1790 source_exports,
1791 normalized_text_columns: _,
1795 } = postgres::purify_source_exports(
1796 &client,
1797 &retrieved_source_references,
1798 &requested_references,
1799 qualified_text_columns,
1800 qualified_exclude_columns,
1801 &unresolved_source_name,
1802 &SourceReferencePolicy::Required,
1803 )
1804 .await?;
1805 let (_, purified_export) = source_exports.into_element();
1807 purified_export
1808 }
1809 GenericSourceConnection::MySql(mysql_source_connection) => {
1810 let mysql_connection = &mysql_source_connection.connection;
1811 let config = mysql_connection
1812 .config(
1813 &storage_configuration.connection_context.secrets_reader,
1814 storage_configuration,
1815 InTask::No,
1816 )
1817 .await?;
1818
1819 let mut conn = config
1820 .connect(
1821 "mysql purification",
1822 &storage_configuration.connection_context.ssh_tunnel_manager,
1823 )
1824 .await?;
1825
1826 let initial_gtid_set =
1829 mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?;
1830
1831 let reference_client = SourceReferenceClient::MySql {
1832 conn: &mut conn,
1833 include_system_schemas: mysql::references_system_schemas(&requested_references),
1834 };
1835 retrieved_source_references = reference_client.get_source_references().await?;
1836
1837 let mysql::PurifiedSourceExports {
1838 source_exports,
1839 normalized_text_columns: _,
1843 normalized_exclude_columns: _,
1844 } = mysql::purify_source_exports(
1845 &mut conn,
1846 &retrieved_source_references,
1847 &requested_references,
1848 qualified_text_columns,
1849 qualified_exclude_columns,
1850 &unresolved_source_name,
1851 initial_gtid_set,
1852 &SourceReferencePolicy::Required,
1853 )
1854 .await?;
1855 let (_, purified_export) = source_exports.into_element();
1857 purified_export
1858 }
1859 GenericSourceConnection::SqlServer(sql_server_source) => {
1860 let connection = sql_server_source.connection;
1861 let config = connection
1862 .resolve_config(
1863 &storage_configuration.connection_context.secrets_reader,
1864 storage_configuration,
1865 InTask::No,
1866 )
1867 .await?;
1868 let mut client = mz_sql_server_util::Client::connect(config).await?;
1869
1870 let database: Arc<str> = connection.database.into();
1871 let reference_client = SourceReferenceClient::SqlServer {
1872 client: &mut client,
1873 database: Arc::clone(&database),
1874 };
1875 retrieved_source_references = reference_client.get_source_references().await?;
1876 tracing::debug!(?retrieved_source_references, "got source references");
1877
1878 let timeout = mz_storage_types::sources::sql_server::MAX_LSN_WAIT
1879 .get(storage_configuration.config_set());
1880
1881 let purified_source_exports = sql_server::purify_source_exports(
1882 &*database,
1883 &mut client,
1884 &retrieved_source_references,
1885 &requested_references,
1886 &qualified_text_columns,
1887 &qualified_exclude_columns,
1888 &unresolved_source_name,
1889 timeout,
1890 &SourceReferencePolicy::Required,
1891 )
1892 .await?;
1893
1894 let (_, purified_export) = purified_source_exports.source_exports.into_element();
1896 purified_export
1897 }
1898 GenericSourceConnection::LoadGenerator(load_gen_connection) => {
1899 let reference_client = SourceReferenceClient::LoadGenerator {
1900 generator: &load_gen_connection.load_generator,
1901 };
1902 retrieved_source_references = reference_client.get_source_references().await?;
1903
1904 let requested_exports = retrieved_source_references
1905 .requested_source_exports(requested_references.as_ref(), &unresolved_source_name)?;
1906 let export = requested_exports.into_element();
1908 PurifiedSourceExport {
1909 external_reference: export.external_reference,
1910 details: PurifiedExportDetails::LoadGenerator {
1911 table: export
1912 .meta
1913 .load_generator_desc()
1914 .expect("is loadgen")
1915 .clone(),
1916 output: export
1917 .meta
1918 .load_generator_output()
1919 .expect("is loadgen")
1920 .clone(),
1921 },
1922 }
1923 }
1924 GenericSourceConnection::Kafka(kafka_conn) => {
1925 let reference_client = SourceReferenceClient::Kafka {
1926 topic: &kafka_conn.topic,
1927 };
1928 retrieved_source_references = reference_client.get_source_references().await?;
1929 let requested_exports = retrieved_source_references
1930 .requested_source_exports(requested_references.as_ref(), &unresolved_source_name)?;
1931 let export = requested_exports.into_element();
1933
1934 format_options = SourceFormatOptions::Kafka {
1935 topic: kafka_conn.topic.clone(),
1936 };
1937 PurifiedSourceExport {
1938 external_reference: export.external_reference,
1939 details: PurifiedExportDetails::Kafka {},
1940 }
1941 }
1942 };
1943
1944 purify_source_format(
1945 &catalog,
1946 format,
1947 &format_options,
1948 envelope,
1949 storage_configuration,
1950 )
1951 .await?;
1952
1953 *external_reference = Some(purified_export.external_reference.clone());
1956
    match &purified_export.details {
        PurifiedExportDetails::Postgres { .. } => {
            let mut unsupported_cols = vec![];
            let postgres::PostgresExportStatementValues {
                columns: gen_columns,
                constraints: gen_constraints,
                text_columns: gen_text_columns,
                exclude_columns: gen_exclude_columns,
                details: gen_details,
                external_reference: _,
            } = postgres::generate_source_export_statement_values(
                &scx,
                purified_export,
                &mut unsupported_cols,
            )?;
            if !unsupported_cols.is_empty() {
                unsupported_cols.sort();
                Err(PgSourcePurificationError::UnrecognizedTypes {
                    cols: unsupported_cols,
                })?;
            }

            if let Some(text_cols_option) = with_options
                .iter_mut()
                .find(|option| option.name == TableFromSourceOptionName::TextColumns)
            {
                match gen_text_columns {
                    Some(gen_text_columns) => {
                        text_cols_option.value = Some(WithOptionValue::Sequence(gen_text_columns))
                    }
                    None => soft_panic_or_log!(
                        "text_columns should be Some if text_cols_option is present"
                    ),
                }
            }
            if let Some(exclude_cols_option) = with_options
                .iter_mut()
                .find(|option| option.name == TableFromSourceOptionName::ExcludeColumns)
            {
                match gen_exclude_columns {
                    Some(gen_exclude_columns) => {
                        exclude_cols_option.value =
                            Some(WithOptionValue::Sequence(gen_exclude_columns))
                    }
                    None => soft_panic_or_log!(
                        "exclude_columns should be Some if exclude_cols_option is present"
                    ),
                }
            }
            match columns {
                TableFromSourceColumns::Defined(_) => unreachable!(),
                TableFromSourceColumns::NotSpecified => {
                    *columns = TableFromSourceColumns::Defined(gen_columns);
                    *constraints = gen_constraints;
                }
                TableFromSourceColumns::Named(_) => {
                    sql_bail!("columns cannot be named for Postgres sources")
                }
            }
            with_options.push(TableFromSourceOption {
                name: TableFromSourceOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    gen_details.into_proto().encode_to_vec(),
                )))),
            })
        }
        PurifiedExportDetails::MySql { .. } => {
            let mysql::MySqlExportStatementValues {
                columns: gen_columns,
                constraints: gen_constraints,
                text_columns: gen_text_columns,
                exclude_columns: gen_exclude_columns,
                details: gen_details,
                external_reference: _,
            } = mysql::generate_source_export_statement_values(&scx, purified_export)?;

            if let Some(text_cols_option) = with_options
                .iter_mut()
                .find(|option| option.name == TableFromSourceOptionName::TextColumns)
            {
                match gen_text_columns {
                    Some(gen_text_columns) => {
                        text_cols_option.value = Some(WithOptionValue::Sequence(gen_text_columns))
                    }
                    None => soft_panic_or_log!(
                        "text_columns should be Some if text_cols_option is present"
                    ),
                }
            }
            if let Some(exclude_cols_option) = with_options
                .iter_mut()
                .find(|option| option.name == TableFromSourceOptionName::ExcludeColumns)
            {
                match gen_exclude_columns {
                    Some(gen_exclude_columns) => {
                        exclude_cols_option.value =
                            Some(WithOptionValue::Sequence(gen_exclude_columns))
                    }
                    None => soft_panic_or_log!(
                        "exclude_columns should be Some if exclude_cols_option is present"
                    ),
                }
            }
            match columns {
                TableFromSourceColumns::Defined(_) => unreachable!(),
                TableFromSourceColumns::NotSpecified => {
                    *columns = TableFromSourceColumns::Defined(gen_columns);
                    *constraints = gen_constraints;
                }
                TableFromSourceColumns::Named(_) => {
                    sql_bail!("columns cannot be named for MySQL sources")
                }
            }
            with_options.push(TableFromSourceOption {
                name: TableFromSourceOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    gen_details.into_proto().encode_to_vec(),
                )))),
            })
        }
        PurifiedExportDetails::SqlServer { .. } => {
            let sql_server::SqlServerExportStatementValues {
                columns: gen_columns,
                constraints: gen_constraints,
                text_columns: gen_text_columns,
                excl_columns: gen_excl_columns,
                details: gen_details,
                external_reference: _,
            } = sql_server::generate_source_export_statement_values(&scx, purified_export)?;

            if let Some(text_cols_option) = with_options
                .iter_mut()
                .find(|opt| opt.name == TableFromSourceOptionName::TextColumns)
            {
                match gen_text_columns {
                    Some(gen_text_columns) => {
                        text_cols_option.value = Some(WithOptionValue::Sequence(gen_text_columns))
                    }
                    None => soft_panic_or_log!(
                        "text_columns should be Some if text_cols_option is present"
                    ),
                }
            }
            if let Some(exclude_cols_option) = with_options
                .iter_mut()
                .find(|opt| opt.name == TableFromSourceOptionName::ExcludeColumns)
            {
                match gen_excl_columns {
                    Some(gen_excl_columns) => {
                        exclude_cols_option.value =
                            Some(WithOptionValue::Sequence(gen_excl_columns))
                    }
                    None => soft_panic_or_log!(
                        "excl_columns should be Some if exclude_cols_option is present"
                    ),
                }
            }

            match columns {
                TableFromSourceColumns::NotSpecified => {
                    *columns = TableFromSourceColumns::Defined(gen_columns);
                    *constraints = gen_constraints;
                }
                TableFromSourceColumns::Named(_) => {
                    sql_bail!("columns cannot be named for SQL Server sources")
                }
                TableFromSourceColumns::Defined(_) => unreachable!(),
            }

            with_options.push(TableFromSourceOption {
                name: TableFromSourceOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    gen_details.into_proto().encode_to_vec(),
                )))),
            })
        }
        PurifiedExportDetails::LoadGenerator { .. } => {
            let (desc, output) = match purified_export.details {
                PurifiedExportDetails::LoadGenerator { table, output } => (table, output),
                _ => unreachable!("purified export details must be load generator"),
            };
            if let Some(desc) = desc {
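                // A `RelationDesc` is present only for multi-output load
                // generators; derive the table's column definitions and
                // constraints from it.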
                let (gen_columns, gen_constraints) = scx.relation_desc_into_table_defs(&desc)?;
                match columns {
                    TableFromSourceColumns::Defined(_) => unreachable!(),
                    TableFromSourceColumns::NotSpecified => {
                        *columns = TableFromSourceColumns::Defined(gen_columns);
                        *constraints = gen_constraints;
                    }
                    TableFromSourceColumns::Named(_) => {
                        sql_bail!("columns cannot be named for multi-output load generator sources")
                    }
                }
            }
            let details = SourceExportStatementDetails::LoadGenerator { output };
            with_options.push(TableFromSourceOption {
                name: TableFromSourceOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    details.into_proto().encode_to_vec(),
                )))),
            })
        }
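        // Kafka table columns are determined entirely by the FORMAT and
        // ENVELOPE, so there are no source-side details to record beyond the
        // marker.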
        PurifiedExportDetails::Kafka {} => {
            let details = SourceExportStatementDetails::Kafka {};
            with_options.push(TableFromSourceOption {
                name: TableFromSourceOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    details.into_proto().encode_to_vec(),
                )))),
            })
        }
    };

    Ok(PurifiedStatement::PurifiedCreateTableFromSource { stmt })
}

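/// Source-specific context threaded through `FORMAT` purification. Only
/// Kafka carries extra context: the topic, from which Confluent Schema
/// Registry subject names are derived.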
enum SourceFormatOptions {
    Default,
    Kafka { topic: String },
}

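/// Purifies the `FORMAT` clause of a source statement. For Avro and Protobuf
/// formats that reference a Confluent Schema Registry connection, this
/// fetches the remote schemas and inlines them into the statement, so
/// planning never has to contact the registry again; roughly (a sketch),
/// `FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn` gains a
/// seed carrying the fetched key/value schemas. `KEY ... VALUE ...` format
/// specifiers are rejected for all but Kafka sources.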
async fn purify_source_format(
    catalog: &dyn SessionCatalog,
    format: &mut Option<FormatSpecifier<Aug>>,
    options: &SourceFormatOptions,
    envelope: &Option<SourceEnvelope>,
    storage_configuration: &StorageConfiguration,
) -> Result<(), PlanError> {
    if matches!(format, Some(FormatSpecifier::KeyValue { .. }))
        && !matches!(options, SourceFormatOptions::Kafka { .. })
    {
        sql_bail!("Kafka sources are the only source type that can provide KEY/VALUE formats")
    }

    match format.as_mut() {
        None => {}
        Some(FormatSpecifier::Bare(format)) => {
            purify_source_format_single(catalog, format, options, envelope, storage_configuration)
                .await?;
        }

        Some(FormatSpecifier::KeyValue { key, value: val }) => {
            purify_source_format_single(catalog, key, options, envelope, storage_configuration)
                .await?;
            purify_source_format_single(catalog, val, options, envelope, storage_configuration)
                .await?;
        }
    }
    Ok(())
}
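/// Purifies a single bare format (the key or the value side): dispatches to
/// the Confluent Schema Registry purification for Avro and Protobuf; all
/// other formats need no purification.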
async fn purify_source_format_single(
    catalog: &dyn SessionCatalog,
    format: &mut Format<Aug>,
    options: &SourceFormatOptions,
    envelope: &Option<SourceEnvelope>,
    storage_configuration: &StorageConfiguration,
) -> Result<(), PlanError> {
    match format {
        Format::Avro(schema) => match schema {
            AvroSchema::Csr { csr_connection } => {
                purify_csr_connection_avro(
                    catalog,
                    options,
                    csr_connection,
                    envelope,
                    storage_configuration,
                )
                .await?
            }
            AvroSchema::InlineSchema { .. } => {}
        },
        Format::Protobuf(schema) => match schema {
            ProtobufSchema::Csr { csr_connection } => {
                purify_csr_connection_proto(
                    catalog,
                    options,
                    csr_connection,
                    envelope,
                    storage_configuration,
                )
                .await?;
            }
            ProtobufSchema::InlineSchema { .. } => {}
        },
        Format::Bytes
        | Format::Regex(_)
        | Format::Json { .. }
        | Format::Text
        | Format::Csv { .. } => (),
    }
    Ok(())
}
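/// Generates a `CREATE SUBSOURCE` statement for each purified source export.
/// Each generated statement is roughly of the shape (a sketch, not exact
/// output):
///
/// ```sql
/// CREATE SUBSOURCE "tbl" (...columns..., ...constraints...)
/// OF SOURCE "src"
/// WITH (EXTERNAL REFERENCE = db.schema.tbl, DETAILS = '<hex-encoded proto>')
/// ```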
pub fn generate_subsource_statements(
    scx: &StatementContext,
    source_name: ResolvedItemName,
    subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
) -> Result<Vec<CreateSubsourceStatement<Aug>>, PlanError> {
    if subsources.is_empty() {
        return Ok(vec![]);
    }
    let (_, purified_export) = subsources.iter().next().unwrap();

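    // All subsources in the map belong to the same source, so the first
    // export's details determine which source-specific generator to use.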
    let statements = match &purified_export.details {
        PurifiedExportDetails::Postgres { .. } => {
            crate::pure::postgres::generate_create_subsource_statements(
                scx,
                source_name,
                subsources,
            )?
        }
        PurifiedExportDetails::MySql { .. } => {
            crate::pure::mysql::generate_create_subsource_statements(scx, source_name, subsources)?
        }
        PurifiedExportDetails::SqlServer { .. } => {
            crate::pure::sql_server::generate_create_subsource_statements(
                scx,
                source_name,
                subsources,
            )?
        }
        PurifiedExportDetails::LoadGenerator { .. } => {
            let mut subsource_stmts = Vec::with_capacity(subsources.len());
            for (subsource_name, purified_export) in subsources {
                let (desc, output) = match purified_export.details {
                    PurifiedExportDetails::LoadGenerator { table, output } => (table, output),
                    _ => unreachable!("purified export details must be load generator"),
                };
                let desc =
                    desc.expect("subsources cannot be generated for single-output load generators");

                let (columns, table_constraints) = scx.relation_desc_into_table_defs(&desc)?;
                let details = SourceExportStatementDetails::LoadGenerator { output };
                let subsource = CreateSubsourceStatement {
                    name: subsource_name,
                    columns,
                    of_source: Some(source_name.clone()),
                    constraints: table_constraints,
                    if_not_exists: false,
                    with_options: vec![
                        CreateSubsourceOption {
                            name: CreateSubsourceOptionName::ExternalReference,
                            value: Some(WithOptionValue::UnresolvedItemName(
                                purified_export.external_reference,
                            )),
                        },
                        CreateSubsourceOption {
                            name: CreateSubsourceOptionName::Details,
                            value: Some(WithOptionValue::Value(Value::String(hex::encode(
                                details.into_proto().encode_to_vec(),
                            )))),
                        },
                    ],
                };
                subsource_stmts.push(subsource);
            }

            subsource_stmts
        }
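        // Kafka sources do not currently produce data-bearing subsources, so
        // reaching this arm with a non-empty map is a bug.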
        PurifiedExportDetails::Kafka { .. } => {
            assert!(
                subsources.is_empty(),
                "Kafka sources do not produce data-bearing subsources"
            );
            vec![]
        }
    };
    Ok(statements)
}

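/// Purifies a Protobuf Confluent Schema Registry connection by compiling the
/// registry schemas into a seed, unless a seed is already present. The value
/// schema (subject `{topic}-value`) is required; the key schema (subject
/// `{topic}-key`) is optional, except with `ENVELOPE DEBEZIUM`, which
/// requires a key.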
async fn purify_csr_connection_proto(
    catalog: &dyn SessionCatalog,
    options: &SourceFormatOptions,
    csr_connection: &mut CsrConnectionProtobuf<Aug>,
    envelope: &Option<SourceEnvelope>,
    storage_configuration: &StorageConfiguration,
) -> Result<(), PlanError> {
    let SourceFormatOptions::Kafka { topic } = options else {
        sql_bail!("Confluent Schema Registry is only supported with Kafka sources")
    };

    let CsrConnectionProtobuf {
        seed,
        connection: CsrConnection {
            connection,
            options: _,
        },
    } = csr_connection;
    match seed {
        None => {
            let scx = StatementContext::new(None, &*catalog);

            let ccsr_connection = match scx.get_item_by_resolved_name(connection)?.connection()? {
                Connection::Csr(connection) => connection.clone().into_inline_connection(catalog),
                _ => sql_bail!("{} is not a schema registry connection", connection),
            };

            let ccsr_client = ccsr_connection
                .connect(storage_configuration, InTask::No)
                .await
                .map_err(|e| CsrPurificationError::ClientError(Arc::new(e)))?;

            let value = compile_proto(&format!("{}-value", topic), &ccsr_client).await?;
            let key = compile_proto(&format!("{}-key", topic), &ccsr_client)
                .await
                .ok();

            if matches!(envelope, Some(SourceEnvelope::Debezium)) && key.is_none() {
                sql_bail!("Key schema is required for ENVELOPE DEBEZIUM");
            }

            *seed = Some(CsrSeedProtobuf { value, key });
        }
        Some(_) => (),
    }

    Ok(())
}

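/// Purifies an Avro Confluent Schema Registry connection by fetching the key
/// and value schemas into a seed, unless a seed is already present. As with
/// Protobuf, `ENVELOPE DEBEZIUM` requires a key schema.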
async fn purify_csr_connection_avro(
    catalog: &dyn SessionCatalog,
    options: &SourceFormatOptions,
    csr_connection: &mut CsrConnectionAvro<Aug>,
    envelope: &Option<SourceEnvelope>,
    storage_configuration: &StorageConfiguration,
) -> Result<(), PlanError> {
    let SourceFormatOptions::Kafka { topic } = options else {
        sql_bail!("Confluent Schema Registry is only supported with Kafka sources")
    };

    let CsrConnectionAvro {
        connection: CsrConnection { connection, .. },
        seed,
        key_strategy,
        value_strategy,
    } = csr_connection;
    if seed.is_none() {
        let scx = StatementContext::new(None, &*catalog);
        let csr_connection = match scx.get_item_by_resolved_name(connection)?.connection()? {
            Connection::Csr(connection) => connection.clone().into_inline_connection(catalog),
            _ => sql_bail!("{} is not a schema registry connection", connection),
        };
        let ccsr_client = csr_connection
            .connect(storage_configuration, InTask::No)
            .await
            .map_err(|e| CsrPurificationError::ClientError(Arc::new(e)))?;

        let Schema {
            key_schema,
            value_schema,
        } = get_remote_csr_schema(
            &ccsr_client,
            key_strategy.clone().unwrap_or_default(),
            value_strategy.clone().unwrap_or_default(),
            topic,
        )
        .await?;
        if matches!(envelope, Some(SourceEnvelope::Debezium)) && key_schema.is_none() {
            sql_bail!("Key schema is required for ENVELOPE DEBEZIUM");
        }

        *seed = Some(CsrSeedAvro {
            key_schema,
            value_schema,
        })
    }

    Ok(())
}

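/// An Avro key/value schema pair fetched from a schema registry. The key
/// schema is optional, since many topics register only a value subject.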
#[derive(Debug)]
pub struct Schema {
    pub key_schema: Option<String>,
    pub value_schema: String,
}

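/// Resolves a schema for `subject` according to the reader schema selection
/// strategy: the latest registered version, an inline schema supplied in the
/// statement, or a schema looked up by ID. Returns `Ok(None)` when the
/// subject or schema does not exist.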
async fn get_schema_with_strategy(
    client: &Client,
    strategy: ReaderSchemaSelectionStrategy,
    subject: &str,
) -> Result<Option<String>, PlanError> {
    match strategy {
        ReaderSchemaSelectionStrategy::Latest => {
            match client.get_schema_by_subject(subject).await {
                Ok(CcsrSchema { raw, .. }) => Ok(Some(raw)),
                Err(GetBySubjectError::SubjectNotFound)
                | Err(GetBySubjectError::VersionNotFound(_)) => Ok(None),
                Err(e) => Err(PlanError::FetchingCsrSchemaFailed {
                    schema_lookup: format!("subject {}", subject.quoted()),
                    cause: Arc::new(e),
                }),
            }
        }
        ReaderSchemaSelectionStrategy::Inline(raw) => Ok(Some(raw)),
        ReaderSchemaSelectionStrategy::ById(id) => match client.get_schema_by_id(id).await {
            Ok(CcsrSchema { raw, .. }) => Ok(Some(raw)),
            Err(GetByIdError::SchemaNotFound) => Ok(None),
            Err(e) => Err(PlanError::FetchingCsrSchemaFailed {
                schema_lookup: format!("ID {}", id),
                cause: Arc::new(e),
            }),
        },
    }
}

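/// Fetches the value schema (required) and the key schema (optional) for
/// `topic`, under the registry's topic name strategy: subjects
/// `{topic}-value` and `{topic}-key`.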
async fn get_remote_csr_schema(
    ccsr_client: &mz_ccsr::Client,
    key_strategy: ReaderSchemaSelectionStrategy,
    value_strategy: ReaderSchemaSelectionStrategy,
    topic: &str,
) -> Result<Schema, PlanError> {
    let value_schema_name = format!("{}-value", topic);
    let value_schema =
        get_schema_with_strategy(ccsr_client, value_strategy, &value_schema_name).await?;
    let value_schema = value_schema.ok_or_else(|| anyhow!("No value schema found"))?;
    let subject = format!("{}-key", topic);
    let key_schema = get_schema_with_strategy(ccsr_client, key_strategy, &subject).await?;
    Ok(Schema {
        key_schema,
        value_schema,
    })
}

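/// Fetches the Protobuf schema for `subject_name`, along with all schemas it
/// references, compiles them to a `FileDescriptorSet`, and returns the
/// serialized descriptors together with the schema's single message name.
/// Schemas with zero or multiple top-level messages are not supported.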
async fn compile_proto(
    subject_name: &str,
    ccsr_client: &Client,
) -> Result<CsrSeedProtobufSchema, PlanError> {
    let (primary_subject, dependency_subjects) = ccsr_client
        .get_subject_and_references(subject_name)
        .await
        .map_err(|e| PlanError::FetchingCsrSchemaFailed {
            schema_lookup: format!("subject {}", subject_name.quoted()),
            cause: Arc::new(e),
        })?;

    // Compile the primary subject and its references into a file
    // descriptor set.
    let mut source_tree = VirtualSourceTree::new();
    for subject in iter::once(&primary_subject).chain(dependency_subjects.iter()) {
        source_tree.as_mut().add_file(
            Path::new(&subject.name),
            subject.schema.raw.as_bytes().to_vec(),
        );
    }
    let mut db = SourceTreeDescriptorDatabase::new(source_tree.as_mut());
    let fds = db
        .as_mut()
        .build_file_descriptor_set(&[Path::new(&primary_subject.name)])
        .map_err(|cause| PlanError::InvalidProtobufSchema { cause })?;

    // The primary file must define exactly one message.
    let primary_fd = fds.file(0);
    let message_name = match primary_fd.message_type_size() {
        1 => String::from_utf8_lossy(primary_fd.message_type(0).name()).into_owned(),
        0 => bail_unsupported!(29603, "Protobuf schemas with no messages"),
        _ => bail_unsupported!(29603, "Protobuf schemas with multiple messages"),
    };

    // Encode the file descriptor set as a SQL byte string.
    let bytes = &fds
        .serialize()
        .map_err(|cause| PlanError::InvalidProtobufSchema { cause })?;
    let mut schema = String::new();
    strconv::format_bytes(&mut schema, bytes);

    Ok(CsrSeedProtobufSchema {
        schema,
        message_name,
    })
}

const MZ_NOW_NAME: &str = "mz_now";
const MZ_NOW_SCHEMA: &str = "mz_catalog";

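/// Purifies the `REFRESH` options of a `CREATE MATERIALIZED VIEW` statement
/// and adjusts `resolved_ids` accordingly:
/// - `REFRESH AT CREATION` is rewritten to `REFRESH AT mz_now()`;
/// - a `REFRESH EVERY` without `ALIGNED TO` gets `ALIGNED TO mz_now()`;
/// - every `mz_now()` inside a refresh option is replaced by the chosen
///   timestamp, cast to `mz_timestamp`; roughly (a sketch),
///   `REFRESH AT mz_now()` becomes `REFRESH AT '1700000000000'::mz_timestamp`
///   when `mz_now` is `Some(1700000000000)`;
/// - if no `REFRESH` option is present at all, `REFRESH ON COMMIT` is added.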
pub fn purify_create_materialized_view_options(
    catalog: impl SessionCatalog,
    mz_now: Option<Timestamp>,
    cmvs: &mut CreateMaterializedViewStatement<Aug>,
    resolved_ids: &mut ResolvedIds,
) {
    let (mz_now_id, mz_now_expr) = {
        let item = catalog
            .resolve_function(&PartialItemName {
                database: None,
                schema: Some(MZ_NOW_SCHEMA.to_string()),
                item: MZ_NOW_NAME.to_string(),
            })
            .expect("we should be able to resolve mz_now");
        (
            item.id(),
            Expr::Function(Function {
                name: ResolvedItemName::Item {
                    id: item.id(),
                    qualifiers: item.name().qualifiers.clone(),
                    full_name: catalog.resolve_full_name(item.name()),
                    print_id: false,
                    version: RelationVersionSelector::Latest,
                },
                args: FunctionArgs::Args {
                    args: Vec::new(),
                    order_by: Vec::new(),
                },
                filter: None,
                over: None,
                distinct: false,
            }),
        )
    };
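    // Resolve the `mz_timestamp` type, to which purified `mz_now()` calls
    // are cast below.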
    let (mz_timestamp_id, mz_timestamp_type) = {
        let item = catalog.get_system_type("mz_timestamp");
        let full_name = catalog.resolve_full_name(item.name());
        (
            item.id(),
            ResolvedDataType::Named {
                id: item.id(),
                qualifiers: item.name().qualifiers.clone(),
                full_name,
                modifiers: vec![],
                print_id: true,
            },
        )
    };

    let mut introduced_mz_timestamp = false;

    for option in cmvs.with_options.iter_mut() {
        // Rewrite `REFRESH AT CREATION` to `REFRESH AT mz_now()`.
        if matches!(
            option.value,
            Some(WithOptionValue::Refresh(RefreshOptionValue::AtCreation))
        ) {
            option.value = Some(WithOptionValue::Refresh(RefreshOptionValue::At(
                RefreshAtOptionValue {
                    time: mz_now_expr.clone(),
                },
            )));
        }

        // Default a missing `ALIGNED TO` of a `REFRESH EVERY` to `mz_now()`.
        if let Some(WithOptionValue::Refresh(RefreshOptionValue::Every(
            RefreshEveryOptionValue { aligned_to, .. },
        ))) = &mut option.value
        {
            if aligned_to.is_none() {
                *aligned_to = Some(mz_now_expr.clone());
            }
        }

        // Substitute the chosen timestamp for `mz_now()` in refresh option
        // expressions.
        match &mut option.value {
            Some(WithOptionValue::Refresh(RefreshOptionValue::At(RefreshAtOptionValue {
                time,
            }))) => {
                let mut visitor = MzNowPurifierVisitor::new(mz_now, mz_timestamp_type.clone());
                visitor.visit_expr_mut(time);
                introduced_mz_timestamp |= visitor.introduced_mz_timestamp;
            }
            Some(WithOptionValue::Refresh(RefreshOptionValue::Every(
                RefreshEveryOptionValue {
                    interval: _,
                    aligned_to: Some(aligned_to),
                },
            ))) => {
                let mut visitor = MzNowPurifierVisitor::new(mz_now, mz_timestamp_type.clone());
                visitor.visit_expr_mut(aligned_to);
                introduced_mz_timestamp |= visitor.introduced_mz_timestamp;
            }
            _ => {}
        }
    }

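    // If no `REFRESH` option was given, default to `REFRESH ON COMMIT`.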
    if !cmvs.with_options.iter().any(|o| {
        matches!(
            o,
            MaterializedViewOption {
                value: Some(WithOptionValue::Refresh(..)),
                ..
            }
        )
    }) {
        cmvs.with_options.push(MaterializedViewOption {
            name: MaterializedViewOptionName::Refresh,
            value: Some(WithOptionValue::Refresh(RefreshOptionValue::OnCommit)),
        })
    }

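    // Purification might have introduced casts to `mz_timestamp`, which the
    // statement now depends on.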
    if introduced_mz_timestamp {
        resolved_ids.add_item(mz_timestamp_id);
    }
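    // Conversely, if the only `mz_now()` calls were purified away just now,
    // the statement no longer depends on `mz_now`.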
    let mut visitor = ExprContainsTemporalVisitor::new();
    visitor.visit_create_materialized_view_statement(cmvs);
    if !visitor.contains_temporal {
        resolved_ids.remove_item(&mz_now_id);
    }
}

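/// Returns whether the given `WITH` option of a materialized view contains a
/// temporal expression, i.e., one that will make purification introduce a
/// timestamp. `REFRESH AT CREATION` and a `REFRESH EVERY` without
/// `ALIGNED TO` count as temporal, because purification rewrites both to use
/// `mz_now()`.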
pub fn materialized_view_option_contains_temporal(mvo: &MaterializedViewOption<Aug>) -> bool {
    match &mvo.value {
        Some(WithOptionValue::Refresh(RefreshOptionValue::At(RefreshAtOptionValue { time }))) => {
            let mut visitor = ExprContainsTemporalVisitor::new();
            visitor.visit_expr(time);
            visitor.contains_temporal
        }
        Some(WithOptionValue::Refresh(RefreshOptionValue::Every(RefreshEveryOptionValue {
            interval: _,
            aligned_to: Some(aligned_to),
        }))) => {
            let mut visitor = ExprContainsTemporalVisitor::new();
            visitor.visit_expr(aligned_to);
            visitor.contains_temporal
        }
        Some(WithOptionValue::Refresh(RefreshOptionValue::Every(RefreshEveryOptionValue {
            interval: _,
            aligned_to: None,
        }))) => {
            // A missing `ALIGNED TO` is defaulted to `mz_now()` during
            // purification.
            true
        }
        Some(WithOptionValue::Refresh(RefreshOptionValue::AtCreation)) => {
            // `AT CREATION` is rewritten to `mz_now()` during purification.
            true
        }
        _ => false,
    }
}

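/// Determines whether the visited AST contains a call to `mz_now()`.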
struct ExprContainsTemporalVisitor {
    pub contains_temporal: bool,
}

impl ExprContainsTemporalVisitor {
    pub fn new() -> ExprContainsTemporalVisitor {
        ExprContainsTemporalVisitor {
            contains_temporal: false,
        }
    }
}

impl Visit<'_, Aug> for ExprContainsTemporalVisitor {
    fn visit_function(&mut self, func: &Function<Aug>) {
        self.contains_temporal |= func.name.full_item_name().item == MZ_NOW_NAME;
        visit_function(self, func);
    }
}

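/// Replaces `mz_now()` calls with the timestamp chosen for the statement,
/// cast to `mz_timestamp`.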
struct MzNowPurifierVisitor {
    pub mz_now: Option<Timestamp>,
    pub mz_timestamp_type: ResolvedDataType,
    pub introduced_mz_timestamp: bool,
}

impl MzNowPurifierVisitor {
    pub fn new(
        mz_now: Option<Timestamp>,
        mz_timestamp_type: ResolvedDataType,
    ) -> MzNowPurifierVisitor {
        MzNowPurifierVisitor {
            mz_now,
            mz_timestamp_type,
            introduced_mz_timestamp: false,
        }
    }
}

impl VisitMut<'_, Aug> for MzNowPurifierVisitor {
    fn visit_expr_mut(&mut self, expr: &'_ mut Expr<Aug>) {
        match expr {
            Expr::Function(Function {
                name:
                    ResolvedItemName::Item {
                        full_name: FullItemName { item, .. },
                        ..
                    },
                ..
            }) if item.as_str() == MZ_NOW_NAME => {
                let mz_now = self.mz_now.expect(
                    "we should have chosen a timestamp if the expression contains mz_now()",
                );
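                // Substitute the timestamp as a string literal cast to
                // `mz_timestamp`, which survives printing and reparsing the
                // AST.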
                *expr = Expr::Cast {
                    expr: Box::new(Expr::Value(Value::Number(mz_now.to_string()))),
                    data_type: self.mz_timestamp_type.clone(),
                };
                self.introduced_mz_timestamp = true;
            }
            _ => visit_expr_mut(self, expr),
        }
    }
}