use std::collections::{BTreeMap, BTreeSet};
use std::fmt;
use std::iter;
use std::path::Path;
use std::sync::Arc;

use anyhow::anyhow;
use itertools::Itertools;
use mz_ccsr::{Client, GetByIdError, GetBySubjectError, Schema as CcsrSchema};
use mz_controller_types::ClusterId;
use mz_kafka_util::client::MzClientContext;
use mz_mysql_util::MySqlTableDesc;
use mz_ore::error::ErrorExt;
use mz_ore::future::InTask;
use mz_ore::iter::IteratorExt;
use mz_ore::str::StrExt;
use mz_ore::{assert_none, soft_panic_or_log};
use mz_postgres_util::desc::PostgresTableDesc;
use mz_postgres_util::replication::WalLevel;
use mz_postgres_util::tunnel::PostgresFlavor;
use mz_proto::RustType;
use mz_repr::{CatalogItemId, RelationDesc, RelationVersionSelector, Timestamp, strconv};
use mz_sql_parser::ast::display::AstDisplay;
use mz_sql_parser::ast::visit::{Visit, visit_function};
use mz_sql_parser::ast::visit_mut::{VisitMut, visit_expr_mut};
use mz_sql_parser::ast::{
    AlterSourceAction, AlterSourceAddSubsourceOptionName, AlterSourceStatement, AvroDocOn,
    ColumnName, CreateMaterializedViewStatement, CreateSinkConnection, CreateSinkOptionName,
    CreateSinkStatement, CreateSubsourceOption, CreateSubsourceOptionName,
    CreateTableFromSourceStatement, CsrConfigOption, CsrConfigOptionName, CsrConnection,
    CsrSeedAvro, CsrSeedProtobuf, CsrSeedProtobufSchema, DeferredItemName, DocOnIdentifier,
    DocOnSchema, Expr, Function, FunctionArgs, Ident, KafkaSourceConfigOption,
    KafkaSourceConfigOptionName, LoadGenerator, LoadGeneratorOption, LoadGeneratorOptionName,
    MaterializedViewOption, MaterializedViewOptionName, MySqlConfigOption, MySqlConfigOptionName,
    PgConfigOption, PgConfigOptionName, RawItemName, ReaderSchemaSelectionStrategy,
    RefreshAtOptionValue, RefreshEveryOptionValue, RefreshOptionValue, SourceEnvelope,
    SqlServerConfigOption, SqlServerConfigOptionName, Statement, TableFromSourceColumns,
    TableFromSourceOption, TableFromSourceOptionName, UnresolvedItemName,
};
use mz_sql_server_util::desc::SqlServerTableDesc;
use mz_storage_types::configuration::StorageConfiguration;
use mz_storage_types::connections::inline::IntoInlineConnection;
use mz_storage_types::connections::{Connection, PostgresConnection};
use mz_storage_types::errors::ContextCreationError;
use mz_storage_types::sources::load_generator::LoadGeneratorOutput;
use mz_storage_types::sources::mysql::MySqlSourceDetails;
use mz_storage_types::sources::postgres::PostgresSourcePublicationDetails;
use mz_storage_types::sources::{
    GenericSourceConnection, PostgresSourceConnection, SourceConnection, SourceDesc,
    SourceExportStatementDetails, SqlServerSourceExtras,
};
use prost::Message;
use protobuf_native::MessageLite;
use protobuf_native::compiler::{SourceTreeDescriptorDatabase, VirtualSourceTree};
use rdkafka::admin::AdminClient;
use references::{RetrievedSourceReferences, SourceReferenceClient};
use uuid::Uuid;

use crate::ast::{
    AlterSourceAddSubsourceOption, AvroSchema, CreateSourceConnection, CreateSourceStatement,
    CreateSubsourceStatement, CsrConnectionAvro, CsrConnectionProtobuf, ExternalReferenceExport,
    ExternalReferences, Format, FormatSpecifier, ProtobufSchema, Value, WithOptionValue,
};
use crate::catalog::{CatalogItemType, SessionCatalog};
use crate::kafka_util::{KafkaSinkConfigOptionExtracted, KafkaSourceConfigOptionExtracted};
use crate::names::{
    Aug, FullItemName, PartialItemName, ResolvedColumnReference, ResolvedDataType, ResolvedIds,
    ResolvedItemName,
};
use crate::plan::error::PlanError;
use crate::plan::statement::ddl::load_generator_ast_to_generator;
use crate::plan::{SourceReferences, StatementContext};
use crate::pure::error::SqlServerSourcePurificationError;
use crate::session::vars::ENABLE_SQL_SERVER_SOURCE;
use crate::{kafka_util, normalize};

use self::error::{
    CsrPurificationError, KafkaSinkPurificationError, KafkaSourcePurificationError,
    LoadGeneratorSourcePurificationError, MySqlSourcePurificationError, PgSourcePurificationError,
};

pub(crate) mod error;
mod references;

pub mod mysql;
pub mod postgres;
pub mod sql_server;

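/// A source export (e.g. a subsource or table) requested during purification,
/// pairing the external reference (the object's name in the upstream system)
/// with the name it will be created under in Materialize, plus source-specific
/// metadata `T` gathered while purifying.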
pub(crate) struct RequestedSourceExport<T> {
    external_reference: UnresolvedItemName,
    name: UnresolvedItemName,
    meta: T,
}

impl<T: fmt::Debug> fmt::Debug for RequestedSourceExport<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("RequestedSourceExport")
            .field("external_reference", &self.external_reference)
            .field("name", &self.name)
            .field("meta", &self.meta)
            .finish()
    }
}

impl<T> RequestedSourceExport<T> {
    /// Replaces this export's metadata with `new_meta`, preserving the external
    /// reference and name.
    fn change_meta<F>(self, new_meta: F) -> RequestedSourceExport<F> {
        RequestedSourceExport {
            external_reference: self.external_reference,
            name: self.name,
            meta: new_meta,
        }
    }
}

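/// Generates a name for a source export by swapping the item part of
/// `source_name` for `subsource_name`, preserving the database and schema
/// qualification. As an illustrative example, a source named `db.schema.src`
/// and a subsource name `"tbl"` yield `db.schema.tbl`.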
fn source_export_name_gen(
    source_name: &UnresolvedItemName,
    subsource_name: &str,
) -> Result<UnresolvedItemName, PlanError> {
    let mut partial = normalize::unresolved_item_name(source_name.clone())?;
    partial.item = subsource_name.to_string();
    Ok(UnresolvedItemName::from(partial))
}

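/// Validates that the requested source exports are distinct: no two exports may
/// share a Materialize-side name, and no two exports may reference the same
/// upstream object.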
fn validate_source_export_names<T>(
    requested_source_exports: &[RequestedSourceExport<T>],
) -> Result<(), PlanError> {
    // Check for any exports that would be created with the same name.
    if let Some(name) = requested_source_exports
        .iter()
        .map(|subsource| &subsource.name)
        .duplicates()
        .next()
        .cloned()
    {
        let mut upstream_references: Vec<_> = requested_source_exports
            .iter()
            .filter_map(|subsource| {
                if subsource.name == name {
                    Some(subsource.external_reference.clone())
                } else {
                    None
                }
            })
            .collect();

        upstream_references.sort();

        Err(PlanError::SubsourceNameConflict {
            name,
            upstream_references,
        })?;
    }

    // Check for any exports that reference the same upstream object.
    if let Some(name) = requested_source_exports
        .iter()
        .map(|export| &export.external_reference)
        .duplicates()
        .next()
        .cloned()
    {
        let mut target_names: Vec<_> = requested_source_exports
            .iter()
            .filter_map(|export| {
                if export.external_reference == name {
                    Some(export.name.clone())
                } else {
                    None
                }
            })
            .collect();

        target_names.sort();

        Err(PlanError::SubsourceDuplicateReference { name, target_names })?;
    }

    Ok(())
}

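/// The result of purification: a statement (and any derived statements) with
/// all external dependencies resolved, ready for planning.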
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PurifiedStatement {
    PurifiedCreateSource {
        create_progress_subsource_stmt: CreateSubsourceStatement<Aug>,
        create_source_stmt: CreateSourceStatement<Aug>,
        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
        available_source_references: SourceReferences,
    },
    PurifiedAlterSource {
        alter_source_stmt: AlterSourceStatement<Aug>,
    },
    PurifiedAlterSourceAddSubsources {
        source_name: ResolvedItemName,
        options: Vec<AlterSourceAddSubsourceOption<Aug>>,
        subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
    },
    PurifiedAlterSourceRefreshReferences {
        source_name: ResolvedItemName,
        available_source_references: SourceReferences,
    },
    PurifiedCreateSink(CreateSinkStatement<Aug>),
    PurifiedCreateTableFromSource {
        stmt: CreateTableFromSourceStatement<Aug>,
    },
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PurifiedSourceExport {
    pub external_reference: UnresolvedItemName,
    pub details: PurifiedExportDetails,
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PurifiedExportDetails {
    MySql {
        table: MySqlTableDesc,
        text_columns: Option<Vec<Ident>>,
        exclude_columns: Option<Vec<Ident>>,
        initial_gtid_set: String,
    },
    Postgres {
        table: PostgresTableDesc,
        text_columns: Option<Vec<Ident>>,
    },
    SqlServer {
        table: SqlServerTableDesc,
        text_columns: Option<Vec<Ident>>,
        excl_columns: Option<Vec<Ident>>,
        capture_instance: Arc<str>,
    },
    Kafka {},
    LoadGenerator {
        table: Option<RelationDesc>,
        output: LoadGeneratorOutput,
    },
}

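/// Purifies a statement, performing the asynchronous, fallible work (connecting
/// to upstream systems, fetching schemas, validating configuration) up front so
/// that planning itself can stay synchronous. Returns the purification result
/// along with the ID of the cluster the statement runs in, if any.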
pub async fn purify_statement(
    catalog: impl SessionCatalog,
    now: u64,
    stmt: Statement<Aug>,
    storage_configuration: &StorageConfiguration,
) -> (Result<PurifiedStatement, PlanError>, Option<ClusterId>) {
    match stmt {
        Statement::CreateSource(stmt) => {
            let cluster_id = stmt.in_cluster.as_ref().map(|cluster| cluster.id.clone());
            (
                purify_create_source(catalog, now, stmt, storage_configuration).await,
                cluster_id,
            )
        }
        Statement::AlterSource(stmt) => (
            purify_alter_source(catalog, stmt, storage_configuration).await,
            None,
        ),
        Statement::CreateSink(stmt) => {
            let cluster_id = stmt.in_cluster.as_ref().map(|cluster| cluster.id.clone());
            (
                purify_create_sink(catalog, stmt, storage_configuration).await,
                cluster_id,
            )
        }
        Statement::CreateTableFromSource(stmt) => (
            purify_create_table_from_source(catalog, stmt, storage_configuration).await,
            None,
        ),
        o => unreachable!("{:?} does not need to be purified", o),
    }
}

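/// Adds `DOC ON` options to the Avro schema registry formats of a `CREATE SINK`
/// statement, derived from catalog comments on the sink's input relation and
/// the items it references. User-provided `DOC ON` options take precedence.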
pub(crate) fn purify_create_sink_avro_doc_on_options(
    catalog: &dyn SessionCatalog,
    from_id: CatalogItemId,
    format: &mut Option<FormatSpecifier<Aug>>,
) -> Result<(), PlanError> {
    let from = catalog.get_item(&from_id);
    let object_ids = from
        .references()
        .items()
        .copied()
        .chain_one(from.id())
        .collect::<Vec<_>>();

    // Collect the option lists of each Avro CSR format in the statement.
    let mut avro_format_options = vec![];
    for_each_format(format, |doc_on_schema, fmt| match fmt {
        Format::Avro(AvroSchema::InlineSchema { .. })
        | Format::Bytes
        | Format::Csv { .. }
        | Format::Json { .. }
        | Format::Protobuf(..)
        | Format::Regex(..)
        | Format::Text => (),
        Format::Avro(AvroSchema::Csr {
            csr_connection: CsrConnectionAvro { connection, .. },
        }) => {
            avro_format_options.push((doc_on_schema, &mut connection.options));
        }
    });

    for (for_schema, options) in avro_format_options {
        // Collect any `DOC ON` options the user provided explicitly, so that we
        // never override them.
        let user_provided_comments = options
            .iter()
            .filter_map(|CsrConfigOption { name, .. }| match name {
                CsrConfigOptionName::AvroDocOn(doc_on) => Some(doc_on.clone()),
                _ => None,
            })
            .collect::<BTreeSet<_>>();

        for object_id in &object_ids {
            let item = catalog
                .get_item(object_id)
                .at_version(RelationVersionSelector::Latest);
            let full_name = catalog.resolve_full_name(item.name());
            let full_resolved_name = ResolvedItemName::Item {
                id: *object_id,
                qualifiers: item.name().qualifiers.clone(),
                full_name: full_name.clone(),
                print_id: !matches!(
                    item.item_type(),
                    CatalogItemType::Func | CatalogItemType::Type
                ),
                version: RelationVersionSelector::Latest,
            };

            if let Some(comments_map) = catalog.get_item_comments(object_id) {
                // Attach the object-level comment, if one exists.
                let doc_on_item_key = AvroDocOn {
                    identifier: DocOnIdentifier::Type(full_resolved_name.clone()),
                    for_schema,
                };
                if !user_provided_comments.contains(&doc_on_item_key) {
                    if let Some(root_comment) = comments_map.get(&None) {
                        options.push(CsrConfigOption {
                            name: CsrConfigOptionName::AvroDocOn(doc_on_item_key),
                            value: Some(mz_sql_parser::ast::WithOptionValue::Value(Value::String(
                                root_comment.clone(),
                            ))),
                        });
                    }
                }

                // Attach column-level comments; comment positions are 1-based
                // while column indexes are 0-based.
                if let Ok(desc) = item.desc(&full_name) {
                    for (pos, column_name) in desc.iter_names().enumerate() {
                        let comment = comments_map.get(&Some(pos + 1));
                        if let Some(comment_str) = comment {
                            let doc_on_column_key = AvroDocOn {
                                identifier: DocOnIdentifier::Column(ColumnName {
                                    relation: full_resolved_name.clone(),
                                    column: ResolvedColumnReference::Column {
                                        name: column_name.to_owned(),
                                        index: pos,
                                    },
                                }),
                                for_schema,
                            };
                            if !user_provided_comments.contains(&doc_on_column_key) {
                                options.push(CsrConfigOption {
                                    name: CsrConfigOptionName::AvroDocOn(doc_on_column_key),
                                    value: Some(mz_sql_parser::ast::WithOptionValue::Value(
                                        Value::String(comment_str.clone()),
                                    )),
                                });
                            }
                        }
                    }
                }
            }
        }
    }

    Ok(())
}

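/// Purifies a `CREATE SINK` statement: rejects unsupported `WITH` options,
/// verifies that the Kafka cluster and any Confluent Schema Registry
/// connections are reachable, and fills in Avro `DOC ON` options from catalog
/// comments.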
async fn purify_create_sink(
    catalog: impl SessionCatalog,
    mut create_sink_stmt: CreateSinkStatement<Aug>,
    storage_configuration: &StorageConfiguration,
) -> Result<PurifiedStatement, PlanError> {
    let CreateSinkStatement {
        connection,
        format,
        with_options,
        name: _,
        in_cluster: _,
        if_not_exists: _,
        from,
        envelope: _,
    } = &mut create_sink_stmt;

    const USER_ALLOWED_WITH_OPTIONS: &[CreateSinkOptionName] = &[CreateSinkOptionName::Snapshot];

    if let Some(op) = with_options
        .iter()
        .find(|op| !USER_ALLOWED_WITH_OPTIONS.contains(&op.name))
    {
        sql_bail!(
            "CREATE SINK...WITH ({}..) is not allowed",
            op.name.to_ast_string_simple(),
        )
    }

    match &connection {
        CreateSinkConnection::Kafka {
            connection,
            options,
            key: _,
            headers: _,
        } => {
            let scx = StatementContext::new(None, &catalog);
            let connection = {
                let item = scx.get_item_by_resolved_name(connection)?;
                match item.connection()? {
                    Connection::Kafka(connection) => {
                        connection.clone().into_inline_connection(scx.catalog)
                    }
                    _ => sql_bail!(
                        "{} is not a kafka connection",
                        scx.catalog.resolve_full_name(item.name())
                    ),
                }
            };

            let extracted_options: KafkaSinkConfigOptionExtracted = options.clone().try_into()?;

            if extracted_options.legacy_ids == Some(true) {
                sql_bail!("LEGACY IDs option is not supported");
            }

            let client: AdminClient<_> = connection
                .create_with_context(
                    storage_configuration,
                    MzClientContext::default(),
                    &BTreeMap::new(),
                    InTask::No,
                )
                .await
                .map_err(|e| KafkaSinkPurificationError::AdminClientError(Arc::new(e)))?;

            let metadata = client
                .inner()
                .fetch_metadata(
                    None,
                    storage_configuration
                        .parameters
                        .kafka_timeout_config
                        .fetch_metadata_timeout,
                )
                .map_err(|e| {
                    KafkaSinkPurificationError::AdminClientError(Arc::new(
                        ContextCreationError::KafkaError(e),
                    ))
                })?;

            if metadata.brokers().is_empty() {
                Err(KafkaSinkPurificationError::ZeroBrokers)?;
            }
        }
    }

    let mut csr_connection_ids = BTreeSet::new();
    for_each_format(format, |_, fmt| match fmt {
        Format::Avro(AvroSchema::InlineSchema { .. })
        | Format::Bytes
        | Format::Csv { .. }
        | Format::Json { .. }
        | Format::Protobuf(ProtobufSchema::InlineSchema { .. })
        | Format::Regex(..)
        | Format::Text => (),
        Format::Avro(AvroSchema::Csr {
            csr_connection: CsrConnectionAvro { connection, .. },
        })
        | Format::Protobuf(ProtobufSchema::Csr {
            csr_connection: CsrConnectionProtobuf { connection, .. },
        }) => {
            csr_connection_ids.insert(*connection.connection.item_id());
        }
    });

    let scx = StatementContext::new(None, &catalog);
    for csr_connection_id in csr_connection_ids {
        let connection = {
            let item = scx.get_item(&csr_connection_id);
            match item.connection()? {
                Connection::Csr(connection) => connection.clone().into_inline_connection(&catalog),
                _ => Err(CsrPurificationError::NotCsrConnection(
                    scx.catalog.resolve_full_name(item.name()),
                ))?,
            }
        };

        let client = connection
            .connect(storage_configuration, InTask::No)
            .await
            .map_err(|e| CsrPurificationError::ClientError(Arc::new(e)))?;

        // Confirm that the schema registry is reachable.
        client
            .list_subjects()
            .await
            .map_err(|e| CsrPurificationError::ListSubjectsError(Arc::new(e)))?;
    }

    purify_create_sink_avro_doc_on_options(&catalog, *from.item_id(), format)?;

    Ok(PurifiedStatement::PurifiedCreateSink(create_sink_stmt))
}

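/// Applies `f` to each format in `format`, along with a `DocOnSchema`
/// indicating whether that format describes the key, the value, or both.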
fn for_each_format<'a, F>(format: &'a mut Option<FormatSpecifier<Aug>>, mut f: F)
where
    F: FnMut(DocOnSchema, &'a mut Format<Aug>),
{
    match format {
        None => (),
        Some(FormatSpecifier::Bare(fmt)) => f(DocOnSchema::All, fmt),
        Some(FormatSpecifier::KeyValue { key, value }) => {
            f(DocOnSchema::KeyOnly, key);
            f(DocOnSchema::ValueOnly, value);
        }
    }
}

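/// How a `CREATE SOURCE` statement may use external references (i.e.
/// subsources), as determined by the `enable_create_table_from_source` and
/// `force_source_table_syntax` system variables.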
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub(crate) enum SourceReferencePolicy {
    /// External references are not allowed; tables must be used instead.
    NotAllowed,
    /// External references may be specified, but are not required.
    Optional,
    /// External references must be specified for multi-output sources.
    Required,
}

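/// Purifies a `CREATE SOURCE` statement: validates the upstream system's
/// configuration, resolves the requested external references into subsources,
/// rewrites `WITH` options with normalized and generated values (e.g. the
/// encoded `DETAILS` option), and generates the progress subsource statement.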
async fn purify_create_source(
    catalog: impl SessionCatalog,
    now: u64,
    mut create_source_stmt: CreateSourceStatement<Aug>,
    storage_configuration: &StorageConfiguration,
) -> Result<PurifiedStatement, PlanError> {
    let CreateSourceStatement {
        name: source_name,
        connection: source_connection,
        format,
        envelope,
        include_metadata,
        external_references,
        progress_subsource,
        ..
    } = &mut create_source_stmt;

    if let Some(DeferredItemName::Named(_)) = progress_subsource {
        sql_bail!("Cannot manually ID qualify progress subsource")
    }

    let mut requested_subsource_map = BTreeMap::new();

    let progress_desc = match &source_connection {
        CreateSourceConnection::Kafka { .. } => {
            &mz_storage_types::sources::kafka::KAFKA_PROGRESS_DESC
        }
        CreateSourceConnection::Postgres { .. } | CreateSourceConnection::Yugabyte { .. } => {
            &mz_storage_types::sources::postgres::PG_PROGRESS_DESC
        }
        CreateSourceConnection::SqlServer { .. } => {
            &mz_storage_types::sources::sql_server::SQL_SERVER_PROGRESS_DESC
        }
        CreateSourceConnection::MySql { .. } => {
            &mz_storage_types::sources::mysql::MYSQL_PROGRESS_DESC
        }
        CreateSourceConnection::LoadGenerator { .. } => {
            &mz_storage_types::sources::load_generator::LOAD_GEN_PROGRESS_DESC
        }
    };
    let scx = StatementContext::new(None, &catalog);

    // Determine which external-reference policy applies, based on whether the
    // source-table syntax is enabled and/or forced.
    let reference_policy = if scx.catalog.system_vars().enable_create_table_from_source()
        && scx.catalog.system_vars().force_source_table_syntax()
    {
        SourceReferencePolicy::NotAllowed
    } else if scx.catalog.system_vars().enable_create_table_from_source() {
        SourceReferencePolicy::Optional
    } else {
        SourceReferencePolicy::Required
    };

    let mut format_options = SourceFormatOptions::Default;

    let retrieved_source_references: RetrievedSourceReferences;

    match source_connection {
        CreateSourceConnection::Kafka {
            connection,
            options: base_with_options,
            ..
        } => {
            if let Some(external_references) = external_references {
                Err(KafkaSourcePurificationError::ReferencedSubsources(
                    external_references.clone(),
                ))?;
            }

            let connection = {
                let item = scx.get_item_by_resolved_name(connection)?;
                match item.connection()? {
                    Connection::Kafka(connection) => {
                        connection.clone().into_inline_connection(&catalog)
                    }
                    _ => Err(KafkaSourcePurificationError::NotKafkaConnection(
                        scx.catalog.resolve_full_name(item.name()),
                    ))?,
                }
            };

            let extracted_options: KafkaSourceConfigOptionExtracted =
                base_with_options.clone().try_into()?;

            let topic = extracted_options
                .topic
                .ok_or(KafkaSourcePurificationError::ConnectionMissingTopic)?;

            let consumer = connection
                .create_with_context(
                    storage_configuration,
                    MzClientContext::default(),
                    &BTreeMap::new(),
                    InTask::No,
                )
                .await
                .map_err(|e| {
                    KafkaSourcePurificationError::KafkaConsumerError(
                        e.display_with_causes().to_string(),
                    )
                })?;
            let consumer = Arc::new(consumer);

            match (
                extracted_options.start_offset,
                extracted_options.start_timestamp,
            ) {
                (None, None) => {
                    // Validate that the topic at least exists.
                    kafka_util::ensure_topic_exists(
                        Arc::clone(&consumer),
                        &topic,
                        storage_configuration
                            .parameters
                            .kafka_timeout_config
                            .fetch_metadata_timeout,
                    )
                    .await?;
                }
                (Some(_), Some(_)) => {
                    sql_bail!("cannot specify START TIMESTAMP and START OFFSET at same time")
                }
                (Some(start_offsets), None) => {
                    kafka_util::validate_start_offsets(
                        Arc::clone(&consumer),
                        &topic,
                        start_offsets,
                        storage_configuration
                            .parameters
                            .kafka_timeout_config
                            .fetch_metadata_timeout,
                    )
                    .await?;
                }
                (None, Some(time_offset)) => {
                    // Resolve the requested timestamp to concrete start offsets
                    // and rewrite the statement to use them.
                    let start_offsets = kafka_util::lookup_start_offsets(
                        Arc::clone(&consumer),
                        &topic,
                        time_offset,
                        now,
                        storage_configuration
                            .parameters
                            .kafka_timeout_config
                            .fetch_metadata_timeout,
                    )
                    .await?;

                    base_with_options.retain(|val| {
                        !matches!(val.name, KafkaSourceConfigOptionName::StartTimestamp)
                    });
                    base_with_options.push(KafkaSourceConfigOption {
                        name: KafkaSourceConfigOptionName::StartOffset,
                        value: Some(WithOptionValue::Sequence(
                            start_offsets
                                .iter()
                                .map(|offset| {
                                    WithOptionValue::Value(Value::Number(offset.to_string()))
                                })
                                .collect(),
                        )),
                    });
                }
            }

            let reference_client = SourceReferenceClient::Kafka { topic: &topic };
            retrieved_source_references = reference_client.get_source_references().await?;

            format_options = SourceFormatOptions::Kafka { topic };
        }
        source_connection @ CreateSourceConnection::Postgres { .. }
        | source_connection @ CreateSourceConnection::Yugabyte { .. } => {
            let (source_flavor, connection, options) = match source_connection {
                CreateSourceConnection::Postgres {
                    connection,
                    options,
                } => (PostgresFlavor::Vanilla, connection, options),
                CreateSourceConnection::Yugabyte {
                    connection,
                    options,
                } => (PostgresFlavor::Yugabyte, connection, options),
                _ => unreachable!(),
            };
            let connection = {
                let item = scx.get_item_by_resolved_name(connection)?;
                match item.connection().map_err(PlanError::from)? {
                    Connection::Postgres(connection) => {
                        let connection = connection.clone().into_inline_connection(&catalog);
                        if connection.flavor != source_flavor {
                            match source_flavor {
                                PostgresFlavor::Vanilla => {
                                    Err(PgSourcePurificationError::NotPgConnection(
                                        scx.catalog.resolve_full_name(item.name()),
                                    ))
                                }
                                PostgresFlavor::Yugabyte => {
                                    Err(PgSourcePurificationError::NotYugabyteConnection(
                                        scx.catalog.resolve_full_name(item.name()),
                                    ))
                                }
                            }
                        } else {
                            Ok(connection)
                        }
                    }
                    _ => match source_flavor {
                        PostgresFlavor::Vanilla => Err(PgSourcePurificationError::NotPgConnection(
                            scx.catalog.resolve_full_name(item.name()),
                        )),
                        PostgresFlavor::Yugabyte => {
                            Err(PgSourcePurificationError::NotYugabyteConnection(
                                scx.catalog.resolve_full_name(item.name()),
                            ))
                        }
                    },
                }
            }?;
            let crate::plan::statement::PgConfigOptionExtracted {
                publication,
                text_columns,
                details,
                ..
            } = options.clone().try_into()?;
            let publication =
                publication.ok_or(PgSourcePurificationError::ConnectionMissingPublication)?;

            if details.is_some() {
                Err(PgSourcePurificationError::UserSpecifiedDetails)?;
            }

            // Verify that we can connect to the upstream PostgreSQL server and
            // that it is configured for logical replication.
            let config = connection
                .config(
                    &storage_configuration.connection_context.secrets_reader,
                    storage_configuration,
                    InTask::No,
                )
                .await?;

            let client = config
                .connect(
                    "postgres_purification",
                    &storage_configuration.connection_context.ssh_tunnel_manager,
                )
                .await?;

            let wal_level = mz_postgres_util::get_wal_level(&client).await?;

            if wal_level < WalLevel::Logical {
                Err(PgSourcePurificationError::InsufficientWalLevel { wal_level })?;
            }

            let max_wal_senders = mz_postgres_util::get_max_wal_senders(&client).await?;

            if max_wal_senders < 1 {
                Err(PgSourcePurificationError::ReplicationDisabled)?;
            }

            let available_replication_slots =
                mz_postgres_util::available_replication_slots(&client).await?;

            // The source requires two replication slots: one for the ongoing
            // replication stream and one used while snapshotting.
            if available_replication_slots < 2 {
                Err(PgSourcePurificationError::InsufficientReplicationSlotsAvailable { count: 2 })?;
            }

            let reference_client = SourceReferenceClient::Postgres {
                client: &client,
                publication: &publication,
                database: &connection.database,
            };
            retrieved_source_references = reference_client.get_source_references().await?;

            let postgres::PurifiedSourceExports {
                source_exports: subsources,
                normalized_text_columns,
            } = postgres::purify_source_exports(
                &client,
                &config,
                &retrieved_source_references,
                external_references,
                text_columns,
                source_name,
                &reference_policy,
            )
            .await?;

            if let Some(text_cols_option) = options
                .iter_mut()
                .find(|option| option.name == PgConfigOptionName::TextColumns)
            {
                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
            }

            requested_subsource_map.extend(subsources);

            // Record the Postgres timeline ID as of purification time.
            let replication_client = config
                .connect_replication(&storage_configuration.connection_context.ssh_tunnel_manager)
                .await?;
            let timeline_id = mz_postgres_util::get_timeline_id(&replication_client).await?;

            // Generate the replication slot details and store them in the
            // statement's `DETAILS` option.
            options.retain(|PgConfigOption { name, .. }| name != &PgConfigOptionName::Details);
            let details = PostgresSourcePublicationDetails {
                slot: format!(
                    "materialize_{}",
                    Uuid::new_v4().to_string().replace('-', "")
                ),
                timeline_id: Some(timeline_id),
                database: connection.database,
            };
            options.push(PgConfigOption {
                name: PgConfigOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    details.into_proto().encode_to_vec(),
                )))),
            })
        }
        CreateSourceConnection::SqlServer {
            connection,
            options,
        } => {
            scx.require_feature_flag(&ENABLE_SQL_SERVER_SOURCE)?;

            let connection_item = scx.get_item_by_resolved_name(connection)?;
            let connection = match connection_item.connection()? {
                Connection::SqlServer(connection) => {
                    connection.clone().into_inline_connection(&catalog)
                }
                _ => Err(SqlServerSourcePurificationError::NotSqlServerConnection(
                    scx.catalog.resolve_full_name(connection_item.name()),
                ))?,
            };
            let crate::plan::statement::ddl::SqlServerConfigOptionExtracted {
                details,
                text_columns,
                exclude_columns,
                seen: _,
            } = options.clone().try_into()?;

            if details.is_some() {
                Err(SqlServerSourcePurificationError::UserSpecifiedDetails)?;
            }

            let config = connection
                .resolve_config(
                    &storage_configuration.connection_context.secrets_reader,
                    storage_configuration,
                    InTask::No,
                )
                .await?;
            let (mut client, conn) = mz_sql_server_util::Client::connect(config).await?;
            // Drive the connection on a background task for the duration of
            // purification.
            mz_ore::task::spawn(|| "sql-server-conn", async move { conn.await });

            // Ensure the upstream instance has CDC and snapshot isolation
            // enabled, collecting all setting violations so they can be
            // reported together.
            let mut replication_errors = vec![];
            for error in [
                mz_sql_server_util::inspect::ensure_database_cdc_enabled(&mut client).await,
                mz_sql_server_util::inspect::ensure_snapshot_isolation_enabled(&mut client).await,
            ] {
                match error {
                    Err(mz_sql_server_util::SqlServerError::InvalidSystemSetting {
                        name,
                        expected,
                        actual,
                    }) => replication_errors.push((name, expected, actual)),
                    Err(other) => Err(other)?,
                    Ok(()) => (),
                }
            }
            if !replication_errors.is_empty() {
                Err(SqlServerSourcePurificationError::ReplicationSettingsError(
                    replication_errors,
                ))?;
            }

            let database: Arc<str> = connection.database.into();
            let reference_client = SourceReferenceClient::SqlServer {
                client: &mut client,
                database: Arc::clone(&database),
            };
            retrieved_source_references = reference_client.get_source_references().await?;
            tracing::debug!(?retrieved_source_references, "got source references");

            let purified_source_exports = sql_server::purify_source_exports(
                &*database,
                &mut client,
                &retrieved_source_references,
                external_references,
                &text_columns,
                &exclude_columns,
                source_name,
                &reference_policy,
            )
            .await?;

            let sql_server::PurifiedSourceExports {
                source_exports,
                normalized_text_columns,
                normalized_excl_columns,
            } = purified_source_exports;

            requested_subsource_map.extend(source_exports);

            // Remove any existing `DETAILS` option and store the newly
            // generated details.
            let details = SqlServerSourceExtras {};
            options.retain(|SqlServerConfigOption { name, .. }| {
                name != &SqlServerConfigOptionName::Details
            });
            options.push(SqlServerConfigOption {
                name: SqlServerConfigOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    details.into_proto().encode_to_vec(),
                )))),
            });

            // Rewrite the column options to their normalized forms.
            if let Some(text_cols_option) = options
                .iter_mut()
                .find(|option| option.name == SqlServerConfigOptionName::TextColumns)
            {
                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
            }
            if let Some(excl_cols_option) = options
                .iter_mut()
                .find(|option| option.name == SqlServerConfigOptionName::ExcludeColumns)
            {
                excl_cols_option.value = Some(WithOptionValue::Sequence(normalized_excl_columns));
            }
        }
        CreateSourceConnection::MySql {
            connection,
            options,
        } => {
            let connection_item = scx.get_item_by_resolved_name(connection)?;
            let connection = match connection_item.connection()? {
                Connection::MySql(connection) => {
                    connection.clone().into_inline_connection(&catalog)
                }
                _ => Err(MySqlSourcePurificationError::NotMySqlConnection(
                    scx.catalog.resolve_full_name(connection_item.name()),
                ))?,
            };
            let crate::plan::statement::ddl::MySqlConfigOptionExtracted {
                details,
                text_columns,
                exclude_columns,
                seen: _,
            } = options.clone().try_into()?;

            if details.is_some() {
                Err(MySqlSourcePurificationError::UserSpecifiedDetails)?;
            }

            let config = connection
                .config(
                    &storage_configuration.connection_context.secrets_reader,
                    storage_configuration,
                    InTask::No,
                )
                .await?;

            let mut conn = config
                .connect(
                    "mysql purification",
                    &storage_configuration.connection_context.ssh_tunnel_manager,
                )
                .await?;

            // Check that the server is configured for GTID-based replication,
            // collecting all setting violations so they can be reported
            // together.
            let mut replication_errors = vec![];
            for error in [
                mz_mysql_util::ensure_gtid_consistency(&mut conn)
                    .await
                    .err(),
                mz_mysql_util::ensure_full_row_binlog_format(&mut conn)
                    .await
                    .err(),
                mz_mysql_util::ensure_replication_commit_order(&mut conn)
                    .await
                    .err(),
            ] {
                match error {
                    Some(mz_mysql_util::MySqlError::InvalidSystemSetting {
                        setting,
                        expected,
                        actual,
                    }) => {
                        replication_errors.push((setting, expected, actual));
                    }
                    Some(err) => Err(err)?,
                    None => (),
                }
            }
            if !replication_errors.is_empty() {
                Err(MySqlSourcePurificationError::ReplicationSettingsError(
                    replication_errors,
                ))?;
            }

            // Record the current GTID set, which is used as the initial resume
            // point for the requested subsources.
            let initial_gtid_set =
                mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?;

            let reference_client = SourceReferenceClient::MySql {
                conn: &mut conn,
                include_system_schemas: mysql::references_system_schemas(external_references),
            };
            retrieved_source_references = reference_client.get_source_references().await?;

            let mysql::PurifiedSourceExports {
                source_exports: subsources,
                normalized_text_columns,
                normalized_exclude_columns,
            } = mysql::purify_source_exports(
                &mut conn,
                &retrieved_source_references,
                external_references,
                text_columns,
                exclude_columns,
                source_name,
                initial_gtid_set.clone(),
                &reference_policy,
            )
            .await?;
            requested_subsource_map.extend(subsources);

            // Remove any existing `DETAILS` option and store the newly
            // generated details.
            let details = MySqlSourceDetails {};
            options
                .retain(|MySqlConfigOption { name, .. }| name != &MySqlConfigOptionName::Details);
            options.push(MySqlConfigOption {
                name: MySqlConfigOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    details.into_proto().encode_to_vec(),
                )))),
            });

            if let Some(text_cols_option) = options
                .iter_mut()
                .find(|option| option.name == MySqlConfigOptionName::TextColumns)
            {
                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
            }
            if let Some(ignore_cols_option) = options
                .iter_mut()
                .find(|option| option.name == MySqlConfigOptionName::ExcludeColumns)
            {
                ignore_cols_option.value =
                    Some(WithOptionValue::Sequence(normalized_exclude_columns));
            }
        }
        CreateSourceConnection::LoadGenerator { generator, options } => {
            let load_generator =
                load_generator_ast_to_generator(&scx, generator, options, include_metadata)?;

            let reference_client = SourceReferenceClient::LoadGenerator {
                generator: &load_generator,
            };
            retrieved_source_references = reference_client.get_source_references().await?;
            // A multi-output load generator has references beyond the default
            // output, each of which must be created as a subsource.
            let subsource_references = retrieved_source_references
                .all_references()
                .iter()
                .filter(|r| {
                    r.load_generator_output().expect("is loadgen") != &LoadGeneratorOutput::Default
                })
                .collect::<Vec<_>>();

            match external_references {
                Some(requested)
                    if matches!(reference_policy, SourceReferencePolicy::NotAllowed) =>
                {
                    Err(PlanError::UseTablesForSources(requested.to_string()))?
                }
                Some(requested) => {
                    let requested_exports = retrieved_source_references
                        .requested_source_exports(Some(requested), source_name)?;
                    for export in requested_exports {
                        requested_subsource_map.insert(
                            export.name,
                            PurifiedSourceExport {
                                external_reference: export.external_reference,
                                details: PurifiedExportDetails::LoadGenerator {
                                    table: export
                                        .meta
                                        .load_generator_desc()
                                        .expect("is loadgen")
                                        .clone(),
                                    output: export
                                        .meta
                                        .load_generator_output()
                                        .expect("is loadgen")
                                        .clone(),
                                },
                            },
                        );
                    }
                }
                None => {
                    if matches!(reference_policy, SourceReferencePolicy::Required)
                        && !subsource_references.is_empty()
                    {
                        Err(LoadGeneratorSourcePurificationError::MultiOutputRequiresForAllTables)?
                    }
                }
            }

            // The CLOCK load generator defaults its AS OF to the current time
            // when the user did not provide one.
            if let LoadGenerator::Clock = generator {
                if !options
                    .iter()
                    .any(|p| p.name == LoadGeneratorOptionName::AsOf)
                {
                    let now = catalog.now();
                    options.push(LoadGeneratorOption {
                        name: LoadGeneratorOptionName::AsOf,
                        value: Some(WithOptionValue::Value(Value::Number(now.to_string()))),
                    });
                }
            }
        }
    }

    // The requested external references have been resolved into
    // `requested_subsource_map` above, so clear them from the statement.
    *external_references = None;

    // Generate a name for the progress subsource, either from the user's
    // explicit name or by appending `_progress` to the source name.
    let name = match progress_subsource {
        Some(name) => match name {
            DeferredItemName::Deferred(name) => name.clone(),
            DeferredItemName::Named(_) => unreachable!("already checked for this value"),
        },
        None => {
            let (item, prefix) = source_name.0.split_last().unwrap();
            let item_name = Ident::try_generate_name(item.to_string(), "_progress", |candidate| {
                let mut suggested_name = prefix.to_vec();
                suggested_name.push(candidate.clone());

                let partial = normalize::unresolved_item_name(UnresolvedItemName(suggested_name))?;
                let qualified = scx.allocate_qualified_name(partial)?;
                let item_exists = scx.catalog.get_item_by_name(&qualified).is_some();
                let type_exists = scx.catalog.get_type_by_name(&qualified).is_some();
                Ok::<_, PlanError>(!item_exists && !type_exists)
            })?;

            let mut full_name = prefix.to_vec();
            full_name.push(item_name);
            let full_name = normalize::unresolved_item_name(UnresolvedItemName(full_name))?;
            let qualified_name = scx.allocate_qualified_name(full_name)?;
            let full_name = scx.catalog.resolve_full_name(&qualified_name);

            UnresolvedItemName::from(full_name.clone())
        }
    };

    let (columns, constraints) = scx.relation_desc_into_table_defs(progress_desc)?;

    let create_progress_subsource_stmt = CreateSubsourceStatement {
        name,
        columns,
        of_source: None,
        constraints,
        if_not_exists: false,
        with_options: vec![CreateSubsourceOption {
            name: CreateSubsourceOptionName::Progress,
            value: Some(WithOptionValue::Value(Value::Boolean(true))),
        }],
    };

    purify_source_format(
        &catalog,
        format,
        &format_options,
        envelope,
        storage_configuration,
    )
    .await?;

    Ok(PurifiedStatement::PurifiedCreateSource {
        create_progress_subsource_stmt,
        create_source_stmt,
        subsources: requested_subsource_map,
        available_source_references: retrieved_source_references.available_source_references(),
    })
}

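/// Purifies an `ALTER SOURCE` statement. Only the `ADD SUBSOURCES` and
/// `REFRESH REFERENCES` actions require purification; all other actions are
/// passed through unchanged.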
async fn purify_alter_source(
    catalog: impl SessionCatalog,
    stmt: AlterSourceStatement<Aug>,
    storage_configuration: &StorageConfiguration,
) -> Result<PurifiedStatement, PlanError> {
    let scx = StatementContext::new(None, &catalog);
    let AlterSourceStatement {
        source_name: unresolved_source_name,
        action,
        if_exists,
    } = stmt;

    // If the source does not exist and `IF EXISTS` was specified, return the
    // statement unmodified and let planning handle it.
    let item = match scx.resolve_item(RawItemName::Name(unresolved_source_name.clone())) {
        Ok(item) => item,
        Err(_) if if_exists => {
            return Ok(PurifiedStatement::PurifiedAlterSource {
                alter_source_stmt: AlterSourceStatement {
                    source_name: unresolved_source_name,
                    action,
                    if_exists,
                },
            });
        }
        Err(e) => return Err(e),
    };

    let desc = match item.source_desc()? {
        Some(desc) => desc.clone().into_inline_connection(scx.catalog),
        None => {
            sql_bail!("cannot ALTER this type of source")
        }
    };

    let source_name = item.name();

    let resolved_source_name = ResolvedItemName::Item {
        id: item.id(),
        qualifiers: item.name().qualifiers.clone(),
        full_name: scx.catalog.resolve_full_name(source_name),
        print_id: true,
        version: RelationVersionSelector::Latest,
    };

    let partial_name = scx.catalog.minimal_qualification(source_name);

    match action {
        AlterSourceAction::AddSubsources {
            external_references,
            options,
        } => {
            if scx.catalog.system_vars().enable_create_table_from_source()
                && scx.catalog.system_vars().force_source_table_syntax()
            {
                Err(PlanError::UseTablesForSources(
                    "ALTER SOURCE .. ADD SUBSOURCES ..".to_string(),
                ))?;
            }

            purify_alter_source_add_subsources(
                external_references,
                options,
                desc,
                partial_name,
                unresolved_source_name,
                resolved_source_name,
                storage_configuration,
            )
            .await
        }
        AlterSourceAction::RefreshReferences => {
            purify_alter_source_refresh_references(
                desc,
                resolved_source_name,
                storage_configuration,
            )
            .await
        }
        _ => Ok(PurifiedStatement::PurifiedAlterSource {
            alter_source_stmt: AlterSourceStatement {
                source_name: unresolved_source_name,
                action,
                if_exists,
            },
        }),
    }
}

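/// Purifies `ALTER SOURCE .. ADD SUBSOURCES`: connects to the upstream system,
/// resolves the requested external references into purified source exports,
/// and normalizes the statement's `TEXT COLUMNS` and `EXCLUDE COLUMNS` options.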
async fn purify_alter_source_add_subsources(
    external_references: Vec<ExternalReferenceExport>,
    mut options: Vec<AlterSourceAddSubsourceOption<Aug>>,
    desc: SourceDesc,
    partial_source_name: PartialItemName,
    unresolved_source_name: UnresolvedItemName,
    resolved_source_name: ResolvedItemName,
    storage_configuration: &StorageConfiguration,
) -> Result<PurifiedStatement, PlanError> {
    // Only vanilla Postgres, MySQL, and SQL Server sources support adding
    // subsources.
    match desc.connection {
        GenericSourceConnection::Postgres(PostgresSourceConnection {
            connection:
                PostgresConnection {
                    flavor: PostgresFlavor::Vanilla,
                    ..
                },
            ..
        }) => {}
        GenericSourceConnection::MySql(_) => {}
        GenericSourceConnection::SqlServer(_) => {}
        _ => sql_bail!(
            "source {} does not support ALTER SOURCE.",
            partial_source_name
        ),
    };

    let connection_name = desc.connection.name();

    let crate::plan::statement::ddl::AlterSourceAddSubsourceOptionExtracted {
        text_columns,
        exclude_columns,
        details,
        seen: _,
    } = options.clone().try_into()?;
    assert_none!(details, "details cannot be explicitly set");

    let mut requested_subsource_map = BTreeMap::new();

    match desc.connection {
        GenericSourceConnection::Postgres(pg_source_connection) => {
            let pg_connection = &pg_source_connection.connection;

            let config = pg_connection
                .config(
                    &storage_configuration.connection_context.secrets_reader,
                    storage_configuration,
                    InTask::No,
                )
                .await?;

            let client = config
                .connect(
                    "postgres_purification",
                    &storage_configuration.connection_context.ssh_tunnel_manager,
                )
                .await?;

            let available_replication_slots =
                mz_postgres_util::available_replication_slots(&client).await?;

            // A replication slot is required while snapshotting the new
            // subsources.
            if available_replication_slots < 1 {
                Err(PgSourcePurificationError::InsufficientReplicationSlotsAvailable { count: 1 })?;
            }

            if !exclude_columns.is_empty() {
                sql_bail!(
                    "{} is a {} source, which does not support EXCLUDE COLUMNS.",
                    partial_source_name,
                    connection_name
                )
            }

            let reference_client = SourceReferenceClient::Postgres {
                client: &client,
                publication: &pg_source_connection.publication,
                database: &pg_connection.database,
            };
            let retrieved_source_references = reference_client.get_source_references().await?;

            let postgres::PurifiedSourceExports {
                source_exports: subsources,
                normalized_text_columns,
            } = postgres::purify_source_exports(
                &client,
                &config,
                &retrieved_source_references,
                &Some(ExternalReferences::SubsetTables(external_references)),
                text_columns,
                &unresolved_source_name,
                &SourceReferencePolicy::Required,
            )
            .await?;

            if let Some(text_cols_option) = options
                .iter_mut()
                .find(|option| option.name == AlterSourceAddSubsourceOptionName::TextColumns)
            {
                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
            }

            requested_subsource_map.extend(subsources);
        }
        GenericSourceConnection::MySql(mysql_source_connection) => {
            let mysql_connection = &mysql_source_connection.connection;
            let config = mysql_connection
                .config(
                    &storage_configuration.connection_context.secrets_reader,
                    storage_configuration,
                    InTask::No,
                )
                .await?;

            let mut conn = config
                .connect(
                    "mysql purification",
                    &storage_configuration.connection_context.ssh_tunnel_manager,
                )
                .await?;

            // Record the current GTID set as the initial resume point for the
            // new subsources.
            let initial_gtid_set =
                mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?;

            let requested_references = Some(ExternalReferences::SubsetTables(external_references));

            let reference_client = SourceReferenceClient::MySql {
                conn: &mut conn,
                include_system_schemas: mysql::references_system_schemas(&requested_references),
            };
            let retrieved_source_references = reference_client.get_source_references().await?;

            let mysql::PurifiedSourceExports {
                source_exports: subsources,
                normalized_text_columns,
                normalized_exclude_columns,
            } = mysql::purify_source_exports(
                &mut conn,
                &retrieved_source_references,
                &requested_references,
                text_columns,
                exclude_columns,
                &unresolved_source_name,
                initial_gtid_set,
                &SourceReferencePolicy::Required,
            )
            .await?;
            requested_subsource_map.extend(subsources);

            if let Some(text_cols_option) = options
                .iter_mut()
                .find(|option| option.name == AlterSourceAddSubsourceOptionName::TextColumns)
            {
                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
            }
            if let Some(ignore_cols_option) = options
                .iter_mut()
                .find(|option| option.name == AlterSourceAddSubsourceOptionName::ExcludeColumns)
            {
                ignore_cols_option.value =
                    Some(WithOptionValue::Sequence(normalized_exclude_columns));
            }
        }
        GenericSourceConnection::SqlServer(sql_server_source) => {
            let sql_server_connection = &sql_server_source.connection;
            let config = sql_server_connection
                .resolve_config(
                    &storage_configuration.connection_context.secrets_reader,
                    storage_configuration,
                    InTask::No,
                )
                .await?;
            let (mut client, conn) = mz_sql_server_util::Client::connect(config).await?;
            mz_ore::task::spawn(|| "sql-server-connection", async move {
                conn.await;
            });

            let database = sql_server_connection.database.clone().into();
            let source_references = SourceReferenceClient::SqlServer {
                client: &mut client,
                database: Arc::clone(&database),
            }
            .get_source_references()
            .await?;
            let requested_references = Some(ExternalReferences::SubsetTables(external_references));

            let result = sql_server::purify_source_exports(
                &*database,
                &mut client,
                &source_references,
                &requested_references,
                &text_columns,
                &exclude_columns,
                &unresolved_source_name,
                &SourceReferencePolicy::Required,
            )
            .await;
            let sql_server::PurifiedSourceExports {
                source_exports,
                normalized_text_columns,
                normalized_excl_columns,
            } = result?;

            requested_subsource_map.extend(source_exports);

            if let Some(text_cols_option) = options
                .iter_mut()
                .find(|option| option.name == AlterSourceAddSubsourceOptionName::TextColumns)
            {
                text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
            }
            if let Some(ignore_cols_option) = options
                .iter_mut()
                .find(|option| option.name == AlterSourceAddSubsourceOptionName::ExcludeColumns)
            {
                ignore_cols_option.value = Some(WithOptionValue::Sequence(normalized_excl_columns));
            }
        }
        _ => unreachable!(),
    };

    Ok(PurifiedStatement::PurifiedAlterSourceAddSubsources {
        source_name: resolved_source_name,
        options,
        subsources: requested_subsource_map,
    })
}

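/// Purifies `ALTER SOURCE .. REFRESH REFERENCES` by fetching the current set of
/// available external references from the upstream system.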
async fn purify_alter_source_refresh_references(
    desc: SourceDesc,
    resolved_source_name: ResolvedItemName,
    storage_configuration: &StorageConfiguration,
) -> Result<PurifiedStatement, PlanError> {
    let retrieved_source_references = match desc.connection {
        GenericSourceConnection::Postgres(pg_source_connection) => {
            let pg_connection = &pg_source_connection.connection;

            let config = pg_connection
                .config(
                    &storage_configuration.connection_context.secrets_reader,
                    storage_configuration,
                    InTask::No,
                )
                .await?;

            let client = config
                .connect(
                    "postgres_purification",
                    &storage_configuration.connection_context.ssh_tunnel_manager,
                )
                .await?;
            let reference_client = SourceReferenceClient::Postgres {
                client: &client,
                publication: &pg_source_connection.publication,
                database: &pg_connection.database,
            };
            reference_client.get_source_references().await?
        }
        GenericSourceConnection::MySql(mysql_source_connection) => {
            let mysql_connection = &mysql_source_connection.connection;
            let config = mysql_connection
                .config(
                    &storage_configuration.connection_context.secrets_reader,
                    storage_configuration,
                    InTask::No,
                )
                .await?;

            let mut conn = config
                .connect(
                    "mysql purification",
                    &storage_configuration.connection_context.ssh_tunnel_manager,
                )
                .await?;

            let reference_client = SourceReferenceClient::MySql {
                conn: &mut conn,
                include_system_schemas: false,
            };
            reference_client.get_source_references().await?
        }
        GenericSourceConnection::SqlServer(sql_server_source) => {
            let sql_server_connection = &sql_server_source.connection;
            let config = sql_server_connection
                .resolve_config(
                    &storage_configuration.connection_context.secrets_reader,
                    storage_configuration,
                    InTask::No,
                )
                .await?;
            let (mut client, conn) = mz_sql_server_util::Client::connect(config).await?;
            mz_ore::task::spawn(|| "sql-server-connection", async move {
                conn.await;
            });

            let source_references = SourceReferenceClient::SqlServer {
                client: &mut client,
                database: sql_server_connection.database.clone().into(),
            }
            .get_source_references()
            .await?;
            source_references
        }
        GenericSourceConnection::LoadGenerator(load_gen_connection) => {
            let reference_client = SourceReferenceClient::LoadGenerator {
                generator: &load_gen_connection.load_generator,
            };
            reference_client.get_source_references().await?
        }
        GenericSourceConnection::Kafka(kafka_conn) => {
            let reference_client = SourceReferenceClient::Kafka {
                topic: &kafka_conn.topic,
            };
            reference_client.get_source_references().await?
        }
    };
    Ok(PurifiedStatement::PurifiedAlterSourceRefreshReferences {
        source_name: resolved_source_name,
        available_source_references: retrieved_source_references.available_source_references(),
    })
}

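/// Purifies a `CREATE TABLE .. FROM SOURCE` statement: resolves the single
/// requested external reference against the upstream system, generates the
/// table's column and constraint definitions, and rewrites its `WITH` options
/// with normalized and generated values.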
async fn purify_create_table_from_source(
    catalog: impl SessionCatalog,
    mut stmt: CreateTableFromSourceStatement<Aug>,
    storage_configuration: &StorageConfiguration,
) -> Result<PurifiedStatement, PlanError> {
    let scx = StatementContext::new(None, &catalog);
    let CreateTableFromSourceStatement {
        name: _,
        columns,
        constraints,
        source: source_name,
        if_not_exists: _,
        external_reference,
        format,
        envelope,
        include_metadata: _,
        with_options,
    } = &mut stmt;

    // Columns and constraints cannot be specified by the user; they are always
    // generated during purification.
    if matches!(columns, TableFromSourceColumns::Defined(_)) {
        sql_bail!("CREATE TABLE .. FROM SOURCE column definitions cannot be specified directly");
    }
    if !constraints.is_empty() {
        sql_bail!(
            "CREATE TABLE .. FROM SOURCE constraint definitions cannot be specified directly"
        );
    }

    let item = match scx.get_item_by_resolved_name(source_name) {
        Ok(item) => item,
        Err(e) => return Err(e),
    };

    let desc = match item.source_desc()? {
        Some(desc) => desc.clone().into_inline_connection(scx.catalog),
        None => {
            sql_bail!("cannot ALTER this type of source")
        }
    };
    let unresolved_source_name: UnresolvedItemName = source_name.full_item_name().clone().into();
    let qualified_source_name = item.name();
    let connection_name = desc.connection.name();

    let crate::plan::statement::ddl::TableFromSourceOptionExtracted {
        text_columns,
        exclude_columns,
        details,
        partition_by: _,
        seen: _,
    } = with_options.clone().try_into()?;
    assert_none!(details, "details cannot be explicitly set");

    // The user provides unqualified column idents, so qualify them with the
    // external reference before handing them to the per-source purification
    // logic.
    let qualified_text_columns = text_columns
        .iter()
        .map(|col| {
            UnresolvedItemName(
                external_reference
                    .as_ref()
                    .map(|er| er.0.iter().chain_one(col).map(|i| i.clone()).collect())
                    .unwrap_or_else(|| vec![col.clone()]),
            )
        })
        .collect_vec();
    let qualified_exclude_columns = exclude_columns
        .iter()
        .map(|col| {
            UnresolvedItemName(
                external_reference
                    .as_ref()
                    .map(|er| er.0.iter().chain_one(col).map(|i| i.clone()).collect())
                    .unwrap_or_else(|| vec![col.clone()]),
            )
        })
        .collect_vec();

    let mut format_options = SourceFormatOptions::Default;

    let retrieved_source_references: RetrievedSourceReferences;

    let requested_references = external_reference.as_ref().map(|ref_name| {
        ExternalReferences::SubsetTables(vec![ExternalReferenceExport {
            reference: ref_name.clone(),
            alias: None,
        }])
    });

    // Run the purification work specific to each source type: resolve the
    // external reference and obtain the purified export for this table.
    let purified_export = match desc.connection {
        GenericSourceConnection::Postgres(pg_source_connection) => {
            let pg_connection = &pg_source_connection.connection;

            let config = pg_connection
                .config(
                    &storage_configuration.connection_context.secrets_reader,
                    storage_configuration,
                    InTask::No,
                )
                .await?;

            let client = config
                .connect(
                    "postgres_purification",
                    &storage_configuration.connection_context.ssh_tunnel_manager,
                )
                .await?;

            let available_replication_slots =
                mz_postgres_util::available_replication_slots(&client).await?;

            // A replication slot is required while snapshotting the table.
            if available_replication_slots < 1 {
                Err(PgSourcePurificationError::InsufficientReplicationSlotsAvailable { count: 1 })?;
            }

            if !exclude_columns.is_empty() {
                sql_bail!(
                    "{} is a {} source, which does not support EXCLUDE COLUMNS.",
                    scx.catalog.minimal_qualification(qualified_source_name),
                    connection_name
                )
            }

            let reference_client = SourceReferenceClient::Postgres {
                client: &client,
                publication: &pg_source_connection.publication,
                database: &pg_connection.database,
            };
            retrieved_source_references = reference_client.get_source_references().await?;

            let postgres::PurifiedSourceExports {
                source_exports,
                // The normalized text columns are not needed here, since the
                // qualified values were already computed above.
                normalized_text_columns: _,
            } = postgres::purify_source_exports(
                &client,
                &config,
                &retrieved_source_references,
                &requested_references,
                qualified_text_columns,
                &unresolved_source_name,
                &SourceReferencePolicy::Required,
            )
            .await?;
            // There is exactly one source export, since we requested a single
            // external reference.
            let (_, purified_export) = source_exports.into_iter().next().unwrap();
            purified_export
        }
        GenericSourceConnection::MySql(mysql_source_connection) => {
            let mysql_connection = &mysql_source_connection.connection;
            let config = mysql_connection
                .config(
                    &storage_configuration.connection_context.secrets_reader,
                    storage_configuration,
                    InTask::No,
                )
                .await?;

            let mut conn = config
                .connect(
                    "mysql purification",
                    &storage_configuration.connection_context.ssh_tunnel_manager,
                )
                .await?;

            // Record the current GTID set as the table's initial resume point.
            let initial_gtid_set =
                mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?;

            let reference_client = SourceReferenceClient::MySql {
                conn: &mut conn,
                include_system_schemas: mysql::references_system_schemas(&requested_references),
            };
            retrieved_source_references = reference_client.get_source_references().await?;

            let mysql::PurifiedSourceExports {
                source_exports,
                // The normalized column values are not needed here, since the
                // qualified values were already computed above.
                normalized_text_columns: _,
                normalized_exclude_columns: _,
            } = mysql::purify_source_exports(
                &mut conn,
                &retrieved_source_references,
                &requested_references,
                qualified_text_columns,
                qualified_exclude_columns,
                &unresolved_source_name,
                initial_gtid_set,
                &SourceReferencePolicy::Required,
            )
            .await?;
            // There is exactly one source export, since we requested a single
            // external reference.
            let (_, purified_export) = source_exports.into_iter().next().unwrap();
            purified_export
        }
        GenericSourceConnection::SqlServer(_sql_server_source) => {
            // CREATE TABLE .. FROM SOURCE is not yet supported for SQL Server
            // sources.
            return Err(PlanError::Unsupported {
                feature: "CREATE TABLE ... FROM SQL SERVER SOURCE".to_string(),
                discussion_no: None,
            });
        }
        GenericSourceConnection::LoadGenerator(load_gen_connection) => {
            let reference_client = SourceReferenceClient::LoadGenerator {
                generator: &load_gen_connection.load_generator,
            };
            retrieved_source_references = reference_client.get_source_references().await?;

            let requested_exports = retrieved_source_references
                .requested_source_exports(requested_references.as_ref(), &unresolved_source_name)?;
            // There is exactly one source export, since we requested a single
            // external reference.
            let export = requested_exports.into_iter().next().unwrap();
            PurifiedSourceExport {
                external_reference: export.external_reference,
                details: PurifiedExportDetails::LoadGenerator {
                    table: export
                        .meta
                        .load_generator_desc()
                        .expect("is loadgen")
                        .clone(),
                    output: export
                        .meta
                        .load_generator_output()
                        .expect("is loadgen")
                        .clone(),
                },
            }
        }
        GenericSourceConnection::Kafka(kafka_conn) => {
            let reference_client = SourceReferenceClient::Kafka {
                topic: &kafka_conn.topic,
            };
            retrieved_source_references = reference_client.get_source_references().await?;
            let requested_exports = retrieved_source_references
                .requested_source_exports(requested_references.as_ref(), &unresolved_source_name)?;
            // There is exactly one source export, since we requested a single
            // external reference.
            let export = requested_exports.into_iter().next().unwrap();

            format_options = SourceFormatOptions::Kafka {
                topic: kafka_conn.topic.clone(),
            };
            PurifiedSourceExport {
                external_reference: export.external_reference,
                details: PurifiedExportDetails::Kafka {},
            }
        }
    };

2022 purify_source_format(
2023 &catalog,
2024 format,
2025 &format_options,
2026 envelope,
2027 storage_configuration,
2028 )
2029 .await?;
2030
2031 *external_reference = Some(purified_export.external_reference.clone());
2034
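    // Rewrite the statement's columns, constraints, and WITH options to
    // reflect the purified export for this source type.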
    match &purified_export.details {
        PurifiedExportDetails::Postgres { .. } => {
            let mut unsupported_cols = vec![];
            let postgres::PostgresExportStatementValues {
                columns: gen_columns,
                constraints: gen_constraints,
                text_columns: gen_text_columns,
                details: gen_details,
                external_reference: _,
            } = postgres::generate_source_export_statement_values(
                &scx,
                purified_export,
                &mut unsupported_cols,
            )?;
            if !unsupported_cols.is_empty() {
                unsupported_cols.sort();
                Err(PgSourcePurificationError::UnrecognizedTypes {
                    cols: unsupported_cols,
                })?;
            }

            if let Some(text_cols_option) = with_options
                .iter_mut()
                .find(|option| option.name == TableFromSourceOptionName::TextColumns)
            {
                if let Some(gen_text_columns) = gen_text_columns {
                    text_cols_option.value = Some(WithOptionValue::Sequence(gen_text_columns));
                } else {
                    soft_panic_or_log!(
                        "text_columns should be Some if text_cols_option is present"
                    );
                }
            }
            match columns {
                TableFromSourceColumns::Defined(_) => unreachable!(),
                TableFromSourceColumns::NotSpecified => {
                    *columns = TableFromSourceColumns::Defined(gen_columns);
                    *constraints = gen_constraints;
                }
                TableFromSourceColumns::Named(_) => {
                    sql_bail!("columns cannot be named for Postgres sources")
                }
            }
            with_options.push(TableFromSourceOption {
                name: TableFromSourceOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    gen_details.into_proto().encode_to_vec(),
                )))),
            })
        }
        PurifiedExportDetails::MySql { .. } => {
            let mysql::MySqlExportStatementValues {
                columns: gen_columns,
                constraints: gen_constraints,
                text_columns: gen_text_columns,
                exclude_columns: gen_exclude_columns,
                details: gen_details,
                external_reference: _,
            } = mysql::generate_source_export_statement_values(&scx, purified_export)?;

            if let Some(text_cols_option) = with_options
                .iter_mut()
                .find(|option| option.name == TableFromSourceOptionName::TextColumns)
            {
                if let Some(gen_text_columns) = gen_text_columns {
                    text_cols_option.value = Some(WithOptionValue::Sequence(gen_text_columns));
                } else {
                    soft_panic_or_log!(
                        "text_columns should be Some if text_cols_option is present"
                    );
                }
            }
            if let Some(ignore_cols_option) = with_options
                .iter_mut()
                .find(|option| option.name == TableFromSourceOptionName::ExcludeColumns)
            {
                if let Some(gen_exclude_columns) = gen_exclude_columns {
                    ignore_cols_option.value = Some(WithOptionValue::Sequence(gen_exclude_columns));
                } else {
                    soft_panic_or_log!(
                        "exclude_columns should be Some if ignore_cols_option is present"
                    );
                }
            }
            match columns {
                TableFromSourceColumns::Defined(_) => unreachable!(),
                TableFromSourceColumns::NotSpecified => {
                    *columns = TableFromSourceColumns::Defined(gen_columns);
                    *constraints = gen_constraints;
                }
                TableFromSourceColumns::Named(_) => {
                    sql_bail!("columns cannot be named for MySQL sources")
                }
            }
            with_options.push(TableFromSourceOption {
                name: TableFromSourceOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    gen_details.into_proto().encode_to_vec(),
                )))),
            })
        }
        PurifiedExportDetails::SqlServer { .. } => {
            return Err(PlanError::Unsupported {
                feature: "CREATE TABLE ... FROM SQL SERVER SOURCE".to_string(),
                discussion_no: None,
            });
        }
        PurifiedExportDetails::LoadGenerator { .. } => {
            let (desc, output) = match purified_export.details {
                PurifiedExportDetails::LoadGenerator { table, output } => (table, output),
                _ => unreachable!("purified export details must be load generator"),
            };
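            // Only multi-output load generators provide a relation description
            // here; when it is absent, any user-specified columns are left in
            // place.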
            if let Some(desc) = desc {
                let (gen_columns, gen_constraints) = scx.relation_desc_into_table_defs(&desc)?;
                match columns {
                    TableFromSourceColumns::Defined(_) => unreachable!(),
                    TableFromSourceColumns::NotSpecified => {
                        *columns = TableFromSourceColumns::Defined(gen_columns);
                        *constraints = gen_constraints;
                    }
                    TableFromSourceColumns::Named(_) => {
                        sql_bail!("columns cannot be named for multi-output load generator sources")
                    }
                }
            }
            let details = SourceExportStatementDetails::LoadGenerator { output };
            with_options.push(TableFromSourceOption {
                name: TableFromSourceOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    details.into_proto().encode_to_vec(),
                )))),
            })
        }
        PurifiedExportDetails::Kafka {} => {
            let details = SourceExportStatementDetails::Kafka {};
            with_options.push(TableFromSourceOption {
                name: TableFromSourceOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    details.into_proto().encode_to_vec(),
                )))),
            })
        }
    };

    Ok(PurifiedStatement::PurifiedCreateTableFromSource { stmt })
}

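/// Source-type-specific context needed when purifying a format specifier.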
enum SourceFormatOptions {
    Default,
    Kafka { topic: String },
}

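/// Purifies the `FORMAT` clause of a source statement, fetching any schemas
/// that must be inlined (e.g. from the Confluent Schema Registry). `KEY`/`VALUE`
/// format specifiers are rejected for all source types except Kafka.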
async fn purify_source_format(
    catalog: &dyn SessionCatalog,
    format: &mut Option<FormatSpecifier<Aug>>,
    options: &SourceFormatOptions,
    envelope: &Option<SourceEnvelope>,
    storage_configuration: &StorageConfiguration,
) -> Result<(), PlanError> {
    if matches!(format, Some(FormatSpecifier::KeyValue { .. }))
        && !matches!(options, SourceFormatOptions::Kafka { .. })
    {
        sql_bail!("Kafka sources are the only source type that can provide KEY/VALUE formats")
    }

    match format.as_mut() {
        None => {}
        Some(FormatSpecifier::Bare(format)) => {
            purify_source_format_single(catalog, format, options, envelope, storage_configuration)
                .await?;
        }

        Some(FormatSpecifier::KeyValue { key, value: val }) => {
            purify_source_format_single(catalog, key, options, envelope, storage_configuration)
                .await?;
            purify_source_format_single(catalog, val, options, envelope, storage_configuration)
                .await?;
        }
    }
    Ok(())
}

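/// Purifies a single bare format: Avro and Protobuf formats backed by a schema
/// registry have their seeds resolved; all other formats pass through unchanged.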
async fn purify_source_format_single(
    catalog: &dyn SessionCatalog,
    format: &mut Format<Aug>,
    options: &SourceFormatOptions,
    envelope: &Option<SourceEnvelope>,
    storage_configuration: &StorageConfiguration,
) -> Result<(), PlanError> {
    match format {
        Format::Avro(schema) => match schema {
            AvroSchema::Csr { csr_connection } => {
                purify_csr_connection_avro(
                    catalog,
                    options,
                    csr_connection,
                    envelope,
                    storage_configuration,
                )
                .await?
            }
            AvroSchema::InlineSchema { .. } => {}
        },
        Format::Protobuf(schema) => match schema {
            ProtobufSchema::Csr { csr_connection } => {
                purify_csr_connection_proto(
                    catalog,
                    options,
                    csr_connection,
                    envelope,
                    storage_configuration,
                )
                .await?;
            }
            ProtobufSchema::InlineSchema { .. } => {}
        },
        Format::Bytes
        | Format::Regex(_)
        | Format::Json { .. }
        | Format::Text
        | Format::Csv { .. } => (),
    }
    Ok(())
}

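/// Generates `CREATE SUBSOURCE` statements for the given purified exports,
/// dispatching on the source type of the first export (all exports of a
/// source share one type).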
pub fn generate_subsource_statements(
    scx: &StatementContext,
    source_name: ResolvedItemName,
    subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
) -> Result<Vec<CreateSubsourceStatement<Aug>>, PlanError> {
    if subsources.is_empty() {
        return Ok(vec![]);
    }
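    // Inspect the first export to determine the source type.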
    let (_, purified_export) = subsources.iter().next().unwrap();

    let statements = match &purified_export.details {
        PurifiedExportDetails::Postgres { .. } => {
            crate::pure::postgres::generate_create_subsource_statements(
                scx,
                source_name,
                subsources,
            )?
        }
        PurifiedExportDetails::MySql { .. } => {
            crate::pure::mysql::generate_create_subsource_statements(scx, source_name, subsources)?
        }
        PurifiedExportDetails::SqlServer { .. } => {
            crate::pure::sql_server::generate_create_subsource_statements(
                scx,
                source_name,
                subsources,
            )?
        }
        PurifiedExportDetails::LoadGenerator { .. } => {
            let mut subsource_stmts = Vec::with_capacity(subsources.len());
            for (subsource_name, purified_export) in subsources {
                let (desc, output) = match purified_export.details {
                    PurifiedExportDetails::LoadGenerator { table, output } => (table, output),
                    _ => unreachable!("purified export details must be load generator"),
                };
                let desc =
                    desc.expect("subsources cannot be generated for single-output load generators");

                let (columns, table_constraints) = scx.relation_desc_into_table_defs(&desc)?;
                let details = SourceExportStatementDetails::LoadGenerator { output };
                let subsource = CreateSubsourceStatement {
                    name: subsource_name,
                    columns,
                    of_source: Some(source_name.clone()),
                    constraints: table_constraints,
                    if_not_exists: false,
                    with_options: vec![
                        CreateSubsourceOption {
                            name: CreateSubsourceOptionName::ExternalReference,
                            value: Some(WithOptionValue::UnresolvedItemName(
                                purified_export.external_reference,
                            )),
                        },
                        CreateSubsourceOption {
                            name: CreateSubsourceOptionName::Details,
                            value: Some(WithOptionValue::Value(Value::String(hex::encode(
                                details.into_proto().encode_to_vec(),
                            )))),
                        },
                    ],
                };
                subsource_stmts.push(subsource);
            }

            subsource_stmts
        }
        PurifiedExportDetails::Kafka { .. } => {
            // `subsources` is non-empty here (checked above), and Kafka
            // sources do not produce data-bearing subsources, so reaching
            // this arm is a bug.
            unreachable!("Kafka sources do not produce data-bearing subsources")
        }
    };
    Ok(statements)
}

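/// Purifies a Protobuf schema-registry connection by fetching and compiling
/// the value (and, if available, key) schemas for the Kafka topic into a seed,
/// unless a seed was already provided.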
async fn purify_csr_connection_proto(
    catalog: &dyn SessionCatalog,
    options: &SourceFormatOptions,
    csr_connection: &mut CsrConnectionProtobuf<Aug>,
    envelope: &Option<SourceEnvelope>,
    storage_configuration: &StorageConfiguration,
) -> Result<(), PlanError> {
    let SourceFormatOptions::Kafka { topic } = options else {
        sql_bail!("Confluent Schema Registry is only supported with Kafka sources")
    };

    let CsrConnectionProtobuf {
        seed,
        connection: CsrConnection {
            connection,
            options: _,
        },
    } = csr_connection;
    match seed {
        None => {
            let scx = StatementContext::new(None, &*catalog);

            let ccsr_connection = match scx.get_item_by_resolved_name(connection)?.connection()? {
                Connection::Csr(connection) => connection.clone().into_inline_connection(catalog),
                _ => sql_bail!("{} is not a schema registry connection", connection),
            };

            let ccsr_client = ccsr_connection
                .connect(storage_configuration, InTask::No)
                .await
                .map_err(|e| CsrPurificationError::ClientError(Arc::new(e)))?;

            let value = compile_proto(&format!("{}-value", topic), &ccsr_client).await?;
            let key = compile_proto(&format!("{}-key", topic), &ccsr_client)
                .await
                .ok();

            if matches!(envelope, Some(SourceEnvelope::Debezium)) && key.is_none() {
                sql_bail!("Key schema is required for ENVELOPE DEBEZIUM");
            }

            *seed = Some(CsrSeedProtobuf { value, key });
        }
        Some(_) => (),
    }

    Ok(())
}

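/// Purifies an Avro schema-registry connection by fetching the key and value
/// schemas for the Kafka topic into a seed, unless a seed was already provided.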
async fn purify_csr_connection_avro(
    catalog: &dyn SessionCatalog,
    options: &SourceFormatOptions,
    csr_connection: &mut CsrConnectionAvro<Aug>,
    envelope: &Option<SourceEnvelope>,
    storage_configuration: &StorageConfiguration,
) -> Result<(), PlanError> {
    let SourceFormatOptions::Kafka { topic } = options else {
        sql_bail!("Confluent Schema Registry is only supported with Kafka sources")
    };

    let CsrConnectionAvro {
        connection: CsrConnection { connection, .. },
        seed,
        key_strategy,
        value_strategy,
    } = csr_connection;
    if seed.is_none() {
        let scx = StatementContext::new(None, &*catalog);
        let csr_connection = match scx.get_item_by_resolved_name(connection)?.connection()? {
            Connection::Csr(connection) => connection.clone().into_inline_connection(catalog),
            _ => sql_bail!("{} is not a schema registry connection", connection),
        };
        let ccsr_client = csr_connection
            .connect(storage_configuration, InTask::No)
            .await
            .map_err(|e| CsrPurificationError::ClientError(Arc::new(e)))?;

        let Schema {
            key_schema,
            value_schema,
        } = get_remote_csr_schema(
            &ccsr_client,
            key_strategy.clone().unwrap_or_default(),
            value_strategy.clone().unwrap_or_default(),
            topic,
        )
        .await?;
        if matches!(envelope, Some(SourceEnvelope::Debezium)) && key_schema.is_none() {
            sql_bail!("Key schema is required for ENVELOPE DEBEZIUM");
        }

        *seed = Some(CsrSeedAvro {
            key_schema,
            value_schema,
        })
    }

    Ok(())
}

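/// An optional key schema and a required value schema fetched from a schema
/// registry.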
#[derive(Debug)]
pub struct Schema {
    pub key_schema: Option<String>,
    pub value_schema: String,
}

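/// Fetches the raw schema for `subject` according to the given reader schema
/// selection strategy, returning `None` if the subject or version is missing.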
async fn get_schema_with_strategy(
    client: &Client,
    strategy: ReaderSchemaSelectionStrategy,
    subject: &str,
) -> Result<Option<String>, PlanError> {
    match strategy {
        ReaderSchemaSelectionStrategy::Latest => {
            match client.get_schema_by_subject(subject).await {
                Ok(CcsrSchema { raw, .. }) => Ok(Some(raw)),
                Err(GetBySubjectError::SubjectNotFound)
                | Err(GetBySubjectError::VersionNotFound(_)) => Ok(None),
                Err(e) => Err(PlanError::FetchingCsrSchemaFailed {
                    schema_lookup: format!("subject {}", subject.quoted()),
                    cause: Arc::new(e),
                }),
            }
        }
        ReaderSchemaSelectionStrategy::Inline(raw) => Ok(Some(raw)),
        ReaderSchemaSelectionStrategy::ById(id) => match client.get_schema_by_id(id).await {
            Ok(CcsrSchema { raw, .. }) => Ok(Some(raw)),
            Err(GetByIdError::SchemaNotFound) => Ok(None),
            Err(e) => Err(PlanError::FetchingCsrSchemaFailed {
                schema_lookup: format!("ID {}", id),
                cause: Arc::new(e),
            }),
        },
    }
}

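/// Fetches the key and value schemas for `topic` from the schema registry,
/// using the `{topic}-key` and `{topic}-value` subjects. A missing value
/// schema is an error; a missing key schema is not.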
async fn get_remote_csr_schema(
    ccsr_client: &mz_ccsr::Client,
    key_strategy: ReaderSchemaSelectionStrategy,
    value_strategy: ReaderSchemaSelectionStrategy,
    topic: &str,
) -> Result<Schema, PlanError> {
    let value_schema_name = format!("{}-value", topic);
    let value_schema =
        get_schema_with_strategy(ccsr_client, value_strategy, &value_schema_name).await?;
    let value_schema = value_schema.ok_or_else(|| anyhow!("No value schema found"))?;
    let subject = format!("{}-key", topic);
    let key_schema = get_schema_with_strategy(ccsr_client, key_strategy, &subject).await?;
    Ok(Schema {
        key_schema,
        value_schema,
    })
}

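/// Compiles the Protobuf schema stored under `subject_name` (along with its
/// schema references) into a serialized file descriptor set, producing the
/// seed recorded in the purified statement.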
async fn compile_proto(
    subject_name: &String,
    ccsr_client: &Client,
) -> Result<CsrSeedProtobufSchema, PlanError> {
    let (primary_subject, dependency_subjects) = ccsr_client
        .get_subject_and_references(subject_name)
        .await
        .map_err(|e| PlanError::FetchingCsrSchemaFailed {
            schema_lookup: format!("subject {}", subject_name.quoted()),
            cause: Arc::new(e),
        })?;

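    // Compile the primary schema and its dependencies in an in-memory source
    // tree.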
    let mut source_tree = VirtualSourceTree::new();
    for subject in iter::once(&primary_subject).chain(dependency_subjects.iter()) {
        source_tree.as_mut().add_file(
            Path::new(&subject.name),
            subject.schema.raw.as_bytes().to_vec(),
        );
    }
    let mut db = SourceTreeDescriptorDatabase::new(source_tree.as_mut());
    let fds = db
        .as_mut()
        .build_file_descriptor_set(&[Path::new(&primary_subject.name)])
        .map_err(|cause| PlanError::InvalidProtobufSchema { cause })?;

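    // The seed requires exactly one top-level message to use as the value.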
    let primary_fd = fds.file(0);
    let message_name = match primary_fd.message_type_size() {
        1 => String::from_utf8_lossy(primary_fd.message_type(0).name()).into_owned(),
        0 => bail_unsupported!(29603, "Protobuf schemas with no messages"),
        _ => bail_unsupported!(29603, "Protobuf schemas with multiple messages"),
    };

    let bytes = &fds
        .serialize()
        .map_err(|cause| PlanError::InvalidProtobufSchema { cause })?;
    let mut schema = String::new();
    strconv::format_bytes(&mut schema, bytes);

    Ok(CsrSeedProtobufSchema {
        schema,
        message_name,
    })
}

const MZ_NOW_NAME: &str = "mz_now";
const MZ_NOW_SCHEMA: &str = "mz_catalog";

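/// Purifies the `WITH` options of a `CREATE MATERIALIZED VIEW` statement:
/// - desugars `REFRESH AT CREATION` to `REFRESH AT mz_now()`;
/// - fills in an absent `ALIGNED TO` of `REFRESH EVERY` with `mz_now()`;
/// - replaces `mz_now()` calls in refresh options with `mz_now` (the chosen
///   timestamp) cast to `mz_timestamp`;
/// - adds a default `REFRESH ON COMMIT` if no refresh option is present;
/// - updates `resolved_ids` to reflect the introduced and removed references.
///
/// For example (an illustrative rendering, assuming the chosen timestamp is
/// `1700000000000`), `REFRESH AT CREATION` becomes roughly
/// `REFRESH AT 1700000000000::mz_timestamp`.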
pub fn purify_create_materialized_view_options(
    catalog: impl SessionCatalog,
    mz_now: Option<Timestamp>,
    cmvs: &mut CreateMaterializedViewStatement<Aug>,
    resolved_ids: &mut ResolvedIds,
) {
    let (mz_now_id, mz_now_expr) = {
        let item = catalog
            .resolve_function(&PartialItemName {
                database: None,
                schema: Some(MZ_NOW_SCHEMA.to_string()),
                item: MZ_NOW_NAME.to_string(),
            })
            .expect("we should be able to resolve mz_now");
        (
            item.id(),
            Expr::Function(Function {
                name: ResolvedItemName::Item {
                    id: item.id(),
                    qualifiers: item.name().qualifiers.clone(),
                    full_name: catalog.resolve_full_name(item.name()),
                    print_id: false,
                    version: RelationVersionSelector::Latest,
                },
                args: FunctionArgs::Args {
                    args: Vec::new(),
                    order_by: Vec::new(),
                },
                filter: None,
                over: None,
                distinct: false,
            }),
        )
    };
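    // Resolve the `mz_timestamp` type, to which substituted timestamps are
    // cast.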
    let (mz_timestamp_id, mz_timestamp_type) = {
        let item = catalog.get_system_type("mz_timestamp");
        let full_name = catalog.resolve_full_name(item.name());
        (
            item.id(),
            ResolvedDataType::Named {
                id: item.id(),
                qualifiers: item.name().qualifiers.clone(),
                full_name,
                modifiers: vec![],
                print_id: true,
            },
        )
    };

    let mut introduced_mz_timestamp = false;

    for option in cmvs.with_options.iter_mut() {
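        // Desugar `REFRESH AT CREATION` to `REFRESH AT mz_now()`.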
        if matches!(
            option.value,
            Some(WithOptionValue::Refresh(RefreshOptionValue::AtCreation))
        ) {
            option.value = Some(WithOptionValue::Refresh(RefreshOptionValue::At(
                RefreshAtOptionValue {
                    time: mz_now_expr.clone(),
                },
            )));
        }

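        // Fill in an absent `ALIGNED TO` of `REFRESH EVERY` with `mz_now()`.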
        if let Some(WithOptionValue::Refresh(RefreshOptionValue::Every(
            RefreshEveryOptionValue { aligned_to, .. },
        ))) = &mut option.value
        {
            if aligned_to.is_none() {
                *aligned_to = Some(mz_now_expr.clone());
            }
        }

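        // Substitute the chosen timestamp for `mz_now()` in refresh option
        // expressions.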
        match &mut option.value {
            Some(WithOptionValue::Refresh(RefreshOptionValue::At(RefreshAtOptionValue {
                time,
            }))) => {
                let mut visitor = MzNowPurifierVisitor::new(mz_now, mz_timestamp_type.clone());
                visitor.visit_expr_mut(time);
                introduced_mz_timestamp |= visitor.introduced_mz_timestamp;
            }
            Some(WithOptionValue::Refresh(RefreshOptionValue::Every(
                RefreshEveryOptionValue {
                    interval: _,
                    aligned_to: Some(aligned_to),
                },
            ))) => {
                let mut visitor = MzNowPurifierVisitor::new(mz_now, mz_timestamp_type.clone());
                visitor.visit_expr_mut(aligned_to);
                introduced_mz_timestamp |= visitor.introduced_mz_timestamp;
            }
            _ => {}
        }
    }

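    // Default to `REFRESH ON COMMIT` when no explicit refresh option was
    // given.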
    if !cmvs.with_options.iter().any(|o| {
        matches!(
            o,
            MaterializedViewOption {
                value: Some(WithOptionValue::Refresh(..)),
                ..
            }
        )
    }) {
        cmvs.with_options.push(MaterializedViewOption {
            name: MaterializedViewOptionName::Refresh,
            value: Some(WithOptionValue::Refresh(RefreshOptionValue::OnCommit)),
        })
    }

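    // Record the dependency on `mz_timestamp` introduced by the casts above.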
    if introduced_mz_timestamp {
        resolved_ids.add_item(mz_timestamp_id);
    }
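    // If purification eliminated every `mz_now()` call, drop the statement's
    // dependency on it.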
    let mut visitor = ExprContainsTemporalVisitor::new();
    visitor.visit_create_materialized_view_statement(cmvs);
    if !visitor.contains_temporal {
        resolved_ids.remove_item(&mz_now_id);
    }
}

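/// Determines whether the given materialized view option involves a temporal
/// expression, i.e., one that (after purification) would call `mz_now()`.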
pub fn materialized_view_option_contains_temporal(mvo: &MaterializedViewOption<Aug>) -> bool {
    match &mvo.value {
        Some(WithOptionValue::Refresh(RefreshOptionValue::At(RefreshAtOptionValue { time }))) => {
            let mut visitor = ExprContainsTemporalVisitor::new();
            visitor.visit_expr(time);
            visitor.contains_temporal
        }
        Some(WithOptionValue::Refresh(RefreshOptionValue::Every(RefreshEveryOptionValue {
            interval: _,
            aligned_to: Some(aligned_to),
        }))) => {
            let mut visitor = ExprContainsTemporalVisitor::new();
            visitor.visit_expr(aligned_to);
            visitor.contains_temporal
        }
        Some(WithOptionValue::Refresh(RefreshOptionValue::Every(RefreshEveryOptionValue {
            interval: _,
            aligned_to: None,
        }))) => {
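            // An absent `ALIGNED TO` is filled in with `mz_now()` during
            // purification.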
            true
        }
        Some(WithOptionValue::Refresh(RefreshOptionValue::AtCreation)) => {
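            // `AT CREATION` is desugared to `mz_now()` during purification.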
            true
        }
        _ => false,
    }
}

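/// Detects whether an expression contains a call to `mz_now()`.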
struct ExprContainsTemporalVisitor {
    pub contains_temporal: bool,
}

impl ExprContainsTemporalVisitor {
    pub fn new() -> ExprContainsTemporalVisitor {
        ExprContainsTemporalVisitor {
            contains_temporal: false,
        }
    }
}

impl Visit<'_, Aug> for ExprContainsTemporalVisitor {
    fn visit_function(&mut self, func: &Function<Aug>) {
        self.contains_temporal |= func.name.full_item_name().item == MZ_NOW_NAME;
        visit_function(self, func);
    }
}

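/// Rewrites `mz_now()` calls to the chosen timestamp cast to `mz_timestamp`,
/// recording whether any such cast was introduced.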
struct MzNowPurifierVisitor {
    pub mz_now: Option<Timestamp>,
    pub mz_timestamp_type: ResolvedDataType,
    pub introduced_mz_timestamp: bool,
}

impl MzNowPurifierVisitor {
    pub fn new(
        mz_now: Option<Timestamp>,
        mz_timestamp_type: ResolvedDataType,
    ) -> MzNowPurifierVisitor {
        MzNowPurifierVisitor {
            mz_now,
            mz_timestamp_type,
            introduced_mz_timestamp: false,
        }
    }
}

impl VisitMut<'_, Aug> for MzNowPurifierVisitor {
    fn visit_expr_mut(&mut self, expr: &'_ mut Expr<Aug>) {
        match expr {
            Expr::Function(Function {
                name:
                    ResolvedItemName::Item {
                        full_name: FullItemName { item, .. },
                        ..
                    },
                ..
            }) if item == &MZ_NOW_NAME.to_string() => {
                let mz_now = self.mz_now.expect(
                    "we should have chosen a timestamp if the expression contains mz_now()",
                );
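                // Substitute the chosen timestamp, cast to `mz_timestamp` so
                // the expression keeps its original type.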
                *expr = Expr::Cast {
                    expr: Box::new(Expr::Value(Value::Number(mz_now.to_string()))),
                    data_type: self.mz_timestamp_type.clone(),
                };
                self.introduced_mz_timestamp = true;
            }
            _ => visit_expr_mut(self, expr),
        }
    }
}