Skip to main content

mz_repr/
relation.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::{BTreeMap, BTreeSet};
11use std::rc::Rc;
12use std::{fmt, vec};
13
14use anyhow::bail;
15use itertools::Itertools;
16use mz_lowertest::MzReflect;
17use mz_ore::cast::CastFrom;
18use mz_ore::str::StrExt;
19use mz_ore::{assert_none, assert_ok};
20use mz_persist_types::schema::SchemaId;
21use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
22use proptest::prelude::*;
23use proptest::strategy::{Strategy, Union};
24use proptest_derive::Arbitrary;
25use serde::{Deserialize, Serialize};
26
27use crate::relation_and_scalar::proto_relation_type::ProtoKey;
28pub use crate::relation_and_scalar::{
29    ProtoColumnMetadata, ProtoColumnName, ProtoColumnType, ProtoRelationDesc, ProtoRelationType,
30    ProtoRelationVersion,
31};
32use crate::{Datum, ReprScalarType, Row, SqlScalarType, arb_datum_for_column};
33
/// The type of a [`Datum`].
///
/// [`SqlColumnType`] bundles information about the scalar type of a datum (e.g.,
/// Int32 or String) with its nullability.
///
/// To construct a column type, either initialize the struct directly, or
/// use the [`SqlScalarType::nullable`] method.
///
/// When deserializing with serde, a missing `nullable` field is filled in
/// with `true` (see the `default` attribute on that field).
#[derive(
    Arbitrary,
    Clone,
    Debug,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Serialize,
    Deserialize,
    Hash,
    MzReflect
)]
pub struct SqlColumnType {
    /// The underlying scalar type (e.g., Int32 or String) of this column.
    pub scalar_type: SqlScalarType,
    /// Whether this datum can be null.
    ///
    /// Defaults to `true` when the field is absent from deserialized input.
    #[serde(default = "return_true")]
    pub nullable: bool,
}
61
/// Serde default provider that always yields `true`.
///
/// `bool` defaults to `false`, and serde only accepts a function path when a
/// different default is wanted. Column types reference this function so that
/// they are nullable by default in unit tests. See
/// <https://github.com/serde-rs/serde/issues/1030>
#[inline(always)]
fn return_true() -> bool {
    true
}
71
72impl SqlColumnType {
73    /// Backports nullability information from `backport_typ` into `self`,
74    /// affecting the outer `.nullable` field but also record fields deeper
75    /// into the type.
76    pub fn backport_nullability(&mut self, backport_typ: &SqlColumnType) {
77        self.scalar_type
78            .backport_nullability(&backport_typ.scalar_type);
79        self.nullable = backport_typ.nullable;
80    }
81
82    /// Unions two [`SqlColumnType`]s.
83    ///
84    /// Will return an error if the underlying scalar types are SQL-incompatible,
85    /// e.g., unioning a `Text` and a `Int32`... or, more surprisingly, unioning
86    /// a `Text` and a `VarChar`.
87    pub fn sql_union(&self, other: &Self) -> Result<Self, anyhow::Error> {
88        match (&self.scalar_type, &other.scalar_type) {
89            (scalar_type, other_scalar_type) if scalar_type == other_scalar_type => {
90                Ok(SqlColumnType {
91                    scalar_type: scalar_type.clone(),
92                    nullable: self.nullable || other.nullable,
93                })
94            }
95            (scalar_type, other_scalar_type) if scalar_type.base_eq(other_scalar_type) => {
96                Ok(SqlColumnType {
97                    scalar_type: scalar_type.without_modifiers(),
98                    nullable: self.nullable || other.nullable,
99                })
100            }
101            (
102                SqlScalarType::Record { fields, custom_id },
103                SqlScalarType::Record {
104                    fields: other_fields,
105                    custom_id: other_custom_id,
106                },
107            ) => {
108                if custom_id != other_custom_id {
109                    bail!(
110                        "Can't union types: {:?} and {:?}",
111                        self.scalar_type,
112                        other.scalar_type
113                    );
114                };
115
116                if fields.len() != other_fields.len() {
117                    bail!(
118                        "Can't union types: {:?} and {:?}",
119                        self.scalar_type,
120                        other.scalar_type
121                    );
122                }
123                let mut union_fields = Vec::with_capacity(fields.len());
124                for ((name, typ), (other_name, other_typ)) in
125                    fields.iter().zip_eq(other_fields.iter())
126                {
127                    if name != other_name {
128                        bail!(
129                            "Can't union types: {:?} and {:?}",
130                            self.scalar_type,
131                            other.scalar_type
132                        );
133                    } else {
134                        let union_column_type = typ.sql_union(other_typ)?;
135                        union_fields.push((name.clone(), union_column_type));
136                    };
137                }
138
139                Ok(SqlColumnType {
140                    scalar_type: SqlScalarType::Record {
141                        fields: union_fields.into(),
142                        custom_id: *custom_id,
143                    },
144                    nullable: self.nullable || other.nullable,
145                })
146            }
147            _ => bail!(
148                "Can't union types: {:?} and {:?}",
149                self.scalar_type,
150                other.scalar_type
151            ),
152        }
153    }
154
155    pub fn try_union(&self, other: &Self) -> Result<Self, anyhow::Error> {
156        self.sql_union(other).or_else(|e| {
157            ::tracing::trace!("repr type error: sql_union({self:?}, {other:?}): {e}");
158
159            let repr_self = ReprColumnType::from(self);
160            let repr_other = ReprColumnType::from(other);
161            repr_self
162                .union(&repr_other)
163                .map(|typ| SqlColumnType::from_repr(&typ))
164        })
165    }
166
167    pub fn union(&self, other: &Self) -> Self {
168        self.try_union(other).unwrap_or_else(|e| {
169            panic!("repr type error: after sql_union({self:?}, {other:?}) error: {e}")
170        })
171    }
172
173    /// Consumes this `SqlColumnType` and returns a new `SqlColumnType` with its
174    /// nullability set to the specified boolean.
175    pub fn nullable(mut self, nullable: bool) -> Self {
176        self.nullable = nullable;
177        self
178    }
179}
180
181impl RustType<ProtoColumnType> for SqlColumnType {
182    fn into_proto(&self) -> ProtoColumnType {
183        ProtoColumnType {
184            nullable: self.nullable,
185            scalar_type: Some(self.scalar_type.into_proto()),
186        }
187    }
188
189    fn from_proto(proto: ProtoColumnType) -> Result<Self, TryFromProtoError> {
190        Ok(SqlColumnType {
191            nullable: proto.nullable,
192            scalar_type: proto
193                .scalar_type
194                .into_rust_if_some("ProtoColumnType::scalar_type")?,
195        })
196    }
197}
198
199impl fmt::Display for SqlColumnType {
200    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
201        let nullable = if self.nullable { "Null" } else { "NotNull" };
202        f.write_fmt(format_args!("{:?}:{}", self.scalar_type, nullable))
203    }
204}
205
/// The type of a relation.
///
/// Consists of the per-column [`SqlColumnType`]s plus zero or more keys.
#[derive(
    Arbitrary,
    Clone,
    Debug,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Serialize,
    Deserialize,
    Hash,
    MzReflect
)]
pub struct SqlRelationType {
    /// The type for each column, in order.
    pub column_types: Vec<SqlColumnType>,
    /// Sets of indices that are "keys" for the collection.
    ///
    /// Each element in this list is a set of column indices, each with the
    /// property that the collection contains at most one record with each
    /// distinct set of values for each column. Alternately, for a specific set
    /// of values assigned to these columns there is at most one record.
    ///
    /// A collection can contain multiple sets of keys, although it is common to
    /// have either zero or one sets of key indices.
    ///
    /// Defaults to an empty list when absent from deserialized input.
    #[serde(default)]
    pub keys: Vec<Vec<usize>>,
}
235
236impl SqlRelationType {
237    /// Constructs a `SqlRelationType` representing the relation with no columns and
238    /// no keys.
239    pub fn empty() -> Self {
240        SqlRelationType::new(vec![])
241    }
242
243    /// Constructs a new `SqlRelationType` from specified column types.
244    ///
245    /// The `SqlRelationType` will have no keys.
246    pub fn new(column_types: Vec<SqlColumnType>) -> Self {
247        SqlRelationType {
248            column_types,
249            keys: Vec::new(),
250        }
251    }
252
253    /// Adds a new key for the relation.
254    pub fn with_key(mut self, mut indices: Vec<usize>) -> Self {
255        indices.sort_unstable();
256        if !self.keys.contains(&indices) {
257            self.keys.push(indices);
258        }
259        self
260    }
261
262    pub fn with_keys(mut self, keys: Vec<Vec<usize>>) -> Self {
263        for key in keys {
264            self = self.with_key(key)
265        }
266        self
267    }
268
269    /// Computes the number of columns in the relation.
270    pub fn arity(&self) -> usize {
271        self.column_types.len()
272    }
273
274    /// Gets the index of the columns used when creating a default index.
275    pub fn default_key(&self) -> Vec<usize> {
276        if let Some(key) = self.keys.first() {
277            if key.is_empty() {
278                (0..self.column_types.len()).collect()
279            } else {
280                key.clone()
281            }
282        } else {
283            (0..self.column_types.len()).collect()
284        }
285    }
286
287    /// Returns all the [`SqlColumnType`]s, in order, for this relation.
288    pub fn columns(&self) -> &[SqlColumnType] {
289        &self.column_types
290    }
291
292    /// Adopts the nullability and keys from another `SqlRelationType`.
293    ///
294    /// Panics if the number of columns does not match.
295    pub fn backport_nullability_and_keys(&mut self, backport_typ: &SqlRelationType) {
296        assert_eq!(
297            backport_typ.column_types.len(),
298            self.column_types.len(),
299            "HIR and MIR types should have the same number of columns"
300        );
301        for (backport_col, sql_col) in backport_typ
302            .column_types
303            .iter()
304            .zip_eq(self.column_types.iter_mut())
305        {
306            sql_col.backport_nullability(backport_col);
307        }
308
309        self.keys = backport_typ.keys.clone();
310    }
311}
312
313impl RustType<ProtoRelationType> for SqlRelationType {
314    fn into_proto(&self) -> ProtoRelationType {
315        ProtoRelationType {
316            column_types: self.column_types.into_proto(),
317            keys: self.keys.into_proto(),
318        }
319    }
320
321    fn from_proto(proto: ProtoRelationType) -> Result<Self, TryFromProtoError> {
322        Ok(SqlRelationType {
323            column_types: proto.column_types.into_rust()?,
324            keys: proto.keys.into_rust()?,
325        })
326    }
327}
328
329impl RustType<ProtoKey> for Vec<usize> {
330    fn into_proto(&self) -> ProtoKey {
331        ProtoKey {
332            keys: self.into_proto(),
333        }
334    }
335
336    fn from_proto(proto: ProtoKey) -> Result<Self, TryFromProtoError> {
337        proto.keys.into_rust()
338    }
339}
340
/// The type of a relation, expressed with [`ReprColumnType`] columns.
///
/// This has the same shape as [`SqlRelationType`] (column types plus keys)
/// and can be built from one via `From<&SqlRelationType>`.
#[derive(
    Clone,
    Debug,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Serialize,
    Deserialize,
    Hash
)]
pub struct ReprRelationType {
    /// The type for each column, in order.
    pub column_types: Vec<ReprColumnType>,
    /// Sets of indices that are "keys" for the collection.
    ///
    /// Each element in this list is a set of column indices, each with the
    /// property that the collection contains at most one record with each
    /// distinct set of values for each column. Alternately, for a specific set
    /// of values assigned to these columns there is at most one record.
    ///
    /// A collection can contain multiple sets of keys, although it is common to
    /// have either zero or one sets of key indices.
    ///
    /// Defaults to an empty list when absent from deserialized input.
    #[serde(default)]
    pub keys: Vec<Vec<usize>>,
}
368
369impl ReprRelationType {
370    /// Constructs a `ReprRelationType` representing the relation with no columns and
371    /// no keys.
372    pub fn empty() -> Self {
373        ReprRelationType::new(vec![])
374    }
375
376    /// Constructs a new `ReprRelationType` from specified column types.
377    ///
378    /// The `ReprRelationType` will have no keys.
379    pub fn new(column_types: Vec<ReprColumnType>) -> Self {
380        ReprRelationType {
381            column_types,
382            keys: Vec::new(),
383        }
384    }
385
386    /// Adds a new key for the relation.
387    pub fn with_key(mut self, mut indices: Vec<usize>) -> Self {
388        indices.sort_unstable();
389        if !self.keys.contains(&indices) {
390            self.keys.push(indices);
391        }
392        self
393    }
394
395    pub fn with_keys(mut self, keys: Vec<Vec<usize>>) -> Self {
396        for key in keys {
397            self = self.with_key(key)
398        }
399        self
400    }
401
402    /// Computes the number of columns in the relation.
403    pub fn arity(&self) -> usize {
404        self.column_types.len()
405    }
406
407    /// Gets the index of the columns used when creating a default index.
408    pub fn default_key(&self) -> Vec<usize> {
409        if let Some(key) = self.keys.first() {
410            if key.is_empty() {
411                (0..self.column_types.len()).collect()
412            } else {
413                key.clone()
414            }
415        } else {
416            (0..self.column_types.len()).collect()
417        }
418    }
419
420    /// Returns all the column types in order, for this relation.
421    pub fn columns(&self) -> &[ReprColumnType] {
422        &self.column_types
423    }
424}
425
426impl From<&SqlRelationType> for ReprRelationType {
427    fn from(sql_relation_type: &SqlRelationType) -> Self {
428        ReprRelationType {
429            column_types: sql_relation_type
430                .column_types
431                .iter()
432                .map(ReprColumnType::from)
433                .collect(),
434            keys: sql_relation_type.keys.clone(),
435        }
436    }
437}
438
/// A column type that pairs a [`ReprScalarType`] with its nullability.
///
/// This mirrors [`SqlColumnType`], but with a representation scalar type in
/// place of the SQL scalar type.
#[derive(
    Clone,
    Debug,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Serialize,
    Deserialize,
    Hash,
    MzReflect
)]
pub struct ReprColumnType {
    /// The underlying representation scalar type (e.g., Int32 or String) of this column.
    pub scalar_type: ReprScalarType,
    /// Whether this datum can be null.
    ///
    /// Defaults to `true` when the field is absent from deserialized input.
    #[serde(default = "return_true")]
    pub nullable: bool,
}
458
459impl ReprColumnType {
460    pub fn union(&self, col: &ReprColumnType) -> Result<Self, anyhow::Error> {
461        let scalar_type = self.scalar_type.union(&col.scalar_type)?;
462        let nullable = self.nullable || col.nullable;
463
464        Ok(ReprColumnType {
465            scalar_type,
466            nullable,
467        })
468    }
469}
470
471impl From<&SqlColumnType> for ReprColumnType {
472    fn from(sql_column_type: &SqlColumnType) -> Self {
473        let scalar_type = &sql_column_type.scalar_type;
474        let scalar_type = scalar_type.into();
475        let nullable = sql_column_type.nullable;
476
477        ReprColumnType {
478            scalar_type,
479            nullable,
480        }
481    }
482}
483
484impl SqlColumnType {
485    /// Lossily translates a [`ReprColumnType`] back to a [`SqlColumnType`].
486    ///
487    /// See [`SqlScalarType::from_repr`] for an example of lossiness.
488    pub fn from_repr(repr: &ReprColumnType) -> Self {
489        let scalar_type = &repr.scalar_type;
490        let scalar_type = SqlScalarType::from_repr(scalar_type);
491        let nullable = repr.nullable;
492
493        SqlColumnType {
494            scalar_type,
495            nullable,
496        }
497    }
498}
499
/// The name of a column in a [`RelationDesc`].
///
/// A thin wrapper around a boxed string. It dereferences to `str`, so string
/// methods can be called on it directly.
#[derive(
    Clone,
    Debug,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Serialize,
    Deserialize,
    Hash,
    MzReflect
)]
pub struct ColumnName(Box<str>);
514
515impl ColumnName {
516    /// Returns this column name as a `str`.
517    #[inline(always)]
518    pub fn as_str(&self) -> &str {
519        &*self
520    }
521
522    /// Returns this column name as a `&mut Box<str>`.
523    pub fn as_mut_boxed_str(&mut self) -> &mut Box<str> {
524        &mut self.0
525    }
526
527    /// Returns if this [`ColumnName`] is similar to the provided one.
528    pub fn is_similar(&self, other: &ColumnName) -> bool {
529        const SIMILARITY_THRESHOLD: f64 = 0.6;
530
531        let a_lowercase = self.to_lowercase();
532        let b_lowercase = other.to_lowercase();
533
534        strsim::normalized_levenshtein(&a_lowercase, &b_lowercase) >= SIMILARITY_THRESHOLD
535    }
536}
537
538impl std::ops::Deref for ColumnName {
539    type Target = str;
540
541    #[inline(always)]
542    fn deref(&self) -> &Self::Target {
543        &self.0
544    }
545}
546
547impl fmt::Display for ColumnName {
548    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
549        f.write_str(&self.0)
550    }
551}
552
553impl From<String> for ColumnName {
554    fn from(s: String) -> ColumnName {
555        ColumnName(s.into())
556    }
557}
558
559impl From<&str> for ColumnName {
560    fn from(s: &str) -> ColumnName {
561        ColumnName(s.into())
562    }
563}
564
565impl From<&ColumnName> for ColumnName {
566    fn from(n: &ColumnName) -> ColumnName {
567        n.clone()
568    }
569}
570
571impl RustType<ProtoColumnName> for ColumnName {
572    fn into_proto(&self) -> ProtoColumnName {
573        ProtoColumnName {
574            value: Some(self.0.to_string()),
575        }
576    }
577
578    fn from_proto(proto: ProtoColumnName) -> Result<Self, TryFromProtoError> {
579        Ok(ColumnName(
580            proto
581                .value
582                .ok_or_else(|| TryFromProtoError::missing_field("ProtoColumnName::value"))?
583                .into(),
584        ))
585    }
586}
587
588impl From<ColumnName> for mz_sql_parser::ast::Ident {
589    fn from(value: ColumnName) -> Self {
590        // Note: ColumnNames are known to be less than the max length of an Ident (I think?).
591        mz_sql_parser::ast::Ident::new_unchecked(value.0)
592    }
593}
594
/// Proptest support: generates mostly short column names drawn mostly from
/// the characters between `'A'` and `'z'`.
impl proptest::arbitrary::Arbitrary for ColumnName {
    type Parameters = ();
    type Strategy = BoxedStrategy<ColumnName>;

    /// Builds the strategy; `_args` is unused.
    fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy {
        // Long column names are generally uninteresting, and can greatly
        // increase the runtime for a test case, so bound the max length.
        let mut weights = vec![(50, Just(1..8)), (20, Just(8..16))];
        // Longer length ranges are only added when the `PROPTEST_LARGE_DATA`
        // environment variable is set (to any value).
        if std::env::var("PROPTEST_LARGE_DATA").is_ok() {
            weights.extend([
                (5, Just(16..128)),
                (1, Just(128..1024)),
                (1, Just(1024..4096)),
            ]);
        }
        let name_length = Union::new_weighted(weights);

        // Non-ASCII characters are also generally uninteresting and can make
        // debugging harder.
        //
        // Note that the ASCII span from 'A' to 'z' also covers the punctuation
        // that sits between 'Z' and 'a'.
        let char_strat = Rc::new(Union::new_weighted(vec![
            (50, proptest::char::range('A', 'z').boxed()),
            (1, any::<char>().boxed()),
        ]));

        // Pick a length range, generate that many characters, and collect
        // them into the name. Shrinking is disabled for the result.
        name_length
            .prop_flat_map(move |length| proptest::collection::vec(Rc::clone(&char_strat), length))
            .prop_map(|chars| ColumnName(chars.into_iter().collect::<Box<str>>()))
            .no_shrink()
            .boxed()
    }
}
626
/// Default name of a column (when no other information is known).
///
/// The placeholder text is `?column?`.
pub const UNKNOWN_COLUMN_NAME: &str = "?column?";
629
/// Stable index of a column in a [`RelationDesc`].
///
/// Wraps a raw `usize`; see [`ColumnIndex::from_raw`] and
/// [`ColumnIndex::to_raw`] for conversions.
#[derive(
    Clone,
    Copy,
    Debug,
    Eq,
    PartialEq,
    PartialOrd,
    Ord,
    Serialize,
    Deserialize,
    Hash,
    MzReflect
)]
pub struct ColumnIndex(usize);

// Compile-time guard: `ColumnIndex` must not implement `Arbitrary`.
// NOTE(review): presumably because a randomly generated index would not
// refer to a real column — confirm the intent with the author.
static_assertions::assert_not_impl_all!(ColumnIndex: Arbitrary);
647
648impl ColumnIndex {
649    /// Returns a stable identifier for this [`ColumnIndex`].
650    pub fn to_stable_name(&self) -> String {
651        self.0.to_string()
652    }
653
654    pub fn to_raw(&self) -> usize {
655        self.0
656    }
657
658    pub fn from_raw(val: usize) -> Self {
659        ColumnIndex(val)
660    }
661}
662
/// The version a given column was added at.
///
/// Versions are plain `u64` counters: [`RelationVersion::root`] is `0` and
/// [`RelationVersion::bump`] yields the next value.
#[derive(
    Clone,
    Copy,
    Debug,
    Eq,
    PartialEq,
    PartialOrd,
    Ord,
    Serialize,
    Deserialize,
    Hash,
    MzReflect,
    Arbitrary
)]
pub struct RelationVersion(u64);
679
680impl RelationVersion {
681    /// Returns the "root" or "initial" version of a [`RelationDesc`].
682    pub fn root() -> Self {
683        RelationVersion(0)
684    }
685
686    /// Returns an instance of [`RelationVersion`] which is "one" higher than `self`.
687    pub fn bump(&self) -> Self {
688        let next_version = self
689            .0
690            .checked_add(1)
691            .expect("added more than u64::MAX columns?");
692        RelationVersion(next_version)
693    }
694
695    /// Consume a [`RelationVersion`] returning the raw value.
696    ///
697    /// Should __only__ be used for serialization.
698    pub fn into_raw(self) -> u64 {
699        self.0
700    }
701
702    /// Create a [`RelationVersion`] from a raw value.
703    ///
704    /// Should __only__ be used for serialization.
705    pub fn from_raw(val: u64) -> RelationVersion {
706        RelationVersion(val)
707    }
708}
709
710impl From<RelationVersion> for SchemaId {
711    fn from(value: RelationVersion) -> Self {
712        SchemaId(usize::cast_from(value.0))
713    }
714}
715
716impl From<mz_sql_parser::ast::Version> for RelationVersion {
717    fn from(value: mz_sql_parser::ast::Version) -> Self {
718        RelationVersion(value.into_inner())
719    }
720}
721
722impl fmt::Display for RelationVersion {
723    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
724        write!(f, "v{}", self.0)
725    }
726}
727
728impl From<RelationVersion> for mz_sql_parser::ast::Version {
729    fn from(value: RelationVersion) -> Self {
730        mz_sql_parser::ast::Version::new(value.0)
731    }
732}
733
734impl RustType<ProtoRelationVersion> for RelationVersion {
735    fn into_proto(&self) -> ProtoRelationVersion {
736        ProtoRelationVersion { value: self.0 }
737    }
738
739    fn from_proto(proto: ProtoRelationVersion) -> Result<Self, TryFromProtoError> {
740        Ok(RelationVersion(proto.value))
741    }
742}
743
/// Metadata (other than type) for a column in a [`RelationDesc`].
///
/// One entry is kept per [`ColumnIndex`] in the `RelationDesc`'s metadata map.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash, MzReflect)]
struct ColumnMetadata {
    /// Name of the column.
    name: ColumnName,
    /// Index into a [`SqlRelationType`] for this column.
    typ_idx: usize,
    /// Version this column was added at.
    added: RelationVersion,
    /// Version this column was dropped at.
    ///
    /// `None` when no drop has been recorded for the column.
    dropped: Option<RelationVersion>,
}
756
/// A description of the shape of a relation.
///
/// It bundles a [`SqlRelationType`] with `ColumnMetadata` for each column in
/// the relation.
///
/// # Examples
///
/// A `RelationDesc` is typically constructed via its builder API:
///
/// ```
/// use mz_repr::{SqlColumnType, RelationDesc, SqlScalarType};
///
/// let desc = RelationDesc::builder()
///     .with_column("id", SqlScalarType::Int64.nullable(false))
///     .with_column("price", SqlScalarType::Float64.nullable(true))
///     .finish();
/// ```
///
/// In more complicated cases, like when constructing a `RelationDesc` in
/// response to user input, it may be more convenient to construct a relation
/// type first, and imbue it with column names to form a `RelationDesc` later:
///
/// ```
/// use mz_repr::RelationDesc;
///
/// # fn plan_query(_: &str) -> mz_repr::SqlRelationType { mz_repr::SqlRelationType::new(vec![]) }
/// let relation_type = plan_query("SELECT * FROM table");
/// let names = (0..relation_type.arity()).map(|i| match i {
///     0 => "first",
///     1 => "second",
///     _ => "unknown",
/// });
/// let desc = RelationDesc::new(relation_type, names);
/// ```
///
/// Next to the [`SqlRelationType`] we maintain a map of `ColumnIndex` to
/// `ColumnMetadata`, where [`ColumnIndex`] is a stable identifier for a
/// column throughout the lifetime of the relation. This allows a
/// [`RelationDesc`] to represent a projection over a version of itself.
///
/// ```
/// use std::collections::BTreeSet;
/// use mz_repr::{ColumnIndex, RelationDesc, SqlScalarType};
///
/// let desc = RelationDesc::builder()
///     .with_column("name", SqlScalarType::String.nullable(false))
///     .with_column("email", SqlScalarType::String.nullable(false))
///     .finish();
///
/// // Demand only the second column (index 1), projecting away the first.
/// let demands = BTreeSet::from([1]);
/// let proj = desc.apply_demand(&demands);
///
/// // We projected away the first column.
/// assert!(!proj.contains_index(&ColumnIndex::from_raw(0)));
/// // But retained the second.
/// assert!(proj.contains_index(&ColumnIndex::from_raw(1)));
///
/// // The underlying `SqlRelationType` also contains a single column.
/// assert_eq!(proj.typ().arity(), 1);
/// ```
///
/// To maintain this stable mapping and track the lifetime of a column (e.g.
/// when adding or dropping a column) we use `ColumnMetadata`. It maintains
/// the index in [`SqlRelationType`] that corresponds to a given column, and the
/// version at which this column was added or dropped.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash, MzReflect)]
pub struct RelationDesc {
    /// Column types and keys of the relation.
    typ: SqlRelationType,
    /// Per-column metadata, keyed by the column's stable index.
    metadata: BTreeMap<ColumnIndex, ColumnMetadata>,
}
829
830impl RustType<ProtoRelationDesc> for RelationDesc {
831    fn into_proto(&self) -> ProtoRelationDesc {
832        let (names, metadata): (Vec<_>, Vec<_>) = self
833            .metadata
834            .values()
835            .map(|meta| {
836                let metadata = ProtoColumnMetadata {
837                    added: Some(meta.added.into_proto()),
838                    dropped: meta.dropped.map(|v| v.into_proto()),
839                };
840                (meta.name.into_proto(), metadata)
841            })
842            .unzip();
843
844        // `metadata` Migration Logic: We wrote some `ProtoRelationDesc`s into Persist before the
845        // metadata field was added. To make sure our serialization roundtrips the same as before
846        // we added the field, we omit `metadata` if all of the values are equal to the default.
847        //
848        // Note: This logic needs to exist approximately forever.
849        let is_all_default_metadata = metadata.iter().all(|meta| {
850            meta.added == Some(RelationVersion::root().into_proto()) && meta.dropped == None
851        });
852        let metadata = if is_all_default_metadata {
853            Vec::new()
854        } else {
855            metadata
856        };
857
858        ProtoRelationDesc {
859            typ: Some(self.typ.into_proto()),
860            names,
861            metadata,
862        }
863    }
864
865    fn from_proto(proto: ProtoRelationDesc) -> Result<Self, TryFromProtoError> {
866        // `metadata` Migration Logic: We wrote some `ProtoRelationDesc`s into Persist before the
867        // metadata field was added. If the field doesn't exist we fill it in with default values,
868        // and when converting into_proto we omit these fields so the serialized bytes roundtrip.
869        //
870        // Note: This logic needs to exist approximately forever.
871        let proto_metadata: Box<dyn Iterator<Item = _>> = if proto.metadata.is_empty() {
872            let val = ProtoColumnMetadata {
873                added: Some(RelationVersion::root().into_proto()),
874                dropped: None,
875            };
876            Box::new(itertools::repeat_n(val, proto.names.len()))
877        } else {
878            Box::new(proto.metadata.into_iter())
879        };
880
881        let metadata = proto
882            .names
883            .into_iter()
884            .zip_eq(proto_metadata)
885            .enumerate()
886            .map(|(idx, (name, metadata))| {
887                let meta = ColumnMetadata {
888                    name: name.into_rust()?,
889                    typ_idx: idx,
890                    added: metadata.added.into_rust_if_some("ColumnMetadata::added")?,
891                    dropped: metadata.dropped.into_rust()?,
892                };
893                Ok::<_, TryFromProtoError>((ColumnIndex(idx), meta))
894            })
895            .collect::<Result<_, _>>()?;
896
897        Ok(RelationDesc {
898            typ: proto.typ.into_rust_if_some("ProtoRelationDesc::typ")?,
899            metadata,
900        })
901    }
902}
903
904impl RelationDesc {
    /// Returns a [`RelationDescBuilder`] that can be used to construct a [`RelationDesc`].
    ///
    /// The builder starts out in its `Default` state.
    pub fn builder() -> RelationDescBuilder {
        RelationDescBuilder::default()
    }
909
910    /// Constructs a new `RelationDesc` that represents the empty relation
911    /// with no columns and no keys.
912    pub fn empty() -> Self {
913        RelationDesc {
914            typ: SqlRelationType::empty(),
915            metadata: BTreeMap::default(),
916        }
917    }
918
919    /// Check if the `RelationDesc` is empty.
920    pub fn is_empty(&self) -> bool {
921        self == &Self::empty()
922    }
923
924    /// Returns the number of columns in this [`RelationDesc`].
925    pub fn len(&self) -> usize {
926        self.typ().column_types.len()
927    }
928
929    /// Constructs a new `RelationDesc` from a `SqlRelationType` and an iterator
930    /// over column names.
931    ///
932    /// # Panics
933    ///
934    /// Panics if the arity of the `SqlRelationType` is not equal to the number of
935    /// items in `names`.
936    pub fn new<I, N>(typ: SqlRelationType, names: I) -> Self
937    where
938        I: IntoIterator<Item = N>,
939        N: Into<ColumnName>,
940    {
941        let metadata: BTreeMap<_, _> = names
942            .into_iter()
943            .enumerate()
944            .map(|(idx, name)| {
945                let col_idx = ColumnIndex(idx);
946                let metadata = ColumnMetadata {
947                    name: name.into(),
948                    typ_idx: idx,
949                    added: RelationVersion::root(),
950                    dropped: None,
951                };
952                (col_idx, metadata)
953            })
954            .collect();
955
956        // TODO(parkmycar): Add better validation here.
957        assert_eq!(typ.column_types.len(), metadata.len());
958
959        RelationDesc { typ, metadata }
960    }
961
962    pub fn from_names_and_types<I, T, N>(iter: I) -> Self
963    where
964        I: IntoIterator<Item = (N, T)>,
965        T: Into<SqlColumnType>,
966        N: Into<ColumnName>,
967    {
968        let (names, types): (Vec<_>, Vec<_>) = iter.into_iter().unzip();
969        let types = types.into_iter().map(Into::into).collect();
970        let typ = SqlRelationType::new(types);
971        Self::new(typ, names)
972    }
973
974    /// Concatenates a `RelationDesc` onto the end of this `RelationDesc`.
975    ///
976    /// # Panics
977    ///
978    /// Panics if either `self` or `other` have columns that were added at a
979    /// [`RelationVersion`] other than [`RelationVersion::root`] or if any
980    /// columns were dropped.
981    ///
982    /// TODO(parkmycar): Move this method to [`RelationDescBuilder`].
983    pub fn concat(mut self, other: Self) -> Self {
984        let self_len = self.typ.column_types.len();
985
986        for (typ, (_col_idx, meta)) in other
987            .typ
988            .column_types
989            .into_iter()
990            .zip_eq(other.metadata.into_iter())
991        {
992            assert_eq!(meta.added, RelationVersion::root());
993            assert_none!(meta.dropped);
994
995            let new_idx = self.typ.columns().len();
996            let new_meta = ColumnMetadata {
997                name: meta.name,
998                typ_idx: new_idx,
999                added: RelationVersion::root(),
1000                dropped: None,
1001            };
1002
1003            self.typ.column_types.push(typ);
1004            let prev = self.metadata.insert(ColumnIndex(new_idx), new_meta);
1005
1006            assert_eq!(self.metadata.len(), self.typ.columns().len());
1007            assert_none!(prev);
1008        }
1009
1010        for k in other.typ.keys {
1011            let k = k.into_iter().map(|idx| idx + self_len).collect();
1012            self = self.with_key(k);
1013        }
1014        self
1015    }
1016
1017    /// Adds a new key for the relation.
1018    pub fn with_key(mut self, indices: Vec<usize>) -> Self {
1019        self.typ = self.typ.with_key(indices);
1020        self
1021    }
1022
1023    /// Drops all existing keys.
1024    pub fn without_keys(mut self) -> Self {
1025        self.typ.keys.clear();
1026        self
1027    }
1028
1029    /// Builds a new relation description with the column names replaced with
1030    /// new names.
1031    ///
1032    /// # Panics
1033    ///
1034    /// Panics if the arity of the relation type does not match the number of
1035    /// items in `names`.
1036    pub fn with_names<I, N>(self, names: I) -> Self
1037    where
1038        I: IntoIterator<Item = N>,
1039        N: Into<ColumnName>,
1040    {
1041        Self::new(self.typ, names)
1042    }
1043
1044    /// Computes the number of columns in the relation.
1045    pub fn arity(&self) -> usize {
1046        self.typ.arity()
1047    }
1048
1049    /// Returns the relation type underlying this relation description.
1050    pub fn typ(&self) -> &SqlRelationType {
1051        &self.typ
1052    }
1053
1054    /// Returns the owned relation type underlying this relation description.
1055    pub fn into_typ(self) -> SqlRelationType {
1056        self.typ
1057    }
1058
1059    /// Returns an iterator over the columns in this relation.
1060    pub fn iter(&self) -> impl Iterator<Item = (&ColumnName, &SqlColumnType)> {
1061        self.metadata.values().map(|meta| {
1062            let typ = &self.typ.columns()[meta.typ_idx];
1063            (&meta.name, typ)
1064        })
1065    }
1066
1067    /// Returns an iterator over the types of the columns in this relation.
1068    pub fn iter_types(&self) -> impl Iterator<Item = &SqlColumnType> {
1069        self.typ.column_types.iter()
1070    }
1071
1072    /// Returns an iterator over the names of the columns in this relation.
1073    pub fn iter_names(&self) -> impl Iterator<Item = &ColumnName> {
1074        self.metadata.values().map(|meta| &meta.name)
1075    }
1076
1077    /// Returns an iterator over the columns in this relation, with all their metadata.
1078    pub fn iter_all(&self) -> impl Iterator<Item = (&ColumnIndex, &ColumnName, &SqlColumnType)> {
1079        self.metadata.iter().map(|(col_idx, metadata)| {
1080            let col_typ = &self.typ.columns()[metadata.typ_idx];
1081            (col_idx, &metadata.name, col_typ)
1082        })
1083    }
1084
1085    /// Returns an iterator over the names of the columns in this relation that are "similar" to
1086    /// the provided `name`.
1087    pub fn iter_similar_names<'a>(
1088        &'a self,
1089        name: &'a ColumnName,
1090    ) -> impl Iterator<Item = &'a ColumnName> {
1091        self.iter_names().filter(|n| n.is_similar(name))
1092    }
1093
1094    /// Returns whether this [`RelationDesc`] contains a column at the specified index.
1095    pub fn contains_index(&self, idx: &ColumnIndex) -> bool {
1096        self.metadata.contains_key(idx)
1097    }
1098
1099    /// Finds a column by name.
1100    ///
1101    /// Returns the index and type of the column named `name`. If no column with
1102    /// the specified name exists, returns `None`. If multiple columns have the
1103    /// specified name, the leftmost column is returned.
1104    pub fn get_by_name(&self, name: &ColumnName) -> Option<(usize, &SqlColumnType)> {
1105        self.iter_names()
1106            .position(|n| n == name)
1107            .map(|i| (i, &self.typ.column_types[i]))
1108    }
1109
1110    /// Gets the name of the `i`th column.
1111    ///
1112    /// # Panics
1113    ///
1114    /// Panics if `i` is not a valid column index.
1115    ///
1116    /// TODO(parkmycar): Migrate all uses of this to [`RelationDesc::get_name_idx`].
1117    pub fn get_name(&self, i: usize) -> &ColumnName {
1118        // TODO(parkmycar): Refactor this to use `ColumnIndex`.
1119        self.get_name_idx(&ColumnIndex(i))
1120    }
1121
1122    /// Gets the name of the column at `idx`.
1123    ///
1124    /// # Panics
1125    ///
1126    /// Panics if no column exists at `idx`.
1127    pub fn get_name_idx(&self, idx: &ColumnIndex) -> &ColumnName {
1128        &self.metadata.get(idx).expect("should exist").name
1129    }
1130
1131    /// Mutably gets the name of the `i`th column.
1132    ///
1133    /// # Panics
1134    ///
1135    /// Panics if `i` is not a valid column index.
1136    pub fn get_name_mut(&mut self, i: usize) -> &mut ColumnName {
1137        // TODO(parkmycar): Refactor this to use `ColumnIndex`.
1138        &mut self
1139            .metadata
1140            .get_mut(&ColumnIndex(i))
1141            .expect("should exist")
1142            .name
1143    }
1144
1145    /// Gets the [`SqlColumnType`] of the column at `idx`.
1146    ///
1147    /// # Panics
1148    ///
1149    /// Panics if no column exists at `idx`.
1150    pub fn get_type(&self, idx: &ColumnIndex) -> &SqlColumnType {
1151        let typ_idx = self.metadata.get(idx).expect("should exist").typ_idx;
1152        &self.typ.column_types[typ_idx]
1153    }
1154
1155    /// Gets the name of the `i`th column if that column name is unambiguous.
1156    ///
1157    /// If at least one other column has the same name as the `i`th column,
1158    /// returns `None`. If the `i`th column has no name, returns `None`.
1159    ///
1160    /// # Panics
1161    ///
1162    /// Panics if `i` is not a valid column index.
1163    pub fn get_unambiguous_name(&self, i: usize) -> Option<&ColumnName> {
1164        let name = self.get_name(i);
1165        if self.iter_names().filter(|n| *n == name).count() == 1 {
1166            Some(name)
1167        } else {
1168            None
1169        }
1170    }
1171
1172    /// Verifies that `d` meets all of the constraints for the `i`th column of `self`.
1173    ///
1174    /// n.b. The only constraint MZ currently supports in NOT NULL, but this
1175    /// structure will be simple to extend.
1176    pub fn constraints_met(&self, i: usize, d: &Datum) -> Result<(), NotNullViolation> {
1177        let name = self.get_name(i);
1178        let typ = &self.typ.column_types[i];
1179        if d == &Datum::Null && !typ.nullable {
1180            Err(NotNullViolation(name.clone()))
1181        } else {
1182            Ok(())
1183        }
1184    }
1185
1186    /// Computes the differences between two [`RelationDesc`]s.
1187    ///
1188    /// Returns a rich diff describing which columns differ, and in what way.
1189    ///
1190    /// # Panics
1191    ///
1192    /// Panics if either `self` or `other` have columns that were added at a
1193    /// [`RelationVersion`] other than [`RelationVersion::root`] or if any
1194    /// columns were dropped.
1195    ///
1196    /// This simplifies things by allowing us to assume that `ColumnIndex`es are
1197    /// dense and that they match the indexes of `typ.columns()`. Without this
1198    /// we would, e.g., struggle comparing keys as those are in terms of
1199    /// `typ.columns()` indexes.
1200    pub fn diff(&self, other: &RelationDesc) -> RelationDescDiff {
1201        assert_eq!(self.metadata.len(), self.typ.columns().len());
1202        assert_eq!(other.metadata.len(), other.typ.columns().len());
1203        for (idx, meta) in self.metadata.iter().chain(other.metadata.iter()) {
1204            assert_eq!(meta.typ_idx, idx.0);
1205            assert_eq!(meta.added, RelationVersion::root());
1206            assert_none!(meta.dropped);
1207        }
1208
1209        let mut column_diffs = BTreeMap::new();
1210        let mut key_diff = None;
1211
1212        let left_arity = self.arity();
1213        let right_arity = other.arity();
1214        let common_arity = std::cmp::min(left_arity, right_arity);
1215
1216        for idx in 0..common_arity {
1217            let left_name = self.get_name(idx);
1218            let right_name = other.get_name(idx);
1219            let left_type = &self.typ.column_types[idx];
1220            let right_type = &other.typ.column_types[idx];
1221
1222            if left_name != right_name {
1223                let diff = ColumnDiff::NameMismatch {
1224                    left: left_name.clone(),
1225                    right: right_name.clone(),
1226                };
1227                column_diffs.insert(idx, diff);
1228            } else if left_type.scalar_type != right_type.scalar_type {
1229                let diff = ColumnDiff::TypeMismatch {
1230                    name: left_name.clone(),
1231                    left: left_type.scalar_type.clone(),
1232                    right: right_type.scalar_type.clone(),
1233                };
1234                column_diffs.insert(idx, diff);
1235            } else if left_type.nullable != right_type.nullable {
1236                let diff = ColumnDiff::NullabilityMismatch {
1237                    name: left_name.clone(),
1238                    left: left_type.nullable,
1239                    right: right_type.nullable,
1240                };
1241                column_diffs.insert(idx, diff);
1242            }
1243        }
1244
1245        for idx in common_arity..left_arity {
1246            let diff = ColumnDiff::Missing {
1247                name: self.get_name(idx).clone(),
1248            };
1249            column_diffs.insert(idx, diff);
1250        }
1251
1252        for idx in common_arity..right_arity {
1253            let diff = ColumnDiff::Extra {
1254                name: other.get_name(idx).clone(),
1255            };
1256            column_diffs.insert(idx, diff);
1257        }
1258
1259        let left_keys: BTreeSet<_> = self.typ.keys.iter().collect();
1260        let right_keys: BTreeSet<_> = other.typ.keys.iter().collect();
1261        if left_keys != right_keys {
1262            let column_names = |desc: &RelationDesc, keys: BTreeSet<&Vec<usize>>| {
1263                keys.iter()
1264                    .map(|key| key.iter().map(|&idx| desc.get_name(idx).clone()).collect())
1265                    .collect()
1266            };
1267            key_diff = Some(KeyDiff {
1268                left: column_names(self, left_keys),
1269                right: column_names(other, right_keys),
1270            });
1271        }
1272
1273        RelationDescDiff {
1274            column_diffs,
1275            key_diff,
1276        }
1277    }
1278
1279    /// Creates a new [`RelationDesc`] retaining only the columns specified in `demands`.
1280    pub fn apply_demand(&self, demands: &BTreeSet<usize>) -> RelationDesc {
1281        let mut new_desc = self.clone();
1282
1283        // Update ColumnMetadata.
1284        let mut removed = 0;
1285        new_desc.metadata.retain(|idx, metadata| {
1286            let retain = demands.contains(&idx.0);
1287            if !retain {
1288                removed += 1;
1289            } else {
1290                metadata.typ_idx -= removed;
1291            }
1292            retain
1293        });
1294
1295        // Update SqlColumnType.
1296        let mut idx = 0;
1297        new_desc.typ.column_types.retain(|_| {
1298            let keep = demands.contains(&idx);
1299            idx += 1;
1300            keep
1301        });
1302
1303        new_desc
1304    }
1305}
1306
1307impl Arbitrary for RelationDesc {
1308    type Parameters = ();
1309    type Strategy = BoxedStrategy<RelationDesc>;
1310
1311    fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy {
1312        let mut weights = vec![(100, Just(0..4)), (50, Just(4..8)), (25, Just(8..16))];
1313        if std::env::var("PROPTEST_LARGE_DATA").is_ok() {
1314            weights.extend([
1315                (12, Just(16..32)),
1316                (6, Just(32..64)),
1317                (3, Just(64..128)),
1318                (1, Just(128..256)),
1319            ]);
1320        }
1321        let num_columns = Union::new_weighted(weights);
1322
1323        num_columns.prop_flat_map(arb_relation_desc).boxed()
1324    }
1325}
1326
1327/// Returns a [`Strategy`] that generates an arbitrary [`RelationDesc`] with a number columns
1328/// within the range provided.
1329pub fn arb_relation_desc(num_cols: std::ops::Range<usize>) -> impl Strategy<Value = RelationDesc> {
1330    proptest::collection::btree_map(any::<ColumnName>(), any::<SqlColumnType>(), num_cols)
1331        .prop_map(RelationDesc::from_names_and_types)
1332}
1333
1334/// Returns a [`Strategy`] that generates a projection of the provided [`RelationDesc`].
1335pub fn arb_relation_desc_projection(desc: RelationDesc) -> impl Strategy<Value = RelationDesc> {
1336    let mask: Vec<_> = (0..desc.len()).map(|_| any::<bool>()).collect();
1337    mask.prop_map(move |mask| {
1338        let demands: BTreeSet<_> = mask
1339            .into_iter()
1340            .enumerate()
1341            .filter_map(|(idx, keep)| keep.then_some(idx))
1342            .collect();
1343        desc.apply_demand(&demands)
1344    })
1345}
1346
1347impl IntoIterator for RelationDesc {
1348    type Item = (ColumnName, SqlColumnType);
1349    type IntoIter = Box<dyn Iterator<Item = (ColumnName, SqlColumnType)>>;
1350
1351    fn into_iter(self) -> Self::IntoIter {
1352        let iter = self
1353            .metadata
1354            .into_values()
1355            .zip_eq(self.typ.column_types)
1356            .map(|(meta, typ)| (meta.name, typ));
1357        Box::new(iter)
1358    }
1359}
1360
1361/// Returns a [`Strategy`] that yields arbitrary [`Row`]s for the provided [`RelationDesc`].
1362pub fn arb_row_for_relation(desc: &RelationDesc) -> impl Strategy<Value = Row> + use<> {
1363    let datums: Vec<_> = desc
1364        .typ()
1365        .columns()
1366        .iter()
1367        .cloned()
1368        .map(arb_datum_for_column)
1369        .collect();
1370    datums.prop_map(|x| Row::pack(x.iter().map(Datum::from)))
1371}
1372
/// Error reporting that a null value was supplied for a column whose type is
/// not nullable.
///
/// The wrapped [`ColumnName`] identifies the offending column.
#[derive(Debug, PartialEq, Eq)]
pub struct NotNullViolation(pub ColumnName);
1376
1377impl fmt::Display for NotNullViolation {
1378    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1379        write!(
1380            f,
1381            "null value in column {} violates not-null constraint",
1382            self.0.quoted()
1383        )
1384    }
1385}
1386
/// The result of comparing two [`RelationDesc`]s, as produced by
/// [`RelationDesc::diff`].
///
/// The receiver of `diff` is referred to as the "left" relation and its
/// argument as the "right" relation.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RelationDescDiff {
    /// Column differences, keyed by column index. Columns without any
    /// difference have no entry.
    pub column_diffs: BTreeMap<usize, ColumnDiff>,
    /// Key differences, if any. `None` when both relations have the same set
    /// of keys.
    pub key_diff: Option<KeyDiff>,
}
1395
1396impl RelationDescDiff {
1397    /// Returns whether the diff contains any differences.
1398    pub fn is_empty(&self) -> bool {
1399        self.column_diffs.is_empty() && self.key_diff.is_none()
1400    }
1401}
1402
/// A difference in a column between two [`RelationDesc`]s.
///
/// [`RelationDesc::diff`] reports at most one `ColumnDiff` per column index.
/// For a column present on both sides it checks the name first, then the
/// scalar type, then the nullability, and reports only the first mismatch.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ColumnDiff {
    /// Column exists only in the left relation.
    Missing { name: ColumnName },
    /// Column exists only in the right relation.
    Extra { name: ColumnName },
    /// Columns have the same name but different scalar types.
    TypeMismatch {
        name: ColumnName,
        left: SqlScalarType,
        right: SqlScalarType,
    },
    /// Columns have the same name and scalar type but different nullability.
    NullabilityMismatch {
        name: ColumnName,
        left: bool,
        right: bool,
    },
    /// Columns have different names.
    NameMismatch { left: ColumnName, right: ColumnName },
}
1425
/// A difference in the keys of two [`RelationDesc`]s.
///
/// Each key is expressed as the list of names of the columns it covers rather
/// than as column indexes.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct KeyDiff {
    /// Keys of the left relation.
    pub left: BTreeSet<Vec<ColumnName>>,
    /// Keys of the right relation.
    pub right: BTreeSet<Vec<ColumnName>>,
}
1434
/// A builder for a [`RelationDesc`].
///
/// Columns and keys are accumulated through the `with_*` methods and turned
/// into a [`RelationDesc`] by [`RelationDescBuilder::finish`].
#[derive(Clone, Default, Debug, PartialEq, Eq)]
pub struct RelationDescBuilder {
    /// Columns of the relation, in the order they were appended.
    columns: Vec<(ColumnName, SqlColumnType)>,
    /// Sets of indices that are "keys" for the collection. Indices refer to
    /// positions in `columns`.
    keys: Vec<Vec<usize>>,
}
1443
1444impl RelationDescBuilder {
1445    /// Appends a column with the specified name and type.
1446    pub fn with_column<N: Into<ColumnName>>(
1447        mut self,
1448        name: N,
1449        ty: SqlColumnType,
1450    ) -> RelationDescBuilder {
1451        let name = name.into();
1452        self.columns.push((name, ty));
1453        self
1454    }
1455
1456    /// Appends the provided columns to the builder.
1457    pub fn with_columns<I, T, N>(mut self, iter: I) -> Self
1458    where
1459        I: IntoIterator<Item = (N, T)>,
1460        T: Into<SqlColumnType>,
1461        N: Into<ColumnName>,
1462    {
1463        self.columns
1464            .extend(iter.into_iter().map(|(name, ty)| (name.into(), ty.into())));
1465        self
1466    }
1467
1468    /// Adds a new key for the relation.
1469    pub fn with_key(mut self, mut indices: Vec<usize>) -> RelationDescBuilder {
1470        indices.sort_unstable();
1471        if !self.keys.contains(&indices) {
1472            self.keys.push(indices);
1473        }
1474        self
1475    }
1476
1477    /// Removes all previously inserted keys.
1478    pub fn without_keys(mut self) -> RelationDescBuilder {
1479        self.keys.clear();
1480        assert_eq!(self.keys.len(), 0);
1481        self
1482    }
1483
1484    /// Concatenates a [`RelationDescBuilder`] onto the end of this [`RelationDescBuilder`].
1485    pub fn concat(mut self, other: Self) -> Self {
1486        let self_len = self.columns.len();
1487
1488        self.columns.extend(other.columns);
1489        for k in other.keys {
1490            let k = k.into_iter().map(|idx| idx + self_len).collect();
1491            self = self.with_key(k);
1492        }
1493
1494        self
1495    }
1496
1497    /// Finish the builder, returning a [`RelationDesc`].
1498    pub fn finish(self) -> RelationDesc {
1499        let mut desc = RelationDesc::from_names_and_types(self.columns);
1500        desc.typ = desc.typ.with_keys(self.keys);
1501        desc
1502    }
1503}
1504
/// Describes a [`RelationDesc`] at a specific version of a [`VersionedRelationDesc`].
///
/// Consumed by [`VersionedRelationDesc::at_version`].
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord, Serialize)]
pub enum RelationVersionSelector {
    /// Selects the relation as of the given [`RelationVersion`].
    Specific(RelationVersion),
    /// Selects the relation with every recorded change applied.
    Latest,
}
1511
1512impl RelationVersionSelector {
1513    pub fn specific(version: u64) -> Self {
1514        RelationVersionSelector::Specific(RelationVersion(version))
1515    }
1516}
1517
/// A wrapper around [`RelationDesc`] that provides an interface for adding
/// columns and generating new versions.
///
/// Adding or dropping a column produces a new [`RelationVersion`]. Dropped
/// columns are only marked as dropped in the wrapped description, so earlier
/// versions can still be reconstructed with
/// [`VersionedRelationDesc::at_version`].
///
/// TODO(parkmycar): Using an immutable data structure for RelationDesc would
/// be great.
#[derive(Debug, Clone, Serialize)]
pub struct VersionedRelationDesc {
    /// The description holding every column ever added, including columns
    /// that have since been marked as dropped.
    inner: RelationDesc,
}
1527
1528impl VersionedRelationDesc {
1529    pub fn new(inner: RelationDesc) -> Self {
1530        VersionedRelationDesc { inner }
1531    }
1532
1533    /// Adds a new column to this [`RelationDesc`], creating a new version of the [`RelationDesc`].
1534    ///
1535    /// # Panics
1536    ///
1537    /// * Panics if a column with `name` already exists that hasn't been dropped.
1538    ///
1539    /// Note: For building a [`RelationDesc`] see [`RelationDescBuilder::with_column`].
1540    #[must_use]
1541    pub fn add_column<N, T>(&mut self, name: N, typ: T) -> RelationVersion
1542    where
1543        N: Into<ColumnName>,
1544        T: Into<SqlColumnType>,
1545    {
1546        let latest_version = self.latest_version();
1547        let new_version = latest_version.bump();
1548
1549        let name = name.into();
1550        let existing = self
1551            .inner
1552            .metadata
1553            .iter()
1554            .find(|(_, meta)| meta.name == name && meta.dropped.is_none());
1555        if let Some(existing) = existing {
1556            panic!("column named '{name}' already exists! {existing:?}");
1557        }
1558
1559        let next_idx = self.inner.metadata.len();
1560        let col_meta = ColumnMetadata {
1561            name,
1562            typ_idx: next_idx,
1563            added: new_version,
1564            dropped: None,
1565        };
1566
1567        self.inner.typ.column_types.push(typ.into());
1568        let prev = self.inner.metadata.insert(ColumnIndex(next_idx), col_meta);
1569
1570        assert_none!(prev, "column index overlap!");
1571        self.validate();
1572
1573        new_version
1574    }
1575
1576    /// Drops the column `name` from this [`RelationDesc`]. If there are multiple columns with
1577    /// `name` drops the left-most one that hasn't already been dropped.
1578    ///
1579    /// TODO(parkmycar): Add handling for dropping a column that is currently used as a key.
1580    ///
1581    /// # Panics
1582    ///
1583    /// Panics if a column with `name` does not exist or the dropped column was used as a key.
1584    #[must_use]
1585    pub fn drop_column<N>(&mut self, name: N) -> RelationVersion
1586    where
1587        N: Into<ColumnName>,
1588    {
1589        let name = name.into();
1590        let latest_version = self.latest_version();
1591        let new_version = latest_version.bump();
1592
1593        let col = self
1594            .inner
1595            .metadata
1596            .values_mut()
1597            .find(|meta| meta.name == name && meta.dropped.is_none())
1598            .expect("column to exist");
1599
1600        // Make sure the column hadn't been previously dropped.
1601        assert_none!(col.dropped, "column was already dropped");
1602        col.dropped = Some(new_version);
1603
1604        // Make sure the column isn't being used as a key.
1605        let dropped_key = self
1606            .inner
1607            .typ
1608            .keys
1609            .iter()
1610            .any(|keys| keys.contains(&col.typ_idx));
1611        assert!(!dropped_key, "column being dropped was used as a key");
1612
1613        self.validate();
1614        new_version
1615    }
1616
1617    /// Returns the [`RelationDesc`] at the latest version.
1618    pub fn latest(&self) -> RelationDesc {
1619        self.inner.clone()
1620    }
1621
1622    /// Returns this [`RelationDesc`] at the specified version.
1623    pub fn at_version(&self, version: RelationVersionSelector) -> RelationDesc {
1624        // Get all of the changes from the start, up to whatever version was requested.
1625        let up_to_version = match version {
1626            RelationVersionSelector::Latest => RelationVersion(u64::MAX),
1627            RelationVersionSelector::Specific(v) => v,
1628        };
1629
1630        let valid_columns = self.inner.metadata.iter().filter(|(_col_idx, meta)| {
1631            let added = meta.added <= up_to_version;
1632            let dropped = meta
1633                .dropped
1634                .map(|dropped_at| up_to_version >= dropped_at)
1635                .unwrap_or(false);
1636
1637            added && !dropped
1638        });
1639
1640        let mut column_types = Vec::new();
1641        let mut column_metas = BTreeMap::new();
1642
1643        // N.B. At this point we need to be careful because col_idx might not
1644        // equal typ_idx.
1645        //
1646        // For example, consider columns "a", "b", and "c" with indexes 0, 1,
1647        // and 2. If we drop column "b" then we'll have "a" and "c" with column
1648        // indexes 0 and 2, but their indices in SqlRelationType will be 0 and 1.
1649        for (col_idx, meta) in valid_columns {
1650            let new_meta = ColumnMetadata {
1651                name: meta.name.clone(),
1652                typ_idx: column_types.len(),
1653                added: meta.added.clone(),
1654                dropped: meta.dropped.clone(),
1655            };
1656            column_types.push(self.inner.typ.columns()[meta.typ_idx].clone());
1657            column_metas.insert(*col_idx, new_meta);
1658        }
1659
1660        // Remap keys in case a column with an index less than that of a key was
1661        // dropped.
1662        //
1663        // For example, consider columns "a", "b", and "c" where "a" and "c" are
1664        // keys and "b" was dropped.
1665        let keys = self
1666            .inner
1667            .typ
1668            .keys
1669            .iter()
1670            .map(|keys| {
1671                keys.iter()
1672                    .map(|key_idx| {
1673                        let metadata = column_metas
1674                            .get(&ColumnIndex(*key_idx))
1675                            .expect("found key for column that doesn't exist");
1676                        metadata.typ_idx
1677                    })
1678                    .collect()
1679            })
1680            .collect();
1681
1682        let relation_type = SqlRelationType { column_types, keys };
1683
1684        RelationDesc {
1685            typ: relation_type,
1686            metadata: column_metas,
1687        }
1688    }
1689
1690    pub fn latest_version(&self) -> RelationVersion {
1691        self.inner
1692            .metadata
1693            .values()
1694            // N.B. Dropped is always greater than added.
1695            .map(|meta| meta.dropped.unwrap_or(meta.added))
1696            .max()
1697            // If there aren't any columns we're implicitly the root version.
1698            .unwrap_or_else(RelationVersion::root)
1699    }
1700
1701    /// Validates internal contraints of the [`RelationDesc`] are correct.
1702    ///
1703    /// # Panics
1704    ///
1705    /// Panics if a constraint is not satisfied.
1706    fn validate(&self) {
1707        fn validate_inner(desc: &RelationDesc) -> Result<(), anyhow::Error> {
1708            if desc.typ.column_types.len() != desc.metadata.len() {
1709                anyhow::bail!("mismatch between number of types and metadatas");
1710            }
1711
1712            for (col_idx, meta) in &desc.metadata {
1713                if col_idx.0 > desc.metadata.len() {
1714                    anyhow::bail!("column index out of bounds");
1715                }
1716                if meta.added >= meta.dropped.unwrap_or(RelationVersion(u64::MAX)) {
1717                    anyhow::bail!("column was added after it was dropped?");
1718                }
1719                if desc.typ().columns().get(meta.typ_idx).is_none() {
1720                    anyhow::bail!("typ_idx incorrect");
1721                }
1722            }
1723
1724            for keys in &desc.typ.keys {
1725                for key in keys {
1726                    if *key >= desc.typ.column_types.len() {
1727                        anyhow::bail!("key index was out of bounds!");
1728                    }
1729                }
1730            }
1731
1732            let versions = desc
1733                .metadata
1734                .values()
1735                .map(|meta| meta.dropped.unwrap_or(meta.added));
1736            let mut max = 0;
1737            let mut sum = 0;
1738            for version in versions {
1739                max = std::cmp::max(max, version.0);
1740                sum += version.0;
1741            }
1742
1743            // Other than RelationVersion(0), we should never have duplicate
1744            // versions and they should always increase by 1. In other words, the
1745            // sum of all RelationVersions should be the sum of [0, max].
1746            //
1747            // N.B. n * (n + 1) / 2 = sum of [0, n]
1748            //
1749            // While I normally don't like tricks like this, it allows us to
1750            // validate that our column versions are correct in O(n) time and
1751            // without allocations.
1752            if sum != (max * (max + 1) / 2) {
1753                anyhow::bail!("there is a duplicate or missing relation version");
1754            }
1755
1756            Ok(())
1757        }
1758
1759        assert_ok!(validate_inner(&self.inner), "validate failed! {self:?}");
1760    }
1761}
1762
/// Diffs that can be generated by proptest and applied to a [`RelationDesc`] to
/// exercise schema migrations.
///
/// Each variant describes a single change; see [`PropRelationDescDiff::apply`]
/// for how a change is carried out.
#[derive(Debug)]
pub enum PropRelationDescDiff {
    /// Append a new column with the given name and type.
    AddColumn {
        name: ColumnName,
        typ: SqlColumnType,
    },
    /// Mark the column with the given name as dropped.
    DropColumn {
        name: ColumnName,
    },
    /// Flip the nullability of the column with the given name.
    ToggleNullability {
        name: ColumnName,
    },
    /// Replace the type of the column with the given name.
    ChangeType {
        name: ColumnName,
        typ: SqlColumnType,
    },
}
1782
1783impl PropRelationDescDiff {
1784    pub fn apply(self, desc: &mut RelationDesc) {
1785        match self {
1786            PropRelationDescDiff::AddColumn { name, typ } => {
1787                let new_idx = desc.metadata.len();
1788                let meta = ColumnMetadata {
1789                    name,
1790                    typ_idx: new_idx,
1791                    added: RelationVersion(0),
1792                    dropped: None,
1793                };
1794                let prev = desc.metadata.insert(ColumnIndex(new_idx), meta);
1795                desc.typ.column_types.push(typ);
1796
1797                assert_none!(prev);
1798                assert_eq!(desc.metadata.len(), desc.typ.column_types.len());
1799            }
1800            PropRelationDescDiff::DropColumn { name } => {
1801                let next_version = desc
1802                    .metadata
1803                    .values()
1804                    .map(|meta| meta.dropped.unwrap_or(meta.added))
1805                    .max()
1806                    .unwrap_or_else(RelationVersion::root)
1807                    .bump();
1808                let Some(metadata) = desc.metadata.values_mut().find(|meta| meta.name == name)
1809                else {
1810                    return;
1811                };
1812                if metadata.dropped.is_none() {
1813                    metadata.dropped = Some(next_version);
1814                }
1815            }
1816            PropRelationDescDiff::ToggleNullability { name } => {
1817                let Some((pos, _)) = desc.get_by_name(&name) else {
1818                    return;
1819                };
1820                let col_type = desc
1821                    .typ
1822                    .column_types
1823                    .get_mut(pos)
1824                    .expect("ColumnNames and SqlColumnTypes out of sync!");
1825                col_type.nullable = !col_type.nullable;
1826            }
1827            PropRelationDescDiff::ChangeType { name, typ } => {
1828                let Some((pos, _)) = desc.get_by_name(&name) else {
1829                    return;
1830                };
1831                let col_type = desc
1832                    .typ
1833                    .column_types
1834                    .get_mut(pos)
1835                    .expect("ColumnNames and SqlColumnTypes out of sync!");
1836                *col_type = typ;
1837            }
1838        }
1839    }
1840}
1841
1842/// Generates a set of [`PropRelationDescDiff`]s based on some source [`RelationDesc`].
1843pub fn arb_relation_desc_diff(
1844    source: &RelationDesc,
1845) -> impl Strategy<Value = Vec<PropRelationDescDiff>> + use<> {
1846    let source = Rc::new(source.clone());
1847    let num_source_columns = source.typ.columns().len();
1848
1849    let num_add_columns = Union::new_weighted(vec![(100, Just(0..8)), (1, Just(8..64))]);
1850    let add_columns_strat = num_add_columns
1851        .prop_flat_map(|num_columns| {
1852            proptest::collection::vec((any::<ColumnName>(), any::<SqlColumnType>()), num_columns)
1853        })
1854        .prop_map(|cols| {
1855            cols.into_iter()
1856                .map(|(name, typ)| PropRelationDescDiff::AddColumn { name, typ })
1857                .collect::<Vec<_>>()
1858        });
1859
1860    // If the source RelationDesc is empty there is nothing else to do.
1861    if num_source_columns == 0 {
1862        return add_columns_strat.boxed();
1863    }
1864
1865    let source_ = Rc::clone(&source);
1866    let drop_columns_strat = (0..num_source_columns).prop_perturb(move |num_columns, mut rng| {
1867        let mut set = BTreeSet::default();
1868        for _ in 0..num_columns {
1869            let col_idx = rng.random_range(0..num_source_columns);
1870            set.insert(source_.get_name(col_idx).clone());
1871        }
1872        set.into_iter()
1873            .map(|name| PropRelationDescDiff::DropColumn { name })
1874            .collect::<Vec<_>>()
1875    });
1876
1877    let source_ = Rc::clone(&source);
1878    let toggle_nullability_strat =
1879        (0..num_source_columns).prop_perturb(move |num_columns, mut rng| {
1880            let mut set = BTreeSet::default();
1881            for _ in 0..num_columns {
1882                let col_idx = rng.random_range(0..num_source_columns);
1883                set.insert(source_.get_name(col_idx).clone());
1884            }
1885            set.into_iter()
1886                .map(|name| PropRelationDescDiff::ToggleNullability { name })
1887                .collect::<Vec<_>>()
1888        });
1889
1890    let source_ = Rc::clone(&source);
1891    let change_type_strat = (0..num_source_columns)
1892        .prop_perturb(move |num_columns, mut rng| {
1893            let mut set = BTreeSet::default();
1894            for _ in 0..num_columns {
1895                let col_idx = rng.random_range(0..num_source_columns);
1896                set.insert(source_.get_name(col_idx).clone());
1897            }
1898            set
1899        })
1900        .prop_flat_map(|cols| {
1901            proptest::collection::vec(any::<SqlColumnType>(), cols.len())
1902                .prop_map(move |types| (cols.clone(), types))
1903        })
1904        .prop_map(|(cols, types)| {
1905            cols.into_iter()
1906                .zip_eq(types)
1907                .map(|(name, typ)| PropRelationDescDiff::ChangeType { name, typ })
1908                .collect::<Vec<_>>()
1909        });
1910
1911    (
1912        add_columns_strat,
1913        drop_columns_strat,
1914        toggle_nullability_strat,
1915        change_type_strat,
1916    )
1917        .prop_map(|(adds, drops, toggles, changes)| {
1918            adds.into_iter()
1919                .chain(drops)
1920                .chain(toggles)
1921                .chain(changes)
1922                .collect::<Vec<_>>()
1923        })
1924        .prop_shuffle()
1925        .boxed()
1926}
1927
1928#[cfg(test)]
1929mod tests {
1930    use super::*;
1931    use prost::Message;
1932
    /// Walks a [`VersionedRelationDesc`] through adding and then dropping a
    /// column, checking what `at_version` returns at every step and that the
    /// views of earlier versions are not disturbed by later changes.
    // NOTE(review): `smoktest` looks like a typo for `smoketest`; left as is
    // because renaming changes the test's identifier.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `pipe2` on OS `linux`
    fn smoktest_at_version() {
        let desc = RelationDesc::builder()
            .with_column("a", SqlScalarType::Bool.nullable(true))
            .with_column("z", SqlScalarType::String.nullable(false))
            .finish();

        let mut versioned_desc = VersionedRelationDesc {
            inner: desc.clone(),
        };
        versioned_desc.validate();

        // Before any changes are made every selector should return the
        // original description, including a version that does not exist yet.
        let latest = versioned_desc.at_version(RelationVersionSelector::Latest);
        assert_eq!(desc, latest);

        let v0 = versioned_desc.at_version(RelationVersionSelector::specific(0));
        assert_eq!(desc, v0);

        let v3 = versioned_desc.at_version(RelationVersionSelector::specific(3));
        assert_eq!(desc, v3);

        // Adding a column creates version 1.
        let v1 = versioned_desc.add_column("b", SqlScalarType::Bytes.nullable(false));
        assert_eq!(v1, RelationVersion(1));

        let v1 = versioned_desc.at_version(RelationVersionSelector::Specific(v1));
        insta::assert_json_snapshot!(v1.metadata, @r###"
        {
          "0": {
            "name": "a",
            "typ_idx": 0,
            "added": 0,
            "dropped": null
          },
          "1": {
            "name": "z",
            "typ_idx": 1,
            "added": 0,
            "dropped": null
          },
          "2": {
            "name": "b",
            "typ_idx": 2,
            "added": 1,
            "dropped": null
          }
        }
        "###);

        // Check that V0 doesn't show the new column.
        let v0_b = versioned_desc.at_version(RelationVersionSelector::specific(0));
        assert!(v0.iter().eq(v0_b.iter()));

        // Dropping a column creates version 2.
        let v2 = versioned_desc.drop_column("z");
        assert_eq!(v2, RelationVersion(2));

        // At version 2 column 'z' is gone, and 'b' keeps its column index (2)
        // but moves to `typ_idx` 1.
        let v2 = versioned_desc.at_version(RelationVersionSelector::Specific(v2));
        insta::assert_json_snapshot!(v2.metadata, @r###"
        {
          "0": {
            "name": "a",
            "typ_idx": 0,
            "added": 0,
            "dropped": null
          },
          "2": {
            "name": "b",
            "typ_idx": 1,
            "added": 1,
            "dropped": null
          }
        }
        "###);

        // Check that V0 and V1 are still correct.
        let v0_c = versioned_desc.at_version(RelationVersionSelector::specific(0));
        assert!(v0.iter().eq(v0_c.iter()));

        let v1_b = versioned_desc.at_version(RelationVersionSelector::specific(1));
        assert!(v1.iter().eq(v1_b.iter()));

        // The inner description retains every column, with the dropped column
        // recording the version it was dropped at.
        insta::assert_json_snapshot!(versioned_desc.inner.metadata, @r###"
        {
          "0": {
            "name": "a",
            "typ_idx": 0,
            "added": 0,
            "dropped": null
          },
          "1": {
            "name": "z",
            "typ_idx": 1,
            "added": 0,
            "dropped": 2
          },
          "2": {
            "name": "b",
            "typ_idx": 2,
            "added": 1,
            "dropped": null
          }
        }
        "###);
    }
2037
    /// Checks that key column indexes are remapped when a column that precedes
    /// a key column is dropped, and that the view of the original version
    /// still reports the original key index.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `pipe2` on OS `linux`
    fn test_dropping_columns_with_keys() {
        // Column 'z' (index 1) is the only key.
        let desc = RelationDesc::builder()
            .with_column("a", SqlScalarType::Bool.nullable(true))
            .with_column("z", SqlScalarType::String.nullable(false))
            .with_key(vec![1])
            .finish();

        let mut versioned_desc = VersionedRelationDesc {
            inner: desc.clone(),
        };
        versioned_desc.validate();

        let v1 = versioned_desc.drop_column("a");
        assert_eq!(v1, RelationVersion(1));

        // Make sure the key index for 'z' got remapped since 'a' was dropped.
        let v1 = versioned_desc.at_version(RelationVersionSelector::Specific(v1));
        insta::assert_json_snapshot!(v1, @r###"
        {
          "typ": {
            "column_types": [
              {
                "scalar_type": "String",
                "nullable": false
              }
            ],
            "keys": [
              [
                0
              ]
            ]
          },
          "metadata": {
            "1": {
              "name": "z",
              "typ_idx": 0,
              "added": 0,
              "dropped": null
            }
          }
        }
        "###);

        // Make sure the key index of 'z' is correct when all columns are present.
        let v0 = versioned_desc.at_version(RelationVersionSelector::specific(0));
        insta::assert_json_snapshot!(v0, @r###"
        {
          "typ": {
            "column_types": [
              {
                "scalar_type": "Bool",
                "nullable": true
              },
              {
                "scalar_type": "String",
                "nullable": false
              }
            ],
            "keys": [
              [
                1
              ]
            ]
          },
          "metadata": {
            "0": {
              "name": "a",
              "typ_idx": 0,
              "added": 0,
              "dropped": 1
            },
            "1": {
              "name": "z",
              "typ_idx": 1,
              "added": 0,
              "dropped": null
            }
          }
        }
        "###);
    }
2121
    /// Decodes a [`ProtoRelationDesc`] that carries column names but an empty
    /// `metadata` list, and checks that the resulting [`RelationDesc`] has
    /// metadata for every named column.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `pipe2` on OS `linux`
    fn roundtrip_relation_desc_without_metadata() {
        let typ = ProtoRelationType {
            column_types: vec![
                SqlScalarType::String.nullable(false).into_proto(),
                SqlScalarType::Bool.nullable(true).into_proto(),
            ],
            keys: vec![],
        };
        // Note: `metadata` is intentionally left empty, only `names` is set.
        // NOTE(review): presumably this mirrors protos written before column
        // metadata existed; confirm against the `RustType` impl.
        let proto = ProtoRelationDesc {
            typ: Some(typ),
            names: vec![
                ColumnName("a".into()).into_proto(),
                ColumnName("b".into()).into_proto(),
            ],
            metadata: vec![],
        };
        let desc: RelationDesc = proto.into_rust().unwrap();

        // Every column should show up in positional order, added at version 0
        // and never dropped.
        insta::assert_json_snapshot!(desc, @r###"
        {
          "typ": {
            "column_types": [
              {
                "scalar_type": "String",
                "nullable": false
              },
              {
                "scalar_type": "Bool",
                "nullable": true
              }
            ],
            "keys": []
          },
          "metadata": {
            "0": {
              "name": "a",
              "typ_idx": 0,
              "added": 0,
              "dropped": null
            },
            "1": {
              "name": "b",
              "typ_idx": 1,
              "added": 0,
              "dropped": null
            }
          }
        }
        "###);
    }
2174
2175    #[mz_ore::test]
2176    #[should_panic(expected = "column named 'a' already exists!")]
2177    fn test_add_column_with_same_name_panics() {
2178        let desc = RelationDesc::builder()
2179            .with_column("a", SqlScalarType::Bool.nullable(true))
2180            .finish();
2181        let mut versioned = VersionedRelationDesc::new(desc);
2182
2183        let _ = versioned.add_column("a", SqlScalarType::String.nullable(false));
2184    }
2185
    /// Checks that the name of a dropped column can be re-used by a column
    /// that gets added later.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `pipe2` on OS `linux`
    fn test_add_column_with_same_name_prev_dropped() {
        let desc = RelationDesc::builder()
            .with_column("a", SqlScalarType::Bool.nullable(true))
            .finish();
        let mut versioned = VersionedRelationDesc::new(desc);

        // After dropping the only column the description should be empty.
        let v1 = versioned.drop_column("a");
        let v1 = versioned.at_version(RelationVersionSelector::Specific(v1));
        insta::assert_json_snapshot!(v1, @r###"
        {
          "typ": {
            "column_types": [],
            "keys": []
          },
          "metadata": {}
        }
        "###);

        // Re-adding 'a' creates a brand new column, it gets a new column index
        // (1) and records the version it was added at (2).
        let v2 = versioned.add_column("a", SqlScalarType::String.nullable(false));
        let v2 = versioned.at_version(RelationVersionSelector::Specific(v2));
        insta::assert_json_snapshot!(v2, @r###"
        {
          "typ": {
            "column_types": [
              {
                "scalar_type": "String",
                "nullable": false
              }
            ],
            "keys": []
          },
          "metadata": {
            "1": {
              "name": "a",
              "typ_idx": 0,
              "added": 2,
              "dropped": null
            }
          }
        }
        "###);
    }
2230
2231    #[mz_ore::test]
2232    #[cfg_attr(miri, ignore)]
2233    fn apply_demand() {
2234        let desc = RelationDesc::builder()
2235            .with_column("a", SqlScalarType::String.nullable(true))
2236            .with_column("b", SqlScalarType::Int64.nullable(false))
2237            .with_column("c", SqlScalarType::Time.nullable(false))
2238            .finish();
2239        let desc = desc.apply_demand(&BTreeSet::from([0, 2]));
2240        assert_eq!(desc.arity(), 2);
2241        // TODO(parkmycar): Move validate onto RelationDesc.
2242        VersionedRelationDesc::new(desc).validate();
2243    }
2244
2245    #[mz_ore::test]
2246    #[cfg_attr(miri, ignore)]
2247    fn smoketest_column_index_stable_ident() {
2248        let idx_a = ColumnIndex(42);
2249        // Note(parkmycar): This should never change.
2250        assert_eq!(idx_a.to_stable_name(), "42");
2251    }
2252
2253    #[mz_ore::test]
2254    #[cfg_attr(miri, ignore)] // too slow
2255    fn proptest_relation_desc_roundtrips() {
2256        fn testcase(og: RelationDesc) {
2257            let bytes = og.into_proto().encode_to_vec();
2258            let proto = ProtoRelationDesc::decode(&bytes[..]).unwrap();
2259            let rnd = RelationDesc::from_proto(proto).unwrap();
2260
2261            assert_eq!(og, rnd);
2262        }
2263
2264        proptest!(|(desc in any::<RelationDesc>())| {
2265            testcase(desc);
2266        });
2267
2268        let strat = any::<RelationDesc>().prop_flat_map(|desc| {
2269            arb_relation_desc_diff(&desc).prop_map(move |diffs| (desc.clone(), diffs))
2270        });
2271
2272        proptest!(|((mut desc, diffs) in strat)| {
2273            for diff in diffs {
2274                diff.apply(&mut desc);
2275            };
2276            testcase(desc);
2277        });
2278    }
2279}