Skip to main content

mz_repr/
relation.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::{BTreeMap, BTreeSet};
11use std::rc::Rc;
12use std::{fmt, vec};
13
14use anyhow::bail;
15use itertools::Itertools;
16use mz_lowertest::MzReflect;
17use mz_ore::cast::CastFrom;
18use mz_ore::str::StrExt;
19use mz_ore::{assert_none, assert_ok};
20use mz_persist_types::schema::SchemaId;
21use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
22use proptest::prelude::*;
23use proptest::strategy::{Strategy, Union};
24use proptest_derive::Arbitrary;
25use serde::{Deserialize, Serialize};
26
27use crate::relation_and_scalar::proto_relation_type::ProtoKey;
28pub use crate::relation_and_scalar::{
29    ProtoColumnMetadata, ProtoColumnName, ProtoColumnType, ProtoRelationDesc, ProtoRelationType,
30    ProtoRelationVersion,
31};
32use crate::{Datum, ReprScalarType, Row, SqlScalarType, arb_datum_for_column};
33
34/// The type of a [`Datum`].
35///
36/// [`SqlColumnType`] bundles information about the scalar type of a datum (e.g.,
37/// Int32 or String) with its nullability.
38///
39/// To construct a column type, either initialize the struct directly, or
40/// use the [`SqlScalarType::nullable`] method.
41#[derive(
42    Arbitrary, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash, MzReflect,
43)]
44pub struct SqlColumnType {
45    /// The underlying scalar type (e.g., Int32 or String) of this column.
46    pub scalar_type: SqlScalarType,
47    /// Whether this datum can be null.
48    #[serde(default = "return_true")]
49    pub nullable: bool,
50}
51
52/// This method exists solely for the purpose of making SqlColumnType nullable by
53/// default in unit tests. The default value of a bool is false, and the only
54/// way to make an object take on any other value by default is to pass it a
55/// function that returns the desired default value. See
56/// <https://github.com/serde-rs/serde/issues/1030>
57#[inline(always)]
58fn return_true() -> bool {
59    true
60}
61
62impl SqlColumnType {
63    pub fn backport_nullability(&mut self, backport_typ: &SqlColumnType) {
64        self.scalar_type
65            .backport_nullability(&backport_typ.scalar_type);
66        self.nullable = backport_typ.nullable;
67    }
68
69    pub fn union(&self, other: &Self) -> Result<Self, anyhow::Error> {
70        match (&self.scalar_type, &other.scalar_type) {
71            (scalar_type, other_scalar_type) if scalar_type == other_scalar_type => {
72                Ok(SqlColumnType {
73                    scalar_type: scalar_type.clone(),
74                    nullable: self.nullable || other.nullable,
75                })
76            }
77            (scalar_type, other_scalar_type) if scalar_type.base_eq(other_scalar_type) => {
78                Ok(SqlColumnType {
79                    scalar_type: scalar_type.without_modifiers(),
80                    nullable: self.nullable || other.nullable,
81                })
82            }
83            (
84                SqlScalarType::Record { fields, custom_id },
85                SqlScalarType::Record {
86                    fields: other_fields,
87                    custom_id: other_custom_id,
88                },
89            ) => {
90                if custom_id != other_custom_id {
91                    bail!(
92                        "Can't union types: {:?} and {:?}",
93                        self.scalar_type,
94                        other.scalar_type
95                    );
96                };
97
98                if fields.len() != other_fields.len() {
99                    bail!(
100                        "Can't union types: {:?} and {:?}",
101                        self.scalar_type,
102                        other.scalar_type
103                    );
104                }
105                let mut union_fields = Vec::with_capacity(fields.len());
106                for ((name, typ), (other_name, other_typ)) in
107                    fields.iter().zip_eq(other_fields.iter())
108                {
109                    if name != other_name {
110                        bail!(
111                            "Can't union types: {:?} and {:?}",
112                            self.scalar_type,
113                            other.scalar_type
114                        );
115                    } else {
116                        let union_column_type = typ.union(other_typ)?;
117                        union_fields.push((name.clone(), union_column_type));
118                    };
119                }
120
121                Ok(SqlColumnType {
122                    scalar_type: SqlScalarType::Record {
123                        fields: union_fields.into(),
124                        custom_id: *custom_id,
125                    },
126                    nullable: self.nullable || other.nullable,
127                })
128            }
129            _ => bail!(
130                "Can't union types: {:?} and {:?}",
131                self.scalar_type,
132                other.scalar_type
133            ),
134        }
135    }
136
137    /// Consumes this `SqlColumnType` and returns a new `SqlColumnType` with its
138    /// nullability set to the specified boolean.
139    pub fn nullable(mut self, nullable: bool) -> Self {
140        self.nullable = nullable;
141        self
142    }
143}
144
145impl RustType<ProtoColumnType> for SqlColumnType {
146    fn into_proto(&self) -> ProtoColumnType {
147        ProtoColumnType {
148            nullable: self.nullable,
149            scalar_type: Some(self.scalar_type.into_proto()),
150        }
151    }
152
153    fn from_proto(proto: ProtoColumnType) -> Result<Self, TryFromProtoError> {
154        Ok(SqlColumnType {
155            nullable: proto.nullable,
156            scalar_type: proto
157                .scalar_type
158                .into_rust_if_some("ProtoColumnType::scalar_type")?,
159        })
160    }
161}
162
163impl fmt::Display for SqlColumnType {
164    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
165        let nullable = if self.nullable { "Null" } else { "NotNull" };
166        f.write_fmt(format_args!("{:?}:{}", self.scalar_type, nullable))
167    }
168}
169
170/// The type of a relation.
171#[derive(
172    Arbitrary, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash, MzReflect,
173)]
174pub struct SqlRelationType {
175    /// The type for each column, in order.
176    pub column_types: Vec<SqlColumnType>,
177    /// Sets of indices that are "keys" for the collection.
178    ///
179    /// Each element in this list is a set of column indices, each with the
180    /// property that the collection contains at most one record with each
181    /// distinct set of values for each column. Alternately, for a specific set
182    /// of values assigned to the these columns there is at most one record.
183    ///
184    /// A collection can contain multiple sets of keys, although it is common to
185    /// have either zero or one sets of key indices.
186    #[serde(default)]
187    pub keys: Vec<Vec<usize>>,
188}
189
190impl SqlRelationType {
191    /// Constructs a `SqlRelationType` representing the relation with no columns and
192    /// no keys.
193    pub fn empty() -> Self {
194        SqlRelationType::new(vec![])
195    }
196
197    /// Constructs a new `SqlRelationType` from specified column types.
198    ///
199    /// The `SqlRelationType` will have no keys.
200    pub fn new(column_types: Vec<SqlColumnType>) -> Self {
201        SqlRelationType {
202            column_types,
203            keys: Vec::new(),
204        }
205    }
206
207    /// Adds a new key for the relation.
208    pub fn with_key(mut self, mut indices: Vec<usize>) -> Self {
209        indices.sort_unstable();
210        if !self.keys.contains(&indices) {
211            self.keys.push(indices);
212        }
213        self
214    }
215
216    pub fn with_keys(mut self, keys: Vec<Vec<usize>>) -> Self {
217        for key in keys {
218            self = self.with_key(key)
219        }
220        self
221    }
222
223    /// Computes the number of columns in the relation.
224    pub fn arity(&self) -> usize {
225        self.column_types.len()
226    }
227
228    /// Gets the index of the columns used when creating a default index.
229    pub fn default_key(&self) -> Vec<usize> {
230        if let Some(key) = self.keys.first() {
231            if key.is_empty() {
232                (0..self.column_types.len()).collect()
233            } else {
234                key.clone()
235            }
236        } else {
237            (0..self.column_types.len()).collect()
238        }
239    }
240
241    /// Returns all the [`SqlColumnType`]s, in order, for this relation.
242    pub fn columns(&self) -> &[SqlColumnType] {
243        &self.column_types
244    }
245
246    /// Adopts the nullability and keys from another `SqlRelationType`.
247    ///
248    /// Panics if the number of columns does not match.
249    pub fn backport_nullability_and_keys(&mut self, backport_typ: &SqlRelationType) {
250        assert_eq!(
251            backport_typ.column_types.len(),
252            self.column_types.len(),
253            "HIR and MIR types should have the same number of columns"
254        );
255        for (backport_col, sql_col) in backport_typ
256            .column_types
257            .iter()
258            .zip_eq(self.column_types.iter_mut())
259        {
260            sql_col.backport_nullability(backport_col);
261        }
262
263        self.keys = backport_typ.keys.clone();
264    }
265}
266
267impl RustType<ProtoRelationType> for SqlRelationType {
268    fn into_proto(&self) -> ProtoRelationType {
269        ProtoRelationType {
270            column_types: self.column_types.into_proto(),
271            keys: self.keys.into_proto(),
272        }
273    }
274
275    fn from_proto(proto: ProtoRelationType) -> Result<Self, TryFromProtoError> {
276        Ok(SqlRelationType {
277            column_types: proto.column_types.into_rust()?,
278            keys: proto.keys.into_rust()?,
279        })
280    }
281}
282
283impl RustType<ProtoKey> for Vec<usize> {
284    fn into_proto(&self) -> ProtoKey {
285        ProtoKey {
286            keys: self.into_proto(),
287        }
288    }
289
290    fn from_proto(proto: ProtoKey) -> Result<Self, TryFromProtoError> {
291        proto.keys.into_rust()
292    }
293}
294
295/// The type of a relation.
296#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash)]
297pub struct ReprRelationType {
298    /// The type for each column, in order.
299    pub column_types: Vec<ReprColumnType>,
300    /// Sets of indices that are "keys" for the collection.
301    ///
302    /// Each element in this list is a set of column indices, each with the
303    /// property that the collection contains at most one record with each
304    /// distinct set of values for each column. Alternately, for a specific set
305    /// of values assigned to the these columns there is at most one record.
306    ///
307    /// A collection can contain multiple sets of keys, although it is common to
308    /// have either zero or one sets of key indices.
309    #[serde(default)]
310    pub keys: Vec<Vec<usize>>,
311}
312
313impl ReprRelationType {
314    /// Constructs a `ReprRelationType` representing the relation with no columns and
315    /// no keys.
316    pub fn empty() -> Self {
317        ReprRelationType::new(vec![])
318    }
319
320    /// Constructs a new `ReprRelationType` from specified column types.
321    ///
322    /// The `ReprRelationType` will have no keys.
323    pub fn new(column_types: Vec<ReprColumnType>) -> Self {
324        ReprRelationType {
325            column_types,
326            keys: Vec::new(),
327        }
328    }
329
330    /// Adds a new key for the relation.
331    pub fn with_key(mut self, mut indices: Vec<usize>) -> Self {
332        indices.sort_unstable();
333        if !self.keys.contains(&indices) {
334            self.keys.push(indices);
335        }
336        self
337    }
338
339    pub fn with_keys(mut self, keys: Vec<Vec<usize>>) -> Self {
340        for key in keys {
341            self = self.with_key(key)
342        }
343        self
344    }
345
346    /// Computes the number of columns in the relation.
347    pub fn arity(&self) -> usize {
348        self.column_types.len()
349    }
350
351    /// Gets the index of the columns used when creating a default index.
352    pub fn default_key(&self) -> Vec<usize> {
353        if let Some(key) = self.keys.first() {
354            if key.is_empty() {
355                (0..self.column_types.len()).collect()
356            } else {
357                key.clone()
358            }
359        } else {
360            (0..self.column_types.len()).collect()
361        }
362    }
363
364    /// Returns all the column types in order, for this relation.
365    pub fn columns(&self) -> &[ReprColumnType] {
366        &self.column_types
367    }
368}
369
370impl From<&SqlRelationType> for ReprRelationType {
371    fn from(sql_relation_type: &SqlRelationType) -> Self {
372        ReprRelationType {
373            column_types: sql_relation_type
374                .column_types
375                .iter()
376                .map(ReprColumnType::from)
377                .collect(),
378            keys: sql_relation_type.keys.clone(),
379        }
380    }
381}
382
383#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash, MzReflect)]
384pub struct ReprColumnType {
385    /// The underlying representation scalar type (e.g., Int32 or String) of this column.
386    pub scalar_type: ReprScalarType,
387    /// Whether this datum can be null.
388    #[serde(default = "return_true")]
389    pub nullable: bool,
390}
391
392impl ReprColumnType {
393    pub fn union(&self, col: &ReprColumnType) -> Result<Self, anyhow::Error> {
394        let scalar_type = self.scalar_type.union(&col.scalar_type)?;
395        let nullable = self.nullable || col.nullable;
396
397        Ok(ReprColumnType {
398            scalar_type,
399            nullable,
400        })
401    }
402}
403
404impl From<&SqlColumnType> for ReprColumnType {
405    fn from(sql_column_type: &SqlColumnType) -> Self {
406        let scalar_type = &sql_column_type.scalar_type;
407        let scalar_type = scalar_type.into();
408        let nullable = sql_column_type.nullable;
409
410        ReprColumnType {
411            scalar_type,
412            nullable,
413        }
414    }
415}
416
417impl SqlColumnType {
418    /// Lossily translates a [`ReprColumnType`] back to a [`SqlColumnType`].
419    ///
420    /// See [`SqlScalarType::from_repr`] for an example of lossiness.
421    pub fn from_repr(repr: &ReprColumnType) -> Self {
422        let scalar_type = &repr.scalar_type;
423        let scalar_type = SqlScalarType::from_repr(scalar_type);
424        let nullable = repr.nullable;
425
426        SqlColumnType {
427            scalar_type,
428            nullable,
429        }
430    }
431}
432
433/// The name of a column in a [`RelationDesc`].
434#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash, MzReflect)]
435pub struct ColumnName(Box<str>);
436
437impl ColumnName {
438    /// Returns this column name as a `str`.
439    #[inline(always)]
440    pub fn as_str(&self) -> &str {
441        &*self
442    }
443
444    /// Returns this column name as a `&mut Box<str>`.
445    pub fn as_mut_boxed_str(&mut self) -> &mut Box<str> {
446        &mut self.0
447    }
448
449    /// Returns if this [`ColumnName`] is similar to the provided one.
450    pub fn is_similar(&self, other: &ColumnName) -> bool {
451        const SIMILARITY_THRESHOLD: f64 = 0.6;
452
453        let a_lowercase = self.to_lowercase();
454        let b_lowercase = other.to_lowercase();
455
456        strsim::normalized_levenshtein(&a_lowercase, &b_lowercase) >= SIMILARITY_THRESHOLD
457    }
458}
459
460impl std::ops::Deref for ColumnName {
461    type Target = str;
462
463    #[inline(always)]
464    fn deref(&self) -> &Self::Target {
465        &self.0
466    }
467}
468
469impl fmt::Display for ColumnName {
470    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
471        f.write_str(&self.0)
472    }
473}
474
475impl From<String> for ColumnName {
476    fn from(s: String) -> ColumnName {
477        ColumnName(s.into())
478    }
479}
480
481impl From<&str> for ColumnName {
482    fn from(s: &str) -> ColumnName {
483        ColumnName(s.into())
484    }
485}
486
487impl From<&ColumnName> for ColumnName {
488    fn from(n: &ColumnName) -> ColumnName {
489        n.clone()
490    }
491}
492
493impl RustType<ProtoColumnName> for ColumnName {
494    fn into_proto(&self) -> ProtoColumnName {
495        ProtoColumnName {
496            value: Some(self.0.to_string()),
497        }
498    }
499
500    fn from_proto(proto: ProtoColumnName) -> Result<Self, TryFromProtoError> {
501        Ok(ColumnName(
502            proto
503                .value
504                .ok_or_else(|| TryFromProtoError::missing_field("ProtoColumnName::value"))?
505                .into(),
506        ))
507    }
508}
509
510impl From<ColumnName> for mz_sql_parser::ast::Ident {
511    fn from(value: ColumnName) -> Self {
512        // Note: ColumnNames are known to be less than the max length of an Ident (I think?).
513        mz_sql_parser::ast::Ident::new_unchecked(value.0)
514    }
515}
516
517impl proptest::arbitrary::Arbitrary for ColumnName {
518    type Parameters = ();
519    type Strategy = BoxedStrategy<ColumnName>;
520
521    fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy {
522        // Long column names are generally uninteresting, and can greatly
523        // increase the runtime for a test case, so bound the max length.
524        let mut weights = vec![(50, Just(1..8)), (20, Just(8..16))];
525        if std::env::var("PROPTEST_LARGE_DATA").is_ok() {
526            weights.extend([
527                (5, Just(16..128)),
528                (1, Just(128..1024)),
529                (1, Just(1024..4096)),
530            ]);
531        }
532        let name_length = Union::new_weighted(weights);
533
534        // Non-ASCII characters are also generally uninteresting and can make
535        // debugging harder.
536        let char_strat = Rc::new(Union::new_weighted(vec![
537            (50, proptest::char::range('A', 'z').boxed()),
538            (1, any::<char>().boxed()),
539        ]));
540
541        name_length
542            .prop_flat_map(move |length| proptest::collection::vec(Rc::clone(&char_strat), length))
543            .prop_map(|chars| ColumnName(chars.into_iter().collect::<Box<str>>()))
544            .no_shrink()
545            .boxed()
546    }
547}
548
549/// Default name of a column (when no other information is known).
550pub const UNKNOWN_COLUMN_NAME: &str = "?column?";
551
552/// Stable index of a column in a [`RelationDesc`].
553#[derive(
554    Clone, Copy, Debug, Eq, PartialEq, PartialOrd, Ord, Serialize, Deserialize, Hash, MzReflect,
555)]
556pub struct ColumnIndex(usize);
557
558static_assertions::assert_not_impl_all!(ColumnIndex: Arbitrary);
559
560impl ColumnIndex {
561    /// Returns a stable identifier for this [`ColumnIndex`].
562    pub fn to_stable_name(&self) -> String {
563        self.0.to_string()
564    }
565
566    pub fn to_raw(&self) -> usize {
567        self.0
568    }
569
570    pub fn from_raw(val: usize) -> Self {
571        ColumnIndex(val)
572    }
573}
574
575/// The version a given column was added at.
576#[derive(
577    Clone,
578    Copy,
579    Debug,
580    Eq,
581    PartialEq,
582    PartialOrd,
583    Ord,
584    Serialize,
585    Deserialize,
586    Hash,
587    MzReflect,
588    Arbitrary,
589)]
590pub struct RelationVersion(u64);
591
592impl RelationVersion {
593    /// Returns the "root" or "initial" version of a [`RelationDesc`].
594    pub fn root() -> Self {
595        RelationVersion(0)
596    }
597
598    /// Returns an instance of [`RelationVersion`] which is "one" higher than `self`.
599    pub fn bump(&self) -> Self {
600        let next_version = self
601            .0
602            .checked_add(1)
603            .expect("added more than u64::MAX columns?");
604        RelationVersion(next_version)
605    }
606
607    /// Consume a [`RelationVersion`] returning the raw value.
608    ///
609    /// Should __only__ be used for serialization.
610    pub fn into_raw(self) -> u64 {
611        self.0
612    }
613
614    /// Create a [`RelationVersion`] from a raw value.
615    ///
616    /// Should __only__ be used for serialization.
617    pub fn from_raw(val: u64) -> RelationVersion {
618        RelationVersion(val)
619    }
620}
621
622impl From<RelationVersion> for SchemaId {
623    fn from(value: RelationVersion) -> Self {
624        SchemaId(usize::cast_from(value.0))
625    }
626}
627
628impl From<mz_sql_parser::ast::Version> for RelationVersion {
629    fn from(value: mz_sql_parser::ast::Version) -> Self {
630        RelationVersion(value.into_inner())
631    }
632}
633
634impl fmt::Display for RelationVersion {
635    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
636        write!(f, "v{}", self.0)
637    }
638}
639
640impl From<RelationVersion> for mz_sql_parser::ast::Version {
641    fn from(value: RelationVersion) -> Self {
642        mz_sql_parser::ast::Version::new(value.0)
643    }
644}
645
646impl RustType<ProtoRelationVersion> for RelationVersion {
647    fn into_proto(&self) -> ProtoRelationVersion {
648        ProtoRelationVersion { value: self.0 }
649    }
650
651    fn from_proto(proto: ProtoRelationVersion) -> Result<Self, TryFromProtoError> {
652        Ok(RelationVersion(proto.value))
653    }
654}
655
656/// Metadata (other than type) for a column in a [`RelationDesc`].
657#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash, MzReflect)]
658struct ColumnMetadata {
659    /// Name of the column.
660    name: ColumnName,
661    /// Index into a [`SqlRelationType`] for this column.
662    typ_idx: usize,
663    /// Version this column was added at.
664    added: RelationVersion,
665    /// Version this column was dropped at.
666    dropped: Option<RelationVersion>,
667}
668
669/// A description of the shape of a relation.
670///
671/// It bundles a [`SqlRelationType`] with `ColumnMetadata` for each column in
672/// the relation.
673///
674/// # Examples
675///
676/// A `RelationDesc`s is typically constructed via its builder API:
677///
678/// ```
679/// use mz_repr::{SqlColumnType, RelationDesc, SqlScalarType};
680///
681/// let desc = RelationDesc::builder()
682///     .with_column("id", SqlScalarType::Int64.nullable(false))
683///     .with_column("price", SqlScalarType::Float64.nullable(true))
684///     .finish();
685/// ```
686///
687/// In more complicated cases, like when constructing a `RelationDesc` in
688/// response to user input, it may be more convenient to construct a relation
689/// type first, and imbue it with column names to form a `RelationDesc` later:
690///
691/// ```
692/// use mz_repr::RelationDesc;
693///
694/// # fn plan_query(_: &str) -> mz_repr::SqlRelationType { mz_repr::SqlRelationType::new(vec![]) }
695/// let relation_type = plan_query("SELECT * FROM table");
696/// let names = (0..relation_type.arity()).map(|i| match i {
697///     0 => "first",
698///     1 => "second",
699///     _ => "unknown",
700/// });
701/// let desc = RelationDesc::new(relation_type, names);
702/// ```
703///
704/// Next to the [`SqlRelationType`] we maintain a map of `ColumnIndex` to
705/// `ColumnMetadata`, where [`ColumnIndex`] is a stable identifier for a
706/// column throughout the lifetime of the relation. This allows a
707/// [`RelationDesc`] to represent a projection over a version of itself.
708///
709/// ```
710/// use std::collections::BTreeSet;
711/// use mz_repr::{ColumnIndex, RelationDesc, SqlScalarType};
712///
713/// let desc = RelationDesc::builder()
714///     .with_column("name", SqlScalarType::String.nullable(false))
715///     .with_column("email", SqlScalarType::String.nullable(false))
716///     .finish();
717///
718/// // Project away the second column.
719/// let demands = BTreeSet::from([1]);
720/// let proj = desc.apply_demand(&demands);
721///
722/// // We projected away the first column.
723/// assert!(!proj.contains_index(&ColumnIndex::from_raw(0)));
724/// // But retained the second.
725/// assert!(proj.contains_index(&ColumnIndex::from_raw(1)));
726///
727/// // The underlying `SqlRelationType` also contains a single column.
728/// assert_eq!(proj.typ().arity(), 1);
729/// ```
730///
731/// To maintain this stable mapping and track the lifetime of a column (e.g.
732/// when adding or dropping a column) we use `ColumnMetadata`. It maintains
733/// the index in [`SqlRelationType`] that corresponds to a given column, and the
734/// version at which this column was added or dropped.
735///
736#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash, MzReflect)]
737pub struct RelationDesc {
738    typ: SqlRelationType,
739    metadata: BTreeMap<ColumnIndex, ColumnMetadata>,
740}
741
742impl RustType<ProtoRelationDesc> for RelationDesc {
743    fn into_proto(&self) -> ProtoRelationDesc {
744        let (names, metadata): (Vec<_>, Vec<_>) = self
745            .metadata
746            .values()
747            .map(|meta| {
748                let metadata = ProtoColumnMetadata {
749                    added: Some(meta.added.into_proto()),
750                    dropped: meta.dropped.map(|v| v.into_proto()),
751                };
752                (meta.name.into_proto(), metadata)
753            })
754            .unzip();
755
756        // `metadata` Migration Logic: We wrote some `ProtoRelationDesc`s into Persist before the
757        // metadata field was added. To make sure our serialization roundtrips the same as before
758        // we added the field, we omit `metadata` if all of the values are equal to the default.
759        //
760        // Note: This logic needs to exist approximately forever.
761        let is_all_default_metadata = metadata.iter().all(|meta| {
762            meta.added == Some(RelationVersion::root().into_proto()) && meta.dropped == None
763        });
764        let metadata = if is_all_default_metadata {
765            Vec::new()
766        } else {
767            metadata
768        };
769
770        ProtoRelationDesc {
771            typ: Some(self.typ.into_proto()),
772            names,
773            metadata,
774        }
775    }
776
777    fn from_proto(proto: ProtoRelationDesc) -> Result<Self, TryFromProtoError> {
778        // `metadata` Migration Logic: We wrote some `ProtoRelationDesc`s into Persist before the
779        // metadata field was added. If the field doesn't exist we fill it in with default values,
780        // and when converting into_proto we omit these fields so the serialized bytes roundtrip.
781        //
782        // Note: This logic needs to exist approximately forever.
783        let proto_metadata: Box<dyn Iterator<Item = _>> = if proto.metadata.is_empty() {
784            let val = ProtoColumnMetadata {
785                added: Some(RelationVersion::root().into_proto()),
786                dropped: None,
787            };
788            Box::new(itertools::repeat_n(val, proto.names.len()))
789        } else {
790            Box::new(proto.metadata.into_iter())
791        };
792
793        let metadata = proto
794            .names
795            .into_iter()
796            .zip_eq(proto_metadata)
797            .enumerate()
798            .map(|(idx, (name, metadata))| {
799                let meta = ColumnMetadata {
800                    name: name.into_rust()?,
801                    typ_idx: idx,
802                    added: metadata.added.into_rust_if_some("ColumnMetadata::added")?,
803                    dropped: metadata.dropped.into_rust()?,
804                };
805                Ok::<_, TryFromProtoError>((ColumnIndex(idx), meta))
806            })
807            .collect::<Result<_, _>>()?;
808
809        Ok(RelationDesc {
810            typ: proto.typ.into_rust_if_some("ProtoRelationDesc::typ")?,
811            metadata,
812        })
813    }
814}
815
816impl RelationDesc {
817    /// Returns a [`RelationDescBuilder`] that can be used to construct a [`RelationDesc`].
818    pub fn builder() -> RelationDescBuilder {
819        RelationDescBuilder::default()
820    }
821
822    /// Constructs a new `RelationDesc` that represents the empty relation
823    /// with no columns and no keys.
824    pub fn empty() -> Self {
825        RelationDesc {
826            typ: SqlRelationType::empty(),
827            metadata: BTreeMap::default(),
828        }
829    }
830
831    /// Check if the `RelationDesc` is empty.
832    pub fn is_empty(&self) -> bool {
833        self == &Self::empty()
834    }
835
836    /// Returns the number of columns in this [`RelationDesc`].
837    pub fn len(&self) -> usize {
838        self.typ().column_types.len()
839    }
840
841    /// Constructs a new `RelationDesc` from a `SqlRelationType` and an iterator
842    /// over column names.
843    ///
844    /// # Panics
845    ///
846    /// Panics if the arity of the `SqlRelationType` is not equal to the number of
847    /// items in `names`.
848    pub fn new<I, N>(typ: SqlRelationType, names: I) -> Self
849    where
850        I: IntoIterator<Item = N>,
851        N: Into<ColumnName>,
852    {
853        let metadata: BTreeMap<_, _> = names
854            .into_iter()
855            .enumerate()
856            .map(|(idx, name)| {
857                let col_idx = ColumnIndex(idx);
858                let metadata = ColumnMetadata {
859                    name: name.into(),
860                    typ_idx: idx,
861                    added: RelationVersion::root(),
862                    dropped: None,
863                };
864                (col_idx, metadata)
865            })
866            .collect();
867
868        // TODO(parkmycar): Add better validation here.
869        assert_eq!(typ.column_types.len(), metadata.len());
870
871        RelationDesc { typ, metadata }
872    }
873
874    pub fn from_names_and_types<I, T, N>(iter: I) -> Self
875    where
876        I: IntoIterator<Item = (N, T)>,
877        T: Into<SqlColumnType>,
878        N: Into<ColumnName>,
879    {
880        let (names, types): (Vec<_>, Vec<_>) = iter.into_iter().unzip();
881        let types = types.into_iter().map(Into::into).collect();
882        let typ = SqlRelationType::new(types);
883        Self::new(typ, names)
884    }
885
886    /// Concatenates a `RelationDesc` onto the end of this `RelationDesc`.
887    ///
888    /// # Panics
889    ///
890    /// Panics if either `self` or `other` have columns that were added at a
891    /// [`RelationVersion`] other than [`RelationVersion::root`] or if any
892    /// columns were dropped.
893    ///
894    /// TODO(parkmycar): Move this method to [`RelationDescBuilder`].
895    pub fn concat(mut self, other: Self) -> Self {
896        let self_len = self.typ.column_types.len();
897
898        for (typ, (_col_idx, meta)) in other
899            .typ
900            .column_types
901            .into_iter()
902            .zip_eq(other.metadata.into_iter())
903        {
904            assert_eq!(meta.added, RelationVersion::root());
905            assert_none!(meta.dropped);
906
907            let new_idx = self.typ.columns().len();
908            let new_meta = ColumnMetadata {
909                name: meta.name,
910                typ_idx: new_idx,
911                added: RelationVersion::root(),
912                dropped: None,
913            };
914
915            self.typ.column_types.push(typ);
916            let prev = self.metadata.insert(ColumnIndex(new_idx), new_meta);
917
918            assert_eq!(self.metadata.len(), self.typ.columns().len());
919            assert_none!(prev);
920        }
921
922        for k in other.typ.keys {
923            let k = k.into_iter().map(|idx| idx + self_len).collect();
924            self = self.with_key(k);
925        }
926        self
927    }
928
929    /// Adds a new key for the relation.
930    pub fn with_key(mut self, indices: Vec<usize>) -> Self {
931        self.typ = self.typ.with_key(indices);
932        self
933    }
934
935    /// Drops all existing keys.
936    pub fn without_keys(mut self) -> Self {
937        self.typ.keys.clear();
938        self
939    }
940
941    /// Builds a new relation description with the column names replaced with
942    /// new names.
943    ///
944    /// # Panics
945    ///
946    /// Panics if the arity of the relation type does not match the number of
947    /// items in `names`.
948    pub fn with_names<I, N>(self, names: I) -> Self
949    where
950        I: IntoIterator<Item = N>,
951        N: Into<ColumnName>,
952    {
953        Self::new(self.typ, names)
954    }
955
956    /// Computes the number of columns in the relation.
957    pub fn arity(&self) -> usize {
958        self.typ.arity()
959    }
960
961    /// Returns the relation type underlying this relation description.
962    pub fn typ(&self) -> &SqlRelationType {
963        &self.typ
964    }
965
966    /// Returns the owned relation type underlying this relation description.
967    pub fn into_typ(self) -> SqlRelationType {
968        self.typ
969    }
970
971    /// Returns an iterator over the columns in this relation.
972    pub fn iter(&self) -> impl Iterator<Item = (&ColumnName, &SqlColumnType)> {
973        self.metadata.values().map(|meta| {
974            let typ = &self.typ.columns()[meta.typ_idx];
975            (&meta.name, typ)
976        })
977    }
978
979    /// Returns an iterator over the types of the columns in this relation.
980    pub fn iter_types(&self) -> impl Iterator<Item = &SqlColumnType> {
981        self.typ.column_types.iter()
982    }
983
984    /// Returns an iterator over the names of the columns in this relation.
985    pub fn iter_names(&self) -> impl Iterator<Item = &ColumnName> {
986        self.metadata.values().map(|meta| &meta.name)
987    }
988
989    /// Returns an iterator over the columns in this relation, with all their metadata.
990    pub fn iter_all(&self) -> impl Iterator<Item = (&ColumnIndex, &ColumnName, &SqlColumnType)> {
991        self.metadata.iter().map(|(col_idx, metadata)| {
992            let col_typ = &self.typ.columns()[metadata.typ_idx];
993            (col_idx, &metadata.name, col_typ)
994        })
995    }
996
997    /// Returns an iterator over the names of the columns in this relation that are "similar" to
998    /// the provided `name`.
999    pub fn iter_similar_names<'a>(
1000        &'a self,
1001        name: &'a ColumnName,
1002    ) -> impl Iterator<Item = &'a ColumnName> {
1003        self.iter_names().filter(|n| n.is_similar(name))
1004    }
1005
1006    /// Returns whether this [`RelationDesc`] contains a column at the specified index.
1007    pub fn contains_index(&self, idx: &ColumnIndex) -> bool {
1008        self.metadata.contains_key(idx)
1009    }
1010
1011    /// Finds a column by name.
1012    ///
1013    /// Returns the index and type of the column named `name`. If no column with
1014    /// the specified name exists, returns `None`. If multiple columns have the
1015    /// specified name, the leftmost column is returned.
1016    pub fn get_by_name(&self, name: &ColumnName) -> Option<(usize, &SqlColumnType)> {
1017        self.iter_names()
1018            .position(|n| n == name)
1019            .map(|i| (i, &self.typ.column_types[i]))
1020    }
1021
1022    /// Gets the name of the `i`th column.
1023    ///
1024    /// # Panics
1025    ///
1026    /// Panics if `i` is not a valid column index.
1027    ///
1028    /// TODO(parkmycar): Migrate all uses of this to [`RelationDesc::get_name_idx`].
1029    pub fn get_name(&self, i: usize) -> &ColumnName {
1030        // TODO(parkmycar): Refactor this to use `ColumnIndex`.
1031        self.get_name_idx(&ColumnIndex(i))
1032    }
1033
1034    /// Gets the name of the column at `idx`.
1035    ///
1036    /// # Panics
1037    ///
1038    /// Panics if no column exists at `idx`.
1039    pub fn get_name_idx(&self, idx: &ColumnIndex) -> &ColumnName {
1040        &self.metadata.get(idx).expect("should exist").name
1041    }
1042
1043    /// Mutably gets the name of the `i`th column.
1044    ///
1045    /// # Panics
1046    ///
1047    /// Panics if `i` is not a valid column index.
1048    pub fn get_name_mut(&mut self, i: usize) -> &mut ColumnName {
1049        // TODO(parkmycar): Refactor this to use `ColumnIndex`.
1050        &mut self
1051            .metadata
1052            .get_mut(&ColumnIndex(i))
1053            .expect("should exist")
1054            .name
1055    }
1056
1057    /// Gets the [`SqlColumnType`] of the column at `idx`.
1058    ///
1059    /// # Panics
1060    ///
1061    /// Panics if no column exists at `idx`.
1062    pub fn get_type(&self, idx: &ColumnIndex) -> &SqlColumnType {
1063        let typ_idx = self.metadata.get(idx).expect("should exist").typ_idx;
1064        &self.typ.column_types[typ_idx]
1065    }
1066
1067    /// Gets the name of the `i`th column if that column name is unambiguous.
1068    ///
1069    /// If at least one other column has the same name as the `i`th column,
1070    /// returns `None`. If the `i`th column has no name, returns `None`.
1071    ///
1072    /// # Panics
1073    ///
1074    /// Panics if `i` is not a valid column index.
1075    pub fn get_unambiguous_name(&self, i: usize) -> Option<&ColumnName> {
1076        let name = self.get_name(i);
1077        if self.iter_names().filter(|n| *n == name).count() == 1 {
1078            Some(name)
1079        } else {
1080            None
1081        }
1082    }
1083
1084    /// Verifies that `d` meets all of the constraints for the `i`th column of `self`.
1085    ///
1086    /// n.b. The only constraint MZ currently supports in NOT NULL, but this
1087    /// structure will be simple to extend.
1088    pub fn constraints_met(&self, i: usize, d: &Datum) -> Result<(), NotNullViolation> {
1089        let name = self.get_name(i);
1090        let typ = &self.typ.column_types[i];
1091        if d == &Datum::Null && !typ.nullable {
1092            Err(NotNullViolation(name.clone()))
1093        } else {
1094            Ok(())
1095        }
1096    }
1097
1098    /// Computes the differences between two [`RelationDesc`]s.
1099    ///
1100    /// Returns a rich diff describing which columns differ, and in what way.
1101    ///
1102    /// # Panics
1103    ///
1104    /// Panics if either `self` or `other` have columns that were added at a
1105    /// [`RelationVersion`] other than [`RelationVersion::root`] or if any
1106    /// columns were dropped.
1107    ///
1108    /// This simplifies things by allowing us to assume that `ColumnIndex`es are
1109    /// dense and that they match the indexes of `typ.columns()`. Without this
1110    /// we would, e.g., struggle comparing keys as those are in terms of
1111    /// `typ.columns()` indexes.
1112    pub fn diff(&self, other: &RelationDesc) -> RelationDescDiff {
1113        assert_eq!(self.metadata.len(), self.typ.columns().len());
1114        assert_eq!(other.metadata.len(), other.typ.columns().len());
1115        for (idx, meta) in self.metadata.iter().chain(other.metadata.iter()) {
1116            assert_eq!(meta.typ_idx, idx.0);
1117            assert_eq!(meta.added, RelationVersion::root());
1118            assert_none!(meta.dropped);
1119        }
1120
1121        let mut column_diffs = BTreeMap::new();
1122        let mut key_diff = None;
1123
1124        let left_arity = self.arity();
1125        let right_arity = other.arity();
1126        let common_arity = std::cmp::min(left_arity, right_arity);
1127
1128        for idx in 0..common_arity {
1129            let left_name = self.get_name(idx);
1130            let right_name = other.get_name(idx);
1131            let left_type = &self.typ.column_types[idx];
1132            let right_type = &other.typ.column_types[idx];
1133
1134            if left_name != right_name {
1135                let diff = ColumnDiff::NameMismatch {
1136                    left: left_name.clone(),
1137                    right: right_name.clone(),
1138                };
1139                column_diffs.insert(idx, diff);
1140            } else if left_type.scalar_type != right_type.scalar_type {
1141                let diff = ColumnDiff::TypeMismatch {
1142                    name: left_name.clone(),
1143                    left: left_type.scalar_type.clone(),
1144                    right: right_type.scalar_type.clone(),
1145                };
1146                column_diffs.insert(idx, diff);
1147            } else if left_type.nullable != right_type.nullable {
1148                let diff = ColumnDiff::NullabilityMismatch {
1149                    name: left_name.clone(),
1150                    left: left_type.nullable,
1151                    right: right_type.nullable,
1152                };
1153                column_diffs.insert(idx, diff);
1154            }
1155        }
1156
1157        for idx in common_arity..left_arity {
1158            let diff = ColumnDiff::Missing {
1159                name: self.get_name(idx).clone(),
1160            };
1161            column_diffs.insert(idx, diff);
1162        }
1163
1164        for idx in common_arity..right_arity {
1165            let diff = ColumnDiff::Extra {
1166                name: other.get_name(idx).clone(),
1167            };
1168            column_diffs.insert(idx, diff);
1169        }
1170
1171        let left_keys: BTreeSet<_> = self.typ.keys.iter().collect();
1172        let right_keys: BTreeSet<_> = other.typ.keys.iter().collect();
1173        if left_keys != right_keys {
1174            let column_names = |desc: &RelationDesc, keys: BTreeSet<&Vec<usize>>| {
1175                keys.iter()
1176                    .map(|key| key.iter().map(|&idx| desc.get_name(idx).clone()).collect())
1177                    .collect()
1178            };
1179            key_diff = Some(KeyDiff {
1180                left: column_names(self, left_keys),
1181                right: column_names(other, right_keys),
1182            });
1183        }
1184
1185        RelationDescDiff {
1186            column_diffs,
1187            key_diff,
1188        }
1189    }
1190
1191    /// Creates a new [`RelationDesc`] retaining only the columns specified in `demands`.
1192    pub fn apply_demand(&self, demands: &BTreeSet<usize>) -> RelationDesc {
1193        let mut new_desc = self.clone();
1194
1195        // Update ColumnMetadata.
1196        let mut removed = 0;
1197        new_desc.metadata.retain(|idx, metadata| {
1198            let retain = demands.contains(&idx.0);
1199            if !retain {
1200                removed += 1;
1201            } else {
1202                metadata.typ_idx -= removed;
1203            }
1204            retain
1205        });
1206
1207        // Update SqlColumnType.
1208        let mut idx = 0;
1209        new_desc.typ.column_types.retain(|_| {
1210            let keep = demands.contains(&idx);
1211            idx += 1;
1212            keep
1213        });
1214
1215        new_desc
1216    }
1217}
1218
1219impl Arbitrary for RelationDesc {
1220    type Parameters = ();
1221    type Strategy = BoxedStrategy<RelationDesc>;
1222
1223    fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy {
1224        let mut weights = vec![(100, Just(0..4)), (50, Just(4..8)), (25, Just(8..16))];
1225        if std::env::var("PROPTEST_LARGE_DATA").is_ok() {
1226            weights.extend([
1227                (12, Just(16..32)),
1228                (6, Just(32..64)),
1229                (3, Just(64..128)),
1230                (1, Just(128..256)),
1231            ]);
1232        }
1233        let num_columns = Union::new_weighted(weights);
1234
1235        num_columns.prop_flat_map(arb_relation_desc).boxed()
1236    }
1237}
1238
1239/// Returns a [`Strategy`] that generates an arbitrary [`RelationDesc`] with a number columns
1240/// within the range provided.
1241pub fn arb_relation_desc(num_cols: std::ops::Range<usize>) -> impl Strategy<Value = RelationDesc> {
1242    proptest::collection::btree_map(any::<ColumnName>(), any::<SqlColumnType>(), num_cols)
1243        .prop_map(RelationDesc::from_names_and_types)
1244}
1245
1246/// Returns a [`Strategy`] that generates a projection of the provided [`RelationDesc`].
1247pub fn arb_relation_desc_projection(desc: RelationDesc) -> impl Strategy<Value = RelationDesc> {
1248    let mask: Vec<_> = (0..desc.len()).map(|_| any::<bool>()).collect();
1249    mask.prop_map(move |mask| {
1250        let demands: BTreeSet<_> = mask
1251            .into_iter()
1252            .enumerate()
1253            .filter_map(|(idx, keep)| keep.then_some(idx))
1254            .collect();
1255        desc.apply_demand(&demands)
1256    })
1257}
1258
1259impl IntoIterator for RelationDesc {
1260    type Item = (ColumnName, SqlColumnType);
1261    type IntoIter = Box<dyn Iterator<Item = (ColumnName, SqlColumnType)>>;
1262
1263    fn into_iter(self) -> Self::IntoIter {
1264        let iter = self
1265            .metadata
1266            .into_values()
1267            .zip_eq(self.typ.column_types)
1268            .map(|(meta, typ)| (meta.name, typ));
1269        Box::new(iter)
1270    }
1271}
1272
1273/// Returns a [`Strategy`] that yields arbitrary [`Row`]s for the provided [`RelationDesc`].
1274pub fn arb_row_for_relation(desc: &RelationDesc) -> impl Strategy<Value = Row> + use<> {
1275    let datums: Vec<_> = desc
1276        .typ()
1277        .columns()
1278        .iter()
1279        .cloned()
1280        .map(arb_datum_for_column)
1281        .collect();
1282    datums.prop_map(|x| Row::pack(x.iter().map(Datum::from)))
1283}
1284
1285/// Expression violated not-null constraint on named column
1286#[derive(Debug, PartialEq, Eq)]
1287pub struct NotNullViolation(pub ColumnName);
1288
1289impl fmt::Display for NotNullViolation {
1290    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1291        write!(
1292            f,
1293            "null value in column {} violates not-null constraint",
1294            self.0.quoted()
1295        )
1296    }
1297}
1298
1299/// The result of comparing two [`RelationDesc`]s.
1300#[derive(Debug, Clone, PartialEq, Eq)]
1301pub struct RelationDescDiff {
1302    /// Column differences, keyed by column index.
1303    pub column_diffs: BTreeMap<usize, ColumnDiff>,
1304    /// Key differences, if any.
1305    pub key_diff: Option<KeyDiff>,
1306}
1307
1308impl RelationDescDiff {
1309    /// Returns whether the diff contains any differences.
1310    pub fn is_empty(&self) -> bool {
1311        self.column_diffs.is_empty() && self.key_diff.is_none()
1312    }
1313}
1314
1315/// A difference in a column between two [`RelationDesc`]s.
1316#[derive(Debug, Clone, PartialEq, Eq)]
1317pub enum ColumnDiff {
1318    /// Column exists only in the left relation.
1319    Missing { name: ColumnName },
1320    /// Column exists only in the right relation.
1321    Extra { name: ColumnName },
1322    /// Columns have different types.
1323    TypeMismatch {
1324        name: ColumnName,
1325        left: SqlScalarType,
1326        right: SqlScalarType,
1327    },
1328    /// Columns have different nullability.
1329    NullabilityMismatch {
1330        name: ColumnName,
1331        left: bool,
1332        right: bool,
1333    },
1334    /// Columns have different names.
1335    NameMismatch { left: ColumnName, right: ColumnName },
1336}
1337
1338/// A difference in the keys of two [`RelationDesc`]s.
1339#[derive(Debug, Clone, PartialEq, Eq)]
1340pub struct KeyDiff {
1341    /// Keys of the left relation.
1342    pub left: BTreeSet<Vec<ColumnName>>,
1343    /// Keys of the right relation.
1344    pub right: BTreeSet<Vec<ColumnName>>,
1345}
1346
1347/// A builder for a [`RelationDesc`].
1348#[derive(Clone, Default, Debug, PartialEq, Eq)]
1349pub struct RelationDescBuilder {
1350    /// Columns of the relation.
1351    columns: Vec<(ColumnName, SqlColumnType)>,
1352    /// Sets of indices that are "keys" for the collection.
1353    keys: Vec<Vec<usize>>,
1354}
1355
1356impl RelationDescBuilder {
1357    /// Appends a column with the specified name and type.
1358    pub fn with_column<N: Into<ColumnName>>(
1359        mut self,
1360        name: N,
1361        ty: SqlColumnType,
1362    ) -> RelationDescBuilder {
1363        let name = name.into();
1364        self.columns.push((name, ty));
1365        self
1366    }
1367
1368    /// Appends the provided columns to the builder.
1369    pub fn with_columns<I, T, N>(mut self, iter: I) -> Self
1370    where
1371        I: IntoIterator<Item = (N, T)>,
1372        T: Into<SqlColumnType>,
1373        N: Into<ColumnName>,
1374    {
1375        self.columns
1376            .extend(iter.into_iter().map(|(name, ty)| (name.into(), ty.into())));
1377        self
1378    }
1379
1380    /// Adds a new key for the relation.
1381    pub fn with_key(mut self, mut indices: Vec<usize>) -> RelationDescBuilder {
1382        indices.sort_unstable();
1383        if !self.keys.contains(&indices) {
1384            self.keys.push(indices);
1385        }
1386        self
1387    }
1388
1389    /// Removes all previously inserted keys.
1390    pub fn without_keys(mut self) -> RelationDescBuilder {
1391        self.keys.clear();
1392        assert_eq!(self.keys.len(), 0);
1393        self
1394    }
1395
1396    /// Concatenates a [`RelationDescBuilder`] onto the end of this [`RelationDescBuilder`].
1397    pub fn concat(mut self, other: Self) -> Self {
1398        let self_len = self.columns.len();
1399
1400        self.columns.extend(other.columns);
1401        for k in other.keys {
1402            let k = k.into_iter().map(|idx| idx + self_len).collect();
1403            self = self.with_key(k);
1404        }
1405
1406        self
1407    }
1408
1409    /// Finish the builder, returning a [`RelationDesc`].
1410    pub fn finish(self) -> RelationDesc {
1411        let mut desc = RelationDesc::from_names_and_types(self.columns);
1412        desc.typ = desc.typ.with_keys(self.keys);
1413        desc
1414    }
1415}
1416
1417/// Describes a [`RelationDesc`] at a specific version of a [`VersionedRelationDesc`].
1418#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord, Serialize)]
1419pub enum RelationVersionSelector {
1420    Specific(RelationVersion),
1421    Latest,
1422}
1423
1424impl RelationVersionSelector {
1425    pub fn specific(version: u64) -> Self {
1426        RelationVersionSelector::Specific(RelationVersion(version))
1427    }
1428}
1429
1430/// A wrapper around [`RelationDesc`] that provides an interface for adding
1431/// columns and generating new versions.
1432///
1433/// TODO(parkmycar): Using an immutable data structure for RelationDesc would
1434/// be great.
1435#[derive(Debug, Clone, Serialize)]
1436pub struct VersionedRelationDesc {
1437    inner: RelationDesc,
1438}
1439
1440impl VersionedRelationDesc {
1441    pub fn new(inner: RelationDesc) -> Self {
1442        VersionedRelationDesc { inner }
1443    }
1444
1445    /// Adds a new column to this [`RelationDesc`], creating a new version of the [`RelationDesc`].
1446    ///
1447    /// # Panics
1448    ///
1449    /// * Panics if a column with `name` already exists that hasn't been dropped.
1450    ///
1451    /// Note: For building a [`RelationDesc`] see [`RelationDescBuilder::with_column`].
1452    #[must_use]
1453    pub fn add_column<N, T>(&mut self, name: N, typ: T) -> RelationVersion
1454    where
1455        N: Into<ColumnName>,
1456        T: Into<SqlColumnType>,
1457    {
1458        let latest_version = self.latest_version();
1459        let new_version = latest_version.bump();
1460
1461        let name = name.into();
1462        let existing = self
1463            .inner
1464            .metadata
1465            .iter()
1466            .find(|(_, meta)| meta.name == name && meta.dropped.is_none());
1467        if let Some(existing) = existing {
1468            panic!("column named '{name}' already exists! {existing:?}");
1469        }
1470
1471        let next_idx = self.inner.metadata.len();
1472        let col_meta = ColumnMetadata {
1473            name,
1474            typ_idx: next_idx,
1475            added: new_version,
1476            dropped: None,
1477        };
1478
1479        self.inner.typ.column_types.push(typ.into());
1480        let prev = self.inner.metadata.insert(ColumnIndex(next_idx), col_meta);
1481
1482        assert_none!(prev, "column index overlap!");
1483        self.validate();
1484
1485        new_version
1486    }
1487
1488    /// Drops the column `name` from this [`RelationDesc`]. If there are multiple columns with
1489    /// `name` drops the left-most one that hasn't already been dropped.
1490    ///
1491    /// TODO(parkmycar): Add handling for dropping a column that is currently used as a key.
1492    ///
1493    /// # Panics
1494    ///
1495    /// Panics if a column with `name` does not exist or the dropped column was used as a key.
1496    #[must_use]
1497    pub fn drop_column<N>(&mut self, name: N) -> RelationVersion
1498    where
1499        N: Into<ColumnName>,
1500    {
1501        let name = name.into();
1502        let latest_version = self.latest_version();
1503        let new_version = latest_version.bump();
1504
1505        let col = self
1506            .inner
1507            .metadata
1508            .values_mut()
1509            .find(|meta| meta.name == name && meta.dropped.is_none())
1510            .expect("column to exist");
1511
1512        // Make sure the column hadn't been previously dropped.
1513        assert_none!(col.dropped, "column was already dropped");
1514        col.dropped = Some(new_version);
1515
1516        // Make sure the column isn't being used as a key.
1517        let dropped_key = self
1518            .inner
1519            .typ
1520            .keys
1521            .iter()
1522            .any(|keys| keys.contains(&col.typ_idx));
1523        assert!(!dropped_key, "column being dropped was used as a key");
1524
1525        self.validate();
1526        new_version
1527    }
1528
1529    /// Returns the [`RelationDesc`] at the latest version.
1530    pub fn latest(&self) -> RelationDesc {
1531        self.inner.clone()
1532    }
1533
1534    /// Returns this [`RelationDesc`] at the specified version.
1535    pub fn at_version(&self, version: RelationVersionSelector) -> RelationDesc {
1536        // Get all of the changes from the start, up to whatever version was requested.
1537        let up_to_version = match version {
1538            RelationVersionSelector::Latest => RelationVersion(u64::MAX),
1539            RelationVersionSelector::Specific(v) => v,
1540        };
1541
1542        let valid_columns = self.inner.metadata.iter().filter(|(_col_idx, meta)| {
1543            let added = meta.added <= up_to_version;
1544            let dropped = meta
1545                .dropped
1546                .map(|dropped_at| up_to_version >= dropped_at)
1547                .unwrap_or(false);
1548
1549            added && !dropped
1550        });
1551
1552        let mut column_types = Vec::new();
1553        let mut column_metas = BTreeMap::new();
1554
1555        // N.B. At this point we need to be careful because col_idx might not
1556        // equal typ_idx.
1557        //
1558        // For example, consider columns "a", "b", and "c" with indexes 0, 1,
1559        // and 2. If we drop column "b" then we'll have "a" and "c" with column
1560        // indexes 0 and 2, but their indices in SqlRelationType will be 0 and 1.
1561        for (col_idx, meta) in valid_columns {
1562            let new_meta = ColumnMetadata {
1563                name: meta.name.clone(),
1564                typ_idx: column_types.len(),
1565                added: meta.added.clone(),
1566                dropped: meta.dropped.clone(),
1567            };
1568            column_types.push(self.inner.typ.columns()[meta.typ_idx].clone());
1569            column_metas.insert(*col_idx, new_meta);
1570        }
1571
1572        // Remap keys in case a column with an index less than that of a key was
1573        // dropped.
1574        //
1575        // For example, consider columns "a", "b", and "c" where "a" and "c" are
1576        // keys and "b" was dropped.
1577        let keys = self
1578            .inner
1579            .typ
1580            .keys
1581            .iter()
1582            .map(|keys| {
1583                keys.iter()
1584                    .map(|key_idx| {
1585                        let metadata = column_metas
1586                            .get(&ColumnIndex(*key_idx))
1587                            .expect("found key for column that doesn't exist");
1588                        metadata.typ_idx
1589                    })
1590                    .collect()
1591            })
1592            .collect();
1593
1594        let relation_type = SqlRelationType { column_types, keys };
1595
1596        RelationDesc {
1597            typ: relation_type,
1598            metadata: column_metas,
1599        }
1600    }
1601
1602    pub fn latest_version(&self) -> RelationVersion {
1603        self.inner
1604            .metadata
1605            .values()
1606            // N.B. Dropped is always greater than added.
1607            .map(|meta| meta.dropped.unwrap_or(meta.added))
1608            .max()
1609            // If there aren't any columns we're implicitly the root version.
1610            .unwrap_or_else(RelationVersion::root)
1611    }
1612
1613    /// Validates internal contraints of the [`RelationDesc`] are correct.
1614    ///
1615    /// # Panics
1616    ///
1617    /// Panics if a constraint is not satisfied.
1618    fn validate(&self) {
1619        fn validate_inner(desc: &RelationDesc) -> Result<(), anyhow::Error> {
1620            if desc.typ.column_types.len() != desc.metadata.len() {
1621                anyhow::bail!("mismatch between number of types and metadatas");
1622            }
1623
1624            for (col_idx, meta) in &desc.metadata {
1625                if col_idx.0 > desc.metadata.len() {
1626                    anyhow::bail!("column index out of bounds");
1627                }
1628                if meta.added >= meta.dropped.unwrap_or(RelationVersion(u64::MAX)) {
1629                    anyhow::bail!("column was added after it was dropped?");
1630                }
1631                if desc.typ().columns().get(meta.typ_idx).is_none() {
1632                    anyhow::bail!("typ_idx incorrect");
1633                }
1634            }
1635
1636            for keys in &desc.typ.keys {
1637                for key in keys {
1638                    if *key >= desc.typ.column_types.len() {
1639                        anyhow::bail!("key index was out of bounds!");
1640                    }
1641                }
1642            }
1643
1644            let versions = desc
1645                .metadata
1646                .values()
1647                .map(|meta| meta.dropped.unwrap_or(meta.added));
1648            let mut max = 0;
1649            let mut sum = 0;
1650            for version in versions {
1651                max = std::cmp::max(max, version.0);
1652                sum += version.0;
1653            }
1654
1655            // Other than RelationVersion(0), we should never have duplicate
1656            // versions and they should always increase by 1. In other words, the
1657            // sum of all RelationVersions should be the sum of [0, max].
1658            //
1659            // N.B. n * (n + 1) / 2 = sum of [0, n]
1660            //
1661            // While I normally don't like tricks like this, it allows us to
1662            // validate that our column versions are correct in O(n) time and
1663            // without allocations.
1664            if sum != (max * (max + 1) / 2) {
1665                anyhow::bail!("there is a duplicate or missing relation version");
1666            }
1667
1668            Ok(())
1669        }
1670
1671        assert_ok!(validate_inner(&self.inner), "validate failed! {self:?}");
1672    }
1673}
1674
1675/// Diffs that can be generated proptest and applied to a [`RelationDesc`] to
1676/// exercise schema migrations.
1677#[derive(Debug)]
1678pub enum PropRelationDescDiff {
1679    AddColumn {
1680        name: ColumnName,
1681        typ: SqlColumnType,
1682    },
1683    DropColumn {
1684        name: ColumnName,
1685    },
1686    ToggleNullability {
1687        name: ColumnName,
1688    },
1689    ChangeType {
1690        name: ColumnName,
1691        typ: SqlColumnType,
1692    },
1693}
1694
1695impl PropRelationDescDiff {
1696    pub fn apply(self, desc: &mut RelationDesc) {
1697        match self {
1698            PropRelationDescDiff::AddColumn { name, typ } => {
1699                let new_idx = desc.metadata.len();
1700                let meta = ColumnMetadata {
1701                    name,
1702                    typ_idx: new_idx,
1703                    added: RelationVersion(0),
1704                    dropped: None,
1705                };
1706                let prev = desc.metadata.insert(ColumnIndex(new_idx), meta);
1707                desc.typ.column_types.push(typ);
1708
1709                assert_none!(prev);
1710                assert_eq!(desc.metadata.len(), desc.typ.column_types.len());
1711            }
1712            PropRelationDescDiff::DropColumn { name } => {
1713                let next_version = desc
1714                    .metadata
1715                    .values()
1716                    .map(|meta| meta.dropped.unwrap_or(meta.added))
1717                    .max()
1718                    .unwrap_or_else(RelationVersion::root)
1719                    .bump();
1720                let Some(metadata) = desc.metadata.values_mut().find(|meta| meta.name == name)
1721                else {
1722                    return;
1723                };
1724                if metadata.dropped.is_none() {
1725                    metadata.dropped = Some(next_version);
1726                }
1727            }
1728            PropRelationDescDiff::ToggleNullability { name } => {
1729                let Some((pos, _)) = desc.get_by_name(&name) else {
1730                    return;
1731                };
1732                let col_type = desc
1733                    .typ
1734                    .column_types
1735                    .get_mut(pos)
1736                    .expect("ColumnNames and SqlColumnTypes out of sync!");
1737                col_type.nullable = !col_type.nullable;
1738            }
1739            PropRelationDescDiff::ChangeType { name, typ } => {
1740                let Some((pos, _)) = desc.get_by_name(&name) else {
1741                    return;
1742                };
1743                let col_type = desc
1744                    .typ
1745                    .column_types
1746                    .get_mut(pos)
1747                    .expect("ColumnNames and SqlColumnTypes out of sync!");
1748                *col_type = typ;
1749            }
1750        }
1751    }
1752}
1753
1754/// Generates a set of [`PropRelationDescDiff`]s based on some source [`RelationDesc`].
1755pub fn arb_relation_desc_diff(
1756    source: &RelationDesc,
1757) -> impl Strategy<Value = Vec<PropRelationDescDiff>> + use<> {
1758    let source = Rc::new(source.clone());
1759    let num_source_columns = source.typ.columns().len();
1760
1761    let num_add_columns = Union::new_weighted(vec![(100, Just(0..8)), (1, Just(8..64))]);
1762    let add_columns_strat = num_add_columns
1763        .prop_flat_map(|num_columns| {
1764            proptest::collection::vec((any::<ColumnName>(), any::<SqlColumnType>()), num_columns)
1765        })
1766        .prop_map(|cols| {
1767            cols.into_iter()
1768                .map(|(name, typ)| PropRelationDescDiff::AddColumn { name, typ })
1769                .collect::<Vec<_>>()
1770        });
1771
1772    // If the source RelationDesc is empty there is nothing else to do.
1773    if num_source_columns == 0 {
1774        return add_columns_strat.boxed();
1775    }
1776
1777    let source_ = Rc::clone(&source);
1778    let drop_columns_strat = (0..num_source_columns).prop_perturb(move |num_columns, mut rng| {
1779        let mut set = BTreeSet::default();
1780        for _ in 0..num_columns {
1781            let col_idx = rng.random_range(0..num_source_columns);
1782            set.insert(source_.get_name(col_idx).clone());
1783        }
1784        set.into_iter()
1785            .map(|name| PropRelationDescDiff::DropColumn { name })
1786            .collect::<Vec<_>>()
1787    });
1788
1789    let source_ = Rc::clone(&source);
1790    let toggle_nullability_strat =
1791        (0..num_source_columns).prop_perturb(move |num_columns, mut rng| {
1792            let mut set = BTreeSet::default();
1793            for _ in 0..num_columns {
1794                let col_idx = rng.random_range(0..num_source_columns);
1795                set.insert(source_.get_name(col_idx).clone());
1796            }
1797            set.into_iter()
1798                .map(|name| PropRelationDescDiff::ToggleNullability { name })
1799                .collect::<Vec<_>>()
1800        });
1801
1802    let source_ = Rc::clone(&source);
1803    let change_type_strat = (0..num_source_columns)
1804        .prop_perturb(move |num_columns, mut rng| {
1805            let mut set = BTreeSet::default();
1806            for _ in 0..num_columns {
1807                let col_idx = rng.random_range(0..num_source_columns);
1808                set.insert(source_.get_name(col_idx).clone());
1809            }
1810            set
1811        })
1812        .prop_flat_map(|cols| {
1813            proptest::collection::vec(any::<SqlColumnType>(), cols.len())
1814                .prop_map(move |types| (cols.clone(), types))
1815        })
1816        .prop_map(|(cols, types)| {
1817            cols.into_iter()
1818                .zip_eq(types)
1819                .map(|(name, typ)| PropRelationDescDiff::ChangeType { name, typ })
1820                .collect::<Vec<_>>()
1821        });
1822
1823    (
1824        add_columns_strat,
1825        drop_columns_strat,
1826        toggle_nullability_strat,
1827        change_type_strat,
1828    )
1829        .prop_map(|(adds, drops, toggles, changes)| {
1830            adds.into_iter()
1831                .chain(drops)
1832                .chain(toggles)
1833                .chain(changes)
1834                .collect::<Vec<_>>()
1835        })
1836        .prop_shuffle()
1837        .boxed()
1838}
1839
1840#[cfg(test)]
1841mod tests {
1842    use super::*;
1843    use prost::Message;
1844
1845    #[mz_ore::test]
1846    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `pipe2` on OS `linux`
1847    fn smoktest_at_version() {
1848        let desc = RelationDesc::builder()
1849            .with_column("a", SqlScalarType::Bool.nullable(true))
1850            .with_column("z", SqlScalarType::String.nullable(false))
1851            .finish();
1852
1853        let mut versioned_desc = VersionedRelationDesc {
1854            inner: desc.clone(),
1855        };
1856        versioned_desc.validate();
1857
1858        let latest = versioned_desc.at_version(RelationVersionSelector::Latest);
1859        assert_eq!(desc, latest);
1860
1861        let v0 = versioned_desc.at_version(RelationVersionSelector::specific(0));
1862        assert_eq!(desc, v0);
1863
1864        let v3 = versioned_desc.at_version(RelationVersionSelector::specific(3));
1865        assert_eq!(desc, v3);
1866
1867        let v1 = versioned_desc.add_column("b", SqlScalarType::Bytes.nullable(false));
1868        assert_eq!(v1, RelationVersion(1));
1869
1870        let v1 = versioned_desc.at_version(RelationVersionSelector::Specific(v1));
1871        insta::assert_json_snapshot!(v1.metadata, @r###"
1872        {
1873          "0": {
1874            "name": "a",
1875            "typ_idx": 0,
1876            "added": 0,
1877            "dropped": null
1878          },
1879          "1": {
1880            "name": "z",
1881            "typ_idx": 1,
1882            "added": 0,
1883            "dropped": null
1884          },
1885          "2": {
1886            "name": "b",
1887            "typ_idx": 2,
1888            "added": 1,
1889            "dropped": null
1890          }
1891        }
1892        "###);
1893
1894        // Check that V0 doesn't show the new column.
1895        let v0_b = versioned_desc.at_version(RelationVersionSelector::specific(0));
1896        assert!(v0.iter().eq(v0_b.iter()));
1897
1898        let v2 = versioned_desc.drop_column("z");
1899        assert_eq!(v2, RelationVersion(2));
1900
1901        let v2 = versioned_desc.at_version(RelationVersionSelector::Specific(v2));
1902        insta::assert_json_snapshot!(v2.metadata, @r###"
1903        {
1904          "0": {
1905            "name": "a",
1906            "typ_idx": 0,
1907            "added": 0,
1908            "dropped": null
1909          },
1910          "2": {
1911            "name": "b",
1912            "typ_idx": 1,
1913            "added": 1,
1914            "dropped": null
1915          }
1916        }
1917        "###);
1918
1919        // Check that V0 and V1 are still correct.
1920        let v0_c = versioned_desc.at_version(RelationVersionSelector::specific(0));
1921        assert!(v0.iter().eq(v0_c.iter()));
1922
1923        let v1_b = versioned_desc.at_version(RelationVersionSelector::specific(1));
1924        assert!(v1.iter().eq(v1_b.iter()));
1925
1926        insta::assert_json_snapshot!(versioned_desc.inner.metadata, @r###"
1927        {
1928          "0": {
1929            "name": "a",
1930            "typ_idx": 0,
1931            "added": 0,
1932            "dropped": null
1933          },
1934          "1": {
1935            "name": "z",
1936            "typ_idx": 1,
1937            "added": 0,
1938            "dropped": 2
1939          },
1940          "2": {
1941            "name": "b",
1942            "typ_idx": 2,
1943            "added": 1,
1944            "dropped": null
1945          }
1946        }
1947        "###);
1948    }
1949
1950    #[mz_ore::test]
1951    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `pipe2` on OS `linux`
1952    fn test_dropping_columns_with_keys() {
1953        let desc = RelationDesc::builder()
1954            .with_column("a", SqlScalarType::Bool.nullable(true))
1955            .with_column("z", SqlScalarType::String.nullable(false))
1956            .with_key(vec![1])
1957            .finish();
1958
1959        let mut versioned_desc = VersionedRelationDesc {
1960            inner: desc.clone(),
1961        };
1962        versioned_desc.validate();
1963
1964        let v1 = versioned_desc.drop_column("a");
1965        assert_eq!(v1, RelationVersion(1));
1966
1967        // Make sure the key index for 'z' got remapped since 'a' was dropped.
1968        let v1 = versioned_desc.at_version(RelationVersionSelector::Specific(v1));
1969        insta::assert_json_snapshot!(v1, @r###"
1970        {
1971          "typ": {
1972            "column_types": [
1973              {
1974                "scalar_type": "String",
1975                "nullable": false
1976              }
1977            ],
1978            "keys": [
1979              [
1980                0
1981              ]
1982            ]
1983          },
1984          "metadata": {
1985            "1": {
1986              "name": "z",
1987              "typ_idx": 0,
1988              "added": 0,
1989              "dropped": null
1990            }
1991          }
1992        }
1993        "###);
1994
1995        // Make sure the key index of 'z' is correct when all columns are present.
1996        let v0 = versioned_desc.at_version(RelationVersionSelector::specific(0));
1997        insta::assert_json_snapshot!(v0, @r###"
1998        {
1999          "typ": {
2000            "column_types": [
2001              {
2002                "scalar_type": "Bool",
2003                "nullable": true
2004              },
2005              {
2006                "scalar_type": "String",
2007                "nullable": false
2008              }
2009            ],
2010            "keys": [
2011              [
2012                1
2013              ]
2014            ]
2015          },
2016          "metadata": {
2017            "0": {
2018              "name": "a",
2019              "typ_idx": 0,
2020              "added": 0,
2021              "dropped": 1
2022            },
2023            "1": {
2024              "name": "z",
2025              "typ_idx": 1,
2026              "added": 0,
2027              "dropped": null
2028            }
2029          }
2030        }
2031        "###);
2032    }
2033
2034    #[mz_ore::test]
2035    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `pipe2` on OS `linux`
2036    fn roundtrip_relation_desc_without_metadata() {
2037        let typ = ProtoRelationType {
2038            column_types: vec![
2039                SqlScalarType::String.nullable(false).into_proto(),
2040                SqlScalarType::Bool.nullable(true).into_proto(),
2041            ],
2042            keys: vec![],
2043        };
2044        let proto = ProtoRelationDesc {
2045            typ: Some(typ),
2046            names: vec![
2047                ColumnName("a".into()).into_proto(),
2048                ColumnName("b".into()).into_proto(),
2049            ],
2050            metadata: vec![],
2051        };
2052        let desc: RelationDesc = proto.into_rust().unwrap();
2053
2054        insta::assert_json_snapshot!(desc, @r###"
2055        {
2056          "typ": {
2057            "column_types": [
2058              {
2059                "scalar_type": "String",
2060                "nullable": false
2061              },
2062              {
2063                "scalar_type": "Bool",
2064                "nullable": true
2065              }
2066            ],
2067            "keys": []
2068          },
2069          "metadata": {
2070            "0": {
2071              "name": "a",
2072              "typ_idx": 0,
2073              "added": 0,
2074              "dropped": null
2075            },
2076            "1": {
2077              "name": "b",
2078              "typ_idx": 1,
2079              "added": 0,
2080              "dropped": null
2081            }
2082          }
2083        }
2084        "###);
2085    }
2086
2087    #[mz_ore::test]
2088    #[should_panic(expected = "column named 'a' already exists!")]
2089    fn test_add_column_with_same_name_panics() {
2090        let desc = RelationDesc::builder()
2091            .with_column("a", SqlScalarType::Bool.nullable(true))
2092            .finish();
2093        let mut versioned = VersionedRelationDesc::new(desc);
2094
2095        let _ = versioned.add_column("a", SqlScalarType::String.nullable(false));
2096    }
2097
2098    #[mz_ore::test]
2099    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `pipe2` on OS `linux`
2100    fn test_add_column_with_same_name_prev_dropped() {
2101        let desc = RelationDesc::builder()
2102            .with_column("a", SqlScalarType::Bool.nullable(true))
2103            .finish();
2104        let mut versioned = VersionedRelationDesc::new(desc);
2105
2106        let v1 = versioned.drop_column("a");
2107        let v1 = versioned.at_version(RelationVersionSelector::Specific(v1));
2108        insta::assert_json_snapshot!(v1, @r###"
2109        {
2110          "typ": {
2111            "column_types": [],
2112            "keys": []
2113          },
2114          "metadata": {}
2115        }
2116        "###);
2117
2118        let v2 = versioned.add_column("a", SqlScalarType::String.nullable(false));
2119        let v2 = versioned.at_version(RelationVersionSelector::Specific(v2));
2120        insta::assert_json_snapshot!(v2, @r###"
2121        {
2122          "typ": {
2123            "column_types": [
2124              {
2125                "scalar_type": "String",
2126                "nullable": false
2127              }
2128            ],
2129            "keys": []
2130          },
2131          "metadata": {
2132            "1": {
2133              "name": "a",
2134              "typ_idx": 0,
2135              "added": 2,
2136              "dropped": null
2137            }
2138          }
2139        }
2140        "###);
2141    }
2142
2143    #[mz_ore::test]
2144    #[cfg_attr(miri, ignore)]
2145    fn apply_demand() {
2146        let desc = RelationDesc::builder()
2147            .with_column("a", SqlScalarType::String.nullable(true))
2148            .with_column("b", SqlScalarType::Int64.nullable(false))
2149            .with_column("c", SqlScalarType::Time.nullable(false))
2150            .finish();
2151        let desc = desc.apply_demand(&BTreeSet::from([0, 2]));
2152        assert_eq!(desc.arity(), 2);
2153        // TODO(parkmycar): Move validate onto RelationDesc.
2154        VersionedRelationDesc::new(desc).validate();
2155    }
2156
2157    #[mz_ore::test]
2158    #[cfg_attr(miri, ignore)]
2159    fn smoketest_column_index_stable_ident() {
2160        let idx_a = ColumnIndex(42);
2161        // Note(parkmycar): This should never change.
2162        assert_eq!(idx_a.to_stable_name(), "42");
2163    }
2164
2165    #[mz_ore::test]
2166    #[cfg_attr(miri, ignore)] // too slow
2167    fn proptest_relation_desc_roundtrips() {
2168        fn testcase(og: RelationDesc) {
2169            let bytes = og.into_proto().encode_to_vec();
2170            let proto = ProtoRelationDesc::decode(&bytes[..]).unwrap();
2171            let rnd = RelationDesc::from_proto(proto).unwrap();
2172
2173            assert_eq!(og, rnd);
2174        }
2175
2176        proptest!(|(desc in any::<RelationDesc>())| {
2177            testcase(desc);
2178        });
2179
2180        let strat = any::<RelationDesc>().prop_flat_map(|desc| {
2181            arb_relation_desc_diff(&desc).prop_map(move |diffs| (desc.clone(), diffs))
2182        });
2183
2184        proptest!(|((mut desc, diffs) in strat)| {
2185            for diff in diffs {
2186                diff.apply(&mut desc);
2187            };
2188            testcase(desc);
2189        });
2190    }
2191}