iceberg/catalog/memory/
catalog.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! This module contains memory catalog implementation.
19
20use std::collections::HashMap;
21
22use async_trait::async_trait;
23use futures::lock::{Mutex, MutexGuard};
24use itertools::Itertools;
25
26use super::namespace_state::NamespaceState;
27use crate::io::FileIO;
28use crate::spec::{TableMetadata, TableMetadataBuilder};
29use crate::table::Table;
30use crate::{
31    Catalog, CatalogBuilder, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result,
32    TableCommit, TableCreation, TableIdent,
33};
34
/// Property key under which the memory catalog's warehouse location is supplied.
///
/// `MemoryCatalogBuilder::load` reads this entry from the incoming properties and
/// rejects the configuration when the resulting warehouse string is empty.
pub const MEMORY_CATALOG_WAREHOUSE: &str = "warehouse";

/// Namespace property key holding the namespace `location`.
///
/// When a table is created without an explicit location, this namespace property
/// (if present) is used as the prefix of the table location.
const LOCATION: &str = "location";
40
41/// Builder for [`MemoryCatalog`].
42#[derive(Debug)]
43pub struct MemoryCatalogBuilder(MemoryCatalogConfig);
44
45impl Default for MemoryCatalogBuilder {
46    fn default() -> Self {
47        Self(MemoryCatalogConfig {
48            name: None,
49            warehouse: "".to_string(),
50            props: HashMap::new(),
51        })
52    }
53}
54
55impl CatalogBuilder for MemoryCatalogBuilder {
56    type C = MemoryCatalog;
57
58    fn load(
59        mut self,
60        name: impl Into<String>,
61        props: HashMap<String, String>,
62    ) -> impl Future<Output = Result<Self::C>> + Send {
63        self.0.name = Some(name.into());
64
65        if props.contains_key(MEMORY_CATALOG_WAREHOUSE) {
66            self.0.warehouse = props
67                .get(MEMORY_CATALOG_WAREHOUSE)
68                .cloned()
69                .unwrap_or_default()
70        }
71
72        // Collect other remaining properties
73        self.0.props = props
74            .into_iter()
75            .filter(|(k, _)| k != MEMORY_CATALOG_WAREHOUSE)
76            .collect();
77
78        let result = {
79            if self.0.name.is_none() {
80                Err(Error::new(
81                    ErrorKind::DataInvalid,
82                    "Catalog name is required",
83                ))
84            } else if self.0.warehouse.is_empty() {
85                Err(Error::new(
86                    ErrorKind::DataInvalid,
87                    "Catalog warehouse is required",
88                ))
89            } else {
90                MemoryCatalog::new(self.0)
91            }
92        };
93
94        std::future::ready(result)
95    }
96}
97
/// Configuration collected by [`MemoryCatalogBuilder`] and consumed by `MemoryCatalog::new`.
#[derive(Clone, Debug)]
pub(crate) struct MemoryCatalogConfig {
    /// Catalog name; assigned by `MemoryCatalogBuilder::load`.
    name: Option<String>,
    /// Warehouse location; `MemoryCatalog::new` builds the `FileIO` from this path
    /// and stores it as the catalog's warehouse location.
    warehouse: String,
    /// Remaining properties, passed to the `FileIO` builder by `MemoryCatalog::new`.
    props: HashMap<String, String>,
}
104
/// Memory catalog implementation.
///
/// Namespace and table bookkeeping is held in a [`NamespaceState`] behind an async
/// mutex; table metadata files are read and written through `file_io`.
#[derive(Debug)]
pub struct MemoryCatalog {
    /// Root of the namespace/table state; every catalog operation locks it first.
    root_namespace_state: Mutex<NamespaceState>,
    /// File access used to read, write and delete table metadata files.
    file_io: FileIO,
    /// Prefix used to derive a table location when neither the table creation
    /// request nor the namespace `location` property provides one.
    warehouse_location: String,
}
112
113impl MemoryCatalog {
114    /// Creates a memory catalog.
115    fn new(config: MemoryCatalogConfig) -> Result<Self> {
116        Ok(Self {
117            root_namespace_state: Mutex::new(NamespaceState::default()),
118            file_io: FileIO::from_path(&config.warehouse)?
119                .with_props(config.props)
120                .build()?,
121            warehouse_location: config.warehouse,
122        })
123    }
124
125    /// Loads a table from the locked namespace state.
126    async fn load_table_from_locked_state(
127        &self,
128        table_ident: &TableIdent,
129        root_namespace_state: &MutexGuard<'_, NamespaceState>,
130    ) -> Result<Table> {
131        let metadata_location = root_namespace_state.get_existing_table_location(table_ident)?;
132        let metadata = TableMetadata::read_from(&self.file_io, metadata_location).await?;
133
134        Table::builder()
135            .identifier(table_ident.clone())
136            .metadata(metadata)
137            .metadata_location(metadata_location.to_string())
138            .file_io(self.file_io.clone())
139            .build()
140    }
141}
142
143#[async_trait]
144impl Catalog for MemoryCatalog {
145    /// List namespaces inside the catalog.
146    async fn list_namespaces(
147        &self,
148        maybe_parent: Option<&NamespaceIdent>,
149    ) -> Result<Vec<NamespaceIdent>> {
150        let root_namespace_state = self.root_namespace_state.lock().await;
151
152        match maybe_parent {
153            None => {
154                let namespaces = root_namespace_state
155                    .list_top_level_namespaces()
156                    .into_iter()
157                    .map(|str| NamespaceIdent::new(str.to_string()))
158                    .collect_vec();
159
160                Ok(namespaces)
161            }
162            Some(parent_namespace_ident) => {
163                let namespaces = root_namespace_state
164                    .list_namespaces_under(parent_namespace_ident)?
165                    .into_iter()
166                    .map(|name| NamespaceIdent::new(name.to_string()))
167                    .collect_vec();
168
169                Ok(namespaces)
170            }
171        }
172    }
173
174    /// Create a new namespace inside the catalog.
175    async fn create_namespace(
176        &self,
177        namespace_ident: &NamespaceIdent,
178        properties: HashMap<String, String>,
179    ) -> Result<Namespace> {
180        let mut root_namespace_state = self.root_namespace_state.lock().await;
181
182        root_namespace_state.insert_new_namespace(namespace_ident, properties.clone())?;
183        let namespace = Namespace::with_properties(namespace_ident.clone(), properties);
184
185        Ok(namespace)
186    }
187
188    /// Get a namespace information from the catalog.
189    async fn get_namespace(&self, namespace_ident: &NamespaceIdent) -> Result<Namespace> {
190        let root_namespace_state = self.root_namespace_state.lock().await;
191
192        let namespace = Namespace::with_properties(
193            namespace_ident.clone(),
194            root_namespace_state
195                .get_properties(namespace_ident)?
196                .clone(),
197        );
198
199        Ok(namespace)
200    }
201
202    /// Check if namespace exists in catalog.
203    async fn namespace_exists(&self, namespace_ident: &NamespaceIdent) -> Result<bool> {
204        let guarded_namespaces = self.root_namespace_state.lock().await;
205
206        Ok(guarded_namespaces.namespace_exists(namespace_ident))
207    }
208
209    /// Update a namespace inside the catalog.
210    ///
211    /// # Behavior
212    ///
213    /// The properties must be the full set of namespace.
214    async fn update_namespace(
215        &self,
216        namespace_ident: &NamespaceIdent,
217        properties: HashMap<String, String>,
218    ) -> Result<()> {
219        let mut root_namespace_state = self.root_namespace_state.lock().await;
220
221        root_namespace_state.replace_properties(namespace_ident, properties)
222    }
223
224    /// Drop a namespace from the catalog.
225    async fn drop_namespace(&self, namespace_ident: &NamespaceIdent) -> Result<()> {
226        let mut root_namespace_state = self.root_namespace_state.lock().await;
227
228        root_namespace_state.remove_existing_namespace(namespace_ident)
229    }
230
231    /// List tables from namespace.
232    async fn list_tables(&self, namespace_ident: &NamespaceIdent) -> Result<Vec<TableIdent>> {
233        let root_namespace_state = self.root_namespace_state.lock().await;
234
235        let table_names = root_namespace_state.list_tables(namespace_ident)?;
236        let table_idents = table_names
237            .into_iter()
238            .map(|table_name| TableIdent::new(namespace_ident.clone(), table_name.clone()))
239            .collect_vec();
240
241        Ok(table_idents)
242    }
243
244    /// Create a new table inside the namespace.
245    async fn create_table(
246        &self,
247        namespace_ident: &NamespaceIdent,
248        table_creation: TableCreation,
249    ) -> Result<Table> {
250        let mut root_namespace_state = self.root_namespace_state.lock().await;
251
252        let table_name = table_creation.name.clone();
253        let table_ident = TableIdent::new(namespace_ident.clone(), table_name);
254
255        let (table_creation, location) = match table_creation.location.clone() {
256            Some(location) => (table_creation, location),
257            None => {
258                let namespace_properties = root_namespace_state.get_properties(namespace_ident)?;
259                let location_prefix = match namespace_properties.get(LOCATION) {
260                    Some(namespace_location) => namespace_location.clone(),
261                    None => format!("{}/{}", self.warehouse_location, namespace_ident.join("/")),
262                };
263
264                let location = format!("{}/{}", location_prefix, table_ident.name());
265
266                let new_table_creation = TableCreation {
267                    location: Some(location.clone()),
268                    ..table_creation
269                };
270
271                (new_table_creation, location)
272            }
273        };
274
275        let metadata = TableMetadataBuilder::from_table_creation(table_creation)?
276            .build()?
277            .metadata;
278        let metadata_location = MetadataLocation::new_with_table_location(location).to_string();
279
280        metadata.write_to(&self.file_io, &metadata_location).await?;
281
282        root_namespace_state.insert_new_table(&table_ident, metadata_location.clone())?;
283
284        Table::builder()
285            .file_io(self.file_io.clone())
286            .metadata_location(metadata_location)
287            .metadata(metadata)
288            .identifier(table_ident)
289            .build()
290    }
291
292    /// Load table from the catalog.
293    async fn load_table(&self, table_ident: &TableIdent) -> Result<Table> {
294        let root_namespace_state = self.root_namespace_state.lock().await;
295
296        self.load_table_from_locked_state(table_ident, &root_namespace_state)
297            .await
298    }
299
300    /// Drop a table from the catalog.
301    async fn drop_table(&self, table_ident: &TableIdent) -> Result<()> {
302        let mut root_namespace_state = self.root_namespace_state.lock().await;
303
304        let metadata_location = root_namespace_state.remove_existing_table(table_ident)?;
305        self.file_io.delete(&metadata_location).await
306    }
307
308    /// Check if a table exists in the catalog.
309    async fn table_exists(&self, table_ident: &TableIdent) -> Result<bool> {
310        let root_namespace_state = self.root_namespace_state.lock().await;
311
312        root_namespace_state.table_exists(table_ident)
313    }
314
315    /// Rename a table in the catalog.
316    async fn rename_table(
317        &self,
318        src_table_ident: &TableIdent,
319        dst_table_ident: &TableIdent,
320    ) -> Result<()> {
321        let mut root_namespace_state = self.root_namespace_state.lock().await;
322
323        let mut new_root_namespace_state = root_namespace_state.clone();
324        let metadata_location = new_root_namespace_state
325            .get_existing_table_location(src_table_ident)?
326            .clone();
327        new_root_namespace_state.remove_existing_table(src_table_ident)?;
328        new_root_namespace_state.insert_new_table(dst_table_ident, metadata_location)?;
329        *root_namespace_state = new_root_namespace_state;
330
331        Ok(())
332    }
333
334    async fn register_table(
335        &self,
336        table_ident: &TableIdent,
337        metadata_location: String,
338    ) -> Result<Table> {
339        let mut root_namespace_state = self.root_namespace_state.lock().await;
340        root_namespace_state.insert_new_table(&table_ident.clone(), metadata_location.clone())?;
341
342        let metadata = TableMetadata::read_from(&self.file_io, &metadata_location).await?;
343
344        Table::builder()
345            .file_io(self.file_io.clone())
346            .metadata_location(metadata_location)
347            .metadata(metadata)
348            .identifier(table_ident.clone())
349            .build()
350    }
351
352    /// Update a table in the catalog.
353    async fn update_table(&self, commit: TableCommit) -> Result<Table> {
354        let mut root_namespace_state = self.root_namespace_state.lock().await;
355
356        let current_table = self
357            .load_table_from_locked_state(commit.identifier(), &root_namespace_state)
358            .await?;
359
360        // Apply TableCommit to get staged table
361        let staged_table = commit.apply(current_table)?;
362
363        // Write table metadata to the new location
364        staged_table
365            .metadata()
366            .write_to(
367                staged_table.file_io(),
368                staged_table.metadata_location_result()?,
369            )
370            .await?;
371
372        // Flip the pointer to reference the new metadata file.
373        let updated_table = root_namespace_state.commit_table_update(staged_table)?;
374
375        Ok(updated_table)
376    }
377}
378
379#[cfg(test)]
380mod tests {
381    use std::collections::HashSet;
382    use std::hash::Hash;
383    use std::iter::FromIterator;
384    use std::vec;
385
386    use regex::Regex;
387    use tempfile::TempDir;
388
389    use super::*;
390    use crate::io::FileIOBuilder;
391    use crate::spec::{NestedField, PartitionSpec, PrimitiveType, Schema, SortOrder, Type};
392    use crate::transaction::{ApplyTransactionAction, Transaction};
393
394    fn temp_path() -> String {
395        let temp_dir = TempDir::new().unwrap();
396        temp_dir.path().to_str().unwrap().to_string()
397    }
398
399    async fn new_memory_catalog() -> impl Catalog {
400        let warehouse_location = temp_path();
401        MemoryCatalogBuilder::default()
402            .load(
403                "memory",
404                HashMap::from([(MEMORY_CATALOG_WAREHOUSE.to_string(), warehouse_location)]),
405            )
406            .await
407            .unwrap()
408    }
409
410    async fn create_namespace<C: Catalog>(catalog: &C, namespace_ident: &NamespaceIdent) {
411        let _ = catalog
412            .create_namespace(namespace_ident, HashMap::new())
413            .await
414            .unwrap();
415    }
416
417    async fn create_namespaces<C: Catalog>(catalog: &C, namespace_idents: &Vec<&NamespaceIdent>) {
418        for namespace_ident in namespace_idents {
419            let _ = create_namespace(catalog, namespace_ident).await;
420        }
421    }
422
423    fn to_set<T: Eq + Hash>(vec: Vec<T>) -> HashSet<T> {
424        HashSet::from_iter(vec)
425    }
426
427    fn simple_table_schema() -> Schema {
428        Schema::builder()
429            .with_fields(vec![
430                NestedField::required(1, "foo", Type::Primitive(PrimitiveType::Int)).into(),
431            ])
432            .build()
433            .unwrap()
434    }
435
436    async fn create_table<C: Catalog>(catalog: &C, table_ident: &TableIdent) -> Table {
437        catalog
438            .create_table(
439                &table_ident.namespace,
440                TableCreation::builder()
441                    .name(table_ident.name().into())
442                    .schema(simple_table_schema())
443                    .build(),
444            )
445            .await
446            .unwrap()
447    }
448
449    async fn create_tables<C: Catalog>(catalog: &C, table_idents: Vec<&TableIdent>) {
450        for table_ident in table_idents {
451            create_table(catalog, table_ident).await;
452        }
453    }
454
455    async fn create_table_with_namespace<C: Catalog>(catalog: &C) -> Table {
456        let namespace_ident = NamespaceIdent::new("abc".into());
457        create_namespace(catalog, &namespace_ident).await;
458
459        let table_ident = TableIdent::new(namespace_ident, "test".to_string());
460        create_table(catalog, &table_ident).await
461    }
462
463    fn assert_table_eq(table: &Table, expected_table_ident: &TableIdent, expected_schema: &Schema) {
464        assert_eq!(table.identifier(), expected_table_ident);
465
466        let metadata = table.metadata();
467
468        assert_eq!(metadata.current_schema().as_ref(), expected_schema);
469
470        let expected_partition_spec = PartitionSpec::builder((*expected_schema).clone())
471            .with_spec_id(0)
472            .build()
473            .unwrap();
474
475        assert_eq!(
476            metadata
477                .partition_specs_iter()
478                .map(|p| p.as_ref())
479                .collect_vec(),
480            vec![&expected_partition_spec]
481        );
482
483        let expected_sorted_order = SortOrder::builder()
484            .with_order_id(0)
485            .with_fields(vec![])
486            .build(expected_schema)
487            .unwrap();
488
489        assert_eq!(
490            metadata
491                .sort_orders_iter()
492                .map(|s| s.as_ref())
493                .collect_vec(),
494            vec![&expected_sorted_order]
495        );
496
497        assert_eq!(metadata.properties(), &HashMap::new());
498
499        assert!(!table.readonly());
500    }
501
    /// Regex fragment for a lowercase, hyphenated UUID: 8-4-4-4-12 hexadecimal digits.
    const UUID_REGEX_STR: &str = "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}";
503
504    fn assert_table_metadata_location_matches(table: &Table, regex_str: &str) {
505        let actual = table.metadata_location().unwrap().to_string();
506        let regex = Regex::new(regex_str).unwrap();
507        assert!(
508            regex.is_match(&actual),
509            "Expected metadata location to match regex, but got location: {} and regex: {}",
510            actual,
511            regex
512        )
513    }
514
515    #[tokio::test]
516    async fn test_list_namespaces_returns_empty_vector() {
517        let catalog = new_memory_catalog().await;
518
519        assert_eq!(catalog.list_namespaces(None).await.unwrap(), vec![]);
520    }
521
522    #[tokio::test]
523    async fn test_list_namespaces_returns_single_namespace() {
524        let catalog = new_memory_catalog().await;
525        let namespace_ident = NamespaceIdent::new("abc".into());
526        create_namespace(&catalog, &namespace_ident).await;
527
528        assert_eq!(catalog.list_namespaces(None).await.unwrap(), vec![
529            namespace_ident
530        ]);
531    }
532
533    #[tokio::test]
534    async fn test_list_namespaces_returns_multiple_namespaces() {
535        let catalog = new_memory_catalog().await;
536        let namespace_ident_1 = NamespaceIdent::new("a".into());
537        let namespace_ident_2 = NamespaceIdent::new("b".into());
538        create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await;
539
540        assert_eq!(
541            to_set(catalog.list_namespaces(None).await.unwrap()),
542            to_set(vec![namespace_ident_1, namespace_ident_2])
543        );
544    }
545
546    #[tokio::test]
547    async fn test_list_namespaces_returns_only_top_level_namespaces() {
548        let catalog = new_memory_catalog().await;
549        let namespace_ident_1 = NamespaceIdent::new("a".into());
550        let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
551        let namespace_ident_3 = NamespaceIdent::new("b".into());
552        create_namespaces(&catalog, &vec![
553            &namespace_ident_1,
554            &namespace_ident_2,
555            &namespace_ident_3,
556        ])
557        .await;
558
559        assert_eq!(
560            to_set(catalog.list_namespaces(None).await.unwrap()),
561            to_set(vec![namespace_ident_1, namespace_ident_3])
562        );
563    }
564
565    #[tokio::test]
566    async fn test_list_namespaces_returns_no_namespaces_under_parent() {
567        let catalog = new_memory_catalog().await;
568        let namespace_ident_1 = NamespaceIdent::new("a".into());
569        let namespace_ident_2 = NamespaceIdent::new("b".into());
570        create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await;
571
572        assert_eq!(
573            catalog
574                .list_namespaces(Some(&namespace_ident_1))
575                .await
576                .unwrap(),
577            vec![]
578        );
579    }
580
581    #[tokio::test]
582    async fn test_list_namespaces_returns_namespace_under_parent() {
583        let catalog = new_memory_catalog().await;
584        let namespace_ident_1 = NamespaceIdent::new("a".into());
585        let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
586        let namespace_ident_3 = NamespaceIdent::new("c".into());
587        create_namespaces(&catalog, &vec![
588            &namespace_ident_1,
589            &namespace_ident_2,
590            &namespace_ident_3,
591        ])
592        .await;
593
594        assert_eq!(
595            to_set(catalog.list_namespaces(None).await.unwrap()),
596            to_set(vec![namespace_ident_1.clone(), namespace_ident_3])
597        );
598
599        assert_eq!(
600            catalog
601                .list_namespaces(Some(&namespace_ident_1))
602                .await
603                .unwrap(),
604            vec![NamespaceIdent::new("b".into())]
605        );
606    }
607
608    #[tokio::test]
609    async fn test_list_namespaces_returns_multiple_namespaces_under_parent() {
610        let catalog = new_memory_catalog().await;
611        let namespace_ident_1 = NamespaceIdent::new("a".to_string());
612        let namespace_ident_2 = NamespaceIdent::from_strs(vec!["a", "a"]).unwrap();
613        let namespace_ident_3 = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
614        let namespace_ident_4 = NamespaceIdent::from_strs(vec!["a", "c"]).unwrap();
615        let namespace_ident_5 = NamespaceIdent::new("b".into());
616        create_namespaces(&catalog, &vec![
617            &namespace_ident_1,
618            &namespace_ident_2,
619            &namespace_ident_3,
620            &namespace_ident_4,
621            &namespace_ident_5,
622        ])
623        .await;
624
625        assert_eq!(
626            to_set(
627                catalog
628                    .list_namespaces(Some(&namespace_ident_1))
629                    .await
630                    .unwrap()
631            ),
632            to_set(vec![
633                NamespaceIdent::new("a".into()),
634                NamespaceIdent::new("b".into()),
635                NamespaceIdent::new("c".into()),
636            ])
637        );
638    }
639
640    #[tokio::test]
641    async fn test_namespace_exists_returns_false() {
642        let catalog = new_memory_catalog().await;
643        let namespace_ident = NamespaceIdent::new("a".into());
644        create_namespace(&catalog, &namespace_ident).await;
645
646        assert!(
647            !catalog
648                .namespace_exists(&NamespaceIdent::new("b".into()))
649                .await
650                .unwrap()
651        );
652    }
653
654    #[tokio::test]
655    async fn test_namespace_exists_returns_true() {
656        let catalog = new_memory_catalog().await;
657        let namespace_ident = NamespaceIdent::new("a".into());
658        create_namespace(&catalog, &namespace_ident).await;
659
660        assert!(catalog.namespace_exists(&namespace_ident).await.unwrap());
661    }
662
663    #[tokio::test]
664    async fn test_create_namespace_with_empty_properties() {
665        let catalog = new_memory_catalog().await;
666        let namespace_ident = NamespaceIdent::new("a".into());
667
668        assert_eq!(
669            catalog
670                .create_namespace(&namespace_ident, HashMap::new())
671                .await
672                .unwrap(),
673            Namespace::new(namespace_ident.clone())
674        );
675
676        assert_eq!(
677            catalog.get_namespace(&namespace_ident).await.unwrap(),
678            Namespace::with_properties(namespace_ident, HashMap::new())
679        );
680    }
681
682    #[tokio::test]
683    async fn test_create_namespace_with_properties() {
684        let catalog = new_memory_catalog().await;
685        let namespace_ident = NamespaceIdent::new("abc".into());
686
687        let mut properties: HashMap<String, String> = HashMap::new();
688        properties.insert("k".into(), "v".into());
689
690        assert_eq!(
691            catalog
692                .create_namespace(&namespace_ident, properties.clone())
693                .await
694                .unwrap(),
695            Namespace::with_properties(namespace_ident.clone(), properties.clone())
696        );
697
698        assert_eq!(
699            catalog.get_namespace(&namespace_ident).await.unwrap(),
700            Namespace::with_properties(namespace_ident, properties)
701        );
702    }
703
704    #[tokio::test]
705    async fn test_create_namespace_throws_error_if_namespace_already_exists() {
706        let catalog = new_memory_catalog().await;
707        let namespace_ident = NamespaceIdent::new("a".into());
708        create_namespace(&catalog, &namespace_ident).await;
709
710        assert_eq!(
711            catalog
712                .create_namespace(&namespace_ident, HashMap::new())
713                .await
714                .unwrap_err()
715                .to_string(),
716            format!(
717                "NamespaceAlreadyExists => Cannot create namespace {:?}. Namespace already exists.",
718                &namespace_ident
719            )
720        );
721
722        assert_eq!(
723            catalog.get_namespace(&namespace_ident).await.unwrap(),
724            Namespace::with_properties(namespace_ident, HashMap::new())
725        );
726    }
727
728    #[tokio::test]
729    async fn test_create_nested_namespace() {
730        let catalog = new_memory_catalog().await;
731        let parent_namespace_ident = NamespaceIdent::new("a".into());
732        create_namespace(&catalog, &parent_namespace_ident).await;
733
734        let child_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
735
736        assert_eq!(
737            catalog
738                .create_namespace(&child_namespace_ident, HashMap::new())
739                .await
740                .unwrap(),
741            Namespace::new(child_namespace_ident.clone())
742        );
743
744        assert_eq!(
745            catalog.get_namespace(&child_namespace_ident).await.unwrap(),
746            Namespace::with_properties(child_namespace_ident, HashMap::new())
747        );
748    }
749
750    #[tokio::test]
751    async fn test_create_deeply_nested_namespace() {
752        let catalog = new_memory_catalog().await;
753        let namespace_ident_a = NamespaceIdent::new("a".into());
754        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
755        create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
756
757        let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
758
759        assert_eq!(
760            catalog
761                .create_namespace(&namespace_ident_a_b_c, HashMap::new())
762                .await
763                .unwrap(),
764            Namespace::new(namespace_ident_a_b_c.clone())
765        );
766
767        assert_eq!(
768            catalog.get_namespace(&namespace_ident_a_b_c).await.unwrap(),
769            Namespace::with_properties(namespace_ident_a_b_c, HashMap::new())
770        );
771    }
772
773    #[tokio::test]
774    async fn test_create_nested_namespace_throws_error_if_top_level_namespace_doesnt_exist() {
775        let catalog = new_memory_catalog().await;
776
777        let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
778
779        assert_eq!(
780            catalog
781                .create_namespace(&nested_namespace_ident, HashMap::new())
782                .await
783                .unwrap_err()
784                .to_string(),
785            format!(
786                "NamespaceNotFound => No such namespace: {:?}",
787                NamespaceIdent::new("a".into())
788            )
789        );
790
791        assert_eq!(catalog.list_namespaces(None).await.unwrap(), vec![]);
792    }
793
794    #[tokio::test]
795    async fn test_create_deeply_nested_namespace_throws_error_if_intermediate_namespace_doesnt_exist()
796     {
797        let catalog = new_memory_catalog().await;
798
799        let namespace_ident_a = NamespaceIdent::new("a".into());
800        create_namespace(&catalog, &namespace_ident_a).await;
801
802        let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
803
804        assert_eq!(
805            catalog
806                .create_namespace(&namespace_ident_a_b_c, HashMap::new())
807                .await
808                .unwrap_err()
809                .to_string(),
810            format!(
811                "NamespaceNotFound => No such namespace: {:?}",
812                NamespaceIdent::from_strs(vec!["a", "b"]).unwrap()
813            )
814        );
815
816        assert_eq!(catalog.list_namespaces(None).await.unwrap(), vec![
817            namespace_ident_a.clone()
818        ]);
819
820        assert_eq!(
821            catalog
822                .list_namespaces(Some(&namespace_ident_a))
823                .await
824                .unwrap(),
825            vec![]
826        );
827    }
828
829    #[tokio::test]
830    async fn test_get_namespace() {
831        let catalog = new_memory_catalog().await;
832        let namespace_ident = NamespaceIdent::new("abc".into());
833
834        let mut properties: HashMap<String, String> = HashMap::new();
835        properties.insert("k".into(), "v".into());
836        let _ = catalog
837            .create_namespace(&namespace_ident, properties.clone())
838            .await
839            .unwrap();
840
841        assert_eq!(
842            catalog.get_namespace(&namespace_ident).await.unwrap(),
843            Namespace::with_properties(namespace_ident, properties)
844        )
845    }
846
847    #[tokio::test]
848    async fn test_get_nested_namespace() {
849        let catalog = new_memory_catalog().await;
850        let namespace_ident_a = NamespaceIdent::new("a".into());
851        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
852        create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
853
854        assert_eq!(
855            catalog.get_namespace(&namespace_ident_a_b).await.unwrap(),
856            Namespace::with_properties(namespace_ident_a_b, HashMap::new())
857        );
858    }
859
860    #[tokio::test]
861    async fn test_get_deeply_nested_namespace() {
862        let catalog = new_memory_catalog().await;
863        let namespace_ident_a = NamespaceIdent::new("a".into());
864        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
865        let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
866        create_namespaces(&catalog, &vec![
867            &namespace_ident_a,
868            &namespace_ident_a_b,
869            &namespace_ident_a_b_c,
870        ])
871        .await;
872
873        assert_eq!(
874            catalog.get_namespace(&namespace_ident_a_b_c).await.unwrap(),
875            Namespace::with_properties(namespace_ident_a_b_c, HashMap::new())
876        );
877    }
878
879    #[tokio::test]
880    async fn test_get_namespace_throws_error_if_namespace_doesnt_exist() {
881        let catalog = new_memory_catalog().await;
882        create_namespace(&catalog, &NamespaceIdent::new("a".into())).await;
883
884        let non_existent_namespace_ident = NamespaceIdent::new("b".into());
885        assert_eq!(
886            catalog
887                .get_namespace(&non_existent_namespace_ident)
888                .await
889                .unwrap_err()
890                .to_string(),
891            format!(
892                "NamespaceNotFound => No such namespace: {:?}",
893                non_existent_namespace_ident
894            )
895        )
896    }
897
898    #[tokio::test]
899    async fn test_update_namespace() {
900        let catalog = new_memory_catalog().await;
901        let namespace_ident = NamespaceIdent::new("abc".into());
902        create_namespace(&catalog, &namespace_ident).await;
903
904        let mut new_properties: HashMap<String, String> = HashMap::new();
905        new_properties.insert("k".into(), "v".into());
906
907        catalog
908            .update_namespace(&namespace_ident, new_properties.clone())
909            .await
910            .unwrap();
911
912        assert_eq!(
913            catalog.get_namespace(&namespace_ident).await.unwrap(),
914            Namespace::with_properties(namespace_ident, new_properties)
915        )
916    }
917
918    #[tokio::test]
919    async fn test_update_nested_namespace() {
920        let catalog = new_memory_catalog().await;
921        let namespace_ident_a = NamespaceIdent::new("a".into());
922        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
923        create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
924
925        let mut new_properties = HashMap::new();
926        new_properties.insert("k".into(), "v".into());
927
928        catalog
929            .update_namespace(&namespace_ident_a_b, new_properties.clone())
930            .await
931            .unwrap();
932
933        assert_eq!(
934            catalog.get_namespace(&namespace_ident_a_b).await.unwrap(),
935            Namespace::with_properties(namespace_ident_a_b, new_properties)
936        );
937    }
938
939    #[tokio::test]
940    async fn test_update_deeply_nested_namespace() {
941        let catalog = new_memory_catalog().await;
942        let namespace_ident_a = NamespaceIdent::new("a".into());
943        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
944        let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
945        create_namespaces(&catalog, &vec![
946            &namespace_ident_a,
947            &namespace_ident_a_b,
948            &namespace_ident_a_b_c,
949        ])
950        .await;
951
952        let mut new_properties = HashMap::new();
953        new_properties.insert("k".into(), "v".into());
954
955        catalog
956            .update_namespace(&namespace_ident_a_b_c, new_properties.clone())
957            .await
958            .unwrap();
959
960        assert_eq!(
961            catalog.get_namespace(&namespace_ident_a_b_c).await.unwrap(),
962            Namespace::with_properties(namespace_ident_a_b_c, new_properties)
963        );
964    }
965
966    #[tokio::test]
967    async fn test_update_namespace_throws_error_if_namespace_doesnt_exist() {
968        let catalog = new_memory_catalog().await;
969        create_namespace(&catalog, &NamespaceIdent::new("abc".into())).await;
970
971        let non_existent_namespace_ident = NamespaceIdent::new("def".into());
972        assert_eq!(
973            catalog
974                .update_namespace(&non_existent_namespace_ident, HashMap::new())
975                .await
976                .unwrap_err()
977                .to_string(),
978            format!(
979                "NamespaceNotFound => No such namespace: {:?}",
980                non_existent_namespace_ident
981            )
982        )
983    }
984
985    #[tokio::test]
986    async fn test_drop_namespace() {
987        let catalog = new_memory_catalog().await;
988        let namespace_ident = NamespaceIdent::new("abc".into());
989        create_namespace(&catalog, &namespace_ident).await;
990
991        catalog.drop_namespace(&namespace_ident).await.unwrap();
992
993        assert!(!catalog.namespace_exists(&namespace_ident).await.unwrap())
994    }
995
996    #[tokio::test]
997    async fn test_drop_nested_namespace() {
998        let catalog = new_memory_catalog().await;
999        let namespace_ident_a = NamespaceIdent::new("a".into());
1000        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1001        create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1002
1003        catalog.drop_namespace(&namespace_ident_a_b).await.unwrap();
1004
1005        assert!(
1006            !catalog
1007                .namespace_exists(&namespace_ident_a_b)
1008                .await
1009                .unwrap()
1010        );
1011
1012        assert!(catalog.namespace_exists(&namespace_ident_a).await.unwrap());
1013    }
1014
1015    #[tokio::test]
1016    async fn test_drop_deeply_nested_namespace() {
1017        let catalog = new_memory_catalog().await;
1018        let namespace_ident_a = NamespaceIdent::new("a".into());
1019        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1020        let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
1021        create_namespaces(&catalog, &vec![
1022            &namespace_ident_a,
1023            &namespace_ident_a_b,
1024            &namespace_ident_a_b_c,
1025        ])
1026        .await;
1027
1028        catalog
1029            .drop_namespace(&namespace_ident_a_b_c)
1030            .await
1031            .unwrap();
1032
1033        assert!(
1034            !catalog
1035                .namespace_exists(&namespace_ident_a_b_c)
1036                .await
1037                .unwrap()
1038        );
1039
1040        assert!(
1041            catalog
1042                .namespace_exists(&namespace_ident_a_b)
1043                .await
1044                .unwrap()
1045        );
1046
1047        assert!(catalog.namespace_exists(&namespace_ident_a).await.unwrap());
1048    }
1049
1050    #[tokio::test]
1051    async fn test_drop_namespace_throws_error_if_namespace_doesnt_exist() {
1052        let catalog = new_memory_catalog().await;
1053
1054        let non_existent_namespace_ident = NamespaceIdent::new("abc".into());
1055        assert_eq!(
1056            catalog
1057                .drop_namespace(&non_existent_namespace_ident)
1058                .await
1059                .unwrap_err()
1060                .to_string(),
1061            format!(
1062                "NamespaceNotFound => No such namespace: {:?}",
1063                non_existent_namespace_ident
1064            )
1065        )
1066    }
1067
1068    #[tokio::test]
1069    async fn test_drop_namespace_throws_error_if_nested_namespace_doesnt_exist() {
1070        let catalog = new_memory_catalog().await;
1071        create_namespace(&catalog, &NamespaceIdent::new("a".into())).await;
1072
1073        let non_existent_namespace_ident =
1074            NamespaceIdent::from_vec(vec!["a".into(), "b".into()]).unwrap();
1075        assert_eq!(
1076            catalog
1077                .drop_namespace(&non_existent_namespace_ident)
1078                .await
1079                .unwrap_err()
1080                .to_string(),
1081            format!(
1082                "NamespaceNotFound => No such namespace: {:?}",
1083                non_existent_namespace_ident
1084            )
1085        )
1086    }
1087
1088    #[tokio::test]
1089    async fn test_dropping_a_namespace_also_drops_namespaces_nested_under_that_one() {
1090        let catalog = new_memory_catalog().await;
1091        let namespace_ident_a = NamespaceIdent::new("a".into());
1092        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1093        create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1094
1095        catalog.drop_namespace(&namespace_ident_a).await.unwrap();
1096
1097        assert!(!catalog.namespace_exists(&namespace_ident_a).await.unwrap());
1098
1099        assert!(
1100            !catalog
1101                .namespace_exists(&namespace_ident_a_b)
1102                .await
1103                .unwrap()
1104        );
1105    }
1106
1107    #[tokio::test]
1108    async fn test_create_table_with_location() {
1109        let tmp_dir = TempDir::new().unwrap();
1110        let catalog = new_memory_catalog().await;
1111        let namespace_ident = NamespaceIdent::new("a".into());
1112        create_namespace(&catalog, &namespace_ident).await;
1113
1114        let table_name = "abc";
1115        let location = tmp_dir.path().to_str().unwrap().to_string();
1116        let table_creation = TableCreation::builder()
1117            .name(table_name.into())
1118            .location(location.clone())
1119            .schema(simple_table_schema())
1120            .build();
1121
1122        let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1123
1124        assert_table_eq(
1125            &catalog
1126                .create_table(&namespace_ident, table_creation)
1127                .await
1128                .unwrap(),
1129            &expected_table_ident,
1130            &simple_table_schema(),
1131        );
1132
1133        let table = catalog.load_table(&expected_table_ident).await.unwrap();
1134
1135        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1136
1137        assert!(
1138            table
1139                .metadata_location()
1140                .unwrap()
1141                .to_string()
1142                .starts_with(&location)
1143        )
1144    }
1145
1146    #[tokio::test]
1147    async fn test_create_table_falls_back_to_namespace_location_if_table_location_is_missing() {
1148        let warehouse_location = temp_path();
1149        let catalog = MemoryCatalogBuilder::default()
1150            .load(
1151                "memory",
1152                HashMap::from([(
1153                    MEMORY_CATALOG_WAREHOUSE.to_string(),
1154                    warehouse_location.clone(),
1155                )]),
1156            )
1157            .await
1158            .unwrap();
1159
1160        let namespace_ident = NamespaceIdent::new("a".into());
1161        let mut namespace_properties = HashMap::new();
1162        let namespace_location = temp_path();
1163        namespace_properties.insert(LOCATION.to_string(), namespace_location.to_string());
1164        catalog
1165            .create_namespace(&namespace_ident, namespace_properties)
1166            .await
1167            .unwrap();
1168
1169        let table_name = "tbl1";
1170        let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1171        let expected_table_metadata_location_regex = format!(
1172            "^{}/tbl1/metadata/00000-{}.metadata.json$",
1173            namespace_location, UUID_REGEX_STR,
1174        );
1175
1176        let table = catalog
1177            .create_table(
1178                &namespace_ident,
1179                TableCreation::builder()
1180                    .name(table_name.into())
1181                    .schema(simple_table_schema())
1182                    // no location specified for table
1183                    .build(),
1184            )
1185            .await
1186            .unwrap();
1187        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1188        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1189
1190        let table = catalog.load_table(&expected_table_ident).await.unwrap();
1191        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1192        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1193    }
1194
1195    #[tokio::test]
1196    async fn test_create_table_in_nested_namespace_falls_back_to_nested_namespace_location_if_table_location_is_missing()
1197     {
1198        let warehouse_location = temp_path();
1199        let catalog = MemoryCatalogBuilder::default()
1200            .load(
1201                "memory",
1202                HashMap::from([(
1203                    MEMORY_CATALOG_WAREHOUSE.to_string(),
1204                    warehouse_location.clone(),
1205                )]),
1206            )
1207            .await
1208            .unwrap();
1209
1210        let namespace_ident = NamespaceIdent::new("a".into());
1211        let mut namespace_properties = HashMap::new();
1212        let namespace_location = temp_path();
1213        namespace_properties.insert(LOCATION.to_string(), namespace_location.to_string());
1214        catalog
1215            .create_namespace(&namespace_ident, namespace_properties)
1216            .await
1217            .unwrap();
1218
1219        let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1220        let mut nested_namespace_properties = HashMap::new();
1221        let nested_namespace_location = temp_path();
1222        nested_namespace_properties
1223            .insert(LOCATION.to_string(), nested_namespace_location.to_string());
1224        catalog
1225            .create_namespace(&nested_namespace_ident, nested_namespace_properties)
1226            .await
1227            .unwrap();
1228
1229        let table_name = "tbl1";
1230        let expected_table_ident =
1231            TableIdent::new(nested_namespace_ident.clone(), table_name.into());
1232        let expected_table_metadata_location_regex = format!(
1233            "^{}/tbl1/metadata/00000-{}.metadata.json$",
1234            nested_namespace_location, UUID_REGEX_STR,
1235        );
1236
1237        let table = catalog
1238            .create_table(
1239                &nested_namespace_ident,
1240                TableCreation::builder()
1241                    .name(table_name.into())
1242                    .schema(simple_table_schema())
1243                    // no location specified for table
1244                    .build(),
1245            )
1246            .await
1247            .unwrap();
1248        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1249        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1250
1251        let table = catalog.load_table(&expected_table_ident).await.unwrap();
1252        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1253        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1254    }
1255
1256    #[tokio::test]
1257    async fn test_create_table_falls_back_to_warehouse_location_if_both_table_location_and_namespace_location_are_missing()
1258     {
1259        let warehouse_location = temp_path();
1260        let catalog = MemoryCatalogBuilder::default()
1261            .load(
1262                "memory",
1263                HashMap::from([(
1264                    MEMORY_CATALOG_WAREHOUSE.to_string(),
1265                    warehouse_location.clone(),
1266                )]),
1267            )
1268            .await
1269            .unwrap();
1270
1271        let namespace_ident = NamespaceIdent::new("a".into());
1272        // note: no location specified in namespace_properties
1273        let namespace_properties = HashMap::new();
1274        catalog
1275            .create_namespace(&namespace_ident, namespace_properties)
1276            .await
1277            .unwrap();
1278
1279        let table_name = "tbl1";
1280        let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1281        let expected_table_metadata_location_regex = format!(
1282            "^{}/a/tbl1/metadata/00000-{}.metadata.json$",
1283            warehouse_location, UUID_REGEX_STR
1284        );
1285
1286        let table = catalog
1287            .create_table(
1288                &namespace_ident,
1289                TableCreation::builder()
1290                    .name(table_name.into())
1291                    .schema(simple_table_schema())
1292                    // no location specified for table
1293                    .build(),
1294            )
1295            .await
1296            .unwrap();
1297        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1298        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1299
1300        let table = catalog.load_table(&expected_table_ident).await.unwrap();
1301        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1302        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1303    }
1304
1305    #[tokio::test]
1306    async fn test_create_table_in_nested_namespace_falls_back_to_warehouse_location_if_both_table_location_and_namespace_location_are_missing()
1307     {
1308        let warehouse_location = temp_path();
1309        let catalog = MemoryCatalogBuilder::default()
1310            .load(
1311                "memory",
1312                HashMap::from([(
1313                    MEMORY_CATALOG_WAREHOUSE.to_string(),
1314                    warehouse_location.clone(),
1315                )]),
1316            )
1317            .await
1318            .unwrap();
1319
1320        let namespace_ident = NamespaceIdent::new("a".into());
1321        catalog
1322            // note: no location specified in namespace_properties
1323            .create_namespace(&namespace_ident, HashMap::new())
1324            .await
1325            .unwrap();
1326
1327        let nested_namespace_ident = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1328        catalog
1329            // note: no location specified in namespace_properties
1330            .create_namespace(&nested_namespace_ident, HashMap::new())
1331            .await
1332            .unwrap();
1333
1334        let table_name = "tbl1";
1335        let expected_table_ident =
1336            TableIdent::new(nested_namespace_ident.clone(), table_name.into());
1337        let expected_table_metadata_location_regex = format!(
1338            "^{}/a/b/tbl1/metadata/00000-{}.metadata.json$",
1339            warehouse_location, UUID_REGEX_STR
1340        );
1341
1342        let table = catalog
1343            .create_table(
1344                &nested_namespace_ident,
1345                TableCreation::builder()
1346                    .name(table_name.into())
1347                    .schema(simple_table_schema())
1348                    // no location specified for table
1349                    .build(),
1350            )
1351            .await
1352            .unwrap();
1353        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1354        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1355
1356        let table = catalog.load_table(&expected_table_ident).await.unwrap();
1357        assert_table_eq(&table, &expected_table_ident, &simple_table_schema());
1358        assert_table_metadata_location_matches(&table, &expected_table_metadata_location_regex);
1359    }
1360
1361    #[tokio::test]
1362    async fn test_create_table_throws_error_if_table_location_and_namespace_location_and_warehouse_location_are_missing()
1363     {
1364        let catalog = MemoryCatalogBuilder::default()
1365            .load("memory", HashMap::from([]))
1366            .await;
1367
1368        assert!(catalog.is_err());
1369        assert_eq!(
1370            catalog.unwrap_err().to_string(),
1371            "DataInvalid => Catalog warehouse is required"
1372        );
1373    }
1374
1375    #[tokio::test]
1376    async fn test_create_table_throws_error_if_table_with_same_name_already_exists() {
1377        let catalog = new_memory_catalog().await;
1378        let namespace_ident = NamespaceIdent::new("a".into());
1379        create_namespace(&catalog, &namespace_ident).await;
1380        let table_name = "tbl1";
1381        let table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
1382        create_table(&catalog, &table_ident).await;
1383
1384        let tmp_dir = TempDir::new().unwrap();
1385        let location = tmp_dir.path().to_str().unwrap().to_string();
1386
1387        assert_eq!(
1388            catalog
1389                .create_table(
1390                    &namespace_ident,
1391                    TableCreation::builder()
1392                        .name(table_name.into())
1393                        .schema(simple_table_schema())
1394                        .location(location)
1395                        .build()
1396                )
1397                .await
1398                .unwrap_err()
1399                .to_string(),
1400            format!(
1401                "TableAlreadyExists => Cannot create table {:?}. Table already exists.",
1402                &table_ident
1403            )
1404        );
1405    }
1406
1407    #[tokio::test]
1408    async fn test_list_tables_returns_empty_vector() {
1409        let catalog = new_memory_catalog().await;
1410        let namespace_ident = NamespaceIdent::new("a".into());
1411        create_namespace(&catalog, &namespace_ident).await;
1412
1413        assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![]);
1414    }
1415
1416    #[tokio::test]
1417    async fn test_list_tables_returns_a_single_table() {
1418        let catalog = new_memory_catalog().await;
1419        let namespace_ident = NamespaceIdent::new("n1".into());
1420        create_namespace(&catalog, &namespace_ident).await;
1421
1422        let table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1423        create_table(&catalog, &table_ident).await;
1424
1425        assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![
1426            table_ident
1427        ]);
1428    }
1429
1430    #[tokio::test]
1431    async fn test_list_tables_returns_multiple_tables() {
1432        let catalog = new_memory_catalog().await;
1433        let namespace_ident = NamespaceIdent::new("n1".into());
1434        create_namespace(&catalog, &namespace_ident).await;
1435
1436        let table_ident_1 = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1437        let table_ident_2 = TableIdent::new(namespace_ident.clone(), "tbl2".into());
1438        let _ = create_tables(&catalog, vec![&table_ident_1, &table_ident_2]).await;
1439
1440        assert_eq!(
1441            to_set(catalog.list_tables(&namespace_ident).await.unwrap()),
1442            to_set(vec![table_ident_1, table_ident_2])
1443        );
1444    }
1445
1446    #[tokio::test]
1447    async fn test_list_tables_returns_tables_from_correct_namespace() {
1448        let catalog = new_memory_catalog().await;
1449        let namespace_ident_1 = NamespaceIdent::new("n1".into());
1450        let namespace_ident_2 = NamespaceIdent::new("n2".into());
1451        create_namespaces(&catalog, &vec![&namespace_ident_1, &namespace_ident_2]).await;
1452
1453        let table_ident_1 = TableIdent::new(namespace_ident_1.clone(), "tbl1".into());
1454        let table_ident_2 = TableIdent::new(namespace_ident_1.clone(), "tbl2".into());
1455        let table_ident_3 = TableIdent::new(namespace_ident_2.clone(), "tbl1".into());
1456        let _ = create_tables(&catalog, vec![
1457            &table_ident_1,
1458            &table_ident_2,
1459            &table_ident_3,
1460        ])
1461        .await;
1462
1463        assert_eq!(
1464            to_set(catalog.list_tables(&namespace_ident_1).await.unwrap()),
1465            to_set(vec![table_ident_1, table_ident_2])
1466        );
1467
1468        assert_eq!(
1469            to_set(catalog.list_tables(&namespace_ident_2).await.unwrap()),
1470            to_set(vec![table_ident_3])
1471        );
1472    }
1473
1474    #[tokio::test]
1475    async fn test_list_tables_returns_table_under_nested_namespace() {
1476        let catalog = new_memory_catalog().await;
1477        let namespace_ident_a = NamespaceIdent::new("a".into());
1478        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1479        create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1480
1481        let table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl1".into());
1482        create_table(&catalog, &table_ident).await;
1483
1484        assert_eq!(
1485            catalog.list_tables(&namespace_ident_a_b).await.unwrap(),
1486            vec![table_ident]
1487        );
1488    }
1489
1490    #[tokio::test]
1491    async fn test_list_tables_throws_error_if_namespace_doesnt_exist() {
1492        let catalog = new_memory_catalog().await;
1493
1494        let non_existent_namespace_ident = NamespaceIdent::new("n1".into());
1495
1496        assert_eq!(
1497            catalog
1498                .list_tables(&non_existent_namespace_ident)
1499                .await
1500                .unwrap_err()
1501                .to_string(),
1502            format!(
1503                "NamespaceNotFound => No such namespace: {:?}",
1504                non_existent_namespace_ident
1505            ),
1506        );
1507    }
1508
1509    #[tokio::test]
1510    async fn test_drop_table() {
1511        let catalog = new_memory_catalog().await;
1512        let namespace_ident = NamespaceIdent::new("n1".into());
1513        create_namespace(&catalog, &namespace_ident).await;
1514        let table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1515        create_table(&catalog, &table_ident).await;
1516
1517        catalog.drop_table(&table_ident).await.unwrap();
1518    }
1519
1520    #[tokio::test]
1521    async fn test_drop_table_drops_table_under_nested_namespace() {
1522        let catalog = new_memory_catalog().await;
1523        let namespace_ident_a = NamespaceIdent::new("a".into());
1524        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1525        create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1526
1527        let table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl1".into());
1528        create_table(&catalog, &table_ident).await;
1529
1530        catalog.drop_table(&table_ident).await.unwrap();
1531
1532        assert_eq!(
1533            catalog.list_tables(&namespace_ident_a_b).await.unwrap(),
1534            vec![]
1535        );
1536    }
1537
1538    #[tokio::test]
1539    async fn test_drop_table_throws_error_if_namespace_doesnt_exist() {
1540        let catalog = new_memory_catalog().await;
1541
1542        let non_existent_namespace_ident = NamespaceIdent::new("n1".into());
1543        let non_existent_table_ident =
1544            TableIdent::new(non_existent_namespace_ident.clone(), "tbl1".into());
1545
1546        assert_eq!(
1547            catalog
1548                .drop_table(&non_existent_table_ident)
1549                .await
1550                .unwrap_err()
1551                .to_string(),
1552            format!(
1553                "NamespaceNotFound => No such namespace: {:?}",
1554                non_existent_namespace_ident
1555            ),
1556        );
1557    }
1558
1559    #[tokio::test]
1560    async fn test_drop_table_throws_error_if_table_doesnt_exist() {
1561        let catalog = new_memory_catalog().await;
1562        let namespace_ident = NamespaceIdent::new("n1".into());
1563        create_namespace(&catalog, &namespace_ident).await;
1564
1565        let non_existent_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1566
1567        assert_eq!(
1568            catalog
1569                .drop_table(&non_existent_table_ident)
1570                .await
1571                .unwrap_err()
1572                .to_string(),
1573            format!(
1574                "TableNotFound => No such table: {:?}",
1575                non_existent_table_ident
1576            ),
1577        );
1578    }
1579
1580    #[tokio::test]
1581    async fn test_table_exists_returns_true() {
1582        let catalog = new_memory_catalog().await;
1583        let namespace_ident = NamespaceIdent::new("n1".into());
1584        create_namespace(&catalog, &namespace_ident).await;
1585        let table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1586        create_table(&catalog, &table_ident).await;
1587
1588        assert!(catalog.table_exists(&table_ident).await.unwrap());
1589    }
1590
1591    #[tokio::test]
1592    async fn test_table_exists_returns_false() {
1593        let catalog = new_memory_catalog().await;
1594        let namespace_ident = NamespaceIdent::new("n1".into());
1595        create_namespace(&catalog, &namespace_ident).await;
1596        let non_existent_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1597
1598        assert!(
1599            !catalog
1600                .table_exists(&non_existent_table_ident)
1601                .await
1602                .unwrap()
1603        );
1604    }
1605
1606    #[tokio::test]
1607    async fn test_table_exists_under_nested_namespace() {
1608        let catalog = new_memory_catalog().await;
1609        let namespace_ident_a = NamespaceIdent::new("a".into());
1610        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1611        create_namespaces(&catalog, &vec![&namespace_ident_a, &namespace_ident_a_b]).await;
1612
1613        let table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl1".into());
1614        create_table(&catalog, &table_ident).await;
1615
1616        assert!(catalog.table_exists(&table_ident).await.unwrap());
1617
1618        let non_existent_table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl2".into());
1619        assert!(
1620            !catalog
1621                .table_exists(&non_existent_table_ident)
1622                .await
1623                .unwrap()
1624        );
1625    }
1626
1627    #[tokio::test]
1628    async fn test_table_exists_throws_error_if_namespace_doesnt_exist() {
1629        let catalog = new_memory_catalog().await;
1630
1631        let non_existent_namespace_ident = NamespaceIdent::new("n1".into());
1632        let non_existent_table_ident =
1633            TableIdent::new(non_existent_namespace_ident.clone(), "tbl1".into());
1634
1635        assert_eq!(
1636            catalog
1637                .table_exists(&non_existent_table_ident)
1638                .await
1639                .unwrap_err()
1640                .to_string(),
1641            format!(
1642                "NamespaceNotFound => No such namespace: {:?}",
1643                non_existent_namespace_ident
1644            ),
1645        );
1646    }
1647
1648    #[tokio::test]
1649    async fn test_rename_table_in_same_namespace() {
1650        let catalog = new_memory_catalog().await;
1651        let namespace_ident = NamespaceIdent::new("n1".into());
1652        create_namespace(&catalog, &namespace_ident).await;
1653        let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1654        let dst_table_ident = TableIdent::new(namespace_ident.clone(), "tbl2".into());
1655        create_table(&catalog, &src_table_ident).await;
1656
1657        catalog
1658            .rename_table(&src_table_ident, &dst_table_ident)
1659            .await
1660            .unwrap();
1661
1662        assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![
1663            dst_table_ident
1664        ],);
1665    }
1666
1667    #[tokio::test]
1668    async fn test_rename_table_across_namespaces() {
1669        let catalog = new_memory_catalog().await;
1670        let src_namespace_ident = NamespaceIdent::new("a".into());
1671        let dst_namespace_ident = NamespaceIdent::new("b".into());
1672        create_namespaces(&catalog, &vec![&src_namespace_ident, &dst_namespace_ident]).await;
1673        let src_table_ident = TableIdent::new(src_namespace_ident.clone(), "tbl1".into());
1674        let dst_table_ident = TableIdent::new(dst_namespace_ident.clone(), "tbl2".into());
1675        create_table(&catalog, &src_table_ident).await;
1676
1677        catalog
1678            .rename_table(&src_table_ident, &dst_table_ident)
1679            .await
1680            .unwrap();
1681
1682        assert_eq!(
1683            catalog.list_tables(&src_namespace_ident).await.unwrap(),
1684            vec![],
1685        );
1686
1687        assert_eq!(
1688            catalog.list_tables(&dst_namespace_ident).await.unwrap(),
1689            vec![dst_table_ident],
1690        );
1691    }
1692
1693    #[tokio::test]
1694    async fn test_rename_table_src_table_is_same_as_dst_table() {
1695        let catalog = new_memory_catalog().await;
1696        let namespace_ident = NamespaceIdent::new("n1".into());
1697        create_namespace(&catalog, &namespace_ident).await;
1698        let table_ident = TableIdent::new(namespace_ident.clone(), "tbl".into());
1699        create_table(&catalog, &table_ident).await;
1700
1701        catalog
1702            .rename_table(&table_ident, &table_ident)
1703            .await
1704            .unwrap();
1705
1706        assert_eq!(catalog.list_tables(&namespace_ident).await.unwrap(), vec![
1707            table_ident
1708        ],);
1709    }
1710
1711    #[tokio::test]
1712    async fn test_rename_table_across_nested_namespaces() {
1713        let catalog = new_memory_catalog().await;
1714        let namespace_ident_a = NamespaceIdent::new("a".into());
1715        let namespace_ident_a_b = NamespaceIdent::from_strs(vec!["a", "b"]).unwrap();
1716        let namespace_ident_a_b_c = NamespaceIdent::from_strs(vec!["a", "b", "c"]).unwrap();
1717        create_namespaces(&catalog, &vec![
1718            &namespace_ident_a,
1719            &namespace_ident_a_b,
1720            &namespace_ident_a_b_c,
1721        ])
1722        .await;
1723
1724        let src_table_ident = TableIdent::new(namespace_ident_a_b_c.clone(), "tbl1".into());
1725        create_tables(&catalog, vec![&src_table_ident]).await;
1726
1727        let dst_table_ident = TableIdent::new(namespace_ident_a_b.clone(), "tbl1".into());
1728        catalog
1729            .rename_table(&src_table_ident, &dst_table_ident)
1730            .await
1731            .unwrap();
1732
1733        assert!(!catalog.table_exists(&src_table_ident).await.unwrap());
1734
1735        assert!(catalog.table_exists(&dst_table_ident).await.unwrap());
1736    }
1737
1738    #[tokio::test]
1739    async fn test_rename_table_throws_error_if_src_namespace_doesnt_exist() {
1740        let catalog = new_memory_catalog().await;
1741
1742        let non_existent_src_namespace_ident = NamespaceIdent::new("n1".into());
1743        let src_table_ident =
1744            TableIdent::new(non_existent_src_namespace_ident.clone(), "tbl1".into());
1745
1746        let dst_namespace_ident = NamespaceIdent::new("n2".into());
1747        create_namespace(&catalog, &dst_namespace_ident).await;
1748        let dst_table_ident = TableIdent::new(dst_namespace_ident.clone(), "tbl1".into());
1749
1750        assert_eq!(
1751            catalog
1752                .rename_table(&src_table_ident, &dst_table_ident)
1753                .await
1754                .unwrap_err()
1755                .to_string(),
1756            format!(
1757                "NamespaceNotFound => No such namespace: {:?}",
1758                non_existent_src_namespace_ident
1759            ),
1760        );
1761    }
1762
1763    #[tokio::test]
1764    async fn test_rename_table_throws_error_if_dst_namespace_doesnt_exist() {
1765        let catalog = new_memory_catalog().await;
1766        let src_namespace_ident = NamespaceIdent::new("n1".into());
1767        let src_table_ident = TableIdent::new(src_namespace_ident.clone(), "tbl1".into());
1768        create_namespace(&catalog, &src_namespace_ident).await;
1769        create_table(&catalog, &src_table_ident).await;
1770
1771        let non_existent_dst_namespace_ident = NamespaceIdent::new("n2".into());
1772        let dst_table_ident =
1773            TableIdent::new(non_existent_dst_namespace_ident.clone(), "tbl1".into());
1774        assert_eq!(
1775            catalog
1776                .rename_table(&src_table_ident, &dst_table_ident)
1777                .await
1778                .unwrap_err()
1779                .to_string(),
1780            format!(
1781                "NamespaceNotFound => No such namespace: {:?}",
1782                non_existent_dst_namespace_ident
1783            ),
1784        );
1785    }
1786
1787    #[tokio::test]
1788    async fn test_rename_table_throws_error_if_src_table_doesnt_exist() {
1789        let catalog = new_memory_catalog().await;
1790        let namespace_ident = NamespaceIdent::new("n1".into());
1791        create_namespace(&catalog, &namespace_ident).await;
1792        let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1793        let dst_table_ident = TableIdent::new(namespace_ident.clone(), "tbl2".into());
1794
1795        assert_eq!(
1796            catalog
1797                .rename_table(&src_table_ident, &dst_table_ident)
1798                .await
1799                .unwrap_err()
1800                .to_string(),
1801            format!("TableNotFound => No such table: {:?}", src_table_ident),
1802        );
1803    }
1804
1805    #[tokio::test]
1806    async fn test_rename_table_throws_error_if_dst_table_already_exists() {
1807        let catalog = new_memory_catalog().await;
1808        let namespace_ident = NamespaceIdent::new("n1".into());
1809        create_namespace(&catalog, &namespace_ident).await;
1810        let src_table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
1811        let dst_table_ident = TableIdent::new(namespace_ident.clone(), "tbl2".into());
1812        create_tables(&catalog, vec![&src_table_ident, &dst_table_ident]).await;
1813
1814        assert_eq!(
1815            catalog
1816                .rename_table(&src_table_ident, &dst_table_ident)
1817                .await
1818                .unwrap_err()
1819                .to_string(),
1820            format!(
1821                "TableAlreadyExists => Cannot create table {:?}. Table already exists.",
1822                &dst_table_ident
1823            ),
1824        );
1825    }
1826
1827    #[tokio::test]
1828    async fn test_register_table() {
1829        // Create a catalog and namespace
1830        let catalog = new_memory_catalog().await;
1831        let namespace_ident = NamespaceIdent::new("test_namespace".into());
1832        create_namespace(&catalog, &namespace_ident).await;
1833
1834        // Create a table to get a valid metadata file
1835        let source_table_ident = TableIdent::new(namespace_ident.clone(), "source_table".into());
1836        create_table(&catalog, &source_table_ident).await;
1837
1838        // Get the metadata location from the source table
1839        let source_table = catalog.load_table(&source_table_ident).await.unwrap();
1840        let metadata_location = source_table.metadata_location().unwrap().to_string();
1841
1842        // Register a new table using the same metadata location
1843        let register_table_ident =
1844            TableIdent::new(namespace_ident.clone(), "register_table".into());
1845        let registered_table = catalog
1846            .register_table(&register_table_ident, metadata_location.clone())
1847            .await
1848            .unwrap();
1849
1850        // Verify the registered table has the correct identifier
1851        assert_eq!(registered_table.identifier(), &register_table_ident);
1852
1853        // Verify the registered table has the correct metadata location
1854        assert_eq!(
1855            registered_table.metadata_location().unwrap().to_string(),
1856            metadata_location
1857        );
1858
1859        // Verify the table exists in the catalog
1860        assert!(catalog.table_exists(&register_table_ident).await.unwrap());
1861
1862        // Verify we can load the registered table
1863        let loaded_table = catalog.load_table(&register_table_ident).await.unwrap();
1864        assert_eq!(loaded_table.identifier(), &register_table_ident);
1865        assert_eq!(
1866            loaded_table.metadata_location().unwrap().to_string(),
1867            metadata_location
1868        );
1869    }
1870
1871    #[tokio::test]
1872    async fn test_update_table() {
1873        let catalog = new_memory_catalog().await;
1874
1875        let table = create_table_with_namespace(&catalog).await;
1876
1877        // Assert the table doesn't contain the update yet
1878        assert!(!table.metadata().properties().contains_key("key"));
1879
1880        // Update table metadata
1881        let tx = Transaction::new(&table);
1882        let updated_table = tx
1883            .update_table_properties()
1884            .set("key".to_string(), "value".to_string())
1885            .apply(tx)
1886            .unwrap()
1887            .commit(&catalog)
1888            .await
1889            .unwrap();
1890
1891        assert_eq!(
1892            updated_table.metadata().properties().get("key").unwrap(),
1893            "value"
1894        );
1895
1896        assert_eq!(table.identifier(), updated_table.identifier());
1897        assert_eq!(table.metadata().uuid(), updated_table.metadata().uuid());
1898        assert!(table.metadata().last_updated_ms() < updated_table.metadata().last_updated_ms());
1899        assert_ne!(table.metadata_location(), updated_table.metadata_location());
1900
1901        assert!(
1902            table.metadata().metadata_log().len() < updated_table.metadata().metadata_log().len()
1903        );
1904    }
1905
1906    #[tokio::test]
1907    async fn test_update_table_fails_if_table_doesnt_exist() {
1908        let catalog = new_memory_catalog().await;
1909
1910        let namespace_ident = NamespaceIdent::new("a".into());
1911        create_namespace(&catalog, &namespace_ident).await;
1912
1913        // This table is not known to the catalog.
1914        let table_ident = TableIdent::new(namespace_ident, "test".to_string());
1915        let table = build_table(table_ident);
1916
1917        let tx = Transaction::new(&table);
1918        let err = tx
1919            .update_table_properties()
1920            .set("key".to_string(), "value".to_string())
1921            .apply(tx)
1922            .unwrap()
1923            .commit(&catalog)
1924            .await
1925            .unwrap_err();
1926        assert_eq!(err.kind(), ErrorKind::TableNotFound);
1927    }
1928
1929    fn build_table(ident: TableIdent) -> Table {
1930        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
1931
1932        let temp_dir = TempDir::new().unwrap();
1933        let location = temp_dir.path().to_str().unwrap().to_string();
1934
1935        let table_creation = TableCreation::builder()
1936            .name(ident.name().to_string())
1937            .schema(simple_table_schema())
1938            .location(location)
1939            .build();
1940        let metadata = TableMetadataBuilder::from_table_creation(table_creation)
1941            .unwrap()
1942            .build()
1943            .unwrap()
1944            .metadata;
1945
1946        Table::builder()
1947            .identifier(ident)
1948            .metadata(metadata)
1949            .file_io(file_io)
1950            .build()
1951            .unwrap()
1952    }
1953}