Skip to main content

mz_repr/
relation.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::{BTreeMap, BTreeSet};
11use std::rc::Rc;
12use std::{fmt, vec};
13
14use anyhow::bail;
15use itertools::Itertools;
16use mz_lowertest::MzReflect;
17use mz_ore::cast::CastFrom;
18use mz_ore::soft_panic_or_log;
19use mz_ore::str::StrExt;
20use mz_ore::{assert_none, assert_ok};
21use mz_persist_types::schema::SchemaId;
22use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
23use proptest::prelude::*;
24use proptest::strategy::{Strategy, Union};
25use proptest_derive::Arbitrary;
26use serde::{Deserialize, Serialize};
27
28use crate::relation_and_scalar::proto_relation_type::ProtoKey;
29pub use crate::relation_and_scalar::{
30    ProtoColumnMetadata, ProtoColumnName, ProtoColumnType, ProtoRelationDesc, ProtoRelationType,
31    ProtoRelationVersion,
32};
33use crate::{Datum, ReprScalarType, Row, SqlScalarType, arb_datum_for_column};
34
/// The type of a [`Datum`].
///
/// [`SqlColumnType`] bundles information about the scalar type of a datum (e.g.,
/// Int32 or String) with its nullability.
///
/// To construct a column type, either initialize the struct directly, or
/// use the [`SqlScalarType::nullable`] method.
#[derive(
    Arbitrary,
    Clone,
    Debug,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Serialize,
    Deserialize,
    Hash,
    MzReflect
)]
pub struct SqlColumnType {
    /// The underlying scalar type (e.g., Int32 or String) of this column.
    pub scalar_type: SqlScalarType,
    /// Whether this datum can be null.
    ///
    /// When the field is missing from deserialized input, serde fills it in
    /// by calling `return_true`, i.e. the column is treated as nullable.
    #[serde(default = "return_true")]
    pub nullable: bool,
}
62
/// Serde default provider for the `nullable` field of the column types in
/// this file.
///
/// The default value of a `bool` is `false`, and the only way to have serde
/// fall back to any other value is to name a function that returns the
/// desired default. This function returns `true`, so a column type whose
/// serialized form omits `nullable` is nullable. See
/// <https://github.com/serde-rs/serde/issues/1030>
#[inline(always)]
fn return_true() -> bool {
    true
}
72
73impl SqlColumnType {
74    /// Compute the least upper bound of many column types, returning an error on
75    /// incompatible types or an empty iterator.
76    /// See [`SqlColumnType::try_union`] for details.
77    pub fn try_union_many<'a>(
78        typs: impl IntoIterator<Item = &'a Self>,
79    ) -> Result<Self, anyhow::Error> {
80        let mut iter = typs.into_iter();
81        let Some(typ) = iter.next() else {
82            bail!("Cannot union empty iterator");
83        };
84        iter.try_fold(typ.clone(), |a, b| a.try_union(b))
85    }
86
87    /// Compute the least upper bound of many column types.
88    /// See [`SqlColumnType::try_union`] for details.
89    ///
90    /// Panics on incompatible types or an empty iterator.
91    pub fn union_many<'a>(typs: impl IntoIterator<Item = &'a Self>) -> Self {
92        Self::try_union_many(typs).expect("Cannot union empty iterator")
93    }
94
95    /// Backports nullability information from `backport_typ` into `self`,
96    /// affecting the outer `.nullable` field but also record fields deeper
97    /// into the type.
98    pub fn backport_nullability(&mut self, backport_typ: &ReprColumnType) {
99        self.scalar_type
100            .backport_nullability(&backport_typ.scalar_type);
101        self.nullable = backport_typ.nullable;
102    }
103
104    /// Compute the least upper bound of two column types at the SQL level.
105    ///
106    /// Two types are compatible when they are equal, share the same base type
107    /// (differing only in modifiers), or are records with pairwise-compatible
108    /// fields.
109    /// The resulting nullability is the disjunction of the two input
110    /// nullabilities.
111    ///
112    /// Returns an error for incompatible types, e.g. `Text` and `Int32`, or
113    /// `Text` and `VarChar` (different base types at the SQL level).
114    /// See [`SqlColumnType::try_union`] for a fallback that handles the latter
115    /// case via repr-level union.
116    pub fn sql_union(&self, other: &Self) -> Result<Self, anyhow::Error> {
117        match (&self.scalar_type, &other.scalar_type) {
118            (scalar_type, other_scalar_type) if scalar_type == other_scalar_type => {
119                Ok(SqlColumnType {
120                    scalar_type: scalar_type.clone(),
121                    nullable: self.nullable || other.nullable,
122                })
123            }
124            (scalar_type, other_scalar_type) if scalar_type.base_eq(other_scalar_type) => {
125                Ok(SqlColumnType {
126                    scalar_type: scalar_type.without_modifiers(),
127                    nullable: self.nullable || other.nullable,
128                })
129            }
130            (
131                SqlScalarType::Record { fields, custom_id },
132                SqlScalarType::Record {
133                    fields: other_fields,
134                    custom_id: other_custom_id,
135                },
136            ) => {
137                if custom_id != other_custom_id {
138                    bail!(
139                        "Can't union types: {:?} and {:?}",
140                        self.scalar_type,
141                        other.scalar_type
142                    );
143                };
144
145                if fields.len() != other_fields.len() {
146                    bail!(
147                        "Can't union types: {:?} and {:?}",
148                        self.scalar_type,
149                        other.scalar_type
150                    );
151                }
152                let mut union_fields = Vec::with_capacity(fields.len());
153                for ((name, typ), (other_name, other_typ)) in
154                    fields.iter().zip_eq(other_fields.iter())
155                {
156                    if name != other_name {
157                        bail!(
158                            "Can't union types: {:?} and {:?}",
159                            self.scalar_type,
160                            other.scalar_type
161                        );
162                    } else {
163                        let union_column_type = typ.sql_union(other_typ)?;
164                        union_fields.push((name.clone(), union_column_type));
165                    };
166                }
167
168                Ok(SqlColumnType {
169                    scalar_type: SqlScalarType::Record {
170                        fields: union_fields.into(),
171                        custom_id: *custom_id,
172                    },
173                    nullable: self.nullable || other.nullable,
174                })
175            }
176            _ => bail!(
177                "Can't union types: {:?} and {:?}",
178                self.scalar_type,
179                other.scalar_type
180            ),
181        }
182    }
183
184    /// Compute the least upper bound of two column types.
185    ///
186    /// Attempts [`SqlColumnType::sql_union`] first, which preserves SQL-level type
187    /// information (e.g. modifiers). Falls back to a repr-level union via
188    /// [`ReprColumnType::union`] when the SQL types are incompatible but the
189    /// underlying repr types are compatible.
190    ///
191    /// The resulting nullability is the disjunction of the two input
192    /// nullabilities.
193    pub fn try_union(&self, other: &Self) -> Result<Self, anyhow::Error> {
194        self.sql_union(other).or_else(|e| {
195            let repr_self = ReprColumnType::from(self);
196            let repr_other = ReprColumnType::from(other);
197            match repr_self.union(&repr_other) {
198                Ok(typ) => {
199                    // sql_union failed but repr union succeeded — this indicates
200                    // a repr-type canonicalization gap that we want CI visibility for.
201                    soft_panic_or_log!("repr type error: sql_union({self:?}, {other:?}): {e}");
202                    Ok(SqlColumnType::from_repr(&typ))
203                }
204                Err(_) => {
205                    // Both sql_union and repr union failed — genuine type mismatch,
206                    // not a canonicalization issue. Just propagate the original error.
207                    Err(e)
208                }
209            }
210        })
211    }
212
213    /// Compute the least upper bound of two column types.
214    /// See [`SqlColumnType::try_union`] for details.
215    ///
216    /// Panics on incompatible types.
217    pub fn union(&self, other: &Self) -> Self {
218        self.try_union(other).unwrap_or_else(|e| {
219            panic!("repr type error: after sql_union({self:?}, {other:?}) error: {e}")
220        })
221    }
222
223    /// Consumes this `SqlColumnType` and returns a new `SqlColumnType` with its
224    /// nullability set to the specified boolean.
225    pub fn nullable(mut self, nullable: bool) -> Self {
226        self.nullable = nullable;
227        self
228    }
229}
230
231impl RustType<ProtoColumnType> for SqlColumnType {
232    fn into_proto(&self) -> ProtoColumnType {
233        ProtoColumnType {
234            nullable: self.nullable,
235            scalar_type: Some(self.scalar_type.into_proto()),
236        }
237    }
238
239    fn from_proto(proto: ProtoColumnType) -> Result<Self, TryFromProtoError> {
240        Ok(SqlColumnType {
241            nullable: proto.nullable,
242            scalar_type: proto
243                .scalar_type
244                .into_rust_if_some("ProtoColumnType::scalar_type")?,
245        })
246    }
247}
248
249impl fmt::Display for SqlColumnType {
250    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
251        let nullable = if self.nullable { "Null" } else { "NotNull" };
252        f.write_fmt(format_args!("{:?}:{}", self.scalar_type, nullable))
253    }
254}
255
/// The type of a relation.
#[derive(
    Arbitrary,
    Clone,
    Debug,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Serialize,
    Deserialize,
    Hash,
    MzReflect
)]
pub struct SqlRelationType {
    /// The type for each column, in order.
    pub column_types: Vec<SqlColumnType>,
    /// Sets of indices that are "keys" for the collection.
    ///
    /// Each element in this list is a set of column indices, each with the
    /// property that the collection contains at most one record with each
    /// distinct set of values for each column. Alternately, for a specific set
    /// of values assigned to these columns there is at most one record.
    ///
    /// A collection can contain multiple sets of keys, although it is common to
    /// have either zero or one sets of key indices.
    ///
    /// Defaults to an empty list when absent from deserialized input.
    #[serde(default)]
    pub keys: Vec<Vec<usize>>,
}
285
286impl SqlRelationType {
287    /// Constructs a `SqlRelationType` representing the relation with no columns and
288    /// no keys.
289    pub fn empty() -> Self {
290        SqlRelationType::new(vec![])
291    }
292
293    /// Constructs a new `SqlRelationType` from specified column types.
294    ///
295    /// The `SqlRelationType` will have no keys.
296    pub fn new(column_types: Vec<SqlColumnType>) -> Self {
297        SqlRelationType {
298            column_types,
299            keys: Vec::new(),
300        }
301    }
302
303    /// Adds a new key for the relation.
304    pub fn with_key(mut self, mut indices: Vec<usize>) -> Self {
305        indices.sort_unstable();
306        if !self.keys.contains(&indices) {
307            self.keys.push(indices);
308        }
309        self
310    }
311
312    pub fn with_keys(mut self, keys: Vec<Vec<usize>>) -> Self {
313        for key in keys {
314            self = self.with_key(key)
315        }
316        self
317    }
318
319    /// Computes the number of columns in the relation.
320    pub fn arity(&self) -> usize {
321        self.column_types.len()
322    }
323
324    /// Gets the index of the columns used when creating a default index.
325    pub fn default_key(&self) -> Vec<usize> {
326        if let Some(key) = self.keys.first() {
327            if key.is_empty() {
328                (0..self.column_types.len()).collect()
329            } else {
330                key.clone()
331            }
332        } else {
333            (0..self.column_types.len()).collect()
334        }
335    }
336
337    /// Returns all the [`SqlColumnType`]s, in order, for this relation.
338    pub fn columns(&self) -> &[SqlColumnType] {
339        &self.column_types
340    }
341
342    /// Adopts the nullability and keys from another `SqlRelationType`.
343    ///
344    /// Panics if the number of columns does not match.
345    pub fn backport_nullability_and_keys(&mut self, backport_typ: &ReprRelationType) {
346        assert_eq!(
347            backport_typ.column_types.len(),
348            self.column_types.len(),
349            "HIR and MIR types should have the same number of columns"
350        );
351        for (backport_col, sql_col) in backport_typ
352            .column_types
353            .iter()
354            .zip_eq(self.column_types.iter_mut())
355        {
356            sql_col.backport_nullability(backport_col);
357        }
358
359        self.keys = backport_typ.keys.clone();
360    }
361
362    /// Constructs a `SqlRelationType` from a `ReprRelationType` by converting
363    /// each column type via [`SqlColumnType::from_repr`]. This is a lossy
364    /// inverse of `ReprRelationType::from(&SqlRelationType)`.
365    pub fn from_repr(repr: &ReprRelationType) -> Self {
366        SqlRelationType {
367            column_types: repr
368                .column_types
369                .iter()
370                .map(SqlColumnType::from_repr)
371                .collect(),
372            keys: repr.keys.clone(),
373        }
374    }
375}
376
377impl RustType<ProtoRelationType> for SqlRelationType {
378    fn into_proto(&self) -> ProtoRelationType {
379        ProtoRelationType {
380            column_types: self.column_types.into_proto(),
381            keys: self.keys.into_proto(),
382        }
383    }
384
385    fn from_proto(proto: ProtoRelationType) -> Result<Self, TryFromProtoError> {
386        Ok(SqlRelationType {
387            column_types: proto.column_types.into_rust()?,
388            keys: proto.keys.into_rust()?,
389        })
390    }
391}
392
393impl RustType<ProtoKey> for Vec<usize> {
394    fn into_proto(&self) -> ProtoKey {
395        ProtoKey {
396            keys: self.into_proto(),
397        }
398    }
399
400    fn from_proto(proto: ProtoKey) -> Result<Self, TryFromProtoError> {
401        proto.keys.into_rust()
402    }
403}
404
/// The type of a relation, with columns described by [`ReprColumnType`]s.
#[derive(
    Clone,
    Debug,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Serialize,
    Deserialize,
    Hash,
    MzReflect
)]
pub struct ReprRelationType {
    /// The type for each column, in order.
    pub column_types: Vec<ReprColumnType>,
    /// Sets of indices that are "keys" for the collection.
    ///
    /// Each element in this list is a set of column indices, each with the
    /// property that the collection contains at most one record with each
    /// distinct set of values for each column. Alternately, for a specific set
    /// of values assigned to these columns there is at most one record.
    ///
    /// A collection can contain multiple sets of keys, although it is common to
    /// have either zero or one sets of key indices.
    ///
    /// Defaults to an empty list when absent from deserialized input.
    #[serde(default)]
    pub keys: Vec<Vec<usize>>,
}
433
434impl ReprRelationType {
435    /// Constructs a `ReprRelationType` representing the relation with no columns and
436    /// no keys.
437    pub fn empty() -> Self {
438        ReprRelationType::new(vec![])
439    }
440
441    /// Constructs a new `ReprRelationType` from specified column types.
442    ///
443    /// The `ReprRelationType` will have no keys.
444    pub fn new(column_types: Vec<ReprColumnType>) -> Self {
445        ReprRelationType {
446            column_types,
447            keys: Vec::new(),
448        }
449    }
450
451    /// Adds a new key for the relation.
452    pub fn with_key(mut self, mut indices: Vec<usize>) -> Self {
453        indices.sort_unstable();
454        if !self.keys.contains(&indices) {
455            self.keys.push(indices);
456        }
457        self
458    }
459
460    pub fn with_keys(mut self, keys: Vec<Vec<usize>>) -> Self {
461        for key in keys {
462            self = self.with_key(key)
463        }
464        self
465    }
466
467    /// Computes the number of columns in the relation.
468    pub fn arity(&self) -> usize {
469        self.column_types.len()
470    }
471
472    /// Gets the index of the columns used when creating a default index.
473    pub fn default_key(&self) -> Vec<usize> {
474        if let Some(key) = self.keys.first() {
475            if key.is_empty() {
476                (0..self.column_types.len()).collect()
477            } else {
478                key.clone()
479            }
480        } else {
481            (0..self.column_types.len()).collect()
482        }
483    }
484
485    /// Returns all the column types in order, for this relation.
486    pub fn columns(&self) -> &[ReprColumnType] {
487        &self.column_types
488    }
489}
490
491impl From<&SqlRelationType> for ReprRelationType {
492    fn from(sql_relation_type: &SqlRelationType) -> Self {
493        ReprRelationType {
494            column_types: sql_relation_type
495                .column_types
496                .iter()
497                .map(ReprColumnType::from)
498                .collect(),
499            keys: sql_relation_type.keys.clone(),
500        }
501    }
502}
503
/// The representation-level type of a column.
///
/// Bundles a [`ReprScalarType`] with its nullability, mirroring
/// [`SqlColumnType`] but with a repr scalar type in place of the SQL one.
#[derive(
    Clone,
    Debug,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Serialize,
    Deserialize,
    Hash,
    MzReflect
)]
pub struct ReprColumnType {
    /// The underlying representation scalar type (e.g., Int32 or String) of this column.
    pub scalar_type: ReprScalarType,
    /// Whether this datum can be null.
    ///
    /// When the field is missing from deserialized input, serde fills it in
    /// by calling `return_true`, i.e. the column is treated as nullable.
    #[serde(default = "return_true")]
    pub nullable: bool,
}
523
524impl std::fmt::Display for ReprColumnType {
525    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
526        write!(f, "{}", self.scalar_type)?;
527        if self.nullable {
528            write!(f, "?")?;
529        }
530        Ok(())
531    }
532}
533
534impl ReprColumnType {
535    /// Compute the least upper bound of two column types at the repr level.
536    ///
537    /// More permissive than [`SqlColumnType::sql_union`] because it operates
538    /// on the underlying representation types, ignoring SQL-level distinctions
539    /// such as modifiers.
540    /// The resulting nullability is the disjunction of the two inputs.
541    pub fn union(&self, col: &ReprColumnType) -> Result<Self, anyhow::Error> {
542        let scalar_type = self.scalar_type.union(&col.scalar_type)?;
543        let nullable = self.nullable || col.nullable;
544
545        Ok(ReprColumnType {
546            scalar_type,
547            nullable,
548        })
549    }
550}
551
552impl From<&SqlColumnType> for ReprColumnType {
553    fn from(sql_column_type: &SqlColumnType) -> Self {
554        let scalar_type = &sql_column_type.scalar_type;
555        let scalar_type = scalar_type.into();
556        let nullable = sql_column_type.nullable;
557
558        ReprColumnType {
559            scalar_type,
560            nullable,
561        }
562    }
563}
564
565impl SqlColumnType {
566    /// Lossily translates a [`ReprColumnType`] back to a [`SqlColumnType`].
567    ///
568    /// See [`SqlScalarType::from_repr`] for an example of lossiness.
569    pub fn from_repr(repr: &ReprColumnType) -> Self {
570        let scalar_type = &repr.scalar_type;
571        let scalar_type = SqlScalarType::from_repr(scalar_type);
572        let nullable = repr.nullable;
573
574        SqlColumnType {
575            scalar_type,
576            nullable,
577        }
578    }
579}
580
/// The name of a column in a [`RelationDesc`].
///
/// A thin wrapper around a `Box<str>`.
#[derive(
    Clone,
    Debug,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Serialize,
    Deserialize,
    Hash,
    MzReflect
)]
pub struct ColumnName(Box<str>);
595
596impl ColumnName {
597    /// Returns this column name as a `str`.
598    #[inline(always)]
599    pub fn as_str(&self) -> &str {
600        &*self
601    }
602
603    /// Returns this column name as a `&mut Box<str>`.
604    pub fn as_mut_boxed_str(&mut self) -> &mut Box<str> {
605        &mut self.0
606    }
607
608    /// Returns if this [`ColumnName`] is similar to the provided one.
609    pub fn is_similar(&self, other: &ColumnName) -> bool {
610        const SIMILARITY_THRESHOLD: f64 = 0.6;
611
612        let a_lowercase = self.to_lowercase();
613        let b_lowercase = other.to_lowercase();
614
615        strsim::normalized_levenshtein(&a_lowercase, &b_lowercase) >= SIMILARITY_THRESHOLD
616    }
617}
618
619impl std::ops::Deref for ColumnName {
620    type Target = str;
621
622    #[inline(always)]
623    fn deref(&self) -> &Self::Target {
624        &self.0
625    }
626}
627
628impl fmt::Display for ColumnName {
629    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
630        f.write_str(&self.0)
631    }
632}
633
634impl From<String> for ColumnName {
635    fn from(s: String) -> ColumnName {
636        ColumnName(s.into())
637    }
638}
639
640impl From<&str> for ColumnName {
641    fn from(s: &str) -> ColumnName {
642        ColumnName(s.into())
643    }
644}
645
646impl From<&ColumnName> for ColumnName {
647    fn from(n: &ColumnName) -> ColumnName {
648        n.clone()
649    }
650}
651
652impl RustType<ProtoColumnName> for ColumnName {
653    fn into_proto(&self) -> ProtoColumnName {
654        ProtoColumnName {
655            value: Some(self.0.to_string()),
656        }
657    }
658
659    fn from_proto(proto: ProtoColumnName) -> Result<Self, TryFromProtoError> {
660        Ok(ColumnName(
661            proto
662                .value
663                .ok_or_else(|| TryFromProtoError::missing_field("ProtoColumnName::value"))?
664                .into(),
665        ))
666    }
667}
668
669impl From<ColumnName> for mz_sql_parser::ast::Ident {
670    fn from(value: ColumnName) -> Self {
671        // Note: ColumnNames are known to be less than the max length of an Ident (I think?).
672        mz_sql_parser::ast::Ident::new_unchecked(value.0)
673    }
674}
675
676impl proptest::arbitrary::Arbitrary for ColumnName {
677    type Parameters = ();
678    type Strategy = BoxedStrategy<ColumnName>;
679
680    fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy {
681        // Long column names are generally uninteresting, and can greatly
682        // increase the runtime for a test case, so bound the max length.
683        let mut weights = vec![(50, Just(1..8)), (20, Just(8..16))];
684        if std::env::var("PROPTEST_LARGE_DATA").is_ok() {
685            weights.extend([
686                (5, Just(16..128)),
687                (1, Just(128..1024)),
688                (1, Just(1024..4096)),
689            ]);
690        }
691        let name_length = Union::new_weighted(weights);
692
693        // Non-ASCII characters are also generally uninteresting and can make
694        // debugging harder.
695        let char_strat = Rc::new(Union::new_weighted(vec![
696            (50, proptest::char::range('A', 'z').boxed()),
697            (1, any::<char>().boxed()),
698        ]));
699
700        name_length
701            .prop_flat_map(move |length| proptest::collection::vec(Rc::clone(&char_strat), length))
702            .prop_map(|chars| ColumnName(chars.into_iter().collect::<Box<str>>()))
703            .no_shrink()
704            .boxed()
705    }
706}
707
/// Default name of a column, used when no other information about its name
/// is known.
pub const UNKNOWN_COLUMN_NAME: &str = "?column?";
710
/// Stable index of a column in a [`RelationDesc`].
#[derive(
    Clone,
    Copy,
    Debug,
    Eq,
    PartialEq,
    PartialOrd,
    Ord,
    Serialize,
    Deserialize,
    Hash,
    MzReflect
)]
pub struct ColumnIndex(usize);

// Compile-time check that `ColumnIndex` does not implement `Arbitrary`.
// NOTE(review): presumably because arbitrary indexes would not correspond to
// real columns of any `RelationDesc` — confirm the intent.
static_assertions::assert_not_impl_all!(ColumnIndex: Arbitrary);
728
729impl ColumnIndex {
730    /// Returns a stable identifier for this [`ColumnIndex`].
731    pub fn to_stable_name(&self) -> String {
732        self.0.to_string()
733    }
734
735    pub fn to_raw(&self) -> usize {
736        self.0
737    }
738
739    pub fn from_raw(val: usize) -> Self {
740        ColumnIndex(val)
741    }
742}
743
/// The version a given column was added at.
///
/// Wraps a `u64`; the initial version is `0` (see `RelationVersion::root`).
#[derive(
    Clone,
    Copy,
    Debug,
    Eq,
    PartialEq,
    PartialOrd,
    Ord,
    Serialize,
    Deserialize,
    Hash,
    MzReflect,
    Arbitrary
)]
pub struct RelationVersion(u64);
760
761impl RelationVersion {
762    /// Returns the "root" or "initial" version of a [`RelationDesc`].
763    pub fn root() -> Self {
764        RelationVersion(0)
765    }
766
767    /// Returns an instance of [`RelationVersion`] which is "one" higher than `self`.
768    pub fn bump(&self) -> Self {
769        let next_version = self
770            .0
771            .checked_add(1)
772            .expect("added more than u64::MAX columns?");
773        RelationVersion(next_version)
774    }
775
776    /// Consume a [`RelationVersion`] returning the raw value.
777    ///
778    /// Should __only__ be used for serialization.
779    pub fn into_raw(self) -> u64 {
780        self.0
781    }
782
783    /// Create a [`RelationVersion`] from a raw value.
784    ///
785    /// Should __only__ be used for serialization.
786    pub fn from_raw(val: u64) -> RelationVersion {
787        RelationVersion(val)
788    }
789}
790
791impl From<RelationVersion> for SchemaId {
792    fn from(value: RelationVersion) -> Self {
793        SchemaId(usize::cast_from(value.0))
794    }
795}
796
797impl From<mz_sql_parser::ast::Version> for RelationVersion {
798    fn from(value: mz_sql_parser::ast::Version) -> Self {
799        RelationVersion(value.into_inner())
800    }
801}
802
803impl fmt::Display for RelationVersion {
804    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
805        write!(f, "v{}", self.0)
806    }
807}
808
809impl From<RelationVersion> for mz_sql_parser::ast::Version {
810    fn from(value: RelationVersion) -> Self {
811        mz_sql_parser::ast::Version::new(value.0)
812    }
813}
814
815impl RustType<ProtoRelationVersion> for RelationVersion {
816    fn into_proto(&self) -> ProtoRelationVersion {
817        ProtoRelationVersion { value: self.0 }
818    }
819
820    fn from_proto(proto: ProtoRelationVersion) -> Result<Self, TryFromProtoError> {
821        Ok(RelationVersion(proto.value))
822    }
823}
824
/// Metadata (other than type) for a column in a [`RelationDesc`].
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash, MzReflect)]
struct ColumnMetadata {
    /// Name of the column.
    name: ColumnName,
    /// Index into a [`SqlRelationType`] for this column.
    typ_idx: usize,
    /// Version this column was added at.
    added: RelationVersion,
    /// Version this column was dropped at, if any.
    dropped: Option<RelationVersion>,
}
837
/// A description of the shape of a relation.
///
/// It bundles a [`SqlRelationType`] with `ColumnMetadata` for each column in
/// the relation.
///
/// # Examples
///
/// A `RelationDesc` is typically constructed via its builder API:
///
/// ```
/// use mz_repr::{SqlColumnType, RelationDesc, SqlScalarType};
///
/// let desc = RelationDesc::builder()
///     .with_column("id", SqlScalarType::Int64.nullable(false))
///     .with_column("price", SqlScalarType::Float64.nullable(true))
///     .finish();
/// ```
///
/// In more complicated cases, like when constructing a `RelationDesc` in
/// response to user input, it may be more convenient to construct a relation
/// type first, and imbue it with column names to form a `RelationDesc` later:
///
/// ```
/// use mz_repr::RelationDesc;
///
/// # fn plan_query(_: &str) -> mz_repr::SqlRelationType { mz_repr::SqlRelationType::new(vec![]) }
/// let relation_type = plan_query("SELECT * FROM table");
/// let names = (0..relation_type.arity()).map(|i| match i {
///     0 => "first",
///     1 => "second",
///     _ => "unknown",
/// });
/// let desc = RelationDesc::new(relation_type, names);
/// ```
///
/// Next to the [`SqlRelationType`] we maintain a map of `ColumnIndex` to
/// `ColumnMetadata`, where [`ColumnIndex`] is a stable identifier for a
/// column throughout the lifetime of the relation. This allows a
/// [`RelationDesc`] to represent a projection over a version of itself.
///
/// ```
/// use std::collections::BTreeSet;
/// use mz_repr::{ColumnIndex, RelationDesc, SqlScalarType};
///
/// let desc = RelationDesc::builder()
///     .with_column("name", SqlScalarType::String.nullable(false))
///     .with_column("email", SqlScalarType::String.nullable(false))
///     .finish();
///
/// // Demand only the second column (index 1), projecting away the first.
/// let demands = BTreeSet::from([1]);
/// let proj = desc.apply_demand(&demands);
///
/// // We projected away the first column.
/// assert!(!proj.contains_index(&ColumnIndex::from_raw(0)));
/// // But retained the second.
/// assert!(proj.contains_index(&ColumnIndex::from_raw(1)));
///
/// // The underlying `SqlRelationType` also contains a single column.
/// assert_eq!(proj.typ().arity(), 1);
/// ```
///
/// To maintain this stable mapping and track the lifetime of a column (e.g.
/// when adding or dropping a column) we use `ColumnMetadata`. It maintains
/// the index in [`SqlRelationType`] that corresponds to a given column, and the
/// version at which this column was added or dropped.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash, MzReflect)]
pub struct RelationDesc {
    /// The column types and keys of the relation.
    typ: SqlRelationType,
    /// Per-column metadata, keyed by the column's stable index.
    metadata: BTreeMap<ColumnIndex, ColumnMetadata>,
}
910
911impl RustType<ProtoRelationDesc> for RelationDesc {
912    fn into_proto(&self) -> ProtoRelationDesc {
913        let (names, metadata): (Vec<_>, Vec<_>) = self
914            .metadata
915            .values()
916            .map(|meta| {
917                let metadata = ProtoColumnMetadata {
918                    added: Some(meta.added.into_proto()),
919                    dropped: meta.dropped.map(|v| v.into_proto()),
920                };
921                (meta.name.into_proto(), metadata)
922            })
923            .unzip();
924
925        // `metadata` Migration Logic: We wrote some `ProtoRelationDesc`s into Persist before the
926        // metadata field was added. To make sure our serialization roundtrips the same as before
927        // we added the field, we omit `metadata` if all of the values are equal to the default.
928        //
929        // Note: This logic needs to exist approximately forever.
930        let is_all_default_metadata = metadata.iter().all(|meta| {
931            meta.added == Some(RelationVersion::root().into_proto()) && meta.dropped == None
932        });
933        let metadata = if is_all_default_metadata {
934            Vec::new()
935        } else {
936            metadata
937        };
938
939        ProtoRelationDesc {
940            typ: Some(self.typ.into_proto()),
941            names,
942            metadata,
943        }
944    }
945
946    fn from_proto(proto: ProtoRelationDesc) -> Result<Self, TryFromProtoError> {
947        // `metadata` Migration Logic: We wrote some `ProtoRelationDesc`s into Persist before the
948        // metadata field was added. If the field doesn't exist we fill it in with default values,
949        // and when converting into_proto we omit these fields so the serialized bytes roundtrip.
950        //
951        // Note: This logic needs to exist approximately forever.
952        let proto_metadata: Box<dyn Iterator<Item = _>> = if proto.metadata.is_empty() {
953            let val = ProtoColumnMetadata {
954                added: Some(RelationVersion::root().into_proto()),
955                dropped: None,
956            };
957            Box::new(itertools::repeat_n(val, proto.names.len()))
958        } else {
959            Box::new(proto.metadata.into_iter())
960        };
961
962        let metadata = proto
963            .names
964            .into_iter()
965            .zip_eq(proto_metadata)
966            .enumerate()
967            .map(|(idx, (name, metadata))| {
968                let meta = ColumnMetadata {
969                    name: name.into_rust()?,
970                    typ_idx: idx,
971                    added: metadata.added.into_rust_if_some("ColumnMetadata::added")?,
972                    dropped: metadata.dropped.into_rust()?,
973                };
974                Ok::<_, TryFromProtoError>((ColumnIndex(idx), meta))
975            })
976            .collect::<Result<_, _>>()?;
977
978        Ok(RelationDesc {
979            typ: proto.typ.into_rust_if_some("ProtoRelationDesc::typ")?,
980            metadata,
981        })
982    }
983}
984
985impl RelationDesc {
986    /// Returns a [`RelationDescBuilder`] that can be used to construct a [`RelationDesc`].
987    pub fn builder() -> RelationDescBuilder {
988        RelationDescBuilder::default()
989    }
990
991    /// Constructs a new `RelationDesc` that represents the empty relation
992    /// with no columns and no keys.
993    pub fn empty() -> Self {
994        RelationDesc {
995            typ: SqlRelationType::empty(),
996            metadata: BTreeMap::default(),
997        }
998    }
999
1000    /// Check if the `RelationDesc` is empty.
1001    pub fn is_empty(&self) -> bool {
1002        self == &Self::empty()
1003    }
1004
1005    /// Returns the number of columns in this [`RelationDesc`].
1006    pub fn len(&self) -> usize {
1007        self.typ().column_types.len()
1008    }
1009
1010    /// Constructs a new `RelationDesc` from a `SqlRelationType` and an iterator
1011    /// over column names.
1012    ///
1013    /// # Panics
1014    ///
1015    /// Panics if the arity of the `SqlRelationType` is not equal to the number of
1016    /// items in `names`.
1017    pub fn new<I, N>(typ: SqlRelationType, names: I) -> Self
1018    where
1019        I: IntoIterator<Item = N>,
1020        N: Into<ColumnName>,
1021    {
1022        let metadata: BTreeMap<_, _> = names
1023            .into_iter()
1024            .enumerate()
1025            .map(|(idx, name)| {
1026                let col_idx = ColumnIndex(idx);
1027                let metadata = ColumnMetadata {
1028                    name: name.into(),
1029                    typ_idx: idx,
1030                    added: RelationVersion::root(),
1031                    dropped: None,
1032                };
1033                (col_idx, metadata)
1034            })
1035            .collect();
1036
1037        // TODO(parkmycar): Add better validation here.
1038        assert_eq!(typ.column_types.len(), metadata.len());
1039
1040        RelationDesc { typ, metadata }
1041    }
1042
1043    pub fn from_names_and_types<I, T, N>(iter: I) -> Self
1044    where
1045        I: IntoIterator<Item = (N, T)>,
1046        T: Into<SqlColumnType>,
1047        N: Into<ColumnName>,
1048    {
1049        let (names, types): (Vec<_>, Vec<_>) = iter.into_iter().unzip();
1050        let types = types.into_iter().map(Into::into).collect();
1051        let typ = SqlRelationType::new(types);
1052        Self::new(typ, names)
1053    }
1054
1055    /// Concatenates a `RelationDesc` onto the end of this `RelationDesc`.
1056    ///
1057    /// # Panics
1058    ///
1059    /// Panics if either `self` or `other` have columns that were added at a
1060    /// [`RelationVersion`] other than [`RelationVersion::root`] or if any
1061    /// columns were dropped.
1062    ///
1063    /// TODO(parkmycar): Move this method to [`RelationDescBuilder`].
1064    pub fn concat(mut self, other: Self) -> Self {
1065        let self_len = self.typ.column_types.len();
1066
1067        for (typ, (_col_idx, meta)) in other
1068            .typ
1069            .column_types
1070            .into_iter()
1071            .zip_eq(other.metadata.into_iter())
1072        {
1073            assert_eq!(meta.added, RelationVersion::root());
1074            assert_none!(meta.dropped);
1075
1076            let new_idx = self.typ.columns().len();
1077            let new_meta = ColumnMetadata {
1078                name: meta.name,
1079                typ_idx: new_idx,
1080                added: RelationVersion::root(),
1081                dropped: None,
1082            };
1083
1084            self.typ.column_types.push(typ);
1085            let prev = self.metadata.insert(ColumnIndex(new_idx), new_meta);
1086
1087            assert_eq!(self.metadata.len(), self.typ.columns().len());
1088            assert_none!(prev);
1089        }
1090
1091        for k in other.typ.keys {
1092            let k = k.into_iter().map(|idx| idx + self_len).collect();
1093            self = self.with_key(k);
1094        }
1095        self
1096    }
1097
1098    /// Adds a new key for the relation.
1099    pub fn with_key(mut self, indices: Vec<usize>) -> Self {
1100        self.typ = self.typ.with_key(indices);
1101        self
1102    }
1103
1104    /// Drops all existing keys.
1105    pub fn without_keys(mut self) -> Self {
1106        self.typ.keys.clear();
1107        self
1108    }
1109
1110    /// Builds a new relation description with the column names replaced with
1111    /// new names.
1112    ///
1113    /// # Panics
1114    ///
1115    /// Panics if the arity of the relation type does not match the number of
1116    /// items in `names`.
1117    pub fn with_names<I, N>(self, names: I) -> Self
1118    where
1119        I: IntoIterator<Item = N>,
1120        N: Into<ColumnName>,
1121    {
1122        Self::new(self.typ, names)
1123    }
1124
1125    /// Computes the number of columns in the relation.
1126    pub fn arity(&self) -> usize {
1127        self.typ.arity()
1128    }
1129
1130    /// Returns the relation type underlying this relation description.
1131    pub fn typ(&self) -> &SqlRelationType {
1132        &self.typ
1133    }
1134
1135    /// Returns the owned relation type underlying this relation description.
1136    pub fn into_typ(self) -> SqlRelationType {
1137        self.typ
1138    }
1139
1140    /// Returns an iterator over the columns in this relation.
1141    pub fn iter(&self) -> impl Iterator<Item = (&ColumnName, &SqlColumnType)> {
1142        self.metadata.values().map(|meta| {
1143            let typ = &self.typ.columns()[meta.typ_idx];
1144            (&meta.name, typ)
1145        })
1146    }
1147
1148    /// Returns an iterator over the types of the columns in this relation.
1149    pub fn iter_types(&self) -> impl Iterator<Item = &SqlColumnType> {
1150        self.typ.column_types.iter()
1151    }
1152
1153    /// Returns an iterator over the names of the columns in this relation.
1154    pub fn iter_names(&self) -> impl Iterator<Item = &ColumnName> {
1155        self.metadata.values().map(|meta| &meta.name)
1156    }
1157
1158    /// Returns an iterator over the columns in this relation, with all their metadata.
1159    pub fn iter_all(&self) -> impl Iterator<Item = (&ColumnIndex, &ColumnName, &SqlColumnType)> {
1160        self.metadata.iter().map(|(col_idx, metadata)| {
1161            let col_typ = &self.typ.columns()[metadata.typ_idx];
1162            (col_idx, &metadata.name, col_typ)
1163        })
1164    }
1165
1166    /// Returns an iterator over the names of the columns in this relation that are "similar" to
1167    /// the provided `name`.
1168    pub fn iter_similar_names<'a>(
1169        &'a self,
1170        name: &'a ColumnName,
1171    ) -> impl Iterator<Item = &'a ColumnName> {
1172        self.iter_names().filter(|n| n.is_similar(name))
1173    }
1174
1175    /// Returns whether this [`RelationDesc`] contains a column at the specified index.
1176    pub fn contains_index(&self, idx: &ColumnIndex) -> bool {
1177        self.metadata.contains_key(idx)
1178    }
1179
1180    /// Finds a column by name.
1181    ///
1182    /// Returns the index and type of the column named `name`. If no column with
1183    /// the specified name exists, returns `None`. If multiple columns have the
1184    /// specified name, the leftmost column is returned.
1185    pub fn get_by_name(&self, name: &ColumnName) -> Option<(usize, &SqlColumnType)> {
1186        self.iter_names()
1187            .position(|n| n == name)
1188            .map(|i| (i, &self.typ.column_types[i]))
1189    }
1190
1191    /// Gets the name of the `i`th column.
1192    ///
1193    /// # Panics
1194    ///
1195    /// Panics if `i` is not a valid column index.
1196    ///
1197    /// TODO(parkmycar): Migrate all uses of this to [`RelationDesc::get_name_idx`].
1198    pub fn get_name(&self, i: usize) -> &ColumnName {
1199        // TODO(parkmycar): Refactor this to use `ColumnIndex`.
1200        self.get_name_idx(&ColumnIndex(i))
1201    }
1202
1203    /// Gets the name of the column at `idx`.
1204    ///
1205    /// # Panics
1206    ///
1207    /// Panics if no column exists at `idx`.
1208    pub fn get_name_idx(&self, idx: &ColumnIndex) -> &ColumnName {
1209        &self.metadata.get(idx).expect("should exist").name
1210    }
1211
1212    /// Mutably gets the name of the `i`th column.
1213    ///
1214    /// # Panics
1215    ///
1216    /// Panics if `i` is not a valid column index.
1217    pub fn get_name_mut(&mut self, i: usize) -> &mut ColumnName {
1218        // TODO(parkmycar): Refactor this to use `ColumnIndex`.
1219        &mut self
1220            .metadata
1221            .get_mut(&ColumnIndex(i))
1222            .expect("should exist")
1223            .name
1224    }
1225
1226    /// Gets the [`SqlColumnType`] of the column at `idx`.
1227    ///
1228    /// # Panics
1229    ///
1230    /// Panics if no column exists at `idx`.
1231    pub fn get_type(&self, idx: &ColumnIndex) -> &SqlColumnType {
1232        let typ_idx = self.metadata.get(idx).expect("should exist").typ_idx;
1233        &self.typ.column_types[typ_idx]
1234    }
1235
1236    /// Gets the name of the `i`th column if that column name is unambiguous.
1237    ///
1238    /// If at least one other column has the same name as the `i`th column,
1239    /// returns `None`. If the `i`th column has no name, returns `None`.
1240    ///
1241    /// # Panics
1242    ///
1243    /// Panics if `i` is not a valid column index.
1244    pub fn get_unambiguous_name(&self, i: usize) -> Option<&ColumnName> {
1245        let name = self.get_name(i);
1246        if self.iter_names().filter(|n| *n == name).count() == 1 {
1247            Some(name)
1248        } else {
1249            None
1250        }
1251    }
1252
1253    /// Verifies that `d` meets all of the constraints for the `i`th column of `self`.
1254    ///
1255    /// n.b. The only constraint MZ currently supports in NOT NULL, but this
1256    /// structure will be simple to extend.
1257    pub fn constraints_met(&self, i: usize, d: &Datum) -> Result<(), NotNullViolation> {
1258        let name = self.get_name(i);
1259        let typ = &self.typ.column_types[i];
1260        if d == &Datum::Null && !typ.nullable {
1261            Err(NotNullViolation(name.clone()))
1262        } else {
1263            Ok(())
1264        }
1265    }
1266
1267    /// Computes the differences between two [`RelationDesc`]s.
1268    ///
1269    /// Returns a rich diff describing which columns differ, and in what way.
1270    ///
1271    /// # Panics
1272    ///
1273    /// Panics if either `self` or `other` have columns that were added at a
1274    /// [`RelationVersion`] other than [`RelationVersion::root`] or if any
1275    /// columns were dropped.
1276    ///
1277    /// This simplifies things by allowing us to assume that `ColumnIndex`es are
1278    /// dense and that they match the indexes of `typ.columns()`. Without this
1279    /// we would, e.g., struggle comparing keys as those are in terms of
1280    /// `typ.columns()` indexes.
1281    pub fn diff(&self, other: &RelationDesc) -> RelationDescDiff {
1282        assert_eq!(self.metadata.len(), self.typ.columns().len());
1283        assert_eq!(other.metadata.len(), other.typ.columns().len());
1284        for (idx, meta) in self.metadata.iter().chain(other.metadata.iter()) {
1285            assert_eq!(meta.typ_idx, idx.0);
1286            assert_eq!(meta.added, RelationVersion::root());
1287            assert_none!(meta.dropped);
1288        }
1289
1290        let mut column_diffs = BTreeMap::new();
1291        let mut key_diff = None;
1292
1293        let left_arity = self.arity();
1294        let right_arity = other.arity();
1295        let common_arity = std::cmp::min(left_arity, right_arity);
1296
1297        for idx in 0..common_arity {
1298            let left_name = self.get_name(idx);
1299            let right_name = other.get_name(idx);
1300            let left_type = &self.typ.column_types[idx];
1301            let right_type = &other.typ.column_types[idx];
1302
1303            if left_name != right_name {
1304                let diff = ColumnDiff::NameMismatch {
1305                    left: left_name.clone(),
1306                    right: right_name.clone(),
1307                };
1308                column_diffs.insert(idx, diff);
1309            } else if left_type.scalar_type != right_type.scalar_type {
1310                let diff = ColumnDiff::TypeMismatch {
1311                    name: left_name.clone(),
1312                    left: left_type.scalar_type.clone(),
1313                    right: right_type.scalar_type.clone(),
1314                };
1315                column_diffs.insert(idx, diff);
1316            } else if left_type.nullable != right_type.nullable {
1317                let diff = ColumnDiff::NullabilityMismatch {
1318                    name: left_name.clone(),
1319                    left: left_type.nullable,
1320                    right: right_type.nullable,
1321                };
1322                column_diffs.insert(idx, diff);
1323            }
1324        }
1325
1326        for idx in common_arity..left_arity {
1327            let diff = ColumnDiff::Missing {
1328                name: self.get_name(idx).clone(),
1329            };
1330            column_diffs.insert(idx, diff);
1331        }
1332
1333        for idx in common_arity..right_arity {
1334            let diff = ColumnDiff::Extra {
1335                name: other.get_name(idx).clone(),
1336            };
1337            column_diffs.insert(idx, diff);
1338        }
1339
1340        let left_keys: BTreeSet<_> = self.typ.keys.iter().collect();
1341        let right_keys: BTreeSet<_> = other.typ.keys.iter().collect();
1342        if left_keys != right_keys {
1343            let column_names = |desc: &RelationDesc, keys: BTreeSet<&Vec<usize>>| {
1344                keys.iter()
1345                    .map(|key| key.iter().map(|&idx| desc.get_name(idx).clone()).collect())
1346                    .collect()
1347            };
1348            key_diff = Some(KeyDiff {
1349                left: column_names(self, left_keys),
1350                right: column_names(other, right_keys),
1351            });
1352        }
1353
1354        RelationDescDiff {
1355            column_diffs,
1356            key_diff,
1357        }
1358    }
1359
1360    /// Creates a new [`RelationDesc`] retaining only the columns specified in `demands`.
1361    pub fn apply_demand(&self, demands: &BTreeSet<usize>) -> RelationDesc {
1362        let mut new_desc = self.clone();
1363
1364        // Update ColumnMetadata.
1365        let mut removed = 0;
1366        new_desc.metadata.retain(|idx, metadata| {
1367            let retain = demands.contains(&idx.0);
1368            if !retain {
1369                removed += 1;
1370            } else {
1371                metadata.typ_idx -= removed;
1372            }
1373            retain
1374        });
1375
1376        // Update SqlColumnType.
1377        let mut idx = 0;
1378        new_desc.typ.column_types.retain(|_| {
1379            let keep = demands.contains(&idx);
1380            idx += 1;
1381            keep
1382        });
1383
1384        new_desc
1385    }
1386}
1387
1388impl Arbitrary for RelationDesc {
1389    type Parameters = ();
1390    type Strategy = BoxedStrategy<RelationDesc>;
1391
1392    fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy {
1393        let mut weights = vec![(100, Just(0..4)), (50, Just(4..8)), (25, Just(8..16))];
1394        if std::env::var("PROPTEST_LARGE_DATA").is_ok() {
1395            weights.extend([
1396                (12, Just(16..32)),
1397                (6, Just(32..64)),
1398                (3, Just(64..128)),
1399                (1, Just(128..256)),
1400            ]);
1401        }
1402        let num_columns = Union::new_weighted(weights);
1403
1404        num_columns.prop_flat_map(arb_relation_desc).boxed()
1405    }
1406}
1407
1408/// Returns a [`Strategy`] that generates an arbitrary [`RelationDesc`] with a number columns
1409/// within the range provided.
1410pub fn arb_relation_desc(num_cols: std::ops::Range<usize>) -> impl Strategy<Value = RelationDesc> {
1411    proptest::collection::btree_map(any::<ColumnName>(), any::<SqlColumnType>(), num_cols)
1412        .prop_map(RelationDesc::from_names_and_types)
1413}
1414
1415/// Returns a [`Strategy`] that generates a projection of the provided [`RelationDesc`].
1416pub fn arb_relation_desc_projection(desc: RelationDesc) -> impl Strategy<Value = RelationDesc> {
1417    let mask: Vec<_> = (0..desc.len()).map(|_| any::<bool>()).collect();
1418    mask.prop_map(move |mask| {
1419        let demands: BTreeSet<_> = mask
1420            .into_iter()
1421            .enumerate()
1422            .filter_map(|(idx, keep)| keep.then_some(idx))
1423            .collect();
1424        desc.apply_demand(&demands)
1425    })
1426}
1427
1428impl IntoIterator for RelationDesc {
1429    type Item = (ColumnName, SqlColumnType);
1430    type IntoIter = Box<dyn Iterator<Item = (ColumnName, SqlColumnType)>>;
1431
1432    fn into_iter(self) -> Self::IntoIter {
1433        let iter = self
1434            .metadata
1435            .into_values()
1436            .zip_eq(self.typ.column_types)
1437            .map(|(meta, typ)| (meta.name, typ));
1438        Box::new(iter)
1439    }
1440}
1441
1442/// Returns a [`Strategy`] that yields arbitrary [`Row`]s for the provided [`RelationDesc`].
1443pub fn arb_row_for_relation(desc: &RelationDesc) -> impl Strategy<Value = Row> + use<> {
1444    let datums: Vec<_> = desc
1445        .typ()
1446        .columns()
1447        .iter()
1448        .cloned()
1449        .map(arb_datum_for_column)
1450        .collect();
1451    datums.prop_map(|x| Row::pack(x.iter().map(Datum::from)))
1452}
1453
1454/// Expression violated not-null constraint on named column
1455#[derive(Debug, PartialEq, Eq)]
1456pub struct NotNullViolation(pub ColumnName);
1457
1458impl fmt::Display for NotNullViolation {
1459    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1460        write!(
1461            f,
1462            "null value in column {} violates not-null constraint",
1463            self.0.quoted()
1464        )
1465    }
1466}
1467
1468/// The result of comparing two [`RelationDesc`]s.
1469#[derive(Debug, Clone, PartialEq, Eq)]
1470pub struct RelationDescDiff {
1471    /// Column differences, keyed by column index.
1472    pub column_diffs: BTreeMap<usize, ColumnDiff>,
1473    /// Key differences, if any.
1474    pub key_diff: Option<KeyDiff>,
1475}
1476
1477impl RelationDescDiff {
1478    /// Returns whether the diff contains any differences.
1479    pub fn is_empty(&self) -> bool {
1480        self.column_diffs.is_empty() && self.key_diff.is_none()
1481    }
1482}
1483
1484/// A difference in a column between two [`RelationDesc`]s.
1485#[derive(Debug, Clone, PartialEq, Eq)]
1486pub enum ColumnDiff {
1487    /// Column exists only in the left relation.
1488    Missing { name: ColumnName },
1489    /// Column exists only in the right relation.
1490    Extra { name: ColumnName },
1491    /// Columns have different types.
1492    TypeMismatch {
1493        name: ColumnName,
1494        left: SqlScalarType,
1495        right: SqlScalarType,
1496    },
1497    /// Columns have different nullability.
1498    NullabilityMismatch {
1499        name: ColumnName,
1500        left: bool,
1501        right: bool,
1502    },
1503    /// Columns have different names.
1504    NameMismatch { left: ColumnName, right: ColumnName },
1505}
1506
1507/// A difference in the keys of two [`RelationDesc`]s.
1508#[derive(Debug, Clone, PartialEq, Eq)]
1509pub struct KeyDiff {
1510    /// Keys of the left relation.
1511    pub left: BTreeSet<Vec<ColumnName>>,
1512    /// Keys of the right relation.
1513    pub right: BTreeSet<Vec<ColumnName>>,
1514}
1515
1516/// A builder for a [`RelationDesc`].
1517#[derive(Clone, Default, Debug, PartialEq, Eq)]
1518pub struct RelationDescBuilder {
1519    /// Columns of the relation.
1520    columns: Vec<(ColumnName, SqlColumnType)>,
1521    /// Sets of indices that are "keys" for the collection.
1522    keys: Vec<Vec<usize>>,
1523}
1524
1525impl RelationDescBuilder {
1526    /// Appends a column with the specified name and type.
1527    pub fn with_column<N: Into<ColumnName>>(
1528        mut self,
1529        name: N,
1530        ty: SqlColumnType,
1531    ) -> RelationDescBuilder {
1532        let name = name.into();
1533        self.columns.push((name, ty));
1534        self
1535    }
1536
1537    /// Appends the provided columns to the builder.
1538    pub fn with_columns<I, T, N>(mut self, iter: I) -> Self
1539    where
1540        I: IntoIterator<Item = (N, T)>,
1541        T: Into<SqlColumnType>,
1542        N: Into<ColumnName>,
1543    {
1544        self.columns
1545            .extend(iter.into_iter().map(|(name, ty)| (name.into(), ty.into())));
1546        self
1547    }
1548
1549    /// Adds a new key for the relation.
1550    pub fn with_key(mut self, mut indices: Vec<usize>) -> RelationDescBuilder {
1551        indices.sort_unstable();
1552        if !self.keys.contains(&indices) {
1553            self.keys.push(indices);
1554        }
1555        self
1556    }
1557
1558    /// Removes all previously inserted keys.
1559    pub fn without_keys(mut self) -> RelationDescBuilder {
1560        self.keys.clear();
1561        assert_eq!(self.keys.len(), 0);
1562        self
1563    }
1564
1565    /// Concatenates a [`RelationDescBuilder`] onto the end of this [`RelationDescBuilder`].
1566    pub fn concat(mut self, other: Self) -> Self {
1567        let self_len = self.columns.len();
1568
1569        self.columns.extend(other.columns);
1570        for k in other.keys {
1571            let k = k.into_iter().map(|idx| idx + self_len).collect();
1572            self = self.with_key(k);
1573        }
1574
1575        self
1576    }
1577
1578    /// Finish the builder, returning a [`RelationDesc`].
1579    pub fn finish(self) -> RelationDesc {
1580        let mut desc = RelationDesc::from_names_and_types(self.columns);
1581        desc.typ = desc.typ.with_keys(self.keys);
1582        desc
1583    }
1584}
1585
1586/// Describes a [`RelationDesc`] at a specific version of a [`VersionedRelationDesc`].
1587#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord, Serialize)]
1588pub enum RelationVersionSelector {
1589    Specific(RelationVersion),
1590    Latest,
1591}
1592
1593impl RelationVersionSelector {
1594    pub fn specific(version: u64) -> Self {
1595        RelationVersionSelector::Specific(RelationVersion(version))
1596    }
1597}
1598
1599/// A wrapper around [`RelationDesc`] that provides an interface for adding
1600/// columns and generating new versions.
1601///
1602/// TODO(parkmycar): Using an immutable data structure for RelationDesc would
1603/// be great.
1604#[derive(Debug, Clone, Serialize)]
1605pub struct VersionedRelationDesc {
1606    inner: RelationDesc,
1607}
1608
1609impl VersionedRelationDesc {
1610    pub fn new(inner: RelationDesc) -> Self {
1611        VersionedRelationDesc { inner }
1612    }
1613
1614    /// Adds a new column to this [`RelationDesc`], creating a new version of the [`RelationDesc`].
1615    ///
1616    /// # Panics
1617    ///
1618    /// * Panics if a column with `name` already exists that hasn't been dropped.
1619    ///
1620    /// Note: For building a [`RelationDesc`] see [`RelationDescBuilder::with_column`].
1621    #[must_use]
1622    pub fn add_column<N, T>(&mut self, name: N, typ: T) -> RelationVersion
1623    where
1624        N: Into<ColumnName>,
1625        T: Into<SqlColumnType>,
1626    {
1627        let latest_version = self.latest_version();
1628        let new_version = latest_version.bump();
1629
1630        let name = name.into();
1631        let existing = self
1632            .inner
1633            .metadata
1634            .iter()
1635            .find(|(_, meta)| meta.name == name && meta.dropped.is_none());
1636        if let Some(existing) = existing {
1637            panic!("column named '{name}' already exists! {existing:?}");
1638        }
1639
1640        let next_idx = self.inner.metadata.len();
1641        let col_meta = ColumnMetadata {
1642            name,
1643            typ_idx: next_idx,
1644            added: new_version,
1645            dropped: None,
1646        };
1647
1648        self.inner.typ.column_types.push(typ.into());
1649        let prev = self.inner.metadata.insert(ColumnIndex(next_idx), col_meta);
1650
1651        assert_none!(prev, "column index overlap!");
1652        self.validate();
1653
1654        new_version
1655    }
1656
1657    /// Drops the column `name` from this [`RelationDesc`]. If there are multiple columns with
1658    /// `name` drops the left-most one that hasn't already been dropped.
1659    ///
1660    /// TODO(parkmycar): Add handling for dropping a column that is currently used as a key.
1661    ///
1662    /// # Panics
1663    ///
1664    /// Panics if a column with `name` does not exist or the dropped column was used as a key.
1665    #[must_use]
1666    pub fn drop_column<N>(&mut self, name: N) -> RelationVersion
1667    where
1668        N: Into<ColumnName>,
1669    {
1670        let name = name.into();
1671        let latest_version = self.latest_version();
1672        let new_version = latest_version.bump();
1673
1674        let col = self
1675            .inner
1676            .metadata
1677            .values_mut()
1678            .find(|meta| meta.name == name && meta.dropped.is_none())
1679            .expect("column to exist");
1680
1681        // Make sure the column hadn't been previously dropped.
1682        assert_none!(col.dropped, "column was already dropped");
1683        col.dropped = Some(new_version);
1684
1685        // Make sure the column isn't being used as a key.
1686        let dropped_key = self
1687            .inner
1688            .typ
1689            .keys
1690            .iter()
1691            .any(|keys| keys.contains(&col.typ_idx));
1692        assert!(!dropped_key, "column being dropped was used as a key");
1693
1694        self.validate();
1695        new_version
1696    }
1697
1698    /// Returns the [`RelationDesc`] at the latest version.
1699    pub fn latest(&self) -> RelationDesc {
1700        self.inner.clone()
1701    }
1702
1703    /// Returns this [`RelationDesc`] at the specified version.
1704    pub fn at_version(&self, version: RelationVersionSelector) -> RelationDesc {
1705        // Get all of the changes from the start, up to whatever version was requested.
1706        let up_to_version = match version {
1707            RelationVersionSelector::Latest => RelationVersion(u64::MAX),
1708            RelationVersionSelector::Specific(v) => v,
1709        };
1710
1711        let valid_columns = self.inner.metadata.iter().filter(|(_col_idx, meta)| {
1712            let added = meta.added <= up_to_version;
1713            let dropped = meta
1714                .dropped
1715                .map(|dropped_at| up_to_version >= dropped_at)
1716                .unwrap_or(false);
1717
1718            added && !dropped
1719        });
1720
1721        let mut column_types = Vec::new();
1722        let mut column_metas = BTreeMap::new();
1723
1724        // N.B. At this point we need to be careful because col_idx might not
1725        // equal typ_idx.
1726        //
1727        // For example, consider columns "a", "b", and "c" with indexes 0, 1,
1728        // and 2. If we drop column "b" then we'll have "a" and "c" with column
1729        // indexes 0 and 2, but their indices in SqlRelationType will be 0 and 1.
1730        for (col_idx, meta) in valid_columns {
1731            let new_meta = ColumnMetadata {
1732                name: meta.name.clone(),
1733                typ_idx: column_types.len(),
1734                added: meta.added.clone(),
1735                dropped: meta.dropped.clone(),
1736            };
1737            column_types.push(self.inner.typ.columns()[meta.typ_idx].clone());
1738            column_metas.insert(*col_idx, new_meta);
1739        }
1740
1741        // Remap keys in case a column with an index less than that of a key was
1742        // dropped.
1743        //
1744        // For example, consider columns "a", "b", and "c" where "a" and "c" are
1745        // keys and "b" was dropped.
1746        let keys = self
1747            .inner
1748            .typ
1749            .keys
1750            .iter()
1751            .map(|keys| {
1752                keys.iter()
1753                    .map(|key_idx| {
1754                        let metadata = column_metas
1755                            .get(&ColumnIndex(*key_idx))
1756                            .expect("found key for column that doesn't exist");
1757                        metadata.typ_idx
1758                    })
1759                    .collect()
1760            })
1761            .collect();
1762
1763        let relation_type = SqlRelationType { column_types, keys };
1764
1765        RelationDesc {
1766            typ: relation_type,
1767            metadata: column_metas,
1768        }
1769    }
1770
1771    pub fn latest_version(&self) -> RelationVersion {
1772        self.inner
1773            .metadata
1774            .values()
1775            // N.B. Dropped is always greater than added.
1776            .map(|meta| meta.dropped.unwrap_or(meta.added))
1777            .max()
1778            // If there aren't any columns we're implicitly the root version.
1779            .unwrap_or_else(RelationVersion::root)
1780    }
1781
    /// Validates that the internal constraints of the [`RelationDesc`] hold.
1783    ///
1784    /// # Panics
1785    ///
1786    /// Panics if a constraint is not satisfied.
1787    fn validate(&self) {
1788        fn validate_inner(desc: &RelationDesc) -> Result<(), anyhow::Error> {
1789            if desc.typ.column_types.len() != desc.metadata.len() {
1790                anyhow::bail!("mismatch between number of types and metadatas");
1791            }
1792
1793            for (col_idx, meta) in &desc.metadata {
1794                if col_idx.0 > desc.metadata.len() {
1795                    anyhow::bail!("column index out of bounds");
1796                }
1797                if meta.added >= meta.dropped.unwrap_or(RelationVersion(u64::MAX)) {
1798                    anyhow::bail!("column was added after it was dropped?");
1799                }
1800                if desc.typ().columns().get(meta.typ_idx).is_none() {
1801                    anyhow::bail!("typ_idx incorrect");
1802                }
1803            }
1804
1805            for keys in &desc.typ.keys {
1806                for key in keys {
1807                    if *key >= desc.typ.column_types.len() {
1808                        anyhow::bail!("key index was out of bounds!");
1809                    }
1810                }
1811            }
1812
1813            let versions = desc
1814                .metadata
1815                .values()
1816                .map(|meta| meta.dropped.unwrap_or(meta.added));
1817            let mut max = 0;
1818            let mut sum = 0;
1819            for version in versions {
1820                max = std::cmp::max(max, version.0);
1821                sum += version.0;
1822            }
1823
1824            // Other than RelationVersion(0), we should never have duplicate
1825            // versions and they should always increase by 1. In other words, the
1826            // sum of all RelationVersions should be the sum of [0, max].
1827            //
1828            // N.B. n * (n + 1) / 2 = sum of [0, n]
1829            //
1830            // While I normally don't like tricks like this, it allows us to
1831            // validate that our column versions are correct in O(n) time and
1832            // without allocations.
1833            if sum != (max * (max + 1) / 2) {
1834                anyhow::bail!("there is a duplicate or missing relation version");
1835            }
1836
1837            Ok(())
1838        }
1839
1840        assert_ok!(validate_inner(&self.inner), "validate failed! {self:?}");
1841    }
1842}
1843
/// Diffs that can be generated by proptest and applied to a [`RelationDesc`] to
/// exercise schema migrations.
///
/// See [`PropRelationDescDiff::apply`] for how each variant mutates a
/// [`RelationDesc`].
#[derive(Debug)]
pub enum PropRelationDescDiff {
    /// Appends a new column with the given name and type.
    AddColumn {
        name: ColumnName,
        typ: SqlColumnType,
    },
    /// Marks the column with the given name as dropped, if such a column
    /// exists and has not been dropped already.
    DropColumn {
        name: ColumnName,
    },
    /// Flips the nullability of the column with the given name, if such a
    /// column exists.
    ToggleNullability {
        name: ColumnName,
    },
    /// Replaces the type of the column with the given name, if such a column
    /// exists.
    ChangeType {
        name: ColumnName,
        typ: SqlColumnType,
    },
}
1863
1864impl PropRelationDescDiff {
1865    pub fn apply(self, desc: &mut RelationDesc) {
1866        match self {
1867            PropRelationDescDiff::AddColumn { name, typ } => {
1868                let new_idx = desc.metadata.len();
1869                let meta = ColumnMetadata {
1870                    name,
1871                    typ_idx: new_idx,
1872                    added: RelationVersion(0),
1873                    dropped: None,
1874                };
1875                let prev = desc.metadata.insert(ColumnIndex(new_idx), meta);
1876                desc.typ.column_types.push(typ);
1877
1878                assert_none!(prev);
1879                assert_eq!(desc.metadata.len(), desc.typ.column_types.len());
1880            }
1881            PropRelationDescDiff::DropColumn { name } => {
1882                let next_version = desc
1883                    .metadata
1884                    .values()
1885                    .map(|meta| meta.dropped.unwrap_or(meta.added))
1886                    .max()
1887                    .unwrap_or_else(RelationVersion::root)
1888                    .bump();
1889                let Some(metadata) = desc.metadata.values_mut().find(|meta| meta.name == name)
1890                else {
1891                    return;
1892                };
1893                if metadata.dropped.is_none() {
1894                    metadata.dropped = Some(next_version);
1895                }
1896            }
1897            PropRelationDescDiff::ToggleNullability { name } => {
1898                let Some((pos, _)) = desc.get_by_name(&name) else {
1899                    return;
1900                };
1901                let col_type = desc
1902                    .typ
1903                    .column_types
1904                    .get_mut(pos)
1905                    .expect("ColumnNames and SqlColumnTypes out of sync!");
1906                col_type.nullable = !col_type.nullable;
1907            }
1908            PropRelationDescDiff::ChangeType { name, typ } => {
1909                let Some((pos, _)) = desc.get_by_name(&name) else {
1910                    return;
1911                };
1912                let col_type = desc
1913                    .typ
1914                    .column_types
1915                    .get_mut(pos)
1916                    .expect("ColumnNames and SqlColumnTypes out of sync!");
1917                *col_type = typ;
1918            }
1919        }
1920    }
1921}
1922
1923/// Generates a set of [`PropRelationDescDiff`]s based on some source [`RelationDesc`].
1924pub fn arb_relation_desc_diff(
1925    source: &RelationDesc,
1926) -> impl Strategy<Value = Vec<PropRelationDescDiff>> + use<> {
1927    let source = Rc::new(source.clone());
1928    let num_source_columns = source.typ.columns().len();
1929
1930    let num_add_columns = Union::new_weighted(vec![(100, Just(0..8)), (1, Just(8..64))]);
1931    let add_columns_strat = num_add_columns
1932        .prop_flat_map(|num_columns| {
1933            proptest::collection::vec((any::<ColumnName>(), any::<SqlColumnType>()), num_columns)
1934        })
1935        .prop_map(|cols| {
1936            cols.into_iter()
1937                .map(|(name, typ)| PropRelationDescDiff::AddColumn { name, typ })
1938                .collect::<Vec<_>>()
1939        });
1940
1941    // If the source RelationDesc is empty there is nothing else to do.
1942    if num_source_columns == 0 {
1943        return add_columns_strat.boxed();
1944    }
1945
1946    let source_ = Rc::clone(&source);
1947    let drop_columns_strat = (0..num_source_columns).prop_perturb(move |num_columns, mut rng| {
1948        let mut set = BTreeSet::default();
1949        for _ in 0..num_columns {
1950            let col_idx = rng.random_range(0..num_source_columns);
1951            set.insert(source_.get_name(col_idx).clone());
1952        }
1953        set.into_iter()
1954            .map(|name| PropRelationDescDiff::DropColumn { name })
1955            .collect::<Vec<_>>()
1956    });
1957
1958    let source_ = Rc::clone(&source);
1959    let toggle_nullability_strat =
1960        (0..num_source_columns).prop_perturb(move |num_columns, mut rng| {
1961            let mut set = BTreeSet::default();
1962            for _ in 0..num_columns {
1963                let col_idx = rng.random_range(0..num_source_columns);
1964                set.insert(source_.get_name(col_idx).clone());
1965            }
1966            set.into_iter()
1967                .map(|name| PropRelationDescDiff::ToggleNullability { name })
1968                .collect::<Vec<_>>()
1969        });
1970
1971    let source_ = Rc::clone(&source);
1972    let change_type_strat = (0..num_source_columns)
1973        .prop_perturb(move |num_columns, mut rng| {
1974            let mut set = BTreeSet::default();
1975            for _ in 0..num_columns {
1976                let col_idx = rng.random_range(0..num_source_columns);
1977                set.insert(source_.get_name(col_idx).clone());
1978            }
1979            set
1980        })
1981        .prop_flat_map(|cols| {
1982            proptest::collection::vec(any::<SqlColumnType>(), cols.len())
1983                .prop_map(move |types| (cols.clone(), types))
1984        })
1985        .prop_map(|(cols, types)| {
1986            cols.into_iter()
1987                .zip_eq(types)
1988                .map(|(name, typ)| PropRelationDescDiff::ChangeType { name, typ })
1989                .collect::<Vec<_>>()
1990        });
1991
1992    (
1993        add_columns_strat,
1994        drop_columns_strat,
1995        toggle_nullability_strat,
1996        change_type_strat,
1997    )
1998        .prop_map(|(adds, drops, toggles, changes)| {
1999            adds.into_iter()
2000                .chain(drops)
2001                .chain(toggles)
2002                .chain(changes)
2003                .collect::<Vec<_>>()
2004        })
2005        .prop_shuffle()
2006        .boxed()
2007}
2008
2009#[cfg(test)]
2010mod tests {
2011    use super::*;
2012    use prost::Message;
2013
    /// Walks a two column relation through adding a column and then dropping
    /// a column, checking after every step what each version looks like.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `pipe2` on OS `linux`
    fn smoktest_at_version() {
        let desc = RelationDesc::builder()
            .with_column("a", SqlScalarType::Bool.nullable(true))
            .with_column("z", SqlScalarType::String.nullable(false))
            .finish();

        let mut versioned_desc = VersionedRelationDesc {
            inner: desc.clone(),
        };
        versioned_desc.validate();

        // Before any changes are made every selector resolves to the original
        // desc, including a version (3) that has not been created.
        let latest = versioned_desc.at_version(RelationVersionSelector::Latest);
        assert_eq!(desc, latest);

        let v0 = versioned_desc.at_version(RelationVersionSelector::specific(0));
        assert_eq!(desc, v0);

        let v3 = versioned_desc.at_version(RelationVersionSelector::specific(3));
        assert_eq!(desc, v3);

        // Adding a column creates version 1.
        let v1 = versioned_desc.add_column("b", SqlScalarType::Bytes.nullable(false));
        assert_eq!(v1, RelationVersion(1));

        let v1 = versioned_desc.at_version(RelationVersionSelector::Specific(v1));
        insta::assert_json_snapshot!(v1.metadata, @r###"
        {
          "0": {
            "name": "a",
            "typ_idx": 0,
            "added": 0,
            "dropped": null
          },
          "1": {
            "name": "z",
            "typ_idx": 1,
            "added": 0,
            "dropped": null
          },
          "2": {
            "name": "b",
            "typ_idx": 2,
            "added": 1,
            "dropped": null
          }
        }
        "###);

        // Check that V0 doesn't show the new column.
        let v0_b = versioned_desc.at_version(RelationVersionSelector::specific(0));
        assert!(v0.iter().eq(v0_b.iter()));

        // Dropping a column creates version 2.
        let v2 = versioned_desc.drop_column("z");
        assert_eq!(v2, RelationVersion(2));

        // At version 2 the dropped column is gone; "b" keeps its column index
        // (2) but its `typ_idx` moves down to 1.
        let v2 = versioned_desc.at_version(RelationVersionSelector::Specific(v2));
        insta::assert_json_snapshot!(v2.metadata, @r###"
        {
          "0": {
            "name": "a",
            "typ_idx": 0,
            "added": 0,
            "dropped": null
          },
          "2": {
            "name": "b",
            "typ_idx": 1,
            "added": 1,
            "dropped": null
          }
        }
        "###);

        // Check that V0 and V1 are still correct.
        let v0_c = versioned_desc.at_version(RelationVersionSelector::specific(0));
        assert!(v0.iter().eq(v0_c.iter()));

        let v1_b = versioned_desc.at_version(RelationVersionSelector::specific(1));
        assert!(v1.iter().eq(v1_b.iter()));

        // The inner desc still contains every column, with "z" recorded as
        // dropped at version 2.
        insta::assert_json_snapshot!(versioned_desc.inner.metadata, @r###"
        {
          "0": {
            "name": "a",
            "typ_idx": 0,
            "added": 0,
            "dropped": null
          },
          "1": {
            "name": "z",
            "typ_idx": 1,
            "added": 0,
            "dropped": 2
          },
          "2": {
            "name": "b",
            "typ_idx": 2,
            "added": 1,
            "dropped": null
          }
        }
        "###);
    }
2118
    /// Checks that key indexes are remapped when a column positioned before a
    /// key column is dropped.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `pipe2` on OS `linux`
    fn test_dropping_columns_with_keys() {
        // Column 1 ("z") is the only key.
        let desc = RelationDesc::builder()
            .with_column("a", SqlScalarType::Bool.nullable(true))
            .with_column("z", SqlScalarType::String.nullable(false))
            .with_key(vec![1])
            .finish();

        let mut versioned_desc = VersionedRelationDesc {
            inner: desc.clone(),
        };
        versioned_desc.validate();

        let v1 = versioned_desc.drop_column("a");
        assert_eq!(v1, RelationVersion(1));

        // Make sure the key index for 'z' got remapped since 'a' was dropped.
        let v1 = versioned_desc.at_version(RelationVersionSelector::Specific(v1));
        insta::assert_json_snapshot!(v1, @r###"
        {
          "typ": {
            "column_types": [
              {
                "scalar_type": "String",
                "nullable": false
              }
            ],
            "keys": [
              [
                0
              ]
            ]
          },
          "metadata": {
            "1": {
              "name": "z",
              "typ_idx": 0,
              "added": 0,
              "dropped": null
            }
          }
        }
        "###);

        // Make sure the key index of 'z' is correct when all columns are present.
        let v0 = versioned_desc.at_version(RelationVersionSelector::specific(0));
        insta::assert_json_snapshot!(v0, @r###"
        {
          "typ": {
            "column_types": [
              {
                "scalar_type": "Bool",
                "nullable": true
              },
              {
                "scalar_type": "String",
                "nullable": false
              }
            ],
            "keys": [
              [
                1
              ]
            ]
          },
          "metadata": {
            "0": {
              "name": "a",
              "typ_idx": 0,
              "added": 0,
              "dropped": 1
            },
            "1": {
              "name": "z",
              "typ_idx": 1,
              "added": 0,
              "dropped": null
            }
          }
        }
        "###);
    }
2202
    /// Checks that a [`ProtoRelationDesc`] which only carries column names,
    /// and no column metadata, can still be converted into a [`RelationDesc`].
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `pipe2` on OS `linux`
    fn roundtrip_relation_desc_without_metadata() {
        let typ = ProtoRelationType {
            column_types: vec![
                SqlScalarType::String.nullable(false).into_proto(),
                SqlScalarType::Bool.nullable(true).into_proto(),
            ],
            keys: vec![],
        };
        // Note that `metadata` is left empty, only `names` is populated.
        let proto = ProtoRelationDesc {
            typ: Some(typ),
            names: vec![
                ColumnName("a".into()).into_proto(),
                ColumnName("b".into()).into_proto(),
            ],
            metadata: vec![],
        };
        let desc: RelationDesc = proto.into_rust().unwrap();

        // Every column ends up with metadata that places it at version 0.
        insta::assert_json_snapshot!(desc, @r###"
        {
          "typ": {
            "column_types": [
              {
                "scalar_type": "String",
                "nullable": false
              },
              {
                "scalar_type": "Bool",
                "nullable": true
              }
            ],
            "keys": []
          },
          "metadata": {
            "0": {
              "name": "a",
              "typ_idx": 0,
              "added": 0,
              "dropped": null
            },
            "1": {
              "name": "b",
              "typ_idx": 1,
              "added": 0,
              "dropped": null
            }
          }
        }
        "###);
    }
2255
2256    #[mz_ore::test]
2257    #[should_panic(expected = "column named 'a' already exists!")]
2258    fn test_add_column_with_same_name_panics() {
2259        let desc = RelationDesc::builder()
2260            .with_column("a", SqlScalarType::Bool.nullable(true))
2261            .finish();
2262        let mut versioned = VersionedRelationDesc::new(desc);
2263
2264        let _ = versioned.add_column("a", SqlScalarType::String.nullable(false));
2265    }
2266
    /// Checks that the name of a dropped column can be reused by a column
    /// that is added later.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `pipe2` on OS `linux`
    fn test_add_column_with_same_name_prev_dropped() {
        let desc = RelationDesc::builder()
            .with_column("a", SqlScalarType::Bool.nullable(true))
            .finish();
        let mut versioned = VersionedRelationDesc::new(desc);

        // Dropping the only column leaves an empty relation behind.
        let v1 = versioned.drop_column("a");
        let v1 = versioned.at_version(RelationVersionSelector::Specific(v1));
        insta::assert_json_snapshot!(v1, @r###"
        {
          "typ": {
            "column_types": [],
            "keys": []
          },
          "metadata": {}
        }
        "###);

        // The new "a" is a separate column: it gets column index 1 and the
        // new type.
        let v2 = versioned.add_column("a", SqlScalarType::String.nullable(false));
        let v2 = versioned.at_version(RelationVersionSelector::Specific(v2));
        insta::assert_json_snapshot!(v2, @r###"
        {
          "typ": {
            "column_types": [
              {
                "scalar_type": "String",
                "nullable": false
              }
            ],
            "keys": []
          },
          "metadata": {
            "1": {
              "name": "a",
              "typ_idx": 0,
              "added": 2,
              "dropped": null
            }
          }
        }
        "###);
    }
2311
2312    #[mz_ore::test]
2313    #[cfg_attr(miri, ignore)]
2314    fn apply_demand() {
2315        let desc = RelationDesc::builder()
2316            .with_column("a", SqlScalarType::String.nullable(true))
2317            .with_column("b", SqlScalarType::Int64.nullable(false))
2318            .with_column("c", SqlScalarType::Time.nullable(false))
2319            .finish();
2320        let desc = desc.apply_demand(&BTreeSet::from([0, 2]));
2321        assert_eq!(desc.arity(), 2);
2322        // TODO(parkmycar): Move validate onto RelationDesc.
2323        VersionedRelationDesc::new(desc).validate();
2324    }
2325
2326    #[mz_ore::test]
2327    #[cfg_attr(miri, ignore)]
2328    fn smoketest_column_index_stable_ident() {
2329        let idx_a = ColumnIndex(42);
2330        // Note(parkmycar): This should never change.
2331        assert_eq!(idx_a.to_stable_name(), "42");
2332    }
2333
2334    #[mz_ore::test]
2335    #[cfg_attr(miri, ignore)] // too slow
2336    fn proptest_relation_desc_roundtrips() {
2337        fn testcase(og: RelationDesc) {
2338            let bytes = og.into_proto().encode_to_vec();
2339            let proto = ProtoRelationDesc::decode(&bytes[..]).unwrap();
2340            let rnd = RelationDesc::from_proto(proto).unwrap();
2341
2342            assert_eq!(og, rnd);
2343        }
2344
2345        proptest!(|(desc in any::<RelationDesc>())| {
2346            testcase(desc);
2347        });
2348
2349        let strat = any::<RelationDesc>().prop_flat_map(|desc| {
2350            arb_relation_desc_diff(&desc).prop_map(move |diffs| (desc.clone(), diffs))
2351        });
2352
2353        proptest!(|((mut desc, diffs) in strat)| {
2354            for diff in diffs {
2355                diff.apply(&mut desc);
2356            };
2357            testcase(desc);
2358        });
2359    }
2360}