mz_repr/
relation.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::{BTreeMap, BTreeSet};
11use std::rc::Rc;
12use std::{fmt, vec};
13
14use anyhow::bail;
15use itertools::Itertools;
16use mz_lowertest::MzReflect;
17use mz_ore::cast::CastFrom;
18use mz_ore::str::StrExt;
19use mz_ore::{assert_none, assert_ok};
20use mz_persist_types::schema::SchemaId;
21use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
22use proptest::prelude::*;
23use proptest::strategy::{Strategy, Union};
24use proptest_derive::Arbitrary;
25use serde::{Deserialize, Serialize};
26use timely::Container;
27
28use crate::relation_and_scalar::proto_relation_type::ProtoKey;
29pub use crate::relation_and_scalar::{
30    ProtoColumnMetadata, ProtoColumnName, ProtoColumnType, ProtoRelationDesc, ProtoRelationType,
31    ProtoRelationVersion,
32};
33use crate::{Datum, Row, ScalarType, arb_datum_for_column};
34
35/// The type of a [`Datum`].
36///
37/// [`ColumnType`] bundles information about the scalar type of a datum (e.g.,
38/// Int32 or String) with its nullability.
39///
40/// To construct a column type, either initialize the struct directly, or
41/// use the [`ScalarType::nullable`] method.
42#[derive(
43    Arbitrary, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash, MzReflect,
44)]
45pub struct ColumnType {
46    /// The underlying scalar type (e.g., Int32 or String) of this column.
47    pub scalar_type: ScalarType,
48    /// Whether this datum can be null.
49    #[serde(default = "return_true")]
50    pub nullable: bool,
51}
52
53/// This method exists solely for the purpose of making ColumnType nullable by
54/// default in unit tests. The default value of a bool is false, and the only
55/// way to make an object take on any other value by default is to pass it a
56/// function that returns the desired default value. See
57/// <https://github.com/serde-rs/serde/issues/1030>
58#[inline(always)]
59fn return_true() -> bool {
60    true
61}
62
63impl ColumnType {
64    pub fn union(&self, other: &Self) -> Result<Self, anyhow::Error> {
65        match (self.scalar_type.clone(), other.scalar_type.clone()) {
66            (scalar_type, other_scalar_type) if scalar_type == other_scalar_type => {
67                Ok(ColumnType {
68                    scalar_type,
69                    nullable: self.nullable || other.nullable,
70                })
71            }
72            (scalar_type, other_scalar_type) if scalar_type.base_eq(&other_scalar_type) => {
73                Ok(ColumnType {
74                    scalar_type: scalar_type.without_modifiers(),
75                    nullable: self.nullable || other.nullable,
76                })
77            }
78            (
79                ScalarType::Record { fields, custom_id },
80                ScalarType::Record {
81                    fields: other_fields,
82                    custom_id: other_custom_id,
83                },
84            ) => {
85                if custom_id != other_custom_id {
86                    bail!(
87                        "Can't union types: {:?} and {:?}",
88                        self.scalar_type,
89                        other.scalar_type
90                    );
91                };
92
93                let mut union_fields: Vec<(ColumnName, ColumnType)> = vec![];
94                for (field, other_field) in fields.iter().zip(other_fields.iter()) {
95                    if field.0 != other_field.0 {
96                        bail!(
97                            "Can't union types: {:?} and {:?}",
98                            self.scalar_type,
99                            other.scalar_type
100                        );
101                    } else {
102                        let union_column_type = field.1.union(&other_field.1)?;
103                        union_fields.push((field.0.clone(), union_column_type));
104                    };
105                }
106
107                Ok(ColumnType {
108                    scalar_type: ScalarType::Record {
109                        fields: union_fields.into(),
110                        custom_id,
111                    },
112                    nullable: self.nullable || other.nullable,
113                })
114            }
115            _ => bail!(
116                "Can't union types: {:?} and {:?}",
117                self.scalar_type,
118                other.scalar_type
119            ),
120        }
121    }
122
123    /// Consumes this `ColumnType` and returns a new `ColumnType` with its
124    /// nullability set to the specified boolean.
125    pub fn nullable(mut self, nullable: bool) -> Self {
126        self.nullable = nullable;
127        self
128    }
129}
130
131impl RustType<ProtoColumnType> for ColumnType {
132    fn into_proto(&self) -> ProtoColumnType {
133        ProtoColumnType {
134            nullable: self.nullable,
135            scalar_type: Some(self.scalar_type.into_proto()),
136        }
137    }
138
139    fn from_proto(proto: ProtoColumnType) -> Result<Self, TryFromProtoError> {
140        Ok(ColumnType {
141            nullable: proto.nullable,
142            scalar_type: proto
143                .scalar_type
144                .into_rust_if_some("ProtoColumnType::scalar_type")?,
145        })
146    }
147}
148
149impl fmt::Display for ColumnType {
150    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
151        let nullable = if self.nullable { "Null" } else { "NotNull" };
152        f.write_fmt(format_args!("{:?}:{}", self.scalar_type, nullable))
153    }
154}
155
156/// The type of a relation.
157#[derive(
158    Arbitrary, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash, MzReflect,
159)]
160pub struct RelationType {
161    /// The type for each column, in order.
162    pub column_types: Vec<ColumnType>,
163    /// Sets of indices that are "keys" for the collection.
164    ///
165    /// Each element in this list is a set of column indices, each with the
166    /// property that the collection contains at most one record with each
167    /// distinct set of values for each column. Alternately, for a specific set
168    /// of values assigned to the these columns there is at most one record.
169    ///
170    /// A collection can contain multiple sets of keys, although it is common to
171    /// have either zero or one sets of key indices.
172    #[serde(default)]
173    pub keys: Vec<Vec<usize>>,
174}
175
176impl RelationType {
177    /// Constructs a `RelationType` representing the relation with no columns and
178    /// no keys.
179    pub fn empty() -> Self {
180        RelationType::new(vec![])
181    }
182
183    /// Constructs a new `RelationType` from specified column types.
184    ///
185    /// The `RelationType` will have no keys.
186    pub fn new(column_types: Vec<ColumnType>) -> Self {
187        RelationType {
188            column_types,
189            keys: Vec::new(),
190        }
191    }
192
193    /// Adds a new key for the relation.
194    pub fn with_key(mut self, mut indices: Vec<usize>) -> Self {
195        indices.sort_unstable();
196        if !self.keys.contains(&indices) {
197            self.keys.push(indices);
198        }
199        self
200    }
201
202    pub fn with_keys(mut self, keys: Vec<Vec<usize>>) -> Self {
203        for key in keys {
204            self = self.with_key(key)
205        }
206        self
207    }
208
209    /// Computes the number of columns in the relation.
210    pub fn arity(&self) -> usize {
211        self.column_types.len()
212    }
213
214    /// Gets the index of the columns used when creating a default index.
215    pub fn default_key(&self) -> Vec<usize> {
216        if let Some(key) = self.keys.first() {
217            if key.is_empty() {
218                (0..self.column_types.len()).collect()
219            } else {
220                key.clone()
221            }
222        } else {
223            (0..self.column_types.len()).collect()
224        }
225    }
226
227    /// Returns all the [`ColumnType`]s, in order, for this relation.
228    pub fn columns(&self) -> &[ColumnType] {
229        &self.column_types
230    }
231}
232
233impl RustType<ProtoRelationType> for RelationType {
234    fn into_proto(&self) -> ProtoRelationType {
235        ProtoRelationType {
236            column_types: self.column_types.into_proto(),
237            keys: self.keys.into_proto(),
238        }
239    }
240
241    fn from_proto(proto: ProtoRelationType) -> Result<Self, TryFromProtoError> {
242        Ok(RelationType {
243            column_types: proto.column_types.into_rust()?,
244            keys: proto.keys.into_rust()?,
245        })
246    }
247}
248
249impl RustType<ProtoKey> for Vec<usize> {
250    fn into_proto(&self) -> ProtoKey {
251        ProtoKey {
252            keys: self.into_proto(),
253        }
254    }
255
256    fn from_proto(proto: ProtoKey) -> Result<Self, TryFromProtoError> {
257        proto.keys.into_rust()
258    }
259}
260
261/// The name of a column in a [`RelationDesc`].
262#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash, MzReflect)]
263pub struct ColumnName(pub(crate) String);
264
265impl ColumnName {
266    /// Returns this column name as a `str`.
267    pub fn as_str(&self) -> &str {
268        &self.0
269    }
270
271    /// Returns a mutable reference to the string underlying this column name.
272    pub fn as_mut_str(&mut self) -> &mut String {
273        &mut self.0
274    }
275
276    /// Returns if this [`ColumnName`] is similar to the provided one.
277    pub fn is_similar(&self, other: &ColumnName) -> bool {
278        const SIMILARITY_THRESHOLD: f64 = 0.6;
279
280        let a_lowercase = self.0.to_lowercase();
281        let b_lowercase = other.as_str().to_lowercase();
282
283        strsim::normalized_levenshtein(&a_lowercase, &b_lowercase) >= SIMILARITY_THRESHOLD
284    }
285}
286
287impl fmt::Display for ColumnName {
288    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
289        f.write_str(&self.0)
290    }
291}
292
293impl From<String> for ColumnName {
294    fn from(s: String) -> ColumnName {
295        ColumnName(s)
296    }
297}
298
299impl From<&str> for ColumnName {
300    fn from(s: &str) -> ColumnName {
301        ColumnName(s.into())
302    }
303}
304
305impl From<&ColumnName> for ColumnName {
306    fn from(n: &ColumnName) -> ColumnName {
307        n.clone()
308    }
309}
310
311impl RustType<ProtoColumnName> for ColumnName {
312    fn into_proto(&self) -> ProtoColumnName {
313        ProtoColumnName {
314            value: Some(self.0.clone()),
315        }
316    }
317
318    fn from_proto(proto: ProtoColumnName) -> Result<Self, TryFromProtoError> {
319        Ok(ColumnName(proto.value.ok_or_else(|| {
320            TryFromProtoError::missing_field("ProtoColumnName::value")
321        })?))
322    }
323}
324
325impl From<ColumnName> for mz_sql_parser::ast::Ident {
326    fn from(value: ColumnName) -> Self {
327        // Note: ColumnNames are known to be less than the max length of an Ident (I think?).
328        mz_sql_parser::ast::Ident::new_unchecked(value.0)
329    }
330}
331
332impl proptest::arbitrary::Arbitrary for ColumnName {
333    type Parameters = ();
334    type Strategy = BoxedStrategy<ColumnName>;
335
336    fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy {
337        // Long column names are generally uninteresting, and can greatly
338        // increase the runtime for a test case, so bound the max length.
339        let mut weights = vec![(50, Just(1..8)), (20, Just(8..16))];
340        if std::env::var("PROPTEST_LARGE_DATA").is_ok() {
341            weights.extend([
342                (5, Just(16..128)),
343                (1, Just(128..1024)),
344                (1, Just(1024..4096)),
345            ]);
346        }
347        let name_length = Union::new_weighted(weights);
348
349        // Non-ASCII characters are also generally uninteresting and can make
350        // debugging harder.
351        let char_strat = Rc::new(Union::new_weighted(vec![
352            (50, proptest::char::range('A', 'z').boxed()),
353            (1, any::<char>().boxed()),
354        ]));
355
356        name_length
357            .prop_flat_map(move |length| proptest::collection::vec(Rc::clone(&char_strat), length))
358            .prop_map(|chars| ColumnName(chars.into_iter().collect::<String>()))
359            .no_shrink()
360            .boxed()
361    }
362}
363
364/// Stable index of a column in a [`RelationDesc`].
365#[derive(
366    Clone, Copy, Debug, Eq, PartialEq, PartialOrd, Ord, Serialize, Deserialize, Hash, MzReflect,
367)]
368pub struct ColumnIndex(usize);
369
370static_assertions::assert_not_impl_all!(ColumnIndex: Arbitrary);
371
372impl ColumnIndex {
373    /// Returns a stable identifier for this [`ColumnIndex`].
374    pub fn to_stable_name(&self) -> String {
375        self.0.to_string()
376    }
377
378    pub fn to_raw(&self) -> usize {
379        self.0
380    }
381
382    pub fn from_raw(val: usize) -> Self {
383        ColumnIndex(val)
384    }
385}
386
387/// The version a given column was added at.
388#[derive(
389    Clone,
390    Copy,
391    Debug,
392    Eq,
393    PartialEq,
394    PartialOrd,
395    Ord,
396    Serialize,
397    Deserialize,
398    Hash,
399    MzReflect,
400    Arbitrary,
401)]
402pub struct RelationVersion(u64);
403
404impl RelationVersion {
405    /// Returns the "root" or "initial" version of a [`RelationDesc`].
406    pub fn root() -> Self {
407        RelationVersion(0)
408    }
409
410    /// Returns an instance of [`RelationVersion`] which is "one" higher than `self`.
411    pub fn bump(&self) -> Self {
412        let next_version = self
413            .0
414            .checked_add(1)
415            .expect("added more than u64::MAX columns?");
416        RelationVersion(next_version)
417    }
418
419    /// Consume a [`RelationVersion`] returning the raw value.
420    ///
421    /// Should __only__ be used for serialization.
422    pub fn into_raw(self) -> u64 {
423        self.0
424    }
425
426    /// Create a [`RelationVersion`] from a raw value.
427    ///
428    /// Should __only__ be used for serialization.
429    pub fn from_raw(val: u64) -> RelationVersion {
430        RelationVersion(val)
431    }
432}
433
434impl From<RelationVersion> for SchemaId {
435    fn from(value: RelationVersion) -> Self {
436        SchemaId(usize::cast_from(value.0))
437    }
438}
439
440impl From<mz_sql_parser::ast::Version> for RelationVersion {
441    fn from(value: mz_sql_parser::ast::Version) -> Self {
442        RelationVersion(value.into_inner())
443    }
444}
445
446impl fmt::Display for RelationVersion {
447    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
448        write!(f, "v{}", self.0)
449    }
450}
451
452impl From<RelationVersion> for mz_sql_parser::ast::Version {
453    fn from(value: RelationVersion) -> Self {
454        mz_sql_parser::ast::Version::new(value.0)
455    }
456}
457
458impl RustType<ProtoRelationVersion> for RelationVersion {
459    fn into_proto(&self) -> ProtoRelationVersion {
460        ProtoRelationVersion { value: self.0 }
461    }
462
463    fn from_proto(proto: ProtoRelationVersion) -> Result<Self, TryFromProtoError> {
464        Ok(RelationVersion(proto.value))
465    }
466}
467
468/// Metadata (other than type) for a column in a [`RelationDesc`].
469#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash, MzReflect)]
470struct ColumnMetadata {
471    /// Name of the column.
472    name: ColumnName,
473    /// Index into a [`RelationType`] for this column.
474    typ_idx: usize,
475    /// Version this column was added at.
476    added: RelationVersion,
477    /// Version this column was dropped at.
478    dropped: Option<RelationVersion>,
479}
480
481/// A description of the shape of a relation.
482///
483/// It bundles a [`RelationType`] with `ColumnMetadata` for each column in
484/// the relation.
485///
486/// # Examples
487///
488/// A `RelationDesc`s is typically constructed via its builder API:
489///
490/// ```
491/// use mz_repr::{ColumnType, RelationDesc, ScalarType};
492///
493/// let desc = RelationDesc::builder()
494///     .with_column("id", ScalarType::Int64.nullable(false))
495///     .with_column("price", ScalarType::Float64.nullable(true))
496///     .finish();
497/// ```
498///
499/// In more complicated cases, like when constructing a `RelationDesc` in
500/// response to user input, it may be more convenient to construct a relation
501/// type first, and imbue it with column names to form a `RelationDesc` later:
502///
503/// ```
504/// use mz_repr::RelationDesc;
505///
506/// # fn plan_query(_: &str) -> mz_repr::RelationType { mz_repr::RelationType::new(vec![]) }
507/// let relation_type = plan_query("SELECT * FROM table");
508/// let names = (0..relation_type.arity()).map(|i| match i {
509///     0 => "first",
510///     1 => "second",
511///     _ => "unknown",
512/// });
513/// let desc = RelationDesc::new(relation_type, names);
514/// ```
515///
516/// Next to the [`RelationType`] we maintain a map of `ColumnIndex` to
517/// `ColumnMetadata`, where [`ColumnIndex`] is a stable identifier for a
518/// column throughout the lifetime of the relation. This allows a
519/// [`RelationDesc`] to represent a projection over a version of itself.
520///
521/// ```
522/// use std::collections::BTreeSet;
523/// use mz_repr::{ColumnIndex, RelationDesc, ScalarType};
524///
525/// let desc = RelationDesc::builder()
526///     .with_column("name", ScalarType::String.nullable(false))
527///     .with_column("email", ScalarType::String.nullable(false))
528///     .finish();
529///
530/// // Project away the second column.
531/// let demands = BTreeSet::from([1]);
532/// let proj = desc.apply_demand(&demands);
533///
534/// // We projected away the first column.
535/// assert!(!proj.contains_index(&ColumnIndex::from_raw(0)));
536/// // But retained the second.
537/// assert!(proj.contains_index(&ColumnIndex::from_raw(1)));
538///
539/// // The underlying `RelationType` also contains a single column.
540/// assert_eq!(proj.typ().arity(), 1);
541/// ```
542///
543/// To maintain this stable mapping and track the lifetime of a column (e.g.
544/// when adding or dropping a column) we use `ColumnMetadata`. It maintains
545/// the index in [`RelationType`] that corresponds to a given column, and the
546/// version at which this column was added or dropped.
547///
548#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash, MzReflect)]
549pub struct RelationDesc {
550    typ: RelationType,
551    metadata: BTreeMap<ColumnIndex, ColumnMetadata>,
552}
553
554impl RustType<ProtoRelationDesc> for RelationDesc {
555    fn into_proto(&self) -> ProtoRelationDesc {
556        let (names, metadata): (Vec<_>, Vec<_>) = self
557            .metadata
558            .values()
559            .map(|meta| {
560                let metadata = ProtoColumnMetadata {
561                    added: Some(meta.added.into_proto()),
562                    dropped: meta.dropped.map(|v| v.into_proto()),
563                };
564                (meta.name.into_proto(), metadata)
565            })
566            .unzip();
567
568        // `metadata` Migration Logic: We wrote some `ProtoRelationDesc`s into Persist before the
569        // metadata field was added. To make sure our serialization roundtrips the same as before
570        // we added the field, we omit `metadata` if all of the values are equal to the default.
571        //
572        // Note: This logic needs to exist approximately forever.
573        let is_all_default_metadata = metadata.iter().all(|meta| {
574            meta.added == Some(RelationVersion::root().into_proto()) && meta.dropped == None
575        });
576        let metadata = if is_all_default_metadata {
577            Vec::new()
578        } else {
579            metadata
580        };
581
582        ProtoRelationDesc {
583            typ: Some(self.typ.into_proto()),
584            names,
585            metadata,
586        }
587    }
588
589    fn from_proto(proto: ProtoRelationDesc) -> Result<Self, TryFromProtoError> {
590        // `metadata` Migration Logic: We wrote some `ProtoRelationDesc`s into Persist before the
591        // metadata field was added. If the field doesn't exist we fill it in with default values,
592        // and when converting into_proto we omit these fields so the serialized bytes roundtrip.
593        //
594        // Note: This logic needs to exist approximately forever.
595        let proto_metadata: Box<dyn Iterator<Item = _>> = if proto.metadata.is_empty() {
596            let val = ProtoColumnMetadata {
597                added: Some(RelationVersion::root().into_proto()),
598                dropped: None,
599            };
600            Box::new(itertools::repeat_n(val, proto.names.len()))
601        } else {
602            Box::new(proto.metadata.into_iter())
603        };
604
605        let metadata = proto
606            .names
607            .into_iter()
608            .zip_eq(proto_metadata)
609            .enumerate()
610            .map(|(idx, (name, metadata))| {
611                let meta = ColumnMetadata {
612                    name: name.into_rust()?,
613                    typ_idx: idx,
614                    added: metadata.added.into_rust_if_some("ColumnMetadata::added")?,
615                    dropped: metadata.dropped.into_rust()?,
616                };
617                Ok::<_, TryFromProtoError>((ColumnIndex(idx), meta))
618            })
619            .collect::<Result<_, _>>()?;
620
621        Ok(RelationDesc {
622            typ: proto.typ.into_rust_if_some("ProtoRelationDesc::typ")?,
623            metadata,
624        })
625    }
626}
627
628impl RelationDesc {
629    /// Returns a [`RelationDescBuilder`] that can be used to construct a [`RelationDesc`].
630    pub fn builder() -> RelationDescBuilder {
631        RelationDescBuilder::default()
632    }
633
634    /// Constructs a new `RelationDesc` that represents the empty relation
635    /// with no columns and no keys.
636    pub fn empty() -> Self {
637        RelationDesc {
638            typ: RelationType::empty(),
639            metadata: BTreeMap::default(),
640        }
641    }
642
643    /// Check if the `RelationDesc` is empty.
644    pub fn is_empty(&self) -> bool {
645        self == &Self::empty()
646    }
647
648    /// Returns the number of columns in this [`RelationDesc`].
649    pub fn len(&self) -> usize {
650        self.typ().column_types.len()
651    }
652
653    /// Constructs a new `RelationDesc` from a `RelationType` and an iterator
654    /// over column names.
655    ///
656    /// # Panics
657    ///
658    /// Panics if the arity of the `RelationType` is not equal to the number of
659    /// items in `names`.
660    pub fn new<I, N>(typ: RelationType, names: I) -> Self
661    where
662        I: IntoIterator<Item = N>,
663        N: Into<ColumnName>,
664    {
665        let metadata: BTreeMap<_, _> = names
666            .into_iter()
667            .enumerate()
668            .map(|(idx, name)| {
669                let col_idx = ColumnIndex(idx);
670                let metadata = ColumnMetadata {
671                    name: name.into(),
672                    typ_idx: idx,
673                    added: RelationVersion::root(),
674                    dropped: None,
675                };
676                (col_idx, metadata)
677            })
678            .collect();
679
680        // TODO(parkmycar): Add better validation here.
681        assert_eq!(typ.column_types.len(), metadata.len());
682
683        RelationDesc { typ, metadata }
684    }
685
686    pub fn from_names_and_types<I, T, N>(iter: I) -> Self
687    where
688        I: IntoIterator<Item = (N, T)>,
689        T: Into<ColumnType>,
690        N: Into<ColumnName>,
691    {
692        let (names, types): (Vec<_>, Vec<_>) = iter.into_iter().unzip();
693        let types = types.into_iter().map(Into::into).collect();
694        let typ = RelationType::new(types);
695        Self::new(typ, names)
696    }
697
698    /// Concatenates a `RelationDesc` onto the end of this `RelationDesc`.
699    ///
700    /// # Panics
701    ///
702    /// Panics if either `self` or `other` have columns that were added at a
703    /// [`RelationVersion`] other than [`RelationVersion::root`] or if any
704    /// columns were dropped.
705    ///
706    /// TODO(parkmycar): Move this method to [`RelationDescBuilder`].
707    pub fn concat(mut self, other: Self) -> Self {
708        let self_len = self.typ.column_types.len();
709
710        for (typ, (_col_idx, meta)) in other
711            .typ
712            .column_types
713            .into_iter()
714            .zip_eq(other.metadata.into_iter())
715        {
716            assert_eq!(meta.added, RelationVersion::root());
717            assert_none!(meta.dropped);
718
719            let new_idx = self.typ.columns().len();
720            let new_meta = ColumnMetadata {
721                name: meta.name,
722                typ_idx: new_idx,
723                added: RelationVersion::root(),
724                dropped: None,
725            };
726
727            self.typ.column_types.push(typ);
728            let prev = self.metadata.insert(ColumnIndex(new_idx), new_meta);
729
730            assert_eq!(self.metadata.len(), self.typ.columns().len());
731            assert_none!(prev);
732        }
733
734        for k in other.typ.keys {
735            let k = k.into_iter().map(|idx| idx + self_len).collect();
736            self = self.with_key(k);
737        }
738        self
739    }
740
741    /// Adds a new key for the relation.
742    pub fn with_key(mut self, indices: Vec<usize>) -> Self {
743        self.typ = self.typ.with_key(indices);
744        self
745    }
746
747    /// Drops all existing keys.
748    pub fn without_keys(mut self) -> Self {
749        self.typ.keys.clear();
750        self
751    }
752
753    /// Builds a new relation description with the column names replaced with
754    /// new names.
755    ///
756    /// # Panics
757    ///
758    /// Panics if the arity of the relation type does not match the number of
759    /// items in `names`.
760    pub fn with_names<I, N>(self, names: I) -> Self
761    where
762        I: IntoIterator<Item = N>,
763        N: Into<ColumnName>,
764    {
765        Self::new(self.typ, names)
766    }
767
768    /// Computes the number of columns in the relation.
769    pub fn arity(&self) -> usize {
770        self.typ.arity()
771    }
772
773    /// Returns the relation type underlying this relation description.
774    pub fn typ(&self) -> &RelationType {
775        &self.typ
776    }
777
778    /// Returns an iterator over the columns in this relation.
779    pub fn iter(&self) -> impl Iterator<Item = (&ColumnName, &ColumnType)> {
780        self.metadata.values().map(|meta| {
781            let typ = &self.typ.columns()[meta.typ_idx];
782            (&meta.name, typ)
783        })
784    }
785
786    /// Returns an iterator over the types of the columns in this relation.
787    pub fn iter_types(&self) -> impl Iterator<Item = &ColumnType> {
788        self.typ.column_types.iter()
789    }
790
791    /// Returns an iterator over the names of the columns in this relation.
792    pub fn iter_names(&self) -> impl Iterator<Item = &ColumnName> {
793        self.metadata.values().map(|meta| &meta.name)
794    }
795
796    /// Returns an iterator over the columns in this relation, with all their metadata.
797    pub fn iter_all(&self) -> impl Iterator<Item = (&ColumnIndex, &ColumnName, &ColumnType)> {
798        self.metadata.iter().map(|(col_idx, metadata)| {
799            let col_typ = &self.typ.columns()[metadata.typ_idx];
800            (col_idx, &metadata.name, col_typ)
801        })
802    }
803
804    /// Returns an iterator over the names of the columns in this relation that are "similar" to
805    /// the provided `name`.
806    pub fn iter_similar_names<'a>(
807        &'a self,
808        name: &'a ColumnName,
809    ) -> impl Iterator<Item = &'a ColumnName> {
810        self.iter_names().filter(|n| n.is_similar(name))
811    }
812
813    /// Returns whether this [`RelationDesc`] contains a column at the specified index.
814    pub fn contains_index(&self, idx: &ColumnIndex) -> bool {
815        self.metadata.contains_key(idx)
816    }
817
818    /// Finds a column by name.
819    ///
820    /// Returns the index and type of the column named `name`. If no column with
821    /// the specified name exists, returns `None`. If multiple columns have the
822    /// specified name, the leftmost column is returned.
823    pub fn get_by_name(&self, name: &ColumnName) -> Option<(usize, &ColumnType)> {
824        self.iter_names()
825            .position(|n| n == name)
826            .map(|i| (i, &self.typ.column_types[i]))
827    }
828
829    /// Gets the name of the `i`th column.
830    ///
831    /// # Panics
832    ///
833    /// Panics if `i` is not a valid column index.
834    ///
835    /// TODO(parkmycar): Migrate all uses of this to [`RelationDesc::get_name_idx`].
836    pub fn get_name(&self, i: usize) -> &ColumnName {
837        // TODO(parkmycar): Refactor this to use `ColumnIndex`.
838        self.get_name_idx(&ColumnIndex(i))
839    }
840
841    /// Gets the name of the column at `idx`.
842    ///
843    /// # Panics
844    ///
845    /// Panics if no column exists at `idx`.
846    pub fn get_name_idx(&self, idx: &ColumnIndex) -> &ColumnName {
847        &self.metadata.get(idx).expect("should exist").name
848    }
849
850    /// Mutably gets the name of the `i`th column.
851    ///
852    /// # Panics
853    ///
854    /// Panics if `i` is not a valid column index.
855    pub fn get_name_mut(&mut self, i: usize) -> &mut ColumnName {
856        // TODO(parkmycar): Refactor this to use `ColumnIndex`.
857        &mut self
858            .metadata
859            .get_mut(&ColumnIndex(i))
860            .expect("should exist")
861            .name
862    }
863
864    /// Gets the [`ColumnType`] of the column at `idx`.
865    ///
866    /// # Panics
867    ///
868    /// Panics if no column exists at `idx`.
869    pub fn get_type(&self, idx: &ColumnIndex) -> &ColumnType {
870        let typ_idx = self.metadata.get(idx).expect("should exist").typ_idx;
871        &self.typ.column_types[typ_idx]
872    }
873
874    /// Gets the name of the `i`th column if that column name is unambiguous.
875    ///
876    /// If at least one other column has the same name as the `i`th column,
877    /// returns `None`. If the `i`th column has no name, returns `None`.
878    ///
879    /// # Panics
880    ///
881    /// Panics if `i` is not a valid column index.
882    pub fn get_unambiguous_name(&self, i: usize) -> Option<&ColumnName> {
883        let name = self.get_name(i);
884        if self.iter_names().filter(|n| *n == name).count() == 1 {
885            Some(name)
886        } else {
887            None
888        }
889    }
890
891    /// Verifies that `d` meets all of the constraints for the `i`th column of `self`.
892    ///
893    /// n.b. The only constraint MZ currently supports in NOT NULL, but this
894    /// structure will be simple to extend.
895    pub fn constraints_met(&self, i: usize, d: &Datum) -> Result<(), NotNullViolation> {
896        let name = self.get_name(i);
897        let typ = &self.typ.column_types[i];
898        if d == &Datum::Null && !typ.nullable {
899            Err(NotNullViolation(name.clone()))
900        } else {
901            Ok(())
902        }
903    }
904
905    /// Creates a new [`RelationDesc`] retaining only the columns specified in `demands`.
906    pub fn apply_demand(&self, demands: &BTreeSet<usize>) -> RelationDesc {
907        let mut new_desc = self.clone();
908
909        // Update ColumnMetadata.
910        let mut removed = 0;
911        new_desc.metadata.retain(|idx, metadata| {
912            let retain = demands.contains(&idx.0);
913            if !retain {
914                removed += 1;
915            } else {
916                metadata.typ_idx -= removed;
917            }
918            retain
919        });
920
921        // Update ColumnType.
922        let mut idx = 0;
923        new_desc.typ.column_types.retain(|_| {
924            let keep = demands.contains(&idx);
925            idx += 1;
926            keep
927        });
928
929        new_desc
930    }
931}
932
933impl Arbitrary for RelationDesc {
934    type Parameters = ();
935    type Strategy = BoxedStrategy<RelationDesc>;
936
937    fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy {
938        let mut weights = vec![(100, Just(0..4)), (50, Just(4..8)), (25, Just(8..16))];
939        if std::env::var("PROPTEST_LARGE_DATA").is_ok() {
940            weights.extend([
941                (12, Just(16..32)),
942                (6, Just(32..64)),
943                (3, Just(64..128)),
944                (1, Just(128..256)),
945            ]);
946        }
947        let num_columns = Union::new_weighted(weights);
948
949        num_columns.prop_flat_map(arb_relation_desc).boxed()
950    }
951}
952
953/// Returns a [`Strategy`] that generates an arbitrary [`RelationDesc`] with a number columns
954/// within the range provided.
955pub fn arb_relation_desc(num_cols: std::ops::Range<usize>) -> impl Strategy<Value = RelationDesc> {
956    proptest::collection::btree_map(any::<ColumnName>(), any::<ColumnType>(), num_cols)
957        .prop_map(RelationDesc::from_names_and_types)
958}
959
960/// Returns a [`Strategy`] that generates a projection of the provided [`RelationDesc`].
961pub fn arb_relation_desc_projection(desc: RelationDesc) -> impl Strategy<Value = RelationDesc> {
962    let mask: Vec<_> = (0..desc.len()).map(|_| any::<bool>()).collect();
963    mask.prop_map(move |mask| {
964        let demands: BTreeSet<_> = mask
965            .into_iter()
966            .enumerate()
967            .filter_map(|(idx, keep)| keep.then_some(idx))
968            .collect();
969        desc.apply_demand(&demands)
970    })
971}
972
973impl IntoIterator for RelationDesc {
974    type Item = (ColumnName, ColumnType);
975    type IntoIter = Box<dyn Iterator<Item = (ColumnName, ColumnType)>>;
976
977    fn into_iter(self) -> Self::IntoIter {
978        let iter = self
979            .metadata
980            .into_values()
981            .zip_eq(self.typ.column_types)
982            .map(|(meta, typ)| (meta.name, typ));
983        Box::new(iter)
984    }
985}
986
987/// Returns a [`Strategy`] that yields arbitrary [`Row`]s for the provided [`RelationDesc`].
988pub fn arb_row_for_relation(desc: &RelationDesc) -> impl Strategy<Value = Row> + use<> {
989    let datums: Vec<_> = desc
990        .typ()
991        .columns()
992        .iter()
993        .cloned()
994        .map(arb_datum_for_column)
995        .collect();
996    datums.prop_map(|x| Row::pack(x.iter().map(Datum::from)))
997}
998
999/// Expression violated not-null constraint on named column
1000#[derive(Debug, PartialEq, Eq)]
1001pub struct NotNullViolation(pub ColumnName);
1002
1003impl fmt::Display for NotNullViolation {
1004    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1005        write!(
1006            f,
1007            "null value in column {} violates not-null constraint",
1008            self.0.as_str().quoted()
1009        )
1010    }
1011}
1012
1013/// A builder for a [`RelationDesc`].
1014#[derive(Clone, Default, Debug, PartialEq, Eq)]
1015pub struct RelationDescBuilder {
1016    /// Columns of the relation.
1017    columns: Vec<(ColumnName, ColumnType)>,
1018    /// Sets of indices that are "keys" for the collection.
1019    keys: Vec<Vec<usize>>,
1020}
1021
1022impl RelationDescBuilder {
1023    /// Appends a column with the specified name and type.
1024    pub fn with_column<N: Into<ColumnName>>(
1025        mut self,
1026        name: N,
1027        ty: ColumnType,
1028    ) -> RelationDescBuilder {
1029        let name = name.into();
1030        self.columns.push((name, ty));
1031        self
1032    }
1033
1034    /// Appends the provided columns to the builder.
1035    pub fn with_columns<I, T, N>(mut self, iter: I) -> Self
1036    where
1037        I: IntoIterator<Item = (N, T)>,
1038        T: Into<ColumnType>,
1039        N: Into<ColumnName>,
1040    {
1041        self.columns
1042            .extend(iter.into_iter().map(|(name, ty)| (name.into(), ty.into())));
1043        self
1044    }
1045
1046    /// Adds a new key for the relation.
1047    pub fn with_key(mut self, mut indices: Vec<usize>) -> RelationDescBuilder {
1048        indices.sort_unstable();
1049        if !self.keys.contains(&indices) {
1050            self.keys.push(indices);
1051        }
1052        self
1053    }
1054
1055    /// Removes all previously inserted keys.
1056    pub fn without_keys(mut self) -> RelationDescBuilder {
1057        self.keys.clear();
1058        assert_eq!(self.keys.len(), 0);
1059        self
1060    }
1061
1062    /// Concatenates a [`RelationDescBuilder`] onto the end of this [`RelationDescBuilder`].
1063    pub fn concat(mut self, other: Self) -> Self {
1064        let self_len = self.columns.len();
1065
1066        self.columns.extend(other.columns);
1067        for k in other.keys {
1068            let k = k.into_iter().map(|idx| idx + self_len).collect();
1069            self = self.with_key(k);
1070        }
1071
1072        self
1073    }
1074
1075    /// Finish the builder, returning a [`RelationDesc`].
1076    pub fn finish(self) -> RelationDesc {
1077        let mut desc = RelationDesc::from_names_and_types(self.columns);
1078        desc.typ = desc.typ.with_keys(self.keys);
1079        desc
1080    }
1081}
1082
1083/// Describes a [`RelationDesc`] at a specific version of a [`VersionedRelationDesc`].
1084#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord, Serialize)]
1085pub enum RelationVersionSelector {
1086    Specific(RelationVersion),
1087    Latest,
1088}
1089
1090impl RelationVersionSelector {
1091    pub fn specific(version: u64) -> Self {
1092        RelationVersionSelector::Specific(RelationVersion(version))
1093    }
1094}
1095
1096/// A wrapper around [`RelationDesc`] that provides an interface for adding
1097/// columns and generating new versions.
1098///
1099/// TODO(parkmycar): Using an immutable data structure for RelationDesc would
1100/// be great.
1101#[derive(Debug, Clone, Serialize)]
1102pub struct VersionedRelationDesc {
1103    inner: RelationDesc,
1104}
1105
1106impl VersionedRelationDesc {
1107    pub fn new(inner: RelationDesc) -> Self {
1108        VersionedRelationDesc { inner }
1109    }
1110
1111    /// Adds a new column to this [`RelationDesc`], creating a new version of the [`RelationDesc`].
1112    ///
1113    /// # Panics
1114    ///
1115    /// * Panics if a column with `name` already exists that hasn't been dropped.
1116    ///
1117    /// Note: For building a [`RelationDesc`] see [`RelationDescBuilder::with_column`].
1118    #[must_use]
1119    pub fn add_column<N, T>(&mut self, name: N, typ: T) -> RelationVersion
1120    where
1121        N: Into<ColumnName>,
1122        T: Into<ColumnType>,
1123    {
1124        let latest_version = self.latest_version();
1125        let new_version = latest_version.bump();
1126
1127        let name = name.into();
1128        let existing = self
1129            .inner
1130            .metadata
1131            .iter()
1132            .find(|(_, meta)| meta.name == name && meta.dropped.is_none());
1133        if let Some(existing) = existing {
1134            panic!("column named '{name}' already exists! {existing:?}");
1135        }
1136
1137        let next_idx = self.inner.metadata.len();
1138        let col_meta = ColumnMetadata {
1139            name,
1140            typ_idx: next_idx,
1141            added: new_version,
1142            dropped: None,
1143        };
1144
1145        self.inner.typ.column_types.push(typ.into());
1146        let prev = self.inner.metadata.insert(ColumnIndex(next_idx), col_meta);
1147
1148        assert_none!(prev, "column index overlap!");
1149        self.validate();
1150
1151        new_version
1152    }
1153
1154    /// Drops the column `name` from this [`RelationDesc`]. If there are multiple columns with
1155    /// `name` drops the left-most one that hasn't already been dropped.
1156    ///
1157    /// TODO(parkmycar): Add handling for dropping a column that is currently used as a key.
1158    ///
1159    /// # Panics
1160    ///
1161    /// Panics if a column with `name` does not exist or the dropped column was used as a key.
1162    #[must_use]
1163    pub fn drop_column<N>(&mut self, name: N) -> RelationVersion
1164    where
1165        N: Into<ColumnName>,
1166    {
1167        let name = name.into();
1168        let latest_version = self.latest_version();
1169        let new_version = latest_version.bump();
1170
1171        let col = self
1172            .inner
1173            .metadata
1174            .values_mut()
1175            .find(|meta| meta.name == name && meta.dropped.is_none())
1176            .expect("column to exist");
1177
1178        // Make sure the column hadn't been previously dropped.
1179        assert_none!(col.dropped, "column was already dropped");
1180        col.dropped = Some(new_version);
1181
1182        // Make sure the column isn't being used as a key.
1183        let dropped_key = self
1184            .inner
1185            .typ
1186            .keys
1187            .iter()
1188            .any(|keys| keys.iter().any(|key| *key == col.typ_idx));
1189        assert!(!dropped_key, "column being dropped was used as a key");
1190
1191        self.validate();
1192        new_version
1193    }
1194
1195    /// Returns the [`RelationDesc`] at the latest version.
1196    pub fn latest(&self) -> RelationDesc {
1197        self.inner.clone()
1198    }
1199
1200    /// Returns this [`RelationDesc`] at the specified version.
1201    pub fn at_version(&self, version: RelationVersionSelector) -> RelationDesc {
1202        // Get all of the changes from the start, up to whatever version was requested.
1203        //
1204        // TODO(parkmycar): We should probably panic on unknown verisons?
1205        let up_to_version = match version {
1206            RelationVersionSelector::Latest => RelationVersion(u64::MAX),
1207            RelationVersionSelector::Specific(v) => v,
1208        };
1209
1210        let valid_columns = self.inner.metadata.iter().filter(|(_col_idx, meta)| {
1211            let added = meta.added <= up_to_version;
1212            let dropped = meta
1213                .dropped
1214                .map(|dropped_at| up_to_version >= dropped_at)
1215                .unwrap_or(false);
1216
1217            added && !dropped
1218        });
1219
1220        let mut column_types = Vec::new();
1221        let mut column_metas = BTreeMap::new();
1222
1223        // N.B. At this point we need to be careful because col_idx might not
1224        // equal typ_idx.
1225        //
1226        // For example, consider columns "a", "b", and "c" with indexes 0, 1,
1227        // and 2. If we drop column "b" then we'll have "a" and "c" with column
1228        // indexes 0 and 2, but their indices in RelationType will be 0 and 1.
1229        for (col_idx, meta) in valid_columns {
1230            let new_meta = ColumnMetadata {
1231                name: meta.name.clone(),
1232                typ_idx: column_types.len(),
1233                added: meta.added.clone(),
1234                dropped: meta.dropped.clone(),
1235            };
1236            column_types.push(self.inner.typ.columns()[meta.typ_idx].clone());
1237            column_metas.insert(*col_idx, new_meta);
1238        }
1239
1240        // Remap keys in case a column with an index less than that of a key was
1241        // dropped.
1242        //
1243        // For example, consider columns "a", "b", and "c" where "a" and "c" are
1244        // keys and "b" was dropped.
1245        let keys = self
1246            .inner
1247            .typ
1248            .keys
1249            .iter()
1250            .map(|keys| {
1251                keys.iter()
1252                    .map(|key_idx| {
1253                        let metadata = column_metas
1254                            .get(&ColumnIndex(*key_idx))
1255                            .expect("found key for column that doesn't exist");
1256                        metadata.typ_idx
1257                    })
1258                    .collect()
1259            })
1260            .collect();
1261
1262        let relation_type = RelationType { column_types, keys };
1263
1264        RelationDesc {
1265            typ: relation_type,
1266            metadata: column_metas,
1267        }
1268    }
1269
1270    pub fn latest_version(&self) -> RelationVersion {
1271        self.inner
1272            .metadata
1273            .values()
1274            // N.B. Dropped is always greater than added.
1275            .map(|meta| meta.dropped.unwrap_or(meta.added))
1276            .max()
1277            // If there aren't any columns we're implicitly the root version.
1278            .unwrap_or(RelationVersion::root())
1279    }
1280
1281    /// Validates internal contraints of the [`RelationDesc`] are correct.
1282    ///
1283    /// # Panics
1284    ///
1285    /// Panics if a constraint is not satisfied.
1286    fn validate(&self) {
1287        fn validate_inner(desc: &RelationDesc) -> Result<(), anyhow::Error> {
1288            if desc.typ.column_types.len() != desc.metadata.len() {
1289                anyhow::bail!("mismatch between number of types and metadatas");
1290            }
1291
1292            for (col_idx, meta) in &desc.metadata {
1293                if col_idx.0 > desc.metadata.len() {
1294                    anyhow::bail!("column index out of bounds");
1295                }
1296                if meta.added >= meta.dropped.unwrap_or(RelationVersion(u64::MAX)) {
1297                    anyhow::bail!("column was added after it was dropped?");
1298                }
1299                if desc.typ().columns().get(meta.typ_idx).is_none() {
1300                    anyhow::bail!("typ_idx incorrect");
1301                }
1302            }
1303
1304            for keys in &desc.typ.keys {
1305                for key in keys {
1306                    if *key >= desc.typ.column_types.len() {
1307                        anyhow::bail!("key index was out of bounds!");
1308                    }
1309                }
1310            }
1311
1312            let versions = desc
1313                .metadata
1314                .values()
1315                .map(|meta| meta.dropped.unwrap_or(meta.added));
1316            let mut max = 0;
1317            let mut sum = 0;
1318            for version in versions {
1319                max = std::cmp::max(max, version.0);
1320                sum += version.0;
1321            }
1322
1323            // Other than RelationVersion(0), we should never have duplicate
1324            // versions and they should always increase by 1. In other words, the
1325            // sum of all RelationVersions should be the sum of [0, max].
1326            //
1327            // N.B. n * (n + 1) / 2 = sum of [0, n]
1328            //
1329            // While I normally don't like tricks like this, it allows us to
1330            // validate that our column versions are correct in O(n) time and
1331            // without allocations.
1332            if sum != (max * (max + 1) / 2) {
1333                anyhow::bail!("there is a duplicate or missing relation version");
1334            }
1335
1336            Ok(())
1337        }
1338
1339        assert_ok!(validate_inner(&self.inner), "validate failed! {self:?}");
1340    }
1341}
1342
1343/// Diffs that can be generated proptest and applied to a [`RelationDesc`] to
1344/// exercise schema migrations.
1345#[derive(Debug)]
1346pub enum PropRelationDescDiff {
1347    AddColumn { name: ColumnName, typ: ColumnType },
1348    DropColumn { name: ColumnName },
1349    ToggleNullability { name: ColumnName },
1350    ChangeType { name: ColumnName, typ: ColumnType },
1351}
1352
1353impl PropRelationDescDiff {
1354    pub fn apply(self, desc: &mut RelationDesc) {
1355        match self {
1356            PropRelationDescDiff::AddColumn { name, typ } => {
1357                let new_idx = desc.metadata.len();
1358                let meta = ColumnMetadata {
1359                    name,
1360                    typ_idx: new_idx,
1361                    added: RelationVersion(0),
1362                    dropped: None,
1363                };
1364                let prev = desc.metadata.insert(ColumnIndex(new_idx), meta);
1365                desc.typ.column_types.push(typ);
1366
1367                assert_none!(prev);
1368                assert_eq!(desc.metadata.len(), desc.typ.column_types.len());
1369            }
1370            PropRelationDescDiff::DropColumn { name } => {
1371                let next_version = desc
1372                    .metadata
1373                    .values()
1374                    .map(|meta| meta.dropped.unwrap_or(meta.added))
1375                    .max()
1376                    .unwrap_or(RelationVersion::root())
1377                    .bump();
1378                let Some(metadata) = desc.metadata.values_mut().find(|meta| meta.name == name)
1379                else {
1380                    return;
1381                };
1382                if metadata.dropped.is_none() {
1383                    metadata.dropped = Some(next_version);
1384                }
1385            }
1386            PropRelationDescDiff::ToggleNullability { name } => {
1387                let Some((pos, _)) = desc.get_by_name(&name) else {
1388                    return;
1389                };
1390                let col_type = desc
1391                    .typ
1392                    .column_types
1393                    .get_mut(pos)
1394                    .expect("ColumnNames and ColumnTypes out of sync!");
1395                col_type.nullable = !col_type.nullable;
1396            }
1397            PropRelationDescDiff::ChangeType { name, typ } => {
1398                let Some((pos, _)) = desc.get_by_name(&name) else {
1399                    return;
1400                };
1401                let col_type = desc
1402                    .typ
1403                    .column_types
1404                    .get_mut(pos)
1405                    .expect("ColumnNames and ColumnTypes out of sync!");
1406                *col_type = typ;
1407            }
1408        }
1409    }
1410}
1411
1412/// Generates a set of [`PropRelationDescDiff`]s based on some source [`RelationDesc`].
1413pub fn arb_relation_desc_diff(
1414    source: &RelationDesc,
1415) -> impl Strategy<Value = Vec<PropRelationDescDiff>> + use<> {
1416    let source = Rc::new(source.clone());
1417    let num_source_columns = source.typ.columns().len();
1418
1419    let num_add_columns = Union::new_weighted(vec![(100, Just(0..8)), (1, Just(8..64))]);
1420    let add_columns_strat = num_add_columns
1421        .prop_flat_map(|num_columns| {
1422            proptest::collection::vec((any::<ColumnName>(), any::<ColumnType>()), num_columns)
1423        })
1424        .prop_map(|cols| {
1425            cols.into_iter()
1426                .map(|(name, typ)| PropRelationDescDiff::AddColumn { name, typ })
1427                .collect::<Vec<_>>()
1428        });
1429
1430    // If the source RelationDesc is empty there is nothing else to do.
1431    if num_source_columns == 0 {
1432        return add_columns_strat.boxed();
1433    }
1434
1435    let source_ = Rc::clone(&source);
1436    let drop_columns_strat = (0..num_source_columns).prop_perturb(move |num_columns, mut rng| {
1437        let mut set = BTreeSet::default();
1438        for _ in 0..num_columns {
1439            let col_idx = rng.gen_range(0..num_source_columns);
1440            set.insert(source_.get_name(col_idx).clone());
1441        }
1442        set.into_iter()
1443            .map(|name| PropRelationDescDiff::DropColumn { name })
1444            .collect::<Vec<_>>()
1445    });
1446
1447    let source_ = Rc::clone(&source);
1448    let toggle_nullability_strat =
1449        (0..num_source_columns).prop_perturb(move |num_columns, mut rng| {
1450            let mut set = BTreeSet::default();
1451            for _ in 0..num_columns {
1452                let col_idx = rng.gen_range(0..num_source_columns);
1453                set.insert(source_.get_name(col_idx).clone());
1454            }
1455            set.into_iter()
1456                .map(|name| PropRelationDescDiff::ToggleNullability { name })
1457                .collect::<Vec<_>>()
1458        });
1459
1460    let source_ = Rc::clone(&source);
1461    let change_type_strat = (0..num_source_columns)
1462        .prop_perturb(move |num_columns, mut rng| {
1463            let mut set = BTreeSet::default();
1464            for _ in 0..num_columns {
1465                let col_idx = rng.gen_range(0..num_source_columns);
1466                set.insert(source_.get_name(col_idx).clone());
1467            }
1468            set
1469        })
1470        .prop_flat_map(|cols| {
1471            proptest::collection::vec(any::<ColumnType>(), cols.len())
1472                .prop_map(move |types| (cols.clone(), types))
1473        })
1474        .prop_map(|(cols, types)| {
1475            cols.into_iter()
1476                .zip(types)
1477                .map(|(name, typ)| PropRelationDescDiff::ChangeType { name, typ })
1478                .collect::<Vec<_>>()
1479        });
1480
1481    (
1482        add_columns_strat,
1483        drop_columns_strat,
1484        toggle_nullability_strat,
1485        change_type_strat,
1486    )
1487        .prop_map(|(adds, drops, toggles, changes)| {
1488            adds.into_iter()
1489                .chain(drops)
1490                .chain(toggles)
1491                .chain(changes)
1492                .collect::<Vec<_>>()
1493        })
1494        .prop_shuffle()
1495        .boxed()
1496}
1497
1498#[cfg(test)]
1499mod tests {
1500    use super::*;
1501    use prost::Message;
1502
1503    #[mz_ore::test]
1504    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `pipe2` on OS `linux`
1505    fn smoktest_at_version() {
1506        let desc = RelationDesc::builder()
1507            .with_column("a", ScalarType::Bool.nullable(true))
1508            .with_column("z", ScalarType::String.nullable(false))
1509            .finish();
1510
1511        let mut versioned_desc = VersionedRelationDesc {
1512            inner: desc.clone(),
1513        };
1514        versioned_desc.validate();
1515
1516        let latest = versioned_desc.at_version(RelationVersionSelector::Latest);
1517        assert_eq!(desc, latest);
1518
1519        let v0 = versioned_desc.at_version(RelationVersionSelector::specific(0));
1520        assert_eq!(desc, v0);
1521
1522        let v3 = versioned_desc.at_version(RelationVersionSelector::specific(3));
1523        assert_eq!(desc, v3);
1524
1525        let v1 = versioned_desc.add_column("b", ScalarType::Bytes.nullable(false));
1526        assert_eq!(v1, RelationVersion(1));
1527
1528        let v1 = versioned_desc.at_version(RelationVersionSelector::Specific(v1));
1529        insta::assert_json_snapshot!(v1.metadata, @r###"
1530        {
1531          "0": {
1532            "name": "a",
1533            "typ_idx": 0,
1534            "added": 0,
1535            "dropped": null
1536          },
1537          "1": {
1538            "name": "z",
1539            "typ_idx": 1,
1540            "added": 0,
1541            "dropped": null
1542          },
1543          "2": {
1544            "name": "b",
1545            "typ_idx": 2,
1546            "added": 1,
1547            "dropped": null
1548          }
1549        }
1550        "###);
1551
1552        // Check that V0 doesn't show the new column.
1553        let v0_b = versioned_desc.at_version(RelationVersionSelector::specific(0));
1554        assert!(v0.iter().eq(v0_b.iter()));
1555
1556        let v2 = versioned_desc.drop_column("z");
1557        assert_eq!(v2, RelationVersion(2));
1558
1559        let v2 = versioned_desc.at_version(RelationVersionSelector::Specific(v2));
1560        insta::assert_json_snapshot!(v2.metadata, @r###"
1561        {
1562          "0": {
1563            "name": "a",
1564            "typ_idx": 0,
1565            "added": 0,
1566            "dropped": null
1567          },
1568          "2": {
1569            "name": "b",
1570            "typ_idx": 1,
1571            "added": 1,
1572            "dropped": null
1573          }
1574        }
1575        "###);
1576
1577        // Check that V0 and V1 are still correct.
1578        let v0_c = versioned_desc.at_version(RelationVersionSelector::specific(0));
1579        assert!(v0.iter().eq(v0_c.iter()));
1580
1581        let v1_b = versioned_desc.at_version(RelationVersionSelector::specific(1));
1582        assert!(v1.iter().eq(v1_b.iter()));
1583
1584        insta::assert_json_snapshot!(versioned_desc.inner.metadata, @r###"
1585        {
1586          "0": {
1587            "name": "a",
1588            "typ_idx": 0,
1589            "added": 0,
1590            "dropped": null
1591          },
1592          "1": {
1593            "name": "z",
1594            "typ_idx": 1,
1595            "added": 0,
1596            "dropped": 2
1597          },
1598          "2": {
1599            "name": "b",
1600            "typ_idx": 2,
1601            "added": 1,
1602            "dropped": null
1603          }
1604        }
1605        "###);
1606    }
1607
1608    #[mz_ore::test]
1609    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `pipe2` on OS `linux`
1610    fn test_dropping_columns_with_keys() {
1611        let desc = RelationDesc::builder()
1612            .with_column("a", ScalarType::Bool.nullable(true))
1613            .with_column("z", ScalarType::String.nullable(false))
1614            .with_key(vec![1])
1615            .finish();
1616
1617        let mut versioned_desc = VersionedRelationDesc {
1618            inner: desc.clone(),
1619        };
1620        versioned_desc.validate();
1621
1622        let v1 = versioned_desc.drop_column("a");
1623        assert_eq!(v1, RelationVersion(1));
1624
1625        // Make sure the key index for 'z' got remapped since 'a' was dropped.
1626        let v1 = versioned_desc.at_version(RelationVersionSelector::Specific(v1));
1627        insta::assert_json_snapshot!(v1, @r###"
1628        {
1629          "typ": {
1630            "column_types": [
1631              {
1632                "scalar_type": "String",
1633                "nullable": false
1634              }
1635            ],
1636            "keys": [
1637              [
1638                0
1639              ]
1640            ]
1641          },
1642          "metadata": {
1643            "1": {
1644              "name": "z",
1645              "typ_idx": 0,
1646              "added": 0,
1647              "dropped": null
1648            }
1649          }
1650        }
1651        "###);
1652
1653        // Make sure the key index of 'z' is correct when all columns are present.
1654        let v0 = versioned_desc.at_version(RelationVersionSelector::specific(0));
1655        insta::assert_json_snapshot!(v0, @r###"
1656        {
1657          "typ": {
1658            "column_types": [
1659              {
1660                "scalar_type": "Bool",
1661                "nullable": true
1662              },
1663              {
1664                "scalar_type": "String",
1665                "nullable": false
1666              }
1667            ],
1668            "keys": [
1669              [
1670                1
1671              ]
1672            ]
1673          },
1674          "metadata": {
1675            "0": {
1676              "name": "a",
1677              "typ_idx": 0,
1678              "added": 0,
1679              "dropped": 1
1680            },
1681            "1": {
1682              "name": "z",
1683              "typ_idx": 1,
1684              "added": 0,
1685              "dropped": null
1686            }
1687          }
1688        }
1689        "###);
1690    }
1691
1692    #[mz_ore::test]
1693    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `pipe2` on OS `linux`
1694    fn roundtrip_relation_desc_without_metadata() {
1695        let typ = ProtoRelationType {
1696            column_types: vec![
1697                ScalarType::String.nullable(false).into_proto(),
1698                ScalarType::Bool.nullable(true).into_proto(),
1699            ],
1700            keys: vec![],
1701        };
1702        let proto = ProtoRelationDesc {
1703            typ: Some(typ),
1704            names: vec![
1705                ColumnName("a".to_string()).into_proto(),
1706                ColumnName("b".to_string()).into_proto(),
1707            ],
1708            metadata: vec![],
1709        };
1710        let desc: RelationDesc = proto.into_rust().unwrap();
1711
1712        insta::assert_json_snapshot!(desc, @r###"
1713        {
1714          "typ": {
1715            "column_types": [
1716              {
1717                "scalar_type": "String",
1718                "nullable": false
1719              },
1720              {
1721                "scalar_type": "Bool",
1722                "nullable": true
1723              }
1724            ],
1725            "keys": []
1726          },
1727          "metadata": {
1728            "0": {
1729              "name": "a",
1730              "typ_idx": 0,
1731              "added": 0,
1732              "dropped": null
1733            },
1734            "1": {
1735              "name": "b",
1736              "typ_idx": 1,
1737              "added": 0,
1738              "dropped": null
1739            }
1740          }
1741        }
1742        "###);
1743    }
1744
1745    #[mz_ore::test]
1746    #[should_panic(expected = "column named 'a' already exists!")]
1747    fn test_add_column_with_same_name_panics() {
1748        let desc = RelationDesc::builder()
1749            .with_column("a", ScalarType::Bool.nullable(true))
1750            .finish();
1751        let mut versioned = VersionedRelationDesc::new(desc);
1752
1753        let _ = versioned.add_column("a", ScalarType::String.nullable(false));
1754    }
1755
1756    #[mz_ore::test]
1757    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `pipe2` on OS `linux`
1758    fn test_add_column_with_same_name_prev_dropped() {
1759        let desc = RelationDesc::builder()
1760            .with_column("a", ScalarType::Bool.nullable(true))
1761            .finish();
1762        let mut versioned = VersionedRelationDesc::new(desc);
1763
1764        let v1 = versioned.drop_column("a");
1765        let v1 = versioned.at_version(RelationVersionSelector::Specific(v1));
1766        insta::assert_json_snapshot!(v1, @r###"
1767        {
1768          "typ": {
1769            "column_types": [],
1770            "keys": []
1771          },
1772          "metadata": {}
1773        }
1774        "###);
1775
1776        let v2 = versioned.add_column("a", ScalarType::String.nullable(false));
1777        let v2 = versioned.at_version(RelationVersionSelector::Specific(v2));
1778        insta::assert_json_snapshot!(v2, @r###"
1779        {
1780          "typ": {
1781            "column_types": [
1782              {
1783                "scalar_type": "String",
1784                "nullable": false
1785              }
1786            ],
1787            "keys": []
1788          },
1789          "metadata": {
1790            "1": {
1791              "name": "a",
1792              "typ_idx": 0,
1793              "added": 2,
1794              "dropped": null
1795            }
1796          }
1797        }
1798        "###);
1799    }
1800
1801    #[mz_ore::test]
1802    #[cfg_attr(miri, ignore)]
1803    fn apply_demand() {
1804        let desc = RelationDesc::builder()
1805            .with_column("a", ScalarType::String.nullable(true))
1806            .with_column("b", ScalarType::Int64.nullable(false))
1807            .with_column("c", ScalarType::Time.nullable(false))
1808            .finish();
1809        let desc = desc.apply_demand(&BTreeSet::from([0, 2]));
1810        assert_eq!(desc.arity(), 2);
1811        // TODO(parkmycar): Move validate onto RelationDesc.
1812        VersionedRelationDesc::new(desc).validate();
1813    }
1814
1815    #[mz_ore::test]
1816    #[cfg_attr(miri, ignore)]
1817    fn smoketest_column_index_stable_ident() {
1818        let idx_a = ColumnIndex(42);
1819        // Note(parkmycar): This should never change.
1820        assert_eq!(idx_a.to_stable_name(), "42");
1821    }
1822
1823    #[mz_ore::test]
1824    #[cfg_attr(miri, ignore)] // too slow
1825    fn proptest_relation_desc_roundtrips() {
1826        fn testcase(og: RelationDesc) {
1827            let bytes = og.into_proto().encode_to_vec();
1828            let proto = ProtoRelationDesc::decode(&bytes[..]).unwrap();
1829            let rnd = RelationDesc::from_proto(proto).unwrap();
1830
1831            assert_eq!(og, rnd);
1832        }
1833
1834        proptest!(|(desc in any::<RelationDesc>())| {
1835            testcase(desc);
1836        });
1837
1838        let strat = any::<RelationDesc>().prop_flat_map(|desc| {
1839            arb_relation_desc_diff(&desc).prop_map(move |diffs| (desc.clone(), diffs))
1840        });
1841
1842        proptest!(|((mut desc, diffs) in strat)| {
1843            for diff in diffs {
1844                diff.apply(&mut desc);
1845            };
1846            testcase(desc);
1847        });
1848    }
1849}