mz_storage_types/sources.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Types and traits related to the introduction of changing collections into `dataflow`.

use std::collections::BTreeMap;
use std::fmt::Debug;
use std::hash::Hash;
use std::ops::{Add, AddAssign, Deref, DerefMut};
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;

use arrow::array::{Array, ArrayRef, BinaryArray, BinaryBuilder, NullArray, StructArray};
use arrow::datatypes::{Field, Fields};
use bytes::{BufMut, Bytes};
use columnation::Columnation;
use itertools::EitherOrBoth::Both;
use itertools::Itertools;
use kafka::KafkaSourceExportDetails;
use load_generator::{LoadGeneratorOutput, LoadGeneratorSourceExportDetails};
use mz_ore::assert_none;
use mz_persist_types::Codec;
use mz_persist_types::arrow::ArrayOrd;
use mz_persist_types::columnar::{ColumnDecoder, ColumnEncoder, Schema};
use mz_persist_types::stats::{
    ColumnNullStats, ColumnStatKinds, ColumnarStats, ColumnarStatsBuilder, PrimitiveStats,
    StructStats,
};
use mz_proto::{IntoRustIfSome, ProtoMapEntry, ProtoType, RustType, TryFromProtoError};
use mz_repr::{
    CatalogItemId, Datum, GlobalId, ProtoRelationDesc, ProtoRow, RelationDesc, Row,
    RowColumnarDecoder, RowColumnarEncoder, arb_row_for_relation,
};
use mz_sql_parser::ast::{Ident, IdentError, UnresolvedItemName};
use proptest::prelude::any;
use proptest::strategy::Strategy;
use proptest_derive::Arbitrary;
use prost::Message;
use serde::{Deserialize, Serialize};
use timely::order::{PartialOrder, TotalOrder};
use timely::progress::timestamp::Refines;
use timely::progress::{PathSummary, Timestamp};

use crate::AlterCompatible;
use crate::connections::inline::{
    ConnectionAccess, ConnectionResolver, InlinedConnection, IntoInlineConnection,
    ReferencedConnection,
};
use crate::controller::{AlterError, CollectionMetadata};
use crate::errors::{DataflowError, ProtoDataflowError};
use crate::instances::StorageInstanceId;
use crate::sources::proto_ingestion_description::{ProtoSourceExport, ProtoSourceImport};
use crate::sources::sql_server::SqlServerSourceExportDetails;

pub mod encoding;
pub mod envelope;
pub mod kafka;
pub mod load_generator;
pub mod mysql;
pub mod postgres;
pub mod sql_server;

pub use crate::sources::envelope::SourceEnvelope;
pub use crate::sources::kafka::KafkaSourceConnection;
pub use crate::sources::load_generator::LoadGeneratorSourceConnection;
pub use crate::sources::mysql::{MySqlSourceConnection, MySqlSourceExportDetails};
pub use crate::sources::postgres::{PostgresSourceConnection, PostgresSourceExportDetails};
pub use crate::sources::sql_server::{SqlServerSource, SqlServerSourceExtras};

include!(concat!(env!("OUT_DIR"), "/mz_storage_types.sources.rs"));

/// A description of a source ingestion
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Arbitrary)]
pub struct IngestionDescription<S: 'static = (), C: ConnectionAccess = InlinedConnection> {
    /// The source description.
    pub desc: SourceDesc<C>,
    /// Additional storage controller metadata needed to ingest this source
    pub ingestion_metadata: S,
    /// Collections to be exported by this ingestion.
    ///
    /// # Notes
    /// - For multi-output sources:
    ///     - Add exports by adding a new [`SourceExport`].
    ///     - Remove exports by removing the [`SourceExport`].
    ///
    ///   Re-rendering/executing the source after making these modifications
    ///   adds and drops the subsource, respectively.
    /// - This field includes the primary source's ID, which might need to be
    ///   filtered out to understand which exports are explicit ingestion exports.
    /// - This field does _not_ include the remap collection, which is tracked
    ///   in its own field.
    #[proptest(
        strategy = "proptest::collection::btree_map(any::<GlobalId>(), any::<SourceExport<S>>(), 0..4)"
    )]
    pub source_exports: BTreeMap<GlobalId, SourceExport<S>>,
    /// The ID of the instance in which to install the source.
    pub instance_id: StorageInstanceId,
    /// The ID of this ingestion's remap/progress collection.
    pub remap_collection_id: GlobalId,
}

impl IngestionDescription {
    pub fn new(
        desc: SourceDesc,
        instance_id: StorageInstanceId,
        remap_collection_id: GlobalId,
    ) -> Self {
        Self {
            desc,
            ingestion_metadata: (),
            source_exports: BTreeMap::new(),
            instance_id,
            remap_collection_id,
        }
    }
}

impl<S> IngestionDescription<S> {
    /// Return an iterator over the `GlobalId`s of `self`'s collections.
    /// This will contain ids for the remap collection, subsources,
    /// tables for this source, and the primary collection ID, even if
    /// no data will be exported to the primary collection.
    pub fn collection_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
        // Expand self so that any new fields added generate a compiler error to
        // increase the likelihood of developers seeing this function.
        let IngestionDescription {
            desc: _,
            ingestion_metadata: _,
            source_exports,
            instance_id: _,
            remap_collection_id,
        } = &self;

        source_exports
            .keys()
            .copied()
            .chain(std::iter::once(*remap_collection_id))
    }
}

impl<S: Debug + Eq + PartialEq + AlterCompatible> AlterCompatible for IngestionDescription<S> {
    fn alter_compatible(
        &self,
        id: GlobalId,
        other: &IngestionDescription<S>,
    ) -> Result<(), AlterError> {
        if self == other {
            return Ok(());
        }
        let IngestionDescription {
            desc,
            ingestion_metadata,
            source_exports,
            instance_id,
            remap_collection_id,
        } = self;

        let compatibility_checks = [
            (desc.alter_compatible(id, &other.desc).is_ok(), "desc"),
            (
                ingestion_metadata == &other.ingestion_metadata,
                "ingestion_metadata",
            ),
            (
                source_exports
                    .iter()
                    .merge_join_by(&other.source_exports, |(l_key, _), (r_key, _)| {
                        l_key.cmp(r_key)
                    })
                    .all(|r| match r {
                        Both(
                            (
                                _,
                                SourceExport {
                                    storage_metadata: l_metadata,
                                    details: l_details,
                                    data_config: l_data_config,
                                },
                            ),
                            (
                                _,
                                SourceExport {
                                    storage_metadata: r_metadata,
                                    details: r_details,
                                    data_config: r_data_config,
                                },
                            ),
                        ) => {
                            l_metadata.alter_compatible(id, r_metadata).is_ok()
                                && l_details.alter_compatible(id, r_details).is_ok()
                                && l_data_config.alter_compatible(id, r_data_config).is_ok()
                        }
                        _ => true,
                    }),
                "source_exports",
            ),
            (instance_id == &other.instance_id, "instance_id"),
            (
                remap_collection_id == &other.remap_collection_id,
                "remap_collection_id",
            ),
        ];
        for (compatible, field) in compatibility_checks {
            if !compatible {
                tracing::warn!(
                    "IngestionDescription incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
                    self,
                    other
                );

                return Err(AlterError { id });
            }
        }

        Ok(())
    }
}

impl<R: ConnectionResolver> IntoInlineConnection<IngestionDescription, R>
    for IngestionDescription<(), ReferencedConnection>
{
    fn into_inline_connection(self, r: R) -> IngestionDescription {
        let IngestionDescription {
            desc,
            ingestion_metadata,
            source_exports,
            instance_id,
            remap_collection_id,
        } = self;

        IngestionDescription {
            desc: desc.into_inline_connection(r),
            ingestion_metadata,
            source_exports,
            instance_id,
            remap_collection_id,
        }
    }
}

#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Arbitrary)]
pub struct SourceExport<S = (), C: ConnectionAccess = InlinedConnection> {
    /// The collection metadata needed to write the exported data
    pub storage_metadata: S,
    /// Details necessary for the source to export data to this export's collection.
    pub details: SourceExportDetails,
    /// Config necessary to handle (e.g. decode and envelope) the data for this export.
    pub data_config: SourceExportDataConfig<C>,
}

impl RustType<ProtoIngestionDescription> for IngestionDescription<CollectionMetadata> {
    fn into_proto(&self) -> ProtoIngestionDescription {
        ProtoIngestionDescription {
            source_exports: self.source_exports.into_proto(),
            ingestion_metadata: Some(self.ingestion_metadata.into_proto()),
            desc: Some(self.desc.into_proto()),
            instance_id: Some(self.instance_id.into_proto()),
            remap_collection_id: Some(self.remap_collection_id.into_proto()),
        }
    }

    fn from_proto(proto: ProtoIngestionDescription) -> Result<Self, TryFromProtoError> {
        Ok(IngestionDescription {
            source_exports: proto.source_exports.into_rust()?,
            desc: proto
                .desc
                .into_rust_if_some("ProtoIngestionDescription::desc")?,
            ingestion_metadata: proto
                .ingestion_metadata
                .into_rust_if_some("ProtoIngestionDescription::ingestion_metadata")?,
            instance_id: proto
                .instance_id
                .into_rust_if_some("ProtoIngestionDescription::instance_id")?,
            remap_collection_id: proto
                .remap_collection_id
                .into_rust_if_some("ProtoIngestionDescription::remap_collection_id")?,
        })
    }
}

impl ProtoMapEntry<GlobalId, CollectionMetadata> for ProtoSourceImport {
    fn from_rust<'a>(entry: (&'a GlobalId, &'a CollectionMetadata)) -> Self {
        ProtoSourceImport {
            id: Some(entry.0.into_proto()),
            storage_metadata: Some(entry.1.into_proto()),
        }
    }

    fn into_rust(self) -> Result<(GlobalId, CollectionMetadata), TryFromProtoError> {
        Ok((
            self.id.into_rust_if_some("ProtoSourceImport::id")?,
            self.storage_metadata
                .into_rust_if_some("ProtoSourceImport::storage_metadata")?,
        ))
    }
}

impl ProtoMapEntry<GlobalId, SourceExport<CollectionMetadata>> for ProtoSourceExport {
    fn from_rust<'a>(
        (id, source_export): (&'a GlobalId, &'a SourceExport<CollectionMetadata>),
    ) -> Self {
        ProtoSourceExport {
            id: Some(id.into_proto()),
            storage_metadata: Some(source_export.storage_metadata.into_proto()),
            details: Some(source_export.details.into_proto()),
            data_config: Some(source_export.data_config.into_proto()),
        }
    }

    fn into_rust(self) -> Result<(GlobalId, SourceExport<CollectionMetadata>), TryFromProtoError> {
        Ok((
            self.id.into_rust_if_some("ProtoSourceExport::id")?,
            SourceExport {
                storage_metadata: self
                    .storage_metadata
                    .into_rust_if_some("ProtoSourceExport::storage_metadata")?,
                details: self
                    .details
                    .into_rust_if_some("ProtoSourceExport::details")?,
                data_config: self
                    .data_config
                    .into_rust_if_some("ProtoSourceExport::data_config")?,
            },
        ))
    }
}

pub trait SourceTimestamp:
    Timestamp + Columnation + Refines<()> + std::fmt::Display + Sync
{
    fn encode_row(&self) -> Row;
    fn decode_row(row: &Row) -> Self;
}

impl SourceTimestamp for MzOffset {
    fn encode_row(&self) -> Row {
        Row::pack([Datum::UInt64(self.offset)])
    }

    fn decode_row(row: &Row) -> Self {
        let mut datums = row.iter();
        match (datums.next(), datums.next()) {
            (Some(Datum::UInt64(offset)), None) => MzOffset::from(offset),
            _ => panic!("invalid row {row:?}"),
        }
    }
}
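
// A minimal, illustrative sketch (added; not part of the original file) of the
// `SourceTimestamp` round trip for `MzOffset`: the offset is packed into a
// single-`Datum` `Row` and recovered by `decode_row`.
#[cfg(test)]
mod mz_offset_source_timestamp_example {
    use super::*;

    #[test]
    fn mz_offset_row_round_trip() {
        let offset = MzOffset::from(42u64);
        let row = offset.encode_row();
        // `decode_row` expects exactly one `Datum::UInt64` in the row.
        assert_eq!(MzOffset::decode_row(&row), offset);
    }
}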

/// Universal language for describing message positions in Materialize, in a
/// source-independent way. Individual sources like Kafka or file sources should
/// explicitly implement their own offset type that converts to/from `MzOffset`.
/// A 0-MzOffset denotes an empty stream.
#[derive(
    Copy,
    Clone,
    Default,
    Debug,
    PartialEq,
    PartialOrd,
    Eq,
    Ord,
    Hash,
    Serialize,
    Deserialize,
    Arbitrary,
)]
pub struct MzOffset {
    pub offset: u64,
}

impl differential_dataflow::difference::Semigroup for MzOffset {
    fn plus_equals(&mut self, rhs: &Self) {
        self.offset.plus_equals(&rhs.offset)
    }
}

impl differential_dataflow::difference::IsZero for MzOffset {
    fn is_zero(&self) -> bool {
        self.offset.is_zero()
    }
}

impl mz_persist_types::Codec64 for MzOffset {
    fn codec_name() -> String {
        "MzOffset".to_string()
    }

    fn encode(&self) -> [u8; 8] {
        mz_persist_types::Codec64::encode(&self.offset)
    }

    fn decode(buf: [u8; 8]) -> Self {
        Self {
            offset: mz_persist_types::Codec64::decode(buf),
        }
    }
}

impl columnation::Columnation for MzOffset {
    type InnerRegion = columnation::CopyRegion<MzOffset>;
}

impl RustType<ProtoMzOffset> for MzOffset {
    fn into_proto(&self) -> ProtoMzOffset {
        ProtoMzOffset {
            offset: self.offset,
        }
    }

    fn from_proto(proto: ProtoMzOffset) -> Result<Self, TryFromProtoError> {
        Ok(Self {
            offset: proto.offset,
        })
    }
}

impl MzOffset {
    pub fn checked_sub(self, other: Self) -> Option<Self> {
        self.offset
            .checked_sub(other.offset)
            .map(|offset| Self { offset })
    }
}

/// Convert a `u64` offset into an `MzOffset`.
impl From<u64> for MzOffset {
    fn from(offset: u64) -> Self {
        Self { offset }
    }
}

impl std::fmt::Display for MzOffset {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.offset)
    }
}

// Assume overflow does not occur for addition
impl Add<u64> for MzOffset {
    type Output = MzOffset;

    fn add(self, x: u64) -> MzOffset {
        MzOffset {
            offset: self.offset + x,
        }
    }
}
impl Add<Self> for MzOffset {
    type Output = Self;

    fn add(self, x: Self) -> Self {
        MzOffset {
            offset: self.offset + x.offset,
        }
    }
}
impl AddAssign<u64> for MzOffset {
    fn add_assign(&mut self, x: u64) {
        self.offset += x;
    }
}
impl AddAssign<Self> for MzOffset {
    fn add_assign(&mut self, x: Self) {
        self.offset += x.offset;
    }
}

/// Convert from `PgLsn` to MzOffset
impl From<tokio_postgres::types::PgLsn> for MzOffset {
    fn from(lsn: tokio_postgres::types::PgLsn) -> Self {
        MzOffset { offset: lsn.into() }
    }
}

impl Timestamp for MzOffset {
    type Summary = MzOffset;

    fn minimum() -> Self {
        MzOffset {
            offset: Timestamp::minimum(),
        }
    }
}

impl PathSummary<MzOffset> for MzOffset {
    fn results_in(&self, src: &MzOffset) -> Option<MzOffset> {
        Some(MzOffset {
            offset: self.offset.results_in(&src.offset)?,
        })
    }

    fn followed_by(&self, other: &Self) -> Option<Self> {
        Some(MzOffset {
            offset: PathSummary::<u64>::followed_by(&self.offset, &other.offset)?,
        })
    }
}

impl Refines<()> for MzOffset {
    fn to_inner(_: ()) -> Self {
        MzOffset::minimum()
    }
    fn to_outer(self) {}
    fn summarize(_: Self::Summary) {}
}

impl PartialOrder for MzOffset {
    #[inline]
    fn less_equal(&self, other: &Self) -> bool {
        self.offset.less_equal(&other.offset)
    }
}

impl TotalOrder for MzOffset {}
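
// A small illustrative sketch (added; not part of the original API) of `MzOffset`
// arithmetic and ordering: offsets add like plain `u64`s and are totally ordered.
#[cfg(test)]
mod mz_offset_arithmetic_example {
    use super::*;

    #[test]
    fn offset_addition_and_order() {
        let mut offset = MzOffset::from(5u64);
        offset += 3u64;
        assert_eq!(offset, MzOffset::from(5u64) + MzOffset::from(3u64));
        assert!(PartialOrder::less_equal(&MzOffset::from(5u64), &offset));
    }
}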

/// The meaning of the timestamp number produced by data sources. This type
/// is not concerned with the source of the timestamp (e.g. whether the data came
/// from a Debezium consistency topic or a CDCv2 stream), but only with what the
/// timestamp number means.
///
/// Some variants here have attached data used to differentiate incomparable
/// instantiations. These attached data types should be expanded in the future
/// if we need to tell apart more kinds of sources.
#[derive(Arbitrary, Clone, Debug, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize, Hash)]
pub enum Timeline {
    /// EpochMilliseconds means the timestamp is the number of milliseconds since
    /// the Unix epoch.
    EpochMilliseconds,
    /// External means the timestamp comes from an external data source and we
    /// don't know what the number means. The attached String is the source's name,
    /// which will result in different sources being incomparable.
    External(String),
    /// User means the user has manually specified a timeline. The attached
    /// String is specified by the user, allowing them to decide which sources are
    /// joinable.
    User(String),
}

impl Timeline {
    const EPOCH_MILLISECOND_ID_CHAR: char = 'M';
    const EXTERNAL_ID_CHAR: char = 'E';
    const USER_ID_CHAR: char = 'U';

    fn id_char(&self) -> char {
        match self {
            Self::EpochMilliseconds => Self::EPOCH_MILLISECOND_ID_CHAR,
            Self::External(_) => Self::EXTERNAL_ID_CHAR,
            Self::User(_) => Self::USER_ID_CHAR,
        }
    }
}

impl RustType<ProtoTimeline> for Timeline {
    fn into_proto(&self) -> ProtoTimeline {
        use proto_timeline::Kind;
        ProtoTimeline {
            kind: Some(match self {
                Timeline::EpochMilliseconds => Kind::EpochMilliseconds(()),
                Timeline::External(s) => Kind::External(s.clone()),
                Timeline::User(s) => Kind::User(s.clone()),
            }),
        }
    }

    fn from_proto(proto: ProtoTimeline) -> Result<Self, TryFromProtoError> {
        use proto_timeline::Kind;
        let kind = proto
            .kind
            .ok_or_else(|| TryFromProtoError::missing_field("ProtoTimeline::kind"))?;
        Ok(match kind {
            Kind::EpochMilliseconds(()) => Timeline::EpochMilliseconds,
            Kind::External(s) => Timeline::External(s),
            Kind::User(s) => Timeline::User(s),
        })
    }
}

impl ToString for Timeline {
    fn to_string(&self) -> String {
        match self {
            Self::EpochMilliseconds => format!("{}", self.id_char()),
            Self::External(id) => format!("{}.{id}", self.id_char()),
            Self::User(id) => format!("{}.{id}", self.id_char()),
        }
    }
}

impl FromStr for Timeline {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        if s.is_empty() {
            return Err("empty timeline".to_string());
        }
        let mut chars = s.chars();
        match chars.next().expect("non-empty string") {
            Self::EPOCH_MILLISECOND_ID_CHAR => match chars.next() {
                None => Ok(Self::EpochMilliseconds),
                Some(_) => Err(format!("unknown timeline: {s}")),
            },
            Self::EXTERNAL_ID_CHAR => match chars.next() {
                Some('.') => Ok(Self::External(chars.as_str().to_string())),
                _ => Err(format!("unknown timeline: {s}")),
            },
            Self::USER_ID_CHAR => match chars.next() {
                Some('.') => Ok(Self::User(chars.as_str().to_string())),
                _ => Err(format!("unknown timeline: {s}")),
            },
            _ => Err(format!("unknown timeline: {s}")),
        }
    }
}
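
// An illustrative sketch (added; not part of the original file) of the string
// encoding implemented above: `EpochMilliseconds` renders as "M", while
// `External("foo")` and `User("bar")` render as "E.foo" and "U.bar", and all of
// them round-trip through `FromStr`.
#[cfg(test)]
mod timeline_encoding_example {
    use super::*;
    use std::str::FromStr;

    #[test]
    fn timeline_string_round_trip() {
        for timeline in [
            Timeline::EpochMilliseconds,
            Timeline::External("foo".into()),
            Timeline::User("bar".into()),
        ] {
            let s = timeline.to_string();
            assert_eq!(Timeline::from_str(&s), Ok(timeline));
        }
    }
}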

/// A connection to an external system.
pub trait SourceConnection: Debug + Clone + PartialEq + AlterCompatible {
    /// The name of the external system (e.g. kafka, postgres, etc.).
    fn name(&self) -> &'static str;

    /// The name of the resource in the external system (e.g. a Kafka topic), if any.
    fn external_reference(&self) -> Option<&str>;

    /// Defines the key schema to use by default for this source connection type.
    /// This will be used for the primary export of the source and as the default
    /// pre-encoding key schema for the source.
    fn default_key_desc(&self) -> RelationDesc;

    /// Defines the value schema to use by default for this source connection type.
    /// This will be used for the primary export of the source and as the default
    /// pre-encoding value schema for the source.
    fn default_value_desc(&self) -> RelationDesc;

    /// The schema of this connection's timestamp type. This will also be the schema of the
    /// progress relation.
    fn timestamp_desc(&self) -> RelationDesc;

    /// The ID of the connection object (i.e. the one obtained from running `CREATE CONNECTION`)
    /// in the catalog, if any.
    fn connection_id(&self) -> Option<CatalogItemId>;

    /// If this source connection can output to a primary collection, returns the source-specific
    /// details of that export; otherwise returns `SourceExportDetails::None` to indicate that
    /// this source should not export to the primary collection.
    fn primary_export_details(&self) -> SourceExportDetails;

    /// Whether the source type supports read-only mode.
    fn supports_read_only(&self) -> bool;

    /// Whether the source type prefers to run on only one replica of a multi-replica cluster.
    fn prefers_single_replica(&self) -> bool;
}

#[derive(Arbitrary, Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum Compression {
    Gzip,
    None,
}

impl RustType<ProtoCompression> for Compression {
    fn into_proto(&self) -> ProtoCompression {
        use proto_compression::Kind;
        ProtoCompression {
            kind: Some(match self {
                Compression::Gzip => Kind::Gzip(()),
                Compression::None => Kind::None(()),
            }),
        }
    }

    fn from_proto(proto: ProtoCompression) -> Result<Self, TryFromProtoError> {
        use proto_compression::Kind;
        Ok(match proto.kind {
            Some(Kind::Gzip(())) => Compression::Gzip,
            Some(Kind::None(())) => Compression::None,
            None => {
                return Err(TryFromProtoError::MissingField(
                    "ProtoCompression::kind".into(),
                ));
            }
        })
    }
}
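
// A small illustrative sketch (added; not part of the original file) of the
// `RustType` protobuf round trip for `Compression`, mirroring the pattern used
// by the other proto conversions in this module.
#[cfg(test)]
mod compression_proto_example {
    use super::*;

    #[test]
    fn compression_proto_round_trip() {
        for compression in [Compression::Gzip, Compression::None] {
            let proto = compression.into_proto();
            assert_eq!(Compression::from_proto(proto).unwrap(), compression);
        }
    }
}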

/// Defines the configuration for how to handle data that is exported for a given
/// Source Export.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Arbitrary)]
pub struct SourceExportDataConfig<C: ConnectionAccess = InlinedConnection> {
    pub encoding: Option<encoding::SourceDataEncoding<C>>,
    pub envelope: SourceEnvelope,
}

impl<R: ConnectionResolver> IntoInlineConnection<SourceExportDataConfig, R>
    for SourceExportDataConfig<ReferencedConnection>
{
    fn into_inline_connection(self, r: R) -> SourceExportDataConfig {
        let SourceExportDataConfig { encoding, envelope } = self;

        SourceExportDataConfig {
            encoding: encoding.map(|e| e.into_inline_connection(r)),
            envelope,
        }
    }
}

impl RustType<ProtoSourceExportDataConfig> for SourceExportDataConfig {
    fn into_proto(&self) -> ProtoSourceExportDataConfig {
        ProtoSourceExportDataConfig {
            encoding: self.encoding.into_proto(),
            envelope: Some(self.envelope.into_proto()),
        }
    }

    fn from_proto(proto: ProtoSourceExportDataConfig) -> Result<Self, TryFromProtoError> {
        Ok(SourceExportDataConfig {
            encoding: proto.encoding.into_rust()?,
            envelope: proto
                .envelope
                .into_rust_if_some("ProtoSourceExportDataConfig::envelope")?,
        })
    }
}

impl<C: ConnectionAccess> AlterCompatible for SourceExportDataConfig<C> {
    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
        if self == other {
            return Ok(());
        }
        let Self { encoding, envelope } = &self;

        let compatibility_checks = [
            (
                match (encoding, &other.encoding) {
                    (Some(s), Some(o)) => s.alter_compatible(id, o).is_ok(),
                    (s, o) => s == o,
                },
                "encoding",
            ),
            (envelope == &other.envelope, "envelope"),
        ];

        for (compatible, field) in compatibility_checks {
            if !compatible {
                tracing::warn!(
                    "SourceExportDataConfig incompatible {field}:\nself:\n{:#?}\n\nother\n{:#?}",
                    self,
                    other
                );

                return Err(AlterError { id });
            }
        }
        Ok(())
    }
}

impl<C: ConnectionAccess> SourceExportDataConfig<C> {
    /// Returns `true` if this connection yields data that is
    /// append-only/monotonic. Append-only means the source
    /// never produces retractions.
    // TODO(guswynn): consider enforcing this more completely at the
    // parsing/typechecking level, by not using an `envelope`
    // for sources like pg
    pub fn monotonic(&self, connection: &GenericSourceConnection<C>) -> bool {
        match &self.envelope {
            // Upsert and CdcV2 may produce retractions.
            SourceEnvelope::Upsert(_) | SourceEnvelope::CdcV2 => false,
            SourceEnvelope::None(_) => {
                match connection {
                    // Postgres can produce retractions (deletes).
                    GenericSourceConnection::Postgres(_) => false,
                    // MySQL can produce retractions (deletes).
                    GenericSourceConnection::MySql(_) => false,
                    // SQL Server can produce retractions (deletes).
                    GenericSourceConnection::SqlServer(_) => false,
                    // Whether or not a Loadgen source can produce retractions varies.
                    GenericSourceConnection::LoadGenerator(g) => g.load_generator.is_monotonic(),
                    // Kafka exports with `None` envelope are append-only.
                    GenericSourceConnection::Kafka(_) => true,
                }
            }
        }
    }
}

/// An external source of updates for a relational collection.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Arbitrary)]
pub struct SourceDesc<C: ConnectionAccess = InlinedConnection> {
    pub connection: GenericSourceConnection<C>,
    pub timestamp_interval: Duration,
    /// The data encoding and format of data to export to the
    /// primary collection for this source.
    /// TODO(database-issues#8620): This will be removed once sources no longer export
    /// to primary collections and only export to explicit SourceExports (tables).
    pub primary_export: SourceExportDataConfig<C>,
    pub primary_export_details: SourceExportDetails,
}

impl<R: ConnectionResolver> IntoInlineConnection<SourceDesc, R>
    for SourceDesc<ReferencedConnection>
{
    fn into_inline_connection(self, r: R) -> SourceDesc {
        let SourceDesc {
            connection,
            primary_export,
            primary_export_details,
            timestamp_interval,
        } = self;

        SourceDesc {
            connection: connection.into_inline_connection(&r),
            primary_export: primary_export.into_inline_connection(r),
            primary_export_details,
            timestamp_interval,
        }
    }
}

impl RustType<ProtoSourceDesc> for SourceDesc {
    fn into_proto(&self) -> ProtoSourceDesc {
        ProtoSourceDesc {
            connection: Some(self.connection.into_proto()),
            primary_export: Some(self.primary_export.into_proto()),
            primary_export_details: Some(self.primary_export_details.into_proto()),
            timestamp_interval: Some(self.timestamp_interval.into_proto()),
        }
    }

    fn from_proto(proto: ProtoSourceDesc) -> Result<Self, TryFromProtoError> {
        Ok(SourceDesc {
            connection: proto
                .connection
                .into_rust_if_some("ProtoSourceDesc::connection")?,
            primary_export: proto
                .primary_export
                .into_rust_if_some("ProtoSourceDesc::primary_export")?,
            primary_export_details: proto
                .primary_export_details
                .into_rust_if_some("ProtoSourceDesc::primary_export_details")?,
            timestamp_interval: proto
                .timestamp_interval
                .into_rust_if_some("ProtoSourceDesc::timestamp_interval")?,
        })
    }
}

impl<C: ConnectionAccess> AlterCompatible for SourceDesc<C> {
    /// Determines if `self` is compatible with another `SourceDesc`, in such a
    /// way that it is possible to turn `self` into `other` through a valid
    /// series of transformations (e.g. no transformation or `ALTER SOURCE`).
    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
        if self == other {
            return Ok(());
        }
        let Self {
            connection,
            primary_export,
            primary_export_details,
            timestamp_interval,
        } = &self;

        let compatibility_checks = [
            (
                connection.alter_compatible(id, &other.connection).is_ok(),
                "connection",
            ),
            (primary_export == &other.primary_export, "primary_export"),
            (
                primary_export_details == &other.primary_export_details,
                "primary_export_details",
            ),
            (
                timestamp_interval == &other.timestamp_interval,
                "timestamp_interval",
            ),
        ];

        for (compatible, field) in compatibility_checks {
            if !compatible {
                tracing::warn!(
                    "SourceDesc incompatible {field}:\nself:\n{:#?}\n\nother\n{:#?}",
                    self,
                    other
                );

                return Err(AlterError { id });
            }
        }

        Ok(())
    }
}

impl SourceDesc<InlinedConnection> {
    /// Returns the SourceExport details for the primary export.
    /// TODO(database-issues#8620): This will be removed once sources no longer export
    /// to primary collections and only export to explicit SourceExports (tables).
    pub fn primary_source_export(&self) -> SourceExport<(), InlinedConnection> {
        SourceExport {
            storage_metadata: (),
            details: self.primary_export_details.clone(),
            data_config: self.primary_export.clone(),
        }
    }
}

#[derive(Arbitrary, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum GenericSourceConnection<C: ConnectionAccess = InlinedConnection> {
    Kafka(KafkaSourceConnection<C>),
    Postgres(PostgresSourceConnection<C>),
    MySql(MySqlSourceConnection<C>),
    SqlServer(SqlServerSource<C>),
    LoadGenerator(LoadGeneratorSourceConnection),
}

impl<C: ConnectionAccess> From<KafkaSourceConnection<C>> for GenericSourceConnection<C> {
    fn from(conn: KafkaSourceConnection<C>) -> Self {
        Self::Kafka(conn)
    }
}

impl<C: ConnectionAccess> From<PostgresSourceConnection<C>> for GenericSourceConnection<C> {
    fn from(conn: PostgresSourceConnection<C>) -> Self {
        Self::Postgres(conn)
    }
}

impl<C: ConnectionAccess> From<MySqlSourceConnection<C>> for GenericSourceConnection<C> {
    fn from(conn: MySqlSourceConnection<C>) -> Self {
        Self::MySql(conn)
    }
}

impl<C: ConnectionAccess> From<SqlServerSource<C>> for GenericSourceConnection<C> {
    fn from(conn: SqlServerSource<C>) -> Self {
        Self::SqlServer(conn)
    }
}

impl<C: ConnectionAccess> From<LoadGeneratorSourceConnection> for GenericSourceConnection<C> {
    fn from(conn: LoadGeneratorSourceConnection) -> Self {
        Self::LoadGenerator(conn)
    }
}

impl<R: ConnectionResolver> IntoInlineConnection<GenericSourceConnection, R>
    for GenericSourceConnection<ReferencedConnection>
{
    fn into_inline_connection(self, r: R) -> GenericSourceConnection {
        match self {
            GenericSourceConnection::Kafka(kafka) => {
                GenericSourceConnection::Kafka(kafka.into_inline_connection(r))
            }
            GenericSourceConnection::Postgres(pg) => {
                GenericSourceConnection::Postgres(pg.into_inline_connection(r))
            }
            GenericSourceConnection::MySql(mysql) => {
                GenericSourceConnection::MySql(mysql.into_inline_connection(r))
            }
            GenericSourceConnection::SqlServer(sql_server) => {
                GenericSourceConnection::SqlServer(sql_server.into_inline_connection(r))
            }
            GenericSourceConnection::LoadGenerator(lg) => {
                GenericSourceConnection::LoadGenerator(lg)
            }
        }
    }
}

impl<C: ConnectionAccess> SourceConnection for GenericSourceConnection<C> {
    fn name(&self) -> &'static str {
        match self {
            Self::Kafka(conn) => conn.name(),
            Self::Postgres(conn) => conn.name(),
            Self::MySql(conn) => conn.name(),
            Self::SqlServer(conn) => conn.name(),
            Self::LoadGenerator(conn) => conn.name(),
        }
    }

    fn external_reference(&self) -> Option<&str> {
        match self {
            Self::Kafka(conn) => conn.external_reference(),
            Self::Postgres(conn) => conn.external_reference(),
            Self::MySql(conn) => conn.external_reference(),
            Self::SqlServer(conn) => conn.external_reference(),
            Self::LoadGenerator(conn) => conn.external_reference(),
        }
    }

    fn default_key_desc(&self) -> RelationDesc {
        match self {
            Self::Kafka(conn) => conn.default_key_desc(),
            Self::Postgres(conn) => conn.default_key_desc(),
            Self::MySql(conn) => conn.default_key_desc(),
            Self::SqlServer(conn) => conn.default_key_desc(),
            Self::LoadGenerator(conn) => conn.default_key_desc(),
        }
    }

    fn default_value_desc(&self) -> RelationDesc {
        match self {
            Self::Kafka(conn) => conn.default_value_desc(),
            Self::Postgres(conn) => conn.default_value_desc(),
            Self::MySql(conn) => conn.default_value_desc(),
            Self::SqlServer(conn) => conn.default_value_desc(),
            Self::LoadGenerator(conn) => conn.default_value_desc(),
        }
    }

    fn timestamp_desc(&self) -> RelationDesc {
        match self {
            Self::Kafka(conn) => conn.timestamp_desc(),
            Self::Postgres(conn) => conn.timestamp_desc(),
            Self::MySql(conn) => conn.timestamp_desc(),
            Self::SqlServer(conn) => conn.timestamp_desc(),
            Self::LoadGenerator(conn) => conn.timestamp_desc(),
        }
    }

    fn connection_id(&self) -> Option<CatalogItemId> {
        match self {
            Self::Kafka(conn) => conn.connection_id(),
            Self::Postgres(conn) => conn.connection_id(),
            Self::MySql(conn) => conn.connection_id(),
            Self::SqlServer(conn) => conn.connection_id(),
            Self::LoadGenerator(conn) => conn.connection_id(),
        }
    }

    fn primary_export_details(&self) -> SourceExportDetails {
        match self {
            Self::Kafka(conn) => conn.primary_export_details(),
            Self::Postgres(conn) => conn.primary_export_details(),
            Self::MySql(conn) => conn.primary_export_details(),
            Self::SqlServer(conn) => conn.primary_export_details(),
            Self::LoadGenerator(conn) => conn.primary_export_details(),
        }
    }

    fn supports_read_only(&self) -> bool {
        match self {
            GenericSourceConnection::Kafka(conn) => conn.supports_read_only(),
            GenericSourceConnection::Postgres(conn) => conn.supports_read_only(),
            GenericSourceConnection::MySql(conn) => conn.supports_read_only(),
            GenericSourceConnection::SqlServer(conn) => conn.supports_read_only(),
            GenericSourceConnection::LoadGenerator(conn) => conn.supports_read_only(),
        }
    }

    fn prefers_single_replica(&self) -> bool {
        match self {
            GenericSourceConnection::Kafka(conn) => conn.prefers_single_replica(),
            GenericSourceConnection::Postgres(conn) => conn.prefers_single_replica(),
            GenericSourceConnection::MySql(conn) => conn.prefers_single_replica(),
            GenericSourceConnection::SqlServer(conn) => conn.prefers_single_replica(),
            GenericSourceConnection::LoadGenerator(conn) => conn.prefers_single_replica(),
        }
    }
}
impl<C: ConnectionAccess> crate::AlterCompatible for GenericSourceConnection<C> {
    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
        if self == other {
            return Ok(());
        }
        let r = match (self, other) {
            (Self::Kafka(conn), Self::Kafka(other)) => conn.alter_compatible(id, other),
            (Self::Postgres(conn), Self::Postgres(other)) => conn.alter_compatible(id, other),
            (Self::MySql(conn), Self::MySql(other)) => conn.alter_compatible(id, other),
            (Self::SqlServer(conn), Self::SqlServer(other)) => conn.alter_compatible(id, other),
            (Self::LoadGenerator(conn), Self::LoadGenerator(other)) => {
                conn.alter_compatible(id, other)
            }
            _ => Err(AlterError { id }),
        };

        if r.is_err() {
            tracing::warn!(
                "GenericSourceConnection incompatible:\nself:\n{:#?}\n\nother\n{:#?}",
                self,
                other
            );
        }

        r
    }
}

impl RustType<ProtoSourceConnection> for GenericSourceConnection<InlinedConnection> {
    fn into_proto(&self) -> ProtoSourceConnection {
        use proto_source_connection::Kind;
        ProtoSourceConnection {
            kind: Some(match self {
                GenericSourceConnection::Kafka(kafka) => Kind::Kafka(kafka.into_proto()),
                GenericSourceConnection::Postgres(postgres) => {
                    Kind::Postgres(postgres.into_proto())
                }
                GenericSourceConnection::MySql(mysql) => Kind::Mysql(mysql.into_proto()),
                GenericSourceConnection::SqlServer(sql_server) => {
                    Kind::SqlServer(sql_server.into_proto())
                }
                GenericSourceConnection::LoadGenerator(loadgen) => {
                    Kind::Loadgen(loadgen.into_proto())
                }
            }),
        }
    }

    fn from_proto(proto: ProtoSourceConnection) -> Result<Self, TryFromProtoError> {
        use proto_source_connection::Kind;
        let kind = proto
            .kind
            .ok_or_else(|| TryFromProtoError::missing_field("ProtoSourceConnection::kind"))?;
        Ok(match kind {
            Kind::Kafka(kafka) => GenericSourceConnection::Kafka(kafka.into_rust()?),
            Kind::Postgres(postgres) => GenericSourceConnection::Postgres(postgres.into_rust()?),
            Kind::Mysql(mysql) => GenericSourceConnection::MySql(mysql.into_rust()?),
            Kind::SqlServer(sql_server) => {
                GenericSourceConnection::SqlServer(sql_server.into_rust()?)
            }
            Kind::Loadgen(loadgen) => GenericSourceConnection::LoadGenerator(loadgen.into_rust()?),
        })
    }
}

/// Details necessary for each source export to allow the source implementations
/// to export data to the export's collection.
#[derive(Arbitrary, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum SourceExportDetails {
    /// Used when the primary collection of a source isn't an export to
    /// output to.
    None,
    Kafka(KafkaSourceExportDetails),
    Postgres(PostgresSourceExportDetails),
    MySql(MySqlSourceExportDetails),
    SqlServer(SqlServerSourceExportDetails),
    LoadGenerator(LoadGeneratorSourceExportDetails),
}

impl crate::AlterCompatible for SourceExportDetails {
    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
        if self == other {
            return Ok(());
        }
        let r = match (self, other) {
            (Self::None, Self::None) => Ok(()),
            (Self::Kafka(s), Self::Kafka(o)) => s.alter_compatible(id, o),
            (Self::Postgres(s), Self::Postgres(o)) => s.alter_compatible(id, o),
            (Self::MySql(s), Self::MySql(o)) => s.alter_compatible(id, o),
            (Self::LoadGenerator(s), Self::LoadGenerator(o)) => s.alter_compatible(id, o),
            _ => Err(AlterError { id }),
        };

        if r.is_err() {
            tracing::warn!(
                "SourceExportDetails incompatible:\nself:\n{:#?}\n\nother\n{:#?}",
                self,
                other
            );
        }

        r
    }
}

impl RustType<ProtoSourceExportDetails> for SourceExportDetails {
    fn into_proto(&self) -> ProtoSourceExportDetails {
        use proto_source_export_details::Kind;
        ProtoSourceExportDetails {
            kind: match self {
                SourceExportDetails::None => None,
                SourceExportDetails::Kafka(details) => Some(Kind::Kafka(details.into_proto())),
                SourceExportDetails::Postgres(details) => {
                    Some(Kind::Postgres(details.into_proto()))
                }
                SourceExportDetails::MySql(details) => Some(Kind::Mysql(details.into_proto())),
                SourceExportDetails::SqlServer(details) => {
                    Some(Kind::SqlServer(details.into_proto()))
                }
                SourceExportDetails::LoadGenerator(details) => {
                    Some(Kind::Loadgen(details.into_proto()))
                }
            },
        }
    }

    fn from_proto(proto: ProtoSourceExportDetails) -> Result<Self, TryFromProtoError> {
        use proto_source_export_details::Kind;
        Ok(match proto.kind {
            None => SourceExportDetails::None,
            Some(Kind::Kafka(details)) => SourceExportDetails::Kafka(details.into_rust()?),
            Some(Kind::Postgres(details)) => SourceExportDetails::Postgres(details.into_rust()?),
            Some(Kind::Mysql(details)) => SourceExportDetails::MySql(details.into_rust()?),
            Some(Kind::SqlServer(details)) => SourceExportDetails::SqlServer(details.into_rust()?),
            Some(Kind::Loadgen(details)) => {
                SourceExportDetails::LoadGenerator(details.into_rust()?)
            }
        })
    }
}

/// Details necessary to store in the `Details` option of a source export
/// statement (`CREATE SUBSOURCE` and `CREATE TABLE .. FROM SOURCE` statements),
/// to generate the appropriate `SourceExportDetails` struct during planning.
/// NOTE that this is serialized as proto to the catalog, so any changes here
/// must be backwards compatible or will require a migration.
pub enum SourceExportStatementDetails {
    Postgres {
        table: mz_postgres_util::desc::PostgresTableDesc,
    },
    MySql {
        table: mz_mysql_util::MySqlTableDesc,
        initial_gtid_set: String,
    },
    SqlServer {
        table: mz_sql_server_util::desc::SqlServerTableDesc,
        capture_instance: Arc<str>,
        initial_lsn: mz_sql_server_util::cdc::Lsn,
    },
    LoadGenerator {
        output: LoadGeneratorOutput,
    },
    Kafka {},
}

impl RustType<ProtoSourceExportStatementDetails> for SourceExportStatementDetails {
    fn into_proto(&self) -> ProtoSourceExportStatementDetails {
        match self {
            SourceExportStatementDetails::Postgres { table } => ProtoSourceExportStatementDetails {
                kind: Some(proto_source_export_statement_details::Kind::Postgres(
                    postgres::ProtoPostgresSourceExportStatementDetails {
                        table: Some(table.into_proto()),
                    },
                )),
            },
            SourceExportStatementDetails::MySql {
                table,
                initial_gtid_set,
            } => ProtoSourceExportStatementDetails {
                kind: Some(proto_source_export_statement_details::Kind::Mysql(
                    mysql::ProtoMySqlSourceExportStatementDetails {
                        table: Some(table.into_proto()),
                        initial_gtid_set: initial_gtid_set.clone(),
                    },
                )),
            },
            SourceExportStatementDetails::SqlServer {
                table,
                capture_instance,
                initial_lsn,
            } => ProtoSourceExportStatementDetails {
                kind: Some(proto_source_export_statement_details::Kind::SqlServer(
                    sql_server::ProtoSqlServerSourceExportStatementDetails {
                        table: Some(table.into_proto()),
                        capture_instance: capture_instance.to_string(),
                        initial_lsn: initial_lsn.as_bytes().to_vec(),
                    },
                )),
            },
            SourceExportStatementDetails::LoadGenerator { output } => {
                ProtoSourceExportStatementDetails {
                    kind: Some(proto_source_export_statement_details::Kind::Loadgen(
                        load_generator::ProtoLoadGeneratorSourceExportStatementDetails {
                            output: output.into_proto().into(),
                        },
                    )),
                }
            }
            SourceExportStatementDetails::Kafka {} => ProtoSourceExportStatementDetails {
                kind: Some(proto_source_export_statement_details::Kind::Kafka(
                    kafka::ProtoKafkaSourceExportStatementDetails {},
                )),
            },
        }
    }

    fn from_proto(proto: ProtoSourceExportStatementDetails) -> Result<Self, TryFromProtoError> {
        use proto_source_export_statement_details::Kind;
        Ok(match proto.kind {
            Some(Kind::Postgres(details)) => SourceExportStatementDetails::Postgres {
                table: details
                    .table
                    .into_rust_if_some("ProtoPostgresSourceExportStatementDetails::table")?,
            },
            Some(Kind::Mysql(details)) => SourceExportStatementDetails::MySql {
                table: details
                    .table
                    .into_rust_if_some("ProtoMySqlSourceExportStatementDetails::table")?,

                initial_gtid_set: details.initial_gtid_set,
            },
            Some(Kind::SqlServer(details)) => SourceExportStatementDetails::SqlServer {
                table: details
                    .table
                    .into_rust_if_some("ProtoSqlServerSourceExportStatementDetails::table")?,
                capture_instance: details.capture_instance.into(),
                initial_lsn: mz_sql_server_util::cdc::Lsn::try_from(details.initial_lsn.as_slice())
                    .map_err(|e| TryFromProtoError::InvalidFieldError(e.to_string()))?,
            },
            Some(Kind::Loadgen(details)) => SourceExportStatementDetails::LoadGenerator {
                output: details
                    .output
                    .into_rust_if_some("ProtoLoadGeneratorSourceExportStatementDetails::output")?,
            },
            Some(Kind::Kafka(_details)) => SourceExportStatementDetails::Kafka {},
            None => {
                return Err(TryFromProtoError::missing_field(
                    "ProtoSourceExportStatementDetails::kind",
                ));
            }
        })
    }
}

#[derive(Arbitrary, Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
#[repr(transparent)]
pub struct SourceData(pub Result<Row, DataflowError>);

impl Default for SourceData {
    fn default() -> Self {
        SourceData(Ok(Row::default()))
    }
}

impl Deref for SourceData {
    type Target = Result<Row, DataflowError>;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl DerefMut for SourceData {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0
    }
}

impl RustType<ProtoSourceData> for SourceData {
    fn into_proto(&self) -> ProtoSourceData {
        use proto_source_data::Kind;
        ProtoSourceData {
            kind: Some(match &**self {
                Ok(row) => Kind::Ok(row.into_proto()),
                Err(err) => Kind::Err(err.into_proto()),
            }),
        }
    }

    fn from_proto(proto: ProtoSourceData) -> Result<Self, TryFromProtoError> {
        use proto_source_data::Kind;
        match proto.kind {
            Some(kind) => match kind {
                Kind::Ok(row) => Ok(SourceData(Ok(row.into_rust()?))),
                Kind::Err(err) => Ok(SourceData(Err(err.into_rust()?))),
            },
            None => Result::Err(TryFromProtoError::missing_field("ProtoSourceData::kind")),
        }
    }
}

impl Codec for SourceData {
    type Storage = ProtoRow;
    type Schema = RelationDesc;

    fn codec_name() -> String {
        "protobuf[SourceData]".into()
    }

    fn encode<B: BufMut>(&self, buf: &mut B) {
        self.into_proto()
            .encode(buf)
            .expect("no required fields means no initialization errors");
    }

    fn decode(buf: &[u8], schema: &RelationDesc) -> Result<Self, String> {
        let mut val = SourceData::default();
        <Self as Codec>::decode_from(&mut val, buf, &mut None, schema)?;
        Ok(val)
    }

    fn decode_from<'a>(
        &mut self,
        buf: &'a [u8],
        storage: &mut Option<ProtoRow>,
        schema: &RelationDesc,
    ) -> Result<(), String> {
        // Optimize for common case of `Ok` by leaving a (cleared) `ProtoRow` in
        // the `Ok` variant of `ProtoSourceData`. prost's `Message::merge` impl
        // is smart about reusing the `Vec<Datum>` when it can.
        let mut proto = storage.take().unwrap_or_default();
        proto.clear();
        let mut proto = ProtoSourceData {
            kind: Some(proto_source_data::Kind::Ok(proto)),
        };
        proto.merge(buf).map_err(|err| err.to_string())?;
        match (proto.kind, &mut self.0) {
            // Again, optimize for the common case...
            (Some(proto_source_data::Kind::Ok(proto)), Ok(row)) => {
                let ret = row.decode_from_proto(&proto, schema);
                storage.replace(proto);
                ret
            }
            // ...otherwise fall back to the obvious thing.
            (kind, _) => {
                let proto = ProtoSourceData { kind };
                *self = proto.into_rust().map_err(|err| err.to_string())?;
                // Nothing to put back in storage.
                Ok(())
            }
        }
    }

    fn validate(val: &Self, desc: &Self::Schema) -> Result<(), String> {
        match &val.0 {
            Ok(row) => Row::validate(row, desc),
            Err(_) => Ok(()),
        }
    }

    fn encode_schema(schema: &Self::Schema) -> Bytes {
        schema.into_proto().encode_to_vec().into()
    }

    fn decode_schema(buf: &Bytes) -> Self::Schema {
        let proto = ProtoRelationDesc::decode(buf.as_ref()).expect("valid schema");
        proto.into_rust().expect("valid schema")
    }
}
1435
1436/// Given a [`RelationDesc`] returns an arbitrary [`SourceData`].
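///
/// A minimal sketch of driving this strategy from a proptest (hypothetical
/// column name; mirrors how the tests in this module use it):
///
/// ```ignore
/// use proptest::prelude::*;
///
/// let desc = RelationDesc::from_names_and_types([("a", SqlScalarType::Bool.nullable(true))]);
/// proptest!(|(data in arb_source_data_for_relation_desc(&desc))| {
///     if let Ok(row) = &data.0 {
///         // Generated rows conform to `desc`, so they have exactly one datum.
///         prop_assert_eq!(row.iter().count(), 1);
///     }
/// });
/// ```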
1437pub fn arb_source_data_for_relation_desc(
1438    desc: &RelationDesc,
1439) -> impl Strategy<Value = SourceData> + use<> {
1440    let row_strat = arb_row_for_relation(desc).no_shrink();
1441
1442    proptest::strategy::Union::new_weighted(vec![
1443        (50, row_strat.prop_map(|row| SourceData(Ok(row))).boxed()),
1444        (
1445            1,
1446            any::<DataflowError>()
1447                .prop_map(|err| SourceData(Err(err)))
1448                .no_shrink()
1449                .boxed(),
1450        ),
1451    ])
1452}
1453
1454/// Describes how external references should be organized in a multi-level
1455/// hierarchy.
1456///
1457/// For both PostgreSQL and MySQL sources, these levels of reference are
1458/// intrinsic to the items we're referencing. If we discover other types of
1459/// sources with different naming schemes, we might need to revisit
1460/// this.
1461pub trait ExternalCatalogReference {
1462    /// The "second" level of namespacing for the reference.
1463    fn schema_name(&self) -> &str;
1464    /// The lowest level of namespacing for the reference.
1465    fn item_name(&self) -> &str;
1466}
1467
1468impl ExternalCatalogReference for &mz_mysql_util::MySqlTableDesc {
1469    fn schema_name(&self) -> &str {
1470        &self.schema_name
1471    }
1472
1473    fn item_name(&self) -> &str {
1474        &self.name
1475    }
1476}
1477
1478impl ExternalCatalogReference for mz_postgres_util::desc::PostgresTableDesc {
1479    fn schema_name(&self) -> &str {
1480        &self.namespace
1481    }
1482
1483    fn item_name(&self) -> &str {
1484        &self.name
1485    }
1486}
1487
1488impl ExternalCatalogReference for &mz_sql_server_util::desc::SqlServerTableDesc {
1489    fn schema_name(&self) -> &str {
1490        &*self.schema_name
1491    }
1492
1493    fn item_name(&self) -> &str {
1494        &*self.name
1495    }
1496}
1497
1498// This implementation provides a means of treating arbitrary pairs of strings
1499// as an `ExternalCatalogReference`, e.g. load generator view names.
1500impl<'a> ExternalCatalogReference for (&'a str, &'a str) {
1501    fn schema_name(&self) -> &str {
1502        self.0
1503    }
1504
1505    fn item_name(&self) -> &str {
1506        self.1
1507    }
1508}
1509
1510/// Stores and resolves references to a `&[T: ExternalCatalogReference]`.
1511///
1512/// This is meant to provide an API to quickly look up a source's subsources.
1513///
1514/// For sources that do not provide any subsources, use the `Default`
1515/// implementation, which is empty and will not be able to resolve any
1516/// references.
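///
/// A minimal sketch (hypothetical database and table names), using the
/// `(&str, &str)` implementation of [`ExternalCatalogReference`] defined in
/// this module:
///
/// ```ignore
/// let tables = [("public", "users"), ("public", "orders")];
/// let resolver = SourceReferenceResolver::new("mydb", &tables)?;
/// // Partially-qualified names resolve as long as they are unambiguous.
/// let (name, idx) = resolver.resolve(&[Ident::new("orders")?], 3)?;
/// // `name` is the fully-qualified `mydb.public.orders`; `idx` is 1, the
/// // position of the orders table in `tables`.
/// assert_eq!(idx, 1);
/// ```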
1517#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
1518pub struct SourceReferenceResolver {
1519    inner: BTreeMap<Ident, BTreeMap<Ident, BTreeMap<Ident, usize>>>,
1520}
1521
1522#[derive(Debug, Clone, thiserror::Error)]
1523pub enum ExternalReferenceResolutionError {
1524    #[error("reference to {name} not found in source")]
1525    DoesNotExist { name: String },
1526    #[error(
1527        "reference {name} is ambiguous, consider specifying an additional \
1528    layer of qualification"
1529    )]
1530    Ambiguous { name: String },
1531    #[error("invalid identifier: {0}")]
1532    Ident(#[from] IdentError),
1533}
1534
1535impl<'a> SourceReferenceResolver {
1536    /// Constructs a new `SourceReferenceResolver` from a slice of `T:
1537    /// ExternalCatalogReference`.
1538    ///
1539    /// # Errors
1540    /// - If any `&str` provided cannot be converted into an [`Ident`].
1541    pub fn new<T: ExternalCatalogReference>(
1542        database: &str,
1543        referenceable_items: &'a [T],
1544    ) -> Result<SourceReferenceResolver, ExternalReferenceResolutionError> {
1545        // An index from table name -> schema name -> database name -> index in
1546        // `referenceable_items`.
1547        let mut inner = BTreeMap::new();
1548
1549        let database = Ident::new(database)?;
1550
1551        for (reference_idx, item) in referenceable_items.iter().enumerate() {
1552            let item_name = Ident::new(item.item_name())?;
1553            let schema_name = Ident::new(item.schema_name())?;
1554
1555            inner
1556                .entry(item_name)
1557                .or_insert_with(BTreeMap::new)
1558                .entry(schema_name)
1559                .or_insert_with(BTreeMap::new)
1560                .entry(database.clone())
1561                .or_insert(reference_idx);
1562        }
1563
1564        Ok(SourceReferenceResolver { inner })
1565    }
1566
1567    /// Returns the canonical reference, along with the index at which it can be
1568    /// found in the `referenceable_items` provided to [`Self::new`].
1569    ///
1570    /// # Args
1571    /// - `name` is `&[Ident]` to let users provide the inner element of
1572    ///   [`UnresolvedItemName`].
1573    /// - `canonicalize_to_width` limits the number of elements in the returned
1574    ///   [`UnresolvedItemName`]; this is useful if the source type requires
1575    ///   contriving database and schema names that a subsource should not
1576    ///   persist as its reference (see the example below).
1577    ///
1578    /// # Errors
1579    /// - If `name` does not resolve to an item in `self.inner`.
1580    ///
1581    /// # Panics
1582    /// - If `canonicalize_to_width` is not in `1..=3`.
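    ///
    /// For example (hypothetical names): resolving `[schema, item]` with
    /// `canonicalize_to_width = 2` yields `schema.item`, while a width of `3`
    /// yields `database.schema.item`, where the database is the one passed to
    /// [`Self::new`].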
1583    pub fn resolve(
1584        &self,
1585        name: &[Ident],
1586        canonicalize_to_width: usize,
1587    ) -> Result<(UnresolvedItemName, usize), ExternalReferenceResolutionError> {
1588        let (db, schema, idx) = self.resolve_inner(name)?;
1589
1590        let item = name.last().expect("must have provided at least 1 element");
1591
1592        let canonical_name = match canonicalize_to_width {
1593            1 => vec![item.clone()],
1594            2 => vec![schema.clone(), item.clone()],
1595            3 => vec![db.clone(), schema.clone(), item.clone()],
1596            o => panic!("canonicalize_to_width values must be 1..=3, but got {}", o),
1597        };
1598
1599        Ok((UnresolvedItemName(canonical_name), idx))
1600    }
1601
1602    /// Returns the index at which the resolved reference can be found in the
1603    /// `referenceable_items` provided to [`Self::new`].
1604    ///
1605    /// # Args
1606    /// `name` is `&[Ident]` to let users provide the inner element of
1607    /// [`UnresolvedItemName`].
1608    ///
1609    /// # Errors
1610    /// - If `name` does not resolve to an item in `self.inner`.
1611    pub fn resolve_idx(&self, name: &[Ident]) -> Result<usize, ExternalReferenceResolutionError> {
1612        let (_db, _schema, idx) = self.resolve_inner(name)?;
1613        Ok(idx)
1614    }
1615
1616    /// Resolves `name` to its database, schema, and the index at which it can
1617    /// be found in the `referenceable_items` provided to [`Self::new`].
1618    ///
1619    /// # Args
1620    /// `name` is `&[Ident]` to let users provide the inner element of
1621    /// [`UnresolvedItemName`].
1622    ///
1623    /// # Return
1624    /// Returns a tuple whose elements are:
1625    /// 1. The "database"- or top-level namespace of the reference.
1626    /// 2. The "schema"- or second-level namespace of the reference.
1627    /// 3. The index to find the item in `referenceable_items` argument provided
1628    ///    to `SourceReferenceResolver::new`.
1629    ///
1630    /// # Errors
1631    /// - If `name` does not resolve to an item in `self.inner`.
1632    fn resolve_inner<'name: 'a>(
1633        &'a self,
1634        name: &'name [Ident],
1635    ) -> Result<(&'a Ident, &'a Ident, usize), ExternalReferenceResolutionError> {
1636        let get_provided_name = || UnresolvedItemName(name.to_vec()).to_string();
1637
1638        // Names must be composed of 1..=3 elements.
1639        if !(1..=3).contains(&name.len()) {
1640            Err(ExternalReferenceResolutionError::DoesNotExist {
1641                name: get_provided_name(),
1642            })?;
1643        }
1644
1645        // Fill in the leading elements with `None` if they aren't present.
1646        let mut names = std::iter::repeat(None)
1647            .take(3 - name.len())
1648            .chain(name.iter().map(Some));
1649
1650        let database = names.next().flatten();
1651        let schema = names.next().flatten();
1652        let item = names
1653            .next()
1654            .flatten()
1655            .expect("must have provided the item name");
1656
1657        assert_none!(names.next(), "expected a 3-element iterator");
1658
1659        let schemas =
1660            self.inner
1661                .get(item)
1662                .ok_or_else(|| ExternalReferenceResolutionError::DoesNotExist {
1663                    name: get_provided_name(),
1664                })?;
1665
1666        let schema = match schema {
1667            Some(schema) => schema,
1668            None => schemas.keys().exactly_one().map_err(|_e| {
1669                ExternalReferenceResolutionError::Ambiguous {
1670                    name: get_provided_name(),
1671                }
1672            })?,
1673        };
1674
1675        let databases =
1676            schemas
1677                .get(schema)
1678                .ok_or_else(|| ExternalReferenceResolutionError::DoesNotExist {
1679                    name: get_provided_name(),
1680                })?;
1681
1682        let database = match database {
1683            Some(database) => database,
1684            None => databases.keys().exactly_one().map_err(|_e| {
1685                ExternalReferenceResolutionError::Ambiguous {
1686                    name: get_provided_name(),
1687                }
1688            })?,
1689        };
1690
1691        let reference_idx = databases.get(database).ok_or_else(|| {
1692            ExternalReferenceResolutionError::DoesNotExist {
1693                name: get_provided_name(),
1694            }
1695        })?;
1696
1697        Ok((database, schema, *reference_idx))
1698    }
1699}
1700
1701/// A decoder for [`Row`]s within [`SourceData`].
1702///
1703/// This type exists as a wrapper around [`RowColumnarDecoder`] to handle the
1704/// case where the [`RelationDesc`] we're decoding with has no columns. See
1705/// [`SourceDataRowColumnarEncoder`] for more details.
1706#[derive(Debug)]
1707pub enum SourceDataRowColumnarDecoder {
1708    Row(RowColumnarDecoder),
1709    EmptyRow,
1710}
1711
1712impl SourceDataRowColumnarDecoder {
1713    pub fn decode(&self, idx: usize, row: &mut Row) {
1714        match self {
1715            SourceDataRowColumnarDecoder::Row(decoder) => decoder.decode(idx, row),
1716            SourceDataRowColumnarDecoder::EmptyRow => {
1717                // Create a packer just to clear the Row.
1718                row.packer();
1719            }
1720        }
1721    }
1722
1723    pub fn goodbytes(&self) -> usize {
1724        match self {
1725            SourceDataRowColumnarDecoder::Row(decoder) => decoder.goodbytes(),
1726            SourceDataRowColumnarDecoder::EmptyRow => 0,
1727        }
1728    }
1729}
1730
1731#[derive(Debug)]
1732pub struct SourceDataColumnarDecoder {
1733    row_decoder: SourceDataRowColumnarDecoder,
1734    err_decoder: BinaryArray,
1735}
1736
1737impl SourceDataColumnarDecoder {
1738    pub fn new(col: StructArray, desc: &RelationDesc) -> Result<Self, anyhow::Error> {
1739        // TODO(parkmycar): We should validate the fields here.
1740        let (_fields, arrays, nullability) = col.into_parts();
1741
1742        if nullability.is_some() {
1743            anyhow::bail!("SourceData is not nullable, but found {nullability:?}");
1744        }
1745        if arrays.len() != 2 {
1746            anyhow::bail!("SourceData should only have two fields, found {arrays:?}");
1747        }
1748
1749        let errs = arrays[1]
1750            .as_any()
1751            .downcast_ref::<BinaryArray>()
1752            .ok_or_else(|| anyhow::anyhow!("expected BinaryArray, found {:?}", arrays[1]))?;
1753
1754        let row_decoder = match arrays[0].data_type() {
1755            arrow::datatypes::DataType::Struct(_) => {
1756                let rows = arrays[0]
1757                    .as_any()
1758                    .downcast_ref::<StructArray>()
1759                    .ok_or_else(|| {
1760                        anyhow::anyhow!("expected StructArray, found {:?}", arrays[0])
1761                    })?;
1762                let decoder = RowColumnarDecoder::new(rows.clone(), desc)?;
1763                SourceDataRowColumnarDecoder::Row(decoder)
1764            }
1765            arrow::datatypes::DataType::Null => SourceDataRowColumnarDecoder::EmptyRow,
1766            other => anyhow::bail!("expected Struct or Null Array, found {other:?}"),
1767        };
1768
1769        Ok(SourceDataColumnarDecoder {
1770            row_decoder,
1771            err_decoder: errs.clone(),
1772        })
1773    }
1774}
1775
1776impl ColumnDecoder<SourceData> for SourceDataColumnarDecoder {
1777    fn decode(&self, idx: usize, val: &mut SourceData) {
1778        let err_null = self.err_decoder.is_null(idx);
1779        let row_null = match &self.row_decoder {
1780            SourceDataRowColumnarDecoder::Row(decoder) => decoder.is_null(idx),
1781            SourceDataRowColumnarDecoder::EmptyRow => !err_null,
1782        };
1783
1784        match (row_null, err_null) {
1785            (true, false) => {
1786                let err = self.err_decoder.value(idx);
1787                let err = ProtoDataflowError::decode(err)
1788                    .expect("proto should be valid")
1789                    .into_rust()
1790                    .expect("error should be valid");
1791                val.0 = Err(err);
1792            }
1793            (false, true) => {
1794                let row = match val.0.as_mut() {
1795                    Ok(row) => row,
1796                    Err(_) => {
1797                        val.0 = Ok(Row::default());
1798                        val.0.as_mut().unwrap()
1799                    }
1800                };
1801                self.row_decoder.decode(idx, row);
1802            }
1803            (true, true) => panic!("should have one of 'ok' or 'err'"),
1804            (false, false) => panic!("cannot have both 'ok' and 'err'"),
1805        }
1806    }
1807
1808    fn is_null(&self, idx: usize) -> bool {
1809        let err_null = self.err_decoder.is_null(idx);
1810        let row_null = match &self.row_decoder {
1811            SourceDataRowColumnarDecoder::Row(decoder) => decoder.is_null(idx),
1812            SourceDataRowColumnarDecoder::EmptyRow => !err_null,
1813        };
1814        assert!(!err_null || !row_null, "SourceData should never be null!");
1815
1816        false
1817    }
1818
1819    fn goodbytes(&self) -> usize {
1820        self.row_decoder.goodbytes() + ArrayOrd::Binary(self.err_decoder.clone()).goodbytes()
1821    }
1822
1823    fn stats(&self) -> StructStats {
1824        let len = self.err_decoder.len();
1825        let err_stats = ColumnarStats {
1826            nulls: Some(ColumnNullStats {
1827                count: self.err_decoder.null_count(),
1828            }),
1829            values: PrimitiveStats::<Vec<u8>>::from_column(&self.err_decoder).into(),
1830        };
1831        // The top-level struct is non-nullable and every entry is either an
1832        // `Ok(Row)` or an `Err(DataflowError)`. As a result, the number of
1833        // nulls in the row column (the `Err` entries) is the total count
1834        // minus the number of nulls in the error column (the `Ok` entries).
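        // For example, with len == 5 and 2 `Err` entries, the error column has
        // 3 nulls (one per `Ok` row), so row_null_count == 5 - 3 == 2.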
1835        let row_null_count = len - self.err_decoder.null_count();
1836        let row_stats = match &self.row_decoder {
1837            SourceDataRowColumnarDecoder::Row(decoder) => {
1838                // Sanity check that the number of row nulls/nones we calculated
1839                // using the error column matches what the row column thinks it
1840                // has.
1841                assert_eq!(decoder.null_count(), row_null_count);
1842                decoder.stats()
1843            }
1844            SourceDataRowColumnarDecoder::EmptyRow => StructStats {
1845                len,
1846                cols: BTreeMap::default(),
1847            },
1848        };
1849        let row_stats = ColumnarStats {
1850            nulls: Some(ColumnNullStats {
1851                count: row_null_count,
1852            }),
1853            values: ColumnStatKinds::Struct(row_stats),
1854        };
1855
1856        let stats = [
1857            (
1858                SourceDataColumnarEncoder::OK_COLUMN_NAME.to_string(),
1859                row_stats,
1860            ),
1861            (
1862                SourceDataColumnarEncoder::ERR_COLUMN_NAME.to_string(),
1863                err_stats,
1864            ),
1865        ];
1866        StructStats {
1867            len,
1868            cols: stats.into_iter().collect(),
1869        }
1870    }
1871}
1872
1873/// An encoder for [`Row`]s within [`SourceData`].
1874///
1875/// This type exists as a wrapper around [`RowColumnarEncoder`] to support
1876/// encoding empty [`Row`]s. A [`RowColumnarEncoder`] finishes as a
1877/// [`StructArray`] which is required to have at least one column, and thus
1878/// cannot support empty [`Row`]s.
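///
/// Concretely, when the [`RelationDesc`] has no columns, [`RowColumnarEncoder::new`]
/// returns `None` and the `EmptyRow` variant is used instead; the "ok" column is
/// then finished as a `NullArray` (see [`SourceDataColumnarEncoder`]).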
1879#[derive(Debug)]
1880pub enum SourceDataRowColumnarEncoder {
1881    Row(RowColumnarEncoder),
1882    EmptyRow,
1883}
1884
1885impl SourceDataRowColumnarEncoder {
1886    pub(crate) fn goodbytes(&self) -> usize {
1887        match self {
1888            SourceDataRowColumnarEncoder::Row(e) => e.goodbytes(),
1889            SourceDataRowColumnarEncoder::EmptyRow => 0,
1890        }
1891    }
1892
1893    pub fn append(&mut self, row: &Row) {
1894        match self {
1895            SourceDataRowColumnarEncoder::Row(encoder) => encoder.append(row),
1896            SourceDataRowColumnarEncoder::EmptyRow => {
1897                assert_eq!(row.iter().count(), 0)
1898            }
1899        }
1900    }
1901
1902    pub fn append_null(&mut self) {
1903        match self {
1904            SourceDataRowColumnarEncoder::Row(encoder) => encoder.append_null(),
1905            SourceDataRowColumnarEncoder::EmptyRow => (),
1906        }
1907    }
1908}
1909
1910#[derive(Debug)]
1911pub struct SourceDataColumnarEncoder {
1912    row_encoder: SourceDataRowColumnarEncoder,
1913    err_encoder: BinaryBuilder,
1914}
1915
1916impl SourceDataColumnarEncoder {
1917    const OK_COLUMN_NAME: &'static str = "ok";
1918    const ERR_COLUMN_NAME: &'static str = "err";
1919
1920    pub fn new(desc: &RelationDesc) -> Self {
1921        let row_encoder = match RowColumnarEncoder::new(desc) {
1922            Some(encoder) => SourceDataRowColumnarEncoder::Row(encoder),
1923            None => {
1924                assert!(desc.typ().columns().is_empty());
1925                SourceDataRowColumnarEncoder::EmptyRow
1926            }
1927        };
1928        let err_encoder = BinaryBuilder::new();
1929
1930        SourceDataColumnarEncoder {
1931            row_encoder,
1932            err_encoder,
1933        }
1934    }
1935}
1936
1937impl ColumnEncoder<SourceData> for SourceDataColumnarEncoder {
1938    type FinishedColumn = StructArray;
1939
1940    fn goodbytes(&self) -> usize {
1941        self.row_encoder.goodbytes() + self.err_encoder.values_slice().len()
1942    }
1943
1944    #[inline]
1945    fn append(&mut self, val: &SourceData) {
1946        match val.0.as_ref() {
1947            Ok(row) => {
1948                self.row_encoder.append(row);
1949                self.err_encoder.append_null();
1950            }
1951            Err(err) => {
1952                self.row_encoder.append_null();
1953                self.err_encoder
1954                    .append_value(err.into_proto().encode_to_vec());
1955            }
1956        }
1957    }
1958
1959    #[inline]
1960    fn append_null(&mut self) {
1961        panic!("appending a null into SourceDataColumnarEncoder is not supported");
1962    }
1963
1964    fn finish(self) -> Self::FinishedColumn {
1965        let SourceDataColumnarEncoder {
1966            row_encoder,
1967            mut err_encoder,
1968        } = self;
1969
1970        let err_column = BinaryBuilder::finish(&mut err_encoder);
1971        let row_column: ArrayRef = match row_encoder {
1972            SourceDataRowColumnarEncoder::Row(encoder) => {
1973                let column = encoder.finish();
1974                Arc::new(column)
1975            }
1976            SourceDataRowColumnarEncoder::EmptyRow => Arc::new(NullArray::new(err_column.len())),
1977        };
1978
1979        assert_eq!(row_column.len(), err_column.len());
1980
1981        let fields = vec![
1982            Field::new(Self::OK_COLUMN_NAME, row_column.data_type().clone(), true),
1983            Field::new(Self::ERR_COLUMN_NAME, err_column.data_type().clone(), true),
1984        ];
1985        let arrays: Vec<Arc<dyn Array>> = vec![row_column, Arc::new(err_column)];
1986        StructArray::new(Fields::from(fields), arrays, None)
1987    }
1988}
1989
1990impl Schema<SourceData> for RelationDesc {
1991    type ArrowColumn = StructArray;
1992    type Statistics = StructStats;
1993
1994    type Decoder = SourceDataColumnarDecoder;
1995    type Encoder = SourceDataColumnarEncoder;
1996
1997    fn decoder(&self, col: Self::ArrowColumn) -> Result<Self::Decoder, anyhow::Error> {
1998        SourceDataColumnarDecoder::new(col, self)
1999    }
2000
2001    fn encoder(&self) -> Result<Self::Encoder, anyhow::Error> {
2002        Ok(SourceDataColumnarEncoder::new(self))
2003    }
2004}
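
// A minimal sketch of the columnar encode/decode path defined by the `Schema`
// impl above (hypothetical column name); the same `RelationDesc` must be used
// to build both the encoder and the decoder:
//
//     let desc = RelationDesc::from_names_and_types([("a", SqlScalarType::Bool.nullable(true))]);
//     let mut encoder = Schema::<SourceData>::encoder(&desc)?;
//     encoder.append(&SourceData(Ok(Row::pack([Datum::True]))));
//     let array = encoder.finish();
//     let decoder = Schema::<SourceData>::decoder(&desc, array)?;
//     let mut data = SourceData(Ok(Row::default()));
//     decoder.decode(0, &mut data);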
2005
2006#[cfg(test)]
2007mod tests {
2008    use arrow::array::{ArrayData, make_comparator};
2009    use base64::Engine;
2010    use bytes::Bytes;
2011    use mz_expr::EvalError;
2012    use mz_ore::assert_err;
2013    use mz_ore::metrics::MetricsRegistry;
2014    use mz_persist::indexed::columnar::arrow::{realloc_any, realloc_array};
2015    use mz_persist::metrics::ColumnarMetrics;
2016    use mz_persist_types::parquet::EncodingConfig;
2017    use mz_persist_types::schema::{Migration, backward_compatible};
2018    use mz_persist_types::stats::{PartStats, PartStatsMetrics};
2019    use mz_repr::{
2020        ColumnIndex, DatumVec, PropRelationDescDiff, ProtoRelationDesc, RelationDescBuilder,
2021        RowArena, SqlScalarType, arb_relation_desc_diff, arb_relation_desc_projection,
2022    };
2023    use proptest::prelude::*;
2024    use proptest::strategy::{Union, ValueTree};
2025
2026    use crate::stats::RelationPartStats;
2027
2028    use super::*;
2029
2030    #[mz_ore::test]
2031    fn test_timeline_parsing() {
2032        assert_eq!(Ok(Timeline::EpochMilliseconds), "M".parse());
2033        assert_eq!(Ok(Timeline::External("JOE".to_string())), "E.JOE".parse());
2034        assert_eq!(Ok(Timeline::User("MIKE".to_string())), "U.MIKE".parse());
2035
2036        assert_err!("Materialize".parse::<Timeline>());
2037        assert_err!("Ejoe".parse::<Timeline>());
2038        assert_err!("Umike".parse::<Timeline>());
2039        assert_err!("Dance".parse::<Timeline>());
2040        assert_err!("".parse::<Timeline>());
2041    }
2042
2043    #[track_caller]
2044    fn roundtrip_source_data(
2045        desc: &RelationDesc,
2046        datas: Vec<SourceData>,
2047        read_desc: &RelationDesc,
2048        config: &EncodingConfig,
2049    ) {
2050        let metrics = ColumnarMetrics::disconnected();
2051        let mut encoder = <RelationDesc as Schema<SourceData>>::encoder(desc).unwrap();
2052        for data in &datas {
2053            encoder.append(data);
2054        }
2055        let col = encoder.finish();
2056
2057        // The top-level StructArray for SourceData should always be non-nullable.
2058        assert!(!col.is_nullable());
2059
2060        // Reallocate our arrays with lgalloc.
2061        let col = realloc_array(&col, &metrics);
2062
2063        // Roundtrip through ProtoArray format.
2064        {
2065            let proto = col.to_data().into_proto();
2066            let bytes = proto.encode_to_vec();
2067            let proto = mz_persist_types::arrow::ProtoArrayData::decode(&bytes[..]).unwrap();
2068            let array_data: ArrayData = proto.into_rust().unwrap();
2069
2070            let col_rnd = StructArray::from(array_data.clone());
2071            assert_eq!(col, col_rnd);
2072
2073            let col_dyn = arrow::array::make_array(array_data);
2074            let col_dyn = col_dyn.as_any().downcast_ref::<StructArray>().unwrap();
2075            assert_eq!(&col, col_dyn);
2076        }
2077
2078        // Encode to Parquet.
2079        let mut buf = Vec::new();
2080        let fields = Fields::from(vec![Field::new("k", col.data_type().clone(), false)]);
2081        let arrays: Vec<Arc<dyn Array>> = vec![Arc::new(col.clone())];
2082        mz_persist_types::parquet::encode_arrays(&mut buf, fields, arrays, config).unwrap();
2083
2084        // Decode from Parquet.
2085        let buf = Bytes::from(buf);
2086        let mut reader = mz_persist_types::parquet::decode_arrays(buf).unwrap();
2087        let maybe_batch = reader.next();
2088
2089        // If we didn't encode any data then the reader won't yield a batch.
2090        let Some(record_batch) = maybe_batch else {
2091            assert!(datas.is_empty());
2092            return;
2093        };
2094        let record_batch = record_batch.unwrap();
2095
2096        assert_eq!(record_batch.columns().len(), 1);
2097        let rnd_col = &record_batch.columns()[0];
2098        let rnd_col = realloc_any(Arc::clone(rnd_col), &metrics);
2099        let rnd_col = rnd_col
2100            .as_any()
2101            .downcast_ref::<StructArray>()
2102            .unwrap()
2103            .clone();
2104
2105        // Try generating stats for the data, just to make sure we don't panic.
2106        let stats = <RelationDesc as Schema<SourceData>>::decoder_any(desc, &rnd_col)
2107            .expect("valid decoder")
2108            .stats();
2109
2110        // Read back all of our data and assert it roundtrips.
2111        let mut rnd_data = SourceData(Ok(Row::default()));
2112        let decoder = <RelationDesc as Schema<SourceData>>::decoder(desc, rnd_col.clone()).unwrap();
2113        for (idx, og_data) in datas.iter().enumerate() {
2114            decoder.decode(idx, &mut rnd_data);
2115            assert_eq!(og_data, &rnd_data);
2116        }
2117
2118        // Read back all of our data a second time with a projection applied, and make sure the
2119        // stats are valid.
2120        let stats_metrics = PartStatsMetrics::new(&MetricsRegistry::new());
2121        let stats = RelationPartStats {
2122            name: "test",
2123            metrics: &stats_metrics,
2124            stats: &PartStats { key: stats },
2125            desc: read_desc,
2126        };
2127        let mut datum_vec = DatumVec::new();
2128        let arena = RowArena::default();
2129        let decoder = <RelationDesc as Schema<SourceData>>::decoder(read_desc, rnd_col).unwrap();
2130
2131        for (idx, og_data) in datas.iter().enumerate() {
2132            decoder.decode(idx, &mut rnd_data);
2133            match (&og_data.0, &rnd_data.0) {
2134                (Ok(og_row), Ok(rnd_row)) => {
2135                    // Filter down to just the Datums in the projection schema.
2136                    {
2137                        let datums = datum_vec.borrow_with(og_row);
2138                        let projected_datums =
2139                            datums.iter().enumerate().filter_map(|(idx, datum)| {
2140                                read_desc
2141                                    .contains_index(&ColumnIndex::from_raw(idx))
2142                                    .then_some(datum)
2143                            });
2144                        let og_projected_row = Row::pack(projected_datums);
2145                        assert_eq!(&og_projected_row, rnd_row);
2146                    }
2147
2148                    // Validate the stats for all of our projected columns.
2149                    {
2150                        let proj_datums = datum_vec.borrow_with(rnd_row);
2151                        for (pos, (idx, _, _)) in read_desc.iter_all().enumerate() {
2152                            let spec = stats.col_stats(idx, &arena);
2153                            assert!(spec.may_contain(proj_datums[pos]));
2154                        }
2155                    }
2156                }
2157                (Err(_), Err(_)) => assert_eq!(og_data, &rnd_data),
2158                (_, _) => panic!("decoded to a different type? {og_data:?} {rnd_data:?}"),
2159            }
2160        }
2161
2162        // Verify that the RelationDesc itself roundtrips through
2163        // {encode,decode}_schema.
2164        let encoded_schema = SourceData::encode_schema(desc);
2165        let roundtrip_desc = SourceData::decode_schema(&encoded_schema);
2166        assert_eq!(desc, &roundtrip_desc);
2167
2168        // Verify that the RelationDesc is backward compatible with itself (this
2169        // mostly checks for `unimplemented!` type panics).
2170        let migration =
2171            mz_persist_types::schema::backward_compatible(col.data_type(), col.data_type());
2172        let migration = migration.expect("should be backward compatible with self");
2173        // Also verify that the Fn doesn't do anything wonky.
2174        let migrated = migration.migrate(Arc::new(col.clone()));
2175        assert_eq!(col.data_type(), migrated.data_type());
2176    }
2177
2178    #[mz_ore::test]
2179    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decContextDefault` on OS `linux`
2180    fn all_source_data_roundtrips() {
2181        let mut weights = vec![(500, Just(0..8)), (50, Just(8..32))];
2182        if std::env::var("PROPTEST_LARGE_DATA").is_ok() {
2183            weights.extend([
2184                (10, Just(32..128)),
2185                (5, Just(128..512)),
2186                (3, Just(512..2048)),
2187                (1, Just(2048..8192)),
2188            ]);
2189        }
2190        let num_rows = Union::new_weighted(weights);
2191
2192        // TODO(parkmycar): There are so many clones going on here, and maybe we can avoid them?
2193        let strat = (any::<RelationDesc>(), num_rows)
2194            .prop_flat_map(|(desc, num_rows)| {
2195                arb_relation_desc_projection(desc.clone())
2196                    .prop_map(move |read_desc| (desc.clone(), read_desc, num_rows.clone()))
2197            })
2198            .prop_flat_map(|(desc, read_desc, num_rows)| {
2199                proptest::collection::vec(arb_source_data_for_relation_desc(&desc), num_rows)
2200                    .prop_map(move |datas| (desc.clone(), datas, read_desc.clone()))
2201            });
2202
2203        proptest!(|((config, (desc, source_datas, read_desc)) in (any::<EncodingConfig>(), strat))| {
2204            roundtrip_source_data(&desc, source_datas, &read_desc, &config);
2205        });
2206    }
2207
2208    #[mz_ore::test]
2209    fn roundtrip_error_nulls() {
2210        let desc = RelationDescBuilder::default()
2211            .with_column(
2212                "ts",
2213                SqlScalarType::TimestampTz { precision: None }.nullable(false),
2214            )
2215            .finish();
2216        let source_datas = vec![SourceData(Err(DataflowError::EvalError(
2217            EvalError::DateOutOfRange.into(),
2218        )))];
2219        let config = EncodingConfig::default();
2220        roundtrip_source_data(&desc, source_datas, &desc, &config);
2221    }
2222
2223    fn is_sorted(array: &dyn Array) -> bool {
2224        let sort_options = arrow::compute::SortOptions::default();
2225        let Ok(cmp) = make_comparator(array, array, sort_options) else {
2226            // TODO: arrow v51.0.0 doesn't support comparing structs. Once we
2227            // migrate to v52+, where the deprecated `build_compare` is replaced
2228            // by `make_comparator` (which does support structs), this will work
2229            // and we should switch this early return to an expect, if possible.
2230            // Until then, conservatively treat the array as unsorted.
2231            return false;
2232        };
2233        (0..array.len())
2234            .tuple_windows()
2235            .all(|(i, j)| cmp(i, j).is_le())
2236    }
2237
2238    fn get_data_type(schema: &impl Schema<SourceData>) -> arrow::datatypes::DataType {
2239        use mz_persist_types::columnar::ColumnEncoder;
2240        let array = Schema::encoder(schema).expect("valid schema").finish();
2241        Array::data_type(&array).clone()
2242    }
2243
2244    #[track_caller]
2245    fn backward_compatible_testcase(
2246        old: &RelationDesc,
2247        new: &RelationDesc,
2248        migration: Migration,
2249        datas: &[SourceData],
2250    ) {
2251        let mut encoder = Schema::<SourceData>::encoder(old).expect("valid schema");
2252        for data in datas {
2253            encoder.append(data);
2254        }
2255        let old = encoder.finish();
2256        let new = Schema::<SourceData>::encoder(new)
2257            .expect("valid schema")
2258            .finish();
2259        let old: Arc<dyn Array> = Arc::new(old);
2260        let new: Arc<dyn Array> = Arc::new(new);
2261        let migrated = migration.migrate(Arc::clone(&old));
2262        assert_eq!(migrated.data_type(), new.data_type());
2263
2264        // Check the sortedness preservation, if we can.
2265        if migration.preserves_order() && is_sorted(&old) {
2266            assert!(is_sorted(&migrated))
2267        }
2268    }
2269
2270    #[mz_ore::test]
2271    fn backward_compatible_empty_add_column() {
2272        let old = RelationDesc::empty();
2273        let new = RelationDesc::from_names_and_types([("a", SqlScalarType::Bool.nullable(true))]);
2274
2275        let old_data_type = get_data_type(&old);
2276        let new_data_type = get_data_type(&new);
2277
2278        let migration = backward_compatible(&old_data_type, &new_data_type);
2279        assert!(migration.is_some());
2280    }
2281
2282    #[mz_ore::test]
2283    fn backward_compatible_project_away_all() {
2284        let old = RelationDesc::from_names_and_types([("a", SqlScalarType::Bool.nullable(true))]);
2285        let new = RelationDesc::empty();
2286
2287        let old_data_type = get_data_type(&old);
2288        let new_data_type = get_data_type(&new);
2289
2290        let migration = backward_compatible(&old_data_type, &new_data_type);
2291        assert!(migration.is_some());
2292    }
2293
2294    #[mz_ore::test]
2295    #[cfg_attr(miri, ignore)]
2296    fn backward_compatible_migrate() {
2297        let strat = (any::<RelationDesc>(), any::<RelationDesc>()).prop_flat_map(|(old, new)| {
2298            proptest::collection::vec(arb_source_data_for_relation_desc(&old), 2)
2299                .prop_map(move |datas| (old.clone(), new.clone(), datas))
2300        });
2301
2302        proptest!(|((old, new, datas) in strat)| {
2303            let old_data_type = get_data_type(&old);
2304            let new_data_type = get_data_type(&new);
2305
2306            if let Some(migration) = backward_compatible(&old_data_type, &new_data_type) {
2307                backward_compatible_testcase(&old, &new, migration, &datas);
2308            };
2309        });
2310    }
2311
2312    #[mz_ore::test]
2313    #[cfg_attr(miri, ignore)]
2314    fn backward_compatible_migrate_from_common() {
2315        use mz_repr::SqlColumnType;
2316        fn test_case(old: RelationDesc, diffs: Vec<PropRelationDescDiff>, datas: Vec<SourceData>) {
2317            // TODO(parkmycar): As we iterate on schema migrations more things should become compatible.
2318            let should_be_compatible = diffs.iter().all(|diff| match diff {
2319                // We only support adding nullable columns.
2320                PropRelationDescDiff::AddColumn {
2321                    typ: SqlColumnType { nullable, .. },
2322                    ..
2323                } => *nullable,
2324                PropRelationDescDiff::DropColumn { .. } => true,
2325                _ => false,
2326            });
2327
2328            let mut new = old.clone();
2329            for diff in diffs.into_iter() {
2330                diff.apply(&mut new)
2331            }
2332
2333            let old_data_type = get_data_type(&old);
2334            let new_data_type = get_data_type(&new);
2335
2336            if let Some(migration) = backward_compatible(&old_data_type, &new_data_type) {
2337                backward_compatible_testcase(&old, &new, migration, &datas);
2338            } else if should_be_compatible {
2339                panic!("new DataType was not compatible when it should have been!");
2340            }
2341        }
2342
2343        let strat = any::<RelationDesc>()
2344            .prop_flat_map(|desc| {
2345                proptest::collection::vec(arb_source_data_for_relation_desc(&desc), 2)
2346                    .no_shrink()
2347                    .prop_map(move |datas| (desc.clone(), datas))
2348            })
2349            .prop_flat_map(|(desc, datas)| {
2350                arb_relation_desc_diff(&desc)
2351                    .prop_map(move |diffs| (desc.clone(), diffs, datas.clone()))
2352            });
2353
2354        proptest!(|((old, diffs, datas) in strat)| {
2355            test_case(old, diffs, datas);
2356        });
2357    }
2358
2359    #[mz_ore::test]
2360    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decContextDefault` on OS `linux`
2361    fn empty_relation_desc_roundtrips() {
2362        let empty = RelationDesc::empty();
2363        let rows = proptest::collection::vec(arb_source_data_for_relation_desc(&empty), 0..8)
2364            .prop_map(move |datas| (empty.clone(), datas));
2365
2366        // Note: This case should be covered by the `all_source_data_roundtrips` test above, but
2367        // it's a special case that we explicitly want to exercise.
2368        proptest!(|((config, (desc, source_datas)) in (any::<EncodingConfig>(), rows))| {
2369            roundtrip_source_data(&desc, source_datas, &desc, &config);
2370        });
2371    }
2372
2373    #[mz_ore::test]
2374    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decContextDefault` on OS `linux`
2375    fn arrow_datatype_consistent() {
2376        fn test_case(desc: RelationDesc, datas: Vec<SourceData>) {
2377            let half = datas.len() / 2;
2378
2379            let mut encoder_a = <RelationDesc as Schema<SourceData>>::encoder(&desc).unwrap();
2380            for data in &datas[..half] {
2381                encoder_a.append(data);
2382            }
2383            let col_a = encoder_a.finish();
2384
2385            let mut encoder_b = <RelationDesc as Schema<SourceData>>::encoder(&desc).unwrap();
2386            for data in &datas[half..] {
2387                encoder_b.append(data);
2388            }
2389            let col_b = encoder_b.finish();
2390
2391            // The DataType of the resulting column should not change based on what data was
2392            // encoded.
2393            assert_eq!(col_a.data_type(), col_b.data_type());
2394        }
2395
2396        let num_rows = 12;
2397        let strat = any::<RelationDesc>().prop_flat_map(|desc| {
2398            proptest::collection::vec(arb_source_data_for_relation_desc(&desc), num_rows)
2399                .prop_map(move |datas| (desc.clone(), datas))
2400        });
2401
2402        proptest!(|((desc, data) in strat)| {
2403            test_case(desc, data);
2404        });
2405    }
2406
2407    #[mz_ore::test]
2408    #[cfg_attr(miri, ignore)] // too slow
2409    fn source_proto_serialization_stability() {
2410        let min_protos = 10;
2411        let encoded = include_str!("snapshots/source-datas.txt");
2412
2413        // Decode the pre-generated source datas
2414        let mut decoded: Vec<(RelationDesc, SourceData)> = encoded
2415            .lines()
2416            .map(|s| {
2417                let (desc, data) = s.split_once(',').expect("comma separated data");
2418                let desc = base64::engine::general_purpose::STANDARD
2419                    .decode(desc)
2420                    .expect("valid base64");
2421                let data = base64::engine::general_purpose::STANDARD
2422                    .decode(data)
2423                    .expect("valid base64");
2424                (desc, data)
2425            })
2426            .map(|(desc, data)| {
2427                let desc = ProtoRelationDesc::decode(&desc[..]).expect("valid proto");
2428                let desc = desc.into_rust().expect("valid proto");
2429                let data = SourceData::decode(&data, &desc).expect("valid proto");
2430                (desc, data)
2431            })
2432            .collect();
2433
2434        // If there are fewer than the minimum examples, generate some new ones arbitrarily
2435        let mut runner = proptest::test_runner::TestRunner::deterministic();
2436        let strategy = RelationDesc::arbitrary().prop_flat_map(|desc| {
2437            arb_source_data_for_relation_desc(&desc).prop_map(move |data| (desc.clone(), data))
2438        });
2439        while decoded.len() < min_protos {
2440            let arbitrary_data = strategy
2441                .new_tree(&mut runner)
2442                .expect("source data")
2443                .current();
2444            decoded.push(arbitrary_data);
2445        }
2446
2447        // Reencode and compare the strings
2448        let mut reencoded = String::new();
2449        let mut buf = vec![];
2450        for (desc, data) in decoded {
2451            buf.clear();
2452            desc.into_proto().encode(&mut buf).expect("success");
2453            base64::engine::general_purpose::STANDARD.encode_string(buf.as_slice(), &mut reencoded);
2454            reencoded.push(',');
2455
2456            buf.clear();
2457            data.encode(&mut buf);
2458            base64::engine::general_purpose::STANDARD.encode_string(buf.as_slice(), &mut reencoded);
2459            reencoded.push('\n');
2460        }
2461
2462        // Optimizations in Persist, particularly consolidation on read,
2463        // depend on a stable serialization for the serialized data.
2464        // For example, reordering proto fields could cause us
2465        // to generate a different (equivalent) serialization for a record,
2466        // and the two versions would not consolidate out.
2467        // This can impact correctness!
2468        //
2469        // If you need to change how SourceDatas are encoded, that's still fine...
2470        // but we'll also need to increase
2471        // the MINIMUM_CONSOLIDATED_VERSION as part of the same release.
2472        assert_eq!(
2473            encoded,
2474            reencoded.as_str(),
2475            "SourceData serde should be stable"
2476        )
2477    }
2478}