Skip to main content

mz_storage_types/
sources.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Types and traits related to the introduction of changing collections into `dataflow`.
11
12use std::collections::BTreeMap;
13use std::fmt::Debug;
14use std::hash::Hash;
15use std::ops::{Add, AddAssign, Deref, DerefMut};
16use std::str::FromStr;
17use std::sync::Arc;
18use std::time::Duration;
19
20use arrow::array::{Array, ArrayRef, BinaryArray, BinaryBuilder, NullArray, StructArray};
21use arrow::datatypes::{Field, Fields};
22use bytes::{BufMut, Bytes};
23use columnation::Columnation;
24use itertools::EitherOrBoth::Both;
25use itertools::Itertools;
26use kafka::KafkaSourceExportDetails;
27use load_generator::{LoadGeneratorOutput, LoadGeneratorSourceExportDetails};
28use mz_ore::assert_none;
29use mz_persist_types::Codec;
30use mz_persist_types::arrow::ArrayOrd;
31use mz_persist_types::columnar::{ColumnDecoder, ColumnEncoder, Schema};
32use mz_persist_types::stats::{
33    ColumnNullStats, ColumnStatKinds, ColumnarStats, ColumnarStatsBuilder, PrimitiveStats,
34    StructStats,
35};
36use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
37#[cfg(any(test, feature = "proptest"))]
38use mz_repr::arb_row_for_relation;
39use mz_repr::{
40    CatalogItemId, Datum, GlobalId, ProtoRelationDesc, ProtoRow, RelationDesc, Row,
41    RowColumnarDecoder, RowColumnarEncoder,
42};
43use mz_sql_parser::ast::{Ident, IdentError, UnresolvedItemName};
44#[cfg(any(test, feature = "proptest"))]
45use proptest::prelude::any;
46#[cfg(any(test, feature = "proptest"))]
47use proptest::strategy::Strategy;
48use prost::Message;
49use serde::{Deserialize, Serialize};
50use timely::order::{PartialOrder, TotalOrder};
51use timely::progress::timestamp::Refines;
52use timely::progress::{PathSummary, Timestamp};
53
54use crate::AlterCompatible;
55use crate::connections::inline::{
56    ConnectionAccess, ConnectionResolver, InlinedConnection, IntoInlineConnection,
57    ReferencedConnection,
58};
59use crate::controller::AlterError;
60use crate::errors::{DataflowError, ProtoDataflowError};
61use crate::instances::StorageInstanceId;
62use crate::sources::sql_server::SqlServerSourceExportDetails;
63
64pub mod casts;
65pub mod encoding;
66pub mod envelope;
67pub mod kafka;
68pub mod load_generator;
69pub mod mysql;
70pub mod postgres;
71pub mod sql_server;
72
73pub use crate::sources::envelope::SourceEnvelope;
74pub use crate::sources::kafka::KafkaSourceConnection;
75pub use crate::sources::load_generator::LoadGeneratorSourceConnection;
76pub use crate::sources::mysql::{MySqlSourceConnection, MySqlSourceExportDetails};
77pub use crate::sources::postgres::{PostgresSourceConnection, PostgresSourceExportDetails};
78pub use crate::sources::sql_server::{SqlServerSourceConnection, SqlServerSourceExtras};
79
80include!(concat!(env!("OUT_DIR"), "/mz_storage_types.sources.rs"));
81
82/// A description of a source ingestion
83#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
84pub struct IngestionDescription<S: 'static = (), C: ConnectionAccess = InlinedConnection> {
85    /// The source description.
86    pub desc: SourceDesc<C>,
87    /// Collections to be exported by this ingestion.
88    ///
89    /// # Notes
90    /// - For multi-output sources:
91    ///     - Add exports by adding a new [`SourceExport`].
92    ///     - Remove exports by removing the [`SourceExport`].
93    ///
94    ///   Re-rendering/executing the source after making these modifications
95    ///   adds and drops the subsource, respectively.
96    /// - This field includes the primary source's ID, which might need to be
97    ///   filtered out to understand which exports are explicit ingestion exports.
98    /// - This field does _not_ include the remap collection, which is tracked
99    ///   in its own field.
100    pub source_exports: BTreeMap<GlobalId, SourceExport<S>>,
101    /// The ID of the instance in which to install the source.
102    pub instance_id: StorageInstanceId,
103    /// The ID of this ingestion's remap/progress collection.
104    pub remap_collection_id: GlobalId,
105    /// The storage metadata for the remap/progress collection
106    pub remap_metadata: S,
107}
108
109impl IngestionDescription {
110    pub fn new(
111        desc: SourceDesc,
112        instance_id: StorageInstanceId,
113        remap_collection_id: GlobalId,
114    ) -> Self {
115        Self {
116            desc,
117            remap_metadata: (),
118            source_exports: BTreeMap::new(),
119            instance_id,
120            remap_collection_id,
121        }
122    }
123}
124
125impl<S> IngestionDescription<S> {
126    /// Return an iterator over the `GlobalId`s of `self`'s collections.
127    /// This will contain ids for the remap collection, subsources,
128    /// tables for this source, and the primary collection ID, even if
129    /// no data will be exported to the primary collection.
130    pub fn collection_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
131        // Expand self so that any new fields added generate a compiler error to
132        // increase the likelihood of developers seeing this function.
133        let IngestionDescription {
134            desc: _,
135            remap_metadata: _,
136            source_exports,
137            instance_id: _,
138            remap_collection_id,
139        } = &self;
140
141        source_exports
142            .keys()
143            .copied()
144            .chain(std::iter::once(*remap_collection_id))
145    }
146}
147
148impl<S: Debug + Eq + PartialEq + AlterCompatible> AlterCompatible for IngestionDescription<S> {
149    fn alter_compatible(
150        &self,
151        id: GlobalId,
152        other: &IngestionDescription<S>,
153    ) -> Result<(), AlterError> {
154        if self == other {
155            return Ok(());
156        }
157        let IngestionDescription {
158            desc,
159            remap_metadata,
160            source_exports,
161            instance_id,
162            remap_collection_id,
163        } = self;
164
165        let compatibility_checks = [
166            (desc.alter_compatible(id, &other.desc).is_ok(), "desc"),
167            (remap_metadata == &other.remap_metadata, "remap_metadata"),
168            (
169                source_exports
170                    .iter()
171                    .merge_join_by(&other.source_exports, |(l_key, _), (r_key, _)| {
172                        l_key.cmp(r_key)
173                    })
174                    .all(|r| match r {
175                        Both(
176                            (
177                                _,
178                                SourceExport {
179                                    storage_metadata: l_metadata,
180                                    details: l_details,
181                                    data_config: l_data_config,
182                                },
183                            ),
184                            (
185                                _,
186                                SourceExport {
187                                    storage_metadata: r_metadata,
188                                    details: r_details,
189                                    data_config: r_data_config,
190                                },
191                            ),
192                        ) => {
193                            l_metadata.alter_compatible(id, r_metadata).is_ok()
194                                && l_details.alter_compatible(id, r_details).is_ok()
195                                && l_data_config.alter_compatible(id, r_data_config).is_ok()
196                        }
197                        _ => true,
198                    }),
199                "source_exports",
200            ),
201            (instance_id == &other.instance_id, "instance_id"),
202            (
203                remap_collection_id == &other.remap_collection_id,
204                "remap_collection_id",
205            ),
206        ];
207        for (compatible, field) in compatibility_checks {
208            if !compatible {
209                tracing::warn!(
210                    "IngestionDescription incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
211                    self,
212                    other
213                );
214
215                return Err(AlterError { id });
216            }
217        }
218
219        Ok(())
220    }
221}
222
223impl<R: ConnectionResolver> IntoInlineConnection<IngestionDescription, R>
224    for IngestionDescription<(), ReferencedConnection>
225{
226    fn into_inline_connection(self, r: R) -> IngestionDescription {
227        let IngestionDescription {
228            desc,
229            remap_metadata,
230            source_exports,
231            instance_id,
232            remap_collection_id,
233        } = self;
234
235        IngestionDescription {
236            desc: desc.into_inline_connection(r),
237            remap_metadata,
238            source_exports,
239            instance_id,
240            remap_collection_id,
241        }
242    }
243}
244
245#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
246pub struct SourceExport<S = (), C: ConnectionAccess = InlinedConnection> {
247    /// The collection metadata needed to write the exported data
248    pub storage_metadata: S,
249    /// Details necessary for the source to export data to this export's collection.
250    pub details: SourceExportDetails,
251    /// Config necessary to handle (e.g. decode and envelope) the data for this export.
252    pub data_config: SourceExportDataConfig<C>,
253}
254
255pub trait SourceTimestamp:
256    Timestamp + Columnation + Refines<()> + std::fmt::Display + Sync
257{
258    fn encode_row(&self) -> Row;
259    fn decode_row(row: &Row) -> Self;
260}
261
262impl SourceTimestamp for MzOffset {
263    fn encode_row(&self) -> Row {
264        Row::pack([Datum::UInt64(self.offset)])
265    }
266
267    fn decode_row(row: &Row) -> Self {
268        let mut datums = row.iter();
269        match (datums.next(), datums.next()) {
270            (Some(Datum::UInt64(offset)), None) => MzOffset::from(offset),
271            _ => panic!("invalid row {row:?}"),
272        }
273    }
274}
275
276/// Universal language for describing message positions in Materialize, in a source independent
277/// way. Individual sources like Kafka or File sources should explicitly implement their own offset
278/// type that converts to/From MzOffsets. A 0-MzOffset denotes an empty stream.
279#[derive(
280    Copy,
281    Clone,
282    Default,
283    Debug,
284    PartialEq,
285    PartialOrd,
286    Eq,
287    Ord,
288    Hash,
289    Serialize,
290    Deserialize
291)]
292pub struct MzOffset {
293    pub offset: u64,
294}
295
296impl differential_dataflow::difference::Semigroup for MzOffset {
297    fn plus_equals(&mut self, rhs: &Self) {
298        self.offset.plus_equals(&rhs.offset)
299    }
300}
301
302impl differential_dataflow::difference::IsZero for MzOffset {
303    fn is_zero(&self) -> bool {
304        self.offset.is_zero()
305    }
306}
307
308impl mz_persist_types::Codec64 for MzOffset {
309    fn codec_name() -> String {
310        "MzOffset".to_string()
311    }
312
313    fn encode(&self) -> [u8; 8] {
314        mz_persist_types::Codec64::encode(&self.offset)
315    }
316
317    fn decode(buf: [u8; 8]) -> Self {
318        Self {
319            offset: mz_persist_types::Codec64::decode(buf),
320        }
321    }
322}
323
324impl columnation::Columnation for MzOffset {
325    type InnerRegion = columnation::CopyRegion<MzOffset>;
326}
327
328impl MzOffset {
329    pub fn checked_sub(self, other: Self) -> Option<Self> {
330        self.offset
331            .checked_sub(other.offset)
332            .map(|offset| Self { offset })
333    }
334}
335
336/// Convert from MzOffset to Kafka::Offset as long as
337/// the offset is not negative
338impl From<u64> for MzOffset {
339    fn from(offset: u64) -> Self {
340        Self { offset }
341    }
342}
343
344impl std::fmt::Display for MzOffset {
345    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
346        write!(f, "{}", self.offset)
347    }
348}
349
350// Assume overflow does not occur for addition
351impl Add<u64> for MzOffset {
352    type Output = MzOffset;
353
354    fn add(self, x: u64) -> MzOffset {
355        MzOffset {
356            offset: self.offset + x,
357        }
358    }
359}
360impl Add<Self> for MzOffset {
361    type Output = Self;
362
363    fn add(self, x: Self) -> Self {
364        MzOffset {
365            offset: self.offset + x.offset,
366        }
367    }
368}
369impl AddAssign<u64> for MzOffset {
370    fn add_assign(&mut self, x: u64) {
371        self.offset += x;
372    }
373}
374impl AddAssign<Self> for MzOffset {
375    fn add_assign(&mut self, x: Self) {
376        self.offset += x.offset;
377    }
378}
379
380/// Convert from `PgLsn` to MzOffset
381impl From<tokio_postgres::types::PgLsn> for MzOffset {
382    fn from(lsn: tokio_postgres::types::PgLsn) -> Self {
383        MzOffset { offset: lsn.into() }
384    }
385}
386
387impl Timestamp for MzOffset {
388    type Summary = MzOffset;
389
390    fn minimum() -> Self {
391        MzOffset {
392            offset: Timestamp::minimum(),
393        }
394    }
395}
396
397impl PathSummary<MzOffset> for MzOffset {
398    fn results_in(&self, src: &MzOffset) -> Option<MzOffset> {
399        Some(MzOffset {
400            offset: self.offset.results_in(&src.offset)?,
401        })
402    }
403
404    fn followed_by(&self, other: &Self) -> Option<Self> {
405        Some(MzOffset {
406            offset: PathSummary::<u64>::followed_by(&self.offset, &other.offset)?,
407        })
408    }
409}
410
411impl Refines<()> for MzOffset {
412    fn to_inner(_: ()) -> Self {
413        MzOffset::minimum()
414    }
415    fn to_outer(self) {}
416    fn summarize(_: Self::Summary) {}
417}
418
419impl PartialOrder for MzOffset {
420    #[inline]
421    fn less_equal(&self, other: &Self) -> bool {
422        self.offset.less_equal(&other.offset)
423    }
424}
425
426impl TotalOrder for MzOffset {}
427
428/// The meaning of the timestamp number produced by data sources. This type
429/// is not concerned with the source of the timestamp (like if the data came
430/// from a Debezium consistency topic or a CDCv2 stream), instead only what the
431/// timestamp number means.
432///
433/// Some variants here have attached data used to differentiate incomparable
434/// instantiations. These attached data types should be expanded in the future
435/// if we need to tell apart more kinds of sources.
436#[derive(
437    Clone,
438    Debug,
439    Ord,
440    PartialOrd,
441    Eq,
442    PartialEq,
443    Serialize,
444    Deserialize,
445    Hash
446)]
447pub enum Timeline {
448    /// EpochMilliseconds means the timestamp is the number of milliseconds since
449    /// the Unix epoch.
450    EpochMilliseconds,
451    /// External means the timestamp comes from an external data source and we
452    /// don't know what the number means. The attached String is the source's name,
453    /// which will result in different sources being incomparable.
454    External(String),
455    /// User means the user has manually specified a timeline. The attached
456    /// String is specified by the user, allowing them to decide sources that are
457    /// joinable.
458    User(String),
459}
460
461impl Timeline {
462    const EPOCH_MILLISECOND_ID_CHAR: char = 'M';
463    const EXTERNAL_ID_CHAR: char = 'E';
464    const USER_ID_CHAR: char = 'U';
465
466    fn id_char(&self) -> char {
467        match self {
468            Self::EpochMilliseconds => Self::EPOCH_MILLISECOND_ID_CHAR,
469            Self::External(_) => Self::EXTERNAL_ID_CHAR,
470            Self::User(_) => Self::USER_ID_CHAR,
471        }
472    }
473}
474
475impl ToString for Timeline {
476    fn to_string(&self) -> String {
477        match self {
478            Self::EpochMilliseconds => format!("{}", self.id_char()),
479            Self::External(id) => format!("{}.{id}", self.id_char()),
480            Self::User(id) => format!("{}.{id}", self.id_char()),
481        }
482    }
483}
484
485impl FromStr for Timeline {
486    type Err = String;
487
488    fn from_str(s: &str) -> Result<Self, Self::Err> {
489        if s.is_empty() {
490            return Err("empty timeline".to_string());
491        }
492        let mut chars = s.chars();
493        match chars.next().expect("non-empty string") {
494            Self::EPOCH_MILLISECOND_ID_CHAR => match chars.next() {
495                None => Ok(Self::EpochMilliseconds),
496                Some(_) => Err(format!("unknown timeline: {s}")),
497            },
498            Self::EXTERNAL_ID_CHAR => match chars.next() {
499                Some('.') => Ok(Self::External(chars.as_str().to_string())),
500                _ => Err(format!("unknown timeline: {s}")),
501            },
502            Self::USER_ID_CHAR => match chars.next() {
503                Some('.') => Ok(Self::User(chars.as_str().to_string())),
504                _ => Err(format!("unknown timeline: {s}")),
505            },
506            _ => Err(format!("unknown timeline: {s}")),
507        }
508    }
509}
510
511/// A connection to an external system
512pub trait SourceConnection: Debug + Clone + PartialEq + AlterCompatible {
513    /// The name of the external system (e.g kafka, postgres, etc).
514    fn name(&self) -> &'static str;
515
516    /// The name of the resource in the external system (e.g kafka topic) if any
517    fn external_reference(&self) -> Option<&str>;
518
519    /// Defines the key schema to use by default for this source connection type.
520    /// This will be used for the primary export of the source and as the default
521    /// pre-encoding key schema for the source.
522    fn default_key_desc(&self) -> RelationDesc;
523
524    /// Defines the value schema to use by default for this source connection type.
525    /// This will be used for the primary export of the source and as the default
526    /// pre-encoding value schema for the source.
527    fn default_value_desc(&self) -> RelationDesc;
528
529    /// The schema of this connection's timestamp type. This will also be the schema of the
530    /// progress relation.
531    fn timestamp_desc(&self) -> RelationDesc;
532
533    /// The id of the connection object (i.e the one obtained from running `CREATE CONNECTION`) in
534    /// the catalog, if any.
535    fn connection_id(&self) -> Option<CatalogItemId>;
536
537    /// Whether the source type supports read only mode.
538    fn supports_read_only(&self) -> bool;
539
540    /// Whether the source type prefers to run on only one replica of a multi-replica cluster.
541    fn prefers_single_replica(&self) -> bool;
542}
543
544#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize)]
545pub enum Compression {
546    Gzip,
547    None,
548}
549
550/// Defines the configuration for how to handle data that is exported for a given
551/// Source Export.
552#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
553pub struct SourceExportDataConfig<C: ConnectionAccess = InlinedConnection> {
554    pub encoding: Option<encoding::SourceDataEncoding<C>>,
555    pub envelope: SourceEnvelope,
556}
557
558impl<R: ConnectionResolver> IntoInlineConnection<SourceExportDataConfig, R>
559    for SourceExportDataConfig<ReferencedConnection>
560{
561    fn into_inline_connection(self, r: R) -> SourceExportDataConfig {
562        let SourceExportDataConfig { encoding, envelope } = self;
563
564        SourceExportDataConfig {
565            encoding: encoding.map(|e| e.into_inline_connection(r)),
566            envelope,
567        }
568    }
569}
570
571impl<C: ConnectionAccess> AlterCompatible for SourceExportDataConfig<C> {
572    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
573        if self == other {
574            return Ok(());
575        }
576        let Self { encoding, envelope } = &self;
577
578        let compatibility_checks = [
579            (
580                match (encoding, &other.encoding) {
581                    (Some(s), Some(o)) => s.alter_compatible(id, o).is_ok(),
582                    (s, o) => s == o,
583                },
584                "encoding",
585            ),
586            (envelope == &other.envelope, "envelope"),
587        ];
588
589        for (compatible, field) in compatibility_checks {
590            if !compatible {
591                tracing::warn!(
592                    "SourceDesc incompatible {field}:\nself:\n{:#?}\n\nother\n{:#?}",
593                    self,
594                    other
595                );
596
597                return Err(AlterError { id });
598            }
599        }
600        Ok(())
601    }
602}
603
604impl<C: ConnectionAccess> SourceExportDataConfig<C> {
605    /// Returns `true` if this connection yields data that is
606    /// append-only/monotonic. Append-monly means the source
607    /// never produces retractions.
608    // TODO(guswynn): consider enforcing this more completely at the
609    // parsing/typechecking level, by not using an `envelope`
610    // for sources like pg
611    pub fn monotonic(&self, connection: &GenericSourceConnection<C>) -> bool {
612        match &self.envelope {
613            // Upsert and CdcV2 may produce retractions.
614            SourceEnvelope::Upsert(_) | SourceEnvelope::CdcV2 => false,
615            SourceEnvelope::None(_) => {
616                match connection {
617                    // Postgres can produce retractions (deletes).
618                    GenericSourceConnection::Postgres(_) => false,
619                    // MySQL can produce retractions (deletes).
620                    GenericSourceConnection::MySql(_) => false,
621                    // SQL Server can produce retractions (deletes).
622                    GenericSourceConnection::SqlServer(_) => false,
623                    // Whether or not a Loadgen source can produce retractions varies.
624                    GenericSourceConnection::LoadGenerator(g) => g.load_generator.is_monotonic(),
625                    // Kafka exports with `None` envelope are append-only.
626                    GenericSourceConnection::Kafka(_) => true,
627                }
628            }
629        }
630    }
631}
632
633/// An external source of updates for a relational collection.
634#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
635pub struct SourceDesc<C: ConnectionAccess = InlinedConnection> {
636    pub connection: GenericSourceConnection<C>,
637    pub timestamp_interval: Duration,
638}
639
640impl<R: ConnectionResolver> IntoInlineConnection<SourceDesc, R>
641    for SourceDesc<ReferencedConnection>
642{
643    fn into_inline_connection(self, r: R) -> SourceDesc {
644        let SourceDesc {
645            connection,
646            timestamp_interval,
647        } = self;
648
649        SourceDesc {
650            connection: connection.into_inline_connection(&r),
651            timestamp_interval,
652        }
653    }
654}
655
656impl<C: ConnectionAccess> AlterCompatible for SourceDesc<C> {
657    /// Determines if `self` is compatible with another `SourceDesc`, in such a
658    /// way that it is possible to turn `self` into `other` through a valid
659    /// series of transformations (e.g. no transformation or `ALTER SOURCE`).
660    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
661        if self == other {
662            return Ok(());
663        }
664        let Self {
665            connection,
666            // timestamp_interval is allowed to change via ALTER SOURCE
667            timestamp_interval: _,
668        } = &self;
669
670        let compatibility_checks = [(
671            connection.alter_compatible(id, &other.connection).is_ok(),
672            "connection",
673        )];
674
675        for (compatible, field) in compatibility_checks {
676            if !compatible {
677                tracing::warn!(
678                    "SourceDesc incompatible {field}:\nself:\n{:#?}\n\nother\n{:#?}",
679                    self,
680                    other
681                );
682
683                return Err(AlterError { id });
684            }
685        }
686
687        Ok(())
688    }
689}
690
691#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
692pub enum GenericSourceConnection<C: ConnectionAccess = InlinedConnection> {
693    Kafka(KafkaSourceConnection<C>),
694    Postgres(PostgresSourceConnection<C>),
695    MySql(MySqlSourceConnection<C>),
696    SqlServer(SqlServerSourceConnection<C>),
697    LoadGenerator(LoadGeneratorSourceConnection),
698}
699
700impl<C: ConnectionAccess> From<KafkaSourceConnection<C>> for GenericSourceConnection<C> {
701    fn from(conn: KafkaSourceConnection<C>) -> Self {
702        Self::Kafka(conn)
703    }
704}
705
706impl<C: ConnectionAccess> From<PostgresSourceConnection<C>> for GenericSourceConnection<C> {
707    fn from(conn: PostgresSourceConnection<C>) -> Self {
708        Self::Postgres(conn)
709    }
710}
711
712impl<C: ConnectionAccess> From<MySqlSourceConnection<C>> for GenericSourceConnection<C> {
713    fn from(conn: MySqlSourceConnection<C>) -> Self {
714        Self::MySql(conn)
715    }
716}
717
718impl<C: ConnectionAccess> From<SqlServerSourceConnection<C>> for GenericSourceConnection<C> {
719    fn from(conn: SqlServerSourceConnection<C>) -> Self {
720        Self::SqlServer(conn)
721    }
722}
723
724impl<C: ConnectionAccess> From<LoadGeneratorSourceConnection> for GenericSourceConnection<C> {
725    fn from(conn: LoadGeneratorSourceConnection) -> Self {
726        Self::LoadGenerator(conn)
727    }
728}
729
730impl<R: ConnectionResolver> IntoInlineConnection<GenericSourceConnection, R>
731    for GenericSourceConnection<ReferencedConnection>
732{
733    fn into_inline_connection(self, r: R) -> GenericSourceConnection {
734        match self {
735            GenericSourceConnection::Kafka(kafka) => {
736                GenericSourceConnection::Kafka(kafka.into_inline_connection(r))
737            }
738            GenericSourceConnection::Postgres(pg) => {
739                GenericSourceConnection::Postgres(pg.into_inline_connection(r))
740            }
741            GenericSourceConnection::MySql(mysql) => {
742                GenericSourceConnection::MySql(mysql.into_inline_connection(r))
743            }
744            GenericSourceConnection::SqlServer(sql_server) => {
745                GenericSourceConnection::SqlServer(sql_server.into_inline_connection(r))
746            }
747            GenericSourceConnection::LoadGenerator(lg) => {
748                GenericSourceConnection::LoadGenerator(lg)
749            }
750        }
751    }
752}
753
754impl<C: ConnectionAccess> SourceConnection for GenericSourceConnection<C> {
755    fn name(&self) -> &'static str {
756        match self {
757            Self::Kafka(conn) => conn.name(),
758            Self::Postgres(conn) => conn.name(),
759            Self::MySql(conn) => conn.name(),
760            Self::SqlServer(conn) => conn.name(),
761            Self::LoadGenerator(conn) => conn.name(),
762        }
763    }
764
765    fn external_reference(&self) -> Option<&str> {
766        match self {
767            Self::Kafka(conn) => conn.external_reference(),
768            Self::Postgres(conn) => conn.external_reference(),
769            Self::MySql(conn) => conn.external_reference(),
770            Self::SqlServer(conn) => conn.external_reference(),
771            Self::LoadGenerator(conn) => conn.external_reference(),
772        }
773    }
774
775    fn default_key_desc(&self) -> RelationDesc {
776        match self {
777            Self::Kafka(conn) => conn.default_key_desc(),
778            Self::Postgres(conn) => conn.default_key_desc(),
779            Self::MySql(conn) => conn.default_key_desc(),
780            Self::SqlServer(conn) => conn.default_key_desc(),
781            Self::LoadGenerator(conn) => conn.default_key_desc(),
782        }
783    }
784
785    fn default_value_desc(&self) -> RelationDesc {
786        match self {
787            Self::Kafka(conn) => conn.default_value_desc(),
788            Self::Postgres(conn) => conn.default_value_desc(),
789            Self::MySql(conn) => conn.default_value_desc(),
790            Self::SqlServer(conn) => conn.default_value_desc(),
791            Self::LoadGenerator(conn) => conn.default_value_desc(),
792        }
793    }
794
795    fn timestamp_desc(&self) -> RelationDesc {
796        match self {
797            Self::Kafka(conn) => conn.timestamp_desc(),
798            Self::Postgres(conn) => conn.timestamp_desc(),
799            Self::MySql(conn) => conn.timestamp_desc(),
800            Self::SqlServer(conn) => conn.timestamp_desc(),
801            Self::LoadGenerator(conn) => conn.timestamp_desc(),
802        }
803    }
804
805    fn connection_id(&self) -> Option<CatalogItemId> {
806        match self {
807            Self::Kafka(conn) => conn.connection_id(),
808            Self::Postgres(conn) => conn.connection_id(),
809            Self::MySql(conn) => conn.connection_id(),
810            Self::SqlServer(conn) => conn.connection_id(),
811            Self::LoadGenerator(conn) => conn.connection_id(),
812        }
813    }
814
815    fn supports_read_only(&self) -> bool {
816        match self {
817            GenericSourceConnection::Kafka(conn) => conn.supports_read_only(),
818            GenericSourceConnection::Postgres(conn) => conn.supports_read_only(),
819            GenericSourceConnection::MySql(conn) => conn.supports_read_only(),
820            GenericSourceConnection::SqlServer(conn) => conn.supports_read_only(),
821            GenericSourceConnection::LoadGenerator(conn) => conn.supports_read_only(),
822        }
823    }
824
825    fn prefers_single_replica(&self) -> bool {
826        match self {
827            GenericSourceConnection::Kafka(conn) => conn.prefers_single_replica(),
828            GenericSourceConnection::Postgres(conn) => conn.prefers_single_replica(),
829            GenericSourceConnection::MySql(conn) => conn.prefers_single_replica(),
830            GenericSourceConnection::SqlServer(conn) => conn.prefers_single_replica(),
831            GenericSourceConnection::LoadGenerator(conn) => conn.prefers_single_replica(),
832        }
833    }
834}
835impl<C: ConnectionAccess> crate::AlterCompatible for GenericSourceConnection<C> {
836    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
837        if self == other {
838            return Ok(());
839        }
840        let r = match (self, other) {
841            (Self::Kafka(conn), Self::Kafka(other)) => conn.alter_compatible(id, other),
842            (Self::Postgres(conn), Self::Postgres(other)) => conn.alter_compatible(id, other),
843            (Self::MySql(conn), Self::MySql(other)) => conn.alter_compatible(id, other),
844            (Self::SqlServer(conn), Self::SqlServer(other)) => conn.alter_compatible(id, other),
845            (Self::LoadGenerator(conn), Self::LoadGenerator(other)) => {
846                conn.alter_compatible(id, other)
847            }
848            _ => Err(AlterError { id }),
849        };
850
851        if r.is_err() {
852            tracing::warn!(
853                "GenericSourceConnection incompatible:\nself:\n{:#?}\n\nother\n{:#?}",
854                self,
855                other
856            );
857        }
858
859        r
860    }
861}
862
/// Details necessary for each source export to allow the source implementations
/// to export data to the export's collection.
///
/// Each variant other than `None` wraps the source-type-specific details; see
/// the `AlterCompatible` impl for how two values are compared.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum SourceExportDetails {
    /// Used when the primary collection of a source isn't an export to
    /// output to.
    None,
    /// Export details for a Kafka source.
    Kafka(KafkaSourceExportDetails),
    /// Export details for a PostgreSQL source.
    Postgres(PostgresSourceExportDetails),
    /// Export details for a MySQL source.
    MySql(MySqlSourceExportDetails),
    /// Export details for a SQL Server source.
    SqlServer(SqlServerSourceExportDetails),
    /// Export details for a load generator source.
    LoadGenerator(LoadGeneratorSourceExportDetails),
}
876
877impl crate::AlterCompatible for SourceExportDetails {
878    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
879        if self == other {
880            return Ok(());
881        }
882        let r = match (self, other) {
883            (Self::None, Self::None) => Ok(()),
884            (Self::Kafka(s), Self::Kafka(o)) => s.alter_compatible(id, o),
885            (Self::Postgres(s), Self::Postgres(o)) => s.alter_compatible(id, o),
886            (Self::MySql(s), Self::MySql(o)) => s.alter_compatible(id, o),
887            (Self::SqlServer(s), Self::SqlServer(o)) => s.alter_compatible(id, o),
888            (Self::LoadGenerator(s), Self::LoadGenerator(o)) => s.alter_compatible(id, o),
889            _ => Err(AlterError { id }),
890        };
891
892        if r.is_err() {
893            tracing::warn!(
894                "SourceExportDetails incompatible:\nself:\n{:#?}\n\nother\n{:#?}",
895                self,
896                other
897            );
898        }
899
900        r
901    }
902}
903
/// Details necessary to store in the `Details` option of a source export
/// statement (`CREATE SUBSOURCE` and `CREATE TABLE .. FROM SOURCE` statements),
/// to generate the appropriate `SourceExportDetails` struct during planning.
/// NOTE that this is serialized as proto to the catalog, so any changes here
/// must be backwards compatible or will require a migration.
///
/// The proto form is `ProtoSourceExportStatementDetails`, via the
/// [`RustType`] impl for this type.
pub enum SourceExportStatementDetails {
    /// A PostgreSQL source export.
    Postgres {
        /// Description of the upstream PostgreSQL table.
        table: mz_postgres_util::desc::PostgresTableDesc,
    },
    /// A MySQL source export.
    MySql {
        /// Description of the upstream MySQL table.
        table: mz_mysql_util::MySqlTableDesc,
        /// Stored verbatim as a string.
        /// NOTE(review): presumably the GTID set observed when the export was
        /// created — confirm.
        initial_gtid_set: String,
        /// NOTE(review): presumably whether the upstream binlog carries full
        /// row metadata — confirm.
        binlog_full_metadata: bool,
    },
    /// A SQL Server source export.
    SqlServer {
        /// Description of the upstream SQL Server table.
        table: mz_sql_server_util::desc::SqlServerTableDesc,
        /// Name of the capture instance for the table (stored as a string in
        /// proto).
        capture_instance: Arc<str>,
        /// Starting LSN; stored in proto as its raw byte representation.
        initial_lsn: mz_sql_server_util::cdc::Lsn,
    },
    /// A load generator source export.
    LoadGenerator {
        /// Which load generator output this export corresponds to.
        output: LoadGeneratorOutput,
    },
    /// A Kafka source export; it carries no statement details.
    Kafka {},
}
928
929impl RustType<ProtoSourceExportStatementDetails> for SourceExportStatementDetails {
930    fn into_proto(&self) -> ProtoSourceExportStatementDetails {
931        match self {
932            SourceExportStatementDetails::Postgres { table } => ProtoSourceExportStatementDetails {
933                kind: Some(proto_source_export_statement_details::Kind::Postgres(
934                    postgres::ProtoPostgresSourceExportStatementDetails {
935                        table: Some(table.into_proto()),
936                    },
937                )),
938            },
939            SourceExportStatementDetails::MySql {
940                table,
941                initial_gtid_set,
942                binlog_full_metadata,
943            } => ProtoSourceExportStatementDetails {
944                kind: Some(proto_source_export_statement_details::Kind::Mysql(
945                    mysql::ProtoMySqlSourceExportStatementDetails {
946                        table: Some(table.into_proto()),
947                        initial_gtid_set: initial_gtid_set.clone(),
948                        binlog_full_metadata: *binlog_full_metadata,
949                    },
950                )),
951            },
952            SourceExportStatementDetails::SqlServer {
953                table,
954                capture_instance,
955                initial_lsn,
956            } => ProtoSourceExportStatementDetails {
957                kind: Some(proto_source_export_statement_details::Kind::SqlServer(
958                    sql_server::ProtoSqlServerSourceExportStatementDetails {
959                        table: Some(table.into_proto()),
960                        capture_instance: capture_instance.to_string(),
961                        initial_lsn: initial_lsn.as_bytes().to_vec(),
962                    },
963                )),
964            },
965            SourceExportStatementDetails::LoadGenerator { output } => {
966                ProtoSourceExportStatementDetails {
967                    kind: Some(proto_source_export_statement_details::Kind::Loadgen(
968                        load_generator::ProtoLoadGeneratorSourceExportStatementDetails {
969                            output: output.into_proto().into(),
970                        },
971                    )),
972                }
973            }
974            SourceExportStatementDetails::Kafka {} => ProtoSourceExportStatementDetails {
975                kind: Some(proto_source_export_statement_details::Kind::Kafka(
976                    kafka::ProtoKafkaSourceExportStatementDetails {},
977                )),
978            },
979        }
980    }
981
982    fn from_proto(proto: ProtoSourceExportStatementDetails) -> Result<Self, TryFromProtoError> {
983        use proto_source_export_statement_details::Kind;
984        Ok(match proto.kind {
985            Some(Kind::Postgres(details)) => SourceExportStatementDetails::Postgres {
986                table: details
987                    .table
988                    .into_rust_if_some("ProtoPostgresSourceExportStatementDetails::table")?,
989            },
990            Some(Kind::Mysql(details)) => SourceExportStatementDetails::MySql {
991                table: details
992                    .table
993                    .into_rust_if_some("ProtoMySqlSourceExportStatementDetails::table")?,
994
995                initial_gtid_set: details.initial_gtid_set,
996                binlog_full_metadata: details.binlog_full_metadata,
997            },
998            Some(Kind::SqlServer(details)) => SourceExportStatementDetails::SqlServer {
999                table: details
1000                    .table
1001                    .into_rust_if_some("ProtoSqlServerSourceExportStatementDetails::table")?,
1002                capture_instance: details.capture_instance.into(),
1003                initial_lsn: mz_sql_server_util::cdc::Lsn::try_from(details.initial_lsn.as_slice())
1004                    .map_err(|e| TryFromProtoError::InvalidFieldError(e.to_string()))?,
1005            },
1006            Some(Kind::Loadgen(details)) => SourceExportStatementDetails::LoadGenerator {
1007                output: details
1008                    .output
1009                    .into_rust_if_some("ProtoLoadGeneratorSourceExportStatementDetails::output")?,
1010            },
1011            Some(Kind::Kafka(_details)) => SourceExportStatementDetails::Kafka {},
1012            None => {
1013                return Err(TryFromProtoError::missing_field(
1014                    "ProtoSourceExportStatementDetails::kind",
1015                ));
1016            }
1017        })
1018    }
1019}
1020
/// A single value in a source's persisted collection: either a [`Row`]
/// (`Ok`) or a [`DataflowError`] (`Err`).
///
/// `#[repr(transparent)]` gives this wrapper the same layout as the inner
/// `Result`. [`Deref`]/[`DerefMut`] expose that `Result` directly.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
#[repr(transparent)]
pub struct SourceData(pub Result<Row, DataflowError>);
1024
1025impl Default for SourceData {
1026    fn default() -> Self {
1027        SourceData(Ok(Row::default()))
1028    }
1029}
1030
1031impl Deref for SourceData {
1032    type Target = Result<Row, DataflowError>;
1033
1034    fn deref(&self) -> &Self::Target {
1035        &self.0
1036    }
1037}
1038
1039impl DerefMut for SourceData {
1040    fn deref_mut(&mut self) -> &mut Self::Target {
1041        &mut self.0
1042    }
1043}
1044
1045impl RustType<ProtoSourceData> for SourceData {
1046    fn into_proto(&self) -> ProtoSourceData {
1047        use proto_source_data::Kind;
1048        ProtoSourceData {
1049            kind: Some(match &**self {
1050                Ok(row) => Kind::Ok(row.into_proto()),
1051                Err(err) => Kind::Err(err.into_proto()),
1052            }),
1053        }
1054    }
1055
1056    fn from_proto(proto: ProtoSourceData) -> Result<Self, TryFromProtoError> {
1057        use proto_source_data::Kind;
1058        match proto.kind {
1059            Some(kind) => match kind {
1060                Kind::Ok(row) => Ok(SourceData(Ok(row.into_rust()?))),
1061                Kind::Err(err) => Ok(SourceData(Err(err.into_rust()?))),
1062            },
1063            None => Result::Err(TryFromProtoError::missing_field("ProtoSourceData::kind")),
1064        }
1065    }
1066}
1067
/// Persist [`Codec`] for [`SourceData`]: values are encoded as
/// `ProtoSourceData` protobuf messages, [`ProtoRow`] is the reusable decode
/// scratch space, and [`RelationDesc`] is the schema.
impl Codec for SourceData {
    type Storage = ProtoRow;
    type Schema = RelationDesc;

    fn codec_name() -> String {
        "protobuf[SourceData]".into()
    }

    /// Appends `self`, as a `ProtoSourceData` message, to `buf`.
    fn encode<B: BufMut>(&self, buf: &mut B) {
        // NOTE(review): prost's `Message::encode` returns an error only when
        // `buf` has insufficient remaining capacity; the expect message below
        // appears to predate that — confirm the intended invariant.
        self.into_proto()
            .encode(buf)
            .expect("no required fields means no initialization errors");
    }

    /// Decodes a fresh value from `buf` by delegating to `decode_from` with a
    /// default `SourceData` and no reusable storage.
    fn decode(buf: &[u8], schema: &RelationDesc) -> Result<Self, String> {
        let mut val = SourceData::default();
        <Self as Codec>::decode_from(&mut val, buf, &mut None, schema)?;
        Ok(val)
    }

    /// Decodes `buf` into `self`, reusing allocations where possible.
    ///
    /// `storage` holds a scratch `ProtoRow` across calls. It is taken at the
    /// start and put back only on the `Ok`-into-`Ok` fast path.
    ///
    /// # Errors
    /// - If `buf` is not a valid `ProtoSourceData`, or the decoded message
    ///   cannot be converted into `SourceData`/the row under `schema`.
    fn decode_from<'a>(
        &mut self,
        buf: &'a [u8],
        storage: &mut Option<ProtoRow>,
        schema: &RelationDesc,
    ) -> Result<(), String> {
        // Optimize for common case of `Ok` by leaving a (cleared) `ProtoRow` in
        // the `Ok` variant of `ProtoSourceData`. prost's `Message::merge` impl
        // is smart about reusing the `Vec<Datum>` when it can.
        // NOTE(review): if `merge` fails below, the scratch `ProtoRow` taken
        // from `storage` is dropped rather than returned to it.
        let mut proto = storage.take().unwrap_or_default();
        proto.clear();
        let mut proto = ProtoSourceData {
            kind: Some(proto_source_data::Kind::Ok(proto)),
        };
        proto.merge(buf).map_err(|err| err.to_string())?;
        match (proto.kind, &mut self.0) {
            // Again, optimize for the common case...
            // Decode straight into the existing `Row`, then return the scratch
            // `ProtoRow` to `storage` for the next call.
            (Some(proto_source_data::Kind::Ok(proto)), Ok(row)) => {
                let ret = row.decode_from_proto(&proto, schema);
                storage.replace(proto);
                ret
            }
            // ...otherwise fall back to the obvious thing.
            // This covers an `Err` payload, a missing `kind`, and an `Ok`
            // payload decoded into a `self` that currently holds an `Err`.
            (kind, _) => {
                let proto = ProtoSourceData { kind };
                *self = proto.into_rust().map_err(|err| err.to_string())?;
                // Nothing to put back in storage.
                Ok(())
            }
        }
    }

    /// Only `Ok` rows are validated against `desc`; `Err` values always pass.
    fn validate(val: &Self, desc: &Self::Schema) -> Result<(), String> {
        match &val.0 {
            Ok(row) => Row::validate(row, desc),
            Err(_) => Ok(()),
        }
    }

    /// Serializes the schema as an encoded `ProtoRelationDesc`.
    fn encode_schema(schema: &Self::Schema) -> Bytes {
        schema.into_proto().encode_to_vec().into()
    }

    /// Inverse of `encode_schema`.
    ///
    /// # Panics
    /// - If `buf` is not a valid encoded `ProtoRelationDesc`, or it cannot be
    ///   converted into a `RelationDesc`.
    fn decode_schema(buf: &Bytes) -> Self::Schema {
        let proto = ProtoRelationDesc::decode(buf.as_ref()).expect("valid schema");
        proto.into_rust().expect("valid schema")
    }
}
1136
/// Returns a proptest strategy producing arbitrary [`SourceData`] for `desc`.
///
/// `Ok` rows conforming to `desc` are generated 50 times as often as `Err`
/// values; neither branch shrinks.
#[cfg(any(test, feature = "proptest"))]
pub fn arb_source_data_for_relation_desc(
    desc: &RelationDesc,
) -> impl Strategy<Value = SourceData> + use<> {
    let ok_values = arb_row_for_relation(desc)
        .no_shrink()
        .prop_map(|row| SourceData(Ok(row)))
        .boxed();
    let err_values = any::<DataflowError>()
        .prop_map(|err| SourceData(Err(err)))
        .no_shrink()
        .boxed();

    proptest::strategy::Union::new_weighted(vec![(50, ok_values), (1, err_values)])
}
1155
/// Describes how external references should be organized in a multi-level
/// hierarchy.
///
/// For PostgreSQL, MySQL, and SQL Server sources, these levels of reference
/// are intrinsic to the items which we're referencing. If there are other
/// naming schemas for other types of sources we discover, we might need to
/// revisit this. Other sources (e.g. load generators) can use the
/// `(schema, item)` string-pair impl.
///
/// The top ("database") level is not part of this trait. It is supplied
/// separately to [`SourceReferenceResolver::new`].
pub trait ExternalCatalogReference {
    /// The "second" level of namespacing for the reference.
    fn schema_name(&self) -> &str;
    /// The lowest level of namespacing for the reference.
    fn item_name(&self) -> &str;
}
1169
1170impl ExternalCatalogReference for &mz_mysql_util::MySqlTableDesc {
1171    fn schema_name(&self) -> &str {
1172        &self.schema_name
1173    }
1174
1175    fn item_name(&self) -> &str {
1176        &self.name
1177    }
1178}
1179
1180impl ExternalCatalogReference for mz_postgres_util::desc::PostgresTableDesc {
1181    fn schema_name(&self) -> &str {
1182        &self.namespace
1183    }
1184
1185    fn item_name(&self) -> &str {
1186        &self.name
1187    }
1188}
1189
1190impl ExternalCatalogReference for &mz_sql_server_util::desc::SqlServerTableDesc {
1191    fn schema_name(&self) -> &str {
1192        &*self.schema_name
1193    }
1194
1195    fn item_name(&self) -> &str {
1196        &*self.name
1197    }
1198}
1199
1200// This implementation provides a means of converting arbitrary objects into a
1201// `SubsourceCatalogReference`, e.g. load generator view names.
1202impl<'a> ExternalCatalogReference for (&'a str, &'a str) {
1203    fn schema_name(&self) -> &str {
1204        self.0
1205    }
1206
1207    fn item_name(&self) -> &str {
1208        self.1
1209    }
1210}
1211
/// Stores and resolves references to a `&[T: ExternalCatalogReference]`.
///
/// This is meant to provide an API to quickly look up a source's subsources.
///
/// For sources that do not provide any subsources, use the `Default`
/// implementation, which is empty and will not be able to resolve any
/// references.
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct SourceReferenceResolver {
    /// Index from item name -> schema name -> database name -> position of the
    /// item in the `referenceable_items` slice passed to
    /// [`SourceReferenceResolver::new`].
    inner: BTreeMap<Ident, BTreeMap<Ident, BTreeMap<Ident, usize>>>,
}
1223
/// Errors from building a [`SourceReferenceResolver`] or resolving a name
/// against one.
#[derive(Debug, Clone, thiserror::Error)]
pub enum ExternalReferenceResolutionError {
    /// The provided name does not have 1 to 3 components, or no indexed item
    /// matches it.
    #[error("reference to {name} not found in source")]
    DoesNotExist { name: String },
    /// A namespace component was omitted and cannot be uniquely inferred.
    #[error(
        "reference {name} is ambiguous, consider specifying an additional \
    layer of qualification"
    )]
    Ambiguous { name: String },
    /// A string could not be converted into an [`Ident`].
    #[error("invalid identifier: {0}")]
    Ident(#[from] IdentError),
}
1236
1237impl<'a> SourceReferenceResolver {
1238    /// Constructs a new `SourceReferenceResolver` from a slice of `T:
1239    /// SubsourceCatalogReference`.
1240    ///
1241    /// # Errors
1242    /// - If any `&str` provided cannot be taken to an [`Ident`].
1243    pub fn new<T: ExternalCatalogReference>(
1244        database: &str,
1245        referenceable_items: &'a [T],
1246    ) -> Result<SourceReferenceResolver, ExternalReferenceResolutionError> {
1247        // An index from table name -> schema name -> database name -> index in
1248        // `referenceable_items`.
1249        let mut inner = BTreeMap::new();
1250
1251        let database = Ident::new(database)?;
1252
1253        for (reference_idx, item) in referenceable_items.iter().enumerate() {
1254            let item_name = Ident::new(item.item_name())?;
1255            let schema_name = Ident::new(item.schema_name())?;
1256
1257            inner
1258                .entry(item_name)
1259                .or_insert_with(BTreeMap::new)
1260                .entry(schema_name)
1261                .or_insert_with(BTreeMap::new)
1262                .entry(database.clone())
1263                .or_insert(reference_idx);
1264        }
1265
1266        Ok(SourceReferenceResolver { inner })
1267    }
1268
1269    /// Returns the canonical reference and index from which it originated in
1270    /// the `referenceable_items` provided to [`Self::new`].
1271    ///
1272    /// # Args
1273    /// - `name` is `&[Ident]` to let users provide the inner element of
1274    ///   [`UnresolvedItemName`].
1275    /// - `canonicalize_to_width` limits the number of elements in the returned
1276    ///   [`UnresolvedItemName`];this is useful if the source type requires
1277    ///   contriving database and schema names that a subsource should not
1278    ///   persist as its reference.
1279    ///
1280    /// # Errors
1281    /// - If `name` does not resolve to an item in `self.inner`.
1282    ///
1283    /// # Panics
1284    /// - If `canonicalize_to_width`` is not in `1..=3`.
1285    pub fn resolve(
1286        &self,
1287        name: &[Ident],
1288        canonicalize_to_width: usize,
1289    ) -> Result<(UnresolvedItemName, usize), ExternalReferenceResolutionError> {
1290        let (db, schema, idx) = self.resolve_inner(name)?;
1291
1292        let item = name.last().expect("must have provided at least 1 element");
1293
1294        let canonical_name = match canonicalize_to_width {
1295            1 => vec![item.clone()],
1296            2 => vec![schema.clone(), item.clone()],
1297            3 => vec![db.clone(), schema.clone(), item.clone()],
1298            o => panic!("canonicalize_to_width values must be 1..=3, but got {}", o),
1299        };
1300
1301        Ok((UnresolvedItemName(canonical_name), idx))
1302    }
1303
1304    /// Returns the index from which it originated in the `referenceable_items`
1305    /// provided to [`Self::new`].
1306    ///
1307    /// # Args
1308    /// `name` is `&[Ident]` to let users provide the inner element of
1309    /// [`UnresolvedItemName`].
1310    ///
1311    /// # Errors
1312    /// - If `name` does not resolve to an item in `self.inner`.
1313    pub fn resolve_idx(&self, name: &[Ident]) -> Result<usize, ExternalReferenceResolutionError> {
1314        let (_db, _schema, idx) = self.resolve_inner(name)?;
1315        Ok(idx)
1316    }
1317
1318    /// Returns the index from which it originated in the `referenceable_items`
1319    /// provided to [`Self::new`].
1320    ///
1321    /// # Args
1322    /// `name` is `&[Ident]` to let users provide the inner element of
1323    /// [`UnresolvedItemName`].
1324    ///
1325    /// # Return
1326    /// Returns a tuple whose elements are:
1327    /// 1. The "database"- or top-level namespace of the reference.
1328    /// 2. The "schema"- or second-level namespace of the reference.
1329    /// 3. The index to find the item in `referenceable_items` argument provided
1330    ///    to `SourceReferenceResolver::new`.
1331    ///
1332    /// # Errors
1333    /// - If `name` does not resolve to an item in `self.inner`.
1334    fn resolve_inner<'name: 'a>(
1335        &'a self,
1336        name: &'name [Ident],
1337    ) -> Result<(&'a Ident, &'a Ident, usize), ExternalReferenceResolutionError> {
1338        let get_provided_name = || UnresolvedItemName(name.to_vec()).to_string();
1339
1340        // Names must be composed of 1..=3 elements.
1341        if !(1..=3).contains(&name.len()) {
1342            Err(ExternalReferenceResolutionError::DoesNotExist {
1343                name: get_provided_name(),
1344            })?;
1345        }
1346
1347        // Fill on the leading elements with `None` if they aren't present.
1348        let mut names = std::iter::repeat(None)
1349            .take(3 - name.len())
1350            .chain(name.iter().map(Some));
1351
1352        let database = names.next().flatten();
1353        let schema = names.next().flatten();
1354        let item = names
1355            .next()
1356            .flatten()
1357            .expect("must have provided the item name");
1358
1359        assert_none!(names.next(), "expected a 3-element iterator");
1360
1361        let schemas =
1362            self.inner
1363                .get(item)
1364                .ok_or_else(|| ExternalReferenceResolutionError::DoesNotExist {
1365                    name: get_provided_name(),
1366                })?;
1367
1368        let schema = match schema {
1369            Some(schema) => schema,
1370            None => schemas.keys().exactly_one().map_err(|_e| {
1371                ExternalReferenceResolutionError::Ambiguous {
1372                    name: get_provided_name(),
1373                }
1374            })?,
1375        };
1376
1377        let databases =
1378            schemas
1379                .get(schema)
1380                .ok_or_else(|| ExternalReferenceResolutionError::DoesNotExist {
1381                    name: get_provided_name(),
1382                })?;
1383
1384        let database = match database {
1385            Some(database) => database,
1386            None => databases.keys().exactly_one().map_err(|_e| {
1387                ExternalReferenceResolutionError::Ambiguous {
1388                    name: get_provided_name(),
1389                }
1390            })?,
1391        };
1392
1393        let reference_idx = databases.get(database).ok_or_else(|| {
1394            ExternalReferenceResolutionError::DoesNotExist {
1395                name: get_provided_name(),
1396            }
1397        })?;
1398
1399        Ok((database, schema, *reference_idx))
1400    }
1401}
1402
/// A decoder for [`Row`]s within [`SourceData`].
///
/// This type exists as a wrapper around [`RowColumnarDecoder`] to handle the
/// case where the [`RelationDesc`] we're decoding with has no columns. See
/// [`SourceDataRowColumnarEncoder`] for more details.
#[derive(Debug)]
pub enum SourceDataRowColumnarDecoder {
    /// Decodes rows from an `ok` column that is a `StructArray`.
    Row(RowColumnarDecoder),
    /// Used when the `ok` column is an Arrow `Null` array. Every decoded row is
    /// left empty.
    EmptyRow,
}
1413
1414impl SourceDataRowColumnarDecoder {
1415    pub fn decode(&self, idx: usize, row: &mut Row) {
1416        match self {
1417            SourceDataRowColumnarDecoder::Row(decoder) => decoder.decode(idx, row),
1418            SourceDataRowColumnarDecoder::EmptyRow => {
1419                // Create a packer just to clear the Row.
1420                row.packer();
1421            }
1422        }
1423    }
1424
1425    pub fn goodbytes(&self) -> usize {
1426        match self {
1427            SourceDataRowColumnarDecoder::Row(decoder) => decoder.goodbytes(),
1428            SourceDataRowColumnarDecoder::EmptyRow => 0,
1429        }
1430    }
1431}
1432
/// Decodes [`SourceData`] from the two-child `StructArray` written by
/// [`SourceDataColumnarEncoder`]: an `ok` column of rows and an `err` column
/// of encoded errors. Exactly one of the two is non-null per entry.
#[derive(Debug)]
pub struct SourceDataColumnarDecoder {
    /// Decoder for the `ok` column (the first struct child).
    row_decoder: SourceDataRowColumnarDecoder,
    /// The `err` column (the second struct child). Each non-null value is a
    /// protobuf-encoded `ProtoDataflowError`.
    err_decoder: BinaryArray,
}
1438
1439impl SourceDataColumnarDecoder {
1440    pub fn new(col: StructArray, desc: &RelationDesc) -> Result<Self, anyhow::Error> {
1441        // TODO(parkmcar): We should validate the fields here.
1442        let (_fields, arrays, nullability) = col.into_parts();
1443
1444        if nullability.is_some() {
1445            anyhow::bail!("SourceData is not nullable, but found {nullability:?}");
1446        }
1447        if arrays.len() != 2 {
1448            anyhow::bail!("SourceData should only have two fields, found {arrays:?}");
1449        }
1450
1451        let errs = arrays[1]
1452            .as_any()
1453            .downcast_ref::<BinaryArray>()
1454            .ok_or_else(|| anyhow::anyhow!("expected BinaryArray, found {:?}", arrays[1]))?;
1455
1456        let row_decoder = match arrays[0].data_type() {
1457            arrow::datatypes::DataType::Struct(_) => {
1458                let rows = arrays[0]
1459                    .as_any()
1460                    .downcast_ref::<StructArray>()
1461                    .ok_or_else(|| {
1462                        anyhow::anyhow!("expected StructArray, found {:?}", arrays[0])
1463                    })?;
1464                let decoder = RowColumnarDecoder::new(rows.clone(), desc)?;
1465                SourceDataRowColumnarDecoder::Row(decoder)
1466            }
1467            arrow::datatypes::DataType::Null => SourceDataRowColumnarDecoder::EmptyRow,
1468            other => anyhow::bail!("expected Struct or Null Array, found {other:?}"),
1469        };
1470
1471        Ok(SourceDataColumnarDecoder {
1472            row_decoder,
1473            err_decoder: errs.clone(),
1474        })
1475    }
1476}
1477
1478impl ColumnDecoder<SourceData> for SourceDataColumnarDecoder {
1479    fn decode(&self, idx: usize, val: &mut SourceData) {
1480        let err_null = self.err_decoder.is_null(idx);
1481        let row_null = match &self.row_decoder {
1482            SourceDataRowColumnarDecoder::Row(decoder) => decoder.is_null(idx),
1483            SourceDataRowColumnarDecoder::EmptyRow => !err_null,
1484        };
1485
1486        match (row_null, err_null) {
1487            (true, false) => {
1488                let err = self.err_decoder.value(idx);
1489                let err = ProtoDataflowError::decode(err)
1490                    .expect("proto should be valid")
1491                    .into_rust()
1492                    .expect("error should be valid");
1493                val.0 = Err(err);
1494            }
1495            (false, true) => {
1496                let row = match val.0.as_mut() {
1497                    Ok(row) => row,
1498                    Err(_) => {
1499                        val.0 = Ok(Row::default());
1500                        val.0.as_mut().unwrap()
1501                    }
1502                };
1503                self.row_decoder.decode(idx, row);
1504            }
1505            (true, true) => panic!("should have one of 'ok' or 'err'"),
1506            (false, false) => panic!("cannot have both 'ok' and 'err'"),
1507        }
1508    }
1509
1510    fn is_null(&self, idx: usize) -> bool {
1511        let err_null = self.err_decoder.is_null(idx);
1512        let row_null = match &self.row_decoder {
1513            SourceDataRowColumnarDecoder::Row(decoder) => decoder.is_null(idx),
1514            SourceDataRowColumnarDecoder::EmptyRow => !err_null,
1515        };
1516        assert!(!err_null || !row_null, "SourceData should never be null!");
1517
1518        false
1519    }
1520
1521    fn goodbytes(&self) -> usize {
1522        self.row_decoder.goodbytes() + ArrayOrd::Binary(self.err_decoder.clone()).goodbytes()
1523    }
1524
1525    fn stats(&self) -> StructStats {
1526        let len = self.err_decoder.len();
1527        let err_stats = ColumnarStats {
1528            nulls: Some(ColumnNullStats {
1529                count: self.err_decoder.null_count(),
1530            }),
1531            values: PrimitiveStats::<Vec<u8>>::from_column(&self.err_decoder).into(),
1532        };
1533        // The top level struct is non-nullable and every entry is either an
1534        // `Ok(Row)` or an `Err(String)`. As a result, we can compute the number
1535        // of `Ok` entries by subtracting the number of `Err` entries from the
1536        // total count.
1537        let row_null_count = len - self.err_decoder.null_count();
1538        let row_stats = match &self.row_decoder {
1539            SourceDataRowColumnarDecoder::Row(encoder) => {
1540                // Sanity check that the number of row nulls/nones we calculated
1541                // using the error column matches what the row column thinks it
1542                // has.
1543                assert_eq!(encoder.null_count(), row_null_count);
1544                encoder.stats()
1545            }
1546            SourceDataRowColumnarDecoder::EmptyRow => StructStats {
1547                len,
1548                cols: BTreeMap::default(),
1549            },
1550        };
1551        let row_stats = ColumnarStats {
1552            nulls: Some(ColumnNullStats {
1553                count: row_null_count,
1554            }),
1555            values: ColumnStatKinds::Struct(row_stats),
1556        };
1557
1558        let stats = [
1559            (
1560                SourceDataColumnarEncoder::OK_COLUMN_NAME.to_string(),
1561                row_stats,
1562            ),
1563            (
1564                SourceDataColumnarEncoder::ERR_COLUMN_NAME.to_string(),
1565                err_stats,
1566            ),
1567        ];
1568        StructStats {
1569            len,
1570            cols: stats.into_iter().map(|(name, s)| (name, s)).collect(),
1571        }
1572    }
1573}
1574
/// An encoder for [`Row`]s within [`SourceData`].
///
/// This type exists as a wrapper around [`RowColumnarEncoder`] to support
/// encoding empty [`Row`]s. A [`RowColumnarEncoder`] finishes as a
/// [`StructArray`] which is required to have at least one column, and thus
/// cannot support empty [`Row`]s.
#[derive(Debug)]
pub enum SourceDataRowColumnarEncoder {
    /// Encodes rows for a relation with at least one column.
    Row(RowColumnarEncoder),
    /// Used for a relation with no columns. Nothing is buffered, and the
    /// finished `ok` column is a `NullArray`.
    EmptyRow,
}
1586
1587impl SourceDataRowColumnarEncoder {
1588    pub(crate) fn goodbytes(&self) -> usize {
1589        match self {
1590            SourceDataRowColumnarEncoder::Row(e) => e.goodbytes(),
1591            SourceDataRowColumnarEncoder::EmptyRow => 0,
1592        }
1593    }
1594
1595    pub fn append(&mut self, row: &Row) {
1596        match self {
1597            SourceDataRowColumnarEncoder::Row(encoder) => encoder.append(row),
1598            SourceDataRowColumnarEncoder::EmptyRow => {
1599                assert_eq!(row.iter().count(), 0)
1600            }
1601        }
1602    }
1603
1604    pub fn append_null(&mut self) {
1605        match self {
1606            SourceDataRowColumnarEncoder::Row(encoder) => encoder.append_null(),
1607            SourceDataRowColumnarEncoder::EmptyRow => (),
1608        }
1609    }
1610}
1611
/// Encodes [`SourceData`] into a `StructArray` with no top-level null buffer
/// and two nullable children: `ok` (the rows) and `err` (protobuf-encoded
/// errors).
#[derive(Debug)]
pub struct SourceDataColumnarEncoder {
    /// Encoder for the `ok` column.
    row_encoder: SourceDataRowColumnarEncoder,
    /// Builder for the `err` column. Each appended value is a protobuf-encoded
    /// `DataflowError`.
    err_encoder: BinaryBuilder,
}
1617
1618impl SourceDataColumnarEncoder {
1619    const OK_COLUMN_NAME: &'static str = "ok";
1620    const ERR_COLUMN_NAME: &'static str = "err";
1621
1622    pub fn new(desc: &RelationDesc) -> Self {
1623        let row_encoder = match RowColumnarEncoder::new(desc) {
1624            Some(encoder) => SourceDataRowColumnarEncoder::Row(encoder),
1625            None => {
1626                assert!(desc.typ().columns().is_empty());
1627                SourceDataRowColumnarEncoder::EmptyRow
1628            }
1629        };
1630        let err_encoder = BinaryBuilder::new();
1631
1632        SourceDataColumnarEncoder {
1633            row_encoder,
1634            err_encoder,
1635        }
1636    }
1637}
1638
1639impl ColumnEncoder<SourceData> for SourceDataColumnarEncoder {
1640    type FinishedColumn = StructArray;
1641
1642    fn goodbytes(&self) -> usize {
1643        self.row_encoder.goodbytes() + self.err_encoder.values_slice().len()
1644    }
1645
1646    #[inline]
1647    fn append(&mut self, val: &SourceData) {
1648        match val.0.as_ref() {
1649            Ok(row) => {
1650                self.row_encoder.append(row);
1651                self.err_encoder.append_null();
1652            }
1653            Err(err) => {
1654                self.row_encoder.append_null();
1655                self.err_encoder
1656                    .append_value(err.into_proto().encode_to_vec());
1657            }
1658        }
1659    }
1660
1661    #[inline]
1662    fn append_null(&mut self) {
1663        panic!("appending a null into SourceDataColumnarEncoder is not supported");
1664    }
1665
1666    fn finish(self) -> Self::FinishedColumn {
1667        let SourceDataColumnarEncoder {
1668            row_encoder,
1669            mut err_encoder,
1670        } = self;
1671
1672        let err_column = BinaryBuilder::finish(&mut err_encoder);
1673        let row_column: ArrayRef = match row_encoder {
1674            SourceDataRowColumnarEncoder::Row(encoder) => {
1675                let column = encoder.finish();
1676                Arc::new(column)
1677            }
1678            SourceDataRowColumnarEncoder::EmptyRow => Arc::new(NullArray::new(err_column.len())),
1679        };
1680
1681        assert_eq!(row_column.len(), err_column.len());
1682
1683        let fields = vec![
1684            Field::new(Self::OK_COLUMN_NAME, row_column.data_type().clone(), true),
1685            Field::new(Self::ERR_COLUMN_NAME, err_column.data_type().clone(), true),
1686        ];
1687        let arrays: Vec<Arc<dyn Array>> = vec![row_column, Arc::new(err_column)];
1688        StructArray::new(Fields::from(fields), arrays, None)
1689    }
1690}
1691
1692impl Schema<SourceData> for RelationDesc {
1693    type ArrowColumn = StructArray;
1694    type Statistics = StructStats;
1695
1696    type Decoder = SourceDataColumnarDecoder;
1697    type Encoder = SourceDataColumnarEncoder;
1698
1699    fn decoder(&self, col: Self::ArrowColumn) -> Result<Self::Decoder, anyhow::Error> {
1700        SourceDataColumnarDecoder::new(col, self)
1701    }
1702
1703    fn encoder(&self) -> Result<Self::Encoder, anyhow::Error> {
1704        Ok(SourceDataColumnarEncoder::new(self))
1705    }
1706}
1707
#[cfg(test)]
mod tests {
    use arrow::array::{ArrayData, make_comparator};
    use base64::Engine;
    use bytes::Bytes;
    use mz_expr::EvalError;
    use mz_ore::assert_err;
    use mz_ore::metrics::MetricsRegistry;
    use mz_persist::indexed::columnar::arrow::{realloc_any, realloc_array};
    use mz_persist::metrics::ColumnarMetrics;
    use mz_persist_types::parquet::EncodingConfig;
    use mz_persist_types::schema::{Migration, backward_compatible};
    use mz_persist_types::stats::{PartStats, PartStatsMetrics};
    use mz_repr::{
        ColumnIndex, DatumVec, PropRelationDescDiff, ProtoRelationDesc, RelationDescBuilder,
        RowArena, SqlScalarType, arb_relation_desc_diff, arb_relation_desc_projection,
    };
    use proptest::prelude::*;
    use proptest::strategy::{Union, ValueTree};

    use crate::stats::RelationPartStats;

    use super::*;

    #[mz_ore::test]
    fn test_timeline_parsing() {
        assert_eq!(Ok(Timeline::EpochMilliseconds), "M".parse());
        assert_eq!(Ok(Timeline::External("JOE".to_string())), "E.JOE".parse());
        assert_eq!(Ok(Timeline::User("MIKE".to_string())), "U.MIKE".parse());

        assert_err!("Materialize".parse::<Timeline>());
        assert_err!("Ejoe".parse::<Timeline>());
        assert_err!("Umike".parse::<Timeline>());
        assert_err!("Dance".parse::<Timeline>());
        assert_err!("".parse::<Timeline>());
    }

    /// Encodes `datas` with `desc` and round-trips the resulting column
    /// through the proto `ArrayData` format and through Parquet (using
    /// `config`). It then asserts that decoding reproduces the input.
    ///
    /// The data is decoded twice: once with `desc`, and once with the
    /// projection `read_desc`. The second pass also checks that stats computed
    /// from the data admit every projected datum. Finally it checks that `desc`
    /// round-trips through `{encode,decode}_schema` and that the column's
    /// DataType is backward compatible with itself.
    #[track_caller]
    fn roundtrip_source_data(
        desc: &RelationDesc,
        datas: Vec<SourceData>,
        read_desc: &RelationDesc,
        config: &EncodingConfig,
    ) {
        let metrics = ColumnarMetrics::disconnected();
        let mut encoder = <RelationDesc as Schema<SourceData>>::encoder(desc).unwrap();
        for data in &datas {
            encoder.append(data);
        }
        let col = encoder.finish();

        // The top-level StructArray for SourceData should always be non-nullable.
        assert!(!col.is_nullable());

        // Reallocate our arrays with lgalloc.
        let col = realloc_array(&col, &metrics);

        // Roundtrip through ProtoArray format.
        {
            let proto = col.to_data().into_proto();
            let bytes = proto.encode_to_vec();
            let proto = mz_persist_types::arrow::ProtoArrayData::decode(&bytes[..]).unwrap();
            let array_data: ArrayData = proto.into_rust().unwrap();

            let col_rnd = StructArray::from(array_data.clone());
            assert_eq!(col, col_rnd);

            let col_dyn = arrow::array::make_array(array_data);
            let col_dyn = col_dyn.as_any().downcast_ref::<StructArray>().unwrap();
            assert_eq!(&col, col_dyn);
        }

        // Encode to Parquet.
        let mut buf = Vec::new();
        let fields = Fields::from(vec![Field::new("k", col.data_type().clone(), false)]);
        let arrays: Vec<Arc<dyn Array>> = vec![Arc::new(col.clone())];
        mz_persist_types::parquet::encode_arrays(&mut buf, fields, arrays, config).unwrap();

        // Decode from Parquet.
        let buf = Bytes::from(buf);
        let mut reader = mz_persist_types::parquet::decode_arrays(buf).unwrap();
        let maybe_batch = reader.next();

        // If we didn't encode any data then our record_batch will be empty.
        let Some(record_batch) = maybe_batch else {
            assert!(datas.is_empty());
            return;
        };
        let record_batch = record_batch.unwrap();

        assert_eq!(record_batch.columns().len(), 1);
        let rnd_col = &record_batch.columns()[0];
        let rnd_col = realloc_any(Arc::clone(rnd_col), &metrics);
        let rnd_col = rnd_col
            .as_any()
            .downcast_ref::<StructArray>()
            .unwrap()
            .clone();

        // Try generating stats for the data, just to make sure we don't panic.
        let stats = <RelationDesc as Schema<SourceData>>::decoder_any(desc, &rnd_col)
            .expect("valid decoder")
            .stats();

        // Read back all of our data and assert it roundtrips.
        let mut rnd_data = SourceData(Ok(Row::default()));
        let decoder = <RelationDesc as Schema<SourceData>>::decoder(desc, rnd_col.clone()).unwrap();
        for (idx, og_data) in datas.iter().enumerate() {
            decoder.decode(idx, &mut rnd_data);
            assert_eq!(og_data, &rnd_data);
        }

        // Read back all of our data a second time with a projection applied, and make sure the
        // stats are valid.
        let stats_metrics = PartStatsMetrics::new(&MetricsRegistry::new());
        let stats = RelationPartStats {
            name: "test",
            metrics: &stats_metrics,
            stats: &PartStats { key: stats },
            desc: read_desc,
        };
        let mut datum_vec = DatumVec::new();
        let arena = RowArena::default();
        let decoder = <RelationDesc as Schema<SourceData>>::decoder(read_desc, rnd_col).unwrap();

        for (idx, og_data) in datas.iter().enumerate() {
            decoder.decode(idx, &mut rnd_data);
            match (&og_data.0, &rnd_data.0) {
                (Ok(og_row), Ok(rnd_row)) => {
                    // Filter down to just the Datums in the projection schema.
                    {
                        let datums = datum_vec.borrow_with(og_row);
                        let projected_datums =
                            datums.iter().enumerate().filter_map(|(idx, datum)| {
                                read_desc
                                    .contains_index(&ColumnIndex::from_raw(idx))
                                    .then_some(datum)
                            });
                        let og_projected_row = Row::pack(projected_datums);
                        assert_eq!(&og_projected_row, rnd_row);
                    }

                    // Validate the stats for all of our projected columns.
                    {
                        let proj_datums = datum_vec.borrow_with(rnd_row);
                        for (pos, (idx, _, _)) in read_desc.iter_all().enumerate() {
                            let spec = stats.col_stats(idx, &arena);
                            assert!(spec.may_contain(proj_datums[pos]));
                        }
                    }
                }
                (Err(_), Err(_)) => assert_eq!(og_data, &rnd_data),
                (_, _) => panic!("decoded to a different type? {og_data:?} {rnd_data:?}"),
            }
        }

        // Verify that the RelationDesc itself roundtrips through
        // {encode,decode}_schema.
        let encoded_schema = SourceData::encode_schema(desc);
        let roundtrip_desc = SourceData::decode_schema(&encoded_schema);
        assert_eq!(desc, &roundtrip_desc);

        // Verify that the RelationDesc is backward compatible with itself (this
        // mostly checks for `unimplemented!` type panics).
        let migration =
            mz_persist_types::schema::backward_compatible(col.data_type(), col.data_type());
        let migration = migration.expect("should be backward compatible with self");
        // Also verify that the Fn doesn't do anything wonky.
        let migrated = migration.migrate(Arc::new(col.clone()));
        assert_eq!(col.data_type(), migrated.data_type());
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decContextDefault` on OS `linux`
    fn all_source_data_roundtrips() {
        let mut weights = vec![(500, Just(0..8)), (50, Just(8..32))];
        if std::env::var("PROPTEST_LARGE_DATA").is_ok() {
            weights.extend([
                (10, Just(32..128)),
                (5, Just(128..512)),
                (3, Just(512..2048)),
                (1, Just(2048..8192)),
            ]);
        }
        let num_rows = Union::new_weighted(weights);

        // TODO(parkmycar): There are so many clones going on here, and maybe we can avoid them?
        let strat = (any::<RelationDesc>(), num_rows)
            .prop_flat_map(|(desc, num_rows)| {
                arb_relation_desc_projection(desc.clone())
                    .prop_map(move |read_desc| (desc.clone(), read_desc, num_rows.clone()))
            })
            .prop_flat_map(|(desc, read_desc, num_rows)| {
                proptest::collection::vec(arb_source_data_for_relation_desc(&desc), num_rows)
                    .prop_map(move |datas| (desc.clone(), datas, read_desc.clone()))
            });

        let combined_strat = (any::<EncodingConfig>(), strat);
        proptest!(|((config, (desc, source_datas, read_desc)) in combined_strat)| {
            roundtrip_source_data(&desc, source_datas, &read_desc, &config);
        });
    }

    #[mz_ore::test]
    fn roundtrip_error_nulls() {
        let desc = RelationDescBuilder::default()
            .with_column(
                "ts",
                SqlScalarType::TimestampTz { precision: None }.nullable(false),
            )
            .finish();
        let source_datas = vec![SourceData(Err(DataflowError::EvalError(
            EvalError::DateOutOfRange.into(),
        )))];
        let config = EncodingConfig::default();
        roundtrip_source_data(&desc, source_datas, &desc, &config);
    }

    /// Reports whether `array` is sorted in non-decreasing order under arrow's
    /// default `SortOptions`.
    ///
    /// Returns `None` when arrow's `make_comparator` cannot compare the
    /// array's DataType. Callers must then treat sortedness as unknown rather
    /// than as "unsorted".
    fn is_sorted(array: &dyn Array) -> Option<bool> {
        let sort_options = arrow::compute::SortOptions::default();
        // `make_comparator` returns an error for DataTypes it does not
        // support; surface that as "unknown".
        let cmp = make_comparator(array, array, sort_options).ok()?;
        let sorted = (0..array.len())
            .tuple_windows()
            .all(|(i, j)| cmp(i, j).is_le());
        Some(sorted)
    }

    /// Returns the arrow DataType that `schema`'s encoder produces. The
    /// encoder is finished without appending any data.
    fn get_data_type(schema: &impl Schema<SourceData>) -> arrow::datatypes::DataType {
        use mz_persist_types::columnar::ColumnEncoder;
        let array = Schema::encoder(schema).expect("valid schema").finish();
        Array::data_type(&array).clone()
    }

    /// Encodes `datas` with `old`, applies `migration`, and asserts that the
    /// result has the DataType an encoder for `new` produces.
    ///
    /// If the migration claims to preserve order and the encoded input is
    /// sorted, it also asserts that the migrated output is sorted.
    #[track_caller]
    fn backward_compatible_testcase(
        old: &RelationDesc,
        new: &RelationDesc,
        migration: Migration,
        datas: &[SourceData],
    ) {
        let mut encoder = Schema::<SourceData>::encoder(old).expect("valid schema");
        for data in datas {
            encoder.append(data);
        }
        let old = encoder.finish();
        // Only used for its DataType: nothing is appended, so it has no rows.
        let new = Schema::<SourceData>::encoder(new)
            .expect("valid schema")
            .finish();
        let old: Arc<dyn Array> = Arc::new(old);
        let new: Arc<dyn Array> = Arc::new(new);
        let migrated = migration.migrate(Arc::clone(&old));
        assert_eq!(migrated.data_type(), new.data_type());

        // Check the sortedness preservation, if we can. This must look at the
        // *migrated* data: `new` is empty, so checking it would be vacuous.
        // When arrow can't compare the migrated type (`None`), skip the check.
        if migration.preserves_order() && is_sorted(&old) == Some(true) {
            assert_ne!(
                is_sorted(&migrated),
                Some(false),
                "migration claims to preserve order, but sorted input produced unsorted output"
            );
        }
    }

    #[mz_ore::test]
    fn backward_compatible_empty_add_column() {
        let old = RelationDesc::empty();
        let new = RelationDesc::from_names_and_types([("a", SqlScalarType::Bool.nullable(true))]);

        let old_data_type = get_data_type(&old);
        let new_data_type = get_data_type(&new);

        let migration = backward_compatible(&old_data_type, &new_data_type);
        assert!(migration.is_some());
    }

    #[mz_ore::test]
    fn backward_compatible_project_away_all() {
        let old = RelationDesc::from_names_and_types([("a", SqlScalarType::Bool.nullable(true))]);
        let new = RelationDesc::empty();

        let old_data_type = get_data_type(&old);
        let new_data_type = get_data_type(&new);

        let migration = backward_compatible(&old_data_type, &new_data_type);
        assert!(migration.is_some());
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn backward_compatible_migrate() {
        let strat = (any::<RelationDesc>(), any::<RelationDesc>()).prop_flat_map(|(old, new)| {
            proptest::collection::vec(arb_source_data_for_relation_desc(&old), 2)
                .prop_map(move |datas| (old.clone(), new.clone(), datas))
        });

        proptest!(|((old, new, datas) in strat)| {
            let old_data_type = get_data_type(&old);
            let new_data_type = get_data_type(&new);

            if let Some(migration) = backward_compatible(&old_data_type, &new_data_type) {
                backward_compatible_testcase(&old, &new, migration, &datas);
            };
        });
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn backward_compatible_migrate_from_common() {
        use mz_repr::SqlColumnType;
        fn test_case(old: RelationDesc, diffs: Vec<PropRelationDescDiff>, datas: Vec<SourceData>) {
            // TODO(parkmycar): As we iterate on schema migrations more things should become compatible.
            let should_be_compatible = diffs.iter().all(|diff| match diff {
                // We only support adding nullable columns.
                PropRelationDescDiff::AddColumn {
                    typ: SqlColumnType { nullable, .. },
                    ..
                } => *nullable,
                PropRelationDescDiff::DropColumn { .. } => true,
                _ => false,
            });

            let mut new = old.clone();
            for diff in diffs.into_iter() {
                diff.apply(&mut new)
            }

            let old_data_type = get_data_type(&old);
            let new_data_type = get_data_type(&new);

            if let Some(migration) = backward_compatible(&old_data_type, &new_data_type) {
                backward_compatible_testcase(&old, &new, migration, &datas);
            } else if should_be_compatible {
                panic!("new DataType was not compatible when it should have been!");
            }
        }

        let strat = any::<RelationDesc>()
            .prop_flat_map(|desc| {
                proptest::collection::vec(arb_source_data_for_relation_desc(&desc), 2)
                    .no_shrink()
                    .prop_map(move |datas| (desc.clone(), datas))
            })
            .prop_flat_map(|(desc, datas)| {
                arb_relation_desc_diff(&desc)
                    .prop_map(move |diffs| (desc.clone(), diffs, datas.clone()))
            });

        proptest!(|((old, diffs, datas) in strat)| {
            test_case(old, diffs, datas);
        });
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decContextDefault` on OS `linux`
    fn empty_relation_desc_roundtrips() {
        let empty = RelationDesc::empty();
        let rows = proptest::collection::vec(arb_source_data_for_relation_desc(&empty), 0..8)
            .prop_map(move |datas| (empty.clone(), datas));

        // Note: This case should be covered by the `all_source_data_roundtrips` test above, but
        // it's a special case that we explicitly want to exercise.
        proptest!(|((config, (desc, source_datas)) in (any::<EncodingConfig>(), rows))| {
            roundtrip_source_data(&desc, source_datas, &desc, &config);
        });
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decContextDefault` on OS `linux`
    fn arrow_datatype_consistent() {
        fn test_case(desc: RelationDesc, datas: Vec<SourceData>) {
            let half = datas.len() / 2;

            let mut encoder_a = <RelationDesc as Schema<SourceData>>::encoder(&desc).unwrap();
            for data in &datas[..half] {
                encoder_a.append(data);
            }
            let col_a = encoder_a.finish();

            let mut encoder_b = <RelationDesc as Schema<SourceData>>::encoder(&desc).unwrap();
            for data in &datas[half..] {
                encoder_b.append(data);
            }
            let col_b = encoder_b.finish();

            // The DataType of the resulting column should not change based on what data was
            // encoded.
            assert_eq!(col_a.data_type(), col_b.data_type());
        }

        let num_rows = 12;
        let strat = any::<RelationDesc>().prop_flat_map(|desc| {
            proptest::collection::vec(arb_source_data_for_relation_desc(&desc), num_rows)
                .prop_map(move |datas| (desc.clone(), datas))
        });

        proptest!(|((desc, data) in strat)| {
            test_case(desc, data);
        });
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // too slow
    fn source_proto_serialization_stability() {
        let min_protos = 10;
        let encoded = include_str!("snapshots/source-datas.txt");

        // Decode the pre-generated source datas
        let mut decoded: Vec<(RelationDesc, SourceData)> = encoded
            .lines()
            .map(|s| {
                let (desc, data) = s.split_once(',').expect("comma separated data");
                let desc = base64::engine::general_purpose::STANDARD
                    .decode(desc)
                    .expect("valid base64");
                let data = base64::engine::general_purpose::STANDARD
                    .decode(data)
                    .expect("valid base64");
                (desc, data)
            })
            .map(|(desc, data)| {
                let desc = ProtoRelationDesc::decode(&desc[..]).expect("valid proto");
                let desc = desc.into_rust().expect("valid proto");
                let data = SourceData::decode(&data, &desc).expect("valid proto");
                (desc, data)
            })
            .collect();

        // If there are fewer than the minimum examples, generate some new ones arbitrarily
        let mut runner = proptest::test_runner::TestRunner::deterministic();
        let strategy = RelationDesc::arbitrary().prop_flat_map(|desc| {
            arb_source_data_for_relation_desc(&desc).prop_map(move |data| (desc.clone(), data))
        });
        while decoded.len() < min_protos {
            let arbitrary_data = strategy
                .new_tree(&mut runner)
                .expect("source data")
                .current();
            decoded.push(arbitrary_data);
        }

        // Reencode and compare the strings
        let mut reencoded = String::new();
        let mut buf = vec![];
        for (desc, data) in decoded {
            buf.clear();
            desc.into_proto().encode(&mut buf).expect("success");
            base64::engine::general_purpose::STANDARD.encode_string(buf.as_slice(), &mut reencoded);
            reencoded.push(',');

            buf.clear();
            data.encode(&mut buf);
            base64::engine::general_purpose::STANDARD.encode_string(buf.as_slice(), &mut reencoded);
            reencoded.push('\n');
        }

        // Optimizations in Persist, particularly consolidation on read,
        // depend on a stable serialization for the serialized data.
        // For example, reordering proto fields could cause us
        // to generate a different (equivalent) serialization for a record,
        // and the two versions would not consolidate out.
        // This can impact correctness!
        //
        // If you need to change how SourceDatas are encoded, that's still fine...
        // but we'll also need to increase
        // the MINIMUM_CONSOLIDATED_VERSION as part of the same release.
        assert_eq!(
            encoded,
            reencoded.as_str(),
            "SourceData serde should be stable"
        )
    }
}