mz_storage_types/sources.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Types and traits related to the introduction of changing collections into `dataflow`.
11
12use std::collections::BTreeMap;
13use std::fmt::Debug;
14use std::hash::Hash;
15use std::ops::{Add, AddAssign, Deref, DerefMut};
16use std::str::FromStr;
17use std::sync::Arc;
18use std::time::Duration;
19
20use arrow::array::{Array, ArrayRef, BinaryArray, BinaryBuilder, NullArray, StructArray};
21use arrow::datatypes::{Field, Fields};
22use bytes::{BufMut, Bytes};
23use columnation::Columnation;
24use itertools::EitherOrBoth::Both;
25use itertools::Itertools;
26use kafka::KafkaSourceExportDetails;
27use load_generator::{LoadGeneratorOutput, LoadGeneratorSourceExportDetails};
28use mz_ore::assert_none;
29use mz_persist_types::Codec;
30use mz_persist_types::arrow::ArrayOrd;
31use mz_persist_types::columnar::{ColumnDecoder, ColumnEncoder, Schema};
32use mz_persist_types::stats::{
33    ColumnNullStats, ColumnStatKinds, ColumnarStats, ColumnarStatsBuilder, PrimitiveStats,
34    StructStats,
35};
36use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
37use mz_repr::{
38    CatalogItemId, Datum, GlobalId, ProtoRelationDesc, ProtoRow, RelationDesc, Row,
39    RowColumnarDecoder, RowColumnarEncoder, arb_row_for_relation,
40};
41use mz_sql_parser::ast::{Ident, IdentError, UnresolvedItemName};
42use proptest::prelude::any;
43use proptest::strategy::Strategy;
44use prost::Message;
45use serde::{Deserialize, Serialize};
46use timely::order::{PartialOrder, TotalOrder};
47use timely::progress::timestamp::Refines;
48use timely::progress::{PathSummary, Timestamp};
49
50use crate::AlterCompatible;
51use crate::connections::inline::{
52    ConnectionAccess, ConnectionResolver, InlinedConnection, IntoInlineConnection,
53    ReferencedConnection,
54};
55use crate::controller::AlterError;
56use crate::errors::{DataflowError, ProtoDataflowError};
57use crate::instances::StorageInstanceId;
58use crate::sources::sql_server::SqlServerSourceExportDetails;
59
60pub mod encoding;
61pub mod envelope;
62pub mod kafka;
63pub mod load_generator;
64pub mod mysql;
65pub mod postgres;
66pub mod sql_server;
67
68pub use crate::sources::envelope::SourceEnvelope;
69pub use crate::sources::kafka::KafkaSourceConnection;
70pub use crate::sources::load_generator::LoadGeneratorSourceConnection;
71pub use crate::sources::mysql::{MySqlSourceConnection, MySqlSourceExportDetails};
72pub use crate::sources::postgres::{PostgresSourceConnection, PostgresSourceExportDetails};
73pub use crate::sources::sql_server::{SqlServerSourceConnection, SqlServerSourceExtras};
74
75include!(concat!(env!("OUT_DIR"), "/mz_storage_types.sources.rs"));
76
/// A description of a source ingestion.
78#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
79pub struct IngestionDescription<S: 'static = (), C: ConnectionAccess = InlinedConnection> {
80    /// The source description.
81    pub desc: SourceDesc<C>,
82    /// Collections to be exported by this ingestion.
83    ///
84    /// # Notes
85    /// - For multi-output sources:
86    ///     - Add exports by adding a new [`SourceExport`].
87    ///     - Remove exports by removing the [`SourceExport`].
88    ///
89    ///   Re-rendering/executing the source after making these modifications
90    ///   adds and drops the subsource, respectively.
91    /// - This field includes the primary source's ID, which might need to be
92    ///   filtered out to understand which exports are explicit ingestion exports.
93    /// - This field does _not_ include the remap collection, which is tracked
94    ///   in its own field.
95    pub source_exports: BTreeMap<GlobalId, SourceExport<S>>,
96    /// The ID of the instance in which to install the source.
97    pub instance_id: StorageInstanceId,
98    /// The ID of this ingestion's remap/progress collection.
99    pub remap_collection_id: GlobalId,
100    /// The storage metadata for the remap/progress collection
101    pub remap_metadata: S,
102}
103
104impl IngestionDescription {
105    pub fn new(
106        desc: SourceDesc,
107        instance_id: StorageInstanceId,
108        remap_collection_id: GlobalId,
109    ) -> Self {
110        Self {
111            desc,
112            remap_metadata: (),
113            source_exports: BTreeMap::new(),
114            instance_id,
115            remap_collection_id,
116        }
117    }
118}
119
120impl<S> IngestionDescription<S> {
121    /// Return an iterator over the `GlobalId`s of `self`'s collections.
122    /// This will contain ids for the remap collection, subsources,
123    /// tables for this source, and the primary collection ID, even if
124    /// no data will be exported to the primary collection.
125    pub fn collection_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
126        // Expand self so that any new fields added generate a compiler error to
127        // increase the likelihood of developers seeing this function.
128        let IngestionDescription {
129            desc: _,
130            remap_metadata: _,
131            source_exports,
132            instance_id: _,
133            remap_collection_id,
134        } = &self;
135
136        source_exports
137            .keys()
138            .copied()
139            .chain(std::iter::once(*remap_collection_id))
140    }
141}
142
143impl<S: Debug + Eq + PartialEq + AlterCompatible> AlterCompatible for IngestionDescription<S> {
144    fn alter_compatible(
145        &self,
146        id: GlobalId,
147        other: &IngestionDescription<S>,
148    ) -> Result<(), AlterError> {
149        if self == other {
150            return Ok(());
151        }
152        let IngestionDescription {
153            desc,
154            remap_metadata,
155            source_exports,
156            instance_id,
157            remap_collection_id,
158        } = self;
159
160        let compatibility_checks = [
161            (desc.alter_compatible(id, &other.desc).is_ok(), "desc"),
162            (remap_metadata == &other.remap_metadata, "remap_metadata"),
163            (
164                source_exports
165                    .iter()
166                    .merge_join_by(&other.source_exports, |(l_key, _), (r_key, _)| {
167                        l_key.cmp(r_key)
168                    })
169                    .all(|r| match r {
170                        Both(
171                            (
172                                _,
173                                SourceExport {
174                                    storage_metadata: l_metadata,
175                                    details: l_details,
176                                    data_config: l_data_config,
177                                },
178                            ),
179                            (
180                                _,
181                                SourceExport {
182                                    storage_metadata: r_metadata,
183                                    details: r_details,
184                                    data_config: r_data_config,
185                                },
186                            ),
187                        ) => {
188                            l_metadata.alter_compatible(id, r_metadata).is_ok()
189                                && l_details.alter_compatible(id, r_details).is_ok()
190                                && l_data_config.alter_compatible(id, r_data_config).is_ok()
191                        }
192                        _ => true,
193                    }),
194                "source_exports",
195            ),
196            (instance_id == &other.instance_id, "instance_id"),
197            (
198                remap_collection_id == &other.remap_collection_id,
199                "remap_collection_id",
200            ),
201        ];
202        for (compatible, field) in compatibility_checks {
203            if !compatible {
204                tracing::warn!(
205                    "IngestionDescription incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
206                    self,
207                    other
208                );
209
210                return Err(AlterError { id });
211            }
212        }
213
214        Ok(())
215    }
216}
217
218impl<R: ConnectionResolver> IntoInlineConnection<IngestionDescription, R>
219    for IngestionDescription<(), ReferencedConnection>
220{
221    fn into_inline_connection(self, r: R) -> IngestionDescription {
222        let IngestionDescription {
223            desc,
224            remap_metadata,
225            source_exports,
226            instance_id,
227            remap_collection_id,
228        } = self;
229
230        IngestionDescription {
231            desc: desc.into_inline_connection(r),
232            remap_metadata,
233            source_exports,
234            instance_id,
235            remap_collection_id,
236        }
237    }
238}
239
240#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
241pub struct SourceExport<S = (), C: ConnectionAccess = InlinedConnection> {
242    /// The collection metadata needed to write the exported data
243    pub storage_metadata: S,
244    /// Details necessary for the source to export data to this export's collection.
245    pub details: SourceExportDetails,
246    /// Config necessary to handle (e.g. decode and envelope) the data for this export.
247    pub data_config: SourceExportDataConfig<C>,
248}
249
/// A timestamp type that a source uses to track its progress in the upstream
/// system and that can be round-tripped through a [`Row`].
pub trait SourceTimestamp:
    Timestamp + Columnation + Refines<()> + std::fmt::Display + Sync
{
253    fn encode_row(&self) -> Row;
254    fn decode_row(row: &Row) -> Self;
255}
256
257impl SourceTimestamp for MzOffset {
258    fn encode_row(&self) -> Row {
259        Row::pack([Datum::UInt64(self.offset)])
260    }
261
262    fn decode_row(row: &Row) -> Self {
263        let mut datums = row.iter();
264        match (datums.next(), datums.next()) {
265            (Some(Datum::UInt64(offset)), None) => MzOffset::from(offset),
266            _ => panic!("invalid row {row:?}"),
267        }
268    }
269}
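
// A minimal sketch of the `SourceTimestamp` row encoding above: an `MzOffset`
// round-trips through a single-datum `Row`. The concrete offset is an
// arbitrary example value.
#[cfg(test)]
mod mz_offset_source_timestamp_example {
    use mz_repr::Datum;

    use super::*;

    #[test]
    fn encode_decode_round_trip() {
        let offset = MzOffset::from(42);
        let row = offset.encode_row();
        // The encoding is a single `UInt64` datum holding the offset.
        assert_eq!(row.iter().next(), Some(Datum::UInt64(42)));
        assert_eq!(MzOffset::decode_row(&row), offset);
    }
}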
270
/// Universal language for describing message positions in Materialize, in a
/// source-independent way. Individual sources like Kafka or file sources should
/// explicitly implement their own offset type that converts to/from `MzOffset`s.
/// An `MzOffset` of 0 denotes an empty stream.
274#[derive(
275    Copy, Clone, Default, Debug, PartialEq, PartialOrd, Eq, Ord, Hash, Serialize, Deserialize,
276)]
277pub struct MzOffset {
278    pub offset: u64,
279}
280
281impl differential_dataflow::difference::Semigroup for MzOffset {
282    fn plus_equals(&mut self, rhs: &Self) {
283        self.offset.plus_equals(&rhs.offset)
284    }
285}
286
287impl differential_dataflow::difference::IsZero for MzOffset {
288    fn is_zero(&self) -> bool {
289        self.offset.is_zero()
290    }
291}
292
293impl mz_persist_types::Codec64 for MzOffset {
294    fn codec_name() -> String {
295        "MzOffset".to_string()
296    }
297
298    fn encode(&self) -> [u8; 8] {
299        mz_persist_types::Codec64::encode(&self.offset)
300    }
301
302    fn decode(buf: [u8; 8]) -> Self {
303        Self {
304            offset: mz_persist_types::Codec64::decode(buf),
305        }
306    }
307}
308
309impl columnation::Columnation for MzOffset {
310    type InnerRegion = columnation::CopyRegion<MzOffset>;
311}
312
313impl MzOffset {
314    pub fn checked_sub(self, other: Self) -> Option<Self> {
315        self.offset
316            .checked_sub(other.offset)
317            .map(|offset| Self { offset })
318    }
319}
320
/// Convert a `u64` offset into an `MzOffset`.
323impl From<u64> for MzOffset {
324    fn from(offset: u64) -> Self {
325        Self { offset }
326    }
327}
328
329impl std::fmt::Display for MzOffset {
330    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
331        write!(f, "{}", self.offset)
332    }
333}
334
335// Assume overflow does not occur for addition
336impl Add<u64> for MzOffset {
337    type Output = MzOffset;
338
339    fn add(self, x: u64) -> MzOffset {
340        MzOffset {
341            offset: self.offset + x,
342        }
343    }
344}
345impl Add<Self> for MzOffset {
346    type Output = Self;
347
348    fn add(self, x: Self) -> Self {
349        MzOffset {
350            offset: self.offset + x.offset,
351        }
352    }
353}
354impl AddAssign<u64> for MzOffset {
355    fn add_assign(&mut self, x: u64) {
356        self.offset += x;
357    }
358}
359impl AddAssign<Self> for MzOffset {
360    fn add_assign(&mut self, x: Self) {
361        self.offset += x.offset;
362    }
363}
364
365/// Convert from `PgLsn` to MzOffset
366impl From<tokio_postgres::types::PgLsn> for MzOffset {
367    fn from(lsn: tokio_postgres::types::PgLsn) -> Self {
368        MzOffset { offset: lsn.into() }
369    }
370}
371
372impl Timestamp for MzOffset {
373    type Summary = MzOffset;
374
375    fn minimum() -> Self {
376        MzOffset {
377            offset: Timestamp::minimum(),
378        }
379    }
380}
381
382impl PathSummary<MzOffset> for MzOffset {
383    fn results_in(&self, src: &MzOffset) -> Option<MzOffset> {
384        Some(MzOffset {
385            offset: self.offset.results_in(&src.offset)?,
386        })
387    }
388
389    fn followed_by(&self, other: &Self) -> Option<Self> {
390        Some(MzOffset {
391            offset: PathSummary::<u64>::followed_by(&self.offset, &other.offset)?,
392        })
393    }
394}
395
396impl Refines<()> for MzOffset {
397    fn to_inner(_: ()) -> Self {
398        MzOffset::minimum()
399    }
400    fn to_outer(self) {}
401    fn summarize(_: Self::Summary) {}
402}
403
404impl PartialOrder for MzOffset {
405    #[inline]
406    fn less_equal(&self, other: &Self) -> bool {
407        self.offset.less_equal(&other.offset)
408    }
409}
410
411impl TotalOrder for MzOffset {}
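
// A small, self-contained sketch of the `MzOffset` arithmetic and ordering
// implemented above; the concrete offsets are arbitrary example values.
#[cfg(test)]
mod mz_offset_arithmetic_example {
    use timely::order::PartialOrder;

    use super::*;

    #[test]
    fn offset_arithmetic_and_order() {
        let a = MzOffset::from(3);
        let b = a + 4u64;
        assert_eq!(b, MzOffset::from(7));
        // `checked_sub` returns `None` instead of underflowing.
        assert_eq!(a.checked_sub(b), None);
        // Offsets are totally ordered by their underlying `u64`.
        assert!(PartialOrder::less_equal(&a, &b));
    }
}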
412
/// The meaning of the timestamp number produced by data sources. This type
/// is not concerned with the source of the timestamp (e.g. whether the data
/// came from a Debezium consistency topic or a CDCv2 stream), but only with
/// what the timestamp number means.
417///
418/// Some variants here have attached data used to differentiate incomparable
419/// instantiations. These attached data types should be expanded in the future
420/// if we need to tell apart more kinds of sources.
421#[derive(Clone, Debug, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize, Hash)]
422pub enum Timeline {
423    /// EpochMilliseconds means the timestamp is the number of milliseconds since
424    /// the Unix epoch.
425    EpochMilliseconds,
426    /// External means the timestamp comes from an external data source and we
427    /// don't know what the number means. The attached String is the source's name,
428    /// which will result in different sources being incomparable.
429    External(String),
430    /// User means the user has manually specified a timeline. The attached
431    /// String is specified by the user, allowing them to decide sources that are
432    /// joinable.
433    User(String),
434}
435
436impl Timeline {
437    const EPOCH_MILLISECOND_ID_CHAR: char = 'M';
438    const EXTERNAL_ID_CHAR: char = 'E';
439    const USER_ID_CHAR: char = 'U';
440
441    fn id_char(&self) -> char {
442        match self {
443            Self::EpochMilliseconds => Self::EPOCH_MILLISECOND_ID_CHAR,
444            Self::External(_) => Self::EXTERNAL_ID_CHAR,
445            Self::User(_) => Self::USER_ID_CHAR,
446        }
447    }
448}
449
450impl ToString for Timeline {
451    fn to_string(&self) -> String {
452        match self {
453            Self::EpochMilliseconds => format!("{}", self.id_char()),
454            Self::External(id) => format!("{}.{id}", self.id_char()),
455            Self::User(id) => format!("{}.{id}", self.id_char()),
456        }
457    }
458}
459
460impl FromStr for Timeline {
461    type Err = String;
462
463    fn from_str(s: &str) -> Result<Self, Self::Err> {
464        if s.is_empty() {
465            return Err("empty timeline".to_string());
466        }
467        let mut chars = s.chars();
468        match chars.next().expect("non-empty string") {
469            Self::EPOCH_MILLISECOND_ID_CHAR => match chars.next() {
470                None => Ok(Self::EpochMilliseconds),
471                Some(_) => Err(format!("unknown timeline: {s}")),
472            },
473            Self::EXTERNAL_ID_CHAR => match chars.next() {
474                Some('.') => Ok(Self::External(chars.as_str().to_string())),
475                _ => Err(format!("unknown timeline: {s}")),
476            },
477            Self::USER_ID_CHAR => match chars.next() {
478                Some('.') => Ok(Self::User(chars.as_str().to_string())),
479                _ => Err(format!("unknown timeline: {s}")),
480            },
481            _ => Err(format!("unknown timeline: {s}")),
482        }
483    }
484}
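
// An illustrative round-trip through the textual encoding defined above: epoch
// timelines serialize as a bare "M", while external and user timelines append
// a '.' and their id. The ids used here are invented example values.
#[cfg(test)]
mod timeline_encoding_example {
    use super::*;

    #[test]
    fn timeline_string_round_trip() {
        assert_eq!(Timeline::EpochMilliseconds.to_string(), "M");
        assert_eq!(Timeline::External("kafka".to_string()).to_string(), "E.kafka");
        assert_eq!(
            "U.analytics".parse::<Timeline>(),
            Ok(Timeline::User("analytics".to_string()))
        );
        assert!("X.unknown".parse::<Timeline>().is_err());
    }
}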
485
486/// A connection to an external system
487pub trait SourceConnection: Debug + Clone + PartialEq + AlterCompatible {
    /// The name of the external system (e.g. Kafka, Postgres, etc.).
489    fn name(&self) -> &'static str;
490
    /// The name of the resource in the external system (e.g. Kafka topic), if any.
492    fn external_reference(&self) -> Option<&str>;
493
494    /// Defines the key schema to use by default for this source connection type.
495    /// This will be used for the primary export of the source and as the default
496    /// pre-encoding key schema for the source.
497    fn default_key_desc(&self) -> RelationDesc;
498
499    /// Defines the value schema to use by default for this source connection type.
500    /// This will be used for the primary export of the source and as the default
501    /// pre-encoding value schema for the source.
502    fn default_value_desc(&self) -> RelationDesc;
503
504    /// The schema of this connection's timestamp type. This will also be the schema of the
505    /// progress relation.
506    fn timestamp_desc(&self) -> RelationDesc;
507
    /// The ID of the connection object (i.e., the one obtained from running
    /// `CREATE CONNECTION`) in the catalog, if any.
510    fn connection_id(&self) -> Option<CatalogItemId>;
511
512    /// Whether the source type supports read only mode.
513    fn supports_read_only(&self) -> bool;
514
515    /// Whether the source type prefers to run on only one replica of a multi-replica cluster.
516    fn prefers_single_replica(&self) -> bool;
517}
518
519#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize)]
520pub enum Compression {
521    Gzip,
522    None,
523}
524
525/// Defines the configuration for how to handle data that is exported for a given
526/// Source Export.
527#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
528pub struct SourceExportDataConfig<C: ConnectionAccess = InlinedConnection> {
529    pub encoding: Option<encoding::SourceDataEncoding<C>>,
530    pub envelope: SourceEnvelope,
531}
532
533impl<R: ConnectionResolver> IntoInlineConnection<SourceExportDataConfig, R>
534    for SourceExportDataConfig<ReferencedConnection>
535{
536    fn into_inline_connection(self, r: R) -> SourceExportDataConfig {
537        let SourceExportDataConfig { encoding, envelope } = self;
538
539        SourceExportDataConfig {
540            encoding: encoding.map(|e| e.into_inline_connection(r)),
541            envelope,
542        }
543    }
544}
545
546impl<C: ConnectionAccess> AlterCompatible for SourceExportDataConfig<C> {
547    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
548        if self == other {
549            return Ok(());
550        }
551        let Self { encoding, envelope } = &self;
552
553        let compatibility_checks = [
554            (
555                match (encoding, &other.encoding) {
556                    (Some(s), Some(o)) => s.alter_compatible(id, o).is_ok(),
557                    (s, o) => s == o,
558                },
559                "encoding",
560            ),
561            (envelope == &other.envelope, "envelope"),
562        ];
563
564        for (compatible, field) in compatibility_checks {
565            if !compatible {
566                tracing::warn!(
567                    "SourceDesc incompatible {field}:\nself:\n{:#?}\n\nother\n{:#?}",
568                    self,
569                    other
570                );
571
572                return Err(AlterError { id });
573            }
574        }
575        Ok(())
576    }
577}
578
579impl<C: ConnectionAccess> SourceExportDataConfig<C> {
    /// Returns `true` if this connection yields data that is
    /// append-only/monotonic. Append-only means the source
    /// never produces retractions.
583    // TODO(guswynn): consider enforcing this more completely at the
584    // parsing/typechecking level, by not using an `envelope`
585    // for sources like pg
586    pub fn monotonic(&self, connection: &GenericSourceConnection<C>) -> bool {
587        match &self.envelope {
588            // Upsert and CdcV2 may produce retractions.
589            SourceEnvelope::Upsert(_) | SourceEnvelope::CdcV2 => false,
590            SourceEnvelope::None(_) => {
591                match connection {
592                    // Postgres can produce retractions (deletes).
593                    GenericSourceConnection::Postgres(_) => false,
594                    // MySQL can produce retractions (deletes).
595                    GenericSourceConnection::MySql(_) => false,
596                    // SQL Server can produce retractions (deletes).
597                    GenericSourceConnection::SqlServer(_) => false,
598                    // Whether or not a Loadgen source can produce retractions varies.
599                    GenericSourceConnection::LoadGenerator(g) => g.load_generator.is_monotonic(),
600                    // Kafka exports with `None` envelope are append-only.
601                    GenericSourceConnection::Kafka(_) => true,
602                }
603            }
604        }
605    }
606}
607
608/// An external source of updates for a relational collection.
609#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
610pub struct SourceDesc<C: ConnectionAccess = InlinedConnection> {
611    pub connection: GenericSourceConnection<C>,
612    pub timestamp_interval: Duration,
613}
614
615impl<R: ConnectionResolver> IntoInlineConnection<SourceDesc, R>
616    for SourceDesc<ReferencedConnection>
617{
618    fn into_inline_connection(self, r: R) -> SourceDesc {
619        let SourceDesc {
620            connection,
621            timestamp_interval,
622        } = self;
623
624        SourceDesc {
625            connection: connection.into_inline_connection(&r),
626            timestamp_interval,
627        }
628    }
629}
630
631impl<C: ConnectionAccess> AlterCompatible for SourceDesc<C> {
632    /// Determines if `self` is compatible with another `SourceDesc`, in such a
633    /// way that it is possible to turn `self` into `other` through a valid
634    /// series of transformations (e.g. no transformation or `ALTER SOURCE`).
635    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
636        if self == other {
637            return Ok(());
638        }
639        let Self {
640            connection,
641            timestamp_interval,
642        } = &self;
643
644        let compatibility_checks = [
645            (
646                connection.alter_compatible(id, &other.connection).is_ok(),
647                "connection",
648            ),
649            (
650                timestamp_interval == &other.timestamp_interval,
651                "timestamp_interval",
652            ),
653        ];
654
655        for (compatible, field) in compatibility_checks {
656            if !compatible {
657                tracing::warn!(
658                    "SourceDesc incompatible {field}:\nself:\n{:#?}\n\nother\n{:#?}",
659                    self,
660                    other
661                );
662
663                return Err(AlterError { id });
664            }
665        }
666
667        Ok(())
668    }
669}
670
/// A source connection that is generic over the concrete connection types
/// supported by Materialize.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum GenericSourceConnection<C: ConnectionAccess = InlinedConnection> {
673    Kafka(KafkaSourceConnection<C>),
674    Postgres(PostgresSourceConnection<C>),
675    MySql(MySqlSourceConnection<C>),
676    SqlServer(SqlServerSourceConnection<C>),
677    LoadGenerator(LoadGeneratorSourceConnection),
678}
679
680impl<C: ConnectionAccess> From<KafkaSourceConnection<C>> for GenericSourceConnection<C> {
681    fn from(conn: KafkaSourceConnection<C>) -> Self {
682        Self::Kafka(conn)
683    }
684}
685
686impl<C: ConnectionAccess> From<PostgresSourceConnection<C>> for GenericSourceConnection<C> {
687    fn from(conn: PostgresSourceConnection<C>) -> Self {
688        Self::Postgres(conn)
689    }
690}
691
692impl<C: ConnectionAccess> From<MySqlSourceConnection<C>> for GenericSourceConnection<C> {
693    fn from(conn: MySqlSourceConnection<C>) -> Self {
694        Self::MySql(conn)
695    }
696}
697
698impl<C: ConnectionAccess> From<SqlServerSourceConnection<C>> for GenericSourceConnection<C> {
699    fn from(conn: SqlServerSourceConnection<C>) -> Self {
700        Self::SqlServer(conn)
701    }
702}
703
704impl<C: ConnectionAccess> From<LoadGeneratorSourceConnection> for GenericSourceConnection<C> {
705    fn from(conn: LoadGeneratorSourceConnection) -> Self {
706        Self::LoadGenerator(conn)
707    }
708}
709
710impl<R: ConnectionResolver> IntoInlineConnection<GenericSourceConnection, R>
711    for GenericSourceConnection<ReferencedConnection>
712{
713    fn into_inline_connection(self, r: R) -> GenericSourceConnection {
714        match self {
715            GenericSourceConnection::Kafka(kafka) => {
716                GenericSourceConnection::Kafka(kafka.into_inline_connection(r))
717            }
718            GenericSourceConnection::Postgres(pg) => {
719                GenericSourceConnection::Postgres(pg.into_inline_connection(r))
720            }
721            GenericSourceConnection::MySql(mysql) => {
722                GenericSourceConnection::MySql(mysql.into_inline_connection(r))
723            }
724            GenericSourceConnection::SqlServer(sql_server) => {
725                GenericSourceConnection::SqlServer(sql_server.into_inline_connection(r))
726            }
727            GenericSourceConnection::LoadGenerator(lg) => {
728                GenericSourceConnection::LoadGenerator(lg)
729            }
730        }
731    }
732}
733
734impl<C: ConnectionAccess> SourceConnection for GenericSourceConnection<C> {
735    fn name(&self) -> &'static str {
736        match self {
737            Self::Kafka(conn) => conn.name(),
738            Self::Postgres(conn) => conn.name(),
739            Self::MySql(conn) => conn.name(),
740            Self::SqlServer(conn) => conn.name(),
741            Self::LoadGenerator(conn) => conn.name(),
742        }
743    }
744
745    fn external_reference(&self) -> Option<&str> {
746        match self {
747            Self::Kafka(conn) => conn.external_reference(),
748            Self::Postgres(conn) => conn.external_reference(),
749            Self::MySql(conn) => conn.external_reference(),
750            Self::SqlServer(conn) => conn.external_reference(),
751            Self::LoadGenerator(conn) => conn.external_reference(),
752        }
753    }
754
755    fn default_key_desc(&self) -> RelationDesc {
756        match self {
757            Self::Kafka(conn) => conn.default_key_desc(),
758            Self::Postgres(conn) => conn.default_key_desc(),
759            Self::MySql(conn) => conn.default_key_desc(),
760            Self::SqlServer(conn) => conn.default_key_desc(),
761            Self::LoadGenerator(conn) => conn.default_key_desc(),
762        }
763    }
764
765    fn default_value_desc(&self) -> RelationDesc {
766        match self {
767            Self::Kafka(conn) => conn.default_value_desc(),
768            Self::Postgres(conn) => conn.default_value_desc(),
769            Self::MySql(conn) => conn.default_value_desc(),
770            Self::SqlServer(conn) => conn.default_value_desc(),
771            Self::LoadGenerator(conn) => conn.default_value_desc(),
772        }
773    }
774
775    fn timestamp_desc(&self) -> RelationDesc {
776        match self {
777            Self::Kafka(conn) => conn.timestamp_desc(),
778            Self::Postgres(conn) => conn.timestamp_desc(),
779            Self::MySql(conn) => conn.timestamp_desc(),
780            Self::SqlServer(conn) => conn.timestamp_desc(),
781            Self::LoadGenerator(conn) => conn.timestamp_desc(),
782        }
783    }
784
785    fn connection_id(&self) -> Option<CatalogItemId> {
786        match self {
787            Self::Kafka(conn) => conn.connection_id(),
788            Self::Postgres(conn) => conn.connection_id(),
789            Self::MySql(conn) => conn.connection_id(),
790            Self::SqlServer(conn) => conn.connection_id(),
791            Self::LoadGenerator(conn) => conn.connection_id(),
792        }
793    }
794
795    fn supports_read_only(&self) -> bool {
796        match self {
797            GenericSourceConnection::Kafka(conn) => conn.supports_read_only(),
798            GenericSourceConnection::Postgres(conn) => conn.supports_read_only(),
799            GenericSourceConnection::MySql(conn) => conn.supports_read_only(),
800            GenericSourceConnection::SqlServer(conn) => conn.supports_read_only(),
801            GenericSourceConnection::LoadGenerator(conn) => conn.supports_read_only(),
802        }
803    }
804
805    fn prefers_single_replica(&self) -> bool {
806        match self {
807            GenericSourceConnection::Kafka(conn) => conn.prefers_single_replica(),
808            GenericSourceConnection::Postgres(conn) => conn.prefers_single_replica(),
809            GenericSourceConnection::MySql(conn) => conn.prefers_single_replica(),
810            GenericSourceConnection::SqlServer(conn) => conn.prefers_single_replica(),
811            GenericSourceConnection::LoadGenerator(conn) => conn.prefers_single_replica(),
812        }
813    }
814}
815impl<C: ConnectionAccess> crate::AlterCompatible for GenericSourceConnection<C> {
816    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
817        if self == other {
818            return Ok(());
819        }
820        let r = match (self, other) {
821            (Self::Kafka(conn), Self::Kafka(other)) => conn.alter_compatible(id, other),
822            (Self::Postgres(conn), Self::Postgres(other)) => conn.alter_compatible(id, other),
823            (Self::MySql(conn), Self::MySql(other)) => conn.alter_compatible(id, other),
824            (Self::SqlServer(conn), Self::SqlServer(other)) => conn.alter_compatible(id, other),
825            (Self::LoadGenerator(conn), Self::LoadGenerator(other)) => {
826                conn.alter_compatible(id, other)
827            }
828            _ => Err(AlterError { id }),
829        };
830
831        if r.is_err() {
832            tracing::warn!(
833                "GenericSourceConnection incompatible:\nself:\n{:#?}\n\nother\n{:#?}",
834                self,
835                other
836            );
837        }
838
839        r
840    }
841}
842
843/// Details necessary for each source export to allow the source implementations
844/// to export data to the export's collection.
845#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
846pub enum SourceExportDetails {
    /// Used when the primary collection of a source is not an export that
    /// receives any data.
849    None,
850    Kafka(KafkaSourceExportDetails),
851    Postgres(PostgresSourceExportDetails),
852    MySql(MySqlSourceExportDetails),
853    SqlServer(SqlServerSourceExportDetails),
854    LoadGenerator(LoadGeneratorSourceExportDetails),
855}
856
857impl crate::AlterCompatible for SourceExportDetails {
858    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
859        if self == other {
860            return Ok(());
861        }
862        let r = match (self, other) {
863            (Self::None, Self::None) => Ok(()),
864            (Self::Kafka(s), Self::Kafka(o)) => s.alter_compatible(id, o),
865            (Self::Postgres(s), Self::Postgres(o)) => s.alter_compatible(id, o),
866            (Self::MySql(s), Self::MySql(o)) => s.alter_compatible(id, o),
867            (Self::LoadGenerator(s), Self::LoadGenerator(o)) => s.alter_compatible(id, o),
868            _ => Err(AlterError { id }),
869        };
870
871        if r.is_err() {
872            tracing::warn!(
873                "SourceExportDetails incompatible:\nself:\n{:#?}\n\nother\n{:#?}",
874                self,
875                other
876            );
877        }
878
879        r
880    }
881}
882
883/// Details necessary to store in the `Details` option of a source export
884/// statement (`CREATE SUBSOURCE` and `CREATE TABLE .. FROM SOURCE` statements),
885/// to generate the appropriate `SourceExportDetails` struct during planning.
886/// NOTE that this is serialized as proto to the catalog, so any changes here
887/// must be backwards compatible or will require a migration.
888pub enum SourceExportStatementDetails {
889    Postgres {
890        table: mz_postgres_util::desc::PostgresTableDesc,
891    },
892    MySql {
893        table: mz_mysql_util::MySqlTableDesc,
894        initial_gtid_set: String,
895    },
896    SqlServer {
897        table: mz_sql_server_util::desc::SqlServerTableDesc,
898        capture_instance: Arc<str>,
899        initial_lsn: mz_sql_server_util::cdc::Lsn,
900    },
901    LoadGenerator {
902        output: LoadGeneratorOutput,
903    },
904    Kafka {},
905}
906
907impl RustType<ProtoSourceExportStatementDetails> for SourceExportStatementDetails {
908    fn into_proto(&self) -> ProtoSourceExportStatementDetails {
909        match self {
910            SourceExportStatementDetails::Postgres { table } => ProtoSourceExportStatementDetails {
911                kind: Some(proto_source_export_statement_details::Kind::Postgres(
912                    postgres::ProtoPostgresSourceExportStatementDetails {
913                        table: Some(table.into_proto()),
914                    },
915                )),
916            },
917            SourceExportStatementDetails::MySql {
918                table,
919                initial_gtid_set,
920            } => ProtoSourceExportStatementDetails {
921                kind: Some(proto_source_export_statement_details::Kind::Mysql(
922                    mysql::ProtoMySqlSourceExportStatementDetails {
923                        table: Some(table.into_proto()),
924                        initial_gtid_set: initial_gtid_set.clone(),
925                    },
926                )),
927            },
928            SourceExportStatementDetails::SqlServer {
929                table,
930                capture_instance,
931                initial_lsn,
932            } => ProtoSourceExportStatementDetails {
933                kind: Some(proto_source_export_statement_details::Kind::SqlServer(
934                    sql_server::ProtoSqlServerSourceExportStatementDetails {
935                        table: Some(table.into_proto()),
936                        capture_instance: capture_instance.to_string(),
937                        initial_lsn: initial_lsn.as_bytes().to_vec(),
938                    },
939                )),
940            },
941            SourceExportStatementDetails::LoadGenerator { output } => {
942                ProtoSourceExportStatementDetails {
943                    kind: Some(proto_source_export_statement_details::Kind::Loadgen(
944                        load_generator::ProtoLoadGeneratorSourceExportStatementDetails {
945                            output: output.into_proto().into(),
946                        },
947                    )),
948                }
949            }
950            SourceExportStatementDetails::Kafka {} => ProtoSourceExportStatementDetails {
951                kind: Some(proto_source_export_statement_details::Kind::Kafka(
952                    kafka::ProtoKafkaSourceExportStatementDetails {},
953                )),
954            },
955        }
956    }
957
958    fn from_proto(proto: ProtoSourceExportStatementDetails) -> Result<Self, TryFromProtoError> {
959        use proto_source_export_statement_details::Kind;
960        Ok(match proto.kind {
961            Some(Kind::Postgres(details)) => SourceExportStatementDetails::Postgres {
962                table: details
963                    .table
964                    .into_rust_if_some("ProtoPostgresSourceExportStatementDetails::table")?,
965            },
966            Some(Kind::Mysql(details)) => SourceExportStatementDetails::MySql {
967                table: details
968                    .table
969                    .into_rust_if_some("ProtoMySqlSourceExportStatementDetails::table")?,
970
971                initial_gtid_set: details.initial_gtid_set,
972            },
973            Some(Kind::SqlServer(details)) => SourceExportStatementDetails::SqlServer {
974                table: details
975                    .table
976                    .into_rust_if_some("ProtoSqlServerSourceExportStatementDetails::table")?,
977                capture_instance: details.capture_instance.into(),
978                initial_lsn: mz_sql_server_util::cdc::Lsn::try_from(details.initial_lsn.as_slice())
979                    .map_err(|e| TryFromProtoError::InvalidFieldError(e.to_string()))?,
980            },
981            Some(Kind::Loadgen(details)) => SourceExportStatementDetails::LoadGenerator {
982                output: details
983                    .output
984                    .into_rust_if_some("ProtoLoadGeneratorSourceExportStatementDetails::output")?,
985            },
986            Some(Kind::Kafka(_details)) => SourceExportStatementDetails::Kafka {},
987            None => {
988                return Err(TryFromProtoError::missing_field(
989                    "ProtoSourceExportStatementDetails::kind",
990                ));
991            }
992        })
993    }
994}
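
// A small sketch of the proto round-trip above, using the `Kafka` variant since
// it carries no fields and so needs no upstream table descriptions.
#[cfg(test)]
mod source_export_statement_details_example {
    use mz_proto::RustType;

    use super::*;

    #[test]
    fn kafka_details_proto_round_trip() {
        let details = SourceExportStatementDetails::Kafka {};
        let proto = details.into_proto();
        let roundtripped =
            SourceExportStatementDetails::from_proto(proto).expect("valid proto");
        assert!(matches!(
            roundtripped,
            SourceExportStatementDetails::Kafka {}
        ));
    }
}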
995
996#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
997#[repr(transparent)]
998pub struct SourceData(pub Result<Row, DataflowError>);
999
1000impl Default for SourceData {
1001    fn default() -> Self {
1002        SourceData(Ok(Row::default()))
1003    }
1004}
1005
1006impl Deref for SourceData {
1007    type Target = Result<Row, DataflowError>;
1008
1009    fn deref(&self) -> &Self::Target {
1010        &self.0
1011    }
1012}
1013
1014impl DerefMut for SourceData {
1015    fn deref_mut(&mut self) -> &mut Self::Target {
1016        &mut self.0
1017    }
1018}
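
// A tiny sketch of the `Deref`/`DerefMut` wrappers above: `SourceData` can be
// used directly as the `Result<Row, DataflowError>` it wraps. The datum pushed
// here is an arbitrary example value.
#[cfg(test)]
mod source_data_deref_example {
    use mz_repr::Datum;

    use super::*;

    #[test]
    fn deref_to_result() {
        let mut data = SourceData::default();
        // `Deref` lets `Result` methods be called directly on `SourceData`.
        assert!(data.is_ok());
        // `DerefMut` exposes the wrapped `Row` for in-place modification.
        data.as_mut().expect("ok row").packer().push(Datum::Int64(7));
        assert_eq!(
            data.as_ref().expect("ok row").iter().next(),
            Some(Datum::Int64(7))
        );
    }
}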
1019
1020impl RustType<ProtoSourceData> for SourceData {
1021    fn into_proto(&self) -> ProtoSourceData {
1022        use proto_source_data::Kind;
1023        ProtoSourceData {
1024            kind: Some(match &**self {
1025                Ok(row) => Kind::Ok(row.into_proto()),
1026                Err(err) => Kind::Err(err.into_proto()),
1027            }),
1028        }
1029    }
1030
1031    fn from_proto(proto: ProtoSourceData) -> Result<Self, TryFromProtoError> {
1032        use proto_source_data::Kind;
1033        match proto.kind {
1034            Some(kind) => match kind {
1035                Kind::Ok(row) => Ok(SourceData(Ok(row.into_rust()?))),
1036                Kind::Err(err) => Ok(SourceData(Err(err.into_rust()?))),
1037            },
1038            None => Result::Err(TryFromProtoError::missing_field("ProtoSourceData::kind")),
1039        }
1040    }
1041}
1042
1043impl Codec for SourceData {
1044    type Storage = ProtoRow;
1045    type Schema = RelationDesc;
1046
1047    fn codec_name() -> String {
1048        "protobuf[SourceData]".into()
1049    }
1050
1051    fn encode<B: BufMut>(&self, buf: &mut B) {
1052        self.into_proto()
1053            .encode(buf)
1054            .expect("no required fields means no initialization errors");
1055    }
1056
1057    fn decode(buf: &[u8], schema: &RelationDesc) -> Result<Self, String> {
1058        let mut val = SourceData::default();
1059        <Self as Codec>::decode_from(&mut val, buf, &mut None, schema)?;
1060        Ok(val)
1061    }
1062
1063    fn decode_from<'a>(
1064        &mut self,
1065        buf: &'a [u8],
1066        storage: &mut Option<ProtoRow>,
1067        schema: &RelationDesc,
1068    ) -> Result<(), String> {
1069        // Optimize for common case of `Ok` by leaving a (cleared) `ProtoRow` in
1070        // the `Ok` variant of `ProtoSourceData`. prost's `Message::merge` impl
1071        // is smart about reusing the `Vec<Datum>` when it can.
1072        let mut proto = storage.take().unwrap_or_default();
1073        proto.clear();
1074        let mut proto = ProtoSourceData {
1075            kind: Some(proto_source_data::Kind::Ok(proto)),
1076        };
1077        proto.merge(buf).map_err(|err| err.to_string())?;
1078        match (proto.kind, &mut self.0) {
1079            // Again, optimize for the common case...
1080            (Some(proto_source_data::Kind::Ok(proto)), Ok(row)) => {
1081                let ret = row.decode_from_proto(&proto, schema);
1082                storage.replace(proto);
1083                ret
1084            }
1085            // ...otherwise fall back to the obvious thing.
1086            (kind, _) => {
1087                let proto = ProtoSourceData { kind };
1088                *self = proto.into_rust().map_err(|err| err.to_string())?;
1089                // Nothing to put back in storage.
1090                Ok(())
1091            }
1092        }
1093    }
1094
1095    fn validate(val: &Self, desc: &Self::Schema) -> Result<(), String> {
1096        match &val.0 {
1097            Ok(row) => Row::validate(row, desc),
1098            Err(_) => Ok(()),
1099        }
1100    }
1101
1102    fn encode_schema(schema: &Self::Schema) -> Bytes {
1103        schema.into_proto().encode_to_vec().into()
1104    }
1105
1106    fn decode_schema(buf: &Bytes) -> Self::Schema {
1107        let proto = ProtoRelationDesc::decode(buf.as_ref()).expect("valid schema");
1108        proto.into_rust().expect("valid schema")
1109    }
1110}
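
// A minimal sketch of round-tripping `SourceData` through the persist `Codec`
// implementation above; the empty `RelationDesc` is used purely as an
// illustrative schema.
#[cfg(test)]
mod source_data_codec_example {
    use mz_persist_types::Codec;
    use mz_repr::{RelationDesc, Row};

    use super::*;

    #[test]
    fn codec_round_trip_ok_row() {
        let desc = RelationDesc::empty();
        let data = SourceData(Ok(Row::default()));
        let mut buf = Vec::new();
        data.encode(&mut buf);
        let decoded = SourceData::decode(&buf, &desc).expect("valid encoding");
        assert_eq!(decoded, data);
    }
}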
1111
/// Given a [`RelationDesc`], returns a [`Strategy`] for generating an
/// arbitrary [`SourceData`].
1113pub fn arb_source_data_for_relation_desc(
1114    desc: &RelationDesc,
1115) -> impl Strategy<Value = SourceData> + use<> {
1116    let row_strat = arb_row_for_relation(desc).no_shrink();
1117
1118    proptest::strategy::Union::new_weighted(vec![
1119        (50, row_strat.prop_map(|row| SourceData(Ok(row))).boxed()),
1120        (
1121            1,
1122            any::<DataflowError>()
1123                .prop_map(|err| SourceData(Err(err)))
1124                .no_shrink()
1125                .boxed(),
1126        ),
1127    ])
1128}
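
// A hedged sketch of driving the strategy above with proptest's `proptest!`
// macro. The single nullable `Int64` column named "a" is an invented example
// schema, and generated values are checked against the `Codec` round-trip.
#[cfg(test)]
mod arb_source_data_example {
    use mz_persist_types::Codec;
    use mz_repr::{RelationDesc, ScalarType};
    use proptest::prelude::*;

    use super::*;

    fn example_desc() -> RelationDesc {
        RelationDesc::empty().with_column("a", ScalarType::Int64.nullable(true))
    }

    proptest! {
        #[test]
        fn generated_source_data_round_trips(
            data in arb_source_data_for_relation_desc(&example_desc())
        ) {
            let mut buf = Vec::new();
            data.encode(&mut buf);
            prop_assert_eq!(SourceData::decode(&buf, &example_desc()).unwrap(), data);
        }
    }
}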
1129
1130/// Describes how external references should be organized in a multi-level
1131/// hierarchy.
1132///
1133/// For both PostgreSQL and MySQL sources, these levels of reference are
1134/// intrinsic to the items which we're referencing. If there are other naming
1135/// schemas for other types of sources we discover, we might need to revisit
1136/// this.
1137pub trait ExternalCatalogReference {
1138    /// The "second" level of namespacing for the reference.
1139    fn schema_name(&self) -> &str;
1140    /// The lowest level of namespacing for the reference.
1141    fn item_name(&self) -> &str;
1142}
1143
1144impl ExternalCatalogReference for &mz_mysql_util::MySqlTableDesc {
1145    fn schema_name(&self) -> &str {
1146        &self.schema_name
1147    }
1148
1149    fn item_name(&self) -> &str {
1150        &self.name
1151    }
1152}
1153
1154impl ExternalCatalogReference for mz_postgres_util::desc::PostgresTableDesc {
1155    fn schema_name(&self) -> &str {
1156        &self.namespace
1157    }
1158
1159    fn item_name(&self) -> &str {
1160        &self.name
1161    }
1162}
1163
1164impl ExternalCatalogReference for &mz_sql_server_util::desc::SqlServerTableDesc {
1165    fn schema_name(&self) -> &str {
1166        &*self.schema_name
1167    }
1168
1169    fn item_name(&self) -> &str {
1170        &*self.name
1171    }
1172}
1173
// This implementation provides a means of treating arbitrary objects as an
// `ExternalCatalogReference`, e.g. load generator view names.
1176impl<'a> ExternalCatalogReference for (&'a str, &'a str) {
1177    fn schema_name(&self) -> &str {
1178        self.0
1179    }
1180
1181    fn item_name(&self) -> &str {
1182        self.1
1183    }
1184}
1185
1186/// Stores and resolves references to a `&[T: ExternalCatalogReference]`.
1187///
1188/// This is meant to provide an API to quickly look up a source's subsources.
1189///
1190/// For sources that do not provide any subsources, use the `Default`
1191/// implementation, which is empty and will not be able to resolve any
1192/// references.
1193#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
1194pub struct SourceReferenceResolver {
1195    inner: BTreeMap<Ident, BTreeMap<Ident, BTreeMap<Ident, usize>>>,
1196}
1197
1198#[derive(Debug, Clone, thiserror::Error)]
1199pub enum ExternalReferenceResolutionError {
1200    #[error("reference to {name} not found in source")]
1201    DoesNotExist { name: String },
1202    #[error(
1203        "reference {name} is ambiguous, consider specifying an additional \
1204    layer of qualification"
1205    )]
1206    Ambiguous { name: String },
1207    #[error("invalid identifier: {0}")]
1208    Ident(#[from] IdentError),
1209}
1210
1211impl<'a> SourceReferenceResolver {
    /// Constructs a new `SourceReferenceResolver` from a slice of `T:
    /// ExternalCatalogReference`.
1214    ///
1215    /// # Errors
1216    /// - If any `&str` provided cannot be taken to an [`Ident`].
1217    pub fn new<T: ExternalCatalogReference>(
1218        database: &str,
1219        referenceable_items: &'a [T],
1220    ) -> Result<SourceReferenceResolver, ExternalReferenceResolutionError> {
1221        // An index from table name -> schema name -> database name -> index in
1222        // `referenceable_items`.
1223        let mut inner = BTreeMap::new();
1224
1225        let database = Ident::new(database)?;
1226
1227        for (reference_idx, item) in referenceable_items.iter().enumerate() {
1228            let item_name = Ident::new(item.item_name())?;
1229            let schema_name = Ident::new(item.schema_name())?;
1230
1231            inner
1232                .entry(item_name)
1233                .or_insert_with(BTreeMap::new)
1234                .entry(schema_name)
1235                .or_insert_with(BTreeMap::new)
1236                .entry(database.clone())
1237                .or_insert(reference_idx);
1238        }
1239
1240        Ok(SourceReferenceResolver { inner })
1241    }
1242
1243    /// Returns the canonical reference and index from which it originated in
1244    /// the `referenceable_items` provided to [`Self::new`].
1245    ///
1246    /// # Args
1247    /// - `name` is `&[Ident]` to let users provide the inner element of
1248    ///   [`UnresolvedItemName`].
    /// - `canonicalize_to_width` limits the number of elements in the returned
    ///   [`UnresolvedItemName`]; this is useful if the source type requires
    ///   contriving database and schema names that a subsource should not
    ///   persist as its reference.
1253    ///
1254    /// # Errors
1255    /// - If `name` does not resolve to an item in `self.inner`.
1256    ///
1257    /// # Panics
    /// - If `canonicalize_to_width` is not in `1..=3`.
1259    pub fn resolve(
1260        &self,
1261        name: &[Ident],
1262        canonicalize_to_width: usize,
1263    ) -> Result<(UnresolvedItemName, usize), ExternalReferenceResolutionError> {
1264        let (db, schema, idx) = self.resolve_inner(name)?;
1265
1266        let item = name.last().expect("must have provided at least 1 element");
1267
1268        let canonical_name = match canonicalize_to_width {
1269            1 => vec![item.clone()],
1270            2 => vec![schema.clone(), item.clone()],
1271            3 => vec![db.clone(), schema.clone(), item.clone()],
1272            o => panic!("canonicalize_to_width values must be 1..=3, but got {}", o),
1273        };
1274
1275        Ok((UnresolvedItemName(canonical_name), idx))
1276    }
1277
1278    /// Returns the index from which it originated in the `referenceable_items`
1279    /// provided to [`Self::new`].
1280    ///
1281    /// # Args
1282    /// `name` is `&[Ident]` to let users provide the inner element of
1283    /// [`UnresolvedItemName`].
1284    ///
1285    /// # Errors
1286    /// - If `name` does not resolve to an item in `self.inner`.
1287    pub fn resolve_idx(&self, name: &[Ident]) -> Result<usize, ExternalReferenceResolutionError> {
1288        let (_db, _schema, idx) = self.resolve_inner(name)?;
1289        Ok(idx)
1290    }
1291
1292    /// Returns the index from which it originated in the `referenceable_items`
1293    /// provided to [`Self::new`].
1294    ///
1295    /// # Args
1296    /// `name` is `&[Ident]` to let users provide the inner element of
1297    /// [`UnresolvedItemName`].
1298    ///
1299    /// # Return
1300    /// Returns a tuple whose elements are:
1301    /// 1. The "database"- or top-level namespace of the reference.
1302    /// 2. The "schema"- or second-level namespace of the reference.
1303    /// 3. The index to find the item in `referenceable_items` argument provided
1304    ///    to `SourceReferenceResolver::new`.
1305    ///
1306    /// # Errors
1307    /// - If `name` does not resolve to an item in `self.inner`.
1308    fn resolve_inner<'name: 'a>(
1309        &'a self,
1310        name: &'name [Ident],
1311    ) -> Result<(&'a Ident, &'a Ident, usize), ExternalReferenceResolutionError> {
1312        let get_provided_name = || UnresolvedItemName(name.to_vec()).to_string();
1313
1314        // Names must be composed of 1..=3 elements.
1315        if !(1..=3).contains(&name.len()) {
1316            Err(ExternalReferenceResolutionError::DoesNotExist {
1317                name: get_provided_name(),
1318            })?;
1319        }
1320
        // Fill in the leading elements with `None` if they aren't present.
1322        let mut names = std::iter::repeat(None)
1323            .take(3 - name.len())
1324            .chain(name.iter().map(Some));
1325
1326        let database = names.next().flatten();
1327        let schema = names.next().flatten();
1328        let item = names
1329            .next()
1330            .flatten()
1331            .expect("must have provided the item name");
1332
1333        assert_none!(names.next(), "expected a 3-element iterator");
1334
1335        let schemas =
1336            self.inner
1337                .get(item)
1338                .ok_or_else(|| ExternalReferenceResolutionError::DoesNotExist {
1339                    name: get_provided_name(),
1340                })?;
1341
1342        let schema = match schema {
1343            Some(schema) => schema,
1344            None => schemas.keys().exactly_one().map_err(|_e| {
1345                ExternalReferenceResolutionError::Ambiguous {
1346                    name: get_provided_name(),
1347                }
1348            })?,
1349        };
1350
1351        let databases =
1352            schemas
1353                .get(schema)
1354                .ok_or_else(|| ExternalReferenceResolutionError::DoesNotExist {
1355                    name: get_provided_name(),
1356                })?;
1357
1358        let database = match database {
1359            Some(database) => database,
1360            None => databases.keys().exactly_one().map_err(|_e| {
1361                ExternalReferenceResolutionError::Ambiguous {
1362                    name: get_provided_name(),
1363                }
1364            })?,
1365        };
1366
1367        let reference_idx = databases.get(database).ok_or_else(|| {
1368            ExternalReferenceResolutionError::DoesNotExist {
1369                name: get_provided_name(),
1370            }
1371        })?;
1372
1373        Ok((database, schema, *reference_idx))
1374    }
1375}
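
// An illustrative sketch of resolving external references with the resolver
// above. The database, schema, and item names are invented example values, and
// the `(&str, &str)` items lean on the tuple impl of `ExternalCatalogReference`.
#[cfg(test)]
mod source_reference_resolver_example {
    use mz_sql_parser::ast::Ident;

    use super::*;

    #[test]
    fn resolve_example_references() -> Result<(), ExternalReferenceResolutionError> {
        // (schema, item) pairs, as used e.g. for load generator view names.
        let items = [("public", "users"), ("public", "orders")];
        let resolver = SourceReferenceResolver::new("demo_db", &items)?;

        // A bare item name resolves as long as it is unambiguous.
        assert_eq!(resolver.resolve_idx(&[Ident::new("orders")?])?, 1);

        // Canonicalizing to width 3 yields a fully qualified `db.schema.item` name.
        let (name, idx) = resolver.resolve(&[Ident::new("users")?], 3)?;
        assert_eq!(idx, 0);
        assert_eq!(name.0.len(), 3);
        Ok(())
    }
}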
1376
1377/// A decoder for [`Row`]s within [`SourceData`].
1378///
1379/// This type exists as a wrapper around [`RowColumnarDecoder`] to handle the
1380/// case where the [`RelationDesc`] we're encoding with has no columns. See
1381/// [`SourceDataRowColumnarEncoder`] for more details.
1382#[derive(Debug)]
1383pub enum SourceDataRowColumnarDecoder {
1384    Row(RowColumnarDecoder),
1385    EmptyRow,
1386}
1387
1388impl SourceDataRowColumnarDecoder {
1389    pub fn decode(&self, idx: usize, row: &mut Row) {
1390        match self {
1391            SourceDataRowColumnarDecoder::Row(decoder) => decoder.decode(idx, row),
1392            SourceDataRowColumnarDecoder::EmptyRow => {
1393                // Create a packer just to clear the Row.
1394                row.packer();
1395            }
1396        }
1397    }
1398
1399    pub fn goodbytes(&self) -> usize {
1400        match self {
1401            SourceDataRowColumnarDecoder::Row(decoder) => decoder.goodbytes(),
1402            SourceDataRowColumnarDecoder::EmptyRow => 0,
1403        }
1404    }
1405}
1406
1407#[derive(Debug)]
1408pub struct SourceDataColumnarDecoder {
1409    row_decoder: SourceDataRowColumnarDecoder,
1410    err_decoder: BinaryArray,
1411}
1412
1413impl SourceDataColumnarDecoder {
1414    pub fn new(col: StructArray, desc: &RelationDesc) -> Result<Self, anyhow::Error> {
1415        // TODO(parkmcar): We should validate the fields here.
1416        let (_fields, arrays, nullability) = col.into_parts();
1417
1418        if nullability.is_some() {
1419            anyhow::bail!("SourceData is not nullable, but found {nullability:?}");
1420        }
1421        if arrays.len() != 2 {
1422            anyhow::bail!("SourceData should only have two fields, found {arrays:?}");
1423        }
1424
1425        let errs = arrays[1]
1426            .as_any()
1427            .downcast_ref::<BinaryArray>()
1428            .ok_or_else(|| anyhow::anyhow!("expected BinaryArray, found {:?}", arrays[1]))?;
1429
1430        let row_decoder = match arrays[0].data_type() {
1431            arrow::datatypes::DataType::Struct(_) => {
1432                let rows = arrays[0]
1433                    .as_any()
1434                    .downcast_ref::<StructArray>()
1435                    .ok_or_else(|| {
1436                        anyhow::anyhow!("expected StructArray, found {:?}", arrays[0])
1437                    })?;
1438                let decoder = RowColumnarDecoder::new(rows.clone(), desc)?;
1439                SourceDataRowColumnarDecoder::Row(decoder)
1440            }
1441            arrow::datatypes::DataType::Null => SourceDataRowColumnarDecoder::EmptyRow,
1442            other => anyhow::bail!("expected Struct or Null Array, found {other:?}"),
1443        };
1444
1445        Ok(SourceDataColumnarDecoder {
1446            row_decoder,
1447            err_decoder: errs.clone(),
1448        })
1449    }
1450}
1451
1452impl ColumnDecoder<SourceData> for SourceDataColumnarDecoder {
1453    fn decode(&self, idx: usize, val: &mut SourceData) {
1454        let err_null = self.err_decoder.is_null(idx);
1455        let row_null = match &self.row_decoder {
1456            SourceDataRowColumnarDecoder::Row(decoder) => decoder.is_null(idx),
1457            SourceDataRowColumnarDecoder::EmptyRow => !err_null,
1458        };
1459
1460        match (row_null, err_null) {
1461            (true, false) => {
1462                let err = self.err_decoder.value(idx);
1463                let err = ProtoDataflowError::decode(err)
1464                    .expect("proto should be valid")
1465                    .into_rust()
1466                    .expect("error should be valid");
1467                val.0 = Err(err);
1468            }
1469            (false, true) => {
1470                let row = match val.0.as_mut() {
1471                    Ok(row) => row,
1472                    Err(_) => {
1473                        val.0 = Ok(Row::default());
1474                        val.0.as_mut().unwrap()
1475                    }
1476                };
1477                self.row_decoder.decode(idx, row);
1478            }
1479            (true, true) => panic!("should have one of 'ok' or 'err'"),
1480            (false, false) => panic!("cannot have both 'ok' and 'err'"),
1481        }
1482    }
1483
1484    fn is_null(&self, idx: usize) -> bool {
1485        let err_null = self.err_decoder.is_null(idx);
1486        let row_null = match &self.row_decoder {
1487            SourceDataRowColumnarDecoder::Row(decoder) => decoder.is_null(idx),
1488            SourceDataRowColumnarDecoder::EmptyRow => !err_null,
1489        };
1490        assert!(!err_null || !row_null, "SourceData should never be null!");
1491
1492        false
1493    }
1494
1495    fn goodbytes(&self) -> usize {
1496        self.row_decoder.goodbytes() + ArrayOrd::Binary(self.err_decoder.clone()).goodbytes()
1497    }
1498
1499    fn stats(&self) -> StructStats {
1500        let len = self.err_decoder.len();
1501        let err_stats = ColumnarStats {
1502            nulls: Some(ColumnNullStats {
1503                count: self.err_decoder.null_count(),
1504            }),
1505            values: PrimitiveStats::<Vec<u8>>::from_column(&self.err_decoder).into(),
1506        };
1507        // The top-level struct is non-nullable and every entry is either an
1508        // `Ok(Row)` or an `Err(DataflowError)`. The error column's null count is
1509        // therefore the number of `Ok` entries, so the number of nulls in the row
1510        // column (i.e. the number of `Err` entries) is the total count minus it.
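        // A hypothetical worked example: if len == 7 and the error column has 5
        // nulls, then there are 5 `Ok` entries and row_null_count == 7 - 5 == 2.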
1511        let row_null_count = len - self.err_decoder.null_count();
1512        let row_stats = match &self.row_decoder {
1513            SourceDataRowColumnarDecoder::Row(decoder) => {
1514                // Sanity check that the number of row nulls/nones we calculated
1515                // using the error column matches what the row column thinks it
1516                // has.
1517                assert_eq!(decoder.null_count(), row_null_count);
1518                decoder.stats()
1519            }
1520            SourceDataRowColumnarDecoder::EmptyRow => StructStats {
1521                len,
1522                cols: BTreeMap::default(),
1523            },
1524        };
1525        let row_stats = ColumnarStats {
1526            nulls: Some(ColumnNullStats {
1527                count: row_null_count,
1528            }),
1529            values: ColumnStatKinds::Struct(row_stats),
1530        };
1531
1532        let stats = [
1533            (
1534                SourceDataColumnarEncoder::OK_COLUMN_NAME.to_string(),
1535                row_stats,
1536            ),
1537            (
1538                SourceDataColumnarEncoder::ERR_COLUMN_NAME.to_string(),
1539                err_stats,
1540            ),
1541        ];
1542        StructStats {
1543            len,
1544            cols: stats.into_iter().collect(),
1545        }
1546    }
1547}
1548
1549/// An encoder for [`Row`]s within [`SourceData`].
1550///
1551/// This type exists as a wrapper around [`RowColumnarEncoder`] to support
1552/// encoding empty [`Row`]s. A [`RowColumnarEncoder`] finishes as a
1553/// [`StructArray`] which is required to have at least one column, and thus
1554/// cannot support empty [`Row`]s.
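///
/// A minimal sketch of how the two variants are chosen and behave (not compiled
/// as a doctest; `desc` stands in for an in-scope [`RelationDesc`]):
///
/// ```ignore
/// let mut encoder = match RowColumnarEncoder::new(&desc) {
///     // A desc with at least one column gets a real row encoder...
///     Some(encoder) => SourceDataRowColumnarEncoder::Row(encoder),
///     // ...while a zero-column desc falls back to the marker variant.
///     None => SourceDataRowColumnarEncoder::EmptyRow,
/// };
/// // For `EmptyRow` this is a no-op that only asserts the Row has no datums.
/// encoder.append(&Row::default());
/// ```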
1555#[derive(Debug)]
1556pub enum SourceDataRowColumnarEncoder {
1557    Row(RowColumnarEncoder),
1558    EmptyRow,
1559}
1560
1561impl SourceDataRowColumnarEncoder {
1562    pub(crate) fn goodbytes(&self) -> usize {
1563        match self {
1564            SourceDataRowColumnarEncoder::Row(e) => e.goodbytes(),
1565            SourceDataRowColumnarEncoder::EmptyRow => 0,
1566        }
1567    }
1568
1569    pub fn append(&mut self, row: &Row) {
1570        match self {
1571            SourceDataRowColumnarEncoder::Row(encoder) => encoder.append(row),
1572            SourceDataRowColumnarEncoder::EmptyRow => {
1573                assert_eq!(row.iter().count(), 0)
1574            }
1575        }
1576    }
1577
1578    pub fn append_null(&mut self) {
1579        match self {
1580            SourceDataRowColumnarEncoder::Row(encoder) => encoder.append_null(),
1581            SourceDataRowColumnarEncoder::EmptyRow => (),
1582        }
1583    }
1584}
1585
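/// An encoder for [`SourceData`] that writes each entry into one of two columns:
/// an "ok" column of [`Row`]s and an "err" column of protobuf-encoded
/// [`DataflowError`]s.
///
/// A minimal usage sketch (not compiled as a doctest; `desc` and `data` stand in
/// for an in-scope [`RelationDesc`] and [`SourceData`], and `append`/`finish`
/// come from the [`ColumnEncoder`] trait):
///
/// ```ignore
/// let mut encoder = SourceDataColumnarEncoder::new(&desc);
/// encoder.append(&data);
/// // Finishing yields a two column ("ok", "err") StructArray.
/// let array = encoder.finish();
/// ```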
1586#[derive(Debug)]
1587pub struct SourceDataColumnarEncoder {
1588    row_encoder: SourceDataRowColumnarEncoder,
1589    err_encoder: BinaryBuilder,
1590}
1591
1592impl SourceDataColumnarEncoder {
1593    const OK_COLUMN_NAME: &'static str = "ok";
1594    const ERR_COLUMN_NAME: &'static str = "err";
1595
1596    pub fn new(desc: &RelationDesc) -> Self {
1597        let row_encoder = match RowColumnarEncoder::new(desc) {
1598            Some(encoder) => SourceDataRowColumnarEncoder::Row(encoder),
1599            None => {
1600                assert!(desc.typ().columns().is_empty());
1601                SourceDataRowColumnarEncoder::EmptyRow
1602            }
1603        };
1604        let err_encoder = BinaryBuilder::new();
1605
1606        SourceDataColumnarEncoder {
1607            row_encoder,
1608            err_encoder,
1609        }
1610    }
1611}
1612
1613impl ColumnEncoder<SourceData> for SourceDataColumnarEncoder {
1614    type FinishedColumn = StructArray;
1615
1616    fn goodbytes(&self) -> usize {
1617        self.row_encoder.goodbytes() + self.err_encoder.values_slice().len()
1618    }
1619
1620    #[inline]
1621    fn append(&mut self, val: &SourceData) {
1622        match val.0.as_ref() {
1623            Ok(row) => {
1624                self.row_encoder.append(row);
1625                self.err_encoder.append_null();
1626            }
1627            Err(err) => {
1628                self.row_encoder.append_null();
1629                self.err_encoder
1630                    .append_value(err.into_proto().encode_to_vec());
1631            }
1632        }
1633    }
1634
1635    #[inline]
1636    fn append_null(&mut self) {
1637        panic!("appending a null into SourceDataColumnarEncoder is not supported");
1638    }
1639
1640    fn finish(self) -> Self::FinishedColumn {
1641        let SourceDataColumnarEncoder {
1642            row_encoder,
1643            mut err_encoder,
1644        } = self;
1645
1646        let err_column = BinaryBuilder::finish(&mut err_encoder);
1647        let row_column: ArrayRef = match row_encoder {
1648            SourceDataRowColumnarEncoder::Row(encoder) => {
1649                let column = encoder.finish();
1650                Arc::new(column)
1651            }
1652            SourceDataRowColumnarEncoder::EmptyRow => Arc::new(NullArray::new(err_column.len())),
1653        };
1654
1655        assert_eq!(row_column.len(), err_column.len());
1656
1657        let fields = vec![
1658            Field::new(Self::OK_COLUMN_NAME, row_column.data_type().clone(), true),
1659            Field::new(Self::ERR_COLUMN_NAME, err_column.data_type().clone(), true),
1660        ];
1661        let arrays: Vec<Arc<dyn Array>> = vec![row_column, Arc::new(err_column)];
1662        StructArray::new(Fields::from(fields), arrays, None)
1663    }
1664}
1665
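/// The persist [`Schema`] implementation used to encode and decode [`SourceData`]
/// in columnar (Arrow/Parquet) form, with the collection's [`RelationDesc`]
/// serving as the schema.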
1666impl Schema<SourceData> for RelationDesc {
1667    type ArrowColumn = StructArray;
1668    type Statistics = StructStats;
1669
1670    type Decoder = SourceDataColumnarDecoder;
1671    type Encoder = SourceDataColumnarEncoder;
1672
1673    fn decoder(&self, col: Self::ArrowColumn) -> Result<Self::Decoder, anyhow::Error> {
1674        SourceDataColumnarDecoder::new(col, self)
1675    }
1676
1677    fn encoder(&self) -> Result<Self::Encoder, anyhow::Error> {
1678        Ok(SourceDataColumnarEncoder::new(self))
1679    }
1680}
1681
1682#[cfg(test)]
1683mod tests {
1684    use arrow::array::{ArrayData, make_comparator};
1685    use base64::Engine;
1686    use bytes::Bytes;
1687    use mz_expr::EvalError;
1688    use mz_ore::assert_err;
1689    use mz_ore::metrics::MetricsRegistry;
1690    use mz_persist::indexed::columnar::arrow::{realloc_any, realloc_array};
1691    use mz_persist::metrics::ColumnarMetrics;
1692    use mz_persist_types::parquet::EncodingConfig;
1693    use mz_persist_types::schema::{Migration, backward_compatible};
1694    use mz_persist_types::stats::{PartStats, PartStatsMetrics};
1695    use mz_repr::{
1696        ColumnIndex, DatumVec, PropRelationDescDiff, ProtoRelationDesc, RelationDescBuilder,
1697        RowArena, SqlScalarType, arb_relation_desc_diff, arb_relation_desc_projection,
1698    };
1699    use proptest::prelude::*;
1700    use proptest::strategy::{Union, ValueTree};
1701
1702    use crate::stats::RelationPartStats;
1703
1704    use super::*;
1705
1706    #[mz_ore::test]
1707    fn test_timeline_parsing() {
1708        assert_eq!(Ok(Timeline::EpochMilliseconds), "M".parse());
1709        assert_eq!(Ok(Timeline::External("JOE".to_string())), "E.JOE".parse());
1710        assert_eq!(Ok(Timeline::User("MIKE".to_string())), "U.MIKE".parse());
1711
1712        assert_err!("Materialize".parse::<Timeline>());
1713        assert_err!("Ejoe".parse::<Timeline>());
1714        assert_err!("Umike".parse::<Timeline>());
1715        assert_err!("Dance".parse::<Timeline>());
1716        assert_err!("".parse::<Timeline>());
1717    }
1718
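    /// Encode `datas` according to `desc`, round-trip the columnar data through
    /// proto and Parquet, then decode it back (both with `desc` and with the
    /// `read_desc` projection) and check that the rows, errors, and stats all
    /// survive intact.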
1719    #[track_caller]
1720    fn roundtrip_source_data(
1721        desc: &RelationDesc,
1722        datas: Vec<SourceData>,
1723        read_desc: &RelationDesc,
1724        config: &EncodingConfig,
1725    ) {
1726        let metrics = ColumnarMetrics::disconnected();
1727        let mut encoder = <RelationDesc as Schema<SourceData>>::encoder(desc).unwrap();
1728        for data in &datas {
1729            encoder.append(data);
1730        }
1731        let col = encoder.finish();
1732
1733        // The top-level StructArray for SourceData should always be non-nullable.
1734        assert!(!col.is_nullable());
1735
1736        // Reallocate our arrays with lgalloc.
1737        let col = realloc_array(&col, &metrics);
1738
1739        // Roundtrip through ProtoArray format.
1740        {
1741            let proto = col.to_data().into_proto();
1742            let bytes = proto.encode_to_vec();
1743            let proto = mz_persist_types::arrow::ProtoArrayData::decode(&bytes[..]).unwrap();
1744            let array_data: ArrayData = proto.into_rust().unwrap();
1745
1746            let col_rnd = StructArray::from(array_data.clone());
1747            assert_eq!(col, col_rnd);
1748
1749            let col_dyn = arrow::array::make_array(array_data);
1750            let col_dyn = col_dyn.as_any().downcast_ref::<StructArray>().unwrap();
1751            assert_eq!(&col, col_dyn);
1752        }
1753
1754        // Encode to Parquet.
1755        let mut buf = Vec::new();
1756        let fields = Fields::from(vec![Field::new("k", col.data_type().clone(), false)]);
1757        let arrays: Vec<Arc<dyn Array>> = vec![Arc::new(col.clone())];
1758        mz_persist_types::parquet::encode_arrays(&mut buf, fields, arrays, config).unwrap();
1759
1760        // Decode from Parquet.
1761        let buf = Bytes::from(buf);
1762        let mut reader = mz_persist_types::parquet::decode_arrays(buf).unwrap();
1763        let maybe_batch = reader.next();
1764
1765        // If we didn't encode any data then the reader yields no batches at all.
1766        let Some(record_batch) = maybe_batch else {
1767            assert!(datas.is_empty());
1768            return;
1769        };
1770        let record_batch = record_batch.unwrap();
1771
1772        assert_eq!(record_batch.columns().len(), 1);
1773        let rnd_col = &record_batch.columns()[0];
1774        let rnd_col = realloc_any(Arc::clone(rnd_col), &metrics);
1775        let rnd_col = rnd_col
1776            .as_any()
1777            .downcast_ref::<StructArray>()
1778            .unwrap()
1779            .clone();
1780
1781        // Try generating stats for the data, just to make sure we don't panic.
1782        let stats = <RelationDesc as Schema<SourceData>>::decoder_any(desc, &rnd_col)
1783            .expect("valid decoder")
1784            .stats();
1785
1786        // Read back all of our data and assert it roundtrips.
1787        let mut rnd_data = SourceData(Ok(Row::default()));
1788        let decoder = <RelationDesc as Schema<SourceData>>::decoder(desc, rnd_col.clone()).unwrap();
1789        for (idx, og_data) in datas.iter().enumerate() {
1790            decoder.decode(idx, &mut rnd_data);
1791            assert_eq!(og_data, &rnd_data);
1792        }
1793
1794        // Read back all of our data a second time with a projection applied, and make sure the
1795        // stats are valid.
1796        let stats_metrics = PartStatsMetrics::new(&MetricsRegistry::new());
1797        let stats = RelationPartStats {
1798            name: "test",
1799            metrics: &stats_metrics,
1800            stats: &PartStats { key: stats },
1801            desc: read_desc,
1802        };
1803        let mut datum_vec = DatumVec::new();
1804        let arena = RowArena::default();
1805        let decoder = <RelationDesc as Schema<SourceData>>::decoder(read_desc, rnd_col).unwrap();
1806
1807        for (idx, og_data) in datas.iter().enumerate() {
1808            decoder.decode(idx, &mut rnd_data);
1809            match (&og_data.0, &rnd_data.0) {
1810                (Ok(og_row), Ok(rnd_row)) => {
1811                    // Filter down to just the Datums in the projection schema.
1812                    {
1813                        let datums = datum_vec.borrow_with(og_row);
1814                        let projected_datums =
1815                            datums.iter().enumerate().filter_map(|(idx, datum)| {
1816                                read_desc
1817                                    .contains_index(&ColumnIndex::from_raw(idx))
1818                                    .then_some(datum)
1819                            });
1820                        let og_projected_row = Row::pack(projected_datums);
1821                        assert_eq!(&og_projected_row, rnd_row);
1822                    }
1823
1824                    // Validate the stats for all of our projected columns.
1825                    {
1826                        let proj_datums = datum_vec.borrow_with(rnd_row);
1827                        for (pos, (idx, _, _)) in read_desc.iter_all().enumerate() {
1828                            let spec = stats.col_stats(idx, &arena);
1829                            assert!(spec.may_contain(proj_datums[pos]));
1830                        }
1831                    }
1832                }
1833                (Err(_), Err(_)) => assert_eq!(og_data, &rnd_data),
1834                (_, _) => panic!("decoded to a different type? {og_data:?} {rnd_data:?}"),
1835            }
1836        }
1837
1838        // Verify that the RelationDesc itself roundtrips through
1839        // {encode,decode}_schema.
1840        let encoded_schema = SourceData::encode_schema(desc);
1841        let roundtrip_desc = SourceData::decode_schema(&encoded_schema);
1842        assert_eq!(desc, &roundtrip_desc);
1843
1844        // Verify that the RelationDesc is backward compatible with itself (this
1845        // mostly checks for `unimplemented!` type panics).
1846        let migration =
1847            mz_persist_types::schema::backward_compatible(col.data_type(), col.data_type());
1848        let migration = migration.expect("should be backward compatible with self");
1849        // Also verify that the Fn doesn't do anything wonky.
1850        let migrated = migration.migrate(Arc::new(col.clone()));
1851        assert_eq!(col.data_type(), migrated.data_type());
1852    }
1853
1854    #[mz_ore::test]
1855    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decContextDefault` on OS `linux`
1856    fn all_source_data_roundtrips() {
1857        let mut weights = vec![(500, Just(0..8)), (50, Just(8..32))];
1858        if std::env::var("PROPTEST_LARGE_DATA").is_ok() {
1859            weights.extend([
1860                (10, Just(32..128)),
1861                (5, Just(128..512)),
1862                (3, Just(512..2048)),
1863                (1, Just(2048..8192)),
1864            ]);
1865        }
1866        let num_rows = Union::new_weighted(weights);
1867
1868        // TODO(parkmycar): There are so many clones going on here, and maybe we can avoid them?
1869        let strat = (any::<RelationDesc>(), num_rows)
1870            .prop_flat_map(|(desc, num_rows)| {
1871                arb_relation_desc_projection(desc.clone())
1872                    .prop_map(move |read_desc| (desc.clone(), read_desc, num_rows.clone()))
1873            })
1874            .prop_flat_map(|(desc, read_desc, num_rows)| {
1875                proptest::collection::vec(arb_source_data_for_relation_desc(&desc), num_rows)
1876                    .prop_map(move |datas| (desc.clone(), datas, read_desc.clone()))
1877            });
1878
1879        proptest!(|((config, (desc, source_datas, read_desc)) in (any::<EncodingConfig>(), strat))| {
1880            roundtrip_source_data(&desc, source_datas, &read_desc, &config);
1881        });
1882    }
1883
1884    #[mz_ore::test]
1885    fn roundtrip_error_nulls() {
1886        let desc = RelationDescBuilder::default()
1887            .with_column(
1888                "ts",
1889                SqlScalarType::TimestampTz { precision: None }.nullable(false),
1890            )
1891            .finish();
1892        let source_datas = vec![SourceData(Err(DataflowError::EvalError(
1893            EvalError::DateOutOfRange.into(),
1894        )))];
1895        let config = EncodingConfig::default();
1896        roundtrip_source_data(&desc, source_datas, &desc, &config);
1897    }
1898
1899    fn is_sorted(array: &dyn Array) -> bool {
1900        let sort_options = arrow::compute::SortOptions::default();
1901        let Ok(cmp) = make_comparator(array, array, sort_options) else {
1902            // `make_comparator` can fail for array types it doesn't know how to
1903            // compare. If we can't build a comparator, conservatively treat the
1904            // array as unsorted rather than panicking.
1905            // TODO: switch this early return to an `expect` once we're confident
1906            // every type we encode here is comparable.
1907            return false;
1908        };
1909        (0..array.len())
1910            .tuple_windows()
1911            .all(|(i, j)| cmp(i, j).is_le())
1912    }
1913
1914    fn get_data_type(schema: &impl Schema<SourceData>) -> arrow::datatypes::DataType {
1915        use mz_persist_types::columnar::ColumnEncoder;
1916        let array = Schema::encoder(schema).expect("valid schema").finish();
1917        Array::data_type(&array).clone()
1918    }
1919
1920    #[track_caller]
1921    fn backward_compatible_testcase(
1922        old: &RelationDesc,
1923        new: &RelationDesc,
1924        migration: Migration,
1925        datas: &[SourceData],
1926    ) {
1927        let mut encoder = Schema::<SourceData>::encoder(old).expect("valid schema");
1928        for data in datas {
1929            encoder.append(data);
1930        }
1931        let old = encoder.finish();
1932        let new = Schema::<SourceData>::encoder(new)
1933            .expect("valid schema")
1934            .finish();
1935        let old: Arc<dyn Array> = Arc::new(old);
1936        let new: Arc<dyn Array> = Arc::new(new);
1937        let migrated = migration.migrate(Arc::clone(&old));
1938        assert_eq!(migrated.data_type(), new.data_type());
1939
1940        // Check the sortedness preservation, if we can.
1941        if migration.preserves_order() && is_sorted(&old) {
1942            assert!(is_sorted(&migrated))
1943        }
1944    }
1945
1946    #[mz_ore::test]
1947    fn backward_compatible_empty_add_column() {
1948        let old = RelationDesc::empty();
1949        let new = RelationDesc::from_names_and_types([("a", SqlScalarType::Bool.nullable(true))]);
1950
1951        let old_data_type = get_data_type(&old);
1952        let new_data_type = get_data_type(&new);
1953
1954        let migration = backward_compatible(&old_data_type, &new_data_type);
1955        assert!(migration.is_some());
1956    }
1957
1958    #[mz_ore::test]
1959    fn backward_compatible_project_away_all() {
1960        let old = RelationDesc::from_names_and_types([("a", SqlScalarType::Bool.nullable(true))]);
1961        let new = RelationDesc::empty();
1962
1963        let old_data_type = get_data_type(&old);
1964        let new_data_type = get_data_type(&new);
1965
1966        let migration = backward_compatible(&old_data_type, &new_data_type);
1967        assert!(migration.is_some());
1968    }
1969
1970    #[mz_ore::test]
1971    #[cfg_attr(miri, ignore)]
1972    fn backward_compatible_migrate() {
1973        let strat = (any::<RelationDesc>(), any::<RelationDesc>()).prop_flat_map(|(old, new)| {
1974            proptest::collection::vec(arb_source_data_for_relation_desc(&old), 2)
1975                .prop_map(move |datas| (old.clone(), new.clone(), datas))
1976        });
1977
1978        proptest!(|((old, new, datas) in strat)| {
1979            let old_data_type = get_data_type(&old);
1980            let new_data_type = get_data_type(&new);
1981
1982            if let Some(migration) = backward_compatible(&old_data_type, &new_data_type) {
1983                backward_compatible_testcase(&old, &new, migration, &datas);
1984            };
1985        });
1986    }
1987
1988    #[mz_ore::test]
1989    #[cfg_attr(miri, ignore)]
1990    fn backward_compatible_migrate_from_common() {
1991        use mz_repr::SqlColumnType;
1992        fn test_case(old: RelationDesc, diffs: Vec<PropRelationDescDiff>, datas: Vec<SourceData>) {
1993            // TODO(parkmycar): As we iterate on schema migrations more things should become compatible.
1994            let should_be_compatible = diffs.iter().all(|diff| match diff {
1995                // We only support adding nullable columns.
1996                PropRelationDescDiff::AddColumn {
1997                    typ: SqlColumnType { nullable, .. },
1998                    ..
1999                } => *nullable,
2000                PropRelationDescDiff::DropColumn { .. } => true,
2001                _ => false,
2002            });
2003
2004            let mut new = old.clone();
2005            for diff in diffs.into_iter() {
2006                diff.apply(&mut new)
2007            }
2008
2009            let old_data_type = get_data_type(&old);
2010            let new_data_type = get_data_type(&new);
2011
2012            if let Some(migration) = backward_compatible(&old_data_type, &new_data_type) {
2013                backward_compatible_testcase(&old, &new, migration, &datas);
2014            } else if should_be_compatible {
2015                panic!("new DataType was not compatible when it should have been!");
2016            }
2017        }
2018
2019        let strat = any::<RelationDesc>()
2020            .prop_flat_map(|desc| {
2021                proptest::collection::vec(arb_source_data_for_relation_desc(&desc), 2)
2022                    .no_shrink()
2023                    .prop_map(move |datas| (desc.clone(), datas))
2024            })
2025            .prop_flat_map(|(desc, datas)| {
2026                arb_relation_desc_diff(&desc)
2027                    .prop_map(move |diffs| (desc.clone(), diffs, datas.clone()))
2028            });
2029
2030        proptest!(|((old, diffs, datas) in strat)| {
2031            test_case(old, diffs, datas);
2032        });
2033    }
2034
2035    #[mz_ore::test]
2036    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decContextDefault` on OS `linux`
2037    fn empty_relation_desc_roundtrips() {
2038        let empty = RelationDesc::empty();
2039        let rows = proptest::collection::vec(arb_source_data_for_relation_desc(&empty), 0..8)
2040            .prop_map(move |datas| (empty.clone(), datas));
2041
2042        // Note: This case should be covered by the `all_source_data_roundtrips` test above, but
2043        // it's a special case that we explicitly want to exercise.
2044        proptest!(|((config, (desc, source_datas)) in (any::<EncodingConfig>(), rows))| {
2045            roundtrip_source_data(&desc, source_datas, &desc, &config);
2046        });
2047    }
2048
2049    #[mz_ore::test]
2050    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decContextDefault` on OS `linux`
2051    fn arrow_datatype_consistent() {
2052        fn test_case(desc: RelationDesc, datas: Vec<SourceData>) {
2053            let half = datas.len() / 2;
2054
2055            let mut encoder_a = <RelationDesc as Schema<SourceData>>::encoder(&desc).unwrap();
2056            for data in &datas[..half] {
2057                encoder_a.append(data);
2058            }
2059            let col_a = encoder_a.finish();
2060
2061            let mut encoder_b = <RelationDesc as Schema<SourceData>>::encoder(&desc).unwrap();
2062            for data in &datas[half..] {
2063                encoder_b.append(data);
2064            }
2065            let col_b = encoder_b.finish();
2066
2067            // The DataType of the resulting column should not change based on what data was
2068            // encoded.
2069            assert_eq!(col_a.data_type(), col_b.data_type());
2070        }
2071
2072        let num_rows = 12;
2073        let strat = any::<RelationDesc>().prop_flat_map(|desc| {
2074            proptest::collection::vec(arb_source_data_for_relation_desc(&desc), num_rows)
2075                .prop_map(move |datas| (desc.clone(), datas))
2076        });
2077
2078        proptest!(|((desc, data) in strat)| {
2079            test_case(desc, data);
2080        });
2081    }
2082
2083    #[mz_ore::test]
2084    #[cfg_attr(miri, ignore)] // too slow
2085    fn source_proto_serialization_stability() {
2086        let min_protos = 10;
2087        let encoded = include_str!("snapshots/source-datas.txt");
2088
2089        // Decode the pre-generated source datas
2090        let mut decoded: Vec<(RelationDesc, SourceData)> = encoded
2091            .lines()
2092            .map(|s| {
2093                let (desc, data) = s.split_once(',').expect("comma separated data");
2094                let desc = base64::engine::general_purpose::STANDARD
2095                    .decode(desc)
2096                    .expect("valid base64");
2097                let data = base64::engine::general_purpose::STANDARD
2098                    .decode(data)
2099                    .expect("valid base64");
2100                (desc, data)
2101            })
2102            .map(|(desc, data)| {
2103                let desc = ProtoRelationDesc::decode(&desc[..]).expect("valid proto");
2104                let desc = desc.into_rust().expect("valid proto");
2105                let data = SourceData::decode(&data, &desc).expect("valid proto");
2106                (desc, data)
2107            })
2108            .collect();
2109
2110        // If there are fewer than the minimum examples, generate some new ones arbitrarily
2111        let mut runner = proptest::test_runner::TestRunner::deterministic();
2112        let strategy = RelationDesc::arbitrary().prop_flat_map(|desc| {
2113            arb_source_data_for_relation_desc(&desc).prop_map(move |data| (desc.clone(), data))
2114        });
2115        while decoded.len() < min_protos {
2116            let arbitrary_data = strategy
2117                .new_tree(&mut runner)
2118                .expect("source data")
2119                .current();
2120            decoded.push(arbitrary_data);
2121        }
2122
2123        // Reencode and compare the strings
2124        let mut reencoded = String::new();
2125        let mut buf = vec![];
2126        for (desc, data) in decoded {
2127            buf.clear();
2128            desc.into_proto().encode(&mut buf).expect("success");
2129            base64::engine::general_purpose::STANDARD.encode_string(buf.as_slice(), &mut reencoded);
2130            reencoded.push(',');
2131
2132            buf.clear();
2133            data.encode(&mut buf);
2134            base64::engine::general_purpose::STANDARD.encode_string(buf.as_slice(), &mut reencoded);
2135            reencoded.push('\n');
2136        }
2137
2138        // Optimizations in Persist, particularly consolidation on read,
2139        // depend on a stable serialization for the serialized data.
2140        // For example, reordering proto fields could cause us
2141        // to generate a different (equivalent) serialization for a record,
2142        // and the two versions would not consolidate out.
2143        // This can impact correctness!
2144        //
2145        // If you need to change how SourceDatas are encoded, that's still fine...
2146        // but we'll also need to increase
2147        // the MINIMUM_CONSOLIDATED_VERSION as part of the same release.
2148        assert_eq!(
2149            encoded,
2150            reencoded.as_str(),
2151            "SourceData serde should be stable"
2152        )
2153    }
2154}