Skip to main content

mz_repr/
relation.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::{BTreeMap, BTreeSet};
11#[cfg(any(test, feature = "proptest"))]
12use std::rc::Rc;
13use std::{fmt, vec};
14
15use anyhow::bail;
16use itertools::Itertools;
17use mz_lowertest::MzReflect;
18use mz_ore::cast::CastFrom;
19use mz_ore::soft_panic_or_log;
20use mz_ore::str::StrExt;
21use mz_ore::{assert_none, assert_ok};
22use mz_persist_types::schema::SchemaId;
23use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
24#[cfg(any(test, feature = "proptest"))]
25use proptest::prelude::*;
26#[cfg(any(test, feature = "proptest"))]
27use proptest::strategy::{Strategy, Union};
28#[cfg(any(test, feature = "proptest"))]
29use proptest_derive::Arbitrary;
30use serde::{Deserialize, Serialize};
31
32#[cfg(any(test, feature = "proptest"))]
33use crate::Row;
34#[cfg(any(test, feature = "proptest"))]
35use crate::arb_datum_for_column;
36use crate::relation_and_scalar::proto_relation_type::ProtoKey;
37pub use crate::relation_and_scalar::{
38    ProtoColumnMetadata, ProtoColumnName, ProtoColumnType, ProtoRelationDesc, ProtoRelationType,
39    ProtoRelationVersion,
40};
41use crate::{Datum, ReprScalarType, SqlScalarType};
42
43/// The type of a [`Datum`].
44///
45/// [`SqlColumnType`] bundles information about the scalar type of a datum (e.g.,
46/// Int32 or String) with its nullability.
47///
48/// To construct a column type, either initialize the struct directly, or
49/// use the [`SqlScalarType::nullable`] method.
50#[derive(
51    Clone,
52    Debug,
53    Eq,
54    PartialEq,
55    Ord,
56    PartialOrd,
57    Serialize,
58    Deserialize,
59    Hash,
60    MzReflect
61)]
62#[cfg_attr(any(test, feature = "proptest"), derive(Arbitrary))]
63pub struct SqlColumnType {
64    /// The underlying scalar type (e.g., Int32 or String) of this column.
65    pub scalar_type: SqlScalarType,
66    /// Whether this datum can be null.
67    #[serde(default = "return_true")]
68    pub nullable: bool,
69}
70
/// Serde default provider for the `nullable` field of column types.
///
/// A `bool` field that is missing from the input would otherwise deserialize
/// to `false`; serde only allows a different default to be supplied through a
/// function, so this one always answers `true`. It exists so that column
/// types are nullable by default in unit tests. See
/// <https://github.com/serde-rs/serde/issues/1030>
#[inline(always)]
fn return_true() -> bool {
    true
}
80
81impl SqlColumnType {
82    /// Compute the least upper bound of many column types, returning an error on
83    /// incompatible types or an empty iterator.
84    /// See [`SqlColumnType::try_union`] for details.
85    pub fn try_union_many<'a>(
86        typs: impl IntoIterator<Item = &'a Self>,
87    ) -> Result<Self, anyhow::Error> {
88        let mut iter = typs.into_iter();
89        let Some(typ) = iter.next() else {
90            bail!("Cannot union empty iterator");
91        };
92        iter.try_fold(typ.clone(), |a, b| a.try_union(b))
93    }
94
95    /// Compute the least upper bound of many column types.
96    /// See [`SqlColumnType::try_union`] for details.
97    ///
98    /// Panics on incompatible types or an empty iterator.
99    pub fn union_many<'a>(typs: impl IntoIterator<Item = &'a Self>) -> Self {
100        Self::try_union_many(typs).expect("Cannot union empty iterator")
101    }
102
103    /// Backports nullability information from `backport_typ` into `self`,
104    /// affecting the outer `.nullable` field but also record fields deeper
105    /// into the type.
106    pub fn backport_nullability(&mut self, backport_typ: &ReprColumnType) {
107        self.scalar_type
108            .backport_nullability(&backport_typ.scalar_type);
109        self.nullable = backport_typ.nullable;
110    }
111
112    /// Compute the least upper bound of two column types at the SQL level.
113    ///
114    /// Two types are compatible when they are equal, share the same base type
115    /// (differing only in modifiers), or are records with pairwise-compatible
116    /// fields.
117    /// The resulting nullability is the disjunction of the two input
118    /// nullabilities.
119    ///
120    /// Returns an error for incompatible types, e.g. `Text` and `Int32`, or
121    /// `Text` and `VarChar` (different base types at the SQL level).
122    /// See [`SqlColumnType::try_union`] for a fallback that handles the latter
123    /// case via repr-level union.
124    pub fn sql_union(&self, other: &Self) -> Result<Self, anyhow::Error> {
125        match (&self.scalar_type, &other.scalar_type) {
126            (scalar_type, other_scalar_type) if scalar_type == other_scalar_type => {
127                Ok(SqlColumnType {
128                    scalar_type: scalar_type.clone(),
129                    nullable: self.nullable || other.nullable,
130                })
131            }
132            (scalar_type, other_scalar_type) if scalar_type.base_eq(other_scalar_type) => {
133                Ok(SqlColumnType {
134                    scalar_type: scalar_type.without_modifiers(),
135                    nullable: self.nullable || other.nullable,
136                })
137            }
138            (
139                SqlScalarType::Record { fields, custom_id },
140                SqlScalarType::Record {
141                    fields: other_fields,
142                    custom_id: other_custom_id,
143                },
144            ) => {
145                if custom_id != other_custom_id {
146                    bail!(
147                        "Can't union types: {:?} and {:?}",
148                        self.scalar_type,
149                        other.scalar_type
150                    );
151                };
152
153                if fields.len() != other_fields.len() {
154                    bail!(
155                        "Can't union types: {:?} and {:?}",
156                        self.scalar_type,
157                        other.scalar_type
158                    );
159                }
160                let mut union_fields = Vec::with_capacity(fields.len());
161                for ((name, typ), (other_name, other_typ)) in
162                    fields.iter().zip_eq(other_fields.iter())
163                {
164                    if name != other_name {
165                        bail!(
166                            "Can't union types: {:?} and {:?}",
167                            self.scalar_type,
168                            other.scalar_type
169                        );
170                    } else {
171                        let union_column_type = typ.sql_union(other_typ)?;
172                        union_fields.push((name.clone(), union_column_type));
173                    };
174                }
175
176                Ok(SqlColumnType {
177                    scalar_type: SqlScalarType::Record {
178                        fields: union_fields.into(),
179                        custom_id: *custom_id,
180                    },
181                    nullable: self.nullable || other.nullable,
182                })
183            }
184            _ => bail!(
185                "Can't union types: {:?} and {:?}",
186                self.scalar_type,
187                other.scalar_type
188            ),
189        }
190    }
191
192    /// Compute the least upper bound of two column types.
193    ///
194    /// Attempts [`SqlColumnType::sql_union`] first, which preserves SQL-level type
195    /// information (e.g. modifiers). Falls back to a repr-level union via
196    /// [`ReprColumnType::union`] when the SQL types are incompatible but the
197    /// underlying repr types are compatible.
198    ///
199    /// The resulting nullability is the disjunction of the two input
200    /// nullabilities.
201    pub fn try_union(&self, other: &Self) -> Result<Self, anyhow::Error> {
202        self.sql_union(other).or_else(|e| {
203            let repr_self = ReprColumnType::from(self);
204            let repr_other = ReprColumnType::from(other);
205            match repr_self.union(&repr_other) {
206                Ok(typ) => {
207                    // sql_union failed but repr union succeeded — this indicates
208                    // a repr-type canonicalization gap that we want CI visibility for.
209                    soft_panic_or_log!("repr type error: sql_union({self:?}, {other:?}): {e}");
210                    Ok(SqlColumnType::from_repr(&typ))
211                }
212                Err(_) => {
213                    // Both sql_union and repr union failed — genuine type mismatch,
214                    // not a canonicalization issue. Just propagate the original error.
215                    Err(e)
216                }
217            }
218        })
219    }
220
221    /// Compute the least upper bound of two column types.
222    /// See [`SqlColumnType::try_union`] for details.
223    ///
224    /// Panics on incompatible types.
225    pub fn union(&self, other: &Self) -> Self {
226        self.try_union(other).unwrap_or_else(|e| {
227            panic!("repr type error: after sql_union({self:?}, {other:?}) error: {e}")
228        })
229    }
230
231    /// Consumes this `SqlColumnType` and returns a new `SqlColumnType` with its
232    /// nullability set to the specified boolean.
233    pub fn nullable(mut self, nullable: bool) -> Self {
234        self.nullable = nullable;
235        self
236    }
237}
238
239impl RustType<ProtoColumnType> for SqlColumnType {
240    fn into_proto(&self) -> ProtoColumnType {
241        ProtoColumnType {
242            nullable: self.nullable,
243            scalar_type: Some(self.scalar_type.into_proto()),
244        }
245    }
246
247    fn from_proto(proto: ProtoColumnType) -> Result<Self, TryFromProtoError> {
248        Ok(SqlColumnType {
249            nullable: proto.nullable,
250            scalar_type: proto
251                .scalar_type
252                .into_rust_if_some("ProtoColumnType::scalar_type")?,
253        })
254    }
255}
256
257impl fmt::Display for SqlColumnType {
258    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
259        let nullable = if self.nullable { "Null" } else { "NotNull" };
260        f.write_fmt(format_args!("{:?}:{}", self.scalar_type, nullable))
261    }
262}
263
264/// The type of a relation.
265#[derive(
266    Clone,
267    Debug,
268    Eq,
269    PartialEq,
270    Ord,
271    PartialOrd,
272    Serialize,
273    Deserialize,
274    Hash,
275    MzReflect
276)]
277#[cfg_attr(any(test, feature = "proptest"), derive(Arbitrary))]
278pub struct SqlRelationType {
279    /// The type for each column, in order.
280    pub column_types: Vec<SqlColumnType>,
281    /// Sets of indices that are "keys" for the collection.
282    ///
283    /// Each element in this list is a set of column indices, each with the
284    /// property that the collection contains at most one record with each
285    /// distinct set of values for each column. Alternately, for a specific set
286    /// of values assigned to the these columns there is at most one record.
287    ///
288    /// A collection can contain multiple sets of keys, although it is common to
289    /// have either zero or one sets of key indices.
290    #[serde(default)]
291    pub keys: Vec<Vec<usize>>,
292}
293
294impl SqlRelationType {
295    /// Constructs a `SqlRelationType` representing the relation with no columns and
296    /// no keys.
297    pub fn empty() -> Self {
298        SqlRelationType::new(vec![])
299    }
300
301    /// Constructs a new `SqlRelationType` from specified column types.
302    ///
303    /// The `SqlRelationType` will have no keys.
304    pub fn new(column_types: Vec<SqlColumnType>) -> Self {
305        SqlRelationType {
306            column_types,
307            keys: Vec::new(),
308        }
309    }
310
311    /// Adds a new key for the relation.
312    pub fn with_key(mut self, mut indices: Vec<usize>) -> Self {
313        indices.sort_unstable();
314        if !self.keys.contains(&indices) {
315            self.keys.push(indices);
316        }
317        self
318    }
319
320    pub fn with_keys(mut self, keys: Vec<Vec<usize>>) -> Self {
321        for key in keys {
322            self = self.with_key(key)
323        }
324        self
325    }
326
327    /// Computes the number of columns in the relation.
328    pub fn arity(&self) -> usize {
329        self.column_types.len()
330    }
331
332    /// Gets the index of the columns used when creating a default index.
333    pub fn default_key(&self) -> Vec<usize> {
334        if let Some(key) = self.keys.first() {
335            if key.is_empty() {
336                (0..self.column_types.len()).collect()
337            } else {
338                key.clone()
339            }
340        } else {
341            (0..self.column_types.len()).collect()
342        }
343    }
344
345    /// Returns all the [`SqlColumnType`]s, in order, for this relation.
346    pub fn columns(&self) -> &[SqlColumnType] {
347        &self.column_types
348    }
349
350    /// Adopts the nullability and keys from another `SqlRelationType`.
351    ///
352    /// Panics if the number of columns does not match.
353    pub fn backport_nullability_and_keys(&mut self, backport_typ: &ReprRelationType) {
354        assert_eq!(
355            backport_typ.column_types.len(),
356            self.column_types.len(),
357            "HIR and MIR types should have the same number of columns"
358        );
359        for (backport_col, sql_col) in backport_typ
360            .column_types
361            .iter()
362            .zip_eq(self.column_types.iter_mut())
363        {
364            sql_col.backport_nullability(backport_col);
365        }
366
367        self.keys = backport_typ.keys.clone();
368    }
369
370    /// Constructs a `SqlRelationType` from a `ReprRelationType` by converting
371    /// each column type via [`SqlColumnType::from_repr`]. This is a lossy
372    /// inverse of `ReprRelationType::from(&SqlRelationType)`.
373    pub fn from_repr(repr: &ReprRelationType) -> Self {
374        SqlRelationType {
375            column_types: repr
376                .column_types
377                .iter()
378                .map(SqlColumnType::from_repr)
379                .collect(),
380            keys: repr.keys.clone(),
381        }
382    }
383}
384
385impl RustType<ProtoRelationType> for SqlRelationType {
386    fn into_proto(&self) -> ProtoRelationType {
387        ProtoRelationType {
388            column_types: self.column_types.into_proto(),
389            keys: self.keys.into_proto(),
390        }
391    }
392
393    fn from_proto(proto: ProtoRelationType) -> Result<Self, TryFromProtoError> {
394        Ok(SqlRelationType {
395            column_types: proto.column_types.into_rust()?,
396            keys: proto.keys.into_rust()?,
397        })
398    }
399}
400
401impl RustType<ProtoKey> for Vec<usize> {
402    fn into_proto(&self) -> ProtoKey {
403        ProtoKey {
404            keys: self.into_proto(),
405        }
406    }
407
408    fn from_proto(proto: ProtoKey) -> Result<Self, TryFromProtoError> {
409        proto.keys.into_rust()
410    }
411}
412
413/// The type of a relation.
414#[derive(
415    Clone,
416    Debug,
417    Eq,
418    PartialEq,
419    Ord,
420    PartialOrd,
421    Serialize,
422    Deserialize,
423    Hash,
424    MzReflect
425)]
426pub struct ReprRelationType {
427    /// The type for each column, in order.
428    pub column_types: Vec<ReprColumnType>,
429    /// Sets of indices that are "keys" for the collection.
430    ///
431    /// Each element in this list is a set of column indices, each with the
432    /// property that the collection contains at most one record with each
433    /// distinct set of values for each column. Alternately, for a specific set
434    /// of values assigned to the these columns there is at most one record.
435    ///
436    /// A collection can contain multiple sets of keys, although it is common to
437    /// have either zero or one sets of key indices.
438    #[serde(default)]
439    pub keys: Vec<Vec<usize>>,
440}
441
442impl ReprRelationType {
443    /// Constructs a `ReprRelationType` representing the relation with no columns and
444    /// no keys.
445    pub fn empty() -> Self {
446        ReprRelationType::new(vec![])
447    }
448
449    /// Constructs a new `ReprRelationType` from specified column types.
450    ///
451    /// The `ReprRelationType` will have no keys.
452    pub fn new(column_types: Vec<ReprColumnType>) -> Self {
453        ReprRelationType {
454            column_types,
455            keys: Vec::new(),
456        }
457    }
458
459    /// Adds a new key for the relation.
460    pub fn with_key(mut self, mut indices: Vec<usize>) -> Self {
461        indices.sort_unstable();
462        if !self.keys.contains(&indices) {
463            self.keys.push(indices);
464        }
465        self
466    }
467
468    pub fn with_keys(mut self, keys: Vec<Vec<usize>>) -> Self {
469        for key in keys {
470            self = self.with_key(key)
471        }
472        self
473    }
474
475    /// Computes the number of columns in the relation.
476    pub fn arity(&self) -> usize {
477        self.column_types.len()
478    }
479
480    /// Gets the index of the columns used when creating a default index.
481    pub fn default_key(&self) -> Vec<usize> {
482        if let Some(key) = self.keys.first() {
483            if key.is_empty() {
484                (0..self.column_types.len()).collect()
485            } else {
486                key.clone()
487            }
488        } else {
489            (0..self.column_types.len()).collect()
490        }
491    }
492
493    /// Returns all the column types in order, for this relation.
494    pub fn columns(&self) -> &[ReprColumnType] {
495        &self.column_types
496    }
497}
498
499impl From<&SqlRelationType> for ReprRelationType {
500    fn from(sql_relation_type: &SqlRelationType) -> Self {
501        ReprRelationType {
502            column_types: sql_relation_type
503                .column_types
504                .iter()
505                .map(ReprColumnType::from)
506                .collect(),
507            keys: sql_relation_type.keys.clone(),
508        }
509    }
510}
511
512#[derive(
513    Clone,
514    Debug,
515    Eq,
516    PartialEq,
517    Ord,
518    PartialOrd,
519    Serialize,
520    Deserialize,
521    Hash,
522    MzReflect
523)]
524pub struct ReprColumnType {
525    /// The underlying representation scalar type (e.g., Int32 or String) of this column.
526    pub scalar_type: ReprScalarType,
527    /// Whether this datum can be null.
528    #[serde(default = "return_true")]
529    pub nullable: bool,
530}
531
532impl std::fmt::Display for ReprColumnType {
533    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
534        write!(f, "{}", self.scalar_type)?;
535        if self.nullable {
536            write!(f, "?")?;
537        }
538        Ok(())
539    }
540}
541
542impl ReprColumnType {
543    /// Compute the least upper bound of two column types at the repr level.
544    ///
545    /// More permissive than [`SqlColumnType::sql_union`] because it operates
546    /// on the underlying representation types, ignoring SQL-level distinctions
547    /// such as modifiers.
548    /// The resulting nullability is the disjunction of the two inputs.
549    pub fn union(&self, col: &ReprColumnType) -> Result<Self, anyhow::Error> {
550        let scalar_type = self.scalar_type.union(&col.scalar_type)?;
551        let nullable = self.nullable || col.nullable;
552
553        Ok(ReprColumnType {
554            scalar_type,
555            nullable,
556        })
557    }
558}
559
560impl From<&SqlColumnType> for ReprColumnType {
561    fn from(sql_column_type: &SqlColumnType) -> Self {
562        let scalar_type = &sql_column_type.scalar_type;
563        let scalar_type = scalar_type.into();
564        let nullable = sql_column_type.nullable;
565
566        ReprColumnType {
567            scalar_type,
568            nullable,
569        }
570    }
571}
572
573impl SqlColumnType {
574    /// Lossily translates a [`ReprColumnType`] back to a [`SqlColumnType`].
575    ///
576    /// See [`SqlScalarType::from_repr`] for an example of lossiness.
577    pub fn from_repr(repr: &ReprColumnType) -> Self {
578        let scalar_type = &repr.scalar_type;
579        let scalar_type = SqlScalarType::from_repr(scalar_type);
580        let nullable = repr.nullable;
581
582        SqlColumnType {
583            scalar_type,
584            nullable,
585        }
586    }
587}
588
589/// The name of a column in a [`RelationDesc`].
590#[derive(
591    Clone,
592    Debug,
593    Eq,
594    PartialEq,
595    Ord,
596    PartialOrd,
597    Serialize,
598    Deserialize,
599    Hash,
600    MzReflect
601)]
602pub struct ColumnName(Box<str>);
603
604impl ColumnName {
605    /// Returns this column name as a `str`.
606    #[inline(always)]
607    pub fn as_str(&self) -> &str {
608        &*self
609    }
610
611    /// Returns this column name as a `&mut Box<str>`.
612    pub fn as_mut_boxed_str(&mut self) -> &mut Box<str> {
613        &mut self.0
614    }
615
616    /// Returns if this [`ColumnName`] is similar to the provided one.
617    pub fn is_similar(&self, other: &ColumnName) -> bool {
618        const SIMILARITY_THRESHOLD: f64 = 0.6;
619
620        let a_lowercase = self.to_lowercase();
621        let b_lowercase = other.to_lowercase();
622
623        strsim::normalized_levenshtein(&a_lowercase, &b_lowercase) >= SIMILARITY_THRESHOLD
624    }
625}
626
627impl std::ops::Deref for ColumnName {
628    type Target = str;
629
630    #[inline(always)]
631    fn deref(&self) -> &Self::Target {
632        &self.0
633    }
634}
635
636impl fmt::Display for ColumnName {
637    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
638        f.write_str(&self.0)
639    }
640}
641
642impl From<String> for ColumnName {
643    fn from(s: String) -> ColumnName {
644        ColumnName(s.into())
645    }
646}
647
648impl From<&str> for ColumnName {
649    fn from(s: &str) -> ColumnName {
650        ColumnName(s.into())
651    }
652}
653
654impl From<&ColumnName> for ColumnName {
655    fn from(n: &ColumnName) -> ColumnName {
656        n.clone()
657    }
658}
659
660impl RustType<ProtoColumnName> for ColumnName {
661    fn into_proto(&self) -> ProtoColumnName {
662        ProtoColumnName {
663            value: Some(self.0.to_string()),
664        }
665    }
666
667    fn from_proto(proto: ProtoColumnName) -> Result<Self, TryFromProtoError> {
668        Ok(ColumnName(
669            proto
670                .value
671                .ok_or_else(|| TryFromProtoError::missing_field("ProtoColumnName::value"))?
672                .into(),
673        ))
674    }
675}
676
677impl From<ColumnName> for mz_sql_parser::ast::Ident {
678    fn from(value: ColumnName) -> Self {
679        // Note: ColumnNames are known to be less than the max length of an Ident (I think?).
680        mz_sql_parser::ast::Ident::new_unchecked(value.0)
681    }
682}
683
#[cfg(any(test, feature = "proptest"))]
impl proptest::arbitrary::Arbitrary for ColumnName {
    type Parameters = ();
    type Strategy = BoxedStrategy<ColumnName>;

    /// Produces a strategy that generates column names of weighted random
    /// length, drawn mostly from the character range `'A'..='z'`.
    ///
    /// Longer names are only generated when the `PROPTEST_LARGE_DATA`
    /// environment variable is set. Generated values are not shrunk.
    fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy {
        // Long column names are generally uninteresting, and can greatly
        // increase the runtime for a test case, so bound the max length.
        let mut length_weights = vec![(50, Just(1..8)), (20, Just(8..16))];
        let allow_large = std::env::var("PROPTEST_LARGE_DATA").is_ok();
        if allow_large {
            length_weights.extend([
                (5, Just(16..128)),
                (1, Just(128..1024)),
                (1, Just(1024..4096)),
            ]);
        }
        let length_strat = Union::new_weighted(length_weights);

        // Non-ASCII characters are also generally uninteresting and can make
        // debugging harder, so arbitrary chars are given a low weight.
        let char_choices = vec![
            (50, proptest::char::range('A', 'z').boxed()),
            (1, any::<char>().boxed()),
        ];
        let char_strat = Rc::new(Union::new_weighted(char_choices));

        length_strat
            .prop_flat_map(move |len| proptest::collection::vec(Rc::clone(&char_strat), len))
            .prop_map(|chars| {
                let name: Box<str> = chars.into_iter().collect();
                ColumnName(name)
            })
            .no_shrink()
            .boxed()
    }
}
716
/// Name given to a column when nothing else is known about it.
pub const UNKNOWN_COLUMN_NAME: &str = "?column?";
719
720/// Stable index of a column in a [`RelationDesc`].
721#[derive(
722    Clone,
723    Copy,
724    Debug,
725    Eq,
726    PartialEq,
727    PartialOrd,
728    Ord,
729    Serialize,
730    Deserialize,
731    Hash,
732    MzReflect
733)]
734pub struct ColumnIndex(usize);
735
736#[cfg(any(test, feature = "proptest"))]
737static_assertions::assert_not_impl_all!(ColumnIndex: Arbitrary);
738
739impl ColumnIndex {
740    /// Returns a stable identifier for this [`ColumnIndex`].
741    pub fn to_stable_name(&self) -> String {
742        self.0.to_string()
743    }
744
745    pub fn to_raw(&self) -> usize {
746        self.0
747    }
748
749    pub fn from_raw(val: usize) -> Self {
750        ColumnIndex(val)
751    }
752}
753
754/// The version a given column was added at.
755#[derive(
756    Clone,
757    Copy,
758    Debug,
759    Eq,
760    PartialEq,
761    PartialOrd,
762    Ord,
763    Serialize,
764    Deserialize,
765    Hash,
766    MzReflect
767)]
768#[cfg_attr(any(test, feature = "proptest"), derive(Arbitrary))]
769pub struct RelationVersion(u64);
770
771impl RelationVersion {
772    /// Returns the "root" or "initial" version of a [`RelationDesc`].
773    pub fn root() -> Self {
774        RelationVersion(0)
775    }
776
777    /// Returns an instance of [`RelationVersion`] which is "one" higher than `self`.
778    pub fn bump(&self) -> Self {
779        let next_version = self
780            .0
781            .checked_add(1)
782            .expect("added more than u64::MAX columns?");
783        RelationVersion(next_version)
784    }
785
786    /// Consume a [`RelationVersion`] returning the raw value.
787    ///
788    /// Should __only__ be used for serialization.
789    pub fn into_raw(self) -> u64 {
790        self.0
791    }
792
793    /// Create a [`RelationVersion`] from a raw value.
794    ///
795    /// Should __only__ be used for serialization.
796    pub fn from_raw(val: u64) -> RelationVersion {
797        RelationVersion(val)
798    }
799}
800
801impl From<RelationVersion> for SchemaId {
802    fn from(value: RelationVersion) -> Self {
803        SchemaId(usize::cast_from(value.0))
804    }
805}
806
807impl From<mz_sql_parser::ast::Version> for RelationVersion {
808    fn from(value: mz_sql_parser::ast::Version) -> Self {
809        RelationVersion(value.into_inner())
810    }
811}
812
813impl fmt::Display for RelationVersion {
814    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
815        write!(f, "v{}", self.0)
816    }
817}
818
819impl From<RelationVersion> for mz_sql_parser::ast::Version {
820    fn from(value: RelationVersion) -> Self {
821        mz_sql_parser::ast::Version::new(value.0)
822    }
823}
824
825impl RustType<ProtoRelationVersion> for RelationVersion {
826    fn into_proto(&self) -> ProtoRelationVersion {
827        ProtoRelationVersion { value: self.0 }
828    }
829
830    fn from_proto(proto: ProtoRelationVersion) -> Result<Self, TryFromProtoError> {
831        Ok(RelationVersion(proto.value))
832    }
833}
834
835/// Semantic type annotation for a column in a builtin catalog relation.
836///
837/// These are compile-time metadata used by the catalog ontology layer to
838/// describe the meaning of a column (e.g., that it contains a catalog item ID
839/// or a role ID). Possible values correspond to the entries in
840/// `SEMANTIC_TYPE_DEFS` in the `mz-catalog` crate.
841#[derive(
842    Clone,
843    Copy,
844    Debug,
845    PartialEq,
846    Eq,
847    PartialOrd,
848    Ord,
849    Hash,
850    serde::Serialize
851)]
852pub enum SemanticType {
853    CatalogItemId,
854    GlobalId,
855    ClusterId,
856    ReplicaId,
857    SchemaId,
858    DatabaseId,
859    RoleId,
860    NetworkPolicyId,
861    ShardId,
862    OID,
863    ObjectType,
864    ConnectionType,
865    SourceType,
866    MzTimestamp,
867    WallclockTimestamp,
868    ByteCount,
869    RecordCount,
870    CreditRate,
871    SqlDefinition,
872    RedactedSqlDefinition,
873}
874
875impl fmt::Display for SemanticType {
876    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
877        let s = match self {
878            SemanticType::CatalogItemId => "CatalogItemId",
879            SemanticType::GlobalId => "GlobalId",
880            SemanticType::ClusterId => "ClusterId",
881            SemanticType::ReplicaId => "ReplicaId",
882            SemanticType::SchemaId => "SchemaId",
883            SemanticType::DatabaseId => "DatabaseId",
884            SemanticType::RoleId => "RoleId",
885            SemanticType::NetworkPolicyId => "NetworkPolicyId",
886            SemanticType::ShardId => "ShardId",
887            SemanticType::OID => "OID",
888            SemanticType::ObjectType => "ObjectType",
889            SemanticType::ConnectionType => "ConnectionType",
890            SemanticType::SourceType => "SourceType",
891            SemanticType::MzTimestamp => "MzTimestamp",
892            SemanticType::WallclockTimestamp => "WallclockTimestamp",
893            SemanticType::ByteCount => "ByteCount",
894            SemanticType::RecordCount => "RecordCount",
895            SemanticType::CreditRate => "CreditRate",
896            SemanticType::SqlDefinition => "SqlDefinition",
897            SemanticType::RedactedSqlDefinition => "RedactedSqlDefinition",
898        };
899        f.write_str(s)
900    }
901}
902
903/// Metadata (other than type) for a column in a [`RelationDesc`].
904#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash, MzReflect)]
905struct ColumnMetadata {
906    /// Name of the column.
907    name: ColumnName,
908    /// Index into a [`SqlRelationType`] for this column.
909    typ_idx: usize,
910    /// Version this column was added at.
911    added: RelationVersion,
912    /// Version this column was dropped at.
913    dropped: Option<RelationVersion>,
914}
915
/// A description of the shape of a relation.
///
/// It bundles a [`SqlRelationType`] with `ColumnMetadata` for each column in
/// the relation.
///
/// # Examples
///
/// A `RelationDesc` is typically constructed via its builder API:
///
/// ```
/// use mz_repr::{SqlColumnType, RelationDesc, SqlScalarType};
///
/// let desc = RelationDesc::builder()
///     .with_column("id", SqlScalarType::Int64.nullable(false))
///     .with_column("price", SqlScalarType::Float64.nullable(true))
///     .finish();
/// ```
///
/// In more complicated cases, like when constructing a `RelationDesc` in
/// response to user input, it may be more convenient to construct a relation
/// type first, and imbue it with column names to form a `RelationDesc` later:
///
/// ```
/// use mz_repr::RelationDesc;
///
/// # fn plan_query(_: &str) -> mz_repr::SqlRelationType { mz_repr::SqlRelationType::new(vec![]) }
/// let relation_type = plan_query("SELECT * FROM table");
/// let names = (0..relation_type.arity()).map(|i| match i {
///     0 => "first",
///     1 => "second",
///     _ => "unknown",
/// });
/// let desc = RelationDesc::new(relation_type, names);
/// ```
///
/// Next to the [`SqlRelationType`] we maintain a map of `ColumnIndex` to
/// `ColumnMetadata`, where [`ColumnIndex`] is a stable identifier for a
/// column throughout the lifetime of the relation. This allows a
/// [`RelationDesc`] to represent a projection over a version of itself.
///
/// ```
/// use std::collections::BTreeSet;
/// use mz_repr::{ColumnIndex, RelationDesc, SqlScalarType};
///
/// let desc = RelationDesc::builder()
///     .with_column("name", SqlScalarType::String.nullable(false))
///     .with_column("email", SqlScalarType::String.nullable(false))
///     .finish();
///
/// // Demand only the second column (index 1), projecting away the first.
/// let demands = BTreeSet::from([1]);
/// let proj = desc.apply_demand(&demands);
///
/// // We projected away the first column.
/// assert!(!proj.contains_index(&ColumnIndex::from_raw(0)));
/// // But retained the second, which keeps its original `ColumnIndex`.
/// assert!(proj.contains_index(&ColumnIndex::from_raw(1)));
///
/// // The underlying `SqlRelationType` now contains just a single column.
/// assert_eq!(proj.typ().arity(), 1);
/// ```
///
/// To maintain this stable mapping and track the lifetime of a column (e.g.
/// when adding or dropping a column) we use `ColumnMetadata`. It maintains
/// the index in [`SqlRelationType`] that corresponds to a given column, and the
/// version at which this column was added or dropped.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, MzReflect)]
pub struct RelationDesc {
    /// Column types and keys of the relation.
    typ: SqlRelationType,
    /// Per-column metadata, keyed by the column's stable [`ColumnIndex`].
    metadata: BTreeMap<ColumnIndex, ColumnMetadata>,
}
988
989impl RustType<ProtoRelationDesc> for RelationDesc {
990    fn into_proto(&self) -> ProtoRelationDesc {
991        let (names, metadata): (Vec<_>, Vec<_>) = self
992            .metadata
993            .values()
994            .map(|meta| {
995                let metadata = ProtoColumnMetadata {
996                    added: Some(meta.added.into_proto()),
997                    dropped: meta.dropped.map(|v| v.into_proto()),
998                };
999                (meta.name.into_proto(), metadata)
1000            })
1001            .unzip();
1002
1003        // `metadata` Migration Logic: We wrote some `ProtoRelationDesc`s into Persist before the
1004        // metadata field was added. To make sure our serialization roundtrips the same as before
1005        // we added the field, we omit `metadata` if all of the values are equal to the default.
1006        //
1007        // Note: This logic needs to exist approximately forever.
1008        let is_all_default_metadata = metadata.iter().all(|meta| {
1009            meta.added == Some(RelationVersion::root().into_proto()) && meta.dropped == None
1010        });
1011        let metadata = if is_all_default_metadata {
1012            Vec::new()
1013        } else {
1014            metadata
1015        };
1016
1017        ProtoRelationDesc {
1018            typ: Some(self.typ.into_proto()),
1019            names,
1020            metadata,
1021        }
1022    }
1023
1024    fn from_proto(proto: ProtoRelationDesc) -> Result<Self, TryFromProtoError> {
1025        // `metadata` Migration Logic: We wrote some `ProtoRelationDesc`s into Persist before the
1026        // metadata field was added. If the field doesn't exist we fill it in with default values,
1027        // and when converting into_proto we omit these fields so the serialized bytes roundtrip.
1028        //
1029        // Note: This logic needs to exist approximately forever.
1030        let proto_metadata: Box<dyn Iterator<Item = _>> = if proto.metadata.is_empty() {
1031            let val = ProtoColumnMetadata {
1032                added: Some(RelationVersion::root().into_proto()),
1033                dropped: None,
1034            };
1035            Box::new(itertools::repeat_n(val, proto.names.len()))
1036        } else {
1037            Box::new(proto.metadata.into_iter())
1038        };
1039
1040        let metadata = proto
1041            .names
1042            .into_iter()
1043            .zip_eq(proto_metadata)
1044            .enumerate()
1045            .map(|(idx, (name, metadata))| {
1046                let meta = ColumnMetadata {
1047                    name: name.into_rust()?,
1048                    typ_idx: idx,
1049                    added: metadata.added.into_rust_if_some("ColumnMetadata::added")?,
1050                    dropped: metadata.dropped.into_rust()?,
1051                };
1052                Ok::<_, TryFromProtoError>((ColumnIndex(idx), meta))
1053            })
1054            .collect::<Result<_, _>>()?;
1055
1056        Ok(RelationDesc {
1057            typ: proto.typ.into_rust_if_some("ProtoRelationDesc::typ")?,
1058            metadata,
1059        })
1060    }
1061}
1062
1063impl RelationDesc {
1064    /// Returns a [`RelationDescBuilder`] that can be used to construct a [`RelationDesc`].
1065    pub fn builder() -> RelationDescBuilder {
1066        RelationDescBuilder::default()
1067    }
1068
1069    /// Constructs a new `RelationDesc` that represents the empty relation
1070    /// with no columns and no keys.
1071    pub fn empty() -> Self {
1072        RelationDesc {
1073            typ: SqlRelationType::empty(),
1074            metadata: BTreeMap::default(),
1075        }
1076    }
1077
1078    /// Check if the `RelationDesc` is empty.
1079    pub fn is_empty(&self) -> bool {
1080        self == &Self::empty()
1081    }
1082
1083    /// Returns the number of columns in this [`RelationDesc`].
1084    pub fn len(&self) -> usize {
1085        self.typ().column_types.len()
1086    }
1087
1088    /// Constructs a new `RelationDesc` from a `SqlRelationType` and an iterator
1089    /// over column names.
1090    ///
1091    /// # Panics
1092    ///
1093    /// Panics if the arity of the `SqlRelationType` is not equal to the number of
1094    /// items in `names`.
1095    pub fn new<I, N>(typ: SqlRelationType, names: I) -> Self
1096    where
1097        I: IntoIterator<Item = N>,
1098        N: Into<ColumnName>,
1099    {
1100        let metadata: BTreeMap<_, _> = names
1101            .into_iter()
1102            .enumerate()
1103            .map(|(idx, name)| {
1104                let col_idx = ColumnIndex(idx);
1105                let metadata = ColumnMetadata {
1106                    name: name.into(),
1107                    typ_idx: idx,
1108                    added: RelationVersion::root(),
1109                    dropped: None,
1110                };
1111                (col_idx, metadata)
1112            })
1113            .collect();
1114
1115        // TODO(parkmycar): Add better validation here.
1116        assert_eq!(typ.column_types.len(), metadata.len());
1117
1118        RelationDesc { typ, metadata }
1119    }
1120
1121    pub fn from_names_and_types<I, T, N>(iter: I) -> Self
1122    where
1123        I: IntoIterator<Item = (N, T)>,
1124        T: Into<SqlColumnType>,
1125        N: Into<ColumnName>,
1126    {
1127        let (names, types): (Vec<_>, Vec<_>) = iter.into_iter().unzip();
1128        let types = types.into_iter().map(Into::into).collect();
1129        let typ = SqlRelationType::new(types);
1130        Self::new(typ, names)
1131    }
1132
1133    /// Concatenates a `RelationDesc` onto the end of this `RelationDesc`.
1134    ///
1135    /// # Panics
1136    ///
1137    /// Panics if either `self` or `other` have columns that were added at a
1138    /// [`RelationVersion`] other than [`RelationVersion::root`] or if any
1139    /// columns were dropped.
1140    ///
1141    /// TODO(parkmycar): Move this method to [`RelationDescBuilder`].
1142    pub fn concat(mut self, other: Self) -> Self {
1143        let self_len = self.typ.column_types.len();
1144
1145        for (typ, (_col_idx, meta)) in other.typ.column_types.into_iter().zip_eq(other.metadata) {
1146            assert_eq!(meta.added, RelationVersion::root());
1147            assert_none!(meta.dropped);
1148
1149            let new_idx = self.typ.columns().len();
1150            let new_meta = ColumnMetadata {
1151                name: meta.name,
1152                typ_idx: new_idx,
1153                added: RelationVersion::root(),
1154                dropped: None,
1155            };
1156
1157            self.typ.column_types.push(typ);
1158            let prev = self.metadata.insert(ColumnIndex(new_idx), new_meta);
1159
1160            assert_eq!(self.metadata.len(), self.typ.columns().len());
1161            assert_none!(prev);
1162        }
1163
1164        for k in other.typ.keys {
1165            let k = k.into_iter().map(|idx| idx + self_len).collect();
1166            self = self.with_key(k);
1167        }
1168        self
1169    }
1170
1171    /// Adds a new key for the relation.
1172    pub fn with_key(mut self, indices: Vec<usize>) -> Self {
1173        self.typ = self.typ.with_key(indices);
1174        self
1175    }
1176
1177    /// Drops all existing keys.
1178    pub fn without_keys(mut self) -> Self {
1179        self.typ.keys.clear();
1180        self
1181    }
1182
1183    /// Builds a new relation description with the column names replaced with
1184    /// new names.
1185    ///
1186    /// # Panics
1187    ///
1188    /// Panics if the arity of the relation type does not match the number of
1189    /// items in `names`.
1190    pub fn with_names<I, N>(self, names: I) -> Self
1191    where
1192        I: IntoIterator<Item = N>,
1193        N: Into<ColumnName>,
1194    {
1195        Self::new(self.typ, names)
1196    }
1197
1198    /// Computes the number of columns in the relation.
1199    pub fn arity(&self) -> usize {
1200        self.typ.arity()
1201    }
1202
1203    /// Returns the relation type underlying this relation description.
1204    pub fn typ(&self) -> &SqlRelationType {
1205        &self.typ
1206    }
1207
1208    /// Returns the owned relation type underlying this relation description.
1209    pub fn into_typ(self) -> SqlRelationType {
1210        self.typ
1211    }
1212
1213    /// Returns an iterator over the columns in this relation.
1214    pub fn iter(&self) -> impl Iterator<Item = (&ColumnName, &SqlColumnType)> {
1215        self.metadata.values().map(|meta| {
1216            let typ = &self.typ.columns()[meta.typ_idx];
1217            (&meta.name, typ)
1218        })
1219    }
1220
1221    /// Returns an iterator over the types of the columns in this relation.
1222    pub fn iter_types(&self) -> impl Iterator<Item = &SqlColumnType> {
1223        self.typ.column_types.iter()
1224    }
1225
1226    /// Returns an iterator over the names of the columns in this relation.
1227    pub fn iter_names(&self) -> impl Iterator<Item = &ColumnName> {
1228        self.metadata.values().map(|meta| &meta.name)
1229    }
1230
1231    /// Returns an iterator over the columns in this relation, with all their metadata.
1232    pub fn iter_all(&self) -> impl Iterator<Item = (&ColumnIndex, &ColumnName, &SqlColumnType)> {
1233        self.metadata.iter().map(|(col_idx, metadata)| {
1234            let col_typ = &self.typ.columns()[metadata.typ_idx];
1235            (col_idx, &metadata.name, col_typ)
1236        })
1237    }
1238
1239    /// Returns an iterator over the names of the columns in this relation that are "similar" to
1240    /// the provided `name`.
1241    pub fn iter_similar_names<'a>(
1242        &'a self,
1243        name: &'a ColumnName,
1244    ) -> impl Iterator<Item = &'a ColumnName> {
1245        self.iter_names().filter(|n| n.is_similar(name))
1246    }
1247
1248    /// Returns whether this [`RelationDesc`] contains a column at the specified index.
1249    pub fn contains_index(&self, idx: &ColumnIndex) -> bool {
1250        self.metadata.contains_key(idx)
1251    }
1252
1253    /// Finds a column by name.
1254    ///
1255    /// Returns the index and type of the column named `name`. If no column with
1256    /// the specified name exists, returns `None`. If multiple columns have the
1257    /// specified name, the leftmost column is returned.
1258    pub fn get_by_name(&self, name: &ColumnName) -> Option<(usize, &SqlColumnType)> {
1259        self.iter_names()
1260            .position(|n| n == name)
1261            .map(|i| (i, &self.typ.column_types[i]))
1262    }
1263
1264    /// Gets the name of the `i`th column.
1265    ///
1266    /// # Panics
1267    ///
1268    /// Panics if `i` is not a valid column index.
1269    ///
1270    /// TODO(parkmycar): Migrate all uses of this to [`RelationDesc::get_name_idx`].
1271    pub fn get_name(&self, i: usize) -> &ColumnName {
1272        // TODO(parkmycar): Refactor this to use `ColumnIndex`.
1273        self.get_name_idx(&ColumnIndex(i))
1274    }
1275
1276    /// Gets the name of the column at `idx`.
1277    ///
1278    /// # Panics
1279    ///
1280    /// Panics if no column exists at `idx`.
1281    pub fn get_name_idx(&self, idx: &ColumnIndex) -> &ColumnName {
1282        &self.metadata.get(idx).expect("should exist").name
1283    }
1284
1285    /// Mutably gets the name of the `i`th column.
1286    ///
1287    /// # Panics
1288    ///
1289    /// Panics if `i` is not a valid column index.
1290    pub fn get_name_mut(&mut self, i: usize) -> &mut ColumnName {
1291        // TODO(parkmycar): Refactor this to use `ColumnIndex`.
1292        &mut self
1293            .metadata
1294            .get_mut(&ColumnIndex(i))
1295            .expect("should exist")
1296            .name
1297    }
1298
1299    /// Gets the [`SqlColumnType`] of the column at `idx`.
1300    ///
1301    /// # Panics
1302    ///
1303    /// Panics if no column exists at `idx`.
1304    pub fn get_type(&self, idx: &ColumnIndex) -> &SqlColumnType {
1305        let typ_idx = self.metadata.get(idx).expect("should exist").typ_idx;
1306        &self.typ.column_types[typ_idx]
1307    }
1308
1309    /// Gets the name of the `i`th column if that column name is unambiguous.
1310    ///
1311    /// If at least one other column has the same name as the `i`th column,
1312    /// returns `None`. If the `i`th column has no name, returns `None`.
1313    ///
1314    /// # Panics
1315    ///
1316    /// Panics if `i` is not a valid column index.
1317    pub fn get_unambiguous_name(&self, i: usize) -> Option<&ColumnName> {
1318        let name = self.get_name(i);
1319        if self.iter_names().filter(|n| *n == name).count() == 1 {
1320            Some(name)
1321        } else {
1322            None
1323        }
1324    }
1325
1326    /// Verifies that `d` meets all of the constraints for the `i`th column of `self`.
1327    ///
1328    /// n.b. The only constraint MZ currently supports in NOT NULL, but this
1329    /// structure will be simple to extend.
1330    pub fn constraints_met(&self, i: usize, d: &Datum) -> Result<(), NotNullViolation> {
1331        let name = self.get_name(i);
1332        let typ = &self.typ.column_types[i];
1333        if d == &Datum::Null && !typ.nullable {
1334            Err(NotNullViolation(name.clone()))
1335        } else {
1336            Ok(())
1337        }
1338    }
1339
1340    /// Computes the differences between two [`RelationDesc`]s.
1341    ///
1342    /// Returns a rich diff describing which columns differ, and in what way.
1343    ///
1344    /// # Panics
1345    ///
1346    /// Panics if either `self` or `other` have columns that were added at a
1347    /// [`RelationVersion`] other than [`RelationVersion::root`] or if any
1348    /// columns were dropped.
1349    ///
1350    /// This simplifies things by allowing us to assume that `ColumnIndex`es are
1351    /// dense and that they match the indexes of `typ.columns()`. Without this
1352    /// we would, e.g., struggle comparing keys as those are in terms of
1353    /// `typ.columns()` indexes.
1354    pub fn diff(&self, other: &RelationDesc) -> RelationDescDiff {
1355        assert_eq!(self.metadata.len(), self.typ.columns().len());
1356        assert_eq!(other.metadata.len(), other.typ.columns().len());
1357        for (idx, meta) in self.metadata.iter().chain(other.metadata.iter()) {
1358            assert_eq!(meta.typ_idx, idx.0);
1359            assert_eq!(meta.added, RelationVersion::root());
1360            assert_none!(meta.dropped);
1361        }
1362
1363        let mut column_diffs = BTreeMap::new();
1364        let mut key_diff = None;
1365
1366        let left_arity = self.arity();
1367        let right_arity = other.arity();
1368        let common_arity = std::cmp::min(left_arity, right_arity);
1369
1370        for idx in 0..common_arity {
1371            let left_name = self.get_name(idx);
1372            let right_name = other.get_name(idx);
1373            let left_type = &self.typ.column_types[idx];
1374            let right_type = &other.typ.column_types[idx];
1375
1376            if left_name != right_name {
1377                let diff = ColumnDiff::NameMismatch {
1378                    left: left_name.clone(),
1379                    right: right_name.clone(),
1380                };
1381                column_diffs.insert(idx, diff);
1382            } else if left_type.scalar_type != right_type.scalar_type {
1383                let diff = ColumnDiff::TypeMismatch {
1384                    name: left_name.clone(),
1385                    left: left_type.scalar_type.clone(),
1386                    right: right_type.scalar_type.clone(),
1387                };
1388                column_diffs.insert(idx, diff);
1389            } else if left_type.nullable != right_type.nullable {
1390                let diff = ColumnDiff::NullabilityMismatch {
1391                    name: left_name.clone(),
1392                    left: left_type.nullable,
1393                    right: right_type.nullable,
1394                };
1395                column_diffs.insert(idx, diff);
1396            }
1397        }
1398
1399        for idx in common_arity..left_arity {
1400            let diff = ColumnDiff::Missing {
1401                name: self.get_name(idx).clone(),
1402            };
1403            column_diffs.insert(idx, diff);
1404        }
1405
1406        for idx in common_arity..right_arity {
1407            let diff = ColumnDiff::Extra {
1408                name: other.get_name(idx).clone(),
1409            };
1410            column_diffs.insert(idx, diff);
1411        }
1412
1413        let left_keys: BTreeSet<_> = self.typ.keys.iter().collect();
1414        let right_keys: BTreeSet<_> = other.typ.keys.iter().collect();
1415        if left_keys != right_keys {
1416            let column_names = |desc: &RelationDesc, keys: BTreeSet<&Vec<usize>>| {
1417                keys.iter()
1418                    .map(|key| key.iter().map(|&idx| desc.get_name(idx).clone()).collect())
1419                    .collect()
1420            };
1421            key_diff = Some(KeyDiff {
1422                left: column_names(self, left_keys),
1423                right: column_names(other, right_keys),
1424            });
1425        }
1426
1427        RelationDescDiff {
1428            column_diffs,
1429            key_diff,
1430        }
1431    }
1432
1433    /// Creates a new [`RelationDesc`] retaining only the columns specified in `demands`.
1434    pub fn apply_demand(&self, demands: &BTreeSet<usize>) -> RelationDesc {
1435        let mut new_desc = self.clone();
1436
1437        // Update ColumnMetadata.
1438        let mut removed = 0;
1439        new_desc.metadata.retain(|idx, metadata| {
1440            let retain = demands.contains(&idx.0);
1441            if !retain {
1442                removed += 1;
1443            } else {
1444                metadata.typ_idx -= removed;
1445            }
1446            retain
1447        });
1448
1449        // Update SqlColumnType.
1450        let mut idx = 0;
1451        new_desc.typ.column_types.retain(|_| {
1452            let keep = demands.contains(&idx);
1453            idx += 1;
1454            keep
1455        });
1456
1457        new_desc
1458    }
1459}
1460
#[cfg(any(test, feature = "proptest"))]
impl Arbitrary for RelationDesc {
    type Parameters = ();
    type Strategy = BoxedStrategy<RelationDesc>;

    /// Generates arbitrary `RelationDesc`s, favoring small column counts.
    ///
    /// Larger column-count ranges (up to 256, exclusive) are only considered
    /// when the `PROPTEST_LARGE_DATA` environment variable is set.
    fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy {
        let include_large = std::env::var("PROPTEST_LARGE_DATA").is_ok();

        // (weight, range of column counts)
        let mut column_count_weights =
            vec![(100, Just(0..4)), (50, Just(4..8)), (25, Just(8..16))];
        if include_large {
            column_count_weights.extend([
                (12, Just(16..32)),
                (6, Just(32..64)),
                (3, Just(64..128)),
                (1, Just(128..256)),
            ]);
        }

        Union::new_weighted(column_count_weights)
            .prop_flat_map(arb_relation_desc)
            .boxed()
    }
}
1481
/// Returns a [`Strategy`] that generates an arbitrary [`RelationDesc`] with a number of
/// columns within the range provided.
///
/// Columns are drawn as a map from name to type, so generated column names are unique.
#[cfg(any(test, feature = "proptest"))]
pub fn arb_relation_desc(num_cols: std::ops::Range<usize>) -> impl Strategy<Value = RelationDesc> {
    let columns =
        proptest::collection::btree_map(any::<ColumnName>(), any::<SqlColumnType>(), num_cols);
    columns.prop_map(|columns| RelationDesc::from_names_and_types(columns))
}
1489
/// Returns a [`Strategy`] that generates a projection of the provided [`RelationDesc`].
///
/// One boolean is drawn per column of `desc`; the positions drawn as `true`
/// form the demand set passed to [`RelationDesc::apply_demand`].
#[cfg(any(test, feature = "proptest"))]
pub fn arb_relation_desc_projection(desc: RelationDesc) -> impl Strategy<Value = RelationDesc> {
    let keep_flags: Vec<_> = std::iter::repeat_with(any::<bool>)
        .take(desc.len())
        .collect();
    keep_flags.prop_map(move |flags| {
        let demands: BTreeSet<_> = flags
            .into_iter()
            .enumerate()
            .filter(|(_, keep)| *keep)
            .map(|(idx, _)| idx)
            .collect();
        desc.apply_demand(&demands)
    })
}
1503
1504impl IntoIterator for RelationDesc {
1505    type Item = (ColumnName, SqlColumnType);
1506    type IntoIter = Box<dyn Iterator<Item = (ColumnName, SqlColumnType)>>;
1507
1508    fn into_iter(self) -> Self::IntoIter {
1509        let iter = self
1510            .metadata
1511            .into_values()
1512            .zip_eq(self.typ.column_types)
1513            .map(|(meta, typ)| (meta.name, typ));
1514        Box::new(iter)
1515    }
1516}
1517
/// Returns a [`Strategy`] that yields arbitrary [`Row`]s for the provided [`RelationDesc`].
///
/// One datum strategy is created per column type of `desc`; the generated
/// datums are packed into a [`Row`] in column order.
#[cfg(any(test, feature = "proptest"))]
pub fn arb_row_for_relation(desc: &RelationDesc) -> impl Strategy<Value = Row> + use<> {
    let column_types = desc.typ().columns();
    let datum_strategies: Vec<_> = column_types
        .iter()
        .map(|column_type| arb_datum_for_column(column_type.clone()))
        .collect();
    datum_strategies.prop_map(|datums| Row::pack(datums.iter().map(Datum::from)))
}
1530
/// Error indicating that a null value violated the not-null constraint of a
/// column.
///
/// The wrapped [`ColumnName`] is the name of the offending column.
#[derive(Debug, PartialEq, Eq)]
pub struct NotNullViolation(pub ColumnName);
1534
1535impl fmt::Display for NotNullViolation {
1536    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1537        write!(
1538            f,
1539            "null value in column {} violates not-null constraint",
1540            self.0.quoted()
1541        )
1542    }
1543}
1544
/// The result of comparing two [`RelationDesc`]s.
///
/// Produced by [`RelationDesc::diff`]; "left" refers to the receiver of that
/// call and "right" to its argument.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RelationDescDiff {
    /// Column differences, keyed by column index.
    pub column_diffs: BTreeMap<usize, ColumnDiff>,
    /// Key differences, if any.
    pub key_diff: Option<KeyDiff>,
}
1553
1554impl RelationDescDiff {
1555    /// Returns whether the diff contains any differences.
1556    pub fn is_empty(&self) -> bool {
1557        self.column_diffs.is_empty() && self.key_diff.is_none()
1558    }
1559}
1560
/// A difference in a column between two [`RelationDesc`]s.
///
/// "Left" and "right" refer to the receiver and the argument of
/// [`RelationDesc::diff`], respectively.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ColumnDiff {
    /// Column exists only in the left relation.
    Missing { name: ColumnName },
    /// Column exists only in the right relation.
    Extra { name: ColumnName },
    /// Columns have the same name but different scalar types.
    TypeMismatch {
        name: ColumnName,
        left: SqlScalarType,
        right: SqlScalarType,
    },
    /// Columns have the same name and scalar type but different nullability.
    NullabilityMismatch {
        name: ColumnName,
        left: bool,
        right: bool,
    },
    /// Columns have different names.
    NameMismatch { left: ColumnName, right: ColumnName },
}
1583
/// A difference in the keys of two [`RelationDesc`]s.
///
/// Each key is represented by the names of the columns it consists of.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct KeyDiff {
    /// Keys of the left relation.
    pub left: BTreeSet<Vec<ColumnName>>,
    /// Keys of the right relation.
    pub right: BTreeSet<Vec<ColumnName>>,
}
1592
/// A builder for a [`RelationDesc`].
///
/// Columns and keys are accumulated by the `with_*` methods and turned into
/// a [`RelationDesc`] by [`RelationDescBuilder::finish`].
#[derive(Clone, Default, Debug, PartialEq, Eq)]
pub struct RelationDescBuilder {
    /// Columns of the relation, in the order they were added.
    columns: Vec<(ColumnName, SqlColumnType)>,
    /// Sets of indices that are "keys" for the collection.
    keys: Vec<Vec<usize>>,
}
1601
1602impl RelationDescBuilder {
1603    /// Appends a column with the specified name and type.
1604    pub fn with_column<N: Into<ColumnName>>(
1605        mut self,
1606        name: N,
1607        ty: SqlColumnType,
1608    ) -> RelationDescBuilder {
1609        let name = name.into();
1610        self.columns.push((name, ty));
1611        self
1612    }
1613
1614    /// Appends the provided columns to the builder.
1615    pub fn with_columns<I, T, N>(mut self, iter: I) -> Self
1616    where
1617        I: IntoIterator<Item = (N, T)>,
1618        T: Into<SqlColumnType>,
1619        N: Into<ColumnName>,
1620    {
1621        self.columns
1622            .extend(iter.into_iter().map(|(name, ty)| (name.into(), ty.into())));
1623        self
1624    }
1625
1626    /// Adds a new key for the relation.
1627    pub fn with_key(mut self, mut indices: Vec<usize>) -> RelationDescBuilder {
1628        indices.sort_unstable();
1629        if !self.keys.contains(&indices) {
1630            self.keys.push(indices);
1631        }
1632        self
1633    }
1634
1635    /// Removes all previously inserted keys.
1636    pub fn without_keys(mut self) -> RelationDescBuilder {
1637        self.keys.clear();
1638        assert_eq!(self.keys.len(), 0);
1639        self
1640    }
1641
1642    /// Concatenates a [`RelationDescBuilder`] onto the end of this [`RelationDescBuilder`].
1643    pub fn concat(mut self, other: Self) -> Self {
1644        let self_len = self.columns.len();
1645
1646        self.columns.extend(other.columns);
1647        for k in other.keys {
1648            let k = k.into_iter().map(|idx| idx + self_len).collect();
1649            self = self.with_key(k);
1650        }
1651
1652        self
1653    }
1654
1655    /// Finish the builder, returning a [`RelationDesc`].
1656    pub fn finish(self) -> RelationDesc {
1657        let mut desc = RelationDesc::from_names_and_types(self.columns);
1658        desc.typ = desc.typ.with_keys(self.keys);
1659        desc
1660    }
1661}
1662
/// Describes a [`RelationDesc`] at a specific version of a [`VersionedRelationDesc`].
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord, Serialize)]
pub enum RelationVersionSelector {
    /// Select the relation as of exactly this [`RelationVersion`].
    Specific(RelationVersion),
    /// Select the most recent version of the relation.
    Latest,
}
1669
1670impl RelationVersionSelector {
1671    pub fn specific(version: u64) -> Self {
1672        RelationVersionSelector::Specific(RelationVersion(version))
1673    }
1674}
1675
/// A wrapper around [`RelationDesc`] that provides an interface for adding
/// columns and generating new versions.
///
/// TODO(parkmycar): Using an immutable data structure for RelationDesc would
/// be great.
#[derive(Debug, Clone, Serialize)]
pub struct VersionedRelationDesc {
    /// The description containing every column ever added, including columns
    /// that have since been dropped (tracked via their metadata).
    inner: RelationDesc,
}
1685
1686impl VersionedRelationDesc {
1687    pub fn new(inner: RelationDesc) -> Self {
1688        VersionedRelationDesc { inner }
1689    }
1690
1691    /// Adds a new column to this [`RelationDesc`], creating a new version of the [`RelationDesc`].
1692    ///
1693    /// # Panics
1694    ///
1695    /// * Panics if a column with `name` already exists that hasn't been dropped.
1696    ///
1697    /// Note: For building a [`RelationDesc`] see [`RelationDescBuilder::with_column`].
1698    #[must_use]
1699    pub fn add_column<N, T>(&mut self, name: N, typ: T) -> RelationVersion
1700    where
1701        N: Into<ColumnName>,
1702        T: Into<SqlColumnType>,
1703    {
1704        let latest_version = self.latest_version();
1705        let new_version = latest_version.bump();
1706
1707        let name = name.into();
1708        let existing = self
1709            .inner
1710            .metadata
1711            .iter()
1712            .find(|(_, meta)| meta.name == name && meta.dropped.is_none());
1713        if let Some(existing) = existing {
1714            panic!("column named '{name}' already exists! {existing:?}");
1715        }
1716
1717        let next_idx = self.inner.metadata.len();
1718        let col_meta = ColumnMetadata {
1719            name,
1720            typ_idx: next_idx,
1721            added: new_version,
1722            dropped: None,
1723        };
1724
1725        self.inner.typ.column_types.push(typ.into());
1726        let prev = self.inner.metadata.insert(ColumnIndex(next_idx), col_meta);
1727
1728        assert_none!(prev, "column index overlap!");
1729        self.validate();
1730
1731        new_version
1732    }
1733
1734    /// Drops the column `name` from this [`RelationDesc`]. If there are multiple columns with
1735    /// `name` drops the left-most one that hasn't already been dropped.
1736    ///
1737    /// TODO(parkmycar): Add handling for dropping a column that is currently used as a key.
1738    ///
1739    /// # Panics
1740    ///
1741    /// Panics if a column with `name` does not exist or the dropped column was used as a key.
1742    #[must_use]
1743    pub fn drop_column<N>(&mut self, name: N) -> RelationVersion
1744    where
1745        N: Into<ColumnName>,
1746    {
1747        let name = name.into();
1748        let latest_version = self.latest_version();
1749        let new_version = latest_version.bump();
1750
1751        let col = self
1752            .inner
1753            .metadata
1754            .values_mut()
1755            .find(|meta| meta.name == name && meta.dropped.is_none())
1756            .expect("column to exist");
1757
1758        // Make sure the column hadn't been previously dropped.
1759        assert_none!(col.dropped, "column was already dropped");
1760        col.dropped = Some(new_version);
1761
1762        // Make sure the column isn't being used as a key.
1763        let dropped_key = self
1764            .inner
1765            .typ
1766            .keys
1767            .iter()
1768            .any(|keys| keys.contains(&col.typ_idx));
1769        assert!(!dropped_key, "column being dropped was used as a key");
1770
1771        self.validate();
1772        new_version
1773    }
1774
1775    /// Returns the [`RelationDesc`] at the latest version.
1776    pub fn latest(&self) -> RelationDesc {
1777        self.inner.clone()
1778    }
1779
1780    /// Returns this [`RelationDesc`] at the specified version.
1781    pub fn at_version(&self, version: RelationVersionSelector) -> RelationDesc {
1782        // Get all of the changes from the start, up to whatever version was requested.
1783        let up_to_version = match version {
1784            RelationVersionSelector::Latest => RelationVersion(u64::MAX),
1785            RelationVersionSelector::Specific(v) => v,
1786        };
1787
1788        let valid_columns = self.inner.metadata.iter().filter(|(_col_idx, meta)| {
1789            let added = meta.added <= up_to_version;
1790            let dropped = meta
1791                .dropped
1792                .map(|dropped_at| up_to_version >= dropped_at)
1793                .unwrap_or(false);
1794
1795            added && !dropped
1796        });
1797
1798        let mut column_types = Vec::new();
1799        let mut column_metas = BTreeMap::new();
1800
1801        // N.B. At this point we need to be careful because col_idx might not
1802        // equal typ_idx.
1803        //
1804        // For example, consider columns "a", "b", and "c" with indexes 0, 1,
1805        // and 2. If we drop column "b" then we'll have "a" and "c" with column
1806        // indexes 0 and 2, but their indices in SqlRelationType will be 0 and 1.
1807        for (col_idx, meta) in valid_columns {
1808            let new_meta = ColumnMetadata {
1809                name: meta.name.clone(),
1810                typ_idx: column_types.len(),
1811                added: meta.added.clone(),
1812                dropped: meta.dropped.clone(),
1813            };
1814            column_types.push(self.inner.typ.columns()[meta.typ_idx].clone());
1815            column_metas.insert(*col_idx, new_meta);
1816        }
1817
1818        // Remap keys in case a column with an index less than that of a key was
1819        // dropped.
1820        //
1821        // For example, consider columns "a", "b", and "c" where "a" and "c" are
1822        // keys and "b" was dropped.
1823        let keys = self
1824            .inner
1825            .typ
1826            .keys
1827            .iter()
1828            .map(|keys| {
1829                keys.iter()
1830                    .map(|key_idx| {
1831                        let metadata = column_metas
1832                            .get(&ColumnIndex(*key_idx))
1833                            .expect("found key for column that doesn't exist");
1834                        metadata.typ_idx
1835                    })
1836                    .collect()
1837            })
1838            .collect();
1839
1840        let relation_type = SqlRelationType { column_types, keys };
1841
1842        RelationDesc {
1843            typ: relation_type,
1844            metadata: column_metas,
1845        }
1846    }
1847
1848    pub fn latest_version(&self) -> RelationVersion {
1849        self.inner
1850            .metadata
1851            .values()
1852            // N.B. Dropped is always greater than added.
1853            .map(|meta| meta.dropped.unwrap_or(meta.added))
1854            .max()
1855            // If there aren't any columns we're implicitly the root version.
1856            .unwrap_or_else(RelationVersion::root)
1857    }
1858
1859    /// Validates internal contraints of the [`RelationDesc`] are correct.
1860    ///
1861    /// # Panics
1862    ///
1863    /// Panics if a constraint is not satisfied.
1864    fn validate(&self) {
1865        fn validate_inner(desc: &RelationDesc) -> Result<(), anyhow::Error> {
1866            if desc.typ.column_types.len() != desc.metadata.len() {
1867                anyhow::bail!("mismatch between number of types and metadatas");
1868            }
1869
1870            for (col_idx, meta) in &desc.metadata {
1871                if col_idx.0 > desc.metadata.len() {
1872                    anyhow::bail!("column index out of bounds");
1873                }
1874                if meta.added >= meta.dropped.unwrap_or(RelationVersion(u64::MAX)) {
1875                    anyhow::bail!("column was added after it was dropped?");
1876                }
1877                if desc.typ().columns().get(meta.typ_idx).is_none() {
1878                    anyhow::bail!("typ_idx incorrect");
1879                }
1880            }
1881
1882            for keys in &desc.typ.keys {
1883                for key in keys {
1884                    if *key >= desc.typ.column_types.len() {
1885                        anyhow::bail!("key index was out of bounds!");
1886                    }
1887                }
1888            }
1889
1890            let versions = desc
1891                .metadata
1892                .values()
1893                .map(|meta| meta.dropped.unwrap_or(meta.added));
1894            let mut max = 0;
1895            let mut sum = 0;
1896            for version in versions {
1897                max = std::cmp::max(max, version.0);
1898                sum += version.0;
1899            }
1900
1901            // Other than RelationVersion(0), we should never have duplicate
1902            // versions and they should always increase by 1. In other words, the
1903            // sum of all RelationVersions should be the sum of [0, max].
1904            //
1905            // N.B. n * (n + 1) / 2 = sum of [0, n]
1906            //
1907            // While I normally don't like tricks like this, it allows us to
1908            // validate that our column versions are correct in O(n) time and
1909            // without allocations.
1910            if sum != (max * (max + 1) / 2) {
1911                anyhow::bail!("there is a duplicate or missing relation version");
1912            }
1913
1914            Ok(())
1915        }
1916
1917        assert_ok!(validate_inner(&self.inner), "validate failed! {self:?}");
1918    }
1919}
1920
/// Diffs that can be generated by proptest and applied to a [`RelationDesc`] to
/// exercise schema migrations.
///
/// See [`PropRelationDescDiff::apply`] for how each variant modifies a
/// [`RelationDesc`].
#[derive(Debug)]
#[cfg(any(test, feature = "proptest"))]
pub enum PropRelationDescDiff {
    /// Append a new column with the given name and type.
    AddColumn {
        name: ColumnName,
        typ: SqlColumnType,
    },
    /// Mark the first column with the given name as dropped, unless it
    /// already is. Does nothing if no column has that name.
    DropColumn {
        name: ColumnName,
    },
    /// Flip the `nullable` flag of the column with the given name. Does
    /// nothing if the column cannot be found.
    ToggleNullability {
        name: ColumnName,
    },
    /// Replace the type of the column with the given name. Does nothing if
    /// the column cannot be found.
    ChangeType {
        name: ColumnName,
        typ: SqlColumnType,
    },
}
1941
#[cfg(any(test, feature = "proptest"))]
impl PropRelationDescDiff {
    /// Applies this diff to `desc`, modifying it in place.
    ///
    /// Diffs that name a column which cannot be found are silently ignored.
    ///
    /// # Panics
    ///
    /// Panics when adding a column if the next column index is already taken
    /// or if the metadata and column types end up with different lengths.
    pub fn apply(self, desc: &mut RelationDesc) {
        /// Returns a mutable handle to the type of the column called `name`,
        /// or `None` if `desc` cannot find such a column.
        fn column_type_mut<'a>(
            desc: &'a mut RelationDesc,
            name: &ColumnName,
        ) -> Option<&'a mut SqlColumnType> {
            let (pos, _) = desc.get_by_name(name)?;
            let col_type = desc
                .typ
                .column_types
                .get_mut(pos)
                .expect("ColumnNames and SqlColumnTypes out of sync!");
            Some(col_type)
        }

        match self {
            Self::AddColumn { name, typ } => {
                // The new column goes at the end, for both its column index
                // and its position in the list of types.
                let new_idx = desc.metadata.len();
                let prev = desc.metadata.insert(
                    ColumnIndex(new_idx),
                    ColumnMetadata {
                        name,
                        typ_idx: new_idx,
                        added: RelationVersion(0),
                        dropped: None,
                    },
                );
                desc.typ.column_types.push(typ);

                assert_none!(prev);
                assert_eq!(desc.metadata.len(), desc.typ.column_types.len());
            }
            Self::DropColumn { name } => {
                // The drop happens at the version right after the newest
                // change recorded by any column.
                let newest_change = desc
                    .metadata
                    .values()
                    .map(|meta| meta.dropped.unwrap_or(meta.added))
                    .max();
                let next_version = newest_change
                    .unwrap_or_else(RelationVersion::root)
                    .bump();

                if let Some(meta) = desc.metadata.values_mut().find(|meta| meta.name == name) {
                    // Keep an existing drop version; only fill in a missing one.
                    meta.dropped = meta.dropped.or(Some(next_version));
                }
            }
            Self::ToggleNullability { name } => {
                if let Some(col_type) = column_type_mut(desc, &name) {
                    col_type.nullable = !col_type.nullable;
                }
            }
            Self::ChangeType { name, typ } => {
                if let Some(col_type) = column_type_mut(desc, &name) {
                    *col_type = typ;
                }
            }
        }
    }
}
2001
/// Generates a set of [`PropRelationDescDiff`]s based on some source [`RelationDesc`].
///
/// The result always contains a batch of column additions. When `source` has
/// at least one column it also contains drops, nullability toggles, and type
/// changes that target randomly picked columns of `source`, and the combined
/// list is shuffled.
#[cfg(any(test, feature = "proptest"))]
pub fn arb_relation_desc_diff(
    source: &RelationDesc,
) -> impl Strategy<Value = Vec<PropRelationDescDiff>> + use<> {
    let source = Rc::new(source.clone());
    let num_source_columns = source.typ.columns().len();

    // Usually add just a handful of columns, but occasionally add many.
    let add_count = Union::new_weighted(vec![(100, Just(0..8)), (1, Just(8..64))]);
    let add_columns_strat = add_count
        .prop_flat_map(|count| {
            proptest::collection::vec((any::<ColumnName>(), any::<SqlColumnType>()), count)
        })
        .prop_map(|new_columns| {
            new_columns
                .into_iter()
                .map(|(name, typ)| PropRelationDescDiff::AddColumn { name, typ })
                .collect::<Vec<_>>()
        });

    // If the source RelationDesc is empty there is nothing else to do.
    if num_source_columns == 0 {
        return add_columns_strat.boxed();
    }

    // Builds a strategy that makes some number of random picks among the
    // source's columns and yields the distinct names that were hit.
    let pick_column_names = || {
        let source = Rc::clone(&source);
        (0..num_source_columns).prop_perturb(move |num_picks, mut rng| {
            (0..num_picks)
                .map(|_| {
                    let col_idx = rng.random_range(0..num_source_columns);
                    source.get_name(col_idx).clone()
                })
                .collect::<BTreeSet<_>>()
        })
    };

    let drop_columns_strat = pick_column_names().prop_map(|names| {
        names
            .into_iter()
            .map(|name| PropRelationDescDiff::DropColumn { name })
            .collect::<Vec<_>>()
    });

    let toggle_nullability_strat = pick_column_names().prop_map(|names| {
        names
            .into_iter()
            .map(|name| PropRelationDescDiff::ToggleNullability { name })
            .collect::<Vec<_>>()
    });

    // Type changes additionally need one freshly generated type per column.
    let change_type_strat = pick_column_names()
        .prop_flat_map(|names| {
            proptest::collection::vec(any::<SqlColumnType>(), names.len())
                .prop_map(move |types| (names.clone(), types))
        })
        .prop_map(|(names, types)| {
            names
                .into_iter()
                .zip_eq(types)
                .map(|(name, typ)| PropRelationDescDiff::ChangeType { name, typ })
                .collect::<Vec<_>>()
        });

    (
        add_columns_strat,
        drop_columns_strat,
        toggle_nullability_strat,
        change_type_strat,
    )
        .prop_map(|(adds, drops, toggles, changes)| {
            let mut diffs = adds;
            diffs.extend(drops);
            diffs.extend(toggles);
            diffs.extend(changes);
            diffs
        })
        .prop_shuffle()
        .boxed()
}
2088
2089#[cfg(test)]
2090mod tests {
2091    use super::*;
2092    use prost::Message;
2093
2094    #[mz_ore::test]
2095    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `pipe2` on OS `linux`
2096    fn smoktest_at_version() {
2097        let desc = RelationDesc::builder()
2098            .with_column("a", SqlScalarType::Bool.nullable(true))
2099            .with_column("z", SqlScalarType::String.nullable(false))
2100            .finish();
2101
2102        let mut versioned_desc = VersionedRelationDesc {
2103            inner: desc.clone(),
2104        };
2105        versioned_desc.validate();
2106
2107        let latest = versioned_desc.at_version(RelationVersionSelector::Latest);
2108        assert_eq!(desc, latest);
2109
2110        let v0 = versioned_desc.at_version(RelationVersionSelector::specific(0));
2111        assert_eq!(desc, v0);
2112
2113        let v3 = versioned_desc.at_version(RelationVersionSelector::specific(3));
2114        assert_eq!(desc, v3);
2115
2116        let v1 = versioned_desc.add_column("b", SqlScalarType::Bytes.nullable(false));
2117        assert_eq!(v1, RelationVersion(1));
2118
2119        let v1 = versioned_desc.at_version(RelationVersionSelector::Specific(v1));
2120        insta::assert_json_snapshot!(v1.metadata, @r###"
2121        {
2122          "0": {
2123            "name": "a",
2124            "typ_idx": 0,
2125            "added": 0,
2126            "dropped": null
2127          },
2128          "1": {
2129            "name": "z",
2130            "typ_idx": 1,
2131            "added": 0,
2132            "dropped": null
2133          },
2134          "2": {
2135            "name": "b",
2136            "typ_idx": 2,
2137            "added": 1,
2138            "dropped": null
2139          }
2140        }
2141        "###);
2142
2143        // Check that V0 doesn't show the new column.
2144        let v0_b = versioned_desc.at_version(RelationVersionSelector::specific(0));
2145        assert!(v0.iter().eq(v0_b.iter()));
2146
2147        let v2 = versioned_desc.drop_column("z");
2148        assert_eq!(v2, RelationVersion(2));
2149
2150        let v2 = versioned_desc.at_version(RelationVersionSelector::Specific(v2));
2151        insta::assert_json_snapshot!(v2.metadata, @r###"
2152        {
2153          "0": {
2154            "name": "a",
2155            "typ_idx": 0,
2156            "added": 0,
2157            "dropped": null
2158          },
2159          "2": {
2160            "name": "b",
2161            "typ_idx": 1,
2162            "added": 1,
2163            "dropped": null
2164          }
2165        }
2166        "###);
2167
2168        // Check that V0 and V1 are still correct.
2169        let v0_c = versioned_desc.at_version(RelationVersionSelector::specific(0));
2170        assert!(v0.iter().eq(v0_c.iter()));
2171
2172        let v1_b = versioned_desc.at_version(RelationVersionSelector::specific(1));
2173        assert!(v1.iter().eq(v1_b.iter()));
2174
2175        insta::assert_json_snapshot!(versioned_desc.inner.metadata, @r###"
2176        {
2177          "0": {
2178            "name": "a",
2179            "typ_idx": 0,
2180            "added": 0,
2181            "dropped": null
2182          },
2183          "1": {
2184            "name": "z",
2185            "typ_idx": 1,
2186            "added": 0,
2187            "dropped": 2
2188          },
2189          "2": {
2190            "name": "b",
2191            "typ_idx": 2,
2192            "added": 1,
2193            "dropped": null
2194          }
2195        }
2196        "###);
2197    }
2198
2199    #[mz_ore::test]
2200    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `pipe2` on OS `linux`
2201    fn test_dropping_columns_with_keys() {
2202        let desc = RelationDesc::builder()
2203            .with_column("a", SqlScalarType::Bool.nullable(true))
2204            .with_column("z", SqlScalarType::String.nullable(false))
2205            .with_key(vec![1])
2206            .finish();
2207
2208        let mut versioned_desc = VersionedRelationDesc {
2209            inner: desc.clone(),
2210        };
2211        versioned_desc.validate();
2212
2213        let v1 = versioned_desc.drop_column("a");
2214        assert_eq!(v1, RelationVersion(1));
2215
2216        // Make sure the key index for 'z' got remapped since 'a' was dropped.
2217        let v1 = versioned_desc.at_version(RelationVersionSelector::Specific(v1));
2218        insta::assert_json_snapshot!(v1, @r###"
2219        {
2220          "typ": {
2221            "column_types": [
2222              {
2223                "scalar_type": "String",
2224                "nullable": false
2225              }
2226            ],
2227            "keys": [
2228              [
2229                0
2230              ]
2231            ]
2232          },
2233          "metadata": {
2234            "1": {
2235              "name": "z",
2236              "typ_idx": 0,
2237              "added": 0,
2238              "dropped": null
2239            }
2240          }
2241        }
2242        "###);
2243
2244        // Make sure the key index of 'z' is correct when all columns are present.
2245        let v0 = versioned_desc.at_version(RelationVersionSelector::specific(0));
2246        insta::assert_json_snapshot!(v0, @r###"
2247        {
2248          "typ": {
2249            "column_types": [
2250              {
2251                "scalar_type": "Bool",
2252                "nullable": true
2253              },
2254              {
2255                "scalar_type": "String",
2256                "nullable": false
2257              }
2258            ],
2259            "keys": [
2260              [
2261                1
2262              ]
2263            ]
2264          },
2265          "metadata": {
2266            "0": {
2267              "name": "a",
2268              "typ_idx": 0,
2269              "added": 0,
2270              "dropped": 1
2271            },
2272            "1": {
2273              "name": "z",
2274              "typ_idx": 1,
2275              "added": 0,
2276              "dropped": null
2277            }
2278          }
2279        }
2280        "###);
2281    }
2282
2283    #[mz_ore::test]
2284    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `pipe2` on OS `linux`
2285    fn roundtrip_relation_desc_without_metadata() {
2286        let typ = ProtoRelationType {
2287            column_types: vec![
2288                SqlScalarType::String.nullable(false).into_proto(),
2289                SqlScalarType::Bool.nullable(true).into_proto(),
2290            ],
2291            keys: vec![],
2292        };
2293        let proto = ProtoRelationDesc {
2294            typ: Some(typ),
2295            names: vec![
2296                ColumnName("a".into()).into_proto(),
2297                ColumnName("b".into()).into_proto(),
2298            ],
2299            metadata: vec![],
2300        };
2301        let desc: RelationDesc = proto.into_rust().unwrap();
2302
2303        insta::assert_json_snapshot!(desc, @r###"
2304        {
2305          "typ": {
2306            "column_types": [
2307              {
2308                "scalar_type": "String",
2309                "nullable": false
2310              },
2311              {
2312                "scalar_type": "Bool",
2313                "nullable": true
2314              }
2315            ],
2316            "keys": []
2317          },
2318          "metadata": {
2319            "0": {
2320              "name": "a",
2321              "typ_idx": 0,
2322              "added": 0,
2323              "dropped": null
2324            },
2325            "1": {
2326              "name": "b",
2327              "typ_idx": 1,
2328              "added": 0,
2329              "dropped": null
2330            }
2331          }
2332        }
2333        "###);
2334    }
2335
2336    #[mz_ore::test]
2337    #[should_panic(expected = "column named 'a' already exists!")]
2338    fn test_add_column_with_same_name_panics() {
2339        let desc = RelationDesc::builder()
2340            .with_column("a", SqlScalarType::Bool.nullable(true))
2341            .finish();
2342        let mut versioned = VersionedRelationDesc::new(desc);
2343
2344        let _ = versioned.add_column("a", SqlScalarType::String.nullable(false));
2345    }
2346
2347    #[mz_ore::test]
2348    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `pipe2` on OS `linux`
2349    fn test_add_column_with_same_name_prev_dropped() {
2350        let desc = RelationDesc::builder()
2351            .with_column("a", SqlScalarType::Bool.nullable(true))
2352            .finish();
2353        let mut versioned = VersionedRelationDesc::new(desc);
2354
2355        let v1 = versioned.drop_column("a");
2356        let v1 = versioned.at_version(RelationVersionSelector::Specific(v1));
2357        insta::assert_json_snapshot!(v1, @r###"
2358        {
2359          "typ": {
2360            "column_types": [],
2361            "keys": []
2362          },
2363          "metadata": {}
2364        }
2365        "###);
2366
2367        let v2 = versioned.add_column("a", SqlScalarType::String.nullable(false));
2368        let v2 = versioned.at_version(RelationVersionSelector::Specific(v2));
2369        insta::assert_json_snapshot!(v2, @r###"
2370        {
2371          "typ": {
2372            "column_types": [
2373              {
2374                "scalar_type": "String",
2375                "nullable": false
2376              }
2377            ],
2378            "keys": []
2379          },
2380          "metadata": {
2381            "1": {
2382              "name": "a",
2383              "typ_idx": 0,
2384              "added": 2,
2385              "dropped": null
2386            }
2387          }
2388        }
2389        "###);
2390    }
2391
2392    #[mz_ore::test]
2393    #[cfg_attr(miri, ignore)]
2394    fn apply_demand() {
2395        let desc = RelationDesc::builder()
2396            .with_column("a", SqlScalarType::String.nullable(true))
2397            .with_column("b", SqlScalarType::Int64.nullable(false))
2398            .with_column("c", SqlScalarType::Time.nullable(false))
2399            .finish();
2400        let desc = desc.apply_demand(&BTreeSet::from([0, 2]));
2401        assert_eq!(desc.arity(), 2);
2402        // TODO(parkmycar): Move validate onto RelationDesc.
2403        VersionedRelationDesc::new(desc).validate();
2404    }
2405
2406    #[mz_ore::test]
2407    #[cfg_attr(miri, ignore)]
2408    fn smoketest_column_index_stable_ident() {
2409        let idx_a = ColumnIndex(42);
2410        // Note(parkmycar): This should never change.
2411        assert_eq!(idx_a.to_stable_name(), "42");
2412    }
2413
2414    #[mz_ore::test]
2415    #[cfg_attr(miri, ignore)] // too slow
2416    fn proptest_relation_desc_roundtrips() {
2417        fn testcase(og: RelationDesc) {
2418            let bytes = og.into_proto().encode_to_vec();
2419            let proto = ProtoRelationDesc::decode(&bytes[..]).unwrap();
2420            let rnd = RelationDesc::from_proto(proto).unwrap();
2421
2422            assert_eq!(og, rnd);
2423        }
2424
2425        proptest!(|(desc in any::<RelationDesc>())| {
2426            testcase(desc);
2427        });
2428
2429        let strat = any::<RelationDesc>().prop_flat_map(|desc| {
2430            arb_relation_desc_diff(&desc).prop_map(move |diffs| (desc.clone(), diffs))
2431        });
2432
2433        proptest!(|((mut desc, diffs) in strat)| {
2434            for diff in diffs {
2435                diff.apply(&mut desc);
2436            };
2437            testcase(desc);
2438        });
2439    }
2440}