// File: mz_storage_types/sources.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Types and traits related to the introduction of changing collections into `dataflow`.
11
12use std::collections::BTreeMap;
13use std::fmt::Debug;
14use std::hash::Hash;
15use std::ops::{Add, AddAssign, Deref, DerefMut};
16use std::str::FromStr;
17use std::sync::Arc;
18use std::time::Duration;
19
20use arrow::array::{Array, ArrayRef, BinaryArray, BinaryBuilder, NullArray, StructArray};
21use arrow::datatypes::{Field, Fields};
22use bytes::{BufMut, Bytes};
23use columnation::Columnation;
24use itertools::EitherOrBoth::Both;
25use itertools::Itertools;
26use kafka::KafkaSourceExportDetails;
27use load_generator::{LoadGeneratorOutput, LoadGeneratorSourceExportDetails};
28use mz_ore::assert_none;
29use mz_persist_types::Codec;
30use mz_persist_types::arrow::ArrayOrd;
31use mz_persist_types::columnar::{ColumnDecoder, ColumnEncoder, Schema};
32use mz_persist_types::stats::{
33    ColumnNullStats, ColumnStatKinds, ColumnarStats, ColumnarStatsBuilder, PrimitiveStats,
34    StructStats,
35};
36use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
37use mz_repr::{
38    CatalogItemId, Datum, GlobalId, ProtoRelationDesc, ProtoRow, RelationDesc, Row,
39    RowColumnarDecoder, RowColumnarEncoder, arb_row_for_relation,
40};
41use mz_sql_parser::ast::{Ident, IdentError, UnresolvedItemName};
42use proptest::prelude::any;
43use proptest::strategy::Strategy;
44use prost::Message;
45use serde::{Deserialize, Serialize};
46use timely::order::{PartialOrder, TotalOrder};
47use timely::progress::timestamp::Refines;
48use timely::progress::{PathSummary, Timestamp};
49
50use crate::AlterCompatible;
51use crate::connections::inline::{
52    ConnectionAccess, ConnectionResolver, InlinedConnection, IntoInlineConnection,
53    ReferencedConnection,
54};
55use crate::controller::AlterError;
56use crate::errors::{DataflowError, ProtoDataflowError};
57use crate::instances::StorageInstanceId;
58use crate::sources::sql_server::SqlServerSourceExportDetails;
59
60pub mod encoding;
61pub mod envelope;
62pub mod kafka;
63pub mod load_generator;
64pub mod mysql;
65pub mod postgres;
66pub mod sql_server;
67
68pub use crate::sources::envelope::SourceEnvelope;
69pub use crate::sources::kafka::KafkaSourceConnection;
70pub use crate::sources::load_generator::LoadGeneratorSourceConnection;
71pub use crate::sources::mysql::{MySqlSourceConnection, MySqlSourceExportDetails};
72pub use crate::sources::postgres::{PostgresSourceConnection, PostgresSourceExportDetails};
73pub use crate::sources::sql_server::{SqlServerSourceConnection, SqlServerSourceExtras};
74
75include!(concat!(env!("OUT_DIR"), "/mz_storage_types.sources.rs"));
76
/// A description of a source ingestion: the source itself, the collections it
/// exports to, the instance it runs on, and its remap/progress collection.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct IngestionDescription<S: 'static = (), C: ConnectionAccess = InlinedConnection> {
    /// The source description.
    pub desc: SourceDesc<C>,
    /// Collections to be exported by this ingestion.
    ///
    /// # Notes
    /// - For multi-output sources:
    ///     - Add exports by adding a new [`SourceExport`].
    ///     - Remove exports by removing the [`SourceExport`].
    ///
    ///   Re-rendering/executing the source after making these modifications
    ///   adds and drops the subsource, respectively.
    /// - This field includes the primary source's ID, which might need to be
    ///   filtered out to understand which exports are explicit ingestion exports.
    /// - This field does _not_ include the remap collection, which is tracked
    ///   in its own field.
    pub source_exports: BTreeMap<GlobalId, SourceExport<S>>,
    /// The ID of the instance in which to install the source.
    pub instance_id: StorageInstanceId,
    /// The ID of this ingestion's remap/progress collection.
    pub remap_collection_id: GlobalId,
    /// The storage metadata for the remap/progress collection.
    pub remap_metadata: S,
}
103
104impl IngestionDescription {
105    pub fn new(
106        desc: SourceDesc,
107        instance_id: StorageInstanceId,
108        remap_collection_id: GlobalId,
109    ) -> Self {
110        Self {
111            desc,
112            remap_metadata: (),
113            source_exports: BTreeMap::new(),
114            instance_id,
115            remap_collection_id,
116        }
117    }
118}
119
120impl<S> IngestionDescription<S> {
121    /// Return an iterator over the `GlobalId`s of `self`'s collections.
122    /// This will contain ids for the remap collection, subsources,
123    /// tables for this source, and the primary collection ID, even if
124    /// no data will be exported to the primary collection.
125    pub fn collection_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
126        // Expand self so that any new fields added generate a compiler error to
127        // increase the likelihood of developers seeing this function.
128        let IngestionDescription {
129            desc: _,
130            remap_metadata: _,
131            source_exports,
132            instance_id: _,
133            remap_collection_id,
134        } = &self;
135
136        source_exports
137            .keys()
138            .copied()
139            .chain(std::iter::once(*remap_collection_id))
140    }
141}
142
impl<S: Debug + Eq + PartialEq + AlterCompatible> AlterCompatible for IngestionDescription<S> {
    /// Checks, field by field, whether `self` could validly be altered into
    /// `other`. On the first incompatible field, logs a warning naming the
    /// field and returns `Err(AlterError { id })`.
    fn alter_compatible(
        &self,
        id: GlobalId,
        other: &IngestionDescription<S>,
    ) -> Result<(), AlterError> {
        // Fast path: identical descriptions are trivially compatible.
        if self == other {
            return Ok(());
        }
        // Destructure exhaustively so that adding a field forces this check
        // to be revisited.
        let IngestionDescription {
            desc,
            remap_metadata,
            source_exports,
            instance_id,
            remap_collection_id,
        } = self;

        // Each entry pairs a boolean "is compatible" with the field name used
        // in the warning below.
        let compatibility_checks = [
            (desc.alter_compatible(id, &other.desc).is_ok(), "desc"),
            (remap_metadata == &other.remap_metadata, "remap_metadata"),
            (
                // Walk both export maps in key order; only exports present on
                // *both* sides (`Both`) must themselves be alter-compatible.
                // Exports present on only one side are additions/removals,
                // which are permitted.
                source_exports
                    .iter()
                    .merge_join_by(&other.source_exports, |(l_key, _), (r_key, _)| {
                        l_key.cmp(r_key)
                    })
                    .all(|r| match r {
                        Both(
                            (
                                _,
                                SourceExport {
                                    storage_metadata: l_metadata,
                                    details: l_details,
                                    data_config: l_data_config,
                                },
                            ),
                            (
                                _,
                                SourceExport {
                                    storage_metadata: r_metadata,
                                    details: r_details,
                                    data_config: r_data_config,
                                },
                            ),
                        ) => {
                            l_metadata.alter_compatible(id, r_metadata).is_ok()
                                && l_details.alter_compatible(id, r_details).is_ok()
                                && l_data_config.alter_compatible(id, r_data_config).is_ok()
                        }
                        _ => true,
                    }),
                "source_exports",
            ),
            (instance_id == &other.instance_id, "instance_id"),
            (
                remap_collection_id == &other.remap_collection_id,
                "remap_collection_id",
            ),
        ];
        for (compatible, field) in compatibility_checks {
            if !compatible {
                tracing::warn!(
                    "IngestionDescription incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
                    self,
                    other
                );

                return Err(AlterError { id });
            }
        }

        Ok(())
    }
}
217
218impl<R: ConnectionResolver> IntoInlineConnection<IngestionDescription, R>
219    for IngestionDescription<(), ReferencedConnection>
220{
221    fn into_inline_connection(self, r: R) -> IngestionDescription {
222        let IngestionDescription {
223            desc,
224            remap_metadata,
225            source_exports,
226            instance_id,
227            remap_collection_id,
228        } = self;
229
230        IngestionDescription {
231            desc: desc.into_inline_connection(r),
232            remap_metadata,
233            source_exports,
234            instance_id,
235            remap_collection_id,
236        }
237    }
238}
239
/// A single collection exported by a source, together with the information
/// the source needs to produce and write data for it.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct SourceExport<S = (), C: ConnectionAccess = InlinedConnection> {
    /// The collection metadata needed to write the exported data
    pub storage_metadata: S,
    /// Details necessary for the source to export data to this export's collection.
    pub details: SourceExportDetails,
    /// Config necessary to handle (e.g. decode and envelope) the data for this export.
    pub data_config: SourceExportDataConfig<C>,
}
249
/// A timestamp type usable as the native frontier of a source, convertible
/// to and from a [`Row`] representation for durable storage.
pub trait SourceTimestamp:
    Timestamp + Columnation + Refines<()> + std::fmt::Display + Sync
{
    /// Encodes this timestamp as a [`Row`].
    fn encode_row(&self) -> Row;
    /// Decodes a timestamp previously produced by [`Self::encode_row`].
    /// Implementations may panic on rows of a different shape (the
    /// `MzOffset` implementation below does).
    fn decode_row(row: &Row) -> Self;
}
256
257impl SourceTimestamp for MzOffset {
258    fn encode_row(&self) -> Row {
259        Row::pack([Datum::UInt64(self.offset)])
260    }
261
262    fn decode_row(row: &Row) -> Self {
263        let mut datums = row.iter();
264        match (datums.next(), datums.next()) {
265            (Some(Datum::UInt64(offset)), None) => MzOffset::from(offset),
266            _ => panic!("invalid row {row:?}"),
267        }
268    }
269}
270
/// Universal language for describing message positions in Materialize, in a source independent
/// way. Individual sources like Kafka or File sources should explicitly implement their own offset
/// type that converts to/From MzOffsets. A 0-MzOffset denotes an empty stream.
#[derive(
    Copy,
    Clone,
    Default,
    Debug,
    PartialEq,
    PartialOrd,
    Eq,
    Ord,
    Hash,
    Serialize,
    Deserialize
)]
pub struct MzOffset {
    /// The raw offset value; `0` denotes an empty stream.
    pub offset: u64,
}
290
impl differential_dataflow::difference::Semigroup for MzOffset {
    /// Accumulates by delegating to `u64`'s `plus_equals` on the inner offset.
    fn plus_equals(&mut self, rhs: &Self) {
        self.offset.plus_equals(&rhs.offset)
    }
}
296
impl differential_dataflow::difference::IsZero for MzOffset {
    /// An offset is "zero" exactly when the inner `u64` is zero.
    fn is_zero(&self) -> bool {
        self.offset.is_zero()
    }
}
302
impl mz_persist_types::Codec64 for MzOffset {
    fn codec_name() -> String {
        "MzOffset".to_string()
    }

    /// Encodes as the `Codec64` encoding of the inner `u64`.
    fn encode(&self) -> [u8; 8] {
        mz_persist_types::Codec64::encode(&self.offset)
    }

    /// Inverse of [`Self::encode`].
    fn decode(buf: [u8; 8]) -> Self {
        Self {
            offset: mz_persist_types::Codec64::decode(buf),
        }
    }
}
318
impl columnation::Columnation for MzOffset {
    // `MzOffset` is `Copy`, so the trivial copy region suffices.
    type InnerRegion = columnation::CopyRegion<MzOffset>;
}
322
323impl MzOffset {
324    pub fn checked_sub(self, other: Self) -> Option<Self> {
325        self.offset
326            .checked_sub(other.offset)
327            .map(|offset| Self { offset })
328    }
329}
330
/// Convert a raw `u64` offset into an `MzOffset`.
// NOTE(review): the previous comment here ("Convert from MzOffset to
// Kafka::Offset as long as the offset is not negative") described a
// different conversion and did not match this impl.
impl From<u64> for MzOffset {
    fn from(offset: u64) -> Self {
        Self { offset }
    }
}
338
impl std::fmt::Display for MzOffset {
    /// Displays the raw offset value.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.offset)
    }
}
344
// Assume overflow does not occur for addition: plain `+` is used, which
// panics on overflow in debug builds and wraps in release builds.
impl Add<u64> for MzOffset {
    type Output = MzOffset;

    fn add(self, x: u64) -> MzOffset {
        MzOffset {
            offset: self.offset + x,
        }
    }
}
355impl Add<Self> for MzOffset {
356    type Output = Self;
357
358    fn add(self, x: Self) -> Self {
359        MzOffset {
360            offset: self.offset + x.offset,
361        }
362    }
363}
364impl AddAssign<u64> for MzOffset {
365    fn add_assign(&mut self, x: u64) {
366        self.offset += x;
367    }
368}
369impl AddAssign<Self> for MzOffset {
370    fn add_assign(&mut self, x: Self) {
371        self.offset += x.offset;
372    }
373}
374
/// Convert from `PgLsn` to MzOffset
///
/// `PgLsn` converts directly into a `u64`, so the conversion is lossless.
impl From<tokio_postgres::types::PgLsn> for MzOffset {
    fn from(lsn: tokio_postgres::types::PgLsn) -> Self {
        MzOffset { offset: lsn.into() }
    }
}
381
impl Timestamp for MzOffset {
    // Path summaries for offsets are themselves offsets; see the
    // `PathSummary` impl below.
    type Summary = MzOffset;

    /// Wraps `u64`'s minimum timestamp.
    fn minimum() -> Self {
        MzOffset {
            offset: Timestamp::minimum(),
        }
    }
}
391
392impl PathSummary<MzOffset> for MzOffset {
393    fn results_in(&self, src: &MzOffset) -> Option<MzOffset> {
394        Some(MzOffset {
395            offset: self.offset.results_in(&src.offset)?,
396        })
397    }
398
399    fn followed_by(&self, other: &Self) -> Option<Self> {
400        Some(MzOffset {
401            offset: PathSummary::<u64>::followed_by(&self.offset, &other.offset)?,
402        })
403    }
404}
405
impl Refines<()> for MzOffset {
    /// The unit timestamp refines into the minimum offset.
    fn to_inner(_: ()) -> Self {
        MzOffset::minimum()
    }
    /// The outer timestamp is unit, so the offset is simply discarded.
    fn to_outer(self) {}
    fn summarize(_: Self::Summary) {}
}
413
impl PartialOrder for MzOffset {
    /// Ordering is inherited from the inner `u64` offset.
    #[inline]
    fn less_equal(&self, other: &Self) -> bool {
        self.offset.less_equal(&other.offset)
    }
}

/// Offsets are totally ordered, like the underlying `u64`.
impl TotalOrder for MzOffset {}
422
/// The meaning of the timestamp number produced by data sources. This type
/// is not concerned with the source of the timestamp (like if the data came
/// from a Debezium consistency topic or a CDCv2 stream), instead only what the
/// timestamp number means.
///
/// Some variants here have attached data used to differentiate incomparable
/// instantiations. These attached data types should be expanded in the future
/// if we need to tell apart more kinds of sources.
///
/// A `Timeline` round-trips through a compact string form (a tag character,
/// optionally followed by `.` and an id) via `to_string` and [`FromStr`].
#[derive(
    Clone,
    Debug,
    Ord,
    PartialOrd,
    Eq,
    PartialEq,
    Serialize,
    Deserialize,
    Hash
)]
pub enum Timeline {
    /// EpochMilliseconds means the timestamp is the number of milliseconds since
    /// the Unix epoch.
    EpochMilliseconds,
    /// External means the timestamp comes from an external data source and we
    /// don't know what the number means. The attached String is the source's name,
    /// which will result in different sources being incomparable.
    External(String),
    /// User means the user has manually specified a timeline. The attached
    /// String is specified by the user, allowing them to decide sources that are
    /// joinable.
    User(String),
}
455
impl Timeline {
    /// Tag character for [`Timeline::EpochMilliseconds`] in the string form.
    const EPOCH_MILLISECOND_ID_CHAR: char = 'M';
    /// Tag character for [`Timeline::External`] in the string form.
    const EXTERNAL_ID_CHAR: char = 'E';
    /// Tag character for [`Timeline::User`] in the string form.
    const USER_ID_CHAR: char = 'U';

    /// Returns the tag character identifying this variant.
    fn id_char(&self) -> char {
        match self {
            Self::EpochMilliseconds => Self::EPOCH_MILLISECOND_ID_CHAR,
            Self::External(_) => Self::EXTERNAL_ID_CHAR,
            Self::User(_) => Self::USER_ID_CHAR,
        }
    }
}
469
470impl ToString for Timeline {
471    fn to_string(&self) -> String {
472        match self {
473            Self::EpochMilliseconds => format!("{}", self.id_char()),
474            Self::External(id) => format!("{}.{id}", self.id_char()),
475            Self::User(id) => format!("{}.{id}", self.id_char()),
476        }
477    }
478}
479
480impl FromStr for Timeline {
481    type Err = String;
482
483    fn from_str(s: &str) -> Result<Self, Self::Err> {
484        if s.is_empty() {
485            return Err("empty timeline".to_string());
486        }
487        let mut chars = s.chars();
488        match chars.next().expect("non-empty string") {
489            Self::EPOCH_MILLISECOND_ID_CHAR => match chars.next() {
490                None => Ok(Self::EpochMilliseconds),
491                Some(_) => Err(format!("unknown timeline: {s}")),
492            },
493            Self::EXTERNAL_ID_CHAR => match chars.next() {
494                Some('.') => Ok(Self::External(chars.as_str().to_string())),
495                _ => Err(format!("unknown timeline: {s}")),
496            },
497            Self::USER_ID_CHAR => match chars.next() {
498                Some('.') => Ok(Self::User(chars.as_str().to_string())),
499                _ => Err(format!("unknown timeline: {s}")),
500            },
501            _ => Err(format!("unknown timeline: {s}")),
502        }
503    }
504}
505
/// A connection to an external system
pub trait SourceConnection: Debug + Clone + PartialEq + AlterCompatible {
    /// The name of the external system (e.g. kafka, postgres, etc).
    fn name(&self) -> &'static str;

    /// The name of the resource in the external system (e.g. kafka topic) if any
    fn external_reference(&self) -> Option<&str>;

    /// Defines the key schema to use by default for this source connection type.
    /// This will be used for the primary export of the source and as the default
    /// pre-encoding key schema for the source.
    fn default_key_desc(&self) -> RelationDesc;

    /// Defines the value schema to use by default for this source connection type.
    /// This will be used for the primary export of the source and as the default
    /// pre-encoding value schema for the source.
    fn default_value_desc(&self) -> RelationDesc;

    /// The schema of this connection's timestamp type. This will also be the schema of the
    /// progress relation.
    fn timestamp_desc(&self) -> RelationDesc;

    /// The id of the connection object (i.e. the one obtained from running `CREATE CONNECTION`) in
    /// the catalog, if any.
    fn connection_id(&self) -> Option<CatalogItemId>;

    /// Whether the source type supports read only mode.
    fn supports_read_only(&self) -> bool;

    /// Whether the source type prefers to run on only one replica of a multi-replica cluster.
    fn prefers_single_replica(&self) -> bool;
}
538
/// The compression scheme applied to raw source data.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum Compression {
    /// Data is gzip-compressed.
    Gzip,
    /// Data is uncompressed.
    None,
}
544
/// Defines the configuration for how to handle data that is exported for a given
/// Source Export.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct SourceExportDataConfig<C: ConnectionAccess = InlinedConnection> {
    /// How the exported data is encoded, if a decoding step is required.
    pub encoding: Option<encoding::SourceDataEncoding<C>>,
    /// The envelope used to interpret decoded records as updates.
    pub envelope: SourceEnvelope,
}
552
553impl<R: ConnectionResolver> IntoInlineConnection<SourceExportDataConfig, R>
554    for SourceExportDataConfig<ReferencedConnection>
555{
556    fn into_inline_connection(self, r: R) -> SourceExportDataConfig {
557        let SourceExportDataConfig { encoding, envelope } = self;
558
559        SourceExportDataConfig {
560            encoding: encoding.map(|e| e.into_inline_connection(r)),
561            envelope,
562        }
563    }
564}
565
566impl<C: ConnectionAccess> AlterCompatible for SourceExportDataConfig<C> {
567    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
568        if self == other {
569            return Ok(());
570        }
571        let Self { encoding, envelope } = &self;
572
573        let compatibility_checks = [
574            (
575                match (encoding, &other.encoding) {
576                    (Some(s), Some(o)) => s.alter_compatible(id, o).is_ok(),
577                    (s, o) => s == o,
578                },
579                "encoding",
580            ),
581            (envelope == &other.envelope, "envelope"),
582        ];
583
584        for (compatible, field) in compatibility_checks {
585            if !compatible {
586                tracing::warn!(
587                    "SourceDesc incompatible {field}:\nself:\n{:#?}\n\nother\n{:#?}",
588                    self,
589                    other
590                );
591
592                return Err(AlterError { id });
593            }
594        }
595        Ok(())
596    }
597}
598
impl<C: ConnectionAccess> SourceExportDataConfig<C> {
    /// Returns `true` if this connection yields data that is
    /// append-only/monotonic. Append-only means the source
    /// never produces retractions.
    // TODO(guswynn): consider enforcing this more completely at the
    // parsing/typechecking level, by not using an `envelope`
    // for sources like pg
    pub fn monotonic(&self, connection: &GenericSourceConnection<C>) -> bool {
        match &self.envelope {
            // Upsert and CdcV2 may produce retractions.
            SourceEnvelope::Upsert(_) | SourceEnvelope::CdcV2 => false,
            SourceEnvelope::None(_) => {
                match connection {
                    // Postgres can produce retractions (deletes).
                    GenericSourceConnection::Postgres(_) => false,
                    // MySQL can produce retractions (deletes).
                    GenericSourceConnection::MySql(_) => false,
                    // SQL Server can produce retractions (deletes).
                    GenericSourceConnection::SqlServer(_) => false,
                    // Whether or not a Loadgen source can produce retractions varies.
                    GenericSourceConnection::LoadGenerator(g) => g.load_generator.is_monotonic(),
                    // Kafka exports with `None` envelope are append-only.
                    GenericSourceConnection::Kafka(_) => true,
                }
            }
        }
    }
}
627
/// An external source of updates for a relational collection.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct SourceDesc<C: ConnectionAccess = InlinedConnection> {
    /// The connection to the external system producing the data.
    pub connection: GenericSourceConnection<C>,
    /// Timestamping interval for the source. Allowed to change via
    /// `ALTER SOURCE` (see the `AlterCompatible` impl below, which
    /// deliberately ignores this field).
    pub timestamp_interval: Duration,
}
634
635impl<R: ConnectionResolver> IntoInlineConnection<SourceDesc, R>
636    for SourceDesc<ReferencedConnection>
637{
638    fn into_inline_connection(self, r: R) -> SourceDesc {
639        let SourceDesc {
640            connection,
641            timestamp_interval,
642        } = self;
643
644        SourceDesc {
645            connection: connection.into_inline_connection(&r),
646            timestamp_interval,
647        }
648    }
649}
650
impl<C: ConnectionAccess> AlterCompatible for SourceDesc<C> {
    /// Determines if `self` is compatible with another `SourceDesc`, in such a
    /// way that it is possible to turn `self` into `other` through a valid
    /// series of transformations (e.g. no transformation or `ALTER SOURCE`).
    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
        // Identical descriptions are trivially compatible.
        if self == other {
            return Ok(());
        }
        // Destructure exhaustively so a new field forces a compile error here.
        let Self {
            connection,
            // timestamp_interval is allowed to change via ALTER SOURCE
            timestamp_interval: _,
        } = &self;

        // Only the connection participates in the compatibility check.
        let compatibility_checks = [(
            connection.alter_compatible(id, &other.connection).is_ok(),
            "connection",
        )];

        for (compatible, field) in compatibility_checks {
            if !compatible {
                tracing::warn!(
                    "SourceDesc incompatible {field}:\nself:\n{:#?}\n\nother\n{:#?}",
                    self,
                    other
                );

                return Err(AlterError { id });
            }
        }

        Ok(())
    }
}
685
/// A source connection of one of the supported kinds, generic over whether
/// the inner connection is referenced or inlined.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum GenericSourceConnection<C: ConnectionAccess = InlinedConnection> {
    Kafka(KafkaSourceConnection<C>),
    Postgres(PostgresSourceConnection<C>),
    MySql(MySqlSourceConnection<C>),
    SqlServer(SqlServerSourceConnection<C>),
    LoadGenerator(LoadGeneratorSourceConnection),
}
694
// The `From` impls below let each concrete connection type be lifted into
// `GenericSourceConnection` with `.into()`.

impl<C: ConnectionAccess> From<KafkaSourceConnection<C>> for GenericSourceConnection<C> {
    fn from(conn: KafkaSourceConnection<C>) -> Self {
        Self::Kafka(conn)
    }
}

impl<C: ConnectionAccess> From<PostgresSourceConnection<C>> for GenericSourceConnection<C> {
    fn from(conn: PostgresSourceConnection<C>) -> Self {
        Self::Postgres(conn)
    }
}

impl<C: ConnectionAccess> From<MySqlSourceConnection<C>> for GenericSourceConnection<C> {
    fn from(conn: MySqlSourceConnection<C>) -> Self {
        Self::MySql(conn)
    }
}

impl<C: ConnectionAccess> From<SqlServerSourceConnection<C>> for GenericSourceConnection<C> {
    fn from(conn: SqlServerSourceConnection<C>) -> Self {
        Self::SqlServer(conn)
    }
}

impl<C: ConnectionAccess> From<LoadGeneratorSourceConnection> for GenericSourceConnection<C> {
    fn from(conn: LoadGeneratorSourceConnection) -> Self {
        Self::LoadGenerator(conn)
    }
}
724
725impl<R: ConnectionResolver> IntoInlineConnection<GenericSourceConnection, R>
726    for GenericSourceConnection<ReferencedConnection>
727{
728    fn into_inline_connection(self, r: R) -> GenericSourceConnection {
729        match self {
730            GenericSourceConnection::Kafka(kafka) => {
731                GenericSourceConnection::Kafka(kafka.into_inline_connection(r))
732            }
733            GenericSourceConnection::Postgres(pg) => {
734                GenericSourceConnection::Postgres(pg.into_inline_connection(r))
735            }
736            GenericSourceConnection::MySql(mysql) => {
737                GenericSourceConnection::MySql(mysql.into_inline_connection(r))
738            }
739            GenericSourceConnection::SqlServer(sql_server) => {
740                GenericSourceConnection::SqlServer(sql_server.into_inline_connection(r))
741            }
742            GenericSourceConnection::LoadGenerator(lg) => {
743                GenericSourceConnection::LoadGenerator(lg)
744            }
745        }
746    }
747}
748
749impl<C: ConnectionAccess> SourceConnection for GenericSourceConnection<C> {
750    fn name(&self) -> &'static str {
751        match self {
752            Self::Kafka(conn) => conn.name(),
753            Self::Postgres(conn) => conn.name(),
754            Self::MySql(conn) => conn.name(),
755            Self::SqlServer(conn) => conn.name(),
756            Self::LoadGenerator(conn) => conn.name(),
757        }
758    }
759
760    fn external_reference(&self) -> Option<&str> {
761        match self {
762            Self::Kafka(conn) => conn.external_reference(),
763            Self::Postgres(conn) => conn.external_reference(),
764            Self::MySql(conn) => conn.external_reference(),
765            Self::SqlServer(conn) => conn.external_reference(),
766            Self::LoadGenerator(conn) => conn.external_reference(),
767        }
768    }
769
770    fn default_key_desc(&self) -> RelationDesc {
771        match self {
772            Self::Kafka(conn) => conn.default_key_desc(),
773            Self::Postgres(conn) => conn.default_key_desc(),
774            Self::MySql(conn) => conn.default_key_desc(),
775            Self::SqlServer(conn) => conn.default_key_desc(),
776            Self::LoadGenerator(conn) => conn.default_key_desc(),
777        }
778    }
779
780    fn default_value_desc(&self) -> RelationDesc {
781        match self {
782            Self::Kafka(conn) => conn.default_value_desc(),
783            Self::Postgres(conn) => conn.default_value_desc(),
784            Self::MySql(conn) => conn.default_value_desc(),
785            Self::SqlServer(conn) => conn.default_value_desc(),
786            Self::LoadGenerator(conn) => conn.default_value_desc(),
787        }
788    }
789
790    fn timestamp_desc(&self) -> RelationDesc {
791        match self {
792            Self::Kafka(conn) => conn.timestamp_desc(),
793            Self::Postgres(conn) => conn.timestamp_desc(),
794            Self::MySql(conn) => conn.timestamp_desc(),
795            Self::SqlServer(conn) => conn.timestamp_desc(),
796            Self::LoadGenerator(conn) => conn.timestamp_desc(),
797        }
798    }
799
800    fn connection_id(&self) -> Option<CatalogItemId> {
801        match self {
802            Self::Kafka(conn) => conn.connection_id(),
803            Self::Postgres(conn) => conn.connection_id(),
804            Self::MySql(conn) => conn.connection_id(),
805            Self::SqlServer(conn) => conn.connection_id(),
806            Self::LoadGenerator(conn) => conn.connection_id(),
807        }
808    }
809
810    fn supports_read_only(&self) -> bool {
811        match self {
812            GenericSourceConnection::Kafka(conn) => conn.supports_read_only(),
813            GenericSourceConnection::Postgres(conn) => conn.supports_read_only(),
814            GenericSourceConnection::MySql(conn) => conn.supports_read_only(),
815            GenericSourceConnection::SqlServer(conn) => conn.supports_read_only(),
816            GenericSourceConnection::LoadGenerator(conn) => conn.supports_read_only(),
817        }
818    }
819
820    fn prefers_single_replica(&self) -> bool {
821        match self {
822            GenericSourceConnection::Kafka(conn) => conn.prefers_single_replica(),
823            GenericSourceConnection::Postgres(conn) => conn.prefers_single_replica(),
824            GenericSourceConnection::MySql(conn) => conn.prefers_single_replica(),
825            GenericSourceConnection::SqlServer(conn) => conn.prefers_single_replica(),
826            GenericSourceConnection::LoadGenerator(conn) => conn.prefers_single_replica(),
827        }
828    }
829}
impl<C: ConnectionAccess> crate::AlterCompatible for GenericSourceConnection<C> {
    /// Two connections are alter-compatible only when they are the same kind
    /// of source and the inner connections are themselves alter-compatible;
    /// changing the kind of a source is never compatible.
    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
        if self == other {
            return Ok(());
        }
        let r = match (self, other) {
            (Self::Kafka(conn), Self::Kafka(other)) => conn.alter_compatible(id, other),
            (Self::Postgres(conn), Self::Postgres(other)) => conn.alter_compatible(id, other),
            (Self::MySql(conn), Self::MySql(other)) => conn.alter_compatible(id, other),
            (Self::SqlServer(conn), Self::SqlServer(other)) => conn.alter_compatible(id, other),
            (Self::LoadGenerator(conn), Self::LoadGenerator(other)) => {
                conn.alter_compatible(id, other)
            }
            // Mismatched kinds are always incompatible.
            _ => Err(AlterError { id }),
        };

        if r.is_err() {
            tracing::warn!(
                "GenericSourceConnection incompatible:\nself:\n{:#?}\n\nother\n{:#?}",
                self,
                other
            );
        }

        r
    }
}
857
/// Details necessary for each source export to allow the source implementations
/// to export data to the export's collection.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum SourceExportDetails {
    /// Used when the primary collection of a source isn't an export to
    /// output to.
    None,
    Kafka(KafkaSourceExportDetails),
    Postgres(PostgresSourceExportDetails),
    MySql(MySqlSourceExportDetails),
    SqlServer(SqlServerSourceExportDetails),
    LoadGenerator(LoadGeneratorSourceExportDetails),
}
871
872impl crate::AlterCompatible for SourceExportDetails {
873    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
874        if self == other {
875            return Ok(());
876        }
877        let r = match (self, other) {
878            (Self::None, Self::None) => Ok(()),
879            (Self::Kafka(s), Self::Kafka(o)) => s.alter_compatible(id, o),
880            (Self::Postgres(s), Self::Postgres(o)) => s.alter_compatible(id, o),
881            (Self::MySql(s), Self::MySql(o)) => s.alter_compatible(id, o),
882            (Self::LoadGenerator(s), Self::LoadGenerator(o)) => s.alter_compatible(id, o),
883            _ => Err(AlterError { id }),
884        };
885
886        if r.is_err() {
887            tracing::warn!(
888                "SourceExportDetails incompatible:\nself:\n{:#?}\n\nother\n{:#?}",
889                self,
890                other
891            );
892        }
893
894        r
895    }
896}
897
/// Details necessary to store in the `Details` option of a source export
/// statement (`CREATE SUBSOURCE` and `CREATE TABLE .. FROM SOURCE` statements),
/// to generate the appropriate `SourceExportDetails` struct during planning.
/// NOTE that this is serialized as proto to the catalog, so any changes here
/// must be backwards compatible or will require a migration.
pub enum SourceExportStatementDetails {
    /// PostgreSQL export: the upstream table description.
    Postgres {
        table: mz_postgres_util::desc::PostgresTableDesc,
    },
    /// MySQL export: the upstream table description plus the initial GTID set
    /// recorded for this export.
    MySql {
        table: mz_mysql_util::MySqlTableDesc,
        initial_gtid_set: String,
    },
    /// SQL Server export: the upstream table description, the CDC capture
    /// instance name, and the initial LSN recorded for this export.
    SqlServer {
        table: mz_sql_server_util::desc::SqlServerTableDesc,
        capture_instance: Arc<str>,
        initial_lsn: mz_sql_server_util::cdc::Lsn,
    },
    /// Load generator export: which generator output this export reads.
    LoadGenerator {
        output: LoadGeneratorOutput,
    },
    /// Kafka export: no extra details are needed.
    Kafka {},
}
921
// Proto round-tripping for `SourceExportStatementDetails`; each enum variant
// maps 1:1 onto a `Kind` of the proto message.
impl RustType<ProtoSourceExportStatementDetails> for SourceExportStatementDetails {
    fn into_proto(&self) -> ProtoSourceExportStatementDetails {
        match self {
            SourceExportStatementDetails::Postgres { table } => ProtoSourceExportStatementDetails {
                kind: Some(proto_source_export_statement_details::Kind::Postgres(
                    postgres::ProtoPostgresSourceExportStatementDetails {
                        table: Some(table.into_proto()),
                    },
                )),
            },
            SourceExportStatementDetails::MySql {
                table,
                initial_gtid_set,
            } => ProtoSourceExportStatementDetails {
                kind: Some(proto_source_export_statement_details::Kind::Mysql(
                    mysql::ProtoMySqlSourceExportStatementDetails {
                        table: Some(table.into_proto()),
                        initial_gtid_set: initial_gtid_set.clone(),
                    },
                )),
            },
            SourceExportStatementDetails::SqlServer {
                table,
                capture_instance,
                initial_lsn,
            } => ProtoSourceExportStatementDetails {
                kind: Some(proto_source_export_statement_details::Kind::SqlServer(
                    sql_server::ProtoSqlServerSourceExportStatementDetails {
                        table: Some(table.into_proto()),
                        capture_instance: capture_instance.to_string(),
                        // LSNs travel through the proto as raw bytes.
                        initial_lsn: initial_lsn.as_bytes().to_vec(),
                    },
                )),
            },
            SourceExportStatementDetails::LoadGenerator { output } => {
                ProtoSourceExportStatementDetails {
                    kind: Some(proto_source_export_statement_details::Kind::Loadgen(
                        load_generator::ProtoLoadGeneratorSourceExportStatementDetails {
                            output: output.into_proto().into(),
                        },
                    )),
                }
            }
            SourceExportStatementDetails::Kafka {} => ProtoSourceExportStatementDetails {
                kind: Some(proto_source_export_statement_details::Kind::Kafka(
                    kafka::ProtoKafkaSourceExportStatementDetails {},
                )),
            },
        }
    }

    fn from_proto(proto: ProtoSourceExportStatementDetails) -> Result<Self, TryFromProtoError> {
        use proto_source_export_statement_details::Kind;
        Ok(match proto.kind {
            Some(Kind::Postgres(details)) => SourceExportStatementDetails::Postgres {
                table: details
                    .table
                    .into_rust_if_some("ProtoPostgresSourceExportStatementDetails::table")?,
            },
            Some(Kind::Mysql(details)) => SourceExportStatementDetails::MySql {
                table: details
                    .table
                    .into_rust_if_some("ProtoMySqlSourceExportStatementDetails::table")?,

                initial_gtid_set: details.initial_gtid_set,
            },
            Some(Kind::SqlServer(details)) => SourceExportStatementDetails::SqlServer {
                table: details
                    .table
                    .into_rust_if_some("ProtoSqlServerSourceExportStatementDetails::table")?,
                capture_instance: details.capture_instance.into(),
                // The bytes were produced by `Lsn::as_bytes` in `into_proto`;
                // reject anything that does not parse back.
                initial_lsn: mz_sql_server_util::cdc::Lsn::try_from(details.initial_lsn.as_slice())
                    .map_err(|e| TryFromProtoError::InvalidFieldError(e.to_string()))?,
            },
            Some(Kind::Loadgen(details)) => SourceExportStatementDetails::LoadGenerator {
                output: details
                    .output
                    .into_rust_if_some("ProtoLoadGeneratorSourceExportStatementDetails::output")?,
            },
            Some(Kind::Kafka(_details)) => SourceExportStatementDetails::Kafka {},
            None => {
                return Err(TryFromProtoError::missing_field(
                    "ProtoSourceExportStatementDetails::kind",
                ));
            }
        })
    }
}
1010
/// The payload stored for a source: either a successfully produced [`Row`] or
/// a [`DataflowError`] explaining why one could not be produced.
///
/// See the [`Codec`] and [`Schema`] impls below for how this type is
/// serialized and stored columnar-ly.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
#[repr(transparent)]
pub struct SourceData(pub Result<Row, DataflowError>);
1014
1015impl Default for SourceData {
1016    fn default() -> Self {
1017        SourceData(Ok(Row::default()))
1018    }
1019}
1020
1021impl Deref for SourceData {
1022    type Target = Result<Row, DataflowError>;
1023
1024    fn deref(&self) -> &Self::Target {
1025        &self.0
1026    }
1027}
1028
1029impl DerefMut for SourceData {
1030    fn deref_mut(&mut self) -> &mut Self::Target {
1031        &mut self.0
1032    }
1033}
1034
1035impl RustType<ProtoSourceData> for SourceData {
1036    fn into_proto(&self) -> ProtoSourceData {
1037        use proto_source_data::Kind;
1038        ProtoSourceData {
1039            kind: Some(match &**self {
1040                Ok(row) => Kind::Ok(row.into_proto()),
1041                Err(err) => Kind::Err(err.into_proto()),
1042            }),
1043        }
1044    }
1045
1046    fn from_proto(proto: ProtoSourceData) -> Result<Self, TryFromProtoError> {
1047        use proto_source_data::Kind;
1048        match proto.kind {
1049            Some(kind) => match kind {
1050                Kind::Ok(row) => Ok(SourceData(Ok(row.into_rust()?))),
1051                Kind::Err(err) => Ok(SourceData(Err(err.into_rust()?))),
1052            },
1053            None => Result::Err(TryFromProtoError::missing_field("ProtoSourceData::kind")),
1054        }
1055    }
1056}
1057
// Persist `Codec` impl: `SourceData` is durably encoded as protobuf, with the
// `RelationDesc` serving as the schema.
impl Codec for SourceData {
    type Storage = ProtoRow;
    type Schema = RelationDesc;

    fn codec_name() -> String {
        "protobuf[SourceData]".into()
    }

    fn encode<B: BufMut>(&self, buf: &mut B) {
        // prost encoding can only fail on uninitialized required fields, and
        // `ProtoSourceData` has none.
        self.into_proto()
            .encode(buf)
            .expect("no required fields means no initialization errors");
    }

    fn decode(buf: &[u8], schema: &RelationDesc) -> Result<Self, String> {
        // Delegate to `decode_from` without offering any reusable storage.
        let mut val = SourceData::default();
        <Self as Codec>::decode_from(&mut val, buf, &mut None, schema)?;
        Ok(val)
    }

    fn decode_from<'a>(
        &mut self,
        buf: &'a [u8],
        storage: &mut Option<ProtoRow>,
        schema: &RelationDesc,
    ) -> Result<(), String> {
        // Optimize for common case of `Ok` by leaving a (cleared) `ProtoRow` in
        // the `Ok` variant of `ProtoSourceData`. prost's `Message::merge` impl
        // is smart about reusing the `Vec<Datum>` when it can.
        let mut proto = storage.take().unwrap_or_default();
        proto.clear();
        let mut proto = ProtoSourceData {
            kind: Some(proto_source_data::Kind::Ok(proto)),
        };
        proto.merge(buf).map_err(|err| err.to_string())?;
        match (proto.kind, &mut self.0) {
            // Again, optimize for the common case...
            (Some(proto_source_data::Kind::Ok(proto)), Ok(row)) => {
                let ret = row.decode_from_proto(&proto, schema);
                // Hand the `ProtoRow` back to the caller so its allocations
                // can be reused by the next decode.
                storage.replace(proto);
                ret
            }
            // ...otherwise fall back to the obvious thing.
            (kind, _) => {
                let proto = ProtoSourceData { kind };
                *self = proto.into_rust().map_err(|err| err.to_string())?;
                // Nothing to put back in storage.
                Ok(())
            }
        }
    }

    fn validate(val: &Self, desc: &Self::Schema) -> Result<(), String> {
        // Only `Ok` rows are validated against the schema; errors are always
        // considered valid.
        match &val.0 {
            Ok(row) => Row::validate(row, desc),
            Err(_) => Ok(()),
        }
    }

    fn encode_schema(schema: &Self::Schema) -> Bytes {
        schema.into_proto().encode_to_vec().into()
    }

    fn decode_schema(buf: &Bytes) -> Self::Schema {
        // Schemas are written by `encode_schema` above, so a decode failure
        // here indicates corruption rather than a recoverable error.
        let proto = ProtoRelationDesc::decode(buf.as_ref()).expect("valid schema");
        proto.into_rust().expect("valid schema")
    }
}
1126
1127/// Given a [`RelationDesc`] returns an arbitrary [`SourceData`].
1128pub fn arb_source_data_for_relation_desc(
1129    desc: &RelationDesc,
1130) -> impl Strategy<Value = SourceData> + use<> {
1131    let row_strat = arb_row_for_relation(desc).no_shrink();
1132
1133    proptest::strategy::Union::new_weighted(vec![
1134        (50, row_strat.prop_map(|row| SourceData(Ok(row))).boxed()),
1135        (
1136            1,
1137            any::<DataflowError>()
1138                .prop_map(|err| SourceData(Err(err)))
1139                .no_shrink()
1140                .boxed(),
1141        ),
1142    ])
1143}
1144
/// Describes how external references should be organized in a multi-level
/// hierarchy.
///
/// For both PostgreSQL and MySQL sources, these levels of reference are
/// intrinsic to the items which we're referencing. If there are other naming
/// schemas for other types of sources we discover, we might need to revisit
/// this.
///
/// Implementors expose the two lowest levels of the naming hierarchy; the
/// top ("database") level is supplied separately (see
/// `SourceReferenceResolver::new`).
pub trait ExternalCatalogReference {
    /// The "second" level of namespacing for the reference.
    fn schema_name(&self) -> &str;
    /// The lowest level of namespacing for the reference.
    fn item_name(&self) -> &str;
}
1158
// Accessor impl: exposes the MySQL table's schema and table name fields.
impl ExternalCatalogReference for &mz_mysql_util::MySqlTableDesc {
    fn schema_name(&self) -> &str {
        &self.schema_name
    }

    fn item_name(&self) -> &str {
        &self.name
    }
}
1168
// Accessor impl: for Postgres the "schema" level is the table's namespace.
// NOTE(review): unlike the MySQL and SQL Server impls this is implemented for
// the owned type rather than a reference — presumably to match call sites;
// confirm before unifying.
impl ExternalCatalogReference for mz_postgres_util::desc::PostgresTableDesc {
    fn schema_name(&self) -> &str {
        &self.namespace
    }

    fn item_name(&self) -> &str {
        &self.name
    }
}
1178
// Accessor impl: exposes the SQL Server table's schema and table name fields.
// The `&*` re-borrows the fields to `&str`; the fields appear to be
// smart-pointer-like (e.g. `Arc<str>`) — the concrete type isn't visible here.
impl ExternalCatalogReference for &mz_sql_server_util::desc::SqlServerTableDesc {
    fn schema_name(&self) -> &str {
        &*self.schema_name
    }

    fn item_name(&self) -> &str {
        &*self.name
    }
}
1188
1189// This implementation provides a means of converting arbitrary objects into a
1190// `SubsourceCatalogReference`, e.g. load generator view names.
1191impl<'a> ExternalCatalogReference for (&'a str, &'a str) {
1192    fn schema_name(&self) -> &str {
1193        self.0
1194    }
1195
1196    fn item_name(&self) -> &str {
1197        self.1
1198    }
1199}
1200
/// Stores and resolves references to a `&[T: ExternalCatalogReference]`.
///
/// This is meant to provide an API to quickly look up a source's subsources.
///
/// For sources that do not provide any subsources, use the `Default`
/// implementation, which is empty and will not be able to resolve any
/// references.
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct SourceReferenceResolver {
    // Nested index: item name -> schema name -> database name -> position of
    // the item in the slice passed to `Self::new`.
    inner: BTreeMap<Ident, BTreeMap<Ident, BTreeMap<Ident, usize>>>,
}
1212
/// Errors that can occur when building or querying a
/// [`SourceReferenceResolver`].
#[derive(Debug, Clone, thiserror::Error)]
pub enum ExternalReferenceResolutionError {
    /// No item matched the provided reference.
    #[error("reference to {name} not found in source")]
    DoesNotExist { name: String },
    /// More than one item matched the provided (partially qualified)
    /// reference.
    #[error(
        "reference {name} is ambiguous, consider specifying an additional \
    layer of qualification"
    )]
    Ambiguous { name: String },
    /// A provided string could not be converted into an [`Ident`].
    #[error("invalid identifier: {0}")]
    Ident(#[from] IdentError),
}
1225
1226impl<'a> SourceReferenceResolver {
1227    /// Constructs a new `SourceReferenceResolver` from a slice of `T:
1228    /// SubsourceCatalogReference`.
1229    ///
1230    /// # Errors
1231    /// - If any `&str` provided cannot be taken to an [`Ident`].
1232    pub fn new<T: ExternalCatalogReference>(
1233        database: &str,
1234        referenceable_items: &'a [T],
1235    ) -> Result<SourceReferenceResolver, ExternalReferenceResolutionError> {
1236        // An index from table name -> schema name -> database name -> index in
1237        // `referenceable_items`.
1238        let mut inner = BTreeMap::new();
1239
1240        let database = Ident::new(database)?;
1241
1242        for (reference_idx, item) in referenceable_items.iter().enumerate() {
1243            let item_name = Ident::new(item.item_name())?;
1244            let schema_name = Ident::new(item.schema_name())?;
1245
1246            inner
1247                .entry(item_name)
1248                .or_insert_with(BTreeMap::new)
1249                .entry(schema_name)
1250                .or_insert_with(BTreeMap::new)
1251                .entry(database.clone())
1252                .or_insert(reference_idx);
1253        }
1254
1255        Ok(SourceReferenceResolver { inner })
1256    }
1257
1258    /// Returns the canonical reference and index from which it originated in
1259    /// the `referenceable_items` provided to [`Self::new`].
1260    ///
1261    /// # Args
1262    /// - `name` is `&[Ident]` to let users provide the inner element of
1263    ///   [`UnresolvedItemName`].
1264    /// - `canonicalize_to_width` limits the number of elements in the returned
1265    ///   [`UnresolvedItemName`];this is useful if the source type requires
1266    ///   contriving database and schema names that a subsource should not
1267    ///   persist as its reference.
1268    ///
1269    /// # Errors
1270    /// - If `name` does not resolve to an item in `self.inner`.
1271    ///
1272    /// # Panics
1273    /// - If `canonicalize_to_width`` is not in `1..=3`.
1274    pub fn resolve(
1275        &self,
1276        name: &[Ident],
1277        canonicalize_to_width: usize,
1278    ) -> Result<(UnresolvedItemName, usize), ExternalReferenceResolutionError> {
1279        let (db, schema, idx) = self.resolve_inner(name)?;
1280
1281        let item = name.last().expect("must have provided at least 1 element");
1282
1283        let canonical_name = match canonicalize_to_width {
1284            1 => vec![item.clone()],
1285            2 => vec![schema.clone(), item.clone()],
1286            3 => vec![db.clone(), schema.clone(), item.clone()],
1287            o => panic!("canonicalize_to_width values must be 1..=3, but got {}", o),
1288        };
1289
1290        Ok((UnresolvedItemName(canonical_name), idx))
1291    }
1292
1293    /// Returns the index from which it originated in the `referenceable_items`
1294    /// provided to [`Self::new`].
1295    ///
1296    /// # Args
1297    /// `name` is `&[Ident]` to let users provide the inner element of
1298    /// [`UnresolvedItemName`].
1299    ///
1300    /// # Errors
1301    /// - If `name` does not resolve to an item in `self.inner`.
1302    pub fn resolve_idx(&self, name: &[Ident]) -> Result<usize, ExternalReferenceResolutionError> {
1303        let (_db, _schema, idx) = self.resolve_inner(name)?;
1304        Ok(idx)
1305    }
1306
1307    /// Returns the index from which it originated in the `referenceable_items`
1308    /// provided to [`Self::new`].
1309    ///
1310    /// # Args
1311    /// `name` is `&[Ident]` to let users provide the inner element of
1312    /// [`UnresolvedItemName`].
1313    ///
1314    /// # Return
1315    /// Returns a tuple whose elements are:
1316    /// 1. The "database"- or top-level namespace of the reference.
1317    /// 2. The "schema"- or second-level namespace of the reference.
1318    /// 3. The index to find the item in `referenceable_items` argument provided
1319    ///    to `SourceReferenceResolver::new`.
1320    ///
1321    /// # Errors
1322    /// - If `name` does not resolve to an item in `self.inner`.
1323    fn resolve_inner<'name: 'a>(
1324        &'a self,
1325        name: &'name [Ident],
1326    ) -> Result<(&'a Ident, &'a Ident, usize), ExternalReferenceResolutionError> {
1327        let get_provided_name = || UnresolvedItemName(name.to_vec()).to_string();
1328
1329        // Names must be composed of 1..=3 elements.
1330        if !(1..=3).contains(&name.len()) {
1331            Err(ExternalReferenceResolutionError::DoesNotExist {
1332                name: get_provided_name(),
1333            })?;
1334        }
1335
1336        // Fill on the leading elements with `None` if they aren't present.
1337        let mut names = std::iter::repeat(None)
1338            .take(3 - name.len())
1339            .chain(name.iter().map(Some));
1340
1341        let database = names.next().flatten();
1342        let schema = names.next().flatten();
1343        let item = names
1344            .next()
1345            .flatten()
1346            .expect("must have provided the item name");
1347
1348        assert_none!(names.next(), "expected a 3-element iterator");
1349
1350        let schemas =
1351            self.inner
1352                .get(item)
1353                .ok_or_else(|| ExternalReferenceResolutionError::DoesNotExist {
1354                    name: get_provided_name(),
1355                })?;
1356
1357        let schema = match schema {
1358            Some(schema) => schema,
1359            None => schemas.keys().exactly_one().map_err(|_e| {
1360                ExternalReferenceResolutionError::Ambiguous {
1361                    name: get_provided_name(),
1362                }
1363            })?,
1364        };
1365
1366        let databases =
1367            schemas
1368                .get(schema)
1369                .ok_or_else(|| ExternalReferenceResolutionError::DoesNotExist {
1370                    name: get_provided_name(),
1371                })?;
1372
1373        let database = match database {
1374            Some(database) => database,
1375            None => databases.keys().exactly_one().map_err(|_e| {
1376                ExternalReferenceResolutionError::Ambiguous {
1377                    name: get_provided_name(),
1378                }
1379            })?,
1380        };
1381
1382        let reference_idx = databases.get(database).ok_or_else(|| {
1383            ExternalReferenceResolutionError::DoesNotExist {
1384                name: get_provided_name(),
1385            }
1386        })?;
1387
1388        Ok((database, schema, *reference_idx))
1389    }
1390}
1391
/// A decoder for [`Row`]s within [`SourceData`].
///
/// This type exists as a wrapper around [`RowColumnarDecoder`] to handle the
/// case where the [`RelationDesc`] we're encoding with has no columns. See
/// [`SourceDataRowColumnarEncoder`] for more details.
#[derive(Debug)]
pub enum SourceDataRowColumnarDecoder {
    /// Decode rows via the wrapped [`RowColumnarDecoder`].
    Row(RowColumnarDecoder),
    /// The relation has zero columns; every decoded row is the empty row.
    EmptyRow,
}
1402
1403impl SourceDataRowColumnarDecoder {
1404    pub fn decode(&self, idx: usize, row: &mut Row) {
1405        match self {
1406            SourceDataRowColumnarDecoder::Row(decoder) => decoder.decode(idx, row),
1407            SourceDataRowColumnarDecoder::EmptyRow => {
1408                // Create a packer just to clear the Row.
1409                row.packer();
1410            }
1411        }
1412    }
1413
1414    pub fn goodbytes(&self) -> usize {
1415        match self {
1416            SourceDataRowColumnarDecoder::Row(decoder) => decoder.goodbytes(),
1417            SourceDataRowColumnarDecoder::EmptyRow => 0,
1418        }
1419    }
1420}
1421
/// A columnar decoder for [`SourceData`], reading from a two-column
/// (`ok`, `err`) [`StructArray`].
#[derive(Debug)]
pub struct SourceDataColumnarDecoder {
    // Decoder for the `ok` (row) column.
    row_decoder: SourceDataRowColumnarDecoder,
    // The `err` column: binary, protobuf-encoded `DataflowError`s.
    err_decoder: BinaryArray,
}
1427
impl SourceDataColumnarDecoder {
    /// Builds a decoder from a finished `StructArray` (as produced by
    /// [`SourceDataColumnarEncoder::finish`]) and the relation it encodes.
    pub fn new(col: StructArray, desc: &RelationDesc) -> Result<Self, anyhow::Error> {
        // TODO(parkmcar): We should validate the fields here.
        let (_fields, arrays, nullability) = col.into_parts();

        // The outer struct must be non-nullable and have exactly the two
        // expected children: the `ok` row column and the `err` binary column.
        if nullability.is_some() {
            anyhow::bail!("SourceData is not nullable, but found {nullability:?}");
        }
        if arrays.len() != 2 {
            anyhow::bail!("SourceData should only have two fields, found {arrays:?}");
        }

        let errs = arrays[1]
            .as_any()
            .downcast_ref::<BinaryArray>()
            .ok_or_else(|| anyhow::anyhow!("expected BinaryArray, found {:?}", arrays[1]))?;

        // A `Null` array in the `ok` position means the relation has zero
        // columns (see `SourceDataRowColumnarEncoder::EmptyRow`).
        let row_decoder = match arrays[0].data_type() {
            arrow::datatypes::DataType::Struct(_) => {
                let rows = arrays[0]
                    .as_any()
                    .downcast_ref::<StructArray>()
                    .ok_or_else(|| {
                        anyhow::anyhow!("expected StructArray, found {:?}", arrays[0])
                    })?;
                let decoder = RowColumnarDecoder::new(rows.clone(), desc)?;
                SourceDataRowColumnarDecoder::Row(decoder)
            }
            arrow::datatypes::DataType::Null => SourceDataRowColumnarDecoder::EmptyRow,
            other => anyhow::bail!("expected Struct or Null Array, found {other:?}"),
        };

        Ok(SourceDataColumnarDecoder {
            row_decoder,
            err_decoder: errs.clone(),
        })
    }
}
1466
1467impl ColumnDecoder<SourceData> for SourceDataColumnarDecoder {
1468    fn decode(&self, idx: usize, val: &mut SourceData) {
1469        let err_null = self.err_decoder.is_null(idx);
1470        let row_null = match &self.row_decoder {
1471            SourceDataRowColumnarDecoder::Row(decoder) => decoder.is_null(idx),
1472            SourceDataRowColumnarDecoder::EmptyRow => !err_null,
1473        };
1474
1475        match (row_null, err_null) {
1476            (true, false) => {
1477                let err = self.err_decoder.value(idx);
1478                let err = ProtoDataflowError::decode(err)
1479                    .expect("proto should be valid")
1480                    .into_rust()
1481                    .expect("error should be valid");
1482                val.0 = Err(err);
1483            }
1484            (false, true) => {
1485                let row = match val.0.as_mut() {
1486                    Ok(row) => row,
1487                    Err(_) => {
1488                        val.0 = Ok(Row::default());
1489                        val.0.as_mut().unwrap()
1490                    }
1491                };
1492                self.row_decoder.decode(idx, row);
1493            }
1494            (true, true) => panic!("should have one of 'ok' or 'err'"),
1495            (false, false) => panic!("cannot have both 'ok' and 'err'"),
1496        }
1497    }
1498
1499    fn is_null(&self, idx: usize) -> bool {
1500        let err_null = self.err_decoder.is_null(idx);
1501        let row_null = match &self.row_decoder {
1502            SourceDataRowColumnarDecoder::Row(decoder) => decoder.is_null(idx),
1503            SourceDataRowColumnarDecoder::EmptyRow => !err_null,
1504        };
1505        assert!(!err_null || !row_null, "SourceData should never be null!");
1506
1507        false
1508    }
1509
1510    fn goodbytes(&self) -> usize {
1511        self.row_decoder.goodbytes() + ArrayOrd::Binary(self.err_decoder.clone()).goodbytes()
1512    }
1513
1514    fn stats(&self) -> StructStats {
1515        let len = self.err_decoder.len();
1516        let err_stats = ColumnarStats {
1517            nulls: Some(ColumnNullStats {
1518                count: self.err_decoder.null_count(),
1519            }),
1520            values: PrimitiveStats::<Vec<u8>>::from_column(&self.err_decoder).into(),
1521        };
1522        // The top level struct is non-nullable and every entry is either an
1523        // `Ok(Row)` or an `Err(String)`. As a result, we can compute the number
1524        // of `Ok` entries by subtracting the number of `Err` entries from the
1525        // total count.
1526        let row_null_count = len - self.err_decoder.null_count();
1527        let row_stats = match &self.row_decoder {
1528            SourceDataRowColumnarDecoder::Row(encoder) => {
1529                // Sanity check that the number of row nulls/nones we calculated
1530                // using the error column matches what the row column thinks it
1531                // has.
1532                assert_eq!(encoder.null_count(), row_null_count);
1533                encoder.stats()
1534            }
1535            SourceDataRowColumnarDecoder::EmptyRow => StructStats {
1536                len,
1537                cols: BTreeMap::default(),
1538            },
1539        };
1540        let row_stats = ColumnarStats {
1541            nulls: Some(ColumnNullStats {
1542                count: row_null_count,
1543            }),
1544            values: ColumnStatKinds::Struct(row_stats),
1545        };
1546
1547        let stats = [
1548            (
1549                SourceDataColumnarEncoder::OK_COLUMN_NAME.to_string(),
1550                row_stats,
1551            ),
1552            (
1553                SourceDataColumnarEncoder::ERR_COLUMN_NAME.to_string(),
1554                err_stats,
1555            ),
1556        ];
1557        StructStats {
1558            len,
1559            cols: stats.into_iter().map(|(name, s)| (name, s)).collect(),
1560        }
1561    }
1562}
1563
/// An encoder for [`Row`]s within [`SourceData`].
///
/// This type exists as a wrapper around [`RowColumnarEncoder`] to support
/// encoding empty [`Row`]s. A [`RowColumnarEncoder`] finishes as a
/// [`StructArray`] which is required to have at least one column, and thus
/// cannot support empty [`Row`]s.
#[derive(Debug)]
pub enum SourceDataRowColumnarEncoder {
    /// Encode rows via the wrapped [`RowColumnarEncoder`].
    Row(RowColumnarEncoder),
    /// The relation has zero columns; only the empty row is representable.
    EmptyRow,
}
1575
1576impl SourceDataRowColumnarEncoder {
1577    pub(crate) fn goodbytes(&self) -> usize {
1578        match self {
1579            SourceDataRowColumnarEncoder::Row(e) => e.goodbytes(),
1580            SourceDataRowColumnarEncoder::EmptyRow => 0,
1581        }
1582    }
1583
1584    pub fn append(&mut self, row: &Row) {
1585        match self {
1586            SourceDataRowColumnarEncoder::Row(encoder) => encoder.append(row),
1587            SourceDataRowColumnarEncoder::EmptyRow => {
1588                assert_eq!(row.iter().count(), 0)
1589            }
1590        }
1591    }
1592
1593    pub fn append_null(&mut self) {
1594        match self {
1595            SourceDataRowColumnarEncoder::Row(encoder) => encoder.append_null(),
1596            SourceDataRowColumnarEncoder::EmptyRow => (),
1597        }
1598    }
1599}
1600
/// A columnar encoder for [`SourceData`], producing a two-column
/// (`ok`, `err`) [`StructArray`].
#[derive(Debug)]
pub struct SourceDataColumnarEncoder {
    // Encoder for the `ok` (row) column.
    row_encoder: SourceDataRowColumnarEncoder,
    // Builder for the `err` column of protobuf-encoded errors.
    err_encoder: BinaryBuilder,
}
1606
1607impl SourceDataColumnarEncoder {
1608    const OK_COLUMN_NAME: &'static str = "ok";
1609    const ERR_COLUMN_NAME: &'static str = "err";
1610
1611    pub fn new(desc: &RelationDesc) -> Self {
1612        let row_encoder = match RowColumnarEncoder::new(desc) {
1613            Some(encoder) => SourceDataRowColumnarEncoder::Row(encoder),
1614            None => {
1615                assert!(desc.typ().columns().is_empty());
1616                SourceDataRowColumnarEncoder::EmptyRow
1617            }
1618        };
1619        let err_encoder = BinaryBuilder::new();
1620
1621        SourceDataColumnarEncoder {
1622            row_encoder,
1623            err_encoder,
1624        }
1625    }
1626}
1627
impl ColumnEncoder<SourceData> for SourceDataColumnarEncoder {
    type FinishedColumn = StructArray;

    fn goodbytes(&self) -> usize {
        // Payload bytes from both columns, excluding arrow buffer overhead.
        self.row_encoder.goodbytes() + self.err_encoder.values_slice().len()
    }

    #[inline]
    fn append(&mut self, val: &SourceData) {
        // Exactly one of the two columns receives a value; the other receives
        // a null, keeping the columns in lockstep.
        match val.0.as_ref() {
            Ok(row) => {
                self.row_encoder.append(row);
                self.err_encoder.append_null();
            }
            Err(err) => {
                self.row_encoder.append_null();
                self.err_encoder
                    .append_value(err.into_proto().encode_to_vec());
            }
        }
    }

    #[inline]
    fn append_null(&mut self) {
        // A `SourceData` is always `Ok` or `Err`; a top-level null is invalid.
        panic!("appending a null into SourceDataColumnarEncoder is not supported");
    }

    fn finish(self) -> Self::FinishedColumn {
        let SourceDataColumnarEncoder {
            row_encoder,
            mut err_encoder,
        } = self;

        let err_column = BinaryBuilder::finish(&mut err_encoder);
        let row_column: ArrayRef = match row_encoder {
            SourceDataRowColumnarEncoder::Row(encoder) => {
                let column = encoder.finish();
                Arc::new(column)
            }
            // Zero-column relations have no row data; represent the `ok`
            // column as a `NullArray` of matching length.
            SourceDataRowColumnarEncoder::EmptyRow => Arc::new(NullArray::new(err_column.len())),
        };

        assert_eq!(row_column.len(), err_column.len());

        let fields = vec![
            Field::new(Self::OK_COLUMN_NAME, row_column.data_type().clone(), true),
            Field::new(Self::ERR_COLUMN_NAME, err_column.data_type().clone(), true),
        ];
        let arrays: Vec<Arc<dyn Array>> = vec![row_column, Arc::new(err_column)];
        StructArray::new(Fields::from(fields), arrays, None)
    }
}
1680
// A `RelationDesc` serves as the persist `Schema` for `SourceData`, wiring up
// the columnar encoder/decoder pair defined above.
impl Schema<SourceData> for RelationDesc {
    type ArrowColumn = StructArray;
    type Statistics = StructStats;

    type Decoder = SourceDataColumnarDecoder;
    type Encoder = SourceDataColumnarEncoder;

    fn decoder(&self, col: Self::ArrowColumn) -> Result<Self::Decoder, anyhow::Error> {
        SourceDataColumnarDecoder::new(col, self)
    }

    fn encoder(&self) -> Result<Self::Encoder, anyhow::Error> {
        Ok(SourceDataColumnarEncoder::new(self))
    }
}
1696
1697#[cfg(test)]
1698mod tests {
1699    use arrow::array::{ArrayData, make_comparator};
1700    use base64::Engine;
1701    use bytes::Bytes;
1702    use mz_expr::EvalError;
1703    use mz_ore::assert_err;
1704    use mz_ore::metrics::MetricsRegistry;
1705    use mz_persist::indexed::columnar::arrow::{realloc_any, realloc_array};
1706    use mz_persist::metrics::ColumnarMetrics;
1707    use mz_persist_types::parquet::EncodingConfig;
1708    use mz_persist_types::schema::{Migration, backward_compatible};
1709    use mz_persist_types::stats::{PartStats, PartStatsMetrics};
1710    use mz_repr::{
1711        ColumnIndex, DatumVec, PropRelationDescDiff, ProtoRelationDesc, RelationDescBuilder,
1712        RowArena, SqlScalarType, arb_relation_desc_diff, arb_relation_desc_projection,
1713    };
1714    use proptest::prelude::*;
1715    use proptest::strategy::{Union, ValueTree};
1716
1717    use crate::stats::RelationPartStats;
1718
1719    use super::*;
1720
1721    #[mz_ore::test]
1722    fn test_timeline_parsing() {
1723        assert_eq!(Ok(Timeline::EpochMilliseconds), "M".parse());
1724        assert_eq!(Ok(Timeline::External("JOE".to_string())), "E.JOE".parse());
1725        assert_eq!(Ok(Timeline::User("MIKE".to_string())), "U.MIKE".parse());
1726
1727        assert_err!("Materialize".parse::<Timeline>());
1728        assert_err!("Ejoe".parse::<Timeline>());
1729        assert_err!("Umike".parse::<Timeline>());
1730        assert_err!("Dance".parse::<Timeline>());
1731        assert_err!("".parse::<Timeline>());
1732    }
1733
    /// Encodes `datas` with `desc`, roundtrips the resulting column through
    /// proto, Parquet, and lgalloc reallocation, then decodes it back — once
    /// with the full `desc` and once with the projected `read_desc` — while
    /// validating the decoded data and its column stats. Finally checks that
    /// `desc` roundtrips through `{encode,decode}_schema` and is backward
    /// compatible with itself.
    #[track_caller]
    fn roundtrip_source_data(
        desc: &RelationDesc,
        datas: Vec<SourceData>,
        read_desc: &RelationDesc,
        config: &EncodingConfig,
    ) {
        let metrics = ColumnarMetrics::disconnected();
        let mut encoder = <RelationDesc as Schema<SourceData>>::encoder(desc).unwrap();
        for data in &datas {
            encoder.append(data);
        }
        let col = encoder.finish();

        // The top-level StructArray for SourceData should always be non-nullable.
        assert!(!col.is_nullable());

        // Reallocate our arrays with lgalloc.
        let col = realloc_array(&col, &metrics);

        // Roundtrip through ProtoArray format.
        {
            let proto = col.to_data().into_proto();
            let bytes = proto.encode_to_vec();
            let proto = mz_persist_types::arrow::ProtoArrayData::decode(&bytes[..]).unwrap();
            let array_data: ArrayData = proto.into_rust().unwrap();

            // Both the typed and the type-erased decode paths must reproduce
            // the original column.
            let col_rnd = StructArray::from(array_data.clone());
            assert_eq!(col, col_rnd);

            let col_dyn = arrow::array::make_array(array_data);
            let col_dyn = col_dyn.as_any().downcast_ref::<StructArray>().unwrap();
            assert_eq!(&col, col_dyn);
        }

        // Encode to Parquet.
        let mut buf = Vec::new();
        let fields = Fields::from(vec![Field::new("k", col.data_type().clone(), false)]);
        let arrays: Vec<Arc<dyn Array>> = vec![Arc::new(col.clone())];
        mz_persist_types::parquet::encode_arrays(&mut buf, fields, arrays, config).unwrap();

        // Decode from Parquet.
        let buf = Bytes::from(buf);
        let mut reader = mz_persist_types::parquet::decode_arrays(buf).unwrap();
        let maybe_batch = reader.next();

        // If we didn't encode any data then our record_batch will be empty.
        let Some(record_batch) = maybe_batch else {
            assert!(datas.is_empty());
            return;
        };
        let record_batch = record_batch.unwrap();

        assert_eq!(record_batch.columns().len(), 1);
        let rnd_col = &record_batch.columns()[0];
        let rnd_col = realloc_any(Arc::clone(rnd_col), &metrics);
        let rnd_col = rnd_col
            .as_any()
            .downcast_ref::<StructArray>()
            .unwrap()
            .clone();

        // Try generating stats for the data, just to make sure we don't panic.
        let stats = <RelationDesc as Schema<SourceData>>::decoder_any(desc, &rnd_col)
            .expect("valid decoder")
            .stats();

        // Read back all of our data and assert it roundtrips.
        let mut rnd_data = SourceData(Ok(Row::default()));
        let decoder = <RelationDesc as Schema<SourceData>>::decoder(desc, rnd_col.clone()).unwrap();
        for (idx, og_data) in datas.iter().enumerate() {
            decoder.decode(idx, &mut rnd_data);
            assert_eq!(og_data, &rnd_data);
        }

        // Read back all of our data a second time with a projection applied, and make sure the
        // stats are valid.
        let stats_metrics = PartStatsMetrics::new(&MetricsRegistry::new());
        let stats = RelationPartStats {
            name: "test",
            metrics: &stats_metrics,
            stats: &PartStats { key: stats },
            desc: read_desc,
        };
        let mut datum_vec = DatumVec::new();
        let arena = RowArena::default();
        let decoder = <RelationDesc as Schema<SourceData>>::decoder(read_desc, rnd_col).unwrap();

        for (idx, og_data) in datas.iter().enumerate() {
            decoder.decode(idx, &mut rnd_data);
            match (&og_data.0, &rnd_data.0) {
                (Ok(og_row), Ok(rnd_row)) => {
                    // Filter down to just the Datums in the projection schema.
                    {
                        let datums = datum_vec.borrow_with(og_row);
                        let projected_datums =
                            datums.iter().enumerate().filter_map(|(idx, datum)| {
                                read_desc
                                    .contains_index(&ColumnIndex::from_raw(idx))
                                    .then_some(datum)
                            });
                        let og_projected_row = Row::pack(projected_datums);
                        assert_eq!(&og_projected_row, rnd_row);
                    }

                    // Validate the stats for all of our projected columns.
                    {
                        let proj_datums = datum_vec.borrow_with(rnd_row);
                        for (pos, (idx, _, _)) in read_desc.iter_all().enumerate() {
                            let spec = stats.col_stats(idx, &arena);
                            assert!(spec.may_contain(proj_datums[pos]));
                        }
                    }
                }
                (Err(_), Err(_)) => assert_eq!(og_data, &rnd_data),
                (_, _) => panic!("decoded to a different type? {og_data:?} {rnd_data:?}"),
            }
        }

        // Verify that the RelationDesc itself roundtrips through
        // {encode,decode}_schema.
        let encoded_schema = SourceData::encode_schema(desc);
        let roundtrip_desc = SourceData::decode_schema(&encoded_schema);
        assert_eq!(desc, &roundtrip_desc);

        // Verify that the RelationDesc is backward compatible with itself (this
        // mostly checks for `unimplemented!` type panics).
        let migration =
            mz_persist_types::schema::backward_compatible(col.data_type(), col.data_type());
        let migration = migration.expect("should be backward compatible with self");
        // Also verify that the Fn doesn't do anything wonky.
        let migrated = migration.migrate(Arc::new(col.clone()));
        assert_eq!(col.data_type(), migrated.data_type());
    }
1868
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decContextDefault` on OS `linux`
    fn all_source_data_roundtrips() {
        // Bias heavily towards small batches; the larger (slower) batch sizes
        // are opt-in via the PROPTEST_LARGE_DATA env var.
        let mut weights = vec![(500, Just(0..8)), (50, Just(8..32))];
        if std::env::var("PROPTEST_LARGE_DATA").is_ok() {
            weights.extend([
                (10, Just(32..128)),
                (5, Just(128..512)),
                (3, Just(512..2048)),
                (1, Just(2048..8192)),
            ]);
        }
        let num_rows = Union::new_weighted(weights);

        // Strategy: an arbitrary RelationDesc, a projection of it to read
        // back with, and a batch of SourceData valid for the full desc.
        // TODO(parkmycar): There are so many clones going on here, and maybe we can avoid them?
        let strat = (any::<RelationDesc>(), num_rows)
            .prop_flat_map(|(desc, num_rows)| {
                arb_relation_desc_projection(desc.clone())
                    .prop_map(move |read_desc| (desc.clone(), read_desc, num_rows.clone()))
            })
            .prop_flat_map(|(desc, read_desc, num_rows)| {
                proptest::collection::vec(arb_source_data_for_relation_desc(&desc), num_rows)
                    .prop_map(move |datas| (desc.clone(), datas, read_desc.clone()))
            });

        let combined_strat = (any::<EncodingConfig>(), strat);
        proptest!(|((config, (desc, source_datas, read_desc)) in combined_strat)| {
            roundtrip_source_data(&desc, source_datas, &read_desc, &config);
        });
    }
1899
1900    #[mz_ore::test]
1901    fn roundtrip_error_nulls() {
1902        let desc = RelationDescBuilder::default()
1903            .with_column(
1904                "ts",
1905                SqlScalarType::TimestampTz { precision: None }.nullable(false),
1906            )
1907            .finish();
1908        let source_datas = vec![SourceData(Err(DataflowError::EvalError(
1909            EvalError::DateOutOfRange.into(),
1910        )))];
1911        let config = EncodingConfig::default();
1912        roundtrip_source_data(&desc, source_datas, &desc, &config);
1913    }
1914
    /// Best-effort check that `array` is sorted ascending under the default
    /// sort options, by comparing each adjacent pair of elements. Returns
    /// `false` (rather than panicking) if a comparator cannot be built.
    fn is_sorted(array: &dyn Array) -> bool {
        let sort_options = arrow::compute::SortOptions::default();
        let Ok(cmp) = make_comparator(array, array, sort_options) else {
            // NOTE(review): a stale comment here said arrow v51's comparator
            // couldn't handle structs and that we should switch to
            // `make_comparator` — but the code above already uses
            // `make_comparator` (arrow v52+), which does support structs.
            // The silent `false` fallback is kept for any remaining
            // unsupported types; it could likely become an `expect` now —
            // confirm before changing.
            return false;
        };
        (0..array.len())
            .tuple_windows()
            .all(|(i, j)| cmp(i, j).is_le())
    }
1929
1930    fn get_data_type(schema: &impl Schema<SourceData>) -> arrow::datatypes::DataType {
1931        use mz_persist_types::columnar::ColumnEncoder;
1932        let array = Schema::encoder(schema).expect("valid schema").finish();
1933        Array::data_type(&array).clone()
1934    }
1935
1936    #[track_caller]
1937    fn backward_compatible_testcase(
1938        old: &RelationDesc,
1939        new: &RelationDesc,
1940        migration: Migration,
1941        datas: &[SourceData],
1942    ) {
1943        let mut encoder = Schema::<SourceData>::encoder(old).expect("valid schema");
1944        for data in datas {
1945            encoder.append(data);
1946        }
1947        let old = encoder.finish();
1948        let new = Schema::<SourceData>::encoder(new)
1949            .expect("valid schema")
1950            .finish();
1951        let old: Arc<dyn Array> = Arc::new(old);
1952        let new: Arc<dyn Array> = Arc::new(new);
1953        let migrated = migration.migrate(Arc::clone(&old));
1954        assert_eq!(migrated.data_type(), new.data_type());
1955
1956        // Check the sortedness preservation, if we can.
1957        if migration.preserves_order() && is_sorted(&old) {
1958            assert!(is_sorted(&new))
1959        }
1960    }
1961
1962    #[mz_ore::test]
1963    fn backward_compatible_empty_add_column() {
1964        let old = RelationDesc::empty();
1965        let new = RelationDesc::from_names_and_types([("a", SqlScalarType::Bool.nullable(true))]);
1966
1967        let old_data_type = get_data_type(&old);
1968        let new_data_type = get_data_type(&new);
1969
1970        let migration = backward_compatible(&old_data_type, &new_data_type);
1971        assert!(migration.is_some());
1972    }
1973
1974    #[mz_ore::test]
1975    fn backward_compatible_project_away_all() {
1976        let old = RelationDesc::from_names_and_types([("a", SqlScalarType::Bool.nullable(true))]);
1977        let new = RelationDesc::empty();
1978
1979        let old_data_type = get_data_type(&old);
1980        let new_data_type = get_data_type(&new);
1981
1982        let migration = backward_compatible(&old_data_type, &new_data_type);
1983        assert!(migration.is_some());
1984    }
1985
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn backward_compatible_migrate() {
        // For arbitrary (old, new) RelationDesc pairs: whenever persist
        // reports the two Arrow types as backward compatible, actually run the
        // migration on data generated for the old schema.
        let strat = (any::<RelationDesc>(), any::<RelationDesc>()).prop_flat_map(|(old, new)| {
            proptest::collection::vec(arb_source_data_for_relation_desc(&old), 2)
                .prop_map(move |datas| (old.clone(), new.clone(), datas))
        });

        proptest!(|((old, new, datas) in strat)| {
            let old_data_type = get_data_type(&old);
            let new_data_type = get_data_type(&new);

            if let Some(migration) = backward_compatible(&old_data_type, &new_data_type) {
                backward_compatible_testcase(&old, &new, migration, &datas);
            };
        });
    }
2003
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn backward_compatible_migrate_from_common() {
        use mz_repr::SqlColumnType;
        // Applies `diffs` to `old` to derive a new desc, then checks that
        // compatibility is reported whenever we expect it (and exercises the
        // migration whenever persist reports one exists).
        fn test_case(old: RelationDesc, diffs: Vec<PropRelationDescDiff>, datas: Vec<SourceData>) {
            // TODO(parkmycar): As we iterate on schema migrations more things should become compatible.
            let should_be_compatible = diffs.iter().all(|diff| match diff {
                // We only support adding nullable columns.
                PropRelationDescDiff::AddColumn {
                    typ: SqlColumnType { nullable, .. },
                    ..
                } => *nullable,
                PropRelationDescDiff::DropColumn { .. } => true,
                _ => false,
            });

            let mut new = old.clone();
            for diff in diffs.into_iter() {
                diff.apply(&mut new)
            }

            let old_data_type = get_data_type(&old);
            let new_data_type = get_data_type(&new);

            if let Some(migration) = backward_compatible(&old_data_type, &new_data_type) {
                backward_compatible_testcase(&old, &new, migration, &datas);
            } else if should_be_compatible {
                panic!("new DataType was not compatible when it should have been!");
            }
        }

        // Strategy: a common starting desc, data valid for it, and a set of
        // diffs with which to derive a second desc from the first.
        let strat = any::<RelationDesc>()
            .prop_flat_map(|desc| {
                proptest::collection::vec(arb_source_data_for_relation_desc(&desc), 2)
                    .no_shrink()
                    .prop_map(move |datas| (desc.clone(), datas))
            })
            .prop_flat_map(|(desc, datas)| {
                arb_relation_desc_diff(&desc)
                    .prop_map(move |diffs| (desc.clone(), diffs, datas.clone()))
            });

        proptest!(|((old, diffs, datas) in strat)| {
            test_case(old, diffs, datas);
        });
    }
2050
2051    #[mz_ore::test]
2052    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decContextDefault` on OS `linux`
2053    fn empty_relation_desc_roundtrips() {
2054        let empty = RelationDesc::empty();
2055        let rows = proptest::collection::vec(arb_source_data_for_relation_desc(&empty), 0..8)
2056            .prop_map(move |datas| (empty.clone(), datas));
2057
2058        // Note: This case should be covered by the `all_source_data_roundtrips` test above, but
2059        // it's a special case that we explicitly want to exercise.
2060        proptest!(|((config, (desc, source_datas)) in (any::<EncodingConfig>(), rows))| {
2061            roundtrip_source_data(&desc, source_datas, &desc, &config);
2062        });
2063    }
2064
2065    #[mz_ore::test]
2066    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decContextDefault` on OS `linux`
2067    fn arrow_datatype_consistent() {
2068        fn test_case(desc: RelationDesc, datas: Vec<SourceData>) {
2069            let half = datas.len() / 2;
2070
2071            let mut encoder_a = <RelationDesc as Schema<SourceData>>::encoder(&desc).unwrap();
2072            for data in &datas[..half] {
2073                encoder_a.append(data);
2074            }
2075            let col_a = encoder_a.finish();
2076
2077            let mut encoder_b = <RelationDesc as Schema<SourceData>>::encoder(&desc).unwrap();
2078            for data in &datas[half..] {
2079                encoder_b.append(data);
2080            }
2081            let col_b = encoder_b.finish();
2082
2083            // The DataType of the resulting column should not change based on what data was
2084            // encoded.
2085            assert_eq!(col_a.data_type(), col_b.data_type());
2086        }
2087
2088        let num_rows = 12;
2089        let strat = any::<RelationDesc>().prop_flat_map(|desc| {
2090            proptest::collection::vec(arb_source_data_for_relation_desc(&desc), num_rows)
2091                .prop_map(move |datas| (desc.clone(), datas))
2092        });
2093
2094        proptest!(|((desc, data) in strat)| {
2095            test_case(desc, data);
2096        });
2097    }
2098
    /// Snapshot test: the proto serialization of `SourceData` (and its
    /// `RelationDesc`) must remain byte-stable across releases — persist
    /// consolidation relies on equal records having identical encodings.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // too slow
    fn source_proto_serialization_stability() {
        let min_protos = 10;
        let encoded = include_str!("snapshots/source-datas.txt");

        // Decode the pre-generated source datas
        let mut decoded: Vec<(RelationDesc, SourceData)> = encoded
            .lines()
            .map(|s| {
                let (desc, data) = s.split_once(',').expect("comma separated data");
                let desc = base64::engine::general_purpose::STANDARD
                    .decode(desc)
                    .expect("valid base64");
                let data = base64::engine::general_purpose::STANDARD
                    .decode(data)
                    .expect("valid base64");
                (desc, data)
            })
            .map(|(desc, data)| {
                let desc = ProtoRelationDesc::decode(&desc[..]).expect("valid proto");
                let desc = desc.into_rust().expect("valid proto");
                let data = SourceData::decode(&data, &desc).expect("valid proto");
                (desc, data)
            })
            .collect();

        // If there are fewer than the minimum examples, generate some new ones arbitrarily
        let mut runner = proptest::test_runner::TestRunner::deterministic();
        let strategy = RelationDesc::arbitrary().prop_flat_map(|desc| {
            arb_source_data_for_relation_desc(&desc).prop_map(move |data| (desc.clone(), data))
        });
        while decoded.len() < min_protos {
            let arbitrary_data = strategy
                .new_tree(&mut runner)
                .expect("source data")
                .current();
            decoded.push(arbitrary_data);
        }

        // Reencode and compare the strings
        let mut reencoded = String::new();
        let mut buf = vec![];
        for (desc, data) in decoded {
            buf.clear();
            desc.into_proto().encode(&mut buf).expect("success");
            base64::engine::general_purpose::STANDARD.encode_string(buf.as_slice(), &mut reencoded);
            reencoded.push(',');

            buf.clear();
            data.encode(&mut buf);
            base64::engine::general_purpose::STANDARD.encode_string(buf.as_slice(), &mut reencoded);
            reencoded.push('\n');
        }

        // Optimizations in Persist, particularly consolidation on read,
        // depend on a stable serialization for the serialized data.
        // For example, reordering proto fields could cause us
        // to generate a different (equivalent) serialization for a record,
        // and the two versions would not consolidate out.
        // This can impact correctness!
        //
        // If you need to change how SourceDatas are encoded, that's still fine...
        // but we'll also need to increase
        // the MINIMUM_CONSOLIDATED_VERSION as part of the same release.
        assert_eq!(
            encoded,
            reencoded.as_str(),
            "SourceData serde should be stable"
        )
    }
2170}