1use std::collections::{BTreeMap, BTreeSet};
11#[cfg(any(test, feature = "proptest"))]
12use std::rc::Rc;
13use std::{fmt, vec};
14
15use anyhow::bail;
16use itertools::Itertools;
17use mz_lowertest::MzReflect;
18use mz_ore::cast::CastFrom;
19use mz_ore::soft_panic_or_log;
20use mz_ore::str::StrExt;
21use mz_ore::{assert_none, assert_ok};
22use mz_persist_types::schema::SchemaId;
23use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
24#[cfg(any(test, feature = "proptest"))]
25use proptest::prelude::*;
26#[cfg(any(test, feature = "proptest"))]
27use proptest::strategy::{Strategy, Union};
28#[cfg(any(test, feature = "proptest"))]
29use proptest_derive::Arbitrary;
30use serde::{Deserialize, Serialize};
31
32#[cfg(any(test, feature = "proptest"))]
33use crate::Row;
34#[cfg(any(test, feature = "proptest"))]
35use crate::arb_datum_for_column;
36use crate::relation_and_scalar::proto_relation_type::ProtoKey;
37pub use crate::relation_and_scalar::{
38 ProtoColumnMetadata, ProtoColumnName, ProtoColumnType, ProtoRelationDesc, ProtoRelationType,
39 ProtoRelationVersion,
40};
41use crate::{Datum, ReprScalarType, SqlScalarType};
42
/// The type of one column of a SQL relation: a scalar type paired with a
/// nullability flag.
#[derive(
    Clone,
    Debug,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Serialize,
    Deserialize,
    Hash,
    MzReflect
)]
#[cfg_attr(any(test, feature = "proptest"), derive(Arbitrary))]
pub struct SqlColumnType {
    /// The scalar type of the values stored in the column.
    pub scalar_type: SqlScalarType,
    /// Whether the column may hold nulls. When the field is missing from
    /// serialized input, serde fills it in by calling `return_true`.
    #[serde(default = "return_true")]
    pub nullable: bool,
}
70
/// Always yields `true`.
///
/// Exists as a named function because serde's `default = "..."` attribute
/// takes a function path rather than a literal value.
#[inline(always)]
fn return_true() -> bool {
    true
}
80
81impl SqlColumnType {
82 pub fn try_union_many<'a>(
86 typs: impl IntoIterator<Item = &'a Self>,
87 ) -> Result<Self, anyhow::Error> {
88 let mut iter = typs.into_iter();
89 let Some(typ) = iter.next() else {
90 bail!("Cannot union empty iterator");
91 };
92 iter.try_fold(typ.clone(), |a, b| a.try_union(b))
93 }
94
95 pub fn union_many<'a>(typs: impl IntoIterator<Item = &'a Self>) -> Self {
100 Self::try_union_many(typs).expect("Cannot union empty iterator")
101 }
102
103 pub fn backport_nullability(&mut self, backport_typ: &ReprColumnType) {
107 self.scalar_type
108 .backport_nullability(&backport_typ.scalar_type);
109 self.nullable = backport_typ.nullable;
110 }
111
112 pub fn sql_union(&self, other: &Self) -> Result<Self, anyhow::Error> {
125 match (&self.scalar_type, &other.scalar_type) {
126 (scalar_type, other_scalar_type) if scalar_type == other_scalar_type => {
127 Ok(SqlColumnType {
128 scalar_type: scalar_type.clone(),
129 nullable: self.nullable || other.nullable,
130 })
131 }
132 (scalar_type, other_scalar_type) if scalar_type.base_eq(other_scalar_type) => {
133 Ok(SqlColumnType {
134 scalar_type: scalar_type.without_modifiers(),
135 nullable: self.nullable || other.nullable,
136 })
137 }
138 (
139 SqlScalarType::Record { fields, custom_id },
140 SqlScalarType::Record {
141 fields: other_fields,
142 custom_id: other_custom_id,
143 },
144 ) => {
145 if custom_id != other_custom_id {
146 bail!(
147 "Can't union types: {:?} and {:?}",
148 self.scalar_type,
149 other.scalar_type
150 );
151 };
152
153 if fields.len() != other_fields.len() {
154 bail!(
155 "Can't union types: {:?} and {:?}",
156 self.scalar_type,
157 other.scalar_type
158 );
159 }
160 let mut union_fields = Vec::with_capacity(fields.len());
161 for ((name, typ), (other_name, other_typ)) in
162 fields.iter().zip_eq(other_fields.iter())
163 {
164 if name != other_name {
165 bail!(
166 "Can't union types: {:?} and {:?}",
167 self.scalar_type,
168 other.scalar_type
169 );
170 } else {
171 let union_column_type = typ.sql_union(other_typ)?;
172 union_fields.push((name.clone(), union_column_type));
173 };
174 }
175
176 Ok(SqlColumnType {
177 scalar_type: SqlScalarType::Record {
178 fields: union_fields.into(),
179 custom_id: *custom_id,
180 },
181 nullable: self.nullable || other.nullable,
182 })
183 }
184 _ => bail!(
185 "Can't union types: {:?} and {:?}",
186 self.scalar_type,
187 other.scalar_type
188 ),
189 }
190 }
191
192 pub fn try_union(&self, other: &Self) -> Result<Self, anyhow::Error> {
202 self.sql_union(other).or_else(|e| {
203 let repr_self = ReprColumnType::from(self);
204 let repr_other = ReprColumnType::from(other);
205 match repr_self.union(&repr_other) {
206 Ok(typ) => {
207 soft_panic_or_log!("repr type error: sql_union({self:?}, {other:?}): {e}");
210 Ok(SqlColumnType::from_repr(&typ))
211 }
212 Err(_) => {
213 Err(e)
216 }
217 }
218 })
219 }
220
221 pub fn union(&self, other: &Self) -> Self {
226 self.try_union(other).unwrap_or_else(|e| {
227 panic!("repr type error: after sql_union({self:?}, {other:?}) error: {e}")
228 })
229 }
230
231 pub fn nullable(mut self, nullable: bool) -> Self {
234 self.nullable = nullable;
235 self
236 }
237}
238
239impl RustType<ProtoColumnType> for SqlColumnType {
240 fn into_proto(&self) -> ProtoColumnType {
241 ProtoColumnType {
242 nullable: self.nullable,
243 scalar_type: Some(self.scalar_type.into_proto()),
244 }
245 }
246
247 fn from_proto(proto: ProtoColumnType) -> Result<Self, TryFromProtoError> {
248 Ok(SqlColumnType {
249 nullable: proto.nullable,
250 scalar_type: proto
251 .scalar_type
252 .into_rust_if_some("ProtoColumnType::scalar_type")?,
253 })
254 }
255}
256
257impl fmt::Display for SqlColumnType {
258 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
259 let nullable = if self.nullable { "Null" } else { "NotNull" };
260 f.write_fmt(format_args!("{:?}:{}", self.scalar_type, nullable))
261 }
262}
263
/// The type of a SQL relation: the type of each column plus a list of keys.
#[derive(
    Clone,
    Debug,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Serialize,
    Deserialize,
    Hash,
    MzReflect
)]
#[cfg_attr(any(test, feature = "proptest"), derive(Arbitrary))]
pub struct SqlRelationType {
    /// The type of each column, in column order.
    pub column_types: Vec<SqlColumnType>,
    /// Keys of the relation; each key is a list of column indices. Keys added
    /// through `with_key` are stored sorted and without duplicate entries.
    /// Defaults to empty when missing from serialized input.
    // NOTE(review): presumably each key is a set of columns that uniquely
    // identifies a row — confirm against the callers.
    #[serde(default)]
    pub keys: Vec<Vec<usize>>,
}
293
294impl SqlRelationType {
295 pub fn empty() -> Self {
298 SqlRelationType::new(vec![])
299 }
300
301 pub fn new(column_types: Vec<SqlColumnType>) -> Self {
305 SqlRelationType {
306 column_types,
307 keys: Vec::new(),
308 }
309 }
310
311 pub fn with_key(mut self, mut indices: Vec<usize>) -> Self {
313 indices.sort_unstable();
314 if !self.keys.contains(&indices) {
315 self.keys.push(indices);
316 }
317 self
318 }
319
320 pub fn with_keys(mut self, keys: Vec<Vec<usize>>) -> Self {
321 for key in keys {
322 self = self.with_key(key)
323 }
324 self
325 }
326
327 pub fn arity(&self) -> usize {
329 self.column_types.len()
330 }
331
332 pub fn default_key(&self) -> Vec<usize> {
334 if let Some(key) = self.keys.first() {
335 if key.is_empty() {
336 (0..self.column_types.len()).collect()
337 } else {
338 key.clone()
339 }
340 } else {
341 (0..self.column_types.len()).collect()
342 }
343 }
344
345 pub fn columns(&self) -> &[SqlColumnType] {
347 &self.column_types
348 }
349
350 pub fn backport_nullability_and_keys(&mut self, backport_typ: &ReprRelationType) {
354 assert_eq!(
355 backport_typ.column_types.len(),
356 self.column_types.len(),
357 "HIR and MIR types should have the same number of columns"
358 );
359 for (backport_col, sql_col) in backport_typ
360 .column_types
361 .iter()
362 .zip_eq(self.column_types.iter_mut())
363 {
364 sql_col.backport_nullability(backport_col);
365 }
366
367 self.keys = backport_typ.keys.clone();
368 }
369
370 pub fn from_repr(repr: &ReprRelationType) -> Self {
374 SqlRelationType {
375 column_types: repr
376 .column_types
377 .iter()
378 .map(SqlColumnType::from_repr)
379 .collect(),
380 keys: repr.keys.clone(),
381 }
382 }
383}
384
385impl RustType<ProtoRelationType> for SqlRelationType {
386 fn into_proto(&self) -> ProtoRelationType {
387 ProtoRelationType {
388 column_types: self.column_types.into_proto(),
389 keys: self.keys.into_proto(),
390 }
391 }
392
393 fn from_proto(proto: ProtoRelationType) -> Result<Self, TryFromProtoError> {
394 Ok(SqlRelationType {
395 column_types: proto.column_types.into_rust()?,
396 keys: proto.keys.into_rust()?,
397 })
398 }
399}
400
401impl RustType<ProtoKey> for Vec<usize> {
402 fn into_proto(&self) -> ProtoKey {
403 ProtoKey {
404 keys: self.into_proto(),
405 }
406 }
407
408 fn from_proto(proto: ProtoKey) -> Result<Self, TryFromProtoError> {
409 proto.keys.into_rust()
410 }
411}
412
/// The repr-level counterpart of `SqlRelationType`: the repr type of each
/// column plus a list of keys.
#[derive(
    Clone,
    Debug,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Serialize,
    Deserialize,
    Hash,
    MzReflect
)]
pub struct ReprRelationType {
    /// The repr type of each column, in column order.
    pub column_types: Vec<ReprColumnType>,
    /// Keys of the relation; each key is a list of column indices. Keys added
    /// through `with_key` are stored sorted and without duplicate entries.
    /// Defaults to empty when missing from serialized input.
    #[serde(default)]
    pub keys: Vec<Vec<usize>>,
}
441
442impl ReprRelationType {
443 pub fn empty() -> Self {
446 ReprRelationType::new(vec![])
447 }
448
449 pub fn new(column_types: Vec<ReprColumnType>) -> Self {
453 ReprRelationType {
454 column_types,
455 keys: Vec::new(),
456 }
457 }
458
459 pub fn with_key(mut self, mut indices: Vec<usize>) -> Self {
461 indices.sort_unstable();
462 if !self.keys.contains(&indices) {
463 self.keys.push(indices);
464 }
465 self
466 }
467
468 pub fn with_keys(mut self, keys: Vec<Vec<usize>>) -> Self {
469 for key in keys {
470 self = self.with_key(key)
471 }
472 self
473 }
474
475 pub fn arity(&self) -> usize {
477 self.column_types.len()
478 }
479
480 pub fn default_key(&self) -> Vec<usize> {
482 if let Some(key) = self.keys.first() {
483 if key.is_empty() {
484 (0..self.column_types.len()).collect()
485 } else {
486 key.clone()
487 }
488 } else {
489 (0..self.column_types.len()).collect()
490 }
491 }
492
493 pub fn columns(&self) -> &[ReprColumnType] {
495 &self.column_types
496 }
497}
498
499impl From<&SqlRelationType> for ReprRelationType {
500 fn from(sql_relation_type: &SqlRelationType) -> Self {
501 ReprRelationType {
502 column_types: sql_relation_type
503 .column_types
504 .iter()
505 .map(ReprColumnType::from)
506 .collect(),
507 keys: sql_relation_type.keys.clone(),
508 }
509 }
510}
511
/// The repr-level type of one column: a repr scalar type paired with a
/// nullability flag.
#[derive(
    Clone,
    Debug,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Serialize,
    Deserialize,
    Hash,
    MzReflect
)]
pub struct ReprColumnType {
    /// The repr scalar type of the values stored in the column.
    pub scalar_type: ReprScalarType,
    /// Whether the column may hold nulls. When the field is missing from
    /// serialized input, serde fills it in by calling `return_true`.
    #[serde(default = "return_true")]
    pub nullable: bool,
}
531
532impl std::fmt::Display for ReprColumnType {
533 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
534 write!(f, "{}", self.scalar_type)?;
535 if self.nullable {
536 write!(f, "?")?;
537 }
538 Ok(())
539 }
540}
541
542impl ReprColumnType {
543 pub fn union(&self, col: &ReprColumnType) -> Result<Self, anyhow::Error> {
550 let scalar_type = self.scalar_type.union(&col.scalar_type)?;
551 let nullable = self.nullable || col.nullable;
552
553 Ok(ReprColumnType {
554 scalar_type,
555 nullable,
556 })
557 }
558}
559
560impl From<&SqlColumnType> for ReprColumnType {
561 fn from(sql_column_type: &SqlColumnType) -> Self {
562 let scalar_type = &sql_column_type.scalar_type;
563 let scalar_type = scalar_type.into();
564 let nullable = sql_column_type.nullable;
565
566 ReprColumnType {
567 scalar_type,
568 nullable,
569 }
570 }
571}
572
573impl SqlColumnType {
574 pub fn from_repr(repr: &ReprColumnType) -> Self {
578 let scalar_type = &repr.scalar_type;
579 let scalar_type = SqlScalarType::from_repr(scalar_type);
580 let nullable = repr.nullable;
581
582 SqlColumnType {
583 scalar_type,
584 nullable,
585 }
586 }
587}
588
/// The name of a column, stored as a boxed string slice.
#[derive(
    Clone,
    Debug,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Serialize,
    Deserialize,
    Hash,
    MzReflect
)]
pub struct ColumnName(Box<str>);
603
604impl ColumnName {
605 #[inline(always)]
607 pub fn as_str(&self) -> &str {
608 &*self
609 }
610
611 pub fn as_mut_boxed_str(&mut self) -> &mut Box<str> {
613 &mut self.0
614 }
615
616 pub fn is_similar(&self, other: &ColumnName) -> bool {
618 const SIMILARITY_THRESHOLD: f64 = 0.6;
619
620 let a_lowercase = self.to_lowercase();
621 let b_lowercase = other.to_lowercase();
622
623 strsim::normalized_levenshtein(&a_lowercase, &b_lowercase) >= SIMILARITY_THRESHOLD
624 }
625}
626
627impl std::ops::Deref for ColumnName {
628 type Target = str;
629
630 #[inline(always)]
631 fn deref(&self) -> &Self::Target {
632 &self.0
633 }
634}
635
636impl fmt::Display for ColumnName {
637 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
638 f.write_str(&self.0)
639 }
640}
641
642impl From<String> for ColumnName {
643 fn from(s: String) -> ColumnName {
644 ColumnName(s.into())
645 }
646}
647
648impl From<&str> for ColumnName {
649 fn from(s: &str) -> ColumnName {
650 ColumnName(s.into())
651 }
652}
653
654impl From<&ColumnName> for ColumnName {
655 fn from(n: &ColumnName) -> ColumnName {
656 n.clone()
657 }
658}
659
660impl RustType<ProtoColumnName> for ColumnName {
661 fn into_proto(&self) -> ProtoColumnName {
662 ProtoColumnName {
663 value: Some(self.0.to_string()),
664 }
665 }
666
667 fn from_proto(proto: ProtoColumnName) -> Result<Self, TryFromProtoError> {
668 Ok(ColumnName(
669 proto
670 .value
671 .ok_or_else(|| TryFromProtoError::missing_field("ProtoColumnName::value"))?
672 .into(),
673 ))
674 }
675}
676
677impl From<ColumnName> for mz_sql_parser::ast::Ident {
678 fn from(value: ColumnName) -> Self {
679 mz_sql_parser::ast::Ident::new_unchecked(value.0)
681 }
682}
683
/// Generates column names of weighted-random length.
///
/// Short names (lengths in `1..16`) are always possible. Longer ranges, up
/// to `1024..4096`, are added only when the `PROPTEST_LARGE_DATA`
/// environment variable is set. Characters are drawn mostly from the range
/// `'A'` to `'z'`, with an occasional arbitrary `char`. Generated names are
/// never shrunk.
#[cfg(any(test, feature = "proptest"))]
impl proptest::arbitrary::Arbitrary for ColumnName {
    type Parameters = ();
    type Strategy = BoxedStrategy<ColumnName>;

    fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy {
        let mut length_weights = vec![(50, Just(1..8)), (20, Just(8..16))];
        let large_data_requested = std::env::var("PROPTEST_LARGE_DATA").is_ok();
        if large_data_requested {
            length_weights.extend([
                (5, Just(16..128)),
                (1, Just(128..1024)),
                (1, Just(1024..4096)),
            ]);
        }
        let length_strategy = Union::new_weighted(length_weights);

        let common_chars = proptest::char::range('A', 'z').boxed();
        let any_char = any::<char>().boxed();
        // Shared through an `Rc` so each flat-map invocation can cheaply
        // obtain its own handle on the strategy.
        let char_strategy = Rc::new(Union::new_weighted(vec![
            (50, common_chars),
            (1, any_char),
        ]));

        length_strategy
            .prop_flat_map(move |length| {
                proptest::collection::vec(Rc::clone(&char_strategy), length)
            })
            .prop_map(|chars| {
                let name: String = chars.into_iter().collect();
                ColumnName(name.into_boxed_str())
            })
            .no_shrink()
            .boxed()
    }
}
716
/// The literal column name `?column?`.
// NOTE(review): presumably the fallback name for a column that has no better
// name; its uses are not visible in this part of the file — confirm.
pub const UNKNOWN_COLUMN_NAME: &str = "?column?";
719
/// An opaque index identifying a column; used as the key of the per-column
/// metadata map in `RelationDesc`.
#[derive(
    Clone,
    Copy,
    Debug,
    Eq,
    PartialEq,
    PartialOrd,
    Ord,
    Serialize,
    Deserialize,
    Hash,
    MzReflect
)]
pub struct ColumnIndex(usize);

// Compile-time guard: `ColumnIndex` must not implement `Arbitrary`.
#[cfg(any(test, feature = "proptest"))]
static_assertions::assert_not_impl_all!(ColumnIndex: Arbitrary);
738
739impl ColumnIndex {
740 pub fn to_stable_name(&self) -> String {
742 self.0.to_string()
743 }
744
745 pub fn to_raw(&self) -> usize {
746 self.0
747 }
748
749 pub fn from_raw(val: usize) -> Self {
750 ColumnIndex(val)
751 }
752}
753
/// A version number of a relation, wrapping a `u64`. The root version is `0`
/// and each `bump` yields the next integer.
#[derive(
    Clone,
    Copy,
    Debug,
    Eq,
    PartialEq,
    PartialOrd,
    Ord,
    Serialize,
    Deserialize,
    Hash,
    MzReflect
)]
#[cfg_attr(any(test, feature = "proptest"), derive(Arbitrary))]
pub struct RelationVersion(u64);
770
771impl RelationVersion {
772 pub fn root() -> Self {
774 RelationVersion(0)
775 }
776
777 pub fn bump(&self) -> Self {
779 let next_version = self
780 .0
781 .checked_add(1)
782 .expect("added more than u64::MAX columns?");
783 RelationVersion(next_version)
784 }
785
786 pub fn into_raw(self) -> u64 {
790 self.0
791 }
792
793 pub fn from_raw(val: u64) -> RelationVersion {
797 RelationVersion(val)
798 }
799}
800
801impl From<RelationVersion> for SchemaId {
802 fn from(value: RelationVersion) -> Self {
803 SchemaId(usize::cast_from(value.0))
804 }
805}
806
807impl From<mz_sql_parser::ast::Version> for RelationVersion {
808 fn from(value: mz_sql_parser::ast::Version) -> Self {
809 RelationVersion(value.into_inner())
810 }
811}
812
813impl fmt::Display for RelationVersion {
814 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
815 write!(f, "v{}", self.0)
816 }
817}
818
819impl From<RelationVersion> for mz_sql_parser::ast::Version {
820 fn from(value: RelationVersion) -> Self {
821 mz_sql_parser::ast::Version::new(value.0)
822 }
823}
824
825impl RustType<ProtoRelationVersion> for RelationVersion {
826 fn into_proto(&self) -> ProtoRelationVersion {
827 ProtoRelationVersion { value: self.0 }
828 }
829
830 fn from_proto(proto: ProtoRelationVersion) -> Result<Self, TryFromProtoError> {
831 Ok(RelationVersion(proto.value))
832 }
833}
834
/// A closed set of semantic tags. The `Display` impl renders each variant as
/// its own name.
// NOTE(review): what each tag is attached to is not visible in this part of
// the file; the variant names are the only description available here.
#[derive(
    Clone,
    Copy,
    Debug,
    PartialEq,
    Eq,
    PartialOrd,
    Ord,
    Hash,
    serde::Serialize
)]
pub enum SemanticType {
    CatalogItemId,
    GlobalId,
    ClusterId,
    ReplicaId,
    SchemaId,
    DatabaseId,
    RoleId,
    NetworkPolicyId,
    ShardId,
    OID,
    ObjectType,
    ConnectionType,
    SourceType,
    MzTimestamp,
    WallclockTimestamp,
    ByteCount,
    RecordCount,
    CreditRate,
    SqlDefinition,
    RedactedSqlDefinition,
}
874
875impl fmt::Display for SemanticType {
876 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
877 let s = match self {
878 SemanticType::CatalogItemId => "CatalogItemId",
879 SemanticType::GlobalId => "GlobalId",
880 SemanticType::ClusterId => "ClusterId",
881 SemanticType::ReplicaId => "ReplicaId",
882 SemanticType::SchemaId => "SchemaId",
883 SemanticType::DatabaseId => "DatabaseId",
884 SemanticType::RoleId => "RoleId",
885 SemanticType::NetworkPolicyId => "NetworkPolicyId",
886 SemanticType::ShardId => "ShardId",
887 SemanticType::OID => "OID",
888 SemanticType::ObjectType => "ObjectType",
889 SemanticType::ConnectionType => "ConnectionType",
890 SemanticType::SourceType => "SourceType",
891 SemanticType::MzTimestamp => "MzTimestamp",
892 SemanticType::WallclockTimestamp => "WallclockTimestamp",
893 SemanticType::ByteCount => "ByteCount",
894 SemanticType::RecordCount => "RecordCount",
895 SemanticType::CreditRate => "CreditRate",
896 SemanticType::SqlDefinition => "SqlDefinition",
897 SemanticType::RedactedSqlDefinition => "RedactedSqlDefinition",
898 };
899 f.write_str(s)
900 }
901}
902
/// Per-column bookkeeping stored in a `RelationDesc`.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash, MzReflect)]
struct ColumnMetadata {
    /// The name of the column.
    name: ColumnName,
    /// Position of this column's type within the relation type's
    /// `column_types`.
    typ_idx: usize,
    /// The relation version recorded as the one that added this column.
    added: RelationVersion,
    /// The relation version recorded as the one that dropped this column, if
    /// any.
    dropped: Option<RelationVersion>,
}
915
/// A description of a relation: its `SqlRelationType` together with
/// per-column metadata (name and version information) keyed by `ColumnIndex`.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, MzReflect)]
pub struct RelationDesc {
    /// Column types and keys.
    typ: SqlRelationType,
    /// Metadata for each column; `ColumnMetadata::typ_idx` points back into
    /// `typ.column_types`.
    metadata: BTreeMap<ColumnIndex, ColumnMetadata>,
}
988
989impl RustType<ProtoRelationDesc> for RelationDesc {
990 fn into_proto(&self) -> ProtoRelationDesc {
991 let (names, metadata): (Vec<_>, Vec<_>) = self
992 .metadata
993 .values()
994 .map(|meta| {
995 let metadata = ProtoColumnMetadata {
996 added: Some(meta.added.into_proto()),
997 dropped: meta.dropped.map(|v| v.into_proto()),
998 };
999 (meta.name.into_proto(), metadata)
1000 })
1001 .unzip();
1002
1003 let is_all_default_metadata = metadata.iter().all(|meta| {
1009 meta.added == Some(RelationVersion::root().into_proto()) && meta.dropped == None
1010 });
1011 let metadata = if is_all_default_metadata {
1012 Vec::new()
1013 } else {
1014 metadata
1015 };
1016
1017 ProtoRelationDesc {
1018 typ: Some(self.typ.into_proto()),
1019 names,
1020 metadata,
1021 }
1022 }
1023
1024 fn from_proto(proto: ProtoRelationDesc) -> Result<Self, TryFromProtoError> {
1025 let proto_metadata: Box<dyn Iterator<Item = _>> = if proto.metadata.is_empty() {
1031 let val = ProtoColumnMetadata {
1032 added: Some(RelationVersion::root().into_proto()),
1033 dropped: None,
1034 };
1035 Box::new(itertools::repeat_n(val, proto.names.len()))
1036 } else {
1037 Box::new(proto.metadata.into_iter())
1038 };
1039
1040 let metadata = proto
1041 .names
1042 .into_iter()
1043 .zip_eq(proto_metadata)
1044 .enumerate()
1045 .map(|(idx, (name, metadata))| {
1046 let meta = ColumnMetadata {
1047 name: name.into_rust()?,
1048 typ_idx: idx,
1049 added: metadata.added.into_rust_if_some("ColumnMetadata::added")?,
1050 dropped: metadata.dropped.into_rust()?,
1051 };
1052 Ok::<_, TryFromProtoError>((ColumnIndex(idx), meta))
1053 })
1054 .collect::<Result<_, _>>()?;
1055
1056 Ok(RelationDesc {
1057 typ: proto.typ.into_rust_if_some("ProtoRelationDesc::typ")?,
1058 metadata,
1059 })
1060 }
1061}
1062
1063impl RelationDesc {
1064 pub fn builder() -> RelationDescBuilder {
1066 RelationDescBuilder::default()
1067 }
1068
1069 pub fn empty() -> Self {
1072 RelationDesc {
1073 typ: SqlRelationType::empty(),
1074 metadata: BTreeMap::default(),
1075 }
1076 }
1077
1078 pub fn is_empty(&self) -> bool {
1080 self == &Self::empty()
1081 }
1082
1083 pub fn len(&self) -> usize {
1085 self.typ().column_types.len()
1086 }
1087
1088 pub fn new<I, N>(typ: SqlRelationType, names: I) -> Self
1096 where
1097 I: IntoIterator<Item = N>,
1098 N: Into<ColumnName>,
1099 {
1100 let metadata: BTreeMap<_, _> = names
1101 .into_iter()
1102 .enumerate()
1103 .map(|(idx, name)| {
1104 let col_idx = ColumnIndex(idx);
1105 let metadata = ColumnMetadata {
1106 name: name.into(),
1107 typ_idx: idx,
1108 added: RelationVersion::root(),
1109 dropped: None,
1110 };
1111 (col_idx, metadata)
1112 })
1113 .collect();
1114
1115 assert_eq!(typ.column_types.len(), metadata.len());
1117
1118 RelationDesc { typ, metadata }
1119 }
1120
1121 pub fn from_names_and_types<I, T, N>(iter: I) -> Self
1122 where
1123 I: IntoIterator<Item = (N, T)>,
1124 T: Into<SqlColumnType>,
1125 N: Into<ColumnName>,
1126 {
1127 let (names, types): (Vec<_>, Vec<_>) = iter.into_iter().unzip();
1128 let types = types.into_iter().map(Into::into).collect();
1129 let typ = SqlRelationType::new(types);
1130 Self::new(typ, names)
1131 }
1132
1133 pub fn concat(mut self, other: Self) -> Self {
1143 let self_len = self.typ.column_types.len();
1144
1145 for (typ, (_col_idx, meta)) in other.typ.column_types.into_iter().zip_eq(other.metadata) {
1146 assert_eq!(meta.added, RelationVersion::root());
1147 assert_none!(meta.dropped);
1148
1149 let new_idx = self.typ.columns().len();
1150 let new_meta = ColumnMetadata {
1151 name: meta.name,
1152 typ_idx: new_idx,
1153 added: RelationVersion::root(),
1154 dropped: None,
1155 };
1156
1157 self.typ.column_types.push(typ);
1158 let prev = self.metadata.insert(ColumnIndex(new_idx), new_meta);
1159
1160 assert_eq!(self.metadata.len(), self.typ.columns().len());
1161 assert_none!(prev);
1162 }
1163
1164 for k in other.typ.keys {
1165 let k = k.into_iter().map(|idx| idx + self_len).collect();
1166 self = self.with_key(k);
1167 }
1168 self
1169 }
1170
1171 pub fn with_key(mut self, indices: Vec<usize>) -> Self {
1173 self.typ = self.typ.with_key(indices);
1174 self
1175 }
1176
1177 pub fn without_keys(mut self) -> Self {
1179 self.typ.keys.clear();
1180 self
1181 }
1182
1183 pub fn with_names<I, N>(self, names: I) -> Self
1191 where
1192 I: IntoIterator<Item = N>,
1193 N: Into<ColumnName>,
1194 {
1195 Self::new(self.typ, names)
1196 }
1197
1198 pub fn arity(&self) -> usize {
1200 self.typ.arity()
1201 }
1202
1203 pub fn typ(&self) -> &SqlRelationType {
1205 &self.typ
1206 }
1207
1208 pub fn into_typ(self) -> SqlRelationType {
1210 self.typ
1211 }
1212
1213 pub fn iter(&self) -> impl Iterator<Item = (&ColumnName, &SqlColumnType)> {
1215 self.metadata.values().map(|meta| {
1216 let typ = &self.typ.columns()[meta.typ_idx];
1217 (&meta.name, typ)
1218 })
1219 }
1220
1221 pub fn iter_types(&self) -> impl Iterator<Item = &SqlColumnType> {
1223 self.typ.column_types.iter()
1224 }
1225
1226 pub fn iter_names(&self) -> impl Iterator<Item = &ColumnName> {
1228 self.metadata.values().map(|meta| &meta.name)
1229 }
1230
1231 pub fn iter_all(&self) -> impl Iterator<Item = (&ColumnIndex, &ColumnName, &SqlColumnType)> {
1233 self.metadata.iter().map(|(col_idx, metadata)| {
1234 let col_typ = &self.typ.columns()[metadata.typ_idx];
1235 (col_idx, &metadata.name, col_typ)
1236 })
1237 }
1238
1239 pub fn iter_similar_names<'a>(
1242 &'a self,
1243 name: &'a ColumnName,
1244 ) -> impl Iterator<Item = &'a ColumnName> {
1245 self.iter_names().filter(|n| n.is_similar(name))
1246 }
1247
1248 pub fn contains_index(&self, idx: &ColumnIndex) -> bool {
1250 self.metadata.contains_key(idx)
1251 }
1252
1253 pub fn get_by_name(&self, name: &ColumnName) -> Option<(usize, &SqlColumnType)> {
1259 self.iter_names()
1260 .position(|n| n == name)
1261 .map(|i| (i, &self.typ.column_types[i]))
1262 }
1263
1264 pub fn get_name(&self, i: usize) -> &ColumnName {
1272 self.get_name_idx(&ColumnIndex(i))
1274 }
1275
1276 pub fn get_name_idx(&self, idx: &ColumnIndex) -> &ColumnName {
1282 &self.metadata.get(idx).expect("should exist").name
1283 }
1284
1285 pub fn get_name_mut(&mut self, i: usize) -> &mut ColumnName {
1291 &mut self
1293 .metadata
1294 .get_mut(&ColumnIndex(i))
1295 .expect("should exist")
1296 .name
1297 }
1298
1299 pub fn get_type(&self, idx: &ColumnIndex) -> &SqlColumnType {
1305 let typ_idx = self.metadata.get(idx).expect("should exist").typ_idx;
1306 &self.typ.column_types[typ_idx]
1307 }
1308
1309 pub fn get_unambiguous_name(&self, i: usize) -> Option<&ColumnName> {
1318 let name = self.get_name(i);
1319 if self.iter_names().filter(|n| *n == name).count() == 1 {
1320 Some(name)
1321 } else {
1322 None
1323 }
1324 }
1325
1326 pub fn constraints_met(&self, i: usize, d: &Datum) -> Result<(), NotNullViolation> {
1331 let name = self.get_name(i);
1332 let typ = &self.typ.column_types[i];
1333 if d == &Datum::Null && !typ.nullable {
1334 Err(NotNullViolation(name.clone()))
1335 } else {
1336 Ok(())
1337 }
1338 }
1339
1340 pub fn diff(&self, other: &RelationDesc) -> RelationDescDiff {
1355 assert_eq!(self.metadata.len(), self.typ.columns().len());
1356 assert_eq!(other.metadata.len(), other.typ.columns().len());
1357 for (idx, meta) in self.metadata.iter().chain(other.metadata.iter()) {
1358 assert_eq!(meta.typ_idx, idx.0);
1359 assert_eq!(meta.added, RelationVersion::root());
1360 assert_none!(meta.dropped);
1361 }
1362
1363 let mut column_diffs = BTreeMap::new();
1364 let mut key_diff = None;
1365
1366 let left_arity = self.arity();
1367 let right_arity = other.arity();
1368 let common_arity = std::cmp::min(left_arity, right_arity);
1369
1370 for idx in 0..common_arity {
1371 let left_name = self.get_name(idx);
1372 let right_name = other.get_name(idx);
1373 let left_type = &self.typ.column_types[idx];
1374 let right_type = &other.typ.column_types[idx];
1375
1376 if left_name != right_name {
1377 let diff = ColumnDiff::NameMismatch {
1378 left: left_name.clone(),
1379 right: right_name.clone(),
1380 };
1381 column_diffs.insert(idx, diff);
1382 } else if left_type.scalar_type != right_type.scalar_type {
1383 let diff = ColumnDiff::TypeMismatch {
1384 name: left_name.clone(),
1385 left: left_type.scalar_type.clone(),
1386 right: right_type.scalar_type.clone(),
1387 };
1388 column_diffs.insert(idx, diff);
1389 } else if left_type.nullable != right_type.nullable {
1390 let diff = ColumnDiff::NullabilityMismatch {
1391 name: left_name.clone(),
1392 left: left_type.nullable,
1393 right: right_type.nullable,
1394 };
1395 column_diffs.insert(idx, diff);
1396 }
1397 }
1398
1399 for idx in common_arity..left_arity {
1400 let diff = ColumnDiff::Missing {
1401 name: self.get_name(idx).clone(),
1402 };
1403 column_diffs.insert(idx, diff);
1404 }
1405
1406 for idx in common_arity..right_arity {
1407 let diff = ColumnDiff::Extra {
1408 name: other.get_name(idx).clone(),
1409 };
1410 column_diffs.insert(idx, diff);
1411 }
1412
1413 let left_keys: BTreeSet<_> = self.typ.keys.iter().collect();
1414 let right_keys: BTreeSet<_> = other.typ.keys.iter().collect();
1415 if left_keys != right_keys {
1416 let column_names = |desc: &RelationDesc, keys: BTreeSet<&Vec<usize>>| {
1417 keys.iter()
1418 .map(|key| key.iter().map(|&idx| desc.get_name(idx).clone()).collect())
1419 .collect()
1420 };
1421 key_diff = Some(KeyDiff {
1422 left: column_names(self, left_keys),
1423 right: column_names(other, right_keys),
1424 });
1425 }
1426
1427 RelationDescDiff {
1428 column_diffs,
1429 key_diff,
1430 }
1431 }
1432
1433 pub fn apply_demand(&self, demands: &BTreeSet<usize>) -> RelationDesc {
1435 let mut new_desc = self.clone();
1436
1437 let mut removed = 0;
1439 new_desc.metadata.retain(|idx, metadata| {
1440 let retain = demands.contains(&idx.0);
1441 if !retain {
1442 removed += 1;
1443 } else {
1444 metadata.typ_idx -= removed;
1445 }
1446 retain
1447 });
1448
1449 let mut idx = 0;
1451 new_desc.typ.column_types.retain(|_| {
1452 let keep = demands.contains(&idx);
1453 idx += 1;
1454 keep
1455 });
1456
1457 new_desc
1458 }
1459}
1460
/// Generates relation descriptions with a weighted-random number of columns.
///
/// Column-count ranges below 16 are always possible; ranges up to `128..256`
/// are added only when the `PROPTEST_LARGE_DATA` environment variable is set.
#[cfg(any(test, feature = "proptest"))]
impl Arbitrary for RelationDesc {
    type Parameters = ();
    type Strategy = BoxedStrategy<RelationDesc>;

    fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy {
        let mut column_count_weights =
            vec![(100, Just(0..4)), (50, Just(4..8)), (25, Just(8..16))];
        let large_data_requested = std::env::var("PROPTEST_LARGE_DATA").is_ok();
        if large_data_requested {
            column_count_weights.extend([
                (12, Just(16..32)),
                (6, Just(32..64)),
                (3, Just(64..128)),
                (1, Just(128..256)),
            ]);
        }

        Union::new_weighted(column_count_weights)
            .prop_flat_map(arb_relation_desc)
            .boxed()
    }
}
1481
/// Strategy producing a [`RelationDesc`] whose number of columns lies in
/// `num_cols`. Names and types are generated as a map, so the generated
/// column names are distinct.
#[cfg(any(test, feature = "proptest"))]
pub fn arb_relation_desc(num_cols: std::ops::Range<usize>) -> impl Strategy<Value = RelationDesc> {
    let columns =
        proptest::collection::btree_map(any::<ColumnName>(), any::<SqlColumnType>(), num_cols);
    columns.prop_map(RelationDesc::from_names_and_types)
}
1489
/// Strategy producing projections of `desc`: each column is independently
/// kept or dropped, and the result is `desc.apply_demand` over the kept
/// column positions.
#[cfg(any(test, feature = "proptest"))]
pub fn arb_relation_desc_projection(desc: RelationDesc) -> impl Strategy<Value = RelationDesc> {
    let column_mask: Vec<_> = (0..desc.len()).map(|_| any::<bool>()).collect();
    column_mask.prop_map(move |mask| {
        let demands: BTreeSet<usize> = mask
            .iter()
            .enumerate()
            .filter(|(_, keep)| **keep)
            .map(|(idx, _)| idx)
            .collect();
        desc.apply_demand(&demands)
    })
}
1503
1504impl IntoIterator for RelationDesc {
1505 type Item = (ColumnName, SqlColumnType);
1506 type IntoIter = Box<dyn Iterator<Item = (ColumnName, SqlColumnType)>>;
1507
1508 fn into_iter(self) -> Self::IntoIter {
1509 let iter = self
1510 .metadata
1511 .into_values()
1512 .zip_eq(self.typ.column_types)
1513 .map(|(meta, typ)| (meta.name, typ));
1514 Box::new(iter)
1515 }
1516}
1517
/// Strategy producing a [`Row`] for `desc`: one datum is generated per column
/// type with `arb_datum_for_column`, and the datums are packed in column
/// order.
#[cfg(any(test, feature = "proptest"))]
pub fn arb_row_for_relation(desc: &RelationDesc) -> impl Strategy<Value = Row> + use<> {
    let column_strategies: Vec<_> = desc
        .typ()
        .columns()
        .iter()
        .map(|column| arb_datum_for_column(column.clone()))
        .collect();
    column_strategies.prop_map(|datums| Row::pack(datums.iter().map(Datum::from)))
}
1530
/// Error reporting that a null value was supplied for a non-nullable column;
/// carries the name of that column.
#[derive(Debug, PartialEq, Eq)]
pub struct NotNullViolation(pub ColumnName);
1534
1535impl fmt::Display for NotNullViolation {
1536 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1537 write!(
1538 f,
1539 "null value in column {} violates not-null constraint",
1540 self.0.quoted()
1541 )
1542 }
1543}
1544
/// The differences between two `RelationDesc`s, as produced by
/// `RelationDesc::diff`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RelationDescDiff {
    /// Per-column differences, keyed by column position.
    pub column_diffs: BTreeMap<usize, ColumnDiff>,
    /// The difference between the two key sets, or `None` if they are equal.
    pub key_diff: Option<KeyDiff>,
}
1553
1554impl RelationDescDiff {
1555 pub fn is_empty(&self) -> bool {
1557 self.column_diffs.is_empty() && self.key_diff.is_none()
1558 }
1559}
1560
/// A difference found at one column position when comparing a left
/// `RelationDesc` against a right one.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ColumnDiff {
    /// The position exists only in the left description; `name` is the left
    /// column's name.
    Missing { name: ColumnName },
    /// The position exists only in the right description; `name` is the right
    /// column's name.
    Extra { name: ColumnName },
    /// The names match but the scalar types differ.
    TypeMismatch {
        /// The shared column name.
        name: ColumnName,
        left: SqlScalarType,
        right: SqlScalarType,
    },
    /// The names and scalar types match but the nullability differs.
    NullabilityMismatch {
        /// The shared column name.
        name: ColumnName,
        left: bool,
        right: bool,
    },
    /// The column names differ.
    NameMismatch { left: ColumnName, right: ColumnName },
}
1583
/// The key sets of two relation descriptions whose keys differ, with each
/// key expressed as a list of column names.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct KeyDiff {
    /// The keys of the left description.
    pub left: BTreeSet<Vec<ColumnName>>,
    /// The keys of the right description.
    pub right: BTreeSet<Vec<ColumnName>>,
}
1592
/// Incrementally assembles a `RelationDesc`; `finish` produces the result.
#[derive(Clone, Default, Debug, PartialEq, Eq)]
pub struct RelationDescBuilder {
    /// The columns added so far, in order, as `(name, type)` pairs.
    columns: Vec<(ColumnName, SqlColumnType)>,
    /// The keys added so far; each key is a sorted list of column indices.
    keys: Vec<Vec<usize>>,
}
1601
1602impl RelationDescBuilder {
1603 pub fn with_column<N: Into<ColumnName>>(
1605 mut self,
1606 name: N,
1607 ty: SqlColumnType,
1608 ) -> RelationDescBuilder {
1609 let name = name.into();
1610 self.columns.push((name, ty));
1611 self
1612 }
1613
1614 pub fn with_columns<I, T, N>(mut self, iter: I) -> Self
1616 where
1617 I: IntoIterator<Item = (N, T)>,
1618 T: Into<SqlColumnType>,
1619 N: Into<ColumnName>,
1620 {
1621 self.columns
1622 .extend(iter.into_iter().map(|(name, ty)| (name.into(), ty.into())));
1623 self
1624 }
1625
1626 pub fn with_key(mut self, mut indices: Vec<usize>) -> RelationDescBuilder {
1628 indices.sort_unstable();
1629 if !self.keys.contains(&indices) {
1630 self.keys.push(indices);
1631 }
1632 self
1633 }
1634
1635 pub fn without_keys(mut self) -> RelationDescBuilder {
1637 self.keys.clear();
1638 assert_eq!(self.keys.len(), 0);
1639 self
1640 }
1641
1642 pub fn concat(mut self, other: Self) -> Self {
1644 let self_len = self.columns.len();
1645
1646 self.columns.extend(other.columns);
1647 for k in other.keys {
1648 let k = k.into_iter().map(|idx| idx + self_len).collect();
1649 self = self.with_key(k);
1650 }
1651
1652 self
1653 }
1654
1655 pub fn finish(self) -> RelationDesc {
1657 let mut desc = RelationDesc::from_names_and_types(self.columns);
1658 desc.typ = desc.typ.with_keys(self.keys);
1659 desc
1660 }
1661}
1662
1663#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord, Serialize)]
1665pub enum RelationVersionSelector {
1666 Specific(RelationVersion),
1667 Latest,
1668}
1669
1670impl RelationVersionSelector {
1671 pub fn specific(version: u64) -> Self {
1672 RelationVersionSelector::Specific(RelationVersion(version))
1673 }
1674}
1675
1676#[derive(Debug, Clone, Serialize)]
1682pub struct VersionedRelationDesc {
1683 inner: RelationDesc,
1684}
1685
1686impl VersionedRelationDesc {
1687 pub fn new(inner: RelationDesc) -> Self {
1688 VersionedRelationDesc { inner }
1689 }
1690
1691 #[must_use]
1699 pub fn add_column<N, T>(&mut self, name: N, typ: T) -> RelationVersion
1700 where
1701 N: Into<ColumnName>,
1702 T: Into<SqlColumnType>,
1703 {
1704 let latest_version = self.latest_version();
1705 let new_version = latest_version.bump();
1706
1707 let name = name.into();
1708 let existing = self
1709 .inner
1710 .metadata
1711 .iter()
1712 .find(|(_, meta)| meta.name == name && meta.dropped.is_none());
1713 if let Some(existing) = existing {
1714 panic!("column named '{name}' already exists! {existing:?}");
1715 }
1716
1717 let next_idx = self.inner.metadata.len();
1718 let col_meta = ColumnMetadata {
1719 name,
1720 typ_idx: next_idx,
1721 added: new_version,
1722 dropped: None,
1723 };
1724
1725 self.inner.typ.column_types.push(typ.into());
1726 let prev = self.inner.metadata.insert(ColumnIndex(next_idx), col_meta);
1727
1728 assert_none!(prev, "column index overlap!");
1729 self.validate();
1730
1731 new_version
1732 }
1733
1734 #[must_use]
1743 pub fn drop_column<N>(&mut self, name: N) -> RelationVersion
1744 where
1745 N: Into<ColumnName>,
1746 {
1747 let name = name.into();
1748 let latest_version = self.latest_version();
1749 let new_version = latest_version.bump();
1750
1751 let col = self
1752 .inner
1753 .metadata
1754 .values_mut()
1755 .find(|meta| meta.name == name && meta.dropped.is_none())
1756 .expect("column to exist");
1757
1758 assert_none!(col.dropped, "column was already dropped");
1760 col.dropped = Some(new_version);
1761
1762 let dropped_key = self
1764 .inner
1765 .typ
1766 .keys
1767 .iter()
1768 .any(|keys| keys.contains(&col.typ_idx));
1769 assert!(!dropped_key, "column being dropped was used as a key");
1770
1771 self.validate();
1772 new_version
1773 }
1774
1775 pub fn latest(&self) -> RelationDesc {
1777 self.inner.clone()
1778 }
1779
1780 pub fn at_version(&self, version: RelationVersionSelector) -> RelationDesc {
1782 let up_to_version = match version {
1784 RelationVersionSelector::Latest => RelationVersion(u64::MAX),
1785 RelationVersionSelector::Specific(v) => v,
1786 };
1787
1788 let valid_columns = self.inner.metadata.iter().filter(|(_col_idx, meta)| {
1789 let added = meta.added <= up_to_version;
1790 let dropped = meta
1791 .dropped
1792 .map(|dropped_at| up_to_version >= dropped_at)
1793 .unwrap_or(false);
1794
1795 added && !dropped
1796 });
1797
1798 let mut column_types = Vec::new();
1799 let mut column_metas = BTreeMap::new();
1800
1801 for (col_idx, meta) in valid_columns {
1808 let new_meta = ColumnMetadata {
1809 name: meta.name.clone(),
1810 typ_idx: column_types.len(),
1811 added: meta.added.clone(),
1812 dropped: meta.dropped.clone(),
1813 };
1814 column_types.push(self.inner.typ.columns()[meta.typ_idx].clone());
1815 column_metas.insert(*col_idx, new_meta);
1816 }
1817
1818 let keys = self
1824 .inner
1825 .typ
1826 .keys
1827 .iter()
1828 .map(|keys| {
1829 keys.iter()
1830 .map(|key_idx| {
1831 let metadata = column_metas
1832 .get(&ColumnIndex(*key_idx))
1833 .expect("found key for column that doesn't exist");
1834 metadata.typ_idx
1835 })
1836 .collect()
1837 })
1838 .collect();
1839
1840 let relation_type = SqlRelationType { column_types, keys };
1841
1842 RelationDesc {
1843 typ: relation_type,
1844 metadata: column_metas,
1845 }
1846 }
1847
1848 pub fn latest_version(&self) -> RelationVersion {
1849 self.inner
1850 .metadata
1851 .values()
1852 .map(|meta| meta.dropped.unwrap_or(meta.added))
1854 .max()
1855 .unwrap_or_else(RelationVersion::root)
1857 }
1858
1859 fn validate(&self) {
1865 fn validate_inner(desc: &RelationDesc) -> Result<(), anyhow::Error> {
1866 if desc.typ.column_types.len() != desc.metadata.len() {
1867 anyhow::bail!("mismatch between number of types and metadatas");
1868 }
1869
1870 for (col_idx, meta) in &desc.metadata {
1871 if col_idx.0 > desc.metadata.len() {
1872 anyhow::bail!("column index out of bounds");
1873 }
1874 if meta.added >= meta.dropped.unwrap_or(RelationVersion(u64::MAX)) {
1875 anyhow::bail!("column was added after it was dropped?");
1876 }
1877 if desc.typ().columns().get(meta.typ_idx).is_none() {
1878 anyhow::bail!("typ_idx incorrect");
1879 }
1880 }
1881
1882 for keys in &desc.typ.keys {
1883 for key in keys {
1884 if *key >= desc.typ.column_types.len() {
1885 anyhow::bail!("key index was out of bounds!");
1886 }
1887 }
1888 }
1889
1890 let versions = desc
1891 .metadata
1892 .values()
1893 .map(|meta| meta.dropped.unwrap_or(meta.added));
1894 let mut max = 0;
1895 let mut sum = 0;
1896 for version in versions {
1897 max = std::cmp::max(max, version.0);
1898 sum += version.0;
1899 }
1900
1901 if sum != (max * (max + 1) / 2) {
1911 anyhow::bail!("there is a duplicate or missing relation version");
1912 }
1913
1914 Ok(())
1915 }
1916
1917 assert_ok!(validate_inner(&self.inner), "validate failed! {self:?}");
1918 }
1919}
1920
/// A single mutation that can be applied to a [`RelationDesc`], used for property testing.
#[derive(Debug)]
#[cfg(any(test, feature = "proptest"))]
pub enum PropRelationDescDiff {
    /// Append a new column with the given name and type.
    AddColumn {
        name: ColumnName,
        typ: SqlColumnType,
    },
    /// Mark the column with the given name as dropped.
    DropColumn {
        name: ColumnName,
    },
    /// Flip the `nullable` flag of the column with the given name.
    ToggleNullability {
        name: ColumnName,
    },
    /// Replace the type of the column with the given name.
    ChangeType {
        name: ColumnName,
        typ: SqlColumnType,
    },
}

#[cfg(any(test, feature = "proptest"))]
impl PropRelationDescDiff {
    /// Applies this mutation to `desc` in place.
    ///
    /// Mutations that name a column `desc` does not contain are silently ignored.
    ///
    /// # Panics
    ///
    /// Panics if adding a column would overwrite existing metadata, or if the metadata and
    /// the column types of `desc` are out of sync.
    pub fn apply(self, desc: &mut RelationDesc) {
        match self {
            Self::AddColumn { name, typ } => {
                let new_idx = desc.metadata.len();
                let prev = desc.metadata.insert(
                    ColumnIndex(new_idx),
                    ColumnMetadata {
                        name,
                        typ_idx: new_idx,
                        added: RelationVersion(0),
                        dropped: None,
                    },
                );
                desc.typ.column_types.push(typ);

                assert_none!(prev);
                assert_eq!(desc.metadata.len(), desc.typ.column_types.len());
            }
            Self::DropColumn { name } => {
                // The drop happens one version after the newest one seen on any column.
                let latest_version = desc
                    .metadata
                    .values()
                    .map(|meta| meta.dropped.unwrap_or(meta.added))
                    .max()
                    .unwrap_or_else(RelationVersion::root);
                let next_version = latest_version.bump();

                let target = desc.metadata.values_mut().find(|meta| meta.name == name);
                if let Some(metadata) = target {
                    // An already dropped column keeps its original drop version.
                    if metadata.dropped.is_none() {
                        metadata.dropped = Some(next_version);
                    }
                }
            }
            Self::ToggleNullability { name } => {
                let pos = match desc.get_by_name(&name) {
                    Some((pos, _)) => pos,
                    None => return,
                };
                let col_type = desc
                    .typ
                    .column_types
                    .get_mut(pos)
                    .expect("ColumnNames and SqlColumnTypes out of sync!");
                col_type.nullable = !col_type.nullable;
            }
            Self::ChangeType { name, typ } => {
                let pos = match desc.get_by_name(&name) {
                    Some((pos, _)) => pos,
                    None => return,
                };
                let col_type = desc
                    .typ
                    .column_types
                    .get_mut(pos)
                    .expect("ColumnNames and SqlColumnTypes out of sync!");
                *col_type = typ;
            }
        }
    }
}
2001
#[cfg(any(test, feature = "proptest"))]
/// Builds a proptest [`Strategy`] producing a shuffled list of [`PropRelationDescDiff`]s
/// for `source`.
///
/// The list always contains a (possibly empty) batch of `AddColumn` diffs. When `source` has
/// at least one column, it additionally contains `DropColumn`, `ToggleNullability` and
/// `ChangeType` diffs, each targeting a randomly chosen set of distinct column names taken
/// from `source`.
pub fn arb_relation_desc_diff(
    source: &RelationDesc,
) -> impl Strategy<Value = Vec<PropRelationDescDiff>> + use<> {
    let source = Rc::new(source.clone());
    let num_source_columns = source.typ.columns().len();

    // Heavily favour adding only a few columns; occasionally add many.
    let num_add_columns = Union::new_weighted(vec![(100, Just(0..8)), (1, Just(8..64))]);
    let add_columns_strat = num_add_columns
        .prop_flat_map(|num_columns| {
            proptest::collection::vec((any::<ColumnName>(), any::<SqlColumnType>()), num_columns)
        })
        .prop_map(|cols| {
            let mut diffs = Vec::new();
            for (name, typ) in cols {
                diffs.push(PropRelationDescDiff::AddColumn { name, typ });
            }
            diffs
        });

    // Without existing columns there is nothing to drop, toggle, or retype.
    if num_source_columns == 0 {
        return add_columns_strat.boxed();
    }

    let source_ = Rc::clone(&source);
    let drop_columns_strat = (0..num_source_columns).prop_perturb(move |num_columns, mut rng| {
        // Collecting into a set de-duplicates names that get picked more than once.
        let names: BTreeSet<_> = (0..num_columns)
            .map(|_| {
                let col_idx = rng.random_range(0..num_source_columns);
                source_.get_name(col_idx).clone()
            })
            .collect();
        names
            .into_iter()
            .map(|name| PropRelationDescDiff::DropColumn { name })
            .collect::<Vec<_>>()
    });

    let source_ = Rc::clone(&source);
    let toggle_nullability_strat =
        (0..num_source_columns).prop_perturb(move |num_columns, mut rng| {
            let names: BTreeSet<_> = (0..num_columns)
                .map(|_| {
                    let col_idx = rng.random_range(0..num_source_columns);
                    source_.get_name(col_idx).clone()
                })
                .collect();
            names
                .into_iter()
                .map(|name| PropRelationDescDiff::ToggleNullability { name })
                .collect::<Vec<_>>()
        });

    let source_ = Rc::clone(&source);
    let change_type_strat = (0..num_source_columns)
        .prop_perturb(move |num_columns, mut rng| {
            let names: BTreeSet<_> = (0..num_columns)
                .map(|_| {
                    let col_idx = rng.random_range(0..num_source_columns);
                    source_.get_name(col_idx).clone()
                })
                .collect();
            names
        })
        .prop_flat_map(|cols| {
            // Generate exactly one replacement type per selected column.
            proptest::collection::vec(any::<SqlColumnType>(), cols.len())
                .prop_map(move |types| (cols.clone(), types))
        })
        .prop_map(|(cols, types)| {
            cols.into_iter()
                .zip_eq(types)
                .map(|(name, typ)| PropRelationDescDiff::ChangeType { name, typ })
                .collect::<Vec<_>>()
        });

    (
        add_columns_strat,
        drop_columns_strat,
        toggle_nullability_strat,
        change_type_strat,
    )
        .prop_map(|(mut diffs, drops, toggles, changes)| {
            diffs.extend(drops);
            diffs.extend(toggles);
            diffs.extend(changes);
            diffs
        })
        .prop_shuffle()
        .boxed()
}
2088
#[cfg(test)]
mod tests {
    use super::*;
    use prost::Message;

    // Adds and then drops a column, checking the description at every version along the way.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn smoktest_at_version() {
        let desc = RelationDesc::builder()
            .with_column("a", SqlScalarType::Bool.nullable(true))
            .with_column("z", SqlScalarType::String.nullable(false))
            .finish();

        let mut versioned_desc = VersionedRelationDesc::new(desc.clone());
        versioned_desc.validate();

        // Before any change, every selector yields the original description.
        let latest = versioned_desc.at_version(RelationVersionSelector::Latest);
        assert_eq!(desc, latest);

        let v0 = versioned_desc.at_version(RelationVersionSelector::specific(0));
        assert_eq!(desc, v0);

        let v3 = versioned_desc.at_version(RelationVersionSelector::specific(3));
        assert_eq!(desc, v3);

        let added_at = versioned_desc.add_column("b", SqlScalarType::Bytes.nullable(false));
        assert_eq!(added_at, RelationVersion(1));

        let v1 = versioned_desc.at_version(RelationVersionSelector::Specific(added_at));
        insta::assert_json_snapshot!(v1.metadata, @r###"
        {
          "0": {
            "name": "a",
            "typ_idx": 0,
            "added": 0,
            "dropped": null
          },
          "1": {
            "name": "z",
            "typ_idx": 1,
            "added": 0,
            "dropped": null
          },
          "2": {
            "name": "b",
            "typ_idx": 2,
            "added": 1,
            "dropped": null
          }
        }
        "###);

        // Adding a column must not change what version 0 looks like.
        let v0_b = versioned_desc.at_version(RelationVersionSelector::specific(0));
        assert!(v0.iter().eq(v0_b.iter()));

        let dropped_at = versioned_desc.drop_column("z");
        assert_eq!(dropped_at, RelationVersion(2));

        let v2 = versioned_desc.at_version(RelationVersionSelector::Specific(dropped_at));
        insta::assert_json_snapshot!(v2.metadata, @r###"
        {
          "0": {
            "name": "a",
            "typ_idx": 0,
            "added": 0,
            "dropped": null
          },
          "2": {
            "name": "b",
            "typ_idx": 1,
            "added": 1,
            "dropped": null
          }
        }
        "###);

        // Dropping a column must not change what the earlier versions look like.
        let v0_c = versioned_desc.at_version(RelationVersionSelector::specific(0));
        assert!(v0.iter().eq(v0_c.iter()));

        let v1_b = versioned_desc.at_version(RelationVersionSelector::specific(1));
        assert!(v1.iter().eq(v1_b.iter()));

        // The wrapped description keeps the dropped column and records when it was dropped.
        insta::assert_json_snapshot!(versioned_desc.inner.metadata, @r###"
        {
          "0": {
            "name": "a",
            "typ_idx": 0,
            "added": 0,
            "dropped": null
          },
          "1": {
            "name": "z",
            "typ_idx": 1,
            "added": 0,
            "dropped": 2
          },
          "2": {
            "name": "b",
            "typ_idx": 2,
            "added": 1,
            "dropped": null
          }
        }
        "###);
    }

    // Dropping a column positioned before a key column must remap the key index.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn test_dropping_columns_with_keys() {
        let desc = RelationDesc::builder()
            .with_column("a", SqlScalarType::Bool.nullable(true))
            .with_column("z", SqlScalarType::String.nullable(false))
            .with_key(vec![1])
            .finish();

        let mut versioned_desc = VersionedRelationDesc::new(desc.clone());
        versioned_desc.validate();

        let dropped_at = versioned_desc.drop_column("a");
        assert_eq!(dropped_at, RelationVersion(1));

        // After the drop, the key on "z" points at position 0.
        let v1 = versioned_desc.at_version(RelationVersionSelector::Specific(dropped_at));
        insta::assert_json_snapshot!(v1, @r###"
        {
          "typ": {
            "column_types": [
              {
                "scalar_type": "String",
                "nullable": false
              }
            ],
            "keys": [
              [
                0
              ]
            ]
          },
          "metadata": {
            "1": {
              "name": "z",
              "typ_idx": 0,
              "added": 0,
              "dropped": null
            }
          }
        }
        "###);

        // Version 0 still contains both columns, with the key at its original position.
        let v0 = versioned_desc.at_version(RelationVersionSelector::specific(0));
        insta::assert_json_snapshot!(v0, @r###"
        {
          "typ": {
            "column_types": [
              {
                "scalar_type": "Bool",
                "nullable": true
              },
              {
                "scalar_type": "String",
                "nullable": false
              }
            ],
            "keys": [
              [
                1
              ]
            ]
          },
          "metadata": {
            "0": {
              "name": "a",
              "typ_idx": 0,
              "added": 0,
              "dropped": 1
            },
            "1": {
              "name": "z",
              "typ_idx": 1,
              "added": 0,
              "dropped": null
            }
          }
        }
        "###);
    }

    // A proto that carries only names (no metadata) must decode into a desc with metadata.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn roundtrip_relation_desc_without_metadata() {
        let column_types = vec![
            SqlScalarType::String.nullable(false).into_proto(),
            SqlScalarType::Bool.nullable(true).into_proto(),
        ];
        let names = vec![
            ColumnName("a".into()).into_proto(),
            ColumnName("b".into()).into_proto(),
        ];
        let proto = ProtoRelationDesc {
            typ: Some(ProtoRelationType {
                column_types,
                keys: vec![],
            }),
            names,
            metadata: vec![],
        };
        let desc: RelationDesc = proto.into_rust().unwrap();

        insta::assert_json_snapshot!(desc, @r###"
        {
          "typ": {
            "column_types": [
              {
                "scalar_type": "String",
                "nullable": false
              },
              {
                "scalar_type": "Bool",
                "nullable": true
              }
            ],
            "keys": []
          },
          "metadata": {
            "0": {
              "name": "a",
              "typ_idx": 0,
              "added": 0,
              "dropped": null
            },
            "1": {
              "name": "b",
              "typ_idx": 1,
              "added": 0,
              "dropped": null
            }
          }
        }
        "###);
    }

    // Adding a column whose name clashes with a live column must panic.
    #[mz_ore::test]
    #[should_panic(expected = "column named 'a' already exists!")]
    fn test_add_column_with_same_name_panics() {
        let mut versioned = VersionedRelationDesc::new(
            RelationDesc::builder()
                .with_column("a", SqlScalarType::Bool.nullable(true))
                .finish(),
        );

        let _ = versioned.add_column("a", SqlScalarType::String.nullable(false));
    }

    // Re-using the name of a previously dropped column is allowed.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn test_add_column_with_same_name_prev_dropped() {
        let mut versioned = VersionedRelationDesc::new(
            RelationDesc::builder()
                .with_column("a", SqlScalarType::Bool.nullable(true))
                .finish(),
        );

        let dropped_at = versioned.drop_column("a");
        let v1 = versioned.at_version(RelationVersionSelector::Specific(dropped_at));
        insta::assert_json_snapshot!(v1, @r###"
        {
          "typ": {
            "column_types": [],
            "keys": []
          },
          "metadata": {}
        }
        "###);

        let added_at = versioned.add_column("a", SqlScalarType::String.nullable(false));
        let v2 = versioned.at_version(RelationVersionSelector::Specific(added_at));
        insta::assert_json_snapshot!(v2, @r###"
        {
          "typ": {
            "column_types": [
              {
                "scalar_type": "String",
                "nullable": false
              }
            ],
            "keys": []
          },
          "metadata": {
            "1": {
              "name": "a",
              "typ_idx": 0,
              "added": 2,
              "dropped": null
            }
          }
        }
        "###);
    }

    // A desc narrowed with `apply_demand` must still pass validation.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn apply_demand() {
        let full = RelationDesc::builder()
            .with_column("a", SqlScalarType::String.nullable(true))
            .with_column("b", SqlScalarType::Int64.nullable(false))
            .with_column("c", SqlScalarType::Time.nullable(false))
            .finish();
        let demanded = full.apply_demand(&BTreeSet::from([0, 2]));
        assert_eq!(demanded.arity(), 2);
        VersionedRelationDesc::new(demanded).validate();
    }

    // The stable name of a column index is its decimal representation.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn smoketest_column_index_stable_ident() {
        assert_eq!(ColumnIndex(42).to_stable_name(), "42");
    }

    // Encoding a desc to protobuf bytes and decoding it again must yield the same desc,
    // both for arbitrary descs and for descs that had arbitrary diffs applied.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn proptest_relation_desc_roundtrips() {
        fn testcase(og: RelationDesc) {
            let encoded = og.into_proto().encode_to_vec();
            let decoded = ProtoRelationDesc::decode(&encoded[..]).unwrap();
            let rnd = RelationDesc::from_proto(decoded).unwrap();

            assert_eq!(og, rnd);
        }

        proptest!(|(desc in any::<RelationDesc>())| {
            testcase(desc);
        });

        let desc_with_diffs = any::<RelationDesc>().prop_flat_map(|desc| {
            arb_relation_desc_diff(&desc).prop_map(move |diffs| (desc.clone(), diffs))
        });

        proptest!(|((mut desc, diffs) in desc_with_diffs)| {
            for diff in diffs {
                diff.apply(&mut desc);
            };
            testcase(desc);
        });
    }
}