1use std::collections::{BTreeMap, BTreeSet};
11use std::rc::Rc;
12use std::{fmt, vec};
13
14use anyhow::bail;
15use itertools::Itertools;
16use mz_lowertest::MzReflect;
17use mz_ore::cast::CastFrom;
18use mz_ore::soft_panic_or_log;
19use mz_ore::str::StrExt;
20use mz_ore::{assert_none, assert_ok};
21use mz_persist_types::schema::SchemaId;
22use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
23#[cfg(any(test, feature = "proptest"))]
24use proptest::prelude::*;
25#[cfg(any(test, feature = "proptest"))]
26use proptest::strategy::{Strategy, Union};
27#[cfg(any(test, feature = "proptest"))]
28use proptest_derive::Arbitrary;
29use serde::{Deserialize, Serialize};
30
31#[cfg(any(test, feature = "proptest"))]
32use crate::arb_datum_for_column;
33use crate::relation_and_scalar::proto_relation_type::ProtoKey;
34pub use crate::relation_and_scalar::{
35 ProtoColumnMetadata, ProtoColumnName, ProtoColumnType, ProtoRelationDesc, ProtoRelationType,
36 ProtoRelationVersion,
37};
38use crate::{Datum, ReprScalarType, Row, SqlScalarType};
39
40#[derive(
48 Clone,
49 Debug,
50 Eq,
51 PartialEq,
52 Ord,
53 PartialOrd,
54 Serialize,
55 Deserialize,
56 Hash,
57 MzReflect
58)]
59#[cfg_attr(any(test, feature = "proptest"), derive(Arbitrary))]
60pub struct SqlColumnType {
61 pub scalar_type: SqlScalarType,
63 #[serde(default = "return_true")]
65 pub nullable: bool,
66}
67
/// Always yields `true`.
///
/// Referenced by name from `#[serde(default = "return_true")]` attributes in this file, which
/// need a function (not a literal) to supply a default value.
#[inline(always)]
fn return_true() -> bool {
    true
}
77
78impl SqlColumnType {
79 pub fn try_union_many<'a>(
83 typs: impl IntoIterator<Item = &'a Self>,
84 ) -> Result<Self, anyhow::Error> {
85 let mut iter = typs.into_iter();
86 let Some(typ) = iter.next() else {
87 bail!("Cannot union empty iterator");
88 };
89 iter.try_fold(typ.clone(), |a, b| a.try_union(b))
90 }
91
92 pub fn union_many<'a>(typs: impl IntoIterator<Item = &'a Self>) -> Self {
97 Self::try_union_many(typs).expect("Cannot union empty iterator")
98 }
99
100 pub fn backport_nullability(&mut self, backport_typ: &ReprColumnType) {
104 self.scalar_type
105 .backport_nullability(&backport_typ.scalar_type);
106 self.nullable = backport_typ.nullable;
107 }
108
109 pub fn sql_union(&self, other: &Self) -> Result<Self, anyhow::Error> {
122 match (&self.scalar_type, &other.scalar_type) {
123 (scalar_type, other_scalar_type) if scalar_type == other_scalar_type => {
124 Ok(SqlColumnType {
125 scalar_type: scalar_type.clone(),
126 nullable: self.nullable || other.nullable,
127 })
128 }
129 (scalar_type, other_scalar_type) if scalar_type.base_eq(other_scalar_type) => {
130 Ok(SqlColumnType {
131 scalar_type: scalar_type.without_modifiers(),
132 nullable: self.nullable || other.nullable,
133 })
134 }
135 (
136 SqlScalarType::Record { fields, custom_id },
137 SqlScalarType::Record {
138 fields: other_fields,
139 custom_id: other_custom_id,
140 },
141 ) => {
142 if custom_id != other_custom_id {
143 bail!(
144 "Can't union types: {:?} and {:?}",
145 self.scalar_type,
146 other.scalar_type
147 );
148 };
149
150 if fields.len() != other_fields.len() {
151 bail!(
152 "Can't union types: {:?} and {:?}",
153 self.scalar_type,
154 other.scalar_type
155 );
156 }
157 let mut union_fields = Vec::with_capacity(fields.len());
158 for ((name, typ), (other_name, other_typ)) in
159 fields.iter().zip_eq(other_fields.iter())
160 {
161 if name != other_name {
162 bail!(
163 "Can't union types: {:?} and {:?}",
164 self.scalar_type,
165 other.scalar_type
166 );
167 } else {
168 let union_column_type = typ.sql_union(other_typ)?;
169 union_fields.push((name.clone(), union_column_type));
170 };
171 }
172
173 Ok(SqlColumnType {
174 scalar_type: SqlScalarType::Record {
175 fields: union_fields.into(),
176 custom_id: *custom_id,
177 },
178 nullable: self.nullable || other.nullable,
179 })
180 }
181 _ => bail!(
182 "Can't union types: {:?} and {:?}",
183 self.scalar_type,
184 other.scalar_type
185 ),
186 }
187 }
188
189 pub fn try_union(&self, other: &Self) -> Result<Self, anyhow::Error> {
199 self.sql_union(other).or_else(|e| {
200 let repr_self = ReprColumnType::from(self);
201 let repr_other = ReprColumnType::from(other);
202 match repr_self.union(&repr_other) {
203 Ok(typ) => {
204 soft_panic_or_log!("repr type error: sql_union({self:?}, {other:?}): {e}");
207 Ok(SqlColumnType::from_repr(&typ))
208 }
209 Err(_) => {
210 Err(e)
213 }
214 }
215 })
216 }
217
218 pub fn union(&self, other: &Self) -> Self {
223 self.try_union(other).unwrap_or_else(|e| {
224 panic!("repr type error: after sql_union({self:?}, {other:?}) error: {e}")
225 })
226 }
227
228 pub fn nullable(mut self, nullable: bool) -> Self {
231 self.nullable = nullable;
232 self
233 }
234}
235
236impl RustType<ProtoColumnType> for SqlColumnType {
237 fn into_proto(&self) -> ProtoColumnType {
238 ProtoColumnType {
239 nullable: self.nullable,
240 scalar_type: Some(self.scalar_type.into_proto()),
241 }
242 }
243
244 fn from_proto(proto: ProtoColumnType) -> Result<Self, TryFromProtoError> {
245 Ok(SqlColumnType {
246 nullable: proto.nullable,
247 scalar_type: proto
248 .scalar_type
249 .into_rust_if_some("ProtoColumnType::scalar_type")?,
250 })
251 }
252}
253
254impl fmt::Display for SqlColumnType {
255 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
256 let nullable = if self.nullable { "Null" } else { "NotNull" };
257 f.write_fmt(format_args!("{:?}:{}", self.scalar_type, nullable))
258 }
259}
260
261#[derive(
263 Clone,
264 Debug,
265 Eq,
266 PartialEq,
267 Ord,
268 PartialOrd,
269 Serialize,
270 Deserialize,
271 Hash,
272 MzReflect
273)]
274#[cfg_attr(any(test, feature = "proptest"), derive(Arbitrary))]
275pub struct SqlRelationType {
276 pub column_types: Vec<SqlColumnType>,
278 #[serde(default)]
288 pub keys: Vec<Vec<usize>>,
289}
290
291impl SqlRelationType {
292 pub fn empty() -> Self {
295 SqlRelationType::new(vec![])
296 }
297
298 pub fn new(column_types: Vec<SqlColumnType>) -> Self {
302 SqlRelationType {
303 column_types,
304 keys: Vec::new(),
305 }
306 }
307
308 pub fn with_key(mut self, mut indices: Vec<usize>) -> Self {
310 indices.sort_unstable();
311 if !self.keys.contains(&indices) {
312 self.keys.push(indices);
313 }
314 self
315 }
316
317 pub fn with_keys(mut self, keys: Vec<Vec<usize>>) -> Self {
318 for key in keys {
319 self = self.with_key(key)
320 }
321 self
322 }
323
324 pub fn arity(&self) -> usize {
326 self.column_types.len()
327 }
328
329 pub fn default_key(&self) -> Vec<usize> {
331 if let Some(key) = self.keys.first() {
332 if key.is_empty() {
333 (0..self.column_types.len()).collect()
334 } else {
335 key.clone()
336 }
337 } else {
338 (0..self.column_types.len()).collect()
339 }
340 }
341
342 pub fn columns(&self) -> &[SqlColumnType] {
344 &self.column_types
345 }
346
347 pub fn backport_nullability_and_keys(&mut self, backport_typ: &ReprRelationType) {
351 assert_eq!(
352 backport_typ.column_types.len(),
353 self.column_types.len(),
354 "HIR and MIR types should have the same number of columns"
355 );
356 for (backport_col, sql_col) in backport_typ
357 .column_types
358 .iter()
359 .zip_eq(self.column_types.iter_mut())
360 {
361 sql_col.backport_nullability(backport_col);
362 }
363
364 self.keys = backport_typ.keys.clone();
365 }
366
367 pub fn from_repr(repr: &ReprRelationType) -> Self {
371 SqlRelationType {
372 column_types: repr
373 .column_types
374 .iter()
375 .map(SqlColumnType::from_repr)
376 .collect(),
377 keys: repr.keys.clone(),
378 }
379 }
380}
381
382impl RustType<ProtoRelationType> for SqlRelationType {
383 fn into_proto(&self) -> ProtoRelationType {
384 ProtoRelationType {
385 column_types: self.column_types.into_proto(),
386 keys: self.keys.into_proto(),
387 }
388 }
389
390 fn from_proto(proto: ProtoRelationType) -> Result<Self, TryFromProtoError> {
391 Ok(SqlRelationType {
392 column_types: proto.column_types.into_rust()?,
393 keys: proto.keys.into_rust()?,
394 })
395 }
396}
397
398impl RustType<ProtoKey> for Vec<usize> {
399 fn into_proto(&self) -> ProtoKey {
400 ProtoKey {
401 keys: self.into_proto(),
402 }
403 }
404
405 fn from_proto(proto: ProtoKey) -> Result<Self, TryFromProtoError> {
406 proto.keys.into_rust()
407 }
408}
409
410#[derive(
412 Clone,
413 Debug,
414 Eq,
415 PartialEq,
416 Ord,
417 PartialOrd,
418 Serialize,
419 Deserialize,
420 Hash,
421 MzReflect
422)]
423pub struct ReprRelationType {
424 pub column_types: Vec<ReprColumnType>,
426 #[serde(default)]
436 pub keys: Vec<Vec<usize>>,
437}
438
439impl ReprRelationType {
440 pub fn empty() -> Self {
443 ReprRelationType::new(vec![])
444 }
445
446 pub fn new(column_types: Vec<ReprColumnType>) -> Self {
450 ReprRelationType {
451 column_types,
452 keys: Vec::new(),
453 }
454 }
455
456 pub fn with_key(mut self, mut indices: Vec<usize>) -> Self {
458 indices.sort_unstable();
459 if !self.keys.contains(&indices) {
460 self.keys.push(indices);
461 }
462 self
463 }
464
465 pub fn with_keys(mut self, keys: Vec<Vec<usize>>) -> Self {
466 for key in keys {
467 self = self.with_key(key)
468 }
469 self
470 }
471
472 pub fn arity(&self) -> usize {
474 self.column_types.len()
475 }
476
477 pub fn default_key(&self) -> Vec<usize> {
479 if let Some(key) = self.keys.first() {
480 if key.is_empty() {
481 (0..self.column_types.len()).collect()
482 } else {
483 key.clone()
484 }
485 } else {
486 (0..self.column_types.len()).collect()
487 }
488 }
489
490 pub fn columns(&self) -> &[ReprColumnType] {
492 &self.column_types
493 }
494}
495
496impl From<&SqlRelationType> for ReprRelationType {
497 fn from(sql_relation_type: &SqlRelationType) -> Self {
498 ReprRelationType {
499 column_types: sql_relation_type
500 .column_types
501 .iter()
502 .map(ReprColumnType::from)
503 .collect(),
504 keys: sql_relation_type.keys.clone(),
505 }
506 }
507}
508
509#[derive(
510 Clone,
511 Debug,
512 Eq,
513 PartialEq,
514 Ord,
515 PartialOrd,
516 Serialize,
517 Deserialize,
518 Hash,
519 MzReflect
520)]
521pub struct ReprColumnType {
522 pub scalar_type: ReprScalarType,
524 #[serde(default = "return_true")]
526 pub nullable: bool,
527}
528
529impl std::fmt::Display for ReprColumnType {
530 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
531 write!(f, "{}", self.scalar_type)?;
532 if self.nullable {
533 write!(f, "?")?;
534 }
535 Ok(())
536 }
537}
538
539impl ReprColumnType {
540 pub fn union(&self, col: &ReprColumnType) -> Result<Self, anyhow::Error> {
547 let scalar_type = self.scalar_type.union(&col.scalar_type)?;
548 let nullable = self.nullable || col.nullable;
549
550 Ok(ReprColumnType {
551 scalar_type,
552 nullable,
553 })
554 }
555}
556
557impl From<&SqlColumnType> for ReprColumnType {
558 fn from(sql_column_type: &SqlColumnType) -> Self {
559 let scalar_type = &sql_column_type.scalar_type;
560 let scalar_type = scalar_type.into();
561 let nullable = sql_column_type.nullable;
562
563 ReprColumnType {
564 scalar_type,
565 nullable,
566 }
567 }
568}
569
570impl SqlColumnType {
571 pub fn from_repr(repr: &ReprColumnType) -> Self {
575 let scalar_type = &repr.scalar_type;
576 let scalar_type = SqlScalarType::from_repr(scalar_type);
577 let nullable = repr.nullable;
578
579 SqlColumnType {
580 scalar_type,
581 nullable,
582 }
583 }
584}
585
586#[derive(
588 Clone,
589 Debug,
590 Eq,
591 PartialEq,
592 Ord,
593 PartialOrd,
594 Serialize,
595 Deserialize,
596 Hash,
597 MzReflect
598)]
599pub struct ColumnName(Box<str>);
600
601impl ColumnName {
602 #[inline(always)]
604 pub fn as_str(&self) -> &str {
605 &*self
606 }
607
608 pub fn as_mut_boxed_str(&mut self) -> &mut Box<str> {
610 &mut self.0
611 }
612
613 pub fn is_similar(&self, other: &ColumnName) -> bool {
615 const SIMILARITY_THRESHOLD: f64 = 0.6;
616
617 let a_lowercase = self.to_lowercase();
618 let b_lowercase = other.to_lowercase();
619
620 strsim::normalized_levenshtein(&a_lowercase, &b_lowercase) >= SIMILARITY_THRESHOLD
621 }
622}
623
624impl std::ops::Deref for ColumnName {
625 type Target = str;
626
627 #[inline(always)]
628 fn deref(&self) -> &Self::Target {
629 &self.0
630 }
631}
632
633impl fmt::Display for ColumnName {
634 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
635 f.write_str(&self.0)
636 }
637}
638
639impl From<String> for ColumnName {
640 fn from(s: String) -> ColumnName {
641 ColumnName(s.into())
642 }
643}
644
645impl From<&str> for ColumnName {
646 fn from(s: &str) -> ColumnName {
647 ColumnName(s.into())
648 }
649}
650
651impl From<&ColumnName> for ColumnName {
652 fn from(n: &ColumnName) -> ColumnName {
653 n.clone()
654 }
655}
656
657impl RustType<ProtoColumnName> for ColumnName {
658 fn into_proto(&self) -> ProtoColumnName {
659 ProtoColumnName {
660 value: Some(self.0.to_string()),
661 }
662 }
663
664 fn from_proto(proto: ProtoColumnName) -> Result<Self, TryFromProtoError> {
665 Ok(ColumnName(
666 proto
667 .value
668 .ok_or_else(|| TryFromProtoError::missing_field("ProtoColumnName::value"))?
669 .into(),
670 ))
671 }
672}
673
674impl From<ColumnName> for mz_sql_parser::ast::Ident {
675 fn from(value: ColumnName) -> Self {
676 mz_sql_parser::ast::Ident::new_unchecked(value.0)
678 }
679}
680
/// Property-testing support: generates column names of weighted-random length made mostly of
/// characters in the range `'A'..='z'`, with an occasional fully arbitrary `char`.
#[cfg(any(test, feature = "proptest"))]
impl proptest::arbitrary::Arbitrary for ColumnName {
    type Parameters = ();
    type Strategy = BoxedStrategy<ColumnName>;

    fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy {
        // Short names dominate; longer ranges are only enabled when the
        // `PROPTEST_LARGE_DATA` environment variable is set.
        let mut length_weights = vec![(50, Just(1..8)), (20, Just(8..16))];
        let large_data = std::env::var("PROPTEST_LARGE_DATA").is_ok();
        if large_data {
            length_weights.extend([
                (5, Just(16..128)),
                (1, Just(128..1024)),
                (1, Just(1024..4096)),
            ]);
        }
        let length_strategy = Union::new_weighted(length_weights);

        // Shared behind an `Rc` because the flat-map closure below may be called repeatedly
        // and needs its own handle to the strategy each time.
        let char_strategy = Rc::new(Union::new_weighted(vec![
            (50, proptest::char::range('A', 'z').boxed()),
            (1, any::<char>().boxed()),
        ]));

        length_strategy
            .prop_flat_map(move |len| proptest::collection::vec(Rc::clone(&char_strategy), len))
            .prop_map(|chars| {
                let name: Box<str> = chars.into_iter().collect();
                ColumnName(name)
            })
            .no_shrink()
            .boxed()
    }
}
713
/// The placeholder column name `?column?`.
// NOTE(review): presumably used for columns whose name cannot be determined — confirm against
// the callers, which are outside this file.
pub const UNKNOWN_COLUMN_NAME: &str = "?column?";
716
717#[derive(
719 Clone,
720 Copy,
721 Debug,
722 Eq,
723 PartialEq,
724 PartialOrd,
725 Ord,
726 Serialize,
727 Deserialize,
728 Hash,
729 MzReflect
730)]
731pub struct ColumnIndex(usize);
732
733#[cfg(any(test, feature = "proptest"))]
734static_assertions::assert_not_impl_all!(ColumnIndex: Arbitrary);
735
736impl ColumnIndex {
737 pub fn to_stable_name(&self) -> String {
739 self.0.to_string()
740 }
741
742 pub fn to_raw(&self) -> usize {
743 self.0
744 }
745
746 pub fn from_raw(val: usize) -> Self {
747 ColumnIndex(val)
748 }
749}
750
751#[derive(
753 Clone,
754 Copy,
755 Debug,
756 Eq,
757 PartialEq,
758 PartialOrd,
759 Ord,
760 Serialize,
761 Deserialize,
762 Hash,
763 MzReflect
764)]
765#[cfg_attr(any(test, feature = "proptest"), derive(Arbitrary))]
766pub struct RelationVersion(u64);
767
768impl RelationVersion {
769 pub fn root() -> Self {
771 RelationVersion(0)
772 }
773
774 pub fn bump(&self) -> Self {
776 let next_version = self
777 .0
778 .checked_add(1)
779 .expect("added more than u64::MAX columns?");
780 RelationVersion(next_version)
781 }
782
783 pub fn into_raw(self) -> u64 {
787 self.0
788 }
789
790 pub fn from_raw(val: u64) -> RelationVersion {
794 RelationVersion(val)
795 }
796}
797
798impl From<RelationVersion> for SchemaId {
799 fn from(value: RelationVersion) -> Self {
800 SchemaId(usize::cast_from(value.0))
801 }
802}
803
804impl From<mz_sql_parser::ast::Version> for RelationVersion {
805 fn from(value: mz_sql_parser::ast::Version) -> Self {
806 RelationVersion(value.into_inner())
807 }
808}
809
810impl fmt::Display for RelationVersion {
811 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
812 write!(f, "v{}", self.0)
813 }
814}
815
816impl From<RelationVersion> for mz_sql_parser::ast::Version {
817 fn from(value: RelationVersion) -> Self {
818 mz_sql_parser::ast::Version::new(value.0)
819 }
820}
821
822impl RustType<ProtoRelationVersion> for RelationVersion {
823 fn into_proto(&self) -> ProtoRelationVersion {
824 ProtoRelationVersion { value: self.0 }
825 }
826
827 fn from_proto(proto: ProtoRelationVersion) -> Result<Self, TryFromProtoError> {
828 Ok(RelationVersion(proto.value))
829 }
830}
831
832#[derive(
839 Clone,
840 Copy,
841 Debug,
842 PartialEq,
843 Eq,
844 PartialOrd,
845 Ord,
846 Hash,
847 serde::Serialize
848)]
849pub enum SemanticType {
850 CatalogItemId,
851 GlobalId,
852 ClusterId,
853 ReplicaId,
854 SchemaId,
855 DatabaseId,
856 RoleId,
857 NetworkPolicyId,
858 ShardId,
859 OID,
860 ObjectType,
861 ConnectionType,
862 SourceType,
863 MzTimestamp,
864 WallclockTimestamp,
865 ByteCount,
866 RecordCount,
867 CreditRate,
868 SqlDefinition,
869 RedactedSqlDefinition,
870}
871
872impl fmt::Display for SemanticType {
873 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
874 let s = match self {
875 SemanticType::CatalogItemId => "CatalogItemId",
876 SemanticType::GlobalId => "GlobalId",
877 SemanticType::ClusterId => "ClusterId",
878 SemanticType::ReplicaId => "ReplicaId",
879 SemanticType::SchemaId => "SchemaId",
880 SemanticType::DatabaseId => "DatabaseId",
881 SemanticType::RoleId => "RoleId",
882 SemanticType::NetworkPolicyId => "NetworkPolicyId",
883 SemanticType::ShardId => "ShardId",
884 SemanticType::OID => "OID",
885 SemanticType::ObjectType => "ObjectType",
886 SemanticType::ConnectionType => "ConnectionType",
887 SemanticType::SourceType => "SourceType",
888 SemanticType::MzTimestamp => "MzTimestamp",
889 SemanticType::WallclockTimestamp => "WallclockTimestamp",
890 SemanticType::ByteCount => "ByteCount",
891 SemanticType::RecordCount => "RecordCount",
892 SemanticType::CreditRate => "CreditRate",
893 SemanticType::SqlDefinition => "SqlDefinition",
894 SemanticType::RedactedSqlDefinition => "RedactedSqlDefinition",
895 };
896 f.write_str(s)
897 }
898}
899
900#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash, MzReflect)]
902struct ColumnMetadata {
903 name: ColumnName,
905 typ_idx: usize,
907 added: RelationVersion,
909 dropped: Option<RelationVersion>,
911}
912
913#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, MzReflect)]
981pub struct RelationDesc {
982 typ: SqlRelationType,
983 metadata: BTreeMap<ColumnIndex, ColumnMetadata>,
984}
985
986impl RustType<ProtoRelationDesc> for RelationDesc {
987 fn into_proto(&self) -> ProtoRelationDesc {
988 let (names, metadata): (Vec<_>, Vec<_>) = self
989 .metadata
990 .values()
991 .map(|meta| {
992 let metadata = ProtoColumnMetadata {
993 added: Some(meta.added.into_proto()),
994 dropped: meta.dropped.map(|v| v.into_proto()),
995 };
996 (meta.name.into_proto(), metadata)
997 })
998 .unzip();
999
1000 let is_all_default_metadata = metadata.iter().all(|meta| {
1006 meta.added == Some(RelationVersion::root().into_proto()) && meta.dropped == None
1007 });
1008 let metadata = if is_all_default_metadata {
1009 Vec::new()
1010 } else {
1011 metadata
1012 };
1013
1014 ProtoRelationDesc {
1015 typ: Some(self.typ.into_proto()),
1016 names,
1017 metadata,
1018 }
1019 }
1020
1021 fn from_proto(proto: ProtoRelationDesc) -> Result<Self, TryFromProtoError> {
1022 let proto_metadata: Box<dyn Iterator<Item = _>> = if proto.metadata.is_empty() {
1028 let val = ProtoColumnMetadata {
1029 added: Some(RelationVersion::root().into_proto()),
1030 dropped: None,
1031 };
1032 Box::new(itertools::repeat_n(val, proto.names.len()))
1033 } else {
1034 Box::new(proto.metadata.into_iter())
1035 };
1036
1037 let metadata = proto
1038 .names
1039 .into_iter()
1040 .zip_eq(proto_metadata)
1041 .enumerate()
1042 .map(|(idx, (name, metadata))| {
1043 let meta = ColumnMetadata {
1044 name: name.into_rust()?,
1045 typ_idx: idx,
1046 added: metadata.added.into_rust_if_some("ColumnMetadata::added")?,
1047 dropped: metadata.dropped.into_rust()?,
1048 };
1049 Ok::<_, TryFromProtoError>((ColumnIndex(idx), meta))
1050 })
1051 .collect::<Result<_, _>>()?;
1052
1053 Ok(RelationDesc {
1054 typ: proto.typ.into_rust_if_some("ProtoRelationDesc::typ")?,
1055 metadata,
1056 })
1057 }
1058}
1059
1060impl RelationDesc {
1061 pub fn builder() -> RelationDescBuilder {
1063 RelationDescBuilder::default()
1064 }
1065
1066 pub fn empty() -> Self {
1069 RelationDesc {
1070 typ: SqlRelationType::empty(),
1071 metadata: BTreeMap::default(),
1072 }
1073 }
1074
1075 pub fn is_empty(&self) -> bool {
1077 self == &Self::empty()
1078 }
1079
1080 pub fn len(&self) -> usize {
1082 self.typ().column_types.len()
1083 }
1084
1085 pub fn new<I, N>(typ: SqlRelationType, names: I) -> Self
1093 where
1094 I: IntoIterator<Item = N>,
1095 N: Into<ColumnName>,
1096 {
1097 let metadata: BTreeMap<_, _> = names
1098 .into_iter()
1099 .enumerate()
1100 .map(|(idx, name)| {
1101 let col_idx = ColumnIndex(idx);
1102 let metadata = ColumnMetadata {
1103 name: name.into(),
1104 typ_idx: idx,
1105 added: RelationVersion::root(),
1106 dropped: None,
1107 };
1108 (col_idx, metadata)
1109 })
1110 .collect();
1111
1112 assert_eq!(typ.column_types.len(), metadata.len());
1114
1115 RelationDesc { typ, metadata }
1116 }
1117
1118 pub fn from_names_and_types<I, T, N>(iter: I) -> Self
1119 where
1120 I: IntoIterator<Item = (N, T)>,
1121 T: Into<SqlColumnType>,
1122 N: Into<ColumnName>,
1123 {
1124 let (names, types): (Vec<_>, Vec<_>) = iter.into_iter().unzip();
1125 let types = types.into_iter().map(Into::into).collect();
1126 let typ = SqlRelationType::new(types);
1127 Self::new(typ, names)
1128 }
1129
1130 pub fn concat(mut self, other: Self) -> Self {
1140 let self_len = self.typ.column_types.len();
1141
1142 for (typ, (_col_idx, meta)) in other.typ.column_types.into_iter().zip_eq(other.metadata) {
1143 assert_eq!(meta.added, RelationVersion::root());
1144 assert_none!(meta.dropped);
1145
1146 let new_idx = self.typ.columns().len();
1147 let new_meta = ColumnMetadata {
1148 name: meta.name,
1149 typ_idx: new_idx,
1150 added: RelationVersion::root(),
1151 dropped: None,
1152 };
1153
1154 self.typ.column_types.push(typ);
1155 let prev = self.metadata.insert(ColumnIndex(new_idx), new_meta);
1156
1157 assert_eq!(self.metadata.len(), self.typ.columns().len());
1158 assert_none!(prev);
1159 }
1160
1161 for k in other.typ.keys {
1162 let k = k.into_iter().map(|idx| idx + self_len).collect();
1163 self = self.with_key(k);
1164 }
1165 self
1166 }
1167
1168 pub fn with_key(mut self, indices: Vec<usize>) -> Self {
1170 self.typ = self.typ.with_key(indices);
1171 self
1172 }
1173
1174 pub fn without_keys(mut self) -> Self {
1176 self.typ.keys.clear();
1177 self
1178 }
1179
1180 pub fn with_names<I, N>(self, names: I) -> Self
1188 where
1189 I: IntoIterator<Item = N>,
1190 N: Into<ColumnName>,
1191 {
1192 Self::new(self.typ, names)
1193 }
1194
1195 pub fn arity(&self) -> usize {
1197 self.typ.arity()
1198 }
1199
1200 pub fn typ(&self) -> &SqlRelationType {
1202 &self.typ
1203 }
1204
1205 pub fn into_typ(self) -> SqlRelationType {
1207 self.typ
1208 }
1209
1210 pub fn iter(&self) -> impl Iterator<Item = (&ColumnName, &SqlColumnType)> {
1212 self.metadata.values().map(|meta| {
1213 let typ = &self.typ.columns()[meta.typ_idx];
1214 (&meta.name, typ)
1215 })
1216 }
1217
1218 pub fn iter_types(&self) -> impl Iterator<Item = &SqlColumnType> {
1220 self.typ.column_types.iter()
1221 }
1222
1223 pub fn iter_names(&self) -> impl Iterator<Item = &ColumnName> {
1225 self.metadata.values().map(|meta| &meta.name)
1226 }
1227
1228 pub fn iter_all(&self) -> impl Iterator<Item = (&ColumnIndex, &ColumnName, &SqlColumnType)> {
1230 self.metadata.iter().map(|(col_idx, metadata)| {
1231 let col_typ = &self.typ.columns()[metadata.typ_idx];
1232 (col_idx, &metadata.name, col_typ)
1233 })
1234 }
1235
1236 pub fn iter_similar_names<'a>(
1239 &'a self,
1240 name: &'a ColumnName,
1241 ) -> impl Iterator<Item = &'a ColumnName> {
1242 self.iter_names().filter(|n| n.is_similar(name))
1243 }
1244
1245 pub fn contains_index(&self, idx: &ColumnIndex) -> bool {
1247 self.metadata.contains_key(idx)
1248 }
1249
1250 pub fn get_by_name(&self, name: &ColumnName) -> Option<(usize, &SqlColumnType)> {
1256 self.iter_names()
1257 .position(|n| n == name)
1258 .map(|i| (i, &self.typ.column_types[i]))
1259 }
1260
1261 pub fn get_name(&self, i: usize) -> &ColumnName {
1269 self.get_name_idx(&ColumnIndex(i))
1271 }
1272
1273 pub fn get_name_idx(&self, idx: &ColumnIndex) -> &ColumnName {
1279 &self.metadata.get(idx).expect("should exist").name
1280 }
1281
1282 pub fn get_name_mut(&mut self, i: usize) -> &mut ColumnName {
1288 &mut self
1290 .metadata
1291 .get_mut(&ColumnIndex(i))
1292 .expect("should exist")
1293 .name
1294 }
1295
1296 pub fn get_type(&self, idx: &ColumnIndex) -> &SqlColumnType {
1302 let typ_idx = self.metadata.get(idx).expect("should exist").typ_idx;
1303 &self.typ.column_types[typ_idx]
1304 }
1305
1306 pub fn get_unambiguous_name(&self, i: usize) -> Option<&ColumnName> {
1315 let name = self.get_name(i);
1316 if self.iter_names().filter(|n| *n == name).count() == 1 {
1317 Some(name)
1318 } else {
1319 None
1320 }
1321 }
1322
1323 pub fn constraints_met(&self, i: usize, d: &Datum) -> Result<(), NotNullViolation> {
1328 let name = self.get_name(i);
1329 let typ = &self.typ.column_types[i];
1330 if d == &Datum::Null && !typ.nullable {
1331 Err(NotNullViolation(name.clone()))
1332 } else {
1333 Ok(())
1334 }
1335 }
1336
1337 pub fn diff(&self, other: &RelationDesc) -> RelationDescDiff {
1352 assert_eq!(self.metadata.len(), self.typ.columns().len());
1353 assert_eq!(other.metadata.len(), other.typ.columns().len());
1354 for (idx, meta) in self.metadata.iter().chain(other.metadata.iter()) {
1355 assert_eq!(meta.typ_idx, idx.0);
1356 assert_eq!(meta.added, RelationVersion::root());
1357 assert_none!(meta.dropped);
1358 }
1359
1360 let mut column_diffs = BTreeMap::new();
1361 let mut key_diff = None;
1362
1363 let left_arity = self.arity();
1364 let right_arity = other.arity();
1365 let common_arity = std::cmp::min(left_arity, right_arity);
1366
1367 for idx in 0..common_arity {
1368 let left_name = self.get_name(idx);
1369 let right_name = other.get_name(idx);
1370 let left_type = &self.typ.column_types[idx];
1371 let right_type = &other.typ.column_types[idx];
1372
1373 if left_name != right_name {
1374 let diff = ColumnDiff::NameMismatch {
1375 left: left_name.clone(),
1376 right: right_name.clone(),
1377 };
1378 column_diffs.insert(idx, diff);
1379 } else if left_type.scalar_type != right_type.scalar_type {
1380 let diff = ColumnDiff::TypeMismatch {
1381 name: left_name.clone(),
1382 left: left_type.scalar_type.clone(),
1383 right: right_type.scalar_type.clone(),
1384 };
1385 column_diffs.insert(idx, diff);
1386 } else if left_type.nullable != right_type.nullable {
1387 let diff = ColumnDiff::NullabilityMismatch {
1388 name: left_name.clone(),
1389 left: left_type.nullable,
1390 right: right_type.nullable,
1391 };
1392 column_diffs.insert(idx, diff);
1393 }
1394 }
1395
1396 for idx in common_arity..left_arity {
1397 let diff = ColumnDiff::Missing {
1398 name: self.get_name(idx).clone(),
1399 };
1400 column_diffs.insert(idx, diff);
1401 }
1402
1403 for idx in common_arity..right_arity {
1404 let diff = ColumnDiff::Extra {
1405 name: other.get_name(idx).clone(),
1406 };
1407 column_diffs.insert(idx, diff);
1408 }
1409
1410 let left_keys: BTreeSet<_> = self.typ.keys.iter().collect();
1411 let right_keys: BTreeSet<_> = other.typ.keys.iter().collect();
1412 if left_keys != right_keys {
1413 let column_names = |desc: &RelationDesc, keys: BTreeSet<&Vec<usize>>| {
1414 keys.iter()
1415 .map(|key| key.iter().map(|&idx| desc.get_name(idx).clone()).collect())
1416 .collect()
1417 };
1418 key_diff = Some(KeyDiff {
1419 left: column_names(self, left_keys),
1420 right: column_names(other, right_keys),
1421 });
1422 }
1423
1424 RelationDescDiff {
1425 column_diffs,
1426 key_diff,
1427 }
1428 }
1429
1430 pub fn apply_demand(&self, demands: &BTreeSet<usize>) -> RelationDesc {
1432 let mut new_desc = self.clone();
1433
1434 let mut removed = 0;
1436 new_desc.metadata.retain(|idx, metadata| {
1437 let retain = demands.contains(&idx.0);
1438 if !retain {
1439 removed += 1;
1440 } else {
1441 metadata.typ_idx -= removed;
1442 }
1443 retain
1444 });
1445
1446 let mut idx = 0;
1448 new_desc.typ.column_types.retain(|_| {
1449 let keep = demands.contains(&idx);
1450 idx += 1;
1451 keep
1452 });
1453
1454 new_desc
1455 }
1456}
1457
/// Property-testing support: generates a [`RelationDesc`] with a weighted-random number of
/// columns.
#[cfg(any(test, feature = "proptest"))]
impl Arbitrary for RelationDesc {
    type Parameters = ();
    type Strategy = BoxedStrategy<RelationDesc>;

    fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy {
        // Small relations dominate; wider ones are only enabled when the
        // `PROPTEST_LARGE_DATA` environment variable is set.
        let mut width_weights = vec![(100, Just(0..4)), (50, Just(4..8)), (25, Just(8..16))];
        let large_data = std::env::var("PROPTEST_LARGE_DATA").is_ok();
        if large_data {
            width_weights.extend([
                (12, Just(16..32)),
                (6, Just(32..64)),
                (3, Just(64..128)),
                (1, Just(128..256)),
            ]);
        }

        Union::new_weighted(width_weights)
            .prop_flat_map(arb_relation_desc)
            .boxed()
    }
}
1478
/// Returns a strategy that generates a [`RelationDesc`] whose column count falls in `num_cols`.
///
/// Columns are generated as a map from name to type, so the generated names are distinct.
#[cfg(any(test, feature = "proptest"))]
pub fn arb_relation_desc(num_cols: std::ops::Range<usize>) -> impl Strategy<Value = RelationDesc> {
    let columns =
        proptest::collection::btree_map(any::<ColumnName>(), any::<SqlColumnType>(), num_cols);
    columns.prop_map(RelationDesc::from_names_and_types)
}
1486
/// Returns a strategy that generates projections of `desc`: each column is independently kept
/// or dropped, and the kept set is applied with `RelationDesc::apply_demand`.
#[cfg(any(test, feature = "proptest"))]
pub fn arb_relation_desc_projection(desc: RelationDesc) -> impl Strategy<Value = RelationDesc> {
    // One boolean strategy per column.
    let keep_flags: Vec<_> = (0..desc.len()).map(|_| any::<bool>()).collect();
    keep_flags.prop_map(move |flags| {
        let demands: BTreeSet<_> = flags
            .into_iter()
            .enumerate()
            .filter(|(_, keep)| *keep)
            .map(|(idx, _)| idx)
            .collect();
        desc.apply_demand(&demands)
    })
}
1500
1501impl IntoIterator for RelationDesc {
1502 type Item = (ColumnName, SqlColumnType);
1503 type IntoIter = Box<dyn Iterator<Item = (ColumnName, SqlColumnType)>>;
1504
1505 fn into_iter(self) -> Self::IntoIter {
1506 let iter = self
1507 .metadata
1508 .into_values()
1509 .zip_eq(self.typ.column_types)
1510 .map(|(meta, typ)| (meta.name, typ));
1511 Box::new(iter)
1512 }
1513}
1514
/// Returns a strategy that generates a [`Row`] for `desc`, with one datum per column produced
/// by `arb_datum_for_column` for that column's type.
#[cfg(any(test, feature = "proptest"))]
pub fn arb_row_for_relation(desc: &RelationDesc) -> impl Strategy<Value = Row> + use<> {
    let per_column: Vec<_> = desc
        .typ()
        .columns()
        .iter()
        .map(|col| arb_datum_for_column(col.clone()))
        .collect();
    per_column.prop_map(|values| Row::pack(values.iter().map(Datum::from)))
}
1527
1528#[derive(Debug, PartialEq, Eq)]
1530pub struct NotNullViolation(pub ColumnName);
1531
1532impl fmt::Display for NotNullViolation {
1533 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1534 write!(
1535 f,
1536 "null value in column {} violates not-null constraint",
1537 self.0.quoted()
1538 )
1539 }
1540}
1541
1542#[derive(Debug, Clone, PartialEq, Eq)]
1544pub struct RelationDescDiff {
1545 pub column_diffs: BTreeMap<usize, ColumnDiff>,
1547 pub key_diff: Option<KeyDiff>,
1549}
1550
1551impl RelationDescDiff {
1552 pub fn is_empty(&self) -> bool {
1554 self.column_diffs.is_empty() && self.key_diff.is_none()
1555 }
1556}
1557
1558#[derive(Debug, Clone, PartialEq, Eq)]
1560pub enum ColumnDiff {
1561 Missing { name: ColumnName },
1563 Extra { name: ColumnName },
1565 TypeMismatch {
1567 name: ColumnName,
1568 left: SqlScalarType,
1569 right: SqlScalarType,
1570 },
1571 NullabilityMismatch {
1573 name: ColumnName,
1574 left: bool,
1575 right: bool,
1576 },
1577 NameMismatch { left: ColumnName, right: ColumnName },
1579}
1580
1581#[derive(Debug, Clone, PartialEq, Eq)]
1583pub struct KeyDiff {
1584 pub left: BTreeSet<Vec<ColumnName>>,
1586 pub right: BTreeSet<Vec<ColumnName>>,
1588}
1589
1590#[derive(Clone, Default, Debug, PartialEq, Eq)]
1592pub struct RelationDescBuilder {
1593 columns: Vec<(ColumnName, SqlColumnType)>,
1595 keys: Vec<Vec<usize>>,
1597}
1598
1599impl RelationDescBuilder {
1600 pub fn with_column<N: Into<ColumnName>>(
1602 mut self,
1603 name: N,
1604 ty: SqlColumnType,
1605 ) -> RelationDescBuilder {
1606 let name = name.into();
1607 self.columns.push((name, ty));
1608 self
1609 }
1610
1611 pub fn with_columns<I, T, N>(mut self, iter: I) -> Self
1613 where
1614 I: IntoIterator<Item = (N, T)>,
1615 T: Into<SqlColumnType>,
1616 N: Into<ColumnName>,
1617 {
1618 self.columns
1619 .extend(iter.into_iter().map(|(name, ty)| (name.into(), ty.into())));
1620 self
1621 }
1622
1623 pub fn with_key(mut self, mut indices: Vec<usize>) -> RelationDescBuilder {
1625 indices.sort_unstable();
1626 if !self.keys.contains(&indices) {
1627 self.keys.push(indices);
1628 }
1629 self
1630 }
1631
1632 pub fn without_keys(mut self) -> RelationDescBuilder {
1634 self.keys.clear();
1635 assert_eq!(self.keys.len(), 0);
1636 self
1637 }
1638
1639 pub fn concat(mut self, other: Self) -> Self {
1641 let self_len = self.columns.len();
1642
1643 self.columns.extend(other.columns);
1644 for k in other.keys {
1645 let k = k.into_iter().map(|idx| idx + self_len).collect();
1646 self = self.with_key(k);
1647 }
1648
1649 self
1650 }
1651
1652 pub fn finish(self) -> RelationDesc {
1654 let mut desc = RelationDesc::from_names_and_types(self.columns);
1655 desc.typ = desc.typ.with_keys(self.keys);
1656 desc
1657 }
1658}
1659
1660#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord, Serialize)]
1662pub enum RelationVersionSelector {
1663 Specific(RelationVersion),
1664 Latest,
1665}
1666
1667impl RelationVersionSelector {
1668 pub fn specific(version: u64) -> Self {
1669 RelationVersionSelector::Specific(RelationVersion(version))
1670 }
1671}
1672
1673#[derive(Debug, Clone, Serialize)]
1679pub struct VersionedRelationDesc {
1680 inner: RelationDesc,
1681}
1682
1683impl VersionedRelationDesc {
1684 pub fn new(inner: RelationDesc) -> Self {
1685 VersionedRelationDesc { inner }
1686 }
1687
1688 #[must_use]
1696 pub fn add_column<N, T>(&mut self, name: N, typ: T) -> RelationVersion
1697 where
1698 N: Into<ColumnName>,
1699 T: Into<SqlColumnType>,
1700 {
1701 let latest_version = self.latest_version();
1702 let new_version = latest_version.bump();
1703
1704 let name = name.into();
1705 let existing = self
1706 .inner
1707 .metadata
1708 .iter()
1709 .find(|(_, meta)| meta.name == name && meta.dropped.is_none());
1710 if let Some(existing) = existing {
1711 panic!("column named '{name}' already exists! {existing:?}");
1712 }
1713
1714 let next_idx = self.inner.metadata.len();
1715 let col_meta = ColumnMetadata {
1716 name,
1717 typ_idx: next_idx,
1718 added: new_version,
1719 dropped: None,
1720 };
1721
1722 self.inner.typ.column_types.push(typ.into());
1723 let prev = self.inner.metadata.insert(ColumnIndex(next_idx), col_meta);
1724
1725 assert_none!(prev, "column index overlap!");
1726 self.validate();
1727
1728 new_version
1729 }
1730
1731 #[must_use]
1740 pub fn drop_column<N>(&mut self, name: N) -> RelationVersion
1741 where
1742 N: Into<ColumnName>,
1743 {
1744 let name = name.into();
1745 let latest_version = self.latest_version();
1746 let new_version = latest_version.bump();
1747
1748 let col = self
1749 .inner
1750 .metadata
1751 .values_mut()
1752 .find(|meta| meta.name == name && meta.dropped.is_none())
1753 .expect("column to exist");
1754
1755 assert_none!(col.dropped, "column was already dropped");
1757 col.dropped = Some(new_version);
1758
1759 let dropped_key = self
1761 .inner
1762 .typ
1763 .keys
1764 .iter()
1765 .any(|keys| keys.contains(&col.typ_idx));
1766 assert!(!dropped_key, "column being dropped was used as a key");
1767
1768 self.validate();
1769 new_version
1770 }
1771
1772 pub fn latest(&self) -> RelationDesc {
1774 self.inner.clone()
1775 }
1776
1777 pub fn at_version(&self, version: RelationVersionSelector) -> RelationDesc {
1779 let up_to_version = match version {
1781 RelationVersionSelector::Latest => RelationVersion(u64::MAX),
1782 RelationVersionSelector::Specific(v) => v,
1783 };
1784
1785 let valid_columns = self.inner.metadata.iter().filter(|(_col_idx, meta)| {
1786 let added = meta.added <= up_to_version;
1787 let dropped = meta
1788 .dropped
1789 .map(|dropped_at| up_to_version >= dropped_at)
1790 .unwrap_or(false);
1791
1792 added && !dropped
1793 });
1794
1795 let mut column_types = Vec::new();
1796 let mut column_metas = BTreeMap::new();
1797
1798 for (col_idx, meta) in valid_columns {
1805 let new_meta = ColumnMetadata {
1806 name: meta.name.clone(),
1807 typ_idx: column_types.len(),
1808 added: meta.added.clone(),
1809 dropped: meta.dropped.clone(),
1810 };
1811 column_types.push(self.inner.typ.columns()[meta.typ_idx].clone());
1812 column_metas.insert(*col_idx, new_meta);
1813 }
1814
1815 let keys = self
1821 .inner
1822 .typ
1823 .keys
1824 .iter()
1825 .map(|keys| {
1826 keys.iter()
1827 .map(|key_idx| {
1828 let metadata = column_metas
1829 .get(&ColumnIndex(*key_idx))
1830 .expect("found key for column that doesn't exist");
1831 metadata.typ_idx
1832 })
1833 .collect()
1834 })
1835 .collect();
1836
1837 let relation_type = SqlRelationType { column_types, keys };
1838
1839 RelationDesc {
1840 typ: relation_type,
1841 metadata: column_metas,
1842 }
1843 }
1844
1845 pub fn latest_version(&self) -> RelationVersion {
1846 self.inner
1847 .metadata
1848 .values()
1849 .map(|meta| meta.dropped.unwrap_or(meta.added))
1851 .max()
1852 .unwrap_or_else(RelationVersion::root)
1854 }
1855
1856 fn validate(&self) {
1862 fn validate_inner(desc: &RelationDesc) -> Result<(), anyhow::Error> {
1863 if desc.typ.column_types.len() != desc.metadata.len() {
1864 anyhow::bail!("mismatch between number of types and metadatas");
1865 }
1866
1867 for (col_idx, meta) in &desc.metadata {
1868 if col_idx.0 > desc.metadata.len() {
1869 anyhow::bail!("column index out of bounds");
1870 }
1871 if meta.added >= meta.dropped.unwrap_or(RelationVersion(u64::MAX)) {
1872 anyhow::bail!("column was added after it was dropped?");
1873 }
1874 if desc.typ().columns().get(meta.typ_idx).is_none() {
1875 anyhow::bail!("typ_idx incorrect");
1876 }
1877 }
1878
1879 for keys in &desc.typ.keys {
1880 for key in keys {
1881 if *key >= desc.typ.column_types.len() {
1882 anyhow::bail!("key index was out of bounds!");
1883 }
1884 }
1885 }
1886
1887 let versions = desc
1888 .metadata
1889 .values()
1890 .map(|meta| meta.dropped.unwrap_or(meta.added));
1891 let mut max = 0;
1892 let mut sum = 0;
1893 for version in versions {
1894 max = std::cmp::max(max, version.0);
1895 sum += version.0;
1896 }
1897
1898 if sum != (max * (max + 1) / 2) {
1908 anyhow::bail!("there is a duplicate or missing relation version");
1909 }
1910
1911 Ok(())
1912 }
1913
1914 assert_ok!(validate_inner(&self.inner), "validate failed! {self:?}");
1915 }
1916}
1917
/// A single change that property tests can apply to a [`RelationDesc`], see
/// [`PropRelationDescDiff::apply`].
#[cfg(any(test, feature = "proptest"))]
#[derive(Debug)]
pub enum PropRelationDescDiff {
    /// Append a new column with the given name and type.
    AddColumn {
        name: ColumnName,
        typ: SqlColumnType,
    },
    /// Mark the column with the given name as dropped.
    DropColumn {
        name: ColumnName,
    },
    /// Flip the nullability of the column with the given name.
    ToggleNullability {
        name: ColumnName,
    },
    /// Replace the type of the column with the given name.
    ChangeType {
        name: ColumnName,
        typ: SqlColumnType,
    },
}
1938
#[cfg(any(test, feature = "proptest"))]
impl PropRelationDescDiff {
    /// Applies this change to `desc` in place.
    ///
    /// Changes that name a column `desc` does not know are silently ignored.
    ///
    /// # Panics
    ///
    /// * `AddColumn` panics if the index chosen for the new column is taken,
    ///   or if metadata and column types end up with different lengths.
    /// * `ToggleNullability` and `ChangeType` panic if the position found for
    ///   the column has no matching column type.
    pub fn apply(self, desc: &mut RelationDesc) {
        match self {
            Self::AddColumn { name, typ } => {
                // New columns are appended at the end and added at version 0.
                let typ_idx = desc.metadata.len();
                let prev = desc.metadata.insert(
                    ColumnIndex(typ_idx),
                    ColumnMetadata {
                        name,
                        typ_idx,
                        added: RelationVersion(0),
                        dropped: None,
                    },
                );
                desc.typ.column_types.push(typ);

                assert_none!(prev);
                assert_eq!(desc.metadata.len(), desc.typ.column_types.len());
            }
            Self::DropColumn { name } => {
                let latest_version = desc
                    .metadata
                    .values()
                    .map(|meta| meta.dropped.unwrap_or(meta.added))
                    .max()
                    .unwrap_or_else(RelationVersion::root);
                let next_version = latest_version.bump();

                let metadata = match desc.metadata.values_mut().find(|meta| meta.name == name) {
                    Some(metadata) => metadata,
                    None => return,
                };
                // A column that is already dropped keeps its original drop version.
                metadata.dropped = metadata.dropped.or(Some(next_version));
            }
            Self::ToggleNullability { name } => {
                let pos = match desc.get_by_name(&name) {
                    Some((pos, _)) => pos,
                    None => return,
                };
                let col_type = desc
                    .typ
                    .column_types
                    .get_mut(pos)
                    .expect("ColumnNames and SqlColumnTypes out of sync!");
                col_type.nullable ^= true;
            }
            Self::ChangeType { name, typ } => {
                let pos = match desc.get_by_name(&name) {
                    Some((pos, _)) => pos,
                    None => return,
                };
                let col_type = desc
                    .typ
                    .column_types
                    .get_mut(pos)
                    .expect("ColumnNames and SqlColumnTypes out of sync!");
                *col_type = typ;
            }
        }
    }
}
1998
/// Generates a list of arbitrary [`PropRelationDescDiff`]s that can be applied
/// to `source`.
///
/// New columns are always generated. When `source` has at least one column,
/// drops, nullability toggles and type changes of randomly picked existing
/// columns are generated as well, and the combined list is shuffled.
#[cfg(any(test, feature = "proptest"))]
pub fn arb_relation_desc_diff(
    source: &RelationDesc,
) -> impl Strategy<Value = Vec<PropRelationDescDiff>> + use<> {
    let source = Rc::new(source.clone());
    let num_source_columns = source.typ.columns().len();

    // Most of the time add only a few columns, occasionally add a lot.
    let add_count = Union::new_weighted(vec![(100, Just(0..8)), (1, Just(8..64))]);
    let add_columns_strat = add_count
        .prop_flat_map(|count| {
            let column = (any::<ColumnName>(), any::<SqlColumnType>());
            proptest::collection::vec(column, count)
        })
        .prop_map(|columns| {
            let mut diffs = Vec::new();
            for (name, typ) in columns {
                diffs.push(PropRelationDescDiff::AddColumn { name, typ });
            }
            diffs
        });

    // Without existing columns there is nothing to drop, toggle or change.
    if num_source_columns == 0 {
        return add_columns_strat.boxed();
    }

    let source_ = Rc::clone(&source);
    let drop_columns_strat = (0..num_source_columns).prop_perturb(move |num_columns, mut rng| {
        // Picks are collected into a set, so a column is dropped at most once.
        let names: BTreeSet<_> = (0..num_columns)
            .map(|_| {
                let col_idx = rng.random_range(0..num_source_columns);
                source_.get_name(col_idx).clone()
            })
            .collect();
        names
            .into_iter()
            .map(|name| PropRelationDescDiff::DropColumn { name })
            .collect::<Vec<_>>()
    });

    let source_ = Rc::clone(&source);
    let toggle_nullability_strat =
        (0..num_source_columns).prop_perturb(move |num_columns, mut rng| {
            let names: BTreeSet<_> = (0..num_columns)
                .map(|_| {
                    let col_idx = rng.random_range(0..num_source_columns);
                    source_.get_name(col_idx).clone()
                })
                .collect();
            names
                .into_iter()
                .map(|name| PropRelationDescDiff::ToggleNullability { name })
                .collect::<Vec<_>>()
        });

    let source_ = Rc::clone(&source);
    let change_type_strat = (0..num_source_columns)
        .prop_perturb(move |num_columns, mut rng| {
            (0..num_columns)
                .map(|_| {
                    let col_idx = rng.random_range(0..num_source_columns);
                    source_.get_name(col_idx).clone()
                })
                .collect::<BTreeSet<_>>()
        })
        .prop_flat_map(|cols| {
            // One new type per picked column.
            proptest::collection::vec(any::<SqlColumnType>(), cols.len())
                .prop_map(move |types| (cols.clone(), types))
        })
        .prop_map(|(cols, types)| {
            cols.into_iter()
                .zip_eq(types)
                .map(|(name, typ)| PropRelationDescDiff::ChangeType { name, typ })
                .collect::<Vec<_>>()
        });

    (
        add_columns_strat,
        drop_columns_strat,
        toggle_nullability_strat,
        change_type_strat,
    )
        .prop_map(|(adds, drops, toggles, changes)| {
            let mut all_diffs = adds;
            all_diffs.extend(drops);
            all_diffs.extend(toggles);
            all_diffs.extend(changes);
            all_diffs
        })
        .prop_shuffle()
        .boxed()
}
2085
2086#[cfg(test)]
2087mod tests {
2088 use super::*;
2089 use prost::Message;
2090
2091 #[mz_ore::test]
2092 #[cfg_attr(miri, ignore)] fn smoktest_at_version() {
2094 let desc = RelationDesc::builder()
2095 .with_column("a", SqlScalarType::Bool.nullable(true))
2096 .with_column("z", SqlScalarType::String.nullable(false))
2097 .finish();
2098
2099 let mut versioned_desc = VersionedRelationDesc {
2100 inner: desc.clone(),
2101 };
2102 versioned_desc.validate();
2103
2104 let latest = versioned_desc.at_version(RelationVersionSelector::Latest);
2105 assert_eq!(desc, latest);
2106
2107 let v0 = versioned_desc.at_version(RelationVersionSelector::specific(0));
2108 assert_eq!(desc, v0);
2109
2110 let v3 = versioned_desc.at_version(RelationVersionSelector::specific(3));
2111 assert_eq!(desc, v3);
2112
2113 let v1 = versioned_desc.add_column("b", SqlScalarType::Bytes.nullable(false));
2114 assert_eq!(v1, RelationVersion(1));
2115
2116 let v1 = versioned_desc.at_version(RelationVersionSelector::Specific(v1));
2117 insta::assert_json_snapshot!(v1.metadata, @r###"
2118 {
2119 "0": {
2120 "name": "a",
2121 "typ_idx": 0,
2122 "added": 0,
2123 "dropped": null
2124 },
2125 "1": {
2126 "name": "z",
2127 "typ_idx": 1,
2128 "added": 0,
2129 "dropped": null
2130 },
2131 "2": {
2132 "name": "b",
2133 "typ_idx": 2,
2134 "added": 1,
2135 "dropped": null
2136 }
2137 }
2138 "###);
2139
2140 let v0_b = versioned_desc.at_version(RelationVersionSelector::specific(0));
2142 assert!(v0.iter().eq(v0_b.iter()));
2143
2144 let v2 = versioned_desc.drop_column("z");
2145 assert_eq!(v2, RelationVersion(2));
2146
2147 let v2 = versioned_desc.at_version(RelationVersionSelector::Specific(v2));
2148 insta::assert_json_snapshot!(v2.metadata, @r###"
2149 {
2150 "0": {
2151 "name": "a",
2152 "typ_idx": 0,
2153 "added": 0,
2154 "dropped": null
2155 },
2156 "2": {
2157 "name": "b",
2158 "typ_idx": 1,
2159 "added": 1,
2160 "dropped": null
2161 }
2162 }
2163 "###);
2164
2165 let v0_c = versioned_desc.at_version(RelationVersionSelector::specific(0));
2167 assert!(v0.iter().eq(v0_c.iter()));
2168
2169 let v1_b = versioned_desc.at_version(RelationVersionSelector::specific(1));
2170 assert!(v1.iter().eq(v1_b.iter()));
2171
2172 insta::assert_json_snapshot!(versioned_desc.inner.metadata, @r###"
2173 {
2174 "0": {
2175 "name": "a",
2176 "typ_idx": 0,
2177 "added": 0,
2178 "dropped": null
2179 },
2180 "1": {
2181 "name": "z",
2182 "typ_idx": 1,
2183 "added": 0,
2184 "dropped": 2
2185 },
2186 "2": {
2187 "name": "b",
2188 "typ_idx": 2,
2189 "added": 1,
2190 "dropped": null
2191 }
2192 }
2193 "###);
2194 }
2195
2196 #[mz_ore::test]
2197 #[cfg_attr(miri, ignore)] fn test_dropping_columns_with_keys() {
2199 let desc = RelationDesc::builder()
2200 .with_column("a", SqlScalarType::Bool.nullable(true))
2201 .with_column("z", SqlScalarType::String.nullable(false))
2202 .with_key(vec![1])
2203 .finish();
2204
2205 let mut versioned_desc = VersionedRelationDesc {
2206 inner: desc.clone(),
2207 };
2208 versioned_desc.validate();
2209
2210 let v1 = versioned_desc.drop_column("a");
2211 assert_eq!(v1, RelationVersion(1));
2212
2213 let v1 = versioned_desc.at_version(RelationVersionSelector::Specific(v1));
2215 insta::assert_json_snapshot!(v1, @r###"
2216 {
2217 "typ": {
2218 "column_types": [
2219 {
2220 "scalar_type": "String",
2221 "nullable": false
2222 }
2223 ],
2224 "keys": [
2225 [
2226 0
2227 ]
2228 ]
2229 },
2230 "metadata": {
2231 "1": {
2232 "name": "z",
2233 "typ_idx": 0,
2234 "added": 0,
2235 "dropped": null
2236 }
2237 }
2238 }
2239 "###);
2240
2241 let v0 = versioned_desc.at_version(RelationVersionSelector::specific(0));
2243 insta::assert_json_snapshot!(v0, @r###"
2244 {
2245 "typ": {
2246 "column_types": [
2247 {
2248 "scalar_type": "Bool",
2249 "nullable": true
2250 },
2251 {
2252 "scalar_type": "String",
2253 "nullable": false
2254 }
2255 ],
2256 "keys": [
2257 [
2258 1
2259 ]
2260 ]
2261 },
2262 "metadata": {
2263 "0": {
2264 "name": "a",
2265 "typ_idx": 0,
2266 "added": 0,
2267 "dropped": 1
2268 },
2269 "1": {
2270 "name": "z",
2271 "typ_idx": 1,
2272 "added": 0,
2273 "dropped": null
2274 }
2275 }
2276 }
2277 "###);
2278 }
2279
2280 #[mz_ore::test]
2281 #[cfg_attr(miri, ignore)] fn roundtrip_relation_desc_without_metadata() {
2283 let typ = ProtoRelationType {
2284 column_types: vec![
2285 SqlScalarType::String.nullable(false).into_proto(),
2286 SqlScalarType::Bool.nullable(true).into_proto(),
2287 ],
2288 keys: vec![],
2289 };
2290 let proto = ProtoRelationDesc {
2291 typ: Some(typ),
2292 names: vec![
2293 ColumnName("a".into()).into_proto(),
2294 ColumnName("b".into()).into_proto(),
2295 ],
2296 metadata: vec![],
2297 };
2298 let desc: RelationDesc = proto.into_rust().unwrap();
2299
2300 insta::assert_json_snapshot!(desc, @r###"
2301 {
2302 "typ": {
2303 "column_types": [
2304 {
2305 "scalar_type": "String",
2306 "nullable": false
2307 },
2308 {
2309 "scalar_type": "Bool",
2310 "nullable": true
2311 }
2312 ],
2313 "keys": []
2314 },
2315 "metadata": {
2316 "0": {
2317 "name": "a",
2318 "typ_idx": 0,
2319 "added": 0,
2320 "dropped": null
2321 },
2322 "1": {
2323 "name": "b",
2324 "typ_idx": 1,
2325 "added": 0,
2326 "dropped": null
2327 }
2328 }
2329 }
2330 "###);
2331 }
2332
2333 #[mz_ore::test]
2334 #[should_panic(expected = "column named 'a' already exists!")]
2335 fn test_add_column_with_same_name_panics() {
2336 let desc = RelationDesc::builder()
2337 .with_column("a", SqlScalarType::Bool.nullable(true))
2338 .finish();
2339 let mut versioned = VersionedRelationDesc::new(desc);
2340
2341 let _ = versioned.add_column("a", SqlScalarType::String.nullable(false));
2342 }
2343
2344 #[mz_ore::test]
2345 #[cfg_attr(miri, ignore)] fn test_add_column_with_same_name_prev_dropped() {
2347 let desc = RelationDesc::builder()
2348 .with_column("a", SqlScalarType::Bool.nullable(true))
2349 .finish();
2350 let mut versioned = VersionedRelationDesc::new(desc);
2351
2352 let v1 = versioned.drop_column("a");
2353 let v1 = versioned.at_version(RelationVersionSelector::Specific(v1));
2354 insta::assert_json_snapshot!(v1, @r###"
2355 {
2356 "typ": {
2357 "column_types": [],
2358 "keys": []
2359 },
2360 "metadata": {}
2361 }
2362 "###);
2363
2364 let v2 = versioned.add_column("a", SqlScalarType::String.nullable(false));
2365 let v2 = versioned.at_version(RelationVersionSelector::Specific(v2));
2366 insta::assert_json_snapshot!(v2, @r###"
2367 {
2368 "typ": {
2369 "column_types": [
2370 {
2371 "scalar_type": "String",
2372 "nullable": false
2373 }
2374 ],
2375 "keys": []
2376 },
2377 "metadata": {
2378 "1": {
2379 "name": "a",
2380 "typ_idx": 0,
2381 "added": 2,
2382 "dropped": null
2383 }
2384 }
2385 }
2386 "###);
2387 }
2388
2389 #[mz_ore::test]
2390 #[cfg_attr(miri, ignore)]
2391 fn apply_demand() {
2392 let desc = RelationDesc::builder()
2393 .with_column("a", SqlScalarType::String.nullable(true))
2394 .with_column("b", SqlScalarType::Int64.nullable(false))
2395 .with_column("c", SqlScalarType::Time.nullable(false))
2396 .finish();
2397 let desc = desc.apply_demand(&BTreeSet::from([0, 2]));
2398 assert_eq!(desc.arity(), 2);
2399 VersionedRelationDesc::new(desc).validate();
2401 }
2402
2403 #[mz_ore::test]
2404 #[cfg_attr(miri, ignore)]
2405 fn smoketest_column_index_stable_ident() {
2406 let idx_a = ColumnIndex(42);
2407 assert_eq!(idx_a.to_stable_name(), "42");
2409 }
2410
2411 #[mz_ore::test]
2412 #[cfg_attr(miri, ignore)] fn proptest_relation_desc_roundtrips() {
2414 fn testcase(og: RelationDesc) {
2415 let bytes = og.into_proto().encode_to_vec();
2416 let proto = ProtoRelationDesc::decode(&bytes[..]).unwrap();
2417 let rnd = RelationDesc::from_proto(proto).unwrap();
2418
2419 assert_eq!(og, rnd);
2420 }
2421
2422 proptest!(|(desc in any::<RelationDesc>())| {
2423 testcase(desc);
2424 });
2425
2426 let strat = any::<RelationDesc>().prop_flat_map(|desc| {
2427 arb_relation_desc_diff(&desc).prop_map(move |diffs| (desc.clone(), diffs))
2428 });
2429
2430 proptest!(|((mut desc, diffs) in strat)| {
2431 for diff in diffs {
2432 diff.apply(&mut desc);
2433 };
2434 testcase(desc);
2435 });
2436 }
2437}