1use std::collections::{BTreeMap, BTreeSet};
11use std::rc::Rc;
12use std::{fmt, vec};
13
14use anyhow::bail;
15use itertools::Itertools;
16use mz_lowertest::MzReflect;
17use mz_ore::cast::CastFrom;
18use mz_ore::soft_panic_or_log;
19use mz_ore::str::StrExt;
20use mz_ore::{assert_none, assert_ok};
21use mz_persist_types::schema::SchemaId;
22use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
23use proptest::prelude::*;
24use proptest::strategy::{Strategy, Union};
25use proptest_derive::Arbitrary;
26use serde::{Deserialize, Serialize};
27
28use crate::relation_and_scalar::proto_relation_type::ProtoKey;
29pub use crate::relation_and_scalar::{
30 ProtoColumnMetadata, ProtoColumnName, ProtoColumnType, ProtoRelationDesc, ProtoRelationType,
31 ProtoRelationVersion,
32};
33use crate::{Datum, ReprScalarType, Row, SqlScalarType, arb_datum_for_column};
34
35#[derive(
43 Arbitrary,
44 Clone,
45 Debug,
46 Eq,
47 PartialEq,
48 Ord,
49 PartialOrd,
50 Serialize,
51 Deserialize,
52 Hash,
53 MzReflect
54)]
55pub struct SqlColumnType {
56 pub scalar_type: SqlScalarType,
58 #[serde(default = "return_true")]
60 pub nullable: bool,
61}
62
/// Serde default provider for the `nullable` fields in this file.
///
/// Always returns `true`.
#[inline(always)]
fn return_true() -> bool {
    true
}
72
73impl SqlColumnType {
74 pub fn try_union_many<'a>(
78 typs: impl IntoIterator<Item = &'a Self>,
79 ) -> Result<Self, anyhow::Error> {
80 let mut iter = typs.into_iter();
81 let Some(typ) = iter.next() else {
82 bail!("Cannot union empty iterator");
83 };
84 iter.try_fold(typ.clone(), |a, b| a.try_union(b))
85 }
86
87 pub fn union_many<'a>(typs: impl IntoIterator<Item = &'a Self>) -> Self {
92 Self::try_union_many(typs).expect("Cannot union empty iterator")
93 }
94
95 pub fn backport_nullability(&mut self, backport_typ: &ReprColumnType) {
99 self.scalar_type
100 .backport_nullability(&backport_typ.scalar_type);
101 self.nullable = backport_typ.nullable;
102 }
103
104 pub fn sql_union(&self, other: &Self) -> Result<Self, anyhow::Error> {
117 match (&self.scalar_type, &other.scalar_type) {
118 (scalar_type, other_scalar_type) if scalar_type == other_scalar_type => {
119 Ok(SqlColumnType {
120 scalar_type: scalar_type.clone(),
121 nullable: self.nullable || other.nullable,
122 })
123 }
124 (scalar_type, other_scalar_type) if scalar_type.base_eq(other_scalar_type) => {
125 Ok(SqlColumnType {
126 scalar_type: scalar_type.without_modifiers(),
127 nullable: self.nullable || other.nullable,
128 })
129 }
130 (
131 SqlScalarType::Record { fields, custom_id },
132 SqlScalarType::Record {
133 fields: other_fields,
134 custom_id: other_custom_id,
135 },
136 ) => {
137 if custom_id != other_custom_id {
138 bail!(
139 "Can't union types: {:?} and {:?}",
140 self.scalar_type,
141 other.scalar_type
142 );
143 };
144
145 if fields.len() != other_fields.len() {
146 bail!(
147 "Can't union types: {:?} and {:?}",
148 self.scalar_type,
149 other.scalar_type
150 );
151 }
152 let mut union_fields = Vec::with_capacity(fields.len());
153 for ((name, typ), (other_name, other_typ)) in
154 fields.iter().zip_eq(other_fields.iter())
155 {
156 if name != other_name {
157 bail!(
158 "Can't union types: {:?} and {:?}",
159 self.scalar_type,
160 other.scalar_type
161 );
162 } else {
163 let union_column_type = typ.sql_union(other_typ)?;
164 union_fields.push((name.clone(), union_column_type));
165 };
166 }
167
168 Ok(SqlColumnType {
169 scalar_type: SqlScalarType::Record {
170 fields: union_fields.into(),
171 custom_id: *custom_id,
172 },
173 nullable: self.nullable || other.nullable,
174 })
175 }
176 _ => bail!(
177 "Can't union types: {:?} and {:?}",
178 self.scalar_type,
179 other.scalar_type
180 ),
181 }
182 }
183
184 pub fn try_union(&self, other: &Self) -> Result<Self, anyhow::Error> {
194 self.sql_union(other).or_else(|e| {
195 let repr_self = ReprColumnType::from(self);
196 let repr_other = ReprColumnType::from(other);
197 match repr_self.union(&repr_other) {
198 Ok(typ) => {
199 soft_panic_or_log!("repr type error: sql_union({self:?}, {other:?}): {e}");
202 Ok(SqlColumnType::from_repr(&typ))
203 }
204 Err(_) => {
205 Err(e)
208 }
209 }
210 })
211 }
212
213 pub fn union(&self, other: &Self) -> Self {
218 self.try_union(other).unwrap_or_else(|e| {
219 panic!("repr type error: after sql_union({self:?}, {other:?}) error: {e}")
220 })
221 }
222
223 pub fn nullable(mut self, nullable: bool) -> Self {
226 self.nullable = nullable;
227 self
228 }
229}
230
231impl RustType<ProtoColumnType> for SqlColumnType {
232 fn into_proto(&self) -> ProtoColumnType {
233 ProtoColumnType {
234 nullable: self.nullable,
235 scalar_type: Some(self.scalar_type.into_proto()),
236 }
237 }
238
239 fn from_proto(proto: ProtoColumnType) -> Result<Self, TryFromProtoError> {
240 Ok(SqlColumnType {
241 nullable: proto.nullable,
242 scalar_type: proto
243 .scalar_type
244 .into_rust_if_some("ProtoColumnType::scalar_type")?,
245 })
246 }
247}
248
249impl fmt::Display for SqlColumnType {
250 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
251 let nullable = if self.nullable { "Null" } else { "NotNull" };
252 f.write_fmt(format_args!("{:?}:{}", self.scalar_type, nullable))
253 }
254}
255
256#[derive(
258 Arbitrary,
259 Clone,
260 Debug,
261 Eq,
262 PartialEq,
263 Ord,
264 PartialOrd,
265 Serialize,
266 Deserialize,
267 Hash,
268 MzReflect
269)]
270pub struct SqlRelationType {
271 pub column_types: Vec<SqlColumnType>,
273 #[serde(default)]
283 pub keys: Vec<Vec<usize>>,
284}
285
286impl SqlRelationType {
287 pub fn empty() -> Self {
290 SqlRelationType::new(vec![])
291 }
292
293 pub fn new(column_types: Vec<SqlColumnType>) -> Self {
297 SqlRelationType {
298 column_types,
299 keys: Vec::new(),
300 }
301 }
302
303 pub fn with_key(mut self, mut indices: Vec<usize>) -> Self {
305 indices.sort_unstable();
306 if !self.keys.contains(&indices) {
307 self.keys.push(indices);
308 }
309 self
310 }
311
312 pub fn with_keys(mut self, keys: Vec<Vec<usize>>) -> Self {
313 for key in keys {
314 self = self.with_key(key)
315 }
316 self
317 }
318
319 pub fn arity(&self) -> usize {
321 self.column_types.len()
322 }
323
324 pub fn default_key(&self) -> Vec<usize> {
326 if let Some(key) = self.keys.first() {
327 if key.is_empty() {
328 (0..self.column_types.len()).collect()
329 } else {
330 key.clone()
331 }
332 } else {
333 (0..self.column_types.len()).collect()
334 }
335 }
336
337 pub fn columns(&self) -> &[SqlColumnType] {
339 &self.column_types
340 }
341
342 pub fn backport_nullability_and_keys(&mut self, backport_typ: &ReprRelationType) {
346 assert_eq!(
347 backport_typ.column_types.len(),
348 self.column_types.len(),
349 "HIR and MIR types should have the same number of columns"
350 );
351 for (backport_col, sql_col) in backport_typ
352 .column_types
353 .iter()
354 .zip_eq(self.column_types.iter_mut())
355 {
356 sql_col.backport_nullability(backport_col);
357 }
358
359 self.keys = backport_typ.keys.clone();
360 }
361
362 pub fn from_repr(repr: &ReprRelationType) -> Self {
366 SqlRelationType {
367 column_types: repr
368 .column_types
369 .iter()
370 .map(SqlColumnType::from_repr)
371 .collect(),
372 keys: repr.keys.clone(),
373 }
374 }
375}
376
377impl RustType<ProtoRelationType> for SqlRelationType {
378 fn into_proto(&self) -> ProtoRelationType {
379 ProtoRelationType {
380 column_types: self.column_types.into_proto(),
381 keys: self.keys.into_proto(),
382 }
383 }
384
385 fn from_proto(proto: ProtoRelationType) -> Result<Self, TryFromProtoError> {
386 Ok(SqlRelationType {
387 column_types: proto.column_types.into_rust()?,
388 keys: proto.keys.into_rust()?,
389 })
390 }
391}
392
393impl RustType<ProtoKey> for Vec<usize> {
394 fn into_proto(&self) -> ProtoKey {
395 ProtoKey {
396 keys: self.into_proto(),
397 }
398 }
399
400 fn from_proto(proto: ProtoKey) -> Result<Self, TryFromProtoError> {
401 proto.keys.into_rust()
402 }
403}
404
405#[derive(
407 Clone,
408 Debug,
409 Eq,
410 PartialEq,
411 Ord,
412 PartialOrd,
413 Serialize,
414 Deserialize,
415 Hash,
416 MzReflect
417)]
418pub struct ReprRelationType {
419 pub column_types: Vec<ReprColumnType>,
421 #[serde(default)]
431 pub keys: Vec<Vec<usize>>,
432}
433
434impl ReprRelationType {
435 pub fn empty() -> Self {
438 ReprRelationType::new(vec![])
439 }
440
441 pub fn new(column_types: Vec<ReprColumnType>) -> Self {
445 ReprRelationType {
446 column_types,
447 keys: Vec::new(),
448 }
449 }
450
451 pub fn with_key(mut self, mut indices: Vec<usize>) -> Self {
453 indices.sort_unstable();
454 if !self.keys.contains(&indices) {
455 self.keys.push(indices);
456 }
457 self
458 }
459
460 pub fn with_keys(mut self, keys: Vec<Vec<usize>>) -> Self {
461 for key in keys {
462 self = self.with_key(key)
463 }
464 self
465 }
466
467 pub fn arity(&self) -> usize {
469 self.column_types.len()
470 }
471
472 pub fn default_key(&self) -> Vec<usize> {
474 if let Some(key) = self.keys.first() {
475 if key.is_empty() {
476 (0..self.column_types.len()).collect()
477 } else {
478 key.clone()
479 }
480 } else {
481 (0..self.column_types.len()).collect()
482 }
483 }
484
485 pub fn columns(&self) -> &[ReprColumnType] {
487 &self.column_types
488 }
489}
490
491impl From<&SqlRelationType> for ReprRelationType {
492 fn from(sql_relation_type: &SqlRelationType) -> Self {
493 ReprRelationType {
494 column_types: sql_relation_type
495 .column_types
496 .iter()
497 .map(ReprColumnType::from)
498 .collect(),
499 keys: sql_relation_type.keys.clone(),
500 }
501 }
502}
503
504#[derive(
505 Clone,
506 Debug,
507 Eq,
508 PartialEq,
509 Ord,
510 PartialOrd,
511 Serialize,
512 Deserialize,
513 Hash,
514 MzReflect
515)]
516pub struct ReprColumnType {
517 pub scalar_type: ReprScalarType,
519 #[serde(default = "return_true")]
521 pub nullable: bool,
522}
523
524impl std::fmt::Display for ReprColumnType {
525 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
526 write!(f, "{}", self.scalar_type)?;
527 if self.nullable {
528 write!(f, "?")?;
529 }
530 Ok(())
531 }
532}
533
534impl ReprColumnType {
535 pub fn union(&self, col: &ReprColumnType) -> Result<Self, anyhow::Error> {
542 let scalar_type = self.scalar_type.union(&col.scalar_type)?;
543 let nullable = self.nullable || col.nullable;
544
545 Ok(ReprColumnType {
546 scalar_type,
547 nullable,
548 })
549 }
550}
551
552impl From<&SqlColumnType> for ReprColumnType {
553 fn from(sql_column_type: &SqlColumnType) -> Self {
554 let scalar_type = &sql_column_type.scalar_type;
555 let scalar_type = scalar_type.into();
556 let nullable = sql_column_type.nullable;
557
558 ReprColumnType {
559 scalar_type,
560 nullable,
561 }
562 }
563}
564
565impl SqlColumnType {
566 pub fn from_repr(repr: &ReprColumnType) -> Self {
570 let scalar_type = &repr.scalar_type;
571 let scalar_type = SqlScalarType::from_repr(scalar_type);
572 let nullable = repr.nullable;
573
574 SqlColumnType {
575 scalar_type,
576 nullable,
577 }
578 }
579}
580
581#[derive(
583 Clone,
584 Debug,
585 Eq,
586 PartialEq,
587 Ord,
588 PartialOrd,
589 Serialize,
590 Deserialize,
591 Hash,
592 MzReflect
593)]
594pub struct ColumnName(Box<str>);
595
596impl ColumnName {
597 #[inline(always)]
599 pub fn as_str(&self) -> &str {
600 &*self
601 }
602
603 pub fn as_mut_boxed_str(&mut self) -> &mut Box<str> {
605 &mut self.0
606 }
607
608 pub fn is_similar(&self, other: &ColumnName) -> bool {
610 const SIMILARITY_THRESHOLD: f64 = 0.6;
611
612 let a_lowercase = self.to_lowercase();
613 let b_lowercase = other.to_lowercase();
614
615 strsim::normalized_levenshtein(&a_lowercase, &b_lowercase) >= SIMILARITY_THRESHOLD
616 }
617}
618
619impl std::ops::Deref for ColumnName {
620 type Target = str;
621
622 #[inline(always)]
623 fn deref(&self) -> &Self::Target {
624 &self.0
625 }
626}
627
628impl fmt::Display for ColumnName {
629 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
630 f.write_str(&self.0)
631 }
632}
633
634impl From<String> for ColumnName {
635 fn from(s: String) -> ColumnName {
636 ColumnName(s.into())
637 }
638}
639
640impl From<&str> for ColumnName {
641 fn from(s: &str) -> ColumnName {
642 ColumnName(s.into())
643 }
644}
645
646impl From<&ColumnName> for ColumnName {
647 fn from(n: &ColumnName) -> ColumnName {
648 n.clone()
649 }
650}
651
652impl RustType<ProtoColumnName> for ColumnName {
653 fn into_proto(&self) -> ProtoColumnName {
654 ProtoColumnName {
655 value: Some(self.0.to_string()),
656 }
657 }
658
659 fn from_proto(proto: ProtoColumnName) -> Result<Self, TryFromProtoError> {
660 Ok(ColumnName(
661 proto
662 .value
663 .ok_or_else(|| TryFromProtoError::missing_field("ProtoColumnName::value"))?
664 .into(),
665 ))
666 }
667}
668
669impl From<ColumnName> for mz_sql_parser::ast::Ident {
670 fn from(value: ColumnName) -> Self {
671 mz_sql_parser::ast::Ident::new_unchecked(value.0)
673 }
674}
675
676impl proptest::arbitrary::Arbitrary for ColumnName {
677 type Parameters = ();
678 type Strategy = BoxedStrategy<ColumnName>;
679
680 fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy {
681 let mut weights = vec![(50, Just(1..8)), (20, Just(8..16))];
684 if std::env::var("PROPTEST_LARGE_DATA").is_ok() {
685 weights.extend([
686 (5, Just(16..128)),
687 (1, Just(128..1024)),
688 (1, Just(1024..4096)),
689 ]);
690 }
691 let name_length = Union::new_weighted(weights);
692
693 let char_strat = Rc::new(Union::new_weighted(vec![
696 (50, proptest::char::range('A', 'z').boxed()),
697 (1, any::<char>().boxed()),
698 ]));
699
700 name_length
701 .prop_flat_map(move |length| proptest::collection::vec(Rc::clone(&char_strat), length))
702 .prop_map(|chars| ColumnName(chars.into_iter().collect::<Box<str>>()))
703 .no_shrink()
704 .boxed()
705 }
706}
707
/// The placeholder column name `?column?`.
// NOTE(review): presumably used for columns without an explicit name —
// confirm against callers.
pub const UNKNOWN_COLUMN_NAME: &str = "?column?";
710
711#[derive(
713 Clone,
714 Copy,
715 Debug,
716 Eq,
717 PartialEq,
718 PartialOrd,
719 Ord,
720 Serialize,
721 Deserialize,
722 Hash,
723 MzReflect
724)]
725pub struct ColumnIndex(usize);
726
727static_assertions::assert_not_impl_all!(ColumnIndex: Arbitrary);
728
729impl ColumnIndex {
730 pub fn to_stable_name(&self) -> String {
732 self.0.to_string()
733 }
734
735 pub fn to_raw(&self) -> usize {
736 self.0
737 }
738
739 pub fn from_raw(val: usize) -> Self {
740 ColumnIndex(val)
741 }
742}
743
744#[derive(
746 Clone,
747 Copy,
748 Debug,
749 Eq,
750 PartialEq,
751 PartialOrd,
752 Ord,
753 Serialize,
754 Deserialize,
755 Hash,
756 MzReflect,
757 Arbitrary
758)]
759pub struct RelationVersion(u64);
760
761impl RelationVersion {
762 pub fn root() -> Self {
764 RelationVersion(0)
765 }
766
767 pub fn bump(&self) -> Self {
769 let next_version = self
770 .0
771 .checked_add(1)
772 .expect("added more than u64::MAX columns?");
773 RelationVersion(next_version)
774 }
775
776 pub fn into_raw(self) -> u64 {
780 self.0
781 }
782
783 pub fn from_raw(val: u64) -> RelationVersion {
787 RelationVersion(val)
788 }
789}
790
791impl From<RelationVersion> for SchemaId {
792 fn from(value: RelationVersion) -> Self {
793 SchemaId(usize::cast_from(value.0))
794 }
795}
796
797impl From<mz_sql_parser::ast::Version> for RelationVersion {
798 fn from(value: mz_sql_parser::ast::Version) -> Self {
799 RelationVersion(value.into_inner())
800 }
801}
802
803impl fmt::Display for RelationVersion {
804 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
805 write!(f, "v{}", self.0)
806 }
807}
808
809impl From<RelationVersion> for mz_sql_parser::ast::Version {
810 fn from(value: RelationVersion) -> Self {
811 mz_sql_parser::ast::Version::new(value.0)
812 }
813}
814
815impl RustType<ProtoRelationVersion> for RelationVersion {
816 fn into_proto(&self) -> ProtoRelationVersion {
817 ProtoRelationVersion { value: self.0 }
818 }
819
820 fn from_proto(proto: ProtoRelationVersion) -> Result<Self, TryFromProtoError> {
821 Ok(RelationVersion(proto.value))
822 }
823}
824
825#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash, MzReflect)]
827struct ColumnMetadata {
828 name: ColumnName,
830 typ_idx: usize,
832 added: RelationVersion,
834 dropped: Option<RelationVersion>,
836}
837
838#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash, MzReflect)]
906pub struct RelationDesc {
907 typ: SqlRelationType,
908 metadata: BTreeMap<ColumnIndex, ColumnMetadata>,
909}
910
911impl RustType<ProtoRelationDesc> for RelationDesc {
912 fn into_proto(&self) -> ProtoRelationDesc {
913 let (names, metadata): (Vec<_>, Vec<_>) = self
914 .metadata
915 .values()
916 .map(|meta| {
917 let metadata = ProtoColumnMetadata {
918 added: Some(meta.added.into_proto()),
919 dropped: meta.dropped.map(|v| v.into_proto()),
920 };
921 (meta.name.into_proto(), metadata)
922 })
923 .unzip();
924
925 let is_all_default_metadata = metadata.iter().all(|meta| {
931 meta.added == Some(RelationVersion::root().into_proto()) && meta.dropped == None
932 });
933 let metadata = if is_all_default_metadata {
934 Vec::new()
935 } else {
936 metadata
937 };
938
939 ProtoRelationDesc {
940 typ: Some(self.typ.into_proto()),
941 names,
942 metadata,
943 }
944 }
945
946 fn from_proto(proto: ProtoRelationDesc) -> Result<Self, TryFromProtoError> {
947 let proto_metadata: Box<dyn Iterator<Item = _>> = if proto.metadata.is_empty() {
953 let val = ProtoColumnMetadata {
954 added: Some(RelationVersion::root().into_proto()),
955 dropped: None,
956 };
957 Box::new(itertools::repeat_n(val, proto.names.len()))
958 } else {
959 Box::new(proto.metadata.into_iter())
960 };
961
962 let metadata = proto
963 .names
964 .into_iter()
965 .zip_eq(proto_metadata)
966 .enumerate()
967 .map(|(idx, (name, metadata))| {
968 let meta = ColumnMetadata {
969 name: name.into_rust()?,
970 typ_idx: idx,
971 added: metadata.added.into_rust_if_some("ColumnMetadata::added")?,
972 dropped: metadata.dropped.into_rust()?,
973 };
974 Ok::<_, TryFromProtoError>((ColumnIndex(idx), meta))
975 })
976 .collect::<Result<_, _>>()?;
977
978 Ok(RelationDesc {
979 typ: proto.typ.into_rust_if_some("ProtoRelationDesc::typ")?,
980 metadata,
981 })
982 }
983}
984
985impl RelationDesc {
986 pub fn builder() -> RelationDescBuilder {
988 RelationDescBuilder::default()
989 }
990
991 pub fn empty() -> Self {
994 RelationDesc {
995 typ: SqlRelationType::empty(),
996 metadata: BTreeMap::default(),
997 }
998 }
999
1000 pub fn is_empty(&self) -> bool {
1002 self == &Self::empty()
1003 }
1004
1005 pub fn len(&self) -> usize {
1007 self.typ().column_types.len()
1008 }
1009
1010 pub fn new<I, N>(typ: SqlRelationType, names: I) -> Self
1018 where
1019 I: IntoIterator<Item = N>,
1020 N: Into<ColumnName>,
1021 {
1022 let metadata: BTreeMap<_, _> = names
1023 .into_iter()
1024 .enumerate()
1025 .map(|(idx, name)| {
1026 let col_idx = ColumnIndex(idx);
1027 let metadata = ColumnMetadata {
1028 name: name.into(),
1029 typ_idx: idx,
1030 added: RelationVersion::root(),
1031 dropped: None,
1032 };
1033 (col_idx, metadata)
1034 })
1035 .collect();
1036
1037 assert_eq!(typ.column_types.len(), metadata.len());
1039
1040 RelationDesc { typ, metadata }
1041 }
1042
1043 pub fn from_names_and_types<I, T, N>(iter: I) -> Self
1044 where
1045 I: IntoIterator<Item = (N, T)>,
1046 T: Into<SqlColumnType>,
1047 N: Into<ColumnName>,
1048 {
1049 let (names, types): (Vec<_>, Vec<_>) = iter.into_iter().unzip();
1050 let types = types.into_iter().map(Into::into).collect();
1051 let typ = SqlRelationType::new(types);
1052 Self::new(typ, names)
1053 }
1054
1055 pub fn concat(mut self, other: Self) -> Self {
1065 let self_len = self.typ.column_types.len();
1066
1067 for (typ, (_col_idx, meta)) in other
1068 .typ
1069 .column_types
1070 .into_iter()
1071 .zip_eq(other.metadata.into_iter())
1072 {
1073 assert_eq!(meta.added, RelationVersion::root());
1074 assert_none!(meta.dropped);
1075
1076 let new_idx = self.typ.columns().len();
1077 let new_meta = ColumnMetadata {
1078 name: meta.name,
1079 typ_idx: new_idx,
1080 added: RelationVersion::root(),
1081 dropped: None,
1082 };
1083
1084 self.typ.column_types.push(typ);
1085 let prev = self.metadata.insert(ColumnIndex(new_idx), new_meta);
1086
1087 assert_eq!(self.metadata.len(), self.typ.columns().len());
1088 assert_none!(prev);
1089 }
1090
1091 for k in other.typ.keys {
1092 let k = k.into_iter().map(|idx| idx + self_len).collect();
1093 self = self.with_key(k);
1094 }
1095 self
1096 }
1097
1098 pub fn with_key(mut self, indices: Vec<usize>) -> Self {
1100 self.typ = self.typ.with_key(indices);
1101 self
1102 }
1103
1104 pub fn without_keys(mut self) -> Self {
1106 self.typ.keys.clear();
1107 self
1108 }
1109
1110 pub fn with_names<I, N>(self, names: I) -> Self
1118 where
1119 I: IntoIterator<Item = N>,
1120 N: Into<ColumnName>,
1121 {
1122 Self::new(self.typ, names)
1123 }
1124
1125 pub fn arity(&self) -> usize {
1127 self.typ.arity()
1128 }
1129
1130 pub fn typ(&self) -> &SqlRelationType {
1132 &self.typ
1133 }
1134
1135 pub fn into_typ(self) -> SqlRelationType {
1137 self.typ
1138 }
1139
1140 pub fn iter(&self) -> impl Iterator<Item = (&ColumnName, &SqlColumnType)> {
1142 self.metadata.values().map(|meta| {
1143 let typ = &self.typ.columns()[meta.typ_idx];
1144 (&meta.name, typ)
1145 })
1146 }
1147
1148 pub fn iter_types(&self) -> impl Iterator<Item = &SqlColumnType> {
1150 self.typ.column_types.iter()
1151 }
1152
1153 pub fn iter_names(&self) -> impl Iterator<Item = &ColumnName> {
1155 self.metadata.values().map(|meta| &meta.name)
1156 }
1157
1158 pub fn iter_all(&self) -> impl Iterator<Item = (&ColumnIndex, &ColumnName, &SqlColumnType)> {
1160 self.metadata.iter().map(|(col_idx, metadata)| {
1161 let col_typ = &self.typ.columns()[metadata.typ_idx];
1162 (col_idx, &metadata.name, col_typ)
1163 })
1164 }
1165
1166 pub fn iter_similar_names<'a>(
1169 &'a self,
1170 name: &'a ColumnName,
1171 ) -> impl Iterator<Item = &'a ColumnName> {
1172 self.iter_names().filter(|n| n.is_similar(name))
1173 }
1174
1175 pub fn contains_index(&self, idx: &ColumnIndex) -> bool {
1177 self.metadata.contains_key(idx)
1178 }
1179
1180 pub fn get_by_name(&self, name: &ColumnName) -> Option<(usize, &SqlColumnType)> {
1186 self.iter_names()
1187 .position(|n| n == name)
1188 .map(|i| (i, &self.typ.column_types[i]))
1189 }
1190
1191 pub fn get_name(&self, i: usize) -> &ColumnName {
1199 self.get_name_idx(&ColumnIndex(i))
1201 }
1202
1203 pub fn get_name_idx(&self, idx: &ColumnIndex) -> &ColumnName {
1209 &self.metadata.get(idx).expect("should exist").name
1210 }
1211
1212 pub fn get_name_mut(&mut self, i: usize) -> &mut ColumnName {
1218 &mut self
1220 .metadata
1221 .get_mut(&ColumnIndex(i))
1222 .expect("should exist")
1223 .name
1224 }
1225
1226 pub fn get_type(&self, idx: &ColumnIndex) -> &SqlColumnType {
1232 let typ_idx = self.metadata.get(idx).expect("should exist").typ_idx;
1233 &self.typ.column_types[typ_idx]
1234 }
1235
1236 pub fn get_unambiguous_name(&self, i: usize) -> Option<&ColumnName> {
1245 let name = self.get_name(i);
1246 if self.iter_names().filter(|n| *n == name).count() == 1 {
1247 Some(name)
1248 } else {
1249 None
1250 }
1251 }
1252
1253 pub fn constraints_met(&self, i: usize, d: &Datum) -> Result<(), NotNullViolation> {
1258 let name = self.get_name(i);
1259 let typ = &self.typ.column_types[i];
1260 if d == &Datum::Null && !typ.nullable {
1261 Err(NotNullViolation(name.clone()))
1262 } else {
1263 Ok(())
1264 }
1265 }
1266
1267 pub fn diff(&self, other: &RelationDesc) -> RelationDescDiff {
1282 assert_eq!(self.metadata.len(), self.typ.columns().len());
1283 assert_eq!(other.metadata.len(), other.typ.columns().len());
1284 for (idx, meta) in self.metadata.iter().chain(other.metadata.iter()) {
1285 assert_eq!(meta.typ_idx, idx.0);
1286 assert_eq!(meta.added, RelationVersion::root());
1287 assert_none!(meta.dropped);
1288 }
1289
1290 let mut column_diffs = BTreeMap::new();
1291 let mut key_diff = None;
1292
1293 let left_arity = self.arity();
1294 let right_arity = other.arity();
1295 let common_arity = std::cmp::min(left_arity, right_arity);
1296
1297 for idx in 0..common_arity {
1298 let left_name = self.get_name(idx);
1299 let right_name = other.get_name(idx);
1300 let left_type = &self.typ.column_types[idx];
1301 let right_type = &other.typ.column_types[idx];
1302
1303 if left_name != right_name {
1304 let diff = ColumnDiff::NameMismatch {
1305 left: left_name.clone(),
1306 right: right_name.clone(),
1307 };
1308 column_diffs.insert(idx, diff);
1309 } else if left_type.scalar_type != right_type.scalar_type {
1310 let diff = ColumnDiff::TypeMismatch {
1311 name: left_name.clone(),
1312 left: left_type.scalar_type.clone(),
1313 right: right_type.scalar_type.clone(),
1314 };
1315 column_diffs.insert(idx, diff);
1316 } else if left_type.nullable != right_type.nullable {
1317 let diff = ColumnDiff::NullabilityMismatch {
1318 name: left_name.clone(),
1319 left: left_type.nullable,
1320 right: right_type.nullable,
1321 };
1322 column_diffs.insert(idx, diff);
1323 }
1324 }
1325
1326 for idx in common_arity..left_arity {
1327 let diff = ColumnDiff::Missing {
1328 name: self.get_name(idx).clone(),
1329 };
1330 column_diffs.insert(idx, diff);
1331 }
1332
1333 for idx in common_arity..right_arity {
1334 let diff = ColumnDiff::Extra {
1335 name: other.get_name(idx).clone(),
1336 };
1337 column_diffs.insert(idx, diff);
1338 }
1339
1340 let left_keys: BTreeSet<_> = self.typ.keys.iter().collect();
1341 let right_keys: BTreeSet<_> = other.typ.keys.iter().collect();
1342 if left_keys != right_keys {
1343 let column_names = |desc: &RelationDesc, keys: BTreeSet<&Vec<usize>>| {
1344 keys.iter()
1345 .map(|key| key.iter().map(|&idx| desc.get_name(idx).clone()).collect())
1346 .collect()
1347 };
1348 key_diff = Some(KeyDiff {
1349 left: column_names(self, left_keys),
1350 right: column_names(other, right_keys),
1351 });
1352 }
1353
1354 RelationDescDiff {
1355 column_diffs,
1356 key_diff,
1357 }
1358 }
1359
1360 pub fn apply_demand(&self, demands: &BTreeSet<usize>) -> RelationDesc {
1362 let mut new_desc = self.clone();
1363
1364 let mut removed = 0;
1366 new_desc.metadata.retain(|idx, metadata| {
1367 let retain = demands.contains(&idx.0);
1368 if !retain {
1369 removed += 1;
1370 } else {
1371 metadata.typ_idx -= removed;
1372 }
1373 retain
1374 });
1375
1376 let mut idx = 0;
1378 new_desc.typ.column_types.retain(|_| {
1379 let keep = demands.contains(&idx);
1380 idx += 1;
1381 keep
1382 });
1383
1384 new_desc
1385 }
1386}
1387
1388impl Arbitrary for RelationDesc {
1389 type Parameters = ();
1390 type Strategy = BoxedStrategy<RelationDesc>;
1391
1392 fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy {
1393 let mut weights = vec![(100, Just(0..4)), (50, Just(4..8)), (25, Just(8..16))];
1394 if std::env::var("PROPTEST_LARGE_DATA").is_ok() {
1395 weights.extend([
1396 (12, Just(16..32)),
1397 (6, Just(32..64)),
1398 (3, Just(64..128)),
1399 (1, Just(128..256)),
1400 ]);
1401 }
1402 let num_columns = Union::new_weighted(weights);
1403
1404 num_columns.prop_flat_map(arb_relation_desc).boxed()
1405 }
1406}
1407
1408pub fn arb_relation_desc(num_cols: std::ops::Range<usize>) -> impl Strategy<Value = RelationDesc> {
1411 proptest::collection::btree_map(any::<ColumnName>(), any::<SqlColumnType>(), num_cols)
1412 .prop_map(RelationDesc::from_names_and_types)
1413}
1414
1415pub fn arb_relation_desc_projection(desc: RelationDesc) -> impl Strategy<Value = RelationDesc> {
1417 let mask: Vec<_> = (0..desc.len()).map(|_| any::<bool>()).collect();
1418 mask.prop_map(move |mask| {
1419 let demands: BTreeSet<_> = mask
1420 .into_iter()
1421 .enumerate()
1422 .filter_map(|(idx, keep)| keep.then_some(idx))
1423 .collect();
1424 desc.apply_demand(&demands)
1425 })
1426}
1427
1428impl IntoIterator for RelationDesc {
1429 type Item = (ColumnName, SqlColumnType);
1430 type IntoIter = Box<dyn Iterator<Item = (ColumnName, SqlColumnType)>>;
1431
1432 fn into_iter(self) -> Self::IntoIter {
1433 let iter = self
1434 .metadata
1435 .into_values()
1436 .zip_eq(self.typ.column_types)
1437 .map(|(meta, typ)| (meta.name, typ));
1438 Box::new(iter)
1439 }
1440}
1441
1442pub fn arb_row_for_relation(desc: &RelationDesc) -> impl Strategy<Value = Row> + use<> {
1444 let datums: Vec<_> = desc
1445 .typ()
1446 .columns()
1447 .iter()
1448 .cloned()
1449 .map(arb_datum_for_column)
1450 .collect();
1451 datums.prop_map(|x| Row::pack(x.iter().map(Datum::from)))
1452}
1453
1454#[derive(Debug, PartialEq, Eq)]
1456pub struct NotNullViolation(pub ColumnName);
1457
1458impl fmt::Display for NotNullViolation {
1459 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1460 write!(
1461 f,
1462 "null value in column {} violates not-null constraint",
1463 self.0.quoted()
1464 )
1465 }
1466}
1467
1468#[derive(Debug, Clone, PartialEq, Eq)]
1470pub struct RelationDescDiff {
1471 pub column_diffs: BTreeMap<usize, ColumnDiff>,
1473 pub key_diff: Option<KeyDiff>,
1475}
1476
1477impl RelationDescDiff {
1478 pub fn is_empty(&self) -> bool {
1480 self.column_diffs.is_empty() && self.key_diff.is_none()
1481 }
1482}
1483
1484#[derive(Debug, Clone, PartialEq, Eq)]
1486pub enum ColumnDiff {
1487 Missing { name: ColumnName },
1489 Extra { name: ColumnName },
1491 TypeMismatch {
1493 name: ColumnName,
1494 left: SqlScalarType,
1495 right: SqlScalarType,
1496 },
1497 NullabilityMismatch {
1499 name: ColumnName,
1500 left: bool,
1501 right: bool,
1502 },
1503 NameMismatch { left: ColumnName, right: ColumnName },
1505}
1506
1507#[derive(Debug, Clone, PartialEq, Eq)]
1509pub struct KeyDiff {
1510 pub left: BTreeSet<Vec<ColumnName>>,
1512 pub right: BTreeSet<Vec<ColumnName>>,
1514}
1515
1516#[derive(Clone, Default, Debug, PartialEq, Eq)]
1518pub struct RelationDescBuilder {
1519 columns: Vec<(ColumnName, SqlColumnType)>,
1521 keys: Vec<Vec<usize>>,
1523}
1524
1525impl RelationDescBuilder {
1526 pub fn with_column<N: Into<ColumnName>>(
1528 mut self,
1529 name: N,
1530 ty: SqlColumnType,
1531 ) -> RelationDescBuilder {
1532 let name = name.into();
1533 self.columns.push((name, ty));
1534 self
1535 }
1536
1537 pub fn with_columns<I, T, N>(mut self, iter: I) -> Self
1539 where
1540 I: IntoIterator<Item = (N, T)>,
1541 T: Into<SqlColumnType>,
1542 N: Into<ColumnName>,
1543 {
1544 self.columns
1545 .extend(iter.into_iter().map(|(name, ty)| (name.into(), ty.into())));
1546 self
1547 }
1548
1549 pub fn with_key(mut self, mut indices: Vec<usize>) -> RelationDescBuilder {
1551 indices.sort_unstable();
1552 if !self.keys.contains(&indices) {
1553 self.keys.push(indices);
1554 }
1555 self
1556 }
1557
1558 pub fn without_keys(mut self) -> RelationDescBuilder {
1560 self.keys.clear();
1561 assert_eq!(self.keys.len(), 0);
1562 self
1563 }
1564
1565 pub fn concat(mut self, other: Self) -> Self {
1567 let self_len = self.columns.len();
1568
1569 self.columns.extend(other.columns);
1570 for k in other.keys {
1571 let k = k.into_iter().map(|idx| idx + self_len).collect();
1572 self = self.with_key(k);
1573 }
1574
1575 self
1576 }
1577
1578 pub fn finish(self) -> RelationDesc {
1580 let mut desc = RelationDesc::from_names_and_types(self.columns);
1581 desc.typ = desc.typ.with_keys(self.keys);
1582 desc
1583 }
1584}
1585
1586#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord, Serialize)]
1588pub enum RelationVersionSelector {
1589 Specific(RelationVersion),
1590 Latest,
1591}
1592
1593impl RelationVersionSelector {
1594 pub fn specific(version: u64) -> Self {
1595 RelationVersionSelector::Specific(RelationVersion(version))
1596 }
1597}
1598
1599#[derive(Debug, Clone, Serialize)]
1605pub struct VersionedRelationDesc {
1606 inner: RelationDesc,
1607}
1608
1609impl VersionedRelationDesc {
1610 pub fn new(inner: RelationDesc) -> Self {
1611 VersionedRelationDesc { inner }
1612 }
1613
1614 #[must_use]
1622 pub fn add_column<N, T>(&mut self, name: N, typ: T) -> RelationVersion
1623 where
1624 N: Into<ColumnName>,
1625 T: Into<SqlColumnType>,
1626 {
1627 let latest_version = self.latest_version();
1628 let new_version = latest_version.bump();
1629
1630 let name = name.into();
1631 let existing = self
1632 .inner
1633 .metadata
1634 .iter()
1635 .find(|(_, meta)| meta.name == name && meta.dropped.is_none());
1636 if let Some(existing) = existing {
1637 panic!("column named '{name}' already exists! {existing:?}");
1638 }
1639
1640 let next_idx = self.inner.metadata.len();
1641 let col_meta = ColumnMetadata {
1642 name,
1643 typ_idx: next_idx,
1644 added: new_version,
1645 dropped: None,
1646 };
1647
1648 self.inner.typ.column_types.push(typ.into());
1649 let prev = self.inner.metadata.insert(ColumnIndex(next_idx), col_meta);
1650
1651 assert_none!(prev, "column index overlap!");
1652 self.validate();
1653
1654 new_version
1655 }
1656
1657 #[must_use]
1666 pub fn drop_column<N>(&mut self, name: N) -> RelationVersion
1667 where
1668 N: Into<ColumnName>,
1669 {
1670 let name = name.into();
1671 let latest_version = self.latest_version();
1672 let new_version = latest_version.bump();
1673
1674 let col = self
1675 .inner
1676 .metadata
1677 .values_mut()
1678 .find(|meta| meta.name == name && meta.dropped.is_none())
1679 .expect("column to exist");
1680
1681 assert_none!(col.dropped, "column was already dropped");
1683 col.dropped = Some(new_version);
1684
1685 let dropped_key = self
1687 .inner
1688 .typ
1689 .keys
1690 .iter()
1691 .any(|keys| keys.contains(&col.typ_idx));
1692 assert!(!dropped_key, "column being dropped was used as a key");
1693
1694 self.validate();
1695 new_version
1696 }
1697
1698 pub fn latest(&self) -> RelationDesc {
1700 self.inner.clone()
1701 }
1702
1703 pub fn at_version(&self, version: RelationVersionSelector) -> RelationDesc {
1705 let up_to_version = match version {
1707 RelationVersionSelector::Latest => RelationVersion(u64::MAX),
1708 RelationVersionSelector::Specific(v) => v,
1709 };
1710
1711 let valid_columns = self.inner.metadata.iter().filter(|(_col_idx, meta)| {
1712 let added = meta.added <= up_to_version;
1713 let dropped = meta
1714 .dropped
1715 .map(|dropped_at| up_to_version >= dropped_at)
1716 .unwrap_or(false);
1717
1718 added && !dropped
1719 });
1720
1721 let mut column_types = Vec::new();
1722 let mut column_metas = BTreeMap::new();
1723
1724 for (col_idx, meta) in valid_columns {
1731 let new_meta = ColumnMetadata {
1732 name: meta.name.clone(),
1733 typ_idx: column_types.len(),
1734 added: meta.added.clone(),
1735 dropped: meta.dropped.clone(),
1736 };
1737 column_types.push(self.inner.typ.columns()[meta.typ_idx].clone());
1738 column_metas.insert(*col_idx, new_meta);
1739 }
1740
1741 let keys = self
1747 .inner
1748 .typ
1749 .keys
1750 .iter()
1751 .map(|keys| {
1752 keys.iter()
1753 .map(|key_idx| {
1754 let metadata = column_metas
1755 .get(&ColumnIndex(*key_idx))
1756 .expect("found key for column that doesn't exist");
1757 metadata.typ_idx
1758 })
1759 .collect()
1760 })
1761 .collect();
1762
1763 let relation_type = SqlRelationType { column_types, keys };
1764
1765 RelationDesc {
1766 typ: relation_type,
1767 metadata: column_metas,
1768 }
1769 }
1770
1771 pub fn latest_version(&self) -> RelationVersion {
1772 self.inner
1773 .metadata
1774 .values()
1775 .map(|meta| meta.dropped.unwrap_or(meta.added))
1777 .max()
1778 .unwrap_or_else(RelationVersion::root)
1780 }
1781
1782 fn validate(&self) {
1788 fn validate_inner(desc: &RelationDesc) -> Result<(), anyhow::Error> {
1789 if desc.typ.column_types.len() != desc.metadata.len() {
1790 anyhow::bail!("mismatch between number of types and metadatas");
1791 }
1792
1793 for (col_idx, meta) in &desc.metadata {
1794 if col_idx.0 > desc.metadata.len() {
1795 anyhow::bail!("column index out of bounds");
1796 }
1797 if meta.added >= meta.dropped.unwrap_or(RelationVersion(u64::MAX)) {
1798 anyhow::bail!("column was added after it was dropped?");
1799 }
1800 if desc.typ().columns().get(meta.typ_idx).is_none() {
1801 anyhow::bail!("typ_idx incorrect");
1802 }
1803 }
1804
1805 for keys in &desc.typ.keys {
1806 for key in keys {
1807 if *key >= desc.typ.column_types.len() {
1808 anyhow::bail!("key index was out of bounds!");
1809 }
1810 }
1811 }
1812
1813 let versions = desc
1814 .metadata
1815 .values()
1816 .map(|meta| meta.dropped.unwrap_or(meta.added));
1817 let mut max = 0;
1818 let mut sum = 0;
1819 for version in versions {
1820 max = std::cmp::max(max, version.0);
1821 sum += version.0;
1822 }
1823
1824 if sum != (max * (max + 1) / 2) {
1834 anyhow::bail!("there is a duplicate or missing relation version");
1835 }
1836
1837 Ok(())
1838 }
1839
1840 assert_ok!(validate_inner(&self.inner), "validate failed! {self:?}");
1841 }
1842}
1843
/// A single change that can be applied to a [`RelationDesc`] with
/// `PropRelationDescDiff::apply`; generated by `arb_relation_desc_diff`.
#[derive(Debug)]
pub enum PropRelationDescDiff {
    /// Append a new column with the given name and type.
    AddColumn {
        name: ColumnName,
        typ: SqlColumnType,
    },
    /// Mark the column with the given name as dropped.
    DropColumn {
        name: ColumnName,
    },
    /// Flip the `nullable` flag of the column with the given name.
    ToggleNullability {
        name: ColumnName,
    },
    /// Replace the type of the column with the given name.
    ChangeType {
        name: ColumnName,
        typ: SqlColumnType,
    },
}
1863
1864impl PropRelationDescDiff {
1865 pub fn apply(self, desc: &mut RelationDesc) {
1866 match self {
1867 PropRelationDescDiff::AddColumn { name, typ } => {
1868 let new_idx = desc.metadata.len();
1869 let meta = ColumnMetadata {
1870 name,
1871 typ_idx: new_idx,
1872 added: RelationVersion(0),
1873 dropped: None,
1874 };
1875 let prev = desc.metadata.insert(ColumnIndex(new_idx), meta);
1876 desc.typ.column_types.push(typ);
1877
1878 assert_none!(prev);
1879 assert_eq!(desc.metadata.len(), desc.typ.column_types.len());
1880 }
1881 PropRelationDescDiff::DropColumn { name } => {
1882 let next_version = desc
1883 .metadata
1884 .values()
1885 .map(|meta| meta.dropped.unwrap_or(meta.added))
1886 .max()
1887 .unwrap_or_else(RelationVersion::root)
1888 .bump();
1889 let Some(metadata) = desc.metadata.values_mut().find(|meta| meta.name == name)
1890 else {
1891 return;
1892 };
1893 if metadata.dropped.is_none() {
1894 metadata.dropped = Some(next_version);
1895 }
1896 }
1897 PropRelationDescDiff::ToggleNullability { name } => {
1898 let Some((pos, _)) = desc.get_by_name(&name) else {
1899 return;
1900 };
1901 let col_type = desc
1902 .typ
1903 .column_types
1904 .get_mut(pos)
1905 .expect("ColumnNames and SqlColumnTypes out of sync!");
1906 col_type.nullable = !col_type.nullable;
1907 }
1908 PropRelationDescDiff::ChangeType { name, typ } => {
1909 let Some((pos, _)) = desc.get_by_name(&name) else {
1910 return;
1911 };
1912 let col_type = desc
1913 .typ
1914 .column_types
1915 .get_mut(pos)
1916 .expect("ColumnNames and SqlColumnTypes out of sync!");
1917 *col_type = typ;
1918 }
1919 }
1920 }
1921}
1922
1923pub fn arb_relation_desc_diff(
1925 source: &RelationDesc,
1926) -> impl Strategy<Value = Vec<PropRelationDescDiff>> + use<> {
1927 let source = Rc::new(source.clone());
1928 let num_source_columns = source.typ.columns().len();
1929
1930 let num_add_columns = Union::new_weighted(vec![(100, Just(0..8)), (1, Just(8..64))]);
1931 let add_columns_strat = num_add_columns
1932 .prop_flat_map(|num_columns| {
1933 proptest::collection::vec((any::<ColumnName>(), any::<SqlColumnType>()), num_columns)
1934 })
1935 .prop_map(|cols| {
1936 cols.into_iter()
1937 .map(|(name, typ)| PropRelationDescDiff::AddColumn { name, typ })
1938 .collect::<Vec<_>>()
1939 });
1940
1941 if num_source_columns == 0 {
1943 return add_columns_strat.boxed();
1944 }
1945
1946 let source_ = Rc::clone(&source);
1947 let drop_columns_strat = (0..num_source_columns).prop_perturb(move |num_columns, mut rng| {
1948 let mut set = BTreeSet::default();
1949 for _ in 0..num_columns {
1950 let col_idx = rng.random_range(0..num_source_columns);
1951 set.insert(source_.get_name(col_idx).clone());
1952 }
1953 set.into_iter()
1954 .map(|name| PropRelationDescDiff::DropColumn { name })
1955 .collect::<Vec<_>>()
1956 });
1957
1958 let source_ = Rc::clone(&source);
1959 let toggle_nullability_strat =
1960 (0..num_source_columns).prop_perturb(move |num_columns, mut rng| {
1961 let mut set = BTreeSet::default();
1962 for _ in 0..num_columns {
1963 let col_idx = rng.random_range(0..num_source_columns);
1964 set.insert(source_.get_name(col_idx).clone());
1965 }
1966 set.into_iter()
1967 .map(|name| PropRelationDescDiff::ToggleNullability { name })
1968 .collect::<Vec<_>>()
1969 });
1970
1971 let source_ = Rc::clone(&source);
1972 let change_type_strat = (0..num_source_columns)
1973 .prop_perturb(move |num_columns, mut rng| {
1974 let mut set = BTreeSet::default();
1975 for _ in 0..num_columns {
1976 let col_idx = rng.random_range(0..num_source_columns);
1977 set.insert(source_.get_name(col_idx).clone());
1978 }
1979 set
1980 })
1981 .prop_flat_map(|cols| {
1982 proptest::collection::vec(any::<SqlColumnType>(), cols.len())
1983 .prop_map(move |types| (cols.clone(), types))
1984 })
1985 .prop_map(|(cols, types)| {
1986 cols.into_iter()
1987 .zip_eq(types)
1988 .map(|(name, typ)| PropRelationDescDiff::ChangeType { name, typ })
1989 .collect::<Vec<_>>()
1990 });
1991
1992 (
1993 add_columns_strat,
1994 drop_columns_strat,
1995 toggle_nullability_strat,
1996 change_type_strat,
1997 )
1998 .prop_map(|(adds, drops, toggles, changes)| {
1999 adds.into_iter()
2000 .chain(drops)
2001 .chain(toggles)
2002 .chain(changes)
2003 .collect::<Vec<_>>()
2004 })
2005 .prop_shuffle()
2006 .boxed()
2007}
2008
2009#[cfg(test)]
2010mod tests {
2011 use super::*;
2012 use prost::Message;
2013
/// Walks a [`VersionedRelationDesc`] through one column addition and one
/// column drop, checking what `at_version` returns after each step.
#[mz_ore::test]
#[cfg_attr(miri, ignore)]
fn smoktest_at_version() {
    let desc = RelationDesc::builder()
        .with_column("a", SqlScalarType::Bool.nullable(true))
        .with_column("z", SqlScalarType::String.nullable(false))
        .finish();

    let mut versioned_desc = VersionedRelationDesc {
        inner: desc.clone(),
    };
    versioned_desc.validate();

    // Nothing has been added or dropped yet, so every selector resolves to
    // the original description, including a version that does not exist yet.
    let latest = versioned_desc.at_version(RelationVersionSelector::Latest);
    assert_eq!(desc, latest);

    let v0 = versioned_desc.at_version(RelationVersionSelector::specific(0));
    assert_eq!(desc, v0);

    let v3 = versioned_desc.at_version(RelationVersionSelector::specific(3));
    assert_eq!(desc, v3);

    // Adding a column produces version 1.
    let v1 = versioned_desc.add_column("b", SqlScalarType::Bytes.nullable(false));
    assert_eq!(v1, RelationVersion(1));

    let v1 = versioned_desc.at_version(RelationVersionSelector::Specific(v1));
    insta::assert_json_snapshot!(v1.metadata, @r###"
    {
      "0": {
        "name": "a",
        "typ_idx": 0,
        "added": 0,
        "dropped": null
      },
      "1": {
        "name": "z",
        "typ_idx": 1,
        "added": 0,
        "dropped": null
      },
      "2": {
        "name": "b",
        "typ_idx": 2,
        "added": 1,
        "dropped": null
      }
    }
    "###);

    // Version 0 must still yield the same columns as before the addition.
    let v0_b = versioned_desc.at_version(RelationVersionSelector::specific(0));
    assert!(v0.iter().eq(v0_b.iter()));

    // Dropping a column produces version 2.
    let v2 = versioned_desc.drop_column("z");
    assert_eq!(v2, RelationVersion(2));

    let v2 = versioned_desc.at_version(RelationVersionSelector::Specific(v2));
    insta::assert_json_snapshot!(v2.metadata, @r###"
    {
      "0": {
        "name": "a",
        "typ_idx": 0,
        "added": 0,
        "dropped": null
      },
      "2": {
        "name": "b",
        "typ_idx": 1,
        "added": 1,
        "dropped": null
      }
    }
    "###);

    // Earlier versions must be unaffected by the drop.
    let v0_c = versioned_desc.at_version(RelationVersionSelector::specific(0));
    assert!(v0.iter().eq(v0_c.iter()));

    let v1_b = versioned_desc.at_version(RelationVersionSelector::specific(1));
    assert!(v1.iter().eq(v1_b.iter()));

    // The wrapped description keeps every column and records the drop.
    insta::assert_json_snapshot!(versioned_desc.inner.metadata, @r###"
    {
      "0": {
        "name": "a",
        "typ_idx": 0,
        "added": 0,
        "dropped": null
      },
      "1": {
        "name": "z",
        "typ_idx": 1,
        "added": 0,
        "dropped": 2
      },
      "2": {
        "name": "b",
        "typ_idx": 2,
        "added": 1,
        "dropped": null
      }
    }
    "###);
}
2118
/// Dropping a column that precedes a key column must shift the key index in
/// later versions while leaving earlier versions untouched.
#[mz_ore::test]
#[cfg_attr(miri, ignore)]
fn test_dropping_columns_with_keys() {
    let desc = RelationDesc::builder()
        .with_column("a", SqlScalarType::Bool.nullable(true))
        .with_column("z", SqlScalarType::String.nullable(false))
        .with_key(vec![1])
        .finish();

    let mut versioned_desc = VersionedRelationDesc {
        inner: desc.clone(),
    };
    versioned_desc.validate();

    let v1 = versioned_desc.drop_column("a");
    assert_eq!(v1, RelationVersion(1));

    // With "a" gone, "z" is the only column, so the key now points at 0.
    let v1 = versioned_desc.at_version(RelationVersionSelector::Specific(v1));
    insta::assert_json_snapshot!(v1, @r###"
    {
      "typ": {
        "column_types": [
          {
            "scalar_type": "String",
            "nullable": false
          }
        ],
        "keys": [
          [
            0
          ]
        ]
      },
      "metadata": {
        "1": {
          "name": "z",
          "typ_idx": 0,
          "added": 0,
          "dropped": null
        }
      }
    }
    "###);

    // At version 0 both columns are present and the key still points at 1.
    let v0 = versioned_desc.at_version(RelationVersionSelector::specific(0));
    insta::assert_json_snapshot!(v0, @r###"
    {
      "typ": {
        "column_types": [
          {
            "scalar_type": "Bool",
            "nullable": true
          },
          {
            "scalar_type": "String",
            "nullable": false
          }
        ],
        "keys": [
          [
            1
          ]
        ]
      },
      "metadata": {
        "0": {
          "name": "a",
          "typ_idx": 0,
          "added": 0,
          "dropped": 1
        },
        "1": {
          "name": "z",
          "typ_idx": 1,
          "added": 0,
          "dropped": null
        }
      }
    }
    "###);
}
2202
/// Decoding a [`ProtoRelationDesc`] that carries names but an empty
/// `metadata` list must still produce per-column metadata.
#[mz_ore::test]
#[cfg_attr(miri, ignore)]
fn roundtrip_relation_desc_without_metadata() {
    let typ = ProtoRelationType {
        column_types: vec![
            SqlScalarType::String.nullable(false).into_proto(),
            SqlScalarType::Bool.nullable(true).into_proto(),
        ],
        keys: vec![],
    };
    let proto = ProtoRelationDesc {
        typ: Some(typ),
        names: vec![
            ColumnName("a".into()).into_proto(),
            ColumnName("b".into()).into_proto(),
        ],
        // Deliberately left empty.
        metadata: vec![],
    };
    let desc: RelationDesc = proto.into_rust().unwrap();

    // Each column gets metadata with version 0 and no drop recorded.
    insta::assert_json_snapshot!(desc, @r###"
    {
      "typ": {
        "column_types": [
          {
            "scalar_type": "String",
            "nullable": false
          },
          {
            "scalar_type": "Bool",
            "nullable": true
          }
        ],
        "keys": []
      },
      "metadata": {
        "0": {
          "name": "a",
          "typ_idx": 0,
          "added": 0,
          "dropped": null
        },
        "1": {
          "name": "b",
          "typ_idx": 1,
          "added": 0,
          "dropped": null
        }
      }
    }
    "###);
}
2255
2256 #[mz_ore::test]
2257 #[should_panic(expected = "column named 'a' already exists!")]
2258 fn test_add_column_with_same_name_panics() {
2259 let desc = RelationDesc::builder()
2260 .with_column("a", SqlScalarType::Bool.nullable(true))
2261 .finish();
2262 let mut versioned = VersionedRelationDesc::new(desc);
2263
2264 let _ = versioned.add_column("a", SqlScalarType::String.nullable(false));
2265 }
2266
/// A column name may be reused once the previous column with that name has
/// been dropped.
#[mz_ore::test]
#[cfg_attr(miri, ignore)]
fn test_add_column_with_same_name_prev_dropped() {
    let desc = RelationDesc::builder()
        .with_column("a", SqlScalarType::Bool.nullable(true))
        .finish();
    let mut versioned = VersionedRelationDesc::new(desc);

    // After dropping the only column the relation is empty.
    let v1 = versioned.drop_column("a");
    let v1 = versioned.at_version(RelationVersionSelector::Specific(v1));
    insta::assert_json_snapshot!(v1, @r###"
    {
      "typ": {
        "column_types": [],
        "keys": []
      },
      "metadata": {}
    }
    "###);

    // Re-adding "a" yields a fresh column under a new column index.
    let v2 = versioned.add_column("a", SqlScalarType::String.nullable(false));
    let v2 = versioned.at_version(RelationVersionSelector::Specific(v2));
    insta::assert_json_snapshot!(v2, @r###"
    {
      "typ": {
        "column_types": [
          {
            "scalar_type": "String",
            "nullable": false
          }
        ],
        "keys": []
      },
      "metadata": {
        "1": {
          "name": "a",
          "typ_idx": 0,
          "added": 2,
          "dropped": null
        }
      }
    }
    "###);
}
2311
2312 #[mz_ore::test]
2313 #[cfg_attr(miri, ignore)]
2314 fn apply_demand() {
2315 let desc = RelationDesc::builder()
2316 .with_column("a", SqlScalarType::String.nullable(true))
2317 .with_column("b", SqlScalarType::Int64.nullable(false))
2318 .with_column("c", SqlScalarType::Time.nullable(false))
2319 .finish();
2320 let desc = desc.apply_demand(&BTreeSet::from([0, 2]));
2321 assert_eq!(desc.arity(), 2);
2322 VersionedRelationDesc::new(desc).validate();
2324 }
2325
2326 #[mz_ore::test]
2327 #[cfg_attr(miri, ignore)]
2328 fn smoketest_column_index_stable_ident() {
2329 let idx_a = ColumnIndex(42);
2330 assert_eq!(idx_a.to_stable_name(), "42");
2332 }
2333
2334 #[mz_ore::test]
2335 #[cfg_attr(miri, ignore)] fn proptest_relation_desc_roundtrips() {
2337 fn testcase(og: RelationDesc) {
2338 let bytes = og.into_proto().encode_to_vec();
2339 let proto = ProtoRelationDesc::decode(&bytes[..]).unwrap();
2340 let rnd = RelationDesc::from_proto(proto).unwrap();
2341
2342 assert_eq!(og, rnd);
2343 }
2344
2345 proptest!(|(desc in any::<RelationDesc>())| {
2346 testcase(desc);
2347 });
2348
2349 let strat = any::<RelationDesc>().prop_flat_map(|desc| {
2350 arb_relation_desc_diff(&desc).prop_map(move |diffs| (desc.clone(), diffs))
2351 });
2352
2353 proptest!(|((mut desc, diffs) in strat)| {
2354 for diff in diffs {
2355 diff.apply(&mut desc);
2356 };
2357 testcase(desc);
2358 });
2359 }
2360}