1use std::collections::{BTreeMap, BTreeSet};
11use std::rc::Rc;
12use std::{fmt, vec};
13
14use anyhow::bail;
15use itertools::Itertools;
16use mz_lowertest::MzReflect;
17use mz_ore::cast::CastFrom;
18use mz_ore::soft_panic_or_log;
19use mz_ore::str::StrExt;
20use mz_ore::{assert_none, assert_ok};
21use mz_persist_types::schema::SchemaId;
22use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
23use proptest::prelude::*;
24use proptest::strategy::{Strategy, Union};
25use proptest_derive::Arbitrary;
26use serde::{Deserialize, Serialize};
27
28use crate::relation_and_scalar::proto_relation_type::ProtoKey;
29pub use crate::relation_and_scalar::{
30 ProtoColumnMetadata, ProtoColumnName, ProtoColumnType, ProtoRelationDesc, ProtoRelationType,
31 ProtoRelationVersion,
32};
33use crate::{Datum, ReprScalarType, Row, SqlScalarType, arb_datum_for_column};
34
35#[derive(
43 Arbitrary,
44 Clone,
45 Debug,
46 Eq,
47 PartialEq,
48 Ord,
49 PartialOrd,
50 Serialize,
51 Deserialize,
52 Hash,
53 MzReflect
54)]
55pub struct SqlColumnType {
56 pub scalar_type: SqlScalarType,
58 #[serde(default = "return_true")]
60 pub nullable: bool,
61}
62
/// Always returns `true`.
///
/// Referenced by name from `#[serde(default = "return_true")]` so that a
/// missing `nullable` field deserializes as `true`.
#[inline(always)]
const fn return_true() -> bool {
    true
}
72
73impl SqlColumnType {
74 pub fn try_union_many<'a>(
78 typs: impl IntoIterator<Item = &'a Self>,
79 ) -> Result<Self, anyhow::Error> {
80 let mut iter = typs.into_iter();
81 let Some(typ) = iter.next() else {
82 bail!("Cannot union empty iterator");
83 };
84 iter.try_fold(typ.clone(), |a, b| a.try_union(b))
85 }
86
87 pub fn union_many<'a>(typs: impl IntoIterator<Item = &'a Self>) -> Self {
92 Self::try_union_many(typs).expect("Cannot union empty iterator")
93 }
94
95 pub fn backport_nullability(&mut self, backport_typ: &ReprColumnType) {
99 self.scalar_type
100 .backport_nullability(&backport_typ.scalar_type);
101 self.nullable = backport_typ.nullable;
102 }
103
104 pub fn sql_union(&self, other: &Self) -> Result<Self, anyhow::Error> {
117 match (&self.scalar_type, &other.scalar_type) {
118 (scalar_type, other_scalar_type) if scalar_type == other_scalar_type => {
119 Ok(SqlColumnType {
120 scalar_type: scalar_type.clone(),
121 nullable: self.nullable || other.nullable,
122 })
123 }
124 (scalar_type, other_scalar_type) if scalar_type.base_eq(other_scalar_type) => {
125 Ok(SqlColumnType {
126 scalar_type: scalar_type.without_modifiers(),
127 nullable: self.nullable || other.nullable,
128 })
129 }
130 (
131 SqlScalarType::Record { fields, custom_id },
132 SqlScalarType::Record {
133 fields: other_fields,
134 custom_id: other_custom_id,
135 },
136 ) => {
137 if custom_id != other_custom_id {
138 bail!(
139 "Can't union types: {:?} and {:?}",
140 self.scalar_type,
141 other.scalar_type
142 );
143 };
144
145 if fields.len() != other_fields.len() {
146 bail!(
147 "Can't union types: {:?} and {:?}",
148 self.scalar_type,
149 other.scalar_type
150 );
151 }
152 let mut union_fields = Vec::with_capacity(fields.len());
153 for ((name, typ), (other_name, other_typ)) in
154 fields.iter().zip_eq(other_fields.iter())
155 {
156 if name != other_name {
157 bail!(
158 "Can't union types: {:?} and {:?}",
159 self.scalar_type,
160 other.scalar_type
161 );
162 } else {
163 let union_column_type = typ.sql_union(other_typ)?;
164 union_fields.push((name.clone(), union_column_type));
165 };
166 }
167
168 Ok(SqlColumnType {
169 scalar_type: SqlScalarType::Record {
170 fields: union_fields.into(),
171 custom_id: *custom_id,
172 },
173 nullable: self.nullable || other.nullable,
174 })
175 }
176 _ => bail!(
177 "Can't union types: {:?} and {:?}",
178 self.scalar_type,
179 other.scalar_type
180 ),
181 }
182 }
183
184 pub fn try_union(&self, other: &Self) -> Result<Self, anyhow::Error> {
194 self.sql_union(other).or_else(|e| {
195 let repr_self = ReprColumnType::from(self);
196 let repr_other = ReprColumnType::from(other);
197 match repr_self.union(&repr_other) {
198 Ok(typ) => {
199 soft_panic_or_log!("repr type error: sql_union({self:?}, {other:?}): {e}");
202 Ok(SqlColumnType::from_repr(&typ))
203 }
204 Err(_) => {
205 Err(e)
208 }
209 }
210 })
211 }
212
213 pub fn union(&self, other: &Self) -> Self {
218 self.try_union(other).unwrap_or_else(|e| {
219 panic!("repr type error: after sql_union({self:?}, {other:?}) error: {e}")
220 })
221 }
222
223 pub fn nullable(mut self, nullable: bool) -> Self {
226 self.nullable = nullable;
227 self
228 }
229}
230
231impl RustType<ProtoColumnType> for SqlColumnType {
232 fn into_proto(&self) -> ProtoColumnType {
233 ProtoColumnType {
234 nullable: self.nullable,
235 scalar_type: Some(self.scalar_type.into_proto()),
236 }
237 }
238
239 fn from_proto(proto: ProtoColumnType) -> Result<Self, TryFromProtoError> {
240 Ok(SqlColumnType {
241 nullable: proto.nullable,
242 scalar_type: proto
243 .scalar_type
244 .into_rust_if_some("ProtoColumnType::scalar_type")?,
245 })
246 }
247}
248
249impl fmt::Display for SqlColumnType {
250 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
251 let nullable = if self.nullable { "Null" } else { "NotNull" };
252 f.write_fmt(format_args!("{:?}:{}", self.scalar_type, nullable))
253 }
254}
255
256#[derive(
258 Arbitrary,
259 Clone,
260 Debug,
261 Eq,
262 PartialEq,
263 Ord,
264 PartialOrd,
265 Serialize,
266 Deserialize,
267 Hash,
268 MzReflect
269)]
270pub struct SqlRelationType {
271 pub column_types: Vec<SqlColumnType>,
273 #[serde(default)]
283 pub keys: Vec<Vec<usize>>,
284}
285
286impl SqlRelationType {
287 pub fn empty() -> Self {
290 SqlRelationType::new(vec![])
291 }
292
293 pub fn new(column_types: Vec<SqlColumnType>) -> Self {
297 SqlRelationType {
298 column_types,
299 keys: Vec::new(),
300 }
301 }
302
303 pub fn with_key(mut self, mut indices: Vec<usize>) -> Self {
305 indices.sort_unstable();
306 if !self.keys.contains(&indices) {
307 self.keys.push(indices);
308 }
309 self
310 }
311
312 pub fn with_keys(mut self, keys: Vec<Vec<usize>>) -> Self {
313 for key in keys {
314 self = self.with_key(key)
315 }
316 self
317 }
318
319 pub fn arity(&self) -> usize {
321 self.column_types.len()
322 }
323
324 pub fn default_key(&self) -> Vec<usize> {
326 if let Some(key) = self.keys.first() {
327 if key.is_empty() {
328 (0..self.column_types.len()).collect()
329 } else {
330 key.clone()
331 }
332 } else {
333 (0..self.column_types.len()).collect()
334 }
335 }
336
337 pub fn columns(&self) -> &[SqlColumnType] {
339 &self.column_types
340 }
341
342 pub fn backport_nullability_and_keys(&mut self, backport_typ: &ReprRelationType) {
346 assert_eq!(
347 backport_typ.column_types.len(),
348 self.column_types.len(),
349 "HIR and MIR types should have the same number of columns"
350 );
351 for (backport_col, sql_col) in backport_typ
352 .column_types
353 .iter()
354 .zip_eq(self.column_types.iter_mut())
355 {
356 sql_col.backport_nullability(backport_col);
357 }
358
359 self.keys = backport_typ.keys.clone();
360 }
361
362 pub fn from_repr(repr: &ReprRelationType) -> Self {
366 SqlRelationType {
367 column_types: repr
368 .column_types
369 .iter()
370 .map(SqlColumnType::from_repr)
371 .collect(),
372 keys: repr.keys.clone(),
373 }
374 }
375}
376
377impl RustType<ProtoRelationType> for SqlRelationType {
378 fn into_proto(&self) -> ProtoRelationType {
379 ProtoRelationType {
380 column_types: self.column_types.into_proto(),
381 keys: self.keys.into_proto(),
382 }
383 }
384
385 fn from_proto(proto: ProtoRelationType) -> Result<Self, TryFromProtoError> {
386 Ok(SqlRelationType {
387 column_types: proto.column_types.into_rust()?,
388 keys: proto.keys.into_rust()?,
389 })
390 }
391}
392
393impl RustType<ProtoKey> for Vec<usize> {
394 fn into_proto(&self) -> ProtoKey {
395 ProtoKey {
396 keys: self.into_proto(),
397 }
398 }
399
400 fn from_proto(proto: ProtoKey) -> Result<Self, TryFromProtoError> {
401 proto.keys.into_rust()
402 }
403}
404
405#[derive(
407 Clone,
408 Debug,
409 Eq,
410 PartialEq,
411 Ord,
412 PartialOrd,
413 Serialize,
414 Deserialize,
415 Hash,
416 MzReflect
417)]
418pub struct ReprRelationType {
419 pub column_types: Vec<ReprColumnType>,
421 #[serde(default)]
431 pub keys: Vec<Vec<usize>>,
432}
433
434impl ReprRelationType {
435 pub fn empty() -> Self {
438 ReprRelationType::new(vec![])
439 }
440
441 pub fn new(column_types: Vec<ReprColumnType>) -> Self {
445 ReprRelationType {
446 column_types,
447 keys: Vec::new(),
448 }
449 }
450
451 pub fn with_key(mut self, mut indices: Vec<usize>) -> Self {
453 indices.sort_unstable();
454 if !self.keys.contains(&indices) {
455 self.keys.push(indices);
456 }
457 self
458 }
459
460 pub fn with_keys(mut self, keys: Vec<Vec<usize>>) -> Self {
461 for key in keys {
462 self = self.with_key(key)
463 }
464 self
465 }
466
467 pub fn arity(&self) -> usize {
469 self.column_types.len()
470 }
471
472 pub fn default_key(&self) -> Vec<usize> {
474 if let Some(key) = self.keys.first() {
475 if key.is_empty() {
476 (0..self.column_types.len()).collect()
477 } else {
478 key.clone()
479 }
480 } else {
481 (0..self.column_types.len()).collect()
482 }
483 }
484
485 pub fn columns(&self) -> &[ReprColumnType] {
487 &self.column_types
488 }
489}
490
491impl From<&SqlRelationType> for ReprRelationType {
492 fn from(sql_relation_type: &SqlRelationType) -> Self {
493 ReprRelationType {
494 column_types: sql_relation_type
495 .column_types
496 .iter()
497 .map(ReprColumnType::from)
498 .collect(),
499 keys: sql_relation_type.keys.clone(),
500 }
501 }
502}
503
504#[derive(
505 Clone,
506 Debug,
507 Eq,
508 PartialEq,
509 Ord,
510 PartialOrd,
511 Serialize,
512 Deserialize,
513 Hash,
514 MzReflect
515)]
516pub struct ReprColumnType {
517 pub scalar_type: ReprScalarType,
519 #[serde(default = "return_true")]
521 pub nullable: bool,
522}
523
524impl std::fmt::Display for ReprColumnType {
525 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
526 write!(f, "{}", self.scalar_type)?;
527 if self.nullable {
528 write!(f, "?")?;
529 }
530 Ok(())
531 }
532}
533
534impl ReprColumnType {
535 pub fn union(&self, col: &ReprColumnType) -> Result<Self, anyhow::Error> {
542 let scalar_type = self.scalar_type.union(&col.scalar_type)?;
543 let nullable = self.nullable || col.nullable;
544
545 Ok(ReprColumnType {
546 scalar_type,
547 nullable,
548 })
549 }
550}
551
552impl From<&SqlColumnType> for ReprColumnType {
553 fn from(sql_column_type: &SqlColumnType) -> Self {
554 let scalar_type = &sql_column_type.scalar_type;
555 let scalar_type = scalar_type.into();
556 let nullable = sql_column_type.nullable;
557
558 ReprColumnType {
559 scalar_type,
560 nullable,
561 }
562 }
563}
564
565impl SqlColumnType {
566 pub fn from_repr(repr: &ReprColumnType) -> Self {
570 let scalar_type = &repr.scalar_type;
571 let scalar_type = SqlScalarType::from_repr(scalar_type);
572 let nullable = repr.nullable;
573
574 SqlColumnType {
575 scalar_type,
576 nullable,
577 }
578 }
579}
580
581#[derive(
583 Clone,
584 Debug,
585 Eq,
586 PartialEq,
587 Ord,
588 PartialOrd,
589 Serialize,
590 Deserialize,
591 Hash,
592 MzReflect
593)]
594pub struct ColumnName(Box<str>);
595
596impl ColumnName {
597 #[inline(always)]
599 pub fn as_str(&self) -> &str {
600 &*self
601 }
602
603 pub fn as_mut_boxed_str(&mut self) -> &mut Box<str> {
605 &mut self.0
606 }
607
608 pub fn is_similar(&self, other: &ColumnName) -> bool {
610 const SIMILARITY_THRESHOLD: f64 = 0.6;
611
612 let a_lowercase = self.to_lowercase();
613 let b_lowercase = other.to_lowercase();
614
615 strsim::normalized_levenshtein(&a_lowercase, &b_lowercase) >= SIMILARITY_THRESHOLD
616 }
617}
618
619impl std::ops::Deref for ColumnName {
620 type Target = str;
621
622 #[inline(always)]
623 fn deref(&self) -> &Self::Target {
624 &self.0
625 }
626}
627
628impl fmt::Display for ColumnName {
629 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
630 f.write_str(&self.0)
631 }
632}
633
634impl From<String> for ColumnName {
635 fn from(s: String) -> ColumnName {
636 ColumnName(s.into())
637 }
638}
639
640impl From<&str> for ColumnName {
641 fn from(s: &str) -> ColumnName {
642 ColumnName(s.into())
643 }
644}
645
646impl From<&ColumnName> for ColumnName {
647 fn from(n: &ColumnName) -> ColumnName {
648 n.clone()
649 }
650}
651
652impl RustType<ProtoColumnName> for ColumnName {
653 fn into_proto(&self) -> ProtoColumnName {
654 ProtoColumnName {
655 value: Some(self.0.to_string()),
656 }
657 }
658
659 fn from_proto(proto: ProtoColumnName) -> Result<Self, TryFromProtoError> {
660 Ok(ColumnName(
661 proto
662 .value
663 .ok_or_else(|| TryFromProtoError::missing_field("ProtoColumnName::value"))?
664 .into(),
665 ))
666 }
667}
668
669impl From<ColumnName> for mz_sql_parser::ast::Ident {
670 fn from(value: ColumnName) -> Self {
671 mz_sql_parser::ast::Ident::new_unchecked(value.0)
673 }
674}
675
676impl proptest::arbitrary::Arbitrary for ColumnName {
677 type Parameters = ();
678 type Strategy = BoxedStrategy<ColumnName>;
679
680 fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy {
681 let mut weights = vec![(50, Just(1..8)), (20, Just(8..16))];
684 if std::env::var("PROPTEST_LARGE_DATA").is_ok() {
685 weights.extend([
686 (5, Just(16..128)),
687 (1, Just(128..1024)),
688 (1, Just(1024..4096)),
689 ]);
690 }
691 let name_length = Union::new_weighted(weights);
692
693 let char_strat = Rc::new(Union::new_weighted(vec![
696 (50, proptest::char::range('A', 'z').boxed()),
697 (1, any::<char>().boxed()),
698 ]));
699
700 name_length
701 .prop_flat_map(move |length| proptest::collection::vec(Rc::clone(&char_strat), length))
702 .prop_map(|chars| ColumnName(chars.into_iter().collect::<Box<str>>()))
703 .no_shrink()
704 .boxed()
705 }
706}
707
/// The column name `"?column?"`.
///
/// NOTE(review): presumably the placeholder given to columns that have no
/// explicit name — confirm against callers.
pub const UNKNOWN_COLUMN_NAME: &str = "?column?";
710
711#[derive(
713 Clone,
714 Copy,
715 Debug,
716 Eq,
717 PartialEq,
718 PartialOrd,
719 Ord,
720 Serialize,
721 Deserialize,
722 Hash,
723 MzReflect
724)]
725pub struct ColumnIndex(usize);
726
727static_assertions::assert_not_impl_all!(ColumnIndex: Arbitrary);
728
729impl ColumnIndex {
730 pub fn to_stable_name(&self) -> String {
732 self.0.to_string()
733 }
734
735 pub fn to_raw(&self) -> usize {
736 self.0
737 }
738
739 pub fn from_raw(val: usize) -> Self {
740 ColumnIndex(val)
741 }
742}
743
744#[derive(
746 Clone,
747 Copy,
748 Debug,
749 Eq,
750 PartialEq,
751 PartialOrd,
752 Ord,
753 Serialize,
754 Deserialize,
755 Hash,
756 MzReflect,
757 Arbitrary
758)]
759pub struct RelationVersion(u64);
760
761impl RelationVersion {
762 pub fn root() -> Self {
764 RelationVersion(0)
765 }
766
767 pub fn bump(&self) -> Self {
769 let next_version = self
770 .0
771 .checked_add(1)
772 .expect("added more than u64::MAX columns?");
773 RelationVersion(next_version)
774 }
775
776 pub fn into_raw(self) -> u64 {
780 self.0
781 }
782
783 pub fn from_raw(val: u64) -> RelationVersion {
787 RelationVersion(val)
788 }
789}
790
791impl From<RelationVersion> for SchemaId {
792 fn from(value: RelationVersion) -> Self {
793 SchemaId(usize::cast_from(value.0))
794 }
795}
796
797impl From<mz_sql_parser::ast::Version> for RelationVersion {
798 fn from(value: mz_sql_parser::ast::Version) -> Self {
799 RelationVersion(value.into_inner())
800 }
801}
802
803impl fmt::Display for RelationVersion {
804 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
805 write!(f, "v{}", self.0)
806 }
807}
808
809impl From<RelationVersion> for mz_sql_parser::ast::Version {
810 fn from(value: RelationVersion) -> Self {
811 mz_sql_parser::ast::Version::new(value.0)
812 }
813}
814
815impl RustType<ProtoRelationVersion> for RelationVersion {
816 fn into_proto(&self) -> ProtoRelationVersion {
817 ProtoRelationVersion { value: self.0 }
818 }
819
820 fn from_proto(proto: ProtoRelationVersion) -> Result<Self, TryFromProtoError> {
821 Ok(RelationVersion(proto.value))
822 }
823}
824
825#[derive(
832 Clone,
833 Copy,
834 Debug,
835 PartialEq,
836 Eq,
837 PartialOrd,
838 Ord,
839 Hash,
840 serde::Serialize
841)]
842pub enum SemanticType {
843 CatalogItemId,
844 GlobalId,
845 ClusterId,
846 ReplicaId,
847 SchemaId,
848 DatabaseId,
849 RoleId,
850 NetworkPolicyId,
851 ShardId,
852 OID,
853 ObjectType,
854 ConnectionType,
855 SourceType,
856 MzTimestamp,
857 WallclockTimestamp,
858 ByteCount,
859 RecordCount,
860 CreditRate,
861 SqlDefinition,
862 RedactedSqlDefinition,
863}
864
865impl fmt::Display for SemanticType {
866 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
867 let s = match self {
868 SemanticType::CatalogItemId => "CatalogItemId",
869 SemanticType::GlobalId => "GlobalId",
870 SemanticType::ClusterId => "ClusterId",
871 SemanticType::ReplicaId => "ReplicaId",
872 SemanticType::SchemaId => "SchemaId",
873 SemanticType::DatabaseId => "DatabaseId",
874 SemanticType::RoleId => "RoleId",
875 SemanticType::NetworkPolicyId => "NetworkPolicyId",
876 SemanticType::ShardId => "ShardId",
877 SemanticType::OID => "OID",
878 SemanticType::ObjectType => "ObjectType",
879 SemanticType::ConnectionType => "ConnectionType",
880 SemanticType::SourceType => "SourceType",
881 SemanticType::MzTimestamp => "MzTimestamp",
882 SemanticType::WallclockTimestamp => "WallclockTimestamp",
883 SemanticType::ByteCount => "ByteCount",
884 SemanticType::RecordCount => "RecordCount",
885 SemanticType::CreditRate => "CreditRate",
886 SemanticType::SqlDefinition => "SqlDefinition",
887 SemanticType::RedactedSqlDefinition => "RedactedSqlDefinition",
888 };
889 f.write_str(s)
890 }
891}
892
893#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash, MzReflect)]
895struct ColumnMetadata {
896 name: ColumnName,
898 typ_idx: usize,
900 added: RelationVersion,
902 dropped: Option<RelationVersion>,
904}
905
906#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, MzReflect)]
974pub struct RelationDesc {
975 typ: SqlRelationType,
976 metadata: BTreeMap<ColumnIndex, ColumnMetadata>,
977}
978
979impl RustType<ProtoRelationDesc> for RelationDesc {
980 fn into_proto(&self) -> ProtoRelationDesc {
981 let (names, metadata): (Vec<_>, Vec<_>) = self
982 .metadata
983 .values()
984 .map(|meta| {
985 let metadata = ProtoColumnMetadata {
986 added: Some(meta.added.into_proto()),
987 dropped: meta.dropped.map(|v| v.into_proto()),
988 };
989 (meta.name.into_proto(), metadata)
990 })
991 .unzip();
992
993 let is_all_default_metadata = metadata.iter().all(|meta| {
999 meta.added == Some(RelationVersion::root().into_proto()) && meta.dropped == None
1000 });
1001 let metadata = if is_all_default_metadata {
1002 Vec::new()
1003 } else {
1004 metadata
1005 };
1006
1007 ProtoRelationDesc {
1008 typ: Some(self.typ.into_proto()),
1009 names,
1010 metadata,
1011 }
1012 }
1013
1014 fn from_proto(proto: ProtoRelationDesc) -> Result<Self, TryFromProtoError> {
1015 let proto_metadata: Box<dyn Iterator<Item = _>> = if proto.metadata.is_empty() {
1021 let val = ProtoColumnMetadata {
1022 added: Some(RelationVersion::root().into_proto()),
1023 dropped: None,
1024 };
1025 Box::new(itertools::repeat_n(val, proto.names.len()))
1026 } else {
1027 Box::new(proto.metadata.into_iter())
1028 };
1029
1030 let metadata = proto
1031 .names
1032 .into_iter()
1033 .zip_eq(proto_metadata)
1034 .enumerate()
1035 .map(|(idx, (name, metadata))| {
1036 let meta = ColumnMetadata {
1037 name: name.into_rust()?,
1038 typ_idx: idx,
1039 added: metadata.added.into_rust_if_some("ColumnMetadata::added")?,
1040 dropped: metadata.dropped.into_rust()?,
1041 };
1042 Ok::<_, TryFromProtoError>((ColumnIndex(idx), meta))
1043 })
1044 .collect::<Result<_, _>>()?;
1045
1046 Ok(RelationDesc {
1047 typ: proto.typ.into_rust_if_some("ProtoRelationDesc::typ")?,
1048 metadata,
1049 })
1050 }
1051}
1052
1053impl RelationDesc {
1054 pub fn builder() -> RelationDescBuilder {
1056 RelationDescBuilder::default()
1057 }
1058
1059 pub fn empty() -> Self {
1062 RelationDesc {
1063 typ: SqlRelationType::empty(),
1064 metadata: BTreeMap::default(),
1065 }
1066 }
1067
1068 pub fn is_empty(&self) -> bool {
1070 self == &Self::empty()
1071 }
1072
1073 pub fn len(&self) -> usize {
1075 self.typ().column_types.len()
1076 }
1077
1078 pub fn new<I, N>(typ: SqlRelationType, names: I) -> Self
1086 where
1087 I: IntoIterator<Item = N>,
1088 N: Into<ColumnName>,
1089 {
1090 let metadata: BTreeMap<_, _> = names
1091 .into_iter()
1092 .enumerate()
1093 .map(|(idx, name)| {
1094 let col_idx = ColumnIndex(idx);
1095 let metadata = ColumnMetadata {
1096 name: name.into(),
1097 typ_idx: idx,
1098 added: RelationVersion::root(),
1099 dropped: None,
1100 };
1101 (col_idx, metadata)
1102 })
1103 .collect();
1104
1105 assert_eq!(typ.column_types.len(), metadata.len());
1107
1108 RelationDesc { typ, metadata }
1109 }
1110
1111 pub fn from_names_and_types<I, T, N>(iter: I) -> Self
1112 where
1113 I: IntoIterator<Item = (N, T)>,
1114 T: Into<SqlColumnType>,
1115 N: Into<ColumnName>,
1116 {
1117 let (names, types): (Vec<_>, Vec<_>) = iter.into_iter().unzip();
1118 let types = types.into_iter().map(Into::into).collect();
1119 let typ = SqlRelationType::new(types);
1120 Self::new(typ, names)
1121 }
1122
1123 pub fn concat(mut self, other: Self) -> Self {
1133 let self_len = self.typ.column_types.len();
1134
1135 for (typ, (_col_idx, meta)) in other.typ.column_types.into_iter().zip_eq(other.metadata) {
1136 assert_eq!(meta.added, RelationVersion::root());
1137 assert_none!(meta.dropped);
1138
1139 let new_idx = self.typ.columns().len();
1140 let new_meta = ColumnMetadata {
1141 name: meta.name,
1142 typ_idx: new_idx,
1143 added: RelationVersion::root(),
1144 dropped: None,
1145 };
1146
1147 self.typ.column_types.push(typ);
1148 let prev = self.metadata.insert(ColumnIndex(new_idx), new_meta);
1149
1150 assert_eq!(self.metadata.len(), self.typ.columns().len());
1151 assert_none!(prev);
1152 }
1153
1154 for k in other.typ.keys {
1155 let k = k.into_iter().map(|idx| idx + self_len).collect();
1156 self = self.with_key(k);
1157 }
1158 self
1159 }
1160
1161 pub fn with_key(mut self, indices: Vec<usize>) -> Self {
1163 self.typ = self.typ.with_key(indices);
1164 self
1165 }
1166
1167 pub fn without_keys(mut self) -> Self {
1169 self.typ.keys.clear();
1170 self
1171 }
1172
1173 pub fn with_names<I, N>(self, names: I) -> Self
1181 where
1182 I: IntoIterator<Item = N>,
1183 N: Into<ColumnName>,
1184 {
1185 Self::new(self.typ, names)
1186 }
1187
1188 pub fn arity(&self) -> usize {
1190 self.typ.arity()
1191 }
1192
1193 pub fn typ(&self) -> &SqlRelationType {
1195 &self.typ
1196 }
1197
1198 pub fn into_typ(self) -> SqlRelationType {
1200 self.typ
1201 }
1202
1203 pub fn iter(&self) -> impl Iterator<Item = (&ColumnName, &SqlColumnType)> {
1205 self.metadata.values().map(|meta| {
1206 let typ = &self.typ.columns()[meta.typ_idx];
1207 (&meta.name, typ)
1208 })
1209 }
1210
1211 pub fn iter_types(&self) -> impl Iterator<Item = &SqlColumnType> {
1213 self.typ.column_types.iter()
1214 }
1215
1216 pub fn iter_names(&self) -> impl Iterator<Item = &ColumnName> {
1218 self.metadata.values().map(|meta| &meta.name)
1219 }
1220
1221 pub fn iter_all(&self) -> impl Iterator<Item = (&ColumnIndex, &ColumnName, &SqlColumnType)> {
1223 self.metadata.iter().map(|(col_idx, metadata)| {
1224 let col_typ = &self.typ.columns()[metadata.typ_idx];
1225 (col_idx, &metadata.name, col_typ)
1226 })
1227 }
1228
1229 pub fn iter_similar_names<'a>(
1232 &'a self,
1233 name: &'a ColumnName,
1234 ) -> impl Iterator<Item = &'a ColumnName> {
1235 self.iter_names().filter(|n| n.is_similar(name))
1236 }
1237
1238 pub fn contains_index(&self, idx: &ColumnIndex) -> bool {
1240 self.metadata.contains_key(idx)
1241 }
1242
1243 pub fn get_by_name(&self, name: &ColumnName) -> Option<(usize, &SqlColumnType)> {
1249 self.iter_names()
1250 .position(|n| n == name)
1251 .map(|i| (i, &self.typ.column_types[i]))
1252 }
1253
1254 pub fn get_name(&self, i: usize) -> &ColumnName {
1262 self.get_name_idx(&ColumnIndex(i))
1264 }
1265
1266 pub fn get_name_idx(&self, idx: &ColumnIndex) -> &ColumnName {
1272 &self.metadata.get(idx).expect("should exist").name
1273 }
1274
1275 pub fn get_name_mut(&mut self, i: usize) -> &mut ColumnName {
1281 &mut self
1283 .metadata
1284 .get_mut(&ColumnIndex(i))
1285 .expect("should exist")
1286 .name
1287 }
1288
1289 pub fn get_type(&self, idx: &ColumnIndex) -> &SqlColumnType {
1295 let typ_idx = self.metadata.get(idx).expect("should exist").typ_idx;
1296 &self.typ.column_types[typ_idx]
1297 }
1298
1299 pub fn get_unambiguous_name(&self, i: usize) -> Option<&ColumnName> {
1308 let name = self.get_name(i);
1309 if self.iter_names().filter(|n| *n == name).count() == 1 {
1310 Some(name)
1311 } else {
1312 None
1313 }
1314 }
1315
1316 pub fn constraints_met(&self, i: usize, d: &Datum) -> Result<(), NotNullViolation> {
1321 let name = self.get_name(i);
1322 let typ = &self.typ.column_types[i];
1323 if d == &Datum::Null && !typ.nullable {
1324 Err(NotNullViolation(name.clone()))
1325 } else {
1326 Ok(())
1327 }
1328 }
1329
1330 pub fn diff(&self, other: &RelationDesc) -> RelationDescDiff {
1345 assert_eq!(self.metadata.len(), self.typ.columns().len());
1346 assert_eq!(other.metadata.len(), other.typ.columns().len());
1347 for (idx, meta) in self.metadata.iter().chain(other.metadata.iter()) {
1348 assert_eq!(meta.typ_idx, idx.0);
1349 assert_eq!(meta.added, RelationVersion::root());
1350 assert_none!(meta.dropped);
1351 }
1352
1353 let mut column_diffs = BTreeMap::new();
1354 let mut key_diff = None;
1355
1356 let left_arity = self.arity();
1357 let right_arity = other.arity();
1358 let common_arity = std::cmp::min(left_arity, right_arity);
1359
1360 for idx in 0..common_arity {
1361 let left_name = self.get_name(idx);
1362 let right_name = other.get_name(idx);
1363 let left_type = &self.typ.column_types[idx];
1364 let right_type = &other.typ.column_types[idx];
1365
1366 if left_name != right_name {
1367 let diff = ColumnDiff::NameMismatch {
1368 left: left_name.clone(),
1369 right: right_name.clone(),
1370 };
1371 column_diffs.insert(idx, diff);
1372 } else if left_type.scalar_type != right_type.scalar_type {
1373 let diff = ColumnDiff::TypeMismatch {
1374 name: left_name.clone(),
1375 left: left_type.scalar_type.clone(),
1376 right: right_type.scalar_type.clone(),
1377 };
1378 column_diffs.insert(idx, diff);
1379 } else if left_type.nullable != right_type.nullable {
1380 let diff = ColumnDiff::NullabilityMismatch {
1381 name: left_name.clone(),
1382 left: left_type.nullable,
1383 right: right_type.nullable,
1384 };
1385 column_diffs.insert(idx, diff);
1386 }
1387 }
1388
1389 for idx in common_arity..left_arity {
1390 let diff = ColumnDiff::Missing {
1391 name: self.get_name(idx).clone(),
1392 };
1393 column_diffs.insert(idx, diff);
1394 }
1395
1396 for idx in common_arity..right_arity {
1397 let diff = ColumnDiff::Extra {
1398 name: other.get_name(idx).clone(),
1399 };
1400 column_diffs.insert(idx, diff);
1401 }
1402
1403 let left_keys: BTreeSet<_> = self.typ.keys.iter().collect();
1404 let right_keys: BTreeSet<_> = other.typ.keys.iter().collect();
1405 if left_keys != right_keys {
1406 let column_names = |desc: &RelationDesc, keys: BTreeSet<&Vec<usize>>| {
1407 keys.iter()
1408 .map(|key| key.iter().map(|&idx| desc.get_name(idx).clone()).collect())
1409 .collect()
1410 };
1411 key_diff = Some(KeyDiff {
1412 left: column_names(self, left_keys),
1413 right: column_names(other, right_keys),
1414 });
1415 }
1416
1417 RelationDescDiff {
1418 column_diffs,
1419 key_diff,
1420 }
1421 }
1422
1423 pub fn apply_demand(&self, demands: &BTreeSet<usize>) -> RelationDesc {
1425 let mut new_desc = self.clone();
1426
1427 let mut removed = 0;
1429 new_desc.metadata.retain(|idx, metadata| {
1430 let retain = demands.contains(&idx.0);
1431 if !retain {
1432 removed += 1;
1433 } else {
1434 metadata.typ_idx -= removed;
1435 }
1436 retain
1437 });
1438
1439 let mut idx = 0;
1441 new_desc.typ.column_types.retain(|_| {
1442 let keep = demands.contains(&idx);
1443 idx += 1;
1444 keep
1445 });
1446
1447 new_desc
1448 }
1449}
1450
1451impl Arbitrary for RelationDesc {
1452 type Parameters = ();
1453 type Strategy = BoxedStrategy<RelationDesc>;
1454
1455 fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy {
1456 let mut weights = vec![(100, Just(0..4)), (50, Just(4..8)), (25, Just(8..16))];
1457 if std::env::var("PROPTEST_LARGE_DATA").is_ok() {
1458 weights.extend([
1459 (12, Just(16..32)),
1460 (6, Just(32..64)),
1461 (3, Just(64..128)),
1462 (1, Just(128..256)),
1463 ]);
1464 }
1465 let num_columns = Union::new_weighted(weights);
1466
1467 num_columns.prop_flat_map(arb_relation_desc).boxed()
1468 }
1469}
1470
1471pub fn arb_relation_desc(num_cols: std::ops::Range<usize>) -> impl Strategy<Value = RelationDesc> {
1474 proptest::collection::btree_map(any::<ColumnName>(), any::<SqlColumnType>(), num_cols)
1475 .prop_map(RelationDesc::from_names_and_types)
1476}
1477
1478pub fn arb_relation_desc_projection(desc: RelationDesc) -> impl Strategy<Value = RelationDesc> {
1480 let mask: Vec<_> = (0..desc.len()).map(|_| any::<bool>()).collect();
1481 mask.prop_map(move |mask| {
1482 let demands: BTreeSet<_> = mask
1483 .into_iter()
1484 .enumerate()
1485 .filter_map(|(idx, keep)| keep.then_some(idx))
1486 .collect();
1487 desc.apply_demand(&demands)
1488 })
1489}
1490
1491impl IntoIterator for RelationDesc {
1492 type Item = (ColumnName, SqlColumnType);
1493 type IntoIter = Box<dyn Iterator<Item = (ColumnName, SqlColumnType)>>;
1494
1495 fn into_iter(self) -> Self::IntoIter {
1496 let iter = self
1497 .metadata
1498 .into_values()
1499 .zip_eq(self.typ.column_types)
1500 .map(|(meta, typ)| (meta.name, typ));
1501 Box::new(iter)
1502 }
1503}
1504
1505pub fn arb_row_for_relation(desc: &RelationDesc) -> impl Strategy<Value = Row> + use<> {
1507 let datums: Vec<_> = desc
1508 .typ()
1509 .columns()
1510 .iter()
1511 .cloned()
1512 .map(arb_datum_for_column)
1513 .collect();
1514 datums.prop_map(|x| Row::pack(x.iter().map(Datum::from)))
1515}
1516
1517#[derive(Debug, PartialEq, Eq)]
1519pub struct NotNullViolation(pub ColumnName);
1520
1521impl fmt::Display for NotNullViolation {
1522 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1523 write!(
1524 f,
1525 "null value in column {} violates not-null constraint",
1526 self.0.quoted()
1527 )
1528 }
1529}
1530
1531#[derive(Debug, Clone, PartialEq, Eq)]
1533pub struct RelationDescDiff {
1534 pub column_diffs: BTreeMap<usize, ColumnDiff>,
1536 pub key_diff: Option<KeyDiff>,
1538}
1539
1540impl RelationDescDiff {
1541 pub fn is_empty(&self) -> bool {
1543 self.column_diffs.is_empty() && self.key_diff.is_none()
1544 }
1545}
1546
1547#[derive(Debug, Clone, PartialEq, Eq)]
1549pub enum ColumnDiff {
1550 Missing { name: ColumnName },
1552 Extra { name: ColumnName },
1554 TypeMismatch {
1556 name: ColumnName,
1557 left: SqlScalarType,
1558 right: SqlScalarType,
1559 },
1560 NullabilityMismatch {
1562 name: ColumnName,
1563 left: bool,
1564 right: bool,
1565 },
1566 NameMismatch { left: ColumnName, right: ColumnName },
1568}
1569
1570#[derive(Debug, Clone, PartialEq, Eq)]
1572pub struct KeyDiff {
1573 pub left: BTreeSet<Vec<ColumnName>>,
1575 pub right: BTreeSet<Vec<ColumnName>>,
1577}
1578
1579#[derive(Clone, Default, Debug, PartialEq, Eq)]
1581pub struct RelationDescBuilder {
1582 columns: Vec<(ColumnName, SqlColumnType)>,
1584 keys: Vec<Vec<usize>>,
1586}
1587
1588impl RelationDescBuilder {
1589 pub fn with_column<N: Into<ColumnName>>(
1591 mut self,
1592 name: N,
1593 ty: SqlColumnType,
1594 ) -> RelationDescBuilder {
1595 let name = name.into();
1596 self.columns.push((name, ty));
1597 self
1598 }
1599
1600 pub fn with_columns<I, T, N>(mut self, iter: I) -> Self
1602 where
1603 I: IntoIterator<Item = (N, T)>,
1604 T: Into<SqlColumnType>,
1605 N: Into<ColumnName>,
1606 {
1607 self.columns
1608 .extend(iter.into_iter().map(|(name, ty)| (name.into(), ty.into())));
1609 self
1610 }
1611
1612 pub fn with_key(mut self, mut indices: Vec<usize>) -> RelationDescBuilder {
1614 indices.sort_unstable();
1615 if !self.keys.contains(&indices) {
1616 self.keys.push(indices);
1617 }
1618 self
1619 }
1620
1621 pub fn without_keys(mut self) -> RelationDescBuilder {
1623 self.keys.clear();
1624 assert_eq!(self.keys.len(), 0);
1625 self
1626 }
1627
1628 pub fn concat(mut self, other: Self) -> Self {
1630 let self_len = self.columns.len();
1631
1632 self.columns.extend(other.columns);
1633 for k in other.keys {
1634 let k = k.into_iter().map(|idx| idx + self_len).collect();
1635 self = self.with_key(k);
1636 }
1637
1638 self
1639 }
1640
1641 pub fn finish(self) -> RelationDesc {
1643 let mut desc = RelationDesc::from_names_and_types(self.columns);
1644 desc.typ = desc.typ.with_keys(self.keys);
1645 desc
1646 }
1647}
1648
/// Selects which version of a relation to look at, for example when calling
/// [`VersionedRelationDesc::at_version`].
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord, Serialize)]
pub enum RelationVersionSelector {
    /// Exactly the given version.
    Specific(RelationVersion),
    /// Whatever the most recent version is.
    Latest,
}
1655
1656impl RelationVersionSelector {
1657 pub fn specific(version: u64) -> Self {
1658 RelationVersionSelector::Specific(RelationVersion(version))
1659 }
1660}
1661
/// A [`RelationDesc`] whose columns can be added and dropped over time, with every such change
/// producing a new [`RelationVersion`].
///
/// Dropping a column only marks its metadata as dropped; the column stays in `inner`, which is
/// what allows [`VersionedRelationDesc::at_version`] to reconstruct earlier versions.
#[derive(Debug, Clone, Serialize)]
pub struct VersionedRelationDesc {
    /// The description containing every column that was ever part of the relation, including
    /// dropped ones.
    inner: RelationDesc,
}
1671
1672impl VersionedRelationDesc {
1673 pub fn new(inner: RelationDesc) -> Self {
1674 VersionedRelationDesc { inner }
1675 }
1676
1677 #[must_use]
1685 pub fn add_column<N, T>(&mut self, name: N, typ: T) -> RelationVersion
1686 where
1687 N: Into<ColumnName>,
1688 T: Into<SqlColumnType>,
1689 {
1690 let latest_version = self.latest_version();
1691 let new_version = latest_version.bump();
1692
1693 let name = name.into();
1694 let existing = self
1695 .inner
1696 .metadata
1697 .iter()
1698 .find(|(_, meta)| meta.name == name && meta.dropped.is_none());
1699 if let Some(existing) = existing {
1700 panic!("column named '{name}' already exists! {existing:?}");
1701 }
1702
1703 let next_idx = self.inner.metadata.len();
1704 let col_meta = ColumnMetadata {
1705 name,
1706 typ_idx: next_idx,
1707 added: new_version,
1708 dropped: None,
1709 };
1710
1711 self.inner.typ.column_types.push(typ.into());
1712 let prev = self.inner.metadata.insert(ColumnIndex(next_idx), col_meta);
1713
1714 assert_none!(prev, "column index overlap!");
1715 self.validate();
1716
1717 new_version
1718 }
1719
1720 #[must_use]
1729 pub fn drop_column<N>(&mut self, name: N) -> RelationVersion
1730 where
1731 N: Into<ColumnName>,
1732 {
1733 let name = name.into();
1734 let latest_version = self.latest_version();
1735 let new_version = latest_version.bump();
1736
1737 let col = self
1738 .inner
1739 .metadata
1740 .values_mut()
1741 .find(|meta| meta.name == name && meta.dropped.is_none())
1742 .expect("column to exist");
1743
1744 assert_none!(col.dropped, "column was already dropped");
1746 col.dropped = Some(new_version);
1747
1748 let dropped_key = self
1750 .inner
1751 .typ
1752 .keys
1753 .iter()
1754 .any(|keys| keys.contains(&col.typ_idx));
1755 assert!(!dropped_key, "column being dropped was used as a key");
1756
1757 self.validate();
1758 new_version
1759 }
1760
1761 pub fn latest(&self) -> RelationDesc {
1763 self.inner.clone()
1764 }
1765
1766 pub fn at_version(&self, version: RelationVersionSelector) -> RelationDesc {
1768 let up_to_version = match version {
1770 RelationVersionSelector::Latest => RelationVersion(u64::MAX),
1771 RelationVersionSelector::Specific(v) => v,
1772 };
1773
1774 let valid_columns = self.inner.metadata.iter().filter(|(_col_idx, meta)| {
1775 let added = meta.added <= up_to_version;
1776 let dropped = meta
1777 .dropped
1778 .map(|dropped_at| up_to_version >= dropped_at)
1779 .unwrap_or(false);
1780
1781 added && !dropped
1782 });
1783
1784 let mut column_types = Vec::new();
1785 let mut column_metas = BTreeMap::new();
1786
1787 for (col_idx, meta) in valid_columns {
1794 let new_meta = ColumnMetadata {
1795 name: meta.name.clone(),
1796 typ_idx: column_types.len(),
1797 added: meta.added.clone(),
1798 dropped: meta.dropped.clone(),
1799 };
1800 column_types.push(self.inner.typ.columns()[meta.typ_idx].clone());
1801 column_metas.insert(*col_idx, new_meta);
1802 }
1803
1804 let keys = self
1810 .inner
1811 .typ
1812 .keys
1813 .iter()
1814 .map(|keys| {
1815 keys.iter()
1816 .map(|key_idx| {
1817 let metadata = column_metas
1818 .get(&ColumnIndex(*key_idx))
1819 .expect("found key for column that doesn't exist");
1820 metadata.typ_idx
1821 })
1822 .collect()
1823 })
1824 .collect();
1825
1826 let relation_type = SqlRelationType { column_types, keys };
1827
1828 RelationDesc {
1829 typ: relation_type,
1830 metadata: column_metas,
1831 }
1832 }
1833
1834 pub fn latest_version(&self) -> RelationVersion {
1835 self.inner
1836 .metadata
1837 .values()
1838 .map(|meta| meta.dropped.unwrap_or(meta.added))
1840 .max()
1841 .unwrap_or_else(RelationVersion::root)
1843 }
1844
1845 fn validate(&self) {
1851 fn validate_inner(desc: &RelationDesc) -> Result<(), anyhow::Error> {
1852 if desc.typ.column_types.len() != desc.metadata.len() {
1853 anyhow::bail!("mismatch between number of types and metadatas");
1854 }
1855
1856 for (col_idx, meta) in &desc.metadata {
1857 if col_idx.0 > desc.metadata.len() {
1858 anyhow::bail!("column index out of bounds");
1859 }
1860 if meta.added >= meta.dropped.unwrap_or(RelationVersion(u64::MAX)) {
1861 anyhow::bail!("column was added after it was dropped?");
1862 }
1863 if desc.typ().columns().get(meta.typ_idx).is_none() {
1864 anyhow::bail!("typ_idx incorrect");
1865 }
1866 }
1867
1868 for keys in &desc.typ.keys {
1869 for key in keys {
1870 if *key >= desc.typ.column_types.len() {
1871 anyhow::bail!("key index was out of bounds!");
1872 }
1873 }
1874 }
1875
1876 let versions = desc
1877 .metadata
1878 .values()
1879 .map(|meta| meta.dropped.unwrap_or(meta.added));
1880 let mut max = 0;
1881 let mut sum = 0;
1882 for version in versions {
1883 max = std::cmp::max(max, version.0);
1884 sum += version.0;
1885 }
1886
1887 if sum != (max * (max + 1) / 2) {
1897 anyhow::bail!("there is a duplicate or missing relation version");
1898 }
1899
1900 Ok(())
1901 }
1902
1903 assert_ok!(validate_inner(&self.inner), "validate failed! {self:?}");
1904 }
1905}
1906
/// A single change that can be applied to a [`RelationDesc`] with
/// [`PropRelationDescDiff::apply`]; lists of these are generated by
/// [`arb_relation_desc_diff`].
#[derive(Debug)]
pub enum PropRelationDescDiff {
    /// Append a new column called `name` with type `typ`.
    AddColumn {
        name: ColumnName,
        typ: SqlColumnType,
    },
    /// Mark the column called `name` as dropped.
    DropColumn {
        name: ColumnName,
    },
    /// Flip the nullability of the column called `name`.
    ToggleNullability {
        name: ColumnName,
    },
    /// Replace the type of the column called `name` with `typ`.
    ChangeType {
        name: ColumnName,
        typ: SqlColumnType,
    },
}
1926
1927impl PropRelationDescDiff {
1928 pub fn apply(self, desc: &mut RelationDesc) {
1929 match self {
1930 PropRelationDescDiff::AddColumn { name, typ } => {
1931 let new_idx = desc.metadata.len();
1932 let meta = ColumnMetadata {
1933 name,
1934 typ_idx: new_idx,
1935 added: RelationVersion(0),
1936 dropped: None,
1937 };
1938 let prev = desc.metadata.insert(ColumnIndex(new_idx), meta);
1939 desc.typ.column_types.push(typ);
1940
1941 assert_none!(prev);
1942 assert_eq!(desc.metadata.len(), desc.typ.column_types.len());
1943 }
1944 PropRelationDescDiff::DropColumn { name } => {
1945 let next_version = desc
1946 .metadata
1947 .values()
1948 .map(|meta| meta.dropped.unwrap_or(meta.added))
1949 .max()
1950 .unwrap_or_else(RelationVersion::root)
1951 .bump();
1952 let Some(metadata) = desc.metadata.values_mut().find(|meta| meta.name == name)
1953 else {
1954 return;
1955 };
1956 if metadata.dropped.is_none() {
1957 metadata.dropped = Some(next_version);
1958 }
1959 }
1960 PropRelationDescDiff::ToggleNullability { name } => {
1961 let Some((pos, _)) = desc.get_by_name(&name) else {
1962 return;
1963 };
1964 let col_type = desc
1965 .typ
1966 .column_types
1967 .get_mut(pos)
1968 .expect("ColumnNames and SqlColumnTypes out of sync!");
1969 col_type.nullable = !col_type.nullable;
1970 }
1971 PropRelationDescDiff::ChangeType { name, typ } => {
1972 let Some((pos, _)) = desc.get_by_name(&name) else {
1973 return;
1974 };
1975 let col_type = desc
1976 .typ
1977 .column_types
1978 .get_mut(pos)
1979 .expect("ColumnNames and SqlColumnTypes out of sync!");
1980 *col_type = typ;
1981 }
1982 }
1983 }
1984}
1985
1986pub fn arb_relation_desc_diff(
1988 source: &RelationDesc,
1989) -> impl Strategy<Value = Vec<PropRelationDescDiff>> + use<> {
1990 let source = Rc::new(source.clone());
1991 let num_source_columns = source.typ.columns().len();
1992
1993 let num_add_columns = Union::new_weighted(vec![(100, Just(0..8)), (1, Just(8..64))]);
1994 let add_columns_strat = num_add_columns
1995 .prop_flat_map(|num_columns| {
1996 proptest::collection::vec((any::<ColumnName>(), any::<SqlColumnType>()), num_columns)
1997 })
1998 .prop_map(|cols| {
1999 cols.into_iter()
2000 .map(|(name, typ)| PropRelationDescDiff::AddColumn { name, typ })
2001 .collect::<Vec<_>>()
2002 });
2003
2004 if num_source_columns == 0 {
2006 return add_columns_strat.boxed();
2007 }
2008
2009 let source_ = Rc::clone(&source);
2010 let drop_columns_strat = (0..num_source_columns).prop_perturb(move |num_columns, mut rng| {
2011 let mut set = BTreeSet::default();
2012 for _ in 0..num_columns {
2013 let col_idx = rng.random_range(0..num_source_columns);
2014 set.insert(source_.get_name(col_idx).clone());
2015 }
2016 set.into_iter()
2017 .map(|name| PropRelationDescDiff::DropColumn { name })
2018 .collect::<Vec<_>>()
2019 });
2020
2021 let source_ = Rc::clone(&source);
2022 let toggle_nullability_strat =
2023 (0..num_source_columns).prop_perturb(move |num_columns, mut rng| {
2024 let mut set = BTreeSet::default();
2025 for _ in 0..num_columns {
2026 let col_idx = rng.random_range(0..num_source_columns);
2027 set.insert(source_.get_name(col_idx).clone());
2028 }
2029 set.into_iter()
2030 .map(|name| PropRelationDescDiff::ToggleNullability { name })
2031 .collect::<Vec<_>>()
2032 });
2033
2034 let source_ = Rc::clone(&source);
2035 let change_type_strat = (0..num_source_columns)
2036 .prop_perturb(move |num_columns, mut rng| {
2037 let mut set = BTreeSet::default();
2038 for _ in 0..num_columns {
2039 let col_idx = rng.random_range(0..num_source_columns);
2040 set.insert(source_.get_name(col_idx).clone());
2041 }
2042 set
2043 })
2044 .prop_flat_map(|cols| {
2045 proptest::collection::vec(any::<SqlColumnType>(), cols.len())
2046 .prop_map(move |types| (cols.clone(), types))
2047 })
2048 .prop_map(|(cols, types)| {
2049 cols.into_iter()
2050 .zip_eq(types)
2051 .map(|(name, typ)| PropRelationDescDiff::ChangeType { name, typ })
2052 .collect::<Vec<_>>()
2053 });
2054
2055 (
2056 add_columns_strat,
2057 drop_columns_strat,
2058 toggle_nullability_strat,
2059 change_type_strat,
2060 )
2061 .prop_map(|(adds, drops, toggles, changes)| {
2062 adds.into_iter()
2063 .chain(drops)
2064 .chain(toggles)
2065 .chain(changes)
2066 .collect::<Vec<_>>()
2067 })
2068 .prop_shuffle()
2069 .boxed()
2070}
2071
2072#[cfg(test)]
2073mod tests {
2074 use super::*;
2075 use prost::Message;
2076
    // Walks a two-column relation through one `add_column` and one `drop_column`, checking
    // after each step what `at_version` returns for the new version and that the views of
    // earlier versions are unaffected.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn smoktest_at_version() {
        let desc = RelationDesc::builder()
            .with_column("a", SqlScalarType::Bool.nullable(true))
            .with_column("z", SqlScalarType::String.nullable(false))
            .finish();

        let mut versioned_desc = VersionedRelationDesc {
            inner: desc.clone(),
        };
        versioned_desc.validate();

        // With no changes applied yet, every selector yields the original description, even a
        // version number that was never created.
        let latest = versioned_desc.at_version(RelationVersionSelector::Latest);
        assert_eq!(desc, latest);

        let v0 = versioned_desc.at_version(RelationVersionSelector::specific(0));
        assert_eq!(desc, v0);

        let v3 = versioned_desc.at_version(RelationVersionSelector::specific(3));
        assert_eq!(desc, v3);

        // Adding a column creates version 1.
        let v1 = versioned_desc.add_column("b", SqlScalarType::Bytes.nullable(false));
        assert_eq!(v1, RelationVersion(1));

        let v1 = versioned_desc.at_version(RelationVersionSelector::Specific(v1));
        insta::assert_json_snapshot!(v1.metadata, @r###"
        {
          "0": {
            "name": "a",
            "typ_idx": 0,
            "added": 0,
            "dropped": null
          },
          "1": {
            "name": "z",
            "typ_idx": 1,
            "added": 0,
            "dropped": null
          },
          "2": {
            "name": "b",
            "typ_idx": 2,
            "added": 1,
            "dropped": null
          }
        }
        "###);

        // Version 0 must still yield the same columns as before the addition.
        let v0_b = versioned_desc.at_version(RelationVersionSelector::specific(0));
        assert!(v0.iter().eq(v0_b.iter()));

        // Dropping a column creates version 2.
        let v2 = versioned_desc.drop_column("z");
        assert_eq!(v2, RelationVersion(2));

        // At version 2 the dropped column is gone and "b" moves to type position 1, while
        // keeping its column index of 2.
        let v2 = versioned_desc.at_version(RelationVersionSelector::Specific(v2));
        insta::assert_json_snapshot!(v2.metadata, @r###"
        {
          "0": {
            "name": "a",
            "typ_idx": 0,
            "added": 0,
            "dropped": null
          },
          "2": {
            "name": "b",
            "typ_idx": 1,
            "added": 1,
            "dropped": null
          }
        }
        "###);

        // Earlier versions must still yield the same columns as before the drop.
        let v0_c = versioned_desc.at_version(RelationVersionSelector::specific(0));
        assert!(v0.iter().eq(v0_c.iter()));

        let v1_b = versioned_desc.at_version(RelationVersionSelector::specific(1));
        assert!(v1.iter().eq(v1_b.iter()));

        // The underlying description keeps all three columns, with "z" marked as dropped.
        insta::assert_json_snapshot!(versioned_desc.inner.metadata, @r###"
        {
          "0": {
            "name": "a",
            "typ_idx": 0,
            "added": 0,
            "dropped": null
          },
          "1": {
            "name": "z",
            "typ_idx": 1,
            "added": 0,
            "dropped": 2
          },
          "2": {
            "name": "b",
            "typ_idx": 2,
            "added": 1,
            "dropped": null
          }
        }
        "###);
    }
2181
    // Drops a column that sits in front of a key column and checks that the key is renumbered
    // in the version after the drop, while the version before the drop keeps the original key.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn test_dropping_columns_with_keys() {
        let desc = RelationDesc::builder()
            .with_column("a", SqlScalarType::Bool.nullable(true))
            .with_column("z", SqlScalarType::String.nullable(false))
            .with_key(vec![1])
            .finish();

        let mut versioned_desc = VersionedRelationDesc {
            inner: desc.clone(),
        };
        versioned_desc.validate();

        let v1 = versioned_desc.drop_column("a");
        assert_eq!(v1, RelationVersion(1));

        // After the drop only "z" remains, so the key now points at type position 0.
        let v1 = versioned_desc.at_version(RelationVersionSelector::Specific(v1));
        insta::assert_json_snapshot!(v1, @r###"
        {
          "typ": {
            "column_types": [
              {
                "scalar_type": "String",
                "nullable": false
              }
            ],
            "keys": [
              [
                0
              ]
            ]
          },
          "metadata": {
            "1": {
              "name": "z",
              "typ_idx": 0,
              "added": 0,
              "dropped": null
            }
          }
        }
        "###);

        // Version 0 still contains both columns and the key at its original position; the
        // metadata of "a" records the version at which it was later dropped.
        let v0 = versioned_desc.at_version(RelationVersionSelector::specific(0));
        insta::assert_json_snapshot!(v0, @r###"
        {
          "typ": {
            "column_types": [
              {
                "scalar_type": "Bool",
                "nullable": true
              },
              {
                "scalar_type": "String",
                "nullable": false
              }
            ],
            "keys": [
              [
                1
              ]
            ]
          },
          "metadata": {
            "0": {
              "name": "a",
              "typ_idx": 0,
              "added": 0,
              "dropped": 1
            },
            "1": {
              "name": "z",
              "typ_idx": 1,
              "added": 0,
              "dropped": null
            }
          }
        }
        "###);
    }
2265
    // Converts a `ProtoRelationDesc` that carries column names but an empty `metadata` list
    // into a `RelationDesc`, and checks that the result has metadata for both columns.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn roundtrip_relation_desc_without_metadata() {
        let typ = ProtoRelationType {
            column_types: vec![
                SqlScalarType::String.nullable(false).into_proto(),
                SqlScalarType::Bool.nullable(true).into_proto(),
            ],
            keys: vec![],
        };
        let proto = ProtoRelationDesc {
            typ: Some(typ),
            names: vec![
                ColumnName("a".into()).into_proto(),
                ColumnName("b".into()).into_proto(),
            ],
            metadata: vec![],
        };
        let desc: RelationDesc = proto.into_rust().unwrap();

        // Each name gets metadata at its position, added at version 0 and never dropped.
        insta::assert_json_snapshot!(desc, @r###"
        {
          "typ": {
            "column_types": [
              {
                "scalar_type": "String",
                "nullable": false
              },
              {
                "scalar_type": "Bool",
                "nullable": true
              }
            ],
            "keys": []
          },
          "metadata": {
            "0": {
              "name": "a",
              "typ_idx": 0,
              "added": 0,
              "dropped": null
            },
            "1": {
              "name": "b",
              "typ_idx": 1,
              "added": 0,
              "dropped": null
            }
          }
        }
        "###);
    }
2318
2319 #[mz_ore::test]
2320 #[should_panic(expected = "column named 'a' already exists!")]
2321 fn test_add_column_with_same_name_panics() {
2322 let desc = RelationDesc::builder()
2323 .with_column("a", SqlScalarType::Bool.nullable(true))
2324 .finish();
2325 let mut versioned = VersionedRelationDesc::new(desc);
2326
2327 let _ = versioned.add_column("a", SqlScalarType::String.nullable(false));
2328 }
2329
    // A column name can be reused once the earlier column with that name has been dropped.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn test_add_column_with_same_name_prev_dropped() {
        let desc = RelationDesc::builder()
            .with_column("a", SqlScalarType::Bool.nullable(true))
            .finish();
        let mut versioned = VersionedRelationDesc::new(desc);

        // Dropping the only column leaves an empty relation at that version.
        let v1 = versioned.drop_column("a");
        let v1 = versioned.at_version(RelationVersionSelector::Specific(v1));
        insta::assert_json_snapshot!(v1, @r###"
        {
          "typ": {
            "column_types": [],
            "keys": []
          },
          "metadata": {}
        }
        "###);

        // The re-added "a" is a new column: it gets column index 1 and is the only column
        // visible at the new version.
        let v2 = versioned.add_column("a", SqlScalarType::String.nullable(false));
        let v2 = versioned.at_version(RelationVersionSelector::Specific(v2));
        insta::assert_json_snapshot!(v2, @r###"
        {
          "typ": {
            "column_types": [
              {
                "scalar_type": "String",
                "nullable": false
              }
            ],
            "keys": []
          },
          "metadata": {
            "1": {
              "name": "a",
              "typ_idx": 0,
              "added": 2,
              "dropped": null
            }
          }
        }
        "###);
    }
2374
2375 #[mz_ore::test]
2376 #[cfg_attr(miri, ignore)]
2377 fn apply_demand() {
2378 let desc = RelationDesc::builder()
2379 .with_column("a", SqlScalarType::String.nullable(true))
2380 .with_column("b", SqlScalarType::Int64.nullable(false))
2381 .with_column("c", SqlScalarType::Time.nullable(false))
2382 .finish();
2383 let desc = desc.apply_demand(&BTreeSet::from([0, 2]));
2384 assert_eq!(desc.arity(), 2);
2385 VersionedRelationDesc::new(desc).validate();
2387 }
2388
2389 #[mz_ore::test]
2390 #[cfg_attr(miri, ignore)]
2391 fn smoketest_column_index_stable_ident() {
2392 let idx_a = ColumnIndex(42);
2393 assert_eq!(idx_a.to_stable_name(), "42");
2395 }
2396
2397 #[mz_ore::test]
2398 #[cfg_attr(miri, ignore)] fn proptest_relation_desc_roundtrips() {
2400 fn testcase(og: RelationDesc) {
2401 let bytes = og.into_proto().encode_to_vec();
2402 let proto = ProtoRelationDesc::decode(&bytes[..]).unwrap();
2403 let rnd = RelationDesc::from_proto(proto).unwrap();
2404
2405 assert_eq!(og, rnd);
2406 }
2407
2408 proptest!(|(desc in any::<RelationDesc>())| {
2409 testcase(desc);
2410 });
2411
2412 let strat = any::<RelationDesc>().prop_flat_map(|desc| {
2413 arb_relation_desc_diff(&desc).prop_map(move |diffs| (desc.clone(), diffs))
2414 });
2415
2416 proptest!(|((mut desc, diffs) in strat)| {
2417 for diff in diffs {
2418 diff.apply(&mut desc);
2419 };
2420 testcase(desc);
2421 });
2422 }
2423}