iceberg/spec/
view_metadata_builder.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::{HashMap, HashSet};
19use std::sync::Arc;
20
21use chrono::Utc;
22use itertools::Itertools;
23use uuid::Uuid;
24
25use super::{
26    DEFAULT_SCHEMA_ID, INITIAL_VIEW_VERSION_ID, ONE_MINUTE_MS, Schema, SchemaId,
27    TableMetadataBuilder, VIEW_PROPERTY_REPLACE_DROP_DIALECT_ALLOWED,
28    VIEW_PROPERTY_REPLACE_DROP_DIALECT_ALLOWED_DEFAULT, VIEW_PROPERTY_VERSION_HISTORY_SIZE,
29    VIEW_PROPERTY_VERSION_HISTORY_SIZE_DEFAULT, ViewFormatVersion, ViewMetadata,
30    ViewRepresentation, ViewVersion, ViewVersionLog, ViewVersionRef,
31};
32use crate::ViewCreation;
33use crate::catalog::ViewUpdate;
34use crate::error::{Error, ErrorKind, Result};
35use crate::io::is_truthy;
36
37/// Manipulating view metadata.
38///
39/// For this builder the order of called functions matters.
40/// All operations applied to the `ViewMetadata` are tracked in `changes` as  a chronologically
41/// ordered vec of `ViewUpdate`.
42/// If an operation does not lead to a change of the `ViewMetadata`, the corresponding update
43/// is omitted from `changes`.
44#[derive(Debug, Clone)]
45pub struct ViewMetadataBuilder {
46    metadata: ViewMetadata,
47    changes: Vec<ViewUpdate>,
48    last_added_schema_id: Option<SchemaId>,
49    last_added_version_id: Option<SchemaId>,
50    history_entry: Option<ViewVersionLog>,
51    // Previous view version is only used during build to check
52    // weather dialects are dropped or not.
53    previous_view_version: Option<ViewVersionRef>,
54}
55
56#[derive(Debug, Clone, PartialEq)]
57/// Result of modifying or creating a `ViewMetadata`.
58pub struct ViewMetadataBuildResult {
59    /// The new `ViewMetadata`.
60    pub metadata: ViewMetadata,
61    /// The changes that were applied to the metadata.
62    pub changes: Vec<ViewUpdate>,
63}
64
65impl ViewMetadataBuilder {
66    const LAST_ADDED: i32 = TableMetadataBuilder::LAST_ADDED;
67
68    /// Creates a new view metadata builder.
69    pub fn new(
70        location: String,
71        schema: Schema,
72        view_version: ViewVersion,
73        format_version: ViewFormatVersion,
74        properties: HashMap<String, String>,
75    ) -> Result<Self> {
76        let builder = Self {
77            metadata: ViewMetadata {
78                format_version,
79                view_uuid: Uuid::now_v7(),
80                location: "".to_string(), // Overwritten immediately by set_location
81                current_version_id: -1,   // Overwritten immediately by set_current_version,
82                versions: HashMap::new(), // Overwritten immediately by set_current_version
83                version_log: Vec::new(),
84                schemas: HashMap::new(), // Overwritten immediately by set_current_version
85                properties: HashMap::new(), // Overwritten immediately by set_properties
86            },
87            changes: vec![],
88            last_added_schema_id: None, // Overwritten immediately by set_current_version
89            last_added_version_id: None, // Overwritten immediately by set_current_version
90            history_entry: None,
91            previous_view_version: None, // This is a new view
92        };
93
94        builder
95            .set_location(location)
96            .set_current_version(view_version, schema)?
97            .set_properties(properties)
98    }
99
100    /// Creates a new view metadata builder from the given metadata to modify it.
101    #[must_use]
102    pub fn new_from_metadata(previous: ViewMetadata) -> Self {
103        let previous_view_version = previous.current_version().clone();
104        Self {
105            metadata: previous,
106            changes: Vec::default(),
107            last_added_schema_id: None,
108            last_added_version_id: None,
109            history_entry: None,
110            previous_view_version: Some(previous_view_version),
111        }
112    }
113
114    /// Creates a new view metadata builder from the given view creation.
115    pub fn from_view_creation(view_creation: ViewCreation) -> Result<Self> {
116        let ViewCreation {
117            location,
118            schema,
119            properties,
120            name: _,
121            representations,
122            default_catalog,
123            default_namespace,
124            summary,
125        } = view_creation;
126        let version = ViewVersion::builder()
127            .with_default_catalog(default_catalog)
128            .with_default_namespace(default_namespace)
129            .with_representations(representations)
130            .with_schema_id(schema.schema_id())
131            .with_summary(summary)
132            .with_timestamp_ms(Utc::now().timestamp_millis())
133            .with_version_id(INITIAL_VIEW_VERSION_ID)
134            .build();
135
136        Self::new(location, schema, version, ViewFormatVersion::V1, properties)
137    }
138
139    /// Upgrade `FormatVersion`. Downgrades are not allowed.
140    ///
141    /// # Errors
142    /// - Cannot downgrade to older format versions.
143    pub fn upgrade_format_version(self, format_version: ViewFormatVersion) -> Result<Self> {
144        if format_version < self.metadata.format_version {
145            return Err(Error::new(
146                ErrorKind::DataInvalid,
147                format!(
148                    "Cannot downgrade ViewFormatVersion from {} to {}",
149                    self.metadata.format_version, format_version
150                ),
151            ));
152        }
153
154        if format_version != self.metadata.format_version {
155            match format_version {
156                ViewFormatVersion::V1 => {
157                    // No changes needed for V1
158                }
159            }
160        }
161
162        Ok(self)
163    }
164
165    /// Set the location of the view, stripping any trailing slashes.
166    pub fn set_location(mut self, location: String) -> Self {
167        let location = location.trim_end_matches('/').to_string();
168        if self.metadata.location != location {
169            self.changes.push(ViewUpdate::SetLocation {
170                location: location.clone(),
171            });
172            self.metadata.location = location;
173        }
174
175        self
176    }
177
178    /// Set an existing view version as the current version.
179    ///
180    /// # Errors
181    /// - The specified `version_id` does not exist.
182    /// - The specified `version_id` is `-1` but no version has been added.
183    pub fn set_current_version_id(mut self, mut version_id: i32) -> Result<Self> {
184        if version_id == Self::LAST_ADDED {
185            let Some(last_added_id) = self.last_added_version_id else {
186                return Err(Error::new(
187                    ErrorKind::DataInvalid,
188                    "Cannot set current version id to last added version: no version has been added.",
189                ));
190            };
191            version_id = last_added_id;
192        }
193
194        let version_id = version_id; // make immutable
195
196        if version_id == self.metadata.current_version_id {
197            return Ok(self);
198        }
199
200        let version = self.metadata.versions.get(&version_id).ok_or_else(|| {
201            Error::new(
202                ErrorKind::DataInvalid,
203                format!(
204                    "Cannot set current version to unknown version with id: {}",
205                    version_id
206                ),
207            )
208        })?;
209
210        self.metadata.current_version_id = version_id;
211
212        if self.last_added_version_id == Some(version_id) {
213            self.changes.push(ViewUpdate::SetCurrentViewVersion {
214                view_version_id: Self::LAST_ADDED,
215            });
216        } else {
217            self.changes.push(ViewUpdate::SetCurrentViewVersion {
218                view_version_id: version_id,
219            });
220        }
221
222        // Use the timestamp of the snapshot if it was added in this set of changes,
223        // otherwise use a current timestamp for the log. The view version was added
224        // by a past transaction.
225        let version_added_in_this_changes = self
226            .changes
227            .iter()
228            .any(|update| matches!(update, ViewUpdate::AddViewVersion { view_version } if view_version.version_id() == version_id));
229
230        let mut log = version.log();
231        if !version_added_in_this_changes {
232            log.set_timestamp_ms(Utc::now().timestamp_millis());
233        }
234
235        self.history_entry = Some(log);
236
237        Ok(self)
238    }
239
240    /// Add a new view version and set it as current.
241    pub fn set_current_version(
242        mut self,
243        view_version: ViewVersion,
244        schema: Schema,
245    ) -> Result<Self> {
246        let schema_id = self.add_schema_internal(schema);
247        let view_version = view_version.with_schema_id(schema_id);
248        let view_version_id = self.add_version_internal(view_version)?;
249        self.set_current_version_id(view_version_id)
250    }
251
252    /// Add a new version to the view.
253    ///
254    /// # Errors
255    /// - The schema ID of the version is set to `-1`, but no schema has been added.
256    /// - The schema ID of the specified version is unknown.
257    /// - Multiple queries for the same dialect are added.
258    pub fn add_version(mut self, view_version: ViewVersion) -> Result<Self> {
259        self.add_version_internal(view_version)?;
260
261        Ok(self)
262    }
263
264    fn add_version_internal(&mut self, view_version: ViewVersion) -> Result<i32> {
265        let version_id = self.reuse_or_create_new_view_version_id(&view_version);
266        let view_version = view_version.with_version_id(version_id);
267
268        if self.metadata.versions.contains_key(&version_id) {
269            // ToDo Discuss: Similar to TableMetadata sort-order, Java does not add changes
270            // in this case. I prefer to add changes as the state of the builder is
271            // potentially mutated (`last_added_version_id`), thus we should record the change.
272            if self.last_added_version_id != Some(version_id) {
273                self.changes
274                    .push(ViewUpdate::AddViewVersion { view_version });
275                self.last_added_version_id = Some(version_id);
276            }
277            return Ok(version_id);
278        }
279
280        let view_version = if view_version.schema_id() == Self::LAST_ADDED {
281            let last_added_schema_id = self.last_added_schema_id.ok_or_else(|| {
282                Error::new(
283                    ErrorKind::DataInvalid,
284                    "Cannot set last added schema: no schema has been added",
285                )
286            })?;
287            view_version.with_schema_id(last_added_schema_id)
288        } else {
289            view_version
290        };
291
292        if !self
293            .metadata
294            .schemas
295            .contains_key(&view_version.schema_id())
296        {
297            return Err(Error::new(
298                ErrorKind::DataInvalid,
299                format!(
300                    "Cannot add version with unknown schema: {}",
301                    view_version.schema_id()
302                ),
303            ));
304        }
305
306        require_unique_dialects(&view_version)?;
307
308        // The `TableMetadataBuilder` uses these checks in multiple places - also in Java.
309        // If we think delayed requests are a problem, I think we should also add it here.
310        if let Some(last) = self.metadata.version_log.last() {
311            // commits can happen concurrently from different machines.
312            // A tolerance helps us avoid failure for small clock skew
313            if view_version.timestamp_ms() - last.timestamp_ms() < -ONE_MINUTE_MS {
314                return Err(Error::new(
315                    ErrorKind::DataInvalid,
316                    format!(
317                        "Invalid snapshot timestamp {}: before last snapshot timestamp {}",
318                        view_version.timestamp_ms(),
319                        last.timestamp_ms()
320                    ),
321                ));
322            }
323        }
324
325        self.metadata
326            .versions
327            .insert(version_id, Arc::new(view_version.clone()));
328
329        let view_version = if let Some(last_added_schema_id) = self.last_added_schema_id {
330            if view_version.schema_id() == last_added_schema_id {
331                view_version.with_schema_id(Self::LAST_ADDED)
332            } else {
333                view_version
334            }
335        } else {
336            view_version
337        };
338        self.changes
339            .push(ViewUpdate::AddViewVersion { view_version });
340
341        self.last_added_version_id = Some(version_id);
342
343        Ok(version_id)
344    }
345
346    fn reuse_or_create_new_view_version_id(&self, new_view_version: &ViewVersion) -> i32 {
347        self.metadata
348            .versions
349            .iter()
350            .find_map(|(id, other_version)| {
351                new_view_version
352                    .behaves_identical_to(other_version)
353                    .then_some(*id)
354            })
355            .unwrap_or_else(|| {
356                self.get_highest_view_version_id()
357                    .map(|id| id + 1)
358                    .unwrap_or(INITIAL_VIEW_VERSION_ID)
359            })
360    }
361
362    fn get_highest_view_version_id(&self) -> Option<i32> {
363        self.metadata.versions.keys().max().copied()
364    }
365
366    /// Add a new schema to the view.
367    pub fn add_schema(mut self, schema: Schema) -> Self {
368        self.add_schema_internal(schema);
369
370        self
371    }
372
373    fn add_schema_internal(&mut self, schema: Schema) -> SchemaId {
374        let schema_id = self.reuse_or_create_new_schema_id(&schema);
375
376        if self.metadata.schemas.contains_key(&schema_id) {
377            // ToDo Discuss: Java does not add changes in this case. I prefer to add changes
378            // as the state of the builder is potentially mutated (`last_added_schema_id`),
379            // thus we should record the change.
380            if self.last_added_schema_id != Some(schema_id) {
381                self.changes.push(ViewUpdate::AddSchema {
382                    schema: schema.clone().with_schema_id(schema_id),
383                    last_column_id: None,
384                });
385                self.last_added_schema_id = Some(schema_id);
386            }
387            return schema_id;
388        }
389
390        let schema = schema.with_schema_id(schema_id);
391
392        self.metadata
393            .schemas
394            .insert(schema_id, Arc::new(schema.clone()));
395        let last_column_id = schema.highest_field_id();
396        self.changes.push(ViewUpdate::AddSchema {
397            schema,
398            last_column_id: Some(last_column_id),
399        });
400
401        self.last_added_schema_id = Some(schema_id);
402
403        schema_id
404    }
405
406    fn reuse_or_create_new_schema_id(&self, new_schema: &Schema) -> SchemaId {
407        self.metadata
408            .schemas
409            .iter()
410            .find_map(|(id, schema)| new_schema.is_same_schema(schema).then_some(*id))
411            .unwrap_or_else(|| {
412                self.get_highest_schema_id()
413                    .map(|id| id + 1)
414                    .unwrap_or(DEFAULT_SCHEMA_ID)
415            })
416    }
417
418    fn get_highest_schema_id(&self) -> Option<SchemaId> {
419        self.metadata.schemas.keys().max().copied()
420    }
421
422    /// Update properties of the view.
423    pub fn set_properties(mut self, updates: HashMap<String, String>) -> Result<Self> {
424        if updates.is_empty() {
425            return Ok(self);
426        }
427
428        let num_versions_to_keep = updates
429            .get(VIEW_PROPERTY_VERSION_HISTORY_SIZE)
430            .and_then(|v| v.parse::<i64>().ok())
431            .unwrap_or(1);
432        if num_versions_to_keep < 0 {
433            return Err(Error::new(
434                ErrorKind::DataInvalid,
435                format!(
436                    "{} must be positive but was {}",
437                    VIEW_PROPERTY_VERSION_HISTORY_SIZE, num_versions_to_keep
438                ),
439            ));
440        }
441
442        self.metadata.properties.extend(updates.clone());
443        self.changes.push(ViewUpdate::SetProperties { updates });
444
445        Ok(self)
446    }
447
448    /// Remove properties from the view
449    pub fn remove_properties(mut self, removals: &[String]) -> Self {
450        if removals.is_empty() {
451            return self;
452        }
453
454        for property in removals {
455            self.metadata.properties.remove(property);
456        }
457
458        self.changes.push(ViewUpdate::RemoveProperties {
459            removals: removals.to_vec(),
460        });
461
462        self
463    }
464
465    /// Assign a new UUID to the view.
466    pub fn assign_uuid(mut self, uuid: Uuid) -> Self {
467        if self.metadata.view_uuid != uuid {
468            self.metadata.view_uuid = uuid;
469            self.changes.push(ViewUpdate::AssignUuid { uuid });
470        }
471
472        self
473    }
474
475    /// Build the `ViewMetadata` from the changes.
476    pub fn build(mut self) -> Result<ViewMetadataBuildResult> {
477        if let Some(history_entry) = self.history_entry.take() {
478            self.metadata.version_log.push(history_entry);
479        }
480
481        // We should run validate before `self.metadata.current_version()` below,
482        // as it might panic if the metadata is invalid.
483        self.metadata.validate()?;
484
485        if let Some(previous) = self.previous_view_version.take() {
486            if !allow_replace_drop_dialects(&self.metadata.properties) {
487                require_no_dialect_dropped(&previous, self.metadata.current_version())?;
488            }
489        }
490
491        let _expired_versions = self.expire_versions();
492        self.metadata.version_log = update_version_log(
493            self.metadata.version_log,
494            self.metadata.versions.keys().copied().collect(),
495        );
496
497        Ok(ViewMetadataBuildResult {
498            metadata: self.metadata,
499            changes: self.changes,
500        })
501    }
502
503    /// Removes expired versions from the view and returns them.
504    fn expire_versions(&mut self) -> Vec<ViewVersionRef> {
505        let num_versions_to_keep = self
506            .metadata
507            .properties
508            .get(VIEW_PROPERTY_VERSION_HISTORY_SIZE)
509            .and_then(|v| v.parse::<usize>().ok())
510            .unwrap_or(VIEW_PROPERTY_VERSION_HISTORY_SIZE_DEFAULT)
511            .max(1);
512
513        // expire old versions, but keep at least the versions added in this builder
514        let num_added_versions = self
515            .changes
516            .iter()
517            .filter(|update| matches!(update, ViewUpdate::AddViewVersion { .. }))
518            .count();
519        let num_versions_to_keep = num_added_versions.max(num_versions_to_keep);
520
521        if self.metadata.versions.len() > num_versions_to_keep {
522            // version ids are assigned sequentially. keep the latest versions by ID.
523            let mut versions_to_keep = self
524                .metadata
525                .versions
526                .keys()
527                .copied()
528                .sorted()
529                .rev()
530                .take(num_versions_to_keep)
531                .collect::<HashSet<_>>();
532
533            // always retain current version
534            if !versions_to_keep.contains(&self.metadata.current_version_id) {
535                // Remove the lowest ID
536                if num_versions_to_keep > num_added_versions {
537                    let lowest_id = versions_to_keep.iter().min().copied();
538                    lowest_id.map(|id| versions_to_keep.remove(&id));
539                }
540                // Add the current version ID
541                versions_to_keep.insert(self.metadata.current_version_id);
542            }
543
544            let mut expired_versions = Vec::new();
545            // remove all versions which are not in versions_to_keep from the metadata
546            // and add them to the expired_versions list
547            self.metadata.versions.retain(|id, version| {
548                if versions_to_keep.contains(id) {
549                    true
550                } else {
551                    expired_versions.push(version.clone());
552                    false
553                }
554            });
555
556            expired_versions
557        } else {
558            Vec::new()
559        }
560    }
561}
562
563/// Expire version log entries that are no longer relevant.
564/// Returns the history entries to retain.
565fn update_version_log(
566    version_log: Vec<ViewVersionLog>,
567    ids_to_keep: HashSet<i32>,
568) -> Vec<ViewVersionLog> {
569    let mut retained_history = Vec::new();
570    for log_entry in version_log {
571        if ids_to_keep.contains(&log_entry.version_id()) {
572            retained_history.push(log_entry);
573        } else {
574            retained_history.clear();
575        }
576    }
577    retained_history
578}
579
580fn allow_replace_drop_dialects(properties: &HashMap<String, String>) -> bool {
581    properties
582        .get(VIEW_PROPERTY_REPLACE_DROP_DIALECT_ALLOWED)
583        .map_or(
584            VIEW_PROPERTY_REPLACE_DROP_DIALECT_ALLOWED_DEFAULT,
585            |value| is_truthy(value),
586        )
587}
588
589fn require_no_dialect_dropped(previous: &ViewVersion, current: &ViewVersion) -> Result<()> {
590    let base_dialects = lowercase_sql_dialects_for(previous);
591    let updated_dialects = lowercase_sql_dialects_for(current);
592
593    if !updated_dialects.is_superset(&base_dialects) {
594        return Err(Error::new(
595            ErrorKind::DataInvalid,
596            format!(
597                "Cannot replace view due to loss of view dialects: \nPrevious dialects: {:?}\nNew dialects: {:?}\nSet {} to true to allow dropping dialects.",
598                Vec::from_iter(base_dialects),
599                Vec::from_iter(updated_dialects),
600                VIEW_PROPERTY_REPLACE_DROP_DIALECT_ALLOWED
601            ),
602        ));
603    }
604
605    Ok(())
606}
607
608fn lowercase_sql_dialects_for(view_version: &ViewVersion) -> HashSet<String> {
609    view_version
610        .representations()
611        .iter()
612        .map(|repr| match repr {
613            ViewRepresentation::Sql(sql_repr) => sql_repr.dialect.to_lowercase(),
614        })
615        .collect()
616}
617
618pub(super) fn require_unique_dialects(view_version: &ViewVersion) -> Result<()> {
619    let mut seen_dialects = HashSet::with_capacity(view_version.representations().len());
620    for repr in view_version.representations().iter() {
621        match repr {
622            ViewRepresentation::Sql(sql_repr) => {
623                if !seen_dialects.insert(sql_repr.dialect.to_lowercase()) {
624                    return Err(Error::new(
625                        ErrorKind::DataInvalid,
626                        format!(
627                            "Invalid view version: Cannot add multiple queries for dialect {}",
628                            sql_repr.dialect
629                        ),
630                    ));
631                }
632            }
633        }
634    }
635    Ok(())
636}
637
638#[cfg(test)]
639mod test {
640    use super::super::view_metadata::tests::get_test_view_metadata;
641    use super::*;
642    use crate::NamespaceIdent;
643    use crate::spec::{
644        NestedField, PrimitiveType, SqlViewRepresentation, Type, ViewRepresentations,
645    };
646
647    fn new_view_version(id: usize, schema_id: SchemaId, sql: &str) -> ViewVersion {
648        new_view_version_with_dialect(id, schema_id, sql, vec!["spark"])
649    }
650
651    fn new_view_version_with_dialect(
652        id: usize,
653        schema_id: SchemaId,
654        sql: &str,
655        dialects: Vec<&str>,
656    ) -> ViewVersion {
657        ViewVersion::builder()
658            .with_version_id(id as i32)
659            .with_schema_id(schema_id)
660            .with_timestamp_ms(1573518431300)
661            .with_default_catalog(Some("prod".to_string()))
662            .with_summary(HashMap::from_iter(vec![(
663                "user".to_string(),
664                "some-user".to_string(),
665            )]))
666            .with_representations(ViewRepresentations(
667                dialects
668                    .iter()
669                    .map(|dialect| {
670                        ViewRepresentation::Sql(SqlViewRepresentation {
671                            dialect: dialect.to_string(),
672                            sql: sql.to_string(),
673                        })
674                    })
675                    .collect(),
676            ))
677            .with_default_namespace(NamespaceIdent::new("default".to_string()))
678            .build()
679    }
680
681    fn builder_without_changes() -> ViewMetadataBuilder {
682        ViewMetadataBuilder::new_from_metadata(get_test_view_metadata("ViewMetadataV1Valid.json"))
683    }
684
685    #[test]
686    fn test_minimal_builder() {
687        let location = "s3://bucket/table".to_string();
688        let schema = Schema::builder()
689            .with_schema_id(1)
690            .with_fields(vec![])
691            .build()
692            .unwrap();
693        // Version ID and schema should be re-assigned
694        let version = new_view_version(20, 21, "select 1 as count");
695        let format_version = ViewFormatVersion::V1;
696        let properties = HashMap::from_iter(vec![("key".to_string(), "value".to_string())]);
697
698        let build_result = ViewMetadataBuilder::new(
699            location.clone(),
700            schema.clone(),
701            version.clone(),
702            format_version,
703            properties.clone(),
704        )
705        .unwrap()
706        .build()
707        .unwrap();
708
709        let metadata = build_result.metadata;
710        assert_eq!(metadata.location, location);
711        assert_eq!(metadata.current_version_id, INITIAL_VIEW_VERSION_ID);
712        assert_eq!(metadata.format_version, format_version);
713        assert_eq!(metadata.properties, properties);
714        assert_eq!(metadata.versions.len(), 1);
715        assert_eq!(metadata.schemas.len(), 1);
716        assert_eq!(metadata.version_log.len(), 1);
717        assert_eq!(
718            Arc::unwrap_or_clone(metadata.versions[&INITIAL_VIEW_VERSION_ID].clone()),
719            version
720                .clone()
721                .with_version_id(INITIAL_VIEW_VERSION_ID)
722                .with_schema_id(0)
723        );
724
725        let changes = build_result.changes;
726        assert_eq!(changes.len(), 5);
727        assert!(changes.contains(&ViewUpdate::SetLocation { location }));
728        assert!(
729            changes.contains(&ViewUpdate::AddViewVersion {
730                view_version: version
731                    .with_version_id(INITIAL_VIEW_VERSION_ID)
732                    .with_schema_id(-1)
733            })
734        );
735        assert!(changes.contains(&ViewUpdate::SetCurrentViewVersion {
736            view_version_id: -1
737        }));
738        assert!(changes.contains(&ViewUpdate::AddSchema {
739            schema: schema.clone().with_schema_id(0),
740            last_column_id: Some(0)
741        }));
742        assert!(changes.contains(&ViewUpdate::SetProperties {
743            updates: properties
744        }));
745    }
746
747    #[test]
748    fn test_version_expiration() {
749        let v1 = new_view_version(0, 1, "select 1 as count");
750        let v2 = new_view_version(0, 1, "select count(1) as count from t2");
751        let v3 = new_view_version(0, 1, "select count from t1");
752
753        let builder = builder_without_changes()
754            .add_version(v1)
755            .unwrap()
756            .add_version(v2)
757            .unwrap()
758            .add_version(v3)
759            .unwrap();
760        let builder_without_changes = builder.clone().build().unwrap().metadata.into_builder();
761
762        // No limit on versions
763        let metadata = builder.clone().build().unwrap().metadata;
764        assert_eq!(
765            metadata.versions.keys().cloned().collect::<HashSet<_>>(),
766            HashSet::from_iter(vec![1, 2, 3, 4])
767        );
768
769        // Limit to 2 versions, we still want to keep 3 versions as 3 where added during this build
770        // Plus the current version
771        let metadata = builder
772            .clone()
773            .set_properties(HashMap::from_iter(vec![(
774                VIEW_PROPERTY_VERSION_HISTORY_SIZE.to_string(),
775                "2".to_string(),
776            )]))
777            .unwrap()
778            .build()
779            .unwrap()
780            .metadata;
781        assert_eq!(
782            metadata.versions.keys().cloned().collect::<HashSet<_>>(),
783            HashSet::from_iter(vec![1, 2, 3, 4])
784        );
785        assert_eq!(metadata.version_log.len(), 1);
786
787        // Limit to 2 versions in new build, only keep 2.
788        // One of them should be the current
789        let metadata = builder_without_changes
790            .clone()
791            .set_properties(HashMap::from_iter(vec![(
792                VIEW_PROPERTY_VERSION_HISTORY_SIZE.to_string(),
793                "2".to_string(),
794            )]))
795            .unwrap()
796            .build()
797            .unwrap()
798            .metadata;
799        assert_eq!(
800            metadata.versions.keys().cloned().collect::<HashSet<_>>(),
801            HashSet::from_iter(vec![1, 4])
802        );
803
804        // Keep at least 1 version irrespective of the limit.
805        // This is the current version
806        let metadata = builder_without_changes
807            .set_properties(HashMap::from_iter(vec![(
808                VIEW_PROPERTY_VERSION_HISTORY_SIZE.to_string(),
809                "0".to_string(),
810            )]))
811            .unwrap()
812            .build()
813            .unwrap()
814            .metadata;
815        assert_eq!(
816            metadata.versions.keys().cloned().collect::<HashSet<_>>(),
817            HashSet::from_iter(vec![1])
818        );
819    }
820
/// Exercises `update_version_log` with a history whose versions are all kept and with
/// histories that contain entries for versions that are no longer kept.
#[test]
fn test_update_version_log() {
    let version_1 = new_view_version(1, 1, "select 1 as count");
    let version_2 = new_view_version(2, 1, "select count(1) as count from t2");
    let version_3 = new_view_version(3, 1, "select count from t1");

    let entry_1 = ViewVersionLog::new(1, version_1.timestamp_ms());
    let entry_2 = ViewVersionLog::new(2, version_2.timestamp_ms());
    let entry_3 = ViewVersionLog::new(3, version_3.timestamp_ms());

    // Every logged version is kept: the history comes back unchanged.
    let full_history = vec![entry_1.clone(), entry_2.clone(), entry_3.clone()];
    assert_eq!(
        update_version_log(full_history.clone(), HashSet::from_iter(vec![1, 2, 3])),
        full_history
    );

    // Version 1 is not kept: its entry and everything logged before it are removed.
    let history = vec![
        entry_3.clone(),
        entry_2.clone(),
        entry_1.clone(),
        entry_2.clone(),
        entry_3.clone(),
    ];
    let expected = vec![entry_2.clone(), entry_3.clone()];
    assert_eq!(
        update_version_log(history, HashSet::from_iter(vec![2, 3])),
        expected
    );

    // Version 2 is not kept: its entry and everything logged before it are removed.
    let history = vec![
        entry_1.clone(),
        entry_2,
        entry_3.clone(),
        entry_1.clone(),
        entry_3.clone(),
    ];
    let expected = vec![entry_3.clone(), entry_1, entry_3];
    assert_eq!(
        update_version_log(history, HashSet::from_iter(vec![1, 3])),
        expected
    );
}
869
/// Checks which timestamps end up in the version log when versions are newly added and
/// when an already existing version is made current again.
#[test]
fn test_use_previously_added_version() {
    let version_2 = new_view_version(2, 1, "select 1 as count");
    let version_3 = new_view_version(3, 1, "select count(1) as count from t2");
    let empty_schema = Schema::builder().build().unwrap();

    let expected_entry_2 = ViewVersionLog::new(2, version_2.timestamp_ms());
    let expected_entry_3 = ViewVersionLog::new(3, version_3.timestamp_ms());

    let after_version_2 = builder_without_changes()
        .set_current_version(version_2.clone(), empty_schema.clone())
        .unwrap()
        .build()
        .unwrap()
        .metadata;

    // The newest log entry carries the exact timestamp of version 2.
    assert_eq!(
        after_version_2.version_log.last(),
        Some(&expected_entry_2)
    );

    // Adding version 3 appends an entry carrying the exact timestamp of version 3.
    let after_version_3 = after_version_2
        .into_builder()
        .set_current_version(version_3.clone(), empty_schema)
        .unwrap()
        .build()
        .unwrap()
        .metadata;

    let expected_tail = vec![expected_entry_2, expected_entry_3];
    assert_eq!(after_version_3.version_log[1..], expected_tail);

    // Switching back to the existing version 2 appends a new log entry.
    let after_reuse = after_version_3
        .into_builder()
        .set_current_version_id(2)
        .unwrap()
        .build()
        .unwrap()
        .metadata;

    // That entry refers to version 2 but has a later timestamp than the version itself.
    let newest = after_reuse.version_log.last().unwrap();
    assert_eq!(newest.version_id(), 2);
    assert!(version_2.timestamp_ms() < newest.timestamp_ms());
}
917
/// Assigning a UUID updates the metadata and is recorded as a single `AssignUuid` change.
#[test]
fn test_assign_uuid() {
    let base = builder_without_changes();
    let new_uuid = Uuid::now_v7();

    let result = base.assign_uuid(new_uuid).build().unwrap();

    assert_eq!(result.metadata.view_uuid, new_uuid);
    let expected_changes = vec![ViewUpdate::AssignUuid { uuid: new_uuid }];
    assert_eq!(result.changes, expected_changes);
}
926
/// Setting a location updates the metadata and is recorded as a single `SetLocation` change.
#[test]
fn test_set_location() {
    let base = builder_without_changes();
    let new_location = "s3://bucket/table".to_string();

    let result = base.set_location(new_location.clone()).build().unwrap();

    assert_eq!(result.metadata.location, new_location);
    let expected_changes = vec![ViewUpdate::SetLocation {
        location: new_location,
    }];
    assert_eq!(result.changes, expected_changes);
}
941
/// Sets two properties, removes one existing and one unknown key, and checks both the
/// resulting properties and the recorded changes.
#[test]
fn test_set_and_remove_properties() {
    let base = builder_without_changes();
    let updates = HashMap::from([
        ("key1".to_string(), "value1".to_string()),
        ("key2".to_string(), "value2".to_string()),
    ]);
    // "key3" was never set; it is still expected to show up in the recorded removals.
    let removals = ["key2".to_string(), "key3".to_string()];

    let result = base
        .set_properties(updates.clone())
        .unwrap()
        .remove_properties(&removals)
        .build()
        .unwrap();

    let properties = &result.metadata.properties;
    assert_eq!(properties.get("key1").map(String::as_str), Some("value1"));
    assert!(!properties.contains_key("key2"));

    let expected_changes = vec![
        ViewUpdate::SetProperties { updates },
        ViewUpdate::RemoveProperties {
            removals: removals.to_vec(),
        },
    ];
    assert_eq!(result.changes, expected_changes);
}
970
/// Adding a schema to the base builder yields two schemas and one `AddSchema` change in
/// which the schema carries id 2.
#[test]
fn test_add_schema() {
    let base = builder_without_changes();
    let schema = Schema::builder()
        .with_schema_id(1)
        .with_fields(vec![])
        .build()
        .unwrap();
    let expected_changes = vec![ViewUpdate::AddSchema {
        schema: schema.clone().with_schema_id(2),
        last_column_id: Some(0),
    }];

    // Each round starts from a fresh clone of the base builder, so the outcome
    // (including the assigned schema id) is expected to be the same both times.
    for _ in 0..2 {
        let result = base.clone().add_schema(schema.clone()).build().unwrap();
        assert_eq!(result.metadata.schemas.len(), 2);
        assert_eq!(result.changes, expected_changes);
    }
}
994
/// Adds two versions (the second one after adding a new schema), makes version 3 current
/// and checks the resulting metadata, the recorded changes and the version log.
#[test]
fn test_add_and_set_current_version() {
    let base = builder_without_changes();
    let first = new_view_version(2, 1, "select 1 as count");
    let second = new_view_version(3, 2, "select count(1) as count from t2");
    let second_schema = Schema::builder()
        .with_schema_id(2)
        .with_fields(vec![])
        .build()
        .unwrap();

    let result = base
        .add_version(first.clone())
        .unwrap()
        .add_schema(second_schema.clone())
        .add_version(second.clone())
        .unwrap()
        .set_current_version_id(3)
        .unwrap()
        .build()
        .unwrap();
    let metadata = &result.metadata;

    assert_eq!(metadata.current_version_id, 3);
    assert_eq!(metadata.versions.len(), 3);
    assert_eq!(metadata.schemas.len(), 2);
    assert_eq!(metadata.version_log.len(), 2);

    // The stored versions carry concrete version and schema ids.
    let stored_first = first.with_version_id(2).with_schema_id(1);
    let stored_second = second.clone().with_version_id(3).with_schema_id(2);
    assert_eq!(*metadata.versions[&2], stored_first);
    assert_eq!(*metadata.versions[&3], stored_second);

    // The recorded changes carry `-1` for the schema id of the second version and for
    // the current version id.
    let expected_changes = vec![
        ViewUpdate::AddViewVersion {
            view_version: stored_first,
        },
        ViewUpdate::AddSchema {
            schema: second_schema.with_schema_id(2),
            last_column_id: Some(0),
        },
        ViewUpdate::AddViewVersion {
            view_version: second.with_version_id(3).with_schema_id(-1),
        },
        ViewUpdate::SetCurrentViewVersion {
            view_version_id: -1,
        },
    ];
    assert_eq!(result.changes.len(), 4);
    assert_eq!(result.changes, expected_changes);

    let logged_version_ids: Vec<_> = metadata
        .version_log
        .iter()
        .map(|entry| entry.version_id())
        .collect();
    assert_eq!(logged_version_ids, vec![1, 3]);
}
1056
/// Versions and a schema that all come in with id 0 are expected to be stored under
/// newly assigned ids (versions 2 and 3, schema 2).
#[test]
fn test_schema_and_version_id_reassignment() {
    let base = builder_without_changes();
    let first = new_view_version(0, 1, "select 1 as count");
    let second = new_view_version(0, 2, "select count(1) as count from t2");
    let second_schema = Schema::builder()
        .with_schema_id(0)
        .with_fields(vec![])
        .build()
        .unwrap();

    let result = base
        .add_version(first.clone())
        .unwrap()
        .set_current_version(second.clone(), second_schema.clone())
        .unwrap()
        .build()
        .unwrap();
    let metadata = &result.metadata;

    assert_eq!(metadata.current_version_id, 3);
    assert_eq!(metadata.versions.len(), 3);
    assert_eq!(metadata.schemas.len(), 2);
    assert_eq!(metadata.version_log.len(), 2);

    // The stored versions carry the reassigned version and schema ids.
    let stored_first = first.with_version_id(2).with_schema_id(1);
    let stored_second = second.clone().with_version_id(3).with_schema_id(2);
    assert_eq!(*metadata.versions[&2], stored_first);
    assert_eq!(*metadata.versions[&3], stored_second);

    // The recorded changes carry `-1` for the schema id of the second version and for
    // the current version id.
    let expected_changes = vec![
        ViewUpdate::AddViewVersion {
            view_version: stored_first,
        },
        ViewUpdate::AddSchema {
            schema: second_schema.with_schema_id(2),
            last_column_id: Some(0),
        },
        ViewUpdate::AddViewVersion {
            view_version: second.with_version_id(3).with_schema_id(-1),
        },
        ViewUpdate::SetCurrentViewVersion {
            view_version_id: -1,
        },
    ];
    assert_eq!(result.changes.len(), 4);
    assert_eq!(result.changes, expected_changes);

    let logged_version_ids: Vec<_> = metadata
        .version_log
        .iter()
        .map(|entry| entry.version_id())
        .collect();
    assert_eq!(logged_version_ids, vec![1, 3]);
}
1115
/// Adding the same version twice must result in only one additional stored version.
#[test]
fn test_view_version_deduplication() {
    let base = builder_without_changes();
    let candidate = new_view_version(0, 1, "select * from ns.tbl");

    assert_eq!(base.metadata.versions.len(), 1);

    let added_once = base.add_version(candidate.clone()).unwrap();
    let added_twice = added_once.add_version(candidate).unwrap();
    let result = added_twice.build().unwrap();

    assert_eq!(result.metadata.versions.len(), 2);
    assert_eq!(result.metadata.schemas.len(), 1);
}
1134
/// Repeatedly setting already known versions and schemas as current must not create
/// duplicate versions or schemas.
#[test]
fn test_view_version_and_schema_deduplication() {
    // Builds a schema with the given id holding one required `long` column (field id 1).
    let single_long_column = |schema_id, column: &str| {
        Schema::builder()
            .with_schema_id(schema_id)
            .with_fields(vec![
                NestedField::required(1, column, Type::Primitive(PrimitiveType::Long)).into(),
            ])
            .build()
            .unwrap()
    };
    let schema_x = single_long_column(5, "x");
    let schema_y = single_long_column(7, "y");
    let schema_z = single_long_column(9, "z");

    let version_x = new_view_version(1, 5, "select * from ns.tbl");
    let version_y = new_view_version(1, 7, "select count(*) from ns.tbl");
    let version_z = new_view_version(1, 9, "select count(*) as count from ns.tbl");

    // Apply the versions in order x, y, z and then again in reverse order z, y, x.
    let result = builder_without_changes()
        .add_schema(schema_x.clone())
        .add_schema(schema_y.clone())
        .add_schema(schema_z.clone())
        .set_current_version(version_x.clone(), schema_x.clone())
        .unwrap()
        .set_current_version(version_y.clone(), schema_y.clone())
        .unwrap()
        .set_current_version(version_z.clone(), schema_z.clone())
        .unwrap()
        .set_current_version(version_z.clone(), schema_z.clone())
        .unwrap()
        .set_current_version(version_y.clone(), schema_y.clone())
        .unwrap()
        .set_current_version(version_x.clone(), schema_x.clone())
        .unwrap()
        .build()
        .unwrap();
    let metadata = &result.metadata;

    let stored_x = version_x.with_version_id(2).with_schema_id(2);
    let stored_y = version_y.with_version_id(3).with_schema_id(3);
    let stored_z = version_z.with_version_id(4).with_schema_id(4);

    // The version applied last is current, stored under the id it got the first time.
    assert_eq!(**metadata.current_version(), stored_x);
    assert_eq!(metadata.versions.len(), 4);
    assert_eq!(*metadata.versions[&2], stored_x);
    assert_eq!(*metadata.versions[&3], stored_y);
    assert_eq!(*metadata.versions[&4], stored_z);

    // Ignore the schema with id 1 and compare the remaining schemas by struct only.
    let added_structs: Vec<_> = metadata
        .schemas_iter()
        .filter(|schema| schema.schema_id() != 1)
        .sorted_by_key(|schema| schema.schema_id())
        .map(|schema| schema.as_struct())
        .collect();
    let expected_structs = vec![
        schema_x.as_struct(),
        schema_y.as_struct(),
        schema_z.as_struct(),
    ];
    assert_eq!(added_structs, expected_structs)
}
1215
/// Adding a version that references a schema the builder cannot resolve must fail with a
/// descriptive error message.
#[test]
fn test_error_on_missing_schema() {
    let base = builder_without_changes();

    // Schema id 10 is not known to the builder.
    let unknown_schema_err = base
        .clone()
        .add_version(new_view_version(0, 10, "SELECT * FROM foo"))
        .unwrap_err();
    assert!(
        unknown_schema_err
            .to_string()
            .contains("Cannot add version with unknown schema: 10")
    );

    // Schema id -1 is used although no schema has been added to this builder.
    let no_last_added_err = base
        .add_version(new_view_version(0, -1, "SELECT * FROM foo"))
        .unwrap_err();
    assert!(
        no_last_added_err
            .to_string()
            .contains("Cannot set last added schema: no schema has been added")
    );
}
1239
/// Setting the current version to an id the builder cannot resolve must fail with a
/// descriptive error message.
#[test]
fn test_error_on_missing_current_version() {
    let base = builder_without_changes();

    // (requested version id, expected fragment of the error message)
    let cases = [
        (
            -1,
            "Cannot set current version id to last added version: no version has been added.",
        ),
        (
            10,
            "Cannot set current version to unknown version with id: 10",
        ),
    ];

    for (version_id, expected_message) in cases {
        let err = base
            .clone()
            .set_current_version_id(version_id)
            .unwrap_err();
        assert!(err.to_string().contains(expected_message));
    }
}
1260
/// Passing `-1` as the current version id after adding versions 2 and 3 must make
/// version 3 the current one.
#[test]
fn test_set_current_version_to_last_added() {
    let base = builder_without_changes();
    let older = new_view_version(2, 1, "select * from ns.tbl");
    let newer = new_view_version(3, 1, "select a,b from ns.tbl");

    let with_both_versions = base
        .add_version(older)
        .unwrap()
        .add_version(newer)
        .unwrap();
    let result = with_both_versions
        .set_current_version_id(-1)
        .unwrap()
        .build()
        .unwrap();

    assert_eq!(result.metadata.current_version_id, 3);
}
1278
/// A negative value for the version history size property must be rejected by
/// `set_properties`.
#[test]
fn test_error_when_setting_negative_version_history_size() {
    let base = builder_without_changes();
    let negative_history_size = HashMap::from_iter(vec![(
        VIEW_PROPERTY_VERSION_HISTORY_SIZE.to_string(),
        "-1".to_string(),
    )]);

    let err = base.set_properties(negative_history_size).unwrap_err();

    assert!(
        err.to_string()
            .contains("version.history.num-entries must be positive but was -1")
    );
}
1294
/// Adding two versions records exactly two `AddViewVersion` changes, in order, holding
/// the versions as they were passed in.
#[test]
fn test_view_version_changes() {
    let base = builder_without_changes();

    let first = new_view_version(2, 1, "select 1 as count");
    let second = new_view_version(3, 1, "select count(1) as count from t2");

    let recorded = base
        .add_version(first.clone())
        .unwrap()
        .add_version(second.clone())
        .unwrap()
        .build()
        .unwrap()
        .changes;

    let expected: Vec<_> = vec![first, second]
        .into_iter()
        .map(|view_version| ViewUpdate::AddViewVersion { view_version })
        .collect();

    assert_eq!(recorded.len(), 2);
    assert_eq!(recorded, expected);
}
1322
/// Replacing a spark + trino version with a spark-only version must fail at build time
/// when no property allowing dropped dialects has been set.
#[test]
fn test_dropping_dialect_fails_by_default() {
    let base = builder_without_changes();

    let spark_only = new_view_version_with_dialect(0, 0, "SELECT * FROM foo", vec!["spark"]);
    let spark_and_trino =
        new_view_version_with_dialect(0, 0, "SELECT * FROM foo", vec!["spark", "trino"]);
    let schema = Schema::builder()
        .with_schema_id(0)
        .with_fields(vec![])
        .build()
        .unwrap();

    // First commit: the view has both dialects.
    let with_both_dialects = base
        .set_current_version(spark_and_trino, schema.clone())
        .unwrap()
        .build()
        .unwrap()
        .metadata;

    // Second commit: the trino dialect is gone, so building is expected to fail.
    let err = with_both_dialects
        .into_builder()
        .set_current_version(spark_only, schema)
        .unwrap()
        .build()
        .unwrap_err();

    let message = err.to_string();
    assert!(message.contains("Cannot replace view due to loss of view dialects"));
}
1353
/// With the drop-dialect property set to "true", replacing a spark + trino version with a
/// spark-only version must succeed.
#[test]
fn test_dropping_dialects_does_not_fail_when_allowed() {
    let base = builder_without_changes();

    let spark_only = new_view_version_with_dialect(0, 0, "SELECT * FROM foo", vec!["spark"]);
    let spark_and_trino =
        new_view_version_with_dialect(0, 0, "SELECT * FROM foo", vec!["spark", "trino"]);
    let schema = Schema::builder()
        .with_schema_id(0)
        .with_fields(vec![])
        .build()
        .unwrap();
    let allow_dropping = HashMap::from_iter(vec![(
        VIEW_PROPERTY_REPLACE_DROP_DIALECT_ALLOWED.to_string(),
        "true".to_string(),
    )]);

    // First commit: allow dropping dialects and set the version with both dialects.
    let with_both_dialects = base
        .set_properties(allow_dropping)
        .unwrap()
        .set_current_version(spark_and_trino, schema.clone())
        .unwrap()
        .build()
        .unwrap()
        .metadata;

    // Second commit: only spark remains.
    let result = with_both_dialects
        .into_builder()
        .set_current_version(spark_only.clone(), schema)
        .unwrap()
        .build()
        .unwrap();

    let expected_current = spark_only.with_version_id(3).with_schema_id(2);
    assert_eq!(**result.metadata.current_version(), expected_current);
}
1389
/// Replacing a spark-only version with a spark + trino version must succeed without any
/// extra property.
#[test]
fn test_can_add_dialects_by_default() {
    let base = builder_without_changes();

    let spark_only = new_view_version_with_dialect(0, 0, "SELECT * FROM foo", vec!["spark"]);
    let spark_and_trino =
        new_view_version_with_dialect(0, 0, "SELECT * FROM foo", vec!["spark", "trino"]);

    let schema = Schema::builder()
        .with_schema_id(0)
        .with_fields(vec![])
        .build()
        .unwrap();

    // First commit: spark only.
    let with_spark = base
        .set_current_version(spark_only, schema.clone())
        .unwrap()
        .build()
        .unwrap()
        .metadata;

    // Second commit: trino is added on top of spark.
    let result = with_spark
        .into_builder()
        .set_current_version(spark_and_trino.clone(), schema)
        .unwrap()
        .build()
        .unwrap();

    let expected_current = spark_and_trino.with_version_id(3).with_schema_id(2);
    assert_eq!(**result.metadata.current_version(), expected_current);
}
1421
/// Replacing the SQL of an existing dialect (spark) must succeed without any extra
/// property.
#[test]
fn test_can_update_dialect_by_default() {
    let base = builder_without_changes();

    let spark_foo = new_view_version_with_dialect(0, 0, "SELECT * FROM foo", vec!["spark"]);
    let spark_bar = new_view_version_with_dialect(0, 0, "SELECT * FROM bar", vec!["spark"]);

    let schema = Schema::builder()
        .with_schema_id(0)
        .with_fields(vec![])
        .build()
        .unwrap();

    // First commit: spark selecting from foo.
    let with_spark_foo = base
        .set_current_version(spark_foo, schema.clone())
        .unwrap()
        .build()
        .unwrap()
        .metadata;

    // Second commit: same dialect, different SQL.
    let result = with_spark_foo
        .into_builder()
        .set_current_version(spark_bar.clone(), schema)
        .unwrap()
        .build()
        .unwrap();

    let expected_current = spark_bar.with_version_id(3).with_schema_id(2);
    assert_eq!(**result.metadata.current_version(), expected_current);
}
1452
/// Dropping a dialect succeeds when the drop-dialect property is set to "true" in the same
/// build, and fails again once the property is set to "false".
#[test]
fn test_dropping_dialects_allowed_and_then_disallowed() {
    let base = builder_without_changes();

    let spark_only = new_view_version_with_dialect(0, 0, "SELECT * FROM foo", vec!["spark"]);
    let trino_only = new_view_version_with_dialect(0, 0, "SELECT * FROM foo", vec!["trino"]);

    let schema = Schema::builder()
        .with_schema_id(0)
        .with_fields(vec![])
        .build()
        .unwrap();

    // First commit: spark only.
    let with_spark = base
        .set_current_version(spark_only.clone(), schema.clone())
        .unwrap()
        .build()
        .unwrap()
        .metadata;

    // Second commit: switch to trino (dropping spark) and allow dropping afterwards,
    // within the same builder.
    let allow_dropping = HashMap::from_iter(vec![(
        VIEW_PROPERTY_REPLACE_DROP_DIALECT_ALLOWED.to_string(),
        "true".to_string(),
    )]);
    let with_trino = with_spark
        .into_builder()
        .set_current_version(trino_only.clone(), schema.clone())
        .unwrap()
        .set_properties(allow_dropping)
        .unwrap()
        .build()
        .unwrap();

    let expected_current = trino_only.with_version_id(3).with_schema_id(2);
    assert_eq!(**with_trino.metadata.current_version(), expected_current);

    // Third commit: switch back to spark (dropping trino) while disallowing dropping.
    let forbid_dropping = HashMap::from_iter(vec![(
        VIEW_PROPERTY_REPLACE_DROP_DIALECT_ALLOWED.to_string(),
        "false".to_string(),
    )]);
    let err = with_trino
        .metadata
        .into_builder()
        .set_current_version(spark_only, schema)
        .unwrap()
        .set_properties(forbid_dropping)
        .unwrap()
        .build()
        .unwrap_err();

    let message = err.to_string();
    assert!(message.contains("Cannot replace view due to loss of view dialects"));
}
1506
/// `require_no_dialect_dropped` must reject a version that lacks a dialect of the previous
/// version and accept one that has the same dialects in a different order.
#[test]
fn test_require_no_dialect_dropped() {
    // Builds a version (ids and timestamp all 0) from `(dialect, sql)` pairs.
    let version_with = |sql_by_dialect: &[(&str, &str)]| {
        let representations = sql_by_dialect
            .iter()
            .map(|&(dialect, sql)| {
                ViewRepresentation::Sql(SqlViewRepresentation {
                    dialect: dialect.to_string(),
                    sql: sql.to_string(),
                })
            })
            .collect();
        ViewVersion::builder()
            .with_version_id(0)
            .with_schema_id(0)
            .with_timestamp_ms(0)
            .with_representations(ViewRepresentations(representations))
            .with_default_namespace(NamespaceIdent::new("default".to_string()))
            .build()
    };

    let previous = version_with(&[
        ("trino", "SELECT * FROM foo"),
        ("spark", "SELECT * FROM bar"),
    ]);

    // The spark dialect is missing.
    let trino_only = version_with(&[("trino", "SELECT * FROM foo")]);
    assert!(require_no_dialect_dropped(&previous, &trino_only).is_err());

    // Both dialects are present, listed in reverse order.
    let reordered = version_with(&[
        ("spark", "SELECT * FROM bar"),
        ("trino", "SELECT * FROM foo"),
    ]);
    assert!(require_no_dialect_dropped(&previous, &reordered).is_ok());
}
1560
/// `allow_replace_drop_dialects` is false when the property is absent and otherwise
/// follows the property value for the upper- and lower-case spellings of true/false.
#[test]
fn test_allow_replace_drop_dialects() {
    use std::collections::HashMap;

    use super::allow_replace_drop_dialects;

    let mut properties = HashMap::new();

    // Property not set at all.
    assert!(!allow_replace_drop_dialects(&properties));

    // (raw property value, expected result); each value overwrites the previous one.
    let cases = [
        ("true", true),
        ("false", false),
        ("TRUE", true),
        ("FALSE", false),
    ];
    for (raw_value, expected) in cases {
        properties.insert(
            "replace.drop-dialect.allowed".to_string(),
            raw_value.to_string(),
        );
        assert_eq!(allow_replace_drop_dialects(&properties), expected);
    }
}
1594
/// `lowercase_sql_dialects_for` must return the dialect names of all SQL representations
/// in lower case.
#[test]
fn test_lowercase_sql_dialects_for() {
    // Dialects are given in upper, lower and mixed case.
    let representations: Vec<_> = vec![
        ("STARROCKS", "SELECT * FROM foo"),
        ("trino", "SELECT * FROM bar"),
        ("Spark", "SELECT * FROM bar"),
    ]
    .into_iter()
    .map(|(dialect, sql)| {
        ViewRepresentation::Sql(SqlViewRepresentation {
            dialect: dialect.to_string(),
            sql: sql.to_string(),
        })
    })
    .collect();

    let mixed_case_version = ViewVersion::builder()
        .with_version_id(0)
        .with_schema_id(0)
        .with_timestamp_ms(0)
        .with_representations(ViewRepresentations(representations))
        .with_default_namespace(NamespaceIdent::new("default".to_string()))
        .build();

    let dialects = lowercase_sql_dialects_for(&mixed_case_version);

    assert_eq!(dialects.len(), 3);
    for expected in ["trino", "spark", "starrocks"] {
        assert!(dialects.contains(expected));
    }
}
1624
/// `require_unique_dialects` must reject a version that lists the same dialect twice and
/// accept a version whose dialects are all different.
#[test]
fn test_require_unique_dialects() {
    // Builds a version (ids and timestamp all 0) from `(dialect, sql)` pairs.
    let version_with = |sql_by_dialect: &[(&str, &str)]| {
        let representations = sql_by_dialect
            .iter()
            .map(|&(dialect, sql)| {
                ViewRepresentation::Sql(SqlViewRepresentation {
                    dialect: dialect.to_string(),
                    sql: sql.to_string(),
                })
            })
            .collect();
        ViewVersion::builder()
            .with_version_id(0)
            .with_schema_id(0)
            .with_timestamp_ms(0)
            .with_representations(ViewRepresentations(representations))
            .with_default_namespace(NamespaceIdent::new("default".to_string()))
            .build()
    };

    // "trino" appears twice.
    let duplicated = version_with(&[
        ("trino", "SELECT * FROM foo"),
        ("trino", "SELECT * FROM bar"),
    ]);
    assert!(require_unique_dialects(&duplicated).is_err());

    // Two different dialects.
    let distinct = version_with(&[
        ("trino", "SELECT * FROM foo"),
        ("spark", "SELECT * FROM bar"),
    ]);
    assert!(require_unique_dialects(&distinct).is_ok());
}
1665}