1use std::collections::{BTreeMap, BTreeSet};
11#[cfg(any(test, feature = "proptest"))]
12use std::rc::Rc;
13use std::{fmt, vec};
14
15use anyhow::bail;
16use itertools::Itertools;
17use mz_lowertest::MzReflect;
18use mz_ore::cast::CastFrom;
19use mz_ore::soft_panic_or_log;
20use mz_ore::str::StrExt;
21use mz_ore::{assert_none, assert_ok};
22use mz_persist_types::schema::SchemaId;
23use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
24#[cfg(any(test, feature = "proptest"))]
25use proptest::prelude::*;
26#[cfg(any(test, feature = "proptest"))]
27use proptest::strategy::{Strategy, Union};
28#[cfg(any(test, feature = "proptest"))]
29use proptest_derive::Arbitrary;
30use serde::{Deserialize, Serialize};
31
32#[cfg(any(test, feature = "proptest"))]
33use crate::Row;
34#[cfg(any(test, feature = "proptest"))]
35use crate::arb_datum_for_column;
36use crate::relation_and_scalar::proto_relation_type::ProtoKey;
37pub use crate::relation_and_scalar::{
38 ProtoColumnMetadata, ProtoColumnName, ProtoColumnType, ProtoRelationDesc, ProtoRelationType,
39 ProtoRelationVersion,
40};
41use crate::{Datum, ReprScalarType, SqlScalarType};
42
43#[derive(
51 Clone,
52 Debug,
53 Eq,
54 PartialEq,
55 Ord,
56 PartialOrd,
57 Serialize,
58 Deserialize,
59 Hash,
60 MzReflect
61)]
62#[cfg_attr(any(test, feature = "proptest"), derive(Arbitrary))]
63pub struct SqlColumnType {
64 pub scalar_type: SqlScalarType,
66 #[serde(default = "return_true")]
68 pub nullable: bool,
69}
70
/// Always yields `true`.
///
/// Referenced by name from `#[serde(default = "return_true")]` attributes so
/// that a missing `nullable` field deserializes as `true`.
#[inline(always)]
fn return_true() -> bool {
    true
}
80
81impl SqlColumnType {
82 pub fn try_union_many<'a>(
86 typs: impl IntoIterator<Item = &'a Self>,
87 ) -> Result<Self, anyhow::Error> {
88 let mut iter = typs.into_iter();
89 let Some(typ) = iter.next() else {
90 bail!("Cannot union empty iterator");
91 };
92 iter.try_fold(typ.clone(), |a, b| a.try_union(b))
93 }
94
95 pub fn union_many<'a>(typs: impl IntoIterator<Item = &'a Self>) -> Self {
100 Self::try_union_many(typs).expect("Cannot union empty iterator")
101 }
102
103 pub fn backport_nullability(&mut self, backport_typ: &ReprColumnType) {
107 self.scalar_type
108 .backport_nullability(&backport_typ.scalar_type);
109 self.nullable = backport_typ.nullable;
110 }
111
112 pub fn sql_union(&self, other: &Self) -> Result<Self, anyhow::Error> {
125 match (&self.scalar_type, &other.scalar_type) {
126 (scalar_type, other_scalar_type) if scalar_type == other_scalar_type => {
127 Ok(SqlColumnType {
128 scalar_type: scalar_type.clone(),
129 nullable: self.nullable || other.nullable,
130 })
131 }
132 (scalar_type, other_scalar_type) if scalar_type.base_eq(other_scalar_type) => {
133 Ok(SqlColumnType {
134 scalar_type: scalar_type.without_modifiers(),
135 nullable: self.nullable || other.nullable,
136 })
137 }
138 (
139 SqlScalarType::Record { fields, custom_id },
140 SqlScalarType::Record {
141 fields: other_fields,
142 custom_id: other_custom_id,
143 },
144 ) => {
145 if custom_id != other_custom_id {
146 bail!(
147 "Can't union types: {:?} and {:?}",
148 self.scalar_type,
149 other.scalar_type
150 );
151 };
152
153 if fields.len() != other_fields.len() {
154 bail!(
155 "Can't union types: {:?} and {:?}",
156 self.scalar_type,
157 other.scalar_type
158 );
159 }
160 let mut union_fields = Vec::with_capacity(fields.len());
161 for ((name, typ), (other_name, other_typ)) in
162 fields.iter().zip_eq(other_fields.iter())
163 {
164 if name != other_name {
165 bail!(
166 "Can't union types: {:?} and {:?}",
167 self.scalar_type,
168 other.scalar_type
169 );
170 } else {
171 let union_column_type = typ.sql_union(other_typ)?;
172 union_fields.push((name.clone(), union_column_type));
173 };
174 }
175
176 Ok(SqlColumnType {
177 scalar_type: SqlScalarType::Record {
178 fields: union_fields.into(),
179 custom_id: *custom_id,
180 },
181 nullable: self.nullable || other.nullable,
182 })
183 }
184 _ => bail!(
185 "Can't union types: {:?} and {:?}",
186 self.scalar_type,
187 other.scalar_type
188 ),
189 }
190 }
191
192 pub fn try_union(&self, other: &Self) -> Result<Self, anyhow::Error> {
202 self.sql_union(other).or_else(|e| {
203 let repr_self = ReprColumnType::from(self);
204 let repr_other = ReprColumnType::from(other);
205 match repr_self.union(&repr_other) {
206 Ok(typ) => {
207 soft_panic_or_log!("repr type error: sql_union({self:?}, {other:?}): {e}");
210 Ok(SqlColumnType::from_repr(&typ))
211 }
212 Err(_) => {
213 Err(e)
216 }
217 }
218 })
219 }
220
221 pub fn union(&self, other: &Self) -> Self {
226 self.try_union(other).unwrap_or_else(|e| {
227 panic!("repr type error: after sql_union({self:?}, {other:?}) error: {e}")
228 })
229 }
230
231 pub fn nullable(mut self, nullable: bool) -> Self {
234 self.nullable = nullable;
235 self
236 }
237}
238
239impl RustType<ProtoColumnType> for SqlColumnType {
240 fn into_proto(&self) -> ProtoColumnType {
241 ProtoColumnType {
242 nullable: self.nullable,
243 scalar_type: Some(self.scalar_type.into_proto()),
244 }
245 }
246
247 fn from_proto(proto: ProtoColumnType) -> Result<Self, TryFromProtoError> {
248 Ok(SqlColumnType {
249 nullable: proto.nullable,
250 scalar_type: proto
251 .scalar_type
252 .into_rust_if_some("ProtoColumnType::scalar_type")?,
253 })
254 }
255}
256
257impl fmt::Display for SqlColumnType {
258 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
259 let nullable = if self.nullable { "Null" } else { "NotNull" };
260 f.write_fmt(format_args!("{:?}:{}", self.scalar_type, nullable))
261 }
262}
263
264#[derive(
266 Clone,
267 Debug,
268 Eq,
269 PartialEq,
270 Ord,
271 PartialOrd,
272 Serialize,
273 Deserialize,
274 Hash,
275 MzReflect
276)]
277#[cfg_attr(any(test, feature = "proptest"), derive(Arbitrary))]
278pub struct SqlRelationType {
279 pub column_types: Vec<SqlColumnType>,
281 #[serde(default)]
291 pub keys: Vec<Vec<usize>>,
292}
293
294impl SqlRelationType {
295 pub fn empty() -> Self {
298 SqlRelationType::new(vec![])
299 }
300
301 pub fn new(column_types: Vec<SqlColumnType>) -> Self {
305 SqlRelationType {
306 column_types,
307 keys: Vec::new(),
308 }
309 }
310
311 pub fn with_key(mut self, mut indices: Vec<usize>) -> Self {
313 indices.sort_unstable();
314 if !self.keys.contains(&indices) {
315 self.keys.push(indices);
316 }
317 self
318 }
319
320 pub fn with_keys(mut self, keys: Vec<Vec<usize>>) -> Self {
321 for key in keys {
322 self = self.with_key(key)
323 }
324 self
325 }
326
327 pub fn arity(&self) -> usize {
329 self.column_types.len()
330 }
331
332 pub fn default_key(&self) -> Vec<usize> {
334 if let Some(key) = self.keys.first() {
335 if key.is_empty() {
336 (0..self.column_types.len()).collect()
337 } else {
338 key.clone()
339 }
340 } else {
341 (0..self.column_types.len()).collect()
342 }
343 }
344
345 pub fn columns(&self) -> &[SqlColumnType] {
347 &self.column_types
348 }
349
350 pub fn backport_nullability_and_keys(&mut self, backport_typ: &ReprRelationType) {
354 assert_eq!(
355 backport_typ.column_types.len(),
356 self.column_types.len(),
357 "HIR and MIR types should have the same number of columns"
358 );
359 for (backport_col, sql_col) in backport_typ
360 .column_types
361 .iter()
362 .zip_eq(self.column_types.iter_mut())
363 {
364 sql_col.backport_nullability(backport_col);
365 }
366
367 self.keys = backport_typ.keys.clone();
368 }
369
370 pub fn from_repr(repr: &ReprRelationType) -> Self {
374 SqlRelationType {
375 column_types: repr
376 .column_types
377 .iter()
378 .map(SqlColumnType::from_repr)
379 .collect(),
380 keys: repr.keys.clone(),
381 }
382 }
383}
384
385impl RustType<ProtoRelationType> for SqlRelationType {
386 fn into_proto(&self) -> ProtoRelationType {
387 ProtoRelationType {
388 column_types: self.column_types.into_proto(),
389 keys: self.keys.into_proto(),
390 }
391 }
392
393 fn from_proto(proto: ProtoRelationType) -> Result<Self, TryFromProtoError> {
394 Ok(SqlRelationType {
395 column_types: proto.column_types.into_rust()?,
396 keys: proto.keys.into_rust()?,
397 })
398 }
399}
400
401impl RustType<ProtoKey> for Vec<usize> {
402 fn into_proto(&self) -> ProtoKey {
403 ProtoKey {
404 keys: self.into_proto(),
405 }
406 }
407
408 fn from_proto(proto: ProtoKey) -> Result<Self, TryFromProtoError> {
409 proto.keys.into_rust()
410 }
411}
412
413#[derive(
415 Clone,
416 Debug,
417 Eq,
418 PartialEq,
419 Ord,
420 PartialOrd,
421 Serialize,
422 Deserialize,
423 Hash,
424 MzReflect
425)]
426pub struct ReprRelationType {
427 pub column_types: Vec<ReprColumnType>,
429 #[serde(default)]
439 pub keys: Vec<Vec<usize>>,
440}
441
442impl ReprRelationType {
443 pub fn empty() -> Self {
446 ReprRelationType::new(vec![])
447 }
448
449 pub fn new(column_types: Vec<ReprColumnType>) -> Self {
453 ReprRelationType {
454 column_types,
455 keys: Vec::new(),
456 }
457 }
458
459 pub fn with_key(mut self, mut indices: Vec<usize>) -> Self {
461 indices.sort_unstable();
462 if !self.keys.contains(&indices) {
463 self.keys.push(indices);
464 }
465 self
466 }
467
468 pub fn with_keys(mut self, keys: Vec<Vec<usize>>) -> Self {
469 for key in keys {
470 self = self.with_key(key)
471 }
472 self
473 }
474
475 pub fn arity(&self) -> usize {
477 self.column_types.len()
478 }
479
480 pub fn default_key(&self) -> Vec<usize> {
482 if let Some(key) = self.keys.first() {
483 if key.is_empty() {
484 (0..self.column_types.len()).collect()
485 } else {
486 key.clone()
487 }
488 } else {
489 (0..self.column_types.len()).collect()
490 }
491 }
492
493 pub fn columns(&self) -> &[ReprColumnType] {
495 &self.column_types
496 }
497}
498
499impl From<&SqlRelationType> for ReprRelationType {
500 fn from(sql_relation_type: &SqlRelationType) -> Self {
501 ReprRelationType {
502 column_types: sql_relation_type
503 .column_types
504 .iter()
505 .map(ReprColumnType::from)
506 .collect(),
507 keys: sql_relation_type.keys.clone(),
508 }
509 }
510}
511
512#[derive(
513 Clone,
514 Debug,
515 Eq,
516 PartialEq,
517 Ord,
518 PartialOrd,
519 Serialize,
520 Deserialize,
521 Hash,
522 MzReflect
523)]
524pub struct ReprColumnType {
525 pub scalar_type: ReprScalarType,
527 #[serde(default = "return_true")]
529 pub nullable: bool,
530}
531
532impl std::fmt::Display for ReprColumnType {
533 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
534 write!(f, "{}", self.scalar_type)?;
535 if self.nullable {
536 write!(f, "?")?;
537 }
538 Ok(())
539 }
540}
541
542impl ReprColumnType {
543 pub fn union(&self, col: &ReprColumnType) -> Result<Self, anyhow::Error> {
550 let scalar_type = self.scalar_type.union(&col.scalar_type)?;
551 let nullable = self.nullable || col.nullable;
552
553 Ok(ReprColumnType {
554 scalar_type,
555 nullable,
556 })
557 }
558}
559
560impl From<&SqlColumnType> for ReprColumnType {
561 fn from(sql_column_type: &SqlColumnType) -> Self {
562 let scalar_type = &sql_column_type.scalar_type;
563 let scalar_type = scalar_type.into();
564 let nullable = sql_column_type.nullable;
565
566 ReprColumnType {
567 scalar_type,
568 nullable,
569 }
570 }
571}
572
573impl SqlColumnType {
574 pub fn from_repr(repr: &ReprColumnType) -> Self {
578 let scalar_type = &repr.scalar_type;
579 let scalar_type = SqlScalarType::from_repr(scalar_type);
580 let nullable = repr.nullable;
581
582 SqlColumnType {
583 scalar_type,
584 nullable,
585 }
586 }
587}
588
589#[derive(
591 Clone,
592 Debug,
593 Eq,
594 PartialEq,
595 Ord,
596 PartialOrd,
597 Serialize,
598 Deserialize,
599 Hash,
600 MzReflect
601)]
602pub struct ColumnName(Box<str>);
603
604impl ColumnName {
605 #[inline(always)]
607 pub fn as_str(&self) -> &str {
608 &*self
609 }
610
611 pub fn as_mut_boxed_str(&mut self) -> &mut Box<str> {
613 &mut self.0
614 }
615
616 pub fn is_similar(&self, other: &ColumnName) -> bool {
618 const SIMILARITY_THRESHOLD: f64 = 0.6;
619
620 let a_lowercase = self.to_lowercase();
621 let b_lowercase = other.to_lowercase();
622
623 strsim::normalized_levenshtein(&a_lowercase, &b_lowercase) >= SIMILARITY_THRESHOLD
624 }
625}
626
627impl std::ops::Deref for ColumnName {
628 type Target = str;
629
630 #[inline(always)]
631 fn deref(&self) -> &Self::Target {
632 &self.0
633 }
634}
635
636impl fmt::Display for ColumnName {
637 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
638 f.write_str(&self.0)
639 }
640}
641
642impl From<String> for ColumnName {
643 fn from(s: String) -> ColumnName {
644 ColumnName(s.into())
645 }
646}
647
648impl From<&str> for ColumnName {
649 fn from(s: &str) -> ColumnName {
650 ColumnName(s.into())
651 }
652}
653
654impl From<&ColumnName> for ColumnName {
655 fn from(n: &ColumnName) -> ColumnName {
656 n.clone()
657 }
658}
659
660impl RustType<ProtoColumnName> for ColumnName {
661 fn into_proto(&self) -> ProtoColumnName {
662 ProtoColumnName {
663 value: Some(self.0.to_string()),
664 }
665 }
666
667 fn from_proto(proto: ProtoColumnName) -> Result<Self, TryFromProtoError> {
668 Ok(ColumnName(
669 proto
670 .value
671 .ok_or_else(|| TryFromProtoError::missing_field("ProtoColumnName::value"))?
672 .into(),
673 ))
674 }
675}
676
677impl From<ColumnName> for mz_sql_parser::ast::Ident {
678 fn from(value: ColumnName) -> Self {
679 mz_sql_parser::ast::Ident::new_unchecked(value.0)
681 }
682}
683
#[cfg(any(test, feature = "proptest"))]
impl proptest::arbitrary::Arbitrary for ColumnName {
    type Parameters = ();
    type Strategy = BoxedStrategy<ColumnName>;

    /// Generates column names of weighted random length.
    ///
    /// Lengths are drawn mostly from `1..8` and `8..16`; longer ranges are
    /// only added when the `PROPTEST_LARGE_DATA` environment variable is set.
    /// Characters come mostly from the range `'A'..='z'`, with an occasional
    /// arbitrary `char`. Shrinking is disabled.
    fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy {
        let mut length_weights = vec![(50, Just(1..8)), (20, Just(8..16))];
        let large_data = std::env::var("PROPTEST_LARGE_DATA").is_ok();
        if large_data {
            length_weights.extend([
                (5, Just(16..128)),
                (1, Just(128..1024)),
                (1, Just(1024..4096)),
            ]);
        }
        let length_strat = Union::new_weighted(length_weights);

        // Shared through an `Rc` so that each invocation of the flat-map
        // closure can hand out a cheap clone.
        let char_strat = Rc::new(Union::new_weighted(vec![
            (50, proptest::char::range('A', 'z').boxed()),
            (1, any::<char>().boxed()),
        ]));

        length_strat
            .prop_flat_map(move |len| proptest::collection::vec(Rc::clone(&char_strat), len))
            .prop_map(|chars| {
                let name: Box<str> = chars.into_iter().collect();
                ColumnName(name)
            })
            .no_shrink()
            .boxed()
    }
}
716
/// The literal column name `?column?`.
///
/// NOTE(review): presumably the placeholder used for columns without a
/// user-provided name — confirm against the call sites.
pub const UNKNOWN_COLUMN_NAME: &str = "?column?";
719
720#[derive(
722 Clone,
723 Copy,
724 Debug,
725 Eq,
726 PartialEq,
727 PartialOrd,
728 Ord,
729 Serialize,
730 Deserialize,
731 Hash,
732 MzReflect
733)]
734pub struct ColumnIndex(usize);
735
736#[cfg(any(test, feature = "proptest"))]
737static_assertions::assert_not_impl_all!(ColumnIndex: Arbitrary);
738
739impl ColumnIndex {
740 pub fn to_stable_name(&self) -> String {
742 self.0.to_string()
743 }
744
745 pub fn to_raw(&self) -> usize {
746 self.0
747 }
748
749 pub fn from_raw(val: usize) -> Self {
750 ColumnIndex(val)
751 }
752}
753
754#[derive(
756 Clone,
757 Copy,
758 Debug,
759 Eq,
760 PartialEq,
761 PartialOrd,
762 Ord,
763 Serialize,
764 Deserialize,
765 Hash,
766 MzReflect
767)]
768#[cfg_attr(any(test, feature = "proptest"), derive(Arbitrary))]
769pub struct RelationVersion(u64);
770
771impl RelationVersion {
772 pub fn root() -> Self {
774 RelationVersion(0)
775 }
776
777 pub fn bump(&self) -> Self {
779 let next_version = self
780 .0
781 .checked_add(1)
782 .expect("added more than u64::MAX columns?");
783 RelationVersion(next_version)
784 }
785
786 pub fn into_raw(self) -> u64 {
790 self.0
791 }
792
793 pub fn from_raw(val: u64) -> RelationVersion {
797 RelationVersion(val)
798 }
799}
800
801impl From<RelationVersion> for SchemaId {
802 fn from(value: RelationVersion) -> Self {
803 SchemaId(usize::cast_from(value.0))
804 }
805}
806
807impl From<mz_sql_parser::ast::Version> for RelationVersion {
808 fn from(value: mz_sql_parser::ast::Version) -> Self {
809 RelationVersion(value.into_inner())
810 }
811}
812
813impl fmt::Display for RelationVersion {
814 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
815 write!(f, "v{}", self.0)
816 }
817}
818
819impl From<RelationVersion> for mz_sql_parser::ast::Version {
820 fn from(value: RelationVersion) -> Self {
821 mz_sql_parser::ast::Version::new(value.0)
822 }
823}
824
825impl RustType<ProtoRelationVersion> for RelationVersion {
826 fn into_proto(&self) -> ProtoRelationVersion {
827 ProtoRelationVersion { value: self.0 }
828 }
829
830 fn from_proto(proto: ProtoRelationVersion) -> Result<Self, TryFromProtoError> {
831 Ok(RelationVersion(proto.value))
832 }
833}
834
835#[derive(
842 Clone,
843 Copy,
844 Debug,
845 PartialEq,
846 Eq,
847 PartialOrd,
848 Ord,
849 Hash,
850 serde::Serialize
851)]
852pub enum SemanticType {
853 CatalogItemId,
854 GlobalId,
855 ClusterId,
856 ReplicaId,
857 SchemaId,
858 DatabaseId,
859 RoleId,
860 NetworkPolicyId,
861 ShardId,
862 OID,
863 ObjectType,
864 ConnectionType,
865 SourceType,
866 MzTimestamp,
867 WallclockTimestamp,
868 ByteCount,
869 RecordCount,
870 CreditRate,
871 SqlDefinition,
872 RedactedSqlDefinition,
873}
874
875impl fmt::Display for SemanticType {
876 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
877 let s = match self {
878 SemanticType::CatalogItemId => "CatalogItemId",
879 SemanticType::GlobalId => "GlobalId",
880 SemanticType::ClusterId => "ClusterId",
881 SemanticType::ReplicaId => "ReplicaId",
882 SemanticType::SchemaId => "SchemaId",
883 SemanticType::DatabaseId => "DatabaseId",
884 SemanticType::RoleId => "RoleId",
885 SemanticType::NetworkPolicyId => "NetworkPolicyId",
886 SemanticType::ShardId => "ShardId",
887 SemanticType::OID => "OID",
888 SemanticType::ObjectType => "ObjectType",
889 SemanticType::ConnectionType => "ConnectionType",
890 SemanticType::SourceType => "SourceType",
891 SemanticType::MzTimestamp => "MzTimestamp",
892 SemanticType::WallclockTimestamp => "WallclockTimestamp",
893 SemanticType::ByteCount => "ByteCount",
894 SemanticType::RecordCount => "RecordCount",
895 SemanticType::CreditRate => "CreditRate",
896 SemanticType::SqlDefinition => "SqlDefinition",
897 SemanticType::RedactedSqlDefinition => "RedactedSqlDefinition",
898 };
899 f.write_str(s)
900 }
901}
902
903#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash, MzReflect)]
905struct ColumnMetadata {
906 name: ColumnName,
908 typ_idx: usize,
910 added: RelationVersion,
912 dropped: Option<RelationVersion>,
914}
915
916#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, MzReflect)]
984pub struct RelationDesc {
985 typ: SqlRelationType,
986 metadata: BTreeMap<ColumnIndex, ColumnMetadata>,
987}
988
989impl RustType<ProtoRelationDesc> for RelationDesc {
990 fn into_proto(&self) -> ProtoRelationDesc {
991 let (names, metadata): (Vec<_>, Vec<_>) = self
992 .metadata
993 .values()
994 .map(|meta| {
995 let metadata = ProtoColumnMetadata {
996 added: Some(meta.added.into_proto()),
997 dropped: meta.dropped.map(|v| v.into_proto()),
998 };
999 (meta.name.into_proto(), metadata)
1000 })
1001 .unzip();
1002
1003 let is_all_default_metadata = metadata.iter().all(|meta| {
1009 meta.added == Some(RelationVersion::root().into_proto()) && meta.dropped == None
1010 });
1011 let metadata = if is_all_default_metadata {
1012 Vec::new()
1013 } else {
1014 metadata
1015 };
1016
1017 ProtoRelationDesc {
1018 typ: Some(self.typ.into_proto()),
1019 names,
1020 metadata,
1021 }
1022 }
1023
1024 fn from_proto(proto: ProtoRelationDesc) -> Result<Self, TryFromProtoError> {
1025 let proto_metadata: Box<dyn Iterator<Item = _>> = if proto.metadata.is_empty() {
1031 let val = ProtoColumnMetadata {
1032 added: Some(RelationVersion::root().into_proto()),
1033 dropped: None,
1034 };
1035 Box::new(itertools::repeat_n(val, proto.names.len()))
1036 } else {
1037 if proto.names.len() != proto.metadata.len() {
1041 return Err(TryFromProtoError::InvalidFieldError(format!(
1042 "ProtoRelationDesc: names ({}) and metadata ({}) length mismatch",
1043 proto.names.len(),
1044 proto.metadata.len()
1045 )));
1046 }
1047 Box::new(proto.metadata.into_iter())
1048 };
1049
1050 let metadata = proto
1051 .names
1052 .into_iter()
1053 .zip_eq(proto_metadata)
1054 .enumerate()
1055 .map(|(idx, (name, metadata))| {
1056 let meta = ColumnMetadata {
1057 name: name.into_rust()?,
1058 typ_idx: idx,
1059 added: metadata.added.into_rust_if_some("ColumnMetadata::added")?,
1060 dropped: metadata.dropped.into_rust()?,
1061 };
1062 Ok::<_, TryFromProtoError>((ColumnIndex(idx), meta))
1063 })
1064 .collect::<Result<_, _>>()?;
1065
1066 Ok(RelationDesc {
1067 typ: proto.typ.into_rust_if_some("ProtoRelationDesc::typ")?,
1068 metadata,
1069 })
1070 }
1071}
1072
1073impl RelationDesc {
1074 pub fn builder() -> RelationDescBuilder {
1076 RelationDescBuilder::default()
1077 }
1078
1079 pub fn empty() -> Self {
1082 RelationDesc {
1083 typ: SqlRelationType::empty(),
1084 metadata: BTreeMap::default(),
1085 }
1086 }
1087
1088 pub fn is_empty(&self) -> bool {
1090 self == &Self::empty()
1091 }
1092
1093 pub fn len(&self) -> usize {
1095 self.typ().column_types.len()
1096 }
1097
1098 pub fn new<I, N>(typ: SqlRelationType, names: I) -> Self
1106 where
1107 I: IntoIterator<Item = N>,
1108 N: Into<ColumnName>,
1109 {
1110 let metadata: BTreeMap<_, _> = names
1111 .into_iter()
1112 .enumerate()
1113 .map(|(idx, name)| {
1114 let col_idx = ColumnIndex(idx);
1115 let metadata = ColumnMetadata {
1116 name: name.into(),
1117 typ_idx: idx,
1118 added: RelationVersion::root(),
1119 dropped: None,
1120 };
1121 (col_idx, metadata)
1122 })
1123 .collect();
1124
1125 assert_eq!(typ.column_types.len(), metadata.len());
1127
1128 RelationDesc { typ, metadata }
1129 }
1130
1131 pub fn from_names_and_types<I, T, N>(iter: I) -> Self
1132 where
1133 I: IntoIterator<Item = (N, T)>,
1134 T: Into<SqlColumnType>,
1135 N: Into<ColumnName>,
1136 {
1137 let (names, types): (Vec<_>, Vec<_>) = iter.into_iter().unzip();
1138 let types = types.into_iter().map(Into::into).collect();
1139 let typ = SqlRelationType::new(types);
1140 Self::new(typ, names)
1141 }
1142
1143 pub fn concat(mut self, other: Self) -> Self {
1153 let self_len = self.typ.column_types.len();
1154
1155 for (typ, (_col_idx, meta)) in other.typ.column_types.into_iter().zip_eq(other.metadata) {
1156 assert_eq!(meta.added, RelationVersion::root());
1157 assert_none!(meta.dropped);
1158
1159 let new_idx = self.typ.columns().len();
1160 let new_meta = ColumnMetadata {
1161 name: meta.name,
1162 typ_idx: new_idx,
1163 added: RelationVersion::root(),
1164 dropped: None,
1165 };
1166
1167 self.typ.column_types.push(typ);
1168 let prev = self.metadata.insert(ColumnIndex(new_idx), new_meta);
1169
1170 assert_eq!(self.metadata.len(), self.typ.columns().len());
1171 assert_none!(prev);
1172 }
1173
1174 for k in other.typ.keys {
1175 let k = k.into_iter().map(|idx| idx + self_len).collect();
1176 self = self.with_key(k);
1177 }
1178 self
1179 }
1180
1181 pub fn with_key(mut self, indices: Vec<usize>) -> Self {
1183 self.typ = self.typ.with_key(indices);
1184 self
1185 }
1186
1187 pub fn without_keys(mut self) -> Self {
1189 self.typ.keys.clear();
1190 self
1191 }
1192
1193 pub fn with_names<I, N>(self, names: I) -> Self
1201 where
1202 I: IntoIterator<Item = N>,
1203 N: Into<ColumnName>,
1204 {
1205 Self::new(self.typ, names)
1206 }
1207
1208 pub fn arity(&self) -> usize {
1210 self.typ.arity()
1211 }
1212
1213 pub fn typ(&self) -> &SqlRelationType {
1215 &self.typ
1216 }
1217
1218 pub fn into_typ(self) -> SqlRelationType {
1220 self.typ
1221 }
1222
1223 pub fn iter(&self) -> impl Iterator<Item = (&ColumnName, &SqlColumnType)> {
1225 self.metadata.values().map(|meta| {
1226 let typ = &self.typ.columns()[meta.typ_idx];
1227 (&meta.name, typ)
1228 })
1229 }
1230
1231 pub fn iter_types(&self) -> impl Iterator<Item = &SqlColumnType> {
1233 self.typ.column_types.iter()
1234 }
1235
1236 pub fn iter_names(&self) -> impl Iterator<Item = &ColumnName> {
1238 self.metadata.values().map(|meta| &meta.name)
1239 }
1240
1241 pub fn iter_all(&self) -> impl Iterator<Item = (&ColumnIndex, &ColumnName, &SqlColumnType)> {
1243 self.metadata.iter().map(|(col_idx, metadata)| {
1244 let col_typ = &self.typ.columns()[metadata.typ_idx];
1245 (col_idx, &metadata.name, col_typ)
1246 })
1247 }
1248
1249 pub fn iter_similar_names<'a>(
1252 &'a self,
1253 name: &'a ColumnName,
1254 ) -> impl Iterator<Item = &'a ColumnName> {
1255 self.iter_names().filter(|n| n.is_similar(name))
1256 }
1257
1258 pub fn contains_index(&self, idx: &ColumnIndex) -> bool {
1260 self.metadata.contains_key(idx)
1261 }
1262
1263 pub fn get_by_name(&self, name: &ColumnName) -> Option<(usize, &SqlColumnType)> {
1269 self.iter_names()
1270 .position(|n| n == name)
1271 .map(|i| (i, &self.typ.column_types[i]))
1272 }
1273
1274 pub fn get_name(&self, i: usize) -> &ColumnName {
1282 self.get_name_idx(&ColumnIndex(i))
1284 }
1285
1286 pub fn get_name_idx(&self, idx: &ColumnIndex) -> &ColumnName {
1292 &self.metadata.get(idx).expect("should exist").name
1293 }
1294
1295 pub fn get_name_mut(&mut self, i: usize) -> &mut ColumnName {
1301 &mut self
1303 .metadata
1304 .get_mut(&ColumnIndex(i))
1305 .expect("should exist")
1306 .name
1307 }
1308
1309 pub fn get_type(&self, idx: &ColumnIndex) -> &SqlColumnType {
1315 let typ_idx = self.metadata.get(idx).expect("should exist").typ_idx;
1316 &self.typ.column_types[typ_idx]
1317 }
1318
1319 pub fn get_unambiguous_name(&self, i: usize) -> Option<&ColumnName> {
1328 let name = self.get_name(i);
1329 if self.iter_names().filter(|n| *n == name).count() == 1 {
1330 Some(name)
1331 } else {
1332 None
1333 }
1334 }
1335
1336 pub fn constraints_met(&self, i: usize, d: &Datum) -> Result<(), NotNullViolation> {
1341 let name = self.get_name(i);
1342 let typ = &self.typ.column_types[i];
1343 if d == &Datum::Null && !typ.nullable {
1344 Err(NotNullViolation(name.clone()))
1345 } else {
1346 Ok(())
1347 }
1348 }
1349
1350 pub fn diff(&self, other: &RelationDesc) -> RelationDescDiff {
1365 assert_eq!(self.metadata.len(), self.typ.columns().len());
1366 assert_eq!(other.metadata.len(), other.typ.columns().len());
1367 for (idx, meta) in self.metadata.iter().chain(other.metadata.iter()) {
1368 assert_eq!(meta.typ_idx, idx.0);
1369 assert_eq!(meta.added, RelationVersion::root());
1370 assert_none!(meta.dropped);
1371 }
1372
1373 let mut column_diffs = BTreeMap::new();
1374 let mut key_diff = None;
1375
1376 let left_arity = self.arity();
1377 let right_arity = other.arity();
1378 let common_arity = std::cmp::min(left_arity, right_arity);
1379
1380 for idx in 0..common_arity {
1381 let left_name = self.get_name(idx);
1382 let right_name = other.get_name(idx);
1383 let left_type = &self.typ.column_types[idx];
1384 let right_type = &other.typ.column_types[idx];
1385
1386 if left_name != right_name {
1387 let diff = ColumnDiff::NameMismatch {
1388 left: left_name.clone(),
1389 right: right_name.clone(),
1390 };
1391 column_diffs.insert(idx, diff);
1392 } else if left_type.scalar_type != right_type.scalar_type {
1393 let diff = ColumnDiff::TypeMismatch {
1394 name: left_name.clone(),
1395 left: left_type.scalar_type.clone(),
1396 right: right_type.scalar_type.clone(),
1397 };
1398 column_diffs.insert(idx, diff);
1399 } else if left_type.nullable != right_type.nullable {
1400 let diff = ColumnDiff::NullabilityMismatch {
1401 name: left_name.clone(),
1402 left: left_type.nullable,
1403 right: right_type.nullable,
1404 };
1405 column_diffs.insert(idx, diff);
1406 }
1407 }
1408
1409 for idx in common_arity..left_arity {
1410 let diff = ColumnDiff::Missing {
1411 name: self.get_name(idx).clone(),
1412 };
1413 column_diffs.insert(idx, diff);
1414 }
1415
1416 for idx in common_arity..right_arity {
1417 let diff = ColumnDiff::Extra {
1418 name: other.get_name(idx).clone(),
1419 };
1420 column_diffs.insert(idx, diff);
1421 }
1422
1423 let left_keys: BTreeSet<_> = self.typ.keys.iter().collect();
1424 let right_keys: BTreeSet<_> = other.typ.keys.iter().collect();
1425 if left_keys != right_keys {
1426 let column_names = |desc: &RelationDesc, keys: BTreeSet<&Vec<usize>>| {
1427 keys.iter()
1428 .map(|key| key.iter().map(|&idx| desc.get_name(idx).clone()).collect())
1429 .collect()
1430 };
1431 key_diff = Some(KeyDiff {
1432 left: column_names(self, left_keys),
1433 right: column_names(other, right_keys),
1434 });
1435 }
1436
1437 RelationDescDiff {
1438 column_diffs,
1439 key_diff,
1440 }
1441 }
1442
1443 pub fn apply_demand(&self, demands: &BTreeSet<usize>) -> RelationDesc {
1445 let mut new_desc = self.clone();
1446
1447 let mut removed = 0;
1449 new_desc.metadata.retain(|idx, metadata| {
1450 let retain = demands.contains(&idx.0);
1451 if !retain {
1452 removed += 1;
1453 } else {
1454 metadata.typ_idx -= removed;
1455 }
1456 retain
1457 });
1458
1459 let mut idx = 0;
1461 new_desc.typ.column_types.retain(|_| {
1462 let keep = demands.contains(&idx);
1463 idx += 1;
1464 keep
1465 });
1466
1467 new_desc
1468 }
1469}
1470
#[cfg(any(test, feature = "proptest"))]
impl Arbitrary for RelationDesc {
    type Parameters = ();
    type Strategy = BoxedStrategy<RelationDesc>;

    /// Generates descriptions with a weighted random number of columns.
    ///
    /// Small column counts are favored; ranges of 16 columns and more are
    /// only added when the `PROPTEST_LARGE_DATA` environment variable is set.
    fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy {
        let mut column_count_weights =
            vec![(100, Just(0..4)), (50, Just(4..8)), (25, Just(8..16))];
        let large_data = std::env::var("PROPTEST_LARGE_DATA").is_ok();
        if large_data {
            column_count_weights.extend([
                (12, Just(16..32)),
                (6, Just(32..64)),
                (3, Just(64..128)),
                (1, Just(128..256)),
            ]);
        }

        Union::new_weighted(column_count_weights)
            .prop_flat_map(arb_relation_desc)
            .boxed()
    }
}
1491
/// Strategy producing a [`RelationDesc`] whose number of columns lies in
/// `num_cols`.
///
/// Columns are generated as a `BTreeMap` from name to type and handed to
/// [`RelationDesc::from_names_and_types`].
#[cfg(any(test, feature = "proptest"))]
pub fn arb_relation_desc(num_cols: std::ops::Range<usize>) -> impl Strategy<Value = RelationDesc> {
    let columns =
        proptest::collection::btree_map(any::<ColumnName>(), any::<SqlColumnType>(), num_cols);
    columns.prop_map(RelationDesc::from_names_and_types)
}
1499
/// Strategy producing a projection of `desc`: every column is independently
/// kept or dropped, and the result is built with
/// [`RelationDesc::apply_demand`].
#[cfg(any(test, feature = "proptest"))]
pub fn arb_relation_desc_projection(desc: RelationDesc) -> impl Strategy<Value = RelationDesc> {
    // One boolean strategy per column.
    let keep_flags: Vec<_> = std::iter::repeat_with(any::<bool>)
        .take(desc.len())
        .collect();
    keep_flags.prop_map(move |flags| {
        let demands: BTreeSet<_> = flags
            .into_iter()
            .enumerate()
            .filter(|&(_, keep)| keep)
            .map(|(idx, _)| idx)
            .collect();
        desc.apply_demand(&demands)
    })
}
1513
1514impl IntoIterator for RelationDesc {
1515 type Item = (ColumnName, SqlColumnType);
1516 type IntoIter = Box<dyn Iterator<Item = (ColumnName, SqlColumnType)>>;
1517
1518 fn into_iter(self) -> Self::IntoIter {
1519 let iter = self
1520 .metadata
1521 .into_values()
1522 .zip_eq(self.typ.column_types)
1523 .map(|(meta, typ)| (meta.name, typ));
1524 Box::new(iter)
1525 }
1526}
1527
/// Strategy producing a [`Row`] for `desc`: one datum per column, each
/// generated by `arb_datum_for_column` for that column's type, packed in
/// column order.
#[cfg(any(test, feature = "proptest"))]
pub fn arb_row_for_relation(desc: &RelationDesc) -> impl Strategy<Value = Row> + use<> {
    let column_strategies: Vec<_> = desc
        .typ()
        .columns()
        .iter()
        .map(|column| arb_datum_for_column(column.clone()))
        .collect();
    column_strategies.prop_map(|datums| Row::pack(datums.iter().map(Datum::from)))
}
1540
1541#[derive(Debug, PartialEq, Eq)]
1543pub struct NotNullViolation(pub ColumnName);
1544
1545impl fmt::Display for NotNullViolation {
1546 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1547 write!(
1548 f,
1549 "null value in column {} violates not-null constraint",
1550 self.0.quoted()
1551 )
1552 }
1553}
1554
1555#[derive(Debug, Clone, PartialEq, Eq)]
1557pub struct RelationDescDiff {
1558 pub column_diffs: BTreeMap<usize, ColumnDiff>,
1560 pub key_diff: Option<KeyDiff>,
1562}
1563
1564impl RelationDescDiff {
1565 pub fn is_empty(&self) -> bool {
1567 self.column_diffs.is_empty() && self.key_diff.is_none()
1568 }
1569}
1570
1571#[derive(Debug, Clone, PartialEq, Eq)]
1573pub enum ColumnDiff {
1574 Missing { name: ColumnName },
1576 Extra { name: ColumnName },
1578 TypeMismatch {
1580 name: ColumnName,
1581 left: SqlScalarType,
1582 right: SqlScalarType,
1583 },
1584 NullabilityMismatch {
1586 name: ColumnName,
1587 left: bool,
1588 right: bool,
1589 },
1590 NameMismatch { left: ColumnName, right: ColumnName },
1592}
1593
1594#[derive(Debug, Clone, PartialEq, Eq)]
1596pub struct KeyDiff {
1597 pub left: BTreeSet<Vec<ColumnName>>,
1599 pub right: BTreeSet<Vec<ColumnName>>,
1601}
1602
/// Accumulates columns and keys, and turns them into a [`RelationDesc`] via
/// [`RelationDescBuilder::finish`].
#[derive(Clone, Default, Debug, PartialEq, Eq)]
pub struct RelationDescBuilder {
    /// Column names and types, in the order they were added.
    columns: Vec<(ColumnName, SqlColumnType)>,
    /// Keys, each given as a list of column indices.
    keys: Vec<Vec<usize>>,
}
1611
1612impl RelationDescBuilder {
1613 pub fn with_column<N: Into<ColumnName>>(
1615 mut self,
1616 name: N,
1617 ty: SqlColumnType,
1618 ) -> RelationDescBuilder {
1619 let name = name.into();
1620 self.columns.push((name, ty));
1621 self
1622 }
1623
1624 pub fn with_columns<I, T, N>(mut self, iter: I) -> Self
1626 where
1627 I: IntoIterator<Item = (N, T)>,
1628 T: Into<SqlColumnType>,
1629 N: Into<ColumnName>,
1630 {
1631 self.columns
1632 .extend(iter.into_iter().map(|(name, ty)| (name.into(), ty.into())));
1633 self
1634 }
1635
1636 pub fn with_key(mut self, mut indices: Vec<usize>) -> RelationDescBuilder {
1638 indices.sort_unstable();
1639 if !self.keys.contains(&indices) {
1640 self.keys.push(indices);
1641 }
1642 self
1643 }
1644
1645 pub fn without_keys(mut self) -> RelationDescBuilder {
1647 self.keys.clear();
1648 assert_eq!(self.keys.len(), 0);
1649 self
1650 }
1651
1652 pub fn concat(mut self, other: Self) -> Self {
1654 let self_len = self.columns.len();
1655
1656 self.columns.extend(other.columns);
1657 for k in other.keys {
1658 let k = k.into_iter().map(|idx| idx + self_len).collect();
1659 self = self.with_key(k);
1660 }
1661
1662 self
1663 }
1664
1665 pub fn finish(self) -> RelationDesc {
1667 let mut desc = RelationDesc::from_names_and_types(self.columns);
1668 desc.typ = desc.typ.with_keys(self.keys);
1669 desc
1670 }
1671}
1672
/// Selects which version of a relation to use.
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord, Serialize)]
pub enum RelationVersionSelector {
    /// Exactly the given [`RelationVersion`].
    Specific(RelationVersion),
    /// Whatever the most recent version is.
    Latest,
}
1679
1680impl RelationVersionSelector {
1681 pub fn specific(version: u64) -> Self {
1682 RelationVersionSelector::Specific(RelationVersion(version))
1683 }
1684}
1685
/// A [`RelationDesc`] whose columns can be added and dropped over a sequence of versions.
///
/// Use [`VersionedRelationDesc::at_version`] to obtain the description as of a given version.
#[derive(Debug, Clone, Serialize)]
pub struct VersionedRelationDesc {
    /// The underlying description. Dropped columns stay in here; dropping only records the
    /// version at which the column was dropped in its metadata.
    inner: RelationDesc,
}
1695
1696impl VersionedRelationDesc {
1697 pub fn new(inner: RelationDesc) -> Self {
1698 VersionedRelationDesc { inner }
1699 }
1700
1701 #[must_use]
1709 pub fn add_column<N, T>(&mut self, name: N, typ: T) -> RelationVersion
1710 where
1711 N: Into<ColumnName>,
1712 T: Into<SqlColumnType>,
1713 {
1714 let latest_version = self.latest_version();
1715 let new_version = latest_version.bump();
1716
1717 let name = name.into();
1718 let existing = self
1719 .inner
1720 .metadata
1721 .iter()
1722 .find(|(_, meta)| meta.name == name && meta.dropped.is_none());
1723 if let Some(existing) = existing {
1724 panic!("column named '{name}' already exists! {existing:?}");
1725 }
1726
1727 let next_idx = self.inner.metadata.len();
1728 let col_meta = ColumnMetadata {
1729 name,
1730 typ_idx: next_idx,
1731 added: new_version,
1732 dropped: None,
1733 };
1734
1735 self.inner.typ.column_types.push(typ.into());
1736 let prev = self.inner.metadata.insert(ColumnIndex(next_idx), col_meta);
1737
1738 assert_none!(prev, "column index overlap!");
1739 self.validate();
1740
1741 new_version
1742 }
1743
1744 #[must_use]
1753 pub fn drop_column<N>(&mut self, name: N) -> RelationVersion
1754 where
1755 N: Into<ColumnName>,
1756 {
1757 let name = name.into();
1758 let latest_version = self.latest_version();
1759 let new_version = latest_version.bump();
1760
1761 let col = self
1762 .inner
1763 .metadata
1764 .values_mut()
1765 .find(|meta| meta.name == name && meta.dropped.is_none())
1766 .expect("column to exist");
1767
1768 assert_none!(col.dropped, "column was already dropped");
1770 col.dropped = Some(new_version);
1771
1772 let dropped_key = self
1774 .inner
1775 .typ
1776 .keys
1777 .iter()
1778 .any(|keys| keys.contains(&col.typ_idx));
1779 assert!(!dropped_key, "column being dropped was used as a key");
1780
1781 self.validate();
1782 new_version
1783 }
1784
1785 pub fn latest(&self) -> RelationDesc {
1787 self.inner.clone()
1788 }
1789
1790 pub fn at_version(&self, version: RelationVersionSelector) -> RelationDesc {
1792 let up_to_version = match version {
1794 RelationVersionSelector::Latest => RelationVersion(u64::MAX),
1795 RelationVersionSelector::Specific(v) => v,
1796 };
1797
1798 let valid_columns = self.inner.metadata.iter().filter(|(_col_idx, meta)| {
1799 let added = meta.added <= up_to_version;
1800 let dropped = meta
1801 .dropped
1802 .map(|dropped_at| up_to_version >= dropped_at)
1803 .unwrap_or(false);
1804
1805 added && !dropped
1806 });
1807
1808 let mut column_types = Vec::new();
1809 let mut column_metas = BTreeMap::new();
1810
1811 for (col_idx, meta) in valid_columns {
1818 let new_meta = ColumnMetadata {
1819 name: meta.name.clone(),
1820 typ_idx: column_types.len(),
1821 added: meta.added.clone(),
1822 dropped: meta.dropped.clone(),
1823 };
1824 column_types.push(self.inner.typ.columns()[meta.typ_idx].clone());
1825 column_metas.insert(*col_idx, new_meta);
1826 }
1827
1828 let keys = self
1834 .inner
1835 .typ
1836 .keys
1837 .iter()
1838 .map(|keys| {
1839 keys.iter()
1840 .map(|key_idx| {
1841 let metadata = column_metas
1842 .get(&ColumnIndex(*key_idx))
1843 .expect("found key for column that doesn't exist");
1844 metadata.typ_idx
1845 })
1846 .collect()
1847 })
1848 .collect();
1849
1850 let relation_type = SqlRelationType { column_types, keys };
1851
1852 RelationDesc {
1853 typ: relation_type,
1854 metadata: column_metas,
1855 }
1856 }
1857
1858 pub fn latest_version(&self) -> RelationVersion {
1859 self.inner
1860 .metadata
1861 .values()
1862 .map(|meta| meta.dropped.unwrap_or(meta.added))
1864 .max()
1865 .unwrap_or_else(RelationVersion::root)
1867 }
1868
1869 fn validate(&self) {
1875 fn validate_inner(desc: &RelationDesc) -> Result<(), anyhow::Error> {
1876 if desc.typ.column_types.len() != desc.metadata.len() {
1877 anyhow::bail!("mismatch between number of types and metadatas");
1878 }
1879
1880 for (col_idx, meta) in &desc.metadata {
1881 if col_idx.0 > desc.metadata.len() {
1882 anyhow::bail!("column index out of bounds");
1883 }
1884 if meta.added >= meta.dropped.unwrap_or(RelationVersion(u64::MAX)) {
1885 anyhow::bail!("column was added after it was dropped?");
1886 }
1887 if desc.typ().columns().get(meta.typ_idx).is_none() {
1888 anyhow::bail!("typ_idx incorrect");
1889 }
1890 }
1891
1892 for keys in &desc.typ.keys {
1893 for key in keys {
1894 if *key >= desc.typ.column_types.len() {
1895 anyhow::bail!("key index was out of bounds!");
1896 }
1897 }
1898 }
1899
1900 let versions = desc
1901 .metadata
1902 .values()
1903 .map(|meta| meta.dropped.unwrap_or(meta.added));
1904 let mut max = 0;
1905 let mut sum = 0;
1906 for version in versions {
1907 max = std::cmp::max(max, version.0);
1908 sum += version.0;
1909 }
1910
1911 if sum != (max * (max + 1) / 2) {
1921 anyhow::bail!("there is a duplicate or missing relation version");
1922 }
1923
1924 Ok(())
1925 }
1926
1927 assert_ok!(validate_inner(&self.inner), "validate failed! {self:?}");
1928 }
1929}
1930
/// A single change that can be applied to a [`RelationDesc`] with
/// [`PropRelationDescDiff::apply`]. Only available for tests and the `proptest` feature.
#[derive(Debug)]
#[cfg(any(test, feature = "proptest"))]
pub enum PropRelationDescDiff {
    /// Append a new column called `name` with type `typ`.
    AddColumn {
        name: ColumnName,
        typ: SqlColumnType,
    },
    /// Mark the column called `name` as dropped.
    DropColumn {
        name: ColumnName,
    },
    /// Flip the nullability of the column called `name`.
    ToggleNullability {
        name: ColumnName,
    },
    /// Replace the type of the column called `name` with `typ`.
    ChangeType {
        name: ColumnName,
        typ: SqlColumnType,
    },
}
1951
#[cfg(any(test, feature = "proptest"))]
impl PropRelationDescDiff {
    /// Applies this change to `desc` in place.
    ///
    /// Changes that name a column which cannot be found are silently ignored.
    pub fn apply(self, desc: &mut RelationDesc) {
        match self {
            Self::AddColumn { name, typ } => {
                // The new column goes at the end of both the metadata and the types.
                let position = desc.metadata.len();
                let previous = desc.metadata.insert(
                    ColumnIndex(position),
                    ColumnMetadata {
                        name,
                        typ_idx: position,
                        added: RelationVersion(0),
                        dropped: None,
                    },
                );
                desc.typ.column_types.push(typ);

                assert_none!(previous);
                assert_eq!(desc.metadata.len(), desc.typ.column_types.len());
            }
            Self::DropColumn { name } => {
                // One past the most recent version currently recorded in the metadata.
                let next_version = desc
                    .metadata
                    .values()
                    .map(|meta| meta.dropped.unwrap_or(meta.added))
                    .max()
                    .unwrap_or_else(RelationVersion::root)
                    .bump();

                // Only the first column with a matching name is considered, and it is left
                // alone if it has already been dropped.
                let target = desc.metadata.values_mut().find(|meta| meta.name == name);
                if let Some(meta) = target {
                    if meta.dropped.is_none() {
                        meta.dropped = Some(next_version);
                    }
                }
            }
            Self::ToggleNullability { name } => {
                if let Some(pos) = desc.get_by_name(&name).map(|(pos, _)| pos) {
                    let column = desc
                        .typ
                        .column_types
                        .get_mut(pos)
                        .expect("ColumnNames and SqlColumnTypes out of sync!");
                    column.nullable = !column.nullable;
                }
            }
            Self::ChangeType { name, typ } => {
                if let Some(pos) = desc.get_by_name(&name).map(|(pos, _)| pos) {
                    let column = desc
                        .typ
                        .column_types
                        .get_mut(pos)
                        .expect("ColumnNames and SqlColumnTypes out of sync!");
                    *column = typ;
                }
            }
        }
    }
}
2011
/// Returns a [`Strategy`] that generates a shuffled list of [`PropRelationDescDiff`]s to apply
/// to `source`.
///
/// Columns to add are always generated. Dropping, toggling nullability and changing types are
/// only generated when `source` has at least one column, and pick their targets at random from
/// the names of the columns of `source`.
#[cfg(any(test, feature = "proptest"))]
pub fn arb_relation_desc_diff(
    source: &RelationDesc,
) -> impl Strategy<Value = Vec<PropRelationDescDiff>> + use<> {
    let source = Rc::new(source.clone());
    let num_source_columns = source.typ.columns().len();

    // Usually add fewer than 8 columns; occasionally add between 8 and 63.
    let num_add_columns = Union::new_weighted(vec![(100, Just(0..8)), (1, Just(8..64))]);
    let add_columns_strat = num_add_columns
        .prop_flat_map(|size| {
            let new_column = (any::<ColumnName>(), any::<SqlColumnType>());
            proptest::collection::vec(new_column, size)
        })
        .prop_map(|new_columns| {
            let additions: Vec<_> = new_columns
                .into_iter()
                .map(|(name, typ)| PropRelationDescDiff::AddColumn { name, typ })
                .collect();
            additions
        });

    // Without existing columns there is nothing to drop or modify.
    if num_source_columns == 0 {
        return add_columns_strat.boxed();
    }

    let drop_source = Rc::clone(&source);
    let drop_columns_strat = (0..num_source_columns).prop_perturb(move |count, mut rng| {
        // Collecting into a set removes names that were picked more than once.
        let chosen: BTreeSet<_> = (0..count)
            .map(|_| {
                let col_idx = rng.random_range(0..num_source_columns);
                drop_source.get_name(col_idx).clone()
            })
            .collect();
        chosen
            .into_iter()
            .map(|name| PropRelationDescDiff::DropColumn { name })
            .collect::<Vec<_>>()
    });

    let toggle_source = Rc::clone(&source);
    let toggle_nullability_strat =
        (0..num_source_columns).prop_perturb(move |count, mut rng| {
            let chosen: BTreeSet<_> = (0..count)
                .map(|_| {
                    let col_idx = rng.random_range(0..num_source_columns);
                    toggle_source.get_name(col_idx).clone()
                })
                .collect();
            chosen
                .into_iter()
                .map(|name| PropRelationDescDiff::ToggleNullability { name })
                .collect::<Vec<_>>()
        });

    let change_source = Rc::clone(&source);
    let change_type_strat = (0..num_source_columns)
        .prop_perturb(move |count, mut rng| {
            (0..count)
                .map(|_| {
                    let col_idx = rng.random_range(0..num_source_columns);
                    change_source.get_name(col_idx).clone()
                })
                .collect::<BTreeSet<_>>()
        })
        .prop_flat_map(|chosen| {
            // Generate one replacement type for each chosen column.
            let new_types = proptest::collection::vec(any::<SqlColumnType>(), chosen.len());
            new_types.prop_map(move |types| (chosen.clone(), types))
        })
        .prop_map(|(chosen, types)| {
            chosen
                .into_iter()
                .zip_eq(types)
                .map(|(name, typ)| PropRelationDescDiff::ChangeType { name, typ })
                .collect::<Vec<_>>()
        });

    (
        add_columns_strat,
        drop_columns_strat,
        toggle_nullability_strat,
        change_type_strat,
    )
        .prop_map(|(mut diffs, drops, toggles, changes)| {
            diffs.extend(drops);
            diffs.extend(toggles);
            diffs.extend(changes);
            diffs
        })
        .prop_shuffle()
        .boxed()
}
2098
#[cfg(test)]
mod tests {
    use super::*;
    use prost::Message;

    /// Walks a `VersionedRelationDesc` through one added and one dropped column, checking what
    /// `at_version` returns at each step. Skipped under Miri.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn smoktest_at_version() {
        let desc = RelationDesc::builder()
            .with_column("a", SqlScalarType::Bool.nullable(true))
            .with_column("z", SqlScalarType::String.nullable(false))
            .finish();

        let mut versioned_desc = VersionedRelationDesc {
            inner: desc.clone(),
        };
        versioned_desc.validate();

        // Before any changes, every selector yields the original description, including a
        // version number that has not been created yet.
        let latest = versioned_desc.at_version(RelationVersionSelector::Latest);
        assert_eq!(desc, latest);

        let v0 = versioned_desc.at_version(RelationVersionSelector::specific(0));
        assert_eq!(desc, v0);

        let v3 = versioned_desc.at_version(RelationVersionSelector::specific(3));
        assert_eq!(desc, v3);

        // Adding a column creates version 1.
        let v1 = versioned_desc.add_column("b", SqlScalarType::Bytes.nullable(false));
        assert_eq!(v1, RelationVersion(1));

        let v1 = versioned_desc.at_version(RelationVersionSelector::Specific(v1));
        insta::assert_json_snapshot!(v1.metadata, @r###"
        {
          "0": {
            "name": "a",
            "typ_idx": 0,
            "added": 0,
            "dropped": null
          },
          "1": {
            "name": "z",
            "typ_idx": 1,
            "added": 0,
            "dropped": null
          },
          "2": {
            "name": "b",
            "typ_idx": 2,
            "added": 1,
            "dropped": null
          }
        }
        "###);

        // Version 0 still iterates the same as it did before the column was added.
        let v0_b = versioned_desc.at_version(RelationVersionSelector::specific(0));
        assert!(v0.iter().eq(v0_b.iter()));

        // Dropping a column creates version 2.
        let v2 = versioned_desc.drop_column("z");
        assert_eq!(v2, RelationVersion(2));

        let v2 = versioned_desc.at_version(RelationVersionSelector::Specific(v2));
        insta::assert_json_snapshot!(v2.metadata, @r###"
        {
          "0": {
            "name": "a",
            "typ_idx": 0,
            "added": 0,
            "dropped": null
          },
          "2": {
            "name": "b",
            "typ_idx": 1,
            "added": 1,
            "dropped": null
          }
        }
        "###);

        // Versions 0 and 1 still iterate the same as they did before the column was dropped.
        let v0_c = versioned_desc.at_version(RelationVersionSelector::specific(0));
        assert!(v0.iter().eq(v0_c.iter()));

        let v1_b = versioned_desc.at_version(RelationVersionSelector::specific(1));
        assert!(v1.iter().eq(v1_b.iter()));

        // The underlying description keeps all three columns, with "z" marked as dropped.
        insta::assert_json_snapshot!(versioned_desc.inner.metadata, @r###"
        {
          "0": {
            "name": "a",
            "typ_idx": 0,
            "added": 0,
            "dropped": null
          },
          "1": {
            "name": "z",
            "typ_idx": 1,
            "added": 0,
            "dropped": 2
          },
          "2": {
            "name": "b",
            "typ_idx": 2,
            "added": 1,
            "dropped": null
          }
        }
        "###);
    }

    /// Drops a column that precedes a key column and checks that the key index is remapped
    /// in the newer version but left untouched in the older one. Skipped under Miri.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn test_dropping_columns_with_keys() {
        let desc = RelationDesc::builder()
            .with_column("a", SqlScalarType::Bool.nullable(true))
            .with_column("z", SqlScalarType::String.nullable(false))
            .with_key(vec![1])
            .finish();

        let mut versioned_desc = VersionedRelationDesc {
            inner: desc.clone(),
        };
        versioned_desc.validate();

        let v1 = versioned_desc.drop_column("a");
        assert_eq!(v1, RelationVersion(1));

        // With "a" gone, "z" is the only column, so the key now points at position 0.
        let v1 = versioned_desc.at_version(RelationVersionSelector::Specific(v1));
        insta::assert_json_snapshot!(v1, @r###"
        {
          "typ": {
            "column_types": [
              {
                "scalar_type": "String",
                "nullable": false
              }
            ],
            "keys": [
              [
                0
              ]
            ]
          },
          "metadata": {
            "1": {
              "name": "z",
              "typ_idx": 0,
              "added": 0,
              "dropped": null
            }
          }
        }
        "###);

        // At version 0 both columns are present and the key still points at position 1.
        let v0 = versioned_desc.at_version(RelationVersionSelector::specific(0));
        insta::assert_json_snapshot!(v0, @r###"
        {
          "typ": {
            "column_types": [
              {
                "scalar_type": "Bool",
                "nullable": true
              },
              {
                "scalar_type": "String",
                "nullable": false
              }
            ],
            "keys": [
              [
                1
              ]
            ]
          },
          "metadata": {
            "0": {
              "name": "a",
              "typ_idx": 0,
              "added": 0,
              "dropped": 1
            },
            "1": {
              "name": "z",
              "typ_idx": 1,
              "added": 0,
              "dropped": null
            }
          }
        }
        "###);
    }

    /// Converts a `ProtoRelationDesc` that has names but an empty `metadata` list, and checks
    /// the metadata of the resulting `RelationDesc`. Skipped under Miri.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn roundtrip_relation_desc_without_metadata() {
        let typ = ProtoRelationType {
            column_types: vec![
                SqlScalarType::String.nullable(false).into_proto(),
                SqlScalarType::Bool.nullable(true).into_proto(),
            ],
            keys: vec![],
        };
        let proto = ProtoRelationDesc {
            typ: Some(typ),
            names: vec![
                ColumnName("a".into()).into_proto(),
                ColumnName("b".into()).into_proto(),
            ],
            metadata: vec![],
        };
        let desc: RelationDesc = proto.into_rust().unwrap();

        insta::assert_json_snapshot!(desc, @r###"
        {
          "typ": {
            "column_types": [
              {
                "scalar_type": "String",
                "nullable": false
              },
              {
                "scalar_type": "Bool",
                "nullable": true
              }
            ],
            "keys": []
          },
          "metadata": {
            "0": {
              "name": "a",
              "typ_idx": 0,
              "added": 0,
              "dropped": null
            },
            "1": {
              "name": "b",
              "typ_idx": 1,
              "added": 0,
              "dropped": null
            }
          }
        }
        "###);
    }

    /// Adding a column whose name is already used by a live column must panic.
    #[mz_ore::test]
    #[should_panic(expected = "column named 'a' already exists!")]
    fn test_add_column_with_same_name_panics() {
        let desc = RelationDesc::builder()
            .with_column("a", SqlScalarType::Bool.nullable(true))
            .finish();
        let mut versioned = VersionedRelationDesc::new(desc);

        let _ = versioned.add_column("a", SqlScalarType::String.nullable(false));
    }

    /// A name can be reused once the earlier column with that name has been dropped; the new
    /// column gets a fresh column index. Skipped under Miri.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn test_add_column_with_same_name_prev_dropped() {
        let desc = RelationDesc::builder()
            .with_column("a", SqlScalarType::Bool.nullable(true))
            .finish();
        let mut versioned = VersionedRelationDesc::new(desc);

        // After dropping the only column, the description is empty.
        let v1 = versioned.drop_column("a");
        let v1 = versioned.at_version(RelationVersionSelector::Specific(v1));
        insta::assert_json_snapshot!(v1, @r###"
        {
          "typ": {
            "column_types": [],
            "keys": []
          },
          "metadata": {}
        }
        "###);

        let v2 = versioned.add_column("a", SqlScalarType::String.nullable(false));
        let v2 = versioned.at_version(RelationVersionSelector::Specific(v2));
        insta::assert_json_snapshot!(v2, @r###"
        {
          "typ": {
            "column_types": [
              {
                "scalar_type": "String",
                "nullable": false
              }
            ],
            "keys": []
          },
          "metadata": {
            "1": {
              "name": "a",
              "typ_idx": 0,
              "added": 2,
              "dropped": null
            }
          }
        }
        "###);
    }

    /// Applies a demand for two of three columns and checks that the result has two columns
    /// and passes validation. Skipped under Miri.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn apply_demand() {
        let desc = RelationDesc::builder()
            .with_column("a", SqlScalarType::String.nullable(true))
            .with_column("b", SqlScalarType::Int64.nullable(false))
            .with_column("c", SqlScalarType::Time.nullable(false))
            .finish();
        let desc = desc.apply_demand(&BTreeSet::from([0, 2]));
        assert_eq!(desc.arity(), 2);
        // `validate` panics on failure, so reaching the end of the test means it passed.
        VersionedRelationDesc::new(desc).validate();
    }

    /// Pins the stable name of a `ColumnIndex` to the decimal form of its index. Skipped
    /// under Miri.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn smoketest_column_index_stable_ident() {
        let idx_a = ColumnIndex(42);
        assert_eq!(idx_a.to_stable_name(), "42");
    }

    /// Property test: a `RelationDesc` survives encoding to protobuf bytes and decoding back,
    /// both as generated and after applying arbitrary diffs. Skipped under Miri.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn proptest_relation_desc_roundtrips() {
        // Encode, decode, and compare with the original.
        fn testcase(og: RelationDesc) {
            let bytes = og.into_proto().encode_to_vec();
            let proto = ProtoRelationDesc::decode(&bytes[..]).unwrap();
            let rnd = RelationDesc::from_proto(proto).unwrap();

            assert_eq!(og, rnd);
        }

        proptest!(|(desc in any::<RelationDesc>())| {
            testcase(desc);
        });

        // Pair each generated description with a list of diffs generated for it.
        let strat = any::<RelationDesc>().prop_flat_map(|desc| {
            arb_relation_desc_diff(&desc).prop_map(move |diffs| (desc.clone(), diffs))
        });

        proptest!(|((mut desc, diffs) in strat)| {
            for diff in diffs {
                diff.apply(&mut desc);
            };
            testcase(desc);
        });
    }
}