1use std::collections::{BTreeMap, BTreeSet};
11use std::rc::Rc;
12use std::{fmt, vec};
13
14use anyhow::bail;
15use itertools::Itertools;
16use mz_lowertest::MzReflect;
17use mz_ore::cast::CastFrom;
18use mz_ore::str::StrExt;
19use mz_ore::{assert_none, assert_ok};
20use mz_persist_types::schema::SchemaId;
21use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
22use proptest::prelude::*;
23use proptest::strategy::{Strategy, Union};
24use proptest_derive::Arbitrary;
25use serde::{Deserialize, Serialize};
26
27use crate::relation_and_scalar::proto_relation_type::ProtoKey;
28pub use crate::relation_and_scalar::{
29 ProtoColumnMetadata, ProtoColumnName, ProtoColumnType, ProtoRelationDesc, ProtoRelationType,
30 ProtoRelationVersion,
31};
32use crate::{Datum, ReprScalarType, Row, SqlScalarType, arb_datum_for_column};
33
34#[derive(
42 Arbitrary, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash, MzReflect,
43)]
44pub struct SqlColumnType {
45 pub scalar_type: SqlScalarType,
47 #[serde(default = "return_true")]
49 pub nullable: bool,
50}
51
/// Always returns `true`.
///
/// Referenced by name from `#[serde(default = "return_true")]` attributes in this file, so that
/// a `nullable` field absent from serialized data deserializes as `true`.
#[inline(always)]
fn return_true() -> bool {
    true
}
61
62impl SqlColumnType {
63 pub fn backport_nullability(&mut self, backport_typ: &SqlColumnType) {
64 self.scalar_type
65 .backport_nullability(&backport_typ.scalar_type);
66 self.nullable = backport_typ.nullable;
67 }
68
69 pub fn union(&self, other: &Self) -> Result<Self, anyhow::Error> {
70 match (&self.scalar_type, &other.scalar_type) {
71 (scalar_type, other_scalar_type) if scalar_type == other_scalar_type => {
72 Ok(SqlColumnType {
73 scalar_type: scalar_type.clone(),
74 nullable: self.nullable || other.nullable,
75 })
76 }
77 (scalar_type, other_scalar_type) if scalar_type.base_eq(other_scalar_type) => {
78 Ok(SqlColumnType {
79 scalar_type: scalar_type.without_modifiers(),
80 nullable: self.nullable || other.nullable,
81 })
82 }
83 (
84 SqlScalarType::Record { fields, custom_id },
85 SqlScalarType::Record {
86 fields: other_fields,
87 custom_id: other_custom_id,
88 },
89 ) => {
90 if custom_id != other_custom_id {
91 bail!(
92 "Can't union types: {:?} and {:?}",
93 self.scalar_type,
94 other.scalar_type
95 );
96 };
97
98 if fields.len() != other_fields.len() {
99 bail!(
100 "Can't union types: {:?} and {:?}",
101 self.scalar_type,
102 other.scalar_type
103 );
104 }
105 let mut union_fields = Vec::with_capacity(fields.len());
106 for ((name, typ), (other_name, other_typ)) in
107 fields.iter().zip_eq(other_fields.iter())
108 {
109 if name != other_name {
110 bail!(
111 "Can't union types: {:?} and {:?}",
112 self.scalar_type,
113 other.scalar_type
114 );
115 } else {
116 let union_column_type = typ.union(other_typ)?;
117 union_fields.push((name.clone(), union_column_type));
118 };
119 }
120
121 Ok(SqlColumnType {
122 scalar_type: SqlScalarType::Record {
123 fields: union_fields.into(),
124 custom_id: *custom_id,
125 },
126 nullable: self.nullable || other.nullable,
127 })
128 }
129 _ => bail!(
130 "Can't union types: {:?} and {:?}",
131 self.scalar_type,
132 other.scalar_type
133 ),
134 }
135 }
136
137 pub fn nullable(mut self, nullable: bool) -> Self {
140 self.nullable = nullable;
141 self
142 }
143}
144
145impl RustType<ProtoColumnType> for SqlColumnType {
146 fn into_proto(&self) -> ProtoColumnType {
147 ProtoColumnType {
148 nullable: self.nullable,
149 scalar_type: Some(self.scalar_type.into_proto()),
150 }
151 }
152
153 fn from_proto(proto: ProtoColumnType) -> Result<Self, TryFromProtoError> {
154 Ok(SqlColumnType {
155 nullable: proto.nullable,
156 scalar_type: proto
157 .scalar_type
158 .into_rust_if_some("ProtoColumnType::scalar_type")?,
159 })
160 }
161}
162
163impl fmt::Display for SqlColumnType {
164 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
165 let nullable = if self.nullable { "Null" } else { "NotNull" };
166 f.write_fmt(format_args!("{:?}:{}", self.scalar_type, nullable))
167 }
168}
169
170#[derive(
172 Arbitrary, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash, MzReflect,
173)]
174pub struct SqlRelationType {
175 pub column_types: Vec<SqlColumnType>,
177 #[serde(default)]
187 pub keys: Vec<Vec<usize>>,
188}
189
190impl SqlRelationType {
191 pub fn empty() -> Self {
194 SqlRelationType::new(vec![])
195 }
196
197 pub fn new(column_types: Vec<SqlColumnType>) -> Self {
201 SqlRelationType {
202 column_types,
203 keys: Vec::new(),
204 }
205 }
206
207 pub fn with_key(mut self, mut indices: Vec<usize>) -> Self {
209 indices.sort_unstable();
210 if !self.keys.contains(&indices) {
211 self.keys.push(indices);
212 }
213 self
214 }
215
216 pub fn with_keys(mut self, keys: Vec<Vec<usize>>) -> Self {
217 for key in keys {
218 self = self.with_key(key)
219 }
220 self
221 }
222
223 pub fn arity(&self) -> usize {
225 self.column_types.len()
226 }
227
228 pub fn default_key(&self) -> Vec<usize> {
230 if let Some(key) = self.keys.first() {
231 if key.is_empty() {
232 (0..self.column_types.len()).collect()
233 } else {
234 key.clone()
235 }
236 } else {
237 (0..self.column_types.len()).collect()
238 }
239 }
240
241 pub fn columns(&self) -> &[SqlColumnType] {
243 &self.column_types
244 }
245
246 pub fn backport_nullability_and_keys(&mut self, backport_typ: &SqlRelationType) {
250 assert_eq!(
251 backport_typ.column_types.len(),
252 self.column_types.len(),
253 "HIR and MIR types should have the same number of columns"
254 );
255 for (backport_col, sql_col) in backport_typ
256 .column_types
257 .iter()
258 .zip_eq(self.column_types.iter_mut())
259 {
260 sql_col.backport_nullability(backport_col);
261 }
262
263 self.keys = backport_typ.keys.clone();
264 }
265}
266
267impl RustType<ProtoRelationType> for SqlRelationType {
268 fn into_proto(&self) -> ProtoRelationType {
269 ProtoRelationType {
270 column_types: self.column_types.into_proto(),
271 keys: self.keys.into_proto(),
272 }
273 }
274
275 fn from_proto(proto: ProtoRelationType) -> Result<Self, TryFromProtoError> {
276 Ok(SqlRelationType {
277 column_types: proto.column_types.into_rust()?,
278 keys: proto.keys.into_rust()?,
279 })
280 }
281}
282
283impl RustType<ProtoKey> for Vec<usize> {
284 fn into_proto(&self) -> ProtoKey {
285 ProtoKey {
286 keys: self.into_proto(),
287 }
288 }
289
290 fn from_proto(proto: ProtoKey) -> Result<Self, TryFromProtoError> {
291 proto.keys.into_rust()
292 }
293}
294
295#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash)]
297pub struct ReprRelationType {
298 pub column_types: Vec<ReprColumnType>,
300 #[serde(default)]
310 pub keys: Vec<Vec<usize>>,
311}
312
313impl ReprRelationType {
314 pub fn empty() -> Self {
317 ReprRelationType::new(vec![])
318 }
319
320 pub fn new(column_types: Vec<ReprColumnType>) -> Self {
324 ReprRelationType {
325 column_types,
326 keys: Vec::new(),
327 }
328 }
329
330 pub fn with_key(mut self, mut indices: Vec<usize>) -> Self {
332 indices.sort_unstable();
333 if !self.keys.contains(&indices) {
334 self.keys.push(indices);
335 }
336 self
337 }
338
339 pub fn with_keys(mut self, keys: Vec<Vec<usize>>) -> Self {
340 for key in keys {
341 self = self.with_key(key)
342 }
343 self
344 }
345
346 pub fn arity(&self) -> usize {
348 self.column_types.len()
349 }
350
351 pub fn default_key(&self) -> Vec<usize> {
353 if let Some(key) = self.keys.first() {
354 if key.is_empty() {
355 (0..self.column_types.len()).collect()
356 } else {
357 key.clone()
358 }
359 } else {
360 (0..self.column_types.len()).collect()
361 }
362 }
363
364 pub fn columns(&self) -> &[ReprColumnType] {
366 &self.column_types
367 }
368}
369
370impl From<&SqlRelationType> for ReprRelationType {
371 fn from(sql_relation_type: &SqlRelationType) -> Self {
372 ReprRelationType {
373 column_types: sql_relation_type
374 .column_types
375 .iter()
376 .map(ReprColumnType::from)
377 .collect(),
378 keys: sql_relation_type.keys.clone(),
379 }
380 }
381}
382
383#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash, MzReflect)]
384pub struct ReprColumnType {
385 pub scalar_type: ReprScalarType,
387 #[serde(default = "return_true")]
389 pub nullable: bool,
390}
391
392impl ReprColumnType {
393 pub fn union(&self, col: &ReprColumnType) -> Result<Self, anyhow::Error> {
394 let scalar_type = self.scalar_type.union(&col.scalar_type)?;
395 let nullable = self.nullable || col.nullable;
396
397 Ok(ReprColumnType {
398 scalar_type,
399 nullable,
400 })
401 }
402}
403
404impl From<&SqlColumnType> for ReprColumnType {
405 fn from(sql_column_type: &SqlColumnType) -> Self {
406 let scalar_type = &sql_column_type.scalar_type;
407 let scalar_type = scalar_type.into();
408 let nullable = sql_column_type.nullable;
409
410 ReprColumnType {
411 scalar_type,
412 nullable,
413 }
414 }
415}
416
417impl SqlColumnType {
418 pub fn from_repr(repr: &ReprColumnType) -> Self {
422 let scalar_type = &repr.scalar_type;
423 let scalar_type = SqlScalarType::from_repr(scalar_type);
424 let nullable = repr.nullable;
425
426 SqlColumnType {
427 scalar_type,
428 nullable,
429 }
430 }
431}
432
433#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash, MzReflect)]
435pub struct ColumnName(Box<str>);
436
437impl ColumnName {
438 #[inline(always)]
440 pub fn as_str(&self) -> &str {
441 &*self
442 }
443
444 pub fn as_mut_boxed_str(&mut self) -> &mut Box<str> {
446 &mut self.0
447 }
448
449 pub fn is_similar(&self, other: &ColumnName) -> bool {
451 const SIMILARITY_THRESHOLD: f64 = 0.6;
452
453 let a_lowercase = self.to_lowercase();
454 let b_lowercase = other.to_lowercase();
455
456 strsim::normalized_levenshtein(&a_lowercase, &b_lowercase) >= SIMILARITY_THRESHOLD
457 }
458}
459
460impl std::ops::Deref for ColumnName {
461 type Target = str;
462
463 #[inline(always)]
464 fn deref(&self) -> &Self::Target {
465 &self.0
466 }
467}
468
469impl fmt::Display for ColumnName {
470 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
471 f.write_str(&self.0)
472 }
473}
474
475impl From<String> for ColumnName {
476 fn from(s: String) -> ColumnName {
477 ColumnName(s.into())
478 }
479}
480
481impl From<&str> for ColumnName {
482 fn from(s: &str) -> ColumnName {
483 ColumnName(s.into())
484 }
485}
486
487impl From<&ColumnName> for ColumnName {
488 fn from(n: &ColumnName) -> ColumnName {
489 n.clone()
490 }
491}
492
493impl RustType<ProtoColumnName> for ColumnName {
494 fn into_proto(&self) -> ProtoColumnName {
495 ProtoColumnName {
496 value: Some(self.0.to_string()),
497 }
498 }
499
500 fn from_proto(proto: ProtoColumnName) -> Result<Self, TryFromProtoError> {
501 Ok(ColumnName(
502 proto
503 .value
504 .ok_or_else(|| TryFromProtoError::missing_field("ProtoColumnName::value"))?
505 .into(),
506 ))
507 }
508}
509
510impl From<ColumnName> for mz_sql_parser::ast::Ident {
511 fn from(value: ColumnName) -> Self {
512 mz_sql_parser::ast::Ident::new_unchecked(value.0)
514 }
515}
516
517impl proptest::arbitrary::Arbitrary for ColumnName {
518 type Parameters = ();
519 type Strategy = BoxedStrategy<ColumnName>;
520
521 fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy {
522 let mut weights = vec![(50, Just(1..8)), (20, Just(8..16))];
525 if std::env::var("PROPTEST_LARGE_DATA").is_ok() {
526 weights.extend([
527 (5, Just(16..128)),
528 (1, Just(128..1024)),
529 (1, Just(1024..4096)),
530 ]);
531 }
532 let name_length = Union::new_weighted(weights);
533
534 let char_strat = Rc::new(Union::new_weighted(vec![
537 (50, proptest::char::range('A', 'z').boxed()),
538 (1, any::<char>().boxed()),
539 ]));
540
541 name_length
542 .prop_flat_map(move |length| proptest::collection::vec(Rc::clone(&char_strat), length))
543 .prop_map(|chars| ColumnName(chars.into_iter().collect::<Box<str>>()))
544 .no_shrink()
545 .boxed()
546 }
547}
548
/// The literal column name `?column?`.
///
/// NOTE(review): presumably the placeholder used for columns whose name is not known — confirm
/// against callers.
pub const UNKNOWN_COLUMN_NAME: &str = "?column?";
551
552#[derive(
554 Clone, Copy, Debug, Eq, PartialEq, PartialOrd, Ord, Serialize, Deserialize, Hash, MzReflect,
555)]
556pub struct ColumnIndex(usize);
557
558static_assertions::assert_not_impl_all!(ColumnIndex: Arbitrary);
559
560impl ColumnIndex {
561 pub fn to_stable_name(&self) -> String {
563 self.0.to_string()
564 }
565
566 pub fn to_raw(&self) -> usize {
567 self.0
568 }
569
570 pub fn from_raw(val: usize) -> Self {
571 ColumnIndex(val)
572 }
573}
574
575#[derive(
577 Clone,
578 Copy,
579 Debug,
580 Eq,
581 PartialEq,
582 PartialOrd,
583 Ord,
584 Serialize,
585 Deserialize,
586 Hash,
587 MzReflect,
588 Arbitrary,
589)]
590pub struct RelationVersion(u64);
591
592impl RelationVersion {
593 pub fn root() -> Self {
595 RelationVersion(0)
596 }
597
598 pub fn bump(&self) -> Self {
600 let next_version = self
601 .0
602 .checked_add(1)
603 .expect("added more than u64::MAX columns?");
604 RelationVersion(next_version)
605 }
606
607 pub fn into_raw(self) -> u64 {
611 self.0
612 }
613
614 pub fn from_raw(val: u64) -> RelationVersion {
618 RelationVersion(val)
619 }
620}
621
622impl From<RelationVersion> for SchemaId {
623 fn from(value: RelationVersion) -> Self {
624 SchemaId(usize::cast_from(value.0))
625 }
626}
627
628impl From<mz_sql_parser::ast::Version> for RelationVersion {
629 fn from(value: mz_sql_parser::ast::Version) -> Self {
630 RelationVersion(value.into_inner())
631 }
632}
633
634impl fmt::Display for RelationVersion {
635 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
636 write!(f, "v{}", self.0)
637 }
638}
639
640impl From<RelationVersion> for mz_sql_parser::ast::Version {
641 fn from(value: RelationVersion) -> Self {
642 mz_sql_parser::ast::Version::new(value.0)
643 }
644}
645
646impl RustType<ProtoRelationVersion> for RelationVersion {
647 fn into_proto(&self) -> ProtoRelationVersion {
648 ProtoRelationVersion { value: self.0 }
649 }
650
651 fn from_proto(proto: ProtoRelationVersion) -> Result<Self, TryFromProtoError> {
652 Ok(RelationVersion(proto.value))
653 }
654}
655
656#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash, MzReflect)]
658struct ColumnMetadata {
659 name: ColumnName,
661 typ_idx: usize,
663 added: RelationVersion,
665 dropped: Option<RelationVersion>,
667}
668
669#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash, MzReflect)]
737pub struct RelationDesc {
738 typ: SqlRelationType,
739 metadata: BTreeMap<ColumnIndex, ColumnMetadata>,
740}
741
742impl RustType<ProtoRelationDesc> for RelationDesc {
743 fn into_proto(&self) -> ProtoRelationDesc {
744 let (names, metadata): (Vec<_>, Vec<_>) = self
745 .metadata
746 .values()
747 .map(|meta| {
748 let metadata = ProtoColumnMetadata {
749 added: Some(meta.added.into_proto()),
750 dropped: meta.dropped.map(|v| v.into_proto()),
751 };
752 (meta.name.into_proto(), metadata)
753 })
754 .unzip();
755
756 let is_all_default_metadata = metadata.iter().all(|meta| {
762 meta.added == Some(RelationVersion::root().into_proto()) && meta.dropped == None
763 });
764 let metadata = if is_all_default_metadata {
765 Vec::new()
766 } else {
767 metadata
768 };
769
770 ProtoRelationDesc {
771 typ: Some(self.typ.into_proto()),
772 names,
773 metadata,
774 }
775 }
776
777 fn from_proto(proto: ProtoRelationDesc) -> Result<Self, TryFromProtoError> {
778 let proto_metadata: Box<dyn Iterator<Item = _>> = if proto.metadata.is_empty() {
784 let val = ProtoColumnMetadata {
785 added: Some(RelationVersion::root().into_proto()),
786 dropped: None,
787 };
788 Box::new(itertools::repeat_n(val, proto.names.len()))
789 } else {
790 Box::new(proto.metadata.into_iter())
791 };
792
793 let metadata = proto
794 .names
795 .into_iter()
796 .zip_eq(proto_metadata)
797 .enumerate()
798 .map(|(idx, (name, metadata))| {
799 let meta = ColumnMetadata {
800 name: name.into_rust()?,
801 typ_idx: idx,
802 added: metadata.added.into_rust_if_some("ColumnMetadata::added")?,
803 dropped: metadata.dropped.into_rust()?,
804 };
805 Ok::<_, TryFromProtoError>((ColumnIndex(idx), meta))
806 })
807 .collect::<Result<_, _>>()?;
808
809 Ok(RelationDesc {
810 typ: proto.typ.into_rust_if_some("ProtoRelationDesc::typ")?,
811 metadata,
812 })
813 }
814}
815
816impl RelationDesc {
817 pub fn builder() -> RelationDescBuilder {
819 RelationDescBuilder::default()
820 }
821
822 pub fn empty() -> Self {
825 RelationDesc {
826 typ: SqlRelationType::empty(),
827 metadata: BTreeMap::default(),
828 }
829 }
830
831 pub fn is_empty(&self) -> bool {
833 self == &Self::empty()
834 }
835
836 pub fn len(&self) -> usize {
838 self.typ().column_types.len()
839 }
840
841 pub fn new<I, N>(typ: SqlRelationType, names: I) -> Self
849 where
850 I: IntoIterator<Item = N>,
851 N: Into<ColumnName>,
852 {
853 let metadata: BTreeMap<_, _> = names
854 .into_iter()
855 .enumerate()
856 .map(|(idx, name)| {
857 let col_idx = ColumnIndex(idx);
858 let metadata = ColumnMetadata {
859 name: name.into(),
860 typ_idx: idx,
861 added: RelationVersion::root(),
862 dropped: None,
863 };
864 (col_idx, metadata)
865 })
866 .collect();
867
868 assert_eq!(typ.column_types.len(), metadata.len());
870
871 RelationDesc { typ, metadata }
872 }
873
874 pub fn from_names_and_types<I, T, N>(iter: I) -> Self
875 where
876 I: IntoIterator<Item = (N, T)>,
877 T: Into<SqlColumnType>,
878 N: Into<ColumnName>,
879 {
880 let (names, types): (Vec<_>, Vec<_>) = iter.into_iter().unzip();
881 let types = types.into_iter().map(Into::into).collect();
882 let typ = SqlRelationType::new(types);
883 Self::new(typ, names)
884 }
885
886 pub fn concat(mut self, other: Self) -> Self {
896 let self_len = self.typ.column_types.len();
897
898 for (typ, (_col_idx, meta)) in other
899 .typ
900 .column_types
901 .into_iter()
902 .zip_eq(other.metadata.into_iter())
903 {
904 assert_eq!(meta.added, RelationVersion::root());
905 assert_none!(meta.dropped);
906
907 let new_idx = self.typ.columns().len();
908 let new_meta = ColumnMetadata {
909 name: meta.name,
910 typ_idx: new_idx,
911 added: RelationVersion::root(),
912 dropped: None,
913 };
914
915 self.typ.column_types.push(typ);
916 let prev = self.metadata.insert(ColumnIndex(new_idx), new_meta);
917
918 assert_eq!(self.metadata.len(), self.typ.columns().len());
919 assert_none!(prev);
920 }
921
922 for k in other.typ.keys {
923 let k = k.into_iter().map(|idx| idx + self_len).collect();
924 self = self.with_key(k);
925 }
926 self
927 }
928
929 pub fn with_key(mut self, indices: Vec<usize>) -> Self {
931 self.typ = self.typ.with_key(indices);
932 self
933 }
934
935 pub fn without_keys(mut self) -> Self {
937 self.typ.keys.clear();
938 self
939 }
940
941 pub fn with_names<I, N>(self, names: I) -> Self
949 where
950 I: IntoIterator<Item = N>,
951 N: Into<ColumnName>,
952 {
953 Self::new(self.typ, names)
954 }
955
956 pub fn arity(&self) -> usize {
958 self.typ.arity()
959 }
960
961 pub fn typ(&self) -> &SqlRelationType {
963 &self.typ
964 }
965
966 pub fn into_typ(self) -> SqlRelationType {
968 self.typ
969 }
970
971 pub fn iter(&self) -> impl Iterator<Item = (&ColumnName, &SqlColumnType)> {
973 self.metadata.values().map(|meta| {
974 let typ = &self.typ.columns()[meta.typ_idx];
975 (&meta.name, typ)
976 })
977 }
978
979 pub fn iter_types(&self) -> impl Iterator<Item = &SqlColumnType> {
981 self.typ.column_types.iter()
982 }
983
984 pub fn iter_names(&self) -> impl Iterator<Item = &ColumnName> {
986 self.metadata.values().map(|meta| &meta.name)
987 }
988
989 pub fn iter_all(&self) -> impl Iterator<Item = (&ColumnIndex, &ColumnName, &SqlColumnType)> {
991 self.metadata.iter().map(|(col_idx, metadata)| {
992 let col_typ = &self.typ.columns()[metadata.typ_idx];
993 (col_idx, &metadata.name, col_typ)
994 })
995 }
996
997 pub fn iter_similar_names<'a>(
1000 &'a self,
1001 name: &'a ColumnName,
1002 ) -> impl Iterator<Item = &'a ColumnName> {
1003 self.iter_names().filter(|n| n.is_similar(name))
1004 }
1005
1006 pub fn contains_index(&self, idx: &ColumnIndex) -> bool {
1008 self.metadata.contains_key(idx)
1009 }
1010
1011 pub fn get_by_name(&self, name: &ColumnName) -> Option<(usize, &SqlColumnType)> {
1017 self.iter_names()
1018 .position(|n| n == name)
1019 .map(|i| (i, &self.typ.column_types[i]))
1020 }
1021
1022 pub fn get_name(&self, i: usize) -> &ColumnName {
1030 self.get_name_idx(&ColumnIndex(i))
1032 }
1033
1034 pub fn get_name_idx(&self, idx: &ColumnIndex) -> &ColumnName {
1040 &self.metadata.get(idx).expect("should exist").name
1041 }
1042
1043 pub fn get_name_mut(&mut self, i: usize) -> &mut ColumnName {
1049 &mut self
1051 .metadata
1052 .get_mut(&ColumnIndex(i))
1053 .expect("should exist")
1054 .name
1055 }
1056
1057 pub fn get_type(&self, idx: &ColumnIndex) -> &SqlColumnType {
1063 let typ_idx = self.metadata.get(idx).expect("should exist").typ_idx;
1064 &self.typ.column_types[typ_idx]
1065 }
1066
1067 pub fn get_unambiguous_name(&self, i: usize) -> Option<&ColumnName> {
1076 let name = self.get_name(i);
1077 if self.iter_names().filter(|n| *n == name).count() == 1 {
1078 Some(name)
1079 } else {
1080 None
1081 }
1082 }
1083
1084 pub fn constraints_met(&self, i: usize, d: &Datum) -> Result<(), NotNullViolation> {
1089 let name = self.get_name(i);
1090 let typ = &self.typ.column_types[i];
1091 if d == &Datum::Null && !typ.nullable {
1092 Err(NotNullViolation(name.clone()))
1093 } else {
1094 Ok(())
1095 }
1096 }
1097
1098 pub fn diff(&self, other: &RelationDesc) -> RelationDescDiff {
1113 assert_eq!(self.metadata.len(), self.typ.columns().len());
1114 assert_eq!(other.metadata.len(), other.typ.columns().len());
1115 for (idx, meta) in self.metadata.iter().chain(other.metadata.iter()) {
1116 assert_eq!(meta.typ_idx, idx.0);
1117 assert_eq!(meta.added, RelationVersion::root());
1118 assert_none!(meta.dropped);
1119 }
1120
1121 let mut column_diffs = BTreeMap::new();
1122 let mut key_diff = None;
1123
1124 let left_arity = self.arity();
1125 let right_arity = other.arity();
1126 let common_arity = std::cmp::min(left_arity, right_arity);
1127
1128 for idx in 0..common_arity {
1129 let left_name = self.get_name(idx);
1130 let right_name = other.get_name(idx);
1131 let left_type = &self.typ.column_types[idx];
1132 let right_type = &other.typ.column_types[idx];
1133
1134 if left_name != right_name {
1135 let diff = ColumnDiff::NameMismatch {
1136 left: left_name.clone(),
1137 right: right_name.clone(),
1138 };
1139 column_diffs.insert(idx, diff);
1140 } else if left_type.scalar_type != right_type.scalar_type {
1141 let diff = ColumnDiff::TypeMismatch {
1142 name: left_name.clone(),
1143 left: left_type.scalar_type.clone(),
1144 right: right_type.scalar_type.clone(),
1145 };
1146 column_diffs.insert(idx, diff);
1147 } else if left_type.nullable != right_type.nullable {
1148 let diff = ColumnDiff::NullabilityMismatch {
1149 name: left_name.clone(),
1150 left: left_type.nullable,
1151 right: right_type.nullable,
1152 };
1153 column_diffs.insert(idx, diff);
1154 }
1155 }
1156
1157 for idx in common_arity..left_arity {
1158 let diff = ColumnDiff::Missing {
1159 name: self.get_name(idx).clone(),
1160 };
1161 column_diffs.insert(idx, diff);
1162 }
1163
1164 for idx in common_arity..right_arity {
1165 let diff = ColumnDiff::Extra {
1166 name: other.get_name(idx).clone(),
1167 };
1168 column_diffs.insert(idx, diff);
1169 }
1170
1171 let left_keys: BTreeSet<_> = self.typ.keys.iter().collect();
1172 let right_keys: BTreeSet<_> = other.typ.keys.iter().collect();
1173 if left_keys != right_keys {
1174 let column_names = |desc: &RelationDesc, keys: BTreeSet<&Vec<usize>>| {
1175 keys.iter()
1176 .map(|key| key.iter().map(|&idx| desc.get_name(idx).clone()).collect())
1177 .collect()
1178 };
1179 key_diff = Some(KeyDiff {
1180 left: column_names(self, left_keys),
1181 right: column_names(other, right_keys),
1182 });
1183 }
1184
1185 RelationDescDiff {
1186 column_diffs,
1187 key_diff,
1188 }
1189 }
1190
1191 pub fn apply_demand(&self, demands: &BTreeSet<usize>) -> RelationDesc {
1193 let mut new_desc = self.clone();
1194
1195 let mut removed = 0;
1197 new_desc.metadata.retain(|idx, metadata| {
1198 let retain = demands.contains(&idx.0);
1199 if !retain {
1200 removed += 1;
1201 } else {
1202 metadata.typ_idx -= removed;
1203 }
1204 retain
1205 });
1206
1207 let mut idx = 0;
1209 new_desc.typ.column_types.retain(|_| {
1210 let keep = demands.contains(&idx);
1211 idx += 1;
1212 keep
1213 });
1214
1215 new_desc
1216 }
1217}
1218
1219impl Arbitrary for RelationDesc {
1220 type Parameters = ();
1221 type Strategy = BoxedStrategy<RelationDesc>;
1222
1223 fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy {
1224 let mut weights = vec![(100, Just(0..4)), (50, Just(4..8)), (25, Just(8..16))];
1225 if std::env::var("PROPTEST_LARGE_DATA").is_ok() {
1226 weights.extend([
1227 (12, Just(16..32)),
1228 (6, Just(32..64)),
1229 (3, Just(64..128)),
1230 (1, Just(128..256)),
1231 ]);
1232 }
1233 let num_columns = Union::new_weighted(weights);
1234
1235 num_columns.prop_flat_map(arb_relation_desc).boxed()
1236 }
1237}
1238
1239pub fn arb_relation_desc(num_cols: std::ops::Range<usize>) -> impl Strategy<Value = RelationDesc> {
1242 proptest::collection::btree_map(any::<ColumnName>(), any::<SqlColumnType>(), num_cols)
1243 .prop_map(RelationDesc::from_names_and_types)
1244}
1245
1246pub fn arb_relation_desc_projection(desc: RelationDesc) -> impl Strategy<Value = RelationDesc> {
1248 let mask: Vec<_> = (0..desc.len()).map(|_| any::<bool>()).collect();
1249 mask.prop_map(move |mask| {
1250 let demands: BTreeSet<_> = mask
1251 .into_iter()
1252 .enumerate()
1253 .filter_map(|(idx, keep)| keep.then_some(idx))
1254 .collect();
1255 desc.apply_demand(&demands)
1256 })
1257}
1258
1259impl IntoIterator for RelationDesc {
1260 type Item = (ColumnName, SqlColumnType);
1261 type IntoIter = Box<dyn Iterator<Item = (ColumnName, SqlColumnType)>>;
1262
1263 fn into_iter(self) -> Self::IntoIter {
1264 let iter = self
1265 .metadata
1266 .into_values()
1267 .zip_eq(self.typ.column_types)
1268 .map(|(meta, typ)| (meta.name, typ));
1269 Box::new(iter)
1270 }
1271}
1272
1273pub fn arb_row_for_relation(desc: &RelationDesc) -> impl Strategy<Value = Row> + use<> {
1275 let datums: Vec<_> = desc
1276 .typ()
1277 .columns()
1278 .iter()
1279 .cloned()
1280 .map(arb_datum_for_column)
1281 .collect();
1282 datums.prop_map(|x| Row::pack(x.iter().map(Datum::from)))
1283}
1284
1285#[derive(Debug, PartialEq, Eq)]
1287pub struct NotNullViolation(pub ColumnName);
1288
1289impl fmt::Display for NotNullViolation {
1290 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1291 write!(
1292 f,
1293 "null value in column {} violates not-null constraint",
1294 self.0.quoted()
1295 )
1296 }
1297}
1298
1299#[derive(Debug, Clone, PartialEq, Eq)]
1301pub struct RelationDescDiff {
1302 pub column_diffs: BTreeMap<usize, ColumnDiff>,
1304 pub key_diff: Option<KeyDiff>,
1306}
1307
1308impl RelationDescDiff {
1309 pub fn is_empty(&self) -> bool {
1311 self.column_diffs.is_empty() && self.key_diff.is_none()
1312 }
1313}
1314
1315#[derive(Debug, Clone, PartialEq, Eq)]
1317pub enum ColumnDiff {
1318 Missing { name: ColumnName },
1320 Extra { name: ColumnName },
1322 TypeMismatch {
1324 name: ColumnName,
1325 left: SqlScalarType,
1326 right: SqlScalarType,
1327 },
1328 NullabilityMismatch {
1330 name: ColumnName,
1331 left: bool,
1332 right: bool,
1333 },
1334 NameMismatch { left: ColumnName, right: ColumnName },
1336}
1337
1338#[derive(Debug, Clone, PartialEq, Eq)]
1340pub struct KeyDiff {
1341 pub left: BTreeSet<Vec<ColumnName>>,
1343 pub right: BTreeSet<Vec<ColumnName>>,
1345}
1346
1347#[derive(Clone, Default, Debug, PartialEq, Eq)]
1349pub struct RelationDescBuilder {
1350 columns: Vec<(ColumnName, SqlColumnType)>,
1352 keys: Vec<Vec<usize>>,
1354}
1355
1356impl RelationDescBuilder {
1357 pub fn with_column<N: Into<ColumnName>>(
1359 mut self,
1360 name: N,
1361 ty: SqlColumnType,
1362 ) -> RelationDescBuilder {
1363 let name = name.into();
1364 self.columns.push((name, ty));
1365 self
1366 }
1367
1368 pub fn with_columns<I, T, N>(mut self, iter: I) -> Self
1370 where
1371 I: IntoIterator<Item = (N, T)>,
1372 T: Into<SqlColumnType>,
1373 N: Into<ColumnName>,
1374 {
1375 self.columns
1376 .extend(iter.into_iter().map(|(name, ty)| (name.into(), ty.into())));
1377 self
1378 }
1379
1380 pub fn with_key(mut self, mut indices: Vec<usize>) -> RelationDescBuilder {
1382 indices.sort_unstable();
1383 if !self.keys.contains(&indices) {
1384 self.keys.push(indices);
1385 }
1386 self
1387 }
1388
1389 pub fn without_keys(mut self) -> RelationDescBuilder {
1391 self.keys.clear();
1392 assert_eq!(self.keys.len(), 0);
1393 self
1394 }
1395
1396 pub fn concat(mut self, other: Self) -> Self {
1398 let self_len = self.columns.len();
1399
1400 self.columns.extend(other.columns);
1401 for k in other.keys {
1402 let k = k.into_iter().map(|idx| idx + self_len).collect();
1403 self = self.with_key(k);
1404 }
1405
1406 self
1407 }
1408
1409 pub fn finish(self) -> RelationDesc {
1411 let mut desc = RelationDesc::from_names_and_types(self.columns);
1412 desc.typ = desc.typ.with_keys(self.keys);
1413 desc
1414 }
1415}
1416
1417#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord, Serialize)]
1419pub enum RelationVersionSelector {
1420 Specific(RelationVersion),
1421 Latest,
1422}
1423
1424impl RelationVersionSelector {
1425 pub fn specific(version: u64) -> Self {
1426 RelationVersionSelector::Specific(RelationVersion(version))
1427 }
1428}
1429
1430#[derive(Debug, Clone, Serialize)]
1436pub struct VersionedRelationDesc {
1437 inner: RelationDesc,
1438}
1439
1440impl VersionedRelationDesc {
1441 pub fn new(inner: RelationDesc) -> Self {
1442 VersionedRelationDesc { inner }
1443 }
1444
1445 #[must_use]
1453 pub fn add_column<N, T>(&mut self, name: N, typ: T) -> RelationVersion
1454 where
1455 N: Into<ColumnName>,
1456 T: Into<SqlColumnType>,
1457 {
1458 let latest_version = self.latest_version();
1459 let new_version = latest_version.bump();
1460
1461 let name = name.into();
1462 let existing = self
1463 .inner
1464 .metadata
1465 .iter()
1466 .find(|(_, meta)| meta.name == name && meta.dropped.is_none());
1467 if let Some(existing) = existing {
1468 panic!("column named '{name}' already exists! {existing:?}");
1469 }
1470
1471 let next_idx = self.inner.metadata.len();
1472 let col_meta = ColumnMetadata {
1473 name,
1474 typ_idx: next_idx,
1475 added: new_version,
1476 dropped: None,
1477 };
1478
1479 self.inner.typ.column_types.push(typ.into());
1480 let prev = self.inner.metadata.insert(ColumnIndex(next_idx), col_meta);
1481
1482 assert_none!(prev, "column index overlap!");
1483 self.validate();
1484
1485 new_version
1486 }
1487
1488 #[must_use]
1497 pub fn drop_column<N>(&mut self, name: N) -> RelationVersion
1498 where
1499 N: Into<ColumnName>,
1500 {
1501 let name = name.into();
1502 let latest_version = self.latest_version();
1503 let new_version = latest_version.bump();
1504
1505 let col = self
1506 .inner
1507 .metadata
1508 .values_mut()
1509 .find(|meta| meta.name == name && meta.dropped.is_none())
1510 .expect("column to exist");
1511
1512 assert_none!(col.dropped, "column was already dropped");
1514 col.dropped = Some(new_version);
1515
1516 let dropped_key = self
1518 .inner
1519 .typ
1520 .keys
1521 .iter()
1522 .any(|keys| keys.contains(&col.typ_idx));
1523 assert!(!dropped_key, "column being dropped was used as a key");
1524
1525 self.validate();
1526 new_version
1527 }
1528
1529 pub fn latest(&self) -> RelationDesc {
1531 self.inner.clone()
1532 }
1533
1534 pub fn at_version(&self, version: RelationVersionSelector) -> RelationDesc {
1536 let up_to_version = match version {
1538 RelationVersionSelector::Latest => RelationVersion(u64::MAX),
1539 RelationVersionSelector::Specific(v) => v,
1540 };
1541
1542 let valid_columns = self.inner.metadata.iter().filter(|(_col_idx, meta)| {
1543 let added = meta.added <= up_to_version;
1544 let dropped = meta
1545 .dropped
1546 .map(|dropped_at| up_to_version >= dropped_at)
1547 .unwrap_or(false);
1548
1549 added && !dropped
1550 });
1551
1552 let mut column_types = Vec::new();
1553 let mut column_metas = BTreeMap::new();
1554
1555 for (col_idx, meta) in valid_columns {
1562 let new_meta = ColumnMetadata {
1563 name: meta.name.clone(),
1564 typ_idx: column_types.len(),
1565 added: meta.added.clone(),
1566 dropped: meta.dropped.clone(),
1567 };
1568 column_types.push(self.inner.typ.columns()[meta.typ_idx].clone());
1569 column_metas.insert(*col_idx, new_meta);
1570 }
1571
1572 let keys = self
1578 .inner
1579 .typ
1580 .keys
1581 .iter()
1582 .map(|keys| {
1583 keys.iter()
1584 .map(|key_idx| {
1585 let metadata = column_metas
1586 .get(&ColumnIndex(*key_idx))
1587 .expect("found key for column that doesn't exist");
1588 metadata.typ_idx
1589 })
1590 .collect()
1591 })
1592 .collect();
1593
1594 let relation_type = SqlRelationType { column_types, keys };
1595
1596 RelationDesc {
1597 typ: relation_type,
1598 metadata: column_metas,
1599 }
1600 }
1601
1602 pub fn latest_version(&self) -> RelationVersion {
1603 self.inner
1604 .metadata
1605 .values()
1606 .map(|meta| meta.dropped.unwrap_or(meta.added))
1608 .max()
1609 .unwrap_or_else(RelationVersion::root)
1611 }
1612
1613 fn validate(&self) {
1619 fn validate_inner(desc: &RelationDesc) -> Result<(), anyhow::Error> {
1620 if desc.typ.column_types.len() != desc.metadata.len() {
1621 anyhow::bail!("mismatch between number of types and metadatas");
1622 }
1623
1624 for (col_idx, meta) in &desc.metadata {
1625 if col_idx.0 > desc.metadata.len() {
1626 anyhow::bail!("column index out of bounds");
1627 }
1628 if meta.added >= meta.dropped.unwrap_or(RelationVersion(u64::MAX)) {
1629 anyhow::bail!("column was added after it was dropped?");
1630 }
1631 if desc.typ().columns().get(meta.typ_idx).is_none() {
1632 anyhow::bail!("typ_idx incorrect");
1633 }
1634 }
1635
1636 for keys in &desc.typ.keys {
1637 for key in keys {
1638 if *key >= desc.typ.column_types.len() {
1639 anyhow::bail!("key index was out of bounds!");
1640 }
1641 }
1642 }
1643
1644 let versions = desc
1645 .metadata
1646 .values()
1647 .map(|meta| meta.dropped.unwrap_or(meta.added));
1648 let mut max = 0;
1649 let mut sum = 0;
1650 for version in versions {
1651 max = std::cmp::max(max, version.0);
1652 sum += version.0;
1653 }
1654
1655 if sum != (max * (max + 1) / 2) {
1665 anyhow::bail!("there is a duplicate or missing relation version");
1666 }
1667
1668 Ok(())
1669 }
1670
1671 assert_ok!(validate_inner(&self.inner), "validate failed! {self:?}");
1672 }
1673}
1674
/// A single change that can be applied to a [`RelationDesc`] with
/// [`PropRelationDescDiff::apply`].
///
/// Collections of these are generated by [`arb_relation_desc_diff`].
#[derive(Debug)]
pub enum PropRelationDescDiff {
    /// Append a new column with the given name and type.
    AddColumn {
        name: ColumnName,
        typ: SqlColumnType,
    },
    /// Mark the column with the given name as dropped.
    DropColumn {
        name: ColumnName,
    },
    /// Flip the `nullable` flag of the column with the given name.
    ToggleNullability {
        name: ColumnName,
    },
    /// Replace the type of the column with the given name.
    ChangeType {
        name: ColumnName,
        typ: SqlColumnType,
    },
}
1694
1695impl PropRelationDescDiff {
1696 pub fn apply(self, desc: &mut RelationDesc) {
1697 match self {
1698 PropRelationDescDiff::AddColumn { name, typ } => {
1699 let new_idx = desc.metadata.len();
1700 let meta = ColumnMetadata {
1701 name,
1702 typ_idx: new_idx,
1703 added: RelationVersion(0),
1704 dropped: None,
1705 };
1706 let prev = desc.metadata.insert(ColumnIndex(new_idx), meta);
1707 desc.typ.column_types.push(typ);
1708
1709 assert_none!(prev);
1710 assert_eq!(desc.metadata.len(), desc.typ.column_types.len());
1711 }
1712 PropRelationDescDiff::DropColumn { name } => {
1713 let next_version = desc
1714 .metadata
1715 .values()
1716 .map(|meta| meta.dropped.unwrap_or(meta.added))
1717 .max()
1718 .unwrap_or_else(RelationVersion::root)
1719 .bump();
1720 let Some(metadata) = desc.metadata.values_mut().find(|meta| meta.name == name)
1721 else {
1722 return;
1723 };
1724 if metadata.dropped.is_none() {
1725 metadata.dropped = Some(next_version);
1726 }
1727 }
1728 PropRelationDescDiff::ToggleNullability { name } => {
1729 let Some((pos, _)) = desc.get_by_name(&name) else {
1730 return;
1731 };
1732 let col_type = desc
1733 .typ
1734 .column_types
1735 .get_mut(pos)
1736 .expect("ColumnNames and SqlColumnTypes out of sync!");
1737 col_type.nullable = !col_type.nullable;
1738 }
1739 PropRelationDescDiff::ChangeType { name, typ } => {
1740 let Some((pos, _)) = desc.get_by_name(&name) else {
1741 return;
1742 };
1743 let col_type = desc
1744 .typ
1745 .column_types
1746 .get_mut(pos)
1747 .expect("ColumnNames and SqlColumnTypes out of sync!");
1748 *col_type = typ;
1749 }
1750 }
1751 }
1752}
1753
1754pub fn arb_relation_desc_diff(
1756 source: &RelationDesc,
1757) -> impl Strategy<Value = Vec<PropRelationDescDiff>> + use<> {
1758 let source = Rc::new(source.clone());
1759 let num_source_columns = source.typ.columns().len();
1760
1761 let num_add_columns = Union::new_weighted(vec![(100, Just(0..8)), (1, Just(8..64))]);
1762 let add_columns_strat = num_add_columns
1763 .prop_flat_map(|num_columns| {
1764 proptest::collection::vec((any::<ColumnName>(), any::<SqlColumnType>()), num_columns)
1765 })
1766 .prop_map(|cols| {
1767 cols.into_iter()
1768 .map(|(name, typ)| PropRelationDescDiff::AddColumn { name, typ })
1769 .collect::<Vec<_>>()
1770 });
1771
1772 if num_source_columns == 0 {
1774 return add_columns_strat.boxed();
1775 }
1776
1777 let source_ = Rc::clone(&source);
1778 let drop_columns_strat = (0..num_source_columns).prop_perturb(move |num_columns, mut rng| {
1779 let mut set = BTreeSet::default();
1780 for _ in 0..num_columns {
1781 let col_idx = rng.random_range(0..num_source_columns);
1782 set.insert(source_.get_name(col_idx).clone());
1783 }
1784 set.into_iter()
1785 .map(|name| PropRelationDescDiff::DropColumn { name })
1786 .collect::<Vec<_>>()
1787 });
1788
1789 let source_ = Rc::clone(&source);
1790 let toggle_nullability_strat =
1791 (0..num_source_columns).prop_perturb(move |num_columns, mut rng| {
1792 let mut set = BTreeSet::default();
1793 for _ in 0..num_columns {
1794 let col_idx = rng.random_range(0..num_source_columns);
1795 set.insert(source_.get_name(col_idx).clone());
1796 }
1797 set.into_iter()
1798 .map(|name| PropRelationDescDiff::ToggleNullability { name })
1799 .collect::<Vec<_>>()
1800 });
1801
1802 let source_ = Rc::clone(&source);
1803 let change_type_strat = (0..num_source_columns)
1804 .prop_perturb(move |num_columns, mut rng| {
1805 let mut set = BTreeSet::default();
1806 for _ in 0..num_columns {
1807 let col_idx = rng.random_range(0..num_source_columns);
1808 set.insert(source_.get_name(col_idx).clone());
1809 }
1810 set
1811 })
1812 .prop_flat_map(|cols| {
1813 proptest::collection::vec(any::<SqlColumnType>(), cols.len())
1814 .prop_map(move |types| (cols.clone(), types))
1815 })
1816 .prop_map(|(cols, types)| {
1817 cols.into_iter()
1818 .zip_eq(types)
1819 .map(|(name, typ)| PropRelationDescDiff::ChangeType { name, typ })
1820 .collect::<Vec<_>>()
1821 });
1822
1823 (
1824 add_columns_strat,
1825 drop_columns_strat,
1826 toggle_nullability_strat,
1827 change_type_strat,
1828 )
1829 .prop_map(|(adds, drops, toggles, changes)| {
1830 adds.into_iter()
1831 .chain(drops)
1832 .chain(toggles)
1833 .chain(changes)
1834 .collect::<Vec<_>>()
1835 })
1836 .prop_shuffle()
1837 .boxed()
1838}
1839
#[cfg(test)]
mod tests {
    use super::*;
    use prost::Message;

    /// Walks a `VersionedRelationDesc` through one column addition and one column drop,
    /// checking what `at_version` returns at every step and that descriptions of earlier
    /// versions do not change as newer versions are created.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn smoktest_at_version() {
        let desc = RelationDesc::builder()
            .with_column("a", SqlScalarType::Bool.nullable(true))
            .with_column("z", SqlScalarType::String.nullable(false))
            .finish();

        let mut versioned_desc = VersionedRelationDesc {
            inner: desc.clone(),
        };
        versioned_desc.validate();

        // Nothing has changed yet, so every selector yields the original description.
        let latest = versioned_desc.at_version(RelationVersionSelector::Latest);
        assert_eq!(desc, latest);

        let v0 = versioned_desc.at_version(RelationVersionSelector::specific(0));
        assert_eq!(desc, v0);

        // Asking for a version newer than any recorded one also yields the original.
        let v3 = versioned_desc.at_version(RelationVersionSelector::specific(3));
        assert_eq!(desc, v3);

        // Adding a column creates version 1.
        let v1 = versioned_desc.add_column("b", SqlScalarType::Bytes.nullable(false));
        assert_eq!(v1, RelationVersion(1));

        let v1 = versioned_desc.at_version(RelationVersionSelector::Specific(v1));
        insta::assert_json_snapshot!(v1.metadata, @r###"
        {
          "0": {
            "name": "a",
            "typ_idx": 0,
            "added": 0,
            "dropped": null
          },
          "1": {
            "name": "z",
            "typ_idx": 1,
            "added": 0,
            "dropped": null
          },
          "2": {
            "name": "b",
            "typ_idx": 2,
            "added": 1,
            "dropped": null
          }
        }
        "###);

        // Version 0 must iterate the same as before the column was added.
        let v0_b = versioned_desc.at_version(RelationVersionSelector::specific(0));
        assert!(v0.iter().eq(v0_b.iter()));

        // Dropping a column creates version 2.
        let v2 = versioned_desc.drop_column("z");
        assert_eq!(v2, RelationVersion(2));

        // At version 2 column "z" is gone and "b" has moved up to `typ_idx` 1, while
        // keeping its column index of 2.
        let v2 = versioned_desc.at_version(RelationVersionSelector::Specific(v2));
        insta::assert_json_snapshot!(v2.metadata, @r###"
        {
          "0": {
            "name": "a",
            "typ_idx": 0,
            "added": 0,
            "dropped": null
          },
          "2": {
            "name": "b",
            "typ_idx": 1,
            "added": 1,
            "dropped": null
          }
        }
        "###);

        // Versions 0 and 1 must iterate the same as before the drop.
        let v0_c = versioned_desc.at_version(RelationVersionSelector::specific(0));
        assert!(v0.iter().eq(v0_c.iter()));

        let v1_b = versioned_desc.at_version(RelationVersionSelector::specific(1));
        assert!(v1.iter().eq(v1_b.iter()));

        // The internal description keeps every column, recording when "z" was dropped.
        insta::assert_json_snapshot!(versioned_desc.inner.metadata, @r###"
        {
          "0": {
            "name": "a",
            "typ_idx": 0,
            "added": 0,
            "dropped": null
          },
          "1": {
            "name": "z",
            "typ_idx": 1,
            "added": 0,
            "dropped": 2
          },
          "2": {
            "name": "b",
            "typ_idx": 2,
            "added": 1,
            "dropped": null
          }
        }
        "###);
    }

    /// Checks that keys are remapped when a column positioned before a key column is
    /// dropped, and that the key is unchanged when viewing the original version.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn test_dropping_columns_with_keys() {
        let desc = RelationDesc::builder()
            .with_column("a", SqlScalarType::Bool.nullable(true))
            .with_column("z", SqlScalarType::String.nullable(false))
            .with_key(vec![1])
            .finish();

        let mut versioned_desc = VersionedRelationDesc {
            inner: desc.clone(),
        };
        versioned_desc.validate();

        let v1 = versioned_desc.drop_column("a");
        assert_eq!(v1, RelationVersion(1));

        // With "a" gone, the key on "z" moves from position 1 to position 0.
        let v1 = versioned_desc.at_version(RelationVersionSelector::Specific(v1));
        insta::assert_json_snapshot!(v1, @r###"
        {
          "typ": {
            "column_types": [
              {
                "scalar_type": "String",
                "nullable": false
              }
            ],
            "keys": [
              [
                0
              ]
            ]
          },
          "metadata": {
            "1": {
              "name": "z",
              "typ_idx": 0,
              "added": 0,
              "dropped": null
            }
          }
        }
        "###);

        // At version 0 both columns are present and the key still points at position 1.
        let v0 = versioned_desc.at_version(RelationVersionSelector::specific(0));
        insta::assert_json_snapshot!(v0, @r###"
        {
          "typ": {
            "column_types": [
              {
                "scalar_type": "Bool",
                "nullable": true
              },
              {
                "scalar_type": "String",
                "nullable": false
              }
            ],
            "keys": [
              [
                1
              ]
            ]
          },
          "metadata": {
            "0": {
              "name": "a",
              "typ_idx": 0,
              "added": 0,
              "dropped": 1
            },
            "1": {
              "name": "z",
              "typ_idx": 1,
              "added": 0,
              "dropped": null
            }
          }
        }
        "###);
    }

    /// Checks that a `ProtoRelationDesc` carrying only `names` and no `metadata` decodes
    /// into a `RelationDesc` whose metadata places every column at version 0.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn roundtrip_relation_desc_without_metadata() {
        let typ = ProtoRelationType {
            column_types: vec![
                SqlScalarType::String.nullable(false).into_proto(),
                SqlScalarType::Bool.nullable(true).into_proto(),
            ],
            keys: vec![],
        };
        let proto = ProtoRelationDesc {
            typ: Some(typ),
            names: vec![
                ColumnName("a".into()).into_proto(),
                ColumnName("b".into()).into_proto(),
            ],
            metadata: vec![],
        };
        let desc: RelationDesc = proto.into_rust().unwrap();

        insta::assert_json_snapshot!(desc, @r###"
        {
          "typ": {
            "column_types": [
              {
                "scalar_type": "String",
                "nullable": false
              },
              {
                "scalar_type": "Bool",
                "nullable": true
              }
            ],
            "keys": []
          },
          "metadata": {
            "0": {
              "name": "a",
              "typ_idx": 0,
              "added": 0,
              "dropped": null
            },
            "1": {
              "name": "b",
              "typ_idx": 1,
              "added": 0,
              "dropped": null
            }
          }
        }
        "###);
    }

    /// Adding a column whose name matches an existing, live column must panic.
    #[mz_ore::test]
    #[should_panic(expected = "column named 'a' already exists!")]
    fn test_add_column_with_same_name_panics() {
        let desc = RelationDesc::builder()
            .with_column("a", SqlScalarType::Bool.nullable(true))
            .finish();
        let mut versioned = VersionedRelationDesc::new(desc);

        let _ = versioned.add_column("a", SqlScalarType::String.nullable(false));
    }

    /// A column name can be reused once the earlier column with that name was dropped.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn test_add_column_with_same_name_prev_dropped() {
        let desc = RelationDesc::builder()
            .with_column("a", SqlScalarType::Bool.nullable(true))
            .finish();
        let mut versioned = VersionedRelationDesc::new(desc);

        // Dropping the only column leaves an empty description at version 1.
        let v1 = versioned.drop_column("a");
        let v1 = versioned.at_version(RelationVersionSelector::Specific(v1));
        insta::assert_json_snapshot!(v1, @r###"
        {
          "typ": {
            "column_types": [],
            "keys": []
          },
          "metadata": {}
        }
        "###);

        // The new "a" gets a fresh column index (1) and is the only column at version 2.
        let v2 = versioned.add_column("a", SqlScalarType::String.nullable(false));
        let v2 = versioned.at_version(RelationVersionSelector::Specific(v2));
        insta::assert_json_snapshot!(v2, @r###"
        {
          "typ": {
            "column_types": [
              {
                "scalar_type": "String",
                "nullable": false
              }
            ],
            "keys": []
          },
          "metadata": {
            "1": {
              "name": "a",
              "typ_idx": 0,
              "added": 2,
              "dropped": null
            }
          }
        }
        "###);
    }

    /// Checks that `apply_demand` keeps only the demanded columns and that the result
    /// still passes validation.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn apply_demand() {
        let desc = RelationDesc::builder()
            .with_column("a", SqlScalarType::String.nullable(true))
            .with_column("b", SqlScalarType::Int64.nullable(false))
            .with_column("c", SqlScalarType::Time.nullable(false))
            .finish();
        let desc = desc.apply_demand(&BTreeSet::from([0, 2]));
        assert_eq!(desc.arity(), 2);
        // `validate` is only available on `VersionedRelationDesc`, hence the wrapper.
        VersionedRelationDesc::new(desc).validate();
    }

    /// Pins the stable name produced for a `ColumnIndex`.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn smoketest_column_index_stable_ident() {
        let idx_a = ColumnIndex(42);
        assert_eq!(idx_a.to_stable_name(), "42");
    }

    /// Property test: any `RelationDesc`, both as generated and after applying arbitrary
    /// diffs, survives a round trip through its protobuf encoding unchanged.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn proptest_relation_desc_roundtrips() {
        // Encodes `og` to bytes, decodes it again, and requires equality with the input.
        fn testcase(og: RelationDesc) {
            let bytes = og.into_proto().encode_to_vec();
            let proto = ProtoRelationDesc::decode(&bytes[..]).unwrap();
            let rnd = RelationDesc::from_proto(proto).unwrap();

            assert_eq!(og, rnd);
        }

        proptest!(|(desc in any::<RelationDesc>())| {
            testcase(desc);
        });

        // Pair every generated description with a set of diffs generated for it.
        let strat = any::<RelationDesc>().prop_flat_map(|desc| {
            arb_relation_desc_diff(&desc).prop_map(move |diffs| (desc.clone(), diffs))
        });

        proptest!(|((mut desc, diffs) in strat)| {
            for diff in diffs {
                diff.apply(&mut desc);
            };
            testcase(desc);
        });
    }
}