1use std::collections::{BTreeMap, BTreeSet};
11use std::rc::Rc;
12use std::{fmt, vec};
13
14use anyhow::bail;
15use itertools::Itertools;
16use mz_lowertest::MzReflect;
17use mz_ore::cast::CastFrom;
18use mz_ore::str::StrExt;
19use mz_ore::{assert_none, assert_ok};
20use mz_persist_types::schema::SchemaId;
21use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
22use proptest::prelude::*;
23use proptest::strategy::{Strategy, Union};
24use proptest_derive::Arbitrary;
25use serde::{Deserialize, Serialize};
26
27use crate::relation_and_scalar::proto_relation_type::ProtoKey;
28pub use crate::relation_and_scalar::{
29 ProtoColumnMetadata, ProtoColumnName, ProtoColumnType, ProtoRelationDesc, ProtoRelationType,
30 ProtoRelationVersion,
31};
32use crate::{Datum, ReprScalarType, Row, SqlScalarType, arb_datum_for_column};
33
/// The type of a single column: a SQL scalar type plus a nullability flag.
#[derive(
    Arbitrary,
    Clone,
    Debug,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Serialize,
    Deserialize,
    Hash,
    MzReflect
)]
pub struct SqlColumnType {
    /// The scalar type of the values stored in this column.
    pub scalar_type: SqlScalarType,
    /// Whether the column may contain nulls.
    ///
    /// When the field is missing from serialized input, serde fills it in by
    /// calling `return_true`, i.e. it defaults to `true`.
    #[serde(default = "return_true")]
    pub nullable: bool,
}
61
/// Always returns `true`.
///
/// Referenced by name from `#[serde(default = "return_true")]` on the
/// `nullable` fields in this file, so that a missing `nullable` deserializes
/// as `true`.
#[inline(always)]
fn return_true() -> bool {
    true
}
71
72impl SqlColumnType {
73 pub fn backport_nullability(&mut self, backport_typ: &SqlColumnType) {
77 self.scalar_type
78 .backport_nullability(&backport_typ.scalar_type);
79 self.nullable = backport_typ.nullable;
80 }
81
82 pub fn sql_union(&self, other: &Self) -> Result<Self, anyhow::Error> {
88 match (&self.scalar_type, &other.scalar_type) {
89 (scalar_type, other_scalar_type) if scalar_type == other_scalar_type => {
90 Ok(SqlColumnType {
91 scalar_type: scalar_type.clone(),
92 nullable: self.nullable || other.nullable,
93 })
94 }
95 (scalar_type, other_scalar_type) if scalar_type.base_eq(other_scalar_type) => {
96 Ok(SqlColumnType {
97 scalar_type: scalar_type.without_modifiers(),
98 nullable: self.nullable || other.nullable,
99 })
100 }
101 (
102 SqlScalarType::Record { fields, custom_id },
103 SqlScalarType::Record {
104 fields: other_fields,
105 custom_id: other_custom_id,
106 },
107 ) => {
108 if custom_id != other_custom_id {
109 bail!(
110 "Can't union types: {:?} and {:?}",
111 self.scalar_type,
112 other.scalar_type
113 );
114 };
115
116 if fields.len() != other_fields.len() {
117 bail!(
118 "Can't union types: {:?} and {:?}",
119 self.scalar_type,
120 other.scalar_type
121 );
122 }
123 let mut union_fields = Vec::with_capacity(fields.len());
124 for ((name, typ), (other_name, other_typ)) in
125 fields.iter().zip_eq(other_fields.iter())
126 {
127 if name != other_name {
128 bail!(
129 "Can't union types: {:?} and {:?}",
130 self.scalar_type,
131 other.scalar_type
132 );
133 } else {
134 let union_column_type = typ.sql_union(other_typ)?;
135 union_fields.push((name.clone(), union_column_type));
136 };
137 }
138
139 Ok(SqlColumnType {
140 scalar_type: SqlScalarType::Record {
141 fields: union_fields.into(),
142 custom_id: *custom_id,
143 },
144 nullable: self.nullable || other.nullable,
145 })
146 }
147 _ => bail!(
148 "Can't union types: {:?} and {:?}",
149 self.scalar_type,
150 other.scalar_type
151 ),
152 }
153 }
154
155 pub fn try_union(&self, other: &Self) -> Result<Self, anyhow::Error> {
156 self.sql_union(other).or_else(|e| {
157 ::tracing::trace!("repr type error: sql_union({self:?}, {other:?}): {e}");
158
159 let repr_self = ReprColumnType::from(self);
160 let repr_other = ReprColumnType::from(other);
161 repr_self
162 .union(&repr_other)
163 .map(|typ| SqlColumnType::from_repr(&typ))
164 })
165 }
166
167 pub fn union(&self, other: &Self) -> Self {
168 self.try_union(other).unwrap_or_else(|e| {
169 panic!("repr type error: after sql_union({self:?}, {other:?}) error: {e}")
170 })
171 }
172
173 pub fn nullable(mut self, nullable: bool) -> Self {
176 self.nullable = nullable;
177 self
178 }
179}
180
181impl RustType<ProtoColumnType> for SqlColumnType {
182 fn into_proto(&self) -> ProtoColumnType {
183 ProtoColumnType {
184 nullable: self.nullable,
185 scalar_type: Some(self.scalar_type.into_proto()),
186 }
187 }
188
189 fn from_proto(proto: ProtoColumnType) -> Result<Self, TryFromProtoError> {
190 Ok(SqlColumnType {
191 nullable: proto.nullable,
192 scalar_type: proto
193 .scalar_type
194 .into_rust_if_some("ProtoColumnType::scalar_type")?,
195 })
196 }
197}
198
199impl fmt::Display for SqlColumnType {
200 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
201 let nullable = if self.nullable { "Null" } else { "NotNull" };
202 f.write_fmt(format_args!("{:?}:{}", self.scalar_type, nullable))
203 }
204}
205
/// The type of a relation: an ordered list of column types plus a list of keys.
#[derive(
    Arbitrary,
    Clone,
    Debug,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Serialize,
    Deserialize,
    Hash,
    MzReflect
)]
pub struct SqlRelationType {
    /// The type of each column, in column order.
    pub column_types: Vec<SqlColumnType>,
    /// Keys of the relation, each given as a list of column indices.
    ///
    /// `with_key` stores each key sorted and skips keys already present.
    /// Deserializes to an empty list when absent (`#[serde(default)]`).
    #[serde(default)]
    pub keys: Vec<Vec<usize>>,
}
235
236impl SqlRelationType {
237 pub fn empty() -> Self {
240 SqlRelationType::new(vec![])
241 }
242
243 pub fn new(column_types: Vec<SqlColumnType>) -> Self {
247 SqlRelationType {
248 column_types,
249 keys: Vec::new(),
250 }
251 }
252
253 pub fn with_key(mut self, mut indices: Vec<usize>) -> Self {
255 indices.sort_unstable();
256 if !self.keys.contains(&indices) {
257 self.keys.push(indices);
258 }
259 self
260 }
261
262 pub fn with_keys(mut self, keys: Vec<Vec<usize>>) -> Self {
263 for key in keys {
264 self = self.with_key(key)
265 }
266 self
267 }
268
269 pub fn arity(&self) -> usize {
271 self.column_types.len()
272 }
273
274 pub fn default_key(&self) -> Vec<usize> {
276 if let Some(key) = self.keys.first() {
277 if key.is_empty() {
278 (0..self.column_types.len()).collect()
279 } else {
280 key.clone()
281 }
282 } else {
283 (0..self.column_types.len()).collect()
284 }
285 }
286
287 pub fn columns(&self) -> &[SqlColumnType] {
289 &self.column_types
290 }
291
292 pub fn backport_nullability_and_keys(&mut self, backport_typ: &SqlRelationType) {
296 assert_eq!(
297 backport_typ.column_types.len(),
298 self.column_types.len(),
299 "HIR and MIR types should have the same number of columns"
300 );
301 for (backport_col, sql_col) in backport_typ
302 .column_types
303 .iter()
304 .zip_eq(self.column_types.iter_mut())
305 {
306 sql_col.backport_nullability(backport_col);
307 }
308
309 self.keys = backport_typ.keys.clone();
310 }
311}
312
313impl RustType<ProtoRelationType> for SqlRelationType {
314 fn into_proto(&self) -> ProtoRelationType {
315 ProtoRelationType {
316 column_types: self.column_types.into_proto(),
317 keys: self.keys.into_proto(),
318 }
319 }
320
321 fn from_proto(proto: ProtoRelationType) -> Result<Self, TryFromProtoError> {
322 Ok(SqlRelationType {
323 column_types: proto.column_types.into_rust()?,
324 keys: proto.keys.into_rust()?,
325 })
326 }
327}
328
329impl RustType<ProtoKey> for Vec<usize> {
330 fn into_proto(&self) -> ProtoKey {
331 ProtoKey {
332 keys: self.into_proto(),
333 }
334 }
335
336 fn from_proto(proto: ProtoKey) -> Result<Self, TryFromProtoError> {
337 proto.keys.into_rust()
338 }
339}
340
/// The repr-level counterpart of `SqlRelationType`: column types are
/// `ReprColumnType`s; keys have the same shape.
#[derive(
    Clone,
    Debug,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Serialize,
    Deserialize,
    Hash
)]
pub struct ReprRelationType {
    /// The type of each column, in column order.
    pub column_types: Vec<ReprColumnType>,
    /// Keys of the relation, each given as a list of column indices.
    ///
    /// Deserializes to an empty list when absent (`#[serde(default)]`).
    #[serde(default)]
    pub keys: Vec<Vec<usize>>,
}
368
369impl ReprRelationType {
370 pub fn empty() -> Self {
373 ReprRelationType::new(vec![])
374 }
375
376 pub fn new(column_types: Vec<ReprColumnType>) -> Self {
380 ReprRelationType {
381 column_types,
382 keys: Vec::new(),
383 }
384 }
385
386 pub fn with_key(mut self, mut indices: Vec<usize>) -> Self {
388 indices.sort_unstable();
389 if !self.keys.contains(&indices) {
390 self.keys.push(indices);
391 }
392 self
393 }
394
395 pub fn with_keys(mut self, keys: Vec<Vec<usize>>) -> Self {
396 for key in keys {
397 self = self.with_key(key)
398 }
399 self
400 }
401
402 pub fn arity(&self) -> usize {
404 self.column_types.len()
405 }
406
407 pub fn default_key(&self) -> Vec<usize> {
409 if let Some(key) = self.keys.first() {
410 if key.is_empty() {
411 (0..self.column_types.len()).collect()
412 } else {
413 key.clone()
414 }
415 } else {
416 (0..self.column_types.len()).collect()
417 }
418 }
419
420 pub fn columns(&self) -> &[ReprColumnType] {
422 &self.column_types
423 }
424}
425
426impl From<&SqlRelationType> for ReprRelationType {
427 fn from(sql_relation_type: &SqlRelationType) -> Self {
428 ReprRelationType {
429 column_types: sql_relation_type
430 .column_types
431 .iter()
432 .map(ReprColumnType::from)
433 .collect(),
434 keys: sql_relation_type.keys.clone(),
435 }
436 }
437}
438
/// The repr-level counterpart of `SqlColumnType`: a `ReprScalarType` plus a
/// nullability flag.
#[derive(
    Clone,
    Debug,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Serialize,
    Deserialize,
    Hash,
    MzReflect
)]
pub struct ReprColumnType {
    /// The repr scalar type of the values stored in this column.
    pub scalar_type: ReprScalarType,
    /// Whether the column may contain nulls.
    ///
    /// Defaults to `true` (via `return_true`) when missing from serialized
    /// input.
    #[serde(default = "return_true")]
    pub nullable: bool,
}
458
459impl ReprColumnType {
460 pub fn union(&self, col: &ReprColumnType) -> Result<Self, anyhow::Error> {
461 let scalar_type = self.scalar_type.union(&col.scalar_type)?;
462 let nullable = self.nullable || col.nullable;
463
464 Ok(ReprColumnType {
465 scalar_type,
466 nullable,
467 })
468 }
469}
470
471impl From<&SqlColumnType> for ReprColumnType {
472 fn from(sql_column_type: &SqlColumnType) -> Self {
473 let scalar_type = &sql_column_type.scalar_type;
474 let scalar_type = scalar_type.into();
475 let nullable = sql_column_type.nullable;
476
477 ReprColumnType {
478 scalar_type,
479 nullable,
480 }
481 }
482}
483
484impl SqlColumnType {
485 pub fn from_repr(repr: &ReprColumnType) -> Self {
489 let scalar_type = &repr.scalar_type;
490 let scalar_type = SqlScalarType::from_repr(scalar_type);
491 let nullable = repr.nullable;
492
493 SqlColumnType {
494 scalar_type,
495 nullable,
496 }
497 }
498}
499
/// The name of a column, stored as an owned, immutable-length `Box<str>`.
#[derive(
    Clone,
    Debug,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Serialize,
    Deserialize,
    Hash,
    MzReflect
)]
pub struct ColumnName(Box<str>);
514
515impl ColumnName {
516 #[inline(always)]
518 pub fn as_str(&self) -> &str {
519 &*self
520 }
521
522 pub fn as_mut_boxed_str(&mut self) -> &mut Box<str> {
524 &mut self.0
525 }
526
527 pub fn is_similar(&self, other: &ColumnName) -> bool {
529 const SIMILARITY_THRESHOLD: f64 = 0.6;
530
531 let a_lowercase = self.to_lowercase();
532 let b_lowercase = other.to_lowercase();
533
534 strsim::normalized_levenshtein(&a_lowercase, &b_lowercase) >= SIMILARITY_THRESHOLD
535 }
536}
537
538impl std::ops::Deref for ColumnName {
539 type Target = str;
540
541 #[inline(always)]
542 fn deref(&self) -> &Self::Target {
543 &self.0
544 }
545}
546
547impl fmt::Display for ColumnName {
548 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
549 f.write_str(&self.0)
550 }
551}
552
553impl From<String> for ColumnName {
554 fn from(s: String) -> ColumnName {
555 ColumnName(s.into())
556 }
557}
558
559impl From<&str> for ColumnName {
560 fn from(s: &str) -> ColumnName {
561 ColumnName(s.into())
562 }
563}
564
565impl From<&ColumnName> for ColumnName {
566 fn from(n: &ColumnName) -> ColumnName {
567 n.clone()
568 }
569}
570
571impl RustType<ProtoColumnName> for ColumnName {
572 fn into_proto(&self) -> ProtoColumnName {
573 ProtoColumnName {
574 value: Some(self.0.to_string()),
575 }
576 }
577
578 fn from_proto(proto: ProtoColumnName) -> Result<Self, TryFromProtoError> {
579 Ok(ColumnName(
580 proto
581 .value
582 .ok_or_else(|| TryFromProtoError::missing_field("ProtoColumnName::value"))?
583 .into(),
584 ))
585 }
586}
587
588impl From<ColumnName> for mz_sql_parser::ast::Ident {
589 fn from(value: ColumnName) -> Self {
590 mz_sql_parser::ast::Ident::new_unchecked(value.0)
592 }
593}
594
595impl proptest::arbitrary::Arbitrary for ColumnName {
596 type Parameters = ();
597 type Strategy = BoxedStrategy<ColumnName>;
598
599 fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy {
600 let mut weights = vec![(50, Just(1..8)), (20, Just(8..16))];
603 if std::env::var("PROPTEST_LARGE_DATA").is_ok() {
604 weights.extend([
605 (5, Just(16..128)),
606 (1, Just(128..1024)),
607 (1, Just(1024..4096)),
608 ]);
609 }
610 let name_length = Union::new_weighted(weights);
611
612 let char_strat = Rc::new(Union::new_weighted(vec![
615 (50, proptest::char::range('A', 'z').boxed()),
616 (1, any::<char>().boxed()),
617 ]));
618
619 name_length
620 .prop_flat_map(move |length| proptest::collection::vec(Rc::clone(&char_strat), length))
621 .prop_map(|chars| ColumnName(chars.into_iter().collect::<Box<str>>()))
622 .no_shrink()
623 .boxed()
624 }
625}
626
/// Placeholder column name `?column?`.
// NOTE(review): presumably used for columns without a known name — confirm at call sites.
pub const UNKNOWN_COLUMN_NAME: &str = "?column?";
629
/// Identifier of a column inside a `RelationDesc`; used as the key of its
/// metadata map.
#[derive(
    Clone,
    Copy,
    Debug,
    Eq,
    PartialEq,
    PartialOrd,
    Ord,
    Serialize,
    Deserialize,
    Hash,
    MzReflect
)]
pub struct ColumnIndex(usize);

// Compile-time guard: `ColumnIndex` must not implement `Arbitrary`.
static_assertions::assert_not_impl_all!(ColumnIndex: Arbitrary);
647
648impl ColumnIndex {
649 pub fn to_stable_name(&self) -> String {
651 self.0.to_string()
652 }
653
654 pub fn to_raw(&self) -> usize {
655 self.0
656 }
657
658 pub fn from_raw(val: usize) -> Self {
659 ColumnIndex(val)
660 }
661}
662
/// A version of a relation, wrapping a `u64` counter.
///
/// `root()` is version 0; `bump()` yields the next version.
#[derive(
    Clone,
    Copy,
    Debug,
    Eq,
    PartialEq,
    PartialOrd,
    Ord,
    Serialize,
    Deserialize,
    Hash,
    MzReflect,
    Arbitrary
)]
pub struct RelationVersion(u64);
679
680impl RelationVersion {
681 pub fn root() -> Self {
683 RelationVersion(0)
684 }
685
686 pub fn bump(&self) -> Self {
688 let next_version = self
689 .0
690 .checked_add(1)
691 .expect("added more than u64::MAX columns?");
692 RelationVersion(next_version)
693 }
694
695 pub fn into_raw(self) -> u64 {
699 self.0
700 }
701
702 pub fn from_raw(val: u64) -> RelationVersion {
706 RelationVersion(val)
707 }
708}
709
710impl From<RelationVersion> for SchemaId {
711 fn from(value: RelationVersion) -> Self {
712 SchemaId(usize::cast_from(value.0))
713 }
714}
715
716impl From<mz_sql_parser::ast::Version> for RelationVersion {
717 fn from(value: mz_sql_parser::ast::Version) -> Self {
718 RelationVersion(value.into_inner())
719 }
720}
721
722impl fmt::Display for RelationVersion {
723 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
724 write!(f, "v{}", self.0)
725 }
726}
727
728impl From<RelationVersion> for mz_sql_parser::ast::Version {
729 fn from(value: RelationVersion) -> Self {
730 mz_sql_parser::ast::Version::new(value.0)
731 }
732}
733
734impl RustType<ProtoRelationVersion> for RelationVersion {
735 fn into_proto(&self) -> ProtoRelationVersion {
736 ProtoRelationVersion { value: self.0 }
737 }
738
739 fn from_proto(proto: ProtoRelationVersion) -> Result<Self, TryFromProtoError> {
740 Ok(RelationVersion(proto.value))
741 }
742}
743
/// Per-column bookkeeping kept by a `RelationDesc`.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash, MzReflect)]
struct ColumnMetadata {
    /// Name of the column.
    name: ColumnName,
    /// Position of this column's type in the desc's `typ.column_types`.
    typ_idx: usize,
    /// Version at which the column was added.
    added: RelationVersion,
    /// Version at which the column was dropped, or `None` if it has not been
    /// dropped.
    dropped: Option<RelationVersion>,
}
756
/// A description of a relation: its `SqlRelationType` together with per-column
/// metadata (name, type position, added/dropped versions).
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash, MzReflect)]
pub struct RelationDesc {
    /// Column types and keys of the relation.
    typ: SqlRelationType,
    /// Metadata for each column, ordered by `ColumnIndex`.
    metadata: BTreeMap<ColumnIndex, ColumnMetadata>,
}
829
830impl RustType<ProtoRelationDesc> for RelationDesc {
831 fn into_proto(&self) -> ProtoRelationDesc {
832 let (names, metadata): (Vec<_>, Vec<_>) = self
833 .metadata
834 .values()
835 .map(|meta| {
836 let metadata = ProtoColumnMetadata {
837 added: Some(meta.added.into_proto()),
838 dropped: meta.dropped.map(|v| v.into_proto()),
839 };
840 (meta.name.into_proto(), metadata)
841 })
842 .unzip();
843
844 let is_all_default_metadata = metadata.iter().all(|meta| {
850 meta.added == Some(RelationVersion::root().into_proto()) && meta.dropped == None
851 });
852 let metadata = if is_all_default_metadata {
853 Vec::new()
854 } else {
855 metadata
856 };
857
858 ProtoRelationDesc {
859 typ: Some(self.typ.into_proto()),
860 names,
861 metadata,
862 }
863 }
864
865 fn from_proto(proto: ProtoRelationDesc) -> Result<Self, TryFromProtoError> {
866 let proto_metadata: Box<dyn Iterator<Item = _>> = if proto.metadata.is_empty() {
872 let val = ProtoColumnMetadata {
873 added: Some(RelationVersion::root().into_proto()),
874 dropped: None,
875 };
876 Box::new(itertools::repeat_n(val, proto.names.len()))
877 } else {
878 Box::new(proto.metadata.into_iter())
879 };
880
881 let metadata = proto
882 .names
883 .into_iter()
884 .zip_eq(proto_metadata)
885 .enumerate()
886 .map(|(idx, (name, metadata))| {
887 let meta = ColumnMetadata {
888 name: name.into_rust()?,
889 typ_idx: idx,
890 added: metadata.added.into_rust_if_some("ColumnMetadata::added")?,
891 dropped: metadata.dropped.into_rust()?,
892 };
893 Ok::<_, TryFromProtoError>((ColumnIndex(idx), meta))
894 })
895 .collect::<Result<_, _>>()?;
896
897 Ok(RelationDesc {
898 typ: proto.typ.into_rust_if_some("ProtoRelationDesc::typ")?,
899 metadata,
900 })
901 }
902}
903
904impl RelationDesc {
905 pub fn builder() -> RelationDescBuilder {
907 RelationDescBuilder::default()
908 }
909
910 pub fn empty() -> Self {
913 RelationDesc {
914 typ: SqlRelationType::empty(),
915 metadata: BTreeMap::default(),
916 }
917 }
918
919 pub fn is_empty(&self) -> bool {
921 self == &Self::empty()
922 }
923
924 pub fn len(&self) -> usize {
926 self.typ().column_types.len()
927 }
928
929 pub fn new<I, N>(typ: SqlRelationType, names: I) -> Self
937 where
938 I: IntoIterator<Item = N>,
939 N: Into<ColumnName>,
940 {
941 let metadata: BTreeMap<_, _> = names
942 .into_iter()
943 .enumerate()
944 .map(|(idx, name)| {
945 let col_idx = ColumnIndex(idx);
946 let metadata = ColumnMetadata {
947 name: name.into(),
948 typ_idx: idx,
949 added: RelationVersion::root(),
950 dropped: None,
951 };
952 (col_idx, metadata)
953 })
954 .collect();
955
956 assert_eq!(typ.column_types.len(), metadata.len());
958
959 RelationDesc { typ, metadata }
960 }
961
962 pub fn from_names_and_types<I, T, N>(iter: I) -> Self
963 where
964 I: IntoIterator<Item = (N, T)>,
965 T: Into<SqlColumnType>,
966 N: Into<ColumnName>,
967 {
968 let (names, types): (Vec<_>, Vec<_>) = iter.into_iter().unzip();
969 let types = types.into_iter().map(Into::into).collect();
970 let typ = SqlRelationType::new(types);
971 Self::new(typ, names)
972 }
973
974 pub fn concat(mut self, other: Self) -> Self {
984 let self_len = self.typ.column_types.len();
985
986 for (typ, (_col_idx, meta)) in other
987 .typ
988 .column_types
989 .into_iter()
990 .zip_eq(other.metadata.into_iter())
991 {
992 assert_eq!(meta.added, RelationVersion::root());
993 assert_none!(meta.dropped);
994
995 let new_idx = self.typ.columns().len();
996 let new_meta = ColumnMetadata {
997 name: meta.name,
998 typ_idx: new_idx,
999 added: RelationVersion::root(),
1000 dropped: None,
1001 };
1002
1003 self.typ.column_types.push(typ);
1004 let prev = self.metadata.insert(ColumnIndex(new_idx), new_meta);
1005
1006 assert_eq!(self.metadata.len(), self.typ.columns().len());
1007 assert_none!(prev);
1008 }
1009
1010 for k in other.typ.keys {
1011 let k = k.into_iter().map(|idx| idx + self_len).collect();
1012 self = self.with_key(k);
1013 }
1014 self
1015 }
1016
1017 pub fn with_key(mut self, indices: Vec<usize>) -> Self {
1019 self.typ = self.typ.with_key(indices);
1020 self
1021 }
1022
1023 pub fn without_keys(mut self) -> Self {
1025 self.typ.keys.clear();
1026 self
1027 }
1028
1029 pub fn with_names<I, N>(self, names: I) -> Self
1037 where
1038 I: IntoIterator<Item = N>,
1039 N: Into<ColumnName>,
1040 {
1041 Self::new(self.typ, names)
1042 }
1043
1044 pub fn arity(&self) -> usize {
1046 self.typ.arity()
1047 }
1048
1049 pub fn typ(&self) -> &SqlRelationType {
1051 &self.typ
1052 }
1053
1054 pub fn into_typ(self) -> SqlRelationType {
1056 self.typ
1057 }
1058
1059 pub fn iter(&self) -> impl Iterator<Item = (&ColumnName, &SqlColumnType)> {
1061 self.metadata.values().map(|meta| {
1062 let typ = &self.typ.columns()[meta.typ_idx];
1063 (&meta.name, typ)
1064 })
1065 }
1066
1067 pub fn iter_types(&self) -> impl Iterator<Item = &SqlColumnType> {
1069 self.typ.column_types.iter()
1070 }
1071
1072 pub fn iter_names(&self) -> impl Iterator<Item = &ColumnName> {
1074 self.metadata.values().map(|meta| &meta.name)
1075 }
1076
1077 pub fn iter_all(&self) -> impl Iterator<Item = (&ColumnIndex, &ColumnName, &SqlColumnType)> {
1079 self.metadata.iter().map(|(col_idx, metadata)| {
1080 let col_typ = &self.typ.columns()[metadata.typ_idx];
1081 (col_idx, &metadata.name, col_typ)
1082 })
1083 }
1084
1085 pub fn iter_similar_names<'a>(
1088 &'a self,
1089 name: &'a ColumnName,
1090 ) -> impl Iterator<Item = &'a ColumnName> {
1091 self.iter_names().filter(|n| n.is_similar(name))
1092 }
1093
1094 pub fn contains_index(&self, idx: &ColumnIndex) -> bool {
1096 self.metadata.contains_key(idx)
1097 }
1098
1099 pub fn get_by_name(&self, name: &ColumnName) -> Option<(usize, &SqlColumnType)> {
1105 self.iter_names()
1106 .position(|n| n == name)
1107 .map(|i| (i, &self.typ.column_types[i]))
1108 }
1109
1110 pub fn get_name(&self, i: usize) -> &ColumnName {
1118 self.get_name_idx(&ColumnIndex(i))
1120 }
1121
1122 pub fn get_name_idx(&self, idx: &ColumnIndex) -> &ColumnName {
1128 &self.metadata.get(idx).expect("should exist").name
1129 }
1130
1131 pub fn get_name_mut(&mut self, i: usize) -> &mut ColumnName {
1137 &mut self
1139 .metadata
1140 .get_mut(&ColumnIndex(i))
1141 .expect("should exist")
1142 .name
1143 }
1144
1145 pub fn get_type(&self, idx: &ColumnIndex) -> &SqlColumnType {
1151 let typ_idx = self.metadata.get(idx).expect("should exist").typ_idx;
1152 &self.typ.column_types[typ_idx]
1153 }
1154
1155 pub fn get_unambiguous_name(&self, i: usize) -> Option<&ColumnName> {
1164 let name = self.get_name(i);
1165 if self.iter_names().filter(|n| *n == name).count() == 1 {
1166 Some(name)
1167 } else {
1168 None
1169 }
1170 }
1171
1172 pub fn constraints_met(&self, i: usize, d: &Datum) -> Result<(), NotNullViolation> {
1177 let name = self.get_name(i);
1178 let typ = &self.typ.column_types[i];
1179 if d == &Datum::Null && !typ.nullable {
1180 Err(NotNullViolation(name.clone()))
1181 } else {
1182 Ok(())
1183 }
1184 }
1185
1186 pub fn diff(&self, other: &RelationDesc) -> RelationDescDiff {
1201 assert_eq!(self.metadata.len(), self.typ.columns().len());
1202 assert_eq!(other.metadata.len(), other.typ.columns().len());
1203 for (idx, meta) in self.metadata.iter().chain(other.metadata.iter()) {
1204 assert_eq!(meta.typ_idx, idx.0);
1205 assert_eq!(meta.added, RelationVersion::root());
1206 assert_none!(meta.dropped);
1207 }
1208
1209 let mut column_diffs = BTreeMap::new();
1210 let mut key_diff = None;
1211
1212 let left_arity = self.arity();
1213 let right_arity = other.arity();
1214 let common_arity = std::cmp::min(left_arity, right_arity);
1215
1216 for idx in 0..common_arity {
1217 let left_name = self.get_name(idx);
1218 let right_name = other.get_name(idx);
1219 let left_type = &self.typ.column_types[idx];
1220 let right_type = &other.typ.column_types[idx];
1221
1222 if left_name != right_name {
1223 let diff = ColumnDiff::NameMismatch {
1224 left: left_name.clone(),
1225 right: right_name.clone(),
1226 };
1227 column_diffs.insert(idx, diff);
1228 } else if left_type.scalar_type != right_type.scalar_type {
1229 let diff = ColumnDiff::TypeMismatch {
1230 name: left_name.clone(),
1231 left: left_type.scalar_type.clone(),
1232 right: right_type.scalar_type.clone(),
1233 };
1234 column_diffs.insert(idx, diff);
1235 } else if left_type.nullable != right_type.nullable {
1236 let diff = ColumnDiff::NullabilityMismatch {
1237 name: left_name.clone(),
1238 left: left_type.nullable,
1239 right: right_type.nullable,
1240 };
1241 column_diffs.insert(idx, diff);
1242 }
1243 }
1244
1245 for idx in common_arity..left_arity {
1246 let diff = ColumnDiff::Missing {
1247 name: self.get_name(idx).clone(),
1248 };
1249 column_diffs.insert(idx, diff);
1250 }
1251
1252 for idx in common_arity..right_arity {
1253 let diff = ColumnDiff::Extra {
1254 name: other.get_name(idx).clone(),
1255 };
1256 column_diffs.insert(idx, diff);
1257 }
1258
1259 let left_keys: BTreeSet<_> = self.typ.keys.iter().collect();
1260 let right_keys: BTreeSet<_> = other.typ.keys.iter().collect();
1261 if left_keys != right_keys {
1262 let column_names = |desc: &RelationDesc, keys: BTreeSet<&Vec<usize>>| {
1263 keys.iter()
1264 .map(|key| key.iter().map(|&idx| desc.get_name(idx).clone()).collect())
1265 .collect()
1266 };
1267 key_diff = Some(KeyDiff {
1268 left: column_names(self, left_keys),
1269 right: column_names(other, right_keys),
1270 });
1271 }
1272
1273 RelationDescDiff {
1274 column_diffs,
1275 key_diff,
1276 }
1277 }
1278
1279 pub fn apply_demand(&self, demands: &BTreeSet<usize>) -> RelationDesc {
1281 let mut new_desc = self.clone();
1282
1283 let mut removed = 0;
1285 new_desc.metadata.retain(|idx, metadata| {
1286 let retain = demands.contains(&idx.0);
1287 if !retain {
1288 removed += 1;
1289 } else {
1290 metadata.typ_idx -= removed;
1291 }
1292 retain
1293 });
1294
1295 let mut idx = 0;
1297 new_desc.typ.column_types.retain(|_| {
1298 let keep = demands.contains(&idx);
1299 idx += 1;
1300 keep
1301 });
1302
1303 new_desc
1304 }
1305}
1306
1307impl Arbitrary for RelationDesc {
1308 type Parameters = ();
1309 type Strategy = BoxedStrategy<RelationDesc>;
1310
1311 fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy {
1312 let mut weights = vec![(100, Just(0..4)), (50, Just(4..8)), (25, Just(8..16))];
1313 if std::env::var("PROPTEST_LARGE_DATA").is_ok() {
1314 weights.extend([
1315 (12, Just(16..32)),
1316 (6, Just(32..64)),
1317 (3, Just(64..128)),
1318 (1, Just(128..256)),
1319 ]);
1320 }
1321 let num_columns = Union::new_weighted(weights);
1322
1323 num_columns.prop_flat_map(arb_relation_desc).boxed()
1324 }
1325}
1326
1327pub fn arb_relation_desc(num_cols: std::ops::Range<usize>) -> impl Strategy<Value = RelationDesc> {
1330 proptest::collection::btree_map(any::<ColumnName>(), any::<SqlColumnType>(), num_cols)
1331 .prop_map(RelationDesc::from_names_and_types)
1332}
1333
1334pub fn arb_relation_desc_projection(desc: RelationDesc) -> impl Strategy<Value = RelationDesc> {
1336 let mask: Vec<_> = (0..desc.len()).map(|_| any::<bool>()).collect();
1337 mask.prop_map(move |mask| {
1338 let demands: BTreeSet<_> = mask
1339 .into_iter()
1340 .enumerate()
1341 .filter_map(|(idx, keep)| keep.then_some(idx))
1342 .collect();
1343 desc.apply_demand(&demands)
1344 })
1345}
1346
1347impl IntoIterator for RelationDesc {
1348 type Item = (ColumnName, SqlColumnType);
1349 type IntoIter = Box<dyn Iterator<Item = (ColumnName, SqlColumnType)>>;
1350
1351 fn into_iter(self) -> Self::IntoIter {
1352 let iter = self
1353 .metadata
1354 .into_values()
1355 .zip_eq(self.typ.column_types)
1356 .map(|(meta, typ)| (meta.name, typ));
1357 Box::new(iter)
1358 }
1359}
1360
1361pub fn arb_row_for_relation(desc: &RelationDesc) -> impl Strategy<Value = Row> + use<> {
1363 let datums: Vec<_> = desc
1364 .typ()
1365 .columns()
1366 .iter()
1367 .cloned()
1368 .map(arb_datum_for_column)
1369 .collect();
1370 datums.prop_map(|x| Row::pack(x.iter().map(Datum::from)))
1371}
1372
/// Error returned by `RelationDesc::constraints_met` when a null value is
/// supplied for a non-nullable column; carries the column's name.
#[derive(Debug, PartialEq, Eq)]
pub struct NotNullViolation(pub ColumnName);
1376
1377impl fmt::Display for NotNullViolation {
1378 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1379 write!(
1380 f,
1381 "null value in column {} violates not-null constraint",
1382 self.0.quoted()
1383 )
1384 }
1385}
1386
/// The result of `RelationDesc::diff`: per-column differences plus an optional
/// difference in keys.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RelationDescDiff {
    /// Differences found per column position; positions without a difference
    /// are absent.
    pub column_diffs: BTreeMap<usize, ColumnDiff>,
    /// Present only when the key sets of the two descriptions differ.
    pub key_diff: Option<KeyDiff>,
}
1395
1396impl RelationDescDiff {
1397 pub fn is_empty(&self) -> bool {
1399 self.column_diffs.is_empty() && self.key_diff.is_none()
1400 }
1401}
1402
/// A difference between two descriptions at one column position, as reported
/// by `RelationDesc::diff`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ColumnDiff {
    /// The column exists only in the left description.
    Missing { name: ColumnName },
    /// The column exists only in the right description.
    Extra { name: ColumnName },
    /// Names agree but the scalar types differ.
    TypeMismatch {
        /// Name of the column (identical on both sides).
        name: ColumnName,
        left: SqlScalarType,
        right: SqlScalarType,
    },
    /// Names and scalar types agree but nullability differs.
    NullabilityMismatch {
        /// Name of the column (identical on both sides).
        name: ColumnName,
        left: bool,
        right: bool,
    },
    /// The column names at this position differ.
    NameMismatch { left: ColumnName, right: ColumnName },
}
1425
/// The key sets of two descriptions whose keys differ, with each key expressed
/// as a list of column names.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct KeyDiff {
    /// Keys of the left description.
    pub left: BTreeSet<Vec<ColumnName>>,
    /// Keys of the right description.
    pub right: BTreeSet<Vec<ColumnName>>,
}
1434
/// Incrementally collects columns and keys and turns them into a
/// `RelationDesc` via `finish`.
#[derive(Clone, Default, Debug, PartialEq, Eq)]
pub struct RelationDescBuilder {
    /// Columns added so far, in order, as `(name, type)` pairs.
    columns: Vec<(ColumnName, SqlColumnType)>,
    /// Keys added so far; each key is a sorted list of column indices.
    keys: Vec<Vec<usize>>,
}
1443
1444impl RelationDescBuilder {
1445 pub fn with_column<N: Into<ColumnName>>(
1447 mut self,
1448 name: N,
1449 ty: SqlColumnType,
1450 ) -> RelationDescBuilder {
1451 let name = name.into();
1452 self.columns.push((name, ty));
1453 self
1454 }
1455
1456 pub fn with_columns<I, T, N>(mut self, iter: I) -> Self
1458 where
1459 I: IntoIterator<Item = (N, T)>,
1460 T: Into<SqlColumnType>,
1461 N: Into<ColumnName>,
1462 {
1463 self.columns
1464 .extend(iter.into_iter().map(|(name, ty)| (name.into(), ty.into())));
1465 self
1466 }
1467
1468 pub fn with_key(mut self, mut indices: Vec<usize>) -> RelationDescBuilder {
1470 indices.sort_unstable();
1471 if !self.keys.contains(&indices) {
1472 self.keys.push(indices);
1473 }
1474 self
1475 }
1476
1477 pub fn without_keys(mut self) -> RelationDescBuilder {
1479 self.keys.clear();
1480 assert_eq!(self.keys.len(), 0);
1481 self
1482 }
1483
1484 pub fn concat(mut self, other: Self) -> Self {
1486 let self_len = self.columns.len();
1487
1488 self.columns.extend(other.columns);
1489 for k in other.keys {
1490 let k = k.into_iter().map(|idx| idx + self_len).collect();
1491 self = self.with_key(k);
1492 }
1493
1494 self
1495 }
1496
1497 pub fn finish(self) -> RelationDesc {
1499 let mut desc = RelationDesc::from_names_and_types(self.columns);
1500 desc.typ = desc.typ.with_keys(self.keys);
1501 desc
1502 }
1503}
1504
/// Selects which version of a `VersionedRelationDesc` to look at.
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord, Serialize)]
pub enum RelationVersionSelector {
    /// Exactly the given version.
    Specific(RelationVersion),
    /// The newest version; `at_version` treats this as version `u64::MAX`.
    Latest,
}
1511
1512impl RelationVersionSelector {
1513 pub fn specific(version: u64) -> Self {
1514 RelationVersionSelector::Specific(RelationVersion(version))
1515 }
1516}
1517
/// A `RelationDesc` that tracks column additions and drops across versions.
///
/// The wrapped description holds every column ever added, including dropped
/// ones; `at_version` derives the description visible at a given version.
#[derive(Debug, Clone, Serialize)]
pub struct VersionedRelationDesc {
    /// Description containing all columns of all versions.
    inner: RelationDesc,
}
1527
1528impl VersionedRelationDesc {
1529 pub fn new(inner: RelationDesc) -> Self {
1530 VersionedRelationDesc { inner }
1531 }
1532
1533 #[must_use]
1541 pub fn add_column<N, T>(&mut self, name: N, typ: T) -> RelationVersion
1542 where
1543 N: Into<ColumnName>,
1544 T: Into<SqlColumnType>,
1545 {
1546 let latest_version = self.latest_version();
1547 let new_version = latest_version.bump();
1548
1549 let name = name.into();
1550 let existing = self
1551 .inner
1552 .metadata
1553 .iter()
1554 .find(|(_, meta)| meta.name == name && meta.dropped.is_none());
1555 if let Some(existing) = existing {
1556 panic!("column named '{name}' already exists! {existing:?}");
1557 }
1558
1559 let next_idx = self.inner.metadata.len();
1560 let col_meta = ColumnMetadata {
1561 name,
1562 typ_idx: next_idx,
1563 added: new_version,
1564 dropped: None,
1565 };
1566
1567 self.inner.typ.column_types.push(typ.into());
1568 let prev = self.inner.metadata.insert(ColumnIndex(next_idx), col_meta);
1569
1570 assert_none!(prev, "column index overlap!");
1571 self.validate();
1572
1573 new_version
1574 }
1575
1576 #[must_use]
1585 pub fn drop_column<N>(&mut self, name: N) -> RelationVersion
1586 where
1587 N: Into<ColumnName>,
1588 {
1589 let name = name.into();
1590 let latest_version = self.latest_version();
1591 let new_version = latest_version.bump();
1592
1593 let col = self
1594 .inner
1595 .metadata
1596 .values_mut()
1597 .find(|meta| meta.name == name && meta.dropped.is_none())
1598 .expect("column to exist");
1599
1600 assert_none!(col.dropped, "column was already dropped");
1602 col.dropped = Some(new_version);
1603
1604 let dropped_key = self
1606 .inner
1607 .typ
1608 .keys
1609 .iter()
1610 .any(|keys| keys.contains(&col.typ_idx));
1611 assert!(!dropped_key, "column being dropped was used as a key");
1612
1613 self.validate();
1614 new_version
1615 }
1616
1617 pub fn latest(&self) -> RelationDesc {
1619 self.inner.clone()
1620 }
1621
1622 pub fn at_version(&self, version: RelationVersionSelector) -> RelationDesc {
1624 let up_to_version = match version {
1626 RelationVersionSelector::Latest => RelationVersion(u64::MAX),
1627 RelationVersionSelector::Specific(v) => v,
1628 };
1629
1630 let valid_columns = self.inner.metadata.iter().filter(|(_col_idx, meta)| {
1631 let added = meta.added <= up_to_version;
1632 let dropped = meta
1633 .dropped
1634 .map(|dropped_at| up_to_version >= dropped_at)
1635 .unwrap_or(false);
1636
1637 added && !dropped
1638 });
1639
1640 let mut column_types = Vec::new();
1641 let mut column_metas = BTreeMap::new();
1642
1643 for (col_idx, meta) in valid_columns {
1650 let new_meta = ColumnMetadata {
1651 name: meta.name.clone(),
1652 typ_idx: column_types.len(),
1653 added: meta.added.clone(),
1654 dropped: meta.dropped.clone(),
1655 };
1656 column_types.push(self.inner.typ.columns()[meta.typ_idx].clone());
1657 column_metas.insert(*col_idx, new_meta);
1658 }
1659
1660 let keys = self
1666 .inner
1667 .typ
1668 .keys
1669 .iter()
1670 .map(|keys| {
1671 keys.iter()
1672 .map(|key_idx| {
1673 let metadata = column_metas
1674 .get(&ColumnIndex(*key_idx))
1675 .expect("found key for column that doesn't exist");
1676 metadata.typ_idx
1677 })
1678 .collect()
1679 })
1680 .collect();
1681
1682 let relation_type = SqlRelationType { column_types, keys };
1683
1684 RelationDesc {
1685 typ: relation_type,
1686 metadata: column_metas,
1687 }
1688 }
1689
1690 pub fn latest_version(&self) -> RelationVersion {
1691 self.inner
1692 .metadata
1693 .values()
1694 .map(|meta| meta.dropped.unwrap_or(meta.added))
1696 .max()
1697 .unwrap_or_else(RelationVersion::root)
1699 }
1700
1701 fn validate(&self) {
1707 fn validate_inner(desc: &RelationDesc) -> Result<(), anyhow::Error> {
1708 if desc.typ.column_types.len() != desc.metadata.len() {
1709 anyhow::bail!("mismatch between number of types and metadatas");
1710 }
1711
1712 for (col_idx, meta) in &desc.metadata {
1713 if col_idx.0 > desc.metadata.len() {
1714 anyhow::bail!("column index out of bounds");
1715 }
1716 if meta.added >= meta.dropped.unwrap_or(RelationVersion(u64::MAX)) {
1717 anyhow::bail!("column was added after it was dropped?");
1718 }
1719 if desc.typ().columns().get(meta.typ_idx).is_none() {
1720 anyhow::bail!("typ_idx incorrect");
1721 }
1722 }
1723
1724 for keys in &desc.typ.keys {
1725 for key in keys {
1726 if *key >= desc.typ.column_types.len() {
1727 anyhow::bail!("key index was out of bounds!");
1728 }
1729 }
1730 }
1731
1732 let versions = desc
1733 .metadata
1734 .values()
1735 .map(|meta| meta.dropped.unwrap_or(meta.added));
1736 let mut max = 0;
1737 let mut sum = 0;
1738 for version in versions {
1739 max = std::cmp::max(max, version.0);
1740 sum += version.0;
1741 }
1742
1743 if sum != (max * (max + 1) / 2) {
1753 anyhow::bail!("there is a duplicate or missing relation version");
1754 }
1755
1756 Ok(())
1757 }
1758
1759 assert_ok!(validate_inner(&self.inner), "validate failed! {self:?}");
1760 }
1761}
1762
/// A single schema edit that can be applied to a [`RelationDesc`] with
/// [`PropRelationDescDiff::apply`].
///
/// Edits that name a column are silently skipped by `apply` when no column
/// with that name can be found.
#[derive(Debug)]
pub enum PropRelationDescDiff {
    /// Append a new column with the given name and type.
    AddColumn {
        name: ColumnName,
        typ: SqlColumnType,
    },
    /// Mark the column with the given name as dropped.
    DropColumn {
        name: ColumnName,
    },
    /// Flip the `nullable` flag of the named column's type.
    ToggleNullability {
        name: ColumnName,
    },
    /// Replace the named column's type with `typ`.
    ChangeType {
        name: ColumnName,
        typ: SqlColumnType,
    },
}
1782
1783impl PropRelationDescDiff {
1784 pub fn apply(self, desc: &mut RelationDesc) {
1785 match self {
1786 PropRelationDescDiff::AddColumn { name, typ } => {
1787 let new_idx = desc.metadata.len();
1788 let meta = ColumnMetadata {
1789 name,
1790 typ_idx: new_idx,
1791 added: RelationVersion(0),
1792 dropped: None,
1793 };
1794 let prev = desc.metadata.insert(ColumnIndex(new_idx), meta);
1795 desc.typ.column_types.push(typ);
1796
1797 assert_none!(prev);
1798 assert_eq!(desc.metadata.len(), desc.typ.column_types.len());
1799 }
1800 PropRelationDescDiff::DropColumn { name } => {
1801 let next_version = desc
1802 .metadata
1803 .values()
1804 .map(|meta| meta.dropped.unwrap_or(meta.added))
1805 .max()
1806 .unwrap_or_else(RelationVersion::root)
1807 .bump();
1808 let Some(metadata) = desc.metadata.values_mut().find(|meta| meta.name == name)
1809 else {
1810 return;
1811 };
1812 if metadata.dropped.is_none() {
1813 metadata.dropped = Some(next_version);
1814 }
1815 }
1816 PropRelationDescDiff::ToggleNullability { name } => {
1817 let Some((pos, _)) = desc.get_by_name(&name) else {
1818 return;
1819 };
1820 let col_type = desc
1821 .typ
1822 .column_types
1823 .get_mut(pos)
1824 .expect("ColumnNames and SqlColumnTypes out of sync!");
1825 col_type.nullable = !col_type.nullable;
1826 }
1827 PropRelationDescDiff::ChangeType { name, typ } => {
1828 let Some((pos, _)) = desc.get_by_name(&name) else {
1829 return;
1830 };
1831 let col_type = desc
1832 .typ
1833 .column_types
1834 .get_mut(pos)
1835 .expect("ColumnNames and SqlColumnTypes out of sync!");
1836 *col_type = typ;
1837 }
1838 }
1839 }
1840}
1841
1842pub fn arb_relation_desc_diff(
1844 source: &RelationDesc,
1845) -> impl Strategy<Value = Vec<PropRelationDescDiff>> + use<> {
1846 let source = Rc::new(source.clone());
1847 let num_source_columns = source.typ.columns().len();
1848
1849 let num_add_columns = Union::new_weighted(vec![(100, Just(0..8)), (1, Just(8..64))]);
1850 let add_columns_strat = num_add_columns
1851 .prop_flat_map(|num_columns| {
1852 proptest::collection::vec((any::<ColumnName>(), any::<SqlColumnType>()), num_columns)
1853 })
1854 .prop_map(|cols| {
1855 cols.into_iter()
1856 .map(|(name, typ)| PropRelationDescDiff::AddColumn { name, typ })
1857 .collect::<Vec<_>>()
1858 });
1859
1860 if num_source_columns == 0 {
1862 return add_columns_strat.boxed();
1863 }
1864
1865 let source_ = Rc::clone(&source);
1866 let drop_columns_strat = (0..num_source_columns).prop_perturb(move |num_columns, mut rng| {
1867 let mut set = BTreeSet::default();
1868 for _ in 0..num_columns {
1869 let col_idx = rng.random_range(0..num_source_columns);
1870 set.insert(source_.get_name(col_idx).clone());
1871 }
1872 set.into_iter()
1873 .map(|name| PropRelationDescDiff::DropColumn { name })
1874 .collect::<Vec<_>>()
1875 });
1876
1877 let source_ = Rc::clone(&source);
1878 let toggle_nullability_strat =
1879 (0..num_source_columns).prop_perturb(move |num_columns, mut rng| {
1880 let mut set = BTreeSet::default();
1881 for _ in 0..num_columns {
1882 let col_idx = rng.random_range(0..num_source_columns);
1883 set.insert(source_.get_name(col_idx).clone());
1884 }
1885 set.into_iter()
1886 .map(|name| PropRelationDescDiff::ToggleNullability { name })
1887 .collect::<Vec<_>>()
1888 });
1889
1890 let source_ = Rc::clone(&source);
1891 let change_type_strat = (0..num_source_columns)
1892 .prop_perturb(move |num_columns, mut rng| {
1893 let mut set = BTreeSet::default();
1894 for _ in 0..num_columns {
1895 let col_idx = rng.random_range(0..num_source_columns);
1896 set.insert(source_.get_name(col_idx).clone());
1897 }
1898 set
1899 })
1900 .prop_flat_map(|cols| {
1901 proptest::collection::vec(any::<SqlColumnType>(), cols.len())
1902 .prop_map(move |types| (cols.clone(), types))
1903 })
1904 .prop_map(|(cols, types)| {
1905 cols.into_iter()
1906 .zip_eq(types)
1907 .map(|(name, typ)| PropRelationDescDiff::ChangeType { name, typ })
1908 .collect::<Vec<_>>()
1909 });
1910
1911 (
1912 add_columns_strat,
1913 drop_columns_strat,
1914 toggle_nullability_strat,
1915 change_type_strat,
1916 )
1917 .prop_map(|(adds, drops, toggles, changes)| {
1918 adds.into_iter()
1919 .chain(drops)
1920 .chain(toggles)
1921 .chain(changes)
1922 .collect::<Vec<_>>()
1923 })
1924 .prop_shuffle()
1925 .boxed()
1926}
1927
1928#[cfg(test)]
1929mod tests {
1930 use super::*;
1931 use prost::Message;
1932
    /// Walks a two-column desc through one `add_column` and one `drop_column`
    /// and checks what `at_version` returns at every step.
    #[mz_ore::test]
    // Not run under Miri.
    #[cfg_attr(miri, ignore)]
    fn smoktest_at_version() {
        let desc = RelationDesc::builder()
            .with_column("a", SqlScalarType::Bool.nullable(true))
            .with_column("z", SqlScalarType::String.nullable(false))
            .finish();

        let mut versioned_desc = VersionedRelationDesc {
            inner: desc.clone(),
        };
        versioned_desc.validate();

        // Before any change, every selector yields the original desc,
        // including a version (3) that has not been created.
        let latest = versioned_desc.at_version(RelationVersionSelector::Latest);
        assert_eq!(desc, latest);

        let v0 = versioned_desc.at_version(RelationVersionSelector::specific(0));
        assert_eq!(desc, v0);

        let v3 = versioned_desc.at_version(RelationVersionSelector::specific(3));
        assert_eq!(desc, v3);

        // Adding a column creates version 1.
        let v1 = versioned_desc.add_column("b", SqlScalarType::Bytes.nullable(false));
        assert_eq!(v1, RelationVersion(1));

        let v1 = versioned_desc.at_version(RelationVersionSelector::Specific(v1));
        insta::assert_json_snapshot!(v1.metadata, @r###"
        {
          "0": {
            "name": "a",
            "typ_idx": 0,
            "added": 0,
            "dropped": null
          },
          "1": {
            "name": "z",
            "typ_idx": 1,
            "added": 0,
            "dropped": null
          },
          "2": {
            "name": "b",
            "typ_idx": 2,
            "added": 1,
            "dropped": null
          }
        }
        "###);

        // Version 0 must be unaffected by the addition.
        let v0_b = versioned_desc.at_version(RelationVersionSelector::specific(0));
        assert!(v0.iter().eq(v0_b.iter()));

        // Dropping a column creates version 2.
        let v2 = versioned_desc.drop_column("z");
        assert_eq!(v2, RelationVersion(2));

        // At version 2, "z" is gone and "b" keeps column index 2 but moves to
        // type index 1.
        let v2 = versioned_desc.at_version(RelationVersionSelector::Specific(v2));
        insta::assert_json_snapshot!(v2.metadata, @r###"
        {
          "0": {
            "name": "a",
            "typ_idx": 0,
            "added": 0,
            "dropped": null
          },
          "2": {
            "name": "b",
            "typ_idx": 1,
            "added": 1,
            "dropped": null
          }
        }
        "###);

        // Earlier versions still iterate the same as before the drop.
        let v0_c = versioned_desc.at_version(RelationVersionSelector::specific(0));
        assert!(v0.iter().eq(v0_c.iter()));

        let v1_b = versioned_desc.at_version(RelationVersionSelector::specific(1));
        assert!(v1.iter().eq(v1_b.iter()));

        // The inner desc retains all three columns, with "z" marked as dropped.
        insta::assert_json_snapshot!(versioned_desc.inner.metadata, @r###"
        {
          "0": {
            "name": "a",
            "typ_idx": 0,
            "added": 0,
            "dropped": null
          },
          "1": {
            "name": "z",
            "typ_idx": 1,
            "added": 0,
            "dropped": 2
          },
          "2": {
            "name": "b",
            "typ_idx": 2,
            "added": 1,
            "dropped": null
          }
        }
        "###);
    }
2037
    /// Drops a column that sits in front of a key column and checks that the
    /// key is remapped in the new version but unchanged in the old one.
    #[mz_ore::test]
    // Not run under Miri.
    #[cfg_attr(miri, ignore)]
    fn test_dropping_columns_with_keys() {
        let desc = RelationDesc::builder()
            .with_column("a", SqlScalarType::Bool.nullable(true))
            .with_column("z", SqlScalarType::String.nullable(false))
            .with_key(vec![1])
            .finish();

        let mut versioned_desc = VersionedRelationDesc {
            inner: desc.clone(),
        };
        versioned_desc.validate();

        // "a" is not part of the key, so it can be dropped.
        let v1 = versioned_desc.drop_column("a");
        assert_eq!(v1, RelationVersion(1));

        // With "a" gone, the key on "z" is remapped from type index 1 to 0.
        let v1 = versioned_desc.at_version(RelationVersionSelector::Specific(v1));
        insta::assert_json_snapshot!(v1, @r###"
        {
          "typ": {
            "column_types": [
              {
                "scalar_type": "String",
                "nullable": false
              }
            ],
            "keys": [
              [
                0
              ]
            ]
          },
          "metadata": {
            "1": {
              "name": "z",
              "typ_idx": 0,
              "added": 0,
              "dropped": null
            }
          }
        }
        "###);

        // Version 0 still has both columns and the original key index; the
        // metadata for "a" carries the version at which it was later dropped.
        let v0 = versioned_desc.at_version(RelationVersionSelector::specific(0));
        insta::assert_json_snapshot!(v0, @r###"
        {
          "typ": {
            "column_types": [
              {
                "scalar_type": "Bool",
                "nullable": true
              },
              {
                "scalar_type": "String",
                "nullable": false
              }
            ],
            "keys": [
              [
                1
              ]
            ]
          },
          "metadata": {
            "0": {
              "name": "a",
              "typ_idx": 0,
              "added": 0,
              "dropped": 1
            },
            "1": {
              "name": "z",
              "typ_idx": 1,
              "added": 0,
              "dropped": null
            }
          }
        }
        "###);
    }
2121
    /// Decodes a `ProtoRelationDesc` that only carries column names (its
    /// `metadata` list is empty) and checks the resulting [`RelationDesc`].
    #[mz_ore::test]
    // Not run under Miri.
    #[cfg_attr(miri, ignore)]
    fn roundtrip_relation_desc_without_metadata() {
        let typ = ProtoRelationType {
            column_types: vec![
                SqlScalarType::String.nullable(false).into_proto(),
                SqlScalarType::Bool.nullable(true).into_proto(),
            ],
            keys: vec![],
        };
        let proto = ProtoRelationDesc {
            typ: Some(typ),
            names: vec![
                ColumnName("a".into()).into_proto(),
                ColumnName("b".into()).into_proto(),
            ],
            // Deliberately empty: only `names` describes the columns.
            metadata: vec![],
        };
        let desc: RelationDesc = proto.into_rust().unwrap();

        // Each name gets metadata in order: added at version 0, never dropped.
        insta::assert_json_snapshot!(desc, @r###"
        {
          "typ": {
            "column_types": [
              {
                "scalar_type": "String",
                "nullable": false
              },
              {
                "scalar_type": "Bool",
                "nullable": true
              }
            ],
            "keys": []
          },
          "metadata": {
            "0": {
              "name": "a",
              "typ_idx": 0,
              "added": 0,
              "dropped": null
            },
            "1": {
              "name": "b",
              "typ_idx": 1,
              "added": 0,
              "dropped": null
            }
          }
        }
        "###);
    }
2174
2175 #[mz_ore::test]
2176 #[should_panic(expected = "column named 'a' already exists!")]
2177 fn test_add_column_with_same_name_panics() {
2178 let desc = RelationDesc::builder()
2179 .with_column("a", SqlScalarType::Bool.nullable(true))
2180 .finish();
2181 let mut versioned = VersionedRelationDesc::new(desc);
2182
2183 let _ = versioned.add_column("a", SqlScalarType::String.nullable(false));
2184 }
2185
    /// A column name may be reused once the previous column with that name
    /// has been dropped.
    #[mz_ore::test]
    // Not run under Miri.
    #[cfg_attr(miri, ignore)]
    fn test_add_column_with_same_name_prev_dropped() {
        let desc = RelationDesc::builder()
            .with_column("a", SqlScalarType::Bool.nullable(true))
            .finish();
        let mut versioned = VersionedRelationDesc::new(desc);

        // Dropping the only column leaves an empty desc at version 1.
        let v1 = versioned.drop_column("a");
        let v1 = versioned.at_version(RelationVersionSelector::Specific(v1));
        insta::assert_json_snapshot!(v1, @r###"
        {
          "typ": {
            "column_types": [],
            "keys": []
          },
          "metadata": {}
        }
        "###);

        // Re-adding "a" creates a distinct column: a new column index (1), a
        // new type, and an `added` version of 2.
        let v2 = versioned.add_column("a", SqlScalarType::String.nullable(false));
        let v2 = versioned.at_version(RelationVersionSelector::Specific(v2));
        insta::assert_json_snapshot!(v2, @r###"
        {
          "typ": {
            "column_types": [
              {
                "scalar_type": "String",
                "nullable": false
              }
            ],
            "keys": []
          },
          "metadata": {
            "1": {
              "name": "a",
              "typ_idx": 0,
              "added": 2,
              "dropped": null
            }
          }
        }
        "###);
    }
2230
2231 #[mz_ore::test]
2232 #[cfg_attr(miri, ignore)]
2233 fn apply_demand() {
2234 let desc = RelationDesc::builder()
2235 .with_column("a", SqlScalarType::String.nullable(true))
2236 .with_column("b", SqlScalarType::Int64.nullable(false))
2237 .with_column("c", SqlScalarType::Time.nullable(false))
2238 .finish();
2239 let desc = desc.apply_demand(&BTreeSet::from([0, 2]));
2240 assert_eq!(desc.arity(), 2);
2241 VersionedRelationDesc::new(desc).validate();
2243 }
2244
2245 #[mz_ore::test]
2246 #[cfg_attr(miri, ignore)]
2247 fn smoketest_column_index_stable_ident() {
2248 let idx_a = ColumnIndex(42);
2249 assert_eq!(idx_a.to_stable_name(), "42");
2251 }
2252
    /// Property test: any [`RelationDesc`], with or without random diffs
    /// applied, survives an encode/decode roundtrip through its protobuf form.
    #[mz_ore::test]
    // Not run under Miri.
    #[cfg_attr(miri, ignore)]
    fn proptest_relation_desc_roundtrips() {
        // Encodes `og` to protobuf bytes, decodes it again, and asserts that
        // the result equals the original.
        fn testcase(og: RelationDesc) {
            let bytes = og.into_proto().encode_to_vec();
            let proto = ProtoRelationDesc::decode(&bytes[..]).unwrap();
            let rnd = RelationDesc::from_proto(proto).unwrap();

            assert_eq!(og, rnd);
        }

        // Arbitrary descs, unmodified.
        proptest!(|(desc in any::<RelationDesc>())| {
            testcase(desc);
        });

        // Arbitrary descs paired with diffs generated for that specific desc.
        let strat = any::<RelationDesc>().prop_flat_map(|desc| {
            arb_relation_desc_diff(&desc).prop_map(move |diffs| (desc.clone(), diffs))
        });

        proptest!(|((mut desc, diffs) in strat)| {
            for diff in diffs {
                diff.apply(&mut desc);
            };
            testcase(desc);
        });
    }
2279}