Skip to main content

mz_repr/
relation.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::{BTreeMap, BTreeSet};
11use std::rc::Rc;
12use std::{fmt, vec};
13
14use anyhow::bail;
15use itertools::Itertools;
16use mz_lowertest::MzReflect;
17use mz_ore::cast::CastFrom;
18use mz_ore::soft_panic_or_log;
19use mz_ore::str::StrExt;
20use mz_ore::{assert_none, assert_ok};
21use mz_persist_types::schema::SchemaId;
22use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
23use proptest::prelude::*;
24use proptest::strategy::{Strategy, Union};
25use proptest_derive::Arbitrary;
26use serde::{Deserialize, Serialize};
27
28use crate::relation_and_scalar::proto_relation_type::ProtoKey;
29pub use crate::relation_and_scalar::{
30    ProtoColumnMetadata, ProtoColumnName, ProtoColumnType, ProtoRelationDesc, ProtoRelationType,
31    ProtoRelationVersion,
32};
33use crate::{Datum, ReprScalarType, Row, SqlScalarType, arb_datum_for_column};
34
/// The type of a [`Datum`].
///
/// [`SqlColumnType`] bundles information about the scalar type of a datum (e.g.,
/// Int32 or String) with its nullability.
///
/// To construct a column type, either initialize the struct directly, or
/// use the [`SqlScalarType::nullable`] method.
///
/// When deserialized through serde, a missing `nullable` field is filled in
/// with `true`.
#[derive(
    Arbitrary,
    Clone,
    Debug,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Serialize,
    Deserialize,
    Hash,
    MzReflect
)]
pub struct SqlColumnType {
    /// The underlying scalar type (e.g., Int32 or String) of this column.
    pub scalar_type: SqlScalarType,
    /// Whether this datum can be null.
    ///
    /// Defaults to `true` when absent from serialized input.
    #[serde(default = "return_true")]
    pub nullable: bool,
}
62
/// Serde default provider that always yields `true`.
///
/// This function exists solely for the purpose of making the `nullable` field
/// of [`SqlColumnType`] and [`ReprColumnType`] default to `true`, e.g. in unit
/// tests. The default value of a bool is false, and the only way to make an
/// object take on any other value by default is to pass it a function that
/// returns the desired default value. See
/// <https://github.com/serde-rs/serde/issues/1030>
#[inline(always)]
fn return_true() -> bool {
    true
}
72
73impl SqlColumnType {
74    /// Compute the least upper bound of many column types, returning an error on
75    /// incompatible types or an empty iterator.
76    /// See [`SqlColumnType::try_union`] for details.
77    pub fn try_union_many<'a>(
78        typs: impl IntoIterator<Item = &'a Self>,
79    ) -> Result<Self, anyhow::Error> {
80        let mut iter = typs.into_iter();
81        let Some(typ) = iter.next() else {
82            bail!("Cannot union empty iterator");
83        };
84        iter.try_fold(typ.clone(), |a, b| a.try_union(b))
85    }
86
87    /// Compute the least upper bound of many column types.
88    /// See [`SqlColumnType::try_union`] for details.
89    ///
90    /// Panics on incompatible types or an empty iterator.
91    pub fn union_many<'a>(typs: impl IntoIterator<Item = &'a Self>) -> Self {
92        Self::try_union_many(typs).expect("Cannot union empty iterator")
93    }
94
95    /// Backports nullability information from `backport_typ` into `self`,
96    /// affecting the outer `.nullable` field but also record fields deeper
97    /// into the type.
98    pub fn backport_nullability(&mut self, backport_typ: &ReprColumnType) {
99        self.scalar_type
100            .backport_nullability(&backport_typ.scalar_type);
101        self.nullable = backport_typ.nullable;
102    }
103
104    /// Compute the least upper bound of two column types at the SQL level.
105    ///
106    /// Two types are compatible when they are equal, share the same base type
107    /// (differing only in modifiers), or are records with pairwise-compatible
108    /// fields.
109    /// The resulting nullability is the disjunction of the two input
110    /// nullabilities.
111    ///
112    /// Returns an error for incompatible types, e.g. `Text` and `Int32`, or
113    /// `Text` and `VarChar` (different base types at the SQL level).
114    /// See [`SqlColumnType::try_union`] for a fallback that handles the latter
115    /// case via repr-level union.
116    pub fn sql_union(&self, other: &Self) -> Result<Self, anyhow::Error> {
117        match (&self.scalar_type, &other.scalar_type) {
118            (scalar_type, other_scalar_type) if scalar_type == other_scalar_type => {
119                Ok(SqlColumnType {
120                    scalar_type: scalar_type.clone(),
121                    nullable: self.nullable || other.nullable,
122                })
123            }
124            (scalar_type, other_scalar_type) if scalar_type.base_eq(other_scalar_type) => {
125                Ok(SqlColumnType {
126                    scalar_type: scalar_type.without_modifiers(),
127                    nullable: self.nullable || other.nullable,
128                })
129            }
130            (
131                SqlScalarType::Record { fields, custom_id },
132                SqlScalarType::Record {
133                    fields: other_fields,
134                    custom_id: other_custom_id,
135                },
136            ) => {
137                if custom_id != other_custom_id {
138                    bail!(
139                        "Can't union types: {:?} and {:?}",
140                        self.scalar_type,
141                        other.scalar_type
142                    );
143                };
144
145                if fields.len() != other_fields.len() {
146                    bail!(
147                        "Can't union types: {:?} and {:?}",
148                        self.scalar_type,
149                        other.scalar_type
150                    );
151                }
152                let mut union_fields = Vec::with_capacity(fields.len());
153                for ((name, typ), (other_name, other_typ)) in
154                    fields.iter().zip_eq(other_fields.iter())
155                {
156                    if name != other_name {
157                        bail!(
158                            "Can't union types: {:?} and {:?}",
159                            self.scalar_type,
160                            other.scalar_type
161                        );
162                    } else {
163                        let union_column_type = typ.sql_union(other_typ)?;
164                        union_fields.push((name.clone(), union_column_type));
165                    };
166                }
167
168                Ok(SqlColumnType {
169                    scalar_type: SqlScalarType::Record {
170                        fields: union_fields.into(),
171                        custom_id: *custom_id,
172                    },
173                    nullable: self.nullable || other.nullable,
174                })
175            }
176            _ => bail!(
177                "Can't union types: {:?} and {:?}",
178                self.scalar_type,
179                other.scalar_type
180            ),
181        }
182    }
183
184    /// Compute the least upper bound of two column types.
185    ///
186    /// Attempts [`SqlColumnType::sql_union`] first, which preserves SQL-level type
187    /// information (e.g. modifiers). Falls back to a repr-level union via
188    /// [`ReprColumnType::union`] when the SQL types are incompatible but the
189    /// underlying repr types are compatible.
190    ///
191    /// The resulting nullability is the disjunction of the two input
192    /// nullabilities.
193    pub fn try_union(&self, other: &Self) -> Result<Self, anyhow::Error> {
194        self.sql_union(other).or_else(|e| {
195            let repr_self = ReprColumnType::from(self);
196            let repr_other = ReprColumnType::from(other);
197            match repr_self.union(&repr_other) {
198                Ok(typ) => {
199                    // sql_union failed but repr union succeeded — this indicates
200                    // a repr-type canonicalization gap that we want CI visibility for.
201                    soft_panic_or_log!("repr type error: sql_union({self:?}, {other:?}): {e}");
202                    Ok(SqlColumnType::from_repr(&typ))
203                }
204                Err(_) => {
205                    // Both sql_union and repr union failed — genuine type mismatch,
206                    // not a canonicalization issue. Just propagate the original error.
207                    Err(e)
208                }
209            }
210        })
211    }
212
213    /// Compute the least upper bound of two column types.
214    /// See [`SqlColumnType::try_union`] for details.
215    ///
216    /// Panics on incompatible types.
217    pub fn union(&self, other: &Self) -> Self {
218        self.try_union(other).unwrap_or_else(|e| {
219            panic!("repr type error: after sql_union({self:?}, {other:?}) error: {e}")
220        })
221    }
222
223    /// Consumes this `SqlColumnType` and returns a new `SqlColumnType` with its
224    /// nullability set to the specified boolean.
225    pub fn nullable(mut self, nullable: bool) -> Self {
226        self.nullable = nullable;
227        self
228    }
229}
230
231impl RustType<ProtoColumnType> for SqlColumnType {
232    fn into_proto(&self) -> ProtoColumnType {
233        ProtoColumnType {
234            nullable: self.nullable,
235            scalar_type: Some(self.scalar_type.into_proto()),
236        }
237    }
238
239    fn from_proto(proto: ProtoColumnType) -> Result<Self, TryFromProtoError> {
240        Ok(SqlColumnType {
241            nullable: proto.nullable,
242            scalar_type: proto
243                .scalar_type
244                .into_rust_if_some("ProtoColumnType::scalar_type")?,
245        })
246    }
247}
248
249impl fmt::Display for SqlColumnType {
250    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
251        let nullable = if self.nullable { "Null" } else { "NotNull" };
252        f.write_fmt(format_args!("{:?}:{}", self.scalar_type, nullable))
253    }
254}
255
/// The type of a relation, expressed with [`SqlColumnType`] columns.
#[derive(
    Arbitrary,
    Clone,
    Debug,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Serialize,
    Deserialize,
    Hash,
    MzReflect
)]
pub struct SqlRelationType {
    /// The type for each column, in order.
    pub column_types: Vec<SqlColumnType>,
    /// Sets of indices that are "keys" for the collection.
    ///
    /// Each element in this list is a set of column indices, each with the
    /// property that the collection contains at most one record with each
    /// distinct set of values for each column. Alternately, for a specific set
    /// of values assigned to these columns there is at most one record.
    ///
    /// A collection can contain multiple sets of keys, although it is common to
    /// have either zero or one sets of key indices.
    ///
    /// Defaults to an empty list when absent from serialized input.
    #[serde(default)]
    pub keys: Vec<Vec<usize>>,
}
285
286impl SqlRelationType {
287    /// Constructs a `SqlRelationType` representing the relation with no columns and
288    /// no keys.
289    pub fn empty() -> Self {
290        SqlRelationType::new(vec![])
291    }
292
293    /// Constructs a new `SqlRelationType` from specified column types.
294    ///
295    /// The `SqlRelationType` will have no keys.
296    pub fn new(column_types: Vec<SqlColumnType>) -> Self {
297        SqlRelationType {
298            column_types,
299            keys: Vec::new(),
300        }
301    }
302
303    /// Adds a new key for the relation.
304    pub fn with_key(mut self, mut indices: Vec<usize>) -> Self {
305        indices.sort_unstable();
306        if !self.keys.contains(&indices) {
307            self.keys.push(indices);
308        }
309        self
310    }
311
312    pub fn with_keys(mut self, keys: Vec<Vec<usize>>) -> Self {
313        for key in keys {
314            self = self.with_key(key)
315        }
316        self
317    }
318
319    /// Computes the number of columns in the relation.
320    pub fn arity(&self) -> usize {
321        self.column_types.len()
322    }
323
324    /// Gets the index of the columns used when creating a default index.
325    pub fn default_key(&self) -> Vec<usize> {
326        if let Some(key) = self.keys.first() {
327            if key.is_empty() {
328                (0..self.column_types.len()).collect()
329            } else {
330                key.clone()
331            }
332        } else {
333            (0..self.column_types.len()).collect()
334        }
335    }
336
337    /// Returns all the [`SqlColumnType`]s, in order, for this relation.
338    pub fn columns(&self) -> &[SqlColumnType] {
339        &self.column_types
340    }
341
342    /// Adopts the nullability and keys from another `SqlRelationType`.
343    ///
344    /// Panics if the number of columns does not match.
345    pub fn backport_nullability_and_keys(&mut self, backport_typ: &ReprRelationType) {
346        assert_eq!(
347            backport_typ.column_types.len(),
348            self.column_types.len(),
349            "HIR and MIR types should have the same number of columns"
350        );
351        for (backport_col, sql_col) in backport_typ
352            .column_types
353            .iter()
354            .zip_eq(self.column_types.iter_mut())
355        {
356            sql_col.backport_nullability(backport_col);
357        }
358
359        self.keys = backport_typ.keys.clone();
360    }
361
362    /// Constructs a `SqlRelationType` from a `ReprRelationType` by converting
363    /// each column type via [`SqlColumnType::from_repr`]. This is a lossy
364    /// inverse of `ReprRelationType::from(&SqlRelationType)`.
365    pub fn from_repr(repr: &ReprRelationType) -> Self {
366        SqlRelationType {
367            column_types: repr
368                .column_types
369                .iter()
370                .map(SqlColumnType::from_repr)
371                .collect(),
372            keys: repr.keys.clone(),
373        }
374    }
375}
376
377impl RustType<ProtoRelationType> for SqlRelationType {
378    fn into_proto(&self) -> ProtoRelationType {
379        ProtoRelationType {
380            column_types: self.column_types.into_proto(),
381            keys: self.keys.into_proto(),
382        }
383    }
384
385    fn from_proto(proto: ProtoRelationType) -> Result<Self, TryFromProtoError> {
386        Ok(SqlRelationType {
387            column_types: proto.column_types.into_rust()?,
388            keys: proto.keys.into_rust()?,
389        })
390    }
391}
392
393impl RustType<ProtoKey> for Vec<usize> {
394    fn into_proto(&self) -> ProtoKey {
395        ProtoKey {
396            keys: self.into_proto(),
397        }
398    }
399
400    fn from_proto(proto: ProtoKey) -> Result<Self, TryFromProtoError> {
401        proto.keys.into_rust()
402    }
403}
404
/// The type of a relation, expressed with [`ReprColumnType`] columns.
///
/// It has the same shape as [`SqlRelationType`], and can be built from one via
/// `ReprRelationType::from(&SqlRelationType)`.
#[derive(
    Clone,
    Debug,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Serialize,
    Deserialize,
    Hash,
    MzReflect
)]
pub struct ReprRelationType {
    /// The type for each column, in order.
    pub column_types: Vec<ReprColumnType>,
    /// Sets of indices that are "keys" for the collection.
    ///
    /// Each element in this list is a set of column indices, each with the
    /// property that the collection contains at most one record with each
    /// distinct set of values for each column. Alternately, for a specific set
    /// of values assigned to these columns there is at most one record.
    ///
    /// A collection can contain multiple sets of keys, although it is common to
    /// have either zero or one sets of key indices.
    ///
    /// Defaults to an empty list when absent from serialized input.
    #[serde(default)]
    pub keys: Vec<Vec<usize>>,
}
433
434impl ReprRelationType {
435    /// Constructs a `ReprRelationType` representing the relation with no columns and
436    /// no keys.
437    pub fn empty() -> Self {
438        ReprRelationType::new(vec![])
439    }
440
441    /// Constructs a new `ReprRelationType` from specified column types.
442    ///
443    /// The `ReprRelationType` will have no keys.
444    pub fn new(column_types: Vec<ReprColumnType>) -> Self {
445        ReprRelationType {
446            column_types,
447            keys: Vec::new(),
448        }
449    }
450
451    /// Adds a new key for the relation.
452    pub fn with_key(mut self, mut indices: Vec<usize>) -> Self {
453        indices.sort_unstable();
454        if !self.keys.contains(&indices) {
455            self.keys.push(indices);
456        }
457        self
458    }
459
460    pub fn with_keys(mut self, keys: Vec<Vec<usize>>) -> Self {
461        for key in keys {
462            self = self.with_key(key)
463        }
464        self
465    }
466
467    /// Computes the number of columns in the relation.
468    pub fn arity(&self) -> usize {
469        self.column_types.len()
470    }
471
472    /// Gets the index of the columns used when creating a default index.
473    pub fn default_key(&self) -> Vec<usize> {
474        if let Some(key) = self.keys.first() {
475            if key.is_empty() {
476                (0..self.column_types.len()).collect()
477            } else {
478                key.clone()
479            }
480        } else {
481            (0..self.column_types.len()).collect()
482        }
483    }
484
485    /// Returns all the column types in order, for this relation.
486    pub fn columns(&self) -> &[ReprColumnType] {
487        &self.column_types
488    }
489}
490
491impl From<&SqlRelationType> for ReprRelationType {
492    fn from(sql_relation_type: &SqlRelationType) -> Self {
493        ReprRelationType {
494            column_types: sql_relation_type
495                .column_types
496                .iter()
497                .map(ReprColumnType::from)
498                .collect(),
499            keys: sql_relation_type.keys.clone(),
500        }
501    }
502}
503
/// The type of a column at the representation level: a [`ReprScalarType`]
/// bundled with its nullability.
///
/// When deserialized through serde, a missing `nullable` field is filled in
/// with `true`.
#[derive(
    Clone,
    Debug,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Serialize,
    Deserialize,
    Hash,
    MzReflect
)]
pub struct ReprColumnType {
    /// The underlying representation scalar type (e.g., Int32 or String) of this column.
    pub scalar_type: ReprScalarType,
    /// Whether this datum can be null.
    ///
    /// Defaults to `true` when absent from serialized input.
    #[serde(default = "return_true")]
    pub nullable: bool,
}
523
524impl ReprColumnType {
525    /// Compute the least upper bound of two column types at the repr level.
526    ///
527    /// More permissive than [`SqlColumnType::sql_union`] because it operates
528    /// on the underlying representation types, ignoring SQL-level distinctions
529    /// such as modifiers.
530    /// The resulting nullability is the disjunction of the two inputs.
531    pub fn union(&self, col: &ReprColumnType) -> Result<Self, anyhow::Error> {
532        let scalar_type = self.scalar_type.union(&col.scalar_type)?;
533        let nullable = self.nullable || col.nullable;
534
535        Ok(ReprColumnType {
536            scalar_type,
537            nullable,
538        })
539    }
540}
541
542impl From<&SqlColumnType> for ReprColumnType {
543    fn from(sql_column_type: &SqlColumnType) -> Self {
544        let scalar_type = &sql_column_type.scalar_type;
545        let scalar_type = scalar_type.into();
546        let nullable = sql_column_type.nullable;
547
548        ReprColumnType {
549            scalar_type,
550            nullable,
551        }
552    }
553}
554
555impl SqlColumnType {
556    /// Lossily translates a [`ReprColumnType`] back to a [`SqlColumnType`].
557    ///
558    /// See [`SqlScalarType::from_repr`] for an example of lossiness.
559    pub fn from_repr(repr: &ReprColumnType) -> Self {
560        let scalar_type = &repr.scalar_type;
561        let scalar_type = SqlScalarType::from_repr(scalar_type);
562        let nullable = repr.nullable;
563
564        SqlColumnType {
565            scalar_type,
566            nullable,
567        }
568    }
569}
570
/// The name of a column in a [`RelationDesc`].
///
/// A thin wrapper around a boxed string slice; it dereferences to `str`.
#[derive(
    Clone,
    Debug,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Serialize,
    Deserialize,
    Hash,
    MzReflect
)]
pub struct ColumnName(Box<str>);
585
586impl ColumnName {
587    /// Returns this column name as a `str`.
588    #[inline(always)]
589    pub fn as_str(&self) -> &str {
590        &*self
591    }
592
593    /// Returns this column name as a `&mut Box<str>`.
594    pub fn as_mut_boxed_str(&mut self) -> &mut Box<str> {
595        &mut self.0
596    }
597
598    /// Returns if this [`ColumnName`] is similar to the provided one.
599    pub fn is_similar(&self, other: &ColumnName) -> bool {
600        const SIMILARITY_THRESHOLD: f64 = 0.6;
601
602        let a_lowercase = self.to_lowercase();
603        let b_lowercase = other.to_lowercase();
604
605        strsim::normalized_levenshtein(&a_lowercase, &b_lowercase) >= SIMILARITY_THRESHOLD
606    }
607}
608
609impl std::ops::Deref for ColumnName {
610    type Target = str;
611
612    #[inline(always)]
613    fn deref(&self) -> &Self::Target {
614        &self.0
615    }
616}
617
618impl fmt::Display for ColumnName {
619    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
620        f.write_str(&self.0)
621    }
622}
623
624impl From<String> for ColumnName {
625    fn from(s: String) -> ColumnName {
626        ColumnName(s.into())
627    }
628}
629
630impl From<&str> for ColumnName {
631    fn from(s: &str) -> ColumnName {
632        ColumnName(s.into())
633    }
634}
635
636impl From<&ColumnName> for ColumnName {
637    fn from(n: &ColumnName) -> ColumnName {
638        n.clone()
639    }
640}
641
642impl RustType<ProtoColumnName> for ColumnName {
643    fn into_proto(&self) -> ProtoColumnName {
644        ProtoColumnName {
645            value: Some(self.0.to_string()),
646        }
647    }
648
649    fn from_proto(proto: ProtoColumnName) -> Result<Self, TryFromProtoError> {
650        Ok(ColumnName(
651            proto
652                .value
653                .ok_or_else(|| TryFromProtoError::missing_field("ProtoColumnName::value"))?
654                .into(),
655        ))
656    }
657}
658
659impl From<ColumnName> for mz_sql_parser::ast::Ident {
660    fn from(value: ColumnName) -> Self {
661        // Note: ColumnNames are known to be less than the max length of an Ident (I think?).
662        mz_sql_parser::ast::Ident::new_unchecked(value.0)
663    }
664}
665
666impl proptest::arbitrary::Arbitrary for ColumnName {
667    type Parameters = ();
668    type Strategy = BoxedStrategy<ColumnName>;
669
670    fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy {
671        // Long column names are generally uninteresting, and can greatly
672        // increase the runtime for a test case, so bound the max length.
673        let mut weights = vec![(50, Just(1..8)), (20, Just(8..16))];
674        if std::env::var("PROPTEST_LARGE_DATA").is_ok() {
675            weights.extend([
676                (5, Just(16..128)),
677                (1, Just(128..1024)),
678                (1, Just(1024..4096)),
679            ]);
680        }
681        let name_length = Union::new_weighted(weights);
682
683        // Non-ASCII characters are also generally uninteresting and can make
684        // debugging harder.
685        let char_strat = Rc::new(Union::new_weighted(vec![
686            (50, proptest::char::range('A', 'z').boxed()),
687            (1, any::<char>().boxed()),
688        ]));
689
690        name_length
691            .prop_flat_map(move |length| proptest::collection::vec(Rc::clone(&char_strat), length))
692            .prop_map(|chars| ColumnName(chars.into_iter().collect::<Box<str>>()))
693            .no_shrink()
694            .boxed()
695    }
696}
697
/// Default name of a column, used when no other information about the
/// column's name is known.
pub const UNKNOWN_COLUMN_NAME: &str = "?column?";
700
/// Stable index of a column in a [`RelationDesc`].
///
/// Wraps a raw `usize`; see [`ColumnIndex::from_raw`] and
/// [`ColumnIndex::to_raw`].
#[derive(
    Clone,
    Copy,
    Debug,
    Eq,
    PartialEq,
    PartialOrd,
    Ord,
    Serialize,
    Deserialize,
    Hash,
    MzReflect
)]
pub struct ColumnIndex(usize);

// Compile-time guard: `ColumnIndex` must not implement `Arbitrary`.
// NOTE(review): presumably because an arbitrary index would not correspond to
// a real column of a `RelationDesc` — confirm the intended rationale.
static_assertions::assert_not_impl_all!(ColumnIndex: Arbitrary);
718
719impl ColumnIndex {
720    /// Returns a stable identifier for this [`ColumnIndex`].
721    pub fn to_stable_name(&self) -> String {
722        self.0.to_string()
723    }
724
725    pub fn to_raw(&self) -> usize {
726        self.0
727    }
728
729    pub fn from_raw(val: usize) -> Self {
730        ColumnIndex(val)
731    }
732}
733
/// The version a given column was added at.
///
/// Wraps a raw `u64`; the root version is `0` (see [`RelationVersion::root`])
/// and later versions are obtained with [`RelationVersion::bump`].
#[derive(
    Clone,
    Copy,
    Debug,
    Eq,
    PartialEq,
    PartialOrd,
    Ord,
    Serialize,
    Deserialize,
    Hash,
    MzReflect,
    Arbitrary
)]
pub struct RelationVersion(u64);
750
751impl RelationVersion {
752    /// Returns the "root" or "initial" version of a [`RelationDesc`].
753    pub fn root() -> Self {
754        RelationVersion(0)
755    }
756
757    /// Returns an instance of [`RelationVersion`] which is "one" higher than `self`.
758    pub fn bump(&self) -> Self {
759        let next_version = self
760            .0
761            .checked_add(1)
762            .expect("added more than u64::MAX columns?");
763        RelationVersion(next_version)
764    }
765
766    /// Consume a [`RelationVersion`] returning the raw value.
767    ///
768    /// Should __only__ be used for serialization.
769    pub fn into_raw(self) -> u64 {
770        self.0
771    }
772
773    /// Create a [`RelationVersion`] from a raw value.
774    ///
775    /// Should __only__ be used for serialization.
776    pub fn from_raw(val: u64) -> RelationVersion {
777        RelationVersion(val)
778    }
779}
780
781impl From<RelationVersion> for SchemaId {
782    fn from(value: RelationVersion) -> Self {
783        SchemaId(usize::cast_from(value.0))
784    }
785}
786
787impl From<mz_sql_parser::ast::Version> for RelationVersion {
788    fn from(value: mz_sql_parser::ast::Version) -> Self {
789        RelationVersion(value.into_inner())
790    }
791}
792
793impl fmt::Display for RelationVersion {
794    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
795        write!(f, "v{}", self.0)
796    }
797}
798
799impl From<RelationVersion> for mz_sql_parser::ast::Version {
800    fn from(value: RelationVersion) -> Self {
801        mz_sql_parser::ast::Version::new(value.0)
802    }
803}
804
805impl RustType<ProtoRelationVersion> for RelationVersion {
806    fn into_proto(&self) -> ProtoRelationVersion {
807        ProtoRelationVersion { value: self.0 }
808    }
809
810    fn from_proto(proto: ProtoRelationVersion) -> Result<Self, TryFromProtoError> {
811        Ok(RelationVersion(proto.value))
812    }
813}
814
/// Metadata (other than type) for a column in a [`RelationDesc`].
///
/// Tracks the column's name, where its type lives in the accompanying
/// [`SqlRelationType`], and the versions at which it was added and (if
/// applicable) dropped.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash, MzReflect)]
struct ColumnMetadata {
    /// Name of the column.
    name: ColumnName,
    /// Index into a [`SqlRelationType`] for this column.
    typ_idx: usize,
    /// Version this column was added at.
    added: RelationVersion,
    /// Version this column was dropped at, or `None` if it has not been
    /// dropped.
    dropped: Option<RelationVersion>,
}
827
/// A description of the shape of a relation.
///
/// It bundles a [`SqlRelationType`] with `ColumnMetadata` for each column in
/// the relation.
///
/// # Examples
///
/// A `RelationDesc` is typically constructed via its builder API:
///
/// ```
/// use mz_repr::{SqlColumnType, RelationDesc, SqlScalarType};
///
/// let desc = RelationDesc::builder()
///     .with_column("id", SqlScalarType::Int64.nullable(false))
///     .with_column("price", SqlScalarType::Float64.nullable(true))
///     .finish();
/// ```
///
/// In more complicated cases, like when constructing a `RelationDesc` in
/// response to user input, it may be more convenient to construct a relation
/// type first, and imbue it with column names to form a `RelationDesc` later:
///
/// ```
/// use mz_repr::RelationDesc;
///
/// # fn plan_query(_: &str) -> mz_repr::SqlRelationType { mz_repr::SqlRelationType::new(vec![]) }
/// let relation_type = plan_query("SELECT * FROM table");
/// let names = (0..relation_type.arity()).map(|i| match i {
///     0 => "first",
///     1 => "second",
///     _ => "unknown",
/// });
/// let desc = RelationDesc::new(relation_type, names);
/// ```
///
/// Next to the [`SqlRelationType`] we maintain a map of `ColumnIndex` to
/// `ColumnMetadata`, where [`ColumnIndex`] is a stable identifier for a
/// column throughout the lifetime of the relation. This allows a
/// [`RelationDesc`] to represent a projection over a version of itself.
///
/// ```
/// use std::collections::BTreeSet;
/// use mz_repr::{ColumnIndex, RelationDesc, SqlScalarType};
///
/// let desc = RelationDesc::builder()
///     .with_column("name", SqlScalarType::String.nullable(false))
///     .with_column("email", SqlScalarType::String.nullable(false))
///     .finish();
///
/// // Demand only the second column, which projects away the first.
/// let demands = BTreeSet::from([1]);
/// let proj = desc.apply_demand(&demands);
///
/// // We projected away the first column.
/// assert!(!proj.contains_index(&ColumnIndex::from_raw(0)));
/// // But retained the second.
/// assert!(proj.contains_index(&ColumnIndex::from_raw(1)));
///
/// // The underlying `SqlRelationType` also contains a single column.
/// assert_eq!(proj.typ().arity(), 1);
/// ```
///
/// To maintain this stable mapping and track the lifetime of a column (e.g.
/// when adding or dropping a column) we use `ColumnMetadata`. It maintains
/// the index in [`SqlRelationType`] that corresponds to a given column, and the
/// version at which this column was added or dropped.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash, MzReflect)]
pub struct RelationDesc {
    /// Column types and keys of the relation.
    typ: SqlRelationType,
    /// Per-column metadata, keyed by the column's stable index.
    metadata: BTreeMap<ColumnIndex, ColumnMetadata>,
}
900
impl RustType<ProtoRelationDesc> for RelationDesc {
    /// Encodes this description as a [`ProtoRelationDesc`].
    ///
    /// Column names and per-column metadata are emitted in the iteration order
    /// of the metadata map, i.e. ordered by [`ColumnIndex`]. The `metadata`
    /// list is left empty when every column was added at the root version and
    /// never dropped (see the migration note below).
    fn into_proto(&self) -> ProtoRelationDesc {
        // Split each column into its encoded name and its encoded
        // added/dropped versions.
        let (names, metadata): (Vec<_>, Vec<_>) = self
            .metadata
            .values()
            .map(|meta| {
                let metadata = ProtoColumnMetadata {
                    added: Some(meta.added.into_proto()),
                    dropped: meta.dropped.map(|v| v.into_proto()),
                };
                (meta.name.into_proto(), metadata)
            })
            .unzip();

        // `metadata` Migration Logic: We wrote some `ProtoRelationDesc`s into Persist before the
        // metadata field was added. To make sure our serialization roundtrips the same as before
        // we added the field, we omit `metadata` if all of the values are equal to the default.
        //
        // Note: This logic needs to exist approximately forever.
        let is_all_default_metadata = metadata.iter().all(|meta| {
            meta.added == Some(RelationVersion::root().into_proto()) && meta.dropped == None
        });
        let metadata = if is_all_default_metadata {
            Vec::new()
        } else {
            metadata
        };

        ProtoRelationDesc {
            typ: Some(self.typ.into_proto()),
            names,
            metadata,
        }
    }

    /// Decodes a [`ProtoRelationDesc`].
    ///
    /// Each column's [`ColumnIndex`] and `typ_idx` are both set to the
    /// column's position in `proto.names`. An empty `proto.metadata` is
    /// treated as "every column added at the root version and never dropped".
    ///
    /// Returns an error if a name or metadata entry fails to decode, or if
    /// `proto.typ` is missing.
    ///
    /// NOTE(review): names and metadata are paired with `zip_eq`, which panics
    /// on a length mismatch; a non-empty `proto.metadata` whose length differs
    /// from `proto.names` would therefore presumably panic rather than return
    /// an error — confirm whether such input can occur.
    fn from_proto(proto: ProtoRelationDesc) -> Result<Self, TryFromProtoError> {
        // `metadata` Migration Logic: We wrote some `ProtoRelationDesc`s into Persist before the
        // metadata field was added. If the field doesn't exist we fill it in with default values,
        // and when converting into_proto we omit these fields so the serialized bytes roundtrip.
        //
        // Note: This logic needs to exist approximately forever.
        let proto_metadata: Box<dyn Iterator<Item = _>> = if proto.metadata.is_empty() {
            let val = ProtoColumnMetadata {
                added: Some(RelationVersion::root().into_proto()),
                dropped: None,
            };
            // One default entry per column name.
            Box::new(itertools::repeat_n(val, proto.names.len()))
        } else {
            Box::new(proto.metadata.into_iter())
        };

        let metadata = proto
            .names
            .into_iter()
            .zip_eq(proto_metadata)
            .enumerate()
            .map(|(idx, (name, metadata))| {
                let meta = ColumnMetadata {
                    name: name.into_rust()?,
                    typ_idx: idx,
                    added: metadata.added.into_rust_if_some("ColumnMetadata::added")?,
                    dropped: metadata.dropped.into_rust()?,
                };
                Ok::<_, TryFromProtoError>((ColumnIndex(idx), meta))
            })
            .collect::<Result<_, _>>()?;

        Ok(RelationDesc {
            typ: proto.typ.into_rust_if_some("ProtoRelationDesc::typ")?,
            metadata,
        })
    }
}
974
975impl RelationDesc {
    /// Returns a [`RelationDescBuilder`] that can be used to construct a [`RelationDesc`].
    ///
    /// The builder starts out with no columns and no keys.
    pub fn builder() -> RelationDescBuilder {
        RelationDescBuilder::default()
    }
980
981    /// Constructs a new `RelationDesc` that represents the empty relation
982    /// with no columns and no keys.
983    pub fn empty() -> Self {
984        RelationDesc {
985            typ: SqlRelationType::empty(),
986            metadata: BTreeMap::default(),
987        }
988    }
989
990    /// Check if the `RelationDesc` is empty.
991    pub fn is_empty(&self) -> bool {
992        self == &Self::empty()
993    }
994
995    /// Returns the number of columns in this [`RelationDesc`].
996    pub fn len(&self) -> usize {
997        self.typ().column_types.len()
998    }
999
1000    /// Constructs a new `RelationDesc` from a `SqlRelationType` and an iterator
1001    /// over column names.
1002    ///
1003    /// # Panics
1004    ///
1005    /// Panics if the arity of the `SqlRelationType` is not equal to the number of
1006    /// items in `names`.
1007    pub fn new<I, N>(typ: SqlRelationType, names: I) -> Self
1008    where
1009        I: IntoIterator<Item = N>,
1010        N: Into<ColumnName>,
1011    {
1012        let metadata: BTreeMap<_, _> = names
1013            .into_iter()
1014            .enumerate()
1015            .map(|(idx, name)| {
1016                let col_idx = ColumnIndex(idx);
1017                let metadata = ColumnMetadata {
1018                    name: name.into(),
1019                    typ_idx: idx,
1020                    added: RelationVersion::root(),
1021                    dropped: None,
1022                };
1023                (col_idx, metadata)
1024            })
1025            .collect();
1026
1027        // TODO(parkmycar): Add better validation here.
1028        assert_eq!(typ.column_types.len(), metadata.len());
1029
1030        RelationDesc { typ, metadata }
1031    }
1032
1033    pub fn from_names_and_types<I, T, N>(iter: I) -> Self
1034    where
1035        I: IntoIterator<Item = (N, T)>,
1036        T: Into<SqlColumnType>,
1037        N: Into<ColumnName>,
1038    {
1039        let (names, types): (Vec<_>, Vec<_>) = iter.into_iter().unzip();
1040        let types = types.into_iter().map(Into::into).collect();
1041        let typ = SqlRelationType::new(types);
1042        Self::new(typ, names)
1043    }
1044
1045    /// Concatenates a `RelationDesc` onto the end of this `RelationDesc`.
1046    ///
1047    /// # Panics
1048    ///
1049    /// Panics if either `self` or `other` have columns that were added at a
1050    /// [`RelationVersion`] other than [`RelationVersion::root`] or if any
1051    /// columns were dropped.
1052    ///
1053    /// TODO(parkmycar): Move this method to [`RelationDescBuilder`].
1054    pub fn concat(mut self, other: Self) -> Self {
1055        let self_len = self.typ.column_types.len();
1056
1057        for (typ, (_col_idx, meta)) in other
1058            .typ
1059            .column_types
1060            .into_iter()
1061            .zip_eq(other.metadata.into_iter())
1062        {
1063            assert_eq!(meta.added, RelationVersion::root());
1064            assert_none!(meta.dropped);
1065
1066            let new_idx = self.typ.columns().len();
1067            let new_meta = ColumnMetadata {
1068                name: meta.name,
1069                typ_idx: new_idx,
1070                added: RelationVersion::root(),
1071                dropped: None,
1072            };
1073
1074            self.typ.column_types.push(typ);
1075            let prev = self.metadata.insert(ColumnIndex(new_idx), new_meta);
1076
1077            assert_eq!(self.metadata.len(), self.typ.columns().len());
1078            assert_none!(prev);
1079        }
1080
1081        for k in other.typ.keys {
1082            let k = k.into_iter().map(|idx| idx + self_len).collect();
1083            self = self.with_key(k);
1084        }
1085        self
1086    }
1087
1088    /// Adds a new key for the relation.
1089    pub fn with_key(mut self, indices: Vec<usize>) -> Self {
1090        self.typ = self.typ.with_key(indices);
1091        self
1092    }
1093
    /// Drops all existing keys, leaving the columns and their metadata untouched.
    pub fn without_keys(mut self) -> Self {
        self.typ.keys.clear();
        self
    }
1099
    /// Builds a new relation description with the column names replaced with
    /// new names.
    ///
    /// The underlying relation type (including its keys) is carried over
    /// unchanged, while the column metadata is rebuilt from scratch by
    /// [`RelationDesc::new`].
    ///
    /// # Panics
    ///
    /// Panics if the arity of the relation type does not match the number of
    /// items in `names`.
    pub fn with_names<I, N>(self, names: I) -> Self
    where
        I: IntoIterator<Item = N>,
        N: Into<ColumnName>,
    {
        Self::new(self.typ, names)
    }
1114
    /// Computes the number of columns in the relation, as reported by the
    /// underlying relation type.
    pub fn arity(&self) -> usize {
        self.typ.arity()
    }
1119
    /// Returns a reference to the relation type underlying this relation
    /// description.
    pub fn typ(&self) -> &SqlRelationType {
        &self.typ
    }
1124
    /// Consumes this relation description and returns the owned relation type
    /// underlying it; the column metadata is discarded.
    pub fn into_typ(self) -> SqlRelationType {
        self.typ
    }
1129
1130    /// Returns an iterator over the columns in this relation.
1131    pub fn iter(&self) -> impl Iterator<Item = (&ColumnName, &SqlColumnType)> {
1132        self.metadata.values().map(|meta| {
1133            let typ = &self.typ.columns()[meta.typ_idx];
1134            (&meta.name, typ)
1135        })
1136    }
1137
    /// Returns an iterator over the types of the columns in this relation, in
    /// the order they are stored in the underlying relation type.
    pub fn iter_types(&self) -> impl Iterator<Item = &SqlColumnType> {
        self.typ.column_types.iter()
    }
1142
    /// Returns an iterator over the names of the columns in this relation, in
    /// `ColumnIndex` order.
    pub fn iter_names(&self) -> impl Iterator<Item = &ColumnName> {
        self.metadata.values().map(|meta| &meta.name)
    }
1147
1148    /// Returns an iterator over the columns in this relation, with all their metadata.
1149    pub fn iter_all(&self) -> impl Iterator<Item = (&ColumnIndex, &ColumnName, &SqlColumnType)> {
1150        self.metadata.iter().map(|(col_idx, metadata)| {
1151            let col_typ = &self.typ.columns()[metadata.typ_idx];
1152            (col_idx, &metadata.name, col_typ)
1153        })
1154    }
1155
    /// Returns an iterator over the names of the columns in this relation that are "similar" to
    /// the provided `name`.
    ///
    /// Similarity is decided by `ColumnName::is_similar`.
    pub fn iter_similar_names<'a>(
        &'a self,
        name: &'a ColumnName,
    ) -> impl Iterator<Item = &'a ColumnName> {
        self.iter_names().filter(|n| n.is_similar(name))
    }
1164
    /// Returns whether this [`RelationDesc`] contains a column at the specified index,
    /// i.e. whether column metadata is recorded under `idx`.
    pub fn contains_index(&self, idx: &ColumnIndex) -> bool {
        self.metadata.contains_key(idx)
    }
1169
1170    /// Finds a column by name.
1171    ///
1172    /// Returns the index and type of the column named `name`. If no column with
1173    /// the specified name exists, returns `None`. If multiple columns have the
1174    /// specified name, the leftmost column is returned.
1175    pub fn get_by_name(&self, name: &ColumnName) -> Option<(usize, &SqlColumnType)> {
1176        self.iter_names()
1177            .position(|n| n == name)
1178            .map(|i| (i, &self.typ.column_types[i]))
1179    }
1180
    /// Gets the name of the `i`th column.
    ///
    /// `i` is wrapped in a [`ColumnIndex`] and looked up via
    /// [`RelationDesc::get_name_idx`].
    ///
    /// # Panics
    ///
    /// Panics if `i` is not a valid column index.
    ///
    /// TODO(parkmycar): Migrate all uses of this to [`RelationDesc::get_name_idx`].
    pub fn get_name(&self, i: usize) -> &ColumnName {
        // TODO(parkmycar): Refactor this to use `ColumnIndex`.
        self.get_name_idx(&ColumnIndex(i))
    }
1192
1193    /// Gets the name of the column at `idx`.
1194    ///
1195    /// # Panics
1196    ///
1197    /// Panics if no column exists at `idx`.
1198    pub fn get_name_idx(&self, idx: &ColumnIndex) -> &ColumnName {
1199        &self.metadata.get(idx).expect("should exist").name
1200    }
1201
1202    /// Mutably gets the name of the `i`th column.
1203    ///
1204    /// # Panics
1205    ///
1206    /// Panics if `i` is not a valid column index.
1207    pub fn get_name_mut(&mut self, i: usize) -> &mut ColumnName {
1208        // TODO(parkmycar): Refactor this to use `ColumnIndex`.
1209        &mut self
1210            .metadata
1211            .get_mut(&ColumnIndex(i))
1212            .expect("should exist")
1213            .name
1214    }
1215
1216    /// Gets the [`SqlColumnType`] of the column at `idx`.
1217    ///
1218    /// # Panics
1219    ///
1220    /// Panics if no column exists at `idx`.
1221    pub fn get_type(&self, idx: &ColumnIndex) -> &SqlColumnType {
1222        let typ_idx = self.metadata.get(idx).expect("should exist").typ_idx;
1223        &self.typ.column_types[typ_idx]
1224    }
1225
1226    /// Gets the name of the `i`th column if that column name is unambiguous.
1227    ///
1228    /// If at least one other column has the same name as the `i`th column,
1229    /// returns `None`. If the `i`th column has no name, returns `None`.
1230    ///
1231    /// # Panics
1232    ///
1233    /// Panics if `i` is not a valid column index.
1234    pub fn get_unambiguous_name(&self, i: usize) -> Option<&ColumnName> {
1235        let name = self.get_name(i);
1236        if self.iter_names().filter(|n| *n == name).count() == 1 {
1237            Some(name)
1238        } else {
1239            None
1240        }
1241    }
1242
1243    /// Verifies that `d` meets all of the constraints for the `i`th column of `self`.
1244    ///
1245    /// n.b. The only constraint MZ currently supports in NOT NULL, but this
1246    /// structure will be simple to extend.
1247    pub fn constraints_met(&self, i: usize, d: &Datum) -> Result<(), NotNullViolation> {
1248        let name = self.get_name(i);
1249        let typ = &self.typ.column_types[i];
1250        if d == &Datum::Null && !typ.nullable {
1251            Err(NotNullViolation(name.clone()))
1252        } else {
1253            Ok(())
1254        }
1255    }
1256
1257    /// Computes the differences between two [`RelationDesc`]s.
1258    ///
1259    /// Returns a rich diff describing which columns differ, and in what way.
1260    ///
1261    /// # Panics
1262    ///
1263    /// Panics if either `self` or `other` have columns that were added at a
1264    /// [`RelationVersion`] other than [`RelationVersion::root`] or if any
1265    /// columns were dropped.
1266    ///
1267    /// This simplifies things by allowing us to assume that `ColumnIndex`es are
1268    /// dense and that they match the indexes of `typ.columns()`. Without this
1269    /// we would, e.g., struggle comparing keys as those are in terms of
1270    /// `typ.columns()` indexes.
1271    pub fn diff(&self, other: &RelationDesc) -> RelationDescDiff {
1272        assert_eq!(self.metadata.len(), self.typ.columns().len());
1273        assert_eq!(other.metadata.len(), other.typ.columns().len());
1274        for (idx, meta) in self.metadata.iter().chain(other.metadata.iter()) {
1275            assert_eq!(meta.typ_idx, idx.0);
1276            assert_eq!(meta.added, RelationVersion::root());
1277            assert_none!(meta.dropped);
1278        }
1279
1280        let mut column_diffs = BTreeMap::new();
1281        let mut key_diff = None;
1282
1283        let left_arity = self.arity();
1284        let right_arity = other.arity();
1285        let common_arity = std::cmp::min(left_arity, right_arity);
1286
1287        for idx in 0..common_arity {
1288            let left_name = self.get_name(idx);
1289            let right_name = other.get_name(idx);
1290            let left_type = &self.typ.column_types[idx];
1291            let right_type = &other.typ.column_types[idx];
1292
1293            if left_name != right_name {
1294                let diff = ColumnDiff::NameMismatch {
1295                    left: left_name.clone(),
1296                    right: right_name.clone(),
1297                };
1298                column_diffs.insert(idx, diff);
1299            } else if left_type.scalar_type != right_type.scalar_type {
1300                let diff = ColumnDiff::TypeMismatch {
1301                    name: left_name.clone(),
1302                    left: left_type.scalar_type.clone(),
1303                    right: right_type.scalar_type.clone(),
1304                };
1305                column_diffs.insert(idx, diff);
1306            } else if left_type.nullable != right_type.nullable {
1307                let diff = ColumnDiff::NullabilityMismatch {
1308                    name: left_name.clone(),
1309                    left: left_type.nullable,
1310                    right: right_type.nullable,
1311                };
1312                column_diffs.insert(idx, diff);
1313            }
1314        }
1315
1316        for idx in common_arity..left_arity {
1317            let diff = ColumnDiff::Missing {
1318                name: self.get_name(idx).clone(),
1319            };
1320            column_diffs.insert(idx, diff);
1321        }
1322
1323        for idx in common_arity..right_arity {
1324            let diff = ColumnDiff::Extra {
1325                name: other.get_name(idx).clone(),
1326            };
1327            column_diffs.insert(idx, diff);
1328        }
1329
1330        let left_keys: BTreeSet<_> = self.typ.keys.iter().collect();
1331        let right_keys: BTreeSet<_> = other.typ.keys.iter().collect();
1332        if left_keys != right_keys {
1333            let column_names = |desc: &RelationDesc, keys: BTreeSet<&Vec<usize>>| {
1334                keys.iter()
1335                    .map(|key| key.iter().map(|&idx| desc.get_name(idx).clone()).collect())
1336                    .collect()
1337            };
1338            key_diff = Some(KeyDiff {
1339                left: column_names(self, left_keys),
1340                right: column_names(other, right_keys),
1341            });
1342        }
1343
1344        RelationDescDiff {
1345            column_diffs,
1346            key_diff,
1347        }
1348    }
1349
1350    /// Creates a new [`RelationDesc`] retaining only the columns specified in `demands`.
1351    pub fn apply_demand(&self, demands: &BTreeSet<usize>) -> RelationDesc {
1352        let mut new_desc = self.clone();
1353
1354        // Update ColumnMetadata.
1355        let mut removed = 0;
1356        new_desc.metadata.retain(|idx, metadata| {
1357            let retain = demands.contains(&idx.0);
1358            if !retain {
1359                removed += 1;
1360            } else {
1361                metadata.typ_idx -= removed;
1362            }
1363            retain
1364        });
1365
1366        // Update SqlColumnType.
1367        let mut idx = 0;
1368        new_desc.typ.column_types.retain(|_| {
1369            let keep = demands.contains(&idx);
1370            idx += 1;
1371            keep
1372        });
1373
1374        new_desc
1375    }
1376}
1377
1378impl Arbitrary for RelationDesc {
1379    type Parameters = ();
1380    type Strategy = BoxedStrategy<RelationDesc>;
1381
1382    fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy {
1383        let mut weights = vec![(100, Just(0..4)), (50, Just(4..8)), (25, Just(8..16))];
1384        if std::env::var("PROPTEST_LARGE_DATA").is_ok() {
1385            weights.extend([
1386                (12, Just(16..32)),
1387                (6, Just(32..64)),
1388                (3, Just(64..128)),
1389                (1, Just(128..256)),
1390            ]);
1391        }
1392        let num_columns = Union::new_weighted(weights);
1393
1394        num_columns.prop_flat_map(arb_relation_desc).boxed()
1395    }
1396}
1397
/// Returns a [`Strategy`] that generates an arbitrary [`RelationDesc`] with a number of columns
/// within the range provided.
///
/// Columns are generated as a map keyed by name, so the generated column names are distinct.
pub fn arb_relation_desc(num_cols: std::ops::Range<usize>) -> impl Strategy<Value = RelationDesc> {
    proptest::collection::btree_map(any::<ColumnName>(), any::<SqlColumnType>(), num_cols)
        .prop_map(RelationDesc::from_names_and_types)
}
1404
1405/// Returns a [`Strategy`] that generates a projection of the provided [`RelationDesc`].
1406pub fn arb_relation_desc_projection(desc: RelationDesc) -> impl Strategy<Value = RelationDesc> {
1407    let mask: Vec<_> = (0..desc.len()).map(|_| any::<bool>()).collect();
1408    mask.prop_map(move |mask| {
1409        let demands: BTreeSet<_> = mask
1410            .into_iter()
1411            .enumerate()
1412            .filter_map(|(idx, keep)| keep.then_some(idx))
1413            .collect();
1414        desc.apply_demand(&demands)
1415    })
1416}
1417
1418impl IntoIterator for RelationDesc {
1419    type Item = (ColumnName, SqlColumnType);
1420    type IntoIter = Box<dyn Iterator<Item = (ColumnName, SqlColumnType)>>;
1421
1422    fn into_iter(self) -> Self::IntoIter {
1423        let iter = self
1424            .metadata
1425            .into_values()
1426            .zip_eq(self.typ.column_types)
1427            .map(|(meta, typ)| (meta.name, typ));
1428        Box::new(iter)
1429    }
1430}
1431
1432/// Returns a [`Strategy`] that yields arbitrary [`Row`]s for the provided [`RelationDesc`].
1433pub fn arb_row_for_relation(desc: &RelationDesc) -> impl Strategy<Value = Row> + use<> {
1434    let datums: Vec<_> = desc
1435        .typ()
1436        .columns()
1437        .iter()
1438        .cloned()
1439        .map(arb_datum_for_column)
1440        .collect();
1441    datums.prop_map(|x| Row::pack(x.iter().map(Datum::from)))
1442}
1443
/// Error indicating that an expression violated the not-null constraint on
/// the named column.
#[derive(Debug, PartialEq, Eq)]
pub struct NotNullViolation(pub ColumnName);
1447
1448impl fmt::Display for NotNullViolation {
1449    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1450        write!(
1451            f,
1452            "null value in column {} violates not-null constraint",
1453            self.0.quoted()
1454        )
1455    }
1456}
1457
/// The result of comparing two [`RelationDesc`]s, as produced by
/// [`RelationDesc::diff`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RelationDescDiff {
    /// Column differences, keyed by column index. Columns without
    /// differences have no entry.
    pub column_diffs: BTreeMap<usize, ColumnDiff>,
    /// Key differences, if any.
    pub key_diff: Option<KeyDiff>,
}
1466
1467impl RelationDescDiff {
1468    /// Returns whether the diff contains any differences.
1469    pub fn is_empty(&self) -> bool {
1470        self.column_diffs.is_empty() && self.key_diff.is_none()
1471    }
1472}
1473
/// A difference in a column between two [`RelationDesc`]s.
///
/// "Left" refers to the description `diff` was called on, "right" to the one
/// passed as its argument.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ColumnDiff {
    /// Column exists only in the left relation.
    Missing { name: ColumnName },
    /// Column exists only in the right relation.
    Extra { name: ColumnName },
    /// Columns have the same name but different scalar types.
    TypeMismatch {
        name: ColumnName,
        left: SqlScalarType,
        right: SqlScalarType,
    },
    /// Columns have the same name and scalar type but different nullability.
    NullabilityMismatch {
        name: ColumnName,
        left: bool,
        right: bool,
    },
    /// Columns have different names.
    NameMismatch { left: ColumnName, right: ColumnName },
}
1496
/// A difference in the keys of two [`RelationDesc`]s.
///
/// Each key is represented by the names of the columns it consists of.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct KeyDiff {
    /// Keys of the left relation.
    pub left: BTreeSet<Vec<ColumnName>>,
    /// Keys of the right relation.
    pub right: BTreeSet<Vec<ColumnName>>,
}
1505
/// A builder for a [`RelationDesc`].
///
/// Columns and keys are accumulated and turned into a [`RelationDesc`] by
/// [`RelationDescBuilder::finish`].
#[derive(Clone, Default, Debug, PartialEq, Eq)]
pub struct RelationDescBuilder {
    /// Columns of the relation, in the order they were appended.
    columns: Vec<(ColumnName, SqlColumnType)>,
    /// Sets of indices that are "keys" for the collection.
    keys: Vec<Vec<usize>>,
}
1514
1515impl RelationDescBuilder {
1516    /// Appends a column with the specified name and type.
1517    pub fn with_column<N: Into<ColumnName>>(
1518        mut self,
1519        name: N,
1520        ty: SqlColumnType,
1521    ) -> RelationDescBuilder {
1522        let name = name.into();
1523        self.columns.push((name, ty));
1524        self
1525    }
1526
1527    /// Appends the provided columns to the builder.
1528    pub fn with_columns<I, T, N>(mut self, iter: I) -> Self
1529    where
1530        I: IntoIterator<Item = (N, T)>,
1531        T: Into<SqlColumnType>,
1532        N: Into<ColumnName>,
1533    {
1534        self.columns
1535            .extend(iter.into_iter().map(|(name, ty)| (name.into(), ty.into())));
1536        self
1537    }
1538
1539    /// Adds a new key for the relation.
1540    pub fn with_key(mut self, mut indices: Vec<usize>) -> RelationDescBuilder {
1541        indices.sort_unstable();
1542        if !self.keys.contains(&indices) {
1543            self.keys.push(indices);
1544        }
1545        self
1546    }
1547
1548    /// Removes all previously inserted keys.
1549    pub fn without_keys(mut self) -> RelationDescBuilder {
1550        self.keys.clear();
1551        assert_eq!(self.keys.len(), 0);
1552        self
1553    }
1554
1555    /// Concatenates a [`RelationDescBuilder`] onto the end of this [`RelationDescBuilder`].
1556    pub fn concat(mut self, other: Self) -> Self {
1557        let self_len = self.columns.len();
1558
1559        self.columns.extend(other.columns);
1560        for k in other.keys {
1561            let k = k.into_iter().map(|idx| idx + self_len).collect();
1562            self = self.with_key(k);
1563        }
1564
1565        self
1566    }
1567
1568    /// Finish the builder, returning a [`RelationDesc`].
1569    pub fn finish(self) -> RelationDesc {
1570        let mut desc = RelationDesc::from_names_and_types(self.columns);
1571        desc.typ = desc.typ.with_keys(self.keys);
1572        desc
1573    }
1574}
1575
/// Describes a [`RelationDesc`] at a specific version of a [`VersionedRelationDesc`].
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord, Serialize)]
pub enum RelationVersionSelector {
    /// Selects the relation as of the given version.
    Specific(RelationVersion),
    /// Selects the relation as of its most recent version.
    Latest,
}
1582
1583impl RelationVersionSelector {
1584    pub fn specific(version: u64) -> Self {
1585        RelationVersionSelector::Specific(RelationVersion(version))
1586    }
1587}
1588
/// A wrapper around [`RelationDesc`] that provides an interface for adding
/// columns and generating new versions.
///
/// TODO(parkmycar): Using an immutable data structure for RelationDesc would
/// be great.
#[derive(Debug, Clone, Serialize)]
pub struct VersionedRelationDesc {
    /// The description containing every column ever added, including columns
    /// that have since been dropped.
    inner: RelationDesc,
}
1598
1599impl VersionedRelationDesc {
    /// Wraps `inner` in a [`VersionedRelationDesc`]. No validation is
    /// performed at construction time.
    pub fn new(inner: RelationDesc) -> Self {
        VersionedRelationDesc { inner }
    }
1603
1604    /// Adds a new column to this [`RelationDesc`], creating a new version of the [`RelationDesc`].
1605    ///
1606    /// # Panics
1607    ///
1608    /// * Panics if a column with `name` already exists that hasn't been dropped.
1609    ///
1610    /// Note: For building a [`RelationDesc`] see [`RelationDescBuilder::with_column`].
1611    #[must_use]
1612    pub fn add_column<N, T>(&mut self, name: N, typ: T) -> RelationVersion
1613    where
1614        N: Into<ColumnName>,
1615        T: Into<SqlColumnType>,
1616    {
1617        let latest_version = self.latest_version();
1618        let new_version = latest_version.bump();
1619
1620        let name = name.into();
1621        let existing = self
1622            .inner
1623            .metadata
1624            .iter()
1625            .find(|(_, meta)| meta.name == name && meta.dropped.is_none());
1626        if let Some(existing) = existing {
1627            panic!("column named '{name}' already exists! {existing:?}");
1628        }
1629
1630        let next_idx = self.inner.metadata.len();
1631        let col_meta = ColumnMetadata {
1632            name,
1633            typ_idx: next_idx,
1634            added: new_version,
1635            dropped: None,
1636        };
1637
1638        self.inner.typ.column_types.push(typ.into());
1639        let prev = self.inner.metadata.insert(ColumnIndex(next_idx), col_meta);
1640
1641        assert_none!(prev, "column index overlap!");
1642        self.validate();
1643
1644        new_version
1645    }
1646
1647    /// Drops the column `name` from this [`RelationDesc`]. If there are multiple columns with
1648    /// `name` drops the left-most one that hasn't already been dropped.
1649    ///
1650    /// TODO(parkmycar): Add handling for dropping a column that is currently used as a key.
1651    ///
1652    /// # Panics
1653    ///
1654    /// Panics if a column with `name` does not exist or the dropped column was used as a key.
1655    #[must_use]
1656    pub fn drop_column<N>(&mut self, name: N) -> RelationVersion
1657    where
1658        N: Into<ColumnName>,
1659    {
1660        let name = name.into();
1661        let latest_version = self.latest_version();
1662        let new_version = latest_version.bump();
1663
1664        let col = self
1665            .inner
1666            .metadata
1667            .values_mut()
1668            .find(|meta| meta.name == name && meta.dropped.is_none())
1669            .expect("column to exist");
1670
1671        // Make sure the column hadn't been previously dropped.
1672        assert_none!(col.dropped, "column was already dropped");
1673        col.dropped = Some(new_version);
1674
1675        // Make sure the column isn't being used as a key.
1676        let dropped_key = self
1677            .inner
1678            .typ
1679            .keys
1680            .iter()
1681            .any(|keys| keys.contains(&col.typ_idx));
1682        assert!(!dropped_key, "column being dropped was used as a key");
1683
1684        self.validate();
1685        new_version
1686    }
1687
    /// Returns the [`RelationDesc`] at the latest version.
    ///
    /// NOTE(review): this returns a clone of the inner description as-is, so
    /// columns marked as dropped are still present — confirm this is intended,
    /// as opposed to `at_version(RelationVersionSelector::Latest)`.
    pub fn latest(&self) -> RelationDesc {
        self.inner.clone()
    }
1692
1693    /// Returns this [`RelationDesc`] at the specified version.
1694    pub fn at_version(&self, version: RelationVersionSelector) -> RelationDesc {
1695        // Get all of the changes from the start, up to whatever version was requested.
1696        let up_to_version = match version {
1697            RelationVersionSelector::Latest => RelationVersion(u64::MAX),
1698            RelationVersionSelector::Specific(v) => v,
1699        };
1700
1701        let valid_columns = self.inner.metadata.iter().filter(|(_col_idx, meta)| {
1702            let added = meta.added <= up_to_version;
1703            let dropped = meta
1704                .dropped
1705                .map(|dropped_at| up_to_version >= dropped_at)
1706                .unwrap_or(false);
1707
1708            added && !dropped
1709        });
1710
1711        let mut column_types = Vec::new();
1712        let mut column_metas = BTreeMap::new();
1713
1714        // N.B. At this point we need to be careful because col_idx might not
1715        // equal typ_idx.
1716        //
1717        // For example, consider columns "a", "b", and "c" with indexes 0, 1,
1718        // and 2. If we drop column "b" then we'll have "a" and "c" with column
1719        // indexes 0 and 2, but their indices in SqlRelationType will be 0 and 1.
1720        for (col_idx, meta) in valid_columns {
1721            let new_meta = ColumnMetadata {
1722                name: meta.name.clone(),
1723                typ_idx: column_types.len(),
1724                added: meta.added.clone(),
1725                dropped: meta.dropped.clone(),
1726            };
1727            column_types.push(self.inner.typ.columns()[meta.typ_idx].clone());
1728            column_metas.insert(*col_idx, new_meta);
1729        }
1730
1731        // Remap keys in case a column with an index less than that of a key was
1732        // dropped.
1733        //
1734        // For example, consider columns "a", "b", and "c" where "a" and "c" are
1735        // keys and "b" was dropped.
1736        let keys = self
1737            .inner
1738            .typ
1739            .keys
1740            .iter()
1741            .map(|keys| {
1742                keys.iter()
1743                    .map(|key_idx| {
1744                        let metadata = column_metas
1745                            .get(&ColumnIndex(*key_idx))
1746                            .expect("found key for column that doesn't exist");
1747                        metadata.typ_idx
1748                    })
1749                    .collect()
1750            })
1751            .collect();
1752
1753        let relation_type = SqlRelationType { column_types, keys };
1754
1755        RelationDesc {
1756            typ: relation_type,
1757            metadata: column_metas,
1758        }
1759    }
1760
1761    pub fn latest_version(&self) -> RelationVersion {
1762        self.inner
1763            .metadata
1764            .values()
1765            // N.B. Dropped is always greater than added.
1766            .map(|meta| meta.dropped.unwrap_or(meta.added))
1767            .max()
1768            // If there aren't any columns we're implicitly the root version.
1769            .unwrap_or_else(RelationVersion::root)
1770    }
1771
1772    /// Validates internal contraints of the [`RelationDesc`] are correct.
1773    ///
1774    /// # Panics
1775    ///
1776    /// Panics if a constraint is not satisfied.
1777    fn validate(&self) {
1778        fn validate_inner(desc: &RelationDesc) -> Result<(), anyhow::Error> {
1779            if desc.typ.column_types.len() != desc.metadata.len() {
1780                anyhow::bail!("mismatch between number of types and metadatas");
1781            }
1782
1783            for (col_idx, meta) in &desc.metadata {
1784                if col_idx.0 > desc.metadata.len() {
1785                    anyhow::bail!("column index out of bounds");
1786                }
1787                if meta.added >= meta.dropped.unwrap_or(RelationVersion(u64::MAX)) {
1788                    anyhow::bail!("column was added after it was dropped?");
1789                }
1790                if desc.typ().columns().get(meta.typ_idx).is_none() {
1791                    anyhow::bail!("typ_idx incorrect");
1792                }
1793            }
1794
1795            for keys in &desc.typ.keys {
1796                for key in keys {
1797                    if *key >= desc.typ.column_types.len() {
1798                        anyhow::bail!("key index was out of bounds!");
1799                    }
1800                }
1801            }
1802
1803            let versions = desc
1804                .metadata
1805                .values()
1806                .map(|meta| meta.dropped.unwrap_or(meta.added));
1807            let mut max = 0;
1808            let mut sum = 0;
1809            for version in versions {
1810                max = std::cmp::max(max, version.0);
1811                sum += version.0;
1812            }
1813
1814            // Other than RelationVersion(0), we should never have duplicate
1815            // versions and they should always increase by 1. In other words, the
1816            // sum of all RelationVersions should be the sum of [0, max].
1817            //
1818            // N.B. n * (n + 1) / 2 = sum of [0, n]
1819            //
1820            // While I normally don't like tricks like this, it allows us to
1821            // validate that our column versions are correct in O(n) time and
1822            // without allocations.
1823            if sum != (max * (max + 1) / 2) {
1824                anyhow::bail!("there is a duplicate or missing relation version");
1825            }
1826
1827            Ok(())
1828        }
1829
1830        assert_ok!(validate_inner(&self.inner), "validate failed! {self:?}");
1831    }
1832}
1833
/// Diffs that can be generated by proptest and applied to a [`RelationDesc`]
/// to exercise schema migrations.
///
/// See [`PropRelationDescDiff::apply`] for how each variant changes a
/// [`RelationDesc`].
#[derive(Debug)]
pub enum PropRelationDescDiff {
    /// Append a new column called `name` with the type `typ`.
    AddColumn {
        name: ColumnName,
        typ: SqlColumnType,
    },
    /// Mark the first column called `name` as dropped, if there is one and it
    /// has not been dropped already.
    DropColumn {
        name: ColumnName,
    },
    /// Flip the `nullable` flag of the column called `name`, if there is one.
    ToggleNullability {
        name: ColumnName,
    },
    /// Replace the type of the column called `name` with `typ`, if there is
    /// such a column.
    ChangeType {
        name: ColumnName,
        typ: SqlColumnType,
    },
}
1853
1854impl PropRelationDescDiff {
1855    pub fn apply(self, desc: &mut RelationDesc) {
1856        match self {
1857            PropRelationDescDiff::AddColumn { name, typ } => {
1858                let new_idx = desc.metadata.len();
1859                let meta = ColumnMetadata {
1860                    name,
1861                    typ_idx: new_idx,
1862                    added: RelationVersion(0),
1863                    dropped: None,
1864                };
1865                let prev = desc.metadata.insert(ColumnIndex(new_idx), meta);
1866                desc.typ.column_types.push(typ);
1867
1868                assert_none!(prev);
1869                assert_eq!(desc.metadata.len(), desc.typ.column_types.len());
1870            }
1871            PropRelationDescDiff::DropColumn { name } => {
1872                let next_version = desc
1873                    .metadata
1874                    .values()
1875                    .map(|meta| meta.dropped.unwrap_or(meta.added))
1876                    .max()
1877                    .unwrap_or_else(RelationVersion::root)
1878                    .bump();
1879                let Some(metadata) = desc.metadata.values_mut().find(|meta| meta.name == name)
1880                else {
1881                    return;
1882                };
1883                if metadata.dropped.is_none() {
1884                    metadata.dropped = Some(next_version);
1885                }
1886            }
1887            PropRelationDescDiff::ToggleNullability { name } => {
1888                let Some((pos, _)) = desc.get_by_name(&name) else {
1889                    return;
1890                };
1891                let col_type = desc
1892                    .typ
1893                    .column_types
1894                    .get_mut(pos)
1895                    .expect("ColumnNames and SqlColumnTypes out of sync!");
1896                col_type.nullable = !col_type.nullable;
1897            }
1898            PropRelationDescDiff::ChangeType { name, typ } => {
1899                let Some((pos, _)) = desc.get_by_name(&name) else {
1900                    return;
1901                };
1902                let col_type = desc
1903                    .typ
1904                    .column_types
1905                    .get_mut(pos)
1906                    .expect("ColumnNames and SqlColumnTypes out of sync!");
1907                *col_type = typ;
1908            }
1909        }
1910    }
1911}
1912
1913/// Generates a set of [`PropRelationDescDiff`]s based on some source [`RelationDesc`].
1914pub fn arb_relation_desc_diff(
1915    source: &RelationDesc,
1916) -> impl Strategy<Value = Vec<PropRelationDescDiff>> + use<> {
1917    let source = Rc::new(source.clone());
1918    let num_source_columns = source.typ.columns().len();
1919
1920    let num_add_columns = Union::new_weighted(vec![(100, Just(0..8)), (1, Just(8..64))]);
1921    let add_columns_strat = num_add_columns
1922        .prop_flat_map(|num_columns| {
1923            proptest::collection::vec((any::<ColumnName>(), any::<SqlColumnType>()), num_columns)
1924        })
1925        .prop_map(|cols| {
1926            cols.into_iter()
1927                .map(|(name, typ)| PropRelationDescDiff::AddColumn { name, typ })
1928                .collect::<Vec<_>>()
1929        });
1930
1931    // If the source RelationDesc is empty there is nothing else to do.
1932    if num_source_columns == 0 {
1933        return add_columns_strat.boxed();
1934    }
1935
1936    let source_ = Rc::clone(&source);
1937    let drop_columns_strat = (0..num_source_columns).prop_perturb(move |num_columns, mut rng| {
1938        let mut set = BTreeSet::default();
1939        for _ in 0..num_columns {
1940            let col_idx = rng.random_range(0..num_source_columns);
1941            set.insert(source_.get_name(col_idx).clone());
1942        }
1943        set.into_iter()
1944            .map(|name| PropRelationDescDiff::DropColumn { name })
1945            .collect::<Vec<_>>()
1946    });
1947
1948    let source_ = Rc::clone(&source);
1949    let toggle_nullability_strat =
1950        (0..num_source_columns).prop_perturb(move |num_columns, mut rng| {
1951            let mut set = BTreeSet::default();
1952            for _ in 0..num_columns {
1953                let col_idx = rng.random_range(0..num_source_columns);
1954                set.insert(source_.get_name(col_idx).clone());
1955            }
1956            set.into_iter()
1957                .map(|name| PropRelationDescDiff::ToggleNullability { name })
1958                .collect::<Vec<_>>()
1959        });
1960
1961    let source_ = Rc::clone(&source);
1962    let change_type_strat = (0..num_source_columns)
1963        .prop_perturb(move |num_columns, mut rng| {
1964            let mut set = BTreeSet::default();
1965            for _ in 0..num_columns {
1966                let col_idx = rng.random_range(0..num_source_columns);
1967                set.insert(source_.get_name(col_idx).clone());
1968            }
1969            set
1970        })
1971        .prop_flat_map(|cols| {
1972            proptest::collection::vec(any::<SqlColumnType>(), cols.len())
1973                .prop_map(move |types| (cols.clone(), types))
1974        })
1975        .prop_map(|(cols, types)| {
1976            cols.into_iter()
1977                .zip_eq(types)
1978                .map(|(name, typ)| PropRelationDescDiff::ChangeType { name, typ })
1979                .collect::<Vec<_>>()
1980        });
1981
1982    (
1983        add_columns_strat,
1984        drop_columns_strat,
1985        toggle_nullability_strat,
1986        change_type_strat,
1987    )
1988        .prop_map(|(adds, drops, toggles, changes)| {
1989            adds.into_iter()
1990                .chain(drops)
1991                .chain(toggles)
1992                .chain(changes)
1993                .collect::<Vec<_>>()
1994        })
1995        .prop_shuffle()
1996        .boxed()
1997}
1998
1999#[cfg(test)]
2000mod tests {
2001    use super::*;
2002    use prost::Message;
2003
    /// Walks a two-column description through an `add_column` and a
    /// `drop_column`, checking after each step what `at_version` returns for
    /// the new version and that earlier versions still look the same.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `pipe2` on OS `linux`
    fn smoktest_at_version() {
        let desc = RelationDesc::builder()
            .with_column("a", SqlScalarType::Bool.nullable(true))
            .with_column("z", SqlScalarType::String.nullable(false))
            .finish();

        let mut versioned_desc = VersionedRelationDesc {
            inner: desc.clone(),
        };
        versioned_desc.validate();

        // Without any changes, every selector yields the original description.
        let latest = versioned_desc.at_version(RelationVersionSelector::Latest);
        assert_eq!(desc, latest);

        let v0 = versioned_desc.at_version(RelationVersionSelector::specific(0));
        assert_eq!(desc, v0);

        let v3 = versioned_desc.at_version(RelationVersionSelector::specific(3));
        assert_eq!(desc, v3);

        // Adding a column creates version 1.
        let v1 = versioned_desc.add_column("b", SqlScalarType::Bytes.nullable(false));
        assert_eq!(v1, RelationVersion(1));

        let v1 = versioned_desc.at_version(RelationVersionSelector::Specific(v1));
        insta::assert_json_snapshot!(v1.metadata, @r###"
        {
          "0": {
            "name": "a",
            "typ_idx": 0,
            "added": 0,
            "dropped": null
          },
          "1": {
            "name": "z",
            "typ_idx": 1,
            "added": 0,
            "dropped": null
          },
          "2": {
            "name": "b",
            "typ_idx": 2,
            "added": 1,
            "dropped": null
          }
        }
        "###);

        // Check that V0 doesn't show the new column.
        let v0_b = versioned_desc.at_version(RelationVersionSelector::specific(0));
        assert!(v0.iter().eq(v0_b.iter()));

        // Dropping a column creates version 2.
        let v2 = versioned_desc.drop_column("z");
        assert_eq!(v2, RelationVersion(2));

        // At version 2 column "z" is gone and "b" has moved to `typ_idx` 1.
        let v2 = versioned_desc.at_version(RelationVersionSelector::Specific(v2));
        insta::assert_json_snapshot!(v2.metadata, @r###"
        {
          "0": {
            "name": "a",
            "typ_idx": 0,
            "added": 0,
            "dropped": null
          },
          "2": {
            "name": "b",
            "typ_idx": 1,
            "added": 1,
            "dropped": null
          }
        }
        "###);

        // Check that V0 and V1 are still correct.
        let v0_c = versioned_desc.at_version(RelationVersionSelector::specific(0));
        assert!(v0.iter().eq(v0_c.iter()));

        let v1_b = versioned_desc.at_version(RelationVersionSelector::specific(1));
        assert!(v1.iter().eq(v1_b.iter()));

        // The inner description keeps all three columns and records the drop.
        insta::assert_json_snapshot!(versioned_desc.inner.metadata, @r###"
        {
          "0": {
            "name": "a",
            "typ_idx": 0,
            "added": 0,
            "dropped": null
          },
          "1": {
            "name": "z",
            "typ_idx": 1,
            "added": 0,
            "dropped": 2
          },
          "2": {
            "name": "b",
            "typ_idx": 2,
            "added": 1,
            "dropped": null
          }
        }
        "###);
    }
2108
    /// Drops a column that sits in front of a key column and checks that the
    /// key index is remapped in the version after the drop, while version 0
    /// still reports the original key index.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `pipe2` on OS `linux`
    fn test_dropping_columns_with_keys() {
        // Column "z" (index 1) is the only key.
        let desc = RelationDesc::builder()
            .with_column("a", SqlScalarType::Bool.nullable(true))
            .with_column("z", SqlScalarType::String.nullable(false))
            .with_key(vec![1])
            .finish();

        let mut versioned_desc = VersionedRelationDesc {
            inner: desc.clone(),
        };
        versioned_desc.validate();

        let v1 = versioned_desc.drop_column("a");
        assert_eq!(v1, RelationVersion(1));

        // Make sure the key index for 'z' got remapped since 'a' was dropped.
        let v1 = versioned_desc.at_version(RelationVersionSelector::Specific(v1));
        insta::assert_json_snapshot!(v1, @r###"
        {
          "typ": {
            "column_types": [
              {
                "scalar_type": "String",
                "nullable": false
              }
            ],
            "keys": [
              [
                0
              ]
            ]
          },
          "metadata": {
            "1": {
              "name": "z",
              "typ_idx": 0,
              "added": 0,
              "dropped": null
            }
          }
        }
        "###);

        // Make sure the key index of 'z' is correct when all columns are present.
        let v0 = versioned_desc.at_version(RelationVersionSelector::specific(0));
        insta::assert_json_snapshot!(v0, @r###"
        {
          "typ": {
            "column_types": [
              {
                "scalar_type": "Bool",
                "nullable": true
              },
              {
                "scalar_type": "String",
                "nullable": false
              }
            ],
            "keys": [
              [
                1
              ]
            ]
          },
          "metadata": {
            "0": {
              "name": "a",
              "typ_idx": 0,
              "added": 0,
              "dropped": 1
            },
            "1": {
              "name": "z",
              "typ_idx": 1,
              "added": 0,
              "dropped": null
            }
          }
        }
        "###);
    }
2192
    /// Decodes a [`ProtoRelationDesc`] that only carries column names (its
    /// `metadata` is empty) and checks the column metadata of the resulting
    /// [`RelationDesc`] against a snapshot.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `pipe2` on OS `linux`
    fn roundtrip_relation_desc_without_metadata() {
        let typ = ProtoRelationType {
            column_types: vec![
                SqlScalarType::String.nullable(false).into_proto(),
                SqlScalarType::Bool.nullable(true).into_proto(),
            ],
            keys: vec![],
        };
        let proto = ProtoRelationDesc {
            typ: Some(typ),
            names: vec![
                ColumnName("a".into()).into_proto(),
                ColumnName("b".into()).into_proto(),
            ],
            // Deliberately empty: only `names` describes the columns.
            metadata: vec![],
        };
        let desc: RelationDesc = proto.into_rust().unwrap();

        // Each name ends up with metadata at its position, added at version 0
        // and never dropped.
        insta::assert_json_snapshot!(desc, @r###"
        {
          "typ": {
            "column_types": [
              {
                "scalar_type": "String",
                "nullable": false
              },
              {
                "scalar_type": "Bool",
                "nullable": true
              }
            ],
            "keys": []
          },
          "metadata": {
            "0": {
              "name": "a",
              "typ_idx": 0,
              "added": 0,
              "dropped": null
            },
            "1": {
              "name": "b",
              "typ_idx": 1,
              "added": 0,
              "dropped": null
            }
          }
        }
        "###);
    }
2245
2246    #[mz_ore::test]
2247    #[should_panic(expected = "column named 'a' already exists!")]
2248    fn test_add_column_with_same_name_panics() {
2249        let desc = RelationDesc::builder()
2250            .with_column("a", SqlScalarType::Bool.nullable(true))
2251            .finish();
2252        let mut versioned = VersionedRelationDesc::new(desc);
2253
2254        let _ = versioned.add_column("a", SqlScalarType::String.nullable(false));
2255    }
2256
    /// Once a column has been dropped, a new column with the same name can be
    /// added; the snapshots pin the description after the drop and after the
    /// re-add.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `pipe2` on OS `linux`
    fn test_add_column_with_same_name_prev_dropped() {
        let desc = RelationDesc::builder()
            .with_column("a", SqlScalarType::Bool.nullable(true))
            .finish();
        let mut versioned = VersionedRelationDesc::new(desc);

        // After dropping the only column the description is empty.
        let v1 = versioned.drop_column("a");
        let v1 = versioned.at_version(RelationVersionSelector::Specific(v1));
        insta::assert_json_snapshot!(v1, @r###"
        {
          "typ": {
            "column_types": [],
            "keys": []
          },
          "metadata": {}
        }
        "###);

        // The re-added "a" is a new column: it gets a new index and type.
        let v2 = versioned.add_column("a", SqlScalarType::String.nullable(false));
        let v2 = versioned.at_version(RelationVersionSelector::Specific(v2));
        insta::assert_json_snapshot!(v2, @r###"
        {
          "typ": {
            "column_types": [
              {
                "scalar_type": "String",
                "nullable": false
              }
            ],
            "keys": []
          },
          "metadata": {
            "1": {
              "name": "a",
              "typ_idx": 0,
              "added": 2,
              "dropped": null
            }
          }
        }
        "###);
    }
2301
2302    #[mz_ore::test]
2303    #[cfg_attr(miri, ignore)]
2304    fn apply_demand() {
2305        let desc = RelationDesc::builder()
2306            .with_column("a", SqlScalarType::String.nullable(true))
2307            .with_column("b", SqlScalarType::Int64.nullable(false))
2308            .with_column("c", SqlScalarType::Time.nullable(false))
2309            .finish();
2310        let desc = desc.apply_demand(&BTreeSet::from([0, 2]));
2311        assert_eq!(desc.arity(), 2);
2312        // TODO(parkmycar): Move validate onto RelationDesc.
2313        VersionedRelationDesc::new(desc).validate();
2314    }
2315
2316    #[mz_ore::test]
2317    #[cfg_attr(miri, ignore)]
2318    fn smoketest_column_index_stable_ident() {
2319        let idx_a = ColumnIndex(42);
2320        // Note(parkmycar): This should never change.
2321        assert_eq!(idx_a.to_stable_name(), "42");
2322    }
2323
2324    #[mz_ore::test]
2325    #[cfg_attr(miri, ignore)] // too slow
2326    fn proptest_relation_desc_roundtrips() {
2327        fn testcase(og: RelationDesc) {
2328            let bytes = og.into_proto().encode_to_vec();
2329            let proto = ProtoRelationDesc::decode(&bytes[..]).unwrap();
2330            let rnd = RelationDesc::from_proto(proto).unwrap();
2331
2332            assert_eq!(og, rnd);
2333        }
2334
2335        proptest!(|(desc in any::<RelationDesc>())| {
2336            testcase(desc);
2337        });
2338
2339        let strat = any::<RelationDesc>().prop_flat_map(|desc| {
2340            arb_relation_desc_diff(&desc).prop_map(move |diffs| (desc.clone(), diffs))
2341        });
2342
2343        proptest!(|((mut desc, diffs) in strat)| {
2344            for diff in diffs {
2345                diff.apply(&mut desc);
2346            };
2347            testcase(desc);
2348        });
2349    }
2350}