mz_storage_types/sources.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Types and traits related to the introduction of changing collections into `dataflow`.

use std::collections::BTreeMap;
use std::fmt::Debug;
use std::hash::Hash;
use std::ops::{Add, AddAssign, Deref, DerefMut};
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;

use arrow::array::{Array, ArrayRef, BinaryArray, BinaryBuilder, NullArray, StructArray};
use arrow::datatypes::{Field, Fields};
use bytes::{BufMut, Bytes};
use columnation::Columnation;
use itertools::EitherOrBoth::Both;
use itertools::Itertools;
use kafka::KafkaSourceExportDetails;
use load_generator::{LoadGeneratorOutput, LoadGeneratorSourceExportDetails};
use mz_ore::assert_none;
use mz_persist_types::Codec;
use mz_persist_types::arrow::ArrayOrd;
use mz_persist_types::columnar::{ColumnDecoder, ColumnEncoder, Schema};
use mz_persist_types::stats::{
    ColumnNullStats, ColumnStatKinds, ColumnarStats, ColumnarStatsBuilder, PrimitiveStats,
    StructStats,
};
use mz_proto::{IntoRustIfSome, ProtoMapEntry, ProtoType, RustType, TryFromProtoError};
use mz_repr::{
    CatalogItemId, Datum, GlobalId, ProtoRelationDesc, ProtoRow, RelationDesc, Row,
    RowColumnarDecoder, RowColumnarEncoder, arb_row_for_relation,
};
use mz_sql_parser::ast::{Ident, IdentError, UnresolvedItemName};
use proptest::prelude::any;
use proptest::strategy::Strategy;
use proptest_derive::Arbitrary;
use prost::Message;
use serde::{Deserialize, Serialize};
use timely::order::{PartialOrder, TotalOrder};
use timely::progress::timestamp::Refines;
use timely::progress::{PathSummary, Timestamp};

use crate::AlterCompatible;
use crate::connections::inline::{
    ConnectionAccess, ConnectionResolver, InlinedConnection, IntoInlineConnection,
    ReferencedConnection,
};
use crate::controller::{AlterError, CollectionMetadata};
use crate::errors::{DataflowError, ProtoDataflowError};
use crate::instances::StorageInstanceId;
use crate::sources::proto_ingestion_description::{ProtoSourceExport, ProtoSourceImport};
use crate::sources::sql_server::SqlServerSourceExportDetails;

pub mod encoding;
pub mod envelope;
pub mod kafka;
pub mod load_generator;
pub mod mysql;
pub mod postgres;
pub mod sql_server;

pub use crate::sources::envelope::SourceEnvelope;
pub use crate::sources::kafka::KafkaSourceConnection;
pub use crate::sources::load_generator::LoadGeneratorSourceConnection;
pub use crate::sources::mysql::{MySqlSourceConnection, MySqlSourceExportDetails};
pub use crate::sources::postgres::{PostgresSourceConnection, PostgresSourceExportDetails};
pub use crate::sources::sql_server::{SqlServerSource, SqlServerSourceExtras};

include!(concat!(env!("OUT_DIR"), "/mz_storage_types.sources.rs"));

/// A description of a source ingestion
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Arbitrary)]
pub struct IngestionDescription<S: 'static = (), C: ConnectionAccess = InlinedConnection> {
    /// The source description.
    pub desc: SourceDesc<C>,
    /// Additional storage controller metadata needed to ingest this source
    pub ingestion_metadata: S,
    /// Collections to be exported by this ingestion.
    ///
    /// # Notes
    /// - For multi-output sources:
    ///     - Add exports by adding a new [`SourceExport`].
    ///     - Remove exports by removing the [`SourceExport`].
    ///
    ///   Re-rendering/executing the source after making these modifications
    ///   adds and drops the subsource, respectively.
    /// - This field includes the primary source's ID, which might need to be
    ///   filtered out to understand which exports are explicit ingestion exports.
    /// - This field does _not_ include the remap collection, which is tracked
    ///   in its own field.
    #[proptest(
        strategy = "proptest::collection::btree_map(any::<GlobalId>(), any::<SourceExport<S>>(), 0..4)"
    )]
    pub source_exports: BTreeMap<GlobalId, SourceExport<S>>,
    /// The ID of the instance in which to install the source.
    pub instance_id: StorageInstanceId,
    /// The ID of this ingestion's remap/progress collection.
    pub remap_collection_id: GlobalId,
}

impl IngestionDescription {
    pub fn new(
        desc: SourceDesc,
        instance_id: StorageInstanceId,
        remap_collection_id: GlobalId,
    ) -> Self {
        Self {
            desc,
            ingestion_metadata: (),
            source_exports: BTreeMap::new(),
            instance_id,
            remap_collection_id,
        }
    }
}

impl<S> IngestionDescription<S> {
    /// Return an iterator over the `GlobalId`s of `self`'s collections.
    /// This will contain ids for the remap collection, subsources,
    /// tables for this source, and the primary collection ID, even if
    /// no data will be exported to the primary collection.
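    ///
    /// A usage sketch (hypothetical `desc`, `instance_id`, and `remap_id` in
    /// scope; with no explicit exports, only the remap collection id remains):
    ///
    /// ```ignore
    /// let ingestion = IngestionDescription::new(desc, instance_id, remap_id);
    /// assert_eq!(ingestion.collection_ids().collect::<Vec<_>>(), vec![remap_id]);
    /// ```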
    pub fn collection_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
        // Expand self so that any new fields added generate a compiler error to
        // increase the likelihood of developers seeing this function.
        let IngestionDescription {
            desc: _,
            ingestion_metadata: _,
            source_exports,
            instance_id: _,
            remap_collection_id,
        } = &self;

        source_exports
            .keys()
            .copied()
            .chain(std::iter::once(*remap_collection_id))
    }
}

impl<S: Debug + Eq + PartialEq + AlterCompatible> AlterCompatible for IngestionDescription<S> {
    fn alter_compatible(
        &self,
        id: GlobalId,
        other: &IngestionDescription<S>,
    ) -> Result<(), AlterError> {
        if self == other {
            return Ok(());
        }
        let IngestionDescription {
            desc,
            ingestion_metadata,
            source_exports,
            instance_id,
            remap_collection_id,
        } = self;

        let compatibility_checks = [
            (desc.alter_compatible(id, &other.desc).is_ok(), "desc"),
            (
                ingestion_metadata == &other.ingestion_metadata,
                "ingestion_metadata",
            ),
            (
                source_exports
                    .iter()
                    .merge_join_by(&other.source_exports, |(l_key, _), (r_key, _)| {
                        l_key.cmp(r_key)
                    })
                    .all(|r| match r {
                        Both(
                            (
                                _,
                                SourceExport {
                                    storage_metadata: l_metadata,
                                    details: l_details,
                                    data_config: l_data_config,
                                },
                            ),
                            (
                                _,
                                SourceExport {
                                    storage_metadata: r_metadata,
                                    details: r_details,
                                    data_config: r_data_config,
                                },
                            ),
                        ) => {
                            l_metadata.alter_compatible(id, r_metadata).is_ok()
                                && l_details.alter_compatible(id, r_details).is_ok()
                                && l_data_config.alter_compatible(id, r_data_config).is_ok()
                        }
                        _ => true,
                    }),
                "source_exports",
            ),
            (instance_id == &other.instance_id, "instance_id"),
            (
                remap_collection_id == &other.remap_collection_id,
                "remap_collection_id",
            ),
        ];
        for (compatible, field) in compatibility_checks {
            if !compatible {
                tracing::warn!(
                    "IngestionDescription incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
                    self,
                    other
                );

                return Err(AlterError { id });
            }
        }

        Ok(())
    }
}

impl<R: ConnectionResolver> IntoInlineConnection<IngestionDescription, R>
    for IngestionDescription<(), ReferencedConnection>
{
    fn into_inline_connection(self, r: R) -> IngestionDescription {
        let IngestionDescription {
            desc,
            ingestion_metadata,
            source_exports,
            instance_id,
            remap_collection_id,
        } = self;

        IngestionDescription {
            desc: desc.into_inline_connection(r),
            ingestion_metadata,
            source_exports,
            instance_id,
            remap_collection_id,
        }
    }
}

#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Arbitrary)]
pub struct SourceExport<S = (), C: ConnectionAccess = InlinedConnection> {
    /// The collection metadata needed to write the exported data
    pub storage_metadata: S,
    /// Details necessary for the source to export data to this export's collection.
    pub details: SourceExportDetails,
    /// Config necessary to handle (e.g. decode and envelope) the data for this export.
    pub data_config: SourceExportDataConfig<C>,
}

impl RustType<ProtoIngestionDescription> for IngestionDescription<CollectionMetadata> {
    fn into_proto(&self) -> ProtoIngestionDescription {
        ProtoIngestionDescription {
            source_exports: self.source_exports.into_proto(),
            ingestion_metadata: Some(self.ingestion_metadata.into_proto()),
            desc: Some(self.desc.into_proto()),
            instance_id: Some(self.instance_id.into_proto()),
            remap_collection_id: Some(self.remap_collection_id.into_proto()),
        }
    }

    fn from_proto(proto: ProtoIngestionDescription) -> Result<Self, TryFromProtoError> {
        Ok(IngestionDescription {
            source_exports: proto.source_exports.into_rust()?,
            desc: proto
                .desc
                .into_rust_if_some("ProtoIngestionDescription::desc")?,
            ingestion_metadata: proto
                .ingestion_metadata
                .into_rust_if_some("ProtoIngestionDescription::ingestion_metadata")?,
            instance_id: proto
                .instance_id
                .into_rust_if_some("ProtoIngestionDescription::instance_id")?,
            remap_collection_id: proto
                .remap_collection_id
                .into_rust_if_some("ProtoIngestionDescription::remap_collection_id")?,
        })
    }
}

impl ProtoMapEntry<GlobalId, CollectionMetadata> for ProtoSourceImport {
    fn from_rust<'a>(entry: (&'a GlobalId, &'a CollectionMetadata)) -> Self {
        ProtoSourceImport {
            id: Some(entry.0.into_proto()),
            storage_metadata: Some(entry.1.into_proto()),
        }
    }

    fn into_rust(self) -> Result<(GlobalId, CollectionMetadata), TryFromProtoError> {
        Ok((
            self.id.into_rust_if_some("ProtoSourceImport::id")?,
            self.storage_metadata
                .into_rust_if_some("ProtoSourceImport::storage_metadata")?,
        ))
    }
}

impl ProtoMapEntry<GlobalId, SourceExport<CollectionMetadata>> for ProtoSourceExport {
    fn from_rust<'a>(
        (id, source_export): (&'a GlobalId, &'a SourceExport<CollectionMetadata>),
    ) -> Self {
        ProtoSourceExport {
            id: Some(id.into_proto()),
            storage_metadata: Some(source_export.storage_metadata.into_proto()),
            details: Some(source_export.details.into_proto()),
            data_config: Some(source_export.data_config.into_proto()),
        }
    }

    fn into_rust(self) -> Result<(GlobalId, SourceExport<CollectionMetadata>), TryFromProtoError> {
        Ok((
            self.id.into_rust_if_some("ProtoSourceExport::id")?,
            SourceExport {
                storage_metadata: self
                    .storage_metadata
                    .into_rust_if_some("ProtoSourceExport::storage_metadata")?,
                details: self
                    .details
                    .into_rust_if_some("ProtoSourceExport::details")?,
                data_config: self
                    .data_config
                    .into_rust_if_some("ProtoSourceExport::data_config")?,
            },
        ))
    }
}

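/// A timestamp type for use by sources, which can be encoded to and decoded
/// from a [`Row`].
///
/// A minimal round-trip sketch (assuming only the `MzOffset` impl below):
///
/// ```ignore
/// let ts = MzOffset::from(42);
/// assert_eq!(MzOffset::decode_row(&ts.encode_row()), ts);
/// ```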
pub trait SourceTimestamp:
    Timestamp + Columnation + Refines<()> + std::fmt::Display + Sync
{
    fn encode_row(&self) -> Row;
    fn decode_row(row: &Row) -> Self;
}

impl SourceTimestamp for MzOffset {
    fn encode_row(&self) -> Row {
        Row::pack([Datum::UInt64(self.offset)])
    }

    fn decode_row(row: &Row) -> Self {
        let mut datums = row.iter();
        match (datums.next(), datums.next()) {
            (Some(Datum::UInt64(offset)), None) => MzOffset::from(offset),
            _ => panic!("invalid row {row:?}"),
        }
    }
}

/// Universal language for describing message positions in Materialize, in a source-independent
/// way. Individual sources like Kafka or file sources should explicitly implement their own offset
/// type that converts to/from `MzOffset`s. An `MzOffset` of 0 denotes an empty stream.
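///
/// A minimal sketch of how offsets compose (assuming only the `Add` and `From`
/// impls below):
///
/// ```ignore
/// let offset = MzOffset::from(3) + MzOffset::from(4);
/// assert_eq!(offset, MzOffset::from(7));
/// assert_eq!(offset + 1, MzOffset::from(8));
/// ```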
#[derive(
    Copy,
    Clone,
    Default,
    Debug,
    PartialEq,
    PartialOrd,
    Eq,
    Ord,
    Hash,
    Serialize,
    Deserialize,
    Arbitrary,
)]
pub struct MzOffset {
    pub offset: u64,
}

impl differential_dataflow::difference::Semigroup for MzOffset {
    fn plus_equals(&mut self, rhs: &Self) {
        self.offset.plus_equals(&rhs.offset)
    }
}

impl differential_dataflow::difference::IsZero for MzOffset {
    fn is_zero(&self) -> bool {
        self.offset.is_zero()
    }
}

impl mz_persist_types::Codec64 for MzOffset {
    fn codec_name() -> String {
        "MzOffset".to_string()
    }

    fn encode(&self) -> [u8; 8] {
        mz_persist_types::Codec64::encode(&self.offset)
    }

    fn decode(buf: [u8; 8]) -> Self {
        Self {
            offset: mz_persist_types::Codec64::decode(buf),
        }
    }
}

impl columnation::Columnation for MzOffset {
    type InnerRegion = columnation::CopyRegion<MzOffset>;
}

impl RustType<ProtoMzOffset> for MzOffset {
    fn into_proto(&self) -> ProtoMzOffset {
        ProtoMzOffset {
            offset: self.offset,
        }
    }

    fn from_proto(proto: ProtoMzOffset) -> Result<Self, TryFromProtoError> {
        Ok(Self {
            offset: proto.offset,
        })
    }
}

impl MzOffset {
    pub fn checked_sub(self, other: Self) -> Option<Self> {
        self.offset
            .checked_sub(other.offset)
            .map(|offset| Self { offset })
    }
}

/// Convert from a raw `u64` offset to an `MzOffset`.
impl From<u64> for MzOffset {
    fn from(offset: u64) -> Self {
        Self { offset }
    }
}

impl std::fmt::Display for MzOffset {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.offset)
    }
}

// Assume overflow does not occur for addition
impl Add<u64> for MzOffset {
    type Output = MzOffset;

    fn add(self, x: u64) -> MzOffset {
        MzOffset {
            offset: self.offset + x,
        }
    }
}
impl Add<Self> for MzOffset {
    type Output = Self;

    fn add(self, x: Self) -> Self {
        MzOffset {
            offset: self.offset + x.offset,
        }
    }
}
impl AddAssign<u64> for MzOffset {
    fn add_assign(&mut self, x: u64) {
        self.offset += x;
    }
}
impl AddAssign<Self> for MzOffset {
    fn add_assign(&mut self, x: Self) {
        self.offset += x.offset;
    }
}

/// Convert from `PgLsn` to MzOffset
impl From<tokio_postgres::types::PgLsn> for MzOffset {
    fn from(lsn: tokio_postgres::types::PgLsn) -> Self {
        MzOffset { offset: lsn.into() }
    }
}

impl Timestamp for MzOffset {
    type Summary = MzOffset;

    fn minimum() -> Self {
        MzOffset {
            offset: Timestamp::minimum(),
        }
    }
}

impl PathSummary<MzOffset> for MzOffset {
    fn results_in(&self, src: &MzOffset) -> Option<MzOffset> {
        Some(MzOffset {
            offset: self.offset.results_in(&src.offset)?,
        })
    }

    fn followed_by(&self, other: &Self) -> Option<Self> {
        Some(MzOffset {
            offset: PathSummary::<u64>::followed_by(&self.offset, &other.offset)?,
        })
    }
}

impl Refines<()> for MzOffset {
    fn to_inner(_: ()) -> Self {
        MzOffset::minimum()
    }
    fn to_outer(self) {}
    fn summarize(_: Self::Summary) {}
}

impl PartialOrder for MzOffset {
    #[inline]
    fn less_equal(&self, other: &Self) -> bool {
        self.offset.less_equal(&other.offset)
    }
}

impl TotalOrder for MzOffset {}

/// The meaning of the timestamp number produced by data sources. This type
/// is not concerned with the source of the timestamp (e.g. whether the data
/// came from a Debezium consistency topic or a CDCv2 stream), but only with
/// what the timestamp number means.
///
/// Some variants here have attached data used to differentiate incomparable
/// instantiations. These attached data types should be expanded in the future
/// if we need to tell apart more kinds of sources.
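///
/// The compact string form produced and parsed by the [`ToString`] and
/// [`FromStr`] impls below encodes the variant as a single character (`M`,
/// `E`, or `U`), optionally followed by `.` and the attached identifier. A
/// round-trip sketch:
///
/// ```ignore
/// let timeline = Timeline::User("accounting".to_string());
/// assert_eq!(timeline.to_string(), "U.accounting");
/// assert_eq!("U.accounting".parse::<Timeline>(), Ok(timeline));
/// ```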
#[derive(Arbitrary, Clone, Debug, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize, Hash)]
pub enum Timeline {
    /// EpochMilliseconds means the timestamp is the number of milliseconds since
    /// the Unix epoch.
    EpochMilliseconds,
    /// External means the timestamp comes from an external data source and we
    /// don't know what the number means. The attached String is the source's name,
    /// which will result in different sources being incomparable.
    External(String),
    /// User means the user has manually specified a timeline. The attached
    /// String is specified by the user, allowing them to decide sources that are
    /// joinable.
    User(String),
}

impl Timeline {
    const EPOCH_MILLISECOND_ID_CHAR: char = 'M';
    const EXTERNAL_ID_CHAR: char = 'E';
    const USER_ID_CHAR: char = 'U';

    fn id_char(&self) -> char {
        match self {
            Self::EpochMilliseconds => Self::EPOCH_MILLISECOND_ID_CHAR,
            Self::External(_) => Self::EXTERNAL_ID_CHAR,
            Self::User(_) => Self::USER_ID_CHAR,
        }
    }
}

impl RustType<ProtoTimeline> for Timeline {
    fn into_proto(&self) -> ProtoTimeline {
        use proto_timeline::Kind;
        ProtoTimeline {
            kind: Some(match self {
                Timeline::EpochMilliseconds => Kind::EpochMilliseconds(()),
                Timeline::External(s) => Kind::External(s.clone()),
                Timeline::User(s) => Kind::User(s.clone()),
            }),
        }
    }

    fn from_proto(proto: ProtoTimeline) -> Result<Self, TryFromProtoError> {
        use proto_timeline::Kind;
        let kind = proto
            .kind
            .ok_or_else(|| TryFromProtoError::missing_field("ProtoTimeline::kind"))?;
        Ok(match kind {
            Kind::EpochMilliseconds(()) => Timeline::EpochMilliseconds,
            Kind::External(s) => Timeline::External(s),
            Kind::User(s) => Timeline::User(s),
        })
    }
}

impl ToString for Timeline {
    fn to_string(&self) -> String {
        match self {
            Self::EpochMilliseconds => format!("{}", self.id_char()),
            Self::External(id) => format!("{}.{id}", self.id_char()),
            Self::User(id) => format!("{}.{id}", self.id_char()),
        }
    }
}

impl FromStr for Timeline {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        if s.is_empty() {
            return Err("empty timeline".to_string());
        }
        let mut chars = s.chars();
        match chars.next().expect("non-empty string") {
            Self::EPOCH_MILLISECOND_ID_CHAR => match chars.next() {
                None => Ok(Self::EpochMilliseconds),
                Some(_) => Err(format!("unknown timeline: {s}")),
            },
            Self::EXTERNAL_ID_CHAR => match chars.next() {
                Some('.') => Ok(Self::External(chars.as_str().to_string())),
                _ => Err(format!("unknown timeline: {s}")),
            },
            Self::USER_ID_CHAR => match chars.next() {
                Some('.') => Ok(Self::User(chars.as_str().to_string())),
                _ => Err(format!("unknown timeline: {s}")),
            },
            _ => Err(format!("unknown timeline: {s}")),
        }
    }
}

/// A connection to an external system.
pub trait SourceConnection: Debug + Clone + PartialEq + AlterCompatible {
    /// The name of the external system (e.g. kafka, postgres, etc.).
    fn name(&self) -> &'static str;

    /// The name of the resource in the external system (e.g. kafka topic), if any.
    fn external_reference(&self) -> Option<&str>;

    /// Defines the key schema to use by default for this source connection type.
    /// This will be used for the primary export of the source and as the default
    /// pre-encoding key schema for the source.
    fn default_key_desc(&self) -> RelationDesc;

    /// Defines the value schema to use by default for this source connection type.
    /// This will be used for the primary export of the source and as the default
    /// pre-encoding value schema for the source.
    fn default_value_desc(&self) -> RelationDesc;

    /// The schema of this connection's timestamp type. This will also be the schema of the
    /// progress relation.
    fn timestamp_desc(&self) -> RelationDesc;

    /// The id of the connection object (i.e. the one obtained from running `CREATE CONNECTION`) in
    /// the catalog, if any.
    fn connection_id(&self) -> Option<CatalogItemId>;

    /// If this source connection can output to a primary collection, contains the source-specific
    /// details of that export, else is set to `SourceExportDetails::None` to indicate that
    /// this source should not export to the primary collection.
    fn primary_export_details(&self) -> SourceExportDetails;

    /// Whether the source type supports read-only mode.
    fn supports_read_only(&self) -> bool;

    /// Whether the source type prefers to run on only one replica of a multi-replica cluster.
    fn prefers_single_replica(&self) -> bool;
}

#[derive(Arbitrary, Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum Compression {
    Gzip,
    None,
}

impl RustType<ProtoCompression> for Compression {
    fn into_proto(&self) -> ProtoCompression {
        use proto_compression::Kind;
        ProtoCompression {
            kind: Some(match self {
                Compression::Gzip => Kind::Gzip(()),
                Compression::None => Kind::None(()),
            }),
        }
    }

    fn from_proto(proto: ProtoCompression) -> Result<Self, TryFromProtoError> {
        use proto_compression::Kind;
        Ok(match proto.kind {
            Some(Kind::Gzip(())) => Compression::Gzip,
            Some(Kind::None(())) => Compression::None,
            None => {
                return Err(TryFromProtoError::MissingField(
                    "ProtoCompression::kind".into(),
                ));
            }
        })
    }
}

/// Defines the configuration for how to handle data that is exported for a given
/// Source Export.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Arbitrary)]
pub struct SourceExportDataConfig<C: ConnectionAccess = InlinedConnection> {
    pub encoding: Option<encoding::SourceDataEncoding<C>>,
    pub envelope: SourceEnvelope,
}

impl<R: ConnectionResolver> IntoInlineConnection<SourceExportDataConfig, R>
    for SourceExportDataConfig<ReferencedConnection>
{
    fn into_inline_connection(self, r: R) -> SourceExportDataConfig {
        let SourceExportDataConfig { encoding, envelope } = self;

        SourceExportDataConfig {
            encoding: encoding.map(|e| e.into_inline_connection(r)),
            envelope,
        }
    }
}

impl RustType<ProtoSourceExportDataConfig> for SourceExportDataConfig {
    fn into_proto(&self) -> ProtoSourceExportDataConfig {
        ProtoSourceExportDataConfig {
            encoding: self.encoding.into_proto(),
            envelope: Some(self.envelope.into_proto()),
        }
    }

    fn from_proto(proto: ProtoSourceExportDataConfig) -> Result<Self, TryFromProtoError> {
        Ok(SourceExportDataConfig {
            encoding: proto.encoding.into_rust()?,
            envelope: proto
                .envelope
                .into_rust_if_some("ProtoSourceExportDataConfig::envelope")?,
        })
    }
}

impl<C: ConnectionAccess> AlterCompatible for SourceExportDataConfig<C> {
    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
        if self == other {
            return Ok(());
        }
        let Self { encoding, envelope } = &self;

        let compatibility_checks = [
            (
                match (encoding, &other.encoding) {
                    (Some(s), Some(o)) => s.alter_compatible(id, o).is_ok(),
                    (s, o) => s == o,
                },
                "encoding",
            ),
            (envelope == &other.envelope, "envelope"),
        ];

        for (compatible, field) in compatibility_checks {
            if !compatible {
                tracing::warn!(
                    "SourceExportDataConfig incompatible {field}:\nself:\n{:#?}\n\nother\n{:#?}",
                    self,
                    other
                );

                return Err(AlterError { id });
            }
        }
        Ok(())
    }
}

impl<C: ConnectionAccess> SourceExportDataConfig<C> {
    /// Returns `true` if this connection yields data that is
    /// append-only/monotonic. Append-only means the source
    /// never produces retractions.
    // TODO(guswynn): consider enforcing this more completely at the
    // parsing/typechecking level, by not using an `envelope`
    // for sources like pg
    pub fn monotonic(&self, connection: &GenericSourceConnection<C>) -> bool {
        match &self.envelope {
            // Upsert and CdcV2 may produce retractions.
            SourceEnvelope::Upsert(_) | SourceEnvelope::CdcV2 => false,
            SourceEnvelope::None(_) => {
                match connection {
                    // Postgres can produce retractions (deletes).
                    GenericSourceConnection::Postgres(_) => false,
                    // MySQL can produce retractions (deletes).
                    GenericSourceConnection::MySql(_) => false,
                    // SQL Server can produce retractions (deletes).
                    GenericSourceConnection::SqlServer(_) => false,
                    // Whether or not a Loadgen source can produce retractions varies.
                    GenericSourceConnection::LoadGenerator(g) => g.load_generator.is_monotonic(),
                    // Kafka exports with `None` envelope are append-only.
                    GenericSourceConnection::Kafka(_) => true,
                }
            }
        }
    }
}

/// An external source of updates for a relational collection.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Arbitrary)]
pub struct SourceDesc<C: ConnectionAccess = InlinedConnection> {
    pub connection: GenericSourceConnection<C>,
    pub timestamp_interval: Duration,
    /// The data encoding and format of data to export to the
    /// primary collection for this source.
    /// TODO(database-issues#8620): This will be removed once sources no longer export
    /// to primary collections and only export to explicit SourceExports (tables).
    pub primary_export: SourceExportDataConfig<C>,
    pub primary_export_details: SourceExportDetails,
}

impl<R: ConnectionResolver> IntoInlineConnection<SourceDesc, R>
    for SourceDesc<ReferencedConnection>
{
    fn into_inline_connection(self, r: R) -> SourceDesc {
        let SourceDesc {
            connection,
            primary_export,
            primary_export_details,
            timestamp_interval,
        } = self;

        SourceDesc {
            connection: connection.into_inline_connection(&r),
            primary_export: primary_export.into_inline_connection(r),
            primary_export_details,
            timestamp_interval,
        }
    }
}

impl RustType<ProtoSourceDesc> for SourceDesc {
    fn into_proto(&self) -> ProtoSourceDesc {
        ProtoSourceDesc {
            connection: Some(self.connection.into_proto()),
            primary_export: Some(self.primary_export.into_proto()),
            primary_export_details: Some(self.primary_export_details.into_proto()),
            timestamp_interval: Some(self.timestamp_interval.into_proto()),
        }
    }

    fn from_proto(proto: ProtoSourceDesc) -> Result<Self, TryFromProtoError> {
        Ok(SourceDesc {
            connection: proto
                .connection
                .into_rust_if_some("ProtoSourceDesc::connection")?,
            primary_export: proto
                .primary_export
                .into_rust_if_some("ProtoSourceDesc::primary_export")?,
            primary_export_details: proto
                .primary_export_details
                .into_rust_if_some("ProtoSourceDesc::primary_export_details")?,
            timestamp_interval: proto
                .timestamp_interval
                .into_rust_if_some("ProtoSourceDesc::timestamp_interval")?,
        })
    }
}

impl<C: ConnectionAccess> AlterCompatible for SourceDesc<C> {
    /// Determines if `self` is compatible with another `SourceDesc`, in such a
    /// way that it is possible to turn `self` into `other` through a valid
    /// series of transformations (e.g. no transformation or `ALTER SOURCE`).
    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
        if self == other {
            return Ok(());
        }
        let Self {
            connection,
            primary_export,
            primary_export_details,
            timestamp_interval,
        } = &self;

        let compatibility_checks = [
            (
                connection.alter_compatible(id, &other.connection).is_ok(),
                "connection",
            ),
            (primary_export == &other.primary_export, "primary_export"),
            (
                primary_export_details == &other.primary_export_details,
                "primary_export_details",
            ),
            (
                timestamp_interval == &other.timestamp_interval,
                "timestamp_interval",
            ),
        ];

        for (compatible, field) in compatibility_checks {
            if !compatible {
                tracing::warn!(
                    "SourceDesc incompatible {field}:\nself:\n{:#?}\n\nother\n{:#?}",
                    self,
                    other
                );

                return Err(AlterError { id });
            }
        }

        Ok(())
    }
}

impl SourceDesc<InlinedConnection> {
    /// Returns the SourceExport details for the primary export.
    /// TODO(database-issues#8620): This will be removed once sources no longer export
    /// to primary collections and only export to explicit SourceExports (tables).
    pub fn primary_source_export(&self) -> SourceExport<(), InlinedConnection> {
        SourceExport {
            storage_metadata: (),
            details: self.primary_export_details.clone(),
            data_config: self.primary_export.clone(),
        }
    }
}

#[derive(Arbitrary, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum GenericSourceConnection<C: ConnectionAccess = InlinedConnection> {
    Kafka(KafkaSourceConnection<C>),
    Postgres(PostgresSourceConnection<C>),
    MySql(MySqlSourceConnection<C>),
    SqlServer(SqlServerSource<C>),
    LoadGenerator(LoadGeneratorSourceConnection),
}

impl<C: ConnectionAccess> From<KafkaSourceConnection<C>> for GenericSourceConnection<C> {
    fn from(conn: KafkaSourceConnection<C>) -> Self {
        Self::Kafka(conn)
    }
}

impl<C: ConnectionAccess> From<PostgresSourceConnection<C>> for GenericSourceConnection<C> {
    fn from(conn: PostgresSourceConnection<C>) -> Self {
        Self::Postgres(conn)
    }
}

impl<C: ConnectionAccess> From<MySqlSourceConnection<C>> for GenericSourceConnection<C> {
    fn from(conn: MySqlSourceConnection<C>) -> Self {
        Self::MySql(conn)
    }
}

impl<C: ConnectionAccess> From<SqlServerSource<C>> for GenericSourceConnection<C> {
    fn from(conn: SqlServerSource<C>) -> Self {
        Self::SqlServer(conn)
    }
}

impl<C: ConnectionAccess> From<LoadGeneratorSourceConnection> for GenericSourceConnection<C> {
    fn from(conn: LoadGeneratorSourceConnection) -> Self {
        Self::LoadGenerator(conn)
    }
}

impl<R: ConnectionResolver> IntoInlineConnection<GenericSourceConnection, R>
    for GenericSourceConnection<ReferencedConnection>
{
    fn into_inline_connection(self, r: R) -> GenericSourceConnection {
        match self {
            GenericSourceConnection::Kafka(kafka) => {
                GenericSourceConnection::Kafka(kafka.into_inline_connection(r))
            }
            GenericSourceConnection::Postgres(pg) => {
                GenericSourceConnection::Postgres(pg.into_inline_connection(r))
            }
            GenericSourceConnection::MySql(mysql) => {
                GenericSourceConnection::MySql(mysql.into_inline_connection(r))
            }
            GenericSourceConnection::SqlServer(sql_server) => {
                GenericSourceConnection::SqlServer(sql_server.into_inline_connection(r))
            }
            GenericSourceConnection::LoadGenerator(lg) => {
                GenericSourceConnection::LoadGenerator(lg)
            }
        }
    }
}

impl<C: ConnectionAccess> SourceConnection for GenericSourceConnection<C> {
    fn name(&self) -> &'static str {
        match self {
            Self::Kafka(conn) => conn.name(),
            Self::Postgres(conn) => conn.name(),
            Self::MySql(conn) => conn.name(),
            Self::SqlServer(conn) => conn.name(),
            Self::LoadGenerator(conn) => conn.name(),
        }
    }

    fn external_reference(&self) -> Option<&str> {
        match self {
            Self::Kafka(conn) => conn.external_reference(),
            Self::Postgres(conn) => conn.external_reference(),
            Self::MySql(conn) => conn.external_reference(),
            Self::SqlServer(conn) => conn.external_reference(),
            Self::LoadGenerator(conn) => conn.external_reference(),
        }
    }

    fn default_key_desc(&self) -> RelationDesc {
        match self {
            Self::Kafka(conn) => conn.default_key_desc(),
            Self::Postgres(conn) => conn.default_key_desc(),
            Self::MySql(conn) => conn.default_key_desc(),
            Self::SqlServer(conn) => conn.default_key_desc(),
            Self::LoadGenerator(conn) => conn.default_key_desc(),
        }
    }

    fn default_value_desc(&self) -> RelationDesc {
        match self {
            Self::Kafka(conn) => conn.default_value_desc(),
            Self::Postgres(conn) => conn.default_value_desc(),
            Self::MySql(conn) => conn.default_value_desc(),
            Self::SqlServer(conn) => conn.default_value_desc(),
            Self::LoadGenerator(conn) => conn.default_value_desc(),
        }
    }

    fn timestamp_desc(&self) -> RelationDesc {
        match self {
            Self::Kafka(conn) => conn.timestamp_desc(),
            Self::Postgres(conn) => conn.timestamp_desc(),
            Self::MySql(conn) => conn.timestamp_desc(),
            Self::SqlServer(conn) => conn.timestamp_desc(),
            Self::LoadGenerator(conn) => conn.timestamp_desc(),
        }
    }

    fn connection_id(&self) -> Option<CatalogItemId> {
        match self {
            Self::Kafka(conn) => conn.connection_id(),
            Self::Postgres(conn) => conn.connection_id(),
            Self::MySql(conn) => conn.connection_id(),
            Self::SqlServer(conn) => conn.connection_id(),
            Self::LoadGenerator(conn) => conn.connection_id(),
        }
    }

    fn primary_export_details(&self) -> SourceExportDetails {
        match self {
            Self::Kafka(conn) => conn.primary_export_details(),
            Self::Postgres(conn) => conn.primary_export_details(),
            Self::MySql(conn) => conn.primary_export_details(),
            Self::SqlServer(conn) => conn.primary_export_details(),
            Self::LoadGenerator(conn) => conn.primary_export_details(),
        }
    }

    fn supports_read_only(&self) -> bool {
        match self {
            GenericSourceConnection::Kafka(conn) => conn.supports_read_only(),
            GenericSourceConnection::Postgres(conn) => conn.supports_read_only(),
            GenericSourceConnection::MySql(conn) => conn.supports_read_only(),
            GenericSourceConnection::SqlServer(conn) => conn.supports_read_only(),
            GenericSourceConnection::LoadGenerator(conn) => conn.supports_read_only(),
        }
    }

    fn prefers_single_replica(&self) -> bool {
        match self {
            GenericSourceConnection::Kafka(conn) => conn.prefers_single_replica(),
            GenericSourceConnection::Postgres(conn) => conn.prefers_single_replica(),
            GenericSourceConnection::MySql(conn) => conn.prefers_single_replica(),
            GenericSourceConnection::SqlServer(conn) => conn.prefers_single_replica(),
            GenericSourceConnection::LoadGenerator(conn) => conn.prefers_single_replica(),
        }
    }
}
impl<C: ConnectionAccess> crate::AlterCompatible for GenericSourceConnection<C> {
    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
        if self == other {
            return Ok(());
        }
        let r = match (self, other) {
            (Self::Kafka(conn), Self::Kafka(other)) => conn.alter_compatible(id, other),
            (Self::Postgres(conn), Self::Postgres(other)) => conn.alter_compatible(id, other),
            (Self::MySql(conn), Self::MySql(other)) => conn.alter_compatible(id, other),
            (Self::SqlServer(conn), Self::SqlServer(other)) => conn.alter_compatible(id, other),
            (Self::LoadGenerator(conn), Self::LoadGenerator(other)) => {
                conn.alter_compatible(id, other)
            }
            _ => Err(AlterError { id }),
        };

        if r.is_err() {
            tracing::warn!(
                "GenericSourceConnection incompatible:\nself:\n{:#?}\n\nother\n{:#?}",
                self,
                other
            );
        }

        r
    }
}

impl RustType<ProtoSourceConnection> for GenericSourceConnection<InlinedConnection> {
    fn into_proto(&self) -> ProtoSourceConnection {
        use proto_source_connection::Kind;
        ProtoSourceConnection {
            kind: Some(match self {
                GenericSourceConnection::Kafka(kafka) => Kind::Kafka(kafka.into_proto()),
                GenericSourceConnection::Postgres(postgres) => {
                    Kind::Postgres(postgres.into_proto())
                }
                GenericSourceConnection::MySql(mysql) => Kind::Mysql(mysql.into_proto()),
                GenericSourceConnection::SqlServer(sql_server) => {
                    Kind::SqlServer(sql_server.into_proto())
                }
                GenericSourceConnection::LoadGenerator(loadgen) => {
                    Kind::Loadgen(loadgen.into_proto())
                }
            }),
        }
    }

    fn from_proto(proto: ProtoSourceConnection) -> Result<Self, TryFromProtoError> {
        use proto_source_connection::Kind;
        let kind = proto
            .kind
            .ok_or_else(|| TryFromProtoError::missing_field("ProtoSourceConnection::kind"))?;
        Ok(match kind {
            Kind::Kafka(kafka) => GenericSourceConnection::Kafka(kafka.into_rust()?),
            Kind::Postgres(postgres) => GenericSourceConnection::Postgres(postgres.into_rust()?),
            Kind::Mysql(mysql) => GenericSourceConnection::MySql(mysql.into_rust()?),
            Kind::SqlServer(sql_server) => {
                GenericSourceConnection::SqlServer(sql_server.into_rust()?)
            }
            Kind::Loadgen(loadgen) => GenericSourceConnection::LoadGenerator(loadgen.into_rust()?),
        })
    }
}

/// Details necessary for each source export to allow the source implementations
/// to export data to the export's collection.
#[derive(Arbitrary, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum SourceExportDetails {
    /// Used when the primary collection of a source is not an export that
    /// receives data.
    None,
    Kafka(KafkaSourceExportDetails),
    Postgres(PostgresSourceExportDetails),
    MySql(MySqlSourceExportDetails),
    SqlServer(SqlServerSourceExportDetails),
    LoadGenerator(LoadGeneratorSourceExportDetails),
}

impl crate::AlterCompatible for SourceExportDetails {
    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
        if self == other {
            return Ok(());
        }
        let r = match (self, other) {
            (Self::None, Self::None) => Ok(()),
            (Self::Kafka(s), Self::Kafka(o)) => s.alter_compatible(id, o),
            (Self::Postgres(s), Self::Postgres(o)) => s.alter_compatible(id, o),
            (Self::MySql(s), Self::MySql(o)) => s.alter_compatible(id, o),
            (Self::LoadGenerator(s), Self::LoadGenerator(o)) => s.alter_compatible(id, o),
            _ => Err(AlterError { id }),
        };

        if r.is_err() {
            tracing::warn!(
                "SourceExportDetails incompatible:\nself:\n{:#?}\n\nother\n{:#?}",
                self,
                other
            );
        }

        r
    }
}

impl RustType<ProtoSourceExportDetails> for SourceExportDetails {
    fn into_proto(&self) -> ProtoSourceExportDetails {
        use proto_source_export_details::Kind;
        ProtoSourceExportDetails {
            kind: match self {
                SourceExportDetails::None => None,
                SourceExportDetails::Kafka(details) => Some(Kind::Kafka(details.into_proto())),
                SourceExportDetails::Postgres(details) => {
                    Some(Kind::Postgres(details.into_proto()))
                }
                SourceExportDetails::MySql(details) => Some(Kind::Mysql(details.into_proto())),
                SourceExportDetails::SqlServer(details) => {
                    Some(Kind::SqlServer(details.into_proto()))
                }
                SourceExportDetails::LoadGenerator(details) => {
                    Some(Kind::Loadgen(details.into_proto()))
                }
            },
        }
    }

    fn from_proto(proto: ProtoSourceExportDetails) -> Result<Self, TryFromProtoError> {
        use proto_source_export_details::Kind;
        Ok(match proto.kind {
            None => SourceExportDetails::None,
            Some(Kind::Kafka(details)) => SourceExportDetails::Kafka(details.into_rust()?),
            Some(Kind::Postgres(details)) => SourceExportDetails::Postgres(details.into_rust()?),
            Some(Kind::Mysql(details)) => SourceExportDetails::MySql(details.into_rust()?),
            Some(Kind::SqlServer(details)) => SourceExportDetails::SqlServer(details.into_rust()?),
            Some(Kind::Loadgen(details)) => {
                SourceExportDetails::LoadGenerator(details.into_rust()?)
            }
        })
    }
}

/// Details necessary to store in the `Details` option of a source export
/// statement (`CREATE SUBSOURCE` and `CREATE TABLE .. FROM SOURCE` statements),
/// to generate the appropriate `SourceExportDetails` struct during planning.
/// NOTE that this is serialized as proto to the catalog, so any changes here
/// must be backwards compatible or will require a migration.
pub enum SourceExportStatementDetails {
    Postgres {
        table: mz_postgres_util::desc::PostgresTableDesc,
    },
    MySql {
        table: mz_mysql_util::MySqlTableDesc,
        initial_gtid_set: String,
    },
    SqlServer {
        table: mz_sql_server_util::desc::SqlServerTableDesc,
        capture_instance: Arc<str>,
    },
    LoadGenerator {
        output: LoadGeneratorOutput,
    },
    Kafka {},
}

impl RustType<ProtoSourceExportStatementDetails> for SourceExportStatementDetails {
    fn into_proto(&self) -> ProtoSourceExportStatementDetails {
        match self {
            SourceExportStatementDetails::Postgres { table } => ProtoSourceExportStatementDetails {
                kind: Some(proto_source_export_statement_details::Kind::Postgres(
                    postgres::ProtoPostgresSourceExportStatementDetails {
                        table: Some(table.into_proto()),
                    },
                )),
            },
            SourceExportStatementDetails::MySql {
                table,
                initial_gtid_set,
            } => ProtoSourceExportStatementDetails {
                kind: Some(proto_source_export_statement_details::Kind::Mysql(
                    mysql::ProtoMySqlSourceExportStatementDetails {
                        table: Some(table.into_proto()),
                        initial_gtid_set: initial_gtid_set.clone(),
                    },
                )),
            },
            SourceExportStatementDetails::SqlServer {
                table,
                capture_instance,
            } => ProtoSourceExportStatementDetails {
                kind: Some(proto_source_export_statement_details::Kind::SqlServer(
                    sql_server::ProtoSqlServerSourceExportStatementDetails {
                        table: Some(table.into_proto()),
                        capture_instance: capture_instance.to_string(),
                    },
                )),
            },
            SourceExportStatementDetails::LoadGenerator { output } => {
                ProtoSourceExportStatementDetails {
                    kind: Some(proto_source_export_statement_details::Kind::Loadgen(
                        load_generator::ProtoLoadGeneratorSourceExportStatementDetails {
                            output: output.into_proto().into(),
                        },
                    )),
                }
            }
            SourceExportStatementDetails::Kafka {} => ProtoSourceExportStatementDetails {
                kind: Some(proto_source_export_statement_details::Kind::Kafka(
                    kafka::ProtoKafkaSourceExportStatementDetails {},
                )),
            },
        }
    }

    fn from_proto(proto: ProtoSourceExportStatementDetails) -> Result<Self, TryFromProtoError> {
        use proto_source_export_statement_details::Kind;
        Ok(match proto.kind {
            Some(Kind::Postgres(details)) => SourceExportStatementDetails::Postgres {
                table: details
                    .table
                    .into_rust_if_some("ProtoPostgresSourceExportStatementDetails::table")?,
            },
            Some(Kind::Mysql(details)) => SourceExportStatementDetails::MySql {
                table: details
                    .table
                    .into_rust_if_some("ProtoMySqlSourceExportStatementDetails::table")?,

                initial_gtid_set: details.initial_gtid_set,
            },
            Some(Kind::SqlServer(details)) => SourceExportStatementDetails::SqlServer {
                table: details
                    .table
                    .into_rust_if_some("ProtoSqlServerSourceExportStatementDetails::table")?,
                capture_instance: details.capture_instance.into(),
            },
            Some(Kind::Loadgen(details)) => SourceExportStatementDetails::LoadGenerator {
                output: details
                    .output
                    .into_rust_if_some("ProtoLoadGeneratorSourceExportStatementDetails::output")?,
            },
            Some(Kind::Kafka(_details)) => SourceExportStatementDetails::Kafka {},
            None => {
                return Err(TryFromProtoError::missing_field(
                    "ProtoSourceExportStatementDetails::kind",
                ));
            }
        })
    }
}

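/// A wrapper around `Result<Row, DataflowError>` that implements persist's
/// [`Codec`].
///
/// A round-trip sketch through the [`Codec`] impl below (assuming an empty
/// `RelationDesc` schema is sufficient for an empty `Row`):
///
/// ```ignore
/// let desc = RelationDesc::empty();
/// let data = SourceData(Ok(Row::default()));
/// let mut buf = Vec::new();
/// Codec::encode(&data, &mut buf);
/// assert_eq!(SourceData::decode(&buf, &desc).unwrap(), data);
/// ```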
#[derive(Arbitrary, Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
#[repr(transparent)]
pub struct SourceData(pub Result<Row, DataflowError>);

impl Default for SourceData {
    fn default() -> Self {
        SourceData(Ok(Row::default()))
    }
}

impl Deref for SourceData {
    type Target = Result<Row, DataflowError>;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl DerefMut for SourceData {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0
    }
}

impl RustType<ProtoSourceData> for SourceData {
    fn into_proto(&self) -> ProtoSourceData {
        use proto_source_data::Kind;
        ProtoSourceData {
            kind: Some(match &**self {
                Ok(row) => Kind::Ok(row.into_proto()),
                Err(err) => Kind::Err(err.into_proto()),
            }),
        }
    }

    fn from_proto(proto: ProtoSourceData) -> Result<Self, TryFromProtoError> {
        use proto_source_data::Kind;
        match proto.kind {
            Some(kind) => match kind {
                Kind::Ok(row) => Ok(SourceData(Ok(row.into_rust()?))),
                Kind::Err(err) => Ok(SourceData(Err(err.into_rust()?))),
            },
            None => Result::Err(TryFromProtoError::missing_field("ProtoSourceData::kind")),
        }
    }
}

impl Codec for SourceData {
    type Storage = ProtoRow;
    type Schema = RelationDesc;

    fn codec_name() -> String {
        "protobuf[SourceData]".into()
    }

    fn encode<B: BufMut>(&self, buf: &mut B) {
        self.into_proto()
            .encode(buf)
            .expect("no required fields means no initialization errors");
    }

    fn decode(buf: &[u8], schema: &RelationDesc) -> Result<Self, String> {
        let mut val = SourceData::default();
        <Self as Codec>::decode_from(&mut val, buf, &mut None, schema)?;
        Ok(val)
    }

    fn decode_from<'a>(
        &mut self,
        buf: &'a [u8],
        storage: &mut Option<ProtoRow>,
        schema: &RelationDesc,
    ) -> Result<(), String> {
        // Optimize for common case of `Ok` by leaving a (cleared) `ProtoRow` in
        // the `Ok` variant of `ProtoSourceData`. prost's `Message::merge` impl
        // is smart about reusing the `Vec<Datum>` when it can.
        let mut proto = storage.take().unwrap_or_default();
        proto.clear();
        let mut proto = ProtoSourceData {
            kind: Some(proto_source_data::Kind::Ok(proto)),
        };
        proto.merge(buf).map_err(|err| err.to_string())?;
        match (proto.kind, &mut self.0) {
            // Again, optimize for the common case...
            (Some(proto_source_data::Kind::Ok(proto)), Ok(row)) => {
                let ret = row.decode_from_proto(&proto, schema);
                storage.replace(proto);
                ret
            }
            // ...otherwise fall back to the obvious thing.
            (kind, _) => {
                let proto = ProtoSourceData { kind };
                *self = proto.into_rust().map_err(|err| err.to_string())?;
                // Nothing to put back in storage.
                Ok(())
            }
        }
    }

    fn validate(val: &Self, desc: &Self::Schema) -> Result<(), String> {
        match &val.0 {
            Ok(row) => Row::validate(row, desc),
            Err(_) => Ok(()),
        }
    }

    fn encode_schema(schema: &Self::Schema) -> Bytes {
        schema.into_proto().encode_to_vec().into()
    }

    fn decode_schema(buf: &Bytes) -> Self::Schema {
        let proto = ProtoRelationDesc::decode(buf.as_ref()).expect("valid schema");
        proto.into_rust().expect("valid schema")
    }
}

/// Given a [`RelationDesc`], returns a [`Strategy`] that generates arbitrary [`SourceData`].
1432pub fn arb_source_data_for_relation_desc(
1433    desc: &RelationDesc,
1434) -> impl Strategy<Value = SourceData> + use<> {
1435    let row_strat = arb_row_for_relation(desc).no_shrink();
1436
1437    proptest::strategy::Union::new_weighted(vec![
1438        (50, row_strat.prop_map(|row| SourceData(Ok(row))).boxed()),
1439        (
1440            1,
1441            any::<DataflowError>()
1442                .prop_map(|err| SourceData(Err(err)))
1443                .no_shrink()
1444                .boxed(),
1445        ),
1446    ])
1447}
1448
1449/// Describes how external references should be organized in a multi-level
1450/// hierarchy.
1451///
1452/// For both PostgreSQL and MySQL sources, these levels of reference are
1453/// intrinsic to the items we're referencing. If we discover other source
1454/// types that use different naming schemes, we might need to revisit
1455/// this.
1456pub trait ExternalCatalogReference {
1457    /// The "second" level of namespacing for the reference.
1458    fn schema_name(&self) -> &str;
1459    /// The lowest level of namespacing for the reference.
1460    fn item_name(&self) -> &str;
1461}
1462
1463impl ExternalCatalogReference for &mz_mysql_util::MySqlTableDesc {
1464    fn schema_name(&self) -> &str {
1465        &self.schema_name
1466    }
1467
1468    fn item_name(&self) -> &str {
1469        &self.name
1470    }
1471}
1472
1473impl ExternalCatalogReference for mz_postgres_util::desc::PostgresTableDesc {
1474    fn schema_name(&self) -> &str {
1475        &self.namespace
1476    }
1477
1478    fn item_name(&self) -> &str {
1479        &self.name
1480    }
1481}
1482
1483impl ExternalCatalogReference for &mz_sql_server_util::desc::SqlServerTableDesc {
1484    fn schema_name(&self) -> &str {
1485        &*self.schema_name
1486    }
1487
1488    fn item_name(&self) -> &str {
1489        &*self.name
1490    }
1491}
1492
1493// This implementation provides a means of treating an arbitrary pair of strings
1494// as an `ExternalCatalogReference`, e.g. load generator view names.
1495impl<'a> ExternalCatalogReference for (&'a str, &'a str) {
1496    fn schema_name(&self) -> &str {
1497        self.0
1498    }
1499
1500    fn item_name(&self) -> &str {
1501        self.1
1502    }
1503}
1504
1505/// Stores and resolves references to a `&[T: ExternalCatalogReference]`.
1506///
1507/// This is meant to provide an API to quickly look up a source's subsources.
1508///
1509/// For sources that do not provide any subsources, use the `Default`
1510/// implementation, which is empty and will not be able to resolve any
1511/// references.
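///
/// # Example
///
/// An illustrative sketch (not compiled as a doc-test); the database, schema, and
/// item names are made up:
///
/// ```ignore
/// // Pairs of (schema, item), e.g. load generator view names.
/// let items = [("public", "users"), ("public", "orders")];
/// let resolver = SourceReferenceResolver::new("my_db", &items)?;
/// assert_eq!(resolver.resolve_idx(&[Ident::new("orders")?])?, 1);
/// ```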
1512#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
1513pub struct SourceReferenceResolver {
1514    inner: BTreeMap<Ident, BTreeMap<Ident, BTreeMap<Ident, usize>>>,
1515}
1516
1517#[derive(Debug, Clone, thiserror::Error)]
1518pub enum ExternalReferenceResolutionError {
1519    #[error("reference to {name} not found in source")]
1520    DoesNotExist { name: String },
1521    #[error(
1522        "reference {name} is ambiguous, consider specifying an additional \
1523    layer of qualification"
1524    )]
1525    Ambiguous { name: String },
1526    #[error("invalid identifier: {0}")]
1527    Ident(#[from] IdentError),
1528}
1529
1530impl<'a> SourceReferenceResolver {
1531    /// Constructs a new `SourceReferenceResolver` from a slice of `T:
1532    /// ExternalCatalogReference`.
1533    ///
1534    /// # Errors
1535    /// - If any `&str` provided cannot be converted into an [`Ident`].
1536    pub fn new<T: ExternalCatalogReference>(
1537        database: &str,
1538        referenceable_items: &'a [T],
1539    ) -> Result<SourceReferenceResolver, ExternalReferenceResolutionError> {
1540        // An index from table name -> schema name -> database name -> index in
1541        // `referenceable_items`.
1542        let mut inner = BTreeMap::new();
1543
1544        let database = Ident::new(database)?;
1545
1546        for (reference_idx, item) in referenceable_items.iter().enumerate() {
1547            let item_name = Ident::new(item.item_name())?;
1548            let schema_name = Ident::new(item.schema_name())?;
1549
1550            inner
1551                .entry(item_name)
1552                .or_insert_with(BTreeMap::new)
1553                .entry(schema_name)
1554                .or_insert_with(BTreeMap::new)
1555                .entry(database.clone())
1556                .or_insert(reference_idx);
1557        }
1558
1559        Ok(SourceReferenceResolver { inner })
1560    }
1561
1562    /// Returns the canonical reference and the index in the `referenceable_items`
1563    /// provided to [`Self::new`] from which it originated.
1564    ///
1565    /// # Args
1566    /// - `name` is `&[Ident]` to let users provide the inner element of
1567    ///   [`UnresolvedItemName`].
1568    /// - `canonicalize_to_width` limits the number of elements in the returned
1569    ///   [`UnresolvedItemName`]; this is useful if the source type requires
1570    ///   contriving database and schema names that a subsource should not
1571    ///   persist as its reference.
1572    ///
1573    /// # Errors
1574    /// - If `name` does not resolve to an item in `self.inner`.
1575    ///
1576    /// # Panics
1577    /// - If `canonicalize_to_width` is not in `1..=3`.
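    ///
    /// # Example
    ///
    /// An illustrative sketch (not compiled as a doc-test); the names are made up:
    ///
    /// ```ignore
    /// let resolver = SourceReferenceResolver::new("db", &[("sch", "tbl")])?;
    /// // With a width of 3 the canonical name keeps the database; with 2 it is
    /// // dropped, which is useful when the database name was contrived.
    /// let (canonical, idx) = resolver.resolve(&[Ident::new("tbl")?], 2)?;
    /// assert_eq!(canonical.to_string(), "sch.tbl");
    /// assert_eq!(idx, 0);
    /// ```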
1578    pub fn resolve(
1579        &self,
1580        name: &[Ident],
1581        canonicalize_to_width: usize,
1582    ) -> Result<(UnresolvedItemName, usize), ExternalReferenceResolutionError> {
1583        let (db, schema, idx) = self.resolve_inner(name)?;
1584
1585        let item = name.last().expect("must have provided at least 1 element");
1586
1587        let canonical_name = match canonicalize_to_width {
1588            1 => vec![item.clone()],
1589            2 => vec![schema.clone(), item.clone()],
1590            3 => vec![db.clone(), schema.clone(), item.clone()],
1591            o => panic!("canonicalize_to_width values must be 1..=3, but got {}", o),
1592        };
1593
1594        Ok((UnresolvedItemName(canonical_name), idx))
1595    }
1596
1597    /// Returns the index in the `referenceable_items` provided to [`Self::new`]
1598    /// from which the resolved item originated.
1599    ///
1600    /// # Args
1601    /// `name` is `&[Ident]` to let users provide the inner element of
1602    /// [`UnresolvedItemName`].
1603    ///
1604    /// # Errors
1605    /// - If `name` does not resolve to an item in `self.inner`.
1606    pub fn resolve_idx(&self, name: &[Ident]) -> Result<usize, ExternalReferenceResolutionError> {
1607        let (_db, _schema, idx) = self.resolve_inner(name)?;
1608        Ok(idx)
1609    }
1610
1611    /// Resolves `name` to an entry in `self.inner`, returning its namespace
1612    /// components and its index in the `referenceable_items` provided to [`Self::new`].
1613    ///
1614    /// # Args
1615    /// `name` is `&[Ident]` to let users provide the inner element of
1616    /// [`UnresolvedItemName`].
1617    ///
1618    /// # Return
1619    /// Returns a tuple whose elements are:
1620    /// 1. The "database" (top-level) namespace of the reference.
1621    /// 2. The "schema" (second-level) namespace of the reference.
1622    /// 3. The index to find the item in `referenceable_items` argument provided
1623    ///    to `SourceReferenceResolver::new`.
1624    ///
1625    /// # Errors
1626    /// - If `name` does not resolve to an item in `self.inner`.
1627    fn resolve_inner<'name: 'a>(
1628        &'a self,
1629        name: &'name [Ident],
1630    ) -> Result<(&'a Ident, &'a Ident, usize), ExternalReferenceResolutionError> {
1631        let get_provided_name = || UnresolvedItemName(name.to_vec()).to_string();
1632
1633        // Names must be composed of 1..=3 elements.
1634        if !(1..=3).contains(&name.len()) {
1635            Err(ExternalReferenceResolutionError::DoesNotExist {
1636                name: get_provided_name(),
1637            })?;
1638        }
1639
1640        // Fill in the leading elements with `None` if they aren't present.
1641        let mut names = std::iter::repeat(None)
1642            .take(3 - name.len())
1643            .chain(name.iter().map(Some));
1644
1645        let database = names.next().flatten();
1646        let schema = names.next().flatten();
1647        let item = names
1648            .next()
1649            .flatten()
1650            .expect("must have provided the item name");
1651
1652        assert_none!(names.next(), "expected a 3-element iterator");
1653
1654        let schemas =
1655            self.inner
1656                .get(item)
1657                .ok_or_else(|| ExternalReferenceResolutionError::DoesNotExist {
1658                    name: get_provided_name(),
1659                })?;
1660
1661        let schema = match schema {
1662            Some(schema) => schema,
1663            None => schemas.keys().exactly_one().map_err(|_e| {
1664                ExternalReferenceResolutionError::Ambiguous {
1665                    name: get_provided_name(),
1666                }
1667            })?,
1668        };
1669
1670        let databases =
1671            schemas
1672                .get(schema)
1673                .ok_or_else(|| ExternalReferenceResolutionError::DoesNotExist {
1674                    name: get_provided_name(),
1675                })?;
1676
1677        let database = match database {
1678            Some(database) => database,
1679            None => databases.keys().exactly_one().map_err(|_e| {
1680                ExternalReferenceResolutionError::Ambiguous {
1681                    name: get_provided_name(),
1682                }
1683            })?,
1684        };
1685
1686        let reference_idx = databases.get(database).ok_or_else(|| {
1687            ExternalReferenceResolutionError::DoesNotExist {
1688                name: get_provided_name(),
1689            }
1690        })?;
1691
1692        Ok((database, schema, *reference_idx))
1693    }
1694}
1695
1696/// A decoder for [`Row`]s within [`SourceData`].
1697///
1698/// This type exists as a wrapper around [`RowColumnarDecoder`] to handle the
1699/// case where the [`RelationDesc`] we're decoding with has no columns. See
1700/// [`SourceDataRowColumnarEncoder`] for more details.
1701#[derive(Debug)]
1702pub enum SourceDataRowColumnarDecoder {
1703    Row(RowColumnarDecoder),
1704    EmptyRow,
1705}
1706
1707impl SourceDataRowColumnarDecoder {
1708    pub fn decode(&self, idx: usize, row: &mut Row) {
1709        match self {
1710            SourceDataRowColumnarDecoder::Row(decoder) => decoder.decode(idx, row),
1711            SourceDataRowColumnarDecoder::EmptyRow => {
1712                // Create a packer just to clear the Row.
1713                row.packer();
1714            }
1715        }
1716    }
1717
1718    pub fn goodbytes(&self) -> usize {
1719        match self {
1720            SourceDataRowColumnarDecoder::Row(decoder) => decoder.goodbytes(),
1721            SourceDataRowColumnarDecoder::EmptyRow => 0,
1722        }
1723    }
1724}
1725
1726#[derive(Debug)]
1727pub struct SourceDataColumnarDecoder {
1728    row_decoder: SourceDataRowColumnarDecoder,
1729    err_decoder: BinaryArray,
1730}
1731
1732impl SourceDataColumnarDecoder {
1733    pub fn new(col: StructArray, desc: &RelationDesc) -> Result<Self, anyhow::Error> {
1734        // TODO(parkmycar): We should validate the fields here.
1735        let (_fields, arrays, nullability) = col.into_parts();
1736
1737        if nullability.is_some() {
1738            anyhow::bail!("SourceData is not nullable, but found {nullability:?}");
1739        }
1740        if arrays.len() != 2 {
1741            anyhow::bail!("SourceData should only have two fields, found {arrays:?}");
1742        }
1743
1744        let errs = arrays[1]
1745            .as_any()
1746            .downcast_ref::<BinaryArray>()
1747            .ok_or_else(|| anyhow::anyhow!("expected BinaryArray, found {:?}", arrays[1]))?;
1748
1749        let row_decoder = match arrays[0].data_type() {
1750            arrow::datatypes::DataType::Struct(_) => {
1751                let rows = arrays[0]
1752                    .as_any()
1753                    .downcast_ref::<StructArray>()
1754                    .ok_or_else(|| {
1755                        anyhow::anyhow!("expected StructArray, found {:?}", arrays[0])
1756                    })?;
1757                let decoder = RowColumnarDecoder::new(rows.clone(), desc)?;
1758                SourceDataRowColumnarDecoder::Row(decoder)
1759            }
1760            arrow::datatypes::DataType::Null => SourceDataRowColumnarDecoder::EmptyRow,
1761            other => anyhow::bail!("expected Struct or Null Array, found {other:?}"),
1762        };
1763
1764        Ok(SourceDataColumnarDecoder {
1765            row_decoder,
1766            err_decoder: errs.clone(),
1767        })
1768    }
1769}
1770
1771impl ColumnDecoder<SourceData> for SourceDataColumnarDecoder {
1772    fn decode(&self, idx: usize, val: &mut SourceData) {
1773        let err_null = self.err_decoder.is_null(idx);
1774        let row_null = match &self.row_decoder {
1775            SourceDataRowColumnarDecoder::Row(decoder) => decoder.is_null(idx),
1776            SourceDataRowColumnarDecoder::EmptyRow => !err_null,
1777        };
1778
1779        match (row_null, err_null) {
1780            (true, false) => {
1781                let err = self.err_decoder.value(idx);
1782                let err = ProtoDataflowError::decode(err)
1783                    .expect("proto should be valid")
1784                    .into_rust()
1785                    .expect("error should be valid");
1786                val.0 = Err(err);
1787            }
1788            (false, true) => {
1789                let row = match val.0.as_mut() {
1790                    Ok(row) => row,
1791                    Err(_) => {
1792                        val.0 = Ok(Row::default());
1793                        val.0.as_mut().unwrap()
1794                    }
1795                };
1796                self.row_decoder.decode(idx, row);
1797            }
1798            (true, true) => panic!("should have one of 'ok' or 'err'"),
1799            (false, false) => panic!("cannot have both 'ok' and 'err'"),
1800        }
1801    }
1802
1803    fn is_null(&self, idx: usize) -> bool {
1804        let err_null = self.err_decoder.is_null(idx);
1805        let row_null = match &self.row_decoder {
1806            SourceDataRowColumnarDecoder::Row(decoder) => decoder.is_null(idx),
1807            SourceDataRowColumnarDecoder::EmptyRow => !err_null,
1808        };
1809        assert!(!err_null || !row_null, "SourceData should never be null!");
1810
1811        false
1812    }
1813
1814    fn goodbytes(&self) -> usize {
1815        self.row_decoder.goodbytes() + ArrayOrd::Binary(self.err_decoder.clone()).goodbytes()
1816    }
1817
1818    fn stats(&self) -> StructStats {
1819        let len = self.err_decoder.len();
1820        let err_stats = ColumnarStats {
1821            nulls: Some(ColumnNullStats {
1822                count: self.err_decoder.null_count(),
1823            }),
1824            values: PrimitiveStats::<Vec<u8>>::from_column(&self.err_decoder).into(),
1825        };
1826        // The top-level struct is non-nullable and every entry is either an
1827        // `Ok(Row)` or an `Err(DataflowError)`, so the "ok" and "err" columns have
1828        // complementary nulls: the "ok" column's null count (the number of `Err`
1829        // entries) is the total count minus the "err" column's null count.
1830        let row_null_count = len - self.err_decoder.null_count();
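        // For example, with `len == 10` and 3 `Ok` rows, the "err" column has 3
        // nulls, so `row_null_count` is 7: those 7 `Err` entries are exactly the
        // nulls of the "ok" column.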
1831        let row_stats = match &self.row_decoder {
1832            SourceDataRowColumnarDecoder::Row(decoder) => {
1833                // Sanity check that the number of row nulls/nones we calculated
1834                // using the error column matches what the row column thinks it
1835                // has.
1836                assert_eq!(decoder.null_count(), row_null_count);
1837                decoder.stats()
1838            }
1839            SourceDataRowColumnarDecoder::EmptyRow => StructStats {
1840                len,
1841                cols: BTreeMap::default(),
1842            },
1843        };
1844        let row_stats = ColumnarStats {
1845            nulls: Some(ColumnNullStats {
1846                count: row_null_count,
1847            }),
1848            values: ColumnStatKinds::Struct(row_stats),
1849        };
1850
1851        let stats = [
1852            (
1853                SourceDataColumnarEncoder::OK_COLUMN_NAME.to_string(),
1854                row_stats,
1855            ),
1856            (
1857                SourceDataColumnarEncoder::ERR_COLUMN_NAME.to_string(),
1858                err_stats,
1859            ),
1860        ];
1861        StructStats {
1862            len,
1863            cols: stats.into_iter().collect(),
1864        }
1865    }
1866}
1867
1868/// An encoder for [`Row`]s within [`SourceData`].
1869///
1870/// This type exists as a wrapper around [`RowColumnarEncoder`] to support
1871/// encoding empty [`Row`]s. A [`RowColumnarEncoder`] finishes as a
1872/// [`StructArray`] which is required to have at least one column, and thus
1873/// cannot support empty [`Row`]s.
1874#[derive(Debug)]
1875pub enum SourceDataRowColumnarEncoder {
1876    Row(RowColumnarEncoder),
1877    EmptyRow,
1878}
1879
1880impl SourceDataRowColumnarEncoder {
1881    pub(crate) fn goodbytes(&self) -> usize {
1882        match self {
1883            SourceDataRowColumnarEncoder::Row(e) => e.goodbytes(),
1884            SourceDataRowColumnarEncoder::EmptyRow => 0,
1885        }
1886    }
1887
1888    pub fn append(&mut self, row: &Row) {
1889        match self {
1890            SourceDataRowColumnarEncoder::Row(encoder) => encoder.append(row),
1891            SourceDataRowColumnarEncoder::EmptyRow => {
1892                assert_eq!(row.iter().count(), 0)
1893            }
1894        }
1895    }
1896
1897    pub fn append_null(&mut self) {
1898        match self {
1899            SourceDataRowColumnarEncoder::Row(encoder) => encoder.append_null(),
1900            SourceDataRowColumnarEncoder::EmptyRow => (),
1901        }
1902    }
1903}
1904
1905#[derive(Debug)]
1906pub struct SourceDataColumnarEncoder {
1907    row_encoder: SourceDataRowColumnarEncoder,
1908    err_encoder: BinaryBuilder,
1909}
1910
1911impl SourceDataColumnarEncoder {
1912    const OK_COLUMN_NAME: &'static str = "ok";
1913    const ERR_COLUMN_NAME: &'static str = "err";
1914
1915    pub fn new(desc: &RelationDesc) -> Self {
1916        let row_encoder = match RowColumnarEncoder::new(desc) {
1917            Some(encoder) => SourceDataRowColumnarEncoder::Row(encoder),
1918            None => {
1919                assert!(desc.typ().columns().is_empty());
1920                SourceDataRowColumnarEncoder::EmptyRow
1921            }
1922        };
1923        let err_encoder = BinaryBuilder::new();
1924
1925        SourceDataColumnarEncoder {
1926            row_encoder,
1927            err_encoder,
1928        }
1929    }
1930}
1931
1932impl ColumnEncoder<SourceData> for SourceDataColumnarEncoder {
1933    type FinishedColumn = StructArray;
1934
1935    fn goodbytes(&self) -> usize {
1936        self.row_encoder.goodbytes() + self.err_encoder.values_slice().len()
1937    }
1938
1939    #[inline]
1940    fn append(&mut self, val: &SourceData) {
1941        match val.0.as_ref() {
1942            Ok(row) => {
1943                self.row_encoder.append(row);
1944                self.err_encoder.append_null();
1945            }
1946            Err(err) => {
1947                self.row_encoder.append_null();
1948                self.err_encoder
1949                    .append_value(err.into_proto().encode_to_vec());
1950            }
1951        }
1952    }
1953
1954    #[inline]
1955    fn append_null(&mut self) {
1956        panic!("appending a null into SourceDataColumnarEncoder is not supported");
1957    }
1958
1959    fn finish(self) -> Self::FinishedColumn {
1960        let SourceDataColumnarEncoder {
1961            row_encoder,
1962            mut err_encoder,
1963        } = self;
1964
1965        let err_column = BinaryBuilder::finish(&mut err_encoder);
1966        let row_column: ArrayRef = match row_encoder {
1967            SourceDataRowColumnarEncoder::Row(encoder) => {
1968                let column = encoder.finish();
1969                Arc::new(column)
1970            }
1971            SourceDataRowColumnarEncoder::EmptyRow => Arc::new(NullArray::new(err_column.len())),
1972        };
1973
1974        assert_eq!(row_column.len(), err_column.len());
1975
1976        let fields = vec![
1977            Field::new(Self::OK_COLUMN_NAME, row_column.data_type().clone(), true),
1978            Field::new(Self::ERR_COLUMN_NAME, err_column.data_type().clone(), true),
1979        ];
1980        let arrays: Vec<Arc<dyn Array>> = vec![row_column, Arc::new(err_column)];
1981        StructArray::new(Fields::from(fields), arrays, None)
1982    }
1983}
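
// A sketch of the Arrow layout produced by `SourceDataColumnarEncoder::finish`
// above: a non-nullable struct with two nullable children. The exact type of the
// "ok" child depends on the `RelationDesc` (it is a `NullArray` when the desc has
// no columns):
//
//     Struct {
//         "ok":  Struct { one field per column in the RelationDesc },  // or Null
//         "err": Binary,  // protobuf-encoded `DataflowError`
//     }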
1984
1985impl Schema<SourceData> for RelationDesc {
1986    type ArrowColumn = StructArray;
1987    type Statistics = StructStats;
1988
1989    type Decoder = SourceDataColumnarDecoder;
1990    type Encoder = SourceDataColumnarEncoder;
1991
1992    fn decoder(&self, col: Self::ArrowColumn) -> Result<Self::Decoder, anyhow::Error> {
1993        SourceDataColumnarDecoder::new(col, self)
1994    }
1995
1996    fn encoder(&self) -> Result<Self::Encoder, anyhow::Error> {
1997        Ok(SourceDataColumnarEncoder::new(self))
1998    }
1999}
2000
2001#[cfg(test)]
2002mod tests {
2003    use arrow::array::{ArrayData, make_comparator};
2004    use base64::Engine;
2005    use bytes::Bytes;
2006    use mz_expr::EvalError;
2007    use mz_ore::assert_err;
2008    use mz_ore::metrics::MetricsRegistry;
2009    use mz_persist::indexed::columnar::arrow::{realloc_any, realloc_array};
2010    use mz_persist::metrics::ColumnarMetrics;
2011    use mz_persist_types::parquet::EncodingConfig;
2012    use mz_persist_types::schema::{Migration, backward_compatible};
2013    use mz_persist_types::stats::{PartStats, PartStatsMetrics};
2014    use mz_repr::{
2015        ColumnIndex, DatumVec, PropRelationDescDiff, ProtoRelationDesc, RelationDescBuilder,
2016        RowArena, ScalarType, arb_relation_desc_diff, arb_relation_desc_projection,
2017    };
2018    use proptest::prelude::*;
2019    use proptest::strategy::{Union, ValueTree};
2020
2021    use crate::stats::RelationPartStats;
2022
2023    use super::*;
2024
2025    #[mz_ore::test]
2026    fn test_timeline_parsing() {
2027        assert_eq!(Ok(Timeline::EpochMilliseconds), "M".parse());
2028        assert_eq!(Ok(Timeline::External("JOE".to_string())), "E.JOE".parse());
2029        assert_eq!(Ok(Timeline::User("MIKE".to_string())), "U.MIKE".parse());
2030
2031        assert_err!("Materialize".parse::<Timeline>());
2032        assert_err!("Ejoe".parse::<Timeline>());
2033        assert_err!("Umike".parse::<Timeline>());
2034        assert_err!("Dance".parse::<Timeline>());
2035        assert_err!("".parse::<Timeline>());
2036    }
2037
2038    #[track_caller]
2039    fn roundtrip_source_data(
2040        desc: &RelationDesc,
2041        datas: Vec<SourceData>,
2042        read_desc: &RelationDesc,
2043        config: &EncodingConfig,
2044    ) {
2045        let metrics = ColumnarMetrics::disconnected();
2046        let mut encoder = <RelationDesc as Schema<SourceData>>::encoder(desc).unwrap();
2047        for data in &datas {
2048            encoder.append(data);
2049        }
2050        let col = encoder.finish();
2051
2052        // The top-level StructArray for SourceData should always be non-nullable.
2053        assert!(!col.is_nullable());
2054
2055        // Reallocate our arrays with lgalloc.
2056        let col = realloc_array(&col, &metrics);
2057
2058        // Roundtrip through ProtoArray format.
2059        {
2060            let proto = col.to_data().into_proto();
2061            let bytes = proto.encode_to_vec();
2062            let proto = mz_persist_types::arrow::ProtoArrayData::decode(&bytes[..]).unwrap();
2063            let array_data: ArrayData = proto.into_rust().unwrap();
2064
2065            let col_rnd = StructArray::from(array_data.clone());
2066            assert_eq!(col, col_rnd);
2067
2068            let col_dyn = arrow::array::make_array(array_data);
2069            let col_dyn = col_dyn.as_any().downcast_ref::<StructArray>().unwrap();
2070            assert_eq!(&col, col_dyn);
2071        }
2072
2073        // Encode to Parquet.
2074        let mut buf = Vec::new();
2075        let fields = Fields::from(vec![Field::new("k", col.data_type().clone(), false)]);
2076        let arrays: Vec<Arc<dyn Array>> = vec![Arc::new(col.clone())];
2077        mz_persist_types::parquet::encode_arrays(&mut buf, fields, arrays, config).unwrap();
2078
2079        // Decode from Parquet.
2080        let buf = Bytes::from(buf);
2081        let mut reader = mz_persist_types::parquet::decode_arrays(buf).unwrap();
2082        let maybe_batch = reader.next();
2083
2084        // If we didn't encode any data then the reader won't produce any batches.
2085        let Some(record_batch) = maybe_batch else {
2086            assert!(datas.is_empty());
2087            return;
2088        };
2089        let record_batch = record_batch.unwrap();
2090
2091        assert_eq!(record_batch.columns().len(), 1);
2092        let rnd_col = &record_batch.columns()[0];
2093        let rnd_col = realloc_any(Arc::clone(rnd_col), &metrics);
2094        let rnd_col = rnd_col
2095            .as_any()
2096            .downcast_ref::<StructArray>()
2097            .unwrap()
2098            .clone();
2099
2100        // Try generating stats for the data, just to make sure we don't panic.
2101        let stats = <RelationDesc as Schema<SourceData>>::decoder_any(desc, &rnd_col)
2102            .expect("valid decoder")
2103            .stats();
2104
2105        // Read back all of our data and assert it roundtrips.
2106        let mut rnd_data = SourceData(Ok(Row::default()));
2107        let decoder = <RelationDesc as Schema<SourceData>>::decoder(desc, rnd_col.clone()).unwrap();
2108        for (idx, og_data) in datas.iter().enumerate() {
2109            decoder.decode(idx, &mut rnd_data);
2110            assert_eq!(og_data, &rnd_data);
2111        }
2112
2113        // Read back all of our data a second time with a projection applied, and make sure the
2114        // stats are valid.
2115        let stats_metrics = PartStatsMetrics::new(&MetricsRegistry::new());
2116        let stats = RelationPartStats {
2117            name: "test",
2118            metrics: &stats_metrics,
2119            stats: &PartStats { key: stats },
2120            desc: read_desc,
2121        };
2122        let mut datum_vec = DatumVec::new();
2123        let arena = RowArena::default();
2124        let decoder = <RelationDesc as Schema<SourceData>>::decoder(read_desc, rnd_col).unwrap();
2125
2126        for (idx, og_data) in datas.iter().enumerate() {
2127            decoder.decode(idx, &mut rnd_data);
2128            match (&og_data.0, &rnd_data.0) {
2129                (Ok(og_row), Ok(rnd_row)) => {
2130                    // Filter down to just the Datums in the projection schema.
2131                    {
2132                        let datums = datum_vec.borrow_with(og_row);
2133                        let projected_datums =
2134                            datums.iter().enumerate().filter_map(|(idx, datum)| {
2135                                read_desc
2136                                    .contains_index(&ColumnIndex::from_raw(idx))
2137                                    .then_some(datum)
2138                            });
2139                        let og_projected_row = Row::pack(projected_datums);
2140                        assert_eq!(&og_projected_row, rnd_row);
2141                    }
2142
2143                    // Validate the stats for all of our projected columns.
2144                    {
2145                        let proj_datums = datum_vec.borrow_with(rnd_row);
2146                        for (pos, (idx, _, _)) in read_desc.iter_all().enumerate() {
2147                            let spec = stats.col_stats(idx, &arena);
2148                            assert!(spec.may_contain(proj_datums[pos]));
2149                        }
2150                    }
2151                }
2152                (Err(_), Err(_)) => assert_eq!(og_data, &rnd_data),
2153                (_, _) => panic!("decoded to a different type? {og_data:?} {rnd_data:?}"),
2154            }
2155        }
2156
2157        // Verify that the RelationDesc itself roundtrips through
2158        // {encode,decode}_schema.
2159        let encoded_schema = SourceData::encode_schema(desc);
2160        let roundtrip_desc = SourceData::decode_schema(&encoded_schema);
2161        assert_eq!(desc, &roundtrip_desc);
2162
2163        // Verify that the RelationDesc is backward compatible with itself (this
2164        // mostly checks for `unimplemented!` type panics).
2165        let migration =
2166            mz_persist_types::schema::backward_compatible(col.data_type(), col.data_type());
2167        let migration = migration.expect("should be backward compatible with self");
2168        // Also verify that the Fn doesn't do anything wonky.
2169        let migrated = migration.migrate(Arc::new(col.clone()));
2170        assert_eq!(col.data_type(), migrated.data_type());
2171    }
2172
2173    #[mz_ore::test]
2174    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decContextDefault` on OS `linux`
2175    fn all_source_data_roundtrips() {
2176        let mut weights = vec![(500, Just(0..8)), (50, Just(8..32))];
2177        if std::env::var("PROPTEST_LARGE_DATA").is_ok() {
2178            weights.extend([
2179                (10, Just(32..128)),
2180                (5, Just(128..512)),
2181                (3, Just(512..2048)),
2182                (1, Just(2048..8192)),
2183            ]);
2184        }
2185        let num_rows = Union::new_weighted(weights);
2186
2187        // TODO(parkmycar): There are so many clones going on here, and maybe we can avoid them?
2188        let strat = (any::<RelationDesc>(), num_rows)
2189            .prop_flat_map(|(desc, num_rows)| {
2190                arb_relation_desc_projection(desc.clone())
2191                    .prop_map(move |read_desc| (desc.clone(), read_desc, num_rows.clone()))
2192            })
2193            .prop_flat_map(|(desc, read_desc, num_rows)| {
2194                proptest::collection::vec(arb_source_data_for_relation_desc(&desc), num_rows)
2195                    .prop_map(move |datas| (desc.clone(), datas, read_desc.clone()))
2196            });
2197
2198        proptest!(|((config, (desc, source_datas, read_desc)) in (any::<EncodingConfig>(), strat))| {
2199            roundtrip_source_data(&desc, source_datas, &read_desc, &config);
2200        });
2201    }
2202
2203    #[mz_ore::test]
2204    fn roundtrip_error_nulls() {
2205        let desc = RelationDescBuilder::default()
2206            .with_column(
2207                "ts",
2208                ScalarType::TimestampTz { precision: None }.nullable(false),
2209            )
2210            .finish();
2211        let source_datas = vec![SourceData(Err(DataflowError::EvalError(
2212            EvalError::DateOutOfRange.into(),
2213        )))];
2214        let config = EncodingConfig::default();
2215        roundtrip_source_data(&desc, source_datas, &desc, &config);
2216    }
2217
2218    fn is_sorted(array: &dyn Array) -> bool {
2219        let sort_options = arrow::compute::SortOptions::default();
2220        let Ok(cmp) = make_comparator(array, array, sort_options) else {
2221            // TODO: arrow v51.0.0 doesn't support comparing structs. In v52+,
2222            // `build_compare` is deprecated and replaced by `make_comparator`,
2223            // which does support structs, so once we migrate this comparison
2224            // will work and we should switch this early return to an expect,
2225            // if possible.
2226            return false;
2227        };
2228        (0..array.len())
2229            .tuple_windows()
2230            .all(|(i, j)| cmp(i, j).is_le())
2231    }
2232
2233    fn get_data_type(schema: &impl Schema<SourceData>) -> arrow::datatypes::DataType {
2234        use mz_persist_types::columnar::ColumnEncoder;
2235        let array = Schema::encoder(schema).expect("valid schema").finish();
2236        Array::data_type(&array).clone()
2237    }
2238
2239    #[track_caller]
2240    fn backward_compatible_testcase(
2241        old: &RelationDesc,
2242        new: &RelationDesc,
2243        migration: Migration,
2244        datas: &[SourceData],
2245    ) {
2246        let mut encoder = Schema::<SourceData>::encoder(old).expect("valid schema");
2247        for data in datas {
2248            encoder.append(data);
2249        }
2250        let old = encoder.finish();
2251        let new = Schema::<SourceData>::encoder(new)
2252            .expect("valid schema")
2253            .finish();
2254        let old: Arc<dyn Array> = Arc::new(old);
2255        let new: Arc<dyn Array> = Arc::new(new);
2256        let migrated = migration.migrate(Arc::clone(&old));
2257        assert_eq!(migrated.data_type(), new.data_type());
2258
2259        // Check the sortedness preservation, if we can.
2260        if migration.preserves_order() && is_sorted(&old) {
2261            assert!(is_sorted(&migrated))
2262        }
2263    }
2264
2265    #[mz_ore::test]
2266    fn backward_compatible_empty_add_column() {
2267        let old = RelationDesc::empty();
2268        let new = RelationDesc::from_names_and_types([("a", ScalarType::Bool.nullable(true))]);
2269
2270        let old_data_type = get_data_type(&old);
2271        let new_data_type = get_data_type(&new);
2272
2273        let migration = backward_compatible(&old_data_type, &new_data_type);
2274        assert!(migration.is_some());
2275    }
2276
2277    #[mz_ore::test]
2278    fn backward_compatible_project_away_all() {
2279        let old = RelationDesc::from_names_and_types([("a", ScalarType::Bool.nullable(true))]);
2280        let new = RelationDesc::empty();
2281
2282        let old_data_type = get_data_type(&old);
2283        let new_data_type = get_data_type(&new);
2284
2285        let migration = backward_compatible(&old_data_type, &new_data_type);
2286        assert!(migration.is_some());
2287    }
2288
2289    #[mz_ore::test]
2290    #[cfg_attr(miri, ignore)]
2291    fn backward_compatible_migrate() {
2292        let strat = (any::<RelationDesc>(), any::<RelationDesc>()).prop_flat_map(|(old, new)| {
2293            proptest::collection::vec(arb_source_data_for_relation_desc(&old), 2)
2294                .prop_map(move |datas| (old.clone(), new.clone(), datas))
2295        });
2296
2297        proptest!(|((old, new, datas) in strat)| {
2298            let old_data_type = get_data_type(&old);
2299            let new_data_type = get_data_type(&new);
2300
2301            if let Some(migration) = backward_compatible(&old_data_type, &new_data_type) {
2302                backward_compatible_testcase(&old, &new, migration, &datas);
2303            };
2304        });
2305    }
2306
2307    #[mz_ore::test]
2308    #[cfg_attr(miri, ignore)]
2309    fn backward_compatible_migrate_from_common() {
2310        use mz_repr::ColumnType;
2311        fn test_case(old: RelationDesc, diffs: Vec<PropRelationDescDiff>, datas: Vec<SourceData>) {
2312            // TODO(parkmycar): As we iterate on schema migrations more things should become compatible.
2313            let should_be_compatible = diffs.iter().all(|diff| match diff {
2314                // We only support adding nullable columns.
2315                PropRelationDescDiff::AddColumn {
2316                    typ: ColumnType { nullable, .. },
2317                    ..
2318                } => *nullable,
2319                PropRelationDescDiff::DropColumn { .. } => true,
2320                _ => false,
2321            });
2322
2323            let mut new = old.clone();
2324            for diff in diffs.into_iter() {
2325                diff.apply(&mut new)
2326            }
2327
2328            let old_data_type = get_data_type(&old);
2329            let new_data_type = get_data_type(&new);
2330
2331            if let Some(migration) = backward_compatible(&old_data_type, &new_data_type) {
2332                backward_compatible_testcase(&old, &new, migration, &datas);
2333            } else if should_be_compatible {
2334                panic!("new DataType was not compatible when it should have been!");
2335            }
2336        }
2337
2338        let strat = any::<RelationDesc>()
2339            .prop_flat_map(|desc| {
2340                proptest::collection::vec(arb_source_data_for_relation_desc(&desc), 2)
2341                    .no_shrink()
2342                    .prop_map(move |datas| (desc.clone(), datas))
2343            })
2344            .prop_flat_map(|(desc, datas)| {
2345                arb_relation_desc_diff(&desc)
2346                    .prop_map(move |diffs| (desc.clone(), diffs, datas.clone()))
2347            });
2348
2349        proptest!(|((old, diffs, datas) in strat)| {
2350            test_case(old, diffs, datas);
2351        });
2352    }
2353
2354    #[mz_ore::test]
2355    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decContextDefault` on OS `linux`
2356    fn empty_relation_desc_roundtrips() {
2357        let empty = RelationDesc::empty();
2358        let rows = proptest::collection::vec(arb_source_data_for_relation_desc(&empty), 0..8)
2359            .prop_map(move |datas| (empty.clone(), datas));
2360
2361        // Note: This case should be covered by the `all_source_data_roundtrips` test above, but
2362        // it's a special case that we explicitly want to exercise.
2363        proptest!(|((config, (desc, source_datas)) in (any::<EncodingConfig>(), rows))| {
2364            roundtrip_source_data(&desc, source_datas, &desc, &config);
2365        });
2366    }
2367
2368    #[mz_ore::test]
2369    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decContextDefault` on OS `linux`
2370    fn arrow_datatype_consistent() {
2371        fn test_case(desc: RelationDesc, datas: Vec<SourceData>) {
2372            let half = datas.len() / 2;
2373
2374            let mut encoder_a = <RelationDesc as Schema<SourceData>>::encoder(&desc).unwrap();
2375            for data in &datas[..half] {
2376                encoder_a.append(data);
2377            }
2378            let col_a = encoder_a.finish();
2379
2380            let mut encoder_b = <RelationDesc as Schema<SourceData>>::encoder(&desc).unwrap();
2381            for data in &datas[half..] {
2382                encoder_b.append(data);
2383            }
2384            let col_b = encoder_b.finish();
2385
2386            // The DataType of the resulting column should not change based on what data was
2387            // encoded.
2388            assert_eq!(col_a.data_type(), col_b.data_type());
2389        }
2390
2391        let num_rows = 12;
2392        let strat = any::<RelationDesc>().prop_flat_map(|desc| {
2393            proptest::collection::vec(arb_source_data_for_relation_desc(&desc), num_rows)
2394                .prop_map(move |datas| (desc.clone(), datas))
2395        });
2396
2397        proptest!(|((desc, data) in strat)| {
2398            test_case(desc, data);
2399        });
2400    }
2401
2402    #[mz_ore::test]
2403    #[cfg_attr(miri, ignore)] // too slow
2404    fn source_proto_serialization_stability() {
2405        let min_protos = 10;
2406        let encoded = include_str!("snapshots/source-datas.txt");
2407
2408        // Decode the pre-generated source datas
2409        let mut decoded: Vec<(RelationDesc, SourceData)> = encoded
2410            .lines()
2411            .map(|s| {
2412                let (desc, data) = s.split_once(',').expect("comma separated data");
2413                let desc = base64::engine::general_purpose::STANDARD
2414                    .decode(desc)
2415                    .expect("valid base64");
2416                let data = base64::engine::general_purpose::STANDARD
2417                    .decode(data)
2418                    .expect("valid base64");
2419                (desc, data)
2420            })
2421            .map(|(desc, data)| {
2422                let desc = ProtoRelationDesc::decode(&desc[..]).expect("valid proto");
2423                let desc = desc.into_rust().expect("valid proto");
2424                let data = SourceData::decode(&data, &desc).expect("valid proto");
2425                (desc, data)
2426            })
2427            .collect();
2428
2429        // If there are fewer than the minimum examples, generate some new ones arbitrarily
2430        let mut runner = proptest::test_runner::TestRunner::deterministic();
2431        let strategy = RelationDesc::arbitrary().prop_flat_map(|desc| {
2432            arb_source_data_for_relation_desc(&desc).prop_map(move |data| (desc.clone(), data))
2433        });
2434        while decoded.len() < min_protos {
2435            let arbitrary_data = strategy
2436                .new_tree(&mut runner)
2437                .expect("source data")
2438                .current();
2439            decoded.push(arbitrary_data);
2440        }
2441
2442        // Reencode and compare the strings
2443        let mut reencoded = String::new();
2444        let mut buf = vec![];
2445        for (desc, data) in decoded {
2446            buf.clear();
2447            desc.into_proto().encode(&mut buf).expect("success");
2448            base64::engine::general_purpose::STANDARD.encode_string(buf.as_slice(), &mut reencoded);
2449            reencoded.push(',');
2450
2451            buf.clear();
2452            data.encode(&mut buf);
2453            base64::engine::general_purpose::STANDARD.encode_string(buf.as_slice(), &mut reencoded);
2454            reencoded.push('\n');
2455        }
2456
2457        // Optimizations in Persist, particularly consolidation on read,
2458        // depend on a stable serialization for the serialized data.
2459        // For example, reordering proto fields could cause us
2460        // to generate a different (equivalent) serialization for a record,
2461        // and the two versions would not consolidate out.
2462        // This can impact correctness!
2463        //
2464        // If you need to change how SourceDatas are encoded, that's still fine...
2465        // but we'll also need to increase
2466        // the MINIMUM_CONSOLIDATED_VERSION as part of the same release.
2467        assert_eq!(
2468            encoded,
2469            reencoded.as_str(),
2470            "SourceData serde should be stable"
2471        )
2472    }
2473}