Skip to main content

mz_repr/
relation.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::{BTreeMap, BTreeSet};
11#[cfg(any(test, feature = "proptest"))]
12use std::rc::Rc;
13use std::{fmt, vec};
14
15use anyhow::bail;
16use itertools::Itertools;
17use mz_lowertest::MzReflect;
18use mz_ore::cast::CastFrom;
19use mz_ore::soft_panic_or_log;
20use mz_ore::str::StrExt;
21use mz_ore::{assert_none, assert_ok};
22use mz_persist_types::schema::SchemaId;
23use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
24#[cfg(any(test, feature = "proptest"))]
25use proptest::prelude::*;
26#[cfg(any(test, feature = "proptest"))]
27use proptest::strategy::{Strategy, Union};
28#[cfg(any(test, feature = "proptest"))]
29use proptest_derive::Arbitrary;
30use serde::{Deserialize, Serialize};
31
32#[cfg(any(test, feature = "proptest"))]
33use crate::Row;
34#[cfg(any(test, feature = "proptest"))]
35use crate::arb_datum_for_column;
36use crate::relation_and_scalar::proto_relation_type::ProtoKey;
37pub use crate::relation_and_scalar::{
38    ProtoColumnMetadata, ProtoColumnName, ProtoColumnType, ProtoRelationDesc, ProtoRelationType,
39    ProtoRelationVersion,
40};
41use crate::{Datum, ReprScalarType, SqlScalarType};
42
/// The type of a [`Datum`].
///
/// [`SqlColumnType`] bundles information about the scalar type of a datum (e.g.,
/// Int32 or String) with its nullability.
///
/// To construct a column type, either initialize the struct directly, or
/// use the [`SqlScalarType::nullable`] method.
///
/// When deserialized with serde, a missing `nullable` field defaults to
/// `true` (see the `serde(default = ...)` attribute on that field).
#[derive(
    Clone,
    Debug,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Serialize,
    Deserialize,
    Hash,
    MzReflect
)]
#[cfg_attr(any(test, feature = "proptest"), derive(Arbitrary))]
pub struct SqlColumnType {
    /// The underlying scalar type (e.g., Int32 or String) of this column.
    pub scalar_type: SqlScalarType,
    /// Whether this datum can be null.
    // `return_true` supplies the value when the field is absent from the
    // serialized input, so such columns are treated as nullable.
    #[serde(default = "return_true")]
    pub nullable: bool,
}
70
/// Serde default for the `nullable` field of [`SqlColumnType`] and
/// [`ReprColumnType`]: a column type deserialized without an explicit
/// `nullable` value is treated as nullable.
///
/// This exists for the purpose of making column types nullable by default in
/// unit tests. The default value of a bool is false, and the only way to make
/// an object take on any other value by default is to pass serde a function
/// that returns the desired default value. See
/// <https://github.com/serde-rs/serde/issues/1030>
#[inline(always)]
fn return_true() -> bool {
    true
}
80
81impl SqlColumnType {
82    /// Compute the least upper bound of many column types, returning an error on
83    /// incompatible types or an empty iterator.
84    /// See [`SqlColumnType::try_union`] for details.
85    pub fn try_union_many<'a>(
86        typs: impl IntoIterator<Item = &'a Self>,
87    ) -> Result<Self, anyhow::Error> {
88        let mut iter = typs.into_iter();
89        let Some(typ) = iter.next() else {
90            bail!("Cannot union empty iterator");
91        };
92        iter.try_fold(typ.clone(), |a, b| a.try_union(b))
93    }
94
95    /// Compute the least upper bound of many column types.
96    /// See [`SqlColumnType::try_union`] for details.
97    ///
98    /// Panics on incompatible types or an empty iterator.
99    pub fn union_many<'a>(typs: impl IntoIterator<Item = &'a Self>) -> Self {
100        Self::try_union_many(typs).expect("Cannot union empty iterator")
101    }
102
103    /// Backports nullability information from `backport_typ` into `self`,
104    /// affecting the outer `.nullable` field but also record fields deeper
105    /// into the type.
106    pub fn backport_nullability(&mut self, backport_typ: &ReprColumnType) {
107        self.scalar_type
108            .backport_nullability(&backport_typ.scalar_type);
109        self.nullable = backport_typ.nullable;
110    }
111
112    /// Compute the least upper bound of two column types at the SQL level.
113    ///
114    /// Two types are compatible when they are equal, share the same base type
115    /// (differing only in modifiers), or are records with pairwise-compatible
116    /// fields.
117    /// The resulting nullability is the disjunction of the two input
118    /// nullabilities.
119    ///
120    /// Returns an error for incompatible types, e.g. `Text` and `Int32`, or
121    /// `Text` and `VarChar` (different base types at the SQL level).
122    /// See [`SqlColumnType::try_union`] for a fallback that handles the latter
123    /// case via repr-level union.
124    pub fn sql_union(&self, other: &Self) -> Result<Self, anyhow::Error> {
125        match (&self.scalar_type, &other.scalar_type) {
126            (scalar_type, other_scalar_type) if scalar_type == other_scalar_type => {
127                Ok(SqlColumnType {
128                    scalar_type: scalar_type.clone(),
129                    nullable: self.nullable || other.nullable,
130                })
131            }
132            (scalar_type, other_scalar_type) if scalar_type.base_eq(other_scalar_type) => {
133                Ok(SqlColumnType {
134                    scalar_type: scalar_type.without_modifiers(),
135                    nullable: self.nullable || other.nullable,
136                })
137            }
138            (
139                SqlScalarType::Record { fields, custom_id },
140                SqlScalarType::Record {
141                    fields: other_fields,
142                    custom_id: other_custom_id,
143                },
144            ) => {
145                if custom_id != other_custom_id {
146                    bail!(
147                        "Can't union types: {:?} and {:?}",
148                        self.scalar_type,
149                        other.scalar_type
150                    );
151                };
152
153                if fields.len() != other_fields.len() {
154                    bail!(
155                        "Can't union types: {:?} and {:?}",
156                        self.scalar_type,
157                        other.scalar_type
158                    );
159                }
160                let mut union_fields = Vec::with_capacity(fields.len());
161                for ((name, typ), (other_name, other_typ)) in
162                    fields.iter().zip_eq(other_fields.iter())
163                {
164                    if name != other_name {
165                        bail!(
166                            "Can't union types: {:?} and {:?}",
167                            self.scalar_type,
168                            other.scalar_type
169                        );
170                    } else {
171                        let union_column_type = typ.sql_union(other_typ)?;
172                        union_fields.push((name.clone(), union_column_type));
173                    };
174                }
175
176                Ok(SqlColumnType {
177                    scalar_type: SqlScalarType::Record {
178                        fields: union_fields.into(),
179                        custom_id: *custom_id,
180                    },
181                    nullable: self.nullable || other.nullable,
182                })
183            }
184            _ => bail!(
185                "Can't union types: {:?} and {:?}",
186                self.scalar_type,
187                other.scalar_type
188            ),
189        }
190    }
191
192    /// Compute the least upper bound of two column types.
193    ///
194    /// Attempts [`SqlColumnType::sql_union`] first, which preserves SQL-level type
195    /// information (e.g. modifiers). Falls back to a repr-level union via
196    /// [`ReprColumnType::union`] when the SQL types are incompatible but the
197    /// underlying repr types are compatible.
198    ///
199    /// The resulting nullability is the disjunction of the two input
200    /// nullabilities.
201    pub fn try_union(&self, other: &Self) -> Result<Self, anyhow::Error> {
202        self.sql_union(other).or_else(|e| {
203            let repr_self = ReprColumnType::from(self);
204            let repr_other = ReprColumnType::from(other);
205            match repr_self.union(&repr_other) {
206                Ok(typ) => {
207                    // sql_union failed but repr union succeeded — this indicates
208                    // a repr-type canonicalization gap that we want CI visibility for.
209                    soft_panic_or_log!("repr type error: sql_union({self:?}, {other:?}): {e}");
210                    Ok(SqlColumnType::from_repr(&typ))
211                }
212                Err(_) => {
213                    // Both sql_union and repr union failed — genuine type mismatch,
214                    // not a canonicalization issue. Just propagate the original error.
215                    Err(e)
216                }
217            }
218        })
219    }
220
221    /// Compute the least upper bound of two column types.
222    /// See [`SqlColumnType::try_union`] for details.
223    ///
224    /// Panics on incompatible types.
225    pub fn union(&self, other: &Self) -> Self {
226        self.try_union(other).unwrap_or_else(|e| {
227            panic!("repr type error: after sql_union({self:?}, {other:?}) error: {e}")
228        })
229    }
230
231    /// Consumes this `SqlColumnType` and returns a new `SqlColumnType` with its
232    /// nullability set to the specified boolean.
233    pub fn nullable(mut self, nullable: bool) -> Self {
234        self.nullable = nullable;
235        self
236    }
237}
238
239impl RustType<ProtoColumnType> for SqlColumnType {
240    fn into_proto(&self) -> ProtoColumnType {
241        ProtoColumnType {
242            nullable: self.nullable,
243            scalar_type: Some(self.scalar_type.into_proto()),
244        }
245    }
246
247    fn from_proto(proto: ProtoColumnType) -> Result<Self, TryFromProtoError> {
248        Ok(SqlColumnType {
249            nullable: proto.nullable,
250            scalar_type: proto
251                .scalar_type
252                .into_rust_if_some("ProtoColumnType::scalar_type")?,
253        })
254    }
255}
256
257impl fmt::Display for SqlColumnType {
258    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
259        let nullable = if self.nullable { "Null" } else { "NotNull" };
260        f.write_fmt(format_args!("{:?}:{}", self.scalar_type, nullable))
261    }
262}
263
/// The type of a relation, expressed with SQL-level column types
/// ([`SqlColumnType`]).
#[derive(
    Clone,
    Debug,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Serialize,
    Deserialize,
    Hash,
    MzReflect
)]
#[cfg_attr(any(test, feature = "proptest"), derive(Arbitrary))]
pub struct SqlRelationType {
    /// The type for each column, in order.
    pub column_types: Vec<SqlColumnType>,
    /// Sets of indices that are "keys" for the collection.
    ///
    /// Each element in this list is a set of column indices, each with the
    /// property that the collection contains at most one record with each
    /// distinct set of values for each column. Alternately, for a specific set
    /// of values assigned to these columns there is at most one record.
    ///
    /// A collection can contain multiple sets of keys, although it is common to
    /// have either zero or one sets of key indices.
    ///
    /// Defaults to an empty list when absent from serialized input.
    #[serde(default)]
    pub keys: Vec<Vec<usize>>,
}
293
294impl SqlRelationType {
295    /// Constructs a `SqlRelationType` representing the relation with no columns and
296    /// no keys.
297    pub fn empty() -> Self {
298        SqlRelationType::new(vec![])
299    }
300
301    /// Constructs a new `SqlRelationType` from specified column types.
302    ///
303    /// The `SqlRelationType` will have no keys.
304    pub fn new(column_types: Vec<SqlColumnType>) -> Self {
305        SqlRelationType {
306            column_types,
307            keys: Vec::new(),
308        }
309    }
310
311    /// Adds a new key for the relation.
312    pub fn with_key(mut self, mut indices: Vec<usize>) -> Self {
313        indices.sort_unstable();
314        if !self.keys.contains(&indices) {
315            self.keys.push(indices);
316        }
317        self
318    }
319
320    pub fn with_keys(mut self, keys: Vec<Vec<usize>>) -> Self {
321        for key in keys {
322            self = self.with_key(key)
323        }
324        self
325    }
326
327    /// Computes the number of columns in the relation.
328    pub fn arity(&self) -> usize {
329        self.column_types.len()
330    }
331
332    /// Gets the index of the columns used when creating a default index.
333    pub fn default_key(&self) -> Vec<usize> {
334        if let Some(key) = self.keys.first() {
335            if key.is_empty() {
336                (0..self.column_types.len()).collect()
337            } else {
338                key.clone()
339            }
340        } else {
341            (0..self.column_types.len()).collect()
342        }
343    }
344
345    /// Returns all the [`SqlColumnType`]s, in order, for this relation.
346    pub fn columns(&self) -> &[SqlColumnType] {
347        &self.column_types
348    }
349
350    /// Adopts the nullability and keys from another `SqlRelationType`.
351    ///
352    /// Panics if the number of columns does not match.
353    pub fn backport_nullability_and_keys(&mut self, backport_typ: &ReprRelationType) {
354        assert_eq!(
355            backport_typ.column_types.len(),
356            self.column_types.len(),
357            "HIR and MIR types should have the same number of columns"
358        );
359        for (backport_col, sql_col) in backport_typ
360            .column_types
361            .iter()
362            .zip_eq(self.column_types.iter_mut())
363        {
364            sql_col.backport_nullability(backport_col);
365        }
366
367        self.keys = backport_typ.keys.clone();
368    }
369
370    /// Constructs a `SqlRelationType` from a `ReprRelationType` by converting
371    /// each column type via [`SqlColumnType::from_repr`]. This is a lossy
372    /// inverse of `ReprRelationType::from(&SqlRelationType)`.
373    pub fn from_repr(repr: &ReprRelationType) -> Self {
374        SqlRelationType {
375            column_types: repr
376                .column_types
377                .iter()
378                .map(SqlColumnType::from_repr)
379                .collect(),
380            keys: repr.keys.clone(),
381        }
382    }
383}
384
385impl RustType<ProtoRelationType> for SqlRelationType {
386    fn into_proto(&self) -> ProtoRelationType {
387        ProtoRelationType {
388            column_types: self.column_types.into_proto(),
389            keys: self.keys.into_proto(),
390        }
391    }
392
393    fn from_proto(proto: ProtoRelationType) -> Result<Self, TryFromProtoError> {
394        Ok(SqlRelationType {
395            column_types: proto.column_types.into_rust()?,
396            keys: proto.keys.into_rust()?,
397        })
398    }
399}
400
/// Protobuf conversion for a single key, i.e. one list of column indices as
/// stored in the `keys` field of a relation type.
impl RustType<ProtoKey> for Vec<usize> {
    /// Wraps the converted indices in a [`ProtoKey`].
    fn into_proto(&self) -> ProtoKey {
        ProtoKey {
            // NOTE(review): this inner `into_proto` is expected to resolve to a
            // different `RustType` impl (one producing the type of
            // `ProtoKey::keys`) rather than recursing into this method —
            // confirm against the `mz_proto` impls.
            keys: self.into_proto(),
        }
    }

    /// Unwraps a [`ProtoKey`] back into its list of column indices.
    fn from_proto(proto: ProtoKey) -> Result<Self, TryFromProtoError> {
        proto.keys.into_rust()
    }
}
412
/// The type of a relation, expressed with representation-level column types
/// ([`ReprColumnType`]).
///
/// This mirrors [`SqlRelationType`]; a `ReprRelationType` can be obtained from
/// one via `ReprRelationType::from(&SqlRelationType)`.
#[derive(
    Clone,
    Debug,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Serialize,
    Deserialize,
    Hash,
    MzReflect
)]
pub struct ReprRelationType {
    /// The type for each column, in order.
    pub column_types: Vec<ReprColumnType>,
    /// Sets of indices that are "keys" for the collection.
    ///
    /// Each element in this list is a set of column indices, each with the
    /// property that the collection contains at most one record with each
    /// distinct set of values for each column. Alternately, for a specific set
    /// of values assigned to these columns there is at most one record.
    ///
    /// A collection can contain multiple sets of keys, although it is common to
    /// have either zero or one sets of key indices.
    ///
    /// Defaults to an empty list when absent from serialized input.
    #[serde(default)]
    pub keys: Vec<Vec<usize>>,
}
441
442impl ReprRelationType {
443    /// Constructs a `ReprRelationType` representing the relation with no columns and
444    /// no keys.
445    pub fn empty() -> Self {
446        ReprRelationType::new(vec![])
447    }
448
449    /// Constructs a new `ReprRelationType` from specified column types.
450    ///
451    /// The `ReprRelationType` will have no keys.
452    pub fn new(column_types: Vec<ReprColumnType>) -> Self {
453        ReprRelationType {
454            column_types,
455            keys: Vec::new(),
456        }
457    }
458
459    /// Adds a new key for the relation.
460    pub fn with_key(mut self, mut indices: Vec<usize>) -> Self {
461        indices.sort_unstable();
462        if !self.keys.contains(&indices) {
463            self.keys.push(indices);
464        }
465        self
466    }
467
468    pub fn with_keys(mut self, keys: Vec<Vec<usize>>) -> Self {
469        for key in keys {
470            self = self.with_key(key)
471        }
472        self
473    }
474
475    /// Computes the number of columns in the relation.
476    pub fn arity(&self) -> usize {
477        self.column_types.len()
478    }
479
480    /// Gets the index of the columns used when creating a default index.
481    pub fn default_key(&self) -> Vec<usize> {
482        if let Some(key) = self.keys.first() {
483            if key.is_empty() {
484                (0..self.column_types.len()).collect()
485            } else {
486                key.clone()
487            }
488        } else {
489            (0..self.column_types.len()).collect()
490        }
491    }
492
493    /// Returns all the column types in order, for this relation.
494    pub fn columns(&self) -> &[ReprColumnType] {
495        &self.column_types
496    }
497}
498
499impl From<&SqlRelationType> for ReprRelationType {
500    fn from(sql_relation_type: &SqlRelationType) -> Self {
501        ReprRelationType {
502            column_types: sql_relation_type
503                .column_types
504                .iter()
505                .map(ReprColumnType::from)
506                .collect(),
507            keys: sql_relation_type.keys.clone(),
508        }
509    }
510}
511
/// The representation-level type of a column: a [`ReprScalarType`] together
/// with its nullability.
///
/// This is the repr-level counterpart of [`SqlColumnType`]. Convert from the
/// SQL level with `ReprColumnType::from(&SqlColumnType)`, and (lossily) back
/// with [`SqlColumnType::from_repr`].
#[derive(
    Clone,
    Debug,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Serialize,
    Deserialize,
    Hash,
    MzReflect
)]
pub struct ReprColumnType {
    /// The underlying representation scalar type (e.g., Int32 or String) of this column.
    pub scalar_type: ReprScalarType,
    /// Whether this datum can be null.
    // `return_true` supplies the value when the field is absent from the
    // serialized input, so such columns are treated as nullable.
    #[serde(default = "return_true")]
    pub nullable: bool,
}
531
532impl std::fmt::Display for ReprColumnType {
533    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
534        write!(f, "{}", self.scalar_type)?;
535        if self.nullable {
536            write!(f, "?")?;
537        }
538        Ok(())
539    }
540}
541
542impl ReprColumnType {
543    /// Compute the least upper bound of two column types at the repr level.
544    ///
545    /// More permissive than [`SqlColumnType::sql_union`] because it operates
546    /// on the underlying representation types, ignoring SQL-level distinctions
547    /// such as modifiers.
548    /// The resulting nullability is the disjunction of the two inputs.
549    pub fn union(&self, col: &ReprColumnType) -> Result<Self, anyhow::Error> {
550        let scalar_type = self.scalar_type.union(&col.scalar_type)?;
551        let nullable = self.nullable || col.nullable;
552
553        Ok(ReprColumnType {
554            scalar_type,
555            nullable,
556        })
557    }
558}
559
560impl From<&SqlColumnType> for ReprColumnType {
561    fn from(sql_column_type: &SqlColumnType) -> Self {
562        let scalar_type = &sql_column_type.scalar_type;
563        let scalar_type = scalar_type.into();
564        let nullable = sql_column_type.nullable;
565
566        ReprColumnType {
567            scalar_type,
568            nullable,
569        }
570    }
571}
572
573impl SqlColumnType {
574    /// Lossily translates a [`ReprColumnType`] back to a [`SqlColumnType`].
575    ///
576    /// See [`SqlScalarType::from_repr`] for an example of lossiness.
577    pub fn from_repr(repr: &ReprColumnType) -> Self {
578        let scalar_type = &repr.scalar_type;
579        let scalar_type = SqlScalarType::from_repr(scalar_type);
580        let nullable = repr.nullable;
581
582        SqlColumnType {
583            scalar_type,
584            nullable,
585        }
586    }
587}
588
/// The name of a column in a [`RelationDesc`].
///
/// The name is stored as a `Box<str>`, and `ColumnName` dereferences to `str`.
#[derive(
    Clone,
    Debug,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Serialize,
    Deserialize,
    Hash,
    MzReflect
)]
pub struct ColumnName(Box<str>);
603
604impl ColumnName {
605    /// Returns this column name as a `str`.
606    #[inline(always)]
607    pub fn as_str(&self) -> &str {
608        &*self
609    }
610
611    /// Returns this column name as a `&mut Box<str>`.
612    pub fn as_mut_boxed_str(&mut self) -> &mut Box<str> {
613        &mut self.0
614    }
615
616    /// Returns if this [`ColumnName`] is similar to the provided one.
617    pub fn is_similar(&self, other: &ColumnName) -> bool {
618        const SIMILARITY_THRESHOLD: f64 = 0.6;
619
620        let a_lowercase = self.to_lowercase();
621        let b_lowercase = other.to_lowercase();
622
623        strsim::normalized_levenshtein(&a_lowercase, &b_lowercase) >= SIMILARITY_THRESHOLD
624    }
625}
626
627impl std::ops::Deref for ColumnName {
628    type Target = str;
629
630    #[inline(always)]
631    fn deref(&self) -> &Self::Target {
632        &self.0
633    }
634}
635
636impl fmt::Display for ColumnName {
637    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
638        f.write_str(&self.0)
639    }
640}
641
642impl From<String> for ColumnName {
643    fn from(s: String) -> ColumnName {
644        ColumnName(s.into())
645    }
646}
647
648impl From<&str> for ColumnName {
649    fn from(s: &str) -> ColumnName {
650        ColumnName(s.into())
651    }
652}
653
654impl From<&ColumnName> for ColumnName {
655    fn from(n: &ColumnName) -> ColumnName {
656        n.clone()
657    }
658}
659
660impl RustType<ProtoColumnName> for ColumnName {
661    fn into_proto(&self) -> ProtoColumnName {
662        ProtoColumnName {
663            value: Some(self.0.to_string()),
664        }
665    }
666
667    fn from_proto(proto: ProtoColumnName) -> Result<Self, TryFromProtoError> {
668        Ok(ColumnName(
669            proto
670                .value
671                .ok_or_else(|| TryFromProtoError::missing_field("ProtoColumnName::value"))?
672                .into(),
673        ))
674    }
675}
676
677impl From<ColumnName> for mz_sql_parser::ast::Ident {
678    fn from(value: ColumnName) -> Self {
679        // Note: ColumnNames are known to be less than the max length of an Ident (I think?).
680        mz_sql_parser::ast::Ident::new_unchecked(value.0)
681    }
682}
683
#[cfg(any(test, feature = "proptest"))]
impl proptest::arbitrary::Arbitrary for ColumnName {
    type Parameters = ();
    type Strategy = BoxedStrategy<ColumnName>;

    /// Generates column names of bounded length, drawn mostly from
    /// `proptest::char::range('A', 'z')`. Generated values are not shrunk.
    fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy {
        // Non-ASCII characters are generally uninteresting and can make
        // debugging harder, so arbitrary `char`s get only a 1-in-51 weight.
        let chars = Rc::new(Union::new_weighted(vec![
            (50, proptest::char::range('A', 'z').boxed()),
            (1, any::<char>().boxed()),
        ]));

        // Long column names are generally uninteresting, and can greatly
        // increase the runtime for a test case, so bound the max length.
        // The longer ranges are only enabled when `PROPTEST_LARGE_DATA` is set.
        let mut length_ranges = vec![(50, Just(1..8)), (20, Just(8..16))];
        let large_data = std::env::var("PROPTEST_LARGE_DATA").is_ok();
        if large_data {
            length_ranges.extend([
                (5, Just(16..128)),
                (1, Just(128..1024)),
                (1, Just(1024..4096)),
            ]);
        }

        Union::new_weighted(length_ranges)
            .prop_flat_map(move |len| proptest::collection::vec(Rc::clone(&chars), len))
            .prop_map(|name| ColumnName(name.into_iter().collect::<Box<str>>()))
            .no_shrink()
            .boxed()
    }
}
716
/// Default name of a column (when no other information is known).
// NOTE(review): `?column?` looks like the placeholder PostgreSQL uses for
// unnamed result columns — confirm that matching it is the intent.
pub const UNKNOWN_COLUMN_NAME: &str = "?column?";
719
/// Stable index of a column in a [`RelationDesc`].
///
/// Wraps a raw `usize`; see [`ColumnIndex::to_raw`] and
/// [`ColumnIndex::from_raw`].
#[derive(
    Clone,
    Copy,
    Debug,
    Eq,
    PartialEq,
    PartialOrd,
    Ord,
    Serialize,
    Deserialize,
    Hash,
    MzReflect
)]
pub struct ColumnIndex(usize);

// Compile-time check (test/proptest builds only) that `ColumnIndex` does not
// implement `Arbitrary`.
// NOTE(review): presumably because an arbitrary index would not refer to a
// real column of any `RelationDesc` — confirm the rationale.
#[cfg(any(test, feature = "proptest"))]
static_assertions::assert_not_impl_all!(ColumnIndex: Arbitrary);
738
739impl ColumnIndex {
740    /// Returns a stable identifier for this [`ColumnIndex`].
741    pub fn to_stable_name(&self) -> String {
742        self.0.to_string()
743    }
744
745    pub fn to_raw(&self) -> usize {
746        self.0
747    }
748
749    pub fn from_raw(val: usize) -> Self {
750        ColumnIndex(val)
751    }
752}
753
/// The version a given column was added at.
///
/// A thin wrapper around a `u64`: [`RelationVersion::root`] is version `0`,
/// and [`RelationVersion::bump`] yields the next version.
#[derive(
    Clone,
    Copy,
    Debug,
    Eq,
    PartialEq,
    PartialOrd,
    Ord,
    Serialize,
    Deserialize,
    Hash,
    MzReflect
)]
#[cfg_attr(any(test, feature = "proptest"), derive(Arbitrary))]
pub struct RelationVersion(u64);
770
771impl RelationVersion {
772    /// Returns the "root" or "initial" version of a [`RelationDesc`].
773    pub fn root() -> Self {
774        RelationVersion(0)
775    }
776
777    /// Returns an instance of [`RelationVersion`] which is "one" higher than `self`.
778    pub fn bump(&self) -> Self {
779        let next_version = self
780            .0
781            .checked_add(1)
782            .expect("added more than u64::MAX columns?");
783        RelationVersion(next_version)
784    }
785
786    /// Consume a [`RelationVersion`] returning the raw value.
787    ///
788    /// Should __only__ be used for serialization.
789    pub fn into_raw(self) -> u64 {
790        self.0
791    }
792
793    /// Create a [`RelationVersion`] from a raw value.
794    ///
795    /// Should __only__ be used for serialization.
796    pub fn from_raw(val: u64) -> RelationVersion {
797        RelationVersion(val)
798    }
799}
800
801impl From<RelationVersion> for SchemaId {
802    fn from(value: RelationVersion) -> Self {
803        SchemaId(usize::cast_from(value.0))
804    }
805}
806
807impl From<mz_sql_parser::ast::Version> for RelationVersion {
808    fn from(value: mz_sql_parser::ast::Version) -> Self {
809        RelationVersion(value.into_inner())
810    }
811}
812
813impl fmt::Display for RelationVersion {
814    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
815        write!(f, "v{}", self.0)
816    }
817}
818
819impl From<RelationVersion> for mz_sql_parser::ast::Version {
820    fn from(value: RelationVersion) -> Self {
821        mz_sql_parser::ast::Version::new(value.0)
822    }
823}
824
825impl RustType<ProtoRelationVersion> for RelationVersion {
826    fn into_proto(&self) -> ProtoRelationVersion {
827        ProtoRelationVersion { value: self.0 }
828    }
829
830    fn from_proto(proto: ProtoRelationVersion) -> Result<Self, TryFromProtoError> {
831        Ok(RelationVersion(proto.value))
832    }
833}
834
/// Semantic type annotation for a column in a builtin catalog relation.
///
/// These are compile-time metadata used by the catalog ontology layer to
/// describe the meaning of a column (e.g., that it contains a catalog item ID
/// or a role ID). Possible values correspond to the entries in
/// `SEMANTIC_TYPE_DEFS` in the `mz-catalog` crate.
///
/// The [`fmt::Display`] impl renders each variant as its Rust identifier
/// (e.g. `CatalogItemId`).
#[derive(
    Clone,
    Copy,
    Debug,
    PartialEq,
    Eq,
    PartialOrd,
    Ord,
    Hash,
    serde::Serialize
)]
pub enum SemanticType {
    CatalogItemId,
    GlobalId,
    ClusterId,
    ReplicaId,
    SchemaId,
    DatabaseId,
    RoleId,
    NetworkPolicyId,
    ShardId,
    OID,
    ObjectType,
    ConnectionType,
    SourceType,
    MzTimestamp,
    WallclockTimestamp,
    ByteCount,
    RecordCount,
    CreditRate,
    SqlDefinition,
    RedactedSqlDefinition,
}
874
875impl fmt::Display for SemanticType {
876    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
877        let s = match self {
878            SemanticType::CatalogItemId => "CatalogItemId",
879            SemanticType::GlobalId => "GlobalId",
880            SemanticType::ClusterId => "ClusterId",
881            SemanticType::ReplicaId => "ReplicaId",
882            SemanticType::SchemaId => "SchemaId",
883            SemanticType::DatabaseId => "DatabaseId",
884            SemanticType::RoleId => "RoleId",
885            SemanticType::NetworkPolicyId => "NetworkPolicyId",
886            SemanticType::ShardId => "ShardId",
887            SemanticType::OID => "OID",
888            SemanticType::ObjectType => "ObjectType",
889            SemanticType::ConnectionType => "ConnectionType",
890            SemanticType::SourceType => "SourceType",
891            SemanticType::MzTimestamp => "MzTimestamp",
892            SemanticType::WallclockTimestamp => "WallclockTimestamp",
893            SemanticType::ByteCount => "ByteCount",
894            SemanticType::RecordCount => "RecordCount",
895            SemanticType::CreditRate => "CreditRate",
896            SemanticType::SqlDefinition => "SqlDefinition",
897            SemanticType::RedactedSqlDefinition => "RedactedSqlDefinition",
898        };
899        f.write_str(s)
900    }
901}
902
/// Metadata (other than type) for a column in a [`RelationDesc`].
///
/// A [`RelationDesc`] keeps one of these per [`ColumnIndex`].
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash, MzReflect)]
struct ColumnMetadata {
    /// Name of the column.
    name: ColumnName,
    /// Index into a [`SqlRelationType`] for this column.
    typ_idx: usize,
    /// Version this column was added at.
    added: RelationVersion,
    /// Version this column was dropped at, if any.
    dropped: Option<RelationVersion>,
}
915
/// A description of the shape of a relation.
///
/// It bundles a [`SqlRelationType`] with `ColumnMetadata` for each column in
/// the relation.
///
/// # Examples
///
/// A `RelationDesc` is typically constructed via its builder API:
///
/// ```
/// use mz_repr::{SqlColumnType, RelationDesc, SqlScalarType};
///
/// let desc = RelationDesc::builder()
///     .with_column("id", SqlScalarType::Int64.nullable(false))
///     .with_column("price", SqlScalarType::Float64.nullable(true))
///     .finish();
/// ```
///
/// In more complicated cases, like when constructing a `RelationDesc` in
/// response to user input, it may be more convenient to construct a relation
/// type first, and imbue it with column names to form a `RelationDesc` later:
///
/// ```
/// use mz_repr::RelationDesc;
///
/// # fn plan_query(_: &str) -> mz_repr::SqlRelationType { mz_repr::SqlRelationType::new(vec![]) }
/// let relation_type = plan_query("SELECT * FROM table");
/// let names = (0..relation_type.arity()).map(|i| match i {
///     0 => "first",
///     1 => "second",
///     _ => "unknown",
/// });
/// let desc = RelationDesc::new(relation_type, names);
/// ```
///
/// Next to the [`SqlRelationType`] we maintain a map of `ColumnIndex` to
/// `ColumnMetadata`, where [`ColumnIndex`] is a stable identifier for a
/// column throughout the lifetime of the relation. This allows a
/// [`RelationDesc`] to represent a projection over a version of itself.
///
/// ```
/// use std::collections::BTreeSet;
/// use mz_repr::{ColumnIndex, RelationDesc, SqlScalarType};
///
/// let desc = RelationDesc::builder()
///     .with_column("name", SqlScalarType::String.nullable(false))
///     .with_column("email", SqlScalarType::String.nullable(false))
///     .finish();
///
/// // Demand only the second column, which projects away the first.
/// let demands = BTreeSet::from([1]);
/// let proj = desc.apply_demand(&demands);
///
/// // We projected away the first column.
/// assert!(!proj.contains_index(&ColumnIndex::from_raw(0)));
/// // But retained the second.
/// assert!(proj.contains_index(&ColumnIndex::from_raw(1)));
///
/// // The underlying `SqlRelationType` also contains a single column.
/// assert_eq!(proj.typ().arity(), 1);
/// ```
///
/// To maintain this stable mapping and track the lifetime of a column (e.g.
/// when adding or dropping a column) we use `ColumnMetadata`. It maintains
/// the index in [`SqlRelationType`] that corresponds to a given column, and the
/// version at which this column was added or dropped.
///
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, MzReflect)]
pub struct RelationDesc {
    // The column types and keys of the relation.
    typ: SqlRelationType,
    // Per-column metadata (name, type index, added/dropped versions), keyed by
    // the column's stable index.
    metadata: BTreeMap<ColumnIndex, ColumnMetadata>,
}
988
989impl RustType<ProtoRelationDesc> for RelationDesc {
990    fn into_proto(&self) -> ProtoRelationDesc {
991        let (names, metadata): (Vec<_>, Vec<_>) = self
992            .metadata
993            .values()
994            .map(|meta| {
995                let metadata = ProtoColumnMetadata {
996                    added: Some(meta.added.into_proto()),
997                    dropped: meta.dropped.map(|v| v.into_proto()),
998                };
999                (meta.name.into_proto(), metadata)
1000            })
1001            .unzip();
1002
1003        // `metadata` Migration Logic: We wrote some `ProtoRelationDesc`s into Persist before the
1004        // metadata field was added. To make sure our serialization roundtrips the same as before
1005        // we added the field, we omit `metadata` if all of the values are equal to the default.
1006        //
1007        // Note: This logic needs to exist approximately forever.
1008        let is_all_default_metadata = metadata.iter().all(|meta| {
1009            meta.added == Some(RelationVersion::root().into_proto()) && meta.dropped == None
1010        });
1011        let metadata = if is_all_default_metadata {
1012            Vec::new()
1013        } else {
1014            metadata
1015        };
1016
1017        ProtoRelationDesc {
1018            typ: Some(self.typ.into_proto()),
1019            names,
1020            metadata,
1021        }
1022    }
1023
1024    fn from_proto(proto: ProtoRelationDesc) -> Result<Self, TryFromProtoError> {
1025        // `metadata` Migration Logic: We wrote some `ProtoRelationDesc`s into Persist before the
1026        // metadata field was added. If the field doesn't exist we fill it in with default values,
1027        // and when converting into_proto we omit these fields so the serialized bytes roundtrip.
1028        //
1029        // Note: This logic needs to exist approximately forever.
1030        let proto_metadata: Box<dyn Iterator<Item = _>> = if proto.metadata.is_empty() {
1031            let val = ProtoColumnMetadata {
1032                added: Some(RelationVersion::root().into_proto()),
1033                dropped: None,
1034            };
1035            Box::new(itertools::repeat_n(val, proto.names.len()))
1036        } else {
1037            // Reject mismatched lengths explicitly rather than panicking via
1038            // `zip_eq` below, since this branch is reachable from untrusted
1039            // proto bytes.
1040            if proto.names.len() != proto.metadata.len() {
1041                return Err(TryFromProtoError::InvalidFieldError(format!(
1042                    "ProtoRelationDesc: names ({}) and metadata ({}) length mismatch",
1043                    proto.names.len(),
1044                    proto.metadata.len()
1045                )));
1046            }
1047            Box::new(proto.metadata.into_iter())
1048        };
1049
1050        let metadata = proto
1051            .names
1052            .into_iter()
1053            .zip_eq(proto_metadata)
1054            .enumerate()
1055            .map(|(idx, (name, metadata))| {
1056                let meta = ColumnMetadata {
1057                    name: name.into_rust()?,
1058                    typ_idx: idx,
1059                    added: metadata.added.into_rust_if_some("ColumnMetadata::added")?,
1060                    dropped: metadata.dropped.into_rust()?,
1061                };
1062                Ok::<_, TryFromProtoError>((ColumnIndex(idx), meta))
1063            })
1064            .collect::<Result<_, _>>()?;
1065
1066        Ok(RelationDesc {
1067            typ: proto.typ.into_rust_if_some("ProtoRelationDesc::typ")?,
1068            metadata,
1069        })
1070    }
1071}
1072
1073impl RelationDesc {
1074    /// Returns a [`RelationDescBuilder`] that can be used to construct a [`RelationDesc`].
1075    pub fn builder() -> RelationDescBuilder {
1076        RelationDescBuilder::default()
1077    }
1078
1079    /// Constructs a new `RelationDesc` that represents the empty relation
1080    /// with no columns and no keys.
1081    pub fn empty() -> Self {
1082        RelationDesc {
1083            typ: SqlRelationType::empty(),
1084            metadata: BTreeMap::default(),
1085        }
1086    }
1087
1088    /// Check if the `RelationDesc` is empty.
1089    pub fn is_empty(&self) -> bool {
1090        self == &Self::empty()
1091    }
1092
1093    /// Returns the number of columns in this [`RelationDesc`].
1094    pub fn len(&self) -> usize {
1095        self.typ().column_types.len()
1096    }
1097
1098    /// Constructs a new `RelationDesc` from a `SqlRelationType` and an iterator
1099    /// over column names.
1100    ///
1101    /// # Panics
1102    ///
1103    /// Panics if the arity of the `SqlRelationType` is not equal to the number of
1104    /// items in `names`.
1105    pub fn new<I, N>(typ: SqlRelationType, names: I) -> Self
1106    where
1107        I: IntoIterator<Item = N>,
1108        N: Into<ColumnName>,
1109    {
1110        let metadata: BTreeMap<_, _> = names
1111            .into_iter()
1112            .enumerate()
1113            .map(|(idx, name)| {
1114                let col_idx = ColumnIndex(idx);
1115                let metadata = ColumnMetadata {
1116                    name: name.into(),
1117                    typ_idx: idx,
1118                    added: RelationVersion::root(),
1119                    dropped: None,
1120                };
1121                (col_idx, metadata)
1122            })
1123            .collect();
1124
1125        // TODO(parkmycar): Add better validation here.
1126        assert_eq!(typ.column_types.len(), metadata.len());
1127
1128        RelationDesc { typ, metadata }
1129    }
1130
1131    pub fn from_names_and_types<I, T, N>(iter: I) -> Self
1132    where
1133        I: IntoIterator<Item = (N, T)>,
1134        T: Into<SqlColumnType>,
1135        N: Into<ColumnName>,
1136    {
1137        let (names, types): (Vec<_>, Vec<_>) = iter.into_iter().unzip();
1138        let types = types.into_iter().map(Into::into).collect();
1139        let typ = SqlRelationType::new(types);
1140        Self::new(typ, names)
1141    }
1142
1143    /// Concatenates a `RelationDesc` onto the end of this `RelationDesc`.
1144    ///
1145    /// # Panics
1146    ///
1147    /// Panics if either `self` or `other` have columns that were added at a
1148    /// [`RelationVersion`] other than [`RelationVersion::root`] or if any
1149    /// columns were dropped.
1150    ///
1151    /// TODO(parkmycar): Move this method to [`RelationDescBuilder`].
1152    pub fn concat(mut self, other: Self) -> Self {
1153        let self_len = self.typ.column_types.len();
1154
1155        for (typ, (_col_idx, meta)) in other.typ.column_types.into_iter().zip_eq(other.metadata) {
1156            assert_eq!(meta.added, RelationVersion::root());
1157            assert_none!(meta.dropped);
1158
1159            let new_idx = self.typ.columns().len();
1160            let new_meta = ColumnMetadata {
1161                name: meta.name,
1162                typ_idx: new_idx,
1163                added: RelationVersion::root(),
1164                dropped: None,
1165            };
1166
1167            self.typ.column_types.push(typ);
1168            let prev = self.metadata.insert(ColumnIndex(new_idx), new_meta);
1169
1170            assert_eq!(self.metadata.len(), self.typ.columns().len());
1171            assert_none!(prev);
1172        }
1173
1174        for k in other.typ.keys {
1175            let k = k.into_iter().map(|idx| idx + self_len).collect();
1176            self = self.with_key(k);
1177        }
1178        self
1179    }
1180
1181    /// Adds a new key for the relation.
1182    pub fn with_key(mut self, indices: Vec<usize>) -> Self {
1183        self.typ = self.typ.with_key(indices);
1184        self
1185    }
1186
1187    /// Drops all existing keys.
1188    pub fn without_keys(mut self) -> Self {
1189        self.typ.keys.clear();
1190        self
1191    }
1192
1193    /// Builds a new relation description with the column names replaced with
1194    /// new names.
1195    ///
1196    /// # Panics
1197    ///
1198    /// Panics if the arity of the relation type does not match the number of
1199    /// items in `names`.
1200    pub fn with_names<I, N>(self, names: I) -> Self
1201    where
1202        I: IntoIterator<Item = N>,
1203        N: Into<ColumnName>,
1204    {
1205        Self::new(self.typ, names)
1206    }
1207
1208    /// Computes the number of columns in the relation.
1209    pub fn arity(&self) -> usize {
1210        self.typ.arity()
1211    }
1212
1213    /// Returns the relation type underlying this relation description.
1214    pub fn typ(&self) -> &SqlRelationType {
1215        &self.typ
1216    }
1217
1218    /// Returns the owned relation type underlying this relation description.
1219    pub fn into_typ(self) -> SqlRelationType {
1220        self.typ
1221    }
1222
1223    /// Returns an iterator over the columns in this relation.
1224    pub fn iter(&self) -> impl Iterator<Item = (&ColumnName, &SqlColumnType)> {
1225        self.metadata.values().map(|meta| {
1226            let typ = &self.typ.columns()[meta.typ_idx];
1227            (&meta.name, typ)
1228        })
1229    }
1230
1231    /// Returns an iterator over the types of the columns in this relation.
1232    pub fn iter_types(&self) -> impl Iterator<Item = &SqlColumnType> {
1233        self.typ.column_types.iter()
1234    }
1235
1236    /// Returns an iterator over the names of the columns in this relation.
1237    pub fn iter_names(&self) -> impl Iterator<Item = &ColumnName> {
1238        self.metadata.values().map(|meta| &meta.name)
1239    }
1240
1241    /// Returns an iterator over the columns in this relation, with all their metadata.
1242    pub fn iter_all(&self) -> impl Iterator<Item = (&ColumnIndex, &ColumnName, &SqlColumnType)> {
1243        self.metadata.iter().map(|(col_idx, metadata)| {
1244            let col_typ = &self.typ.columns()[metadata.typ_idx];
1245            (col_idx, &metadata.name, col_typ)
1246        })
1247    }
1248
1249    /// Returns an iterator over the names of the columns in this relation that are "similar" to
1250    /// the provided `name`.
1251    pub fn iter_similar_names<'a>(
1252        &'a self,
1253        name: &'a ColumnName,
1254    ) -> impl Iterator<Item = &'a ColumnName> {
1255        self.iter_names().filter(|n| n.is_similar(name))
1256    }
1257
1258    /// Returns whether this [`RelationDesc`] contains a column at the specified index.
1259    pub fn contains_index(&self, idx: &ColumnIndex) -> bool {
1260        self.metadata.contains_key(idx)
1261    }
1262
1263    /// Finds a column by name.
1264    ///
1265    /// Returns the index and type of the column named `name`. If no column with
1266    /// the specified name exists, returns `None`. If multiple columns have the
1267    /// specified name, the leftmost column is returned.
1268    pub fn get_by_name(&self, name: &ColumnName) -> Option<(usize, &SqlColumnType)> {
1269        self.iter_names()
1270            .position(|n| n == name)
1271            .map(|i| (i, &self.typ.column_types[i]))
1272    }
1273
1274    /// Gets the name of the `i`th column.
1275    ///
1276    /// # Panics
1277    ///
1278    /// Panics if `i` is not a valid column index.
1279    ///
1280    /// TODO(parkmycar): Migrate all uses of this to [`RelationDesc::get_name_idx`].
1281    pub fn get_name(&self, i: usize) -> &ColumnName {
1282        // TODO(parkmycar): Refactor this to use `ColumnIndex`.
1283        self.get_name_idx(&ColumnIndex(i))
1284    }
1285
1286    /// Gets the name of the column at `idx`.
1287    ///
1288    /// # Panics
1289    ///
1290    /// Panics if no column exists at `idx`.
1291    pub fn get_name_idx(&self, idx: &ColumnIndex) -> &ColumnName {
1292        &self.metadata.get(idx).expect("should exist").name
1293    }
1294
1295    /// Mutably gets the name of the `i`th column.
1296    ///
1297    /// # Panics
1298    ///
1299    /// Panics if `i` is not a valid column index.
1300    pub fn get_name_mut(&mut self, i: usize) -> &mut ColumnName {
1301        // TODO(parkmycar): Refactor this to use `ColumnIndex`.
1302        &mut self
1303            .metadata
1304            .get_mut(&ColumnIndex(i))
1305            .expect("should exist")
1306            .name
1307    }
1308
1309    /// Gets the [`SqlColumnType`] of the column at `idx`.
1310    ///
1311    /// # Panics
1312    ///
1313    /// Panics if no column exists at `idx`.
1314    pub fn get_type(&self, idx: &ColumnIndex) -> &SqlColumnType {
1315        let typ_idx = self.metadata.get(idx).expect("should exist").typ_idx;
1316        &self.typ.column_types[typ_idx]
1317    }
1318
1319    /// Gets the name of the `i`th column if that column name is unambiguous.
1320    ///
1321    /// If at least one other column has the same name as the `i`th column,
1322    /// returns `None`. If the `i`th column has no name, returns `None`.
1323    ///
1324    /// # Panics
1325    ///
1326    /// Panics if `i` is not a valid column index.
1327    pub fn get_unambiguous_name(&self, i: usize) -> Option<&ColumnName> {
1328        let name = self.get_name(i);
1329        if self.iter_names().filter(|n| *n == name).count() == 1 {
1330            Some(name)
1331        } else {
1332            None
1333        }
1334    }
1335
1336    /// Verifies that `d` meets all of the constraints for the `i`th column of `self`.
1337    ///
1338    /// n.b. The only constraint MZ currently supports in NOT NULL, but this
1339    /// structure will be simple to extend.
1340    pub fn constraints_met(&self, i: usize, d: &Datum) -> Result<(), NotNullViolation> {
1341        let name = self.get_name(i);
1342        let typ = &self.typ.column_types[i];
1343        if d == &Datum::Null && !typ.nullable {
1344            Err(NotNullViolation(name.clone()))
1345        } else {
1346            Ok(())
1347        }
1348    }
1349
1350    /// Computes the differences between two [`RelationDesc`]s.
1351    ///
1352    /// Returns a rich diff describing which columns differ, and in what way.
1353    ///
1354    /// # Panics
1355    ///
1356    /// Panics if either `self` or `other` have columns that were added at a
1357    /// [`RelationVersion`] other than [`RelationVersion::root`] or if any
1358    /// columns were dropped.
1359    ///
1360    /// This simplifies things by allowing us to assume that `ColumnIndex`es are
1361    /// dense and that they match the indexes of `typ.columns()`. Without this
1362    /// we would, e.g., struggle comparing keys as those are in terms of
1363    /// `typ.columns()` indexes.
1364    pub fn diff(&self, other: &RelationDesc) -> RelationDescDiff {
1365        assert_eq!(self.metadata.len(), self.typ.columns().len());
1366        assert_eq!(other.metadata.len(), other.typ.columns().len());
1367        for (idx, meta) in self.metadata.iter().chain(other.metadata.iter()) {
1368            assert_eq!(meta.typ_idx, idx.0);
1369            assert_eq!(meta.added, RelationVersion::root());
1370            assert_none!(meta.dropped);
1371        }
1372
1373        let mut column_diffs = BTreeMap::new();
1374        let mut key_diff = None;
1375
1376        let left_arity = self.arity();
1377        let right_arity = other.arity();
1378        let common_arity = std::cmp::min(left_arity, right_arity);
1379
1380        for idx in 0..common_arity {
1381            let left_name = self.get_name(idx);
1382            let right_name = other.get_name(idx);
1383            let left_type = &self.typ.column_types[idx];
1384            let right_type = &other.typ.column_types[idx];
1385
1386            if left_name != right_name {
1387                let diff = ColumnDiff::NameMismatch {
1388                    left: left_name.clone(),
1389                    right: right_name.clone(),
1390                };
1391                column_diffs.insert(idx, diff);
1392            } else if left_type.scalar_type != right_type.scalar_type {
1393                let diff = ColumnDiff::TypeMismatch {
1394                    name: left_name.clone(),
1395                    left: left_type.scalar_type.clone(),
1396                    right: right_type.scalar_type.clone(),
1397                };
1398                column_diffs.insert(idx, diff);
1399            } else if left_type.nullable != right_type.nullable {
1400                let diff = ColumnDiff::NullabilityMismatch {
1401                    name: left_name.clone(),
1402                    left: left_type.nullable,
1403                    right: right_type.nullable,
1404                };
1405                column_diffs.insert(idx, diff);
1406            }
1407        }
1408
1409        for idx in common_arity..left_arity {
1410            let diff = ColumnDiff::Missing {
1411                name: self.get_name(idx).clone(),
1412            };
1413            column_diffs.insert(idx, diff);
1414        }
1415
1416        for idx in common_arity..right_arity {
1417            let diff = ColumnDiff::Extra {
1418                name: other.get_name(idx).clone(),
1419            };
1420            column_diffs.insert(idx, diff);
1421        }
1422
1423        let left_keys: BTreeSet<_> = self.typ.keys.iter().collect();
1424        let right_keys: BTreeSet<_> = other.typ.keys.iter().collect();
1425        if left_keys != right_keys {
1426            let column_names = |desc: &RelationDesc, keys: BTreeSet<&Vec<usize>>| {
1427                keys.iter()
1428                    .map(|key| key.iter().map(|&idx| desc.get_name(idx).clone()).collect())
1429                    .collect()
1430            };
1431            key_diff = Some(KeyDiff {
1432                left: column_names(self, left_keys),
1433                right: column_names(other, right_keys),
1434            });
1435        }
1436
1437        RelationDescDiff {
1438            column_diffs,
1439            key_diff,
1440        }
1441    }
1442
1443    /// Creates a new [`RelationDesc`] retaining only the columns specified in `demands`.
1444    pub fn apply_demand(&self, demands: &BTreeSet<usize>) -> RelationDesc {
1445        let mut new_desc = self.clone();
1446
1447        // Update ColumnMetadata.
1448        let mut removed = 0;
1449        new_desc.metadata.retain(|idx, metadata| {
1450            let retain = demands.contains(&idx.0);
1451            if !retain {
1452                removed += 1;
1453            } else {
1454                metadata.typ_idx -= removed;
1455            }
1456            retain
1457        });
1458
1459        // Update SqlColumnType.
1460        let mut idx = 0;
1461        new_desc.typ.column_types.retain(|_| {
1462            let keep = demands.contains(&idx);
1463            idx += 1;
1464            keep
1465        });
1466
1467        new_desc
1468    }
1469}
1470
#[cfg(any(test, feature = "proptest"))]
impl Arbitrary for RelationDesc {
    type Parameters = ();
    type Strategy = BoxedStrategy<RelationDesc>;

    /// Generates descriptions whose column count is drawn from weighted
    /// ranges that favor few columns. Larger ranges are only included when
    /// the `PROPTEST_LARGE_DATA` environment variable is set.
    fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy {
        let mut weights = vec![(100, Just(0..4)), (50, Just(4..8)), (25, Just(8..16))];

        let large_data = std::env::var("PROPTEST_LARGE_DATA").is_ok();
        if large_data {
            weights.push((12, Just(16..32)));
            weights.push((6, Just(32..64)));
            weights.push((3, Just(64..128)));
            weights.push((1, Just(128..256)));
        }

        Union::new_weighted(weights)
            .prop_flat_map(arb_relation_desc)
            .boxed()
    }
}
1491
/// Returns a [`Strategy`] that generates an arbitrary [`RelationDesc`] whose number of columns
/// falls within `num_cols`.
///
/// Columns are generated as a map from name to type and passed to
/// [`RelationDesc::from_names_and_types`].
#[cfg(any(test, feature = "proptest"))]
pub fn arb_relation_desc(num_cols: std::ops::Range<usize>) -> impl Strategy<Value = RelationDesc> {
    let columns =
        proptest::collection::btree_map(any::<ColumnName>(), any::<SqlColumnType>(), num_cols);
    columns.prop_map(RelationDesc::from_names_and_types)
}
1499
/// Returns a [`Strategy`] that generates a projection of the provided [`RelationDesc`].
///
/// One boolean is generated per column; the positions that come up `true`
/// form the demand set passed to [`RelationDesc::apply_demand`].
#[cfg(any(test, feature = "proptest"))]
pub fn arb_relation_desc_projection(desc: RelationDesc) -> impl Strategy<Value = RelationDesc> {
    let mask: Vec<_> = std::iter::repeat_with(any::<bool>)
        .take(desc.len())
        .collect();
    mask.prop_map(move |mask| {
        let demands: BTreeSet<usize> = (0..mask.len()).filter(|&idx| mask[idx]).collect();
        desc.apply_demand(&demands)
    })
}
1513
1514impl IntoIterator for RelationDesc {
1515    type Item = (ColumnName, SqlColumnType);
1516    type IntoIter = Box<dyn Iterator<Item = (ColumnName, SqlColumnType)>>;
1517
1518    fn into_iter(self) -> Self::IntoIter {
1519        let iter = self
1520            .metadata
1521            .into_values()
1522            .zip_eq(self.typ.column_types)
1523            .map(|(meta, typ)| (meta.name, typ));
1524        Box::new(iter)
1525    }
1526}
1527
/// Returns a [`Strategy`] that yields arbitrary [`Row`]s for the provided [`RelationDesc`].
///
/// One datum strategy is created per column type via `arb_datum_for_column`,
/// and the generated values are packed into a row in column order.
#[cfg(any(test, feature = "proptest"))]
pub fn arb_row_for_relation(desc: &RelationDesc) -> impl Strategy<Value = Row> + use<> {
    let columns = desc.typ().columns();
    let mut datums = Vec::with_capacity(columns.len());
    for column in columns.iter() {
        datums.push(arb_datum_for_column(column.clone()));
    }
    datums.prop_map(|values| Row::pack(values.iter().map(Datum::from)))
}
1540
1541/// Expression violated not-null constraint on named column
1542#[derive(Debug, PartialEq, Eq)]
1543pub struct NotNullViolation(pub ColumnName);
1544
1545impl fmt::Display for NotNullViolation {
1546    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1547        write!(
1548            f,
1549            "null value in column {} violates not-null constraint",
1550            self.0.quoted()
1551        )
1552    }
1553}
1554
1555/// The result of comparing two [`RelationDesc`]s.
1556#[derive(Debug, Clone, PartialEq, Eq)]
1557pub struct RelationDescDiff {
1558    /// Column differences, keyed by column index.
1559    pub column_diffs: BTreeMap<usize, ColumnDiff>,
1560    /// Key differences, if any.
1561    pub key_diff: Option<KeyDiff>,
1562}
1563
1564impl RelationDescDiff {
1565    /// Returns whether the diff contains any differences.
1566    pub fn is_empty(&self) -> bool {
1567        self.column_diffs.is_empty() && self.key_diff.is_none()
1568    }
1569}
1570
1571/// A difference in a column between two [`RelationDesc`]s.
1572#[derive(Debug, Clone, PartialEq, Eq)]
1573pub enum ColumnDiff {
1574    /// Column exists only in the left relation.
1575    Missing { name: ColumnName },
1576    /// Column exists only in the right relation.
1577    Extra { name: ColumnName },
1578    /// Columns have different types.
1579    TypeMismatch {
1580        name: ColumnName,
1581        left: SqlScalarType,
1582        right: SqlScalarType,
1583    },
1584    /// Columns have different nullability.
1585    NullabilityMismatch {
1586        name: ColumnName,
1587        left: bool,
1588        right: bool,
1589    },
1590    /// Columns have different names.
1591    NameMismatch { left: ColumnName, right: ColumnName },
1592}
1593
1594/// A difference in the keys of two [`RelationDesc`]s.
1595#[derive(Debug, Clone, PartialEq, Eq)]
1596pub struct KeyDiff {
1597    /// Keys of the left relation.
1598    pub left: BTreeSet<Vec<ColumnName>>,
1599    /// Keys of the right relation.
1600    pub right: BTreeSet<Vec<ColumnName>>,
1601}
1602
1603/// A builder for a [`RelationDesc`].
1604#[derive(Clone, Default, Debug, PartialEq, Eq)]
1605pub struct RelationDescBuilder {
1606    /// Columns of the relation.
1607    columns: Vec<(ColumnName, SqlColumnType)>,
1608    /// Sets of indices that are "keys" for the collection.
1609    keys: Vec<Vec<usize>>,
1610}
1611
1612impl RelationDescBuilder {
1613    /// Appends a column with the specified name and type.
1614    pub fn with_column<N: Into<ColumnName>>(
1615        mut self,
1616        name: N,
1617        ty: SqlColumnType,
1618    ) -> RelationDescBuilder {
1619        let name = name.into();
1620        self.columns.push((name, ty));
1621        self
1622    }
1623
1624    /// Appends the provided columns to the builder.
1625    pub fn with_columns<I, T, N>(mut self, iter: I) -> Self
1626    where
1627        I: IntoIterator<Item = (N, T)>,
1628        T: Into<SqlColumnType>,
1629        N: Into<ColumnName>,
1630    {
1631        self.columns
1632            .extend(iter.into_iter().map(|(name, ty)| (name.into(), ty.into())));
1633        self
1634    }
1635
1636    /// Adds a new key for the relation.
1637    pub fn with_key(mut self, mut indices: Vec<usize>) -> RelationDescBuilder {
1638        indices.sort_unstable();
1639        if !self.keys.contains(&indices) {
1640            self.keys.push(indices);
1641        }
1642        self
1643    }
1644
1645    /// Removes all previously inserted keys.
1646    pub fn without_keys(mut self) -> RelationDescBuilder {
1647        self.keys.clear();
1648        assert_eq!(self.keys.len(), 0);
1649        self
1650    }
1651
1652    /// Concatenates a [`RelationDescBuilder`] onto the end of this [`RelationDescBuilder`].
1653    pub fn concat(mut self, other: Self) -> Self {
1654        let self_len = self.columns.len();
1655
1656        self.columns.extend(other.columns);
1657        for k in other.keys {
1658            let k = k.into_iter().map(|idx| idx + self_len).collect();
1659            self = self.with_key(k);
1660        }
1661
1662        self
1663    }
1664
1665    /// Finish the builder, returning a [`RelationDesc`].
1666    pub fn finish(self) -> RelationDesc {
1667        let mut desc = RelationDesc::from_names_and_types(self.columns);
1668        desc.typ = desc.typ.with_keys(self.keys);
1669        desc
1670    }
1671}
1672
1673/// Describes a [`RelationDesc`] at a specific version of a [`VersionedRelationDesc`].
1674#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord, Serialize)]
1675pub enum RelationVersionSelector {
1676    Specific(RelationVersion),
1677    Latest,
1678}
1679
1680impl RelationVersionSelector {
1681    pub fn specific(version: u64) -> Self {
1682        RelationVersionSelector::Specific(RelationVersion(version))
1683    }
1684}
1685
1686/// A wrapper around [`RelationDesc`] that provides an interface for adding
1687/// columns and generating new versions.
1688///
1689/// TODO(parkmycar): Using an immutable data structure for RelationDesc would
1690/// be great.
1691#[derive(Debug, Clone, Serialize)]
1692pub struct VersionedRelationDesc {
1693    inner: RelationDesc,
1694}
1695
1696impl VersionedRelationDesc {
1697    pub fn new(inner: RelationDesc) -> Self {
1698        VersionedRelationDesc { inner }
1699    }
1700
1701    /// Adds a new column to this [`RelationDesc`], creating a new version of the [`RelationDesc`].
1702    ///
1703    /// # Panics
1704    ///
1705    /// * Panics if a column with `name` already exists that hasn't been dropped.
1706    ///
1707    /// Note: For building a [`RelationDesc`] see [`RelationDescBuilder::with_column`].
1708    #[must_use]
1709    pub fn add_column<N, T>(&mut self, name: N, typ: T) -> RelationVersion
1710    where
1711        N: Into<ColumnName>,
1712        T: Into<SqlColumnType>,
1713    {
1714        let latest_version = self.latest_version();
1715        let new_version = latest_version.bump();
1716
1717        let name = name.into();
1718        let existing = self
1719            .inner
1720            .metadata
1721            .iter()
1722            .find(|(_, meta)| meta.name == name && meta.dropped.is_none());
1723        if let Some(existing) = existing {
1724            panic!("column named '{name}' already exists! {existing:?}");
1725        }
1726
1727        let next_idx = self.inner.metadata.len();
1728        let col_meta = ColumnMetadata {
1729            name,
1730            typ_idx: next_idx,
1731            added: new_version,
1732            dropped: None,
1733        };
1734
1735        self.inner.typ.column_types.push(typ.into());
1736        let prev = self.inner.metadata.insert(ColumnIndex(next_idx), col_meta);
1737
1738        assert_none!(prev, "column index overlap!");
1739        self.validate();
1740
1741        new_version
1742    }
1743
1744    /// Drops the column `name` from this [`RelationDesc`]. If there are multiple columns with
1745    /// `name` drops the left-most one that hasn't already been dropped.
1746    ///
1747    /// TODO(parkmycar): Add handling for dropping a column that is currently used as a key.
1748    ///
1749    /// # Panics
1750    ///
1751    /// Panics if a column with `name` does not exist or the dropped column was used as a key.
1752    #[must_use]
1753    pub fn drop_column<N>(&mut self, name: N) -> RelationVersion
1754    where
1755        N: Into<ColumnName>,
1756    {
1757        let name = name.into();
1758        let latest_version = self.latest_version();
1759        let new_version = latest_version.bump();
1760
1761        let col = self
1762            .inner
1763            .metadata
1764            .values_mut()
1765            .find(|meta| meta.name == name && meta.dropped.is_none())
1766            .expect("column to exist");
1767
1768        // Make sure the column hadn't been previously dropped.
1769        assert_none!(col.dropped, "column was already dropped");
1770        col.dropped = Some(new_version);
1771
1772        // Make sure the column isn't being used as a key.
1773        let dropped_key = self
1774            .inner
1775            .typ
1776            .keys
1777            .iter()
1778            .any(|keys| keys.contains(&col.typ_idx));
1779        assert!(!dropped_key, "column being dropped was used as a key");
1780
1781        self.validate();
1782        new_version
1783    }
1784
1785    /// Returns the [`RelationDesc`] at the latest version.
1786    pub fn latest(&self) -> RelationDesc {
1787        self.inner.clone()
1788    }
1789
1790    /// Returns this [`RelationDesc`] at the specified version.
1791    pub fn at_version(&self, version: RelationVersionSelector) -> RelationDesc {
1792        // Get all of the changes from the start, up to whatever version was requested.
1793        let up_to_version = match version {
1794            RelationVersionSelector::Latest => RelationVersion(u64::MAX),
1795            RelationVersionSelector::Specific(v) => v,
1796        };
1797
1798        let valid_columns = self.inner.metadata.iter().filter(|(_col_idx, meta)| {
1799            let added = meta.added <= up_to_version;
1800            let dropped = meta
1801                .dropped
1802                .map(|dropped_at| up_to_version >= dropped_at)
1803                .unwrap_or(false);
1804
1805            added && !dropped
1806        });
1807
1808        let mut column_types = Vec::new();
1809        let mut column_metas = BTreeMap::new();
1810
1811        // N.B. At this point we need to be careful because col_idx might not
1812        // equal typ_idx.
1813        //
1814        // For example, consider columns "a", "b", and "c" with indexes 0, 1,
1815        // and 2. If we drop column "b" then we'll have "a" and "c" with column
1816        // indexes 0 and 2, but their indices in SqlRelationType will be 0 and 1.
1817        for (col_idx, meta) in valid_columns {
1818            let new_meta = ColumnMetadata {
1819                name: meta.name.clone(),
1820                typ_idx: column_types.len(),
1821                added: meta.added.clone(),
1822                dropped: meta.dropped.clone(),
1823            };
1824            column_types.push(self.inner.typ.columns()[meta.typ_idx].clone());
1825            column_metas.insert(*col_idx, new_meta);
1826        }
1827
1828        // Remap keys in case a column with an index less than that of a key was
1829        // dropped.
1830        //
1831        // For example, consider columns "a", "b", and "c" where "a" and "c" are
1832        // keys and "b" was dropped.
1833        let keys = self
1834            .inner
1835            .typ
1836            .keys
1837            .iter()
1838            .map(|keys| {
1839                keys.iter()
1840                    .map(|key_idx| {
1841                        let metadata = column_metas
1842                            .get(&ColumnIndex(*key_idx))
1843                            .expect("found key for column that doesn't exist");
1844                        metadata.typ_idx
1845                    })
1846                    .collect()
1847            })
1848            .collect();
1849
1850        let relation_type = SqlRelationType { column_types, keys };
1851
1852        RelationDesc {
1853            typ: relation_type,
1854            metadata: column_metas,
1855        }
1856    }
1857
1858    pub fn latest_version(&self) -> RelationVersion {
1859        self.inner
1860            .metadata
1861            .values()
1862            // N.B. Dropped is always greater than added.
1863            .map(|meta| meta.dropped.unwrap_or(meta.added))
1864            .max()
1865            // If there aren't any columns we're implicitly the root version.
1866            .unwrap_or_else(RelationVersion::root)
1867    }
1868
1869    /// Validates internal contraints of the [`RelationDesc`] are correct.
1870    ///
1871    /// # Panics
1872    ///
1873    /// Panics if a constraint is not satisfied.
1874    fn validate(&self) {
1875        fn validate_inner(desc: &RelationDesc) -> Result<(), anyhow::Error> {
1876            if desc.typ.column_types.len() != desc.metadata.len() {
1877                anyhow::bail!("mismatch between number of types and metadatas");
1878            }
1879
1880            for (col_idx, meta) in &desc.metadata {
1881                if col_idx.0 > desc.metadata.len() {
1882                    anyhow::bail!("column index out of bounds");
1883                }
1884                if meta.added >= meta.dropped.unwrap_or(RelationVersion(u64::MAX)) {
1885                    anyhow::bail!("column was added after it was dropped?");
1886                }
1887                if desc.typ().columns().get(meta.typ_idx).is_none() {
1888                    anyhow::bail!("typ_idx incorrect");
1889                }
1890            }
1891
1892            for keys in &desc.typ.keys {
1893                for key in keys {
1894                    if *key >= desc.typ.column_types.len() {
1895                        anyhow::bail!("key index was out of bounds!");
1896                    }
1897                }
1898            }
1899
1900            let versions = desc
1901                .metadata
1902                .values()
1903                .map(|meta| meta.dropped.unwrap_or(meta.added));
1904            let mut max = 0;
1905            let mut sum = 0;
1906            for version in versions {
1907                max = std::cmp::max(max, version.0);
1908                sum += version.0;
1909            }
1910
1911            // Other than RelationVersion(0), we should never have duplicate
1912            // versions and they should always increase by 1. In other words, the
1913            // sum of all RelationVersions should be the sum of [0, max].
1914            //
1915            // N.B. n * (n + 1) / 2 = sum of [0, n]
1916            //
1917            // While I normally don't like tricks like this, it allows us to
1918            // validate that our column versions are correct in O(n) time and
1919            // without allocations.
1920            if sum != (max * (max + 1) / 2) {
1921                anyhow::bail!("there is a duplicate or missing relation version");
1922            }
1923
1924            Ok(())
1925        }
1926
1927        assert_ok!(validate_inner(&self.inner), "validate failed! {self:?}");
1928    }
1929}
1930
/// Diffs that can be generated by proptest and applied to a [`RelationDesc`] to
/// exercise schema migrations.
///
/// See [`PropRelationDescDiff::apply`] for how each variant mutates a description.
#[derive(Debug)]
#[cfg(any(test, feature = "proptest"))]
pub enum PropRelationDescDiff {
    /// Append a new column with the given name and type, added at version 0.
    AddColumn {
        name: ColumnName,
        typ: SqlColumnType,
    },
    /// Mark the first column called `name` as dropped at the next version. Does nothing if
    /// there is no such column or if that column was already dropped.
    DropColumn {
        name: ColumnName,
    },
    /// Flip the `nullable` flag of the column that `get_by_name` finds for `name`, if any.
    ToggleNullability {
        name: ColumnName,
    },
    /// Replace the type of the column that `get_by_name` finds for `name` with `typ`, if any.
    ChangeType {
        name: ColumnName,
        typ: SqlColumnType,
    },
}
1951
#[cfg(any(test, feature = "proptest"))]
impl PropRelationDescDiff {
    /// Applies this diff to `desc`, mutating it in place.
    ///
    /// Diffs that refer to a column which can't be found are ignored, and dropping a column
    /// that was already dropped is a no-op.
    ///
    /// # Panics
    ///
    /// `AddColumn` panics if the column index it wants to use is already taken, or if the
    /// number of metadata entries and column types disagree afterwards.
    pub fn apply(self, desc: &mut RelationDesc) {
        match self {
            PropRelationDescDiff::AddColumn { name, typ } => {
                // The new column is placed directly after all existing ones, for both its
                // column index and its index into the column types.
                let idx = desc.metadata.len();
                let prev = desc.metadata.insert(
                    ColumnIndex(idx),
                    ColumnMetadata {
                        name,
                        typ_idx: idx,
                        added: RelationVersion(0),
                        dropped: None,
                    },
                );
                desc.typ.column_types.push(typ);

                assert_none!(prev);
                assert_eq!(desc.metadata.len(), desc.typ.column_types.len());
            }
            PropRelationDescDiff::DropColumn { name } => {
                // The drop happens at the version following the most recent change.
                let latest_change = desc
                    .metadata
                    .values()
                    .map(|meta| meta.dropped.unwrap_or(meta.added))
                    .max();
                let next_version = latest_change
                    .unwrap_or_else(RelationVersion::root)
                    .bump();

                let target = desc.metadata.values_mut().find(|meta| meta.name == name);
                if let Some(meta) = target {
                    if meta.dropped.is_none() {
                        meta.dropped = Some(next_version);
                    }
                }
            }
            PropRelationDescDiff::ToggleNullability { name } => {
                let Some(pos) = desc.get_by_name(&name).map(|(pos, _)| pos) else {
                    return;
                };
                let column = desc
                    .typ
                    .column_types
                    .get_mut(pos)
                    .expect("ColumnNames and SqlColumnTypes out of sync!");
                column.nullable = !column.nullable;
            }
            PropRelationDescDiff::ChangeType { name, typ } => {
                let Some(pos) = desc.get_by_name(&name).map(|(pos, _)| pos) else {
                    return;
                };
                let column = desc
                    .typ
                    .column_types
                    .get_mut(pos)
                    .expect("ColumnNames and SqlColumnTypes out of sync!");
                *column = typ;
            }
        }
    }
}
2011
/// Generates a set of [`PropRelationDescDiff`]s based on some source [`RelationDesc`].
///
/// The result is a shuffled mix of column additions with arbitrary names and types and, if
/// `source` has any columns, drops, nullability toggles, and type changes that each target a
/// random subset of the columns of `source`.
#[cfg(any(test, feature = "proptest"))]
pub fn arb_relation_desc_diff(
    source: &RelationDesc,
) -> impl Strategy<Value = Vec<PropRelationDescDiff>> + use<> {
    /// Draws `count` random column indexes of `source` and returns the names of the chosen
    /// columns. Indexes can repeat, so fewer than `count` names may be returned.
    fn pick_column_names(
        source: &RelationDesc,
        count: usize,
        rng: &mut proptest::test_runner::TestRng,
    ) -> BTreeSet<ColumnName> {
        let num_columns = source.typ.columns().len();
        (0..count)
            .map(|_| source.get_name(rng.random_range(0..num_columns)).clone())
            .collect()
    }

    let source = Rc::new(source.clone());
    let num_source_columns = source.typ.columns().len();

    // Mostly add just a handful of columns, but every now and then add a lot of them.
    let add_count = Union::new_weighted(vec![(100, Just(0..8)), (1, Just(8..64))]);
    let add_columns_strat = add_count
        .prop_flat_map(|count| {
            proptest::collection::vec((any::<ColumnName>(), any::<SqlColumnType>()), count)
        })
        .prop_map(|new_columns| {
            new_columns
                .into_iter()
                .map(|(name, typ)| PropRelationDescDiff::AddColumn { name, typ })
                .collect::<Vec<_>>()
        });

    // If the source RelationDesc is empty there is nothing else to do.
    if num_source_columns == 0 {
        return add_columns_strat.boxed();
    }

    let drop_source = Rc::clone(&source);
    let drop_columns_strat = (0..num_source_columns).prop_perturb(move |count, mut rng| {
        pick_column_names(&drop_source, count, &mut rng)
            .into_iter()
            .map(|name| PropRelationDescDiff::DropColumn { name })
            .collect::<Vec<_>>()
    });

    let toggle_source = Rc::clone(&source);
    let toggle_nullability_strat =
        (0..num_source_columns).prop_perturb(move |count, mut rng| {
            pick_column_names(&toggle_source, count, &mut rng)
                .into_iter()
                .map(|name| PropRelationDescDiff::ToggleNullability { name })
                .collect::<Vec<_>>()
        });

    let change_source = Rc::clone(&source);
    let change_type_strat = (0..num_source_columns)
        .prop_perturb(move |count, mut rng| pick_column_names(&change_source, count, &mut rng))
        .prop_flat_map(|names| {
            // Generate exactly one new type for every column that was picked.
            proptest::collection::vec(any::<SqlColumnType>(), names.len())
                .prop_map(move |types| (names.clone(), types))
        })
        .prop_map(|(names, types)| {
            names
                .into_iter()
                .zip_eq(types)
                .map(|(name, typ)| PropRelationDescDiff::ChangeType { name, typ })
                .collect::<Vec<_>>()
        });

    (
        add_columns_strat,
        drop_columns_strat,
        toggle_nullability_strat,
        change_type_strat,
    )
        .prop_map(|(adds, drops, toggles, changes)| {
            let mut diffs = adds;
            diffs.extend(drops);
            diffs.extend(toggles);
            diffs.extend(changes);
            diffs
        })
        .prop_shuffle()
        .boxed()
}
2098
#[cfg(test)]
mod tests {
    use super::*;
    use prost::Message;

    // Walks a two column description through an `add_column` and a `drop_column` and checks
    // that every version, including the earlier ones, can still be reconstructed.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `pipe2` on OS `linux`
    fn smoktest_at_version() {
        let desc = RelationDesc::builder()
            .with_column("a", SqlScalarType::Bool.nullable(true))
            .with_column("z", SqlScalarType::String.nullable(false))
            .finish();

        let mut versioned_desc = VersionedRelationDesc {
            inner: desc.clone(),
        };
        versioned_desc.validate();

        let latest = versioned_desc.at_version(RelationVersionSelector::Latest);
        assert_eq!(desc, latest);

        let v0 = versioned_desc.at_version(RelationVersionSelector::specific(0));
        assert_eq!(desc, v0);

        // Asking for a version that doesn't exist yet yields the latest state.
        let v3 = versioned_desc.at_version(RelationVersionSelector::specific(3));
        assert_eq!(desc, v3);

        let v1 = versioned_desc.add_column("b", SqlScalarType::Bytes.nullable(false));
        assert_eq!(v1, RelationVersion(1));

        let v1 = versioned_desc.at_version(RelationVersionSelector::Specific(v1));
        insta::assert_json_snapshot!(v1.metadata, @r###"
        {
          "0": {
            "name": "a",
            "typ_idx": 0,
            "added": 0,
            "dropped": null
          },
          "1": {
            "name": "z",
            "typ_idx": 1,
            "added": 0,
            "dropped": null
          },
          "2": {
            "name": "b",
            "typ_idx": 2,
            "added": 1,
            "dropped": null
          }
        }
        "###);

        // Check that V0 doesn't show the new column.
        let v0_b = versioned_desc.at_version(RelationVersionSelector::specific(0));
        assert!(v0.iter().eq(v0_b.iter()));

        let v2 = versioned_desc.drop_column("z");
        assert_eq!(v2, RelationVersion(2));

        // The dropped column "z" is gone and "b" moved up to type index 1, while keeping its
        // column index of 2.
        let v2 = versioned_desc.at_version(RelationVersionSelector::Specific(v2));
        insta::assert_json_snapshot!(v2.metadata, @r###"
        {
          "0": {
            "name": "a",
            "typ_idx": 0,
            "added": 0,
            "dropped": null
          },
          "2": {
            "name": "b",
            "typ_idx": 1,
            "added": 1,
            "dropped": null
          }
        }
        "###);

        // Check that V0 and V1 are still correct.
        let v0_c = versioned_desc.at_version(RelationVersionSelector::specific(0));
        assert!(v0.iter().eq(v0_c.iter()));

        let v1_b = versioned_desc.at_version(RelationVersionSelector::specific(1));
        assert!(v1.iter().eq(v1_b.iter()));

        // The inner description retains all columns, with "z" marked as dropped at version 2.
        insta::assert_json_snapshot!(versioned_desc.inner.metadata, @r###"
        {
          "0": {
            "name": "a",
            "typ_idx": 0,
            "added": 0,
            "dropped": null
          },
          "1": {
            "name": "z",
            "typ_idx": 1,
            "added": 0,
            "dropped": 2
          },
          "2": {
            "name": "b",
            "typ_idx": 2,
            "added": 1,
            "dropped": null
          }
        }
        "###);
    }

    // Drops a column that sits in front of a key column and checks that the key index is
    // remapped for the version after the drop, and untouched for the version before it.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `pipe2` on OS `linux`
    fn test_dropping_columns_with_keys() {
        let desc = RelationDesc::builder()
            .with_column("a", SqlScalarType::Bool.nullable(true))
            .with_column("z", SqlScalarType::String.nullable(false))
            .with_key(vec![1])
            .finish();

        let mut versioned_desc = VersionedRelationDesc {
            inner: desc.clone(),
        };
        versioned_desc.validate();

        let v1 = versioned_desc.drop_column("a");
        assert_eq!(v1, RelationVersion(1));

        // Make sure the key index for 'z' got remapped since 'a' was dropped.
        let v1 = versioned_desc.at_version(RelationVersionSelector::Specific(v1));
        insta::assert_json_snapshot!(v1, @r###"
        {
          "typ": {
            "column_types": [
              {
                "scalar_type": "String",
                "nullable": false
              }
            ],
            "keys": [
              [
                0
              ]
            ]
          },
          "metadata": {
            "1": {
              "name": "z",
              "typ_idx": 0,
              "added": 0,
              "dropped": null
            }
          }
        }
        "###);

        // Make sure the key index of 'z' is correct when all columns are present.
        let v0 = versioned_desc.at_version(RelationVersionSelector::specific(0));
        insta::assert_json_snapshot!(v0, @r###"
        {
          "typ": {
            "column_types": [
              {
                "scalar_type": "Bool",
                "nullable": true
              },
              {
                "scalar_type": "String",
                "nullable": false
              }
            ],
            "keys": [
              [
                1
              ]
            ]
          },
          "metadata": {
            "0": {
              "name": "a",
              "typ_idx": 0,
              "added": 0,
              "dropped": 1
            },
            "1": {
              "name": "z",
              "typ_idx": 1,
              "added": 0,
              "dropped": null
            }
          }
        }
        "###);
    }

    // Decodes a `ProtoRelationDesc` that only carries column names and no metadata, and checks
    // the metadata that the resulting `RelationDesc` ends up with.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `pipe2` on OS `linux`
    fn roundtrip_relation_desc_without_metadata() {
        let typ = ProtoRelationType {
            column_types: vec![
                SqlScalarType::String.nullable(false).into_proto(),
                SqlScalarType::Bool.nullable(true).into_proto(),
            ],
            keys: vec![],
        };
        let proto = ProtoRelationDesc {
            typ: Some(typ),
            names: vec![
                ColumnName("a".into()).into_proto(),
                ColumnName("b".into()).into_proto(),
            ],
            metadata: vec![],
        };
        let desc: RelationDesc = proto.into_rust().unwrap();

        insta::assert_json_snapshot!(desc, @r###"
        {
          "typ": {
            "column_types": [
              {
                "scalar_type": "String",
                "nullable": false
              },
              {
                "scalar_type": "Bool",
                "nullable": true
              }
            ],
            "keys": []
          },
          "metadata": {
            "0": {
              "name": "a",
              "typ_idx": 0,
              "added": 0,
              "dropped": null
            },
            "1": {
              "name": "b",
              "typ_idx": 1,
              "added": 0,
              "dropped": null
            }
          }
        }
        "###);
    }

    // Adding a column whose name is already used by a live column is expected to panic.
    #[mz_ore::test]
    #[should_panic(expected = "column named 'a' already exists!")]
    fn test_add_column_with_same_name_panics() {
        let desc = RelationDesc::builder()
            .with_column("a", SqlScalarType::Bool.nullable(true))
            .finish();
        let mut versioned = VersionedRelationDesc::new(desc);

        let _ = versioned.add_column("a", SqlScalarType::String.nullable(false));
    }

    // Once a column was dropped, its name can be used again for a new column, which gets a
    // new column index.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `pipe2` on OS `linux`
    fn test_add_column_with_same_name_prev_dropped() {
        let desc = RelationDesc::builder()
            .with_column("a", SqlScalarType::Bool.nullable(true))
            .finish();
        let mut versioned = VersionedRelationDesc::new(desc);

        let v1 = versioned.drop_column("a");
        let v1 = versioned.at_version(RelationVersionSelector::Specific(v1));
        insta::assert_json_snapshot!(v1, @r###"
        {
          "typ": {
            "column_types": [],
            "keys": []
          },
          "metadata": {}
        }
        "###);

        let v2 = versioned.add_column("a", SqlScalarType::String.nullable(false));
        let v2 = versioned.at_version(RelationVersionSelector::Specific(v2));
        insta::assert_json_snapshot!(v2, @r###"
        {
          "typ": {
            "column_types": [
              {
                "scalar_type": "String",
                "nullable": false
              }
            ],
            "keys": []
          },
          "metadata": {
            "1": {
              "name": "a",
              "typ_idx": 0,
              "added": 2,
              "dropped": null
            }
          }
        }
        "###);
    }

    // Applies a demand for two of three columns and checks that the result has an arity of 2
    // and still passes validation.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn apply_demand() {
        let desc = RelationDesc::builder()
            .with_column("a", SqlScalarType::String.nullable(true))
            .with_column("b", SqlScalarType::Int64.nullable(false))
            .with_column("c", SqlScalarType::Time.nullable(false))
            .finish();
        let desc = desc.apply_demand(&BTreeSet::from([0, 2]));
        assert_eq!(desc.arity(), 2);
        // TODO(parkmycar): Move validate onto RelationDesc.
        VersionedRelationDesc::new(desc).validate();
    }

    // Pins the stable name that is derived from a `ColumnIndex`.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn smoketest_column_index_stable_ident() {
        let idx_a = ColumnIndex(42);
        // Note(parkmycar): This should never change.
        assert_eq!(idx_a.to_stable_name(), "42");
    }

    // Checks that arbitrary `RelationDesc`s, both freshly generated ones and ones that had a
    // set of arbitrary diffs applied, survive a roundtrip through their protobuf encoding.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // too slow
    fn proptest_relation_desc_roundtrips() {
        // Encodes `og` to protobuf bytes, decodes it again, and expects an equal description.
        fn testcase(og: RelationDesc) {
            let bytes = og.into_proto().encode_to_vec();
            let proto = ProtoRelationDesc::decode(&bytes[..]).unwrap();
            let rnd = RelationDesc::from_proto(proto).unwrap();

            assert_eq!(og, rnd);
        }

        proptest!(|(desc in any::<RelationDesc>())| {
            testcase(desc);
        });

        // Pair every generated description with a set of diffs derived from it.
        let strat = any::<RelationDesc>().prop_flat_map(|desc| {
            arb_relation_desc_diff(&desc).prop_map(move |diffs| (desc.clone(), diffs))
        });

        proptest!(|((mut desc, diffs) in strat)| {
            for diff in diffs {
                diff.apply(&mut desc);
            };
            testcase(desc);
        });
    }
}