1use std::collections::{BTreeMap, BTreeSet};
11use std::rc::Rc;
12use std::{fmt, vec};
13
14use anyhow::bail;
15use itertools::Itertools;
16use mz_lowertest::MzReflect;
17use mz_ore::cast::CastFrom;
18use mz_ore::str::StrExt;
19use mz_ore::{assert_none, assert_ok};
20use mz_persist_types::schema::SchemaId;
21use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
22use proptest::prelude::*;
23use proptest::strategy::{Strategy, Union};
24use proptest_derive::Arbitrary;
25use serde::{Deserialize, Serialize};
26use timely::Container;
27
28use crate::relation_and_scalar::proto_relation_type::ProtoKey;
29pub use crate::relation_and_scalar::{
30 ProtoColumnMetadata, ProtoColumnName, ProtoColumnType, ProtoRelationDesc, ProtoRelationType,
31 ProtoRelationVersion,
32};
33use crate::{Datum, Row, ScalarType, arb_datum_for_column};
34
/// The type of a column: a [`ScalarType`] together with a flag recording
/// whether the column may hold `NULL` values.
#[derive(
    Arbitrary, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash, MzReflect,
)]
pub struct ColumnType {
    /// The underlying scalar type of this column.
    pub scalar_type: ScalarType,
    /// Whether this column may contain `NULL`.
    // When the field is absent during deserialization it defaults to `true`
    // (see `return_true`).
    #[serde(default = "return_true")]
    pub nullable: bool,
}
52
/// Always returns `true`.
///
/// Exists so it can be named in `#[serde(default = "return_true")]` on
/// [`ColumnType::nullable`]; serde's `default = "..."` attribute takes the
/// path of a function rather than a literal value.
#[inline(always)]
fn return_true() -> bool {
    true
}
62
63impl ColumnType {
64 pub fn union(&self, other: &Self) -> Result<Self, anyhow::Error> {
65 match (self.scalar_type.clone(), other.scalar_type.clone()) {
66 (scalar_type, other_scalar_type) if scalar_type == other_scalar_type => {
67 Ok(ColumnType {
68 scalar_type,
69 nullable: self.nullable || other.nullable,
70 })
71 }
72 (scalar_type, other_scalar_type) if scalar_type.base_eq(&other_scalar_type) => {
73 Ok(ColumnType {
74 scalar_type: scalar_type.without_modifiers(),
75 nullable: self.nullable || other.nullable,
76 })
77 }
78 (
79 ScalarType::Record { fields, custom_id },
80 ScalarType::Record {
81 fields: other_fields,
82 custom_id: other_custom_id,
83 },
84 ) => {
85 if custom_id != other_custom_id {
86 bail!(
87 "Can't union types: {:?} and {:?}",
88 self.scalar_type,
89 other.scalar_type
90 );
91 };
92
93 let mut union_fields: Vec<(ColumnName, ColumnType)> = vec![];
94 for (field, other_field) in fields.iter().zip(other_fields.iter()) {
95 if field.0 != other_field.0 {
96 bail!(
97 "Can't union types: {:?} and {:?}",
98 self.scalar_type,
99 other.scalar_type
100 );
101 } else {
102 let union_column_type = field.1.union(&other_field.1)?;
103 union_fields.push((field.0.clone(), union_column_type));
104 };
105 }
106
107 Ok(ColumnType {
108 scalar_type: ScalarType::Record {
109 fields: union_fields.into(),
110 custom_id,
111 },
112 nullable: self.nullable || other.nullable,
113 })
114 }
115 _ => bail!(
116 "Can't union types: {:?} and {:?}",
117 self.scalar_type,
118 other.scalar_type
119 ),
120 }
121 }
122
123 pub fn nullable(mut self, nullable: bool) -> Self {
126 self.nullable = nullable;
127 self
128 }
129}
130
131impl RustType<ProtoColumnType> for ColumnType {
132 fn into_proto(&self) -> ProtoColumnType {
133 ProtoColumnType {
134 nullable: self.nullable,
135 scalar_type: Some(self.scalar_type.into_proto()),
136 }
137 }
138
139 fn from_proto(proto: ProtoColumnType) -> Result<Self, TryFromProtoError> {
140 Ok(ColumnType {
141 nullable: proto.nullable,
142 scalar_type: proto
143 .scalar_type
144 .into_rust_if_some("ProtoColumnType::scalar_type")?,
145 })
146 }
147}
148
149impl fmt::Display for ColumnType {
150 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
151 let nullable = if self.nullable { "Null" } else { "NotNull" };
152 f.write_fmt(format_args!("{:?}:{}", self.scalar_type, nullable))
153 }
154}
155
/// The type of a relation: the types of its columns plus its keys.
#[derive(
    Arbitrary, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash, MzReflect,
)]
pub struct RelationType {
    /// The type of each column, in column order.
    pub column_types: Vec<ColumnType>,
    /// Keys of the relation, each a list of column positions.
    ///
    /// Keys added through `with_key` are sorted and deduplicated. Presumably
    /// each key marks a set of columns whose values are unique — confirm with
    /// consumers. Missing on deserialization means "no keys".
    #[serde(default)]
    pub keys: Vec<Vec<usize>>,
}
175
176impl RelationType {
177 pub fn empty() -> Self {
180 RelationType::new(vec![])
181 }
182
183 pub fn new(column_types: Vec<ColumnType>) -> Self {
187 RelationType {
188 column_types,
189 keys: Vec::new(),
190 }
191 }
192
193 pub fn with_key(mut self, mut indices: Vec<usize>) -> Self {
195 indices.sort_unstable();
196 if !self.keys.contains(&indices) {
197 self.keys.push(indices);
198 }
199 self
200 }
201
202 pub fn with_keys(mut self, keys: Vec<Vec<usize>>) -> Self {
203 for key in keys {
204 self = self.with_key(key)
205 }
206 self
207 }
208
209 pub fn arity(&self) -> usize {
211 self.column_types.len()
212 }
213
214 pub fn default_key(&self) -> Vec<usize> {
216 if let Some(key) = self.keys.first() {
217 if key.is_empty() {
218 (0..self.column_types.len()).collect()
219 } else {
220 key.clone()
221 }
222 } else {
223 (0..self.column_types.len()).collect()
224 }
225 }
226
227 pub fn columns(&self) -> &[ColumnType] {
229 &self.column_types
230 }
231}
232
233impl RustType<ProtoRelationType> for RelationType {
234 fn into_proto(&self) -> ProtoRelationType {
235 ProtoRelationType {
236 column_types: self.column_types.into_proto(),
237 keys: self.keys.into_proto(),
238 }
239 }
240
241 fn from_proto(proto: ProtoRelationType) -> Result<Self, TryFromProtoError> {
242 Ok(RelationType {
243 column_types: proto.column_types.into_rust()?,
244 keys: proto.keys.into_rust()?,
245 })
246 }
247}
248
249impl RustType<ProtoKey> for Vec<usize> {
250 fn into_proto(&self) -> ProtoKey {
251 ProtoKey {
252 keys: self.into_proto(),
253 }
254 }
255
256 fn from_proto(proto: ProtoKey) -> Result<Self, TryFromProtoError> {
257 proto.keys.into_rust()
258 }
259}
260
/// The name of a column in a [`RelationDesc`].
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash, MzReflect)]
pub struct ColumnName(pub(crate) String);
264
265impl ColumnName {
266 pub fn as_str(&self) -> &str {
268 &self.0
269 }
270
271 pub fn as_mut_str(&mut self) -> &mut String {
273 &mut self.0
274 }
275
276 pub fn is_similar(&self, other: &ColumnName) -> bool {
278 const SIMILARITY_THRESHOLD: f64 = 0.6;
279
280 let a_lowercase = self.0.to_lowercase();
281 let b_lowercase = other.as_str().to_lowercase();
282
283 strsim::normalized_levenshtein(&a_lowercase, &b_lowercase) >= SIMILARITY_THRESHOLD
284 }
285}
286
287impl fmt::Display for ColumnName {
288 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
289 f.write_str(&self.0)
290 }
291}
292
293impl From<String> for ColumnName {
294 fn from(s: String) -> ColumnName {
295 ColumnName(s)
296 }
297}
298
299impl From<&str> for ColumnName {
300 fn from(s: &str) -> ColumnName {
301 ColumnName(s.into())
302 }
303}
304
305impl From<&ColumnName> for ColumnName {
306 fn from(n: &ColumnName) -> ColumnName {
307 n.clone()
308 }
309}
310
311impl RustType<ProtoColumnName> for ColumnName {
312 fn into_proto(&self) -> ProtoColumnName {
313 ProtoColumnName {
314 value: Some(self.0.clone()),
315 }
316 }
317
318 fn from_proto(proto: ProtoColumnName) -> Result<Self, TryFromProtoError> {
319 Ok(ColumnName(proto.value.ok_or_else(|| {
320 TryFromProtoError::missing_field("ProtoColumnName::value")
321 })?))
322 }
323}
324
325impl From<ColumnName> for mz_sql_parser::ast::Ident {
326 fn from(value: ColumnName) -> Self {
327 mz_sql_parser::ast::Ident::new_unchecked(value.0)
329 }
330}
331
332impl proptest::arbitrary::Arbitrary for ColumnName {
333 type Parameters = ();
334 type Strategy = BoxedStrategy<ColumnName>;
335
336 fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy {
337 let mut weights = vec![(50, Just(1..8)), (20, Just(8..16))];
340 if std::env::var("PROPTEST_LARGE_DATA").is_ok() {
341 weights.extend([
342 (5, Just(16..128)),
343 (1, Just(128..1024)),
344 (1, Just(1024..4096)),
345 ]);
346 }
347 let name_length = Union::new_weighted(weights);
348
349 let char_strat = Rc::new(Union::new_weighted(vec![
352 (50, proptest::char::range('A', 'z').boxed()),
353 (1, any::<char>().boxed()),
354 ]));
355
356 name_length
357 .prop_flat_map(move |length| proptest::collection::vec(Rc::clone(&char_strat), length))
358 .prop_map(|chars| ColumnName(chars.into_iter().collect::<String>()))
359 .no_shrink()
360 .boxed()
361 }
362}
363
/// A stable identifier for a column within a [`RelationDesc`].
///
/// This is the key of `RelationDesc::metadata`. It is distinct from the
/// column's position in the `RelationType`, which is stored separately as
/// `ColumnMetadata::typ_idx`.
#[derive(
    Clone, Copy, Debug, Eq, PartialEq, PartialOrd, Ord, Serialize, Deserialize, Hash, MzReflect,
)]
pub struct ColumnIndex(usize);

// Compile-time guard: `ColumnIndex` must not implement proptest's
// `Arbitrary`.
static_assertions::assert_not_impl_all!(ColumnIndex: Arbitrary);
371
372impl ColumnIndex {
373 pub fn to_stable_name(&self) -> String {
375 self.0.to_string()
376 }
377
378 pub fn to_raw(&self) -> usize {
379 self.0
380 }
381
382 pub fn from_raw(val: usize) -> Self {
383 ColumnIndex(val)
384 }
385}
386
/// A version of a relation's schema.
///
/// Versions start at [`RelationVersion::root`] (0) and are advanced with
/// [`RelationVersion::bump`].
#[derive(
    Clone,
    Copy,
    Debug,
    Eq,
    PartialEq,
    PartialOrd,
    Ord,
    Serialize,
    Deserialize,
    Hash,
    MzReflect,
    Arbitrary,
)]
pub struct RelationVersion(u64);
403
404impl RelationVersion {
405 pub fn root() -> Self {
407 RelationVersion(0)
408 }
409
410 pub fn bump(&self) -> Self {
412 let next_version = self
413 .0
414 .checked_add(1)
415 .expect("added more than u64::MAX columns?");
416 RelationVersion(next_version)
417 }
418
419 pub fn into_raw(self) -> u64 {
423 self.0
424 }
425
426 pub fn from_raw(val: u64) -> RelationVersion {
430 RelationVersion(val)
431 }
432}
433
434impl From<RelationVersion> for SchemaId {
435 fn from(value: RelationVersion) -> Self {
436 SchemaId(usize::cast_from(value.0))
437 }
438}
439
440impl From<mz_sql_parser::ast::Version> for RelationVersion {
441 fn from(value: mz_sql_parser::ast::Version) -> Self {
442 RelationVersion(value.into_inner())
443 }
444}
445
446impl fmt::Display for RelationVersion {
447 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
448 write!(f, "v{}", self.0)
449 }
450}
451
452impl From<RelationVersion> for mz_sql_parser::ast::Version {
453 fn from(value: RelationVersion) -> Self {
454 mz_sql_parser::ast::Version::new(value.0)
455 }
456}
457
458impl RustType<ProtoRelationVersion> for RelationVersion {
459 fn into_proto(&self) -> ProtoRelationVersion {
460 ProtoRelationVersion { value: self.0 }
461 }
462
463 fn from_proto(proto: ProtoRelationVersion) -> Result<Self, TryFromProtoError> {
464 Ok(RelationVersion(proto.value))
465 }
466}
467
/// Per-column metadata stored in a [`RelationDesc`].
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash, MzReflect)]
struct ColumnMetadata {
    /// Name of the column.
    name: ColumnName,
    /// Position of this column's type in `RelationType::column_types`.
    typ_idx: usize,
    /// Version at which this column was added.
    added: RelationVersion,
    /// Version at which this column was dropped, if it has been dropped.
    dropped: Option<RelationVersion>,
}
480
/// A description of a relation: its [`RelationType`] (column types and keys)
/// plus per-column metadata (name and added/dropped versions).
///
/// The metadata is keyed by [`ColumnIndex`] and iterated in index order.
/// Each entry's `typ_idx` points into `typ.column_types`.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash, MzReflect)]
pub struct RelationDesc {
    typ: RelationType,
    metadata: BTreeMap<ColumnIndex, ColumnMetadata>,
}
553
554impl RustType<ProtoRelationDesc> for RelationDesc {
555 fn into_proto(&self) -> ProtoRelationDesc {
556 let (names, metadata): (Vec<_>, Vec<_>) = self
557 .metadata
558 .values()
559 .map(|meta| {
560 let metadata = ProtoColumnMetadata {
561 added: Some(meta.added.into_proto()),
562 dropped: meta.dropped.map(|v| v.into_proto()),
563 };
564 (meta.name.into_proto(), metadata)
565 })
566 .unzip();
567
568 let is_all_default_metadata = metadata.iter().all(|meta| {
574 meta.added == Some(RelationVersion::root().into_proto()) && meta.dropped == None
575 });
576 let metadata = if is_all_default_metadata {
577 Vec::new()
578 } else {
579 metadata
580 };
581
582 ProtoRelationDesc {
583 typ: Some(self.typ.into_proto()),
584 names,
585 metadata,
586 }
587 }
588
589 fn from_proto(proto: ProtoRelationDesc) -> Result<Self, TryFromProtoError> {
590 let proto_metadata: Box<dyn Iterator<Item = _>> = if proto.metadata.is_empty() {
596 let val = ProtoColumnMetadata {
597 added: Some(RelationVersion::root().into_proto()),
598 dropped: None,
599 };
600 Box::new(itertools::repeat_n(val, proto.names.len()))
601 } else {
602 Box::new(proto.metadata.into_iter())
603 };
604
605 let metadata = proto
606 .names
607 .into_iter()
608 .zip_eq(proto_metadata)
609 .enumerate()
610 .map(|(idx, (name, metadata))| {
611 let meta = ColumnMetadata {
612 name: name.into_rust()?,
613 typ_idx: idx,
614 added: metadata.added.into_rust_if_some("ColumnMetadata::added")?,
615 dropped: metadata.dropped.into_rust()?,
616 };
617 Ok::<_, TryFromProtoError>((ColumnIndex(idx), meta))
618 })
619 .collect::<Result<_, _>>()?;
620
621 Ok(RelationDesc {
622 typ: proto.typ.into_rust_if_some("ProtoRelationDesc::typ")?,
623 metadata,
624 })
625 }
626}
627
628impl RelationDesc {
629 pub fn builder() -> RelationDescBuilder {
631 RelationDescBuilder::default()
632 }
633
634 pub fn empty() -> Self {
637 RelationDesc {
638 typ: RelationType::empty(),
639 metadata: BTreeMap::default(),
640 }
641 }
642
643 pub fn is_empty(&self) -> bool {
645 self == &Self::empty()
646 }
647
648 pub fn len(&self) -> usize {
650 self.typ().column_types.len()
651 }
652
653 pub fn new<I, N>(typ: RelationType, names: I) -> Self
661 where
662 I: IntoIterator<Item = N>,
663 N: Into<ColumnName>,
664 {
665 let metadata: BTreeMap<_, _> = names
666 .into_iter()
667 .enumerate()
668 .map(|(idx, name)| {
669 let col_idx = ColumnIndex(idx);
670 let metadata = ColumnMetadata {
671 name: name.into(),
672 typ_idx: idx,
673 added: RelationVersion::root(),
674 dropped: None,
675 };
676 (col_idx, metadata)
677 })
678 .collect();
679
680 assert_eq!(typ.column_types.len(), metadata.len());
682
683 RelationDesc { typ, metadata }
684 }
685
686 pub fn from_names_and_types<I, T, N>(iter: I) -> Self
687 where
688 I: IntoIterator<Item = (N, T)>,
689 T: Into<ColumnType>,
690 N: Into<ColumnName>,
691 {
692 let (names, types): (Vec<_>, Vec<_>) = iter.into_iter().unzip();
693 let types = types.into_iter().map(Into::into).collect();
694 let typ = RelationType::new(types);
695 Self::new(typ, names)
696 }
697
698 pub fn concat(mut self, other: Self) -> Self {
708 let self_len = self.typ.column_types.len();
709
710 for (typ, (_col_idx, meta)) in other
711 .typ
712 .column_types
713 .into_iter()
714 .zip_eq(other.metadata.into_iter())
715 {
716 assert_eq!(meta.added, RelationVersion::root());
717 assert_none!(meta.dropped);
718
719 let new_idx = self.typ.columns().len();
720 let new_meta = ColumnMetadata {
721 name: meta.name,
722 typ_idx: new_idx,
723 added: RelationVersion::root(),
724 dropped: None,
725 };
726
727 self.typ.column_types.push(typ);
728 let prev = self.metadata.insert(ColumnIndex(new_idx), new_meta);
729
730 assert_eq!(self.metadata.len(), self.typ.columns().len());
731 assert_none!(prev);
732 }
733
734 for k in other.typ.keys {
735 let k = k.into_iter().map(|idx| idx + self_len).collect();
736 self = self.with_key(k);
737 }
738 self
739 }
740
741 pub fn with_key(mut self, indices: Vec<usize>) -> Self {
743 self.typ = self.typ.with_key(indices);
744 self
745 }
746
747 pub fn without_keys(mut self) -> Self {
749 self.typ.keys.clear();
750 self
751 }
752
753 pub fn with_names<I, N>(self, names: I) -> Self
761 where
762 I: IntoIterator<Item = N>,
763 N: Into<ColumnName>,
764 {
765 Self::new(self.typ, names)
766 }
767
768 pub fn arity(&self) -> usize {
770 self.typ.arity()
771 }
772
773 pub fn typ(&self) -> &RelationType {
775 &self.typ
776 }
777
778 pub fn iter(&self) -> impl Iterator<Item = (&ColumnName, &ColumnType)> {
780 self.metadata.values().map(|meta| {
781 let typ = &self.typ.columns()[meta.typ_idx];
782 (&meta.name, typ)
783 })
784 }
785
786 pub fn iter_types(&self) -> impl Iterator<Item = &ColumnType> {
788 self.typ.column_types.iter()
789 }
790
791 pub fn iter_names(&self) -> impl Iterator<Item = &ColumnName> {
793 self.metadata.values().map(|meta| &meta.name)
794 }
795
796 pub fn iter_all(&self) -> impl Iterator<Item = (&ColumnIndex, &ColumnName, &ColumnType)> {
798 self.metadata.iter().map(|(col_idx, metadata)| {
799 let col_typ = &self.typ.columns()[metadata.typ_idx];
800 (col_idx, &metadata.name, col_typ)
801 })
802 }
803
804 pub fn iter_similar_names<'a>(
807 &'a self,
808 name: &'a ColumnName,
809 ) -> impl Iterator<Item = &'a ColumnName> {
810 self.iter_names().filter(|n| n.is_similar(name))
811 }
812
813 pub fn contains_index(&self, idx: &ColumnIndex) -> bool {
815 self.metadata.contains_key(idx)
816 }
817
818 pub fn get_by_name(&self, name: &ColumnName) -> Option<(usize, &ColumnType)> {
824 self.iter_names()
825 .position(|n| n == name)
826 .map(|i| (i, &self.typ.column_types[i]))
827 }
828
829 pub fn get_name(&self, i: usize) -> &ColumnName {
837 self.get_name_idx(&ColumnIndex(i))
839 }
840
841 pub fn get_name_idx(&self, idx: &ColumnIndex) -> &ColumnName {
847 &self.metadata.get(idx).expect("should exist").name
848 }
849
850 pub fn get_name_mut(&mut self, i: usize) -> &mut ColumnName {
856 &mut self
858 .metadata
859 .get_mut(&ColumnIndex(i))
860 .expect("should exist")
861 .name
862 }
863
864 pub fn get_type(&self, idx: &ColumnIndex) -> &ColumnType {
870 let typ_idx = self.metadata.get(idx).expect("should exist").typ_idx;
871 &self.typ.column_types[typ_idx]
872 }
873
874 pub fn get_unambiguous_name(&self, i: usize) -> Option<&ColumnName> {
883 let name = self.get_name(i);
884 if self.iter_names().filter(|n| *n == name).count() == 1 {
885 Some(name)
886 } else {
887 None
888 }
889 }
890
891 pub fn constraints_met(&self, i: usize, d: &Datum) -> Result<(), NotNullViolation> {
896 let name = self.get_name(i);
897 let typ = &self.typ.column_types[i];
898 if d == &Datum::Null && !typ.nullable {
899 Err(NotNullViolation(name.clone()))
900 } else {
901 Ok(())
902 }
903 }
904
905 pub fn apply_demand(&self, demands: &BTreeSet<usize>) -> RelationDesc {
907 let mut new_desc = self.clone();
908
909 let mut removed = 0;
911 new_desc.metadata.retain(|idx, metadata| {
912 let retain = demands.contains(&idx.0);
913 if !retain {
914 removed += 1;
915 } else {
916 metadata.typ_idx -= removed;
917 }
918 retain
919 });
920
921 let mut idx = 0;
923 new_desc.typ.column_types.retain(|_| {
924 let keep = demands.contains(&idx);
925 idx += 1;
926 keep
927 });
928
929 new_desc
930 }
931}
932
933impl Arbitrary for RelationDesc {
934 type Parameters = ();
935 type Strategy = BoxedStrategy<RelationDesc>;
936
937 fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy {
938 let mut weights = vec![(100, Just(0..4)), (50, Just(4..8)), (25, Just(8..16))];
939 if std::env::var("PROPTEST_LARGE_DATA").is_ok() {
940 weights.extend([
941 (12, Just(16..32)),
942 (6, Just(32..64)),
943 (3, Just(64..128)),
944 (1, Just(128..256)),
945 ]);
946 }
947 let num_columns = Union::new_weighted(weights);
948
949 num_columns.prop_flat_map(arb_relation_desc).boxed()
950 }
951}
952
953pub fn arb_relation_desc(num_cols: std::ops::Range<usize>) -> impl Strategy<Value = RelationDesc> {
956 proptest::collection::btree_map(any::<ColumnName>(), any::<ColumnType>(), num_cols)
957 .prop_map(RelationDesc::from_names_and_types)
958}
959
960pub fn arb_relation_desc_projection(desc: RelationDesc) -> impl Strategy<Value = RelationDesc> {
962 let mask: Vec<_> = (0..desc.len()).map(|_| any::<bool>()).collect();
963 mask.prop_map(move |mask| {
964 let demands: BTreeSet<_> = mask
965 .into_iter()
966 .enumerate()
967 .filter_map(|(idx, keep)| keep.then_some(idx))
968 .collect();
969 desc.apply_demand(&demands)
970 })
971}
972
973impl IntoIterator for RelationDesc {
974 type Item = (ColumnName, ColumnType);
975 type IntoIter = Box<dyn Iterator<Item = (ColumnName, ColumnType)>>;
976
977 fn into_iter(self) -> Self::IntoIter {
978 let iter = self
979 .metadata
980 .into_values()
981 .zip_eq(self.typ.column_types)
982 .map(|(meta, typ)| (meta.name, typ));
983 Box::new(iter)
984 }
985}
986
987pub fn arb_row_for_relation(desc: &RelationDesc) -> impl Strategy<Value = Row> + use<> {
989 let datums: Vec<_> = desc
990 .typ()
991 .columns()
992 .iter()
993 .cloned()
994 .map(arb_datum_for_column)
995 .collect();
996 datums.prop_map(|x| Row::pack(x.iter().map(Datum::from)))
997}
998
/// Error returned by [`RelationDesc::constraints_met`] when a `NULL` datum is
/// supplied for a non-nullable column. Holds the column's name.
#[derive(Debug, PartialEq, Eq)]
pub struct NotNullViolation(pub ColumnName);
1002
1003impl fmt::Display for NotNullViolation {
1004 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1005 write!(
1006 f,
1007 "null value in column {} violates not-null constraint",
1008 self.0.as_str().quoted()
1009 )
1010 }
1011}
1012
/// Incrementally collects columns and keys, then produces a [`RelationDesc`]
/// via [`RelationDescBuilder::finish`].
#[derive(Clone, Default, Debug, PartialEq, Eq)]
pub struct RelationDescBuilder {
    /// Columns in the order they were added.
    columns: Vec<(ColumnName, ColumnType)>,
    /// Keys, each sorted and distinct (see `with_key`).
    keys: Vec<Vec<usize>>,
}
1021
1022impl RelationDescBuilder {
1023 pub fn with_column<N: Into<ColumnName>>(
1025 mut self,
1026 name: N,
1027 ty: ColumnType,
1028 ) -> RelationDescBuilder {
1029 let name = name.into();
1030 self.columns.push((name, ty));
1031 self
1032 }
1033
1034 pub fn with_columns<I, T, N>(mut self, iter: I) -> Self
1036 where
1037 I: IntoIterator<Item = (N, T)>,
1038 T: Into<ColumnType>,
1039 N: Into<ColumnName>,
1040 {
1041 self.columns
1042 .extend(iter.into_iter().map(|(name, ty)| (name.into(), ty.into())));
1043 self
1044 }
1045
1046 pub fn with_key(mut self, mut indices: Vec<usize>) -> RelationDescBuilder {
1048 indices.sort_unstable();
1049 if !self.keys.contains(&indices) {
1050 self.keys.push(indices);
1051 }
1052 self
1053 }
1054
1055 pub fn without_keys(mut self) -> RelationDescBuilder {
1057 self.keys.clear();
1058 assert_eq!(self.keys.len(), 0);
1059 self
1060 }
1061
1062 pub fn concat(mut self, other: Self) -> Self {
1064 let self_len = self.columns.len();
1065
1066 self.columns.extend(other.columns);
1067 for k in other.keys {
1068 let k = k.into_iter().map(|idx| idx + self_len).collect();
1069 self = self.with_key(k);
1070 }
1071
1072 self
1073 }
1074
1075 pub fn finish(self) -> RelationDesc {
1077 let mut desc = RelationDesc::from_names_and_types(self.columns);
1078 desc.typ = desc.typ.with_keys(self.keys);
1079 desc
1080 }
1081}
1082
/// Selects which version of a [`VersionedRelationDesc`] to view (see
/// `VersionedRelationDesc::at_version`).
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord, Serialize)]
pub enum RelationVersionSelector {
    /// A specific version.
    Specific(RelationVersion),
    /// The most recent version; excludes every dropped column.
    Latest,
}
1089
1090impl RelationVersionSelector {
1091 pub fn specific(version: u64) -> Self {
1092 RelationVersionSelector::Specific(RelationVersion(version))
1093 }
1094}
1095
/// A [`RelationDesc`] that records when each column was added and dropped.
///
/// Dropped columns keep their metadata, so the description can be viewed at
/// any earlier version with `at_version`.
#[derive(Debug, Clone, Serialize)]
pub struct VersionedRelationDesc {
    inner: RelationDesc,
}
1105
1106impl VersionedRelationDesc {
1107 pub fn new(inner: RelationDesc) -> Self {
1108 VersionedRelationDesc { inner }
1109 }
1110
1111 #[must_use]
1119 pub fn add_column<N, T>(&mut self, name: N, typ: T) -> RelationVersion
1120 where
1121 N: Into<ColumnName>,
1122 T: Into<ColumnType>,
1123 {
1124 let latest_version = self.latest_version();
1125 let new_version = latest_version.bump();
1126
1127 let name = name.into();
1128 let existing = self
1129 .inner
1130 .metadata
1131 .iter()
1132 .find(|(_, meta)| meta.name == name && meta.dropped.is_none());
1133 if let Some(existing) = existing {
1134 panic!("column named '{name}' already exists! {existing:?}");
1135 }
1136
1137 let next_idx = self.inner.metadata.len();
1138 let col_meta = ColumnMetadata {
1139 name,
1140 typ_idx: next_idx,
1141 added: new_version,
1142 dropped: None,
1143 };
1144
1145 self.inner.typ.column_types.push(typ.into());
1146 let prev = self.inner.metadata.insert(ColumnIndex(next_idx), col_meta);
1147
1148 assert_none!(prev, "column index overlap!");
1149 self.validate();
1150
1151 new_version
1152 }
1153
1154 #[must_use]
1163 pub fn drop_column<N>(&mut self, name: N) -> RelationVersion
1164 where
1165 N: Into<ColumnName>,
1166 {
1167 let name = name.into();
1168 let latest_version = self.latest_version();
1169 let new_version = latest_version.bump();
1170
1171 let col = self
1172 .inner
1173 .metadata
1174 .values_mut()
1175 .find(|meta| meta.name == name && meta.dropped.is_none())
1176 .expect("column to exist");
1177
1178 assert_none!(col.dropped, "column was already dropped");
1180 col.dropped = Some(new_version);
1181
1182 let dropped_key = self
1184 .inner
1185 .typ
1186 .keys
1187 .iter()
1188 .any(|keys| keys.iter().any(|key| *key == col.typ_idx));
1189 assert!(!dropped_key, "column being dropped was used as a key");
1190
1191 self.validate();
1192 new_version
1193 }
1194
1195 pub fn latest(&self) -> RelationDesc {
1197 self.inner.clone()
1198 }
1199
1200 pub fn at_version(&self, version: RelationVersionSelector) -> RelationDesc {
1202 let up_to_version = match version {
1206 RelationVersionSelector::Latest => RelationVersion(u64::MAX),
1207 RelationVersionSelector::Specific(v) => v,
1208 };
1209
1210 let valid_columns = self.inner.metadata.iter().filter(|(_col_idx, meta)| {
1211 let added = meta.added <= up_to_version;
1212 let dropped = meta
1213 .dropped
1214 .map(|dropped_at| up_to_version >= dropped_at)
1215 .unwrap_or(false);
1216
1217 added && !dropped
1218 });
1219
1220 let mut column_types = Vec::new();
1221 let mut column_metas = BTreeMap::new();
1222
1223 for (col_idx, meta) in valid_columns {
1230 let new_meta = ColumnMetadata {
1231 name: meta.name.clone(),
1232 typ_idx: column_types.len(),
1233 added: meta.added.clone(),
1234 dropped: meta.dropped.clone(),
1235 };
1236 column_types.push(self.inner.typ.columns()[meta.typ_idx].clone());
1237 column_metas.insert(*col_idx, new_meta);
1238 }
1239
1240 let keys = self
1246 .inner
1247 .typ
1248 .keys
1249 .iter()
1250 .map(|keys| {
1251 keys.iter()
1252 .map(|key_idx| {
1253 let metadata = column_metas
1254 .get(&ColumnIndex(*key_idx))
1255 .expect("found key for column that doesn't exist");
1256 metadata.typ_idx
1257 })
1258 .collect()
1259 })
1260 .collect();
1261
1262 let relation_type = RelationType { column_types, keys };
1263
1264 RelationDesc {
1265 typ: relation_type,
1266 metadata: column_metas,
1267 }
1268 }
1269
1270 pub fn latest_version(&self) -> RelationVersion {
1271 self.inner
1272 .metadata
1273 .values()
1274 .map(|meta| meta.dropped.unwrap_or(meta.added))
1276 .max()
1277 .unwrap_or(RelationVersion::root())
1279 }
1280
1281 fn validate(&self) {
1287 fn validate_inner(desc: &RelationDesc) -> Result<(), anyhow::Error> {
1288 if desc.typ.column_types.len() != desc.metadata.len() {
1289 anyhow::bail!("mismatch between number of types and metadatas");
1290 }
1291
1292 for (col_idx, meta) in &desc.metadata {
1293 if col_idx.0 > desc.metadata.len() {
1294 anyhow::bail!("column index out of bounds");
1295 }
1296 if meta.added >= meta.dropped.unwrap_or(RelationVersion(u64::MAX)) {
1297 anyhow::bail!("column was added after it was dropped?");
1298 }
1299 if desc.typ().columns().get(meta.typ_idx).is_none() {
1300 anyhow::bail!("typ_idx incorrect");
1301 }
1302 }
1303
1304 for keys in &desc.typ.keys {
1305 for key in keys {
1306 if *key >= desc.typ.column_types.len() {
1307 anyhow::bail!("key index was out of bounds!");
1308 }
1309 }
1310 }
1311
1312 let versions = desc
1313 .metadata
1314 .values()
1315 .map(|meta| meta.dropped.unwrap_or(meta.added));
1316 let mut max = 0;
1317 let mut sum = 0;
1318 for version in versions {
1319 max = std::cmp::max(max, version.0);
1320 sum += version.0;
1321 }
1322
1323 if sum != (max * (max + 1) / 2) {
1333 anyhow::bail!("there is a duplicate or missing relation version");
1334 }
1335
1336 Ok(())
1337 }
1338
1339 assert_ok!(validate_inner(&self.inner), "validate failed! {self:?}");
1340 }
1341}
1342
/// A single change to a [`RelationDesc`], generated by
/// `arb_relation_desc_diff` and applied with `PropRelationDescDiff::apply`.
#[derive(Debug)]
pub enum PropRelationDescDiff {
    /// Append a new column.
    AddColumn { name: ColumnName, typ: ColumnType },
    /// Mark the column with this name as dropped.
    DropColumn { name: ColumnName },
    /// Flip the nullability of the column with this name.
    ToggleNullability { name: ColumnName },
    /// Replace the type of the column with this name.
    ChangeType { name: ColumnName, typ: ColumnType },
}
1352
1353impl PropRelationDescDiff {
1354 pub fn apply(self, desc: &mut RelationDesc) {
1355 match self {
1356 PropRelationDescDiff::AddColumn { name, typ } => {
1357 let new_idx = desc.metadata.len();
1358 let meta = ColumnMetadata {
1359 name,
1360 typ_idx: new_idx,
1361 added: RelationVersion(0),
1362 dropped: None,
1363 };
1364 let prev = desc.metadata.insert(ColumnIndex(new_idx), meta);
1365 desc.typ.column_types.push(typ);
1366
1367 assert_none!(prev);
1368 assert_eq!(desc.metadata.len(), desc.typ.column_types.len());
1369 }
1370 PropRelationDescDiff::DropColumn { name } => {
1371 let next_version = desc
1372 .metadata
1373 .values()
1374 .map(|meta| meta.dropped.unwrap_or(meta.added))
1375 .max()
1376 .unwrap_or(RelationVersion::root())
1377 .bump();
1378 let Some(metadata) = desc.metadata.values_mut().find(|meta| meta.name == name)
1379 else {
1380 return;
1381 };
1382 if metadata.dropped.is_none() {
1383 metadata.dropped = Some(next_version);
1384 }
1385 }
1386 PropRelationDescDiff::ToggleNullability { name } => {
1387 let Some((pos, _)) = desc.get_by_name(&name) else {
1388 return;
1389 };
1390 let col_type = desc
1391 .typ
1392 .column_types
1393 .get_mut(pos)
1394 .expect("ColumnNames and ColumnTypes out of sync!");
1395 col_type.nullable = !col_type.nullable;
1396 }
1397 PropRelationDescDiff::ChangeType { name, typ } => {
1398 let Some((pos, _)) = desc.get_by_name(&name) else {
1399 return;
1400 };
1401 let col_type = desc
1402 .typ
1403 .column_types
1404 .get_mut(pos)
1405 .expect("ColumnNames and ColumnTypes out of sync!");
1406 *col_type = typ;
1407 }
1408 }
1409 }
1410}
1411
1412pub fn arb_relation_desc_diff(
1414 source: &RelationDesc,
1415) -> impl Strategy<Value = Vec<PropRelationDescDiff>> + use<> {
1416 let source = Rc::new(source.clone());
1417 let num_source_columns = source.typ.columns().len();
1418
1419 let num_add_columns = Union::new_weighted(vec![(100, Just(0..8)), (1, Just(8..64))]);
1420 let add_columns_strat = num_add_columns
1421 .prop_flat_map(|num_columns| {
1422 proptest::collection::vec((any::<ColumnName>(), any::<ColumnType>()), num_columns)
1423 })
1424 .prop_map(|cols| {
1425 cols.into_iter()
1426 .map(|(name, typ)| PropRelationDescDiff::AddColumn { name, typ })
1427 .collect::<Vec<_>>()
1428 });
1429
1430 if num_source_columns == 0 {
1432 return add_columns_strat.boxed();
1433 }
1434
1435 let source_ = Rc::clone(&source);
1436 let drop_columns_strat = (0..num_source_columns).prop_perturb(move |num_columns, mut rng| {
1437 let mut set = BTreeSet::default();
1438 for _ in 0..num_columns {
1439 let col_idx = rng.gen_range(0..num_source_columns);
1440 set.insert(source_.get_name(col_idx).clone());
1441 }
1442 set.into_iter()
1443 .map(|name| PropRelationDescDiff::DropColumn { name })
1444 .collect::<Vec<_>>()
1445 });
1446
1447 let source_ = Rc::clone(&source);
1448 let toggle_nullability_strat =
1449 (0..num_source_columns).prop_perturb(move |num_columns, mut rng| {
1450 let mut set = BTreeSet::default();
1451 for _ in 0..num_columns {
1452 let col_idx = rng.gen_range(0..num_source_columns);
1453 set.insert(source_.get_name(col_idx).clone());
1454 }
1455 set.into_iter()
1456 .map(|name| PropRelationDescDiff::ToggleNullability { name })
1457 .collect::<Vec<_>>()
1458 });
1459
1460 let source_ = Rc::clone(&source);
1461 let change_type_strat = (0..num_source_columns)
1462 .prop_perturb(move |num_columns, mut rng| {
1463 let mut set = BTreeSet::default();
1464 for _ in 0..num_columns {
1465 let col_idx = rng.gen_range(0..num_source_columns);
1466 set.insert(source_.get_name(col_idx).clone());
1467 }
1468 set
1469 })
1470 .prop_flat_map(|cols| {
1471 proptest::collection::vec(any::<ColumnType>(), cols.len())
1472 .prop_map(move |types| (cols.clone(), types))
1473 })
1474 .prop_map(|(cols, types)| {
1475 cols.into_iter()
1476 .zip(types)
1477 .map(|(name, typ)| PropRelationDescDiff::ChangeType { name, typ })
1478 .collect::<Vec<_>>()
1479 });
1480
1481 (
1482 add_columns_strat,
1483 drop_columns_strat,
1484 toggle_nullability_strat,
1485 change_type_strat,
1486 )
1487 .prop_map(|(adds, drops, toggles, changes)| {
1488 adds.into_iter()
1489 .chain(drops)
1490 .chain(toggles)
1491 .chain(changes)
1492 .collect::<Vec<_>>()
1493 })
1494 .prop_shuffle()
1495 .boxed()
1496}
1497
#[cfg(test)]
mod tests {
    use super::*;
    use prost::Message;

    /// Walks a [`VersionedRelationDesc`] through one `add_column` and one `drop_column`.
    ///
    /// Checks that every historical version can still be reconstructed unchanged, and that
    /// the per-column metadata (`typ_idx`, `added`, `dropped`) is tracked at each step.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn smoketest_at_version() {
        let desc = RelationDesc::builder()
            .with_column("a", ScalarType::Bool.nullable(true))
            .with_column("z", ScalarType::String.nullable(false))
            .finish();

        let mut versioned_desc = VersionedRelationDesc {
            inner: desc.clone(),
        };
        versioned_desc.validate();

        // No changes have been applied yet, so every selector resolves to the original
        // description, even one naming a version that has not been created.
        let latest = versioned_desc.at_version(RelationVersionSelector::Latest);
        assert_eq!(desc, latest);

        let v0 = versioned_desc.at_version(RelationVersionSelector::specific(0));
        assert_eq!(desc, v0);

        let v3 = versioned_desc.at_version(RelationVersionSelector::specific(3));
        assert_eq!(desc, v3);

        let v1 = versioned_desc.add_column("b", ScalarType::Bytes.nullable(false));
        assert_eq!(v1, RelationVersion(1));

        let v1 = versioned_desc.at_version(RelationVersionSelector::Specific(v1));
        insta::assert_json_snapshot!(v1.metadata, @r###"
        {
          "0": {
            "name": "a",
            "typ_idx": 0,
            "added": 0,
            "dropped": null
          },
          "1": {
            "name": "z",
            "typ_idx": 1,
            "added": 0,
            "dropped": null
          },
          "2": {
            "name": "b",
            "typ_idx": 2,
            "added": 1,
            "dropped": null
          }
        }
        "###);

        // Adding a column must not change what version 0 looks like.
        let v0_b = versioned_desc.at_version(RelationVersionSelector::specific(0));
        assert!(v0.iter().eq(v0_b.iter()));

        let v2 = versioned_desc.drop_column("z");
        assert_eq!(v2, RelationVersion(2));

        let v2 = versioned_desc.at_version(RelationVersionSelector::Specific(v2));
        insta::assert_json_snapshot!(v2.metadata, @r###"
        {
          "0": {
            "name": "a",
            "typ_idx": 0,
            "added": 0,
            "dropped": null
          },
          "2": {
            "name": "b",
            "typ_idx": 1,
            "added": 1,
            "dropped": null
          }
        }
        "###);

        // Dropping a column must not change either of the earlier versions.
        let v0_c = versioned_desc.at_version(RelationVersionSelector::specific(0));
        assert!(v0.iter().eq(v0_c.iter()));

        let v1_b = versioned_desc.at_version(RelationVersionSelector::specific(1));
        assert!(v1.iter().eq(v1_b.iter()));

        // The underlying description keeps the dropped column, tagged with the version in
        // which it was dropped.
        insta::assert_json_snapshot!(versioned_desc.inner.metadata, @r###"
        {
          "0": {
            "name": "a",
            "typ_idx": 0,
            "added": 0,
            "dropped": null
          },
          "1": {
            "name": "z",
            "typ_idx": 1,
            "added": 0,
            "dropped": 2
          },
          "2": {
            "name": "b",
            "typ_idx": 2,
            "added": 1,
            "dropped": null
          }
        }
        "###);
    }

    /// Dropping a column that precedes a key column must remap the key's index in the new
    /// version, while leaving the key untouched in the old version.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn test_dropping_columns_with_keys() {
        let desc = RelationDesc::builder()
            .with_column("a", ScalarType::Bool.nullable(true))
            .with_column("z", ScalarType::String.nullable(false))
            .with_key(vec![1])
            .finish();

        let mut versioned_desc = VersionedRelationDesc {
            inner: desc.clone(),
        };
        versioned_desc.validate();

        let v1 = versioned_desc.drop_column("a");
        assert_eq!(v1, RelationVersion(1));

        // In v1 only "z" remains, so the key `[1]` becomes `[0]`.
        let v1 = versioned_desc.at_version(RelationVersionSelector::Specific(v1));
        insta::assert_json_snapshot!(v1, @r###"
        {
          "typ": {
            "column_types": [
              {
                "scalar_type": "String",
                "nullable": false
              }
            ],
            "keys": [
              [
                0
              ]
            ]
          },
          "metadata": {
            "1": {
              "name": "z",
              "typ_idx": 0,
              "added": 0,
              "dropped": null
            }
          }
        }
        "###);

        // v0 still has both columns and the original key.
        let v0 = versioned_desc.at_version(RelationVersionSelector::specific(0));
        insta::assert_json_snapshot!(v0, @r###"
        {
          "typ": {
            "column_types": [
              {
                "scalar_type": "Bool",
                "nullable": true
              },
              {
                "scalar_type": "String",
                "nullable": false
              }
            ],
            "keys": [
              [
                1
              ]
            ]
          },
          "metadata": {
            "0": {
              "name": "a",
              "typ_idx": 0,
              "added": 0,
              "dropped": 1
            },
            "1": {
              "name": "z",
              "typ_idx": 1,
              "added": 0,
              "dropped": null
            }
          }
        }
        "###);
    }

    /// A `ProtoRelationDesc` with `names` but an empty `metadata` list still decodes.
    ///
    /// The resulting metadata has one entry per name, in order, each marked as added at
    /// version 0.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn roundtrip_relation_desc_without_metadata() {
        let typ = ProtoRelationType {
            column_types: vec![
                ScalarType::String.nullable(false).into_proto(),
                ScalarType::Bool.nullable(true).into_proto(),
            ],
            keys: vec![],
        };
        let proto = ProtoRelationDesc {
            typ: Some(typ),
            names: vec![
                ColumnName("a".to_string()).into_proto(),
                ColumnName("b".to_string()).into_proto(),
            ],
            metadata: vec![],
        };
        let desc: RelationDesc = proto.into_rust().unwrap();

        insta::assert_json_snapshot!(desc, @r###"
        {
          "typ": {
            "column_types": [
              {
                "scalar_type": "String",
                "nullable": false
              },
              {
                "scalar_type": "Bool",
                "nullable": true
              }
            ],
            "keys": []
          },
          "metadata": {
            "0": {
              "name": "a",
              "typ_idx": 0,
              "added": 0,
              "dropped": null
            },
            "1": {
              "name": "b",
              "typ_idx": 1,
              "added": 0,
              "dropped": null
            }
          }
        }
        "###);
    }

    /// Adding a column whose name collides with a live column panics.
    #[mz_ore::test]
    #[should_panic(expected = "column named 'a' already exists!")]
    fn test_add_column_with_same_name_panics() {
        let desc = RelationDesc::builder()
            .with_column("a", ScalarType::Bool.nullable(true))
            .finish();
        let mut versioned = VersionedRelationDesc::new(desc);

        let _ = versioned.add_column("a", ScalarType::String.nullable(false));
    }

    /// A name can be reused once the column that held it has been dropped.
    ///
    /// The new column gets a fresh column index (`1`) rather than the dropped column's `0`.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn test_add_column_with_same_name_prev_dropped() {
        let desc = RelationDesc::builder()
            .with_column("a", ScalarType::Bool.nullable(true))
            .finish();
        let mut versioned = VersionedRelationDesc::new(desc);

        let v1 = versioned.drop_column("a");
        let v1 = versioned.at_version(RelationVersionSelector::Specific(v1));
        insta::assert_json_snapshot!(v1, @r###"
        {
          "typ": {
            "column_types": [],
            "keys": []
          },
          "metadata": {}
        }
        "###);

        let v2 = versioned.add_column("a", ScalarType::String.nullable(false));
        let v2 = versioned.at_version(RelationVersionSelector::Specific(v2));
        insta::assert_json_snapshot!(v2, @r###"
        {
          "typ": {
            "column_types": [
              {
                "scalar_type": "String",
                "nullable": false
              }
            ],
            "keys": []
          },
          "metadata": {
            "1": {
              "name": "a",
              "typ_idx": 0,
              "added": 2,
              "dropped": null
            }
          }
        }
        "###);
    }

    /// `apply_demand` keeps exactly the demanded columns, in order, and the result still
    /// passes validation.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn apply_demand() {
        let desc = RelationDesc::builder()
            .with_column("a", ScalarType::String.nullable(true))
            .with_column("b", ScalarType::Int64.nullable(false))
            .with_column("c", ScalarType::Time.nullable(false))
            .finish();
        let desc = desc.apply_demand(&BTreeSet::from([0, 2]));
        assert_eq!(desc.arity(), 2);

        // Check *which* columns survived, not just how many. Dropping "c" and keeping "b"
        // would still give an arity of 2.
        assert_eq!(
            desc.typ.column_types,
            vec![
                ScalarType::String.nullable(true),
                ScalarType::Time.nullable(false),
            ]
        );
        let names: Vec<ColumnName> = desc.metadata.values().map(|m| m.name.clone()).collect();
        assert_eq!(
            names,
            vec![ColumnName("a".to_string()), ColumnName("c".to_string())]
        );

        VersionedRelationDesc::new(desc).validate();
    }

    /// A `ColumnIndex`'s stable name is its decimal index.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn smoketest_column_index_stable_ident() {
        let idx_a = ColumnIndex(42);
        assert_eq!(idx_a.to_stable_name(), "42");
    }

    /// Encoding to protobuf bytes and decoding again yields an equal `RelationDesc`.
    ///
    /// This is checked both for arbitrary descriptions and for descriptions that have had a
    /// random sequence of schema changes applied.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn proptest_relation_desc_roundtrips() {
        fn testcase(og: RelationDesc) {
            let bytes = og.into_proto().encode_to_vec();
            let proto = ProtoRelationDesc::decode(&bytes[..]).unwrap();
            let rnd = RelationDesc::from_proto(proto).unwrap();

            assert_eq!(og, rnd);
        }

        proptest!(|(desc in any::<RelationDesc>())| {
            testcase(desc);
        });

        // Pair each generated description with a list of diffs generated against it, so
        // that the roundtrip also covers descriptions with added and dropped columns.
        let strat = any::<RelationDesc>().prop_flat_map(|desc| {
            arb_relation_desc_diff(&desc).prop_map(move |diffs| (desc.clone(), diffs))
        });

        proptest!(|((mut desc, diffs) in strat)| {
            for diff in diffs {
                diff.apply(&mut desc);
            }
            testcase(desc);
        });
    }
}