1use std::collections::{HashMap, HashSet};
19use std::sync::Arc;
20
21use chrono::Utc;
22use itertools::Itertools;
23use uuid::Uuid;
24
25use super::{
26 DEFAULT_SCHEMA_ID, INITIAL_VIEW_VERSION_ID, ONE_MINUTE_MS, Schema, SchemaId,
27 TableMetadataBuilder, VIEW_PROPERTY_REPLACE_DROP_DIALECT_ALLOWED,
28 VIEW_PROPERTY_REPLACE_DROP_DIALECT_ALLOWED_DEFAULT, VIEW_PROPERTY_VERSION_HISTORY_SIZE,
29 VIEW_PROPERTY_VERSION_HISTORY_SIZE_DEFAULT, ViewFormatVersion, ViewMetadata,
30 ViewRepresentation, ViewVersion, ViewVersionLog, ViewVersionRef,
31};
32use crate::ViewCreation;
33use crate::catalog::ViewUpdate;
34use crate::error::{Error, ErrorKind, Result};
35use crate::io::is_truthy;
36
37#[derive(Debug, Clone)]
45pub struct ViewMetadataBuilder {
46 metadata: ViewMetadata,
47 changes: Vec<ViewUpdate>,
48 last_added_schema_id: Option<SchemaId>,
49 last_added_version_id: Option<SchemaId>,
50 history_entry: Option<ViewVersionLog>,
51 previous_view_version: Option<ViewVersionRef>,
54}
55
/// Outcome of [`ViewMetadataBuilder::build`].
#[derive(Debug, Clone, PartialEq)]
pub struct ViewMetadataBuildResult {
    /// The finished view metadata.
    pub metadata: ViewMetadata,
    /// The updates the builder recorded while producing `metadata`.
    pub changes: Vec<ViewUpdate>,
}
64
65impl ViewMetadataBuilder {
    /// Sentinel id standing for "the schema / view version most recently added
    /// through this builder". The value is shared with
    /// [`TableMetadataBuilder::LAST_ADDED`].
    const LAST_ADDED: i32 = TableMetadataBuilder::LAST_ADDED;
67
68 pub fn new(
70 location: String,
71 schema: Schema,
72 view_version: ViewVersion,
73 format_version: ViewFormatVersion,
74 properties: HashMap<String, String>,
75 ) -> Result<Self> {
76 let builder = Self {
77 metadata: ViewMetadata {
78 format_version,
79 view_uuid: Uuid::now_v7(),
80 location: "".to_string(), current_version_id: -1, versions: HashMap::new(), version_log: Vec::new(),
84 schemas: HashMap::new(), properties: HashMap::new(), },
87 changes: vec![],
88 last_added_schema_id: None, last_added_version_id: None, history_entry: None,
91 previous_view_version: None, };
93
94 builder
95 .set_location(location)
96 .set_current_version(view_version, schema)?
97 .set_properties(properties)
98 }
99
100 #[must_use]
102 pub fn new_from_metadata(previous: ViewMetadata) -> Self {
103 let previous_view_version = previous.current_version().clone();
104 Self {
105 metadata: previous,
106 changes: Vec::default(),
107 last_added_schema_id: None,
108 last_added_version_id: None,
109 history_entry: None,
110 previous_view_version: Some(previous_view_version),
111 }
112 }
113
114 pub fn from_view_creation(view_creation: ViewCreation) -> Result<Self> {
116 let ViewCreation {
117 location,
118 schema,
119 properties,
120 name: _,
121 representations,
122 default_catalog,
123 default_namespace,
124 summary,
125 } = view_creation;
126 let version = ViewVersion::builder()
127 .with_default_catalog(default_catalog)
128 .with_default_namespace(default_namespace)
129 .with_representations(representations)
130 .with_schema_id(schema.schema_id())
131 .with_summary(summary)
132 .with_timestamp_ms(Utc::now().timestamp_millis())
133 .with_version_id(INITIAL_VIEW_VERSION_ID)
134 .build();
135
136 Self::new(location, schema, version, ViewFormatVersion::V1, properties)
137 }
138
139 pub fn upgrade_format_version(self, format_version: ViewFormatVersion) -> Result<Self> {
144 if format_version < self.metadata.format_version {
145 return Err(Error::new(
146 ErrorKind::DataInvalid,
147 format!(
148 "Cannot downgrade ViewFormatVersion from {} to {}",
149 self.metadata.format_version, format_version
150 ),
151 ));
152 }
153
154 if format_version != self.metadata.format_version {
155 match format_version {
156 ViewFormatVersion::V1 => {
157 }
159 }
160 }
161
162 Ok(self)
163 }
164
165 pub fn set_location(mut self, location: String) -> Self {
167 let location = location.trim_end_matches('/').to_string();
168 if self.metadata.location != location {
169 self.changes.push(ViewUpdate::SetLocation {
170 location: location.clone(),
171 });
172 self.metadata.location = location;
173 }
174
175 self
176 }
177
178 pub fn set_current_version_id(mut self, mut version_id: i32) -> Result<Self> {
184 if version_id == Self::LAST_ADDED {
185 let Some(last_added_id) = self.last_added_version_id else {
186 return Err(Error::new(
187 ErrorKind::DataInvalid,
188 "Cannot set current version id to last added version: no version has been added.",
189 ));
190 };
191 version_id = last_added_id;
192 }
193
194 let version_id = version_id; if version_id == self.metadata.current_version_id {
197 return Ok(self);
198 }
199
200 let version = self.metadata.versions.get(&version_id).ok_or_else(|| {
201 Error::new(
202 ErrorKind::DataInvalid,
203 format!(
204 "Cannot set current version to unknown version with id: {}",
205 version_id
206 ),
207 )
208 })?;
209
210 self.metadata.current_version_id = version_id;
211
212 if self.last_added_version_id == Some(version_id) {
213 self.changes.push(ViewUpdate::SetCurrentViewVersion {
214 view_version_id: Self::LAST_ADDED,
215 });
216 } else {
217 self.changes.push(ViewUpdate::SetCurrentViewVersion {
218 view_version_id: version_id,
219 });
220 }
221
222 let version_added_in_this_changes = self
226 .changes
227 .iter()
228 .any(|update| matches!(update, ViewUpdate::AddViewVersion { view_version } if view_version.version_id() == version_id));
229
230 let mut log = version.log();
231 if !version_added_in_this_changes {
232 log.set_timestamp_ms(Utc::now().timestamp_millis());
233 }
234
235 self.history_entry = Some(log);
236
237 Ok(self)
238 }
239
240 pub fn set_current_version(
242 mut self,
243 view_version: ViewVersion,
244 schema: Schema,
245 ) -> Result<Self> {
246 let schema_id = self.add_schema_internal(schema);
247 let view_version = view_version.with_schema_id(schema_id);
248 let view_version_id = self.add_version_internal(view_version)?;
249 self.set_current_version_id(view_version_id)
250 }
251
252 pub fn add_version(mut self, view_version: ViewVersion) -> Result<Self> {
259 self.add_version_internal(view_version)?;
260
261 Ok(self)
262 }
263
264 fn add_version_internal(&mut self, view_version: ViewVersion) -> Result<i32> {
265 let version_id = self.reuse_or_create_new_view_version_id(&view_version);
266 let view_version = view_version.with_version_id(version_id);
267
268 if self.metadata.versions.contains_key(&version_id) {
269 if self.last_added_version_id != Some(version_id) {
273 self.changes
274 .push(ViewUpdate::AddViewVersion { view_version });
275 self.last_added_version_id = Some(version_id);
276 }
277 return Ok(version_id);
278 }
279
280 let view_version = if view_version.schema_id() == Self::LAST_ADDED {
281 let last_added_schema_id = self.last_added_schema_id.ok_or_else(|| {
282 Error::new(
283 ErrorKind::DataInvalid,
284 "Cannot set last added schema: no schema has been added",
285 )
286 })?;
287 view_version.with_schema_id(last_added_schema_id)
288 } else {
289 view_version
290 };
291
292 if !self
293 .metadata
294 .schemas
295 .contains_key(&view_version.schema_id())
296 {
297 return Err(Error::new(
298 ErrorKind::DataInvalid,
299 format!(
300 "Cannot add version with unknown schema: {}",
301 view_version.schema_id()
302 ),
303 ));
304 }
305
306 require_unique_dialects(&view_version)?;
307
308 if let Some(last) = self.metadata.version_log.last() {
311 if view_version.timestamp_ms() - last.timestamp_ms() < -ONE_MINUTE_MS {
314 return Err(Error::new(
315 ErrorKind::DataInvalid,
316 format!(
317 "Invalid snapshot timestamp {}: before last snapshot timestamp {}",
318 view_version.timestamp_ms(),
319 last.timestamp_ms()
320 ),
321 ));
322 }
323 }
324
325 self.metadata
326 .versions
327 .insert(version_id, Arc::new(view_version.clone()));
328
329 let view_version = if let Some(last_added_schema_id) = self.last_added_schema_id {
330 if view_version.schema_id() == last_added_schema_id {
331 view_version.with_schema_id(Self::LAST_ADDED)
332 } else {
333 view_version
334 }
335 } else {
336 view_version
337 };
338 self.changes
339 .push(ViewUpdate::AddViewVersion { view_version });
340
341 self.last_added_version_id = Some(version_id);
342
343 Ok(version_id)
344 }
345
346 fn reuse_or_create_new_view_version_id(&self, new_view_version: &ViewVersion) -> i32 {
347 self.metadata
348 .versions
349 .iter()
350 .find_map(|(id, other_version)| {
351 new_view_version
352 .behaves_identical_to(other_version)
353 .then_some(*id)
354 })
355 .unwrap_or_else(|| {
356 self.get_highest_view_version_id()
357 .map(|id| id + 1)
358 .unwrap_or(INITIAL_VIEW_VERSION_ID)
359 })
360 }
361
362 fn get_highest_view_version_id(&self) -> Option<i32> {
363 self.metadata.versions.keys().max().copied()
364 }
365
366 pub fn add_schema(mut self, schema: Schema) -> Self {
368 self.add_schema_internal(schema);
369
370 self
371 }
372
373 fn add_schema_internal(&mut self, schema: Schema) -> SchemaId {
374 let schema_id = self.reuse_or_create_new_schema_id(&schema);
375
376 if self.metadata.schemas.contains_key(&schema_id) {
377 if self.last_added_schema_id != Some(schema_id) {
381 self.changes.push(ViewUpdate::AddSchema {
382 schema: schema.clone().with_schema_id(schema_id),
383 last_column_id: None,
384 });
385 self.last_added_schema_id = Some(schema_id);
386 }
387 return schema_id;
388 }
389
390 let schema = schema.with_schema_id(schema_id);
391
392 self.metadata
393 .schemas
394 .insert(schema_id, Arc::new(schema.clone()));
395 let last_column_id = schema.highest_field_id();
396 self.changes.push(ViewUpdate::AddSchema {
397 schema,
398 last_column_id: Some(last_column_id),
399 });
400
401 self.last_added_schema_id = Some(schema_id);
402
403 schema_id
404 }
405
406 fn reuse_or_create_new_schema_id(&self, new_schema: &Schema) -> SchemaId {
407 self.metadata
408 .schemas
409 .iter()
410 .find_map(|(id, schema)| new_schema.is_same_schema(schema).then_some(*id))
411 .unwrap_or_else(|| {
412 self.get_highest_schema_id()
413 .map(|id| id + 1)
414 .unwrap_or(DEFAULT_SCHEMA_ID)
415 })
416 }
417
418 fn get_highest_schema_id(&self) -> Option<SchemaId> {
419 self.metadata.schemas.keys().max().copied()
420 }
421
422 pub fn set_properties(mut self, updates: HashMap<String, String>) -> Result<Self> {
424 if updates.is_empty() {
425 return Ok(self);
426 }
427
428 let num_versions_to_keep = updates
429 .get(VIEW_PROPERTY_VERSION_HISTORY_SIZE)
430 .and_then(|v| v.parse::<i64>().ok())
431 .unwrap_or(1);
432 if num_versions_to_keep < 0 {
433 return Err(Error::new(
434 ErrorKind::DataInvalid,
435 format!(
436 "{} must be positive but was {}",
437 VIEW_PROPERTY_VERSION_HISTORY_SIZE, num_versions_to_keep
438 ),
439 ));
440 }
441
442 self.metadata.properties.extend(updates.clone());
443 self.changes.push(ViewUpdate::SetProperties { updates });
444
445 Ok(self)
446 }
447
448 pub fn remove_properties(mut self, removals: &[String]) -> Self {
450 if removals.is_empty() {
451 return self;
452 }
453
454 for property in removals {
455 self.metadata.properties.remove(property);
456 }
457
458 self.changes.push(ViewUpdate::RemoveProperties {
459 removals: removals.to_vec(),
460 });
461
462 self
463 }
464
465 pub fn assign_uuid(mut self, uuid: Uuid) -> Self {
467 if self.metadata.view_uuid != uuid {
468 self.metadata.view_uuid = uuid;
469 self.changes.push(ViewUpdate::AssignUuid { uuid });
470 }
471
472 self
473 }
474
475 pub fn build(mut self) -> Result<ViewMetadataBuildResult> {
477 if let Some(history_entry) = self.history_entry.take() {
478 self.metadata.version_log.push(history_entry);
479 }
480
481 self.metadata.validate()?;
484
485 if let Some(previous) = self.previous_view_version.take() {
486 if !allow_replace_drop_dialects(&self.metadata.properties) {
487 require_no_dialect_dropped(&previous, self.metadata.current_version())?;
488 }
489 }
490
491 let _expired_versions = self.expire_versions();
492 self.metadata.version_log = update_version_log(
493 self.metadata.version_log,
494 self.metadata.versions.keys().copied().collect(),
495 );
496
497 Ok(ViewMetadataBuildResult {
498 metadata: self.metadata,
499 changes: self.changes,
500 })
501 }
502
503 fn expire_versions(&mut self) -> Vec<ViewVersionRef> {
505 let num_versions_to_keep = self
506 .metadata
507 .properties
508 .get(VIEW_PROPERTY_VERSION_HISTORY_SIZE)
509 .and_then(|v| v.parse::<usize>().ok())
510 .unwrap_or(VIEW_PROPERTY_VERSION_HISTORY_SIZE_DEFAULT)
511 .max(1);
512
513 let num_added_versions = self
515 .changes
516 .iter()
517 .filter(|update| matches!(update, ViewUpdate::AddViewVersion { .. }))
518 .count();
519 let num_versions_to_keep = num_added_versions.max(num_versions_to_keep);
520
521 if self.metadata.versions.len() > num_versions_to_keep {
522 let mut versions_to_keep = self
524 .metadata
525 .versions
526 .keys()
527 .copied()
528 .sorted()
529 .rev()
530 .take(num_versions_to_keep)
531 .collect::<HashSet<_>>();
532
533 if !versions_to_keep.contains(&self.metadata.current_version_id) {
535 if num_versions_to_keep > num_added_versions {
537 let lowest_id = versions_to_keep.iter().min().copied();
538 lowest_id.map(|id| versions_to_keep.remove(&id));
539 }
540 versions_to_keep.insert(self.metadata.current_version_id);
542 }
543
544 let mut expired_versions = Vec::new();
545 self.metadata.versions.retain(|id, version| {
548 if versions_to_keep.contains(id) {
549 true
550 } else {
551 expired_versions.push(version.clone());
552 false
553 }
554 });
555
556 expired_versions
557 } else {
558 Vec::new()
559 }
560 }
561}
562
563fn update_version_log(
566 version_log: Vec<ViewVersionLog>,
567 ids_to_keep: HashSet<i32>,
568) -> Vec<ViewVersionLog> {
569 let mut retained_history = Vec::new();
570 for log_entry in version_log {
571 if ids_to_keep.contains(&log_entry.version_id()) {
572 retained_history.push(log_entry);
573 } else {
574 retained_history.clear();
575 }
576 }
577 retained_history
578}
579
580fn allow_replace_drop_dialects(properties: &HashMap<String, String>) -> bool {
581 properties
582 .get(VIEW_PROPERTY_REPLACE_DROP_DIALECT_ALLOWED)
583 .map_or(
584 VIEW_PROPERTY_REPLACE_DROP_DIALECT_ALLOWED_DEFAULT,
585 |value| is_truthy(value),
586 )
587}
588
589fn require_no_dialect_dropped(previous: &ViewVersion, current: &ViewVersion) -> Result<()> {
590 let base_dialects = lowercase_sql_dialects_for(previous);
591 let updated_dialects = lowercase_sql_dialects_for(current);
592
593 if !updated_dialects.is_superset(&base_dialects) {
594 return Err(Error::new(
595 ErrorKind::DataInvalid,
596 format!(
597 "Cannot replace view due to loss of view dialects: \nPrevious dialects: {:?}\nNew dialects: {:?}\nSet {} to true to allow dropping dialects.",
598 Vec::from_iter(base_dialects),
599 Vec::from_iter(updated_dialects),
600 VIEW_PROPERTY_REPLACE_DROP_DIALECT_ALLOWED
601 ),
602 ));
603 }
604
605 Ok(())
606}
607
608fn lowercase_sql_dialects_for(view_version: &ViewVersion) -> HashSet<String> {
609 view_version
610 .representations()
611 .iter()
612 .map(|repr| match repr {
613 ViewRepresentation::Sql(sql_repr) => sql_repr.dialect.to_lowercase(),
614 })
615 .collect()
616}
617
618pub(super) fn require_unique_dialects(view_version: &ViewVersion) -> Result<()> {
619 let mut seen_dialects = HashSet::with_capacity(view_version.representations().len());
620 for repr in view_version.representations().iter() {
621 match repr {
622 ViewRepresentation::Sql(sql_repr) => {
623 if !seen_dialects.insert(sql_repr.dialect.to_lowercase()) {
624 return Err(Error::new(
625 ErrorKind::DataInvalid,
626 format!(
627 "Invalid view version: Cannot add multiple queries for dialect {}",
628 sql_repr.dialect
629 ),
630 ));
631 }
632 }
633 }
634 }
635 Ok(())
636}
637
638#[cfg(test)]
639mod test {
640 use super::super::view_metadata::tests::get_test_view_metadata;
641 use super::*;
642 use crate::NamespaceIdent;
643 use crate::spec::{
644 NestedField, PrimitiveType, SqlViewRepresentation, Type, ViewRepresentations,
645 };
646
647 fn new_view_version(id: usize, schema_id: SchemaId, sql: &str) -> ViewVersion {
648 new_view_version_with_dialect(id, schema_id, sql, vec!["spark"])
649 }
650
651 fn new_view_version_with_dialect(
652 id: usize,
653 schema_id: SchemaId,
654 sql: &str,
655 dialects: Vec<&str>,
656 ) -> ViewVersion {
657 ViewVersion::builder()
658 .with_version_id(id as i32)
659 .with_schema_id(schema_id)
660 .with_timestamp_ms(1573518431300)
661 .with_default_catalog(Some("prod".to_string()))
662 .with_summary(HashMap::from_iter(vec![(
663 "user".to_string(),
664 "some-user".to_string(),
665 )]))
666 .with_representations(ViewRepresentations(
667 dialects
668 .iter()
669 .map(|dialect| {
670 ViewRepresentation::Sql(SqlViewRepresentation {
671 dialect: dialect.to_string(),
672 sql: sql.to_string(),
673 })
674 })
675 .collect(),
676 ))
677 .with_default_namespace(NamespaceIdent::new("default".to_string()))
678 .build()
679 }
680
681 fn builder_without_changes() -> ViewMetadataBuilder {
682 ViewMetadataBuilder::new_from_metadata(get_test_view_metadata("ViewMetadataV1Valid.json"))
683 }
684
685 #[test]
686 fn test_minimal_builder() {
687 let location = "s3://bucket/table".to_string();
688 let schema = Schema::builder()
689 .with_schema_id(1)
690 .with_fields(vec![])
691 .build()
692 .unwrap();
693 let version = new_view_version(20, 21, "select 1 as count");
695 let format_version = ViewFormatVersion::V1;
696 let properties = HashMap::from_iter(vec![("key".to_string(), "value".to_string())]);
697
698 let build_result = ViewMetadataBuilder::new(
699 location.clone(),
700 schema.clone(),
701 version.clone(),
702 format_version,
703 properties.clone(),
704 )
705 .unwrap()
706 .build()
707 .unwrap();
708
709 let metadata = build_result.metadata;
710 assert_eq!(metadata.location, location);
711 assert_eq!(metadata.current_version_id, INITIAL_VIEW_VERSION_ID);
712 assert_eq!(metadata.format_version, format_version);
713 assert_eq!(metadata.properties, properties);
714 assert_eq!(metadata.versions.len(), 1);
715 assert_eq!(metadata.schemas.len(), 1);
716 assert_eq!(metadata.version_log.len(), 1);
717 assert_eq!(
718 Arc::unwrap_or_clone(metadata.versions[&INITIAL_VIEW_VERSION_ID].clone()),
719 version
720 .clone()
721 .with_version_id(INITIAL_VIEW_VERSION_ID)
722 .with_schema_id(0)
723 );
724
725 let changes = build_result.changes;
726 assert_eq!(changes.len(), 5);
727 assert!(changes.contains(&ViewUpdate::SetLocation { location }));
728 assert!(
729 changes.contains(&ViewUpdate::AddViewVersion {
730 view_version: version
731 .with_version_id(INITIAL_VIEW_VERSION_ID)
732 .with_schema_id(-1)
733 })
734 );
735 assert!(changes.contains(&ViewUpdate::SetCurrentViewVersion {
736 view_version_id: -1
737 }));
738 assert!(changes.contains(&ViewUpdate::AddSchema {
739 schema: schema.clone().with_schema_id(0),
740 last_column_id: Some(0)
741 }));
742 assert!(changes.contains(&ViewUpdate::SetProperties {
743 updates: properties
744 }));
745 }
746
747 #[test]
748 fn test_version_expiration() {
749 let v1 = new_view_version(0, 1, "select 1 as count");
750 let v2 = new_view_version(0, 1, "select count(1) as count from t2");
751 let v3 = new_view_version(0, 1, "select count from t1");
752
753 let builder = builder_without_changes()
754 .add_version(v1)
755 .unwrap()
756 .add_version(v2)
757 .unwrap()
758 .add_version(v3)
759 .unwrap();
760 let builder_without_changes = builder.clone().build().unwrap().metadata.into_builder();
761
762 let metadata = builder.clone().build().unwrap().metadata;
764 assert_eq!(
765 metadata.versions.keys().cloned().collect::<HashSet<_>>(),
766 HashSet::from_iter(vec![1, 2, 3, 4])
767 );
768
769 let metadata = builder
772 .clone()
773 .set_properties(HashMap::from_iter(vec![(
774 VIEW_PROPERTY_VERSION_HISTORY_SIZE.to_string(),
775 "2".to_string(),
776 )]))
777 .unwrap()
778 .build()
779 .unwrap()
780 .metadata;
781 assert_eq!(
782 metadata.versions.keys().cloned().collect::<HashSet<_>>(),
783 HashSet::from_iter(vec![1, 2, 3, 4])
784 );
785 assert_eq!(metadata.version_log.len(), 1);
786
787 let metadata = builder_without_changes
790 .clone()
791 .set_properties(HashMap::from_iter(vec![(
792 VIEW_PROPERTY_VERSION_HISTORY_SIZE.to_string(),
793 "2".to_string(),
794 )]))
795 .unwrap()
796 .build()
797 .unwrap()
798 .metadata;
799 assert_eq!(
800 metadata.versions.keys().cloned().collect::<HashSet<_>>(),
801 HashSet::from_iter(vec![1, 4])
802 );
803
804 let metadata = builder_without_changes
807 .set_properties(HashMap::from_iter(vec![(
808 VIEW_PROPERTY_VERSION_HISTORY_SIZE.to_string(),
809 "0".to_string(),
810 )]))
811 .unwrap()
812 .build()
813 .unwrap()
814 .metadata;
815 assert_eq!(
816 metadata.versions.keys().cloned().collect::<HashSet<_>>(),
817 HashSet::from_iter(vec![1])
818 );
819 }
820
821 #[test]
822 fn test_update_version_log() {
823 let v1 = new_view_version(1, 1, "select 1 as count");
824 let v2 = new_view_version(2, 1, "select count(1) as count from t2");
825 let v3 = new_view_version(3, 1, "select count from t1");
826
827 let one = ViewVersionLog::new(1, v1.timestamp_ms());
828 let two = ViewVersionLog::new(2, v2.timestamp_ms());
829 let three = ViewVersionLog::new(3, v3.timestamp_ms());
830
831 assert_eq!(
832 update_version_log(
833 vec![one.clone(), two.clone(), three.clone()],
834 HashSet::from_iter(vec![1, 2, 3])
835 ),
836 vec![one.clone(), two.clone(), three.clone()]
837 );
838
839 assert_eq!(
841 update_version_log(
842 vec![
843 three.clone(),
844 two.clone(),
845 one.clone(),
846 two.clone(),
847 three.clone()
848 ],
849 HashSet::from_iter(vec![2, 3])
850 ),
851 vec![two.clone(), three.clone()]
852 );
853
854 assert_eq!(
856 update_version_log(
857 vec![
858 one.clone(),
859 two.clone(),
860 three.clone(),
861 one.clone(),
862 three.clone()
863 ],
864 HashSet::from_iter(vec![1, 3])
865 ),
866 vec![three.clone(), one.clone(), three.clone()]
867 );
868 }
869
870 #[test]
871 fn test_use_previously_added_version() {
872 let v2 = new_view_version(2, 1, "select 1 as count");
873 let v3 = new_view_version(3, 1, "select count(1) as count from t2");
874 let schema = Schema::builder().build().unwrap();
875
876 let log_v2 = ViewVersionLog::new(2, v2.timestamp_ms());
877 let log_v3 = ViewVersionLog::new(3, v3.timestamp_ms());
878
879 let metadata_v2 = builder_without_changes()
880 .set_current_version(v2.clone(), schema.clone())
881 .unwrap()
882 .build()
883 .unwrap()
884 .metadata;
885
886 assert_eq!(metadata_v2.version_log.last().unwrap(), &log_v2);
888
889 let metadata_v3 = metadata_v2
891 .into_builder()
892 .set_current_version(v3.clone(), schema)
893 .unwrap()
894 .build()
895 .unwrap()
896 .metadata;
897
898 assert_eq!(metadata_v3.version_log[1..], vec![
899 log_v2.clone(),
900 log_v3.clone()
901 ]);
902
903 let metadata_v4 = metadata_v3
905 .into_builder()
906 .set_current_version_id(2)
907 .unwrap()
908 .build()
909 .unwrap()
910 .metadata;
911
912 let entry = metadata_v4.version_log.last().unwrap();
914 assert_eq!(entry.version_id(), 2);
915 assert!(entry.timestamp_ms() > v2.timestamp_ms());
916 }
917
918 #[test]
919 fn test_assign_uuid() {
920 let builder = builder_without_changes();
921 let uuid = Uuid::now_v7();
922 let build_result = builder.clone().assign_uuid(uuid).build().unwrap();
923 assert_eq!(build_result.metadata.view_uuid, uuid);
924 assert_eq!(build_result.changes, vec![ViewUpdate::AssignUuid { uuid }]);
925 }
926
927 #[test]
928 fn test_set_location() {
929 let builder = builder_without_changes();
930 let location = "s3://bucket/table".to_string();
931 let build_result = builder
932 .clone()
933 .set_location(location.clone())
934 .build()
935 .unwrap();
936 assert_eq!(build_result.metadata.location, location);
937 assert_eq!(build_result.changes, vec![ViewUpdate::SetLocation {
938 location
939 }]);
940 }
941
942 #[test]
943 fn test_set_and_remove_properties() {
944 let builder = builder_without_changes();
945 let properties = HashMap::from_iter(vec![
946 ("key1".to_string(), "value1".to_string()),
947 ("key2".to_string(), "value2".to_string()),
948 ]);
949 let build_result = builder
950 .clone()
951 .set_properties(properties.clone())
952 .unwrap()
953 .remove_properties(&["key2".to_string(), "key3".to_string()])
954 .build()
955 .unwrap();
956 assert_eq!(
957 build_result.metadata.properties.get("key1"),
958 Some(&"value1".to_string())
959 );
960 assert_eq!(build_result.metadata.properties.get("key2"), None);
961 assert_eq!(build_result.changes, vec![
962 ViewUpdate::SetProperties {
963 updates: properties
964 },
965 ViewUpdate::RemoveProperties {
966 removals: vec!["key2".to_string(), "key3".to_string()]
967 }
968 ]);
969 }
970
971 #[test]
972 fn test_add_schema() {
973 let builder = builder_without_changes();
974 let schema = Schema::builder()
975 .with_schema_id(1)
976 .with_fields(vec![])
977 .build()
978 .unwrap();
979 let build_result = builder.clone().add_schema(schema.clone()).build().unwrap();
980 assert_eq!(build_result.metadata.schemas.len(), 2);
981 assert_eq!(build_result.changes, vec![ViewUpdate::AddSchema {
982 schema: schema.clone().with_schema_id(2),
983 last_column_id: Some(0)
984 }]);
985
986 let build_result = builder.clone().add_schema(schema.clone()).build().unwrap();
988 assert_eq!(build_result.metadata.schemas.len(), 2);
989 assert_eq!(build_result.changes, vec![ViewUpdate::AddSchema {
990 schema: schema.clone().with_schema_id(2),
991 last_column_id: Some(0)
992 }]);
993 }
994
995 #[test]
996 fn test_add_and_set_current_version() {
997 let builder = builder_without_changes();
998 let v1 = new_view_version(2, 1, "select 1 as count");
999 let v2 = new_view_version(3, 2, "select count(1) as count from t2");
1000 let v2_schema = Schema::builder()
1001 .with_schema_id(2)
1002 .with_fields(vec![])
1003 .build()
1004 .unwrap();
1005
1006 let build_result = builder
1007 .clone()
1008 .add_version(v1.clone())
1009 .unwrap()
1010 .add_schema(v2_schema.clone())
1011 .add_version(v2.clone())
1012 .unwrap()
1013 .set_current_version_id(3)
1014 .unwrap()
1015 .build()
1016 .unwrap();
1017
1018 assert_eq!(build_result.metadata.current_version_id, 3);
1019 assert_eq!(build_result.metadata.versions.len(), 3);
1020 assert_eq!(build_result.metadata.schemas.len(), 2);
1021 assert_eq!(build_result.metadata.version_log.len(), 2);
1022 assert_eq!(
1023 Arc::unwrap_or_clone(build_result.metadata.versions[&2].clone()),
1024 v1.clone().with_version_id(2).with_schema_id(1)
1025 );
1026 assert_eq!(
1027 Arc::unwrap_or_clone(build_result.metadata.versions[&3].clone()),
1028 v2.clone().with_version_id(3).with_schema_id(2)
1029 );
1030 assert_eq!(build_result.changes.len(), 4);
1031 assert_eq!(build_result.changes, vec![
1032 ViewUpdate::AddViewVersion {
1033 view_version: v1.clone().with_version_id(2).with_schema_id(1)
1034 },
1035 ViewUpdate::AddSchema {
1036 schema: v2_schema.clone().with_schema_id(2),
1037 last_column_id: Some(0)
1038 },
1039 ViewUpdate::AddViewVersion {
1040 view_version: v2.clone().with_version_id(3).with_schema_id(-1)
1041 },
1042 ViewUpdate::SetCurrentViewVersion {
1043 view_version_id: -1
1044 }
1045 ]);
1046 assert_eq!(
1047 build_result
1048 .metadata
1049 .version_log
1050 .iter()
1051 .map(|v| v.version_id())
1052 .collect::<Vec<_>>(),
1053 vec![1, 3]
1054 );
1055 }
1056
1057 #[test]
1058 fn test_schema_and_version_id_reassignment() {
1059 let builder = builder_without_changes();
1060 let v1 = new_view_version(0, 1, "select 1 as count");
1061 let v2 = new_view_version(0, 2, "select count(1) as count from t2");
1062 let v2_schema = Schema::builder()
1063 .with_schema_id(0)
1064 .with_fields(vec![])
1065 .build()
1066 .unwrap();
1067
1068 let build_result = builder
1069 .clone()
1070 .add_version(v1.clone())
1071 .unwrap()
1072 .set_current_version(v2.clone(), v2_schema.clone())
1073 .unwrap()
1074 .build()
1075 .unwrap();
1076
1077 assert_eq!(build_result.metadata.current_version_id, 3);
1078 assert_eq!(build_result.metadata.versions.len(), 3);
1079 assert_eq!(build_result.metadata.schemas.len(), 2);
1080 assert_eq!(build_result.metadata.version_log.len(), 2);
1081 assert_eq!(
1082 Arc::unwrap_or_clone(build_result.metadata.versions[&2].clone()),
1083 v1.clone().with_version_id(2).with_schema_id(1)
1084 );
1085 assert_eq!(
1086 Arc::unwrap_or_clone(build_result.metadata.versions[&3].clone()),
1087 v2.clone().with_version_id(3).with_schema_id(2)
1088 );
1089 assert_eq!(build_result.changes.len(), 4);
1090 assert_eq!(build_result.changes, vec![
1091 ViewUpdate::AddViewVersion {
1092 view_version: v1.clone().with_version_id(2).with_schema_id(1)
1093 },
1094 ViewUpdate::AddSchema {
1095 schema: v2_schema.clone().with_schema_id(2),
1096 last_column_id: Some(0)
1097 },
1098 ViewUpdate::AddViewVersion {
1099 view_version: v2.clone().with_version_id(3).with_schema_id(-1)
1100 },
1101 ViewUpdate::SetCurrentViewVersion {
1102 view_version_id: -1
1103 }
1104 ]);
1105 assert_eq!(
1106 build_result
1107 .metadata
1108 .version_log
1109 .iter()
1110 .map(|v| v.version_id())
1111 .collect::<Vec<_>>(),
1112 vec![1, 3]
1113 );
1114 }
1115
1116 #[test]
1117 fn test_view_version_deduplication() {
1118 let builder = builder_without_changes();
1119 let v1 = new_view_version(0, 1, "select * from ns.tbl");
1120
1121 assert_eq!(builder.metadata.versions.len(), 1);
1122 let build_result = builder
1123 .clone()
1124 .add_version(v1.clone())
1125 .unwrap()
1126 .add_version(v1)
1127 .unwrap()
1128 .build()
1129 .unwrap();
1130
1131 assert_eq!(build_result.metadata.versions.len(), 2);
1132 assert_eq!(build_result.metadata.schemas.len(), 1);
1133 }
1134
1135 #[test]
1136 fn test_view_version_and_schema_deduplication() {
1137 let schema_one = Schema::builder()
1138 .with_schema_id(5)
1139 .with_fields(vec![
1140 NestedField::required(1, "x", Type::Primitive(PrimitiveType::Long)).into(),
1141 ])
1142 .build()
1143 .unwrap();
1144 let schema_two = Schema::builder()
1145 .with_schema_id(7)
1146 .with_fields(vec![
1147 NestedField::required(1, "y", Type::Primitive(PrimitiveType::Long)).into(),
1148 ])
1149 .build()
1150 .unwrap();
1151 let schema_three = Schema::builder()
1152 .with_schema_id(9)
1153 .with_fields(vec![
1154 NestedField::required(1, "z", Type::Primitive(PrimitiveType::Long)).into(),
1155 ])
1156 .build()
1157 .unwrap();
1158
1159 let v1 = new_view_version(1, 5, "select * from ns.tbl");
1160 let v2 = new_view_version(1, 7, "select count(*) from ns.tbl");
1161 let v3 = new_view_version(1, 9, "select count(*) as count from ns.tbl");
1162
1163 let build_result = builder_without_changes()
1164 .add_schema(schema_one.clone())
1165 .add_schema(schema_two.clone())
1166 .add_schema(schema_three.clone())
1167 .set_current_version(v1.clone(), schema_one.clone())
1168 .unwrap()
1169 .set_current_version(v2.clone(), schema_two.clone())
1170 .unwrap()
1171 .set_current_version(v3.clone(), schema_three.clone())
1172 .unwrap()
1173 .set_current_version(v3.clone(), schema_three.clone())
1174 .unwrap()
1175 .set_current_version(v2.clone(), schema_two.clone())
1176 .unwrap()
1177 .set_current_version(v1.clone(), schema_one.clone())
1178 .unwrap()
1179 .build()
1180 .unwrap();
1181
1182 assert_eq!(
1183 Arc::unwrap_or_clone(build_result.metadata.current_version().clone()),
1184 v1.clone().with_version_id(2).with_schema_id(2)
1185 );
1186 assert_eq!(build_result.metadata.versions.len(), 4);
1187 assert_eq!(
1188 build_result.metadata.versions[&2],
1189 Arc::new(v1.clone().with_version_id(2).with_schema_id(2))
1190 );
1191 assert_eq!(
1192 build_result.metadata.versions[&3],
1193 Arc::new(v2.clone().with_version_id(3).with_schema_id(3))
1194 );
1195 assert_eq!(
1196 build_result.metadata.versions[&4],
1197 Arc::new(v3.clone().with_version_id(4).with_schema_id(4))
1198 );
1199 assert_eq!(
1200 build_result
1202 .metadata
1203 .schemas_iter()
1204 .filter(|s| s.schema_id() != 1)
1205 .sorted_by_key(|s| s.schema_id())
1206 .map(|s| s.as_struct())
1207 .collect::<Vec<_>>(),
1208 vec![
1209 schema_one.as_struct(),
1210 schema_two.as_struct(),
1211 schema_three.as_struct()
1212 ]
1213 )
1214 }
1215
/// `add_version` must reject a view version whose schema id cannot be resolved.
///
/// Two cases are covered: an explicit schema id (10) that this test expects to be
/// unknown to the builder, and schema id -1, for which the expected error message
/// reports that no schema has been added yet.
#[test]
fn test_error_on_missing_schema() {
    let base = builder_without_changes();

    // Explicit schema id that the builder is expected not to know.
    let unknown_schema_msg = base
        .clone()
        .add_version(new_view_version(0, 10, "SELECT * FROM foo"))
        .unwrap_err()
        .to_string();
    assert!(unknown_schema_msg.contains("Cannot add version with unknown schema: 10"));

    // Schema id -1 while no schema was added through this builder.
    let no_schema_added_msg = base
        .add_version(new_view_version(0, -1, "SELECT * FROM foo"))
        .unwrap_err()
        .to_string();
    assert!(no_schema_added_msg.contains("Cannot set last added schema: no schema has been added"));
}
1239
/// `set_current_version_id` must fail for ids that cannot be resolved.
///
/// Covers id -1 when no version has been added through the builder, and an
/// explicit id (10) that this test expects to be unknown.
#[test]
fn test_error_on_missing_current_version() {
    let base = builder_without_changes();

    // -1 with nothing added: the error must mention the missing "last added" version.
    let no_version_added_msg = base
        .clone()
        .set_current_version_id(-1)
        .unwrap_err()
        .to_string();
    assert!(no_version_added_msg.contains(
        "Cannot set current version id to last added version: no version has been added."
    ));

    // Explicit id that is expected not to exist in the metadata.
    let unknown_version_msg = base.set_current_version_id(10).unwrap_err().to_string();
    assert!(unknown_version_msg.contains("Cannot set current version to unknown version with id: 10"));
}
1260
/// Passing -1 to `set_current_version_id` selects the most recently added version.
///
/// Two versions are added (ids 2 and 3); after the build the current version id
/// is expected to be 3.
#[test]
fn test_set_current_version_to_last_added() {
    let base = builder_without_changes();
    let first = new_view_version(2, 1, "select * from ns.tbl");
    let second = new_view_version(3, 1, "select a,b from ns.tbl");

    let with_both_versions = base
        .add_version(first)
        .unwrap()
        .add_version(second)
        .unwrap();
    let build_result = with_both_versions
        .set_current_version_id(-1)
        .unwrap()
        .build()
        .unwrap();

    assert_eq!(build_result.metadata.current_version_id, 3);
}
1278
/// Setting the version-history-size property to "-1" must be rejected by
/// `set_properties` with a message naming the offending value.
#[test]
fn test_error_when_setting_negative_version_history_size() {
    let base = builder_without_changes();
    let negative_history_size = HashMap::from([(
        VIEW_PROPERTY_VERSION_HISTORY_SIZE.to_string(),
        "-1".to_string(),
    )]);

    let message = base
        .set_properties(negative_history_size)
        .unwrap_err()
        .to_string();

    assert!(message.contains("version.history.num-entries must be positive but was -1"));
}
1294
/// Each `add_version` call must be recorded as one `ViewUpdate::AddViewVersion`
/// change, in call order, carrying the version exactly as it was passed in.
#[test]
fn test_view_version_changes() {
    let base = builder_without_changes();

    let first = new_view_version(2, 1, "select 1 as count");
    let second = new_view_version(3, 1, "select count(1) as count from t2");

    let build_result = base
        .add_version(first.clone())
        .unwrap()
        .add_version(second.clone())
        .unwrap()
        .build()
        .unwrap();
    let recorded = build_result.changes;

    let expected = vec![
        ViewUpdate::AddViewVersion {
            view_version: first,
        },
        ViewUpdate::AddViewVersion {
            view_version: second,
        },
    ];
    assert_eq!(recorded.len(), 2);
    assert_eq!(recorded, expected);
}
1322
/// Replacing a view that has "spark" and "trino" representations with one that
/// only has "spark" must fail at `build` when no property permits dropping dialects.
#[test]
fn test_dropping_dialect_fails_by_default() {
    let base = builder_without_changes();

    let spark_only = new_view_version_with_dialect(0, 0, "SELECT * FROM foo", vec!["spark"]);
    let spark_and_trino =
        new_view_version_with_dialect(0, 0, "SELECT * FROM foo", vec!["spark", "trino"]);
    let schema = Schema::builder()
        .with_schema_id(0)
        .with_fields(vec![])
        .build()
        .unwrap();

    // First build: the view currently offers both dialects.
    let initial = base
        .set_current_version(spark_and_trino, schema.clone())
        .unwrap()
        .build()
        .unwrap();

    // Second build: the replacement loses "trino"; the failure surfaces at `build`.
    let replacement = initial
        .metadata
        .into_builder()
        .set_current_version(spark_only, schema)
        .unwrap();
    let err = replacement.build().unwrap_err();

    let message = err.to_string();
    assert!(message.contains("Cannot replace view due to loss of view dialects"));
}
1353
/// With the drop-dialect property set to "true", replacing a "spark" + "trino"
/// view by a "spark"-only version must build successfully.
///
/// The resulting current version is expected to carry version id 3 and schema id 2.
#[test]
fn test_dropping_dialects_does_not_fail_when_allowed() {
    let base = builder_without_changes();

    let spark_only = new_view_version_with_dialect(0, 0, "SELECT * FROM foo", vec!["spark"]);
    let spark_and_trino =
        new_view_version_with_dialect(0, 0, "SELECT * FROM foo", vec!["spark", "trino"]);
    let schema = Schema::builder()
        .with_schema_id(0)
        .with_fields(vec![])
        .build()
        .unwrap();

    let allow_drop = HashMap::from([(
        VIEW_PROPERTY_REPLACE_DROP_DIALECT_ALLOWED.to_string(),
        "true".to_string(),
    )]);

    // First build: property enabled, both dialects present.
    let initial = base
        .set_properties(allow_drop)
        .unwrap()
        .set_current_version(spark_and_trino, schema.clone())
        .unwrap()
        .build()
        .unwrap();

    // Second build: "trino" is dropped, which the property permits.
    let build_result = initial
        .metadata
        .into_builder()
        .set_current_version(spark_only.clone(), schema)
        .unwrap()
        .build()
        .unwrap();

    let current = Arc::unwrap_or_clone(build_result.metadata.current_version().clone());
    let expected = spark_only.with_version_id(3).with_schema_id(2);
    assert_eq!(current, expected);
}
1389
/// Replacing a "spark"-only view with a version that offers "spark" and "trino"
/// must succeed without any extra property.
///
/// The resulting current version is expected to carry version id 3 and schema id 2.
#[test]
fn test_can_add_dialects_by_default() {
    let base = builder_without_changes();

    let spark_only = new_view_version_with_dialect(0, 0, "SELECT * FROM foo", vec!["spark"]);
    let spark_and_trino =
        new_view_version_with_dialect(0, 0, "SELECT * FROM foo", vec!["spark", "trino"]);

    let schema = Schema::builder()
        .with_schema_id(0)
        .with_fields(vec![])
        .build()
        .unwrap();

    // First build: only "spark" is available.
    let initial = base
        .set_current_version(spark_only, schema.clone())
        .unwrap()
        .build()
        .unwrap();

    // Second build: the replacement keeps "spark" and adds "trino".
    let build_result = initial
        .metadata
        .into_builder()
        .set_current_version(spark_and_trino.clone(), schema)
        .unwrap()
        .build()
        .unwrap();

    let current = Arc::unwrap_or_clone(build_result.metadata.current_version().clone());
    let expected = spark_and_trino.with_version_id(3).with_schema_id(2);
    assert_eq!(current, expected);
}
1421
/// Changing the SQL of an existing dialect ("spark") while keeping the dialect
/// itself must succeed without any extra property.
///
/// The resulting current version is expected to carry version id 3 and schema id 2.
#[test]
fn test_can_update_dialect_by_default() {
    let base = builder_without_changes();

    let spark_on_foo = new_view_version_with_dialect(0, 0, "SELECT * FROM foo", vec!["spark"]);
    let spark_on_bar = new_view_version_with_dialect(0, 0, "SELECT * FROM bar", vec!["spark"]);

    let schema = Schema::builder()
        .with_schema_id(0)
        .with_fields(vec![])
        .build()
        .unwrap();

    // First build: "spark" selects from foo.
    let initial = base
        .set_current_version(spark_on_foo, schema.clone())
        .unwrap()
        .build()
        .unwrap();

    // Second build: same dialect, different SQL text.
    let build_result = initial
        .metadata
        .into_builder()
        .set_current_version(spark_on_bar.clone(), schema)
        .unwrap()
        .build()
        .unwrap();

    let current = Arc::unwrap_or_clone(build_result.metadata.current_version().clone());
    let expected = spark_on_bar.with_version_id(3).with_schema_id(2);
    assert_eq!(current, expected);
}
1452
/// Toggling the drop-dialect property changes whether a dialect swap can be built.
///
/// Step 1: "spark" is replaced by "trino" with the property set to "true" in the
/// same builder (after `set_current_version`); the build must succeed and the
/// current version is expected to carry version id 3 and schema id 2.
/// Step 2: "trino" is replaced by "spark" with the property set to "false"; the
/// build must fail with the loss-of-dialects error.
#[test]
fn test_dropping_dialects_allowed_and_then_disallowed() {
    let base = builder_without_changes();

    let spark = new_view_version_with_dialect(0, 0, "SELECT * FROM foo", vec!["spark"]);
    let trino = new_view_version_with_dialect(0, 0, "SELECT * FROM foo", vec!["trino"]);

    let schema = Schema::builder()
        .with_schema_id(0)
        .with_fields(vec![])
        .build()
        .unwrap();

    // Builds the single-entry property map for the drop-dialect switch.
    let drop_dialect_property = |value: &str| {
        HashMap::from([(
            VIEW_PROPERTY_REPLACE_DROP_DIALECT_ALLOWED.to_string(),
            value.to_string(),
        )])
    };

    let spark_view = base
        .set_current_version(spark.clone(), schema.clone())
        .unwrap()
        .build()
        .unwrap();

    // Step 1: swap to "trino" while allowing dialects to be dropped.
    let updated = spark_view
        .metadata
        .into_builder()
        .set_current_version(trino.clone(), schema.clone())
        .unwrap()
        .set_properties(drop_dialect_property("true"))
        .unwrap()
        .build()
        .unwrap();

    let current = Arc::unwrap_or_clone(updated.metadata.current_version().clone());
    assert_eq!(current, trino.with_version_id(3).with_schema_id(2));

    // Step 2: swap back to "spark" while disallowing it again.
    let err = updated
        .metadata
        .into_builder()
        .set_current_version(spark, schema)
        .unwrap()
        .set_properties(drop_dialect_property("false"))
        .unwrap()
        .build()
        .unwrap_err();

    let message = err.to_string();
    assert!(message.contains("Cannot replace view due to loss of view dialects"));
}
1506
/// `require_no_dialect_dropped` must fail when the new version lacks a dialect the
/// previous one had, and succeed when all previous dialects are still present
/// (regardless of representation order).
#[test]
fn test_require_no_dialect_dropped() {
    /// Builds a `ViewVersion` (version id 0, schema id 0, timestamp 0, namespace
    /// "default") with one SQL representation per `(dialect, sql)` pair, in order.
    fn version_with(representations: &[(&str, &str)]) -> ViewVersion {
        ViewVersion::builder()
            .with_version_id(0)
            .with_schema_id(0)
            .with_timestamp_ms(0)
            .with_representations(ViewRepresentations(
                representations
                    .iter()
                    .map(|&(dialect, sql)| {
                        ViewRepresentation::Sql(SqlViewRepresentation {
                            dialect: dialect.to_string(),
                            sql: sql.to_string(),
                        })
                    })
                    .collect(),
            ))
            .with_default_namespace(NamespaceIdent::new("default".to_string()))
            .build()
    }

    let previous = version_with(&[
        ("trino", "SELECT * FROM foo"),
        ("spark", "SELECT * FROM bar"),
    ]);

    // "spark" is missing from the new version: must be rejected.
    let current = version_with(&[("trino", "SELECT * FROM foo")]);
    assert!(require_no_dialect_dropped(&previous, &current).is_err());

    // Both dialects are present, only listed in a different order: must be accepted.
    let current = version_with(&[
        ("spark", "SELECT * FROM bar"),
        ("trino", "SELECT * FROM foo"),
    ]);
    assert!(require_no_dialect_dropped(&previous, &current).is_ok());
}
1560
/// `allow_replace_drop_dialects` is false when the property is absent and otherwise
/// follows the property value, accepting both lower- and upper-case spellings.
#[test]
fn test_allow_replace_drop_dialects() {
    use std::collections::HashMap;

    use super::allow_replace_drop_dialects;

    let mut properties = HashMap::new();

    // Property not set at all.
    assert!(!allow_replace_drop_dialects(&properties));

    // Each entry overwrites the previous value of the same key before checking.
    let cases = [
        ("true", true),
        ("false", false),
        ("TRUE", true),
        ("FALSE", false),
    ];
    for (value, expected) in cases.iter() {
        properties.insert(
            "replace.drop-dialect.allowed".to_string(),
            value.to_string(),
        );
        assert_eq!(allow_replace_drop_dialects(&properties), *expected);
    }
}
1594
/// `lowercase_sql_dialects_for` must return every SQL dialect of a view version in
/// lower case, whatever casing the representation used ("STARROCKS", "trino", "Spark").
#[test]
fn test_lowercase_sql_dialects_for() {
    let representations = vec![
        ViewRepresentation::Sql(SqlViewRepresentation {
            dialect: "STARROCKS".to_string(),
            sql: "SELECT * FROM foo".to_string(),
        }),
        ViewRepresentation::Sql(SqlViewRepresentation {
            dialect: "trino".to_string(),
            sql: "SELECT * FROM bar".to_string(),
        }),
        ViewRepresentation::Sql(SqlViewRepresentation {
            dialect: "Spark".to_string(),
            sql: "SELECT * FROM bar".to_string(),
        }),
    ];
    let view_version = ViewVersion::builder()
        .with_version_id(0)
        .with_schema_id(0)
        .with_timestamp_ms(0)
        .with_representations(ViewRepresentations(representations))
        .with_default_namespace(NamespaceIdent::new("default".to_string()))
        .build();

    let dialects = lowercase_sql_dialects_for(&view_version);

    assert_eq!(dialects.len(), 3);
    for expected in ["trino", "spark", "starrocks"].iter() {
        assert!(dialects.contains(*expected));
    }
}
1624
/// `require_unique_dialects` must reject a view version that lists the same dialect
/// twice and accept one whose dialects are all distinct.
#[test]
fn test_require_unique_dialects() {
    /// Builds a `ViewVersion` (version id 0, schema id 0, timestamp 0, namespace
    /// "default") with one SQL representation per `(dialect, sql)` pair, in order.
    fn version_with(representations: &[(&str, &str)]) -> ViewVersion {
        let sql_representations = representations
            .iter()
            .map(|&(dialect, sql)| {
                ViewRepresentation::Sql(SqlViewRepresentation {
                    dialect: dialect.to_string(),
                    sql: sql.to_string(),
                })
            })
            .collect();

        ViewVersion::builder()
            .with_version_id(0)
            .with_schema_id(0)
            .with_timestamp_ms(0)
            .with_representations(ViewRepresentations(sql_representations))
            .with_default_namespace(NamespaceIdent::new("default".to_string()))
            .build()
    }

    // "trino" appears twice with different SQL: duplicates must be rejected.
    let duplicated = version_with(&[
        ("trino", "SELECT * FROM foo"),
        ("trino", "SELECT * FROM bar"),
    ]);
    assert!(require_unique_dialects(&duplicated).is_err());

    // Two different dialects: accepted.
    let distinct = version_with(&[
        ("trino", "SELECT * FROM foo"),
        ("spark", "SELECT * FROM bar"),
    ]);
    assert!(require_unique_dialects(&distinct).is_ok());
}
1665}