mz_repr/
relation.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::{BTreeMap, BTreeSet};
11use std::rc::Rc;
12use std::{fmt, vec};
13
14use anyhow::bail;
15use itertools::Itertools;
16use mz_lowertest::MzReflect;
17use mz_ore::cast::CastFrom;
18use mz_ore::str::StrExt;
19use mz_ore::{assert_none, assert_ok};
20use mz_persist_types::schema::SchemaId;
21use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
22use proptest::prelude::*;
23use proptest::strategy::{Strategy, Union};
24use proptest_derive::Arbitrary;
25use serde::{Deserialize, Serialize};
26
27use crate::relation_and_scalar::proto_relation_type::ProtoKey;
28pub use crate::relation_and_scalar::{
29    ProtoColumnMetadata, ProtoColumnName, ProtoColumnType, ProtoRelationDesc, ProtoRelationType,
30    ProtoRelationVersion,
31};
32use crate::{Datum, ReprScalarType, Row, SqlScalarType, arb_datum_for_column};
33
34/// The type of a [`Datum`].
35///
36/// [`SqlColumnType`] bundles information about the scalar type of a datum (e.g.,
37/// Int32 or String) with its nullability.
38///
39/// To construct a column type, either initialize the struct directly, or
40/// use the [`SqlScalarType::nullable`] method.
41#[derive(
42    Arbitrary, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash, MzReflect,
43)]
44pub struct SqlColumnType {
45    /// The underlying scalar type (e.g., Int32 or String) of this column.
46    pub scalar_type: SqlScalarType,
47    /// Whether this datum can be null.
48    #[serde(default = "return_true")]
49    pub nullable: bool,
50}
51
52/// This method exists solely for the purpose of making SqlColumnType nullable by
53/// default in unit tests. The default value of a bool is false, and the only
54/// way to make an object take on any other value by default is to pass it a
55/// function that returns the desired default value. See
56/// <https://github.com/serde-rs/serde/issues/1030>
57#[inline(always)]
58fn return_true() -> bool {
59    true
60}
61
62impl SqlColumnType {
63    pub fn union(&self, other: &Self) -> Result<Self, anyhow::Error> {
64        match (&self.scalar_type, &other.scalar_type) {
65            (scalar_type, other_scalar_type) if scalar_type == other_scalar_type => {
66                Ok(SqlColumnType {
67                    scalar_type: scalar_type.clone(),
68                    nullable: self.nullable || other.nullable,
69                })
70            }
71            (scalar_type, other_scalar_type) if scalar_type.base_eq(other_scalar_type) => {
72                Ok(SqlColumnType {
73                    scalar_type: scalar_type.without_modifiers(),
74                    nullable: self.nullable || other.nullable,
75                })
76            }
77            (
78                SqlScalarType::Record { fields, custom_id },
79                SqlScalarType::Record {
80                    fields: other_fields,
81                    custom_id: other_custom_id,
82                },
83            ) => {
84                if custom_id != other_custom_id {
85                    bail!(
86                        "Can't union types: {:?} and {:?}",
87                        self.scalar_type,
88                        other.scalar_type
89                    );
90                };
91
92                if fields.len() != other_fields.len() {
93                    bail!(
94                        "Can't union types: {:?} and {:?}",
95                        self.scalar_type,
96                        other.scalar_type
97                    );
98                }
99                let mut union_fields = Vec::with_capacity(fields.len());
100                for ((name, typ), (other_name, other_typ)) in
101                    fields.iter().zip_eq(other_fields.iter())
102                {
103                    if name != other_name {
104                        bail!(
105                            "Can't union types: {:?} and {:?}",
106                            self.scalar_type,
107                            other.scalar_type
108                        );
109                    } else {
110                        let union_column_type = typ.union(other_typ)?;
111                        union_fields.push((name.clone(), union_column_type));
112                    };
113                }
114
115                Ok(SqlColumnType {
116                    scalar_type: SqlScalarType::Record {
117                        fields: union_fields.into(),
118                        custom_id: *custom_id,
119                    },
120                    nullable: self.nullable || other.nullable,
121                })
122            }
123            _ => bail!(
124                "Can't union types: {:?} and {:?}",
125                self.scalar_type,
126                other.scalar_type
127            ),
128        }
129    }
130
131    /// Consumes this `SqlColumnType` and returns a new `SqlColumnType` with its
132    /// nullability set to the specified boolean.
133    pub fn nullable(mut self, nullable: bool) -> Self {
134        self.nullable = nullable;
135        self
136    }
137}
138
139impl RustType<ProtoColumnType> for SqlColumnType {
140    fn into_proto(&self) -> ProtoColumnType {
141        ProtoColumnType {
142            nullable: self.nullable,
143            scalar_type: Some(self.scalar_type.into_proto()),
144        }
145    }
146
147    fn from_proto(proto: ProtoColumnType) -> Result<Self, TryFromProtoError> {
148        Ok(SqlColumnType {
149            nullable: proto.nullable,
150            scalar_type: proto
151                .scalar_type
152                .into_rust_if_some("ProtoColumnType::scalar_type")?,
153        })
154    }
155}
156
157impl fmt::Display for SqlColumnType {
158    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
159        let nullable = if self.nullable { "Null" } else { "NotNull" };
160        f.write_fmt(format_args!("{:?}:{}", self.scalar_type, nullable))
161    }
162}
163
164/// The type of a relation.
165#[derive(
166    Arbitrary, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash, MzReflect,
167)]
168pub struct SqlRelationType {
169    /// The type for each column, in order.
170    pub column_types: Vec<SqlColumnType>,
171    /// Sets of indices that are "keys" for the collection.
172    ///
173    /// Each element in this list is a set of column indices, each with the
174    /// property that the collection contains at most one record with each
175    /// distinct set of values for each column. Alternately, for a specific set
176    /// of values assigned to the these columns there is at most one record.
177    ///
178    /// A collection can contain multiple sets of keys, although it is common to
179    /// have either zero or one sets of key indices.
180    #[serde(default)]
181    pub keys: Vec<Vec<usize>>,
182}
183
184impl SqlRelationType {
185    /// Constructs a `SqlRelationType` representing the relation with no columns and
186    /// no keys.
187    pub fn empty() -> Self {
188        SqlRelationType::new(vec![])
189    }
190
191    /// Constructs a new `SqlRelationType` from specified column types.
192    ///
193    /// The `SqlRelationType` will have no keys.
194    pub fn new(column_types: Vec<SqlColumnType>) -> Self {
195        SqlRelationType {
196            column_types,
197            keys: Vec::new(),
198        }
199    }
200
201    /// Adds a new key for the relation.
202    pub fn with_key(mut self, mut indices: Vec<usize>) -> Self {
203        indices.sort_unstable();
204        if !self.keys.contains(&indices) {
205            self.keys.push(indices);
206        }
207        self
208    }
209
210    pub fn with_keys(mut self, keys: Vec<Vec<usize>>) -> Self {
211        for key in keys {
212            self = self.with_key(key)
213        }
214        self
215    }
216
217    /// Computes the number of columns in the relation.
218    pub fn arity(&self) -> usize {
219        self.column_types.len()
220    }
221
222    /// Gets the index of the columns used when creating a default index.
223    pub fn default_key(&self) -> Vec<usize> {
224        if let Some(key) = self.keys.first() {
225            if key.is_empty() {
226                (0..self.column_types.len()).collect()
227            } else {
228                key.clone()
229            }
230        } else {
231            (0..self.column_types.len()).collect()
232        }
233    }
234
235    /// Returns all the [`SqlColumnType`]s, in order, for this relation.
236    pub fn columns(&self) -> &[SqlColumnType] {
237        &self.column_types
238    }
239}
240
241impl RustType<ProtoRelationType> for SqlRelationType {
242    fn into_proto(&self) -> ProtoRelationType {
243        ProtoRelationType {
244            column_types: self.column_types.into_proto(),
245            keys: self.keys.into_proto(),
246        }
247    }
248
249    fn from_proto(proto: ProtoRelationType) -> Result<Self, TryFromProtoError> {
250        Ok(SqlRelationType {
251            column_types: proto.column_types.into_rust()?,
252            keys: proto.keys.into_rust()?,
253        })
254    }
255}
256
257impl RustType<ProtoKey> for Vec<usize> {
258    fn into_proto(&self) -> ProtoKey {
259        ProtoKey {
260            keys: self.into_proto(),
261        }
262    }
263
264    fn from_proto(proto: ProtoKey) -> Result<Self, TryFromProtoError> {
265        proto.keys.into_rust()
266    }
267}
268
269/// The type of a relation.
270#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash)]
271pub struct ReprRelationType {
272    /// The type for each column, in order.
273    pub column_types: Vec<ReprColumnType>,
274    /// Sets of indices that are "keys" for the collection.
275    ///
276    /// Each element in this list is a set of column indices, each with the
277    /// property that the collection contains at most one record with each
278    /// distinct set of values for each column. Alternately, for a specific set
279    /// of values assigned to the these columns there is at most one record.
280    ///
281    /// A collection can contain multiple sets of keys, although it is common to
282    /// have either zero or one sets of key indices.
283    #[serde(default)]
284    pub keys: Vec<Vec<usize>>,
285}
286
287impl ReprRelationType {
288    /// Constructs a `ReprRelationType` representing the relation with no columns and
289    /// no keys.
290    pub fn empty() -> Self {
291        ReprRelationType::new(vec![])
292    }
293
294    /// Constructs a new `ReprRelationType` from specified column types.
295    ///
296    /// The `ReprRelationType` will have no keys.
297    pub fn new(column_types: Vec<ReprColumnType>) -> Self {
298        ReprRelationType {
299            column_types,
300            keys: Vec::new(),
301        }
302    }
303
304    /// Adds a new key for the relation.
305    pub fn with_key(mut self, mut indices: Vec<usize>) -> Self {
306        indices.sort_unstable();
307        if !self.keys.contains(&indices) {
308            self.keys.push(indices);
309        }
310        self
311    }
312
313    pub fn with_keys(mut self, keys: Vec<Vec<usize>>) -> Self {
314        for key in keys {
315            self = self.with_key(key)
316        }
317        self
318    }
319
320    /// Computes the number of columns in the relation.
321    pub fn arity(&self) -> usize {
322        self.column_types.len()
323    }
324
325    /// Gets the index of the columns used when creating a default index.
326    pub fn default_key(&self) -> Vec<usize> {
327        if let Some(key) = self.keys.first() {
328            if key.is_empty() {
329                (0..self.column_types.len()).collect()
330            } else {
331                key.clone()
332            }
333        } else {
334            (0..self.column_types.len()).collect()
335        }
336    }
337
338    /// Returns all the column types in order, for this relation.
339    pub fn columns(&self) -> &[ReprColumnType] {
340        &self.column_types
341    }
342}
343
344impl From<&SqlRelationType> for ReprRelationType {
345    fn from(sql_relation_type: &SqlRelationType) -> Self {
346        ReprRelationType {
347            column_types: sql_relation_type
348                .column_types
349                .iter()
350                .map(ReprColumnType::from)
351                .collect(),
352            keys: sql_relation_type.keys.clone(),
353        }
354    }
355}
356
357#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash, MzReflect)]
358pub struct ReprColumnType {
359    /// The underlying representation scalar type (e.g., Int32 or String) of this column.
360    pub scalar_type: ReprScalarType,
361    /// Whether this datum can be null.
362    #[serde(default = "return_true")]
363    pub nullable: bool,
364}
365
366impl From<&SqlColumnType> for ReprColumnType {
367    fn from(sql_column_type: &SqlColumnType) -> Self {
368        let scalar_type = &sql_column_type.scalar_type;
369        let scalar_type = scalar_type.into();
370        let nullable = sql_column_type.nullable;
371
372        ReprColumnType {
373            scalar_type,
374            nullable,
375        }
376    }
377}
378
379impl SqlColumnType {
380    /// Lossily translates a [`ReprColumnType`] back to a [`SqlColumnType`].
381    ///
382    /// See [`SqlScalarType::from_repr`] for an example of lossiness.
383    pub fn from_repr(repr: &ReprColumnType) -> Self {
384        let scalar_type = &repr.scalar_type;
385        let scalar_type = SqlScalarType::from_repr(scalar_type);
386        let nullable = repr.nullable;
387
388        SqlColumnType {
389            scalar_type,
390            nullable,
391        }
392    }
393}
394
395/// The name of a column in a [`RelationDesc`].
396#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash, MzReflect)]
397pub struct ColumnName(Box<str>);
398
399impl ColumnName {
400    /// Returns this column name as a `str`.
401    #[inline(always)]
402    pub fn as_str(&self) -> &str {
403        &*self
404    }
405
406    /// Returns this column name as a `&mut Box<str>`.
407    pub fn as_mut_boxed_str(&mut self) -> &mut Box<str> {
408        &mut self.0
409    }
410
411    /// Returns if this [`ColumnName`] is similar to the provided one.
412    pub fn is_similar(&self, other: &ColumnName) -> bool {
413        const SIMILARITY_THRESHOLD: f64 = 0.6;
414
415        let a_lowercase = self.to_lowercase();
416        let b_lowercase = other.to_lowercase();
417
418        strsim::normalized_levenshtein(&a_lowercase, &b_lowercase) >= SIMILARITY_THRESHOLD
419    }
420}
421
422impl std::ops::Deref for ColumnName {
423    type Target = str;
424
425    #[inline(always)]
426    fn deref(&self) -> &Self::Target {
427        &self.0
428    }
429}
430
431impl fmt::Display for ColumnName {
432    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
433        f.write_str(&self.0)
434    }
435}
436
437impl From<String> for ColumnName {
438    fn from(s: String) -> ColumnName {
439        ColumnName(s.into())
440    }
441}
442
443impl From<&str> for ColumnName {
444    fn from(s: &str) -> ColumnName {
445        ColumnName(s.into())
446    }
447}
448
449impl From<&ColumnName> for ColumnName {
450    fn from(n: &ColumnName) -> ColumnName {
451        n.clone()
452    }
453}
454
455impl RustType<ProtoColumnName> for ColumnName {
456    fn into_proto(&self) -> ProtoColumnName {
457        ProtoColumnName {
458            value: Some(self.0.to_string()),
459        }
460    }
461
462    fn from_proto(proto: ProtoColumnName) -> Result<Self, TryFromProtoError> {
463        Ok(ColumnName(
464            proto
465                .value
466                .ok_or_else(|| TryFromProtoError::missing_field("ProtoColumnName::value"))?
467                .into(),
468        ))
469    }
470}
471
472impl From<ColumnName> for mz_sql_parser::ast::Ident {
473    fn from(value: ColumnName) -> Self {
474        // Note: ColumnNames are known to be less than the max length of an Ident (I think?).
475        mz_sql_parser::ast::Ident::new_unchecked(value.0)
476    }
477}
478
479impl proptest::arbitrary::Arbitrary for ColumnName {
480    type Parameters = ();
481    type Strategy = BoxedStrategy<ColumnName>;
482
483    fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy {
484        // Long column names are generally uninteresting, and can greatly
485        // increase the runtime for a test case, so bound the max length.
486        let mut weights = vec![(50, Just(1..8)), (20, Just(8..16))];
487        if std::env::var("PROPTEST_LARGE_DATA").is_ok() {
488            weights.extend([
489                (5, Just(16..128)),
490                (1, Just(128..1024)),
491                (1, Just(1024..4096)),
492            ]);
493        }
494        let name_length = Union::new_weighted(weights);
495
496        // Non-ASCII characters are also generally uninteresting and can make
497        // debugging harder.
498        let char_strat = Rc::new(Union::new_weighted(vec![
499            (50, proptest::char::range('A', 'z').boxed()),
500            (1, any::<char>().boxed()),
501        ]));
502
503        name_length
504            .prop_flat_map(move |length| proptest::collection::vec(Rc::clone(&char_strat), length))
505            .prop_map(|chars| ColumnName(chars.into_iter().collect::<Box<str>>()))
506            .no_shrink()
507            .boxed()
508    }
509}
510
511/// Default name of a column (when no other information is known).
512pub const UNKNOWN_COLUMN_NAME: &str = "?column?";
513
514/// Stable index of a column in a [`RelationDesc`].
515#[derive(
516    Clone, Copy, Debug, Eq, PartialEq, PartialOrd, Ord, Serialize, Deserialize, Hash, MzReflect,
517)]
518pub struct ColumnIndex(usize);
519
520static_assertions::assert_not_impl_all!(ColumnIndex: Arbitrary);
521
522impl ColumnIndex {
523    /// Returns a stable identifier for this [`ColumnIndex`].
524    pub fn to_stable_name(&self) -> String {
525        self.0.to_string()
526    }
527
528    pub fn to_raw(&self) -> usize {
529        self.0
530    }
531
532    pub fn from_raw(val: usize) -> Self {
533        ColumnIndex(val)
534    }
535}
536
537/// The version a given column was added at.
538#[derive(
539    Clone,
540    Copy,
541    Debug,
542    Eq,
543    PartialEq,
544    PartialOrd,
545    Ord,
546    Serialize,
547    Deserialize,
548    Hash,
549    MzReflect,
550    Arbitrary,
551)]
552pub struct RelationVersion(u64);
553
554impl RelationVersion {
555    /// Returns the "root" or "initial" version of a [`RelationDesc`].
556    pub fn root() -> Self {
557        RelationVersion(0)
558    }
559
560    /// Returns an instance of [`RelationVersion`] which is "one" higher than `self`.
561    pub fn bump(&self) -> Self {
562        let next_version = self
563            .0
564            .checked_add(1)
565            .expect("added more than u64::MAX columns?");
566        RelationVersion(next_version)
567    }
568
569    /// Consume a [`RelationVersion`] returning the raw value.
570    ///
571    /// Should __only__ be used for serialization.
572    pub fn into_raw(self) -> u64 {
573        self.0
574    }
575
576    /// Create a [`RelationVersion`] from a raw value.
577    ///
578    /// Should __only__ be used for serialization.
579    pub fn from_raw(val: u64) -> RelationVersion {
580        RelationVersion(val)
581    }
582}
583
584impl From<RelationVersion> for SchemaId {
585    fn from(value: RelationVersion) -> Self {
586        SchemaId(usize::cast_from(value.0))
587    }
588}
589
590impl From<mz_sql_parser::ast::Version> for RelationVersion {
591    fn from(value: mz_sql_parser::ast::Version) -> Self {
592        RelationVersion(value.into_inner())
593    }
594}
595
596impl fmt::Display for RelationVersion {
597    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
598        write!(f, "v{}", self.0)
599    }
600}
601
602impl From<RelationVersion> for mz_sql_parser::ast::Version {
603    fn from(value: RelationVersion) -> Self {
604        mz_sql_parser::ast::Version::new(value.0)
605    }
606}
607
608impl RustType<ProtoRelationVersion> for RelationVersion {
609    fn into_proto(&self) -> ProtoRelationVersion {
610        ProtoRelationVersion { value: self.0 }
611    }
612
613    fn from_proto(proto: ProtoRelationVersion) -> Result<Self, TryFromProtoError> {
614        Ok(RelationVersion(proto.value))
615    }
616}
617
618/// Metadata (other than type) for a column in a [`RelationDesc`].
619#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash, MzReflect)]
620struct ColumnMetadata {
621    /// Name of the column.
622    name: ColumnName,
623    /// Index into a [`SqlRelationType`] for this column.
624    typ_idx: usize,
625    /// Version this column was added at.
626    added: RelationVersion,
627    /// Version this column was dropped at.
628    dropped: Option<RelationVersion>,
629}
630
631/// A description of the shape of a relation.
632///
633/// It bundles a [`SqlRelationType`] with `ColumnMetadata` for each column in
634/// the relation.
635///
636/// # Examples
637///
638/// A `RelationDesc`s is typically constructed via its builder API:
639///
640/// ```
641/// use mz_repr::{SqlColumnType, RelationDesc, SqlScalarType};
642///
643/// let desc = RelationDesc::builder()
644///     .with_column("id", SqlScalarType::Int64.nullable(false))
645///     .with_column("price", SqlScalarType::Float64.nullable(true))
646///     .finish();
647/// ```
648///
649/// In more complicated cases, like when constructing a `RelationDesc` in
650/// response to user input, it may be more convenient to construct a relation
651/// type first, and imbue it with column names to form a `RelationDesc` later:
652///
653/// ```
654/// use mz_repr::RelationDesc;
655///
656/// # fn plan_query(_: &str) -> mz_repr::SqlRelationType { mz_repr::SqlRelationType::new(vec![]) }
657/// let relation_type = plan_query("SELECT * FROM table");
658/// let names = (0..relation_type.arity()).map(|i| match i {
659///     0 => "first",
660///     1 => "second",
661///     _ => "unknown",
662/// });
663/// let desc = RelationDesc::new(relation_type, names);
664/// ```
665///
666/// Next to the [`SqlRelationType`] we maintain a map of `ColumnIndex` to
667/// `ColumnMetadata`, where [`ColumnIndex`] is a stable identifier for a
668/// column throughout the lifetime of the relation. This allows a
669/// [`RelationDesc`] to represent a projection over a version of itself.
670///
671/// ```
672/// use std::collections::BTreeSet;
673/// use mz_repr::{ColumnIndex, RelationDesc, SqlScalarType};
674///
675/// let desc = RelationDesc::builder()
676///     .with_column("name", SqlScalarType::String.nullable(false))
677///     .with_column("email", SqlScalarType::String.nullable(false))
678///     .finish();
679///
680/// // Project away the second column.
681/// let demands = BTreeSet::from([1]);
682/// let proj = desc.apply_demand(&demands);
683///
684/// // We projected away the first column.
685/// assert!(!proj.contains_index(&ColumnIndex::from_raw(0)));
686/// // But retained the second.
687/// assert!(proj.contains_index(&ColumnIndex::from_raw(1)));
688///
689/// // The underlying `SqlRelationType` also contains a single column.
690/// assert_eq!(proj.typ().arity(), 1);
691/// ```
692///
693/// To maintain this stable mapping and track the lifetime of a column (e.g.
694/// when adding or dropping a column) we use `ColumnMetadata`. It maintains
695/// the index in [`SqlRelationType`] that corresponds to a given column, and the
696/// version at which this column was added or dropped.
697///
698#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash, MzReflect)]
699pub struct RelationDesc {
700    typ: SqlRelationType,
701    metadata: BTreeMap<ColumnIndex, ColumnMetadata>,
702}
703
704impl RustType<ProtoRelationDesc> for RelationDesc {
705    fn into_proto(&self) -> ProtoRelationDesc {
706        let (names, metadata): (Vec<_>, Vec<_>) = self
707            .metadata
708            .values()
709            .map(|meta| {
710                let metadata = ProtoColumnMetadata {
711                    added: Some(meta.added.into_proto()),
712                    dropped: meta.dropped.map(|v| v.into_proto()),
713                };
714                (meta.name.into_proto(), metadata)
715            })
716            .unzip();
717
718        // `metadata` Migration Logic: We wrote some `ProtoRelationDesc`s into Persist before the
719        // metadata field was added. To make sure our serialization roundtrips the same as before
720        // we added the field, we omit `metadata` if all of the values are equal to the default.
721        //
722        // Note: This logic needs to exist approximately forever.
723        let is_all_default_metadata = metadata.iter().all(|meta| {
724            meta.added == Some(RelationVersion::root().into_proto()) && meta.dropped == None
725        });
726        let metadata = if is_all_default_metadata {
727            Vec::new()
728        } else {
729            metadata
730        };
731
732        ProtoRelationDesc {
733            typ: Some(self.typ.into_proto()),
734            names,
735            metadata,
736        }
737    }
738
739    fn from_proto(proto: ProtoRelationDesc) -> Result<Self, TryFromProtoError> {
740        // `metadata` Migration Logic: We wrote some `ProtoRelationDesc`s into Persist before the
741        // metadata field was added. If the field doesn't exist we fill it in with default values,
742        // and when converting into_proto we omit these fields so the serialized bytes roundtrip.
743        //
744        // Note: This logic needs to exist approximately forever.
745        let proto_metadata: Box<dyn Iterator<Item = _>> = if proto.metadata.is_empty() {
746            let val = ProtoColumnMetadata {
747                added: Some(RelationVersion::root().into_proto()),
748                dropped: None,
749            };
750            Box::new(itertools::repeat_n(val, proto.names.len()))
751        } else {
752            Box::new(proto.metadata.into_iter())
753        };
754
755        let metadata = proto
756            .names
757            .into_iter()
758            .zip_eq(proto_metadata)
759            .enumerate()
760            .map(|(idx, (name, metadata))| {
761                let meta = ColumnMetadata {
762                    name: name.into_rust()?,
763                    typ_idx: idx,
764                    added: metadata.added.into_rust_if_some("ColumnMetadata::added")?,
765                    dropped: metadata.dropped.into_rust()?,
766                };
767                Ok::<_, TryFromProtoError>((ColumnIndex(idx), meta))
768            })
769            .collect::<Result<_, _>>()?;
770
771        Ok(RelationDesc {
772            typ: proto.typ.into_rust_if_some("ProtoRelationDesc::typ")?,
773            metadata,
774        })
775    }
776}
777
778impl RelationDesc {
779    /// Returns a [`RelationDescBuilder`] that can be used to construct a [`RelationDesc`].
780    pub fn builder() -> RelationDescBuilder {
781        RelationDescBuilder::default()
782    }
783
784    /// Constructs a new `RelationDesc` that represents the empty relation
785    /// with no columns and no keys.
786    pub fn empty() -> Self {
787        RelationDesc {
788            typ: SqlRelationType::empty(),
789            metadata: BTreeMap::default(),
790        }
791    }
792
793    /// Check if the `RelationDesc` is empty.
794    pub fn is_empty(&self) -> bool {
795        self == &Self::empty()
796    }
797
798    /// Returns the number of columns in this [`RelationDesc`].
799    pub fn len(&self) -> usize {
800        self.typ().column_types.len()
801    }
802
803    /// Constructs a new `RelationDesc` from a `SqlRelationType` and an iterator
804    /// over column names.
805    ///
806    /// # Panics
807    ///
808    /// Panics if the arity of the `SqlRelationType` is not equal to the number of
809    /// items in `names`.
810    pub fn new<I, N>(typ: SqlRelationType, names: I) -> Self
811    where
812        I: IntoIterator<Item = N>,
813        N: Into<ColumnName>,
814    {
815        let metadata: BTreeMap<_, _> = names
816            .into_iter()
817            .enumerate()
818            .map(|(idx, name)| {
819                let col_idx = ColumnIndex(idx);
820                let metadata = ColumnMetadata {
821                    name: name.into(),
822                    typ_idx: idx,
823                    added: RelationVersion::root(),
824                    dropped: None,
825                };
826                (col_idx, metadata)
827            })
828            .collect();
829
830        // TODO(parkmycar): Add better validation here.
831        assert_eq!(typ.column_types.len(), metadata.len());
832
833        RelationDesc { typ, metadata }
834    }
835
836    pub fn from_names_and_types<I, T, N>(iter: I) -> Self
837    where
838        I: IntoIterator<Item = (N, T)>,
839        T: Into<SqlColumnType>,
840        N: Into<ColumnName>,
841    {
842        let (names, types): (Vec<_>, Vec<_>) = iter.into_iter().unzip();
843        let types = types.into_iter().map(Into::into).collect();
844        let typ = SqlRelationType::new(types);
845        Self::new(typ, names)
846    }
847
848    /// Concatenates a `RelationDesc` onto the end of this `RelationDesc`.
849    ///
850    /// # Panics
851    ///
852    /// Panics if either `self` or `other` have columns that were added at a
853    /// [`RelationVersion`] other than [`RelationVersion::root`] or if any
854    /// columns were dropped.
855    ///
856    /// TODO(parkmycar): Move this method to [`RelationDescBuilder`].
857    pub fn concat(mut self, other: Self) -> Self {
858        let self_len = self.typ.column_types.len();
859
860        for (typ, (_col_idx, meta)) in other
861            .typ
862            .column_types
863            .into_iter()
864            .zip_eq(other.metadata.into_iter())
865        {
866            assert_eq!(meta.added, RelationVersion::root());
867            assert_none!(meta.dropped);
868
869            let new_idx = self.typ.columns().len();
870            let new_meta = ColumnMetadata {
871                name: meta.name,
872                typ_idx: new_idx,
873                added: RelationVersion::root(),
874                dropped: None,
875            };
876
877            self.typ.column_types.push(typ);
878            let prev = self.metadata.insert(ColumnIndex(new_idx), new_meta);
879
880            assert_eq!(self.metadata.len(), self.typ.columns().len());
881            assert_none!(prev);
882        }
883
884        for k in other.typ.keys {
885            let k = k.into_iter().map(|idx| idx + self_len).collect();
886            self = self.with_key(k);
887        }
888        self
889    }
890
891    /// Adds a new key for the relation.
892    pub fn with_key(mut self, indices: Vec<usize>) -> Self {
893        self.typ = self.typ.with_key(indices);
894        self
895    }
896
897    /// Drops all existing keys.
898    pub fn without_keys(mut self) -> Self {
899        self.typ.keys.clear();
900        self
901    }
902
903    /// Builds a new relation description with the column names replaced with
904    /// new names.
905    ///
906    /// # Panics
907    ///
908    /// Panics if the arity of the relation type does not match the number of
909    /// items in `names`.
910    pub fn with_names<I, N>(self, names: I) -> Self
911    where
912        I: IntoIterator<Item = N>,
913        N: Into<ColumnName>,
914    {
915        Self::new(self.typ, names)
916    }
917
918    /// Computes the number of columns in the relation.
919    pub fn arity(&self) -> usize {
920        self.typ.arity()
921    }
922
923    /// Returns the relation type underlying this relation description.
924    pub fn typ(&self) -> &SqlRelationType {
925        &self.typ
926    }
927
928    /// Returns the owned relation type underlying this relation description.
929    pub fn into_typ(self) -> SqlRelationType {
930        self.typ
931    }
932
933    /// Returns an iterator over the columns in this relation.
934    pub fn iter(&self) -> impl Iterator<Item = (&ColumnName, &SqlColumnType)> {
935        self.metadata.values().map(|meta| {
936            let typ = &self.typ.columns()[meta.typ_idx];
937            (&meta.name, typ)
938        })
939    }
940
941    /// Returns an iterator over the types of the columns in this relation.
942    pub fn iter_types(&self) -> impl Iterator<Item = &SqlColumnType> {
943        self.typ.column_types.iter()
944    }
945
946    /// Returns an iterator over the names of the columns in this relation.
947    pub fn iter_names(&self) -> impl Iterator<Item = &ColumnName> {
948        self.metadata.values().map(|meta| &meta.name)
949    }
950
951    /// Returns an iterator over the columns in this relation, with all their metadata.
952    pub fn iter_all(&self) -> impl Iterator<Item = (&ColumnIndex, &ColumnName, &SqlColumnType)> {
953        self.metadata.iter().map(|(col_idx, metadata)| {
954            let col_typ = &self.typ.columns()[metadata.typ_idx];
955            (col_idx, &metadata.name, col_typ)
956        })
957    }
958
959    /// Returns an iterator over the names of the columns in this relation that are "similar" to
960    /// the provided `name`.
961    pub fn iter_similar_names<'a>(
962        &'a self,
963        name: &'a ColumnName,
964    ) -> impl Iterator<Item = &'a ColumnName> {
965        self.iter_names().filter(|n| n.is_similar(name))
966    }
967
968    /// Returns whether this [`RelationDesc`] contains a column at the specified index.
969    pub fn contains_index(&self, idx: &ColumnIndex) -> bool {
970        self.metadata.contains_key(idx)
971    }
972
973    /// Finds a column by name.
974    ///
975    /// Returns the index and type of the column named `name`. If no column with
976    /// the specified name exists, returns `None`. If multiple columns have the
977    /// specified name, the leftmost column is returned.
978    pub fn get_by_name(&self, name: &ColumnName) -> Option<(usize, &SqlColumnType)> {
979        self.iter_names()
980            .position(|n| n == name)
981            .map(|i| (i, &self.typ.column_types[i]))
982    }
983
984    /// Gets the name of the `i`th column.
985    ///
986    /// # Panics
987    ///
988    /// Panics if `i` is not a valid column index.
989    ///
990    /// TODO(parkmycar): Migrate all uses of this to [`RelationDesc::get_name_idx`].
991    pub fn get_name(&self, i: usize) -> &ColumnName {
992        // TODO(parkmycar): Refactor this to use `ColumnIndex`.
993        self.get_name_idx(&ColumnIndex(i))
994    }
995
996    /// Gets the name of the column at `idx`.
997    ///
998    /// # Panics
999    ///
1000    /// Panics if no column exists at `idx`.
1001    pub fn get_name_idx(&self, idx: &ColumnIndex) -> &ColumnName {
1002        &self.metadata.get(idx).expect("should exist").name
1003    }
1004
1005    /// Mutably gets the name of the `i`th column.
1006    ///
1007    /// # Panics
1008    ///
1009    /// Panics if `i` is not a valid column index.
1010    pub fn get_name_mut(&mut self, i: usize) -> &mut ColumnName {
1011        // TODO(parkmycar): Refactor this to use `ColumnIndex`.
1012        &mut self
1013            .metadata
1014            .get_mut(&ColumnIndex(i))
1015            .expect("should exist")
1016            .name
1017    }
1018
1019    /// Gets the [`SqlColumnType`] of the column at `idx`.
1020    ///
1021    /// # Panics
1022    ///
1023    /// Panics if no column exists at `idx`.
1024    pub fn get_type(&self, idx: &ColumnIndex) -> &SqlColumnType {
1025        let typ_idx = self.metadata.get(idx).expect("should exist").typ_idx;
1026        &self.typ.column_types[typ_idx]
1027    }
1028
1029    /// Gets the name of the `i`th column if that column name is unambiguous.
1030    ///
1031    /// If at least one other column has the same name as the `i`th column,
1032    /// returns `None`. If the `i`th column has no name, returns `None`.
1033    ///
1034    /// # Panics
1035    ///
1036    /// Panics if `i` is not a valid column index.
1037    pub fn get_unambiguous_name(&self, i: usize) -> Option<&ColumnName> {
1038        let name = self.get_name(i);
1039        if self.iter_names().filter(|n| *n == name).count() == 1 {
1040            Some(name)
1041        } else {
1042            None
1043        }
1044    }
1045
1046    /// Verifies that `d` meets all of the constraints for the `i`th column of `self`.
1047    ///
1048    /// n.b. The only constraint MZ currently supports in NOT NULL, but this
1049    /// structure will be simple to extend.
1050    pub fn constraints_met(&self, i: usize, d: &Datum) -> Result<(), NotNullViolation> {
1051        let name = self.get_name(i);
1052        let typ = &self.typ.column_types[i];
1053        if d == &Datum::Null && !typ.nullable {
1054            Err(NotNullViolation(name.clone()))
1055        } else {
1056            Ok(())
1057        }
1058    }
1059
1060    /// Creates a new [`RelationDesc`] retaining only the columns specified in `demands`.
1061    pub fn apply_demand(&self, demands: &BTreeSet<usize>) -> RelationDesc {
1062        let mut new_desc = self.clone();
1063
1064        // Update ColumnMetadata.
1065        let mut removed = 0;
1066        new_desc.metadata.retain(|idx, metadata| {
1067            let retain = demands.contains(&idx.0);
1068            if !retain {
1069                removed += 1;
1070            } else {
1071                metadata.typ_idx -= removed;
1072            }
1073            retain
1074        });
1075
1076        // Update SqlColumnType.
1077        let mut idx = 0;
1078        new_desc.typ.column_types.retain(|_| {
1079            let keep = demands.contains(&idx);
1080            idx += 1;
1081            keep
1082        });
1083
1084        new_desc
1085    }
1086}
1087
1088impl Arbitrary for RelationDesc {
1089    type Parameters = ();
1090    type Strategy = BoxedStrategy<RelationDesc>;
1091
1092    fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy {
1093        let mut weights = vec![(100, Just(0..4)), (50, Just(4..8)), (25, Just(8..16))];
1094        if std::env::var("PROPTEST_LARGE_DATA").is_ok() {
1095            weights.extend([
1096                (12, Just(16..32)),
1097                (6, Just(32..64)),
1098                (3, Just(64..128)),
1099                (1, Just(128..256)),
1100            ]);
1101        }
1102        let num_columns = Union::new_weighted(weights);
1103
1104        num_columns.prop_flat_map(arb_relation_desc).boxed()
1105    }
1106}
1107
1108/// Returns a [`Strategy`] that generates an arbitrary [`RelationDesc`] with a number columns
1109/// within the range provided.
1110pub fn arb_relation_desc(num_cols: std::ops::Range<usize>) -> impl Strategy<Value = RelationDesc> {
1111    proptest::collection::btree_map(any::<ColumnName>(), any::<SqlColumnType>(), num_cols)
1112        .prop_map(RelationDesc::from_names_and_types)
1113}
1114
1115/// Returns a [`Strategy`] that generates a projection of the provided [`RelationDesc`].
1116pub fn arb_relation_desc_projection(desc: RelationDesc) -> impl Strategy<Value = RelationDesc> {
1117    let mask: Vec<_> = (0..desc.len()).map(|_| any::<bool>()).collect();
1118    mask.prop_map(move |mask| {
1119        let demands: BTreeSet<_> = mask
1120            .into_iter()
1121            .enumerate()
1122            .filter_map(|(idx, keep)| keep.then_some(idx))
1123            .collect();
1124        desc.apply_demand(&demands)
1125    })
1126}
1127
1128impl IntoIterator for RelationDesc {
1129    type Item = (ColumnName, SqlColumnType);
1130    type IntoIter = Box<dyn Iterator<Item = (ColumnName, SqlColumnType)>>;
1131
1132    fn into_iter(self) -> Self::IntoIter {
1133        let iter = self
1134            .metadata
1135            .into_values()
1136            .zip_eq(self.typ.column_types)
1137            .map(|(meta, typ)| (meta.name, typ));
1138        Box::new(iter)
1139    }
1140}
1141
1142/// Returns a [`Strategy`] that yields arbitrary [`Row`]s for the provided [`RelationDesc`].
1143pub fn arb_row_for_relation(desc: &RelationDesc) -> impl Strategy<Value = Row> + use<> {
1144    let datums: Vec<_> = desc
1145        .typ()
1146        .columns()
1147        .iter()
1148        .cloned()
1149        .map(arb_datum_for_column)
1150        .collect();
1151    datums.prop_map(|x| Row::pack(x.iter().map(Datum::from)))
1152}
1153
1154/// Expression violated not-null constraint on named column
1155#[derive(Debug, PartialEq, Eq)]
1156pub struct NotNullViolation(pub ColumnName);
1157
1158impl fmt::Display for NotNullViolation {
1159    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1160        write!(
1161            f,
1162            "null value in column {} violates not-null constraint",
1163            self.0.quoted()
1164        )
1165    }
1166}
1167
1168/// A builder for a [`RelationDesc`].
1169#[derive(Clone, Default, Debug, PartialEq, Eq)]
1170pub struct RelationDescBuilder {
1171    /// Columns of the relation.
1172    columns: Vec<(ColumnName, SqlColumnType)>,
1173    /// Sets of indices that are "keys" for the collection.
1174    keys: Vec<Vec<usize>>,
1175}
1176
1177impl RelationDescBuilder {
1178    /// Appends a column with the specified name and type.
1179    pub fn with_column<N: Into<ColumnName>>(
1180        mut self,
1181        name: N,
1182        ty: SqlColumnType,
1183    ) -> RelationDescBuilder {
1184        let name = name.into();
1185        self.columns.push((name, ty));
1186        self
1187    }
1188
1189    /// Appends the provided columns to the builder.
1190    pub fn with_columns<I, T, N>(mut self, iter: I) -> Self
1191    where
1192        I: IntoIterator<Item = (N, T)>,
1193        T: Into<SqlColumnType>,
1194        N: Into<ColumnName>,
1195    {
1196        self.columns
1197            .extend(iter.into_iter().map(|(name, ty)| (name.into(), ty.into())));
1198        self
1199    }
1200
1201    /// Adds a new key for the relation.
1202    pub fn with_key(mut self, mut indices: Vec<usize>) -> RelationDescBuilder {
1203        indices.sort_unstable();
1204        if !self.keys.contains(&indices) {
1205            self.keys.push(indices);
1206        }
1207        self
1208    }
1209
1210    /// Removes all previously inserted keys.
1211    pub fn without_keys(mut self) -> RelationDescBuilder {
1212        self.keys.clear();
1213        assert_eq!(self.keys.len(), 0);
1214        self
1215    }
1216
1217    /// Concatenates a [`RelationDescBuilder`] onto the end of this [`RelationDescBuilder`].
1218    pub fn concat(mut self, other: Self) -> Self {
1219        let self_len = self.columns.len();
1220
1221        self.columns.extend(other.columns);
1222        for k in other.keys {
1223            let k = k.into_iter().map(|idx| idx + self_len).collect();
1224            self = self.with_key(k);
1225        }
1226
1227        self
1228    }
1229
1230    /// Finish the builder, returning a [`RelationDesc`].
1231    pub fn finish(self) -> RelationDesc {
1232        let mut desc = RelationDesc::from_names_and_types(self.columns);
1233        desc.typ = desc.typ.with_keys(self.keys);
1234        desc
1235    }
1236}
1237
1238/// Describes a [`RelationDesc`] at a specific version of a [`VersionedRelationDesc`].
1239#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord, Serialize)]
1240pub enum RelationVersionSelector {
1241    Specific(RelationVersion),
1242    Latest,
1243}
1244
1245impl RelationVersionSelector {
1246    pub fn specific(version: u64) -> Self {
1247        RelationVersionSelector::Specific(RelationVersion(version))
1248    }
1249}
1250
1251/// A wrapper around [`RelationDesc`] that provides an interface for adding
1252/// columns and generating new versions.
1253///
1254/// TODO(parkmycar): Using an immutable data structure for RelationDesc would
1255/// be great.
1256#[derive(Debug, Clone, Serialize)]
1257pub struct VersionedRelationDesc {
1258    inner: RelationDesc,
1259}
1260
1261impl VersionedRelationDesc {
1262    pub fn new(inner: RelationDesc) -> Self {
1263        VersionedRelationDesc { inner }
1264    }
1265
1266    /// Adds a new column to this [`RelationDesc`], creating a new version of the [`RelationDesc`].
1267    ///
1268    /// # Panics
1269    ///
1270    /// * Panics if a column with `name` already exists that hasn't been dropped.
1271    ///
1272    /// Note: For building a [`RelationDesc`] see [`RelationDescBuilder::with_column`].
1273    #[must_use]
1274    pub fn add_column<N, T>(&mut self, name: N, typ: T) -> RelationVersion
1275    where
1276        N: Into<ColumnName>,
1277        T: Into<SqlColumnType>,
1278    {
1279        let latest_version = self.latest_version();
1280        let new_version = latest_version.bump();
1281
1282        let name = name.into();
1283        let existing = self
1284            .inner
1285            .metadata
1286            .iter()
1287            .find(|(_, meta)| meta.name == name && meta.dropped.is_none());
1288        if let Some(existing) = existing {
1289            panic!("column named '{name}' already exists! {existing:?}");
1290        }
1291
1292        let next_idx = self.inner.metadata.len();
1293        let col_meta = ColumnMetadata {
1294            name,
1295            typ_idx: next_idx,
1296            added: new_version,
1297            dropped: None,
1298        };
1299
1300        self.inner.typ.column_types.push(typ.into());
1301        let prev = self.inner.metadata.insert(ColumnIndex(next_idx), col_meta);
1302
1303        assert_none!(prev, "column index overlap!");
1304        self.validate();
1305
1306        new_version
1307    }
1308
1309    /// Drops the column `name` from this [`RelationDesc`]. If there are multiple columns with
1310    /// `name` drops the left-most one that hasn't already been dropped.
1311    ///
1312    /// TODO(parkmycar): Add handling for dropping a column that is currently used as a key.
1313    ///
1314    /// # Panics
1315    ///
1316    /// Panics if a column with `name` does not exist or the dropped column was used as a key.
1317    #[must_use]
1318    pub fn drop_column<N>(&mut self, name: N) -> RelationVersion
1319    where
1320        N: Into<ColumnName>,
1321    {
1322        let name = name.into();
1323        let latest_version = self.latest_version();
1324        let new_version = latest_version.bump();
1325
1326        let col = self
1327            .inner
1328            .metadata
1329            .values_mut()
1330            .find(|meta| meta.name == name && meta.dropped.is_none())
1331            .expect("column to exist");
1332
1333        // Make sure the column hadn't been previously dropped.
1334        assert_none!(col.dropped, "column was already dropped");
1335        col.dropped = Some(new_version);
1336
1337        // Make sure the column isn't being used as a key.
1338        let dropped_key = self
1339            .inner
1340            .typ
1341            .keys
1342            .iter()
1343            .any(|keys| keys.contains(&col.typ_idx));
1344        assert!(!dropped_key, "column being dropped was used as a key");
1345
1346        self.validate();
1347        new_version
1348    }
1349
1350    /// Returns the [`RelationDesc`] at the latest version.
1351    pub fn latest(&self) -> RelationDesc {
1352        self.inner.clone()
1353    }
1354
1355    /// Returns this [`RelationDesc`] at the specified version.
1356    pub fn at_version(&self, version: RelationVersionSelector) -> RelationDesc {
1357        // Get all of the changes from the start, up to whatever version was requested.
1358        //
1359        // TODO(parkmycar): We should probably panic on unknown verisons?
1360        let up_to_version = match version {
1361            RelationVersionSelector::Latest => RelationVersion(u64::MAX),
1362            RelationVersionSelector::Specific(v) => v,
1363        };
1364
1365        let valid_columns = self.inner.metadata.iter().filter(|(_col_idx, meta)| {
1366            let added = meta.added <= up_to_version;
1367            let dropped = meta
1368                .dropped
1369                .map(|dropped_at| up_to_version >= dropped_at)
1370                .unwrap_or(false);
1371
1372            added && !dropped
1373        });
1374
1375        let mut column_types = Vec::new();
1376        let mut column_metas = BTreeMap::new();
1377
1378        // N.B. At this point we need to be careful because col_idx might not
1379        // equal typ_idx.
1380        //
1381        // For example, consider columns "a", "b", and "c" with indexes 0, 1,
1382        // and 2. If we drop column "b" then we'll have "a" and "c" with column
1383        // indexes 0 and 2, but their indices in SqlRelationType will be 0 and 1.
1384        for (col_idx, meta) in valid_columns {
1385            let new_meta = ColumnMetadata {
1386                name: meta.name.clone(),
1387                typ_idx: column_types.len(),
1388                added: meta.added.clone(),
1389                dropped: meta.dropped.clone(),
1390            };
1391            column_types.push(self.inner.typ.columns()[meta.typ_idx].clone());
1392            column_metas.insert(*col_idx, new_meta);
1393        }
1394
1395        // Remap keys in case a column with an index less than that of a key was
1396        // dropped.
1397        //
1398        // For example, consider columns "a", "b", and "c" where "a" and "c" are
1399        // keys and "b" was dropped.
1400        let keys = self
1401            .inner
1402            .typ
1403            .keys
1404            .iter()
1405            .map(|keys| {
1406                keys.iter()
1407                    .map(|key_idx| {
1408                        let metadata = column_metas
1409                            .get(&ColumnIndex(*key_idx))
1410                            .expect("found key for column that doesn't exist");
1411                        metadata.typ_idx
1412                    })
1413                    .collect()
1414            })
1415            .collect();
1416
1417        let relation_type = SqlRelationType { column_types, keys };
1418
1419        RelationDesc {
1420            typ: relation_type,
1421            metadata: column_metas,
1422        }
1423    }
1424
1425    pub fn latest_version(&self) -> RelationVersion {
1426        self.inner
1427            .metadata
1428            .values()
1429            // N.B. Dropped is always greater than added.
1430            .map(|meta| meta.dropped.unwrap_or(meta.added))
1431            .max()
1432            // If there aren't any columns we're implicitly the root version.
1433            .unwrap_or_else(RelationVersion::root)
1434    }
1435
1436    /// Validates internal contraints of the [`RelationDesc`] are correct.
1437    ///
1438    /// # Panics
1439    ///
1440    /// Panics if a constraint is not satisfied.
1441    fn validate(&self) {
1442        fn validate_inner(desc: &RelationDesc) -> Result<(), anyhow::Error> {
1443            if desc.typ.column_types.len() != desc.metadata.len() {
1444                anyhow::bail!("mismatch between number of types and metadatas");
1445            }
1446
1447            for (col_idx, meta) in &desc.metadata {
1448                if col_idx.0 > desc.metadata.len() {
1449                    anyhow::bail!("column index out of bounds");
1450                }
1451                if meta.added >= meta.dropped.unwrap_or(RelationVersion(u64::MAX)) {
1452                    anyhow::bail!("column was added after it was dropped?");
1453                }
1454                if desc.typ().columns().get(meta.typ_idx).is_none() {
1455                    anyhow::bail!("typ_idx incorrect");
1456                }
1457            }
1458
1459            for keys in &desc.typ.keys {
1460                for key in keys {
1461                    if *key >= desc.typ.column_types.len() {
1462                        anyhow::bail!("key index was out of bounds!");
1463                    }
1464                }
1465            }
1466
1467            let versions = desc
1468                .metadata
1469                .values()
1470                .map(|meta| meta.dropped.unwrap_or(meta.added));
1471            let mut max = 0;
1472            let mut sum = 0;
1473            for version in versions {
1474                max = std::cmp::max(max, version.0);
1475                sum += version.0;
1476            }
1477
1478            // Other than RelationVersion(0), we should never have duplicate
1479            // versions and they should always increase by 1. In other words, the
1480            // sum of all RelationVersions should be the sum of [0, max].
1481            //
1482            // N.B. n * (n + 1) / 2 = sum of [0, n]
1483            //
1484            // While I normally don't like tricks like this, it allows us to
1485            // validate that our column versions are correct in O(n) time and
1486            // without allocations.
1487            if sum != (max * (max + 1) / 2) {
1488                anyhow::bail!("there is a duplicate or missing relation version");
1489            }
1490
1491            Ok(())
1492        }
1493
1494        assert_ok!(validate_inner(&self.inner), "validate failed! {self:?}");
1495    }
1496}
1497
1498/// Diffs that can be generated proptest and applied to a [`RelationDesc`] to
1499/// exercise schema migrations.
1500#[derive(Debug)]
1501pub enum PropRelationDescDiff {
1502    AddColumn {
1503        name: ColumnName,
1504        typ: SqlColumnType,
1505    },
1506    DropColumn {
1507        name: ColumnName,
1508    },
1509    ToggleNullability {
1510        name: ColumnName,
1511    },
1512    ChangeType {
1513        name: ColumnName,
1514        typ: SqlColumnType,
1515    },
1516}
1517
1518impl PropRelationDescDiff {
1519    pub fn apply(self, desc: &mut RelationDesc) {
1520        match self {
1521            PropRelationDescDiff::AddColumn { name, typ } => {
1522                let new_idx = desc.metadata.len();
1523                let meta = ColumnMetadata {
1524                    name,
1525                    typ_idx: new_idx,
1526                    added: RelationVersion(0),
1527                    dropped: None,
1528                };
1529                let prev = desc.metadata.insert(ColumnIndex(new_idx), meta);
1530                desc.typ.column_types.push(typ);
1531
1532                assert_none!(prev);
1533                assert_eq!(desc.metadata.len(), desc.typ.column_types.len());
1534            }
1535            PropRelationDescDiff::DropColumn { name } => {
1536                let next_version = desc
1537                    .metadata
1538                    .values()
1539                    .map(|meta| meta.dropped.unwrap_or(meta.added))
1540                    .max()
1541                    .unwrap_or_else(RelationVersion::root)
1542                    .bump();
1543                let Some(metadata) = desc.metadata.values_mut().find(|meta| meta.name == name)
1544                else {
1545                    return;
1546                };
1547                if metadata.dropped.is_none() {
1548                    metadata.dropped = Some(next_version);
1549                }
1550            }
1551            PropRelationDescDiff::ToggleNullability { name } => {
1552                let Some((pos, _)) = desc.get_by_name(&name) else {
1553                    return;
1554                };
1555                let col_type = desc
1556                    .typ
1557                    .column_types
1558                    .get_mut(pos)
1559                    .expect("ColumnNames and SqlColumnTypes out of sync!");
1560                col_type.nullable = !col_type.nullable;
1561            }
1562            PropRelationDescDiff::ChangeType { name, typ } => {
1563                let Some((pos, _)) = desc.get_by_name(&name) else {
1564                    return;
1565                };
1566                let col_type = desc
1567                    .typ
1568                    .column_types
1569                    .get_mut(pos)
1570                    .expect("ColumnNames and SqlColumnTypes out of sync!");
1571                *col_type = typ;
1572            }
1573        }
1574    }
1575}
1576
1577/// Generates a set of [`PropRelationDescDiff`]s based on some source [`RelationDesc`].
1578pub fn arb_relation_desc_diff(
1579    source: &RelationDesc,
1580) -> impl Strategy<Value = Vec<PropRelationDescDiff>> + use<> {
1581    let source = Rc::new(source.clone());
1582    let num_source_columns = source.typ.columns().len();
1583
1584    let num_add_columns = Union::new_weighted(vec![(100, Just(0..8)), (1, Just(8..64))]);
1585    let add_columns_strat = num_add_columns
1586        .prop_flat_map(|num_columns| {
1587            proptest::collection::vec((any::<ColumnName>(), any::<SqlColumnType>()), num_columns)
1588        })
1589        .prop_map(|cols| {
1590            cols.into_iter()
1591                .map(|(name, typ)| PropRelationDescDiff::AddColumn { name, typ })
1592                .collect::<Vec<_>>()
1593        });
1594
1595    // If the source RelationDesc is empty there is nothing else to do.
1596    if num_source_columns == 0 {
1597        return add_columns_strat.boxed();
1598    }
1599
1600    let source_ = Rc::clone(&source);
1601    let drop_columns_strat = (0..num_source_columns).prop_perturb(move |num_columns, mut rng| {
1602        let mut set = BTreeSet::default();
1603        for _ in 0..num_columns {
1604            let col_idx = rng.random_range(0..num_source_columns);
1605            set.insert(source_.get_name(col_idx).clone());
1606        }
1607        set.into_iter()
1608            .map(|name| PropRelationDescDiff::DropColumn { name })
1609            .collect::<Vec<_>>()
1610    });
1611
1612    let source_ = Rc::clone(&source);
1613    let toggle_nullability_strat =
1614        (0..num_source_columns).prop_perturb(move |num_columns, mut rng| {
1615            let mut set = BTreeSet::default();
1616            for _ in 0..num_columns {
1617                let col_idx = rng.random_range(0..num_source_columns);
1618                set.insert(source_.get_name(col_idx).clone());
1619            }
1620            set.into_iter()
1621                .map(|name| PropRelationDescDiff::ToggleNullability { name })
1622                .collect::<Vec<_>>()
1623        });
1624
1625    let source_ = Rc::clone(&source);
1626    let change_type_strat = (0..num_source_columns)
1627        .prop_perturb(move |num_columns, mut rng| {
1628            let mut set = BTreeSet::default();
1629            for _ in 0..num_columns {
1630                let col_idx = rng.random_range(0..num_source_columns);
1631                set.insert(source_.get_name(col_idx).clone());
1632            }
1633            set
1634        })
1635        .prop_flat_map(|cols| {
1636            proptest::collection::vec(any::<SqlColumnType>(), cols.len())
1637                .prop_map(move |types| (cols.clone(), types))
1638        })
1639        .prop_map(|(cols, types)| {
1640            cols.into_iter()
1641                .zip_eq(types)
1642                .map(|(name, typ)| PropRelationDescDiff::ChangeType { name, typ })
1643                .collect::<Vec<_>>()
1644        });
1645
1646    (
1647        add_columns_strat,
1648        drop_columns_strat,
1649        toggle_nullability_strat,
1650        change_type_strat,
1651    )
1652        .prop_map(|(adds, drops, toggles, changes)| {
1653            adds.into_iter()
1654                .chain(drops)
1655                .chain(toggles)
1656                .chain(changes)
1657                .collect::<Vec<_>>()
1658        })
1659        .prop_shuffle()
1660        .boxed()
1661}
1662
1663#[cfg(test)]
1664mod tests {
1665    use super::*;
1666    use prost::Message;
1667
1668    #[mz_ore::test]
1669    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `pipe2` on OS `linux`
1670    fn smoktest_at_version() {
1671        let desc = RelationDesc::builder()
1672            .with_column("a", SqlScalarType::Bool.nullable(true))
1673            .with_column("z", SqlScalarType::String.nullable(false))
1674            .finish();
1675
1676        let mut versioned_desc = VersionedRelationDesc {
1677            inner: desc.clone(),
1678        };
1679        versioned_desc.validate();
1680
1681        let latest = versioned_desc.at_version(RelationVersionSelector::Latest);
1682        assert_eq!(desc, latest);
1683
1684        let v0 = versioned_desc.at_version(RelationVersionSelector::specific(0));
1685        assert_eq!(desc, v0);
1686
1687        let v3 = versioned_desc.at_version(RelationVersionSelector::specific(3));
1688        assert_eq!(desc, v3);
1689
1690        let v1 = versioned_desc.add_column("b", SqlScalarType::Bytes.nullable(false));
1691        assert_eq!(v1, RelationVersion(1));
1692
1693        let v1 = versioned_desc.at_version(RelationVersionSelector::Specific(v1));
1694        insta::assert_json_snapshot!(v1.metadata, @r###"
1695        {
1696          "0": {
1697            "name": "a",
1698            "typ_idx": 0,
1699            "added": 0,
1700            "dropped": null
1701          },
1702          "1": {
1703            "name": "z",
1704            "typ_idx": 1,
1705            "added": 0,
1706            "dropped": null
1707          },
1708          "2": {
1709            "name": "b",
1710            "typ_idx": 2,
1711            "added": 1,
1712            "dropped": null
1713          }
1714        }
1715        "###);
1716
1717        // Check that V0 doesn't show the new column.
1718        let v0_b = versioned_desc.at_version(RelationVersionSelector::specific(0));
1719        assert!(v0.iter().eq(v0_b.iter()));
1720
1721        let v2 = versioned_desc.drop_column("z");
1722        assert_eq!(v2, RelationVersion(2));
1723
1724        let v2 = versioned_desc.at_version(RelationVersionSelector::Specific(v2));
1725        insta::assert_json_snapshot!(v2.metadata, @r###"
1726        {
1727          "0": {
1728            "name": "a",
1729            "typ_idx": 0,
1730            "added": 0,
1731            "dropped": null
1732          },
1733          "2": {
1734            "name": "b",
1735            "typ_idx": 1,
1736            "added": 1,
1737            "dropped": null
1738          }
1739        }
1740        "###);
1741
1742        // Check that V0 and V1 are still correct.
1743        let v0_c = versioned_desc.at_version(RelationVersionSelector::specific(0));
1744        assert!(v0.iter().eq(v0_c.iter()));
1745
1746        let v1_b = versioned_desc.at_version(RelationVersionSelector::specific(1));
1747        assert!(v1.iter().eq(v1_b.iter()));
1748
1749        insta::assert_json_snapshot!(versioned_desc.inner.metadata, @r###"
1750        {
1751          "0": {
1752            "name": "a",
1753            "typ_idx": 0,
1754            "added": 0,
1755            "dropped": null
1756          },
1757          "1": {
1758            "name": "z",
1759            "typ_idx": 1,
1760            "added": 0,
1761            "dropped": 2
1762          },
1763          "2": {
1764            "name": "b",
1765            "typ_idx": 2,
1766            "added": 1,
1767            "dropped": null
1768          }
1769        }
1770        "###);
1771    }
1772
1773    #[mz_ore::test]
1774    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `pipe2` on OS `linux`
1775    fn test_dropping_columns_with_keys() {
1776        let desc = RelationDesc::builder()
1777            .with_column("a", SqlScalarType::Bool.nullable(true))
1778            .with_column("z", SqlScalarType::String.nullable(false))
1779            .with_key(vec![1])
1780            .finish();
1781
1782        let mut versioned_desc = VersionedRelationDesc {
1783            inner: desc.clone(),
1784        };
1785        versioned_desc.validate();
1786
1787        let v1 = versioned_desc.drop_column("a");
1788        assert_eq!(v1, RelationVersion(1));
1789
1790        // Make sure the key index for 'z' got remapped since 'a' was dropped.
1791        let v1 = versioned_desc.at_version(RelationVersionSelector::Specific(v1));
1792        insta::assert_json_snapshot!(v1, @r###"
1793        {
1794          "typ": {
1795            "column_types": [
1796              {
1797                "scalar_type": "String",
1798                "nullable": false
1799              }
1800            ],
1801            "keys": [
1802              [
1803                0
1804              ]
1805            ]
1806          },
1807          "metadata": {
1808            "1": {
1809              "name": "z",
1810              "typ_idx": 0,
1811              "added": 0,
1812              "dropped": null
1813            }
1814          }
1815        }
1816        "###);
1817
1818        // Make sure the key index of 'z' is correct when all columns are present.
1819        let v0 = versioned_desc.at_version(RelationVersionSelector::specific(0));
1820        insta::assert_json_snapshot!(v0, @r###"
1821        {
1822          "typ": {
1823            "column_types": [
1824              {
1825                "scalar_type": "Bool",
1826                "nullable": true
1827              },
1828              {
1829                "scalar_type": "String",
1830                "nullable": false
1831              }
1832            ],
1833            "keys": [
1834              [
1835                1
1836              ]
1837            ]
1838          },
1839          "metadata": {
1840            "0": {
1841              "name": "a",
1842              "typ_idx": 0,
1843              "added": 0,
1844              "dropped": 1
1845            },
1846            "1": {
1847              "name": "z",
1848              "typ_idx": 1,
1849              "added": 0,
1850              "dropped": null
1851            }
1852          }
1853        }
1854        "###);
1855    }
1856
1857    #[mz_ore::test]
1858    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `pipe2` on OS `linux`
1859    fn roundtrip_relation_desc_without_metadata() {
1860        let typ = ProtoRelationType {
1861            column_types: vec![
1862                SqlScalarType::String.nullable(false).into_proto(),
1863                SqlScalarType::Bool.nullable(true).into_proto(),
1864            ],
1865            keys: vec![],
1866        };
1867        let proto = ProtoRelationDesc {
1868            typ: Some(typ),
1869            names: vec![
1870                ColumnName("a".into()).into_proto(),
1871                ColumnName("b".into()).into_proto(),
1872            ],
1873            metadata: vec![],
1874        };
1875        let desc: RelationDesc = proto.into_rust().unwrap();
1876
1877        insta::assert_json_snapshot!(desc, @r###"
1878        {
1879          "typ": {
1880            "column_types": [
1881              {
1882                "scalar_type": "String",
1883                "nullable": false
1884              },
1885              {
1886                "scalar_type": "Bool",
1887                "nullable": true
1888              }
1889            ],
1890            "keys": []
1891          },
1892          "metadata": {
1893            "0": {
1894              "name": "a",
1895              "typ_idx": 0,
1896              "added": 0,
1897              "dropped": null
1898            },
1899            "1": {
1900              "name": "b",
1901              "typ_idx": 1,
1902              "added": 0,
1903              "dropped": null
1904            }
1905          }
1906        }
1907        "###);
1908    }
1909
1910    #[mz_ore::test]
1911    #[should_panic(expected = "column named 'a' already exists!")]
1912    fn test_add_column_with_same_name_panics() {
1913        let desc = RelationDesc::builder()
1914            .with_column("a", SqlScalarType::Bool.nullable(true))
1915            .finish();
1916        let mut versioned = VersionedRelationDesc::new(desc);
1917
1918        let _ = versioned.add_column("a", SqlScalarType::String.nullable(false));
1919    }
1920
1921    #[mz_ore::test]
1922    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `pipe2` on OS `linux`
1923    fn test_add_column_with_same_name_prev_dropped() {
1924        let desc = RelationDesc::builder()
1925            .with_column("a", SqlScalarType::Bool.nullable(true))
1926            .finish();
1927        let mut versioned = VersionedRelationDesc::new(desc);
1928
1929        let v1 = versioned.drop_column("a");
1930        let v1 = versioned.at_version(RelationVersionSelector::Specific(v1));
1931        insta::assert_json_snapshot!(v1, @r###"
1932        {
1933          "typ": {
1934            "column_types": [],
1935            "keys": []
1936          },
1937          "metadata": {}
1938        }
1939        "###);
1940
1941        let v2 = versioned.add_column("a", SqlScalarType::String.nullable(false));
1942        let v2 = versioned.at_version(RelationVersionSelector::Specific(v2));
1943        insta::assert_json_snapshot!(v2, @r###"
1944        {
1945          "typ": {
1946            "column_types": [
1947              {
1948                "scalar_type": "String",
1949                "nullable": false
1950              }
1951            ],
1952            "keys": []
1953          },
1954          "metadata": {
1955            "1": {
1956              "name": "a",
1957              "typ_idx": 0,
1958              "added": 2,
1959              "dropped": null
1960            }
1961          }
1962        }
1963        "###);
1964    }
1965
1966    #[mz_ore::test]
1967    #[cfg_attr(miri, ignore)]
1968    fn apply_demand() {
1969        let desc = RelationDesc::builder()
1970            .with_column("a", SqlScalarType::String.nullable(true))
1971            .with_column("b", SqlScalarType::Int64.nullable(false))
1972            .with_column("c", SqlScalarType::Time.nullable(false))
1973            .finish();
1974        let desc = desc.apply_demand(&BTreeSet::from([0, 2]));
1975        assert_eq!(desc.arity(), 2);
1976        // TODO(parkmycar): Move validate onto RelationDesc.
1977        VersionedRelationDesc::new(desc).validate();
1978    }
1979
1980    #[mz_ore::test]
1981    #[cfg_attr(miri, ignore)]
1982    fn smoketest_column_index_stable_ident() {
1983        let idx_a = ColumnIndex(42);
1984        // Note(parkmycar): This should never change.
1985        assert_eq!(idx_a.to_stable_name(), "42");
1986    }
1987
1988    #[mz_ore::test]
1989    #[cfg_attr(miri, ignore)] // too slow
1990    fn proptest_relation_desc_roundtrips() {
1991        fn testcase(og: RelationDesc) {
1992            let bytes = og.into_proto().encode_to_vec();
1993            let proto = ProtoRelationDesc::decode(&bytes[..]).unwrap();
1994            let rnd = RelationDesc::from_proto(proto).unwrap();
1995
1996            assert_eq!(og, rnd);
1997        }
1998
1999        proptest!(|(desc in any::<RelationDesc>())| {
2000            testcase(desc);
2001        });
2002
2003        let strat = any::<RelationDesc>().prop_flat_map(|desc| {
2004            arb_relation_desc_diff(&desc).prop_map(move |diffs| (desc.clone(), diffs))
2005        });
2006
2007        proptest!(|((mut desc, diffs) in strat)| {
2008            for diff in diffs {
2009                diff.apply(&mut desc);
2010            };
2011            testcase(desc);
2012        });
2013    }
2014}