use std::collections::BTreeMap;
use std::fmt::Debug;
use std::hash::Hash;
use std::ops::{Add, AddAssign, Deref, DerefMut};
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;

use arrow::array::{Array, ArrayRef, BinaryArray, BinaryBuilder, NullArray, StructArray};
use arrow::datatypes::{Field, Fields};
use bytes::{BufMut, Bytes};
use columnation::Columnation;
use itertools::EitherOrBoth::Both;
use itertools::Itertools;
use kafka::KafkaSourceExportDetails;
use load_generator::{LoadGeneratorOutput, LoadGeneratorSourceExportDetails};
use mz_ore::assert_none;
use mz_persist_types::Codec;
use mz_persist_types::arrow::ArrayOrd;
use mz_persist_types::columnar::{ColumnDecoder, ColumnEncoder, Schema};
use mz_persist_types::stats::{
    ColumnNullStats, ColumnStatKinds, ColumnarStats, ColumnarStatsBuilder, PrimitiveStats,
    StructStats,
};
use mz_proto::{IntoRustIfSome, ProtoMapEntry, ProtoType, RustType, TryFromProtoError};
use mz_repr::{
    CatalogItemId, Datum, GlobalId, ProtoRelationDesc, ProtoRow, RelationDesc, Row,
    RowColumnarDecoder, RowColumnarEncoder, arb_row_for_relation,
};
use mz_sql_parser::ast::{Ident, IdentError, UnresolvedItemName};
use proptest::prelude::any;
use proptest::strategy::Strategy;
use proptest_derive::Arbitrary;
use prost::Message;
use serde::{Deserialize, Serialize};
use timely::order::{PartialOrder, TotalOrder};
use timely::progress::timestamp::Refines;
use timely::progress::{PathSummary, Timestamp};

use crate::AlterCompatible;
use crate::connections::inline::{
    ConnectionAccess, ConnectionResolver, InlinedConnection, IntoInlineConnection,
    ReferencedConnection,
};
use crate::controller::{AlterError, CollectionMetadata};
use crate::errors::{DataflowError, ProtoDataflowError};
use crate::instances::StorageInstanceId;
use crate::sources::proto_ingestion_description::{ProtoSourceExport, ProtoSourceImport};
use crate::sources::sql_server::SqlServerSourceExportDetails;

pub mod encoding;
pub mod envelope;
pub mod kafka;
pub mod load_generator;
pub mod mysql;
pub mod postgres;
pub mod sql_server;

pub use crate::sources::envelope::SourceEnvelope;
pub use crate::sources::kafka::KafkaSourceConnection;
pub use crate::sources::load_generator::LoadGeneratorSourceConnection;
pub use crate::sources::mysql::{MySqlSourceConnection, MySqlSourceExportDetails};
pub use crate::sources::postgres::{PostgresSourceConnection, PostgresSourceExportDetails};
pub use crate::sources::sql_server::{SqlServerSource, SqlServerSourceExtras};

include!(concat!(env!("OUT_DIR"), "/mz_storage_types.sources.rs"));

/// A description of a source ingestion.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Arbitrary)]
pub struct IngestionDescription<S: 'static = (), C: ConnectionAccess = InlinedConnection> {
    /// The source description.
    pub desc: SourceDesc<C>,
    /// Additional metadata for the ingestion (e.g. storage collection
    /// metadata filled in by the storage controller).
    pub ingestion_metadata: S,
    /// The collections exported by this ingestion, keyed by their `GlobalId`.
    #[proptest(
        strategy = "proptest::collection::btree_map(any::<GlobalId>(), any::<SourceExport<S>>(), 0..4)"
    )]
    pub source_exports: BTreeMap<GlobalId, SourceExport<S>>,
    /// The storage instance in which to install this ingestion.
    pub instance_id: StorageInstanceId,
    /// The ID of this ingestion's remap/progress collection.
    pub remap_collection_id: GlobalId,
}

impl IngestionDescription {
    pub fn new(
        desc: SourceDesc,
        instance_id: StorageInstanceId,
        remap_collection_id: GlobalId,
    ) -> Self {
        Self {
            desc,
            ingestion_metadata: (),
            source_exports: BTreeMap::new(),
            instance_id,
            remap_collection_id,
        }
    }
}

impl<S> IngestionDescription<S> {
    /// Returns the IDs of all collections managed by this ingestion: every
    /// source export plus the remap collection.
    pub fn collection_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
        // Destructure `self` so that adding a new field forces this method to
        // be reconsidered.
        let IngestionDescription {
            desc: _,
            ingestion_metadata: _,
            source_exports,
            instance_id: _,
            remap_collection_id,
        } = &self;

        source_exports
            .keys()
            .copied()
            .chain(std::iter::once(*remap_collection_id))
    }
}

impl<S: Debug + Eq + PartialEq + AlterCompatible> AlterCompatible for IngestionDescription<S> {
    fn alter_compatible(
        &self,
        id: GlobalId,
        other: &IngestionDescription<S>,
    ) -> Result<(), AlterError> {
        if self == other {
            return Ok(());
        }
        let IngestionDescription {
            desc,
            ingestion_metadata,
            source_exports,
            instance_id,
            remap_collection_id,
        } = self;

        let compatibility_checks = [
            (desc.alter_compatible(id, &other.desc).is_ok(), "desc"),
            (
                ingestion_metadata == &other.ingestion_metadata,
                "ingestion_metadata",
            ),
            (
                source_exports
                    .iter()
                    .merge_join_by(&other.source_exports, |(l_key, _), (r_key, _)| {
                        l_key.cmp(r_key)
                    })
                    .all(|r| match r {
                        Both(
                            (
                                _,
                                SourceExport {
                                    storage_metadata: l_metadata,
                                    details: l_details,
                                    data_config: l_data_config,
                                },
                            ),
                            (
                                _,
                                SourceExport {
                                    storage_metadata: r_metadata,
                                    details: r_details,
                                    data_config: r_data_config,
                                },
                            ),
                        ) => {
                            l_metadata.alter_compatible(id, r_metadata).is_ok()
                                && l_details.alter_compatible(id, r_details).is_ok()
                                && l_data_config.alter_compatible(id, r_data_config).is_ok()
                        }
                        _ => true,
                    }),
                "source_exports",
            ),
            (instance_id == &other.instance_id, "instance_id"),
            (
                remap_collection_id == &other.remap_collection_id,
                "remap_collection_id",
            ),
        ];
        for (compatible, field) in compatibility_checks {
            if !compatible {
                tracing::warn!(
                    "IngestionDescription incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
                    self,
                    other
                );

                return Err(AlterError { id });
            }
        }

        Ok(())
    }
}

impl<R: ConnectionResolver> IntoInlineConnection<IngestionDescription, R>
    for IngestionDescription<(), ReferencedConnection>
{
    fn into_inline_connection(self, r: R) -> IngestionDescription {
        let IngestionDescription {
            desc,
            ingestion_metadata,
            source_exports,
            instance_id,
            remap_collection_id,
        } = self;

        IngestionDescription {
            desc: desc.into_inline_connection(r),
            ingestion_metadata,
            source_exports,
            instance_id,
            remap_collection_id,
        }
    }
}

#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Arbitrary)]
pub struct SourceExport<S = (), C: ConnectionAccess = InlinedConnection> {
    /// The collection metadata needed to write the exported data.
    pub storage_metadata: S,
    /// Details necessary for the source to export data to this export's
    /// collection.
    pub details: SourceExportDetails,
    /// Config necessary to handle (e.g. decode and envelope) the data for
    /// this export.
    pub data_config: SourceExportDataConfig<C>,
}

impl RustType<ProtoIngestionDescription> for IngestionDescription<CollectionMetadata> {
    fn into_proto(&self) -> ProtoIngestionDescription {
        ProtoIngestionDescription {
            source_exports: self.source_exports.into_proto(),
            ingestion_metadata: Some(self.ingestion_metadata.into_proto()),
            desc: Some(self.desc.into_proto()),
            instance_id: Some(self.instance_id.into_proto()),
            remap_collection_id: Some(self.remap_collection_id.into_proto()),
        }
    }

    fn from_proto(proto: ProtoIngestionDescription) -> Result<Self, TryFromProtoError> {
        Ok(IngestionDescription {
            source_exports: proto.source_exports.into_rust()?,
            desc: proto
                .desc
                .into_rust_if_some("ProtoIngestionDescription::desc")?,
            ingestion_metadata: proto
                .ingestion_metadata
                .into_rust_if_some("ProtoIngestionDescription::ingestion_metadata")?,
            instance_id: proto
                .instance_id
                .into_rust_if_some("ProtoIngestionDescription::instance_id")?,
            remap_collection_id: proto
                .remap_collection_id
                .into_rust_if_some("ProtoIngestionDescription::remap_collection_id")?,
        })
    }
}

impl ProtoMapEntry<GlobalId, CollectionMetadata> for ProtoSourceImport {
    fn from_rust<'a>(entry: (&'a GlobalId, &'a CollectionMetadata)) -> Self {
        ProtoSourceImport {
            id: Some(entry.0.into_proto()),
            storage_metadata: Some(entry.1.into_proto()),
        }
    }

    fn into_rust(self) -> Result<(GlobalId, CollectionMetadata), TryFromProtoError> {
        Ok((
            self.id.into_rust_if_some("ProtoSourceImport::id")?,
            self.storage_metadata
                .into_rust_if_some("ProtoSourceImport::storage_metadata")?,
        ))
    }
}

impl ProtoMapEntry<GlobalId, SourceExport<CollectionMetadata>> for ProtoSourceExport {
    fn from_rust<'a>(
        (id, source_export): (&'a GlobalId, &'a SourceExport<CollectionMetadata>),
    ) -> Self {
        ProtoSourceExport {
            id: Some(id.into_proto()),
            storage_metadata: Some(source_export.storage_metadata.into_proto()),
            details: Some(source_export.details.into_proto()),
            data_config: Some(source_export.data_config.into_proto()),
        }
    }

    fn into_rust(self) -> Result<(GlobalId, SourceExport<CollectionMetadata>), TryFromProtoError> {
        Ok((
            self.id.into_rust_if_some("ProtoSourceExport::id")?,
            SourceExport {
                storage_metadata: self
                    .storage_metadata
                    .into_rust_if_some("ProtoSourceExport::storage_metadata")?,
                details: self
                    .details
                    .into_rust_if_some("ProtoSourceExport::details")?,
                data_config: self
                    .data_config
                    .into_rust_if_some("ProtoSourceExport::data_config")?,
            },
        ))
    }
}

/// A timestamp that can be used as the progress ("remap") timestamp of a
/// source.
pub trait SourceTimestamp:
    Timestamp + Columnation + Refines<()> + std::fmt::Display + Sync
{
    /// Encodes the timestamp into a `Row` for durable storage.
    fn encode_row(&self) -> Row;
    /// Decodes a timestamp previously encoded with
    /// [`SourceTimestamp::encode_row`].
    fn decode_row(row: &Row) -> Self;
}

impl SourceTimestamp for MzOffset {
    fn encode_row(&self) -> Row {
        Row::pack([Datum::UInt64(self.offset)])
    }

    fn decode_row(row: &Row) -> Self {
        let mut datums = row.iter();
        match (datums.next(), datums.next()) {
            (Some(Datum::UInt64(offset)), None) => MzOffset::from(offset),
            _ => panic!("invalid row {row:?}"),
        }
    }
}

/// A canonical offset into an external data source, represented as a `u64`.
#[derive(
    Copy,
    Clone,
    Default,
    Debug,
    PartialEq,
    PartialOrd,
    Eq,
    Ord,
    Hash,
    Serialize,
    Deserialize,
    Arbitrary,
)]
pub struct MzOffset {
    pub offset: u64,
}

impl differential_dataflow::difference::Semigroup for MzOffset {
    fn plus_equals(&mut self, rhs: &Self) {
        self.offset.plus_equals(&rhs.offset)
    }
}

impl differential_dataflow::difference::IsZero for MzOffset {
    fn is_zero(&self) -> bool {
        self.offset.is_zero()
    }
}

impl mz_persist_types::Codec64 for MzOffset {
    fn codec_name() -> String {
        "MzOffset".to_string()
    }

    fn encode(&self) -> [u8; 8] {
        mz_persist_types::Codec64::encode(&self.offset)
    }

    fn decode(buf: [u8; 8]) -> Self {
        Self {
            offset: mz_persist_types::Codec64::decode(buf),
        }
    }
}

impl columnation::Columnation for MzOffset {
    type InnerRegion = columnation::CopyRegion<MzOffset>;
}

impl RustType<ProtoMzOffset> for MzOffset {
    fn into_proto(&self) -> ProtoMzOffset {
        ProtoMzOffset {
            offset: self.offset,
        }
    }

    fn from_proto(proto: ProtoMzOffset) -> Result<Self, TryFromProtoError> {
        Ok(Self {
            offset: proto.offset,
        })
    }
}

impl MzOffset {
    pub fn checked_sub(self, other: Self) -> Option<Self> {
        self.offset
            .checked_sub(other.offset)
            .map(|offset| Self { offset })
    }
}

impl From<u64> for MzOffset {
    fn from(offset: u64) -> Self {
        Self { offset }
    }
}

impl std::fmt::Display for MzOffset {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.offset)
    }
}

impl Add<u64> for MzOffset {
    type Output = MzOffset;

    fn add(self, x: u64) -> MzOffset {
        MzOffset {
            offset: self.offset + x,
        }
    }
}
impl Add<Self> for MzOffset {
    type Output = Self;

    fn add(self, x: Self) -> Self {
        MzOffset {
            offset: self.offset + x.offset,
        }
    }
}
impl AddAssign<u64> for MzOffset {
    fn add_assign(&mut self, x: u64) {
        self.offset += x;
    }
}
impl AddAssign<Self> for MzOffset {
    fn add_assign(&mut self, x: Self) {
        self.offset += x.offset;
    }
}

impl From<tokio_postgres::types::PgLsn> for MzOffset {
    fn from(lsn: tokio_postgres::types::PgLsn) -> Self {
        MzOffset { offset: lsn.into() }
    }
}

impl Timestamp for MzOffset {
    type Summary = MzOffset;

    fn minimum() -> Self {
        MzOffset {
            offset: Timestamp::minimum(),
        }
    }
}

impl PathSummary<MzOffset> for MzOffset {
    fn results_in(&self, src: &MzOffset) -> Option<MzOffset> {
        Some(MzOffset {
            offset: self.offset.results_in(&src.offset)?,
        })
    }

    fn followed_by(&self, other: &Self) -> Option<Self> {
        Some(MzOffset {
            offset: PathSummary::<u64>::followed_by(&self.offset, &other.offset)?,
        })
    }
}

impl Refines<()> for MzOffset {
    fn to_inner(_: ()) -> Self {
        MzOffset::minimum()
    }
    fn to_outer(self) {}
    fn summarize(_: Self::Summary) {}
}

impl PartialOrder for MzOffset {
    #[inline]
    fn less_equal(&self, other: &Self) -> bool {
        self.offset.less_equal(&other.offset)
    }
}

impl TotalOrder for MzOffset {}

/// The timeline in which a source's timestamps are reckoned.
#[derive(Arbitrary, Clone, Debug, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize, Hash)]
pub enum Timeline {
    /// Timestamps are milliseconds since the Unix epoch.
    EpochMilliseconds,
    /// Timestamps come from an external source with the given name; their
    /// meaning is opaque beyond their order.
    External(String),
    /// A user-defined timeline with the given name.
    User(String),
}

impl Timeline {
    const EPOCH_MILLISECOND_ID_CHAR: char = 'M';
    const EXTERNAL_ID_CHAR: char = 'E';
    const USER_ID_CHAR: char = 'U';

    fn id_char(&self) -> char {
        match self {
            Self::EpochMilliseconds => Self::EPOCH_MILLISECOND_ID_CHAR,
            Self::External(_) => Self::EXTERNAL_ID_CHAR,
            Self::User(_) => Self::USER_ID_CHAR,
        }
    }
}

impl RustType<ProtoTimeline> for Timeline {
    fn into_proto(&self) -> ProtoTimeline {
        use proto_timeline::Kind;
        ProtoTimeline {
            kind: Some(match self {
                Timeline::EpochMilliseconds => Kind::EpochMilliseconds(()),
                Timeline::External(s) => Kind::External(s.clone()),
                Timeline::User(s) => Kind::User(s.clone()),
            }),
        }
    }

    fn from_proto(proto: ProtoTimeline) -> Result<Self, TryFromProtoError> {
        use proto_timeline::Kind;
        let kind = proto
            .kind
            .ok_or_else(|| TryFromProtoError::missing_field("ProtoTimeline::kind"))?;
        Ok(match kind {
            Kind::EpochMilliseconds(()) => Timeline::EpochMilliseconds,
            Kind::External(s) => Timeline::External(s),
            Kind::User(s) => Timeline::User(s),
        })
    }
}

impl ToString for Timeline {
    fn to_string(&self) -> String {
        match self {
            Self::EpochMilliseconds => format!("{}", self.id_char()),
            Self::External(id) => format!("{}.{id}", self.id_char()),
            Self::User(id) => format!("{}.{id}", self.id_char()),
        }
    }
}

impl FromStr for Timeline {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        if s.is_empty() {
            return Err("empty timeline".to_string());
        }
        let mut chars = s.chars();
        match chars.next().expect("non-empty string") {
            Self::EPOCH_MILLISECOND_ID_CHAR => match chars.next() {
                None => Ok(Self::EpochMilliseconds),
                Some(_) => Err(format!("unknown timeline: {s}")),
            },
            Self::EXTERNAL_ID_CHAR => match chars.next() {
                Some('.') => Ok(Self::External(chars.as_str().to_string())),
                _ => Err(format!("unknown timeline: {s}")),
            },
            Self::USER_ID_CHAR => match chars.next() {
                Some('.') => Ok(Self::User(chars.as_str().to_string())),
                _ => Err(format!("unknown timeline: {s}")),
            },
            _ => Err(format!("unknown timeline: {s}")),
        }
    }
}

/// A connection to an external system that can produce data for a source.
pub trait SourceConnection: Debug + Clone + PartialEq + AlterCompatible {
    /// The name of the external system (e.g. "kafka", "postgres").
    fn name(&self) -> &'static str;

    /// The name of the resource in the external system (e.g. a Kafka topic),
    /// if any.
    fn external_reference(&self) -> Option<&str>;

    /// The schema of this connection's key rows, before any encoding or
    /// envelope processing is applied.
    fn default_key_desc(&self) -> RelationDesc;

    /// The schema of this connection's value rows, before any encoding or
    /// envelope processing is applied.
    fn default_value_desc(&self) -> RelationDesc;

    /// The schema of this connection's timestamp type, as used by the remap
    /// collection.
    fn timestamp_desc(&self) -> RelationDesc;

    /// The ID of the catalog connection object backing this source
    /// connection, if any.
    fn connection_id(&self) -> Option<CatalogItemId>;

    /// The details of this source's primary export.
    fn primary_export_details(&self) -> SourceExportDetails;

    /// Whether this source type can run in read-only mode.
    fn supports_read_only(&self) -> bool;

    /// Whether this source type prefers to run on a single replica.
    fn prefers_single_replica(&self) -> bool;
}

#[derive(Arbitrary, Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum Compression {
    Gzip,
    None,
}

impl RustType<ProtoCompression> for Compression {
    fn into_proto(&self) -> ProtoCompression {
        use proto_compression::Kind;
        ProtoCompression {
            kind: Some(match self {
                Compression::Gzip => Kind::Gzip(()),
                Compression::None => Kind::None(()),
            }),
        }
    }

    fn from_proto(proto: ProtoCompression) -> Result<Self, TryFromProtoError> {
        use proto_compression::Kind;
        Ok(match proto.kind {
            Some(Kind::Gzip(())) => Compression::Gzip,
            Some(Kind::None(())) => Compression::None,
            None => {
                return Err(TryFromProtoError::MissingField(
                    "ProtoCompression::kind".into(),
                ));
            }
        })
    }
}

/// Per-export configuration for how to decode and envelope the data.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Arbitrary)]
pub struct SourceExportDataConfig<C: ConnectionAccess = InlinedConnection> {
    pub encoding: Option<encoding::SourceDataEncoding<C>>,
    pub envelope: SourceEnvelope,
}

impl<R: ConnectionResolver> IntoInlineConnection<SourceExportDataConfig, R>
    for SourceExportDataConfig<ReferencedConnection>
{
    fn into_inline_connection(self, r: R) -> SourceExportDataConfig {
        let SourceExportDataConfig { encoding, envelope } = self;

        SourceExportDataConfig {
            encoding: encoding.map(|e| e.into_inline_connection(r)),
            envelope,
        }
    }
}

impl RustType<ProtoSourceExportDataConfig> for SourceExportDataConfig {
    fn into_proto(&self) -> ProtoSourceExportDataConfig {
        ProtoSourceExportDataConfig {
            encoding: self.encoding.into_proto(),
            envelope: Some(self.envelope.into_proto()),
        }
    }

    fn from_proto(proto: ProtoSourceExportDataConfig) -> Result<Self, TryFromProtoError> {
        Ok(SourceExportDataConfig {
            encoding: proto.encoding.into_rust()?,
            envelope: proto
                .envelope
                .into_rust_if_some("ProtoSourceExportDataConfig::envelope")?,
        })
    }
}

impl<C: ConnectionAccess> AlterCompatible for SourceExportDataConfig<C> {
    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
        if self == other {
            return Ok(());
        }
        let Self { encoding, envelope } = &self;

        let compatibility_checks = [
            (
                match (encoding, &other.encoding) {
                    (Some(s), Some(o)) => s.alter_compatible(id, o).is_ok(),
                    (s, o) => s == o,
                },
                "encoding",
            ),
            (envelope == &other.envelope, "envelope"),
        ];

        for (compatible, field) in compatibility_checks {
            if !compatible {
                tracing::warn!(
                    "SourceExportDataConfig incompatible {field}:\nself:\n{:#?}\n\nother\n{:#?}",
                    self,
                    other
                );

                return Err(AlterError { id });
            }
        }
        Ok(())
    }
}

impl<C: ConnectionAccess> SourceExportDataConfig<C> {
    /// Returns whether this envelope, combined with the given connection,
    /// produces a monotonic collection, i.e. one that only ever receives
    /// additions and never retractions.
    pub fn monotonic(&self, connection: &GenericSourceConnection<C>) -> bool {
        match &self.envelope {
            SourceEnvelope::Upsert(_) | SourceEnvelope::CdcV2 => false,
            SourceEnvelope::None(_) => {
                match connection {
                    // Postgres can produce retractions (deletes).
                    GenericSourceConnection::Postgres(_) => false,
                    // MySQL can produce retractions (deletes).
                    GenericSourceConnection::MySql(_) => false,
                    // SQL Server can produce retractions (deletes).
                    GenericSourceConnection::SqlServer(_) => false,
                    // Whether a load generator retracts data depends on the
                    // generator.
                    GenericSourceConnection::LoadGenerator(g) => g.load_generator.is_monotonic(),
                    // Kafka exports with a `None` envelope are append-only.
                    GenericSourceConnection::Kafka(_) => true,
                }
            }
        }
    }
}

/// An external source of updates for a relational collection.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Arbitrary)]
pub struct SourceDesc<C: ConnectionAccess = InlinedConnection> {
    pub connection: GenericSourceConnection<C>,
    pub timestamp_interval: Duration,
    /// The data config for this source's primary export.
    pub primary_export: SourceExportDataConfig<C>,
    /// The details for this source's primary export.
    pub primary_export_details: SourceExportDetails,
}

impl<R: ConnectionResolver> IntoInlineConnection<SourceDesc, R>
    for SourceDesc<ReferencedConnection>
{
    fn into_inline_connection(self, r: R) -> SourceDesc {
        let SourceDesc {
            connection,
            primary_export,
            primary_export_details,
            timestamp_interval,
        } = self;

        SourceDesc {
            connection: connection.into_inline_connection(&r),
            primary_export: primary_export.into_inline_connection(r),
            primary_export_details,
            timestamp_interval,
        }
    }
}

impl RustType<ProtoSourceDesc> for SourceDesc {
    fn into_proto(&self) -> ProtoSourceDesc {
        ProtoSourceDesc {
            connection: Some(self.connection.into_proto()),
            primary_export: Some(self.primary_export.into_proto()),
            primary_export_details: Some(self.primary_export_details.into_proto()),
            timestamp_interval: Some(self.timestamp_interval.into_proto()),
        }
    }

    fn from_proto(proto: ProtoSourceDesc) -> Result<Self, TryFromProtoError> {
        Ok(SourceDesc {
            connection: proto
                .connection
                .into_rust_if_some("ProtoSourceDesc::connection")?,
            primary_export: proto
                .primary_export
                .into_rust_if_some("ProtoSourceDesc::primary_export")?,
            primary_export_details: proto
                .primary_export_details
                .into_rust_if_some("ProtoSourceDesc::primary_export_details")?,
            timestamp_interval: proto
                .timestamp_interval
                .into_rust_if_some("ProtoSourceDesc::timestamp_interval")?,
        })
    }
}

impl<C: ConnectionAccess> AlterCompatible for SourceDesc<C> {
    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
        if self == other {
            return Ok(());
        }
        let Self {
            connection,
            primary_export,
            primary_export_details,
            timestamp_interval,
        } = &self;

        let compatibility_checks = [
            (
                connection.alter_compatible(id, &other.connection).is_ok(),
                "connection",
            ),
            (primary_export == &other.primary_export, "primary_export"),
            (
                primary_export_details == &other.primary_export_details,
                "primary_export_details",
            ),
            (
                timestamp_interval == &other.timestamp_interval,
                "timestamp_interval",
            ),
        ];

        for (compatible, field) in compatibility_checks {
            if !compatible {
                tracing::warn!(
                    "SourceDesc incompatible {field}:\nself:\n{:#?}\n\nother\n{:#?}",
                    self,
                    other
                );

                return Err(AlterError { id });
            }
        }

        Ok(())
    }
}

impl SourceDesc<InlinedConnection> {
    /// Returns a [`SourceExport`] describing the primary export of this
    /// source.
    pub fn primary_source_export(&self) -> SourceExport<(), InlinedConnection> {
        SourceExport {
            storage_metadata: (),
            details: self.primary_export_details.clone(),
            data_config: self.primary_export.clone(),
        }
    }
}

#[derive(Arbitrary, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum GenericSourceConnection<C: ConnectionAccess = InlinedConnection> {
    Kafka(KafkaSourceConnection<C>),
    Postgres(PostgresSourceConnection<C>),
    MySql(MySqlSourceConnection<C>),
    SqlServer(SqlServerSource<C>),
    LoadGenerator(LoadGeneratorSourceConnection),
}

impl<C: ConnectionAccess> From<KafkaSourceConnection<C>> for GenericSourceConnection<C> {
    fn from(conn: KafkaSourceConnection<C>) -> Self {
        Self::Kafka(conn)
    }
}

impl<C: ConnectionAccess> From<PostgresSourceConnection<C>> for GenericSourceConnection<C> {
    fn from(conn: PostgresSourceConnection<C>) -> Self {
        Self::Postgres(conn)
    }
}

impl<C: ConnectionAccess> From<MySqlSourceConnection<C>> for GenericSourceConnection<C> {
    fn from(conn: MySqlSourceConnection<C>) -> Self {
        Self::MySql(conn)
    }
}

impl<C: ConnectionAccess> From<SqlServerSource<C>> for GenericSourceConnection<C> {
    fn from(conn: SqlServerSource<C>) -> Self {
        Self::SqlServer(conn)
    }
}

impl<C: ConnectionAccess> From<LoadGeneratorSourceConnection> for GenericSourceConnection<C> {
    fn from(conn: LoadGeneratorSourceConnection) -> Self {
        Self::LoadGenerator(conn)
    }
}

impl<R: ConnectionResolver> IntoInlineConnection<GenericSourceConnection, R>
    for GenericSourceConnection<ReferencedConnection>
{
    fn into_inline_connection(self, r: R) -> GenericSourceConnection {
        match self {
            GenericSourceConnection::Kafka(kafka) => {
                GenericSourceConnection::Kafka(kafka.into_inline_connection(r))
            }
            GenericSourceConnection::Postgres(pg) => {
                GenericSourceConnection::Postgres(pg.into_inline_connection(r))
            }
            GenericSourceConnection::MySql(mysql) => {
                GenericSourceConnection::MySql(mysql.into_inline_connection(r))
            }
            GenericSourceConnection::SqlServer(sql_server) => {
                GenericSourceConnection::SqlServer(sql_server.into_inline_connection(r))
            }
            GenericSourceConnection::LoadGenerator(lg) => {
                GenericSourceConnection::LoadGenerator(lg)
            }
        }
    }
}

impl<C: ConnectionAccess> SourceConnection for GenericSourceConnection<C> {
    fn name(&self) -> &'static str {
        match self {
            Self::Kafka(conn) => conn.name(),
            Self::Postgres(conn) => conn.name(),
            Self::MySql(conn) => conn.name(),
            Self::SqlServer(conn) => conn.name(),
            Self::LoadGenerator(conn) => conn.name(),
        }
    }

    fn external_reference(&self) -> Option<&str> {
        match self {
            Self::Kafka(conn) => conn.external_reference(),
            Self::Postgres(conn) => conn.external_reference(),
            Self::MySql(conn) => conn.external_reference(),
            Self::SqlServer(conn) => conn.external_reference(),
            Self::LoadGenerator(conn) => conn.external_reference(),
        }
    }

    fn default_key_desc(&self) -> RelationDesc {
        match self {
            Self::Kafka(conn) => conn.default_key_desc(),
            Self::Postgres(conn) => conn.default_key_desc(),
            Self::MySql(conn) => conn.default_key_desc(),
            Self::SqlServer(conn) => conn.default_key_desc(),
            Self::LoadGenerator(conn) => conn.default_key_desc(),
        }
    }

    fn default_value_desc(&self) -> RelationDesc {
        match self {
            Self::Kafka(conn) => conn.default_value_desc(),
            Self::Postgres(conn) => conn.default_value_desc(),
            Self::MySql(conn) => conn.default_value_desc(),
            Self::SqlServer(conn) => conn.default_value_desc(),
            Self::LoadGenerator(conn) => conn.default_value_desc(),
        }
    }

    fn timestamp_desc(&self) -> RelationDesc {
        match self {
            Self::Kafka(conn) => conn.timestamp_desc(),
            Self::Postgres(conn) => conn.timestamp_desc(),
            Self::MySql(conn) => conn.timestamp_desc(),
            Self::SqlServer(conn) => conn.timestamp_desc(),
            Self::LoadGenerator(conn) => conn.timestamp_desc(),
        }
    }

    fn connection_id(&self) -> Option<CatalogItemId> {
        match self {
            Self::Kafka(conn) => conn.connection_id(),
            Self::Postgres(conn) => conn.connection_id(),
            Self::MySql(conn) => conn.connection_id(),
            Self::SqlServer(conn) => conn.connection_id(),
            Self::LoadGenerator(conn) => conn.connection_id(),
        }
    }

    fn primary_export_details(&self) -> SourceExportDetails {
        match self {
            Self::Kafka(conn) => conn.primary_export_details(),
            Self::Postgres(conn) => conn.primary_export_details(),
            Self::MySql(conn) => conn.primary_export_details(),
            Self::SqlServer(conn) => conn.primary_export_details(),
            Self::LoadGenerator(conn) => conn.primary_export_details(),
        }
    }

    fn supports_read_only(&self) -> bool {
        match self {
            GenericSourceConnection::Kafka(conn) => conn.supports_read_only(),
            GenericSourceConnection::Postgres(conn) => conn.supports_read_only(),
            GenericSourceConnection::MySql(conn) => conn.supports_read_only(),
            GenericSourceConnection::SqlServer(conn) => conn.supports_read_only(),
            GenericSourceConnection::LoadGenerator(conn) => conn.supports_read_only(),
        }
    }

    fn prefers_single_replica(&self) -> bool {
        match self {
            GenericSourceConnection::Kafka(conn) => conn.prefers_single_replica(),
            GenericSourceConnection::Postgres(conn) => conn.prefers_single_replica(),
            GenericSourceConnection::MySql(conn) => conn.prefers_single_replica(),
            GenericSourceConnection::SqlServer(conn) => conn.prefers_single_replica(),
            GenericSourceConnection::LoadGenerator(conn) => conn.prefers_single_replica(),
        }
    }
}

impl<C: ConnectionAccess> crate::AlterCompatible for GenericSourceConnection<C> {
    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
        if self == other {
            return Ok(());
        }
        let r = match (self, other) {
            (Self::Kafka(conn), Self::Kafka(other)) => conn.alter_compatible(id, other),
            (Self::Postgres(conn), Self::Postgres(other)) => conn.alter_compatible(id, other),
            (Self::MySql(conn), Self::MySql(other)) => conn.alter_compatible(id, other),
            (Self::SqlServer(conn), Self::SqlServer(other)) => conn.alter_compatible(id, other),
            (Self::LoadGenerator(conn), Self::LoadGenerator(other)) => {
                conn.alter_compatible(id, other)
            }
            _ => Err(AlterError { id }),
        };

        if r.is_err() {
            tracing::warn!(
                "GenericSourceConnection incompatible:\nself:\n{:#?}\n\nother\n{:#?}",
                self,
                other
            );
        }

        r
    }
}

impl RustType<ProtoSourceConnection> for GenericSourceConnection<InlinedConnection> {
    fn into_proto(&self) -> ProtoSourceConnection {
        use proto_source_connection::Kind;
        ProtoSourceConnection {
            kind: Some(match self {
                GenericSourceConnection::Kafka(kafka) => Kind::Kafka(kafka.into_proto()),
                GenericSourceConnection::Postgres(postgres) => {
                    Kind::Postgres(postgres.into_proto())
                }
                GenericSourceConnection::MySql(mysql) => Kind::Mysql(mysql.into_proto()),
                GenericSourceConnection::SqlServer(sql_server) => {
                    Kind::SqlServer(sql_server.into_proto())
                }
                GenericSourceConnection::LoadGenerator(loadgen) => {
                    Kind::Loadgen(loadgen.into_proto())
                }
            }),
        }
    }

    fn from_proto(proto: ProtoSourceConnection) -> Result<Self, TryFromProtoError> {
        use proto_source_connection::Kind;
        let kind = proto
            .kind
            .ok_or_else(|| TryFromProtoError::missing_field("ProtoSourceConnection::kind"))?;
        Ok(match kind {
            Kind::Kafka(kafka) => GenericSourceConnection::Kafka(kafka.into_rust()?),
            Kind::Postgres(postgres) => GenericSourceConnection::Postgres(postgres.into_rust()?),
            Kind::Mysql(mysql) => GenericSourceConnection::MySql(mysql.into_rust()?),
            Kind::SqlServer(sql_server) => {
                GenericSourceConnection::SqlServer(sql_server.into_rust()?)
            }
            Kind::Loadgen(loadgen) => GenericSourceConnection::LoadGenerator(loadgen.into_rust()?),
        })
    }
}

/// Details necessary for each source export to allow the source
/// implementation to export data to its collection.
#[derive(Arbitrary, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum SourceExportDetails {
    /// Used when the export does not need any details (e.g. when the primary
    /// collection of a source does not export data).
    None,
    Kafka(KafkaSourceExportDetails),
    Postgres(PostgresSourceExportDetails),
    MySql(MySqlSourceExportDetails),
    SqlServer(SqlServerSourceExportDetails),
    LoadGenerator(LoadGeneratorSourceExportDetails),
}

impl crate::AlterCompatible for SourceExportDetails {
    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
        if self == other {
            return Ok(());
        }
        let r = match (self, other) {
            (Self::None, Self::None) => Ok(()),
            (Self::Kafka(s), Self::Kafka(o)) => s.alter_compatible(id, o),
            (Self::Postgres(s), Self::Postgres(o)) => s.alter_compatible(id, o),
            (Self::MySql(s), Self::MySql(o)) => s.alter_compatible(id, o),
            (Self::LoadGenerator(s), Self::LoadGenerator(o)) => s.alter_compatible(id, o),
            // Note: `SqlServer` details fall through to the catch-all, so
            // they are only alter-compatible when exactly equal.
            _ => Err(AlterError { id }),
        };

        if r.is_err() {
            tracing::warn!(
                "SourceExportDetails incompatible:\nself:\n{:#?}\n\nother\n{:#?}",
                self,
                other
            );
        }

        r
    }
}

impl RustType<ProtoSourceExportDetails> for SourceExportDetails {
    fn into_proto(&self) -> ProtoSourceExportDetails {
        use proto_source_export_details::Kind;
        ProtoSourceExportDetails {
            kind: match self {
                SourceExportDetails::None => None,
                SourceExportDetails::Kafka(details) => Some(Kind::Kafka(details.into_proto())),
                SourceExportDetails::Postgres(details) => {
                    Some(Kind::Postgres(details.into_proto()))
                }
                SourceExportDetails::MySql(details) => Some(Kind::Mysql(details.into_proto())),
                SourceExportDetails::SqlServer(details) => {
                    Some(Kind::SqlServer(details.into_proto()))
                }
                SourceExportDetails::LoadGenerator(details) => {
                    Some(Kind::Loadgen(details.into_proto()))
                }
            },
        }
    }

    fn from_proto(proto: ProtoSourceExportDetails) -> Result<Self, TryFromProtoError> {
        use proto_source_export_details::Kind;
        Ok(match proto.kind {
            None => SourceExportDetails::None,
            Some(Kind::Kafka(details)) => SourceExportDetails::Kafka(details.into_rust()?),
            Some(Kind::Postgres(details)) => SourceExportDetails::Postgres(details.into_rust()?),
            Some(Kind::Mysql(details)) => SourceExportDetails::MySql(details.into_rust()?),
            Some(Kind::SqlServer(details)) => SourceExportDetails::SqlServer(details.into_rust()?),
            Some(Kind::Loadgen(details)) => {
                SourceExportDetails::LoadGenerator(details.into_rust()?)
            }
        })
    }
}

/// Per-source-type details recorded with a source export's SQL statement.
pub enum SourceExportStatementDetails {
    Postgres {
        table: mz_postgres_util::desc::PostgresTableDesc,
    },
    MySql {
        table: mz_mysql_util::MySqlTableDesc,
        initial_gtid_set: String,
    },
    SqlServer {
        table: mz_sql_server_util::desc::SqlServerTableDesc,
        capture_instance: Arc<str>,
        initial_lsn: mz_sql_server_util::cdc::Lsn,
    },
    LoadGenerator {
        output: LoadGeneratorOutput,
    },
    Kafka {},
}

impl RustType<ProtoSourceExportStatementDetails> for SourceExportStatementDetails {
    fn into_proto(&self) -> ProtoSourceExportStatementDetails {
        match self {
            SourceExportStatementDetails::Postgres { table } => ProtoSourceExportStatementDetails {
                kind: Some(proto_source_export_statement_details::Kind::Postgres(
                    postgres::ProtoPostgresSourceExportStatementDetails {
                        table: Some(table.into_proto()),
                    },
                )),
            },
            SourceExportStatementDetails::MySql {
                table,
                initial_gtid_set,
            } => ProtoSourceExportStatementDetails {
                kind: Some(proto_source_export_statement_details::Kind::Mysql(
                    mysql::ProtoMySqlSourceExportStatementDetails {
                        table: Some(table.into_proto()),
                        initial_gtid_set: initial_gtid_set.clone(),
                    },
                )),
            },
            SourceExportStatementDetails::SqlServer {
                table,
                capture_instance,
                initial_lsn,
            } => ProtoSourceExportStatementDetails {
                kind: Some(proto_source_export_statement_details::Kind::SqlServer(
                    sql_server::ProtoSqlServerSourceExportStatementDetails {
                        table: Some(table.into_proto()),
                        capture_instance: capture_instance.to_string(),
                        initial_lsn: initial_lsn.as_bytes().to_vec(),
                    },
                )),
            },
            SourceExportStatementDetails::LoadGenerator { output } => {
                ProtoSourceExportStatementDetails {
                    kind: Some(proto_source_export_statement_details::Kind::Loadgen(
                        load_generator::ProtoLoadGeneratorSourceExportStatementDetails {
                            output: output.into_proto().into(),
                        },
                    )),
                }
            }
            SourceExportStatementDetails::Kafka {} => ProtoSourceExportStatementDetails {
                kind: Some(proto_source_export_statement_details::Kind::Kafka(
                    kafka::ProtoKafkaSourceExportStatementDetails {},
                )),
            },
        }
    }

    fn from_proto(proto: ProtoSourceExportStatementDetails) -> Result<Self, TryFromProtoError> {
        use proto_source_export_statement_details::Kind;
        Ok(match proto.kind {
            Some(Kind::Postgres(details)) => SourceExportStatementDetails::Postgres {
                table: details
                    .table
                    .into_rust_if_some("ProtoPostgresSourceExportStatementDetails::table")?,
            },
            Some(Kind::Mysql(details)) => SourceExportStatementDetails::MySql {
                table: details
                    .table
                    .into_rust_if_some("ProtoMySqlSourceExportStatementDetails::table")?,
                initial_gtid_set: details.initial_gtid_set,
            },
            Some(Kind::SqlServer(details)) => SourceExportStatementDetails::SqlServer {
                table: details
                    .table
                    .into_rust_if_some("ProtoSqlServerSourceExportStatementDetails::table")?,
                capture_instance: details.capture_instance.into(),
                initial_lsn: mz_sql_server_util::cdc::Lsn::try_from(details.initial_lsn.as_slice())
                    .map_err(|e| TryFromProtoError::InvalidFieldError(e.to_string()))?,
            },
            Some(Kind::Loadgen(details)) => SourceExportStatementDetails::LoadGenerator {
                output: details
                    .output
                    .into_rust_if_some("ProtoLoadGeneratorSourceExportStatementDetails::output")?,
            },
            Some(Kind::Kafka(_details)) => SourceExportStatementDetails::Kafka {},
            None => {
                return Err(TryFromProtoError::missing_field(
                    "ProtoSourceExportStatementDetails::kind",
                ));
            }
        })
    }
}

/// The data from an external source: either a decoded `Row` or an error
/// explaining why the data could not be decoded.
#[derive(Arbitrary, Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
#[repr(transparent)]
pub struct SourceData(pub Result<Row, DataflowError>);

impl Default for SourceData {
    fn default() -> Self {
        SourceData(Ok(Row::default()))
    }
}

impl Deref for SourceData {
    type Target = Result<Row, DataflowError>;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl DerefMut for SourceData {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0
    }
}

impl RustType<ProtoSourceData> for SourceData {
    fn into_proto(&self) -> ProtoSourceData {
        use proto_source_data::Kind;
        ProtoSourceData {
            kind: Some(match &**self {
                Ok(row) => Kind::Ok(row.into_proto()),
                Err(err) => Kind::Err(err.into_proto()),
            }),
        }
    }

    fn from_proto(proto: ProtoSourceData) -> Result<Self, TryFromProtoError> {
        use proto_source_data::Kind;
        match proto.kind {
            Some(kind) => match kind {
                Kind::Ok(row) => Ok(SourceData(Ok(row.into_rust()?))),
                Kind::Err(err) => Ok(SourceData(Err(err.into_rust()?))),
            },
            None => Result::Err(TryFromProtoError::missing_field("ProtoSourceData::kind")),
        }
    }
}

impl Codec for SourceData {
    type Storage = ProtoRow;
    type Schema = RelationDesc;

    fn codec_name() -> String {
        "protobuf[SourceData]".into()
    }

    fn encode<B: BufMut>(&self, buf: &mut B) {
        self.into_proto()
            .encode(buf)
            .expect("no required fields means no initialization errors");
    }

    fn decode(buf: &[u8], schema: &RelationDesc) -> Result<Self, String> {
        let mut val = SourceData::default();
        <Self as Codec>::decode_from(&mut val, buf, &mut None, schema)?;
        Ok(val)
    }

    fn decode_from<'a>(
        &mut self,
        buf: &'a [u8],
        storage: &mut Option<ProtoRow>,
        schema: &RelationDesc,
    ) -> Result<(), String> {
        // Reuse the allocations of a previously decoded `ProtoRow`, if any.
        let mut proto = storage.take().unwrap_or_default();
        proto.clear();
        let mut proto = ProtoSourceData {
            kind: Some(proto_source_data::Kind::Ok(proto)),
        };
        proto.merge(buf).map_err(|err| err.to_string())?;
        match (proto.kind, &mut self.0) {
            (Some(proto_source_data::Kind::Ok(proto)), Ok(row)) => {
                // Decode the row in place, then return the proto to storage
                // so its allocations can be reused.
                let ret = row.decode_from_proto(&proto, schema);
                storage.replace(proto);
                ret
            }
            (kind, _) => {
                // Fall back to a fresh decode for the less common shapes.
                let proto = ProtoSourceData { kind };
                *self = proto.into_rust().map_err(|err| err.to_string())?;
                Ok(())
            }
        }
    }

    fn validate(val: &Self, desc: &Self::Schema) -> Result<(), String> {
        match &val.0 {
            Ok(row) => Row::validate(row, desc),
            Err(_) => Ok(()),
        }
    }

    fn encode_schema(schema: &Self::Schema) -> Bytes {
        schema.into_proto().encode_to_vec().into()
    }

    fn decode_schema(buf: &Bytes) -> Self::Schema {
        let proto = ProtoRelationDesc::decode(buf.as_ref()).expect("valid schema");
        proto.into_rust().expect("valid schema")
    }
}

/// Returns a [`Strategy`] that generates [`SourceData`] whose rows conform to
/// the provided [`RelationDesc`]; a small fraction of the generated values
/// are errors rather than rows.
pub fn arb_source_data_for_relation_desc(
    desc: &RelationDesc,
) -> impl Strategy<Value = SourceData> + use<> {
    let row_strat = arb_row_for_relation(desc).no_shrink();

    proptest::strategy::Union::new_weighted(vec![
        (50, row_strat.prop_map(|row| SourceData(Ok(row))).boxed()),
        (
            1,
            any::<DataflowError>()
                .prop_map(|err| SourceData(Err(err)))
                .no_shrink()
                .boxed(),
        ),
    ])
}

/// A reference to an object in an external system's catalog (e.g. a table in
/// an upstream database).
pub trait ExternalCatalogReference {
    /// The schema (or namespace) in which the object lives.
    fn schema_name(&self) -> &str;
    /// The name of the object itself.
    fn item_name(&self) -> &str;
}

impl ExternalCatalogReference for &mz_mysql_util::MySqlTableDesc {
    fn schema_name(&self) -> &str {
        &self.schema_name
    }

    fn item_name(&self) -> &str {
        &self.name
    }
}

impl ExternalCatalogReference for mz_postgres_util::desc::PostgresTableDesc {
    fn schema_name(&self) -> &str {
        &self.namespace
    }

    fn item_name(&self) -> &str {
        &self.name
    }
}

impl ExternalCatalogReference for &mz_sql_server_util::desc::SqlServerTableDesc {
    fn schema_name(&self) -> &str {
        &*self.schema_name
    }

    fn item_name(&self) -> &str {
        &*self.name
    }
}

// A convenience implementation for `(schema_name, item_name)` pairs.
impl<'a> ExternalCatalogReference for (&'a str, &'a str) {
    fn schema_name(&self) -> &str {
        self.0
    }

    fn item_name(&self) -> &str {
        self.1
    }
}

/// Resolves user-specified references to objects in an external system's
/// catalog against the set of objects actually available, indexed as
/// item name → schema name → database name → item index.
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct SourceReferenceResolver {
    inner: BTreeMap<Ident, BTreeMap<Ident, BTreeMap<Ident, usize>>>,
}

/// Errors that can occur when resolving an external reference.
#[derive(Debug, Clone, thiserror::Error)]
pub enum ExternalReferenceResolutionError {
    #[error("reference to {name} not found in source")]
    DoesNotExist { name: String },
    #[error(
        "reference {name} is ambiguous, consider specifying an additional \
        layer of qualification"
    )]
    Ambiguous { name: String },
    #[error("invalid identifier: {0}")]
    Ident(#[from] IdentError),
}

impl<'a> SourceReferenceResolver {
    /// Constructs a new resolver for the referenceable items available in the
    /// named database.
    pub fn new<T: ExternalCatalogReference>(
        database: &str,
        referenceable_items: &'a [T],
    ) -> Result<SourceReferenceResolver, ExternalReferenceResolutionError> {
        let mut inner = BTreeMap::new();

        let database = Ident::new(database)?;

        for (reference_idx, item) in referenceable_items.iter().enumerate() {
            let item_name = Ident::new(item.item_name())?;
            let schema_name = Ident::new(item.schema_name())?;

            inner
                .entry(item_name)
                .or_insert_with(BTreeMap::new)
                .entry(schema_name)
                .or_insert_with(BTreeMap::new)
                .entry(database.clone())
                .or_insert(reference_idx);
        }

        Ok(SourceReferenceResolver { inner })
    }

    /// Resolves `name` to a canonicalized reference of
    /// `canonicalize_to_width` parts (1 = item, 2 = schema.item,
    /// 3 = database.schema.item), along with the index of the resolved item
    /// in the original `referenceable_items`.
    ///
    /// # Panics
    /// Panics if `canonicalize_to_width` is not in `1..=3`.
    pub fn resolve(
        &self,
        name: &[Ident],
        canonicalize_to_width: usize,
    ) -> Result<(UnresolvedItemName, usize), ExternalReferenceResolutionError> {
        let (db, schema, idx) = self.resolve_inner(name)?;

        let item = name.last().expect("must have provided at least 1 element");

        let canonical_name = match canonicalize_to_width {
            1 => vec![item.clone()],
            2 => vec![schema.clone(), item.clone()],
            3 => vec![db.clone(), schema.clone(), item.clone()],
            o => panic!("canonicalize_to_width values must be 1..=3, but got {}", o),
        };

        Ok((UnresolvedItemName(canonical_name), idx))
    }

    /// Like [`SourceReferenceResolver::resolve`], but returns only the index
    /// of the resolved item.
    pub fn resolve_idx(&self, name: &[Ident]) -> Result<usize, ExternalReferenceResolutionError> {
        let (_db, _schema, idx) = self.resolve_inner(name)?;
        Ok(idx)
    }

    /// Shared resolution logic: returns the resolved database and schema
    /// idents along with the item's index.
    fn resolve_inner<'name: 'a>(
        &'a self,
        name: &'name [Ident],
    ) -> Result<(&'a Ident, &'a Ident, usize), ExternalReferenceResolutionError> {
        let get_provided_name = || UnresolvedItemName(name.to_vec()).to_string();

        // References must have between one and three parts.
        if !(1..=3).contains(&name.len()) {
            Err(ExternalReferenceResolutionError::DoesNotExist {
                name: get_provided_name(),
            })?;
        }

        // Pad the name out to three parts (database, schema, item), with any
        // missing leading parts as `None`.
        let mut names = std::iter::repeat(None)
            .take(3 - name.len())
            .chain(name.iter().map(Some));

        let database = names.next().flatten();
        let schema = names.next().flatten();
        let item = names
            .next()
            .flatten()
            .expect("must have provided the item name");

        assert_none!(names.next(), "expected a 3-element iterator");

        let schemas =
            self.inner
                .get(item)
                .ok_or_else(|| ExternalReferenceResolutionError::DoesNotExist {
                    name: get_provided_name(),
                })?;

        let schema = match schema {
            Some(schema) => schema,
            None => schemas.keys().exactly_one().map_err(|_e| {
                ExternalReferenceResolutionError::Ambiguous {
                    name: get_provided_name(),
                }
            })?,
        };

        let databases =
            schemas
                .get(schema)
                .ok_or_else(|| ExternalReferenceResolutionError::DoesNotExist {
                    name: get_provided_name(),
                })?;

        let database = match database {
            Some(database) => database,
            None => databases.keys().exactly_one().map_err(|_e| {
                ExternalReferenceResolutionError::Ambiguous {
                    name: get_provided_name(),
                }
            })?,
        };

        let reference_idx = databases.get(database).ok_or_else(|| {
            ExternalReferenceResolutionError::DoesNotExist {
                name: get_provided_name(),
            }
        })?;

        Ok((database, schema, *reference_idx))
    }
}

/// Decoder for the `Row`-valued "ok" column of a [`SourceData`] array. The
/// `EmptyRow` variant handles relations with no columns, which arrow
/// represents as a `NullArray` rather than an empty `StructArray`.
#[derive(Debug)]
pub enum SourceDataRowColumnarDecoder {
    Row(RowColumnarDecoder),
    EmptyRow,
}

impl SourceDataRowColumnarDecoder {
    pub fn decode(&self, idx: usize, row: &mut Row) {
        match self {
            SourceDataRowColumnarDecoder::Row(decoder) => decoder.decode(idx, row),
            SourceDataRowColumnarDecoder::EmptyRow => {
                // Create a packer to clear the contents of the row we're
                // decoding into.
                row.packer();
            }
        }
    }

    pub fn goodbytes(&self) -> usize {
        match self {
            SourceDataRowColumnarDecoder::Row(decoder) => decoder.goodbytes(),
            SourceDataRowColumnarDecoder::EmptyRow => 0,
        }
    }
}

#[derive(Debug)]
pub struct SourceDataColumnarDecoder {
    row_decoder: SourceDataRowColumnarDecoder,
    err_decoder: BinaryArray,
}

impl SourceDataColumnarDecoder {
    pub fn new(col: StructArray, desc: &RelationDesc) -> Result<Self, anyhow::Error> {
        let (_fields, arrays, nullability) = col.into_parts();

        if nullability.is_some() {
            anyhow::bail!("SourceData is not nullable, but found {nullability:?}");
        }
        if arrays.len() != 2 {
            anyhow::bail!("SourceData should only have two fields, found {arrays:?}");
        }

        let errs = arrays[1]
            .as_any()
            .downcast_ref::<BinaryArray>()
            .ok_or_else(|| anyhow::anyhow!("expected BinaryArray, found {:?}", arrays[1]))?;

        let row_decoder = match arrays[0].data_type() {
            arrow::datatypes::DataType::Struct(_) => {
                let rows = arrays[0]
                    .as_any()
                    .downcast_ref::<StructArray>()
                    .ok_or_else(|| {
                        anyhow::anyhow!("expected StructArray, found {:?}", arrays[0])
                    })?;
                let decoder = RowColumnarDecoder::new(rows.clone(), desc)?;
                SourceDataRowColumnarDecoder::Row(decoder)
            }
            arrow::datatypes::DataType::Null => SourceDataRowColumnarDecoder::EmptyRow,
            other => anyhow::bail!("expected Struct or Null Array, found {other:?}"),
        };

        Ok(SourceDataColumnarDecoder {
            row_decoder,
            err_decoder: errs.clone(),
        })
    }
}

impl ColumnDecoder<SourceData> for SourceDataColumnarDecoder {
    fn decode(&self, idx: usize, val: &mut SourceData) {
        let err_null = self.err_decoder.is_null(idx);
        let row_null = match &self.row_decoder {
            SourceDataRowColumnarDecoder::Row(decoder) => decoder.is_null(idx),
            SourceDataRowColumnarDecoder::EmptyRow => !err_null,
        };

        match (row_null, err_null) {
            (true, false) => {
                let err = self.err_decoder.value(idx);
                let err = ProtoDataflowError::decode(err)
                    .expect("proto should be valid")
                    .into_rust()
                    .expect("error should be valid");
                val.0 = Err(err);
            }
            (false, true) => {
                let row = match val.0.as_mut() {
                    Ok(row) => row,
                    Err(_) => {
                        val.0 = Ok(Row::default());
                        val.0.as_mut().unwrap()
                    }
                };
                self.row_decoder.decode(idx, row);
            }
            (true, true) => panic!("should have one of 'ok' or 'err'"),
            (false, false) => panic!("cannot have both 'ok' and 'err'"),
        }
    }

    fn is_null(&self, idx: usize) -> bool {
        let err_null = self.err_decoder.is_null(idx);
        let row_null = match &self.row_decoder {
            SourceDataRowColumnarDecoder::Row(decoder) => decoder.is_null(idx),
            SourceDataRowColumnarDecoder::EmptyRow => !err_null,
        };
        assert!(!err_null || !row_null, "SourceData should never be null!");

        false
    }

    fn goodbytes(&self) -> usize {
        self.row_decoder.goodbytes() + ArrayOrd::Binary(self.err_decoder.clone()).goodbytes()
    }

    fn stats(&self) -> StructStats {
        let len = self.err_decoder.len();
        let err_stats = ColumnarStats {
            nulls: Some(ColumnNullStats {
                count: self.err_decoder.null_count(),
            }),
            values: PrimitiveStats::<Vec<u8>>::from_column(&self.err_decoder).into(),
        };
        // The "ok" column is null exactly where the "err" column is set, so
        // its null count is `len` minus the error column's null count.
        let row_null_count = len - self.err_decoder.null_count();
        let row_stats = match &self.row_decoder {
            SourceDataRowColumnarDecoder::Row(decoder) => {
                assert_eq!(decoder.null_count(), row_null_count);
                decoder.stats()
            }
            SourceDataRowColumnarDecoder::EmptyRow => StructStats {
                len,
                cols: BTreeMap::default(),
            },
        };
        let row_stats = ColumnarStats {
            nulls: Some(ColumnNullStats {
                count: row_null_count,
            }),
            values: ColumnStatKinds::Struct(row_stats),
        };

        let stats = [
            (
                SourceDataColumnarEncoder::OK_COLUMN_NAME.to_string(),
                row_stats,
            ),
            (
                SourceDataColumnarEncoder::ERR_COLUMN_NAME.to_string(),
                err_stats,
            ),
        ];
        StructStats {
            len,
            cols: stats.into_iter().collect(),
        }
    }
}

/// Encoder for the `Row`-valued "ok" column of a [`SourceData`] array. As
/// with [`SourceDataRowColumnarDecoder`], the `EmptyRow` variant handles
/// relations with no columns, since arrow does not support empty structs.
#[derive(Debug)]
pub enum SourceDataRowColumnarEncoder {
    Row(RowColumnarEncoder),
    EmptyRow,
}

impl SourceDataRowColumnarEncoder {
    pub(crate) fn goodbytes(&self) -> usize {
        match self {
            SourceDataRowColumnarEncoder::Row(e) => e.goodbytes(),
            SourceDataRowColumnarEncoder::EmptyRow => 0,
        }
    }

    pub fn append(&mut self, row: &Row) {
        match self {
            SourceDataRowColumnarEncoder::Row(encoder) => encoder.append(row),
            SourceDataRowColumnarEncoder::EmptyRow => {
                assert_eq!(row.iter().count(), 0)
            }
        }
    }

    pub fn append_null(&mut self) {
        match self {
            SourceDataRowColumnarEncoder::Row(encoder) => encoder.append_null(),
            SourceDataRowColumnarEncoder::EmptyRow => (),
        }
    }
}

#[derive(Debug)]
pub struct SourceDataColumnarEncoder {
    row_encoder: SourceDataRowColumnarEncoder,
    err_encoder: BinaryBuilder,
}

impl SourceDataColumnarEncoder {
    const OK_COLUMN_NAME: &'static str = "ok";
    const ERR_COLUMN_NAME: &'static str = "err";

    pub fn new(desc: &RelationDesc) -> Self {
        let row_encoder = match RowColumnarEncoder::new(desc) {
            Some(encoder) => SourceDataRowColumnarEncoder::Row(encoder),
            None => {
                assert!(desc.typ().columns().is_empty());
                SourceDataRowColumnarEncoder::EmptyRow
            }
        };
        let err_encoder = BinaryBuilder::new();

        SourceDataColumnarEncoder {
            row_encoder,
            err_encoder,
        }
    }
}

impl ColumnEncoder<SourceData> for SourceDataColumnarEncoder {
    type FinishedColumn = StructArray;

    fn goodbytes(&self) -> usize {
        self.row_encoder.goodbytes() + self.err_encoder.values_slice().len()
    }

    #[inline]
    fn append(&mut self, val: &SourceData) {
        match val.0.as_ref() {
            Ok(row) => {
                self.row_encoder.append(row);
                self.err_encoder.append_null();
            }
            Err(err) => {
                self.row_encoder.append_null();
                self.err_encoder
                    .append_value(err.into_proto().encode_to_vec());
            }
        }
    }

    #[inline]
    fn append_null(&mut self) {
        panic!("appending a null into SourceDataColumnarEncoder is not supported");
    }

    fn finish(self) -> Self::FinishedColumn {
        let SourceDataColumnarEncoder {
            row_encoder,
            mut err_encoder,
        } = self;

        let err_column = BinaryBuilder::finish(&mut err_encoder);
        let row_column: ArrayRef = match row_encoder {
            SourceDataRowColumnarEncoder::Row(encoder) => {
                let column = encoder.finish();
                Arc::new(column)
            }
            SourceDataRowColumnarEncoder::EmptyRow => Arc::new(NullArray::new(err_column.len())),
        };

        assert_eq!(row_column.len(), err_column.len());

        let fields = vec![
            Field::new(Self::OK_COLUMN_NAME, row_column.data_type().clone(), true),
            Field::new(Self::ERR_COLUMN_NAME, err_column.data_type().clone(), true),
        ];
        let arrays: Vec<Arc<dyn Array>> = vec![row_column, Arc::new(err_column)];
        StructArray::new(Fields::from(fields), arrays, None)
    }
}

impl Schema<SourceData> for RelationDesc {
    type ArrowColumn = StructArray;
    type Statistics = StructStats;

    type Decoder = SourceDataColumnarDecoder;
    type Encoder = SourceDataColumnarEncoder;

    fn decoder(&self, col: Self::ArrowColumn) -> Result<Self::Decoder, anyhow::Error> {
        SourceDataColumnarDecoder::new(col, self)
    }

    fn encoder(&self) -> Result<Self::Encoder, anyhow::Error> {
        Ok(SourceDataColumnarEncoder::new(self))
    }
}

#[cfg(test)]
mod tests {
    use arrow::array::{ArrayData, make_comparator};
    use base64::Engine;
    use bytes::Bytes;
    use mz_expr::EvalError;
    use mz_ore::assert_err;
    use mz_ore::metrics::MetricsRegistry;
    use mz_persist::indexed::columnar::arrow::{realloc_any, realloc_array};
    use mz_persist::metrics::ColumnarMetrics;
    use mz_persist_types::parquet::EncodingConfig;
    use mz_persist_types::schema::{Migration, backward_compatible};
    use mz_persist_types::stats::{PartStats, PartStatsMetrics};
    use mz_repr::{
        ColumnIndex, DatumVec, PropRelationDescDiff, ProtoRelationDesc, RelationDescBuilder,
        RowArena, SqlScalarType, arb_relation_desc_diff, arb_relation_desc_projection,
    };
    use proptest::prelude::*;
    use proptest::strategy::{Union, ValueTree};

    use crate::stats::RelationPartStats;

    use super::*;

    #[mz_ore::test]
    fn test_timeline_parsing() {
        assert_eq!(Ok(Timeline::EpochMilliseconds), "M".parse());
        assert_eq!(Ok(Timeline::External("JOE".to_string())), "E.JOE".parse());
        assert_eq!(Ok(Timeline::User("MIKE".to_string())), "U.MIKE".parse());

        assert_err!("Materialize".parse::<Timeline>());
        assert_err!("Ejoe".parse::<Timeline>());
        assert_err!("Umike".parse::<Timeline>());
        assert_err!("Dance".parse::<Timeline>());
        assert_err!("".parse::<Timeline>());
    }
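
    // A minimal sketch (not part of the original test suite): `MzOffset` is a
    // thin wrapper around `u64`, so offset arithmetic and the
    // `SourceTimestamp` row encoding should both roundtrip losslessly.
    #[mz_ore::test]
    fn mz_offset_arithmetic_and_row_roundtrip() {
        let a = MzOffset::from(40);
        let b = MzOffset::from(2);
        assert_eq!(a + b, MzOffset::from(42));
        assert_eq!(a + 2, MzOffset::from(42));
        assert_eq!(a.checked_sub(b), Some(MzOffset::from(38)));
        assert_eq!(b.checked_sub(a), None);

        // `SourceTimestamp` packs the offset into a single-datum `Row`.
        let ts = MzOffset::from(42);
        assert_eq!(MzOffset::decode_row(&ts.encode_row()), ts);
    }

    // A minimal sketch: `Timeline`'s `ToString` and `FromStr` implementations
    // are intended to be inverses of each other.
    #[mz_ore::test]
    fn timeline_string_roundtrip() {
        let timelines = [
            Timeline::EpochMilliseconds,
            Timeline::External("JOE".to_string()),
            Timeline::User("MIKE".to_string()),
        ];
        for timeline in timelines {
            assert_eq!(Ok(timeline.clone()), timeline.to_string().parse());
        }
    }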

    #[track_caller]
    fn roundtrip_source_data(
        desc: &RelationDesc,
        datas: Vec<SourceData>,
        read_desc: &RelationDesc,
        config: &EncodingConfig,
    ) {
        let metrics = ColumnarMetrics::disconnected();
        let mut encoder = <RelationDesc as Schema<SourceData>>::encoder(desc).unwrap();
        for data in &datas {
            encoder.append(data);
        }
        let col = encoder.finish();

        // The top-level column of SourceData should never be nullable.
        assert!(!col.is_nullable());

        let col = realloc_array(&col, &metrics);

        // Roundtrip the array through protobuf-encoded `ArrayData`.
        {
            let proto = col.to_data().into_proto();
            let bytes = proto.encode_to_vec();
            let proto = mz_persist_types::arrow::ProtoArrayData::decode(&bytes[..]).unwrap();
            let array_data: ArrayData = proto.into_rust().unwrap();

            let col_rnd = StructArray::from(array_data.clone());
            assert_eq!(col, col_rnd);

            let col_dyn = arrow::array::make_array(array_data);
            let col_dyn = col_dyn.as_any().downcast_ref::<StructArray>().unwrap();
            assert_eq!(&col, col_dyn);
        }

        // Encode the array to parquet.
        let mut buf = Vec::new();
        let fields = Fields::from(vec![Field::new("k", col.data_type().clone(), false)]);
        let arrays: Vec<Arc<dyn Array>> = vec![Arc::new(col.clone())];
        mz_persist_types::parquet::encode_arrays(&mut buf, fields, arrays, config).unwrap();

        // Decode the parquet back into an array.
        let buf = Bytes::from(buf);
        let mut reader = mz_persist_types::parquet::decode_arrays(buf).unwrap();
        let maybe_batch = reader.next();

        // If we didn't encode any data then there is nothing to decode.
        let Some(record_batch) = maybe_batch else {
            assert!(datas.is_empty());
            return;
        };
        let record_batch = record_batch.unwrap();

        assert_eq!(record_batch.columns().len(), 1);
        let rnd_col = &record_batch.columns()[0];
        let rnd_col = realloc_any(Arc::clone(rnd_col), &metrics);
        let rnd_col = rnd_col
            .as_any()
            .downcast_ref::<StructArray>()
            .unwrap()
            .clone();

        // Collect stats on the decoded column.
        let stats = <RelationDesc as Schema<SourceData>>::decoder_any(desc, &rnd_col)
            .expect("valid decoder")
            .stats();

        // Decode the data and ensure it matches what we originally encoded.
        let mut rnd_data = SourceData(Ok(Row::default()));
        let decoder = <RelationDesc as Schema<SourceData>>::decoder(desc, rnd_col.clone()).unwrap();
        for (idx, og_data) in datas.iter().enumerate() {
            decoder.decode(idx, &mut rnd_data);
            assert_eq!(og_data, &rnd_data);
        }

        // Decode again with a projected schema, and validate that the column
        // stats are consistent with the decoded data.
        let stats_metrics = PartStatsMetrics::new(&MetricsRegistry::new());
        let stats = RelationPartStats {
            name: "test",
            metrics: &stats_metrics,
            stats: &PartStats { key: stats },
            desc: read_desc,
        };
        let mut datum_vec = DatumVec::new();
        let arena = RowArena::default();
        let decoder = <RelationDesc as Schema<SourceData>>::decoder(read_desc, rnd_col).unwrap();

        for (idx, og_data) in datas.iter().enumerate() {
            decoder.decode(idx, &mut rnd_data);
            match (&og_data.0, &rnd_data.0) {
                (Ok(og_row), Ok(rnd_row)) => {
                    // The decoded row should equal the original projected
                    // onto the columns of `read_desc`.
                    {
                        let datums = datum_vec.borrow_with(og_row);
                        let projected_datums =
                            datums.iter().enumerate().filter_map(|(idx, datum)| {
                                read_desc
                                    .contains_index(&ColumnIndex::from_raw(idx))
                                    .then_some(datum)
                            });
                        let og_projected_row = Row::pack(projected_datums);
                        assert_eq!(&og_projected_row, rnd_row);
                    }

                    // The stats for each column must admit the decoded datum.
                    {
                        let proj_datums = datum_vec.borrow_with(rnd_row);
                        for (pos, (idx, _, _)) in read_desc.iter_all().enumerate() {
                            let spec = stats.col_stats(idx, &arena);
                            assert!(spec.may_contain(proj_datums[pos]));
                        }
                    }
                }
                (Err(_), Err(_)) => assert_eq!(og_data, &rnd_data),
                (_, _) => panic!("decoded to a different type? {og_data:?} {rnd_data:?}"),
            }
        }

        // The schema itself should roundtrip through its encoding.
        let encoded_schema = SourceData::encode_schema(desc);
        let roundtrip_desc = SourceData::decode_schema(&encoded_schema);
        assert_eq!(desc, &roundtrip_desc);

        // A schema must be backward compatible with itself, and the resulting
        // migration must be a no-op.
        let migration =
            mz_persist_types::schema::backward_compatible(col.data_type(), col.data_type());
        let migration = migration.expect("should be backward compatible with self");
        let migrated = migration.migrate(Arc::new(col.clone()));
        assert_eq!(col.data_type(), migrated.data_type());
    }
2177
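    /// Proptest: arbitrary `SourceData` for arbitrary `RelationDesc`s (and
    /// arbitrary projections of them) roundtrips through the full encoding
    /// stack.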
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn all_source_data_roundtrips() {
        let mut weights = vec![(500, Just(0..8)), (50, Just(8..32))];
        if std::env::var("PROPTEST_LARGE_DATA").is_ok() {
            weights.extend([
                (10, Just(32..128)),
                (5, Just(128..512)),
                (3, Just(512..2048)),
                (1, Just(2048..8192)),
            ]);
        }
        let num_rows = Union::new_weighted(weights);

        let strat = (any::<RelationDesc>(), num_rows)
            // Generate a projected version of the desc to use as the read
            // schema.
            .prop_flat_map(|(desc, num_rows)| {
                arb_relation_desc_projection(desc.clone())
                    .prop_map(move |read_desc| (desc.clone(), read_desc, num_rows.clone()))
            })
            .prop_flat_map(|(desc, read_desc, num_rows)| {
                proptest::collection::vec(arb_source_data_for_relation_desc(&desc), num_rows)
                    .prop_map(move |datas| (desc.clone(), datas, read_desc.clone()))
            });

        proptest!(|((config, (desc, source_datas, read_desc)) in (any::<EncodingConfig>(), strat))| {
            roundtrip_source_data(&desc, source_datas, &read_desc, &config);
        });
    }

    #[mz_ore::test]
    fn roundtrip_error_nulls() {
        let desc = RelationDescBuilder::default()
            .with_column(
                "ts",
                SqlScalarType::TimestampTz { precision: None }.nullable(false),
            )
            .finish();
        let source_datas = vec![SourceData(Err(DataflowError::EvalError(
            EvalError::DateOutOfRange.into(),
        )))];
        let config = EncodingConfig::default();
        roundtrip_source_data(&desc, source_datas, &desc, &config);
    }

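    /// Reports whether the values of `array` are in non-decreasing order,
    /// according to a comparator built with arrow's default sort options.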
    fn is_sorted(array: &dyn Array) -> bool {
        let sort_options = arrow::compute::SortOptions::default();
        let Ok(cmp) = make_comparator(array, array, sort_options) else {
            // Not every array type supports comparison, in which case we
            // conservatively report the array as unsorted.
            return false;
        };
        (0..array.len())
            .tuple_windows()
            .all(|(i, j)| cmp(i, j).is_le())
    }

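    /// Returns the arrow `DataType` that `schema` encodes to, determined by
    /// encoding an empty column.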
    fn get_data_type(schema: &impl Schema<SourceData>) -> arrow::datatypes::DataType {
        use mz_persist_types::columnar::ColumnEncoder;
        let array = Schema::encoder(schema).expect("valid schema").finish();
        Array::data_type(&array).clone()
    }

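    /// Encodes `datas` with the `old` schema, applies `migration`, and asserts
    /// that the migrated array matches the `new` schema's arrow `DataType`.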
    #[track_caller]
    fn backward_compatible_testcase(
        old: &RelationDesc,
        new: &RelationDesc,
        migration: Migration,
        datas: &[SourceData],
    ) {
        let mut encoder = Schema::<SourceData>::encoder(old).expect("valid schema");
        for data in datas {
            encoder.append(data);
        }
        let old = encoder.finish();
        let new = Schema::<SourceData>::encoder(new)
            .expect("valid schema")
            .finish();
        let old: Arc<dyn Array> = Arc::new(old);
        let new: Arc<dyn Array> = Arc::new(new);
        let migrated = migration.migrate(Arc::clone(&old));
        assert_eq!(migrated.data_type(), new.data_type());

        // If the migration claims to preserve order then migrating sorted
        // data must keep it sorted.
        if migration.preserves_order() && is_sorted(&old) {
            assert!(is_sorted(&migrated))
        }
    }

    #[mz_ore::test]
    fn backward_compatible_empty_add_column() {
        let old = RelationDesc::empty();
        let new = RelationDesc::from_names_and_types([("a", SqlScalarType::Bool.nullable(true))]);

        let old_data_type = get_data_type(&old);
        let new_data_type = get_data_type(&new);

        let migration = backward_compatible(&old_data_type, &new_data_type);
        assert!(migration.is_some());
    }

    #[mz_ore::test]
    fn backward_compatible_project_away_all() {
        let old = RelationDesc::from_names_and_types([("a", SqlScalarType::Bool.nullable(true))]);
        let new = RelationDesc::empty();

        let old_data_type = get_data_type(&old);
        let new_data_type = get_data_type(&new);

        let migration = backward_compatible(&old_data_type, &new_data_type);
        assert!(migration.is_some());
    }

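    /// Proptest: for random pairs of `RelationDesc`s whose arrow `DataType`s
    /// turn out to be backward compatible, data encoded with the old schema
    /// must migrate cleanly to the new one.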
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn backward_compatible_migrate() {
        let strat = (any::<RelationDesc>(), any::<RelationDesc>()).prop_flat_map(|(old, new)| {
            proptest::collection::vec(arb_source_data_for_relation_desc(&old), 2)
                .prop_map(move |datas| (old.clone(), new.clone(), datas))
        });

        proptest!(|((old, new, datas) in strat)| {
            let old_data_type = get_data_type(&old);
            let new_data_type = get_data_type(&new);

            if let Some(migration) = backward_compatible(&old_data_type, &new_data_type) {
                backward_compatible_testcase(&old, &new, migration, &datas);
            };
        });
    }

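    /// Proptest: starting from a common `RelationDesc`, applies random diffs
    /// and checks that backward compatibility of the resulting arrow
    /// `DataType`s matches expectations: adding a nullable column or dropping
    /// a column must remain compatible.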
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn backward_compatible_migrate_from_common() {
        use mz_repr::SqlColumnType;
        fn test_case(old: RelationDesc, diffs: Vec<PropRelationDescDiff>, datas: Vec<SourceData>) {
            // Of the supported diffs, only adding a nullable column or
            // dropping a column is backward compatible.
            let should_be_compatible = diffs.iter().all(|diff| match diff {
                PropRelationDescDiff::AddColumn {
                    typ: SqlColumnType { nullable, .. },
                    ..
                } => *nullable,
                PropRelationDescDiff::DropColumn { .. } => true,
                _ => false,
            });

            let mut new = old.clone();
            for diff in diffs.into_iter() {
                diff.apply(&mut new)
            }

            let old_data_type = get_data_type(&old);
            let new_data_type = get_data_type(&new);

            if let Some(migration) = backward_compatible(&old_data_type, &new_data_type) {
                backward_compatible_testcase(&old, &new, migration, &datas);
            } else if should_be_compatible {
                panic!("new DataType was not compatible when it should have been!");
            }
        }

        let strat = any::<RelationDesc>()
            .prop_flat_map(|desc| {
                proptest::collection::vec(arb_source_data_for_relation_desc(&desc), 2)
                    .no_shrink()
                    .prop_map(move |datas| (desc.clone(), datas))
            })
            .prop_flat_map(|(desc, datas)| {
                arb_relation_desc_diff(&desc)
                    .prop_map(move |diffs| (desc.clone(), diffs, datas.clone()))
            });

        proptest!(|((old, diffs, datas) in strat)| {
            test_case(old, diffs, datas);
        });
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn empty_relation_desc_roundtrips() {
        let empty = RelationDesc::empty();
        let rows = proptest::collection::vec(arb_source_data_for_relation_desc(&empty), 0..8)
            .prop_map(move |datas| (empty.clone(), datas));

        proptest!(|((config, (desc, source_datas)) in (any::<EncodingConfig>(), rows))| {
            // There are no columns to project away, so read with the same
            // desc we wrote with.
            roundtrip_source_data(&desc, source_datas, &desc, &config);
        });
    }

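    /// Proptest: the arrow `DataType` an encoder produces depends only on the
    /// `RelationDesc`, never on which data happens to get appended.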
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn arrow_datatype_consistent() {
        fn test_case(desc: RelationDesc, datas: Vec<SourceData>) {
            let half = datas.len() / 2;

            let mut encoder_a = <RelationDesc as Schema<SourceData>>::encoder(&desc).unwrap();
            for data in &datas[..half] {
                encoder_a.append(data);
            }
            let col_a = encoder_a.finish();

            let mut encoder_b = <RelationDesc as Schema<SourceData>>::encoder(&desc).unwrap();
            for data in &datas[half..] {
                encoder_b.append(data);
            }
            let col_b = encoder_b.finish();

            // Both halves of the data must encode to the same DataType.
            assert_eq!(col_a.data_type(), col_b.data_type());
        }

        let num_rows = 12;
        let strat = any::<RelationDesc>().prop_flat_map(|desc| {
            proptest::collection::vec(arb_source_data_for_relation_desc(&desc), num_rows)
                .prop_map(move |datas| (desc.clone(), datas))
        });

        proptest!(|((desc, data) in strat)| {
            test_case(desc, data);
        });
    }

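    /// Snapshot test: the proto encoding of `RelationDesc` and `SourceData`
    /// must remain stable, since the encoded bytes are durably persisted.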
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn source_proto_serialization_stability() {
        let min_protos = 10;
        let encoded = include_str!("snapshots/source-datas.txt");

        // Decode the base64 and proto encoded (RelationDesc, SourceData)
        // pairs from the snapshot file.
        let mut decoded: Vec<(RelationDesc, SourceData)> = encoded
            .lines()
            .map(|s| {
                let (desc, data) = s.split_once(',').expect("comma separated data");
                let desc = base64::engine::general_purpose::STANDARD
                    .decode(desc)
                    .expect("valid base64");
                let data = base64::engine::general_purpose::STANDARD
                    .decode(data)
                    .expect("valid base64");
                (desc, data)
            })
            .map(|(desc, data)| {
                let desc = ProtoRelationDesc::decode(&desc[..]).expect("valid proto");
                let desc = desc.into_rust().expect("valid proto");
                let data = SourceData::decode(&data, &desc).expect("valid proto");
                (desc, data)
            })
            .collect();

        // If the snapshot has fewer examples than we want, generate more of
        // them deterministically.
        let mut runner = proptest::test_runner::TestRunner::deterministic();
        let strategy = RelationDesc::arbitrary().prop_flat_map(|desc| {
            arb_source_data_for_relation_desc(&desc).prop_map(move |data| (desc.clone(), data))
        });
        while decoded.len() < min_protos {
            let arbitrary_data = strategy
                .new_tree(&mut runner)
                .expect("source data")
                .current();
            decoded.push(arbitrary_data);
        }

        // Re-encode everything so we can check that the bytes are unchanged.
        let mut reencoded = String::new();
        let mut buf = vec![];
        for (desc, data) in decoded {
            buf.clear();
            desc.into_proto().encode(&mut buf).expect("success");
            base64::engine::general_purpose::STANDARD.encode_string(buf.as_slice(), &mut reencoded);
            reencoded.push(',');

            buf.clear();
            data.encode(&mut buf);
            base64::engine::general_purpose::STANDARD.encode_string(buf.as_slice(), &mut reencoded);
            reencoded.push('\n');
        }

        // A failure here means the proto serialization of SourceData or
        // RelationDesc changed. Only update snapshots/source-datas.txt if
        // that change is intentional and backward compatible.
        assert_eq!(
            encoded,
            reencoded.as_str(),
            "SourceData serde should be stable"
        )
    }
}