mz_repr/
relation.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::{BTreeMap, BTreeSet};
11use std::rc::Rc;
12use std::{fmt, vec};
13
14use anyhow::bail;
15use itertools::Itertools;
16use mz_lowertest::MzReflect;
17use mz_ore::cast::CastFrom;
18use mz_ore::str::StrExt;
19use mz_ore::{assert_none, assert_ok};
20use mz_persist_types::schema::SchemaId;
21use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
22use proptest::prelude::*;
23use proptest::strategy::{Strategy, Union};
24use proptest_derive::Arbitrary;
25use serde::{Deserialize, Serialize};
26
27use crate::relation_and_scalar::proto_relation_type::ProtoKey;
28pub use crate::relation_and_scalar::{
29    ProtoColumnMetadata, ProtoColumnName, ProtoColumnType, ProtoRelationDesc, ProtoRelationType,
30    ProtoRelationVersion,
31};
32use crate::{Datum, ReprScalarType, Row, SqlScalarType, arb_datum_for_column};
33
/// The type of a [`Datum`].
///
/// [`SqlColumnType`] bundles information about the scalar type of a datum (e.g.,
/// Int32 or String) with its nullability.
///
/// To construct a column type, either initialize the struct directly, or
/// use the [`SqlScalarType::nullable`] method.
#[derive(
    Arbitrary, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash, MzReflect,
)]
pub struct SqlColumnType {
    /// The underlying scalar type (e.g., Int32 or String) of this column.
    pub scalar_type: SqlScalarType,
    /// Whether this datum can be null.
    ///
    /// When this field is missing from deserialized input, serde fills it in
    /// by calling `return_true`, so a missing value means "nullable".
    #[serde(default = "return_true")]
    pub nullable: bool,
}
51
/// Serde default provider that always yields `true`.
///
/// It is referenced by the `#[serde(default = "return_true")]` attributes on
/// the `nullable` fields so that column types are nullable by default in unit
/// tests. A plain `#[serde(default)]` would produce `false` for a `bool`, and
/// serde only accepts a function path for any other default value. See
/// <https://github.com/serde-rs/serde/issues/1030>
#[inline(always)]
fn return_true() -> bool {
    true
}
61
62impl SqlColumnType {
63    pub fn union(&self, other: &Self) -> Result<Self, anyhow::Error> {
64        match (&self.scalar_type, &other.scalar_type) {
65            (scalar_type, other_scalar_type) if scalar_type == other_scalar_type => {
66                Ok(SqlColumnType {
67                    scalar_type: scalar_type.clone(),
68                    nullable: self.nullable || other.nullable,
69                })
70            }
71            (scalar_type, other_scalar_type) if scalar_type.base_eq(other_scalar_type) => {
72                Ok(SqlColumnType {
73                    scalar_type: scalar_type.without_modifiers(),
74                    nullable: self.nullable || other.nullable,
75                })
76            }
77            (
78                SqlScalarType::Record { fields, custom_id },
79                SqlScalarType::Record {
80                    fields: other_fields,
81                    custom_id: other_custom_id,
82                },
83            ) => {
84                if custom_id != other_custom_id {
85                    bail!(
86                        "Can't union types: {:?} and {:?}",
87                        self.scalar_type,
88                        other.scalar_type
89                    );
90                };
91
92                let mut union_fields = Vec::with_capacity(fields.len());
93                for ((name, typ), (other_name, other_typ)) in
94                    fields.iter().zip_eq(other_fields.iter())
95                {
96                    if name != other_name {
97                        bail!(
98                            "Can't union types: {:?} and {:?}",
99                            self.scalar_type,
100                            other.scalar_type
101                        );
102                    } else {
103                        let union_column_type = typ.union(other_typ)?;
104                        union_fields.push((name.clone(), union_column_type));
105                    };
106                }
107
108                Ok(SqlColumnType {
109                    scalar_type: SqlScalarType::Record {
110                        fields: union_fields.into(),
111                        custom_id: *custom_id,
112                    },
113                    nullable: self.nullable || other.nullable,
114                })
115            }
116            _ => bail!(
117                "Can't union types: {:?} and {:?}",
118                self.scalar_type,
119                other.scalar_type
120            ),
121        }
122    }
123
124    /// Consumes this `SqlColumnType` and returns a new `SqlColumnType` with its
125    /// nullability set to the specified boolean.
126    pub fn nullable(mut self, nullable: bool) -> Self {
127        self.nullable = nullable;
128        self
129    }
130}
131
132impl RustType<ProtoColumnType> for SqlColumnType {
133    fn into_proto(&self) -> ProtoColumnType {
134        ProtoColumnType {
135            nullable: self.nullable,
136            scalar_type: Some(self.scalar_type.into_proto()),
137        }
138    }
139
140    fn from_proto(proto: ProtoColumnType) -> Result<Self, TryFromProtoError> {
141        Ok(SqlColumnType {
142            nullable: proto.nullable,
143            scalar_type: proto
144                .scalar_type
145                .into_rust_if_some("ProtoColumnType::scalar_type")?,
146        })
147    }
148}
149
150impl fmt::Display for SqlColumnType {
151    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
152        let nullable = if self.nullable { "Null" } else { "NotNull" };
153        f.write_fmt(format_args!("{:?}:{}", self.scalar_type, nullable))
154    }
155}
156
/// The type of a relation.
#[derive(
    Arbitrary, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash, MzReflect,
)]
pub struct SqlRelationType {
    /// The type for each column, in order.
    pub column_types: Vec<SqlColumnType>,
    /// Sets of indices that are "keys" for the collection.
    ///
    /// Each element in this list is a set of column indices, each with the
    /// property that the collection contains at most one record with each
    /// distinct set of values for each column. Alternately, for a specific set
    /// of values assigned to these columns there is at most one record.
    ///
    /// A collection can contain multiple sets of keys, although it is common to
    /// have either zero or one sets of key indices.
    ///
    /// When this field is missing from deserialized input it defaults to an
    /// empty list.
    #[serde(default)]
    pub keys: Vec<Vec<usize>>,
}
176
177impl SqlRelationType {
178    /// Constructs a `SqlRelationType` representing the relation with no columns and
179    /// no keys.
180    pub fn empty() -> Self {
181        SqlRelationType::new(vec![])
182    }
183
184    /// Constructs a new `SqlRelationType` from specified column types.
185    ///
186    /// The `SqlRelationType` will have no keys.
187    pub fn new(column_types: Vec<SqlColumnType>) -> Self {
188        SqlRelationType {
189            column_types,
190            keys: Vec::new(),
191        }
192    }
193
194    /// Adds a new key for the relation.
195    pub fn with_key(mut self, mut indices: Vec<usize>) -> Self {
196        indices.sort_unstable();
197        if !self.keys.contains(&indices) {
198            self.keys.push(indices);
199        }
200        self
201    }
202
203    pub fn with_keys(mut self, keys: Vec<Vec<usize>>) -> Self {
204        for key in keys {
205            self = self.with_key(key)
206        }
207        self
208    }
209
210    /// Computes the number of columns in the relation.
211    pub fn arity(&self) -> usize {
212        self.column_types.len()
213    }
214
215    /// Gets the index of the columns used when creating a default index.
216    pub fn default_key(&self) -> Vec<usize> {
217        if let Some(key) = self.keys.first() {
218            if key.is_empty() {
219                (0..self.column_types.len()).collect()
220            } else {
221                key.clone()
222            }
223        } else {
224            (0..self.column_types.len()).collect()
225        }
226    }
227
228    /// Returns all the [`SqlColumnType`]s, in order, for this relation.
229    pub fn columns(&self) -> &[SqlColumnType] {
230        &self.column_types
231    }
232}
233
234impl RustType<ProtoRelationType> for SqlRelationType {
235    fn into_proto(&self) -> ProtoRelationType {
236        ProtoRelationType {
237            column_types: self.column_types.into_proto(),
238            keys: self.keys.into_proto(),
239        }
240    }
241
242    fn from_proto(proto: ProtoRelationType) -> Result<Self, TryFromProtoError> {
243        Ok(SqlRelationType {
244            column_types: proto.column_types.into_rust()?,
245            keys: proto.keys.into_rust()?,
246        })
247    }
248}
249
250impl RustType<ProtoKey> for Vec<usize> {
251    fn into_proto(&self) -> ProtoKey {
252        ProtoKey {
253            keys: self.into_proto(),
254        }
255    }
256
257    fn from_proto(proto: ProtoKey) -> Result<Self, TryFromProtoError> {
258        proto.keys.into_rust()
259    }
260}
261
/// A column type built on a [`ReprScalarType`].
///
/// Like [`SqlColumnType`], it pairs a scalar type with a nullability flag, but
/// the scalar type is the representation scalar type.
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash, MzReflect)]
pub struct ReprColumnType {
    /// The underlying representation scalar type (e.g., Int32 or String) of this column.
    pub scalar_type: ReprScalarType,
    /// Whether this datum can be null.
    ///
    /// When this field is missing from deserialized input, serde fills it in
    /// by calling `return_true`.
    #[serde(default = "return_true")]
    pub nullable: bool,
}
270
271impl From<&SqlColumnType> for ReprColumnType {
272    fn from(sql_column_type: &SqlColumnType) -> Self {
273        let scalar_type = &sql_column_type.scalar_type;
274        let scalar_type = scalar_type.into();
275        let nullable = sql_column_type.nullable;
276
277        ReprColumnType {
278            scalar_type,
279            nullable,
280        }
281    }
282}
283
284impl SqlColumnType {
285    /// Lossily translates a [`ReprColumnType`] back to a [`SqlColumnType`].
286    ///
287    /// See [`SqlScalarType::from_repr`] for an example of lossiness.
288    pub fn from_repr(repr: &ReprColumnType) -> Self {
289        let scalar_type = &repr.scalar_type;
290        let scalar_type = SqlScalarType::from_repr(scalar_type);
291        let nullable = repr.nullable;
292
293        SqlColumnType {
294            scalar_type,
295            nullable,
296        }
297    }
298}
299
/// The name of a column in a [`RelationDesc`].
///
/// The name is stored as a boxed string slice.
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash, MzReflect)]
pub struct ColumnName(Box<str>);
303
304impl ColumnName {
305    /// Returns this column name as a `str`.
306    #[inline(always)]
307    pub fn as_str(&self) -> &str {
308        &*self
309    }
310
311    /// Returns this column name as a `&mut Box<str>`.
312    pub fn as_mut_boxed_str(&mut self) -> &mut Box<str> {
313        &mut self.0
314    }
315
316    /// Returns if this [`ColumnName`] is similar to the provided one.
317    pub fn is_similar(&self, other: &ColumnName) -> bool {
318        const SIMILARITY_THRESHOLD: f64 = 0.6;
319
320        let a_lowercase = self.to_lowercase();
321        let b_lowercase = other.to_lowercase();
322
323        strsim::normalized_levenshtein(&a_lowercase, &b_lowercase) >= SIMILARITY_THRESHOLD
324    }
325}
326
327impl std::ops::Deref for ColumnName {
328    type Target = str;
329
330    #[inline(always)]
331    fn deref(&self) -> &Self::Target {
332        &self.0
333    }
334}
335
336impl fmt::Display for ColumnName {
337    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
338        f.write_str(&self.0)
339    }
340}
341
342impl From<String> for ColumnName {
343    fn from(s: String) -> ColumnName {
344        ColumnName(s.into())
345    }
346}
347
348impl From<&str> for ColumnName {
349    fn from(s: &str) -> ColumnName {
350        ColumnName(s.into())
351    }
352}
353
354impl From<&ColumnName> for ColumnName {
355    fn from(n: &ColumnName) -> ColumnName {
356        n.clone()
357    }
358}
359
360impl RustType<ProtoColumnName> for ColumnName {
361    fn into_proto(&self) -> ProtoColumnName {
362        ProtoColumnName {
363            value: Some(self.0.to_string()),
364        }
365    }
366
367    fn from_proto(proto: ProtoColumnName) -> Result<Self, TryFromProtoError> {
368        Ok(ColumnName(
369            proto
370                .value
371                .ok_or_else(|| TryFromProtoError::missing_field("ProtoColumnName::value"))?
372                .into(),
373        ))
374    }
375}
376
377impl From<ColumnName> for mz_sql_parser::ast::Ident {
378    fn from(value: ColumnName) -> Self {
379        // Note: ColumnNames are known to be less than the max length of an Ident (I think?).
380        mz_sql_parser::ast::Ident::new_unchecked(value.0)
381    }
382}
383
384impl proptest::arbitrary::Arbitrary for ColumnName {
385    type Parameters = ();
386    type Strategy = BoxedStrategy<ColumnName>;
387
388    fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy {
389        // Long column names are generally uninteresting, and can greatly
390        // increase the runtime for a test case, so bound the max length.
391        let mut weights = vec![(50, Just(1..8)), (20, Just(8..16))];
392        if std::env::var("PROPTEST_LARGE_DATA").is_ok() {
393            weights.extend([
394                (5, Just(16..128)),
395                (1, Just(128..1024)),
396                (1, Just(1024..4096)),
397            ]);
398        }
399        let name_length = Union::new_weighted(weights);
400
401        // Non-ASCII characters are also generally uninteresting and can make
402        // debugging harder.
403        let char_strat = Rc::new(Union::new_weighted(vec![
404            (50, proptest::char::range('A', 'z').boxed()),
405            (1, any::<char>().boxed()),
406        ]));
407
408        name_length
409            .prop_flat_map(move |length| proptest::collection::vec(Rc::clone(&char_strat), length))
410            .prop_map(|chars| ColumnName(chars.into_iter().collect::<Box<str>>()))
411            .no_shrink()
412            .boxed()
413    }
414}
415
/// Default name of a column (when no other information is known).
///
/// The value is the literal string `?column?`.
pub const UNKNOWN_COLUMN_NAME: &str = "?column?";
418
/// Stable index of a column in a [`RelationDesc`].
///
/// This is a thin wrapper around a `usize`.
#[derive(
    Clone, Copy, Debug, Eq, PartialEq, PartialOrd, Ord, Serialize, Deserialize, Hash, MzReflect,
)]
pub struct ColumnIndex(usize);

// Compile-time guard: fails the build if `ColumnIndex` ever implements
// `Arbitrary`.
// NOTE(review): presumably because arbitrary indices would not refer to real
// columns of a `RelationDesc` -- confirm the intent.
static_assertions::assert_not_impl_all!(ColumnIndex: Arbitrary);
426
427impl ColumnIndex {
428    /// Returns a stable identifier for this [`ColumnIndex`].
429    pub fn to_stable_name(&self) -> String {
430        self.0.to_string()
431    }
432
433    pub fn to_raw(&self) -> usize {
434        self.0
435    }
436
437    pub fn from_raw(val: usize) -> Self {
438        ColumnIndex(val)
439    }
440}
441
/// The version a given column was added at.
///
/// This is a thin wrapper around a `u64`.
#[derive(
    Clone,
    Copy,
    Debug,
    Eq,
    PartialEq,
    PartialOrd,
    Ord,
    Serialize,
    Deserialize,
    Hash,
    MzReflect,
    Arbitrary,
)]
pub struct RelationVersion(u64);
458
459impl RelationVersion {
460    /// Returns the "root" or "initial" version of a [`RelationDesc`].
461    pub fn root() -> Self {
462        RelationVersion(0)
463    }
464
465    /// Returns an instance of [`RelationVersion`] which is "one" higher than `self`.
466    pub fn bump(&self) -> Self {
467        let next_version = self
468            .0
469            .checked_add(1)
470            .expect("added more than u64::MAX columns?");
471        RelationVersion(next_version)
472    }
473
474    /// Consume a [`RelationVersion`] returning the raw value.
475    ///
476    /// Should __only__ be used for serialization.
477    pub fn into_raw(self) -> u64 {
478        self.0
479    }
480
481    /// Create a [`RelationVersion`] from a raw value.
482    ///
483    /// Should __only__ be used for serialization.
484    pub fn from_raw(val: u64) -> RelationVersion {
485        RelationVersion(val)
486    }
487}
488
489impl From<RelationVersion> for SchemaId {
490    fn from(value: RelationVersion) -> Self {
491        SchemaId(usize::cast_from(value.0))
492    }
493}
494
495impl From<mz_sql_parser::ast::Version> for RelationVersion {
496    fn from(value: mz_sql_parser::ast::Version) -> Self {
497        RelationVersion(value.into_inner())
498    }
499}
500
501impl fmt::Display for RelationVersion {
502    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
503        write!(f, "v{}", self.0)
504    }
505}
506
507impl From<RelationVersion> for mz_sql_parser::ast::Version {
508    fn from(value: RelationVersion) -> Self {
509        mz_sql_parser::ast::Version::new(value.0)
510    }
511}
512
513impl RustType<ProtoRelationVersion> for RelationVersion {
514    fn into_proto(&self) -> ProtoRelationVersion {
515        ProtoRelationVersion { value: self.0 }
516    }
517
518    fn from_proto(proto: ProtoRelationVersion) -> Result<Self, TryFromProtoError> {
519        Ok(RelationVersion(proto.value))
520    }
521}
522
/// Metadata (other than type) for a column in a [`RelationDesc`].
///
/// A [`RelationDesc`] keeps one of these per column, keyed by [`ColumnIndex`].
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash, MzReflect)]
struct ColumnMetadata {
    /// Name of the column.
    name: ColumnName,
    /// Index into a [`SqlRelationType`] for this column.
    typ_idx: usize,
    /// Version this column was added at.
    added: RelationVersion,
    /// Version this column was dropped at, or `None` if no drop is recorded.
    dropped: Option<RelationVersion>,
}
535
/// A description of the shape of a relation.
///
/// It bundles a [`SqlRelationType`] with `ColumnMetadata` for each column in
/// the relation.
///
/// # Examples
///
/// A `RelationDesc` is typically constructed via its builder API:
///
/// ```
/// use mz_repr::{SqlColumnType, RelationDesc, SqlScalarType};
///
/// let desc = RelationDesc::builder()
///     .with_column("id", SqlScalarType::Int64.nullable(false))
///     .with_column("price", SqlScalarType::Float64.nullable(true))
///     .finish();
/// ```
///
/// In more complicated cases, like when constructing a `RelationDesc` in
/// response to user input, it may be more convenient to construct a relation
/// type first, and imbue it with column names to form a `RelationDesc` later:
///
/// ```
/// use mz_repr::RelationDesc;
///
/// # fn plan_query(_: &str) -> mz_repr::SqlRelationType { mz_repr::SqlRelationType::new(vec![]) }
/// let relation_type = plan_query("SELECT * FROM table");
/// let names = (0..relation_type.arity()).map(|i| match i {
///     0 => "first",
///     1 => "second",
///     _ => "unknown",
/// });
/// let desc = RelationDesc::new(relation_type, names);
/// ```
///
/// Next to the [`SqlRelationType`] we maintain a map of `ColumnIndex` to
/// `ColumnMetadata`, where [`ColumnIndex`] is a stable identifier for a
/// column throughout the lifetime of the relation. This allows a
/// [`RelationDesc`] to represent a projection over a version of itself.
///
/// ```
/// use std::collections::BTreeSet;
/// use mz_repr::{ColumnIndex, RelationDesc, SqlScalarType};
///
/// let desc = RelationDesc::builder()
///     .with_column("name", SqlScalarType::String.nullable(false))
///     .with_column("email", SqlScalarType::String.nullable(false))
///     .finish();
///
/// // Demand only the second column, which projects away the first.
/// let demands = BTreeSet::from([1]);
/// let proj = desc.apply_demand(&demands);
///
/// // We projected away the first column.
/// assert!(!proj.contains_index(&ColumnIndex::from_raw(0)));
/// // But retained the second.
/// assert!(proj.contains_index(&ColumnIndex::from_raw(1)));
///
/// // The underlying `SqlRelationType` also contains a single column.
/// assert_eq!(proj.typ().arity(), 1);
/// ```
///
/// To maintain this stable mapping and track the lifetime of a column (e.g.
/// when adding or dropping a column) we use `ColumnMetadata`. It maintains
/// the index in [`SqlRelationType`] that corresponds to a given column, and the
/// version at which this column was added or dropped.
///
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash, MzReflect)]
pub struct RelationDesc {
    /// Column types and keys of the relation.
    typ: SqlRelationType,
    /// Per-column metadata, keyed by the stable [`ColumnIndex`] of the column.
    metadata: BTreeMap<ColumnIndex, ColumnMetadata>,
}
608
609impl RustType<ProtoRelationDesc> for RelationDesc {
610    fn into_proto(&self) -> ProtoRelationDesc {
611        let (names, metadata): (Vec<_>, Vec<_>) = self
612            .metadata
613            .values()
614            .map(|meta| {
615                let metadata = ProtoColumnMetadata {
616                    added: Some(meta.added.into_proto()),
617                    dropped: meta.dropped.map(|v| v.into_proto()),
618                };
619                (meta.name.into_proto(), metadata)
620            })
621            .unzip();
622
623        // `metadata` Migration Logic: We wrote some `ProtoRelationDesc`s into Persist before the
624        // metadata field was added. To make sure our serialization roundtrips the same as before
625        // we added the field, we omit `metadata` if all of the values are equal to the default.
626        //
627        // Note: This logic needs to exist approximately forever.
628        let is_all_default_metadata = metadata.iter().all(|meta| {
629            meta.added == Some(RelationVersion::root().into_proto()) && meta.dropped == None
630        });
631        let metadata = if is_all_default_metadata {
632            Vec::new()
633        } else {
634            metadata
635        };
636
637        ProtoRelationDesc {
638            typ: Some(self.typ.into_proto()),
639            names,
640            metadata,
641        }
642    }
643
644    fn from_proto(proto: ProtoRelationDesc) -> Result<Self, TryFromProtoError> {
645        // `metadata` Migration Logic: We wrote some `ProtoRelationDesc`s into Persist before the
646        // metadata field was added. If the field doesn't exist we fill it in with default values,
647        // and when converting into_proto we omit these fields so the serialized bytes roundtrip.
648        //
649        // Note: This logic needs to exist approximately forever.
650        let proto_metadata: Box<dyn Iterator<Item = _>> = if proto.metadata.is_empty() {
651            let val = ProtoColumnMetadata {
652                added: Some(RelationVersion::root().into_proto()),
653                dropped: None,
654            };
655            Box::new(itertools::repeat_n(val, proto.names.len()))
656        } else {
657            Box::new(proto.metadata.into_iter())
658        };
659
660        let metadata = proto
661            .names
662            .into_iter()
663            .zip_eq(proto_metadata)
664            .enumerate()
665            .map(|(idx, (name, metadata))| {
666                let meta = ColumnMetadata {
667                    name: name.into_rust()?,
668                    typ_idx: idx,
669                    added: metadata.added.into_rust_if_some("ColumnMetadata::added")?,
670                    dropped: metadata.dropped.into_rust()?,
671                };
672                Ok::<_, TryFromProtoError>((ColumnIndex(idx), meta))
673            })
674            .collect::<Result<_, _>>()?;
675
676        Ok(RelationDesc {
677            typ: proto.typ.into_rust_if_some("ProtoRelationDesc::typ")?,
678            metadata,
679        })
680    }
681}
682
683impl RelationDesc {
684    /// Returns a [`RelationDescBuilder`] that can be used to construct a [`RelationDesc`].
685    pub fn builder() -> RelationDescBuilder {
686        RelationDescBuilder::default()
687    }
688
689    /// Constructs a new `RelationDesc` that represents the empty relation
690    /// with no columns and no keys.
691    pub fn empty() -> Self {
692        RelationDesc {
693            typ: SqlRelationType::empty(),
694            metadata: BTreeMap::default(),
695        }
696    }
697
698    /// Check if the `RelationDesc` is empty.
699    pub fn is_empty(&self) -> bool {
700        self == &Self::empty()
701    }
702
703    /// Returns the number of columns in this [`RelationDesc`].
704    pub fn len(&self) -> usize {
705        self.typ().column_types.len()
706    }
707
708    /// Constructs a new `RelationDesc` from a `SqlRelationType` and an iterator
709    /// over column names.
710    ///
711    /// # Panics
712    ///
713    /// Panics if the arity of the `SqlRelationType` is not equal to the number of
714    /// items in `names`.
715    pub fn new<I, N>(typ: SqlRelationType, names: I) -> Self
716    where
717        I: IntoIterator<Item = N>,
718        N: Into<ColumnName>,
719    {
720        let metadata: BTreeMap<_, _> = names
721            .into_iter()
722            .enumerate()
723            .map(|(idx, name)| {
724                let col_idx = ColumnIndex(idx);
725                let metadata = ColumnMetadata {
726                    name: name.into(),
727                    typ_idx: idx,
728                    added: RelationVersion::root(),
729                    dropped: None,
730                };
731                (col_idx, metadata)
732            })
733            .collect();
734
735        // TODO(parkmycar): Add better validation here.
736        assert_eq!(typ.column_types.len(), metadata.len());
737
738        RelationDesc { typ, metadata }
739    }
740
741    pub fn from_names_and_types<I, T, N>(iter: I) -> Self
742    where
743        I: IntoIterator<Item = (N, T)>,
744        T: Into<SqlColumnType>,
745        N: Into<ColumnName>,
746    {
747        let (names, types): (Vec<_>, Vec<_>) = iter.into_iter().unzip();
748        let types = types.into_iter().map(Into::into).collect();
749        let typ = SqlRelationType::new(types);
750        Self::new(typ, names)
751    }
752
753    /// Concatenates a `RelationDesc` onto the end of this `RelationDesc`.
754    ///
755    /// # Panics
756    ///
757    /// Panics if either `self` or `other` have columns that were added at a
758    /// [`RelationVersion`] other than [`RelationVersion::root`] or if any
759    /// columns were dropped.
760    ///
761    /// TODO(parkmycar): Move this method to [`RelationDescBuilder`].
762    pub fn concat(mut self, other: Self) -> Self {
763        let self_len = self.typ.column_types.len();
764
765        for (typ, (_col_idx, meta)) in other
766            .typ
767            .column_types
768            .into_iter()
769            .zip_eq(other.metadata.into_iter())
770        {
771            assert_eq!(meta.added, RelationVersion::root());
772            assert_none!(meta.dropped);
773
774            let new_idx = self.typ.columns().len();
775            let new_meta = ColumnMetadata {
776                name: meta.name,
777                typ_idx: new_idx,
778                added: RelationVersion::root(),
779                dropped: None,
780            };
781
782            self.typ.column_types.push(typ);
783            let prev = self.metadata.insert(ColumnIndex(new_idx), new_meta);
784
785            assert_eq!(self.metadata.len(), self.typ.columns().len());
786            assert_none!(prev);
787        }
788
789        for k in other.typ.keys {
790            let k = k.into_iter().map(|idx| idx + self_len).collect();
791            self = self.with_key(k);
792        }
793        self
794    }
795
796    /// Adds a new key for the relation.
797    pub fn with_key(mut self, indices: Vec<usize>) -> Self {
798        self.typ = self.typ.with_key(indices);
799        self
800    }
801
802    /// Drops all existing keys.
803    pub fn without_keys(mut self) -> Self {
804        self.typ.keys.clear();
805        self
806    }
807
808    /// Builds a new relation description with the column names replaced with
809    /// new names.
810    ///
811    /// # Panics
812    ///
813    /// Panics if the arity of the relation type does not match the number of
814    /// items in `names`.
815    pub fn with_names<I, N>(self, names: I) -> Self
816    where
817        I: IntoIterator<Item = N>,
818        N: Into<ColumnName>,
819    {
820        Self::new(self.typ, names)
821    }
822
823    /// Computes the number of columns in the relation.
824    pub fn arity(&self) -> usize {
825        self.typ.arity()
826    }
827
828    /// Returns the relation type underlying this relation description.
829    pub fn typ(&self) -> &SqlRelationType {
830        &self.typ
831    }
832
833    /// Returns an iterator over the columns in this relation.
834    pub fn iter(&self) -> impl Iterator<Item = (&ColumnName, &SqlColumnType)> {
835        self.metadata.values().map(|meta| {
836            let typ = &self.typ.columns()[meta.typ_idx];
837            (&meta.name, typ)
838        })
839    }
840
841    /// Returns an iterator over the types of the columns in this relation.
842    pub fn iter_types(&self) -> impl Iterator<Item = &SqlColumnType> {
843        self.typ.column_types.iter()
844    }
845
846    /// Returns an iterator over the names of the columns in this relation.
847    pub fn iter_names(&self) -> impl Iterator<Item = &ColumnName> {
848        self.metadata.values().map(|meta| &meta.name)
849    }
850
851    /// Returns an iterator over the columns in this relation, with all their metadata.
852    pub fn iter_all(&self) -> impl Iterator<Item = (&ColumnIndex, &ColumnName, &SqlColumnType)> {
853        self.metadata.iter().map(|(col_idx, metadata)| {
854            let col_typ = &self.typ.columns()[metadata.typ_idx];
855            (col_idx, &metadata.name, col_typ)
856        })
857    }
858
859    /// Returns an iterator over the names of the columns in this relation that are "similar" to
860    /// the provided `name`.
861    pub fn iter_similar_names<'a>(
862        &'a self,
863        name: &'a ColumnName,
864    ) -> impl Iterator<Item = &'a ColumnName> {
865        self.iter_names().filter(|n| n.is_similar(name))
866    }
867
868    /// Returns whether this [`RelationDesc`] contains a column at the specified index.
869    pub fn contains_index(&self, idx: &ColumnIndex) -> bool {
870        self.metadata.contains_key(idx)
871    }
872
873    /// Finds a column by name.
874    ///
875    /// Returns the index and type of the column named `name`. If no column with
876    /// the specified name exists, returns `None`. If multiple columns have the
877    /// specified name, the leftmost column is returned.
878    pub fn get_by_name(&self, name: &ColumnName) -> Option<(usize, &SqlColumnType)> {
879        self.iter_names()
880            .position(|n| n == name)
881            .map(|i| (i, &self.typ.column_types[i]))
882    }
883
884    /// Gets the name of the `i`th column.
885    ///
886    /// # Panics
887    ///
888    /// Panics if `i` is not a valid column index.
889    ///
890    /// TODO(parkmycar): Migrate all uses of this to [`RelationDesc::get_name_idx`].
891    pub fn get_name(&self, i: usize) -> &ColumnName {
892        // TODO(parkmycar): Refactor this to use `ColumnIndex`.
893        self.get_name_idx(&ColumnIndex(i))
894    }
895
896    /// Gets the name of the column at `idx`.
897    ///
898    /// # Panics
899    ///
900    /// Panics if no column exists at `idx`.
901    pub fn get_name_idx(&self, idx: &ColumnIndex) -> &ColumnName {
902        &self.metadata.get(idx).expect("should exist").name
903    }
904
905    /// Mutably gets the name of the `i`th column.
906    ///
907    /// # Panics
908    ///
909    /// Panics if `i` is not a valid column index.
910    pub fn get_name_mut(&mut self, i: usize) -> &mut ColumnName {
911        // TODO(parkmycar): Refactor this to use `ColumnIndex`.
912        &mut self
913            .metadata
914            .get_mut(&ColumnIndex(i))
915            .expect("should exist")
916            .name
917    }
918
919    /// Gets the [`SqlColumnType`] of the column at `idx`.
920    ///
921    /// # Panics
922    ///
923    /// Panics if no column exists at `idx`.
924    pub fn get_type(&self, idx: &ColumnIndex) -> &SqlColumnType {
925        let typ_idx = self.metadata.get(idx).expect("should exist").typ_idx;
926        &self.typ.column_types[typ_idx]
927    }
928
929    /// Gets the name of the `i`th column if that column name is unambiguous.
930    ///
931    /// If at least one other column has the same name as the `i`th column,
932    /// returns `None`. If the `i`th column has no name, returns `None`.
933    ///
934    /// # Panics
935    ///
936    /// Panics if `i` is not a valid column index.
937    pub fn get_unambiguous_name(&self, i: usize) -> Option<&ColumnName> {
938        let name = self.get_name(i);
939        if self.iter_names().filter(|n| *n == name).count() == 1 {
940            Some(name)
941        } else {
942            None
943        }
944    }
945
946    /// Verifies that `d` meets all of the constraints for the `i`th column of `self`.
947    ///
948    /// n.b. The only constraint MZ currently supports in NOT NULL, but this
949    /// structure will be simple to extend.
950    pub fn constraints_met(&self, i: usize, d: &Datum) -> Result<(), NotNullViolation> {
951        let name = self.get_name(i);
952        let typ = &self.typ.column_types[i];
953        if d == &Datum::Null && !typ.nullable {
954            Err(NotNullViolation(name.clone()))
955        } else {
956            Ok(())
957        }
958    }
959
960    /// Creates a new [`RelationDesc`] retaining only the columns specified in `demands`.
961    pub fn apply_demand(&self, demands: &BTreeSet<usize>) -> RelationDesc {
962        let mut new_desc = self.clone();
963
964        // Update ColumnMetadata.
965        let mut removed = 0;
966        new_desc.metadata.retain(|idx, metadata| {
967            let retain = demands.contains(&idx.0);
968            if !retain {
969                removed += 1;
970            } else {
971                metadata.typ_idx -= removed;
972            }
973            retain
974        });
975
976        // Update SqlColumnType.
977        let mut idx = 0;
978        new_desc.typ.column_types.retain(|_| {
979            let keep = demands.contains(&idx);
980            idx += 1;
981            keep
982        });
983
984        new_desc
985    }
986}
987
988impl Arbitrary for RelationDesc {
989    type Parameters = ();
990    type Strategy = BoxedStrategy<RelationDesc>;
991
992    fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy {
993        let mut weights = vec![(100, Just(0..4)), (50, Just(4..8)), (25, Just(8..16))];
994        if std::env::var("PROPTEST_LARGE_DATA").is_ok() {
995            weights.extend([
996                (12, Just(16..32)),
997                (6, Just(32..64)),
998                (3, Just(64..128)),
999                (1, Just(128..256)),
1000            ]);
1001        }
1002        let num_columns = Union::new_weighted(weights);
1003
1004        num_columns.prop_flat_map(arb_relation_desc).boxed()
1005    }
1006}
1007
1008/// Returns a [`Strategy`] that generates an arbitrary [`RelationDesc`] with a number columns
1009/// within the range provided.
1010pub fn arb_relation_desc(num_cols: std::ops::Range<usize>) -> impl Strategy<Value = RelationDesc> {
1011    proptest::collection::btree_map(any::<ColumnName>(), any::<SqlColumnType>(), num_cols)
1012        .prop_map(RelationDesc::from_names_and_types)
1013}
1014
1015/// Returns a [`Strategy`] that generates a projection of the provided [`RelationDesc`].
1016pub fn arb_relation_desc_projection(desc: RelationDesc) -> impl Strategy<Value = RelationDesc> {
1017    let mask: Vec<_> = (0..desc.len()).map(|_| any::<bool>()).collect();
1018    mask.prop_map(move |mask| {
1019        let demands: BTreeSet<_> = mask
1020            .into_iter()
1021            .enumerate()
1022            .filter_map(|(idx, keep)| keep.then_some(idx))
1023            .collect();
1024        desc.apply_demand(&demands)
1025    })
1026}
1027
1028impl IntoIterator for RelationDesc {
1029    type Item = (ColumnName, SqlColumnType);
1030    type IntoIter = Box<dyn Iterator<Item = (ColumnName, SqlColumnType)>>;
1031
1032    fn into_iter(self) -> Self::IntoIter {
1033        let iter = self
1034            .metadata
1035            .into_values()
1036            .zip_eq(self.typ.column_types)
1037            .map(|(meta, typ)| (meta.name, typ));
1038        Box::new(iter)
1039    }
1040}
1041
1042/// Returns a [`Strategy`] that yields arbitrary [`Row`]s for the provided [`RelationDesc`].
1043pub fn arb_row_for_relation(desc: &RelationDesc) -> impl Strategy<Value = Row> + use<> {
1044    let datums: Vec<_> = desc
1045        .typ()
1046        .columns()
1047        .iter()
1048        .cloned()
1049        .map(arb_datum_for_column)
1050        .collect();
1051    datums.prop_map(|x| Row::pack(x.iter().map(Datum::from)))
1052}
1053
1054/// Expression violated not-null constraint on named column
1055#[derive(Debug, PartialEq, Eq)]
1056pub struct NotNullViolation(pub ColumnName);
1057
1058impl fmt::Display for NotNullViolation {
1059    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1060        write!(
1061            f,
1062            "null value in column {} violates not-null constraint",
1063            self.0.quoted()
1064        )
1065    }
1066}
1067
1068/// A builder for a [`RelationDesc`].
1069#[derive(Clone, Default, Debug, PartialEq, Eq)]
1070pub struct RelationDescBuilder {
1071    /// Columns of the relation.
1072    columns: Vec<(ColumnName, SqlColumnType)>,
1073    /// Sets of indices that are "keys" for the collection.
1074    keys: Vec<Vec<usize>>,
1075}
1076
1077impl RelationDescBuilder {
1078    /// Appends a column with the specified name and type.
1079    pub fn with_column<N: Into<ColumnName>>(
1080        mut self,
1081        name: N,
1082        ty: SqlColumnType,
1083    ) -> RelationDescBuilder {
1084        let name = name.into();
1085        self.columns.push((name, ty));
1086        self
1087    }
1088
1089    /// Appends the provided columns to the builder.
1090    pub fn with_columns<I, T, N>(mut self, iter: I) -> Self
1091    where
1092        I: IntoIterator<Item = (N, T)>,
1093        T: Into<SqlColumnType>,
1094        N: Into<ColumnName>,
1095    {
1096        self.columns
1097            .extend(iter.into_iter().map(|(name, ty)| (name.into(), ty.into())));
1098        self
1099    }
1100
1101    /// Adds a new key for the relation.
1102    pub fn with_key(mut self, mut indices: Vec<usize>) -> RelationDescBuilder {
1103        indices.sort_unstable();
1104        if !self.keys.contains(&indices) {
1105            self.keys.push(indices);
1106        }
1107        self
1108    }
1109
1110    /// Removes all previously inserted keys.
1111    pub fn without_keys(mut self) -> RelationDescBuilder {
1112        self.keys.clear();
1113        assert_eq!(self.keys.len(), 0);
1114        self
1115    }
1116
1117    /// Concatenates a [`RelationDescBuilder`] onto the end of this [`RelationDescBuilder`].
1118    pub fn concat(mut self, other: Self) -> Self {
1119        let self_len = self.columns.len();
1120
1121        self.columns.extend(other.columns);
1122        for k in other.keys {
1123            let k = k.into_iter().map(|idx| idx + self_len).collect();
1124            self = self.with_key(k);
1125        }
1126
1127        self
1128    }
1129
1130    /// Finish the builder, returning a [`RelationDesc`].
1131    pub fn finish(self) -> RelationDesc {
1132        let mut desc = RelationDesc::from_names_and_types(self.columns);
1133        desc.typ = desc.typ.with_keys(self.keys);
1134        desc
1135    }
1136}
1137
1138/// Describes a [`RelationDesc`] at a specific version of a [`VersionedRelationDesc`].
1139#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord, Serialize)]
1140pub enum RelationVersionSelector {
1141    Specific(RelationVersion),
1142    Latest,
1143}
1144
1145impl RelationVersionSelector {
1146    pub fn specific(version: u64) -> Self {
1147        RelationVersionSelector::Specific(RelationVersion(version))
1148    }
1149}
1150
1151/// A wrapper around [`RelationDesc`] that provides an interface for adding
1152/// columns and generating new versions.
1153///
1154/// TODO(parkmycar): Using an immutable data structure for RelationDesc would
1155/// be great.
1156#[derive(Debug, Clone, Serialize)]
1157pub struct VersionedRelationDesc {
1158    inner: RelationDesc,
1159}
1160
1161impl VersionedRelationDesc {
1162    pub fn new(inner: RelationDesc) -> Self {
1163        VersionedRelationDesc { inner }
1164    }
1165
1166    /// Adds a new column to this [`RelationDesc`], creating a new version of the [`RelationDesc`].
1167    ///
1168    /// # Panics
1169    ///
1170    /// * Panics if a column with `name` already exists that hasn't been dropped.
1171    ///
1172    /// Note: For building a [`RelationDesc`] see [`RelationDescBuilder::with_column`].
1173    #[must_use]
1174    pub fn add_column<N, T>(&mut self, name: N, typ: T) -> RelationVersion
1175    where
1176        N: Into<ColumnName>,
1177        T: Into<SqlColumnType>,
1178    {
1179        let latest_version = self.latest_version();
1180        let new_version = latest_version.bump();
1181
1182        let name = name.into();
1183        let existing = self
1184            .inner
1185            .metadata
1186            .iter()
1187            .find(|(_, meta)| meta.name == name && meta.dropped.is_none());
1188        if let Some(existing) = existing {
1189            panic!("column named '{name}' already exists! {existing:?}");
1190        }
1191
1192        let next_idx = self.inner.metadata.len();
1193        let col_meta = ColumnMetadata {
1194            name,
1195            typ_idx: next_idx,
1196            added: new_version,
1197            dropped: None,
1198        };
1199
1200        self.inner.typ.column_types.push(typ.into());
1201        let prev = self.inner.metadata.insert(ColumnIndex(next_idx), col_meta);
1202
1203        assert_none!(prev, "column index overlap!");
1204        self.validate();
1205
1206        new_version
1207    }
1208
1209    /// Drops the column `name` from this [`RelationDesc`]. If there are multiple columns with
1210    /// `name` drops the left-most one that hasn't already been dropped.
1211    ///
1212    /// TODO(parkmycar): Add handling for dropping a column that is currently used as a key.
1213    ///
1214    /// # Panics
1215    ///
1216    /// Panics if a column with `name` does not exist or the dropped column was used as a key.
1217    #[must_use]
1218    pub fn drop_column<N>(&mut self, name: N) -> RelationVersion
1219    where
1220        N: Into<ColumnName>,
1221    {
1222        let name = name.into();
1223        let latest_version = self.latest_version();
1224        let new_version = latest_version.bump();
1225
1226        let col = self
1227            .inner
1228            .metadata
1229            .values_mut()
1230            .find(|meta| meta.name == name && meta.dropped.is_none())
1231            .expect("column to exist");
1232
1233        // Make sure the column hadn't been previously dropped.
1234        assert_none!(col.dropped, "column was already dropped");
1235        col.dropped = Some(new_version);
1236
1237        // Make sure the column isn't being used as a key.
1238        let dropped_key = self
1239            .inner
1240            .typ
1241            .keys
1242            .iter()
1243            .any(|keys| keys.contains(&col.typ_idx));
1244        assert!(!dropped_key, "column being dropped was used as a key");
1245
1246        self.validate();
1247        new_version
1248    }
1249
1250    /// Returns the [`RelationDesc`] at the latest version.
1251    pub fn latest(&self) -> RelationDesc {
1252        self.inner.clone()
1253    }
1254
1255    /// Returns this [`RelationDesc`] at the specified version.
1256    pub fn at_version(&self, version: RelationVersionSelector) -> RelationDesc {
1257        // Get all of the changes from the start, up to whatever version was requested.
1258        //
1259        // TODO(parkmycar): We should probably panic on unknown verisons?
1260        let up_to_version = match version {
1261            RelationVersionSelector::Latest => RelationVersion(u64::MAX),
1262            RelationVersionSelector::Specific(v) => v,
1263        };
1264
1265        let valid_columns = self.inner.metadata.iter().filter(|(_col_idx, meta)| {
1266            let added = meta.added <= up_to_version;
1267            let dropped = meta
1268                .dropped
1269                .map(|dropped_at| up_to_version >= dropped_at)
1270                .unwrap_or(false);
1271
1272            added && !dropped
1273        });
1274
1275        let mut column_types = Vec::new();
1276        let mut column_metas = BTreeMap::new();
1277
1278        // N.B. At this point we need to be careful because col_idx might not
1279        // equal typ_idx.
1280        //
1281        // For example, consider columns "a", "b", and "c" with indexes 0, 1,
1282        // and 2. If we drop column "b" then we'll have "a" and "c" with column
1283        // indexes 0 and 2, but their indices in SqlRelationType will be 0 and 1.
1284        for (col_idx, meta) in valid_columns {
1285            let new_meta = ColumnMetadata {
1286                name: meta.name.clone(),
1287                typ_idx: column_types.len(),
1288                added: meta.added.clone(),
1289                dropped: meta.dropped.clone(),
1290            };
1291            column_types.push(self.inner.typ.columns()[meta.typ_idx].clone());
1292            column_metas.insert(*col_idx, new_meta);
1293        }
1294
1295        // Remap keys in case a column with an index less than that of a key was
1296        // dropped.
1297        //
1298        // For example, consider columns "a", "b", and "c" where "a" and "c" are
1299        // keys and "b" was dropped.
1300        let keys = self
1301            .inner
1302            .typ
1303            .keys
1304            .iter()
1305            .map(|keys| {
1306                keys.iter()
1307                    .map(|key_idx| {
1308                        let metadata = column_metas
1309                            .get(&ColumnIndex(*key_idx))
1310                            .expect("found key for column that doesn't exist");
1311                        metadata.typ_idx
1312                    })
1313                    .collect()
1314            })
1315            .collect();
1316
1317        let relation_type = SqlRelationType { column_types, keys };
1318
1319        RelationDesc {
1320            typ: relation_type,
1321            metadata: column_metas,
1322        }
1323    }
1324
1325    pub fn latest_version(&self) -> RelationVersion {
1326        self.inner
1327            .metadata
1328            .values()
1329            // N.B. Dropped is always greater than added.
1330            .map(|meta| meta.dropped.unwrap_or(meta.added))
1331            .max()
1332            // If there aren't any columns we're implicitly the root version.
1333            .unwrap_or_else(RelationVersion::root)
1334    }
1335
1336    /// Validates internal contraints of the [`RelationDesc`] are correct.
1337    ///
1338    /// # Panics
1339    ///
1340    /// Panics if a constraint is not satisfied.
1341    fn validate(&self) {
1342        fn validate_inner(desc: &RelationDesc) -> Result<(), anyhow::Error> {
1343            if desc.typ.column_types.len() != desc.metadata.len() {
1344                anyhow::bail!("mismatch between number of types and metadatas");
1345            }
1346
1347            for (col_idx, meta) in &desc.metadata {
1348                if col_idx.0 > desc.metadata.len() {
1349                    anyhow::bail!("column index out of bounds");
1350                }
1351                if meta.added >= meta.dropped.unwrap_or(RelationVersion(u64::MAX)) {
1352                    anyhow::bail!("column was added after it was dropped?");
1353                }
1354                if desc.typ().columns().get(meta.typ_idx).is_none() {
1355                    anyhow::bail!("typ_idx incorrect");
1356                }
1357            }
1358
1359            for keys in &desc.typ.keys {
1360                for key in keys {
1361                    if *key >= desc.typ.column_types.len() {
1362                        anyhow::bail!("key index was out of bounds!");
1363                    }
1364                }
1365            }
1366
1367            let versions = desc
1368                .metadata
1369                .values()
1370                .map(|meta| meta.dropped.unwrap_or(meta.added));
1371            let mut max = 0;
1372            let mut sum = 0;
1373            for version in versions {
1374                max = std::cmp::max(max, version.0);
1375                sum += version.0;
1376            }
1377
1378            // Other than RelationVersion(0), we should never have duplicate
1379            // versions and they should always increase by 1. In other words, the
1380            // sum of all RelationVersions should be the sum of [0, max].
1381            //
1382            // N.B. n * (n + 1) / 2 = sum of [0, n]
1383            //
1384            // While I normally don't like tricks like this, it allows us to
1385            // validate that our column versions are correct in O(n) time and
1386            // without allocations.
1387            if sum != (max * (max + 1) / 2) {
1388                anyhow::bail!("there is a duplicate or missing relation version");
1389            }
1390
1391            Ok(())
1392        }
1393
1394        assert_ok!(validate_inner(&self.inner), "validate failed! {self:?}");
1395    }
1396}
1397
1398/// Diffs that can be generated proptest and applied to a [`RelationDesc`] to
1399/// exercise schema migrations.
1400#[derive(Debug)]
1401pub enum PropRelationDescDiff {
1402    AddColumn {
1403        name: ColumnName,
1404        typ: SqlColumnType,
1405    },
1406    DropColumn {
1407        name: ColumnName,
1408    },
1409    ToggleNullability {
1410        name: ColumnName,
1411    },
1412    ChangeType {
1413        name: ColumnName,
1414        typ: SqlColumnType,
1415    },
1416}
1417
1418impl PropRelationDescDiff {
1419    pub fn apply(self, desc: &mut RelationDesc) {
1420        match self {
1421            PropRelationDescDiff::AddColumn { name, typ } => {
1422                let new_idx = desc.metadata.len();
1423                let meta = ColumnMetadata {
1424                    name,
1425                    typ_idx: new_idx,
1426                    added: RelationVersion(0),
1427                    dropped: None,
1428                };
1429                let prev = desc.metadata.insert(ColumnIndex(new_idx), meta);
1430                desc.typ.column_types.push(typ);
1431
1432                assert_none!(prev);
1433                assert_eq!(desc.metadata.len(), desc.typ.column_types.len());
1434            }
1435            PropRelationDescDiff::DropColumn { name } => {
1436                let next_version = desc
1437                    .metadata
1438                    .values()
1439                    .map(|meta| meta.dropped.unwrap_or(meta.added))
1440                    .max()
1441                    .unwrap_or_else(RelationVersion::root)
1442                    .bump();
1443                let Some(metadata) = desc.metadata.values_mut().find(|meta| meta.name == name)
1444                else {
1445                    return;
1446                };
1447                if metadata.dropped.is_none() {
1448                    metadata.dropped = Some(next_version);
1449                }
1450            }
1451            PropRelationDescDiff::ToggleNullability { name } => {
1452                let Some((pos, _)) = desc.get_by_name(&name) else {
1453                    return;
1454                };
1455                let col_type = desc
1456                    .typ
1457                    .column_types
1458                    .get_mut(pos)
1459                    .expect("ColumnNames and SqlColumnTypes out of sync!");
1460                col_type.nullable = !col_type.nullable;
1461            }
1462            PropRelationDescDiff::ChangeType { name, typ } => {
1463                let Some((pos, _)) = desc.get_by_name(&name) else {
1464                    return;
1465                };
1466                let col_type = desc
1467                    .typ
1468                    .column_types
1469                    .get_mut(pos)
1470                    .expect("ColumnNames and SqlColumnTypes out of sync!");
1471                *col_type = typ;
1472            }
1473        }
1474    }
1475}
1476
1477/// Generates a set of [`PropRelationDescDiff`]s based on some source [`RelationDesc`].
1478pub fn arb_relation_desc_diff(
1479    source: &RelationDesc,
1480) -> impl Strategy<Value = Vec<PropRelationDescDiff>> + use<> {
1481    let source = Rc::new(source.clone());
1482    let num_source_columns = source.typ.columns().len();
1483
1484    let num_add_columns = Union::new_weighted(vec![(100, Just(0..8)), (1, Just(8..64))]);
1485    let add_columns_strat = num_add_columns
1486        .prop_flat_map(|num_columns| {
1487            proptest::collection::vec((any::<ColumnName>(), any::<SqlColumnType>()), num_columns)
1488        })
1489        .prop_map(|cols| {
1490            cols.into_iter()
1491                .map(|(name, typ)| PropRelationDescDiff::AddColumn { name, typ })
1492                .collect::<Vec<_>>()
1493        });
1494
1495    // If the source RelationDesc is empty there is nothing else to do.
1496    if num_source_columns == 0 {
1497        return add_columns_strat.boxed();
1498    }
1499
1500    let source_ = Rc::clone(&source);
1501    let drop_columns_strat = (0..num_source_columns).prop_perturb(move |num_columns, mut rng| {
1502        let mut set = BTreeSet::default();
1503        for _ in 0..num_columns {
1504            let col_idx = rng.random_range(0..num_source_columns);
1505            set.insert(source_.get_name(col_idx).clone());
1506        }
1507        set.into_iter()
1508            .map(|name| PropRelationDescDiff::DropColumn { name })
1509            .collect::<Vec<_>>()
1510    });
1511
1512    let source_ = Rc::clone(&source);
1513    let toggle_nullability_strat =
1514        (0..num_source_columns).prop_perturb(move |num_columns, mut rng| {
1515            let mut set = BTreeSet::default();
1516            for _ in 0..num_columns {
1517                let col_idx = rng.random_range(0..num_source_columns);
1518                set.insert(source_.get_name(col_idx).clone());
1519            }
1520            set.into_iter()
1521                .map(|name| PropRelationDescDiff::ToggleNullability { name })
1522                .collect::<Vec<_>>()
1523        });
1524
1525    let source_ = Rc::clone(&source);
1526    let change_type_strat = (0..num_source_columns)
1527        .prop_perturb(move |num_columns, mut rng| {
1528            let mut set = BTreeSet::default();
1529            for _ in 0..num_columns {
1530                let col_idx = rng.random_range(0..num_source_columns);
1531                set.insert(source_.get_name(col_idx).clone());
1532            }
1533            set
1534        })
1535        .prop_flat_map(|cols| {
1536            proptest::collection::vec(any::<SqlColumnType>(), cols.len())
1537                .prop_map(move |types| (cols.clone(), types))
1538        })
1539        .prop_map(|(cols, types)| {
1540            cols.into_iter()
1541                .zip_eq(types)
1542                .map(|(name, typ)| PropRelationDescDiff::ChangeType { name, typ })
1543                .collect::<Vec<_>>()
1544        });
1545
1546    (
1547        add_columns_strat,
1548        drop_columns_strat,
1549        toggle_nullability_strat,
1550        change_type_strat,
1551    )
1552        .prop_map(|(adds, drops, toggles, changes)| {
1553            adds.into_iter()
1554                .chain(drops)
1555                .chain(toggles)
1556                .chain(changes)
1557                .collect::<Vec<_>>()
1558        })
1559        .prop_shuffle()
1560        .boxed()
1561}
1562
1563#[cfg(test)]
1564mod tests {
1565    use super::*;
1566    use prost::Message;
1567
1568    #[mz_ore::test]
1569    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `pipe2` on OS `linux`
1570    fn smoktest_at_version() {
1571        let desc = RelationDesc::builder()
1572            .with_column("a", SqlScalarType::Bool.nullable(true))
1573            .with_column("z", SqlScalarType::String.nullable(false))
1574            .finish();
1575
1576        let mut versioned_desc = VersionedRelationDesc {
1577            inner: desc.clone(),
1578        };
1579        versioned_desc.validate();
1580
1581        let latest = versioned_desc.at_version(RelationVersionSelector::Latest);
1582        assert_eq!(desc, latest);
1583
1584        let v0 = versioned_desc.at_version(RelationVersionSelector::specific(0));
1585        assert_eq!(desc, v0);
1586
1587        let v3 = versioned_desc.at_version(RelationVersionSelector::specific(3));
1588        assert_eq!(desc, v3);
1589
1590        let v1 = versioned_desc.add_column("b", SqlScalarType::Bytes.nullable(false));
1591        assert_eq!(v1, RelationVersion(1));
1592
1593        let v1 = versioned_desc.at_version(RelationVersionSelector::Specific(v1));
1594        insta::assert_json_snapshot!(v1.metadata, @r###"
1595        {
1596          "0": {
1597            "name": "a",
1598            "typ_idx": 0,
1599            "added": 0,
1600            "dropped": null
1601          },
1602          "1": {
1603            "name": "z",
1604            "typ_idx": 1,
1605            "added": 0,
1606            "dropped": null
1607          },
1608          "2": {
1609            "name": "b",
1610            "typ_idx": 2,
1611            "added": 1,
1612            "dropped": null
1613          }
1614        }
1615        "###);
1616
1617        // Check that V0 doesn't show the new column.
1618        let v0_b = versioned_desc.at_version(RelationVersionSelector::specific(0));
1619        assert!(v0.iter().eq(v0_b.iter()));
1620
1621        let v2 = versioned_desc.drop_column("z");
1622        assert_eq!(v2, RelationVersion(2));
1623
1624        let v2 = versioned_desc.at_version(RelationVersionSelector::Specific(v2));
1625        insta::assert_json_snapshot!(v2.metadata, @r###"
1626        {
1627          "0": {
1628            "name": "a",
1629            "typ_idx": 0,
1630            "added": 0,
1631            "dropped": null
1632          },
1633          "2": {
1634            "name": "b",
1635            "typ_idx": 1,
1636            "added": 1,
1637            "dropped": null
1638          }
1639        }
1640        "###);
1641
1642        // Check that V0 and V1 are still correct.
1643        let v0_c = versioned_desc.at_version(RelationVersionSelector::specific(0));
1644        assert!(v0.iter().eq(v0_c.iter()));
1645
1646        let v1_b = versioned_desc.at_version(RelationVersionSelector::specific(1));
1647        assert!(v1.iter().eq(v1_b.iter()));
1648
1649        insta::assert_json_snapshot!(versioned_desc.inner.metadata, @r###"
1650        {
1651          "0": {
1652            "name": "a",
1653            "typ_idx": 0,
1654            "added": 0,
1655            "dropped": null
1656          },
1657          "1": {
1658            "name": "z",
1659            "typ_idx": 1,
1660            "added": 0,
1661            "dropped": 2
1662          },
1663          "2": {
1664            "name": "b",
1665            "typ_idx": 2,
1666            "added": 1,
1667            "dropped": null
1668          }
1669        }
1670        "###);
1671    }
1672
1673    #[mz_ore::test]
1674    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `pipe2` on OS `linux`
1675    fn test_dropping_columns_with_keys() {
1676        let desc = RelationDesc::builder()
1677            .with_column("a", SqlScalarType::Bool.nullable(true))
1678            .with_column("z", SqlScalarType::String.nullable(false))
1679            .with_key(vec![1])
1680            .finish();
1681
1682        let mut versioned_desc = VersionedRelationDesc {
1683            inner: desc.clone(),
1684        };
1685        versioned_desc.validate();
1686
1687        let v1 = versioned_desc.drop_column("a");
1688        assert_eq!(v1, RelationVersion(1));
1689
1690        // Make sure the key index for 'z' got remapped since 'a' was dropped.
1691        let v1 = versioned_desc.at_version(RelationVersionSelector::Specific(v1));
1692        insta::assert_json_snapshot!(v1, @r###"
1693        {
1694          "typ": {
1695            "column_types": [
1696              {
1697                "scalar_type": "String",
1698                "nullable": false
1699              }
1700            ],
1701            "keys": [
1702              [
1703                0
1704              ]
1705            ]
1706          },
1707          "metadata": {
1708            "1": {
1709              "name": "z",
1710              "typ_idx": 0,
1711              "added": 0,
1712              "dropped": null
1713            }
1714          }
1715        }
1716        "###);
1717
1718        // Make sure the key index of 'z' is correct when all columns are present.
1719        let v0 = versioned_desc.at_version(RelationVersionSelector::specific(0));
1720        insta::assert_json_snapshot!(v0, @r###"
1721        {
1722          "typ": {
1723            "column_types": [
1724              {
1725                "scalar_type": "Bool",
1726                "nullable": true
1727              },
1728              {
1729                "scalar_type": "String",
1730                "nullable": false
1731              }
1732            ],
1733            "keys": [
1734              [
1735                1
1736              ]
1737            ]
1738          },
1739          "metadata": {
1740            "0": {
1741              "name": "a",
1742              "typ_idx": 0,
1743              "added": 0,
1744              "dropped": 1
1745            },
1746            "1": {
1747              "name": "z",
1748              "typ_idx": 1,
1749              "added": 0,
1750              "dropped": null
1751            }
1752          }
1753        }
1754        "###);
1755    }
1756
1757    #[mz_ore::test]
1758    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `pipe2` on OS `linux`
1759    fn roundtrip_relation_desc_without_metadata() {
1760        let typ = ProtoRelationType {
1761            column_types: vec![
1762                SqlScalarType::String.nullable(false).into_proto(),
1763                SqlScalarType::Bool.nullable(true).into_proto(),
1764            ],
1765            keys: vec![],
1766        };
1767        let proto = ProtoRelationDesc {
1768            typ: Some(typ),
1769            names: vec![
1770                ColumnName("a".into()).into_proto(),
1771                ColumnName("b".into()).into_proto(),
1772            ],
1773            metadata: vec![],
1774        };
1775        let desc: RelationDesc = proto.into_rust().unwrap();
1776
1777        insta::assert_json_snapshot!(desc, @r###"
1778        {
1779          "typ": {
1780            "column_types": [
1781              {
1782                "scalar_type": "String",
1783                "nullable": false
1784              },
1785              {
1786                "scalar_type": "Bool",
1787                "nullable": true
1788              }
1789            ],
1790            "keys": []
1791          },
1792          "metadata": {
1793            "0": {
1794              "name": "a",
1795              "typ_idx": 0,
1796              "added": 0,
1797              "dropped": null
1798            },
1799            "1": {
1800              "name": "b",
1801              "typ_idx": 1,
1802              "added": 0,
1803              "dropped": null
1804            }
1805          }
1806        }
1807        "###);
1808    }
1809
1810    #[mz_ore::test]
1811    #[should_panic(expected = "column named 'a' already exists!")]
1812    fn test_add_column_with_same_name_panics() {
1813        let desc = RelationDesc::builder()
1814            .with_column("a", SqlScalarType::Bool.nullable(true))
1815            .finish();
1816        let mut versioned = VersionedRelationDesc::new(desc);
1817
1818        let _ = versioned.add_column("a", SqlScalarType::String.nullable(false));
1819    }
1820
1821    #[mz_ore::test]
1822    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `pipe2` on OS `linux`
1823    fn test_add_column_with_same_name_prev_dropped() {
1824        let desc = RelationDesc::builder()
1825            .with_column("a", SqlScalarType::Bool.nullable(true))
1826            .finish();
1827        let mut versioned = VersionedRelationDesc::new(desc);
1828
1829        let v1 = versioned.drop_column("a");
1830        let v1 = versioned.at_version(RelationVersionSelector::Specific(v1));
1831        insta::assert_json_snapshot!(v1, @r###"
1832        {
1833          "typ": {
1834            "column_types": [],
1835            "keys": []
1836          },
1837          "metadata": {}
1838        }
1839        "###);
1840
1841        let v2 = versioned.add_column("a", SqlScalarType::String.nullable(false));
1842        let v2 = versioned.at_version(RelationVersionSelector::Specific(v2));
1843        insta::assert_json_snapshot!(v2, @r###"
1844        {
1845          "typ": {
1846            "column_types": [
1847              {
1848                "scalar_type": "String",
1849                "nullable": false
1850              }
1851            ],
1852            "keys": []
1853          },
1854          "metadata": {
1855            "1": {
1856              "name": "a",
1857              "typ_idx": 0,
1858              "added": 2,
1859              "dropped": null
1860            }
1861          }
1862        }
1863        "###);
1864    }
1865
1866    #[mz_ore::test]
1867    #[cfg_attr(miri, ignore)]
1868    fn apply_demand() {
1869        let desc = RelationDesc::builder()
1870            .with_column("a", SqlScalarType::String.nullable(true))
1871            .with_column("b", SqlScalarType::Int64.nullable(false))
1872            .with_column("c", SqlScalarType::Time.nullable(false))
1873            .finish();
1874        let desc = desc.apply_demand(&BTreeSet::from([0, 2]));
1875        assert_eq!(desc.arity(), 2);
1876        // TODO(parkmycar): Move validate onto RelationDesc.
1877        VersionedRelationDesc::new(desc).validate();
1878    }
1879
1880    #[mz_ore::test]
1881    #[cfg_attr(miri, ignore)]
1882    fn smoketest_column_index_stable_ident() {
1883        let idx_a = ColumnIndex(42);
1884        // Note(parkmycar): This should never change.
1885        assert_eq!(idx_a.to_stable_name(), "42");
1886    }
1887
1888    #[mz_ore::test]
1889    #[cfg_attr(miri, ignore)] // too slow
1890    fn proptest_relation_desc_roundtrips() {
1891        fn testcase(og: RelationDesc) {
1892            let bytes = og.into_proto().encode_to_vec();
1893            let proto = ProtoRelationDesc::decode(&bytes[..]).unwrap();
1894            let rnd = RelationDesc::from_proto(proto).unwrap();
1895
1896            assert_eq!(og, rnd);
1897        }
1898
1899        proptest!(|(desc in any::<RelationDesc>())| {
1900            testcase(desc);
1901        });
1902
1903        let strat = any::<RelationDesc>().prop_flat_map(|desc| {
1904            arb_relation_desc_diff(&desc).prop_map(move |diffs| (desc.clone(), diffs))
1905        });
1906
1907        proptest!(|((mut desc, diffs) in strat)| {
1908            for diff in diffs {
1909                diff.apply(&mut desc);
1910            };
1911            testcase(desc);
1912        });
1913    }
1914}