1use std::collections::{BTreeMap, BTreeSet};
15use std::fmt;
16use std::iter;
17use std::path::Path;
18use std::sync::Arc;
19
20use anyhow::anyhow;
21use itertools::Itertools;
22use mz_ccsr::{Client, GetByIdError, GetBySubjectError, Schema as CcsrSchema};
23use mz_controller_types::ClusterId;
24use mz_kafka_util::client::MzClientContext;
25use mz_mysql_util::MySqlTableDesc;
26use mz_ore::error::ErrorExt;
27use mz_ore::future::InTask;
28use mz_ore::iter::IteratorExt;
29use mz_ore::str::StrExt;
30use mz_ore::{assert_none, soft_panic_or_log};
31use mz_postgres_util::desc::PostgresTableDesc;
32use mz_postgres_util::replication::WalLevel;
33use mz_postgres_util::tunnel::PostgresFlavor;
34use mz_proto::RustType;
35use mz_repr::{CatalogItemId, RelationDesc, RelationVersionSelector, Timestamp, strconv};
36use mz_sql_parser::ast::display::AstDisplay;
37use mz_sql_parser::ast::visit::{Visit, visit_function};
38use mz_sql_parser::ast::visit_mut::{VisitMut, visit_expr_mut};
39use mz_sql_parser::ast::{
40 AlterSourceAction, AlterSourceAddSubsourceOptionName, AlterSourceStatement, AvroDocOn,
41 ColumnName, CreateMaterializedViewStatement, CreateSinkConnection, CreateSinkOptionName,
42 CreateSinkStatement, CreateSubsourceOption, CreateSubsourceOptionName,
43 CreateTableFromSourceStatement, CsrConfigOption, CsrConfigOptionName, CsrConnection,
44 CsrSeedAvro, CsrSeedProtobuf, CsrSeedProtobufSchema, DeferredItemName, DocOnIdentifier,
45 DocOnSchema, Expr, Function, FunctionArgs, Ident, KafkaSourceConfigOption,
46 KafkaSourceConfigOptionName, LoadGenerator, LoadGeneratorOption, LoadGeneratorOptionName,
47 MaterializedViewOption, MaterializedViewOptionName, MySqlConfigOption, MySqlConfigOptionName,
48 PgConfigOption, PgConfigOptionName, RawItemName, ReaderSchemaSelectionStrategy,
49 RefreshAtOptionValue, RefreshEveryOptionValue, RefreshOptionValue, SourceEnvelope,
50 SqlServerConfigOption, SqlServerConfigOptionName, Statement, TableFromSourceColumns,
51 TableFromSourceOption, TableFromSourceOptionName, UnresolvedItemName,
52};
53use mz_sql_server_util::desc::SqlServerTableDesc;
54use mz_storage_types::configuration::StorageConfiguration;
55use mz_storage_types::connections::inline::IntoInlineConnection;
56use mz_storage_types::connections::{Connection, PostgresConnection};
57use mz_storage_types::errors::ContextCreationError;
58use mz_storage_types::sources::load_generator::LoadGeneratorOutput;
59use mz_storage_types::sources::mysql::MySqlSourceDetails;
60use mz_storage_types::sources::postgres::PostgresSourcePublicationDetails;
61use mz_storage_types::sources::{
62 GenericSourceConnection, PostgresSourceConnection, SourceConnection, SourceDesc,
63 SourceExportStatementDetails, SqlServerSourceExtras,
64};
65use prost::Message;
66use protobuf_native::MessageLite;
67use protobuf_native::compiler::{SourceTreeDescriptorDatabase, VirtualSourceTree};
68use rdkafka::admin::AdminClient;
69use references::{RetrievedSourceReferences, SourceReferenceClient};
70use uuid::Uuid;
71
72use crate::ast::{
73 AlterSourceAddSubsourceOption, AvroSchema, CreateSourceConnection, CreateSourceStatement,
74 CreateSubsourceStatement, CsrConnectionAvro, CsrConnectionProtobuf, ExternalReferenceExport,
75 ExternalReferences, Format, FormatSpecifier, ProtobufSchema, Value, WithOptionValue,
76};
77use crate::catalog::{CatalogItemType, SessionCatalog};
78use crate::kafka_util::{KafkaSinkConfigOptionExtracted, KafkaSourceConfigOptionExtracted};
79use crate::names::{
80 Aug, FullItemName, PartialItemName, ResolvedColumnReference, ResolvedDataType, ResolvedIds,
81 ResolvedItemName,
82};
83use crate::plan::error::PlanError;
84use crate::plan::statement::ddl::load_generator_ast_to_generator;
85use crate::plan::{SourceReferences, StatementContext};
86use crate::pure::error::SqlServerSourcePurificationError;
87use crate::session::vars::ENABLE_SQL_SERVER_SOURCE;
88use crate::{kafka_util, normalize};
89
90use self::error::{
91 CsrPurificationError, KafkaSinkPurificationError, KafkaSourcePurificationError,
92 LoadGeneratorSourcePurificationError, MySqlSourcePurificationError, PgSourcePurificationError,
93};
94
95pub(crate) mod error;
96mod references;
97
98pub mod mysql;
99pub mod postgres;
100pub mod sql_server;
101
102pub(crate) struct RequestedSourceExport<T> {
103 external_reference: UnresolvedItemName,
104 name: UnresolvedItemName,
105 meta: T,
106}
107
108impl<T: fmt::Debug> fmt::Debug for RequestedSourceExport<T> {
109 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
110 f.debug_struct("RequestedSourceExport")
111 .field("external_reference", &self.external_reference)
112 .field("name", &self.name)
113 .field("meta", &self.meta)
114 .finish()
115 }
116}
117
118impl<T> RequestedSourceExport<T> {
119 fn change_meta<F>(self, new_meta: F) -> RequestedSourceExport<F> {
120 RequestedSourceExport {
121 external_reference: self.external_reference,
122 name: self.name,
123 meta: new_meta,
124 }
125 }
126}
127
128fn source_export_name_gen(
133 source_name: &UnresolvedItemName,
134 subsource_name: &str,
135) -> Result<UnresolvedItemName, PlanError> {
136 let mut partial = normalize::unresolved_item_name(source_name.clone())?;
137 partial.item = subsource_name.to_string();
138 Ok(UnresolvedItemName::from(partial))
139}
140
141fn validate_source_export_names<T>(
145 requested_source_exports: &[RequestedSourceExport<T>],
146) -> Result<(), PlanError> {
147 if let Some(name) = requested_source_exports
151 .iter()
152 .map(|subsource| &subsource.name)
153 .duplicates()
154 .next()
155 .cloned()
156 {
157 let mut upstream_references: Vec<_> = requested_source_exports
158 .into_iter()
159 .filter_map(|subsource| {
160 if &subsource.name == &name {
161 Some(subsource.external_reference.clone())
162 } else {
163 None
164 }
165 })
166 .collect();
167
168 upstream_references.sort();
169
170 Err(PlanError::SubsourceNameConflict {
171 name,
172 upstream_references,
173 })?;
174 }
175
176 if let Some(name) = requested_source_exports
182 .iter()
183 .map(|export| &export.external_reference)
184 .duplicates()
185 .next()
186 .cloned()
187 {
188 let mut target_names: Vec<_> = requested_source_exports
189 .into_iter()
190 .filter_map(|export| {
191 if &export.external_reference == &name {
192 Some(export.name.clone())
193 } else {
194 None
195 }
196 })
197 .collect();
198
199 target_names.sort();
200
201 Err(PlanError::SubsourceDuplicateReference { name, target_names })?;
202 }
203
204 Ok(())
205}
206
207#[derive(Debug, Clone, PartialEq, Eq)]
208pub enum PurifiedStatement {
209 PurifiedCreateSource {
210 create_progress_subsource_stmt: CreateSubsourceStatement<Aug>,
211 create_source_stmt: CreateSourceStatement<Aug>,
212 subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
214 available_source_references: SourceReferences,
217 },
218 PurifiedAlterSource {
219 alter_source_stmt: AlterSourceStatement<Aug>,
220 },
221 PurifiedAlterSourceAddSubsources {
222 source_name: ResolvedItemName,
224 options: Vec<AlterSourceAddSubsourceOption<Aug>>,
227 subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
229 },
230 PurifiedAlterSourceRefreshReferences {
231 source_name: ResolvedItemName,
232 available_source_references: SourceReferences,
234 },
235 PurifiedCreateSink(CreateSinkStatement<Aug>),
236 PurifiedCreateTableFromSource {
237 stmt: CreateTableFromSourceStatement<Aug>,
238 },
239}
240
241#[derive(Debug, Clone, PartialEq, Eq)]
242pub struct PurifiedSourceExport {
243 pub external_reference: UnresolvedItemName,
244 pub details: PurifiedExportDetails,
245}
246
247#[derive(Debug, Clone, PartialEq, Eq)]
248pub enum PurifiedExportDetails {
249 MySql {
250 table: MySqlTableDesc,
251 text_columns: Option<Vec<Ident>>,
252 exclude_columns: Option<Vec<Ident>>,
253 initial_gtid_set: String,
254 },
255 Postgres {
256 table: PostgresTableDesc,
257 text_columns: Option<Vec<Ident>>,
258 },
259 SqlServer {
260 table: SqlServerTableDesc,
261 text_columns: Option<Vec<Ident>>,
262 excl_columns: Option<Vec<Ident>>,
263 capture_instance: Arc<str>,
264 },
265 Kafka {},
266 LoadGenerator {
267 table: Option<RelationDesc>,
268 output: LoadGeneratorOutput,
269 },
270}
271
272pub async fn purify_statement(
282 catalog: impl SessionCatalog,
283 now: u64,
284 stmt: Statement<Aug>,
285 storage_configuration: &StorageConfiguration,
286) -> (Result<PurifiedStatement, PlanError>, Option<ClusterId>) {
287 match stmt {
288 Statement::CreateSource(stmt) => {
289 let cluster_id = stmt.in_cluster.as_ref().map(|cluster| cluster.id.clone());
290 (
291 purify_create_source(catalog, now, stmt, storage_configuration).await,
292 cluster_id,
293 )
294 }
295 Statement::AlterSource(stmt) => (
296 purify_alter_source(catalog, stmt, storage_configuration).await,
297 None,
298 ),
299 Statement::CreateSink(stmt) => {
300 let cluster_id = stmt.in_cluster.as_ref().map(|cluster| cluster.id.clone());
301 (
302 purify_create_sink(catalog, stmt, storage_configuration).await,
303 cluster_id,
304 )
305 }
306 Statement::CreateTableFromSource(stmt) => (
307 purify_create_table_from_source(catalog, stmt, storage_configuration).await,
308 None,
309 ),
310 o => unreachable!("{:?} does not need to be purified", o),
311 }
312}
313
314pub(crate) fn purify_create_sink_avro_doc_on_options(
319 catalog: &dyn SessionCatalog,
320 from_id: CatalogItemId,
321 format: &mut Option<FormatSpecifier<Aug>>,
322) -> Result<(), PlanError> {
323 let from = catalog.get_item(&from_id);
325 let object_ids = from
326 .references()
327 .items()
328 .copied()
329 .chain_one(from.id())
330 .collect::<Vec<_>>();
331
332 let mut avro_format_options = vec![];
335 for_each_format(format, |doc_on_schema, fmt| match fmt {
336 Format::Avro(AvroSchema::InlineSchema { .. })
337 | Format::Bytes
338 | Format::Csv { .. }
339 | Format::Json { .. }
340 | Format::Protobuf(..)
341 | Format::Regex(..)
342 | Format::Text => (),
343 Format::Avro(AvroSchema::Csr {
344 csr_connection: CsrConnectionAvro { connection, .. },
345 }) => {
346 avro_format_options.push((doc_on_schema, &mut connection.options));
347 }
348 });
349
350 for (for_schema, options) in avro_format_options {
353 let user_provided_comments = options
354 .iter()
355 .filter_map(|CsrConfigOption { name, .. }| match name {
356 CsrConfigOptionName::AvroDocOn(doc_on) => Some(doc_on.clone()),
357 _ => None,
358 })
359 .collect::<BTreeSet<_>>();
360
361 for object_id in &object_ids {
363 let item = catalog
365 .get_item(object_id)
366 .at_version(RelationVersionSelector::Latest);
367 let full_name = catalog.resolve_full_name(item.name());
368 let full_resolved_name = ResolvedItemName::Item {
369 id: *object_id,
370 qualifiers: item.name().qualifiers.clone(),
371 full_name: full_name.clone(),
372 print_id: !matches!(
373 item.item_type(),
374 CatalogItemType::Func | CatalogItemType::Type
375 ),
376 version: RelationVersionSelector::Latest,
377 };
378
379 if let Some(comments_map) = catalog.get_item_comments(object_id) {
380 let doc_on_item_key = AvroDocOn {
383 identifier: DocOnIdentifier::Type(full_resolved_name.clone()),
384 for_schema,
385 };
386 if !user_provided_comments.contains(&doc_on_item_key) {
387 if let Some(root_comment) = comments_map.get(&None) {
388 options.push(CsrConfigOption {
389 name: CsrConfigOptionName::AvroDocOn(doc_on_item_key),
390 value: Some(mz_sql_parser::ast::WithOptionValue::Value(Value::String(
391 root_comment.clone(),
392 ))),
393 });
394 }
395 }
396
397 if let Ok(desc) = item.desc(&full_name) {
401 for (pos, column_name) in desc.iter_names().enumerate() {
402 let comment = comments_map.get(&Some(pos + 1));
403 if let Some(comment_str) = comment {
404 let doc_on_column_key = AvroDocOn {
405 identifier: DocOnIdentifier::Column(ColumnName {
406 relation: full_resolved_name.clone(),
407 column: ResolvedColumnReference::Column {
408 name: column_name.to_owned(),
409 index: pos,
410 },
411 }),
412 for_schema,
413 };
414 if !user_provided_comments.contains(&doc_on_column_key) {
415 options.push(CsrConfigOption {
416 name: CsrConfigOptionName::AvroDocOn(doc_on_column_key),
417 value: Some(mz_sql_parser::ast::WithOptionValue::Value(
418 Value::String(comment_str.clone()),
419 )),
420 });
421 }
422 }
423 }
424 }
425 }
426 }
427 }
428
429 Ok(())
430}
431
432async fn purify_create_sink(
440 catalog: impl SessionCatalog,
441 mut create_sink_stmt: CreateSinkStatement<Aug>,
442 storage_configuration: &StorageConfiguration,
443) -> Result<PurifiedStatement, PlanError> {
444 let CreateSinkStatement {
446 connection,
447 format,
448 with_options,
449 name: _,
450 in_cluster: _,
451 if_not_exists: _,
452 from,
453 envelope: _,
454 } = &mut create_sink_stmt;
455
456 const USER_ALLOWED_WITH_OPTIONS: &[CreateSinkOptionName] = &[CreateSinkOptionName::Snapshot];
458
459 if let Some(op) = with_options
460 .iter()
461 .find(|op| !USER_ALLOWED_WITH_OPTIONS.contains(&op.name))
462 {
463 sql_bail!(
464 "CREATE SINK...WITH ({}..) is not allowed",
465 op.name.to_ast_string_simple(),
466 )
467 }
468
469 match &connection {
470 CreateSinkConnection::Kafka {
471 connection,
472 options,
473 key: _,
474 headers: _,
475 } => {
476 let scx = StatementContext::new(None, &catalog);
477 let connection = {
478 let item = scx.get_item_by_resolved_name(connection)?;
479 match item.connection()? {
481 Connection::Kafka(connection) => {
482 connection.clone().into_inline_connection(scx.catalog)
483 }
484 _ => sql_bail!(
485 "{} is not a kafka connection",
486 scx.catalog.resolve_full_name(item.name())
487 ),
488 }
489 };
490
491 let extracted_options: KafkaSinkConfigOptionExtracted = options.clone().try_into()?;
492
493 if extracted_options.legacy_ids == Some(true) {
494 sql_bail!("LEGACY IDs option is not supported");
495 }
496
497 let client: AdminClient<_> = connection
498 .create_with_context(
499 storage_configuration,
500 MzClientContext::default(),
501 &BTreeMap::new(),
502 InTask::No,
503 )
504 .await
505 .map_err(|e| {
506 KafkaSinkPurificationError::AdminClientError(Arc::new(e))
508 })?;
509
510 let metadata = client
511 .inner()
512 .fetch_metadata(
513 None,
514 storage_configuration
515 .parameters
516 .kafka_timeout_config
517 .fetch_metadata_timeout,
518 )
519 .map_err(|e| {
520 KafkaSinkPurificationError::AdminClientError(Arc::new(
521 ContextCreationError::KafkaError(e),
522 ))
523 })?;
524
525 if metadata.brokers().len() == 0 {
526 Err(KafkaSinkPurificationError::ZeroBrokers)?;
527 }
528 }
529 }
530
531 let mut csr_connection_ids = BTreeSet::new();
532 for_each_format(format, |_, fmt| match fmt {
533 Format::Avro(AvroSchema::InlineSchema { .. })
534 | Format::Bytes
535 | Format::Csv { .. }
536 | Format::Json { .. }
537 | Format::Protobuf(ProtobufSchema::InlineSchema { .. })
538 | Format::Regex(..)
539 | Format::Text => (),
540 Format::Avro(AvroSchema::Csr {
541 csr_connection: CsrConnectionAvro { connection, .. },
542 })
543 | Format::Protobuf(ProtobufSchema::Csr {
544 csr_connection: CsrConnectionProtobuf { connection, .. },
545 }) => {
546 csr_connection_ids.insert(*connection.connection.item_id());
547 }
548 });
549
550 let scx = StatementContext::new(None, &catalog);
551 for csr_connection_id in csr_connection_ids {
552 let connection = {
553 let item = scx.get_item(&csr_connection_id);
554 match item.connection()? {
556 Connection::Csr(connection) => connection.clone().into_inline_connection(&catalog),
557 _ => Err(CsrPurificationError::NotCsrConnection(
558 scx.catalog.resolve_full_name(item.name()),
559 ))?,
560 }
561 };
562
563 let client = connection
564 .connect(storage_configuration, InTask::No)
565 .await
566 .map_err(|e| CsrPurificationError::ClientError(Arc::new(e)))?;
567
568 client
569 .list_subjects()
570 .await
571 .map_err(|e| CsrPurificationError::ListSubjectsError(Arc::new(e)))?;
572 }
573
574 purify_create_sink_avro_doc_on_options(&catalog, *from.item_id(), format)?;
575
576 Ok(PurifiedStatement::PurifiedCreateSink(create_sink_stmt))
577}
578
579fn for_each_format<'a, F>(format: &'a mut Option<FormatSpecifier<Aug>>, mut f: F)
586where
587 F: FnMut(DocOnSchema, &'a mut Format<Aug>),
588{
589 match format {
590 None => (),
591 Some(FormatSpecifier::Bare(fmt)) => f(DocOnSchema::All, fmt),
592 Some(FormatSpecifier::KeyValue { key, value }) => {
593 f(DocOnSchema::KeyOnly, key);
594 f(DocOnSchema::ValueOnly, value);
595 }
596 }
597}
598
599#[derive(Debug, Copy, Clone, PartialEq, Eq)]
602pub(crate) enum SourceReferencePolicy {
603 NotAllowed,
606 Optional,
609 Required,
612}
613
614async fn purify_create_source(
615 catalog: impl SessionCatalog,
616 now: u64,
617 mut create_source_stmt: CreateSourceStatement<Aug>,
618 storage_configuration: &StorageConfiguration,
619) -> Result<PurifiedStatement, PlanError> {
620 let CreateSourceStatement {
621 name: source_name,
622 connection: source_connection,
623 format,
624 envelope,
625 include_metadata,
626 external_references,
627 progress_subsource,
628 ..
629 } = &mut create_source_stmt;
630
631 if let Some(DeferredItemName::Named(_)) = progress_subsource {
632 sql_bail!("Cannot manually ID qualify progress subsource")
633 }
634
635 let mut requested_subsource_map = BTreeMap::new();
636
637 let progress_desc = match &source_connection {
638 CreateSourceConnection::Kafka { .. } => {
639 &mz_storage_types::sources::kafka::KAFKA_PROGRESS_DESC
640 }
641 CreateSourceConnection::Postgres { .. } | CreateSourceConnection::Yugabyte { .. } => {
642 &mz_storage_types::sources::postgres::PG_PROGRESS_DESC
643 }
644 CreateSourceConnection::SqlServer { .. } => {
645 &mz_storage_types::sources::sql_server::SQL_SERVER_PROGRESS_DESC
646 }
647 CreateSourceConnection::MySql { .. } => {
648 &mz_storage_types::sources::mysql::MYSQL_PROGRESS_DESC
649 }
650 CreateSourceConnection::LoadGenerator { .. } => {
651 &mz_storage_types::sources::load_generator::LOAD_GEN_PROGRESS_DESC
652 }
653 };
654 let scx = StatementContext::new(None, &catalog);
655
656 let reference_policy = if scx.catalog.system_vars().enable_create_table_from_source()
660 && scx.catalog.system_vars().force_source_table_syntax()
661 {
662 SourceReferencePolicy::NotAllowed
663 } else if scx.catalog.system_vars().enable_create_table_from_source() {
664 SourceReferencePolicy::Optional
665 } else {
666 SourceReferencePolicy::Required
667 };
668
669 let mut format_options = SourceFormatOptions::Default;
670
671 let retrieved_source_references: RetrievedSourceReferences;
672
673 match source_connection {
674 CreateSourceConnection::Kafka {
675 connection,
676 options: base_with_options,
677 ..
678 } => {
679 if let Some(external_references) = external_references {
680 Err(KafkaSourcePurificationError::ReferencedSubsources(
681 external_references.clone(),
682 ))?;
683 }
684
685 let connection = {
686 let item = scx.get_item_by_resolved_name(connection)?;
687 match item.connection()? {
689 Connection::Kafka(connection) => {
690 connection.clone().into_inline_connection(&catalog)
691 }
692 _ => Err(KafkaSourcePurificationError::NotKafkaConnection(
693 scx.catalog.resolve_full_name(item.name()),
694 ))?,
695 }
696 };
697
698 let extracted_options: KafkaSourceConfigOptionExtracted =
699 base_with_options.clone().try_into()?;
700
701 let topic = extracted_options
702 .topic
703 .ok_or(KafkaSourcePurificationError::ConnectionMissingTopic)?;
704
705 let consumer = connection
706 .create_with_context(
707 storage_configuration,
708 MzClientContext::default(),
709 &BTreeMap::new(),
710 InTask::No,
711 )
712 .await
713 .map_err(|e| {
714 KafkaSourcePurificationError::KafkaConsumerError(
716 e.display_with_causes().to_string(),
717 )
718 })?;
719 let consumer = Arc::new(consumer);
720
721 match (
722 extracted_options.start_offset,
723 extracted_options.start_timestamp,
724 ) {
725 (None, None) => {
726 kafka_util::ensure_topic_exists(
728 Arc::clone(&consumer),
729 &topic,
730 storage_configuration
731 .parameters
732 .kafka_timeout_config
733 .fetch_metadata_timeout,
734 )
735 .await?;
736 }
737 (Some(_), Some(_)) => {
738 sql_bail!("cannot specify START TIMESTAMP and START OFFSET at same time")
739 }
740 (Some(start_offsets), None) => {
741 kafka_util::validate_start_offsets(
743 Arc::clone(&consumer),
744 &topic,
745 start_offsets,
746 storage_configuration
747 .parameters
748 .kafka_timeout_config
749 .fetch_metadata_timeout,
750 )
751 .await?;
752 }
753 (None, Some(time_offset)) => {
754 let start_offsets = kafka_util::lookup_start_offsets(
756 Arc::clone(&consumer),
757 &topic,
758 time_offset,
759 now,
760 storage_configuration
761 .parameters
762 .kafka_timeout_config
763 .fetch_metadata_timeout,
764 )
765 .await?;
766
767 base_with_options.retain(|val| {
768 !matches!(val.name, KafkaSourceConfigOptionName::StartTimestamp)
769 });
770 base_with_options.push(KafkaSourceConfigOption {
771 name: KafkaSourceConfigOptionName::StartOffset,
772 value: Some(WithOptionValue::Sequence(
773 start_offsets
774 .iter()
775 .map(|offset| {
776 WithOptionValue::Value(Value::Number(offset.to_string()))
777 })
778 .collect(),
779 )),
780 });
781 }
782 }
783
784 let reference_client = SourceReferenceClient::Kafka { topic: &topic };
785 retrieved_source_references = reference_client.get_source_references().await?;
786
787 format_options = SourceFormatOptions::Kafka { topic };
788 }
789 source_connection @ CreateSourceConnection::Postgres { .. }
790 | source_connection @ CreateSourceConnection::Yugabyte { .. } => {
791 let (source_flavor, connection, options) = match source_connection {
792 CreateSourceConnection::Postgres {
793 connection,
794 options,
795 } => (PostgresFlavor::Vanilla, connection, options),
796 CreateSourceConnection::Yugabyte {
797 connection,
798 options,
799 } => (PostgresFlavor::Yugabyte, connection, options),
800 _ => unreachable!(),
801 };
802 let connection = {
803 let item = scx.get_item_by_resolved_name(connection)?;
804 match item.connection().map_err(PlanError::from)? {
805 Connection::Postgres(connection) => {
806 let connection = connection.clone().into_inline_connection(&catalog);
807 if connection.flavor != source_flavor {
808 match source_flavor {
809 PostgresFlavor::Vanilla => {
810 Err(PgSourcePurificationError::NotPgConnection(
811 scx.catalog.resolve_full_name(item.name()),
812 ))
813 }
814 PostgresFlavor::Yugabyte => {
815 Err(PgSourcePurificationError::NotYugabyteConnection(
816 scx.catalog.resolve_full_name(item.name()),
817 ))
818 }
819 }
820 } else {
821 Ok(connection)
822 }
823 }
824 _ => match source_flavor {
825 PostgresFlavor::Vanilla => Err(PgSourcePurificationError::NotPgConnection(
826 scx.catalog.resolve_full_name(item.name()),
827 )),
828 PostgresFlavor::Yugabyte => {
829 Err(PgSourcePurificationError::NotYugabyteConnection(
830 scx.catalog.resolve_full_name(item.name()),
831 ))
832 }
833 },
834 }
835 }?;
836 let crate::plan::statement::PgConfigOptionExtracted {
837 publication,
838 text_columns,
839 details,
840 ..
841 } = options.clone().try_into()?;
842 let publication =
843 publication.ok_or(PgSourcePurificationError::ConnectionMissingPublication)?;
844
845 if details.is_some() {
846 Err(PgSourcePurificationError::UserSpecifiedDetails)?;
847 }
848
849 let config = connection
851 .config(
852 &storage_configuration.connection_context.secrets_reader,
853 storage_configuration,
854 InTask::No,
855 )
856 .await?;
857
858 let client = config
859 .connect(
860 "postgres_purification",
861 &storage_configuration.connection_context.ssh_tunnel_manager,
862 )
863 .await?;
864
865 let wal_level = mz_postgres_util::get_wal_level(&client).await?;
866
867 if wal_level < WalLevel::Logical {
868 Err(PgSourcePurificationError::InsufficientWalLevel { wal_level })?;
869 }
870
871 let max_wal_senders = mz_postgres_util::get_max_wal_senders(&client).await?;
872
873 if max_wal_senders < 1 {
874 Err(PgSourcePurificationError::ReplicationDisabled)?;
875 }
876
877 let available_replication_slots =
878 mz_postgres_util::available_replication_slots(&client).await?;
879
880 if available_replication_slots < 2 {
882 Err(PgSourcePurificationError::InsufficientReplicationSlotsAvailable { count: 2 })?;
883 }
884
885 let reference_client = SourceReferenceClient::Postgres {
886 client: &client,
887 publication: &publication,
888 database: &connection.database,
889 };
890 retrieved_source_references = reference_client.get_source_references().await?;
891
892 let postgres::PurifiedSourceExports {
893 source_exports: subsources,
894 normalized_text_columns,
895 } = postgres::purify_source_exports(
896 &client,
897 &config,
898 &retrieved_source_references,
899 external_references,
900 text_columns,
901 source_name,
902 &reference_policy,
903 )
904 .await?;
905
906 if let Some(text_cols_option) = options
907 .iter_mut()
908 .find(|option| option.name == PgConfigOptionName::TextColumns)
909 {
910 text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
911 }
912
913 requested_subsource_map.extend(subsources);
914
915 let replication_client = config
918 .connect_replication(&storage_configuration.connection_context.ssh_tunnel_manager)
919 .await?;
920 let timeline_id = mz_postgres_util::get_timeline_id(&replication_client).await?;
921
922 options.retain(|PgConfigOption { name, .. }| name != &PgConfigOptionName::Details);
924 let details = PostgresSourcePublicationDetails {
925 slot: format!(
926 "materialize_{}",
927 Uuid::new_v4().to_string().replace('-', "")
928 ),
929 timeline_id: Some(timeline_id),
930 database: connection.database,
931 };
932 options.push(PgConfigOption {
933 name: PgConfigOptionName::Details,
934 value: Some(WithOptionValue::Value(Value::String(hex::encode(
935 details.into_proto().encode_to_vec(),
936 )))),
937 })
938 }
939 CreateSourceConnection::SqlServer {
940 connection,
941 options,
942 } => {
943 scx.require_feature_flag(&ENABLE_SQL_SERVER_SOURCE)?;
944
945 let connection_item = scx.get_item_by_resolved_name(connection)?;
948 let connection = match connection_item.connection()? {
949 Connection::SqlServer(connection) => {
950 connection.clone().into_inline_connection(&catalog)
951 }
952 _ => Err(SqlServerSourcePurificationError::NotSqlServerConnection(
953 scx.catalog.resolve_full_name(connection_item.name()),
954 ))?,
955 };
956 let crate::plan::statement::ddl::SqlServerConfigOptionExtracted {
957 details,
958 text_columns,
959 exclude_columns,
960 seen: _,
961 } = options.clone().try_into()?;
962
963 if details.is_some() {
964 Err(SqlServerSourcePurificationError::UserSpecifiedDetails)?;
965 }
966
967 let config = connection
968 .resolve_config(
969 &storage_configuration.connection_context.secrets_reader,
970 storage_configuration,
971 InTask::No,
972 )
973 .await?;
974 let mut client = mz_sql_server_util::Client::connect(config).await?;
975
976 let mut replication_errors = vec![];
981 for error in [
982 mz_sql_server_util::inspect::ensure_database_cdc_enabled(&mut client).await,
983 mz_sql_server_util::inspect::ensure_snapshot_isolation_enabled(&mut client).await,
984 ] {
985 match error {
986 Err(mz_sql_server_util::SqlServerError::InvalidSystemSetting {
987 name,
988 expected,
989 actual,
990 }) => replication_errors.push((name, expected, actual)),
991 Err(other) => Err(other)?,
992 Ok(()) => (),
993 }
994 }
995 if !replication_errors.is_empty() {
996 Err(SqlServerSourcePurificationError::ReplicationSettingsError(
997 replication_errors,
998 ))?;
999 }
1000
1001 let database: Arc<str> = connection.database.into();
1004 let reference_client = SourceReferenceClient::SqlServer {
1005 client: &mut client,
1006 database: Arc::clone(&database),
1007 };
1008 retrieved_source_references = reference_client.get_source_references().await?;
1009 tracing::debug!(?retrieved_source_references, "got source references");
1010
1011 let purified_source_exports = sql_server::purify_source_exports(
1012 &*database,
1013 &mut client,
1014 &retrieved_source_references,
1015 external_references,
1016 &text_columns,
1017 &exclude_columns,
1018 source_name,
1019 &reference_policy,
1020 )
1021 .await?;
1022
1023 let sql_server::PurifiedSourceExports {
1024 source_exports,
1025 normalized_text_columns,
1026 normalized_excl_columns,
1027 } = purified_source_exports;
1028
1029 requested_subsource_map.extend(source_exports);
1031
1032 let details = SqlServerSourceExtras {};
1035 options.retain(|SqlServerConfigOption { name, .. }| {
1036 name != &SqlServerConfigOptionName::Details
1037 });
1038 options.push(SqlServerConfigOption {
1039 name: SqlServerConfigOptionName::Details,
1040 value: Some(WithOptionValue::Value(Value::String(hex::encode(
1041 details.into_proto().encode_to_vec(),
1042 )))),
1043 });
1044
1045 if let Some(text_cols_option) = options
1047 .iter_mut()
1048 .find(|option| option.name == SqlServerConfigOptionName::TextColumns)
1049 {
1050 text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
1051 }
1052 if let Some(excl_cols_option) = options
1053 .iter_mut()
1054 .find(|option| option.name == SqlServerConfigOptionName::ExcludeColumns)
1055 {
1056 excl_cols_option.value = Some(WithOptionValue::Sequence(normalized_excl_columns));
1057 }
1058 }
1059 CreateSourceConnection::MySql {
1060 connection,
1061 options,
1062 } => {
1063 let connection_item = scx.get_item_by_resolved_name(connection)?;
1064 let connection = match connection_item.connection()? {
1065 Connection::MySql(connection) => {
1066 connection.clone().into_inline_connection(&catalog)
1067 }
1068 _ => Err(MySqlSourcePurificationError::NotMySqlConnection(
1069 scx.catalog.resolve_full_name(connection_item.name()),
1070 ))?,
1071 };
1072 let crate::plan::statement::ddl::MySqlConfigOptionExtracted {
1073 details,
1074 text_columns,
1075 exclude_columns,
1076 seen: _,
1077 } = options.clone().try_into()?;
1078
1079 if details.is_some() {
1080 Err(MySqlSourcePurificationError::UserSpecifiedDetails)?;
1081 }
1082
1083 let config = connection
1084 .config(
1085 &storage_configuration.connection_context.secrets_reader,
1086 storage_configuration,
1087 InTask::No,
1088 )
1089 .await?;
1090
1091 let mut conn = config
1092 .connect(
1093 "mysql purification",
1094 &storage_configuration.connection_context.ssh_tunnel_manager,
1095 )
1096 .await?;
1097
1098 let mut replication_errors = vec![];
1100 for error in [
1101 mz_mysql_util::ensure_gtid_consistency(&mut conn)
1102 .await
1103 .err(),
1104 mz_mysql_util::ensure_full_row_binlog_format(&mut conn)
1105 .await
1106 .err(),
1107 mz_mysql_util::ensure_replication_commit_order(&mut conn)
1108 .await
1109 .err(),
1110 ] {
1111 match error {
1112 Some(mz_mysql_util::MySqlError::InvalidSystemSetting {
1113 setting,
1114 expected,
1115 actual,
1116 }) => {
1117 replication_errors.push((setting, expected, actual));
1118 }
1119 Some(err) => Err(err)?,
1120 None => (),
1121 }
1122 }
1123 if !replication_errors.is_empty() {
1124 Err(MySqlSourcePurificationError::ReplicationSettingsError(
1125 replication_errors,
1126 ))?;
1127 }
1128
1129 let initial_gtid_set =
1133 mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?;
1134
1135 let reference_client = SourceReferenceClient::MySql {
1136 conn: &mut conn,
1137 include_system_schemas: mysql::references_system_schemas(external_references),
1138 };
1139 retrieved_source_references = reference_client.get_source_references().await?;
1140
1141 let mysql::PurifiedSourceExports {
1142 source_exports: subsources,
1143 normalized_text_columns,
1144 normalized_exclude_columns,
1145 } = mysql::purify_source_exports(
1146 &mut conn,
1147 &retrieved_source_references,
1148 external_references,
1149 text_columns,
1150 exclude_columns,
1151 source_name,
1152 initial_gtid_set.clone(),
1153 &reference_policy,
1154 )
1155 .await?;
1156 requested_subsource_map.extend(subsources);
1157
1158 let details = MySqlSourceDetails {};
1161 options
1163 .retain(|MySqlConfigOption { name, .. }| name != &MySqlConfigOptionName::Details);
1164 options.push(MySqlConfigOption {
1165 name: MySqlConfigOptionName::Details,
1166 value: Some(WithOptionValue::Value(Value::String(hex::encode(
1167 details.into_proto().encode_to_vec(),
1168 )))),
1169 });
1170
1171 if let Some(text_cols_option) = options
1172 .iter_mut()
1173 .find(|option| option.name == MySqlConfigOptionName::TextColumns)
1174 {
1175 text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
1176 }
1177 if let Some(ignore_cols_option) = options
1178 .iter_mut()
1179 .find(|option| option.name == MySqlConfigOptionName::ExcludeColumns)
1180 {
1181 ignore_cols_option.value =
1182 Some(WithOptionValue::Sequence(normalized_exclude_columns));
1183 }
1184 }
1185 CreateSourceConnection::LoadGenerator { generator, options } => {
1186 let load_generator =
1187 load_generator_ast_to_generator(&scx, generator, options, include_metadata)?;
1188
1189 let reference_client = SourceReferenceClient::LoadGenerator {
1190 generator: &load_generator,
1191 };
1192 retrieved_source_references = reference_client.get_source_references().await?;
1193 let multi_output_sources =
1197 retrieved_source_references
1198 .all_references()
1199 .iter()
1200 .any(|r| {
1201 r.load_generator_output().expect("is loadgen")
1202 != &LoadGeneratorOutput::Default
1203 });
1204
1205 match external_references {
1206 Some(requested)
1207 if matches!(reference_policy, SourceReferencePolicy::NotAllowed) =>
1208 {
1209 Err(PlanError::UseTablesForSources(requested.to_string()))?
1210 }
1211 Some(requested) if !multi_output_sources => match requested {
1212 ExternalReferences::SubsetTables(_) => {
1213 Err(LoadGeneratorSourcePurificationError::ForTables)?
1214 }
1215 ExternalReferences::SubsetSchemas(_) => {
1216 Err(LoadGeneratorSourcePurificationError::ForSchemas)?
1217 }
1218 ExternalReferences::All => {
1219 Err(LoadGeneratorSourcePurificationError::ForAllTables)?
1220 }
1221 },
1222 Some(requested) => {
1223 let requested_exports = retrieved_source_references
1224 .requested_source_exports(Some(requested), source_name)?;
1225 for export in requested_exports {
1226 requested_subsource_map.insert(
1227 export.name,
1228 PurifiedSourceExport {
1229 external_reference: export.external_reference,
1230 details: PurifiedExportDetails::LoadGenerator {
1231 table: export
1232 .meta
1233 .load_generator_desc()
1234 .expect("is loadgen")
1235 .clone(),
1236 output: export
1237 .meta
1238 .load_generator_output()
1239 .expect("is loadgen")
1240 .clone(),
1241 },
1242 },
1243 );
1244 }
1245 }
1246 None => {
1247 if multi_output_sources
1248 && matches!(reference_policy, SourceReferencePolicy::Required)
1249 {
1250 Err(LoadGeneratorSourcePurificationError::MultiOutputRequiresForAllTables)?
1251 }
1252 }
1253 }
1254
1255 if let LoadGenerator::Clock = generator {
1256 if !options
1257 .iter()
1258 .any(|p| p.name == LoadGeneratorOptionName::AsOf)
1259 {
1260 let now = catalog.now();
1261 options.push(LoadGeneratorOption {
1262 name: LoadGeneratorOptionName::AsOf,
1263 value: Some(WithOptionValue::Value(Value::Number(now.to_string()))),
1264 });
1265 }
1266 }
1267 }
1268 }
1269
1270 *external_references = None;
1274
1275 let name = match progress_subsource {
1279 Some(name) => match name {
1280 DeferredItemName::Deferred(name) => name.clone(),
1281 DeferredItemName::Named(_) => unreachable!("already checked for this value"),
1282 },
1283 None => {
1284 let (item, prefix) = source_name.0.split_last().unwrap();
1285 let item_name = Ident::try_generate_name(item.to_string(), "_progress", |candidate| {
1286 let mut suggested_name = prefix.to_vec();
1287 suggested_name.push(candidate.clone());
1288
1289 let partial = normalize::unresolved_item_name(UnresolvedItemName(suggested_name))?;
1290 let qualified = scx.allocate_qualified_name(partial)?;
1291 let item_exists = scx.catalog.get_item_by_name(&qualified).is_some();
1292 let type_exists = scx.catalog.get_type_by_name(&qualified).is_some();
1293 Ok::<_, PlanError>(!item_exists && !type_exists)
1294 })?;
1295
1296 let mut full_name = prefix.to_vec();
1297 full_name.push(item_name);
1298 let full_name = normalize::unresolved_item_name(UnresolvedItemName(full_name))?;
1299 let qualified_name = scx.allocate_qualified_name(full_name)?;
1300 let full_name = scx.catalog.resolve_full_name(&qualified_name);
1301
1302 UnresolvedItemName::from(full_name.clone())
1303 }
1304 };
1305
1306 let (columns, constraints) = scx.relation_desc_into_table_defs(progress_desc)?;
1307
1308 let create_progress_subsource_stmt = CreateSubsourceStatement {
1310 name,
1311 columns,
1312 of_source: None,
1316 constraints,
1317 if_not_exists: false,
1318 with_options: vec![CreateSubsourceOption {
1319 name: CreateSubsourceOptionName::Progress,
1320 value: Some(WithOptionValue::Value(Value::Boolean(true))),
1321 }],
1322 };
1323
1324 purify_source_format(
1325 &catalog,
1326 format,
1327 &format_options,
1328 envelope,
1329 storage_configuration,
1330 )
1331 .await?;
1332
1333 Ok(PurifiedStatement::PurifiedCreateSource {
1334 create_progress_subsource_stmt,
1335 create_source_stmt,
1336 subsources: requested_subsource_map,
1337 available_source_references: retrieved_source_references.available_source_references(),
1338 })
1339}
1340
1341async fn purify_alter_source(
1344 catalog: impl SessionCatalog,
1345 stmt: AlterSourceStatement<Aug>,
1346 storage_configuration: &StorageConfiguration,
1347) -> Result<PurifiedStatement, PlanError> {
1348 let scx = StatementContext::new(None, &catalog);
1349 let AlterSourceStatement {
1350 source_name: unresolved_source_name,
1351 action,
1352 if_exists,
1353 } = stmt;
1354
1355 let item = match scx.resolve_item(RawItemName::Name(unresolved_source_name.clone())) {
1357 Ok(item) => item,
1358 Err(_) if if_exists => {
1359 return Ok(PurifiedStatement::PurifiedAlterSource {
1360 alter_source_stmt: AlterSourceStatement {
1361 source_name: unresolved_source_name,
1362 action,
1363 if_exists,
1364 },
1365 });
1366 }
1367 Err(e) => return Err(e),
1368 };
1369
1370 let desc = match item.source_desc()? {
1372 Some(desc) => desc.clone().into_inline_connection(scx.catalog),
1373 None => {
1374 sql_bail!("cannot ALTER this type of source")
1375 }
1376 };
1377
1378 let source_name = item.name();
1379
1380 let resolved_source_name = ResolvedItemName::Item {
1381 id: item.id(),
1382 qualifiers: item.name().qualifiers.clone(),
1383 full_name: scx.catalog.resolve_full_name(source_name),
1384 print_id: true,
1385 version: RelationVersionSelector::Latest,
1386 };
1387
1388 let partial_name = scx.catalog.minimal_qualification(source_name);
1389
1390 match action {
1391 AlterSourceAction::AddSubsources {
1392 external_references,
1393 options,
1394 } => {
1395 if scx.catalog.system_vars().enable_create_table_from_source()
1396 && scx.catalog.system_vars().force_source_table_syntax()
1397 {
1398 Err(PlanError::UseTablesForSources(
1399 "ALTER SOURCE .. ADD SUBSOURCES ..".to_string(),
1400 ))?;
1401 }
1402
1403 purify_alter_source_add_subsources(
1404 external_references,
1405 options,
1406 desc,
1407 partial_name,
1408 unresolved_source_name,
1409 resolved_source_name,
1410 storage_configuration,
1411 )
1412 .await
1413 }
1414 AlterSourceAction::RefreshReferences => {
1415 purify_alter_source_refresh_references(
1416 desc,
1417 resolved_source_name,
1418 storage_configuration,
1419 )
1420 .await
1421 }
1422 _ => Ok(PurifiedStatement::PurifiedAlterSource {
1423 alter_source_stmt: AlterSourceStatement {
1424 source_name: unresolved_source_name,
1425 action,
1426 if_exists,
1427 },
1428 }),
1429 }
1430}
1431
1432async fn purify_alter_source_add_subsources(
1435 external_references: Vec<ExternalReferenceExport>,
1436 mut options: Vec<AlterSourceAddSubsourceOption<Aug>>,
1437 desc: SourceDesc,
1438 partial_source_name: PartialItemName,
1439 unresolved_source_name: UnresolvedItemName,
1440 resolved_source_name: ResolvedItemName,
1441 storage_configuration: &StorageConfiguration,
1442) -> Result<PurifiedStatement, PlanError> {
1443 match desc.connection {
1445 GenericSourceConnection::Postgres(PostgresSourceConnection {
1446 connection:
1447 PostgresConnection {
1448 flavor: PostgresFlavor::Vanilla,
1449 ..
1450 },
1451 ..
1452 }) => {}
1453 GenericSourceConnection::MySql(_) => {}
1454 GenericSourceConnection::SqlServer(_) => {}
1455 _ => sql_bail!(
1456 "source {} does not support ALTER SOURCE.",
1457 partial_source_name
1458 ),
1459 };
1460
1461 let connection_name = desc.connection.name();
1462
1463 let crate::plan::statement::ddl::AlterSourceAddSubsourceOptionExtracted {
1464 text_columns,
1465 exclude_columns,
1466 details,
1467 seen: _,
1468 } = options.clone().try_into()?;
1469 assert_none!(details, "details cannot be explicitly set");
1470
1471 let mut requested_subsource_map = BTreeMap::new();
1472
1473 match desc.connection {
1474 GenericSourceConnection::Postgres(pg_source_connection) => {
1475 let pg_connection = &pg_source_connection.connection;
1477
1478 let config = pg_connection
1479 .config(
1480 &storage_configuration.connection_context.secrets_reader,
1481 storage_configuration,
1482 InTask::No,
1483 )
1484 .await?;
1485
1486 let client = config
1487 .connect(
1488 "postgres_purification",
1489 &storage_configuration.connection_context.ssh_tunnel_manager,
1490 )
1491 .await?;
1492
1493 let available_replication_slots =
1494 mz_postgres_util::available_replication_slots(&client).await?;
1495
1496 if available_replication_slots < 1 {
1498 Err(PgSourcePurificationError::InsufficientReplicationSlotsAvailable { count: 1 })?;
1499 }
1500
1501 if !exclude_columns.is_empty() {
1502 sql_bail!(
1503 "{} is a {} source, which does not support EXCLUDE COLUMNS.",
1504 partial_source_name,
1505 connection_name
1506 )
1507 }
1508
1509 let reference_client = SourceReferenceClient::Postgres {
1510 client: &client,
1511 publication: &pg_source_connection.publication,
1512 database: &pg_connection.database,
1513 };
1514 let retrieved_source_references = reference_client.get_source_references().await?;
1515
1516 let postgres::PurifiedSourceExports {
1517 source_exports: subsources,
1518 normalized_text_columns,
1519 } = postgres::purify_source_exports(
1520 &client,
1521 &config,
1522 &retrieved_source_references,
1523 &Some(ExternalReferences::SubsetTables(external_references)),
1524 text_columns,
1525 &unresolved_source_name,
1526 &SourceReferencePolicy::Required,
1527 )
1528 .await?;
1529
1530 if let Some(text_cols_option) = options
1531 .iter_mut()
1532 .find(|option| option.name == AlterSourceAddSubsourceOptionName::TextColumns)
1533 {
1534 text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
1535 }
1536
1537 requested_subsource_map.extend(subsources);
1538 }
1539 GenericSourceConnection::MySql(mysql_source_connection) => {
1540 let mysql_connection = &mysql_source_connection.connection;
1541 let config = mysql_connection
1542 .config(
1543 &storage_configuration.connection_context.secrets_reader,
1544 storage_configuration,
1545 InTask::No,
1546 )
1547 .await?;
1548
1549 let mut conn = config
1550 .connect(
1551 "mysql purification",
1552 &storage_configuration.connection_context.ssh_tunnel_manager,
1553 )
1554 .await?;
1555
1556 let initial_gtid_set =
1559 mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?;
1560
1561 let requested_references = Some(ExternalReferences::SubsetTables(external_references));
1562
1563 let reference_client = SourceReferenceClient::MySql {
1564 conn: &mut conn,
1565 include_system_schemas: mysql::references_system_schemas(&requested_references),
1566 };
1567 let retrieved_source_references = reference_client.get_source_references().await?;
1568
1569 let mysql::PurifiedSourceExports {
1570 source_exports: subsources,
1571 normalized_text_columns,
1572 normalized_exclude_columns,
1573 } = mysql::purify_source_exports(
1574 &mut conn,
1575 &retrieved_source_references,
1576 &requested_references,
1577 text_columns,
1578 exclude_columns,
1579 &unresolved_source_name,
1580 initial_gtid_set,
1581 &SourceReferencePolicy::Required,
1582 )
1583 .await?;
1584 requested_subsource_map.extend(subsources);
1585
1586 if let Some(text_cols_option) = options
1588 .iter_mut()
1589 .find(|option| option.name == AlterSourceAddSubsourceOptionName::TextColumns)
1590 {
1591 text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
1592 }
1593 if let Some(ignore_cols_option) = options
1594 .iter_mut()
1595 .find(|option| option.name == AlterSourceAddSubsourceOptionName::ExcludeColumns)
1596 {
1597 ignore_cols_option.value =
1598 Some(WithOptionValue::Sequence(normalized_exclude_columns));
1599 }
1600 }
1601 GenericSourceConnection::SqlServer(sql_server_source) => {
1602 let sql_server_connection = &sql_server_source.connection;
1604 let config = sql_server_connection
1605 .resolve_config(
1606 &storage_configuration.connection_context.secrets_reader,
1607 storage_configuration,
1608 InTask::No,
1609 )
1610 .await?;
1611 let mut client = mz_sql_server_util::Client::connect(config).await?;
1612
1613 let database = sql_server_connection.database.clone().into();
1615 let source_references = SourceReferenceClient::SqlServer {
1616 client: &mut client,
1617 database: Arc::clone(&database),
1618 }
1619 .get_source_references()
1620 .await?;
1621 let requested_references = Some(ExternalReferences::SubsetTables(external_references));
1622
1623 let result = sql_server::purify_source_exports(
1624 &*database,
1625 &mut client,
1626 &source_references,
1627 &requested_references,
1628 &text_columns,
1629 &exclude_columns,
1630 &unresolved_source_name,
1631 &SourceReferencePolicy::Required,
1632 )
1633 .await;
1634 let sql_server::PurifiedSourceExports {
1635 source_exports,
1636 normalized_text_columns,
1637 normalized_excl_columns,
1638 } = result?;
1639
1640 requested_subsource_map.extend(source_exports);
1642
1643 if let Some(text_cols_option) = options
1645 .iter_mut()
1646 .find(|option| option.name == AlterSourceAddSubsourceOptionName::TextColumns)
1647 {
1648 text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
1649 }
1650 if let Some(ignore_cols_option) = options
1651 .iter_mut()
1652 .find(|option| option.name == AlterSourceAddSubsourceOptionName::ExcludeColumns)
1653 {
1654 ignore_cols_option.value = Some(WithOptionValue::Sequence(normalized_excl_columns));
1655 }
1656 }
1657 _ => unreachable!(),
1658 };
1659
1660 Ok(PurifiedStatement::PurifiedAlterSourceAddSubsources {
1661 source_name: resolved_source_name,
1662 options,
1663 subsources: requested_subsource_map,
1664 })
1665}
1666
1667async fn purify_alter_source_refresh_references(
1668 desc: SourceDesc,
1669 resolved_source_name: ResolvedItemName,
1670 storage_configuration: &StorageConfiguration,
1671) -> Result<PurifiedStatement, PlanError> {
1672 let retrieved_source_references = match desc.connection {
1673 GenericSourceConnection::Postgres(pg_source_connection) => {
1674 let pg_connection = &pg_source_connection.connection;
1676
1677 let config = pg_connection
1678 .config(
1679 &storage_configuration.connection_context.secrets_reader,
1680 storage_configuration,
1681 InTask::No,
1682 )
1683 .await?;
1684
1685 let client = config
1686 .connect(
1687 "postgres_purification",
1688 &storage_configuration.connection_context.ssh_tunnel_manager,
1689 )
1690 .await?;
1691 let reference_client = SourceReferenceClient::Postgres {
1692 client: &client,
1693 publication: &pg_source_connection.publication,
1694 database: &pg_connection.database,
1695 };
1696 reference_client.get_source_references().await?
1697 }
1698 GenericSourceConnection::MySql(mysql_source_connection) => {
1699 let mysql_connection = &mysql_source_connection.connection;
1700 let config = mysql_connection
1701 .config(
1702 &storage_configuration.connection_context.secrets_reader,
1703 storage_configuration,
1704 InTask::No,
1705 )
1706 .await?;
1707
1708 let mut conn = config
1709 .connect(
1710 "mysql purification",
1711 &storage_configuration.connection_context.ssh_tunnel_manager,
1712 )
1713 .await?;
1714
1715 let reference_client = SourceReferenceClient::MySql {
1716 conn: &mut conn,
1717 include_system_schemas: false,
1718 };
1719 reference_client.get_source_references().await?
1720 }
1721 GenericSourceConnection::SqlServer(sql_server_source) => {
1722 let sql_server_connection = &sql_server_source.connection;
1724 let config = sql_server_connection
1725 .resolve_config(
1726 &storage_configuration.connection_context.secrets_reader,
1727 storage_configuration,
1728 InTask::No,
1729 )
1730 .await?;
1731 let mut client = mz_sql_server_util::Client::connect(config).await?;
1732
1733 let source_references = SourceReferenceClient::SqlServer {
1735 client: &mut client,
1736 database: sql_server_connection.database.clone().into(),
1737 }
1738 .get_source_references()
1739 .await?;
1740 source_references
1741 }
1742 GenericSourceConnection::LoadGenerator(load_gen_connection) => {
1743 let reference_client = SourceReferenceClient::LoadGenerator {
1744 generator: &load_gen_connection.load_generator,
1745 };
1746 reference_client.get_source_references().await?
1747 }
1748 GenericSourceConnection::Kafka(kafka_conn) => {
1749 let reference_client = SourceReferenceClient::Kafka {
1750 topic: &kafka_conn.topic,
1751 };
1752 reference_client.get_source_references().await?
1753 }
1754 };
1755 Ok(PurifiedStatement::PurifiedAlterSourceRefreshReferences {
1756 source_name: resolved_source_name,
1757 available_source_references: retrieved_source_references.available_source_references(),
1758 })
1759}
1760
1761async fn purify_create_table_from_source(
1762 catalog: impl SessionCatalog,
1763 mut stmt: CreateTableFromSourceStatement<Aug>,
1764 storage_configuration: &StorageConfiguration,
1765) -> Result<PurifiedStatement, PlanError> {
1766 let scx = StatementContext::new(None, &catalog);
1767 let CreateTableFromSourceStatement {
1768 name: _,
1769 columns,
1770 constraints,
1771 source: source_name,
1772 if_not_exists: _,
1773 external_reference,
1774 format,
1775 envelope,
1776 include_metadata: _,
1777 with_options,
1778 } = &mut stmt;
1779
1780 if matches!(columns, TableFromSourceColumns::Defined(_)) {
1782 sql_bail!("CREATE TABLE .. FROM SOURCE column definitions cannot be specified directly");
1783 }
1784 if !constraints.is_empty() {
1785 sql_bail!(
1786 "CREATE TABLE .. FROM SOURCE constraint definitions cannot be specified directly"
1787 );
1788 }
1789
1790 let item = match scx.get_item_by_resolved_name(source_name) {
1792 Ok(item) => item,
1793 Err(e) => return Err(e),
1794 };
1795
1796 let desc = match item.source_desc()? {
1798 Some(desc) => desc.clone().into_inline_connection(scx.catalog),
1799 None => {
1800 sql_bail!("cannot ALTER this type of source")
1801 }
1802 };
1803 let unresolved_source_name: UnresolvedItemName = source_name.full_item_name().clone().into();
1804 let qualified_source_name = item.name();
1805 let connection_name = desc.connection.name();
1806
1807 let crate::plan::statement::ddl::TableFromSourceOptionExtracted {
1808 text_columns,
1809 exclude_columns,
1810 details,
1811 partition_by: _,
1812 seen: _,
1813 } = with_options.clone().try_into()?;
1814 assert_none!(details, "details cannot be explicitly set");
1815
1816 let qualified_text_columns = text_columns
1820 .iter()
1821 .map(|col| {
1822 UnresolvedItemName(
1823 external_reference
1824 .as_ref()
1825 .map(|er| er.0.iter().chain_one(col).map(|i| i.clone()).collect())
1826 .unwrap_or_else(|| vec![col.clone()]),
1827 )
1828 })
1829 .collect_vec();
1830 let qualified_exclude_columns = exclude_columns
1831 .iter()
1832 .map(|col| {
1833 UnresolvedItemName(
1834 external_reference
1835 .as_ref()
1836 .map(|er| er.0.iter().chain_one(col).map(|i| i.clone()).collect())
1837 .unwrap_or_else(|| vec![col.clone()]),
1838 )
1839 })
1840 .collect_vec();
1841
1842 let mut format_options = SourceFormatOptions::Default;
1844
1845 let retrieved_source_references: RetrievedSourceReferences;
1846
1847 let requested_references = external_reference.as_ref().map(|ref_name| {
1848 ExternalReferences::SubsetTables(vec![ExternalReferenceExport {
1849 reference: ref_name.clone(),
1850 alias: None,
1851 }])
1852 });
1853
1854 let purified_export = match desc.connection {
1857 GenericSourceConnection::Postgres(pg_source_connection) => {
1858 let pg_connection = &pg_source_connection.connection;
1860
1861 let config = pg_connection
1862 .config(
1863 &storage_configuration.connection_context.secrets_reader,
1864 storage_configuration,
1865 InTask::No,
1866 )
1867 .await?;
1868
1869 let client = config
1870 .connect(
1871 "postgres_purification",
1872 &storage_configuration.connection_context.ssh_tunnel_manager,
1873 )
1874 .await?;
1875
1876 let available_replication_slots =
1877 mz_postgres_util::available_replication_slots(&client).await?;
1878
1879 if available_replication_slots < 1 {
1881 Err(PgSourcePurificationError::InsufficientReplicationSlotsAvailable { count: 1 })?;
1882 }
1883
1884 if !exclude_columns.is_empty() {
1886 sql_bail!(
1887 "{} is a {} source, which does not support EXCLUDE COLUMNS.",
1888 scx.catalog.minimal_qualification(qualified_source_name),
1889 connection_name
1890 )
1891 }
1892
1893 let reference_client = SourceReferenceClient::Postgres {
1894 client: &client,
1895 publication: &pg_source_connection.publication,
1896 database: &pg_connection.database,
1897 };
1898 retrieved_source_references = reference_client.get_source_references().await?;
1899
1900 let postgres::PurifiedSourceExports {
1901 source_exports,
1902 normalized_text_columns: _,
1906 } = postgres::purify_source_exports(
1907 &client,
1908 &config,
1909 &retrieved_source_references,
1910 &requested_references,
1911 qualified_text_columns,
1912 &unresolved_source_name,
1913 &SourceReferencePolicy::Required,
1914 )
1915 .await?;
1916 let (_, purified_export) = source_exports.into_iter().next().unwrap();
1918 purified_export
1919 }
1920 GenericSourceConnection::MySql(mysql_source_connection) => {
1921 let mysql_connection = &mysql_source_connection.connection;
1922 let config = mysql_connection
1923 .config(
1924 &storage_configuration.connection_context.secrets_reader,
1925 storage_configuration,
1926 InTask::No,
1927 )
1928 .await?;
1929
1930 let mut conn = config
1931 .connect(
1932 "mysql purification",
1933 &storage_configuration.connection_context.ssh_tunnel_manager,
1934 )
1935 .await?;
1936
1937 let initial_gtid_set =
1940 mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?;
1941
1942 let reference_client = SourceReferenceClient::MySql {
1943 conn: &mut conn,
1944 include_system_schemas: mysql::references_system_schemas(&requested_references),
1945 };
1946 retrieved_source_references = reference_client.get_source_references().await?;
1947
1948 let mysql::PurifiedSourceExports {
1949 source_exports,
1950 normalized_text_columns: _,
1954 normalized_exclude_columns: _,
1955 } = mysql::purify_source_exports(
1956 &mut conn,
1957 &retrieved_source_references,
1958 &requested_references,
1959 qualified_text_columns,
1960 qualified_exclude_columns,
1961 &unresolved_source_name,
1962 initial_gtid_set,
1963 &SourceReferencePolicy::Required,
1964 )
1965 .await?;
1966 let (_, purified_export) = source_exports.into_iter().next().unwrap();
1968 purified_export
1969 }
1970 GenericSourceConnection::SqlServer(_sql_server_source) => {
1971 return Err(PlanError::Unsupported {
1973 feature: "CREATE TABLE ... FROM SQL SERVER SOURCE".to_string(),
1974 discussion_no: None,
1975 });
1976 }
1977 GenericSourceConnection::LoadGenerator(load_gen_connection) => {
1978 let reference_client = SourceReferenceClient::LoadGenerator {
1979 generator: &load_gen_connection.load_generator,
1980 };
1981 retrieved_source_references = reference_client.get_source_references().await?;
1982
1983 let requested_exports = retrieved_source_references
1984 .requested_source_exports(requested_references.as_ref(), &unresolved_source_name)?;
1985 let export = requested_exports.into_iter().next().unwrap();
1987 PurifiedSourceExport {
1988 external_reference: export.external_reference,
1989 details: PurifiedExportDetails::LoadGenerator {
1990 table: export
1991 .meta
1992 .load_generator_desc()
1993 .expect("is loadgen")
1994 .clone(),
1995 output: export
1996 .meta
1997 .load_generator_output()
1998 .expect("is loadgen")
1999 .clone(),
2000 },
2001 }
2002 }
2003 GenericSourceConnection::Kafka(kafka_conn) => {
2004 let reference_client = SourceReferenceClient::Kafka {
2005 topic: &kafka_conn.topic,
2006 };
2007 retrieved_source_references = reference_client.get_source_references().await?;
2008 let requested_exports = retrieved_source_references
2009 .requested_source_exports(requested_references.as_ref(), &unresolved_source_name)?;
2010 let export = requested_exports.into_iter().next().unwrap();
2012
2013 format_options = SourceFormatOptions::Kafka {
2014 topic: kafka_conn.topic.clone(),
2015 };
2016 PurifiedSourceExport {
2017 external_reference: export.external_reference,
2018 details: PurifiedExportDetails::Kafka {},
2019 }
2020 }
2021 };
2022
    purify_source_format(
        &catalog,
        format,
        &format_options,
        envelope,
        storage_configuration,
    )
    .await?;

    *external_reference = Some(purified_export.external_reference.clone());

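    // Rewrite the statement with the purified export: generated column definitions,
    // constraints, and the hex-encoded `DETAILS` option appropriate to the source type.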
    match &purified_export.details {
        PurifiedExportDetails::Postgres { .. } => {
            let mut unsupported_cols = vec![];
            let postgres::PostgresExportStatementValues {
                columns: gen_columns,
                constraints: gen_constraints,
                text_columns: gen_text_columns,
                details: gen_details,
                external_reference: _,
            } = postgres::generate_source_export_statement_values(
                &scx,
                purified_export,
                &mut unsupported_cols,
            )?;
            if !unsupported_cols.is_empty() {
                unsupported_cols.sort();
                Err(PgSourcePurificationError::UnrecognizedTypes {
                    cols: unsupported_cols,
                })?;
            }

            if let Some(text_cols_option) = with_options
                .iter_mut()
                .find(|option| option.name == TableFromSourceOptionName::TextColumns)
            {
                if let Some(gen_text_columns) = gen_text_columns {
                    text_cols_option.value = Some(WithOptionValue::Sequence(gen_text_columns));
                } else {
                    soft_panic_or_log!(
                        "text_columns should be Some if text_cols_option is present"
                    );
                }
            }
            match columns {
                TableFromSourceColumns::Defined(_) => unreachable!(),
                TableFromSourceColumns::NotSpecified => {
                    *columns = TableFromSourceColumns::Defined(gen_columns);
                    *constraints = gen_constraints;
                }
                TableFromSourceColumns::Named(_) => {
                    sql_bail!("columns cannot be named for Postgres sources")
                }
            }
            with_options.push(TableFromSourceOption {
                name: TableFromSourceOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    gen_details.into_proto().encode_to_vec(),
                )))),
            })
        }
        PurifiedExportDetails::MySql { .. } => {
            let mysql::MySqlExportStatementValues {
                columns: gen_columns,
                constraints: gen_constraints,
                text_columns: gen_text_columns,
                exclude_columns: gen_exclude_columns,
                details: gen_details,
                external_reference: _,
            } = mysql::generate_source_export_statement_values(&scx, purified_export)?;

            if let Some(text_cols_option) = with_options
                .iter_mut()
                .find(|option| option.name == TableFromSourceOptionName::TextColumns)
            {
                if let Some(gen_text_columns) = gen_text_columns {
                    text_cols_option.value = Some(WithOptionValue::Sequence(gen_text_columns));
                } else {
                    soft_panic_or_log!(
                        "text_columns should be Some if text_cols_option is present"
                    );
                }
            }
            if let Some(ignore_cols_option) = with_options
                .iter_mut()
                .find(|option| option.name == TableFromSourceOptionName::ExcludeColumns)
            {
                if let Some(gen_exclude_columns) = gen_exclude_columns {
                    ignore_cols_option.value = Some(WithOptionValue::Sequence(gen_exclude_columns));
                } else {
                    soft_panic_or_log!(
2117 "text_columns should be Some if ignore_cols_option is present"
                    );
                }
            }
            match columns {
                TableFromSourceColumns::Defined(_) => unreachable!(),
                TableFromSourceColumns::NotSpecified => {
                    *columns = TableFromSourceColumns::Defined(gen_columns);
                    *constraints = gen_constraints;
                }
                TableFromSourceColumns::Named(_) => {
                    sql_bail!("columns cannot be named for MySQL sources")
                }
            }
            with_options.push(TableFromSourceOption {
                name: TableFromSourceOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    gen_details.into_proto().encode_to_vec(),
                )))),
            })
        }
        PurifiedExportDetails::SqlServer { .. } => {
            return Err(PlanError::Unsupported {
                feature: "CREATE TABLE ... FROM SQL SERVER SOURCE".to_string(),
                discussion_no: None,
            });
        }
        PurifiedExportDetails::LoadGenerator { .. } => {
            let (desc, output) = match purified_export.details {
                PurifiedExportDetails::LoadGenerator { table, output } => (table, output),
                _ => unreachable!("purified export details must be load generator"),
            };
            if let Some(desc) = desc {
                let (gen_columns, gen_constraints) = scx.relation_desc_into_table_defs(&desc)?;
                match columns {
                    TableFromSourceColumns::Defined(_) => unreachable!(),
                    TableFromSourceColumns::NotSpecified => {
                        *columns = TableFromSourceColumns::Defined(gen_columns);
                        *constraints = gen_constraints;
                    }
                    TableFromSourceColumns::Named(_) => {
                        sql_bail!("columns cannot be named for multi-output load generator sources")
                    }
                }
            }
            let details = SourceExportStatementDetails::LoadGenerator { output };
            with_options.push(TableFromSourceOption {
                name: TableFromSourceOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    details.into_proto().encode_to_vec(),
                )))),
            })
        }
        PurifiedExportDetails::Kafka {} => {
            let details = SourceExportStatementDetails::Kafka {};
            with_options.push(TableFromSourceOption {
                name: TableFromSourceOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    details.into_proto().encode_to_vec(),
                )))),
            })
        }
    };

    Ok(PurifiedStatement::PurifiedCreateTableFromSource { stmt })
}

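/// Source-specific context needed while purifying a format specifier.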
enum SourceFormatOptions {
    Default,
    Kafka { topic: String },
}

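/// Purifies the `FORMAT` specifier of a source, rejecting KEY/VALUE formats for
/// non-Kafka sources and purifying each bare or key/value format in place.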
async fn purify_source_format(
    catalog: &dyn SessionCatalog,
    format: &mut Option<FormatSpecifier<Aug>>,
    options: &SourceFormatOptions,
    envelope: &Option<SourceEnvelope>,
    storage_configuration: &StorageConfiguration,
) -> Result<(), PlanError> {
    if matches!(format, Some(FormatSpecifier::KeyValue { .. }))
        && !matches!(options, SourceFormatOptions::Kafka { .. })
    {
        sql_bail!("Kafka sources are the only source type that can provide KEY/VALUE formats")
    }

    match format.as_mut() {
        None => {}
        Some(FormatSpecifier::Bare(format)) => {
            purify_source_format_single(catalog, format, options, envelope, storage_configuration)
                .await?;
        }

        Some(FormatSpecifier::KeyValue { key, value: val }) => {
            purify_source_format_single(catalog, key, options, envelope, storage_configuration)
                .await?;
            purify_source_format_single(catalog, val, options, envelope, storage_configuration)
                .await?;
        }
    }
    Ok(())
}

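/// Purifies a single format: Avro and Protobuf formats backed by the Confluent
/// Schema Registry have their schemas fetched and recorded as a seed; all other
/// formats are left untouched.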
async fn purify_source_format_single(
    catalog: &dyn SessionCatalog,
    format: &mut Format<Aug>,
    options: &SourceFormatOptions,
    envelope: &Option<SourceEnvelope>,
    storage_configuration: &StorageConfiguration,
) -> Result<(), PlanError> {
    match format {
        Format::Avro(schema) => match schema {
            AvroSchema::Csr { csr_connection } => {
                purify_csr_connection_avro(
                    catalog,
                    options,
                    csr_connection,
                    envelope,
                    storage_configuration,
                )
                .await?
            }
            AvroSchema::InlineSchema { .. } => {}
        },
        Format::Protobuf(schema) => match schema {
            ProtobufSchema::Csr { csr_connection } => {
                purify_csr_connection_proto(
                    catalog,
                    options,
                    csr_connection,
                    envelope,
                    storage_configuration,
                )
                .await?;
            }
            ProtobufSchema::InlineSchema { .. } => {}
        },
        Format::Bytes
        | Format::Regex(_)
        | Format::Json { .. }
        | Format::Text
        | Format::Csv { .. } => (),
    }
    Ok(())
}

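/// Generates `CREATE SUBSOURCE` statements for the given purified source
/// exports, dispatching on the source type recorded in the export details.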
pub fn generate_subsource_statements(
    scx: &StatementContext,
    source_name: ResolvedItemName,
    subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
) -> Result<Vec<CreateSubsourceStatement<Aug>>, PlanError> {
    if subsources.is_empty() {
        return Ok(vec![]);
    }
    let (_, purified_export) = subsources.iter().next().unwrap();

    let statements = match &purified_export.details {
        PurifiedExportDetails::Postgres { .. } => {
            crate::pure::postgres::generate_create_subsource_statements(
                scx,
                source_name,
                subsources,
            )?
        }
        PurifiedExportDetails::MySql { .. } => {
            crate::pure::mysql::generate_create_subsource_statements(scx, source_name, subsources)?
        }
        PurifiedExportDetails::SqlServer { .. } => {
            crate::pure::sql_server::generate_create_subsource_statements(
                scx,
                source_name,
                subsources,
            )?
        }
        PurifiedExportDetails::LoadGenerator { .. } => {
            let mut subsource_stmts = Vec::with_capacity(subsources.len());
            for (subsource_name, purified_export) in subsources {
                let (desc, output) = match purified_export.details {
                    PurifiedExportDetails::LoadGenerator { table, output } => (table, output),
                    _ => unreachable!("purified export details must be load generator"),
                };
                let desc =
                    desc.expect("subsources cannot be generated for single-output load generators");

                let (columns, table_constraints) = scx.relation_desc_into_table_defs(&desc)?;
                let details = SourceExportStatementDetails::LoadGenerator { output };
                let subsource = CreateSubsourceStatement {
                    name: subsource_name,
                    columns,
                    of_source: Some(source_name.clone()),
                    constraints: table_constraints,
                    if_not_exists: false,
                    with_options: vec![
                        CreateSubsourceOption {
                            name: CreateSubsourceOptionName::ExternalReference,
                            value: Some(WithOptionValue::UnresolvedItemName(
                                purified_export.external_reference,
                            )),
                        },
                        CreateSubsourceOption {
                            name: CreateSubsourceOptionName::Details,
                            value: Some(WithOptionValue::Value(Value::String(hex::encode(
                                details.into_proto().encode_to_vec(),
                            )))),
                        },
                    ],
                };
                subsource_stmts.push(subsource);
            }

            subsource_stmts
        }
        PurifiedExportDetails::Kafka { .. } => {
            assert!(
                subsources.is_empty(),
                "Kafka sources do not produce data-bearing subsources"
            );
            vec![]
        }
    };
    Ok(statements)
}

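/// Purifies a Protobuf Confluent Schema Registry connection by fetching and
/// compiling the `<topic>-value` (and, when present, `<topic>-key`) schemas
/// into a seed, unless a seed was already provided.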
async fn purify_csr_connection_proto(
    catalog: &dyn SessionCatalog,
    options: &SourceFormatOptions,
    csr_connection: &mut CsrConnectionProtobuf<Aug>,
    envelope: &Option<SourceEnvelope>,
    storage_configuration: &StorageConfiguration,
) -> Result<(), PlanError> {
    let SourceFormatOptions::Kafka { topic } = options else {
        sql_bail!("Confluent Schema Registry is only supported with Kafka sources")
    };

    let CsrConnectionProtobuf {
        seed,
        connection: CsrConnection {
            connection,
            options: _,
        },
    } = csr_connection;
    match seed {
        None => {
            let scx = StatementContext::new(None, &*catalog);

            let ccsr_connection = match scx.get_item_by_resolved_name(connection)?.connection()? {
                Connection::Csr(connection) => connection.clone().into_inline_connection(catalog),
                _ => sql_bail!("{} is not a schema registry connection", connection),
            };

            let ccsr_client = ccsr_connection
                .connect(storage_configuration, InTask::No)
                .await
                .map_err(|e| CsrPurificationError::ClientError(Arc::new(e)))?;

            let value = compile_proto(&format!("{}-value", topic), &ccsr_client).await?;
            let key = compile_proto(&format!("{}-key", topic), &ccsr_client)
                .await
                .ok();

            if matches!(envelope, Some(SourceEnvelope::Debezium)) && key.is_none() {
                sql_bail!("Key schema is required for ENVELOPE DEBEZIUM");
            }

            *seed = Some(CsrSeedProtobuf { value, key });
        }
        Some(_) => (),
    }

    Ok(())
}

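/// Purifies an Avro Confluent Schema Registry connection by fetching the key
/// and value schemas for the topic and recording them as a seed, unless a seed
/// was already provided.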
async fn purify_csr_connection_avro(
    catalog: &dyn SessionCatalog,
    options: &SourceFormatOptions,
    csr_connection: &mut CsrConnectionAvro<Aug>,
    envelope: &Option<SourceEnvelope>,
    storage_configuration: &StorageConfiguration,
) -> Result<(), PlanError> {
    let SourceFormatOptions::Kafka { topic } = options else {
        sql_bail!("Confluent Schema Registry is only supported with Kafka sources")
    };

    let CsrConnectionAvro {
        connection: CsrConnection { connection, .. },
        seed,
        key_strategy,
        value_strategy,
    } = csr_connection;
    if seed.is_none() {
        let scx = StatementContext::new(None, &*catalog);
        let csr_connection = match scx.get_item_by_resolved_name(connection)?.connection()? {
            Connection::Csr(connection) => connection.clone().into_inline_connection(catalog),
            _ => sql_bail!("{} is not a schema registry connection", connection),
        };
        let ccsr_client = csr_connection
            .connect(storage_configuration, InTask::No)
            .await
            .map_err(|e| CsrPurificationError::ClientError(Arc::new(e)))?;

        let Schema {
            key_schema,
            value_schema,
        } = get_remote_csr_schema(
            &ccsr_client,
            key_strategy.clone().unwrap_or_default(),
            value_strategy.clone().unwrap_or_default(),
            topic,
        )
        .await?;
        if matches!(envelope, Some(SourceEnvelope::Debezium)) && key_schema.is_none() {
            sql_bail!("Key schema is required for ENVELOPE DEBEZIUM");
        }

        *seed = Some(CsrSeedAvro {
            key_schema,
            value_schema,
        })
    }

    Ok(())
}

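/// A value schema and optional key schema fetched from the Confluent Schema Registry.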
#[derive(Debug)]
pub struct Schema {
    pub key_schema: Option<String>,
    pub value_schema: String,
}

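/// Resolves a schema for `subject` according to the reader schema selection
/// strategy: the latest version for the subject, an inline schema, or a schema
/// looked up by ID. Returns `None` if no such schema exists.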
async fn get_schema_with_strategy(
    client: &Client,
    strategy: ReaderSchemaSelectionStrategy,
    subject: &str,
) -> Result<Option<String>, PlanError> {
    match strategy {
        ReaderSchemaSelectionStrategy::Latest => {
            match client.get_schema_by_subject(subject).await {
                Ok(CcsrSchema { raw, .. }) => Ok(Some(raw)),
                Err(GetBySubjectError::SubjectNotFound)
                | Err(GetBySubjectError::VersionNotFound(_)) => Ok(None),
                Err(e) => Err(PlanError::FetchingCsrSchemaFailed {
                    schema_lookup: format!("subject {}", subject.quoted()),
                    cause: Arc::new(e),
                }),
            }
        }
        ReaderSchemaSelectionStrategy::Inline(raw) => Ok(Some(raw)),
        ReaderSchemaSelectionStrategy::ById(id) => match client.get_schema_by_id(id).await {
            Ok(CcsrSchema { raw, .. }) => Ok(Some(raw)),
            Err(GetByIdError::SchemaNotFound) => Ok(None),
            Err(e) => Err(PlanError::FetchingCsrSchemaFailed {
                schema_lookup: format!("ID {}", id),
                cause: Arc::new(e),
            }),
        },
    }
}

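/// Fetches the value schema (required) and key schema (optional) for `topic`
/// from the schema registry using the given selection strategies.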
async fn get_remote_csr_schema(
    ccsr_client: &mz_ccsr::Client,
    key_strategy: ReaderSchemaSelectionStrategy,
    value_strategy: ReaderSchemaSelectionStrategy,
    topic: &str,
) -> Result<Schema, PlanError> {
    let value_schema_name = format!("{}-value", topic);
    let value_schema =
        get_schema_with_strategy(ccsr_client, value_strategy, &value_schema_name).await?;
    let value_schema = value_schema.ok_or_else(|| anyhow!("No value schema found"))?;
    let subject = format!("{}-key", topic);
    let key_schema = get_schema_with_strategy(ccsr_client, key_strategy, &subject).await?;
    Ok(Schema {
        key_schema,
        value_schema,
    })
}

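/// Fetches the named subject and its references from the schema registry and
/// compiles them into a serialized file descriptor set, returning the seed
/// schema and the name of the single message the schema must contain.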
async fn compile_proto(
    subject_name: &str,
    ccsr_client: &Client,
) -> Result<CsrSeedProtobufSchema, PlanError> {
    let (primary_subject, dependency_subjects) = ccsr_client
        .get_subject_and_references(subject_name)
        .await
        .map_err(|e| PlanError::FetchingCsrSchemaFailed {
            schema_lookup: format!("subject {}", subject_name.quoted()),
            cause: Arc::new(e),
        })?;

    let mut source_tree = VirtualSourceTree::new();
    for subject in iter::once(&primary_subject).chain(dependency_subjects.iter()) {
        source_tree.as_mut().add_file(
            Path::new(&subject.name),
            subject.schema.raw.as_bytes().to_vec(),
        );
    }
    let mut db = SourceTreeDescriptorDatabase::new(source_tree.as_mut());
    let fds = db
        .as_mut()
        .build_file_descriptor_set(&[Path::new(&primary_subject.name)])
        .map_err(|cause| PlanError::InvalidProtobufSchema { cause })?;

    let primary_fd = fds.file(0);
    let message_name = match primary_fd.message_type_size() {
        1 => String::from_utf8_lossy(primary_fd.message_type(0).name()).into_owned(),
        0 => bail_unsupported!(29603, "Protobuf schemas with no messages"),
        _ => bail_unsupported!(29603, "Protobuf schemas with multiple messages"),
    };

    let bytes = &fds
        .serialize()
        .map_err(|cause| PlanError::InvalidProtobufSchema { cause })?;
    let mut schema = String::new();
    strconv::format_bytes(&mut schema, bytes);

    Ok(CsrSeedProtobufSchema {
        schema,
        message_name,
    })
}

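// Catalog name of the `mz_now` function, used when purifying `REFRESH` options
// in materialized view definitions.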
const MZ_NOW_NAME: &str = "mz_now";
const MZ_NOW_SCHEMA: &str = "mz_catalog";

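/// Purifies the `WITH` options of a `CREATE MATERIALIZED VIEW` statement:
/// rewrites `REFRESH AT CREATION` to `REFRESH AT mz_now()`, defaults a missing
/// `ALIGNED TO` to `mz_now()`, replaces `mz_now()` calls with the chosen
/// timestamp cast to `mz_timestamp`, adds a default `REFRESH ON COMMIT` option
/// when no refresh option is present, and updates `resolved_ids` accordingly.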
pub fn purify_create_materialized_view_options(
    catalog: impl SessionCatalog,
    mz_now: Option<Timestamp>,
    cmvs: &mut CreateMaterializedViewStatement<Aug>,
    resolved_ids: &mut ResolvedIds,
) {
    let (mz_now_id, mz_now_expr) = {
        let item = catalog
            .resolve_function(&PartialItemName {
                database: None,
                schema: Some(MZ_NOW_SCHEMA.to_string()),
                item: MZ_NOW_NAME.to_string(),
            })
            .expect("we should be able to resolve mz_now");
        (
            item.id(),
            Expr::Function(Function {
                name: ResolvedItemName::Item {
                    id: item.id(),
                    qualifiers: item.name().qualifiers.clone(),
                    full_name: catalog.resolve_full_name(item.name()),
                    print_id: false,
                    version: RelationVersionSelector::Latest,
                },
                args: FunctionArgs::Args {
                    args: Vec::new(),
                    order_by: Vec::new(),
                },
                filter: None,
                over: None,
                distinct: false,
            }),
        )
    };
    let (mz_timestamp_id, mz_timestamp_type) = {
        let item = catalog.get_system_type("mz_timestamp");
        let full_name = catalog.resolve_full_name(item.name());
        (
            item.id(),
            ResolvedDataType::Named {
                id: item.id(),
                qualifiers: item.name().qualifiers.clone(),
                full_name,
                modifiers: vec![],
                print_id: true,
            },
        )
    };

    let mut introduced_mz_timestamp = false;

    for option in cmvs.with_options.iter_mut() {
        if matches!(
            option.value,
            Some(WithOptionValue::Refresh(RefreshOptionValue::AtCreation))
        ) {
            option.value = Some(WithOptionValue::Refresh(RefreshOptionValue::At(
                RefreshAtOptionValue {
                    time: mz_now_expr.clone(),
                },
            )));
        }

        if let Some(WithOptionValue::Refresh(RefreshOptionValue::Every(
            RefreshEveryOptionValue { aligned_to, .. },
        ))) = &mut option.value
        {
            if aligned_to.is_none() {
                *aligned_to = Some(mz_now_expr.clone());
            }
        }

        match &mut option.value {
            Some(WithOptionValue::Refresh(RefreshOptionValue::At(RefreshAtOptionValue {
                time,
            }))) => {
                let mut visitor = MzNowPurifierVisitor::new(mz_now, mz_timestamp_type.clone());
                visitor.visit_expr_mut(time);
                introduced_mz_timestamp |= visitor.introduced_mz_timestamp;
            }
            Some(WithOptionValue::Refresh(RefreshOptionValue::Every(
                RefreshEveryOptionValue {
                    interval: _,
                    aligned_to: Some(aligned_to),
                },
            ))) => {
                let mut visitor = MzNowPurifierVisitor::new(mz_now, mz_timestamp_type.clone());
                visitor.visit_expr_mut(aligned_to);
                introduced_mz_timestamp |= visitor.introduced_mz_timestamp;
            }
            _ => {}
        }
    }

    if !cmvs.with_options.iter().any(|o| {
        matches!(
            o,
            MaterializedViewOption {
                value: Some(WithOptionValue::Refresh(..)),
                ..
            }
        )
    }) {
        cmvs.with_options.push(MaterializedViewOption {
            name: MaterializedViewOptionName::Refresh,
            value: Some(WithOptionValue::Refresh(RefreshOptionValue::OnCommit)),
        })
    }

    if introduced_mz_timestamp {
        resolved_ids.add_item(mz_timestamp_id);
    }
    let mut visitor = ExprContainsTemporalVisitor::new();
    visitor.visit_create_materialized_view_statement(cmvs);
    if !visitor.contains_temporal {
        resolved_ids.remove_item(&mz_now_id);
    }
}

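/// Returns whether the given materialized view option contains a temporal
/// expression, i.e. a `REFRESH` option that references `mz_now()` explicitly or
/// implicitly (an omitted `ALIGNED TO` and `REFRESH AT CREATION` both default
/// to `mz_now()`).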
pub fn materialized_view_option_contains_temporal(mvo: &MaterializedViewOption<Aug>) -> bool {
    match &mvo.value {
        Some(WithOptionValue::Refresh(RefreshOptionValue::At(RefreshAtOptionValue { time }))) => {
            let mut visitor = ExprContainsTemporalVisitor::new();
            visitor.visit_expr(time);
            visitor.contains_temporal
        }
        Some(WithOptionValue::Refresh(RefreshOptionValue::Every(RefreshEveryOptionValue {
            interval: _,
            aligned_to: Some(aligned_to),
        }))) => {
            let mut visitor = ExprContainsTemporalVisitor::new();
            visitor.visit_expr(aligned_to);
            visitor.contains_temporal
        }
        Some(WithOptionValue::Refresh(RefreshOptionValue::Every(RefreshEveryOptionValue {
            interval: _,
            aligned_to: None,
        }))) => {
            true
        }
        Some(WithOptionValue::Refresh(RefreshOptionValue::AtCreation)) => {
            true
        }
        _ => false,
    }
}

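/// AST visitor that records whether an expression contains a call to `mz_now()`.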
struct ExprContainsTemporalVisitor {
    pub contains_temporal: bool,
}

impl ExprContainsTemporalVisitor {
    pub fn new() -> ExprContainsTemporalVisitor {
        ExprContainsTemporalVisitor {
            contains_temporal: false,
        }
    }
}

impl Visit<'_, Aug> for ExprContainsTemporalVisitor {
    fn visit_function(&mut self, func: &Function<Aug>) {
        self.contains_temporal |= func.name.full_item_name().item == MZ_NOW_NAME;
        visit_function(self, func);
    }
}

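/// AST visitor that replaces `mz_now()` calls with the chosen timestamp, cast
/// to `mz_timestamp`, and records whether such a replacement occurred.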
struct MzNowPurifierVisitor {
    pub mz_now: Option<Timestamp>,
    pub mz_timestamp_type: ResolvedDataType,
    pub introduced_mz_timestamp: bool,
}

impl MzNowPurifierVisitor {
    pub fn new(
        mz_now: Option<Timestamp>,
        mz_timestamp_type: ResolvedDataType,
    ) -> MzNowPurifierVisitor {
        MzNowPurifierVisitor {
            mz_now,
            mz_timestamp_type,
            introduced_mz_timestamp: false,
        }
    }
}

impl VisitMut<'_, Aug> for MzNowPurifierVisitor {
    fn visit_expr_mut(&mut self, expr: &'_ mut Expr<Aug>) {
        match expr {
            Expr::Function(Function {
                name:
                    ResolvedItemName::Item {
                        full_name: FullItemName { item, .. },
                        ..
                    },
                ..
            }) if item.as_str() == MZ_NOW_NAME => {
                let mz_now = self.mz_now.expect(
                    "we should have chosen a timestamp if the expression contains mz_now()",
                );
                *expr = Expr::Cast {
                    expr: Box::new(Expr::Value(Value::Number(mz_now.to_string()))),
                    data_type: self.mz_timestamp_type.clone(),
                };
                self.introduced_mz_timestamp = true;
            }
            _ => visit_expr_mut(self, expr),
        }
    }
}