Skip to main content

iceberg/catalog/memory/
catalog.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! This module contains memory catalog implementation.
19
20use std::collections::HashMap;
21use std::sync::Arc;
22
23use async_trait::async_trait;
24use futures::lock::{Mutex, MutexGuard};
25use itertools::Itertools;
26
27use super::namespace_state::NamespaceState;
28use crate::io::{FileIO, FileIOBuilder, MemoryStorageFactory, StorageFactory};
29use crate::spec::{TableMetadata, TableMetadataBuilder};
30use crate::table::Table;
31use crate::{
32    Catalog, CatalogBuilder, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result,
33    TableCommit, TableCreation, TableIdent,
34};
35
36/// Memory catalog warehouse location
37pub const MEMORY_CATALOG_WAREHOUSE: &str = "warehouse";
38
39/// namespace `location` property
40const LOCATION: &str = "location";
41
42/// Builder for [`MemoryCatalog`].
43#[derive(Debug)]
44pub struct MemoryCatalogBuilder {
45    config: MemoryCatalogConfig,
46    storage_factory: Option<Arc<dyn StorageFactory>>,
47}
48
49impl Default for MemoryCatalogBuilder {
50    fn default() -> Self {
51        Self {
52            config: MemoryCatalogConfig {
53                name: None,
54                warehouse: "".to_string(),
55                props: HashMap::new(),
56            },
57            storage_factory: None,
58        }
59    }
60}
61
62impl CatalogBuilder for MemoryCatalogBuilder {
63    type C = MemoryCatalog;
64
65    fn with_storage_factory(mut self, storage_factory: Arc<dyn StorageFactory>) -> Self {
66        self.storage_factory = Some(storage_factory);
67        self
68    }
69
70    fn load(
71        mut self,
72        name: impl Into<String>,
73        props: HashMap<String, String>,
74    ) -> impl Future<Output = Result<Self::C>> + Send {
75        self.config.name = Some(name.into());
76
77        if props.contains_key(MEMORY_CATALOG_WAREHOUSE) {
78            self.config.warehouse = props
79                .get(MEMORY_CATALOG_WAREHOUSE)
80                .cloned()
81                .unwrap_or_default()
82        }
83
84        // Collect other remaining properties
85        self.config.props = props
86            .into_iter()
87            .filter(|(k, _)| k != MEMORY_CATALOG_WAREHOUSE)
88            .collect();
89
90        let result = {
91            if self.config.name.is_none() {
92                Err(Error::new(
93                    ErrorKind::DataInvalid,
94                    "Catalog name is required",
95                ))
96            } else if self.config.warehouse.is_empty() {
97                Err(Error::new(
98                    ErrorKind::DataInvalid,
99                    "Catalog warehouse is required",
100                ))
101            } else {
102                MemoryCatalog::new(self.config, self.storage_factory)
103            }
104        };
105
106        std::future::ready(result)
107    }
108}
109
110#[derive(Clone, Debug)]
111pub(crate) struct MemoryCatalogConfig {
112    name: Option<String>,
113    warehouse: String,
114    props: HashMap<String, String>,
115}
116
117/// Memory catalog implementation.
118#[derive(Debug)]
119pub struct MemoryCatalog {
120    root_namespace_state: Mutex<NamespaceState>,
121    file_io: FileIO,
122    warehouse_location: String,
123}
124
125impl MemoryCatalog {
126    /// Creates a memory catalog.
127    fn new(
128        config: MemoryCatalogConfig,
129        storage_factory: Option<Arc<dyn StorageFactory>>,
130    ) -> Result<Self> {
131        // Use provided factory or default to MemoryStorageFactory
132        let factory = storage_factory.unwrap_or_else(|| Arc::new(MemoryStorageFactory));
133
134        Ok(Self {
135            root_namespace_state: Mutex::new(NamespaceState::default()),
136            file_io: FileIOBuilder::new(factory).with_props(config.props).build(),
137            warehouse_location: config.warehouse,
138        })
139    }
140
141    /// Loads a table from the locked namespace state.
142    async fn load_table_from_locked_state(
143        &self,
144        table_ident: &TableIdent,
145        root_namespace_state: &MutexGuard<'_, NamespaceState>,
146    ) -> Result<Table> {
147        let metadata_location = root_namespace_state.get_existing_table_location(table_ident)?;
148        let metadata = TableMetadata::read_from(&self.file_io, metadata_location).await?;
149
150        Table::builder()
151            .identifier(table_ident.clone())
152            .metadata(metadata)
153            .metadata_location(metadata_location.to_string())
154            .file_io(self.file_io.clone())
155            .build()
156    }
157}
158
159#[async_trait]
160impl Catalog for MemoryCatalog {
161    /// List namespaces inside the catalog.
162    async fn list_namespaces(
163        &self,
164        maybe_parent: Option<&NamespaceIdent>,
165    ) -> Result<Vec<NamespaceIdent>> {
166        let root_namespace_state = self.root_namespace_state.lock().await;
167
168        match maybe_parent {
169            None => {
170                let namespaces = root_namespace_state
171                    .list_top_level_namespaces()
172                    .into_iter()
173                    .map(|str| NamespaceIdent::new(str.to_string()))
174                    .collect_vec();
175
176                Ok(namespaces)
177            }
178            Some(parent_namespace_ident) => {
179                let namespaces = root_namespace_state
180                    .list_namespaces_under(parent_namespace_ident)?
181                    .into_iter()
182                    .map(|name| {
183                        let mut names = parent_namespace_ident.iter().cloned().collect::<Vec<_>>();
184                        names.push(name.to_string());
185                        NamespaceIdent::from_vec(names)
186                    })
187                    .collect::<Result<Vec<_>>>()?;
188
189                Ok(namespaces)
190            }
191        }
192    }
193
194    /// Create a new namespace inside the catalog.
195    async fn create_namespace(
196        &self,
197        namespace_ident: &NamespaceIdent,
198        properties: HashMap<String, String>,
199    ) -> Result<Namespace> {
200        let mut root_namespace_state = self.root_namespace_state.lock().await;
201
202        root_namespace_state.insert_new_namespace(namespace_ident, properties.clone())?;
203        let namespace = Namespace::with_properties(namespace_ident.clone(), properties);
204
205        Ok(namespace)
206    }
207
208    /// Get a namespace information from the catalog.
209    async fn get_namespace(&self, namespace_ident: &NamespaceIdent) -> Result<Namespace> {
210        let root_namespace_state = self.root_namespace_state.lock().await;
211
212        let namespace = Namespace::with_properties(
213            namespace_ident.clone(),
214            root_namespace_state
215                .get_properties(namespace_ident)?
216                .clone(),
217        );
218
219        Ok(namespace)
220    }
221
222    /// Check if namespace exists in catalog.
223    async fn namespace_exists(&self, namespace_ident: &NamespaceIdent) -> Result<bool> {
224        let guarded_namespaces = self.root_namespace_state.lock().await;
225
226        Ok(guarded_namespaces.namespace_exists(namespace_ident))
227    }
228
229    /// Update a namespace inside the catalog.
230    ///
231    /// # Behavior
232    ///
233    /// The properties must be the full set of namespace.
234    async fn update_namespace(
235        &self,
236        namespace_ident: &NamespaceIdent,
237        properties: HashMap<String, String>,
238    ) -> Result<()> {
239        let mut root_namespace_state = self.root_namespace_state.lock().await;
240
241        root_namespace_state.replace_properties(namespace_ident, properties)
242    }
243
244    /// Drop a namespace from the catalog.
245    async fn drop_namespace(&self, namespace_ident: &NamespaceIdent) -> Result<()> {
246        let mut root_namespace_state = self.root_namespace_state.lock().await;
247
248        root_namespace_state.remove_existing_namespace(namespace_ident)
249    }
250
251    /// List tables from namespace.
252    async fn list_tables(&self, namespace_ident: &NamespaceIdent) -> Result<Vec<TableIdent>> {
253        let root_namespace_state = self.root_namespace_state.lock().await;
254
255        let table_names = root_namespace_state.list_tables(namespace_ident)?;
256        let table_idents = table_names
257            .into_iter()
258            .map(|table_name| TableIdent::new(namespace_ident.clone(), table_name.clone()))
259            .collect_vec();
260
261        Ok(table_idents)
262    }
263
264    /// Create a new table inside the namespace.
265    async fn create_table(
266        &self,
267        namespace_ident: &NamespaceIdent,
268        table_creation: TableCreation,
269    ) -> Result<Table> {
270        let mut root_namespace_state = self.root_namespace_state.lock().await;
271
272        let table_name = table_creation.name.clone();
273        let table_ident = TableIdent::new(namespace_ident.clone(), table_name);
274
275        let (table_creation, location) = match table_creation.location.clone() {
276            Some(location) => (table_creation, location),
277            None => {
278                let namespace_properties = root_namespace_state.get_properties(namespace_ident)?;
279                let location_prefix = match namespace_properties.get(LOCATION) {
280                    Some(namespace_location) => namespace_location.clone(),
281                    None => format!("{}/{}", self.warehouse_location, namespace_ident.join("/")),
282                };
283
284                let location = format!("{}/{}", location_prefix, table_ident.name());
285
286                let new_table_creation = TableCreation {
287                    location: Some(location.clone()),
288                    ..table_creation
289                };
290
291                (new_table_creation, location)
292            }
293        };
294
295        let metadata = TableMetadataBuilder::from_table_creation(table_creation)?
296            .build()?
297            .metadata;
298        let metadata_location = MetadataLocation::new_with_table_location(location).to_string();
299
300        metadata.write_to(&self.file_io, &metadata_location).await?;
301
302        root_namespace_state.insert_new_table(&table_ident, metadata_location.clone())?;
303
304        Table::builder()
305            .file_io(self.file_io.clone())
306            .metadata_location(metadata_location)
307            .metadata(metadata)
308            .identifier(table_ident)
309            .build()
310    }
311
312    /// Load table from the catalog.
313    async fn load_table(&self, table_ident: &TableIdent) -> Result<Table> {
314        let root_namespace_state = self.root_namespace_state.lock().await;
315
316        self.load_table_from_locked_state(table_ident, &root_namespace_state)
317            .await
318    }
319
320    /// Drop a table from the catalog.
321    async fn drop_table(&self, table_ident: &TableIdent) -> Result<()> {
322        let mut root_namespace_state = self.root_namespace_state.lock().await;
323
324        let metadata_location = root_namespace_state.remove_existing_table(table_ident)?;
325        self.file_io.delete(&metadata_location).await
326    }
327
328    /// Check if a table exists in the catalog.
329    async fn table_exists(&self, table_ident: &TableIdent) -> Result<bool> {
330        let root_namespace_state = self.root_namespace_state.lock().await;
331
332        root_namespace_state.table_exists(table_ident)
333    }
334
335    /// Rename a table in the catalog.
336    async fn rename_table(
337        &self,
338        src_table_ident: &TableIdent,
339        dst_table_ident: &TableIdent,
340    ) -> Result<()> {
341        let mut root_namespace_state = self.root_namespace_state.lock().await;
342
343        let mut new_root_namespace_state = root_namespace_state.clone();
344        let metadata_location = new_root_namespace_state
345            .get_existing_table_location(src_table_ident)?
346            .clone();
347        new_root_namespace_state.remove_existing_table(src_table_ident)?;
348        new_root_namespace_state.insert_new_table(dst_table_ident, metadata_location)?;
349        *root_namespace_state = new_root_namespace_state;
350
351        Ok(())
352    }
353
354    async fn register_table(
355        &self,
356        table_ident: &TableIdent,
357        metadata_location: String,
358    ) -> Result<Table> {
359        let mut root_namespace_state = self.root_namespace_state.lock().await;
360        root_namespace_state.insert_new_table(&table_ident.clone(), metadata_location.clone())?;
361
362        let metadata = TableMetadata::read_from(&self.file_io, &metadata_location).await?;
363
364        Table::builder()
365            .file_io(self.file_io.clone())
366            .metadata_location(metadata_location)
367            .metadata(metadata)
368            .identifier(table_ident.clone())
369            .build()
370    }
371
372    /// Update a table in the catalog.
373    async fn update_table(&self, commit: TableCommit) -> Result<Table> {
374        let mut root_namespace_state = self.root_namespace_state.lock().await;
375
376        let current_table = self
377            .load_table_from_locked_state(commit.identifier(), &root_namespace_state)
378            .await?;
379
380        // Apply TableCommit to get staged table
381        let staged_table = commit.apply(current_table)?;
382
383        // Write table metadata to the new location
384        staged_table
385            .metadata()
386            .write_to(
387                staged_table.file_io(),
388                staged_table.metadata_location_result()?,
389            )
390            .await?;
391
392        // Flip the pointer to reference the new metadata file.
393        let updated_table = root_namespace_state.commit_table_update(staged_table)?;
394
395        Ok(updated_table)
396    }
397}
398
399#[cfg(test)]
400pub(crate) mod tests {
401    use std::collections::HashSet;
402    use std::hash::Hash;
403    use std::iter::FromIterator;
404    use std::vec;
405
406    use regex::Regex;
407    use tempfile::TempDir;
408
409    use super::*;
410    use crate::io::FileIO;
411    use crate::spec::{NestedField, PartitionSpec, PrimitiveType, Schema, SortOrder, Type};
412    use crate::transaction::{ApplyTransactionAction, Transaction};
413
414    fn temp_path() -> String {
415        let temp_dir = TempDir::new().unwrap();
416        temp_dir.path().to_str().unwrap().to_string()
417    }
418
419    pub(crate) async fn new_memory_catalog() -> impl Catalog {
420        let warehouse_location = temp_path();
421        MemoryCatalogBuilder::default()
422            .load(
423                "memory",
424                HashMap::from([(MEMORY_CATALOG_WAREHOUSE.to_string(), warehouse_location)]),
425            )
426            .await
427            .unwrap()
428    }
429
430    async fn create_namespace<C: Catalog>(catalog: &C, namespace_ident: &NamespaceIdent) {
431        let _ = catalog
432            .create_namespace(namespace_ident, HashMap::new())
433            .await
434            .unwrap();
435    }
436
437    async fn create_namespaces<C: Catalog>(catalog: &C, namespace_idents: &Vec<&NamespaceIdent>) {
438        for namespace_ident in namespace_idents {
439            let _ = create_namespace(catalog, namespace_ident).await;
440        }
441    }
442
443    fn to_set<T: Eq + Hash>(vec: Vec<T>) -> HashSet<T> {
444        HashSet::from_iter(vec)
445    }
446
447    fn simple_table_schema() -> Schema {
448        Schema::builder()
449            .with_fields(vec![
450                NestedField::required(1, "foo", Type::Primitive(PrimitiveType::Int)).into(),
451            ])
452            .build()
453            .unwrap()
454    }
455
456    async fn create_table<C: Catalog>(catalog: &C, table_ident: &TableIdent) -> Table {
457        catalog
458            .create_table(
459                &table_ident.namespace,
460                TableCreation::builder()
461                    .name(table_ident.name().into())
462                    .schema(simple_table_schema())
463                    .build(),
464            )
465            .await
466            .unwrap()
467    }
468
469    async fn create_tables<C: Catalog>(catalog: &C, table_idents: Vec<&TableIdent>) {
470        for table_ident in table_idents {
471            create_table(catalog, table_ident).await;
472        }
473    }
474
475    async fn create_table_with_namespace<C: Catalog>(catalog: &C) -> Table {
476        let namespace_ident = NamespaceIdent::new("abc".into());
477        create_namespace(catalog, &namespace_ident).await;
478
479        let table_ident = TableIdent::new(namespace_ident, "test".to_string());
480        create_table(catalog, &table_ident).await
481    }
482
483    fn assert_table_eq(table: &Table, expected_table_ident: &TableIdent, expected_schema: &Schema) {
484        assert_eq!(table.identifier(), expected_table_ident);
485
486        let metadata = table.metadata();
487
488        assert_eq!(metadata.current_schema().as_ref(), expected_schema);
489
490        let expected_partition_spec = PartitionSpec::builder((*expected_schema).clone())
491            .with_spec_id(0)
492            .build()
493            .unwrap();
494
495        assert_eq!(
496            metadata
497                .partition_specs_iter()
498                .map(|p| p.as_ref())
499                .collect_vec(),
500            vec![&expected_partition_spec]
501        );
502
503        let expected_sorted_order = SortOrder::builder()
504            .with_order_id(0)
505            .with_fields(vec![])
506            .build(expected_schema)
507            .unwrap();
508
509        assert_eq!(
510            metadata
511                .sort_orders_iter()
512                .map(|s| s.as_ref())
513                .collect_vec(),
514            vec![&expected_sorted_order]
515        );
516
517        assert_eq!(metadata.properties(), &HashMap::new());
518
519        assert!(!table.readonly());
520    }
521
522    const UUID_REGEX_STR: &str = "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}";
523
524    fn assert_table_metadata_location_matches(table: &Table, regex_str: &str) {
525        let actual = table.metadata_location().unwrap().to_string();
526        let regex = Regex::new(regex_str).unwrap();
527        assert!(
528            regex.is_match(&actual),
529            "Expected metadata location to match regex, but got location: {actual} and regex: {regex}"
530        )
531    }
532
533    #[tokio::test]
534    async fn test_list_namespaces_returns_empty_vector() {
535        let catalog = new_memory_catalog().await;
536
537        assert_eq!(catalog.list_namespaces(None).await.unwrap(), vec![]);
538    }
539
540    #[tokio::test]
541    async fn test_list_namespaces_returns_single_namespace() {
542        let catalog = new_memory_catalog().await;
543        let namespace_ident = NamespaceIdent::new("abc".into());
544        create_namespace(&catalog, &namespace_ident).await;
545
546        assert_eq!(catalog.list_namespaces(None).await.unwrap(), vec![
547            namespace_ident
548        ]);
549    }
550
551    #[tokio::test]
552    async fn test_list_namespaces_returns_multiple_namespaces() {
553        let catalog = new_memory_catalog().await;
554        let namespace_ident_1 = NamespaceIdent::new("a".into());
555        let namespace_ident_2 = NamespaceIdent::new("b".into());
556        create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await;
557
558        assert_eq!(
559            to_set(catalog.list_namespaces(None).await.unwrap()),
560            to_set(vec![namespace_ident_1, namespace_ident_2])
561        );
562    }
563
564    #[tokio::test]
565    async fn test_list_namespaces_returns_only_top_level_namespaces() {
566        let catalog = new_memory_catalog().await;
567        let namespace_ident_1 = NamespaceIdent::new("a".into());
568        let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
569        let namespace_ident_3 = NamespaceIdent::new("b".into());
570        create_namespaces(&catalog, &vec![
571            &namespace_ident_1,
572            &namespace_ident_2,
573            &namespace_ident_3,
574        ])
575        .await;
576
577        assert_eq!(
578            to_set(catalog.list_namespaces(None).await.unwrap()),
579            to_set(vec![namespace_ident_1, namespace_ident_3])
580        );
581    }
582
583    #[tokio::test]
584    async fn test_list_namespaces_returns_no_namespaces_under_parent() {
585        let catalog = new_memory_catalog().await;
586        let namespace_ident_1 = NamespaceIdent::new("a".into());
587        let namespace_ident_2 = NamespaceIdent::new("b".into());
588        create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await;
589
590        assert_eq!(
591            catalog
592                .list_namespaces(Some(&namespace_ident_1))
593                .await
594                .unwrap(),
595            vec![]
596        );
597    }
598
599    #[tokio::test]
600    async fn test_list_namespaces_returns_namespace_under_parent() {
601        let catalog = new_memory_catalog().await;
602        let namespace_ident_1 = NamespaceIdent::new("a".into());
603        let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
604        let namespace_ident_3 = NamespaceIdent::new("c".into());
605        create_namespaces(&catalog, &vec![
606            &namespace_ident_1,
607            &namespace_ident_2,
608            &namespace_ident_3,
609        ])
610        .await;
611
612        assert_eq!(
613            to_set(catalog.list_namespaces(None).await.unwrap()),
614            to_set(vec![namespace_ident_1.clone(), namespace_ident_3])
615        );
616
617        assert_eq!(
618            catalog
619                .list_namespaces(Some(&namespace_ident_1))
620                .await
621                .unwrap(),
622            vec![namespace_ident_2]
623        );
624    }
625
626    #[tokio::test]
627    async fn test_list_namespaces_returns_multiple_namespaces_under_parent() {
628        let catalog = new_memory_catalog().await;
629        let namespace_ident_1 = NamespaceIdent::new("a".to_string());
630        let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "a"]).unwrap();
631        let namespace_ident_3 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
632        let namespace_ident_4 = NamespaceIdent::from_strs(vec!["a", "c"]).unwrap();
633        let namespace_ident_5 = NamespaceIdent::new("b".into());
634        create_namespaces(&catalog, &vec![
635            &namespace_ident_1,
636            &namespace_ident_2,
637            &namespace_ident_3,
638            &namespace_ident_4,
639            &namespace_ident_5,
640        ])
641        .await;
642
643        assert_eq!(
644            to_set(
645                catalog
646                    .list_namespaces(Some(&namespace_ident_1))
647                    .await
648                    .unwrap()
649            ),
650            to_set(vec![
651                namespace_ident_2,
652                namespace_ident_3,
653                namespace_ident_4,
654            ])
655        );
656    }
657
658    #[tokio::test]
659    async fn test_namespace_exists_returns_false() {
660        let catalog = new_memory_catalog().await;
661        let namespace_ident = NamespaceIdent::new("a".into());
662        create_namespace(&catalog, &namespace_ident).await;
663
664        assert!(
665            !catalog
666                .namespace_exists(&NamespaceIdent::new("b".into()))
667                .await
668                .unwrap()
669        );
670    }
671
672    #[tokio::test]
673    async fn test_namespace_exists_returns_true() {
674        let catalog = new_memory_catalog().await;
675        let namespace_ident = NamespaceIdent::new("a".into());
676        create_namespace(&catalog, &namespace_ident).await;
677
678        assert!(catalog.namespace_exists(&namespace_ident).await.unwrap());
679    }
680
681    #[tokio::test]
682    async fn test_create_namespace_with_empty_properties() {
683        let catalog = new_memory_catalog().await;
684        let namespace_ident = NamespaceIdent::new("a".into());
685
686        assert_eq!(
687            catalog
688                .create_namespace(&namespace_ident, HashMap::new())
689                .await
690                .unwrap(),
691            Namespace::new(namespace_ident.clone())
692        );
693
694        assert_eq!(
695            catalog.get_namespace(&namespace_ident).await.unwrap(),
696            Namespace::with_properties(namespace_ident, HashMap::new())
697        );
698    }
699
700    #[tokio::test]
701    async fn test_create_namespace_with_properties() {
702        let catalog = new_memory_catalog().await;
703        let namespace_ident = NamespaceIdent::new("abc".into());
704
705        let mut properties: HashMap<String, String> = HashMap::new();
706        properties.insert("k".into(), "v".into());
707
708        assert_eq!(
709            catalog
710                .create_namespace(&namespace_ident, properties.clone())
711                .await
712                .unwrap(),
713            Namespace::with_properties(namespace_ident.clone(), properties.clone())
714        );
715
716        assert_eq!(
717            catalog.get_namespace(&namespace_ident).await.unwrap(),
718            Namespace::with_properties(namespace_ident, properties)
719        );
720    }
721
722    #[tokio::test]
723    async fn test_create_namespace_throws_error_if_namespace_already_exists() {
724        let catalog = new_memory_catalog().await;
725        let namespace_ident = NamespaceIdent::new("a".into());
726        create_namespace(&catalog, &namespace_ident).await;
727
728        assert_eq!(
729            catalog
730                .create_namespace(&namespace_ident, HashMap::new())
731                .await
732                .unwrap_err()
733                .to_string(),
734            format!(
735                "NamespaceAlreadyExists => Cannot create namespace {:?}. Namespace already exists.",
736                &namespace_ident
737            )
738        );
739
740        assert_eq!(
741            catalog.get_namespace(&namespace_ident).await.unwrap(),
742            Namespace::with_properties(namespace_ident, HashMap::new())
743        );
744    }
745
746    #[tokio::test]
747    async fn test_create_nested_namespace() {
748        let catalog = new_memory_catalog().await;
749        let parent_namespace_ident = NamespaceIdent::new("a".into());
750        create_namespace(&catalog, &parent_namespace_ident).await;
751
752        let child_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
753
754        assert_eq!(
755            catalog
756                .create_namespace(&child_namespace_ident, HashMap::new())
757                .await
758                .unwrap(),
759            Namespace::new(child_namespace_ident.clone())
760        );
761
762        assert_eq!(
763            catalog.get_namespace(&child_namespace_ident).await.unwrap(),
764            Namespace::with_properties(child_namespace_ident, HashMap::new())
765        );
766    }
767
768    #[tokio::test]
769    async fn test_create_deeply_nested_namespace() {
770        let catalog = new_memory_catalog().await;
771        let namespace_ident_a = NamespaceIdent::new("a".into());
772        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
773        create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
774
775        let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
776
777        assert_eq!(
778            catalog
779                .create_namespace(&namespace_ident_a_b_c, HashMap::new())
780                .await
781                .unwrap(),
782            Namespace::new(namespace_ident_a_b_c.clone())
783        );
784
785        assert_eq!(
786            catalog.get_namespace(&namespace_ident_a_b_c).await.unwrap(),
787            Namespace::with_properties(namespace_ident_a_b_c, HashMap::new())
788        );
789    }
790
791    #[tokio::test]
792    async fn test_create_nested_namespace_throws_error_if_top_level_namespace_doesnt_exist() {
793        let catalog = new_memory_catalog().await;
794
795        let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
796
797        assert_eq!(
798            catalog
799                .create_namespace(&nested_namespace_ident, HashMap::new())
800                .await
801                .unwrap_err()
802                .to_string(),
803            format!(
804                "NamespaceNotFound => No such namespace: {:?}",
805                NamespaceIdent::new("a".into())
806            )
807        );
808
809        assert_eq!(catalog.list_namespaces(None).await.unwrap(), vec![]);
810    }
811
812    #[tokio::test]
813    async fn test_create_deeply_nested_namespace_throws_error_if_intermediate_namespace_doesnt_exist()
814     {
815        let catalog = new_memory_catalog().await;
816
817        let namespace_ident_a = NamespaceIdent::new("a".into());
818        create_namespace(&catalog, &namespace_ident_a).await;
819
820        let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
821
822        assert_eq!(
823            catalog
824                .create_namespace(&namespace_ident_a_b_c, HashMap::new())
825                .await
826                .unwrap_err()
827                .to_string(),
828            format!(
829                "NamespaceNotFound => No such namespace: {:?}",
830                NamespaceIdent::from_strs(vec!["a", "b"]).unwrap()
831            )
832        );
833
834        assert_eq!(catalog.list_namespaces(None).await.unwrap(), vec![
835            namespace_ident_a.clone()
836        ]);
837
838        assert_eq!(
839            catalog
840                .list_namespaces(Some(&namespace_ident_a))
841                .await
842                .unwrap(),
843            vec![]
844        );
845    }
846
847    #[tokio::test]
848    async fn test_get_namespace() {
849        let catalog = new_memory_catalog().await;
850        let namespace_ident = NamespaceIdent::new("abc".into());
851
852        let mut properties: HashMap<String, String> = HashMap::new();
853        properties.insert("k".into(), "v".into());
854        let _ = catalog
855            .create_namespace(&namespace_ident, properties.clone())
856            .await
857            .unwrap();
858
859        assert_eq!(
860            catalog.get_namespace(&namespace_ident).await.unwrap(),
861            Namespace::with_properties(namespace_ident, properties)
862        )
863    }
864
865    #[tokio::test]
866    async fn test_get_nested_namespace() {
867        let catalog = new_memory_catalog().await;
868        let namespace_ident_a = NamespaceIdent::new("a".into());
869        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
870        create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
871
872        assert_eq!(
873            catalog.get_namespace(&namespace_ident_a_b).await.unwrap(),
874            Namespace::with_properties(namespace_ident_a_b, HashMap::new())
875        );
876    }
877
878    #[tokio::test]
879    async fn test_get_deeply_nested_namespace() {
880        let catalog = new_memory_catalog().await;
881        let namespace_ident_a = NamespaceIdent::new("a".into());
882        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
883        let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
884        create_namespaces(&catalog, &vec![
885            &namespace_ident_a,
886            &namespace_ident_a_b,
887            &namespace_ident_a_b_c,
888        ])
889        .await;
890
891        assert_eq!(
892            catalog.get_namespace(&namespace_ident_a_b_c).await.unwrap(),
893            Namespace::with_properties(namespace_ident_a_b_c, HashMap::new())
894        );
895    }
896
897    #[tokio::test]
898    async fn test_get_namespace_throws_error_if_namespace_doesnt_exist() {
899        let catalog = new_memory_catalog().await;
900        create_namespace(&catalog, &NamespaceIdent::new("a".into())).await;
901
902        let non_existent_namespace_ident = NamespaceIdent::new("b".into());
903        assert_eq!(
904            catalog
905                .get_namespace(&non_existent_namespace_ident)
906                .await
907                .unwrap_err()
908                .to_string(),
909            format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}")
910        )
911    }
912
913    #[tokio::test]
914    async fn test_update_namespace() {
915        let catalog = new_memory_catalog().await;
916        let namespace_ident = NamespaceIdent::new("abc".into());
917        create_namespace(&catalog, &namespace_ident).await;
918
919        let mut new_properties: HashMap<String, String> = HashMap::new();
920        new_properties.insert("k".into(), "v".into());
921
922        catalog
923            .update_namespace(&namespace_ident, new_properties.clone())
924            .await
925            .unwrap();
926
927        assert_eq!(
928            catalog.get_namespace(&namespace_ident).await.unwrap(),
929            Namespace::with_properties(namespace_ident, new_properties)
930        )
931    }
932
933    #[tokio::test]
934    async fn test_update_nested_namespace() {
935        let catalog = new_memory_catalog().await;
936        let namespace_ident_a = NamespaceIdent::new("a".into());
937        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
938        create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
939
940        let mut new_properties = HashMap::new();
941        new_properties.insert("k".into(), "v".into());
942
943        catalog
944            .update_namespace(&namespace_ident_a_b, new_properties.clone())
945            .await
946            .unwrap();
947
948        assert_eq!(
949            catalog.get_namespace(&namespace_ident_a_b).await.unwrap(),
950            Namespace::with_properties(namespace_ident_a_b, new_properties)
951        );
952    }
953
954    #[tokio::test]
955    async fn test_update_deeply_nested_namespace() {
956        let catalog = new_memory_catalog().await;
957        let namespace_ident_a = NamespaceIdent::new("a".into());
958        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
959        let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
960        create_namespaces(&catalog, &vec![
961            &namespace_ident_a,
962            &namespace_ident_a_b,
963            &namespace_ident_a_b_c,
964        ])
965        .await;
966
967        let mut new_properties = HashMap::new();
968        new_properties.insert("k".into(), "v".into());
969
970        catalog
971            .update_namespace(&namespace_ident_a_b_c, new_properties.clone())
972            .await
973            .unwrap();
974
975        assert_eq!(
976            catalog.get_namespace(&namespace_ident_a_b_c).await.unwrap(),
977            Namespace::with_properties(namespace_ident_a_b_c, new_properties)
978        );
979    }
980
981    #[tokio::test]
982    async fn test_update_namespace_throws_error_if_namespace_doesnt_exist() {
983        let catalog = new_memory_catalog().await;
984        create_namespace(&catalog, &NamespaceIdent::new("abc".into())).await;
985
986        let non_existent_namespace_ident = NamespaceIdent::new("def".into());
987        assert_eq!(
988            catalog
989                .update_namespace(&non_existent_namespace_ident, HashMap::new())
990                .await
991                .unwrap_err()
992                .to_string(),
993            format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}")
994        )
995    }
996
997    #[tokio::test]
998    async fn test_drop_namespace() {
999        let catalog = new_memory_catalog().await;
1000        let namespace_ident = NamespaceIdent::new("abc".into());
1001        create_namespace(&catalog, &namespace_ident).await;
1002
1003        catalog.drop_namespace(&namespace_ident).await.unwrap();
1004
1005        assert!(!catalog.namespace_exists(&namespace_ident).await.unwrap())
1006    }
1007
1008    #[tokio::test]
1009    async fn test_drop_nested_namespace() {
1010        let catalog = new_memory_catalog().await;
1011        let namespace_ident_a = NamespaceIdent::new("a".into());
1012        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1013        create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1014
1015        catalog.drop_namespace(&namespace_ident_a_b).await.unwrap();
1016
1017        assert!(
1018            !catalog
1019                .namespace_exists(&namespace_ident_a_b)
1020                .await
1021                .unwrap()
1022        );
1023
1024        assert!(catalog.namespace_exists(&namespace_ident_a).await.unwrap());
1025    }
1026
1027    #[tokio::test]
1028    async fn test_drop_deeply_nested_namespace() {
1029        let catalog = new_memory_catalog().await;
1030        let namespace_ident_a = NamespaceIdent::new("a".into());
1031        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1032        let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
1033        create_namespaces(&catalog, &vec![
1034            &namespace_ident_a,
1035            &namespace_ident_a_b,
1036            &namespace_ident_a_b_c,
1037        ])
1038        .await;
1039
1040        catalog
1041            .drop_namespace(&namespace_ident_a_b_c)
1042            .await
1043            .unwrap();
1044
1045        assert!(
1046            !catalog
1047                .namespace_exists(&namespace_ident_a_b_c)
1048                .await
1049                .unwrap()
1050        );
1051
1052        assert!(
1053            catalog
1054                .namespace_exists(&namespace_ident_a_b)
1055                .await
1056                .unwrap()
1057        );
1058
1059        assert!(catalog.namespace_exists(&namespace_ident_a).await.unwrap());
1060    }
1061
1062    #[tokio::test]
1063    async fn test_drop_namespace_throws_error_if_namespace_doesnt_exist() {
1064        let catalog = new_memory_catalog().await;
1065
1066        let non_existent_namespace_ident = NamespaceIdent::new("abc".into());
1067        assert_eq!(
1068            catalog
1069                .drop_namespace(&non_existent_namespace_ident)
1070                .await
1071                .unwrap_err()
1072                .to_string(),
1073            format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}")
1074        )
1075    }
1076
1077    #[tokio::test]
1078    async fn test_drop_namespace_throws_error_if_nested_namespace_doesnt_exist() {
1079        let catalog = new_memory_catalog().await;
1080        create_namespace(&catalog, &NamespaceIdent::new("a".into())).await;
1081
1082        let non_existent_namespace_ident =
1083            NamespaceIdent::from_vec(vec!["a".into(), "b".into()]).unwrap();
1084        assert_eq!(
1085            catalog
1086                .drop_namespace(&non_existent_namespace_ident)
1087                .await
1088                .unwrap_err()
1089                .to_string(),
1090            format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}")
1091        )
1092    }
1093
1094    #[tokio::test]
1095    async fn test_dropping_a_namespace_also_drops_namespaces_nested_under_that_one() {
1096        let catalog = new_memory_catalog().await;
1097        let namespace_ident_a = NamespaceIdent::new("a".into());
1098        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1099        create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1100
1101        catalog.drop_namespace(&namespace_ident_a).await.unwrap();
1102
1103        assert!(!catalog.namespace_exists(&namespace_ident_a).await.unwrap());
1104
1105        assert!(
1106            !catalog
1107                .namespace_exists(&namespace_ident_a_b)
1108                .await
1109                .unwrap()
1110        );
1111    }
1112
1113    #[tokio::test]
1114    async fn test_create_table_with_location() {
1115        let tmp_dir = TempDir::new().unwrap();
1116        let catalog = new_memory_catalog().await;
1117        let namespace_ident = NamespaceIdent::new("a".into());
1118        create_namespace(&catalog, &namespace_ident).await;
1119
1120        let table_name = "abc";
1121        let location = tmp_dir.path().to_str().unwrap().to_string();
1122        let table_creation = TableCreation::builder()
1123            .name(table_name.into())
1124            .location(location.clone())
1125            .schema(simple_table_schema())
1126            .build();
1127
1128        let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1129
1130        assert_table_eq(
1131            &catalog
1132                .create_table(&namespace_ident, table_creation)
1133                .await
1134                .unwrap(),
1135            &expected_table_ident,
1136            &simple_table_schema(),
1137        );
1138
1139        let table = catalog.load_table(&expected_table_ident).await.unwrap();
1140
1141        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1142
1143        assert!(
1144            table
1145                .metadata_location()
1146                .unwrap()
1147                .to_string()
1148                .starts_with(&location)
1149        )
1150    }
1151
1152    #[tokio::test]
1153    async fn test_create_table_falls_back_to_namespace_location_if_table_location_is_missing() {
1154        let warehouse_location = temp_path();
1155        let catalog = MemoryCatalogBuilder::default()
1156            .load(
1157                "memory",
1158                HashMap::from([(
1159                    MEMORY_CATALOG_WAREHOUSE.to_string(),
1160                    warehouse_location.clone(),
1161                )]),
1162            )
1163            .await
1164            .unwrap();
1165
1166        let namespace_ident = NamespaceIdent::new("a".into());
1167        let mut namespace_properties = HashMap::new();
1168        let namespace_location = temp_path();
1169        namespace_properties.insert(LOCATION.to_string(), namespace_location.to_string());
1170        catalog
1171            .create_namespace(&namespace_ident, namespace_properties)
1172            .await
1173            .unwrap();
1174
1175        let table_name = "tbl1";
1176        let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1177        let expected_table_metadata_location_regex =
1178            format!("^{namespace_location}/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$",);
1179
1180        let table = catalog
1181            .create_table(
1182                &namespace_ident,
1183                TableCreation::builder()
1184                    .name(table_name.into())
1185                    .schema(simple_table_schema())
1186                    // no location specified for table
1187                    .build(),
1188            )
1189            .await
1190            .unwrap();
1191        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1192        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1193
1194        let table = catalog.load_table(&expected_table_ident).await.unwrap();
1195        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1196        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1197    }
1198
1199    #[tokio::test]
1200    async fn test_create_table_in_nested_namespace_falls_back_to_nested_namespace_location_if_table_location_is_missing()
1201     {
1202        let warehouse_location = temp_path();
1203        let catalog = MemoryCatalogBuilder::default()
1204            .load(
1205                "memory",
1206                HashMap::from([(
1207                    MEMORY_CATALOG_WAREHOUSE.to_string(),
1208                    warehouse_location.clone(),
1209                )]),
1210            )
1211            .await
1212            .unwrap();
1213
1214        let namespace_ident = NamespaceIdent::new("a".into());
1215        let mut namespace_properties = HashMap::new();
1216        let namespace_location = temp_path();
1217        namespace_properties.insert(LOCATION.to_string(), namespace_location.to_string());
1218        catalog
1219            .create_namespace(&namespace_ident, namespace_properties)
1220            .await
1221            .unwrap();
1222
1223        let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1224        let mut nested_namespace_properties = HashMap::new();
1225        let nested_namespace_location = temp_path();
1226        nested_namespace_properties
1227            .insert(LOCATION.to_string(), nested_namespace_location.to_string());
1228        catalog
1229            .create_namespace(&nested_namespace_ident, nested_namespace_properties)
1230            .await
1231            .unwrap();
1232
1233        let table_name = "tbl1";
1234        let expected_table_ident =
1235            TableIdent::new(nested_namespace_ident.clone(), table_name.into());
1236        let expected_table_metadata_location_regex = format!(
1237            "^{nested_namespace_location}/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$",
1238        );
1239
1240        let table = catalog
1241            .create_table(
1242                &nested_namespace_ident,
1243                TableCreation::builder()
1244                    .name(table_name.into())
1245                    .schema(simple_table_schema())
1246                    // no location specified for table
1247                    .build(),
1248            )
1249            .await
1250            .unwrap();
1251        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1252        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1253
1254        let table = catalog.load_table(&expected_table_ident).await.unwrap();
1255        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1256        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1257    }
1258
1259    #[tokio::test]
1260    async fn test_create_table_falls_back_to_warehouse_location_if_both_table_location_and_namespace_location_are_missing()
1261     {
1262        let warehouse_location = temp_path();
1263        let catalog = MemoryCatalogBuilder::default()
1264            .load(
1265                "memory",
1266                HashMap::from([(
1267                    MEMORY_CATALOG_WAREHOUSE.to_string(),
1268                    warehouse_location.clone(),
1269                )]),
1270            )
1271            .await
1272            .unwrap();
1273
1274        let namespace_ident = NamespaceIdent::new("a".into());
1275        // note: no location specified in namespace_properties
1276        let namespace_properties = HashMap::new();
1277        catalog
1278            .create_namespace(&namespace_ident, namespace_properties)
1279            .await
1280            .unwrap();
1281
1282        let table_name = "tbl1";
1283        let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1284        let expected_table_metadata_location_regex =
1285            format!("^{warehouse_location}/a/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$");
1286
1287        let table = catalog
1288            .create_table(
1289                &namespace_ident,
1290                TableCreation::builder()
1291                    .name(table_name.into())
1292                    .schema(simple_table_schema())
1293                    // no location specified for table
1294                    .build(),
1295            )
1296            .await
1297            .unwrap();
1298        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1299        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1300
1301        let table = catalog.load_table(&expected_table_ident).await.unwrap();
1302        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1303        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1304    }
1305
1306    #[tokio::test]
1307    async fn test_create_table_in_nested_namespace_falls_back_to_warehouse_location_if_both_table_location_and_namespace_location_are_missing()
1308     {
1309        let warehouse_location = temp_path();
1310        let catalog = MemoryCatalogBuilder::default()
1311            .load(
1312                "memory",
1313                HashMap::from([(
1314                    MEMORY_CATALOG_WAREHOUSE.to_string(),
1315                    warehouse_location.clone(),
1316                )]),
1317            )
1318            .await
1319            .unwrap();
1320
1321        let namespace_ident = NamespaceIdent::new("a".into());
1322        catalog
1323            // note: no location specified in namespace_properties
1324            .create_namespace(&namespace_ident, HashMap::new())
1325            .await
1326            .unwrap();
1327
1328        let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1329        catalog
1330            // note: no location specified in namespace_properties
1331            .create_namespace(&nested_namespace_ident, HashMap::new())
1332            .await
1333            .unwrap();
1334
1335        let table_name = "tbl1";
1336        let expected_table_ident =
1337            TableIdent::new(nested_namespace_ident.clone(), table_name.into());
1338        let expected_table_metadata_location_regex = format!(
1339            "^{warehouse_location}/a/b/tbl1/metadata/00000-{UUID_REGEX_STR}.metadata.json$"
1340        );
1341
1342        let table = catalog
1343            .create_table(
1344                &nested_namespace_ident,
1345                TableCreation::builder()
1346                    .name(table_name.into())
1347                    .schema(simple_table_schema())
1348                    // no location specified for table
1349                    .build(),
1350            )
1351            .await
1352            .unwrap();
1353        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1354        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1355
1356        let table = catalog.load_table(&expected_table_ident).await.unwrap();
1357        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1358        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1359    }
1360
1361    #[tokio::test]
1362    async fn test_create_table_throws_error_if_table_location_and_namespace_location_and_warehouse_location_are_missing()
1363     {
1364        let catalog = MemoryCatalogBuilder::default()
1365            .load("memory", HashMap::from([]))
1366            .await;
1367
1368        assert!(catalog.is_err());
1369        assert_eq!(
1370            catalog.unwrap_err().to_string(),
1371            "DataInvalid => Catalog warehouse is required"
1372        );
1373    }
1374
1375    #[tokio::test]
1376    async fn test_create_table_throws_error_if_table_with_same_name_already_exists() {
1377        let catalog = new_memory_catalog().await;
1378        let namespace_ident = NamespaceIdent::new("a".into());
1379        create_namespace(&catalog, &namespace_ident).await;
1380        let table_name = "tbl1";
1381        let table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1382        create_table(&catalog, &table_ident).await;
1383
1384        let tmp_dir = TempDir::new().unwrap();
1385        let location = tmp_dir.path().to_str().unwrap().to_string();
1386
1387        assert_eq!(
1388            catalog
1389                .create_table(
1390                    &namespace_ident,
1391                    TableCreation::builder()
1392                        .name(table_name.into())
1393                        .schema(simple_table_schema())
1394                        .location(location)
1395                        .build()
1396                )
1397                .await
1398                .unwrap_err()
1399                .to_string(),
1400            format!(
1401                "TableAlreadyExists => Cannot create table {:?}. Table already exists.",
1402                &table_ident
1403            )
1404        );
1405    }
1406
1407    #[tokio::test]
1408    async fn test_list_tables_returns_empty_vector() {
1409        let catalog = new_memory_catalog().await;
1410        let namespace_ident = NamespaceIdent::new("a".into());
1411        create_namespace(&catalog, &namespace_ident).await;
1412
1413        assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![]);
1414    }
1415
1416    #[tokio::test]
1417    async fn test_list_tables_returns_a_single_table() {
1418        let catalog = new_memory_catalog().await;
1419        let namespace_ident = NamespaceIdent::new("n1".into());
1420        create_namespace(&catalog, &namespace_ident).await;
1421
1422        let table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1423        create_table(&catalog, &table_ident).await;
1424
1425        assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![
1426            table_ident
1427        ]);
1428    }
1429
1430    #[tokio::test]
1431    async fn test_list_tables_returns_multiple_tables() {
1432        let catalog = new_memory_catalog().await;
1433        let namespace_ident = NamespaceIdent::new("n1".into());
1434        create_namespace(&catalog, &namespace_ident).await;
1435
1436        let table_ident_1 = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1437        let table_ident_2 = TableIdent::new(namespace_ident.clone(), "tbl2".into());
1438        let _ = create_tables(&catalog, vec![&table_ident_1, &table_ident_2]).await;
1439
1440        assert_eq!(
1441            to_set(catalog.list_tables(&namespace_ident).await.unwrap()),
1442            to_set(vec![table_ident_1, table_ident_2])
1443        );
1444    }
1445
1446    #[tokio::test]
1447    async fn test_list_tables_returns_tables_from_correct_namespace() {
1448        let catalog = new_memory_catalog().await;
1449        let namespace_ident_1 = NamespaceIdent::new("n1".into());
1450        let namespace_ident_2 = NamespaceIdent::new("n2".into());
1451        create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await;
1452
1453        let table_ident_1 = TableIdent::new(namespace_ident_1.clone(), "tbl1".into());
1454        let table_ident_2 = TableIdent::new(namespace_ident_1.clone(), "tbl2".into());
1455        let table_ident_3 = TableIdent::new(namespace_ident_2.clone(), "tbl1".into());
1456        let _ = create_tables(&catalog, vec![
1457            &table_ident_1,
1458            &table_ident_2,
1459            &table_ident_3,
1460        ])
1461        .await;
1462
1463        assert_eq!(
1464            to_set(catalog.list_tables(&namespace_ident_1).await.unwrap()),
1465            to_set(vec![table_ident_1, table_ident_2])
1466        );
1467
1468        assert_eq!(
1469            to_set(catalog.list_tables(&namespace_ident_2).await.unwrap()),
1470            to_set(vec![table_ident_3])
1471        );
1472    }
1473
1474    #[tokio::test]
1475    async fn test_list_tables_returns_table_under_nested_namespace() {
1476        let catalog = new_memory_catalog().await;
1477        let namespace_ident_a = NamespaceIdent::new("a".into());
1478        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1479        create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1480
1481        let table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl1".into());
1482        create_table(&catalog, &table_ident).await;
1483
1484        assert_eq!(
1485            catalog.list_tables(&namespace_ident_a_b).await.unwrap(),
1486            vec![table_ident]
1487        );
1488    }
1489
1490    #[tokio::test]
1491    async fn test_list_tables_throws_error_if_namespace_doesnt_exist() {
1492        let catalog = new_memory_catalog().await;
1493
1494        let non_existent_namespace_ident = NamespaceIdent::new("n1".into());
1495
1496        assert_eq!(
1497            catalog
1498                .list_tables(&non_existent_namespace_ident)
1499                .await
1500                .unwrap_err()
1501                .to_string(),
1502            format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}"),
1503        );
1504    }
1505
1506    #[tokio::test]
1507    async fn test_drop_table() {
1508        let catalog = new_memory_catalog().await;
1509        let namespace_ident = NamespaceIdent::new("n1".into());
1510        create_namespace(&catalog, &namespace_ident).await;
1511        let table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1512        create_table(&catalog, &table_ident).await;
1513
1514        catalog.drop_table(&table_ident).await.unwrap();
1515    }
1516
1517    #[tokio::test]
1518    async fn test_drop_table_drops_table_under_nested_namespace() {
1519        let catalog = new_memory_catalog().await;
1520        let namespace_ident_a = NamespaceIdent::new("a".into());
1521        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1522        create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1523
1524        let table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl1".into());
1525        create_table(&catalog, &table_ident).await;
1526
1527        catalog.drop_table(&table_ident).await.unwrap();
1528
1529        assert_eq!(
1530            catalog.list_tables(&namespace_ident_a_b).await.unwrap(),
1531            vec![]
1532        );
1533    }
1534
1535    #[tokio::test]
1536    async fn test_drop_table_throws_error_if_namespace_doesnt_exist() {
1537        let catalog = new_memory_catalog().await;
1538
1539        let non_existent_namespace_ident = NamespaceIdent::new("n1".into());
1540        let non_existent_table_ident =
1541            TableIdent::new(non_existent_namespace_ident.clone(), "tbl1".into());
1542
1543        assert_eq!(
1544            catalog
1545                .drop_table(&non_existent_table_ident)
1546                .await
1547                .unwrap_err()
1548                .to_string(),
1549            format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}"),
1550        );
1551    }
1552
1553    #[tokio::test]
1554    async fn test_drop_table_throws_error_if_table_doesnt_exist() {
1555        let catalog = new_memory_catalog().await;
1556        let namespace_ident = NamespaceIdent::new("n1".into());
1557        create_namespace(&catalog, &namespace_ident).await;
1558
1559        let non_existent_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1560
1561        assert_eq!(
1562            catalog
1563                .drop_table(&non_existent_table_ident)
1564                .await
1565                .unwrap_err()
1566                .to_string(),
1567            format!("TableNotFound => No such table: {non_existent_table_ident:?}"),
1568        );
1569    }
1570
1571    #[tokio::test]
1572    async fn test_table_exists_returns_true() {
1573        let catalog = new_memory_catalog().await;
1574        let namespace_ident = NamespaceIdent::new("n1".into());
1575        create_namespace(&catalog, &namespace_ident).await;
1576        let table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1577        create_table(&catalog, &table_ident).await;
1578
1579        assert!(catalog.table_exists(&table_ident).await.unwrap());
1580    }
1581
1582    #[tokio::test]
1583    async fn test_table_exists_returns_false() {
1584        let catalog = new_memory_catalog().await;
1585        let namespace_ident = NamespaceIdent::new("n1".into());
1586        create_namespace(&catalog, &namespace_ident).await;
1587        let non_existent_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1588
1589        assert!(
1590            !catalog
1591                .table_exists(&non_existent_table_ident)
1592                .await
1593                .unwrap()
1594        );
1595    }
1596
1597    #[tokio::test]
1598    async fn test_table_exists_under_nested_namespace() {
1599        let catalog = new_memory_catalog().await;
1600        let namespace_ident_a = NamespaceIdent::new("a".into());
1601        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1602        create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1603
1604        let table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl1".into());
1605        create_table(&catalog, &table_ident).await;
1606
1607        assert!(catalog.table_exists(&table_ident).await.unwrap());
1608
1609        let non_existent_table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl2".into());
1610        assert!(
1611            !catalog
1612                .table_exists(&non_existent_table_ident)
1613                .await
1614                .unwrap()
1615        );
1616    }
1617
1618    #[tokio::test]
1619    async fn test_table_exists_throws_error_if_namespace_doesnt_exist() {
1620        let catalog = new_memory_catalog().await;
1621
1622        let non_existent_namespace_ident = NamespaceIdent::new("n1".into());
1623        let non_existent_table_ident =
1624            TableIdent::new(non_existent_namespace_ident.clone(), "tbl1".into());
1625
1626        assert_eq!(
1627            catalog
1628                .table_exists(&non_existent_table_ident)
1629                .await
1630                .unwrap_err()
1631                .to_string(),
1632            format!("NamespaceNotFound => No such namespace: {non_existent_namespace_ident:?}"),
1633        );
1634    }
1635
1636    #[tokio::test]
1637    async fn test_rename_table_in_same_namespace() {
1638        let catalog = new_memory_catalog().await;
1639        let namespace_ident = NamespaceIdent::new("n1".into());
1640        create_namespace(&catalog, &namespace_ident).await;
1641        let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1642        let dst_table_ident = TableIdent::new(namespace_ident.clone(), "tbl2".into());
1643        create_table(&catalog, &src_table_ident).await;
1644
1645        catalog
1646            .rename_table(&src_table_ident, &dst_table_ident)
1647            .await
1648            .unwrap();
1649
1650        assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![
1651            dst_table_ident
1652        ],);
1653    }
1654
1655    #[tokio::test]
1656    async fn test_rename_table_across_namespaces() {
1657        let catalog = new_memory_catalog().await;
1658        let src_namespace_ident = NamespaceIdent::new("a".into());
1659        let dst_namespace_ident = NamespaceIdent::new("b".into());
1660        create_namespaces(&catalog, &vec![&src_namespace_ident, &dst_namespace_ident]).await;
1661        let src_table_ident = TableIdent::new(src_namespace_ident.clone(), "tbl1".into());
1662        let dst_table_ident = TableIdent::new(dst_namespace_ident.clone(), "tbl2".into());
1663        create_table(&catalog, &src_table_ident).await;
1664
1665        catalog
1666            .rename_table(&src_table_ident, &dst_table_ident)
1667            .await
1668            .unwrap();
1669
1670        assert_eq!(
1671            catalog.list_tables(&src_namespace_ident).await.unwrap(),
1672            vec![],
1673        );
1674
1675        assert_eq!(
1676            catalog.list_tables(&dst_namespace_ident).await.unwrap(),
1677            vec![dst_table_ident],
1678        );
1679    }
1680
1681    #[tokio::test]
1682    async fn test_rename_table_src_table_is_same_as_dst_table() {
1683        let catalog = new_memory_catalog().await;
1684        let namespace_ident = NamespaceIdent::new("n1".into());
1685        create_namespace(&catalog, &namespace_ident).await;
1686        let table_ident = TableIdent::new(namespace_ident.clone(), "tbl".into());
1687        create_table(&catalog, &table_ident).await;
1688
1689        catalog
1690            .rename_table(&table_ident, &table_ident)
1691            .await
1692            .unwrap();
1693
1694        assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![
1695            table_ident
1696        ],);
1697    }
1698
1699    #[tokio::test]
1700    async fn test_rename_table_across_nested_namespaces() {
1701        let catalog = new_memory_catalog().await;
1702        let namespace_ident_a = NamespaceIdent::new("a".into());
1703        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1704        let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
1705        create_namespaces(&catalog, &vec![
1706            &namespace_ident_a,
1707            &namespace_ident_a_b,
1708            &namespace_ident_a_b_c,
1709        ])
1710        .await;
1711
1712        let src_table_ident = TableIdent::new(namespace_ident_a_b_c.clone(), "tbl1".into());
1713        create_tables(&catalog, vec![&src_table_ident]).await;
1714
1715        let dst_table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl1".into());
1716        catalog
1717            .rename_table(&src_table_ident, &dst_table_ident)
1718            .await
1719            .unwrap();
1720
1721        assert!(!catalog.table_exists(&src_table_ident).await.unwrap());
1722
1723        assert!(catalog.table_exists(&dst_table_ident).await.unwrap());
1724    }
1725
1726    #[tokio::test]
1727    async fn test_rename_table_throws_error_if_src_namespace_doesnt_exist() {
1728        let catalog = new_memory_catalog().await;
1729
1730        let non_existent_src_namespace_ident = NamespaceIdent::new("n1".into());
1731        let src_table_ident =
1732            TableIdent::new(non_existent_src_namespace_ident.clone(), "tbl1".into());
1733
1734        let dst_namespace_ident = NamespaceIdent::new("n2".into());
1735        create_namespace(&catalog, &dst_namespace_ident).await;
1736        let dst_table_ident = TableIdent::new(dst_namespace_ident.clone(), "tbl1".into());
1737
1738        assert_eq!(
1739            catalog
1740                .rename_table(&src_table_ident, &dst_table_ident)
1741                .await
1742                .unwrap_err()
1743                .to_string(),
1744            format!("NamespaceNotFound => No such namespace: {non_existent_src_namespace_ident:?}"),
1745        );
1746    }
1747
1748    #[tokio::test]
1749    async fn test_rename_table_throws_error_if_dst_namespace_doesnt_exist() {
1750        let catalog = new_memory_catalog().await;
1751        let src_namespace_ident = NamespaceIdent::new("n1".into());
1752        let src_table_ident = TableIdent::new(src_namespace_ident.clone(), "tbl1".into());
1753        create_namespace(&catalog, &src_namespace_ident).await;
1754        create_table(&catalog, &src_table_ident).await;
1755
1756        let non_existent_dst_namespace_ident = NamespaceIdent::new("n2".into());
1757        let dst_table_ident =
1758            TableIdent::new(non_existent_dst_namespace_ident.clone(), "tbl1".into());
1759        assert_eq!(
1760            catalog
1761                .rename_table(&src_table_ident, &dst_table_ident)
1762                .await
1763                .unwrap_err()
1764                .to_string(),
1765            format!("NamespaceNotFound => No such namespace: {non_existent_dst_namespace_ident:?}"),
1766        );
1767    }
1768
1769    #[tokio::test]
1770    async fn test_rename_table_throws_error_if_src_table_doesnt_exist() {
1771        let catalog = new_memory_catalog().await;
1772        let namespace_ident = NamespaceIdent::new("n1".into());
1773        create_namespace(&catalog, &namespace_ident).await;
1774        let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1775        let dst_table_ident = TableIdent::new(namespace_ident.clone(), "tbl2".into());
1776
1777        assert_eq!(
1778            catalog
1779                .rename_table(&src_table_ident, &dst_table_ident)
1780                .await
1781                .unwrap_err()
1782                .to_string(),
1783            format!("TableNotFound => No such table: {src_table_ident:?}"),
1784        );
1785    }
1786
1787    #[tokio::test]
1788    async fn test_rename_table_throws_error_if_dst_table_already_exists() {
1789        let catalog = new_memory_catalog().await;
1790        let namespace_ident = NamespaceIdent::new("n1".into());
1791        create_namespace(&catalog, &namespace_ident).await;
1792        let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1793        let dst_table_ident = TableIdent::new(namespace_ident.clone(), "tbl2".into());
1794        create_tables(&catalog, vec![&src_table_ident, &dst_table_ident]).await;
1795
1796        assert_eq!(
1797            catalog
1798                .rename_table(&src_table_ident, &dst_table_ident)
1799                .await
1800                .unwrap_err()
1801                .to_string(),
1802            format!(
1803                "TableAlreadyExists => Cannot create table {:?}. Table already exists.",
1804                &dst_table_ident
1805            ),
1806        );
1807    }
1808
1809    #[tokio::test]
1810    async fn test_register_table() {
1811        // Create a catalog and namespace
1812        let catalog = new_memory_catalog().await;
1813        let namespace_ident = NamespaceIdent::new("test_namespace".into());
1814        create_namespace(&catalog, &namespace_ident).await;
1815
1816        // Create a table to get a valid metadata file
1817        let source_table_ident = TableIdent::new(namespace_ident.clone(), "source_table".into());
1818        create_table(&catalog, &source_table_ident).await;
1819
1820        // Get the metadata location from the source table
1821        let source_table = catalog.load_table(&source_table_ident).await.unwrap();
1822        let metadata_location = source_table.metadata_location().unwrap().to_string();
1823
1824        // Register a new table using the same metadata location
1825        let register_table_ident =
1826            TableIdent::new(namespace_ident.clone(), "register_table".into());
1827        let registered_table = catalog
1828            .register_table(&register_table_ident, metadata_location.clone())
1829            .await
1830            .unwrap();
1831
1832        // Verify the registered table has the correct identifier
1833        assert_eq!(registered_table.identifier(), &register_table_ident);
1834
1835        // Verify the registered table has the correct metadata location
1836        assert_eq!(
1837            registered_table.metadata_location().unwrap().to_string(),
1838            metadata_location
1839        );
1840
1841        // Verify the table exists in the catalog
1842        assert!(catalog.table_exists(&register_table_ident).await.unwrap());
1843
1844        // Verify we can load the registered table
1845        let loaded_table = catalog.load_table(&register_table_ident).await.unwrap();
1846        assert_eq!(loaded_table.identifier(), &register_table_ident);
1847        assert_eq!(
1848            loaded_table.metadata_location().unwrap().to_string(),
1849            metadata_location
1850        );
1851    }
1852
1853    #[tokio::test]
1854    async fn test_update_table() {
1855        let catalog = new_memory_catalog().await;
1856
1857        let table = create_table_with_namespace(&catalog).await;
1858
1859        // Assert the table doesn't contain the update yet
1860        assert!(!table.metadata().properties().contains_key("key"));
1861
1862        // Update table metadata
1863        let tx = Transaction::new(&table);
1864        let updated_table = tx
1865            .update_table_properties()
1866            .set("key".to_string(), "value".to_string())
1867            .apply(tx)
1868            .unwrap()
1869            .commit(&catalog)
1870            .await
1871            .unwrap();
1872
1873        assert_eq!(
1874            updated_table.metadata().properties().get("key").unwrap(),
1875            "value"
1876        );
1877
1878        assert_eq!(table.identifier(), updated_table.identifier());
1879        assert_eq!(table.metadata().uuid(), updated_table.metadata().uuid());
1880        assert!(table.metadata().last_updated_ms() < updated_table.metadata().last_updated_ms());
1881        assert_ne!(table.metadata_location(), updated_table.metadata_location());
1882
1883        assert!(
1884            table.metadata().metadata_log().len() < updated_table.metadata().metadata_log().len()
1885        );
1886    }
1887
1888    #[tokio::test]
1889    async fn test_update_table_fails_if_table_doesnt_exist() {
1890        let catalog = new_memory_catalog().await;
1891
1892        let namespace_ident = NamespaceIdent::new("a".into());
1893        create_namespace(&catalog, &namespace_ident).await;
1894
1895        // This table is not known to the catalog.
1896        let table_ident = TableIdent::new(namespace_ident, "test".to_string());
1897        let table = build_table(table_ident);
1898
1899        let tx = Transaction::new(&table);
1900        let err = tx
1901            .update_table_properties()
1902            .set("key".to_string(), "value".to_string())
1903            .apply(tx)
1904            .unwrap()
1905            .commit(&catalog)
1906            .await
1907            .unwrap_err();
1908        assert_eq!(err.kind(), ErrorKind::TableNotFound);
1909    }
1910
1911    fn build_table(ident: TableIdent) -> Table {
1912        let file_io = FileIO::new_with_fs();
1913
1914        let temp_dir = TempDir::new().unwrap();
1915        let location = temp_dir.path().to_str().unwrap().to_string();
1916
1917        let table_creation = TableCreation::builder()
1918            .name(ident.name().to_string())
1919            .schema(simple_table_schema())
1920            .location(location)
1921            .build();
1922        let metadata = TableMetadataBuilder::from_table_creation(table_creation)
1923            .unwrap()
1924            .build()
1925            .unwrap()
1926            .metadata;
1927
1928        Table::builder()
1929            .identifier(ident)
1930            .metadata(metadata)
1931            .file_io(file_io)
1932            .build()
1933            .unwrap()
1934    }
1935}