Skip to main content

mz_repr/
relation.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::{BTreeMap, BTreeSet};
11use std::rc::Rc;
12use std::{fmt, vec};
13
14use anyhow::bail;
15use itertools::Itertools;
16use mz_lowertest::MzReflect;
17use mz_ore::cast::CastFrom;
18use mz_ore::soft_panic_or_log;
19use mz_ore::str::StrExt;
20use mz_ore::{assert_none, assert_ok};
21use mz_persist_types::schema::SchemaId;
22use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
23use proptest::prelude::*;
24use proptest::strategy::{Strategy, Union};
25use proptest_derive::Arbitrary;
26use serde::{Deserialize, Serialize};
27
28use crate::relation_and_scalar::proto_relation_type::ProtoKey;
29pub use crate::relation_and_scalar::{
30    ProtoColumnMetadata, ProtoColumnName, ProtoColumnType, ProtoRelationDesc, ProtoRelationType,
31    ProtoRelationVersion,
32};
33use crate::{Datum, ReprScalarType, Row, SqlScalarType, arb_datum_for_column};
34
/// The type of a [`Datum`].
///
/// [`SqlColumnType`] bundles information about the scalar type of a datum (e.g.,
/// Int32 or String) with its nullability.
///
/// To construct a column type, either initialize the struct directly, or
/// use the [`SqlScalarType::nullable`] method.
///
/// When deserializing, a missing `nullable` field is filled in as `true`
/// (see the `serde(default)` attribute on that field).
#[derive(
    Arbitrary,
    Clone,
    Debug,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Serialize,
    Deserialize,
    Hash,
    MzReflect
)]
pub struct SqlColumnType {
    /// The underlying scalar type (e.g., Int32 or String) of this column.
    pub scalar_type: SqlScalarType,
    /// Whether this datum can be null.
    ///
    /// Defaults to `true` when the field is absent from serialized input.
    #[serde(default = "return_true")]
    pub nullable: bool,
}
62
/// Serde default provider that always yields `true`.
///
/// This function exists solely so that column types are nullable by default
/// in unit tests. A `bool` defaults to `false`, and serde only allows a
/// different default to be supplied through a function that returns the
/// desired value. See <https://github.com/serde-rs/serde/issues/1030>.
#[inline(always)]
fn return_true() -> bool {
    true
}
72
73impl SqlColumnType {
74    /// Compute the least upper bound of many column types, returning an error on
75    /// incompatible types or an empty iterator.
76    /// See [`SqlColumnType::try_union`] for details.
77    pub fn try_union_many<'a>(
78        typs: impl IntoIterator<Item = &'a Self>,
79    ) -> Result<Self, anyhow::Error> {
80        let mut iter = typs.into_iter();
81        let Some(typ) = iter.next() else {
82            bail!("Cannot union empty iterator");
83        };
84        iter.try_fold(typ.clone(), |a, b| a.try_union(b))
85    }
86
87    /// Compute the least upper bound of many column types.
88    /// See [`SqlColumnType::try_union`] for details.
89    ///
90    /// Panics on incompatible types or an empty iterator.
91    pub fn union_many<'a>(typs: impl IntoIterator<Item = &'a Self>) -> Self {
92        Self::try_union_many(typs).expect("Cannot union empty iterator")
93    }
94
95    /// Backports nullability information from `backport_typ` into `self`,
96    /// affecting the outer `.nullable` field but also record fields deeper
97    /// into the type.
98    pub fn backport_nullability(&mut self, backport_typ: &ReprColumnType) {
99        self.scalar_type
100            .backport_nullability(&backport_typ.scalar_type);
101        self.nullable = backport_typ.nullable;
102    }
103
104    /// Compute the least upper bound of two column types at the SQL level.
105    ///
106    /// Two types are compatible when they are equal, share the same base type
107    /// (differing only in modifiers), or are records with pairwise-compatible
108    /// fields.
109    /// The resulting nullability is the disjunction of the two input
110    /// nullabilities.
111    ///
112    /// Returns an error for incompatible types, e.g. `Text` and `Int32`, or
113    /// `Text` and `VarChar` (different base types at the SQL level).
114    /// See [`SqlColumnType::try_union`] for a fallback that handles the latter
115    /// case via repr-level union.
116    pub fn sql_union(&self, other: &Self) -> Result<Self, anyhow::Error> {
117        match (&self.scalar_type, &other.scalar_type) {
118            (scalar_type, other_scalar_type) if scalar_type == other_scalar_type => {
119                Ok(SqlColumnType {
120                    scalar_type: scalar_type.clone(),
121                    nullable: self.nullable || other.nullable,
122                })
123            }
124            (scalar_type, other_scalar_type) if scalar_type.base_eq(other_scalar_type) => {
125                Ok(SqlColumnType {
126                    scalar_type: scalar_type.without_modifiers(),
127                    nullable: self.nullable || other.nullable,
128                })
129            }
130            (
131                SqlScalarType::Record { fields, custom_id },
132                SqlScalarType::Record {
133                    fields: other_fields,
134                    custom_id: other_custom_id,
135                },
136            ) => {
137                if custom_id != other_custom_id {
138                    bail!(
139                        "Can't union types: {:?} and {:?}",
140                        self.scalar_type,
141                        other.scalar_type
142                    );
143                };
144
145                if fields.len() != other_fields.len() {
146                    bail!(
147                        "Can't union types: {:?} and {:?}",
148                        self.scalar_type,
149                        other.scalar_type
150                    );
151                }
152                let mut union_fields = Vec::with_capacity(fields.len());
153                for ((name, typ), (other_name, other_typ)) in
154                    fields.iter().zip_eq(other_fields.iter())
155                {
156                    if name != other_name {
157                        bail!(
158                            "Can't union types: {:?} and {:?}",
159                            self.scalar_type,
160                            other.scalar_type
161                        );
162                    } else {
163                        let union_column_type = typ.sql_union(other_typ)?;
164                        union_fields.push((name.clone(), union_column_type));
165                    };
166                }
167
168                Ok(SqlColumnType {
169                    scalar_type: SqlScalarType::Record {
170                        fields: union_fields.into(),
171                        custom_id: *custom_id,
172                    },
173                    nullable: self.nullable || other.nullable,
174                })
175            }
176            _ => bail!(
177                "Can't union types: {:?} and {:?}",
178                self.scalar_type,
179                other.scalar_type
180            ),
181        }
182    }
183
184    /// Compute the least upper bound of two column types.
185    ///
186    /// Attempts [`SqlColumnType::sql_union`] first, which preserves SQL-level type
187    /// information (e.g. modifiers). Falls back to a repr-level union via
188    /// [`ReprColumnType::union`] when the SQL types are incompatible but the
189    /// underlying repr types are compatible.
190    ///
191    /// The resulting nullability is the disjunction of the two input
192    /// nullabilities.
193    pub fn try_union(&self, other: &Self) -> Result<Self, anyhow::Error> {
194        self.sql_union(other).or_else(|e| {
195            let repr_self = ReprColumnType::from(self);
196            let repr_other = ReprColumnType::from(other);
197            match repr_self.union(&repr_other) {
198                Ok(typ) => {
199                    // sql_union failed but repr union succeeded — this indicates
200                    // a repr-type canonicalization gap that we want CI visibility for.
201                    soft_panic_or_log!("repr type error: sql_union({self:?}, {other:?}): {e}");
202                    Ok(SqlColumnType::from_repr(&typ))
203                }
204                Err(_) => {
205                    // Both sql_union and repr union failed — genuine type mismatch,
206                    // not a canonicalization issue. Just propagate the original error.
207                    Err(e)
208                }
209            }
210        })
211    }
212
213    /// Compute the least upper bound of two column types.
214    /// See [`SqlColumnType::try_union`] for details.
215    ///
216    /// Panics on incompatible types.
217    pub fn union(&self, other: &Self) -> Self {
218        self.try_union(other).unwrap_or_else(|e| {
219            panic!("repr type error: after sql_union({self:?}, {other:?}) error: {e}")
220        })
221    }
222
223    /// Consumes this `SqlColumnType` and returns a new `SqlColumnType` with its
224    /// nullability set to the specified boolean.
225    pub fn nullable(mut self, nullable: bool) -> Self {
226        self.nullable = nullable;
227        self
228    }
229}
230
231impl RustType<ProtoColumnType> for SqlColumnType {
232    fn into_proto(&self) -> ProtoColumnType {
233        ProtoColumnType {
234            nullable: self.nullable,
235            scalar_type: Some(self.scalar_type.into_proto()),
236        }
237    }
238
239    fn from_proto(proto: ProtoColumnType) -> Result<Self, TryFromProtoError> {
240        Ok(SqlColumnType {
241            nullable: proto.nullable,
242            scalar_type: proto
243                .scalar_type
244                .into_rust_if_some("ProtoColumnType::scalar_type")?,
245        })
246    }
247}
248
249impl fmt::Display for SqlColumnType {
250    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
251        let nullable = if self.nullable { "Null" } else { "NotNull" };
252        f.write_fmt(format_args!("{:?}:{}", self.scalar_type, nullable))
253    }
254}
255
/// The type of a relation.
#[derive(
    Arbitrary,
    Clone,
    Debug,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Serialize,
    Deserialize,
    Hash,
    MzReflect
)]
pub struct SqlRelationType {
    /// The type for each column, in order.
    pub column_types: Vec<SqlColumnType>,
    /// Sets of indices that are "keys" for the collection.
    ///
    /// Each element in this list is a set of column indices, each with the
    /// property that the collection contains at most one record with each
    /// distinct set of values for each column. Alternately, for a specific set
    /// of values assigned to these columns there is at most one record.
    ///
    /// A collection can contain multiple sets of keys, although it is common to
    /// have either zero or one sets of key indices.
    ///
    /// Defaults to an empty list when absent from serialized input.
    #[serde(default)]
    pub keys: Vec<Vec<usize>>,
}
285
286impl SqlRelationType {
287    /// Constructs a `SqlRelationType` representing the relation with no columns and
288    /// no keys.
289    pub fn empty() -> Self {
290        SqlRelationType::new(vec![])
291    }
292
293    /// Constructs a new `SqlRelationType` from specified column types.
294    ///
295    /// The `SqlRelationType` will have no keys.
296    pub fn new(column_types: Vec<SqlColumnType>) -> Self {
297        SqlRelationType {
298            column_types,
299            keys: Vec::new(),
300        }
301    }
302
303    /// Adds a new key for the relation.
304    pub fn with_key(mut self, mut indices: Vec<usize>) -> Self {
305        indices.sort_unstable();
306        if !self.keys.contains(&indices) {
307            self.keys.push(indices);
308        }
309        self
310    }
311
312    pub fn with_keys(mut self, keys: Vec<Vec<usize>>) -> Self {
313        for key in keys {
314            self = self.with_key(key)
315        }
316        self
317    }
318
319    /// Computes the number of columns in the relation.
320    pub fn arity(&self) -> usize {
321        self.column_types.len()
322    }
323
324    /// Gets the index of the columns used when creating a default index.
325    pub fn default_key(&self) -> Vec<usize> {
326        if let Some(key) = self.keys.first() {
327            if key.is_empty() {
328                (0..self.column_types.len()).collect()
329            } else {
330                key.clone()
331            }
332        } else {
333            (0..self.column_types.len()).collect()
334        }
335    }
336
337    /// Returns all the [`SqlColumnType`]s, in order, for this relation.
338    pub fn columns(&self) -> &[SqlColumnType] {
339        &self.column_types
340    }
341
342    /// Adopts the nullability and keys from another `SqlRelationType`.
343    ///
344    /// Panics if the number of columns does not match.
345    pub fn backport_nullability_and_keys(&mut self, backport_typ: &ReprRelationType) {
346        assert_eq!(
347            backport_typ.column_types.len(),
348            self.column_types.len(),
349            "HIR and MIR types should have the same number of columns"
350        );
351        for (backport_col, sql_col) in backport_typ
352            .column_types
353            .iter()
354            .zip_eq(self.column_types.iter_mut())
355        {
356            sql_col.backport_nullability(backport_col);
357        }
358
359        self.keys = backport_typ.keys.clone();
360    }
361
362    /// Constructs a `SqlRelationType` from a `ReprRelationType` by converting
363    /// each column type via [`SqlColumnType::from_repr`]. This is a lossy
364    /// inverse of `ReprRelationType::from(&SqlRelationType)`.
365    pub fn from_repr(repr: &ReprRelationType) -> Self {
366        SqlRelationType {
367            column_types: repr
368                .column_types
369                .iter()
370                .map(SqlColumnType::from_repr)
371                .collect(),
372            keys: repr.keys.clone(),
373        }
374    }
375}
376
377impl RustType<ProtoRelationType> for SqlRelationType {
378    fn into_proto(&self) -> ProtoRelationType {
379        ProtoRelationType {
380            column_types: self.column_types.into_proto(),
381            keys: self.keys.into_proto(),
382        }
383    }
384
385    fn from_proto(proto: ProtoRelationType) -> Result<Self, TryFromProtoError> {
386        Ok(SqlRelationType {
387            column_types: proto.column_types.into_rust()?,
388            keys: proto.keys.into_rust()?,
389        })
390    }
391}
392
393impl RustType<ProtoKey> for Vec<usize> {
394    fn into_proto(&self) -> ProtoKey {
395        ProtoKey {
396            keys: self.into_proto(),
397        }
398    }
399
400    fn from_proto(proto: ProtoKey) -> Result<Self, TryFromProtoError> {
401        proto.keys.into_rust()
402    }
403}
404
/// The type of a relation, expressed in terms of [`ReprColumnType`]s.
///
/// This is the counterpart of [`SqlRelationType`]; it can be built from a
/// `&SqlRelationType` via its `From` implementation.
#[derive(
    Clone,
    Debug,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Serialize,
    Deserialize,
    Hash,
    MzReflect
)]
pub struct ReprRelationType {
    /// The type for each column, in order.
    pub column_types: Vec<ReprColumnType>,
    /// Sets of indices that are "keys" for the collection.
    ///
    /// Each element in this list is a set of column indices, each with the
    /// property that the collection contains at most one record with each
    /// distinct set of values for each column. Alternately, for a specific set
    /// of values assigned to these columns there is at most one record.
    ///
    /// A collection can contain multiple sets of keys, although it is common to
    /// have either zero or one sets of key indices.
    ///
    /// Defaults to an empty list when absent from serialized input.
    #[serde(default)]
    pub keys: Vec<Vec<usize>>,
}
433
434impl ReprRelationType {
435    /// Constructs a `ReprRelationType` representing the relation with no columns and
436    /// no keys.
437    pub fn empty() -> Self {
438        ReprRelationType::new(vec![])
439    }
440
441    /// Constructs a new `ReprRelationType` from specified column types.
442    ///
443    /// The `ReprRelationType` will have no keys.
444    pub fn new(column_types: Vec<ReprColumnType>) -> Self {
445        ReprRelationType {
446            column_types,
447            keys: Vec::new(),
448        }
449    }
450
451    /// Adds a new key for the relation.
452    pub fn with_key(mut self, mut indices: Vec<usize>) -> Self {
453        indices.sort_unstable();
454        if !self.keys.contains(&indices) {
455            self.keys.push(indices);
456        }
457        self
458    }
459
460    pub fn with_keys(mut self, keys: Vec<Vec<usize>>) -> Self {
461        for key in keys {
462            self = self.with_key(key)
463        }
464        self
465    }
466
467    /// Computes the number of columns in the relation.
468    pub fn arity(&self) -> usize {
469        self.column_types.len()
470    }
471
472    /// Gets the index of the columns used when creating a default index.
473    pub fn default_key(&self) -> Vec<usize> {
474        if let Some(key) = self.keys.first() {
475            if key.is_empty() {
476                (0..self.column_types.len()).collect()
477            } else {
478                key.clone()
479            }
480        } else {
481            (0..self.column_types.len()).collect()
482        }
483    }
484
485    /// Returns all the column types in order, for this relation.
486    pub fn columns(&self) -> &[ReprColumnType] {
487        &self.column_types
488    }
489}
490
491impl From<&SqlRelationType> for ReprRelationType {
492    fn from(sql_relation_type: &SqlRelationType) -> Self {
493        ReprRelationType {
494            column_types: sql_relation_type
495                .column_types
496                .iter()
497                .map(ReprColumnType::from)
498                .collect(),
499            keys: sql_relation_type.keys.clone(),
500        }
501    }
502}
503
/// The type of a column at the representation level.
///
/// Pairs a [`ReprScalarType`] with a nullability flag. It can be built from a
/// `&SqlColumnType` via its `From` implementation.
#[derive(
    Clone,
    Debug,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Serialize,
    Deserialize,
    Hash,
    MzReflect
)]
pub struct ReprColumnType {
    /// The underlying representation scalar type (e.g., Int32 or String) of this column.
    pub scalar_type: ReprScalarType,
    /// Whether this datum can be null.
    ///
    /// Defaults to `true` when the field is absent from serialized input.
    #[serde(default = "return_true")]
    pub nullable: bool,
}
523
524impl std::fmt::Display for ReprColumnType {
525    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
526        write!(f, "{}", self.scalar_type)?;
527        if self.nullable {
528            write!(f, "?")?;
529        }
530        Ok(())
531    }
532}
533
534impl ReprColumnType {
535    /// Compute the least upper bound of two column types at the repr level.
536    ///
537    /// More permissive than [`SqlColumnType::sql_union`] because it operates
538    /// on the underlying representation types, ignoring SQL-level distinctions
539    /// such as modifiers.
540    /// The resulting nullability is the disjunction of the two inputs.
541    pub fn union(&self, col: &ReprColumnType) -> Result<Self, anyhow::Error> {
542        let scalar_type = self.scalar_type.union(&col.scalar_type)?;
543        let nullable = self.nullable || col.nullable;
544
545        Ok(ReprColumnType {
546            scalar_type,
547            nullable,
548        })
549    }
550}
551
552impl From<&SqlColumnType> for ReprColumnType {
553    fn from(sql_column_type: &SqlColumnType) -> Self {
554        let scalar_type = &sql_column_type.scalar_type;
555        let scalar_type = scalar_type.into();
556        let nullable = sql_column_type.nullable;
557
558        ReprColumnType {
559            scalar_type,
560            nullable,
561        }
562    }
563}
564
565impl SqlColumnType {
566    /// Lossily translates a [`ReprColumnType`] back to a [`SqlColumnType`].
567    ///
568    /// See [`SqlScalarType::from_repr`] for an example of lossiness.
569    pub fn from_repr(repr: &ReprColumnType) -> Self {
570        let scalar_type = &repr.scalar_type;
571        let scalar_type = SqlScalarType::from_repr(scalar_type);
572        let nullable = repr.nullable;
573
574        SqlColumnType {
575            scalar_type,
576            nullable,
577        }
578    }
579}
580
/// The name of a column in a [`RelationDesc`].
///
/// A newtype around a boxed string slice.
#[derive(
    Clone,
    Debug,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Serialize,
    Deserialize,
    Hash,
    MzReflect
)]
pub struct ColumnName(Box<str>);
595
596impl ColumnName {
597    /// Returns this column name as a `str`.
598    #[inline(always)]
599    pub fn as_str(&self) -> &str {
600        &*self
601    }
602
603    /// Returns this column name as a `&mut Box<str>`.
604    pub fn as_mut_boxed_str(&mut self) -> &mut Box<str> {
605        &mut self.0
606    }
607
608    /// Returns if this [`ColumnName`] is similar to the provided one.
609    pub fn is_similar(&self, other: &ColumnName) -> bool {
610        const SIMILARITY_THRESHOLD: f64 = 0.6;
611
612        let a_lowercase = self.to_lowercase();
613        let b_lowercase = other.to_lowercase();
614
615        strsim::normalized_levenshtein(&a_lowercase, &b_lowercase) >= SIMILARITY_THRESHOLD
616    }
617}
618
619impl std::ops::Deref for ColumnName {
620    type Target = str;
621
622    #[inline(always)]
623    fn deref(&self) -> &Self::Target {
624        &self.0
625    }
626}
627
628impl fmt::Display for ColumnName {
629    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
630        f.write_str(&self.0)
631    }
632}
633
634impl From<String> for ColumnName {
635    fn from(s: String) -> ColumnName {
636        ColumnName(s.into())
637    }
638}
639
640impl From<&str> for ColumnName {
641    fn from(s: &str) -> ColumnName {
642        ColumnName(s.into())
643    }
644}
645
646impl From<&ColumnName> for ColumnName {
647    fn from(n: &ColumnName) -> ColumnName {
648        n.clone()
649    }
650}
651
652impl RustType<ProtoColumnName> for ColumnName {
653    fn into_proto(&self) -> ProtoColumnName {
654        ProtoColumnName {
655            value: Some(self.0.to_string()),
656        }
657    }
658
659    fn from_proto(proto: ProtoColumnName) -> Result<Self, TryFromProtoError> {
660        Ok(ColumnName(
661            proto
662                .value
663                .ok_or_else(|| TryFromProtoError::missing_field("ProtoColumnName::value"))?
664                .into(),
665        ))
666    }
667}
668
669impl From<ColumnName> for mz_sql_parser::ast::Ident {
670    fn from(value: ColumnName) -> Self {
671        // Note: ColumnNames are known to be less than the max length of an Ident (I think?).
672        mz_sql_parser::ast::Ident::new_unchecked(value.0)
673    }
674}
675
676impl proptest::arbitrary::Arbitrary for ColumnName {
677    type Parameters = ();
678    type Strategy = BoxedStrategy<ColumnName>;
679
680    fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy {
681        // Long column names are generally uninteresting, and can greatly
682        // increase the runtime for a test case, so bound the max length.
683        let mut weights = vec![(50, Just(1..8)), (20, Just(8..16))];
684        if std::env::var("PROPTEST_LARGE_DATA").is_ok() {
685            weights.extend([
686                (5, Just(16..128)),
687                (1, Just(128..1024)),
688                (1, Just(1024..4096)),
689            ]);
690        }
691        let name_length = Union::new_weighted(weights);
692
693        // Non-ASCII characters are also generally uninteresting and can make
694        // debugging harder.
695        let char_strat = Rc::new(Union::new_weighted(vec![
696            (50, proptest::char::range('A', 'z').boxed()),
697            (1, any::<char>().boxed()),
698        ]));
699
700        name_length
701            .prop_flat_map(move |length| proptest::collection::vec(Rc::clone(&char_strat), length))
702            .prop_map(|chars| ColumnName(chars.into_iter().collect::<Box<str>>()))
703            .no_shrink()
704            .boxed()
705    }
706}
707
/// Default name given to a column when no other name information is known.
pub const UNKNOWN_COLUMN_NAME: &str = "?column?";
710
/// Stable index of a column in a [`RelationDesc`].
///
/// A newtype around a `usize`; see `from_raw` / `to_raw` for conversions.
#[derive(
    Clone,
    Copy,
    Debug,
    Eq,
    PartialEq,
    PartialOrd,
    Ord,
    Serialize,
    Deserialize,
    Hash,
    MzReflect
)]
pub struct ColumnIndex(usize);

// Compile-time guard: `ColumnIndex` must not implement `Arbitrary`.
// NOTE(review): presumably because randomly generated indices would not refer
// to real columns; confirm the intent.
static_assertions::assert_not_impl_all!(ColumnIndex: Arbitrary);
728
729impl ColumnIndex {
730    /// Returns a stable identifier for this [`ColumnIndex`].
731    pub fn to_stable_name(&self) -> String {
732        self.0.to_string()
733    }
734
735    pub fn to_raw(&self) -> usize {
736        self.0
737    }
738
739    pub fn from_raw(val: usize) -> Self {
740        ColumnIndex(val)
741    }
742}
743
/// The version a given column was added at.
///
/// A newtype around a `u64`. The initial version is `0` (see `root`) and
/// later versions are obtained by incrementing (see `bump`).
#[derive(
    Clone,
    Copy,
    Debug,
    Eq,
    PartialEq,
    PartialOrd,
    Ord,
    Serialize,
    Deserialize,
    Hash,
    MzReflect,
    Arbitrary
)]
pub struct RelationVersion(u64);
760
761impl RelationVersion {
762    /// Returns the "root" or "initial" version of a [`RelationDesc`].
763    pub fn root() -> Self {
764        RelationVersion(0)
765    }
766
767    /// Returns an instance of [`RelationVersion`] which is "one" higher than `self`.
768    pub fn bump(&self) -> Self {
769        let next_version = self
770            .0
771            .checked_add(1)
772            .expect("added more than u64::MAX columns?");
773        RelationVersion(next_version)
774    }
775
776    /// Consume a [`RelationVersion`] returning the raw value.
777    ///
778    /// Should __only__ be used for serialization.
779    pub fn into_raw(self) -> u64 {
780        self.0
781    }
782
783    /// Create a [`RelationVersion`] from a raw value.
784    ///
785    /// Should __only__ be used for serialization.
786    pub fn from_raw(val: u64) -> RelationVersion {
787        RelationVersion(val)
788    }
789}
790
791impl From<RelationVersion> for SchemaId {
792    fn from(value: RelationVersion) -> Self {
793        SchemaId(usize::cast_from(value.0))
794    }
795}
796
797impl From<mz_sql_parser::ast::Version> for RelationVersion {
798    fn from(value: mz_sql_parser::ast::Version) -> Self {
799        RelationVersion(value.into_inner())
800    }
801}
802
803impl fmt::Display for RelationVersion {
804    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
805        write!(f, "v{}", self.0)
806    }
807}
808
809impl From<RelationVersion> for mz_sql_parser::ast::Version {
810    fn from(value: RelationVersion) -> Self {
811        mz_sql_parser::ast::Version::new(value.0)
812    }
813}
814
815impl RustType<ProtoRelationVersion> for RelationVersion {
816    fn into_proto(&self) -> ProtoRelationVersion {
817        ProtoRelationVersion { value: self.0 }
818    }
819
820    fn from_proto(proto: ProtoRelationVersion) -> Result<Self, TryFromProtoError> {
821        Ok(RelationVersion(proto.value))
822    }
823}
824
/// Semantic type annotation for a column in a builtin catalog relation.
///
/// These are compile-time metadata used by the catalog ontology layer to
/// describe the meaning of a column (e.g., that it contains a catalog item ID
/// or a role ID). Possible values correspond to the entries in
/// `SEMANTIC_TYPE_DEFS` in the `mz-catalog` crate.
///
/// Note that only `serde::Serialize` is derived; there is no `Deserialize`
/// implementation on this type.
#[derive(
    Clone,
    Copy,
    Debug,
    PartialEq,
    Eq,
    PartialOrd,
    Ord,
    Hash,
    serde::Serialize
)]
pub enum SemanticType {
    CatalogItemId,
    GlobalId,
    ClusterId,
    ReplicaId,
    SchemaId,
    DatabaseId,
    RoleId,
    NetworkPolicyId,
    ShardId,
    OID,
    ObjectType,
    ConnectionType,
    SourceType,
    MzTimestamp,
    WallclockTimestamp,
    ByteCount,
    RecordCount,
    CreditRate,
    SqlDefinition,
    RedactedSqlDefinition,
}
864
865impl fmt::Display for SemanticType {
866    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
867        let s = match self {
868            SemanticType::CatalogItemId => "CatalogItemId",
869            SemanticType::GlobalId => "GlobalId",
870            SemanticType::ClusterId => "ClusterId",
871            SemanticType::ReplicaId => "ReplicaId",
872            SemanticType::SchemaId => "SchemaId",
873            SemanticType::DatabaseId => "DatabaseId",
874            SemanticType::RoleId => "RoleId",
875            SemanticType::NetworkPolicyId => "NetworkPolicyId",
876            SemanticType::ShardId => "ShardId",
877            SemanticType::OID => "OID",
878            SemanticType::ObjectType => "ObjectType",
879            SemanticType::ConnectionType => "ConnectionType",
880            SemanticType::SourceType => "SourceType",
881            SemanticType::MzTimestamp => "MzTimestamp",
882            SemanticType::WallclockTimestamp => "WallclockTimestamp",
883            SemanticType::ByteCount => "ByteCount",
884            SemanticType::RecordCount => "RecordCount",
885            SemanticType::CreditRate => "CreditRate",
886            SemanticType::SqlDefinition => "SqlDefinition",
887            SemanticType::RedactedSqlDefinition => "RedactedSqlDefinition",
888        };
889        f.write_str(s)
890    }
891}
892
/// Metadata (other than type) for a column in a [`RelationDesc`].
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash, MzReflect)]
struct ColumnMetadata {
    /// Name of the column.
    name: ColumnName,
    /// Index into a [`SqlRelationType`] for this column.
    typ_idx: usize,
    /// Version this column was added at.
    added: RelationVersion,
    /// Version this column was dropped at, if any.
    dropped: Option<RelationVersion>,
}
905
/// A description of the shape of a relation.
///
/// It bundles a [`SqlRelationType`] with `ColumnMetadata` for each column in
/// the relation.
///
/// # Examples
///
/// A `RelationDesc` is typically constructed via its builder API:
///
/// ```
/// use mz_repr::{SqlColumnType, RelationDesc, SqlScalarType};
///
/// let desc = RelationDesc::builder()
///     .with_column("id", SqlScalarType::Int64.nullable(false))
///     .with_column("price", SqlScalarType::Float64.nullable(true))
///     .finish();
/// ```
///
/// In more complicated cases, like when constructing a `RelationDesc` in
/// response to user input, it may be more convenient to construct a relation
/// type first, and imbue it with column names to form a `RelationDesc` later:
///
/// ```
/// use mz_repr::RelationDesc;
///
/// # fn plan_query(_: &str) -> mz_repr::SqlRelationType { mz_repr::SqlRelationType::new(vec![]) }
/// let relation_type = plan_query("SELECT * FROM table");
/// let names = (0..relation_type.arity()).map(|i| match i {
///     0 => "first",
///     1 => "second",
///     _ => "unknown",
/// });
/// let desc = RelationDesc::new(relation_type, names);
/// ```
///
/// Next to the [`SqlRelationType`] we maintain a map of `ColumnIndex` to
/// `ColumnMetadata`, where [`ColumnIndex`] is a stable identifier for a
/// column throughout the lifetime of the relation. This allows a
/// [`RelationDesc`] to represent a projection over a version of itself.
///
/// ```
/// use std::collections::BTreeSet;
/// use mz_repr::{ColumnIndex, RelationDesc, SqlScalarType};
///
/// let desc = RelationDesc::builder()
///     .with_column("name", SqlScalarType::String.nullable(false))
///     .with_column("email", SqlScalarType::String.nullable(false))
///     .finish();
///
/// // Demand only the second column (index 1), projecting away the first.
/// let demands = BTreeSet::from([1]);
/// let proj = desc.apply_demand(&demands);
///
/// // We projected away the first column.
/// assert!(!proj.contains_index(&ColumnIndex::from_raw(0)));
/// // But retained the second.
/// assert!(proj.contains_index(&ColumnIndex::from_raw(1)));
///
/// // The underlying `SqlRelationType` also contains a single column.
/// assert_eq!(proj.typ().arity(), 1);
/// ```
///
/// To maintain this stable mapping and track the lifetime of a column (e.g.
/// when adding or dropping a column) we use `ColumnMetadata`. It maintains
/// the index in [`SqlRelationType`] that corresponds to a given column, and the
/// version at which this column was added or dropped.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, MzReflect)]
pub struct RelationDesc {
    /// The relation's column types and keys.
    typ: SqlRelationType,
    /// Per-column metadata, keyed by the column's stable index.
    metadata: BTreeMap<ColumnIndex, ColumnMetadata>,
}
978
979impl RustType<ProtoRelationDesc> for RelationDesc {
980    fn into_proto(&self) -> ProtoRelationDesc {
981        let (names, metadata): (Vec<_>, Vec<_>) = self
982            .metadata
983            .values()
984            .map(|meta| {
985                let metadata = ProtoColumnMetadata {
986                    added: Some(meta.added.into_proto()),
987                    dropped: meta.dropped.map(|v| v.into_proto()),
988                };
989                (meta.name.into_proto(), metadata)
990            })
991            .unzip();
992
993        // `metadata` Migration Logic: We wrote some `ProtoRelationDesc`s into Persist before the
994        // metadata field was added. To make sure our serialization roundtrips the same as before
995        // we added the field, we omit `metadata` if all of the values are equal to the default.
996        //
997        // Note: This logic needs to exist approximately forever.
998        let is_all_default_metadata = metadata.iter().all(|meta| {
999            meta.added == Some(RelationVersion::root().into_proto()) && meta.dropped == None
1000        });
1001        let metadata = if is_all_default_metadata {
1002            Vec::new()
1003        } else {
1004            metadata
1005        };
1006
1007        ProtoRelationDesc {
1008            typ: Some(self.typ.into_proto()),
1009            names,
1010            metadata,
1011        }
1012    }
1013
1014    fn from_proto(proto: ProtoRelationDesc) -> Result<Self, TryFromProtoError> {
1015        // `metadata` Migration Logic: We wrote some `ProtoRelationDesc`s into Persist before the
1016        // metadata field was added. If the field doesn't exist we fill it in with default values,
1017        // and when converting into_proto we omit these fields so the serialized bytes roundtrip.
1018        //
1019        // Note: This logic needs to exist approximately forever.
1020        let proto_metadata: Box<dyn Iterator<Item = _>> = if proto.metadata.is_empty() {
1021            let val = ProtoColumnMetadata {
1022                added: Some(RelationVersion::root().into_proto()),
1023                dropped: None,
1024            };
1025            Box::new(itertools::repeat_n(val, proto.names.len()))
1026        } else {
1027            Box::new(proto.metadata.into_iter())
1028        };
1029
1030        let metadata = proto
1031            .names
1032            .into_iter()
1033            .zip_eq(proto_metadata)
1034            .enumerate()
1035            .map(|(idx, (name, metadata))| {
1036                let meta = ColumnMetadata {
1037                    name: name.into_rust()?,
1038                    typ_idx: idx,
1039                    added: metadata.added.into_rust_if_some("ColumnMetadata::added")?,
1040                    dropped: metadata.dropped.into_rust()?,
1041                };
1042                Ok::<_, TryFromProtoError>((ColumnIndex(idx), meta))
1043            })
1044            .collect::<Result<_, _>>()?;
1045
1046        Ok(RelationDesc {
1047            typ: proto.typ.into_rust_if_some("ProtoRelationDesc::typ")?,
1048            metadata,
1049        })
1050    }
1051}
1052
1053impl RelationDesc {
1054    /// Returns a [`RelationDescBuilder`] that can be used to construct a [`RelationDesc`].
1055    pub fn builder() -> RelationDescBuilder {
1056        RelationDescBuilder::default()
1057    }
1058
1059    /// Constructs a new `RelationDesc` that represents the empty relation
1060    /// with no columns and no keys.
1061    pub fn empty() -> Self {
1062        RelationDesc {
1063            typ: SqlRelationType::empty(),
1064            metadata: BTreeMap::default(),
1065        }
1066    }
1067
1068    /// Check if the `RelationDesc` is empty.
1069    pub fn is_empty(&self) -> bool {
1070        self == &Self::empty()
1071    }
1072
1073    /// Returns the number of columns in this [`RelationDesc`].
1074    pub fn len(&self) -> usize {
1075        self.typ().column_types.len()
1076    }
1077
1078    /// Constructs a new `RelationDesc` from a `SqlRelationType` and an iterator
1079    /// over column names.
1080    ///
1081    /// # Panics
1082    ///
1083    /// Panics if the arity of the `SqlRelationType` is not equal to the number of
1084    /// items in `names`.
1085    pub fn new<I, N>(typ: SqlRelationType, names: I) -> Self
1086    where
1087        I: IntoIterator<Item = N>,
1088        N: Into<ColumnName>,
1089    {
1090        let metadata: BTreeMap<_, _> = names
1091            .into_iter()
1092            .enumerate()
1093            .map(|(idx, name)| {
1094                let col_idx = ColumnIndex(idx);
1095                let metadata = ColumnMetadata {
1096                    name: name.into(),
1097                    typ_idx: idx,
1098                    added: RelationVersion::root(),
1099                    dropped: None,
1100                };
1101                (col_idx, metadata)
1102            })
1103            .collect();
1104
1105        // TODO(parkmycar): Add better validation here.
1106        assert_eq!(typ.column_types.len(), metadata.len());
1107
1108        RelationDesc { typ, metadata }
1109    }
1110
1111    pub fn from_names_and_types<I, T, N>(iter: I) -> Self
1112    where
1113        I: IntoIterator<Item = (N, T)>,
1114        T: Into<SqlColumnType>,
1115        N: Into<ColumnName>,
1116    {
1117        let (names, types): (Vec<_>, Vec<_>) = iter.into_iter().unzip();
1118        let types = types.into_iter().map(Into::into).collect();
1119        let typ = SqlRelationType::new(types);
1120        Self::new(typ, names)
1121    }
1122
1123    /// Concatenates a `RelationDesc` onto the end of this `RelationDesc`.
1124    ///
1125    /// # Panics
1126    ///
1127    /// Panics if either `self` or `other` have columns that were added at a
1128    /// [`RelationVersion`] other than [`RelationVersion::root`] or if any
1129    /// columns were dropped.
1130    ///
1131    /// TODO(parkmycar): Move this method to [`RelationDescBuilder`].
1132    pub fn concat(mut self, other: Self) -> Self {
1133        let self_len = self.typ.column_types.len();
1134
1135        for (typ, (_col_idx, meta)) in other.typ.column_types.into_iter().zip_eq(other.metadata) {
1136            assert_eq!(meta.added, RelationVersion::root());
1137            assert_none!(meta.dropped);
1138
1139            let new_idx = self.typ.columns().len();
1140            let new_meta = ColumnMetadata {
1141                name: meta.name,
1142                typ_idx: new_idx,
1143                added: RelationVersion::root(),
1144                dropped: None,
1145            };
1146
1147            self.typ.column_types.push(typ);
1148            let prev = self.metadata.insert(ColumnIndex(new_idx), new_meta);
1149
1150            assert_eq!(self.metadata.len(), self.typ.columns().len());
1151            assert_none!(prev);
1152        }
1153
1154        for k in other.typ.keys {
1155            let k = k.into_iter().map(|idx| idx + self_len).collect();
1156            self = self.with_key(k);
1157        }
1158        self
1159    }
1160
1161    /// Adds a new key for the relation.
1162    pub fn with_key(mut self, indices: Vec<usize>) -> Self {
1163        self.typ = self.typ.with_key(indices);
1164        self
1165    }
1166
1167    /// Drops all existing keys.
1168    pub fn without_keys(mut self) -> Self {
1169        self.typ.keys.clear();
1170        self
1171    }
1172
1173    /// Builds a new relation description with the column names replaced with
1174    /// new names.
1175    ///
1176    /// # Panics
1177    ///
1178    /// Panics if the arity of the relation type does not match the number of
1179    /// items in `names`.
1180    pub fn with_names<I, N>(self, names: I) -> Self
1181    where
1182        I: IntoIterator<Item = N>,
1183        N: Into<ColumnName>,
1184    {
1185        Self::new(self.typ, names)
1186    }
1187
1188    /// Computes the number of columns in the relation.
1189    pub fn arity(&self) -> usize {
1190        self.typ.arity()
1191    }
1192
1193    /// Returns the relation type underlying this relation description.
1194    pub fn typ(&self) -> &SqlRelationType {
1195        &self.typ
1196    }
1197
1198    /// Returns the owned relation type underlying this relation description.
1199    pub fn into_typ(self) -> SqlRelationType {
1200        self.typ
1201    }
1202
1203    /// Returns an iterator over the columns in this relation.
1204    pub fn iter(&self) -> impl Iterator<Item = (&ColumnName, &SqlColumnType)> {
1205        self.metadata.values().map(|meta| {
1206            let typ = &self.typ.columns()[meta.typ_idx];
1207            (&meta.name, typ)
1208        })
1209    }
1210
1211    /// Returns an iterator over the types of the columns in this relation.
1212    pub fn iter_types(&self) -> impl Iterator<Item = &SqlColumnType> {
1213        self.typ.column_types.iter()
1214    }
1215
1216    /// Returns an iterator over the names of the columns in this relation.
1217    pub fn iter_names(&self) -> impl Iterator<Item = &ColumnName> {
1218        self.metadata.values().map(|meta| &meta.name)
1219    }
1220
1221    /// Returns an iterator over the columns in this relation, with all their metadata.
1222    pub fn iter_all(&self) -> impl Iterator<Item = (&ColumnIndex, &ColumnName, &SqlColumnType)> {
1223        self.metadata.iter().map(|(col_idx, metadata)| {
1224            let col_typ = &self.typ.columns()[metadata.typ_idx];
1225            (col_idx, &metadata.name, col_typ)
1226        })
1227    }
1228
1229    /// Returns an iterator over the names of the columns in this relation that are "similar" to
1230    /// the provided `name`.
1231    pub fn iter_similar_names<'a>(
1232        &'a self,
1233        name: &'a ColumnName,
1234    ) -> impl Iterator<Item = &'a ColumnName> {
1235        self.iter_names().filter(|n| n.is_similar(name))
1236    }
1237
1238    /// Returns whether this [`RelationDesc`] contains a column at the specified index.
1239    pub fn contains_index(&self, idx: &ColumnIndex) -> bool {
1240        self.metadata.contains_key(idx)
1241    }
1242
1243    /// Finds a column by name.
1244    ///
1245    /// Returns the index and type of the column named `name`. If no column with
1246    /// the specified name exists, returns `None`. If multiple columns have the
1247    /// specified name, the leftmost column is returned.
1248    pub fn get_by_name(&self, name: &ColumnName) -> Option<(usize, &SqlColumnType)> {
1249        self.iter_names()
1250            .position(|n| n == name)
1251            .map(|i| (i, &self.typ.column_types[i]))
1252    }
1253
1254    /// Gets the name of the `i`th column.
1255    ///
1256    /// # Panics
1257    ///
1258    /// Panics if `i` is not a valid column index.
1259    ///
1260    /// TODO(parkmycar): Migrate all uses of this to [`RelationDesc::get_name_idx`].
1261    pub fn get_name(&self, i: usize) -> &ColumnName {
1262        // TODO(parkmycar): Refactor this to use `ColumnIndex`.
1263        self.get_name_idx(&ColumnIndex(i))
1264    }
1265
1266    /// Gets the name of the column at `idx`.
1267    ///
1268    /// # Panics
1269    ///
1270    /// Panics if no column exists at `idx`.
1271    pub fn get_name_idx(&self, idx: &ColumnIndex) -> &ColumnName {
1272        &self.metadata.get(idx).expect("should exist").name
1273    }
1274
1275    /// Mutably gets the name of the `i`th column.
1276    ///
1277    /// # Panics
1278    ///
1279    /// Panics if `i` is not a valid column index.
1280    pub fn get_name_mut(&mut self, i: usize) -> &mut ColumnName {
1281        // TODO(parkmycar): Refactor this to use `ColumnIndex`.
1282        &mut self
1283            .metadata
1284            .get_mut(&ColumnIndex(i))
1285            .expect("should exist")
1286            .name
1287    }
1288
1289    /// Gets the [`SqlColumnType`] of the column at `idx`.
1290    ///
1291    /// # Panics
1292    ///
1293    /// Panics if no column exists at `idx`.
1294    pub fn get_type(&self, idx: &ColumnIndex) -> &SqlColumnType {
1295        let typ_idx = self.metadata.get(idx).expect("should exist").typ_idx;
1296        &self.typ.column_types[typ_idx]
1297    }
1298
1299    /// Gets the name of the `i`th column if that column name is unambiguous.
1300    ///
1301    /// If at least one other column has the same name as the `i`th column,
1302    /// returns `None`. If the `i`th column has no name, returns `None`.
1303    ///
1304    /// # Panics
1305    ///
1306    /// Panics if `i` is not a valid column index.
1307    pub fn get_unambiguous_name(&self, i: usize) -> Option<&ColumnName> {
1308        let name = self.get_name(i);
1309        if self.iter_names().filter(|n| *n == name).count() == 1 {
1310            Some(name)
1311        } else {
1312            None
1313        }
1314    }
1315
1316    /// Verifies that `d` meets all of the constraints for the `i`th column of `self`.
1317    ///
1318    /// n.b. The only constraint MZ currently supports in NOT NULL, but this
1319    /// structure will be simple to extend.
1320    pub fn constraints_met(&self, i: usize, d: &Datum) -> Result<(), NotNullViolation> {
1321        let name = self.get_name(i);
1322        let typ = &self.typ.column_types[i];
1323        if d == &Datum::Null && !typ.nullable {
1324            Err(NotNullViolation(name.clone()))
1325        } else {
1326            Ok(())
1327        }
1328    }
1329
1330    /// Computes the differences between two [`RelationDesc`]s.
1331    ///
1332    /// Returns a rich diff describing which columns differ, and in what way.
1333    ///
1334    /// # Panics
1335    ///
1336    /// Panics if either `self` or `other` have columns that were added at a
1337    /// [`RelationVersion`] other than [`RelationVersion::root`] or if any
1338    /// columns were dropped.
1339    ///
1340    /// This simplifies things by allowing us to assume that `ColumnIndex`es are
1341    /// dense and that they match the indexes of `typ.columns()`. Without this
1342    /// we would, e.g., struggle comparing keys as those are in terms of
1343    /// `typ.columns()` indexes.
1344    pub fn diff(&self, other: &RelationDesc) -> RelationDescDiff {
1345        assert_eq!(self.metadata.len(), self.typ.columns().len());
1346        assert_eq!(other.metadata.len(), other.typ.columns().len());
1347        for (idx, meta) in self.metadata.iter().chain(other.metadata.iter()) {
1348            assert_eq!(meta.typ_idx, idx.0);
1349            assert_eq!(meta.added, RelationVersion::root());
1350            assert_none!(meta.dropped);
1351        }
1352
1353        let mut column_diffs = BTreeMap::new();
1354        let mut key_diff = None;
1355
1356        let left_arity = self.arity();
1357        let right_arity = other.arity();
1358        let common_arity = std::cmp::min(left_arity, right_arity);
1359
1360        for idx in 0..common_arity {
1361            let left_name = self.get_name(idx);
1362            let right_name = other.get_name(idx);
1363            let left_type = &self.typ.column_types[idx];
1364            let right_type = &other.typ.column_types[idx];
1365
1366            if left_name != right_name {
1367                let diff = ColumnDiff::NameMismatch {
1368                    left: left_name.clone(),
1369                    right: right_name.clone(),
1370                };
1371                column_diffs.insert(idx, diff);
1372            } else if left_type.scalar_type != right_type.scalar_type {
1373                let diff = ColumnDiff::TypeMismatch {
1374                    name: left_name.clone(),
1375                    left: left_type.scalar_type.clone(),
1376                    right: right_type.scalar_type.clone(),
1377                };
1378                column_diffs.insert(idx, diff);
1379            } else if left_type.nullable != right_type.nullable {
1380                let diff = ColumnDiff::NullabilityMismatch {
1381                    name: left_name.clone(),
1382                    left: left_type.nullable,
1383                    right: right_type.nullable,
1384                };
1385                column_diffs.insert(idx, diff);
1386            }
1387        }
1388
1389        for idx in common_arity..left_arity {
1390            let diff = ColumnDiff::Missing {
1391                name: self.get_name(idx).clone(),
1392            };
1393            column_diffs.insert(idx, diff);
1394        }
1395
1396        for idx in common_arity..right_arity {
1397            let diff = ColumnDiff::Extra {
1398                name: other.get_name(idx).clone(),
1399            };
1400            column_diffs.insert(idx, diff);
1401        }
1402
1403        let left_keys: BTreeSet<_> = self.typ.keys.iter().collect();
1404        let right_keys: BTreeSet<_> = other.typ.keys.iter().collect();
1405        if left_keys != right_keys {
1406            let column_names = |desc: &RelationDesc, keys: BTreeSet<&Vec<usize>>| {
1407                keys.iter()
1408                    .map(|key| key.iter().map(|&idx| desc.get_name(idx).clone()).collect())
1409                    .collect()
1410            };
1411            key_diff = Some(KeyDiff {
1412                left: column_names(self, left_keys),
1413                right: column_names(other, right_keys),
1414            });
1415        }
1416
1417        RelationDescDiff {
1418            column_diffs,
1419            key_diff,
1420        }
1421    }
1422
1423    /// Creates a new [`RelationDesc`] retaining only the columns specified in `demands`.
1424    pub fn apply_demand(&self, demands: &BTreeSet<usize>) -> RelationDesc {
1425        let mut new_desc = self.clone();
1426
1427        // Update ColumnMetadata.
1428        let mut removed = 0;
1429        new_desc.metadata.retain(|idx, metadata| {
1430            let retain = demands.contains(&idx.0);
1431            if !retain {
1432                removed += 1;
1433            } else {
1434                metadata.typ_idx -= removed;
1435            }
1436            retain
1437        });
1438
1439        // Update SqlColumnType.
1440        let mut idx = 0;
1441        new_desc.typ.column_types.retain(|_| {
1442            let keep = demands.contains(&idx);
1443            idx += 1;
1444            keep
1445        });
1446
1447        new_desc
1448    }
1449}
1450
1451impl Arbitrary for RelationDesc {
1452    type Parameters = ();
1453    type Strategy = BoxedStrategy<RelationDesc>;
1454
1455    fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy {
1456        let mut weights = vec![(100, Just(0..4)), (50, Just(4..8)), (25, Just(8..16))];
1457        if std::env::var("PROPTEST_LARGE_DATA").is_ok() {
1458            weights.extend([
1459                (12, Just(16..32)),
1460                (6, Just(32..64)),
1461                (3, Just(64..128)),
1462                (1, Just(128..256)),
1463            ]);
1464        }
1465        let num_columns = Union::new_weighted(weights);
1466
1467        num_columns.prop_flat_map(arb_relation_desc).boxed()
1468    }
1469}
1470
1471/// Returns a [`Strategy`] that generates an arbitrary [`RelationDesc`] with a number columns
1472/// within the range provided.
1473pub fn arb_relation_desc(num_cols: std::ops::Range<usize>) -> impl Strategy<Value = RelationDesc> {
1474    proptest::collection::btree_map(any::<ColumnName>(), any::<SqlColumnType>(), num_cols)
1475        .prop_map(RelationDesc::from_names_and_types)
1476}
1477
1478/// Returns a [`Strategy`] that generates a projection of the provided [`RelationDesc`].
1479pub fn arb_relation_desc_projection(desc: RelationDesc) -> impl Strategy<Value = RelationDesc> {
1480    let mask: Vec<_> = (0..desc.len()).map(|_| any::<bool>()).collect();
1481    mask.prop_map(move |mask| {
1482        let demands: BTreeSet<_> = mask
1483            .into_iter()
1484            .enumerate()
1485            .filter_map(|(idx, keep)| keep.then_some(idx))
1486            .collect();
1487        desc.apply_demand(&demands)
1488    })
1489}
1490
1491impl IntoIterator for RelationDesc {
1492    type Item = (ColumnName, SqlColumnType);
1493    type IntoIter = Box<dyn Iterator<Item = (ColumnName, SqlColumnType)>>;
1494
1495    fn into_iter(self) -> Self::IntoIter {
1496        let iter = self
1497            .metadata
1498            .into_values()
1499            .zip_eq(self.typ.column_types)
1500            .map(|(meta, typ)| (meta.name, typ));
1501        Box::new(iter)
1502    }
1503}
1504
1505/// Returns a [`Strategy`] that yields arbitrary [`Row`]s for the provided [`RelationDesc`].
1506pub fn arb_row_for_relation(desc: &RelationDesc) -> impl Strategy<Value = Row> + use<> {
1507    let datums: Vec<_> = desc
1508        .typ()
1509        .columns()
1510        .iter()
1511        .cloned()
1512        .map(arb_datum_for_column)
1513        .collect();
1514    datums.prop_map(|x| Row::pack(x.iter().map(Datum::from)))
1515}
1516
/// Error raised when an expression violates the not-null constraint of a
/// column. The wrapped [`ColumnName`] identifies the offending column.
#[derive(Debug, PartialEq, Eq)]
pub struct NotNullViolation(pub ColumnName);
1520
1521impl fmt::Display for NotNullViolation {
1522    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1523        write!(
1524            f,
1525            "null value in column {} violates not-null constraint",
1526            self.0.quoted()
1527        )
1528    }
1529}
1530
/// The result of comparing two [`RelationDesc`]s, as produced by
/// [`RelationDesc::diff`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RelationDescDiff {
    /// Column differences, keyed by column index. Columns without any
    /// difference have no entry.
    pub column_diffs: BTreeMap<usize, ColumnDiff>,
    /// Key differences, if any. `None` when both relations have the same keys.
    pub key_diff: Option<KeyDiff>,
}
1539
1540impl RelationDescDiff {
1541    /// Returns whether the diff contains any differences.
1542    pub fn is_empty(&self) -> bool {
1543        self.column_diffs.is_empty() && self.key_diff.is_none()
1544    }
1545}
1546
/// A difference in a column between two [`RelationDesc`]s.
///
/// "Left" refers to the relation `diff` was called on, "right" to the
/// relation passed as its argument.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ColumnDiff {
    /// Column exists only in the left relation.
    Missing { name: ColumnName },
    /// Column exists only in the right relation.
    Extra { name: ColumnName },
    /// Columns have the same name but different scalar types.
    TypeMismatch {
        name: ColumnName,
        left: SqlScalarType,
        right: SqlScalarType,
    },
    /// Columns have the same name and scalar type but different nullability.
    NullabilityMismatch {
        name: ColumnName,
        left: bool,
        right: bool,
    },
    /// Columns have different names.
    NameMismatch { left: ColumnName, right: ColumnName },
}
1569
/// A difference in the keys of two [`RelationDesc`]s.
///
/// Each key is given as the list of names of the columns it consists of.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct KeyDiff {
    /// Keys of the left relation.
    pub left: BTreeSet<Vec<ColumnName>>,
    /// Keys of the right relation.
    pub right: BTreeSet<Vec<ColumnName>>,
}
1578
/// A builder for a [`RelationDesc`].
///
/// Columns and keys are accumulated by the `with_*` methods and turned into a
/// [`RelationDesc`] by [`RelationDescBuilder::finish`].
#[derive(Clone, Default, Debug, PartialEq, Eq)]
pub struct RelationDescBuilder {
    /// Columns of the relation, in the order they were added.
    columns: Vec<(ColumnName, SqlColumnType)>,
    /// Sets of indices that are "keys" for the collection.
    keys: Vec<Vec<usize>>,
}
1587
1588impl RelationDescBuilder {
1589    /// Appends a column with the specified name and type.
1590    pub fn with_column<N: Into<ColumnName>>(
1591        mut self,
1592        name: N,
1593        ty: SqlColumnType,
1594    ) -> RelationDescBuilder {
1595        let name = name.into();
1596        self.columns.push((name, ty));
1597        self
1598    }
1599
1600    /// Appends the provided columns to the builder.
1601    pub fn with_columns<I, T, N>(mut self, iter: I) -> Self
1602    where
1603        I: IntoIterator<Item = (N, T)>,
1604        T: Into<SqlColumnType>,
1605        N: Into<ColumnName>,
1606    {
1607        self.columns
1608            .extend(iter.into_iter().map(|(name, ty)| (name.into(), ty.into())));
1609        self
1610    }
1611
1612    /// Adds a new key for the relation.
1613    pub fn with_key(mut self, mut indices: Vec<usize>) -> RelationDescBuilder {
1614        indices.sort_unstable();
1615        if !self.keys.contains(&indices) {
1616            self.keys.push(indices);
1617        }
1618        self
1619    }
1620
1621    /// Removes all previously inserted keys.
1622    pub fn without_keys(mut self) -> RelationDescBuilder {
1623        self.keys.clear();
1624        assert_eq!(self.keys.len(), 0);
1625        self
1626    }
1627
1628    /// Concatenates a [`RelationDescBuilder`] onto the end of this [`RelationDescBuilder`].
1629    pub fn concat(mut self, other: Self) -> Self {
1630        let self_len = self.columns.len();
1631
1632        self.columns.extend(other.columns);
1633        for k in other.keys {
1634            let k = k.into_iter().map(|idx| idx + self_len).collect();
1635            self = self.with_key(k);
1636        }
1637
1638        self
1639    }
1640
1641    /// Finish the builder, returning a [`RelationDesc`].
1642    pub fn finish(self) -> RelationDesc {
1643        let mut desc = RelationDesc::from_names_and_types(self.columns);
1644        desc.typ = desc.typ.with_keys(self.keys);
1645        desc
1646    }
1647}
1648
/// Describes a [`RelationDesc`] at a specific version of a [`VersionedRelationDesc`].
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord, Serialize)]
pub enum RelationVersionSelector {
    /// Selects the relation as of exactly this [`RelationVersion`].
    Specific(RelationVersion),
    /// Selects the most recent version of the relation.
    Latest,
}
1655
1656impl RelationVersionSelector {
1657    pub fn specific(version: u64) -> Self {
1658        RelationVersionSelector::Specific(RelationVersion(version))
1659    }
1660}
1661
/// A wrapper around [`RelationDesc`] that provides an interface for adding
/// columns and generating new versions.
///
/// TODO(parkmycar): Using an immutable data structure for RelationDesc would
/// be great.
#[derive(Debug, Clone, Serialize)]
pub struct VersionedRelationDesc {
    /// The description holding every column ever added, including columns
    /// that have since been dropped.
    inner: RelationDesc,
}
1671
1672impl VersionedRelationDesc {
1673    pub fn new(inner: RelationDesc) -> Self {
1674        VersionedRelationDesc { inner }
1675    }
1676
1677    /// Adds a new column to this [`RelationDesc`], creating a new version of the [`RelationDesc`].
1678    ///
1679    /// # Panics
1680    ///
1681    /// * Panics if a column with `name` already exists that hasn't been dropped.
1682    ///
1683    /// Note: For building a [`RelationDesc`] see [`RelationDescBuilder::with_column`].
1684    #[must_use]
1685    pub fn add_column<N, T>(&mut self, name: N, typ: T) -> RelationVersion
1686    where
1687        N: Into<ColumnName>,
1688        T: Into<SqlColumnType>,
1689    {
1690        let latest_version = self.latest_version();
1691        let new_version = latest_version.bump();
1692
1693        let name = name.into();
1694        let existing = self
1695            .inner
1696            .metadata
1697            .iter()
1698            .find(|(_, meta)| meta.name == name && meta.dropped.is_none());
1699        if let Some(existing) = existing {
1700            panic!("column named '{name}' already exists! {existing:?}");
1701        }
1702
1703        let next_idx = self.inner.metadata.len();
1704        let col_meta = ColumnMetadata {
1705            name,
1706            typ_idx: next_idx,
1707            added: new_version,
1708            dropped: None,
1709        };
1710
1711        self.inner.typ.column_types.push(typ.into());
1712        let prev = self.inner.metadata.insert(ColumnIndex(next_idx), col_meta);
1713
1714        assert_none!(prev, "column index overlap!");
1715        self.validate();
1716
1717        new_version
1718    }
1719
1720    /// Drops the column `name` from this [`RelationDesc`]. If there are multiple columns with
1721    /// `name` drops the left-most one that hasn't already been dropped.
1722    ///
1723    /// TODO(parkmycar): Add handling for dropping a column that is currently used as a key.
1724    ///
1725    /// # Panics
1726    ///
1727    /// Panics if a column with `name` does not exist or the dropped column was used as a key.
1728    #[must_use]
1729    pub fn drop_column<N>(&mut self, name: N) -> RelationVersion
1730    where
1731        N: Into<ColumnName>,
1732    {
1733        let name = name.into();
1734        let latest_version = self.latest_version();
1735        let new_version = latest_version.bump();
1736
1737        let col = self
1738            .inner
1739            .metadata
1740            .values_mut()
1741            .find(|meta| meta.name == name && meta.dropped.is_none())
1742            .expect("column to exist");
1743
1744        // Make sure the column hadn't been previously dropped.
1745        assert_none!(col.dropped, "column was already dropped");
1746        col.dropped = Some(new_version);
1747
1748        // Make sure the column isn't being used as a key.
1749        let dropped_key = self
1750            .inner
1751            .typ
1752            .keys
1753            .iter()
1754            .any(|keys| keys.contains(&col.typ_idx));
1755        assert!(!dropped_key, "column being dropped was used as a key");
1756
1757        self.validate();
1758        new_version
1759    }
1760
1761    /// Returns the [`RelationDesc`] at the latest version.
1762    pub fn latest(&self) -> RelationDesc {
1763        self.inner.clone()
1764    }
1765
1766    /// Returns this [`RelationDesc`] at the specified version.
1767    pub fn at_version(&self, version: RelationVersionSelector) -> RelationDesc {
1768        // Get all of the changes from the start, up to whatever version was requested.
1769        let up_to_version = match version {
1770            RelationVersionSelector::Latest => RelationVersion(u64::MAX),
1771            RelationVersionSelector::Specific(v) => v,
1772        };
1773
1774        let valid_columns = self.inner.metadata.iter().filter(|(_col_idx, meta)| {
1775            let added = meta.added <= up_to_version;
1776            let dropped = meta
1777                .dropped
1778                .map(|dropped_at| up_to_version >= dropped_at)
1779                .unwrap_or(false);
1780
1781            added && !dropped
1782        });
1783
1784        let mut column_types = Vec::new();
1785        let mut column_metas = BTreeMap::new();
1786
1787        // N.B. At this point we need to be careful because col_idx might not
1788        // equal typ_idx.
1789        //
1790        // For example, consider columns "a", "b", and "c" with indexes 0, 1,
1791        // and 2. If we drop column "b" then we'll have "a" and "c" with column
1792        // indexes 0 and 2, but their indices in SqlRelationType will be 0 and 1.
1793        for (col_idx, meta) in valid_columns {
1794            let new_meta = ColumnMetadata {
1795                name: meta.name.clone(),
1796                typ_idx: column_types.len(),
1797                added: meta.added.clone(),
1798                dropped: meta.dropped.clone(),
1799            };
1800            column_types.push(self.inner.typ.columns()[meta.typ_idx].clone());
1801            column_metas.insert(*col_idx, new_meta);
1802        }
1803
1804        // Remap keys in case a column with an index less than that of a key was
1805        // dropped.
1806        //
1807        // For example, consider columns "a", "b", and "c" where "a" and "c" are
1808        // keys and "b" was dropped.
1809        let keys = self
1810            .inner
1811            .typ
1812            .keys
1813            .iter()
1814            .map(|keys| {
1815                keys.iter()
1816                    .map(|key_idx| {
1817                        let metadata = column_metas
1818                            .get(&ColumnIndex(*key_idx))
1819                            .expect("found key for column that doesn't exist");
1820                        metadata.typ_idx
1821                    })
1822                    .collect()
1823            })
1824            .collect();
1825
1826        let relation_type = SqlRelationType { column_types, keys };
1827
1828        RelationDesc {
1829            typ: relation_type,
1830            metadata: column_metas,
1831        }
1832    }
1833
1834    pub fn latest_version(&self) -> RelationVersion {
1835        self.inner
1836            .metadata
1837            .values()
1838            // N.B. Dropped is always greater than added.
1839            .map(|meta| meta.dropped.unwrap_or(meta.added))
1840            .max()
1841            // If there aren't any columns we're implicitly the root version.
1842            .unwrap_or_else(RelationVersion::root)
1843    }
1844
1845    /// Validates internal contraints of the [`RelationDesc`] are correct.
1846    ///
1847    /// # Panics
1848    ///
1849    /// Panics if a constraint is not satisfied.
1850    fn validate(&self) {
1851        fn validate_inner(desc: &RelationDesc) -> Result<(), anyhow::Error> {
1852            if desc.typ.column_types.len() != desc.metadata.len() {
1853                anyhow::bail!("mismatch between number of types and metadatas");
1854            }
1855
1856            for (col_idx, meta) in &desc.metadata {
1857                if col_idx.0 > desc.metadata.len() {
1858                    anyhow::bail!("column index out of bounds");
1859                }
1860                if meta.added >= meta.dropped.unwrap_or(RelationVersion(u64::MAX)) {
1861                    anyhow::bail!("column was added after it was dropped?");
1862                }
1863                if desc.typ().columns().get(meta.typ_idx).is_none() {
1864                    anyhow::bail!("typ_idx incorrect");
1865                }
1866            }
1867
1868            for keys in &desc.typ.keys {
1869                for key in keys {
1870                    if *key >= desc.typ.column_types.len() {
1871                        anyhow::bail!("key index was out of bounds!");
1872                    }
1873                }
1874            }
1875
1876            let versions = desc
1877                .metadata
1878                .values()
1879                .map(|meta| meta.dropped.unwrap_or(meta.added));
1880            let mut max = 0;
1881            let mut sum = 0;
1882            for version in versions {
1883                max = std::cmp::max(max, version.0);
1884                sum += version.0;
1885            }
1886
1887            // Other than RelationVersion(0), we should never have duplicate
1888            // versions and they should always increase by 1. In other words, the
1889            // sum of all RelationVersions should be the sum of [0, max].
1890            //
1891            // N.B. n * (n + 1) / 2 = sum of [0, n]
1892            //
1893            // While I normally don't like tricks like this, it allows us to
1894            // validate that our column versions are correct in O(n) time and
1895            // without allocations.
1896            if sum != (max * (max + 1) / 2) {
1897                anyhow::bail!("there is a duplicate or missing relation version");
1898            }
1899
1900            Ok(())
1901        }
1902
1903        assert_ok!(validate_inner(&self.inner), "validate failed! {self:?}");
1904    }
1905}
1906
/// Diffs that can be generated by proptest and applied to a [`RelationDesc`] to
/// exercise schema migrations.
#[derive(Debug)]
pub enum PropRelationDescDiff {
    /// Appends a new column with the given name and type.
    AddColumn {
        name: ColumnName,
        typ: SqlColumnType,
    },
    /// Marks the column with the given name as dropped, unless it already is.
    DropColumn {
        name: ColumnName,
    },
    /// Flips the `nullable` flag of the column with the given name.
    ToggleNullability {
        name: ColumnName,
    },
    /// Replaces the type of the column with the given name.
    ChangeType {
        name: ColumnName,
        typ: SqlColumnType,
    },
}
1926
1927impl PropRelationDescDiff {
1928    pub fn apply(self, desc: &mut RelationDesc) {
1929        match self {
1930            PropRelationDescDiff::AddColumn { name, typ } => {
1931                let new_idx = desc.metadata.len();
1932                let meta = ColumnMetadata {
1933                    name,
1934                    typ_idx: new_idx,
1935                    added: RelationVersion(0),
1936                    dropped: None,
1937                };
1938                let prev = desc.metadata.insert(ColumnIndex(new_idx), meta);
1939                desc.typ.column_types.push(typ);
1940
1941                assert_none!(prev);
1942                assert_eq!(desc.metadata.len(), desc.typ.column_types.len());
1943            }
1944            PropRelationDescDiff::DropColumn { name } => {
1945                let next_version = desc
1946                    .metadata
1947                    .values()
1948                    .map(|meta| meta.dropped.unwrap_or(meta.added))
1949                    .max()
1950                    .unwrap_or_else(RelationVersion::root)
1951                    .bump();
1952                let Some(metadata) = desc.metadata.values_mut().find(|meta| meta.name == name)
1953                else {
1954                    return;
1955                };
1956                if metadata.dropped.is_none() {
1957                    metadata.dropped = Some(next_version);
1958                }
1959            }
1960            PropRelationDescDiff::ToggleNullability { name } => {
1961                let Some((pos, _)) = desc.get_by_name(&name) else {
1962                    return;
1963                };
1964                let col_type = desc
1965                    .typ
1966                    .column_types
1967                    .get_mut(pos)
1968                    .expect("ColumnNames and SqlColumnTypes out of sync!");
1969                col_type.nullable = !col_type.nullable;
1970            }
1971            PropRelationDescDiff::ChangeType { name, typ } => {
1972                let Some((pos, _)) = desc.get_by_name(&name) else {
1973                    return;
1974                };
1975                let col_type = desc
1976                    .typ
1977                    .column_types
1978                    .get_mut(pos)
1979                    .expect("ColumnNames and SqlColumnTypes out of sync!");
1980                *col_type = typ;
1981            }
1982        }
1983    }
1984}
1985
1986/// Generates a set of [`PropRelationDescDiff`]s based on some source [`RelationDesc`].
1987pub fn arb_relation_desc_diff(
1988    source: &RelationDesc,
1989) -> impl Strategy<Value = Vec<PropRelationDescDiff>> + use<> {
1990    let source = Rc::new(source.clone());
1991    let num_source_columns = source.typ.columns().len();
1992
1993    let num_add_columns = Union::new_weighted(vec![(100, Just(0..8)), (1, Just(8..64))]);
1994    let add_columns_strat = num_add_columns
1995        .prop_flat_map(|num_columns| {
1996            proptest::collection::vec((any::<ColumnName>(), any::<SqlColumnType>()), num_columns)
1997        })
1998        .prop_map(|cols| {
1999            cols.into_iter()
2000                .map(|(name, typ)| PropRelationDescDiff::AddColumn { name, typ })
2001                .collect::<Vec<_>>()
2002        });
2003
2004    // If the source RelationDesc is empty there is nothing else to do.
2005    if num_source_columns == 0 {
2006        return add_columns_strat.boxed();
2007    }
2008
2009    let source_ = Rc::clone(&source);
2010    let drop_columns_strat = (0..num_source_columns).prop_perturb(move |num_columns, mut rng| {
2011        let mut set = BTreeSet::default();
2012        for _ in 0..num_columns {
2013            let col_idx = rng.random_range(0..num_source_columns);
2014            set.insert(source_.get_name(col_idx).clone());
2015        }
2016        set.into_iter()
2017            .map(|name| PropRelationDescDiff::DropColumn { name })
2018            .collect::<Vec<_>>()
2019    });
2020
2021    let source_ = Rc::clone(&source);
2022    let toggle_nullability_strat =
2023        (0..num_source_columns).prop_perturb(move |num_columns, mut rng| {
2024            let mut set = BTreeSet::default();
2025            for _ in 0..num_columns {
2026                let col_idx = rng.random_range(0..num_source_columns);
2027                set.insert(source_.get_name(col_idx).clone());
2028            }
2029            set.into_iter()
2030                .map(|name| PropRelationDescDiff::ToggleNullability { name })
2031                .collect::<Vec<_>>()
2032        });
2033
2034    let source_ = Rc::clone(&source);
2035    let change_type_strat = (0..num_source_columns)
2036        .prop_perturb(move |num_columns, mut rng| {
2037            let mut set = BTreeSet::default();
2038            for _ in 0..num_columns {
2039                let col_idx = rng.random_range(0..num_source_columns);
2040                set.insert(source_.get_name(col_idx).clone());
2041            }
2042            set
2043        })
2044        .prop_flat_map(|cols| {
2045            proptest::collection::vec(any::<SqlColumnType>(), cols.len())
2046                .prop_map(move |types| (cols.clone(), types))
2047        })
2048        .prop_map(|(cols, types)| {
2049            cols.into_iter()
2050                .zip_eq(types)
2051                .map(|(name, typ)| PropRelationDescDiff::ChangeType { name, typ })
2052                .collect::<Vec<_>>()
2053        });
2054
2055    (
2056        add_columns_strat,
2057        drop_columns_strat,
2058        toggle_nullability_strat,
2059        change_type_strat,
2060    )
2061        .prop_map(|(adds, drops, toggles, changes)| {
2062            adds.into_iter()
2063                .chain(drops)
2064                .chain(toggles)
2065                .chain(changes)
2066                .collect::<Vec<_>>()
2067        })
2068        .prop_shuffle()
2069        .boxed()
2070}
2071
#[cfg(test)]
mod tests {
    use super::*;
    use prost::Message;

    /// Walks a description through adding and then dropping a column and
    /// checks that every version, including earlier ones, reads back as
    /// expected.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `pipe2` on OS `linux`
    fn smoktest_at_version() {
        let desc = RelationDesc::builder()
            .with_column("a", SqlScalarType::Bool.nullable(true))
            .with_column("z", SqlScalarType::String.nullable(false))
            .finish();

        let mut versioned_desc = VersionedRelationDesc {
            inner: desc.clone(),
        };
        versioned_desc.validate();

        let latest = versioned_desc.at_version(RelationVersionSelector::Latest);
        assert_eq!(desc, latest);

        let v0 = versioned_desc.at_version(RelationVersionSelector::specific(0));
        assert_eq!(desc, v0);

        // Asking for a version newer than any recorded change yields the
        // current state.
        let v3 = versioned_desc.at_version(RelationVersionSelector::specific(3));
        assert_eq!(desc, v3);

        let v1 = versioned_desc.add_column("b", SqlScalarType::Bytes.nullable(false));
        assert_eq!(v1, RelationVersion(1));

        let v1 = versioned_desc.at_version(RelationVersionSelector::Specific(v1));
        insta::assert_json_snapshot!(v1.metadata, @r###"
        {
          "0": {
            "name": "a",
            "typ_idx": 0,
            "added": 0,
            "dropped": null
          },
          "1": {
            "name": "z",
            "typ_idx": 1,
            "added": 0,
            "dropped": null
          },
          "2": {
            "name": "b",
            "typ_idx": 2,
            "added": 1,
            "dropped": null
          }
        }
        "###);

        // Check that V0 doesn't show the new column.
        let v0_b = versioned_desc.at_version(RelationVersionSelector::specific(0));
        assert!(v0.iter().eq(v0_b.iter()));

        let v2 = versioned_desc.drop_column("z");
        assert_eq!(v2, RelationVersion(2));

        let v2 = versioned_desc.at_version(RelationVersionSelector::Specific(v2));
        insta::assert_json_snapshot!(v2.metadata, @r###"
        {
          "0": {
            "name": "a",
            "typ_idx": 0,
            "added": 0,
            "dropped": null
          },
          "2": {
            "name": "b",
            "typ_idx": 1,
            "added": 1,
            "dropped": null
          }
        }
        "###);

        // Check that V0 and V1 are still correct.
        let v0_c = versioned_desc.at_version(RelationVersionSelector::specific(0));
        assert!(v0.iter().eq(v0_c.iter()));

        let v1_b = versioned_desc.at_version(RelationVersionSelector::specific(1));
        assert!(v1.iter().eq(v1_b.iter()));

        // The inner description keeps the dropped column and records the
        // version at which it was dropped.
        insta::assert_json_snapshot!(versioned_desc.inner.metadata, @r###"
        {
          "0": {
            "name": "a",
            "typ_idx": 0,
            "added": 0,
            "dropped": null
          },
          "1": {
            "name": "z",
            "typ_idx": 1,
            "added": 0,
            "dropped": 2
          },
          "2": {
            "name": "b",
            "typ_idx": 2,
            "added": 1,
            "dropped": null
          }
        }
        "###);
    }

    /// Checks that key indexes are remapped when a column in front of a key
    /// column is dropped.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `pipe2` on OS `linux`
    fn test_dropping_columns_with_keys() {
        let desc = RelationDesc::builder()
            .with_column("a", SqlScalarType::Bool.nullable(true))
            .with_column("z", SqlScalarType::String.nullable(false))
            .with_key(vec![1])
            .finish();

        let mut versioned_desc = VersionedRelationDesc {
            inner: desc.clone(),
        };
        versioned_desc.validate();

        let v1 = versioned_desc.drop_column("a");
        assert_eq!(v1, RelationVersion(1));

        // Make sure the key index for 'z' got remapped since 'a' was dropped.
        let v1 = versioned_desc.at_version(RelationVersionSelector::Specific(v1));
        insta::assert_json_snapshot!(v1, @r###"
        {
          "typ": {
            "column_types": [
              {
                "scalar_type": "String",
                "nullable": false
              }
            ],
            "keys": [
              [
                0
              ]
            ]
          },
          "metadata": {
            "1": {
              "name": "z",
              "typ_idx": 0,
              "added": 0,
              "dropped": null
            }
          }
        }
        "###);

        // Make sure the key index of 'z' is correct when all columns are present.
        let v0 = versioned_desc.at_version(RelationVersionSelector::specific(0));
        insta::assert_json_snapshot!(v0, @r###"
        {
          "typ": {
            "column_types": [
              {
                "scalar_type": "Bool",
                "nullable": true
              },
              {
                "scalar_type": "String",
                "nullable": false
              }
            ],
            "keys": [
              [
                1
              ]
            ]
          },
          "metadata": {
            "0": {
              "name": "a",
              "typ_idx": 0,
              "added": 0,
              "dropped": 1
            },
            "1": {
              "name": "z",
              "typ_idx": 1,
              "added": 0,
              "dropped": null
            }
          }
        }
        "###);
    }

    /// Checks that a proto description that only carries column names, and no
    /// column metadata, still converts into a [`RelationDesc`] with metadata
    /// for every column.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `pipe2` on OS `linux`
    fn roundtrip_relation_desc_without_metadata() {
        let typ = ProtoRelationType {
            column_types: vec![
                SqlScalarType::String.nullable(false).into_proto(),
                SqlScalarType::Bool.nullable(true).into_proto(),
            ],
            keys: vec![],
        };
        let proto = ProtoRelationDesc {
            typ: Some(typ),
            names: vec![
                ColumnName("a".into()).into_proto(),
                ColumnName("b".into()).into_proto(),
            ],
            metadata: vec![],
        };
        let desc: RelationDesc = proto.into_rust().unwrap();

        insta::assert_json_snapshot!(desc, @r###"
        {
          "typ": {
            "column_types": [
              {
                "scalar_type": "String",
                "nullable": false
              },
              {
                "scalar_type": "Bool",
                "nullable": true
              }
            ],
            "keys": []
          },
          "metadata": {
            "0": {
              "name": "a",
              "typ_idx": 0,
              "added": 0,
              "dropped": null
            },
            "1": {
              "name": "b",
              "typ_idx": 1,
              "added": 0,
              "dropped": null
            }
          }
        }
        "###);
    }

    /// Adding a column whose name is already in use must panic.
    #[mz_ore::test]
    #[should_panic(expected = "column named 'a' already exists!")]
    fn test_add_column_with_same_name_panics() {
        let desc = RelationDesc::builder()
            .with_column("a", SqlScalarType::Bool.nullable(true))
            .finish();
        let mut versioned = VersionedRelationDesc::new(desc);

        let _ = versioned.add_column("a", SqlScalarType::String.nullable(false));
    }

    /// A column name can be reused once the column that previously held it
    /// has been dropped.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `pipe2` on OS `linux`
    fn test_add_column_with_same_name_prev_dropped() {
        let desc = RelationDesc::builder()
            .with_column("a", SqlScalarType::Bool.nullable(true))
            .finish();
        let mut versioned = VersionedRelationDesc::new(desc);

        let v1 = versioned.drop_column("a");
        let v1 = versioned.at_version(RelationVersionSelector::Specific(v1));
        insta::assert_json_snapshot!(v1, @r###"
        {
          "typ": {
            "column_types": [],
            "keys": []
          },
          "metadata": {}
        }
        "###);

        let v2 = versioned.add_column("a", SqlScalarType::String.nullable(false));
        let v2 = versioned.at_version(RelationVersionSelector::Specific(v2));
        insta::assert_json_snapshot!(v2, @r###"
        {
          "typ": {
            "column_types": [
              {
                "scalar_type": "String",
                "nullable": false
              }
            ],
            "keys": []
          },
          "metadata": {
            "1": {
              "name": "a",
              "typ_idx": 0,
              "added": 2,
              "dropped": null
            }
          }
        }
        "###);
    }

    /// Checks that applying a demand of two out of three columns leaves a
    /// description with two columns that still passes validation.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn apply_demand() {
        let desc = RelationDesc::builder()
            .with_column("a", SqlScalarType::String.nullable(true))
            .with_column("b", SqlScalarType::Int64.nullable(false))
            .with_column("c", SqlScalarType::Time.nullable(false))
            .finish();
        let desc = desc.apply_demand(&BTreeSet::from([0, 2]));
        assert_eq!(desc.arity(), 2);
        // TODO(parkmycar): Move validate onto RelationDesc.
        VersionedRelationDesc::new(desc).validate();
    }

    /// Pins the stable name that is derived from a [`ColumnIndex`].
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn smoketest_column_index_stable_ident() {
        let idx_a = ColumnIndex(42);
        // Note(parkmycar): This should never change.
        assert_eq!(idx_a.to_stable_name(), "42");
    }

    /// Checks that arbitrary descriptions, with and without generated diffs
    /// applied to them, survive a roundtrip through their protobuf encoding.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // too slow
    fn proptest_relation_desc_roundtrips() {
        // Encodes `og` to protobuf bytes, decodes it again, and expects to get
        // back an equal description.
        fn testcase(og: RelationDesc) {
            let bytes = og.into_proto().encode_to_vec();
            let proto = ProtoRelationDesc::decode(&bytes[..]).unwrap();
            let rnd = RelationDesc::from_proto(proto).unwrap();

            assert_eq!(og, rnd);
        }

        proptest!(|(desc in any::<RelationDesc>())| {
            testcase(desc);
        });

        // Pair every generated description with a list of diffs that was
        // generated for it.
        let strat = any::<RelationDesc>().prop_flat_map(|desc| {
            arb_relation_desc_diff(&desc).prop_map(move |diffs| (desc.clone(), diffs))
        });

        proptest!(|((mut desc, diffs) in strat)| {
            for diff in diffs {
                diff.apply(&mut desc);
            };
            testcase(desc);
        });
    }
}