use std::collections::{BTreeMap, BTreeSet};
use std::fmt;
use std::iter;
use std::path::Path;
use std::sync::Arc;

use anyhow::anyhow;
use itertools::Itertools;
use mz_ccsr::{Client, GetByIdError, GetBySubjectError, Schema as CcsrSchema};
use mz_controller_types::ClusterId;
use mz_kafka_util::client::MzClientContext;
use mz_mysql_util::MySqlTableDesc;
use mz_ore::collections::CollectionExt;
use mz_ore::error::ErrorExt;
use mz_ore::future::InTask;
use mz_ore::iter::IteratorExt;
use mz_ore::str::StrExt;
use mz_ore::{assert_none, soft_panic_or_log};
use mz_postgres_util::desc::PostgresTableDesc;
use mz_proto::RustType;
use mz_repr::{CatalogItemId, RelationDesc, RelationVersionSelector, Timestamp, strconv};
use mz_sql_parser::ast::display::AstDisplay;
use mz_sql_parser::ast::visit::{Visit, visit_function};
use mz_sql_parser::ast::visit_mut::{VisitMut, visit_expr_mut};
use mz_sql_parser::ast::{
    AlterSourceAction, AlterSourceAddSubsourceOptionName, AlterSourceStatement, AvroDocOn,
    ColumnName, CreateMaterializedViewStatement, CreateSinkConnection, CreateSinkOptionName,
    CreateSinkStatement, CreateSourceOptionName, CreateSubsourceOption, CreateSubsourceOptionName,
    CreateTableFromSourceStatement, CsrConfigOption, CsrConfigOptionName, CsrConnection,
    CsrSeedAvro, CsrSeedProtobuf, CsrSeedProtobufSchema, DeferredItemName, DocOnIdentifier,
    DocOnSchema, Expr, Function, FunctionArgs, Ident, KafkaSourceConfigOption,
    KafkaSourceConfigOptionName, LoadGenerator, LoadGeneratorOption, LoadGeneratorOptionName,
    MaterializedViewOption, MaterializedViewOptionName, MySqlConfigOption, MySqlConfigOptionName,
    PgConfigOption, PgConfigOptionName, RawItemName, ReaderSchemaSelectionStrategy,
    RefreshAtOptionValue, RefreshEveryOptionValue, RefreshOptionValue, SourceEnvelope,
    SqlServerConfigOption, SqlServerConfigOptionName, Statement, TableFromSourceColumns,
    TableFromSourceOption, TableFromSourceOptionName, UnresolvedItemName,
};
use mz_sql_server_util::desc::SqlServerTableDesc;
use mz_storage_types::configuration::StorageConfiguration;
use mz_storage_types::connections::Connection;
use mz_storage_types::connections::inline::IntoInlineConnection;
use mz_storage_types::errors::ContextCreationError;
use mz_storage_types::sources::load_generator::LoadGeneratorOutput;
use mz_storage_types::sources::mysql::MySqlSourceDetails;
use mz_storage_types::sources::postgres::PostgresSourcePublicationDetails;
use mz_storage_types::sources::{
    GenericSourceConnection, SourceConnection, SourceDesc, SourceExportStatementDetails,
    SqlServerSourceExtras,
};
use prost::Message;
use protobuf_native::MessageLite;
use protobuf_native::compiler::{SourceTreeDescriptorDatabase, VirtualSourceTree};
use rdkafka::admin::AdminClient;
use references::{RetrievedSourceReferences, SourceReferenceClient};
use uuid::Uuid;

use crate::ast::{
    AlterSourceAddSubsourceOption, AvroSchema, CreateSourceConnection, CreateSourceStatement,
    CreateSubsourceStatement, CsrConnectionAvro, CsrConnectionProtobuf, ExternalReferenceExport,
    ExternalReferences, Format, FormatSpecifier, ProtobufSchema, Value, WithOptionValue,
};
use crate::catalog::{CatalogItemType, SessionCatalog};
use crate::kafka_util::{KafkaSinkConfigOptionExtracted, KafkaSourceConfigOptionExtracted};
use crate::names::{
    Aug, FullItemName, PartialItemName, ResolvedColumnReference, ResolvedDataType, ResolvedIds,
    ResolvedItemName,
};
use crate::plan::error::PlanError;
use crate::plan::statement::ddl::load_generator_ast_to_generator;
use crate::plan::{SourceReferences, StatementContext};
use crate::pure::error::SqlServerSourcePurificationError;
use crate::session::vars::ENABLE_SQL_SERVER_SOURCE;
use crate::{kafka_util, normalize};

use self::error::{
    CsrPurificationError, KafkaSinkPurificationError, KafkaSourcePurificationError,
    LoadGeneratorSourcePurificationError, MySqlSourcePurificationError, PgSourcePurificationError,
};

pub(crate) mod error;
mod references;

pub mod mysql;
pub mod postgres;
pub mod sql_server;

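/// An upstream object requested for export into Materialize, along with the
/// Materialize name it should be created under and connector-specific
/// metadata `meta`.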
pub(crate) struct RequestedSourceExport<T> {
    external_reference: UnresolvedItemName,
    name: UnresolvedItemName,
    meta: T,
}

impl<T: fmt::Debug> fmt::Debug for RequestedSourceExport<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("RequestedSourceExport")
            .field("external_reference", &self.external_reference)
            .field("name", &self.name)
            .field("meta", &self.meta)
            .finish()
    }
}

impl<T> RequestedSourceExport<T> {
    fn change_meta<F>(self, new_meta: F) -> RequestedSourceExport<F> {
        RequestedSourceExport {
            external_reference: self.external_reference,
            name: self.name,
            meta: new_meta,
        }
    }
}

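/// Generates the name for a source export (e.g. a subsource) by reusing every
/// part of the parent source's name except the item name, which is replaced
/// with `subsource_name`.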
fn source_export_name_gen(
    source_name: &UnresolvedItemName,
    subsource_name: &str,
) -> Result<UnresolvedItemName, PlanError> {
    let mut partial = normalize::unresolved_item_name(source_name.clone())?;
    partial.item = subsource_name.to_string();
    Ok(UnresolvedItemName::from(partial))
}

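/// Validates the requested source exports: each export must be created under a
/// distinct Materialize name, and each upstream reference may be ingested at
/// most once per source.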
fn validate_source_export_names<T>(
    requested_source_exports: &[RequestedSourceExport<T>],
) -> Result<(), PlanError> {
    if let Some(name) = requested_source_exports
        .iter()
        .map(|subsource| &subsource.name)
        .duplicates()
        .next()
        .cloned()
    {
        let mut upstream_references: Vec<_> = requested_source_exports
            .into_iter()
            .filter_map(|subsource| {
                if &subsource.name == &name {
                    Some(subsource.external_reference.clone())
                } else {
                    None
                }
            })
            .collect();

        upstream_references.sort();

        Err(PlanError::SubsourceNameConflict {
            name,
            upstream_references,
        })?;
    }

    if let Some(name) = requested_source_exports
        .iter()
        .map(|export| &export.external_reference)
        .duplicates()
        .next()
        .cloned()
    {
        let mut target_names: Vec<_> = requested_source_exports
            .into_iter()
            .filter_map(|export| {
                if &export.external_reference == &name {
                    Some(export.name.clone())
                } else {
                    None
                }
            })
            .collect();

        target_names.sort();

        Err(PlanError::SubsourceDuplicateReference { name, target_names })?;
    }

    Ok(())
}

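/// The output of statement purification: a statement with its dependencies on
/// external state resolved, plus any additional objects (e.g. subsources) that
/// must be created alongside it.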
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PurifiedStatement {
    PurifiedCreateSource {
        create_progress_subsource_stmt: CreateSubsourceStatement<Aug>,
        create_source_stmt: CreateSourceStatement<Aug>,
        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
        available_source_references: SourceReferences,
    },
    PurifiedAlterSource {
        alter_source_stmt: AlterSourceStatement<Aug>,
    },
    PurifiedAlterSourceAddSubsources {
        source_name: ResolvedItemName,
        options: Vec<AlterSourceAddSubsourceOption<Aug>>,
        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
    },
    PurifiedAlterSourceRefreshReferences {
        source_name: ResolvedItemName,
        available_source_references: SourceReferences,
    },
    PurifiedCreateSink(CreateSinkStatement<Aug>),
    PurifiedCreateTableFromSource {
        stmt: CreateTableFromSourceStatement<Aug>,
    },
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PurifiedSourceExport {
    pub external_reference: UnresolvedItemName,
    pub details: PurifiedExportDetails,
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PurifiedExportDetails {
    MySql {
        table: MySqlTableDesc,
        text_columns: Option<Vec<Ident>>,
        exclude_columns: Option<Vec<Ident>>,
        initial_gtid_set: String,
    },
    Postgres {
        table: PostgresTableDesc,
        text_columns: Option<Vec<Ident>>,
    },
    SqlServer {
        table: SqlServerTableDesc,
        text_columns: Option<Vec<Ident>>,
        excl_columns: Option<Vec<Ident>>,
        capture_instance: Arc<str>,
        initial_lsn: mz_sql_server_util::cdc::Lsn,
    },
    Kafka {},
    LoadGenerator {
        table: Option<RelationDesc>,
        output: LoadGeneratorOutput,
    },
}

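/// Purifies a statement, performing any probes of external state (message
/// brokers, upstream databases, schema registries) up front so that planning
/// itself can remain pure.
///
/// Returns the purification result together with the ID of the cluster the
/// statement is associated with, if any.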
pub async fn purify_statement(
    catalog: impl SessionCatalog,
    now: u64,
    stmt: Statement<Aug>,
    storage_configuration: &StorageConfiguration,
) -> (Result<PurifiedStatement, PlanError>, Option<ClusterId>) {
    match stmt {
        Statement::CreateSource(stmt) => {
            let cluster_id = stmt.in_cluster.as_ref().map(|cluster| cluster.id.clone());
            (
                purify_create_source(catalog, now, stmt, storage_configuration).await,
                cluster_id,
            )
        }
        Statement::AlterSource(stmt) => (
            purify_alter_source(catalog, stmt, storage_configuration).await,
            None,
        ),
        Statement::CreateSink(stmt) => {
            let cluster_id = stmt.in_cluster.as_ref().map(|cluster| cluster.id.clone());
            (
                purify_create_sink(catalog, stmt, storage_configuration).await,
                cluster_id,
            )
        }
        Statement::CreateTableFromSource(stmt) => (
            purify_create_table_from_source(catalog, stmt, storage_configuration).await,
            None,
        ),
        o => unreachable!("{:?} does not need to be purified", o),
    }
}

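/// Injects `DOC ON` options for Avro-formatted sinks from the comments stored
/// in the catalog for the sink's underlying relation (and the objects it
/// references), unless the user already provided an explicit `DOC ON` option
/// for the same object or column.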
pub(crate) fn purify_create_sink_avro_doc_on_options(
    catalog: &dyn SessionCatalog,
    from_id: CatalogItemId,
    format: &mut Option<FormatSpecifier<Aug>>,
) -> Result<(), PlanError> {
    let from = catalog.get_item(&from_id);
    let object_ids = from
        .references()
        .items()
        .copied()
        .chain_one(from.id())
        .collect::<Vec<_>>();

    let mut avro_format_options = vec![];
    for_each_format(format, |doc_on_schema, fmt| match fmt {
        Format::Avro(AvroSchema::InlineSchema { .. })
        | Format::Bytes
        | Format::Csv { .. }
        | Format::Json { .. }
        | Format::Protobuf(..)
        | Format::Regex(..)
        | Format::Text => (),
        Format::Avro(AvroSchema::Csr {
            csr_connection: CsrConnectionAvro { connection, .. },
        }) => {
            avro_format_options.push((doc_on_schema, &mut connection.options));
        }
    });

    for (for_schema, options) in avro_format_options {
        let user_provided_comments = options
            .iter()
            .filter_map(|CsrConfigOption { name, .. }| match name {
                CsrConfigOptionName::AvroDocOn(doc_on) => Some(doc_on.clone()),
                _ => None,
            })
            .collect::<BTreeSet<_>>();

        for object_id in &object_ids {
            let item = catalog
                .get_item(object_id)
                .at_version(RelationVersionSelector::Latest);
            let full_name = catalog.resolve_full_name(item.name());
            let full_resolved_name = ResolvedItemName::Item {
                id: *object_id,
                qualifiers: item.name().qualifiers.clone(),
                full_name: full_name.clone(),
                print_id: !matches!(
                    item.item_type(),
                    CatalogItemType::Func | CatalogItemType::Type
                ),
                version: RelationVersionSelector::Latest,
            };

            if let Some(comments_map) = catalog.get_item_comments(object_id) {
                let doc_on_item_key = AvroDocOn {
                    identifier: DocOnIdentifier::Type(full_resolved_name.clone()),
                    for_schema,
                };
                if !user_provided_comments.contains(&doc_on_item_key) {
                    if let Some(root_comment) = comments_map.get(&None) {
                        options.push(CsrConfigOption {
                            name: CsrConfigOptionName::AvroDocOn(doc_on_item_key),
                            value: Some(mz_sql_parser::ast::WithOptionValue::Value(Value::String(
                                root_comment.clone(),
                            ))),
                        });
                    }
                }

                if let Ok(desc) = item.desc(&full_name) {
                    for (pos, column_name) in desc.iter_names().enumerate() {
                        let comment = comments_map.get(&Some(pos + 1));
                        if let Some(comment_str) = comment {
                            let doc_on_column_key = AvroDocOn {
                                identifier: DocOnIdentifier::Column(ColumnName {
                                    relation: full_resolved_name.clone(),
                                    column: ResolvedColumnReference::Column {
                                        name: column_name.to_owned(),
                                        index: pos,
                                    },
                                }),
                                for_schema,
                            };
                            if !user_provided_comments.contains(&doc_on_column_key) {
                                options.push(CsrConfigOption {
                                    name: CsrConfigOptionName::AvroDocOn(doc_on_column_key),
                                    value: Some(mz_sql_parser::ast::WithOptionValue::Value(
                                        Value::String(comment_str.clone()),
                                    )),
                                });
                            }
                        }
                    }
                }
            }
        }
    }

    Ok(())
}

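/// Checks that the sink described in the statement can connect to its external
/// resources.
///
/// We must not leave any state behind in the Kafka broker, so this only
/// ensures that we can connect; creating an inoperable sink is preferable to
/// leaking state in the user's environment.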
async fn purify_create_sink(
    catalog: impl SessionCatalog,
    mut create_sink_stmt: CreateSinkStatement<Aug>,
    storage_configuration: &StorageConfiguration,
) -> Result<PurifiedStatement, PlanError> {
    let CreateSinkStatement {
        connection,
        format,
        with_options,
        name: _,
        in_cluster: _,
        if_not_exists: _,
        from,
        envelope: _,
    } = &mut create_sink_stmt;

    const USER_ALLOWED_WITH_OPTIONS: &[CreateSinkOptionName] = &[CreateSinkOptionName::Snapshot];

    if let Some(op) = with_options
        .iter()
        .find(|op| !USER_ALLOWED_WITH_OPTIONS.contains(&op.name))
    {
        sql_bail!(
            "CREATE SINK...WITH ({}..) is not allowed",
            op.name.to_ast_string_simple(),
        )
    }

    match &connection {
        CreateSinkConnection::Kafka {
            connection,
            options,
            key: _,
            headers: _,
        } => {
            let scx = StatementContext::new(None, &catalog);
            let connection = {
                let item = scx.get_item_by_resolved_name(connection)?;
                match item.connection()? {
                    Connection::Kafka(connection) => {
                        connection.clone().into_inline_connection(scx.catalog)
                    }
                    _ => sql_bail!(
                        "{} is not a kafka connection",
                        scx.catalog.resolve_full_name(item.name())
                    ),
                }
            };

            let extracted_options: KafkaSinkConfigOptionExtracted = options.clone().try_into()?;

            if extracted_options.legacy_ids == Some(true) {
                sql_bail!("LEGACY IDs option is not supported");
            }

            let client: AdminClient<_> = connection
                .create_with_context(
                    storage_configuration,
                    MzClientContext::default(),
                    &BTreeMap::new(),
                    InTask::No,
                )
                .await
                .map_err(|e| KafkaSinkPurificationError::AdminClientError(Arc::new(e)))?;

            let metadata = client
                .inner()
                .fetch_metadata(
                    None,
                    storage_configuration
                        .parameters
                        .kafka_timeout_config
                        .fetch_metadata_timeout,
                )
                .map_err(|e| {
                    KafkaSinkPurificationError::AdminClientError(Arc::new(
                        ContextCreationError::KafkaError(e),
                    ))
                })?;

            if metadata.brokers().len() == 0 {
                Err(KafkaSinkPurificationError::ZeroBrokers)?;
            }
        }
    }

    let mut csr_connection_ids = BTreeSet::new();
    for_each_format(format, |_, fmt| match fmt {
        Format::Avro(AvroSchema::InlineSchema { .. })
        | Format::Bytes
        | Format::Csv { .. }
        | Format::Json { .. }
        | Format::Protobuf(ProtobufSchema::InlineSchema { .. })
        | Format::Regex(..)
        | Format::Text => (),
        Format::Avro(AvroSchema::Csr {
            csr_connection: CsrConnectionAvro { connection, .. },
        })
        | Format::Protobuf(ProtobufSchema::Csr {
            csr_connection: CsrConnectionProtobuf { connection, .. },
        }) => {
            csr_connection_ids.insert(*connection.connection.item_id());
        }
    });

    let scx = StatementContext::new(None, &catalog);
    for csr_connection_id in csr_connection_ids {
        let connection = {
            let item = scx.get_item(&csr_connection_id);
            match item.connection()? {
                Connection::Csr(connection) => connection.clone().into_inline_connection(&catalog),
                _ => Err(CsrPurificationError::NotCsrConnection(
                    scx.catalog.resolve_full_name(item.name()),
                ))?,
            }
        };

        let client = connection
            .connect(storage_configuration, InTask::No)
            .await
            .map_err(|e| CsrPurificationError::ClientError(Arc::new(e)))?;

        client
            .list_subjects()
            .await
            .map_err(|e| CsrPurificationError::ListSubjectsError(Arc::new(e)))?;
    }

    purify_create_sink_avro_doc_on_options(&catalog, *from.item_id(), format)?;

    Ok(PurifiedStatement::PurifiedCreateSink(create_sink_stmt))
}

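/// Applies `f` to each format in the given format specifier, tagging it with
/// whether it applies to the key, the value, or both.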
fn for_each_format<'a, F>(format: &'a mut Option<FormatSpecifier<Aug>>, mut f: F)
where
    F: FnMut(DocOnSchema, &'a mut Format<Aug>),
{
    match format {
        None => (),
        Some(FormatSpecifier::Bare(fmt)) => f(DocOnSchema::All, fmt),
        Some(FormatSpecifier::KeyValue { key, value }) => {
            f(DocOnSchema::KeyOnly, key);
            f(DocOnSchema::ValueOnly, value);
        }
    }
}

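/// Defines whether purification should require, allow, or reject external
/// references (i.e. automatically created subsources) for a source.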
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub(crate) enum SourceReferencePolicy {
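    /// External references are not allowed; the user should create tables from
    /// the source instead.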
    NotAllowed,
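    /// External references may be provided, but the source can also be created
    /// without any.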
    Optional,
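    /// External references are required; sources that expose multiple outputs
    /// cannot be created without them.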
    Required,
}

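/// Purifies a `CREATE SOURCE` statement: connects to the upstream system,
/// resolves which external objects will be exported, and generates the
/// progress subsource plus the details each subsource needs.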
async fn purify_create_source(
    catalog: impl SessionCatalog,
    now: u64,
    mut create_source_stmt: CreateSourceStatement<Aug>,
    storage_configuration: &StorageConfiguration,
) -> Result<PurifiedStatement, PlanError> {
    let CreateSourceStatement {
        name: source_name,
        connection: source_connection,
        format,
        envelope,
        include_metadata,
        external_references,
        progress_subsource,
        with_options,
        ..
    } = &mut create_source_stmt;

    if let Some(DeferredItemName::Named(_)) = progress_subsource {
        sql_bail!("Cannot manually ID qualify progress subsource")
    }

    let mut requested_subsource_map = BTreeMap::new();

    let progress_desc = match &source_connection {
        CreateSourceConnection::Kafka { .. } => {
            &mz_storage_types::sources::kafka::KAFKA_PROGRESS_DESC
        }
        CreateSourceConnection::Postgres { .. } => {
            &mz_storage_types::sources::postgres::PG_PROGRESS_DESC
        }
        CreateSourceConnection::SqlServer { .. } => {
            &mz_storage_types::sources::sql_server::SQL_SERVER_PROGRESS_DESC
        }
        CreateSourceConnection::MySql { .. } => {
            &mz_storage_types::sources::mysql::MYSQL_PROGRESS_DESC
        }
        CreateSourceConnection::LoadGenerator { .. } => {
            &mz_storage_types::sources::load_generator::LOAD_GEN_PROGRESS_DESC
        }
    };
    let scx = StatementContext::new(None, &catalog);

    let reference_policy = if scx.catalog.system_vars().enable_create_table_from_source()
        && scx.catalog.system_vars().force_source_table_syntax()
    {
        SourceReferencePolicy::NotAllowed
    } else if scx.catalog.system_vars().enable_create_table_from_source() {
        SourceReferencePolicy::Optional
    } else {
        SourceReferencePolicy::Required
    };

    let mut format_options = SourceFormatOptions::Default;

    let retrieved_source_references: RetrievedSourceReferences;

    match source_connection {
        CreateSourceConnection::Kafka {
            connection,
            options: base_with_options,
            ..
        } => {
            if let Some(external_references) = external_references {
                Err(KafkaSourcePurificationError::ReferencedSubsources(
                    external_references.clone(),
                ))?;
            }

            let connection = {
                let item = scx.get_item_by_resolved_name(connection)?;
                match item.connection()? {
                    Connection::Kafka(connection) => {
                        connection.clone().into_inline_connection(&catalog)
                    }
                    _ => Err(KafkaSourcePurificationError::NotKafkaConnection(
                        scx.catalog.resolve_full_name(item.name()),
                    ))?,
                }
            };

            let extracted_options: KafkaSourceConfigOptionExtracted =
                base_with_options.clone().try_into()?;

            let topic = extracted_options
                .topic
                .ok_or(KafkaSourcePurificationError::ConnectionMissingTopic)?;

            let consumer = connection
                .create_with_context(
                    storage_configuration,
                    MzClientContext::default(),
                    &BTreeMap::new(),
                    InTask::No,
                )
                .await
                .map_err(|e| {
                    KafkaSourcePurificationError::KafkaConsumerError(
                        e.display_with_causes().to_string(),
                    )
                })?;
            let consumer = Arc::new(consumer);

            match (
                extracted_options.start_offset,
                extracted_options.start_timestamp,
            ) {
                (None, None) => {
                    kafka_util::ensure_topic_exists(
                        Arc::clone(&consumer),
                        &topic,
                        storage_configuration
                            .parameters
                            .kafka_timeout_config
                            .fetch_metadata_timeout,
                    )
                    .await?;
                }
                (Some(_), Some(_)) => {
                    sql_bail!("cannot specify START TIMESTAMP and START OFFSET at same time")
                }
                (Some(start_offsets), None) => {
                    kafka_util::validate_start_offsets(
                        Arc::clone(&consumer),
                        &topic,
                        start_offsets,
                        storage_configuration
                            .parameters
                            .kafka_timeout_config
                            .fetch_metadata_timeout,
                    )
                    .await?;
                }
                (None, Some(time_offset)) => {
                    let start_offsets = kafka_util::lookup_start_offsets(
                        Arc::clone(&consumer),
                        &topic,
                        time_offset,
                        now,
                        storage_configuration
                            .parameters
                            .kafka_timeout_config
                            .fetch_metadata_timeout,
                    )
                    .await?;

                    base_with_options.retain(|val| {
                        !matches!(val.name, KafkaSourceConfigOptionName::StartTimestamp)
                    });
                    base_with_options.push(KafkaSourceConfigOption {
                        name: KafkaSourceConfigOptionName::StartOffset,
                        value: Some(WithOptionValue::Sequence(
                            start_offsets
                                .iter()
                                .map(|offset| {
                                    WithOptionValue::Value(Value::Number(offset.to_string()))
                                })
                                .collect(),
                        )),
                    });
                }
            }

            let reference_client = SourceReferenceClient::Kafka { topic: &topic };
            retrieved_source_references = reference_client.get_source_references().await?;

            format_options = SourceFormatOptions::Kafka { topic };
        }
        CreateSourceConnection::Postgres {
            connection,
            options,
        } => {
            let connection_item = scx.get_item_by_resolved_name(connection)?;
            let connection = match connection_item.connection().map_err(PlanError::from)? {
                Connection::Postgres(connection) => {
                    connection.clone().into_inline_connection(&catalog)
                }
                _ => Err(PgSourcePurificationError::NotPgConnection(
                    scx.catalog.resolve_full_name(connection_item.name()),
                ))?,
            };
            let crate::plan::statement::PgConfigOptionExtracted {
                publication,
                text_columns,
                details,
                ..
            } = options.clone().try_into()?;
            let publication =
                publication.ok_or(PgSourcePurificationError::ConnectionMissingPublication)?;

            if details.is_some() {
                Err(PgSourcePurificationError::UserSpecifiedDetails)?;
            }

            let client = connection
                .validate(connection_item.id(), storage_configuration)
                .await?;

            let reference_client = SourceReferenceClient::Postgres {
                client: &client,
                publication: &publication,
                database: &connection.database,
            };
            retrieved_source_references = reference_client.get_source_references().await?;

            let postgres::PurifiedSourceExports {
                source_exports: subsources,
                normalized_text_columns,
            } = postgres::purify_source_exports(
                &client,
                &retrieved_source_references,
                external_references,
                text_columns,
                source_name,
                &reference_policy,
            )
            .await?;

            if let Some(text_cols_option) = options
                .iter_mut()
                .find(|option| option.name == PgConfigOptionName::TextColumns)
            {
                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
            }

            requested_subsource_map.extend(subsources);

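            // Fetch the upstream Postgres timeline ID; it is stored in the
            // publication details below so the source can detect if it later
            // reconnects to a server on a different timeline (e.g. after a
            // failover or point-in-time restore).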
            let timeline_id = mz_postgres_util::get_timeline_id(&client).await?;

            options.retain(|PgConfigOption { name, .. }| name != &PgConfigOptionName::Details);
            let details = PostgresSourcePublicationDetails {
                slot: format!(
                    "materialize_{}",
                    Uuid::new_v4().to_string().replace('-', "")
                ),
                timeline_id: Some(timeline_id),
                database: connection.database,
            };
            options.push(PgConfigOption {
                name: PgConfigOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    details.into_proto().encode_to_vec(),
                )))),
            })
        }
        CreateSourceConnection::SqlServer {
            connection,
            options,
        } => {
            scx.require_feature_flag(&ENABLE_SQL_SERVER_SOURCE)?;

            let connection_item = scx.get_item_by_resolved_name(connection)?;
            let connection = match connection_item.connection()? {
                Connection::SqlServer(connection) => {
                    connection.clone().into_inline_connection(&catalog)
                }
                _ => Err(SqlServerSourcePurificationError::NotSqlServerConnection(
                    scx.catalog.resolve_full_name(connection_item.name()),
                ))?,
            };
            let crate::plan::statement::ddl::SqlServerConfigOptionExtracted {
                details,
                text_columns,
                exclude_columns,
                seen: _,
            } = options.clone().try_into()?;

            if details.is_some() {
                Err(SqlServerSourcePurificationError::UserSpecifiedDetails)?;
            }

            let mut client = connection
                .validate(connection_item.id(), storage_configuration)
                .await?;

            let database: Arc<str> = connection.database.into();
            let reference_client = SourceReferenceClient::SqlServer {
                client: &mut client,
                database: Arc::clone(&database),
            };
            retrieved_source_references = reference_client.get_source_references().await?;
            tracing::debug!(?retrieved_source_references, "got source references");

            let timeout = mz_storage_types::sources::sql_server::MAX_LSN_WAIT
                .get(storage_configuration.config_set());

            let purified_source_exports = sql_server::purify_source_exports(
                &*database,
                &mut client,
                &retrieved_source_references,
                external_references,
                &text_columns,
                &exclude_columns,
                source_name,
                timeout,
                &reference_policy,
            )
            .await?;

            let sql_server::PurifiedSourceExports {
                source_exports,
                normalized_text_columns,
                normalized_excl_columns,
            } = purified_source_exports;

            requested_subsource_map.extend(source_exports);

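            // Record the latest restore history ID so the source can detect if
            // the upstream SQL Server database is later restored, which would
            // invalidate the CDC stream.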
            let restore_history_id =
                mz_sql_server_util::inspect::get_latest_restore_history_id(&mut client).await?;
            let details = SqlServerSourceExtras { restore_history_id };

            options.retain(|SqlServerConfigOption { name, .. }| {
                name != &SqlServerConfigOptionName::Details
            });
            options.push(SqlServerConfigOption {
                name: SqlServerConfigOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    details.into_proto().encode_to_vec(),
                )))),
            });

            if let Some(text_cols_option) = options
                .iter_mut()
                .find(|option| option.name == SqlServerConfigOptionName::TextColumns)
            {
                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
            }
            if let Some(excl_cols_option) = options
                .iter_mut()
                .find(|option| option.name == SqlServerConfigOptionName::ExcludeColumns)
            {
                excl_cols_option.value = Some(WithOptionValue::Sequence(normalized_excl_columns));
            }
        }
        CreateSourceConnection::MySql {
            connection,
            options,
        } => {
            let connection_item = scx.get_item_by_resolved_name(connection)?;
            let connection = match connection_item.connection()? {
                Connection::MySql(connection) => {
                    connection.clone().into_inline_connection(&catalog)
                }
                _ => Err(MySqlSourcePurificationError::NotMySqlConnection(
                    scx.catalog.resolve_full_name(connection_item.name()),
                ))?,
            };
            let crate::plan::statement::ddl::MySqlConfigOptionExtracted {
                details,
                text_columns,
                exclude_columns,
                seen: _,
            } = options.clone().try_into()?;

            if details.is_some() {
                Err(MySqlSourcePurificationError::UserSpecifiedDetails)?;
            }

            let mut conn = connection
                .validate(connection_item.id(), storage_configuration)
                .await
                .map_err(MySqlSourcePurificationError::InvalidConnection)?;

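            // Capture the executed GTID set up front; purification records it
            // in each export's details as the consistent point that replication
            // starts from.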
            let initial_gtid_set =
                mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?;

            let reference_client = SourceReferenceClient::MySql {
                conn: &mut conn,
                include_system_schemas: mysql::references_system_schemas(external_references),
            };
            retrieved_source_references = reference_client.get_source_references().await?;

            let mysql::PurifiedSourceExports {
                source_exports: subsources,
                normalized_text_columns,
                normalized_exclude_columns,
            } = mysql::purify_source_exports(
                &mut conn,
                &retrieved_source_references,
                external_references,
                text_columns,
                exclude_columns,
                source_name,
                initial_gtid_set.clone(),
                &reference_policy,
            )
            .await?;
            requested_subsource_map.extend(subsources);

            let details = MySqlSourceDetails {};
            options
                .retain(|MySqlConfigOption { name, .. }| name != &MySqlConfigOptionName::Details);
            options.push(MySqlConfigOption {
                name: MySqlConfigOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    details.into_proto().encode_to_vec(),
                )))),
            });

            if let Some(text_cols_option) = options
                .iter_mut()
                .find(|option| option.name == MySqlConfigOptionName::TextColumns)
            {
                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
            }
            if let Some(ignore_cols_option) = options
                .iter_mut()
                .find(|option| option.name == MySqlConfigOptionName::ExcludeColumns)
            {
                ignore_cols_option.value =
                    Some(WithOptionValue::Sequence(normalized_exclude_columns));
            }
        }
        CreateSourceConnection::LoadGenerator { generator, options } => {
            let load_generator =
                load_generator_ast_to_generator(&scx, generator, options, include_metadata)?;

            let reference_client = SourceReferenceClient::LoadGenerator {
                generator: &load_generator,
            };
            retrieved_source_references = reference_client.get_source_references().await?;
            let multi_output_sources =
                retrieved_source_references
                    .all_references()
                    .iter()
                    .any(|r| {
                        r.load_generator_output().expect("is loadgen")
                            != &LoadGeneratorOutput::Default
                    });

            match external_references {
                Some(requested)
                    if matches!(reference_policy, SourceReferencePolicy::NotAllowed) =>
                {
                    Err(PlanError::UseTablesForSources(requested.to_string()))?
                }
                Some(requested) if !multi_output_sources => match requested {
                    ExternalReferences::SubsetTables(_) => {
                        Err(LoadGeneratorSourcePurificationError::ForTables)?
                    }
                    ExternalReferences::SubsetSchemas(_) => {
                        Err(LoadGeneratorSourcePurificationError::ForSchemas)?
                    }
                    ExternalReferences::All => {
                        Err(LoadGeneratorSourcePurificationError::ForAllTables)?
                    }
                },
                Some(requested) => {
                    let requested_exports = retrieved_source_references
                        .requested_source_exports(Some(requested), source_name)?;
                    for export in requested_exports {
                        requested_subsource_map.insert(
                            export.name,
                            PurifiedSourceExport {
                                external_reference: export.external_reference,
                                details: PurifiedExportDetails::LoadGenerator {
                                    table: export
                                        .meta
                                        .load_generator_desc()
                                        .expect("is loadgen")
                                        .clone(),
                                    output: export
                                        .meta
                                        .load_generator_output()
                                        .expect("is loadgen")
                                        .clone(),
                                },
                            },
                        );
                    }
                }
                None => {
                    if multi_output_sources
                        && matches!(reference_policy, SourceReferencePolicy::Required)
                    {
                        Err(LoadGeneratorSourcePurificationError::MultiOutputRequiresForAllTables)?
                    }
                }
            }

            if let LoadGenerator::Clock = generator {
                if !options
                    .iter()
                    .any(|p| p.name == LoadGeneratorOptionName::AsOf)
                {
                    let now = catalog.now();
                    options.push(LoadGeneratorOption {
                        name: LoadGeneratorOptionName::AsOf,
                        value: Some(WithOptionValue::Value(Value::Number(now.to_string()))),
                    });
                }
            }
        }
    }

    *external_references = None;

    let name = match progress_subsource {
        Some(name) => match name {
            DeferredItemName::Deferred(name) => name.clone(),
            DeferredItemName::Named(_) => unreachable!("already checked for this value"),
        },
        None => {
            let (item, prefix) = source_name.0.split_last().unwrap();
            let item_name = Ident::try_generate_name(item.to_string(), "_progress", |candidate| {
                let mut suggested_name = prefix.to_vec();
                suggested_name.push(candidate.clone());

                let partial = normalize::unresolved_item_name(UnresolvedItemName(suggested_name))?;
                let qualified = scx.allocate_qualified_name(partial)?;
                let item_exists = scx.catalog.get_item_by_name(&qualified).is_some();
                let type_exists = scx.catalog.get_type_by_name(&qualified).is_some();
                Ok::<_, PlanError>(!item_exists && !type_exists)
            })?;

            let mut full_name = prefix.to_vec();
            full_name.push(item_name);
            let full_name = normalize::unresolved_item_name(UnresolvedItemName(full_name))?;
            let qualified_name = scx.allocate_qualified_name(full_name)?;
            let full_name = scx.catalog.resolve_full_name(&qualified_name);

            UnresolvedItemName::from(full_name.clone())
        }
    };

    let (columns, constraints) = scx.relation_desc_into_table_defs(progress_desc)?;

    let mut progress_with_options: Vec<_> = with_options
        .iter()
        .filter_map(|opt| match opt.name {
            CreateSourceOptionName::TimestampInterval => None,
            CreateSourceOptionName::RetainHistory => Some(CreateSubsourceOption {
                name: CreateSubsourceOptionName::RetainHistory,
                value: opt.value.clone(),
            }),
        })
        .collect();
    progress_with_options.push(CreateSubsourceOption {
        name: CreateSubsourceOptionName::Progress,
        value: Some(WithOptionValue::Value(Value::Boolean(true))),
    });
    let create_progress_subsource_stmt = CreateSubsourceStatement {
        name,
        columns,
        of_source: None,
        constraints,
        if_not_exists: false,
        with_options: progress_with_options,
    };

    purify_source_format(
        &catalog,
        format,
        &format_options,
        envelope,
        storage_configuration,
    )
    .await?;

    Ok(PurifiedStatement::PurifiedCreateSource {
        create_progress_subsource_stmt,
        create_source_stmt,
        subsources: requested_subsource_map,
        available_source_references: retrieved_source_references.available_source_references(),
    })
}

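/// Purifies an `ALTER SOURCE` statement, probing the upstream system as needed
/// for actions (such as adding subsources or refreshing references) that
/// depend on external state.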
async fn purify_alter_source(
    catalog: impl SessionCatalog,
    stmt: AlterSourceStatement<Aug>,
    storage_configuration: &StorageConfiguration,
) -> Result<PurifiedStatement, PlanError> {
    let scx = StatementContext::new(None, &catalog);
    let AlterSourceStatement {
        source_name: unresolved_source_name,
        action,
        if_exists,
    } = stmt;

    let item = match scx.resolve_item(RawItemName::Name(unresolved_source_name.clone())) {
        Ok(item) => item,
        Err(_) if if_exists => {
            return Ok(PurifiedStatement::PurifiedAlterSource {
                alter_source_stmt: AlterSourceStatement {
                    source_name: unresolved_source_name,
                    action,
                    if_exists,
                },
            });
        }
        Err(e) => return Err(e),
    };

    let desc = match item.source_desc()? {
        Some(desc) => desc.clone().into_inline_connection(scx.catalog),
        None => {
            sql_bail!("cannot ALTER this type of source")
        }
    };

    let source_name = item.name();

    let resolved_source_name = ResolvedItemName::Item {
        id: item.id(),
        qualifiers: item.name().qualifiers.clone(),
        full_name: scx.catalog.resolve_full_name(source_name),
        print_id: true,
        version: RelationVersionSelector::Latest,
    };

    let partial_name = scx.catalog.minimal_qualification(source_name);

    match action {
        AlterSourceAction::AddSubsources {
            external_references,
            options,
        } => {
            if scx.catalog.system_vars().enable_create_table_from_source()
                && scx.catalog.system_vars().force_source_table_syntax()
            {
                Err(PlanError::UseTablesForSources(
                    "ALTER SOURCE .. ADD SUBSOURCES ..".to_string(),
                ))?;
            }

            purify_alter_source_add_subsources(
                external_references,
                options,
                desc,
                partial_name,
                unresolved_source_name,
                resolved_source_name,
                storage_configuration,
            )
            .await
        }
        AlterSourceAction::RefreshReferences => {
            purify_alter_source_refresh_references(
                desc,
                resolved_source_name,
                storage_configuration,
            )
            .await
        }
        _ => Ok(PurifiedStatement::PurifiedAlterSource {
            alter_source_stmt: AlterSourceStatement {
                source_name: unresolved_source_name,
                action,
                if_exists,
            },
        }),
    }
}

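/// Equivalent to `purify_create_source` for `ALTER SOURCE .. ADD SUBSOURCES`:
/// connects to the upstream system and purifies the newly requested
/// subsources.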
async fn purify_alter_source_add_subsources(
    external_references: Vec<ExternalReferenceExport>,
    mut options: Vec<AlterSourceAddSubsourceOption<Aug>>,
    desc: SourceDesc,
    partial_source_name: PartialItemName,
    unresolved_source_name: UnresolvedItemName,
    resolved_source_name: ResolvedItemName,
    storage_configuration: &StorageConfiguration,
) -> Result<PurifiedStatement, PlanError> {
    let connection_id = match &desc.connection {
        GenericSourceConnection::Postgres(c) => c.connection_id,
        GenericSourceConnection::MySql(c) => c.connection_id,
        GenericSourceConnection::SqlServer(c) => c.connection_id,
        _ => sql_bail!(
            "source {} does not support ALTER SOURCE.",
            partial_source_name
        ),
    };

    let connection_name = desc.connection.name();

    let crate::plan::statement::ddl::AlterSourceAddSubsourceOptionExtracted {
        text_columns,
        exclude_columns,
        details,
        seen: _,
    } = options.clone().try_into()?;
    assert_none!(details, "details cannot be explicitly set");

    let mut requested_subsource_map = BTreeMap::new();

    match desc.connection {
        GenericSourceConnection::Postgres(pg_source_connection) => {
            let pg_connection = &pg_source_connection.connection;

            let client = pg_connection
                .validate(connection_id, storage_configuration)
                .await?;

            if !exclude_columns.is_empty() {
                sql_bail!(
                    "{} is a {} source, which does not support EXCLUDE COLUMNS.",
                    partial_source_name,
                    connection_name
                )
            }

            let reference_client = SourceReferenceClient::Postgres {
                client: &client,
                publication: &pg_source_connection.publication,
                database: &pg_connection.database,
            };
            let retrieved_source_references = reference_client.get_source_references().await?;

            let postgres::PurifiedSourceExports {
                source_exports: subsources,
                normalized_text_columns,
            } = postgres::purify_source_exports(
                &client,
                &retrieved_source_references,
                &Some(ExternalReferences::SubsetTables(external_references)),
                text_columns,
                &unresolved_source_name,
                &SourceReferencePolicy::Required,
            )
            .await?;

            if let Some(text_cols_option) = options
                .iter_mut()
                .find(|option| option.name == AlterSourceAddSubsourceOptionName::TextColumns)
            {
                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
            }

            requested_subsource_map.extend(subsources);
        }
        GenericSourceConnection::MySql(mysql_source_connection) => {
            let mysql_connection = &mysql_source_connection.connection;
            let config = mysql_connection
                .config(
                    &storage_configuration.connection_context.secrets_reader,
                    storage_configuration,
                    InTask::No,
                )
                .await?;

            let mut conn = config
                .connect(
                    "mysql purification",
                    &storage_configuration.connection_context.ssh_tunnel_manager,
                )
                .await?;

            let initial_gtid_set =
                mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?;

            let requested_references = Some(ExternalReferences::SubsetTables(external_references));

            let reference_client = SourceReferenceClient::MySql {
                conn: &mut conn,
                include_system_schemas: mysql::references_system_schemas(&requested_references),
            };
            let retrieved_source_references = reference_client.get_source_references().await?;

            let mysql::PurifiedSourceExports {
                source_exports: subsources,
                normalized_text_columns,
                normalized_exclude_columns,
            } = mysql::purify_source_exports(
                &mut conn,
                &retrieved_source_references,
                &requested_references,
                text_columns,
                exclude_columns,
                &unresolved_source_name,
                initial_gtid_set,
                &SourceReferencePolicy::Required,
            )
            .await?;
            requested_subsource_map.extend(subsources);

            if let Some(text_cols_option) = options
                .iter_mut()
                .find(|option| option.name == AlterSourceAddSubsourceOptionName::TextColumns)
            {
                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
            }
            if let Some(ignore_cols_option) = options
                .iter_mut()
                .find(|option| option.name == AlterSourceAddSubsourceOptionName::ExcludeColumns)
            {
                ignore_cols_option.value =
                    Some(WithOptionValue::Sequence(normalized_exclude_columns));
            }
        }
        GenericSourceConnection::SqlServer(sql_server_source) => {
            let sql_server_connection = &sql_server_source.connection;
            let config = sql_server_connection
                .resolve_config(
                    &storage_configuration.connection_context.secrets_reader,
                    storage_configuration,
                    InTask::No,
                )
                .await?;
            let mut client = mz_sql_server_util::Client::connect(config).await?;

            let database = sql_server_connection.database.clone().into();
            let source_references = SourceReferenceClient::SqlServer {
                client: &mut client,
                database: Arc::clone(&database),
            }
            .get_source_references()
            .await?;
            let requested_references = Some(ExternalReferences::SubsetTables(external_references));

            let timeout = mz_storage_types::sources::sql_server::MAX_LSN_WAIT
                .get(storage_configuration.config_set());

            let result = sql_server::purify_source_exports(
                &*database,
                &mut client,
                &source_references,
                &requested_references,
                &text_columns,
                &exclude_columns,
                &unresolved_source_name,
                timeout,
                &SourceReferencePolicy::Required,
            )
            .await;
            let sql_server::PurifiedSourceExports {
                source_exports,
                normalized_text_columns,
                normalized_excl_columns,
            } = result?;

            requested_subsource_map.extend(source_exports);

            if let Some(text_cols_option) = options
                .iter_mut()
                .find(|option| option.name == AlterSourceAddSubsourceOptionName::TextColumns)
            {
                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
            }
            if let Some(ignore_cols_option) = options
                .iter_mut()
                .find(|option| option.name == AlterSourceAddSubsourceOptionName::ExcludeColumns)
            {
                ignore_cols_option.value = Some(WithOptionValue::Sequence(normalized_excl_columns));
            }
        }
        _ => unreachable!(),
    };

    Ok(PurifiedStatement::PurifiedAlterSourceAddSubsources {
        source_name: resolved_source_name,
        options,
        subsources: requested_subsource_map,
    })
}

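/// Connects to the upstream system of the given source and retrieves the
/// current set of available external references.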
async fn purify_alter_source_refresh_references(
    desc: SourceDesc,
    resolved_source_name: ResolvedItemName,
    storage_configuration: &StorageConfiguration,
) -> Result<PurifiedStatement, PlanError> {
    let retrieved_source_references = match desc.connection {
        GenericSourceConnection::Postgres(pg_source_connection) => {
            let pg_connection = &pg_source_connection.connection;

            let config = pg_connection
                .config(
                    &storage_configuration.connection_context.secrets_reader,
                    storage_configuration,
                    InTask::No,
                )
                .await?;

            let client = config
                .connect(
                    "postgres_purification",
                    &storage_configuration.connection_context.ssh_tunnel_manager,
                )
                .await?;
            let reference_client = SourceReferenceClient::Postgres {
                client: &client,
                publication: &pg_source_connection.publication,
                database: &pg_connection.database,
            };
            reference_client.get_source_references().await?
        }
        GenericSourceConnection::MySql(mysql_source_connection) => {
            let mysql_connection = &mysql_source_connection.connection;
            let config = mysql_connection
                .config(
                    &storage_configuration.connection_context.secrets_reader,
                    storage_configuration,
                    InTask::No,
                )
                .await?;

            let mut conn = config
                .connect(
                    "mysql purification",
                    &storage_configuration.connection_context.ssh_tunnel_manager,
                )
                .await?;

            let reference_client = SourceReferenceClient::MySql {
                conn: &mut conn,
                include_system_schemas: false,
            };
            reference_client.get_source_references().await?
        }
        GenericSourceConnection::SqlServer(sql_server_source) => {
            let sql_server_connection = &sql_server_source.connection;
            let config = sql_server_connection
                .resolve_config(
                    &storage_configuration.connection_context.secrets_reader,
                    storage_configuration,
                    InTask::No,
                )
                .await?;
            let mut client = mz_sql_server_util::Client::connect(config).await?;

            let source_references = SourceReferenceClient::SqlServer {
                client: &mut client,
                database: sql_server_connection.database.clone().into(),
            }
            .get_source_references()
            .await?;
            source_references
        }
        GenericSourceConnection::LoadGenerator(load_gen_connection) => {
            let reference_client = SourceReferenceClient::LoadGenerator {
                generator: &load_gen_connection.load_generator,
            };
            reference_client.get_source_references().await?
        }
        GenericSourceConnection::Kafka(kafka_conn) => {
            let reference_client = SourceReferenceClient::Kafka {
                topic: &kafka_conn.topic,
            };
            reference_client.get_source_references().await?
        }
    };
    Ok(PurifiedStatement::PurifiedAlterSourceRefreshReferences {
        source_name: resolved_source_name,
        available_source_references: retrieved_source_references.available_source_references(),
    })
}

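/// Purifies a `CREATE TABLE .. FROM SOURCE` statement: connects to the
/// upstream system to fetch the referenced table's schema and details, and
/// fills in the statement's columns, constraints, and options accordingly.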
1615async fn purify_create_table_from_source(
1616 catalog: impl SessionCatalog,
1617 mut stmt: CreateTableFromSourceStatement<Aug>,
1618 storage_configuration: &StorageConfiguration,
1619) -> Result<PurifiedStatement, PlanError> {
1620 let scx = StatementContext::new(None, &catalog);
1621 let CreateTableFromSourceStatement {
1622 name: _,
1623 columns,
1624 constraints,
1625 source: source_name,
1626 if_not_exists: _,
1627 external_reference,
1628 format,
1629 envelope,
1630 include_metadata: _,
1631 with_options,
1632 } = &mut stmt;
1633
1634 if matches!(columns, TableFromSourceColumns::Defined(_)) {
1636 sql_bail!("CREATE TABLE .. FROM SOURCE column definitions cannot be specified directly");
1637 }
1638 if !constraints.is_empty() {
1639 sql_bail!(
1640 "CREATE TABLE .. FROM SOURCE constraint definitions cannot be specified directly"
1641 );
1642 }
1643
1644 let item = match scx.get_item_by_resolved_name(source_name) {
1646 Ok(item) => item,
1647 Err(e) => return Err(e),
1648 };
1649
1650 let desc = match item.source_desc()? {
1652 Some(desc) => desc.clone().into_inline_connection(scx.catalog),
1653 None => {
1654 sql_bail!("cannot ALTER this type of source")
1655 }
1656 };
1657 let unresolved_source_name: UnresolvedItemName = source_name.full_item_name().clone().into();
1658 let qualified_source_name = item.name();
1659 let connection_name = desc.connection.name();
1660
1661 let crate::plan::statement::ddl::TableFromSourceOptionExtracted {
1662 text_columns,
1663 exclude_columns,
1664 retain_history: _,
1665 details,
1666 partition_by: _,
1667 seen: _,
1668 } = with_options.clone().try_into()?;
1669 assert_none!(details, "details cannot be explicitly set");
1670
1671 let qualified_text_columns = text_columns
1675 .iter()
1676 .map(|col| {
1677 UnresolvedItemName(
1678 external_reference
1679 .as_ref()
1680 .map(|er| er.0.iter().chain_one(col).map(|i| i.clone()).collect())
1681 .unwrap_or_else(|| vec![col.clone()]),
1682 )
1683 })
1684 .collect_vec();
1685 let qualified_exclude_columns = exclude_columns
1686 .iter()
1687 .map(|col| {
1688 UnresolvedItemName(
1689 external_reference
1690 .as_ref()
1691 .map(|er| er.0.iter().chain_one(col).map(|i| i.clone()).collect())
1692 .unwrap_or_else(|| vec![col.clone()]),
1693 )
1694 })
1695 .collect_vec();
1696
1697 let mut format_options = SourceFormatOptions::Default;
1699
1700 let retrieved_source_references: RetrievedSourceReferences;
1701
1702 let requested_references = external_reference.as_ref().map(|ref_name| {
1703 ExternalReferences::SubsetTables(vec![ExternalReferenceExport {
1704 reference: ref_name.clone(),
1705 alias: None,
1706 }])
1707 });
1708
1709 let purified_export = match desc.connection {
1712 GenericSourceConnection::Postgres(pg_source_connection) => {
1713 let pg_connection = &pg_source_connection.connection;
1715
1716 let client = pg_connection
1717 .validate(pg_source_connection.connection_id, storage_configuration)
1718 .await?;
1719
1720 if !exclude_columns.is_empty() {
1722 sql_bail!(
1723 "{} is a {} source, which does not support EXCLUDE COLUMNS.",
1724 scx.catalog.minimal_qualification(qualified_source_name),
1725 connection_name
1726 )
1727 }
1728
1729 let reference_client = SourceReferenceClient::Postgres {
1730 client: &client,
1731 publication: &pg_source_connection.publication,
1732 database: &pg_connection.database,
1733 };
1734 retrieved_source_references = reference_client.get_source_references().await?;
1735
1736 let postgres::PurifiedSourceExports {
1737 source_exports,
1738 normalized_text_columns: _,
1742 } = postgres::purify_source_exports(
1743 &client,
1744 &retrieved_source_references,
1745 &requested_references,
1746 qualified_text_columns,
1747 &unresolved_source_name,
1748 &SourceReferencePolicy::Required,
1749 )
1750 .await?;
1751 let (_, purified_export) = source_exports.into_element();
1753 purified_export
1754 }
1755 GenericSourceConnection::MySql(mysql_source_connection) => {
1756 let mysql_connection = &mysql_source_connection.connection;
1757 let config = mysql_connection
1758 .config(
1759 &storage_configuration.connection_context.secrets_reader,
1760 storage_configuration,
1761 InTask::No,
1762 )
1763 .await?;
1764
1765 let mut conn = config
1766 .connect(
1767 "mysql purification",
1768 &storage_configuration.connection_context.ssh_tunnel_manager,
1769 )
1770 .await?;
1771
1772 let initial_gtid_set =
1775 mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?;
1776
1777 let reference_client = SourceReferenceClient::MySql {
1778 conn: &mut conn,
1779 include_system_schemas: mysql::references_system_schemas(&requested_references),
1780 };
1781 retrieved_source_references = reference_client.get_source_references().await?;
1782
1783 let mysql::PurifiedSourceExports {
1784 source_exports,
1785 normalized_text_columns: _,
1789 normalized_exclude_columns: _,
1790 } = mysql::purify_source_exports(
1791 &mut conn,
1792 &retrieved_source_references,
1793 &requested_references,
1794 qualified_text_columns,
1795 qualified_exclude_columns,
1796 &unresolved_source_name,
1797 initial_gtid_set,
1798 &SourceReferencePolicy::Required,
1799 )
1800 .await?;
1801 let (_, purified_export) = source_exports.into_element();
1803 purified_export
1804 }
1805 GenericSourceConnection::SqlServer(sql_server_source) => {
1806 let connection = sql_server_source.connection;
1807 let config = connection
1808 .resolve_config(
1809 &storage_configuration.connection_context.secrets_reader,
1810 storage_configuration,
1811 InTask::No,
1812 )
1813 .await?;
1814 let mut client = mz_sql_server_util::Client::connect(config).await?;
1815
1816 let database: Arc<str> = connection.database.into();
1817 let reference_client = SourceReferenceClient::SqlServer {
1818 client: &mut client,
1819 database: Arc::clone(&database),
1820 };
1821 retrieved_source_references = reference_client.get_source_references().await?;
1822 tracing::debug!(?retrieved_source_references, "got source references");
1823
1824 let timeout = mz_storage_types::sources::sql_server::MAX_LSN_WAIT
1825 .get(storage_configuration.config_set());
1826
1827 let purified_source_exports = sql_server::purify_source_exports(
1828 &*database,
1829 &mut client,
1830 &retrieved_source_references,
1831 &requested_references,
1832 &qualified_text_columns,
1833 &qualified_exclude_columns,
1834 &unresolved_source_name,
1835 timeout,
1836 &SourceReferencePolicy::Required,
1837 )
1838 .await?;
1839
1840 let (_, purified_export) = purified_source_exports.source_exports.into_element();
1842 purified_export
1843 }
1844 GenericSourceConnection::LoadGenerator(load_gen_connection) => {
1845 let reference_client = SourceReferenceClient::LoadGenerator {
1846 generator: &load_gen_connection.load_generator,
1847 };
1848 retrieved_source_references = reference_client.get_source_references().await?;
1849
1850 let requested_exports = retrieved_source_references
1851 .requested_source_exports(requested_references.as_ref(), &unresolved_source_name)?;
1852 let export = requested_exports.into_element();
1854 PurifiedSourceExport {
1855 external_reference: export.external_reference,
1856 details: PurifiedExportDetails::LoadGenerator {
1857 table: export
1858 .meta
1859 .load_generator_desc()
1860 .expect("is loadgen")
1861 .clone(),
1862 output: export
1863 .meta
1864 .load_generator_output()
1865 .expect("is loadgen")
1866 .clone(),
1867 },
1868 }
1869 }
1870 GenericSourceConnection::Kafka(kafka_conn) => {
1871 let reference_client = SourceReferenceClient::Kafka {
1872 topic: &kafka_conn.topic,
1873 };
1874 retrieved_source_references = reference_client.get_source_references().await?;
1875 let requested_exports = retrieved_source_references
1876 .requested_source_exports(requested_references.as_ref(), &unresolved_source_name)?;
1877 let export = requested_exports.into_element();
1879
1880 format_options = SourceFormatOptions::Kafka {
1881 topic: kafka_conn.topic.clone(),
1882 };
1883 PurifiedSourceExport {
1884 external_reference: export.external_reference,
1885 details: PurifiedExportDetails::Kafka {},
1886 }
1887 }
1888 };
1889
1890 purify_source_format(
1891 &catalog,
1892 format,
1893 &format_options,
1894 envelope,
1895 storage_configuration,
1896 )
1897 .await?;
1898
1899 *external_reference = Some(purified_export.external_reference.clone());
1902
1903 match &purified_export.details {
1905 PurifiedExportDetails::Postgres { .. } => {
1906 let mut unsupported_cols = vec![];
1907 let postgres::PostgresExportStatementValues {
1908 columns: gen_columns,
1909 constraints: gen_constraints,
1910 text_columns: gen_text_columns,
1911 details: gen_details,
1912 external_reference: _,
1913 } = postgres::generate_source_export_statement_values(
1914 &scx,
1915 purified_export,
1916 &mut unsupported_cols,
1917 )?;
1918 if !unsupported_cols.is_empty() {
1919 unsupported_cols.sort();
1920 Err(PgSourcePurificationError::UnrecognizedTypes {
1921 cols: unsupported_cols,
1922 })?;
1923 }
1924
1925 if let Some(text_cols_option) = with_options
1926 .iter_mut()
1927 .find(|option| option.name == TableFromSourceOptionName::TextColumns)
1928 {
1929 match gen_text_columns {
1930 Some(gen_text_columns) => {
1931 text_cols_option.value = Some(WithOptionValue::Sequence(gen_text_columns))
1932 }
1933 None => soft_panic_or_log!(
1934 "text_columns should be Some if text_cols_option is present"
1935 ),
1936 }
1937 }
1938 match columns {
1939 TableFromSourceColumns::Defined(_) => unreachable!(),
1940 TableFromSourceColumns::NotSpecified => {
1941 *columns = TableFromSourceColumns::Defined(gen_columns);
1942 *constraints = gen_constraints;
1943 }
1944 TableFromSourceColumns::Named(_) => {
1945 sql_bail!("columns cannot be named for Postgres sources")
1946 }
1947 }
1948 with_options.push(TableFromSourceOption {
1949 name: TableFromSourceOptionName::Details,
1950 value: Some(WithOptionValue::Value(Value::String(hex::encode(
1951 gen_details.into_proto().encode_to_vec(),
1952 )))),
1953 })
1954 }
        PurifiedExportDetails::MySql { .. } => {
            let mysql::MySqlExportStatementValues {
                columns: gen_columns,
                constraints: gen_constraints,
                text_columns: gen_text_columns,
                exclude_columns: gen_exclude_columns,
                details: gen_details,
                external_reference: _,
            } = mysql::generate_source_export_statement_values(&scx, purified_export)?;

            if let Some(text_cols_option) = with_options
                .iter_mut()
                .find(|option| option.name == TableFromSourceOptionName::TextColumns)
            {
                match gen_text_columns {
                    Some(gen_text_columns) => {
                        text_cols_option.value = Some(WithOptionValue::Sequence(gen_text_columns))
                    }
                    None => soft_panic_or_log!(
                        "text_columns should be Some if text_cols_option is present"
                    ),
                }
            }
            if let Some(ignore_cols_option) = with_options
                .iter_mut()
                .find(|option| option.name == TableFromSourceOptionName::ExcludeColumns)
            {
                match gen_exclude_columns {
                    Some(gen_exclude_columns) => {
                        ignore_cols_option.value =
                            Some(WithOptionValue::Sequence(gen_exclude_columns))
                    }
                    None => soft_panic_or_log!(
                        "exclude_columns should be Some if ignore_cols_option is present"
                    ),
                }
            }
            match columns {
                TableFromSourceColumns::Defined(_) => unreachable!(),
                TableFromSourceColumns::NotSpecified => {
                    *columns = TableFromSourceColumns::Defined(gen_columns);
                    *constraints = gen_constraints;
                }
                TableFromSourceColumns::Named(_) => {
                    sql_bail!("columns cannot be named for MySQL sources")
                }
            }
            with_options.push(TableFromSourceOption {
                name: TableFromSourceOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    gen_details.into_proto().encode_to_vec(),
                )))),
            })
        }
        PurifiedExportDetails::SqlServer { .. } => {
            let sql_server::SqlServerExportStatementValues {
                columns: gen_columns,
                constraints: gen_constraints,
                text_columns: gen_text_columns,
                excl_columns: gen_excl_columns,
                details: gen_details,
                external_reference: _,
            } = sql_server::generate_source_export_statement_values(&scx, purified_export)?;

            if let Some(text_cols_option) = with_options
                .iter_mut()
                .find(|opt| opt.name == TableFromSourceOptionName::TextColumns)
            {
                match gen_text_columns {
                    Some(gen_text_columns) => {
                        text_cols_option.value = Some(WithOptionValue::Sequence(gen_text_columns))
                    }
                    None => soft_panic_or_log!(
                        "text_columns should be Some if text_cols_option is present"
                    ),
                }
            }
            if let Some(exclude_cols_option) = with_options
                .iter_mut()
                .find(|opt| opt.name == TableFromSourceOptionName::ExcludeColumns)
            {
                match gen_excl_columns {
                    Some(gen_excl_columns) => {
                        exclude_cols_option.value =
                            Some(WithOptionValue::Sequence(gen_excl_columns))
                    }
                    None => soft_panic_or_log!(
                        "excl_columns should be Some if exclude_cols_option is present"
                    ),
                }
            }

            match columns {
                TableFromSourceColumns::NotSpecified => {
                    *columns = TableFromSourceColumns::Defined(gen_columns);
                    *constraints = gen_constraints;
                }
                TableFromSourceColumns::Named(_) => {
                    sql_bail!("columns cannot be named for SQL Server sources")
                }
                TableFromSourceColumns::Defined(_) => unreachable!(),
            }

            with_options.push(TableFromSourceOption {
                name: TableFromSourceOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    gen_details.into_proto().encode_to_vec(),
                )))),
            })
        }
        PurifiedExportDetails::LoadGenerator { .. } => {
            let (desc, output) = match purified_export.details {
                PurifiedExportDetails::LoadGenerator { table, output } => (table, output),
                _ => unreachable!("purified export details must be load generator"),
            };
            if let Some(desc) = desc {
                let (gen_columns, gen_constraints) = scx.relation_desc_into_table_defs(&desc)?;
                match columns {
                    TableFromSourceColumns::Defined(_) => unreachable!(),
                    TableFromSourceColumns::NotSpecified => {
                        *columns = TableFromSourceColumns::Defined(gen_columns);
                        *constraints = gen_constraints;
                    }
                    TableFromSourceColumns::Named(_) => {
                        sql_bail!("columns cannot be named for multi-output load generator sources")
                    }
                }
            }
            let details = SourceExportStatementDetails::LoadGenerator { output };
            with_options.push(TableFromSourceOption {
                name: TableFromSourceOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    details.into_proto().encode_to_vec(),
                )))),
            })
        }
        PurifiedExportDetails::Kafka {} => {
            let details = SourceExportStatementDetails::Kafka {};
            with_options.push(TableFromSourceOption {
                name: TableFromSourceOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    details.into_proto().encode_to_vec(),
                )))),
            })
        }
    };

    Ok(PurifiedStatement::PurifiedCreateTableFromSource { stmt })
}

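/// Context needed to purify a source's `FORMAT` clause. Only Kafka sources
/// carry extra state: the topic name, from which the `<topic>-key` and
/// `<topic>-value` schema registry subjects are derived.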
enum SourceFormatOptions {
    Default,
    Kafka { topic: String },
}

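/// Purifies the `FORMAT` clause of a source by resolving any schemas that
/// must be fetched at purification time (e.g. from the Confluent Schema
/// Registry). Errors if a `KEY ... VALUE ...` format specifier is used with
/// a non-Kafka source.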
async fn purify_source_format(
    catalog: &dyn SessionCatalog,
    format: &mut Option<FormatSpecifier<Aug>>,
    options: &SourceFormatOptions,
    envelope: &Option<SourceEnvelope>,
    storage_configuration: &StorageConfiguration,
) -> Result<(), PlanError> {
    if matches!(format, Some(FormatSpecifier::KeyValue { .. }))
        && !matches!(options, SourceFormatOptions::Kafka { .. })
    {
        sql_bail!("Kafka sources are the only source type that can provide KEY/VALUE formats")
    }

    match format.as_mut() {
        None => {}
        Some(FormatSpecifier::Bare(format)) => {
            purify_source_format_single(catalog, format, options, envelope, storage_configuration)
                .await?;
        }

        Some(FormatSpecifier::KeyValue { key, value: val }) => {
            purify_source_format_single(catalog, key, options, envelope, storage_configuration)
                .await?;
            purify_source_format_single(catalog, val, options, envelope, storage_configuration)
                .await?;
        }
    }
    Ok(())
}

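/// Purifies a single `Format`: Avro and Protobuf formats that reference a
/// schema registry have their schemas fetched and inlined as a seed; all
/// other formats need no purification.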
async fn purify_source_format_single(
    catalog: &dyn SessionCatalog,
    format: &mut Format<Aug>,
    options: &SourceFormatOptions,
    envelope: &Option<SourceEnvelope>,
    storage_configuration: &StorageConfiguration,
) -> Result<(), PlanError> {
    match format {
        Format::Avro(schema) => match schema {
            AvroSchema::Csr { csr_connection } => {
                purify_csr_connection_avro(
                    catalog,
                    options,
                    csr_connection,
                    envelope,
                    storage_configuration,
                )
                .await?
            }
            AvroSchema::InlineSchema { .. } => {}
        },
        Format::Protobuf(schema) => match schema {
            ProtobufSchema::Csr { csr_connection } => {
                purify_csr_connection_proto(
                    catalog,
                    options,
                    csr_connection,
                    envelope,
                    storage_configuration,
                )
                .await?;
            }
            ProtobufSchema::InlineSchema { .. } => {}
        },
        Format::Bytes
        | Format::Regex(_)
        | Format::Json { .. }
        | Format::Text
        | Format::Csv { .. } => (),
    }
    Ok(())
}

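/// Generates `CREATE SUBSOURCE` statements for the given purified source
/// exports. All exports come from the single named source, so the details of
/// the first export determine which dialect-specific generator handles the
/// whole batch.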
pub fn generate_subsource_statements(
    scx: &StatementContext,
    source_name: ResolvedItemName,
    subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
) -> Result<Vec<CreateSubsourceStatement<Aug>>, PlanError> {
    if subsources.is_empty() {
        return Ok(vec![]);
    }
    let (_, purified_export) = subsources.iter().next().unwrap();

    let statements = match &purified_export.details {
        PurifiedExportDetails::Postgres { .. } => {
            crate::pure::postgres::generate_create_subsource_statements(
                scx,
                source_name,
                subsources,
            )?
        }
        PurifiedExportDetails::MySql { .. } => {
            crate::pure::mysql::generate_create_subsource_statements(scx, source_name, subsources)?
        }
        PurifiedExportDetails::SqlServer { .. } => {
            crate::pure::sql_server::generate_create_subsource_statements(
                scx,
                source_name,
                subsources,
            )?
        }
        PurifiedExportDetails::LoadGenerator { .. } => {
            let mut subsource_stmts = Vec::with_capacity(subsources.len());
            for (subsource_name, purified_export) in subsources {
                let (desc, output) = match purified_export.details {
                    PurifiedExportDetails::LoadGenerator { table, output } => (table, output),
                    _ => unreachable!("purified export details must be load generator"),
                };
                let desc =
                    desc.expect("subsources cannot be generated for single-output load generators");

                let (columns, table_constraints) = scx.relation_desc_into_table_defs(&desc)?;
                let details = SourceExportStatementDetails::LoadGenerator { output };
                let subsource = CreateSubsourceStatement {
                    name: subsource_name,
                    columns,
                    of_source: Some(source_name.clone()),
                    constraints: table_constraints,
                    if_not_exists: false,
                    with_options: vec![
                        CreateSubsourceOption {
                            name: CreateSubsourceOptionName::ExternalReference,
                            value: Some(WithOptionValue::UnresolvedItemName(
                                purified_export.external_reference,
                            )),
                        },
                        CreateSubsourceOption {
                            name: CreateSubsourceOptionName::Details,
                            value: Some(WithOptionValue::Value(Value::String(hex::encode(
                                details.into_proto().encode_to_vec(),
                            )))),
                        },
                    ],
                };
                subsource_stmts.push(subsource);
            }

            subsource_stmts
        }
        PurifiedExportDetails::Kafka { .. } => {
            assert!(
                subsources.is_empty(),
                "Kafka sources do not produce data-bearing subsources"
            );
            vec![]
        }
    };
    Ok(statements)
}

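/// Purifies a Protobuf Confluent Schema Registry connection by fetching and
/// compiling the schemas for the topic's key and value subjects and storing
/// the result as a seed on the statement. The key schema is optional unless
/// `ENVELOPE DEBEZIUM` is in use.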
async fn purify_csr_connection_proto(
    catalog: &dyn SessionCatalog,
    options: &SourceFormatOptions,
    csr_connection: &mut CsrConnectionProtobuf<Aug>,
    envelope: &Option<SourceEnvelope>,
    storage_configuration: &StorageConfiguration,
) -> Result<(), PlanError> {
    let SourceFormatOptions::Kafka { topic } = options else {
        sql_bail!("Confluent Schema Registry is only supported with Kafka sources")
    };

    let CsrConnectionProtobuf {
        seed,
        connection: CsrConnection {
            connection,
            options: _,
        },
    } = csr_connection;
    match seed {
        None => {
            let scx = StatementContext::new(None, &*catalog);

            let ccsr_connection = match scx.get_item_by_resolved_name(connection)?.connection()? {
                Connection::Csr(connection) => connection.clone().into_inline_connection(catalog),
                _ => sql_bail!("{} is not a schema registry connection", connection),
            };

            let ccsr_client = ccsr_connection
                .connect(storage_configuration, InTask::No)
                .await
                .map_err(|e| CsrPurificationError::ClientError(Arc::new(e)))?;

            let value = compile_proto(&format!("{}-value", topic), &ccsr_client).await?;
            let key = compile_proto(&format!("{}-key", topic), &ccsr_client)
                .await
                .ok();

            if matches!(envelope, Some(SourceEnvelope::Debezium)) && key.is_none() {
                sql_bail!("Key schema is required for ENVELOPE DEBEZIUM");
            }

            *seed = Some(CsrSeedProtobuf { value, key });
        }
        Some(_) => (),
    }

    Ok(())
}

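/// Purifies an Avro Confluent Schema Registry connection by fetching the key
/// and value schemas for the topic and storing them as a seed on the
/// statement. The key schema is optional unless `ENVELOPE DEBEZIUM` is in
/// use.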
async fn purify_csr_connection_avro(
    catalog: &dyn SessionCatalog,
    options: &SourceFormatOptions,
    csr_connection: &mut CsrConnectionAvro<Aug>,
    envelope: &Option<SourceEnvelope>,
    storage_configuration: &StorageConfiguration,
) -> Result<(), PlanError> {
    let SourceFormatOptions::Kafka { topic } = options else {
        sql_bail!("Confluent Schema Registry is only supported with Kafka sources")
    };

    let CsrConnectionAvro {
        connection: CsrConnection { connection, .. },
        seed,
        key_strategy,
        value_strategy,
    } = csr_connection;
    if seed.is_none() {
        let scx = StatementContext::new(None, &*catalog);
        let csr_connection = match scx.get_item_by_resolved_name(connection)?.connection()? {
            Connection::Csr(connection) => connection.clone().into_inline_connection(catalog),
            _ => sql_bail!("{} is not a schema registry connection", connection),
        };
        let ccsr_client = csr_connection
            .connect(storage_configuration, InTask::No)
            .await
            .map_err(|e| CsrPurificationError::ClientError(Arc::new(e)))?;

        let Schema {
            key_schema,
            value_schema,
        } = get_remote_csr_schema(
            &ccsr_client,
            key_strategy.clone().unwrap_or_default(),
            value_strategy.clone().unwrap_or_default(),
            topic,
        )
        .await?;
        if matches!(envelope, Some(SourceEnvelope::Debezium)) && key_schema.is_none() {
            sql_bail!("Key schema is required for ENVELOPE DEBEZIUM");
        }

        *seed = Some(CsrSeedAvro {
            key_schema,
            value_schema,
        })
    }

    Ok(())
}

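/// A key/value schema pair fetched from a Confluent Schema Registry. The key
/// schema is optional because many topics register only a value schema.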
#[derive(Debug)]
pub struct Schema {
    pub key_schema: Option<String>,
    pub value_schema: String,
}

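/// Fetches the schema for `subject` according to the requested selection
/// strategy. Returns `Ok(None)` if the subject (or the requested version or
/// ID) does not exist, and an error for any other registry failure.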
async fn get_schema_with_strategy(
    client: &Client,
    strategy: ReaderSchemaSelectionStrategy,
    subject: &str,
) -> Result<Option<String>, PlanError> {
    match strategy {
        ReaderSchemaSelectionStrategy::Latest => {
            match client.get_schema_by_subject(subject).await {
                Ok(CcsrSchema { raw, .. }) => Ok(Some(raw)),
                Err(GetBySubjectError::SubjectNotFound)
                | Err(GetBySubjectError::VersionNotFound(_)) => Ok(None),
                Err(e) => Err(PlanError::FetchingCsrSchemaFailed {
                    schema_lookup: format!("subject {}", subject.quoted()),
                    cause: Arc::new(e),
                }),
            }
        }
        ReaderSchemaSelectionStrategy::Inline(raw) => Ok(Some(raw)),
        ReaderSchemaSelectionStrategy::ById(id) => match client.get_schema_by_id(id).await {
            Ok(CcsrSchema { raw, .. }) => Ok(Some(raw)),
            Err(GetByIdError::SchemaNotFound) => Ok(None),
            Err(e) => Err(PlanError::FetchingCsrSchemaFailed {
                schema_lookup: format!("ID {}", id),
                cause: Arc::new(e),
            }),
        },
    }
}

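/// Fetches the value schema (required) and key schema (optional) for `topic`
/// from the schema registry, using the `<topic>-value` and `<topic>-key`
/// subject naming convention.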
async fn get_remote_csr_schema(
    ccsr_client: &mz_ccsr::Client,
    key_strategy: ReaderSchemaSelectionStrategy,
    value_strategy: ReaderSchemaSelectionStrategy,
    topic: &str,
) -> Result<Schema, PlanError> {
    let value_schema_name = format!("{}-value", topic);
    let value_schema =
        get_schema_with_strategy(ccsr_client, value_strategy, &value_schema_name).await?;
    let value_schema = value_schema.ok_or_else(|| anyhow!("No value schema found"))?;
    let subject = format!("{}-key", topic);
    let key_schema = get_schema_with_strategy(ccsr_client, key_strategy, &subject).await?;
    Ok(Schema {
        key_schema,
        value_schema,
    })
}

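/// Fetches the Protobuf schema registered under `subject_name`, along with
/// all of its references, compiles them into a file descriptor set, and
/// returns the serialized descriptors plus the schema's single top-level
/// message name. Schemas with zero or multiple top-level messages are
/// rejected as unsupported.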
async fn compile_proto(
    subject_name: &str,
    ccsr_client: &Client,
) -> Result<CsrSeedProtobufSchema, PlanError> {
    let (primary_subject, dependency_subjects) = ccsr_client
        .get_subject_and_references(subject_name)
        .await
        .map_err(|e| PlanError::FetchingCsrSchemaFailed {
            schema_lookup: format!("subject {}", subject_name.quoted()),
            cause: Arc::new(e),
        })?;

    let mut source_tree = VirtualSourceTree::new();
    for subject in iter::once(&primary_subject).chain(dependency_subjects.iter()) {
        source_tree.as_mut().add_file(
            Path::new(&subject.name),
            subject.schema.raw.as_bytes().to_vec(),
        );
    }
    let mut db = SourceTreeDescriptorDatabase::new(source_tree.as_mut());
    let fds = db
        .as_mut()
        .build_file_descriptor_set(&[Path::new(&primary_subject.name)])
        .map_err(|cause| PlanError::InvalidProtobufSchema { cause })?;

    let primary_fd = fds.file(0);
    let message_name = match primary_fd.message_type_size() {
        1 => String::from_utf8_lossy(primary_fd.message_type(0).name()).into_owned(),
        0 => bail_unsupported!(29603, "Protobuf schemas with no messages"),
        _ => bail_unsupported!(29603, "Protobuf schemas with multiple messages"),
    };

    let bytes = &fds
        .serialize()
        .map_err(|cause| PlanError::InvalidProtobufSchema { cause })?;
    let mut schema = String::new();
    strconv::format_bytes(&mut schema, bytes);

    Ok(CsrSeedProtobufSchema {
        schema,
        message_name,
    })
}

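// The name and schema of the `mz_now` function, used to recognize and
// construct `mz_now()` calls during purification.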
const MZ_NOW_NAME: &str = "mz_now";
const MZ_NOW_SCHEMA: &str = "mz_catalog";

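/// Purifies the refresh options of a `CREATE MATERIALIZED VIEW` statement:
///
/// - `REFRESH AT CREATION` is rewritten to `REFRESH AT mz_now()`;
/// - a `REFRESH EVERY` with no `ALIGNED TO` clause gets `ALIGNED TO
///   mz_now()`;
/// - every `mz_now()` call inside a refresh option is replaced by the chosen
///   timestamp, cast to `mz_timestamp`;
/// - if no refresh option is present at all, `REFRESH ON COMMIT` is added.
///
/// For example, if the chosen timestamp is `1700000000000`, then
/// `REFRESH AT mz_now()` is rewritten to refresh at that fixed timestamp
/// (the literal `1700000000000` cast to `mz_timestamp`).
///
/// `resolved_ids` is updated to match: `mz_timestamp` is added as a
/// dependency if any cast was introduced, and `mz_now` is removed if no
/// temporal call remains anywhere in the statement.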
pub fn purify_create_materialized_view_options(
    catalog: impl SessionCatalog,
    mz_now: Option<Timestamp>,
    cmvs: &mut CreateMaterializedViewStatement<Aug>,
    resolved_ids: &mut ResolvedIds,
) {
    let (mz_now_id, mz_now_expr) = {
        let item = catalog
            .resolve_function(&PartialItemName {
                database: None,
                schema: Some(MZ_NOW_SCHEMA.to_string()),
                item: MZ_NOW_NAME.to_string(),
            })
            .expect("we should be able to resolve mz_now");
        (
            item.id(),
            Expr::Function(Function {
                name: ResolvedItemName::Item {
                    id: item.id(),
                    qualifiers: item.name().qualifiers.clone(),
                    full_name: catalog.resolve_full_name(item.name()),
                    print_id: false,
                    version: RelationVersionSelector::Latest,
                },
                args: FunctionArgs::Args {
                    args: Vec::new(),
                    order_by: Vec::new(),
                },
                filter: None,
                over: None,
                distinct: false,
            }),
        )
    };
    let (mz_timestamp_id, mz_timestamp_type) = {
        let item = catalog.get_system_type("mz_timestamp");
        let full_name = catalog.resolve_full_name(item.name());
        (
            item.id(),
            ResolvedDataType::Named {
                id: item.id(),
                qualifiers: item.name().qualifiers.clone(),
                full_name,
                modifiers: vec![],
                print_id: true,
            },
        )
    };

    let mut introduced_mz_timestamp = false;

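    // Desugar the refresh options: `AT CREATION` and a missing `ALIGNED TO`
    // become explicit `mz_now()` calls, and any `mz_now()` call is then
    // replaced by the chosen timestamp. If no refresh option is present at
    // all, default to `REFRESH ON COMMIT`.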
    for option in cmvs.with_options.iter_mut() {
        if matches!(
            option.value,
            Some(WithOptionValue::Refresh(RefreshOptionValue::AtCreation))
        ) {
            option.value = Some(WithOptionValue::Refresh(RefreshOptionValue::At(
                RefreshAtOptionValue {
                    time: mz_now_expr.clone(),
                },
            )));
        }

        if let Some(WithOptionValue::Refresh(RefreshOptionValue::Every(
            RefreshEveryOptionValue { aligned_to, .. },
        ))) = &mut option.value
        {
            if aligned_to.is_none() {
                *aligned_to = Some(mz_now_expr.clone());
            }
        }

        match &mut option.value {
            Some(WithOptionValue::Refresh(RefreshOptionValue::At(RefreshAtOptionValue {
                time,
            }))) => {
                let mut visitor = MzNowPurifierVisitor::new(mz_now, mz_timestamp_type.clone());
                visitor.visit_expr_mut(time);
                introduced_mz_timestamp |= visitor.introduced_mz_timestamp;
            }
            Some(WithOptionValue::Refresh(RefreshOptionValue::Every(
                RefreshEveryOptionValue {
                    interval: _,
                    aligned_to: Some(aligned_to),
                },
            ))) => {
                let mut visitor = MzNowPurifierVisitor::new(mz_now, mz_timestamp_type.clone());
                visitor.visit_expr_mut(aligned_to);
                introduced_mz_timestamp |= visitor.introduced_mz_timestamp;
            }
            _ => {}
        }
    }

    if !cmvs.with_options.iter().any(|o| {
        matches!(
            o,
            MaterializedViewOption {
                value: Some(WithOptionValue::Refresh(..)),
                ..
            }
        )
    }) {
        cmvs.with_options.push(MaterializedViewOption {
            name: MaterializedViewOptionName::Refresh,
            value: Some(WithOptionValue::Refresh(RefreshOptionValue::OnCommit)),
        })
    }

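    // Keep `resolved_ids` consistent with the rewritten statement: the
    // introduced casts depend on `mz_timestamp`, and the `mz_now` dependency
    // is dropped if purification removed every temporal call.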
    if introduced_mz_timestamp {
        resolved_ids.add_item(mz_timestamp_id);
    }
    let mut visitor = ExprContainsTemporalVisitor::new();
    visitor.visit_create_materialized_view_statement(cmvs);
    if !visitor.contains_temporal {
        resolved_ids.remove_item(&mz_now_id);
    }
}

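/// Returns whether the given materialized view option involves `mz_now()`,
/// either explicitly or implicitly: `REFRESH AT CREATION` and `REFRESH
/// EVERY` without an `ALIGNED TO` clause are counted as temporal because
/// purification desugars both into expressions containing `mz_now()`.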
pub fn materialized_view_option_contains_temporal(mvo: &MaterializedViewOption<Aug>) -> bool {
    match &mvo.value {
        Some(WithOptionValue::Refresh(RefreshOptionValue::At(RefreshAtOptionValue { time }))) => {
            let mut visitor = ExprContainsTemporalVisitor::new();
            visitor.visit_expr(time);
            visitor.contains_temporal
        }
        Some(WithOptionValue::Refresh(RefreshOptionValue::Every(RefreshEveryOptionValue {
            interval: _,
            aligned_to: Some(aligned_to),
        }))) => {
            let mut visitor = ExprContainsTemporalVisitor::new();
            visitor.visit_expr(aligned_to);
            visitor.contains_temporal
        }
        Some(WithOptionValue::Refresh(RefreshOptionValue::Every(RefreshEveryOptionValue {
            interval: _,
            aligned_to: None,
        }))) => {
            true
        }
        Some(WithOptionValue::Refresh(RefreshOptionValue::AtCreation)) => {
            true
        }
        _ => false,
    }
}

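/// AST visitor that records whether a visited expression contains a call to
/// `mz_now()`.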
struct ExprContainsTemporalVisitor {
    pub contains_temporal: bool,
}

impl ExprContainsTemporalVisitor {
    pub fn new() -> ExprContainsTemporalVisitor {
        ExprContainsTemporalVisitor {
            contains_temporal: false,
        }
    }
}

impl Visit<'_, Aug> for ExprContainsTemporalVisitor {
    fn visit_function(&mut self, func: &Function<Aug>) {
        self.contains_temporal |= func.name.full_item_name().item == MZ_NOW_NAME;
        visit_function(self, func);
    }
}

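/// AST visitor that replaces every `mz_now()` call with the chosen
/// timestamp, cast to `mz_timestamp`, recording whether any such replacement
/// was made.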
struct MzNowPurifierVisitor {
    pub mz_now: Option<Timestamp>,
    pub mz_timestamp_type: ResolvedDataType,
    pub introduced_mz_timestamp: bool,
}

impl MzNowPurifierVisitor {
    pub fn new(
        mz_now: Option<Timestamp>,
        mz_timestamp_type: ResolvedDataType,
    ) -> MzNowPurifierVisitor {
        MzNowPurifierVisitor {
            mz_now,
            mz_timestamp_type,
            introduced_mz_timestamp: false,
        }
    }
}

impl VisitMut<'_, Aug> for MzNowPurifierVisitor {
    fn visit_expr_mut(&mut self, expr: &'_ mut Expr<Aug>) {
        match expr {
            Expr::Function(Function {
                name:
                    ResolvedItemName::Item {
                        full_name: FullItemName { item, .. },
                        ..
                    },
                ..
            }) if item.as_str() == MZ_NOW_NAME => {
                let mz_now = self.mz_now.expect(
                    "we should have chosen a timestamp if the expression contains mz_now()",
                );
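                // Replace the `mz_now()` call with the chosen timestamp,
                // cast to `mz_timestamp`.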
                *expr = Expr::Cast {
                    expr: Box::new(Expr::Value(Value::Number(mz_now.to_string()))),
                    data_type: self.mz_timestamp_type.clone(),
                };
                self.introduced_mz_timestamp = true;
            }
            _ => visit_expr_mut(self, expr),
        }
    }
}