use std::collections::BTreeMap;
use std::fmt::Debug;
use std::hash::Hash;
use std::ops::{Add, AddAssign, Deref, DerefMut};
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;

use arrow::array::{Array, ArrayRef, BinaryArray, BinaryBuilder, NullArray, StructArray};
use arrow::datatypes::{Field, Fields};
use bytes::{BufMut, Bytes};
use columnation::Columnation;
use itertools::EitherOrBoth::Both;
use itertools::Itertools;
use kafka::KafkaSourceExportDetails;
use load_generator::{LoadGeneratorOutput, LoadGeneratorSourceExportDetails};
use mz_ore::assert_none;
use mz_persist_types::Codec;
use mz_persist_types::arrow::ArrayOrd;
use mz_persist_types::columnar::{ColumnDecoder, ColumnEncoder, Schema};
use mz_persist_types::stats::{
    ColumnNullStats, ColumnStatKinds, ColumnarStats, ColumnarStatsBuilder, PrimitiveStats,
    StructStats,
};
use mz_proto::{IntoRustIfSome, ProtoMapEntry, ProtoType, RustType, TryFromProtoError};
use mz_repr::{
    CatalogItemId, Datum, GlobalId, ProtoRelationDesc, ProtoRow, RelationDesc, Row,
    RowColumnarDecoder, RowColumnarEncoder, arb_row_for_relation,
};
use mz_sql_parser::ast::{Ident, IdentError, UnresolvedItemName};
use proptest::prelude::any;
use proptest::strategy::Strategy;
use proptest_derive::Arbitrary;
use prost::Message;
use serde::{Deserialize, Serialize};
use timely::order::{PartialOrder, TotalOrder};
use timely::progress::timestamp::Refines;
use timely::progress::{PathSummary, Timestamp};

use crate::AlterCompatible;
use crate::connections::inline::{
    ConnectionAccess, ConnectionResolver, InlinedConnection, IntoInlineConnection,
    ReferencedConnection,
};
use crate::controller::{AlterError, CollectionMetadata};
use crate::errors::{DataflowError, ProtoDataflowError};
use crate::instances::StorageInstanceId;
use crate::sources::proto_ingestion_description::{ProtoSourceExport, ProtoSourceImport};
use crate::sources::sql_server::SqlServerSourceExportDetails;

pub mod encoding;
pub mod envelope;
pub mod kafka;
pub mod load_generator;
pub mod mysql;
pub mod postgres;
pub mod sql_server;

pub use crate::sources::envelope::SourceEnvelope;
pub use crate::sources::kafka::KafkaSourceConnection;
pub use crate::sources::load_generator::LoadGeneratorSourceConnection;
pub use crate::sources::mysql::{MySqlSourceConnection, MySqlSourceExportDetails};
pub use crate::sources::postgres::{PostgresSourceConnection, PostgresSourceExportDetails};
pub use crate::sources::sql_server::{SqlServerSource, SqlServerSourceExtras};

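// The `Proto*` message types referenced throughout this file (e.g.
// `ProtoIngestionDescription`, `ProtoSourceData`) are generated from the
// `mz_storage_types.sources` protobuf definitions at build time and pulled in
// by the `include!` below.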
include!(concat!(env!("OUT_DIR"), "/mz_storage_types.sources.rs"));

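/// A description of a source ingestion: the source to ingest, the exports it
/// produces, and the instance it runs on.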
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Arbitrary)]
pub struct IngestionDescription<S: 'static = (), C: ConnectionAccess = InlinedConnection> {
    /// The source description.
    pub desc: SourceDesc<C>,
    /// The metadata for the storage collection backing this ingestion.
    pub ingestion_metadata: S,
    /// The collections this ingestion exports data to, keyed by their IDs.
    #[proptest(
        strategy = "proptest::collection::btree_map(any::<GlobalId>(), any::<SourceExport<S>>(), 0..4)"
    )]
    pub source_exports: BTreeMap<GlobalId, SourceExport<S>>,
    /// The storage instance this ingestion runs on.
    pub instance_id: StorageInstanceId,
    /// The ID of the collection that stores this ingestion's remap data.
    pub remap_collection_id: GlobalId,
}

impl IngestionDescription {
    pub fn new(
        desc: SourceDesc,
        instance_id: StorageInstanceId,
        remap_collection_id: GlobalId,
    ) -> Self {
        Self {
            desc,
            ingestion_metadata: (),
            source_exports: BTreeMap::new(),
            instance_id,
            remap_collection_id,
        }
    }
}

impl<S> IngestionDescription<S> {
    /// Returns the IDs of every collection this ingestion writes to: all of
    /// its source exports, plus its remap collection.
    pub fn collection_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
        // Destructure `self` so that adding a new field forces this method to
        // be reconsidered.
        let IngestionDescription {
            desc: _,
            ingestion_metadata: _,
            source_exports,
            instance_id: _,
            remap_collection_id,
        } = &self;

        source_exports
            .keys()
            .copied()
            .chain(std::iter::once(*remap_collection_id))
    }
}

impl<S: Debug + Eq + PartialEq + AlterCompatible> AlterCompatible for IngestionDescription<S> {
    fn alter_compatible(
        &self,
        id: GlobalId,
        other: &IngestionDescription<S>,
    ) -> Result<(), AlterError> {
        if self == other {
            return Ok(());
        }
        let IngestionDescription {
            desc,
            ingestion_metadata,
            source_exports,
            instance_id,
            remap_collection_id,
        } = self;

        let compatibility_checks = [
            (desc.alter_compatible(id, &other.desc).is_ok(), "desc"),
            (
                ingestion_metadata == &other.ingestion_metadata,
                "ingestion_metadata",
            ),
            (
                source_exports
                    .iter()
                    .merge_join_by(&other.source_exports, |(l_key, _), (r_key, _)| {
                        l_key.cmp(r_key)
                    })
                    .all(|r| match r {
                        Both(
                            (
                                _,
                                SourceExport {
                                    storage_metadata: l_metadata,
                                    details: l_details,
                                    data_config: l_data_config,
                                },
                            ),
                            (
                                _,
                                SourceExport {
                                    storage_metadata: r_metadata,
                                    details: r_details,
                                    data_config: r_data_config,
                                },
                            ),
                        ) => {
                            l_metadata.alter_compatible(id, r_metadata).is_ok()
                                && l_details.alter_compatible(id, r_details).is_ok()
                                && l_data_config.alter_compatible(id, r_data_config).is_ok()
                        }
                        _ => true,
                    }),
                "source_exports",
            ),
            (instance_id == &other.instance_id, "instance_id"),
            (
                remap_collection_id == &other.remap_collection_id,
                "remap_collection_id",
            ),
        ];
        for (compatible, field) in compatibility_checks {
            if !compatible {
                tracing::warn!(
                    "IngestionDescription incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
                    self,
                    other
                );

                return Err(AlterError { id });
            }
        }

        Ok(())
    }
}

impl<R: ConnectionResolver> IntoInlineConnection<IngestionDescription, R>
    for IngestionDescription<(), ReferencedConnection>
{
    fn into_inline_connection(self, r: R) -> IngestionDescription {
        let IngestionDescription {
            desc,
            ingestion_metadata,
            source_exports,
            instance_id,
            remap_collection_id,
        } = self;

        IngestionDescription {
            desc: desc.into_inline_connection(r),
            ingestion_metadata,
            source_exports,
            instance_id,
            remap_collection_id,
        }
    }
}

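/// A single export of a source: one output collection, how its data is
/// derived from the upstream source, and how it is decoded.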
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Arbitrary)]
pub struct SourceExport<S = (), C: ConnectionAccess = InlinedConnection> {
    /// The metadata for the storage collection this export writes to.
    pub storage_metadata: S,
    /// Source-specific details about what this export reads.
    pub details: SourceExportDetails,
    /// The encoding and envelope configuration for this export's data.
    pub data_config: SourceExportDataConfig<C>,
}

impl RustType<ProtoIngestionDescription> for IngestionDescription<CollectionMetadata> {
    fn into_proto(&self) -> ProtoIngestionDescription {
        ProtoIngestionDescription {
            source_exports: self.source_exports.into_proto(),
            ingestion_metadata: Some(self.ingestion_metadata.into_proto()),
            desc: Some(self.desc.into_proto()),
            instance_id: Some(self.instance_id.into_proto()),
            remap_collection_id: Some(self.remap_collection_id.into_proto()),
        }
    }

    fn from_proto(proto: ProtoIngestionDescription) -> Result<Self, TryFromProtoError> {
        Ok(IngestionDescription {
            source_exports: proto.source_exports.into_rust()?,
            desc: proto
                .desc
                .into_rust_if_some("ProtoIngestionDescription::desc")?,
            ingestion_metadata: proto
                .ingestion_metadata
                .into_rust_if_some("ProtoIngestionDescription::ingestion_metadata")?,
            instance_id: proto
                .instance_id
                .into_rust_if_some("ProtoIngestionDescription::instance_id")?,
            remap_collection_id: proto
                .remap_collection_id
                .into_rust_if_some("ProtoIngestionDescription::remap_collection_id")?,
        })
    }
}

impl ProtoMapEntry<GlobalId, CollectionMetadata> for ProtoSourceImport {
    fn from_rust<'a>(entry: (&'a GlobalId, &'a CollectionMetadata)) -> Self {
        ProtoSourceImport {
            id: Some(entry.0.into_proto()),
            storage_metadata: Some(entry.1.into_proto()),
        }
    }

    fn into_rust(self) -> Result<(GlobalId, CollectionMetadata), TryFromProtoError> {
        Ok((
            self.id.into_rust_if_some("ProtoSourceImport::id")?,
            self.storage_metadata
                .into_rust_if_some("ProtoSourceImport::storage_metadata")?,
        ))
    }
}

impl ProtoMapEntry<GlobalId, SourceExport<CollectionMetadata>> for ProtoSourceExport {
    fn from_rust<'a>(
        (id, source_export): (&'a GlobalId, &'a SourceExport<CollectionMetadata>),
    ) -> Self {
        ProtoSourceExport {
            id: Some(id.into_proto()),
            storage_metadata: Some(source_export.storage_metadata.into_proto()),
            details: Some(source_export.details.into_proto()),
            data_config: Some(source_export.data_config.into_proto()),
        }
    }

    fn into_rust(self) -> Result<(GlobalId, SourceExport<CollectionMetadata>), TryFromProtoError> {
        Ok((
            self.id.into_rust_if_some("ProtoSourceExport::id")?,
            SourceExport {
                storage_metadata: self
                    .storage_metadata
                    .into_rust_if_some("ProtoSourceExport::storage_metadata")?,
                details: self
                    .details
                    .into_rust_if_some("ProtoSourceExport::details")?,
                data_config: self
                    .data_config
                    .into_rust_if_some("ProtoSourceExport::data_config")?,
            },
        ))
    }
}

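/// A timestamp type that can serve as a source's native timestamp, with a
/// round-trippable encoding through [`Row`].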
pub trait SourceTimestamp:
    Timestamp + Columnation + Refines<()> + std::fmt::Display + Sync
{
    /// Encodes this timestamp into a [`Row`].
    fn encode_row(&self) -> Row;
    /// Decodes a timestamp from a [`Row`], panicking if the row is malformed.
    fn decode_row(row: &Row) -> Self;
}

impl SourceTimestamp for MzOffset {
    fn encode_row(&self) -> Row {
        Row::pack([Datum::UInt64(self.offset)])
    }

    fn decode_row(row: &Row) -> Self {
        let mut datums = row.iter();
        match (datums.next(), datums.next()) {
            (Some(Datum::UInt64(offset)), None) => MzOffset::from(offset),
            _ => panic!("invalid row {row:?}"),
        }
    }
}

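/// A totally ordered, `u64`-backed offset into an upstream source's stream,
/// e.g. a Kafka partition offset or a Postgres LSN.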
#[derive(
    Copy,
    Clone,
    Default,
    Debug,
    PartialEq,
    PartialOrd,
    Eq,
    Ord,
    Hash,
    Serialize,
    Deserialize,
    Arbitrary,
)]
pub struct MzOffset {
    pub offset: u64,
}

impl differential_dataflow::difference::Semigroup for MzOffset {
    fn plus_equals(&mut self, rhs: &Self) {
        self.offset.plus_equals(&rhs.offset)
    }
}

impl differential_dataflow::difference::IsZero for MzOffset {
    fn is_zero(&self) -> bool {
        self.offset.is_zero()
    }
}

impl mz_persist_types::Codec64 for MzOffset {
    fn codec_name() -> String {
        "MzOffset".to_string()
    }

    fn encode(&self) -> [u8; 8] {
        mz_persist_types::Codec64::encode(&self.offset)
    }

    fn decode(buf: [u8; 8]) -> Self {
        Self {
            offset: mz_persist_types::Codec64::decode(buf),
        }
    }
}

impl columnation::Columnation for MzOffset {
    type InnerRegion = columnation::CopyRegion<MzOffset>;
}

impl RustType<ProtoMzOffset> for MzOffset {
    fn into_proto(&self) -> ProtoMzOffset {
        ProtoMzOffset {
            offset: self.offset,
        }
    }

    fn from_proto(proto: ProtoMzOffset) -> Result<Self, TryFromProtoError> {
        Ok(Self {
            offset: proto.offset,
        })
    }
}

impl MzOffset {
    pub fn checked_sub(self, other: Self) -> Option<Self> {
        self.offset
            .checked_sub(other.offset)
            .map(|offset| Self { offset })
    }
}

impl From<u64> for MzOffset {
    fn from(offset: u64) -> Self {
        Self { offset }
    }
}

impl std::fmt::Display for MzOffset {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.offset)
    }
}

impl Add<u64> for MzOffset {
    type Output = MzOffset;

    fn add(self, x: u64) -> MzOffset {
        MzOffset {
            offset: self.offset + x,
        }
    }
}
impl Add<Self> for MzOffset {
    type Output = Self;

    fn add(self, x: Self) -> Self {
        MzOffset {
            offset: self.offset + x.offset,
        }
    }
}
impl AddAssign<u64> for MzOffset {
    fn add_assign(&mut self, x: u64) {
        self.offset += x;
    }
}
impl AddAssign<Self> for MzOffset {
    fn add_assign(&mut self, x: Self) {
        self.offset += x.offset;
    }
}

impl From<tokio_postgres::types::PgLsn> for MzOffset {
    fn from(lsn: tokio_postgres::types::PgLsn) -> Self {
        MzOffset { offset: lsn.into() }
    }
}

impl Timestamp for MzOffset {
    type Summary = MzOffset;

    fn minimum() -> Self {
        MzOffset {
            offset: Timestamp::minimum(),
        }
    }
}

impl PathSummary<MzOffset> for MzOffset {
    fn results_in(&self, src: &MzOffset) -> Option<MzOffset> {
        Some(MzOffset {
            offset: self.offset.results_in(&src.offset)?,
        })
    }

    fn followed_by(&self, other: &Self) -> Option<Self> {
        Some(MzOffset {
            offset: PathSummary::<u64>::followed_by(&self.offset, &other.offset)?,
        })
    }
}

impl Refines<()> for MzOffset {
    fn to_inner(_: ()) -> Self {
        MzOffset::minimum()
    }
    fn to_outer(self) {}
    fn summarize(_: Self::Summary) {}
}

impl PartialOrder for MzOffset {
    #[inline]
    fn less_equal(&self, other: &Self) -> bool {
        self.offset.less_equal(&other.offset)
    }
}

impl TotalOrder for MzOffset {}

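/// The timeline a collection's timestamps belong to. The string form pairs a
/// single identifying character with an optional name: `M` for
/// `EpochMilliseconds`, `E.<name>` for `External`, and `U.<name>` for `User`.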
#[derive(Arbitrary, Clone, Debug, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize, Hash)]
pub enum Timeline {
    /// Timestamps are milliseconds since the Unix epoch.
    EpochMilliseconds,
    /// Timestamps come from an external source, named by the string.
    External(String),
    /// Timestamps belong to a user-defined timeline, named by the string.
    User(String),
}

impl Timeline {
    const EPOCH_MILLISECOND_ID_CHAR: char = 'M';
    const EXTERNAL_ID_CHAR: char = 'E';
    const USER_ID_CHAR: char = 'U';

    fn id_char(&self) -> char {
        match self {
            Self::EpochMilliseconds => Self::EPOCH_MILLISECOND_ID_CHAR,
            Self::External(_) => Self::EXTERNAL_ID_CHAR,
            Self::User(_) => Self::USER_ID_CHAR,
        }
    }
}

impl RustType<ProtoTimeline> for Timeline {
    fn into_proto(&self) -> ProtoTimeline {
        use proto_timeline::Kind;
        ProtoTimeline {
            kind: Some(match self {
                Timeline::EpochMilliseconds => Kind::EpochMilliseconds(()),
                Timeline::External(s) => Kind::External(s.clone()),
                Timeline::User(s) => Kind::User(s.clone()),
            }),
        }
    }

    fn from_proto(proto: ProtoTimeline) -> Result<Self, TryFromProtoError> {
        use proto_timeline::Kind;
        let kind = proto
            .kind
            .ok_or_else(|| TryFromProtoError::missing_field("ProtoTimeline::kind"))?;
        Ok(match kind {
            Kind::EpochMilliseconds(()) => Timeline::EpochMilliseconds,
            Kind::External(s) => Timeline::External(s),
            Kind::User(s) => Timeline::User(s),
        })
    }
}

impl std::fmt::Display for Timeline {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::EpochMilliseconds => write!(f, "{}", self.id_char()),
            Self::External(id) => write!(f, "{}.{id}", self.id_char()),
            Self::User(id) => write!(f, "{}.{id}", self.id_char()),
        }
    }
}

impl FromStr for Timeline {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        if s.is_empty() {
            return Err("empty timeline".to_string());
        }
        let mut chars = s.chars();
        match chars.next().expect("non-empty string") {
            Self::EPOCH_MILLISECOND_ID_CHAR => match chars.next() {
                None => Ok(Self::EpochMilliseconds),
                Some(_) => Err(format!("unknown timeline: {s}")),
            },
            Self::EXTERNAL_ID_CHAR => match chars.next() {
                Some('.') => Ok(Self::External(chars.as_str().to_string())),
                _ => Err(format!("unknown timeline: {s}")),
            },
            Self::USER_ID_CHAR => match chars.next() {
                Some('.') => Ok(Self::User(chars.as_str().to_string())),
                _ => Err(format!("unknown timeline: {s}")),
            },
            _ => Err(format!("unknown timeline: {s}")),
        }
    }
}

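/// Behavior common to all types of source connections.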
pub trait SourceConnection: Debug + Clone + PartialEq + AlterCompatible {
    /// The name of the external system this connection reads from (e.g. "kafka").
    fn name(&self) -> &'static str;

    /// The name of the upstream resource this source corresponds to, if any.
    fn external_reference(&self) -> Option<&str>;

    /// The default schema of this connection's key columns.
    fn default_key_desc(&self) -> RelationDesc;

    /// The default schema of this connection's value columns.
    fn default_value_desc(&self) -> RelationDesc;

    /// The schema of this connection's native timestamps.
    fn timestamp_desc(&self) -> RelationDesc;

    /// The ID of the catalog connection object this source connection
    /// references, if any.
    fn connection_id(&self) -> Option<CatalogItemId>;

    /// The export details for this connection's primary collection.
    fn primary_export_details(&self) -> SourceExportDetails;

    /// Whether this source connection can be run in read-only mode.
    fn supports_read_only(&self) -> bool;

    /// Whether this source connection prefers to run on a single replica.
    fn prefers_single_replica(&self) -> bool;
}

#[derive(Arbitrary, Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum Compression {
    Gzip,
    None,
}

impl RustType<ProtoCompression> for Compression {
    fn into_proto(&self) -> ProtoCompression {
        use proto_compression::Kind;
        ProtoCompression {
            kind: Some(match self {
                Compression::Gzip => Kind::Gzip(()),
                Compression::None => Kind::None(()),
            }),
        }
    }

    fn from_proto(proto: ProtoCompression) -> Result<Self, TryFromProtoError> {
        use proto_compression::Kind;
        Ok(match proto.kind {
            Some(Kind::Gzip(())) => Compression::Gzip,
            Some(Kind::None(())) => Compression::None,
            None => {
                return Err(TryFromProtoError::MissingField(
                    "ProtoCompression::kind".into(),
                ));
            }
        })
    }
}

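/// The data decoding configuration of a source export: its encoding (if any)
/// and its envelope.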
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Arbitrary)]
pub struct SourceExportDataConfig<C: ConnectionAccess = InlinedConnection> {
    pub encoding: Option<encoding::SourceDataEncoding<C>>,
    pub envelope: SourceEnvelope,
}

impl<R: ConnectionResolver> IntoInlineConnection<SourceExportDataConfig, R>
    for SourceExportDataConfig<ReferencedConnection>
{
    fn into_inline_connection(self, r: R) -> SourceExportDataConfig {
        let SourceExportDataConfig { encoding, envelope } = self;

        SourceExportDataConfig {
            encoding: encoding.map(|e| e.into_inline_connection(r)),
            envelope,
        }
    }
}

impl RustType<ProtoSourceExportDataConfig> for SourceExportDataConfig {
    fn into_proto(&self) -> ProtoSourceExportDataConfig {
        ProtoSourceExportDataConfig {
            encoding: self.encoding.into_proto(),
            envelope: Some(self.envelope.into_proto()),
        }
    }

    fn from_proto(proto: ProtoSourceExportDataConfig) -> Result<Self, TryFromProtoError> {
        Ok(SourceExportDataConfig {
            encoding: proto.encoding.into_rust()?,
            envelope: proto
                .envelope
                .into_rust_if_some("ProtoSourceExportDataConfig::envelope")?,
        })
    }
}

impl<C: ConnectionAccess> AlterCompatible for SourceExportDataConfig<C> {
    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
        if self == other {
            return Ok(());
        }
        let Self { encoding, envelope } = &self;

        let compatibility_checks = [
            (
                match (encoding, &other.encoding) {
                    (Some(s), Some(o)) => s.alter_compatible(id, o).is_ok(),
                    (s, o) => s == o,
                },
                "encoding",
            ),
            (envelope == &other.envelope, "envelope"),
        ];

        for (compatible, field) in compatibility_checks {
            if !compatible {
                tracing::warn!(
                    "SourceExportDataConfig incompatible {field}:\nself:\n{:#?}\n\nother\n{:#?}",
                    self,
                    other
                );

                return Err(AlterError { id });
            }
        }
        Ok(())
    }
}

impl<C: ConnectionAccess> SourceExportDataConfig<C> {
    /// Returns whether the collection produced by this data configuration is
    /// monotonic, i.e. whether it can only ever produce inserts.
    pub fn monotonic(&self, connection: &GenericSourceConnection<C>) -> bool {
        match &self.envelope {
            // Upsert and CDC envelopes may produce retractions.
            SourceEnvelope::Upsert(_) | SourceEnvelope::CdcV2 => false,
            SourceEnvelope::None(_) => {
                match connection {
                    // Database replication streams report upstream updates and
                    // deletes as retractions.
                    GenericSourceConnection::Postgres(_) => false,
                    GenericSourceConnection::MySql(_) => false,
                    GenericSourceConnection::SqlServer(_) => false,
                    // Whether a load generator is monotonic depends on the
                    // generator.
                    GenericSourceConnection::LoadGenerator(g) => g.load_generator.is_monotonic(),
                    // Kafka exposes an append-only log.
                    GenericSourceConnection::Kafka(_) => true,
                }
            }
        }
    }
}

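/// A description of a source: its connection, how often to timestamp it, and
/// the configuration of its primary export.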
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Arbitrary)]
pub struct SourceDesc<C: ConnectionAccess = InlinedConnection> {
    pub connection: GenericSourceConnection<C>,
    pub timestamp_interval: Duration,
    /// The data configuration of this source's primary export, i.e. the
    /// collection the source itself writes to.
    pub primary_export: SourceExportDataConfig<C>,
    pub primary_export_details: SourceExportDetails,
}

impl<R: ConnectionResolver> IntoInlineConnection<SourceDesc, R>
    for SourceDesc<ReferencedConnection>
{
    fn into_inline_connection(self, r: R) -> SourceDesc {
        let SourceDesc {
            connection,
            primary_export,
            primary_export_details,
            timestamp_interval,
        } = self;

        SourceDesc {
            connection: connection.into_inline_connection(&r),
            primary_export: primary_export.into_inline_connection(r),
            primary_export_details,
            timestamp_interval,
        }
    }
}

impl RustType<ProtoSourceDesc> for SourceDesc {
    fn into_proto(&self) -> ProtoSourceDesc {
        ProtoSourceDesc {
            connection: Some(self.connection.into_proto()),
            primary_export: Some(self.primary_export.into_proto()),
            primary_export_details: Some(self.primary_export_details.into_proto()),
            timestamp_interval: Some(self.timestamp_interval.into_proto()),
        }
    }

    fn from_proto(proto: ProtoSourceDesc) -> Result<Self, TryFromProtoError> {
        Ok(SourceDesc {
            connection: proto
                .connection
                .into_rust_if_some("ProtoSourceDesc::connection")?,
            primary_export: proto
                .primary_export
                .into_rust_if_some("ProtoSourceDesc::primary_export")?,
            primary_export_details: proto
                .primary_export_details
                .into_rust_if_some("ProtoSourceDesc::primary_export_details")?,
            timestamp_interval: proto
                .timestamp_interval
                .into_rust_if_some("ProtoSourceDesc::timestamp_interval")?,
        })
    }
}

impl<C: ConnectionAccess> AlterCompatible for SourceDesc<C> {
    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
        if self == other {
            return Ok(());
        }
        let Self {
            connection,
            primary_export,
            primary_export_details,
            timestamp_interval,
        } = &self;

        let compatibility_checks = [
            (
                connection.alter_compatible(id, &other.connection).is_ok(),
                "connection",
            ),
            (primary_export == &other.primary_export, "primary_export"),
            (
                primary_export_details == &other.primary_export_details,
                "primary_export_details",
            ),
            (
                timestamp_interval == &other.timestamp_interval,
                "timestamp_interval",
            ),
        ];

        for (compatible, field) in compatibility_checks {
            if !compatible {
                tracing::warn!(
                    "SourceDesc incompatible {field}:\nself:\n{:#?}\n\nother\n{:#?}",
                    self,
                    other
                );

                return Err(AlterError { id });
            }
        }

        Ok(())
    }
}

impl SourceDesc<InlinedConnection> {
    /// Returns a [`SourceExport`] describing the primary export of this
    /// source, which carries no storage metadata.
    pub fn primary_source_export(&self) -> SourceExport<(), InlinedConnection> {
        SourceExport {
            storage_metadata: (),
            details: self.primary_export_details.clone(),
            data_config: self.primary_export.clone(),
        }
    }
}

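/// A source connection to any of the supported external systems.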
#[derive(Arbitrary, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum GenericSourceConnection<C: ConnectionAccess = InlinedConnection> {
    Kafka(KafkaSourceConnection<C>),
    Postgres(PostgresSourceConnection<C>),
    MySql(MySqlSourceConnection<C>),
    SqlServer(SqlServerSource<C>),
    LoadGenerator(LoadGeneratorSourceConnection),
}

impl<C: ConnectionAccess> From<KafkaSourceConnection<C>> for GenericSourceConnection<C> {
    fn from(conn: KafkaSourceConnection<C>) -> Self {
        Self::Kafka(conn)
    }
}

impl<C: ConnectionAccess> From<PostgresSourceConnection<C>> for GenericSourceConnection<C> {
    fn from(conn: PostgresSourceConnection<C>) -> Self {
        Self::Postgres(conn)
    }
}

impl<C: ConnectionAccess> From<MySqlSourceConnection<C>> for GenericSourceConnection<C> {
    fn from(conn: MySqlSourceConnection<C>) -> Self {
        Self::MySql(conn)
    }
}

impl<C: ConnectionAccess> From<SqlServerSource<C>> for GenericSourceConnection<C> {
    fn from(conn: SqlServerSource<C>) -> Self {
        Self::SqlServer(conn)
    }
}

impl<C: ConnectionAccess> From<LoadGeneratorSourceConnection> for GenericSourceConnection<C> {
    fn from(conn: LoadGeneratorSourceConnection) -> Self {
        Self::LoadGenerator(conn)
    }
}

impl<R: ConnectionResolver> IntoInlineConnection<GenericSourceConnection, R>
    for GenericSourceConnection<ReferencedConnection>
{
    fn into_inline_connection(self, r: R) -> GenericSourceConnection {
        match self {
            GenericSourceConnection::Kafka(kafka) => {
                GenericSourceConnection::Kafka(kafka.into_inline_connection(r))
            }
            GenericSourceConnection::Postgres(pg) => {
                GenericSourceConnection::Postgres(pg.into_inline_connection(r))
            }
            GenericSourceConnection::MySql(mysql) => {
                GenericSourceConnection::MySql(mysql.into_inline_connection(r))
            }
            GenericSourceConnection::SqlServer(sql_server) => {
                GenericSourceConnection::SqlServer(sql_server.into_inline_connection(r))
            }
            GenericSourceConnection::LoadGenerator(lg) => {
                GenericSourceConnection::LoadGenerator(lg)
            }
        }
    }
}

impl<C: ConnectionAccess> SourceConnection for GenericSourceConnection<C> {
    fn name(&self) -> &'static str {
        match self {
            Self::Kafka(conn) => conn.name(),
            Self::Postgres(conn) => conn.name(),
            Self::MySql(conn) => conn.name(),
            Self::SqlServer(conn) => conn.name(),
            Self::LoadGenerator(conn) => conn.name(),
        }
    }

    fn external_reference(&self) -> Option<&str> {
        match self {
            Self::Kafka(conn) => conn.external_reference(),
            Self::Postgres(conn) => conn.external_reference(),
            Self::MySql(conn) => conn.external_reference(),
            Self::SqlServer(conn) => conn.external_reference(),
            Self::LoadGenerator(conn) => conn.external_reference(),
        }
    }

    fn default_key_desc(&self) -> RelationDesc {
        match self {
            Self::Kafka(conn) => conn.default_key_desc(),
            Self::Postgres(conn) => conn.default_key_desc(),
            Self::MySql(conn) => conn.default_key_desc(),
            Self::SqlServer(conn) => conn.default_key_desc(),
            Self::LoadGenerator(conn) => conn.default_key_desc(),
        }
    }

    fn default_value_desc(&self) -> RelationDesc {
        match self {
            Self::Kafka(conn) => conn.default_value_desc(),
            Self::Postgres(conn) => conn.default_value_desc(),
            Self::MySql(conn) => conn.default_value_desc(),
            Self::SqlServer(conn) => conn.default_value_desc(),
            Self::LoadGenerator(conn) => conn.default_value_desc(),
        }
    }

    fn timestamp_desc(&self) -> RelationDesc {
        match self {
            Self::Kafka(conn) => conn.timestamp_desc(),
            Self::Postgres(conn) => conn.timestamp_desc(),
            Self::MySql(conn) => conn.timestamp_desc(),
            Self::SqlServer(conn) => conn.timestamp_desc(),
            Self::LoadGenerator(conn) => conn.timestamp_desc(),
        }
    }

    fn connection_id(&self) -> Option<CatalogItemId> {
        match self {
            Self::Kafka(conn) => conn.connection_id(),
            Self::Postgres(conn) => conn.connection_id(),
            Self::MySql(conn) => conn.connection_id(),
            Self::SqlServer(conn) => conn.connection_id(),
            Self::LoadGenerator(conn) => conn.connection_id(),
        }
    }

    fn primary_export_details(&self) -> SourceExportDetails {
        match self {
            Self::Kafka(conn) => conn.primary_export_details(),
            Self::Postgres(conn) => conn.primary_export_details(),
            Self::MySql(conn) => conn.primary_export_details(),
            Self::SqlServer(conn) => conn.primary_export_details(),
            Self::LoadGenerator(conn) => conn.primary_export_details(),
        }
    }

    fn supports_read_only(&self) -> bool {
        match self {
            GenericSourceConnection::Kafka(conn) => conn.supports_read_only(),
            GenericSourceConnection::Postgres(conn) => conn.supports_read_only(),
            GenericSourceConnection::MySql(conn) => conn.supports_read_only(),
            GenericSourceConnection::SqlServer(conn) => conn.supports_read_only(),
            GenericSourceConnection::LoadGenerator(conn) => conn.supports_read_only(),
        }
    }

    fn prefers_single_replica(&self) -> bool {
        match self {
            GenericSourceConnection::Kafka(conn) => conn.prefers_single_replica(),
            GenericSourceConnection::Postgres(conn) => conn.prefers_single_replica(),
            GenericSourceConnection::MySql(conn) => conn.prefers_single_replica(),
            GenericSourceConnection::SqlServer(conn) => conn.prefers_single_replica(),
            GenericSourceConnection::LoadGenerator(conn) => conn.prefers_single_replica(),
        }
    }
}
impl<C: ConnectionAccess> crate::AlterCompatible for GenericSourceConnection<C> {
    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
        if self == other {
            return Ok(());
        }
        let r = match (self, other) {
            (Self::Kafka(conn), Self::Kafka(other)) => conn.alter_compatible(id, other),
            (Self::Postgres(conn), Self::Postgres(other)) => conn.alter_compatible(id, other),
            (Self::MySql(conn), Self::MySql(other)) => conn.alter_compatible(id, other),
            (Self::SqlServer(conn), Self::SqlServer(other)) => conn.alter_compatible(id, other),
            (Self::LoadGenerator(conn), Self::LoadGenerator(other)) => {
                conn.alter_compatible(id, other)
            }
            _ => Err(AlterError { id }),
        };

        if r.is_err() {
            tracing::warn!(
                "GenericSourceConnection incompatible:\nself:\n{:#?}\n\nother\n{:#?}",
                self,
                other
            );
        }

        r
    }
}

impl RustType<ProtoSourceConnection> for GenericSourceConnection<InlinedConnection> {
    fn into_proto(&self) -> ProtoSourceConnection {
        use proto_source_connection::Kind;
        ProtoSourceConnection {
            kind: Some(match self {
                GenericSourceConnection::Kafka(kafka) => Kind::Kafka(kafka.into_proto()),
                GenericSourceConnection::Postgres(postgres) => {
                    Kind::Postgres(postgres.into_proto())
                }
                GenericSourceConnection::MySql(mysql) => Kind::Mysql(mysql.into_proto()),
                GenericSourceConnection::SqlServer(sql_server) => {
                    Kind::SqlServer(sql_server.into_proto())
                }
                GenericSourceConnection::LoadGenerator(loadgen) => {
                    Kind::Loadgen(loadgen.into_proto())
                }
            }),
        }
    }

    fn from_proto(proto: ProtoSourceConnection) -> Result<Self, TryFromProtoError> {
        use proto_source_connection::Kind;
        let kind = proto
            .kind
            .ok_or_else(|| TryFromProtoError::missing_field("ProtoSourceConnection::kind"))?;
        Ok(match kind {
            Kind::Kafka(kafka) => GenericSourceConnection::Kafka(kafka.into_rust()?),
            Kind::Postgres(postgres) => GenericSourceConnection::Postgres(postgres.into_rust()?),
            Kind::Mysql(mysql) => GenericSourceConnection::MySql(mysql.into_rust()?),
            Kind::SqlServer(sql_server) => {
                GenericSourceConnection::SqlServer(sql_server.into_rust()?)
            }
            Kind::Loadgen(loadgen) => GenericSourceConnection::LoadGenerator(loadgen.into_rust()?),
        })
    }
}

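/// Per-source-type details of a source export.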
#[derive(Arbitrary, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum SourceExportDetails {
    /// The export does not require any details.
    None,
    Kafka(KafkaSourceExportDetails),
    Postgres(PostgresSourceExportDetails),
    MySql(MySqlSourceExportDetails),
    SqlServer(SqlServerSourceExportDetails),
    LoadGenerator(LoadGeneratorSourceExportDetails),
}

impl crate::AlterCompatible for SourceExportDetails {
    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
        if self == other {
            return Ok(());
        }
        let r = match (self, other) {
            (Self::None, Self::None) => Ok(()),
            (Self::Kafka(s), Self::Kafka(o)) => s.alter_compatible(id, o),
            (Self::Postgres(s), Self::Postgres(o)) => s.alter_compatible(id, o),
            (Self::MySql(s), Self::MySql(o)) => s.alter_compatible(id, o),
            (Self::SqlServer(s), Self::SqlServer(o)) => s.alter_compatible(id, o),
            (Self::LoadGenerator(s), Self::LoadGenerator(o)) => s.alter_compatible(id, o),
            _ => Err(AlterError { id }),
        };

        if r.is_err() {
            tracing::warn!(
                "SourceExportDetails incompatible:\nself:\n{:#?}\n\nother\n{:#?}",
                self,
                other
            );
        }

        r
    }
}

impl RustType<ProtoSourceExportDetails> for SourceExportDetails {
    fn into_proto(&self) -> ProtoSourceExportDetails {
        use proto_source_export_details::Kind;
        ProtoSourceExportDetails {
            kind: match self {
                SourceExportDetails::None => None,
                SourceExportDetails::Kafka(details) => Some(Kind::Kafka(details.into_proto())),
                SourceExportDetails::Postgres(details) => {
                    Some(Kind::Postgres(details.into_proto()))
                }
                SourceExportDetails::MySql(details) => Some(Kind::Mysql(details.into_proto())),
                SourceExportDetails::SqlServer(details) => {
                    Some(Kind::SqlServer(details.into_proto()))
                }
                SourceExportDetails::LoadGenerator(details) => {
                    Some(Kind::Loadgen(details.into_proto()))
                }
            },
        }
    }

    fn from_proto(proto: ProtoSourceExportDetails) -> Result<Self, TryFromProtoError> {
        use proto_source_export_details::Kind;
        Ok(match proto.kind {
            None => SourceExportDetails::None,
            Some(Kind::Kafka(details)) => SourceExportDetails::Kafka(details.into_rust()?),
            Some(Kind::Postgres(details)) => SourceExportDetails::Postgres(details.into_rust()?),
            Some(Kind::Mysql(details)) => SourceExportDetails::MySql(details.into_rust()?),
            Some(Kind::SqlServer(details)) => SourceExportDetails::SqlServer(details.into_rust()?),
            Some(Kind::Loadgen(details)) => {
                SourceExportDetails::LoadGenerator(details.into_rust()?)
            }
        })
    }
}

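/// Per-source-type details of a source export, as recorded in SQL statements.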
pub enum SourceExportStatementDetails {
    Postgres {
        table: mz_postgres_util::desc::PostgresTableDesc,
    },
    MySql {
        table: mz_mysql_util::MySqlTableDesc,
        initial_gtid_set: String,
    },
    SqlServer {
        table: mz_sql_server_util::desc::SqlServerTableDesc,
        capture_instance: Arc<str>,
    },
    LoadGenerator {
        output: LoadGeneratorOutput,
    },
    Kafka {},
}

impl RustType<ProtoSourceExportStatementDetails> for SourceExportStatementDetails {
    fn into_proto(&self) -> ProtoSourceExportStatementDetails {
        match self {
            SourceExportStatementDetails::Postgres { table } => ProtoSourceExportStatementDetails {
                kind: Some(proto_source_export_statement_details::Kind::Postgres(
                    postgres::ProtoPostgresSourceExportStatementDetails {
                        table: Some(table.into_proto()),
                    },
                )),
            },
            SourceExportStatementDetails::MySql {
                table,
                initial_gtid_set,
            } => ProtoSourceExportStatementDetails {
                kind: Some(proto_source_export_statement_details::Kind::Mysql(
                    mysql::ProtoMySqlSourceExportStatementDetails {
                        table: Some(table.into_proto()),
                        initial_gtid_set: initial_gtid_set.clone(),
                    },
                )),
            },
            SourceExportStatementDetails::SqlServer {
                table,
                capture_instance,
            } => ProtoSourceExportStatementDetails {
                kind: Some(proto_source_export_statement_details::Kind::SqlServer(
                    sql_server::ProtoSqlServerSourceExportStatementDetails {
                        table: Some(table.into_proto()),
                        capture_instance: capture_instance.to_string(),
                    },
                )),
            },
            SourceExportStatementDetails::LoadGenerator { output } => {
                ProtoSourceExportStatementDetails {
                    kind: Some(proto_source_export_statement_details::Kind::Loadgen(
                        load_generator::ProtoLoadGeneratorSourceExportStatementDetails {
                            output: output.into_proto().into(),
                        },
                    )),
                }
            }
            SourceExportStatementDetails::Kafka {} => ProtoSourceExportStatementDetails {
                kind: Some(proto_source_export_statement_details::Kind::Kafka(
                    kafka::ProtoKafkaSourceExportStatementDetails {},
                )),
            },
        }
    }

    fn from_proto(proto: ProtoSourceExportStatementDetails) -> Result<Self, TryFromProtoError> {
        use proto_source_export_statement_details::Kind;
        Ok(match proto.kind {
            Some(Kind::Postgres(details)) => SourceExportStatementDetails::Postgres {
                table: details
                    .table
                    .into_rust_if_some("ProtoPostgresSourceExportStatementDetails::table")?,
            },
            Some(Kind::Mysql(details)) => SourceExportStatementDetails::MySql {
                table: details
                    .table
                    .into_rust_if_some("ProtoMySqlSourceExportStatementDetails::table")?,

                initial_gtid_set: details.initial_gtid_set,
            },
            Some(Kind::SqlServer(details)) => SourceExportStatementDetails::SqlServer {
                table: details
                    .table
                    .into_rust_if_some("ProtoSqlServerSourceExportStatementDetails::table")?,
                capture_instance: details.capture_instance.into(),
            },
            Some(Kind::Loadgen(details)) => SourceExportStatementDetails::LoadGenerator {
                output: details
                    .output
                    .into_rust_if_some("ProtoLoadGeneratorSourceExportStatementDetails::output")?,
            },
            Some(Kind::Kafka(_details)) => SourceExportStatementDetails::Kafka {},
            None => {
                return Err(TryFromProtoError::missing_field(
                    "ProtoSourceExportStatementDetails::kind",
                ));
            }
        })
    }
}

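/// The data stored in a source's collections: either a decoded [`Row`] or the
/// [`DataflowError`] produced while decoding it.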
#[derive(Arbitrary, Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
#[repr(transparent)]
pub struct SourceData(pub Result<Row, DataflowError>);

impl Default for SourceData {
    fn default() -> Self {
        SourceData(Ok(Row::default()))
    }
}

impl Deref for SourceData {
    type Target = Result<Row, DataflowError>;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl DerefMut for SourceData {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0
    }
}

impl RustType<ProtoSourceData> for SourceData {
    fn into_proto(&self) -> ProtoSourceData {
        use proto_source_data::Kind;
        ProtoSourceData {
            kind: Some(match &**self {
                Ok(row) => Kind::Ok(row.into_proto()),
                Err(err) => Kind::Err(err.into_proto()),
            }),
        }
    }

    fn from_proto(proto: ProtoSourceData) -> Result<Self, TryFromProtoError> {
        use proto_source_data::Kind;
        match proto.kind {
            Some(kind) => match kind {
                Kind::Ok(row) => Ok(SourceData(Ok(row.into_rust()?))),
                Kind::Err(err) => Ok(SourceData(Err(err.into_rust()?))),
            },
            None => Result::Err(TryFromProtoError::missing_field("ProtoSourceData::kind")),
        }
    }
}

impl Codec for SourceData {
    type Storage = ProtoRow;
    type Schema = RelationDesc;

    fn codec_name() -> String {
        "protobuf[SourceData]".into()
    }

    fn encode<B: BufMut>(&self, buf: &mut B) {
        self.into_proto()
            .encode(buf)
            .expect("no required fields means no initialization errors");
    }

    fn decode(buf: &[u8], schema: &RelationDesc) -> Result<Self, String> {
        let mut val = SourceData::default();
        <Self as Codec>::decode_from(&mut val, buf, &mut None, schema)?;
        Ok(val)
    }

    fn decode_from<'a>(
        &mut self,
        buf: &'a [u8],
        storage: &mut Option<ProtoRow>,
        schema: &RelationDesc,
    ) -> Result<(), String> {
        // Reuse the allocations of the previously decoded proto row, if any.
        let mut proto = storage.take().unwrap_or_default();
        proto.clear();
        let mut proto = ProtoSourceData {
            kind: Some(proto_source_data::Kind::Ok(proto)),
        };
        proto.merge(buf).map_err(|err| err.to_string())?;
        match (proto.kind, &mut self.0) {
            (Some(proto_source_data::Kind::Ok(proto)), Ok(row)) => {
                let ret = row.decode_from_proto(&proto, schema);
                // Hand the proto row back for reuse by the next decode.
                storage.replace(proto);
                ret
            }
            (kind, _) => {
                let proto = ProtoSourceData { kind };
                *self = proto.into_rust().map_err(|err| err.to_string())?;
                Ok(())
            }
        }
    }

    fn validate(val: &Self, desc: &Self::Schema) -> Result<(), String> {
        match &val.0 {
            Ok(row) => Row::validate(row, desc),
            Err(_) => Ok(()),
        }
    }

    fn encode_schema(schema: &Self::Schema) -> Bytes {
        schema.into_proto().encode_to_vec().into()
    }

    fn decode_schema(buf: &Bytes) -> Self::Schema {
        let proto = ProtoRelationDesc::decode(buf.as_ref()).expect("valid schema");
        proto.into_rust().expect("valid schema")
    }
}

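/// A proptest [`Strategy`] that generates [`SourceData`] whose rows conform
/// to the provided [`RelationDesc`], heavily weighted towards `Ok` rows.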
pub fn arb_source_data_for_relation_desc(
    desc: &RelationDesc,
) -> impl Strategy<Value = SourceData> + use<> {
    let row_strat = arb_row_for_relation(desc).no_shrink();

    proptest::strategy::Union::new_weighted(vec![
        (50, row_strat.prop_map(|row| SourceData(Ok(row))).boxed()),
        (
            1,
            any::<DataflowError>()
                .prop_map(|err| SourceData(Err(err)))
                .no_shrink()
                .boxed(),
        ),
    ])
}

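/// An object in an external system's catalog (e.g. a table) whose name can be
/// used to resolve source references.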
pub trait ExternalCatalogReference {
    /// The object's second-level qualification, e.g. a schema name.
    fn schema_name(&self) -> &str;
    /// The name of the object itself, e.g. a table name.
    fn item_name(&self) -> &str;
}

impl ExternalCatalogReference for &mz_mysql_util::MySqlTableDesc {
    fn schema_name(&self) -> &str {
        &self.schema_name
    }

    fn item_name(&self) -> &str {
        &self.name
    }
}

impl ExternalCatalogReference for mz_postgres_util::desc::PostgresTableDesc {
    fn schema_name(&self) -> &str {
        &self.namespace
    }

    fn item_name(&self) -> &str {
        &self.name
    }
}

impl ExternalCatalogReference for &mz_sql_server_util::desc::SqlServerTableDesc {
    fn schema_name(&self) -> &str {
        &*self.schema_name
    }

    fn item_name(&self) -> &str {
        &*self.name
    }
}

// Allows resolving references from bare `(schema, item)` name pairs.
impl<'a> ExternalCatalogReference for (&'a str, &'a str) {
    fn schema_name(&self) -> &str {
        self.0
    }

    fn item_name(&self) -> &str {
        self.1
    }
}

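/// Resolves user-provided, possibly partial names to objects in an external
/// system's catalog. Names are stored fully qualified as
/// `item -> schema -> database -> index` so that lookups may omit outer
/// layers of qualification.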
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct SourceReferenceResolver {
    inner: BTreeMap<Ident, BTreeMap<Ident, BTreeMap<Ident, usize>>>,
}

#[derive(Debug, Clone, thiserror::Error)]
pub enum ExternalReferenceResolutionError {
    #[error("reference to {name} not found in source")]
    DoesNotExist { name: String },
    #[error(
        "reference {name} is ambiguous, consider specifying an additional \
        layer of qualification"
    )]
    Ambiguous { name: String },
    #[error("invalid identifier: {0}")]
    Ident(#[from] IdentError),
}

impl<'a> SourceReferenceResolver {
    /// Constructs a new resolver for `referenceable_items` in the given
    /// `database`, erroring if any name is not a valid [`Ident`].
    pub fn new<T: ExternalCatalogReference>(
        database: &str,
        referenceable_items: &'a [T],
    ) -> Result<SourceReferenceResolver, ExternalReferenceResolutionError> {
        // Build the `item -> schema -> database -> index` lookup map.
        let mut inner = BTreeMap::new();

        let database = Ident::new(database)?;

        for (reference_idx, item) in referenceable_items.iter().enumerate() {
            let item_name = Ident::new(item.item_name())?;
            let schema_name = Ident::new(item.schema_name())?;

            inner
                .entry(item_name)
                .or_insert_with(BTreeMap::new)
                .entry(schema_name)
                .or_insert_with(BTreeMap::new)
                .entry(database.clone())
                .or_insert(reference_idx);
        }

        Ok(SourceReferenceResolver { inner })
    }

    /// Resolves `name` to a canonical [`UnresolvedItemName`] of
    /// `canonicalize_to_width` parts, along with the index of the resolved
    /// item in the original `referenceable_items`.
    ///
    /// # Panics
    /// - If `canonicalize_to_width` is not in `1..=3`.
    pub fn resolve(
        &self,
        name: &[Ident],
        canonicalize_to_width: usize,
    ) -> Result<(UnresolvedItemName, usize), ExternalReferenceResolutionError> {
        let (db, schema, idx) = self.resolve_inner(name)?;

        let item = name.last().expect("must have provided at least 1 element");

        let canonical_name = match canonicalize_to_width {
            1 => vec![item.clone()],
            2 => vec![schema.clone(), item.clone()],
            3 => vec![db.clone(), schema.clone(), item.clone()],
            o => panic!("canonicalize_to_width values must be 1..=3, but got {}", o),
        };

        Ok((UnresolvedItemName(canonical_name), idx))
    }

    /// Like [`SourceReferenceResolver::resolve`], but returns only the index
    /// of the resolved item.
    pub fn resolve_idx(&self, name: &[Ident]) -> Result<usize, ExternalReferenceResolutionError> {
        let (_db, _schema, idx) = self.resolve_inner(name)?;
        Ok(idx)
    }

    /// Resolves `name`, which may have one to three parts, to its
    /// `(database, schema, index)` entry, erroring if the name is unknown or
    /// ambiguous.
    fn resolve_inner<'name: 'a>(
        &'a self,
        name: &'name [Ident],
    ) -> Result<(&'a Ident, &'a Ident, usize), ExternalReferenceResolutionError> {
        let get_provided_name = || UnresolvedItemName(name.to_vec()).to_string();

        if !(1..=3).contains(&name.len()) {
            Err(ExternalReferenceResolutionError::DoesNotExist {
                name: get_provided_name(),
            })?;
        }

        // Pad the name out to three elements, with any missing leading
        // qualifications as `None`.
        let mut names = std::iter::repeat(None)
            .take(3 - name.len())
            .chain(name.iter().map(Some));

        let database = names.next().flatten();
        let schema = names.next().flatten();
        let item = names
            .next()
            .flatten()
            .expect("must have provided the item name");

        assert_none!(names.next(), "expected a 3-element iterator");

        let schemas =
            self.inner
                .get(item)
                .ok_or_else(|| ExternalReferenceResolutionError::DoesNotExist {
                    name: get_provided_name(),
                })?;

        let schema = match schema {
            Some(schema) => schema,
            None => schemas.keys().exactly_one().map_err(|_e| {
                ExternalReferenceResolutionError::Ambiguous {
                    name: get_provided_name(),
                }
            })?,
        };

        let databases =
            schemas
                .get(schema)
                .ok_or_else(|| ExternalReferenceResolutionError::DoesNotExist {
                    name: get_provided_name(),
                })?;

        let database = match database {
            Some(database) => database,
            None => databases.keys().exactly_one().map_err(|_e| {
                ExternalReferenceResolutionError::Ambiguous {
                    name: get_provided_name(),
                }
            })?,
        };

        let reference_idx = databases.get(database).ok_or_else(|| {
            ExternalReferenceResolutionError::DoesNotExist {
                name: get_provided_name(),
            }
        })?;

        Ok((database, schema, *reference_idx))
    }
}

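/// Decoder for the `Ok` half of [`SourceData`]'s columnar representation.
/// `EmptyRow` handles the special case of a [`RelationDesc`] with no columns,
/// whose rows are stored as a `NullArray`.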
#[derive(Debug)]
pub enum SourceDataRowColumnarDecoder {
    Row(RowColumnarDecoder),
    EmptyRow,
}

impl SourceDataRowColumnarDecoder {
    pub fn decode(&self, idx: usize, row: &mut Row) {
        match self {
            SourceDataRowColumnarDecoder::Row(decoder) => decoder.decode(idx, row),
            SourceDataRowColumnarDecoder::EmptyRow => {
                // Creating a packer clears the row.
                row.packer();
            }
        }
    }

    pub fn goodbytes(&self) -> usize {
        match self {
            SourceDataRowColumnarDecoder::Row(decoder) => decoder.goodbytes(),
            SourceDataRowColumnarDecoder::EmptyRow => 0,
        }
    }
}

#[derive(Debug)]
pub struct SourceDataColumnarDecoder {
    row_decoder: SourceDataRowColumnarDecoder,
    err_decoder: BinaryArray,
}

impl SourceDataColumnarDecoder {
    pub fn new(col: StructArray, desc: &RelationDesc) -> Result<Self, anyhow::Error> {
        let (_fields, arrays, nullability) = col.into_parts();

        if nullability.is_some() {
            anyhow::bail!("SourceData is not nullable, but found {nullability:?}");
        }
        if arrays.len() != 2 {
            anyhow::bail!("SourceData should only have two fields, found {arrays:?}");
        }

        let errs = arrays[1]
            .as_any()
            .downcast_ref::<BinaryArray>()
            .ok_or_else(|| anyhow::anyhow!("expected BinaryArray, found {:?}", arrays[1]))?;

        let row_decoder = match arrays[0].data_type() {
            arrow::datatypes::DataType::Struct(_) => {
                let rows = arrays[0]
                    .as_any()
                    .downcast_ref::<StructArray>()
                    .ok_or_else(|| {
                        anyhow::anyhow!("expected StructArray, found {:?}", arrays[0])
                    })?;
                let decoder = RowColumnarDecoder::new(rows.clone(), desc)?;
                SourceDataRowColumnarDecoder::Row(decoder)
            }
            arrow::datatypes::DataType::Null => SourceDataRowColumnarDecoder::EmptyRow,
            other => anyhow::bail!("expected Struct or Null Array, found {other:?}"),
        };

        Ok(SourceDataColumnarDecoder {
            row_decoder,
            err_decoder: errs.clone(),
        })
    }
}

impl ColumnDecoder<SourceData> for SourceDataColumnarDecoder {
    fn decode(&self, idx: usize, val: &mut SourceData) {
        let err_null = self.err_decoder.is_null(idx);
        let row_null = match &self.row_decoder {
            SourceDataRowColumnarDecoder::Row(decoder) => decoder.is_null(idx),
            SourceDataRowColumnarDecoder::EmptyRow => !err_null,
        };

        match (row_null, err_null) {
            (true, false) => {
                let err = self.err_decoder.value(idx);
                let err = ProtoDataflowError::decode(err)
                    .expect("proto should be valid")
                    .into_rust()
                    .expect("error should be valid");
                val.0 = Err(err);
            }
            (false, true) => {
                let row = match val.0.as_mut() {
                    Ok(row) => row,
                    Err(_) => {
                        val.0 = Ok(Row::default());
                        val.0.as_mut().unwrap()
                    }
                };
                self.row_decoder.decode(idx, row);
            }
            (true, true) => panic!("should have one of 'ok' or 'err'"),
            (false, false) => panic!("cannot have both 'ok' and 'err'"),
        }
    }

    fn is_null(&self, idx: usize) -> bool {
        let err_null = self.err_decoder.is_null(idx);
        let row_null = match &self.row_decoder {
            SourceDataRowColumnarDecoder::Row(decoder) => decoder.is_null(idx),
            SourceDataRowColumnarDecoder::EmptyRow => !err_null,
        };
        assert!(!err_null || !row_null, "SourceData should never be null!");

        false
    }

    fn goodbytes(&self) -> usize {
        self.row_decoder.goodbytes() + ArrayOrd::Binary(self.err_decoder.clone()).goodbytes()
    }

    fn stats(&self) -> StructStats {
        let len = self.err_decoder.len();
        let err_stats = ColumnarStats {
            nulls: Some(ColumnNullStats {
                count: self.err_decoder.null_count(),
            }),
            values: PrimitiveStats::<Vec<u8>>::from_column(&self.err_decoder).into(),
        };
        // A row is set exactly when the error is null, so we can derive the
        // row column's null count from the error column's.
        let row_null_count = len - self.err_decoder.null_count();
        let row_stats = match &self.row_decoder {
            SourceDataRowColumnarDecoder::Row(encoder) => {
                assert_eq!(encoder.null_count(), row_null_count);
                encoder.stats()
            }
            SourceDataRowColumnarDecoder::EmptyRow => StructStats {
                len,
                cols: BTreeMap::default(),
            },
        };
        let row_stats = ColumnarStats {
            nulls: Some(ColumnNullStats {
                count: row_null_count,
            }),
            values: ColumnStatKinds::Struct(row_stats),
        };

        let stats = [
            (
                SourceDataColumnarEncoder::OK_COLUMN_NAME.to_string(),
                row_stats,
            ),
            (
                SourceDataColumnarEncoder::ERR_COLUMN_NAME.to_string(),
                err_stats,
            ),
        ];
        StructStats {
            len,
            cols: stats.into_iter().collect(),
        }
    }
}

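/// Encoder for the `Ok` half of [`SourceData`]'s columnar representation.
/// `EmptyRow` is used when the [`RelationDesc`] has no columns; the column is
/// then materialized as a `NullArray` at `finish` time.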
#[derive(Debug)]
pub enum SourceDataRowColumnarEncoder {
    Row(RowColumnarEncoder),
    EmptyRow,
}

impl SourceDataRowColumnarEncoder {
    pub(crate) fn goodbytes(&self) -> usize {
        match self {
            SourceDataRowColumnarEncoder::Row(e) => e.goodbytes(),
            SourceDataRowColumnarEncoder::EmptyRow => 0,
        }
    }

    pub fn append(&mut self, row: &Row) {
        match self {
            SourceDataRowColumnarEncoder::Row(encoder) => encoder.append(row),
            SourceDataRowColumnarEncoder::EmptyRow => {
                assert_eq!(row.iter().count(), 0)
            }
        }
    }

    pub fn append_null(&mut self) {
        match self {
            SourceDataRowColumnarEncoder::Row(encoder) => encoder.append_null(),
            SourceDataRowColumnarEncoder::EmptyRow => (),
        }
    }
}

#[derive(Debug)]
pub struct SourceDataColumnarEncoder {
    row_encoder: SourceDataRowColumnarEncoder,
    err_encoder: BinaryBuilder,
}

impl SourceDataColumnarEncoder {
    const OK_COLUMN_NAME: &'static str = "ok";
    const ERR_COLUMN_NAME: &'static str = "err";

    pub fn new(desc: &RelationDesc) -> Self {
        let row_encoder = match RowColumnarEncoder::new(desc) {
            Some(encoder) => SourceDataRowColumnarEncoder::Row(encoder),
            None => {
                // A `RowColumnarEncoder` cannot be created for a desc with no
                // columns, so fall back to the empty-row encoding.
                assert!(desc.typ().columns().is_empty());
                SourceDataRowColumnarEncoder::EmptyRow
            }
        };
        let err_encoder = BinaryBuilder::new();

        SourceDataColumnarEncoder {
            row_encoder,
            err_encoder,
        }
    }
}

impl ColumnEncoder<SourceData> for SourceDataColumnarEncoder {
    type FinishedColumn = StructArray;

    fn goodbytes(&self) -> usize {
        self.row_encoder.goodbytes() + self.err_encoder.values_slice().len()
    }

    #[inline]
    fn append(&mut self, val: &SourceData) {
        match val.0.as_ref() {
            Ok(row) => {
                self.row_encoder.append(row);
                self.err_encoder.append_null();
            }
            Err(err) => {
                self.row_encoder.append_null();
                self.err_encoder
                    .append_value(err.into_proto().encode_to_vec());
            }
        }
    }

    #[inline]
    fn append_null(&mut self) {
        panic!("appending a null into SourceDataColumnarEncoder is not supported");
    }

    fn finish(self) -> Self::FinishedColumn {
        let SourceDataColumnarEncoder {
            row_encoder,
            mut err_encoder,
        } = self;

        let err_column = BinaryBuilder::finish(&mut err_encoder);
        let row_column: ArrayRef = match row_encoder {
            SourceDataRowColumnarEncoder::Row(encoder) => {
                let column = encoder.finish();
                Arc::new(column)
            }
            SourceDataRowColumnarEncoder::EmptyRow => Arc::new(NullArray::new(err_column.len())),
        };

        assert_eq!(row_column.len(), err_column.len());

        let fields = vec![
            Field::new(Self::OK_COLUMN_NAME, row_column.data_type().clone(), true),
            Field::new(Self::ERR_COLUMN_NAME, err_column.data_type().clone(), true),
        ];
        let arrays: Vec<Arc<dyn Array>> = vec![row_column, Arc::new(err_column)];
        StructArray::new(Fields::from(fields), arrays, None)
    }
}

impl Schema<SourceData> for RelationDesc {
    type ArrowColumn = StructArray;
    type Statistics = StructStats;

    type Decoder = SourceDataColumnarDecoder;
    type Encoder = SourceDataColumnarEncoder;

    fn decoder(&self, col: Self::ArrowColumn) -> Result<Self::Decoder, anyhow::Error> {
        SourceDataColumnarDecoder::new(col, self)
    }

    fn encoder(&self) -> Result<Self::Encoder, anyhow::Error> {
        Ok(SourceDataColumnarEncoder::new(self))
    }
}

#[cfg(test)]
mod tests {
    use arrow::array::{ArrayData, make_comparator};
    use base64::Engine;
    use bytes::Bytes;
    use mz_expr::EvalError;
    use mz_ore::assert_err;
    use mz_ore::metrics::MetricsRegistry;
    use mz_persist::indexed::columnar::arrow::{realloc_any, realloc_array};
    use mz_persist::metrics::ColumnarMetrics;
    use mz_persist_types::parquet::EncodingConfig;
    use mz_persist_types::schema::{Migration, backward_compatible};
    use mz_persist_types::stats::{PartStats, PartStatsMetrics};
    use mz_repr::{
        ColumnIndex, DatumVec, PropRelationDescDiff, ProtoRelationDesc, RelationDescBuilder,
        RowArena, ScalarType, arb_relation_desc_diff, arb_relation_desc_projection,
    };
    use proptest::prelude::*;
    use proptest::strategy::{Union, ValueTree};

    use crate::stats::RelationPartStats;

    use super::*;

    #[mz_ore::test]
    fn test_timeline_parsing() {
        assert_eq!(Ok(Timeline::EpochMilliseconds), "M".parse());
        assert_eq!(Ok(Timeline::External("JOE".to_string())), "E.JOE".parse());
        assert_eq!(Ok(Timeline::User("MIKE".to_string())), "U.MIKE".parse());

        assert_err!("Materialize".parse::<Timeline>());
        assert_err!("Ejoe".parse::<Timeline>());
        assert_err!("Umike".parse::<Timeline>());
        assert_err!("Dance".parse::<Timeline>());
        assert_err!("".parse::<Timeline>());
    }
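
    // A small additional sanity check (a sketch, not part of the original
    // suite): `MzOffset`'s `Codec64` encoding and its arithmetic impls should
    // round-trip and accumulate like the underlying `u64`.
    #[mz_ore::test]
    fn test_mz_offset_codec64_roundtrip_and_add() {
        let a = MzOffset::from(42);
        let encoded = <MzOffset as mz_persist_types::Codec64>::encode(&a);
        let decoded = <MzOffset as mz_persist_types::Codec64>::decode(encoded);
        assert_eq!(a, decoded);

        assert_eq!(a + 8, MzOffset::from(50));
        assert_eq!(a + MzOffset::from(8), MzOffset::from(50));
        let mut b = a;
        b += MzOffset::from(8);
        assert_eq!(b, MzOffset::from(50));
    }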

    #[track_caller]
    fn roundtrip_source_data(
        desc: &RelationDesc,
        datas: Vec<SourceData>,
        read_desc: &RelationDesc,
        config: &EncodingConfig,
    ) {
        let metrics = ColumnarMetrics::disconnected();
        let mut encoder = <RelationDesc as Schema<SourceData>>::encoder(desc).unwrap();
        for data in &datas {
            encoder.append(data);
        }
        let col = encoder.finish();

        // The top-level SourceData column should never be nullable.
        assert!(!col.is_nullable());

        let col = realloc_array(&col, &metrics);

        // Roundtrip the column through its protobuf representation.
        {
            let proto = col.to_data().into_proto();
            let bytes = proto.encode_to_vec();
            let proto = mz_persist_types::arrow::ProtoArrayData::decode(&bytes[..]).unwrap();
            let array_data: ArrayData = proto.into_rust().unwrap();

            let col_rnd = StructArray::from(array_data.clone());
            assert_eq!(col, col_rnd);

            let col_dyn = arrow::array::make_array(array_data);
            let col_dyn = col_dyn.as_any().downcast_ref::<StructArray>().unwrap();
            assert_eq!(&col, col_dyn);
        }

        // Encode the column to parquet.
        let mut buf = Vec::new();
        let fields = Fields::from(vec![Field::new("k", col.data_type().clone(), false)]);
        let arrays: Vec<Arc<dyn Array>> = vec![Arc::new(col.clone())];
        mz_persist_types::parquet::encode_arrays(&mut buf, fields, arrays, config).unwrap();

        // Decode it back out of parquet.
        let buf = Bytes::from(buf);
        let mut reader = mz_persist_types::parquet::decode_arrays(buf).unwrap();
        let maybe_batch = reader.next();

        // If we didn't write any data the reader may produce no batches.
        let Some(record_batch) = maybe_batch else {
            assert!(datas.is_empty());
            return;
        };
        let record_batch = record_batch.unwrap();

        assert_eq!(record_batch.columns().len(), 1);
        let rnd_col = &record_batch.columns()[0];
        let rnd_col = realloc_any(Arc::clone(rnd_col), &metrics);
        let rnd_col = rnd_col
            .as_any()
            .downcast_ref::<StructArray>()
            .unwrap()
            .clone();

        // Collect statistics on the decoded column.
        let stats = <RelationDesc as Schema<SourceData>>::decoder_any(desc, &rnd_col)
            .expect("valid decoder")
            .stats();

        // Decoding with the original desc should reproduce the original data.
        let mut rnd_data = SourceData(Ok(Row::default()));
        let decoder = <RelationDesc as Schema<SourceData>>::decoder(desc, rnd_col.clone()).unwrap();
        for (idx, og_data) in datas.iter().enumerate() {
            decoder.decode(idx, &mut rnd_data);
            assert_eq!(og_data, &rnd_data);
        }

        // Decoding with a projected desc should reproduce the projected rows,
        // and their datums should be within the collected stats.
        let stats_metrics = PartStatsMetrics::new(&MetricsRegistry::new());
        let stats = RelationPartStats {
            name: "test",
            metrics: &stats_metrics,
            stats: &PartStats { key: stats },
            desc: read_desc,
        };
        let mut datum_vec = DatumVec::new();
        let arena = RowArena::default();
        let decoder = <RelationDesc as Schema<SourceData>>::decoder(read_desc, rnd_col).unwrap();

        for (idx, og_data) in datas.iter().enumerate() {
            decoder.decode(idx, &mut rnd_data);
            match (&og_data.0, &rnd_data.0) {
                (Ok(og_row), Ok(rnd_row)) => {
                    {
                        let datums = datum_vec.borrow_with(og_row);
                        let projected_datums =
                            datums.iter().enumerate().filter_map(|(idx, datum)| {
                                read_desc
                                    .contains_index(&ColumnIndex::from_raw(idx))
                                    .then_some(datum)
                            });
                        let og_projected_row = Row::pack(projected_datums);
                        assert_eq!(&og_projected_row, rnd_row);
                    }

                    {
                        let proj_datums = datum_vec.borrow_with(rnd_row);
                        for (pos, (idx, _, _)) in read_desc.iter_all().enumerate() {
                            let spec = stats.col_stats(idx, &arena);
                            assert!(spec.may_contain(proj_datums[pos]));
                        }
                    }
                }
                (Err(_), Err(_)) => assert_eq!(og_data, &rnd_data),
                (_, _) => panic!("decoded to a different type? {og_data:?} {rnd_data:?}"),
            }
        }

        // The schema itself should roundtrip through its encoded form.
        let encoded_schema = SourceData::encode_schema(desc);
        let roundtrip_desc = SourceData::decode_schema(&encoded_schema);
        assert_eq!(desc, &roundtrip_desc);

        // A column should be backward compatible with itself, and migrating
        // with the resulting (identity) migration should preserve the type.
        let migration =
            mz_persist_types::schema::backward_compatible(col.data_type(), col.data_type());
        let migration = migration.expect("should be backward compatible with self");
        let migrated = migration.migrate(Arc::new(col.clone()));
        assert_eq!(col.data_type(), migrated.data_type());
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn all_source_data_roundtrips() {
        let mut weights = vec![(500, Just(0..8)), (50, Just(8..32))];
        if std::env::var("PROPTEST_LARGE_DATA").is_ok() {
            weights.extend([
                (10, Just(32..128)),
                (5, Just(128..512)),
                (3, Just(512..2048)),
                (1, Just(2048..8192)),
            ]);
        }
        let num_rows = Union::new_weighted(weights);

        // Generate a RelationDesc, a projection of it to read back with, and
        // some number of rows that conform to the desc.
        let strat = (any::<RelationDesc>(), num_rows)
            .prop_flat_map(|(desc, num_rows)| {
                arb_relation_desc_projection(desc.clone())
                    .prop_map(move |read_desc| (desc.clone(), read_desc, num_rows.clone()))
            })
            .prop_flat_map(|(desc, read_desc, num_rows)| {
                proptest::collection::vec(arb_source_data_for_relation_desc(&desc), num_rows)
                    .prop_map(move |datas| (desc.clone(), datas, read_desc.clone()))
            });

        proptest!(|((config, (desc, source_datas, read_desc)) in (any::<EncodingConfig>(), strat))| {
            roundtrip_source_data(&desc, source_datas, &read_desc, &config);
        });
    }

    #[mz_ore::test]
    fn roundtrip_error_nulls() {
        let desc = RelationDescBuilder::default()
            .with_column(
                "ts",
                ScalarType::TimestampTz { precision: None }.nullable(false),
            )
            .finish();
        let source_datas = vec![SourceData(Err(DataflowError::EvalError(
            EvalError::DateOutOfRange.into(),
        )))];
        let config = EncodingConfig::default();
        roundtrip_source_data(&desc, source_datas, &desc, &config);
    }
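
    // A hedged companion sketch to `roundtrip_error_nulls` above (not part of
    // the original suite): pins the simplest happy path, a single `Ok` row,
    // through the same roundtrip helper. The column name "a" and the choice
    // of `Datum::True` are illustrative.
    #[mz_ore::test]
    fn roundtrip_ok_row() {
        let desc = RelationDescBuilder::default()
            .with_column("a", ScalarType::Bool.nullable(false))
            .finish();
        let source_datas = vec![SourceData(Ok(Row::pack([Datum::True])))];
        roundtrip_source_data(&desc, source_datas, &desc, &EncodingConfig::default());
    }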

    fn is_sorted(array: &dyn Array) -> bool {
        let sort_options = arrow::compute::SortOptions::default();
        let Ok(cmp) = make_comparator(array, array, sort_options) else {
            // If arrow can't build a comparator for this array type, we can't
            // say anything about its ordering, so conservatively report it as
            // unsorted.
            return false;
        };
        (0..array.len())
            .tuple_windows()
            .all(|(i, j)| cmp(i, j).is_le())
    }
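
    // Minimal sanity check for the `is_sorted` helper above; an illustrative
    // sketch (not from the original suite) using a plain arrow Int32Array
    // rather than encoded SourceData.
    #[mz_ore::test]
    fn is_sorted_sanity() {
        use arrow::array::Int32Array;

        let sorted = Int32Array::from(vec![1, 2, 2, 3]);
        assert!(is_sorted(&sorted));

        let unsorted = Int32Array::from(vec![3, 1, 2]);
        assert!(!is_sorted(&unsorted));

        // An empty array has no adjacent pairs, so it's trivially sorted.
        let empty = Int32Array::from(Vec::<i32>::new());
        assert!(is_sorted(&empty));
    }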

    fn get_data_type(schema: &impl Schema<SourceData>) -> arrow::datatypes::DataType {
        use mz_persist_types::columnar::ColumnEncoder;
        let array = Schema::encoder(schema).expect("valid schema").finish();
        Array::data_type(&array).clone()
    }
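
    // Hedged sketch (not in the original suite): a schema's DataType should be
    // backward compatible with itself, mirroring the self-migration check at
    // the end of `roundtrip_source_data`. The single Int64 column is an
    // illustrative choice.
    #[mz_ore::test]
    fn get_data_type_self_compatible() {
        let desc = RelationDesc::from_names_and_types([("a", ScalarType::Int64.nullable(false))]);
        let data_type = get_data_type(&desc);
        let migration = backward_compatible(&data_type, &data_type);
        assert!(migration.is_some());
    }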

    #[track_caller]
    fn backward_compatible_testcase(
        old: &RelationDesc,
        new: &RelationDesc,
        migration: Migration,
        datas: &[SourceData],
    ) {
        let mut encoder = Schema::<SourceData>::encoder(old).expect("valid schema");
        for data in datas {
            encoder.append(data);
        }
        let old = encoder.finish();
        let new = Schema::<SourceData>::encoder(new)
            .expect("valid schema")
            .finish();
        let old: Arc<dyn Array> = Arc::new(old);
        let new: Arc<dyn Array> = Arc::new(new);
        let migrated = migration.migrate(Arc::clone(&old));
        assert_eq!(migrated.data_type(), new.data_type());

        // If the migration claims to preserve order, then migrating sorted
        // data must yield sorted data. (Checking the empty `new` array here
        // would be vacuous; the migrated data is what matters.)
        if migration.preserves_order() && is_sorted(&old) {
            assert!(is_sorted(&migrated))
        }
    }

    #[mz_ore::test]
    fn backward_compatible_empty_add_column() {
        let old = RelationDesc::empty();
        let new = RelationDesc::from_names_and_types([("a", ScalarType::Bool.nullable(true))]);

        let old_data_type = get_data_type(&old);
        let new_data_type = get_data_type(&new);

        let migration = backward_compatible(&old_data_type, &new_data_type);
        assert!(migration.is_some());
    }

    #[mz_ore::test]
    fn backward_compatible_project_away_all() {
        let old = RelationDesc::from_names_and_types([("a", ScalarType::Bool.nullable(true))]);
        let new = RelationDesc::empty();

        let old_data_type = get_data_type(&old);
        let new_data_type = get_data_type(&new);

        let migration = backward_compatible(&old_data_type, &new_data_type);
        assert!(migration.is_some());
    }
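
    // A hedged companion sketch to the two cases above (not from the original
    // suite): adding a nullable column to a non-empty RelationDesc should also
    // be backward compatible, matching the `should_be_compatible` predicate in
    // `backward_compatible_migrate_from_common` below. Column names "a" and
    // "b" are illustrative.
    #[mz_ore::test]
    fn backward_compatible_add_nullable_column() {
        let old = RelationDesc::from_names_and_types([("a", ScalarType::Bool.nullable(true))]);
        let new = RelationDesc::from_names_and_types([
            ("a", ScalarType::Bool.nullable(true)),
            ("b", ScalarType::String.nullable(true)),
        ]);

        let migration = backward_compatible(&get_data_type(&old), &get_data_type(&new));
        assert!(migration.is_some());
    }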

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn backward_compatible_migrate() {
        let strat = (any::<RelationDesc>(), any::<RelationDesc>()).prop_flat_map(|(old, new)| {
            proptest::collection::vec(arb_source_data_for_relation_desc(&old), 2)
                .prop_map(move |datas| (old.clone(), new.clone(), datas))
        });

        proptest!(|((old, new, datas) in strat)| {
            let old_data_type = get_data_type(&old);
            let new_data_type = get_data_type(&new);

            if let Some(migration) = backward_compatible(&old_data_type, &new_data_type) {
                backward_compatible_testcase(&old, &new, migration, &datas);
            };
        });
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn backward_compatible_migrate_from_common() {
        use mz_repr::ColumnType;
        fn test_case(old: RelationDesc, diffs: Vec<PropRelationDescDiff>, datas: Vec<SourceData>) {
            // Adding a nullable column or dropping a column should always be
            // backward compatible; any other diff may or may not be.
            let should_be_compatible = diffs.iter().all(|diff| match diff {
                PropRelationDescDiff::AddColumn {
                    typ: ColumnType { nullable, .. },
                    ..
                } => *nullable,
                PropRelationDescDiff::DropColumn { .. } => true,
                _ => false,
            });

            let mut new = old.clone();
            for diff in diffs.into_iter() {
                diff.apply(&mut new)
            }

            let old_data_type = get_data_type(&old);
            let new_data_type = get_data_type(&new);

            if let Some(migration) = backward_compatible(&old_data_type, &new_data_type) {
                backward_compatible_testcase(&old, &new, migration, &datas);
            } else if should_be_compatible {
                panic!("new DataType was not compatible when it should have been!");
            }
        }

        let strat = any::<RelationDesc>()
            .prop_flat_map(|desc| {
                proptest::collection::vec(arb_source_data_for_relation_desc(&desc), 2)
                    .no_shrink()
                    .prop_map(move |datas| (desc.clone(), datas))
            })
            .prop_flat_map(|(desc, datas)| {
                arb_relation_desc_diff(&desc)
                    .prop_map(move |diffs| (desc.clone(), diffs, datas.clone()))
            });

        proptest!(|((old, diffs, datas) in strat)| {
            test_case(old, diffs, datas);
        });
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn empty_relation_desc_roundtrips() {
        let empty = RelationDesc::empty();
        let rows = proptest::collection::vec(arb_source_data_for_relation_desc(&empty), 0..8)
            .prop_map(move |datas| (empty.clone(), datas));

        // The empty RelationDesc is an edge case worth pinning separately from
        // the general roundtrip proptest above.
        proptest!(|((config, (desc, source_datas)) in (any::<EncodingConfig>(), rows))| {
            roundtrip_source_data(&desc, source_datas, &desc, &config);
        });
    }
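
    // Deterministic pin of the proptest above; a hedged sketch (not from the
    // original suite) that sends a single empty `Ok` row through the roundtrip
    // helper against the empty RelationDesc.
    #[mz_ore::test]
    fn empty_relation_desc_single_row() {
        let desc = RelationDesc::empty();
        let source_datas = vec![SourceData(Ok(Row::default()))];
        roundtrip_source_data(&desc, source_datas, &desc, &EncodingConfig::default());
    }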

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn arrow_datatype_consistent() {
        fn test_case(desc: RelationDesc, datas: Vec<SourceData>) {
            let half = datas.len() / 2;

            let mut encoder_a = <RelationDesc as Schema<SourceData>>::encoder(&desc).unwrap();
            for data in &datas[..half] {
                encoder_a.append(data);
            }
            let col_a = encoder_a.finish();

            let mut encoder_b = <RelationDesc as Schema<SourceData>>::encoder(&desc).unwrap();
            for data in &datas[half..] {
                encoder_b.append(data);
            }
            let col_b = encoder_b.finish();

            // The arrow DataType of an encoded column must depend only on the
            // schema, never on the data that happened to be appended.
            assert_eq!(col_a.data_type(), col_b.data_type());
        }

        let num_rows = 12;
        let strat = any::<RelationDesc>().prop_flat_map(|desc| {
            proptest::collection::vec(arb_source_data_for_relation_desc(&desc), num_rows)
                .prop_map(move |datas| (desc.clone(), datas))
        });

        proptest!(|((desc, data) in strat)| {
            test_case(desc, data);
        });
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn source_proto_serialization_stability() {
        let min_protos = 10;
        let encoded = include_str!("snapshots/source-datas.txt");

        // Decode the checked-in snapshot of previously encoded
        // (RelationDesc, SourceData) pairs.
        let mut decoded: Vec<(RelationDesc, SourceData)> = encoded
            .lines()
            .map(|s| {
                let (desc, data) = s.split_once(',').expect("comma separated data");
                let desc = base64::engine::general_purpose::STANDARD
                    .decode(desc)
                    .expect("valid base64");
                let data = base64::engine::general_purpose::STANDARD
                    .decode(data)
                    .expect("valid base64");
                (desc, data)
            })
            .map(|(desc, data)| {
                let desc = ProtoRelationDesc::decode(&desc[..]).expect("valid proto");
                let desc = desc.into_rust().expect("valid proto");
                let data = SourceData::decode(&data, &desc).expect("valid proto");
                (desc, data)
            })
            .collect();

        // If the snapshot contains fewer examples than we want, generate
        // additional ones deterministically to make up the difference.
        let mut runner = proptest::test_runner::TestRunner::deterministic();
        let strategy = RelationDesc::arbitrary().prop_flat_map(|desc| {
            arb_source_data_for_relation_desc(&desc).prop_map(move |data| (desc.clone(), data))
        });
        while decoded.len() < min_protos {
            let arbitrary_data = strategy
                .new_tree(&mut runner)
                .expect("source data")
                .current();
            decoded.push(arbitrary_data);
        }

        // Re-encode everything and compare against the checked-in snapshot.
        let mut reencoded = String::new();
        let mut buf = vec![];
        for (desc, data) in decoded {
            buf.clear();
            desc.into_proto().encode(&mut buf).expect("success");
            base64::engine::general_purpose::STANDARD.encode_string(buf.as_slice(), &mut reencoded);
            reencoded.push(',');

            buf.clear();
            data.encode(&mut buf);
            base64::engine::general_purpose::STANDARD.encode_string(buf.as_slice(), &mut reencoded);
            reencoded.push('\n');
        }

        // If this fails, the proto encoding of SourceData or RelationDesc has
        // changed in a way that is not wire compatible with previously written
        // data, which can break round-tripping durably persisted state.
        assert_eq!(
            encoded,
            reencoded.as_str(),
            "SourceData serde should be stable"
        )
    }
}