1use std::collections::{BTreeMap, BTreeSet};
11use std::rc::Rc;
12use std::{fmt, vec};
13
14use anyhow::bail;
15use itertools::Itertools;
16use mz_lowertest::MzReflect;
17use mz_ore::cast::CastFrom;
18use mz_ore::str::StrExt;
19use mz_ore::{assert_none, assert_ok};
20use mz_persist_types::schema::SchemaId;
21use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
22use proptest::prelude::*;
23use proptest::strategy::{Strategy, Union};
24use proptest_derive::Arbitrary;
25use serde::{Deserialize, Serialize};
26use timely::Container;
27
28use crate::relation_and_scalar::proto_relation_type::ProtoKey;
29pub use crate::relation_and_scalar::{
30 ProtoColumnMetadata, ProtoColumnName, ProtoColumnType, ProtoRelationDesc, ProtoRelationType,
31 ProtoRelationVersion,
32};
33use crate::{Datum, Row, ScalarType, arb_datum_for_column};
34
/// The type of a single column: a [`ScalarType`] paired with a nullability flag.
#[derive(
    Arbitrary, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash, MzReflect,
)]
pub struct ColumnType {
    /// The scalar type of the values stored in the column.
    pub scalar_type: ScalarType,
    /// Whether the column may hold nulls.
    ///
    /// When this field is missing from serialized input, serde falls back to
    /// `return_true`, i.e. the column is treated as nullable.
    #[serde(default = "return_true")]
    pub nullable: bool,
}
52
/// Serde default provider for [`ColumnType::nullable`]; always yields `true`.
#[inline(always)]
fn return_true() -> bool {
    const DEFAULT_NULLABLE: bool = true;
    DEFAULT_NULLABLE
}
62
63impl ColumnType {
64 pub fn union(&self, other: &Self) -> Result<Self, anyhow::Error> {
65 match (&self.scalar_type, &other.scalar_type) {
66 (scalar_type, other_scalar_type) if scalar_type == other_scalar_type => {
67 Ok(ColumnType {
68 scalar_type: scalar_type.clone(),
69 nullable: self.nullable || other.nullable,
70 })
71 }
72 (scalar_type, other_scalar_type) if scalar_type.base_eq(other_scalar_type) => {
73 Ok(ColumnType {
74 scalar_type: scalar_type.without_modifiers(),
75 nullable: self.nullable || other.nullable,
76 })
77 }
78 (
79 ScalarType::Record { fields, custom_id },
80 ScalarType::Record {
81 fields: other_fields,
82 custom_id: other_custom_id,
83 },
84 ) => {
85 if custom_id != other_custom_id {
86 bail!(
87 "Can't union types: {:?} and {:?}",
88 self.scalar_type,
89 other.scalar_type
90 );
91 };
92
93 let mut union_fields = Vec::with_capacity(fields.len());
94 for ((name, typ), (other_name, other_typ)) in fields.iter().zip(other_fields.iter())
95 {
96 if name != other_name {
97 bail!(
98 "Can't union types: {:?} and {:?}",
99 self.scalar_type,
100 other.scalar_type
101 );
102 } else {
103 let union_column_type = typ.union(other_typ)?;
104 union_fields.push((name.clone(), union_column_type));
105 };
106 }
107
108 Ok(ColumnType {
109 scalar_type: ScalarType::Record {
110 fields: union_fields.into(),
111 custom_id: *custom_id,
112 },
113 nullable: self.nullable || other.nullable,
114 })
115 }
116 _ => bail!(
117 "Can't union types: {:?} and {:?}",
118 self.scalar_type,
119 other.scalar_type
120 ),
121 }
122 }
123
124 pub fn nullable(mut self, nullable: bool) -> Self {
127 self.nullable = nullable;
128 self
129 }
130}
131
132impl RustType<ProtoColumnType> for ColumnType {
133 fn into_proto(&self) -> ProtoColumnType {
134 ProtoColumnType {
135 nullable: self.nullable,
136 scalar_type: Some(self.scalar_type.into_proto()),
137 }
138 }
139
140 fn from_proto(proto: ProtoColumnType) -> Result<Self, TryFromProtoError> {
141 Ok(ColumnType {
142 nullable: proto.nullable,
143 scalar_type: proto
144 .scalar_type
145 .into_rust_if_some("ProtoColumnType::scalar_type")?,
146 })
147 }
148}
149
150impl fmt::Display for ColumnType {
151 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
152 let nullable = if self.nullable { "Null" } else { "NotNull" };
153 f.write_fmt(format_args!("{:?}:{}", self.scalar_type, nullable))
154 }
155}
156
/// The type of a relation: the types of its columns plus a set of keys.
#[derive(
    Arbitrary, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash, MzReflect,
)]
pub struct RelationType {
    /// The type of each column, in column order.
    pub column_types: Vec<ColumnType>,
    /// Keys of the relation, each expressed as a list of column indices.
    ///
    /// `with_key` stores each key sorted and skips keys that are already
    /// present. Defaults to empty when missing from serialized input.
    #[serde(default)]
    pub keys: Vec<Vec<usize>>,
}
176
177impl RelationType {
178 pub fn empty() -> Self {
181 RelationType::new(vec![])
182 }
183
184 pub fn new(column_types: Vec<ColumnType>) -> Self {
188 RelationType {
189 column_types,
190 keys: Vec::new(),
191 }
192 }
193
194 pub fn with_key(mut self, mut indices: Vec<usize>) -> Self {
196 indices.sort_unstable();
197 if !self.keys.contains(&indices) {
198 self.keys.push(indices);
199 }
200 self
201 }
202
203 pub fn with_keys(mut self, keys: Vec<Vec<usize>>) -> Self {
204 for key in keys {
205 self = self.with_key(key)
206 }
207 self
208 }
209
210 pub fn arity(&self) -> usize {
212 self.column_types.len()
213 }
214
215 pub fn default_key(&self) -> Vec<usize> {
217 if let Some(key) = self.keys.first() {
218 if key.is_empty() {
219 (0..self.column_types.len()).collect()
220 } else {
221 key.clone()
222 }
223 } else {
224 (0..self.column_types.len()).collect()
225 }
226 }
227
228 pub fn columns(&self) -> &[ColumnType] {
230 &self.column_types
231 }
232}
233
234impl RustType<ProtoRelationType> for RelationType {
235 fn into_proto(&self) -> ProtoRelationType {
236 ProtoRelationType {
237 column_types: self.column_types.into_proto(),
238 keys: self.keys.into_proto(),
239 }
240 }
241
242 fn from_proto(proto: ProtoRelationType) -> Result<Self, TryFromProtoError> {
243 Ok(RelationType {
244 column_types: proto.column_types.into_rust()?,
245 keys: proto.keys.into_rust()?,
246 })
247 }
248}
249
250impl RustType<ProtoKey> for Vec<usize> {
251 fn into_proto(&self) -> ProtoKey {
252 ProtoKey {
253 keys: self.into_proto(),
254 }
255 }
256
257 fn from_proto(proto: ProtoKey) -> Result<Self, TryFromProtoError> {
258 proto.keys.into_rust()
259 }
260}
261
/// The name of a column, stored as a boxed string slice.
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash, MzReflect)]
pub struct ColumnName(Box<str>);
265
266impl ColumnName {
267 #[inline(always)]
269 pub fn as_str(&self) -> &str {
270 &*self
271 }
272
273 pub fn as_mut_boxed_str(&mut self) -> &mut Box<str> {
275 &mut self.0
276 }
277
278 pub fn is_similar(&self, other: &ColumnName) -> bool {
280 const SIMILARITY_THRESHOLD: f64 = 0.6;
281
282 let a_lowercase = self.to_lowercase();
283 let b_lowercase = other.to_lowercase();
284
285 strsim::normalized_levenshtein(&a_lowercase, &b_lowercase) >= SIMILARITY_THRESHOLD
286 }
287}
288
289impl std::ops::Deref for ColumnName {
290 type Target = str;
291
292 #[inline(always)]
293 fn deref(&self) -> &Self::Target {
294 &self.0
295 }
296}
297
298impl fmt::Display for ColumnName {
299 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
300 f.write_str(&self.0)
301 }
302}
303
304impl From<String> for ColumnName {
305 fn from(s: String) -> ColumnName {
306 ColumnName(s.into())
307 }
308}
309
310impl From<&str> for ColumnName {
311 fn from(s: &str) -> ColumnName {
312 ColumnName(s.into())
313 }
314}
315
316impl From<&ColumnName> for ColumnName {
317 fn from(n: &ColumnName) -> ColumnName {
318 n.clone()
319 }
320}
321
322impl RustType<ProtoColumnName> for ColumnName {
323 fn into_proto(&self) -> ProtoColumnName {
324 ProtoColumnName {
325 value: Some(self.0.to_string()),
326 }
327 }
328
329 fn from_proto(proto: ProtoColumnName) -> Result<Self, TryFromProtoError> {
330 Ok(ColumnName(
331 proto
332 .value
333 .ok_or_else(|| TryFromProtoError::missing_field("ProtoColumnName::value"))?
334 .into(),
335 ))
336 }
337}
338
339impl From<ColumnName> for mz_sql_parser::ast::Ident {
340 fn from(value: ColumnName) -> Self {
341 mz_sql_parser::ast::Ident::new_unchecked(value.0)
343 }
344}
345
346impl proptest::arbitrary::Arbitrary for ColumnName {
347 type Parameters = ();
348 type Strategy = BoxedStrategy<ColumnName>;
349
350 fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy {
351 let mut weights = vec![(50, Just(1..8)), (20, Just(8..16))];
354 if std::env::var("PROPTEST_LARGE_DATA").is_ok() {
355 weights.extend([
356 (5, Just(16..128)),
357 (1, Just(128..1024)),
358 (1, Just(1024..4096)),
359 ]);
360 }
361 let name_length = Union::new_weighted(weights);
362
363 let char_strat = Rc::new(Union::new_weighted(vec![
366 (50, proptest::char::range('A', 'z').boxed()),
367 (1, any::<char>().boxed()),
368 ]));
369
370 name_length
371 .prop_flat_map(move |length| proptest::collection::vec(Rc::clone(&char_strat), length))
372 .prop_map(|chars| ColumnName(chars.into_iter().collect::<Box<str>>()))
373 .no_shrink()
374 .boxed()
375 }
376}
377
/// Identifies a column within a [`RelationDesc`]; the key type of its
/// per-column metadata map.
#[derive(
    Clone, Copy, Debug, Eq, PartialEq, PartialOrd, Ord, Serialize, Deserialize, Hash, MzReflect,
)]
pub struct ColumnIndex(usize);

// Compile-time guard: `ColumnIndex` must not implement `Arbitrary`.
static_assertions::assert_not_impl_all!(ColumnIndex: Arbitrary);
385
386impl ColumnIndex {
387 pub fn to_stable_name(&self) -> String {
389 self.0.to_string()
390 }
391
392 pub fn to_raw(&self) -> usize {
393 self.0
394 }
395
396 pub fn from_raw(val: usize) -> Self {
397 ColumnIndex(val)
398 }
399}
400
/// A version of a relation, stored as a `u64`.
///
/// `RelationVersion::root` is version `0` and `bump` yields the next version.
#[derive(
    Clone,
    Copy,
    Debug,
    Eq,
    PartialEq,
    PartialOrd,
    Ord,
    Serialize,
    Deserialize,
    Hash,
    MzReflect,
    Arbitrary,
)]
pub struct RelationVersion(u64);
417
418impl RelationVersion {
419 pub fn root() -> Self {
421 RelationVersion(0)
422 }
423
424 pub fn bump(&self) -> Self {
426 let next_version = self
427 .0
428 .checked_add(1)
429 .expect("added more than u64::MAX columns?");
430 RelationVersion(next_version)
431 }
432
433 pub fn into_raw(self) -> u64 {
437 self.0
438 }
439
440 pub fn from_raw(val: u64) -> RelationVersion {
444 RelationVersion(val)
445 }
446}
447
448impl From<RelationVersion> for SchemaId {
449 fn from(value: RelationVersion) -> Self {
450 SchemaId(usize::cast_from(value.0))
451 }
452}
453
454impl From<mz_sql_parser::ast::Version> for RelationVersion {
455 fn from(value: mz_sql_parser::ast::Version) -> Self {
456 RelationVersion(value.into_inner())
457 }
458}
459
460impl fmt::Display for RelationVersion {
461 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
462 write!(f, "v{}", self.0)
463 }
464}
465
466impl From<RelationVersion> for mz_sql_parser::ast::Version {
467 fn from(value: RelationVersion) -> Self {
468 mz_sql_parser::ast::Version::new(value.0)
469 }
470}
471
472impl RustType<ProtoRelationVersion> for RelationVersion {
473 fn into_proto(&self) -> ProtoRelationVersion {
474 ProtoRelationVersion { value: self.0 }
475 }
476
477 fn from_proto(proto: ProtoRelationVersion) -> Result<Self, TryFromProtoError> {
478 Ok(RelationVersion(proto.value))
479 }
480}
481
/// Per-column bookkeeping stored inside a [`RelationDesc`].
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash, MzReflect)]
struct ColumnMetadata {
    /// Name of the column.
    name: ColumnName,
    /// Index of the column's type within `RelationType::column_types`.
    typ_idx: usize,
    /// Version at which the column was added.
    added: RelationVersion,
    /// Version at which the column was dropped, or `None` if it has not been
    /// dropped.
    dropped: Option<RelationVersion>,
}
494
/// A description of a relation: its [`RelationType`] together with
/// per-column metadata (name, type index, added/dropped versions).
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash, MzReflect)]
pub struct RelationDesc {
    /// Column types and keys of the relation.
    typ: RelationType,
    /// Metadata for each column, keyed (and therefore ordered) by
    /// [`ColumnIndex`].
    metadata: BTreeMap<ColumnIndex, ColumnMetadata>,
}
567
568impl RustType<ProtoRelationDesc> for RelationDesc {
569 fn into_proto(&self) -> ProtoRelationDesc {
570 let (names, metadata): (Vec<_>, Vec<_>) = self
571 .metadata
572 .values()
573 .map(|meta| {
574 let metadata = ProtoColumnMetadata {
575 added: Some(meta.added.into_proto()),
576 dropped: meta.dropped.map(|v| v.into_proto()),
577 };
578 (meta.name.into_proto(), metadata)
579 })
580 .unzip();
581
582 let is_all_default_metadata = metadata.iter().all(|meta| {
588 meta.added == Some(RelationVersion::root().into_proto()) && meta.dropped == None
589 });
590 let metadata = if is_all_default_metadata {
591 Vec::new()
592 } else {
593 metadata
594 };
595
596 ProtoRelationDesc {
597 typ: Some(self.typ.into_proto()),
598 names,
599 metadata,
600 }
601 }
602
603 fn from_proto(proto: ProtoRelationDesc) -> Result<Self, TryFromProtoError> {
604 let proto_metadata: Box<dyn Iterator<Item = _>> = if proto.metadata.is_empty() {
610 let val = ProtoColumnMetadata {
611 added: Some(RelationVersion::root().into_proto()),
612 dropped: None,
613 };
614 Box::new(itertools::repeat_n(val, proto.names.len()))
615 } else {
616 Box::new(proto.metadata.into_iter())
617 };
618
619 let metadata = proto
620 .names
621 .into_iter()
622 .zip_eq(proto_metadata)
623 .enumerate()
624 .map(|(idx, (name, metadata))| {
625 let meta = ColumnMetadata {
626 name: name.into_rust()?,
627 typ_idx: idx,
628 added: metadata.added.into_rust_if_some("ColumnMetadata::added")?,
629 dropped: metadata.dropped.into_rust()?,
630 };
631 Ok::<_, TryFromProtoError>((ColumnIndex(idx), meta))
632 })
633 .collect::<Result<_, _>>()?;
634
635 Ok(RelationDesc {
636 typ: proto.typ.into_rust_if_some("ProtoRelationDesc::typ")?,
637 metadata,
638 })
639 }
640}
641
642impl RelationDesc {
643 pub fn builder() -> RelationDescBuilder {
645 RelationDescBuilder::default()
646 }
647
648 pub fn empty() -> Self {
651 RelationDesc {
652 typ: RelationType::empty(),
653 metadata: BTreeMap::default(),
654 }
655 }
656
657 pub fn is_empty(&self) -> bool {
659 self == &Self::empty()
660 }
661
662 pub fn len(&self) -> usize {
664 self.typ().column_types.len()
665 }
666
667 pub fn new<I, N>(typ: RelationType, names: I) -> Self
675 where
676 I: IntoIterator<Item = N>,
677 N: Into<ColumnName>,
678 {
679 let metadata: BTreeMap<_, _> = names
680 .into_iter()
681 .enumerate()
682 .map(|(idx, name)| {
683 let col_idx = ColumnIndex(idx);
684 let metadata = ColumnMetadata {
685 name: name.into(),
686 typ_idx: idx,
687 added: RelationVersion::root(),
688 dropped: None,
689 };
690 (col_idx, metadata)
691 })
692 .collect();
693
694 assert_eq!(typ.column_types.len(), metadata.len());
696
697 RelationDesc { typ, metadata }
698 }
699
700 pub fn from_names_and_types<I, T, N>(iter: I) -> Self
701 where
702 I: IntoIterator<Item = (N, T)>,
703 T: Into<ColumnType>,
704 N: Into<ColumnName>,
705 {
706 let (names, types): (Vec<_>, Vec<_>) = iter.into_iter().unzip();
707 let types = types.into_iter().map(Into::into).collect();
708 let typ = RelationType::new(types);
709 Self::new(typ, names)
710 }
711
712 pub fn concat(mut self, other: Self) -> Self {
722 let self_len = self.typ.column_types.len();
723
724 for (typ, (_col_idx, meta)) in other
725 .typ
726 .column_types
727 .into_iter()
728 .zip_eq(other.metadata.into_iter())
729 {
730 assert_eq!(meta.added, RelationVersion::root());
731 assert_none!(meta.dropped);
732
733 let new_idx = self.typ.columns().len();
734 let new_meta = ColumnMetadata {
735 name: meta.name,
736 typ_idx: new_idx,
737 added: RelationVersion::root(),
738 dropped: None,
739 };
740
741 self.typ.column_types.push(typ);
742 let prev = self.metadata.insert(ColumnIndex(new_idx), new_meta);
743
744 assert_eq!(self.metadata.len(), self.typ.columns().len());
745 assert_none!(prev);
746 }
747
748 for k in other.typ.keys {
749 let k = k.into_iter().map(|idx| idx + self_len).collect();
750 self = self.with_key(k);
751 }
752 self
753 }
754
755 pub fn with_key(mut self, indices: Vec<usize>) -> Self {
757 self.typ = self.typ.with_key(indices);
758 self
759 }
760
761 pub fn without_keys(mut self) -> Self {
763 self.typ.keys.clear();
764 self
765 }
766
767 pub fn with_names<I, N>(self, names: I) -> Self
775 where
776 I: IntoIterator<Item = N>,
777 N: Into<ColumnName>,
778 {
779 Self::new(self.typ, names)
780 }
781
782 pub fn arity(&self) -> usize {
784 self.typ.arity()
785 }
786
787 pub fn typ(&self) -> &RelationType {
789 &self.typ
790 }
791
792 pub fn iter(&self) -> impl Iterator<Item = (&ColumnName, &ColumnType)> {
794 self.metadata.values().map(|meta| {
795 let typ = &self.typ.columns()[meta.typ_idx];
796 (&meta.name, typ)
797 })
798 }
799
800 pub fn iter_types(&self) -> impl Iterator<Item = &ColumnType> {
802 self.typ.column_types.iter()
803 }
804
805 pub fn iter_names(&self) -> impl Iterator<Item = &ColumnName> {
807 self.metadata.values().map(|meta| &meta.name)
808 }
809
810 pub fn iter_all(&self) -> impl Iterator<Item = (&ColumnIndex, &ColumnName, &ColumnType)> {
812 self.metadata.iter().map(|(col_idx, metadata)| {
813 let col_typ = &self.typ.columns()[metadata.typ_idx];
814 (col_idx, &metadata.name, col_typ)
815 })
816 }
817
818 pub fn iter_similar_names<'a>(
821 &'a self,
822 name: &'a ColumnName,
823 ) -> impl Iterator<Item = &'a ColumnName> {
824 self.iter_names().filter(|n| n.is_similar(name))
825 }
826
827 pub fn contains_index(&self, idx: &ColumnIndex) -> bool {
829 self.metadata.contains_key(idx)
830 }
831
832 pub fn get_by_name(&self, name: &ColumnName) -> Option<(usize, &ColumnType)> {
838 self.iter_names()
839 .position(|n| n == name)
840 .map(|i| (i, &self.typ.column_types[i]))
841 }
842
843 pub fn get_name(&self, i: usize) -> &ColumnName {
851 self.get_name_idx(&ColumnIndex(i))
853 }
854
855 pub fn get_name_idx(&self, idx: &ColumnIndex) -> &ColumnName {
861 &self.metadata.get(idx).expect("should exist").name
862 }
863
864 pub fn get_name_mut(&mut self, i: usize) -> &mut ColumnName {
870 &mut self
872 .metadata
873 .get_mut(&ColumnIndex(i))
874 .expect("should exist")
875 .name
876 }
877
878 pub fn get_type(&self, idx: &ColumnIndex) -> &ColumnType {
884 let typ_idx = self.metadata.get(idx).expect("should exist").typ_idx;
885 &self.typ.column_types[typ_idx]
886 }
887
888 pub fn get_unambiguous_name(&self, i: usize) -> Option<&ColumnName> {
897 let name = self.get_name(i);
898 if self.iter_names().filter(|n| *n == name).count() == 1 {
899 Some(name)
900 } else {
901 None
902 }
903 }
904
905 pub fn constraints_met(&self, i: usize, d: &Datum) -> Result<(), NotNullViolation> {
910 let name = self.get_name(i);
911 let typ = &self.typ.column_types[i];
912 if d == &Datum::Null && !typ.nullable {
913 Err(NotNullViolation(name.clone()))
914 } else {
915 Ok(())
916 }
917 }
918
919 pub fn apply_demand(&self, demands: &BTreeSet<usize>) -> RelationDesc {
921 let mut new_desc = self.clone();
922
923 let mut removed = 0;
925 new_desc.metadata.retain(|idx, metadata| {
926 let retain = demands.contains(&idx.0);
927 if !retain {
928 removed += 1;
929 } else {
930 metadata.typ_idx -= removed;
931 }
932 retain
933 });
934
935 let mut idx = 0;
937 new_desc.typ.column_types.retain(|_| {
938 let keep = demands.contains(&idx);
939 idx += 1;
940 keep
941 });
942
943 new_desc
944 }
945}
946
947impl Arbitrary for RelationDesc {
948 type Parameters = ();
949 type Strategy = BoxedStrategy<RelationDesc>;
950
951 fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy {
952 let mut weights = vec![(100, Just(0..4)), (50, Just(4..8)), (25, Just(8..16))];
953 if std::env::var("PROPTEST_LARGE_DATA").is_ok() {
954 weights.extend([
955 (12, Just(16..32)),
956 (6, Just(32..64)),
957 (3, Just(64..128)),
958 (1, Just(128..256)),
959 ]);
960 }
961 let num_columns = Union::new_weighted(weights);
962
963 num_columns.prop_flat_map(arb_relation_desc).boxed()
964 }
965}
966
967pub fn arb_relation_desc(num_cols: std::ops::Range<usize>) -> impl Strategy<Value = RelationDesc> {
970 proptest::collection::btree_map(any::<ColumnName>(), any::<ColumnType>(), num_cols)
971 .prop_map(RelationDesc::from_names_and_types)
972}
973
974pub fn arb_relation_desc_projection(desc: RelationDesc) -> impl Strategy<Value = RelationDesc> {
976 let mask: Vec<_> = (0..desc.len()).map(|_| any::<bool>()).collect();
977 mask.prop_map(move |mask| {
978 let demands: BTreeSet<_> = mask
979 .into_iter()
980 .enumerate()
981 .filter_map(|(idx, keep)| keep.then_some(idx))
982 .collect();
983 desc.apply_demand(&demands)
984 })
985}
986
987impl IntoIterator for RelationDesc {
988 type Item = (ColumnName, ColumnType);
989 type IntoIter = Box<dyn Iterator<Item = (ColumnName, ColumnType)>>;
990
991 fn into_iter(self) -> Self::IntoIter {
992 let iter = self
993 .metadata
994 .into_values()
995 .zip_eq(self.typ.column_types)
996 .map(|(meta, typ)| (meta.name, typ));
997 Box::new(iter)
998 }
999}
1000
1001pub fn arb_row_for_relation(desc: &RelationDesc) -> impl Strategy<Value = Row> + use<> {
1003 let datums: Vec<_> = desc
1004 .typ()
1005 .columns()
1006 .iter()
1007 .cloned()
1008 .map(arb_datum_for_column)
1009 .collect();
1010 datums.prop_map(|x| Row::pack(x.iter().map(Datum::from)))
1011}
1012
/// Error reported when a null datum is found in a non-nullable column; carries
/// the name of the offending column.
#[derive(Debug, PartialEq, Eq)]
pub struct NotNullViolation(pub ColumnName);
1016
1017impl fmt::Display for NotNullViolation {
1018 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1019 write!(
1020 f,
1021 "null value in column {} violates not-null constraint",
1022 self.0.quoted()
1023 )
1024 }
1025}
1026
/// Incrementally assembles a [`RelationDesc`]; call `finish` to build it.
#[derive(Clone, Default, Debug, PartialEq, Eq)]
pub struct RelationDescBuilder {
    /// Columns added so far, in order, as `(name, type)` pairs.
    columns: Vec<(ColumnName, ColumnType)>,
    /// Keys added so far, each a sorted list of column indices.
    keys: Vec<Vec<usize>>,
}
1035
1036impl RelationDescBuilder {
1037 pub fn with_column<N: Into<ColumnName>>(
1039 mut self,
1040 name: N,
1041 ty: ColumnType,
1042 ) -> RelationDescBuilder {
1043 let name = name.into();
1044 self.columns.push((name, ty));
1045 self
1046 }
1047
1048 pub fn with_columns<I, T, N>(mut self, iter: I) -> Self
1050 where
1051 I: IntoIterator<Item = (N, T)>,
1052 T: Into<ColumnType>,
1053 N: Into<ColumnName>,
1054 {
1055 self.columns
1056 .extend(iter.into_iter().map(|(name, ty)| (name.into(), ty.into())));
1057 self
1058 }
1059
1060 pub fn with_key(mut self, mut indices: Vec<usize>) -> RelationDescBuilder {
1062 indices.sort_unstable();
1063 if !self.keys.contains(&indices) {
1064 self.keys.push(indices);
1065 }
1066 self
1067 }
1068
1069 pub fn without_keys(mut self) -> RelationDescBuilder {
1071 self.keys.clear();
1072 assert_eq!(self.keys.len(), 0);
1073 self
1074 }
1075
1076 pub fn concat(mut self, other: Self) -> Self {
1078 let self_len = self.columns.len();
1079
1080 self.columns.extend(other.columns);
1081 for k in other.keys {
1082 let k = k.into_iter().map(|idx| idx + self_len).collect();
1083 self = self.with_key(k);
1084 }
1085
1086 self
1087 }
1088
1089 pub fn finish(self) -> RelationDesc {
1091 let mut desc = RelationDesc::from_names_and_types(self.columns);
1092 desc.typ = desc.typ.with_keys(self.keys);
1093 desc
1094 }
1095}
1096
/// Selects which version of a relation to look at.
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord, Serialize)]
pub enum RelationVersionSelector {
    /// Exactly the given version.
    Specific(RelationVersion),
    /// The most recent version.
    Latest,
}
1103
1104impl RelationVersionSelector {
1105 pub fn specific(version: u64) -> Self {
1106 RelationVersionSelector::Specific(RelationVersion(version))
1107 }
1108}
1109
/// A [`RelationDesc`] that tracks columns being added and dropped across
/// versions.
#[derive(Debug, Clone, Serialize)]
pub struct VersionedRelationDesc {
    /// Description holding every column ever added; dropped columns are kept
    /// and only marked with the version at which they were dropped.
    inner: RelationDesc,
}
1119
1120impl VersionedRelationDesc {
1121 pub fn new(inner: RelationDesc) -> Self {
1122 VersionedRelationDesc { inner }
1123 }
1124
1125 #[must_use]
1133 pub fn add_column<N, T>(&mut self, name: N, typ: T) -> RelationVersion
1134 where
1135 N: Into<ColumnName>,
1136 T: Into<ColumnType>,
1137 {
1138 let latest_version = self.latest_version();
1139 let new_version = latest_version.bump();
1140
1141 let name = name.into();
1142 let existing = self
1143 .inner
1144 .metadata
1145 .iter()
1146 .find(|(_, meta)| meta.name == name && meta.dropped.is_none());
1147 if let Some(existing) = existing {
1148 panic!("column named '{name}' already exists! {existing:?}");
1149 }
1150
1151 let next_idx = self.inner.metadata.len();
1152 let col_meta = ColumnMetadata {
1153 name,
1154 typ_idx: next_idx,
1155 added: new_version,
1156 dropped: None,
1157 };
1158
1159 self.inner.typ.column_types.push(typ.into());
1160 let prev = self.inner.metadata.insert(ColumnIndex(next_idx), col_meta);
1161
1162 assert_none!(prev, "column index overlap!");
1163 self.validate();
1164
1165 new_version
1166 }
1167
1168 #[must_use]
1177 pub fn drop_column<N>(&mut self, name: N) -> RelationVersion
1178 where
1179 N: Into<ColumnName>,
1180 {
1181 let name = name.into();
1182 let latest_version = self.latest_version();
1183 let new_version = latest_version.bump();
1184
1185 let col = self
1186 .inner
1187 .metadata
1188 .values_mut()
1189 .find(|meta| meta.name == name && meta.dropped.is_none())
1190 .expect("column to exist");
1191
1192 assert_none!(col.dropped, "column was already dropped");
1194 col.dropped = Some(new_version);
1195
1196 let dropped_key = self
1198 .inner
1199 .typ
1200 .keys
1201 .iter()
1202 .any(|keys| keys.iter().any(|key| *key == col.typ_idx));
1203 assert!(!dropped_key, "column being dropped was used as a key");
1204
1205 self.validate();
1206 new_version
1207 }
1208
1209 pub fn latest(&self) -> RelationDesc {
1211 self.inner.clone()
1212 }
1213
1214 pub fn at_version(&self, version: RelationVersionSelector) -> RelationDesc {
1216 let up_to_version = match version {
1220 RelationVersionSelector::Latest => RelationVersion(u64::MAX),
1221 RelationVersionSelector::Specific(v) => v,
1222 };
1223
1224 let valid_columns = self.inner.metadata.iter().filter(|(_col_idx, meta)| {
1225 let added = meta.added <= up_to_version;
1226 let dropped = meta
1227 .dropped
1228 .map(|dropped_at| up_to_version >= dropped_at)
1229 .unwrap_or(false);
1230
1231 added && !dropped
1232 });
1233
1234 let mut column_types = Vec::new();
1235 let mut column_metas = BTreeMap::new();
1236
1237 for (col_idx, meta) in valid_columns {
1244 let new_meta = ColumnMetadata {
1245 name: meta.name.clone(),
1246 typ_idx: column_types.len(),
1247 added: meta.added.clone(),
1248 dropped: meta.dropped.clone(),
1249 };
1250 column_types.push(self.inner.typ.columns()[meta.typ_idx].clone());
1251 column_metas.insert(*col_idx, new_meta);
1252 }
1253
1254 let keys = self
1260 .inner
1261 .typ
1262 .keys
1263 .iter()
1264 .map(|keys| {
1265 keys.iter()
1266 .map(|key_idx| {
1267 let metadata = column_metas
1268 .get(&ColumnIndex(*key_idx))
1269 .expect("found key for column that doesn't exist");
1270 metadata.typ_idx
1271 })
1272 .collect()
1273 })
1274 .collect();
1275
1276 let relation_type = RelationType { column_types, keys };
1277
1278 RelationDesc {
1279 typ: relation_type,
1280 metadata: column_metas,
1281 }
1282 }
1283
1284 pub fn latest_version(&self) -> RelationVersion {
1285 self.inner
1286 .metadata
1287 .values()
1288 .map(|meta| meta.dropped.unwrap_or(meta.added))
1290 .max()
1291 .unwrap_or_else(RelationVersion::root)
1293 }
1294
1295 fn validate(&self) {
1301 fn validate_inner(desc: &RelationDesc) -> Result<(), anyhow::Error> {
1302 if desc.typ.column_types.len() != desc.metadata.len() {
1303 anyhow::bail!("mismatch between number of types and metadatas");
1304 }
1305
1306 for (col_idx, meta) in &desc.metadata {
1307 if col_idx.0 > desc.metadata.len() {
1308 anyhow::bail!("column index out of bounds");
1309 }
1310 if meta.added >= meta.dropped.unwrap_or(RelationVersion(u64::MAX)) {
1311 anyhow::bail!("column was added after it was dropped?");
1312 }
1313 if desc.typ().columns().get(meta.typ_idx).is_none() {
1314 anyhow::bail!("typ_idx incorrect");
1315 }
1316 }
1317
1318 for keys in &desc.typ.keys {
1319 for key in keys {
1320 if *key >= desc.typ.column_types.len() {
1321 anyhow::bail!("key index was out of bounds!");
1322 }
1323 }
1324 }
1325
1326 let versions = desc
1327 .metadata
1328 .values()
1329 .map(|meta| meta.dropped.unwrap_or(meta.added));
1330 let mut max = 0;
1331 let mut sum = 0;
1332 for version in versions {
1333 max = std::cmp::max(max, version.0);
1334 sum += version.0;
1335 }
1336
1337 if sum != (max * (max + 1) / 2) {
1347 anyhow::bail!("there is a duplicate or missing relation version");
1348 }
1349
1350 Ok(())
1351 }
1352
1353 assert_ok!(validate_inner(&self.inner), "validate failed! {self:?}");
1354 }
1355}
1356
/// A single change that can be applied to a [`RelationDesc`] with `apply`;
/// produced by `arb_relation_desc_diff`.
#[derive(Debug)]
pub enum PropRelationDescDiff {
    /// Append a new column with the given name and type.
    AddColumn { name: ColumnName, typ: ColumnType },
    /// Mark the column with the given name as dropped.
    DropColumn { name: ColumnName },
    /// Flip the nullability of the column with the given name.
    ToggleNullability { name: ColumnName },
    /// Replace the type of the column with the given name.
    ChangeType { name: ColumnName, typ: ColumnType },
}
1366
1367impl PropRelationDescDiff {
1368 pub fn apply(self, desc: &mut RelationDesc) {
1369 match self {
1370 PropRelationDescDiff::AddColumn { name, typ } => {
1371 let new_idx = desc.metadata.len();
1372 let meta = ColumnMetadata {
1373 name,
1374 typ_idx: new_idx,
1375 added: RelationVersion(0),
1376 dropped: None,
1377 };
1378 let prev = desc.metadata.insert(ColumnIndex(new_idx), meta);
1379 desc.typ.column_types.push(typ);
1380
1381 assert_none!(prev);
1382 assert_eq!(desc.metadata.len(), desc.typ.column_types.len());
1383 }
1384 PropRelationDescDiff::DropColumn { name } => {
1385 let next_version = desc
1386 .metadata
1387 .values()
1388 .map(|meta| meta.dropped.unwrap_or(meta.added))
1389 .max()
1390 .unwrap_or_else(RelationVersion::root)
1391 .bump();
1392 let Some(metadata) = desc.metadata.values_mut().find(|meta| meta.name == name)
1393 else {
1394 return;
1395 };
1396 if metadata.dropped.is_none() {
1397 metadata.dropped = Some(next_version);
1398 }
1399 }
1400 PropRelationDescDiff::ToggleNullability { name } => {
1401 let Some((pos, _)) = desc.get_by_name(&name) else {
1402 return;
1403 };
1404 let col_type = desc
1405 .typ
1406 .column_types
1407 .get_mut(pos)
1408 .expect("ColumnNames and ColumnTypes out of sync!");
1409 col_type.nullable = !col_type.nullable;
1410 }
1411 PropRelationDescDiff::ChangeType { name, typ } => {
1412 let Some((pos, _)) = desc.get_by_name(&name) else {
1413 return;
1414 };
1415 let col_type = desc
1416 .typ
1417 .column_types
1418 .get_mut(pos)
1419 .expect("ColumnNames and ColumnTypes out of sync!");
1420 *col_type = typ;
1421 }
1422 }
1423 }
1424}
1425
1426pub fn arb_relation_desc_diff(
1428 source: &RelationDesc,
1429) -> impl Strategy<Value = Vec<PropRelationDescDiff>> + use<> {
1430 let source = Rc::new(source.clone());
1431 let num_source_columns = source.typ.columns().len();
1432
1433 let num_add_columns = Union::new_weighted(vec![(100, Just(0..8)), (1, Just(8..64))]);
1434 let add_columns_strat = num_add_columns
1435 .prop_flat_map(|num_columns| {
1436 proptest::collection::vec((any::<ColumnName>(), any::<ColumnType>()), num_columns)
1437 })
1438 .prop_map(|cols| {
1439 cols.into_iter()
1440 .map(|(name, typ)| PropRelationDescDiff::AddColumn { name, typ })
1441 .collect::<Vec<_>>()
1442 });
1443
1444 if num_source_columns == 0 {
1446 return add_columns_strat.boxed();
1447 }
1448
1449 let source_ = Rc::clone(&source);
1450 let drop_columns_strat = (0..num_source_columns).prop_perturb(move |num_columns, mut rng| {
1451 let mut set = BTreeSet::default();
1452 for _ in 0..num_columns {
1453 let col_idx = rng.random_range(0..num_source_columns);
1454 set.insert(source_.get_name(col_idx).clone());
1455 }
1456 set.into_iter()
1457 .map(|name| PropRelationDescDiff::DropColumn { name })
1458 .collect::<Vec<_>>()
1459 });
1460
1461 let source_ = Rc::clone(&source);
1462 let toggle_nullability_strat =
1463 (0..num_source_columns).prop_perturb(move |num_columns, mut rng| {
1464 let mut set = BTreeSet::default();
1465 for _ in 0..num_columns {
1466 let col_idx = rng.random_range(0..num_source_columns);
1467 set.insert(source_.get_name(col_idx).clone());
1468 }
1469 set.into_iter()
1470 .map(|name| PropRelationDescDiff::ToggleNullability { name })
1471 .collect::<Vec<_>>()
1472 });
1473
1474 let source_ = Rc::clone(&source);
1475 let change_type_strat = (0..num_source_columns)
1476 .prop_perturb(move |num_columns, mut rng| {
1477 let mut set = BTreeSet::default();
1478 for _ in 0..num_columns {
1479 let col_idx = rng.random_range(0..num_source_columns);
1480 set.insert(source_.get_name(col_idx).clone());
1481 }
1482 set
1483 })
1484 .prop_flat_map(|cols| {
1485 proptest::collection::vec(any::<ColumnType>(), cols.len())
1486 .prop_map(move |types| (cols.clone(), types))
1487 })
1488 .prop_map(|(cols, types)| {
1489 cols.into_iter()
1490 .zip(types)
1491 .map(|(name, typ)| PropRelationDescDiff::ChangeType { name, typ })
1492 .collect::<Vec<_>>()
1493 });
1494
1495 (
1496 add_columns_strat,
1497 drop_columns_strat,
1498 toggle_nullability_strat,
1499 change_type_strat,
1500 )
1501 .prop_map(|(adds, drops, toggles, changes)| {
1502 adds.into_iter()
1503 .chain(drops)
1504 .chain(toggles)
1505 .chain(changes)
1506 .collect::<Vec<_>>()
1507 })
1508 .prop_shuffle()
1509 .boxed()
1510}
1511
1512#[cfg(test)]
1513mod tests {
1514 use super::*;
1515 use prost::Message;
1516
1517 #[mz_ore::test]
1518 #[cfg_attr(miri, ignore)] fn smoktest_at_version() {
1520 let desc = RelationDesc::builder()
1521 .with_column("a", ScalarType::Bool.nullable(true))
1522 .with_column("z", ScalarType::String.nullable(false))
1523 .finish();
1524
1525 let mut versioned_desc = VersionedRelationDesc {
1526 inner: desc.clone(),
1527 };
1528 versioned_desc.validate();
1529
1530 let latest = versioned_desc.at_version(RelationVersionSelector::Latest);
1531 assert_eq!(desc, latest);
1532
1533 let v0 = versioned_desc.at_version(RelationVersionSelector::specific(0));
1534 assert_eq!(desc, v0);
1535
1536 let v3 = versioned_desc.at_version(RelationVersionSelector::specific(3));
1537 assert_eq!(desc, v3);
1538
1539 let v1 = versioned_desc.add_column("b", ScalarType::Bytes.nullable(false));
1540 assert_eq!(v1, RelationVersion(1));
1541
1542 let v1 = versioned_desc.at_version(RelationVersionSelector::Specific(v1));
1543 insta::assert_json_snapshot!(v1.metadata, @r###"
1544 {
1545 "0": {
1546 "name": "a",
1547 "typ_idx": 0,
1548 "added": 0,
1549 "dropped": null
1550 },
1551 "1": {
1552 "name": "z",
1553 "typ_idx": 1,
1554 "added": 0,
1555 "dropped": null
1556 },
1557 "2": {
1558 "name": "b",
1559 "typ_idx": 2,
1560 "added": 1,
1561 "dropped": null
1562 }
1563 }
1564 "###);
1565
1566 let v0_b = versioned_desc.at_version(RelationVersionSelector::specific(0));
1568 assert!(v0.iter().eq(v0_b.iter()));
1569
1570 let v2 = versioned_desc.drop_column("z");
1571 assert_eq!(v2, RelationVersion(2));
1572
1573 let v2 = versioned_desc.at_version(RelationVersionSelector::Specific(v2));
1574 insta::assert_json_snapshot!(v2.metadata, @r###"
1575 {
1576 "0": {
1577 "name": "a",
1578 "typ_idx": 0,
1579 "added": 0,
1580 "dropped": null
1581 },
1582 "2": {
1583 "name": "b",
1584 "typ_idx": 1,
1585 "added": 1,
1586 "dropped": null
1587 }
1588 }
1589 "###);
1590
1591 let v0_c = versioned_desc.at_version(RelationVersionSelector::specific(0));
1593 assert!(v0.iter().eq(v0_c.iter()));
1594
1595 let v1_b = versioned_desc.at_version(RelationVersionSelector::specific(1));
1596 assert!(v1.iter().eq(v1_b.iter()));
1597
1598 insta::assert_json_snapshot!(versioned_desc.inner.metadata, @r###"
1599 {
1600 "0": {
1601 "name": "a",
1602 "typ_idx": 0,
1603 "added": 0,
1604 "dropped": null
1605 },
1606 "1": {
1607 "name": "z",
1608 "typ_idx": 1,
1609 "added": 0,
1610 "dropped": 2
1611 },
1612 "2": {
1613 "name": "b",
1614 "typ_idx": 2,
1615 "added": 1,
1616 "dropped": null
1617 }
1618 }
1619 "###);
1620 }
1621
1622 #[mz_ore::test]
1623 #[cfg_attr(miri, ignore)] fn test_dropping_columns_with_keys() {
1625 let desc = RelationDesc::builder()
1626 .with_column("a", ScalarType::Bool.nullable(true))
1627 .with_column("z", ScalarType::String.nullable(false))
1628 .with_key(vec![1])
1629 .finish();
1630
1631 let mut versioned_desc = VersionedRelationDesc {
1632 inner: desc.clone(),
1633 };
1634 versioned_desc.validate();
1635
1636 let v1 = versioned_desc.drop_column("a");
1637 assert_eq!(v1, RelationVersion(1));
1638
1639 let v1 = versioned_desc.at_version(RelationVersionSelector::Specific(v1));
1641 insta::assert_json_snapshot!(v1, @r###"
1642 {
1643 "typ": {
1644 "column_types": [
1645 {
1646 "scalar_type": "String",
1647 "nullable": false
1648 }
1649 ],
1650 "keys": [
1651 [
1652 0
1653 ]
1654 ]
1655 },
1656 "metadata": {
1657 "1": {
1658 "name": "z",
1659 "typ_idx": 0,
1660 "added": 0,
1661 "dropped": null
1662 }
1663 }
1664 }
1665 "###);
1666
1667 let v0 = versioned_desc.at_version(RelationVersionSelector::specific(0));
1669 insta::assert_json_snapshot!(v0, @r###"
1670 {
1671 "typ": {
1672 "column_types": [
1673 {
1674 "scalar_type": "Bool",
1675 "nullable": true
1676 },
1677 {
1678 "scalar_type": "String",
1679 "nullable": false
1680 }
1681 ],
1682 "keys": [
1683 [
1684 1
1685 ]
1686 ]
1687 },
1688 "metadata": {
1689 "0": {
1690 "name": "a",
1691 "typ_idx": 0,
1692 "added": 0,
1693 "dropped": 1
1694 },
1695 "1": {
1696 "name": "z",
1697 "typ_idx": 1,
1698 "added": 0,
1699 "dropped": null
1700 }
1701 }
1702 }
1703 "###);
1704 }
1705
1706 #[mz_ore::test]
1707 #[cfg_attr(miri, ignore)] fn roundtrip_relation_desc_without_metadata() {
1709 let typ = ProtoRelationType {
1710 column_types: vec![
1711 ScalarType::String.nullable(false).into_proto(),
1712 ScalarType::Bool.nullable(true).into_proto(),
1713 ],
1714 keys: vec![],
1715 };
1716 let proto = ProtoRelationDesc {
1717 typ: Some(typ),
1718 names: vec![
1719 ColumnName("a".into()).into_proto(),
1720 ColumnName("b".into()).into_proto(),
1721 ],
1722 metadata: vec![],
1723 };
1724 let desc: RelationDesc = proto.into_rust().unwrap();
1725
1726 insta::assert_json_snapshot!(desc, @r###"
1727 {
1728 "typ": {
1729 "column_types": [
1730 {
1731 "scalar_type": "String",
1732 "nullable": false
1733 },
1734 {
1735 "scalar_type": "Bool",
1736 "nullable": true
1737 }
1738 ],
1739 "keys": []
1740 },
1741 "metadata": {
1742 "0": {
1743 "name": "a",
1744 "typ_idx": 0,
1745 "added": 0,
1746 "dropped": null
1747 },
1748 "1": {
1749 "name": "b",
1750 "typ_idx": 1,
1751 "added": 0,
1752 "dropped": null
1753 }
1754 }
1755 }
1756 "###);
1757 }
1758
1759 #[mz_ore::test]
1760 #[should_panic(expected = "column named 'a' already exists!")]
1761 fn test_add_column_with_same_name_panics() {
1762 let desc = RelationDesc::builder()
1763 .with_column("a", ScalarType::Bool.nullable(true))
1764 .finish();
1765 let mut versioned = VersionedRelationDesc::new(desc);
1766
1767 let _ = versioned.add_column("a", ScalarType::String.nullable(false));
1768 }
1769
1770 #[mz_ore::test]
1771 #[cfg_attr(miri, ignore)] fn test_add_column_with_same_name_prev_dropped() {
1773 let desc = RelationDesc::builder()
1774 .with_column("a", ScalarType::Bool.nullable(true))
1775 .finish();
1776 let mut versioned = VersionedRelationDesc::new(desc);
1777
1778 let v1 = versioned.drop_column("a");
1779 let v1 = versioned.at_version(RelationVersionSelector::Specific(v1));
1780 insta::assert_json_snapshot!(v1, @r###"
1781 {
1782 "typ": {
1783 "column_types": [],
1784 "keys": []
1785 },
1786 "metadata": {}
1787 }
1788 "###);
1789
1790 let v2 = versioned.add_column("a", ScalarType::String.nullable(false));
1791 let v2 = versioned.at_version(RelationVersionSelector::Specific(v2));
1792 insta::assert_json_snapshot!(v2, @r###"
1793 {
1794 "typ": {
1795 "column_types": [
1796 {
1797 "scalar_type": "String",
1798 "nullable": false
1799 }
1800 ],
1801 "keys": []
1802 },
1803 "metadata": {
1804 "1": {
1805 "name": "a",
1806 "typ_idx": 0,
1807 "added": 2,
1808 "dropped": null
1809 }
1810 }
1811 }
1812 "###);
1813 }
1814
1815 #[mz_ore::test]
1816 #[cfg_attr(miri, ignore)]
1817 fn apply_demand() {
1818 let desc = RelationDesc::builder()
1819 .with_column("a", ScalarType::String.nullable(true))
1820 .with_column("b", ScalarType::Int64.nullable(false))
1821 .with_column("c", ScalarType::Time.nullable(false))
1822 .finish();
1823 let desc = desc.apply_demand(&BTreeSet::from([0, 2]));
1824 assert_eq!(desc.arity(), 2);
1825 VersionedRelationDesc::new(desc).validate();
1827 }
1828
1829 #[mz_ore::test]
1830 #[cfg_attr(miri, ignore)]
1831 fn smoketest_column_index_stable_ident() {
1832 let idx_a = ColumnIndex(42);
1833 assert_eq!(idx_a.to_stable_name(), "42");
1835 }
1836
1837 #[mz_ore::test]
1838 #[cfg_attr(miri, ignore)] fn proptest_relation_desc_roundtrips() {
1840 fn testcase(og: RelationDesc) {
1841 let bytes = og.into_proto().encode_to_vec();
1842 let proto = ProtoRelationDesc::decode(&bytes[..]).unwrap();
1843 let rnd = RelationDesc::from_proto(proto).unwrap();
1844
1845 assert_eq!(og, rnd);
1846 }
1847
1848 proptest!(|(desc in any::<RelationDesc>())| {
1849 testcase(desc);
1850 });
1851
1852 let strat = any::<RelationDesc>().prop_flat_map(|desc| {
1853 arb_relation_desc_diff(&desc).prop_map(move |diffs| (desc.clone(), diffs))
1854 });
1855
1856 proptest!(|((mut desc, diffs) in strat)| {
1857 for diff in diffs {
1858 diff.apply(&mut desc);
1859 };
1860 testcase(desc);
1861 });
1862 }
1863}