mz_repr/
relation.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::{BTreeMap, BTreeSet};
11use std::rc::Rc;
12use std::{fmt, vec};
13
14use anyhow::bail;
15use itertools::Itertools;
16use mz_lowertest::MzReflect;
17use mz_ore::cast::CastFrom;
18use mz_ore::str::StrExt;
19use mz_ore::{assert_none, assert_ok};
20use mz_persist_types::schema::SchemaId;
21use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
22use proptest::prelude::*;
23use proptest::strategy::{Strategy, Union};
24use proptest_derive::Arbitrary;
25use serde::{Deserialize, Serialize};
26use timely::Container;
27
28use crate::relation_and_scalar::proto_relation_type::ProtoKey;
29pub use crate::relation_and_scalar::{
30    ProtoColumnMetadata, ProtoColumnName, ProtoColumnType, ProtoRelationDesc, ProtoRelationType,
31    ProtoRelationVersion,
32};
33use crate::{Datum, Row, ScalarType, arb_datum_for_column};
34
35/// The type of a [`Datum`].
36///
37/// [`ColumnType`] bundles information about the scalar type of a datum (e.g.,
38/// Int32 or String) with its nullability.
39///
40/// To construct a column type, either initialize the struct directly, or
41/// use the [`ScalarType::nullable`] method.
42#[derive(
43    Arbitrary, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash, MzReflect,
44)]
45pub struct ColumnType {
46    /// The underlying scalar type (e.g., Int32 or String) of this column.
47    pub scalar_type: ScalarType,
48    /// Whether this datum can be null.
49    #[serde(default = "return_true")]
50    pub nullable: bool,
51}
52
53/// This method exists solely for the purpose of making ColumnType nullable by
54/// default in unit tests. The default value of a bool is false, and the only
55/// way to make an object take on any other value by default is to pass it a
56/// function that returns the desired default value. See
57/// <https://github.com/serde-rs/serde/issues/1030>
58#[inline(always)]
59fn return_true() -> bool {
60    true
61}
62
63impl ColumnType {
64    pub fn union(&self, other: &Self) -> Result<Self, anyhow::Error> {
65        match (&self.scalar_type, &other.scalar_type) {
66            (scalar_type, other_scalar_type) if scalar_type == other_scalar_type => {
67                Ok(ColumnType {
68                    scalar_type: scalar_type.clone(),
69                    nullable: self.nullable || other.nullable,
70                })
71            }
72            (scalar_type, other_scalar_type) if scalar_type.base_eq(other_scalar_type) => {
73                Ok(ColumnType {
74                    scalar_type: scalar_type.without_modifiers(),
75                    nullable: self.nullable || other.nullable,
76                })
77            }
78            (
79                ScalarType::Record { fields, custom_id },
80                ScalarType::Record {
81                    fields: other_fields,
82                    custom_id: other_custom_id,
83                },
84            ) => {
85                if custom_id != other_custom_id {
86                    bail!(
87                        "Can't union types: {:?} and {:?}",
88                        self.scalar_type,
89                        other.scalar_type
90                    );
91                };
92
93                let mut union_fields = Vec::with_capacity(fields.len());
94                for ((name, typ), (other_name, other_typ)) in fields.iter().zip(other_fields.iter())
95                {
96                    if name != other_name {
97                        bail!(
98                            "Can't union types: {:?} and {:?}",
99                            self.scalar_type,
100                            other.scalar_type
101                        );
102                    } else {
103                        let union_column_type = typ.union(other_typ)?;
104                        union_fields.push((name.clone(), union_column_type));
105                    };
106                }
107
108                Ok(ColumnType {
109                    scalar_type: ScalarType::Record {
110                        fields: union_fields.into(),
111                        custom_id: *custom_id,
112                    },
113                    nullable: self.nullable || other.nullable,
114                })
115            }
116            _ => bail!(
117                "Can't union types: {:?} and {:?}",
118                self.scalar_type,
119                other.scalar_type
120            ),
121        }
122    }
123
124    /// Consumes this `ColumnType` and returns a new `ColumnType` with its
125    /// nullability set to the specified boolean.
126    pub fn nullable(mut self, nullable: bool) -> Self {
127        self.nullable = nullable;
128        self
129    }
130}
131
132impl RustType<ProtoColumnType> for ColumnType {
133    fn into_proto(&self) -> ProtoColumnType {
134        ProtoColumnType {
135            nullable: self.nullable,
136            scalar_type: Some(self.scalar_type.into_proto()),
137        }
138    }
139
140    fn from_proto(proto: ProtoColumnType) -> Result<Self, TryFromProtoError> {
141        Ok(ColumnType {
142            nullable: proto.nullable,
143            scalar_type: proto
144                .scalar_type
145                .into_rust_if_some("ProtoColumnType::scalar_type")?,
146        })
147    }
148}
149
150impl fmt::Display for ColumnType {
151    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
152        let nullable = if self.nullable { "Null" } else { "NotNull" };
153        f.write_fmt(format_args!("{:?}:{}", self.scalar_type, nullable))
154    }
155}
156
157/// The type of a relation.
158#[derive(
159    Arbitrary, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash, MzReflect,
160)]
161pub struct RelationType {
162    /// The type for each column, in order.
163    pub column_types: Vec<ColumnType>,
164    /// Sets of indices that are "keys" for the collection.
165    ///
166    /// Each element in this list is a set of column indices, each with the
167    /// property that the collection contains at most one record with each
168    /// distinct set of values for each column. Alternately, for a specific set
169    /// of values assigned to the these columns there is at most one record.
170    ///
171    /// A collection can contain multiple sets of keys, although it is common to
172    /// have either zero or one sets of key indices.
173    #[serde(default)]
174    pub keys: Vec<Vec<usize>>,
175}
176
177impl RelationType {
178    /// Constructs a `RelationType` representing the relation with no columns and
179    /// no keys.
180    pub fn empty() -> Self {
181        RelationType::new(vec![])
182    }
183
184    /// Constructs a new `RelationType` from specified column types.
185    ///
186    /// The `RelationType` will have no keys.
187    pub fn new(column_types: Vec<ColumnType>) -> Self {
188        RelationType {
189            column_types,
190            keys: Vec::new(),
191        }
192    }
193
194    /// Adds a new key for the relation.
195    pub fn with_key(mut self, mut indices: Vec<usize>) -> Self {
196        indices.sort_unstable();
197        if !self.keys.contains(&indices) {
198            self.keys.push(indices);
199        }
200        self
201    }
202
203    pub fn with_keys(mut self, keys: Vec<Vec<usize>>) -> Self {
204        for key in keys {
205            self = self.with_key(key)
206        }
207        self
208    }
209
210    /// Computes the number of columns in the relation.
211    pub fn arity(&self) -> usize {
212        self.column_types.len()
213    }
214
215    /// Gets the index of the columns used when creating a default index.
216    pub fn default_key(&self) -> Vec<usize> {
217        if let Some(key) = self.keys.first() {
218            if key.is_empty() {
219                (0..self.column_types.len()).collect()
220            } else {
221                key.clone()
222            }
223        } else {
224            (0..self.column_types.len()).collect()
225        }
226    }
227
228    /// Returns all the [`ColumnType`]s, in order, for this relation.
229    pub fn columns(&self) -> &[ColumnType] {
230        &self.column_types
231    }
232}
233
234impl RustType<ProtoRelationType> for RelationType {
235    fn into_proto(&self) -> ProtoRelationType {
236        ProtoRelationType {
237            column_types: self.column_types.into_proto(),
238            keys: self.keys.into_proto(),
239        }
240    }
241
242    fn from_proto(proto: ProtoRelationType) -> Result<Self, TryFromProtoError> {
243        Ok(RelationType {
244            column_types: proto.column_types.into_rust()?,
245            keys: proto.keys.into_rust()?,
246        })
247    }
248}
249
250impl RustType<ProtoKey> for Vec<usize> {
251    fn into_proto(&self) -> ProtoKey {
252        ProtoKey {
253            keys: self.into_proto(),
254        }
255    }
256
257    fn from_proto(proto: ProtoKey) -> Result<Self, TryFromProtoError> {
258        proto.keys.into_rust()
259    }
260}
261
262/// The name of a column in a [`RelationDesc`].
263#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash, MzReflect)]
264pub struct ColumnName(Box<str>);
265
266impl ColumnName {
267    /// Returns this column name as a `str`.
268    #[inline(always)]
269    pub fn as_str(&self) -> &str {
270        &*self
271    }
272
273    /// Returns this column name as a `&mut Box<str>`.
274    pub fn as_mut_boxed_str(&mut self) -> &mut Box<str> {
275        &mut self.0
276    }
277
278    /// Returns if this [`ColumnName`] is similar to the provided one.
279    pub fn is_similar(&self, other: &ColumnName) -> bool {
280        const SIMILARITY_THRESHOLD: f64 = 0.6;
281
282        let a_lowercase = self.to_lowercase();
283        let b_lowercase = other.to_lowercase();
284
285        strsim::normalized_levenshtein(&a_lowercase, &b_lowercase) >= SIMILARITY_THRESHOLD
286    }
287}
288
289impl std::ops::Deref for ColumnName {
290    type Target = str;
291
292    #[inline(always)]
293    fn deref(&self) -> &Self::Target {
294        &self.0
295    }
296}
297
298impl fmt::Display for ColumnName {
299    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
300        f.write_str(&self.0)
301    }
302}
303
304impl From<String> for ColumnName {
305    fn from(s: String) -> ColumnName {
306        ColumnName(s.into())
307    }
308}
309
310impl From<&str> for ColumnName {
311    fn from(s: &str) -> ColumnName {
312        ColumnName(s.into())
313    }
314}
315
316impl From<&ColumnName> for ColumnName {
317    fn from(n: &ColumnName) -> ColumnName {
318        n.clone()
319    }
320}
321
322impl RustType<ProtoColumnName> for ColumnName {
323    fn into_proto(&self) -> ProtoColumnName {
324        ProtoColumnName {
325            value: Some(self.0.to_string()),
326        }
327    }
328
329    fn from_proto(proto: ProtoColumnName) -> Result<Self, TryFromProtoError> {
330        Ok(ColumnName(
331            proto
332                .value
333                .ok_or_else(|| TryFromProtoError::missing_field("ProtoColumnName::value"))?
334                .into(),
335        ))
336    }
337}
338
339impl From<ColumnName> for mz_sql_parser::ast::Ident {
340    fn from(value: ColumnName) -> Self {
341        // Note: ColumnNames are known to be less than the max length of an Ident (I think?).
342        mz_sql_parser::ast::Ident::new_unchecked(value.0)
343    }
344}
345
346impl proptest::arbitrary::Arbitrary for ColumnName {
347    type Parameters = ();
348    type Strategy = BoxedStrategy<ColumnName>;
349
350    fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy {
351        // Long column names are generally uninteresting, and can greatly
352        // increase the runtime for a test case, so bound the max length.
353        let mut weights = vec![(50, Just(1..8)), (20, Just(8..16))];
354        if std::env::var("PROPTEST_LARGE_DATA").is_ok() {
355            weights.extend([
356                (5, Just(16..128)),
357                (1, Just(128..1024)),
358                (1, Just(1024..4096)),
359            ]);
360        }
361        let name_length = Union::new_weighted(weights);
362
363        // Non-ASCII characters are also generally uninteresting and can make
364        // debugging harder.
365        let char_strat = Rc::new(Union::new_weighted(vec![
366            (50, proptest::char::range('A', 'z').boxed()),
367            (1, any::<char>().boxed()),
368        ]));
369
370        name_length
371            .prop_flat_map(move |length| proptest::collection::vec(Rc::clone(&char_strat), length))
372            .prop_map(|chars| ColumnName(chars.into_iter().collect::<Box<str>>()))
373            .no_shrink()
374            .boxed()
375    }
376}
377
378/// Stable index of a column in a [`RelationDesc`].
379#[derive(
380    Clone, Copy, Debug, Eq, PartialEq, PartialOrd, Ord, Serialize, Deserialize, Hash, MzReflect,
381)]
382pub struct ColumnIndex(usize);
383
384static_assertions::assert_not_impl_all!(ColumnIndex: Arbitrary);
385
386impl ColumnIndex {
387    /// Returns a stable identifier for this [`ColumnIndex`].
388    pub fn to_stable_name(&self) -> String {
389        self.0.to_string()
390    }
391
392    pub fn to_raw(&self) -> usize {
393        self.0
394    }
395
396    pub fn from_raw(val: usize) -> Self {
397        ColumnIndex(val)
398    }
399}
400
401/// The version a given column was added at.
402#[derive(
403    Clone,
404    Copy,
405    Debug,
406    Eq,
407    PartialEq,
408    PartialOrd,
409    Ord,
410    Serialize,
411    Deserialize,
412    Hash,
413    MzReflect,
414    Arbitrary,
415)]
416pub struct RelationVersion(u64);
417
418impl RelationVersion {
419    /// Returns the "root" or "initial" version of a [`RelationDesc`].
420    pub fn root() -> Self {
421        RelationVersion(0)
422    }
423
424    /// Returns an instance of [`RelationVersion`] which is "one" higher than `self`.
425    pub fn bump(&self) -> Self {
426        let next_version = self
427            .0
428            .checked_add(1)
429            .expect("added more than u64::MAX columns?");
430        RelationVersion(next_version)
431    }
432
433    /// Consume a [`RelationVersion`] returning the raw value.
434    ///
435    /// Should __only__ be used for serialization.
436    pub fn into_raw(self) -> u64 {
437        self.0
438    }
439
440    /// Create a [`RelationVersion`] from a raw value.
441    ///
442    /// Should __only__ be used for serialization.
443    pub fn from_raw(val: u64) -> RelationVersion {
444        RelationVersion(val)
445    }
446}
447
448impl From<RelationVersion> for SchemaId {
449    fn from(value: RelationVersion) -> Self {
450        SchemaId(usize::cast_from(value.0))
451    }
452}
453
454impl From<mz_sql_parser::ast::Version> for RelationVersion {
455    fn from(value: mz_sql_parser::ast::Version) -> Self {
456        RelationVersion(value.into_inner())
457    }
458}
459
460impl fmt::Display for RelationVersion {
461    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
462        write!(f, "v{}", self.0)
463    }
464}
465
466impl From<RelationVersion> for mz_sql_parser::ast::Version {
467    fn from(value: RelationVersion) -> Self {
468        mz_sql_parser::ast::Version::new(value.0)
469    }
470}
471
472impl RustType<ProtoRelationVersion> for RelationVersion {
473    fn into_proto(&self) -> ProtoRelationVersion {
474        ProtoRelationVersion { value: self.0 }
475    }
476
477    fn from_proto(proto: ProtoRelationVersion) -> Result<Self, TryFromProtoError> {
478        Ok(RelationVersion(proto.value))
479    }
480}
481
482/// Metadata (other than type) for a column in a [`RelationDesc`].
483#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash, MzReflect)]
484struct ColumnMetadata {
485    /// Name of the column.
486    name: ColumnName,
487    /// Index into a [`RelationType`] for this column.
488    typ_idx: usize,
489    /// Version this column was added at.
490    added: RelationVersion,
491    /// Version this column was dropped at.
492    dropped: Option<RelationVersion>,
493}
494
495/// A description of the shape of a relation.
496///
497/// It bundles a [`RelationType`] with `ColumnMetadata` for each column in
498/// the relation.
499///
500/// # Examples
501///
502/// A `RelationDesc`s is typically constructed via its builder API:
503///
504/// ```
505/// use mz_repr::{ColumnType, RelationDesc, ScalarType};
506///
507/// let desc = RelationDesc::builder()
508///     .with_column("id", ScalarType::Int64.nullable(false))
509///     .with_column("price", ScalarType::Float64.nullable(true))
510///     .finish();
511/// ```
512///
513/// In more complicated cases, like when constructing a `RelationDesc` in
514/// response to user input, it may be more convenient to construct a relation
515/// type first, and imbue it with column names to form a `RelationDesc` later:
516///
517/// ```
518/// use mz_repr::RelationDesc;
519///
520/// # fn plan_query(_: &str) -> mz_repr::RelationType { mz_repr::RelationType::new(vec![]) }
521/// let relation_type = plan_query("SELECT * FROM table");
522/// let names = (0..relation_type.arity()).map(|i| match i {
523///     0 => "first",
524///     1 => "second",
525///     _ => "unknown",
526/// });
527/// let desc = RelationDesc::new(relation_type, names);
528/// ```
529///
530/// Next to the [`RelationType`] we maintain a map of `ColumnIndex` to
531/// `ColumnMetadata`, where [`ColumnIndex`] is a stable identifier for a
532/// column throughout the lifetime of the relation. This allows a
533/// [`RelationDesc`] to represent a projection over a version of itself.
534///
535/// ```
536/// use std::collections::BTreeSet;
537/// use mz_repr::{ColumnIndex, RelationDesc, ScalarType};
538///
539/// let desc = RelationDesc::builder()
540///     .with_column("name", ScalarType::String.nullable(false))
541///     .with_column("email", ScalarType::String.nullable(false))
542///     .finish();
543///
544/// // Project away the second column.
545/// let demands = BTreeSet::from([1]);
546/// let proj = desc.apply_demand(&demands);
547///
548/// // We projected away the first column.
549/// assert!(!proj.contains_index(&ColumnIndex::from_raw(0)));
550/// // But retained the second.
551/// assert!(proj.contains_index(&ColumnIndex::from_raw(1)));
552///
553/// // The underlying `RelationType` also contains a single column.
554/// assert_eq!(proj.typ().arity(), 1);
555/// ```
556///
557/// To maintain this stable mapping and track the lifetime of a column (e.g.
558/// when adding or dropping a column) we use `ColumnMetadata`. It maintains
559/// the index in [`RelationType`] that corresponds to a given column, and the
560/// version at which this column was added or dropped.
561///
562#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash, MzReflect)]
563pub struct RelationDesc {
564    typ: RelationType,
565    metadata: BTreeMap<ColumnIndex, ColumnMetadata>,
566}
567
568impl RustType<ProtoRelationDesc> for RelationDesc {
569    fn into_proto(&self) -> ProtoRelationDesc {
570        let (names, metadata): (Vec<_>, Vec<_>) = self
571            .metadata
572            .values()
573            .map(|meta| {
574                let metadata = ProtoColumnMetadata {
575                    added: Some(meta.added.into_proto()),
576                    dropped: meta.dropped.map(|v| v.into_proto()),
577                };
578                (meta.name.into_proto(), metadata)
579            })
580            .unzip();
581
582        // `metadata` Migration Logic: We wrote some `ProtoRelationDesc`s into Persist before the
583        // metadata field was added. To make sure our serialization roundtrips the same as before
584        // we added the field, we omit `metadata` if all of the values are equal to the default.
585        //
586        // Note: This logic needs to exist approximately forever.
587        let is_all_default_metadata = metadata.iter().all(|meta| {
588            meta.added == Some(RelationVersion::root().into_proto()) && meta.dropped == None
589        });
590        let metadata = if is_all_default_metadata {
591            Vec::new()
592        } else {
593            metadata
594        };
595
596        ProtoRelationDesc {
597            typ: Some(self.typ.into_proto()),
598            names,
599            metadata,
600        }
601    }
602
603    fn from_proto(proto: ProtoRelationDesc) -> Result<Self, TryFromProtoError> {
604        // `metadata` Migration Logic: We wrote some `ProtoRelationDesc`s into Persist before the
605        // metadata field was added. If the field doesn't exist we fill it in with default values,
606        // and when converting into_proto we omit these fields so the serialized bytes roundtrip.
607        //
608        // Note: This logic needs to exist approximately forever.
609        let proto_metadata: Box<dyn Iterator<Item = _>> = if proto.metadata.is_empty() {
610            let val = ProtoColumnMetadata {
611                added: Some(RelationVersion::root().into_proto()),
612                dropped: None,
613            };
614            Box::new(itertools::repeat_n(val, proto.names.len()))
615        } else {
616            Box::new(proto.metadata.into_iter())
617        };
618
619        let metadata = proto
620            .names
621            .into_iter()
622            .zip_eq(proto_metadata)
623            .enumerate()
624            .map(|(idx, (name, metadata))| {
625                let meta = ColumnMetadata {
626                    name: name.into_rust()?,
627                    typ_idx: idx,
628                    added: metadata.added.into_rust_if_some("ColumnMetadata::added")?,
629                    dropped: metadata.dropped.into_rust()?,
630                };
631                Ok::<_, TryFromProtoError>((ColumnIndex(idx), meta))
632            })
633            .collect::<Result<_, _>>()?;
634
635        Ok(RelationDesc {
636            typ: proto.typ.into_rust_if_some("ProtoRelationDesc::typ")?,
637            metadata,
638        })
639    }
640}
641
642impl RelationDesc {
643    /// Returns a [`RelationDescBuilder`] that can be used to construct a [`RelationDesc`].
644    pub fn builder() -> RelationDescBuilder {
645        RelationDescBuilder::default()
646    }
647
648    /// Constructs a new `RelationDesc` that represents the empty relation
649    /// with no columns and no keys.
650    pub fn empty() -> Self {
651        RelationDesc {
652            typ: RelationType::empty(),
653            metadata: BTreeMap::default(),
654        }
655    }
656
657    /// Check if the `RelationDesc` is empty.
658    pub fn is_empty(&self) -> bool {
659        self == &Self::empty()
660    }
661
662    /// Returns the number of columns in this [`RelationDesc`].
663    pub fn len(&self) -> usize {
664        self.typ().column_types.len()
665    }
666
667    /// Constructs a new `RelationDesc` from a `RelationType` and an iterator
668    /// over column names.
669    ///
670    /// # Panics
671    ///
672    /// Panics if the arity of the `RelationType` is not equal to the number of
673    /// items in `names`.
674    pub fn new<I, N>(typ: RelationType, names: I) -> Self
675    where
676        I: IntoIterator<Item = N>,
677        N: Into<ColumnName>,
678    {
679        let metadata: BTreeMap<_, _> = names
680            .into_iter()
681            .enumerate()
682            .map(|(idx, name)| {
683                let col_idx = ColumnIndex(idx);
684                let metadata = ColumnMetadata {
685                    name: name.into(),
686                    typ_idx: idx,
687                    added: RelationVersion::root(),
688                    dropped: None,
689                };
690                (col_idx, metadata)
691            })
692            .collect();
693
694        // TODO(parkmycar): Add better validation here.
695        assert_eq!(typ.column_types.len(), metadata.len());
696
697        RelationDesc { typ, metadata }
698    }
699
700    pub fn from_names_and_types<I, T, N>(iter: I) -> Self
701    where
702        I: IntoIterator<Item = (N, T)>,
703        T: Into<ColumnType>,
704        N: Into<ColumnName>,
705    {
706        let (names, types): (Vec<_>, Vec<_>) = iter.into_iter().unzip();
707        let types = types.into_iter().map(Into::into).collect();
708        let typ = RelationType::new(types);
709        Self::new(typ, names)
710    }
711
712    /// Concatenates a `RelationDesc` onto the end of this `RelationDesc`.
713    ///
714    /// # Panics
715    ///
716    /// Panics if either `self` or `other` have columns that were added at a
717    /// [`RelationVersion`] other than [`RelationVersion::root`] or if any
718    /// columns were dropped.
719    ///
720    /// TODO(parkmycar): Move this method to [`RelationDescBuilder`].
721    pub fn concat(mut self, other: Self) -> Self {
722        let self_len = self.typ.column_types.len();
723
724        for (typ, (_col_idx, meta)) in other
725            .typ
726            .column_types
727            .into_iter()
728            .zip_eq(other.metadata.into_iter())
729        {
730            assert_eq!(meta.added, RelationVersion::root());
731            assert_none!(meta.dropped);
732
733            let new_idx = self.typ.columns().len();
734            let new_meta = ColumnMetadata {
735                name: meta.name,
736                typ_idx: new_idx,
737                added: RelationVersion::root(),
738                dropped: None,
739            };
740
741            self.typ.column_types.push(typ);
742            let prev = self.metadata.insert(ColumnIndex(new_idx), new_meta);
743
744            assert_eq!(self.metadata.len(), self.typ.columns().len());
745            assert_none!(prev);
746        }
747
748        for k in other.typ.keys {
749            let k = k.into_iter().map(|idx| idx + self_len).collect();
750            self = self.with_key(k);
751        }
752        self
753    }
754
755    /// Adds a new key for the relation.
756    pub fn with_key(mut self, indices: Vec<usize>) -> Self {
757        self.typ = self.typ.with_key(indices);
758        self
759    }
760
761    /// Drops all existing keys.
762    pub fn without_keys(mut self) -> Self {
763        self.typ.keys.clear();
764        self
765    }
766
767    /// Builds a new relation description with the column names replaced with
768    /// new names.
769    ///
770    /// # Panics
771    ///
772    /// Panics if the arity of the relation type does not match the number of
773    /// items in `names`.
774    pub fn with_names<I, N>(self, names: I) -> Self
775    where
776        I: IntoIterator<Item = N>,
777        N: Into<ColumnName>,
778    {
779        Self::new(self.typ, names)
780    }
781
782    /// Computes the number of columns in the relation.
783    pub fn arity(&self) -> usize {
784        self.typ.arity()
785    }
786
787    /// Returns the relation type underlying this relation description.
788    pub fn typ(&self) -> &RelationType {
789        &self.typ
790    }
791
792    /// Returns an iterator over the columns in this relation.
793    pub fn iter(&self) -> impl Iterator<Item = (&ColumnName, &ColumnType)> {
794        self.metadata.values().map(|meta| {
795            let typ = &self.typ.columns()[meta.typ_idx];
796            (&meta.name, typ)
797        })
798    }
799
800    /// Returns an iterator over the types of the columns in this relation.
801    pub fn iter_types(&self) -> impl Iterator<Item = &ColumnType> {
802        self.typ.column_types.iter()
803    }
804
805    /// Returns an iterator over the names of the columns in this relation.
806    pub fn iter_names(&self) -> impl Iterator<Item = &ColumnName> {
807        self.metadata.values().map(|meta| &meta.name)
808    }
809
810    /// Returns an iterator over the columns in this relation, with all their metadata.
811    pub fn iter_all(&self) -> impl Iterator<Item = (&ColumnIndex, &ColumnName, &ColumnType)> {
812        self.metadata.iter().map(|(col_idx, metadata)| {
813            let col_typ = &self.typ.columns()[metadata.typ_idx];
814            (col_idx, &metadata.name, col_typ)
815        })
816    }
817
818    /// Returns an iterator over the names of the columns in this relation that are "similar" to
819    /// the provided `name`.
820    pub fn iter_similar_names<'a>(
821        &'a self,
822        name: &'a ColumnName,
823    ) -> impl Iterator<Item = &'a ColumnName> {
824        self.iter_names().filter(|n| n.is_similar(name))
825    }
826
827    /// Returns whether this [`RelationDesc`] contains a column at the specified index.
828    pub fn contains_index(&self, idx: &ColumnIndex) -> bool {
829        self.metadata.contains_key(idx)
830    }
831
832    /// Finds a column by name.
833    ///
834    /// Returns the index and type of the column named `name`. If no column with
835    /// the specified name exists, returns `None`. If multiple columns have the
836    /// specified name, the leftmost column is returned.
837    pub fn get_by_name(&self, name: &ColumnName) -> Option<(usize, &ColumnType)> {
838        self.iter_names()
839            .position(|n| n == name)
840            .map(|i| (i, &self.typ.column_types[i]))
841    }
842
843    /// Gets the name of the `i`th column.
844    ///
845    /// # Panics
846    ///
847    /// Panics if `i` is not a valid column index.
848    ///
849    /// TODO(parkmycar): Migrate all uses of this to [`RelationDesc::get_name_idx`].
850    pub fn get_name(&self, i: usize) -> &ColumnName {
851        // TODO(parkmycar): Refactor this to use `ColumnIndex`.
852        self.get_name_idx(&ColumnIndex(i))
853    }
854
855    /// Gets the name of the column at `idx`.
856    ///
857    /// # Panics
858    ///
859    /// Panics if no column exists at `idx`.
860    pub fn get_name_idx(&self, idx: &ColumnIndex) -> &ColumnName {
861        &self.metadata.get(idx).expect("should exist").name
862    }
863
864    /// Mutably gets the name of the `i`th column.
865    ///
866    /// # Panics
867    ///
868    /// Panics if `i` is not a valid column index.
869    pub fn get_name_mut(&mut self, i: usize) -> &mut ColumnName {
870        // TODO(parkmycar): Refactor this to use `ColumnIndex`.
871        &mut self
872            .metadata
873            .get_mut(&ColumnIndex(i))
874            .expect("should exist")
875            .name
876    }
877
878    /// Gets the [`ColumnType`] of the column at `idx`.
879    ///
880    /// # Panics
881    ///
882    /// Panics if no column exists at `idx`.
883    pub fn get_type(&self, idx: &ColumnIndex) -> &ColumnType {
884        let typ_idx = self.metadata.get(idx).expect("should exist").typ_idx;
885        &self.typ.column_types[typ_idx]
886    }
887
888    /// Gets the name of the `i`th column if that column name is unambiguous.
889    ///
890    /// If at least one other column has the same name as the `i`th column,
891    /// returns `None`. If the `i`th column has no name, returns `None`.
892    ///
893    /// # Panics
894    ///
895    /// Panics if `i` is not a valid column index.
896    pub fn get_unambiguous_name(&self, i: usize) -> Option<&ColumnName> {
897        let name = self.get_name(i);
898        if self.iter_names().filter(|n| *n == name).count() == 1 {
899            Some(name)
900        } else {
901            None
902        }
903    }
904
905    /// Verifies that `d` meets all of the constraints for the `i`th column of `self`.
906    ///
907    /// n.b. The only constraint MZ currently supports in NOT NULL, but this
908    /// structure will be simple to extend.
909    pub fn constraints_met(&self, i: usize, d: &Datum) -> Result<(), NotNullViolation> {
910        let name = self.get_name(i);
911        let typ = &self.typ.column_types[i];
912        if d == &Datum::Null && !typ.nullable {
913            Err(NotNullViolation(name.clone()))
914        } else {
915            Ok(())
916        }
917    }
918
919    /// Creates a new [`RelationDesc`] retaining only the columns specified in `demands`.
920    pub fn apply_demand(&self, demands: &BTreeSet<usize>) -> RelationDesc {
921        let mut new_desc = self.clone();
922
923        // Update ColumnMetadata.
924        let mut removed = 0;
925        new_desc.metadata.retain(|idx, metadata| {
926            let retain = demands.contains(&idx.0);
927            if !retain {
928                removed += 1;
929            } else {
930                metadata.typ_idx -= removed;
931            }
932            retain
933        });
934
935        // Update ColumnType.
936        let mut idx = 0;
937        new_desc.typ.column_types.retain(|_| {
938            let keep = demands.contains(&idx);
939            idx += 1;
940            keep
941        });
942
943        new_desc
944    }
945}
946
947impl Arbitrary for RelationDesc {
948    type Parameters = ();
949    type Strategy = BoxedStrategy<RelationDesc>;
950
951    fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy {
952        let mut weights = vec![(100, Just(0..4)), (50, Just(4..8)), (25, Just(8..16))];
953        if std::env::var("PROPTEST_LARGE_DATA").is_ok() {
954            weights.extend([
955                (12, Just(16..32)),
956                (6, Just(32..64)),
957                (3, Just(64..128)),
958                (1, Just(128..256)),
959            ]);
960        }
961        let num_columns = Union::new_weighted(weights);
962
963        num_columns.prop_flat_map(arb_relation_desc).boxed()
964    }
965}
966
967/// Returns a [`Strategy`] that generates an arbitrary [`RelationDesc`] with a number columns
968/// within the range provided.
969pub fn arb_relation_desc(num_cols: std::ops::Range<usize>) -> impl Strategy<Value = RelationDesc> {
970    proptest::collection::btree_map(any::<ColumnName>(), any::<ColumnType>(), num_cols)
971        .prop_map(RelationDesc::from_names_and_types)
972}
973
974/// Returns a [`Strategy`] that generates a projection of the provided [`RelationDesc`].
975pub fn arb_relation_desc_projection(desc: RelationDesc) -> impl Strategy<Value = RelationDesc> {
976    let mask: Vec<_> = (0..desc.len()).map(|_| any::<bool>()).collect();
977    mask.prop_map(move |mask| {
978        let demands: BTreeSet<_> = mask
979            .into_iter()
980            .enumerate()
981            .filter_map(|(idx, keep)| keep.then_some(idx))
982            .collect();
983        desc.apply_demand(&demands)
984    })
985}
986
987impl IntoIterator for RelationDesc {
988    type Item = (ColumnName, ColumnType);
989    type IntoIter = Box<dyn Iterator<Item = (ColumnName, ColumnType)>>;
990
991    fn into_iter(self) -> Self::IntoIter {
992        let iter = self
993            .metadata
994            .into_values()
995            .zip_eq(self.typ.column_types)
996            .map(|(meta, typ)| (meta.name, typ));
997        Box::new(iter)
998    }
999}
1000
1001/// Returns a [`Strategy`] that yields arbitrary [`Row`]s for the provided [`RelationDesc`].
1002pub fn arb_row_for_relation(desc: &RelationDesc) -> impl Strategy<Value = Row> + use<> {
1003    let datums: Vec<_> = desc
1004        .typ()
1005        .columns()
1006        .iter()
1007        .cloned()
1008        .map(arb_datum_for_column)
1009        .collect();
1010    datums.prop_map(|x| Row::pack(x.iter().map(Datum::from)))
1011}
1012
1013/// Expression violated not-null constraint on named column
1014#[derive(Debug, PartialEq, Eq)]
1015pub struct NotNullViolation(pub ColumnName);
1016
1017impl fmt::Display for NotNullViolation {
1018    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1019        write!(
1020            f,
1021            "null value in column {} violates not-null constraint",
1022            self.0.quoted()
1023        )
1024    }
1025}
1026
1027/// A builder for a [`RelationDesc`].
1028#[derive(Clone, Default, Debug, PartialEq, Eq)]
1029pub struct RelationDescBuilder {
1030    /// Columns of the relation.
1031    columns: Vec<(ColumnName, ColumnType)>,
1032    /// Sets of indices that are "keys" for the collection.
1033    keys: Vec<Vec<usize>>,
1034}
1035
1036impl RelationDescBuilder {
1037    /// Appends a column with the specified name and type.
1038    pub fn with_column<N: Into<ColumnName>>(
1039        mut self,
1040        name: N,
1041        ty: ColumnType,
1042    ) -> RelationDescBuilder {
1043        let name = name.into();
1044        self.columns.push((name, ty));
1045        self
1046    }
1047
1048    /// Appends the provided columns to the builder.
1049    pub fn with_columns<I, T, N>(mut self, iter: I) -> Self
1050    where
1051        I: IntoIterator<Item = (N, T)>,
1052        T: Into<ColumnType>,
1053        N: Into<ColumnName>,
1054    {
1055        self.columns
1056            .extend(iter.into_iter().map(|(name, ty)| (name.into(), ty.into())));
1057        self
1058    }
1059
1060    /// Adds a new key for the relation.
1061    pub fn with_key(mut self, mut indices: Vec<usize>) -> RelationDescBuilder {
1062        indices.sort_unstable();
1063        if !self.keys.contains(&indices) {
1064            self.keys.push(indices);
1065        }
1066        self
1067    }
1068
1069    /// Removes all previously inserted keys.
1070    pub fn without_keys(mut self) -> RelationDescBuilder {
1071        self.keys.clear();
1072        assert_eq!(self.keys.len(), 0);
1073        self
1074    }
1075
1076    /// Concatenates a [`RelationDescBuilder`] onto the end of this [`RelationDescBuilder`].
1077    pub fn concat(mut self, other: Self) -> Self {
1078        let self_len = self.columns.len();
1079
1080        self.columns.extend(other.columns);
1081        for k in other.keys {
1082            let k = k.into_iter().map(|idx| idx + self_len).collect();
1083            self = self.with_key(k);
1084        }
1085
1086        self
1087    }
1088
1089    /// Finish the builder, returning a [`RelationDesc`].
1090    pub fn finish(self) -> RelationDesc {
1091        let mut desc = RelationDesc::from_names_and_types(self.columns);
1092        desc.typ = desc.typ.with_keys(self.keys);
1093        desc
1094    }
1095}
1096
1097/// Describes a [`RelationDesc`] at a specific version of a [`VersionedRelationDesc`].
1098#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord, Serialize)]
1099pub enum RelationVersionSelector {
1100    Specific(RelationVersion),
1101    Latest,
1102}
1103
1104impl RelationVersionSelector {
1105    pub fn specific(version: u64) -> Self {
1106        RelationVersionSelector::Specific(RelationVersion(version))
1107    }
1108}
1109
1110/// A wrapper around [`RelationDesc`] that provides an interface for adding
1111/// columns and generating new versions.
1112///
1113/// TODO(parkmycar): Using an immutable data structure for RelationDesc would
1114/// be great.
1115#[derive(Debug, Clone, Serialize)]
1116pub struct VersionedRelationDesc {
1117    inner: RelationDesc,
1118}
1119
1120impl VersionedRelationDesc {
1121    pub fn new(inner: RelationDesc) -> Self {
1122        VersionedRelationDesc { inner }
1123    }
1124
1125    /// Adds a new column to this [`RelationDesc`], creating a new version of the [`RelationDesc`].
1126    ///
1127    /// # Panics
1128    ///
1129    /// * Panics if a column with `name` already exists that hasn't been dropped.
1130    ///
1131    /// Note: For building a [`RelationDesc`] see [`RelationDescBuilder::with_column`].
1132    #[must_use]
1133    pub fn add_column<N, T>(&mut self, name: N, typ: T) -> RelationVersion
1134    where
1135        N: Into<ColumnName>,
1136        T: Into<ColumnType>,
1137    {
1138        let latest_version = self.latest_version();
1139        let new_version = latest_version.bump();
1140
1141        let name = name.into();
1142        let existing = self
1143            .inner
1144            .metadata
1145            .iter()
1146            .find(|(_, meta)| meta.name == name && meta.dropped.is_none());
1147        if let Some(existing) = existing {
1148            panic!("column named '{name}' already exists! {existing:?}");
1149        }
1150
1151        let next_idx = self.inner.metadata.len();
1152        let col_meta = ColumnMetadata {
1153            name,
1154            typ_idx: next_idx,
1155            added: new_version,
1156            dropped: None,
1157        };
1158
1159        self.inner.typ.column_types.push(typ.into());
1160        let prev = self.inner.metadata.insert(ColumnIndex(next_idx), col_meta);
1161
1162        assert_none!(prev, "column index overlap!");
1163        self.validate();
1164
1165        new_version
1166    }
1167
1168    /// Drops the column `name` from this [`RelationDesc`]. If there are multiple columns with
1169    /// `name` drops the left-most one that hasn't already been dropped.
1170    ///
1171    /// TODO(parkmycar): Add handling for dropping a column that is currently used as a key.
1172    ///
1173    /// # Panics
1174    ///
1175    /// Panics if a column with `name` does not exist or the dropped column was used as a key.
1176    #[must_use]
1177    pub fn drop_column<N>(&mut self, name: N) -> RelationVersion
1178    where
1179        N: Into<ColumnName>,
1180    {
1181        let name = name.into();
1182        let latest_version = self.latest_version();
1183        let new_version = latest_version.bump();
1184
1185        let col = self
1186            .inner
1187            .metadata
1188            .values_mut()
1189            .find(|meta| meta.name == name && meta.dropped.is_none())
1190            .expect("column to exist");
1191
1192        // Make sure the column hadn't been previously dropped.
1193        assert_none!(col.dropped, "column was already dropped");
1194        col.dropped = Some(new_version);
1195
1196        // Make sure the column isn't being used as a key.
1197        let dropped_key = self
1198            .inner
1199            .typ
1200            .keys
1201            .iter()
1202            .any(|keys| keys.iter().any(|key| *key == col.typ_idx));
1203        assert!(!dropped_key, "column being dropped was used as a key");
1204
1205        self.validate();
1206        new_version
1207    }
1208
1209    /// Returns the [`RelationDesc`] at the latest version.
1210    pub fn latest(&self) -> RelationDesc {
1211        self.inner.clone()
1212    }
1213
1214    /// Returns this [`RelationDesc`] at the specified version.
1215    pub fn at_version(&self, version: RelationVersionSelector) -> RelationDesc {
1216        // Get all of the changes from the start, up to whatever version was requested.
1217        //
1218        // TODO(parkmycar): We should probably panic on unknown verisons?
1219        let up_to_version = match version {
1220            RelationVersionSelector::Latest => RelationVersion(u64::MAX),
1221            RelationVersionSelector::Specific(v) => v,
1222        };
1223
1224        let valid_columns = self.inner.metadata.iter().filter(|(_col_idx, meta)| {
1225            let added = meta.added <= up_to_version;
1226            let dropped = meta
1227                .dropped
1228                .map(|dropped_at| up_to_version >= dropped_at)
1229                .unwrap_or(false);
1230
1231            added && !dropped
1232        });
1233
1234        let mut column_types = Vec::new();
1235        let mut column_metas = BTreeMap::new();
1236
1237        // N.B. At this point we need to be careful because col_idx might not
1238        // equal typ_idx.
1239        //
1240        // For example, consider columns "a", "b", and "c" with indexes 0, 1,
1241        // and 2. If we drop column "b" then we'll have "a" and "c" with column
1242        // indexes 0 and 2, but their indices in RelationType will be 0 and 1.
1243        for (col_idx, meta) in valid_columns {
1244            let new_meta = ColumnMetadata {
1245                name: meta.name.clone(),
1246                typ_idx: column_types.len(),
1247                added: meta.added.clone(),
1248                dropped: meta.dropped.clone(),
1249            };
1250            column_types.push(self.inner.typ.columns()[meta.typ_idx].clone());
1251            column_metas.insert(*col_idx, new_meta);
1252        }
1253
1254        // Remap keys in case a column with an index less than that of a key was
1255        // dropped.
1256        //
1257        // For example, consider columns "a", "b", and "c" where "a" and "c" are
1258        // keys and "b" was dropped.
1259        let keys = self
1260            .inner
1261            .typ
1262            .keys
1263            .iter()
1264            .map(|keys| {
1265                keys.iter()
1266                    .map(|key_idx| {
1267                        let metadata = column_metas
1268                            .get(&ColumnIndex(*key_idx))
1269                            .expect("found key for column that doesn't exist");
1270                        metadata.typ_idx
1271                    })
1272                    .collect()
1273            })
1274            .collect();
1275
1276        let relation_type = RelationType { column_types, keys };
1277
1278        RelationDesc {
1279            typ: relation_type,
1280            metadata: column_metas,
1281        }
1282    }
1283
1284    pub fn latest_version(&self) -> RelationVersion {
1285        self.inner
1286            .metadata
1287            .values()
1288            // N.B. Dropped is always greater than added.
1289            .map(|meta| meta.dropped.unwrap_or(meta.added))
1290            .max()
1291            // If there aren't any columns we're implicitly the root version.
1292            .unwrap_or_else(RelationVersion::root)
1293    }
1294
1295    /// Validates internal contraints of the [`RelationDesc`] are correct.
1296    ///
1297    /// # Panics
1298    ///
1299    /// Panics if a constraint is not satisfied.
1300    fn validate(&self) {
1301        fn validate_inner(desc: &RelationDesc) -> Result<(), anyhow::Error> {
1302            if desc.typ.column_types.len() != desc.metadata.len() {
1303                anyhow::bail!("mismatch between number of types and metadatas");
1304            }
1305
1306            for (col_idx, meta) in &desc.metadata {
1307                if col_idx.0 > desc.metadata.len() {
1308                    anyhow::bail!("column index out of bounds");
1309                }
1310                if meta.added >= meta.dropped.unwrap_or(RelationVersion(u64::MAX)) {
1311                    anyhow::bail!("column was added after it was dropped?");
1312                }
1313                if desc.typ().columns().get(meta.typ_idx).is_none() {
1314                    anyhow::bail!("typ_idx incorrect");
1315                }
1316            }
1317
1318            for keys in &desc.typ.keys {
1319                for key in keys {
1320                    if *key >= desc.typ.column_types.len() {
1321                        anyhow::bail!("key index was out of bounds!");
1322                    }
1323                }
1324            }
1325
1326            let versions = desc
1327                .metadata
1328                .values()
1329                .map(|meta| meta.dropped.unwrap_or(meta.added));
1330            let mut max = 0;
1331            let mut sum = 0;
1332            for version in versions {
1333                max = std::cmp::max(max, version.0);
1334                sum += version.0;
1335            }
1336
1337            // Other than RelationVersion(0), we should never have duplicate
1338            // versions and they should always increase by 1. In other words, the
1339            // sum of all RelationVersions should be the sum of [0, max].
1340            //
1341            // N.B. n * (n + 1) / 2 = sum of [0, n]
1342            //
1343            // While I normally don't like tricks like this, it allows us to
1344            // validate that our column versions are correct in O(n) time and
1345            // without allocations.
1346            if sum != (max * (max + 1) / 2) {
1347                anyhow::bail!("there is a duplicate or missing relation version");
1348            }
1349
1350            Ok(())
1351        }
1352
1353        assert_ok!(validate_inner(&self.inner), "validate failed! {self:?}");
1354    }
1355}
1356
1357/// Diffs that can be generated proptest and applied to a [`RelationDesc`] to
1358/// exercise schema migrations.
1359#[derive(Debug)]
1360pub enum PropRelationDescDiff {
1361    AddColumn { name: ColumnName, typ: ColumnType },
1362    DropColumn { name: ColumnName },
1363    ToggleNullability { name: ColumnName },
1364    ChangeType { name: ColumnName, typ: ColumnType },
1365}
1366
1367impl PropRelationDescDiff {
1368    pub fn apply(self, desc: &mut RelationDesc) {
1369        match self {
1370            PropRelationDescDiff::AddColumn { name, typ } => {
1371                let new_idx = desc.metadata.len();
1372                let meta = ColumnMetadata {
1373                    name,
1374                    typ_idx: new_idx,
1375                    added: RelationVersion(0),
1376                    dropped: None,
1377                };
1378                let prev = desc.metadata.insert(ColumnIndex(new_idx), meta);
1379                desc.typ.column_types.push(typ);
1380
1381                assert_none!(prev);
1382                assert_eq!(desc.metadata.len(), desc.typ.column_types.len());
1383            }
1384            PropRelationDescDiff::DropColumn { name } => {
1385                let next_version = desc
1386                    .metadata
1387                    .values()
1388                    .map(|meta| meta.dropped.unwrap_or(meta.added))
1389                    .max()
1390                    .unwrap_or_else(RelationVersion::root)
1391                    .bump();
1392                let Some(metadata) = desc.metadata.values_mut().find(|meta| meta.name == name)
1393                else {
1394                    return;
1395                };
1396                if metadata.dropped.is_none() {
1397                    metadata.dropped = Some(next_version);
1398                }
1399            }
1400            PropRelationDescDiff::ToggleNullability { name } => {
1401                let Some((pos, _)) = desc.get_by_name(&name) else {
1402                    return;
1403                };
1404                let col_type = desc
1405                    .typ
1406                    .column_types
1407                    .get_mut(pos)
1408                    .expect("ColumnNames and ColumnTypes out of sync!");
1409                col_type.nullable = !col_type.nullable;
1410            }
1411            PropRelationDescDiff::ChangeType { name, typ } => {
1412                let Some((pos, _)) = desc.get_by_name(&name) else {
1413                    return;
1414                };
1415                let col_type = desc
1416                    .typ
1417                    .column_types
1418                    .get_mut(pos)
1419                    .expect("ColumnNames and ColumnTypes out of sync!");
1420                *col_type = typ;
1421            }
1422        }
1423    }
1424}
1425
1426/// Generates a set of [`PropRelationDescDiff`]s based on some source [`RelationDesc`].
1427pub fn arb_relation_desc_diff(
1428    source: &RelationDesc,
1429) -> impl Strategy<Value = Vec<PropRelationDescDiff>> + use<> {
1430    let source = Rc::new(source.clone());
1431    let num_source_columns = source.typ.columns().len();
1432
1433    let num_add_columns = Union::new_weighted(vec![(100, Just(0..8)), (1, Just(8..64))]);
1434    let add_columns_strat = num_add_columns
1435        .prop_flat_map(|num_columns| {
1436            proptest::collection::vec((any::<ColumnName>(), any::<ColumnType>()), num_columns)
1437        })
1438        .prop_map(|cols| {
1439            cols.into_iter()
1440                .map(|(name, typ)| PropRelationDescDiff::AddColumn { name, typ })
1441                .collect::<Vec<_>>()
1442        });
1443
1444    // If the source RelationDesc is empty there is nothing else to do.
1445    if num_source_columns == 0 {
1446        return add_columns_strat.boxed();
1447    }
1448
1449    let source_ = Rc::clone(&source);
1450    let drop_columns_strat = (0..num_source_columns).prop_perturb(move |num_columns, mut rng| {
1451        let mut set = BTreeSet::default();
1452        for _ in 0..num_columns {
1453            let col_idx = rng.random_range(0..num_source_columns);
1454            set.insert(source_.get_name(col_idx).clone());
1455        }
1456        set.into_iter()
1457            .map(|name| PropRelationDescDiff::DropColumn { name })
1458            .collect::<Vec<_>>()
1459    });
1460
1461    let source_ = Rc::clone(&source);
1462    let toggle_nullability_strat =
1463        (0..num_source_columns).prop_perturb(move |num_columns, mut rng| {
1464            let mut set = BTreeSet::default();
1465            for _ in 0..num_columns {
1466                let col_idx = rng.random_range(0..num_source_columns);
1467                set.insert(source_.get_name(col_idx).clone());
1468            }
1469            set.into_iter()
1470                .map(|name| PropRelationDescDiff::ToggleNullability { name })
1471                .collect::<Vec<_>>()
1472        });
1473
1474    let source_ = Rc::clone(&source);
1475    let change_type_strat = (0..num_source_columns)
1476        .prop_perturb(move |num_columns, mut rng| {
1477            let mut set = BTreeSet::default();
1478            for _ in 0..num_columns {
1479                let col_idx = rng.random_range(0..num_source_columns);
1480                set.insert(source_.get_name(col_idx).clone());
1481            }
1482            set
1483        })
1484        .prop_flat_map(|cols| {
1485            proptest::collection::vec(any::<ColumnType>(), cols.len())
1486                .prop_map(move |types| (cols.clone(), types))
1487        })
1488        .prop_map(|(cols, types)| {
1489            cols.into_iter()
1490                .zip(types)
1491                .map(|(name, typ)| PropRelationDescDiff::ChangeType { name, typ })
1492                .collect::<Vec<_>>()
1493        });
1494
1495    (
1496        add_columns_strat,
1497        drop_columns_strat,
1498        toggle_nullability_strat,
1499        change_type_strat,
1500    )
1501        .prop_map(|(adds, drops, toggles, changes)| {
1502            adds.into_iter()
1503                .chain(drops)
1504                .chain(toggles)
1505                .chain(changes)
1506                .collect::<Vec<_>>()
1507        })
1508        .prop_shuffle()
1509        .boxed()
1510}
1511
1512#[cfg(test)]
1513mod tests {
1514    use super::*;
1515    use prost::Message;
1516
1517    #[mz_ore::test]
1518    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `pipe2` on OS `linux`
1519    fn smoktest_at_version() {
1520        let desc = RelationDesc::builder()
1521            .with_column("a", ScalarType::Bool.nullable(true))
1522            .with_column("z", ScalarType::String.nullable(false))
1523            .finish();
1524
1525        let mut versioned_desc = VersionedRelationDesc {
1526            inner: desc.clone(),
1527        };
1528        versioned_desc.validate();
1529
1530        let latest = versioned_desc.at_version(RelationVersionSelector::Latest);
1531        assert_eq!(desc, latest);
1532
1533        let v0 = versioned_desc.at_version(RelationVersionSelector::specific(0));
1534        assert_eq!(desc, v0);
1535
1536        let v3 = versioned_desc.at_version(RelationVersionSelector::specific(3));
1537        assert_eq!(desc, v3);
1538
1539        let v1 = versioned_desc.add_column("b", ScalarType::Bytes.nullable(false));
1540        assert_eq!(v1, RelationVersion(1));
1541
1542        let v1 = versioned_desc.at_version(RelationVersionSelector::Specific(v1));
1543        insta::assert_json_snapshot!(v1.metadata, @r###"
1544        {
1545          "0": {
1546            "name": "a",
1547            "typ_idx": 0,
1548            "added": 0,
1549            "dropped": null
1550          },
1551          "1": {
1552            "name": "z",
1553            "typ_idx": 1,
1554            "added": 0,
1555            "dropped": null
1556          },
1557          "2": {
1558            "name": "b",
1559            "typ_idx": 2,
1560            "added": 1,
1561            "dropped": null
1562          }
1563        }
1564        "###);
1565
1566        // Check that V0 doesn't show the new column.
1567        let v0_b = versioned_desc.at_version(RelationVersionSelector::specific(0));
1568        assert!(v0.iter().eq(v0_b.iter()));
1569
1570        let v2 = versioned_desc.drop_column("z");
1571        assert_eq!(v2, RelationVersion(2));
1572
1573        let v2 = versioned_desc.at_version(RelationVersionSelector::Specific(v2));
1574        insta::assert_json_snapshot!(v2.metadata, @r###"
1575        {
1576          "0": {
1577            "name": "a",
1578            "typ_idx": 0,
1579            "added": 0,
1580            "dropped": null
1581          },
1582          "2": {
1583            "name": "b",
1584            "typ_idx": 1,
1585            "added": 1,
1586            "dropped": null
1587          }
1588        }
1589        "###);
1590
1591        // Check that V0 and V1 are still correct.
1592        let v0_c = versioned_desc.at_version(RelationVersionSelector::specific(0));
1593        assert!(v0.iter().eq(v0_c.iter()));
1594
1595        let v1_b = versioned_desc.at_version(RelationVersionSelector::specific(1));
1596        assert!(v1.iter().eq(v1_b.iter()));
1597
1598        insta::assert_json_snapshot!(versioned_desc.inner.metadata, @r###"
1599        {
1600          "0": {
1601            "name": "a",
1602            "typ_idx": 0,
1603            "added": 0,
1604            "dropped": null
1605          },
1606          "1": {
1607            "name": "z",
1608            "typ_idx": 1,
1609            "added": 0,
1610            "dropped": 2
1611          },
1612          "2": {
1613            "name": "b",
1614            "typ_idx": 2,
1615            "added": 1,
1616            "dropped": null
1617          }
1618        }
1619        "###);
1620    }
1621
1622    #[mz_ore::test]
1623    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `pipe2` on OS `linux`
1624    fn test_dropping_columns_with_keys() {
1625        let desc = RelationDesc::builder()
1626            .with_column("a", ScalarType::Bool.nullable(true))
1627            .with_column("z", ScalarType::String.nullable(false))
1628            .with_key(vec![1])
1629            .finish();
1630
1631        let mut versioned_desc = VersionedRelationDesc {
1632            inner: desc.clone(),
1633        };
1634        versioned_desc.validate();
1635
1636        let v1 = versioned_desc.drop_column("a");
1637        assert_eq!(v1, RelationVersion(1));
1638
1639        // Make sure the key index for 'z' got remapped since 'a' was dropped.
1640        let v1 = versioned_desc.at_version(RelationVersionSelector::Specific(v1));
1641        insta::assert_json_snapshot!(v1, @r###"
1642        {
1643          "typ": {
1644            "column_types": [
1645              {
1646                "scalar_type": "String",
1647                "nullable": false
1648              }
1649            ],
1650            "keys": [
1651              [
1652                0
1653              ]
1654            ]
1655          },
1656          "metadata": {
1657            "1": {
1658              "name": "z",
1659              "typ_idx": 0,
1660              "added": 0,
1661              "dropped": null
1662            }
1663          }
1664        }
1665        "###);
1666
1667        // Make sure the key index of 'z' is correct when all columns are present.
1668        let v0 = versioned_desc.at_version(RelationVersionSelector::specific(0));
1669        insta::assert_json_snapshot!(v0, @r###"
1670        {
1671          "typ": {
1672            "column_types": [
1673              {
1674                "scalar_type": "Bool",
1675                "nullable": true
1676              },
1677              {
1678                "scalar_type": "String",
1679                "nullable": false
1680              }
1681            ],
1682            "keys": [
1683              [
1684                1
1685              ]
1686            ]
1687          },
1688          "metadata": {
1689            "0": {
1690              "name": "a",
1691              "typ_idx": 0,
1692              "added": 0,
1693              "dropped": 1
1694            },
1695            "1": {
1696              "name": "z",
1697              "typ_idx": 1,
1698              "added": 0,
1699              "dropped": null
1700            }
1701          }
1702        }
1703        "###);
1704    }
1705
1706    #[mz_ore::test]
1707    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `pipe2` on OS `linux`
1708    fn roundtrip_relation_desc_without_metadata() {
1709        let typ = ProtoRelationType {
1710            column_types: vec![
1711                ScalarType::String.nullable(false).into_proto(),
1712                ScalarType::Bool.nullable(true).into_proto(),
1713            ],
1714            keys: vec![],
1715        };
1716        let proto = ProtoRelationDesc {
1717            typ: Some(typ),
1718            names: vec![
1719                ColumnName("a".into()).into_proto(),
1720                ColumnName("b".into()).into_proto(),
1721            ],
1722            metadata: vec![],
1723        };
1724        let desc: RelationDesc = proto.into_rust().unwrap();
1725
1726        insta::assert_json_snapshot!(desc, @r###"
1727        {
1728          "typ": {
1729            "column_types": [
1730              {
1731                "scalar_type": "String",
1732                "nullable": false
1733              },
1734              {
1735                "scalar_type": "Bool",
1736                "nullable": true
1737              }
1738            ],
1739            "keys": []
1740          },
1741          "metadata": {
1742            "0": {
1743              "name": "a",
1744              "typ_idx": 0,
1745              "added": 0,
1746              "dropped": null
1747            },
1748            "1": {
1749              "name": "b",
1750              "typ_idx": 1,
1751              "added": 0,
1752              "dropped": null
1753            }
1754          }
1755        }
1756        "###);
1757    }
1758
1759    #[mz_ore::test]
1760    #[should_panic(expected = "column named 'a' already exists!")]
1761    fn test_add_column_with_same_name_panics() {
1762        let desc = RelationDesc::builder()
1763            .with_column("a", ScalarType::Bool.nullable(true))
1764            .finish();
1765        let mut versioned = VersionedRelationDesc::new(desc);
1766
1767        let _ = versioned.add_column("a", ScalarType::String.nullable(false));
1768    }
1769
1770    #[mz_ore::test]
1771    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `pipe2` on OS `linux`
1772    fn test_add_column_with_same_name_prev_dropped() {
1773        let desc = RelationDesc::builder()
1774            .with_column("a", ScalarType::Bool.nullable(true))
1775            .finish();
1776        let mut versioned = VersionedRelationDesc::new(desc);
1777
1778        let v1 = versioned.drop_column("a");
1779        let v1 = versioned.at_version(RelationVersionSelector::Specific(v1));
1780        insta::assert_json_snapshot!(v1, @r###"
1781        {
1782          "typ": {
1783            "column_types": [],
1784            "keys": []
1785          },
1786          "metadata": {}
1787        }
1788        "###);
1789
1790        let v2 = versioned.add_column("a", ScalarType::String.nullable(false));
1791        let v2 = versioned.at_version(RelationVersionSelector::Specific(v2));
1792        insta::assert_json_snapshot!(v2, @r###"
1793        {
1794          "typ": {
1795            "column_types": [
1796              {
1797                "scalar_type": "String",
1798                "nullable": false
1799              }
1800            ],
1801            "keys": []
1802          },
1803          "metadata": {
1804            "1": {
1805              "name": "a",
1806              "typ_idx": 0,
1807              "added": 2,
1808              "dropped": null
1809            }
1810          }
1811        }
1812        "###);
1813    }
1814
1815    #[mz_ore::test]
1816    #[cfg_attr(miri, ignore)]
1817    fn apply_demand() {
1818        let desc = RelationDesc::builder()
1819            .with_column("a", ScalarType::String.nullable(true))
1820            .with_column("b", ScalarType::Int64.nullable(false))
1821            .with_column("c", ScalarType::Time.nullable(false))
1822            .finish();
1823        let desc = desc.apply_demand(&BTreeSet::from([0, 2]));
1824        assert_eq!(desc.arity(), 2);
1825        // TODO(parkmycar): Move validate onto RelationDesc.
1826        VersionedRelationDesc::new(desc).validate();
1827    }
1828
1829    #[mz_ore::test]
1830    #[cfg_attr(miri, ignore)]
1831    fn smoketest_column_index_stable_ident() {
1832        let idx_a = ColumnIndex(42);
1833        // Note(parkmycar): This should never change.
1834        assert_eq!(idx_a.to_stable_name(), "42");
1835    }
1836
1837    #[mz_ore::test]
1838    #[cfg_attr(miri, ignore)] // too slow
1839    fn proptest_relation_desc_roundtrips() {
1840        fn testcase(og: RelationDesc) {
1841            let bytes = og.into_proto().encode_to_vec();
1842            let proto = ProtoRelationDesc::decode(&bytes[..]).unwrap();
1843            let rnd = RelationDesc::from_proto(proto).unwrap();
1844
1845            assert_eq!(og, rnd);
1846        }
1847
1848        proptest!(|(desc in any::<RelationDesc>())| {
1849            testcase(desc);
1850        });
1851
1852        let strat = any::<RelationDesc>().prop_flat_map(|desc| {
1853            arb_relation_desc_diff(&desc).prop_map(move |diffs| (desc.clone(), diffs))
1854        });
1855
1856        proptest!(|((mut desc, diffs) in strat)| {
1857            for diff in diffs {
1858                diff.apply(&mut desc);
1859            };
1860            testcase(desc);
1861        });
1862    }
1863}