mz_repr/
relation.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::{BTreeMap, BTreeSet};
11use std::rc::Rc;
12use std::{fmt, vec};
13
14use anyhow::bail;
15use itertools::Itertools;
16use mz_lowertest::MzReflect;
17use mz_ore::cast::CastFrom;
18use mz_ore::str::StrExt;
19use mz_ore::{assert_none, assert_ok};
20use mz_persist_types::schema::SchemaId;
21use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
22use proptest::prelude::*;
23use proptest::strategy::{Strategy, Union};
24use proptest_derive::Arbitrary;
25use serde::{Deserialize, Serialize};
26
27use crate::relation_and_scalar::proto_relation_type::ProtoKey;
28pub use crate::relation_and_scalar::{
29    ProtoColumnMetadata, ProtoColumnName, ProtoColumnType, ProtoRelationDesc, ProtoRelationType,
30    ProtoRelationVersion,
31};
32use crate::{Datum, ReprScalarType, Row, SqlScalarType, arb_datum_for_column};
33
/// The type of a [`Datum`].
///
/// [`SqlColumnType`] bundles information about the scalar type of a datum (e.g.,
/// Int32 or String) with its nullability.
///
/// To construct a column type, either initialize the struct directly, or
/// use the [`SqlScalarType::nullable`] method.
#[derive(
    Arbitrary, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash, MzReflect,
)]
pub struct SqlColumnType {
    /// The underlying scalar type (e.g., Int32 or String) of this column.
    pub scalar_type: SqlScalarType,
    /// Whether this datum can be null.
    ///
    /// If this field is missing while deserializing, serde fills it in by
    /// calling `return_true`, i.e. the column is treated as nullable.
    #[serde(default = "return_true")]
    pub nullable: bool,
}
51
/// Serde default provider for the `nullable` fields declared in this file.
///
/// The default value of a `bool` is `false`, and serde only accepts the path
/// of a function to supply any other default. This function exists solely so
/// that an omitted `nullable` field deserializes as `true`, which is
/// convenient in unit tests. See
/// <https://github.com/serde-rs/serde/issues/1030>
#[inline(always)]
fn return_true() -> bool {
    const NULLABLE_BY_DEFAULT: bool = true;
    NULLABLE_BY_DEFAULT
}
61
62impl SqlColumnType {
63    pub fn union(&self, other: &Self) -> Result<Self, anyhow::Error> {
64        match (&self.scalar_type, &other.scalar_type) {
65            (scalar_type, other_scalar_type) if scalar_type == other_scalar_type => {
66                Ok(SqlColumnType {
67                    scalar_type: scalar_type.clone(),
68                    nullable: self.nullable || other.nullable,
69                })
70            }
71            (scalar_type, other_scalar_type) if scalar_type.base_eq(other_scalar_type) => {
72                Ok(SqlColumnType {
73                    scalar_type: scalar_type.without_modifiers(),
74                    nullable: self.nullable || other.nullable,
75                })
76            }
77            (
78                SqlScalarType::Record { fields, custom_id },
79                SqlScalarType::Record {
80                    fields: other_fields,
81                    custom_id: other_custom_id,
82                },
83            ) => {
84                if custom_id != other_custom_id {
85                    bail!(
86                        "Can't union types: {:?} and {:?}",
87                        self.scalar_type,
88                        other.scalar_type
89                    );
90                };
91
92                let mut union_fields = Vec::with_capacity(fields.len());
93                for ((name, typ), (other_name, other_typ)) in
94                    fields.iter().zip_eq(other_fields.iter())
95                {
96                    if name != other_name {
97                        bail!(
98                            "Can't union types: {:?} and {:?}",
99                            self.scalar_type,
100                            other.scalar_type
101                        );
102                    } else {
103                        let union_column_type = typ.union(other_typ)?;
104                        union_fields.push((name.clone(), union_column_type));
105                    };
106                }
107
108                Ok(SqlColumnType {
109                    scalar_type: SqlScalarType::Record {
110                        fields: union_fields.into(),
111                        custom_id: *custom_id,
112                    },
113                    nullable: self.nullable || other.nullable,
114                })
115            }
116            _ => bail!(
117                "Can't union types: {:?} and {:?}",
118                self.scalar_type,
119                other.scalar_type
120            ),
121        }
122    }
123
124    /// Consumes this `SqlColumnType` and returns a new `SqlColumnType` with its
125    /// nullability set to the specified boolean.
126    pub fn nullable(mut self, nullable: bool) -> Self {
127        self.nullable = nullable;
128        self
129    }
130}
131
132impl RustType<ProtoColumnType> for SqlColumnType {
133    fn into_proto(&self) -> ProtoColumnType {
134        ProtoColumnType {
135            nullable: self.nullable,
136            scalar_type: Some(self.scalar_type.into_proto()),
137        }
138    }
139
140    fn from_proto(proto: ProtoColumnType) -> Result<Self, TryFromProtoError> {
141        Ok(SqlColumnType {
142            nullable: proto.nullable,
143            scalar_type: proto
144                .scalar_type
145                .into_rust_if_some("ProtoColumnType::scalar_type")?,
146        })
147    }
148}
149
150impl fmt::Display for SqlColumnType {
151    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
152        let nullable = if self.nullable { "Null" } else { "NotNull" };
153        f.write_fmt(format_args!("{:?}:{}", self.scalar_type, nullable))
154    }
155}
156
/// The type of a relation.
#[derive(
    Arbitrary, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash, MzReflect,
)]
pub struct SqlRelationType {
    /// The type for each column, in order.
    pub column_types: Vec<SqlColumnType>,
    /// Sets of indices that are "keys" for the collection.
    ///
    /// Each element in this list is a set of column indices, each with the
    /// property that the collection contains at most one record with each
    /// distinct set of values for each column. Alternately, for a specific set
    /// of values assigned to these columns there is at most one record.
    ///
    /// A collection can contain multiple sets of keys, although it is common to
    /// have either zero or one sets of key indices.
    ///
    /// Deserializes to an empty list when the field is absent.
    #[serde(default)]
    pub keys: Vec<Vec<usize>>,
}
176
177impl SqlRelationType {
178    /// Constructs a `SqlRelationType` representing the relation with no columns and
179    /// no keys.
180    pub fn empty() -> Self {
181        SqlRelationType::new(vec![])
182    }
183
184    /// Constructs a new `SqlRelationType` from specified column types.
185    ///
186    /// The `SqlRelationType` will have no keys.
187    pub fn new(column_types: Vec<SqlColumnType>) -> Self {
188        SqlRelationType {
189            column_types,
190            keys: Vec::new(),
191        }
192    }
193
194    /// Adds a new key for the relation.
195    pub fn with_key(mut self, mut indices: Vec<usize>) -> Self {
196        indices.sort_unstable();
197        if !self.keys.contains(&indices) {
198            self.keys.push(indices);
199        }
200        self
201    }
202
203    pub fn with_keys(mut self, keys: Vec<Vec<usize>>) -> Self {
204        for key in keys {
205            self = self.with_key(key)
206        }
207        self
208    }
209
210    /// Computes the number of columns in the relation.
211    pub fn arity(&self) -> usize {
212        self.column_types.len()
213    }
214
215    /// Gets the index of the columns used when creating a default index.
216    pub fn default_key(&self) -> Vec<usize> {
217        if let Some(key) = self.keys.first() {
218            if key.is_empty() {
219                (0..self.column_types.len()).collect()
220            } else {
221                key.clone()
222            }
223        } else {
224            (0..self.column_types.len()).collect()
225        }
226    }
227
228    /// Returns all the [`SqlColumnType`]s, in order, for this relation.
229    pub fn columns(&self) -> &[SqlColumnType] {
230        &self.column_types
231    }
232}
233
234impl RustType<ProtoRelationType> for SqlRelationType {
235    fn into_proto(&self) -> ProtoRelationType {
236        ProtoRelationType {
237            column_types: self.column_types.into_proto(),
238            keys: self.keys.into_proto(),
239        }
240    }
241
242    fn from_proto(proto: ProtoRelationType) -> Result<Self, TryFromProtoError> {
243        Ok(SqlRelationType {
244            column_types: proto.column_types.into_rust()?,
245            keys: proto.keys.into_rust()?,
246        })
247    }
248}
249
250impl RustType<ProtoKey> for Vec<usize> {
251    fn into_proto(&self) -> ProtoKey {
252        ProtoKey {
253            keys: self.into_proto(),
254        }
255    }
256
257    fn from_proto(proto: ProtoKey) -> Result<Self, TryFromProtoError> {
258        proto.keys.into_rust()
259    }
260}
261
/// A column type expressed in terms of a [`ReprScalarType`] plus nullability.
///
/// Can be obtained from a [`SqlColumnType`] via `From`.
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash, MzReflect)]
pub struct ReprColumnType {
    /// The underlying representation scalar type (e.g., Int32 or String) of this column.
    pub scalar_type: ReprScalarType,
    /// Whether this datum can be null.
    ///
    /// If this field is missing while deserializing, serde fills it in by
    /// calling `return_true`.
    #[serde(default = "return_true")]
    pub nullable: bool,
}
270
271impl From<SqlColumnType> for ReprColumnType {
272    fn from(sql_column_type: SqlColumnType) -> Self {
273        ReprColumnType {
274            scalar_type: sql_column_type.scalar_type.into(),
275            nullable: sql_column_type.nullable,
276        }
277    }
278}
279
/// The name of a column in a [`RelationDesc`].
///
/// A thin wrapper around a boxed string.
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash, MzReflect)]
pub struct ColumnName(Box<str>);
283
284impl ColumnName {
285    /// Returns this column name as a `str`.
286    #[inline(always)]
287    pub fn as_str(&self) -> &str {
288        &*self
289    }
290
291    /// Returns this column name as a `&mut Box<str>`.
292    pub fn as_mut_boxed_str(&mut self) -> &mut Box<str> {
293        &mut self.0
294    }
295
296    /// Returns if this [`ColumnName`] is similar to the provided one.
297    pub fn is_similar(&self, other: &ColumnName) -> bool {
298        const SIMILARITY_THRESHOLD: f64 = 0.6;
299
300        let a_lowercase = self.to_lowercase();
301        let b_lowercase = other.to_lowercase();
302
303        strsim::normalized_levenshtein(&a_lowercase, &b_lowercase) >= SIMILARITY_THRESHOLD
304    }
305}
306
307impl std::ops::Deref for ColumnName {
308    type Target = str;
309
310    #[inline(always)]
311    fn deref(&self) -> &Self::Target {
312        &self.0
313    }
314}
315
316impl fmt::Display for ColumnName {
317    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
318        f.write_str(&self.0)
319    }
320}
321
322impl From<String> for ColumnName {
323    fn from(s: String) -> ColumnName {
324        ColumnName(s.into())
325    }
326}
327
328impl From<&str> for ColumnName {
329    fn from(s: &str) -> ColumnName {
330        ColumnName(s.into())
331    }
332}
333
334impl From<&ColumnName> for ColumnName {
335    fn from(n: &ColumnName) -> ColumnName {
336        n.clone()
337    }
338}
339
340impl RustType<ProtoColumnName> for ColumnName {
341    fn into_proto(&self) -> ProtoColumnName {
342        ProtoColumnName {
343            value: Some(self.0.to_string()),
344        }
345    }
346
347    fn from_proto(proto: ProtoColumnName) -> Result<Self, TryFromProtoError> {
348        Ok(ColumnName(
349            proto
350                .value
351                .ok_or_else(|| TryFromProtoError::missing_field("ProtoColumnName::value"))?
352                .into(),
353        ))
354    }
355}
356
357impl From<ColumnName> for mz_sql_parser::ast::Ident {
358    fn from(value: ColumnName) -> Self {
359        // Note: ColumnNames are known to be less than the max length of an Ident (I think?).
360        mz_sql_parser::ast::Ident::new_unchecked(value.0)
361    }
362}
363
364impl proptest::arbitrary::Arbitrary for ColumnName {
365    type Parameters = ();
366    type Strategy = BoxedStrategy<ColumnName>;
367
368    fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy {
369        // Long column names are generally uninteresting, and can greatly
370        // increase the runtime for a test case, so bound the max length.
371        let mut weights = vec![(50, Just(1..8)), (20, Just(8..16))];
372        if std::env::var("PROPTEST_LARGE_DATA").is_ok() {
373            weights.extend([
374                (5, Just(16..128)),
375                (1, Just(128..1024)),
376                (1, Just(1024..4096)),
377            ]);
378        }
379        let name_length = Union::new_weighted(weights);
380
381        // Non-ASCII characters are also generally uninteresting and can make
382        // debugging harder.
383        let char_strat = Rc::new(Union::new_weighted(vec![
384            (50, proptest::char::range('A', 'z').boxed()),
385            (1, any::<char>().boxed()),
386        ]));
387
388        name_length
389            .prop_flat_map(move |length| proptest::collection::vec(Rc::clone(&char_strat), length))
390            .prop_map(|chars| ColumnName(chars.into_iter().collect::<Box<str>>()))
391            .no_shrink()
392            .boxed()
393    }
394}
395
/// Default name of a column (when no other information is known).
///
/// The value is the literal string `?column?`.
pub const UNKNOWN_COLUMN_NAME: &str = "?column?";
398
/// Stable index of a column in a [`RelationDesc`].
///
/// Used as the key under which a [`RelationDesc`] stores the metadata of a
/// column.
#[derive(
    Clone, Copy, Debug, Eq, PartialEq, PartialOrd, Ord, Serialize, Deserialize, Hash, MzReflect,
)]
pub struct ColumnIndex(usize);

// Compile-time check that `ColumnIndex` does not implement `Arbitrary`.
static_assertions::assert_not_impl_all!(ColumnIndex: Arbitrary);
406
407impl ColumnIndex {
408    /// Returns a stable identifier for this [`ColumnIndex`].
409    pub fn to_stable_name(&self) -> String {
410        self.0.to_string()
411    }
412
413    pub fn to_raw(&self) -> usize {
414        self.0
415    }
416
417    pub fn from_raw(val: usize) -> Self {
418        ColumnIndex(val)
419    }
420}
421
/// The version a given column was added at.
///
/// Wraps a `u64`; the root version is `0` and [`RelationVersion::bump`]
/// yields the next one.
#[derive(
    Clone,
    Copy,
    Debug,
    Eq,
    PartialEq,
    PartialOrd,
    Ord,
    Serialize,
    Deserialize,
    Hash,
    MzReflect,
    Arbitrary,
)]
pub struct RelationVersion(u64);
438
439impl RelationVersion {
440    /// Returns the "root" or "initial" version of a [`RelationDesc`].
441    pub fn root() -> Self {
442        RelationVersion(0)
443    }
444
445    /// Returns an instance of [`RelationVersion`] which is "one" higher than `self`.
446    pub fn bump(&self) -> Self {
447        let next_version = self
448            .0
449            .checked_add(1)
450            .expect("added more than u64::MAX columns?");
451        RelationVersion(next_version)
452    }
453
454    /// Consume a [`RelationVersion`] returning the raw value.
455    ///
456    /// Should __only__ be used for serialization.
457    pub fn into_raw(self) -> u64 {
458        self.0
459    }
460
461    /// Create a [`RelationVersion`] from a raw value.
462    ///
463    /// Should __only__ be used for serialization.
464    pub fn from_raw(val: u64) -> RelationVersion {
465        RelationVersion(val)
466    }
467}
468
469impl From<RelationVersion> for SchemaId {
470    fn from(value: RelationVersion) -> Self {
471        SchemaId(usize::cast_from(value.0))
472    }
473}
474
475impl From<mz_sql_parser::ast::Version> for RelationVersion {
476    fn from(value: mz_sql_parser::ast::Version) -> Self {
477        RelationVersion(value.into_inner())
478    }
479}
480
481impl fmt::Display for RelationVersion {
482    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
483        write!(f, "v{}", self.0)
484    }
485}
486
487impl From<RelationVersion> for mz_sql_parser::ast::Version {
488    fn from(value: RelationVersion) -> Self {
489        mz_sql_parser::ast::Version::new(value.0)
490    }
491}
492
493impl RustType<ProtoRelationVersion> for RelationVersion {
494    fn into_proto(&self) -> ProtoRelationVersion {
495        ProtoRelationVersion { value: self.0 }
496    }
497
498    fn from_proto(proto: ProtoRelationVersion) -> Result<Self, TryFromProtoError> {
499        Ok(RelationVersion(proto.value))
500    }
501}
502
/// Metadata (other than type) for a column in a [`RelationDesc`].
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash, MzReflect)]
struct ColumnMetadata {
    /// Name of the column.
    name: ColumnName,
    /// Index into a [`SqlRelationType`] for this column.
    typ_idx: usize,
    /// Version this column was added at.
    added: RelationVersion,
    /// Version this column was dropped at, or `None` if it has not been
    /// dropped.
    dropped: Option<RelationVersion>,
}
515
/// A description of the shape of a relation.
///
/// It bundles a [`SqlRelationType`] with `ColumnMetadata` for each column in
/// the relation.
///
/// # Examples
///
/// A `RelationDesc` is typically constructed via its builder API:
///
/// ```
/// use mz_repr::{SqlColumnType, RelationDesc, SqlScalarType};
///
/// let desc = RelationDesc::builder()
///     .with_column("id", SqlScalarType::Int64.nullable(false))
///     .with_column("price", SqlScalarType::Float64.nullable(true))
///     .finish();
/// ```
///
/// In more complicated cases, like when constructing a `RelationDesc` in
/// response to user input, it may be more convenient to construct a relation
/// type first, and imbue it with column names to form a `RelationDesc` later:
///
/// ```
/// use mz_repr::RelationDesc;
///
/// # fn plan_query(_: &str) -> mz_repr::SqlRelationType { mz_repr::SqlRelationType::new(vec![]) }
/// let relation_type = plan_query("SELECT * FROM table");
/// let names = (0..relation_type.arity()).map(|i| match i {
///     0 => "first",
///     1 => "second",
///     _ => "unknown",
/// });
/// let desc = RelationDesc::new(relation_type, names);
/// ```
///
/// Next to the [`SqlRelationType`] we maintain a map of `ColumnIndex` to
/// `ColumnMetadata`, where [`ColumnIndex`] is a stable identifier for a
/// column throughout the lifetime of the relation. This allows a
/// [`RelationDesc`] to represent a projection over a version of itself.
///
/// ```
/// use std::collections::BTreeSet;
/// use mz_repr::{ColumnIndex, RelationDesc, SqlScalarType};
///
/// let desc = RelationDesc::builder()
///     .with_column("name", SqlScalarType::String.nullable(false))
///     .with_column("email", SqlScalarType::String.nullable(false))
///     .finish();
///
/// // Demand only the second column (index 1), projecting away the first.
/// let demands = BTreeSet::from([1]);
/// let proj = desc.apply_demand(&demands);
///
/// // We projected away the first column.
/// assert!(!proj.contains_index(&ColumnIndex::from_raw(0)));
/// // But retained the second.
/// assert!(proj.contains_index(&ColumnIndex::from_raw(1)));
///
/// // The underlying `SqlRelationType` also contains a single column.
/// assert_eq!(proj.typ().arity(), 1);
/// ```
///
/// To maintain this stable mapping and track the lifetime of a column (e.g.
/// when adding or dropping a column) we use `ColumnMetadata`. It maintains
/// the index in [`SqlRelationType`] that corresponds to a given column, and the
/// version at which this column was added or dropped.
///
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash, MzReflect)]
pub struct RelationDesc {
    /// Column types and keys of the relation.
    typ: SqlRelationType,
    /// Per-column metadata, keyed by the stable index of the column.
    metadata: BTreeMap<ColumnIndex, ColumnMetadata>,
}
588
589impl RustType<ProtoRelationDesc> for RelationDesc {
590    fn into_proto(&self) -> ProtoRelationDesc {
591        let (names, metadata): (Vec<_>, Vec<_>) = self
592            .metadata
593            .values()
594            .map(|meta| {
595                let metadata = ProtoColumnMetadata {
596                    added: Some(meta.added.into_proto()),
597                    dropped: meta.dropped.map(|v| v.into_proto()),
598                };
599                (meta.name.into_proto(), metadata)
600            })
601            .unzip();
602
603        // `metadata` Migration Logic: We wrote some `ProtoRelationDesc`s into Persist before the
604        // metadata field was added. To make sure our serialization roundtrips the same as before
605        // we added the field, we omit `metadata` if all of the values are equal to the default.
606        //
607        // Note: This logic needs to exist approximately forever.
608        let is_all_default_metadata = metadata.iter().all(|meta| {
609            meta.added == Some(RelationVersion::root().into_proto()) && meta.dropped == None
610        });
611        let metadata = if is_all_default_metadata {
612            Vec::new()
613        } else {
614            metadata
615        };
616
617        ProtoRelationDesc {
618            typ: Some(self.typ.into_proto()),
619            names,
620            metadata,
621        }
622    }
623
624    fn from_proto(proto: ProtoRelationDesc) -> Result<Self, TryFromProtoError> {
625        // `metadata` Migration Logic: We wrote some `ProtoRelationDesc`s into Persist before the
626        // metadata field was added. If the field doesn't exist we fill it in with default values,
627        // and when converting into_proto we omit these fields so the serialized bytes roundtrip.
628        //
629        // Note: This logic needs to exist approximately forever.
630        let proto_metadata: Box<dyn Iterator<Item = _>> = if proto.metadata.is_empty() {
631            let val = ProtoColumnMetadata {
632                added: Some(RelationVersion::root().into_proto()),
633                dropped: None,
634            };
635            Box::new(itertools::repeat_n(val, proto.names.len()))
636        } else {
637            Box::new(proto.metadata.into_iter())
638        };
639
640        let metadata = proto
641            .names
642            .into_iter()
643            .zip_eq(proto_metadata)
644            .enumerate()
645            .map(|(idx, (name, metadata))| {
646                let meta = ColumnMetadata {
647                    name: name.into_rust()?,
648                    typ_idx: idx,
649                    added: metadata.added.into_rust_if_some("ColumnMetadata::added")?,
650                    dropped: metadata.dropped.into_rust()?,
651                };
652                Ok::<_, TryFromProtoError>((ColumnIndex(idx), meta))
653            })
654            .collect::<Result<_, _>>()?;
655
656        Ok(RelationDesc {
657            typ: proto.typ.into_rust_if_some("ProtoRelationDesc::typ")?,
658            metadata,
659        })
660    }
661}
662
663impl RelationDesc {
    /// Returns a [`RelationDescBuilder`] that can be used to construct a [`RelationDesc`].
    ///
    /// The builder is obtained from `RelationDescBuilder::default()`.
    pub fn builder() -> RelationDescBuilder {
        RelationDescBuilder::default()
    }
668
669    /// Constructs a new `RelationDesc` that represents the empty relation
670    /// with no columns and no keys.
671    pub fn empty() -> Self {
672        RelationDesc {
673            typ: SqlRelationType::empty(),
674            metadata: BTreeMap::default(),
675        }
676    }
677
678    /// Check if the `RelationDesc` is empty.
679    pub fn is_empty(&self) -> bool {
680        self == &Self::empty()
681    }
682
683    /// Returns the number of columns in this [`RelationDesc`].
684    pub fn len(&self) -> usize {
685        self.typ().column_types.len()
686    }
687
688    /// Constructs a new `RelationDesc` from a `SqlRelationType` and an iterator
689    /// over column names.
690    ///
691    /// # Panics
692    ///
693    /// Panics if the arity of the `SqlRelationType` is not equal to the number of
694    /// items in `names`.
695    pub fn new<I, N>(typ: SqlRelationType, names: I) -> Self
696    where
697        I: IntoIterator<Item = N>,
698        N: Into<ColumnName>,
699    {
700        let metadata: BTreeMap<_, _> = names
701            .into_iter()
702            .enumerate()
703            .map(|(idx, name)| {
704                let col_idx = ColumnIndex(idx);
705                let metadata = ColumnMetadata {
706                    name: name.into(),
707                    typ_idx: idx,
708                    added: RelationVersion::root(),
709                    dropped: None,
710                };
711                (col_idx, metadata)
712            })
713            .collect();
714
715        // TODO(parkmycar): Add better validation here.
716        assert_eq!(typ.column_types.len(), metadata.len());
717
718        RelationDesc { typ, metadata }
719    }
720
721    pub fn from_names_and_types<I, T, N>(iter: I) -> Self
722    where
723        I: IntoIterator<Item = (N, T)>,
724        T: Into<SqlColumnType>,
725        N: Into<ColumnName>,
726    {
727        let (names, types): (Vec<_>, Vec<_>) = iter.into_iter().unzip();
728        let types = types.into_iter().map(Into::into).collect();
729        let typ = SqlRelationType::new(types);
730        Self::new(typ, names)
731    }
732
733    /// Concatenates a `RelationDesc` onto the end of this `RelationDesc`.
734    ///
735    /// # Panics
736    ///
737    /// Panics if either `self` or `other` have columns that were added at a
738    /// [`RelationVersion`] other than [`RelationVersion::root`] or if any
739    /// columns were dropped.
740    ///
741    /// TODO(parkmycar): Move this method to [`RelationDescBuilder`].
742    pub fn concat(mut self, other: Self) -> Self {
743        let self_len = self.typ.column_types.len();
744
745        for (typ, (_col_idx, meta)) in other
746            .typ
747            .column_types
748            .into_iter()
749            .zip_eq(other.metadata.into_iter())
750        {
751            assert_eq!(meta.added, RelationVersion::root());
752            assert_none!(meta.dropped);
753
754            let new_idx = self.typ.columns().len();
755            let new_meta = ColumnMetadata {
756                name: meta.name,
757                typ_idx: new_idx,
758                added: RelationVersion::root(),
759                dropped: None,
760            };
761
762            self.typ.column_types.push(typ);
763            let prev = self.metadata.insert(ColumnIndex(new_idx), new_meta);
764
765            assert_eq!(self.metadata.len(), self.typ.columns().len());
766            assert_none!(prev);
767        }
768
769        for k in other.typ.keys {
770            let k = k.into_iter().map(|idx| idx + self_len).collect();
771            self = self.with_key(k);
772        }
773        self
774    }
775
776    /// Adds a new key for the relation.
777    pub fn with_key(mut self, indices: Vec<usize>) -> Self {
778        self.typ = self.typ.with_key(indices);
779        self
780    }
781
782    /// Drops all existing keys.
783    pub fn without_keys(mut self) -> Self {
784        self.typ.keys.clear();
785        self
786    }
787
788    /// Builds a new relation description with the column names replaced with
789    /// new names.
790    ///
791    /// # Panics
792    ///
793    /// Panics if the arity of the relation type does not match the number of
794    /// items in `names`.
795    pub fn with_names<I, N>(self, names: I) -> Self
796    where
797        I: IntoIterator<Item = N>,
798        N: Into<ColumnName>,
799    {
800        Self::new(self.typ, names)
801    }
802
803    /// Computes the number of columns in the relation.
804    pub fn arity(&self) -> usize {
805        self.typ.arity()
806    }
807
808    /// Returns the relation type underlying this relation description.
809    pub fn typ(&self) -> &SqlRelationType {
810        &self.typ
811    }
812
813    /// Returns an iterator over the columns in this relation.
814    pub fn iter(&self) -> impl Iterator<Item = (&ColumnName, &SqlColumnType)> {
815        self.metadata.values().map(|meta| {
816            let typ = &self.typ.columns()[meta.typ_idx];
817            (&meta.name, typ)
818        })
819    }
820
821    /// Returns an iterator over the types of the columns in this relation.
822    pub fn iter_types(&self) -> impl Iterator<Item = &SqlColumnType> {
823        self.typ.column_types.iter()
824    }
825
826    /// Returns an iterator over the names of the columns in this relation.
827    pub fn iter_names(&self) -> impl Iterator<Item = &ColumnName> {
828        self.metadata.values().map(|meta| &meta.name)
829    }
830
831    /// Returns an iterator over the columns in this relation, with all their metadata.
832    pub fn iter_all(&self) -> impl Iterator<Item = (&ColumnIndex, &ColumnName, &SqlColumnType)> {
833        self.metadata.iter().map(|(col_idx, metadata)| {
834            let col_typ = &self.typ.columns()[metadata.typ_idx];
835            (col_idx, &metadata.name, col_typ)
836        })
837    }
838
839    /// Returns an iterator over the names of the columns in this relation that are "similar" to
840    /// the provided `name`.
841    pub fn iter_similar_names<'a>(
842        &'a self,
843        name: &'a ColumnName,
844    ) -> impl Iterator<Item = &'a ColumnName> {
845        self.iter_names().filter(|n| n.is_similar(name))
846    }
847
848    /// Returns whether this [`RelationDesc`] contains a column at the specified index.
849    pub fn contains_index(&self, idx: &ColumnIndex) -> bool {
850        self.metadata.contains_key(idx)
851    }
852
853    /// Finds a column by name.
854    ///
855    /// Returns the index and type of the column named `name`. If no column with
856    /// the specified name exists, returns `None`. If multiple columns have the
857    /// specified name, the leftmost column is returned.
858    pub fn get_by_name(&self, name: &ColumnName) -> Option<(usize, &SqlColumnType)> {
859        self.iter_names()
860            .position(|n| n == name)
861            .map(|i| (i, &self.typ.column_types[i]))
862    }
863
864    /// Gets the name of the `i`th column.
865    ///
866    /// # Panics
867    ///
868    /// Panics if `i` is not a valid column index.
869    ///
870    /// TODO(parkmycar): Migrate all uses of this to [`RelationDesc::get_name_idx`].
871    pub fn get_name(&self, i: usize) -> &ColumnName {
872        // TODO(parkmycar): Refactor this to use `ColumnIndex`.
873        self.get_name_idx(&ColumnIndex(i))
874    }
875
876    /// Gets the name of the column at `idx`.
877    ///
878    /// # Panics
879    ///
880    /// Panics if no column exists at `idx`.
881    pub fn get_name_idx(&self, idx: &ColumnIndex) -> &ColumnName {
882        &self.metadata.get(idx).expect("should exist").name
883    }
884
885    /// Mutably gets the name of the `i`th column.
886    ///
887    /// # Panics
888    ///
889    /// Panics if `i` is not a valid column index.
890    pub fn get_name_mut(&mut self, i: usize) -> &mut ColumnName {
891        // TODO(parkmycar): Refactor this to use `ColumnIndex`.
892        &mut self
893            .metadata
894            .get_mut(&ColumnIndex(i))
895            .expect("should exist")
896            .name
897    }
898
899    /// Gets the [`SqlColumnType`] of the column at `idx`.
900    ///
901    /// # Panics
902    ///
903    /// Panics if no column exists at `idx`.
904    pub fn get_type(&self, idx: &ColumnIndex) -> &SqlColumnType {
905        let typ_idx = self.metadata.get(idx).expect("should exist").typ_idx;
906        &self.typ.column_types[typ_idx]
907    }
908
909    /// Gets the name of the `i`th column if that column name is unambiguous.
910    ///
911    /// If at least one other column has the same name as the `i`th column,
912    /// returns `None`. If the `i`th column has no name, returns `None`.
913    ///
914    /// # Panics
915    ///
916    /// Panics if `i` is not a valid column index.
917    pub fn get_unambiguous_name(&self, i: usize) -> Option<&ColumnName> {
918        let name = self.get_name(i);
919        if self.iter_names().filter(|n| *n == name).count() == 1 {
920            Some(name)
921        } else {
922            None
923        }
924    }
925
926    /// Verifies that `d` meets all of the constraints for the `i`th column of `self`.
927    ///
928    /// n.b. The only constraint MZ currently supports in NOT NULL, but this
929    /// structure will be simple to extend.
930    pub fn constraints_met(&self, i: usize, d: &Datum) -> Result<(), NotNullViolation> {
931        let name = self.get_name(i);
932        let typ = &self.typ.column_types[i];
933        if d == &Datum::Null && !typ.nullable {
934            Err(NotNullViolation(name.clone()))
935        } else {
936            Ok(())
937        }
938    }
939
940    /// Creates a new [`RelationDesc`] retaining only the columns specified in `demands`.
941    pub fn apply_demand(&self, demands: &BTreeSet<usize>) -> RelationDesc {
942        let mut new_desc = self.clone();
943
944        // Update ColumnMetadata.
945        let mut removed = 0;
946        new_desc.metadata.retain(|idx, metadata| {
947            let retain = demands.contains(&idx.0);
948            if !retain {
949                removed += 1;
950            } else {
951                metadata.typ_idx -= removed;
952            }
953            retain
954        });
955
956        // Update SqlColumnType.
957        let mut idx = 0;
958        new_desc.typ.column_types.retain(|_| {
959            let keep = demands.contains(&idx);
960            idx += 1;
961            keep
962        });
963
964        new_desc
965    }
966}
967
968impl Arbitrary for RelationDesc {
969    type Parameters = ();
970    type Strategy = BoxedStrategy<RelationDesc>;
971
972    fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy {
973        let mut weights = vec![(100, Just(0..4)), (50, Just(4..8)), (25, Just(8..16))];
974        if std::env::var("PROPTEST_LARGE_DATA").is_ok() {
975            weights.extend([
976                (12, Just(16..32)),
977                (6, Just(32..64)),
978                (3, Just(64..128)),
979                (1, Just(128..256)),
980            ]);
981        }
982        let num_columns = Union::new_weighted(weights);
983
984        num_columns.prop_flat_map(arb_relation_desc).boxed()
985    }
986}
987
988/// Returns a [`Strategy`] that generates an arbitrary [`RelationDesc`] with a number columns
989/// within the range provided.
990pub fn arb_relation_desc(num_cols: std::ops::Range<usize>) -> impl Strategy<Value = RelationDesc> {
991    proptest::collection::btree_map(any::<ColumnName>(), any::<SqlColumnType>(), num_cols)
992        .prop_map(RelationDesc::from_names_and_types)
993}
994
995/// Returns a [`Strategy`] that generates a projection of the provided [`RelationDesc`].
996pub fn arb_relation_desc_projection(desc: RelationDesc) -> impl Strategy<Value = RelationDesc> {
997    let mask: Vec<_> = (0..desc.len()).map(|_| any::<bool>()).collect();
998    mask.prop_map(move |mask| {
999        let demands: BTreeSet<_> = mask
1000            .into_iter()
1001            .enumerate()
1002            .filter_map(|(idx, keep)| keep.then_some(idx))
1003            .collect();
1004        desc.apply_demand(&demands)
1005    })
1006}
1007
1008impl IntoIterator for RelationDesc {
1009    type Item = (ColumnName, SqlColumnType);
1010    type IntoIter = Box<dyn Iterator<Item = (ColumnName, SqlColumnType)>>;
1011
1012    fn into_iter(self) -> Self::IntoIter {
1013        let iter = self
1014            .metadata
1015            .into_values()
1016            .zip_eq(self.typ.column_types)
1017            .map(|(meta, typ)| (meta.name, typ));
1018        Box::new(iter)
1019    }
1020}
1021
1022/// Returns a [`Strategy`] that yields arbitrary [`Row`]s for the provided [`RelationDesc`].
1023pub fn arb_row_for_relation(desc: &RelationDesc) -> impl Strategy<Value = Row> + use<> {
1024    let datums: Vec<_> = desc
1025        .typ()
1026        .columns()
1027        .iter()
1028        .cloned()
1029        .map(arb_datum_for_column)
1030        .collect();
1031    datums.prop_map(|x| Row::pack(x.iter().map(Datum::from)))
1032}
1033
/// Error raised when a null value is supplied for a column that is not nullable.
///
/// The wrapped [`ColumnName`] identifies the column whose not-null constraint
/// was violated.
#[derive(Debug, PartialEq, Eq)]
pub struct NotNullViolation(pub ColumnName);
1037
1038impl fmt::Display for NotNullViolation {
1039    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1040        write!(
1041            f,
1042            "null value in column {} violates not-null constraint",
1043            self.0.quoted()
1044        )
1045    }
1046}
1047
/// A builder for a [`RelationDesc`].
///
/// Columns are accumulated in insertion order and keys are stored as sorted,
/// de-duplicated lists of column positions; [`RelationDescBuilder::finish`]
/// turns the accumulated state into a [`RelationDesc`].
#[derive(Clone, Default, Debug, PartialEq, Eq)]
pub struct RelationDescBuilder {
    /// Columns of the relation.
    columns: Vec<(ColumnName, SqlColumnType)>,
    /// Sets of indices that are "keys" for the collection.
    keys: Vec<Vec<usize>>,
}
1056
1057impl RelationDescBuilder {
1058    /// Appends a column with the specified name and type.
1059    pub fn with_column<N: Into<ColumnName>>(
1060        mut self,
1061        name: N,
1062        ty: SqlColumnType,
1063    ) -> RelationDescBuilder {
1064        let name = name.into();
1065        self.columns.push((name, ty));
1066        self
1067    }
1068
1069    /// Appends the provided columns to the builder.
1070    pub fn with_columns<I, T, N>(mut self, iter: I) -> Self
1071    where
1072        I: IntoIterator<Item = (N, T)>,
1073        T: Into<SqlColumnType>,
1074        N: Into<ColumnName>,
1075    {
1076        self.columns
1077            .extend(iter.into_iter().map(|(name, ty)| (name.into(), ty.into())));
1078        self
1079    }
1080
1081    /// Adds a new key for the relation.
1082    pub fn with_key(mut self, mut indices: Vec<usize>) -> RelationDescBuilder {
1083        indices.sort_unstable();
1084        if !self.keys.contains(&indices) {
1085            self.keys.push(indices);
1086        }
1087        self
1088    }
1089
1090    /// Removes all previously inserted keys.
1091    pub fn without_keys(mut self) -> RelationDescBuilder {
1092        self.keys.clear();
1093        assert_eq!(self.keys.len(), 0);
1094        self
1095    }
1096
1097    /// Concatenates a [`RelationDescBuilder`] onto the end of this [`RelationDescBuilder`].
1098    pub fn concat(mut self, other: Self) -> Self {
1099        let self_len = self.columns.len();
1100
1101        self.columns.extend(other.columns);
1102        for k in other.keys {
1103            let k = k.into_iter().map(|idx| idx + self_len).collect();
1104            self = self.with_key(k);
1105        }
1106
1107        self
1108    }
1109
1110    /// Finish the builder, returning a [`RelationDesc`].
1111    pub fn finish(self) -> RelationDesc {
1112        let mut desc = RelationDesc::from_names_and_types(self.columns);
1113        desc.typ = desc.typ.with_keys(self.keys);
1114        desc
1115    }
1116}
1117
/// Describes a [`RelationDesc`] at a specific version of a [`VersionedRelationDesc`].
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord, Serialize)]
pub enum RelationVersionSelector {
    /// Selects the relation as of exactly the given [`RelationVersion`].
    Specific(RelationVersion),
    /// Selects the relation with every recorded change applied.
    Latest,
}
1124
1125impl RelationVersionSelector {
1126    pub fn specific(version: u64) -> Self {
1127        RelationVersionSelector::Specific(RelationVersion(version))
1128    }
1129}
1130
/// A wrapper around [`RelationDesc`] that provides an interface for adding
/// columns and generating new versions.
///
/// TODO(parkmycar): Using an immutable data structure for RelationDesc would
/// be great.
#[derive(Debug, Clone, Serialize)]
pub struct VersionedRelationDesc {
    /// The description holding every column ever recorded. Dropping a column
    /// only marks its metadata with the version it was dropped at, so dropped
    /// columns remain present here.
    inner: RelationDesc,
}
1140
1141impl VersionedRelationDesc {
1142    pub fn new(inner: RelationDesc) -> Self {
1143        VersionedRelationDesc { inner }
1144    }
1145
1146    /// Adds a new column to this [`RelationDesc`], creating a new version of the [`RelationDesc`].
1147    ///
1148    /// # Panics
1149    ///
1150    /// * Panics if a column with `name` already exists that hasn't been dropped.
1151    ///
1152    /// Note: For building a [`RelationDesc`] see [`RelationDescBuilder::with_column`].
1153    #[must_use]
1154    pub fn add_column<N, T>(&mut self, name: N, typ: T) -> RelationVersion
1155    where
1156        N: Into<ColumnName>,
1157        T: Into<SqlColumnType>,
1158    {
1159        let latest_version = self.latest_version();
1160        let new_version = latest_version.bump();
1161
1162        let name = name.into();
1163        let existing = self
1164            .inner
1165            .metadata
1166            .iter()
1167            .find(|(_, meta)| meta.name == name && meta.dropped.is_none());
1168        if let Some(existing) = existing {
1169            panic!("column named '{name}' already exists! {existing:?}");
1170        }
1171
1172        let next_idx = self.inner.metadata.len();
1173        let col_meta = ColumnMetadata {
1174            name,
1175            typ_idx: next_idx,
1176            added: new_version,
1177            dropped: None,
1178        };
1179
1180        self.inner.typ.column_types.push(typ.into());
1181        let prev = self.inner.metadata.insert(ColumnIndex(next_idx), col_meta);
1182
1183        assert_none!(prev, "column index overlap!");
1184        self.validate();
1185
1186        new_version
1187    }
1188
1189    /// Drops the column `name` from this [`RelationDesc`]. If there are multiple columns with
1190    /// `name` drops the left-most one that hasn't already been dropped.
1191    ///
1192    /// TODO(parkmycar): Add handling for dropping a column that is currently used as a key.
1193    ///
1194    /// # Panics
1195    ///
1196    /// Panics if a column with `name` does not exist or the dropped column was used as a key.
1197    #[must_use]
1198    pub fn drop_column<N>(&mut self, name: N) -> RelationVersion
1199    where
1200        N: Into<ColumnName>,
1201    {
1202        let name = name.into();
1203        let latest_version = self.latest_version();
1204        let new_version = latest_version.bump();
1205
1206        let col = self
1207            .inner
1208            .metadata
1209            .values_mut()
1210            .find(|meta| meta.name == name && meta.dropped.is_none())
1211            .expect("column to exist");
1212
1213        // Make sure the column hadn't been previously dropped.
1214        assert_none!(col.dropped, "column was already dropped");
1215        col.dropped = Some(new_version);
1216
1217        // Make sure the column isn't being used as a key.
1218        let dropped_key = self
1219            .inner
1220            .typ
1221            .keys
1222            .iter()
1223            .any(|keys| keys.contains(&col.typ_idx));
1224        assert!(!dropped_key, "column being dropped was used as a key");
1225
1226        self.validate();
1227        new_version
1228    }
1229
1230    /// Returns the [`RelationDesc`] at the latest version.
1231    pub fn latest(&self) -> RelationDesc {
1232        self.inner.clone()
1233    }
1234
1235    /// Returns this [`RelationDesc`] at the specified version.
1236    pub fn at_version(&self, version: RelationVersionSelector) -> RelationDesc {
1237        // Get all of the changes from the start, up to whatever version was requested.
1238        //
1239        // TODO(parkmycar): We should probably panic on unknown verisons?
1240        let up_to_version = match version {
1241            RelationVersionSelector::Latest => RelationVersion(u64::MAX),
1242            RelationVersionSelector::Specific(v) => v,
1243        };
1244
1245        let valid_columns = self.inner.metadata.iter().filter(|(_col_idx, meta)| {
1246            let added = meta.added <= up_to_version;
1247            let dropped = meta
1248                .dropped
1249                .map(|dropped_at| up_to_version >= dropped_at)
1250                .unwrap_or(false);
1251
1252            added && !dropped
1253        });
1254
1255        let mut column_types = Vec::new();
1256        let mut column_metas = BTreeMap::new();
1257
1258        // N.B. At this point we need to be careful because col_idx might not
1259        // equal typ_idx.
1260        //
1261        // For example, consider columns "a", "b", and "c" with indexes 0, 1,
1262        // and 2. If we drop column "b" then we'll have "a" and "c" with column
1263        // indexes 0 and 2, but their indices in SqlRelationType will be 0 and 1.
1264        for (col_idx, meta) in valid_columns {
1265            let new_meta = ColumnMetadata {
1266                name: meta.name.clone(),
1267                typ_idx: column_types.len(),
1268                added: meta.added.clone(),
1269                dropped: meta.dropped.clone(),
1270            };
1271            column_types.push(self.inner.typ.columns()[meta.typ_idx].clone());
1272            column_metas.insert(*col_idx, new_meta);
1273        }
1274
1275        // Remap keys in case a column with an index less than that of a key was
1276        // dropped.
1277        //
1278        // For example, consider columns "a", "b", and "c" where "a" and "c" are
1279        // keys and "b" was dropped.
1280        let keys = self
1281            .inner
1282            .typ
1283            .keys
1284            .iter()
1285            .map(|keys| {
1286                keys.iter()
1287                    .map(|key_idx| {
1288                        let metadata = column_metas
1289                            .get(&ColumnIndex(*key_idx))
1290                            .expect("found key for column that doesn't exist");
1291                        metadata.typ_idx
1292                    })
1293                    .collect()
1294            })
1295            .collect();
1296
1297        let relation_type = SqlRelationType { column_types, keys };
1298
1299        RelationDesc {
1300            typ: relation_type,
1301            metadata: column_metas,
1302        }
1303    }
1304
1305    pub fn latest_version(&self) -> RelationVersion {
1306        self.inner
1307            .metadata
1308            .values()
1309            // N.B. Dropped is always greater than added.
1310            .map(|meta| meta.dropped.unwrap_or(meta.added))
1311            .max()
1312            // If there aren't any columns we're implicitly the root version.
1313            .unwrap_or_else(RelationVersion::root)
1314    }
1315
1316    /// Validates internal contraints of the [`RelationDesc`] are correct.
1317    ///
1318    /// # Panics
1319    ///
1320    /// Panics if a constraint is not satisfied.
1321    fn validate(&self) {
1322        fn validate_inner(desc: &RelationDesc) -> Result<(), anyhow::Error> {
1323            if desc.typ.column_types.len() != desc.metadata.len() {
1324                anyhow::bail!("mismatch between number of types and metadatas");
1325            }
1326
1327            for (col_idx, meta) in &desc.metadata {
1328                if col_idx.0 > desc.metadata.len() {
1329                    anyhow::bail!("column index out of bounds");
1330                }
1331                if meta.added >= meta.dropped.unwrap_or(RelationVersion(u64::MAX)) {
1332                    anyhow::bail!("column was added after it was dropped?");
1333                }
1334                if desc.typ().columns().get(meta.typ_idx).is_none() {
1335                    anyhow::bail!("typ_idx incorrect");
1336                }
1337            }
1338
1339            for keys in &desc.typ.keys {
1340                for key in keys {
1341                    if *key >= desc.typ.column_types.len() {
1342                        anyhow::bail!("key index was out of bounds!");
1343                    }
1344                }
1345            }
1346
1347            let versions = desc
1348                .metadata
1349                .values()
1350                .map(|meta| meta.dropped.unwrap_or(meta.added));
1351            let mut max = 0;
1352            let mut sum = 0;
1353            for version in versions {
1354                max = std::cmp::max(max, version.0);
1355                sum += version.0;
1356            }
1357
1358            // Other than RelationVersion(0), we should never have duplicate
1359            // versions and they should always increase by 1. In other words, the
1360            // sum of all RelationVersions should be the sum of [0, max].
1361            //
1362            // N.B. n * (n + 1) / 2 = sum of [0, n]
1363            //
1364            // While I normally don't like tricks like this, it allows us to
1365            // validate that our column versions are correct in O(n) time and
1366            // without allocations.
1367            if sum != (max * (max + 1) / 2) {
1368                anyhow::bail!("there is a duplicate or missing relation version");
1369            }
1370
1371            Ok(())
1372        }
1373
1374        assert_ok!(validate_inner(&self.inner), "validate failed! {self:?}");
1375    }
1376}
1377
/// Diffs that can be generated by proptest and applied to a [`RelationDesc`] to
/// exercise schema migrations.
#[derive(Debug)]
pub enum PropRelationDescDiff {
    /// Appends a new column with the given name and type.
    AddColumn {
        name: ColumnName,
        typ: SqlColumnType,
    },
    /// Marks the first column called `name` as dropped, if there is one and it
    /// has not been dropped already.
    DropColumn {
        name: ColumnName,
    },
    /// Flips the nullability of the column called `name`, if there is one.
    ToggleNullability {
        name: ColumnName,
    },
    /// Replaces the type of the column called `name` with `typ`, if there is one.
    ChangeType {
        name: ColumnName,
        typ: SqlColumnType,
    },
}
1397
1398impl PropRelationDescDiff {
1399    pub fn apply(self, desc: &mut RelationDesc) {
1400        match self {
1401            PropRelationDescDiff::AddColumn { name, typ } => {
1402                let new_idx = desc.metadata.len();
1403                let meta = ColumnMetadata {
1404                    name,
1405                    typ_idx: new_idx,
1406                    added: RelationVersion(0),
1407                    dropped: None,
1408                };
1409                let prev = desc.metadata.insert(ColumnIndex(new_idx), meta);
1410                desc.typ.column_types.push(typ);
1411
1412                assert_none!(prev);
1413                assert_eq!(desc.metadata.len(), desc.typ.column_types.len());
1414            }
1415            PropRelationDescDiff::DropColumn { name } => {
1416                let next_version = desc
1417                    .metadata
1418                    .values()
1419                    .map(|meta| meta.dropped.unwrap_or(meta.added))
1420                    .max()
1421                    .unwrap_or_else(RelationVersion::root)
1422                    .bump();
1423                let Some(metadata) = desc.metadata.values_mut().find(|meta| meta.name == name)
1424                else {
1425                    return;
1426                };
1427                if metadata.dropped.is_none() {
1428                    metadata.dropped = Some(next_version);
1429                }
1430            }
1431            PropRelationDescDiff::ToggleNullability { name } => {
1432                let Some((pos, _)) = desc.get_by_name(&name) else {
1433                    return;
1434                };
1435                let col_type = desc
1436                    .typ
1437                    .column_types
1438                    .get_mut(pos)
1439                    .expect("ColumnNames and SqlColumnTypes out of sync!");
1440                col_type.nullable = !col_type.nullable;
1441            }
1442            PropRelationDescDiff::ChangeType { name, typ } => {
1443                let Some((pos, _)) = desc.get_by_name(&name) else {
1444                    return;
1445                };
1446                let col_type = desc
1447                    .typ
1448                    .column_types
1449                    .get_mut(pos)
1450                    .expect("ColumnNames and SqlColumnTypes out of sync!");
1451                *col_type = typ;
1452            }
1453        }
1454    }
1455}
1456
1457/// Generates a set of [`PropRelationDescDiff`]s based on some source [`RelationDesc`].
1458pub fn arb_relation_desc_diff(
1459    source: &RelationDesc,
1460) -> impl Strategy<Value = Vec<PropRelationDescDiff>> + use<> {
1461    let source = Rc::new(source.clone());
1462    let num_source_columns = source.typ.columns().len();
1463
1464    let num_add_columns = Union::new_weighted(vec![(100, Just(0..8)), (1, Just(8..64))]);
1465    let add_columns_strat = num_add_columns
1466        .prop_flat_map(|num_columns| {
1467            proptest::collection::vec((any::<ColumnName>(), any::<SqlColumnType>()), num_columns)
1468        })
1469        .prop_map(|cols| {
1470            cols.into_iter()
1471                .map(|(name, typ)| PropRelationDescDiff::AddColumn { name, typ })
1472                .collect::<Vec<_>>()
1473        });
1474
1475    // If the source RelationDesc is empty there is nothing else to do.
1476    if num_source_columns == 0 {
1477        return add_columns_strat.boxed();
1478    }
1479
1480    let source_ = Rc::clone(&source);
1481    let drop_columns_strat = (0..num_source_columns).prop_perturb(move |num_columns, mut rng| {
1482        let mut set = BTreeSet::default();
1483        for _ in 0..num_columns {
1484            let col_idx = rng.random_range(0..num_source_columns);
1485            set.insert(source_.get_name(col_idx).clone());
1486        }
1487        set.into_iter()
1488            .map(|name| PropRelationDescDiff::DropColumn { name })
1489            .collect::<Vec<_>>()
1490    });
1491
1492    let source_ = Rc::clone(&source);
1493    let toggle_nullability_strat =
1494        (0..num_source_columns).prop_perturb(move |num_columns, mut rng| {
1495            let mut set = BTreeSet::default();
1496            for _ in 0..num_columns {
1497                let col_idx = rng.random_range(0..num_source_columns);
1498                set.insert(source_.get_name(col_idx).clone());
1499            }
1500            set.into_iter()
1501                .map(|name| PropRelationDescDiff::ToggleNullability { name })
1502                .collect::<Vec<_>>()
1503        });
1504
1505    let source_ = Rc::clone(&source);
1506    let change_type_strat = (0..num_source_columns)
1507        .prop_perturb(move |num_columns, mut rng| {
1508            let mut set = BTreeSet::default();
1509            for _ in 0..num_columns {
1510                let col_idx = rng.random_range(0..num_source_columns);
1511                set.insert(source_.get_name(col_idx).clone());
1512            }
1513            set
1514        })
1515        .prop_flat_map(|cols| {
1516            proptest::collection::vec(any::<SqlColumnType>(), cols.len())
1517                .prop_map(move |types| (cols.clone(), types))
1518        })
1519        .prop_map(|(cols, types)| {
1520            cols.into_iter()
1521                .zip_eq(types)
1522                .map(|(name, typ)| PropRelationDescDiff::ChangeType { name, typ })
1523                .collect::<Vec<_>>()
1524        });
1525
1526    (
1527        add_columns_strat,
1528        drop_columns_strat,
1529        toggle_nullability_strat,
1530        change_type_strat,
1531    )
1532        .prop_map(|(adds, drops, toggles, changes)| {
1533            adds.into_iter()
1534                .chain(drops)
1535                .chain(toggles)
1536                .chain(changes)
1537                .collect::<Vec<_>>()
1538        })
1539        .prop_shuffle()
1540        .boxed()
1541}
1542
1543#[cfg(test)]
1544mod tests {
1545    use super::*;
1546    use prost::Message;
1547
    /// Smoke test for `at_version`: starts from a two-column description, adds
    /// a column and then drops one, checking after every step that the newest
    /// version looks as expected and that older versions are unaffected.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `pipe2` on OS `linux`
    fn smoktest_at_version() {
        let desc = RelationDesc::builder()
            .with_column("a", SqlScalarType::Bool.nullable(true))
            .with_column("z", SqlScalarType::String.nullable(false))
            .finish();

        let mut versioned_desc = VersionedRelationDesc {
            inner: desc.clone(),
        };
        versioned_desc.validate();

        // Before any change every selector yields the original description,
        // including a version that has not been created yet.
        let latest = versioned_desc.at_version(RelationVersionSelector::Latest);
        assert_eq!(desc, latest);

        let v0 = versioned_desc.at_version(RelationVersionSelector::specific(0));
        assert_eq!(desc, v0);

        let v3 = versioned_desc.at_version(RelationVersionSelector::specific(3));
        assert_eq!(desc, v3);

        let v1 = versioned_desc.add_column("b", SqlScalarType::Bytes.nullable(false));
        assert_eq!(v1, RelationVersion(1));

        let v1 = versioned_desc.at_version(RelationVersionSelector::Specific(v1));
        insta::assert_json_snapshot!(v1.metadata, @r###"
        {
          "0": {
            "name": "a",
            "typ_idx": 0,
            "added": 0,
            "dropped": null
          },
          "1": {
            "name": "z",
            "typ_idx": 1,
            "added": 0,
            "dropped": null
          },
          "2": {
            "name": "b",
            "typ_idx": 2,
            "added": 1,
            "dropped": null
          }
        }
        "###);

        // Check that V0 doesn't show the new column.
        let v0_b = versioned_desc.at_version(RelationVersionSelector::specific(0));
        assert!(v0.iter().eq(v0_b.iter()));

        let v2 = versioned_desc.drop_column("z");
        assert_eq!(v2, RelationVersion(2));

        // After the drop, "b" keeps column index 2 but moves to type position 1.
        let v2 = versioned_desc.at_version(RelationVersionSelector::Specific(v2));
        insta::assert_json_snapshot!(v2.metadata, @r###"
        {
          "0": {
            "name": "a",
            "typ_idx": 0,
            "added": 0,
            "dropped": null
          },
          "2": {
            "name": "b",
            "typ_idx": 1,
            "added": 1,
            "dropped": null
          }
        }
        "###);

        // Check that V0 and V1 are still correct.
        let v0_c = versioned_desc.at_version(RelationVersionSelector::specific(0));
        assert!(v0.iter().eq(v0_c.iter()));

        let v1_b = versioned_desc.at_version(RelationVersionSelector::specific(1));
        assert!(v1.iter().eq(v1_b.iter()));

        // The underlying description still holds all three columns, with "z"
        // marked as dropped at version 2.
        insta::assert_json_snapshot!(versioned_desc.inner.metadata, @r###"
        {
          "0": {
            "name": "a",
            "typ_idx": 0,
            "added": 0,
            "dropped": null
          },
          "1": {
            "name": "z",
            "typ_idx": 1,
            "added": 0,
            "dropped": 2
          },
          "2": {
            "name": "b",
            "typ_idx": 2,
            "added": 1,
            "dropped": null
          }
        }
        "###);
    }
1652
    /// Dropping a column that precedes a key column must shift the key index
    /// in the new version while leaving earlier versions untouched.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `pipe2` on OS `linux`
    fn test_dropping_columns_with_keys() {
        let desc = RelationDesc::builder()
            .with_column("a", SqlScalarType::Bool.nullable(true))
            .with_column("z", SqlScalarType::String.nullable(false))
            .with_key(vec![1])
            .finish();

        let mut versioned_desc = VersionedRelationDesc {
            inner: desc.clone(),
        };
        versioned_desc.validate();

        let v1 = versioned_desc.drop_column("a");
        assert_eq!(v1, RelationVersion(1));

        // Make sure the key index for 'z' got remapped since 'a' was dropped.
        let v1 = versioned_desc.at_version(RelationVersionSelector::Specific(v1));
        insta::assert_json_snapshot!(v1, @r###"
        {
          "typ": {
            "column_types": [
              {
                "scalar_type": "String",
                "nullable": false
              }
            ],
            "keys": [
              [
                0
              ]
            ]
          },
          "metadata": {
            "1": {
              "name": "z",
              "typ_idx": 0,
              "added": 0,
              "dropped": null
            }
          }
        }
        "###);

        // Make sure the key index of 'z' is correct when all columns are present.
        let v0 = versioned_desc.at_version(RelationVersionSelector::specific(0));
        insta::assert_json_snapshot!(v0, @r###"
        {
          "typ": {
            "column_types": [
              {
                "scalar_type": "Bool",
                "nullable": true
              },
              {
                "scalar_type": "String",
                "nullable": false
              }
            ],
            "keys": [
              [
                1
              ]
            ]
          },
          "metadata": {
            "0": {
              "name": "a",
              "typ_idx": 0,
              "added": 0,
              "dropped": 1
            },
            "1": {
              "name": "z",
              "typ_idx": 1,
              "added": 0,
              "dropped": null
            }
          }
        }
        "###);
    }
1736
    /// Converting a `ProtoRelationDesc` that carries names but no metadata must
    /// yield a description whose metadata is filled in from those names.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `pipe2` on OS `linux`
    fn roundtrip_relation_desc_without_metadata() {
        let typ = ProtoRelationType {
            column_types: vec![
                SqlScalarType::String.nullable(false).into_proto(),
                SqlScalarType::Bool.nullable(true).into_proto(),
            ],
            keys: vec![],
        };
        // Note the empty `metadata`: only `names` describes the columns.
        let proto = ProtoRelationDesc {
            typ: Some(typ),
            names: vec![
                ColumnName("a".into()).into_proto(),
                ColumnName("b".into()).into_proto(),
            ],
            metadata: vec![],
        };
        let desc: RelationDesc = proto.into_rust().unwrap();

        insta::assert_json_snapshot!(desc, @r###"
        {
          "typ": {
            "column_types": [
              {
                "scalar_type": "String",
                "nullable": false
              },
              {
                "scalar_type": "Bool",
                "nullable": true
              }
            ],
            "keys": []
          },
          "metadata": {
            "0": {
              "name": "a",
              "typ_idx": 0,
              "added": 0,
              "dropped": null
            },
            "1": {
              "name": "b",
              "typ_idx": 1,
              "added": 0,
              "dropped": null
            }
          }
        }
        "###);
    }
1789
1790    #[mz_ore::test]
1791    #[should_panic(expected = "column named 'a' already exists!")]
1792    fn test_add_column_with_same_name_panics() {
1793        let desc = RelationDesc::builder()
1794            .with_column("a", SqlScalarType::Bool.nullable(true))
1795            .finish();
1796        let mut versioned = VersionedRelationDesc::new(desc);
1797
1798        let _ = versioned.add_column("a", SqlScalarType::String.nullable(false));
1799    }
1800
1801    #[mz_ore::test]
1802    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `pipe2` on OS `linux`
1803    fn test_add_column_with_same_name_prev_dropped() {
1804        let desc = RelationDesc::builder()
1805            .with_column("a", SqlScalarType::Bool.nullable(true))
1806            .finish();
1807        let mut versioned = VersionedRelationDesc::new(desc);
1808
1809        let v1 = versioned.drop_column("a");
1810        let v1 = versioned.at_version(RelationVersionSelector::Specific(v1));
1811        insta::assert_json_snapshot!(v1, @r###"
1812        {
1813          "typ": {
1814            "column_types": [],
1815            "keys": []
1816          },
1817          "metadata": {}
1818        }
1819        "###);
1820
1821        let v2 = versioned.add_column("a", SqlScalarType::String.nullable(false));
1822        let v2 = versioned.at_version(RelationVersionSelector::Specific(v2));
1823        insta::assert_json_snapshot!(v2, @r###"
1824        {
1825          "typ": {
1826            "column_types": [
1827              {
1828                "scalar_type": "String",
1829                "nullable": false
1830              }
1831            ],
1832            "keys": []
1833          },
1834          "metadata": {
1835            "1": {
1836              "name": "a",
1837              "typ_idx": 0,
1838              "added": 2,
1839              "dropped": null
1840            }
1841          }
1842        }
1843        "###);
1844    }
1845
1846    #[mz_ore::test]
1847    #[cfg_attr(miri, ignore)]
1848    fn apply_demand() {
1849        let desc = RelationDesc::builder()
1850            .with_column("a", SqlScalarType::String.nullable(true))
1851            .with_column("b", SqlScalarType::Int64.nullable(false))
1852            .with_column("c", SqlScalarType::Time.nullable(false))
1853            .finish();
1854        let desc = desc.apply_demand(&BTreeSet::from([0, 2]));
1855        assert_eq!(desc.arity(), 2);
1856        // TODO(parkmycar): Move validate onto RelationDesc.
1857        VersionedRelationDesc::new(desc).validate();
1858    }
1859
1860    #[mz_ore::test]
1861    #[cfg_attr(miri, ignore)]
1862    fn smoketest_column_index_stable_ident() {
1863        let idx_a = ColumnIndex(42);
1864        // Note(parkmycar): This should never change.
1865        assert_eq!(idx_a.to_stable_name(), "42");
1866    }
1867
1868    #[mz_ore::test]
1869    #[cfg_attr(miri, ignore)] // too slow
1870    fn proptest_relation_desc_roundtrips() {
1871        fn testcase(og: RelationDesc) {
1872            let bytes = og.into_proto().encode_to_vec();
1873            let proto = ProtoRelationDesc::decode(&bytes[..]).unwrap();
1874            let rnd = RelationDesc::from_proto(proto).unwrap();
1875
1876            assert_eq!(og, rnd);
1877        }
1878
1879        proptest!(|(desc in any::<RelationDesc>())| {
1880            testcase(desc);
1881        });
1882
1883        let strat = any::<RelationDesc>().prop_flat_map(|desc| {
1884            arb_relation_desc_diff(&desc).prop_map(move |diffs| (desc.clone(), diffs))
1885        });
1886
1887        proptest!(|((mut desc, diffs) in strat)| {
1888            for diff in diffs {
1889                diff.apply(&mut desc);
1890            };
1891            testcase(desc);
1892        });
1893    }
1894}