Skip to main content

mz_repr/
relation.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::{BTreeMap, BTreeSet};
11use std::rc::Rc;
12use std::{fmt, vec};
13
14use anyhow::bail;
15use itertools::Itertools;
16use mz_lowertest::MzReflect;
17use mz_ore::cast::CastFrom;
18use mz_ore::soft_panic_or_log;
19use mz_ore::str::StrExt;
20use mz_ore::{assert_none, assert_ok};
21use mz_persist_types::schema::SchemaId;
22use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
23#[cfg(any(test, feature = "proptest"))]
24use proptest::prelude::*;
25#[cfg(any(test, feature = "proptest"))]
26use proptest::strategy::{Strategy, Union};
27#[cfg(any(test, feature = "proptest"))]
28use proptest_derive::Arbitrary;
29use serde::{Deserialize, Serialize};
30
31#[cfg(any(test, feature = "proptest"))]
32use crate::arb_datum_for_column;
33use crate::relation_and_scalar::proto_relation_type::ProtoKey;
34pub use crate::relation_and_scalar::{
35    ProtoColumnMetadata, ProtoColumnName, ProtoColumnType, ProtoRelationDesc, ProtoRelationType,
36    ProtoRelationVersion,
37};
38use crate::{Datum, ReprScalarType, Row, SqlScalarType};
39
/// The type of a [`Datum`].
///
/// [`SqlColumnType`] bundles information about the scalar type of a datum (e.g.,
/// Int32 or String) with its nullability.
///
/// To construct a column type, either initialize the struct directly, or
/// use the [`SqlScalarType::nullable`] method.
///
/// When deserializing with serde, an absent `nullable` field is filled in by
/// `return_true`, i.e. the column is treated as nullable.
#[derive(
    Clone,
    Debug,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Serialize,
    Deserialize,
    Hash,
    MzReflect
)]
#[cfg_attr(any(test, feature = "proptest"), derive(Arbitrary))]
pub struct SqlColumnType {
    /// The underlying scalar type (e.g., Int32 or String) of this column.
    pub scalar_type: SqlScalarType,
    /// Whether this datum can be null.
    ///
    /// Defaults to `true` when the field is missing from serialized input.
    #[serde(default = "return_true")]
    pub nullable: bool,
}
67
/// Returns `true`; serves as the `#[serde(default = "...")]` provider for the
/// `nullable` field of both [`SqlColumnType`] and [`ReprColumnType`].
///
/// This function exists for the purpose of making column types nullable by
/// default when the field is omitted (e.g., in unit tests). The default value
/// of a bool is false, and the only way to make an object take on any other
/// value by default is to pass serde a function that returns the desired
/// default value. See
/// <https://github.com/serde-rs/serde/issues/1030>
#[inline(always)]
fn return_true() -> bool {
    true
}
77
78impl SqlColumnType {
79    /// Compute the least upper bound of many column types, returning an error on
80    /// incompatible types or an empty iterator.
81    /// See [`SqlColumnType::try_union`] for details.
82    pub fn try_union_many<'a>(
83        typs: impl IntoIterator<Item = &'a Self>,
84    ) -> Result<Self, anyhow::Error> {
85        let mut iter = typs.into_iter();
86        let Some(typ) = iter.next() else {
87            bail!("Cannot union empty iterator");
88        };
89        iter.try_fold(typ.clone(), |a, b| a.try_union(b))
90    }
91
92    /// Compute the least upper bound of many column types.
93    /// See [`SqlColumnType::try_union`] for details.
94    ///
95    /// Panics on incompatible types or an empty iterator.
96    pub fn union_many<'a>(typs: impl IntoIterator<Item = &'a Self>) -> Self {
97        Self::try_union_many(typs).expect("Cannot union empty iterator")
98    }
99
100    /// Backports nullability information from `backport_typ` into `self`,
101    /// affecting the outer `.nullable` field but also record fields deeper
102    /// into the type.
103    pub fn backport_nullability(&mut self, backport_typ: &ReprColumnType) {
104        self.scalar_type
105            .backport_nullability(&backport_typ.scalar_type);
106        self.nullable = backport_typ.nullable;
107    }
108
109    /// Compute the least upper bound of two column types at the SQL level.
110    ///
111    /// Two types are compatible when they are equal, share the same base type
112    /// (differing only in modifiers), or are records with pairwise-compatible
113    /// fields.
114    /// The resulting nullability is the disjunction of the two input
115    /// nullabilities.
116    ///
117    /// Returns an error for incompatible types, e.g. `Text` and `Int32`, or
118    /// `Text` and `VarChar` (different base types at the SQL level).
119    /// See [`SqlColumnType::try_union`] for a fallback that handles the latter
120    /// case via repr-level union.
121    pub fn sql_union(&self, other: &Self) -> Result<Self, anyhow::Error> {
122        match (&self.scalar_type, &other.scalar_type) {
123            (scalar_type, other_scalar_type) if scalar_type == other_scalar_type => {
124                Ok(SqlColumnType {
125                    scalar_type: scalar_type.clone(),
126                    nullable: self.nullable || other.nullable,
127                })
128            }
129            (scalar_type, other_scalar_type) if scalar_type.base_eq(other_scalar_type) => {
130                Ok(SqlColumnType {
131                    scalar_type: scalar_type.without_modifiers(),
132                    nullable: self.nullable || other.nullable,
133                })
134            }
135            (
136                SqlScalarType::Record { fields, custom_id },
137                SqlScalarType::Record {
138                    fields: other_fields,
139                    custom_id: other_custom_id,
140                },
141            ) => {
142                if custom_id != other_custom_id {
143                    bail!(
144                        "Can't union types: {:?} and {:?}",
145                        self.scalar_type,
146                        other.scalar_type
147                    );
148                };
149
150                if fields.len() != other_fields.len() {
151                    bail!(
152                        "Can't union types: {:?} and {:?}",
153                        self.scalar_type,
154                        other.scalar_type
155                    );
156                }
157                let mut union_fields = Vec::with_capacity(fields.len());
158                for ((name, typ), (other_name, other_typ)) in
159                    fields.iter().zip_eq(other_fields.iter())
160                {
161                    if name != other_name {
162                        bail!(
163                            "Can't union types: {:?} and {:?}",
164                            self.scalar_type,
165                            other.scalar_type
166                        );
167                    } else {
168                        let union_column_type = typ.sql_union(other_typ)?;
169                        union_fields.push((name.clone(), union_column_type));
170                    };
171                }
172
173                Ok(SqlColumnType {
174                    scalar_type: SqlScalarType::Record {
175                        fields: union_fields.into(),
176                        custom_id: *custom_id,
177                    },
178                    nullable: self.nullable || other.nullable,
179                })
180            }
181            _ => bail!(
182                "Can't union types: {:?} and {:?}",
183                self.scalar_type,
184                other.scalar_type
185            ),
186        }
187    }
188
189    /// Compute the least upper bound of two column types.
190    ///
191    /// Attempts [`SqlColumnType::sql_union`] first, which preserves SQL-level type
192    /// information (e.g. modifiers). Falls back to a repr-level union via
193    /// [`ReprColumnType::union`] when the SQL types are incompatible but the
194    /// underlying repr types are compatible.
195    ///
196    /// The resulting nullability is the disjunction of the two input
197    /// nullabilities.
198    pub fn try_union(&self, other: &Self) -> Result<Self, anyhow::Error> {
199        self.sql_union(other).or_else(|e| {
200            let repr_self = ReprColumnType::from(self);
201            let repr_other = ReprColumnType::from(other);
202            match repr_self.union(&repr_other) {
203                Ok(typ) => {
204                    // sql_union failed but repr union succeeded — this indicates
205                    // a repr-type canonicalization gap that we want CI visibility for.
206                    soft_panic_or_log!("repr type error: sql_union({self:?}, {other:?}): {e}");
207                    Ok(SqlColumnType::from_repr(&typ))
208                }
209                Err(_) => {
210                    // Both sql_union and repr union failed — genuine type mismatch,
211                    // not a canonicalization issue. Just propagate the original error.
212                    Err(e)
213                }
214            }
215        })
216    }
217
218    /// Compute the least upper bound of two column types.
219    /// See [`SqlColumnType::try_union`] for details.
220    ///
221    /// Panics on incompatible types.
222    pub fn union(&self, other: &Self) -> Self {
223        self.try_union(other).unwrap_or_else(|e| {
224            panic!("repr type error: after sql_union({self:?}, {other:?}) error: {e}")
225        })
226    }
227
228    /// Consumes this `SqlColumnType` and returns a new `SqlColumnType` with its
229    /// nullability set to the specified boolean.
230    pub fn nullable(mut self, nullable: bool) -> Self {
231        self.nullable = nullable;
232        self
233    }
234}
235
236impl RustType<ProtoColumnType> for SqlColumnType {
237    fn into_proto(&self) -> ProtoColumnType {
238        ProtoColumnType {
239            nullable: self.nullable,
240            scalar_type: Some(self.scalar_type.into_proto()),
241        }
242    }
243
244    fn from_proto(proto: ProtoColumnType) -> Result<Self, TryFromProtoError> {
245        Ok(SqlColumnType {
246            nullable: proto.nullable,
247            scalar_type: proto
248                .scalar_type
249                .into_rust_if_some("ProtoColumnType::scalar_type")?,
250        })
251    }
252}
253
254impl fmt::Display for SqlColumnType {
255    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
256        let nullable = if self.nullable { "Null" } else { "NotNull" };
257        f.write_fmt(format_args!("{:?}:{}", self.scalar_type, nullable))
258    }
259}
260
/// The type of a relation: the SQL-level type of each column plus the sets of
/// columns known to be keys.
#[derive(
    Clone,
    Debug,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Serialize,
    Deserialize,
    Hash,
    MzReflect
)]
#[cfg_attr(any(test, feature = "proptest"), derive(Arbitrary))]
pub struct SqlRelationType {
    /// The type for each column, in order.
    pub column_types: Vec<SqlColumnType>,
    /// Sets of indices that are "keys" for the collection.
    ///
    /// Each element in this list is a set of column indices, each with the
    /// property that the collection contains at most one record with each
    /// distinct set of values for each column. Alternately, for a specific set
    /// of values assigned to these columns there is at most one record.
    ///
    /// A collection can contain multiple sets of keys, although it is common to
    /// have either zero or one sets of key indices.
    ///
    /// Defaults to an empty list when absent from serialized input.
    #[serde(default)]
    pub keys: Vec<Vec<usize>>,
}
290
291impl SqlRelationType {
292    /// Constructs a `SqlRelationType` representing the relation with no columns and
293    /// no keys.
294    pub fn empty() -> Self {
295        SqlRelationType::new(vec![])
296    }
297
298    /// Constructs a new `SqlRelationType` from specified column types.
299    ///
300    /// The `SqlRelationType` will have no keys.
301    pub fn new(column_types: Vec<SqlColumnType>) -> Self {
302        SqlRelationType {
303            column_types,
304            keys: Vec::new(),
305        }
306    }
307
308    /// Adds a new key for the relation.
309    pub fn with_key(mut self, mut indices: Vec<usize>) -> Self {
310        indices.sort_unstable();
311        if !self.keys.contains(&indices) {
312            self.keys.push(indices);
313        }
314        self
315    }
316
317    pub fn with_keys(mut self, keys: Vec<Vec<usize>>) -> Self {
318        for key in keys {
319            self = self.with_key(key)
320        }
321        self
322    }
323
324    /// Computes the number of columns in the relation.
325    pub fn arity(&self) -> usize {
326        self.column_types.len()
327    }
328
329    /// Gets the index of the columns used when creating a default index.
330    pub fn default_key(&self) -> Vec<usize> {
331        if let Some(key) = self.keys.first() {
332            if key.is_empty() {
333                (0..self.column_types.len()).collect()
334            } else {
335                key.clone()
336            }
337        } else {
338            (0..self.column_types.len()).collect()
339        }
340    }
341
342    /// Returns all the [`SqlColumnType`]s, in order, for this relation.
343    pub fn columns(&self) -> &[SqlColumnType] {
344        &self.column_types
345    }
346
347    /// Adopts the nullability and keys from another `SqlRelationType`.
348    ///
349    /// Panics if the number of columns does not match.
350    pub fn backport_nullability_and_keys(&mut self, backport_typ: &ReprRelationType) {
351        assert_eq!(
352            backport_typ.column_types.len(),
353            self.column_types.len(),
354            "HIR and MIR types should have the same number of columns"
355        );
356        for (backport_col, sql_col) in backport_typ
357            .column_types
358            .iter()
359            .zip_eq(self.column_types.iter_mut())
360        {
361            sql_col.backport_nullability(backport_col);
362        }
363
364        self.keys = backport_typ.keys.clone();
365    }
366
367    /// Constructs a `SqlRelationType` from a `ReprRelationType` by converting
368    /// each column type via [`SqlColumnType::from_repr`]. This is a lossy
369    /// inverse of `ReprRelationType::from(&SqlRelationType)`.
370    pub fn from_repr(repr: &ReprRelationType) -> Self {
371        SqlRelationType {
372            column_types: repr
373                .column_types
374                .iter()
375                .map(SqlColumnType::from_repr)
376                .collect(),
377            keys: repr.keys.clone(),
378        }
379    }
380}
381
382impl RustType<ProtoRelationType> for SqlRelationType {
383    fn into_proto(&self) -> ProtoRelationType {
384        ProtoRelationType {
385            column_types: self.column_types.into_proto(),
386            keys: self.keys.into_proto(),
387        }
388    }
389
390    fn from_proto(proto: ProtoRelationType) -> Result<Self, TryFromProtoError> {
391        Ok(SqlRelationType {
392            column_types: proto.column_types.into_rust()?,
393            keys: proto.keys.into_rust()?,
394        })
395    }
396}
397
398impl RustType<ProtoKey> for Vec<usize> {
399    fn into_proto(&self) -> ProtoKey {
400        ProtoKey {
401            keys: self.into_proto(),
402        }
403    }
404
405    fn from_proto(proto: ProtoKey) -> Result<Self, TryFromProtoError> {
406        proto.keys.into_rust()
407    }
408}
409
/// The type of a relation, expressed with representation-level column types
/// ([`ReprColumnType`]) rather than SQL-level ones.
///
/// A `ReprRelationType` can be derived from a [`SqlRelationType`] via `From`.
#[derive(
    Clone,
    Debug,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Serialize,
    Deserialize,
    Hash,
    MzReflect
)]
pub struct ReprRelationType {
    /// The type for each column, in order.
    pub column_types: Vec<ReprColumnType>,
    /// Sets of indices that are "keys" for the collection.
    ///
    /// Each element in this list is a set of column indices, each with the
    /// property that the collection contains at most one record with each
    /// distinct set of values for each column. Alternately, for a specific set
    /// of values assigned to these columns there is at most one record.
    ///
    /// A collection can contain multiple sets of keys, although it is common to
    /// have either zero or one sets of key indices.
    ///
    /// Defaults to an empty list when absent from serialized input.
    #[serde(default)]
    pub keys: Vec<Vec<usize>>,
}
438
439impl ReprRelationType {
440    /// Constructs a `ReprRelationType` representing the relation with no columns and
441    /// no keys.
442    pub fn empty() -> Self {
443        ReprRelationType::new(vec![])
444    }
445
446    /// Constructs a new `ReprRelationType` from specified column types.
447    ///
448    /// The `ReprRelationType` will have no keys.
449    pub fn new(column_types: Vec<ReprColumnType>) -> Self {
450        ReprRelationType {
451            column_types,
452            keys: Vec::new(),
453        }
454    }
455
456    /// Adds a new key for the relation.
457    pub fn with_key(mut self, mut indices: Vec<usize>) -> Self {
458        indices.sort_unstable();
459        if !self.keys.contains(&indices) {
460            self.keys.push(indices);
461        }
462        self
463    }
464
465    pub fn with_keys(mut self, keys: Vec<Vec<usize>>) -> Self {
466        for key in keys {
467            self = self.with_key(key)
468        }
469        self
470    }
471
472    /// Computes the number of columns in the relation.
473    pub fn arity(&self) -> usize {
474        self.column_types.len()
475    }
476
477    /// Gets the index of the columns used when creating a default index.
478    pub fn default_key(&self) -> Vec<usize> {
479        if let Some(key) = self.keys.first() {
480            if key.is_empty() {
481                (0..self.column_types.len()).collect()
482            } else {
483                key.clone()
484            }
485        } else {
486            (0..self.column_types.len()).collect()
487        }
488    }
489
490    /// Returns all the column types in order, for this relation.
491    pub fn columns(&self) -> &[ReprColumnType] {
492        &self.column_types
493    }
494}
495
496impl From<&SqlRelationType> for ReprRelationType {
497    fn from(sql_relation_type: &SqlRelationType) -> Self {
498        ReprRelationType {
499            column_types: sql_relation_type
500                .column_types
501                .iter()
502                .map(ReprColumnType::from)
503                .collect(),
504            keys: sql_relation_type.keys.clone(),
505        }
506    }
507}
508
/// The representation-level type of a column: a [`ReprScalarType`] together
/// with its nullability.
///
/// This is the repr-level counterpart of [`SqlColumnType`]; the two can be
/// converted into each other with `ReprColumnType::from` and
/// [`SqlColumnType::from_repr`] (the latter being lossy).
#[derive(
    Clone,
    Debug,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Serialize,
    Deserialize,
    Hash,
    MzReflect
)]
pub struct ReprColumnType {
    /// The underlying representation scalar type (e.g., Int32 or String) of this column.
    pub scalar_type: ReprScalarType,
    /// Whether this datum can be null.
    ///
    /// Defaults to `true` when the field is missing from serialized input.
    #[serde(default = "return_true")]
    pub nullable: bool,
}
528
529impl std::fmt::Display for ReprColumnType {
530    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
531        write!(f, "{}", self.scalar_type)?;
532        if self.nullable {
533            write!(f, "?")?;
534        }
535        Ok(())
536    }
537}
538
539impl ReprColumnType {
540    /// Compute the least upper bound of two column types at the repr level.
541    ///
542    /// More permissive than [`SqlColumnType::sql_union`] because it operates
543    /// on the underlying representation types, ignoring SQL-level distinctions
544    /// such as modifiers.
545    /// The resulting nullability is the disjunction of the two inputs.
546    pub fn union(&self, col: &ReprColumnType) -> Result<Self, anyhow::Error> {
547        let scalar_type = self.scalar_type.union(&col.scalar_type)?;
548        let nullable = self.nullable || col.nullable;
549
550        Ok(ReprColumnType {
551            scalar_type,
552            nullable,
553        })
554    }
555}
556
557impl From<&SqlColumnType> for ReprColumnType {
558    fn from(sql_column_type: &SqlColumnType) -> Self {
559        let scalar_type = &sql_column_type.scalar_type;
560        let scalar_type = scalar_type.into();
561        let nullable = sql_column_type.nullable;
562
563        ReprColumnType {
564            scalar_type,
565            nullable,
566        }
567    }
568}
569
570impl SqlColumnType {
571    /// Lossily translates a [`ReprColumnType`] back to a [`SqlColumnType`].
572    ///
573    /// See [`SqlScalarType::from_repr`] for an example of lossiness.
574    pub fn from_repr(repr: &ReprColumnType) -> Self {
575        let scalar_type = &repr.scalar_type;
576        let scalar_type = SqlScalarType::from_repr(scalar_type);
577        let nullable = repr.nullable;
578
579        SqlColumnType {
580            scalar_type,
581            nullable,
582        }
583    }
584}
585
/// The name of a column in a [`RelationDesc`].
///
/// A thin wrapper around a `Box<str>`. It dereferences to `str`, and can be
/// built from a `String`, a `&str`, or a `&ColumnName`.
#[derive(
    Clone,
    Debug,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Serialize,
    Deserialize,
    Hash,
    MzReflect
)]
pub struct ColumnName(Box<str>);
600
601impl ColumnName {
602    /// Returns this column name as a `str`.
603    #[inline(always)]
604    pub fn as_str(&self) -> &str {
605        &*self
606    }
607
608    /// Returns this column name as a `&mut Box<str>`.
609    pub fn as_mut_boxed_str(&mut self) -> &mut Box<str> {
610        &mut self.0
611    }
612
613    /// Returns if this [`ColumnName`] is similar to the provided one.
614    pub fn is_similar(&self, other: &ColumnName) -> bool {
615        const SIMILARITY_THRESHOLD: f64 = 0.6;
616
617        let a_lowercase = self.to_lowercase();
618        let b_lowercase = other.to_lowercase();
619
620        strsim::normalized_levenshtein(&a_lowercase, &b_lowercase) >= SIMILARITY_THRESHOLD
621    }
622}
623
624impl std::ops::Deref for ColumnName {
625    type Target = str;
626
627    #[inline(always)]
628    fn deref(&self) -> &Self::Target {
629        &self.0
630    }
631}
632
633impl fmt::Display for ColumnName {
634    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
635        f.write_str(&self.0)
636    }
637}
638
639impl From<String> for ColumnName {
640    fn from(s: String) -> ColumnName {
641        ColumnName(s.into())
642    }
643}
644
645impl From<&str> for ColumnName {
646    fn from(s: &str) -> ColumnName {
647        ColumnName(s.into())
648    }
649}
650
651impl From<&ColumnName> for ColumnName {
652    fn from(n: &ColumnName) -> ColumnName {
653        n.clone()
654    }
655}
656
657impl RustType<ProtoColumnName> for ColumnName {
658    fn into_proto(&self) -> ProtoColumnName {
659        ProtoColumnName {
660            value: Some(self.0.to_string()),
661        }
662    }
663
664    fn from_proto(proto: ProtoColumnName) -> Result<Self, TryFromProtoError> {
665        Ok(ColumnName(
666            proto
667                .value
668                .ok_or_else(|| TryFromProtoError::missing_field("ProtoColumnName::value"))?
669                .into(),
670        ))
671    }
672}
673
674impl From<ColumnName> for mz_sql_parser::ast::Ident {
675    fn from(value: ColumnName) -> Self {
676        // Note: ColumnNames are known to be less than the max length of an Ident (I think?).
677        mz_sql_parser::ast::Ident::new_unchecked(value.0)
678    }
679}
680
/// Generates arbitrary [`ColumnName`]s for property tests.
///
/// Names are biased towards short lengths and towards characters between
/// `'A'` and `'z'`. Generated values are never shrunk.
#[cfg(any(test, feature = "proptest"))]
impl proptest::arbitrary::Arbitrary for ColumnName {
    type Parameters = ();
    type Strategy = BoxedStrategy<ColumnName>;

    fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy {
        // Long column names are generally uninteresting, and can greatly
        // increase the runtime for a test case, so bound the max length.
        // Each entry is a (weight, length range) pair.
        let mut weights = vec![(50, Just(1..8)), (20, Just(8..16))];
        // Longer names are only generated when the `PROPTEST_LARGE_DATA`
        // environment variable is set (to any value).
        if std::env::var("PROPTEST_LARGE_DATA").is_ok() {
            weights.extend([
                (5, Just(16..128)),
                (1, Just(128..1024)),
                (1, Just(1024..4096)),
            ]);
        }
        let name_length = Union::new_weighted(weights);

        // Non-ASCII characters are also generally uninteresting and can make
        // debugging harder, so arbitrary `char`s get a weight of 1 against 50
        // for the 'A'..'z' range.
        // NOTE(review): in ASCII the span from 'A' to 'z' also contains the
        // punctuation between 'Z' and 'a' — confirm that is intended.
        let char_strat = Rc::new(Union::new_weighted(vec![
            (50, proptest::char::range('A', 'z').boxed()),
            (1, any::<char>().boxed()),
        ]));

        // Pick a length range first, then build a vector of characters with a
        // length drawn from it, and finally collect them into the name.
        name_length
            .prop_flat_map(move |length| proptest::collection::vec(Rc::clone(&char_strat), length))
            .prop_map(|chars| ColumnName(chars.into_iter().collect::<Box<str>>()))
            .no_shrink()
            .boxed()
    }
}
713
/// Default name of a column (when no other information is known).
///
/// The value is the literal string `?column?`.
pub const UNKNOWN_COLUMN_NAME: &str = "?column?";
716
/// Stable index of a column in a [`RelationDesc`].
///
/// A newtype around `usize`; convert with [`ColumnIndex::from_raw`] and
/// [`ColumnIndex::to_raw`].
#[derive(
    Clone,
    Copy,
    Debug,
    Eq,
    PartialEq,
    PartialOrd,
    Ord,
    Serialize,
    Deserialize,
    Hash,
    MzReflect
)]
pub struct ColumnIndex(usize);

// Compile-time check that `ColumnIndex` does not implement `Arbitrary`.
// NOTE(review): presumably because arbitrary indices would not correspond to
// real columns of a `RelationDesc` — confirm the intent.
#[cfg(any(test, feature = "proptest"))]
static_assertions::assert_not_impl_all!(ColumnIndex: Arbitrary);
735
736impl ColumnIndex {
737    /// Returns a stable identifier for this [`ColumnIndex`].
738    pub fn to_stable_name(&self) -> String {
739        self.0.to_string()
740    }
741
742    pub fn to_raw(&self) -> usize {
743        self.0
744    }
745
746    pub fn from_raw(val: usize) -> Self {
747        ColumnIndex(val)
748    }
749}
750
/// The version a given column was added at.
///
/// A newtype around `u64`. Versions start at [`RelationVersion::root`]
/// (zero) and are advanced one at a time with [`RelationVersion::bump`].
#[derive(
    Clone,
    Copy,
    Debug,
    Eq,
    PartialEq,
    PartialOrd,
    Ord,
    Serialize,
    Deserialize,
    Hash,
    MzReflect
)]
#[cfg_attr(any(test, feature = "proptest"), derive(Arbitrary))]
pub struct RelationVersion(u64);
767
768impl RelationVersion {
769    /// Returns the "root" or "initial" version of a [`RelationDesc`].
770    pub fn root() -> Self {
771        RelationVersion(0)
772    }
773
774    /// Returns an instance of [`RelationVersion`] which is "one" higher than `self`.
775    pub fn bump(&self) -> Self {
776        let next_version = self
777            .0
778            .checked_add(1)
779            .expect("added more than u64::MAX columns?");
780        RelationVersion(next_version)
781    }
782
783    /// Consume a [`RelationVersion`] returning the raw value.
784    ///
785    /// Should __only__ be used for serialization.
786    pub fn into_raw(self) -> u64 {
787        self.0
788    }
789
790    /// Create a [`RelationVersion`] from a raw value.
791    ///
792    /// Should __only__ be used for serialization.
793    pub fn from_raw(val: u64) -> RelationVersion {
794        RelationVersion(val)
795    }
796}
797
798impl From<RelationVersion> for SchemaId {
799    fn from(value: RelationVersion) -> Self {
800        SchemaId(usize::cast_from(value.0))
801    }
802}
803
804impl From<mz_sql_parser::ast::Version> for RelationVersion {
805    fn from(value: mz_sql_parser::ast::Version) -> Self {
806        RelationVersion(value.into_inner())
807    }
808}
809
810impl fmt::Display for RelationVersion {
811    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
812        write!(f, "v{}", self.0)
813    }
814}
815
816impl From<RelationVersion> for mz_sql_parser::ast::Version {
817    fn from(value: RelationVersion) -> Self {
818        mz_sql_parser::ast::Version::new(value.0)
819    }
820}
821
822impl RustType<ProtoRelationVersion> for RelationVersion {
823    fn into_proto(&self) -> ProtoRelationVersion {
824        ProtoRelationVersion { value: self.0 }
825    }
826
827    fn from_proto(proto: ProtoRelationVersion) -> Result<Self, TryFromProtoError> {
828        Ok(RelationVersion(proto.value))
829    }
830}
831
/// Semantic type annotation for a column in a builtin catalog relation.
///
/// These are compile-time metadata used by the catalog ontology layer to
/// describe the meaning of a column (e.g., that it contains a catalog item ID
/// or a role ID). Possible values correspond to the entries in
/// `SEMANTIC_TYPE_DEFS` in the `mz-catalog` crate.
///
/// The type can be serialized but does not derive `Deserialize`. Its
/// `Display` form is the variant name.
#[derive(
    Clone,
    Copy,
    Debug,
    PartialEq,
    Eq,
    PartialOrd,
    Ord,
    Hash,
    serde::Serialize
)]
pub enum SemanticType {
    CatalogItemId,
    GlobalId,
    ClusterId,
    ReplicaId,
    SchemaId,
    DatabaseId,
    RoleId,
    NetworkPolicyId,
    ShardId,
    OID,
    ObjectType,
    ConnectionType,
    SourceType,
    MzTimestamp,
    WallclockTimestamp,
    ByteCount,
    RecordCount,
    CreditRate,
    SqlDefinition,
    RedactedSqlDefinition,
}
871
872impl fmt::Display for SemanticType {
873    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
874        let s = match self {
875            SemanticType::CatalogItemId => "CatalogItemId",
876            SemanticType::GlobalId => "GlobalId",
877            SemanticType::ClusterId => "ClusterId",
878            SemanticType::ReplicaId => "ReplicaId",
879            SemanticType::SchemaId => "SchemaId",
880            SemanticType::DatabaseId => "DatabaseId",
881            SemanticType::RoleId => "RoleId",
882            SemanticType::NetworkPolicyId => "NetworkPolicyId",
883            SemanticType::ShardId => "ShardId",
884            SemanticType::OID => "OID",
885            SemanticType::ObjectType => "ObjectType",
886            SemanticType::ConnectionType => "ConnectionType",
887            SemanticType::SourceType => "SourceType",
888            SemanticType::MzTimestamp => "MzTimestamp",
889            SemanticType::WallclockTimestamp => "WallclockTimestamp",
890            SemanticType::ByteCount => "ByteCount",
891            SemanticType::RecordCount => "RecordCount",
892            SemanticType::CreditRate => "CreditRate",
893            SemanticType::SqlDefinition => "SqlDefinition",
894            SemanticType::RedactedSqlDefinition => "RedactedSqlDefinition",
895        };
896        f.write_str(s)
897    }
898}
899
/// Metadata (other than type) for a column in a [`RelationDesc`].
///
/// A [`RelationDesc`] keeps one of these per [`ColumnIndex`].
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash, MzReflect)]
struct ColumnMetadata {
    /// Name of the column.
    name: ColumnName,
    /// Index into a [`SqlRelationType`] for this column.
    typ_idx: usize,
    /// Version this column was added at.
    added: RelationVersion,
    /// Version this column was dropped at.
    ///
    /// Presumably `None` while the column has not been dropped — confirm
    /// against the code that sets this field.
    dropped: Option<RelationVersion>,
}
912
/// A description of the shape of a relation.
///
/// It bundles a [`SqlRelationType`] with `ColumnMetadata` for each column in
/// the relation.
///
/// # Examples
///
/// A `RelationDesc` is typically constructed via its builder API:
///
/// ```
/// use mz_repr::{SqlColumnType, RelationDesc, SqlScalarType};
///
/// let desc = RelationDesc::builder()
///     .with_column("id", SqlScalarType::Int64.nullable(false))
///     .with_column("price", SqlScalarType::Float64.nullable(true))
///     .finish();
/// ```
///
/// In more complicated cases, like when constructing a `RelationDesc` in
/// response to user input, it may be more convenient to construct a relation
/// type first, and imbue it with column names to form a `RelationDesc` later:
///
/// ```
/// use mz_repr::RelationDesc;
///
/// # fn plan_query(_: &str) -> mz_repr::SqlRelationType { mz_repr::SqlRelationType::new(vec![]) }
/// let relation_type = plan_query("SELECT * FROM table");
/// let names = (0..relation_type.arity()).map(|i| match i {
///     0 => "first",
///     1 => "second",
///     _ => "unknown",
/// });
/// let desc = RelationDesc::new(relation_type, names);
/// ```
///
/// Next to the [`SqlRelationType`] we maintain a map of `ColumnIndex` to
/// `ColumnMetadata`, where [`ColumnIndex`] is a stable identifier for a
/// column throughout the lifetime of the relation. This allows a
/// [`RelationDesc`] to represent a projection over a version of itself.
///
/// ```
/// use std::collections::BTreeSet;
/// use mz_repr::{ColumnIndex, RelationDesc, SqlScalarType};
///
/// let desc = RelationDesc::builder()
///     .with_column("name", SqlScalarType::String.nullable(false))
///     .with_column("email", SqlScalarType::String.nullable(false))
///     .finish();
///
/// // Demand only the second column (index 1), projecting away the first.
/// let demands = BTreeSet::from([1]);
/// let proj = desc.apply_demand(&demands);
///
/// // We projected away the first column.
/// assert!(!proj.contains_index(&ColumnIndex::from_raw(0)));
/// // But retained the second.
/// assert!(proj.contains_index(&ColumnIndex::from_raw(1)));
///
/// // The underlying `SqlRelationType` also contains a single column.
/// assert_eq!(proj.typ().arity(), 1);
/// ```
///
/// To maintain this stable mapping and track the lifetime of a column (e.g.
/// when adding or dropping a column) we use `ColumnMetadata`. It maintains
/// the index in [`SqlRelationType`] that corresponds to a given column, and the
/// version at which this column was added or dropped.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, MzReflect)]
pub struct RelationDesc {
    /// The column types and keys of the relation.
    typ: SqlRelationType,
    /// Per-column metadata, keyed by the stable [`ColumnIndex`] of the column.
    metadata: BTreeMap<ColumnIndex, ColumnMetadata>,
}
985
986impl RustType<ProtoRelationDesc> for RelationDesc {
987    fn into_proto(&self) -> ProtoRelationDesc {
988        let (names, metadata): (Vec<_>, Vec<_>) = self
989            .metadata
990            .values()
991            .map(|meta| {
992                let metadata = ProtoColumnMetadata {
993                    added: Some(meta.added.into_proto()),
994                    dropped: meta.dropped.map(|v| v.into_proto()),
995                };
996                (meta.name.into_proto(), metadata)
997            })
998            .unzip();
999
1000        // `metadata` Migration Logic: We wrote some `ProtoRelationDesc`s into Persist before the
1001        // metadata field was added. To make sure our serialization roundtrips the same as before
1002        // we added the field, we omit `metadata` if all of the values are equal to the default.
1003        //
1004        // Note: This logic needs to exist approximately forever.
1005        let is_all_default_metadata = metadata.iter().all(|meta| {
1006            meta.added == Some(RelationVersion::root().into_proto()) && meta.dropped == None
1007        });
1008        let metadata = if is_all_default_metadata {
1009            Vec::new()
1010        } else {
1011            metadata
1012        };
1013
1014        ProtoRelationDesc {
1015            typ: Some(self.typ.into_proto()),
1016            names,
1017            metadata,
1018        }
1019    }
1020
1021    fn from_proto(proto: ProtoRelationDesc) -> Result<Self, TryFromProtoError> {
1022        // `metadata` Migration Logic: We wrote some `ProtoRelationDesc`s into Persist before the
1023        // metadata field was added. If the field doesn't exist we fill it in with default values,
1024        // and when converting into_proto we omit these fields so the serialized bytes roundtrip.
1025        //
1026        // Note: This logic needs to exist approximately forever.
1027        let proto_metadata: Box<dyn Iterator<Item = _>> = if proto.metadata.is_empty() {
1028            let val = ProtoColumnMetadata {
1029                added: Some(RelationVersion::root().into_proto()),
1030                dropped: None,
1031            };
1032            Box::new(itertools::repeat_n(val, proto.names.len()))
1033        } else {
1034            Box::new(proto.metadata.into_iter())
1035        };
1036
1037        let metadata = proto
1038            .names
1039            .into_iter()
1040            .zip_eq(proto_metadata)
1041            .enumerate()
1042            .map(|(idx, (name, metadata))| {
1043                let meta = ColumnMetadata {
1044                    name: name.into_rust()?,
1045                    typ_idx: idx,
1046                    added: metadata.added.into_rust_if_some("ColumnMetadata::added")?,
1047                    dropped: metadata.dropped.into_rust()?,
1048                };
1049                Ok::<_, TryFromProtoError>((ColumnIndex(idx), meta))
1050            })
1051            .collect::<Result<_, _>>()?;
1052
1053        Ok(RelationDesc {
1054            typ: proto.typ.into_rust_if_some("ProtoRelationDesc::typ")?,
1055            metadata,
1056        })
1057    }
1058}
1059
1060impl RelationDesc {
1061    /// Returns a [`RelationDescBuilder`] that can be used to construct a [`RelationDesc`].
1062    pub fn builder() -> RelationDescBuilder {
1063        RelationDescBuilder::default()
1064    }
1065
1066    /// Constructs a new `RelationDesc` that represents the empty relation
1067    /// with no columns and no keys.
1068    pub fn empty() -> Self {
1069        RelationDesc {
1070            typ: SqlRelationType::empty(),
1071            metadata: BTreeMap::default(),
1072        }
1073    }
1074
1075    /// Check if the `RelationDesc` is empty.
1076    pub fn is_empty(&self) -> bool {
1077        self == &Self::empty()
1078    }
1079
1080    /// Returns the number of columns in this [`RelationDesc`].
1081    pub fn len(&self) -> usize {
1082        self.typ().column_types.len()
1083    }
1084
1085    /// Constructs a new `RelationDesc` from a `SqlRelationType` and an iterator
1086    /// over column names.
1087    ///
1088    /// # Panics
1089    ///
1090    /// Panics if the arity of the `SqlRelationType` is not equal to the number of
1091    /// items in `names`.
1092    pub fn new<I, N>(typ: SqlRelationType, names: I) -> Self
1093    where
1094        I: IntoIterator<Item = N>,
1095        N: Into<ColumnName>,
1096    {
1097        let metadata: BTreeMap<_, _> = names
1098            .into_iter()
1099            .enumerate()
1100            .map(|(idx, name)| {
1101                let col_idx = ColumnIndex(idx);
1102                let metadata = ColumnMetadata {
1103                    name: name.into(),
1104                    typ_idx: idx,
1105                    added: RelationVersion::root(),
1106                    dropped: None,
1107                };
1108                (col_idx, metadata)
1109            })
1110            .collect();
1111
1112        // TODO(parkmycar): Add better validation here.
1113        assert_eq!(typ.column_types.len(), metadata.len());
1114
1115        RelationDesc { typ, metadata }
1116    }
1117
1118    pub fn from_names_and_types<I, T, N>(iter: I) -> Self
1119    where
1120        I: IntoIterator<Item = (N, T)>,
1121        T: Into<SqlColumnType>,
1122        N: Into<ColumnName>,
1123    {
1124        let (names, types): (Vec<_>, Vec<_>) = iter.into_iter().unzip();
1125        let types = types.into_iter().map(Into::into).collect();
1126        let typ = SqlRelationType::new(types);
1127        Self::new(typ, names)
1128    }
1129
1130    /// Concatenates a `RelationDesc` onto the end of this `RelationDesc`.
1131    ///
1132    /// # Panics
1133    ///
1134    /// Panics if either `self` or `other` have columns that were added at a
1135    /// [`RelationVersion`] other than [`RelationVersion::root`] or if any
1136    /// columns were dropped.
1137    ///
1138    /// TODO(parkmycar): Move this method to [`RelationDescBuilder`].
1139    pub fn concat(mut self, other: Self) -> Self {
1140        let self_len = self.typ.column_types.len();
1141
1142        for (typ, (_col_idx, meta)) in other.typ.column_types.into_iter().zip_eq(other.metadata) {
1143            assert_eq!(meta.added, RelationVersion::root());
1144            assert_none!(meta.dropped);
1145
1146            let new_idx = self.typ.columns().len();
1147            let new_meta = ColumnMetadata {
1148                name: meta.name,
1149                typ_idx: new_idx,
1150                added: RelationVersion::root(),
1151                dropped: None,
1152            };
1153
1154            self.typ.column_types.push(typ);
1155            let prev = self.metadata.insert(ColumnIndex(new_idx), new_meta);
1156
1157            assert_eq!(self.metadata.len(), self.typ.columns().len());
1158            assert_none!(prev);
1159        }
1160
1161        for k in other.typ.keys {
1162            let k = k.into_iter().map(|idx| idx + self_len).collect();
1163            self = self.with_key(k);
1164        }
1165        self
1166    }
1167
1168    /// Adds a new key for the relation.
1169    pub fn with_key(mut self, indices: Vec<usize>) -> Self {
1170        self.typ = self.typ.with_key(indices);
1171        self
1172    }
1173
1174    /// Drops all existing keys.
1175    pub fn without_keys(mut self) -> Self {
1176        self.typ.keys.clear();
1177        self
1178    }
1179
1180    /// Builds a new relation description with the column names replaced with
1181    /// new names.
1182    ///
1183    /// # Panics
1184    ///
1185    /// Panics if the arity of the relation type does not match the number of
1186    /// items in `names`.
1187    pub fn with_names<I, N>(self, names: I) -> Self
1188    where
1189        I: IntoIterator<Item = N>,
1190        N: Into<ColumnName>,
1191    {
1192        Self::new(self.typ, names)
1193    }
1194
1195    /// Computes the number of columns in the relation.
1196    pub fn arity(&self) -> usize {
1197        self.typ.arity()
1198    }
1199
1200    /// Returns the relation type underlying this relation description.
1201    pub fn typ(&self) -> &SqlRelationType {
1202        &self.typ
1203    }
1204
1205    /// Returns the owned relation type underlying this relation description.
1206    pub fn into_typ(self) -> SqlRelationType {
1207        self.typ
1208    }
1209
1210    /// Returns an iterator over the columns in this relation.
1211    pub fn iter(&self) -> impl Iterator<Item = (&ColumnName, &SqlColumnType)> {
1212        self.metadata.values().map(|meta| {
1213            let typ = &self.typ.columns()[meta.typ_idx];
1214            (&meta.name, typ)
1215        })
1216    }
1217
1218    /// Returns an iterator over the types of the columns in this relation.
1219    pub fn iter_types(&self) -> impl Iterator<Item = &SqlColumnType> {
1220        self.typ.column_types.iter()
1221    }
1222
1223    /// Returns an iterator over the names of the columns in this relation.
1224    pub fn iter_names(&self) -> impl Iterator<Item = &ColumnName> {
1225        self.metadata.values().map(|meta| &meta.name)
1226    }
1227
1228    /// Returns an iterator over the columns in this relation, with all their metadata.
1229    pub fn iter_all(&self) -> impl Iterator<Item = (&ColumnIndex, &ColumnName, &SqlColumnType)> {
1230        self.metadata.iter().map(|(col_idx, metadata)| {
1231            let col_typ = &self.typ.columns()[metadata.typ_idx];
1232            (col_idx, &metadata.name, col_typ)
1233        })
1234    }
1235
1236    /// Returns an iterator over the names of the columns in this relation that are "similar" to
1237    /// the provided `name`.
1238    pub fn iter_similar_names<'a>(
1239        &'a self,
1240        name: &'a ColumnName,
1241    ) -> impl Iterator<Item = &'a ColumnName> {
1242        self.iter_names().filter(|n| n.is_similar(name))
1243    }
1244
1245    /// Returns whether this [`RelationDesc`] contains a column at the specified index.
1246    pub fn contains_index(&self, idx: &ColumnIndex) -> bool {
1247        self.metadata.contains_key(idx)
1248    }
1249
1250    /// Finds a column by name.
1251    ///
1252    /// Returns the index and type of the column named `name`. If no column with
1253    /// the specified name exists, returns `None`. If multiple columns have the
1254    /// specified name, the leftmost column is returned.
1255    pub fn get_by_name(&self, name: &ColumnName) -> Option<(usize, &SqlColumnType)> {
1256        self.iter_names()
1257            .position(|n| n == name)
1258            .map(|i| (i, &self.typ.column_types[i]))
1259    }
1260
1261    /// Gets the name of the `i`th column.
1262    ///
1263    /// # Panics
1264    ///
1265    /// Panics if `i` is not a valid column index.
1266    ///
1267    /// TODO(parkmycar): Migrate all uses of this to [`RelationDesc::get_name_idx`].
1268    pub fn get_name(&self, i: usize) -> &ColumnName {
1269        // TODO(parkmycar): Refactor this to use `ColumnIndex`.
1270        self.get_name_idx(&ColumnIndex(i))
1271    }
1272
1273    /// Gets the name of the column at `idx`.
1274    ///
1275    /// # Panics
1276    ///
1277    /// Panics if no column exists at `idx`.
1278    pub fn get_name_idx(&self, idx: &ColumnIndex) -> &ColumnName {
1279        &self.metadata.get(idx).expect("should exist").name
1280    }
1281
1282    /// Mutably gets the name of the `i`th column.
1283    ///
1284    /// # Panics
1285    ///
1286    /// Panics if `i` is not a valid column index.
1287    pub fn get_name_mut(&mut self, i: usize) -> &mut ColumnName {
1288        // TODO(parkmycar): Refactor this to use `ColumnIndex`.
1289        &mut self
1290            .metadata
1291            .get_mut(&ColumnIndex(i))
1292            .expect("should exist")
1293            .name
1294    }
1295
1296    /// Gets the [`SqlColumnType`] of the column at `idx`.
1297    ///
1298    /// # Panics
1299    ///
1300    /// Panics if no column exists at `idx`.
1301    pub fn get_type(&self, idx: &ColumnIndex) -> &SqlColumnType {
1302        let typ_idx = self.metadata.get(idx).expect("should exist").typ_idx;
1303        &self.typ.column_types[typ_idx]
1304    }
1305
1306    /// Gets the name of the `i`th column if that column name is unambiguous.
1307    ///
1308    /// If at least one other column has the same name as the `i`th column,
1309    /// returns `None`. If the `i`th column has no name, returns `None`.
1310    ///
1311    /// # Panics
1312    ///
1313    /// Panics if `i` is not a valid column index.
1314    pub fn get_unambiguous_name(&self, i: usize) -> Option<&ColumnName> {
1315        let name = self.get_name(i);
1316        if self.iter_names().filter(|n| *n == name).count() == 1 {
1317            Some(name)
1318        } else {
1319            None
1320        }
1321    }
1322
1323    /// Verifies that `d` meets all of the constraints for the `i`th column of `self`.
1324    ///
1325    /// n.b. The only constraint MZ currently supports in NOT NULL, but this
1326    /// structure will be simple to extend.
1327    pub fn constraints_met(&self, i: usize, d: &Datum) -> Result<(), NotNullViolation> {
1328        let name = self.get_name(i);
1329        let typ = &self.typ.column_types[i];
1330        if d == &Datum::Null && !typ.nullable {
1331            Err(NotNullViolation(name.clone()))
1332        } else {
1333            Ok(())
1334        }
1335    }
1336
1337    /// Computes the differences between two [`RelationDesc`]s.
1338    ///
1339    /// Returns a rich diff describing which columns differ, and in what way.
1340    ///
1341    /// # Panics
1342    ///
1343    /// Panics if either `self` or `other` have columns that were added at a
1344    /// [`RelationVersion`] other than [`RelationVersion::root`] or if any
1345    /// columns were dropped.
1346    ///
1347    /// This simplifies things by allowing us to assume that `ColumnIndex`es are
1348    /// dense and that they match the indexes of `typ.columns()`. Without this
1349    /// we would, e.g., struggle comparing keys as those are in terms of
1350    /// `typ.columns()` indexes.
1351    pub fn diff(&self, other: &RelationDesc) -> RelationDescDiff {
1352        assert_eq!(self.metadata.len(), self.typ.columns().len());
1353        assert_eq!(other.metadata.len(), other.typ.columns().len());
1354        for (idx, meta) in self.metadata.iter().chain(other.metadata.iter()) {
1355            assert_eq!(meta.typ_idx, idx.0);
1356            assert_eq!(meta.added, RelationVersion::root());
1357            assert_none!(meta.dropped);
1358        }
1359
1360        let mut column_diffs = BTreeMap::new();
1361        let mut key_diff = None;
1362
1363        let left_arity = self.arity();
1364        let right_arity = other.arity();
1365        let common_arity = std::cmp::min(left_arity, right_arity);
1366
1367        for idx in 0..common_arity {
1368            let left_name = self.get_name(idx);
1369            let right_name = other.get_name(idx);
1370            let left_type = &self.typ.column_types[idx];
1371            let right_type = &other.typ.column_types[idx];
1372
1373            if left_name != right_name {
1374                let diff = ColumnDiff::NameMismatch {
1375                    left: left_name.clone(),
1376                    right: right_name.clone(),
1377                };
1378                column_diffs.insert(idx, diff);
1379            } else if left_type.scalar_type != right_type.scalar_type {
1380                let diff = ColumnDiff::TypeMismatch {
1381                    name: left_name.clone(),
1382                    left: left_type.scalar_type.clone(),
1383                    right: right_type.scalar_type.clone(),
1384                };
1385                column_diffs.insert(idx, diff);
1386            } else if left_type.nullable != right_type.nullable {
1387                let diff = ColumnDiff::NullabilityMismatch {
1388                    name: left_name.clone(),
1389                    left: left_type.nullable,
1390                    right: right_type.nullable,
1391                };
1392                column_diffs.insert(idx, diff);
1393            }
1394        }
1395
1396        for idx in common_arity..left_arity {
1397            let diff = ColumnDiff::Missing {
1398                name: self.get_name(idx).clone(),
1399            };
1400            column_diffs.insert(idx, diff);
1401        }
1402
1403        for idx in common_arity..right_arity {
1404            let diff = ColumnDiff::Extra {
1405                name: other.get_name(idx).clone(),
1406            };
1407            column_diffs.insert(idx, diff);
1408        }
1409
1410        let left_keys: BTreeSet<_> = self.typ.keys.iter().collect();
1411        let right_keys: BTreeSet<_> = other.typ.keys.iter().collect();
1412        if left_keys != right_keys {
1413            let column_names = |desc: &RelationDesc, keys: BTreeSet<&Vec<usize>>| {
1414                keys.iter()
1415                    .map(|key| key.iter().map(|&idx| desc.get_name(idx).clone()).collect())
1416                    .collect()
1417            };
1418            key_diff = Some(KeyDiff {
1419                left: column_names(self, left_keys),
1420                right: column_names(other, right_keys),
1421            });
1422        }
1423
1424        RelationDescDiff {
1425            column_diffs,
1426            key_diff,
1427        }
1428    }
1429
1430    /// Creates a new [`RelationDesc`] retaining only the columns specified in `demands`.
1431    pub fn apply_demand(&self, demands: &BTreeSet<usize>) -> RelationDesc {
1432        let mut new_desc = self.clone();
1433
1434        // Update ColumnMetadata.
1435        let mut removed = 0;
1436        new_desc.metadata.retain(|idx, metadata| {
1437            let retain = demands.contains(&idx.0);
1438            if !retain {
1439                removed += 1;
1440            } else {
1441                metadata.typ_idx -= removed;
1442            }
1443            retain
1444        });
1445
1446        // Update SqlColumnType.
1447        let mut idx = 0;
1448        new_desc.typ.column_types.retain(|_| {
1449            let keep = demands.contains(&idx);
1450            idx += 1;
1451            keep
1452        });
1453
1454        new_desc
1455    }
1456}
1457
#[cfg(any(test, feature = "proptest"))]
impl Arbitrary for RelationDesc {
    type Parameters = ();
    type Strategy = BoxedStrategy<RelationDesc>;

    /// Generates arbitrary [`RelationDesc`]s with a weighted choice of column
    /// count ranges, favoring small relations.
    ///
    /// Larger ranges are only considered when the `PROPTEST_LARGE_DATA`
    /// environment variable is set.
    fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy {
        let mut weights = vec![(100, Just(0..4)), (50, Just(4..8)), (25, Just(8..16))];

        let large_data = std::env::var("PROPTEST_LARGE_DATA").is_ok();
        if large_data {
            let large_ranges = [
                (12, Just(16..32)),
                (6, Just(32..64)),
                (3, Just(64..128)),
                (1, Just(128..256)),
            ];
            for weighted_range in large_ranges {
                weights.push(weighted_range);
            }
        }

        Union::new_weighted(weights)
            .prop_flat_map(arb_relation_desc)
            .boxed()
    }
}
1478
/// Returns a [`Strategy`] that generates an arbitrary [`RelationDesc`] with a number of
/// columns within the range provided.
///
/// Columns are generated as a map from name to type, so the generated column names are
/// distinct.
#[cfg(any(test, feature = "proptest"))]
pub fn arb_relation_desc(num_cols: std::ops::Range<usize>) -> impl Strategy<Value = RelationDesc> {
    let columns =
        proptest::collection::btree_map(any::<ColumnName>(), any::<SqlColumnType>(), num_cols);
    columns.prop_map(|columns| RelationDesc::from_names_and_types(columns))
}
1486
/// Returns a [`Strategy`] that generates a projection of the provided [`RelationDesc`].
///
/// One arbitrary boolean is drawn per column; the positions drawn as `true` form the
/// demand set passed to [`RelationDesc::apply_demand`].
#[cfg(any(test, feature = "proptest"))]
pub fn arb_relation_desc_projection(desc: RelationDesc) -> impl Strategy<Value = RelationDesc> {
    let keep_flags: Vec<_> = std::iter::repeat_with(any::<bool>)
        .take(desc.len())
        .collect();
    keep_flags.prop_map(move |flags| {
        let mut demands = BTreeSet::new();
        for (idx, keep) in flags.into_iter().enumerate() {
            if keep {
                demands.insert(idx);
            }
        }
        desc.apply_demand(&demands)
    })
}
1500
1501impl IntoIterator for RelationDesc {
1502    type Item = (ColumnName, SqlColumnType);
1503    type IntoIter = Box<dyn Iterator<Item = (ColumnName, SqlColumnType)>>;
1504
1505    fn into_iter(self) -> Self::IntoIter {
1506        let iter = self
1507            .metadata
1508            .into_values()
1509            .zip_eq(self.typ.column_types)
1510            .map(|(meta, typ)| (meta.name, typ));
1511        Box::new(iter)
1512    }
1513}
1514
/// Returns a [`Strategy`] that yields arbitrary [`Row`]s for the provided [`RelationDesc`].
///
/// One datum strategy is created per column type of the relation; the generated datums are
/// packed, in column order, into a [`Row`].
#[cfg(any(test, feature = "proptest"))]
pub fn arb_row_for_relation(desc: &RelationDesc) -> impl Strategy<Value = Row> + use<> {
    let column_strategies: Vec<_> = desc
        .typ()
        .columns()
        .iter()
        .map(|column| arb_datum_for_column(column.clone()))
        .collect();
    column_strategies.prop_map(|values| Row::pack(values.iter().map(Datum::from)))
}
1527
1528/// Expression violated not-null constraint on named column
1529#[derive(Debug, PartialEq, Eq)]
1530pub struct NotNullViolation(pub ColumnName);
1531
1532impl fmt::Display for NotNullViolation {
1533    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1534        write!(
1535            f,
1536            "null value in column {} violates not-null constraint",
1537            self.0.quoted()
1538        )
1539    }
1540}
1541
1542/// The result of comparing two [`RelationDesc`]s.
1543#[derive(Debug, Clone, PartialEq, Eq)]
1544pub struct RelationDescDiff {
1545    /// Column differences, keyed by column index.
1546    pub column_diffs: BTreeMap<usize, ColumnDiff>,
1547    /// Key differences, if any.
1548    pub key_diff: Option<KeyDiff>,
1549}
1550
1551impl RelationDescDiff {
1552    /// Returns whether the diff contains any differences.
1553    pub fn is_empty(&self) -> bool {
1554        self.column_diffs.is_empty() && self.key_diff.is_none()
1555    }
1556}
1557
/// A difference in a column between two [`RelationDesc`]s.
///
/// "Left" refers to the description `diff` was called on, "right" to the one
/// passed as its argument.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ColumnDiff {
    /// Column exists only in the left relation.
    Missing { name: ColumnName },
    /// Column exists only in the right relation.
    Extra { name: ColumnName },
    /// Columns have the same name but different scalar types.
    TypeMismatch {
        name: ColumnName,
        left: SqlScalarType,
        right: SqlScalarType,
    },
    /// Columns have the same name and scalar type but different nullability.
    NullabilityMismatch {
        name: ColumnName,
        left: bool,
        right: bool,
    },
    /// Columns have different names.
    NameMismatch { left: ColumnName, right: ColumnName },
}
1580
/// A difference in the keys of two [`RelationDesc`]s.
///
/// Each key is given as the list of names of the columns it consists of.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct KeyDiff {
    /// Keys of the left relation.
    pub left: BTreeSet<Vec<ColumnName>>,
    /// Keys of the right relation.
    pub right: BTreeSet<Vec<ColumnName>>,
}
1589
/// A builder for a [`RelationDesc`].
///
/// Columns and keys are accumulated here and turned into a [`RelationDesc`]
/// by [`RelationDescBuilder::finish`].
#[derive(Clone, Default, Debug, PartialEq, Eq)]
pub struct RelationDescBuilder {
    /// Columns of the relation, in the order they were added.
    columns: Vec<(ColumnName, SqlColumnType)>,
    /// Sets of indices that are "keys" for the collection.
    keys: Vec<Vec<usize>>,
}
1598
1599impl RelationDescBuilder {
1600    /// Appends a column with the specified name and type.
1601    pub fn with_column<N: Into<ColumnName>>(
1602        mut self,
1603        name: N,
1604        ty: SqlColumnType,
1605    ) -> RelationDescBuilder {
1606        let name = name.into();
1607        self.columns.push((name, ty));
1608        self
1609    }
1610
1611    /// Appends the provided columns to the builder.
1612    pub fn with_columns<I, T, N>(mut self, iter: I) -> Self
1613    where
1614        I: IntoIterator<Item = (N, T)>,
1615        T: Into<SqlColumnType>,
1616        N: Into<ColumnName>,
1617    {
1618        self.columns
1619            .extend(iter.into_iter().map(|(name, ty)| (name.into(), ty.into())));
1620        self
1621    }
1622
1623    /// Adds a new key for the relation.
1624    pub fn with_key(mut self, mut indices: Vec<usize>) -> RelationDescBuilder {
1625        indices.sort_unstable();
1626        if !self.keys.contains(&indices) {
1627            self.keys.push(indices);
1628        }
1629        self
1630    }
1631
1632    /// Removes all previously inserted keys.
1633    pub fn without_keys(mut self) -> RelationDescBuilder {
1634        self.keys.clear();
1635        assert_eq!(self.keys.len(), 0);
1636        self
1637    }
1638
1639    /// Concatenates a [`RelationDescBuilder`] onto the end of this [`RelationDescBuilder`].
1640    pub fn concat(mut self, other: Self) -> Self {
1641        let self_len = self.columns.len();
1642
1643        self.columns.extend(other.columns);
1644        for k in other.keys {
1645            let k = k.into_iter().map(|idx| idx + self_len).collect();
1646            self = self.with_key(k);
1647        }
1648
1649        self
1650    }
1651
1652    /// Finish the builder, returning a [`RelationDesc`].
1653    pub fn finish(self) -> RelationDesc {
1654        let mut desc = RelationDesc::from_names_and_types(self.columns);
1655        desc.typ = desc.typ.with_keys(self.keys);
1656        desc
1657    }
1658}
1659
1660/// Describes a [`RelationDesc`] at a specific version of a [`VersionedRelationDesc`].
1661#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord, Serialize)]
1662pub enum RelationVersionSelector {
1663    Specific(RelationVersion),
1664    Latest,
1665}
1666
1667impl RelationVersionSelector {
1668    pub fn specific(version: u64) -> Self {
1669        RelationVersionSelector::Specific(RelationVersion(version))
1670    }
1671}
1672
/// A wrapper around [`RelationDesc`] that provides an interface for adding
/// columns and generating new versions.
///
/// TODO(parkmycar): Using an immutable data structure for RelationDesc would
/// be great.
#[derive(Debug, Clone, Serialize)]
pub struct VersionedRelationDesc {
    /// The wrapped description. Dropped columns stay in here and are only
    /// marked with the version at which they were dropped.
    inner: RelationDesc,
}
1682
1683impl VersionedRelationDesc {
1684    pub fn new(inner: RelationDesc) -> Self {
1685        VersionedRelationDesc { inner }
1686    }
1687
1688    /// Adds a new column to this [`RelationDesc`], creating a new version of the [`RelationDesc`].
1689    ///
1690    /// # Panics
1691    ///
1692    /// * Panics if a column with `name` already exists that hasn't been dropped.
1693    ///
1694    /// Note: For building a [`RelationDesc`] see [`RelationDescBuilder::with_column`].
1695    #[must_use]
1696    pub fn add_column<N, T>(&mut self, name: N, typ: T) -> RelationVersion
1697    where
1698        N: Into<ColumnName>,
1699        T: Into<SqlColumnType>,
1700    {
1701        let latest_version = self.latest_version();
1702        let new_version = latest_version.bump();
1703
1704        let name = name.into();
1705        let existing = self
1706            .inner
1707            .metadata
1708            .iter()
1709            .find(|(_, meta)| meta.name == name && meta.dropped.is_none());
1710        if let Some(existing) = existing {
1711            panic!("column named '{name}' already exists! {existing:?}");
1712        }
1713
1714        let next_idx = self.inner.metadata.len();
1715        let col_meta = ColumnMetadata {
1716            name,
1717            typ_idx: next_idx,
1718            added: new_version,
1719            dropped: None,
1720        };
1721
1722        self.inner.typ.column_types.push(typ.into());
1723        let prev = self.inner.metadata.insert(ColumnIndex(next_idx), col_meta);
1724
1725        assert_none!(prev, "column index overlap!");
1726        self.validate();
1727
1728        new_version
1729    }
1730
1731    /// Drops the column `name` from this [`RelationDesc`]. If there are multiple columns with
1732    /// `name` drops the left-most one that hasn't already been dropped.
1733    ///
1734    /// TODO(parkmycar): Add handling for dropping a column that is currently used as a key.
1735    ///
1736    /// # Panics
1737    ///
1738    /// Panics if a column with `name` does not exist or the dropped column was used as a key.
1739    #[must_use]
1740    pub fn drop_column<N>(&mut self, name: N) -> RelationVersion
1741    where
1742        N: Into<ColumnName>,
1743    {
1744        let name = name.into();
1745        let latest_version = self.latest_version();
1746        let new_version = latest_version.bump();
1747
1748        let col = self
1749            .inner
1750            .metadata
1751            .values_mut()
1752            .find(|meta| meta.name == name && meta.dropped.is_none())
1753            .expect("column to exist");
1754
1755        // Make sure the column hadn't been previously dropped.
1756        assert_none!(col.dropped, "column was already dropped");
1757        col.dropped = Some(new_version);
1758
1759        // Make sure the column isn't being used as a key.
1760        let dropped_key = self
1761            .inner
1762            .typ
1763            .keys
1764            .iter()
1765            .any(|keys| keys.contains(&col.typ_idx));
1766        assert!(!dropped_key, "column being dropped was used as a key");
1767
1768        self.validate();
1769        new_version
1770    }
1771
1772    /// Returns the [`RelationDesc`] at the latest version.
1773    pub fn latest(&self) -> RelationDesc {
1774        self.inner.clone()
1775    }
1776
1777    /// Returns this [`RelationDesc`] at the specified version.
1778    pub fn at_version(&self, version: RelationVersionSelector) -> RelationDesc {
1779        // Get all of the changes from the start, up to whatever version was requested.
1780        let up_to_version = match version {
1781            RelationVersionSelector::Latest => RelationVersion(u64::MAX),
1782            RelationVersionSelector::Specific(v) => v,
1783        };
1784
1785        let valid_columns = self.inner.metadata.iter().filter(|(_col_idx, meta)| {
1786            let added = meta.added <= up_to_version;
1787            let dropped = meta
1788                .dropped
1789                .map(|dropped_at| up_to_version >= dropped_at)
1790                .unwrap_or(false);
1791
1792            added && !dropped
1793        });
1794
1795        let mut column_types = Vec::new();
1796        let mut column_metas = BTreeMap::new();
1797
1798        // N.B. At this point we need to be careful because col_idx might not
1799        // equal typ_idx.
1800        //
1801        // For example, consider columns "a", "b", and "c" with indexes 0, 1,
1802        // and 2. If we drop column "b" then we'll have "a" and "c" with column
1803        // indexes 0 and 2, but their indices in SqlRelationType will be 0 and 1.
1804        for (col_idx, meta) in valid_columns {
1805            let new_meta = ColumnMetadata {
1806                name: meta.name.clone(),
1807                typ_idx: column_types.len(),
1808                added: meta.added.clone(),
1809                dropped: meta.dropped.clone(),
1810            };
1811            column_types.push(self.inner.typ.columns()[meta.typ_idx].clone());
1812            column_metas.insert(*col_idx, new_meta);
1813        }
1814
1815        // Remap keys in case a column with an index less than that of a key was
1816        // dropped.
1817        //
1818        // For example, consider columns "a", "b", and "c" where "a" and "c" are
1819        // keys and "b" was dropped.
1820        let keys = self
1821            .inner
1822            .typ
1823            .keys
1824            .iter()
1825            .map(|keys| {
1826                keys.iter()
1827                    .map(|key_idx| {
1828                        let metadata = column_metas
1829                            .get(&ColumnIndex(*key_idx))
1830                            .expect("found key for column that doesn't exist");
1831                        metadata.typ_idx
1832                    })
1833                    .collect()
1834            })
1835            .collect();
1836
1837        let relation_type = SqlRelationType { column_types, keys };
1838
1839        RelationDesc {
1840            typ: relation_type,
1841            metadata: column_metas,
1842        }
1843    }
1844
1845    pub fn latest_version(&self) -> RelationVersion {
1846        self.inner
1847            .metadata
1848            .values()
1849            // N.B. Dropped is always greater than added.
1850            .map(|meta| meta.dropped.unwrap_or(meta.added))
1851            .max()
1852            // If there aren't any columns we're implicitly the root version.
1853            .unwrap_or_else(RelationVersion::root)
1854    }
1855
1856    /// Validates internal contraints of the [`RelationDesc`] are correct.
1857    ///
1858    /// # Panics
1859    ///
1860    /// Panics if a constraint is not satisfied.
1861    fn validate(&self) {
1862        fn validate_inner(desc: &RelationDesc) -> Result<(), anyhow::Error> {
1863            if desc.typ.column_types.len() != desc.metadata.len() {
1864                anyhow::bail!("mismatch between number of types and metadatas");
1865            }
1866
1867            for (col_idx, meta) in &desc.metadata {
1868                if col_idx.0 > desc.metadata.len() {
1869                    anyhow::bail!("column index out of bounds");
1870                }
1871                if meta.added >= meta.dropped.unwrap_or(RelationVersion(u64::MAX)) {
1872                    anyhow::bail!("column was added after it was dropped?");
1873                }
1874                if desc.typ().columns().get(meta.typ_idx).is_none() {
1875                    anyhow::bail!("typ_idx incorrect");
1876                }
1877            }
1878
1879            for keys in &desc.typ.keys {
1880                for key in keys {
1881                    if *key >= desc.typ.column_types.len() {
1882                        anyhow::bail!("key index was out of bounds!");
1883                    }
1884                }
1885            }
1886
1887            let versions = desc
1888                .metadata
1889                .values()
1890                .map(|meta| meta.dropped.unwrap_or(meta.added));
1891            let mut max = 0;
1892            let mut sum = 0;
1893            for version in versions {
1894                max = std::cmp::max(max, version.0);
1895                sum += version.0;
1896            }
1897
1898            // Other than RelationVersion(0), we should never have duplicate
1899            // versions and they should always increase by 1. In other words, the
1900            // sum of all RelationVersions should be the sum of [0, max].
1901            //
1902            // N.B. n * (n + 1) / 2 = sum of [0, n]
1903            //
1904            // While I normally don't like tricks like this, it allows us to
1905            // validate that our column versions are correct in O(n) time and
1906            // without allocations.
1907            if sum != (max * (max + 1) / 2) {
1908                anyhow::bail!("there is a duplicate or missing relation version");
1909            }
1910
1911            Ok(())
1912        }
1913
1914        assert_ok!(validate_inner(&self.inner), "validate failed! {self:?}");
1915    }
1916}
1917
/// Diffs that can be generated by proptest and applied to a [`RelationDesc`] to
/// exercise schema migrations.
///
/// Diffs are applied with [`PropRelationDescDiff::apply`].
#[derive(Debug)]
#[cfg(any(test, feature = "proptest"))]
pub enum PropRelationDescDiff {
    /// Append a new column called `name` with type `typ`.
    AddColumn {
        name: ColumnName,
        typ: SqlColumnType,
    },
    /// Mark the first column called `name` as dropped, unless it already is.
    /// Does nothing if no such column exists.
    DropColumn {
        name: ColumnName,
    },
    /// Flip the `nullable` flag of the column called `name`. Does nothing if
    /// no such column exists.
    ToggleNullability {
        name: ColumnName,
    },
    /// Replace the type of the column called `name` with `typ`. Does nothing
    /// if no such column exists.
    ChangeType {
        name: ColumnName,
        typ: SqlColumnType,
    },
}
1938
#[cfg(any(test, feature = "proptest"))]
impl PropRelationDescDiff {
    /// Applies this diff to `desc`, mutating it in place.
    ///
    /// Diffs that refer to a column name `desc` doesn't know about are
    /// silently ignored.
    ///
    /// # Panics
    ///
    /// Panics if adding a column would overwrite existing metadata, or if the
    /// column metadata and column types are out of sync.
    pub fn apply(self, desc: &mut RelationDesc) {
        match self {
            Self::AddColumn { name, typ } => {
                // The new column goes at the end, for both its column index
                // and its position in the list of types.
                let idx = desc.metadata.len();
                let replaced = desc.metadata.insert(
                    ColumnIndex(idx),
                    ColumnMetadata {
                        name,
                        typ_idx: idx,
                        added: RelationVersion(0),
                        dropped: None,
                    },
                );
                desc.typ.column_types.push(typ);

                assert_none!(replaced);
                assert_eq!(desc.metadata.len(), desc.typ.column_types.len());
            }
            Self::DropColumn { name } => {
                // The drop happens at the version right after the newest one
                // that is currently recorded.
                let newest = desc
                    .metadata
                    .values()
                    .map(|meta| meta.dropped.unwrap_or(meta.added))
                    .max();
                let next_version = newest.unwrap_or_else(RelationVersion::root).bump();

                // Only the first column with a matching name is considered,
                // and it is left alone if it was already dropped.
                let target = desc
                    .metadata
                    .values_mut()
                    .find(|meta| meta.name == name)
                    .filter(|meta| meta.dropped.is_none());
                if let Some(meta) = target {
                    meta.dropped = Some(next_version);
                }
            }
            Self::ToggleNullability { name } => {
                let Some(pos) = desc.get_by_name(&name).map(|(pos, _)| pos) else {
                    return;
                };
                let col_type = desc
                    .typ
                    .column_types
                    .get_mut(pos)
                    .expect("ColumnNames and SqlColumnTypes out of sync!");
                let flipped = !col_type.nullable;
                col_type.nullable = flipped;
            }
            Self::ChangeType { name, typ } => {
                let Some(pos) = desc.get_by_name(&name).map(|(pos, _)| pos) else {
                    return;
                };
                let slot = desc
                    .typ
                    .column_types
                    .get_mut(pos)
                    .expect("ColumnNames and SqlColumnTypes out of sync!");
                *slot = typ;
            }
        }
    }
}
1998
/// Generates a set of [`PropRelationDescDiff`]s based on some source [`RelationDesc`].
///
/// The generated diffs are a shuffled mix of:
///
/// * `AddColumn`s with arbitrary names and types, usually fewer than 8 and
///   occasionally up to 63.
/// * `DropColumn`s, `ToggleNullability`s, and `ChangeType`s, each targeting a
///   random set of columns that exist in `source`.
///
/// If `source` has no columns only `AddColumn` diffs are generated.
#[cfg(any(test, feature = "proptest"))]
pub fn arb_relation_desc_diff(
    source: &RelationDesc,
) -> impl Strategy<Value = Vec<PropRelationDescDiff>> + use<> {
    /// Draws `num_draws` random column positions from `source` and returns the
    /// names found there. The same position or name can be drawn more than
    /// once, so the returned set may hold fewer than `num_draws` names.
    ///
    /// `source` must have at least one column.
    fn sample_column_names(
        source: &RelationDesc,
        num_draws: usize,
        rng: &mut proptest::test_runner::TestRng,
    ) -> BTreeSet<ColumnName> {
        let num_source_columns = source.typ.columns().len();
        (0..num_draws)
            .map(|_| {
                let col_idx = rng.random_range(0..num_source_columns);
                source.get_name(col_idx).clone()
            })
            .collect()
    }

    let source = Rc::new(source.clone());
    let num_source_columns = source.typ.columns().len();

    // Heavily favor adding only a few columns, but every now and then add a lot.
    let num_add_columns = Union::new_weighted(vec![(100, Just(0..8)), (1, Just(8..64))]);
    let add_columns_strat = num_add_columns
        .prop_flat_map(|num_columns| {
            proptest::collection::vec((any::<ColumnName>(), any::<SqlColumnType>()), num_columns)
        })
        .prop_map(|cols| {
            cols.into_iter()
                .map(|(name, typ)| PropRelationDescDiff::AddColumn { name, typ })
                .collect::<Vec<_>>()
        });

    // If the source RelationDesc is empty there is nothing else to do.
    if num_source_columns == 0 {
        return add_columns_strat.boxed();
    }

    // N.B. Each strategy below gets its own handle to the source because the
    // closures need to own everything they capture.
    let source_ = Rc::clone(&source);
    let drop_columns_strat = (0..num_source_columns).prop_perturb(move |num_columns, mut rng| {
        sample_column_names(&source_, num_columns, &mut rng)
            .into_iter()
            .map(|name| PropRelationDescDiff::DropColumn { name })
            .collect::<Vec<_>>()
    });

    let source_ = Rc::clone(&source);
    let toggle_nullability_strat =
        (0..num_source_columns).prop_perturb(move |num_columns, mut rng| {
            sample_column_names(&source_, num_columns, &mut rng)
                .into_iter()
                .map(|name| PropRelationDescDiff::ToggleNullability { name })
                .collect::<Vec<_>>()
        });

    let source_ = Rc::clone(&source);
    let change_type_strat = (0..num_source_columns)
        .prop_perturb(move |num_columns, mut rng| {
            sample_column_names(&source_, num_columns, &mut rng)
        })
        // Generate one new type for every column that was picked.
        .prop_flat_map(|cols| {
            proptest::collection::vec(any::<SqlColumnType>(), cols.len())
                .prop_map(move |types| (cols.clone(), types))
        })
        .prop_map(|(cols, types)| {
            cols.into_iter()
                .zip_eq(types)
                .map(|(name, typ)| PropRelationDescDiff::ChangeType { name, typ })
                .collect::<Vec<_>>()
        });

    (
        add_columns_strat,
        drop_columns_strat,
        toggle_nullability_strat,
        change_type_strat,
    )
        .prop_map(|(adds, drops, toggles, changes)| {
            adds.into_iter()
                .chain(drops)
                .chain(toggles)
                .chain(changes)
                .collect::<Vec<_>>()
        })
        // Apply the different kinds of diffs in a random order.
        .prop_shuffle()
        .boxed()
}
2085
2086#[cfg(test)]
2087mod tests {
2088    use super::*;
2089    use prost::Message;
2090
    // Walks a two column description through adding a column and then dropping
    // a column, checking after every step that `at_version` returns the
    // expected view and that views of earlier versions don't change.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `pipe2` on OS `linux`
    fn smoktest_at_version() {
        let desc = RelationDesc::builder()
            .with_column("a", SqlScalarType::Bool.nullable(true))
            .with_column("z", SqlScalarType::String.nullable(false))
            .finish();

        let mut versioned_desc = VersionedRelationDesc {
            inner: desc.clone(),
        };
        versioned_desc.validate();

        // Before any changes, every version selector yields the original
        // description, including a version that was never created.
        let latest = versioned_desc.at_version(RelationVersionSelector::Latest);
        assert_eq!(desc, latest);

        let v0 = versioned_desc.at_version(RelationVersionSelector::specific(0));
        assert_eq!(desc, v0);

        let v3 = versioned_desc.at_version(RelationVersionSelector::specific(3));
        assert_eq!(desc, v3);

        // Adding a column creates version 1.
        let v1 = versioned_desc.add_column("b", SqlScalarType::Bytes.nullable(false));
        assert_eq!(v1, RelationVersion(1));

        let v1 = versioned_desc.at_version(RelationVersionSelector::Specific(v1));
        insta::assert_json_snapshot!(v1.metadata, @r###"
        {
          "0": {
            "name": "a",
            "typ_idx": 0,
            "added": 0,
            "dropped": null
          },
          "1": {
            "name": "z",
            "typ_idx": 1,
            "added": 0,
            "dropped": null
          },
          "2": {
            "name": "b",
            "typ_idx": 2,
            "added": 1,
            "dropped": null
          }
        }
        "###);

        // Check that V0 doesn't show the new column.
        let v0_b = versioned_desc.at_version(RelationVersionSelector::specific(0));
        assert!(v0.iter().eq(v0_b.iter()));

        // Dropping a column creates version 2.
        let v2 = versioned_desc.drop_column("z");
        assert_eq!(v2, RelationVersion(2));

        // The column index of "b" stays 2, but its typ_idx moves down to 1.
        let v2 = versioned_desc.at_version(RelationVersionSelector::Specific(v2));
        insta::assert_json_snapshot!(v2.metadata, @r###"
        {
          "0": {
            "name": "a",
            "typ_idx": 0,
            "added": 0,
            "dropped": null
          },
          "2": {
            "name": "b",
            "typ_idx": 1,
            "added": 1,
            "dropped": null
          }
        }
        "###);

        // Check that V0 and V1 are still correct.
        let v0_c = versioned_desc.at_version(RelationVersionSelector::specific(0));
        assert!(v0.iter().eq(v0_c.iter()));

        let v1_b = versioned_desc.at_version(RelationVersionSelector::specific(1));
        assert!(v1.iter().eq(v1_b.iter()));

        // The inner description keeps every column, including the dropped one.
        insta::assert_json_snapshot!(versioned_desc.inner.metadata, @r###"
        {
          "0": {
            "name": "a",
            "typ_idx": 0,
            "added": 0,
            "dropped": null
          },
          "1": {
            "name": "z",
            "typ_idx": 1,
            "added": 0,
            "dropped": 2
          },
          "2": {
            "name": "b",
            "typ_idx": 2,
            "added": 1,
            "dropped": null
          }
        }
        "###);
    }
2195
    // Drops a column that comes before a key column and checks that the key is
    // remapped in the version after the drop, and untouched in the version
    // before it.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `pipe2` on OS `linux`
    fn test_dropping_columns_with_keys() {
        let desc = RelationDesc::builder()
            .with_column("a", SqlScalarType::Bool.nullable(true))
            .with_column("z", SqlScalarType::String.nullable(false))
            .with_key(vec![1])
            .finish();

        let mut versioned_desc = VersionedRelationDesc {
            inner: desc.clone(),
        };
        versioned_desc.validate();

        let v1 = versioned_desc.drop_column("a");
        assert_eq!(v1, RelationVersion(1));

        // Make sure the key index for 'z' got remapped since 'a' was dropped.
        let v1 = versioned_desc.at_version(RelationVersionSelector::Specific(v1));
        insta::assert_json_snapshot!(v1, @r###"
        {
          "typ": {
            "column_types": [
              {
                "scalar_type": "String",
                "nullable": false
              }
            ],
            "keys": [
              [
                0
              ]
            ]
          },
          "metadata": {
            "1": {
              "name": "z",
              "typ_idx": 0,
              "added": 0,
              "dropped": null
            }
          }
        }
        "###);

        // Make sure the key index of 'z' is correct when all columns are present.
        let v0 = versioned_desc.at_version(RelationVersionSelector::specific(0));
        insta::assert_json_snapshot!(v0, @r###"
        {
          "typ": {
            "column_types": [
              {
                "scalar_type": "Bool",
                "nullable": true
              },
              {
                "scalar_type": "String",
                "nullable": false
              }
            ],
            "keys": [
              [
                1
              ]
            ]
          },
          "metadata": {
            "0": {
              "name": "a",
              "typ_idx": 0,
              "added": 0,
              "dropped": 1
            },
            "1": {
              "name": "z",
              "typ_idx": 1,
              "added": 0,
              "dropped": null
            }
          }
        }
        "###);
    }
2279
    // Converts a `ProtoRelationDesc` that has column names but an empty
    // `metadata` list into a `RelationDesc`, and snapshots the metadata the
    // conversion comes up with.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `pipe2` on OS `linux`
    fn roundtrip_relation_desc_without_metadata() {
        let typ = ProtoRelationType {
            column_types: vec![
                SqlScalarType::String.nullable(false).into_proto(),
                SqlScalarType::Bool.nullable(true).into_proto(),
            ],
            keys: vec![],
        };
        let proto = ProtoRelationDesc {
            typ: Some(typ),
            names: vec![
                ColumnName("a".into()).into_proto(),
                ColumnName("b".into()).into_proto(),
            ],
            // Deliberately empty, only the names above describe the columns.
            metadata: vec![],
        };
        let desc: RelationDesc = proto.into_rust().unwrap();

        insta::assert_json_snapshot!(desc, @r###"
        {
          "typ": {
            "column_types": [
              {
                "scalar_type": "String",
                "nullable": false
              },
              {
                "scalar_type": "Bool",
                "nullable": true
              }
            ],
            "keys": []
          },
          "metadata": {
            "0": {
              "name": "a",
              "typ_idx": 0,
              "added": 0,
              "dropped": null
            },
            "1": {
              "name": "b",
              "typ_idx": 1,
              "added": 0,
              "dropped": null
            }
          }
        }
        "###);
    }
2332
2333    #[mz_ore::test]
2334    #[should_panic(expected = "column named 'a' already exists!")]
2335    fn test_add_column_with_same_name_panics() {
2336        let desc = RelationDesc::builder()
2337            .with_column("a", SqlScalarType::Bool.nullable(true))
2338            .finish();
2339        let mut versioned = VersionedRelationDesc::new(desc);
2340
2341        let _ = versioned.add_column("a", SqlScalarType::String.nullable(false));
2342    }
2343
    // Checks that a column name can be reused once the column that previously
    // had that name was dropped.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `pipe2` on OS `linux`
    fn test_add_column_with_same_name_prev_dropped() {
        let desc = RelationDesc::builder()
            .with_column("a", SqlScalarType::Bool.nullable(true))
            .finish();
        let mut versioned = VersionedRelationDesc::new(desc);

        // After dropping the only column the description is empty.
        let v1 = versioned.drop_column("a");
        let v1 = versioned.at_version(RelationVersionSelector::Specific(v1));
        insta::assert_json_snapshot!(v1, @r###"
        {
          "typ": {
            "column_types": [],
            "keys": []
          },
          "metadata": {}
        }
        "###);

        // The re-added "a" is a new column: it gets a new column index (1) and
        // the new type, but is the only visible column so its typ_idx is 0.
        let v2 = versioned.add_column("a", SqlScalarType::String.nullable(false));
        let v2 = versioned.at_version(RelationVersionSelector::Specific(v2));
        insta::assert_json_snapshot!(v2, @r###"
        {
          "typ": {
            "column_types": [
              {
                "scalar_type": "String",
                "nullable": false
              }
            ],
            "keys": []
          },
          "metadata": {
            "1": {
              "name": "a",
              "typ_idx": 0,
              "added": 2,
              "dropped": null
            }
          }
        }
        "###);
    }
2388
2389    #[mz_ore::test]
2390    #[cfg_attr(miri, ignore)]
2391    fn apply_demand() {
2392        let desc = RelationDesc::builder()
2393            .with_column("a", SqlScalarType::String.nullable(true))
2394            .with_column("b", SqlScalarType::Int64.nullable(false))
2395            .with_column("c", SqlScalarType::Time.nullable(false))
2396            .finish();
2397        let desc = desc.apply_demand(&BTreeSet::from([0, 2]));
2398        assert_eq!(desc.arity(), 2);
2399        // TODO(parkmycar): Move validate onto RelationDesc.
2400        VersionedRelationDesc::new(desc).validate();
2401    }
2402
2403    #[mz_ore::test]
2404    #[cfg_attr(miri, ignore)]
2405    fn smoketest_column_index_stable_ident() {
2406        let idx_a = ColumnIndex(42);
2407        // Note(parkmycar): This should never change.
2408        assert_eq!(idx_a.to_stable_name(), "42");
2409    }
2410
2411    #[mz_ore::test]
2412    #[cfg_attr(miri, ignore)] // too slow
2413    fn proptest_relation_desc_roundtrips() {
2414        fn testcase(og: RelationDesc) {
2415            let bytes = og.into_proto().encode_to_vec();
2416            let proto = ProtoRelationDesc::decode(&bytes[..]).unwrap();
2417            let rnd = RelationDesc::from_proto(proto).unwrap();
2418
2419            assert_eq!(og, rnd);
2420        }
2421
2422        proptest!(|(desc in any::<RelationDesc>())| {
2423            testcase(desc);
2424        });
2425
2426        let strat = any::<RelationDesc>().prop_flat_map(|desc| {
2427            arb_relation_desc_diff(&desc).prop_map(move |diffs| (desc.clone(), diffs))
2428        });
2429
2430        proptest!(|((mut desc, diffs) in strat)| {
2431            for diff in diffs {
2432                diff.apply(&mut desc);
2433            };
2434            testcase(desc);
2435        });
2436    }
2437}