// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Types and traits related to the introduction of changing collections into `dataflow`.

use std::collections::BTreeMap;
use std::fmt::Debug;
use std::hash::Hash;
use std::ops::{Add, AddAssign, Deref, DerefMut};
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;

use arrow::array::{Array, ArrayRef, BinaryArray, BinaryBuilder, NullArray, StructArray};
use arrow::datatypes::{Field, Fields};
use bytes::{BufMut, Bytes};
use columnation::Columnation;
use itertools::EitherOrBoth::Both;
use itertools::Itertools;
use kafka::KafkaSourceExportDetails;
use load_generator::{LoadGeneratorOutput, LoadGeneratorSourceExportDetails};
use mz_ore::assert_none;
use mz_persist_types::Codec;
use mz_persist_types::arrow::ArrayOrd;
use mz_persist_types::columnar::{ColumnDecoder, ColumnEncoder, Schema};
use mz_persist_types::stats::{
    ColumnNullStats, ColumnStatKinds, ColumnarStats, ColumnarStatsBuilder, PrimitiveStats,
    StructStats,
};
use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
use mz_repr::{
    CatalogItemId, Datum, GlobalId, ProtoRelationDesc, ProtoRow, RelationDesc, Row,
    RowColumnarDecoder, RowColumnarEncoder, arb_row_for_relation,
};
use mz_sql_parser::ast::{Ident, IdentError, UnresolvedItemName};
use proptest::prelude::any;
use proptest::strategy::Strategy;
use prost::Message;
use serde::{Deserialize, Serialize};
use timely::order::{PartialOrder, TotalOrder};
use timely::progress::timestamp::Refines;
use timely::progress::{PathSummary, Timestamp};

use crate::AlterCompatible;
use crate::connections::inline::{
    ConnectionAccess, ConnectionResolver, InlinedConnection, IntoInlineConnection,
    ReferencedConnection,
};
use crate::controller::AlterError;
use crate::errors::{DataflowError, ProtoDataflowError};
use crate::instances::StorageInstanceId;
use crate::sources::sql_server::SqlServerSourceExportDetails;

pub mod casts;
pub mod encoding;
pub mod envelope;
pub mod kafka;
pub mod load_generator;
pub mod mysql;
pub mod postgres;
pub mod sql_server;

pub use crate::sources::envelope::SourceEnvelope;
pub use crate::sources::kafka::KafkaSourceConnection;
pub use crate::sources::load_generator::LoadGeneratorSourceConnection;
pub use crate::sources::mysql::{MySqlSourceConnection, MySqlSourceExportDetails};
pub use crate::sources::postgres::{PostgresSourceConnection, PostgresSourceExportDetails};
pub use crate::sources::sql_server::{SqlServerSourceConnection, SqlServerSourceExtras};

include!(concat!(env!("OUT_DIR"), "/mz_storage_types.sources.rs"));

/// A description of a source ingestion
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct IngestionDescription<S: 'static = (), C: ConnectionAccess = InlinedConnection> {
    /// The source description.
    pub desc: SourceDesc<C>,
    /// Collections to be exported by this ingestion.
    ///
    /// # Notes
    /// - For multi-output sources:
    ///     - Add exports by adding a new [`SourceExport`].
    ///     - Remove exports by removing the [`SourceExport`].
    ///
    ///   Re-rendering/executing the source after making these modifications
    ///   adds and drops the subsource, respectively.
    /// - This field includes the primary source's ID, which might need to be
    ///   filtered out to understand which exports are explicit ingestion exports.
    /// - This field does _not_ include the remap collection, which is tracked
    ///   in its own field.
    pub source_exports: BTreeMap<GlobalId, SourceExport<S>>,
    /// The ID of the instance in which to install the source.
    pub instance_id: StorageInstanceId,
    /// The ID of this ingestion's remap/progress collection.
    pub remap_collection_id: GlobalId,
    /// The storage metadata for the remap/progress collection
    pub remap_metadata: S,
}

impl IngestionDescription {
    pub fn new(
        desc: SourceDesc,
        instance_id: StorageInstanceId,
        remap_collection_id: GlobalId,
    ) -> Self {
        Self {
            desc,
            remap_metadata: (),
            source_exports: BTreeMap::new(),
            instance_id,
            remap_collection_id,
        }
    }
}

impl<S> IngestionDescription<S> {
    /// Return an iterator over the `GlobalId`s of `self`'s collections.
    /// This will contain ids for the remap collection, subsources,
    /// tables for this source, and the primary collection ID, even if
    /// no data will be exported to the primary collection.
    pub fn collection_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
        // Expand self so that any new fields added generate a compiler error to
        // increase the likelihood of developers seeing this function.
        let IngestionDescription {
            desc: _,
            remap_metadata: _,
            source_exports,
            instance_id: _,
            remap_collection_id,
        } = &self;

        source_exports
            .keys()
            .copied()
            .chain(std::iter::once(*remap_collection_id))
    }
}
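
// For illustration (a sketch, given some `ingestion: IngestionDescription`):
// the iterator yields every export's id plus the remap collection's id.
//
//     let ids: Vec<GlobalId> = ingestion.collection_ids().collect();
//     assert!(ids.contains(&ingestion.remap_collection_id));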

impl<S: Debug + Eq + PartialEq + AlterCompatible> AlterCompatible for IngestionDescription<S> {
    fn alter_compatible(
        &self,
        id: GlobalId,
        other: &IngestionDescription<S>,
    ) -> Result<(), AlterError> {
        if self == other {
            return Ok(());
        }
        let IngestionDescription {
            desc,
            remap_metadata,
            source_exports,
            instance_id,
            remap_collection_id,
        } = self;

        let compatibility_checks = [
            (desc.alter_compatible(id, &other.desc).is_ok(), "desc"),
            (remap_metadata == &other.remap_metadata, "remap_metadata"),
            (
                source_exports
                    .iter()
                    .merge_join_by(&other.source_exports, |(l_key, _), (r_key, _)| {
                        l_key.cmp(r_key)
                    })
                    .all(|r| match r {
                        Both(
                            (
                                _,
                                SourceExport {
                                    storage_metadata: l_metadata,
                                    details: l_details,
                                    data_config: l_data_config,
                                },
                            ),
                            (
                                _,
                                SourceExport {
                                    storage_metadata: r_metadata,
                                    details: r_details,
                                    data_config: r_data_config,
                                },
                            ),
                        ) => {
                            l_metadata.alter_compatible(id, r_metadata).is_ok()
                                && l_details.alter_compatible(id, r_details).is_ok()
                                && l_data_config.alter_compatible(id, r_data_config).is_ok()
                        }
                        _ => true,
                    }),
                "source_exports",
            ),
            (instance_id == &other.instance_id, "instance_id"),
            (
                remap_collection_id == &other.remap_collection_id,
                "remap_collection_id",
            ),
        ];
        for (compatible, field) in compatibility_checks {
            if !compatible {
                tracing::warn!(
                    "IngestionDescription incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
                    self,
                    other
                );

                return Err(AlterError { id });
            }
        }

        Ok(())
    }
}

impl<R: ConnectionResolver> IntoInlineConnection<IngestionDescription, R>
    for IngestionDescription<(), ReferencedConnection>
{
    fn into_inline_connection(self, r: R) -> IngestionDescription {
        let IngestionDescription {
            desc,
            remap_metadata,
            source_exports,
            instance_id,
            remap_collection_id,
        } = self;

        IngestionDescription {
            desc: desc.into_inline_connection(r),
            remap_metadata,
            source_exports,
            instance_id,
            remap_collection_id,
        }
    }
}

#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct SourceExport<S = (), C: ConnectionAccess = InlinedConnection> {
    /// The collection metadata needed to write the exported data
    pub storage_metadata: S,
    /// Details necessary for the source to export data to this export's collection.
    pub details: SourceExportDetails,
    /// Config necessary to handle (e.g. decode and envelope) the data for this export.
    pub data_config: SourceExportDataConfig<C>,
}

pub trait SourceTimestamp:
    Timestamp + Columnation + Refines<()> + std::fmt::Display + Sync
{
    fn encode_row(&self) -> Row;
    fn decode_row(row: &Row) -> Self;
}

impl SourceTimestamp for MzOffset {
    fn encode_row(&self) -> Row {
        Row::pack([Datum::UInt64(self.offset)])
    }

    fn decode_row(row: &Row) -> Self {
        let mut datums = row.iter();
        match (datums.next(), datums.next()) {
            (Some(Datum::UInt64(offset)), None) => MzOffset::from(offset),
            _ => panic!("invalid row {row:?}"),
        }
    }
}
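
// A sketch of the `SourceTimestamp` round-trip for `MzOffset`, using the
// impl above:
//
//     let ts = MzOffset::from(42);
//     let row = ts.encode_row(); // a single `Datum::UInt64`
//     assert_eq!(MzOffset::decode_row(&row), ts);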

/// Universal language for describing message positions in Materialize, in a
/// source-independent way. Individual sources like Kafka or file sources should
/// explicitly implement their own offset type that converts to/from `MzOffset`.
/// A 0-`MzOffset` denotes an empty stream.
#[derive(
    Copy,
    Clone,
    Default,
    Debug,
    PartialEq,
    PartialOrd,
    Eq,
    Ord,
    Hash,
    Serialize,
    Deserialize
)]
pub struct MzOffset {
    pub offset: u64,
}

impl differential_dataflow::difference::Semigroup for MzOffset {
    fn plus_equals(&mut self, rhs: &Self) {
        self.offset.plus_equals(&rhs.offset)
    }
}

impl differential_dataflow::difference::IsZero for MzOffset {
    fn is_zero(&self) -> bool {
        self.offset.is_zero()
    }
}

impl mz_persist_types::Codec64 for MzOffset {
    fn codec_name() -> String {
        "MzOffset".to_string()
    }

    fn encode(&self) -> [u8; 8] {
        mz_persist_types::Codec64::encode(&self.offset)
    }

    fn decode(buf: [u8; 8]) -> Self {
        Self {
            offset: mz_persist_types::Codec64::decode(buf),
        }
    }
}

impl columnation::Columnation for MzOffset {
    type InnerRegion = columnation::CopyRegion<MzOffset>;
}

impl MzOffset {
    pub fn checked_sub(self, other: Self) -> Option<Self> {
        self.offset
            .checked_sub(other.offset)
            .map(|offset| Self { offset })
    }
}

/// Convert from a raw `u64` offset to an `MzOffset`.
impl From<u64> for MzOffset {
    fn from(offset: u64) -> Self {
        Self { offset }
    }
}

impl std::fmt::Display for MzOffset {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.offset)
    }
}

// Assume overflow does not occur for addition
impl Add<u64> for MzOffset {
    type Output = MzOffset;

    fn add(self, x: u64) -> MzOffset {
        MzOffset {
            offset: self.offset + x,
        }
    }
}

impl Add<Self> for MzOffset {
    type Output = Self;

    fn add(self, x: Self) -> Self {
        MzOffset {
            offset: self.offset + x.offset,
        }
    }
}

impl AddAssign<u64> for MzOffset {
    fn add_assign(&mut self, x: u64) {
        self.offset += x;
    }
}

impl AddAssign<Self> for MzOffset {
    fn add_assign(&mut self, x: Self) {
        self.offset += x.offset;
    }
}
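
// For illustration, offset arithmetic is plain `u64` addition (overflow is
// assumed not to occur, per the note above):
//
//     let mut off = MzOffset::from(5);
//     off += 3u64;
//     assert_eq!(off + MzOffset::from(2), MzOffset::from(10));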

/// Convert from `PgLsn` to `MzOffset`.
impl From<tokio_postgres::types::PgLsn> for MzOffset {
    fn from(lsn: tokio_postgres::types::PgLsn) -> Self {
        MzOffset { offset: lsn.into() }
    }
}

impl Timestamp for MzOffset {
    type Summary = MzOffset;

    fn minimum() -> Self {
        MzOffset {
            offset: Timestamp::minimum(),
        }
    }
}

impl PathSummary<MzOffset> for MzOffset {
    fn results_in(&self, src: &MzOffset) -> Option<MzOffset> {
        Some(MzOffset {
            offset: self.offset.results_in(&src.offset)?,
        })
    }

    fn followed_by(&self, other: &Self) -> Option<Self> {
        Some(MzOffset {
            offset: PathSummary::<u64>::followed_by(&self.offset, &other.offset)?,
        })
    }
}
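
// A sketch of how path summaries compose: applying a summary of 3 to an
// offset of 4 advances it to 7 (checked addition on the inner `u64`):
//
//     let summary = MzOffset::from(3);
//     assert_eq!(summary.results_in(&MzOffset::from(4)), Some(MzOffset::from(7)));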

impl Refines<()> for MzOffset {
    fn to_inner(_: ()) -> Self {
        MzOffset::minimum()
    }
    fn to_outer(self) {}
    fn summarize(_: Self::Summary) {}
}

impl PartialOrder for MzOffset {
    #[inline]
    fn less_equal(&self, other: &Self) -> bool {
        self.offset.less_equal(&other.offset)
    }
}

impl TotalOrder for MzOffset {}

/// The meaning of the timestamp number produced by data sources. This type
/// is not concerned with the source of the timestamp (like if the data came
/// from a Debezium consistency topic or a CDCv2 stream), instead only what the
/// timestamp number means.
///
/// Some variants here have attached data used to differentiate incomparable
/// instantiations. These attached data types should be expanded in the future
/// if we need to tell apart more kinds of sources.
#[derive(
    Clone,
    Debug,
    Ord,
    PartialOrd,
    Eq,
    PartialEq,
    Serialize,
    Deserialize,
    Hash
)]
pub enum Timeline {
    /// EpochMilliseconds means the timestamp is the number of milliseconds since
    /// the Unix epoch.
    EpochMilliseconds,
    /// External means the timestamp comes from an external data source and we
    /// don't know what the number means. The attached String is the source's name,
    /// which will result in different sources being incomparable.
    External(String),
    /// User means the user has manually specified a timeline. The attached
    /// String is specified by the user, allowing them to decide sources that are
    /// joinable.
    User(String),
}

impl Timeline {
    const EPOCH_MILLISECOND_ID_CHAR: char = 'M';
    const EXTERNAL_ID_CHAR: char = 'E';
    const USER_ID_CHAR: char = 'U';

    fn id_char(&self) -> char {
        match self {
            Self::EpochMilliseconds => Self::EPOCH_MILLISECOND_ID_CHAR,
            Self::External(_) => Self::EXTERNAL_ID_CHAR,
            Self::User(_) => Self::USER_ID_CHAR,
        }
    }
}

impl ToString for Timeline {
    fn to_string(&self) -> String {
        match self {
            Self::EpochMilliseconds => format!("{}", self.id_char()),
            Self::External(id) => format!("{}.{id}", self.id_char()),
            Self::User(id) => format!("{}.{id}", self.id_char()),
        }
    }
}

impl FromStr for Timeline {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        if s.is_empty() {
            return Err("empty timeline".to_string());
        }
        let mut chars = s.chars();
        match chars.next().expect("non-empty string") {
            Self::EPOCH_MILLISECOND_ID_CHAR => match chars.next() {
                None => Ok(Self::EpochMilliseconds),
                Some(_) => Err(format!("unknown timeline: {s}")),
            },
            Self::EXTERNAL_ID_CHAR => match chars.next() {
                Some('.') => Ok(Self::External(chars.as_str().to_string())),
                _ => Err(format!("unknown timeline: {s}")),
            },
            Self::USER_ID_CHAR => match chars.next() {
                Some('.') => Ok(Self::User(chars.as_str().to_string())),
                _ => Err(format!("unknown timeline: {s}")),
            },
            _ => Err(format!("unknown timeline: {s}")),
        }
    }
}
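
// A sketch of the string encoding round-trip defined above: a one-character
// tag, followed by `.name` for the variants that carry a name.
//
//     assert_eq!(Timeline::EpochMilliseconds.to_string(), "M");
//     assert_eq!(Timeline::User("app".into()).to_string(), "U.app");
//     assert_eq!("E.kafka".parse(), Ok(Timeline::External("kafka".into())));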

/// A connection to an external system
pub trait SourceConnection: Debug + Clone + PartialEq + AlterCompatible {
    /// The name of the external system (e.g. kafka, postgres, etc).
    fn name(&self) -> &'static str;

    /// The name of the resource in the external system (e.g. a kafka topic), if any.
    fn external_reference(&self) -> Option<&str>;

    /// Defines the key schema to use by default for this source connection type.
    /// This will be used for the primary export of the source and as the default
    /// pre-encoding key schema for the source.
    fn default_key_desc(&self) -> RelationDesc;

    /// Defines the value schema to use by default for this source connection type.
    /// This will be used for the primary export of the source and as the default
    /// pre-encoding value schema for the source.
    fn default_value_desc(&self) -> RelationDesc;

    /// The schema of this connection's timestamp type. This will also be the schema of the
    /// progress relation.
    fn timestamp_desc(&self) -> RelationDesc;

    /// The id of the connection object (i.e. the one obtained from running `CREATE CONNECTION`) in
    /// the catalog, if any.
    fn connection_id(&self) -> Option<CatalogItemId>;

    /// Whether the source type supports read-only mode.
    fn supports_read_only(&self) -> bool;

    /// Whether the source type prefers to run on only one replica of a multi-replica cluster.
    fn prefers_single_replica(&self) -> bool;
}
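
// A sketch of consuming the trait generically (the helper below is
// hypothetical, not part of this crate):
//
//     fn describe<C: SourceConnection>(c: &C) -> String {
//         match c.external_reference() {
//             Some(r) => format!("{} source reading {}", c.name(), r),
//             None => format!("{} source", c.name()),
//         }
//     }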

#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum Compression {
    Gzip,
    None,
}

/// Defines the configuration for how to handle data that is exported for a given
/// Source Export.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct SourceExportDataConfig<C: ConnectionAccess = InlinedConnection> {
    pub encoding: Option<encoding::SourceDataEncoding<C>>,
    pub envelope: SourceEnvelope,
}

impl<R: ConnectionResolver> IntoInlineConnection<SourceExportDataConfig, R>
    for SourceExportDataConfig<ReferencedConnection>
{
    fn into_inline_connection(self, r: R) -> SourceExportDataConfig {
        let SourceExportDataConfig { encoding, envelope } = self;

        SourceExportDataConfig {
            encoding: encoding.map(|e| e.into_inline_connection(r)),
            envelope,
        }
    }
}

impl<C: ConnectionAccess> AlterCompatible for SourceExportDataConfig<C> {
    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
        if self == other {
            return Ok(());
        }
        let Self { encoding, envelope } = &self;

        let compatibility_checks = [
            (
                match (encoding, &other.encoding) {
                    (Some(s), Some(o)) => s.alter_compatible(id, o).is_ok(),
                    (s, o) => s == o,
                },
                "encoding",
            ),
            (envelope == &other.envelope, "envelope"),
        ];

        for (compatible, field) in compatibility_checks {
            if !compatible {
                tracing::warn!(
                    "SourceExportDataConfig incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
                    self,
                    other
                );

                return Err(AlterError { id });
            }
        }
        Ok(())
    }
}

impl<C: ConnectionAccess> SourceExportDataConfig<C> {
    /// Returns `true` if this connection yields data that is
    /// append-only/monotonic. Append-only means the source
    /// never produces retractions.
    // TODO(guswynn): consider enforcing this more completely at the
    // parsing/typechecking level, by not using an `envelope`
    // for sources like pg
    pub fn monotonic(&self, connection: &GenericSourceConnection<C>) -> bool {
        match &self.envelope {
            // Upsert and CdcV2 may produce retractions.
            SourceEnvelope::Upsert(_) | SourceEnvelope::CdcV2 => false,
            SourceEnvelope::None(_) => {
                match connection {
                    // Postgres can produce retractions (deletes).
                    GenericSourceConnection::Postgres(_) => false,
                    // MySQL can produce retractions (deletes).
                    GenericSourceConnection::MySql(_) => false,
                    // SQL Server can produce retractions (deletes).
                    GenericSourceConnection::SqlServer(_) => false,
                    // Whether or not a Loadgen source can produce retractions varies.
                    GenericSourceConnection::LoadGenerator(g) => g.load_generator.is_monotonic(),
                    // Kafka exports with `None` envelope are append-only.
                    GenericSourceConnection::Kafka(_) => true,
                }
            }
        }
    }
}
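
// For illustration (a sketch, assuming an export's `data_config` whose
// envelope is `SourceEnvelope::None` and an inlined Kafka connection
// `kafka_conn`): Kafka with the `None` envelope never retracts, so
//
//     assert!(data_config.monotonic(&GenericSourceConnection::Kafka(kafka_conn)));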

/// An external source of updates for a relational collection.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct SourceDesc<C: ConnectionAccess = InlinedConnection> {
    pub connection: GenericSourceConnection<C>,
    pub timestamp_interval: Duration,
}

impl<R: ConnectionResolver> IntoInlineConnection<SourceDesc, R>
    for SourceDesc<ReferencedConnection>
{
    fn into_inline_connection(self, r: R) -> SourceDesc {
        let SourceDesc {
            connection,
            timestamp_interval,
        } = self;

        SourceDesc {
            connection: connection.into_inline_connection(&r),
            timestamp_interval,
        }
    }
}

impl<C: ConnectionAccess> AlterCompatible for SourceDesc<C> {
    /// Determines if `self` is compatible with another `SourceDesc`, in such a
    /// way that it is possible to turn `self` into `other` through a valid
    /// series of transformations (e.g. no transformation or `ALTER SOURCE`).
    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
        if self == other {
            return Ok(());
        }
        let Self {
            connection,
            // timestamp_interval is allowed to change via ALTER SOURCE
            timestamp_interval: _,
        } = &self;

        let compatibility_checks = [(
            connection.alter_compatible(id, &other.connection).is_ok(),
            "connection",
        )];

        for (compatible, field) in compatibility_checks {
            if !compatible {
                tracing::warn!(
                    "SourceDesc incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
                    self,
                    other
                );

                return Err(AlterError { id });
            }
        }

        Ok(())
    }
}

#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum GenericSourceConnection<C: ConnectionAccess = InlinedConnection> {
    Kafka(KafkaSourceConnection<C>),
    Postgres(PostgresSourceConnection<C>),
    MySql(MySqlSourceConnection<C>),
    SqlServer(SqlServerSourceConnection<C>),
    LoadGenerator(LoadGeneratorSourceConnection),
}

impl<C: ConnectionAccess> From<KafkaSourceConnection<C>> for GenericSourceConnection<C> {
    fn from(conn: KafkaSourceConnection<C>) -> Self {
        Self::Kafka(conn)
    }
}

impl<C: ConnectionAccess> From<PostgresSourceConnection<C>> for GenericSourceConnection<C> {
    fn from(conn: PostgresSourceConnection<C>) -> Self {
        Self::Postgres(conn)
    }
}

impl<C: ConnectionAccess> From<MySqlSourceConnection<C>> for GenericSourceConnection<C> {
    fn from(conn: MySqlSourceConnection<C>) -> Self {
        Self::MySql(conn)
    }
}

impl<C: ConnectionAccess> From<SqlServerSourceConnection<C>> for GenericSourceConnection<C> {
    fn from(conn: SqlServerSourceConnection<C>) -> Self {
        Self::SqlServer(conn)
    }
}

impl<C: ConnectionAccess> From<LoadGeneratorSourceConnection> for GenericSourceConnection<C> {
    fn from(conn: LoadGeneratorSourceConnection) -> Self {
        Self::LoadGenerator(conn)
    }
}

impl<R: ConnectionResolver> IntoInlineConnection<GenericSourceConnection, R>
    for GenericSourceConnection<ReferencedConnection>
{
    fn into_inline_connection(self, r: R) -> GenericSourceConnection {
        match self {
            GenericSourceConnection::Kafka(kafka) => {
                GenericSourceConnection::Kafka(kafka.into_inline_connection(r))
            }
            GenericSourceConnection::Postgres(pg) => {
                GenericSourceConnection::Postgres(pg.into_inline_connection(r))
            }
            GenericSourceConnection::MySql(mysql) => {
                GenericSourceConnection::MySql(mysql.into_inline_connection(r))
            }
            GenericSourceConnection::SqlServer(sql_server) => {
                GenericSourceConnection::SqlServer(sql_server.into_inline_connection(r))
            }
            GenericSourceConnection::LoadGenerator(lg) => {
                GenericSourceConnection::LoadGenerator(lg)
            }
        }
    }
}

impl<C: ConnectionAccess> SourceConnection for GenericSourceConnection<C> {
    fn name(&self) -> &'static str {
        match self {
            Self::Kafka(conn) => conn.name(),
            Self::Postgres(conn) => conn.name(),
            Self::MySql(conn) => conn.name(),
            Self::SqlServer(conn) => conn.name(),
            Self::LoadGenerator(conn) => conn.name(),
        }
    }

    fn external_reference(&self) -> Option<&str> {
        match self {
            Self::Kafka(conn) => conn.external_reference(),
            Self::Postgres(conn) => conn.external_reference(),
            Self::MySql(conn) => conn.external_reference(),
            Self::SqlServer(conn) => conn.external_reference(),
            Self::LoadGenerator(conn) => conn.external_reference(),
        }
    }

    fn default_key_desc(&self) -> RelationDesc {
        match self {
            Self::Kafka(conn) => conn.default_key_desc(),
            Self::Postgres(conn) => conn.default_key_desc(),
            Self::MySql(conn) => conn.default_key_desc(),
            Self::SqlServer(conn) => conn.default_key_desc(),
            Self::LoadGenerator(conn) => conn.default_key_desc(),
        }
    }

    fn default_value_desc(&self) -> RelationDesc {
        match self {
            Self::Kafka(conn) => conn.default_value_desc(),
            Self::Postgres(conn) => conn.default_value_desc(),
            Self::MySql(conn) => conn.default_value_desc(),
            Self::SqlServer(conn) => conn.default_value_desc(),
            Self::LoadGenerator(conn) => conn.default_value_desc(),
        }
    }

    fn timestamp_desc(&self) -> RelationDesc {
        match self {
            Self::Kafka(conn) => conn.timestamp_desc(),
            Self::Postgres(conn) => conn.timestamp_desc(),
            Self::MySql(conn) => conn.timestamp_desc(),
            Self::SqlServer(conn) => conn.timestamp_desc(),
            Self::LoadGenerator(conn) => conn.timestamp_desc(),
        }
    }

    fn connection_id(&self) -> Option<CatalogItemId> {
        match self {
            Self::Kafka(conn) => conn.connection_id(),
            Self::Postgres(conn) => conn.connection_id(),
            Self::MySql(conn) => conn.connection_id(),
            Self::SqlServer(conn) => conn.connection_id(),
            Self::LoadGenerator(conn) => conn.connection_id(),
        }
    }

    fn supports_read_only(&self) -> bool {
        match self {
            Self::Kafka(conn) => conn.supports_read_only(),
            Self::Postgres(conn) => conn.supports_read_only(),
            Self::MySql(conn) => conn.supports_read_only(),
            Self::SqlServer(conn) => conn.supports_read_only(),
            Self::LoadGenerator(conn) => conn.supports_read_only(),
        }
    }

    fn prefers_single_replica(&self) -> bool {
        match self {
            Self::Kafka(conn) => conn.prefers_single_replica(),
            Self::Postgres(conn) => conn.prefers_single_replica(),
            Self::MySql(conn) => conn.prefers_single_replica(),
            Self::SqlServer(conn) => conn.prefers_single_replica(),
            Self::LoadGenerator(conn) => conn.prefers_single_replica(),
        }
    }
}

impl<C: ConnectionAccess> crate::AlterCompatible for GenericSourceConnection<C> {
    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
        if self == other {
            return Ok(());
        }
        let r = match (self, other) {
            (Self::Kafka(conn), Self::Kafka(other)) => conn.alter_compatible(id, other),
            (Self::Postgres(conn), Self::Postgres(other)) => conn.alter_compatible(id, other),
            (Self::MySql(conn), Self::MySql(other)) => conn.alter_compatible(id, other),
            (Self::SqlServer(conn), Self::SqlServer(other)) => conn.alter_compatible(id, other),
            (Self::LoadGenerator(conn), Self::LoadGenerator(other)) => {
                conn.alter_compatible(id, other)
            }
            _ => Err(AlterError { id }),
        };

        if r.is_err() {
            tracing::warn!(
                "GenericSourceConnection incompatible:\nself:\n{:#?}\n\nother\n{:#?}",
                self,
                other
            );
        }

        r
    }
}

/// Details necessary for each source export to allow the source implementations
/// to export data to the export's collection.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum SourceExportDetails {
    /// Used when the primary collection of a source is not an export that
    /// data is output to.
    None,
    Kafka(KafkaSourceExportDetails),
    Postgres(PostgresSourceExportDetails),
    MySql(MySqlSourceExportDetails),
    SqlServer(SqlServerSourceExportDetails),
    LoadGenerator(LoadGeneratorSourceExportDetails),
}

impl crate::AlterCompatible for SourceExportDetails {
    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
        if self == other {
            return Ok(());
        }
        let r = match (self, other) {
            (Self::None, Self::None) => Ok(()),
            (Self::Kafka(s), Self::Kafka(o)) => s.alter_compatible(id, o),
            (Self::Postgres(s), Self::Postgres(o)) => s.alter_compatible(id, o),
            (Self::MySql(s), Self::MySql(o)) => s.alter_compatible(id, o),
            (Self::LoadGenerator(s), Self::LoadGenerator(o)) => s.alter_compatible(id, o),
            _ => Err(AlterError { id }),
        };

        if r.is_err() {
            tracing::warn!(
                "SourceExportDetails incompatible:\nself:\n{:#?}\n\nother\n{:#?}",
                self,
                other
            );
        }

        r
    }
}

/// Details necessary to store in the `Details` option of a source export
/// statement (`CREATE SUBSOURCE` and `CREATE TABLE .. FROM SOURCE` statements),
/// to generate the appropriate `SourceExportDetails` struct during planning.
/// NOTE that this is serialized as proto to the catalog, so any changes here
/// must be backwards compatible or will require a migration.
pub enum SourceExportStatementDetails {
    Postgres {
        table: mz_postgres_util::desc::PostgresTableDesc,
    },
    MySql {
        table: mz_mysql_util::MySqlTableDesc,
        initial_gtid_set: String,
    },
    SqlServer {
        table: mz_sql_server_util::desc::SqlServerTableDesc,
        capture_instance: Arc<str>,
        initial_lsn: mz_sql_server_util::cdc::Lsn,
    },
    LoadGenerator {
        output: LoadGeneratorOutput,
    },
    Kafka {},
}

impl RustType<ProtoSourceExportStatementDetails> for SourceExportStatementDetails {
    fn into_proto(&self) -> ProtoSourceExportStatementDetails {
        match self {
            SourceExportStatementDetails::Postgres { table } => ProtoSourceExportStatementDetails {
                kind: Some(proto_source_export_statement_details::Kind::Postgres(
                    postgres::ProtoPostgresSourceExportStatementDetails {
                        table: Some(table.into_proto()),
                    },
                )),
            },
            SourceExportStatementDetails::MySql {
                table,
                initial_gtid_set,
            } => ProtoSourceExportStatementDetails {
                kind: Some(proto_source_export_statement_details::Kind::Mysql(
                    mysql::ProtoMySqlSourceExportStatementDetails {
                        table: Some(table.into_proto()),
                        initial_gtid_set: initial_gtid_set.clone(),
                    },
                )),
            },
            SourceExportStatementDetails::SqlServer {
                table,
                capture_instance,
                initial_lsn,
            } => ProtoSourceExportStatementDetails {
                kind: Some(proto_source_export_statement_details::Kind::SqlServer(
                    sql_server::ProtoSqlServerSourceExportStatementDetails {
                        table: Some(table.into_proto()),
                        capture_instance: capture_instance.to_string(),
                        initial_lsn: initial_lsn.as_bytes().to_vec(),
                    },
                )),
            },
            SourceExportStatementDetails::LoadGenerator { output } => {
                ProtoSourceExportStatementDetails {
                    kind: Some(proto_source_export_statement_details::Kind::Loadgen(
                        load_generator::ProtoLoadGeneratorSourceExportStatementDetails {
                            output: output.into_proto().into(),
                        },
                    )),
                }
            }
            SourceExportStatementDetails::Kafka {} => ProtoSourceExportStatementDetails {
                kind: Some(proto_source_export_statement_details::Kind::Kafka(
                    kafka::ProtoKafkaSourceExportStatementDetails {},
                )),
            },
        }
    }

    fn from_proto(proto: ProtoSourceExportStatementDetails) -> Result<Self, TryFromProtoError> {
        use proto_source_export_statement_details::Kind;
        Ok(match proto.kind {
            Some(Kind::Postgres(details)) => SourceExportStatementDetails::Postgres {
                table: details
                    .table
                    .into_rust_if_some("ProtoPostgresSourceExportStatementDetails::table")?,
            },
            Some(Kind::Mysql(details)) => SourceExportStatementDetails::MySql {
                table: details
                    .table
                    .into_rust_if_some("ProtoMySqlSourceExportStatementDetails::table")?,
                initial_gtid_set: details.initial_gtid_set,
            },
            Some(Kind::SqlServer(details)) => SourceExportStatementDetails::SqlServer {
                table: details
                    .table
                    .into_rust_if_some("ProtoSqlServerSourceExportStatementDetails::table")?,
                capture_instance: details.capture_instance.into(),
                initial_lsn: mz_sql_server_util::cdc::Lsn::try_from(details.initial_lsn.as_slice())
                    .map_err(|e| TryFromProtoError::InvalidFieldError(e.to_string()))?,
            },
            Some(Kind::Loadgen(details)) => SourceExportStatementDetails::LoadGenerator {
                output: details
                    .output
                    .into_rust_if_some("ProtoLoadGeneratorSourceExportStatementDetails::output")?,
            },
            Some(Kind::Kafka(_details)) => SourceExportStatementDetails::Kafka {},
            None => {
                return Err(TryFromProtoError::missing_field(
                    "ProtoSourceExportStatementDetails::kind",
                ));
            }
        })
    }
}

#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
#[repr(transparent)]
pub struct SourceData(pub Result<Row, DataflowError>);

impl Default for SourceData {
    fn default() -> Self {
        SourceData(Ok(Row::default()))
    }
}

impl Deref for SourceData {
    type Target = Result<Row, DataflowError>;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl DerefMut for SourceData {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0
    }
}
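
// For illustration (a sketch): `Deref`/`DerefMut` let callers treat
// `SourceData` as the `Result` it wraps:
//
//     let data = SourceData::default();
//     assert!(data.is_ok()); // deref to `Result<Row, DataflowError>`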

impl RustType<ProtoSourceData> for SourceData {
    fn into_proto(&self) -> ProtoSourceData {
        use proto_source_data::Kind;
        ProtoSourceData {
            kind: Some(match &**self {
                Ok(row) => Kind::Ok(row.into_proto()),
                Err(err) => Kind::Err(err.into_proto()),
            }),
        }
    }

    fn from_proto(proto: ProtoSourceData) -> Result<Self, TryFromProtoError> {
        use proto_source_data::Kind;
        match proto.kind {
            Some(kind) => match kind {
                Kind::Ok(row) => Ok(SourceData(Ok(row.into_rust()?))),
                Kind::Err(err) => Ok(SourceData(Err(err.into_rust()?))),
            },
            None => Result::Err(TryFromProtoError::missing_field("ProtoSourceData::kind")),
        }
    }
}

impl Codec for SourceData {
    type Storage = ProtoRow;
    type Schema = RelationDesc;

    fn codec_name() -> String {
        "protobuf[SourceData]".into()
    }

    fn encode<B: BufMut>(&self, buf: &mut B) {
        self.into_proto()
            .encode(buf)
            .expect("no required fields means no initialization errors");
    }

    fn decode(buf: &[u8], schema: &RelationDesc) -> Result<Self, String> {
        let mut val = SourceData::default();
        <Self as Codec>::decode_from(&mut val, buf, &mut None, schema)?;
        Ok(val)
    }

    fn decode_from<'a>(
        &mut self,
        buf: &'a [u8],
        storage: &mut Option<ProtoRow>,
        schema: &RelationDesc,
    ) -> Result<(), String> {
        // Optimize for common case of `Ok` by leaving a (cleared) `ProtoRow` in
        // the `Ok` variant of `ProtoSourceData`. prost's `Message::merge` impl
        // is smart about reusing the `Vec<Datum>` when it can.
        let mut proto = storage.take().unwrap_or_default();
        proto.clear();
        let mut proto = ProtoSourceData {
            kind: Some(proto_source_data::Kind::Ok(proto)),
        };
        proto.merge(buf).map_err(|err| err.to_string())?;
        match (proto.kind, &mut self.0) {
            // Again, optimize for the common case...
            (Some(proto_source_data::Kind::Ok(proto)), Ok(row)) => {
                let ret = row.decode_from_proto(&proto, schema);
                storage.replace(proto);
                ret
            }
            // ...otherwise fall back to the obvious thing.
            (kind, _) => {
                let proto = ProtoSourceData { kind };
                *self = proto.into_rust().map_err(|err| err.to_string())?;
                // Nothing to put back in storage.
                Ok(())
            }
        }
    }

    fn validate(val: &Self, desc: &Self::Schema) -> Result<(), String> {
        match &val.0 {
            Ok(row) => Row::validate(row, desc),
            Err(_) => Ok(()),
        }
    }

    fn encode_schema(schema: &Self::Schema) -> Bytes {
        schema.into_proto().encode_to_vec().into()
    }

    fn decode_schema(buf: &Bytes) -> Self::Schema {
        let proto = ProtoRelationDesc::decode(buf.as_ref()).expect("valid schema");
        proto.into_rust().expect("valid schema")
    }
}
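
// A sketch of a `Codec` round-trip, assuming a `RelationDesc` named `desc`
// that matches the encoded row:
//
//     let data = SourceData(Ok(Row::pack([Datum::UInt64(1)])));
//     let mut buf = Vec::new();
//     Codec::encode(&data, &mut buf);
//     assert_eq!(SourceData::decode(&buf, &desc), Ok(data));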

/// Given a [`RelationDesc`] returns an arbitrary [`SourceData`].
pub fn arb_source_data_for_relation_desc(
    desc: &RelationDesc,
) -> impl Strategy<Value = SourceData> + use<> {
    let row_strat = arb_row_for_relation(desc).no_shrink();

    proptest::strategy::Union::new_weighted(vec![
        (50, row_strat.prop_map(|row| SourceData(Ok(row))).boxed()),
        (
            1,
            any::<DataflowError>()
                .prop_map(|err| SourceData(Err(err)))
                .no_shrink()
                .boxed(),
        ),
    ])
}
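
// A sketch of driving the strategy from a property test (assuming the
// `proptest!` macro is in scope and `desc` is a `RelationDesc`):
//
//     proptest!(|(data in arb_source_data_for_relation_desc(&desc))| {
//         // Values are weighted ~50:1 toward `Ok(Row)` over `Err(DataflowError)`.
//         prop_assert!(data.is_ok() || data.is_err());
//     });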

/// Describes how external references should be organized in a multi-level
/// hierarchy.
///
/// For both PostgreSQL and MySQL sources, these levels of reference are
/// intrinsic to the items which we're referencing. If there are other naming
/// schemas for other types of sources we discover, we might need to revisit
/// this.
pub trait ExternalCatalogReference {
    /// The "second" level of namespacing for the reference.
    fn schema_name(&self) -> &str;
    /// The lowest level of namespacing for the reference.
    fn item_name(&self) -> &str;
}

impl ExternalCatalogReference for &mz_mysql_util::MySqlTableDesc {
    fn schema_name(&self) -> &str {
        &self.schema_name
    }

    fn item_name(&self) -> &str {
        &self.name
    }
}

impl ExternalCatalogReference for mz_postgres_util::desc::PostgresTableDesc {
    fn schema_name(&self) -> &str {
        &self.namespace
    }

    fn item_name(&self) -> &str {
        &self.name
    }
}

impl ExternalCatalogReference for &mz_sql_server_util::desc::SqlServerTableDesc {
    fn schema_name(&self) -> &str {
        &*self.schema_name
    }

    fn item_name(&self) -> &str {
        &*self.name
    }
}

// This implementation provides a means of converting arbitrary objects into an
// `ExternalCatalogReference`, e.g. load generator view names.
impl<'a> ExternalCatalogReference for (&'a str, &'a str) {
    fn schema_name(&self) -> &str {
        self.0
    }

    fn item_name(&self) -> &str {
        self.1
    }
}

/// Stores and resolves references to a `&[T: ExternalCatalogReference]`.
///
/// This is meant to provide an API to quickly look up a source's subsources.
///
/// For sources that do not provide any subsources, use the `Default`
/// implementation, which is empty and will not be able to resolve any
/// references.
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct SourceReferenceResolver {
    inner: BTreeMap<Ident, BTreeMap<Ident, BTreeMap<Ident, usize>>>,
}

#[derive(Debug, Clone, thiserror::Error)]
pub enum ExternalReferenceResolutionError {
    #[error("reference to {name} not found in source")]
    DoesNotExist { name: String },
    #[error(
        "reference {name} is ambiguous, consider specifying an additional \
    layer of qualification"
    )]
    Ambiguous { name: String },
    #[error("invalid identifier: {0}")]
    Ident(#[from] IdentError),
}

impl<'a> SourceReferenceResolver {
    /// Constructs a new `SourceReferenceResolver` from a slice of `T:
    /// ExternalCatalogReference`.
    ///
    /// # Errors
    /// - If any `&str` provided cannot be taken to an [`Ident`].
    pub fn new<T: ExternalCatalogReference>(
        database: &str,
        referenceable_items: &'a [T],
    ) -> Result<SourceReferenceResolver, ExternalReferenceResolutionError> {
        // An index from table name -> schema name -> database name -> index in
        // `referenceable_items`.
        let mut inner = BTreeMap::new();

        let database = Ident::new(database)?;

        for (reference_idx, item) in referenceable_items.iter().enumerate() {
            let item_name = Ident::new(item.item_name())?;
            let schema_name = Ident::new(item.schema_name())?;

            inner
                .entry(item_name)
                .or_insert_with(BTreeMap::new)
                .entry(schema_name)
                .or_insert_with(BTreeMap::new)
                .entry(database.clone())
                .or_insert(reference_idx);
        }

        Ok(SourceReferenceResolver { inner })
    }

    /// Returns the canonical reference and index from which it originated in
    /// the `referenceable_items` provided to [`Self::new`].
    ///
    /// # Args
    /// - `name` is `&[Ident]` to let users provide the inner element of
    ///   [`UnresolvedItemName`].
    /// - `canonicalize_to_width` limits the number of elements in the returned
    ///   [`UnresolvedItemName`]; this is useful if the source type requires
    ///   contriving database and schema names that a subsource should not
    ///   persist as its reference.
    ///
    /// # Errors
    /// - If `name` does not resolve to an item in `self.inner`.
    ///
    /// # Panics
    /// - If `canonicalize_to_width` is not in `1..=3`.
    pub fn resolve(
        &self,
        name: &[Ident],
        canonicalize_to_width: usize,
    ) -> Result<(UnresolvedItemName, usize), ExternalReferenceResolutionError> {
        let (db, schema, idx) = self.resolve_inner(name)?;

        let item = name.last().expect("must have provided at least 1 element");

        let canonical_name = match canonicalize_to_width {
            1 => vec![item.clone()],
            2 => vec![schema.clone(), item.clone()],
            3 => vec![db.clone(), schema.clone(), item.clone()],
            o => panic!("canonicalize_to_width values must be 1..=3, but got {}", o),
        };

        Ok((UnresolvedItemName(canonical_name), idx))
    }
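
    // A sketch of resolving a partially qualified reference, using the
    // `(&str, &str)` impl of `ExternalCatalogReference` above with
    // hypothetical schema/table names:
    //
    //     let items = [("public", "t1"), ("other", "t2")];
    //     let resolver = SourceReferenceResolver::new("mydb", &items)?;
    //     let (name, idx) = resolver.resolve(&[Ident::new("t1")?], 3)?;
    //     // name == mydb.public.t1, idx == 0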

    /// Returns the index from which it originated in the `referenceable_items`
    /// provided to [`Self::new`].
    ///
    /// # Args
    /// `name` is `&[Ident]` to let users provide the inner element of
    /// [`UnresolvedItemName`].
    ///
    /// # Errors
    /// - If `name` does not resolve to an item in `self.inner`.
    pub fn resolve_idx(&self, name: &[Ident]) -> Result<usize, ExternalReferenceResolutionError> {
        let (_db, _schema, idx) = self.resolve_inner(name)?;
        Ok(idx)
    }

    /// Returns the index from which it originated in the `referenceable_items`
    /// provided to [`Self::new`].
    ///
    /// # Args
    /// `name` is `&[Ident]` to let users provide the inner element of
    /// [`UnresolvedItemName`].
    ///
    /// # Return
    /// Returns a tuple whose elements are:
    /// 1. The "database"- or top-level namespace of the reference.
    /// 2. The "schema"- or second-level namespace of the reference.
    /// 3. The index to find the item in `referenceable_items` argument provided
    ///    to `SourceReferenceResolver::new`.
    ///
    /// # Errors
    /// - If `name` does not resolve to an item in `self.inner`.
    fn resolve_inner<'name: 'a>(
        &'a self,
        name: &'name [Ident],
    ) -> Result<(&'a Ident, &'a Ident, usize), ExternalReferenceResolutionError> {
        let get_provided_name = || UnresolvedItemName(name.to_vec()).to_string();

        // Names must be composed of 1..=3 elements.
        if !(1..=3).contains(&name.len()) {
            Err(ExternalReferenceResolutionError::DoesNotExist {
                name: get_provided_name(),
            })?;
        }

        // Fill in the leading elements with `None` if they aren't present.
        let mut names = std::iter::repeat(None)
            .take(3 - name.len())
            .chain(name.iter().map(Some));

        let database = names.next().flatten();
        let schema = names.next().flatten();
        let item = names
            .next()
            .flatten()
            .expect("must have provided the item name");

        assert_none!(names.next(), "expected a 3-element iterator");

        let schemas =
            self.inner
                .get(item)
                .ok_or_else(|| ExternalReferenceResolutionError::DoesNotExist {
                    name: get_provided_name(),
                })?;

        let schema = match schema {
            Some(schema) => schema,
            None => schemas.keys().exactly_one().map_err(|_e| {
                ExternalReferenceResolutionError::Ambiguous {
                    name: get_provided_name(),
                }
            })?,
        };

        let databases =
            schemas
                .get(schema)
                .ok_or_else(|| ExternalReferenceResolutionError::DoesNotExist {
                    name: get_provided_name(),
                })?;

        let database = match database {
            Some(database) => database,
            None => databases.keys().exactly_one().map_err(|_e| {
                ExternalReferenceResolutionError::Ambiguous {
                    name: get_provided_name(),
                }
            })?,
        };

        let reference_idx = databases.get(database).ok_or_else(|| {
            ExternalReferenceResolutionError::DoesNotExist {
                name: get_provided_name(),
            }
        })?;

        Ok((database, schema, *reference_idx))
    }
}
1392
1393/// A decoder for [`Row`]s within [`SourceData`].
1394///
1395/// This type exists as a wrapper around [`RowColumnarDecoder`] to handle the
1396/// case where the [`RelationDesc`] we're encoding with has no columns. See
1397/// [`SourceDataRowColumnarEncoder`] for more details.
1398#[derive(Debug)]
1399pub enum SourceDataRowColumnarDecoder {
1400    Row(RowColumnarDecoder),
1401    EmptyRow,
1402}
1403
1404impl SourceDataRowColumnarDecoder {
1405    pub fn decode(&self, idx: usize, row: &mut Row) {
1406        match self {
1407            SourceDataRowColumnarDecoder::Row(decoder) => decoder.decode(idx, row),
1408            SourceDataRowColumnarDecoder::EmptyRow => {
1409                // Create a packer just to clear the Row.
1410                row.packer();
1411            }
1412        }
1413    }
1414
1415    pub fn goodbytes(&self) -> usize {
1416        match self {
1417            SourceDataRowColumnarDecoder::Row(decoder) => decoder.goodbytes(),
1418            SourceDataRowColumnarDecoder::EmptyRow => 0,
1419        }
1420    }
1421}
1422
1423#[derive(Debug)]
1424pub struct SourceDataColumnarDecoder {
1425    row_decoder: SourceDataRowColumnarDecoder,
1426    err_decoder: BinaryArray,
1427}
1428
1429impl SourceDataColumnarDecoder {
1430    pub fn new(col: StructArray, desc: &RelationDesc) -> Result<Self, anyhow::Error> {
1431        // TODO(parkmcar): We should validate the fields here.
1432        let (_fields, arrays, nullability) = col.into_parts();
1433
1434        if nullability.is_some() {
1435            anyhow::bail!("SourceData is not nullable, but found {nullability:?}");
1436        }
1437        if arrays.len() != 2 {
1438            anyhow::bail!("SourceData should only have two fields, found {arrays:?}");
1439        }
1440
1441        let errs = arrays[1]
1442            .as_any()
1443            .downcast_ref::<BinaryArray>()
1444            .ok_or_else(|| anyhow::anyhow!("expected BinaryArray, found {:?}", arrays[1]))?;
1445
1446        let row_decoder = match arrays[0].data_type() {
1447            arrow::datatypes::DataType::Struct(_) => {
1448                let rows = arrays[0]
1449                    .as_any()
1450                    .downcast_ref::<StructArray>()
1451                    .ok_or_else(|| {
1452                        anyhow::anyhow!("expected StructArray, found {:?}", arrays[0])
1453                    })?;
1454                let decoder = RowColumnarDecoder::new(rows.clone(), desc)?;
1455                SourceDataRowColumnarDecoder::Row(decoder)
1456            }
1457            arrow::datatypes::DataType::Null => SourceDataRowColumnarDecoder::EmptyRow,
1458            other => anyhow::bail!("expected Struct or Null Array, found {other:?}"),
1459        };
1460
1461        Ok(SourceDataColumnarDecoder {
1462            row_decoder,
1463            err_decoder: errs.clone(),
1464        })
1465    }
1466}
1467
impl ColumnDecoder<SourceData> for SourceDataColumnarDecoder {
    fn decode(&self, idx: usize, val: &mut SourceData) {
        let err_null = self.err_decoder.is_null(idx);
        let row_null = match &self.row_decoder {
            SourceDataRowColumnarDecoder::Row(decoder) => decoder.is_null(idx),
            SourceDataRowColumnarDecoder::EmptyRow => !err_null,
        };

        // Exactly one of the "ok" and "err" columns must be set for each entry.
        match (row_null, err_null) {
            (true, false) => {
                let err = self.err_decoder.value(idx);
                let err = ProtoDataflowError::decode(err)
                    .expect("proto should be valid")
                    .into_rust()
                    .expect("error should be valid");
                val.0 = Err(err);
            }
            (false, true) => {
                // Reuse the existing Row allocation, if the value already holds one.
                let row = match val.0.as_mut() {
                    Ok(row) => row,
                    Err(_) => {
                        val.0 = Ok(Row::default());
                        val.0.as_mut().unwrap()
                    }
                };
                self.row_decoder.decode(idx, row);
            }
            (true, true) => panic!("should have one of 'ok' or 'err'"),
            (false, false) => panic!("cannot have both 'ok' and 'err'"),
        }
    }

    fn is_null(&self, idx: usize) -> bool {
        let err_null = self.err_decoder.is_null(idx);
        let row_null = match &self.row_decoder {
            SourceDataRowColumnarDecoder::Row(decoder) => decoder.is_null(idx),
            SourceDataRowColumnarDecoder::EmptyRow => !err_null,
        };
        assert!(!err_null || !row_null, "SourceData should never be null!");

        // A `SourceData` is always either an `Ok` row or an `Err`, so the
        // outer value itself is never null.
        false
    }

    fn goodbytes(&self) -> usize {
        self.row_decoder.goodbytes() + ArrayOrd::Binary(self.err_decoder.clone()).goodbytes()
    }

    fn stats(&self) -> StructStats {
        let len = self.err_decoder.len();
        let err_stats = ColumnarStats {
            nulls: Some(ColumnNullStats {
                count: self.err_decoder.null_count(),
            }),
            values: PrimitiveStats::<Vec<u8>>::from_column(&self.err_decoder).into(),
        };
        // The top level struct is non-nullable and every entry is either an
        // `Ok(Row)` or an `Err(DataflowError)`. As a result, the row column is
        // null exactly where the error column is set, so its null count is the
        // total length minus the error column's null count.
        let row_null_count = len - self.err_decoder.null_count();
        let row_stats = match &self.row_decoder {
            SourceDataRowColumnarDecoder::Row(decoder) => {
                // Sanity check that the number of row nulls we calculated
                // using the error column matches what the row column thinks
                // it has.
                assert_eq!(decoder.null_count(), row_null_count);
                decoder.stats()
            }
            SourceDataRowColumnarDecoder::EmptyRow => StructStats {
                len,
                cols: BTreeMap::default(),
            },
        };
        let row_stats = ColumnarStats {
            nulls: Some(ColumnNullStats {
                count: row_null_count,
            }),
            values: ColumnStatKinds::Struct(row_stats),
        };

        let stats = [
            (
                SourceDataColumnarEncoder::OK_COLUMN_NAME.to_string(),
                row_stats,
            ),
            (
                SourceDataColumnarEncoder::ERR_COLUMN_NAME.to_string(),
                err_stats,
            ),
        ];
        StructStats {
            len,
            cols: stats.into_iter().collect(),
        }
    }
}

/// An encoder for [`Row`]s within [`SourceData`].
///
/// This type exists as a wrapper around [`RowColumnarEncoder`] to support
/// encoding empty [`Row`]s. A [`RowColumnarEncoder`] finishes as a
/// [`StructArray`], which is required to have at least one column and thus
/// cannot support empty [`Row`]s.
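///
/// A minimal sketch of how the variant is chosen (illustrative, not run as a
/// doc test): [`RowColumnarEncoder::new`] returns `None` when the
/// [`RelationDesc`] has no columns, in which case we fall back to `EmptyRow`.
///
/// ```ignore
/// let encoder = match RowColumnarEncoder::new(&RelationDesc::empty()) {
///     Some(encoder) => SourceDataRowColumnarEncoder::Row(encoder),
///     None => SourceDataRowColumnarEncoder::EmptyRow, // taken for empty descs
/// };
/// ```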
#[derive(Debug)]
pub enum SourceDataRowColumnarEncoder {
    Row(RowColumnarEncoder),
    EmptyRow,
}

impl SourceDataRowColumnarEncoder {
    pub(crate) fn goodbytes(&self) -> usize {
        match self {
            SourceDataRowColumnarEncoder::Row(e) => e.goodbytes(),
            SourceDataRowColumnarEncoder::EmptyRow => 0,
        }
    }

    pub fn append(&mut self, row: &Row) {
        match self {
            SourceDataRowColumnarEncoder::Row(encoder) => encoder.append(row),
            SourceDataRowColumnarEncoder::EmptyRow => {
                assert_eq!(row.iter().count(), 0)
            }
        }
    }

    pub fn append_null(&mut self) {
        match self {
            SourceDataRowColumnarEncoder::Row(encoder) => encoder.append_null(),
            SourceDataRowColumnarEncoder::EmptyRow => (),
        }
    }
}

/// An encoder for [`SourceData`] that writes `Ok` rows and proto-encoded
/// `Err` payloads into separate "ok" and "err" columns.
#[derive(Debug)]
pub struct SourceDataColumnarEncoder {
    row_encoder: SourceDataRowColumnarEncoder,
    err_encoder: BinaryBuilder,
}

impl SourceDataColumnarEncoder {
    const OK_COLUMN_NAME: &'static str = "ok";
    const ERR_COLUMN_NAME: &'static str = "err";

    pub fn new(desc: &RelationDesc) -> Self {
        let row_encoder = match RowColumnarEncoder::new(desc) {
            Some(encoder) => SourceDataRowColumnarEncoder::Row(encoder),
            None => {
                assert!(desc.typ().columns().is_empty());
                SourceDataRowColumnarEncoder::EmptyRow
            }
        };
        let err_encoder = BinaryBuilder::new();

        SourceDataColumnarEncoder {
            row_encoder,
            err_encoder,
        }
    }
}

impl ColumnEncoder<SourceData> for SourceDataColumnarEncoder {
    type FinishedColumn = StructArray;

    fn goodbytes(&self) -> usize {
        self.row_encoder.goodbytes() + self.err_encoder.values_slice().len()
    }

    #[inline]
    fn append(&mut self, val: &SourceData) {
        match val.0.as_ref() {
            Ok(row) => {
                self.row_encoder.append(row);
                self.err_encoder.append_null();
            }
            Err(err) => {
                self.row_encoder.append_null();
                self.err_encoder
                    .append_value(err.into_proto().encode_to_vec());
            }
        }
    }

    #[inline]
    fn append_null(&mut self) {
        panic!("appending a null into SourceDataColumnarEncoder is not supported");
    }

    fn finish(self) -> Self::FinishedColumn {
        let SourceDataColumnarEncoder {
            row_encoder,
            mut err_encoder,
        } = self;

        let err_column = BinaryBuilder::finish(&mut err_encoder);
        let row_column: ArrayRef = match row_encoder {
            SourceDataRowColumnarEncoder::Row(encoder) => {
                let column = encoder.finish();
                Arc::new(column)
            }
            SourceDataRowColumnarEncoder::EmptyRow => Arc::new(NullArray::new(err_column.len())),
        };

        assert_eq!(row_column.len(), err_column.len());

        let fields = vec![
            Field::new(Self::OK_COLUMN_NAME, row_column.data_type().clone(), true),
            Field::new(Self::ERR_COLUMN_NAME, err_column.data_type().clone(), true),
        ];
        let arrays: Vec<Arc<dyn Array>> = vec![row_column, Arc::new(err_column)];
        StructArray::new(Fields::from(fields), arrays, None)
    }
}
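
// The finished column from the encoder above has, sketched informally, the
// following shape (the exact "ok" field type depends on the `RelationDesc`):
//
//     StructArray {
//         "ok":  StructArray or NullArray; null wherever the entry is an `Err`,
//         "err": BinaryArray of proto-encoded `DataflowError`s; null wherever
//                the entry is an `Ok` row,
//     }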

impl Schema<SourceData> for RelationDesc {
    type ArrowColumn = StructArray;
    type Statistics = StructStats;

    type Decoder = SourceDataColumnarDecoder;
    type Encoder = SourceDataColumnarEncoder;

    fn decoder(&self, col: Self::ArrowColumn) -> Result<Self::Decoder, anyhow::Error> {
        SourceDataColumnarDecoder::new(col, self)
    }

    fn encoder(&self) -> Result<Self::Encoder, anyhow::Error> {
        Ok(SourceDataColumnarEncoder::new(self))
    }
}

#[cfg(test)]
mod tests {
    use arrow::array::{ArrayData, make_comparator};
    use base64::Engine;
    use bytes::Bytes;
    use mz_expr::EvalError;
    use mz_ore::assert_err;
    use mz_ore::metrics::MetricsRegistry;
    use mz_persist::indexed::columnar::arrow::{realloc_any, realloc_array};
    use mz_persist::metrics::ColumnarMetrics;
    use mz_persist_types::parquet::EncodingConfig;
    use mz_persist_types::schema::{Migration, backward_compatible};
    use mz_persist_types::stats::{PartStats, PartStatsMetrics};
    use mz_repr::{
        ColumnIndex, DatumVec, PropRelationDescDiff, ProtoRelationDesc, RelationDescBuilder,
        RowArena, SqlScalarType, arb_relation_desc_diff, arb_relation_desc_projection,
    };
    use proptest::prelude::*;
    use proptest::strategy::{Union, ValueTree};

    use crate::stats::RelationPartStats;

    use super::*;

    #[mz_ore::test]
    fn test_timeline_parsing() {
        assert_eq!(Ok(Timeline::EpochMilliseconds), "M".parse());
        assert_eq!(Ok(Timeline::External("JOE".to_string())), "E.JOE".parse());
        assert_eq!(Ok(Timeline::User("MIKE".to_string())), "U.MIKE".parse());

        assert_err!("Materialize".parse::<Timeline>());
        assert_err!("Ejoe".parse::<Timeline>());
        assert_err!("Umike".parse::<Timeline>());
        assert_err!("Dance".parse::<Timeline>());
        assert_err!("".parse::<Timeline>());
    }
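
    // A minimal, hand-rolled example of the `Schema<SourceData>` encode/decode
    // path (a sketch; the property tests below exercise this far more
    // thoroughly). The test name, column name, and values are arbitrary.
    #[mz_ore::test]
    fn smoke_test_source_data_roundtrip() {
        let desc = RelationDescBuilder::default()
            .with_column("a", SqlScalarType::Int64.nullable(false))
            .finish();
        let datas = [
            SourceData(Ok(Row::pack([Datum::Int64(42)]))),
            SourceData(Err(DataflowError::EvalError(
                EvalError::DateOutOfRange.into(),
            ))),
        ];

        // Encode both values into a single two-column ("ok", "err") array.
        let mut encoder = <RelationDesc as Schema<SourceData>>::encoder(&desc).unwrap();
        for data in &datas {
            encoder.append(data);
        }
        let col = encoder.finish();

        // Decode them back out and check that they roundtrip.
        let decoder = <RelationDesc as Schema<SourceData>>::decoder(&desc, col).unwrap();
        let mut rnd = SourceData(Ok(Row::default()));
        for (idx, data) in datas.iter().enumerate() {
            decoder.decode(idx, &mut rnd);
            assert_eq!(data, &rnd);
        }
    }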

    #[track_caller]
    fn roundtrip_source_data(
        desc: &RelationDesc,
        datas: Vec<SourceData>,
        read_desc: &RelationDesc,
        config: &EncodingConfig,
    ) {
        let metrics = ColumnarMetrics::disconnected();
        let mut encoder = <RelationDesc as Schema<SourceData>>::encoder(desc).unwrap();
        for data in &datas {
            encoder.append(data);
        }
        let col = encoder.finish();

        // The top-level StructArray for SourceData should always be non-nullable.
        assert!(!col.is_nullable());

        // Reallocate our arrays with lgalloc.
        let col = realloc_array(&col, &metrics);

        // Roundtrip through ProtoArray format.
        {
            let proto = col.to_data().into_proto();
            let bytes = proto.encode_to_vec();
            let proto = mz_persist_types::arrow::ProtoArrayData::decode(&bytes[..]).unwrap();
            let array_data: ArrayData = proto.into_rust().unwrap();

            let col_rnd = StructArray::from(array_data.clone());
            assert_eq!(col, col_rnd);

            let col_dyn = arrow::array::make_array(array_data);
            let col_dyn = col_dyn.as_any().downcast_ref::<StructArray>().unwrap();
            assert_eq!(&col, col_dyn);
        }

        // Encode to Parquet.
        let mut buf = Vec::new();
        let fields = Fields::from(vec![Field::new("k", col.data_type().clone(), false)]);
        let arrays: Vec<Arc<dyn Array>> = vec![Arc::new(col.clone())];
        mz_persist_types::parquet::encode_arrays(&mut buf, fields, arrays, config).unwrap();

        // Decode from Parquet.
        let buf = Bytes::from(buf);
        let mut reader = mz_persist_types::parquet::decode_arrays(buf).unwrap();
        let maybe_batch = reader.next();

        // If we didn't encode any data then the reader yields no batches.
        let Some(record_batch) = maybe_batch else {
            assert!(datas.is_empty());
            return;
        };
        let record_batch = record_batch.unwrap();

        assert_eq!(record_batch.columns().len(), 1);
        let rnd_col = &record_batch.columns()[0];
        let rnd_col = realloc_any(Arc::clone(rnd_col), &metrics);
        let rnd_col = rnd_col
            .as_any()
            .downcast_ref::<StructArray>()
            .unwrap()
            .clone();

        // Try generating stats for the data, just to make sure we don't panic.
        let stats = <RelationDesc as Schema<SourceData>>::decoder_any(desc, &rnd_col)
            .expect("valid decoder")
            .stats();

        // Read back all of our data and assert it roundtrips.
        let mut rnd_data = SourceData(Ok(Row::default()));
        let decoder = <RelationDesc as Schema<SourceData>>::decoder(desc, rnd_col.clone()).unwrap();
        for (idx, og_data) in datas.iter().enumerate() {
            decoder.decode(idx, &mut rnd_data);
            assert_eq!(og_data, &rnd_data);
        }

        // Read back all of our data a second time with a projection applied, and make sure the
        // stats are valid.
        let stats_metrics = PartStatsMetrics::new(&MetricsRegistry::new());
        let stats = RelationPartStats {
            name: "test",
            metrics: &stats_metrics,
            stats: &PartStats { key: stats },
            desc: read_desc,
        };
        let mut datum_vec = DatumVec::new();
        let arena = RowArena::default();
        let decoder = <RelationDesc as Schema<SourceData>>::decoder(read_desc, rnd_col).unwrap();

        for (idx, og_data) in datas.iter().enumerate() {
            decoder.decode(idx, &mut rnd_data);
            match (&og_data.0, &rnd_data.0) {
                (Ok(og_row), Ok(rnd_row)) => {
                    // Filter down to just the Datums in the projection schema.
                    {
                        let datums = datum_vec.borrow_with(og_row);
                        let projected_datums =
                            datums.iter().enumerate().filter_map(|(idx, datum)| {
                                read_desc
                                    .contains_index(&ColumnIndex::from_raw(idx))
                                    .then_some(datum)
                            });
                        let og_projected_row = Row::pack(projected_datums);
                        assert_eq!(&og_projected_row, rnd_row);
                    }

                    // Validate the stats for all of our projected columns.
                    {
                        let proj_datums = datum_vec.borrow_with(rnd_row);
                        for (pos, (idx, _, _)) in read_desc.iter_all().enumerate() {
                            let spec = stats.col_stats(idx, &arena);
                            assert!(spec.may_contain(proj_datums[pos]));
                        }
                    }
                }
                (Err(_), Err(_)) => assert_eq!(og_data, &rnd_data),
                (_, _) => panic!("decoded to a different type? {og_data:?} {rnd_data:?}"),
            }
        }

        // Verify that the RelationDesc itself roundtrips through
        // {encode,decode}_schema.
        let encoded_schema = SourceData::encode_schema(desc);
        let roundtrip_desc = SourceData::decode_schema(&encoded_schema);
        assert_eq!(desc, &roundtrip_desc);

        // Verify that the RelationDesc is backward compatible with itself (this
        // mostly checks for `unimplemented!` type panics).
        let migration =
            mz_persist_types::schema::backward_compatible(col.data_type(), col.data_type());
        let migration = migration.expect("should be backward compatible with self");
        // Also verify that the Fn doesn't do anything wonky.
        let migrated = migration.migrate(Arc::new(col.clone()));
        assert_eq!(col.data_type(), migrated.data_type());
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decContextDefault` on OS `linux`
    fn all_source_data_roundtrips() {
        let mut weights = vec![(500, Just(0..8)), (50, Just(8..32))];
        if std::env::var("PROPTEST_LARGE_DATA").is_ok() {
            weights.extend([
                (10, Just(32..128)),
                (5, Just(128..512)),
                (3, Just(512..2048)),
                (1, Just(2048..8192)),
            ]);
        }
        let num_rows = Union::new_weighted(weights);

        // TODO(parkmycar): There is a lot of cloning going on here; maybe we can avoid some of it?
        let strat = (any::<RelationDesc>(), num_rows)
            .prop_flat_map(|(desc, num_rows)| {
                arb_relation_desc_projection(desc.clone())
                    .prop_map(move |read_desc| (desc.clone(), read_desc, num_rows.clone()))
            })
            .prop_flat_map(|(desc, read_desc, num_rows)| {
                proptest::collection::vec(arb_source_data_for_relation_desc(&desc), num_rows)
                    .prop_map(move |datas| (desc.clone(), datas, read_desc.clone()))
            });

        let combined_strat = (any::<EncodingConfig>(), strat);
        proptest!(|((config, (desc, source_datas, read_desc)) in combined_strat)| {
            roundtrip_source_data(&desc, source_datas, &read_desc, &config);
        });
    }

    #[mz_ore::test]
    fn roundtrip_error_nulls() {
        let desc = RelationDescBuilder::default()
            .with_column(
                "ts",
                SqlScalarType::TimestampTz { precision: None }.nullable(false),
            )
            .finish();
        let source_datas = vec![SourceData(Err(DataflowError::EvalError(
            EvalError::DateOutOfRange.into(),
        )))];
        let config = EncodingConfig::default();
        roundtrip_source_data(&desc, source_datas, &desc, &config);
    }

    fn is_sorted(array: &dyn Array) -> bool {
        let sort_options = arrow::compute::SortOptions::default();
        let Ok(cmp) = make_comparator(array, array, sort_options) else {
            // TODO: `make_comparator` fails for data types it can't compare
            // (e.g. structs prior to arrow v52, where it replaced the
            // deprecated `build_compare`). Conservatively treat such arrays
            // as unsorted rather than panicking; once struct comparisons are
            // supported everywhere we build, switch this early return to an
            // expect.
            return false;
        };
        (0..array.len())
            .tuple_windows()
            .all(|(i, j)| cmp(i, j).is_le())
    }

    fn get_data_type(schema: &impl Schema<SourceData>) -> arrow::datatypes::DataType {
        use mz_persist_types::columnar::ColumnEncoder;
        let array = Schema::encoder(schema).expect("valid schema").finish();
        Array::data_type(&array).clone()
    }

    #[track_caller]
    fn backward_compatible_testcase(
        old: &RelationDesc,
        new: &RelationDesc,
        migration: Migration,
        datas: &[SourceData],
    ) {
        let mut encoder = Schema::<SourceData>::encoder(old).expect("valid schema");
        for data in datas {
            encoder.append(data);
        }
        let old = encoder.finish();
        let new = Schema::<SourceData>::encoder(new)
            .expect("valid schema")
            .finish();
        let old: Arc<dyn Array> = Arc::new(old);
        let new: Arc<dyn Array> = Arc::new(new);
        let migrated = migration.migrate(Arc::clone(&old));
        assert_eq!(migrated.data_type(), new.data_type());

        // Check the sortedness preservation, if we can. Note that we check
        // the migrated array: `new` was built from an empty encoder, so
        // asserting on it would be vacuous.
        if migration.preserves_order() && is_sorted(&old) {
            assert!(is_sorted(&migrated))
        }
    }

    #[mz_ore::test]
    fn backward_compatible_empty_add_column() {
        let old = RelationDesc::empty();
        let new = RelationDesc::from_names_and_types([("a", SqlScalarType::Bool.nullable(true))]);

        let old_data_type = get_data_type(&old);
        let new_data_type = get_data_type(&new);

        let migration = backward_compatible(&old_data_type, &new_data_type);
        assert!(migration.is_some());
    }

    #[mz_ore::test]
    fn backward_compatible_project_away_all() {
        let old = RelationDesc::from_names_and_types([("a", SqlScalarType::Bool.nullable(true))]);
        let new = RelationDesc::empty();

        let old_data_type = get_data_type(&old);
        let new_data_type = get_data_type(&new);

        let migration = backward_compatible(&old_data_type, &new_data_type);
        assert!(migration.is_some());
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn backward_compatible_migrate() {
        let strat = (any::<RelationDesc>(), any::<RelationDesc>()).prop_flat_map(|(old, new)| {
            proptest::collection::vec(arb_source_data_for_relation_desc(&old), 2)
                .prop_map(move |datas| (old.clone(), new.clone(), datas))
        });

        proptest!(|((old, new, datas) in strat)| {
            let old_data_type = get_data_type(&old);
            let new_data_type = get_data_type(&new);

            if let Some(migration) = backward_compatible(&old_data_type, &new_data_type) {
                backward_compatible_testcase(&old, &new, migration, &datas);
            };
        });
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn backward_compatible_migrate_from_common() {
        use mz_repr::SqlColumnType;
        fn test_case(old: RelationDesc, diffs: Vec<PropRelationDescDiff>, datas: Vec<SourceData>) {
            // TODO(parkmycar): As we iterate on schema migrations, more things should become compatible.
            let should_be_compatible = diffs.iter().all(|diff| match diff {
                // We only support adding nullable columns.
                PropRelationDescDiff::AddColumn {
                    typ: SqlColumnType { nullable, .. },
                    ..
                } => *nullable,
                PropRelationDescDiff::DropColumn { .. } => true,
                _ => false,
            });

            let mut new = old.clone();
            for diff in diffs.into_iter() {
                diff.apply(&mut new)
            }

            let old_data_type = get_data_type(&old);
            let new_data_type = get_data_type(&new);

            if let Some(migration) = backward_compatible(&old_data_type, &new_data_type) {
                backward_compatible_testcase(&old, &new, migration, &datas);
            } else if should_be_compatible {
                panic!("new DataType was not compatible when it should have been!");
            }
        }

        let strat = any::<RelationDesc>()
            .prop_flat_map(|desc| {
                proptest::collection::vec(arb_source_data_for_relation_desc(&desc), 2)
                    .no_shrink()
                    .prop_map(move |datas| (desc.clone(), datas))
            })
            .prop_flat_map(|(desc, datas)| {
                arb_relation_desc_diff(&desc)
                    .prop_map(move |diffs| (desc.clone(), diffs, datas.clone()))
            });

        proptest!(|((old, diffs, datas) in strat)| {
            test_case(old, diffs, datas);
        });
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decContextDefault` on OS `linux`
    fn empty_relation_desc_roundtrips() {
        let empty = RelationDesc::empty();
        let rows = proptest::collection::vec(arb_source_data_for_relation_desc(&empty), 0..8)
            .prop_map(move |datas| (empty.clone(), datas));

        // Note: This case should be covered by the `all_source_data_roundtrips` test above, but
        // it's a special case that we explicitly want to exercise.
        proptest!(|((config, (desc, source_datas)) in (any::<EncodingConfig>(), rows))| {
            roundtrip_source_data(&desc, source_datas, &desc, &config);
        });
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decContextDefault` on OS `linux`
    fn arrow_datatype_consistent() {
        fn test_case(desc: RelationDesc, datas: Vec<SourceData>) {
            let half = datas.len() / 2;

            let mut encoder_a = <RelationDesc as Schema<SourceData>>::encoder(&desc).unwrap();
            for data in &datas[..half] {
                encoder_a.append(data);
            }
            let col_a = encoder_a.finish();

            let mut encoder_b = <RelationDesc as Schema<SourceData>>::encoder(&desc).unwrap();
            for data in &datas[half..] {
                encoder_b.append(data);
            }
            let col_b = encoder_b.finish();

            // The DataType of the resulting column should not change based on what data was
            // encoded.
            assert_eq!(col_a.data_type(), col_b.data_type());
        }

        let num_rows = 12;
        let strat = any::<RelationDesc>().prop_flat_map(|desc| {
            proptest::collection::vec(arb_source_data_for_relation_desc(&desc), num_rows)
                .prop_map(move |datas| (desc.clone(), datas))
        });

        proptest!(|((desc, data) in strat)| {
            test_case(desc, data);
        });
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // too slow
    fn source_proto_serialization_stability() {
        let min_protos = 10;
        let encoded = include_str!("snapshots/source-datas.txt");

        // Decode the pre-generated source datas.
        let mut decoded: Vec<(RelationDesc, SourceData)> = encoded
            .lines()
            .map(|s| {
                let (desc, data) = s.split_once(',').expect("comma separated data");
                let desc = base64::engine::general_purpose::STANDARD
                    .decode(desc)
                    .expect("valid base64");
                let data = base64::engine::general_purpose::STANDARD
                    .decode(data)
                    .expect("valid base64");
                (desc, data)
            })
            .map(|(desc, data)| {
                let desc = ProtoRelationDesc::decode(&desc[..]).expect("valid proto");
                let desc = desc.into_rust().expect("valid proto");
                let data = SourceData::decode(&data, &desc).expect("valid proto");
                (desc, data)
            })
            .collect();

        // If there are fewer than the minimum examples, generate some new ones arbitrarily.
        let mut runner = proptest::test_runner::TestRunner::deterministic();
        let strategy = RelationDesc::arbitrary().prop_flat_map(|desc| {
            arb_source_data_for_relation_desc(&desc).prop_map(move |data| (desc.clone(), data))
        });
        while decoded.len() < min_protos {
            let arbitrary_data = strategy
                .new_tree(&mut runner)
                .expect("source data")
                .current();
            decoded.push(arbitrary_data);
        }

        // Re-encode and compare the strings.
        let mut reencoded = String::new();
        let mut buf = vec![];
        for (desc, data) in decoded {
            buf.clear();
            desc.into_proto().encode(&mut buf).expect("success");
            base64::engine::general_purpose::STANDARD.encode_string(buf.as_slice(), &mut reencoded);
            reencoded.push(',');

            buf.clear();
            data.encode(&mut buf);
            base64::engine::general_purpose::STANDARD.encode_string(buf.as_slice(), &mut reencoded);
            reencoded.push('\n');
        }

        // Optimizations in Persist, particularly consolidation on read,
        // depend on each record having a stable serialization. For example,
        // reordering proto fields could cause us to generate a different
        // (but equivalent) serialization for a record, and the two versions
        // would not consolidate out. This can impact correctness!
        //
        // If you need to change how SourceDatas are encoded, that's still
        // fine, but we'll also need to increase the
        // MINIMUM_CONSOLIDATED_VERSION as part of the same release.
        assert_eq!(
            encoded,
            reencoded.as_str(),
            "SourceData serde should be stable"
        )
    }
}