1use std::collections::{BTreeMap, BTreeSet};
11use std::rc::Rc;
12use std::{fmt, vec};
13
14use anyhow::bail;
15use itertools::Itertools;
16use mz_lowertest::MzReflect;
17use mz_ore::cast::CastFrom;
18use mz_ore::soft_panic_or_log;
19use mz_ore::str::StrExt;
20use mz_ore::{assert_none, assert_ok};
21use mz_persist_types::schema::SchemaId;
22use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
23use proptest::prelude::*;
24use proptest::strategy::{Strategy, Union};
25use proptest_derive::Arbitrary;
26use serde::{Deserialize, Serialize};
27
28use crate::relation_and_scalar::proto_relation_type::ProtoKey;
29pub use crate::relation_and_scalar::{
30 ProtoColumnMetadata, ProtoColumnName, ProtoColumnType, ProtoRelationDesc, ProtoRelationType,
31 ProtoRelationVersion,
32};
33use crate::{Datum, ReprScalarType, Row, SqlScalarType, arb_datum_for_column};
34
35#[derive(
43 Arbitrary,
44 Clone,
45 Debug,
46 Eq,
47 PartialEq,
48 Ord,
49 PartialOrd,
50 Serialize,
51 Deserialize,
52 Hash,
53 MzReflect
54)]
55pub struct SqlColumnType {
56 pub scalar_type: SqlScalarType,
58 #[serde(default = "return_true")]
60 pub nullable: bool,
61}
62
/// Always yields `true`.
///
/// Referenced by name from `#[serde(default = "return_true")]` on the `nullable`
/// fields in this file, so a missing `nullable` deserializes as `true`.
#[inline(always)]
fn return_true() -> bool {
    true
}
72
73impl SqlColumnType {
74 pub fn try_union_many<'a>(
78 typs: impl IntoIterator<Item = &'a Self>,
79 ) -> Result<Self, anyhow::Error> {
80 let mut iter = typs.into_iter();
81 let Some(typ) = iter.next() else {
82 bail!("Cannot union empty iterator");
83 };
84 iter.try_fold(typ.clone(), |a, b| a.try_union(b))
85 }
86
87 pub fn union_many<'a>(typs: impl IntoIterator<Item = &'a Self>) -> Self {
92 Self::try_union_many(typs).expect("Cannot union empty iterator")
93 }
94
95 pub fn backport_nullability(&mut self, backport_typ: &ReprColumnType) {
99 self.scalar_type
100 .backport_nullability(&backport_typ.scalar_type);
101 self.nullable = backport_typ.nullable;
102 }
103
104 pub fn sql_union(&self, other: &Self) -> Result<Self, anyhow::Error> {
117 match (&self.scalar_type, &other.scalar_type) {
118 (scalar_type, other_scalar_type) if scalar_type == other_scalar_type => {
119 Ok(SqlColumnType {
120 scalar_type: scalar_type.clone(),
121 nullable: self.nullable || other.nullable,
122 })
123 }
124 (scalar_type, other_scalar_type) if scalar_type.base_eq(other_scalar_type) => {
125 Ok(SqlColumnType {
126 scalar_type: scalar_type.without_modifiers(),
127 nullable: self.nullable || other.nullable,
128 })
129 }
130 (
131 SqlScalarType::Record { fields, custom_id },
132 SqlScalarType::Record {
133 fields: other_fields,
134 custom_id: other_custom_id,
135 },
136 ) => {
137 if custom_id != other_custom_id {
138 bail!(
139 "Can't union types: {:?} and {:?}",
140 self.scalar_type,
141 other.scalar_type
142 );
143 };
144
145 if fields.len() != other_fields.len() {
146 bail!(
147 "Can't union types: {:?} and {:?}",
148 self.scalar_type,
149 other.scalar_type
150 );
151 }
152 let mut union_fields = Vec::with_capacity(fields.len());
153 for ((name, typ), (other_name, other_typ)) in
154 fields.iter().zip_eq(other_fields.iter())
155 {
156 if name != other_name {
157 bail!(
158 "Can't union types: {:?} and {:?}",
159 self.scalar_type,
160 other.scalar_type
161 );
162 } else {
163 let union_column_type = typ.sql_union(other_typ)?;
164 union_fields.push((name.clone(), union_column_type));
165 };
166 }
167
168 Ok(SqlColumnType {
169 scalar_type: SqlScalarType::Record {
170 fields: union_fields.into(),
171 custom_id: *custom_id,
172 },
173 nullable: self.nullable || other.nullable,
174 })
175 }
176 _ => bail!(
177 "Can't union types: {:?} and {:?}",
178 self.scalar_type,
179 other.scalar_type
180 ),
181 }
182 }
183
184 pub fn try_union(&self, other: &Self) -> Result<Self, anyhow::Error> {
194 self.sql_union(other).or_else(|e| {
195 let repr_self = ReprColumnType::from(self);
196 let repr_other = ReprColumnType::from(other);
197 match repr_self.union(&repr_other) {
198 Ok(typ) => {
199 soft_panic_or_log!("repr type error: sql_union({self:?}, {other:?}): {e}");
202 Ok(SqlColumnType::from_repr(&typ))
203 }
204 Err(_) => {
205 Err(e)
208 }
209 }
210 })
211 }
212
213 pub fn union(&self, other: &Self) -> Self {
218 self.try_union(other).unwrap_or_else(|e| {
219 panic!("repr type error: after sql_union({self:?}, {other:?}) error: {e}")
220 })
221 }
222
223 pub fn nullable(mut self, nullable: bool) -> Self {
226 self.nullable = nullable;
227 self
228 }
229}
230
231impl RustType<ProtoColumnType> for SqlColumnType {
232 fn into_proto(&self) -> ProtoColumnType {
233 ProtoColumnType {
234 nullable: self.nullable,
235 scalar_type: Some(self.scalar_type.into_proto()),
236 }
237 }
238
239 fn from_proto(proto: ProtoColumnType) -> Result<Self, TryFromProtoError> {
240 Ok(SqlColumnType {
241 nullable: proto.nullable,
242 scalar_type: proto
243 .scalar_type
244 .into_rust_if_some("ProtoColumnType::scalar_type")?,
245 })
246 }
247}
248
249impl fmt::Display for SqlColumnType {
250 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
251 let nullable = if self.nullable { "Null" } else { "NotNull" };
252 f.write_fmt(format_args!("{:?}:{}", self.scalar_type, nullable))
253 }
254}
255
256#[derive(
258 Arbitrary,
259 Clone,
260 Debug,
261 Eq,
262 PartialEq,
263 Ord,
264 PartialOrd,
265 Serialize,
266 Deserialize,
267 Hash,
268 MzReflect
269)]
270pub struct SqlRelationType {
271 pub column_types: Vec<SqlColumnType>,
273 #[serde(default)]
283 pub keys: Vec<Vec<usize>>,
284}
285
286impl SqlRelationType {
287 pub fn empty() -> Self {
290 SqlRelationType::new(vec![])
291 }
292
293 pub fn new(column_types: Vec<SqlColumnType>) -> Self {
297 SqlRelationType {
298 column_types,
299 keys: Vec::new(),
300 }
301 }
302
303 pub fn with_key(mut self, mut indices: Vec<usize>) -> Self {
305 indices.sort_unstable();
306 if !self.keys.contains(&indices) {
307 self.keys.push(indices);
308 }
309 self
310 }
311
312 pub fn with_keys(mut self, keys: Vec<Vec<usize>>) -> Self {
313 for key in keys {
314 self = self.with_key(key)
315 }
316 self
317 }
318
319 pub fn arity(&self) -> usize {
321 self.column_types.len()
322 }
323
324 pub fn default_key(&self) -> Vec<usize> {
326 if let Some(key) = self.keys.first() {
327 if key.is_empty() {
328 (0..self.column_types.len()).collect()
329 } else {
330 key.clone()
331 }
332 } else {
333 (0..self.column_types.len()).collect()
334 }
335 }
336
337 pub fn columns(&self) -> &[SqlColumnType] {
339 &self.column_types
340 }
341
342 pub fn backport_nullability_and_keys(&mut self, backport_typ: &ReprRelationType) {
346 assert_eq!(
347 backport_typ.column_types.len(),
348 self.column_types.len(),
349 "HIR and MIR types should have the same number of columns"
350 );
351 for (backport_col, sql_col) in backport_typ
352 .column_types
353 .iter()
354 .zip_eq(self.column_types.iter_mut())
355 {
356 sql_col.backport_nullability(backport_col);
357 }
358
359 self.keys = backport_typ.keys.clone();
360 }
361
362 pub fn from_repr(repr: &ReprRelationType) -> Self {
366 SqlRelationType {
367 column_types: repr
368 .column_types
369 .iter()
370 .map(SqlColumnType::from_repr)
371 .collect(),
372 keys: repr.keys.clone(),
373 }
374 }
375}
376
377impl RustType<ProtoRelationType> for SqlRelationType {
378 fn into_proto(&self) -> ProtoRelationType {
379 ProtoRelationType {
380 column_types: self.column_types.into_proto(),
381 keys: self.keys.into_proto(),
382 }
383 }
384
385 fn from_proto(proto: ProtoRelationType) -> Result<Self, TryFromProtoError> {
386 Ok(SqlRelationType {
387 column_types: proto.column_types.into_rust()?,
388 keys: proto.keys.into_rust()?,
389 })
390 }
391}
392
393impl RustType<ProtoKey> for Vec<usize> {
394 fn into_proto(&self) -> ProtoKey {
395 ProtoKey {
396 keys: self.into_proto(),
397 }
398 }
399
400 fn from_proto(proto: ProtoKey) -> Result<Self, TryFromProtoError> {
401 proto.keys.into_rust()
402 }
403}
404
405#[derive(
407 Clone,
408 Debug,
409 Eq,
410 PartialEq,
411 Ord,
412 PartialOrd,
413 Serialize,
414 Deserialize,
415 Hash,
416 MzReflect
417)]
418pub struct ReprRelationType {
419 pub column_types: Vec<ReprColumnType>,
421 #[serde(default)]
431 pub keys: Vec<Vec<usize>>,
432}
433
434impl ReprRelationType {
435 pub fn empty() -> Self {
438 ReprRelationType::new(vec![])
439 }
440
441 pub fn new(column_types: Vec<ReprColumnType>) -> Self {
445 ReprRelationType {
446 column_types,
447 keys: Vec::new(),
448 }
449 }
450
451 pub fn with_key(mut self, mut indices: Vec<usize>) -> Self {
453 indices.sort_unstable();
454 if !self.keys.contains(&indices) {
455 self.keys.push(indices);
456 }
457 self
458 }
459
460 pub fn with_keys(mut self, keys: Vec<Vec<usize>>) -> Self {
461 for key in keys {
462 self = self.with_key(key)
463 }
464 self
465 }
466
467 pub fn arity(&self) -> usize {
469 self.column_types.len()
470 }
471
472 pub fn default_key(&self) -> Vec<usize> {
474 if let Some(key) = self.keys.first() {
475 if key.is_empty() {
476 (0..self.column_types.len()).collect()
477 } else {
478 key.clone()
479 }
480 } else {
481 (0..self.column_types.len()).collect()
482 }
483 }
484
485 pub fn columns(&self) -> &[ReprColumnType] {
487 &self.column_types
488 }
489}
490
491impl From<&SqlRelationType> for ReprRelationType {
492 fn from(sql_relation_type: &SqlRelationType) -> Self {
493 ReprRelationType {
494 column_types: sql_relation_type
495 .column_types
496 .iter()
497 .map(ReprColumnType::from)
498 .collect(),
499 keys: sql_relation_type.keys.clone(),
500 }
501 }
502}
503
504#[derive(
505 Clone,
506 Debug,
507 Eq,
508 PartialEq,
509 Ord,
510 PartialOrd,
511 Serialize,
512 Deserialize,
513 Hash,
514 MzReflect
515)]
516pub struct ReprColumnType {
517 pub scalar_type: ReprScalarType,
519 #[serde(default = "return_true")]
521 pub nullable: bool,
522}
523
524impl ReprColumnType {
525 pub fn union(&self, col: &ReprColumnType) -> Result<Self, anyhow::Error> {
532 let scalar_type = self.scalar_type.union(&col.scalar_type)?;
533 let nullable = self.nullable || col.nullable;
534
535 Ok(ReprColumnType {
536 scalar_type,
537 nullable,
538 })
539 }
540}
541
542impl From<&SqlColumnType> for ReprColumnType {
543 fn from(sql_column_type: &SqlColumnType) -> Self {
544 let scalar_type = &sql_column_type.scalar_type;
545 let scalar_type = scalar_type.into();
546 let nullable = sql_column_type.nullable;
547
548 ReprColumnType {
549 scalar_type,
550 nullable,
551 }
552 }
553}
554
555impl SqlColumnType {
556 pub fn from_repr(repr: &ReprColumnType) -> Self {
560 let scalar_type = &repr.scalar_type;
561 let scalar_type = SqlScalarType::from_repr(scalar_type);
562 let nullable = repr.nullable;
563
564 SqlColumnType {
565 scalar_type,
566 nullable,
567 }
568 }
569}
570
571#[derive(
573 Clone,
574 Debug,
575 Eq,
576 PartialEq,
577 Ord,
578 PartialOrd,
579 Serialize,
580 Deserialize,
581 Hash,
582 MzReflect
583)]
584pub struct ColumnName(Box<str>);
585
586impl ColumnName {
587 #[inline(always)]
589 pub fn as_str(&self) -> &str {
590 &*self
591 }
592
593 pub fn as_mut_boxed_str(&mut self) -> &mut Box<str> {
595 &mut self.0
596 }
597
598 pub fn is_similar(&self, other: &ColumnName) -> bool {
600 const SIMILARITY_THRESHOLD: f64 = 0.6;
601
602 let a_lowercase = self.to_lowercase();
603 let b_lowercase = other.to_lowercase();
604
605 strsim::normalized_levenshtein(&a_lowercase, &b_lowercase) >= SIMILARITY_THRESHOLD
606 }
607}
608
609impl std::ops::Deref for ColumnName {
610 type Target = str;
611
612 #[inline(always)]
613 fn deref(&self) -> &Self::Target {
614 &self.0
615 }
616}
617
618impl fmt::Display for ColumnName {
619 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
620 f.write_str(&self.0)
621 }
622}
623
624impl From<String> for ColumnName {
625 fn from(s: String) -> ColumnName {
626 ColumnName(s.into())
627 }
628}
629
630impl From<&str> for ColumnName {
631 fn from(s: &str) -> ColumnName {
632 ColumnName(s.into())
633 }
634}
635
636impl From<&ColumnName> for ColumnName {
637 fn from(n: &ColumnName) -> ColumnName {
638 n.clone()
639 }
640}
641
642impl RustType<ProtoColumnName> for ColumnName {
643 fn into_proto(&self) -> ProtoColumnName {
644 ProtoColumnName {
645 value: Some(self.0.to_string()),
646 }
647 }
648
649 fn from_proto(proto: ProtoColumnName) -> Result<Self, TryFromProtoError> {
650 Ok(ColumnName(
651 proto
652 .value
653 .ok_or_else(|| TryFromProtoError::missing_field("ProtoColumnName::value"))?
654 .into(),
655 ))
656 }
657}
658
659impl From<ColumnName> for mz_sql_parser::ast::Ident {
660 fn from(value: ColumnName) -> Self {
661 mz_sql_parser::ast::Ident::new_unchecked(value.0)
663 }
664}
665
666impl proptest::arbitrary::Arbitrary for ColumnName {
667 type Parameters = ();
668 type Strategy = BoxedStrategy<ColumnName>;
669
670 fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy {
671 let mut weights = vec![(50, Just(1..8)), (20, Just(8..16))];
674 if std::env::var("PROPTEST_LARGE_DATA").is_ok() {
675 weights.extend([
676 (5, Just(16..128)),
677 (1, Just(128..1024)),
678 (1, Just(1024..4096)),
679 ]);
680 }
681 let name_length = Union::new_weighted(weights);
682
683 let char_strat = Rc::new(Union::new_weighted(vec![
686 (50, proptest::char::range('A', 'z').boxed()),
687 (1, any::<char>().boxed()),
688 ]));
689
690 name_length
691 .prop_flat_map(move |length| proptest::collection::vec(Rc::clone(&char_strat), length))
692 .prop_map(|chars| ColumnName(chars.into_iter().collect::<Box<str>>()))
693 .no_shrink()
694 .boxed()
695 }
696}
697
/// The placeholder column name `?column?`.
// NOTE(review): presumably used for columns that have no explicit name — confirm
// against callers.
pub const UNKNOWN_COLUMN_NAME: &str = "?column?";
700
701#[derive(
703 Clone,
704 Copy,
705 Debug,
706 Eq,
707 PartialEq,
708 PartialOrd,
709 Ord,
710 Serialize,
711 Deserialize,
712 Hash,
713 MzReflect
714)]
715pub struct ColumnIndex(usize);
716
717static_assertions::assert_not_impl_all!(ColumnIndex: Arbitrary);
718
719impl ColumnIndex {
720 pub fn to_stable_name(&self) -> String {
722 self.0.to_string()
723 }
724
725 pub fn to_raw(&self) -> usize {
726 self.0
727 }
728
729 pub fn from_raw(val: usize) -> Self {
730 ColumnIndex(val)
731 }
732}
733
734#[derive(
736 Clone,
737 Copy,
738 Debug,
739 Eq,
740 PartialEq,
741 PartialOrd,
742 Ord,
743 Serialize,
744 Deserialize,
745 Hash,
746 MzReflect,
747 Arbitrary
748)]
749pub struct RelationVersion(u64);
750
751impl RelationVersion {
752 pub fn root() -> Self {
754 RelationVersion(0)
755 }
756
757 pub fn bump(&self) -> Self {
759 let next_version = self
760 .0
761 .checked_add(1)
762 .expect("added more than u64::MAX columns?");
763 RelationVersion(next_version)
764 }
765
766 pub fn into_raw(self) -> u64 {
770 self.0
771 }
772
773 pub fn from_raw(val: u64) -> RelationVersion {
777 RelationVersion(val)
778 }
779}
780
781impl From<RelationVersion> for SchemaId {
782 fn from(value: RelationVersion) -> Self {
783 SchemaId(usize::cast_from(value.0))
784 }
785}
786
787impl From<mz_sql_parser::ast::Version> for RelationVersion {
788 fn from(value: mz_sql_parser::ast::Version) -> Self {
789 RelationVersion(value.into_inner())
790 }
791}
792
793impl fmt::Display for RelationVersion {
794 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
795 write!(f, "v{}", self.0)
796 }
797}
798
799impl From<RelationVersion> for mz_sql_parser::ast::Version {
800 fn from(value: RelationVersion) -> Self {
801 mz_sql_parser::ast::Version::new(value.0)
802 }
803}
804
805impl RustType<ProtoRelationVersion> for RelationVersion {
806 fn into_proto(&self) -> ProtoRelationVersion {
807 ProtoRelationVersion { value: self.0 }
808 }
809
810 fn from_proto(proto: ProtoRelationVersion) -> Result<Self, TryFromProtoError> {
811 Ok(RelationVersion(proto.value))
812 }
813}
814
815#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash, MzReflect)]
817struct ColumnMetadata {
818 name: ColumnName,
820 typ_idx: usize,
822 added: RelationVersion,
824 dropped: Option<RelationVersion>,
826}
827
828#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash, MzReflect)]
896pub struct RelationDesc {
897 typ: SqlRelationType,
898 metadata: BTreeMap<ColumnIndex, ColumnMetadata>,
899}
900
901impl RustType<ProtoRelationDesc> for RelationDesc {
902 fn into_proto(&self) -> ProtoRelationDesc {
903 let (names, metadata): (Vec<_>, Vec<_>) = self
904 .metadata
905 .values()
906 .map(|meta| {
907 let metadata = ProtoColumnMetadata {
908 added: Some(meta.added.into_proto()),
909 dropped: meta.dropped.map(|v| v.into_proto()),
910 };
911 (meta.name.into_proto(), metadata)
912 })
913 .unzip();
914
915 let is_all_default_metadata = metadata.iter().all(|meta| {
921 meta.added == Some(RelationVersion::root().into_proto()) && meta.dropped == None
922 });
923 let metadata = if is_all_default_metadata {
924 Vec::new()
925 } else {
926 metadata
927 };
928
929 ProtoRelationDesc {
930 typ: Some(self.typ.into_proto()),
931 names,
932 metadata,
933 }
934 }
935
936 fn from_proto(proto: ProtoRelationDesc) -> Result<Self, TryFromProtoError> {
937 let proto_metadata: Box<dyn Iterator<Item = _>> = if proto.metadata.is_empty() {
943 let val = ProtoColumnMetadata {
944 added: Some(RelationVersion::root().into_proto()),
945 dropped: None,
946 };
947 Box::new(itertools::repeat_n(val, proto.names.len()))
948 } else {
949 Box::new(proto.metadata.into_iter())
950 };
951
952 let metadata = proto
953 .names
954 .into_iter()
955 .zip_eq(proto_metadata)
956 .enumerate()
957 .map(|(idx, (name, metadata))| {
958 let meta = ColumnMetadata {
959 name: name.into_rust()?,
960 typ_idx: idx,
961 added: metadata.added.into_rust_if_some("ColumnMetadata::added")?,
962 dropped: metadata.dropped.into_rust()?,
963 };
964 Ok::<_, TryFromProtoError>((ColumnIndex(idx), meta))
965 })
966 .collect::<Result<_, _>>()?;
967
968 Ok(RelationDesc {
969 typ: proto.typ.into_rust_if_some("ProtoRelationDesc::typ")?,
970 metadata,
971 })
972 }
973}
974
975impl RelationDesc {
976 pub fn builder() -> RelationDescBuilder {
978 RelationDescBuilder::default()
979 }
980
981 pub fn empty() -> Self {
984 RelationDesc {
985 typ: SqlRelationType::empty(),
986 metadata: BTreeMap::default(),
987 }
988 }
989
990 pub fn is_empty(&self) -> bool {
992 self == &Self::empty()
993 }
994
995 pub fn len(&self) -> usize {
997 self.typ().column_types.len()
998 }
999
1000 pub fn new<I, N>(typ: SqlRelationType, names: I) -> Self
1008 where
1009 I: IntoIterator<Item = N>,
1010 N: Into<ColumnName>,
1011 {
1012 let metadata: BTreeMap<_, _> = names
1013 .into_iter()
1014 .enumerate()
1015 .map(|(idx, name)| {
1016 let col_idx = ColumnIndex(idx);
1017 let metadata = ColumnMetadata {
1018 name: name.into(),
1019 typ_idx: idx,
1020 added: RelationVersion::root(),
1021 dropped: None,
1022 };
1023 (col_idx, metadata)
1024 })
1025 .collect();
1026
1027 assert_eq!(typ.column_types.len(), metadata.len());
1029
1030 RelationDesc { typ, metadata }
1031 }
1032
1033 pub fn from_names_and_types<I, T, N>(iter: I) -> Self
1034 where
1035 I: IntoIterator<Item = (N, T)>,
1036 T: Into<SqlColumnType>,
1037 N: Into<ColumnName>,
1038 {
1039 let (names, types): (Vec<_>, Vec<_>) = iter.into_iter().unzip();
1040 let types = types.into_iter().map(Into::into).collect();
1041 let typ = SqlRelationType::new(types);
1042 Self::new(typ, names)
1043 }
1044
1045 pub fn concat(mut self, other: Self) -> Self {
1055 let self_len = self.typ.column_types.len();
1056
1057 for (typ, (_col_idx, meta)) in other
1058 .typ
1059 .column_types
1060 .into_iter()
1061 .zip_eq(other.metadata.into_iter())
1062 {
1063 assert_eq!(meta.added, RelationVersion::root());
1064 assert_none!(meta.dropped);
1065
1066 let new_idx = self.typ.columns().len();
1067 let new_meta = ColumnMetadata {
1068 name: meta.name,
1069 typ_idx: new_idx,
1070 added: RelationVersion::root(),
1071 dropped: None,
1072 };
1073
1074 self.typ.column_types.push(typ);
1075 let prev = self.metadata.insert(ColumnIndex(new_idx), new_meta);
1076
1077 assert_eq!(self.metadata.len(), self.typ.columns().len());
1078 assert_none!(prev);
1079 }
1080
1081 for k in other.typ.keys {
1082 let k = k.into_iter().map(|idx| idx + self_len).collect();
1083 self = self.with_key(k);
1084 }
1085 self
1086 }
1087
1088 pub fn with_key(mut self, indices: Vec<usize>) -> Self {
1090 self.typ = self.typ.with_key(indices);
1091 self
1092 }
1093
1094 pub fn without_keys(mut self) -> Self {
1096 self.typ.keys.clear();
1097 self
1098 }
1099
1100 pub fn with_names<I, N>(self, names: I) -> Self
1108 where
1109 I: IntoIterator<Item = N>,
1110 N: Into<ColumnName>,
1111 {
1112 Self::new(self.typ, names)
1113 }
1114
1115 pub fn arity(&self) -> usize {
1117 self.typ.arity()
1118 }
1119
1120 pub fn typ(&self) -> &SqlRelationType {
1122 &self.typ
1123 }
1124
1125 pub fn into_typ(self) -> SqlRelationType {
1127 self.typ
1128 }
1129
1130 pub fn iter(&self) -> impl Iterator<Item = (&ColumnName, &SqlColumnType)> {
1132 self.metadata.values().map(|meta| {
1133 let typ = &self.typ.columns()[meta.typ_idx];
1134 (&meta.name, typ)
1135 })
1136 }
1137
1138 pub fn iter_types(&self) -> impl Iterator<Item = &SqlColumnType> {
1140 self.typ.column_types.iter()
1141 }
1142
1143 pub fn iter_names(&self) -> impl Iterator<Item = &ColumnName> {
1145 self.metadata.values().map(|meta| &meta.name)
1146 }
1147
1148 pub fn iter_all(&self) -> impl Iterator<Item = (&ColumnIndex, &ColumnName, &SqlColumnType)> {
1150 self.metadata.iter().map(|(col_idx, metadata)| {
1151 let col_typ = &self.typ.columns()[metadata.typ_idx];
1152 (col_idx, &metadata.name, col_typ)
1153 })
1154 }
1155
1156 pub fn iter_similar_names<'a>(
1159 &'a self,
1160 name: &'a ColumnName,
1161 ) -> impl Iterator<Item = &'a ColumnName> {
1162 self.iter_names().filter(|n| n.is_similar(name))
1163 }
1164
1165 pub fn contains_index(&self, idx: &ColumnIndex) -> bool {
1167 self.metadata.contains_key(idx)
1168 }
1169
1170 pub fn get_by_name(&self, name: &ColumnName) -> Option<(usize, &SqlColumnType)> {
1176 self.iter_names()
1177 .position(|n| n == name)
1178 .map(|i| (i, &self.typ.column_types[i]))
1179 }
1180
1181 pub fn get_name(&self, i: usize) -> &ColumnName {
1189 self.get_name_idx(&ColumnIndex(i))
1191 }
1192
1193 pub fn get_name_idx(&self, idx: &ColumnIndex) -> &ColumnName {
1199 &self.metadata.get(idx).expect("should exist").name
1200 }
1201
1202 pub fn get_name_mut(&mut self, i: usize) -> &mut ColumnName {
1208 &mut self
1210 .metadata
1211 .get_mut(&ColumnIndex(i))
1212 .expect("should exist")
1213 .name
1214 }
1215
1216 pub fn get_type(&self, idx: &ColumnIndex) -> &SqlColumnType {
1222 let typ_idx = self.metadata.get(idx).expect("should exist").typ_idx;
1223 &self.typ.column_types[typ_idx]
1224 }
1225
1226 pub fn get_unambiguous_name(&self, i: usize) -> Option<&ColumnName> {
1235 let name = self.get_name(i);
1236 if self.iter_names().filter(|n| *n == name).count() == 1 {
1237 Some(name)
1238 } else {
1239 None
1240 }
1241 }
1242
1243 pub fn constraints_met(&self, i: usize, d: &Datum) -> Result<(), NotNullViolation> {
1248 let name = self.get_name(i);
1249 let typ = &self.typ.column_types[i];
1250 if d == &Datum::Null && !typ.nullable {
1251 Err(NotNullViolation(name.clone()))
1252 } else {
1253 Ok(())
1254 }
1255 }
1256
1257 pub fn diff(&self, other: &RelationDesc) -> RelationDescDiff {
1272 assert_eq!(self.metadata.len(), self.typ.columns().len());
1273 assert_eq!(other.metadata.len(), other.typ.columns().len());
1274 for (idx, meta) in self.metadata.iter().chain(other.metadata.iter()) {
1275 assert_eq!(meta.typ_idx, idx.0);
1276 assert_eq!(meta.added, RelationVersion::root());
1277 assert_none!(meta.dropped);
1278 }
1279
1280 let mut column_diffs = BTreeMap::new();
1281 let mut key_diff = None;
1282
1283 let left_arity = self.arity();
1284 let right_arity = other.arity();
1285 let common_arity = std::cmp::min(left_arity, right_arity);
1286
1287 for idx in 0..common_arity {
1288 let left_name = self.get_name(idx);
1289 let right_name = other.get_name(idx);
1290 let left_type = &self.typ.column_types[idx];
1291 let right_type = &other.typ.column_types[idx];
1292
1293 if left_name != right_name {
1294 let diff = ColumnDiff::NameMismatch {
1295 left: left_name.clone(),
1296 right: right_name.clone(),
1297 };
1298 column_diffs.insert(idx, diff);
1299 } else if left_type.scalar_type != right_type.scalar_type {
1300 let diff = ColumnDiff::TypeMismatch {
1301 name: left_name.clone(),
1302 left: left_type.scalar_type.clone(),
1303 right: right_type.scalar_type.clone(),
1304 };
1305 column_diffs.insert(idx, diff);
1306 } else if left_type.nullable != right_type.nullable {
1307 let diff = ColumnDiff::NullabilityMismatch {
1308 name: left_name.clone(),
1309 left: left_type.nullable,
1310 right: right_type.nullable,
1311 };
1312 column_diffs.insert(idx, diff);
1313 }
1314 }
1315
1316 for idx in common_arity..left_arity {
1317 let diff = ColumnDiff::Missing {
1318 name: self.get_name(idx).clone(),
1319 };
1320 column_diffs.insert(idx, diff);
1321 }
1322
1323 for idx in common_arity..right_arity {
1324 let diff = ColumnDiff::Extra {
1325 name: other.get_name(idx).clone(),
1326 };
1327 column_diffs.insert(idx, diff);
1328 }
1329
1330 let left_keys: BTreeSet<_> = self.typ.keys.iter().collect();
1331 let right_keys: BTreeSet<_> = other.typ.keys.iter().collect();
1332 if left_keys != right_keys {
1333 let column_names = |desc: &RelationDesc, keys: BTreeSet<&Vec<usize>>| {
1334 keys.iter()
1335 .map(|key| key.iter().map(|&idx| desc.get_name(idx).clone()).collect())
1336 .collect()
1337 };
1338 key_diff = Some(KeyDiff {
1339 left: column_names(self, left_keys),
1340 right: column_names(other, right_keys),
1341 });
1342 }
1343
1344 RelationDescDiff {
1345 column_diffs,
1346 key_diff,
1347 }
1348 }
1349
1350 pub fn apply_demand(&self, demands: &BTreeSet<usize>) -> RelationDesc {
1352 let mut new_desc = self.clone();
1353
1354 let mut removed = 0;
1356 new_desc.metadata.retain(|idx, metadata| {
1357 let retain = demands.contains(&idx.0);
1358 if !retain {
1359 removed += 1;
1360 } else {
1361 metadata.typ_idx -= removed;
1362 }
1363 retain
1364 });
1365
1366 let mut idx = 0;
1368 new_desc.typ.column_types.retain(|_| {
1369 let keep = demands.contains(&idx);
1370 idx += 1;
1371 keep
1372 });
1373
1374 new_desc
1375 }
1376}
1377
1378impl Arbitrary for RelationDesc {
1379 type Parameters = ();
1380 type Strategy = BoxedStrategy<RelationDesc>;
1381
1382 fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy {
1383 let mut weights = vec![(100, Just(0..4)), (50, Just(4..8)), (25, Just(8..16))];
1384 if std::env::var("PROPTEST_LARGE_DATA").is_ok() {
1385 weights.extend([
1386 (12, Just(16..32)),
1387 (6, Just(32..64)),
1388 (3, Just(64..128)),
1389 (1, Just(128..256)),
1390 ]);
1391 }
1392 let num_columns = Union::new_weighted(weights);
1393
1394 num_columns.prop_flat_map(arb_relation_desc).boxed()
1395 }
1396}
1397
1398pub fn arb_relation_desc(num_cols: std::ops::Range<usize>) -> impl Strategy<Value = RelationDesc> {
1401 proptest::collection::btree_map(any::<ColumnName>(), any::<SqlColumnType>(), num_cols)
1402 .prop_map(RelationDesc::from_names_and_types)
1403}
1404
1405pub fn arb_relation_desc_projection(desc: RelationDesc) -> impl Strategy<Value = RelationDesc> {
1407 let mask: Vec<_> = (0..desc.len()).map(|_| any::<bool>()).collect();
1408 mask.prop_map(move |mask| {
1409 let demands: BTreeSet<_> = mask
1410 .into_iter()
1411 .enumerate()
1412 .filter_map(|(idx, keep)| keep.then_some(idx))
1413 .collect();
1414 desc.apply_demand(&demands)
1415 })
1416}
1417
1418impl IntoIterator for RelationDesc {
1419 type Item = (ColumnName, SqlColumnType);
1420 type IntoIter = Box<dyn Iterator<Item = (ColumnName, SqlColumnType)>>;
1421
1422 fn into_iter(self) -> Self::IntoIter {
1423 let iter = self
1424 .metadata
1425 .into_values()
1426 .zip_eq(self.typ.column_types)
1427 .map(|(meta, typ)| (meta.name, typ));
1428 Box::new(iter)
1429 }
1430}
1431
1432pub fn arb_row_for_relation(desc: &RelationDesc) -> impl Strategy<Value = Row> + use<> {
1434 let datums: Vec<_> = desc
1435 .typ()
1436 .columns()
1437 .iter()
1438 .cloned()
1439 .map(arb_datum_for_column)
1440 .collect();
1441 datums.prop_map(|x| Row::pack(x.iter().map(Datum::from)))
1442}
1443
1444#[derive(Debug, PartialEq, Eq)]
1446pub struct NotNullViolation(pub ColumnName);
1447
1448impl fmt::Display for NotNullViolation {
1449 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1450 write!(
1451 f,
1452 "null value in column {} violates not-null constraint",
1453 self.0.quoted()
1454 )
1455 }
1456}
1457
1458#[derive(Debug, Clone, PartialEq, Eq)]
1460pub struct RelationDescDiff {
1461 pub column_diffs: BTreeMap<usize, ColumnDiff>,
1463 pub key_diff: Option<KeyDiff>,
1465}
1466
1467impl RelationDescDiff {
1468 pub fn is_empty(&self) -> bool {
1470 self.column_diffs.is_empty() && self.key_diff.is_none()
1471 }
1472}
1473
1474#[derive(Debug, Clone, PartialEq, Eq)]
1476pub enum ColumnDiff {
1477 Missing { name: ColumnName },
1479 Extra { name: ColumnName },
1481 TypeMismatch {
1483 name: ColumnName,
1484 left: SqlScalarType,
1485 right: SqlScalarType,
1486 },
1487 NullabilityMismatch {
1489 name: ColumnName,
1490 left: bool,
1491 right: bool,
1492 },
1493 NameMismatch { left: ColumnName, right: ColumnName },
1495}
1496
1497#[derive(Debug, Clone, PartialEq, Eq)]
1499pub struct KeyDiff {
1500 pub left: BTreeSet<Vec<ColumnName>>,
1502 pub right: BTreeSet<Vec<ColumnName>>,
1504}
1505
1506#[derive(Clone, Default, Debug, PartialEq, Eq)]
1508pub struct RelationDescBuilder {
1509 columns: Vec<(ColumnName, SqlColumnType)>,
1511 keys: Vec<Vec<usize>>,
1513}
1514
1515impl RelationDescBuilder {
1516 pub fn with_column<N: Into<ColumnName>>(
1518 mut self,
1519 name: N,
1520 ty: SqlColumnType,
1521 ) -> RelationDescBuilder {
1522 let name = name.into();
1523 self.columns.push((name, ty));
1524 self
1525 }
1526
1527 pub fn with_columns<I, T, N>(mut self, iter: I) -> Self
1529 where
1530 I: IntoIterator<Item = (N, T)>,
1531 T: Into<SqlColumnType>,
1532 N: Into<ColumnName>,
1533 {
1534 self.columns
1535 .extend(iter.into_iter().map(|(name, ty)| (name.into(), ty.into())));
1536 self
1537 }
1538
1539 pub fn with_key(mut self, mut indices: Vec<usize>) -> RelationDescBuilder {
1541 indices.sort_unstable();
1542 if !self.keys.contains(&indices) {
1543 self.keys.push(indices);
1544 }
1545 self
1546 }
1547
1548 pub fn without_keys(mut self) -> RelationDescBuilder {
1550 self.keys.clear();
1551 assert_eq!(self.keys.len(), 0);
1552 self
1553 }
1554
1555 pub fn concat(mut self, other: Self) -> Self {
1557 let self_len = self.columns.len();
1558
1559 self.columns.extend(other.columns);
1560 for k in other.keys {
1561 let k = k.into_iter().map(|idx| idx + self_len).collect();
1562 self = self.with_key(k);
1563 }
1564
1565 self
1566 }
1567
1568 pub fn finish(self) -> RelationDesc {
1570 let mut desc = RelationDesc::from_names_and_types(self.columns);
1571 desc.typ = desc.typ.with_keys(self.keys);
1572 desc
1573 }
1574}
1575
1576#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord, Serialize)]
1578pub enum RelationVersionSelector {
1579 Specific(RelationVersion),
1580 Latest,
1581}
1582
1583impl RelationVersionSelector {
1584 pub fn specific(version: u64) -> Self {
1585 RelationVersionSelector::Specific(RelationVersion(version))
1586 }
1587}
1588
1589#[derive(Debug, Clone, Serialize)]
1595pub struct VersionedRelationDesc {
1596 inner: RelationDesc,
1597}
1598
1599impl VersionedRelationDesc {
1600 pub fn new(inner: RelationDesc) -> Self {
1601 VersionedRelationDesc { inner }
1602 }
1603
1604 #[must_use]
1612 pub fn add_column<N, T>(&mut self, name: N, typ: T) -> RelationVersion
1613 where
1614 N: Into<ColumnName>,
1615 T: Into<SqlColumnType>,
1616 {
1617 let latest_version = self.latest_version();
1618 let new_version = latest_version.bump();
1619
1620 let name = name.into();
1621 let existing = self
1622 .inner
1623 .metadata
1624 .iter()
1625 .find(|(_, meta)| meta.name == name && meta.dropped.is_none());
1626 if let Some(existing) = existing {
1627 panic!("column named '{name}' already exists! {existing:?}");
1628 }
1629
1630 let next_idx = self.inner.metadata.len();
1631 let col_meta = ColumnMetadata {
1632 name,
1633 typ_idx: next_idx,
1634 added: new_version,
1635 dropped: None,
1636 };
1637
1638 self.inner.typ.column_types.push(typ.into());
1639 let prev = self.inner.metadata.insert(ColumnIndex(next_idx), col_meta);
1640
1641 assert_none!(prev, "column index overlap!");
1642 self.validate();
1643
1644 new_version
1645 }
1646
1647 #[must_use]
1656 pub fn drop_column<N>(&mut self, name: N) -> RelationVersion
1657 where
1658 N: Into<ColumnName>,
1659 {
1660 let name = name.into();
1661 let latest_version = self.latest_version();
1662 let new_version = latest_version.bump();
1663
1664 let col = self
1665 .inner
1666 .metadata
1667 .values_mut()
1668 .find(|meta| meta.name == name && meta.dropped.is_none())
1669 .expect("column to exist");
1670
1671 assert_none!(col.dropped, "column was already dropped");
1673 col.dropped = Some(new_version);
1674
1675 let dropped_key = self
1677 .inner
1678 .typ
1679 .keys
1680 .iter()
1681 .any(|keys| keys.contains(&col.typ_idx));
1682 assert!(!dropped_key, "column being dropped was used as a key");
1683
1684 self.validate();
1685 new_version
1686 }
1687
1688 pub fn latest(&self) -> RelationDesc {
1690 self.inner.clone()
1691 }
1692
1693 pub fn at_version(&self, version: RelationVersionSelector) -> RelationDesc {
1695 let up_to_version = match version {
1697 RelationVersionSelector::Latest => RelationVersion(u64::MAX),
1698 RelationVersionSelector::Specific(v) => v,
1699 };
1700
1701 let valid_columns = self.inner.metadata.iter().filter(|(_col_idx, meta)| {
1702 let added = meta.added <= up_to_version;
1703 let dropped = meta
1704 .dropped
1705 .map(|dropped_at| up_to_version >= dropped_at)
1706 .unwrap_or(false);
1707
1708 added && !dropped
1709 });
1710
1711 let mut column_types = Vec::new();
1712 let mut column_metas = BTreeMap::new();
1713
1714 for (col_idx, meta) in valid_columns {
1721 let new_meta = ColumnMetadata {
1722 name: meta.name.clone(),
1723 typ_idx: column_types.len(),
1724 added: meta.added.clone(),
1725 dropped: meta.dropped.clone(),
1726 };
1727 column_types.push(self.inner.typ.columns()[meta.typ_idx].clone());
1728 column_metas.insert(*col_idx, new_meta);
1729 }
1730
1731 let keys = self
1737 .inner
1738 .typ
1739 .keys
1740 .iter()
1741 .map(|keys| {
1742 keys.iter()
1743 .map(|key_idx| {
1744 let metadata = column_metas
1745 .get(&ColumnIndex(*key_idx))
1746 .expect("found key for column that doesn't exist");
1747 metadata.typ_idx
1748 })
1749 .collect()
1750 })
1751 .collect();
1752
1753 let relation_type = SqlRelationType { column_types, keys };
1754
1755 RelationDesc {
1756 typ: relation_type,
1757 metadata: column_metas,
1758 }
1759 }
1760
1761 pub fn latest_version(&self) -> RelationVersion {
1762 self.inner
1763 .metadata
1764 .values()
1765 .map(|meta| meta.dropped.unwrap_or(meta.added))
1767 .max()
1768 .unwrap_or_else(RelationVersion::root)
1770 }
1771
1772 fn validate(&self) {
1778 fn validate_inner(desc: &RelationDesc) -> Result<(), anyhow::Error> {
1779 if desc.typ.column_types.len() != desc.metadata.len() {
1780 anyhow::bail!("mismatch between number of types and metadatas");
1781 }
1782
1783 for (col_idx, meta) in &desc.metadata {
1784 if col_idx.0 > desc.metadata.len() {
1785 anyhow::bail!("column index out of bounds");
1786 }
1787 if meta.added >= meta.dropped.unwrap_or(RelationVersion(u64::MAX)) {
1788 anyhow::bail!("column was added after it was dropped?");
1789 }
1790 if desc.typ().columns().get(meta.typ_idx).is_none() {
1791 anyhow::bail!("typ_idx incorrect");
1792 }
1793 }
1794
1795 for keys in &desc.typ.keys {
1796 for key in keys {
1797 if *key >= desc.typ.column_types.len() {
1798 anyhow::bail!("key index was out of bounds!");
1799 }
1800 }
1801 }
1802
1803 let versions = desc
1804 .metadata
1805 .values()
1806 .map(|meta| meta.dropped.unwrap_or(meta.added));
1807 let mut max = 0;
1808 let mut sum = 0;
1809 for version in versions {
1810 max = std::cmp::max(max, version.0);
1811 sum += version.0;
1812 }
1813
1814 if sum != (max * (max + 1) / 2) {
1824 anyhow::bail!("there is a duplicate or missing relation version");
1825 }
1826
1827 Ok(())
1828 }
1829
1830 assert_ok!(validate_inner(&self.inner), "validate failed! {self:?}");
1831 }
1832}
1833
1834#[derive(Debug)]
1837pub enum PropRelationDescDiff {
1838 AddColumn {
1839 name: ColumnName,
1840 typ: SqlColumnType,
1841 },
1842 DropColumn {
1843 name: ColumnName,
1844 },
1845 ToggleNullability {
1846 name: ColumnName,
1847 },
1848 ChangeType {
1849 name: ColumnName,
1850 typ: SqlColumnType,
1851 },
1852}
1853
1854impl PropRelationDescDiff {
1855 pub fn apply(self, desc: &mut RelationDesc) {
1856 match self {
1857 PropRelationDescDiff::AddColumn { name, typ } => {
1858 let new_idx = desc.metadata.len();
1859 let meta = ColumnMetadata {
1860 name,
1861 typ_idx: new_idx,
1862 added: RelationVersion(0),
1863 dropped: None,
1864 };
1865 let prev = desc.metadata.insert(ColumnIndex(new_idx), meta);
1866 desc.typ.column_types.push(typ);
1867
1868 assert_none!(prev);
1869 assert_eq!(desc.metadata.len(), desc.typ.column_types.len());
1870 }
1871 PropRelationDescDiff::DropColumn { name } => {
1872 let next_version = desc
1873 .metadata
1874 .values()
1875 .map(|meta| meta.dropped.unwrap_or(meta.added))
1876 .max()
1877 .unwrap_or_else(RelationVersion::root)
1878 .bump();
1879 let Some(metadata) = desc.metadata.values_mut().find(|meta| meta.name == name)
1880 else {
1881 return;
1882 };
1883 if metadata.dropped.is_none() {
1884 metadata.dropped = Some(next_version);
1885 }
1886 }
1887 PropRelationDescDiff::ToggleNullability { name } => {
1888 let Some((pos, _)) = desc.get_by_name(&name) else {
1889 return;
1890 };
1891 let col_type = desc
1892 .typ
1893 .column_types
1894 .get_mut(pos)
1895 .expect("ColumnNames and SqlColumnTypes out of sync!");
1896 col_type.nullable = !col_type.nullable;
1897 }
1898 PropRelationDescDiff::ChangeType { name, typ } => {
1899 let Some((pos, _)) = desc.get_by_name(&name) else {
1900 return;
1901 };
1902 let col_type = desc
1903 .typ
1904 .column_types
1905 .get_mut(pos)
1906 .expect("ColumnNames and SqlColumnTypes out of sync!");
1907 *col_type = typ;
1908 }
1909 }
1910 }
1911}
1912
1913pub fn arb_relation_desc_diff(
1915 source: &RelationDesc,
1916) -> impl Strategy<Value = Vec<PropRelationDescDiff>> + use<> {
1917 let source = Rc::new(source.clone());
1918 let num_source_columns = source.typ.columns().len();
1919
1920 let num_add_columns = Union::new_weighted(vec![(100, Just(0..8)), (1, Just(8..64))]);
1921 let add_columns_strat = num_add_columns
1922 .prop_flat_map(|num_columns| {
1923 proptest::collection::vec((any::<ColumnName>(), any::<SqlColumnType>()), num_columns)
1924 })
1925 .prop_map(|cols| {
1926 cols.into_iter()
1927 .map(|(name, typ)| PropRelationDescDiff::AddColumn { name, typ })
1928 .collect::<Vec<_>>()
1929 });
1930
1931 if num_source_columns == 0 {
1933 return add_columns_strat.boxed();
1934 }
1935
1936 let source_ = Rc::clone(&source);
1937 let drop_columns_strat = (0..num_source_columns).prop_perturb(move |num_columns, mut rng| {
1938 let mut set = BTreeSet::default();
1939 for _ in 0..num_columns {
1940 let col_idx = rng.random_range(0..num_source_columns);
1941 set.insert(source_.get_name(col_idx).clone());
1942 }
1943 set.into_iter()
1944 .map(|name| PropRelationDescDiff::DropColumn { name })
1945 .collect::<Vec<_>>()
1946 });
1947
1948 let source_ = Rc::clone(&source);
1949 let toggle_nullability_strat =
1950 (0..num_source_columns).prop_perturb(move |num_columns, mut rng| {
1951 let mut set = BTreeSet::default();
1952 for _ in 0..num_columns {
1953 let col_idx = rng.random_range(0..num_source_columns);
1954 set.insert(source_.get_name(col_idx).clone());
1955 }
1956 set.into_iter()
1957 .map(|name| PropRelationDescDiff::ToggleNullability { name })
1958 .collect::<Vec<_>>()
1959 });
1960
1961 let source_ = Rc::clone(&source);
1962 let change_type_strat = (0..num_source_columns)
1963 .prop_perturb(move |num_columns, mut rng| {
1964 let mut set = BTreeSet::default();
1965 for _ in 0..num_columns {
1966 let col_idx = rng.random_range(0..num_source_columns);
1967 set.insert(source_.get_name(col_idx).clone());
1968 }
1969 set
1970 })
1971 .prop_flat_map(|cols| {
1972 proptest::collection::vec(any::<SqlColumnType>(), cols.len())
1973 .prop_map(move |types| (cols.clone(), types))
1974 })
1975 .prop_map(|(cols, types)| {
1976 cols.into_iter()
1977 .zip_eq(types)
1978 .map(|(name, typ)| PropRelationDescDiff::ChangeType { name, typ })
1979 .collect::<Vec<_>>()
1980 });
1981
1982 (
1983 add_columns_strat,
1984 drop_columns_strat,
1985 toggle_nullability_strat,
1986 change_type_strat,
1987 )
1988 .prop_map(|(adds, drops, toggles, changes)| {
1989 adds.into_iter()
1990 .chain(drops)
1991 .chain(toggles)
1992 .chain(changes)
1993 .collect::<Vec<_>>()
1994 })
1995 .prop_shuffle()
1996 .boxed()
1997}
1998
1999#[cfg(test)]
2000mod tests {
2001 use super::*;
2002 use prost::Message;
2003
// Walks a `VersionedRelationDesc` through one column addition and one column
// drop, checking what `at_version` returns after every step.
#[mz_ore::test]
#[cfg_attr(miri, ignore)]
fn smoktest_at_version() {
    // Start from a plain two-column description with no version history.
    let desc = RelationDesc::builder()
        .with_column("a", SqlScalarType::Bool.nullable(true))
        .with_column("z", SqlScalarType::String.nullable(false))
        .finish();

    let mut versioned_desc = VersionedRelationDesc {
        inner: desc.clone(),
    };
    versioned_desc.validate();

    // Before any change is made, every selector resolves to the original
    // description, including a version that does not exist yet.
    let latest = versioned_desc.at_version(RelationVersionSelector::Latest);
    assert_eq!(desc, latest);

    let v0 = versioned_desc.at_version(RelationVersionSelector::specific(0));
    assert_eq!(desc, v0);

    let v3 = versioned_desc.at_version(RelationVersionSelector::specific(3));
    assert_eq!(desc, v3);

    // Adding a column creates version 1.
    let v1 = versioned_desc.add_column("b", SqlScalarType::Bytes.nullable(false));
    assert_eq!(v1, RelationVersion(1));

    let v1 = versioned_desc.at_version(RelationVersionSelector::Specific(v1));
    insta::assert_json_snapshot!(v1.metadata, @r###"
    {
      "0": {
        "name": "a",
        "typ_idx": 0,
        "added": 0,
        "dropped": null
      },
      "1": {
        "name": "z",
        "typ_idx": 1,
        "added": 0,
        "dropped": null
      },
      "2": {
        "name": "b",
        "typ_idx": 2,
        "added": 1,
        "dropped": null
      }
    }
    "###);

    // Version 0 must look the same as it did before the column was added.
    let v0_b = versioned_desc.at_version(RelationVersionSelector::specific(0));
    assert!(v0.iter().eq(v0_b.iter()));

    // Dropping a column creates version 2.
    let v2 = versioned_desc.drop_column("z");
    assert_eq!(v2, RelationVersion(2));

    // At version 2 the dropped column is gone and "b" moves to type index 1,
    // while keeping its column index of 2.
    let v2 = versioned_desc.at_version(RelationVersionSelector::Specific(v2));
    insta::assert_json_snapshot!(v2.metadata, @r###"
    {
      "0": {
        "name": "a",
        "typ_idx": 0,
        "added": 0,
        "dropped": null
      },
      "2": {
        "name": "b",
        "typ_idx": 1,
        "added": 1,
        "dropped": null
      }
    }
    "###);

    // Earlier versions must yield the same columns as before the drop.
    let v0_c = versioned_desc.at_version(RelationVersionSelector::specific(0));
    assert!(v0.iter().eq(v0_c.iter()));

    let v1_b = versioned_desc.at_version(RelationVersionSelector::specific(1));
    assert!(v1.iter().eq(v1_b.iter()));

    // The inner description retains all columns, with "z" marked as dropped.
    insta::assert_json_snapshot!(versioned_desc.inner.metadata, @r###"
    {
      "0": {
        "name": "a",
        "typ_idx": 0,
        "added": 0,
        "dropped": null
      },
      "1": {
        "name": "z",
        "typ_idx": 1,
        "added": 0,
        "dropped": 2
      },
      "2": {
        "name": "b",
        "typ_idx": 2,
        "added": 1,
        "dropped": null
      }
    }
    "###);
}
2108
// Checks that keys are remapped correctly when a column that comes before the
// key column is dropped.
#[mz_ore::test]
#[cfg_attr(miri, ignore)]
fn test_dropping_columns_with_keys() {
    // The key is on the second column ("z", type index 1).
    let desc = RelationDesc::builder()
        .with_column("a", SqlScalarType::Bool.nullable(true))
        .with_column("z", SqlScalarType::String.nullable(false))
        .with_key(vec![1])
        .finish();

    let mut versioned_desc = VersionedRelationDesc {
        inner: desc.clone(),
    };
    versioned_desc.validate();

    // Drop the column that is *not* part of the key.
    let v1 = versioned_desc.drop_column("a");
    assert_eq!(v1, RelationVersion(1));

    // With "a" gone, "z" becomes type index 0 and the key has to follow it.
    let v1 = versioned_desc.at_version(RelationVersionSelector::Specific(v1));
    insta::assert_json_snapshot!(v1, @r###"
    {
      "typ": {
        "column_types": [
          {
            "scalar_type": "String",
            "nullable": false
          }
        ],
        "keys": [
          [
            0
          ]
        ]
      },
      "metadata": {
        "1": {
          "name": "z",
          "typ_idx": 0,
          "added": 0,
          "dropped": null
        }
      }
    }
    "###);

    // Version 0 still has both columns and the key at its original index. The
    // metadata of "a" carries the version at which it was later dropped.
    let v0 = versioned_desc.at_version(RelationVersionSelector::specific(0));
    insta::assert_json_snapshot!(v0, @r###"
    {
      "typ": {
        "column_types": [
          {
            "scalar_type": "Bool",
            "nullable": true
          },
          {
            "scalar_type": "String",
            "nullable": false
          }
        ],
        "keys": [
          [
            1
          ]
        ]
      },
      "metadata": {
        "0": {
          "name": "a",
          "typ_idx": 0,
          "added": 0,
          "dropped": 1
        },
        "1": {
          "name": "z",
          "typ_idx": 1,
          "added": 0,
          "dropped": null
        }
      }
    }
    "###);
}
2192
// Checks that a `ProtoRelationDesc` that only carries column names, and no
// column metadata, can be converted into a `RelationDesc`.
#[mz_ore::test]
#[cfg_attr(miri, ignore)]
fn roundtrip_relation_desc_without_metadata() {
    let typ = ProtoRelationType {
        column_types: vec![
            SqlScalarType::String.nullable(false).into_proto(),
            SqlScalarType::Bool.nullable(true).into_proto(),
        ],
        keys: vec![],
    };
    let proto = ProtoRelationDesc {
        typ: Some(typ),
        names: vec![
            ColumnName("a".into()).into_proto(),
            ColumnName("b".into()).into_proto(),
        ],
        // Deliberately empty: the snapshot below shows the metadata that the
        // conversion produces from the names alone.
        metadata: vec![],
    };
    let desc: RelationDesc = proto.into_rust().unwrap();

    insta::assert_json_snapshot!(desc, @r###"
    {
      "typ": {
        "column_types": [
          {
            "scalar_type": "String",
            "nullable": false
          },
          {
            "scalar_type": "Bool",
            "nullable": true
          }
        ],
        "keys": []
      },
      "metadata": {
        "0": {
          "name": "a",
          "typ_idx": 0,
          "added": 0,
          "dropped": null
        },
        "1": {
          "name": "b",
          "typ_idx": 1,
          "added": 0,
          "dropped": null
        }
      }
    }
    "###);
}
2245
2246 #[mz_ore::test]
2247 #[should_panic(expected = "column named 'a' already exists!")]
2248 fn test_add_column_with_same_name_panics() {
2249 let desc = RelationDesc::builder()
2250 .with_column("a", SqlScalarType::Bool.nullable(true))
2251 .finish();
2252 let mut versioned = VersionedRelationDesc::new(desc);
2253
2254 let _ = versioned.add_column("a", SqlScalarType::String.nullable(false));
2255 }
2256
// Checks that a column name can be reused once the column that previously
// had that name has been dropped.
#[mz_ore::test]
#[cfg_attr(miri, ignore)]
fn test_add_column_with_same_name_prev_dropped() {
    let desc = RelationDesc::builder()
        .with_column("a", SqlScalarType::Bool.nullable(true))
        .finish();
    let mut versioned = VersionedRelationDesc::new(desc);

    // Dropping the only column leaves an empty relation at version 1.
    let v1 = versioned.drop_column("a");
    let v1 = versioned.at_version(RelationVersionSelector::Specific(v1));
    insta::assert_json_snapshot!(v1, @r###"
    {
      "typ": {
        "column_types": [],
        "keys": []
      },
      "metadata": {}
    }
    "###);

    // Re-adding "a" creates a brand new column: it gets a new column index
    // and its own type, rather than reviving the dropped one.
    let v2 = versioned.add_column("a", SqlScalarType::String.nullable(false));
    let v2 = versioned.at_version(RelationVersionSelector::Specific(v2));
    insta::assert_json_snapshot!(v2, @r###"
    {
      "typ": {
        "column_types": [
          {
            "scalar_type": "String",
            "nullable": false
          }
        ],
        "keys": []
      },
      "metadata": {
        "1": {
          "name": "a",
          "typ_idx": 0,
          "added": 2,
          "dropped": null
        }
      }
    }
    "###);
}
2301
2302 #[mz_ore::test]
2303 #[cfg_attr(miri, ignore)]
2304 fn apply_demand() {
2305 let desc = RelationDesc::builder()
2306 .with_column("a", SqlScalarType::String.nullable(true))
2307 .with_column("b", SqlScalarType::Int64.nullable(false))
2308 .with_column("c", SqlScalarType::Time.nullable(false))
2309 .finish();
2310 let desc = desc.apply_demand(&BTreeSet::from([0, 2]));
2311 assert_eq!(desc.arity(), 2);
2312 VersionedRelationDesc::new(desc).validate();
2314 }
2315
2316 #[mz_ore::test]
2317 #[cfg_attr(miri, ignore)]
2318 fn smoketest_column_index_stable_ident() {
2319 let idx_a = ColumnIndex(42);
2320 assert_eq!(idx_a.to_stable_name(), "42");
2322 }
2323
2324 #[mz_ore::test]
2325 #[cfg_attr(miri, ignore)] fn proptest_relation_desc_roundtrips() {
2327 fn testcase(og: RelationDesc) {
2328 let bytes = og.into_proto().encode_to_vec();
2329 let proto = ProtoRelationDesc::decode(&bytes[..]).unwrap();
2330 let rnd = RelationDesc::from_proto(proto).unwrap();
2331
2332 assert_eq!(og, rnd);
2333 }
2334
2335 proptest!(|(desc in any::<RelationDesc>())| {
2336 testcase(desc);
2337 });
2338
2339 let strat = any::<RelationDesc>().prop_flat_map(|desc| {
2340 arb_relation_desc_diff(&desc).prop_map(move |diffs| (desc.clone(), diffs))
2341 });
2342
2343 proptest!(|((mut desc, diffs) in strat)| {
2344 for diff in diffs {
2345 diff.apply(&mut desc);
2346 };
2347 testcase(desc);
2348 });
2349 }
2350}